In [1]:
%%configure -f
{
  "conf": {
    "spark.executor.instances": "8",
    "spark.executor.cores": "1",
    "spark.executor.memory": "2g",
    "spark.driver.memory": "4g"
  }
}


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
403,application_1765289937462_0399,pyspark,idle,Link,Link,,
409,application_1765289937462_0405,pyspark,idle,Link,Link,,
424,application_1765289937462_0420,pyspark,idle,Link,Link,,
427,application_1765289937462_0423,pyspark,idle,Link,Link,,
433,application_1765289937462_0429,pyspark,idle,Link,Link,,
443,application_1765289937462_0439,pyspark,idle,Link,Link,,
445,application_1765289937462_0441,pyspark,idle,Link,Link,,
449,application_1765289937462_0445,pyspark,busy,Link,Link,,


In [2]:
import time
import numpy as np
import matplotlib.path as mplPath
from collections import defaultdict
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, udf, explode, sum as _sum, trim, regexp_replace, desc, asc, round, format_number
from pyspark.sql.types import StringType
from pyspark.sql.types import DecimalType

spark = SparkSession.builder.getOrCreate()

start_total = time.time()

crime_df = spark.read.option("header", "true").csv("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv")
income_df = spark.read.option("header", "true").option("delimiter", ";").csv("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_income_2021.csv")
geo_df = spark.read.option("multiline", "true").json("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Census_Blocks_2020.geojson")

crime_filtered = crime_df.filter(
    col("DATE OCC").contains("2020") | col("DATE OCC").contains("2021")
).select("LAT", "LON", "DR_NO").filter((col("LAT") != 0) & (col("LON") != 0))

income_clean = income_df.withColumn("clean_inc_str", regexp_replace(col("Estimated Median Income"), "[$,]", "")) \
                        .withColumn("Median_Income", col("clean_inc_str").cast("int")) \
                        .withColumn("ZIPcode", trim(col("Zip Code").cast("string"))) \
                        .filter(col("Median_Income").isNotNull())

blocks_df = geo_df.select(explode("features").alias("feature")) \
    .select(
        trim(col("feature.properties.ZCTA20").cast("string")).alias("ZIPcode"),     
        col("feature.properties.POP20").alias("Population"),    
        col("feature.geometry.coordinates").alias("coords"),
        col("feature.geometry.type").alias("type")
    )

local_blocks = blocks_df.select("ZIPcode", "coords", "type").collect()
GRID_SIZE = 0.01
spatial_grid = defaultdict(list)

for row in local_blocks:
    zip_code = row["ZIPcode"]
    if not zip_code: continue
    try:
        coords = row["coords"]
        gtype = row["type"]
        
        def add_to_grid(c):
            arr = np.array(c[0], dtype=float)
            path = mplPath.Path(arr)
            min_x, min_y = np.min(arr[:,0]), np.min(arr[:,1])
            max_x, max_y = np.max(arr[:,0]), np.max(arr[:,1])
            bbox = (min_x, min_y, max_x, max_y)
            shape_entry = (zip_code, path, bbox)
            
            start_x_idx = int(min_x / GRID_SIZE)
            end_x_idx   = int(max_x / GRID_SIZE)
            start_y_idx = int(min_y / GRID_SIZE)
            end_y_idx   = int(max_y / GRID_SIZE)
            
            for x_i in range(start_x_idx, end_x_idx + 1):
                for y_i in range(start_y_idx, end_y_idx + 1):
                    spatial_grid[(x_i, y_i)].append(shape_entry)
                    
        if gtype == 'Polygon':
            add_to_grid(coords)
        elif gtype == 'MultiPolygon':
            for poly_coords in coords:
                add_to_grid(poly_coords)
    except: continue

sc = spark.sparkContext
broadcast_grid = sc.broadcast(spatial_grid)

def find_zip_id(lat, lon):
    if lat is None or lon is None: return None
    try:
        x, y = float(lon), float(lat)
        grid_key = (int(x / GRID_SIZE), int(y / GRID_SIZE))
        candidates = broadcast_grid.value.get(grid_key, [])
        for zcode, poly, bbox in candidates:
            if not (bbox[0] <= x <= bbox[2] and bbox[1] <= y <= bbox[3]):
                continue
            if poly.contains_point((x, y)):
                return zcode
    except: return None
    return None

find_zip_udf = udf(find_zip_id, StringType())

crime_with_zip = crime_filtered.withColumn("ZIPcode", find_zip_udf(col("LAT"), col("LON"))) \
    .filter(col("ZIPcode").isNotNull())

crime_counts = crime_with_zip.groupBy("ZIPcode").agg(count("DR_NO").alias("Total_Crimes"))

pop_counts = blocks_df.withColumn("Population", col("Population").cast("int")) \
    .withColumn("ZIPcode", trim(col("ZIPcode").cast("string"))) \
    .groupBy("ZIPcode").agg(_sum("Population").alias("Total_Population"))

final_stats_df = crime_counts.join(pop_counts, "ZIPcode", "inner")
print("Join 1 :")
final_stats_df.explain(True)
print("")
final_stats_df = final_stats_df.join(income_clean, "ZIPcode", "inner")
print("Join 2 :")
final_stats_df.explain(True)
print("")


final_stats_df = final_stats_df.withColumn("Annual_Avg_Crimes", col("Total_Crimes") / 2) \
                               .withColumn("Crime_Rate_Per_Capita", col("Annual_Avg_Crimes") / col("Total_Population")) \
                               .filter(col("Total_Population") > 0) \
                               .cache()

display_df = final_stats_df.select(
    col("ZIPcode"),
    col("Community"),
    col("Total_Population"),
    col("Total_Crimes"),
    round(col("Annual_Avg_Crimes"), 1).alias("Avg_Annual_Crimes"),
    col("Crime_Rate_Per_Capita").cast(DecimalType(18, 8)).alias("Crime_Rate"),
    col("Median_Income")
).orderBy(desc("Crime_Rate"))


display_df.show(50, truncate=False)

if final_stats_df.count() > 0:
    corr_global = final_stats_df.stat.corr("Median_Income", "Crime_Rate_Per_Capita")
    print(f"Correlation (Income vs Crime Rate): {corr_global:.4f}")
    
    top_10_rich = final_stats_df.orderBy(desc("Median_Income")).limit(10)
    bottom_10_poor = final_stats_df.orderBy(asc("Median_Income")).limit(10)
    
    top_10_rich = top_10_rich.withColumn(
        "Crime_Rate_Per_Capita",
        col("Crime_Rate_Per_Capita").cast(DecimalType(18,8))
    )
    
    bottom_10_poor = bottom_10_poor.withColumn(
        "Crime_Rate_Per_Capita",
        col("Crime_Rate_Per_Capita").cast(DecimalType(18,8))
    )


    
    subset_df = top_10_rich.union(bottom_10_poor)
    corr_subset = subset_df.stat.corr("Median_Income", "Crime_Rate_Per_Capita")
    
    top_10_rich.select("ZIPcode", "Community", "Median_Income", "Crime_Rate_Per_Capita").show(10, truncate=False)
    bottom_10_poor.select("ZIPcode", "Community", "Median_Income", "Crime_Rate_Per_Capita").show(10, truncate=False)
    
    print(f"Correlation (Top 10 & Bottom 10 Only): {corr_subset:.4f}")

end_total = time.time()
print(f"Total pipeline executed in {end_total - start_total:.2f} seconds")

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
450,application_1765289937462_0446,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Join 1 :
== Parsed Logical Plan ==
'Join UsingJoin(Inner, [ZIPcode])
:- Aggregate [ZIPcode#165], [ZIPcode#165, count(DR_NO#29) AS Total_Crimes#175L]
:  +- Filter isnotnull(ZIPcode#165)
:     +- Project [LAT#55, LON#56, DR_NO#29, find_zip_id(LAT#55, LON#56)#164 AS ZIPcode#165]
:        +- Filter (NOT (cast(LAT#55 as int) = 0) AND NOT (cast(LON#56 as int) = 0))
:           +- Project [LAT#55, LON#56, DR_NO#29]
:              +- Filter (Contains(DATE OCC#31, 2020) OR Contains(DATE OCC#31, 2021))
:                 +- Relation [DR_NO#29,Date Rptd#30,DATE OCC#31,TIME OCC#32,AREA#33,AREA NAME#34,Rpt Dist No#35,Part 1-2#36,Crm Cd#37,Crm Cd Desc#38,Mocodes#39,Vict Age#40,Vict Sex#41,Vict Descent#42,Premis Cd#43,Premis Desc#44,Weapon Used Cd#45,Weapon Desc#46,Status#47,Status Desc#48,Crm Cd 1#49,Crm Cd 2#50,Crm Cd 3#51,Crm Cd 4#52,... 4 more fields] csv
+- Aggregate [ZIPcode#183], [ZIPcode#183, sum(Population#178) AS Total_Population#193L]
   +- Project [trim(cast(ZIPcode#144 as string), None) A