In [1]:
%%configure -f
{
    "conf":{
        "spark.executor.instances": "2",
        "spark.executor.memory": "2g",
        "spark.executor.cores": "1"
    }
}

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
944,application_1765289937462_0937,pyspark,idle,Link,Link,,
952,application_1765289937462_0945,pyspark,idle,Link,Link,,
956,application_1765289937462_0949,pyspark,idle,Link,Link,,
960,application_1765289937462_0953,pyspark,busy,Link,Link,,


In [2]:
from sedona.spark import *
from pyspark.sql import SparkSession


from pyspark.sql.types import StructField, StructType, IntegerType, FloatType, StringType
from pyspark.sql.functions import udf, year, avg, count, concat, lit, round, rank, col

spark = SparkSession \
    .builder \
    .appName("query 4 execution") \
    .getOrCreate()

# Create sedona context
sedona = SedonaContext.create(spark)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
961,application_1765289937462_0954,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
crime_df1 = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2010_2019.csv", header = True, inferSchema = True)
crime_df2 = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv", header = True, inferSchema = True)
crime_df = crime_df1.union(crime_df2)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
stations_df = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Police_Stations.csv", header = True, inferSchema = True)
stations_df.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Row(X=-118.289241553, Y=33.7576608970001, FID=1, DIVISION='HARBOR', LOCATION='2175 JOHN S. GIBSON BLVD.', PREC=5)

In [5]:
from pyspark.sql.window import Window

# Window: nearest station per crime
window = Window.partitionBy("DR_NO").orderBy("distance")

# Filter crimes (drop Null Island etc.)
filtered_crimes_df = (
    crime_df
    .filter(col("LAT").isNotNull() & col("LON").isNotNull())
    .filter(~((col("LAT") == 0) & (col("LON") == 0)))   # Null Island
    .select("DR_NO", "LAT", "LON")
)
# define function for counting distance with sedona
def get_distance(lat1_col, lon1_col, lat2_col, lon2_col):
    p1 = ST_Point(lon1_col.cast("double"), lat1_col.cast("double"))
    p2 = ST_Point(lon2_col.cast("double"), lat2_col.cast("double"))
    return ST_DistanceSphere(p1, p2) / 1000.0  # km

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
import time

start = time.time()
# Cross join with stations, compute distance to each station seperately
crimes_stations_dist_df = (
    filtered_crimes_df
    .crossJoin(
        stations_df.select(
            col("division"),  
            col("Y"), # Lat
            col("X") # lon
        )
    )
    .withColumn(
        "distance",
        get_distance(col("LAT"), col("LON"), col("Y"), col("X"))
    )
)

# Keep only nearest station per crime
nearest_station_df = (
    crimes_stations_dist_df
    .withColumn("distance_rank", rank().over(window))
    .filter(col("distance_rank") == 1) #nearest
    .select("division", "distance")
)

# Aggregate per division: count + average distance, sort by count desc
division_stats_df = (
    nearest_station_df
    .groupBy("division")
    .agg(
        avg("distance").alias("average_distance_km"), 
        count("*").alias("#") # only for output consistency
    )
    .withColumn(
        "average_distance_km",
        round(col("average_distance_km"), 3) #round to 3 decimal
    )
    .orderBy(col("#").desc())
)

division_stats_df.show(100, truncate=False)

end = time.time()
print("Execution time:", end - start, "seconds")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------------+-------------------+------+
|division        |average_distance_km|#     |
+----------------+-------------------+------+
|HOLLYWOOD       |2.077              |225515|
|VAN NUYS        |2.953              |211130|
|SOUTHWEST       |2.191              |189565|
|WILSHIRE        |2.593              |187061|
|77TH STREET     |1.717              |172558|
|OLYMPIC         |1.725              |172353|
|NORTH HOLLYWOOD |2.643              |168655|
|PACIFIC         |3.853              |162514|
|CENTRAL         |0.993              |154952|
|SOUTHEAST       |2.422              |153746|
|RAMPART         |1.535              |153690|
|TOPANGA         |3.298              |141070|
|WEST VALLEY     |3.039              |139820|
|FOOTHILL        |4.251              |135381|
|HARBOR          |3.702              |127370|
|HOLLENBECK      |2.677              |116558|
|WEST LOS ANGELES|2.79               |116308|
|NEWTON          |1.635              |111628|
|NORTHEAST       |3.623           

In [7]:
division_stats_df.explain()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [##281L DESC NULLS LAST], true, 0
   +- Exchange rangepartitioning(##281L DESC NULLS LAST, 1000), ENSURE_REQUIREMENTS, [plan_id=585]
      +- HashAggregate(keys=[division#221], functions=[avg(distance#249), count(1)], schema specialized)
         +- Exchange hashpartitioning(division#221, 1000), ENSURE_REQUIREMENTS, [plan_id=582]
            +- HashAggregate(keys=[division#221], functions=[partial_avg(distance#249), partial_count(1)], schema specialized)
               +- Project [division#221, distance#249]
                  +- Filter (distance_rank#259 = 1)
                     +- Window [rank(distance#249) windowspecdefinition(DR_NO#42, distance#249 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS distance_rank#259], [DR_NO#42], [distance#249 ASC NULLS FIRST]
                        +- WindowGroupLimit [DR_NO#42], [distance#249 ASC NULLS FIRST], rank(distance#249), 1, Final
     