In [14]:
# Install or Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id 
import pandas as pd
from datetime import datetime, timedelta
from sqlalchemy import create_engine
import sqlite3
import psycopg2
from dotenv import load_dotenv
from pyspark.sql.functions import col, to_date, to_timestamp
import os

In [15]:
## set jave home to avoid java running with the previous version
os.environ['JAVA_HOME'] = r'C:\JAVA8'

In [16]:
import os
os.environ["PYSPARK_ALLOW_INSECURE_GATEWAY"] = "1"

In [17]:
# initialize my spark seesion with allowed security
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("GWIHR_PROJECT") \
    .config("spark.jars", "postgresql-42.7.5.jar") \
    .getOrCreate()
spark

In [18]:
# Extract this historical data into spark dataframe
df = spark.read.csv(r'raw_data\SI-RADET.csv', header=True, inferSchema=True)

df.show(5)

+------+----------+-----------+--------------------+---------------+--------------------+---------------+--------------+-----+---+--------------------+---------------------+----------------+------+-----------+-----------------+----------------+------------+--------------+-------------+--------------+-----------+--------------------+--------------+---------------+-------------------+---------------+-------------------+-----------+--------------------+--------------+---------------+-------------------+----+----------------+----+-----------------+----------------------+----------------------+-----------------------------+---------------------+-----------+-----------+------------+--------------------+-----------------+------------------+-----------------+--------------------+--------------------------+---------------------+--------------------+------------------+-----------------+-----------------------+-----------------------+---------------+----------------+-----------+--------------+---

### Data cleaning and Transformation

In [19]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date
from pyspark.sql.types import DateType

# Initialize Spark Session
spark = SparkSession.builder.appName("DateConversion").getOrCreate()

# Define the list of date columns
date_columns = [
    "DateTransferredIn", "ARTStartDate", "LastPickupDate", "LastVisitDate", 
    "InitialCD4CountDate", "CurrentCD4CountDate", "LastEACDate", "PregnancyStatusDate", 
    "EDD", "LastDeliveryDate", "LMP", "ViralLoadEncounterDate", "ViralLoadSampleCollectionDate", 
    "ViralLoadReportedDate", "ResultDate", "AssayDate", "ApprovalDate", "PatientOutcomeDate", 
    "DateReturnedToCare", "DateOfTermination", "PharmacyNextAppointment", "ClinicalNextAppointment", 
    "DateOfBirth", "BiometricCaptureDate", "CurrentWeightDate", "TBStatusDate", "BaselineINHStartDate", 
    "BaselineINHStopDate", "CurrentINHStartDate", "CurrentINHOutcomeDate", "LastINHDispensedDate", 
    "BaselineTBTreatmentStartDate", "BaselineTBTreatmentStopDate", "LastViralLoadSampleCollectionFormDate", 
    "LastSampleTakenDate", "OTZEnrollmentDate", "OTZOutcomeDate", "EnrollmentDate", "InitialFirstLineRegimenDate", 
    "InitialSecondLineRegimenDate", "LastPickupDatePreviousQuarter", "PatientOutcomeDatePreviousQuarter", "RecaptureDate"
]

# Load Data (assuming df is already loaded)
# df = spark.read.csv("data.csv", header=True, inferSchema=True)

# Convert necessary columns to DateType
for col_name in date_columns:
    df = df.withColumn(col_name, to_date(col(col_name), 'yyyy-MM-dd'))

# Show schema after transformation
df.printSchema()



root
 |-- State: string (nullable = true)
 |-- LGA: string (nullable = true)
 |-- DatimCode: string (nullable = true)
 |-- FacilityName: string (nullable = true)
 |-- PatientUniqueID: string (nullable = true)
 |-- PatientHospitalNo: string (nullable = true)
 |-- ANCNoIdentifier: string (nullable = true)
 |-- ANCNoConceptID: string (nullable = true)
 |-- HTSNo: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- AgeAtStartOfARTYears: integer (nullable = true)
 |-- AgeAtStartOfARTMonths: string (nullable = true)
 |-- CareEntryPoint: string (nullable = true)
 |-- KPType: string (nullable = true)
 |-- MonthsOnART: integer (nullable = true)
 |-- DateTransferredIn: date (nullable = true)
 |-- TransferInStatus: string (nullable = true)
 |-- ARTStartDate: date (nullable = true)
 |-- LastPickupDate: date (nullable = true)
 |-- LastVisitDate: date (nullable = true)
 |-- DaysOfARVRefil: integer (nullable = true)
 |-- PillBalance: integer (nullable = true)
 |-- InitialRegimenLine: str

In [8]:
# Data cleaning and Transformation(to check missing values and nulls in the column)
for column in df.columns:
    print(column,'Nulls:', df.filter(df[column].isNull()).count())

State Nulls: 0
LGA Nulls: 0
DatimCode Nulls: 0
FacilityName Nulls: 0
PatientUniqueID Nulls: 0
PatientHospitalNo Nulls: 73
ANCNoIdentifier Nulls: 22277
ANCNoConceptID Nulls: 22277
HTSNo Nulls: 8720
Sex Nulls: 0
AgeAtStartOfARTYears Nulls: 3
AgeAtStartOfARTMonths Nulls: 22277
CareEntryPoint Nulls: 2404
KPType Nulls: 2075
MonthsOnART Nulls: 3
DateTransferredIn Nulls: 22277
TransferInStatus Nulls: 21739
ARTStartDate Nulls: 22277
LastPickupDate Nulls: 22277
LastVisitDate Nulls: 22277
DaysOfARVRefil Nulls: 4
PillBalance Nulls: 4378
InitialRegimenLine Nulls: 3
InitialRegimen Nulls: 3
InitialCD4Count Nulls: 22062
InitialCD4CountDate Nulls: 22277
CurrentCD4Count Nulls: 22062
CurrentCD4CountDate Nulls: 22277
LastEACDate Nulls: 22277
CurrentRegimenLine Nulls: 3
CurrentRegimen Nulls: 3
PregnancyStatus Nulls: 6141
PregnancyStatusDate Nulls: 22277
EDD Nulls: 22277
LastDeliveryDate Nulls: 22277
LMP Nulls: 22277
GestationAgeWeeks Nulls: 22277
CurrentViralLoad(c/ml) Nulls: 1507
ViralLoadEncounterDate N

In [20]:
from pyspark.sql.functions import lit

# Fill numeric columns with 0
numeric_cols = ["PatientHospitalNo", "AgeAtStartOfARTMonths", "MonthsOnART", "DaysOfARVRefil", "PillBalance", "CurrentViralLoad(c/ml)", "CurrentWeight(Kg)", "QuantityOfARVDispensedLastVisit", "RecaptureCount"]
df_clean = df.fillna(0, subset=numeric_cols)

# Fill categorical columns with "Unknown"
categorical_cols = ["CareEntryPoint", "KPType", "PregnancyStatus", "DispensingModality", "FacilityDispensingModality", "DDDDispensingModality", "MMDType", "TBStatus", "CurrentINHOutcome", "PatientOutcome", "ARTStatusPreviousQuarter", "RegistrationPhoneNo", "NextofKinPhoneNo", "TreatmentSupporterPhoneNo", "MarkAsDeseased"]
df_clean= df.fillna("Unknown", subset=categorical_cols)

# Fill date columns with '1900-01-01'
date_cols = ["DateTransferredIn", "TransferInStatus", "ARTStartDate", "LastPickupDate", "PregnancyStatusDate", "EDD", "InitialCD4CountDate", "CurrentCD4CountDate", "ViralLoadEncounterDate", "ResultDate", "PatientOutcomeDate", "PharmacyNextAppointment", "ClinicalNextAppointment", "BaselineINHStartDate", "RecaptureDate"]
df_clean = df.fillna("1900-01-01", subset=date_cols)


In [21]:
# confirm that you have sorted missing values
for column in df_clean.columns:
    print(column,'Nulls:', df_clean.filter(df_clean[column].isNull()).count())

State Nulls: 0
LGA Nulls: 0
DatimCode Nulls: 0
FacilityName Nulls: 0
PatientUniqueID Nulls: 0
PatientHospitalNo Nulls: 73
ANCNoIdentifier Nulls: 22277
ANCNoConceptID Nulls: 22277
HTSNo Nulls: 8720
Sex Nulls: 0
AgeAtStartOfARTYears Nulls: 3
AgeAtStartOfARTMonths Nulls: 22277
CareEntryPoint Nulls: 2404
KPType Nulls: 2075
MonthsOnART Nulls: 3
DateTransferredIn Nulls: 22277
TransferInStatus Nulls: 0
ARTStartDate Nulls: 22277
LastPickupDate Nulls: 22277
LastVisitDate Nulls: 22277
DaysOfARVRefil Nulls: 4
PillBalance Nulls: 4378
InitialRegimenLine Nulls: 3
InitialRegimen Nulls: 3
InitialCD4Count Nulls: 22062
InitialCD4CountDate Nulls: 22277
CurrentCD4Count Nulls: 22062
CurrentCD4CountDate Nulls: 22277
LastEACDate Nulls: 22277
CurrentRegimenLine Nulls: 3
CurrentRegimen Nulls: 3
PregnancyStatus Nulls: 6141
PregnancyStatusDate Nulls: 22277
EDD Nulls: 22277
LastDeliveryDate Nulls: 22277
LMP Nulls: 22277
GestationAgeWeeks Nulls: 22277
CurrentViralLoad(c/ml) Nulls: 1507
ViralLoadEncounterDate Nulls

In [22]:
# rename columns for title clarity
df = df.withColumnRenamed("PatientUniqueID", "Patient_Unique_code") \
    .withColumnRenamed("PatientHospitalNo", "Patient_Hospital_No") \
    .withColumnRenamed("FacilityName", "Facility_Name") \
    .withColumnRenamed("HTSNo", "HTS_No") \
    .withColumnRenamed("AgeAtStartOfARTYears", "Age_At_Start_Of_ART_Years") \
    .withColumnRenamed("AgeAtStartOfARTMonths", "Age_At_Start_Of_ART_Months") \
    .withColumnRenamed("CurrentAgeYears", "Current_Age_Years") \
    .withColumnRenamed("CurrentAgeMonths", "Current_Age_Months") \
    .withColumnRenamed("CareEntryPoint", "Care_Entry_Point") \
    .withColumnRenamed("KPType", "KP_Type") \
    .withColumnRenamed("BiometricCaptured", "Biometric_Captured") \
    .withColumnRenamed("BiometricCaptureDate", "Biometric_Capture_Date") \
    .withColumnRenamed("ValidCapture", "Valid_Capture") \
    .withColumnRenamed("MarkAsDeseased", "Mark_As_Deceased") \
    .withColumnRenamed("MarkAsDeseasedDeathDate", "Mark_As_Deceased_Death_Date") \
    .withColumnRenamed("RegistrationPhoneNo", "Registration_Phone_No") \
    .withColumnRenamed("NextofKinPhoneNo", "Next_Of_Kin_Phone_No") \
    .withColumnRenamed("TreatmentSupporterPhoneNo", "Treatment_Supporter_Phone_No") \
    .withColumnRenamed("DatimCode", "Datim_Code") \
    .withColumnRenamed("FacilityName", "Facility_Name") \
    .withColumnRenamed("InitialRegimenLine", "Initial_Regimen_Line") \
    .withColumnRenamed("InitialRegimen", "Initial_Regimen") \
    .withColumnRenamed("InitialFirstLineRegimen", "Initial_FirstLine_Regimen") \
    .withColumnRenamed("InitialFirstLineRegimenDate", "Initial_FirstLine_Regimen_Date") \
    .withColumnRenamed("InitialSecondLineRegimen", "Initial_SecondLine_Regimen") \
    .withColumnRenamed("InitialSecondLineRegimenDate", "Initial_SecondLine_Regimen_Date") \
    .withColumnRenamed("CurrentRegimenLine", "Current_Regimen_Line") \
    .withColumnRenamed("CurrentRegimen", "Current_Regimen") \
    .withColumnRenamed("MonthsOnART", "Months_On_ART") \
    .withColumnRenamed("ARTStartDate", "ART_Start_Date") \
    .withColumnRenamed("CurrentARTStatus", "Current_ART_Status") \
    .withColumnRenamed("DispensingModality", "Dispensing_Modality")\
    .withColumnRenamed("FacilityDispensingModality", "Facility_Dispensing_Modality")\
    .withColumnRenamed("DDDDispensingModality", "DDD_Dispensing_Modality")\
    .withColumnRenamed("MMDType", "MMD_Type")\
    .withColumnRenamed("CurrentARTStatusWithPillBalance", "Current_ART_Status_With_PillBalance") \
    .withColumnRenamed("CurrentViralLoad(c/ml)", "Current_Viral_Load(cml)") \
    .withColumnRenamed("ViralLoadEncounterDate", "Viral_Load_Encounter_Date") \
    .withColumnRenamed("ViralLoadSampleCollectionDate", "Viral_Load_Sample_Collection_Date") \
    .withColumnRenamed("ViralLoadReportedDate", "Viral_Load_Reported_Date") \
    .withColumnRenamed("ResultDate", "Result_Date") \
    .withColumnRenamed("AssayDate", "Assay_Date") \
    .withColumnRenamed("ApprovalDate", "Approval_Date") \
    .withColumnRenamed("ViralLoadIndication", "Viral_Load_Indication") \
    .withColumnRenamed("LastViralLoadSampleCollectionFormDate", "Last_Viral_Load_Sample_CollectionForm_Date") \
    .withColumnRenamed("LastSampleTakenDate", "Last_Sample_Taken_Date") \
    .withColumnRenamed("PregnancyStatus", "Pregnancy_Status") \
    .withColumnRenamed("PregnancyStatusDate", "Pregnancy_Status_Date") \
    .withColumnRenamed("EDD", "EDD_(Estimated Delivery Date)") \
    .withColumnRenamed("LastDeliveryDate", "Last_Delivery_Date") \
    .withColumnRenamed("GestationAgeWeeks", "Gestation_Age_Weeks") \
    .withColumnRenamed("LastPickupDate", "Last_Pickup_Date") \
    .withColumnRenamed("LastVisitDate", "Last_Visit_Date") \
    .withColumnRenamed("DaysOfARVRefil", "Days_Of_ARV_Refill") \
    .withColumnRenamed("PillBalance", "Pill_Balance") \
    .withColumnRenamed("PharmacyNextAppointment", "Pharmacy_Next_Appointment") \
    .withColumnRenamed("ClinicalNextAppointment", "Clinical_Next_Appointment") \
    .withColumnRenamed("LastEACDate", "Last_EAC_Date") \
    .withColumnRenamed("OTZEnrollmentDate", "OTZ_Enrollment_Date") \
    .withColumnRenamed("OTZOutcomeDate", "OTZ_Outcome_Date") \
    .withColumnRenamed("EnrollmentDate", "Enrollment_Date") \
    .withColumnRenamed("DateTransferredIn", "Date_Transferred_In") \
    .withColumnRenamed("TransferInStatus", "Transfer_In_Status") \
    .withColumnRenamed("DateReturnedToCare", "Date_Returned_To_Care") \
    .withColumnRenamed("DateOfTermination", "Date_Of_Termination") \
    .withColumnRenamed("RecaptureDate", "Recapture_Date") \
    .withColumnRenamed("RecaptureCount", "Recapture_Count") \
    .withColumnRenamed("CurrentWeight(Kg)", "Current_Weight(Kg)") \
    .withColumnRenamed("CurrentWeightDate", "Current_Weight_Date") \
    .withColumnRenamed("TBStatus", "TB_Status") \
    .withColumnRenamed("TBStatusDate", "TB_Status_Date") \
    .withColumnRenamed("PatientOutcome", "Patient_Outcome") \
    .withColumnRenamed("PatientOutcomeDate", "Patient_Outcome_Date") \
    .withColumnRenamed("BaselineINHStartDate", "Baseline_INH_StartDate") \
    .withColumnRenamed("BaselineINHStopDate", "Baseline_INH_StopDate") \
    .withColumnRenamed("CurrentINHStartDate", "Current_INH_StartDate") \
    .withColumnRenamed("CurrentINHOutcome", "Current_INH_Outcome") \
    .withColumnRenamed("CurrentINHOutcomeDate", "Current_INH_Outcome_Date") \
    .withColumnRenamed("LastINHDispensedDate", "Last_INH_Dispensed_Date") \
    .withColumnRenamed("BaselineTBTreatmentStartDate", "Baseline_TB_Treatment_StartDate") \
    .withColumnRenamed("BaselineTBTreatmentStopDate", "Baseline_TB_Treatment_StopDate") \
    .withColumnRenamed("LastViralLoadSampleCollectionFormDate", "Last_Viral_Load_Sample_CollectionForm_Date") \
    .withColumnRenamed("LastSampleTakenDate", "Last_Sample_Taken_Date") \
    .withColumnRenamed("OTZEnrollmentDate", "OTZ_Enrollment_Date") \
    .withColumnRenamed("OTZOutcomeDate", "OTZ_Outcome_Date") \
    .withColumnRenamed("EnrollmentDate", "Enrollment_Date") \
    .withColumnRenamed("InitialFirstLineRegimen", "Initial_FirstLine_Regimen") \
    .withColumnRenamed("InitialFirstLineRegimenDate", "Initial_FirstLine_Regimen_Date") \
    .withColumnRenamed("InitialSecondLineRegimen", "Initial_SecondLine_Regimen") \
    .withColumnRenamed("InitialSecondLineRegimenDate", "Initial_SecondLine_Regimen_Date") \
    .withColumnRenamed("LastPickupDatePreviousQuarter", "Last_Pickup_Date_Previous_Quarter") \
    .withColumnRenamed("DrugDurationPreviousQuarter", "Drug_Duration_Previous_Quarter") \
    .withColumnRenamed("PatientOutcomePreviousQuarter", "Patient_Outcome_Previous_Quarter") \
    .withColumnRenamed("PatientOutcomeDatePreviousQuarter", "Patient_Outcome_Date_Previous_Quarter") \
    .withColumnRenamed("ARTStatusPreviousQuarter", "ART_Status_Previous_Quarter") \
    .withColumnRenamed("QuantityOfARVDispensedLastVisit", "Quantity_Of_ARV_Dispensed_LastVisit") \
    .withColumnRenamed("FrequencyOfARVDispensedLastVisit", "Frequency_Of_ARV_Dispensed_LastVisit") \
    .withColumnRenamed("CurrentARTStatusWithPillBalance", "Current_ART_Status_With_Pill_Balance") \
    .withColumnRenamed("RecaptureDate", "Recapture_Date") \
    .withColumnRenamed("RecaptureCount", "Recapture_Count")


In [23]:
# copy dataframe to avoid altering the original dataset
df_clean = df.select("*")


In [24]:
df_clean.printSchema()

root
 |-- State: string (nullable = true)
 |-- LGA: string (nullable = true)
 |-- Datim_Code: string (nullable = true)
 |-- Facility_Name: string (nullable = true)
 |-- Patient_Unique_code: string (nullable = true)
 |-- Patient_Hospital_No: string (nullable = true)
 |-- ANCNoIdentifier: string (nullable = true)
 |-- ANCNoConceptID: string (nullable = true)
 |-- HTS_No: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age_At_Start_Of_ART_Years: integer (nullable = true)
 |-- Age_At_Start_Of_ART_Months: string (nullable = true)
 |-- Care_Entry_Point: string (nullable = true)
 |-- KP_Type: string (nullable = true)
 |-- Months_On_ART: integer (nullable = true)
 |-- Date_Transferred_In: date (nullable = true)
 |-- Transfer_In_Status: string (nullable = true)
 |-- ART_Start_Date: date (nullable = true)
 |-- Last_Pickup_Date: date (nullable = true)
 |-- Last_Visit_Date: date (nullable = true)
 |-- Days_Of_ARV_Refill: integer (nullable = true)
 |-- Pill_Balance: integer (nullab

### Transformation to 2NF

In [25]:
Patient_Dim_Table = df_clean.select(
    'Patient_Unique_code', 'Patient_Hospital_No', 'Sex', 'HTS_No',
    'Age_At_Start_Of_ART_Years', 'Age_At_Start_Of_ART_Months', 'Current_Age_Years', 
    'Current_Age_Months', 'Care_Entry_Point', 'KP_Type', 'Biometric_Captured', 
    'Biometric_Capture_Date', 'Valid_Capture', 'Mark_As_Deceased', 
    'Mark_As_Deceased_Death_Date', 'Registration_Phone_No', 'Next_Of_Kin_Phone_No', 
    'Treatment_Supporter_Phone_No'
).distinct() \
    .withColumn('Patient_ID', monotonically_increasing_id()) \
    .select(
        'Patient_ID', 'Patient_Unique_code', 'Patient_Hospital_No', 'Sex', 'HTS_No',
        'Age_At_Start_Of_ART_Years', 'Age_At_Start_Of_ART_Months', 'Current_Age_Years', 
        'Current_Age_Months', 'Care_Entry_Point', 'KP_Type', 'Biometric_Captured', 
        'Biometric_Capture_Date', 'Valid_Capture', 'Mark_As_Deceased', 
        'Mark_As_Deceased_Death_Date', 'Registration_Phone_No', 'Next_Of_Kin_Phone_No', 
        'Treatment_Supporter_Phone_No'
    )

#Patient_Dim_Table.show(5)

In [26]:
# For the Dim_Location table
Location_Dim_Table = df_clean.select(
    'State', 'LGA', 'Datim_Code', 'Facility_Name'
).distinct() \
    .withColumn('Location_ID', monotonically_increasing_id()) \
    .select(
        'Location_ID', 'State', 'LGA', 'Datim_Code', 'Facility_Name'
    )

# Show the first 5 rows
#Location_Dim_Table.show(5)

In [27]:
# For the Dim_ART_Regimen table
ART_Regimen_Dim_Table = df_clean.select(
    'Initial_Regimen_Line', 'Initial_Regimen', 'Initial_FirstLine_Regimen', 
    'Initial_FirstLine_Regimen_Date', 'Initial_SecondLine_Regimen', 
    'Initial_SecondLine_Regimen_Date', 'Current_Regimen_Line', 'Current_Regimen', 
    'Months_On_ART', 'ART_Start_Date', 'Current_ART_Status', 
    'Current_ART_Status_With_PillBalance'
).distinct() \
    .withColumn('Regimen_ID', monotonically_increasing_id()) \
    .select(
        'Regimen_ID', 'Initial_Regimen_Line', 'Initial_Regimen', 'Initial_FirstLine_Regimen', 
        'Initial_FirstLine_Regimen_Date', 'Initial_SecondLine_Regimen', 
        'Initial_SecondLine_Regimen_Date', 'Current_Regimen_Line', 'Current_Regimen', 
        'Months_On_ART', 'ART_Start_Date', 'Current_ART_Status', 
        'Current_ART_Status_With_PillBalance'
    )

# Show the first 5 rows
#ART_Regimen_Dim_Table.show(5)

In [28]:
ViralLoad_Dim_Table = df_clean.select(
    'Current_Viral_Load(cml)', 'Viral_Load_Encounter_Date', 'Viral_Load_Sample_Collection_Date', 
    'Viral_Load_Reported_Date', 'Result_Date', 'Assay_Date', 'Approval_Date', 'Viral_Load_Indication', 
    'Last_Viral_Load_Sample_CollectionForm_Date', 'Last_Sample_Taken_Date'
).distinct() \
    .withColumn('ViralLoad_ID', monotonically_increasing_id()) \
    .select(
        'ViralLoad_ID', 'Current_Viral_Load(cml)', 'Viral_Load_Encounter_Date', 'Viral_Load_Sample_Collection_Date', 
        'Viral_Load_Reported_Date', 'Result_Date', 'Assay_Date', 'Approval_Date', 'Viral_Load_Indication', 
        'Last_Viral_Load_Sample_CollectionForm_Date', 'Last_Sample_Taken_Date'
    )

#ViralLoad_Dim_Table.show(5)


In [29]:
Pregnancy_Dim_Table = df_clean.select(
    'Pregnancy_Status', 'Pregnancy_Status_Date', 'EDD_(Estimated Delivery Date)', 'Last_Delivery_Date', 'LMP', 'Gestation_Age_Weeks'
).distinct() \
    .withColumn('Pregnancy_ID', monotonically_increasing_id()) \
    .select(
        'Pregnancy_ID', 'Pregnancy_Status', 'Pregnancy_Status_Date', 'EDD_(Estimated Delivery Date)', 'Last_Delivery_Date', 
        'LMP', 'Gestation_Age_Weeks'
    )

#Pregnancy_Dim_Table.show(5)


In [30]:
TB_INH_Dim_Table = df_clean.select(
    'TB_Status', 'TB_Status_Date', 'Baseline_INH_StartDate', 'Baseline_INH_StopDate', 'Current_INH_StartDate', 
    'Current_INH_Outcome', 'Current_INH_Outcome_Date', 'Last_INH_Dispensed_Date', 'Baseline_TB_Treatment_StartDate', 
    'Baseline_TB_Treatment_StopDate'
).distinct() \
    .withColumn('TB_ID', monotonically_increasing_id()) \
    .select(
        'TB_ID', 'TB_Status', 'TB_Status_Date', 'Baseline_INH_StartDate', 'Baseline_INH_StopDate', 
        'Current_INH_StartDate', 'Current_INH_Outcome', 'Current_INH_Outcome_Date', 'Last_INH_Dispensed_Date', 
        'Baseline_TB_Treatment_StartDate', 'Baseline_TB_Treatment_StopDate'
    )

#TB_INH_Dim_Table.show(5)


In [31]:
Clinical_Visits_Dim_Table = df_clean.select(
    'Last_Pickup_Date', 'Last_Visit_Date', 'Days_Of_ARV_Refill', 'Pill_Balance', 'Pharmacy_Next_Appointment', 
    'Clinical_Next_Appointment', 'Last_EAC_Date', 'OTZ_Enrollment_Date', 'OTZ_Outcome_Date', 'Enrollment_Date', 
    'Date_Transferred_In', 'Transfer_In_Status', 'Date_Returned_To_Care', 'Date_Of_Termination', 'Recapture_Date', 
    'Recapture_Count', 'Quantity_Of_ARV_Dispensed_LastVisit', 'Frequency_Of_ARV_Dispensed_LastVisit'
).distinct() \
    .withColumn('Visit_ID', monotonically_increasing_id()) \
    .select(
        'Visit_ID', 'Last_Pickup_Date', 'Last_Visit_Date', 'Days_Of_ARV_Refill', 'Pill_Balance', 
        'Pharmacy_Next_Appointment', 'Clinical_Next_Appointment', 'Last_EAC_Date', 'OTZ_Enrollment_Date', 
        'OTZ_Outcome_Date', 'Enrollment_Date', 'Date_Transferred_In', 'Transfer_In_Status', 
        'Date_Returned_To_Care', 'Date_Of_Termination', 'Recapture_Date', 'Recapture_Count',
        'Quantity_Of_ARV_Dispensed_LastVisit', 'Frequency_Of_ARV_Dispensed_LastVisit'
    )

#Clinical_Visits_Dim_Table.show(5)


In [32]:
Weight_Dim_Table = df_clean.select(
    'Current_Weight(Kg)', 'Current_Weight_Date'
).distinct() \
    .withColumn('Weight_ID', monotonically_increasing_id()) \
    .select('Weight_ID', 'Current_Weight(Kg)', 'Current_Weight_Date')

#Weight_Dim_Table .show(5)


In [34]:
Care_Modality_Dim_Table = df_clean.select(
    'Dispensing_Modality', 'Facility_Dispensing_Modality', 'DDD_Dispensing_Modality', 'MMD_Type'
).distinct() \
    .withColumn('CareModality_ID', monotonically_increasing_id()) \
    .select('CareModality_ID', 'Dispensing_Modality', 'Facility_Dispensing_Modality', 
            'DDD_Dispensing_Modality', 'MMD_Type')

#Care_Modality_Dim_Table.show(5)


In [35]:
Fact_Care_Table = df_clean.alias('fact') \
    .join(Patient_Dim_Table.alias('patient'), ['Patient_Unique_code'], 'inner') \
    .join(Location_Dim_Table.alias('location'), ['State', 'LGA', 'Datim_Code', 'Facility_Name'], 'inner') \
    .join(ART_Regimen_Dim_Table.alias('regimen'), ['Initial_Regimen_Line', 'Initial_Regimen', 'Current_Regimen_Line', 'Current_Regimen'], 'inner') \
    .join(ViralLoad_Dim_Table.alias('viral_load'), ['Current_Viral_Load(cml)', 'Viral_Load_Encounter_Date'], 'inner') \
    .join(Pregnancy_Dim_Table.alias('pregnancy'), ['Pregnancy_Status', 'Pregnancy_Status_Date'], 'inner') \
    .join(TB_INH_Dim_Table.alias('tb'), ['TB_Status', 'TB_Status_Date'], 'inner') \
    .join(Clinical_Visits_Dim_Table.alias('visit'), ['Last_Pickup_Date', 'Last_Visit_Date'], 'inner') \
    .join(Weight_Dim_Table.alias('weight'), ['Current_Weight(Kg)', 'Current_Weight_Date'], 'inner') \
    .join(Care_Modality_Dim_Table.alias('care_modality'), ['Dispensing_Modality', 'Facility_Dispensing_Modality'], 'inner')

Fact_Care_Table = Fact_Care_Table \
    .select(
        'patient.Patient_ID', 'location.Location_ID', 'regimen.Regimen_ID', 'viral_load.ViralLoad_ID', 
        'pregnancy.Pregnancy_ID', 'tb.TB_ID', 'visit.Visit_ID', 'weight.Weight_ID', 'care_modality.CareModality_ID', 
        'regimen.ART_Start_Date', 'visit.Last_Pickup_Date', 'visit.Last_Visit_Date', 
        'regimen.Current_ART_Status', 'regimen.Current_ART_Status_With_PillBalance', 
        'visit.Quantity_Of_ARV_Dispensed_LastVisit', 'visit.Frequency_Of_ARV_Dispensed_LastVisit', 
        'fact.Drug_Duration_Previous_Quarter', 'fact.Patient_Outcome', 'fact.Patient_Outcome_Date',
        'fact.Patient_Outcome_Previous_Quarter', 'fact.Patient_Outcome_Date_Previous_Quarter', 
        'ART_Status_Previous_Quarter', 'visit.Recapture_Date', 'visit.Recapture_Count'
    ) \
    .withColumn('Fact_ID', monotonically_increasing_id())
