diff --git a/pretrain_data/common_crawl/NOTES.md b/pretrain_data/common_crawl/NOTES.md
index b3264ca6e..49c8495cc 100644
--- a/pretrain_data/common_crawl/NOTES.md
+++ b/pretrain_data/common_crawl/NOTES.md
@@ -12,6 +12,8 @@ Post-process of v0. Drop non-English documents. Deduplicate whole documents by U
 
 ~4.8T tokens. High/Med/Low quality split: 20%/25%/55%
 
+**v1-small** is an 8.5% sample of `v1`, about 300B tokens.
+
 ### v2
 
 Post-process of v1. Remove duplicate paragraphs across the entire corpus
diff --git a/pretrain_data/mixer/Makefile b/pretrain_data/mixer/Makefile
index e37a1c64e..ca27e51dd 100644
--- a/pretrain_data/mixer/Makefile
+++ b/pretrain_data/mixer/Makefile
@@ -12,6 +12,7 @@ setup-test-data:
 	aws s3 cp tests/data/documents.json.gz s3://ai2-llm/pretraining-data/tests/mixer/inputs/v0/documents/head/0000.json.gz
 	aws s3 cp tests/data/pii-attributes.json.gz s3://ai2-llm/pretraining-data/tests/mixer/inputs/v0/attributes/pii/head/0000.json.gz
 	aws s3 cp tests/data/toxicity-attributes.json.gz s3://ai2-llm/pretraining-data/tests/mixer/inputs/v0/attributes/toxicity/head/0000.json.gz
+	aws s3 cp tests/data/sample-attributes.json.gz s3://ai2-llm/pretraining-data/tests/mixer/inputs/v0/attributes/sample/head/0000.json.gz
 
 debug:
 	cargo build
diff --git a/pretrain_data/mixer/config/v1-small.json b/pretrain_data/mixer/config/v1-small.json
new file mode 100644
index 000000000..ed8f80b44
--- /dev/null
+++ b/pretrain_data/mixer/config/v1-small.json
@@ -0,0 +1,26 @@
+{
+  "streams": [
+    {
+      "name": "v1_small",
+      "documents": [
+        "pretraining-data/sources/common-crawl/v1/documents/cc_en_head/*",
+        "pretraining-data/sources/common-crawl/v1/documents/cc_en_middle/*",
+        "pretraining-data/sources/common-crawl/v1/documents/cc_en_tail/*"
+      ],
+      "output": {
+        "path": "pretraining-data/sources/common-crawl/v1-small/documents",
+        "max_size_in_bytes": 42949672960
+      },
+      "attributes": ["sample"],
+      "filter": {
+        "include": ["$.attributes[?(@.sample__random_number_v1__random[0][2] < 0.085)]"],
+        "exclude": []
+      }
+    }
+  ],
+  "work_dir": {
+    "input": "/data1/work/input",
+    "output": "/data2/work/output"
+  },
+  "processes": 128
+}
diff --git a/pretrain_data/mixer/src/bin/deduper.rs b/pretrain_data/mixer/src/bin/deduper.rs
index 673085766..8911b978b 100644
--- a/pretrain_data/mixer/src/bin/deduper.rs
+++ b/pretrain_data/mixer/src/bin/deduper.rs
@@ -22,7 +22,7 @@ use deduper_config::*;
 
 fn main() {
     if env::var("RUST_LOG").is_err() {
-        env::set_var("RUST_LOG", "info")
+        env::set_var("RUST_LOG", "ai2_pretraining=info,deduper=info");
     }
     env_logger::init();
 
diff --git a/pretrain_data/mixer/src/bin/mixer.rs b/pretrain_data/mixer/src/bin/mixer.rs
index c571c5614..23837904a 100644
--- a/pretrain_data/mixer/src/bin/mixer.rs
+++ b/pretrain_data/mixer/src/bin/mixer.rs
@@ -13,7 +13,7 @@ use mixer_config::*;
 
 fn main() {
     if env::var("RUST_LOG").is_err() {
-        env::set_var("RUST_LOG", "info")
+        env::set_var("RUST_LOG", "ai2_pretraining=info,mixer=info");
     }
     env_logger::init();
     let args: Vec<String> = env::args().collect();
@@ -169,4 +169,29 @@ mod test {
                          local_output_file);
         Ok(())
     }
+
+    #[test]
+    fn test_filter_by_span() -> Result<(), io::Error> {
+        if env::var("RUST_LOG").is_err() {
+            env::set_var("RUST_LOG", "info")
+        }
+        env_logger::init();
+
+        let config = MixerConfig::read_from_file("tests/config/filter-by-spans.json")?;
+        run(config);
+
+        let rt = tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build().unwrap();
+        let s3_client = s3_util::new_client()?;
+
+        let local_output_file = "tests/work/output/filter-by-spans.json.gz";
+        rt.block_on(download_to_file(&s3_client, "ai2-llm",
+                                     "pretraining-data/tests/mixer/outputs/v1/documents/head/filter-by-spans-test-0000.json.gz",
+                                     Path::new(local_output_file)))?;
+
+        compare_contents("tests/data/expected/filter-by-spans.json.gz",
+                         local_output_file);
+        Ok(())
+    }
 }
\ No newline at end of file
diff --git a/pretrain_data/mixer/src/s3_util.rs b/pretrain_data/mixer/src/s3_util.rs
index 8553532d5..c354018f9 100644
--- a/pretrain_data/mixer/src/s3_util.rs
+++ b/pretrain_data/mixer/src/s3_util.rs
@@ -4,6 +4,7 @@ use std::path::Path;
 use aws_sdk_s3::primitives::ByteStream;
 use aws_sdk_s3::{Client as S3Client};
 use aws_sdk_s3::config::Region;
+use aws_sdk_s3::error::ProvideErrorMetadata;
 
 use tokio::fs::{File as TokioFile};
 
@@ -19,7 +20,9 @@
         .key(key)
         .send()
         .await
-        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
+        .map_err(|e|
+            io::Error::new(io::ErrorKind::Other, format!("Error downloading {}: {}", key, e.message().unwrap_or_default()))
+        )?;
 
     std::fs::create_dir_all(path.parent().unwrap())?;
     let mut file = TokioFile::create(path).await?;
@@ -42,7 +45,9 @@ pub async fn upload_file(
         .body(ByteStream::from_path(path).await?)
         .send()
         .await
-        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
+        .map_err(|e|
+            io::Error::new(io::ErrorKind::Other, format!("Error uploading {}: {}", key, e.message().unwrap_or_default()))
+        )?;
 
     Ok(())
 }
@@ -70,11 +75,16 @@ pub fn find_objects_matching_patterns(s3_client: &S3Client, patterns: &Vec<Strin
 
     let mut stream_inputs: Vec<String> = Vec::new();
     for pattern in patterns.iter() {
-        let index = pattern.chars().position(|c| c == '*').unwrap();
-        let prefix = pattern[..index].to_string();
-        let mut suffix: Option<String> = None;
-        if index < pattern.len() - 1 {
-            suffix = Some(pattern[index + 2..].to_string());
+        let start_size = stream_inputs.len();
+        let mut prefix = pattern.clone();
+        let mut suffix: Option<String> = Some("".to_owned());
+        let maybe_index = pattern.chars().position(|c| c == '*');
+        if let Some(index) = maybe_index {
+            prefix = pattern[..index].to_string();
+            suffix = None;
+            if index < pattern.len() - 1 {
+                suffix = Some(pattern[index + 2..].to_string());
+            }
         }
         let mut has_more = true;
         let mut token: Option<String> = None;
@@ -107,7 +117,7 @@ pub fn find_objects_matching_patterns(s3_client: &S3Client, patterns: &Vec<Strin