## Data transformation file for Chicago and US enriched datasets

In [35]:
import pandas as pd 
import numpy as np
import math
import csv
import re
from collections import defaultdict, Counter

In [36]:
# Collect the distinct state codes present in the US hospital dataset.
# The resulting `states` set is reused later to filter the CMS outpatient file.
file_path = "us_hospitals_data_enriched.csv"
states = set()

with open(file_path, newline='', encoding='utf-8') as csvfile:
    reader = csv.DictReader(csvfile)
    for row in reader:
        # DictReader fills missing trailing fields with None, so guard before .strip().
        # Upper-case so membership tests against upper-cased CMS codes line up.
        state = (row.get('detail_state') or '').strip().upper()
        if state:
            states.add(state)

print("Unique states found in dataset:")
for s in sorted(states):
    print(s)

Unique states found in dataset:
AK
AL
AR
AZ
CA
CO
CT
DE
FL
GA
HI
IA
ID
IL
IN
KS
KY
LA
MA
MD
ME
MI
MN
MO
MS
MT
NC
ND
NE
NH
NJ
NM
NV
NY
OH
OK
OR
PA
RI
SC
SD
TN
TX
UT
VA
VT
WA
WI
WV
WY


Clean CMS data

In [37]:
# Keep only CMS rows whose provider state is one of the states found in the US hospital dataset.
# Rows with an empty APC_DESC are NOT removed by this function.
def extract_clean_rows(input_path: str, output_path: str = "cleaned_medicare_op.csv", valid_states=None):
    """
    Copy the rows of ``input_path`` whose ``Rndrng_Prvdr_State_Abrvtn`` is an allowed state.

    Parameters
    ----------
    input_path : str
        CSV file to filter; must have a header row.
    output_path : str
        Destination CSV (same columns as the input). Defaults to
        ``cleaned_medicare_op.csv`` in the current directory.
    valid_states : iterable of str, optional
        Allowed state codes (compared case-insensitively). When omitted, the
        notebook-level ``states`` set built from the US hospital dataset is used.

    Raises
    ------
    RuntimeError
        If the input file has no header row. The output file is not touched in that case.
    """
    allowed_source = states if valid_states is None else valid_states
    allowed = {(s or "").strip().upper() for s in allowed_source}
    allowed.discard("")  # a blank state must never count as a match

    with open(input_path, newline="", encoding="utf-8") as fin:
        reader = csv.DictReader(fin)
        # Check the header before opening (and truncating) the output file.
        if not reader.fieldnames:
            raise RuntimeError(f"{input_path} has no header")

        with open(output_path, "w", newline="", encoding="utf-8") as fout:
            writer = csv.DictWriter(fout, fieldnames=reader.fieldnames)
            writer.writeheader()

            for row in reader:
                state = (row.get("Rndrng_Prvdr_State_Abrvtn") or "").strip().upper()
                if state in allowed:
                    writer.writerow(row)

    print(f"Saved -only rows to {output_path}")


Count how many times each procedure appears for each hospital, then save each hospital's most frequent procedure(s) to a new CSV together with the hospital's name, address, city, state, and ZIP.


In [38]:
def hospital_top_procedures(
    input_path="cleaned_medicare_op.csv",
    output_path="hospital_top_procedures.csv",
    max_listed=3,
):
    """
    For each hospital, find the most frequent APC_DESC procedure(s).

    Hospital identity columns (1-based in the source): 2=name, 3=state, 4=city, 5=address, 7=zip.
    Output columns (reordered): name, address, city, state, zip, Top_Procedures.

    A procedure's frequency is the number of input rows for that hospital carrying
    the same APC_DESC (compared case-insensitively). Procedures tied for the highest
    frequency are sorted alphabetically, truncated to ``max_listed`` entries and
    joined with " | ". A hospital whose rows all have an empty APC_DESC is still
    written, with "data not available".

    NOTE(review): if the source holds one row per hospital/procedure pair, every
    count is 1 and the result is simply the alphabetically first procedures —
    confirm whether a service-volume column should weight the count instead.

    Parameters
    ----------
    input_path : str
        CSV produced by the cleaning step; must contain an APC_DESC column.
    output_path : str
        Destination CSV.
    max_listed : int or None
        Maximum number of tied procedures to list (None lists all of them).

    Raises
    ------
    RuntimeError
        If the input has no APC_DESC column (including an empty file).
    """
    # 1-based -> 0-based indexes for hospital identity columns in the source file
    NAME_IDX   = 1
    STATE_IDX  = 2
    CITY_IDX   = 3
    ADDR_IDX   = 4
    ZIP_IDX    = 6
    identity_cols = (NAME_IDX, STATE_IDX, CITY_IDX, ADDR_IDX, ZIP_IDX)

    proc_counts = defaultdict(Counter)
    first_seen_text = defaultdict(dict)  # hospital_key -> {proc_norm: original_text}

    # Single pass: read the header, locate APC_DESC, then tally the data rows.
    with open(input_path, newline="", encoding="utf-8") as fin:
        reader = csv.reader(fin)
        header = next(reader, [])
        apc_idx = next(
            (i for i, h in enumerate(header) if h.strip().lower() == "apc_desc"),
            None,
        )
        if apc_idx is None:
            raise RuntimeError("Could not find 'APC_DESC' column in the input file.")

        min_len = max(apc_idx, ZIP_IDX) + 1
        for row in reader:
            if len(row) < min_len:
                continue  # malformed/short row

            hospital_key = tuple(row[i].strip() for i in identity_cols)
            # Touch the counter first so a hospital with no usable APC_DESC is
            # still recorded and reaches the "data not available" branch below.
            counter = proc_counts[hospital_key]

            apc_raw = row[apc_idx].strip()
            if not apc_raw:
                continue

            apc_norm = apc_raw.lower()
            counter[apc_norm] += 1
            # Remember the first spelling seen so output keeps the original casing.
            first_seen_text[hospital_key].setdefault(apc_norm, apc_raw)

    with open(output_path, "w", newline="", encoding="utf-8") as fout:
        writer = csv.writer(fout)
        writer.writerow(["Hospital_Name", "Address", "City", "State", "ZIP", "Top_Procedures"])

        for hospital_key, counter in proc_counts.items():
            name, state, city, addr, zip_ = hospital_key

            if not counter:
                top_str = "data not available"
            else:
                max_freq = max(counter.values())
                spellings = first_seen_text[hospital_key]
                tied_sorted = sorted(
                    (spellings[p] for p, c in counter.items() if c == max_freq),
                    key=str.lower,
                )[:max_listed]
                top_str = " | ".join(tied_sorted)

            # Reordered output: name, address, city, state, zip
            writer.writerow([name, addr, city, state, zip_, top_str])

    print(f" Wrote per-hospital top procedures to {output_path}")


Append top procedures to a final working csv

In [39]:


#  Normalizers
# Street-suffix lookup: both the long spelling and the abbreviation map to the
# abbreviation, so normalizing an already-short suffix is a no-op.
SUFFIX_MAP = {
    spelling: abbreviation
    for long_form, abbreviation in (
        ("STREET", "ST"),
        ("ROAD", "RD"),
        ("AVENUE", "AVE"),
        ("BOULEVARD", "BLVD"),
        ("DRIVE", "DR"),
        ("COURT", "CT"),
        ("LANE", "LN"),
        ("HIGHWAY", "HWY"),
        ("PARKWAY", "PKWY"),
    )
    for spelling in (long_form, abbreviation)
}

# Generic institution words removed from hospital names before the loosest name match.
NAME_STRIP_WORDS = set([
    "HOSPITAL", "HOSP",
    "MEDICAL", "MED",
    "CENTER", "CTR",
    "HEALTH", "HLTH",
    "SYSTEM", "SYS",
    "CLINIC",
    "LLC", "INC", "LTD",
])

def _norm_space_upper(s: str) -> str:
    s = (s or "").strip().upper()
    s = re.sub(r"\s+", " ", s)
    return s

def _norm_zip5(s: str) -> str:
    s = (s or "").strip()
    digits = re.sub(r"\D", "", s)
    if len(digits) >= 5:
        return digits[:5]
    return digits.zfill(5) if digits else ""

def _norm_state(s: str) -> str:
    """Extract a two-letter state code from ``s``.

    The value is first normalized with ``_norm_space_upper``. The first standalone
    two-letter token is returned; when there is none, the first two characters of
    the normalized text are returned instead (so a spelled-out name is truncated,
    not looked up).
    """
    cleaned = _norm_space_upper(s)
    found = re.search(r"\b([A-Z]{2})\b", cleaned)
    if found is None:
        return cleaned[:2]
    return found.group(1)

def _simplify_name(name: str) -> str:
    """Return a stripped-down hospital name for loose matching.

    The name is upper-cased, punctuation becomes spaces, whitespace is collapsed,
    and every token listed in ``NAME_STRIP_WORDS`` is removed.
    """
    text = _norm_space_upper(name)
    text = re.sub(r"[^\w\s]", " ", text)   # punctuation -> space
    text = re.sub(r"\s+", " ", text)       # collapse the gaps that leaves
    kept = filter(lambda token: token not in NAME_STRIP_WORDS, text.split())
    return " ".join(kept)

def _norm_address(addr: str) -> str:
    """Normalize a street address for matching.

    Steps: upper-case and collapse whitespace; drop secondary-unit fragments
    ("APT 4", "UNIT 2B", "STE. 200", "SUITE 10", "#200"); replace punctuation with
    spaces; then map the final token through ``SUFFIX_MAP`` so "STREET"/"ST" etc.
    compare equal.

    Returns the normalized address, or "" for a falsy input.
    """
    t = _norm_space_upper(addr)
    # Drop suite/unit/ste/apt/# fragments.
    # - The keyword must be a whole word (or be directly followed by a digit, as in
    #   "STE200"), so names such as STEVENS, UNITED or APTOS are left intact.
    # - "#" is handled on its own branch: a \b in front of "#" cannot match after a
    #   space because neither character is a word character.
    t = re.sub(r"(?:\b(?:APT|UNIT|STE|SUITE)(?:\b\.?\s*|(?=\d))|#\s*)\w+\b", " ", t)
    # remove punctuation
    t = re.sub(r"[^\w\s]", " ", t)
    parts = t.split()
    if parts:
        # Standardize the street suffix when it is the last token.
        parts[-1] = SUFFIX_MAP.get(parts[-1], parts[-1])
    return " ".join(parts)

def _key_full(name, addr, city, state, zip5):
    """Strictest match key: tag "F" plus normalized name, address, city, state and 5-digit ZIP."""
    norm_name = _norm_space_upper(name)
    norm_addr = _norm_address(addr)
    norm_city = _norm_space_upper(city)
    norm_state = _norm_state(state)
    norm_zip = _norm_zip5(zip5)
    return ("F", norm_name, norm_addr, norm_city, norm_state, norm_zip)

def _key_name_zip(name, zip5):
    """Match key tagged "NZ": normalized name plus 5-digit ZIP."""
    norm_name = _norm_space_upper(name)
    norm_zip = _norm_zip5(zip5)
    return ("NZ", norm_name, norm_zip)

def _key_name_city_state(name, city, state):
    """Match key tagged "NCS": normalized name, city and two-letter state."""
    norm_name = _norm_space_upper(name)
    norm_city = _norm_space_upper(city)
    norm_state = _norm_state(state)
    return ("NCS", norm_name, norm_city, norm_state)

def _key_addr_zip(addr, zip5):
    """Match key tagged "AZ": normalized street address plus 5-digit ZIP."""
    norm_addr = _norm_address(addr)
    norm_zip = _norm_zip5(zip5)
    return ("AZ", norm_addr, norm_zip)

def _key_simplename_zip(name, zip5):
    """Loosest match key, tagged "SNZ": name with institution words removed plus 5-digit ZIP."""
    simple_name = _simplify_name(name)
    norm_zip = _norm_zip5(zip5)
    return ("SNZ", simple_name, norm_zip)

# --- Joiner ---
def append_top_procedures(
    hosp_proc_path="hospital_top_procedures.csv",
    er_path="us_hospitals_data_enriched.csv",
    output_path="US_er_final.csv"
):
    """
    Copy the ER hospital file to ``output_path`` with a ``Top_Procedures`` column added.

    Every hospital in ``hosp_proc_path`` is indexed under five match keys (full
    identity, name+ZIP, name+city+state, address+ZIP, simplified-name+ZIP). Each ER
    row is then looked up with the same keys, strictest first; the first hit
    supplies the procedures, otherwise "data not available" is written.

    A key is used only when every one of its components is non-empty, so rows with
    blank fields cannot match each other on an empty key.

    Parameters
    ----------
    hosp_proc_path : str
        CSV with name, address, city, state, ZIP and procedures columns.
    er_path : str
        ER hospital CSV; must contain hospital_name, detail_address, detail_city,
        detail_state and detail_zip.
    output_path : str
        Destination CSV.

    Raises
    ------
    RuntimeError
        If either input lacks a required column, or the ER file has no header.
    """
    def _candidate_keys(name, addr, city, state, zip5):
        # Strictest to loosest; the same order is used for indexing and lookup.
        return [
            _key_full(name, addr, city, state, zip5),
            _key_name_zip(name, zip5),
            _key_name_city_state(name, city, state),
            _key_addr_zip(addr, zip5),
            _key_simplename_zip(name, zip5),
        ]

    def _usable(key):
        # key[0] is the scheme tag; a key tuple is always truthy, so test the
        # components themselves.
        return all(key[1:])

    # Build multi-index map from hospital_top_procedures.csv
    # Expected columns there: Hospital_Name, Address, City, State, ZIP, Top_Procedures
    maps = {}  # dict of key -> procedures

    with open(hosp_proc_path, newline="", encoding="utf-8") as f:
        r = csv.DictReader(f)
        fn = r.fieldnames or []

        def need(cands):
            # First candidate column name present in the header, else None.
            return next((c for c in cands if c in fn), None)

        name_c = need(["Hospital_Name","Name","Hospital"])
        addr_c = need(["Address","Street","Addr"])
        city_c = need(["City","Town"])
        state_c = need(["State"])
        zip_c = need(["ZIP","Zip","PostalCode"])
        proc_c = need(["Top_Procedures","Top Procedures","Procedures"])

        if not all([name_c, addr_c, city_c, state_c, zip_c, proc_c]):
            raise RuntimeError(f"{hosp_proc_path} missing required columns")

        for row in r:
            procs = (row.get(proc_c) or "").strip() or "data not available"
            keys = _candidate_keys(
                row.get(name_c, ""),
                row.get(addr_c, ""),
                row.get(city_c, ""),
                row.get(state_c, ""),
                row.get(zip_c, ""),
            )
            for k in keys:
                if _usable(k):
                    maps[k] = procs  # a later hospital with the same key overwrites

    # Read ER file, append Top_Procedures
    with open(er_path, newline="", encoding="utf-8") as fin, \
         open(output_path, "w", newline="", encoding="utf-8") as fout:
        r = csv.DictReader(fin)
        if not r.fieldnames:
            raise RuntimeError(f"{er_path} has no header")

        name_c = "hospital_name"
        addr_c = "detail_address"
        city_c = "detail_city"
        state_c = "detail_state"
        zip_c = "detail_zip"
        for col in [name_c, addr_c, city_c, state_c, zip_c]:
            if col not in r.fieldnames:
                raise RuntimeError(f"Missing required column '{col}' in {er_path}")

        out_col = "Top_Procedures"
        fieldnames = list(r.fieldnames)
        if out_col not in fieldnames:
            fieldnames.append(out_col)

        w = csv.DictWriter(fout, fieldnames=fieldnames)
        w.writeheader()

        for row in r:
            keys = _candidate_keys(
                row.get(name_c, ""),
                row.get(addr_c, ""),
                row.get(city_c, ""),
                row.get(state_c, ""),
                row.get(zip_c, ""),
            )
            # Unusable keys were never indexed, so they simply miss here.
            row[out_col] = next((maps[k] for k in keys if k in maps), "data not available")
            w.writerow(row)

    print(f"Appended Top_Procedures to {output_path}")

In [40]:
def _safe_float(x):
    try:
        return float(x)
    except:
        return None

def _round_half_up(x):
    return int(math.floor(x + 0.5))

def _clamp(v, lo, hi):
    return max(lo, min(hi, v))

# 5% bands: lower mortality is better
def _adj_from_mortality_5pct_bands(pct) -> int:
    """Turn a mortality percentage into a point adjustment.

    Unparseable input gives 0. Negative values are treated as 0. Below 5% earns
    +1; from 5% upward the result is minus the number of 5-point bands above 5%,
    rounded up (exactly 5% therefore gives 0).
    """
    value = _safe_float(pct)
    if value is None:
        return 0
    value = max(0.0, value)
    if value < 5:
        return 1
    return -math.ceil((value - 5.0) / 5.0)

# parse overall mortality into signed 20%-blocks (± up to 5)
_PCT_RE = re.compile(r"(\d+(?:\.\d+)?)\s*%")
def _parse_overall_mortality_blocks(r):
    """Derive the overall-mortality contribution for one row.

    Direction and percent come from their dedicated columns; when either is
    unusable it is recovered from the free-text column ("better"/"worse" keyword,
    first "<number>%" occurrence). The percent is converted to 20-point blocks,
    rounded half up and limited to 0..5, positive for "better" and negative for
    "worse".

    Returns a ``(label, signed_blocks)`` pair, or ``("mortality not used", 0)``
    when direction or percent cannot be determined.
    """
    directions = ("better", "worse")

    direction = (r.get("detail_mortality_overall_direction") or "").strip().lower()
    pct = _safe_float(r.get("detail_mortality_overall_percent"))
    text = (r.get("detail_mortality_overall_text") or "").strip().lower()

    # Fall back to the free text, checking "better" before "worse".
    if direction not in directions:
        for word in directions:
            if word in text:
                direction = word
                break

    if pct is None:
        found = _PCT_RE.search(text)
        if found is not None:
            pct = _safe_float(found.group(1))

    if pct is None or direction not in directions:
        return "mortality not used", 0

    blocks = _clamp(_round_half_up(pct / 20.0), 0, 5)
    signed = -blocks if direction == "worse" else blocks
    label = f"{_round_half_up(pct)}% {direction} (±{blocks})"
    return label, signed

def _ed_points_label(baseline, ed_minutes, wait_minutes):
    """Score time spent in the ED against ``baseline`` minutes.

    Starts at 10 points and loses one for every full 30 minutes above the
    baseline. ``wait_minutes`` is only used to decide whether the row has usable
    data. Returns ``(label, points)``: points is None when either reading is
    missing or zero, and 0 when the score would drop to zero or below.
    """
    ed = _safe_float(ed_minutes)
    wt = _safe_float(wait_minutes)
    if any(reading is None or reading == 0 for reading in (ed, wt)):
        return "wait time rating not available", None

    minutes_over = max(0.0, ed - baseline)
    pts = 10 - int(minutes_over // 30)
    if pts <= 0:
        return "points not available", 0
    return str(pts), pts

_PR_MAP = {
    "very good": 10, "good": 9, "above average": 8,
    "average": 7, "below average": 6, "poor": 5, "very poor": 4
}
def _patient_points_label(text):
    t = (text or "").strip().lower()
    if t in _PR_MAP:
        pts = _PR_MAP[t]
        return str(pts), pts
    return "patient rating not available", None

def _fmt_adj_or_same(base_total, delta):
    return (
        str(base_total + delta)
        if delta != 0
        else f"no data for specific chief complaint, quality point remains at {base_total}"
    )

# ---------- main enrichment ----------
def add_quality_points(in_path: str, out_path: str) -> None:
    """
    Score every hospital row in ``in_path`` and write the enriched rows to ``out_path``.

    For each row the base total is ED points + patient-rating points + signed
    overall-mortality blocks. A row with no ED or patient score uses the rounded
    file-wide mean of the valid scores instead. Four ``adj_total_*`` columns show
    the base total shifted by the mortality adjustment for heart attack, stroke,
    heart failure and pneumonia (each applied on its own, not cumulatively).

    Legacy score columns present in the input are dropped from the output.

    Parameters
    ----------
    in_path : str
        Source CSV.
    out_path : str
        Destination CSV (input columns minus legacy ones, plus the new score columns).
    """
    ed_col = "detail_avg_time_in_ed_minutes"

    with open(in_path, newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        rows = list(reader)
        # Take the header from the reader so it survives a file with no data rows.
        fieldnames = list(reader.fieldnames or [])

    # baseline ED minutes: prefer row 2 if valid; else min valid across file
    # NOTE(review): rows[1] is the second *data* row (the header is not in `rows`)
    # — confirm that is the intended baseline hospital.
    base2 = _safe_float(rows[1].get(ed_col)) if len(rows) >= 2 else None
    if base2 is not None and base2 > 0:
        baseline = base2
    else:
        candidates = []
        for rr in rows:
            ed_val = _safe_float(rr.get(ed_col))
            wait_val = _safe_float(rr.get("wait_minutes"))
            if ed_val not in (None, 0) and wait_val not in (None, 0):
                candidates.append(ed_val)
        baseline = min(candidates) if candidates else 0.0

    # First pass: per-row scores, collecting the valid ones for the fallback means.
    prelim, ed_valid, patient_valid = [], [], []
    for r in rows:
        ed_label, ed_pts = _ed_points_label(baseline, r.get(ed_col), r.get("wait_minutes"))
        if isinstance(ed_pts, int):
            ed_valid.append(ed_pts)

        pr_label, pr_pts = _patient_points_label(r.get("detail_overall_patient_rating"))
        if isinstance(pr_pts, int):
            patient_valid.append(pr_pts)

        mort_label, mort_signed = _parse_overall_mortality_blocks(r)
        prelim.append((r, ed_label, ed_pts, pr_label, pr_pts, mort_label, mort_signed))

    ed_mean = _round_half_up(sum(ed_valid) / len(ed_valid)) if ed_valid else 0
    pr_mean = _round_half_up(sum(patient_valid) / len(patient_valid)) if patient_valid else 0

    # headers (drop legacy; add new)
    legacy_cols = (
        "adjusted_qp_heartattack",
        "adjusted_qp_stroke",
        "adjusted_qp_heartfailure",
        "adjusted_qp_pneu",
        "base_total_quality_points",
    )
    fieldnames = [c for c in fieldnames if c not in legacy_cols]
    for c in (
        "ed_minutes_rating",
        "detail_overall_patient_rating_points",
        "mortality_overall_contribution",
        "total_quality_points",
        "adj_total_heartattack",
        "adj_total_stroke",
        "adj_total_heartfailure",
        "adj_total_pneu",
    ):
        if c not in fieldnames:
            fieldnames.append(c)

    # Output suffix -> source mortality column for the per-condition totals.
    condition_cols = {
        "heartattack": "detail_mortality_heart_attack_percent",
        "stroke": "detail_mortality_stroke_percent",
        "heartfailure": "detail_mortality_heart_failure_percent",
        "pneu": "detail_mortality_pneumonia_percent",
    }

    out_rows = []
    for r, ed_label, ed_pts, pr_label, pr_pts, mort_label, mort_signed in prelim:
        ed_pts = ed_mean if ed_pts is None else ed_pts
        pr_pts = pr_mean if pr_pts is None else pr_pts

        base_total = int(ed_pts) + int(pr_pts) + int(mort_signed)
        r["ed_minutes_rating"] = ed_label
        r["detail_overall_patient_rating_points"] = pr_label
        r["mortality_overall_contribution"] = mort_label
        r["total_quality_points"] = str(base_total)

        # per-condition deltas (non-cumulative)
        for suffix, source_col in condition_cols.items():
            delta = _adj_from_mortality_5pct_bands(r.get(source_col))
            r[f"adj_total_{suffix}"] = _fmt_adj_or_same(base_total, delta)

        out_rows.append(r)

    with open(out_path, "w", newline="", encoding="utf-8") as f:
        # The row dicts still hold the dropped legacy keys; "ignore" stops
        # DictWriter from raising ValueError on them.
        writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction="ignore")
        writer.writeheader()
        writer.writerows(out_rows)

# ---------- complaint-aware ranking (read-only) ----------
_COMPLAINT_TO_COL = {
    "chest pain": ["detail_mortality_heart_attack_percent"],
    "heart attack": ["detail_mortality_heart_attack_percent", "detail_mortality_overall_percent"],
    "slurred speech": ["detail_mortality_stroke_percent"],
    "facial droop": ["detail_mortality_stroke_percent"],
    "stroke": ["detail_mortality_stroke_percent"],
    "shortness of breath": ["detail_mortality_heart_failure_percent", "detail_mortality_pneumonia_percent"],
    "trouble breathing": ["detail_mortality_heart_failure_percent", "detail_mortality_pneumonia_percent"],
    "cough": ["detail_mortality_pneumonia_percent"],
    "fever": ["detail_mortality_pneumonia_percent"],
    "default": ["detail_mortality_overall_percent"],
}

def _pick_mort_col(complaint: str):
    c = (complaint or "").strip().lower()
    for k, cols in _COMPLAINT_TO_COL.items():
        if k != "default" and k in c:
            return cols
    return _COMPLAINT_TO_COL["default"]

def _mortality_points_0to5(pct) -> int | None:
    """Convert a mortality percentage to 0-5 points (lower mortality scores higher).

    The value is limited to 0..100 and one point is lost per full 20 percentage
    points. Returns None when ``pct`` cannot be parsed.
    """
    value = _safe_float(pct)
    if value is None:
        return None
    full_bands = int(_clamp(value, 0.0, 100.0) // 20)
    return max(0, 5 - full_bands)

def complaint_points_for_row(row: dict, complaint: str) -> tuple[int, str]:
    """Score ``row`` for ``complaint`` using the first relevant mortality column with data.

    Returns ``(points, explanation)``. When none of the candidate columns holds a
    parseable value the result is ``(0, "complaint mortality not used")``.
    """
    for col in _pick_mort_col(complaint):
        raw = row.get(col)
        pts = _mortality_points_0to5(raw)
        if pts is not None:
            p = _safe_float(raw)
            shown = 'NA' if p is None else f'{p:.1f}%'
            return pts, f"{col}→{shown} ⇒ +{pts}"
    return 0, "complaint mortality not used"

def rank_with_complaint(enriched_path: str, complaint: str, top_k: int = 10) -> list[dict]:
    """Rank hospitals in ``enriched_path`` for a chief complaint.

    Each row's ``total_quality_points`` (0 when missing or unparseable) is added to
    the complaint-specific mortality points, and the rows are sorted by that sum,
    highest first. Ties keep file order.

    The hospital name and city are read from ``hospital_name`` / ``detail_city``,
    the column names the ER dataset uses elsewhere in this notebook, falling back
    to ``name`` / ``city`` for files that use the short names.

    Parameters
    ----------
    enriched_path : str
        CSV produced by the scoring step.
    complaint : str
        Free-text chief complaint.
    top_k : int
        Number of results to return; values below 1 are treated as 1.

    Returns
    -------
    list[dict]
        One dict per hospital with name, city, complaint, complaint points, an
        explanation string, and the combined total.
    """
    with open(enriched_path, newline="", encoding="utf-8") as f:
        rows = list(csv.DictReader(f))

    results = []
    for r in rows:
        base = int(_safe_float(r.get("total_quality_points")) or 0)
        cmp_pts, explain = complaint_points_for_row(r, complaint)
        results.append({
            "name": r.get("hospital_name") or r.get("name") or "",
            "city": r.get("detail_city") or r.get("city") or "",
            "complaint": complaint,
            "complaint_mortality_points": cmp_pts,
            "complaint_mortality_explain": explain,
            "complaint_total_quality_points": base + int(cmp_pts),
        })

    results.sort(key=lambda x: x["complaint_total_quality_points"], reverse=True)
    return results[:max(1, top_k)]


In [41]:
# Filter the 2023 Medicare outpatient file down to rows whose provider state is in `states`;
# the result is written to cleaned_medicare_op.csv in the current directory.
extract_clean_rows("Medicare_OP_Hospitals_by_Provider_and_Service_2023.csv")

Saved -only rows to cleaned_medicare_op.csv


In [42]:
# Build hospital_top_procedures.csv from cleaned_medicare_op.csv (the function's default paths).
hospital_top_procedures()

 Wrote per-hospital top procedures to hospital_top_procedures.csv


In [43]:
# Append Top_Procedures to the original US hospital CSV (default paths) and write
# US_er_final.csv, the master CSV for front-end usage.
append_top_procedures()

Appended Top_Procedures to US_er_final.csv


In [44]:
# Score us_hospitals_data_enriched.csv and write the transformed result to us_er_transformed.csv.
# NOTE(review): the input is the original enriched file, not US_er_final.csv, so the
# output presumably lacks the Top_Procedures column — confirm this is intended.
add_quality_points(
    in_path="us_hospitals_data_enriched.csv",
    out_path="us_er_transformed.csv"
)