In [1]:
%cd /scratch/edk202/ngram-prep

%env LC_ALL=C.UTF-8
%env LANG=C.UTF-8

%pip install -e . --no-build-isolation -q

/scratch/edk202/ngram-prep
env: LC_ALL=C.UTF-8
env: LANG=C.UTF-8
Note: you may need to restart the kernel to use updated packages.


In [2]:
# Auto-reload packages
%load_ext autoreload
%autoreload 2

In [3]:
# Standard stuff
import shutil
from pathlib import Path

# NLTK stuff
from nltk.corpus import stopwords; stopwords = set(stopwords.words("english"))
from nltk.stem import WordNetLemmatizer; lemmatizer = WordNetLemmatizer()

# Raw n-gram acquisition stuff
from ngram_acquire.pipeline.orchestrate import download_and_ingest_to_rocksdb
from ngram_acquire.pipeline.logger import setup_logger
from utilities.save_sample import save_sample_to_db, verify_sample_db

# Downloaded n-gram filtering stuff
from ngram_filter import PipelineConfig, FilterConfig
from ngram_filter.pipeline.old.orchestrator import build_processed_db_sharded

# Cython utilities
from utilities.count_items import count_db_items
from utilities.reservoir_sampler import reservoir_sampling

In [4]:
setup_logger(
    db_path="/vast/edk202/NLP_corpora/Google_Books/20200217/eng/5gram_files/1grams.db",
    console=False,
    rotate=True,
    max_bytes=100_000_000,
    backup_count=5,
    force=True
)

PosixPath('/vast/edk202/NLP_corpora/Google_Books/20200217/eng/5gram_files/1grams.db/ngram_download_20250909_095506.log')

# **Download 5-Grams and Ingest to a RocksDB Database**

In [None]:
download_and_ingest_to_rocksdb(
    ngram_size = 5,
    repo_release_id = "20200217",
    repo_corpus_id = "eng",
    db_path = "/vast/edk202/NLP_corpora/Google_Books/20200217/eng/5gram_files/5grams.db",
    file_range = (0, 19422),
    random_seed = 42,
    workers = 39,
    use_threads = False,
    ngram_type = "tagged",
    overwrite = False,
    write_batch_size = 100_000,
    open_type = "bulk_write",
    post_compact = False
)

[31mStart Time: 2025-09-06 21:58:02[0m
[4m
Download & Ingestion Configuration[0m
Ngram repository:           https://storage.googleapis.com/books/ngrams/books/20200217/eng/eng-5-ngrams_exports.html
RocksDB database path:      /vast/edk202/NLP_corpora/Google_Books/20200217/eng/5gram_files/5grams.db
File index range:           0 to 19422 (count ~ 19423)
Total files available:      19423
Files to process:           9657
First file URL:             http://storage.googleapis.com/books/ngrams/books/20200217/eng/5-16099-of-19423.gz
Last file URL:              http://storage.googleapis.com/books/ngrams/books/20200217/eng/5-03676-of-19423.gz
Ngram size:                 5
Ngram filtering:            tagged
Overwrite mode:             False
Files to skip (processed):  9766
Write batch size:           100,000
Worker processes/threads:   39 (processes)



Processing Files:   4%|[34m▍         [0m| 411/9657 [10:44<2:19:25,  1.11files/s] 

## Count the Raw Records

In [None]:
db_path = "/vast/edk202/NLP_corpora/Google_Books/20200217/eng/5gram_files/5grams.db"

count = count_db_items(
    db_path,
    progress_interval=50_000_000
)

## Sample the Raw Records

In [None]:
db_path = "/vast/edk202/NLP_corpora/Google_Books/20200217/eng/5gram_files/5grams.db"

sample = reservoir_sampling(
    db_path,
    sample_size=50_000,
    key_type="byte",
    progress_interval=100_000_000,
    return_keys=True,
)

## Save the Sample to a Testing Database

In [None]:
db_path = "/vast/edk202/NLP_corpora/Google_Books/20200217/eng/5gram_files/1grams_test.db"

save_sample_to_db(
    sample,
    db_path,
    overwrite=True
)

In [None]:
db_path = "/vast/edk202/NLP_corpora/Google_Books/20200217/eng/5gram_files/1grams_test.db"

valid = verify_sample_db(
    db_path,
    show_count=50,
    decode_output=True,
    unpack_ngram=True
)

## Process the Ngrams

In [None]:
SRC = Path("/vast/edk202/NLP_corpora/Google_Books/20200217/eng/5gram_files/5grams.db").resolve()
DST = SRC.parent / "5grams_processed.db"
TMP = SRC.parent / "sst_tmp"
VOC = Path("/vast/edk202/NLP_corpora/Google_Books/20200217/eng/5gram_files/vocab.txt")

if DST.exists():
    shutil.rmtree(DST)
if TMP.exists():
    shutil.rmtree(TMP)

p_cfg = PipelineConfig(
    src_db=SRC,
    dst_db=DST,
    tmp_dir=TMP,

    # Parallelism
    readers=8,         # pool size (processes)
    outer_writers=4,   # shard writers
    inner_lanes=32,    # per-writer in-RAM combiner lanes

    # Progress
    progress_every_s=10.0,

    # Dynamic reader pool (small slices -> better balance)
    reader_slice_factor=16,
    reader_slices=0,

    # Stage A (source)
    source_read_profile="read",

    # Stage B (sharded writers)
    writer_profile="bulk_write:packed24",
    writer_disable_wal=True,

    # Stage C (ingest -> final DB)
    ingest_read_profile="read:packed24",
    ingest_write_profile="bulk_write:packed24",
    ingest_batch_bytes=256 << 20,
    ingest_batch_items=500_000,
    ingest_disable_wal=True,
    finalize_shards=True
)

f_cfg = FilterConfig(
    opt_lower = True,
    opt_alpha = False,
    opt_shorts = False,
    opt_stops = False,
    opt_lemmas = False,
    min_len = 0,
    stop_set = None,
    lemma_gen = None,
    vocab_path=VOC
#    stop_set=stopwords,
#    lemma_gen=lemmatizer,
)

build_processed_db_sharded(p_cfg, f_cfg)

RUNNING N-GRAM FILTER PIPELINE

Phase 1: Input DB → Filtered RocksDB Shards
R=readers done | S=recs scanned | W=recs written | K=recs kept | R=throughput | T=elapsed
                              ( statistics are cumulative )                              
R=0/8 | S=0 | W=0 | K=0% | R=0/s | T=10.1s


# Try rerunning the processing pipeline: (1) lowercasing and vocab, (2) alpha, stops, shorts, (2) lemmas.