# Query 5

In [1]:
from pyspark.sql import SparkSession

# Get a handle on the Spark session for this notebook: getOrCreate() returns
# the already-running session when there is one, otherwise it starts a new
# application named "Query 5".
spark = (
    SparkSession.builder
    .appName("Query 5")
    .getOrCreate()
)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
144,application_1738075734771_0145,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Case 1: 2 Executors × 4 Cores/8GB Memory

In [2]:
%%configure -f
{
    "conf": {
        "spark.executor.instances": "2",
        "spark.executor.cores": "4",
        "spark.executor.memory": "8g"
    }
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
145,application_1738075734771_0146,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
123,application_1738075734771_0124,pyspark,idle,Link,Link,,
124,application_1738075734771_0125,pyspark,idle,Link,Link,,
126,application_1738075734771_0127,pyspark,idle,Link,Link,,
128,application_1738075734771_0129,pyspark,busy,Link,Link,,
129,application_1738075734771_0130,pyspark,idle,Link,Link,,
140,application_1738075734771_0141,pyspark,busy,Link,Link,,
143,application_1738075734771_0144,pyspark,idle,Link,Link,,
145,application_1738075734771_0146,pyspark,idle,Link,Link,,✔


#### Case 2: 4 Executors × 2 Cores/4GB Memory

In [None]:
%%configure -f
{
    "conf": {
        "spark.executor.instances": "4",
        "spark.executor.cores": "2",
        "spark.executor.memory": "4g"
    }
}

#### Case 3: 8 Executors × 1 Core/2GB Memory

In [2]:
%%configure -f
{
    "conf": {
        "spark.executor.instances": "8",
        "spark.executor.cores": "1",
        "spark.executor.memory": "2g"
    }
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
834,application_1732639283265_0801,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
782,application_1732639283265_0749,pyspark,idle,Link,Link,,
783,application_1732639283265_0750,pyspark,idle,Link,Link,,
784,application_1732639283265_0751,pyspark,idle,Link,Link,,
785,application_1732639283265_0752,pyspark,idle,Link,Link,,
789,application_1732639283265_0756,pyspark,idle,Link,Link,,
792,application_1732639283265_0759,pyspark,idle,Link,Link,,
793,application_1732639283265_0760,pyspark,idle,Link,Link,,
795,application_1732639283265_0762,pyspark,idle,Link,Link,,
801,application_1732639283265_0768,pyspark,idle,Link,Link,,
802,application_1732639283265_0769,pyspark,idle,Link,Link,,


## Query Implementation

In [9]:
import time

# Wall-clock reference point: the last cell of the query subtracts this value
# to report the end-to-end execution time.
start_time = time.time()

# Echo the cluster manager and the executor sizing that are actually in effect,
# so the measured time can be matched to the configuration case that was run.
master = spark.sparkContext.master
print(f"Spark Master: {master}")

conf = spark.sparkContext.getConf()
for label, key in (
    ("Executor Instances:", "spark.executor.instances"),
    ("Executor Cores:", "spark.executor.cores"),
    ("Executor Memory:", "spark.executor.memory"),
):
    print(label, conf.get(key))

# Input locations on S3: two crime-record extracts and the police station list.
crime_data_path_1 = "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv"
crime_data_path_2 = "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv"
police_stations_data_path = "s3://initial-notebook-data-bucket-dblab-905418150721/LA_Police_Stations.csv"

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Spark Master: yarn
Executor Instances: 2
Executor Cores: 4
Executor Memory: 8g

In [10]:
# NOTE(review): the star import appears to be what brings ST_Point (used below)
# and ST_DistanceSphere (used in a later cell) into scope, so it is kept as-is.
from sedona.spark import *
from pyspark.sql.functions import col

# Register Sedona for geospatial queries
sedona = SedonaContext.create(spark)


def _read_csv_columns(path, *columns):
    """Read a CSV that has a header row, let Spark infer the schema, and keep only `columns`."""
    return spark.read.csv(path, header=True, inferSchema=True).select(*columns)


def _to_crime_points(lon_lat_df):
    """Turn a LON/LAT dataframe into a single-column dataframe of Crime_Location_Point geometries."""
    return lon_lat_df.select(ST_Point("LON", "LAT").alias("Crime_Location_Point"))


# --- Police stations -------------------------------------------------------
police_stations_data = _read_csv_columns(police_stations_data_path, "X", "Y", "Division")

# Create geospatial point for police stations (X and Y are passed in the same
# order as LON and LAT are for the crime records below).
police_stations = police_stations_data.select(
    "Division",
    ST_Point("X", "Y").alias("PS_Location_Point"),
)

police_stations.show()


# --- Crime records ---------------------------------------------------------
crime_data_1 = _read_csv_columns(crime_data_path_1, "LON", "LAT")
print("Rows of crime data 1: ", crime_data_1.count())

crime_data_2 = _read_csv_columns(crime_data_path_2, "LON", "LAT")
print("Rows of crime data 2: ", crime_data_2.count())

# Filter out Null Island records, i.e. rows whose coordinates are exactly (0, 0)
is_null_island = (col("LAT") == 0) & (col("LON") == 0)
crime_data_1 = crime_data_1.filter(~is_null_island)
crime_data_2 = crime_data_2.filter(~is_null_island)

print("Rows of crime data 1 after removing null: ", crime_data_1.count())
print("Rows of crime data 2 after removing null: ", crime_data_2.count())


# Create geospatial point for crime locations
crime_locations_1 = _to_crime_points(crime_data_1)
crime_locations_2 = _to_crime_points(crime_data_2)

# Stack both extracts into one dataframe; union() keeps duplicate rows, so two
# crimes at the same coordinates remain two rows.
crime_locations = crime_locations_1.union(crime_locations_2)
crime_locations.show()

# Sanity check: the union must contain exactly the rows of both inputs.
print("Crime locations rows: ", crime_locations.count())
print("Add rows from each dataset: ", crime_locations_1.count()+crime_locations_2.count())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------------+--------------------+
|        Division|   PS_Location_Point|
+----------------+--------------------+
|          HARBOR|POINT (-118.28924...|
|       SOUTHEAST|POINT (-118.27539...|
|     77TH STREET|POINT (-118.27766...|
|         PACIFIC|POINT (-118.41984...|
|       SOUTHWEST|POINT (-118.30514...|
|          NEWTON|POINT (-118.25611...|
|         CENTRAL|POINT (-118.24729...|
|WEST LOS ANGELES|POINT (-118.45077...|
|      HOLLENBECK|POINT (-118.21306...|
|        WILSHIRE|POINT (-118.34282...|
|         OLYMPIC|POINT (-118.29117...|
|         RAMPART|POINT (-118.26697...|
|       HOLLYWOOD|POINT (-118.33066...|
|       NORTHEAST|POINT (-118.24941...|
| NORTH HOLLYWOOD|POINT (-118.38585...|
|        VAN NUYS|POINT (-118.44522...|
|     WEST VALLEY|POINT (-118.54745...|
|         TOPANGA|POINT (-118.59963...|
|        FOOTHILL|POINT (-118.41041...|
|      DEVONSHIRE|POINT (-118.53137...|
+----------------+--------------------+
only showing top 20 rows

Rows of crime 

In [11]:
# Pair every crime location with every police station (Cartesian product).
cross_join_df = crime_locations.crossJoin(police_stations)
cross_join_df.printSchema()
cross_join_df.show()

# Sanity check: the product must have (#crimes x #stations) rows.
paired_rows = cross_join_df.count()
expected_rows = crime_locations.count() * police_stations.count()
print(paired_rows, expected_rows)

# Calculate distances between crimes and police stations: append a "Distance"
# column holding the spherical distance of each pair (the summary cell later
# divides it by 1000 to report kilometres).
distance_df = cross_join_df.select(
    "*",
    ST_DistanceSphere("Crime_Location_Point", "PS_Location_Point").alias("Distance"),
)

distance_df.printSchema()
distance_df.show()
print(distance_df)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- Crime_Location_Point: geometry (nullable = true)
 |-- Division: string (nullable = true)
 |-- PS_Location_Point: geometry (nullable = true)

+--------------------+----------------+--------------------+
|Crime_Location_Point|        Division|   PS_Location_Point|
+--------------------+----------------+--------------------+
|POINT (-118.2695 ...|          HARBOR|POINT (-118.28924...|
|POINT (-118.2695 ...|       SOUTHEAST|POINT (-118.27539...|
|POINT (-118.2695 ...|     77TH STREET|POINT (-118.27766...|
|POINT (-118.2695 ...|         PACIFIC|POINT (-118.41984...|
|POINT (-118.2695 ...|       SOUTHWEST|POINT (-118.30514...|
|POINT (-118.2695 ...|          NEWTON|POINT (-118.25611...|
|POINT (-118.2695 ...|         CENTRAL|POINT (-118.24729...|
|POINT (-118.2695 ...|WEST LOS ANGELES|POINT (-118.45077...|
|POINT (-118.2695 ...|      HOLLENBECK|POINT (-118.21306...|
|POINT (-118.2695 ...|        WILSHIRE|POINT (-118.34282...|
|POINT (-118.2695 ...|         OLYMPIC|POINT (-118.29117

##### So now we have all the distances of all crimes to all police stations. Next, we have to perform 2 group by operations:
- First, we match each crime with its nearest police station. This gives a dataframe with one row per crime, holding the crime, its nearest police station and the distance between the two (the minimum distance).
- Then we have to group each police station to:
  - The total number of crimes associated with it.
  - The average distance of these crimes to this police station.

In [12]:
# Use the functions module through an alias instead of importing `min` by name:
# `from pyspark.sql.functions import min` would rebind Python's built-in min()
# for every cell that runs afterwards in this kernel.
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# Step 1: Compute the minimum distance for each crime location.
# The grouping key is the point itself, so this yields one row per *unique*
# location, not one row per crime.
min_distance_df = distance_df.groupBy("Crime_Location_Point").agg(
    F.min("Distance").alias("Min_Distance")
)

# Step 2: Alias both DataFrames to avoid ambiguity (both carry a
# Crime_Location_Point column).
distance_df_alias = distance_df.alias("d")
min_distance_df_alias = min_distance_df.alias("m")

# Step 3: Join the DataFrames on Crime_Location_Point and Distance.
# Keeping only the (crime, station) pairs whose distance equals the minimum for
# that point recovers the nearest station for every crime row, duplicates
# included.
# NOTE(review): if a point is exactly equidistant from two stations, both pairs
# satisfy the condition and that crime appears twice downstream - confirm that
# ties do not occur in this data or break them explicitly.
nearest_station_df = distance_df_alias.join(
    min_distance_df_alias,
    (col("d.Crime_Location_Point") == col("m.Crime_Location_Point")) &
    (col("d.Distance") == col("m.Min_Distance"))
)

nearest_station_df.show()

# Keep one copy of each column and give them their final names.
nearest_station_df = nearest_station_df.select(
    col("d.Crime_Location_Point").alias("Crime_Location_Point"),
    col("d.Division").alias("division"),
    col("d.PS_Location_Point").alias("Police_Station_Location"),
    col("d.Distance").alias("Nearest_Distance")
)

# Step 4: Show the result
nearest_station_df.show()

# Sanity check: unique locations, crimes matched to a nearest station, and
# crimes implied by the cross join. The divisor is the real number of stations
# rather than a hard-coded 21, so the check stays valid if the station list
# changes.
station_count = police_stations.count()
print(min_distance_df.count(), nearest_station_df.count(), distance_df.count() / station_count)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+---------+--------------------+------------------+--------------------+------------------+
|Crime_Location_Point| Division|   PS_Location_Point|          Distance|Crime_Location_Point|      Min_Distance|
+--------------------+---------+--------------------+------------------+--------------------+------------------+
|POINT (-118.2339 ...|  CENTRAL|POINT (-118.24729...| 1254.374134066565|POINT (-118.2339 ...| 1254.374134066565|
|POINT (-118.2917 ...|HOLLYWOOD|POINT (-118.33066...| 3729.455288788968|POINT (-118.2917 ...| 3729.455288788968|
|POINT (-118.2755 ...|  RAMPART|POINT (-118.26697...|1333.1327248648129|POINT (-118.2755 ...|1333.1327248648129|
|POINT (-118.2755 ...|  RAMPART|POINT (-118.26697...|1333.1327248648129|POINT (-118.2755 ...|1333.1327248648129|
|POINT (-118.2917 ...|HOLLYWOOD|POINT (-118.33066...| 3729.455288788968|POINT (-118.2917 ...| 3729.455288788968|
|POINT (-118.2755 ...|  RAMPART|POINT (-118.26697...|1333.1327248648129|POINT (-118.2755 ...|133

The min_distance dataframe has one row per unique location point (holding that point's minimum distance to any police station), not one row per crime! Two distinct crimes can share the same Location Point. So we join this dataframe back to the distance dataframe, which contains every crime regardless of duplicate points. That way, each crime is mapped to its minimum distance and to the police station division that lies at that minimum distance.

We can verify that if we print the number of rows each dataset has:
- minimum distance dataframe has 105060 rows, hence 105060 unique crime locations (points).
- distance dataframe has 21 × 3109880 rows: one row for each of the 3109880 crimes paired with each of the 21 police stations.
- nearest station dataframe has 3109880 rows, i.e. one row per crime (not per unique location). This is also the number of rows in the combined crime dataset after filtering (crime_locations).

In [13]:
from pyspark.sql.functions import count, avg, format_number
import time

# Per-division metrics: how many crimes have this station as their nearest one,
# and how far away those crimes are on average.
per_division = nearest_station_df.groupBy("Division").agg(
    count("*").alias("Total_Crimes_#"),
    avg("Nearest_Distance").alias("Average_Distance"),
)

# Present the average in kilometres with three decimals (format_number yields a
# formatted string), replacing the raw average column.
average_km = format_number(col("Average_Distance") / 1000, 3)

result_df = (
    per_division
    .withColumn("Average_Distance(km)", average_km)
    .drop("Average_Distance")
    .orderBy(col("Total_Crimes_#").desc())
)

# Show the result
result_df.show(21)

# Elapsed wall-clock time since start_time was captured in the setup cell.
end_time = time.time()
print(f"Execution time: {end_time - start_time:.2f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------------+--------------+--------------------+
|        Division|Total_Crimes_#|Average_Distance(km)|
+----------------+--------------+--------------------+
|       HOLLYWOOD|        224340|               2.076|
|        VAN NUYS|        210134|               2.953|
|       SOUTHWEST|        188901|               2.191|
|        WILSHIRE|        185996|               2.593|
|     77TH STREET|        171827|               1.717|
|         OLYMPIC|        170897|               1.724|
| NORTH HOLLYWOOD|        167854|               2.643|
|         PACIFIC|        161359|               3.850|
|         CENTRAL|        153871|               0.992|
|         RAMPART|        152736|               1.535|
|       SOUTHEAST|        152176|               2.422|
|     WEST VALLEY|        138643|               3.036|
|         TOPANGA|        138217|               3.297|
|        FOOTHILL|        134896|               4.251|
|          HARBOR|        126747|               3.703|
|      HOL