Merge pull request #167 from allenai/v1-small-config
v1-small config
rodneykinney committed May 23, 2023
2 parents 391091c + cd28636 commit 9da0e4b
Showing 10 changed files with 101 additions and 10 deletions.
2 changes: 2 additions & 0 deletions pretrain_data/common_crawl/NOTES.md
@@ -12,6 +12,8 @@ Post-process of v0. Drop non-English documents. Deduplicate whole documents by U

 ~4.8T tokens. High/Med/Low quality split: 20%/25%/55%
 
+**v1-small** is an 8.5% sample of `v1`, about 300B tokens.
+
 ### v2
 
 Post-process of v1. Remove duplicate paragraphs across the entire corpus
1 change: 1 addition & 0 deletions pretrain_data/mixer/Makefile
@@ -12,6 +12,7 @@ setup-test-data:
 	aws s3 cp tests/data/documents.json.gz s3://ai2-llm/pretraining-data/tests/mixer/inputs/v0/documents/head/0000.json.gz
 	aws s3 cp tests/data/pii-attributes.json.gz s3://ai2-llm/pretraining-data/tests/mixer/inputs/v0/attributes/pii/head/0000.json.gz
 	aws s3 cp tests/data/toxicity-attributes.json.gz s3://ai2-llm/pretraining-data/tests/mixer/inputs/v0/attributes/toxicity/head/0000.json.gz
+	aws s3 cp tests/data/sample-attributes.json.gz s3://ai2-llm/pretraining-data/tests/mixer/inputs/v0/attributes/sample/head/0000.json.gz
 
 debug:
 	cargo build
26 changes: 26 additions & 0 deletions pretrain_data/mixer/config/v1-small.json
@@ -0,0 +1,26 @@
{
  "streams": [
    {
      "name": "v1_small",
      "documents": [
        "pretraining-data/sources/common-crawl/v1/documents/cc_en_head/*",
        "pretraining-data/sources/common-crawl/v1/documents/cc_en_middle/*",
        "pretraining-data/sources/common-crawl/v1/documents/cc_en_tail/*"
      ],
      "output": {
        "path": "pretraining-data/sources/common-crawl/v1-small/documents",
        "max_size_in_bytes": 42949672960
      },
      "attributes": ["sample"],
      "filter": {
        "include": ["$.attributes[?(@.sample__random_number_v1__random[0][2] < 0.085)]"],
        "exclude": []
      }
    }
  ],
  "work_dir": {
    "input": "/data1/work/input",
    "output": "/data2/work/output"
  },
  "processes": 128
}
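
The `include` rule in this config is a JSONPath predicate over each document's precomputed attributes: a document is kept only when the score stored in its `sample` attribute falls below 0.085, which yields the ~8.5% sample described in NOTES.md. Here is a minimal sketch of that predicate in Rust; the span layout `[start, end, score]` is an assumption about the `sample` attribute files, not something specified in this PR:

```rust
// Sketch only: emulates the include rule above, outside the mixer.
use serde_json::json;

fn keep_document(attrs: &serde_json::Value, threshold: f64) -> bool {
    // JSONPath `@.sample__random_number_v1__random[0][2]` reads the third
    // element (the score) of the first span; keep the document when the
    // score is below the sampling threshold.
    attrs["attributes"]["sample__random_number_v1__random"][0][2]
        .as_f64()
        .map(|score| score < threshold)
        .unwrap_or(false)
}

fn main() {
    // 0.042 < 0.085, so this document lands in the ~8.5% sample.
    let kept = json!({
        "attributes": { "sample__random_number_v1__random": [[0, 100, 0.042]] }
    });
    assert!(keep_document(&kept, 0.085));

    // A score at or above the threshold is dropped.
    let dropped = json!({
        "attributes": { "sample__random_number_v1__random": [[0, 100, 0.9]] }
    });
    assert!(!keep_document(&dropped, 0.085));
}
```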
2 changes: 1 addition & 1 deletion pretrain_data/mixer/src/bin/deduper.rs
@@ -22,7 +22,7 @@ use deduper_config::*;

 fn main() {
     if env::var("RUST_LOG").is_err() {
-        env::set_var("RUST_LOG", "info")
+        env::set_var("RUST_LOG", "ai2_pretraining=info,deduper=info");
     }
     env_logger::init();

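
This change (mirrored in the mixer below) narrows the default from a global `info` to per-target directives: `env_logger` parses `RUST_LOG` as a comma-separated list of `target=level` pairs, so only the `ai2_pretraining` crate and the binary itself log at info level while dependencies stay quiet. A small hypothetical illustration, not part of this PR:

```rust
// Assumes this file builds as a binary crate named `deduper`,
// so its log target matches the second directive.
fn main() {
    if std::env::var("RUST_LOG").is_err() {
        // Enable info-level logging only for these two targets.
        std::env::set_var("RUST_LOG", "ai2_pretraining=info,deduper=info");
    }
    env_logger::init();
    log::info!("emitted: target `deduper` is enabled at info level");
    log::debug!("suppressed: debug is below the configured info level");
}
```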
27 changes: 26 additions & 1 deletion pretrain_data/mixer/src/bin/mixer.rs
@@ -13,7 +13,7 @@ use mixer_config::*;

 fn main() {
     if env::var("RUST_LOG").is_err() {
-        env::set_var("RUST_LOG", "info")
+        env::set_var("RUST_LOG", "ai2_pretraining=info,mixer=info");
     }
     env_logger::init();
     let args: Vec<String> = env::args().collect();
@@ -169,4 +169,29 @@ mod test {
             local_output_file);
         Ok(())
     }
+
+    #[test]
+    fn test_filter_by_span() -> Result<(), io::Error> {
+        if env::var("RUST_LOG").is_err() {
+            env::set_var("RUST_LOG", "info")
+        }
+        env_logger::init();
+
+        let config = MixerConfig::read_from_file("tests/config/filter-by-spans.json")?;
+        run(config);
+
+        let rt = tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build().unwrap();
+        let s3_client = s3_util::new_client()?;
+
+        let local_output_file = "tests/work/output/filter-by-spans.json.gz";
+        rt.block_on(download_to_file(&s3_client, "ai2-llm",
+            "pretraining-data/tests/mixer/outputs/v1/documents/head/filter-by-spans-test-0000.json.gz",
+            Path::new(local_output_file)))?;
+
+        compare_contents("tests/data/expected/filter-by-spans.json.gz",
+            local_output_file);
+        Ok(())
+    }
 }
26 changes: 18 additions & 8 deletions pretrain_data/mixer/src/s3_util.rs
@@ -4,6 +4,7 @@ use std::path::Path;
 use aws_sdk_s3::primitives::ByteStream;
 use aws_sdk_s3::{Client as S3Client};
 use aws_sdk_s3::config::Region;
+use aws_sdk_s3::error::ProvideErrorMetadata;
 use tokio::fs::{File as TokioFile};


@@ -19,7 +20,9 @@ pub async fn download_to_file(
         .key(key)
         .send()
         .await
-        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
+        .map_err(|e|
+            io::Error::new(io::ErrorKind::Other, format!("Error downloading {}: {}", key, e.message().unwrap_or_default()))
+        )?;
 
     std::fs::create_dir_all(path.parent().unwrap())?;
     let mut file = TokioFile::create(path).await?;
@@ -42,7 +45,9 @@ pub async fn upload_file(
         .body(ByteStream::from_path(path).await?)
         .send()
         .await
-        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
+        .map_err(|e|
+            io::Error::new(io::ErrorKind::Other, format!("Error uploading {}: {}", key, e.message().unwrap_or_default()))
+        )?;
 
     Ok(())
 }
@@ -70,11 +75,16 @@ pub fn find_objects_matching_patterns(s3_client: &S3Client, patterns: &Vec<Strin

     let mut stream_inputs: Vec<String> = Vec::new();
     for pattern in patterns.iter() {
-        let index = pattern.chars().position(|c| c == '*').unwrap();
-        let prefix = pattern[..index].to_string();
-        let mut suffix: Option<String> = None;
-        if index < pattern.len() - 1 {
-            suffix = Some(pattern[index + 2..].to_string());
+        let start_size = stream_inputs.len();
+        let mut prefix = pattern.clone();
+        let mut suffix: Option<String> = Some("".to_owned());
+        let maybe_index = pattern.chars().position(|c| c == '*');
+        if let Some(index) = maybe_index {
+            prefix = pattern[..index].to_string();
+            suffix = None;
+            if index < pattern.len() - 1 {
+                suffix = Some(pattern[index + 2..].to_string());
+            }
         }
         let mut has_more = true;
         let mut token: Option<String> = None;
@@ -107,7 +117,7 @@
             token = resp.next_continuation_token().map(String::from);
             has_more = token.is_some();
         }
-        log::info!("Found {} objects for pattern \"{}\"", stream_inputs.len(), pattern);
+        log::info!("Found {} objects for pattern \"{}\"", stream_inputs.len() - start_size, pattern);
     }
     stream_inputs.sort();
     Ok(stream_inputs)
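
The rewritten hunk lets `find_objects_matching_patterns` accept a pattern with no `*` instead of panicking on `unwrap()`: the whole pattern becomes the S3 listing prefix with an empty suffix (presumably so every key beneath the prefix matches), and `start_size` makes the per-pattern log line count only the objects that pattern contributed. A standalone sketch of just the prefix/suffix split; the helper name and the suffix-matching comments are my assumptions, not from the PR:

```rust
// Sketch of the prefix/suffix split performed in the hunk above.
fn split_pattern(pattern: &str) -> (String, Option<String>) {
    match pattern.chars().position(|c| c == '*') {
        // No wildcard: the whole pattern is the listing prefix, and the
        // empty suffix would match every key under it.
        None => (pattern.to_string(), Some(String::new())),
        Some(index) => {
            let prefix = pattern[..index].to_string();
            // A trailing "*" matches anything, so no suffix is needed;
            // "prefix/*/rest" skips the "*/" and keeps "rest" as the suffix.
            let suffix = if index < pattern.len() - 1 {
                Some(pattern[index + 2..].to_string())
            } else {
                None
            };
            (prefix, suffix)
        }
    }
}

fn main() {
    assert_eq!(
        split_pattern("a/b/*/0000.json.gz"),
        ("a/b/".to_string(), Some("0000.json.gz".to_string()))
    );
    assert_eq!(split_pattern("a/b/*"), ("a/b/".to_string(), None));
    assert_eq!(split_pattern("a/b/c"), ("a/b/c".to_string(), Some(String::new())));
}
```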
1 change: 1 addition & 0 deletions pretrain_data/mixer/src/shard.rs
@@ -75,6 +75,7 @@ impl Shard {
         shard_inputs.push(inputs_with_sizes[0].0.clone());
         for (input, size) in inputs_with_sizes[1..].iter() {
             if *size == 0 {
+                log::warn!("Skipping input {}. Could not determine size", input.doc_path);
                 continue;
             }
             shard_size += size;
26 changes: 26 additions & 0 deletions pretrain_data/mixer/tests/config/filter-by-spans.json
@@ -0,0 +1,26 @@
{
  "streams": [
    {
      "name": "filter-by-spans-test",
      "documents": [
        "pretraining-data/tests/mixer/inputs/v0/documents/*/0000.json.gz"
      ],
      "output": {
        "path": "pretraining-data/tests/mixer/outputs/v1/documents/head",
        "max_size_in_bytes": 100000
      },
      "attributes": [
        "sample"
      ],
      "filter": {
        "include": ["$.attributes[?(@.sample__random_number_v1__random[0][2] < 0.5)]"],
        "exclude": []
      }
    }
  ],
  "work_dir": {
    "input": "tests/work/filter-by-spans/input",
    "output": "tests/work/filter-by-spans/output"
  },
  "processes": 1
}
Binary file not shown.
Binary file not shown.
