### Remote Index Strategy

This notebook always scans the full set of GOV.UK decisions for the selected `MODE` (Employment Appeal Tribunal for `"EAT"`, Employment Tribunal for `"ET"`).

If the remote index for the selected mode (`remote_index_eat.json` or `remote_index_et.json`) already exists:
- it is treated as a cache, not a source of truth
- previously indexed slugs are skipped
- only new or missing decisions are fetched and HEAD-checked

This design ensures:
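- crash-safe resume (progress is checkpointed to disk every `CHECKPOINT_EVERY` newly fetched records)
- detection of newly added decisions
- resilience to GOV.UK reordering or backfills (records are keyed by slug, so anything an offset-paginated scan misses is picked up on the next full scan)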

The Search API scan is cheap; Content API fetches and PDF HEAD requests are the expensive steps and are only performed for unseen decisions.


In [2]:
from pathlib import Path
from urllib.parse import urljoin, urlparse, quote
import json

from eat_remote_index import (
    HttpClient,
    GOVUK,
    write_json,
    _pick_pdf_from_content_api,   # returns first PDF (fine for now)
)

# =========================
# SINGLE SELECTOR (HAMSTER SAFE)
# =========================
# MODE picks which GOV.UK decision listing to index; the Search API filter,
# slug prefix and output file names below are all derived from it via CONFIG.
MODE = "EAT"   # <-- set to "EAT" or "ET" ONLY

if MODE not in {"EAT", "ET"}:
    raise ValueError("MODE must be 'EAT' or 'ET'")

# Per-mode settings:
#   doc_type  - passed as `filter_document_type` to the GOV.UK Search API
#   base_path - URL path prefix that slug_from_url strips to derive the slug
#   out_name  - final remote index file (under OUT_DIR)
#   tmp_name  - staging file written first, then renamed over out_name
CONFIG = {
    "EAT": {
        "doc_type": "employment_appeal_tribunal_decision",
        "base_path": "/employment-appeal-tribunal-decisions/",
        "out_name": "remote_index_eat.json",
        "tmp_name": "remote_index_eat.tmp.json",
    },
    "ET": {
        "doc_type": "employment_tribunal_decision",
        "base_path": "/employment-tribunal-decisions/",
        "out_name": "remote_index_et.json",
        "tmp_name": "remote_index_et.tmp.json",
    },
}[MODE]

# =========================
# MANIFEST PATHS (HARD, EXPLICIT)
# =========================
# Manifests are scoped per mode: <BASE_MANIFESTS>/<MODE>/<out_name>.
BASE_MANIFESTS = Path("/home/hello/Appeal_reader/manifests").resolve()
OUT_DIR = (BASE_MANIFESTS / MODE).resolve()
OUT_DIR.mkdir(parents=True, exist_ok=True)

# Both files live in OUT_DIR, so replacing one with the other is a
# same-directory rename.
OUT_PATH = OUT_DIR / CONFIG["out_name"]
TMP_PATH = OUT_DIR / CONFIG["tmp_name"]

# Number of newly fetched records between on-disk checkpoints.
CHECKPOINT_EVERY = 200
# Caps how many search results are iterated (already-indexed ones count too).
MAX_ITEMS = None  # set to 200 for smoke test

# NOTE(review): retry/backoff/rate-limit semantics of these arguments live in
# eat_remote_index.HttpClient — confirm there before tuning.
client = HttpClient(timeout=30.0, max_retries=4, backoff_base=0.8, min_delay=0.15)

def atomic_checkpoint(obj, out_path: Path, tmp_path: Path):
    """Persist ``obj`` by staging it in ``tmp_path`` and renaming over ``out_path``.

    The payload is written by ``write_json`` (from eat_remote_index) to the
    staging file only, so an interrupted write leaves the previous
    ``out_path`` untouched. ``Path.replace`` then overwrites any existing
    target; when both paths share a directory (as OUT_PATH/TMP_PATH do) the
    rename is atomic on POSIX.
    """
    staged = tmp_path
    write_json(staged, obj)
    staged.replace(out_path)
    return None

def slug_from_url(url: str, base_path: str) -> str:
    """Derive a decision slug from its page URL.

    The URL's path (trailing slashes removed) is cut after the *last*
    occurrence of ``base_path``; if ``base_path`` does not occur, the whole
    path is used. Surrounding slashes are stripped from the result.
    These slugs are the keys of the persisted remote index, so this mapping
    must stay stable across runs.
    """
    trimmed = urlparse(url).path.rstrip("/")
    _head, _sep, tail = trimmed.rpartition(base_path)
    return tail.strip("/")

def iter_search_results_with_tqdm_local(client, *, doc_type: str, count: int = 200, max_items=None, order: str = "-public_timestamp"):
    """Yield GOV.UK Search API results for ``doc_type``, page by page.

    Pages are requested with ``start``/``count`` offsets in ``order`` until a
    page comes back empty, the reported ``total`` is reached, or
    ``max_items`` results have been yielded (``max_items <= 0`` yields
    nothing and makes no request).

    A tqdm progress bar is created after the first response, so its total is
    known. If tqdm is not installed, a no-op bar is used instead. The bar is
    closed on every exit path: exhaustion, the ``max_items`` cut-off, an
    error, or the consumer abandoning the generator.

    Args:
        client: object with ``get_json(url) -> dict``.
        doc_type: value for ``filter_document_type``.
        count: page size requested per call.
        max_items: optional cap on the number of results yielded.
        order: Search API sort order.
    """
    try:
        from tqdm.auto import tqdm
    except Exception:
        tqdm = None

    class _NullBar:
        """Stand-in progress bar used when tqdm is unavailable."""

        def update(self, n=1):
            pass

        def close(self):
            pass

    def _make_bar(total):
        # The previous fallback `tqdm(x, **kwargs)` could not be called with
        # only keyword arguments and returned None; use a real no-op bar instead.
        if tqdm is None:
            return _NullBar()
        return tqdm(total=total, desc=f"{MODE} remote index", unit="doc")

    if max_items is not None and max_items <= 0:
        return

    start = 0
    yielded = 0
    pbar = None

    try:
        while True:
            url = (
                f"{GOVUK}/api/search.json"
                f"?filter_document_type={quote(doc_type)}"
                f"&order={quote(order)}"
                f"&count={count}"
                f"&start={start}"
            )
            payload = client.get_json(url)
            results = payload.get("results") or []
            if not results:
                return

            total = payload.get("total")
            if pbar is None:
                pbar = _make_bar(total)

            for r in results:
                yield r
                yielded += 1
                pbar.update(1)
                if max_items is not None and yielded >= max_items:
                    return

            start += len(results)
            if isinstance(total, int) and start >= total:
                return
    finally:
        if pbar is not None:
            pbar.close()

# =========================
# RESUME
# =========================
# An existing index is a cache: slugs already in it are skipped, so only
# unseen decisions cost Content API + PDF HEAD requests.
if OUT_PATH.exists():
    remote_index = json.loads(OUT_PATH.read_text(encoding="utf-8"))
    if not isinstance(remote_index, dict):
        raise TypeError(f"[{MODE}] remote_index must be a dict, got: {type(remote_index)}")
    print(f"[{MODE}] Resuming: loaded {len(remote_index)} records from {OUT_PATH}")
else:
    remote_index = {}
    print(f"[{MODE}] Starting fresh index -> {OUT_PATH}")

processed = 0  # records newly fetched in this run
skipped = 0    # search results whose slug was already indexed

try:
    for r in iter_search_results_with_tqdm_local(client, doc_type=CONFIG["doc_type"], max_items=MAX_ITEMS):
        link = r.get("link")
        if not link:
            continue

        decision_url = urljoin(GOVUK, link)
        slug = slug_from_url(decision_url, CONFIG["base_path"])

        if slug in remote_index:
            skipped += 1
            continue

        # Content API URL = "/api/content" + the decision's page path.
        content_url = urljoin(GOVUK, "/api/content" + link)
        content = client.get_json(content_url)

        title = (content.get("title") or r.get("title") or "").strip()
        published_date = content.get("public_updated_at") or r.get("public_timestamp")

        pdf_url = _pick_pdf_from_content_api(content)
        pdf_filename = Path(urlparse(pdf_url).path).name if pdf_url else None

        rec = {
            "slug": slug,
            "decision_page_url": decision_url,
            "title": title,
            "decision_date": None,
            "published_date": published_date,
            "pdf_url": pdf_url,
            "pdf_filename": pdf_filename,
            "pdf_etag": None,
            "pdf_last_modified": None,
            "pdf_content_length": None,
            "head_error": None,
        }

        if pdf_url:
            # HEAD only: capture change-detection headers without downloading.
            h = client.head(pdf_url)
            rec["pdf_etag"] = h.headers.get("ETag")
            rec["pdf_last_modified"] = h.headers.get("Last-Modified")
            rec["pdf_content_length"] = h.headers.get("Content-Length")
            # NOTE(review): presumably set by HttpClient.head on failure — confirm.
            rec["head_error"] = h.headers.get("x-head-error")

        remote_index[slug] = rec
        processed += 1

        if processed % CHECKPOINT_EVERY == 0:
            atomic_checkpoint(remote_index, OUT_PATH, TMP_PATH)
            print(f"[{MODE}] checkpoint: processed={processed} total={len(remote_index)} skipped={skipped}")
finally:
    # Always persist, including when the scan aborts (network error,
    # KeyboardInterrupt). Otherwise up to CHECKPOINT_EVERY - 1 records fetched
    # since the last periodic checkpoint would be lost and re-fetched on resume.
    atomic_checkpoint(remote_index, OUT_PATH, TMP_PATH)

print(f"[{MODE}] done: wrote {len(remote_index)} records to {OUT_PATH} (processed={processed}, skipped={skipped})")
print(f"[{MODE}] manifests folder: {OUT_DIR}")


[EAT] Starting fresh index -> /home/hello/Appeal_reader/manifests/EAT/remote_index_eat.json


EAT remote index:   0%|          | 0/2480 [00:00<?, ?doc/s]

[EAT] checkpoint: processed=200 total=200 skipped=0
[EAT] checkpoint: processed=400 total=400 skipped=0
[EAT] checkpoint: processed=600 total=600 skipped=0
[EAT] checkpoint: processed=800 total=800 skipped=0
[EAT] checkpoint: processed=1000 total=1000 skipped=0
[EAT] checkpoint: processed=1200 total=1200 skipped=0
[EAT] checkpoint: processed=1400 total=1400 skipped=0
[EAT] checkpoint: processed=1600 total=1600 skipped=0
[EAT] checkpoint: processed=1800 total=1800 skipped=0
[EAT] checkpoint: processed=2000 total=2000 skipped=0
[EAT] checkpoint: processed=2200 total=2200 skipped=0
[EAT] checkpoint: processed=2400 total=2400 skipped=0
[EAT] done: wrote 2480 records to /home/hello/Appeal_reader/manifests/EAT/remote_index_eat.json (processed=2480, skipped=0)
[EAT] manifests folder: /home/hello/Appeal_reader/manifests/EAT


### Why this step is fast (and why that’s correct)

This phase operates entirely on **local state** and **metadata** only.

What happens here:
- local PDFs are discovered via filesystem walk
- only file metadata is read (size, mtime)
- no PDF contents are read
- no hashing is performed (`compute_sha256=False`)
- results (`local_index.json`, `delta.json`) are each written once via `dc.write_json`

What does *not* happen here:
- no network calls
- no PDF downloads
- no content parsing
- no OCR
- no LLM usage

As a result, this step should complete in seconds even for thousands of PDFs.
Slowness here would indicate a bug or an unnecessary heavy operation.

This is intentional: indexing and reconciliation should always be cheap;
expensive work is deferred to later pipeline stages.


In [7]:
from pathlib import Path
import json
import importlib

import delta_calc as dc
importlib.reload(dc)

# =========================
# SINGLE SELECTOR (HAMSTER SAFE)
# =========================
MODE = "EAT"   # <-- set to "EAT" or "ET" ONLY

if MODE not in {"EAT", "ET"}:
    raise ValueError("MODE must be 'EAT' or 'ET'")

# =========================
# HARD PATHS (EXPLICIT)
# =========================
BASE_MANIFESTS = Path("/home/hello/Appeal_reader/manifests").resolve()
OUT_DIR = (BASE_MANIFESTS / MODE).resolve()
OUT_DIR.mkdir(parents=True, exist_ok=True)

LOCAL_DIRS = {
    "EAT": Path("/media/hello/Tribunals/EAT_Appeals").resolve(),
    "ET":  Path("/media/hello/Tribunals/ET_Cases").resolve(),
}
LOCAL_PDF_DIR = LOCAL_DIRS[MODE]

if not LOCAL_PDF_DIR.exists():
    raise FileNotFoundError(f"[{MODE}] LOCAL_PDF_DIR does not exist: {LOCAL_PDF_DIR}")
if not LOCAL_PDF_DIR.is_dir():
    raise NotADirectoryError(f"[{MODE}] LOCAL_PDF_DIR is not a directory: {LOCAL_PDF_DIR}")

print(f"[{MODE}] OUT_DIR       = {OUT_DIR}")
print(f"[{MODE}] LOCAL_PDF_DIR = {LOCAL_PDF_DIR}")

# =========================
# LOAD REMOTE INDEX (SELF-CONTAINED)
# =========================
# Read from disk rather than reusing a kernel variable, so this cell does not
# depend on what earlier cells left behind.
REMOTE_INDEX_FILE = {
    "EAT": OUT_DIR / "remote_index_eat.json",
    "ET":  OUT_DIR / "remote_index_et.json",
}[MODE]

if not REMOTE_INDEX_FILE.exists():
    raise FileNotFoundError(
        f"[{MODE}] Remote index not found: {REMOTE_INDEX_FILE}\n"
        f"Run the remote index builder cell first to create it."
    )

remote_index = json.loads(REMOTE_INDEX_FILE.read_text(encoding="utf-8"))
if not isinstance(remote_index, dict):
    raise TypeError(f"[{MODE}] remote_index must be a dict, got: {type(remote_index)}")

print(f"[{MODE}] remote_index loaded: {len(remote_index)} records from {REMOTE_INDEX_FILE}")

# =========================
# LOCAL INDEX
# =========================
local_index = dc.scan_local_pdfs(
    LOCAL_PDF_DIR,
    recursive=True,
    compute_sha256=False,   # True only if you want heavy certainty
)

dc.write_json(OUT_DIR / "local_index.json", local_index)
print(f"[{MODE}] local_index written:", OUT_DIR / "local_index.json")
print(f"[{MODE}] local pdf count:", len(local_index))

# =========================
# DELTA
# =========================
delta = dc.compute_delta(remote_index, local_index)

dc.write_json(OUT_DIR / "delta.json", delta)
print(f"[{MODE}] delta written:", OUT_DIR / "delta.json")

# Jupyter displays only a cell's *last* expression. A bare `delta["counts"]`
# here would be evaluated and discarded, so print the counts explicitly.
print(f"[{MODE}] delta counts:", delta.get("counts"))

# Preview up to 20 entries per category (full lists are in delta.json).
missing = (delta.get("missing") or [])[:20]
changed = (delta.get("changed") or [])[:20]
orphaned = (delta.get("orphaned") or [])[:20]

missing, changed, orphaned


[EAT] OUT_DIR       = /home/hello/Appeal_reader/manifests/EAT
[EAT] LOCAL_PDF_DIR = /media/hello/Tribunals/EAT_Appeals
[EAT] remote_index loaded: 2480 records from /home/hello/Appeal_reader/manifests/EAT/remote_index_eat.json
[EAT] local_index written: /home/hello/Appeal_reader/manifests/EAT/local_index.json
[EAT] local pdf count: 2487
[EAT] delta written: /home/hello/Appeal_reader/manifests/EAT/delta.json


([],
 [],
 [{'filename': 'Digital_Communication_Systems_Ltd_v_Mr_C_Scully_UKEAT_0182_19_LA.pdf',
   'path': '/media/hello/Tribunals/EAT_Appeals/Digital_Communication_Systems_Ltd_v_Mr_C_Scully_UKEAT_0182_19_LA.pdf'},
  {'filename': 'Dr_Alaa_Jalaal_v_Grampian_Health_Board_Tayside_Health_Board_NHS_Education_For_Scotland__2024__EAT_97.pdf',
   'path': '/media/hello/Tribunals/EAT_Appeals/Dr_Alaa_Jalaal_v_Grampian_Health_Board_Tayside_Health_Board_NHS_Education_For_Scotland__2024__EAT_97.pdf'},
  {'filename': 'MITIE_Property_Services_UK_Ltd__v__Mr_B_Bennett_and_20_Others_and_Others_UKEATS_0023_19_SS_.pdf',
   'path': '/media/hello/Tribunals/EAT_Appeals/MITIE_Property_Services_UK_Ltd__v__Mr_B_Bennett_and_20_Others_and_Others_UKEATS_0023_19_SS_.pdf'},
  {'filename': 'Mr_John_J_Campbell_v_1__Sheffield_Teaching_North_Hospitals_NHS_Foundation_Trust_2__Mr_Wesley_Hammond__2025__EAT_42.pdf',
   'path': '/media/hello/Tribunals/EAT_Appeals/Mr_John_J_Campbell_v_1__Sheffield_Teaching_North_Hospitals_NHS

### Delta Definitions for above results (Remote vs Local)

This pipeline compares the **remote index** (source of truth) against the **local cache**.

**missing**  
Present in the remote index but absent locally.  
→ New or previously undownloaded decisions.  
→ Action: download.

**changed**  
Present both remotely and locally, but metadata differs (e.g. file size).  
→ Upstream PDF was silently replaced or corrected.  
→ Action: re-download and reprocess.

**orphaned**  
Present locally but no longer present in the remote index.  
→ The EAT site appears to operate a **sliding publication window**, where older
decisions may be removed from public listings.  
→ Orphaned does *not* mean invalid.  
→ Action: retain and quarantine; do not delete automatically.

This design preserves completeness while remaining resilient to upstream changes.


In [6]:
from pathlib import Path
import importlib
import json

import eat_downloader as ed
importlib.reload(ed)

# =========================
# SINGLE SELECTOR (HAMSTER SAFE)
# =========================
MODE = "EAT"   # <-- set to "EAT" or "ET" ONLY

if MODE not in {"EAT", "ET"}:
    raise ValueError("MODE must be 'EAT' or 'ET'")

# =========================
# HARD PATHS (EXPLICIT)
# =========================
BASE_MANIFESTS = Path("/home/hello/Appeal_reader/manifests").resolve()
OUT_DIR = (BASE_MANIFESTS / MODE).resolve()
OUT_DIR.mkdir(parents=True, exist_ok=True)

PDF_DIRS = {
    "EAT": Path("/media/hello/Tribunals/EAT_Appeals").resolve(),
    "ET":  Path("/media/hello/Tribunals/ET_Cases").resolve(),
}
CASES_DIR = PDF_DIRS[MODE]
CASES_DIR.mkdir(parents=True, exist_ok=True)

print(f"[{MODE}] OUT_DIR    = {OUT_DIR}")
print(f"[{MODE}] CASES_DIR = {CASES_DIR}")

# =========================
# LOAD DELTA (SELF-CONTAINED)
# =========================
# Load the delta for MODE from disk instead of trusting a `delta` variable left
# in the kernel by another cell. That variable may be stale (from a previous
# run) or belong to the other MODE, and would drive downloads into the wrong
# folder. A missing file is therefore an error, not a warning.
delta_path = OUT_DIR / "delta.json"
if not delta_path.exists():
    raise FileNotFoundError(
        f"[{MODE}] delta.json not found in OUT_DIR -> {delta_path}\n"
        f"Run the delta cell for MODE={MODE} before downloading."
    )
delta = json.loads(delta_path.read_text(encoding="utf-8"))
if not isinstance(delta, dict):
    raise TypeError(f"[{MODE}] delta must be a dict, got: {type(delta)}")
print(f"[{MODE}] OK: delta.json loaded -> {delta_path}")

# =========================
# DOWNLOAD (missing + changed)
# =========================
results = ed.download_missing_and_changed(
    delta=delta,
    eat_dir=CASES_DIR,        # target folder for PDFs
    out_dir=OUT_DIR,          # manifests folder (checkpoint saved here)
    archive_changed=True,     # changed => archive old then write new
    max_items=10000           # set e.g. 50 for a test
)

results.keys(), len(results["downloaded"]), len(results["archived"]), len(results["failed"])


[EAT] OUT_DIR    = /home/hello/Appeal_reader/manifests/EAT
[EAT] CASES_DIR = /media/hello/Tribunals/EAT_Appeals
[EAT] OK: delta.json found -> /home/hello/Appeal_reader/manifests/EAT/delta.json


Downloading EAT PDFs:   0%|          | 0/4 [00:00<?, ?file/s]

(dict_keys(['downloaded', 'archived', 'failed']), 175, 2, 0)

## Phase 1 – Fast PDF Sniffing & Regex Indexing (CPU-Parallel)

This script performs a **high-throughput first-pass classification** of ET/EAT PDF decisions.  
Its goal is to **quickly label cases by legal topic** (e.g. Unfair Dismissal, Whistleblowing) **without fully parsing or OCR-ing documents**.

---

### What the script does

1. **Selects dataset**
   - Controlled by a single `MODE` flag (`"EAT"` or `"ET"`).
   - Points to the corresponding local PDF directory.

2. **Scans all PDFs recursively**
   - Deterministic ordering (`sorted(root.rglob("*.pdf"))`) for reproducibility.

3. **Extracts text from first N pages only**
   - Default: first **2 pages** per PDF.
   - Uses **PyMuPDF (fitz)** for fast, reliable text extraction.
   - Avoids full-document parsing for speed and robustness.

4. **Applies compiled regex rules**
   - Labels cases such as:
     - Unfair dismissal (ERA 1996 s98)
     - Constructive dismissal
     - Whistleblowing / PIDA
     - Discrimination (EqA 2010)
     - Redundancy
     - TUPE
   - Regex is **CPU-cheap**; PDF decoding is the dominant cost.

5. **Runs in true parallel**
   - Uses `ProcessPoolExecutor` (multiprocessing).
   - Fully bypasses the Python GIL.
   - Runs one worker per logical CPU (`os.cpu_count()`; 48 on the 24-core Threadripper used here).

6. **Caches results**
   - Cache is keyed by file path and validated by `(size, mtime)`, with mtime truncated to whole seconds.
   - Unchanged PDFs are **skipped instantly** on reruns.
   - Makes incremental re-indexing cheap.

7. **Writes clean outputs**
   - `labels__{MODE}.jsonl`  
     One row per PDF with labels and basic metadata.
   - `failures__{MODE}.jsonl`  
     PDFs with no extractable text or parse errors (for OCR/LLM later).
   - `stats__{MODE}.json`  
     Aggregate counts, timings, and hit distribution.
   - `cache__{MODE}.json`  
     Incremental processing state.

---

### What it deliberately does *not* do

- ❌ No full-PDF parsing  
- ❌ No OCR  
- ❌ No embeddings / FAISS  
- ❌ No GPU usage  

Those are **Phase 2+**, run only on the shortlisted subset.

---

### Why this design

- **Speed first**: 130k+ PDFs become tractable in hours, not days.
- **Low risk**: text from the first few pages is high-signal and cheap.
- **Fail-loud**: bad PDFs are isolated, not silently skipped.
- **Composable**: output feeds directly into semantic indexing or LLM pipelines.

---

### When to move to Phase 2

After this pass, use the labelled subset to:
- build FAISS indexes,
- run semantic deduplication,
- apply LLM-based legal reasoning.

This script is the **brute-force front gate**, not the courtroom.


In [8]:
from __future__ import annotations

import os
import re
import json
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Any, List, Tuple, Optional
from concurrent.futures import ProcessPoolExecutor, as_completed

# =========================
# SINGLE SELECTOR (HAMSTER SAFE)
# =========================
MODE = "EAT"  # "EAT" or "ET"
if MODE not in {"EAT", "ET"}:
    raise ValueError("MODE must be 'EAT' or 'ET'")

# =========================
# HARD PATHS (EXPLICIT)
# =========================
# Local PDF folders to sniff (same folders the downloader writes into).
PDF_DIRS = {
    "EAT": Path("/media/hello/Tribunals/EAT_Appeals").resolve(),
    "ET":  Path("/media/hello/Tribunals/ET_Cases").resolve(),
}
PDF_ROOT = PDF_DIRS[MODE]
if not PDF_ROOT.exists():
    raise FileNotFoundError(f"[{MODE}] PDF_ROOT does not exist: {PDF_ROOT}")
if not PDF_ROOT.is_dir():
    raise NotADirectoryError(f"[{MODE}] PDF_ROOT is not a directory: {PDF_ROOT}")

# =========================
# OUTPUT DIR (MODE-SCOPED)
# =========================
# NOTE: this rebinds the kernel-global OUT_DIR to the *indexes* folder; earlier
# cells bind OUT_DIR to the *manifests* folder. Re-run those cells (they
# redefine it) rather than relying on whichever value is live.
BASE_OUT_DIR = Path("/home/hello/Appeal_reader/indexes").resolve()

OUT_DIR = (BASE_OUT_DIR / MODE).resolve()
OUT_DIR.mkdir(parents=True, exist_ok=True)


CACHE_PATH = OUT_DIR / f"cache__{MODE}.json"
LABELS_PATH = OUT_DIR / f"labels__{MODE}.jsonl"
FAILS_PATH = OUT_DIR / f"failures__{MODE}.jsonl"
STATS_PATH = OUT_DIR / f"stats__{MODE}.json"

# =========================
# TUNING
# =========================
FIRST_N_PAGES = 2            # sniff only first pages
MAX_WORKERS = max(1, os.cpu_count() or 1)  # logical CPUs (48 on the 24-core TR)
CHUNK_PRINT_EVERY = 2000     # progress prints

# =========================
# REGEX RULES (FAST TRIAGE)
# =========================
# Each label is an independent, case-insensitive search over the sniffed text;
# a PDF may hit several labels (or none).
# NOTE(review): in UNFAIR_DISMISSAL the last alternative `s.98(4)` never changes
# the result. Any text it matches is already matched by the preceding `s.?98`
# alternative, and its trailing \b after ")" additionally requires a word
# character to follow.
# NOTE(review): WHISTLEBLOWING_PIDA's `\bwhistleblow` is an intentional prefix
# match (whistleblower/whistleblowing) but misses the hyphenated "whistle-blowing".
RULES: Dict[str, re.Pattern] = {
    "UNFAIR_DISMISSAL": re.compile(r"\bunfair dismissal\b|\bera\s*1996\b|\bsection\s*98\b|\bs\.?\s*98\b|\bs\.?\s*98\s*\(\s*4\s*\)\b", re.I),
    "CONSTRUCTIVE_DISMISSAL": re.compile(r"\bconstructive dismissal\b", re.I),
    "WRONGFUL_DISMISSAL": re.compile(r"\bwrongful dismissal\b", re.I),
    "WHISTLEBLOWING_PIDA": re.compile(r"\bwhistleblow|\bprotected disclosure\b|\bpida\b|\bpublic interest disclosure\b", re.I),
    "DISCRIMINATION": re.compile(r"\bequality act\b|\beqa\s*2010\b|\bdiscrimination\b|\bharass\w*\b|\bvictimisa\w*\b", re.I),
    "REDUNDANCY": re.compile(r"\bredundan\w*\b", re.I),
    "TUPE": re.compile(r"\btupe\b|\btransfer of undertakings\b", re.I),
}

# =========================
# HELPERS
# =========================
def load_cache(path: Path) -> Dict[str, Any]:
    """Load the incremental-processing cache written by ``save_cache``.

    Returns ``{}`` when the file is missing, unreadable, not valid JSON, or
    not a JSON object. The caller then simply re-processes every PDF.
    For the unreadable/corrupt cases a warning is printed, so a lost cache
    (and the resulting full re-scan) is visible instead of silent.
    """
    if not path.exists():
        return {}
    try:
        obj = json.loads(path.read_text(encoding="utf-8"))
    except Exception as e:
        # Best-effort by design: a bad cache only costs a re-scan, never a crash.
        print(f"WARNING: ignoring unreadable cache {path}: {e!r}")
        return {}
    if not isinstance(obj, dict):
        print(f"WARNING: ignoring cache {path}: expected a JSON object, got {type(obj).__name__}")
        return {}
    return obj

def save_cache(path: Path, cache: Dict[str, Any]) -> None:
    """Write ``cache`` as UTF-8 JSON to ``path`` via a sibling ``.tmp`` file.

    The JSON text is written to ``path.with_suffix(".tmp")`` first and then
    renamed over ``path``, so an interrupted write never truncates the
    existing cache. The rename is atomic on POSIX because both files share a
    directory. Non-ASCII characters are kept as-is (``ensure_ascii=False``).
    """
    staging = path.with_suffix(".tmp")
    payload = json.dumps(cache, ensure_ascii=False)
    staging.write_text(payload, encoding="utf-8")
    staging.replace(path)

def iter_pdfs(root: Path) -> List[Path]:
    """Return every path under ``root`` (recursively) matching ``*.pdf``.

    The list is sorted so runs are reproducible and outputs are written in a
    stable order. The glob is case-sensitive on case-sensitive filesystems.
    """
    found = list(root.rglob("*.pdf"))
    found.sort()
    return found

def file_sig(p: Path) -> Tuple[int, int]:
    """Return a cheap change signature ``(size_bytes, mtime_seconds)`` for ``p``.

    The modification time is truncated to whole seconds, so two edits that
    leave the size unchanged within the same second are indistinguishable.
    """
    info = p.stat()
    size = int(info.st_size)
    mtime = int(info.st_mtime)
    return size, mtime

def classify_text(txt: str) -> Dict[str, bool]:
    """Map every label in RULES to whether its regex matches anywhere in ``txt``.

    Empty or falsy text yields all-False labels without running any regex.
    Keys come out in RULES order.
    """
    verdicts: Dict[str, bool] = {}
    for label, pattern in RULES.items():
        verdicts[label] = bool(txt) and pattern.search(txt) is not None
    return verdicts

def extract_first_pages_text(pdf_path: str, first_n_pages: int) -> str:
    """Return the plain text of the first ``first_n_pages`` pages of a PDF.

    Uses PyMuPDF (``fitz``) because it is fast and reliable for text sniffing.
    Pages are joined with newlines. A document shorter than
    ``first_n_pages`` contributes all its pages, and a non-positive
    ``first_n_pages`` yields ``""``.

    The document is closed in a ``finally``. A page that fails to load or
    extract therefore no longer leaks the open document inside the reused
    process-pool worker. The exception still propagates to the caller.
    """
    import fitz  # type: ignore

    doc = fitz.open(pdf_path)
    try:
        n = min(first_n_pages, doc.page_count)
        return "\n".join(
            doc.load_page(i).get_text("text") or "" for i in range(n)
        )
    finally:
        doc.close()

def worker(pdf_path: str, first_n_pages: int) -> Dict[str, Any]:
    """Sniff one PDF and return a JSON-serialisable result record.

    Runs inside the process pool. It never raises: any exception during
    extraction or classification is turned into a failure record with
    ``text_ok=False``, all-False labels, and the ``repr`` of the error.
    Success records carry ``text_chars`` instead of ``error``. Both carry
    the wall-clock ``elapsed_s`` rounded to 4 decimals.
    """
    started = time.time()
    try:
        text = extract_first_pages_text(pdf_path, first_n_pages)
        labels = classify_text(text)
    except Exception as exc:
        return {
            "path": pdf_path,
            "labels": {name: False for name in RULES},
            "text_ok": False,
            "error": repr(exc),
            "elapsed_s": round(time.time() - started, 4),
        }
    return {
        "path": pdf_path,
        "labels": labels,
        "text_ok": True,
        "text_chars": len(text),
        "elapsed_s": round(time.time() - started, 4),
    }

# =========================
# MAIN
# =========================
def main() -> None:
    """Sniff every PDF under PDF_ROOT, label it with RULES, and write outputs.

    Steps:
      1. Load the incremental cache and list PDFs (sorted, via ``iter_pdfs``).
      2. Reuse cached results whose stored ``(size, mtime)`` signature still
         matches; queue every other PDF.
      3. Process queued PDFs in a ``ProcessPoolExecutor`` via ``worker``.
         The cache is updated as each result arrives and checkpointed to disk
         every CHUNK_PRINT_EVERY completions.
      4. Rewrite LABELS_PATH (all PDFs) and FAILS_PATH (``text_ok`` false)
         from scratch in PDF order, then save the cache and STATS_PATH.

    If a pool process dies, ``fut.result()`` raises. Thanks to the periodic
    checkpoint, results finished before that point are reused on the next run.
    """
    cache = load_cache(CACHE_PATH)

    pdfs = iter_pdfs(PDF_ROOT)
    total = len(pdfs)
    print(f"[{MODE}] PDF_ROOT={PDF_ROOT}")
    print(f"[{MODE}] PDFs found: {total:,}")
    print(f"[{MODE}] MAX_WORKERS={MAX_WORKERS} FIRST_N_PAGES={FIRST_N_PAGES}")
    print(f"[{MODE}] OUT_DIR={OUT_DIR}")

    # Build task list (incremental). Each file is stat'ed once, *before*
    # extraction, and that signature is what gets cached. Re-stat'ing after
    # extraction would pair a post-edit signature with a pre-edit result if
    # the file changed mid-run, hiding the change from future runs.
    sigs: Dict[str, List[int]] = {}
    tasks: List[Path] = []
    reused = 0
    for p in pdfs:
        key = str(p)
        sig = list(file_sig(p))  # list, because the JSON cache stores lists
        sigs[key] = sig
        prev = cache.get(key)
        if prev and prev.get("sig") == sig:
            reused += 1
        else:
            tasks.append(p)

    print(f"[{MODE}] Cache reused: {reused:,}")
    print(f"[{MODE}] To process:   {len(tasks):,}")

    # Outputs are rewritten from scratch below, so accumulate every result
    # (cached + new) first; this keeps the JSONL files free of duplicates.
    results: Dict[str, Any] = {}

    # Pre-load cached results. Entries for re-queued (changed) files are
    # overwritten as their fresh results arrive.
    for p in pdfs:
        key = str(p)
        prev = cache.get(key)
        if prev and "result" in prev:
            results[key] = prev["result"]

    processed = 0
    t_start = time.time()

    if tasks:
        with ProcessPoolExecutor(max_workers=MAX_WORKERS) as ex:
            futs = {ex.submit(worker, str(p), FIRST_N_PAGES): str(p) for p in tasks}
            for fut in as_completed(futs):
                path = futs[fut]
                res = fut.result()
                results[path] = res
                cache[path] = {"sig": sigs[path], "result": res}
                processed += 1

                if (processed % CHUNK_PRINT_EVERY) == 0:
                    elapsed = time.time() - t_start
                    print(f"[{MODE}] processed {processed:,}/{len(tasks):,} new in {elapsed:.1f}s")
                    # Checkpoint so a crash later in a long run (e.g. a broken
                    # process pool) does not discard work already finished.
                    save_cache(CACHE_PATH, cache)

    # Write JSONL fresh (deduped, deterministic order)
    ok_count = 0
    hit_counts = {k: 0 for k in RULES}
    text_fail = 0

    with LABELS_PATH.open("w", encoding="utf-8") as f_out, FAILS_PATH.open("w", encoding="utf-8") as f_fail:
        for p in pdfs:
            key = str(p)
            res = results.get(key)
            if not res:
                # Shouldn't happen, but guard
                continue

            if not res.get("text_ok"):
                text_fail += 1
                f_fail.write(json.dumps(res, ensure_ascii=False) + "\n")
            else:
                ok_count += 1

            labels = res.get("labels") or {}
            for k in RULES:
                if labels.get(k):
                    hit_counts[k] += 1

            # Every PDF (including failures) gets one row in the labels file.
            f_out.write(json.dumps(res, ensure_ascii=False) + "\n")

    # Save cache + stats
    save_cache(CACHE_PATH, cache)

    stats = {
        "mode": MODE,
        "pdf_root": str(PDF_ROOT),
        "total_pdfs": total,
        "text_ok": ok_count,
        "text_fail": text_fail,
        "hits": hit_counts,
        "first_n_pages": FIRST_N_PAGES,
        "max_workers": MAX_WORKERS,
        "labels_path": str(LABELS_PATH),
        "fails_path": str(FAILS_PATH),
        "cache_path": str(CACHE_PATH),
        "elapsed_s": round(time.time() - t_start, 2),
    }
    STATS_PATH.write_text(json.dumps(stats, ensure_ascii=False, indent=2), encoding="utf-8")

    print(f"[{MODE}] DONE.")
    print(json.dumps(stats, ensure_ascii=False, indent=2))

# A Jupyter cell executes in the __main__ namespace, so this fires when the
# cell runs; the guard also lets the code run unchanged if saved as a script.
if __name__ == "__main__":
    main()


[EAT] PDF_ROOT=/media/hello/Tribunals/EAT_Appeals
[EAT] PDFs found: 2,487
[EAT] MAX_WORKERS=48 FIRST_N_PAGES=2
[EAT] OUT_DIR=/home/hello/Appeal_reader/indexes/EAT
[EAT] Cache reused: 2,483
[EAT] To process:   4
[EAT] DONE.
{
  "mode": "EAT",
  "pdf_root": "/media/hello/Tribunals/EAT_Appeals",
  "total_pdfs": 2487,
  "text_ok": 2486,
  "text_fail": 1,
  "hits": {
    "UNFAIR_DISMISSAL": 192,
    "CONSTRUCTIVE_DISMISSAL": 19,
    "WRONGFUL_DISMISSAL": 19,
    "WHISTLEBLOWING_PIDA": 60,
    "DISCRIMINATION": 358,
    "REDUNDANCY": 49,
    "TUPE": 14
  },
  "first_n_pages": 2,
  "max_workers": 48,
  "labels_path": "/home/hello/Appeal_reader/indexes/EAT/labels__EAT.jsonl",
  "fails_path": "/home/hello/Appeal_reader/indexes/EAT/failures__EAT.jsonl",
  "cache_path": "/home/hello/Appeal_reader/indexes/EAT/cache__EAT.json",
  "elapsed_s": 0.52
}


In [9]:
import polars as pl

# =========================
# SINGLE SELECTOR (HAMSTER SAFE)
# =========================
# Previously hardcoded to the EAT file, so switching MODE elsewhere silently
# still loaded EAT labels here.
MODE = "EAT"   # <-- set to "EAT" or "ET" ONLY

if MODE not in {"EAT", "ET"}:
    raise ValueError("MODE must be 'EAT' or 'ET'")

# Same layout the Phase 1 cell writes: <indexes>/<MODE>/labels__<MODE>.jsonl
path = f"/home/hello/Appeal_reader/indexes/{MODE}/labels__{MODE}.jsonl"

# One JSON object per PDF; "labels" is a nested object of boolean flags.
# NOTE(review): failure rows carry an extra "error" key and no "text_chars".
# With polars' default schema inference "error" may not become a column (it
# does not in the output below) — confirm whether it needs to be kept.
df = pl.read_ndjson(path)

# Flatten the "labels" struct into one boolean column per rule (appended at the
# end), then drop the struct itself.
label_cols = df.schema["labels"].fields  # list of Field
names = [f.name for f in label_cols]

df = df.with_columns([pl.col("labels").struct.field(n).alias(n) for n in names]).drop("labels")

df.head()


path,text_ok,text_chars,elapsed_s,UNFAIR_DISMISSAL,CONSTRUCTIVE_DISMISSAL,WRONGFUL_DISMISSAL,WHISTLEBLOWING_PIDA,DISCRIMINATION,REDUNDANCY,TUPE
str,bool,i64,f64,bool,bool,bool,bool,bool,bool,bool
"""/media/hello/Tribunals/EAT_App…",True,2749,0.1603,True,False,False,False,False,False,False
"""/media/hello/Tribunals/EAT_App…",True,3220,0.1379,True,False,False,True,False,False,False
"""/media/hello/Tribunals/EAT_App…",True,994,0.1234,False,False,False,False,False,False,False
"""/media/hello/Tribunals/EAT_App…",True,728,0.1229,False,False,False,False,False,False,False
"""/media/hello/Tribunals/EAT_App…",True,1035,0.1347,False,False,False,False,False,False,False
