In [None]:
# Cell 1 - Imports and configuration
import os
import glob
import time
import pandas as pd
import numpy as np

# Paths. The defaults are the original local layout; set the FAERS_RAW_PATH /
# FAERS_PROCESSED_PATH environment variables to run on another machine without
# editing the notebook.
RAW_PATH = os.environ.get("FAERS_RAW_PATH", r"D:\ML_Project\data\raw")
PROCESSED_PATH = os.environ.get(
    "FAERS_PROCESSED_PATH", r"D:\ML_Project\data\processed"
)

os.makedirs(RAW_PATH, exist_ok=True)
os.makedirs(PROCESSED_PATH, exist_ok=True)

# Files we will create
MASTER_PREFIX = "MASTER_"  # prefix of the MASTER_<TYPE>.csv files in RAW_PATH
UNIQUE_DEMO_FILE = os.path.join(PROCESSED_PATH, "UNIQUE_DEMO.csv")
FINAL_MASTER_FILE = os.path.join(PROCESSED_PATH, "FINAL_MASTER_DATASET.csv")

# Quarterly file pattern config: raw files are expected to be named
# <TYPE><YY><Qn>.txt, e.g. DEMO15Q1.txt.
YEARS = range(2015, 2026)  # 2015-2025 inclusive (range end is exclusive)
QUARTERS = [
    "Q1",
    "Q2",
    "Q3",
    "Q4",
]  # adjust if your raw files name quarters differently

print("RAW_PATH:", RAW_PATH)
print("PROCESSED_PATH:", PROCESSED_PATH)

RAW_PATH: D:\ML_Project\data\raw
PROCESSED_PATH: D:\ML_Project\data\processed


In [None]:
# Cell 2 - Build MASTER_* files from quarterly .txt files (robust concat)
# Running this cell rewrites each RAW_PATH/MASTER_<TYPE>.csv from whatever
# quarterly files are present. If your MASTER_*.csv already exist and you don't
# want to rebuild them, skip this cell.

# Output columns of each MASTER_<TYPE>.csv, in output order. Quarterly columns
# not listed here are dropped; listed columns absent from a quarter stay empty.
FILE_TYPES = {
    "DEMO": """
        primaryid caseid caseversion i_f_code event_dt mfr_dt init_fda_dt
        fda_dt rept_cod auth_num mfr_num mfr_sndr lit_ref age age_cod age_grp
        sex e_sub wt wt_cod rept_dt to_mfr occp_cod reporter_country
        occr_country
    """.split(),
    "DRUG": """
        primaryid caseid drug_seq role_cod drugname prod_ai val_vbm route
        dose_vbm cum_dose_chr cum_dose_unit dechal rechal lot_num exp_dt
        nda_num dose_amt dose_unit dose_form dose_freq
    """.split(),
    "INDI": "primaryid caseid indi_drug_seq indi_pt".split(),
    "OUTC": "primaryid caseid outc_cod".split(),
    "REAC": "primaryid caseid pt".split(),
    "THER": "primaryid caseid dsg_drug_seq start_dt end_dt dur dur_cod".split(),
}


def _find_quarterly_files(ftype, extensions=(".txt",)):
    """Return at most one RAW_PATH file per quarter of ``ftype``, in chronological order.

    A file matches when its name is ``<ftype><YY><quarter><ext>`` for a year in
    YEARS and a quarter in QUARTERS, with ``ext`` in ``extensions`` (leading dot
    included). Names and extensions are compared case-insensitively, and each
    quarter contributes a single file. This matters on case-insensitive
    filesystems (e.g. Windows), where ``DEMO15Q1.txt`` and ``DEMO15Q1.TXT`` are
    the same file and must not be read twice.
    """
    # quarter stem (upper-cased) -> (year, quarter position) used as the sort key
    wanted = {}
    for year in YEARS:
        for q_pos, q in enumerate(QUARTERS):
            wanted[f"{ftype}{str(year)[-2:]}{q}".upper()] = (year, q_pos)
    allowed_ext = {ext.lower() for ext in extensions}

    found = {}
    # sorted() keeps the pick deterministic if a case-sensitive filesystem holds
    # two spellings of the same quarter
    for name in sorted(os.listdir(RAW_PATH)):
        stem, ext = os.path.splitext(name)
        key = stem.upper()
        full_path = os.path.join(RAW_PATH, name)
        if key in wanted and ext.lower() in allowed_ext and os.path.isfile(full_path):
            found.setdefault(key, full_path)
    return [found[key] for key in sorted(found, key=wanted.get)]


def process_quarterly_to_master(ftype, master_cols, extensions=(".txt",)):
    """Concatenate the quarterly ``<ftype>YYQn`` files into RAW_PATH/MASTER_<ftype>.csv.

    Each quarterly file is read as ``$``-delimited text with every column as str.
    Its header names are stripped and lower-cased, then projected onto
    ``master_cols`` (extra columns dropped, missing ones left empty) and appended
    to the output CSV. Only one quarter is held in memory at a time.

    Args:
        ftype: file type prefix, e.g. "DEMO".
        master_cols: output column names, in output order.
        extensions: accepted quarterly file extensions, compared case-insensitively.

    Returns:
        Path of the written MASTER csv, or None when no quarterly file was found
        or none of them could be read.
    """
    quarterly_files = _find_quarterly_files(ftype, extensions)
    if not quarterly_files:
        print(
            f"No quarterly files found for {ftype}. Skipping creation of MASTER_{ftype}.csv"
        )
        return None

    out_file = os.path.join(RAW_PATH, f"MASTER_{ftype}.csv")
    print(f"Found {len(quarterly_files)} files for {ftype}. Building {out_file} ...")

    header_written = False  # first successful quarter truncates and writes the header
    n_written = 0
    for fpath in quarterly_files:
        try:
            df_q = pd.read_csv(
                fpath, sep="$", dtype=str, on_bad_lines="skip", engine="python"
            )
        except Exception as e:
            # best-effort: report and continue with the remaining quarters
            print(f"  Error reading {fpath}: {e}")
            continue

        df_q.columns = df_q.columns.str.strip().str.lower()
        # Drop duplicate headers so the projection yields exactly one column per
        # master column; reindex drops extras, adds missing (empty) and orders.
        df_q = df_q.loc[:, ~df_q.columns.duplicated()].reindex(columns=master_cols)

        df_q.to_csv(
            out_file,
            index=False,
            mode="a" if header_written else "w",
            header=not header_written,
        )
        header_written = True
        n_written += len(df_q)
        print(f"  {os.path.basename(fpath)} -> {len(df_q)} rows")

    if not header_written:
        print(f"No readable quarterly files for {ftype}; MASTER_{ftype}.csv was not written")
        return None

    print(f"MASTER_{ftype}.csv saved: {out_file} ({n_written} rows)")
    return out_file


# Build every MASTER_* file whose quarterly inputs exist. A failure for one
# file type is reported and the remaining types are still processed.
for ft in FILE_TYPES:
    cols = FILE_TYPES[ft]
    try:
        process_quarterly_to_master(ft, cols)
    except Exception as e:
        print(f"Error processing {ft}: {e}")

print("MASTER file build step completed.")

Found 168 files for DEMO. Building D:\ML_Project\data\raw\MASTER_DEMO.csv ...
  DEMO15Q1.TXT -> 317071 rows
  DEMO15Q1.txt -> 317071 rows
  DEMO15Q2.TXT -> 289270 rows
  DEMO15Q2.txt -> 289270 rows
  DEMO15Q3.TXT -> 398860 rows
  DEMO15Q3.txt -> 398860 rows
  DEMO15Q4.TXT -> 314704 rows
  DEMO15Q4.txt -> 314704 rows
  DEMO16Q1.TXT -> 365682 rows
  DEMO16Q1.txt -> 365682 rows
  DEMO16Q2.TXT -> 316056 rows
  DEMO16Q2.txt -> 316056 rows
  DEMO16Q3.TXT -> 313613 rows
  DEMO16Q3.txt -> 313613 rows
  DEMO16Q4.TXT -> 309534 rows
  DEMO16Q4.txt -> 309534 rows
  DEMO17Q1.TXT -> 352913 rows
  DEMO17Q1.txt -> 352913 rows
  DEMO17Q2.TXT -> 337398 rows
  DEMO17Q2.txt -> 337398 rows
  DEMO17Q3.TXT -> 351388 rows
  DEMO17Q3.txt -> 351388 rows
  DEMO17Q4.TXT -> 327848 rows
  DEMO17Q4.txt -> 327848 rows
  DEMO18Q1.TXT -> 412702 rows
  DEMO18Q1.txt -> 412702 rows
  DEMO18Q2.TXT -> 457169 rows
  DEMO18Q2.txt -> 457169 rows
  DEMO18Q3.TXT -> 420915 rows
  DEMO18Q3.txt -> 420915 rows
  DEMO18Q4.TXT -> 3940

In [None]:
# Cell 3 - Deduplicate MASTER_DEMO.csv to UNIQUE_DEMO.csv (keep latest per case)
# A caseid can occur in several rows. For each caseid keep the row with the
# latest fda_dt, breaking ties by the highest primaryid. Rows without a usable
# caseid cannot be grouped into cases and are deduplicated by primaryid instead.
MASTER_DEMO_FILE = os.path.join(RAW_PATH, "MASTER_DEMO.csv")
if not os.path.exists(MASTER_DEMO_FILE):
    print(f"WARNING: {MASTER_DEMO_FILE} not found. Skipping deduplication step.")
else:
    print(
        "Loading MASTER_DEMO.csv for deduplication (may be large; this can take time)..."
    )
    df_demo = pd.read_csv(MASTER_DEMO_FILE, dtype=str, low_memory=False)
    # parse fda_dt as YYYYMMDD; unparseable values become NaT (sorted last below)
    if "fda_dt" in df_demo.columns:
        df_demo["fda_dt_parsed"] = pd.to_datetime(
            df_demo["fda_dt"], format="%Y%m%d", errors="coerce"
        )
    else:
        df_demo["fda_dt_parsed"] = pd.NaT

    # convert keys to numeric; non-numeric or absent values become NaN/NA
    df_demo["primaryid"] = pd.to_numeric(
        df_demo.get("primaryid", pd.NA), errors="coerce"
    )
    df_demo["caseid"] = pd.to_numeric(df_demo.get("caseid", pd.NA), errors="coerce")

    # drop rows missing primaryid
    before = len(df_demo)
    df_demo = df_demo.dropna(subset=["primaryid"])
    print(f"Dropped {before - len(df_demo)} rows with missing primaryid")

    # "caseid" is always a column here (assigned above), so a column-membership
    # check cannot detect missing case ids. Split on the per-row value instead:
    # drop_duplicates treats NaN keys as equal and would otherwise collapse every
    # caseid-less row into a single row.
    has_caseid = df_demo["caseid"].notna()
    latest_per_case = (
        df_demo[has_caseid]
        .sort_values(
            by=["caseid", "fda_dt_parsed", "primaryid"],
            ascending=[True, False, False],
        )
        .drop_duplicates(subset=["caseid"], keep="first")
    )
    no_caseid = (
        df_demo[~has_caseid]
        .sort_values(by=["primaryid"], ascending=True)
        .drop_duplicates(subset=["primaryid"], keep="first")
    )
    if len(no_caseid):
        print(f"{len(no_caseid):,} rows lack a caseid; deduplicated by primaryid instead")
        df_unique = pd.concat([latest_per_case, no_caseid])
    else:
        df_unique = latest_per_case

    # Write unique demo
    df_unique.to_csv(UNIQUE_DEMO_FILE, index=False)
    print(f"Saved UNIQUE_DEMO.csv -> {UNIQUE_DEMO_FILE} ({len(df_unique):,} rows)")

Loading MASTER_DEMO.csv for deduplication (may be large; this can take time)...
Dropped 0 rows with missing primaryid
Saved UNIQUE_DEMO.csv -> D:\ML_Project\data\processed\UNIQUE_DEMO.csv (2,818,644 rows)


In [None]:
# Cell 4 - Define aggregation functions (OUTC, REAC, DRUG, INDI, THER)
# Inputs: the MASTER_<TYPE>.csv files that live in RAW_PATH.
(
    MASTER_OUTC_FILE,
    MASTER_REAC_FILE,
    MASTER_DRUG_FILE,
    MASTER_INDI_FILE,
    MASTER_THER_FILE,
) = (
    os.path.join(RAW_PATH, f"MASTER_{table}.csv")
    for table in ("OUTC", "REAC", "DRUG", "INDI", "THER")
)

# outc_cod values counted as a severe outcome.
# NOTE(review): other serious outcome codes (e.g. CA/RI/OT) are not included —
# confirm this is the intended target definition.
SEVERE_OUTCOMES = set(["DE", "LT", "HO", "DS"])

# Reaction-term phrases treated as lack of efficacy.
INEFFICACY_TERMS = list(
    (
        "DRUG INEFFECTIVE",
        "LACK OF EFFECT",
        "THERAPEUTIC RESPONSE DECREASED",
        "CONDITION WORSENED",
    )
)


def safe_read_csv(path, usecols=None):
    """Read ``path`` as an all-string CSV without ever raising.

    Args:
        path: CSV file to read.
        usecols: optional subset of columns, passed through to ``pd.read_csv``.

    Returns:
        The parsed DataFrame. If the file does not exist, or pandas fails to
        parse it (for example because a column in ``usecols`` is absent), a
        message is printed and an empty DataFrame is returned instead.
    """
    if os.path.exists(path):
        try:
            return pd.read_csv(path, dtype=str, usecols=usecols, low_memory=False)
        except Exception as e:
            print(f"Error reading {path}: {e}")
            return pd.DataFrame()
    print(f"File not found: {path} (returning empty df)")
    return pd.DataFrame()


def process_outcomes(path):
    """Collapse MASTER_OUTC rows into one severity flag per primaryid.

    Outcome codes are stripped and upper-cased before being compared with
    SEVERE_OUTCOMES. Rows with a missing or non-numeric primaryid, or a missing
    outc_cod, are ignored.

    Returns:
        A frame indexed by nullable-Int64 ``primaryid`` with one int column,
        ``is_severe_outcome`` (1 if any of the case's codes is severe, else 0).
        Returns an empty DataFrame when the file is missing or unreadable.
    """
    outc = safe_read_csv(path, usecols=["primaryid", "outc_cod"])
    if outc.empty:
        return pd.DataFrame()

    outc = outc.dropna(subset=["primaryid", "outc_cod"])
    outc = outc.assign(primaryid=pd.to_numeric(outc["primaryid"], errors="coerce"))
    outc = outc.dropna(subset=["primaryid"])

    codes = outc["outc_cod"].astype(str).str.strip().str.upper()
    severe = codes.isin(SEVERE_OUTCOMES).astype(int)
    per_case = severe.groupby(outc["primaryid"].astype("Int64")).max()
    return per_case.to_frame("is_severe_outcome")


def process_reactions(path):
    """Aggregate MASTER_REAC preferred terms (PTs) per primaryid.

    PTs are upper-cased and stripped. Rows with a missing pt, or a missing or
    non-numeric primaryid, are ignored.

    Returns:
        A frame indexed by nullable-Int64 ``primaryid`` with columns:
          all_reaction_pts -- the case's PTs joined by single spaces (row order kept)
          reaction_count   -- number of PT rows for the case
          is_ineffective   -- 1 if any single PT contains one of INEFFICACY_TERMS
        Returns an empty DataFrame when the file is missing or unreadable.
    """
    df = safe_read_csv(path, usecols=["primaryid", "pt"])
    if df.empty:
        return pd.DataFrame()
    df = df.dropna(subset=["primaryid", "pt"])
    df["primaryid"] = pd.to_numeric(df["primaryid"], errors="coerce")
    df = df.dropna(subset=["primaryid"])
    # uppercase and sanitize PTs
    df["pt_upper"] = df["pt"].astype(str).str.upper().str.strip()

    pid = df["primaryid"].astype("Int64")  # hoisted: shared by every groupby below
    grouped = df.groupby(pid)

    # aggregate: join all PTs into a single document per primaryid
    agg_pts = grouped["pt_upper"].apply(" ".join).to_frame("all_reaction_pts")
    agg_pts["reaction_count"] = grouped.size()

    # Match inefficacy terms against each PT on its own. Searching the
    # space-joined text instead would also match across the boundary between two
    # PTs (e.g. "SKIN CONDITION" followed by "WORSENED RASH" reads
    # "... CONDITION WORSENED ...").
    pt_is_ineffective = pd.Series(False, index=df.index)
    for term in INEFFICACY_TERMS:
        pt_is_ineffective |= df["pt_upper"].str.contains(term, regex=False)
    agg_pts["is_ineffective"] = pt_is_ineffective.groupby(pid).max().astype(int)
    return agg_pts


def process_drugs(path):
    """Count MASTER_DRUG rows per primaryid.

    Rows whose primaryid is missing or non-numeric are ignored.

    Returns:
        A frame indexed by nullable-Int64 ``primaryid`` with one column,
        ``drug_count``. Returns an empty DataFrame when the file is missing or
        unreadable.

    NOTE(review): the saved output of Cell 5 mentions chunked MASTER_DRUG
    processing, which this function does not do. That output looks stale — re-run
    the notebook to refresh it.
    """
    ids = safe_read_csv(path, usecols=["primaryid"])
    if ids.empty:
        return pd.DataFrame()
    pid = pd.to_numeric(ids["primaryid"], errors="coerce").dropna().astype("Int64")
    return pid.groupby(pid).size().to_frame("drug_count")


def process_indications(path):
    """Count MASTER_INDI rows per primaryid.

    Rows whose primaryid is missing or non-numeric are ignored.

    Returns:
        A frame indexed by nullable-Int64 ``primaryid`` with one column,
        ``indication_count``. Returns an empty DataFrame when the file is
        missing or unreadable.
    """
    frame = safe_read_csv(path, usecols=["primaryid"])
    if frame.empty:
        return pd.DataFrame()
    keys = pd.to_numeric(frame["primaryid"], errors="coerce")
    keys = keys[keys.notna()].astype("Int64")
    counts = keys.groupby(keys).size()
    return counts.rename("indication_count").to_frame()


def process_therapy(path):
    """Longest reported therapy duration, in days, per primaryid from MASTER_THER.

    ``dur`` is converted to days using its unit code in ``dur_cod`` (stripped and
    upper-cased). Rows with a missing or non-numeric duration, or an
    unrecognised unit, are dropped.

    Returns:
        A frame indexed by nullable-Int64 ``primaryid`` with one float column,
        ``therapy_duration_days``. Returns an empty DataFrame when the file is
        missing or unreadable.
    """
    df = safe_read_csv(path, usecols=["primaryid", "dur", "dur_cod"])
    if df.empty:
        return pd.DataFrame()
    df = df.dropna(subset=["primaryid", "dur", "dur_cod"])
    df["primaryid"] = pd.to_numeric(df["primaryid"], errors="coerce")
    df = df.dropna(subset=["primaryid"])
    df["dur_num"] = pd.to_numeric(df["dur"], errors="coerce")
    df = df.dropna(subset=["dur_num"])
    # Days per unit. The FAERS README lists DUR_COD as YR/MON/WK/DAY/HR/MIN/SEC.
    # Mapping only DY/MO silently dropped every day- and month-based duration.
    # DY/MO are kept so data already using those spellings still maps.
    dur_map = {
        "YR": 365.25,
        "MON": 30.4375,
        "MO": 30.4375,
        "WK": 7,
        "DAY": 1,
        "DY": 1,
        "HR": 1 / 24,
        "MIN": 1 / (24 * 60),
        "SEC": 1 / (24 * 60 * 60),
    }
    df["mult"] = df["dur_cod"].astype(str).str.strip().str.upper().map(dur_map)
    df = df.dropna(subset=["mult"])
    df["duration_in_days"] = df["dur_num"] * df["mult"]
    agg = (
        df.groupby(df["primaryid"].astype("Int64"))["duration_in_days"]
        .max()
        .to_frame("therapy_duration_days")
    )
    return agg


print("Aggregation helper functions ready.")

Aggregation helper functions ready.


In [None]:
# Cell 5 - Build FINAL_MASTER_DATASET.csv by joining UNIQUE_DEMO + aggregated dfs
# NOTE(review): the saved output of this cell mentions chunked MASTER_DRUG
# processing that the visible process_drugs does not do. The outputs appear to
# come from an older version of the code — re-run to refresh them.

# The deduplicated demographics table is the base of the final dataset.
if not os.path.exists(UNIQUE_DEMO_FILE):
    raise FileNotFoundError(
        f"UNIQUE_DEMO.csv not found at {UNIQUE_DEMO_FILE}. Run deduplication first."
    )
df_base = pd.read_csv(UNIQUE_DEMO_FILE, dtype=str, low_memory=False)
# Index by primaryid as nullable Int64; rows with a missing/non-numeric id are dropped.
df_base = (
    df_base.assign(
        primaryid=pd.to_numeric(df_base.get("primaryid"), errors="coerce").astype(
            "Int64"
        )
    )
    .dropna(subset=["primaryid"])
    .set_index("primaryid")
)

print("Loaded UNIQUE_DEMO: rows =", len(df_base))

# One aggregate frame per relational table, each indexed by primaryid.
print("Processing relational tables...")
t0 = time.time()
df_outc = process_outcomes(MASTER_OUTC_FILE)
df_reac = process_reactions(MASTER_REAC_FILE)
df_drug = process_drugs(MASTER_DRUG_FILE)
df_indi = process_indications(MASTER_INDI_FILE)
df_ther = process_therapy(MASTER_THER_FILE)
elapsed_s = round(time.time() - t0, 2)
print("Aggregation done in", elapsed_s, "s")


# Align every aggregate's index with the base table's nullable-Int64 primaryid.
def normalize_index(df):
    """Coerce ``df``'s index to nullable Int64 and drop rows whose index is missing.

    ``None`` and empty frames are returned as-is. Otherwise the index of the
    frame passed in is replaced in place (non-numeric labels become NA), and a
    filtered frame without the NA-labelled rows is returned.
    """
    nothing_to_do = df is None or df.empty
    if not nothing_to_do:
        df.index = pd.to_numeric(df.index, errors="coerce").astype("Int64")
        df = df.loc[df.index.notna()]
    return df


# Aggregates for missing/unreadable tables come back as column-less empty frames
# and contribute nothing to the join, so leave them out.
dfs_to_join = [
    frame
    for frame in (
        normalize_index(df_outc),
        normalize_index(df_drug),
        normalize_index(df_indi),
        normalize_index(df_reac),
        normalize_index(df_ther),
    )
    if frame is not None and len(frame.columns) > 0
]

print("Joining aggregated tables to base demo (left join)...")
df_final = df_base.join(dfs_to_join, how="left") if dfs_to_join else df_base.copy()
# Each aggregate has one row per primaryid, so a left join must keep the row count.
assert len(df_final) == len(df_base), "left join changed the number of base rows"

# Cases with no rows in a relational table get 0 instead of NaN.
for col in [
    "drug_count",
    "indication_count",
    "reaction_count",
    "is_severe_outcome",
    "is_ineffective",
]:
    if col in df_final.columns:
        df_final[col] = df_final[col].fillna(0).astype(int)

# Ensure reaction text exists (string)
if "all_reaction_pts" not in df_final.columns:
    df_final["all_reaction_pts"] = ""
else:
    df_final["all_reaction_pts"] = df_final["all_reaction_pts"].fillna("").astype(str)

# Create is_failure target: severe outcome OR inefficacy. A flag column that is
# absent (its source table was missing) counts as 0 for every case. A scalar
# default would turn the expression into a plain bool, which has no .astype().
_no_flag = pd.Series(0, index=df_final.index)
df_final["is_failure"] = (
    (df_final.get("is_severe_outcome", _no_flag) == 1)
    | (df_final.get("is_ineffective", _no_flag) == 1)
).astype(int)

# Save final to CSV (will be large)
print("Saving FINAL_MASTER_DATASET to:", FINAL_MASTER_FILE)
df_final.to_csv(
    FINAL_MASTER_FILE, index=True
)  # keep primaryid as index in file (first column)
print("Saved. Rows:", len(df_final), "Cols:", len(df_final.columns))

# Print summary counts
print("\n--- Summary: Failure / Outcome counts ---")
if all(
    c in df_final.columns for c in ["is_failure", "is_severe_outcome", "is_ineffective"]
):
    print(
        df_final[["is_failure", "is_severe_outcome", "is_ineffective"]]
        .value_counts()
        .sort_index()
    )
else:
    print(
        "Some columns missing for summary. Available columns:",
        df_final.columns.tolist(),
    )

# quick head
print("\nSample rows (index=primaryid):")
display(df_final.head(5))

Loaded UNIQUE_DEMO: rows = 2818644
Processing relational tables...
Processing D:\ML_Project\data\raw\MASTER_DRUG.csv in chunks...
Processed 16,482,610 unique primaryid entries for drug_count
Aggregation done in 484.37 s
Joining aggregated tables to base demo (left join)...
Saving FINAL_MASTER_DATASET to: D:\ML_Project\data\processed\FINAL_MASTER_DATASET.csv
Saved. Rows: 2818644 Cols: 33

--- Summary: Failure / Outcome counts ---
is_failure  is_severe_outcome  is_ineffective
0           0                  0                 1817724
1           0                  1                  180490
            1                  0                  799917
                               1                   20513
Name: count, dtype: int64

Sample rows (index=primaryid):


Unnamed: 0_level_0,caseid,caseversion,i_f_code,event_dt,mfr_dt,init_fda_dt,fda_dt,rept_cod,auth_num,mfr_num,...,occr_country,fda_dt_parsed,is_severe_outcome,drug_count,indication_count,all_reaction_pts,reaction_count,is_ineffective,therapy_duration_days,is_failure
primaryid,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
33538582,3353858,2,F,20081126,20160225,19990716,20160307,EXP,,125829,...,FR,2016-03-07,1,6,6,ESCHERICHIA SEPSIS ESCHERICHIA SEPSIS,2,0,,1
33568652,3356865,2,F,20080113,20150223,19990716,20160303,PER,,124841,...,FR,2016-03-03,1,10,6,CARDIAC FAILURE CARDIAC FAILURE,2,0,,1
33612122,3361212,2,F,20100329,20150223,19990716,20160303,PER,,125149,...,FR,2016-03-03,0,6,6,LEUKOPENIA LEUKOPENIA,2,0,,0
34795972,3479597,2,F,20071105,20151103,19990318,20160303,PER,,122128,...,FR,2016-03-03,0,4,4,BONE MARROW FAILURE BONE MARROW FAILURE,2,0,,0
34835022,3483502,2,F,20071206,20151023,19990318,20160303,PER,,122073,...,FR,2016-03-03,0,6,6,BONE MARROW FAILURE BONE MARROW FAILURE,2,0,,0


In [None]:
import pandas as pd

# Quick sanity check of the saved dataset. Reuse FINAL_MASTER_FILE from the
# configuration cell: a second hard-coded copy of the path would silently read
# a stale file if PROCESSED_PATH were edited there.
FINAL_MASTER = FINAL_MASTER_FILE
df = pd.read_csv(FINAL_MASTER, usecols=["drug_count", "is_failure"], low_memory=False)

print("Rows:", len(df))
print("drug_count stats:")
print(df["drug_count"].describe())
print("\nFailures:")
print(df["is_failure"].value_counts())

# Reading each quarterly file twice doubles every per-case count, so a
# drug_count that is even for every single case is a red flag worth surfacing.
if len(df) and (df["drug_count"] % 2 == 0).all():
    print(
        "\nWARNING: every drug_count is even - MASTER_* files may contain each "
        "quarter twice; rebuild them and re-run the pipeline."
    )

Rows: 2818644
drug_count stats:
count    2.818644e+06
mean     6.839403e+00
std      9.454566e+00
min      2.000000e+00
25%      2.000000e+00
50%      4.000000e+00
75%      8.000000e+00
max      1.364000e+03
Name: drug_count, dtype: float64

Failures:
is_failure
0    1817724
1    1000920
Name: count, dtype: int64
