# Code to analyze the cleaned homeless deaths data using PySpark
Note: 

I ran this code on google colab, so it has the code for that, consider running it there rather than locally.

In [None]:
# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3-scala2.13.tgz
!tar xf spark-3.5.1-bin-hadoop3-scala2.13.tgz
!pip install -q findspark
!pip install pyspark

In [2]:
import os
spark_version = 'spark-3.0.1'
os.environ['SPARK_VERSION']=spark_version
os.environ["JAVA_HOME"] = f"/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/spark-3.5.1-bin-hadoop3-scala2.13"

In [6]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Homeless Deaths Analysis") \
    .getOrCreate()

data_by_cause = spark.read.csv('/content/cleaned_homeless-deaths-by-cause.csv', header=True, inferSchema=True)
data_by_demographics = spark.read.csv('/content/cleaned_homeless-deaths-by-demographics.csv', header=True, inferSchema=True)
data_by_month = spark.read.csv('/content/cleaned_homeless-deaths-by-month.csv', header=True, inferSchema=True)

filtered_data_by_cause = data_by_cause.filter(data_by_cause["Year of death"] == 2023)
aggregated_data_by_cause = data_by_cause.groupBy("Cause_of_death").sum("Count")

filtered_data_by_cause.show()
aggregated_data_by_cause.show()

aggregated_data_by_cause.write.parquet('/content/aggregated_homeless_deaths_by_cause.parquet')

spark.stop()

+---+-------------+--------------------+---------+-------+-----+
|_id|Year of death|      Cause_of_death|Age_group| Gender|Count|
+---+-------------+--------------------+---------+-------+-----+
|  1|         2023|cardiovascular di...|  Unknown|   Male|    1|
| 16|         2023|       drug toxicity|    40-59| Female|    7|
| 22|         2023|             suicide|    40-59|   Male|    1|
| 35|         2023|           pneumonia|  Unknown|   Male|    1|
| 36|         2023|             unknown|  Unknown|Unknown|    1|
| 56|         2023|             unknown|  Unknown| Female|    2|
| 65|         2023|             unknown|    20-39| Female|    3|
| 72|         2023|               other|    20-39| Female|    1|
| 79|         2023|       drug toxicity|    20-39|   Male|   16|
| 90|         2023|            accident|    40-59|   Male|    4|
|102|         2023|              cancer|    40-59|   Male|    2|
|105|         2023|            accident|  Unknown|   Male|    1|
|127|         2023|      

In [15]:
spark = SparkSession.builder \
    .appName("Homeless Deaths Analysis") \
    .getOrCreate()

data_by_cause = spark.read.csv('/content/cleaned_homeless-deaths-by-cause.csv', header=True, inferSchema=True)
data_by_demographics = spark.read.csv('/content/cleaned_homeless-deaths-by-demographics.csv', header=True, inferSchema=True)
data_by_month = spark.read.csv('/content/cleaned_homeless-deaths-by-month.csv', header=True, inferSchema=True)

# QoL Column renaming
## Rename the column count to demo_count
data_by_demographics = data_by_demographics.withColumnRenamed("count", "demo_count")
## Rename the column Age_group to demo_age_group
data_by_demographics = data_by_demographics.withColumnRenamed("Age_group", "demo_age_group")
## Rename the column Gender to demo_gender
data_by_demographics = data_by_demographics.withColumnRenamed("Gender", "demo_gender")

## Rename the column count to cause_count
data_by_cause = data_by_cause.withColumnRenamed("count", "cause_count")

## Rename the column count to month_count
data_by_month = data_by_month.withColumnRenamed("count", "month_count")

# Join data_by_cause with data_by_demographics on "Year of death"
joined_cause_demographics = data_by_cause.join(
    data_by_demographics,
    on=["Year of death"],
    how="inner"
)

# Join the result with data_by_month on "Year of death"
joined_all = joined_cause_demographics.join(
    data_by_month,
    on=["Year of death"],
    how="inner"
)

# Select relevant columns to avoid duplicates
selected_columns = joined_all.select(
    "Year of death",
    "Cause_of_death",
    "Age_group",
    "Gender",
    "Month of death",
    joined_cause_demographics["cause_count"].alias("Count_by_cause"),
    data_by_demographics["demo_count"].alias("Count_by_demographics"),
    data_by_month["month_count"].alias("Count_by_month")
)

# Show the final DataFrame
selected_columns.show()

# Save the final DataFrame to Parquet
# selected_columns.write.parquet('joined_homeless_deaths_data.parquet')
 
# Stop the Spark session
spark.stop()

+-------------+--------------------+---------+------+--------------+--------------+---------------------+--------------+
|Year of death|      Cause_of_death|Age_group|Gender|Month of death|Count_by_cause|Count_by_demographics|Count_by_month|
+-------------+--------------------+---------+------+--------------+--------------+---------------------+--------------+
|         2023|cardiovascular di...|  Unknown|  Male|     September|             1|                   24|            15|
|         2023|cardiovascular di...|  Unknown|  Male|      December|             1|                   24|             9|
|         2023|cardiovascular di...|  Unknown|  Male|        August|             1|                   24|             5|
|         2023|cardiovascular di...|  Unknown|  Male|       January|             1|                   24|            19|
|         2023|cardiovascular di...|  Unknown|  Male|       October|             1|                   24|            10|
|         2023|cardiovascular di

AnalysisException: [PATH_ALREADY_EXISTS] Path file:/path/to/joined_homeless_deaths_data.parquet already exists. Set mode as "overwrite" to overwrite the existing path.