##Data transformation file for chicago and midwest enriched datasets

In [4]:
import pandas as pd 
import numpy as np
import math
import csv
import re

In [None]:
def _safe_float(x):
    try: return float(x)
    except: return None

def _round_half_up(x):
    return int(math.floor(x + 0.5))

# overall mortality +- 5 points max to total quality points
def _parse_overall_mortality_blocks(r):
    """
    Returns (label, signed_blocks).
    blocks = round_half_up(percent/20) clamped to [0,5]
    +blocks if 'better', -blocks if 'worse', 0 if missing/unclear.
    """
    direction = (r.get("detail_mortality_overall_direction") or "").strip().lower()
    pct = _safe_float(r.get("detail_mortality_overall_percent"))
    text = (r.get("detail_mortality_overall_text") or "").strip().lower()

    if direction not in ("better", "worse"):
        if "better" in text: direction = "better"
        elif "worse" in text: direction = "worse"

    if pct is None:
        m = re.search(r"(\d+(?:\.\d+)?)\s*%", text)
        if m: pct = _safe_float(m.group(1))

    if direction in ("better", "worse") and pct is not None:
        blocks = _round_half_up(pct / 20.0)
        blocks = max(0, min(5, blocks))
        signed = blocks if direction == "better" else -blocks
        pct_label = _round_half_up(pct)
        return f"{pct_label}% {direction} (±{blocks})", signed
    return "mortality not used", 0

# Add quality points function, append quality points column and also related vairables column
def add_quality_points(in_path: str, out_path: str) -> None:
    """
    Adds to CSV:
      - ed_minutes_rating  (10..1 / 'wait time rating not available' / 'points not available')
      - detail_overall_patient_rating_points ('10'..'4' or 'patient rating not available')
      - mortality_overall_contribution (label)
      - total_quality_points (int) = ED + Patient ± OverallMortalityBlocks
    ED rating rules:
      • Baseline ED minutes = row 2 if valid; else min valid ED in file.
      • 10 points at baseline; -1 point per +30 minutes.
      • If ED minutes or wait_minutes are 0/blank → label 'wait time rating not available' and impute mean ED rating into total.
    Patient rating:
      • very good=10, good=9, above average=8, average=7, below average=6, poor=5, very poor=4.
      • Blank/unrecognized → label 'patient rating not available', impute mean into total.
    Overall mortality:
      • From overall direction/percent or text; converts to ±blocks of 20% (max ±5). Missing → 0.
    """
    with open(in_path, newline="", encoding="utf-8") as f:
        rows = list(csv.DictReader(f))

    # minimum ED minutes, assume it is in row 2
    baseline = _safe_float(rows[1].get("detail_avg_time_in_ed_minutes")) if len(rows) >= 2 else None
    if baseline is None or baseline <= 0:
        candidates = []
        for rr in rows:
            ed = _safe_float(rr.get("detail_avg_time_in_ed_minutes"))
            wt = _safe_float(rr.get("wait_minutes"))
            if ed is not None and ed > 0 and not (wt == 0):
                candidates.append(ed)
        baseline = min(candidates) if candidates else 0.0

    pr_map = {
        "very good": 10, "good": 9, "above average": 8,
        "average": 7, "below average": 6, "poor": 5, "very poor": 4
    }

    ed_valid, patient_valid = [], []
    prelim = []
    for r in rows:
        # ED rating (10 at baseline; -1 per +30 mins)
        ed_minutes = _safe_float(r.get("detail_avg_time_in_ed_minutes"))
        wait_minutes = _safe_float(r.get("wait_minutes"))
        if (ed_minutes is None) or (wait_minutes is None) or (ed_minutes == 0) or (wait_minutes == 0):
            ed_label, ed_points = "wait time rating not available", None
        else:
            delta = max(0.0, ed_minutes - baseline)
            pts = 10 - int(delta // 30)
            if pts > 0:
                ed_label, ed_points = str(pts), pts
                ed_valid.append(pts)
            else:
                ed_label, ed_points = "points not available", 0

        # Patient rating
        pr_text = (r.get("detail_overall_patient_rating") or "").strip().lower()
        if pr_text in pr_map:
            pr_points = pr_map[pr_text]
            pr_label = str(pr_points)
            patient_valid.append(pr_points)
        else:
            pr_label, pr_points = "patient rating not available", None

        # Overall mortality ±blocks
        mort_label, mort_signed = _parse_overall_mortality_blocks(r)

        prelim.append((r, ed_label, ed_points, pr_label, pr_points, mort_label, mort_signed))

    ed_mean = _round_half_up(sum(ed_valid) / len(ed_valid)) if ed_valid else 0
    patient_mean = _round_half_up(sum(patient_valid) / len(patient_valid)) if patient_valid else 0

    fieldnames = list(rows[0].keys()) if rows else []
    for c in ["ed_minutes_rating",
              "detail_overall_patient_rating_points",
              "mortality_overall_contribution",
              "total_quality_points"]:
        if c not in fieldnames: fieldnames.append(c)

    out_rows = []
    for r, ed_label, ed_points, pr_label, pr_points, mort_label, mort_signed in prelim:
        if ed_points is None: ed_points = ed_mean
        if pr_points is None: pr_points = patient_mean
        total = int(ed_points) + int(pr_points) + int(mort_signed)
        r["ed_minutes_rating"] = ed_label
        r["detail_overall_patient_rating_points"] = pr_label
        r["mortality_overall_contribution"] = mort_label
        r["total_quality_points"] = str(total)
        out_rows.append(r)

    with open(out_path, "w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=fieldnames)
        w.writeheader(); w.writerows(out_rows)

# ---------- COMPLAINT-ADJUSTED (per-request) ----------
# map complaint→most relevant condition-specific mortality column(s)
_COMPLAINT_TO_COL = {
    "chest pain": ["detail_mortality_heart_attack_percent"],
    "heart attack": ["detail_mortality_heart_attack_percent", "detail_mortality_overall_percent"],
    "slurred speech": ["detail_mortality_stroke_percent"],
    "facial droop": ["detail_mortality_stroke_percent"],
    "stroke": ["detail_mortality_stroke_percent"],
    "shortness of breath": ["detail_mortality_heart_failure_percent", "detail_mortality_pneumonia_percent"],
    "trouble breathing": ["detail_mortality_heart_failure_percent", "detail_mortality_pneumonia_percent"],
    "cough": ["detail_mortality_pneumonia_percent"],
    "fever": ["detail_mortality_pneumonia_percent"],
    "default": ["detail_mortality_overall_percent"],
}

def _pick_mort_col(complaint: str):
    c = (complaint or "").strip().lower()
    for k, cols in _COMPLAINT_TO_COL.items():
        if k != "default" and k in c:
            return cols
    return _COMPLAINT_TO_COL["default"]

def _mortality_points_0to5(pct: float | None) -> int | None:
    """Lower mortality is better. 0–20%→5, 20–40%→4, 40–60%→3, 60–80%→2, 80–100%→1. Missing→None."""
    if pct is None: return None
    if pct < 0: pct = 0.0
    if pct > 100: pct = 100.0
    band = int(pct // 20)       # 0..5
    pts = max(0, 5 - band)      # 5..0
    return pts

def complaint_points_for_row(row: dict, complaint: str) -> tuple[int, str]:
    """
    For the given row and complaint, return (points_0to5, explain_string).
    Missing mortality for all candidate columns → (0, 'complaint mortality not used').
    """
    for col in _pick_mort_col(complaint):
        pct = _safe_float(row.get(col))
        pts = _mortality_points_0to5(pct)
        if pts is None:
            continue
        # valid mortality; 0 points if very high, but still 'used'
        return pts, f"{col}→{(f'{pct:.1f}%' if pct is not None else 'NA')} ⇒ +{pts}"
    return 0, "complaint mortality not used"

def rank_with_complaint(enriched_path: str, complaint: str, top_k: int = 10) -> list[dict]:
    """
    Reads the enriched CSV (after add_quality_points) and returns complaint-adjusted totals
    WITHOUT modifying the CSV: complaint_total = base total_quality_points + complaint_mortality_points (0..5).
    """
    with open(enriched_path, newline="", encoding="utf-8") as f:
        rows = list(csv.DictReader(f))

    out = []
    for r in rows:
        base_total = _safe_float(r.get("total_quality_points")) or 0.0
        cmp_pts, cmp_explain = complaint_points_for_row(r, complaint)
        adj_total = int(base_total) + int(cmp_pts)
        out.append({
            "name": r.get("name",""),
            "city": r.get("city",""),
            "complaint": complaint,
            "complaint_mortality_points": cmp_pts,
            "complaint_mortality_explain": cmp_explain,
            "complaint_total_quality_points": adj_total
        })

    out.sort(key=lambda x: x["complaint_total_quality_points"], reverse=True)
    return out[:max(1, top_k)]



In [10]:
#Call and create a new transformed staging_chicago_enriched.csv called chicago_er_tranformed.csv
add_quality_points(
    in_path="staging_chicago_enriched.csv",
    out_path="chicago_er_transformed.csv"
)

In [11]:
#Call and create a new tranformed midwest_hospitals_enriched.csv called midwest_er_transformed.csv
add_quality_points(
    in_path="midwest_hospitals_enriched.csv",
    out_path="midwest_er_transformed.csv"
)