Skip to content

Commit

Permalink
Merge pull request #197 from allenai/soldni/local_cache
Browse files Browse the repository at this point in the history
Local cache
  • Loading branch information
soldni committed Jun 5, 2023
2 parents c642d4f + 601a29a commit 58ad163
Show file tree
Hide file tree
Showing 18 changed files with 530 additions and 13 deletions.
37 changes: 35 additions & 2 deletions pretrain_data/filters/src/ai2_llm_filters/core_tools/runtime.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import argparse
import gzip
import logging
import multiprocessing
import os
import tempfile
from contextlib import ExitStack
from queue import Queue
from typing import Dict

import msgspec
from cached_path import cached_path
from smashed.utils.io_utils import (
compress_stream,
decompress_stream,
Expand Down Expand Up @@ -70,6 +73,9 @@ def process_single(
# skip on failure
skip_on_failure = kwargs.get("skip_on_failure", False)

# local read cache
local_read_cache = kwargs.get("local_read_cache", None)

# interval at which to update the progress bar; will double if it gets
# too full
update_interval = 1
Expand All @@ -82,13 +88,22 @@ def process_single(
encoder = msgspec.json.Encoder()
decoder = msgspec.json.Decoder(InputSpec)

# this will be used to cache the file locally if needed
caching_path = source_path

with ExitStack() as stack:
try:
# open each file for reading and writing. If a local read cache is set, cached_path
# places a copy of the source in it and gzip.open reads that copy; otherwise the source is
# read via stream_file_for_read + decompress_stream. Output goes through compress_stream.
in_file = stack.enter_context(stream_file_for_read(source_path, "rb"))
in_stream = stack.enter_context(decompress_stream(in_file, "rt"))

if local_read_cache is not None:
caching_path = cached_path(source_path, cache_dir=local_read_cache)
in_stream = stack.enter_context(gzip.open(caching_path, mode="rt"))
else:
input_file = stack.enter_context(stream_file_for_read(source_path, mode="rb"))
in_stream = stack.enter_context(decompress_stream(input_file, mode="rt"))

out_file = stack.enter_context(open_file_for_write(destination_path, "wb"))
out_stream = stack.enter_context(compress_stream(out_file, "wt"))

Expand Down Expand Up @@ -134,6 +149,9 @@ def process_single(
raise Ai2LlmRetryableFailure(msg) from e
logger.warning("\nFatal " + msg)
raise Ai2LlmFilterError(msg) from e
finally:
if caching_path != source_path and os.path.exists(caching_path):
os.remove(caching_path)

# increment the files progress bar
cls.increment_progressbar(queue, files=1, documents=docs_cnt)
Expand Down Expand Up @@ -185,6 +203,12 @@ def main(cls):
type=str,
help="If provided, keeps track of which files have already been processed and skips them. ",
)
ap.add_argument(
"--local-read-cache",
default=None,
type=str,
help="If provided, will cache the files locally before processing them.",
)
ap.add_argument(
"--manually-included-paths",
default=None,
Expand All @@ -194,6 +218,9 @@ def main(cls):
ap.add_argument(
"--manually-excluded-paths", default=None, nargs="+", help="If provided, these paths will be skipped."
)
ap.add_argument(
"--safe-mode", action="store_true", help="Run in safe mode; will download locally before processing."
)
opts = ap.parse_args()

if opts.list_taggers:
Expand All @@ -210,6 +237,9 @@ def main(cls):
source_prefix = f"{cls.BASE_S3_PREFIX}/{opts.dataset}/documents"
destination_prefix = f"{cls.BASE_S3_PREFIX}/{opts.dataset}/attributes/{opts.experiment_name}"

# use a local read cache if we are in safe mode or if a local read cache is provided
local_read_cache = opts.local_read_cache or (tempfile.gettempdir() if opts.safe_mode else None)

with tempfile.TemporaryDirectory() as tempdir:
metadata_workdir = opts.reuse_existing or tempdir
ignore_existing = opts.reuse_existing is None
Expand All @@ -225,6 +255,8 @@ def main(cls):
f"skip on fail: {opts.skip_on_failure}\n"
f"reuse prev: {not ignore_existing}\n"
f"workdir: {metadata_workdir}\n"
f"safe mode: {opts.safe_mode}\n"
f"local cache: {local_read_cache}\n"
"---------------------------\n"
)
print(msg)
Expand All @@ -243,5 +275,6 @@ def main(cls):
taggers_names=opts.taggers,
experiment_name=opts.experiment_name,
skip_on_failure=opts.skip_on_failure,
local_read_cache=local_read_cache,
retry_on_read_error=opts.retry_on_read_error,
)
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,4 @@ class FastTextJigsawNsfwDocumentTagger(FastTextJigsawHatespeechDocumentTagger):

@TaggerRegistry.add("jigsaw_nsfw_sencence_v2")
class FastTextJigsawNsfwSentenceTagger(FastTextJigsawHatespeechSentenceTagger):
...
MODEL_PATH = "https://ai2-s2-research-public.s3.us-west-2.amazonaws.com/aakankshan/olmo-data-filters/jigsaw_fasttext_bigrams_nsfw_final.bin" # noqa: E501
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"documents": [
"pretraining-data/sources/common-crawl/v1-small-head/documents/v1_small-head-0101.json.gz"
],
"work_dir": {
"input": "/data2/v1-small-head/deduper/input",
"output": "/data2/v1-small-head/deduper/output"
},
"dedupe": {
"name": "decontamination",
"paragraphs": {
"attribute_name": "bff_duplicate_paragraph_spans"
},
"skip_empty": true
},
"bloom_filter": {
"file": "/tmp/decontamination/deduper_decontamination_lucas_20230525.bin",
"size_in_bytes": 8388608,
"read_only": true,
"estimated_doc_count": 3898706,
"desired_false_positive_rate": 0.001
},
"processes": 120
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"documents": [
"pretraining-data/sources/common-crawl/v1-small-head-middle/documents/v1_small-head-middle-0126.json.gz"
],
"work_dir": {
"input": "/tmp/v1-small-head-middle/deduper/input",
"output": "/tmp/v1-small-head-middle/deduper/output"
},
"dedupe": {
"name": "decontamination",
"paragraphs": {
"attribute_name": "bff_duplicate_paragraph_spans"
},
"skip_empty": true
},
"bloom_filter": {
"file": "/tmp/decontamination/deduper_decontamination_lucas_20230525.bin",
"size_in_bytes": 8388608,
"read_only": true,
"estimated_doc_count": 3898706,
"desired_false_positive_rate": 0.001
},
"processes": 120
}
24 changes: 24 additions & 0 deletions pretrain_data/mixer/config/ablations/dedupers/falcon-dedup.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"documents": [
"pretraining-data/sources/falcon-refinedweb/v0/documents/*.json.gz"
],
"work_dir": {
"input": "/data2/falcon/deduper/input",
"output": "/data2/falcon/deduper/output"
},
"dedupe": {
"name": "decontamination",
"paragraphs": {
"attribute_name": "bff_duplicate_paragraph_spans"
},
"skip_empty": true
},
"bloom_filter": {
"file": "/tmp/decontamination/deduper_decontamination_lucas_20230525.bin",
"size_in_bytes": 8388608,
"read_only": true,
"estimated_doc_count": 3898706,
"desired_false_positive_rate": 0.001
},
"processes": 120
}
44 changes: 44 additions & 0 deletions pretrain_data/mixer/config/ablations/dedupers/rp-dedup.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
{
"documents": [
"pretraining-data/sources/redpajama/v1/documents/split=train/dataset=arxiv/*.gz",
"pretraining-data/sources/redpajama/v1/documents/split=train/dataset=book/*.gz",
"pretraining-data/sources/redpajama/v1/documents/split=train/dataset=c4/*.gz",
"pretraining-data/sources/redpajama/v1/documents/split=train/dataset=common_crawl/*.gz",
"pretraining-data/sources/redpajama/v1/documents/split=train/dataset=github/*.gz",
"pretraining-data/sources/redpajama/v1/documents/split=train/dataset=stackexchange/*.gz",
"pretraining-data/sources/redpajama/v1/documents/split=train/dataset=wikipedia/*.gz",
"pretraining-data/sources/redpajama/v1/documents/split=valid/dataset=arxiv/*.gz",
"pretraining-data/sources/redpajama/v1/documents/split=valid/dataset=book/*.gz",
"pretraining-data/sources/redpajama/v1/documents/split=valid/dataset=c4/*.gz",
"pretraining-data/sources/redpajama/v1/documents/split=valid/dataset=common_crawl/*.gz",
"pretraining-data/sources/redpajama/v1/documents/split=valid/dataset=github/*.gz",
"pretraining-data/sources/redpajama/v1/documents/split=valid/dataset=stackexchange/*.gz",
"pretraining-data/sources/redpajama/v1/documents/split=valid/dataset=wikipedia/*.gz",
"pretraining-data/sources/redpajama/v1/documents/split=test/dataset=arxiv/*.gz",
"pretraining-data/sources/redpajama/v1/documents/split=test/dataset=book/*.gz",
"pretraining-data/sources/redpajama/v1/documents/split=test/dataset=c4/*.gz",
"pretraining-data/sources/redpajama/v1/documents/split=test/dataset=common_crawl/*.gz",
"pretraining-data/sources/redpajama/v1/documents/split=test/dataset=github/*.gz",
"pretraining-data/sources/redpajama/v1/documents/split=test/dataset=stackexchange/*.gz",
"pretraining-data/sources/redpajama/v1/documents/split=test/dataset=wikipedia/*.gz"
],
"work_dir": {
"input": "/data2/redpajama-v1/deduper/input",
"output": "/data2/redpajama-v1/deduper/output"
},
"dedupe": {
"name": "decontamination",
"paragraphs": {
"attribute_name": "bff_duplicate_paragraph_spans"
},
"skip_empty": true
},
"bloom_filter": {
"file": "/tmp/decontamination/deduper_decontamination_lucas_20230525.bin",
"size_in_bytes": 8388608,
"read_only": true,
"estimated_doc_count": 3898706,
"desired_false_positive_rate": 0.001
},
"processes": 120
}
63 changes: 63 additions & 0 deletions pretrain_data/mixer/config/ablations/dedupers/s2-v2-dedup.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
{
"documents": [
"pretraining-data/sources/s2/v2/documents/dataset=s2ag/split=train/part_id=0/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2ag/split=train/part_id=1/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2ag/split=train/part_id=2/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2ag/split=train/part_id=3/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2ag/split=train/part_id=4/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2ag/split=train/part_id=5/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2ag/split=train/part_id=6/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2ag/split=train/part_id=7/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2ag/split=train/part_id=8/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2ag/split=train/part_id=9/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2orc/split=train/part_id=0/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2orc/split=train/part_id=1/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2orc/split=train/part_id=2/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2orc/split=train/part_id=3/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2orc/split=train/part_id=4/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2orc/split=train/part_id=5/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2orc/split=train/part_id=6/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2orc/split=train/part_id=7/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2orc/split=train/part_id=8/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2orc/split=train/part_id=9/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2ag/split=valid/part_id=0/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2ag/split=valid/part_id=1/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2ag/split=valid/part_id=2/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2ag/split=valid/part_id=3/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2ag/split=valid/part_id=4/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2ag/split=valid/part_id=5/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2ag/split=valid/part_id=6/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2ag/split=valid/part_id=7/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2ag/split=valid/part_id=8/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2ag/split=valid/part_id=9/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2orc/split=valid/part_id=0/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2orc/split=valid/part_id=1/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2orc/split=valid/part_id=2/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2orc/split=valid/part_id=3/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2orc/split=valid/part_id=4/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2orc/split=valid/part_id=5/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2orc/split=valid/part_id=6/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2orc/split=valid/part_id=7/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2orc/split=valid/part_id=8/*.gz",
"pretraining-data/sources/s2/v2/documents/dataset=s2orc/split=valid/part_id=9/*.gz"
],
"work_dir": {
"input": "/tmp/abl-s2-v2/deduper/input",
"output": "/tmp/abl-s2-v2/deduper/output"
},
"dedupe": {
"name": "decontamination",
"paragraphs": {
"attribute_name": "bff_duplicate_paragraph_spans"
},
"skip_empty": true
},
"bloom_filter": {
"file": "/tmp/decontamination/deduper_decontamination_lucas_20230525.bin",
"size_in_bytes": 8388608,
"read_only": true,
"estimated_doc_count": 3898706,
"desired_false_positive_rate": 0.001
},
"processes": 120
}
63 changes: 63 additions & 0 deletions pretrain_data/mixer/config/ablations/dedupers/s2-v3-dedup.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
{
"documents": [
"pretraining-data/sources/s2/v3/documents/dataset=s2ag/split=train/part_id=0/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2ag/split=train/part_id=1/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2ag/split=train/part_id=2/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2ag/split=train/part_id=3/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2ag/split=train/part_id=4/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2ag/split=train/part_id=5/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2ag/split=train/part_id=6/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2ag/split=train/part_id=7/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2ag/split=train/part_id=8/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2ag/split=train/part_id=9/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2orc/split=train/part_id=0/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2orc/split=train/part_id=1/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2orc/split=train/part_id=2/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2orc/split=train/part_id=3/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2orc/split=train/part_id=4/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2orc/split=train/part_id=5/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2orc/split=train/part_id=6/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2orc/split=train/part_id=7/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2orc/split=train/part_id=8/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2orc/split=train/part_id=9/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2ag/split=valid/part_id=0/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2ag/split=valid/part_id=1/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2ag/split=valid/part_id=2/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2ag/split=valid/part_id=3/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2ag/split=valid/part_id=4/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2ag/split=valid/part_id=5/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2ag/split=valid/part_id=6/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2ag/split=valid/part_id=7/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2ag/split=valid/part_id=8/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2ag/split=valid/part_id=9/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2orc/split=valid/part_id=0/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2orc/split=valid/part_id=1/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2orc/split=valid/part_id=2/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2orc/split=valid/part_id=3/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2orc/split=valid/part_id=4/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2orc/split=valid/part_id=5/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2orc/split=valid/part_id=6/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2orc/split=valid/part_id=7/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2orc/split=valid/part_id=8/*.gz",
"pretraining-data/sources/s2/v3/documents/dataset=s2orc/split=valid/part_id=9/*.gz"
],
"work_dir": {
"input": "/tmp/s2/deduper/input",
"output": "/tmp/s2/deduper/output"
},
"dedupe": {
"name": "decontamination",
"paragraphs": {
"attribute_name": "bff_duplicate_paragraph_spans"
},
"skip_empty": true
},
"bloom_filter": {
"file": "/tmp/decontamination/deduper_decontamination_lucas_20230525.bin",
"size_in_bytes": 8388608,
"read_only": true,
"estimated_doc_count": 3898706,
"desired_false_positive_rate": 0.001
},
"processes": 120
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{
"streams": [
{
"name": "cc-v1-small-head-dedup",
"documents": [
"pretraining-data/sources/common-crawl/v1-small-head/documents/*"
],
"output": {
"path": "pretraining-data/sources/common-crawl/abl-cc-v1-small-head-dedup/documents",
"max_size_in_bytes": 3894967296
},
"attributes": ["decontamination"],
"filter": {
"include": [],
"exclude": [
"$@.attributes[?(@.bff_duplicate_paragraph_spans && @.bff_duplicate_paragraph_spans[0] && @.bff_duplicate_paragraph_spans[0][2] >= 1.0)]"
]
}
}
],
"work_dir": {
"input": "/data2/cc-v1-small-head/mixer/input",
"output": "/data2/cc-v1-small-head/mixer/output"
},
"processes": 120
}

0 comments on commit 58ad163

Please sign in to comment.