<a href="https://colab.research.google.com/github/grfaith/May25/blob/main/Optimized_2_get_AS_text_and_light_preprocessing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

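This notebook takes each `article_ID`, retrieves the matching American Stories (AS) article text and bounding box, generates a link to the page on loc.gov, and applies light post-processing to the article text. For reference, see: https://chatgpt.com/share/68375389-ee70-800c-95f2-6f6cf5a09d47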

Optimization thread: https://chatgpt.com/share/68386b21-b01c-800c-8cc3-37f8bf98754c  

Second optimization (load to RAM, queue on disk, multithread postprocessing)
https://chatgpt.com/share/68557d79-2b68-800c-a51a-c1c5dc24fe34


In [1]:
!pip install ipympl
!pip install symspellpy
!pip install -q orjson

Collecting ipympl
  Downloading ipympl-0.9.7-py3-none-any.whl.metadata (8.7 kB)
Collecting jedi>=0.16 (from ipython<10->ipympl)
  Downloading jedi-0.19.2-py2.py3-none-any.whl.metadata (22 kB)
Downloading ipympl-0.9.7-py3-none-any.whl (515 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m515.7/515.7 kB[0m [31m22.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading jedi-0.19.2-py2.py3-none-any.whl (1.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m56.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: jedi, ipympl
Successfully installed ipympl-0.9.7 jedi-0.19.2
Collecting symspellpy
  Downloading symspellpy-6.9.0-py3-none-any.whl.metadata (3.9 kB)
Collecting editdistpy>=0.1.3 (from symspellpy)
  Downloading editdistpy-0.1.6-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.7 kB)
Downloading symspellpy-6.9.0-py3-none-any.whl (2.6 MB)
[2K   [90m━━━━━━━━━━━━

In [2]:
# ── 1. Mount your Google Drive ─────────────────────────────────────────────
# The input (GROUPED_DIR) and output (OUT_DIR) folders configured later in this
# notebook live under /content/drive/MyDrive, so Drive has to be mounted before
# those cells run.
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

# ── 2. Imports ─────────────────────────────────────────────────────────────
import os
import requests
import tarfile
import orjson
import pandas as pd
from tqdm import tqdm
from pandas.errors import EmptyDataError
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import pkg_resources
from symspellpy import SymSpell, Verbosity
import string

Mounted at /content/drive


In [3]:
import string
import pkg_resources
from symspellpy import SymSpell, Verbosity

# ——— Initialize SymSpell once ———
# Module-level instance shared by line_merge, check_word and capitalization_check.
sym_spell = SymSpell(max_dictionary_edit_distance=2, prefix_length=7)
dict_path = pkg_resources.resource_filename(
    "symspellpy", "frequency_dictionary_en_82_765.txt"
)
# load_dictionary() signals a missing file by returning False rather than raising.
# Fail loudly here: with an empty dictionary every later spell/merge check would
# silently become a no-op.
if not sym_spell.load_dictionary(dict_path, term_index=0, count_index=1):
    raise FileNotFoundError(f"SymSpell frequency dictionary not found: {dict_path}")


# ——— 1. Merge hyphenated or split lines ———
def line_merge(text: str) -> str:
    """Re-join words that were split across a line break.

    For every pair of consecutive non-empty lines, the last token of the first
    line and the first token of the second line are merged when either

    * the last token ends with a hyphen (the hyphen is dropped), or
    * both tokens contain something other than ASCII punctuation and their
      lower-cased, punctuation-stripped concatenation is a known dictionary
      word in ``sym_spell.words``.

    The merged token stays on the first line and is removed from the second.
    Whitespace inside each line is normalised to single spaces as a side
    effect of tokenising with ``str.split()``.

    Args:
        text: Raw article text with ``\\n`` line separators.

    Returns:
        The text with split words merged, still ``\\n``-separated.
    """
    lines = [l.split() for l in text.split("\n")]
    for i in range(len(lines) - 1):
        if not lines[i] or not lines[i + 1]:
            continue
        last, first = lines[i][-1], lines[i + 1][0]
        if last.endswith("-"):
            # Hyphen at end of line
            lines[i][-1] = last[:-1] + first
            lines[i + 1] = lines[i + 1][1:]
        else:
            a = last.strip(string.punctuation).lower()
            b = first.strip(string.punctuation).lower()
            # Require both halves to be non-empty: if one token is pure
            # punctuation its core is "", so (a + b) would just be the other
            # word and the two tokens would be glued together by mistake.
            if a and b and (a + b) in sym_spell.words:
                lines[i][-1] = last + first
                lines[i + 1] = lines[i + 1][1:]
    return "\n".join(" ".join(words) for words in lines)


# ——— 2. Spell-check individual words ———
def check_word(word: str) -> str:
    """Spell-correct a single whitespace-delimited token.

    Leading and trailing ASCII punctuation is left alone; only the inner part
    is looked up in ``sym_spell`` (closest match, edit distance at most 1).
    Unknown words are returned by the lookup unchanged, and the original
    casing is transferred onto the suggestion.

    Args:
        word: One token, possibly wrapped in punctuation.

    Returns:
        The token with its inner part replaced by the top suggestion, or the
        token unchanged when it consists solely of punctuation (or is empty).
    """
    stripped = word.strip(string.punctuation)
    if stripped:
        candidates = sym_spell.lookup(
            stripped,
            Verbosity.CLOSEST,
            max_edit_distance=1,
            include_unknown=True,
            transfer_casing=True,
        )
        best = candidates[0]
        return word.replace(stripped, best.term)
    return word


def spell_check(text: str) -> str:
    """Apply ``check_word`` to every space-separated token of every line.

    Lines are split on ``\\n`` and tokens on a single space, so the original
    line structure and spacing are kept intact.
    """
    corrected_lines = []
    for raw_line in text.split("\n"):
        tokens = raw_line.split(" ")
        corrected_lines.append(" ".join(map(check_word, tokens)))
    return "\n".join(corrected_lines)


# ——— 3. Fix capitalization after sentence boundaries ———
def capitalization_check(text: str) -> str:
    """Normalise capitalisation within each line.

    For every token except the first one on a line:

    * if the previous token ends with ``.``, ``!`` or ``?`` the token is
      capitalised (``str.capitalize``);
    * otherwise, if its punctuation-stripped, lower-cased form is a known
      dictionary word (and is not ``i`` / ``i'll``), the word is lower-cased.

    Only the word itself is lower-cased; surrounding punctuation is kept.
    Replacing the whole token with the stripped word would delete commas and,
    worse, the sentence-ending ``.``/``!``/``?`` that the next iteration
    relies on to detect a sentence boundary.

    Args:
        text: ``\\n``-separated text.

    Returns:
        The text with adjusted capitalisation, same line/token structure.
    """
    out_lines = []
    for line in text.split("\n"):
        words = line.split(" ")
        for i in range(1, len(words)):
            if words[i - 1].endswith((".", "!", "?")):
                words[i] = words[i].capitalize()
            else:
                stripped = words[i].strip(string.punctuation)
                core = stripped.lower()
                if core and core in sym_spell.words and core not in ("i", "i'll"):
                    # Swap only the inner word so leading/trailing punctuation survives.
                    words[i] = words[i].replace(stripped, core)
        out_lines.append(" ".join(words))
    return "\n".join(out_lines)


# ——— Final optimized postprocess ———
def postprocess(text: str) -> str:
    """Run the full clean-up pipeline on one article.

    Steps, in order: ``line_merge`` → ``spell_check`` → ``capitalization_check``.
    Falsy input (``None`` or an empty string) yields ``""``.
    """
    if text:
        return capitalization_check(spell_check(line_merge(text)))
    return ""


In [4]:
# Input folder: per-year grouped keyword-hit CSVs read by process_year().
GROUPED_DIR    = '/content/drive/MyDrive/AmStories_grouped'
# Output folder: per-year CSVs with extracted text written by process_year().
OUT_DIR        = '/content/drive/MyDrive/AmStories_text'
# Local scratch folder where download_tar() stores the yearly tarballs.
CACHE_DIR      = '/tmp/americanstories'

# Make sure the folders this notebook writes to exist.
for _writable_dir in (OUT_DIR, CACHE_DIR):
    os.makedirs(_writable_dir, exist_ok=True)



In [5]:
# ── HELPERS ──────────────────────────────────────────────────────────────
def json_filename_from_aid(aid: str) -> str:
    """Return the JSON filename used to look an article's page up in the cache.

    Everything after the first underscore of ``aid`` is kept and ``.json`` is
    appended. Raises ``IndexError`` when ``aid`` contains no underscore.
    """
    remainder = aid.split('_', 1)[1]
    return f"{remainder}.json"

def json_id_from_aid(aid: str) -> str:
    """Return the full article ID with a ``.json`` suffix appended."""
    return "".join((aid, ".json"))

def download_tar(year: int) -> str:
    """Download the year's tarball (if missing) and return its local path.

    The archive ``faro_{year}.tar.gz`` is fetched from the AmericanStories
    dataset on Hugging Face into CACHE_DIR. The body is streamed into a
    ``.part`` file and renamed to its final name only after the transfer
    finished, so an interrupted download can never be mistaken for a complete,
    cached tarball on the next call.

    Args:
        year: Year whose tarball is wanted.

    Returns:
        Path of the cached tarball.

    Raises:
        requests.RequestException: On HTTP errors, connection problems or timeouts.
        OSError: If the file cannot be written or renamed.
    """
    fn = f'faro_{year}.tar.gz'
    path = os.path.join(CACHE_DIR, fn)
    if os.path.exists(path):
        return path

    url = (
        'https://huggingface.co/datasets/'
        'dell-research-harvard/AmericanStories/resolve/main/'
        + fn
    )
    tmp_path = path + '.part'
    try:
        # (connect, read) timeouts so a stalled connection cannot hang forever;
        # the context manager returns the connection to the pool on any exit.
        with requests.get(url, stream=True, timeout=(30, 300)) as resp:
            resp.raise_for_status()
            with open(tmp_path, 'wb') as out:
                # 1 MiB chunks: far fewer Python-level iterations than 8 KiB
                # for multi-gigabyte archives.
                for chunk in resp.iter_content(chunk_size=1 << 20):
                    out.write(chunk)
        # Atomic rename: the final path appears only once fully written.
        os.replace(tmp_path, path)
    finally:
        # After a successful rename the temp file is gone; anything still here
        # is a partial download that must not linger.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
    return path

def build_member_index(tf: tarfile.TarFile) -> dict:
    """Map the base filename of every regular ``.json`` member to its TarInfo.

    When two members share a base filename, the one appearing later in the
    archive wins.
    """
    return {
        os.path.basename(member.name): member
        for member in tf.getmembers()
        if member.isfile() and member.name.endswith('.json')
    }

def extract_fragment_from_cache(page_dict: dict, aid: str) -> tuple[str | None, list | None]:
    """Look up one article's text and bounding box in the preloaded page JSONs.

    NOTE: a later cell of this notebook defines a function with the same name;
    once that cell has run, that definition replaces this one.

    Args:
        page_dict: Mapping of JSON filename -> parsed page dict.
        aid: Article ID, e.g. "101_1856-11-13_p1_sn...".

    Returns:
        ``(text, bbox)`` where ``text`` is headline, byline and article joined by
        blank lines (``None`` if all are empty) and ``bbox`` is the article's
        ``bbox`` value. ``(None, None)`` when the page or the article is not found.
    """
    not_found = (None, None)

    # 1) Locate the page via the JSON filename derived from the article ID
    page = page_dict.get(json_filename_from_aid(aid))
    if not page:
        return not_found

    # 2) Prefix = everything up to the second underscore,
    #    e.g. "101_1856-11-13_p1_sn..." -> "101_1856-11-13"
    id_parts = aid.split('_', 2)
    if len(id_parts) < 2:
        return not_found
    prefix = "_".join(id_parts[:2])

    # 3) Take the first article whose "id" starts with that prefix
    art = None
    for candidate in page.get('full articles', []):
        if candidate.get('id', '').startswith(prefix):
            art = candidate
            break
    if not art:
        return not_found

    # 4) Collect the non-empty text fields, in display order
    pieces = [
        value.strip()
        for value in map(art.get, ('headline', 'byline', 'article'))
        if value
    ]
    text = "\n\n".join(pieces) or None

    # 5) Bounding box as stored on the article
    return text, art.get('bbox')


def make_article_link(aid: str, bbox: list|None) -> str|None:
    if not bbox:
        return None
    date, page, sn = aid.split('_')[1:4]
    clip = ','.join(map(str, bbox))
    return (
        f"https://www.loc.gov/resource/{sn}/{date}/ed-1/"
        f"?sp={page.lstrip('p')}&clip={clip}"
    )


In [6]:
import os
import time
import shutil
import psutil
import orjson
import tarfile
import pandas as pd
from tqdm.notebook import tqdm
from concurrent.futures import ProcessPoolExecutor, as_completed
from os import cpu_count

# Top-level helper for multiprocessing
def safe_postprocess(text):
    """Run ``postprocess`` without ever raising.

    Defined at module top level so it can be handed to a process pool.
    Returns ``None`` for falsy input and a "[POSTPROCESS ERROR] ..." marker
    string when post-processing raises.
    """
    try:
        if not text:
            return None
        return postprocess(text)
    except Exception as exc:
        return f"[POSTPROCESS ERROR] {exc}"

# System monitoring
def log_status(tag=""):
    """Print current RAM usage and free disk space on "/" (in GB), prefixed with ``tag``."""
    memory = psutil.virtual_memory()
    root_disk = shutil.disk_usage("/")
    gigabyte = 1e9
    report = (
        f"[{tag}] RAM used:  {memory.used / gigabyte:.2f} GB / {memory.total / gigabyte:.2f} GB",
        f"[{tag}] Disk free: {root_disk.free / gigabyte:.2f} GB / {root_disk.total / gigabyte:.2f} GB",
        "-" * 60,
    )
    for report_line in report:
        print(report_line)

# Fast article text lookup from preloaded JSON
def extract_fragment_from_cache(page_dict: dict, aid: str) -> tuple[str|None, list|None]:
    """Return ``(text, bbox)`` for one article from the preloaded page JSONs.

    NOTE: this redefines the helper of the same name from the HELPERS cell
    earlier in the notebook; once this cell has run, this version is the one
    in effect.

    The page is found via ``json_filename_from_aid(aid)``; within it, the first
    entry of ``'full articles'`` whose ``id`` starts with the article-ID prefix
    (everything up to the second underscore) is used. ``text`` joins the
    non-empty headline, byline and article fields with blank lines (``None``
    if nothing remains). ``(None, None)`` is returned when the page or the
    article cannot be found.
    """
    page = page_dict.get(json_filename_from_aid(aid))
    if page:
        # prefix match (everything up to the 2nd underscore)
        tokens = aid.split("_", 2)
        prefix = tokens[0] + "_" + tokens[1]

        for art in page.get('full articles', []):
            if not art.get('id', "").startswith(prefix):
                continue

            # assemble text from the fields that are present and non-empty
            raw_fields = (art.get(name) for name in ("headline", "byline", "article"))
            cleaned = [value.strip() for value in filter(None, raw_fields)]
            text = "\n\n".join(cleaned) or None

            # get bbox
            return text, art.get("bbox")

    return None, None


# Main year-processing function
def process_year(year: int) -> str:
    """Extract, post-process and export all keyword-hit articles for one year.

    Reads ``Grouped_KW_Hits_May25_SW_{year}.csv`` from GROUPED_DIR, loads every
    ``.json`` member of the year's tarball into memory, looks up each
    ``article_ID``, runs ``safe_postprocess`` on the texts in a process pool,
    adds text / bbox / link columns and writes
    ``AS_Text_Analyzed_May25_SW_{year}.csv`` to OUT_DIR. The tarball is deleted
    after a successful write.

    Args:
        year: Year to process.

    Returns:
        A status message: a "⚠️ ..." line when the year is skipped (missing or
        empty CSV, or estimated runtime above the threshold), otherwise a
        "✔ ..." summary with timings.

    Raises:
        RuntimeError: If any article ends up with a missing or empty
            ``analyze_text``.
    """
    start_time = time.time()
    csv_fp = os.path.join(GROUPED_DIR, f'Grouped_KW_Hits_May25_SW_{year}.csv')
    if not os.path.isfile(csv_fp):
        return f"⚠️ Year {year}: no CSV, skipping."

    try:
        # Identity converter keeps keyword_counts as the raw string from the CSV.
        df = pd.read_csv(csv_fp, converters={'keyword_counts': lambda x: x})
    except pd.errors.EmptyDataError:
        return f"⚠️ Year {year}: empty CSV, skipping."
    if df.empty:
        return f"⚠️ Year {year}: no rows, skipping."

    tar_path = download_tar(year)

    # Load all JSONs into RAM. The archive is only needed for this step, so it
    # is opened in a context manager: the handle is released immediately and
    # cannot leak on the early return / RuntimeError paths further down.
    preload_start = time.time()
    with tarfile.open(tar_path, 'r:gz') as tf:
        page_dict = {
            os.path.basename(m.name): orjson.loads(tf.extractfile(m).read())
            for m in tf.getmembers() if m.isfile() and m.name.endswith('.json')
        }
    preload_time = time.time() - preload_start

    # Sample for estimated duration
    sample_size = min(100, len(df))
    timing_pairs = []
    extract_start = time.time()
    for aid in df['article_ID'][:sample_size]:
        pair = extract_fragment_from_cache(page_dict, aid)
        timing_pairs.append(pair)
    sample_extract_time = time.time() - extract_start

    texts_sample, _ = zip(*timing_pairs)
    post_start = time.time()
    with ProcessPoolExecutor(max_workers=cpu_count()) as pool:
        _ = list(pool.map(safe_postprocess, texts_sample))
    sample_post_time = time.time() - post_start

    total_est_sec = (sample_extract_time + sample_post_time) * len(df) / sample_size
    # Clamp the denominators: a coarse clock can report 0.0 s for the tiny
    # extraction sample, which would otherwise raise ZeroDivisionError.
    extract_rate = sample_size / max(sample_extract_time, 1e-9)
    post_rate = sample_size / max(sample_post_time, 1e-9)
    print(f"[Sample] Est. total: {total_est_sec/60:.1f} min ({total_est_sec/3600:.2f} hrs) "
          f"→ Extract rate: {extract_rate:.2f}/s, Post rate: {post_rate:.2f}/s")

    # Optional early exit
    MAX_ALLOWABLE_HOURS = 6
    if total_est_sec > MAX_ALLOWABLE_HOURS * 3600:
        print(f"❌ Year {year} skipped (est. {total_est_sec/3600:.2f} hrs exceeds threshold).")
        return f"⚠️ Year {year} skipped (too slow)"

    # Full extraction with progress bar
    all_pairs = []
    extract_start = time.time()
    for aid in tqdm(df['article_ID'], desc=f"Processing {year}", leave=True):
        all_pairs.append(extract_fragment_from_cache(page_dict, aid))
    extract_time = time.time() - extract_start

    texts, bboxes = zip(*all_pairs)
    processed = [None] * len(texts)

    # Full postprocessing with progress bar + error resilience.
    # Futures complete out of order, so each one is mapped back to its row index.
    post_start = time.time()
    with ProcessPoolExecutor(max_workers=cpu_count()) as pool:
        futures = {pool.submit(safe_postprocess, t): i for i, t in enumerate(texts)}
        for fut in tqdm(as_completed(futures), total=len(texts), desc=f"Postprocessing {year}"):
            i = futures[fut]
            try:
                processed[i] = fut.result()
            except Exception as e:
                processed[i] = f"[ERROR] {e}"
    post_time = time.time() - post_start

    df['analyze_text'] = processed
    df['bbox'] = bboxes

    # ── HALT IF ANY analyze_text IS MISSING ─────────────────────────────────────────
    missing = df['analyze_text'].isnull() | (df['analyze_text'] == "")
    if missing.any():
        bad_ids = df.loc[missing, 'article_ID'].tolist()
        raise RuntimeError(
            f"⛔️ Missing analyze_text for {len(bad_ids)} articles: {bad_ids[:5]}..."
        )
    # ────────────────────────────────────────────────────────────────────────────────


    # URL columns
    parts = df['article_ID'].str.split('_', expand=True)
    df['date'] = parts[1]
    df['page'] = parts[2].str.lstrip('p')
    df['sn'] = parts[3]
    df['clip'] = df['bbox'].map(lambda b: ','.join(map(str, b)) if b else '')
    df['article_link'] = (
        "https://www.loc.gov/resource/" + df['sn'] + "/" +
        df['date'] + "/ed-1/?sp=" + df['page'] + "&clip=" + df['clip']
    )

    out_fp = os.path.join(OUT_DIR, f'AS_Text_Analyzed_May25_SW_{year}.csv')
    df.to_csv(out_fp, index=False)

    # The tarball is no longer needed once the year's CSV has been written.
    os.remove(tar_path)
    total_time = time.time() - start_time

    log_status(f"After year {year}")

    return (
        f"✔ Year {year} done → wrote {out_fp}\n"
        f"  • Load JSONs: {preload_time:.1f}s | Sample Est.: {total_est_sec/60:.1f} min\n"
        f"  • Extract: {extract_time:.1f}s | Postprocess: {post_time:.1f}s\n"
        f"  • Total: {total_time/60:.1f} min | Rate: {len(df) / total_time:.2f} articles/sec"
    )


In [11]:
import threading

def background_download(year):
    """Thread target: pre-fetch one year's tarball via ``download_tar``.

    Never raises; a failure is only reported on stdout so the worker thread
    ends quietly.
    """
    try:
        print(f"📦 Pre-caching tarball for {year} in background...")
        download_tar(year)
        print(f"✅ Cached year {year}")
    except Exception as download_error:
        print(f"⚠️ Background download for year {year} failed: {download_error}")

# Start with no preloaded tarball
years = list(range(1906, 1940))
next_download_thread = None

# Clear logs
open("completed_years.log", "w").close()
open("failed_years.log", "w").close()

for i, year in enumerate(years):
    try:
        print(f"▶ Starting year {year}")

        # Wait for background download (if any) to complete
        if next_download_thread:
            next_download_thread.join()

        # Start background download for next year so it overlaps with this
        # year's processing
        if i + 1 < len(years):
            next_year = years[i + 1]
            next_download_thread = threading.Thread(target=background_download, args=(next_year,))
            next_download_thread.start()

        # This year’s tarball should now exist, or be downloaded synchronously
        tar_path = download_tar(year)
        log_status(f"Downloaded year {year}")
        result = process_year(year)
        print(result)

        # process_year() reports a skipped year (no/empty CSV, too slow) by
        # returning a "⚠️ ..." message instead of raising; only a "✔ ..."
        # message means output was actually written. Record the difference so
        # the log does not claim OK for years that produced nothing.
        status = "OK" if result.startswith("✔") else "SKIPPED"
        with open("completed_years.log", "a") as f:
            f.write(f"{year}: {status}\n")

        # process_year() deletes the tarball only after a successful write;
        # for skipped years remove it here so unused archives do not pile up
        # in the cache directory.
        if status == "SKIPPED" and os.path.exists(tar_path):
            os.remove(tar_path)

        log_status(f"Completed year {year}")

    except Exception as e:
        err_msg = f"❌ Year {year} failed: {e}"
        print(err_msg)
        with open("failed_years.log", "a") as f:
            f.write(f"{year}: {e}\n")

print ("Done")

▶ Starting year 1844
[Downloaded year 1844] RAM used:  20.30 GB / 54.75 GB
[Downloaded year 1844] Disk free: 200.07 GB / 242.49 GB
------------------------------------------------------------
[Sample] Est. total: 1.5 min (0.02 hrs) → Extract rate: 70718.33/s, Post rate: 28.03/s


Processing 1844:   0%|          | 0/2477 [00:00<?, ?it/s]

Postprocessing 1844:   0%|          | 0/2477 [00:00<?, ?it/s]

[After year 1844] RAM used:  20.31 GB / 54.75 GB
[After year 1844] Disk free: 200.63 GB / 242.49 GB
------------------------------------------------------------
✔ Year 1844 done → wrote /content/drive/MyDrive/AmStories_text/AS_Text_Analyzed_May25_SW_1844.csv
  • Load JSONs: 24.8s | Sample Est.: 1.5 min
  • Extract: 0.1s | Postprocess: 9.1s
  • Total: 0.6 min | Rate: 64.58 articles/sec
[Completed year 1844] RAM used:  20.30 GB / 54.75 GB
[Completed year 1844] Disk free: 200.63 GB / 242.49 GB
------------------------------------------------------------
Done


In [8]:
# import os
# print(os.cpu_count())
