
# Drug Standardization Pipeline (MarketScan • FAERS • MPRINT → UMLS/RxNorm)

This notebook scaffolds a reproducible pipeline to normalize drug names across three sources using **UMLS/RxNorm CUIs** as the canonical identifier.
It supports exact and fuzzy matching, handles salts/esters, and preserves audit trails for review.

**Outputs**
- Per-source standardized tables, one file per source: `{source}_standardized.csv`
- Fuzzy match review queue: `fuzzy_review_candidates.csv`
- Final joined summary: `final_joined.csv` (written once the join in section 7 is enabled)
- QC summary: `qc_summary.csv`

> Tip: Run cells top-to-bottom. Fill in the configuration block first.


## 1) Configuration

In [None]:

# --- Required paths ---
# Update these paths to your local environment as needed.
# Every name defined in this cell is a module-level setting read by later cells.

from pathlib import Path

# Source inputs
# NOTE(review): MS_PATH is an absolute, user-specific path; it must be edited on any other machine.
MS_PATH = Path("/Users/rahurkar.1/Library/CloudStorage/OneDrive-TheOhioStateUniversityWexnerMedicalCenter/FAERS/drug_id_platform/ms_freq.csv")
FAERS_PATH = Path("/path/to/faers_unique_drugs.csv")          # TODO: set
MPRINT_PATH = Path("/path/to/mprint_unique_drugs.csv")        # TODO: set  (ideally includes a CUI column)

# UMLS / RxNorm inputs
# Option A: CSV extracts of RXNCONSO (and optionally RXNSAT/RXNREL if you have them)
# Provide at least RXNCONSO with columns: CUI, SAB, TTY, STR, CODE, RXCUI (if present)
# The CSV loader in section 2 reads the SAB, TTY, STR and CUI columns.
RXNCONSO_CSV = Path("/path/to/RXNCONSO_subset.csv")           # TODO: set

# Option B (advanced): connect to a local UMLS/RxNorm database via SQLAlchemy
# When True, section 3 queries RXNCONSO through SQLALCHEMY_URL instead of reading RXNCONSO_CSV.
USE_SQL_BACKEND = False
# NOTE(review): placeholder credentials; avoid committing a real password in this string.
SQLALCHEMY_URL = "postgresql+psycopg2://user:password@localhost:5432/umls"  # if using a DB

# Columns
# Set the column in each source that holds the raw drug text. Can be auto-guessed if None.
MS_DRUG_COL = None       # e.g., "drug" or "drug_name"
FAERS_DRUG_COL = None
MPRINT_DRUG_COL = None   # If MPRINT already has CUIs, set MPRINT_CUI_COL below

MPRINT_CUI_COL = "CUI"   # if MPRINT includes CUIs

# Matching thresholds
# Both values are passed to fuzzy_match() in section 5 as `strong` and `review`.
FUZZY_STRONG = 90   # auto-accept
FUZZY_REVIEW = 80   # send to review queue if in [FUZZY_REVIEW, FUZZY_STRONG)

# Output directory
# The directory (and any missing parents) is created as soon as this cell runs.
OUTPUT_DIR = Path("/mnt/data/drug_standardization_outputs")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

print("Config loaded. Edit paths above as needed.")


## 2) Imports & helper functions

In [None]:

import re
import pandas as pd
from typing import Optional, Tuple, List, Dict
from pathlib import Path

# rapidfuzz is required for fuzzy matching; nothing is installed automatically, so fail early with an install hint
try:
    from rapidfuzz import process, fuzz
except Exception as e:
    raise RuntimeError("Please install rapidfuzz: pip install rapidfuzz") from e

# Dosage-form and release-type words frequently seen in free text; tweak as needed.
# Every entry is matched as a whole word only.
_NOISE_WORDS = re.compile(
    r"\b(?:tablets?|tabs?|capsules?|caps?|injection|inj|solution|suspension"
    r"|syrup|cream|ointment|spray|drops?|er|ir|xr|dr)\b"
)


def _clean_str(s: str) -> str:
    """Normalize a raw drug string for matching.

    Missing values become ``""``. Otherwise the text is lower-cased and
    trimmed, whole-word dosage-form noise is blanked out, every character that
    is not a word character, whitespace, ``/``, ``+`` or ``-`` is replaced by
    a space, and runs of whitespace are collapsed to a single space.
    """
    if pd.isna(s):
        return ""
    text = str(s).lower().strip()
    text = _NOISE_WORDS.sub(" ", text)
    # unify punctuation/separators, keeping the combo separators / + -
    text = re.sub(r"[^\w\s/+-]", " ", text)
    return re.sub(r"\s+", " ", text).strip()

# Salt/ester terms whose presence should not block matching to the base IN.
SALT_TERMS = [
    "hcl", "hydrochloride", "sodium", "potassium", "sulfate", "sulphate",
    "phosphate", "acetate", "tartrate", "mesylate", "oxalate", "nitrate",
    "succinate", "fumarate", "bitartrate", "bromide", "magnesium", "calcium"
]

def _strip_salts(s: str) -> str:
    """Remove salt/ester tokens from an already-cleaned drug string.

    Args:
        s: Whitespace-separated, lower-cased drug text (output of ``_clean_str``).

    Returns:
        ``s`` without any token listed in ``SALT_TERMS``. If every token is a
        salt term (e.g. ``"potassium"`` or ``"calcium acetate"``), the name is
        returned unchanged apart from whitespace normalization. Stripping it
        to ``""`` would make unrelated names share an empty base string and
        match each other downstream.
    """
    tokens = s.split()
    kept = [t for t in tokens if t not in SALT_TERMS]
    # Fall back to the full name when stripping would leave nothing behind.
    return " ".join(kept if kept else tokens)

def _split_combos(s: str) -> List[str]:
    """Split a combination-product string into its non-empty, trimmed parts.

    Parts are separated by ``+``, ``/``, ``,``, ``&`` (with optional
    surrounding whitespace) or by the word ``with`` surrounded by whitespace.
    """
    pieces = re.split(r"\s*[+/,&]\s*|\s+with\s+", s)
    trimmed = map(str.strip, pieces)
    # filter(None, ...) drops the empty fragments left by leading/trailing separators
    return list(filter(None, trimmed))

def auto_guess_drug_col(df: pd.DataFrame) -> Optional[str]:
    """Guess which column of ``df`` holds the raw drug text.

    Resolution order:
      1. A column whose (case-insensitive, trimmed) name equals one of the
         well-known names, tried in priority order, so ``"drug"`` wins over
         an earlier ``"patient_name"`` column.
      2. The first column whose name contains ``drug``, ``name``, ``med`` or
         ``ing``.
      3. The first column, if it has a string dtype.

    Args:
        df: Source table to inspect.

    Returns:
        The chosen column label, or ``None`` when nothing plausible is found
        (including when ``df`` has no columns).
    """
    if len(df.columns) == 0:
        return None

    candidates = ["drug", "drug_name", "medication", "ingredient", "name", "generic_name", "brand_name"]
    keywords = ["drug", "name", "med", "ing"]

    # Only string labels can be compared by name; integer labels are skipped here.
    named = [(c, c.strip().lower()) for c in df.columns if isinstance(c, str)]

    # Pass 1: exact well-known names, in priority order.
    for wanted in candidates:
        for col, lc in named:
            if lc == wanted:
                return col

    # Pass 2: looser substring heuristics, in column order.
    for col, lc in named:
        if any(k in lc for k in keywords):
            return col

    # fallback to first column if it looks text-y
    first = df.columns[0]
    if pd.api.types.is_string_dtype(df[first]):
        return first
    return None

def load_source(path: Path, drug_col: Optional[str]) -> pd.DataFrame:
    """Load one source CSV and derive the cleaned matching columns.

    Args:
        path: CSV file to read.
        drug_col: Name of the column holding raw drug text, or ``None`` to
            auto-detect it with ``auto_guess_drug_col``.

    Returns:
        A frame of unique, non-null drug strings with columns ``raw_drug``
        (original text), ``clean_drug`` (``_clean_str`` output) and
        ``base_drug`` (``clean_drug`` with salt terms stripped).

    Raises:
        ValueError: If ``drug_col`` is ``None`` and no column can be guessed.
        KeyError: If an explicitly configured ``drug_col`` is not in the file.
    """
    df = pd.read_csv(path)
    if drug_col is None:
        drug_col = auto_guess_drug_col(df)
        if drug_col is None:
            raise ValueError(f"Could not auto-detect drug column in {path.name}. Please set the *_DRUG_COL config.")
    elif drug_col not in df.columns:
        # Without this check the rename below is a silent no-op and the failure
        # surfaces later as an unhelpful "raw_drug not in index" error.
        raise KeyError(
            f"Column {drug_col!r} not found in {path.name}; available columns: {list(df.columns)}"
        )
    # Select first, then rename, so an unrelated pre-existing "raw_drug" column cannot collide.
    out = df[[drug_col]].rename(columns={drug_col: "raw_drug"})
    out = out.dropna().drop_duplicates().reset_index(drop=True)
    out["clean_drug"] = out["raw_drug"].apply(_clean_str)
    out["base_drug"] = out["clean_drug"].apply(_strip_salts)
    return out

def load_rxnconso_csv(path: Path) -> pd.DataFrame:
    """Read an RXNCONSO CSV extract and prepare it for matching.

    All columns are read as strings. Only rows whose ``SAB`` equals
    ``RXNORM`` (case-insensitive) are kept, and two helper columns are added:
    ``clean_str`` (``_clean_str`` applied to ``STR``) and ``base_str``
    (``clean_str`` with salt terms stripped).

    Every term type (TTY) is retained; preference for IN/PIN/SCD/SBD is
    applied later when a single term has to be picked.
    """
    table = pd.read_csv(path, dtype=str)
    is_rxnorm = table["SAB"].str.upper() == "RXNORM"
    return (
        table.loc[is_rxnorm]
        .assign(clean_str=lambda d: d["STR"].apply(_clean_str))
        .assign(base_str=lambda d: d["clean_str"].apply(_strip_salts))
    )

def pick_preferred_term(group: pd.DataFrame) -> pd.Series:
    """Pick the preferred RxNorm term from a set of candidate rows.

    Rows are ranked by term type (IN, then PIN, SCD, SBD, GPCK, BPCK, then
    anything else), with ties broken by ``STR`` in ascending order.

    Args:
        group: Candidate rows with at least ``CUI``, ``STR`` and ``TTY``
            columns. The frame is not modified.

    Returns:
        A Series with keys ``CUI``, ``Preferred_STR`` and ``Preferred_TTY``.

    Raises:
        IndexError: If ``group`` has no rows.
    """
    order = {"IN": 1, "PIN": 2, "SCD": 3, "SBD": 4, "GPCK": 5, "BPCK": 6}
    # assign() works on a copy, so the helper column never leaks into the caller's frame.
    ranked = group.assign(_prio=group["TTY"].map(order).fillna(9))
    best = ranked.sort_values(["_prio", "STR"]).iloc[0]
    return pd.Series({"CUI": best["CUI"], "Preferred_STR": best["STR"], "Preferred_TTY": best["TTY"]})

def exact_match(source_df: pd.DataFrame, rx: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Match source drugs to RxNorm rows by exact string equality.

    Two passes are made: first ``clean_drug`` against ``clean_str``, then,
    for rows still unmatched, ``base_drug`` against ``base_str`` (the
    salt-stripped forms). Empty reference keys are excluded so that a drug
    whose cleaned text is ``""`` cannot join to arbitrary RxNorm rows.

    Args:
        source_df: Frame with ``raw_drug``, ``clean_drug`` and ``base_drug``.
        rx: RxNorm reference with ``CUI``, ``TTY``, ``STR``, ``clean_str``
            and ``base_str``.

    Returns:
        ``(exact, unmatched)``. ``exact`` holds one row per distinct
        ``(raw_drug, CUI)`` pair with the matched reference columns;
        ``unmatched`` holds the distinct ``raw_drug``/``clean_drug``/
        ``base_drug`` rows that matched in neither pass.
    """
    ref = rx[["CUI", "TTY", "STR", "clean_str", "base_str"]]
    src_cols = list(source_df.columns)

    # Pass 1: exact match on the cleaned string.
    by_clean = source_df.merge(ref[ref["clean_str"] != ""],
                               left_on="clean_drug", right_on="clean_str", how="left")
    clean_hit = by_clean["CUI"].notna()

    # Pass 2: salt-stripped match for the leftovers. Only the source columns
    # are carried forward: keeping the all-NaN reference columns from pass 1
    # would make the merge rename the real matches to "CUI_b" etc., and the
    # "CUI" column tested below would stay NaN forever.
    leftovers = by_clean.loc[~clean_hit, src_cols]
    by_base = leftovers.merge(ref[ref["base_str"] != ""],
                              left_on="base_drug", right_on="base_str", how="left")
    base_hit = by_base["CUI"].notna()

    exact = pd.concat([by_clean[clean_hit], by_base[base_hit]], ignore_index=True)
    exact = exact.drop_duplicates(subset=["raw_drug", "CUI"])
    unmatched = by_base.loc[~base_hit, ["raw_drug", "clean_drug", "base_drug"]].drop_duplicates()
    return exact, unmatched

def build_string_index(rx: pd.DataFrame) -> Tuple[List[str], Dict[str, List[Tuple[str, str, str]]]]:
    """Build the lookup structures used to resolve a fuzzy-matched string.

    Args:
        rx: RxNorm reference with ``CUI``, ``STR``, ``TTY``, ``clean_str``
            and ``base_str`` columns.

    Returns:
        ``(candidates, str_to_rows)``. ``str_to_rows`` maps each non-empty
        ``clean_str``/``base_str`` value to the list of ``(CUI, STR, TTY)``
        tuples it came from. ``candidates`` is the sorted list of those keys.
        A row whose clean and base strings are identical is indexed once, not
        twice, and non-string or empty keys are skipped.
    """
    str_to_rows: Dict[str, List[Tuple[str, str, str]]] = {}
    # zip over the columns avoids building a Series per row, as iterrows() does.
    columns = (rx["CUI"], rx["STR"], rx["TTY"], rx["clean_str"], rx["base_str"])
    for cui, label, tty, clean, base in zip(*columns):
        entry = (cui, label, tty)
        keys = (clean,) if clean == base else (clean, base)
        for key in keys:
            if not isinstance(key, str) or not key:
                continue
            str_to_rows.setdefault(key, []).append(entry)
    candidates = sorted(str_to_rows)
    return candidates, str_to_rows

def fuzzy_match(unmatched_df: pd.DataFrame, rx: pd.DataFrame, strong: int=90, review: int=80) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Fuzzy-match drugs that had no exact RxNorm match.

    For each row the best-scoring reference string (``fuzz.WRatio``) is
    looked up and resolved to a preferred term with ``pick_preferred_term``.

    Args:
        unmatched_df: Frame with ``raw_drug`` and ``clean_drug`` columns.
        rx: RxNorm reference accepted by ``build_string_index``.
        strong: Scores at or above this value are auto-accepted.
        review: Scores in ``[review, strong)`` go to the review queue.
            Scores below ``review`` are discarded, leaving the drug unmapped
            rather than flooding the queue with implausible candidates.

    Returns:
        ``(accepted, review_queue)``. Both frames always carry the columns
        ``raw_drug``, ``query``, ``matched_str``, ``score``, ``CUI``,
        ``Preferred_STR`` and ``Preferred_TTY``, even when empty.
    """
    out_cols = ["raw_drug", "query", "matched_str", "score", "CUI", "Preferred_STR", "Preferred_TTY"]
    candidates, str_to_rows = build_string_index(rx)
    # Letting rapidfuzz apply the lower bound allows it to stop scoring hopeless candidates early.
    cutoff = min(review, strong)

    results = []
    reviews = []
    for raw, q in zip(unmatched_df["raw_drug"], unmatched_df["clean_drug"]):
        if not q:
            continue
        match = process.extractOne(q, candidates, scorer=fuzz.WRatio, score_cutoff=cutoff)
        if match is None:
            # no candidate reached the review threshold (or there are no candidates)
            continue
        match_str, score, _ = match
        rows = str_to_rows.get(match_str, [])
        if not rows:
            continue
        chosen = pick_preferred_term(pd.DataFrame(rows, columns=["CUI", "STR", "TTY"]))
        out = {
            "raw_drug": raw,
            "query": q,
            "matched_str": match_str,
            "score": score,
            "CUI": chosen["CUI"],
            "Preferred_STR": chosen["Preferred_STR"],
            "Preferred_TTY": chosen["Preferred_TTY"]
        }
        if score >= strong:
            results.append(out)
        else:
            # review <= score < strong: needs a human decision
            reviews.append(out)

    fm = pd.DataFrame(results, columns=out_cols)
    rv = pd.DataFrame(reviews, columns=out_cols)
    return fm, rv

def consolidate_matches(exact_df: pd.DataFrame, fuzzy_df: pd.DataFrame) -> pd.DataFrame:
    """Reduce exact and fuzzy matches to a single CUI per ``raw_drug``.

    Exact matches always win over fuzzy ones. When a drug has several exact
    matches, the term type decides (IN, then PIN, SCD, SBD, GPCK, BPCK, then
    anything else), so the result does not depend on reference row order.
    Among fuzzy matches the highest score wins.

    Args:
        exact_df: Output of ``exact_match`` (needs ``raw_drug``, ``CUI``,
            ``STR``, ``TTY``). May be empty.
        fuzzy_df: Auto-accepted output of ``fuzzy_match`` (needs ``raw_drug``,
            ``score``, ``CUI``, ``Preferred_STR``, ``Preferred_TTY``). May be
            empty.

    Returns:
        Frame with columns ``raw_drug``, ``CUI``, ``Preferred_STR`` and
        ``Preferred_TTY``, one row per ``raw_drug``.
    """
    cols = ["raw_drug", "CUI", "Preferred_STR", "Preferred_TTY"]
    tty_rank = {"IN": 1, "PIN": 2, "SCD": 3, "SBD": 4, "GPCK": 5, "BPCK": 6}

    if exact_df.empty:
        exact_pick = pd.DataFrame(columns=cols)
    else:
        exact_norm = exact_df.rename(columns={"STR": "Preferred_STR", "TTY": "Preferred_TTY"})
        exact_norm = exact_norm[cols].drop_duplicates().reset_index(drop=True)
        rank = exact_norm["Preferred_TTY"].map(tty_rank).fillna(9)
        exact_pick = (
            exact_norm.assign(_prio=rank)
            # stable sort: rows with equal priority keep their original order
            .sort_values("_prio", kind="mergesort")
            .drop_duplicates("raw_drug", keep="first")
            .sort_index()  # restore first-seen ordering of the surviving rows
            .drop(columns="_prio")
        )

    if not fuzzy_df.empty:
        fuzzy_pick = fuzzy_df.sort_values(["raw_drug", "score"], ascending=[True, False]).drop_duplicates("raw_drug")
        fuzzy_pick = fuzzy_pick[cols]
    else:
        fuzzy_pick = pd.DataFrame(columns=cols)

    # Concatenate only non-empty pieces; exact rows come first so they win the de-duplication.
    frames = [f for f in (exact_pick, fuzzy_pick) if not f.empty]
    if not frames:
        return pd.DataFrame(columns=cols)
    combined = pd.concat(frames, ignore_index=True)
    combined = combined.drop_duplicates("raw_drug", keep="first").reset_index(drop=True)
    return combined


## 3) Load RxNorm / UMLS reference

In [None]:

# Load the RxNorm reference into `rx`. Both branches must yield the columns
# CUI, SAB, TTY, STR plus the derived clean_str/base_str used by the matchers.
if USE_SQL_BACKEND:
    from sqlalchemy import create_engine, text
    engine = create_engine(SQLALCHEMY_URL)
    query_sql = (
        "SELECT CUI, SAB, TTY, STR "
        "FROM RXNCONSO "
        "WHERE UPPER(SAB) = 'RXNORM'"
    )
    with engine.connect() as con:
        rx = pd.read_sql(text(query_sql), con)
    # Some databases (e.g. PostgreSQL) fold unquoted identifiers to lower case,
    # so normalize the labels to the upper-case names the helpers expect.
    rx.columns = [str(c).upper() for c in rx.columns]
    # Derive the same matching columns that load_rxnconso_csv() adds; without
    # them exact_match() and build_string_index() fail on the SQL path.
    rx["clean_str"] = rx["STR"].apply(_clean_str)
    rx["base_str"] = rx["clean_str"].apply(_strip_salts)
else:
    rx = load_rxnconso_csv(RXNCONSO_CSV)

print(f"RxNorm terms loaded: {len(rx):,}")
rx.head(3)


## 4) Load sources & clean

In [None]:

# MarketScan is the only source wired up so far; its drug column is
# auto-detected when MS_DRUG_COL is None.
ms = load_source(path=MS_PATH, drug_col=MS_DRUG_COL)
print(f"MarketScan unique drugs: {len(ms)}")

# Uncomment when paths set
# faers = load_source(FAERS_PATH, FAERS_DRUG_COL)
# print("FAERS unique drugs:", len(faers))

# mprint = load_source(MPRINT_PATH, MPRINT_DRUG_COL)  # if MPRINT has CUIs, we'll align later
# print("MPRINT unique drugs:", len(mprint))

# Preview the first rows (raw, cleaned and salt-stripped drug strings).
ms.head(10)


## 5) Match each source to UMLS/RxNorm

In [None]:

def match_source_to_rxnorm(src_df: pd.DataFrame, source_name: str):
    """Run exact then fuzzy matching for one source against the global ``rx``.

    Returns ``(mapping, review_queue)``: ``mapping`` is the consolidated
    one-CUI-per-drug table tagged with a ``source`` column, and
    ``review_queue`` is the second frame returned by ``fuzzy_match``.
    """
    exact_hits, leftovers = exact_match(src_df, rx)
    auto_accepted, needs_review = fuzzy_match(
        leftovers, rx, strong=FUZZY_STRONG, review=FUZZY_REVIEW
    )
    mapping = consolidate_matches(exact_hits, auto_accepted)
    mapping["source"] = source_name
    return mapping, needs_review


ms_map, ms_review = match_source_to_rxnorm(ms, "marketscan")
print(f"MarketScan mapped: {len(ms_map)} | review queue: {len(ms_review)}")

# When ready for others:
# faers_map, faers_review = match_source_to_rxnorm(faers, "faers")
# mprint_map, mprint_review = match_source_to_rxnorm(mprint, "mprint")


## 6) Save per-source standardized outputs

In [None]:

# Attach the mapping to every unique MarketScan drug; unmapped drugs keep
# empty CUI/term columns because of the left join.
output_labels = {
    "raw_drug": "Drug",
    "CUI": "UMLS_CUI",
    "Preferred_STR": "Preferred_Term",
    "Preferred_TTY": "Preferred_TTY"
}
ms_mapping = ms_map.drop(columns=["source"])
ms_out = ms.merge(ms_mapping, on="raw_drug", how="left").rename(columns=output_labels)
ms_out.to_csv(OUTPUT_DIR / "marketscan_standardized.csv", index=False)

# pd.DataFrame().to_csv(OUTPUT_DIR / "faers_standardized.csv", index=False)   # uncomment once mapped
# pd.DataFrame().to_csv(OUTPUT_DIR / "mprint_standardized.csv", index=False)  # uncomment once mapped

# Consolidate all review queues
review_all = ms_review.copy()
# review_all = pd.concat([ms_review, faers_review, mprint_review], ignore_index=True)

# The review file is only written when there is something to review.
has_review_rows = not review_all.empty
if has_review_rows:
    review_all.to_csv(OUTPUT_DIR / "fuzzy_review_candidates.csv", index=False)

# Lists every CSV currently in the output directory, not only the ones just written.
csv_names = [p.name for p in Path(OUTPUT_DIR).glob("*.csv")]
print("Saved:", csv_names)


## 7) Join across sources on CUI (placeholder until FAERS/MPRINT mapped)

In [None]:

# Template for the cross-source join; nothing below is executed except the final print.
# Once FAERS/MPRINT maps exist, you can do something like:
#
# final = (
#     ms_out[["UMLS_CUI","Preferred_Term"]]
#     .merge(faers_out[["UMLS_CUI"]].assign(FAERS_flag=1), on="UMLS_CUI", how="outer")
#     .merge(mprint_out[["UMLS_CUI"]].assign(MPRINT_flag=1), on="UMLS_CUI", how="outer")
#     .fillna({"FAERS_flag":0, "MPRINT_flag":0})
#     .drop_duplicates()
# )
# final.to_csv(OUTPUT_DIR / "final_joined.csv", index=False)
#
# NOTE(review): ms_out comes from a left join, so unmapped drugs have a null UMLS_CUI.
# pandas matches null join keys against each other, so consider dropping rows with a
# null UMLS_CUI from each frame before merging. Confirm against the pandas merge docs.
#
# For count-based summaries, merge in your per-source frequency tables before grouping.
print("Join step scaffolded. Populate once FAERS/MPRINT standardized files are created.")


## 8) QC summary

In [None]:

def qc_report(mapped: pd.DataFrame, source_label: str) -> pd.DataFrame:
    """Summarize mapping coverage for one source as a one-row frame.

    ``mapped`` must have a ``UMLS_CUI`` column; a row counts as matched when
    that value is not null. ``match_rate`` is a percentage rounded to two
    decimals, or ``0.0`` for an empty input.
    """
    n_rows = len(mapped)
    n_matched = mapped["UMLS_CUI"].notna().sum()
    if n_rows:
        rate = round(100 * n_matched / n_rows, 2)
    else:
        rate = 0.0
    summary = {
        "source": [source_label],
        "total_unique_drugs": [n_rows],
        "matched": [n_matched],
        "match_rate": [rate],
    }
    return pd.DataFrame(summary)

# One QC row per mapped source; enable the other entries once those sources are mapped.
qc_tables = [
    qc_report(ms_out, "marketscan"),
    # qc_report(faers_out, "faers"),
    # qc_report(mprint_out, "mprint"),
]

qc = pd.concat(qc_tables, ignore_index=True)
qc.to_csv(OUTPUT_DIR / "qc_summary.csv", index=False)
qc


## 9) (Optional) Apply manual overrides for fuzzy review

In [None]:

# If you curate 'manual_overrides.csv' with columns: raw_drug, CUI
# You can re-run consolidation with enforced mappings.
# `raw_drug` must hold the original drug text, i.e. the `Drug` column of the
# standardized output.

OVERRIDES = OUTPUT_DIR / "manual_overrides.csv"
if OVERRIDES.exists():
    ov = pd.read_csv(OVERRIDES, dtype=str)
    # example apply to MarketScan
    if not ov.empty:
        # Keep only the two documented columns and one override per drug (the
        # last one listed wins), so the merge cannot multiply output rows.
        ov = ov[["raw_drug", "CUI"]].dropna(subset=["raw_drug"]).drop_duplicates("raw_drug", keep="last")
        # ms_out was renamed in section 6: the raw text now lives in "Drug".
        ms_out2 = ms_out.merge(ov, left_on="Drug", right_on="raw_drug", how="left")
        # prefer override CUI if present
        ms_out2["UMLS_CUI"] = ms_out2["CUI"].fillna(ms_out2["UMLS_CUI"])
        ms_out2 = ms_out2.drop(columns=["CUI", "raw_drug"])
        # NOTE(review): Preferred_Term / Preferred_TTY are not refreshed for
        # overridden rows and still describe the automatically chosen term.
        ms_out2.to_csv(OUTPUT_DIR / "marketscan_standardized_overridden.csv", index=False)
        print("Applied overrides → marketscan_standardized_overridden.csv")
else:
    print("No overrides found. To use, create:", OVERRIDES)



---

### Next steps
1. Point `FAERS_PATH`, `MPRINT_PATH`, and `RXNCONSO_CSV` to your local files.
2. Run sections 1 → 9 top-to-bottom (sections 1–2 define the configuration and helpers that sections 3 → 9 rely on).
3. Review `fuzzy_review_candidates.csv`, curate `manual_overrides.csv` as needed, and re-run section 9.
4. Add frequency columns and build the `final_joined.csv` summary for your app.
