Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Local cache #197

Merged
merged 16 commits into from
Jun 5, 2023
37 changes: 35 additions & 2 deletions pretrain_data/filters/src/ai2_llm_filters/core_tools/runtime.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import argparse
import gzip
import logging
import multiprocessing
import os
import tempfile
from contextlib import ExitStack
from queue import Queue
from typing import Dict

import msgspec
from cached_path import cached_path
from smashed.utils.io_utils import (
compress_stream,
decompress_stream,
Expand Down Expand Up @@ -70,6 +73,9 @@ def process_single(
# skip on failure
skip_on_failure = kwargs.get("skip_on_failure", False)

# local read cache
local_read_cache = kwargs.get("local_read_cache", None)

# interval at which to update the progress bar; will double if it gets
# too full
update_interval = 1
Expand All @@ -82,13 +88,22 @@ def process_single(
encoder = msgspec.json.Encoder()
decoder = msgspec.json.Decoder(InputSpec)

# this will be used to cache the file locally if needed
caching_path = source_path

with ExitStack() as stack:
try:
# open each file for reading and writing. We use open_file_for_read to handle s3 paths and
# download the file locally if needed, while gzip.open is used to
# read and write gzipped files.
in_file = stack.enter_context(stream_file_for_read(source_path, "rb"))
in_stream = stack.enter_context(decompress_stream(in_file, "rt"))

if local_read_cache is not None:
caching_path = cached_path(source_path, cache_dir=local_read_cache)
in_stream = stack.enter_context(gzip.open(caching_path, mode="rt"))
else:
input_file = stack.enter_context(stream_file_for_read(source_path, mode="rb"))
in_stream = stack.enter_context(decompress_stream(input_file, mode="rt"))

out_file = stack.enter_context(open_file_for_write(destination_path, "wb"))
out_stream = stack.enter_context(compress_stream(out_file, "wt"))

Expand Down Expand Up @@ -134,6 +149,9 @@ def process_single(
raise Ai2LlmRetryableFailure(msg) from e
logger.warning("\nFatal " + msg)
raise Ai2LlmFilterError(msg) from e
finally:
if caching_path != source_path and os.path.exists(caching_path):
os.remove(caching_path)

# increment the files progress bar
cls.increment_progressbar(queue, files=1, documents=docs_cnt)
Expand Down Expand Up @@ -185,6 +203,12 @@ def main(cls):
type=str,
help="If provided, keeps track of which files have already been processed and skips them. ",
)
ap.add_argument(
"--local-read-cache",
default=None,
type=str,
help="If provided, will cache the files locally before processing them.",
)
ap.add_argument(
"--manually-included-paths",
default=None,
Expand All @@ -194,6 +218,9 @@ def main(cls):
ap.add_argument(
"--manually-excluded-paths", default=None, nargs="+", help="If provided, these paths will be skipped."
)
ap.add_argument(
"--safe-mode", action="store_true", help="Run in safe mode; will download locally before processing."
)
opts = ap.parse_args()

if opts.list_taggers:
Expand All @@ -210,6 +237,9 @@ def main(cls):
source_prefix = f"{cls.BASE_S3_PREFIX}/{opts.dataset}/documents"
destination_prefix = f"{cls.BASE_S3_PREFIX}/{opts.dataset}/attributes/{opts.experiment_name}"

# use a local read cache if we are in safe mode or if a local read cache is provided
local_read_cache = opts.local_read_cache or (tempfile.gettempdir() if opts.safe_mode else None)

with tempfile.TemporaryDirectory() as tempdir:
metadata_workdir = opts.reuse_existing or tempdir
ignore_existing = opts.reuse_existing is None
Expand All @@ -225,6 +255,8 @@ def main(cls):
f"skip on fail: {opts.skip_on_failure}\n"
f"reuse prev: {not ignore_existing}\n"
f"workdir: {metadata_workdir}\n"
f"safe mode: {opts.safe_mode}\n"
f"local cache: {local_read_cache}\n"
"---------------------------\n"
)
print(msg)
Expand All @@ -243,5 +275,6 @@ def main(cls):
taggers_names=opts.taggers,
experiment_name=opts.experiment_name,
skip_on_failure=opts.skip_on_failure,
local_read_cache=local_read_cache,
retry_on_read_error=opts.retry_on_read_error,
)
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,4 @@ class FastTextJigsawNsfwDocumentTagger(FastTextJigsawHatespeechDocumentTagger):

@TaggerRegistry.add("jigsaw_nsfw_sencence_v2")
class FastTextJigsawNsfwSentenceTagger(FastTextJigsawHatespeechSentenceTagger):
...
MODEL_PATH = "https://ai2-s2-research-public.s3.us-west-2.amazonaws.com/aakankshan/olmo-data-filters/jigsaw_fasttext_bigrams_nsfw_final.bin" # noqa: E501
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"documents": [
"pretraining-data/sources/common-crawl/v1-small-head/documents/v1_small-head-0101.json.gz"
],
"work_dir": {
"input": "/data2/v1-small-head/deduper/input",
"output": "/data2/v1-small-head/deduper/output"
},
"dedupe": {
"name": "decontamination",
"paragraphs": {
"attribute_name": "bff_duplicate_paragraph_spans"
},
"skip_empty": true
},
"bloom_filter": {
"file": "/tmp/decontamination/deduper_decontamination_lucas_20230525.bin",
"size_in_bytes": 8388608,
"read_only": true,
"estimated_doc_count": 3898706,
"desired_false_positive_rate": 0.001
},
"processes": 120
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"documents": [
"pretraining-data/sources/common-crawl/v1-small-head-middle/documents/v1_small-head-middle-0126.json.gz"
],
"work_dir": {
"input": "/tmp/v1-small-head-middle/deduper/input",
"output": "/tmp/v1-small-head-middle/deduper/output"
},
"dedupe": {
"name": "decontamination",
"paragraphs": {
"attribute_name": "bff_duplicate_paragraph_spans"
},
"skip_empty": true
},
"bloom_filter": {
"file": "/tmp/decontamination/deduper_decontamination_lucas_20230525.bin",
"size_in_bytes": 8388608,
"read_only": true,
"estimated_doc_count": 3898706,
"desired_false_positive_rate": 0.001
},
"processes": 120
}
24 changes: 24 additions & 0 deletions pretrain_data/mixer/config/ablations/dedupers/falcon-dedup.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"documents": [
"pretraining-data/sources/falcon-refinedweb/v0/documents/*.json.gz"
],
"work_dir": {
"input": "/data2/falcon/deduper/input",
"output": "/data2/falcon/deduper/output"
},
"dedupe": {
"name": "decontamination",
"paragraphs": {
"attribute_name": "bff_duplicate_paragraph_spans"
},
"skip_empty": true
},
"bloom_filter": {
"file": "/tmp/decontamination/deduper_decontamination_lucas_20230525.bin",
"size_in_bytes": 8388608,
"read_only": true,
"estimated_doc_count": 3898706,
"desired_false_positive_rate": 0.001
},
"processes": 120
}
44 changes: 44 additions & 0 deletions pretrain_data/mixer/config/ablations/dedupers/rp-dedup.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
{
"documents": [
"pretraining-data/sources/redpajama/v1/documents/split=train/dataset=arxiv/*.gz",
"pretraining-data/sources/redpajama/v1/documents/split=train/dataset=book/*.gz",
"pretraining-data/sources/redpajama/v1/documents/split=train/dataset=c4/*.gz",
"pretraining-data/sources/redpajama/v1/documents/split=train/dataset=common_crawl/*.gz",
"pretraining-data/sources/redpajama/v1/documents/split=train/dataset=github/*.gz",
"pretraining-data/sources/redpajama/v1/documents/split=train/dataset=stackexchange/*.gz",
"pretraining-data/sources/redpajama/v1/documents/split=train/dataset=wikipedia/*.gz",
"pretraining-data/sources/redpajama/v1/documents/split=valid/dataset=arxiv/*.gz",
"pretraining-data/sources/redpajama/v1/documents/split=valid/dataset=book/*.gz",
"pretraining-data/sources/redpajama/v1/documents/split=valid/dataset=c4/*.gz",
"pretraining-data/sources/redpajama/v1/documents/split=valid/dataset=common_crawl/*.gz",
"pretraining-data/sources/redpajama/v1/documents/split=valid/dataset=github/*.gz",
"pretraining-data/sources/redpajama/v1/documents/split=valid/dataset=stackexchange/*.gz",
"pretraining-data/sources/redpajama/v1/documents/split=valid/dataset=wikipedia/*.gz",
"pretraining-data/sources/redpajama/v1/documents/split=test/dataset=arxiv/*.gz",
"pretraining-data/sources/redpajama/v1/documents/split=test/dataset=book/*.gz",
"pretraining-data/sources/redpajama/v1/documents/split=test/dataset=c4/*.gz",
"pretraining-data/sources/redpajama/v1/documents/split=test/dataset=common_crawl/*.gz",
"pretraining-data/sources/redpajama/v1/documents/split=test/dataset=github/*.gz",
"pretraining-data/sources/redpajama/v1/documents/split=test/dataset=stackexchange/*.gz",
"pretraining-data/sources/redpajama/v1/documents/split=test/dataset=wikipedia/*.gz"
],
"work_dir": {
"input": "/data2/redpajama-v1/deduper/input",
"output": "/data2/redpajama-v1/deduper/output"
},
"dedupe": {
"name": "decontamination",
"paragraphs": {
"attribute_name": "bff_duplicate_paragraph_spans"
},
"skip_empty": true
},
"bloom_filter": {
"file": "/tmp/decontamination/deduper_decontamination_lucas_20230525.bin",
"size_in_bytes": 8388608,
"read_only": true,
"estimated_doc_count": 3898706,
"desired_false_positive_rate": 0.001
},
"processes": 120
}
63 changes: 63 additions & 0 deletions pretrain_data/mixer/config/ablations/dedupers/s2-v2-dedup.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
{
"documents": [
"pretraining-data/sources/s2/v2/documents/dataset=s2ag/split=train/part_id=0/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2ag/split=train/part_id=1/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2ag/split=train/part_id=2/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2ag/split=train/part_id=3/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2ag/split=train/part_id=4/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2ag/split=train/part_id=5/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2ag/split=train/part_id=6/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2ag/split=train/part_id=7/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2ag/split=train/part_id=8/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2ag/split=train/part_id=9/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2orc/split=train/part_id=0/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2orc/split=train/part_id=1/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2orc/split=train/part_id=2/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2orc/split=train/part_id=3/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2orc/split=train/part_id=4/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2orc/split=train/part_id=5/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2orc/split=train/part_id=6/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2orc/split=train/part_id=7/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2orc/split=train/part_id=8/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2orc/split=train/part_id=9/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2ag/split=valid/part_id=0/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2ag/split=valid/part_id=1/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2ag/split=valid/part_id=2/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2ag/split=valid/part_id=3/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2ag/split=valid/part_id=4/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2ag/split=valid/part_id=5/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2ag/split=valid/part_id=6/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2ag/split=valid/part_id=7/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2ag/split=valid/part_id=8/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2ag/split=valid/part_id=9/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2orc/split=valid/part_id=0/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2orc/split=valid/part_id=1/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2orc/split=valid/part_id=2/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2orc/split=valid/part_id=3/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2orc/split=valid/part_id=4/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2orc/split=valid/part_id=5/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2orc/split=valid/part_id=6/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2orc/split=valid/part_id=7/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2orc/split=valid/part_id=8/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2orc/split=valid/part_id=9/*.gz"
],
"work_dir": {
"input": "/tmp/abl-s2-v2/deduper/input",
"output": "/tmp/abl-s2-v2/deduper/output"
},
"dedupe": {
"name": "decontamination",
"paragraphs": {
"attribute_name": "bff_duplicate_paragraph_spans"
},
"skip_empty": true
},
"bloom_filter": {
"file": "/tmp/decontamination/deduper_decontamination_lucas_20230525.bin",
"size_in_bytes": 8388608,
"read_only": true,
"estimated_doc_count": 3898706,
"desired_false_positive_rate": 0.001
},
"processes": 120
}
63 changes: 63 additions & 0 deletions pretrain_data/mixer/config/ablations/dedupers/s2-v3-dedup.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
{
"documents": [
"pretraining-data/sources/s2/v3/documents/dataset=s2ag/split=train/part_id=0/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2ag/split=train/part_id=1/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2ag/split=train/part_id=2/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2ag/split=train/part_id=3/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2ag/split=train/part_id=4/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2ag/split=train/part_id=5/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2ag/split=train/part_id=6/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2ag/split=train/part_id=7/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2ag/split=train/part_id=8/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2ag/split=train/part_id=9/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2orc/split=train/part_id=0/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2orc/split=train/part_id=1/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2orc/split=train/part_id=2/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2orc/split=train/part_id=3/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2orc/split=train/part_id=4/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2orc/split=train/part_id=5/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2orc/split=train/part_id=6/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2orc/split=train/part_id=7/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2orc/split=train/part_id=8/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2orc/split=train/part_id=9/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2ag/split=valid/part_id=0/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2ag/split=valid/part_id=1/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2ag/split=valid/part_id=2/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2ag/split=valid/part_id=3/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2ag/split=valid/part_id=4/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2ag/split=valid/part_id=5/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2ag/split=valid/part_id=6/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2ag/split=valid/part_id=7/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2ag/split=valid/part_id=8/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2ag/split=valid/part_id=9/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2orc/split=valid/part_id=0/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2orc/split=valid/part_id=1/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2orc/split=valid/part_id=2/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2orc/split=valid/part_id=3/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2orc/split=valid/part_id=4/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2orc/split=valid/part_id=5/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2orc/split=valid/part_id=6/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2orc/split=valid/part_id=7/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2orc/split=valid/part_id=8/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2orc/split=valid/part_id=9/*.gz"
],
"work_dir": {
"input": "/tmp/s2/deduper/input",
"output": "/tmp/s2/deduper/output"
},
"dedupe": {
"name": "decontamination",
"paragraphs": {
"attribute_name": "bff_duplicate_paragraph_spans"
},
"skip_empty": true
},
"bloom_filter": {
"file": "/tmp/decontamination/deduper_decontamination_lucas_20230525.bin",
"size_in_bytes": 8388608,
"read_only": true,
"estimated_doc_count": 3898706,
"desired_false_positive_rate": 0.001
},
"processes": 120
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{
"streams": [
{
"name": "cc-v1-small-head-dedup",
"documents": [
"pretraining-data/sources/common-crawl/v1-small-head/documents/*"
],
"output": {
"path": "pretraining-data/sources/common-crawl/abl-cc-v1-small-head-dedup/documents",
"max_size_in_bytes": 3894967296
},
"attributes": ["decontamination"],
"filter": {
"include": [],
"exclude": [
"$@.attributes[?(@.bff_duplicate_paragraph_spans && @.bff_duplicate_paragraph_spans[0] && @.bff_duplicate_paragraph_spans[0][2] >= 1.0)]"
]
}
}
],
"work_dir": {
"input": "/data2/cc-v1-small-head/mixer/input",
"output": "/data2/cc-v1-small-head/mixer/output"
},
"processes": 120
}
Loading
Loading