# CRC UDS Prototype (2025)
This notebook ingests:
- Scanned CRC reports (PDF) via OCR
- FOBT CSV extract

It outputs per-patient CRC numerator status for 2025 with an auditor-friendly evidence string.


In [None]:
import os, re, json, datetime as dt
import pandas as pd
from dateutil.relativedelta import relativedelta
import pytesseract
from pdf2image import convert_from_path


In [None]:
import glob, os

# Auto-discover CRC-related PDFs in /mnt/data
PDF_GLOBS = [
    "/mnt/data/*colonoscopy*.pdf",
    "/mnt/data/*_fit*.pdf",
    "/mnt/data/*_ifobt*.pdf",
    "/mnt/data/*fecal_ia*.pdf",
    "/mnt/data/*fobt*.pdf",
]
PDF_PATHS = sorted({p for g in PDF_GLOBS for p in glob.glob(g)})

FOBT_CSV_PATH = "/mnt/data/fobt.csv"
REPORTING_YEAR = 2025

print(f"Found {len(PDF_PATHS)} PDFs:")
for p in PDF_PATHS:
    print(" -", os.path.basename(p))
print("FOBT CSV:", os.path.basename(FOBT_CSV_PATH), "exists:", os.path.exists(FOBT_CSV_PATH))

In [None]:
import os, re, datetime as dt
from dataclasses import dataclass
from typing import Optional, Literal, Dict, Any, List, Tuple

DATE_PAT = re.compile(r'(\d{1,2})[/-](\d{1,2})[/-](\d{2,4})')

def _to_iso(m:int,d:int,y:int) -> Optional[str]:
    if y < 100:
        y = 2000 + y if y < 50 else 1900 + y
    try:
        return dt.date(y,m,d).isoformat()
    except Exception:
        return None

def parse_dates(text: str) -> List[Tuple[str, str]]:
    out=[]
    for m,d,y in DATE_PAT.findall(text or ""):
        iso = _to_iso(int(m), int(d), int(y))
        if iso:
            out.append((iso, f"{m}/{d}/{y}"))
    return out

def looks_like_nonclinical_line(line: str) -> bool:
    l=(line or "").lower()
    if "electronically signed" in l or "signed by" in l:
        return True
    if "dob" in l:
        return True
    if "data:text" in l or "base64" in l:
        return True
    return False

def filename_patient_id(path: str) -> str:
    base=os.path.basename(path)
    m=re.match(r'^(\d+)', base)
    return m.group(1) if m else base

import pdfplumber
from pdf2image import convert_from_path
import pytesseract

KEYWORDS = ["colonoscopy","occult","fecal","fit","fobt","ifobt","colofit","guaiac"]

def extract_first_page_text(pdf_path: str, ocr_dpi: int = 200, min_chars: int = 140) -> str:
    """Extract first-page text.
    Strategy:
      1) Try native PDF text (fast).
      2) If it looks like a scan (often just Name/DOB header) or lacks CRC keywords, OCR the page image and return OCR text.
    """
    text = ""
    try:
        with pdfplumber.open(pdf_path) as pdf:
            if len(pdf.pages) > 0:
                text = (pdf.pages[0].extract_text() or "")
    except Exception:
        text = ""

    text_l = (text or "").lower()
    keyword_hit = any(k in text_l for k in KEYWORDS)

    # Heuristics for "header-only" PDFs (these were causing colonoscopy scans to be misread as DOB dates)
    header_only = False
    if text:
        lines = [ln.strip().lower() for ln in text.splitlines() if ln.strip()]
        # Typical header-only pattern: Name/DOB/Date with no clinical content
        if len(lines) <= 5 and any("dob" in ln for ln in lines) and not any(k in text_l for k in ["procedure", "colonoscopy", "occult", "fecal", "fit", "fobt"]):
            header_only = True

    # If native text looks good enough, keep it
    if text and len(text.strip()) >= min_chars and keyword_hit and not header_only:
        return text

    # Otherwise OCR the first page (works for scans/faxes that contain the real content as an image)
    try:
        pages = convert_from_path(pdf_path, dpi=ocr_dpi, first_page=1, last_page=1)
        if pages:
            # Larger timeout avoids blank returns on dense faxes
            ocr = pytesseract.image_to_string(pages[0], config="--psm 6", timeout=20)
            return ocr or text or ""
    except Exception:
        pass

    return text or ""


EventType = Literal["FOBT","FIT","FIT_DNA","SIGMOIDOSCOPY","CT_COLONOGRAPHY","COLONOSCOPY"]

@dataclass
class ScreeningEvent:
    patient_id: str
    event_type: EventType
    event_date: Optional[str]
    source: Literal["pdf","structured_csv"]
    confidence: float
    needs_review: bool
    evidence: Dict[str, Any]

def classify_doc(text: str) -> Optional[EventType]:
    t=(text or "").lower()
    if "colonoscopy" in t:
        return "COLONOSCOPY"
    if "occult blood" in t and "fecal" in t:
        return "FIT"  # includes IA (FIT)
    if "colofit" in t:
        return "FIT"
    if "\bfit\b" in t and "fecal" in t:
        return "FIT"
    if "ifobt" in t or "fobt" in t or "guaiac" in t:
        return "FOBT"
    return None

def extract_labeled_dates(text: str) -> Dict[str, List[str]]:
    labels = {"procedure":[],"collection":[],"received":[],"result":[],"other":[]}
    for line in (text or "").splitlines():
        if not line.strip():
            continue
        if looks_like_nonclinical_line(line):
            continue
        ll=line.lower()
        label="other"
        if re.search(r'procedure\s*date|date\s*of\s*procedure|date\s*of\s*service|\bdos\b', ll):
            label="procedure"
        elif re.search(r'collection\s*date|date\s*of\s*collection|\bcollected\b', ll):
            label="collection"
        elif re.search(r'\breceived\b', ll):
            label="received"
        elif re.search(r'\bresult\b|\breported\b|\bfinal\b', ll):
            label="result"
        if re.search(r'\b(al|on)\s*\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b', ll):
            label="collection"
        for iso,_ in parse_dates(line):
            labels[label].append(iso)
    for k,v in labels.items():
        seen=set(); nv=[]
        for d in v:
            if d not in seen:
                nv.append(d); seen.add(d)
        labels[k]=nv
    return labels

def choose_best_date(event_type: EventType, labeled: Dict[str,List[str]], all_dates: List[str]) -> Tuple[Optional[str], float, bool, str]:
    if event_type=="COLONOSCOPY":
        pref="procedure"
    elif event_type in ["FIT","FOBT"]:
        pref="collection"
    else:
        pref="other"

    candidates = labeled.get(pref, []).copy()
    if not candidates:
        candidates = labeled.get("result",[]) + labeled.get("received",[]) + labeled.get("other",[])
    if not candidates and all_dates:
        candidates = list(dict.fromkeys(all_dates))

    if not candidates:
        return None, 0.2, True, "no date found"

    # choose most recent candidate, but avoid obvious far-past DOB-like dates by clipping to reasonable years
    def year_ok(iso):
        y=int(iso[:4])
        return 1990 <= y <= (dt.date.today().year + 1)
    filtered=[d for d in candidates if year_ok(d)]
    if filtered:
        candidates=filtered

    best = max(candidates)
    conf = 0.9 if (labeled.get(pref) and best in labeled[pref]) else 0.65
    needs_review = False
    rationale = f"picked {best} from {pref if conf>0.8 else 'fallback'}"

    distinct = sorted(set(sum(labeled.values(), [])))
    if conf < 0.8 and len(distinct) >= 2:
        needs_review = True
        conf = min(conf, 0.55)
        rationale += "; multiple conflicting dates"

    return best, conf, needs_review, rationale

def extract_event_from_pdf(pdf_path: str, text: str) -> ScreeningEvent:
    pid = filename_patient_id(pdf_path)
    fname = os.path.basename(pdf_path).lower()
    # Filename hints are very reliable in your dataset (e.g., *_colonoscopy.pdf)
    if "colonoscopy" in fname:
        et = "COLONOSCOPY"
    elif "fit" in fname and "colonoscopy" not in fname:
        # covers *_fit.pdf and many *_ifobt.pdf that are actually FIT immunoassay
        et = "FIT"
    elif "fobt" in fname:
        et = "FOBT"
    else:
        et = classify_doc(text) or "FOBT"
    text_clean = chr(10).join([ln for ln in (text or "").splitlines() if "dob" not in ln.lower()])
    labeled = extract_labeled_dates(text_clean)
    all_dates = [d for d,_ in parse_dates(text_clean)]
    best_date, conf, needs_review, rationale = choose_best_date(et, labeled, all_dates)

    lines = [ln.strip() for ln in (text or "").splitlines() if ln.strip()]
    if et == "COLONOSCOPY":
        keys = ["procedure", "colonoscopy", "impression", "indication", "findings", "date of service", "dos"]
    else:
        keys = ["collection date", "collected", "occult blood", "fecal", "fit", "fobt", "ifobt", "colofit"]
    snippet_lines = []
    for ln in lines:
        lnl = ln.lower()
        if any(k in lnl for k in keys):
            snippet_lines.append(ln)
        if len(snippet_lines) >= 8:
            break
    snippet = (" ".join(snippet_lines) or (text or "")[:600]).replace(chr(10), " ")

    # If we found FIT/FOBT but no confident collection/procedure date, flag for review
    if et in ["FIT","FOBT"] and (best_date is None or conf < 0.8):
        needs_review = True

    return ScreeningEvent(
        patient_id=pid,
        event_type=et,
        event_date=best_date,
        source="pdf",
        confidence=float(conf),
        needs_review=bool(needs_review),
        evidence={
            "file": os.path.basename(pdf_path),
            "rationale": rationale,
            "snippet": snippet,
            "dates_by_label": labeled,
        }
    )

In [None]:
# Extract events from PDFs (fast text extraction with OCR fallback)
pdf_rows=[]
for pdf_path in PDF_PATHS:
    text = extract_first_page_text(pdf_path)
    ev = extract_event_from_pdf(pdf_path, text)
    pdf_rows.append(ev.__dict__)

df_pdf = pd.DataFrame(pdf_rows)
df_pdf

In [None]:
# Parse FOBT CSV extract and convert to ScreeningEvent rows
import pandas as pd
import numpy as np
import datetime as dt

def detect_header_row(csv_path: str, max_scan: int = 40) -> int:
    with open(csv_path, 'r', errors='ignore') as f:
        for i,line in enumerate(f):
            if i>max_scan:
                break
            l=line.lower()
            if 'person' in l and ('nbr' in l or '#' in l) and ('mrn' in l or 'enc' in l or 'date' in l):
                return i
    return 0

hdr = detect_header_row(FOBT_CSV_PATH)
df_csv = pd.read_csv(FOBT_CSV_PATH, header=hdr)
df_csv.columns = [c.strip() for c in df_csv.columns]

# Identify key columns
def find_col(cols, patterns):
    for p in patterns:
        for c in cols:
            if re.search(p, c, re.I):
                return c
    return None

pid_col = find_col(df_csv.columns, [r'person\s*nbr', r'person\s*#', r'patient'])
encdate_col = find_col(df_csv.columns, [r'enc\s*date', r'collection\s*date', r'collected', r'date'])
test_col = find_col(df_csv.columns, [r'test', r'order', r'procedure', r'description', r'name'])

def parse_any_date(x):
    if pd.isna(x): return None
    s=str(x).strip()
    # try mm/dd/yyyy
    m=DATE_PAT.search(s)
    if not m: 
        return None
    mm,dd,yy = map(int, m.groups())
    iso=_to_iso(mm,dd,yy)
    return iso

csv_events=[]
for idx,row in df_csv.iterrows():
    pid = str(row.get(pid_col, '')).strip()
    if not pid or pid.lower()=='nan':
        continue
    ev_date = parse_any_date(row.get(encdate_col, None))
    if not ev_date:
        continue
    test_name = str(row.get(test_col, '')).lower() if test_col else ''
    et = 'FIT' if 'fit' in test_name and 'dna' not in test_name else 'FOBT'
    csv_events.append({
        "patient_id": pid,
        "event_type": et,
        "event_date": ev_date,
        "source": "structured_csv",
        "confidence": 0.95,
        "needs_review": False,
        "evidence": {"source_file": os.path.basename(FOBT_CSV_PATH), "row_index": int(idx), "test_name": test_name}
    })

df_csv_events = pd.DataFrame(csv_events)
df_csv_events.head(), df_csv_events.shape

In [None]:
# CRC 2025 rule engine + merge, de-dup, and scoring
import datetime as dt
from dateutil.relativedelta import relativedelta

year_start = dt.date(REPORTING_YEAR,1,1)
year_end = dt.date(REPORTING_YEAR,12,31)

def counts_for_crc(event_type: str, event_date_iso: str) -> bool:
    if not event_date_iso:
        return False
    d = dt.date.fromisoformat(event_date_iso)
    if event_type in ["FOBT","FIT"]:
        return year_start <= d <= year_end
    if event_type == "FIT_DNA":
        return (year_start - relativedelta(years=2)) <= d <= year_end
    if event_type in ["SIGMOIDOSCOPY","CT_COLONOGRAPHY"]:
        return (year_start - relativedelta(years=4)) <= d <= year_end
    if event_type == "COLONOSCOPY":
        return (year_start - relativedelta(years=9)) <= d <= year_end
    return False

# Merge
df_all = pd.concat([
    df_pdf[["patient_id","event_type","event_date","source","confidence","needs_review","evidence"]],
    df_csv_events[["patient_id","event_type","event_date","source","confidence","needs_review","evidence"]],
], ignore_index=True)

# De-dup: keep highest confidence for same patient/type/date/source
df_all["dedup_key"] = df_all["patient_id"].astype(str)+"|"+df_all["event_type"].astype(str)+"|"+df_all["event_date"].astype(str)
df_all = (df_all
          .sort_values(["confidence","source"], ascending=[False, True])
          .drop_duplicates("dedup_key", keep="first")
          .drop(columns=["dedup_key"])
         )

df_all["counts_crc_2025"] = df_all.apply(lambda r: counts_for_crc(r["event_type"], r["event_date"]), axis=1)
df_all.sort_values(["patient_id","counts_crc_2025","event_date"], ascending=[True, False, False]).head(30)

In [None]:
# Pick best numerator evidence per patient (most recent qualifying; tie-break by confidence)
def pick_best(df):
    q = df[df["counts_crc_2025"]==True].copy()
    if q.empty:
        # keep best available for review (most recent date regardless) if any
        df2 = df.copy()
        df2 = df2[df2["event_date"].notna()]
        if df2.empty:
            return pd.Series({
                "numerator_met": False,
                "best_event_type": None,
                "best_event_date": None,
                "best_source": None,
                "best_confidence": None,
                "needs_review": True,
                "evidence_summary": "no dated CRC evidence found",
            })
        df2 = df2.sort_values(["event_date","confidence"], ascending=[False,False]).head(1)
        row = df2.iloc[0]
        return pd.Series({
            "numerator_met": False,
            "best_event_type": row.event_type,
            "best_event_date": row.event_date,
            "best_source": row.source,
            "best_confidence": row.confidence,
            "needs_review": True,
            "evidence_summary": f"Best non-qualifying evidence: {row.event_type} {row.event_date} ({row.source}); {row.evidence.get('file', row.evidence.get('source_file',''))}",
        })
    q = q.sort_values(["event_date","confidence"], ascending=[False,False]).head(1)
    row = q.iloc[0]
    file_or_row = row.evidence.get('file', row.evidence.get('source_file',''))
    snippet = row.evidence.get('snippet','')
    summary = f"{row.event_type} on {row.event_date} via {row.source} ({file_or_row})"
    if snippet:
        summary += f" | snippet: {snippet[:160]}"
    return pd.Series({
        "numerator_met": True,
        "best_event_type": row.event_type,
        "best_event_date": row.event_date,
        "best_source": row.source,
        "best_confidence": row.confidence,
        "needs_review": bool(row.needs_review),
        "evidence_summary": summary,
    })

df_best = df_all.groupby("patient_id", as_index=False).apply(pick_best).reset_index(drop=True)
df_best = df_best.sort_values(["numerator_met","needs_review","best_event_date"], ascending=[False, True, False])

# Save auditor table
out_csv = "audit_results/crc_2025_audit_table_updated.csv"
df_best.to_csv(out_csv, index=False)
out_csv, df_best.head(50)