
Commit

Merge pull request #186 from allenai/soldni/tok_improve
Added falcon dataset
soldni committed May 29, 2023
2 parents 348ed33 + 53e243b commit 3478cb0
Showing 5 changed files with 157 additions and 30 deletions.
45 changes: 15 additions & 30 deletions pretrain_data/falcon/download.py
@@ -1,4 +1,3 @@
-
 import datetime
 import json
 import multiprocessing
@@ -18,6 +17,7 @@
 NUM_BYTES = 2_766_953_721_769
 DOWNLOAD_SIZE = 466_888_198_663
 N_SHARDS = 5_534
+PARTITIONS = 500


 def convert_timestamp(d: datetime.datetime) -> str:
@@ -31,28 +31,28 @@ def process_single(
     source_path: str,
     destination_path: str,
     queue: "Queue[Union[None, Tuple[int, ...]]]",
-    hf_access_token: str
+    hf_access_token: str,
 ):
     disable_progress_bar()

-    dataset_name, shard_id_str = source_path.rsplit('/', 1)
-    dataset = load_dataset(dataset_name, split='train', streaming=True, use_auth_token=hf_access_token)
+    dataset_name, shard_id_str = source_path.rsplit("/", 1)
+    dataset = load_dataset(dataset_name, split="train", streaming=True, use_auth_token=hf_access_token)
     added = datetime.datetime.now()

     shard_id = int(shard_id_str)
-    num_shards = int(dataset.n_shards) # pyright: ignore
+    num_shards = PARTITIONS
     num_examples = int(NUM_EXAMPLES)
     shard_start = round(shard_id * num_examples / num_shards)
     shard_end = round((shard_id + 1) * num_examples / num_shards)

     if shard_start > 0:
-        dataset = dataset.skip(shard_start) # pyright: ignore
+        dataset = dataset.skip(shard_start)  # pyright: ignore

     doc_cnt = 0

     with ExitStack() as stack:
-        dst_file = stack.enter_context(open_file_for_write(destination_path, 'wb'))
-        dst_stream = stack.enter_context(compress_stream(dst_file, 'wt'))
+        dst_file = stack.enter_context(open_file_for_write(destination_path, "wb"))
+        dst_stream = stack.enter_context(compress_stream(dst_file, "wt"))

         for i, row in enumerate(dataset):
             if (i := i + shard_start) >= shard_end:
@@ -62,7 +62,7 @@ def process_single(
                 "text": row["content"],
                 "id": md5(row["url"].encode("utf-8")).hexdigest(),
                 "version": "v0",
-                "source": dataset_name.strip('/'),
+                "source": dataset_name.strip("/"),
                 "added": convert_timestamp(added),
                 "created": convert_timestamp(row["timestamp"]),
                 "metadata": {
@@ -71,7 +71,7 @@
                     "segment": row["segment"],
                     "image_urls": row["image_urls"],
                     "split": "train",
-                    "pos": i
+                    "pos": i,
                 },
             }
             dst_stream.write(json.dumps(data) + "\n")
@@ -108,36 +108,21 @@ def increment_progressbar(self, queue, /, files = 0, documents = 0): # we use

     def _get_all_paths(self) -> Tuple[List[MultiPath], List[MultiPath], List[MultiPath]]:
         """Get all paths to process using prefixes provided"""
-        hf_access_token = os.environ.get("HF_ACCESS_TOKEN")
-
-        dataset = load_dataset(
-            self.source_prefix.as_str, split='train', streaming=True, use_auth_token=hf_access_token
-        )
-
-        all_src = [
-            MultiPath.parse(f'{self.source_prefix.as_str}/{i}')
-            for i in range(dataset.n_shards) # pyright: ignore
-        ]
-        all_dst = [
-            MultiPath.parse(f'{self.destination_prefix}/{i}.jsonl.gz')
-            for i in range(dataset.n_shards) # pyright: ignore
-        ]
-        all_meta = [
-            MultiPath.parse(f'{self.metadata_prefix}/{i}.meta')
-            for i in range(dataset.n_shards) # pyright: ignore
-        ]
+        all_src = [MultiPath.parse(f"{self.source_prefix.as_str}/{i}") for i in range(PARTITIONS)]
+        all_dst = [MultiPath.parse(f"{self.destination_prefix}/{i}.jsonl.gz") for i in range(PARTITIONS)]
+        all_meta = [MultiPath.parse(f"{self.metadata_prefix}/{i}.meta") for i in range(PARTITIONS)]

         return all_src, all_dst, all_meta


 if __name__ == "__main__":
     ap = ArgumentParser()
     ap.add_argument("-s", "--source-prefix", type=str, default="tiiuae/falcon-refinedweb")
-    ap.add_argument('-p', '--parallel', type=int, default=1)
+    ap.add_argument("-p", "--parallel", type=int, default=1)
     opts = ap.parse_args()

     HF_ACCESS_TOKEN = os.environ.get("HF_ACCESS_TOKEN", None)
-    multiprocessing.set_start_method('spawn')
+    multiprocessing.set_start_method("spawn")
     dl = FalconDownloader(
         source_prefix=opts.source_prefix,
         destination_prefix="s3://ai2-llm/pretraining-data/sources/falcon-refinedweb/v0/documents",
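Note on the change above: swapping dataset.n_shards for the fixed PARTITIONS = 500 makes each worker's slice a pure function of its partition index. A minimal sketch of the boundary arithmetic, using a stand-in value for NUM_EXAMPLES (the real constant is defined next to NUM_BYTES, above the visible hunk):

# Sketch of the partitioning arithmetic in process_single (illustrative total only).
PARTITIONS = 500
NUM_EXAMPLES = 968_000_015  # stand-in value; the actual constant lives in download.py

def shard_bounds(shard_id: int, n: int = NUM_EXAMPLES, parts: int = PARTITIONS) -> tuple:
    """Return the [start, end) range of example indices handled by one partition."""
    start = round(shard_id * n / parts)
    end = round((shard_id + 1) * n / parts)
    return start, end

# Consecutive partitions tile the dataset exactly: the end of partition k equals the
# start of partition k + 1, and the last partition ends at the total example count.
assert shard_bounds(PARTITIONS - 1)[1] == NUM_EXAMPLES
print(shard_bounds(0), shard_bounds(499))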
24 changes: 24 additions & 0 deletions pretrain_data/mixer/config/ablations/dedupers/abl-cc-v2-dedup.json
@@ -0,0 +1,24 @@
{
"documents": [
"pretraining-data/sources/common-crawl/v2-small/documents/*"
],
"work_dir": {
"input": "/data2/abl-cc-v2-dedup/deduper/input",
"output": "/data2/abl-cc-v2-dedup/deduper/output"
},
"dedupe": {
"name": "decontamination",
"paragraphs": {
"attribute_name": "bff_duplicate_paragraph_spans"
},
"skip_empty": true
},
"bloom_filter": {
"file": "/tmp/decontamination/deduper_decontamination_lucas_20230525.bin",
"size_in_bytes": 8388608,
"read_only": true,
"estimated_doc_count": 3898706,
"desired_false_positive_rate": 0.001
},
"processes": 120
}
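The bloom_filter block above allocates an 8,388,608-byte read-only filter for roughly 3.9 million decontamination documents at a 0.1% target false-positive rate. As a rough sanity check (a sketch, not code from this repo), the standard Bloom-filter sizing formula shows that budget is sufficient:

import math

# Classic sizing: m = -n * ln(p) / (ln 2)^2 bits, k = (m / n) * ln 2 hash functions.
n = 3_898_706  # estimated_doc_count from the config
p = 0.001      # desired_false_positive_rate
m_bits = math.ceil(-n * math.log(p) / math.log(2) ** 2)
k = round(m_bits / n * math.log(2))

print(f"required: {m_bits / 8 / 2**20:.1f} MiB with k = {k} hashes")  # about 6.7 MiB, k = 10
print(f"configured: {8_388_608 / 2**20:.0f} MiB")                     # 8 MiB, so size_in_bytes has headroom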
26 changes: 26 additions & 0 deletions pretrain_data/mixer/config/ablations/mixers/abl-cc-v1-dedup.json
@@ -0,0 +1,26 @@
{
"streams": [
{
"name": "abl-cc-v1-dedup",
"documents": [
"pretraining-data/sources/common-crawl/v1-small/documents/*"
],
"output": {
"path": "pretraining-data/sources/common-crawl/abl-cc-v1-dedup/documents",
"max_size_in_bytes": 4294967296
},
"attributes": ["decontamination"],
"filter": {
"include": [],
"exclude": [
"$@.attributes[?(@.bff_duplicate_paragraph_spans && @.bff_duplicate_paragraph_spans[0] && @.bff_duplicate_paragraph_spans[0][2] >= 1.0)]"
]
}
}
],
"work_dir": {
"input": "/data2/abl-cc-v1-dedup/mixer/input",
"output": "/data2/abl-cc-v1-dedup/mixer/output"
},
"processes": 120
}
26 changes: 26 additions & 0 deletions pretrain_data/mixer/config/ablations/mixers/abl-cc-v2-dedup.json
@@ -0,0 +1,26 @@
{
"streams": [
{
"name": "abl-cc-v2-dedup",
"documents": [
"pretraining-data/sources/common-crawl/v2-small/documents/*"
],
"output": {
"path": "pretraining-data/sources/common-crawl/abl-cc-v2-dedup/documents",
"max_size_in_bytes": 4294967296
},
"attributes": ["decontamination"],
"filter": {
"include": [],
"exclude": [
"$@.attributes[?(@.bff_duplicate_paragraph_spans && @.bff_duplicate_paragraph_spans[0] && @.bff_duplicate_paragraph_spans[0][2] >= 1.0)]"
]
}
}
],
"work_dir": {
"input": "/data2/abl-cc-v2-dedup/mixer/input",
"output": "/data2/abl-cc-v2-dedup/mixer/output"
},
"processes": 120
}
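Both mixer streams share the same exclude rule: a document is dropped when its decontamination attributes carry a bff_duplicate_paragraph_spans entry whose first span has a third field of at least 1.0. A rough plain-Python paraphrase of that JSONPath-style predicate, assuming the spans are laid out as [start, end, score] (the layout is not spelled out in this diff):

def is_excluded(attributes: dict) -> bool:
    """Approximate rendering of the mixer's exclude expression (assumed [start, end, score] spans)."""
    spans = attributes.get("bff_duplicate_paragraph_spans")
    return bool(spans) and bool(spans[0]) and spans[0][2] >= 1.0

print(is_excluded({"bff_duplicate_paragraph_spans": [[0, 240, 1.0]]}))  # True: kept out of the mix
print(is_excluded({"bff_duplicate_paragraph_spans": []}))               # False: document passes through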
66 changes: 66 additions & 0 deletions scripts/split_gz.py
@@ -0,0 +1,66 @@
"""
Split gzipped files into smaller gzipped files.
Author: @soldni
"""

import concurrent.futures
import gzip
import os
from contextlib import ExitStack

import click

MAX_SIZE_4_GB = 4 * 1024 * 1024 * 1024


@click.command()
@click.option("--input_dir", required=True, help="Path to input directory containing gzip files")
@click.option("--input_ext", default=".gz", help="Extension of the input files")
@click.option("--output_dir", required=True, help="Path to output directory for the split files")
@click.option("--output_ext", default=".gz", help="Extension of the output files")
@click.option("--size_limit", default=MAX_SIZE_4_GB, help="Size limit for each split file in bytes")
def main(input_dir: str, input_ext: str, output_dir: str, output_ext: str, size_limit: int):
os.makedirs(output_dir, exist_ok=True)

def split_gzip_file(input_file, output_base, size_limit=size_limit, output_ext=output_ext):
print(f"Splitting {input_file} into {output_base} with size limit {size_limit:,}")
with ExitStack() as stack, gzip.open(input_file, "rt") as f:
count = 0
path = f"{output_base}_{count:04d}{output_ext}"
output = stack.enter_context(gzip.open(path, "wt"))
current_size = 0
for line in f:
line_size = len(line)
if current_size + line_size > size_limit:
stack.pop_all().close()
count += 1
print(f"Wrote {path}")
path = f"{output_base}_{count:04d}{output_ext}"
output = stack.enter_context(gzip.open(path, "wt"))
current_size = 0
output.write(str(line))
current_size += line_size
print(f"Wrote {path}")
stack.pop_all().close()
print(f"Finished splitting {input_file} into {count + 1:,} files")

def process_file(file_name, input_ext=input_ext, input_dir=input_dir, output_dir=output_dir):
input_file = os.path.join(input_dir, file_name)
if file_name.endswith(input_ext) and os.path.isfile(input_file):
base_name = file_name.rstrip(input_ext)
output_base = os.path.join(output_dir, base_name)
split_gzip_file(input_file, output_base)

files_to_process = [
file_name
for file_name in os.listdir(input_dir)
if file_name.endswith(input_ext) and os.path.isfile(os.path.join(input_dir, file_name))
]

with concurrent.futures.ThreadPoolExecutor() as executor:
executor.map(process_file, files_to_process)


if __name__ == "__main__":
main()
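
The splitter is driven entirely by the click options above, e.g. python scripts/split_gz.py --input_dir /data/in --output_dir /data/out --size_limit 1073741824. One caveat: file_name.rstrip(input_ext) strips a set of characters rather than a literal suffix, so a name such as log.gz is trimmed to lo rather than log. A suffix-safe helper (a sketch, not part of this commit) would look like:

def strip_suffix(name: str, suffix: str) -> str:
    """Remove a literal trailing suffix; unlike str.rstrip, never eats extra characters."""
    return name[: -len(suffix)] if suffix and name.endswith(suffix) else name

print("log.gz".rstrip(".gz"))         # 'lo'  -- rstrip treats ".gz" as a character set
print(strip_suffix("log.gz", ".gz"))  # 'log'
# Python 3.9+ also ships str.removesuffix, which behaves the same way.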
