# EDGAR Ground Truth Single Feature Extraction/Validation

**Goal**: Validate specific features against Ground Truth with high precision.

**Methodology**:
1. **Protocol A (Extraction)**: Use the LLM to extract a `Value ||| Evidence` pair. The evidence (a verbatim supporting quote) is required.
2. **Protocol B (Validation)**: Compare extracted value with Ground Truth using Fuzzy Matching.
3. **Protocol C (Judge)**: (Optional) Use LLM to judge specific ambiguous cases.
4. **Protocol D (Discovery)**: Bootstrap Ground Truth for NEW features by extracting and manually reviewing evidence.


In [None]:
# 1. Installation & Setup
# !pip install -q torch transformers datasets pandas tqdm accelerate bitsandbytes thefuzz python-Levenshtein

import os
import re
import pandas as pd
import torch
from tqdm import tqdm
from datasets import load_dataset
from transformers import AutoModelForCausalLM, AutoTokenizer

# Fuzzy Matching for Validation
from thefuzz import fuzz

# Do not truncate long cell contents (e.g. evidence quotes) when DataFrames are shown.
pd.set_option('display.max_colwidth', None)
print("Setup Complete.")

In [None]:
# 2. Load Model (Qwen 2.5-7B-Instruct)
import os
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, logging

# Only show transformers log messages at error level.
logging.set_verbosity_error()

MODEL_NAME = "Qwen/Qwen2.5-7B-Instruct"
DTYPE = torch.bfloat16
# NOTE: in this notebook DEVICE is only used in the status message printed at
# the end of this cell; weight placement is delegated to device_map="auto".
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

print("Loading model...")
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, use_fast=True, trust_remote_code=True)
# Fall back to the EOS token when the tokenizer defines no padding token.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

# `tokenizer` and `model` are module-level globals read by later cells
# (the validation loop, the judge and discovery mode).
model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    torch_dtype=DTYPE,
    device_map="auto",
    trust_remote_code=True,
)
# Inference only: switch the module to evaluation mode.
model.eval()
print("Loaded:", MODEL_NAME, "| device:", DEVICE)

In [None]:
# 3. Configuration & Question Bank

# TARGET FEATURE to Run (Change this to switch focus)
# Must be one of the "id" values in QUESTION_BANK.
TARGET_FEATURE_ID = "original_incorporation_state"
# Feature id -> name of the Ground Truth CSV column holding its reference value.
# A feature without an entry here cannot be validated: run_validation_loop
# prints an error and returns an empty DataFrame for it.
GT_COLUMN_MAPPING = {
    "company_name": "company_name_truth",
    "original_incorporation_state": "original_inc_state_truth",
    "original_incorporation_year": "original_inc_year_truth",
    "employee_count": "employee_count_truth",
}

# Extraction prompts, grouped by the filing field they are answered from.
# - The top-level key (e.g. "section_1") is the key extract_feature reads from
#   each filing record to obtain the context text.
# - Each entry pairs a feature "id" with the "prompt" sent to the LLM.
# - The evidence step reuses only the part of the prompt up to the first "?",
#   so every prompt should start with the plain question followed by its rules.
QUESTION_BANK = {
    "section_1": [
        {
            "id": "company_name",
            "prompt": (
                "What is the exact legal name of the registrant? "
                "1. Look for the very first sentence of the 'Business' section or the cover page intro "
                "(e.g., 'Apple Inc. (the Registrant)...'). "
                "2. Do NOT use 'Doing Business As' (DBA) names or brand names. "
                "3. Do NOT include the stock ticker symbol. "
                "4. Include legal suffixes like 'Inc.', 'Corp.', 'Ltd.' if present. "
                "Answer with ONLY the legal name string."
            ),
        },
        {
            "id": "headquarters_city",
            "prompt": (
                "In which city are the registrant's *principal executive offices* physically located? "
                "1. Look for the address under 'Executive Offices' or 'Address of Principal Executive Offices'. "
                "2. CRITICAL WARNING: Do NOT return the city of the 'Registered Agent' or 'State of Incorporation' "
                "(e.g., ignore 'Wilmington' or 'Dover' unless the CEO actually works there). "
                "3. Ignore P.O. Boxes. "
                "Answer with ONLY the city name."
            ),
        },
        {
            "id": "original_incorporation_state",
            "prompt": (
                "In which U.S. state was the registrant *originally* incorporated or organized? "
                "Follow this strict hierarchy: "
                "1. PRIORITIZE HISTORY: Look for phrases like 'originally incorporated in', 'formerly organized in', "
                "or 'predecessor company incorporated in'. "
                "2. REINCORPORATION RULE: If the company reincorporated (e.g., moved from California to Delaware), "
                "you MUST return the OLD state (California), not the current one. "
                "3. MERGER EXCEPTION: Only if the registrant is a *new* successor entity formed by a merger, "
                "return the state of that successor. "
                "4. If no history is mentioned, return the current state. "
                "Answer with ONLY the state name."
            ),
        },
        {
            "id": "original_incorporation_year",
            "prompt": (
                "In which year was the registrant *originally* incorporated or organized? "
                "1. IGNORE 'FOUNDED' dates. Only look for 'incorporated', 'organized', or 'formed'. "
                "2. REINCORPORATION RULE: If the text says 'originally incorporated in 1980' and 'reincorporated in 1995', "
                "return the EARLIEST year (1980). "
                "3. MERGER EXCEPTION: If the current entity was formed by a merger of equals, use the year of that merger. "
                "Answer with ONLY the year (YYYY)."
            ),
        },
        {
            "id": "employee_count",
            "prompt": (
                "What is the total number of employees the registrant has? "
                "1. PREFER FULL-TIME: If the text distinguishes between full-time and part-time, return the full-time count. "
                "2. If only 'total' is given, use that. "
                "3. EXCLUDE: Do not count independent contractors, agents, or temporary staff unless they are the only number given. "
                "4. FORMAT: Remove commas and return ONLY the integer (e.g., return 14500, not 14,500). "
                "If the number is 'approximately 5,000', return 5000."
            ),
        },
        {
            "id": "headquarters_state",
            "prompt": (
                "In which U.S. state are the registrant's *principal executive offices* physically located? "
                "1. This is the state where the HQ building is, NOT necessarily the state of incorporation. "
                "2. CRITICAL: If the text says 'Incorporated in Delaware' but 'Executive offices in California', "
                "return CALIFORNIA. "
                "Answer with ONLY the state name."
            ),
        },
    ],
    "section_10": [
        {
            "id": "ceo_lastname",
            "prompt": (
                "What is the LAST NAME of the registrant's current Chief Executive Officer (CEO)? "
                "1. Look for 'Chief Executive Officer', 'CEO', or 'Principal Executive Officer'. "
                "2. If 'Co-CEOs' are listed, pick the first one mentioned. "
                "3. EXCLUDE titles (Mr., Dr.) and first/middle names. "
                "4. If the CEO has a compound last name (e.g., 'Von Trap'), include the full last name. "
                "Answer with ONLY the last name string."
            ),
        }
    ],
}

In [None]:
# 4. Protocol A
def ask_llm(context, prompt, model, tokenizer, max_new_tokens=50):
    """Ask the LLM one extraction question about a filing excerpt.

    Args:
        context: Filing text to search. ``None``, empty or whitespace-only
            text short-circuits to ``None`` without calling the model.
        prompt: The question (including its answering rules) for the model.
        model: Causal LM exposing ``generate`` and ``device``.
        tokenizer: Matching tokenizer providing ``apply_chat_template``.
        max_new_tokens: Generation budget for the answer.

    Returns:
        The first line of the reply with trailing periods removed, or
        ``None`` when there is no context, the reply is empty, or the model
        reports the value as missing (the word "NULL" or "not found").
    """
    if not context or not context.strip():
        return None

    # Rough character budget: keep ~650 tokens for instructions and reply and
    # assume ~4 characters per token. Clamped at 0 so a small tokenizer limit
    # can never turn the slice below into a "drop the tail" negative slice.
    # NOTE(review): tokenizer.model_max_length may be larger than what the
    # model/GPU can really handle — confirm for the deployed checkpoint.
    model_limit = getattr(tokenizer, "model_max_length", 32768)
    char_limit = max(0, (model_limit - 650) * 4)
    if len(context) > char_limit:
        context = context[:char_limit]

    messages = [
        {"role": "system", "content": "You are a precise data extraction assistant."},
        {
            "role": "user",
            "content": f"Read this SEC 10-K filing excerpt and answer the question. \nQuestion: {prompt} \nIf the information is not present in the context, reply with 'NULL'.\nContext: \"{context}\"",
        },
    ]

    text = tokenizer.apply_chat_template(
        messages, tokenize=False, add_generation_prompt=True
    )
    inputs = tokenizer([text], return_tensors="pt").to(model.device)

    with torch.no_grad():
        # Greedy decoding: `temperature` is only used when sampling, so it is
        # not passed alongside do_sample=False.
        outputs = model.generate(
            **inputs, max_new_tokens=max_new_tokens, do_sample=False
        )

    # Decode only the newly generated tokens (everything after the prompt).
    prompt_len = inputs.input_ids.shape[1]
    response = tokenizer.decode(outputs[0][prompt_len:], skip_special_tokens=True)

    # Keep the first line only and drop a trailing period ("Delaware." -> "Delaware").
    answer = response.strip().split("\n")[0].strip().rstrip(".").strip()
    if not answer:
        return None

    # Match "null" as a whole word so real values that merely contain those
    # letters (e.g. "Nullsoft Inc") are not discarded.
    lowered = answer.lower()
    if re.search(r"\bnull\b", lowered) or "not found" in lowered:
        return None

    return answer

In [None]:
# Post-hoc evidence finder (no keyword heuristics, just search): a second LLM
# pass — the "Verifier" — that quotes the sentence supporting an extracted answer.
def get_evidence_with_llm(context, question ,answer_val, model, tokenizer):
    """Ask the LLM to quote the sentence that supports an extracted answer.

    Args:
        context: Filing text the answer was extracted from.
        question: Full extraction prompt; only the part up to and including
            the first "?" is repeated to the model.
        answer_val: The previously extracted answer (``None`` or "null" means
            nothing was extracted).
        model: Causal LM exposing ``generate`` and ``device``.
        tokenizer: Matching tokenizer providing ``apply_chat_template``.

    Returns:
        The model's raw quote (stripped), or the string "NULL" when there is
        no context or no answer to support. The quote is NOT verified here.
    """
    if not context or not context.strip():
        return "NULL"

    # Accept non-string answers (e.g. an int count) instead of crashing on .lower().
    answer_text = "" if answer_val is None else str(answer_val).strip()
    if not answer_text or answer_text.lower() == "null":
        return "NULL"

    # Rough character budget: keep ~650 tokens for instructions and reply and
    # assume ~4 characters per token; clamped so it can never go negative.
    model_limit = getattr(tokenizer, "model_max_length", 32768)
    char_limit = max(0, (model_limit - 650) * 4)
    if len(context) > char_limit:
        context = context[:char_limit]

    # Repeat only the bare question, not the long list of answering rules.
    head, sep, _ = question.partition("?")
    question_only = head + sep
    # Specific Prompt to force "Quote Finding"
    prompt = (
        f"You previously determined for this question: {question_only} \n that the answer is: '{answer_val}'.\n"
        "Find the EXACT sentence in the text that supports this answer.\n"
        "1. Copy the sentence word-for-word from the text.\n"
        "2. Do not rewrite or summarize it.\n"
        f"3. If {answer_val} is NULL, return NULL."
    )

    messages = [
        {"role": "system", "content": "You are a forensic text analyst. You only output exact quotes."},
        {"role": "user", "content": f"Task: {prompt}\n\n Text: \"{context}\""}
    ]

    text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    inputs = tokenizer([text], return_tensors="pt").to(model.device)

    with torch.no_grad():
        # Allow more tokens (200) because sentences can be long. Greedy
        # decoding: `temperature` only matters when sampling, so it is omitted.
        outputs = model.generate(**inputs, max_new_tokens=200, do_sample=False)

    # Decode only the newly generated tokens (everything after the prompt).
    prompt_len = inputs.input_ids.shape[1]
    return tokenizer.decode(outputs[0][prompt_len:], skip_special_tokens=True).strip()


In [None]:
# 3. NEW: THE VALIDATOR (The "Guardrail")
def get_fingerprint(text):
    """Reduce *text* to its letters and digits, lowercased.

    Every run of non-word characters and underscores (whitespace, quotes,
    punctuation) is removed, so strings that differ only in spacing,
    punctuation or case share the same fingerprint.
    """
    return re.sub(r'[\W_]+', '', text).lower()


def validate_evidence(full_text, evidence_candidate):
    """Check that the LLM's "exact quote" really occurs in the source text.

    Args:
        full_text: The text the quote is supposed to come from (``None`` is
            treated as empty).
        evidence_candidate: The quote returned by the LLM.

    Returns:
        A single string, one of:
        * ``"NULL"`` — no usable quote (empty, no letters/digits, a bare
          "NULL" reply, or an unverifiable reply that mentions NULL);
        * the candidate unchanged — it occurs verbatim in ``full_text``;
        * ``"Verified (Approx - Punctuation Ignored): <quote>"`` — it matches
          once case, whitespace and punctuation are ignored;
        * ``"HALLUCINATION_FLAG: <quote>"`` — it could not be found.
    """
    if not evidence_candidate:
        return "NULL"

    # A bare NULL reply ("NULL", "'null.'", ...) or a reply without any
    # letters/digits carries no evidence. Comparing fingerprints instead of
    # searching for the substring "null" keeps genuine quotes such as
    # "... shall be null and void." from being thrown away.
    if get_fingerprint(evidence_candidate) in ("", "null"):
        return "NULL"

    full_text = full_text or ""

    # 1. Exact Match Check
    if evidence_candidate in full_text:
        return evidence_candidate

    # 2. Robust Match: LLMs often wrap the quote in quotation marks, collapse
    # whitespace or change capitalization/punctuation slightly.
    evidence_candidate = evidence_candidate.strip('"\'')
    fp_evd = get_fingerprint(evidence_candidate.lower())
    # An empty fingerprint would be "contained" in any text, so require content.
    if fp_evd and fp_evd in get_fingerprint(full_text.lower()):
        return "Verified (Approx - Punctuation Ignored): " + evidence_candidate

    # 3. Not found. A reply that talks about NULL (e.g. "The answer is NULL")
    # means "no evidence" rather than a fabricated quote.
    if re.search(r"\bnull\b", evidence_candidate, flags=re.IGNORECASE):
        return "NULL"

    return f"HALLUCINATION_FLAG: {evidence_candidate}"

In [None]:
def extract_feature(doc, feature_id, model, tokenizer):
    """Extract one feature from a filing record, with checked evidence.

    The question for *feature_id* is looked up in ``QUESTION_BANK``; the bank
    key it is filed under names the field of *doc* used as context.

    Pipeline:
      1. the LLM answers the question,
      2. the LLM quotes the sentence backing that answer,
      3. plain Python checks that the quote exists in the context.

    Returns:
        ``(value, evidence)``, or ``(None, None)`` when *feature_id* is not
        in the question bank.
    """
    question = None
    section_key = "section_1"
    for section, questions in QUESTION_BANK.items():
        # First matching entry within a section; a later section overrides an
        # earlier one if the same id were ever listed twice.
        hit = next((q for q in questions if q["id"] == feature_id), None)
        if hit is not None:
            question, section_key = hit, section

    if not question:
        return None, None

    source_text = doc.get(section_key, "")
    prompt = question["prompt"]

    # Step 1: value, Step 2: candidate quote, Step 3: verification in Python.
    value = ask_llm(source_text, prompt, model, tokenizer)
    quote = get_evidence_with_llm(source_text, prompt, value, model, tokenizer)
    evidence = validate_evidence(source_text, quote)

    return value, evidence

In [None]:
# --- Helper Functions (Your provided normalizers) ---
def normalize_text(val):
    """Normalize a free-text value for comparison.

    Returns ``"NULL"`` for missing values (NaN/None, or the words
    null/nan/none/empty in any case and with any surrounding whitespace);
    otherwise the stripped, lowercased text without periods and commas.
    """
    if pd.isna(val):
        return "NULL"
    # Strip BEFORE the null check so " NULL " or "   " count as missing too.
    text = str(val).strip().lower()
    if text in ("null", "nan", "none", ""):
        return "NULL"
    return text.replace(".", "").replace(",", "")

def normalize_year(val):
    """Normalize a year to its integer string (``1996.0`` -> ``"1996"``).

    Returns ``"NULL"`` for missing values and for anything that cannot be
    read as a number.
    """
    try:
        if pd.isna(val) or str(val).strip().lower() in ("null", "nan", "none", ""):
            return "NULL"
        # Handles 1996.0 -> 1996
        return str(int(float(val)))
    except (TypeError, ValueError, OverflowError):
        # Only conversion failures mean "no year"; unlike a bare `except:`,
        # this lets KeyboardInterrupt/SystemExit propagate.
        return "NULL"


def normalize_count(val):
    """Normalize a count to its integer string (``"14,500"`` -> ``"14500"``).

    Thousands separators are removed first. Returns ``"NULL"`` for missing
    values and for anything that cannot be read as a number.
    """
    try:
        if pd.isna(val) or str(val).strip().lower() in ("null", "nan", "none", ""):
            return "NULL"
        clean_string = str(val).replace(",", "").strip()
        return str(int(float(clean_string)))
    except (TypeError, ValueError, OverflowError):
        # Only conversion failures mean "no count"; unlike a bare `except:`,
        # this lets KeyboardInterrupt/SystemExit propagate.
        return "NULL"

In [None]:
# 5. Protocol B: Validation Logic

# --- MAPPING FEATURES TO NORMALIZERS ---
# Free-text features share normalize_text; the year and the head count each
# get their own numeric normalizer.
NORMALIZER_MAP = {
    **dict.fromkeys(
        (
            "company_name",
            "headquarters_city",
            "original_incorporation_state",
            "headquarters_state",
            "ceo_lastname",
        ),
        normalize_text,
    ),
    "original_incorporation_year": normalize_year,
    "employee_count": normalize_count,
}


def validate_row(val_ext, val_gt, feature_id):
    """Score an extracted value against its Ground Truth value.

    Both values are normalized with the feature's normalizer (text by
    default) before comparison.

    Returns:
        ``(score, status)`` where status is ``"1"`` (match), ``"0"``
        (mismatch) or ``"?"`` (ambiguous fuzzy score, left for the judge).
    """
    normalize = NORMALIZER_MAP.get(feature_id, normalize_text)
    extracted = normalize(val_ext)
    truth = normalize(val_gt)

    # Missing on both sides agrees; missing on one side only is a failure.
    extracted_missing = extracted == "NULL"
    truth_missing = truth == "NULL"
    if extracted_missing and truth_missing:
        return 100, "1"
    if extracted_missing or truth_missing:
        return 0, "0"

    # Identical after normalization.
    if extracted == truth:
        return 100, "1"

    # Years and counts must match exactly; no fuzzy fallback for numbers.
    if feature_id in ("original_incorporation_year", "employee_count"):
        return 0, "0"

    # Text features: tolerate small spelling differences.
    score = fuzz.ratio(extracted, truth)
    if score > 90:
        status = "1"
    elif score < 50:
        status = "0"
    else:
        status = "?"  # Ambiguous
    return score, status


def run_validation_loop(target_feature, gt_csv_path, limit=10):
    """Extract *target_feature* from streamed filings and score it against Ground Truth.

    Args:
        target_feature: Feature id; must have an entry in ``GT_COLUMN_MAPPING``.
        gt_csv_path: Path to the Ground Truth CSV. It needs a ``filename``
            column plus the column mapped to the feature.
        limit: Maximum number of Ground Truth filings to process.

    Returns:
        A DataFrame with one row per processed filing (``Filename``,
        ``GT_Value``, ``Ext_Value``, ``Protocol_B_Score``,
        ``Protocol_B_Status``, ``Evidence``). It is empty when a required
        column is missing or nothing was processed.

    Uses the module-level ``model`` and ``tokenizer``.
    """
    print(f"--- STARTING VALIDATION FOR: {target_feature} ---")

    # 1. Load Ground Truth
    df_gt = pd.read_csv(gt_csv_path)
    gt_col = GT_COLUMN_MAPPING.get(target_feature)

    if not gt_col or gt_col not in df_gt.columns:
        print(f"Error: Ground Truth column '{gt_col}' not found in CSV.")
        return pd.DataFrame()
    if "filename" not in df_gt.columns:
        print("Error: Ground Truth CSV has no 'filename' column.")
        return pd.DataFrame()

    # filename -> truth value, built once so each streamed document costs a
    # single hash lookup. The first row wins when a filename is repeated.
    gt_by_file = (
        df_gt.drop_duplicates(subset="filename", keep="first")
        .set_index("filename")[gt_col]
        .to_dict()
    )

    # 2. Load Documents (Streaming with Fix for Parquet Revision)
    dataset = load_dataset(
        "c3po-ai/edgar-corpus",
        "default",
        split="train",
        streaming=True,
        revision="refs/convert/parquet",
    )

    results = []

    for doc in dataset:
        # Stop as soon as the quota is filled instead of streaming on until
        # the next Ground Truth filing happens to appear.
        if len(results) >= limit:
            break

        fname = doc.get("filename")
        # The stream has to be scanned because the Ground Truth is sparse.
        if fname not in gt_by_file:
            continue

        print(f"Processing {fname}...")

        # Extract
        val_ext, evd_ext = extract_feature(doc, target_feature, model, tokenizer)

        # Get Truth
        val_gt = gt_by_file[fname]

        # Validate
        score, status = validate_row(val_ext, val_gt, target_feature)

        results.append(
            {
                "Filename": fname,
                "GT_Value": val_gt,
                "Ext_Value": val_ext,
                "Protocol_B_Score": score,
                "Protocol_B_Status": status,
                "Evidence": evd_ext,
            }
        )

    return pd.DataFrame(results)

In [None]:
# 6. Run Validation

# Update path to your actual GT CSV location
GT_PATH = "../csvs/ground_truth/Ground Truth Data - edgar_gt_verified_slim.csv"

# Validate the configured feature on at most 5 Ground Truth filings.
df_val = run_validation_loop(
    target_feature=TARGET_FEATURE_ID, # Defined in Config cell
    gt_csv_path=GT_PATH,
    limit=5
)
# From here on missing values (None/NaN) are the literal string "NULL",
# so later cells see strings rather than NaN.
df_val.fillna("NULL", inplace=True) 

# Display Results
print("\n--- VALIDATION RESULTS ---")
display(df_val)

In [None]:
def ask_judge(gt_value, ext_value, evidence):
    """Ask the LLM whether an ambiguous extraction matches the Ground Truth.

    Args:
        gt_value: The Ground Truth value.
        ext_value: The extracted value.
        evidence: The evidence string attached to the extraction.

    Returns:
        ``"1"`` if the model's verdict is yes, otherwise ``"0"`` (also when
        the reply contains no standalone 0/1).

    Uses the module-level ``model`` and ``tokenizer``.
    """
    # 1. Construct the User Prompt
    user_prompt = (
        f"Ground Truth says: '{gt_value}'. "
        f"Extraction says: '{ext_value}' based on evidence '{evidence}'. "
        "Is the extraction factually correct despite the text mismatch? "
        "Answer 1 for Yes, 0 for No."
    )

    # 2. System Prompt to force binary output
    system_prompt = (
        "You are a strict Judge. Your job is to resolve ambiguity in data extraction. "
        "Compare the Extraction against the Ground Truth. "
        "If they represent the same underlying fact (even if phrased differently), answer 1. "
        "If they are different or the evidence is wrong, answer 0. "
        "Output ONLY the number '1' or '0'."
    )

    # 3. Format for Qwen (Chat Template)
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt}
    ]

    # 4. Tokenize & Generate
    text_input = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    inputs = tokenizer([text_input], return_tensors="pt").to(model.device)

    with torch.no_grad():
        # One token is enough for the verdict; 5 leaves a small buffer.
        # Greedy decoding: `temperature` only matters when sampling, so it is omitted.
        outputs = model.generate(**inputs, max_new_tokens=5, do_sample=False)

    # 5. Decode only the newly generated tokens (everything after the prompt).
    prompt_len = inputs.input_ids.shape[1]
    response = tokenizer.decode(outputs[0][prompt_len:], skip_special_tokens=True)
    clean_resp = response.strip()

    # 6. Parse strictly: the FIRST standalone 0/1 is the verdict, so replies
    # like "0, not 1" are read as 0 and multi-digit numbers such as "10" are
    # ignored. Anything unclear defaults to Fail.
    verdict = re.search(r"(?<!\d)[01](?!\d)", clean_resp)
    return verdict.group(0) if verdict else "0"

In [None]:
# 7. Protocol C: The Judge (Binary 1/0)
# Run this cell to resolve ambiguous cases ('?') from Protocol B

def judge_row(row):
    """Return the final "1"/"0" verdict for one validation row.

    Rows already decided by Protocol B keep their status. Ambiguous rows
    ("?") go to the LLM judge, unless the extracted value or the evidence is
    missing, in which case they fail without a model call.
    """
    # If it was already a clear pass (1) or fail (0), keep it.
    status = row['Protocol_B_Status']
    if status in ('1', '0'):
        return status

    def _missing(value):
        # Missing values arrive either as NaN/None or, after the results were
        # filled for display, as the literal string "NULL".
        return pd.isna(value) or str(value).strip().upper() == "NULL"

    # Handle missing data gracefully
    if _missing(row['Ext_Value']) or _missing(row['Evidence']):
        return "0"

    print(f"Judging Row {row.name}...")
    return ask_judge(row['GT_Value'], row['Ext_Value'], row['Evidence'])

# Skip judging when validation produced no rows (e.g. the Ground Truth column was not found).
if not df_val.empty:
    # Row-wise: keeps clear Protocol B verdicts and sends '?' rows to the judge.
    df_val['Judge_Valid'] = df_val.apply(judge_row, axis=1)
    display(df_val[[ 'GT_Value', 'Ext_Value', 'Protocol_B_Status', 'Judge_Valid', 'Evidence']])

In [None]:
# Accuracy: share of processed rows whose final verdict is "1". Averaging over
# the rows actually present keeps the figure right for any `limit`, instead
# of assuming exactly 250 rows.
(df_val['Judge_Valid'] == '1').mean()

In [None]:
# Accuracy: share of processed rows whose final verdict is "1". Averaging over
# the rows actually present keeps the figure right for any `limit`, instead
# of assuming exactly 250 rows.
(df_val['Judge_Valid'] == '1').mean()

## Protocol D: Discovery Mode (New Feature Extraction)

**Goal**: Extract a NEW feature that has no Ground Truth yet.

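**Output**: Creates `discovered_GT.csv` on the first run. If that file already exists, it is loaded, merged with the new results, and written to `discovered_GT_updated.csv` instead. Review the evidence manually and mark confirmed values as truth.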

In [None]:
def discover_new_feature(target_feature, output_file="discovered_GT.csv", limit=50):
    """Bootstrap Ground Truth for a feature by extracting it from streamed filings.

    For the first *limit* filings of the stream the feature value and its
    evidence are extracted and stored in ``<feature>_value`` /
    ``<feature>_evidence`` columns. Filings already listed in *output_file*
    are updated in place; the others are appended as new rows.

    Args:
        target_feature: Feature id; must exist in ``QUESTION_BANK``.
        output_file: CSV to create. If it already exists it is loaded and
            left untouched; the merged result is written next to it as
            ``<name>_updated<ext>``.
        limit: Number of filings to read from the stream.

    Returns:
        The first rows (``head()``) of the merged table.

    Uses the module-level ``model`` and ``tokenizer``.
    """
    from itertools import islice

    print(f"--- STARTING DISCOVERY FOR: {target_feature} ---")

    # 1. Load Existing CSV if available (Smart Append)
    update = os.path.exists(output_file)
    if update:
        print(f"Loading existing file: {output_file}")
        df_master = pd.read_csv(output_file)
    else:
        print("Creating NEW discovery file.")
        df_master = pd.DataFrame(columns=["filename", "cik", "year"])

    value_col = f"{target_feature}_value"
    evidence_col = f"{target_feature}_evidence"
    # Create the feature columns up front (object dtype) so in-place updates
    # never have to add a column or change its dtype cell by cell.
    for col in (value_col, evidence_col):
        if col not in df_master.columns:
            df_master[col] = None

    # filename -> label of its first row, built once for O(1) lookups.
    row_for_file = {}
    for idx, name in df_master["filename"].items():
        row_for_file.setdefault(name, idx)

    # 2. Stream Data
    dataset = load_dataset(
        "c3po-ai/edgar-corpus",
        "default",
        split="train",
        streaming=True,
        revision="refs/convert/parquet",
    )

    new_rows = []
    updates = 0

    # islice stops after `limit` documents without fetching an extra one.
    for doc in tqdm(islice(dataset, limit), total=limit):
        fname = doc.get("filename")

        # Extract Data
        val, evd = extract_feature(doc, target_feature, model, tokenizer)
        data = {value_col: val, evidence_col: evd}

        idx = row_for_file.get(fname)
        if idx is not None:
            # UPDATE existing row
            for k, v in data.items():
                df_master.at[idx, k] = v
            updates += 1
        else:
            # CREATE new row, capturing the identity columns as well
            new_rows.append(
                {
                    "filename": fname,
                    "cik": doc.get("cik"),
                    "year": doc.get("year"),
                    **data,
                }
            )

    # 3. Save Results
    if new_rows:
        df_master = pd.concat([df_master, pd.DataFrame(new_rows)], ignore_index=True)

    print(
        f"\nDiscovery Complete. Updated {updates} rows. Added {len(new_rows)} new rows."
    )

    filename = output_file
    if update:
        # Insert "_updated" before the extension. splitext touches only the
        # real extension, so a ".csv" elsewhere in the path is not rewritten
        # and a non-.csv file is never silently overwritten.
        root, ext = os.path.splitext(output_file)
        filename = f"{root}_updated{ext}"

    df_master = df_master.fillna("NULL")
    df_master.to_csv(filename, index=False)
    return df_master.head()

In [None]:
# Example: Discover 'fiscal_year_end'
# Ensure you add 'fiscal_year_end' to QUESTION_BANK above first!

# UNCOMMENT TO RUN:
# df_disc = discover_new_feature("fiscal_year_end", limit=5)
# display(df_disc)