In [0]:
from pyspark.sql import SparkSession

In [0]:
# Create the Spark session used by every cell below; getOrCreate() returns the
# already-active session if one exists (e.g. the notebook's built-in session).
spark = SparkSession.builder.appName("DisabilitySet").getOrCreate()

In [0]:
# Load the raw DHDS export from the catalog and preview it.
SOURCE_TABLE = "workspace.default.disability_and_health_data_system_dhds_20250704"
main_df = spark.table(SOURCE_TABLE)
main_df.display()

In [0]:
# Print the column names and data types of the raw table.
main_df.printSchema()

## Data Cleaning

In [0]:
# Replace missing footnote symbols with a "-" placeholder (nulls in this column become "-").
footnote_fill = {"Data_Value_Footnote_Symbol": "-"}
main_df = main_df.na.fill(footnote_fill)
main_df.display()

In [0]:
# Keep only the columns needed for the analysis, in this order.
selected_columns = [
    "Rowid", "Year", "LocationAbbr", "LocationDesc", "Category", "Indicator",
    "Data_Value", "Data_Value_Alt", "Data_Value_Type",
    "Data_Value_Footnote_Symbol", "Data_Value_Footnote",
    "Low_Confidence_Limit", "High_Confidence_Limit",
    "WeightedNumber", "Number",
    "StratificationCategory1", "Stratification1",
    "StratificationCategory2", "Stratification2",
    "Response", "Geolocation",
]
df_1 = main_df.select(*selected_columns)
df_1.display()

In [0]:
from pyspark.sql.functions import col
# Count and list every distinct Stratification1 value.
# (The count is stored under its own name rather than shadowing the builtin `count`.)
distinct_strat1 = df_1.select("Stratification1").distinct()
n_unique_strat1 = distinct_strat1.count()
print(f"Total unique categories in Stratification1: {n_unique_strat1}")
distinct_strat1.show(truncate=False)

In [0]:
# Rows whose Stratification1 is "Hearing Disability".
# The column is resolved against df_1 itself, not main_df: referencing another
# DataFrame's column only works while both share lineage and is fragile.
df_hearing = df_1.filter(df_1["Stratification1"] == "Hearing Disability")
df_hearing.display()

In [0]:
# Rows whose Stratification1 is "Vision Disability".
# The column is resolved against df_1 itself, not main_df: referencing another
# DataFrame's column only works while both share lineage and is fragile.
df_vision = df_1.filter(df_1["Stratification1"] == "Vision Disability")
df_vision.display()

In [0]:
# Rows whose Stratification1 is "Cognitive Disability".
# The column is resolved against df_1 itself, not main_df: referencing another
# DataFrame's column only works while both share lineage and is fragile.
df_cognitive = df_1.filter(df_1["Stratification1"] == "Cognitive Disability")
df_cognitive.display()

In [0]:
# Rows whose Stratification1 is "Any Disability".
# Stored under a dedicated name so the cognitive subset in df_cognitive is not
# overwritten; the column is resolved against df_1 rather than main_df.
df_any_disability = df_1.filter(df_1["Stratification1"] == "Any Disability")
df_any_disability.display()

In [0]:
# Rows whose Stratification1 is "No Disability".
# Stored under a dedicated name so the cognitive subset in df_cognitive is not
# overwritten; the column is resolved against df_1 rather than main_df.
df_no_disability = df_1.filter(df_1["Stratification1"] == "No Disability")
df_no_disability.display()

In [0]:
from pyspark.sql.functions import col, when

# Stratification1 values whose missing second stratification is replaced
# with the "-" placeholder ("Any"/"No Disability" are handled separately).
disability_types = [
    "Hearing Disability", "Vision Disability", "Cognitive Disability",
    "Self-care Disability", "Mobility Disability",
    "Independent Living Disability", "Overall"
]

is_listed_type = col("Stratification1").isin(disability_types)
for strat_col in ("StratificationCategory2", "Stratification2"):
    # Only nulls are replaced; existing values pass through unchanged.
    df_1 = df_1.withColumn(
        strat_col,
        when(is_listed_type & col(strat_col).isNull(), "-").otherwise(col(strat_col)),
    )

df_1.display()


In [0]:
from pyspark.sql.functions import col
# Count and list the distinct locations (LocationDesc) present in the data.
# The message previously mislabelled this as Stratification1.
distinct_locations = df_1.select("LocationDesc").distinct()
n_locations = distinct_locations.count()
print(f"Total unique locations in LocationDesc: {n_locations}")
distinct_locations.display()

In [0]:
from pyspark.sql.functions import desc

# How StratificationCategory2 values are distributed among "No Disability" rows.
df_sub = df_1.select("Stratification1", "StratificationCategory2", "Stratification2")
no_disability_rows = df_sub.where(df_sub.Stratification1 == "No Disability")
(
    no_disability_rows
    .groupBy("Stratification1", "StratificationCategory2")
    .count()
    .orderBy(desc("count"))
    .display()
)

In [0]:
# Frequency of each Stratification2 value within the Race/Ethnicity category.
(
    df_1
    .where(df_1.StratificationCategory2 == "Race/Ethnicity")
    .groupBy("Stratification2")
    .count()
    .orderBy(desc("count"))
    .display()
)

In [0]:
# Give "No Disability" rows with no second stratification a Race/Ethnicity label.
# NOTE(review): the Stratification2 step fills nulls for EVERY row whose
# StratificationCategory2 is "Race/Ethnicity", not only "No Disability" rows --
# confirm that scope is intended. Imputing a fixed race value also fabricates
# demographic data; consider leaving such rows unlabelled instead.
is_no_disability = col("Stratification1") == "No Disability"
df_1 = df_1.withColumn(
    "StratificationCategory2",
    when(is_no_disability & col("StratificationCategory2").isNull(), "Race/Ethnicity")
    .otherwise(col("StratificationCategory2")),
)

# Evaluated against the column as updated just above.
is_race_row = col("StratificationCategory2") == "Race/Ethnicity"
df_1 = df_1.withColumn(
    "Stratification2",
    when(is_race_row & col("Stratification2").isNull(), "White,non-Hispanic")
    .otherwise(col("Stratification2")),
)

df_1.display()

In [0]:
# How StratificationCategory2 values are distributed among "Any Disability" rows.
df_sub_3 = df_1.select("Stratification1", "StratificationCategory2", "Stratification2")
any_disability_rows = df_sub_3.where(df_sub_3.Stratification1 == "Any Disability")
(
    any_disability_rows
    .groupBy("Stratification1", "StratificationCategory2")
    .count()
    .orderBy(desc("count"))
    .display()
)

In [0]:
# How Stratification2 values are distributed among "Any Disability" rows.
(
    df_sub_3
    .where(df_sub_3.Stratification1 == "Any Disability")
    .groupBy("Stratification1", "Stratification2")
    .count()
    .orderBy(desc("count"))
    .display()
)

In [0]:
# Give "Any Disability" rows with no second stratification a Race/Ethnicity label.
# NOTE(review): the Stratification2 step fills nulls for EVERY row whose
# StratificationCategory2 is "Race/Ethnicity", not only "Any Disability" rows --
# confirm that scope is intended. Imputing a fixed race value also fabricates
# demographic data; consider leaving such rows unlabelled instead.
is_any_disability = col("Stratification1") == "Any Disability"
df_1 = df_1.withColumn(
    "StratificationCategory2",
    when(is_any_disability & col("StratificationCategory2").isNull(), "Race/Ethnicity")
    .otherwise(col("StratificationCategory2")),
)

# Evaluated against the column as updated just above.
is_race_category = col("StratificationCategory2") == "Race/Ethnicity"
df_1 = df_1.withColumn(
    "Stratification2",
    when(is_race_category & col("Stratification2").isNull(), "White,non-Hispanic")
    .otherwise(col("Stratification2")),
)

df_1.display()

In [0]:
# Number of rows flagged with the "*" footnote symbol.
df_1.filter(df_1.Data_Value_Footnote_Symbol == "*").count()

In [0]:
# Number of rows whose footnote symbol is "-" (including those filled in from null).
df_1.filter(df_1.Data_Value_Footnote_Symbol == "-").count()

In [0]:
from pyspark.sql.functions import col

# Drop every row flagged with the "*" footnote symbol.
is_star_flagged = col("Data_Value_Footnote_Symbol") == "*"
df_filtered = df_1.where(~is_star_flagged)

In [0]:
# Preview the rows that remain after the "*"-flagged rows were removed.
df_filtered.display()

In [0]:
# The footnote columns are no longer needed once "*" rows are gone.
footnote_columns = ["Data_Value_Footnote_Symbol", "Data_Value_Footnote"]
df_filtered = df_filtered.drop(*footnote_columns)
df_filtered.display()

In [0]:
# How many rows in df_filtered have no Geolocation.
df_filtered.where(df_filtered.Geolocation.isNull()).count()

In [0]:
# Inspect the rows whose Data_Value is missing.
df_filtered.filter(col("Data_Value").isNull()).display()

In [0]:
# Keep only rows where every measurement column is non-null.
# (An explicit isNotNull filter rather than dropna(): dropna would also drop NaN.)
measure_cols = [
    "Data_Value", "Data_Value_Alt", "Low_Confidence_Limit",
    "High_Confidence_Limit", "WeightedNumber", "Number",
]
all_present = col(measure_cols[0]).isNotNull()
for measure in measure_cols[1:]:
    all_present = all_present & col(measure).isNotNull()
df_filtered_2 = df_filtered.filter(all_present)
df_filtered_2.display()

In [0]:
# Number of rows left after dropping incomplete measurements.
df_filtered_2.count()

In [0]:
# How many of the remaining rows lack a Geolocation.
df_filtered_2.filter(col("Geolocation").isNull()).count()

In [0]:
# Geolocation is not used in the analysis below, so remove it.
columns_to_drop = ["Geolocation"]
df_filtered_2 = df_filtered_2.drop(*columns_to_drop)
df_filtered_2.display()

In [0]:
# Confirm the column set and types of the cleaned DataFrame.
df_filtered_2.printSchema()

In [0]:
# Sanity check: rows where all of these columns are null simultaneously.
# df_filtered_2 was built by requiring Data_Value to be non-null, so this is
# expected to return no rows.
all_null_cols = [
    "Data_Value", "Data_Value_Alt", "Low_Confidence_Limit", "High_Confidence_Limit",
    "WeightedNumber", "Number", "StratificationCategory1", "Stratification1",
    "StratificationCategory2", "Stratification2", "Response",
]
all_null = df_filtered_2[all_null_cols[0]].isNull()
for column_name in all_null_cols[1:]:
    all_null = all_null & df_filtered_2[column_name].isNull()
df_filtered_2.where(all_null).display()

## Regions and Age Groups with the Most Disability Records

In [0]:
# Rows remaining per Stratification1 value.
df_filtered_2.groupBy(df_filtered_2.Stratification1).count().display()

In [0]:
# Number of rows per region (LocationDesc) for each Stratification1 value.
# NOTE(review): these are counts of records in the dataset, not counts or rates
# of people with a disability -- a region ranks high here because it has more
# rows (e.g. more indicators/years/strata), not necessarily more disabled residents.

def _region_record_counts(stratum):
    """Display row counts per LocationDesc, descending, for one Stratification1 value.

    Returns the filtered single-column (LocationDesc) DataFrame.
    """
    subset = df_filtered_2.select("LocationDesc").where(df_filtered_2["Stratification1"] == stratum)
    subset.groupBy("LocationDesc").count().orderBy(desc("count")).display()
    return subset


df_hearing = _region_record_counts("Hearing Disability")
df_vision = _region_record_counts("Vision Disability")
df_cognitive = _region_record_counts("Cognitive Disability")
df_mobility = _region_record_counts("Mobility Disability")
df_ILD = _region_record_counts("Independent Living Disability")
df_self = _region_record_counts("Self-care Disability")
df_any = _region_record_counts("Any Disability")
df_no = _region_record_counts("No Disability")
df_overall = _region_record_counts("Overall")

In [0]:
from pyspark.sql.functions import col, desc
import matplotlib.pyplot as plt
import pandas as pd

# Stratification1 values to chart, one panel each.
disability_types = [
    "Hearing Disability", "Vision Disability", "Cognitive Disability",
    "Mobility Disability", "Independent Living Disability", "Self-care Disability",
    "Any Disability", "No Disability", "Overall"
]

# How many regions (highest record count first) to show per panel.
TOP_N_REGIONS = 50

# One aggregation and a single collect for all types, instead of a separate
# Spark job + toPandas() per type.
region_counts_pdf = (
    df_filtered_2
    .filter(col("Stratification1").isin(disability_types))
    .groupBy("Stratification1", "LocationDesc")
    .count()
    .orderBy("Stratification1", desc("count"))
    .toPandas()
)

# data_dict[type] -> {region: record count}, in descending count order.
data_dict = {}
for disability in disability_types:
    rows = region_counts_pdf[region_counts_pdf["Stratification1"] == disability]
    data_dict[disability] = dict(zip(rows["LocationDesc"], rows["count"]))

# Bar charts: regions are unordered categories, so a line plot would imply a
# trend that does not exist. The panel count follows the number of types.
n_panels = len(data_dict)
fig, axs = plt.subplots(n_panels, 1, figsize=(30, 4 * n_panels), squeeze=False)
fig.suptitle("Disability Counts by Region for Each Stratification Type", fontsize=18)
axs = axs.ravel()

for ax, (disability, region_counts) in zip(axs, data_dict.items()):
    top_regions = (
        pd.Series(region_counts, dtype="int64")
        .sort_values(ascending=False)
        .head(TOP_N_REGIONS)
    )
    ax.bar(top_regions.index.astype(str), top_regions.values, color="skyblue")
    ax.set_title(disability, fontsize=12)
    ax.tick_params(axis="x", rotation=45)
    ax.set_ylabel("Count")

plt.tight_layout(rect=[0, 0, 1, 0.96])
plt.show()


In [0]:
# Row counts per Indicator for each Stratification1 value, one table per value.
# Only Indicator is aggregated; Response is carried in the selection but not grouped on.
indicator_strata = [
    "Hearing Disability", "Vision Disability", "Cognitive Disability",
    "Mobility Disability", "Independent Living Disability", "Self-care Disability",
    "Any Disability", "No Disability", "Overall",
]
for stratum in indicator_strata:
    df_new = (
        df_filtered_2
        .select("Indicator", "Response")
        .where(df_filtered_2["Stratification1"] == stratum)
    )
    df_new.groupBy("Indicator").count().orderBy(desc("count")).display()

In [0]:
# Average Data_Value per (Year, Stratification1).
# NOTE(review): the grouping ignores Indicator, so if several Indicators are
# present their values are averaged together -- confirm that is meaningful.
from pyspark.sql.functions import avg

yearly_avg = avg("Data_Value").alias("Average_Disability_Rate")
df_yearly = (
    df_filtered_2
    .groupBy("Year", "Stratification1")
    .agg(yearly_avg)
    .orderBy("Year")
)
# Displayed by descending average (this sort replaces the Year ordering).
df_yearly.orderBy(desc("Average_Disability_Rate")).display()

In [0]:
# Average Data_Value for every (Stratification2, Stratification1) pair, highest first.
pair_avg = avg("Data_Value").alias("Avg_Value")
(
    df_filtered_2
    .groupBy("Stratification2", "Stratification1")
    .agg(pair_avg)
    .orderBy(desc("Avg_Value"))
    .display()
)

In [0]:
from pyspark.sql.functions import avg, desc, asc

# Mean Data_Value for each (state, disability type) pair; the ranking cells
# below pick the top and bottom states from this table.
state_rate = avg("Data_Value").alias("Average_Disability_Rate")
df_state_avg = df_filtered_2.groupBy("LocationDesc", "Stratification1").agg(state_rate)

df_state_avg.display()

In [0]:
import matplotlib.pyplot as plt
from pyspark.sql.functions import col, desc

# Rows per Stratification1 value. Aggregate in Spark and collect only the small
# result instead of pulling the entire column to the driver. Same semantics as
# pandas value_counts(): nulls excluded, ordered by descending count.
counts = (
    df_filtered_2
    .filter(col("Stratification1").isNotNull())
    .groupBy("Stratification1")
    .count()
    .orderBy(desc("count"))
    .toPandas()
    .set_index("Stratification1")["count"]
)

fig, ax = plt.subplots(figsize=(10, 6))
counts.plot(kind="bar", color="skyblue", ax=ax)
ax.set_title("Count of Each Disability Type (Stratification1)")
ax.set_xlabel("Disability Type")
ax.set_ylabel("Count")
plt.setp(ax.get_xticklabels(), rotation=45, ha="right")
fig.tight_layout()
ax.grid(axis="y", linestyle="--", alpha=0.7)
plt.show()


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Five highest-average states per disability type. row_number() gives unique
# ranks, so states tied at the cutoff are split in an unspecified order.
windowSpecTop = Window.partitionBy("Stratification1").orderBy(desc("Average_Disability_Rate"))

ranked_top = df_state_avg.withColumn("rank", row_number().over(windowSpecTop))
df_top5 = ranked_top.where("rank <= 5")

df_top5.select("Stratification1", "LocationDesc", "Average_Disability_Rate", "rank").display()


In [0]:
# Five lowest-average states per disability type (ascending average).
windowSpecBottom = Window.partitionBy("Stratification1").orderBy(asc("Average_Disability_Rate"))

ranked_bottom = df_state_avg.withColumn("rank", row_number().over(windowSpecBottom))
df_bottom5 = ranked_bottom.where("rank <= 5")

df_bottom5.select("Stratification1", "LocationDesc", "Average_Disability_Rate", "rank").display()


In [0]:
# Width of each row's confidence interval (high minus low limit); widest first.
ci_width = df_filtered_2["High_Confidence_Limit"] - df_filtered_2["Low_Confidence_Limit"]
df_ci = df_filtered_2.withColumn("Confidence_Width", ci_width)

df_ci.sort(col("Confidence_Width").desc()).display()


In [0]:
from pyspark.sql.functions import when

# Mark rows whose confidence interval is wider than 30 as high-uncertainty;
# rows with a null width fall through to False.
is_wide_interval = col("Confidence_Width") > 30
df_ci_flagged = df_ci.withColumn(
    "High_Uncertainty",
    when(is_wide_interval, True).otherwise(False),
)


In [0]:
# Keep only rows whose confidence-interval width is at most 30.
df_clean = df_ci.where(col("Confidence_Width") <= 30)

In [0]:
# Mean confidence-interval width per disability type, widest first.
(
    df_ci
    .groupBy("Stratification1")
    .agg(avg("Confidence_Width"))
    .orderBy(desc("avg(Confidence_Width)"))
    .display()
)


In [0]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# Distribution of confidence-interval width for each disability type.
pdf = df_ci.select("Stratification1", "Confidence_Width").toPandas()

fig, ax = plt.subplots(figsize=(12, 6))
sns.boxplot(x="Stratification1", y="Confidence_Width", data=pdf, ax=ax)
ax.tick_params(axis="x", rotation=45)
ax.set_title("Confidence Width by Disability Type")
plt.show()


In [0]:
# Histogram of confidence-interval widths over all rows (null/NaN widths dropped).
# The whole column is collected to the driver; sample first if it is large.
df_ci_pd = df_ci.select("Confidence_Width").dropna().toPandas()

fig, ax = plt.subplots(figsize=(10, 6))
ax.hist(df_ci_pd["Confidence_Width"], bins=30, color="skyblue", edgecolor="black")
ax.set_title("Distribution of Confidence Interval Widths")
ax.set_xlabel("Confidence Width")
ax.set_ylabel("Frequency")
fig.tight_layout()
plt.show()


In [0]:
from pyspark.sql.functions import avg
import matplotlib.pyplot as plt

# Yearly mean Data_Value for the "Hearing Disability" stratum, plotted as a line.
hearing_rows = df_filtered_2.where(df_filtered_2["Stratification1"] == "Hearing Disability")
df_grouped = hearing_rows.groupBy("Year").agg(avg("Data_Value").alias("Average_Disability"))

df_grouped_pd = df_grouped.orderBy("Year").toPandas()

fig, ax = plt.subplots(figsize=(10, 5))
df_grouped_pd.plot(x="Year", y="Average_Disability", kind="line", marker="o", color="orange", ax=ax)
ax.set_title("Trend of Hearing Disability Over Years")
ax.set_xlabel("Year")
ax.set_ylabel("Average Disability Value")
ax.grid(True)
fig.tight_layout()
plt.show()
