# Understanding Chronic Illness- How Gender, Race, Location & Socioeconomic Variables Impact Disease Prevalence
_______________________________________________________

_By Ilyas Alhassan, Sadia Mohammed, Sakthi Srikanthan & Aneesah Gilani_


## Importing Libraries

In [0]:
from pyspark.sql.functions import col, current_timestamp, regexp_replace, when, lit, count
from pyspark.sql.types import *
import datetime
import pandas as pd 
import numpy as np   
from matplotlib import cm
import matplotlib.pyplot as plt  
import seaborn as sns  
import plotly.express as px
import plotly.graph_objects as go
from pyspark.sql import SparkSession 
from pyspark.sql.functions import col  
from matplotlib.ticker import MaxNLocator

#Creating a Landing Zone Table (RAW)


##Defining Data Primary Data Source (Volumes)

In [0]:
source_path1="/Volumes/sadia_catalog/sadia/data-lake/Alzheimer_s_Disease_and_Healthy_Aging_Data-2.csv"
source_path2="/Volumes/sadia_catalog/sadia/data-lake/U.S._Chronic_Disease_Indicators__CDI___2023_Release.csv"


##Creating a Delta Table with the Raw Data

In [0]:
   df1 = spark.read.format("csv") \
        .option("header", "true") \
        .load(source_path1)

In [0]:
 df2 = spark.read.format("csv") \
        .option("header", "true") \
        .load(source_path2)

In [0]:
df1 = df1.withColumn("ingestion_timestamp", current_timestamp()) \
             .withColumn("batch_id", lit(datetime.datetime.now().strftime("%Y%m%d_%H%M%S")))
    

In [0]:
df2 = df2.withColumn("ingestion_timestamp", current_timestamp())\
             .withColumn("batch_id", lit(datetime.datetime.now().strftime("%Y%m%d_%H%M%S")))

In [0]:
df2.display()

In [0]:
df1.display()

In [0]:
catalog = "sadia_catalog"
schema = "Sadia"
table1_name = "raw_Alzheimar"
table2_name = "raw_Chronic"

In [0]:
spark.sql(f"USE Catalog {catalog}")

DataFrame[]

In [0]:
spark.sql(f"CREATE DATABASE IF NOT EXISTS {catalog}.{schema}")

DataFrame[]

In [0]:
df1.write.format("delta").mode("append").saveAsTable(f"{catalog}.{schema}.{table1_name}")

In [0]:
df2.write.format("delta").mode("append").saveAsTable(f"{catalog}.{schema}.{table2_name}")

#Preprocessing Raw Data

In [0]:
df1 = spark.sql(f"SELECT * FROM {catalog}.{schema}.{table1_name}")
df2 = spark.sql(f"SELECT * FROM {catalog}.{schema}.{table2_name}")

##Count missing values

In [0]:
df1.select([count(when(col(c).isNull(), c)).alias(f"{c}_nulls") for c in df1.columns]).display()

RowId_nulls,YearStart_nulls,YearEnd_nulls,LocationAbbr_nulls,LocationDesc_nulls,Datasource_nulls,Class_nulls,Topic_nulls,Question_nulls,Data_Value_Unit_nulls,DataValueTypeID_nulls,Data_Value_Type_nulls,Data_Value_nulls,Data_Value_Alt_nulls,Data_Value_Footnote_Symbol_nulls,Data_Value_Footnote_nulls,Low_Confidence_Limit_nulls,High_Confidence_Limit_nulls,StratificationCategory1_nulls,Stratification1_nulls,StratificationCategory2_nulls,Stratification2_nulls,Geolocation_nulls,ClassID_nulls,TopicID_nulls,QuestionID_nulls,LocationID_nulls,StratificationCategoryID1_nulls,StratificationID1_nulls,StratificationCategoryID2_nulls,StratificationID2_nulls,ingestion_timestamp_nulls,batch_id_nulls
0,0,0,0,0,0,0,0,0,0,0,0,619101,619101,1184610,1184610,675367,675367,20237,20237,248346,248346,216468,9765,6720,0,0,0,0,0,0,0,0


In [0]:
df2.select([count(when(col(c).isNull(), c)).alias(f"{c}_nulls") for c in df2.columns]).display()

YearStart_nulls,YearEnd_nulls,LocationAbbr_nulls,LocationDesc_nulls,DataSource_nulls,Topic_nulls,Question_nulls,Response_nulls,DataValueUnit_nulls,DataValueType_nulls,DataValue_nulls,DataValueAlt_nulls,DataValueFootnoteSymbol_nulls,DatavalueFootnote_nulls,LowConfidenceLimit_nulls,HighConfidenceLimit_nulls,StratificationCategory1_nulls,Stratification1_nulls,StratificationCategory2_nulls,Stratification2_nulls,StratificationCategory3_nulls,Stratification3_nulls,GeoLocation_nulls,ResponseID_nulls,LocationID_nulls,TopicID_nulls,QuestionID_nulls,DataValueTypeID_nulls,StratificationCategoryID1_nulls,StratificationID1_nulls,StratificationCategoryID2_nulls,StratificationID2_nulls,StratificationCategoryID3_nulls,StratificationID3_nulls,ingestion_timestamp_nulls,batch_id_nulls
0,0,0,0,0,0,0,9485408,1216984,0,3029872,3048784,6335728,6335728,4026368,4026368,0,0,9485408,9485408,9485408,9485408,81328,9485408,0,0,0,0,0,0,9485408,9485408,9485408,9485408,0,0


##Filling In missing values with Heuristics

In [0]:
df1 = df1.na.fill({"Data_Value": "0", "Data_Value_Alt": "0"})
df1 = df1.na.fill({
    "Low_Confidence_Limit": "0",
    "High_Confidence_Limit": "0"
})
df1 = df1.na.fill({
    "Data_Value_Footnote_Symbol": "None",
    "Data_Value_Footnote": "No footnote"
})
df1 = df1.na.fill({"Geolocation": "Unknown"})
df1 = df1.na.fill({
    "StratificationCategory1": "Unspecified",
    "Stratification1": "Unspecified",
    "StratificationCategory2": "Unspecified",
    "Stratification2": "Unspecified"
})
df1 = df1.na.fill({
    "ClassID": "0",
    "TopicID": "0"
})

In [0]:
df2 = df2.na.fill({
    "DataValue": "0",
    "DataValueAlt": "0"
})

df2 = df2.na.fill({
    "LowConfidenceLimit": "0",
    "HighConfidenceLimit": "0"
})

df2 = df2.na.fill({
    "DataValueFootnoteSymbol": "None",
    "DatavalueFootnote": "No footnote"
})
df2 = df2.na.fill({
    "Response": "No Response",
    "ResponseID": "0"
})

df2 = df2.na.fill({
    "GeoLocation": "Unknown"
})

df2 = df2.na.fill({
    "StratificationCategory1": "Unspecified",
    "Stratification1": "Unspecified",
    "StratificationCategory2": "Unspecified",
    "Stratification2": "Unspecified",
    "StratificationCategory3": "Unspecified",
    "Stratification3": "Unspecified",
    "StratificationCategoryID1": "0",
    "StratificationID1": "0",
    "StratificationCategoryID2": "0",
    "StratificationID2": "0",
    "StratificationCategoryID3": "0",
    "StratificationID3": "0"
})
df2 = df2.na.fill({
    "DataValueUnit": "Unspecified"
})

##Checking whether missing values have been filled

In [0]:
df1.select([count(when(col(c).isNull(), c)).alias(f"{c}_nulls") for c in df1.columns]).display()

In [0]:
df2.select([count(when(col(c).isNull(), c)).alias(f"{c}_nulls") for c in df2.columns]).display()

##Check for Duplicates

In [0]:
print(f"df1 duplicate count: {df1.groupBy(df1.columns).count().filter(col('count') > 1).count()}")
print(f"df2 duplicate count: {df2.groupBy(df2.columns).count().filter(col('count') > 1).count()}")

##Renaming Columns

In [0]:
columns_to_keep = ['LocationID', 'YearStart', 'YearEnd', 'LocationAbbr', 'LocationDesc']
for column in df1.columns:
    if column not in columns_to_keep:
        df1 = df1.withColumnRenamed(column, f"{column}_Alzheimer")
for column in df2.columns:
    if column not in columns_to_keep:
        df2 = df2.withColumnRenamed(column, f"{column}_Chronic")

##Joining Data

In [0]:
joined_df = df1.join(df2, 
    on=['LocationID', 'YearStart', 'YearEnd', 'LocationAbbr', 'LocationDesc'],
    how='inner')
joined_df = joined_df.withColumn(
    "data_category",
    when(col("Topic_Alzheimer").like("%Alzheimer%"), "Alzheimer's Disease")
    .otherwise("Other Chronic Conditions")
)

In [0]:
joined_df.select("Topic_Alzheimer", "Topic_Chronic").distinct().display()

##Checking for Data quality after Join

In [0]:
joined_df.printSchema()

#Data Explorations with (Visualizations)

In [0]:
cognitive_decline_df = df1.filter(col("Class") == "Cognitive Decline")
race_ethnicity_df = cognitive_decline_df.filter(col("StratificationCategory2") == "Race/Ethnicity")
gender_df = cognitive_decline_df.filter(col("StratificationCategory2") == "Gender")

In [0]:
from pyspark.sql.functions import avg
race_ethnicity_agg = race_ethnicity_df.groupBy("Stratification1_Alzheimer", "Stratification2_Alzheimer").agg(
    avg("Data_Value_Alzheimer").alias("mean_percentage")
)
gender_agg = gender_df.groupBy("Stratification1_Alzheimer", "Stratification2_Alzheimer").agg(
    avg("Data_Value_Alzheimer").alias("mean_percentage")
)

##Visualization I (Cognitive Decline by Race and Ethnic and Age Groups)

In [0]:

plt.figure(figsize=(12, 7))
young_color = "#48dbfb"    
middle_color = "#FF6B6B"    
elderly_color = "#a8e6cf"
race_ethnicity_pandas = race_ethnicity_agg.toPandas()
race_ethnicity_pivot = race_ethnicity_pandas.pivot(
    index="Stratification2_Alzheimer", columns="Stratification1_Alzheimer", values="mean_percentage"
)
race_ethnicity_pivot.plot(kind="barh", color=[young_color, middle_color, elderly_color])
plt.title("Cognitive Decline Analysis by Race/Ethnicity and Age Groups", fontsize=16)
plt.xlabel("Mean Percentage of Cognitive Decline (%)")
plt.ylabel("Race/Ethnicity")
plt.axvline(overall_mean, color="#2d3436", linestyle="--",
label=f"Overall Mean: {overall_mean:.1f}%")
plt.grid(alpha=0.3)
plt.legend(title="Age Group")
plt.tight_layout()
plt.show()

##Visulaization II (Cognitive Decline by Gender and Age Groups)

In [0]:

gender_pandas = gender_agg.toPandas()
overall_mean = gender_pandas["mean_percentage"].mean()
gender_pivot = gender_pandas.pivot(
    index="Stratification2_Alzheimer", columns="Stratification1_Alzheimer", values="mean_percentage"
)
gender_pivot.plot(kind="barh", figsize=(10, 6), color=["#FF9999", "#FF5555"])
plt.title("Cognitive Decline by Gender and Age", fontsize=14)
plt.xlabel("Mean Percentage of Cognitive Decline")
plt.ylabel("Gender")
plt.legend(title="Age Group")
plt.grid(axis="x", linestyle="--", alpha=0.7)
plt.axvline(overall_mean, color="red", linestyle="--", label=f"Overall Mean: {overall_mean:.2f}")

plt.show()



A higher mean percentage indicates that a larger portion of individuals in that group reported experiencing cognitive decline

##In this next section, we will examine various factors (RACE AND GENDER) WERE USED TO STRATIFY THE CHRONIC DATASET

##Overview of the Categories

In [0]:
result_df1 = df2.groupBy("StratificationCategoryID1_Chronic").agg(
    count("*").alias("count")
).orderBy("count", ascending=False)
display(result_df1)

##Gender Analysis - Mean Cognitive Scores  (Chronic Diseases)

In [0]:
filtered_df1 = df1.filter(df1["StratificationID2_Alzheimer"].isin("FEMALE", "MALE"))
result_df1 = filtered_df1.groupBy("StratificationID2_Alzheimer").agg(
    avg("Data_Value_Alzheimer").alias("avg_health_score"),
    count("*").alias("count")
).orderBy("avg_health_score", ascending=False)
result_df1.display()

StratificationID2_Alzheimer,avg_health_score,count
FEMALE,35.31837595253131,232407
MALE,32.512704356562416,229929


##Visualization III (Gender Distribution Across Chronic Conditions)

In [0]:
from pyspark.sql import functions as F
filtered_df2 = df2.filter(df2["Stratification1_Chronic"].isin("Male", "Female"))
result_df2 = filtered_df2.groupBy("Topic_Chronic", "Stratification1_Chronic").agg(
    F.countDistinct("DataValue_Chronic").alias("count") 
).orderBy("Topic_Chronic", "Stratification1_Chronic")
result_df2.display()

In [0]:
plt.figure(figsize=(12, 7))
df_plot = result_df2.toPandas()
df_plot_sorted = df_plot.sort_values('count', ascending=False)

sns.barplot(data=df_plot_sorted, 
           x='Topic_Chronic', 
           y='count',
           hue='Stratification1_Chronic',
           palette=["#48dbfb", "#FF6B6B"])

plt.title("Distribution of Chronic Conditions by Gender", fontsize=16)
plt.xlabel("Chronic Condition Type")
plt.ylabel("Count")
plt.xticks(rotation=45, ha='right')
plt.legend(title="Gender")
plt.grid(alpha=0.3)
plt.tight_layout()
plt.show()

##Gender Comparison: Chronic Conditions vs Alzheimer's Scores