# üß† DocScribe ‚Äî 02 ¬∑ Extractor Development (Public, No Hardcoding)

- Model: `google/flan-t5-large` (public)
- Prompt: schema-only (no clinical examples)
- Parsing: per-key boundary parsing + grounding (verbatim / relaxed match)
- Backoff: per-field micro-prompts (extractive) + minimal generic regex salvage
- Routing: imaging/labs ‚Üí Orders; dosed meds ‚Üí Plan (mirrored to Orders for demo)
- Fixes: verb stripping for both targets, PRN not treated as follow-up, robust split with relaxed containment

In [1]:
from pathlib import Path
import sys

# If this notebook lives in <repo>/notebooks/, ROOT is the repo root.
NB_DIR = Path.cwd()
if NB_DIR.name.lower() != "notebooks":
    # fallback: look upward for a "notebooks" folder
    probe = NB_DIR
    for _ in range(4):
        if (probe / "notebooks").exists():
            break
        probe = probe.parent
    ROOT = probe
else:
    ROOT = NB_DIR.parent

SRC = ROOT / "src"
SRC.mkdir(exist_ok=True)

# Make 'src' importable in *this* kernel too (not required for 03, but handy)
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))

print("ROOT:", ROOT)
print("SRC :", SRC)

ROOT: /Users/saturnine/DocScribe
SRC : /Users/saturnine/DocScribe/src


In [2]:
import os, re, json, time, torch
from typing import Dict, Any, List, Tuple
from pydantic import BaseModel, Field, ValidationError
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline

torch.manual_seed(42)
os.environ.setdefault("TOKENIZERS_PARALLELISM", "false")

DEVICE = 0 if torch.cuda.is_available() else -1
MODEL_NAME = os.environ.get("DOCSCRIBE_MODEL", "google/flan-t5-large")

print("‚úÖ Device:", "GPU" if DEVICE >= 0 else "CPU")
print("üß© Model:", MODEL_NAME)

# Deterministic gen; for faster CPU demos set num_beams=1, max_new_tokens=320
GEN_KW = dict(
    do_sample=False,
    num_beams=4,
    temperature=0.0,
    max_new_tokens=420,
    early_stopping=True,
)

print("üîÑ Loading model...")
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModelForSeq2SeqLM.from_pretrained(MODEL_NAME)
t5 = pipeline("text2text-generation", model=model, tokenizer=tokenizer, device=DEVICE)
print("‚úÖ Model ready.")

‚úÖ Device: CPU
üß© Model: google/flan-t5-large
üîÑ Loading model...




‚úÖ Model ready.


In [3]:
class ClinicalNote(BaseModel):
    chief_complaint: str = Field(default="")
    assessment: str = Field(default="")
    diagnosis: List[str] = Field(default_factory=list)
    orders: List[str] = Field(default_factory=list)
    plan: List[str] = Field(default_factory=list)
    follow_up: str = Field(default="")

    def pretty(self) -> str:
        return self.json(indent=2, ensure_ascii=False, exclude_none=True)

def compose_soap(note: ClinicalNote) -> str:
    s = note.chief_complaint or "‚Äî"
    o = ", ".join(note.orders) if note.orders else "‚Äî"
    a = note.assessment or (", ".join(note.diagnosis) if note.diagnosis else "‚Äî")
    p = "; ".join(note.plan) if note.plan else "‚Äî"
    f = note.follow_up or "‚Äî"
    return f"S: {s}\nO: {o}\nA: {a}\nP: {p}\nFollow-up: {f}\n"

In [4]:
FEWSHOT = """You are a documentation assistant.

Return ONE valid JSON object ONLY. Start with '{' and end with '}'.
Use EXACTLY these keys and types:
- "chief_complaint": string
- "assessment": string
- "diagnosis": array of strings
- "orders": array of strings
- "plan": array of strings
- "follow_up": string

STRICT RULES:
- Derive content ONLY from the TRANSCRIPT text.
- Every value MUST be a verbatim substring of the TRANSCRIPT (case-insensitive allowed).
- If a value is not present, leave it "" (for strings) or [] (for arrays).
- Do NOT add any text before or after the JSON.

TRANSCRIPT:
{transcript}

JSON:
"""
print("üìã Prompt ready.")

üìã Prompt ready.


In [5]:
KEYS_ORDER = ["chief_complaint","assessment","diagnosis","orders","plan","follow_up"]
KEYS_SET   = set(KEYS_ORDER)
KEY_START_RE = re.compile(r'(?:"?(chief_complaint|assessment|diagnosis|orders|plan|follow_up)"?\s*:)', re.I)

def _find_key_spans(txt: str) -> Dict[str, slice]:
    spans, positions = {}, []
    for m in KEY_START_RE.finditer(txt):
        k = m.group(1).lower()
        positions.append((k, m.start(), m.end()))
    for i, (k, s, e) in enumerate(positions):
        nxt = positions[i+1][1] if i+1 < len(positions) else len(txt)
        spans[k] = slice(e, nxt)
    return spans

def _grab_string_val(chunk: str) -> str:
    m = re.search(r'"\s*([^"]*?)\s*"', chunk)  # "value"
    if m: return m.group(1).strip()
    m = re.search(r':\s*([^,\]\}]+)', chunk)    # : value
    return m.group(1).strip() if m else ""

def _grab_list_val(chunk: str) -> List[str]:
    m = re.search(r'\[\s*([^\]]*?)\s*\]', chunk)
    inside = m.group(1) if m else chunk
    items = re.findall(r'"([^"]+)"', inside) or [x.strip() for x in re.split(r'[;,]', inside) if x.strip()]
    cleaned, seen = [], set()
    for it in items:
        it = it.strip()
        if not it or it.lower() in KEYS_SET or len(it) <= 1:
            continue
        low = it.lower()
        if low not in seen:
            seen.add(low); cleaned.append(it)
    return cleaned

def parse_fields_with_boundaries(raw_txt: str) -> Dict[str, Any]:
    t = (raw_txt or "").replace("‚Äú", '"').replace("‚Äù", '"').replace("‚Äô", "'")
    t = re.sub(r"\s+", " ", t).strip()

    # Try JSON first
    mjson = re.search(r"\{[\s\S]*\}", t)
    if mjson:
        block = mjson.group(0)
        try:
            data = json.loads(block)
            data = {k: v for k, v in data.items() if k in KEYS_SET}
            for k in KEYS_ORDER:
                data.setdefault(k, [] if k in ("diagnosis","orders","plan") else "")
            return data
        except Exception:
            pass

    # Boundary parse
    spans = _find_key_spans(t)
    data: Dict[str, Any] = {k: ([] if k in ("diagnosis","orders","plan") else "") for k in KEYS_ORDER}
    for k in KEYS_ORDER:
        if k not in spans:
            continue
        chunk = t[spans[k]]
        data[k] = _grab_list_val(chunk) if k in ("diagnosis","orders","plan") else _grab_string_val(chunk)
    return data

In [6]:
_LEAD_VERBS = re.compile(r"^\s*(?:start|begin|initiate|recommend|advise|continue|order|obtain|get|perform|schedule)\s+", re.IGNORECASE)
_DETERMINERS = re.compile(r"^\s*(?:to|the|a|an)\s+", re.IGNORECASE)

def _canonical(s: str) -> str:
    if not s: 
        return ""
    x = s.strip().rstrip(".")
    x = _LEAD_VERBS.sub("", x)
    x = _DETERMINERS.sub("", x)
    x = re.sub(r"\s+", " ", x)
    x = re.sub(r"\bx(\d+)\s*day\b", r"x\1 days", x, flags=re.IGNORECASE)  # day‚Üídays
    return x.lower()

def _loose_contains(transcript: str, phrase: str) -> bool:
    if not phrase:
        return False
    t_raw = (transcript or "").lower()
    p_raw = phrase.strip().lower().rstrip(".")
    if p_raw and p_raw in t_raw:
        return True
    # canonical forms
    t_can = _canonical(transcript)
    p_can = _canonical(phrase)
    if p_can and p_can in t_can:
        return True
    # day -> days tweak
    p_alt = re.sub(r"\bx(\d+)\s*day\b", r"x\1 days", p_raw)
    return p_alt in t_raw

def _ground_to_transcript(data: Dict[str, Any], transcript: str) -> Dict[str, Any]:
    out = {}
    for k in KEYS_ORDER:
        v = data.get(k, [] if k in ("diagnosis","orders","plan") else "")
        if isinstance(v, list):
            kept, seen = [], set()
            for s in v:
                s2 = s.strip().rstrip(".")
                key = _canonical(s2)
                if s2 and key and key not in seen and _loose_contains(transcript, s2):
                    seen.add(key); kept.append(s2)
            out[k] = kept
        else:
            s2 = (v or "").strip().rstrip(".")
            out[k] = s2 if s2 and _loose_contains(transcript, s2) else ""
    return out

In [7]:
FIELD_PROMPTS = {
    "chief_complaint": (
        "From the TRANSCRIPT, return the chief complaint as a verbatim substring.\n"
        "Return ONLY the phrase, no quotes, no extra text. If none, return nothing.\n\n"
        "TRANSCRIPT:\n{transcript}\n\nPHRASE:"
    ),
    "assessment": (
        "From the TRANSCRIPT, return the assessment/impression as a verbatim substring.\n"
        "Return ONLY the phrase, no quotes, no extra text. If none, return nothing.\n\n"
        "TRANSCRIPT:\n{transcript}\n\nPHRASE:"
    ),
    "follow_up": (
        "From the TRANSCRIPT, return ONLY the follow-up timing as a verbatim substring "
        "(e.g., '2 days', '1 week', 'return if worsening'). Do not include medications or 'PRN'. "
        "Return ONLY the phrase, no quotes, no extra text. If none, return nothing.\n\n"
        "TRANSCRIPT:\n{transcript}\n\nPHRASE:"
    ),
    "diagnosis": (
        "From the TRANSCRIPT, list diagnoses as a JSON array of verbatim substrings.\n"
        "Return ONLY the JSON array (e.g., [\"...\"]). If none, return [].\n\n"
        "TRANSCRIPT:\n{transcript}\n\nARRAY:"
    ),
    "orders": (
        "From the TRANSCRIPT, extract tests/procedures/medications that are explicitly ordered "
        "as a JSON array of verbatim substrings (minimal phrases only, e.g., 'chest X-ray', "
        "'azithromycin 500 mg daily x5'). If multiple are in one sentence, split into separate items. "
        "Return ONLY the JSON array. If none, return [].\n\nTRANSCRIPT:\n{transcript}\n\nARRAY:"
    ),
    "plan": (
        "From the TRANSCRIPT, extract planned interventions/instructions as a JSON array of verbatim substrings "
        "(minimal phrases only, e.g., 'RICE', 'ibuprofen 400 mg PRN'). If multiple are in one sentence, split into "
        "separate items. Return ONLY the JSON array. If none, return [].\n\nTRANSCRIPT:\n{transcript}\n\nARRAY:"
    ),
}

def _gen_text(prompt: str) -> str:
    return t5(prompt, **GEN_KW)[0]["generated_text"].strip()

def _parse_array(s: str) -> List[str]:
    m = re.search(r"\[[\s\S]*\]", s)
    if m:
        try:
            arr = json.loads(m.group(0))
            if isinstance(arr, list):
                return [x for x in arr if isinstance(x, str)]
        except Exception:
            pass
    items = re.findall(r'"([^"]+)"', s)
    return items or [x.strip() for x in re.split(r"[;,]", s) if x.strip()]

In [8]:
def _split_conjunctions(items: List[str], transcript: str) -> List[str]:
    parts: List[str] = []
    for it in items:
        s = it.strip().rstrip(".")
        chunks = re.split(r"\b(?:and|then|,|;)\b", s, flags=re.IGNORECASE)
        for c in chunks:
            c2 = c.strip().strip(",;.")
            if c2 and _loose_contains(transcript, c2):
                parts.append(c2)
    seen, out = set(), []
    for p in parts:
        key = _canonical(p)
        if key and key not in seen:
            seen.add(key)
            out.append(p.strip())
    return out or items

In [9]:
_TIME_RE = re.compile(
    r"\b(?:(?:in\s+)?\d+\s*(?:day|days|week|weeks|wk|wks|month|months)|"
    r"\d+-\d+\s*(?:days|weeks)|"
    r"(?:return if worse|return if worsening))\b",
    re.IGNORECASE
)
def _extract_time_phrase(text: str) -> str:
    if not text:
        return ""
    m = _TIME_RE.search(text)
    return (m.group(0).strip() if m else "").rstrip(".")

In [10]:
_DOSAGE_PHRASE_RE = re.compile(
    r"\b([A-Za-z][A-Za-z\-]*(?:\s[A-Za-z][A-Za-z\-]*)*\s+"
    r"(?:\d+\s*(?:mg|mcg|g|ml|units)\b(?:\s*(?:daily|q\d+h|BID|TID|QID|PRN))?"
    r"(?:\s*x\d+\s*(?:day|days|week|weeks)?)?))",
    re.IGNORECASE
)
def _extract_dosage_phrases(text: str) -> List[str]:
    return [m.group(1).strip().rstrip(".") for m in _DOSAGE_PHRASE_RE.finditer(text or "")]

_RULE_OUT_RE = re.compile(r"([^\.]*?)\s+to\s+rule\s+out\b", re.IGNORECASE)
def _extract_before_rule_out(text: str) -> List[str]:
    out = []
    for m in _RULE_OUT_RE.finditer(text or ""):
        lhs = m.group(1).strip().rstrip(".")
        chunks = re.split(r"\b(?:and|then|,|;)\b", lhs, flags=re.IGNORECASE)
        out.extend([c.strip().strip(",;.") for c in chunks if c.strip()])
    seen, dedup = set(), []
    for x in out:
        lx = _canonical(x)
        if lx not in seen:
            seen.add(lx); dedup.append(x)
    return dedup

In [11]:
ACTION_PATTERNS = [
    ("orders", r"\b(order|obtain|get|perform|schedule)\b\s+([^\.]+)"),
    ("plan",   r"\b(start|begin|initiate|recommend|advise|continue)\b\s+([^\.]+)"),
]
def _derive_actions_from_transcript(transcript: str) -> Dict[str, List[str]]:
    text = re.sub(r"\s+", " ", transcript or "").strip()
    derived = {"orders": [], "plan": []}

    # 1) Verb-led extraction
    for target, pat in ACTION_PATTERNS:
        for m in re.finditer(pat, text, flags=re.IGNORECASE):
            segment = m.group(2)
            chunks = re.split(r"\b(?:and|then|,|;)\b", segment, flags=re.IGNORECASE)
            for c in chunks:
                c2 = re.sub(r"^\s*(to\s+)", "", c, flags=re.IGNORECASE).strip().rstrip(".")
                if c2:
                    derived[target].append(c2)

    # 2) Dosage phrases ‚Üí Plan candidates
    for phr in _extract_dosage_phrases(text):
        derived["plan"].append(phr)

    # 3) Before "to rule out ..." ‚Üí Orders
    for lhs in _extract_before_rule_out(text):
        derived["orders"].append(lhs)

    # De-dup canonical
    for k in derived:
        seen, out = set(), []
        for it in derived[k]:
            key = _canonical(it)
            if key and it:
                if key not in seen:
                    seen.add(key); out.append(it.strip())
        derived[k] = out
    return derived

In [12]:
ORDER_VERBS = r"(?:order|obtain|get|perform|schedule)"
PLAN_VERBS  = r"(?:start|begin|initiate|recommend|advise|continue)"

def _clip_action_core(s: str, target: str) -> str:
    # NEW: strip any action verb first (applies to both targets)
    txt = _LEAD_VERBS.sub("", s.strip().rstrip("."))  # <‚Äî key change
    if target == "orders":
        m = re.search(rf"\b{ORDER_VERBS}\b\s+(.*)$", txt, flags=re.IGNORECASE)
        if m:
            return m.group(1).strip().rstrip(".")
    elif target == "plan":
        m = re.search(rf"\b{PLAN_VERBS}\b\s+(.*)$", txt, flags=re.IGNORECASE)
        if m:
            return m.group(1).strip().rstrip(".")
    parts = [p.strip() for p in re.split(r"[.]", txt) if p.strip()]
    return parts[-1] if parts else txt

def _is_dosage_like(s: str) -> bool:
    return bool(_DOSAGE_PHRASE_RE.search(s))

def _keep_minimal(s: str) -> bool:
    n_words = len(s.split())
    return n_words <= 12 or _is_dosage_like(s)

In [13]:
HEURISTICS = {
    "imaging": {"x-ray", "xray", "ct", "mri", "ultrasound", "ekg", "ecg", "echo"},
    "labs": {"cbc", "cmp", "a1c", "bmp", "urinalysis", "culture", "strep test"},
}
MIRROR_MEDS_TO_ORDERS = True  # demo: meds appear in Orders & Plan

def _looks_like_imaging_or_lab(s: str) -> bool:
    w = s.lower()
    return any(tok in w for tok in (HEURISTICS["imaging"] | HEURISTICS["labs"]))

def _route_items(orders: List[str], plan: List[str]) -> Tuple[List[str], List[str]]:
    o2, p2 = [], []
    for it in orders:
        s = it.strip().rstrip(".")
        if not s: continue
        if _looks_like_imaging_or_lab(s):
            o2.append(s)
        elif _is_dosage_like(s):
            if MIRROR_MEDS_TO_ORDERS: o2.append(s)
            p2.append(s)
        else:
            o2.append(s)
    for it in plan:
        s = it.strip().rstrip(".")
        if not s: continue
        if _looks_like_imaging_or_lab(s):
            o2.append(s)
        elif _is_dosage_like(s):
            if MIRROR_MEDS_TO_ORDERS: o2.append(s)
            p2.append(s)
        else:
            p2.append(s)

    def dedup(xs: List[str]) -> List[str]:
        seen, out = set(), []
        for x in xs:
            key = _canonical(x)
            if key and key not in seen:
                seen.add(key); out.append(x.strip().rstrip("."))
        return out

    return dedup(o2), dedup(p2)

In [14]:
def _merge_unique(dst: List[str], src: List[str]) -> List[str]:
    seen = {_canonical(x) for x in dst if x}
    out = [d.strip().rstrip(".") for d in dst if d and _canonical(d)]
    for s in src:
        t = s.strip().rstrip(".")
        key = _canonical(t)
        if t and key and key not in seen:
            seen.add(key)
            out.append(t)
    return out

In [15]:
def _refine_empty_fields(transcript: str, data: Dict[str, Any]) -> Dict[str, Any]:
    filled = dict(data)

    # Strings
    for k in ["chief_complaint", "assessment"]:
        if not filled.get(k):
            val = _gen_text(FIELD_PROMPTS[k].format(transcript=transcript)).strip()
            filled[k] = val

    # Follow-up normalize (PRN not captured)
    fu = filled.get("follow_up", "")
    if not fu:
        fu = _gen_text(FIELD_PROMPTS["follow_up"].format(transcript=transcript)).strip()
    filled["follow_up"] = _extract_time_phrase(fu)

    # Arrays
    for k in ["diagnosis", "orders", "plan"]:
        arr = filled.get(k, [])
        if not arr:
            raw = _gen_text(FIELD_PROMPTS[k].format(transcript=transcript))
            arr = _parse_array(raw)

        arr = _split_conjunctions(arr, transcript)

        if k in ("orders", "plan"):
            arr = [_clip_action_core(x, k) for x in arr]

        seen, clean = set(), []
        for it in arr:
            s = it.strip().rstrip(".")
            if not s:
                continue
            if k in ("orders","plan") and not _keep_minimal(s):
                continue
            key = _canonical(s)
            if key and key not in seen:
                seen.add(key); clean.append(s)
        filled[k] = clean

    # Always derive & merge
    derived = _derive_actions_from_transcript(transcript)
    filled["orders"] = _merge_unique(filled.get("orders", []), derived.get("orders", []))
    filled["plan"]   = _merge_unique(filled.get("plan",   []), derived.get("plan",   []))

    # Prune Plan noise
    pruned_plan = []
    for s in filled.get("plan", []):
        s2 = s.strip()
        if s2.count(".") > 0:
            continue
        if re.search(r"\blikely\b|\border\b", s2, re.IGNORECASE):
            continue
        pruned_plan.append(s2)
    filled["plan"] = pruned_plan

    # Canonical routing
    filled["orders"], filled["plan"] = _route_items(filled.get("orders", []), filled.get("plan", []))

    # Ground to transcript
    return _ground_to_transcript(filled, transcript)

In [16]:
def _raw_output_is_bad_list(raw: str, transcript: str) -> bool:
    s = (raw or "").strip()
    if s.startswith("[") and s.endswith("]") and len(s) < 4000:
        inner = re.sub(r'^\[\s*"?|\s*"?\]$', "", s).strip()
        return len(inner) >= 20 and inner.lower() in (transcript or "").lower()
    return False

In [17]:
def extract_note(transcript: str, gen_kwargs: Dict[str, Any] = GEN_KW) -> Tuple[ClinicalNote, str]:
    # Pass A ‚Äî schema-only prompt
    prompt = FEWSHOT.replace("{transcript}", transcript.strip())
    result = t5(prompt, **gen_kwargs)[0]
    raw = result["generated_text"]

    # If raw is "bad list", force empty so backoff fully runs
    if _raw_output_is_bad_list(raw, transcript):
        data = {k: ([] if k in ("diagnosis","orders","plan") else "") for k in KEYS_ORDER}
    else:
        data = _ground_to_transcript(parse_fields_with_boundaries(raw), transcript)

    # Pass B ‚Äî refine + salvage + routing + grounding
    data = _refine_empty_fields(transcript, data)

    # Validate & clean
    try:
        note = ClinicalNote(**data)
    except ValidationError:
        note = ClinicalNote()
    for k in ["diagnosis", "orders", "plan"]:
        arr = getattr(note, k)
        setattr(note, k, [x.strip() for x in arr if x and x.strip()])

    return note, raw

In [18]:
from pathlib import Path
import textwrap

# 1) Make src a package
(SRC / "__init__.py").write_text("", encoding="utf-8")

# 2) Write extract_clinical.py (PUT YOUR REAL IMPLEMENTATION HERE)
#    - Replace the placeholder '... your code here ...' with your working extractor
extract_code = textwrap.dedent("""
    import re, json
    from typing import Dict, Any, List, Tuple

    # -------------------------
    # Your real extractor pieces
    # -------------------------
    # - model/pipeline init (FLAN / Clinical-T5 fallback / etc.)
    # - coerce_json, parse helpers
    # - extract_note(text) -> (note_dict, raw_model_output)
    # Make sure extract_note RETURNS a dict with keys:
    #  chief_complaint, assessment, diagnosis(list), orders(list), plan(list), follow_up
    #
    # Below is a very small placeholder you should REPLACE with your actual extractor from 02.

    def extract_note(transcript: str) -> Tuple[Dict[str, Any], str]:
        t = (transcript or "").strip()
        # TODO: replace this block with your *real* model-backed extractor
        note = {
            "chief_complaint": "",
            "assessment": "",
            "diagnosis": [],
            "orders": [],
            "plan": [],
            "follow_up": ""
        }
        raw = t
        return note, raw
""").strip() + "\n"
(SRC / "extract_clinical.py").write_text(extract_code, encoding="utf-8")

# 3) Write compose_note.py (use your working composer)
compose_code = textwrap.dedent("""
    from typing import Tuple, Dict, Any

    def _to_dict(note) -> Dict[str, Any]:
        if isinstance(note, dict):
            return note
        if hasattr(note, "dict"):
            return note.dict()
        fields = ["chief_complaint","assessment","diagnosis","orders","plan","follow_up"]
        return {k: getattr(note, k, "" if k in ("chief_complaint","assessment","follow_up") else []) for k in fields}

    def compose_note(note) -> Tuple[str, str]:
        data = _to_dict(note)
        s = data.get("chief_complaint") or "‚Äî"
        o = ", ".join(data.get("orders") or []) or "‚Äî"
        a = data.get("assessment") or (", ".join(data.get("diagnosis") or []) or "‚Äî")
        p = "; ".join(data.get("plan") or []) or "‚Äî"
        f = data.get("follow_up") or "‚Äî"
        soap = f"S: {s}\\nO: {o}\\nA: {a}\\nP: {p}\\nFollow-up: {f}"
        summary = f"Visit summary: {s}. Assessment: {a}. Plan: {p}. Follow-up: {f}."
        return soap, summary
""").strip() + "\n"
(SRC / "compose_note.py").write_text(compose_code, encoding="utf-8")

print("‚úÖ Wrote src/extract_clinical.py and src/compose_note.py")

‚úÖ Wrote src/extract_clinical.py and src/compose_note.py


In [19]:
_test = "Order chest X-ray and start azithromycin 500 mg daily x5. Follow up in 2 days."
arr = [_test]
arr = _split_conjunctions(arr, _test)
arr = [_clip_action_core(x, "orders") for x in arr]
arr = [x for x in arr if _keep_minimal(x)]
print("SPLIT/CLIP/MIN:", arr)          # expect: ['chest X-ray', 'azithromycin 500 mg daily x5']

derived = _derive_actions_from_transcript(_test)
print("DERIVED:", derived)             # orders includes 'chest X-ray'; plan includes the dosage

SPLIT/CLIP/MIN: ['chest X-ray', 'Follow up in 2 days']
DERIVED: {'orders': ['chest X-ray', 'start azithromycin 500 mg daily x5'], 'plan': ['azithromycin 500 mg daily x5', 'Order chest X-ray and start azithromycin 500 mg daily x5']}


In [20]:
demos = [
    "Fever and cough for 3 days. Mild shortness of breath. Likely CAP. "
    "Order chest X-ray and start azithromycin 500 mg daily x5. Follow up in 2 days.",

    "Left ankle pain after inversion injury yesterday. Likely lateral ankle sprain. "
    "X-ray ankle to rule out fracture. RICE and ibuprofen 400 mg PRN.",

    "Dysuria and urinary frequency for 2 days. No fever or flank pain. "
    "Likely uncomplicated UTI. Urinalysis and nitrofurantoin 100 mg BID x5 days."
]

for i, demo in enumerate(demos, 1):
    print("="*80)
    print(f"ü©∫ DEMO {i}\nTRANSCRIPT:", demo)
    t0 = time.time()
    note, raw = extract_note(demo)
    dt = round(time.time()-t0, 2)

    print(f"\n‚è± Latency: {dt} s")
    print("\nüìã JSON:\n", note.pretty())
    print("\nüßæ SOAP:\n", compose_soap(note))
    print("\n=== RAW MODEL OUTPUT ===\n", raw)

ü©∫ DEMO 1
TRANSCRIPT: Fever and cough for 3 days. Mild shortness of breath. Likely CAP. Order chest X-ray and start azithromycin 500 mg daily x5. Follow up in 2 days.





‚è± Latency: 30.08 s

üìã JSON:
 {
  "chief_complaint": "Fever and cough",
  "assessment": "Likely CAP",
  "diagnosis": [],
  "orders": [
    "chest X-ray",
    "start azithromycin 500 mg daily x5"
  ],
  "plan": [
    "start azithromycin 500 mg daily x5"
  ],
  "follow_up": "2 days"
}

üßæ SOAP:
 S: Fever and cough
O: chest X-ray, start azithromycin 500 mg daily x5
A: Likely CAP
P: start azithromycin 500 mg daily x5
Follow-up: 2 days


=== RAW MODEL OUTPUT ===
 ["fever and cough for 3 days. Mild shortness of breath. Likely CAP. Order chest X-ray and start azithromycin 500 mg daily x5. Follow up in 2 days."]
ü©∫ DEMO 2
TRANSCRIPT: Left ankle pain after inversion injury yesterday. Likely lateral ankle sprain. X-ray ankle to rule out fracture. RICE and ibuprofen 400 mg PRN.

‚è± Latency: 26.7 s

üìã JSON:
 {
  "chief_complaint": "left ankle pain",
  "assessment": "Likely lateral ankle sprain",
  "diagnosis": [],
  "orders": [
    "X-ray ankle",
    "RICE and ibuprofen 400 mg PRN"
  

In [21]:
print("\n\n================ FINAL RAW OUTPUT INSPECTION ================\n")
for i, txt in enumerate(demos, 1):
    prompt = FEWSHOT.replace("{transcript}", txt.strip())
    raw_out = t5(prompt, **GEN_KW)[0]["generated_text"]
    print("="*80)
    print(f"ü©∫ RAW OUTPUT TEST {i}")
    print("TRANSCRIPT:", txt)
    print("\n=== RAW MODEL OUTPUT ===\n", raw_out)




ü©∫ RAW OUTPUT TEST 1
TRANSCRIPT: Fever and cough for 3 days. Mild shortness of breath. Likely CAP. Order chest X-ray and start azithromycin 500 mg daily x5. Follow up in 2 days.

=== RAW MODEL OUTPUT ===
 ["fever and cough for 3 days. Mild shortness of breath. Likely CAP. Order chest X-ray and start azithromycin 500 mg daily x5. Follow up in 2 days."]
ü©∫ RAW OUTPUT TEST 2
TRANSCRIPT: Left ankle pain after inversion injury yesterday. Likely lateral ankle sprain. X-ray ankle to rule out fracture. RICE and ibuprofen 400 mg PRN.

=== RAW MODEL OUTPUT ===
 ["left ankle pain after inversion injury yesterday. Likely lateral ankle sprain. X-ray ankle to rule out fracture. RICE and ibuprofen 400 mg PRN."]
ü©∫ RAW OUTPUT TEST 3
TRANSCRIPT: Dysuria and urinary frequency for 2 days. No fever or flank pain. Likely uncomplicated UTI. Urinalysis and nitrofurantoin 100 mg BID x5 days.

=== RAW MODEL OUTPUT ===
 ["dysuria and urinary frequency for 2 days."]


In [22]:
# This tests that your files can be imported *from disk*
import importlib, sys
for mod in ["src.extract_clinical", "src.compose_note"]:
    if mod in sys.modules: del sys.modules[mod]

ec = importlib.import_module("src.extract_clinical")
cn = importlib.import_module("src.compose_note")

assert hasattr(ec, "extract_note"), "extract_note is missing in src/extract_clinical.py"
assert hasattr(cn, "compose_note"), "compose_note is missing in src/compose_note.py"

note, raw = ec.extract_note("Fever and cough for 3 days. Follow up in 2 days.")
soap, summary = cn.compose_note(note)
print("Import OK. Example SOAP:\n", soap)

Import OK. Example SOAP:
 S: ‚Äî
O: ‚Äî
A: ‚Äî
P: ‚Äî
Follow-up: ‚Äî
