In [56]:
import mysql.connector
import pandas as pd
import json
import time

class DataExtractor:
    def __init__(self, config_path):
        with open(config_path, 'r') as f:
            self.db_config = json.load(f)  # load MySQL credentials

    def extract_patient_data(self, hospital_key):
        return self._extract_table(hospital_key, "patients")  # full table

    def extract_transaction_data(self, hospital_key, start_date=None, end_date=None):
        query = "SELECT * FROM transactions"
        if start_date and end_date:
            query += f" WHERE PaidDate BETWEEN '{start_date}' AND '{end_date}'"
        return self._extract_query(hospital_key, query)

    def _extract_table(self, hospital_key, table_name):
        query = f"SELECT * FROM {table_name}"
        return self._extract_query(hospital_key, query)

    def _extract_query(self, hospital_key, query):
        creds = self.db_config[hospital_key]
        try:
            start = time.time()
            conn = mysql.connector.connect(**creds)
            df = pd.read_sql(query, conn)  # read into pandas DataFrame
            conn.close()
            end = time.time()
            print(f"{hospital_key}: Extracted {len(df)} records in {end - start:.2f}s")
            return df
        except Exception as e:
            print(f" {hospital_key}: Extraction failed\nError: {e}")
            return pd.DataFrame()  # return empty DataFrame if error


In [57]:
extractor = DataExtractor("../config/db_config.json")

In [58]:
patients_a = extractor.extract_patient_data("hospital_a")

patients_a.info()
print(patients_a.head(2))

  df = pd.read_sql(query, conn)  # read into pandas DataFrame


hospital_a: Extracted 5000 records in 1.87s
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5000 entries, 0 to 4999
Data columns (total 10 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   PatientID     5000 non-null   object
 1   FirstName     5000 non-null   object
 2   LastName      5000 non-null   object
 3   MiddleName    5000 non-null   object
 4   SSN           5000 non-null   object
 5   PhoneNumber   5000 non-null   object
 6   Gender        5000 non-null   object
 7   DOB           5000 non-null   object
 8   Address       5000 non-null   object
 9   ModifiedDate  5000 non-null   object
dtypes: object(10)
memory usage: 390.8+ KB
      PatientID FirstName LastName MiddleName          SSN  \
0  HOSP1-000001      Rick    Russo          U  188-23-9828   
1  HOSP1-000002   Gregory   Graham          B  730-45-8217   

            PhoneNumber  Gender         DOB  \
0  +1-630-829-7585x0769  Female  1937-06-04   
1    456.746.7289x692

In [59]:
transactions_a = extractor.extract_transaction_data("hospital_a", start_date="2024-01-01", end_date="2024-12-31")
transactions_a.head()

  df = pd.read_sql(query, conn)  # read into pandas DataFrame


hospital_a: Extracted 10000 records in 1.19s


Unnamed: 0,TransactionID,EncounterID,PatientID,ProviderID,DeptID,VisitDate,ServiceDate,PaidDate,VisitType,Amount,...,PaidAmount,ClaimID,PayorID,ProcedureCode,ICDCode,LineOfBusiness,MedicaidID,MedicareID,InsertDate,ModifiedDate
0,TRANS000001,ENC001204,HOSP1-002372,PROV0456,DEPT002,2024-08-02,2024-05-25,2024-06-15,Routine,988.37,...,315.7,CLAIM533365,PAYOR4707,94521,I19.4,Commercial,MEDI24173,MCARE19466,2021-01-16,2021-12-27
1,TRANS000002,ENC000029,HOSP1-002329,PROV0321,DEPT013,2024-05-02,2024-09-14,2024-09-19,Emergency,291.26,...,667.99,CLAIM629724,PAYOR1481,51588,I54.0,Self-Pay,MEDI63110,MCARE97946,2021-02-06,2022-01-26
2,TRANS000003,ENC001088,HOSP1-004636,PROV0405,DEPT007,2024-07-25,2024-03-04,2024-06-13,Routine,91.51,...,595.56,CLAIM305176,PAYOR8415,32053,I35.3,Medicaid,MEDI83622,MCARE77469,2022-05-09,2021-09-22
3,TRANS000004,ENC004215,HOSP1-004064,PROV0463,DEPT019,2024-01-30,2024-02-05,2024-01-21,Follow-up,893.21,...,489.8,CLAIM987878,PAYOR5517,21422,I35.1,Commercial,MEDI89783,MCARE68786,2021-01-30,2023-02-20
4,TRANS000005,ENC006483,HOSP1-003625,PROV0167,DEPT011,2024-09-23,2024-04-11,2024-04-24,Emergency,729.37,...,305.11,CLAIM988945,PAYOR8174,39210,I81.1,Commercial,MEDI52037,MCARE92710,2024-08-11,2022-01-28


In [60]:
patients_b = extractor.extract_patient_data("hospital_b")
patients_b.head(2)

  df = pd.read_sql(query, conn)  # read into pandas DataFrame


hospital_b: Extracted 4985 records in 0.75s


Unnamed: 0,ID,F_Name,L_Name,M_Name,SSN,PhoneNumber,Gender,DOB,Address,ModifiedDate
0,HOSP1-000001,Victoria,Gamble,Q,318-87-5123,4902994299,Male,1994-12-24,"7912 Arthur Loaf Apt. 907, Julieville, AK 38866",2021-06-19
1,HOSP1-000002,Meghan,West,F,110-08-3049,(703)210-5078x2916,Female,2009-04-03,"70600 Destiny Grove Suite 946, Amberside, IL 2...",2024-07-06


In [61]:
transactions_b = extractor.extract_transaction_data("hospital_b")
transactions_b.head(2)

  df = pd.read_sql(query, conn)  # read into pandas DataFrame


hospital_b: Extracted 10000 records in 0.95s


Unnamed: 0,TransactionID,EncounterID,PatientID,ProviderID,DeptID,VisitDate,ServiceDate,PaidDate,VisitType,Amount,...,PaidAmount,ClaimID,PayorID,ProcedureCode,ICDCode,LineOfBusiness,MedicaidID,MedicareID,InsertDate,ModifiedDate
0,TRANS000001,ENC000512,HOSP1-002609,PROV0434,DEPT016,2024-08-31,2024-04-03,2024-02-06,Follow-up,616.87,...,234.57,CLAIM879364,PAYOR9066,93411,I95.4,Commercial,MEDI13625,MCARE38344,2021-10-01,2024-01-15
1,TRANS000002,ENC009236,HOSP1-004031,PROV0382,DEPT017,2024-04-10,2024-08-25,2024-02-01,Follow-up,680.42,...,445.87,CLAIM835197,PAYOR1013,86401,I39.5,Self-Pay,MEDI20311,MCARE53178,2020-03-20,2023-01-29


In [62]:
##  Task 2.2: Claims Data Extraction

In [63]:
import os


# Path to the claims CSVs
CLAIMS_DIR = "../../Datasets/claims" 

REQUIRED_COLUMNS = {
    "ClaimID", "PatientID", 
    "ClaimDate", "ProviderID", 
    "ClaimAmount", "ClaimStatus"
}

def read_claims_csvs(directory_path):
    all_claims = []
    files = [f for f in os.listdir(directory_path) if f.endswith(".csv")]
    
    print(f" Found {len(files)} claims CSV files")

    for file in files:
        file_path = os.path.join(directory_path, file)
        try:
            df = pd.read_csv(file_path)
            #print(df.head(2))
            df_columns = set(df.columns)
            
            # Validate required columns
            #print(df_columns)
            missing = REQUIRED_COLUMNS - df_columns
            #print(missing)
            if missing:
                print(f"⚠️ Skipping {file} due to missing columns: {missing}")
                continue

            # Keep only required columns (ignore extras)
            df = df[list(REQUIRED_COLUMNS)]
            df["source_file"] = file  # useful for tracing origin
            all_claims.append(df)
            print(f"Loaded {file} with {len(df)} records")

        except Exception as e:
            print(f"Error reading {file}: {e}")
    
    # Combine all claims into one DataFrame
    if all_claims:
        combined_df = pd.concat(all_claims, ignore_index=True)
        print(f"\n📊 Total combined claims: {len(combined_df)}")
        return combined_df
    else:
        print("No valid claims files found.")
        return pd.DataFrame()

claims_df = read_claims_csvs(CLAIMS_DIR)


 Found 2 claims CSV files
Loaded hospital1_claim_data.csv with 10000 records
Loaded hospital2_claim_data.csv with 10000 records

📊 Total combined claims: 20000


In [64]:

# Task 2.3: Data Source Integratio

In [98]:
# adding source

patients_a["source"] = "hospital_a"
patients_b["source"] = "hospital_b"

transactions_a["source"] = "hospital_a"
transactions_b["source"] = "hospital_b"

print(patients_a.columns)
print(patients_b.columns)
print(transactions_a.columns)
print(transactions_b.columns)


Index(['PatientID', 'FirstName', 'LastName', 'MiddleName', 'SSN',
       'PhoneNumber', 'Gender', 'DOB', 'Address', 'ModifiedDate', 'source',
       'unified_patient_id'],
      dtype='object')
Index(['ID', 'F_Name', 'L_Name', 'M_Name', 'SSN', 'PhoneNumber', 'Gender',
       'DOB', 'Address', 'ModifiedDate', 'source', 'unified_patient_id'],
      dtype='object')
Index(['TransactionID', 'EncounterID', 'PatientID', 'ProviderID', 'DeptID',
       'VisitDate', 'ServiceDate', 'PaidDate', 'VisitType', 'Amount',
       'AmountType', 'PaidAmount', 'ClaimID', 'PayorID', 'ProcedureCode',
       'ICDCode', 'LineOfBusiness', 'MedicaidID', 'MedicareID', 'InsertDate',
       'ModifiedDate', 'source', 'unified_patient_id'],
      dtype='object')
Index(['TransactionID', 'EncounterID', 'PatientID', 'ProviderID', 'DeptID',
       'VisitDate', 'ServiceDate', 'PaidDate', 'VisitType', 'Amount',
       'AmountType', 'PaidAmount', 'ClaimID', 'PayorID', 'ProcedureCode',
       'ICDCode', 'LineOfBusiness', 'Me

In [99]:
patients_b.rename(columns={'ID':"PatientID",'F_Name': 'FirstName', 'M_Name': 'MiddleName','L_Name':'LastName'}, inplace=True)



In [105]:
print(patients_a.columns)
print(patients_b.columns)
print(transactions_a.columns)
print(transactions_b.columns)

Index(['PatientID', 'FirstName', 'LastName', 'MiddleName', 'SSN',
       'PhoneNumber', 'Gender', 'DOB', 'Address', 'ModifiedDate', 'source'],
      dtype='object')
Index(['PatientID', 'FirstName', 'LastName', 'MiddleName', 'SSN',
       'PhoneNumber', 'Gender', 'DOB', 'Address', 'ModifiedDate', 'source'],
      dtype='object')
Index(['TransactionID', 'EncounterID', 'PatientID', 'ProviderID', 'DeptID',
       'VisitDate', 'ServiceDate', 'PaidDate', 'VisitType', 'Amount',
       'AmountType', 'PaidAmount', 'ClaimID', 'PayorID', 'ProcedureCode',
       'ICDCode', 'LineOfBusiness', 'MedicaidID', 'MedicareID', 'InsertDate',
       'ModifiedDate', 'source'],
      dtype='object')
Index(['TransactionID', 'EncounterID', 'PatientID', 'ProviderID', 'DeptID',
       'VisitDate', 'ServiceDate', 'PaidDate', 'VisitType', 'Amount',
       'AmountType', 'PaidAmount', 'ClaimID', 'PayorID', 'ProcedureCode',
       'ICDCode', 'LineOfBusiness', 'MedicaidID', 'MedicareID', 'InsertDate',
       'ModifiedDa

In [104]:

transactions_a.drop(columns=["unified_patient_id"], inplace=True)
transactions_b.drop(columns=["unified_patient_id"], inplace=True)

In [106]:
#Create unified patient ID across hospitals

patients_a["unified_patient_id"] = "HOSP_A_" + patients_a["PatientID"].astype(str)
patients_b["unified_patient_id"] = "HOSP_B_" + patients_b["PatientID"].astype(str)

patients_b["unified_patient_id"]

0       HOSP_B_HOSP1-000001
1       HOSP_B_HOSP1-000002
2       HOSP_B_HOSP1-000003
3       HOSP_B_HOSP1-000004
4       HOSP_B_HOSP1-000005
               ...         
4980    HOSP_B_HOSP1-004996
4981    HOSP_B_HOSP1-004997
4982    HOSP_B_HOSP1-004998
4983    HOSP_B_HOSP1-004999
4984    HOSP_B_HOSP1-005000
Name: unified_patient_id, Length: 4985, dtype: object

In [107]:
transactions_a = transactions_a.merge(
    patients_a[["PatientID", "unified_patient_id"]],
    on="PatientID",
    how="left"
)
transactions_a.columns


Index(['TransactionID', 'EncounterID', 'PatientID', 'ProviderID', 'DeptID',
       'VisitDate', 'ServiceDate', 'PaidDate', 'VisitType', 'Amount',
       'AmountType', 'PaidAmount', 'ClaimID', 'PayorID', 'ProcedureCode',
       'ICDCode', 'LineOfBusiness', 'MedicaidID', 'MedicareID', 'InsertDate',
       'ModifiedDate', 'source', 'unified_patient_id'],
      dtype='object')

In [108]:
print(transactions_b.columns)

patients_b.columns

Index(['TransactionID', 'EncounterID', 'PatientID', 'ProviderID', 'DeptID',
       'VisitDate', 'ServiceDate', 'PaidDate', 'VisitType', 'Amount',
       'AmountType', 'PaidAmount', 'ClaimID', 'PayorID', 'ProcedureCode',
       'ICDCode', 'LineOfBusiness', 'MedicaidID', 'MedicareID', 'InsertDate',
       'ModifiedDate', 'source', 'ID'],
      dtype='object')


Index(['PatientID', 'FirstName', 'LastName', 'MiddleName', 'SSN',
       'PhoneNumber', 'Gender', 'DOB', 'Address', 'ModifiedDate', 'source',
       'unified_patient_id'],
      dtype='object')

In [110]:
transactions_b = transactions_b.merge(
    patients_b[["PatientID", "unified_patient_id"]],
    on="PatientID",
    how="left"
)
transactions_b.columns

Index(['TransactionID', 'EncounterID', 'PatientID', 'ProviderID', 'DeptID',
       'VisitDate', 'ServiceDate', 'PaidDate', 'VisitType', 'Amount',
       'AmountType', 'PaidAmount', 'ClaimID', 'PayorID', 'ProcedureCode',
       'ICDCode', 'LineOfBusiness', 'MedicaidID', 'MedicareID', 'InsertDate',
       'ModifiedDate', 'source', 'ID', 'unified_patient_id'],
      dtype='object')

In [111]:
# combined patients

combined_patients = pd.concat([patients_a, patients_b], ignore_index=True)
combined_transactions = pd.concat([transactions_a, transactions_b], ignore_index=True)

In [113]:
print(f"Combined patients: {combined_patients.shape[0]} rows")
print(f"Combined transactions: {combined_transactions.shape[0]} rows")

print(combined_patients.head(3))
print(combined_transactions.head(3))


Combined patients: 9985 rows
Combined transactions: 20000 rows
      PatientID FirstName LastName MiddleName          SSN  \
0  HOSP1-000001      Rick    Russo          U  188-23-9828   
1  HOSP1-000002   Gregory   Graham          B  730-45-8217   
2  HOSP1-000003      Mary     Ryan          H  348-14-7947   

            PhoneNumber  Gender         DOB  \
0  +1-630-829-7585x0769  Female  1937-06-04   
1    456.746.7289x69233  Female  1937-06-10   
2          522-501-5461  Female  1926-08-09   

                                      Address ModifiedDate      source  \
0            Unit 0915 Box 7064, DPO AA 82777   2020-05-25  hospital_a   
1   9864 Gibson Islands, Danielside, KY 99809   2021-06-05  hospital_a   
2  6194 Joseph Turnpike, North Juan, OH 46800   2024-09-06  hospital_a   

    unified_patient_id  
0  HOSP_A_HOSP1-000001  
1  HOSP_A_HOSP1-000002  
2  HOSP_A_HOSP1-000003  
  TransactionID EncounterID     PatientID ProviderID   DeptID   VisitDate  \
0   TRANS000001   ENC0012

In [112]:
combined_patients.to_csv("../../datasets/combined_patients.csv", index=False)
combined_transactions.to_csv("../../datasets/combined_transactions.csv", index=False)