In [None]:
import os

# Set in its own cell so it runs before the cell that imports pyspark.pandas;
# setdefault keeps any value that is already exported in the environment.
# NOTE(review): pandas-on-Spark is understood to check this variable at import
# time — confirm against the installed PySpark version.
os.environ.setdefault("PYARROW_IGNORE_TIMEZONE", "1")

In [None]:
import warnings

import findspark
import numpy as np
import pyspark.pandas as ps
import pandas as pd
import seaborn as sns
import geopandas as gpd
import folium
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand
from pyspark.sql.types import FloatType, IntegerType

In [None]:
# Let pandas show up to 500 rows before truncating displayed/printed frames.
pd.set_option("display.max_rows", 500)

In [None]:
# Locate the local Spark installation with findspark; the value returned by
# find() is the cell's last expression and is therefore displayed.
# NOTE(review): pyspark is already imported in the import cell above, so init()
# runs after the import it is normally meant to enable — confirm it is still needed.
findspark.init()
findspark.find()

In [None]:
# Session configuration.
#   RESET         - stop a session left over from a previous run of this cell and
#                   clear the Spark Connect environment variables before rebuilding.
#   SPARK_CONNECT - True: connect through Spark Connect; False: standalone master.
#   HOST          - driver host; assigned below and reused for the HDFS URLs later on.
RESET = True
SPARK_CONNECT = False
HOST = None

if RESET:
    os.environ.pop("SPARK_REMOTE", None)
    os.environ.pop("SPARK_CONNECT_MODE_ENABLED", None)
    # Look the previous session up explicitly instead of exec()-ing a string: on a
    # fresh kernel there is simply nothing to stop, which is not an error.
    stale_session = globals().get("spark")
    if stale_session is not None:
        try:
            stale_session.stop()
        except Exception as e:
            # Best effort: a half-built or already-stopped session must not block
            # the creation of a new one.
            print(e)

spark = None

if SPARK_CONNECT:
    HOST = "localhost"
    spark = (
        SparkSession.builder.remote(f"sc://{HOST}:15002")
        .appName("ResearchProject")
        .config("spark.driver.host", HOST)
        .config("spark.driver.port", "51041")
        .getOrCreate()
    )
else:
    HOST = "192.168.1.122"
    session_builder = SparkSession.builder
    # The builder object is shared, so a remote target set by an earlier
    # Spark Connect run would otherwise leak into this classic session.
    session_builder._options.pop("spark.remote", None)
    spark = (
        session_builder.master("spark://localhost:7077")
        .appName("ResearchProject")
        .config("spark.driver.host", HOST)
        .config("spark.driver.port", "51041")
        # 0 removes the cap on the total size of results collected to the driver.
        .config("spark.driver.maxResultSize", "0")
        .getOrCreate()
    )
    # Remember the current Arrow setting; this line only reads it, it does not
    # enable Arrow.
    prev = spark.conf.get("spark.sql.execution.arrow.pyspark.enabled")
    # Attach a distributed default index to pandas-on-Spark frames instead of a
    # globally sequential one.
    ps.set_option("compute.default_index_type", "distributed")
    # Silences every Python warning for the rest of the kernel session.
    warnings.filterwarnings("ignore")

print(spark)

In [None]:
# Load the county-level CSV from HDFS and expose it through the pandas API on
# Spark. No schema inference is requested, so the columns are typed in a later cell.
csv_reader = spark.read.option("header", True).option("encoding", "ISO-8859-1")
df = csv_reader.csv(f"hdfs://{HOST}:8020/data/MHSVI2020_US_county.csv").pandas_api()

In [None]:
# Keep the first rows in `head` and print them as plain text.
head = df.head()
print(head)

In [None]:
# Column dtypes as loaded, before the numeric conversion cell below.
print(df.dtypes)

In [None]:
# Label/identifier columns that must stay as strings.
exclude_cols = ["ST", "STATE", "ST_ABBR", "COUNTY"]

cols = [col for col in df.columns if col not in exclude_cols]

# Fetch the first six rows of every candidate column in one go (a single small
# collect instead of one Spark job per column). head() is positional, so it does
# not depend on the index labels being 0..5.
sample = df[cols].head(6).to_pandas()

for col in cols:
    # Already numeric (e.g. the cell was re-run): nothing to convert.
    if pd.api.types.is_numeric_dtype(sample[col].dtype):
        continue
    # Integer only if *every* sampled value is a string of digits; nulls and
    # anything with a sign or decimal point fall through to to_numeric.
    # NOTE: the decision rests on the sampled rows only, not the whole column.
    is_int = all(isinstance(value, str) and value.isdigit() for value in sample[col])
    if is_int:
        df[col] = df[col].astype("int64")
    else:
        df[col] = ps.to_numeric(df[col])

In [None]:
# Re-inspect the first rows after the numeric conversion in the previous cell.
df.head()

In [None]:
# Column dtypes after the numeric conversion above.
df.dtypes

In [None]:
# Columns carried into the analysis subset, grouped by role. The factor/outcome
# split mirrors the labels used in the correlation cell further down.
afam_id_cols = ["UNIQUE_ID", "ST", "STATE", "ST_ABBR", "COUNTY", "FIPS"]
afam_population_cols = ["E_TOTPOP", "E_AFAM", "EP_AFAM"]
afam_factor_cols = [
    "EP_HBURD",
    "EP_NOHSDP",
    "EP_POV150",
    "EP_UNINSUR",
    "EP_DISABL",
    "EP_CROWD",
    "EP_NOINT",
    "R_HOSP",
    "R_PHARM",
    "R_URG",
]
afam_outcome_cols = ["ER_DIAB", "ER_RESPD", "ER_OBES", "R_CARDIO"]

# Independent copy so later filtering does not touch `df`.
afam_df = df[
    afam_id_cols + afam_population_cols + afam_factor_cols + afam_outcome_cols
].copy()

In [None]:
# Preview the selected columns.
afam_df.head()

In [None]:
# Minimum EP_AFAM value a row must reach to stay in the analysis subset.
# NOTE(review): 14.2 is presumably a percentage benchmark — record its source here.
MIN_EP_AFAM = 14.2
afam_df = afam_df[afam_df["EP_AFAM"] >= MIN_EP_AFAM]

In [None]:
# Preview the subset after the EP_AFAM >= 14.2 filter.
afam_df.head()

In [None]:
# Number of rows remaining after the filter.
len(afam_df)

In [None]:
# Pairwise correlations between the outcome columns and the factor columns,
# computed on Spark and collected to the driver as a plain pandas DataFrame.
corr_outcome_cols = ["ER_DIAB", "ER_RESPD", "ER_OBES", "R_CARDIO"]
corr_factor_cols = [
    "EP_HBURD",
    "EP_NOHSDP",
    "EP_POV150",
    "EP_UNINSUR",
    "EP_DISABL",
    "EP_CROWD",
    "EP_NOINT",
    "R_HOSP",
    "R_PHARM",
    "R_URG",
]
correlation_matrix = afam_df[corr_outcome_cols + corr_factor_cols].corr().to_pandas()

# Text dump, result type and one diagonal entry as quick sanity checks.
print(correlation_matrix.to_markdown())
print(type(correlation_matrix))
print(correlation_matrix.loc["ER_DIAB", "ER_DIAB"])

In [None]:
# Annotated heatmap of the correlation matrix, colour scale centred on zero.
fig, ax = plt.subplots(figsize=(15, 12))
sns.heatmap(
    correlation_matrix,
    annot=True,
    cmap="coolwarm",
    center=0.0,
    linewidths=0.5,
    ax=ax,
)
ax.set_title("Correlation Heatmap of Health Factors and Outcomes in Black Communities")
plt.show()

In [None]:
# Per-state ratio sum(E_AFAM) / sum(E_TOTPOP) over all rows of `df`, stored as a
# fraction (not multiplied by 100) in column "ER_AFAM".
# Built from a native groupby sum and column division rather than a per-group
# Python lambda, and without rebinding one name to three different object types.
state_population_totals = df.groupby("STATE")[["E_TOTPOP", "E_AFAM"]].sum()
afam_percentage_state_df = (
    state_population_totals["E_AFAM"] / state_population_totals["E_TOTPOP"]
).reset_index(name="ER_AFAM")


In [None]:
# pandas-on-Spark writes a *directory* of part files at the given path and its
# default mode replaces whatever already exists there, so the target must be a
# dedicated output directory — never "." (the working directory itself).
afam_percentage_output_path = f"hdfs://{HOST}:8020/results/afam_percentage_state"
afam_percentage_state_df.to_csv(
    afam_percentage_output_path, header=True, mode="overwrite"
)

In [None]:
# Per-state total of E_AFAM * ER_DIAB over the rows kept in afam_df.
# Uses native column arithmetic and a groupby sum instead of a per-group Python
# lambda; rows whose product is missing are skipped by sum() rather than turning
# the whole state total into NaN.
# NOTE(review): if ER_DIAB is stored as a percentage, divide by 100 to get counts.
diab_count_state_df = (
    afam_df.assign(E_AFAM_DIAB=afam_df["E_AFAM"] * afam_df["ER_DIAB"])
    .groupby("STATE")["E_AFAM_DIAB"]
    .sum()
    .reset_index()
)

# print(diab_count_state_df)

In [None]:
# Per-state total of E_AFAM * ER_RESPD over the rows kept in afam_df.
# Uses native column arithmetic and a groupby sum instead of a per-group Python
# lambda; rows whose product is missing are skipped by sum() rather than turning
# the whole state total into NaN.
# NOTE(review): if ER_RESPD is stored as a percentage, divide by 100 to get counts.
respd_count_state_df = (
    afam_df.assign(E_AFAM_RESPD=afam_df["E_AFAM"] * afam_df["ER_RESPD"])
    .groupby("STATE")["E_AFAM_RESPD"]
    .sum()
    .reset_index()
)
# Alias for the original, misspelled variable name so existing references keep working.
respd_count_state_d = respd_count_state_df

print(respd_count_state_df)

In [None]:
# Per-state total of E_AFAM * ER_OBES over the rows kept in afam_df.
# Uses native column arithmetic and a groupby sum instead of a per-group Python
# lambda; rows whose product is missing are skipped by sum() rather than turning
# the whole state total into NaN.
# NOTE(review): if ER_OBES is stored as a percentage, divide by 100 to get counts.
obes_count_state_df = (
    afam_df.assign(E_AFAM_OBES=afam_df["E_AFAM"] * afam_df["ER_OBES"])
    .groupby("STATE")["E_AFAM_OBES"]
    .sum()
    .reset_index()
)

print(obes_count_state_df)

In [None]:
# Per-state total of E_AFAM * R_CARDIO over the rows kept in afam_df.
# Uses native column arithmetic and a groupby sum instead of a per-group Python
# lambda; rows whose product is missing are skipped by sum() rather than turning
# the whole state total into NaN.
# NOTE(review): confirm the unit of R_CARDIO before reading the result as a count.
cardio_count_state_df = (
    afam_df.assign(E_AFAM_CARDIO=afam_df["E_AFAM"] * afam_df["R_CARDIO"])
    .groupby("STATE")["E_AFAM_CARDIO"]
    .sum()
    .reset_index()
)

print(cardio_count_state_df)

In [None]:
# Combined choropleth ("heat map") and bubble map, drawn from sample values and
# saved to combined_map.html.

# Sample DataFrame for heat map
heat_map_data = {
    "state": ["California", "Texas", "New York", "Florida", "Illinois"],
    "value": [100, 90, 80, 70, 60],
}
df_heat_map = pd.DataFrame(heat_map_data)

# Sample DataFrame for bubble map
bubble_map_data = {
    "state": ["California", "Texas", "New York", "Florida", "Illinois"],
    "bubble_value": [50, 40, 30, 20, 10],
}
df_bubble_map = pd.DataFrame(bubble_map_data)

# State-level boundaries: one feature per state, full state name in "name".
# The world.geo.json "countries/USA.geo.json" file used before holds a single
# country-outline feature, so no state name could ever match it.
states = gpd.read_file(
    "https://raw.githubusercontent.com/python-visualization/folium-example-data/main/us_states.json"
)

# Rename the column to match the DataFrame
states = states.rename(columns={"name": "state"})

# Left join keeps every state; states without a sample value get NaN.
merged_heat_map = states.set_index("state").join(df_heat_map.set_index("state"))

# Plain two-column frame for folium, so the key column is referenced by name
# rather than by passing the Index object itself.
choropleth_data = merged_heat_map.reset_index()[["state", "value"]]

# Initialize the map centered on the US
m = folium.Map(location=[37.0902, -95.7129], zoom_start=4)

# Add a heat map layer
folium.Choropleth(
    geo_data=states,
    name="choropleth",
    data=choropleth_data,
    columns=["state", "value"],
    key_on="feature.properties.state",
    fill_color="YlOrRd",
    fill_opacity=0.7,
    line_opacity=0.2,
    legend_name="Value by State",
).add_to(m)

# (latitude, longitude) used to place each state's bubble.
state_coordinates = {
    "Alabama": (32.806671, -86.79113),
    "Alaska": (61.370716, -152.404419),
    "Arizona": (33.729759, -111.431221),
    "Arkansas": (34.969704, -92.373123),
    "California": (36.116203, -119.681564),
    "Colorado": (39.059811, -105.311104),
    "Connecticut": (41.597782, -72.755371),
    "Delaware": (39.318523, -75.507141),
    "Florida": (27.766279, -81.686783),
    "Georgia": (33.040619, -83.643074),
    "Hawaii": (21.094318, -157.498337),
    "Idaho": (44.240459, -114.478828),
    "Illinois": (40.349457, -88.986137),
    "Indiana": (39.849426, -86.258278),
    "Iowa": (42.011539, -93.210526),
    "Kansas": (38.5266, -96.726486),
    "Kentucky": (37.66814, -84.670067),
    "Louisiana": (31.169546, -91.867805),
    "Maine": (44.693947, -69.381927),
    "Maryland": (39.063946, -76.802101),
    "Massachusetts": (42.230171, -71.530106),
    "Michigan": (43.326618, -84.536095),
    "Minnesota": (45.694454, -93.900192),
    "Mississippi": (32.741646, -89.678696),
    "Missouri": (38.456085, -92.288368),
    "Montana": (46.921925, -110.454353),
    "Nebraska": (41.12537, -98.268082),
    "Nevada": (38.313515, -117.055374),
    "New Hampshire": (43.452492, -71.563896),
    "New Jersey": (40.298904, -74.521011),
    "New Mexico": (34.840515, -106.248482),
    "New York": (42.165726, -74.948051),
    "North Carolina": (35.630066, -79.806419),
    "North Dakota": (47.528912, -99.784012),
    "Ohio": (40.388783, -82.764915),
    "Oklahoma": (35.565342, -96.928917),
    "Oregon": (44.572021, -122.070938),
    "Pennsylvania": (40.590752, -77.209755),
    "Rhode Island": (41.680893, -71.51178),
    "South Carolina": (33.856892, -80.945007),
    "South Dakota": (44.299782, -99.438828),
    "Tennessee": (35.747845, -86.692345),
    "Texas": (31.054487, -97.563461),
    "Utah": (40.150032, -111.862434),
    "Vermont": (44.045876, -72.710686),
    "Virginia": (37.769337, -78.169968),
    "Washington": (47.400902, -121.490494),
    "West Virginia": (38.491226, -80.954063),
    "Wisconsin": (44.268543, -89.616508),
    "Wyoming": (42.755966, -107.30249),
}

# Dict-based lookups: a state missing from the table yields NaN instead of
# aborting the whole cell with a KeyError.
state_lat = {name: lat for name, (lat, _) in state_coordinates.items()}
state_lon = {name: lon for name, (_, lon) in state_coordinates.items()}
df_bubble_map["lat"] = df_bubble_map["state"].map(state_lat)
df_bubble_map["lon"] = df_bubble_map["state"].map(state_lon)

# Add bubble map layer (rows without known coordinates are skipped)
for _, row in df_bubble_map.dropna(subset=["lat", "lon"]).iterrows():
    folium.CircleMarker(
        location=[row["lat"], row["lon"]],
        radius=row["bubble_value"] / 10,  # Adjust size as needed
        color="blue",
        fill=True,
        fill_color="blue",
        fill_opacity=0.6,
        popup=row["state"],
    ).add_to(m)

# Save the map as an HTML file
m.save("combined_map.html")

In [None]:
# Step 4: Save as CSV
# Writes the full frame `df` (not one of the derived per-state frames) to HDFS on
# HOST, replacing any output already present at this path (mode="overwrite").
output_path = f"hdfs://{HOST}:8020/results/temp"
df.to_csv(output_path, header=True, mode="overwrite")