Importation of libraries

In [1]:
import pandas as pd 
from datetime import datetime

In [2]:
patients_cols = ['patient_id','first_name','last_name','date_of_birth','gender','mrn']
specialties_cols = ['specialty_id','specialty_name','specialty_code']
departments_cols = ['department_id','department_name','floor','capacity']
providers_cols = ['provider_id','first_name','last_name','credential','specialty_id','department_id']
encounters_cols = ['encounter_id','patient_id','provider_id','encounter_type','encounter_date','discharge_date','department_id']
diagnoses_cols = ['diagnosis_id','icd10_code','icd10_description']
enc_diag_cols = ['encounter_diagnosis_id','encounter_id','diagnosis_id','diagnosis_sequence']
procedures_cols = ['procedure_id','cpt_code','cpt_description']
enc_proc_cols = ['encounter_procedure_id','encounter_id','procedure_id','procedure_date']
billing_cols = ['billing_id','encounter_id','claim_amount','allowed_amount','claim_date','claim_status']

In [3]:
OLAP_SCHEMA = {
    "dim_date": [
        "date_key", "calendar_date", "year", "month", "quarter"
    ],
    "dim_patient": [
        "patient_key", "patient_id", "full_name", "gender", "age_group", "mrn"
    ],
    "dim_specialty": [
        "specialty_key", "specialty_id", "specialty_name"
    ],
    "dim_department": [
        "department_key", "department_id", "department_name"
    ],
    "dim_encounter_type": [
        "encounter_type_key", "type_name"
    ],
    "dim_diagnosis": [
        "diagnosis_key", "icd10_code", "description"
    ],
    "dim_procedure": [
        "procedure_key", "cpt_code", "description"
    ],
    "fact_encounters": [
        "encounter_key", "date_key", "patient_key",
        "specialty_key", "department_key", "encounter_type_key",
        "encounter_count", "total_allowed_amount", "total_claim_amount",
        "diagnosis_count", "procedure_count", "length_of_stay"
    ],
    "bridge_encounter_diagnoses": [
        "encounter_key", "diagnosis_key"
    ],
    "bridge_encounter_procedures": [
        "encounter_key", "procedure_key"
    ]
}


In [4]:
def validate_schema(df, table_name):
    expected = OLAP_SCHEMA[table_name]
    actual = list(df.columns)

    print(f"Table: {table_name}")
    print("Expected:", expected)
    print("Actual:  ", actual)

    if expected == actual:
        print("Schema MATCHES")
    else:
        print("Schema MISMATCH")

        missing = [c for c in expected if c not in actual]
        extra = [c for c in actual if c not in expected]
        wrong_order = expected != actual

        if missing:
            print("Missing columns:", missing)
        if extra:
            print("Extra columns:", extra)
        if wrong_order:
            print("  Column order mismatch")


In [5]:
def load_csv(path, columns):
    return pd.read_csv(path, header=None, names=columns)


def export_csv(df, path, name):
    df.to_csv(path + name, index=False, header=False, sep=',', na_rep='\\N')


In [6]:
def create_dim_date(encounters_df):
    encounters_df['encounter_date'] = pd.to_datetime(encounters_df['encounter_date'])
    encounters_df['discharge_date'] = pd.to_datetime(encounters_df['discharge_date'])

    all_dates = pd.concat([
        encounters_df['encounter_date'],
        encounters_df['discharge_date']
    ]).dropna().unique()

    dim_date_df = pd.DataFrame({
        'calendar_date': pd.to_datetime(all_dates)
    })

    dim_date_df['calendar_date'] = dim_date_df['calendar_date'].dt.date
    dim_date_df = dim_date_df.drop_duplicates(subset=['calendar_date'])
    dim_date_df['calendar_date'] = pd.to_datetime(dim_date_df['calendar_date'])

    dim_date_df['date_key'] = dim_date_df['calendar_date'].dt.strftime('%Y%m%d').astype(int)
    dim_date_df['year'] = dim_date_df['calendar_date'].dt.year
    dim_date_df['month'] = dim_date_df['calendar_date'].dt.month
    dim_date_df['quarter'] = dim_date_df['calendar_date'].dt.quarter

    dim_date_df = dim_date_df.sort_values('calendar_date').reset_index(drop=True)
    return dim_date_df[
        ['date_key', 'calendar_date', 'year', 'month', 'quarter']
    ]


In [7]:
def create_dim_patient(patients_df):
    df = patients_df.copy()
    df['full_name'] = df['last_name'] + ' ' + df['first_name']

    current_year = datetime.now().year
    df['age'] = current_year - pd.to_datetime(df['date_of_birth']).dt.year

    df['age_group'] = pd.cut(
        df['age'],
        bins=[0,18,30,45,60,75,200],
        labels=['0-17','18-30','31-45','46-60','61-75','76+']
    )

    df = df.sort_values('patient_id').reset_index(drop=True)
    df['patient_key'] = df.index + 1

    return df[['patient_key','patient_id','full_name','gender','age_group','mrn']]


def create_dim_specialty(specialties_df):
    df = specialties_df.sort_values('specialty_id').reset_index(drop=True)
    df['specialty_key'] = df.index + 1
    return df[['specialty_key','specialty_id','specialty_name']]


def create_dim_department(departments_df):
    df = departments_df.sort_values('department_id').reset_index(drop=True)
    df['department_key'] = df.index + 1
    return df[['department_key','department_id','department_name']]


def create_dim_encounter_type(encounters_df):
    df = encounters_df[['encounter_type']].drop_duplicates()
    df = df.sort_values('encounter_type').reset_index(drop=True)
    df['encounter_type_key'] = df.index + 1
    df = df.rename(columns={'encounter_type':'type_name'})
    return df[['encounter_type_key','type_name']]


def create_dim_diagnosis(diagnoses_df):
    df = diagnoses_df.rename(columns={'icd10_description':'description'})
    df = df.sort_values('diagnosis_id').reset_index(drop=True)
    df['diagnosis_key'] = df.index + 1
    return df[['diagnosis_key','diagnosis_id','icd10_code','description']]


def create_dim_procedure(procedures_df):
    df = procedures_df.rename(columns={'cpt_description':'description'})
    df = df.sort_values('procedure_id').reset_index(drop=True)
    df['procedure_key'] = df.index + 1
    return df[['procedure_key','procedure_id','cpt_code','description']]

In [8]:
def create_fact_encounters(encounters_df, dim_date_df, dim_patient_df,
                           dim_specialty_df, dim_department_df,
                           dim_enc_type_df, providers_df,
                           billing_df, enc_diag_df, enc_proc_df):

    df = encounters_df.copy()
    df['encounter_date_only'] = pd.to_datetime(df['encounter_date']).dt.date

    # Date
    dim_date_lookup = dim_date_df.copy()
    dim_date_lookup['calendar_date_only'] = pd.to_datetime(dim_date_lookup['calendar_date']).dt.date
    df = df.merge(dim_date_lookup[['calendar_date_only','date_key']],
                  left_on='encounter_date_only',
                  right_on='calendar_date_only',
                  how='left')

    # Patient
    df = df.merge(dim_patient_df[['patient_id','patient_key']], on='patient_id', how='left')

    # Provider → Specialty
    df = df.merge(providers_df[['provider_id','specialty_id']], on='provider_id', how='left')
    df = df.merge(dim_specialty_df[['specialty_id','specialty_key']], on='specialty_id', how='left')

    # Department
    df = df.merge(dim_department_df[['department_id','department_key']], on='department_id', how='left')

    # Encounter type
    df = df.merge(dim_enc_type_df,
                  left_on='encounter_type',
                  right_on='type_name',
                  how='left')

    # Measures
    df['encounter_count'] = 1
    df['total_allowed_amount'] = df['encounter_id'].map(
        billing_df.set_index('encounter_id')['allowed_amount']
    ).fillna(0)

    df['total_claim_amount'] = df['encounter_id'].map(
        billing_df.set_index('encounter_id')['claim_amount']
    ).fillna(0)

    df['diagnosis_count'] = df['encounter_id'].map(
        enc_diag_df.groupby('encounter_id').size()
    ).fillna(0).astype(int)

    df['procedure_count'] = df['encounter_id'].map(
        enc_proc_df.groupby('encounter_id').size()
    ).fillna(0).astype(int)

    df['length_of_stay'] = (
        pd.to_datetime(df['discharge_date']) -
        pd.to_datetime(df['encounter_date'])
    ).dt.days.fillna(0).astype(int)

    df = df.sort_values('encounter_id').reset_index(drop=True)
    df['encounter_key'] = df.index + 1

    return df[['encounter_key','date_key','patient_key','specialty_key',
               'department_key','encounter_type_key',
               'encounter_count','total_allowed_amount','total_claim_amount',
               'diagnosis_count','procedure_count','length_of_stay','encounter_id']]

Transform for OLAP

In [9]:
def create_bridge(enc_diag_df, enc_proc_df,
                  fact_enc_df, dim_diagnosis_df, dim_procedure_df):

    # Diagnosis bridge
    diag_map = dim_diagnosis_df[['diagnosis_id','diagnosis_key']]
    bridge_diag = enc_diag_df[['encounter_id','diagnosis_id']] \
        .merge(diag_map, on='diagnosis_id', how='left') \
        .merge(fact_enc_df[['encounter_id','encounter_key']], on='encounter_id', how='left')

    bridge_diag = bridge_diag[['encounter_key','diagnosis_key']]

    # Procedure bridge
    proc_map = dim_procedure_df[['procedure_id','procedure_key']]
    bridge_proc = enc_proc_df[['encounter_id','procedure_id']] \
        .merge(proc_map, on='procedure_id', how='left') \
        .merge(fact_enc_df[['encounter_id','encounter_key']], on='encounter_id', how='left')

    bridge_proc = bridge_proc[['encounter_key','procedure_key']]

    return bridge_diag, bridge_proc



In [10]:
def main():
    base_path = '../../data/oltp/'
    output_path = '../../data/olap/'

    # -------------------------
    # Load OLTP CSVs
    # -------------------------
    patients_df = load_csv(base_path + 'patients.csv', patients_cols)
    specialties_df = load_csv(base_path + 'specialties.csv', specialties_cols)
    departments_df = load_csv(base_path + 'departments.csv', departments_cols)
    providers_df = load_csv(base_path + 'providers.csv', providers_cols)
    encounters_df = load_csv(base_path + 'encounters.csv', encounters_cols)
    diagnoses_df = load_csv(base_path + 'diagnoses.csv', diagnoses_cols)
    enc_diag_df = load_csv(base_path + 'encounter_diagnoses.csv', enc_diag_cols)
    procedures_df = load_csv(base_path + 'procedures.csv', procedures_cols)
    enc_proc_df = load_csv(base_path + 'encounter_procedures.csv', enc_proc_cols)
    billing_df = load_csv(base_path + 'billing.csv', billing_cols)

    # -------------------------
    # Build Dimensions
    # -------------------------
    dim_date = create_dim_date(encounters_df)
    dim_patient = create_dim_patient(patients_df)
    dim_specialty = create_dim_specialty(specialties_df)
    dim_department = create_dim_department(departments_df)
    dim_enc_type = create_dim_encounter_type(encounters_df)
    dim_diagnosis = create_dim_diagnosis(diagnoses_df)
    dim_procedure = create_dim_procedure(procedures_df)

    # -------------------------
    # Build Fact Table
    # -------------------------
    fact_enc = create_fact_encounters(
        encounters_df,
        dim_date,
        dim_patient,
        dim_specialty,
        dim_department,
        dim_enc_type,
        providers_df,
        billing_df,
        enc_diag_df,
        enc_proc_df
    )

    # -------------------------
    # Build Bridge Tables
    # -------------------------
    bridge_diag, bridge_proc = create_bridge(
        enc_diag_df,
        enc_proc_df,
        fact_enc,
        dim_diagnosis,
        dim_procedure
    )

    # -------------------------
    # Schema Validation (NO EXPORTS YET)
    # -------------------------
    validate_schema(dim_date, "dim_date")
    validate_schema(dim_patient, "dim_patient")
    validate_schema(dim_specialty, "dim_specialty")
    validate_schema(dim_department, "dim_department")
    validate_schema(dim_enc_type, "dim_encounter_type")
    validate_schema(
        dim_diagnosis[["diagnosis_key", "icd10_code", "description"]],
        "dim_diagnosis"
    )
    validate_schema(
        dim_procedure[["procedure_key", "cpt_code", "description"]],
        "dim_procedure"
    )
    validate_schema(
        fact_enc.drop(columns=["encounter_id"]),
        "fact_encounters"
    )
    validate_schema(bridge_diag, "bridge_encounter_diagnoses")
    validate_schema(bridge_proc, "bridge_encounter_procedures")
    
    export_csv(dim_date, output_path, 'dim_date.csv')
    export_csv(dim_patient, output_path, 'dim_patient.csv')
    export_csv(dim_specialty, output_path, 'dim_specialty.csv')
    export_csv(dim_department, output_path, 'dim_department.csv')
    export_csv(dim_enc_type, output_path, 'dim_encounter_type.csv')
    export_csv(dim_diagnosis, output_path, 'dim_diagnosis.csv')
    export_csv(dim_procedure, output_path, 'dim_procedure.csv')
    export_csv(fact_enc.drop(columns=['encounter_id']),output_path,'fact_encounters.csv')
    export_csv(bridge_diag, output_path, 'bridge_encounter_diagnoses.csv')
    export_csv(bridge_proc, output_path, 'bridge_encounter_procedures.csv')
    
    print("Schema validation complete — exports are currently disabled")


if __name__ == "__main__":
    main()


Table: dim_date
Expected: ['date_key', 'calendar_date', 'year', 'month', 'quarter']
Actual:   ['date_key', 'calendar_date', 'year', 'month', 'quarter']
Schema MATCHES
Table: dim_patient
Expected: ['patient_key', 'patient_id', 'full_name', 'gender', 'age_group', 'mrn']
Actual:   ['patient_key', 'patient_id', 'full_name', 'gender', 'age_group', 'mrn']
Schema MATCHES
Table: dim_specialty
Expected: ['specialty_key', 'specialty_id', 'specialty_name']
Actual:   ['specialty_key', 'specialty_id', 'specialty_name']
Schema MATCHES
Table: dim_department
Expected: ['department_key', 'department_id', 'department_name']
Actual:   ['department_key', 'department_id', 'department_name']
Schema MATCHES
Table: dim_encounter_type
Expected: ['encounter_type_key', 'type_name']
Actual:   ['encounter_type_key', 'type_name']
Schema MATCHES
Table: dim_diagnosis
Expected: ['diagnosis_key', 'icd10_code', 'description']
Actual:   ['diagnosis_key', 'icd10_code', 'description']
Schema MATCHES
Table: dim_procedure
Ex