In [2]:
import findspark
import pyspark
import pandas as pd
import boto3
import os
from dotenv import load_dotenv

In [3]:
findspark.init()

In [4]:
# Performing necessary imports required for the project
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [5]:
# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Healthcare_ETL")\
    .getOrCreate()  

In [6]:
# Function for defining Struct type for different datasets
def define_schemas():    

    # Schema for Encounters Dataset
    encounters_schema = StructType([
        StructField("id", StringType(), True),
        StructField("start", TimestampType(), True),
        StructField("stop", TimestampType(), True),
        StructField("patient", StringType(), True),
        StructField("organization", StringType(), True),
        StructField("provider", StringType(), True),
        StructField("payer", StringType(), True),
        StructField("encounterclass", StringType(), True),
        StructField("code", IntegerType(), True),
        StructField("description", StringType(), True),
        StructField("base_encounter_cost", DoubleType(), True),
        StructField("total_claim_cost", DoubleType(), True),
        StructField("payer_coverage", DoubleType(), True),
        StructField("reasoncode", StringType(), True)
    ])

    # Schema for Patient Dataset
    patients_schema = StructType([
        StructField("id", StringType(), True),
        StructField("birthdate", DateType(), True),
        StructField("deathdate", DateType(), True),
        StructField("ssn", StringType(), True),
        StructField("drivers", StringType(), True),
        StructField("passport", StringType(), True),
        StructField("prefix", StringType(), True),
        StructField("first", StringType(), True),
        StructField("last", StringType(), True),
        StructField("suffix", StringType(), True),
        StructField("maiden", StringType(), True),
        StructField("marital", StringType(), True),
        StructField("race", StringType(), True),
        StructField("ethnicity", StringType(), True),
        StructField("gender", StringType(), True),
        StructField("birthplace", StringType(), True),
        StructField("address", StringType(), True),
        StructField("city", StringType(), True),
        StructField("state", StringType(), True),
        StructField("county", StringType(), True),
        StructField("fips", IntegerType(), True),
        StructField("zip", IntegerType(), True),
        StructField("lat", DoubleType(), True),
        StructField("lon", DoubleType(), True),
        StructField("healthcare_expenses", DoubleType(), True),
        StructField("healthcare_coverage", DoubleType(), True),
        StructField("income", IntegerType(), True),
        StructField("mrn", IntegerType(), True)
    ])

    # Schema for Conditions Dataset
    conditions_schema = StructType([
    StructField("start", TimestampType(),True),
    StructField("stop", TimestampType(),True),
    StructField("patient", StringType(),True),
    StructField("encounter", StringType(),True),
    StructField("code", StringType(),True),
    StructField("description", StringType(),True),
    ])


    # Schema for Immunization Dataset
    immunizations_schema = StructType([
        StructField("date", TimestampType(), True),
        StructField("patient", StringType(), True),
        StructField("encounter", StringType(), True),
        StructField("code", IntegerType(), True),
        StructField("description", StringType(), True)
    ])
    
    return patients_schema, encounters_schema, conditions_schema, immunizations_schema






In [7]:
from dotenv import load_dotenv
import os

# Load environment variables from .env file
load_dotenv()

# Access environment variables
aws_access_key_id = os.getenv('AWS_ACCESS_KEY_ID')
aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY')

# Print the values (for debugging)
print(f"AWS Access Key ID: {aws_access_key_id}")
print(f"AWS Secret Access Key: {aws_secret_access_key}")

AWS Access Key ID: AKIA6D6JBN46L435J6HP
AWS Secret Access Key: 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


In [9]:
s3 = boto3.client('s3') 


s3 = boto3.resource(
        service_name = 's3',
        region_name = 'eu-north-1', 
        aws_access_key_id = aws_access_key_id,
        aws_secret_access_key = aws_secret_access_key
)

    # Defining Bucket name and local folder name
bucket_name = "health-care-data-bucket"
local_folder = "dataset_csv"

    # Create the local folder if it doesn't exist
if not os.path.exists(local_folder):
    os.makedirs(local_folder)

    # Getting all the csv files names from the S3 Bucket
csv_files = []
for obj in s3.Bucket('health-care-data-bucket').objects.all():
    csv_files.append(obj.key)

    # Storing CSV Files in local folder
for file_name in csv_files:
    s3.Bucket(bucket_name).download_file(file_name, f"{local_folder}/{file_name}")

### Data Extraction from AWS S3 Bucket

In [10]:
def extract_data(spark, patients_schema, encounters_schema, conditions_schema, immunizations_schema):
   
    # Creating all the CSV Files into Spark Dataframe
    encounters_df = spark.read.csv("dataset_csv/encounters.csv", schema=encounters_schema, header=True)
    patients_df = spark.read.csv("dataset_csv/patients.csv", schema=patients_schema, header=True)
    conditions_df = spark.read.csv("dataset_csv/conditions.csv", schema=conditions_schema, header=True)
    immunizations_df = spark.read.csv("dataset_csv/immunizations.csv", schema=immunizations_schema, header=True)
    return patients_df, encounters_df, conditions_df, immunizations_df

### Data Cleaning and Transformation

In [11]:
def transform_data(patients_df, encounters_df, conditions_df, immunizations_df):

# Cleaning and Few Transformation PATIENT DATA 
    clean_patients_df = patients_df\
        .withColumn("gender", when(col("gender") == "M", "Male")\
                    .when(col("gender") == "F", "Female")\
                    .otherwise("Unknown"))\
        .withColumn("marital", when(col("marital") == "M", "Married")\
                    .when(col("marital") == "S", "Single")\
                    .when(col("marital") == "D", "Divorced")\
                    .otherwise("Unknown"))\
        .withColumnRenamed("id", "patient_id")\
        .withColumnRenamed("first", "first_name")\
        .withColumnRenamed("last", "last_name")\
        .withColumnRenamed("lat","latitude")\
        .withColumnRenamed("lon","longitutde")
    # clean_patients_df.limit(10).display()

# Cleaning and Few Transformation IMMUNIZATION DATA 
    clean_immunization_df = immunizations_df\
        .withColumn("date", immunizations_df["date"].cast("date")).withColumnRenamed('patient','vaccined_patient_id')\
        .withColumnRenamed("encounter","encounter_id")\
        .withColumnRenamed("code","vaccine_code")

# Cleaning and Few Transformation ENCOUNTERS DATA 
    #Start Date and Time Seperation
    clean_encounters_df = encounters_df\
        .withColumn("start_time", date_format('start', 'HH:mm:ss') )\
        .withColumn("start", encounters_df['start'].cast('date'))\
        .withColumn("end_time", date_format('stop', 'HH:mm:ss'))\
        .withColumn("stop", encounters_df['stop'].cast('date'))\
        .withColumnRenamed('id','encounter_id')\
        .withColumnRenamed('start','start_date')\
        .withColumnRenamed('stop','stop_date')\
        .withColumnRenamed('patient','patient_id')\
        .withColumnRenamed('code','encounter_code')

    # clean_immunization_df.limit(10).display()

    clean_conditions_df = conditions_df

    return clean_patients_df, clean_encounters_df, clean_conditions_df, clean_immunization_df
    

### Load Data

In [12]:
def load_data(clean_patients_df, clean_encounters_df, clean_conditions_df, clean_immunization_df):
    print("Patient Data")
    clean_patients_df.show(5)
    print("Encounters Data")
    clean_encounters_df.show(5)
    print("Conditions Data")
    clean_conditions_df.show(5)
    print("Immunizations Data")
    clean_immunization_df.show(5)


### Code Execution

In [13]:

def execute():
    spark = SparkSession.builder.appName("Healthcare_ETL").getOrCreate()
    patients_schema, encounters_schema, conditions_schema, immunizations_schema = define_schemas()
    patients_df, encounters_df, conditions_df, immunizations_df = extract_data(spark, patients_schema, encounters_schema, conditions_schema, immunizations_schema )
    clean_patients_df, clean_encounters_df, clean_conditions_df, clean_immunization_df = transform_data(patients_df, encounters_df, conditions_df, immunizations_df)
    load_data(clean_patients_df, clean_encounters_df, clean_conditions_df, clean_immunization_df)
    
execute()

Patient Data
+--------------------+----------+----------+-----------+---------+----------+------+----------+---------+------+---------+--------+-----+-----------+------+--------------------+--------------------+-----------+-------------+----------------+-----+----+------------------+------------------+-------------------+-------------------+------+---+
|          patient_id| birthdate| deathdate|        ssn|  drivers|  passport|prefix|first_name|last_name|suffix|   maiden| marital| race|  ethnicity|gender|          birthplace|             address|       city|        state|          county| fips| zip|          latitude|        longitutde|healthcare_expenses|healthcare_coverage|income|mrn|
+--------------------+----------+----------+-----------+---------+----------+------+----------+---------+------+---------+--------+-----+-----------+------+--------------------+--------------------+-----------+-------------+----------------+-----+----+------------------+------------------+-------------