# Corpus Quality Checks

This notebook inspects the processed corpus to verify text coverage, sidecar availability, and metadata consistency.

## Usage

Before running this notebook:

1. **Set the paper run identifier** (if your data is organized in run-specific subdirectories):
   - Option 1: Set environment variable: `export BTC_PAPER_RUN="cog_psych_2025_run01"`
   - Option 2: Edit the `PAPER_RUN` variable in the first code cell
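   - Option 3: Set `PAPER_RUN` to `None` in the first code cell (or export an empty `BTC_PAPER_RUN`) to use the base `data/processed/` and `data/raw/` directories; if nothing is set, the cell defaults to `cog_psych_2025_run01`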

2. **Ensure data is processed**: Run `python scripts/ingest_and_index.py --config configs/default.yaml` first

3. **Check paths**: The notebook will print the paths it's using - verify they're correct

## What This Notebook Checks

- **Manifest validation**: Verifies manifest.json exists and has expected structure
- **Text coverage**: Per-document character counts and approximate token counts (whitespace-separated words)
- **Sidecar availability**: Checks for `.pages.jsonl` files
- **Math/LaTeX content**: Detects mathematical notation density
- **Citation markers**: Finds inline citation patterns like [1], [2]
- **Language detection**: Identifies non-English or mixed-language documents
- **Metadata consistency**: Validates metadata.jsonl structure

In [None]:
from __future__ import annotations

import json
import os
from collections import Counter
from pathlib import Path
from typing import Any

import matplotlib.pyplot as plt
import pandas as pd
from IPython.display import display

# Optional dependency: when the project package is importable, its config
# loader is used further down to locate the data directories. USE_CONFIG
# records whether that import succeeded.
try:
    from beyond_the_cutoff.config import load_config
except ImportError:
    USE_CONFIG = False
else:
    USE_CONFIG = True


def _find_project_root(start: Path) -> Path:
    """Return the closest directory at or above *start* containing ``pyproject.toml``.

    The search checks ``start`` itself first and then each entry of
    ``start.parents`` in order. When no candidate holds the marker file,
    ``start`` is returned unchanged.
    """
    search_order = (start, *start.parents)
    return next(
        (folder for folder in search_order if (folder / "pyproject.toml").exists()),
        start,
    )


# Configuration: paper run identifier for this analysis.
# The BTC_PAPER_RUN environment variable takes precedence; when it is unset the
# default run name below is used. An empty value (or editing this to None)
# disables the run-specific subdirectory so the base data directories are used.
PAPER_RUN = os.environ.get("BTC_PAPER_RUN", "cog_psych_2025_run01") or None

PROJECT_ROOT = _find_project_root(Path.cwd().resolve())

# Use config if available, otherwise fall back to hardcoded paths
if USE_CONFIG:
    try:
        config = load_config()
        # Coerce to Path so the "/" joins below also work should the config
        # expose these locations as plain strings.
        base_processed = Path(config.paths.processed_data)
        base_raw = Path(config.paths.raw_data)
    except Exception as exc:
        # Best-effort: say why the config was ignored instead of failing
        # silently, then continue with the project-root fallback below.
        print(f"Warning: could not load project config ({exc}); using default data paths.")
        USE_CONFIG = False

if not USE_CONFIG:
    base_processed = PROJECT_ROOT / "data/processed"
    base_raw = PROJECT_ROOT / "data/raw"

# Construct paths with optional run subdirectory
if PAPER_RUN:
    PROCESSED_DIR = base_processed / PAPER_RUN
    RAW_DIR = base_raw / PAPER_RUN
else:
    PROCESSED_DIR = base_processed
    RAW_DIR = base_raw

DATA_QUALITY_DIR = PROJECT_ROOT / f"evaluation/results/data_quality/{PAPER_RUN or 'default'}"

# Manifest is typically at the root of processed_dir, not in a subdirectory
MANIFEST_PATH = PROCESSED_DIR / "manifest.json"
# If manifest is not found, try the parent directory (for flat structure)
if not MANIFEST_PATH.exists():
    alt_manifest = PROCESSED_DIR.parent / "manifest.json"
    if alt_manifest.exists():
        MANIFEST_PATH = alt_manifest

METADATA_JSONL = RAW_DIR / "metadata.jsonl"
SELECTION_LOG = RAW_DIR / "selection_log.jsonl"

# Echo the resolved locations so they can be checked before running the
# remaining cells.
print(f"Project root: {PROJECT_ROOT}")
print(f"Processed directory: {PROCESSED_DIR}")
print(f"Raw directory: {RAW_DIR}")
print(f"Manifest path: {MANIFEST_PATH}")
print(f"Manifest exists: {MANIFEST_PATH.exists()}")

plt.rcParams.update({"figure.figsize": (8, 4), "axes.grid": True})

In [None]:
import re


def load_manifest(path: Path) -> pd.DataFrame:
    """Load the processed-corpus manifest into a DataFrame, one row per document.

    Every dict found under the manifest's ``"documents"`` key becomes a row;
    entries that are not dicts are skipped. ``text_path`` and ``pages_path``
    are converted to ``Path`` objects, with relative values resolved against
    the module-level ``PROCESSED_DIR``. A missing or null ``text_path`` is
    treated as an empty relative path, and a missing or empty ``pages_path``
    becomes ``None``. For a non-empty manifest, a boolean
    ``has_pages_sidecar`` column records whether the sidecar exists on disk.

    Raises:
        FileNotFoundError: if ``path`` does not exist.
        ValueError: if the file is not valid JSON.
        TypeError: if the top-level JSON value is not an object, or its
            ``"documents"`` field is not a list.
    """
    if not path.exists():
        raise FileNotFoundError(
            f"Processed manifest missing at {path}\n"
            f"Run 'python scripts/ingest_and_index.py --config configs/default.yaml' first."
        )
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except json.JSONDecodeError as e:
        raise ValueError(f"Manifest file is not valid JSON: {e}") from e

    # A top-level array or scalar has no "documents" key; report that clearly
    # rather than letting ``.get`` fail with an AttributeError.
    if not isinstance(data, dict):
        raise TypeError("Manifest must be a JSON object with a 'documents' list")

    docs = data.get("documents", [])
    if not isinstance(docs, list):
        raise TypeError("Manifest documents field must be a list")

    def _resolve(raw: str) -> Path:
        # Manifests may store absolute paths or paths relative to PROCESSED_DIR.
        candidate = Path(raw)
        return candidate if candidate.is_absolute() else PROCESSED_DIR / candidate

    records: list[dict[str, Any]] = []
    for entry in docs:
        if not isinstance(entry, dict):
            continue
        record = entry.copy()
        # ``or ""`` makes an explicit null behave like a missing key instead of
        # raising TypeError inside Path().
        record["text_path"] = _resolve(record.get("text_path") or "")
        pages_path = record.get("pages_path")
        record["pages_path"] = _resolve(pages_path) if pages_path else None
        records.append(record)

    frame = pd.DataFrame(records)
    if not frame.empty:
        frame["has_pages_sidecar"] = frame["pages_path"].apply(
            lambda p: bool(p and Path(p).exists())
        )
    return frame


def load_metadata(jsonl_path: Path) -> pd.DataFrame:
    """Read a JSON Lines metadata file into a DataFrame.

    Blank lines are ignored, and only lines that decode to a JSON object are
    kept as rows.

    Raises:
        FileNotFoundError: if ``jsonl_path`` does not exist.
    """
    if not jsonl_path.exists():
        raise FileNotFoundError(f"Metadata JSONL missing at {jsonl_path}")
    with jsonl_path.open("r", encoding="utf-8") as stream:
        # The generator is consumed inside the ``with`` block, while the file
        # is still open.
        decoded = (json.loads(raw) for raw in map(str.strip, stream) if raw)
        objects = [item for item in decoded if isinstance(item, dict)]
    return pd.DataFrame(objects)


def estimate_tokens(text_path: Path) -> tuple[int, int]:
    """Return ``(character_count, word_count)`` for the file at *text_path*.

    The file is decoded as UTF-8 with undecodable bytes dropped; words are the
    whitespace-separated pieces of the decoded text, used here as a rough
    stand-in for tokens.
    """
    content = text_path.read_text(encoding="utf-8", errors="ignore")
    return len(content), len(content.split())


# Characters whose presence marks a whitespace-delimited token as math-like.
MATH_SYMBOLS = set("=±∑∏√∞∂∀∃≤≥×÷∫∇≃≈≠≡⊕⊗⊥⇒⇔→←↔†‡")
# Opening of a LaTeX display/tabular environment, e.g. "\begin{align*}".
LATEX_ENV_PATTERN = re.compile(
    r"\\begin{(?:align\*?|equation\*?|gather\*?|multline\*?|cases|tabular|array)}",
    re.IGNORECASE,
)
# A selection of LaTeX math commands (fractions, sums, Greek letters, ...).
LATEX_INLINE_PATTERN = re.compile(
    r"\\(?:frac|sum|int|sqrt|mathbb|mathrm|mathbf|alpha|beta|gamma|delta|lambda|pi|phi|psi|theta)",
    re.IGNORECASE,
)
# LaTeX table markup; note that a bare "&" column separator also counts.
TABLE_MARKER_PATTERN = re.compile(
    r"\\begin{tabular}|\\hline|\\toprule|\\midrule|\\bottomrule|&",
    re.IGNORECASE,
)
# A line that both starts and ends with "+" or "|" (ASCII-art table row/rule).
ASCII_TABLE_PATTERN = re.compile(r"^[+|].*[+|]$")
# LaTeX command, subscript (a_i) or superscript (x^2). The caret is escaped as
# "\^": writing "\\^" in a raw string means "literal backslash, then
# start-of-string anchor", which can never match, so superscripts were missed.
MATH_TOKEN_PATTERN = re.compile(
    r"(\\[A-Za-z]+)|([A-Za-z]*_[A-Za-z0-9]+)|([A-Za-z]*\^[A-Za-z0-9]+)"
)


def compute_text_features(text: str) -> dict[str, float]:
    """Derive math- and table-density signals from a document's text.

    Returns a dict with:
        math_token_ratio: share of whitespace-separated tokens that look
            math-like (contain a ``MATH_SYMBOLS`` character, match
            ``MATH_TOKEN_PATTERN``, or are at least half digits with a
            minimum of two digits).
        inline_math_hits: number of tokens matching ``LATEX_INLINE_PATTERN``.
        latex_env_hits: number of ``LATEX_ENV_PATTERN`` matches in the text.
        table_line_ratio: share of lines that look like table rows.
        dollar_inline_hits: number of ``$`` characters in the text.

    Text without any tokens yields zeros for every field.
    """
    words = text.split()
    if not words:
        return {
            "math_token_ratio": 0.0,
            "inline_math_hits": 0,
            "latex_env_hits": 0,
            "table_line_ratio": 0.0,
            "dollar_inline_hits": 0,
        }

    def _is_math_like(word: str) -> bool:
        # Checks run from cheapest/most specific to the digit-share fallback.
        if any(ch in MATH_SYMBOLS for ch in word):
            return True
        has_marker = "\\" in word or "^" in word or "_" in word
        if has_marker and MATH_TOKEN_PATTERN.search(word):
            return True
        digit_total = sum(ch.isdigit() for ch in word)
        return digit_total >= max(2, len(word) // 2)

    def _is_table_like(raw_line: str) -> bool:
        body = raw_line.strip()
        if not body:
            return False
        return bool(
            ASCII_TABLE_PATTERN.match(body)
            or body.count("|") >= 3
            or TABLE_MARKER_PATTERN.search(body)
        )

    all_lines = text.splitlines()
    math_like_total = sum(1 for word in words if _is_math_like(word))
    inline_total = sum(1 for word in words if LATEX_INLINE_PATTERN.search(word))
    table_like_total = sum(1 for raw_line in all_lines if _is_table_like(raw_line))

    return {
        "math_token_ratio": math_like_total / len(words),
        "inline_math_hits": inline_total,
        "latex_env_hits": len(LATEX_ENV_PATTERN.findall(text)),
        "table_line_ratio": table_like_total / max(1, len(all_lines)),
        "dollar_inline_hits": text.count("$"),
    }


def ensure_output_dir(path: Path) -> None:
    """Create directory *path*, including missing parents; an existing directory is left as is."""
    path.mkdir(exist_ok=True, parents=True)

In [None]:
# Load the manifest and print headline numbers. Any failure is reported and
# then re-raised so the notebook stops here instead of continuing without data.
try:
    manifest_df = load_manifest(MANIFEST_PATH)
    display(manifest_df.head())
    print(f"Total documents in manifest: {len(manifest_df)}")
    if manifest_df.empty:
        print("Warning: Manifest is empty!")
    else:
        total_bytes = manifest_df["bytes"].sum()
        print(f"Total text bytes: {total_bytes:,}")
        # Re-read the raw JSON for top-level fields that are not part of the
        # per-document frame.
        manifest_meta = json.loads(MANIFEST_PATH.read_text(encoding="utf-8"))
        print(f"Manifest generated_at: {manifest_meta.get('generated_at', 'unknown')}")
        print(f"Manifest total_documents: {manifest_meta.get('total_documents', 'unknown')}")
except FileNotFoundError as e:
    print(
        f"Error: {e}",
        "\nTo fix this:",
        "1. Ensure you've run: python scripts/ingest_and_index.py --config configs/default.yaml",
        "2. Check that PROCESSED_DIR is correct (printed above)",
        sep="\n",
    )
    raise
except Exception as e:
    print(f"Error loading manifest: {e}")
    raise

In [None]:
# Load metadata if available (optional - may not exist for all datasets).
# On any failure the cell degrades to an empty frame so later cells still run.
if METADATA_JSONL.exists():
    try:
        metadata_df = load_metadata(METADATA_JSONL)
        print(f"Metadata rows: {len(metadata_df)}")
        if not metadata_df.empty and "arxiv_id" in metadata_df.columns:
            # Strip only a trailing version suffix such as "v2". Splitting on
            # every "v" would truncate old-style identifiers whose archive
            # name contains that letter (e.g. "solv-int/9901001v1").
            base_ids = metadata_df["arxiv_id"].str.replace(r"v\d+$", "", regex=True)
            # Name kept from the original cell: one entry per distinct base ID.
            duplicates = base_ids.value_counts()
            print(f"Unique arXiv IDs (without version): {len(duplicates)}")
            if len(duplicates) < len(metadata_df):
                print(f"Note: {len(metadata_df) - len(duplicates)} duplicate IDs found")
        # Skip the preview for an empty frame rather than displaying "None".
        if not metadata_df.empty:
            display(metadata_df.head())
    except Exception as e:
        print(f"Warning: Failed to load metadata: {e}")
        metadata_df = pd.DataFrame()
else:
    print(f"Metadata file not found at {METADATA_JSONL}")
    print("This is optional - metadata.jsonl may not exist for all datasets")
    metadata_df = pd.DataFrame()

In [None]:
# Per-document statistics: size, math/table density, citation markers,
# footnote-style URLs and sidecar page counts, followed by a printed summary.

# Ensure manifest_df is loaded before proceeding
if "manifest_df" not in locals() or manifest_df.empty:
    raise RuntimeError(
        "Manifest is empty or not loaded. Run the previous cell to load the manifest first."
    )

# Parallel lists, one entry per document whose text file could be read.
byte_lengths: list[int] = []  # character counts (name kept for compatibility)
token_counts: list[int] = []
sidecar_pages: list[float] = []
documents: list[str] = []
math_token_ratios: list[float] = []
inline_math_densities: list[float] = []
latex_env_densities: list[float] = []
table_line_ratios: list[float] = []
dollar_inline_densities: list[float] = []
citation_marker_totals: list[int] = []
citation_marker_unique: list[int] = []
max_citation_indices: list[int] = []
footnote_url_counts: list[int] = []

# Bracketed numeric markers such as [12] (one to three digits).
CITATION_PATTERN = re.compile(r"\[(\d{1,3})\]")
# A line starting with a number followed by a URL.
FOOTNOTE_URL_PATTERN = re.compile(r"\n\d+\s+https?://")

for row in manifest_df.itertuples(index=False):
    text_path = Path(row.text_path)
    # is_file() rather than exists(): an entry that resolves to a directory
    # would otherwise make read_text() fail.
    if not text_path.is_file():
        continue
    documents.append(text_path.name)

    # Read the file once and derive every statistic from the same string; the
    # two counts are computed the same way estimate_tokens() computes them.
    text = text_path.read_text(encoding="utf-8", errors="ignore")
    chars, tokens = len(text), len(text.split())
    features = compute_text_features(text)

    byte_lengths.append(chars)
    token_counts.append(tokens)

    # Densities are expressed per 1,000 tokens; guard against empty files.
    tokens_norm = max(1, tokens)
    math_token_ratios.append(features["math_token_ratio"])
    inline_math_densities.append(features["inline_math_hits"] / tokens_norm * 1000)
    latex_env_densities.append(features["latex_env_hits"] / tokens_norm * 1000)
    table_line_ratios.append(features["table_line_ratio"])
    dollar_inline_densities.append(features["dollar_inline_hits"] / tokens_norm * 1000)

    citations = [int(mark) for mark in CITATION_PATTERN.findall(text)]
    citation_marker_totals.append(len(citations))
    if citations:
        unique_citations = set(citations)
        citation_marker_unique.append(len(unique_citations))
        max_citation_indices.append(max(unique_citations))
    else:
        citation_marker_unique.append(0)
        max_citation_indices.append(0)

    footnote_url_counts.append(len(FOOTNOTE_URL_PATTERN.findall(text)))

    # A sidecar listed in the manifest may be absent on disk; record NaN for it
    # instead of aborting the whole loop with FileNotFoundError.
    pages_path = row.pages_path
    if isinstance(pages_path, (str, Path)) and Path(pages_path).is_file():
        with Path(pages_path).open("r", encoding="utf-8") as handle:
            page_counter = sum(1 for _ in handle)  # one JSONL line per page
    else:
        page_counter = float("nan")
    sidecar_pages.append(page_counter)

summary = pd.DataFrame(
    {
        "document": documents,
        "tokens": token_counts,
        "chars": byte_lengths,
        "sidecar_pages": sidecar_pages,
        "math_token_ratio": math_token_ratios,
        "inline_math_per_1k_tokens": inline_math_densities,
        "latex_env_per_1k_tokens": latex_env_densities,
        "table_line_ratio": table_line_ratios,
        "inline_dollar_per_1k_tokens": dollar_inline_densities,
        "citation_marker_total": citation_marker_totals,
        "citation_marker_unique": citation_marker_unique,
        "max_citation_index": max_citation_indices,
        "footnote_url_count": footnote_url_counts,
    }
)
numeric_cols = summary.select_dtypes(include="number").columns
summary_stats = summary[numeric_cols].describe(percentiles=[0.05, 0.5, 0.95])
display(summary_stats)

print("Documents missing sidecars:", (~manifest_df["has_pages_sidecar"]).sum())
print(
    "Documents containing [n] citation markers:",
    (summary["citation_marker_total"] > 0).sum(),
    f"/ {len(summary)}",
)
print(
    "Max citation index observed:",
    int(summary["max_citation_index"].max()) if not summary.empty else 0,
)
print(
    "Documents with footnote-style URL markers:",
    (summary["footnote_url_count"] > 0).sum(),
)

if not summary.empty:
    top_token_df = summary.sort_values("tokens", ascending=False).head(5)[
        [
            "document",
            "tokens",
            "citation_marker_total",
            "citation_marker_unique",
            "footnote_url_count",
        ]
    ]
    print("Top 5 documents by token count:")
    display(top_token_df)

In [None]:
# Side-by-side histograms: tokens per document (left) and page counts read
# from the sidecars (right; documents without a page count are dropped).
fig, axes = plt.subplots(1, 2, figsize=(12, 4))

panels = [
    (summary["tokens"], "#1f77b4", "Token count distribution", "Tokens per document"),
    (summary["sidecar_pages"].dropna(), "#ff7f0e", "Pages per sidecar", "Pages"),
]
for ax, (values, colour, title, x_label) in zip(axes, panels, strict=True):
    ax.hist(values, bins=30, color=colour)
    ax.set_title(title)
    ax.set_xlabel(x_label)
    ax.set_ylabel("Frequency")

plt.tight_layout()
plt.show()

In [None]:
# Rank documents by math density and by table-like structure, then report how
# many exceed the review thresholds (0.25 math-token ratio, 0.15 table-line ratio).
if summary.empty:
    print("No documents available for math/table analysis.")
else:
    math_columns = [
        "document",
        "math_token_ratio",
        "inline_math_per_1k_tokens",
        "latex_env_per_1k_tokens",
    ]
    table_columns = ["document", "table_line_ratio", "inline_dollar_per_1k_tokens"]

    print("Most math-heavy documents (top 5):")
    display(summary.sort_values("math_token_ratio", ascending=False)[math_columns].head(5))

    print("Documents with table-like structure signals (top 5):")
    display(summary.sort_values("table_line_ratio", ascending=False)[table_columns].head(5))

    high_math = summary.loc[summary["math_token_ratio"] > 0.25]
    if len(high_math) > 0:
        print(f"Documents exceeding math token ratio 0.25: {len(high_math)}")

    table_heavy = summary.loc[summary["table_line_ratio"] > 0.15]
    if len(table_heavy) > 0:
        print(f"Documents exceeding table-line ratio 0.15: {len(table_heavy)}")

In [None]:
# Language detection requires langid package
# Install with: pip install langid
try:
    import langid
    from langid.langid import LanguageIdentifier
    from langid.langid import model as langid_model

    LANGID_AVAILABLE = True
except ImportError:
    LANGID_AVAILABLE = False
    print("Warning: langid package not available. Language detection will be skipped.")
    print("Install with: pip install langid")

if LANGID_AVAILABLE:
    # Per the langid README, the module-level langid.classify() returns an
    # unnormalised score, not a probability. An identifier built with
    # norm_probs=True reports values in [0, 1], which is what the
    # "confidence" figures printed below are meant to be.
    language_identifier = LanguageIdentifier.from_modelstring(langid_model, norm_probs=True)

    # Detect languages for each document using multiple samples
    detected_languages = []
    non_english_docs = []
    confidences = []
    doc_details = []
    mixed_language_docs = []

    def get_samples(text: str, num_samples: int = 3, sample_size: int = 1000) -> list[str]:
        """Return up to *num_samples* evenly spaced slices of *text*.

        Text no longer than *sample_size* is returned whole as a single
        sample. Otherwise slice ``i`` starts at ``i * len(text) // (num_samples + 1)``
        and spans at most *sample_size* characters.
        """
        length = len(text)
        if length <= sample_size:
            return [text]
        step = length // (num_samples + 1)
        return [
            text[i * step : min(i * step + sample_size, length)]
            for i in range(1, num_samples + 1)
        ]

    def pick_language(langs: list[str], confs: list[float]) -> tuple[str, float]:
        """Return the dominant language of the samples and its mean confidence.

        The language detected in the most samples wins; a tie on the count is
        broken by the higher mean confidence.
        """
        grouped: dict[str, list[float]] = {}
        for lang_code, conf in zip(langs, confs, strict=True):
            grouped.setdefault(lang_code, []).append(conf)
        means = {code: sum(values) / len(values) for code, values in grouped.items()}
        winner = max(grouped, key=lambda code: (len(grouped[code]), means[code]))
        return winner, means[winner]

    for row in manifest_df.itertuples(index=False):
        text_path = Path(row.text_path)
        if not text_path.exists():
            continue
        try:
            text = text_path.read_text(encoding="utf-8", errors="ignore")
            samples = get_samples(text, num_samples=3, sample_size=2000)
            langs = []
            confs = []
            for sample in samples:
                if sample.strip():
                    lang, conf = language_identifier.classify(sample)
                    langs.append(lang)
                    confs.append(float(conf))
            # Use the most common language, or if tie, the one with highest confidence
            if langs:
                most_common, avg_conf = pick_language(langs, confs)
                detected_languages.append(most_common)
                confidences.append(avg_conf)
                doc_details.append((text_path.name, most_common, avg_conf, langs, confs))
                if most_common != "en":
                    non_english_docs.append((text_path.name, most_common, avg_conf))
                # Check if any sample is not English
                if any(lang_code != "en" for lang_code in langs):
                    mixed_language_docs.append((text_path.name, langs, confs))
            else:
                # Only blank samples: nothing to classify.
                detected_languages.append("unknown")
                confidences.append(0.0)
                doc_details.append((text_path.name, "unknown", 0.0, [], []))
                non_english_docs.append((text_path.name, "unknown", 0.0))
        except Exception as e:
            # Best-effort: one unreadable document must not abort the scan; it
            # is recorded as "unknown" and listed with the non-English ones.
            print(f"Warning: Failed to process {text_path.name}: {e}")
            detected_languages.append("unknown")
            confidences.append(0.0)
            doc_details.append((text_path.name, "unknown", 0.0, [], []))
            non_english_docs.append((text_path.name, "unknown", 0.0))

    # Summary of languages
    lang_counts = Counter(detected_languages)
    print("Language distribution:")
    for lang, count in lang_counts.most_common():
        print(f"{lang}: {count}")

    print(f"\nTotal documents: {len(detected_languages)}")
    print(f"Non-English documents: {len(non_english_docs)}")
    print(f"Documents with mixed/foreign language content: {len(mixed_language_docs)}")

    if non_english_docs:
        print("\nNon-English documents:")
        for doc, lang, conf in non_english_docs[:10]:  # Show first 10
            print(f"{doc}: {lang} (avg confidence: {conf:.2f})")
        if len(non_english_docs) > 10:
            print(f"... and {len(non_english_docs) - 10} more")
    else:
        print("All documents appear to be in English.")

    if mixed_language_docs:
        print("\nDocuments with mixed/foreign language content:")
        for doc, langs, confs in mixed_language_docs:
            print(f"{doc}: languages {langs}, confidences {[f'{c:.2f}' for c in confs]}")
    else:
        print("No documents with mixed language content detected.")

    # Show average confidence
    if confidences:
        avg_conf = sum(confidences) / len(confidences)
        print(f"\nAverage language detection confidence: {avg_conf:.2f}")
else:
    print("Skipping language detection (langid not available)")

In [None]:
# Persist the summary statistics and report which selection batches were logged.
ensure_output_dir(DATA_QUALITY_DIR / "plots")
output_csv = DATA_QUALITY_DIR / "qa_summary_stats.csv"
summary_stats.to_csv(output_csv, index=True)
print(f"Wrote summary stats to {output_csv}")

selection_entries: list[dict[str, Any]] = []
if SELECTION_LOG.exists():
    with SELECTION_LOG.open("r", encoding="utf-8") as handle:
        for line in handle:
            line = line.strip()
            if not line:
                continue
            payload = json.loads(line)
            # Keep JSON objects only, as load_metadata() does: the batch count
            # below calls .get() on every entry and would fail on a list or
            # scalar line.
            if isinstance(payload, dict):
                selection_entries.append(payload)
if selection_entries:
    batches = Counter(entry.get("selection_batch", "unknown") for entry in selection_entries)
    print("Selection batches recorded:", batches)
else:
    print("No selection log entries found; ensure selection_log.jsonl is populated in future runs.")