Added falcon dataset #186

Merged
merged 5 commits into from
May 29, 2023
Changes from all commits
45 changes: 15 additions & 30 deletions pretrain_data/falcon/download.py
@@ -1,4 +1,3 @@
-
 import datetime
 import json
 import multiprocessing
@@ -18,6 +17,7 @@
 NUM_BYTES = 2_766_953_721_769
 DOWNLOAD_SIZE = 466_888_198_663
 N_SHARDS = 5_534
+PARTITIONS = 500
 
 
 def convert_timestamp(d: datetime.datetime) -> str:
@@ -31,28 +31,28 @@ def process_single(
     source_path: str,
     destination_path: str,
     queue: "Queue[Union[None, Tuple[int, ...]]]",
-    hf_access_token: str
+    hf_access_token: str,
 ):
     disable_progress_bar()
 
-    dataset_name, shard_id_str = source_path.rsplit('/', 1)
-    dataset = load_dataset(dataset_name, split='train', streaming=True, use_auth_token=hf_access_token)
+    dataset_name, shard_id_str = source_path.rsplit("/", 1)
+    dataset = load_dataset(dataset_name, split="train", streaming=True, use_auth_token=hf_access_token)
     added = datetime.datetime.now()
 
     shard_id = int(shard_id_str)
-    num_shards = int(dataset.n_shards) # pyright: ignore
+    num_shards = PARTITIONS
     num_examples = int(NUM_EXAMPLES)
     shard_start = round(shard_id * num_examples / num_shards)
     shard_end = round((shard_id + 1) * num_examples / num_shards)
 
     if shard_start > 0:
-        dataset = dataset.skip(shard_start) # pyright: ignore
+        dataset = dataset.skip(shard_start)  # pyright: ignore
 
     doc_cnt = 0
 
     with ExitStack() as stack:
-        dst_file = stack.enter_context(open_file_for_write(destination_path, 'wb'))
-        dst_stream = stack.enter_context(compress_stream(dst_file, 'wt'))
+        dst_file = stack.enter_context(open_file_for_write(destination_path, "wb"))
+        dst_stream = stack.enter_context(compress_stream(dst_file, "wt"))
 
         for i, row in enumerate(dataset):
             if (i := i + shard_start) >= shard_end:
@@ -62,7 +62,7 @@ def process_single(
                 "text": row["content"],
                 "id": md5(row["url"].encode("utf-8")).hexdigest(),
                 "version": "v0",
-                "source": dataset_name.strip('/'),
+                "source": dataset_name.strip("/"),
                 "added": convert_timestamp(added),
                 "created": convert_timestamp(row["timestamp"]),
                 "metadata": {
@@ -71,7 +71,7 @@
                     "segment": row["segment"],
                     "image_urls": row["image_urls"],
                     "split": "train",
-                    "pos": i
+                    "pos": i,
                 },
             }
             dst_stream.write(json.dumps(data) + "\n")
@@ -108,36 +108,21 @@ def increment_progressbar(self, queue, /, files = 0, documents = 0): # we use
 
     def _get_all_paths(self) -> Tuple[List[MultiPath], List[MultiPath], List[MultiPath]]:
         """Get all paths to process using prefixes provided"""
-        hf_access_token = os.environ.get("HF_ACCESS_TOKEN")
-
-        dataset = load_dataset(
-            self.source_prefix.as_str, split='train', streaming=True, use_auth_token=hf_access_token
-        )
-
-        all_src = [
-            MultiPath.parse(f'{self.source_prefix.as_str}/{i}')
-            for i in range(dataset.n_shards) # pyright: ignore
-        ]
-        all_dst = [
-            MultiPath.parse(f'{self.destination_prefix}/{i}.jsonl.gz')
-            for i in range(dataset.n_shards) # pyright: ignore
-        ]
-        all_meta = [
-            MultiPath.parse(f'{self.metadata_prefix}/{i}.meta')
-            for i in range(dataset.n_shards) # pyright: ignore
-        ]
+        all_src = [MultiPath.parse(f"{self.source_prefix.as_str}/{i}") for i in range(PARTITIONS)]
+        all_dst = [MultiPath.parse(f"{self.destination_prefix}/{i}.jsonl.gz") for i in range(PARTITIONS)]
+        all_meta = [MultiPath.parse(f"{self.metadata_prefix}/{i}.meta") for i in range(PARTITIONS)]
 
         return all_src, all_dst, all_meta
 
 
 if __name__ == "__main__":
     ap = ArgumentParser()
     ap.add_argument("-s", "--source-prefix", type=str, default="tiiuae/falcon-refinedweb")
-    ap.add_argument('-p', '--parallel', type=int, default=1)
+    ap.add_argument("-p", "--parallel", type=int, default=1)
     opts = ap.parse_args()
 
     HF_ACCESS_TOKEN = os.environ.get("HF_ACCESS_TOKEN", None)
-    multiprocessing.set_start_method('spawn')
+    multiprocessing.set_start_method("spawn")
     dl = FalconDownloader(
         source_prefix=opts.source_prefix,
         destination_prefix="s3://ai2-llm/pretraining-data/sources/falcon-refinedweb/v0/documents",
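The switch above from dataset.n_shards to the fixed PARTITIONS constant means every run slices the RefinedWeb stream into the same 500 ranges. A minimal sketch of that boundary arithmetic, using a made-up NUM_EXAMPLES purely for illustration (the real constant is defined in an unchanged part of download.py and is not shown in this diff):

# Illustrative only: NUM_EXAMPLES is a placeholder, not the repository's value.
NUM_EXAMPLES = 1_000_000_000
PARTITIONS = 500

def shard_bounds(shard_id: int) -> tuple:
    # round() keeps the 500 partitions within one example of equal size
    start = round(shard_id * NUM_EXAMPLES / PARTITIONS)
    end = round((shard_id + 1) * NUM_EXAMPLES / PARTITIONS)
    return start, end

# Consecutive shards share a boundary, so every example is covered exactly once.
assert shard_bounds(0)[1] == shard_bounds(1)[0]
assert shard_bounds(PARTITIONS - 1)[1] == NUM_EXAMPLES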
24 changes: 24 additions & 0 deletions pretrain_data/mixer/config/ablations/dedupers/abl-cc-v2-dedup.json
@@ -0,0 +1,24 @@
{
  "documents": [
    "pretraining-data/sources/common-crawl/v2-small/documents/*"
  ],
  "work_dir": {
    "input": "/data2/abl-cc-v2-dedup/deduper/input",
    "output": "/data2/abl-cc-v2-dedup/deduper/output"
  },
  "dedupe": {
    "name": "decontamination",
    "paragraphs": {
      "attribute_name": "bff_duplicate_paragraph_spans"
    },
    "skip_empty": true
  },
  "bloom_filter": {
    "file": "/tmp/decontamination/deduper_decontamination_lucas_20230525.bin",
    "size_in_bytes": 8388608,
    "read_only": true,
    "estimated_doc_count": 3898706,
    "desired_false_positive_rate": 0.001
  },
  "processes": 120
}
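If the deduper's bloom_filter follows the standard Bloom-filter sizing rule (an assumption; only the config keys above are given), the configured 8 MiB leaves headroom for the stated document count and false-positive rate. A rough back-of-the-envelope check:

import math

# Values copied from the bloom_filter block above.
estimated_doc_count = 3_898_706
desired_false_positive_rate = 0.001
size_in_bytes = 8_388_608  # 8 MiB

# Standard Bloom-filter sizing: m = -n * ln(p) / (ln 2)^2 bits for n items at rate p.
required_bits = -estimated_doc_count * math.log(desired_false_positive_rate) / math.log(2) ** 2
print(f"needs ~{required_bits / 8 / 2**20:.1f} MiB, configured {size_in_bytes / 2**20:.1f} MiB")
# -> needs ~6.7 MiB, configured 8.0 MiB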
26 changes: 26 additions & 0 deletions pretrain_data/mixer/config/ablations/mixers/abl-cc-v1-dedup.json
@@ -0,0 +1,26 @@
{
  "streams": [
    {
      "name": "abl-cc-v1-dedup",
      "documents": [
        "pretraining-data/sources/common-crawl/v1-small/documents/*"
      ],
      "output": {
        "path": "pretraining-data/sources/common-crawl/abl-cc-v1-dedup/documents",
        "max_size_in_bytes": 4294967296
      },
      "attributes": ["decontamination"],
      "filter": {
        "include": [],
        "exclude": [
          "$@.attributes[?(@.bff_duplicate_paragraph_spans && @.bff_duplicate_paragraph_spans[0] && @.bff_duplicate_paragraph_spans[0][2] >= 1.0)]"
        ]
      }
    }
  ],
  "work_dir": {
    "input": "/data2/abl-cc-v1-dedup/mixer/input",
    "output": "/data2/abl-cc-v1-dedup/mixer/output"
  },
  "processes": 120
}
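Both mixer configs (this one and the v2 variant below) keep a document only when the exclude expression does not match: the JSONPath-style predicate fires when the decontamination attributes contain a bff_duplicate_paragraph_spans entry whose first span scores at least 1.0. A plain-Python sketch of that predicate, assuming each span is a [start, end, score] triple; this mimics the rule but is not the mixer's actual filter engine:

def is_excluded(attributes: dict) -> bool:
    # Mirrors: @.bff_duplicate_paragraph_spans && [0] && [0][2] >= 1.0
    spans = attributes.get("bff_duplicate_paragraph_spans")
    return bool(spans) and bool(spans[0]) and spans[0][2] >= 1.0

print(is_excluded({"bff_duplicate_paragraph_spans": [[0, 812, 1.0]]}))  # True -> document dropped
print(is_excluded({"bff_duplicate_paragraph_spans": []}))               # False -> document kept
print(is_excluded({}))                                                  # False -> document kept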
26 changes: 26 additions & 0 deletions pretrain_data/mixer/config/ablations/mixers/abl-cc-v2-dedup.json
@@ -0,0 +1,26 @@
{
  "streams": [
    {
      "name": "abl-cc-v2-dedup",
      "documents": [
        "pretraining-data/sources/common-crawl/v2-small/documents/*"
      ],
      "output": {
        "path": "pretraining-data/sources/common-crawl/abl-cc-v2-dedup/documents",
        "max_size_in_bytes": 4294967296
      },
      "attributes": ["decontamination"],
      "filter": {
        "include": [],
        "exclude": [
          "$@.attributes[?(@.bff_duplicate_paragraph_spans && @.bff_duplicate_paragraph_spans[0] && @.bff_duplicate_paragraph_spans[0][2] >= 1.0)]"
        ]
      }
    }
  ],
  "work_dir": {
    "input": "/data2/abl-cc-v2-dedup/mixer/input",
    "output": "/data2/abl-cc-v2-dedup/mixer/output"
  },
  "processes": 120
}
66 changes: 66 additions & 0 deletions scripts/split_gz.py
@@ -0,0 +1,66 @@
"""
Split gzipped files into smaller gzipped files.

Author: @soldni
"""

import concurrent.futures
import gzip
import os
from contextlib import ExitStack

import click

MAX_SIZE_4_GB = 4 * 1024 * 1024 * 1024


@click.command()
@click.option("--input_dir", required=True, help="Path to input directory containing gzip files")
@click.option("--input_ext", default=".gz", help="Extension of the input files")
@click.option("--output_dir", required=True, help="Path to output directory for the split files")
@click.option("--output_ext", default=".gz", help="Extension of the output files")
@click.option("--size_limit", default=MAX_SIZE_4_GB, help="Size limit for each split file in bytes")
def main(input_dir: str, input_ext: str, output_dir: str, output_ext: str, size_limit: int):
os.makedirs(output_dir, exist_ok=True)

def split_gzip_file(input_file, output_base, size_limit=size_limit, output_ext=output_ext):
print(f"Splitting {input_file} into {output_base} with size limit {size_limit:,}")
with ExitStack() as stack, gzip.open(input_file, "rt") as f:
count = 0
path = f"{output_base}_{count:04d}{output_ext}"
output = stack.enter_context(gzip.open(path, "wt"))
current_size = 0
for line in f:
line_size = len(line)
if current_size + line_size > size_limit:
stack.pop_all().close()
count += 1
print(f"Wrote {path}")
path = f"{output_base}_{count:04d}{output_ext}"
output = stack.enter_context(gzip.open(path, "wt"))
current_size = 0
output.write(str(line))
current_size += line_size
print(f"Wrote {path}")
stack.pop_all().close()
print(f"Finished splitting {input_file} into {count + 1:,} files")

def process_file(file_name, input_ext=input_ext, input_dir=input_dir, output_dir=output_dir):
input_file = os.path.join(input_dir, file_name)
if file_name.endswith(input_ext) and os.path.isfile(input_file):
            base_name = file_name[: -len(input_ext)]  # strip the extension as a suffix; rstrip() would drop characters, not the suffix
            output_base = os.path.join(output_dir, base_name)
            split_gzip_file(input_file, output_base)

    files_to_process = [
        file_name
        for file_name in os.listdir(input_dir)
        if file_name.endswith(input_ext) and os.path.isfile(os.path.join(input_dir, file_name))
    ]

    with concurrent.futures.ThreadPoolExecutor() as executor:
        executor.map(process_file, files_to_process)


if __name__ == "__main__":
    main()
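Note that size_limit is compared against the decompressed text (the length of each line read in "rt" mode), so the gzipped output files will usually be much smaller than the limit on disk. Assuming the options are used exactly as declared above, an invocation might look like the following, with placeholder paths:

python scripts/split_gz.py --input_dir /path/to/gz_inputs --output_dir /path/to/gz_splits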