In [1]:
from sedona.spark import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count, desc
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
import time


# Initialize Spark session
spark = SparkSession \
    .builder \
    .appName("Query 5 - Crime Analysis with Police Stations") \
    .config("spark.executor.instances", "2") \
    .config("spark.executor.memory", "8g") \
    .config("spark.executor.cores", "4") \
    .getOrCreate()

# Create Sedona context
sedona = SedonaContext.create(spark)

# Start timing
start_time = time.time()

# Load datasets
crime_data = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv", header=True)
station_data = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/LA_Police_Stations.csv", header=True)

# Filter out Null Island records and create geometries for crime data
crime_data = crime_data.filter((col("LAT").isNotNull()) & (col("LON").isNotNull()) & (col("LAT") != 0) & (col("LON") != 0))
crime_data = crime_data.withColumn("crime_geometry", ST_Point(col("LON").cast("double"), col("LAT").cast("double")))

# Create geometries for police stations using X and Y columns
station_data = station_data.filter((col("X").isNotNull()) & (col("Y").isNotNull()))
station_data = station_data.withColumn("station_geometry", ST_Point(col("X").cast("double"), col("Y").cast("double")))

# Broadcast police stations for join efficiency
station_data_broadcast = station_data.select("station_geometry", "DIVISION").alias("stations")

# Join crime data with police stations to calculate distances
crime_with_distances = crime_data.crossJoin(station_data_broadcast) \
    .withColumn("distance", ST_Distance(col("crime_geometry"), col("stations.station_geometry")))

# Find the closest station for each crime using DR_NO as the unique identifier
window_spec = Window.partitionBy("DR_NO").orderBy(col("distance").asc())
closest_station = crime_with_distances.withColumn("row_number", row_number().over(window_spec)) \
    .filter(col("row_number") == 1) \
    .select("DR_NO", "DIVISION", "distance")

# Aggregate results by station
results = closest_station.groupBy("DIVISION").agg(
    count("DR_NO").alias("#"),
    avg("distance").alias("average_distance")
)

# Sort results by number of incidents
sorted_results = results.orderBy(desc("#"))

# Display the results
sorted_results.show()

# Stop timing and print out the execution duration
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken: {elapsed_time:.2f} seconds")

# Stop Spark session
spark.stop()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
697,application_1732639283265_0664,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------------+------+--------------------+
|        DIVISION|     #|    average_distance|
+----------------+------+--------------------+
|        VAN NUYS|148946| 0.02856339903536966|
|       HOLLYWOOD|140927|0.020444060380655493|
|       SOUTHWEST|133420|0.021628874258637677|
|        WILSHIRE|132967| 0.02632706717398016|
|         OLYMPIC|119636|0.017338152570481324|
| NORTH HOLLYWOOD|118938|  0.0263080120045502|
|     77TH STREET|116946|0.016632946064630495|
|       SOUTHEAST|105162| 0.02403583311415139|
|         PACIFIC|104090|0.037408008860875336|
|         TOPANGA|103828| 0.03233359516748025|
|         RAMPART| 95479|0.014934307019218239|
|         CENTRAL| 93531|0.009495482667801518|
|     WEST VALLEY| 90132|0.028989371552462475|
|          HARBOR| 88268| 0.03480036440117154|
|        FOOTHILL| 87497|0.041252498599392944|
|      HOLLENBECK| 81541|0.026781603358714202|
|WEST LOS ANGELES| 79670|0.030343153555294662|
|         MISSION| 78753|0.035104180437474056|
|       NORTH

In [1]:
from sedona.spark import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count, desc
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
import time

# Initialize Spark session
spark = SparkSession \
    .builder \
    .appName("Query 5 - Crime Analysis with Police Stations") \
    .config("spark.executor.instances", "4") \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.cores", "2") \
    .getOrCreate()

# Create Sedona context
sedona = SedonaContext.create(spark)

# Start timing
start_time = time.time()

# Load datasets
crime_data = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv", header=True)
station_data = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/LA_Police_Stations.csv", header=True)

# Filter out Null Island records and create geometries for crime data
crime_data = crime_data.filter((col("LAT").isNotNull()) & (col("LON").isNotNull()) & (col("LAT") != 0) & (col("LON") != 0))
crime_data = crime_data.withColumn("crime_geometry", ST_Point(col("LON").cast("double"), col("LAT").cast("double")))

# Create geometries for police stations using X and Y columns
station_data = station_data.filter((col("X").isNotNull()) & (col("Y").isNotNull()))
station_data = station_data.withColumn("station_geometry", ST_Point(col("X").cast("double"), col("Y").cast("double")))

# Broadcast police stations for join efficiency
station_data_broadcast = station_data.select("station_geometry", "DIVISION").alias("stations")

# Join crime data with police stations to calculate distances
crime_with_distances = crime_data.crossJoin(station_data_broadcast) \
    .withColumn("distance", ST_Distance(col("crime_geometry"), col("stations.station_geometry")))

# Find the closest station for each crime using DR_NO as the unique identifier
window_spec = Window.partitionBy("DR_NO").orderBy(col("distance").asc())
closest_station = crime_with_distances.withColumn("row_number", row_number().over(window_spec)) \
    .filter(col("row_number") == 1) \
    .select("DR_NO", "DIVISION", "distance")

# Aggregate results by station
results = closest_station.groupBy("DIVISION").agg(
    count("DR_NO").alias("#"),
    avg("distance").alias("average_distance")
)

# Sort results by number of incidents
sorted_results = results.orderBy(desc("#"))

# Display the results
sorted_results.show()

# Stop timing and print out the execution duration
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken: {elapsed_time:.2f} seconds")

# Stop Spark session
spark.stop()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
698,application_1732639283265_0665,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------------+------+--------------------+
|        DIVISION|     #|    average_distance|
+----------------+------+--------------------+
|        VAN NUYS|148946| 0.02856339903536965|
|       HOLLYWOOD|140927| 0.02044406038065549|
|       SOUTHWEST|133420| 0.02162887425863768|
|        WILSHIRE|132967|0.026327067173980156|
|         OLYMPIC|119636|0.017338152570481328|
| NORTH HOLLYWOOD|118938|0.026308012004550196|
|     77TH STREET|116946|  0.0166329460646305|
|       SOUTHEAST|105162| 0.02403583311415139|
|         PACIFIC|104090|0.037408008860875315|
|         TOPANGA|103828| 0.03233359516748025|
|         RAMPART| 95479|0.014934307019218239|
|         CENTRAL| 93531|0.009495482667801515|
|     WEST VALLEY| 90132|0.028989371552462475|
|          HARBOR| 88268|0.034800364401171534|
|        FOOTHILL| 87497| 0.04125249859939294|
|      HOLLENBECK| 81541|  0.0267816033587142|
|WEST LOS ANGELES| 79670| 0.03034315355529467|
|         MISSION| 78753| 0.03510418043747406|
|       NORTH

In [1]:
from sedona.spark import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count, desc
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
import time

# Initialize Spark session
spark = SparkSession \
    .builder \
    .appName("Query 5 - Crime Analysis with Police Stations") \
    .config("spark.executor.instances", "8") \
    .config("spark.executor.memory", "2g") \
    .config("spark.executor.cores", "1") \
    .getOrCreate()

# Create Sedona context
sedona = SedonaContext.create(spark)

# Start timing
start_time = time.time()

# Load datasets
crime_data = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv", header=True)
station_data = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/LA_Police_Stations.csv", header=True)

# Filter out Null Island records and create geometries for crime data
crime_data = crime_data.filter((col("LAT").isNotNull()) & (col("LON").isNotNull()) & (col("LAT") != 0) & (col("LON") != 0))
crime_data = crime_data.withColumn("crime_geometry", ST_Point(col("LON").cast("double"), col("LAT").cast("double")))

# Create geometries for police stations using X and Y columns
station_data = station_data.filter((col("X").isNotNull()) & (col("Y").isNotNull()))
station_data = station_data.withColumn("station_geometry", ST_Point(col("X").cast("double"), col("Y").cast("double")))

# Broadcast police stations for join efficiency
station_data_broadcast = station_data.select("station_geometry", "DIVISION").alias("stations")

# Join crime data with police stations to calculate distances
crime_with_distances = crime_data.crossJoin(station_data_broadcast) \
    .withColumn("distance", ST_Distance(col("crime_geometry"), col("stations.station_geometry")))

# Find the closest station for each crime using DR_NO as the unique identifier
window_spec = Window.partitionBy("DR_NO").orderBy(col("distance").asc())
closest_station = crime_with_distances.withColumn("row_number", row_number().over(window_spec)) \
    .filter(col("row_number") == 1) \
    .select("DR_NO", "DIVISION", "distance")

# Aggregate results by station
results = closest_station.groupBy("DIVISION").agg(
    count("DR_NO").alias("#"),
    avg("distance").alias("average_distance")
)

# Sort results by number of incidents
sorted_results = results.orderBy(desc("#"))

# Display the results
sorted_results.show()

# Stop timing and print out the execution duration
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken: {elapsed_time:.2f} seconds")

# Stop Spark session
spark.stop()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
700,application_1732639283265_0667,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------------+------+--------------------+
|        DIVISION|     #|    average_distance|
+----------------+------+--------------------+
|        VAN NUYS|148946| 0.02856339903536965|
|       HOLLYWOOD|140927| 0.02044406038065549|
|       SOUTHWEST|133420|0.021628874258637673|
|        WILSHIRE|132967|0.026327067173980152|
|         OLYMPIC|119636|0.017338152570481328|
| NORTH HOLLYWOOD|118938|0.026308012004550196|
|     77TH STREET|116946|  0.0166329460646305|
|       SOUTHEAST|105162|0.024035833114151393|
|         PACIFIC|104090| 0.03740800886087533|
|         TOPANGA|103828| 0.03233359516748025|
|         RAMPART| 95479|0.014934307019218237|
|         CENTRAL| 93531|0.009495482667801518|
|     WEST VALLEY| 90132| 0.02898937155246247|
|          HARBOR| 88268| 0.03480036440117154|
|        FOOTHILL| 87497| 0.04125249859939293|
|      HOLLENBECK| 81541| 0.02678160335871419|
|WEST LOS ANGELES| 79670|0.030343153555294662|
|         MISSION| 78753| 0.03510418043747406|
|       NORTH