Model choice (en_core_web_sm vs others)

For stronger baseline NER out of the box, use en_core_web_trf (transformer).

Keep sm for iteration/debug (fast feedback). Add a second track where you run the exact same probe suites through en_core_web_trf during testing to quantify how much you gain from a stronger baseline.
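
One way to quantify the gain is to diff the entity sets the two models produce for the same probe text. The sketch below assumes predictions have already been reduced to `(start, end, label)` tuples; `compare_predictions` and the sample tuples are illustrative names and values, not part of the notebook.

```python
from typing import Dict, Set, Tuple

Ent = Tuple[int, int, str]  # (start_char, end_char, label)

def compare_predictions(sm_ents: Set[Ent], trf_ents: Set[Ent]) -> Dict[str, Set[Ent]]:
    """Split two prediction sets into shared, sm-only, and trf-only entities."""
    return {
        "shared": sm_ents & trf_ents,
        "sm_only": sm_ents - trf_ents,
        "trf_only": trf_ents - sm_ents,
    }

# Made-up predictions for one probe sentence (illustrative values only)
sm = {(0, 10, "PERSON"), (15, 22, "ORG")}
trf = {(0, 10, "PERSON"), (15, 27, "ORG"), (40, 52, "MONEY")}

diff = compare_predictions(sm, trf)
for bucket, ents in diff.items():
    print(bucket, sorted(ents))
```

Exact-tuple comparison treats a boundary disagreement (the two `ORG` spans here) as one loss plus one gain, which is usually the behavior you want when the goal is to see where the stronger model changes span boundaries.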

1. Notebook Configuration
2. Global Constants and Constraints
3. Model Loading & Pipeline Assembly
4. Entity Modules
- 4.1 `PERSON`
- 4.2 `TITLE`
- 4.3 `MONEY`
- 4.4 `ORG`
5. Shared Helpers (Printing, Selection, Normalization)
6. Data loading + canonical doc objects
7. Gold parsing + alignment validator
- 7.1 Section Scope
- 7.2 Gold Assumptions
- 7.3 Prediction Source
- 7.4 Alignment Modes
- 7.5 Match Accounting
- 7.6 Metrics
8. Run harness (Single + Twin)
- 8.1 Execution Mode
- 8.2 Section Routing
- 8.3 Pipeline Variants
- 8.4 Prediction Normalization
- 8.5 Harness Outputs
9. Outputs (human + machine)
- 9.1 Machine Outputs
- 9.2 Human Outputs
10. Scoring
11. Findings/Notes

### 1. Notebook Configuration
- spacy: pipeline loading, docs
- EntityRuler: TITLE, MONEY_CANDIDATE, ORG_CANDIDATE rules
- Span: manual span construction (suffix merges, candidate spans)
- defaultdict: entity inventories grouped by label
- typing: keeps extractor contracts explicit and readable
- re: regex-backed patterns
- string: controlled punctuation handling (money/org normalization)


In [1]:
# ===== Section 0.1: Environment Setup (Colab-safe) =====

# Core spaCy
#!pip install -U spacy

# Small English model
#!python -m spacy download en_core_web_sm

# Transformer English model (large, slow, but higher quality)
!python -m spacy download en_core_web_trf


Collecting en-core-web-trf==3.8.0
  Downloading https://github.com/explosion/spacy-models/releases/download/en_core_web_trf-3.8.0/en_core_web_trf-3.8.0-py3-none-any.whl (457.4 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 457.4/457.4 MB 866.7 kB/s eta 0:00:00
✔ Download and installation successful
You can now load the package via spacy.load('en_core_web_trf')
⚠ Restart to reload dependencies
If you are in a Jupyter or Colab notebook, you may need to restart Python in
order to load all the package's dependencies. You can do this by selecting the
'Restart kernel' or 'Restart runtime' option.


In [2]:
import spacy

# Verify that both models load in this runtime
for model in ("en_core_web_sm", "en_core_web_trf"):
    try:
        nlp = spacy.load(model)
        print(f"{model} loaded | pipes={nlp.pipe_names}")
    except Exception as e:
        print(f"{model} FAILED:", e)

en_core_web_sm loaded | pipes=['tok2vec', 'tagger', 'parser', 'attribute_ruler', 'lemmatizer', 'ner']
en_core_web_trf loaded | pipes=['transformer', 'tagger', 'parser', 'attribute_ruler', 'lemmatizer', 'ner']


In [3]:

# Must come first: __future__ imports are only valid at the top of a cell/module
from __future__ import annotations

# Core NLP
from spacy.pipeline import EntityRuler
from spacy.tokens import Span as SpacySpan

# Data structures / utilities
from collections import defaultdict, Counter
from typing import Any, List, Dict, Tuple, Iterable, Optional
from dataclasses import dataclass
from pathlib import Path
import json
import pandas as pd

# Text / normalization helpers
import re
import string

In [4]:
# Mount Drive and point to project
from google.colab import drive

drive.mount("/content/drive")

PROJECT_ROOT = Path("/content/drive/MyDrive/Task 3")

DATA_PATH = PROJECT_ROOT / "primary_data.json"
GOLD_DIR = PROJECT_ROOT / "gold_annotations"

assert DATA_PATH.exists(), DATA_PATH
assert GOLD_DIR.exists(), GOLD_DIR

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


### 2. Global Entity Constraints (spaCy-Focused)

This section defines shared constants used by spaCy-based extraction:
- EntityRuler patterns
- candidate filtering
- normalization and exclusion rules

Regex-heavy extractors and legacy heuristics are intentionally excluded.
Those live in separate regex-focused notebooks.
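
As a rough illustration of how exclusion constants of this kind can drive candidate filtering, the sketch below checks a candidate string against an exact stop list and a substring stop list. The `*_DEMO` sets are small stand-ins for the notebook's own constants, and `keep_org_candidate` is a hypothetical helper, not a function defined in this notebook.

```python
# Illustrative stand-ins; the notebook defines its own stop lists in the next cell
ORG_STOP_EXACT_DEMO = {"company", "the company"}
ORG_STOP_CONTAINS_DEMO = {"delaware corporation"}

def keep_org_candidate(text: str) -> bool:
    """Return False for candidates that hit an exact or substring stop rule."""
    norm = " ".join(text.lower().split())  # case-fold and collapse whitespace
    if norm in ORG_STOP_EXACT_DEMO:
        return False
    return not any(stop in norm for stop in ORG_STOP_CONTAINS_DEMO)

candidates = ["The Company", "Acme Holdings, Inc.", "a Delaware corporation"]
kept = [c for c in candidates if keep_org_candidate(c)]
print(kept)
```

The normalization here is used only for the comparison; the candidate text itself is returned untouched, so offsets computed elsewhere stay valid.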

In [5]:
# PERSON — Post-processing rules
# Suffixes allowed to merge into PERSON spans (handled via Span logic)
PERSON_SUFFIXES = {"jr.", "sr.", "ii", "iii", "iv", "v"}

# Tokens that should invalidate a PERSON candidate if included
NON_PERSON_TOKENS = {
    "Chief", "President", "Director", "Chairman", "Officer",
    "Executive", "Committee", "Board", "Company",
    "Inc", "Inc.", "Corp", "Corp.", "Corporation",
    "LLC", "L.L.C.", "Ltd", "Ltd.",
}

# Structural phrases that should never resolve to PERSON
PERSON_BLOCKLIST = {
    "Business Conduct",
    "Business Experience",
    "Additional Information",
}

# TITLE — Canonical vocabulary
# Used for EntityRuler rules + checks
TITLE_TERMS = [
    "Chief Executive Officer", "CEO",
    "Chief Financial Officer", "CFO",
    "Chief Operating Officer", "COO",
    "Chief Accounting Officer", "CAO",
    "President", "Vice President",
    "Chairman", "Chair",
    "Director", "Lead Director",
    "General Counsel", "Secretary", "Treasurer",
]

# Longest-first so multi-word titles are tried before their substrings
# (e.g., "Vice President" before "President")
TITLE_TERMS_SORTED = sorted(TITLE_TERMS, key=len, reverse=True)

# ORG — Structural constraints
ORG_SUFFIXES = [
    "Inc", "Inc.", "Corp", "Corp.", "Corporation",
    "LLC", "L.L.C.", "LP", "L.P.", "Ltd", "Ltd.",
    "PLC", "P.L.C.", "Limited"
]

# Exact phrases that should never survive ORG filtering
ORG_STOP_EXACT = {
    "company",
    "the company",
    "“company",
    "\"company",
}

# Substrings that invalidate ORG candidates
ORG_STOP_CONTAINS = {
    "delaware corporation",
    "a delaware corporation",
}

# MONEY — Context classification
# Scale words spaCy rules are allowed to treat as money-like
MONEY_SCALES = {"thousand", "million", "billion"}

# Context words used to classify MONEY_CANDIDATE -> REVENUE_MONEY
REVENUE_CONTEXT_TERMS = {
    "revenue",
    "net sales",
    "sales",
    "income",
    "net income",
    "net loss",
    "turnover",
    "totaled",
}


### 3. Model Loading & Pipeline Assembly

This section is responsible for:
- loading one or more spaCy pipelines
- attaching rule-based components (EntityRuler)
- clearly separating **baseline models** from **augmented pipelines**

At this stage, the notebook builds:
- a lightweight baseline model for fast iteration
- an augmented rule-first pipeline for controlled extraction

No entity logic lives here: only pipeline configuration.

#### Pipelines
- **Baseline pipelines**
  - `en_core_web_sm`
  - `en_core_web_trf`
  - used as reference points for native spaCy behavior

- **Rule-augmented pipelines**
  - `en_core_web_sm` + EntityRuler components
  - `en_core_web_trf` + EntityRuler components
  - rule components inserted *before* `ner`
  - rule patterns defined in the Entity Modules section `(TITLE, MONEY_CANDIDATE, ORG_CANDIDATE)`

#### Design Notes
- Pipelines are built via small, composable factory functions
- No global mutation of spaCy pipelines (build fresh per call)
- Multiple pipelines may coexist in memory for side-by-side evaluation
- Rules are attached consistently across `sm` and `trf` so comparisons are apples-to-apples
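
The placement rule (insert before `ner`, otherwise append) can be sketched on a plain list of component names, independent of spaCy. `place_before` is a hypothetical helper used only to show the ordering logic and the no-mutation convention; it is not how spaCy itself stores components.

```python
from typing import List

def place_before(pipe_names: List[str], new_name: str, before: str = "ner") -> List[str]:
    """Return a new ordering with new_name inserted before `before`, or appended if absent."""
    names = [n for n in pipe_names if n != new_name]  # drop any stale copy first
    if before in names:
        idx = names.index(before)
        return names[:idx] + [new_name] + names[idx:]
    return names + [new_name]

base = ["tok2vec", "tagger", "parser", "attribute_ruler", "lemmatizer", "ner"]
augmented = place_before(base, "entity_ruler")
print(augmented)
print(base)  # the input list is left unchanged
```

Returning a new list instead of editing the input mirrors the "build fresh per call" note above: the baseline ordering stays available for side-by-side comparison.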


In [6]:
# Section 3: Model Loading & Pipeline Assembly
MODEL_NAMES = {
    "sm": "en_core_web_sm",
    "trf": "en_core_web_trf",
}

def _try_load(model_name: str) -> spacy.language.Language | None:
    """
    Try to load a spaCy pipeline. Return None if not installed.
    """
    try:
        return spacy.load(model_name)
    except OSError as e:
        print(f"[WARN] Could not load '{model_name}'. Is it installed? ({e})")
        return None


def load_base_pipeline(model_name: str) -> spacy.language.Language | None:
    """
    Load a baseline spaCy pipeline (no custom rules). Returns None if unavailable.
    """
    return _try_load(model_name)


def load_augmented_pipeline(
    model_name: str,
    ruler_name: str = "entity_ruler",
    before_component: str = "ner",
) -> spacy.language.Language | None:
    """
    Load a spaCy pipeline and insert an *empty* EntityRuler before NER.
    Patterns are added later by the Entity Modules section.
    Returns None if the model is unavailable.
    """
    nlp = _try_load(model_name)
    if nlp is None:
        return None

    # Defensive: avoid reusing a prior ruler in the same runtime
    if ruler_name in nlp.pipe_names:
        nlp.remove_pipe(ruler_name)

    if before_component in nlp.pipe_names:
        nlp.add_pipe(ruler_name, before=before_component)
    else:
        nlp.add_pipe(ruler_name)

    return nlp


def load_pipelines(model_names: dict = MODEL_NAMES) -> dict:
    """
    Return baseline + augmented pipelines for sm and trf.
    Any missing models will be skipped (value = None).
    """
    sm_base = load_base_pipeline(model_names["sm"])
    sm_aug  = load_augmented_pipeline(model_names["sm"])

    trf_base = load_base_pipeline(model_names["trf"])
    trf_aug  = load_augmented_pipeline(model_names["trf"])

    return {
        "sm_base": sm_base,
        "sm_aug": sm_aug,
        "trf_base": trf_base,
        "trf_aug": trf_aug,
    }


def describe_pipelines(pipes: dict) -> None:
    """
    Print a quick summary of which pipelines loaded and their components.
    """
    print("Loaded pipelines:")
    for key, nlp in pipes.items():
        if nlp is None:
            print(f"  {key}: <not loaded>")
        else:
            name = nlp.meta.get("name", "unknown")
            print(f"  {key}: {name} | pipes={nlp.pipe_names}")


# Build pipelines now (rules get attached later)
pipelines = load_pipelines()
describe_pipelines(pipelines)


Loaded pipelines:
  sm_base: core_web_sm | pipes=['tok2vec', 'tagger', 'parser', 'attribute_ruler', 'lemmatizer', 'ner']
  sm_aug: core_web_sm | pipes=['tok2vec', 'tagger', 'parser', 'attribute_ruler', 'lemmatizer', 'entity_ruler', 'ner']
  trf_base: core_web_trf | pipes=['transformer', 'tagger', 'parser', 'attribute_ruler', 'lemmatizer', 'ner']
  trf_aug: core_web_trf | pipes=['transformer', 'tagger', 'parser', 'attribute_ruler', 'lemmatizer', 'entity_ruler', 'ner']


### 4. Entity Modules (Rule Components)

This section defines **rule modules per entity**, without running any documents yet.

Goals:
- Keep rules **modular** `(TITLE, MONEY_CANDIDATE, ORG_CANDIDATE)`
- Attach rules consistently to both augmented pipelines (`sm_aug`, `trf_aug`)
- Avoid overengineering: rules should target known SEC patterns and labeling rules

Outputs:
- Functions that attach patterns to a pipeline's `entity_ruler`
- No model mutation outside of `*_aug` pipelines
- Rules are additive and can be expanded later
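
A rule module largely comes down to building pattern dictionaries. The sketch below turns a phrase into a case-insensitive token pattern of the shape an EntityRuler accepts; `phrase_to_pattern` and the two demo terms are illustrative, and the whitespace split is an assumption that the tokenizer splits the phrase the same way.

```python
from typing import Dict, List

def phrase_to_pattern(label: str, phrase: str) -> Dict[str, object]:
    """Build one case-insensitive token pattern from a whitespace-separated phrase."""
    tokens: List[Dict[str, str]] = [{"LOWER": word.lower()} for word in phrase.split()]
    return {"label": label, "pattern": tokens}

demo_terms = ["Chief Executive Officer", "CEO"]
demo_patterns = [phrase_to_pattern("TITLE", term) for term in demo_terms]
for pattern in demo_patterns:
    print(pattern)
```

Phrases containing hyphens or internal punctuation may tokenize into more pieces than a whitespace split suggests, so patterns for such terms are worth checking against the actual token list.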


In [7]:
# Section 4: Entity Modules (Rule Components)
from spacy.tokens import Span
from spacy.language import Language

# Candidate labels should NOT live in doc.ents (finals only)
CANDIDATE_LABEL_TO_SPANKEY = {
    "MONEY_CANDIDATE": "money_candidates",
    "ORG_CANDIDATE": "org_candidates",
}

def _get_ruler(nlp, ruler_name: str = "entity_ruler") -> EntityRuler:
    """
    Fetch the EntityRuler from an augmented pipeline.
    Raises a clear error if you're accidentally using a base pipeline.
    """
    if ruler_name not in nlp.pipe_names:
        raise ValueError(
            f"Pipeline has no '{ruler_name}'. Did you mean to use an *_aug pipeline?\n"
            f"pipes={nlp.pipe_names}"
        )
    return nlp.get_pipe(ruler_name)


@Language.component("candidate_siphon")
def candidate_siphon(doc):
    """
    Move *_CANDIDATE entities into doc.spans[...] and remove them from doc.ents.

    - Keeps offsets intact
    - Does not normalize text
    - Ensures candidates never pollute downstream scoring that reads doc.ents
    """
    # Ensure span keys exist
    for span_key in CANDIDATE_LABEL_TO_SPANKEY.values():
        doc.spans.setdefault(span_key, [])

    kept = []
    for ent in doc.ents:
        span_key = CANDIDATE_LABEL_TO_SPANKEY.get(ent.label_)
        if span_key:
            doc.spans.setdefault(span_key, [])
            doc.spans[span_key].append(SpacySpan(doc, ent.start, ent.end, label=ent.label_))
        else:
            kept.append(ent)

    doc.ents = kept
    return doc

FINAL_LABELS_BLOCK_FROM_NER = {"MONEY", "ORG"}

@Language.component("strip_ner_money_org")
def strip_ner_money_org(doc):
    """
    Remove NER-produced MONEY/ORG from doc.ents in augmented pipelines.
    We will later add back final MONEY/ORG via our own candidate->final logic.
    """
    doc.ents = [ent for ent in doc.ents if ent.label_ not in FINAL_LABELS_BLOCK_FROM_NER]
    return doc


def attach_strip_ner_money_org(
    nlp,
    *,
    name: str = "strip_ner_money_org",
    after: str = "ner",
) -> None:
    if name in nlp.pipe_names:
        nlp.remove_pipe(name)
    if after in nlp.pipe_names:
        nlp.add_pipe(name, after=after)
    else:
        nlp.add_pipe(name)  # fallback


def attach_candidate_siphon(
    nlp,
    *,
    name: str = "candidate_siphon",
    after: str = "entity_ruler",
    before: str = "ner",
) -> None:
    """
    Attach candidate siphon component so that:
      entity_ruler -> candidate_siphon -> ner
    """
    if name in nlp.pipe_names:
        nlp.remove_pipe(name)

    if after in nlp.pipe_names:
        nlp.add_pipe(name, after=after)
    elif before in nlp.pipe_names:
        nlp.add_pipe(name, before=before)
    else:
        nlp.add_pipe(name)


# 4.1 TITLE rules
def add_title_rules(nlp) -> None:
    ruler = _get_ruler(nlp)

    patterns = []
    for term in TITLE_TERMS_SORTED:
        patterns.append({"label": "TITLE", "pattern": [{"LOWER": t.lower()} for t in term.split()]})

    patterns.append({
        "label": "TITLE",
        "pattern": [
            {"LOWER": "chief"},
            {"IS_TITLE": True, "OP": "+"},
            {"LOWER": "officer"},
        ],
    })

    patterns.append({
        "label": "TITLE",
        "pattern": [
            {"LOWER": "chief"},
            {"IS_TITLE": True, "OP": "+"},
            {"TEXT": "\n", "OP": "?"},
            {"LOWER": "officer"},
        ],
    })

    patterns.append({
        "label": "TITLE",
        "pattern": [
            {"IS_TITLE": True, "OP": "+"},
            {"LOWER": "officer"},
        ],
    })

    ruler.add_patterns(patterns)


# 4.2 MONEY_CANDIDATE rules
def add_money_candidate_rules(nlp) -> None:
    ruler = _get_ruler(nlp)

    patterns = [
        {"label": "MONEY_CANDIDATE", "pattern": [{"TEXT": {"REGEX": r"^\$[\d,]+(\.\d+)?$"}}]},
        {"label": "MONEY_CANDIDATE", "pattern": [
            {"TEXT": "$"},
            {"TEXT": {"REGEX": r"^[\d,]+(\.\d+)?$"}},
        ]},
        {"label": "MONEY_CANDIDATE", "pattern": [
            {"LIKE_NUM": True},
            {"LOWER": {"IN": sorted(MONEY_SCALES)}},
        ]},
        {"label": "MONEY_CANDIDATE", "pattern": [{"TEXT": {"REGEX": r"^\(\$?[\d,]+(\.\d+)?\)$"}}]},
        {"label": "MONEY_CANDIDATE", "pattern": [{"TEXT": {"REGEX": r"^[\d]+(\.\d+)?[mMbB]$"}}]},
        {"label": "MONEY_CANDIDATE", "pattern": [
            {"TEXT": "$"},
            {"TEXT": {"REGEX": r"^[\d]+(\.\d+)?$"}},
            {"LOWER": {"IN": ["m", "b"]}},
        ]},
        {"label": "MONEY_CANDIDATE", "pattern": [
            {"TEXT": "$"},
            {"TEXT": {"REGEX": r"^[\d]+(\.\d+)?[mMbB]\.?$"}},
        ]},
        {"label": "MONEY_CANDIDATE", "pattern": [
            {"LIKE_NUM": True},
            {"TEXT": "("},
            {"LOWER": "in", "OP": "?"},
            {"LOWER": "thousands"},
            {"TEXT": ")"},
        ]},
        {"label": "MONEY_CANDIDATE", "pattern": [
            {"TEXT": "$"},
            {"LIKE_NUM": True},
            {"TEXT": "("},
            {"LOWER": "in", "OP": "?"},
            {"LOWER": "thousands"},
            {"TEXT": ")"},
        ]},
    ]

    ruler.add_patterns(patterns)


# 4.3 ORG_CANDIDATE rules
def add_org_candidate_rules(nlp) -> None:
    ruler = _get_ruler(nlp)

    suffixes = [s.lower() for s in ORG_SUFFIXES]

    patterns = [
        {"label": "ORG_CANDIDATE", "pattern": [
            {"IS_TITLE": True, "OP": "+"},
            {"TEXT": ",", "OP": "?"},
            {"LOWER": {"IN": suffixes}},
        ]},
        {"label": "ORG_CANDIDATE", "pattern": [
            {"IS_TITLE": True, "OP": "+"},
            {"TEXT": ",", "OP": "?"},
            {"TEXT": {"REGEX": r"^([A-Za-z]\.){2,}[A-Za-z]\.?$"}},
        ]},
        {"label": "ORG_CANDIDATE", "pattern": [
            {"LOWER": "doing"},
            {"LOWER": "business"},
            {"LOWER": "as"},
            {"IS_TITLE": True, "OP": "+"},
        ]},
    ]

    ruler.add_patterns(patterns)


# Attach all modules (+ siphon) to augmented pipelines
def attach_entity_modules(pipelines_dict: dict) -> None:
    for key, nlp in pipelines_dict.items():
        if nlp is None or not key.endswith("_aug"):
            continue

        add_title_rules(nlp)
        add_money_candidate_rules(nlp)
        add_org_candidate_rules(nlp)

        attach_candidate_siphon(nlp)          # after entity_ruler, before ner
        attach_strip_ner_money_org(nlp)       # after ner

attach_entity_modules(pipelines)

print("Entity modules attached to augmented pipelines:")
for k, nlp in pipelines.items():
    if nlp is None:
        continue
    if k.endswith("_aug"):
        ruler = nlp.get_pipe("entity_ruler")
        print(f"  {k}: ruler patterns={len(ruler.patterns)} | pipes={nlp.pipe_names}")


Entity modules attached to augmented pipelines:
  sm_aug: ruler patterns=32 | pipes=['tok2vec', 'tagger', 'parser', 'attribute_ruler', 'lemmatizer', 'entity_ruler', 'candidate_siphon', 'ner', 'strip_ner_money_org']
  trf_aug: ruler patterns=32 | pipes=['transformer', 'tagger', 'parser', 'attribute_ruler', 'lemmatizer', 'entity_ruler', 'candidate_siphon', 'ner', 'strip_ner_money_org']


### 5. Shared Helpers (Printing, Selection, Normalization)

This section provides small, reusable helpers used across all experiments and both pipelines.

Goals:
- Keep evaluation tooling consistent across `sm` and `trf`
- Support “twin mode” output:
  - **machine-friendly**: `(start, end, label)` spans
  - **human-friendly**: readable grouped summaries by label
- Standardize candidate selection to reduce overlap noise:
  - prefer-longest selection for `*_CANDIDATE` entities
- Standardize light normalization where appropriate:
  - punctuation trimming for display (e.g., `$3.1M.` -> `$3.1M`)
  - avoid destructive normalization that changes offsets

Key utilities included:
- `run_suite(nlp, texts, ...)`: run a probe list through a pipeline with consistent output controls
- `iter_ents(doc)`: return machine-friendly entity tuples `(start, end, label)`
- `summarize_ents(doc)`: grouped human-friendly entity view by label
- `prefer_longest(doc, label)`: select non-overlapping candidate spans by length
- `normalize_view(text)`: optional *display-only* cleanup (never used for offsets)

Design notes:
- Offsets are always measured against the original doc text
- All normalization is “view-only” unless explicitly stated otherwise
- Helpers should be model-agnostic and reusable for future testing notebooks
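
Longest-first selection can be shown on plain offset pairs. This is a minimal greedy sketch, assuming half-open `(start, end)` character spans; `select_longest` is a hypothetical name, and the greedy pass is a heuristic rather than an optimal-coverage algorithm.

```python
from typing import List, Tuple

def select_longest(spans: List[Tuple[int, int]]) -> List[Tuple[int, int]]:
    """Greedily keep the longest spans first, skipping any that overlap a kept span."""
    by_length = sorted(spans, key=lambda s: (-(s[1] - s[0]), s[0]))
    kept: List[Tuple[int, int]] = []
    for start, end in by_length:
        overlaps = any(start < k_end and k_start < end for k_start, k_end in kept)
        if not overlaps:
            kept.append((start, end))
    return sorted(kept)  # report in document order

overlapping = [(0, 5), (3, 20), (25, 30), (26, 28)]
selected = select_longest(overlapping)
print(selected)
```

Sorting by length before the overlap check matters: sorting by start offset alone would keep `(0, 5)` and discard the longer `(3, 20)`.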


In [8]:
# 5. Shared Helpers (Printing, Selection, Normalization)

# Display-only normalization
def normalize_view(text: str) -> str:
    """
    Display-only cleanup. NEVER used for offsets.
    Current behavior: strip trailing sentence punctuation (e.g., "$3.1M." -> "$3.1M").
    Closing brackets are kept so that "($1,234)" stays balanced.
    """
    return text.rstrip(".,;:!?")

# Machine-friendly entities
def iter_ents(doc) -> List[Tuple[int, int, str]]:
    """
    Machine-friendly entities: (start_char, end_char, label)
    Offsets always reference doc.text.
    """
    return [(ent.start_char, ent.end_char, ent.label_) for ent in doc.ents]

# Human-friendly summary
def summarize_ents(
    doc,
    *,
    normalize: bool = True
) -> Dict[str, List[Tuple[int, int, str]]]:
    """
    Grouped human-friendly view by label.

    Returns:
      {
        "PERSON": [(start, end, "John Smith"), ...],
        "TITLE":  [(start, end, "Chief Executive Officer"), ...],
      }

    Text is optionally display-normalized (never used for offsets).
    """
    grouped = defaultdict(list)
    for start, end, label in iter_ents(doc):
        raw = doc.text[start:end]
        view = normalize_view(raw) if normalize else raw
        grouped[label].append((start, end, view))
    return dict(grouped)

# Longest-span candidate selection
def prefer_longest(
    doc,
    label: str,
    *,
    normalize: bool = True
) -> List[Tuple[int, int, str]]:
    """
    Select non-overlapping spans of a given label, preferring longest spans.

    Candidate labels are also read from doc.spans, because the siphon moves
    them out of doc.ents in augmented pipelines.

    Returns a list of (start, end, text_view) tuples in document order
    (display text optional). Offsets remain authoritative.
    """
    pool = list(doc.ents)
    span_key = CANDIDATE_LABEL_TO_SPANKEY.get(label)
    if span_key:
        pool.extend(doc.spans.get(span_key, []))

    spans = {(sp.start_char, sp.end_char) for sp in pool if sp.label_ == label}
    # Longest first; ties go to the earliest start
    ordered = sorted(spans, key=lambda x: (-(x[1] - x[0]), x[0]))

    kept: List[Tuple[int, int]] = []
    for s, e in ordered:
        if any(not (e <= ks or s >= ke) for ks, ke in kept):
            continue
        kept.append((s, e))

    out = []
    for s, e in sorted(kept):
        raw = doc.text[s:e]
        view = normalize_view(raw) if normalize else raw
        out.append((s, e, view))
    return out

# Run harness for probe lists
def run_suite(
    nlp,
    texts: List[str],
    *,
    header: Optional[str] = None,
    show_tokens: bool = False,
    show_machine: bool = False,
    show_summary: bool = True,
    normalize: bool = True
) -> None:
    """
    Run a list of probe texts through an nlp pipeline and print consistent output.

    Controls:
    - show_tokens: print token list
    - show_machine: print iter_ents(doc)
    - show_summary: print summarize_ents(doc) (grouped by label)
    - normalize: whether summary uses normalize_view()
    """
    if header:
        print(f"=== {header} ===")

    for text in texts:
        doc = nlp(text)
        print("\nTEXT:", text)

        if show_tokens:
            print("TOKENS:", [t.text for t in doc])

        if show_machine:
            print("ENTS_MACHINE:", iter_ents(doc))

        if show_summary:
            summary = summarize_ents(doc, normalize=normalize)
            print("ENTS_SUMMARY:", summary)

def doc_inventory(
    doc,
    *,
    normalize: bool = True
) -> Dict[str, List[Tuple[int, int, str]]]:
    """
    Master entity inventory for a document.

    Returns:
      {
        "PERSON": [(start, end, "John Q. Smith"), ...],
        "TITLE":  [(start, end, "Chief Financial Officer"), ...],
        "MONEY":  [(start, end, "$2.4 million"), ...],
      }

    - Offsets are always authoritative
    - Text is optionally display-normalized
    """
    inventory = defaultdict(list)

    for ent in doc.ents:
        start = ent.start_char
        end = ent.end_char
        raw = doc.text[start:end]
        view = normalize_view(raw) if normalize else raw
        inventory[ent.label_].append((start, end, view))

    return dict(inventory)

def print_doc_inventory(
    doc,
    *,
    normalize: bool = True
) -> None:
    """
    Human-friendly print of doc_inventory().
    """
    inv = doc_inventory(doc, normalize=normalize)
    print("ENTITY INVENTORY:")
    for label in sorted(inv):
        print(f"  {label}:")
        for start, end, text in inv[label]:
            print(f"    ({start}, {end}) → {text}")

def print_entity_provenance(doc) -> None:
    print("ENTS (final):", [(e.text, e.label_, e.start_char, e.end_char) for e in doc.ents])

    for key in ["money_candidates", "org_candidates"]:
        spans = doc.spans.get(key, [])
        print(f"{key}:", [(s.text, s.label_, s.start_char, s.end_char) for s in spans])




### 6. Data loading + canonical doc objects

This section loads raw SEC 10-K JSON filings and converts them into canonical document objects used throughout the notebook.

Goals
- Create a consistent representation of each filing with section-scoped text fields used for extraction and scoring.
- Preserve metadata (filename, company, dates) for reporting and grouping results.
- Enforce evaluation scope:
  - Item 7 → MONEY only
  - Item 10 → PERSON / TITLE / ORG only
  - No cross-pollination across sections in scoring.
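
The scope rules above can be written down as a small lookup table. The sketch below is one possible encoding; `SECTION_LABEL_SCOPE`, `filter_to_scope`, and the sample predictions are illustrative and not part of the notebook's code.

```python
from typing import Dict, List, Set, Tuple

Ent = Tuple[int, int, str]  # (start_char, end_char, label)

# Scope table taken from the rules above
SECTION_LABEL_SCOPE: Dict[str, Set[str]] = {
    "item_7": {"MONEY"},
    "item_10": {"PERSON", "TITLE", "ORG"},
}

def filter_to_scope(section: str, ents: List[Ent]) -> List[Ent]:
    """Drop entities whose label is not scored for this section."""
    allowed = SECTION_LABEL_SCOPE.get(section, set())
    return [ent for ent in ents if ent[2] in allowed]

# Made-up predictions for an Item 7 passage
item7_preds = [(0, 12, "MONEY"), (20, 28, "PERSON"), (40, 44, "ORG")]
print(filter_to_scope("item_7", item7_preds))
```

Applying the same filter to both predictions and gold annotations before matching keeps out-of-scope labels from counting as false positives or false negatives.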

Inputs
- Primary filing JSONs (raw SEC extraction)
  - Must contain text for item_7 and item_10 (or empty if missing).
- Gold annotation JSONs (`*.json.gold.json`)
  - Schema versioned; includes annotations[] with section-scoped offsets:
    - label, section, start, end, text
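
To make the annotation fields concrete, here is a minimal sketch of a gold record and the offset check it implies. The section text and annotation values are invented for illustration, and `offsets_match` is a hypothetical helper; only the field names come from the schema described above.

```python
import json

# Hypothetical section text and gold file content (values invented for illustration)
item_10_text = "Jane Doe, Chief Financial Officer"
gold_raw = json.loads("""
{
  "schema_version": "1.0",
  "annotations": [
    {"label": "PERSON", "section": "item_10", "start": 0, "end": 8, "text": "Jane Doe"},
    {"label": "TITLE", "section": "item_10", "start": 10, "end": 33, "text": "Chief Financial Officer"}
  ]
}
""")

def offsets_match(section_text: str, ann: dict) -> bool:
    """True when the annotated slice of the section equals the stored text."""
    return section_text[ann["start"]:ann["end"]] == ann["text"]

results = [offsets_match(item_10_text, ann) for ann in gold_raw["annotations"]]
print(results)
```

Because offsets are section-scoped, the slice must be taken from the section named in the annotation, never from a concatenation of sections.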

Canonical objects
- We define two lightweight container types for filings (gold files are parsed into `GoldDoc` / `GoldAnnotation`):
  1. `CanonicalFiling`
    - `filename`
    - `company`
    - `filing_date`, `period_of_report` (if available)
    - `item_7_text` (string; may be empty)
    - `item_10_text` (string; may be empty)
  2. `DocBundle` (optional convenience object)
    - Holds processed spaCy docs for twin evaluation, one per pipeline variant:
      - `item7_sm_base`, `item7_sm_aug`, `item7_trf_base`, `item7_trf_aug`
      - `item10_sm_base`, `item10_sm_aug`, `item10_trf_base`, `item10_trf_aug`
    - Also keeps the source `filing` (the raw section strings that offsets refer to) and its `gold` document, if any.

Design notes
- Offsets are always section-scoped (relative to `item_7_text` or `item_10_text`).
- spaCy docs are created directly from these canonical section strings (no concatenation).
- In augmented pipelines (`*_aug`):
  - candidate labels (`MONEY_CANDIDATE`, `ORG_CANDIDATE`) are stored in doc.spans[...]
  - NER-produced `MONEY` and `ORG` are stripped to prevent downstream eval pollution
  - final doc.ents contains only the remaining final labels (e.g., PERSON/TITLE), until candidate→final promotion is applied later.
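
The routing described in these notes can be sketched on plain tuples. In the pipeline the siphon runs before `ner` and the strip step runs after it; the sketch collapses both into a single pass purely for illustration. `route_entities`, the `*_DEMO` names, and the sample entities are hypothetical.

```python
from typing import Dict, List, Tuple

Ent = Tuple[int, int, str]  # (start_char, end_char, label)

CANDIDATE_KEYS_DEMO = {
    "MONEY_CANDIDATE": "money_candidates",
    "ORG_CANDIDATE": "org_candidates",
}
BLOCKED_NER_LABELS_DEMO = {"MONEY", "ORG"}

def route_entities(ents: List[Ent]) -> Tuple[List[Ent], Dict[str, List[Ent]]]:
    """Split entities into finals and per-key candidate groups, dropping blocked NER labels."""
    finals: List[Ent] = []
    side: Dict[str, List[Ent]] = {key: [] for key in CANDIDATE_KEYS_DEMO.values()}
    for ent in ents:
        label = ent[2]
        if label in CANDIDATE_KEYS_DEMO:
            side[CANDIDATE_KEYS_DEMO[label]].append(ent)  # kept aside, offsets intact
        elif label not in BLOCKED_NER_LABELS_DEMO:
            finals.append(ent)
    return finals, side

mixed = [
    (0, 8, "PERSON"),
    (10, 20, "ORG_CANDIDATE"),
    (10, 20, "ORG"),
    (30, 36, "MONEY_CANDIDATE"),
    (40, 50, "TITLE"),
]
finals, side = route_entities(mixed)
print(finals)
print(side)
```

Keeping candidates in a separate mapping means anything that reads only the final list never sees a `*_CANDIDATE` label.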

Output of Section 6
- A list of `CanonicalFiling` objects
- Loaders to:
  - load raw filings from disk
  - load matching gold files from disk
- Utilities to create DocBundles for downstream harness + scoring
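
Matching filings to gold files comes down to a filename convention. The sketch below assumes gold files are named `<filing filename>.gold.json`; `gold_key`, `pair_with_gold`, and the filenames are hypothetical and only illustrate the lookup.

```python
from typing import Dict, List, Optional

GOLD_SUFFIX = ".gold.json"

def gold_key(gold_filename: str) -> str:
    """Map '<filing>.json.gold.json' back to '<filing>.json'."""
    if gold_filename.endswith(GOLD_SUFFIX):
        return gold_filename[: -len(GOLD_SUFFIX)]
    return gold_filename

def pair_with_gold(filing_names: List[str], gold_filenames: List[str]) -> Dict[str, Optional[str]]:
    """Return filing name -> gold filename, or None when no gold file exists."""
    available = {gold_key(g): g for g in gold_filenames}
    return {name: available.get(name) for name in filing_names}

# Invented filenames for illustration
pairs = pair_with_gold(
    ["0001_10K.json", "0002_10K.json"],
    ["0001_10K.json.gold.json"],
)
print(pairs)
```

Filings without a gold file map to `None`, so they can still be run through the harness while being skipped at scoring time.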

In [9]:
# 6. Data loading + canonical doc objects
# Paths (reproducible)
def find_project_root(
    start: Path,
    *,
    markers: Tuple[str, ...] = ("primary_data.json", "gold_annotations", "Notebooks"),
) -> Path:
    """
    Walk upward from `start` until we find a directory that looks like the project root.
    This makes the notebook robust across Jupyter/VSCode/cwd differences.
    """
    start = start.resolve()
    for p in (start, *start.parents):
        hits = 0
        for m in markers:
            if (p / m).exists():
                hits += 1
        # A loose heuristic: if it has primary_data.json OR gold_annotations, it's likely root.
        if (p / "primary_data.json").exists() or (p / "gold_annotations").exists():
            return p
        # Or: if it has multiple markers, also accept
        if hits >= 2:
            return p
    # Fallback: just use start; callers can assert later
    return start

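# Keep the paths from the Drive cell when they resolve; otherwise discover the root from cwd.
# (In notebooks, Path.cwd() is usually Notebooks/ but not always.)
try:
    _paths_ok = DATA_PATH.exists() and GOLD_DIR.exists()
except NameError:
    _paths_ok = False

if not _paths_ok:
    PROJECT_ROOT = find_project_root(Path.cwd())
    DATA_PATH = PROJECT_ROOT / "primary_data.json"
    GOLD_DIR = PROJECT_ROOT / "gold_annotations"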


# Canonical containers
@dataclass(frozen=True)
class CanonicalFiling:
    """
    Canonical, section-scoped text container for a single 10-K filing.

    Notes:
    - item_7_text and item_10_text are the authoritative strings for offsets.
    - We do NOT concatenate sections; offsets remain section-scoped.
    """
    filename: str
    company: str = ""
    filing_date: str = ""
    period_of_report: str = ""
    item_7_text: str = ""
    item_10_text: str = ""


@dataclass(frozen=True)
class GoldAnnotation:
    label: str
    section: str
    start: int
    end: int
    text: str


@dataclass(frozen=True)
class GoldDoc:
    """
    Parsed gold template file (*.json.gold.json) matching schema_version=1.0 structure.
    Offsets are section-scoped.
    """
    filename: str
    company: str = ""
    filing_date: str = ""
    period_of_report: str = ""
    notes: str = ""
    schema_version: str = "1.0"
    annotations: Tuple[GoldAnnotation, ...] = ()


@dataclass
class DocBundle:
    """
    Convenience holder for twin-track docs per section.

    We keep both baseline and augmented docs here so downstream harness/scoring can be
    standardized without recreating docs repeatedly.
    """
    filing: CanonicalFiling
    gold: Optional[GoldDoc]

    # Item 7 docs
    item7_sm_base: Optional[Any] = None
    item7_sm_aug: Optional[Any] = None
    item7_trf_base: Optional[Any] = None
    item7_trf_aug: Optional[Any] = None

    # Item 10 docs
    item10_sm_base: Optional[Any] = None
    item10_sm_aug: Optional[Any] = None
    item10_trf_base: Optional[Any] = None
    item10_trf_aug: Optional[Any] = None


# Low-level JSON helpers
def _read_json(path: Path) -> Any:
    with path.open("r", encoding="utf-8") as f:
        return json.load(f)


def _safe_str(x: Any) -> str:
    return "" if x is None else str(x)


def _extract_section_text(raw: Dict[str, Any], section: str) -> str:
    """
    Best-effort extraction of section text from raw filing JSON.

    Supports common shapes:
    - raw["item_7"] / raw["item_10"] as strings
    - raw["item_7"]["text"] / raw["item_10"]["text"]
    - raw["sections"]["item_7"] / raw["sections"]["item_10"]
    - raw["items"]["item_7"] / raw["items"]["item_10"]
    """
    # Direct
    if section in raw:
        v = raw.get(section)
        if isinstance(v, str):
            return v
        if isinstance(v, dict):
            if isinstance(v.get("text"), str):
                return v["text"]
            if isinstance(v.get("content"), str):
                return v["content"]

    # Nested common containers
    for container_key in ("sections", "items", "extracted", "extraction"):
        cont = raw.get(container_key)
        if isinstance(cont, dict) and section in cont:
            v = cont.get(section)
            if isinstance(v, str):
                return v
            if isinstance(v, dict):
                if isinstance(v.get("text"), str):
                    return v["text"]
                if isinstance(v.get("content"), str):
                    return v["content"]

    return ""

# Parse raw filing record(s)
def parse_filing_record(raw: Dict[str, Any], *, filename_fallback: str = "") -> CanonicalFiling:
    """
    Parse a single filing dict (as stored inside primary_data.json) -> CanonicalFiling.
    """
    filename = raw.get("filename") or filename_fallback
    company = _safe_str(raw.get("company") or raw.get("company_name") or raw.get("registrant_name") or "")
    filing_date = _safe_str(raw.get("filing_date") or raw.get("filed_as_of_date") or raw.get("filed") or "")
    period_of_report = _safe_str(raw.get("period_of_report") or raw.get("period") or raw.get("report_period") or "")

    item_7_text = _extract_section_text(raw, "item_7")
    item_10_text = _extract_section_text(raw, "item_10")

    return CanonicalFiling(
        filename=str(filename),
        company=company,
        filing_date=filing_date,
        period_of_report=period_of_report,
        item_7_text=item_7_text or "",
        item_10_text=item_10_text or "",
    )


def load_filings_from_primary_data(primary_data_path: str | Path = DATA_PATH) -> List[CanonicalFiling]:
    """
    primary_data.json loader for your current setup.

    Expected shapes supported:
    1) Dict keyed by filename -> filing dict
    2) List of filing dicts (each with filename field)
    """
    primary_data_path = Path(primary_data_path)
    raw = _read_json(primary_data_path)

    out: List[CanonicalFiling] = []
    if isinstance(raw, dict):
        # Common: { "1066...json": { ...filing... }, ... }
        for k, v in raw.items():
            if isinstance(v, dict):
                out.append(parse_filing_record(v, filename_fallback=str(k)))
    elif isinstance(raw, list):
        for v in raw:
            if isinstance(v, dict):
                out.append(parse_filing_record(v))
    else:
        raise TypeError(f"Unsupported primary_data.json shape: {type(raw)}")

    return out


# Parse gold JSON -> GoldDoc
def parse_gold_json(path: Path) -> GoldDoc:
    raw = _read_json(path)

    anns: List[GoldAnnotation] = []
    for a in (raw.get("annotations", []) or []):
        anns.append(
            GoldAnnotation(
                label=_safe_str(a.get("label")),
                section=_safe_str(a.get("section")),
                start=int(a.get("start")),
                end=int(a.get("end")),
                text=_safe_str(a.get("text")),
            )
        )

    return GoldDoc(
        filename=_safe_str(raw.get("filename") or ""),
        company=_safe_str(raw.get("company") or ""),
        filing_date=_safe_str(raw.get("filing_date") or ""),
        period_of_report=_safe_str(raw.get("period_of_report") or ""),
        notes=_safe_str(raw.get("notes") or ""),
        schema_version=_safe_str(raw.get("schema_version") or "1.0"),
        annotations=tuple(anns),
    )


def load_gold_from_dir(gold_dir: str | Path = GOLD_DIR) -> Dict[str, GoldDoc]:
    """
    Returns mapping: base filename (e.g., '1017655_10K_2020_...json') -> GoldDoc
    """
    gold_dir = Path(gold_dir)
    paths = sorted(gold_dir.glob("*.json.gold.json"))

    out: Dict[str, GoldDoc] = {}
    for p in paths:
        gd = parse_gold_json(p)
        # Gold files store original filename inside; trust it when available.
        key = gd.filename or p.name.replace(".gold.json", "")
        out[key] = gd

    return out

# Gold alignment validator
def validate_gold_offsets(
    filing: CanonicalFiling,
    gold: GoldDoc,
    *,
    max_errors: int = 10,
) -> List[str]:
    """
    Validates that gold annotation substrings match the provided 'text' field.
    Returns a list of human-readable error strings (empty list means OK).
    """
    section_text_map = {
        "item_7": filing.item_7_text,
        "item_10": filing.item_10_text,
    }

    errors: List[str] = []
    for i, ann in enumerate(gold.annotations):
        sec_text = section_text_map.get(ann.section, "")
        if not sec_text:
            errors.append(f"[{i}] Missing section text for {ann.section} (filing={filing.filename})")
            if len(errors) >= max_errors:
                break
            continue

        if ann.start < 0 or ann.end > len(sec_text) or ann.start >= ann.end:
            errors.append(
                f"[{i}] Bad span bounds {ann.start}:{ann.end} for {ann.section} "
                f"(len={len(sec_text)}) text={ann.text!r}"
            )
            if len(errors) >= max_errors:
                break
            continue

        slice_text = sec_text[ann.start:ann.end]
        if slice_text != ann.text:
            errors.append(
                f"[{i}] Text mismatch in {ann.section} {ann.start}:{ann.end}\n"
                f"  gold_text:  {ann.text!r}\n"
                f"  slice_text: {slice_text!r}"
            )
            if len(errors) >= max_errors:
                break

    return errors

# Build DocBundle (twin-track docs)
def make_doc_bundle(
    filing: CanonicalFiling,
    gold: Optional[GoldDoc],
    pipelines: dict,
) -> DocBundle:
    """
    Create section-scoped spaCy docs for both baseline and augmented pipelines.
    This is intentionally "dumb": it just runs text through each loaded pipeline.
    """
    b = DocBundle(filing=filing, gold=gold)

    # Run every loaded variant over each section's text. Bundle attributes are
    # named item7_<variant> / item10_<variant>.
    section_texts = {
        "item7": filing.item_7_text or "",
        "item10": filing.item_10_text or "",
    }
    for prefix, text in section_texts.items():
        for variant in ("sm_base", "sm_aug", "trf_base", "trf_aug"):
            nlp = pipelines.get(variant)
            if nlp is not None:
                setattr(b, f"{prefix}_{variant}", nlp(text))

    return b


def build_doc_bundles(
    filings: List[CanonicalFiling],
    gold_map: Dict[str, GoldDoc],
    pipelines: dict,
) -> List[DocBundle]:
    """
    Match filings to gold (by filename), build DocBundles.
    """
    bundles: List[DocBundle] = []
    for f in filings:
        g = gold_map.get(f.filename)
        bundles.append(make_doc_bundle(f, g, pipelines))
    return bundles

# Suggested usage in the notebook
# filings = load_filings_from_primary_data(DATA_PATH)
# gold_map = load_gold_from_dir(GOLD_DIR)
# bundles = build_doc_bundles(filings, gold_map, pipelines)

# (Optional) quick gold sanity check on only the ones that have gold:
# for b in bundles:
#     if b.gold:
#         errs = validate_gold_offsets(b.filing, b.gold, max_errors=3)
#         if errs:
#             print("----", b.filing.filename)
#             print("\n".join(errs))


7. Gold Parsing + Alignment Validator

This section defines how spaCy predictions are aligned against gold annotations for evaluation. The purpose is measurement only:
- no training
- no rule tuning
- no feedback into the pipeline

All alignment is performed post-extraction on `DocBundle` objects, which are treated as read-only once built.

In [10]:
# Section 7: Alignment primitives
SpanBounds = Tuple[int, int]  # (start, end)

@dataclass(frozen=True)
class PredictedSpan:
    """
    Normalized prediction span used for alignment.
    """
    label: str
    section: str
    start: int
    end: int


@dataclass(frozen=True)
class GoldSpan:
    """
    Normalized gold span used for alignment.
    """
    label: str
    section: str
    start: int
    end: int


@dataclass
class AlignmentResult:
    """
    Result container for a single alignment pass.
    """
    true_positives: List[Tuple[GoldSpan, PredictedSpan]]
    false_negatives: List[GoldSpan]
    false_positives: List[PredictedSpan]


7.1 Section Scope

Evaluation is strictly section-scoped:

- Item 7
  - Evaluated label: `MONEY`
- Item 10
  - Evaluated labels: `PERSON`, `TITLE`, `ORG`

Predictions outside the evaluated section or label set are ignored.
Offsets are always interpreted relative to the section text.

In [11]:
# Section 7.1: Section + label routing

EVAL_LABELS_BY_SECTION = {
    "item_7": {"MONEY"},
    "item_10": {"PERSON", "TITLE", "ORG"},
}


def is_eval_label(section: str, label: str) -> bool:
    return label in EVAL_LABELS_BY_SECTION.get(section, set())


7.2 Gold Assumptions

Gold annotations:
- are authoritative
- are section-scoped
- use character offsets
- must exactly match the underlying section text slice

Gold offsets are validated prior to evaluation.
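
The exact-slice requirement can be illustrated on a made-up sentence. This is a minimal sketch rather than the notebook's validator: `slice_matches` is a hypothetical helper, and the text and offsets are invented for illustration.

```python
def slice_matches(section_text: str, start: int, end: int, expected: str) -> bool:
    """Return True when section_text[start:end] is in bounds and equals expected."""
    if start < 0 or end > len(section_text) or start >= end:
        return False
    return section_text[start:end] == expected


# Invented example text; offsets are half-open [start, end) character positions.
section_text = "John A. Smith serves as Chief Executive Officer."

ok = slice_matches(section_text, 0, 13, "John A. Smith")
off_by_one = slice_matches(section_text, 0, 12, "John A. Smith")
out_of_bounds = slice_matches(section_text, 40, 400, "Officer.")

print(ok, off_by_one, out_of_bounds)
```

An off-by-one end offset is enough to fail the check, which is why gold files are validated before any scoring is attempted.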

In [12]:
# Section 7.2: Gold normalization

def gold_spans_from_doc(gold: GoldDoc) -> List[GoldSpan]:
    spans: List[GoldSpan] = []

    for ann in gold.annotations:
        if not is_eval_label(ann.section, ann.label):
            continue

        spans.append(
            GoldSpan(
                label=ann.label,
                section=ann.section,
                start=ann.start,
                end=ann.end,
            )
        )

    return spans


7.3 Prediction Source

Predictions are sourced from the section routing rules defined in the harness:

- Item 7:
  - Predictions are routed from `doc.spans["money_candidates"]`
  - Candidate label `MONEY_CANDIDATE` is normalized to evaluated label `MONEY`

- Item 10:
  - Predictions are routed from `doc.ents`
  - Only evaluated labels are retained: `PERSON`, `TITLE`, `ORG`

All predictions are then normalized into a common structure:
- `label`
- `section`
- `start`
- `end`

Candidate entities are never evaluated directly as candidates; they are either routed and normalized (Item 7 MONEY), or ignored.

In [13]:
def predicted_spans_from_doc(doc, section: str) -> List[PredictedSpan]:
    """
    Normalize predictions into PredictedSpan objects for evaluation.

    - Item 7: not sourced from doc.ents; use predicted_money_spans_from_candidates()
    - Item 10: sourced from doc.ents and filtered by EVAL_LABELS_BY_SECTION
    """
    # Item 7 MONEY is not evaluated from doc.ents in this project
    if section == "item_7":
        return []

    # EVAL_LABELS_BY_SECTION is defined in Section 7.1, so this cell does not
    # depend on the Section 8.2 configuration having been run first.
    allowed = EVAL_LABELS_BY_SECTION.get(section, set())
    spans: List[PredictedSpan] = []

    for ent in doc.ents:
        if ent.label_ not in allowed:
            continue
        spans.append(
            PredictedSpan(
                label=ent.label_,
                section=section,
                start=ent.start_char,
                end=ent.end_char,
            )
        )

    return spans


In [14]:
def predicted_money_spans_from_candidates(doc, section: str = "item_7") -> List[PredictedSpan]:
    """
    Item 7 prediction source:
    use doc.spans["money_candidates"] (populated by candidate_siphon) and normalize the label to MONEY.
    """
    out: List[PredictedSpan] = []
    for sp in doc.spans.get("money_candidates", []):
        # candidate spans were labeled MONEY_CANDIDATE; evaluation label is MONEY
        out.append(
            PredictedSpan(
                label="MONEY",
                section=section,
                start=sp.start_char,
                end=sp.end_char,
            )
        )
    return out


7.4 Alignment Modes

Two alignment modes are used:
- Strict
  - label match
  - section match
  - exact `(start, end)` match
- Relaxed
  - label match
  - section match
  - any character overlap

Strict alignment measures exact span recovery.

Relaxed alignment captures near-misses and boundary drift.
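
To make the difference concrete, the sketch below compares the two modes on a boundary-drift case (gold `222:241` against a prediction `222:237`, offsets of the kind that show up in the Item 10 sample output later in this notebook). It assumes spans are plain tuples; `strict`, `relaxed`, and `overlaps` are illustrative stand-ins, not the notebook's own functions.

```python
from typing import Tuple

Span = Tuple[str, str, int, int]  # (label, section, start, end), end exclusive


def overlaps(a_start: int, a_end: int, b_start: int, b_end: int) -> bool:
    """Half-open intervals overlap when each one starts before the other ends."""
    return a_start < b_end and b_start < a_end


def strict(g: Span, p: Span) -> bool:
    # Label, section, start, and end must all be identical.
    return g == p


def relaxed(g: Span, p: Span) -> bool:
    # Label and section must agree; boundaries only need to share a character.
    return g[:2] == p[:2] and overlaps(g[2], g[3], p[2], p[3])


gold = ("PERSON", "item_10", 222, 241)
drifted = ("PERSON", "item_10", 222, 237)   # prediction ends 4 characters early
adjacent = ("PERSON", "item_10", 241, 250)  # starts exactly where gold ends
wrong_label = ("ORG", "item_10", 222, 241)

print(strict(gold, drifted), relaxed(gold, drifted))
print(relaxed(gold, adjacent))
print(strict(gold, wrong_label), relaxed(gold, wrong_label))
```

Because spans are half-open, a prediction that merely touches the gold boundary does not count as overlap, and a label mismatch fails in both modes regardless of offsets.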

In [15]:
# Section 7.4: Alignment logic

def spans_overlap(a: SpanBounds, b: SpanBounds) -> bool:
    return not (a[1] <= b[0] or b[1] <= a[0])


def strict_match(g: GoldSpan, p: PredictedSpan) -> bool:
    return (
        g.label == p.label and
        g.section == p.section and
        g.start == p.start and
        g.end == p.end
    )


def relaxed_match(g: GoldSpan, p: PredictedSpan) -> bool:
    if g.label != p.label or g.section != p.section:
        return False
    return spans_overlap((g.start, g.end), (p.start, p.end))


7.5 Match Accounting

For each pipeline variant:
- TP: gold annotation matched by a prediction
- FN: gold annotation with no match
- FP: prediction with no gold match

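Matching is greedy and one-to-one: each gold annotation takes the first unused prediction that satisfies the matcher, so a gold annotation matches at most one prediction, and a prediction is consumed by at most one gold annotation, per alignment pass.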
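
The accounting rules can be sketched on toy data. This is an illustrative version that assumes spans are `(label, start, end)` tuples and pairs each gold span with the first unused matching prediction; `greedy_align` and `overlap_match` are hypothetical names used only here.

```python
from typing import Callable, List, Tuple

Span = Tuple[str, int, int]  # (label, start, end); section omitted for brevity


def overlap_match(g: Span, p: Span) -> bool:
    return g[0] == p[0] and g[1] < p[2] and p[1] < g[2]


def greedy_align(gold: List[Span], pred: List[Span], matcher: Callable[[Span, Span], bool]):
    """One-to-one alignment: each gold span takes the first unused matching prediction."""
    used = set()
    tp, fn = [], []
    for g in gold:
        hit = next((i for i, p in enumerate(pred) if i not in used and matcher(g, p)), None)
        if hit is None:
            fn.append(g)
        else:
            used.add(hit)
            tp.append((g, pred[hit]))
    fp = [p for i, p in enumerate(pred) if i not in used]
    return tp, fn, fp


# One long prediction covers two neighbouring gold spans; a second prediction hits nothing.
gold = [("PERSON", 0, 10), ("PERSON", 12, 20)]
pred = [("PERSON", 0, 20), ("PERSON", 30, 40)]

tp, fn, fp = greedy_align(gold, pred, overlap_match)
print(len(tp), len(fn), len(fp))
```

The long prediction is credited once, so the second gold span is still a false negative. Since the pairing is first-come, results under relaxed matching can depend on the order of the input lists.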

In [16]:
# Section 7.5: Match accounting

def align_spans(
    gold_spans: List[GoldSpan],
    pred_spans: List[PredictedSpan],
    *,
    matcher,
) -> AlignmentResult:
    matched_preds = set()
    tp: List[Tuple[GoldSpan, PredictedSpan]] = []
    fn: List[GoldSpan] = []

    for g in gold_spans:
        match = None
        for i, p in enumerate(pred_spans):
            if i in matched_preds:
                continue
            if matcher(g, p):
                match = i
                break

        if match is None:
            # Record unmatched gold directly. Filtering by equality afterwards
            # would hide an unmatched gold span that equals a matched duplicate.
            fn.append(g)
        else:
            matched_preds.add(match)
            tp.append((g, pred_spans[match]))

    fp = [p for i, p in enumerate(pred_spans) if i not in matched_preds]

    return AlignmentResult(tp, fn, fp)


7.6 Metrics

Metrics are reported per:
- section
- label
- pipeline variant

Reported metrics:
- precision
- recall
- F1

Metrics are computed separately for strict and relaxed alignment.
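
Reporting per label implies partitioning spans before counting. A minimal sketch, assuming spans are plain `(label, section, start, end)` tuples; `group_by_section_label` is a hypothetical helper and the spans are invented.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Span = Tuple[str, str, int, int]  # (label, section, start, end)


def group_by_section_label(spans: List[Span]) -> Dict[Tuple[str, str], List[Span]]:
    """Bucket spans by (section, label) so each bucket can be scored on its own."""
    buckets: Dict[Tuple[str, str], List[Span]] = defaultdict(list)
    for span in spans:
        label, section = span[0], span[1]
        buckets[(section, label)].append(span)
    return dict(buckets)


# Invented spans for illustration.
spans = [
    ("PERSON", "item_10", 0, 10),
    ("TITLE", "item_10", 12, 35),
    ("PERSON", "item_10", 40, 52),
    ("MONEY", "item_7", 5, 11),
]

grouped = group_by_section_label(spans)
for key, bucket in sorted(grouped.items()):
    print(key, len(bucket))
```

Applying the same grouping to gold and predicted spans yields one `(gold, pred)` pair per section and label, each of which can be aligned and scored independently.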

In [17]:
# Section 7.6: Metrics

def precision(tp: int, fp: int) -> float:
    return tp / (tp + fp) if (tp + fp) else 0.0


def recall(tp: int, fn: int) -> float:
    return tp / (tp + fn) if (tp + fn) else 0.0


def f1(p: float, r: float) -> float:
    return 2 * p * r / (p + r) if (p + r) else 0.0


8. Run Harness (Single + Twin)

This section defines the execution harness used to run filings through spaCy pipelines and produce evaluation-ready predictions. The harness is responsible for:

- selecting the appropriate section text
- extracting predictions from the correct pipeline variant
- normalizing predictions into a common span representation
- passing predictions into the Section 7 alignment logic

No scoring is performed here. This section produces inputs to evaluation, not metrics.

In [18]:
# Section 8 — Harness: Paths and checks
# Explicitly anchor to Google Drive project root (Colab-safe)
PROJECT_ROOT = Path("/content/drive/MyDrive/Task 3")
DATA_PATH = PROJECT_ROOT / "primary_data.json"
GOLD_DIR  = PROJECT_ROOT / "gold_annotations"

assert DATA_PATH.exists(), f"Missing primary_data.json at {DATA_PATH}"
assert GOLD_DIR.exists(),  f"Missing gold_annotations at {GOLD_DIR}"



8.1 Execution Modes

Two execution modes are supported:

- Single mode: Runs a single pipeline variant (e.g., `sm_aug`) for focused inspection or debugging.

- Twin mode: Runs paired pipelines side-by-side (e.g., `sm_base` vs `sm_aug`, `sm_aug` vs `trf_aug`) to support direct comparison.

Both modes operate over the same `DocBundle` objects to ensure consistent inputs.
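
The two modes differ only in how many variants are run over the same text. A minimal sketch, assuming a pipeline is any callable from text to a result; `run_single` and `run_twin` are hypothetical names, and the toy pipelines below merely split on whitespace where real variants would return spaCy `Doc` objects.

```python
from typing import Callable, Dict, List, Tuple

# Stand-in for a spaCy pipeline: any callable from text to some result.
Pipeline = Callable[[str], List[str]]


def run_single(pipelines: Dict[str, Pipeline], variant: str, text: str) -> Dict[str, List[str]]:
    """Single mode: run one variant."""
    return {variant: pipelines[variant](text)}


def run_twin(pipelines: Dict[str, Pipeline], pair: Tuple[str, str], text: str) -> Dict[str, List[str]]:
    """Twin mode: run two variants on the same text for side-by-side comparison."""
    return {variant: pipelines[variant](text) for variant in pair}


# Toy pipelines for illustration only.
toy_pipelines: Dict[str, Pipeline] = {
    "sm_base": lambda text: text.split(),
    "sm_aug": lambda text: [tok.upper() for tok in text.split()],
}

text = "net loss of $3,541"
single = run_single(toy_pipelines, "sm_aug", text)
twin = run_twin(toy_pipelines, ("sm_base", "sm_aug"), text)
print(single)
print(twin)
```

Both functions return a mapping keyed by variant name, so downstream code can treat single mode as a twin run with one entry.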

In [19]:
# Section 8.1 — Load primary data and gold
def load_primary_data(primary_path: str | Path) -> List[CanonicalFiling]:
    """
    Load primary_data.json where the structure is:
        {
          "<filename>.json": { ...raw filing payload... },
          ...
        }

    Returns:
        List[CanonicalFiling]
    """
    primary_path = Path(primary_path)
    raw = _read_json(primary_path)

    if not isinstance(raw, dict):
        raise ValueError(f"Expected top-level dict in primary_data.json, got {type(raw)}")

    filings: List[CanonicalFiling] = []
    skipped = 0

    for filename_key, rec in raw.items():
        if not isinstance(rec, dict):
            skipped += 1
            continue

        # Prefer the record's own filename; fall back to the dict key when it is missing
        rec2 = dict(rec)
        rec2["filename"] = rec2.get("filename") or str(filename_key)

        filings.append(parse_filing_record(rec2))

    # Light sanity check: ensure filenames look like filenames
    if filings and not any(f.filename.endswith(".json") for f in filings[:10]):
        print("Warning: filenames do not look like '*.json' keys; check the primary_data.json structure.")

    if skipped:
        print(f"Skipped {skipped} non-dict records in primary_data.json")

    return filings


def load_gold(gold_dir: str | Path) -> Dict[str, GoldDoc]:
    """
    Load gold docs from gold_annotations/*.json.gold.json

    Returns:
        Dict[str, GoldDoc] mapping original filing filename -> GoldDoc
    """
    return load_gold_from_dir(gold_dir)


# (Recommended) one-cell "do the thing" usage:
filings = load_primary_data(DATA_PATH)
gold_map = load_gold(GOLD_DIR)
print("Loaded filings:", len(filings))
print("Loaded gold docs:", len(gold_map))


Loaded filings: 191
Loaded gold docs: 8


In [20]:
filings = load_primary_data(DATA_PATH)
print(len(filings))
print(filings[0].filename)
print(len(filings[0].item_7_text), len(filings[0].item_10_text))


191
1001601_10K_2020_0001493152-21-008913.json
29840 6729


8.2 Section Routing

Predictions are generated independently per section:

- Item 7
  - Source text: `CanonicalFiling.item_7_text`
  - Evaluated label: `MONEY`

- Item 10
  - Source text: `CanonicalFiling.item_10_text`
  - Evaluated labels: `PERSON`, `TITLE`, `ORG`

Each section is processed separately to preserve section-scoped offsets and avoid cross-section contamination.
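
A small illustration of why offsets must stay section-relative, using invented section texts: the same `(start, end)` pair points at different content once sections are concatenated.

```python
# Invented section texts for illustration.
item_7_text = "Revenue was $3,541 for the year."
item_10_text = "Jane Doe, Chief Financial Officer."

# A span found in Item 10, expressed relative to item_10_text.
start, end = 0, 8
assert item_10_text[start:end] == "Jane Doe"

# If the sections were concatenated, the same offsets would land in Item 7.
combined = item_7_text + "\n" + item_10_text
shift = len(item_7_text) + 1  # length of Item 7 plus the separator

wrong = combined[start:end]
right = combined[start + shift:end + shift]

print(repr(wrong))
print(repr(right))
```

Keeping one `Doc` per section avoids having to carry a shift like this through every comparison with gold.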

In [21]:
# 8.2 Section Routing — configuration

SECTION_LABELS = {
    "item_7": {"MONEY"},
    "item_10": {"PERSON", "TITLE", "ORG"},
}

# Use SECTION_LABELS as the authoritative evaluation scope
EVAL_LABELS_BY_SECTION = SECTION_LABELS

def is_eval_label(section: str, label: str) -> bool:
    return label in EVAL_LABELS_BY_SECTION.get(section, set())

# Item 7 money is produced as candidate spans, not ents
ITEM7_SPAN_KEY = "money_candidates"
ITEM7_CANDIDATE_LABEL = "MONEY_CANDIDATE"

def routed_predictions(doc, *, section: str) -> List[Tuple[int, int, str, str]]:
    """
    Select predictions from a spaCy Doc according to section routing rules.

    Returns:
        List of (start, end, label, text) tuples
        Offsets are section-scoped.
    """
    allowed = SECTION_LABELS.get(section, set())
    out: List[Tuple[int, int, str, str]] = []

    if section == "item_7":
        # Pull from doc.spans["money_candidates"] and normalize label to MONEY
        for sp in doc.spans.get(ITEM7_SPAN_KEY, []):
            # only accept the candidate label we expect
            if sp.label_ != ITEM7_CANDIDATE_LABEL:
                continue

            canon_label = "MONEY"
            if canon_label not in allowed:
                continue

            out.append((sp.start_char, sp.end_char, canon_label, sp.text))
        return out

    # Default: Item 10 (and any future section) routes from doc.ents
    for ent in doc.ents:
        if ent.label_ not in allowed:
            continue
        out.append((ent.start_char, ent.end_char, ent.label_, ent.text))

    return out


In [22]:
# pick a filing with gold (safer)
gold_filename = next(iter(gold_map.keys()))
f_gold = next(f for f in filings if f.filename == gold_filename)

doc7 = pipelines["sm_aug"](f_gold.item_7_text)
pred7 = routed_predictions(doc7, section="item_7")

print("Testing gold-backed filename:", gold_filename)
print("Item 7 routed labels:", {p[2] for p in pred7})
print("Item 7 routed count:", len(pred7))
print("Item 7 sample texts:", [p[3] for p in pred7[:10]])

print("doc.spans keys:", list(doc7.spans.keys()))
print("money_candidates spans:", len(doc7.spans.get("money_candidates", [])))


Testing gold-backed filename: 1017655_10K_2020_0001654954-21-003649.json
Item 7 routed labels: {'MONEY'}
Item 7 routed count: 38
Item 7 sample texts: ['$3,541', '$19,395', '$12,920,789', '$10,548,295', '$15,854', '$3,541', '$19,395', '$120,190', '$27,845', '$148,035']
doc.spans keys: ['money_candidates', 'org_candidates']
money_candidates spans: 38


8.3 Pipeline Variants

Each `DocBundle` may contain predictions from up to four pipeline variants:

- `sm_base`
- `sm_aug`
- `trf_base`
- `trf_aug`

The harness dynamically detects which variants are available and runs only those present. This allows:

- lightweight iteration with `sm`
- higher-quality comparison using `trf`
- apples-to-apples evaluation between baseline and rule-augmented pipelines

In [23]:
# Section 8.3 — Pipeline variants (available + helpers)

PIPELINE_VARIANTS = ("sm_base", "sm_aug", "trf_base", "trf_aug")

def available_variants(pipelines: dict) -> List[str]:
    """
    Return pipeline variant keys that exist and are callable.
    """
    out = []
    for k in PIPELINE_VARIANTS:
        nlp = pipelines.get(k)
        if nlp is not None and callable(nlp):
            out.append(k)
    return out

def run_variant_on_section(
    filing: CanonicalFiling,
    *,
    pipelines: dict,
    variant: str,
    section: str,
):
    """
    Run a specific pipeline variant on a single section's text and return the Doc.
    """
    nlp = pipelines.get(variant)
    if nlp is None:
        raise ValueError(f"Pipeline variant not found: {variant}")

    if section == "item_7":
        text = filing.item_7_text or ""
    elif section == "item_10":
        text = filing.item_10_text or ""
    else:
        raise ValueError(f"Unknown section: {section!r}")
    return nlp(text)

def run_all_variants_on_section(
    filing: CanonicalFiling,
    *,
    pipelines: dict,
    section: str,
) -> Dict[str, Any]:
    """
    Run all available variants for a given section and return mapping:
        variant -> Doc
    """
    docs = {}
    for v in available_variants(pipelines):
        docs[v] = run_variant_on_section(filing, pipelines=pipelines, variant=v, section=section)
    return docs

# --- 8.3 sanity check: show what variants are actually available ---
avs = available_variants(pipelines)
print("Available pipeline variants:", avs)

# Pick one filing to inspect
f0 = filings[0]


Available pipeline variants: ['sm_base', 'sm_aug', 'trf_base', 'trf_aug']


In [24]:
print("pipelines keys:", sorted(pipelines.keys()))
for k in ("sm_base", "sm_aug", "trf_base", "trf_aug"):
    v = pipelines.get(k)
    print(f"{k:8} ->", "LOADED" if v is not None else "None")


pipelines keys: ['sm_aug', 'sm_base', 'trf_aug', 'trf_base']
sm_base  -> LOADED
sm_aug   -> LOADED
trf_base -> LOADED
trf_aug  -> LOADED


In [25]:
# Run all variants for each section (no scoring; just existence + routing output)
docs_item7 = run_all_variants_on_section(f0, pipelines=pipelines, section="item_7")
docs_item10 = run_all_variants_on_section(f0, pipelines=pipelines, section="item_10")

print("\nItem 7 routed labels by variant:")
for v, d in docs_item7.items():
    preds = routed_predictions(d, section="item_7")
    print(f"  {v:8s} -> labels={sorted({p[2] for p in preds})}  n={len(preds)}")

print("\nItem 10 routed labels by variant:")
for v, d in docs_item10.items():
    preds = routed_predictions(d, section="item_10")
    print(f"  {v:8s} -> labels={sorted({p[2] for p in preds})}  n={len(preds)}")


Item 7 routed labels by variant:
  sm_base  -> labels=[]  n=0
  sm_aug   -> labels=['MONEY']  n=93
  trf_base -> labels=[]  n=0
  trf_aug  -> labels=['MONEY']  n=93

Item 10 routed labels by variant:
  sm_base  -> labels=['ORG', 'PERSON']  n=78
  sm_aug   -> labels=['PERSON', 'TITLE']  n=52
  trf_base -> labels=['ORG', 'PERSON']  n=59
  trf_aug  -> labels=['PERSON', 'TITLE']  n=51


8.4 Prediction Normalization

All predictions are normalized into a shared structure prior to evaluation:
- `label`
- `section`
- `start` (character offset, section-relative)
- `end` (character offset, section-relative, exclusive)

Predictions are sourced from the section routing rules (`routed_predictions()`), not directly from spaCy internals:

- Item 7:
  - routed from `doc.spans["money_candidates"]`
  - candidate label `MONEY_CANDIDATE` is normalized to evaluated label `MONEY`
- Item 10:
  - routed from `doc.ents`
  - only evaluated labels are retained (`PERSON`, `TITLE`, `ORG`)

At this stage:
- offsets are preserved (no boundary adjustment)
- candidate entities are not evaluated as candidates
- exact duplicate spans may be removed using (`section`, `label`, `start`, `end`) as the dedupe key
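
The optional dedupe step could look like the sketch below. It is an assumption about how such a step might be written, not code from the harness: `SpanRecord` and `dedupe_spans` are hypothetical names, and the spans are invented.

```python
from typing import Iterable, List, NamedTuple


class SpanRecord(NamedTuple):
    label: str
    section: str
    start: int
    end: int


def dedupe_spans(spans: Iterable[SpanRecord]) -> List[SpanRecord]:
    """Drop exact duplicates, keeping the first occurrence and the original order."""
    seen = set()
    out: List[SpanRecord] = []
    for sp in spans:
        key = (sp.section, sp.label, sp.start, sp.end)
        if key in seen:
            continue
        seen.add(key)
        out.append(sp)
    return out


spans = [
    SpanRecord("MONEY", "item_7", 100, 106),
    SpanRecord("MONEY", "item_7", 100, 106),  # exact duplicate, dropped
    SpanRecord("MONEY", "item_7", 100, 107),  # different end offset, kept
]

unique = dedupe_spans(spans)
print(len(unique))
```

Only exact key matches are removed; spans that differ in either boundary are kept, so offsets are never adjusted by this step.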

8.5 Harness Outputs

For each document (`CanonicalFiling`) and each available pipeline variant (`sm_base`, `sm_aug`, `trf_base`, `trf_aug`), the harness produces evaluation-ready inputs per section.

For each section (`item_7`, `item_10`) and variant:
- Predictions (normalized)
  - A list of `PredictedSpan` objects created by the Section 7.3 helpers: `predicted_money_spans_from_candidates(...)` for Item 7 and `predicted_spans_from_doc(...)` for Item 10
  - Includes: label, section, start, end
  - Offsets remain section-scoped and unchanged
  - Exact duplicates may be removed
- Gold spans (when available):
  - A list of `GoldSpan` objects created by `gold_spans_from_doc(...)`
  - Includes: `label`, `section`, `start`, `end`
  - Only spans in evaluation scope are included (per `EVAL_LABELS_BY_SECTION`)

For each (section, variant), the harness provides the pair:
- `(gold_spans, predicted_spans)`

These are passed directly to `align_spans(...)` using:
- Strict matching (`strict_match`)
- Relaxed matching (`relaxed_match`)

The harness output is organized as `document → variant → section → {gold_spans, predicted_spans}`, with the two lists stored under the keys `gold` and `pred`.

9. Outputs (Human + Machine)

This section defines the outputs produced after extraction, routing, and normalization are complete.

Outputs are divided into:
- Machine outputs (Section 9.1): structured, evaluation-ready data
- Human outputs (Section 9.2): readable summaries for inspection

In [26]:
# Section 9 setup — gold-only evaluation scope
filings = load_primary_data(DATA_PATH)
gold_map = load_gold(GOLD_DIR)

eval_filings = [f for f in filings if f.filename in gold_map]

print(f"Total filings loaded: {len(filings)}")
print(f"Filings with gold:     {len(eval_filings)}")

bundles = build_doc_bundles(
    filings=eval_filings,
    gold_map=gold_map,
    pipelines=pipelines,
)


Total filings loaded: 191
Filings with gold:     8


9.1 Machine Outputs

Machine outputs are structured, deterministic, and suitable for downstream scoring and aggregation.

Outputs are organized as:

`document → pipeline variant → section → outputs`

Where:
- document is identified by filing filename
- pipeline variant is one of: `sm_base`, `sm_aug`, `trf_base`, `trf_aug` (when available)
- section is one of: `item_7`, `item_10`

For each (`document`, `variant`, `section`) combination, machine outputs include:
- Normalized prediction spans
  - Produced by Section 7.3
  - Fields: `label`, `section`, `start`, `end`
  - Offsets are section-scoped and unchanged
  - Exact duplicate spans may be removed
- Gold spans (when available)
  - Produced by Section 7.2
  - Fields: `label`, `section`, `start`, `end`
  - Limited to labels within evaluation scope

In [27]:
# Section 9.1 — Machine Outputs
def build_machine_outputs(
    bundles: List[DocBundle],
    *,
    pipelines: dict,
) -> Dict[str, Any]:
    results: Dict[str, Any] = {}
    variants = available_variants(pipelines)

    for bundle in bundles:
        filing = bundle.filing
        filename = filing.filename

        results[filename] = {
            "meta": {
                "company": filing.company,
                "filing_date": filing.filing_date,
                "period_of_report": filing.period_of_report,
            },
            "variants": {},
        }

        for variant in variants:
            results[filename]["variants"][variant] = {}

            for section in SECTION_LABELS.keys():
                # bundle attrs are item7_* / item10_* (no underscore)
                attr = f"{section.replace('_', '')}_{variant}"
                doc = getattr(bundle, attr, None)
                if doc is None:
                    continue

                if section == "item_7":
                    preds = predicted_money_spans_from_candidates(doc, section="item_7")
                else:
                    preds = predicted_spans_from_doc(doc, section) or []

                gold = []
                if bundle.gold:
                    gold = [g for g in gold_spans_from_doc(bundle.gold) if g.section == section]

                results[filename]["variants"][variant][section] = {
                    "pred": preds,
                    "gold": gold,
                }

    return results


In [28]:
machine_outputs = build_machine_outputs(bundles, pipelines=pipelines)

some_file = next(iter(machine_outputs))
print(machine_outputs[some_file].keys())
print(machine_outputs[some_file]["variants"].keys())
print(machine_outputs[some_file]["variants"]["sm_aug"].keys())


dict_keys(['meta', 'variants'])
dict_keys(['sm_base', 'sm_aug', 'trf_base', 'trf_aug'])
dict_keys(['item_7', 'item_10'])


In [29]:
# --- Section 9 sanity checks (quick) ---

# Pick one document
filename = next(iter(machine_outputs))
doc_out = machine_outputs[filename]

print("FILE:", filename)
print("META:", doc_out["meta"].keys())
print("VARIANTS:", list(doc_out["variants"].keys()))

# Pick one variant
variant = next(iter(doc_out["variants"]))
print("\nVARIANT:", variant)

for section in ("item_7", "item_10"):
    sec = doc_out["variants"][variant].get(section)
    if not sec:
        print(f"  {section}: <missing>")
        continue

    preds = sec["pred"] or []
    gold = sec["gold"] or []

    print(f"\n  {section}:")
    print(f"    pred spans: {len(preds)}")
    print(f"    gold spans: {len(gold)}")

    # Peek at first span if present
    if preds:
        p = preds[0]
        print(f"    sample pred: ({p.label}, {p.start}:{p.end})")

    if gold:
        g = gold[0]
        print(f"    sample gold: ({g.label}, {g.start}:{g.end})")


FILE: 1017655_10K_2020_0001654954-21-003649.json
META: dict_keys(['company', 'filing_date', 'period_of_report'])
VARIANTS: ['sm_base', 'sm_aug', 'trf_base', 'trf_aug']

VARIANT: sm_base

  item_7:
    pred spans: 0
    gold spans: 63
    sample gold: (MONEY, 13031:13037)

  item_10:
    pred spans: 95
    gold spans: 62
    sample pred: (PERSON, 222:237)
    sample gold: (PERSON, 222:241)


9.2 Human Outputs

Human outputs provide readable views of the machine outputs for inspection, debugging, and comparison. These outputs are derived directly from the machine outputs defined in Section 9.1 and do not introduce new logic.

Human outputs may include:
- per-document summaries grouped by label
- per-section views (item_7, item_10)
- side-by-side comparisons across pipeline variants
- counts of predicted, gold, and matched spans

Human outputs are intended for analysis and presentation only and are not used for scoring or aggregation.
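
A side-by-side count view could be derived from the nested machine outputs along these lines. This is a sketch under assumptions: `count_rows` is a hypothetical helper, and the input mimics only the `variant → section → {"pred", "gold"}` shape with placeholder values.

```python
from typing import Any, Dict, List, Tuple


def count_rows(variants: Dict[str, Dict[str, Dict[str, List[Any]]]]) -> List[Tuple[str, str, int, int]]:
    """Flatten variant -> section -> {"pred", "gold"} into (section, variant, n_pred, n_gold) rows."""
    rows = []
    for variant, sections in variants.items():
        for section, data in sections.items():
            rows.append((section, variant, len(data["pred"]), len(data["gold"])))
    return sorted(rows)


# Invented placeholder spans; only the list lengths matter here.
toy_variants = {
    "sm_base": {"item_7": {"pred": [], "gold": ["g1", "g2"]}},
    "sm_aug": {"item_7": {"pred": ["p1"], "gold": ["g1", "g2"]}},
}

rows = count_rows(toy_variants)
print(f"{'section':<8} {'variant':<8} {'pred':>4} {'gold':>4}")
for section, variant, n_pred, n_gold in rows:
    print(f"{section:<8} {variant:<8} {n_pred:>4} {n_gold:>4}")
```

Sorting by section first places the variants for the same section on adjacent rows, which makes differences in prediction counts easy to scan.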

In [30]:
def print_human_summary(machine_outputs, filename):
    doc = machine_outputs[filename]
    print(f"\nFILE: {filename}")
    print(f"Company: {doc['meta'].get('company','')}")
    print("-" * 60)

    for variant, vdata in doc["variants"].items():
        print(f"\nVARIANT: {variant}")
        for section, sdata in vdata.items():
            preds = sdata["pred"] or []
            gold = sdata["gold"] or []

            print(f"  {section}:")
            print(f"    pred spans: {len(preds)}")
            print(f"    gold spans: {len(gold)}")

            if preds:
                p = preds[0]
                print(f"    sample pred: {p.label} ({p.start}:{p.end})")

            if gold:
                g = gold[0]
                print(f"    sample gold: {g.label} ({g.start}:{g.end})")


In [31]:
fname = next(iter(machine_outputs))
print_human_summary(machine_outputs, fname)



FILE: 1017655_10K_2020_0001654954-21-003649.json
Company: PAID INC
------------------------------------------------------------

VARIANT: sm_base
  item_7:
    pred spans: 0
    gold spans: 63
    sample gold: MONEY (13031:13037)
  item_10:
    pred spans: 95
    gold spans: 62
    sample pred: PERSON (222:237)
    sample gold: PERSON (222:241)

VARIANT: sm_aug
  item_7:
    pred spans: 38
    gold spans: 63
    sample pred: MONEY (13031:13037)
    sample gold: MONEY (13031:13037)
  item_10:
    pred spans: 51
    gold spans: 62
    sample pred: PERSON (222:237)
    sample gold: PERSON (222:241)

VARIANT: trf_base
  item_7:
    pred spans: 0
    gold spans: 63
    sample gold: MONEY (13031:13037)
  item_10:
    pred spans: 79
    gold spans: 62
    sample pred: PERSON (259:270)
    sample gold: PERSON (222:241)

VARIANT: trf_aug
  item_7:
    pred spans: 38
    gold spans: 63
    sample pred: MONEY (13031:13037)
    sample gold: MONEY (13031:13037)
  item_10:
    pred spans: 48
    go

## 10. Scoring

This section scores each pipeline variant against gold annotations using the
machine outputs produced in Section 9.

Scoring is performed post-extraction and does not modify predictions or gold
annotations.

### Evaluation Scope

- **Item 7**: MONEY  
  - Predictions sourced from `money_candidates` in augmented pipelines
- **Item 10**: PERSON, TITLE, ORG  
  - Predictions sourced from `doc.ents`

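Pipeline variants without applicable predictions for a section are scored
normally: with no predictions, every gold span counts as a false negative, so
precision, recall, and F1 are all 0.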

### Alignment

Two alignment modes are used:

- **Strict**: exact `(start, end)` match
- **Relaxed**: any character overlap

Alignment requires matching label and section. Each gold span may match at most
one prediction per pass.

### Metrics

Metrics are computed per pipeline variant and section, with all evaluated labels in a section pooled:
- precision
- recall
- F1 score

Strict and relaxed metrics are reported separately.
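
As a worked check of the formulas, the sketch below recomputes the metrics from the counts in the `sm_aug` / `item_10` / `relaxed` row of the micro-aggregated output (tp=140, fp=183, fn=139), and then shows the zero-prediction case. `prf` is a hypothetical helper that mirrors the Section 7.6 definitions, including returning 0.0 when a denominator is empty.

```python
def prf(tp: int, fp: int, fn: int):
    """Precision, recall, and F1, with 0.0 returned for empty denominators."""
    p = tp / (tp + fp) if (tp + fp) else 0.0
    r = tp / (tp + fn) if (tp + fn) else 0.0
    f = 2 * p * r / (p + r) if (p + r) else 0.0
    return p, r, f


# Counts taken from the sm_aug / item_10 / relaxed row.
p, r, f = prf(tp=140, fp=183, fn=139)
print(round(p, 6), round(r, 6), round(f, 6))

# A variant with no predictions for a section: every gold span is a false negative.
empty = prf(tp=0, fp=0, fn=63)
print(empty)
```

The first call reproduces the reported 0.433437, 0.501792, and 0.465116; the second returns zeros across the board rather than raising a division error.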

### Notes

- Offsets are authoritative: matches are decided by label, section, and character offsets alone; span text is not compared
- No post-hoc adjustment or gold-informed filtering is applied


In [32]:
# Section 10: Scoring (Micro)
import pandas as pd  # needed for the DataFrame outputs below

ALIGN_MODES = {
    "strict": strict_match,
    "relaxed": relaxed_match,
}

def score_one_pass(
    gold_spans: List[GoldSpan],
    pred_spans: List[PredictedSpan],
    *,
    mode: str,
) -> Dict[str, Any]:
    """
    Score one (gold, pred) list using a single alignment mode.
    Returns counts + metrics.
    """
    matcher = ALIGN_MODES[mode]
    res = align_spans(gold_spans, pred_spans, matcher=matcher)

    tp = len(res.true_positives)
    fn = len(res.false_negatives)
    fp = len(res.false_positives)

    p = precision(tp, fp)
    r = recall(tp, fn)
    return {
        "mode": mode,
        "tp": tp,
        "fp": fp,
        "fn": fn,
        "precision": p,
        "recall": r,
        "f1": f1(p, r),
    }


def score_machine_outputs(machine_outputs: Dict[str, Any]) -> pd.DataFrame:
    """
    Flatten per-doc scores into a dataframe:
      filename, variant, section, mode, tp, fp, fn, precision, recall, f1
    """
    rows = []

    for filename, docdata in machine_outputs.items():
        variants = docdata.get("variants", {})
        for variant, vdata in variants.items():
            for section, sdata in vdata.items():
                gold_spans = sdata.get("gold", []) or []
                pred_spans = sdata.get("pred", []) or []

                # Score both strict + relaxed
                for mode in ("strict", "relaxed"):
                    out = score_one_pass(gold_spans, pred_spans, mode=mode)
                    rows.append({
                        "filename": filename,
                        "variant": variant,
                        "section": section,
                        **out,
                    })

    return pd.DataFrame(rows)


def micro_aggregate(scores_df: pd.DataFrame) -> pd.DataFrame:
    """
    Micro-average by summing TP/FP/FN, then recomputing metrics.
    Grouped by: variant, section, mode
    """
    grouped = (
        scores_df
        .groupby(["variant", "section", "mode"], as_index=False)[["tp", "fp", "fn"]]
        .sum()
    )

    # Recompute metrics from summed counts (micro)
    grouped["precision"] = grouped.apply(lambda r: precision(int(r.tp), int(r.fp)), axis=1)
    grouped["recall"]    = grouped.apply(lambda r: recall(int(r.tp), int(r.fn)), axis=1)
    grouped["f1"]        = grouped.apply(lambda r: f1(float(r.precision), float(r.recall)), axis=1)

    # Nice ordering
    return grouped.sort_values(["section", "mode", "variant"]).reset_index(drop=True)


# ---- Run scoring ----
scores_df = score_machine_outputs(machine_outputs)
micro_df  = micro_aggregate(scores_df)

print("Per-doc score rows:", len(scores_df))
print("Micro-aggregated rows:", len(micro_df))

micro_df


Per-doc score rows: 128
Micro-aggregated rows: 16


| | variant | section | mode | tp | fp | fn | precision | recall | f1 |
|---|---|---|---|---|---|---|---|---|---|
| 0 | sm_aug | item_10 | relaxed | 140 | 183 | 139 | 0.433437 | 0.501792 | 0.465116 |
| 1 | sm_base | item_10 | relaxed | 110 | 408 | 169 | 0.212355 | 0.394265 | 0.276035 |
| 2 | trf_aug | item_10 | relaxed | 139 | 156 | 140 | 0.471186 | 0.498208 | 0.484321 |
| 3 | trf_base | item_10 | relaxed | 130 | 287 | 149 | 0.311751 | 0.46595 | 0.373563 |
| 4 | sm_aug | item_10 | strict | 111 | 212 | 168 | 0.343653 | 0.397849 | 0.368771 |
| 5 | sm_base | item_10 | strict | 87 | 431 | 192 | 0.167954 | 0.311828 | 0.218319 |
| 6 | trf_aug | item_10 | strict | 112 | 183 | 167 | 0.379661 | 0.401434 | 0.390244 |
| 7 | trf_base | item_10 | strict | 116 | 301 | 163 | 0.278177 | 0.415771 | 0.333333 |
| 8 | sm_aug | item_7 | relaxed | 522 | 9 | 420 | 0.983051 | 0.55414 | 0.708758 |
| 9 | sm_base | item_7 | relaxed | 0 | 0 | 942 | 0.0 | 0.0 | 0.0 |
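
The rows above are micro-averaged: counts are pooled across documents before metrics are recomputed, so documents with many spans weigh more than they would in a macro-average (the mean of per-document scores). The sketch below contrasts the two using the standard library only. The per-document counts are made up for illustration and are not taken from this run; `prf` and the `"demo"` variant name are hypothetical.

```python
from collections import defaultdict

# Illustrative per-document counts (made up; not from the run above).
rows = [
    {"variant": "demo", "tp": 9, "fp": 1, "fn": 0},  # large, easy document
    {"variant": "demo", "tp": 1, "fp": 4, "fn": 5},  # small, hard document
]


def prf(tp: int, fp: int, fn: int):
    # Precision, recall, F1 with 0.0 on empty denominators (an assumption).
    p = tp / (tp + fp) if (tp + fp) else 0.0
    r = tp / (tp + fn) if (tp + fn) else 0.0
    f = 2 * p * r / (p + r) if (p + r) else 0.0
    return p, r, f


# Micro: sum TP/FP/FN per variant, then compute metrics once.
totals = defaultdict(lambda: {"tp": 0, "fp": 0, "fn": 0})
for row in rows:
    for key in ("tp", "fp", "fn"):
        totals[row["variant"]][key] += row[key]
micro = {variant: prf(**counts) for variant, counts in totals.items()}

# Macro: compute precision per document, then average.
macro_precision = sum(prf(r["tp"], r["fp"], r["fn"])[0] for r in rows) / len(rows)

print(round(micro["demo"][0], 4))  # 0.6667  (10 / 15)
print(round(macro_precision, 4))   # 0.55    ((0.9 + 0.2) / 2)
```

Micro-averaging is the natural choice when the question is "what fraction of all predicted spans were right", while a macro view would be more informative about per-filing consistency.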


In [33]:
micro_df.sort_values(["section", "mode", "f1"], ascending=[True, True, False]).head(12)


| | variant | section | mode | tp | fp | fn | precision | recall | f1 |
|---|---|---|---|---|---|---|---|---|---|
| 2 | trf_aug | item_10 | relaxed | 139 | 156 | 140 | 0.471186 | 0.498208 | 0.484321 |
| 0 | sm_aug | item_10 | relaxed | 140 | 183 | 139 | 0.433437 | 0.501792 | 0.465116 |
| 3 | trf_base | item_10 | relaxed | 130 | 287 | 149 | 0.311751 | 0.46595 | 0.373563 |
| 1 | sm_base | item_10 | relaxed | 110 | 408 | 169 | 0.212355 | 0.394265 | 0.276035 |
| 6 | trf_aug | item_10 | strict | 112 | 183 | 167 | 0.379661 | 0.401434 | 0.390244 |
| 4 | sm_aug | item_10 | strict | 111 | 212 | 168 | 0.343653 | 0.397849 | 0.368771 |
| 7 | trf_base | item_10 | strict | 116 | 301 | 163 | 0.278177 | 0.415771 | 0.333333 |
| 5 | sm_base | item_10 | strict | 87 | 431 | 192 | 0.167954 | 0.311828 | 0.218319 |
| 8 | sm_aug | item_7 | relaxed | 522 | 9 | 420 | 0.983051 | 0.55414 | 0.708758 |
| 10 | trf_aug | item_7 | relaxed | 522 | 9 | 420 | 0.983051 | 0.55414 | 0.708758 |


In [38]:
# -------------------------------
# ONE-TIME UNDER-THE-HOOD INSPECTION
# -------------------------------

# pick a gold-backed file (first one is fine)
filename = next(iter(gold_map.keys()))
filing = next(f for f in filings if f.filename == filename)
gold = gold_map[filename]

variant = "trf_aug"     # try: sm_base, sm_aug, trf_base, trf_aug
section = "item_10"     # or "item_7"

print(f"\nFILE: {filename}")
print(f"Company: {filing.company}")
print(f"Variant: {variant} | Section: {section}")
print("-" * 70)

# run pipeline directly (no harness magic)
nlp = pipelines[variant]
text = filing.item_10_text if section == "item_10" else filing.item_7_text
doc = nlp(text)

print("\nRAW spaCy ENTS (first 20):")
for e in doc.ents[:20]:
    print(f"  {e.label_:8s} {e.start_char:5d}:{e.end_char:5d}  → {e.text!r}")

# routed predictions (what you actually score)
preds = routed_predictions(doc, section=section)

print(f"\nROUTED PREDICTIONS (first 20)  n={len(preds)}:")
for s, e, lbl, txt in preds[:20]:
    print(f"  {lbl:8s} {s:5d}:{e:5d}  → {txt!r}")

# gold spans for this section
gold_spans = [
    a for a in gold.annotations
    if a.section == section
]

print(f"\nGOLD SPANS (first 20)  n={len(gold_spans)}:")
for a in gold_spans[:20]:
    print(f"  {a.label:8s} {a.start:5d}:{a.end:5d}  → {a.text!r}")

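# text window helper: brackets the span with [[ ]] and shows `pad` characters
# of context on each side (reads `text` from the cell scope above)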
def show_window(start, end, pad=60):
    s0 = max(0, start - pad)
    e0 = min(len(text), end + pad)
    return text[s0:start] + "[[" + text[start:end] + "]]" + text[end:e0]

# show a few side-by-side examples
print("\nSIDE-BY-SIDE EXAMPLES")
print("-" * 30)

for i in range(min(5, len(preds))):
    s, e, lbl, txt = preds[i]
    print(f"\nPRED {lbl} {s}:{e}")
    print(show_window(s, e))

for i in range(min(5, len(gold_spans))):
    a = gold_spans[i]
    print(f"\nGOLD {a.label} {a.start}:{a.end}")
    print(show_window(a.start, a.end))



FILE: 1017655_10K_2020_0001654954-21-003649.json
Company: PAID INC
Variant: trf_aug | Section: item_10
----------------------------------------------------------------------

RAW spaCy ENTS (first 20):
  CARDINAL     5:    7  → '10'
  TITLE      250:  253  → 'CEO'
  TITLE      255:  258  → 'CFO'
  PERSON     259:  270  → 'David Scott'
  TITLE      271:  274  → 'COO'
  PERSON     275:  288  → 'Andrew Pilaro'
  TITLE      289:  297  → 'Director'
  PERSON     298:  312  → 'Laurie Bradley'
  TITLE      313:  321  → 'Director'
  PERSON     322:  333  → 'David Ogden'
  TITLE      334:  342  → 'Director'
  PERSON     343:  356  → 'Andrew Pilaro'
  DATE       375:  394  → 'September 19, 2000,'
  EVENT      418:  457  → 'the 2001 Annual Meeting of Stockholders'
  DATE       515:  529  → 'March 27, 2021'
  CARDINAL   609:  613  → 'five'
  CARDINAL   627:  632  → 'three'
  PERSON     658:  673  → 'W. Austin Lewis'
  PERSON     682:  693  → 'Allan Pratt'
  GPE        754:  762  → 'Delaware'

ROUT

In [36]:
from pathlib import Path

EXPORT_DIR = Path(PROJECT_ROOT) / "exports"
EXPORT_DIR.mkdir(parents=True, exist_ok=True)

scores_path = EXPORT_DIR / "scores_per_doc.csv"
micro_path  = EXPORT_DIR / "scores_micro.csv"

scores_df.to_csv(scores_path, index=False)
micro_df.to_csv(micro_path, index=False)

print("Wrote:", scores_path)
print("Wrote:", micro_path)


Wrote: /content/drive/MyDrive/Task 3/exports/scores_per_doc.csv
Wrote: /content/drive/MyDrive/Task 3/exports/scores_micro.csv
