# Public Health Analysis Using CDC Metrics, Patient Data and Hospital General Data

Start by setting up the Databricks architecture. 

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from delta.tables import DeltaTable
import pandas as pd



# Create (or reuse) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName("No-Show Analysis")
    .getOrCreate()
)

# Root of the data lake; every layer path below is derived from it.
datalake_base = "/mnt/datalake"

# Raw landing area
raw_events_base = f"{datalake_base}/raw/appointment_events"

# Bronze layer
bronze_path = f"{datalake_base}/bronze/appointments"

# Silver layer: dim_* tables and the appointments table
silver_date = f"{datalake_base}/silver/dim_date"
silver_patients = f"{datalake_base}/silver/dim_patients"
silver_hospitals = f"{datalake_base}/silver/dim_hospitals"
silver_doctors = f"{datalake_base}/silver/dim_doctors"
silver_appt_types = f"{datalake_base}/silver/dim_appointment_types"
silver_events = f"{datalake_base}/silver/appointments"

# Gold layer
gold_facts = f"{datalake_base}/gold/fact_appointments"

Each source must be loaded, cleaned, and placed into a PySpark DataFrame before it can be queried. Kaggle provides the patient data, the hospital general data is loaded the same way from its own CSV file, and the CDC API provides data points on general health trends.

In [0]:
# Patient data comes first: read the raw Kaggle CSV with Spark, normalise
# the column names, and keep only the columns the patient dimension needs.

kaggle_sdf = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("dbfs:/FileStore/tables/KaggleV2_May_2016__1_.csv")
)

# Lower-case every column name and replace spaces with underscores.
kaggle_sdf = kaggle_sdf.toDF(
    *(name.lower().replace(" ", "_") for name in kaggle_sdf.columns)
)

# Patient dimension: rename the id and the two condition flags, keep age and
# gender under their existing names.
# NOTE(review): no de-duplication is applied here, so the result has one row
# per source row — confirm that is what the dimension should contain.
patients_df = kaggle_sdf.selectExpr(*[
    "patientid AS patient_id",
    "age",
    "gender",
    "diabetes AS has_diabetes",
    "hipertension AS has_hypertension",
])

In [0]:
# Load the hospital general-information CSV, normalise its column names
# (lower-case, spaces replaced by underscores) and keep only the columns
# needed for the hospital dimension.
hospital_sdf = spark.read.csv(
    "dbfs:/FileStore/tables/Hospital_General_Information__1_-2.csv",
    header=True, inferSchema=True
)

hospital_sdf = hospital_sdf.toDF(
    *[name.replace(" ", "_").lower() for name in hospital_sdf.columns]
)

# Select straight from the normalised frame. An earlier withColumnRenamed
# chain was dropped: its result was overwritten by this select, so it never
# affected hospital_df. The column list (and therefore the dim_hospitals
# schema) is unchanged.
# NOTE(review): "City/Town" is spelled differently from the normalised column
# name ("city/town"); this relies on Spark resolving column names
# case-insensitively — confirm spark.sql.caseSensitive is left at its default.
hospital_df = hospital_sdf.select(
    "facility_id", "facility_name", "City/Town", "state",
    "hospital_type", "hospital_ownership", "hospital_overall_rating"
)

In [0]:
# CDC data must be accessed through an endpoint - set up logic and only pull certain columns 
import requests
from pyspark.sql import Row
from pyspark.sql.functions import col, explode


# CDC endpoint; the $limit query parameter caps the request at 500 records.
api_url = "https://data.cdc.gov/resource/hksd-2xuw.json?$limit=500"

# Bound the wait with a timeout and fail on HTTP error statuses so that an
# error payload is never parsed and loaded as if it were data.
response = requests.get(api_url, timeout=30)
response.raise_for_status()
appointments = response.json()

# handle inconsistent data types
def normalize_record(record):
    """Project one CDC API record onto the fixed set of fields used downstream.

    Args:
        record: dict decoded from the API's JSON response.

    Returns:
        A dict with exactly these keys, in this order: yearstart, yearend,
        locationabbr, locationdesc, topic, question, datavalue,
        stratification1, locationid, coordinates. A field missing from
        ``record`` is None. ``coordinates`` is taken from
        ``record["geolocation"]["coordinates"]`` when ``geolocation`` is a
        dict, and is None otherwise.
    """
    scalar_fields = (
        "yearstart",
        "yearend",
        "locationabbr",
        "locationdesc",
        "topic",
        "question",
        "datavalue",
        "stratification1",
        "locationid",
    )
    normalized = {field: record.get(field) for field in scalar_fields}

    # geolocation can be absent, null, or not a dict; only a dict can carry
    # coordinates, so anything else yields None instead of raising.
    geolocation = record.get("geolocation")
    if isinstance(geolocation, dict):
        normalized["coordinates"] = geolocation.get("coordinates")
    else:
        normalized["coordinates"] = None
    return normalized


# Wrap each normalised record in a Spark Row, then build the DataFrame
# from those rows and preview it.
normalized_data = [
    Row(**fields) for fields in map(normalize_record, appointments)
]
appointments_df = spark.createDataFrame(normalized_data)
display(appointments_df)


root
 |-- yearstart: string (nullable = true)
 |-- yearend: string (nullable = true)
 |-- locationabbr: string (nullable = true)
 |-- locationdesc: string (nullable = true)
 |-- topic: string (nullable = true)
 |-- question: string (nullable = true)
 |-- datavalue: string (nullable = true)
 |-- stratification1: string (nullable = true)
 |-- locationid: string (nullable = true)
 |-- coordinates: array (nullable = true)
 |    |-- element: double (containsNull = true)



yearstart,yearend,locationabbr,locationdesc,topic,question,datavalue,stratification1,locationid,coordinates
2020,2020,US,United States,Health Status,Recent activity limitation among adults,2.9,Female,59,
2015,2019,AR,Arkansas,Cancer,"Invasive cancer (all sites combined), incidence",9537.0,Male,5,"List(-92.27449074299966, 34.74865012400045)"
2015,2019,CA,California,Cancer,"Cervical cancer mortality among all females, underlying cause",486.0,Overall,6,"List(-120.99999953799971, 37.63864012300047)"
2015,2019,CO,Colorado,Cancer,"Invasive cancer (all sites combined), incidence",2880.0,Hispanic,8,"List(-106.13361092099967, 38.843840757000464)"
2015,2019,GA,Georgia,Cancer,"Prostate cancer mortality among all males, underlying cause",519.0,"White, non-Hispanic",13,"List(-83.62758034599966, 32.83968109300048)"
2015,2019,KS,Kansas,Cancer,"Invasive cancer (all sites combined), incidence",8102.0,Male,20,"List(-98.20078122699965, 38.34774030000045)"
2015,2019,ME,Maine,Cancer,"Invasive cancer (all sites combined), incidence",9238.0,Overall,23,"List(-68.98503133599962, 45.254228894000505)"
2015,2019,NJ,New Jersey,Cancer,"Invasive cancer (all sites combined), incidence",5587.0,Hispanic,34,"List(-74.27369128799967, 40.13057004800049)"
2015,2019,NY,New York,Cancer,"Breast cancer mortality among all females, underlying cause",2547.0,Female,36,"List(-75.54397042699964, 42.82700103200045)"
2015,2019,OR,Oregon,Cancer,"Breast cancer mortality among all females, underlying cause",548.0,Overall,41,"List(-120.15503132599969, 44.56744942400047)"


In [0]:
# Save the CDC data to the gold layer as a Delta table. The path is derived
# from datalake_base so that the write and the table registration below
# cannot drift apart.
gold_cdc_health = f"{datalake_base}/gold/fact_cdc_health"
appointments_df.write.mode("overwrite").format("delta").save(gold_cdc_health)

# Register the Delta location as a SQL table (no-op if it already exists).
spark.sql(f"""
  CREATE TABLE IF NOT EXISTS fact_cdc_health
  USING DELTA
  LOCATION '{gold_cdc_health}'
""")
display(spark.sql("SELECT * FROM fact_cdc_health LIMIT 5"))

yearstart,yearend,locationabbr,locationdesc,topic,question,datavalue,stratification1,locationid,coordinates
2020,2020,US,United States,Health Status,Recent activity limitation among adults,2.9,Female,59,
2015,2019,AR,Arkansas,Cancer,"Invasive cancer (all sites combined), incidence",9537.0,Male,5,"List(-92.27449074299966, 34.74865012400045)"
2015,2019,CA,California,Cancer,"Cervical cancer mortality among all females, underlying cause",486.0,Overall,6,"List(-120.99999953799971, 37.63864012300047)"
2015,2019,CO,Colorado,Cancer,"Invasive cancer (all sites combined), incidence",2880.0,Hispanic,8,"List(-106.13361092099967, 38.843840757000464)"
2015,2019,GA,Georgia,Cancer,"Prostate cancer mortality among all males, underlying cause",519.0,"White, non-Hispanic",13,"List(-83.62758034599966, 32.83968109300048)"


After the data is loaded into DataFrames, we save it as dimension and fact tables and then run a number of queries against them.

In [0]:
# Save the patients dimension to the silver layer as a Delta table;
# overwriteSchema lets the overwrite replace an existing table's schema too.
patients_df.write.mode("overwrite").option("overwriteSchema", "true").format("delta").save(silver_patients)

# Register the table at the same location the data was just written to
# (silver_patients), instead of repeating the path as a literal.
spark.sql(f"""
  CREATE TABLE IF NOT EXISTS dim_patients
  USING DELTA
  LOCATION '{silver_patients}'
""")

Out[15]: DataFrame[]

In [0]:
# Preview five rows of the registered dim_patients table.
display(spark.table("dim_patients").limit(5))

patient_id,age,gender,has_diabetes,has_hypertension
29872499824296.0,62,F,0,1
558997776694438.0,56,M,0,0
4262962299951.0,62,F,0,0
867951213174.0,8,F,0,0
8841186448183.0,56,F,1,1


In [0]:
# Save the hospital dimension to the silver layer as a Delta table, using the
# shared silver_hospitals path rather than a repeated literal.
hospital_df.write.mode("overwrite").format("delta").save(silver_hospitals)

# Register the table at the same location the data was just written to.
spark.sql(f"""
  CREATE TABLE IF NOT EXISTS dim_hospitals
  USING DELTA
  LOCATION '{silver_hospitals}'
""")

Out[16]: DataFrame[]

In [0]:
# Preview five rows of the registered dim_hospitals table.
display(spark.table("dim_hospitals").limit(5))

facility_id,facility_name,City/Town,state,hospital_type,hospital_ownership,hospital_overall_rating
10001.0,SOUTHEAST HEALTH MEDICAL CENTER,DOTHAN,AL,Acute Care Hospitals,Government - Hospital District or Authority,3
10005.0,MARSHALL MEDICAL CENTERS,BOAZ,AL,Acute Care Hospitals,Government - Hospital District or Authority,2
10006.0,NORTH ALABAMA MEDICAL CENTER,FLORENCE,AL,Acute Care Hospitals,Proprietary,1
10007.0,MIZELL MEMORIAL HOSPITAL,OPP,AL,Acute Care Hospitals,Voluntary non-profit - Private,1
10008.0,CRENSHAW COMMUNITY HOSPITAL,LUVERNE,AL,Acute Care Hospitals,Proprietary,Not Available


In [0]:
# Average CDC data value per location, topic and question, highest first.
# datavalue is a string column, so it is cast before averaging; DOUBLE is used
# instead of FLOAT so values are not squeezed through single precision first.
# NOTE(review): every stratification, and the national "United States" rows,
# are averaged and ranked together with the individual states — confirm that
# is the intended comparison.
avg_health_by_state = spark.sql("""
SELECT locationdesc AS state_name,
       topic,
       question,
       ROUND(AVG(CAST(datavalue AS DOUBLE)), 2) AS avg_metric
FROM fact_cdc_health
GROUP BY locationdesc, topic, question
ORDER BY avg_metric DESC
""")
avg_health_by_state.show()


+--------------+--------------------+--------------------+----------+
|    state_name|               topic|            question|avg_metric|
+--------------+--------------------+--------------------+----------+
| United States|              Cancer|Invasive cancer (...|  314982.0|
|North Carolina|              Cancer|Invasive cancer (...|   30211.0|
|         Texas|Chronic Obstructi...|Chronic obstructi...|   18663.0|
|      Arkansas|              Cancer|Invasive cancer (...|    9537.0|
|North Carolina|Chronic Obstructi...|Chronic obstructi...|    8594.0|
|     Wisconsin|              Cancer|Invasive cancer (...|    8447.8|
|        Kansas|              Cancer|Invasive cancer (...|    8102.0|
|  Pennsylvania|Cardiovascular Di...|Coronary heart di...|    7189.0|
|         Maine|              Cancer|Invasive cancer (...|    7001.0|
|      New York|            Diabetes|Diabetes mortalit...|    6787.0|
|    New Jersey|              Cancer|Invasive cancer (...|    5587.0|
|      Illinois|    

In [0]:
# Average CDC data value per start year, topic and question; newest year
# first, then highest average first within each year.
# datavalue is a string column, so it is cast before averaging; DOUBLE is used
# instead of FLOAT so values are not squeezed through single precision first.
# NOTE(review): yearstart is a string column, so "year DESC" is a string
# ordering — fine for four-digit years; confirm no other formats occur.
avg_health_by_year = spark.sql("""
SELECT yearstart AS year,
       topic,
       question,
       ROUND(AVG(CAST(datavalue AS DOUBLE)), 2) AS avg_metric
FROM fact_cdc_health
GROUP BY yearstart, topic, question
ORDER BY year DESC, avg_metric DESC
""")
avg_health_by_year.show()


+----+--------------------+--------------------+----------+
|year|               topic|            question|avg_metric|
+----+--------------------+--------------------+----------+
|2020|            Diabetes|Diabetes mortalit...|    4050.0|
|2020|Chronic Obstructi...|Chronic obstructi...|    1801.0|
|2020|Cardiovascular Di...|Cerebrovascular d...|   1359.25|
|2020|Cardiovascular Di...|Coronary heart di...|   1080.85|
|2020|Cardiovascular Di...|Diseases of the h...|   1023.33|
|2020|             Alcohol|Chronic liver dis...|    368.78|
|2020|              Cancer|Cervical cancer s...|     83.75|
|2020|Social Determinan...|Routine checkup w...|     80.05|
|2020|       Health Status|Life expectancy a...|     77.35|
|2020|              Cancer|Mammography use a...|      72.9|
|2020|        Immunization|Pneumococcal vacc...|      71.0|
|2020|         Oral Health|No teeth lost amo...|     61.18|
|2020|         Oral Health|Visited dentist o...|      58.1|
|2020|             Tobacco|Quit attempts

In [0]:
# Count the records for each health topic and list the topics from most to
# least frequently reported.
top_health_issues = (
    spark.table("fact_cdc_health")
    .groupBy("topic")
    .count()
    .withColumnRenamed("count", "report_count")
    .orderBy("report_count", ascending=False)
)
top_health_issues.show()


+--------------------+------------+
|               topic|report_count|
+--------------------+------------+
|Cardiovascular Di...|          63|
|              Cancer|          53|
|            Diabetes|          42|
|       Health Status|          42|
|           Arthritis|          42|
|       Mental Health|          36|
|             Alcohol|          35|
|        Immunization|          29|
|              Asthma|          27|
|               Sleep|          22|
|             Tobacco|          21|
|Nutrition, Physic...|          18|
|Chronic Obstructi...|          17|
|          Disability|          16|
|Social Determinan...|          15|
|         Oral Health|          14|
|Cognitive Health ...|           5|
|Chronic Kidney Di...|           3|
+--------------------+------------+

