## Spark Session

In [1]:
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark=(SparkSession
        .builder
        .appName("SparkApp")
        .config("spark.jars", "/home/daman/Downloads/postgresql-42.7.5.jar")
        .getOrCreate())


25/05/18 17:17:08 WARN Utils: Your hostname, ubuntu resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
25/05/18 17:17:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
25/05/18 17:17:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
spark.read.csv("/home/daman/Downloads/healthcaredata/healthcare_dataset.csv",header=True).show()

+-------------------+---+------+----------+-----------------+-----------------+----------------+--------------------+------------------+------------------+-----------+--------------+--------------+-----------+------------+
|               Name|Age|Gender|Blood Type|Medical Condition|Date of Admission|          Doctor|            Hospital|Insurance Provider|    Billing Amount|Room Number|Admission Type|Discharge Date| Medication|Test Results|
+-------------------+---+------+----------+-----------------+-----------------+----------------+--------------------+------------------+------------------+-----------+--------------+--------------+-----------+------------+
|      Bobby JacksOn| 30|  Male|        B-|           Cancer|       2024-01-31|   Matthew Smith|     Sons and Miller|        Blue Cross|18856.281305978155|        328|        Urgent|    2024-02-02|Paracetamol|      Normal|
|       LesLie TErRy| 62|  Male|        A+|          Obesity|       2019-08-20| Samantha Davies|            

In [3]:
import pandas as pd
df=pd.read_csv("/home/daman/Downloads/healthcaredata/healthcare_dataset.csv")
df['Date of Admission'] = pd.to_datetime(df['Date of Admission'], errors='coerce')

df_2019 = df[df['Date of Admission'].dt.year == 2019]
print(df_2019.head())

               Name  Age  Gender Blood Type Medical Condition  \
1      LesLie TErRy   62    Male         A+           Obesity   
12    connOR HANsEn   75  Female         A+          Diabetes   
30  ThOMAS MartInEZ   34    Male         B-            Asthma   
31  JAmES pattERson   23  Female         A+         Arthritis   
36    DEnIse ToRRES   33    Male        AB+          Diabetes   

   Date of Admission            Doctor                   Hospital  \
1         2019-08-20   Samantha Davies                    Kim Inc   
12        2019-12-12  Kenneth Fletcher  Powers Miller, and Flores   
30        2019-08-18       Jacob Huynh                   Hart Ltd   
31        2019-11-03  Kristina Frazier              Cruz-Santiago   
36        2019-10-14       Laura Myers                 LLC Martin   

   Insurance Provider  Billing Amount  Room Number Admission Type  \
1            Medicare    33643.327287          265      Emergency   
12              Cigna    43282.283358          134      

## Breakdown by year

In [4]:

for year in df['Date of Admission'].dt.year.unique():
    df[df['Date of Admission'].dt.year==year].to_csv(f"healthcare_{year}.csv",index=False)

In [5]:
df_2019 = spark.read.csv("healthcare_2019.csv", header=True, inferSchema=True)
df_2019.show()

+--------------------+---+------+----------+-----------------+-----------------+----------------+--------------------+------------------+------------------+-----------+--------------+--------------+-----------+------------+
|                Name|Age|Gender|Blood Type|Medical Condition|Date of Admission|          Doctor|            Hospital|Insurance Provider|    Billing Amount|Room Number|Admission Type|Discharge Date| Medication|Test Results|
+--------------------+---+------+----------+-----------------+-----------------+----------------+--------------------+------------------+------------------+-----------+--------------+--------------+-----------+------------+
|        LesLie TErRy| 62|  Male|        A+|          Obesity|       2019-08-20| Samantha Davies|             Kim Inc|          Medicare|33643.327286577885|        265|     Emergency|    2019-08-26|  Ibuprofen|Inconclusive|
|       connOR HANsEn| 75|Female|        A+|         Diabetes|       2019-12-12|Kenneth Fletcher|Powers 

## Reading Incrementally by Year using spark

In [6]:
df=spark.read.csv("/home/daman/Downloads/healthcaredata/healthcare_dataset.csv",header=True)
df.show()

+-------------------+---+------+----------+-----------------+-----------------+----------------+--------------------+------------------+------------------+-----------+--------------+--------------+-----------+------------+
|               Name|Age|Gender|Blood Type|Medical Condition|Date of Admission|          Doctor|            Hospital|Insurance Provider|    Billing Amount|Room Number|Admission Type|Discharge Date| Medication|Test Results|
+-------------------+---+------+----------+-----------------+-----------------+----------------+--------------------+------------------+------------------+-----------+--------------+--------------+-----------+------------+
|      Bobby JacksOn| 30|  Male|        B-|           Cancer|       2024-01-31|   Matthew Smith|     Sons and Miller|        Blue Cross|18856.281305978155|        328|        Urgent|    2024-02-02|Paracetamol|      Normal|
|       LesLie TErRy| 62|  Male|        A+|          Obesity|       2019-08-20| Samantha Davies|            

In [7]:
years = [2019,2020,2021,2022,2023,2024]
df_all= None
for year in years:
    df_year=spark.read.csv(f"healthcare_{year}.csv",header=True,inferSchema=True)
    if df_all is None:
        df_all=df_year
    else:
        df_all=df_all.union(df_year)    
df_all = df_all.dropDuplicates()
df_all.count()

                                                                                

54966

## Star Schema

In [8]:
print(type(df))

<class 'pyspark.sql.dataframe.DataFrame'>


In [9]:
from pyspark.sql.types import DateType
df_all=df_all.withColumn("Discharge Date",F.col("Discharge Date").cast(DateType()))
df_all = df_all.withColumn("Date of Admission", F.col("Date of Admission").cast(DateType()))

### Dimension Table


In [10]:
df_all.toPandas()

                                                                                

Unnamed: 0,Name,Age,Gender,Blood Type,Medical Condition,Date of Admission,Doctor,Hospital,Insurance Provider,Billing Amount,Room Number,Admission Type,Discharge Date,Medication,Test Results
0,BReNdaN paRKer,71,Female,A+,Arthritis,2019-09-21,Matthew Taylor,Bailey-White,Cigna,4437.956350,406,Emergency,2019-10-20,Penicillin,Normal
1,tAmAra toDd,70,Female,O-,Obesity,2019-09-02,Johnny Reed,Bradford-Andrews,Cigna,23792.250162,376,Emergency,2019-09-14,Aspirin,Normal
2,BReNDA MILLER,74,Male,A-,Diabetes,2019-09-11,Shannon Silva,Roberts Ltd,Blue Cross,38354.776948,406,Urgent,2019-09-12,Aspirin,Abnormal
3,briana tRevIno,39,Female,AB+,Obesity,2019-06-04,Taylor Williams,"Merritt Frazier, and Mullins",Blue Cross,23101.536165,491,Emergency,2019-06-24,Paracetamol,Normal
4,MoniCA JoNes,58,Male,B-,Cancer,2019-05-11,Jennifer Newton,Group Moon,Aetna,23035.366863,444,Urgent,2019-05-28,Penicillin,Normal
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
54961,AnDrew halL,64,Male,A+,Hypertension,2024-04-04,Matthew Harris,Inc Smith,Aetna,45352.436271,430,Urgent,2024-04-29,Lipitor,Normal
54962,aAroN woNg,33,Female,AB-,Asthma,2024-02-11,Mark Gallagher,Barrett LLC,Medicare,19214.948080,159,Urgent,2024-02-15,Lipitor,Abnormal
54963,TeRRy harVey,44,Male,B-,Cancer,2024-05-04,Phillip Davis,"Mcdonald Hall and Johnson,",Medicare,5587.319006,284,Urgent,2024-05-16,Aspirin,Normal
54964,kAREN grIffIN,53,Female,O-,Asthma,2024-01-06,Tracy Bernard,Andrade LLC,UnitedHealthcare,25908.370587,179,Emergency,2024-01-17,Aspirin,Inconclusive


In [11]:

patient = df_all.select('Name', 'Age', 'Gender', 'Blood Type').dropDuplicates()
# patient.count()
patient = patient.withColumn("PatientID", F.row_number().over(Window.orderBy(F.lit(1))))

# patient.toPandas().head(50)
# patient.distinct().count()


In [12]:
doctor=df_all.select("Doctor").dropDuplicates()
# doctor.distinct().count()
doctor=doctor.withColumn("DoctorID",F.row_number().over(Window.orderBy(F.lit(1))))

# doctor.show()

In [13]:
hospital=df_all.select('Hospital').dropDuplicates()
hospital=hospital.withColumn('HospitalID',F.row_number().over(Window.orderBy(F.lit(1))))
hospital.show()

25/05/18 17:17:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/18 17:17:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/18 17:17:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/18 17:17:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/18 17:17:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/18 17:17:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/18 1

+--------------------+----------+
|            Hospital|HospitalID|
+--------------------+----------+
|           Smith PLC|         1|
|Holmes Reed and J...|         2|
|Castro and Smith,...|         3|
|and Bailey Ramos,...|         4|
|      Holmes-Griffin|         5|
|Turner, and Wrigh...|         6|
|          Group Park|         7|
|        Calderon LLC|         8|
|and Watts Terry, ...|         9|
|Pineda Werner, Mc...|        10|
|          Fox-Garcia|        11|
|       Dixon-Jenkins|        12|
|     Sons and Cooper|        13|
|Brown, and Weaver...|        14|
|          Obrien PLC|        15|
|     Hernandez-Smith|        16|
|        Aguilar-Bass|        17|
|          Holmes LLC|        18|
|       Franco-Sawyer|        19|
|Stewart, and Will...|        20|
+--------------------+----------+
only showing top 20 rows



In [14]:
insurance=df_all.select('Insurance Provider').dropDuplicates()
insurance=insurance.withColumn('InsuranceID',F.row_number().over(Window.orderBy(F.lit(1))))
insurance.show()

25/05/18 17:17:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/18 17:17:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/18 17:17:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+------------------+-----------+
|Insurance Provider|InsuranceID|
+------------------+-----------+
|             Aetna|          1|
|        Blue Cross|          2|
|          Medicare|          3|
|             Cigna|          4|
|  UnitedHealthcare|          5|
+------------------+-----------+



25/05/18 17:17:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/18 17:17:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/18 17:17:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/18 17:17:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


In [15]:
medication=df_all.select('Medication').dropDuplicates()
medication=medication.withColumn('MedicationID',F.row_number().over(Window.orderBy(F.lit(1))))
medication.show()

25/05/18 17:17:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/18 17:17:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/18 17:17:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+-----------+------------+
| Medication|MedicationID|
+-----------+------------+
|  Ibuprofen|           1|
| Penicillin|           2|
|Paracetamol|           3|
|    Aspirin|           4|
|    Lipitor|           5|
+-----------+------------+



25/05/18 17:17:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/18 17:17:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/18 17:17:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/18 17:17:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


In [16]:
from pyspark.sql.functions import year, month,dayofmonth
date=df_all.select('Date of Admission').dropDuplicates()
date=date.withColumn('DateID',F.row_number().over(Window.orderBy(F.lit(1))))
date=date.withColumn('Year',year(F.col('Date of Admission')))\
         .withColumn('Month',month(F.col('Date of Admission')))\
         .withColumn('Day',dayofmonth(F.col('Date of Admission')))           
date.show()

25/05/18 17:17:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/18 17:17:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/18 17:17:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+-----------------+------+----+-----+---+
|Date of Admission|DateID|Year|Month|Day|
+-----------------+------+----+-----+---+
|       2019-06-04|     1|2019|    6|  4|
|       2019-05-08|     2|2019|    5|  8|
|       2019-11-18|     3|2019|   11| 18|
|       2019-09-22|     4|2019|    9| 22|
|       2019-11-01|     5|2019|   11|  1|
|       2019-11-21|     6|2019|   11| 21|
|       2019-05-27|     7|2019|    5| 27|
|       2019-10-05|     8|2019|   10|  5|
|       2019-07-30|     9|2019|    7| 30|
|       2019-07-28|    10|2019|    7| 28|
|       2019-05-14|    11|2019|    5| 14|
|       2019-07-08|    12|2019|    7|  8|
|       2019-10-24|    13|2019|   10| 24|
|       2019-12-28|    14|2019|   12| 28|
|       2019-06-24|    15|2019|    6| 24|
|       2019-12-10|    16|2019|   12| 10|
|       2019-08-05|    17|2019|    8|  5|
|       2019-12-18|    18|2019|   12| 18|
|       2019-07-19|    19|2019|    7| 19|
|       2019-05-10|    20|2019|    5| 10|
+-----------------+------+----+---

25/05/18 17:17:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/18 17:17:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/18 17:17:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/18 17:17:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


### Fact_Table

In [17]:
fact_table = df_all \
    .join(patient, on=['Name', 'Age', 'Gender', 'Blood Type'], how='inner') \
    .join(doctor, on='Doctor', how='inner') \
    .join(hospital, on='Hospital', how='inner') \
    .join(insurance, on='Insurance Provider', how='inner') \
    .join(medication, on='Medication', how='inner') \
    .join(date, on='Date of Admission', how='inner')
# fact_table = fact_table.dropDuplicates()

# fact_table.count()
fact_admission = fact_table.select(
    "PatientID", "DoctorID", "HospitalID", "InsuranceID", "DateID", "MedicationID",
    F.col("Billing Amount").alias("BillingAmount"),
    F.col("Room Number").alias("RoomNumber"),
    F.col("Admission Type").alias("AdmissionType"),
    F.col("Test Results").alias("TestResult")
)

fact_admission = fact_admission.withColumn("AdmissionID", F.row_number().over( Window.orderBy("PatientID")))
fact_admission.show()

25/05/18 17:17:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/18 17:17:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/18 17:17:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/18 17:17:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/18 17:17:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/18 17:17:39 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/18 1

+---------+--------+----------+-----------+------+------------+------------------+----------+-------------+------------+-----------+
|PatientID|DoctorID|HospitalID|InsuranceID|DateID|MedicationID|     BillingAmount|RoomNumber|AdmissionType|  TestResult|AdmissionID|
+---------+--------+----------+-----------+------+------------+------------------+----------+-------------+------------+-----------+
|        1|    3106|      2907|          4|    36|           4|3879.0846018133934|       290|     Elective|Inconclusive|          1|
|        2|     282|      3854|          1|   216|           4|15689.567860525334|       490|       Urgent|    Abnormal|          2|
|        3|    2344|      3182|          3|   207|           3|18832.768495910397|       458|     Elective|Inconclusive|          3|
|        4|    2895|      1416|          1|   127|           3| 24605.20585914686|       242|       Urgent|Inconclusive|          4|
|        5|    2004|      3704|          4|   151|           3| 44630

## Loading in Database


In [18]:
pg_url = "jdbc:postgresql://localhost:5432/healthcare_db"
pg_properties = {
    "user": "admin",
    "password": "admin",
    "driver": "org.postgresql.Driver"
}


In [19]:
# Dimension Tables
patient.write.jdbc(url=pg_url, table="patient", mode="overwrite", properties=pg_properties)
doctor.write.jdbc(url=pg_url, table="doctor", mode="overwrite", properties=pg_properties)
hospital.write.jdbc(url=pg_url, table="hospital", mode="overwrite", properties=pg_properties)
insurance.write.jdbc(url=pg_url, table="insurance", mode="overwrite", properties=pg_properties)
medication.write.jdbc(url=pg_url, table="medication", mode="overwrite", properties=pg_properties)
date.write.jdbc(url=pg_url, table="date", mode="overwrite", properties=pg_properties)

# Fact Table
fact_admission.write.jdbc(url=pg_url, table="fact_admission", mode="overwrite", properties=pg_properties)


25/05/18 17:17:43 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/18 17:17:43 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/18 17:17:43 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/18 17:17:44 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/18 17:17:44 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/18 17:17:44 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/05/18 1

In [20]:
# fact_admission.groupBy("PatientID").count().orderBy("count", ascending=False).show()


## Log Script

In [21]:
from datetime import datetime
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType


In [22]:
log_schema = StructType([
    StructField("FileName", StringType(), True),
    StructField("Year", IntegerType(), True),
    StructField("RowCount", IntegerType(), True),
    StructField("IngestionTime", TimestampType(), True),
    StructField("Status", StringType(), True),
    StructField("ErrorMessage", StringType(), True)
])
log_rows = []

for year in years:
    file_path = f"healthcare_{year}.csv"
    try:
        df_year = spark.read.csv(file_path, header=True, inferSchema=True)
        row_count = df_year.count()

        if df_all is None:
            df_all = df_year
        else:
            df_all = df_all.union(df_year)

        
        log_rows.append((
            file_path,
            year,
            row_count,
            datetime.now(),
            "SUCCESS",
            None
        ))

    except Exception as e:
        log_rows.append((
            file_path,
            year,
            0,
            datetime.now(),
            "FAILURE",
            str(e)
        ))



###  Creating Dataframe and saving in db 

In [23]:
if log_rows:
    log_df = spark.createDataFrame(log_rows, schema=log_schema)
    log_df.write.jdbc(url=pg_url, table="ingestion_log", mode="append", properties=pg_properties)


                                                                                