# Triage rules baseline

Prototype notebook for rule-based clinical NLP triage.

- Loads `data/lexicon_redflags.csv`
- Loads `data/notes_synthetic.csv`
- Applies weighted rule-based scoring (lexicon `weight`)
- Drops negated matches and halves the weight of matches when history/past cues are present
- Exports `outputs/predictions.csv`
- Evaluates predictions vs. ground-truth `label`


In [None]:
import re
import pandas as pd

# Load the red-flag lexicon and the synthetic notes. Paths are relative to the
# directory the notebook is executed from.
lexicon = pd.read_csv("../data/lexicon_redflags.csv")
notes = pd.read_csv("../data/notes_synthetic.csv")

# Guardrails: fail fast with a readable message when an input file lacks a
# column that the scoring/export code relies on.
required_lex_cols = {"term", "weight"}
missing_lex = required_lex_cols - set(lexicon.columns)
if missing_lex:
    raise ValueError(f"lexicon_redflags.csv missing columns: {sorted(missing_lex)}")

required_notes_cols = {"id", "text", "entity"}
missing_notes = required_notes_cols - set(notes.columns)
if missing_notes:
    raise ValueError(f"notes_synthetic.csv missing columns: {sorted(missing_notes)}")

# Drop rows with a missing term *before* casting to str: astype(str) can turn a
# missing value into the literal string "nan", which would pass the non-empty
# filter below and then substring-match notes such as "pregnancy".
lexicon = lexicon.dropna(subset=["term"]).copy()

# Normalize lexicon: trimmed, lower-case terms; non-numeric weights become 0.
lexicon["term"] = lexicon["term"].astype(str).str.strip().str.lower()
lexicon["weight"] = pd.to_numeric(lexicon["weight"], errors="coerce").fillna(0)

# Keep non-empty terms
lexicon = lexicon.loc[lexicon["term"].astype(bool)].copy()

# Terms with weights. If the same normalized term appears on several rows, the
# weight from the last such row is the one kept by the dict.
term_weights = dict(zip(lexicon["term"].tolist(), lexicon["weight"].tolist()))
terms = list(term_weights.keys())

# --- Context cues (baseline, intentionally simple) ---
# All cues are lower-case; each set lists English cues first, then Spanish ones.

# Words/phrases signalling that a finding is being ruled out.
NEGATION_CUES = {
    # English
    "no",
    "not",
    "denies",
    "deny",
    "denied",
    "without",
    "none",
    "negative for",
    # Spanish
    "sin",
    "niega",
    "negativo",
    "ausencia de",
}

# Words/phrases signalling that a finding belongs to the patient's past.
HISTORY_CUES = {
    # English
    "history of",
    "hx of",
    "h/o",
    "previous",
    "prior",
    "years ago",
    "months ago",
    # Spanish (plus the English stem "antecedent")
    "antecedent",
    "antecedentes",
    "historia",
    "hace años",
    "hace meses",
    "previo",
}

def _tokenize(s: str):
    return re.findall(r"[a-zA-Záéíóúñü]+", str(s).lower())

def _has_history_context(text: str, window_text: str) -> bool:
    """Report whether a history/past cue occurs in the note or in the local window.

    Both arguments are coerced with ``str()`` and lower-cased; each entry of
    ``HISTORY_CUES`` is then looked up as a plain substring.

    Args:
        text: The full note.
        window_text: The region of the note surrounding a matched term.

    Returns:
        True if at least one cue is found in either string, otherwise False.
    """
    haystacks = (str(text).lower(), str(window_text).lower())
    return any(cue in hay for hay in haystacks for cue in HISTORY_CUES)

def _is_negated_near(text: str, term: str, window_tokens: int = 6) -> bool:
    """Return True if every occurrence of ``term`` is preceded by a negation cue.

    For each occurrence of ``term`` in the lower-cased ``text``, the last
    ``window_tokens`` tokens found in the 120 characters to its left are
    checked against ``NEGATION_CUES``. Cues are matched on whole-token
    boundaries (multi-word cues as consecutive tokens), so "no" does not fire
    inside "diagnosis" and "not" does not fire inside "noted".

    A term that is negated in one place but affirmed in another is reported as
    not negated, so an affirmed mention is never discarded because of an
    earlier negated one.

    Args:
        text: The note to inspect.
        term: The lexicon term to look for (compared lower-cased).
        window_tokens: How many tokens to the left of each occurrence to
            inspect. Values <= 0 mean no window, so nothing can be negated.

    Returns:
        True only if ``term`` occurs and all of its occurrences have a
        negation cue inside their left window; False otherwise, including
        when ``term`` is empty or absent.
    """
    s = str(text).lower()
    needle = str(term).lower()
    if not needle or window_tokens <= 0:
        return False

    idx = s.find(needle)
    if idx == -1:
        return False

    while idx != -1:
        # Bounded character window keeps tokenization cheap on long notes.
        left = s[max(0, idx - 120):idx]
        tail = _tokenize(left)[-window_tokens:]

        # Pad with spaces so that `" cue " in padded` is a whole-token test
        # for single-word cues and a consecutive-token test for phrases.
        padded = " " + " ".join(tail) + " "
        if not any(f" {cue} " in padded for cue in NEGATION_CUES):
            # An un-negated mention is enough for the term to count.
            return False

        idx = s.find(needle, idx + len(needle))

    return True

def score_text(text: str):
    """Compute the weighted red-flag score of a note.

    Every lexicon term in ``terms`` that occurs as a substring of the
    lower-cased note contributes its weight from ``term_weights``, unless
    ``_is_negated_near`` reports it as negated, in which case it is skipped.
    A contributing term's weight is halved when ``_has_history_context`` finds
    a history/past cue in the note or in the region spanning 80 characters on
    either side of the term's first occurrence.

    Args:
        text: The note; coerced with ``str()`` before matching.

    Returns:
        A ``(score, matched)`` tuple: ``score`` is the float sum of the applied
        weights and ``matched`` is a list of ``(term, applied_weight)`` pairs
        in lexicon order.
    """
    lowered = str(text).lower()
    total = 0.0
    hits = []

    for term in terms:
        # Guard clauses: skip blank terms, terms that are absent, and negated ones.
        if not term or term not in lowered:
            continue
        if _is_negated_near(lowered, term):
            continue

        weight = float(term_weights.get(term, 0.0))

        # History/past downweight (baseline heuristic): 50% penalty.
        pos = lowered.find(term)
        region_start = max(0, pos - 80)
        region_end = pos + len(term) + 80
        if _has_history_context(lowered, lowered[region_start:region_end]):
            weight *= 0.5

        total += weight
        hits.append((term, weight))

    return total, hits

def predict_label_from_score(score: float) -> str:
    """Map a numeric triage score to a label using the baseline thresholds.

    Thresholds (tunable):
      - "high"          when score >= 8
      - "intermediate"  when 4 <= score < 8
      - "low"           otherwise

    Args:
        score: The weighted score of a note.

    Returns:
        One of "high", "intermediate" or "low".
    """
    # Ordered from the highest floor down; the first floor reached wins.
    floors = ((8, "high"), (4, "intermediate"))
    for floor, label in floors:
        if score >= floor:
            return label
    return "low"

# Score every note, then unpack each (score, matched_terms) pair into columns
scored = notes["text"].apply(score_text)
notes["score"] = scored.map(lambda pair: pair[0])
notes["matched_terms"] = scored.map(lambda pair: pair[1])
notes["predicted_label"] = notes["score"].map(predict_label_from_score)

# Export the predictions; the score column is kept to make debugging easier
export_columns = ["id", "text", "entity", "score", "predicted_label"]
notes[export_columns].to_csv("../outputs/predictions.csv", index=False)

print("Saved outputs/predictions.csv")

# Quick sanity summaries of what was just written
score_summary = notes["score"].describe()
print("\nScore distribution:")
print(score_summary.to_string())

label_counts = notes["predicted_label"].value_counts(dropna=False)
print("\nLabel distribution:")
print(label_counts.to_string())


## Quick evaluation (baseline)

This evaluates the weighted baseline against the synthetic ground truth `label`.

- Overall accuracy
- Confusion matrix (all entities)
- Accuracy by entity
- A few mismatches to inspect


In [None]:
# Guardrail: evaluation needs the ground-truth column, so stop with a clear
# error if it is absent.
if "label" not in notes.columns:
    raise ValueError("notes_synthetic.csv is missing required column: 'label'")

# Row-wise correctness mask, computed once and reused by every metric below.
correct = notes["predicted_label"] == notes["label"]

# Overall accuracy
acc = correct.mean()
print(f"Overall accuracy: {acc:.3f} ({int(correct.sum())}/{len(notes)})")

# Confusion matrix
print("\nConfusion matrix (label x predicted_label):")
cm = pd.crosstab(
    notes["label"],
    notes["predicted_label"],
    rownames=["label"],
    colnames=["predicted_label"],
    dropna=False,
)
print(cm.to_string())

# Accuracy by entity. Grouping the boolean mask directly (instead of calling
# DataFrame.groupby(...).apply on the whole frame) avoids the pandas
# deprecation warning about operating on the grouping columns.
print("\nAccuracy by entity:")
by_entity = correct.groupby(notes["entity"]).mean()
by_entity = by_entity.sort_values(ascending=False)
print(by_entity.to_string())

# Show a few mismatches for inspection
mismatches = notes.loc[
    ~correct,
    ["id", "entity", "text", "label", "predicted_label", "score", "matched_terms"],
]
print("\nSample mismatches (first 15):")
print(mismatches.head(15).to_string(index=False))
