In [1]:
%%configure -f
{
    "conf":{
        "spark.executor.instances": "2",
        "spark.executor.memory": "8g",
        "spark.executor.cores": "4",
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.kryo.registrator": "org.apache.sedona.core.serde.SedonaKryoRegistrator"
    }
}

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
478,application_1764662801237_0480,pyspark,idle,Link,Link,,
482,application_1764662801237_0484,pyspark,idle,Link,Link,,
485,application_1764662801237_0487,pyspark,idle,Link,Link,,
486,application_1764662801237_0488,pyspark,idle,Link,Link,,


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, to_date, count, sum as _sum, corr, desc, lit, avg, regexp_replace, expr, upper, trim, split, explode, format_number
from pyspark.sql.types import IntegerType, StringType, StructType, StructField, DoubleType, FloatType
import time

# Sedona Imports
from sedona.spark.SedonaContext import SedonaContext
from sedona.utils import SedonaKryoRegistrator, KryoSerializer

# Obtain the session with the Sedona-related settings requested, then let
# Sedona wrap it. Note that `builder` ends up holding the object returned by
# getOrCreate(), which is what gets passed to SedonaContext.create().
builder = (
    SparkSession.builder
    .appName("Query 5 execution 1")
    .config("spark.serializer", KryoSerializer.getName)
    .config("spark.kryo.registrator", SedonaKryoRegistrator.getName)
    .config("spark.sql.extensions", "org.apache.spark.sql.sedona_sql.io.SedonaSqlWrapper")
    .getOrCreate()
)
spark = SedonaContext.create(builder)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
490,application_1764662801237_0492,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
# Explicit schemas for the CSV inputs, so Spark does not have to infer types.

# Crime records: one StructField per CSV column, in file order.
Crime_data_schema = StructType([
    StructField("DR_NO", IntegerType()),
    StructField("Date Rptd", StringType()),
    StructField("DATE OCC", StringType()),
    StructField("TIME OCC", IntegerType()),
    StructField("AREA", IntegerType()),
    StructField("AREA NAME", StringType()),
    StructField("Rpt Dist No", IntegerType()),
    StructField("Part 1-2", IntegerType()),
    StructField("Crm Cd", IntegerType()),
    StructField("Crm Cd Desc", StringType()),
    StructField("Mocodes", StringType()),
    StructField("Vict Age", IntegerType()),
    StructField("Vict Sex", StringType()),
    StructField("Vict Descent", StringType()),
    StructField("Premis Cd", IntegerType()),
    StructField("Premis Desc", StringType()),
    StructField("Weapon Used Cd", IntegerType()),
    StructField("Weapon Desc", StringType()),
    StructField("Status", StringType()),
    StructField("Status Desc", StringType()),
    StructField("Crm Cd 1", IntegerType()),
    StructField("Crm Cd 2", IntegerType()),
    StructField("Crm Cd 3", IntegerType()),
    StructField("Crm Cd 4", IntegerType()),
    StructField("LOCATION", StringType()),
    StructField("Cross Street", StringType()),
    # Coordinates are read as 64-bit doubles: a 32-bit float would round the
    # values before they are turned into point geometries for the spatial join.
    StructField("LAT", DoubleType()),
    StructField("LON", DoubleType()),
])

# Income records: the income column stays a string here because it is
# cleaned with a regex and cast to an integer later in the notebook.
Income_schema = StructType([
    StructField("Zip Code", IntegerType()),
    StructField("Community", StringType()),
    StructField("Estimated Median Income", StringType()),
])

# Crime records: CSV with a header row, parsed with the explicit crime schema.
Crime_df = (
    spark.read
    .schema(Crime_data_schema)
    .option("header", True)
    .csv("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv")
)

# Census blocks: a multi-line GeoJSON document; each entry of `features`
# becomes one row and its fields are promoted to top-level columns.
blocks_df = (
    spark.read.format("geojson")
    .option("multiLine", "true")
    .load("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Census_Blocks_2020.geojson")
    .selectExpr("explode(features) as features")
    .select("features.*")
)

# Income records: semicolon-separated CSV with a header row.
income_df = (
    spark.read
    .schema(Income_schema)
    .option("header", True)
    .option("sep", ";")
    .csv("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_income_2021.csv")
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
# --- 1. PREPARING CRIME DATA (2020-2021) ---
# Drop records whose latitude or longitude is 0 (the "Null Island"
# placeholder), keep only occurrences dated 2020 or 2021, and build a point
# geometry from the coordinates.
crimes_filtered = (
    Crime_df
    .where((col("LAT") != 0) & (col("LON") != 0))
    .withColumn("Year", year(to_date(col("DATE OCC"), "yyyy MMM dd hh:mm:ss a")))
    .where(col("Year").isin(2020, 2021))
    .withColumn("geometry_point", expr("ST_Point(LON, LAT)"))
    .select("DR_NO", "geometry_point")
)

# --- 2. PREPARING CENSUS DATA ---
# Keep the block geometry plus the COMM and POP20 properties; blocks without
# a geometry are discarded.
census_blocks = (
    blocks_df
    .select(
        col("geometry"),
        col("properties.COMM").alias("COMM"),
        col("properties.POP20").alias("POP20"),
    )
    .where(col("geometry").isNotNull())
)

# --- 3. PREPARING INCOME DATA ---
# Income text with "$" and "," stripped, cast to an integer.
income_value = regexp_replace(col("Estimated Median Income"), "[$,]", "").cast(IntegerType())

# Community text normalised in two passes: "Los Angeles (X)" is replaced by
# X, then any remaining parenthesised text is removed; the result is split
# on commas into an array of names.
community_names = split(
    regexp_replace(
        regexp_replace(col("Community"), "Los Angeles \\((.*?)\\)", "$1"),
        "\\s*\\(.*?\\)",
        "",
    ),
    ",",
)

# One row per (community name, income); names are trimmed and upper-cased,
# and the incomes sharing a name are averaged.
income_clean = (
    income_df
    .withColumn("Median_Income", income_value)
    .where(col("Median_Income").isNotNull())
    .select(explode(community_names).alias("Community_Clean"), col("Median_Income"))
    .withColumn("Community_Clean", upper(trim(col("Community_Clean"))))
    .groupBy("Community_Clean")
    .agg(avg("Median_Income").alias("Median_Income"))
)

# --- 4. SPATIAL JOIN (CRIMES + CENSUS) ---
# Pair every crime point with the census block whose geometry contains it.
spatial_join = crimes_filtered.alias("crimes").join(
    census_blocks.alias("blocks"),
    on=expr("ST_Contains(blocks.geometry, crimes.geometry_point)"),
    how="inner",
)

# --- 5. AGGREGATIONS (PER COMMUNITY) ---
# Community key shared by both aggregations: COMM trimmed and upper-cased.
comm_key = upper(trim(col("COMM")))

# Step A: number of matched crimes per community.
crimes_per_comm = (
    spatial_join
    .withColumn("COMM_Clean", comm_key)
    .groupBy("COMM_Clean")
    .agg(count("DR_NO").alias("Total_Crimes_2y"))
)

# Step B: total POP20 per community; communities with no population are
# dropped so the rate below never divides by zero.
pop_per_comm = (
    census_blocks
    .withColumn("COMM_Clean", comm_key)
    .groupBy("COMM_Clean")
    .agg(_sum("POP20").alias("Total_Population"))
    .where(col("Total_Population") > 0)
)

# Step C: combine population, crime counts and income (matching COMM against
# the cleaned community name), then
# --- 6. CALCULATE FINAL METRIC ---
# the annual average crimes per person: (Total_Crimes / 2) / Total_Population
final_stats = (
    pop_per_comm
    .join(crimes_per_comm, "COMM_Clean", "inner")
    .join(income_clean, pop_per_comm["COMM_Clean"] == income_clean["Community_Clean"], "inner")
    .select(
        col("COMM_Clean").alias("COMM"),
        col("Total_Population"),
        col("Total_Crimes_2y"),
        col("Median_Income"),
    )
    .withColumn(
        "Crimes_Per_Person_Yearly",
        (col("Total_Crimes_2y") / 2) / col("Total_Population"),
    )
)

# Print the physical plan of the un-cached pipeline (this must happen before
# cache() so the join strategies stay visible in the output).
final_stats.explain()

# Mark the result for caching: the timed count() below materialises it once,
# and every later action (correlations, top/bottom 10, show) reuses the
# cached rows instead of re-running the spatial join and aggregations.
final_stats.cache()

# Time Benchmarking
# perf_counter() is a monotonic clock intended for measuring intervals.
start_time = time.perf_counter()
total_records = final_stats.count()
end_time = time.perf_counter()
print(f"Execution Time: {end_time - start_time:.4f} seconds")

# --- 7. CORRELATION CALCULATIONS ---
# 7.1 All communities
corr_all = final_stats.stat.corr("Median_Income", "Crimes_Per_Person_Yearly")
print(f"Correlation (All Communities): {corr_all}")

# 7.2 Top 10 and Bottom 10 based on income
top_10 = final_stats.orderBy(col("Median_Income").desc()).limit(10)
bottom_10 = final_stats.orderBy(col("Median_Income").asc()).limit(10)

# Merge the two extremes into a single 20-row subset
subset_extremes = top_10.union(bottom_10)

corr_extremes = subset_extremes.stat.corr("Median_Income", "Crimes_Per_Person_Yearly")
print(f"Correlation (Top 10 & Bottom 10 Income Areas): {corr_extremes}")

# Visualization: the rate is formatted to 5 decimals for display only
print("\n--- Top 10 Wealthiest Areas Stats ---")
top_10.withColumn("Crimes_Per_Person_Yearly", format_number("Crimes_Per_Person_Yearly", 5)).show()
print("\n--- Top 10 Poorest Areas Stats ---")
bottom_10.withColumn("Crimes_Per_Person_Yearly", format_number("Crimes_Per_Person_Yearly", 5)).show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [COMM_Clean#251 AS COMM#282, Total_Population#261L, Total_Crimes_2y#248L, Median_Income#221, ((cast(Total_Crimes_2y#248L as double) / 2.0) / cast(Total_Population#261L as double)) AS Crimes_Per_Person_Yearly#287]
   +- BroadcastHashJoin [COMM_Clean#251], [Community_Clean#215], Inner, BuildRight, false
      :- Project [COMM_Clean#251, Total_Population#261L, Total_Crimes_2y#248L]
      :  +- BroadcastHashJoin [COMM_Clean#251], [COMM_Clean#234], Inner, BuildLeft, false
      :     :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]),false), [plan_id=190]
      :     :  +- Filter (isnotnull(Total_Population#261L) AND (Total_Population#261L > 0))
      :     :     +- HashAggregate(keys=[COMM_Clean#251], functions=[sum(POP20#169L)], schema specialized)
      :     :        +- Exchange hashpartitioning(COMM_Clean#251, 1000), ENSURE_REQUIREMENTS, [plan_id=184]
      :     :           +- HashAggregate(k

In [1]:
%%configure -f
{
    "conf":{
        "spark.executor.instances": "4",
        "spark.executor.memory": "4g",
        "spark.executor.cores": "2",
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.kryo.registrator": "org.apache.sedona.core.serde.SedonaKryoRegistrator"
    }
}

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
478,application_1764662801237_0480,pyspark,idle,Link,Link,,
482,application_1764662801237_0484,pyspark,idle,Link,Link,,
485,application_1764662801237_0487,pyspark,idle,Link,Link,,
486,application_1764662801237_0488,pyspark,idle,Link,Link,,


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, to_date, count, sum as _sum, corr, desc, lit, avg, regexp_replace, expr, upper, trim, split, explode, format_number
from pyspark.sql.types import IntegerType, StringType, StructType, StructField, DoubleType, FloatType
import time

# Sedona Imports
from sedona.spark.SedonaContext import SedonaContext
from sedona.utils import SedonaKryoRegistrator, KryoSerializer

# Obtain the session with the Sedona-related settings requested, then let
# Sedona wrap it. Note that `builder` ends up holding the object returned by
# getOrCreate(), which is what gets passed to SedonaContext.create().
builder = (
    SparkSession.builder
    .appName("Query 5 execution 2")
    .config("spark.serializer", KryoSerializer.getName)
    .config("spark.kryo.registrator", SedonaKryoRegistrator.getName)
    .config("spark.sql.extensions", "org.apache.spark.sql.sedona_sql.io.SedonaSqlWrapper")
    .getOrCreate()
)
spark = SedonaContext.create(builder)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
491,application_1764662801237_0493,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
# Explicit schemas for the CSV inputs, so Spark does not have to infer types.

# Crime records: one StructField per CSV column, in file order.
Crime_data_schema = StructType([
    StructField("DR_NO", IntegerType()),
    StructField("Date Rptd", StringType()),
    StructField("DATE OCC", StringType()),
    StructField("TIME OCC", IntegerType()),
    StructField("AREA", IntegerType()),
    StructField("AREA NAME", StringType()),
    StructField("Rpt Dist No", IntegerType()),
    StructField("Part 1-2", IntegerType()),
    StructField("Crm Cd", IntegerType()),
    StructField("Crm Cd Desc", StringType()),
    StructField("Mocodes", StringType()),
    StructField("Vict Age", IntegerType()),
    StructField("Vict Sex", StringType()),
    StructField("Vict Descent", StringType()),
    StructField("Premis Cd", IntegerType()),
    StructField("Premis Desc", StringType()),
    StructField("Weapon Used Cd", IntegerType()),
    StructField("Weapon Desc", StringType()),
    StructField("Status", StringType()),
    StructField("Status Desc", StringType()),
    StructField("Crm Cd 1", IntegerType()),
    StructField("Crm Cd 2", IntegerType()),
    StructField("Crm Cd 3", IntegerType()),
    StructField("Crm Cd 4", IntegerType()),
    StructField("LOCATION", StringType()),
    StructField("Cross Street", StringType()),
    # Coordinates are read as 64-bit doubles: a 32-bit float would round the
    # values before they are turned into point geometries for the spatial join.
    StructField("LAT", DoubleType()),
    StructField("LON", DoubleType()),
])

# Income records: the income column stays a string here because it is
# cleaned with a regex and cast to an integer later in the notebook.
Income_schema = StructType([
    StructField("Zip Code", IntegerType()),
    StructField("Community", StringType()),
    StructField("Estimated Median Income", StringType()),
])

# Crime records: CSV with a header row, parsed with the explicit crime schema.
Crime_df = (
    spark.read
    .schema(Crime_data_schema)
    .option("header", True)
    .csv("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv")
)

# Census blocks: a multi-line GeoJSON document; each entry of `features`
# becomes one row and its fields are promoted to top-level columns.
blocks_df = (
    spark.read.format("geojson")
    .option("multiLine", "true")
    .load("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Census_Blocks_2020.geojson")
    .selectExpr("explode(features) as features")
    .select("features.*")
)

# Income records: semicolon-separated CSV with a header row.
income_df = (
    spark.read
    .schema(Income_schema)
    .option("header", True)
    .option("sep", ";")
    .csv("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_income_2021.csv")
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
# --- 1. PREPARING CRIME DATA (2020-2021) ---
# Drop records whose latitude or longitude is 0 (the "Null Island"
# placeholder), keep only occurrences dated 2020 or 2021, and build a point
# geometry from the coordinates.
crimes_filtered = (
    Crime_df
    .where((col("LAT") != 0) & (col("LON") != 0))
    .withColumn("Year", year(to_date(col("DATE OCC"), "yyyy MMM dd hh:mm:ss a")))
    .where(col("Year").isin(2020, 2021))
    .withColumn("geometry_point", expr("ST_Point(LON, LAT)"))
    .select("DR_NO", "geometry_point")
)

# --- 2. PREPARING CENSUS DATA ---
# Keep the block geometry plus the COMM and POP20 properties; blocks without
# a geometry are discarded.
census_blocks = (
    blocks_df
    .select(
        col("geometry"),
        col("properties.COMM").alias("COMM"),
        col("properties.POP20").alias("POP20"),
    )
    .where(col("geometry").isNotNull())
)

# --- 3. PREPARING INCOME DATA ---
# Income text with "$" and "," stripped, cast to an integer.
income_value = regexp_replace(col("Estimated Median Income"), "[$,]", "").cast(IntegerType())

# Community text normalised in two passes: "Los Angeles (X)" is replaced by
# X, then any remaining parenthesised text is removed; the result is split
# on commas into an array of names.
community_names = split(
    regexp_replace(
        regexp_replace(col("Community"), "Los Angeles \\((.*?)\\)", "$1"),
        "\\s*\\(.*?\\)",
        "",
    ),
    ",",
)

# One row per (community name, income); names are trimmed and upper-cased,
# and the incomes sharing a name are averaged.
income_clean = (
    income_df
    .withColumn("Median_Income", income_value)
    .where(col("Median_Income").isNotNull())
    .select(explode(community_names).alias("Community_Clean"), col("Median_Income"))
    .withColumn("Community_Clean", upper(trim(col("Community_Clean"))))
    .groupBy("Community_Clean")
    .agg(avg("Median_Income").alias("Median_Income"))
)

# --- 4. SPATIAL JOIN (CRIMES + CENSUS) ---
# Pair every crime point with the census block whose geometry contains it.
spatial_join = crimes_filtered.alias("crimes").join(
    census_blocks.alias("blocks"),
    on=expr("ST_Contains(blocks.geometry, crimes.geometry_point)"),
    how="inner",
)

# --- 5. AGGREGATIONS (PER COMMUNITY) ---
# Community key shared by both aggregations: COMM trimmed and upper-cased.
comm_key = upper(trim(col("COMM")))

# Step A: number of matched crimes per community.
crimes_per_comm = (
    spatial_join
    .withColumn("COMM_Clean", comm_key)
    .groupBy("COMM_Clean")
    .agg(count("DR_NO").alias("Total_Crimes_2y"))
)

# Step B: total POP20 per community; communities with no population are
# dropped so the rate below never divides by zero.
pop_per_comm = (
    census_blocks
    .withColumn("COMM_Clean", comm_key)
    .groupBy("COMM_Clean")
    .agg(_sum("POP20").alias("Total_Population"))
    .where(col("Total_Population") > 0)
)

# Step C: combine population, crime counts and income (matching COMM against
# the cleaned community name), then
# --- 6. CALCULATE FINAL METRIC ---
# the annual average crimes per person: (Total_Crimes / 2) / Total_Population
final_stats = (
    pop_per_comm
    .join(crimes_per_comm, "COMM_Clean", "inner")
    .join(income_clean, pop_per_comm["COMM_Clean"] == income_clean["Community_Clean"], "inner")
    .select(
        col("COMM_Clean").alias("COMM"),
        col("Total_Population"),
        col("Total_Crimes_2y"),
        col("Median_Income"),
    )
    .withColumn(
        "Crimes_Per_Person_Yearly",
        (col("Total_Crimes_2y") / 2) / col("Total_Population"),
    )
)

# Print the physical plan of the un-cached pipeline (this must happen before
# cache() so the join strategies stay visible in the output).
final_stats.explain()

# Mark the result for caching: the timed count() below materialises it once,
# and every later action (correlations, top/bottom 10, show) reuses the
# cached rows instead of re-running the spatial join and aggregations.
final_stats.cache()

# Time Benchmarking
# perf_counter() is a monotonic clock intended for measuring intervals.
start_time = time.perf_counter()
total_records = final_stats.count()
end_time = time.perf_counter()
print(f"Execution Time: {end_time - start_time:.4f} seconds")

# --- 7. CORRELATION CALCULATIONS ---
# 7.1 All communities
corr_all = final_stats.stat.corr("Median_Income", "Crimes_Per_Person_Yearly")
print(f"Correlation (All Communities): {corr_all}")

# 7.2 Top 10 and Bottom 10 based on income
top_10 = final_stats.orderBy(col("Median_Income").desc()).limit(10)
bottom_10 = final_stats.orderBy(col("Median_Income").asc()).limit(10)

# Merge the two extremes into a single 20-row subset
subset_extremes = top_10.union(bottom_10)

corr_extremes = subset_extremes.stat.corr("Median_Income", "Crimes_Per_Person_Yearly")
print(f"Correlation (Top 10 & Bottom 10 Income Areas): {corr_extremes}")

# Visualization: the rate is formatted to 5 decimals for display only
print("\n--- Top 10 Wealthiest Areas Stats ---")
top_10.withColumn("Crimes_Per_Person_Yearly", format_number("Crimes_Per_Person_Yearly", 5)).show()
print("\n--- Top 10 Poorest Areas Stats ---")
bottom_10.withColumn("Crimes_Per_Person_Yearly", format_number("Crimes_Per_Person_Yearly", 5)).show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [COMM_Clean#251 AS COMM#282, Total_Population#261L, Total_Crimes_2y#248L, Median_Income#221, ((cast(Total_Crimes_2y#248L as double) / 2.0) / cast(Total_Population#261L as double)) AS Crimes_Per_Person_Yearly#287]
   +- BroadcastHashJoin [COMM_Clean#251], [Community_Clean#215], Inner, BuildRight, false
      :- Project [COMM_Clean#251, Total_Population#261L, Total_Crimes_2y#248L]
      :  +- BroadcastHashJoin [COMM_Clean#251], [COMM_Clean#234], Inner, BuildLeft, false
      :     :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]),false), [plan_id=190]
      :     :  +- Filter (isnotnull(Total_Population#261L) AND (Total_Population#261L > 0))
      :     :     +- HashAggregate(keys=[COMM_Clean#251], functions=[sum(POP20#169L)], schema specialized)
      :     :        +- Exchange hashpartitioning(COMM_Clean#251, 1000), ENSURE_REQUIREMENTS, [plan_id=184]
      :     :           +- HashAggregate(k

In [1]:
%%configure -f
{
    "conf":{
        "spark.executor.instances": "8",
        "spark.executor.memory": "2g",
        "spark.executor.cores": "1",
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.kryo.registrator": "org.apache.sedona.core.serde.SedonaKryoRegistrator"
    }
}

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
478,application_1764662801237_0480,pyspark,idle,Link,Link,,
482,application_1764662801237_0484,pyspark,idle,Link,Link,,
485,application_1764662801237_0487,pyspark,idle,Link,Link,,
486,application_1764662801237_0488,pyspark,idle,Link,Link,,
492,application_1764662801237_0494,pyspark,idle,Link,Link,,


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, to_date, count, sum as _sum, corr, desc, lit, avg, regexp_replace, expr, upper, trim, split, explode, format_number
from pyspark.sql.types import IntegerType, StringType, StructType, StructField, DoubleType, FloatType
import time

# Sedona Imports
from sedona.spark.SedonaContext import SedonaContext
from sedona.utils import SedonaKryoRegistrator, KryoSerializer

# Obtain the session with the Sedona-related settings requested, then let
# Sedona wrap it. Note that `builder` ends up holding the object returned by
# getOrCreate(), which is what gets passed to SedonaContext.create().
builder = (
    SparkSession.builder
    .appName("Query 5 execution 3")
    .config("spark.serializer", KryoSerializer.getName)
    .config("spark.kryo.registrator", SedonaKryoRegistrator.getName)
    .config("spark.sql.extensions", "org.apache.spark.sql.sedona_sql.io.SedonaSqlWrapper")
    .getOrCreate()
)
spark = SedonaContext.create(builder)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
494,application_1764662801237_0496,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
# Explicit schemas for the CSV inputs, so Spark does not have to infer types.

# Crime records: one StructField per CSV column, in file order.
Crime_data_schema = StructType([
    StructField("DR_NO", IntegerType()),
    StructField("Date Rptd", StringType()),
    StructField("DATE OCC", StringType()),
    StructField("TIME OCC", IntegerType()),
    StructField("AREA", IntegerType()),
    StructField("AREA NAME", StringType()),
    StructField("Rpt Dist No", IntegerType()),
    StructField("Part 1-2", IntegerType()),
    StructField("Crm Cd", IntegerType()),
    StructField("Crm Cd Desc", StringType()),
    StructField("Mocodes", StringType()),
    StructField("Vict Age", IntegerType()),
    StructField("Vict Sex", StringType()),
    StructField("Vict Descent", StringType()),
    StructField("Premis Cd", IntegerType()),
    StructField("Premis Desc", StringType()),
    StructField("Weapon Used Cd", IntegerType()),
    StructField("Weapon Desc", StringType()),
    StructField("Status", StringType()),
    StructField("Status Desc", StringType()),
    StructField("Crm Cd 1", IntegerType()),
    StructField("Crm Cd 2", IntegerType()),
    StructField("Crm Cd 3", IntegerType()),
    StructField("Crm Cd 4", IntegerType()),
    StructField("LOCATION", StringType()),
    StructField("Cross Street", StringType()),
    # Coordinates are read as 64-bit doubles: a 32-bit float would round the
    # values before they are turned into point geometries for the spatial join.
    StructField("LAT", DoubleType()),
    StructField("LON", DoubleType()),
])

# Income records: the income column stays a string here because it is
# cleaned with a regex and cast to an integer later in the notebook.
Income_schema = StructType([
    StructField("Zip Code", IntegerType()),
    StructField("Community", StringType()),
    StructField("Estimated Median Income", StringType()),
])

# Crime records: CSV with a header row, parsed with the explicit crime schema.
Crime_df = (
    spark.read
    .schema(Crime_data_schema)
    .option("header", True)
    .csv("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv")
)

# Census blocks: a multi-line GeoJSON document; each entry of `features`
# becomes one row and its fields are promoted to top-level columns.
blocks_df = (
    spark.read.format("geojson")
    .option("multiLine", "true")
    .load("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Census_Blocks_2020.geojson")
    .selectExpr("explode(features) as features")
    .select("features.*")
)

# Income records: semicolon-separated CSV with a header row.
income_df = (
    spark.read
    .schema(Income_schema)
    .option("header", True)
    .option("sep", ";")
    .csv("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_income_2021.csv")
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
# --- 1. PREPARING CRIME DATA (2020-2021) ---
# Drop records whose latitude or longitude is 0 (the "Null Island"
# placeholder), keep only occurrences dated 2020 or 2021, and build a point
# geometry from the coordinates.
crimes_filtered = (
    Crime_df
    .where((col("LAT") != 0) & (col("LON") != 0))
    .withColumn("Year", year(to_date(col("DATE OCC"), "yyyy MMM dd hh:mm:ss a")))
    .where(col("Year").isin(2020, 2021))
    .withColumn("geometry_point", expr("ST_Point(LON, LAT)"))
    .select("DR_NO", "geometry_point")
)

# --- 2. PREPARING CENSUS DATA ---
# Keep the block geometry plus the COMM and POP20 properties; blocks without
# a geometry are discarded.
census_blocks = (
    blocks_df
    .select(
        col("geometry"),
        col("properties.COMM").alias("COMM"),
        col("properties.POP20").alias("POP20"),
    )
    .where(col("geometry").isNotNull())
)

# --- 3. PREPARING INCOME DATA ---
# Income text with "$" and "," stripped, cast to an integer.
income_value = regexp_replace(col("Estimated Median Income"), "[$,]", "").cast(IntegerType())

# Community text normalised in two passes: "Los Angeles (X)" is replaced by
# X, then any remaining parenthesised text is removed; the result is split
# on commas into an array of names.
community_names = split(
    regexp_replace(
        regexp_replace(col("Community"), "Los Angeles \\((.*?)\\)", "$1"),
        "\\s*\\(.*?\\)",
        "",
    ),
    ",",
)

# One row per (community name, income); names are trimmed and upper-cased,
# and the incomes sharing a name are averaged.
income_clean = (
    income_df
    .withColumn("Median_Income", income_value)
    .where(col("Median_Income").isNotNull())
    .select(explode(community_names).alias("Community_Clean"), col("Median_Income"))
    .withColumn("Community_Clean", upper(trim(col("Community_Clean"))))
    .groupBy("Community_Clean")
    .agg(avg("Median_Income").alias("Median_Income"))
)

# --- 4. SPATIAL JOIN (CRIMES + CENSUS) ---
# Pair every crime point with the census block whose geometry contains it.
spatial_join = crimes_filtered.alias("crimes").join(
    census_blocks.alias("blocks"),
    on=expr("ST_Contains(blocks.geometry, crimes.geometry_point)"),
    how="inner",
)

# --- 5. AGGREGATIONS (PER COMMUNITY) ---
# Community key shared by both aggregations: COMM trimmed and upper-cased.
comm_key = upper(trim(col("COMM")))

# Step A: number of matched crimes per community.
crimes_per_comm = (
    spatial_join
    .withColumn("COMM_Clean", comm_key)
    .groupBy("COMM_Clean")
    .agg(count("DR_NO").alias("Total_Crimes_2y"))
)

# Step B: total POP20 per community; communities with no population are
# dropped so the rate below never divides by zero.
pop_per_comm = (
    census_blocks
    .withColumn("COMM_Clean", comm_key)
    .groupBy("COMM_Clean")
    .agg(_sum("POP20").alias("Total_Population"))
    .where(col("Total_Population") > 0)
)

# Step C: combine population, crime counts and income (matching COMM against
# the cleaned community name), then
# --- 6. CALCULATE FINAL METRIC ---
# the annual average crimes per person: (Total_Crimes / 2) / Total_Population
final_stats = (
    pop_per_comm
    .join(crimes_per_comm, "COMM_Clean", "inner")
    .join(income_clean, pop_per_comm["COMM_Clean"] == income_clean["Community_Clean"], "inner")
    .select(
        col("COMM_Clean").alias("COMM"),
        col("Total_Population"),
        col("Total_Crimes_2y"),
        col("Median_Income"),
    )
    .withColumn(
        "Crimes_Per_Person_Yearly",
        (col("Total_Crimes_2y") / 2) / col("Total_Population"),
    )
)

# Print the physical plan of the un-cached pipeline (this must happen before
# cache() so the join strategies stay visible in the output).
final_stats.explain()

# Mark the result for caching: the timed count() below materialises it once,
# and every later action (correlations, top/bottom 10, show) reuses the
# cached rows instead of re-running the spatial join and aggregations.
final_stats.cache()

# Time Benchmarking
# perf_counter() is a monotonic clock intended for measuring intervals.
start_time = time.perf_counter()
total_records = final_stats.count()
end_time = time.perf_counter()
print(f"Execution Time: {end_time - start_time:.4f} seconds")

# --- 7. CORRELATION CALCULATIONS ---
# 7.1 All communities
corr_all = final_stats.stat.corr("Median_Income", "Crimes_Per_Person_Yearly")
print(f"Correlation (All Communities): {corr_all}")

# 7.2 Top 10 and Bottom 10 based on income
top_10 = final_stats.orderBy(col("Median_Income").desc()).limit(10)
bottom_10 = final_stats.orderBy(col("Median_Income").asc()).limit(10)

# Merge the two extremes into a single 20-row subset
subset_extremes = top_10.union(bottom_10)

corr_extremes = subset_extremes.stat.corr("Median_Income", "Crimes_Per_Person_Yearly")
print(f"Correlation (Top 10 & Bottom 10 Income Areas): {corr_extremes}")

# Visualization: the rate is formatted to 5 decimals for display only
print("\n--- Top 10 Wealthiest Areas Stats ---")
top_10.withColumn("Crimes_Per_Person_Yearly", format_number("Crimes_Per_Person_Yearly", 5)).show()
print("\n--- Top 10 Poorest Areas Stats ---")
bottom_10.withColumn("Crimes_Per_Person_Yearly", format_number("Crimes_Per_Person_Yearly", 5)).show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [COMM_Clean#251 AS COMM#282, Total_Population#261L, Total_Crimes_2y#248L, Median_Income#221, ((cast(Total_Crimes_2y#248L as double) / 2.0) / cast(Total_Population#261L as double)) AS Crimes_Per_Person_Yearly#287]
   +- BroadcastHashJoin [COMM_Clean#251], [Community_Clean#215], Inner, BuildRight, false
      :- Project [COMM_Clean#251, Total_Population#261L, Total_Crimes_2y#248L]
      :  +- BroadcastHashJoin [COMM_Clean#251], [COMM_Clean#234], Inner, BuildLeft, false
      :     :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]),false), [plan_id=190]
      :     :  +- Filter (isnotnull(Total_Population#261L) AND (Total_Population#261L > 0))
      :     :     +- HashAggregate(keys=[COMM_Clean#251], functions=[sum(POP20#169L)], schema specialized)
      :     :        +- Exchange hashpartitioning(COMM_Clean#251, 1000), ENSURE_REQUIREMENTS, [plan_id=184]
      :     :           +- HashAggregate(k