In [6]:
# etl/etl_csv_diabetes.py
import pandas as pd
from config import pg_engine

# 1. Read CSV
df = pd.read_csv(r"C:\Users\Admin\Documents\GitHub\Healthcare-data-warehouse\source_data\diabetic_data.csv")

# 2. Basic cleaning & missing handling
# -----------------------------------
# Convert "?" to proper missing values
df.replace("?", None, inplace=True)

# Drop rows that are unusable for the warehouse:
#  - no encounter_id (event)
#  - no patient_nbr (cannot link to patient dimension)
df = df.dropna(subset=["encounter_id", "patient_nbr"])

# Optionally drop obvious duplicates based on encounter_id
df = df.drop_duplicates(subset=["encounter_id"])

# (Optional) ensure numeric types where appropriate
numeric_cols = [
    "time_in_hospital",
    "num_lab_procedures",
    "num_procedures",
    "num_medications",
    "number_outpatient",
    "number_emergency",
    "number_inpatient",
    "number_diagnoses",
]
for col in numeric_cols:
    df[col] = pd.to_numeric(df[col], errors="coerce")

# 3. Derive flags and age_group
df["readmitted_raw"] = df["readmitted"]
df["readmitted_30d_flag"] = df["readmitted"].apply(lambda x: x == "<30")
df["age_group"] = df["age"]   # can be remapped later if needed

# 4. Build dim_patient
dim_patient = (
    df[["patient_nbr", "race", "gender", "age_group", "payer_code"]]
    .drop_duplicates(subset=["patient_nbr"])
    .copy()
)
dim_patient["source_system"] = "CSV_diabetes"

# 5. Build dim_admission
dim_admission = (
    df[["admission_type_id", "discharge_disposition_id", "admission_source_id"]]
    .drop_duplicates()
    .copy()
)
dim_admission.rename(
    columns={
        "admission_type_id": "admission_type",
        "discharge_disposition_id": "discharge_disposition",
        "admission_source_id": "admission_source",
    },
    inplace=True,
)
dim_admission["source_system"] = "CSV_diabetes"

# 6. Build dim_diagnosis
diag_long = (
    pd.melt(
        df[["encounter_id", "diag_1", "diag_2", "diag_3"]],
        id_vars=["encounter_id"],
        value_vars=["diag_1", "diag_2", "diag_3"],
        var_name="diag_position",
        value_name="diagnosis_code",
    )
    .dropna(subset=["diagnosis_code"])
    .drop_duplicates(subset=["diagnosis_code"])
)
diag_long["icd_category"] = diag_long["diagnosis_code"].str.slice(0, 3)
dim_diagnosis = diag_long[["diagnosis_code", "icd_category"]].drop_duplicates()
dim_diagnosis["source_system"] = "CSV_diabetes"

# 7. Load dimensions
with pg_engine.begin() as conn:
    dim_patient.to_sql("dim_patient", con=conn, if_exists="append", index=False)
    dim_admission.to_sql("dim_admission", con=conn, if_exists="append", index=False)
    dim_diagnosis.to_sql("dim_diagnosis", con=conn, if_exists="append", index=False)

# 8. Re-read dimensions with keys to build fact
with pg_engine.connect() as conn:
    dim_patient_db = pd.read_sql("SELECT * FROM dim_patient", conn)
    dim_admission_db = pd.read_sql("SELECT * FROM dim_admission", conn)
    dim_diag_db = pd.read_sql("SELECT * FROM dim_diagnosis", conn)

patient_key_map = dim_patient_db.set_index("patient_nbr")["patient_key"].to_dict()
admission_key_map = (
    dim_admission_db
    .set_index(["admission_type", "discharge_disposition", "admission_source"])["admission_dim_key"]
    .to_dict()
)
diag_key_map = dim_diag_db.set_index("diagnosis_code")["diagnosis_key"].to_dict()

df["patient_key"] = df["patient_nbr"].map(patient_key_map)
df["admission_dim_key"] = df.apply(
    lambda row: admission_key_map.get(
        (row["admission_type_id"],
         row["discharge_disposition_id"],
         row["admission_source_id"])
    ),
    axis=1,
)
df["primary_diagnosis_key"] = df["diag_1"].map(diag_key_map)
df["secondary_diagnosis_key"] = df["diag_2"].map(diag_key_map)
df["tertiary_diagnosis_key"] = df["diag_3"].map(diag_key_map)

fact_cols = [
    "encounter_id",
    "patient_key",
    "admission_dim_key",
    "primary_diagnosis_key",
    "secondary_diagnosis_key",
    "tertiary_diagnosis_key",
    "time_in_hospital",
    "num_lab_procedures",
    "num_procedures",
    "num_medications",
    "number_outpatient",
    "number_emergency",
    "number_inpatient",
    "number_diagnoses",
    "readmitted_raw",
    "readmitted_30d_flag",
    "change",
    "diabetesMed",
]

fact_df = df[fact_cols].copy()

# 9. Drop fact rows that can't join to core dimensions
fact_df = fact_df.dropna(subset=["patient_key", "admission_dim_key"])

fact_df["source_system"] = "CSV_diabetes"

with pg_engine.begin() as conn:
    fact_df.to_sql(
        "fact_hospital_admission_parted",
        con=conn,
        if_exists="append",
        index=False,
    )


IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "dim_patient_patient_nbr_key"
DETAIL:  Key (patient_nbr)=(8222157) already exists.

[SQL: INSERT INTO dim_patient (patient_nbr, race, gender, age_group, payer_code, source_system) VALUES (%(patient_nbr__0)s, %(race__0)s, %(gender__0)s, %(age_group__0)s, %(payer_code__0)s, %(source_system__0)s), (%(patient_nbr__1)s, %(race__1)s, %(gender__ ... 120085 characters truncated ... )s, %(race__999)s, %(gender__999)s, %(age_group__999)s, %(payer_code__999)s, %(source_system__999)s)]
[parameters: {'age_group__0': '[0-10)', 'source_system__0': 'CSV_diabetes', 'gender__0': 'Female', 'payer_code__0': None, 'race__0': 'Caucasian', 'patient_nbr__0': 8222157, 'age_group__1': '[10-20)', 'source_system__1': 'CSV_diabetes', 'gender__1': 'Female', 'payer_code__1': None, 'race__1': 'Caucasian', 'patient_nbr__1': 55629189, 'age_group__2': '[20-30)', 'source_system__2': 'CSV_diabetes', 'gender__2': 'Female', 'payer_code__2': None, 'race__2': 'AfricanAmerican', 'patient_nbr__2': 86047875, 'age_group__3': '[30-40)', 'source_system__3': 'CSV_diabetes', 'gender__3': 'Male', 'payer_code__3': None, 'race__3': 'Caucasian', 'patient_nbr__3': 82442376, 'age_group__4': '[40-50)', 'source_system__4': 'CSV_diabetes', 'gender__4': 'Male', 'payer_code__4': None, 'race__4': 'Caucasian', 'patient_nbr__4': 42519267, 'age_group__5': '[50-60)', 'source_system__5': 'CSV_diabetes', 'gender__5': 'Male', 'payer_code__5': None, 'race__5': 'Caucasian', 'patient_nbr__5': 82637451, 'age_group__6': '[60-70)', 'source_system__6': 'CSV_diabetes', 'gender__6': 'Male', 'payer_code__6': None, 'race__6': 'Caucasian', 'patient_nbr__6': 84259809, 'age_group__7': '[70-80)', 'source_system__7': 'CSV_diabetes', 'gender__7': 'Male', 'payer_code__7': None, 'race__7': 'Caucasian', 'patient_nbr__7': 114882984, 'age_group__8': '[80-90)', 'source_system__8': 'CSV_diabetes' ... 5900 parameters truncated ... 'race__991': 'AfricanAmerican', 'patient_nbr__991': 51672654, 'age_group__992': '[50-60)', 'source_system__992': 'CSV_diabetes', 'gender__992': 'Male', 'payer_code__992': None, 'race__992': 'AfricanAmerican', 'patient_nbr__992': 46993104, 'age_group__993': '[60-70)', 'source_system__993': 'CSV_diabetes', 'gender__993': 'Female', 'payer_code__993': None, 'race__993': 'Caucasian', 'patient_nbr__993': 1734912, 'age_group__994': '[50-60)', 'source_system__994': 'CSV_diabetes', 'gender__994': 'Female', 'payer_code__994': None, 'race__994': 'AfricanAmerican', 'patient_nbr__994': 469692, 'age_group__995': '[70-80)', 'source_system__995': 'CSV_diabetes', 'gender__995': 'Female', 'payer_code__995': None, 'race__995': 'Caucasian', 'patient_nbr__995': 820035, 'age_group__996': '[90-100)', 'source_system__996': 'CSV_diabetes', 'gender__996': 'Female', 'payer_code__996': None, 'race__996': 'Caucasian', 'patient_nbr__996': 634293, 'age_group__997': '[90-100)', 'source_system__997': 'CSV_diabetes', 'gender__997': 'Female', 'payer_code__997': None, 'race__997': 'Caucasian', 'patient_nbr__997': 75593475, 'age_group__998': '[40-50)', 'source_system__998': 'CSV_diabetes', 'gender__998': 'Female', 'payer_code__998': None, 'race__998': 'Caucasian', 'patient_nbr__998': 2490849, 'age_group__999': '[10-20)', 'source_system__999': 'CSV_diabetes', 'gender__999': 'Female', 'payer_code__999': None, 'race__999': 'Caucasian', 'patient_nbr__999': 106622244}]
(Background on this error at: https://sqlalche.me/e/20/gkpj)