### Claims Normalization & Resubmission Pipeline — Project Overview

We will design and build a robust data engineering pipeline that ingests insurance-claim records from multiple, differently formatted EMR systems and converts them into a single, auditable source of truth. The pipeline will standardize schema and semantics (dates, null handling, casing, and a `source_system` provenance tag), detect and classify denial reasons using deterministic rules augmented by a controlled inferential model for ambiguous cases, and apply clear business rules to identify claims that are safe to resubmit automatically.

The result will be a production-ready workflow that minimizes manual review, preserves a full audit trail, and delivers downstream-ready artifacts: an exported list of resubmission candidates, per-run metrics for monitoring, and an append-only rejection log for exceptions. This solution emphasizes repeatability, traceability, and operational safety so stakeholders can confidently recover eligible revenue while retaining oversight.


### Imports & data ingest

We will load the necessary libraries, read the two EMR source files into dataframes, tag each row with its `source_system`, preview the data, and print counts of missing values to spot data quality issues up front. This confirms the inputs are readable and traceable before we normalize, classify, and generate resubmission candidates.


In [4]:
# Import Libraries
# Standard library
import json
import logging
import os
import re
import shutil
import tempfile
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Dict, Optional

# Third-party
import numpy as np
import pandas as pd
from IPython.display import display

In [6]:
# Read Data
df_alpha = pd.read_csv("data/emr_alpha.csv")
df_beta = pd.read_json("data/emr_beta.json")

In [8]:
# View data
display(df_alpha) 
display(df_beta) 

Unnamed: 0,claim_id,patient_id,procedure_code,denial_reason,submitted_at,status
0,A123,P001,99213,Missing modifier,2025 -07-01,denied
1,A124,P002,99214,Incorrect NPI,2025 -07-10,denied
2,A125,,99215,Authorization expired,2025 -07-05,denied
3,A126,P003,99381,,2025 -07-15,approved
4,A127,P004,99401,Prior auth required,2025 -07-20,denied


Unnamed: 0,id,member,code,error_msg,date,status
0,B987,P010,99213,Incorrect provider type,2025-07-03,denied
1,B988,P011,99214,Missing modifier,2025-07-09,denied
2,B989,P012,99215,,2025-07-10,approved
3,B990,,99401,incorrect procedure,2025-07-01,denied


In [10]:
# Add source_system column
df_alpha["source_system"] = "alpha"
df_beta["source_system"]  = "beta"

# verify source_system columns
display(df_alpha) 
display(df_beta) 

Unnamed: 0,claim_id,patient_id,procedure_code,denial_reason,submitted_at,status,source_system
0,A123,P001,99213,Missing modifier,2025 -07-01,denied,alpha
1,A124,P002,99214,Incorrect NPI,2025 -07-10,denied,alpha
2,A125,,99215,Authorization expired,2025 -07-05,denied,alpha
3,A126,P003,99381,,2025 -07-15,approved,alpha
4,A127,P004,99401,Prior auth required,2025 -07-20,denied,alpha


Unnamed: 0,id,member,code,error_msg,date,status,source_system
0,B987,P010,99213,Incorrect provider type,2025-07-03,denied,beta
1,B988,P011,99214,Missing modifier,2025-07-09,denied,beta
2,B989,P012,99215,,2025-07-10,approved,beta
3,B990,,99401,incorrect procedure,2025-07-01,denied,beta


In [12]:
# Report missing values
print("df_alpha missing values:\n", df_alpha.isna().sum()[lambda x: x > 0], "\n")
print("df_beta  missing values:\n", df_beta.isna().sum()[lambda x: x > 0])

df_alpha missing values:
 patient_id       1
denial_reason    1
dtype: int64 

df_beta  missing values:
 member       1
error_msg    1
dtype: int64
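
The missing-value report above can be paired with a check that the expected columns exist at all. The following is a minimal sketch, not part of the original notebook: the helper name `summarize_input` and the sample frame are hypothetical and only illustrate the idea of failing fast on unreadable inputs.

```python
from typing import Dict, List

import pandas as pd


def summarize_input(df: pd.DataFrame, expected: List[str]) -> Dict[str, object]:
    """Hypothetical helper: list absent columns and non-zero null counts."""
    missing_columns = [c for c in expected if c not in df.columns]
    present = [c for c in expected if c in df.columns]
    null_counts = df[present].isna().sum()
    return {
        "missing_columns": missing_columns,
        "null_counts": {c: int(n) for c, n in null_counts.items() if n > 0},
    }


# Illustrative sample only; it is not one of the real EMR files.
sample = pd.DataFrame({
    "claim_id": ["A123", "A125"],
    "patient_id": ["P001", None],
    "status": ["denied", "denied"],
})
report = summarize_input(sample, ["claim_id", "patient_id", "denial_reason", "status"])
print(report)
```

A report with a non-empty `missing_columns` list could be treated as a hard stop, while null counts are informational and handled later during normalization.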


### Schema unification

We will combine claim records from different EMR sources into a single, consistent table with the same column names and data types so downstream processes can trust the data; this makes reporting and automation reliable, reduces errors from mismatched formats, improves traceability back to the original system, and speeds up resubmission decisions and audits.


In [15]:
# Transform alpha + beta into one unified schema table (with full schema enforcement)

schema = ["claim_id","patient_id","procedure_code","denial_reason","status","submitted_at","source_system"]
beta_map = {"id":"claim_id","member":"patient_id","code":"procedure_code",
            "error_msg":"denial_reason","date":"submitted_at"}

# Normalize alpha and beta column names
df_alpha_norm = df_alpha[schema].copy()
df_beta_norm  = df_beta.rename(columns=beta_map)[schema].copy()

# Concatenate
df_unified = pd.concat([df_alpha_norm, df_beta_norm], ignore_index=True)

# Enforce schema dtypes
df_unified = df_unified.astype({
    "claim_id": "string",
    "patient_id": "string",
    "procedure_code": "string",
    "denial_reason": "string",
    "status": "string",
    "source_system": "string"
})

# Parse submitted_at into datetime (ISO format), invalid -> NaT
df_unified["submitted_at"] = pd.to_datetime(df_unified["submitted_at"], errors="coerce")

print("Unified shape:", df_unified.shape)
print("\nData types after enforcing schema:")
print(df_unified.dtypes)

print("\nPreview of unified table:")
display(df_unified)


Unified shape: (9, 7)

Data types after enforcing schema:
claim_id          string[python]
patient_id        string[python]
procedure_code    string[python]
denial_reason     string[python]
status            string[python]
submitted_at      datetime64[ns]
source_system     string[python]
dtype: object

Preview of unified table:


Unnamed: 0,claim_id,patient_id,procedure_code,denial_reason,status,submitted_at,source_system
0,A123,P001,99213,Missing modifier,denied,2025-07-01,alpha
1,A124,P002,99214,Incorrect NPI,denied,2025-07-10,alpha
2,A125,,99215,Authorization expired,denied,2025-07-05,alpha
3,A126,P003,99381,,approved,2025-07-15,alpha
4,A127,P004,99401,Prior auth required,denied,2025-07-20,alpha
5,B987,P010,99213,Incorrect provider type,denied,2025-07-03,beta
6,B988,P011,99214,Missing modifier,denied,2025-07-09,beta
7,B989,P012,99215,,approved,2025-07-10,beta
8,B990,,99401,incorrect procedure,denied,2025-07-01,beta
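
Selecting `[schema]` after the rename raises a `KeyError` if a source file lacks one of the expected columns. As a hedged alternative, the sketch below uses `reindex(columns=...)`, which instead creates the absent column filled with missing values. The `partial_beta` frame is a made-up extract used only for illustration.

```python
import pandas as pd

schema = ["claim_id", "patient_id", "procedure_code", "denial_reason",
          "status", "submitted_at", "source_system"]
beta_map = {"id": "claim_id", "member": "patient_id", "code": "procedure_code",
            "error_msg": "denial_reason", "date": "submitted_at"}

# Hypothetical beta extract that lacks the `error_msg` column.
partial_beta = pd.DataFrame({
    "id": ["B991"], "member": ["P013"], "code": ["99213"],
    "date": ["2025-07-11"], "status": ["denied"], "source_system": ["beta"],
})

# rename maps source names to the unified names; reindex enforces column
# order and adds any absent schema column as all-missing.
aligned = partial_beta.rename(columns=beta_map).reindex(columns=schema)
print(aligned.columns.tolist())
print(aligned["denial_reason"].isna().all())
```

Whether a silently created empty column is preferable to an explicit error depends on how strict the ingest contract should be; the notebook's `[schema]` selection is the stricter choice.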


### Data normalization 

We will clean and standardize the incoming claim data so it’s consistent and easy to use: fill missing patient IDs with “Unknown”, normalize status values to “approved” or “denied”, mark empty denial reasons explicitly, convert submission dates into real date values, and trim extra whitespace; doing this up front makes resubmission decisions, reporting, and audits accurate and reduces manual work.


In [18]:
# short config
logging.basicConfig(level=logging.INFO)              
logger = logging.getLogger("resubmission_pipeline")  
REFERENCE_DATE = pd.to_datetime("2025-07-30")       
REJECTION_LOG_PATH = "rejection_log.csv"            
USE_HYBRID = True                                   
LLM_CONF_THRESHOLD = 0.80
FALLBACK_TO_HEURISTICS_IF_LOW_CONF = True

In [20]:
#  Normalization step

# treat common sentinel tokens as missing-like
sentinels = {'', ' ', 'none', 'null', 'nan', 'n/a', 'na'}

# ensure df_unified exists
if "df_unified" not in globals():
    raise RuntimeError("df_unified not found — run ingestion before normalization")

# 1) normalize patient_id -> "Unknown" where missing-like
if "patient_id" in df_unified.columns:
    pid = df_unified["patient_id"].astype("string").str.strip()
    pid_lower = pid.str.lower().fillna("")
    mask_pid_missing = pid.isna() | pid_lower.isin(sentinels)
    df_unified.loc[mask_pid_missing, "patient_id"] = "Unknown"
    df_unified["patient_id"] = df_unified["patient_id"].astype("string").str.strip()

# 2) normalize status to 'approved'/'denied'
if "status" in df_unified.columns:
    df_unified["status"] = df_unified["status"].astype("string").str.strip().str.lower().fillna("")
    df_unified["status"] = df_unified["status"].replace({"approve": "approved", "deny": "denied"})

# 3) normalize denial_reason -> pd.NA for missing-like, else strip
if "denial_reason" in df_unified.columns:
    dr = df_unified["denial_reason"].astype("string")
    dr_stripped = dr.str.strip()
    dr_lower = dr_stripped.str.lower().fillna("")
    mask_dr_missing = dr.isna() | dr_lower.isin(sentinels)
    df_unified.loc[mask_dr_missing, "denial_reason"] = pd.NA
    df_unified.loc[~mask_dr_missing, "denial_reason"] = dr_stripped[~mask_dr_missing]

# 4) parse submitted_at to datetime 
if "submitted_at" in df_unified.columns:
    df_unified["submitted_at"] = pd.to_datetime(df_unified["submitted_at"], errors="coerce")

# quick check print
print("Normalization complete — rows:", len(df_unified))
display(df_unified)

Normalization complete — rows: 9


Unnamed: 0,claim_id,patient_id,procedure_code,denial_reason,status,submitted_at,source_system
0,A123,P001,99213,Missing modifier,denied,2025-07-01,alpha
1,A124,P002,99214,Incorrect NPI,denied,2025-07-10,alpha
2,A125,Unknown,99215,Authorization expired,denied,2025-07-05,alpha
3,A126,P003,99381,,approved,2025-07-15,alpha
4,A127,P004,99401,Prior auth required,denied,2025-07-20,alpha
5,B987,P010,99213,Incorrect provider type,denied,2025-07-03,beta
6,B988,P011,99214,Missing modifier,denied,2025-07-09,beta
7,B989,P012,99215,,approved,2025-07-10,beta
8,B990,Unknown,99401,incorrect procedure,denied,2025-07-01,beta
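
The sentinel handling in the normalization cell can be isolated into a small function to see its effect on individual values. This is a minimal sketch under the same sentinel list; the function name `blank_out_sentinels` and the sample values are assumptions for illustration.

```python
import pandas as pd

SENTINELS = {"", "none", "null", "nan", "n/a", "na"}


def blank_out_sentinels(values: pd.Series) -> pd.Series:
    """Hypothetical helper: strip whitespace and turn missing-like tokens into pd.NA."""
    s = values.astype("string").str.strip()
    is_missing = s.isna() | s.str.lower().isin(SENTINELS)
    return s.mask(is_missing, pd.NA)


raw = pd.Series(["  Missing modifier ", "N/A", "", None, "null"])
cleaned = blank_out_sentinels(raw)
print(cleaned.tolist())
```

Only the first value survives as text (with whitespace trimmed); the other four become missing, which is what lets later steps rely on `isna()` instead of string comparisons.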


### Classification, eligibility & outputs 

We will classify denial reasons using trusted rule lists with a safe fallback for unclear cases, cache those results for speed and consistency, and apply clear business rules (status, patient presence, time since submission, and retryable classification) to mark claims that should be automatically retried. We will also write an auditable rejection log, produce an atomic candidates export, and save run-level metrics so operations can recover valid revenue reliably, cut manual work, and show a clear trail for reviewers.
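
To make the time-since-submission rule concrete, here is a minimal sketch that assumes the reference date of 2025-07-30 from the configuration cell and the more-than-7-days threshold used by the eligibility function. Claim `X001` is invented for contrast and does not appear in the data.

```python
import pandas as pd

reference_date = pd.Timestamp("2025-07-30")

claims = pd.DataFrame({
    "claim_id": ["A127", "X001"],  # X001 is a made-up claim
    "submitted_at": pd.to_datetime(["2025-07-20", "2025-07-25"]),
})

# Whole days elapsed between submission and the reference date.
claims["days_since_submitted"] = (reference_date - claims["submitted_at"]).dt.days
# Strictly more than 7 days must have passed.
claims["old_enough"] = claims["days_since_submitted"] > 7
print(claims)
```

A127 was submitted 10 days before the reference date and passes; the invented X001 was submitted 5 days before and would be held back as too recent.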


In [23]:
# resubmission logic, classifier, caching, and helper writers

# heuristic lists
KNOWN_RETRYABLE = ["missing modifier", "incorrect npi", "prior auth required"]
KNOWN_NON_RETRYABLE = ["authorization expired", "incorrect provider type"]
RETRY_KEYWORDS = ["missing", "modifier", "npi", "prior auth", "auth required", "authorization required", "auth"]
NON_RETRY_KEYWORDS = ["expired", "authorization expired", "provider type", "not billable", "not covered"]

# compile word-boundary regex patterns once
def _compile_word_patterns(phrases):
    compiled = {}
    for p in phrases:
        try:
            compiled[p] = re.compile(r"\b" + re.escape(p.lower()) + r"\b")
        except Exception:
            compiled[p] = re.compile(re.escape(p.lower()))
    return compiled

_COMPILED_KNOWN_RETRYABLE = _compile_word_patterns(KNOWN_RETRYABLE)
_COMPILED_KNOWN_NON_RETRYABLE = _compile_word_patterns(KNOWN_NON_RETRYABLE)

def _word_match_precompiled(text: Optional[str], compiled_patterns: Dict[str, re.Pattern]) -> bool:
    if text is None:
        return False
    t = str(text).lower()
    for pat in compiled_patterns.values():
        if pat.search(t):
            return True
    return False

# deterministic mocked LLM
def mocked_llm_infer(text: Optional[str]) -> Dict[str, Any]:
    t = (text or "").strip().lower()
    mapping = {
        "missing modifier": ("known_retryable", 0.98),
        "incorrect npi": ("known_retryable", 0.95),
        "prior auth required": ("known_retryable", 0.97),
        "authorization expired": ("known_non_retryable", 0.99),
        "incorrect provider type": ("known_non_retryable", 0.99),
    }
    if t in mapping:
        return {"label": mapping[t][0], "confidence": mapping[t][1]}
    if any(k in t for k in ["incorrect procedure", "form incomplete", "not billable", "procedure", "form"]):
        return {"label": "ambiguous", "confidence": 0.60}
    if t == "" or t in {"none", "null", "nan"}:
        return {"label": "ambiguous", "confidence": 0.50}
    return {"label": "ambiguous", "confidence": 0.55}

# classifier using heuristics + mocked LLM
def classify_with_strategy(
    reason: Optional[str],
    use_heuristic_for_ambiguous: bool = True,
    use_hybrid: bool = USE_HYBRID,
    llm_conf_threshold: float = LLM_CONF_THRESHOLD,
    fallback_to_heuristics_if_low_conf: bool = FALLBACK_TO_HEURISTICS_IF_LOW_CONF,
    llm_infer_fn: Callable[[Optional[str]], Dict[str, Any]] = mocked_llm_infer,
) -> Dict[str, Any]:
    if reason is None or (isinstance(reason, float) and np.isnan(reason)):
        return {"label": "missing", "confidence": 1.0, "source": "heuristic"}
    text = str(reason).strip()
    if text == "":
        return {"label": "missing", "confidence": 1.0, "source": "heuristic"}
    text_l = text.lower()
    if text_l in {"none", "null", "nan"}:
        return {"label": "missing", "confidence": 1.0, "source": "heuristic_text_normalized"}
    if use_hybrid and use_heuristic_for_ambiguous:
        if _word_match_precompiled(text_l, _COMPILED_KNOWN_NON_RETRYABLE):
            return {"label": "known_non_retryable", "confidence": 0.99, "source": "heuristic"}
        if _word_match_precompiled(text_l, _COMPILED_KNOWN_RETRYABLE):
            return {"label": "known_retryable", "confidence": 0.99, "source": "heuristic"}
        for kw in NON_RETRY_KEYWORDS:
            if re.search(r"\b" + re.escape(kw.lower()) + r"\b", text_l):
                return {"label": "heuristic_non_retryable", "confidence": 0.85, "source": "heuristic"}
        for kw in RETRY_KEYWORDS:
            if re.search(r"\b" + re.escape(kw.lower()) + r"\b", text_l):
                return {"label": "heuristic_retryable", "confidence": 0.85, "source": "heuristic"}
    llm_out = llm_infer_fn(text)
    llm_label = llm_out.get("label", "ambiguous")
    llm_conf = float(llm_out.get("confidence", 0.0))
    if not use_hybrid:
        return {"label": llm_label or "ambiguous", "confidence": llm_conf, "source": "llm"}
    if llm_conf >= llm_conf_threshold:
        return {"label": llm_label or "ambiguous", "confidence": llm_conf, "source": "llm"}
    if fallback_to_heuristics_if_low_conf:
        if any(kw in text_l for kw in RETRY_KEYWORDS):
            return {"label": "heuristic_retryable", "confidence": 0.50, "source": "fallback"}
        if any(kw in text_l for kw in NON_RETRY_KEYWORDS):
            return {"label": "heuristic_non_retryable", "confidence": 0.50, "source": "fallback"}
        return {"label": "ambiguous", "confidence": llm_conf, "source": "llm_low_conf"}
    else:
        return {"label": llm_label or "ambiguous", "confidence": llm_conf, "source": "llm_low_conf"}

# atomic rejection log writer
def _atomic_write_rejection_log(rejects: pd.DataFrame, target_path: str) -> None:
    target = Path(target_path)
    if len(rejects) == 0:
        logger.info("No rejected rows to write to %s", target_path)
        return
    target.parent.mkdir(parents=True, exist_ok=True)
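    # Create the temp file next to the target so os.replace never crosses filesystems,
    # and write a header only when the log does not exist yet (avoids repeated headers on append).
    with tempfile.NamedTemporaryFile(mode="w", delete=False, newline="", suffix=".csv", dir=target.parent) as tmp:
        tmp_path = Path(tmp.name)
        rejects.to_csv(tmp, index=False, header=not target.exists())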
    try:
        if not target.exists():
            os.replace(tmp_path, target)
            logger.info("Wrote %d rejected rows to %s (created)", len(rejects), target_path)
        else:
            with open(target, "ab") as f_target, open(tmp_path, "rb") as f_tmp:
                shutil.copyfileobj(f_tmp, f_target)
            logger.info("Appended %d rejected rows to %s", len(rejects), target_path)
            tmp_path.unlink(missing_ok=True)
    except Exception as e:
        logger.exception("Failed to write rejection log atomically: %s", e)
        try:
            rejects.to_csv(target, mode="a", header=not target.exists(), index=False)
            logger.info("Fallback wrote %d rejected rows to %s", len(rejects), target_path)
        except Exception as e2:
            logger.exception("Fallback write also failed: %s", e2)

# compute eligibility (assumes input normalized)
def compute_resubmission_eligibility(
    df: pd.DataFrame,
    reference_date: Optional[pd.Timestamp] = None,
    write_rejection_log: bool = True,
    rejection_log_path: str = REJECTION_LOG_PATH,
    allow_hybrid: bool = USE_HYBRID,
    llm_conf_threshold: float = LLM_CONF_THRESHOLD,
    llm_infer_fn: Callable[[Optional[str]], Dict[str, Any]] = mocked_llm_infer,
) -> pd.DataFrame:
    # defensive: create missing required columns as pd.NA
    required_cols = {"status", "patient_id", "submitted_at", "denial_reason"}
    missing_cols = required_cols - set(df.columns)
    if missing_cols:
        logger.warning("Missing required columns %s — creating with pd.NA and continuing", missing_cols)
        df = df.copy()
        for c in missing_cols:
            df[c] = pd.NA
    else:
        df = df.copy()

    # normalize submitted_at to datetime
    df["submitted_at"] = pd.to_datetime(df["submitted_at"], errors="coerce")
    if reference_date is None:
        reference_date = REFERENCE_DATE
    else:
        reference_date = pd.to_datetime(reference_date)

    # use normalized denial_reason and cache unique values
    placeholder_missing = "<__MISSING__>"
    map_keys = df["denial_reason"].fillna(placeholder_missing)
    unique_reasons = pd.Index(map_keys.unique())

    cache: Dict[str, Dict[str, Any]] = {}
    for key in unique_reasons:
        if key == placeholder_missing:
            cache[key] = {"label": "missing", "confidence": 1.0, "source": "heuristic"}
        else:
            cache[key] = classify_with_strategy(
                key,
                use_heuristic_for_ambiguous=True,
                use_hybrid=allow_hybrid,
                llm_conf_threshold=llm_conf_threshold,
                fallback_to_heuristics_if_low_conf=FALLBACK_TO_HEURISTICS_IF_LOW_CONF,
                llm_infer_fn=llm_infer_fn,
            )

    # map cached classification results back to dataframe
    df["_denial_classification"] = map_keys.map(lambda k: cache.get(k, {"label": "ambiguous"})["label"])
    df["_denial_confidence"] = map_keys.map(lambda k: float(cache.get(k, {"confidence": 0.0})["confidence"]))
    df["_denial_inference_source"] = map_keys.map(lambda k: cache.get(k, {"source": "unknown"})["source"])

    # compute days since submitted
    df["_days_since_submitted"] = (pd.to_datetime(reference_date) - df["submitted_at"]).dt.days

    # treat 'Unknown' as missing patient_id
    patient_series = df["patient_id"].astype(str).str.strip()
    df["_patient_present"] = df["patient_id"].notna() & patient_series.ne("") & patient_series.str.lower().ne("unknown")

    # eligibility mask
    allowed_classes_for_resubmit = {"known_retryable", "heuristic_retryable"}
    is_denied = df["status"].astype(str).str.strip().str.lower() == "denied"
    days_ok = df["_days_since_submitted"].notna() & (df["_days_since_submitted"] > 7)
    class_allowed = df["_denial_classification"].isin(allowed_classes_for_resubmit)
    df["resubmission_eligible"] = is_denied & df["_patient_present"] & days_ok & class_allowed

    # human-readable reason (precedence)
    reason = pd.Series("ambiguous_classification", index=df.index)
    reason.loc[~is_denied] = "not_denied"
    pid_missing = ~df["_patient_present"]
    days_missing = df["_days_since_submitted"].isna()
    too_recent = df["_days_since_submitted"] <= 7
    is_allowed = df["_denial_classification"].isin(allowed_classes_for_resubmit)
    is_known_non_retry = df["_denial_classification"].isin(["known_non_retryable", "heuristic_non_retryable"])
    is_missing_denial = df["_denial_classification"] == "missing"
    reason.loc[is_denied & pid_missing] = "missing_patient_id"
    reason.loc[is_denied & days_missing] = "missing_submitted_at"
    reason.loc[is_denied & too_recent] = "too_recent"
    reason.loc[is_denied & is_allowed] = df.loc[is_denied & is_allowed, "_denial_classification"]
    reason.loc[is_denied & is_known_non_retry] = "non_retryable"
    reason.loc[is_denied & is_missing_denial] = "missing_denial_reason"
    df["reason"] = reason

    # write rejection log if asked
    rejection_mask = df["_denial_classification"].isin(["ambiguous", "heuristic_non_retryable", "known_non_retryable"])
    rejects = df[rejection_mask]
    if write_rejection_log:
        try:
            _atomic_write_rejection_log(rejects, rejection_log_path)
        except Exception as e:
            logger.exception("Failed to write rejection log: %s", e)

    return df

# helper: recommend_from_reason (minimal, deterministic)
def recommend_from_reason(reason_text: Optional[str], classification: Optional[str] = None) -> str:
    # small deterministic recommender for human-readable suggested action
    if reason_text is None or (isinstance(reason_text, float) and pd.isna(reason_text)):
        if classification in ("known_retryable", "heuristic_retryable"):
            return "Review denial note, correct the issue, and resubmit"
        return "Review denial reason and decide manually"
    t = str(reason_text).strip().lower()
    if "incorrect npi" in t or re.search(r'\bnpi\b', t):
        return "Review NPI number and resubmit"
    if "missing modifier" in t or "modifier" in t:
        return "Add missing modifier and resubmit"
    if "prior auth" in t or "prior authorization" in t or "auth required" in t:
        return "Obtain prior authorization and resubmit"
    if "form incomplete" in t or "incomplete" in t:
        return "Complete missing form fields and resubmit"
    if "incorrect procedure" in t or "incorrect procedure code" in t:
        return "Verify procedure code (CPT/HCPCS) and resubmit"
    if "not billable" in t:
        return "Not typically retryable — review payer rules; consider manual appeal"
    if classification in ("known_retryable", "heuristic_retryable"):
        return "Review denial note, correct the issue, and resubmit"
    return "Manual review recommended"

# helper: write candidates JSON atomically (used by orchestration)
def write_resubmission_candidates(df: pd.DataFrame, out_path: str = "resubmission_candidates.json") -> str:
    candidates = df[df["resubmission_eligible"].astype(bool)].copy()
    out_list = []
    seen = set()
    for _, row in candidates.iterrows():
        cid = None if pd.isna(row.get("claim_id")) else str(row.get("claim_id")).strip()
        if cid and cid in seen:
            continue
        if cid:
            seen.add(cid)
        out_item = {
            "claim_id": cid if cid else None,
            "patient_id": None if pd.isna(row.get("patient_id")) else row.get("patient_id"),
            "submitted_at": None if pd.isna(row.get("submitted_at")) else pd.to_datetime(row.get("submitted_at")).isoformat(),
            "denial_reason": None if pd.isna(row.get("denial_reason")) else row.get("denial_reason"),
            "_denial_classification": row.get("_denial_classification"),
            "_denial_confidence": float(row.get("_denial_confidence")) if pd.notna(row.get("_denial_confidence")) else None,
            "source_system": row.get("source_system") if pd.notna(row.get("source_system")) else "unknown",
            "recommended_changes": recommend_from_reason(row.get("denial_reason"), row.get("_denial_classification"))
        }
        out_list.append(out_item)
    tmp = None
    try:
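        # temp file lives in the output directory so os.replace stays on one filesystem
        with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json", encoding="utf-8",
                                         dir=os.path.dirname(os.path.abspath(out_path))) as fh: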
            tmp = fh.name
            json.dump(out_list, fh, indent=2, ensure_ascii=False)
        os.replace(tmp, out_path)
        logger.info("Wrote %d candidates to %s", len(out_list), out_path)
    except Exception as e:
        if tmp and os.path.exists(tmp):
            os.remove(tmp)
        logger.exception("Failed to write candidates JSON: %s", e)
        raise
    return out_path

# helper: compute and write metrics atomically
def compute_and_write_metrics(df: pd.DataFrame, out_dir: str = ".", write_json: bool = True) -> Dict[str, Any]:
    run_id = str(uuid.uuid4())
    total_claims = int(len(df))
    total_denied = int(df['status'].astype(str).str.lower().eq('denied').sum())
    total_flagged = int(df['resubmission_eligible'].astype(bool).sum())
    total_excluded = int(total_denied - total_flagged)
    counts_by_source = df['source_system'].value_counts(dropna=False).to_dict()
    flagged_by_source = df[df['resubmission_eligible']].groupby('source_system').size()
    flagged_by_source = flagged_by_source.reindex(index=list(counts_by_source.keys()), fill_value=0).to_dict()
    excluded_reasons = df[~df['resubmission_eligible']]['reason'].value_counts(dropna=False).to_dict()
    denial_class_counts = df['_denial_classification'].value_counts(dropna=False).to_dict()
    metrics = {
        "run_id": run_id,
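        # timezone-aware UTC timestamp; datetime.utcnow() is deprecated since Python 3.12
        "timestamp_utc": pd.Timestamp.now(tz="UTC").strftime("%Y-%m-%dT%H:%M:%S.%fZ"),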
        "total_claims": total_claims,
        "total_denied": total_denied,
        "total_flagged_for_resubmission": total_flagged,
        "total_excluded_denied": total_excluded,
        "counts_by_source": counts_by_source,
        "flagged_by_source": flagged_by_source,
        "excluded_reasons": excluded_reasons,
        "denial_classification_counts": denial_class_counts
    }
    if write_json:
        fname = os.path.join(out_dir, f"pipeline_metrics_summary_{run_id}.json")
        tmp = None
        try:
            with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json", dir=out_dir) as fh:
                tmp = fh.name
                json.dump(metrics, fh, indent=2)
            os.replace(tmp, fname)
            logger.info("Wrote metrics JSON: %s flagged=%d", fname, total_flagged)
        except Exception as e:
            if tmp and os.path.exists(tmp):
                os.remove(tmp)
            logger.exception("Failed to write metrics JSON: %s", e)
            raise
    logger.info("metrics: run_id=%s total=%d denied=%d flagged=%d excluded_denied=%d by_source=%s",
                metrics["run_id"], total_claims, total_denied, total_flagged, total_excluded, counts_by_source)
    return metrics
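
The classifier's primary path matches keywords with `\b` word boundaries, while its low-confidence fallback uses plain substring checks. The sketch below illustrates the difference on an invented denial text; `word_match` is a hypothetical stand-in for the notebook's regex logic, not a function from it.

```python
import re


def word_match(phrase: str, text: str) -> bool:
    """Hypothetical helper: True only if `phrase` occurs as a whole word or phrase."""
    pattern = r"\b" + re.escape(phrase.lower()) + r"\b"
    return re.search(pattern, text.lower()) is not None


text = "Author signature missing"  # invented example text

substring_hit = "auth" in text.lower()    # plain substring check
boundary_hit = word_match("auth", text)   # word-boundary check
whole_word_hit = word_match("missing", text)

print(substring_hit, boundary_hit, whole_word_hit)
```

The substring check fires on "auth" inside "Author", while the word-boundary check does not; both approaches agree on the standalone word "missing". This is worth keeping in mind when reading fallback classifications driven by short keywords.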

### Orchestration & output

We will run the full pipeline to enrich the unified claims dataset and produce a single authoritative set of outputs: an atomic resubmission candidates file, a per-run metrics file for monitoring, and an append-only rejection log. By applying the eligibility rules consistently (denied status, valid patient, sufficient time since submission, and retryable classification) and writing artifacts atomically, we ensure downstream systems and auditors get reliable, traceable data that reduces manual review, accelerates recoveries, and supports operational oversight.
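
The atomic-write idea referred to here can be sketched in isolation: write to a temporary file in the destination directory, then swap it into place with `os.replace`, so readers see either the old file or the complete new one. The function name `write_json_atomically` and the sample payload are assumptions for illustration, not the notebook's own writer.

```python
import json
import os
import tempfile


def write_json_atomically(payload, out_path: str) -> str:
    """Hypothetical helper: write JSON to a sibling temp file, then rename over the target."""
    target_dir = os.path.dirname(os.path.abspath(out_path))
    fd, tmp_path = tempfile.mkstemp(suffix=".json", dir=target_dir)
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            json.dump(payload, fh, indent=2)
        os.replace(tmp_path, out_path)  # atomic when source and target share a filesystem
    except Exception:
        # Clean up the partial temp file before propagating the error.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
    return out_path


# Demonstrate a round trip inside a throwaway directory.
with tempfile.TemporaryDirectory() as workdir:
    path = write_json_atomically([{"claim_id": "A123"}],
                                 os.path.join(workdir, "candidates.json"))
    with open(path, encoding="utf-8") as fh:
        round_trip = json.load(fh)

print(round_trip)
```

Creating the temporary file in the same directory as the target matters: a rename across filesystems is not atomic and may fail outright.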


In [26]:
# orchestration — run pipeline, write candidates + metrics, and display preview 

# ensure df_unified exists
if "df_unified" not in globals():
    raise RuntimeError("df_unified not found; run the ingestion, unification, and normalization cells first")

# run compute to enrich df_unified and write rejection log
df_unified = compute_resubmission_eligibility(df_unified, reference_date=REFERENCE_DATE, write_rejection_log=True)

# write candidates JSON  
candidates_path = write_resubmission_candidates(df_unified, out_path="resubmission_candidates.json")

# compute and write metrics JSON
metrics = compute_and_write_metrics(df_unified, out_dir=".", write_json=True)

# canonical eligible mask (single source of truth)
eligible_mask = df_unified["resubmission_eligible"].astype(bool)

# build masks for other sections
ambiguous_mask    = (df_unified["status"].astype(str).str.lower() == "denied") & (df_unified["_denial_classification"].isin(["ambiguous", "heuristic_non_retryable"]))  # needs review
approved_mask     = df_unified["reason"].astype(str).eq("not_denied")  # approved claims
known_non_retry_mask = df_unified["_denial_classification"] == "known_non_retryable"  # non-eligible known

# choose display columns (consistent across sections)
display_cols = ["claim_id","patient_id","submitted_at","_days_since_submitted","denial_reason","_denial_classification","_denial_confidence","resubmission_eligible","reason","source_system"]

# Previews for inspection
print(f"Run complete — candidates: {candidates_path}  |  metrics_run_id: {metrics['run_id']}")
print(f"Total claims: {metrics['total_claims']}  |  Denied: {metrics['total_denied']}  |  Flagged: {metrics['total_flagged_for_resubmission']}")
print("\nDenial classification counts:")
display(df_unified["_denial_classification"].value_counts(dropna=False).rename_axis("classification").reset_index(name="count"))

# print overall resubmission reason counts
print("Resubmission reason counts (overall):")
display(df_unified["reason"].value_counts(dropna=False).rename_axis("reason").reset_index(name="count"))

# Eligible for automated resubmission (canonical)
print(f"\nEligible for automated resubmission: {int(eligible_mask.sum())} rows")
display(df_unified.loc[eligible_mask, display_cols])

# Ambiguous / needs-review
print(f"\nAmbiguous / needs-review: {int(ambiguous_mask.sum())} rows")
display(df_unified.loc[ambiguous_mask, display_cols])

# Approved claims
print(f"\nApproved claims: {int(approved_mask.sum())} rows")
display(df_unified.loc[approved_mask, display_cols])

# ❌ Known Non-Retryable (Non-Eligible for automated resubmission)
print(f"\n❌ Known Non-Retryable (Non-Eligible for automated resubmission): {int(known_non_retry_mask.sum())} rows")
display(df_unified.loc[known_non_retry_mask, display_cols])

INFO:resubmission_pipeline:Wrote 3 rejected rows to rejection_log.csv (created)
INFO:resubmission_pipeline:Wrote 4 candidates to resubmission_candidates.json
INFO:resubmission_pipeline:Wrote metrics JSON: .\pipeline_metrics_summary_40625afb-3fad-4f80-a163-9e64f1b61252.json flagged=4
INFO:resubmission_pipeline:metrics: run_id=40625afb-3fad-4f80-a163-9e64f1b61252 total=9 denied=7 flagged=4 excluded_denied=3 by_source={'alpha': 5, 'beta': 4}


Run complete — candidates: resubmission_candidates.json  |  metrics_run_id: 40625afb-3fad-4f80-a163-9e64f1b61252
Total claims: 9  |  Denied: 7  |  Flagged: 4

Denial classification counts:


Unnamed: 0,classification,count
0,known_retryable,4
1,known_non_retryable,2
2,missing,2
3,ambiguous,1


Resubmission reason counts (overall):


Unnamed: 0,reason,count
0,known_retryable,4
1,non_retryable,2
2,not_denied,2
3,missing_patient_id,1



Eligible for automated resubmission: 4 rows


Unnamed: 0,claim_id,patient_id,submitted_at,_days_since_submitted,denial_reason,_denial_classification,_denial_confidence,resubmission_eligible,reason,source_system
0,A123,P001,2025-07-01,29,Missing modifier,known_retryable,0.99,True,known_retryable,alpha
1,A124,P002,2025-07-10,20,Incorrect NPI,known_retryable,0.99,True,known_retryable,alpha
4,A127,P004,2025-07-20,10,Prior auth required,known_retryable,0.99,True,known_retryable,alpha
6,B988,P011,2025-07-09,21,Missing modifier,known_retryable,0.99,True,known_retryable,beta



Ambiguous / needs-review: 1 rows


Unnamed: 0,claim_id,patient_id,submitted_at,_days_since_submitted,denial_reason,_denial_classification,_denial_confidence,resubmission_eligible,reason,source_system
8,B990,Unknown,2025-07-01,29,incorrect procedure,ambiguous,0.6,False,missing_patient_id,beta



Approved claims: 2 rows


Unnamed: 0,claim_id,patient_id,submitted_at,_days_since_submitted,denial_reason,_denial_classification,_denial_confidence,resubmission_eligible,reason,source_system
3,A126,P003,2025-07-15,15,,missing,1.0,False,not_denied,alpha
7,B989,P012,2025-07-10,20,,missing,1.0,False,not_denied,beta



❌ Known Non-Retryable (Non-Eligible for automated resubmission): 2 rows


Unnamed: 0,claim_id,patient_id,submitted_at,_days_since_submitted,denial_reason,_denial_classification,_denial_confidence,resubmission_eligible,reason,source_system
2,A125,Unknown,2025-07-05,25,Authorization expired,known_non_retryable,0.99,False,non_retryable,alpha
5,B987,P010,2025-07-03,27,Incorrect provider type,known_non_retryable,0.99,False,non_retryable,beta
