# 🧪 DocScribe — 03 · Validation & Grounding

**Purpose:**
Evaluate the clinical note extractor and ensure each component (chief complaint, assessment, diagnosis, orders, plan, follow-up) works as expected.

**Key steps:**
1. Load fallback regex extractor (works offline)
2. Create evaluation dataset with gold labels
3. Compute F1 and semantic similarity
4. Print detailed metrics and outputs

In [1]:
import os, re, json, time, sys
from pathlib import Path
from typing import List, Dict, Any, Tuple
from pprint import pprint
import numpy as np
from rapidfuzz.fuzz import partial_ratio

In [2]:
# --- PATCH: stronger regex extractor for 03_validation_eval.ipynb ---
from pathlib import Path
import sys, re, json
ROOT = Path("..").resolve()
SRC  = ROOT / "src"
SRC.mkdir(exist_ok=True)
(SRC / "__init__.py").write_text("", encoding="utf-8")

extract_py = r'''
from __future__ import annotations
import re, json
from typing import List, Dict, Any
from pydantic import BaseModel, Field

class ClinicalNote(BaseModel):
    chief_complaint: str = Field(default="")
    assessment: str = Field(default="")
    diagnosis: List[str] = Field(default_factory=list)
    orders: List[str] = Field(default_factory=list)
    plan: List[str] = Field(default_factory=list)
    follow_up: str = Field(default="")

SENT_SPLIT = re.compile(r"(?<=[.!?])\s+")
NONWORD    = re.compile(r"\s+")
LOWERPAST  = re.compile(r"\b(ordered|prescribed|given|advised|began)\b", re.I)

HEURISTICS = {
    "imaging": r"\b(x-?ray|cxr|ct|mri|ultra\s*sound|ultrasound|ekg|ecg|echo|xr)\b",
    "labs":    r"\b(cbc|cmp|a1c|bmp|ua|urinalysis|culture|strep(?:\s*test)?)\b"
}

MED_RX  = re.compile(r"\b([A-Z][a-zA-Z]+(?:\s[A-Za-z][a-zA-Z]+)*\s\d+\s?mg(?:\s(?:BID|TID|daily|q\d+h|x\d+))?)\b")
PLAN_RX = re.compile(r"\b(start|begin|give|prescribe|advise|recommend)\b([^.;]{1,120})", re.I)
ORD_RX  = re.compile(r"\b(order|ordered)\b([^.;]{1,160})", re.I)

def _sentences(txt: str) -> List[str]:
    txt = (txt or "").strip()
    if not txt:
        return []
    parts = SENT_SPLIT.split(txt)
    return [p.strip() for p in parts if p.strip()]

def _pick_chief_complaint(sents: List[str]) -> str:
    if not sents:
        return ""
    cc = sents[0]
    # trim durations like "for 3 days" at the tail for neatness
    cc = re.sub(r"\bfor\s+\d+\s+(?:day|days|wk|week|weeks)\b\.?$", "", cc, flags=re.I).strip()
    # prefer short symptom-like sentence if first is long
    if len(cc) > 120 and len(sents) > 1:
        return sents[1][:120]
    return cc[:160]

def _pick_assessment(text: str, sents: List[str]) -> str:
    # look for explicit markers
    m = re.search(r"\b(assessment\s*:)\s*([^.;]{1,160})", text, flags=re.I)
    if m:
        return NONWORD.sub(" ", m.group(2)).strip().rstrip(".")
    # otherwise find sentence with likely/suspect/impression
    for s in sents:
        if re.search(r"\b(likely|suspect|impression)\b", s, flags=re.I):
            return NONWORD.sub(" ", s).strip().rstrip(".")
    return ""

def _diagnosis_from_assessment(assessment: str) -> List[str]:
    if not assessment:
        return []
    # take trailing 6 words as a soft proxy for the condition
    tail = " ".join(assessment.split()[-8:])
    tail = tail.strip().strip(".")
    # drop bare markers
    if tail.lower() in {"likely", "suspect", "impression"}:
        return []
    return [tail] if tail else []

def _expand_imaging_labs(text: str) -> List[str]:
    out = []
    for patt in (HEURISTICS["imaging"], HEURISTICS["labs"]):
        for m in re.finditer(patt, text, flags=re.I):
            span = m.group(0)
            # try to capture an anatomical word just before (e.g., 'chest X-ray', 'ankle x-ray')
            start = max(0, m.start()-25)
            window = text[start:m.start()]
            pre = ""
            pm = re.search(r"(\b\w+\b)\s*$", window)
            if pm:
                pre = pm.group(1) + " "
            out.append((pre + span).strip())
    return out

def _split_list_phrase(phrase: str) -> List[str]:
    # split on ",", " and ", " & "
    parts = re.split(r",|\band\b|&", phrase, flags=re.I)
    parts = [NONWORD.sub(" ", p).strip() for p in parts]
    return [p for p in parts if p]

def _clean_dedup(items: List[str]) -> List[str]:
    seen = set(); out = []
    for it in items:
        it = NONWORD.sub(" ", it).strip().rstrip(".")
        if not it:
            continue
        low = it.lower()
        if low in seen:
            continue
        seen.add(low)
        out.append(it)
    return out

def _extract_followup(text: str) -> str:
    m = re.search(r"\b(follow\s*-?\s*up[^.]{0,80}|return[^.]{0,80}|re-?\s*evaluate[^.]{0,80})", text, flags=re.I)
    if not m:
        return ""
    fu = m.group(0)
    # normalize x5 day -> x5 days
    fu = re.sub(r"\bx(\d+)\s*day\b", r"x\1 days", fu, flags=re.I)
    return NONWORD.sub(" ", fu).strip().rstrip(".")

def _regex_extract(transcript: str) -> Dict[str, Any]:
    text = (transcript or "").strip()
    sents = _sentences(text)
    data = {"chief_complaint":"", "assessment":"", "diagnosis":[], "orders":[], "plan":[], "follow_up":""}

    # Chief complaint
    data["chief_complaint"] = _pick_chief_complaint(sents)

    # Assessment & diagnosis
    data["assessment"] = _pick_assessment(text, sents)
    data["diagnosis"]  = _diagnosis_from_assessment(data["assessment"])

    # Orders from keywords and 'order...' phrases
    orders = _expand_imaging_labs(text)
    for m in ORD_RX.finditer(text):
        phrase = m.group(2)
        orders.extend(_split_list_phrase(phrase))
    # Medications count both as orders & generate plan entries
    meds = []
    for m in MED_RX.finditer(text):
        meds.append(m.group(1))
    orders.extend(meds)

    # Plan from plan-verbs and explicit supportive words like RICE/ibuprofen (without dose)
    plans = []
    for m in PLAN_RX.finditer(text):
        plans.extend(_split_list_phrase(m.group(2)))
    # common conservative care tokens
    if re.search(r"\bRICE\b", text):
        plans.append("RICE")
    if re.search(r"\bibuprofen\b(?![^.]*mg)", text, flags=re.I):
        plans.append("ibuprofen")

    # If a med with dose was seen, add 'start <med>' to plan
    for med in meds:
        plans.append(f"start {med}")

    # Follow-up
    data["follow_up"] = _extract_followup(text)

    # Clean + de-dup
    data["orders"] = _clean_dedup(orders)
    data["plan"]   = _clean_dedup(plans)
    data["diagnosis"] = _clean_dedup(data["diagnosis"])

    return data

def extract_note(transcript: str):
    data = _regex_extract(transcript)
    note = ClinicalNote(**data)
    # raw output (for this regex version) = transcript itself, to keep interface stable
    return note, transcript
'''

(SRC / "extract_clinical.py").write_text(extract_py, encoding="utf-8")

# keep your existing composer
compose_py = r'''
from typing import Tuple
from .extract_clinical import ClinicalNote

def compose_note(note: ClinicalNote) -> Tuple[str, str]:
    s = note.chief_complaint or "—"
    o = ", ".join(note.orders) if note.orders else "—"
    a = note.assessment or (", ".join(note.diagnosis) if note.diagnosis else "—")
    p = "; ".join(note.plan) if note.plan else "—"
    f = note.follow_up or "—"
    soap = f"S: {s}\nO: {o}\nA: {a}\nP: {p}\nFollow-up: {f}\n"
    summary = f"Visit summary: {s}. Assessment: {a}. Plan: {p}. Follow-up: {f}."
    return soap, summary
'''
(SRC / "compose_note.py").write_text(compose_py, encoding="utf-8")

# reload
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))
from importlib import reload
import src.extract_clinical as _ec
reload(_ec)
from src.extract_clinical import extract_note
from src.compose_note import compose_note

print("✅ Patched extractor loaded.")

✅ Patched extractor loaded.


In [3]:
EVAL_PATH = ROOT / "eval" / "eval_transcripts.jsonl"
EVAL_PATH.parent.mkdir(exist_ok=True)

DEMO_EVAL = [
    {
        "text": "Fever and cough for 3 days. Mild shortness of breath. Likely CAP. "
                "Order chest X-ray and start azithromycin 500 mg daily x5. Follow up in 2 days.",
        "gold": {
            "chief_complaint": "Fever and cough",
            "assessment": "Likely CAP",
            "diagnosis": [],
            "orders": ["chest X-ray", "azithromycin 500 mg daily x5"],
            "plan": ["azithromycin 500 mg daily x5"],
            "follow_up": "2 days"
        }
    },
    {
        "text": "Left ankle pain after inversion injury yesterday. Likely lateral ankle sprain. "
                "X-ray ankle to rule out fracture. RICE and ibuprofen 400 mg PRN.",
        "gold": {
            "chief_complaint": "left ankle pain",
            "assessment": "Likely lateral ankle sprain",
            "diagnosis": [],
            "orders": ["X-ray ankle"],
            "plan": ["RICE", "ibuprofen 400 mg PRN"],
            "follow_up": ""
        }
    },
    {
        "text": "Dysuria and urinary frequency for 2 days. No fever or flank pain. "
                "Likely uncomplicated UTI. Urinalysis and nitrofurantoin 100 mg BID x5 days.",
        "gold": {
            "chief_complaint": "Dysuria",
            "assessment": "Likely uncomplicated UTI",
            "diagnosis": [],
            "orders": ["Urinalysis", "nitrofurantoin 100 mg BID x5 days"],
            "plan": ["nitrofurantoin 100 mg BID x5 days"],
            "follow_up": "2 days"
        }
    }
]

# Overwrite eval file
with EVAL_PATH.open("w", encoding="utf-8") as f:
    for row in DEMO_EVAL:
        f.write(json.dumps(row, ensure_ascii=False) + "\n")

print("✅ Eval file recreated at:", EVAL_PATH)

✅ Eval file recreated at: /Users/saturnine/DocScribe/eval/eval_transcripts.jsonl


In [4]:
WORD_NORM = re.compile(r"\s+")

def norm(s: str) -> str:
    return WORD_NORM.sub(" ", (s or "").strip().lower().rstrip("."))

def f1_for_lists(pred: List[str], gold: List[str], loose=True):
    P = [x for x in (pred or []) if x]
    G = [x for x in (gold or []) if x]
    if not P and not G: return (1,1,1)
    if not P: return (0,0,0)
    if not G: return (0,1,0)
    used = set(); tp=0
    for p in P:
        best, jbest = -1, -1
        for j,g in enumerate(G):
            if j in used: continue
            score = partial_ratio(p,g) if loose else (100 if norm(p)==norm(g) else 0)
            if score>best: best, jbest = score, j
        if best>=90: tp+=1; used.add(jbest)
    fp = len(P)-tp; fn = len(G)-tp
    prec = tp/(tp+fp) if tp+fp else 0
    rec = tp/(tp+fn) if tp+fn else 0
    f1 = 2*prec*rec/(prec+rec) if prec+rec else 0
    return round(prec,3), round(rec,3), round(f1,3)

In [5]:
rows = [json.loads(l) for l in EVAL_PATH.read_text().splitlines()]
results = []

for ex in rows:
    if "gold" not in ex:
        continue

    text = ex["text"]
    gold = ex["gold"]

    t0 = time.time()
    note, _ = extract_note(text)
    latency = round(time.time()-t0,2)
    pred = note.dict()

    cc_sim = partial_ratio(pred["chief_complaint"], gold["chief_complaint"])
    as_sim = partial_ratio(pred["assessment"], gold["assessment"])
    fu_sim = partial_ratio(pred["follow_up"], gold["follow_up"])
    diag_f = f1_for_lists(pred["diagnosis"], gold["diagnosis"])[2]
    ord_f  = f1_for_lists(pred["orders"], gold["orders"])[2]
    plan_f = f1_for_lists(pred["plan"], gold["plan"])[2]

    results.append({
        "cc": cc_sim, "as": as_sim, "fu": fu_sim,
        "diag": diag_f, "ord": ord_f, "plan": plan_f,
        "lat": latency
    })

print("✅ Completed evaluation on", len(results), "examples.")

✅ Completed evaluation on 3 examples.


In [6]:
def avg(xs): return round(sum(xs)/len(xs),3)

print("\n=== MEAN METRICS ===")
print("Chief Complaint:", avg([r["cc"] for r in results]))
print("Assessment     :", avg([r["as"] for r in results]))
print("Follow-up      :", avg([r["fu"] for r in results]))
print("Diagnosis F1   :", avg([r["diag"] for r in results]))
print("Orders F1      :", avg([r["ord"] for r in results]))
print("Plan F1        :", avg([r["plan"] for r in results]))
print("Latency (s)    :", avg([r["lat"] for r in results]))

print("\n=== PER-SAMPLE ===")
for i,r in enumerate(results,1):
    print(f"{i:02d} | CC:{r['cc']:>3}  A:{r['as']:>3}  FU:{r['fu']:>3}  ORD:{r['ord']:.2f}  PLAN:{r['plan']:.2f}  LAT:{r['lat']:.2f}")


=== MEAN METRICS ===
Chief Complaint: 97.778
Assessment     : 100.0
Follow-up      : 66.667
Diagnosis F1   : 0.0
Orders F1      : 0.722
Plan F1        : 0.5
Latency (s)    : 0.0

=== PER-SAMPLE ===
01 | CC:100.0  A:100.0  FU:100.0  ORD:1.00  PLAN:1.00  LAT:0.00
02 | CC:93.33333333333333  A:100.0  FU:100.0  ORD:0.67  PLAN:0.50  LAT:0.00
03 | CC:100.0  A:100.0  FU:0.0  ORD:0.50  PLAN:0.00  LAT:0.00


In [7]:
for i, ex in enumerate(rows,1):
    if "gold" not in ex: continue
    text = ex["text"]
    note,_ = extract_note(text)
    pred = note.dict()
    print(f"\n=== Case {i} ===")
    print("TRANSCRIPT:", text)
    print("PREDICTED JSON:")
    print(json.dumps(pred, indent=2))


=== Case 1 ===
TRANSCRIPT: Fever and cough for 3 days. Mild shortness of breath. Likely CAP. Order chest X-ray and start azithromycin 500 mg daily x5. Follow up in 2 days.
PREDICTED JSON:
{
  "chief_complaint": "Fever and cough",
  "assessment": "Likely CAP",
  "diagnosis": [
    "Likely CAP"
  ],
  "orders": [
    "chest X-ray",
    "start azithromycin 500 mg daily x5"
  ],
  "plan": [
    "azithromycin 500 mg daily x5"
  ],
  "follow_up": "Follow up in 2 days"
}

=== Case 2 ===
TRANSCRIPT: Left ankle pain after inversion injury yesterday. Likely lateral ankle sprain. X-ray ankle to rule out fracture. RICE and ibuprofen 400 mg PRN.
PREDICTED JSON:
{
  "chief_complaint": "Left ankle pain after inversion injury yesterday.",
  "assessment": "Likely lateral ankle sprain",
  "diagnosis": [
    "Likely lateral ankle sprain"
  ],
  "orders": [
    "X-ray",
    "RICE and ibuprofen 400 mg"
  ],
  "plan": [
    "RICE",
    "start RICE and ibuprofen 400 mg"
  ],
  "follow_up": ""
}

=== Case 3 