
# Query 1 – Victim age groups in aggravated assault incidents

Goal: For all crime incidents that involve any form of “aggravated assault”, count the victims in each age group and list the age groups in descending order of victim count.
Age groups:

- Children: < 18

- Young Adults: 18–24

- Adults: 25–64

- Elderly: > 64

- Unknown: missing or non-positive age (assigned by all three implementations below)
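Range boundaries like these are easy to get off by one. The following minimal pure-Python sketch (no Spark needed) encodes them as cut points for `bisect`. The helper name `bucket_age` is hypothetical. Like the implementations below, it treats missing or non-positive ages as `Unknown`.

```python
from bisect import bisect_right

# Cut points between groups: ages 1-17 -> Children, 18-24 -> Young Adults,
# 25-64 -> Adults, 65 and over -> Elderly.
CUTS = [18, 25, 65]
LABELS = ["Children", "Young Adults", "Adults", "Elderly"]


def bucket_age(age):
    """Hypothetical helper: map an integer age to its group label."""
    if age is None or age <= 0:
        return "Unknown"
    # bisect_right returns how many cut points are <= age,
    # which is exactly the index of the matching label.
    return LABELS[bisect_right(CUTS, age)]


for a in [None, 0, 17, 18, 24, 25, 64, 65]:
    print(a, bucket_age(a))
```

Because each cut point belongs to the group above it, the check at 17/18, 24/25 and 64/65 is where any implementation of the rules should be tested.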

Below are three implementations of the same query:

- DataFrame API with a Python UDF

- DataFrame API with native Spark expressions (no UDF)

- RDD API implementation


## Query 1 – DataFrame API with UDF

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import col, udf
import time

# ---------------------------------------------------------------------
# Spark session configuration
# ---------------------------------------------------------------------
spark = SparkSession.builder \
    .appName("Crime Data - Query1") \
    .config("spark.executor.instances", "4") \
    .config("spark.executor.cores", "1") \
    .config("spark.executor.memory", "2g") \
    .getOrCreate()

# ---------------------------------------------------------------------
# Schema definition for the LA crime dataset
# ---------------------------------------------------------------------

crime_schema = StructType([
    StructField("DR_NO", StringType()),
    StructField("Date Rptd", StringType()),
    StructField("DATE OCC", StringType()),
    StructField("TIME OCC", StringType()),
    StructField("AREA", StringType()),
    StructField("AREA NAME", StringType()),
    StructField("Rpt Dist No", StringType()),
    StructField("Part 1-2", StringType()),
    StructField("Crm Cd", StringType()),
    StructField("Crm Cd Desc", StringType()),
    StructField("Mocodes", StringType()),
    StructField("Vict Age", IntegerType()),
    StructField("Vict Sex", StringType()),
    StructField("Vict Descent", StringType()),
    StructField("Premis Cd", StringType()),
    StructField("Premis Desc", StringType()),
    StructField("Weapon Used Cd", StringType()),
    StructField("Weapon Desc", StringType()),
    StructField("Status", StringType()),
    StructField("Status Desc", StringType()),
    StructField("Crm Cd 1", StringType()),
    StructField("Crm Cd 2", StringType()),
    StructField("Crm Cd 3", StringType()),
    StructField("Crm Cd 4", StringType()),
    StructField("LOCATION", StringType()),
    StructField("Cross Street", StringType()),
    StructField("LAT", StringType()),
    StructField("LON", StringType())
])

# ---------------------------------------------------------------------
# UDF for mapping victim age to an age group
# ---------------------------------------------------------------------

def age_group(age):
    """Map a victim age to its age group; missing or non-positive ages are 'Unknown'."""
    try:
        age = int(age)
    except (TypeError, ValueError):
        return "Unknown"
    if age <= 0:
        return "Unknown"
    elif age < 18:
        return "Children"
    elif age <= 24:
        return "Young Adults"
    elif age <= 64:
        return "Adults"
    else:
        return "Elderly"

age_group_udf = udf(age_group, StringType())

# ---------------------------------------------------------------------
# Execution timing starts
# ---------------------------------------------------------------------

start_time = time.time()

# ---------------------------------------------------------------------
# Load crime CSV files from S3 and union them into a single DataFrame
# ---------------------------------------------------------------------

crime_df1 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2010_2019.csv",
    header=True,
    schema=crime_schema
)

crime_df2 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv",
    header=True,
    schema=crime_schema
)

crime_df = crime_df1.union(crime_df2)

print("CSV Loaded!\n")

# ---------------------------------------------------------------------
# Filter only incidents whose description contains "AGGRAVATED ASSAULT"
# ---------------------------------------------------------------------

aggr_assault_df = crime_df.filter(col("Crm Cd Desc").contains("AGGRAVATED ASSAULT"))

# ---------------------------------------------------------------------
# Add "Age Group" column using the Python UDF
# ---------------------------------------------------------------------

aggr_assault_df = aggr_assault_df.withColumn("Age Group", age_group_udf(col("Vict Age")))

# ---------------------------------------------------------------------
# Group by age group and sort by descending incident count
# ---------------------------------------------------------------------

age_group_counts_df = aggr_assault_df.groupBy("Age Group").count().orderBy(col("count").desc())

# ---------------------------------------------------------------------
# Display results and execution time
# ---------------------------------------------------------------------

age_group_counts_df.show()

end_time = time.time()
print(f"Execution time: {end_time - start_time:.2f} seconds")

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
107,application_1761923966900_0122,pyspark,idle,Link,Link,,✔


SparkSession available as 'spark'.


CSV Loaded!

+------------+------+
|   Age Group| count|
+------------+------+
|      Adults|121660|
|Young Adults| 33758|
|    Children| 10904|
|     Elderly|  6011|
|     Unknown|  5110|
+------------+------+

Execution time: 11.92 seconds

## Query 1 – DataFrame API without UDF

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import col, when
import time

# ---------------------------------------------------------------------
# Spark session configuration
# ---------------------------------------------------------------------

spark = SparkSession.builder \
    .appName("Crime Data - Query1") \
    .config("spark.executor.instances", "4") \
    .config("spark.executor.cores", "1") \
    .config("spark.executor.memory", "2g") \
    .getOrCreate()

# ---------------------------------------------------------------------
# Schema definition for the LA crime dataset
# ---------------------------------------------------------------------

crime_schema = StructType([
    StructField("DR_NO", StringType()),
    StructField("Date Rptd", StringType()),
    StructField("DATE OCC", StringType()),
    StructField("TIME OCC", StringType()),
    StructField("AREA", StringType()),
    StructField("AREA NAME", StringType()),
    StructField("Rpt Dist No", StringType()),
    StructField("Part 1-2", StringType()),
    StructField("Crm Cd", StringType()),
    StructField("Crm Cd Desc", StringType()),
    StructField("Mocodes", StringType()),
    StructField("Vict Age", IntegerType()),
    StructField("Vict Sex", StringType()),
    StructField("Vict Descent", StringType()),
    StructField("Premis Cd", StringType()),
    StructField("Premis Desc", StringType()),
    StructField("Weapon Used Cd", StringType()),
    StructField("Weapon Desc", StringType()),
    StructField("Status", StringType()),
    StructField("Status Desc", StringType()),
    StructField("Crm Cd 1", StringType()),
    StructField("Crm Cd 2", StringType()),
    StructField("Crm Cd 3", StringType()),
    StructField("Crm Cd 4", StringType()),
    StructField("LOCATION", StringType()),
    StructField("Cross Street", StringType()),
    StructField("LAT", StringType()),
    StructField("LON", StringType())
])

# ---------------------------------------------------------------------
# Execution timing starts
# ---------------------------------------------------------------------

start_time = time.time()

# ---------------------------------------------------------------------
# Load crime CSV files from S3 and union them into a single DataFrame
# ---------------------------------------------------------------------

crime_df1 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2010_2019.csv",
    header=True,
    schema=crime_schema
)

crime_df2 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv",
    header=True,
    schema=crime_schema
)

crime_df = crime_df1.union(crime_df2)

print("CSV Loaded!\n")

# ---------------------------------------------------------------------
# Filter only incidents whose description contains "AGGRAVATED ASSAULT"
# ---------------------------------------------------------------------

aggr_assault_df = crime_df.filter(col("Crm Cd Desc").contains("AGGRAVATED ASSAULT"))

# ---------------------------------------------------------------------
# Add "Age Group" column using only native Spark expressions
# ---------------------------------------------------------------------

vict_age = col("Vict Age")
aggr_assault_df = aggr_assault_df.withColumn(
    "Age Group",
    # Conditions are checked in order and the first match wins,
    # so each branch only needs an upper bound.
    when(vict_age <= 0, "Unknown")
    .when(vict_age < 18, "Children")
    .when(vict_age <= 24, "Young Adults")
    .when(vict_age <= 64, "Adults")
    .when(vict_age > 64, "Elderly")
    .otherwise("Unknown")  # NULL ages match no condition
)

# ---------------------------------------------------------------------
# Group by age group and sort by descending incident count
# ---------------------------------------------------------------------

age_group_counts_df = aggr_assault_df.groupBy("Age Group").count().orderBy(col("count").desc())

# ---------------------------------------------------------------------
# Display results and execution time
# ---------------------------------------------------------------------

age_group_counts_df.show()

end_time = time.time()
print(f"Execution time: {end_time - start_time:.2f} seconds")


Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
108,application_1761923966900_0123,pyspark,idle,Link,Link,,✔


SparkSession available as 'spark'.


CSV Loaded!

+------------+------+
|   Age Group| count|
+------------+------+
|      Adults|121660|
|Young Adults| 33758|
|    Children| 10904|
|     Elderly|  6011|
|     Unknown|  5110|
+------------+------+

Execution time: 10.56 seconds
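A chain of `when(...).when(...)` calls behaves like SQL `CASE WHEN`. It returns the value of the first condition that holds. A `NULL` age makes every comparison `NULL`, so such rows fall through to `otherwise`.

The small pure-Python emulation below checks one consequence of this first-match behavior. Once earlier branches have excluded smaller ages, the lower bound in each range is redundant. The names `case_when`, `explicit_rules` and `ordered_rules` are hypothetical.

```python
def case_when(value, rules, default):
    """Return the label of the first rule whose predicate holds (CASE WHEN semantics).

    A None value mimics SQL NULL: every comparison is treated as not true,
    so the default is returned.
    """
    if value is None:
        return default
    for predicate, label in rules:
        if predicate(value):
            return label
    return default


# Ranges spelled out with both bounds.
explicit_rules = [
    (lambda a: a <= 0, "Unknown"),
    (lambda a: 0 < a < 18, "Children"),
    (lambda a: 18 <= a <= 24, "Young Adults"),
    (lambda a: 25 <= a <= 64, "Adults"),
    (lambda a: a > 64, "Elderly"),
]

# Upper bounds only: earlier branches already excluded the smaller ages.
ordered_rules = [
    (lambda a: a <= 0, "Unknown"),
    (lambda a: a < 18, "Children"),
    (lambda a: a <= 24, "Young Adults"),
    (lambda a: a <= 64, "Adults"),
    (lambda a: a > 64, "Elderly"),
]

ages = [None] + list(range(-5, 121))
mismatches = [
    a for a in ages
    if case_when(a, explicit_rules, "Unknown") != case_when(a, ordered_rules, "Unknown")
]
print("mismatches:", mismatches)
```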

## Query 1 – RDD API 

In [1]:
from pyspark.sql import SparkSession
import csv
from io import StringIO
import time

# ---------------------------------------------------------------------
# Spark session configuration
# ---------------------------------------------------------------------
spark = SparkSession.builder \
    .appName("Crime Data - Query1 RDD CSV-safe") \
    .config("spark.executor.instances", "4") \
    .config("spark.executor.cores", "1") \
    .config("spark.executor.memory", "2g") \
    .getOrCreate()

sc = spark.sparkContext

# ---------------------------------------------------------------------
# Helper function: map age to age group
# ---------------------------------------------------------------------

def age_group_from_age(age):
    """Map a victim age to its age group; missing or non-positive ages are 'Unknown'."""
    try:
        age = int(age)
    except (TypeError, ValueError):
        return "Unknown"
    if age <= 0:
        return "Unknown"
    elif age < 18:
        return "Children"
    elif age <= 24:
        return "Young Adults"
    elif age <= 64:
        return "Adults"
    else:
        return "Elderly"

# ---------------------------------------------------------------------
# Helper function: CSV-safe splitting of a line
# ---------------------------------------------------------------------

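def safe_split(line):
    # csv.reader respects quoted fields that contain commas; the default []
    # avoids a StopIteration on empty lines (they are dropped by the length filter)
    return next(csv.reader(StringIO(line)), [])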

# ---------------------------------------------------------------------
# Execution timing starts
# ---------------------------------------------------------------------

start_time = time.time()

# ---------------------------------------------------------------------
# Read both CSV files as RDDs of raw lines
# ---------------------------------------------------------------------

crime_rdd1 = sc.textFile(
    "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2010_2019.csv"
)
crime_rdd2 = sc.textFile(
    "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv"
)

# ---------------------------------------------------------------------
# Remove headers from both RDDs
# ---------------------------------------------------------------------

header1 = crime_rdd1.first()
header2 = crime_rdd2.first()
crime_rdd1 = crime_rdd1.filter(lambda x: x != header1)
crime_rdd2 = crime_rdd2.filter(lambda x: x != header2)

# ---------------------------------------------------------------------
# Union the two RDDs and parse CSV lines safely
# ---------------------------------------------------------------------

crime_rdd = crime_rdd1.union(crime_rdd2)

crime_rdd = crime_rdd.map(safe_split)

# ---------------------------------------------------------------------
# Filter for "AGGRAVATED ASSAULT" incidents
# Crm Cd Desc is at index 9, Vict Age at index 11
# ---------------------------------------------------------------------

aggr_assault_rdd = crime_rdd.filter(lambda x: len(x) > 11 and "AGGRAVATED ASSAULT" in x[9])


# ---------------------------------------------------------------------
# Map to (age_group, 1) pairs
# ---------------------------------------------------------------------

age_group_rdd = aggr_assault_rdd.map(lambda x: (age_group_from_age(x[11]), 1))

# ---------------------------------------------------------------------
# Aggregate counts per age group and sort descending by count
# ---------------------------------------------------------------------

age_group_counts_rdd = age_group_rdd.reduceByKey(lambda a, b: a + b)
age_group_counts_sorted_rdd = age_group_counts_rdd.sortBy(lambda x: x[1], ascending=False)

# ---------------------------------------------------------------------
# Collect and print results and execution time
# ---------------------------------------------------------------------

results = age_group_counts_sorted_rdd.collect()
for r in results:
    print(r)

end_time = time.time()
print(f"\nExecution time (RDD CSV-safe): {end_time - start_time:.2f} seconds")

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
110,application_1761923966900_0125,pyspark,idle,Link,Link,,✔


SparkSession available as 'spark'.


('Adults', 121660)
('Young Adults', 33758)
('Children', 10904)
('Elderly', 6011)
('Unknown', 5110)

Execution time (RDD CSV-safe): 16.45 seconds
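The RDD version parses each line with `csv.reader` instead of `str.split(',')` because a text field can contain a quoted comma. Naive splitting would shift every later column, so index 11 would no longer hold the age.

The sketch below reproduces the same pipeline on a few in-memory lines with plain Python:

- parse each line
- filter on index 9
- map each row to its age group
- count per group, as `reduceByKey` does
- sort by count descending

The sample rows are made up; only the column positions (index 9 = `Crm Cd Desc`, index 11 = `Vict Age`) follow the schema.

```python
import csv
from collections import Counter
from io import StringIO

# Made-up rows; only the column positions follow the crime schema
# (index 9 = Crm Cd Desc, index 11 = Vict Age).
lines = [
    '1,r,d,t,1,Central,101,1,230,"ASSAULT WITH DEADLY WEAPON, AGGRAVATED ASSAULT",0416,30,M',
    '2,r,d,t,1,Central,101,1,230,"ASSAULT WITH DEADLY WEAPON, AGGRAVATED ASSAULT",0416,19,F',
    '3,r,d,t,1,Central,101,1,624,BATTERY - SIMPLE ASSAULT,0416,40,M',
    '',
]


def safe_split(line):
    # csv.reader honours quotes; the default [] avoids StopIteration on empty lines.
    return next(csv.reader(StringIO(line)), [])


def age_group(age):
    try:
        age = int(age)
    except (TypeError, ValueError):
        return "Unknown"
    if age <= 0:
        return "Unknown"
    if age < 18:
        return "Children"
    if age <= 24:
        return "Young Adults"
    if age <= 64:
        return "Adults"
    return "Elderly"


rows = [safe_split(line) for line in lines]                               # map(safe_split)
aggr = [r for r in rows if len(r) > 11 and "AGGRAVATED ASSAULT" in r[9]]  # filter
counts = Counter(age_group(r[11]) for r in aggr)                          # map + reduceByKey
result = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)       # sortBy, descending
print(result)

# Naive splitting cuts the quoted description in two,
# so index 11 now holds the MO code instead of the age.
print(lines[0].split(",")[11])
```

In the naive case, the misplaced value `0416` would even parse as the integer 416 and be silently counted as `Elderly`. That is why the parsing step matters more than its cost.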

# Query 2 – Top victim descent groups per year
Goal:
For each year in the LA crime dataset, compute:

- The number of victims per Vict Descent group

- The percentage of that group relative to the total victims in the same year

- Keep only the top 3 Vict Descent groups per year (by victim count)

Two implementations:

1. DataFrame API (with window functions)

2. SQL API (using temp views and SQL window functions)
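Both implementations follow the same plan:

1. Count rows per `(year, Vict Descent)`.
2. Divide each count by the yearly total.
3. Number the groups within each year by descending count (`ROW_NUMBER() OVER (PARTITION BY year ORDER BY count DESC)`).
4. Keep the rows numbered 1 to 3.

Below is a plain-Python sketch of that plan on made-up `(year, descent code)` pairs, useful for checking the logic without a cluster.

```python
from collections import Counter, defaultdict

# Made-up (year, Vict Descent code) pairs, one per crime record.
records = [
    (2010, "H"), (2010, "H"), (2010, "H"), (2010, "W"), (2010, "W"),
    (2010, "B"), (2010, "O"),
    (2011, "W"), (2011, "W"), (2011, "H"), (2011, "B"),
]

counts = Counter(records)                      # GROUP BY year, descent
totals = Counter(year for year, _ in records)  # GROUP BY year

# PARTITION BY year
by_year = defaultdict(list)
for (year, code), n in counts.items():
    by_year[year].append((code, n))

top3 = []
for year in sorted(by_year):
    # ORDER BY victim_count DESC, then keep ROW_NUMBER() <= 3
    ranked = sorted(by_year[year], key=lambda cn: cn[1], reverse=True)
    for rank, (code, n) in enumerate(ranked[:3], start=1):
        pct = round(100.0 * n / totals[year], 2)
        top3.append((year, rank, code, n, pct))

for row in top3:
    print(row)
```

With ties (here `B` and `O` in 2010), Spark's `ROW_NUMBER` does not guarantee which row gets the lower number. The stable sort in this sketch simply keeps first-seen order.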

## Query 2 – DataFrame API

In [1]:
## Query 2 – DataFrame API
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import time

# ---------------------------------------------------------------------
# Spark session configuration
# ---------------------------------------------------------------------
spark = (
    SparkSession.builder
    .appName("Crime Data - Query2 (DataFrame API)")
    .config("spark.executor.instances", "4")
    .config("spark.executor.cores", "1")
    .config("spark.executor.memory", "2g")
    .getOrCreate()
)

# ---------------------------------------------------------------------
# Schema for main crime dataset
# ---------------------------------------------------------------------
crime_schema = StructType([
    StructField("DR_NO", StringType()),
    StructField("Date Rptd", StringType()),
    StructField("DATE OCC", StringType()),
    StructField("TIME OCC", StringType()),
    StructField("AREA", StringType()),
    StructField("AREA NAME", StringType()),
    StructField("Rpt Dist No", StringType()),
    StructField("Part 1-2", StringType()),
    StructField("Crm Cd", StringType()),
    StructField("Crm Cd Desc", StringType()),
    StructField("Mocodes", StringType()),
    StructField("Vict Age", IntegerType()),
    StructField("Vict Sex", StringType()),
    StructField("Vict Descent", StringType()),
    StructField("Premis Cd", StringType()),
    StructField("Premis Desc", StringType()),
    StructField("Weapon Used Cd", StringType()),
    StructField("Weapon Desc", StringType()),
    StructField("Status", StringType()),
    StructField("Status Desc", StringType()),
    StructField("Crm Cd 1", StringType()),
    StructField("Crm Cd 2", StringType()),
    StructField("Crm Cd 3", StringType()),
    StructField("Crm Cd 4", StringType()),
    StructField("LOCATION", StringType()),
    StructField("Cross Street", StringType()),
    StructField("LAT", StringType()),
    StructField("LON", StringType())
])

# ---------------------------------------------------------------------
# Schema for race/ethnicity codes (RE_codes.csv)
# ---------------------------------------------------------------------
race_schema = StructType([
    StructField("Vict Descent", StringType()),
    StructField("Vict Descent Full", StringType())
])

# ---------------------------------------------------------------------
# Start execution timer
# ---------------------------------------------------------------------
start = time.time()

# ---------------------------------------------------------------------
# Load crime CSV files from S3
# ---------------------------------------------------------------------
crime_df1 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2010_2019.csv",
    header=True,
    schema=crime_schema
)

crime_df2 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv",
    header=True,
    schema=crime_schema
)

crime_df = crime_df1.union(crime_df2)

# ---------------------------------------------------------------------
# Load race/ethnicity code mapping (Vict Descent -> Vict Descent Full)
# ---------------------------------------------------------------------
race_codes_df = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/RE_codes.csv",
    header=True,
    schema=race_schema
)

# ---------------------------------------------------------------------
# Extract year from DATE OCC
# ---------------------------------------------------------------------
crime_with_year = crime_df.withColumn(
    "year",
    F.year(F.to_timestamp(F.col("DATE OCC"), "yyyy MMM dd hh:mm:ss a"))
)

# ---------------------------------------------------------------------
# Count victims by (year, Vict Descent code)
# ---------------------------------------------------------------------
yearly_counts = (
    crime_with_year
    .groupBy("year", "Vict Descent")
    .agg(F.count("*").alias("victim_count"))
)

# ---------------------------------------------------------------------
# Join with race/ethnicity mapping to get full description
# ---------------------------------------------------------------------
yearly_counts_full = (
    yearly_counts
    .join(race_codes_df, on="Vict Descent", how="left")
)

# ---------------------------------------------------------------------
# Compute total victims per year
# ---------------------------------------------------------------------
yearly_totals = (
    yearly_counts_full
    .groupBy("year")
    .agg(F.sum("victim_count").alias("total"))
)

# ---------------------------------------------------------------------
# Join counts with totals and compute percentage
# ---------------------------------------------------------------------
joined_df = (
    yearly_counts_full
    .join(yearly_totals, on="year")
    .withColumn(
        "pct_of_total",
        F.round(F.col("victim_count") / F.col("total") * 100, 2)
    )
)

# ---------------------------------------------------------------------
# Window: rank descent groups per year by victim_count (descending)
# ---------------------------------------------------------------------
windowSpec = Window.partitionBy("year").orderBy(F.col("victim_count").desc())

# ---------------------------------------------------------------------
# Keep top 3 groups per year, output full description
# ---------------------------------------------------------------------
final_df = (
    joined_df
    .withColumn("rank", F.row_number().over(windowSpec))
    .filter(F.col("rank") <= 3)
    .select(
        "year",
        F.col("Vict Descent Full").alias("Vict Descent"),
        "victim_count",
        "pct_of_total"
    )
    .orderBy("year", F.col("victim_count").desc())
)

# ---------------------------------------------------------------------
# Show results and execution time
# ---------------------------------------------------------------------
final_df.show(48)

end = time.time()
overall_time = end - start

print(f"\nOverall time: {overall_time:.2f} seconds")

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
306,application_1765289937462_0303,pyspark,idle,Link,Link,,✔


SparkSession available as 'spark'.


+----+--------------------+------------+------------+
|year|        Vict Descent|victim_count|pct_of_total|
+----+--------------------+------------+------------+
|2010|Hispanic/Latin/Me...|       73558|       35.14|
|2010|               White|       53835|       25.72|
|2010|               Black|       33937|       16.21|
|2011|Hispanic/Latin/Me...|       70845|       35.26|
|2011|               White|       51219|       25.49|
|2011|               Black|       32579|       16.22|
|2012|Hispanic/Latin/Me...|       70338|       34.85|
|2012|               White|       51839|       25.68|
|2012|               Black|       33572|       16.63|
|2013|Hispanic/Latin/Me...|       66741|        34.6|
|2013|               White|       48453|       25.12|
|2013|               Black|       31975|       16.58|
|2014|Hispanic/Latin/Me...|       68763|        35.1|
|2014|               White|       47531|       24.27|
|2014|               Black|       32952|       16.82|
|2015|Hispanic/Latin/Me...| 
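Both Query 2 cells derive `year` by parsing `DATE OCC` with the Spark pattern `yyyy MMM dd hh:mm:ss a`. That pattern means: four-digit year, abbreviated month name, day, 12-hour clock, and an AM/PM marker.

Below is a rough Python `datetime.strptime` equivalent. It assumes values shaped like `2010 Feb 20 12:00:00 AM`, an example built from the pattern rather than taken from the dataset. The helper name `year_of` is hypothetical. It returns `None` for values that do not parse, as a stand-in for a `NULL` year.

```python
from datetime import datetime

# Python directives corresponding to the Spark pattern "yyyy MMM dd hh:mm:ss a".
# %b expects English month abbreviations (default C locale assumed).
DATE_OCC_FORMAT = "%Y %b %d %I:%M:%S %p"


def year_of(date_occ):
    """Hypothetical helper: extract the year, or None if the value does not parse."""
    try:
        return datetime.strptime(date_occ, DATE_OCC_FORMAT).year
    except (TypeError, ValueError):
        return None


print(year_of("2010 Feb 20 12:00:00 AM"))
print(year_of("02/20/2010 12:00:00 AM"))  # different layout, so no year
```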

## Query 2 – SQL API

In [1]:
## Query 2 – SQL API (with Vict Descent full description)

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import time

# ---------------------------------------------------------------------
# Spark session configuration
# ---------------------------------------------------------------------
spark = (
    SparkSession.builder
    .appName("Crime Data - Query2 (SQL API)")
    .config("spark.executor.instances", "4")
    .config("spark.executor.cores", "1")
    .config("spark.executor.memory", "2g")
    .getOrCreate()
)

# ---------------------------------------------------------------------
# Schema for main crime dataset
# ---------------------------------------------------------------------
crime_schema = StructType([
    StructField("DR_NO", StringType()),
    StructField("Date Rptd", StringType()),
    StructField("DATE OCC", StringType()),
    StructField("TIME OCC", StringType()),
    StructField("AREA", StringType()),
    StructField("AREA NAME", StringType()),
    StructField("Rpt Dist No", StringType()),
    StructField("Part 1-2", StringType()),
    StructField("Crm Cd", StringType()),
    StructField("Crm Cd Desc", StringType()),
    StructField("Mocodes", StringType()),
    StructField("Vict Age", IntegerType()),
    StructField("Vict Sex", StringType()),
    StructField("Vict Descent", StringType()),
    StructField("Premis Cd", StringType()),
    StructField("Premis Desc", StringType()),
    StructField("Weapon Used Cd", StringType()),
    StructField("Weapon Desc", StringType()),
    StructField("Status", StringType()),
    StructField("Status Desc", StringType()),
    StructField("Crm Cd 1", StringType()),
    StructField("Crm Cd 2", StringType()),
    StructField("Crm Cd 3", StringType()),
    StructField("Crm Cd 4", StringType()),
    StructField("LOCATION", StringType()),
    StructField("Cross Street", StringType()),
    StructField("LAT", StringType()),
    StructField("LON", StringType())
])

# ---------------------------------------------------------------------
# Schema for race/ethnicity codes
# ---------------------------------------------------------------------
race_schema = StructType([
    StructField("Vict Descent", StringType()),
    StructField("Vict Descent Full", StringType())
])

# ---------------------------------------------------------------------
# Start execution timer
# ---------------------------------------------------------------------
start = time.time()

# ---------------------------------------------------------------------
# Load crime CSV files
# ---------------------------------------------------------------------
crime_df1 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2010_2019.csv",
    header=True,
    schema=crime_schema
)

crime_df2 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv",
    header=True,
    schema=crime_schema
)

crime_df = crime_df1.union(crime_df2)

# ---------------------------------------------------------------------
# Load race/ethnicity code mapping and register as a view
# ---------------------------------------------------------------------
race_codes_df = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/RE_codes.csv",
    header=True,
    schema=race_schema
)

race_codes_df.createOrReplaceTempView("race_codes")

# ---------------------------------------------------------------------
# Register main crime DataFrame as a temporary view
# ---------------------------------------------------------------------
crime_df.createOrReplaceTempView("crime")

# ---------------------------------------------------------------------
# Add year from DATE OCC and create a new view
# ---------------------------------------------------------------------
crime_with_year = spark.sql("""
    SELECT *,
           YEAR(TO_TIMESTAMP(`DATE OCC`, 'yyyy MMM dd hh:mm:ss a')) AS year
    FROM crime
""")
crime_with_year.createOrReplaceTempView("crime_with_year")

# ---------------------------------------------------------------------
# Victim counts by (year, Vict Descent code)
# ---------------------------------------------------------------------
yearly_counts = spark.sql("""
    SELECT year,
           `Vict Descent`,
           COUNT(*) AS victim_count
    FROM crime_with_year
    GROUP BY year, `Vict Descent`
""")
yearly_counts.createOrReplaceTempView("yearly_counts")

# ---------------------------------------------------------------------
# Total victims per year
# ---------------------------------------------------------------------
yearly_totals = spark.sql("""
    SELECT year,
           SUM(victim_count) AS total
    FROM yearly_counts
    GROUP BY year
""")
yearly_totals.createOrReplaceTempView("yearly_totals")

# ---------------------------------------------------------------------
# Final query:
# - Join yearly_counts with yearly_totals and race_codes
# - Compute percentage per group
# - Rank groups by victim_count per year
# - Keep only top 3
# - Output full Vict Descent description
# ---------------------------------------------------------------------
final_query = """
SELECT year,
       rc.`Vict Descent Full` AS `Vict Descent`,
       victim_count,
       pct_of_total
FROM (
    SELECT 
        yc.year,
        yc.`Vict Descent`,
        yc.victim_count,
        ROUND(100.0 * yc.victim_count / yt.total, 2) AS pct_of_total,
        ROW_NUMBER() OVER (
            PARTITION BY yc.year
            ORDER BY yc.victim_count DESC
        ) AS rank
    FROM yearly_counts yc
    JOIN yearly_totals yt
      ON yc.year = yt.year
) AS ranked
LEFT JOIN race_codes rc
  ON ranked.`Vict Descent` = rc.`Vict Descent`
WHERE rank <= 3
ORDER BY year, rank
"""

results = spark.sql(final_query)
results.show(48)

# ---------------------------------------------------------------------
# Print total execution time
# ---------------------------------------------------------------------
end = time.time()
overall_time = end - start
print(f"\nOverall time: {overall_time:.2f} seconds")

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
310,application_1765289937462_0307,pyspark,idle,Link,Link,,✔


SparkSession available as 'spark'.


+----+--------------------+------------+------------+
|year|        Vict Descent|victim_count|pct_of_total|
+----+--------------------+------------+------------+
|2010|Hispanic/Latin/Me...|       73558|       35.14|
|2010|               White|       53835|       25.72|
|2010|               Black|       33937|       16.21|
|2011|Hispanic/Latin/Me...|       70845|       35.26|
|2011|               White|       51219|       25.49|
|2011|               Black|       32579|       16.22|
|2012|Hispanic/Latin/Me...|       70338|       34.85|
|2012|               White|       51839|       25.68|
|2012|               Black|       33572|       16.63|
|2013|Hispanic/Latin/Me...|       66741|       34.60|
|2013|               White|       48453|       25.12|
|2013|               Black|       31975|       16.58|
|2014|Hispanic/Latin/Me...|       68763|       35.10|
|2014|               White|       47531|       24.27|
|2014|               Black|       32952|       16.82|
|2015|Hispanic/Latin/Me...| 
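The structure of the final SQL query is portable: rank inside a subquery, then filter on the rank and join the lookup table. The sketch below runs the same structure on Python's built-in `sqlite3` with made-up rows. It differs from the Spark query in a few ways:

- It assumes a SQLite build with window-function support (3.25 or newer).
- It renames the columns to avoid backtick quoting.
- It names the row number `rn`.
- It uses a `LEFT JOIN`, so a code missing from the lookup table (`X` below) still appears with a `NULL` description instead of disappearing.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE yearly_counts (year INTEGER, descent TEXT, victim_count INTEGER);
CREATE TABLE race_codes (descent TEXT, descent_full TEXT);
INSERT INTO yearly_counts VALUES
    (2010, 'H', 30), (2010, 'W', 20), (2010, 'B', 10), (2010, 'O', 5),
    (2011, 'W', 25), (2011, 'H', 15), (2011, 'X', 12), (2011, 'B', 8);
INSERT INTO race_codes VALUES
    ('H', 'Hispanic'), ('W', 'White'), ('B', 'Black'), ('O', 'Other');
""")

rows = conn.execute("""
SELECT ranked.year, rc.descent_full, ranked.victim_count, ranked.pct_of_total
FROM (
    SELECT yc.year, yc.descent, yc.victim_count,
           ROUND(100.0 * yc.victim_count / yt.total, 2) AS pct_of_total,
           ROW_NUMBER() OVER (PARTITION BY yc.year
                              ORDER BY yc.victim_count DESC) AS rn
    FROM yearly_counts yc
    JOIN (SELECT year, SUM(victim_count) AS total
          FROM yearly_counts GROUP BY year) yt
      ON yc.year = yt.year
) AS ranked
LEFT JOIN race_codes rc ON ranked.descent = rc.descent
WHERE ranked.rn <= 3
ORDER BY ranked.year, ranked.rn
""").fetchall()

for row in rows:
    print(row)
```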

# Query 3 – MO code frequency and join strategy comparison

Goal:

1. Extract individual MO codes from the crime dataset (Mocodes column).

2. Join them with an external MO code dictionary (MO_codes.txt) that maps each MO code to a textual description.

3. Compute the frequency of each MO code and show the description and count, sorted by descending frequency.

4. Compare different join strategies (default, broadcast, sort-merge, shuffle hash, and shuffle-and-replicate nested loop).

5. Provide an RDD API implementation of the same logic.
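Steps 1 to 3 amount to three operations: split each `Mocodes` string into individual codes, count each code, and look up its description.

The plain-Python sketch below rests on two assumptions made for illustration: codes in `Mocodes` are separated by whitespace, and each line of the dictionary file starts with the code followed by its description. The sample values and descriptions are made up.

```python
from collections import Counter

# Made-up Mocodes values, one per crime record (None = no MO codes recorded).
mocodes_column = ["0416 1822 0344", "0416", None, "1822 0416", ""]

# Made-up dictionary lines, assumed to be "<code> <description>".
mo_lines = [
    "0344 Sample description A",
    "0416 Sample description B",
    "1822 Sample description C",
]
mo_desc = dict(line.split(" ", 1) for line in mo_lines)

# Step 1: explode each Mocodes string into individual codes.
codes = [code for value in mocodes_column if value for code in value.split()]

# Step 3: frequency per code, most frequent first.
freq = Counter(codes)

# Step 2: attach descriptions (left-join style: unknown codes get None).
report = [(code, mo_desc.get(code), n) for code, n in freq.most_common()]
for row in report:
    print(row)
```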

## 3.1 DataFrame API – Default join strategy
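Some background for the comparison. With no hint, Spark SQL typically broadcasts a join side whose estimated size is below `spark.sql.autoBroadcastJoinThreshold` (10 MB by default). Otherwise it usually falls back to a sort-merge join.

The single-machine sketch below illustrates the two core algorithms behind those choices:

- **Hash join:** build a dictionary from the small side, then probe it with the large side.
- **Sort-merge join:** sort both sides on the key, then advance two cursors.

It ignores partitioning, shuffles and spilling. The function names `hash_join` and `sort_merge_join` are hypothetical.

```python
def hash_join(large, small):
    """Inner equi-join: build a hash table on the small side, probe with the large side."""
    table = {}
    for key, value in small:
        table.setdefault(key, []).append(value)
    return [(key, lv, sv) for key, lv in large for sv in table.get(key, [])]


def sort_merge_join(left, right):
    """Inner equi-join: sort both sides by key, then merge with two cursors."""
    left = sorted(left, key=lambda kv: kv[0])
    right = sorted(right, key=lambda kv: kv[0])
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][0], right[j][0]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Emit every pairing within the run of equal keys on both sides.
            j_end = j
            while j_end < len(right) and right[j_end][0] == lk:
                j_end += 1
            while i < len(left) and left[i][0] == lk:
                for r in range(j, j_end):
                    out.append((lk, left[i][1], right[r][1]))
                i += 1
            j = j_end
    return out


# (MO code, record id) pairs vs. a small (MO code, description) dictionary.
exploded = [("0416", 1), ("1822", 1), ("0416", 2), ("9999", 3)]
mo_dict = [("0416", "desc B"), ("1822", "desc C")]

print(sorted(hash_join(exploded, mo_dict)))
print(sorted(sort_merge_join(exploded, mo_dict)))
```

The hash join never sorts but needs the whole build side in memory, which is why Spark only broadcasts small tables. The sort-merge join pays for sorting but can stream both sides.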

In [1]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col, year, to_date
from pyspark.sql import Window
from pyspark.sql.functions import row_number, count, sum as Fsum
import time

# ---------------------------------------------------------------------
# Spark session configuration
# ---------------------------------------------------------------------

spark = SparkSession.builder \
    .appName("Crime Data - Query3 DataFrame API") \
    .config("spark.executor.instances", "4") \
    .config("spark.executor.cores", "1") \
    .config("spark.executor.memory", "2g") \
    .getOrCreate()

# ---------------------------------------------------------------------
# Schema definition for the LA crime dataset (full column list; only Mocodes is used here)
# ---------------------------------------------------------------------

crime_schema = StructType([
    StructField("DR_NO", StringType()),
    StructField("Date Rptd", StringType()),
    StructField("DATE OCC", StringType()),
    StructField("TIME OCC", StringType()),
    StructField("AREA", StringType()),
    StructField("AREA NAME", StringType()),
    StructField("Rpt Dist No", StringType()),
    StructField("Part 1-2", StringType()),
    StructField("Crm Cd", StringType()),
    StructField("Crm Cd Desc", StringType()),
    StructField("Mocodes", StringType()),
    StructField("Vict Age", IntegerType()),
    StructField("Vict Sex", StringType()),
    StructField("Vict Descent", StringType()),
    StructField("Premis Cd", StringType()),
    StructField("Premis Desc", StringType()),
    StructField("Weapon Used Cd", StringType()),
    StructField("Weapon Desc", StringType()),
    StructField("Status", StringType()),
    StructField("Status Desc", StringType()),
    StructField("Crm Cd 1", StringType()),
    StructField("Crm Cd 2", StringType()),
    StructField("Crm Cd 3", StringType()),
    StructField("Crm Cd 4", StringType()),
    StructField("LOCATION", StringType()),
    StructField("Cross Street", StringType()),
    StructField("LAT", StringType()),
    StructField("LON", StringType())
])

start = time.time()

# ---------------------------------------------------------------------
# Load the two crime CSV files from S3
# ---------------------------------------------------------------------
crime_df1 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2010_2019.csv",
    header=True,
    schema=crime_schema
)
crime_df2 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv",
    header=True,
    schema=crime_schema
)

# ---------------------------------------------------------------------
# Union the two DataFrames
# ---------------------------------------------------------------------
crime_df = crime_df1.union(crime_df2)


# ---------------------------------------------------------------------
# Split Mocodes string into an array of codes
# ---------------------------------------------------------------------
crime_mo = crime_df.withColumn(
    "MO_array", F.split(F.col("Mocodes"), " ")
)

# ---------------------------------------------------------------------
# Explode into one row per MO code (drop empty codes)
# ---------------------------------------------------------------------
crime_mo = crime_mo.withColumn(
    "MO_code", F.explode("MO_array")
).filter(F.col("MO_code") != "")   # drop empty strings

# ---------------------------------------------------------------------
# Load MO_codes dictionary (text file: e.g., "0100 Suspect Impersonate")
# ---------------------------------------------------------------------
mo_codes_raw = spark.read.text("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/MO_codes.txt")

# Split each line into (code, description)
mo_codes = mo_codes_raw.select(
    F.split(F.col("value"), " ", 2).getItem(0).alias("MO_code"),
    F.split(F.col("value"), " ", 2).getItem(1).alias("MO_description")
)

# ---------------------------------------------------------------------
# Join crime MO codes with dictionary on MO_code (default join strategy)
# LEFT join keeps all crime MO codes, even if they have no dictionary entry
# ---------------------------------------------------------------------
joined = crime_mo.join(mo_codes, on="MO_code", how="left")

# Show physical plan (default strategy chosen by Catalyst)
joined.explain(True)
print("\n====================================================================================================================================================================================\n")

# ---------------------------------------------------------------------
# Aggregate frequency per MO code / description and sort descending
# ---------------------------------------------------------------------
result = joined.groupBy("MO_code","MO_description") \
    .agg(F.count("*").alias("freq")) \
    .orderBy(F.col("freq").desc()) \
    .select("MO_description","MO_code","freq")

# ---------------------------------------------------------------------
# Display full result and execution stats
# Stop the timer only after show(): Spark evaluates lazily, so no job
# runs until an action is called.
# ---------------------------------------------------------------------
result.show(774)

end = time.time()
overall_time = end - start
print(f"\nOverall time: {overall_time:.2f} sec")

# Number of rows in the aggregated result
print("Number of rows of result table:", result.count())

# Number of rows in MO_codes dictionary
print("Number of rows of mocodes.txt table:", mo_codes.count())



== Parsed Logical Plan ==
'Join UsingJoin(LeftOuter, [MO_code])
:- Filter NOT (MO_code#182 = )
:  +- Project [DR_NO#0, Date Rptd#1, DATE OCC#2, TIME OCC#3, AREA#4, AREA NAME#5, Rpt Dist No#6, Part 1-2#7, Crm Cd#8, Crm Cd Desc#9, Mocodes#10, Vict Age#11, Vict Sex#12, Vict Descent#13, Premis Cd#14, Premis Desc#15, Weapon Used Cd#16, Weapon Desc#17, Status#18, Status Desc#19, Crm Cd 1#20, Crm Cd 2#21, Crm Cd 3#22, Crm Cd 4#23, ... 6 more fields]
:     +- Generate explode(MO_array#151), false, [MO_code#182]
:        +- Project [DR_NO#0, Date Rptd#1, DATE OCC#2, TIME OCC#3, AREA#4, AREA NAME#5, Rpt Dist No#6, Part 1-2#7, Crm Cd#8, Crm Cd Desc#9, Mocodes#10, Vict Age#11, Vict Sex#12, Vict Descent#13, Premis Cd#14, Premis Desc#15, Weapon Used Cd#16, Weapon Desc#17, Status#18, Status Desc#19, Crm Cd 1#20, Crm Cd 2#21, Crm Cd 3#22, Crm Cd 4#23, ... 5 more fields]
:           +- Union false, false
:              :- Relation [DR_NO#0,Date Rptd#1,DATE OCC#2,TIME OCC#3,AREA#4,AREA NAME#5,Rpt Dist N

## Observation – Missing MO codes between datasets

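Check which MO codes appear in the crime dataset but not in MO_codes.txt, and vice versa. This explains two things: why the aggregated result and the dictionary have different row counts, and why some MO codes get a NULL description after the left join.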

In [17]:
# Unique MO codes from the crime dataset (after explode)
crime_mo_codes = crime_mo.select("MO_code").distinct()

# Unique MO codes from the MO_codes dictionary file
mo_codes_unique = mo_codes.select("MO_code").distinct()

# MO codes that appear in the crime dataset but not in MO_codes.txt
missing_in_txt = crime_mo_codes.join(mo_codes_unique, on="MO_code", how="left_anti")
print("MO_codes of crime dataset that do not exist in MO_codes.txt:")
missing_in_txt.show()

# MO codes that appear in MO_codes.txt but not in the crime dataset
missing_in_crime = mo_codes_unique.join(crime_mo_codes, on="MO_code", how="left_anti")
print("MO_codes of MO_codes.txt that do not exist in crime dataset:")
missing_in_crime.show()


MO_codes of crime dataset that do not exist in MO_codes.txt:
+-------+
|MO_code|
+-------+
|   0851|
|   0858|
|   4021|
|   4014|
|   3026|
|   2213|
|   3032|
|   2202|
|   0859|
|   4016|
|   3029|
|   2120|
|   0855|
|   4018|
|   0857|
|   2206|
|   2118|
|   0850|
|   3039|
|   2204|
+-------+
only showing top 20 rows

MO_codes of MO_codes.txt that do not exist in crime dataset:
+-------+
|MO_code|
+-------+
|persons|
|   1529|
+-------+
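The `persons` entry is not an MO code. It most likely comes from a line in MO_codes.txt where a long description wrapped onto a new line, so `split(" ", 2)` took the first word of that line as the code. A stricter parse could accept only lines whose first token looks like a code, and append anything else to the previous description.

The sketch below assumes that valid MO codes are exactly four digits, based on the codes shown above. `parse_dictionary_strict` and `anti_join` are hypothetical helpers, and the sample lines in the usage are made up. On distinct keys, a `left_anti` join is simply a set difference, which is what the two queries above compute.

```python
import re
from typing import Dict, List, Optional, Set

# Assumption: valid MO codes are exactly four digits (e.g. 0851, 1529)
CODE_RE = re.compile(r"\d{4}")


def parse_dictionary_strict(lines: List[str]) -> Dict[str, str]:
    """Keep only lines that start with a code; treat other lines as wrapped descriptions."""
    mapping: Dict[str, str] = {}
    last_code: Optional[str] = None
    for raw in lines:
        line = raw.strip()
        if not line:
            continue
        first, _, rest = line.partition(" ")
        if CODE_RE.fullmatch(first):
            mapping[first] = rest
            last_code = first
        elif last_code is not None:
            # Continuation line (assumption): append it to the previous description
            mapping[last_code] = (mapping[last_code] + " " + line).strip()
    return mapping


def anti_join(left: Set[str], right: Set[str]) -> Set[str]:
    """A left_anti join on distinct keys is a set difference."""
    return left - right


if __name__ == "__main__":
    d = parse_dictionary_strict(["0100 Desc A", "1000 First part of a long", "description", "1529 Desc B"])
    print(d)
    print(anti_join({"0100", "0851"}, set(d)))
```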

## 3.2 DataFrame API – Broadcast hash join strategy

In [1]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col, year, to_date
from pyspark.sql import Window
from pyspark.sql.functions import row_number, count, sum as Fsum
import time

# ---------------------------------------------------------------------
# Spark session configuration
# ---------------------------------------------------------------------
spark = SparkSession.builder \
    .appName("Crime Data - Query3 DataFrame API") \
    .config("spark.executor.instances", "4") \
    .config("spark.executor.cores", "1") \
    .config("spark.executor.memory", "2g") \
    .getOrCreate()

# ---------------------------------------------------------------------
# Schema definition
# ---------------------------------------------------------------------
crime_schema = StructType([
    StructField("DR_NO", StringType()),
    StructField("Date Rptd", StringType()),
    StructField("DATE OCC", StringType()),
    StructField("TIME OCC", StringType()),
    StructField("AREA", StringType()),
    StructField("AREA NAME", StringType()),
    StructField("Rpt Dist No", StringType()),
    StructField("Part 1-2", StringType()),
    StructField("Crm Cd", StringType()),
    StructField("Crm Cd Desc", StringType()),
    StructField("Mocodes", StringType()),
    StructField("Vict Age", IntegerType()),
    StructField("Vict Sex", StringType()),
    StructField("Vict Descent", StringType()),
    StructField("Premis Cd", StringType()),
    StructField("Premis Desc", StringType()),
    StructField("Weapon Used Cd", StringType()),
    StructField("Weapon Desc", StringType()),
    StructField("Status", StringType()),
    StructField("Status Desc", StringType()),
    StructField("Crm Cd 1", StringType()),
    StructField("Crm Cd 2", StringType()),
    StructField("Crm Cd 3", StringType()),
    StructField("Crm Cd 4", StringType()),
    StructField("LOCATION", StringType()),
    StructField("Cross Street", StringType()),
    StructField("LAT", StringType()),
    StructField("LON", StringType())
])

start = time.time()

# Load and union crime datasets
crime_df1 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2010_2019.csv",
    header=True,
    schema=crime_schema
)
crime_df2 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv",
    header=True,
    schema=crime_schema
)

crime_df = crime_df1.union(crime_df2)

# Split and explode Mocodes
crime_mo = crime_df.withColumn(
    "MO_array", F.split(F.col("Mocodes"), " ")
)

crime_mo = crime_mo.withColumn(
    "MO_code", F.explode("MO_array")
).filter(F.col("MO_code") != "")   # drop empty strings

# Load MO_codes dictionary
mo_codes_raw = spark.read.text("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/MO_codes.txt")

# Break each line into two parts: code + description
mo_codes = mo_codes_raw.select(
    F.split(F.col("value"), " ", 2).getItem(0).alias("MO_code"),
    F.split(F.col("value"), " ", 2).getItem(1).alias("MO_description")
)

# ---------------------------------------------------------------------
# Broadcast hash join hint on the MO_codes side
# ---------------------------------------------------------------------
joined = crime_mo.join(
    mo_codes.hint("broadcast"),
    on="MO_code",
    how="left"
)

joined.explain(True)
print("\n====================================================================================================================================================================================\n")

# Aggregate frequency and display
result = joined.groupBy("MO_code","MO_description") \
    .agg(F.count("*").alias("freq")) \
    .orderBy(F.col("freq").desc()) \
    .select("MO_description","MO_code","freq")

# show() is the action that triggers execution; stop the timer after it
result.show(20)

end = time.time()
overall_time = end - start
print(f"\nOverall time: {overall_time:.2f} sec")


== Parsed Logical Plan ==
'Join UsingJoin(LeftOuter, [MO_code])
:- Filter NOT (MO_code#182 = )
:  +- Project [DR_NO#0, Date Rptd#1, DATE OCC#2, TIME OCC#3, AREA#4, AREA NAME#5, Rpt Dist No#6, Part 1-2#7, Crm Cd#8, Crm Cd Desc#9, Mocodes#10, Vict Age#11, Vict Sex#12, Vict Descent#13, Premis Cd#14, Premis Desc#15, Weapon Used Cd#16, Weapon Desc#17, Status#18, Status Desc#19, Crm Cd 1#20, Crm Cd 2#21, Crm Cd 3#22, Crm Cd 4#23, ... 6 more fields]
:     +- Generate explode(MO_array#151), false, [MO_code#182]
:        +- Project [DR_NO#0, Date Rptd#1, DATE OCC#2, TIME OCC#3, AREA#4, AREA NAME#5, Rpt Dist No#6, Part 1-2#7, Crm Cd#8, Crm Cd Desc#9, Mocodes#10, Vict Age#11, Vict Sex#12, Vict Descent#13, Premis Cd#14, Premis Desc#15, Weapon Used Cd#16, Weapon Desc#17, Status#18, Status Desc#19, Crm Cd 1#20, Crm Cd 2#21, Crm Cd 3#22, Crm Cd 4#23, ... 5 more fields]
:           +- Union false, false
:              :- Relation [DR_NO#0,Date Rptd#1,DATE OCC#2,TIME OCC#3,AREA#4,AREA NAME#5,Rpt Dist N
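A broadcast hash join avoids shuffling the large side. The small relation (here the MO dictionary) is collected into a hash table and sent once to every executor. Each partition of the exploded crime rows then probes that table locally.

For a left outer join, Spark can only build the hash table from the right side. Here the right side is the dictionary, so the hint is applicable.

The pure-Python sketch below models the idea, with lists standing in for partitions. The function names are hypothetical and this is not Spark's implementation.

```python
from collections import defaultdict
from typing import Dict, Iterable, Iterator, List, Optional, Tuple

CrimeRow = Tuple[str, int]   # (MO_code, payload from the large, exploded side)
DictRow = Tuple[str, str]    # (MO_code, MO_description)
Joined = Tuple[str, int, Optional[str]]


def build_broadcast_table(small_side: Iterable[DictRow]) -> Dict[str, List[str]]:
    """Driver-side step: collect the small relation into one hash map (code -> descriptions)."""
    table: Dict[str, List[str]] = defaultdict(list)
    for code, desc in small_side:
        table[code].append(desc)
    return dict(table)


def probe_partition(partition: Iterable[CrimeRow], table: Dict[str, List[str]]) -> Iterator[Joined]:
    """Executor-side step: stream one partition of the large side and probe the map."""
    for code, payload in partition:
        matches = table.get(code)
        if matches:
            for desc in matches:
                yield code, payload, desc
        else:
            yield code, payload, None  # left outer: keep the row with a NULL description


def broadcast_left_join(partitions: List[List[CrimeRow]], small_side: List[DictRow]) -> List[List[Joined]]:
    table = build_broadcast_table(small_side)  # in Spark this is shipped once to each executor
    # No shuffle of the large side: every partition is joined where it already lives
    return [list(probe_partition(p, table)) for p in partitions]


if __name__ == "__main__":
    parts = [[("0344", 1), ("9999", 2)], [("0416", 3)]]
    print(broadcast_left_join(parts, [("0344", "A"), ("0416", "B")]))
```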

## 3.3 DataFrame API – Sort-merge join strategy

In [1]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col, year, to_date
from pyspark.sql import Window
from pyspark.sql.functions import row_number, count, sum as Fsum
import time

# ---------------------------------------------------------------------
# Spark session configuration
# ---------------------------------------------------------------------
spark = SparkSession.builder \
    .appName("Crime Data - Query3 DataFrame API") \
    .config("spark.executor.instances", "4") \
    .config("spark.executor.cores", "1") \
    .config("spark.executor.memory", "2g") \
    .getOrCreate()

# Schema definition
crime_schema = StructType([
    StructField("DR_NO", StringType()),
    StructField("Date Rptd", StringType()),
    StructField("DATE OCC", StringType()),
    StructField("TIME OCC", StringType()),
    StructField("AREA", StringType()),
    StructField("AREA NAME", StringType()),
    StructField("Rpt Dist No", StringType()),
    StructField("Part 1-2", StringType()),
    StructField("Crm Cd", StringType()),
    StructField("Crm Cd Desc", StringType()),
    StructField("Mocodes", StringType()),
    StructField("Vict Age", IntegerType()),
    StructField("Vict Sex", StringType()),
    StructField("Vict Descent", StringType()),
    StructField("Premis Cd", StringType()),
    StructField("Premis Desc", StringType()),
    StructField("Weapon Used Cd", StringType()),
    StructField("Weapon Desc", StringType()),
    StructField("Status", StringType()),
    StructField("Status Desc", StringType()),
    StructField("Crm Cd 1", StringType()),
    StructField("Crm Cd 2", StringType()),
    StructField("Crm Cd 3", StringType()),
    StructField("Crm Cd 4", StringType()),
    StructField("LOCATION", StringType()),
    StructField("Cross Street", StringType()),
    StructField("LAT", StringType()),
    StructField("LON", StringType())
])

start = time.time()

# Load and union crime datasets
crime_df1 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2010_2019.csv",
    header=True,
    schema=crime_schema
)
crime_df2 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv",
    header=True,
    schema=crime_schema
)

crime_df = crime_df1.union(crime_df2)

# Split and explode Mocodes
crime_mo = crime_df.withColumn(
    "MO_array", F.split(F.col("Mocodes"), " ")
)

crime_mo = crime_mo.withColumn(
    "MO_code", F.explode("MO_array")
).filter(F.col("MO_code") != "")   # drop empty strings

# Load MO_codes dictionary
mo_codes_raw = spark.read.text("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/MO_codes.txt")

# Break each line into two parts: code + description
mo_codes = mo_codes_raw.select(
    F.split(F.col("value"), " ", 2).getItem(0).alias("MO_code"),
    F.split(F.col("value"), " ", 2).getItem(1).alias("MO_description")
)

# ---------------------------------------------------------------------
# Sort-merge join hint
# ---------------------------------------------------------------------
joined = crime_mo.join(
    mo_codes.hint("merge"),
    on="MO_code",
    how="left"
)

joined.explain(True)
print("\n====================================================================================================================================================================================\n")

# Aggregate frequency per MO code / description and sort descending
result = joined.groupBy("MO_code","MO_description") \
    .agg(F.count("*").alias("freq")) \
    .orderBy(F.col("freq").desc()) \
    .select("MO_description","MO_code","freq")

# show() is the action that triggers execution; stop the timer after it
result.show(20)

end = time.time()
overall_time = end - start
print(f"\nOverall time: {overall_time:.2f} sec")

== Parsed Logical Plan ==
'Join UsingJoin(LeftOuter, [MO_code])
:- Filter NOT (MO_code#182 = )
:  +- Project [DR_NO#0, Date Rptd#1, DATE OCC#2, TIME OCC#3, AREA#4, AREA NAME#5, Rpt Dist No#6, Part 1-2#7, Crm Cd#8, Crm Cd Desc#9, Mocodes#10, Vict Age#11, Vict Sex#12, Vict Descent#13, Premis Cd#14, Premis Desc#15, Weapon Used Cd#16, Weapon Desc#17, Status#18, Status Desc#19, Crm Cd 1#20, Crm Cd 2#21, Crm Cd 3#22, Crm Cd 4#23, ... 6 more fields]
:     +- Generate explode(MO_array#151), false, [MO_code#182]
:        +- Project [DR_NO#0, Date Rptd#1, DATE OCC#2, TIME OCC#3, AREA#4, AREA NAME#5, Rpt Dist No#6, Part 1-2#7, Crm Cd#8, Crm Cd Desc#9, Mocodes#10, Vict Age#11, Vict Sex#12, Vict Descent#13, Premis Cd#14, Premis Desc#15, Weapon Used Cd#16, Weapon Desc#17, Status#18, Status Desc#19, Crm Cd 1#20, Crm Cd 2#21, Crm Cd 3#22, Crm Cd 4#23, ... 5 more fields]
:           +- Union false, false
:              :- Relation [DR_NO#0,Date Rptd#1,DATE OCC#2,TIME OCC#3,AREA#4,AREA NAME#5,Rpt Dist N
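A sort-merge join works in two phases:

1. Both sides are shuffled by the join key, so equal keys meet in the same partition.
2. Each partition is sorted on the key, and the two sorted streams are merged with two cursors.

The plain-Python sketch below shows only the per-partition merge step for a left outer join. The function name `sort_merge_left_join` is hypothetical. Runs of equal keys on the right side are handled, so duplicate dictionary entries would still produce one output row each.

```python
from typing import List, Optional, Tuple


def sort_merge_left_join(
    left: List[Tuple[str, int]], right: List[Tuple[str, str]]
) -> List[Tuple[str, int, Optional[str]]]:
    """Left outer join of two relations by sorting both on the key and merging."""
    left_sorted = sorted(left, key=lambda r: r[0])
    right_sorted = sorted(right, key=lambda r: r[0])
    out: List[Tuple[str, int, Optional[str]]] = []
    j, n = 0, len(right_sorted)
    for key, payload in left_sorted:
        # Advance the right cursor past keys smaller than the current left key
        while j < n and right_sorted[j][0] < key:
            j += 1
        # Emit one row per right entry in the run of equal keys (j stays at the run start)
        k = j
        while k < n and right_sorted[k][0] == key:
            out.append((key, payload, right_sorted[k][1]))
            k += 1
        if k == j:  # no right row with this key: keep the left row with NULL
            out.append((key, payload, None))
    return out


if __name__ == "__main__":
    print(sort_merge_left_join([("0416", 1), ("0344", 2), ("9999", 3)], [("0344", "A"), ("0416", "B")]))
```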

## 3.4 DataFrame API – Shuffle hash join strategy

In [1]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col, year, to_date
from pyspark.sql import Window
from pyspark.sql.functions import row_number, count, sum as Fsum
import time

# ---------------------------------------------------------------------
# Spark session configuration
# ---------------------------------------------------------------------
spark = SparkSession.builder \
    .appName("Crime Data - Query3 DataFrame API") \
    .config("spark.executor.instances", "4") \
    .config("spark.executor.cores", "1") \
    .config("spark.executor.memory", "2g") \
    .getOrCreate()

# Schema definition
crime_schema = StructType([
    StructField("DR_NO", StringType()),
    StructField("Date Rptd", StringType()),
    StructField("DATE OCC", StringType()),
    StructField("TIME OCC", StringType()),
    StructField("AREA", StringType()),
    StructField("AREA NAME", StringType()),
    StructField("Rpt Dist No", StringType()),
    StructField("Part 1-2", StringType()),
    StructField("Crm Cd", StringType()),
    StructField("Crm Cd Desc", StringType()),
    StructField("Mocodes", StringType()),
    StructField("Vict Age", IntegerType()),
    StructField("Vict Sex", StringType()),
    StructField("Vict Descent", StringType()),
    StructField("Premis Cd", StringType()),
    StructField("Premis Desc", StringType()),
    StructField("Weapon Used Cd", StringType()),
    StructField("Weapon Desc", StringType()),
    StructField("Status", StringType()),
    StructField("Status Desc", StringType()),
    StructField("Crm Cd 1", StringType()),
    StructField("Crm Cd 2", StringType()),
    StructField("Crm Cd 3", StringType()),
    StructField("Crm Cd 4", StringType()),
    StructField("LOCATION", StringType()),
    StructField("Cross Street", StringType()),
    StructField("LAT", StringType()),
    StructField("LON", StringType())
])

start = time.time()

# Load and union crime datasets
crime_df1 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2010_2019.csv",
    header=True,
    schema=crime_schema
)
crime_df2 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv",
    header=True,
    schema=crime_schema
)

crime_df = crime_df1.union(crime_df2)

# Split and explode Mocodes
crime_mo = crime_df.withColumn(
    "MO_array", F.split(F.col("Mocodes"), " ")
)

crime_mo = crime_mo.withColumn(
    "MO_code", F.explode("MO_array")
).filter(F.col("MO_code") != "")   # drop empty strings

# Load MO_codes dictionary
mo_codes_raw = spark.read.text("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/MO_codes.txt")

# Break each line into two parts: code + description
mo_codes = mo_codes_raw.select(
    F.split(F.col("value"), " ", 2).getItem(0).alias("MO_code"),
    F.split(F.col("value"), " ", 2).getItem(1).alias("MO_description")
)

# ---------------------------------------------------------------------
# Shuffle hash join hint
# ---------------------------------------------------------------------
joined = crime_mo.join(
    mo_codes.hint("shuffle_hash"),
    on="MO_code",
    how="left"
)

joined.explain(True)
print("\n====================================================================================================================================================================================\n")

# Aggregate frequency per MO code / description and sort descending
result = joined.groupBy("MO_code","MO_description") \
    .agg(F.count("*").alias("freq")) \
    .orderBy(F.col("freq").desc()) \
    .select("MO_description","MO_code","freq")

# show() is the action that triggers execution; stop the timer after it
result.show(20)

end = time.time()
overall_time = end - start
print(f"\nOverall time: {overall_time:.2f} sec")

== Parsed Logical Plan ==
'Join UsingJoin(LeftOuter, [MO_code])
:- Filter NOT (MO_code#182 = )
:  +- Project [DR_NO#0, Date Rptd#1, DATE OCC#2, TIME OCC#3, AREA#4, AREA NAME#5, Rpt Dist No#6, Part 1-2#7, Crm Cd#8, Crm Cd Desc#9, Mocodes#10, Vict Age#11, Vict Sex#12, Vict Descent#13, Premis Cd#14, Premis Desc#15, Weapon Used Cd#16, Weapon Desc#17, Status#18, Status Desc#19, Crm Cd 1#20, Crm Cd 2#21, Crm Cd 3#22, Crm Cd 4#23, ... 6 more fields]
:     +- Generate explode(MO_array#151), false, [MO_code#182]
:        +- Project [DR_NO#0, Date Rptd#1, DATE OCC#2, TIME OCC#3, AREA#4, AREA NAME#5, Rpt Dist No#6, Part 1-2#7, Crm Cd#8, Crm Cd Desc#9, Mocodes#10, Vict Age#11, Vict Sex#12, Vict Descent#13, Premis Cd#14, Premis Desc#15, Weapon Used Cd#16, Weapon Desc#17, Status#18, Status Desc#19, Crm Cd 1#20, Crm Cd 2#21, Crm Cd 3#22, Crm Cd 4#23, ... 5 more fields]
:           +- Union false, false
:              :- Relation [DR_NO#0,Date Rptd#1,DATE OCC#2,TIME OCC#3,AREA#4,AREA NAME#5,Rpt Dist N
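A shuffle hash join also shuffles both sides by the join key. Instead of sorting, it builds an in-memory hash table from one side of each partition and probes it with the other. For a left outer join, the hash table is built from the right side.

The sketch below models the shuffle with a deterministic `zlib.crc32` hash. This is a stand-in for Spark's Murmur3-based hash partitioning, chosen because Python's built-in `hash()` is salted per process. All names are hypothetical and the partition count is arbitrary.

```python
import zlib
from collections import defaultdict
from typing import Dict, List, Optional, Sequence, Tuple


def partition_of(key: str, num_partitions: int) -> int:
    """Deterministic stand-in for Spark's hash partitioning of the join key."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def shuffle(rows: Sequence[tuple], num_partitions: int) -> List[List[tuple]]:
    """Route every row to the partition chosen by the hash of its key (row[0])."""
    buckets: List[List[tuple]] = [[] for _ in range(num_partitions)]
    for row in rows:
        buckets[partition_of(row[0], num_partitions)].append(row)
    return buckets


def shuffle_hash_left_join(
    left: Sequence[Tuple[str, int]], right: Sequence[Tuple[str, str]], num_partitions: int = 4
) -> List[Tuple[str, int, Optional[str]]]:
    out: List[Tuple[str, int, Optional[str]]] = []
    # Equal keys land in the same partition index on both sides
    for lp, rp in zip(shuffle(left, num_partitions), shuffle(right, num_partitions)):
        table: Dict[str, List[str]] = defaultdict(list)  # build side: this partition's right rows only
        for code, desc in rp:
            table[code].append(desc)
        for code, payload in lp:  # probe side: left rows of the same partition
            matches = table.get(code)
            if matches:
                out.extend((code, payload, d) for d in matches)
            else:
                out.append((code, payload, None))
    return out


if __name__ == "__main__":
    print(sorted(shuffle_hash_left_join([("0344", 1), ("9999", 2)], [("0344", "A")])))
```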

## 3.5 DataFrame API – Shuffle replicate nested loop join strategy

In [1]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col, year, to_date
from pyspark.sql import Window
from pyspark.sql.functions import row_number, count, sum as Fsum
import time

# ---------------------------------------------------------------------
# Spark session configuration
# ---------------------------------------------------------------------
spark = SparkSession.builder \
    .appName("Crime Data - Query3 DataFrame API") \
    .config("spark.executor.instances", "4") \
    .config("spark.executor.cores", "1") \
    .config("spark.executor.memory", "2g") \
    .getOrCreate()

# Schema definition
crime_schema = StructType([
    StructField("DR_NO", StringType()),
    StructField("Date Rptd", StringType()),
    StructField("DATE OCC", StringType()),
    StructField("TIME OCC", StringType()),
    StructField("AREA", StringType()),
    StructField("AREA NAME", StringType()),
    StructField("Rpt Dist No", StringType()),
    StructField("Part 1-2", StringType()),
    StructField("Crm Cd", StringType()),
    StructField("Crm Cd Desc", StringType()),
    StructField("Mocodes", StringType()),
    StructField("Vict Age", IntegerType()),
    StructField("Vict Sex", StringType()),
    StructField("Vict Descent", StringType()),
    StructField("Premis Cd", StringType()),
    StructField("Premis Desc", StringType()),
    StructField("Weapon Used Cd", StringType()),
    StructField("Weapon Desc", StringType()),
    StructField("Status", StringType()),
    StructField("Status Desc", StringType()),
    StructField("Crm Cd 1", StringType()),
    StructField("Crm Cd 2", StringType()),
    StructField("Crm Cd 3", StringType()),
    StructField("Crm Cd 4", StringType()),
    StructField("LOCATION", StringType()),
    StructField("Cross Street", StringType()),
    StructField("LAT", StringType()),
    StructField("LON", StringType())
])

start = time.time()

# Load and union crime datasets
crime_df1 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2010_2019.csv",
    header=True,
    schema=crime_schema
)
crime_df2 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv",
    header=True,
    schema=crime_schema
)

crime_df = crime_df1.union(crime_df2)


# Split and explode Mocodes
crime_mo = crime_df.withColumn(
    "MO_array", F.split(F.col("Mocodes"), " ")
)

crime_mo = crime_mo.withColumn(
    "MO_code", F.explode("MO_array")
).filter(F.col("MO_code") != "")   # drop empty strings

# Load MO_codes dictionary
mo_codes_raw = spark.read.text("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/MO_codes.txt")

# Break each line into two parts: code + description
mo_codes = mo_codes_raw.select(
    F.split(F.col("value"), " ", 2).getItem(0).alias("MO_code"),
    F.split(F.col("value"), " ", 2).getItem(1).alias("MO_description")
)

# ---------------------------------------------------------------------
# Shuffle replicate nested-loop join hint
# ---------------------------------------------------------------------
joined = crime_mo.join(
    mo_codes.hint("shuffle_replicate_nl"),
    on="MO_code",
    how="left"
)

joined.explain(True)
print("\n====================================================================================================================================================================================\n")

# Aggregate frequency per MO code / description and sort descending
result = joined.groupBy("MO_code","MO_description") \
    .agg(F.count("*").alias("freq")) \
    .orderBy(F.col("freq").desc()) \
    .select("MO_description","MO_code","freq")

# show() is the action that triggers execution; stop the timer after it
result.show(20)

end = time.time()
overall_time = end - start
print(f"\nOverall time: {overall_time:.2f} sec")

== Parsed Logical Plan ==
'Join UsingJoin(LeftOuter, [MO_code])
:- Filter NOT (MO_code#182 = )
:  +- Project [DR_NO#0, Date Rptd#1, DATE OCC#2, TIME OCC#3, AREA#4, AREA NAME#5, Rpt Dist No#6, Part 1-2#7, Crm Cd#8, Crm Cd Desc#9, Mocodes#10, Vict Age#11, Vict Sex#12, Vict Descent#13, Premis Cd#14, Premis Desc#15, Weapon Used Cd#16, Weapon Desc#17, Status#18, Status Desc#19, Crm Cd 1#20, Crm Cd 2#21, Crm Cd 3#22, Crm Cd 4#23, ... 6 more fields]
:     +- Generate explode(MO_array#151), false, [MO_code#182]
:        +- Project [DR_NO#0, Date Rptd#1, DATE OCC#2, TIME OCC#3, AREA#4, AREA NAME#5, Rpt Dist No#6, Part 1-2#7, Crm Cd#8, Crm Cd Desc#9, Mocodes#10, Vict Age#11, Vict Sex#12, Vict Descent#13, Premis Cd#14, Premis Desc#15, Weapon Used Cd#16, Weapon Desc#17, Status#18, Status Desc#19, Crm Cd 1#20, Crm Cd 2#21, Crm Cd 3#22, Crm Cd 4#23, ... 5 more fields]
:           +- Union false, false
:              :- Relation [DR_NO#0,Date Rptd#1,DATE OCC#2,TIME OCC#3,AREA#4,AREA NAME#5,Rpt Dist N
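A shuffle-replicate nested loop join (Spark's Cartesian product join) pairs every partition of one side with every partition of the other. It then runs a nested loop on each pair, so the cost grows with |left| × |right| rows.

A single task only sees one right partition, so it cannot tell whether a left row has no match anywhere. As far as we know, this is why Spark applies this strategy only to inner-like joins and falls back to another strategy for a left join like this one. It is worth checking the physical plan to confirm whether `CartesianProduct` actually appears.

The inner-join sketch below is illustrative only; `replicate_nl_inner_join` is a hypothetical name.

```python
from itertools import product
from typing import Callable, List, Tuple, TypeVar

L = TypeVar("L")
R = TypeVar("R")


def replicate_nl_inner_join(
    left_parts: List[List[L]],
    right_parts: List[List[R]],
    condition: Callable[[L, R], bool],
) -> List[Tuple[L, R]]:
    """Inner join: each (left partition, right partition) pair is one task running a nested loop."""
    out: List[Tuple[L, R]] = []
    for lp, rp in product(left_parts, right_parts):  # every partition pair, i.e. replication
        for l_row in lp:
            for r_row in rp:  # every row of the pair is compared with every other
                if condition(l_row, r_row):
                    out.append((l_row, r_row))
    return out


if __name__ == "__main__":
    left = [[("0344", 1)], [("9999", 2), ("0416", 3)]]
    right = [[("0344", "A")], [("0416", "B")]]
    print(replicate_nl_inner_join(left, right, lambda l, r: l[0] == r[0]))
```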

## 3.6 RDD API

In [1]:
from pyspark.sql import SparkSession
import time
from pyspark.sql import Row
import csv
from io import StringIO

# ---------------------------------------------------------------------
# Spark session configuration
# ---------------------------------------------------------------------
spark = SparkSession.builder \
    .appName("Crime Data - Query3 RDD API") \
    .config("spark.executor.instances", "4") \
    .config("spark.executor.cores", "1") \
    .config("spark.executor.memory", "2g") \
    .getOrCreate()

sc = spark.sparkContext

# ---------------------------------------------------------------------
# Helper: safe CSV parsing using Python's csv.reader
# ---------------------------------------------------------------------
def parse_csv(line):
    # Handles commas inside quoted fields; quoted fields that span several
    # lines are not supported, because textFile() splits the input on newlines
    return next(csv.reader(StringIO(line)))

start = time.time()

# ---------------------------------------------------------------------
# Read both crime CSV files as RDDs of text lines
# ---------------------------------------------------------------------
crime_rdd1 = sc.textFile(
    "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2010_2019.csv"
)
crime_rdd2 = sc.textFile(
    "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv"
)

# ---------------------------------------------------------------------
# Remove headers from both RDDs
# ---------------------------------------------------------------------
header1 = crime_rdd1.first()
header2 = crime_rdd2.first()
crime_rdd1 = crime_rdd1.filter(lambda x: x != header1)
crime_rdd2 = crime_rdd2.filter(lambda x: x != header2)

# ---------------------------------------------------------------------
# Union and parse CSV lines
# ---------------------------------------------------------------------
crime_rdd = crime_rdd1.union(crime_rdd2)
crime_rdd = crime_rdd.map(parse_csv)

# ---------------------------------------------------------------------
# Extract MO codes from Mocodes column (index 10)
# Split on whitespace and emit (code, 1) for each non-empty code
# ---------------------------------------------------------------------
mocodes_rdd = crime_rdd.flatMap(lambda row: [(code, 1) for code in row[10].split() if code.strip() != ""])

# ---------------------------------------------------------------------
# Reduce by key to get frequency per MO code
# ---------------------------------------------------------------------
mo_counts_rdd = mocodes_rdd.reduceByKey(lambda a, b: a + b)

# ---------------------------------------------------------------------
# Load MO_codes.txt as (code, description)
# ---------------------------------------------------------------------
mo_rdd = sc.textFile(
    "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/MO_codes.txt"
)
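# Skip blank lines and split "<code> <description>" once into a (code, description) pair
mo_rdd = mo_rdd.map(lambda line: line.strip()) \
               .filter(lambda line: line != "") \
               .map(lambda line: line.partition(" ")) \
               .map(lambda parts: (parts[0], parts[2]))  # (code, description)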

# ---------------------------------------------------------------------
# Left outer join: keep all MO codes from crime data
# ---------------------------------------------------------------------
joined_rdd = mo_counts_rdd.leftOuterJoin(mo_rdd) \
                          .map(lambda x: (x[0], x[1][0], x[1][1]))  # (mocode, count, description)

# ---------------------------------------------------------------------
# Sort by frequency (descending) and display top 20 codes
# ---------------------------------------------------------------------
final_rdd = joined_rdd.sortBy(lambda x: x[1], ascending=False)

for row in final_rdd.take(20):
    print(row)

end = time.time()
print(f"\nOverall time: {end - start:.2f} sec")


Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
299,application_1761923966900_0314,pyspark,idle,Link,Link,,✔


SparkSession available as 'spark'.

('0344', 1002900, 'Removes vict property')
('1822', 548422, 'Stranger')
('0416', 404773, 'Hit-Hit w/ weapon')
('0329', 377536, 'Vandalized')
('0913', 278618, 'Victim knew Suspect')
('2000', 256188, 'Domestic violence')
('1300', 219082, 'Vehicle involved')
('0400', 213165, 'Force used')
('1402', 177470, 'Evidence Booked (any crime)')
('1609', 131229, 'Smashed')
('1309', 122108, 'Susp uses vehicle')
('1202', 120238, 'Victim was aged (60 & over) or blind/physically disabled/unable to care for self')
('0325', 120159, 'Took merchandise')
('1814', 118073, 'Susp is/was current/former boyfriend/girlfriend')
('0444', 116763, 'Pushed')
('1501', 115589, 'Other MO (see rpt)')
('1307', 113609, 'Breaks window')
('0334', 105665, 'Brandishes weapon')
('2004', 93426, 'Suspect is homeless/transient')
('0432', 83562, 'Intimidation')

Overall time: 17.53 sec
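You can check the RDD logic without a cluster by running the same pipeline in plain Python on a few toy lines. This is a hypothetical local analogue, not the notebook's code. The `mo_code_frequencies` helper, the three-column toy header and the sample MO lines are made up for illustration. In the sketch:

- `Counter.update` plays the role of `flatMap` + `reduceByKey`.
- `dict.get` reproduces the `None` that `leftOuterJoin` yields for codes missing from `MO_codes.txt`.

```python
import csv
from collections import Counter
from io import StringIO


def parse_csv(line):
    # csv.reader handles quoted fields that contain commas, e.g. "BATTERY, SIMPLE"
    return next(csv.reader(StringIO(line)))


def mo_code_frequencies(lines, mo_lines, mocodes_idx=10, top_n=20):
    """Hypothetical local analogue of the RDD pipeline above."""
    header = lines[0]
    counts = Counter()
    for line in lines:
        if line == header:  # same header filter as crime_rdd.filter(...)
            continue
        row = parse_csv(line)
        # flatMap + reduceByKey: count each whitespace-separated MO code
        counts.update(row[mocodes_idx].split())

    # MO_codes.txt lines are assumed to look like "<code> <description>"
    descriptions = {}
    for line in mo_lines:
        code, _, desc = line.strip().partition(" ")
        if code:
            descriptions[code] = desc

    # leftOuterJoin: codes without a description get None
    joined = [(code, n, descriptions.get(code)) for code, n in counts.items()]
    # sortBy(count, ascending=False) followed by take(top_n)
    return sorted(joined, key=lambda t: t[1], reverse=True)[:top_n]


lines = [
    "DR_NO,Crm Cd Desc,Mocodes",
    '1,"BATTERY, SIMPLE",0416 0344',
    "2,THEFT,0344 1822",
    "3,VANDALISM,",
]
mo_lines = ["0344 Removes vict property", "0416 Hit-Hit w/ weapon", ""]

top = mo_code_frequencies(lines, mo_lines, mocodes_idx=2)
for row in top:
    print(row)
```

Spark's `sortBy` does not guarantee any particular order among equal counts. Python's stable sort keeps codes with equal counts in first-seen order.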

# Query 4 – Nearest police station and average distance per division

Goal:

Using Apache Sedona and the LA crime + police station datasets:

1. Represent each crime and police station as a geospatial POINT.

2. For every crime, find the nearest police station based on spherical distance.

3. Aggregate, per police division:

 - Average distance from crimes to their closest station (in meters)

 - Total number of crimes assigned to that division

4. Compare performance under different cluster configurations:

 - 1 core, 2 GB per executor

 - 2 cores, 4 GB per executor

 - 4 cores, 8 GB per executor

All three implementations use the same DataFrame logic; only the Spark resource configuration changes.
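You can check the nearest-station logic on a handful of points in plain Python. The pipeline is: cross join, distance, `row_number() == 1`, then `groupBy("DIVISION")`. This is a sketch under the following assumptions:

- `haversine_m` is a hand-written great-circle formula standing in for Sedona's `ST_DistanceSphere`.
- It uses an assumed mean Earth radius of 6,371,008.8 m. Sedona's default radius may differ slightly, so distances may not match the cluster output to the last meter.
- The station and crime tuples are invented.

```python
import math
from collections import defaultdict

# Assumed mean Earth radius in meters; Sedona's default for ST_DistanceSphere may differ slightly
EARTH_RADIUS_M = 6_371_008.8


def haversine_m(lon1, lat1, lon2, lat2, radius=EARTH_RADIUS_M):
    """Great-circle (haversine) distance in meters between two (lon, lat) points in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    # Clamp to 1.0 to guard against floating-point overshoot near antipodal points
    return 2 * radius * math.asin(min(1.0, math.sqrt(a)))


def nearest_station_stats(crimes, stations):
    """crimes: (dr_no, lon, lat) tuples; stations: (division, lon, lat) tuples.

    Returns (division, avg_distance_m, crime_count) rows sorted by crime_count desc.
    """
    total_dist = defaultdict(float)
    crime_count = defaultdict(int)
    for _dr_no, lon, lat in crimes:
        # Cross join + row_number() == 1: try every station, keep the closest one
        division, dist = min(
            ((div, haversine_m(lon, lat, s_lon, s_lat)) for div, s_lon, s_lat in stations),
            key=lambda pair: pair[1],
        )
        total_dist[division] += dist
        crime_count[division] += 1
    # groupBy("DIVISION").agg(round(avg(distance_m), 2), count("*"))
    rows = [(div, round(total_dist[div] / n, 2), n) for div, n in crime_count.items()]
    return sorted(rows, key=lambda row: row[2], reverse=True)


stations = [("A", 0.0, 0.0), ("B", 0.0, 1.0)]
crimes = [(1, 0.0, 0.1), (2, 0.0, 0.2), (3, 0.0, 0.9)]
for row in nearest_station_stats(crimes, stations):
    print(row)
```

Like the Spark version, this evaluates every crime against every station, so the work grows as crimes × stations. With only a few dozen stations that stays cheap per crime.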

## 4.1 DataFrame + Sedona – 1 core, 2 GB per executor

In [1]:
from sedona.spark import *                     # ST_Point, ST_DistanceSphere, ...
from sedona.register import SedonaRegistrator
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.sql.functions import round, col, asc, row_number, avg, count
from pyspark.sql.window import Window
import time

# ---------------------------------------------------------------------
# Spark session configuration (1 core, 2 GB per executor)
# ---------------------------------------------------------------------
spark = SparkSession.builder \
    .appName("Crime Data - Query4 DataFrame API (1 core, 2 GB)") \
    .config("spark.executor.instances", "2") \
    .config("spark.executor.cores", "1") \
    .config("spark.executor.memory", "2g") \
    .getOrCreate()
# Register Sedona functions (ST_Point, ST_DistanceSphere, etc.)
SedonaRegistrator.registerAll(spark)


# ---------------------------------------------------------------------
# Schema definitions for crime data and police station data
# ---------------------------------------------------------------------
crime_schema = StructType([
    StructField("DR_NO", StringType()),
    StructField("Date Rptd", StringType()),
    StructField("DATE OCC", StringType()),
    StructField("TIME OCC", StringType()),
    StructField("AREA", StringType()),
    StructField("AREA NAME", StringType()),
    StructField("Rpt Dist No", StringType()),
    StructField("Part 1-2", StringType()),
    StructField("Crm Cd", StringType()),
    StructField("Crm Cd Desc", StringType()),
    StructField("Mocodes", StringType()),
    StructField("Vict Age", IntegerType()),
    StructField("Vict Sex", StringType()),
    StructField("Vict Descent", StringType()),
    StructField("Premis Cd", StringType()),
    StructField("Premis Desc", StringType()),
    StructField("Weapon Used Cd", StringType()),
    StructField("Weapon Desc", StringType()),
    StructField("Status", StringType()),
    StructField("Status Desc", StringType()),
    StructField("Crm Cd 1", StringType()),
    StructField("Crm Cd 2", StringType()),
    StructField("Crm Cd 3", StringType()),
    StructField("Crm Cd 4", StringType()),
    StructField("LOCATION", StringType()),
    StructField("Cross Street", StringType()),
    StructField("LAT", StringType()),
    StructField("LON", StringType())
])


police_schema = StructType([
    StructField("X", DoubleType()),          
    StructField("Y", DoubleType()),          
    StructField("FID", IntegerType()),
    StructField("DIVISION", StringType()),
    StructField("LOCATION", StringType()),
    StructField("PREC", IntegerType())
])


start = time.time()

# ---------------------------------------------------------------------
# Load crime data from both CSV files and union
# ---------------------------------------------------------------------
crime_df1 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2010_2019.csv",
    header=True,
    schema=crime_schema
)
crime_df2 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv",
    header=True,
    schema=crime_schema
)

crimes_df = crime_df1.union(crime_df2)

# ---------------------------------------------------------------------
# Load police station dataset
# ---------------------------------------------------------------------
stations_df = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Police_Stations.csv", header=True, schema=police_schema)


# ---------------------------------------------------------------------
# Create geospatial POINT objects for crimes and stations
# ---------------------------------------------------------------------
crimes_df = crimes_df \
    .withColumn("crime_point", ST_Point(col("LON"), col("LAT")))

stations_df = stations_df \
    .withColumn("station_point", ST_Point(col("X"), col("Y")))

# ---------------------------------------------------------------------
# Cross join crimes ↔ stations
# (Cartesian product to later pick the nearest station per crime)
# ---------------------------------------------------------------------
joined_df = crimes_df.crossJoin(stations_df)

joined_df.explain(True)

# ---------------------------------------------------------------------
# Compute spherical distance (in meters) between crime and station
# ---------------------------------------------------------------------
joined_df = joined_df.withColumn(
    "distance_m",
    ST_DistanceSphere(col("crime_point"), col("station_point"))
)

# ---------------------------------------------------------------------
# For each crime, keep only the nearest station (min distance)
# ---------------------------------------------------------------------
window = Window.partitionBy("DR_NO").orderBy(asc("distance_m"))

closest_df = joined_df.withColumn(
    "rn",
    row_number().over(window)
).filter(col("rn") == 1).drop("rn")


# ---------------------------------------------------------------------
# Aggregate per division: average distance + crime count
# ---------------------------------------------------------------------
result_df = closest_df.groupBy("DIVISION").agg(
    round(avg("distance_m"),2).alias("avg_distance_m"),
    count("*").alias("crime_count")
).orderBy(col("crime_count").desc())


# ---------------------------------------------------------------------
# Display result (columns: DIVISION, avg_distance_m, crime_count)
# ---------------------------------------------------------------------
result_df = result_df.select(
    "DIVISION",
    "avg_distance_m",
    "crime_count"   # crime count as the last column
)

result_df.show()

# Stop the timer only after show(): transformations are lazy, so
# no Spark job runs until an action such as show() is triggered
end = time.time()
print(f"\nOverall time: {end - start:.2f} sec")

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
304,application_1761923966900_0319,pyspark,idle,Link,Link,,✔


SparkSession available as 'spark'.

== Parsed Logical Plan ==
Join Cross
:- Project [DR_NO#24, Date Rptd#25, DATE OCC#26, TIME OCC#27, AREA#28, AREA NAME#29, Rpt Dist No#30, Part 1-2#31, Crm Cd#32, Crm Cd Desc#33, Mocodes#34, Vict Age#35, Vict Sex#36, Vict Descent#37, Premis Cd#38, Premis Desc#39, Weapon Used Cd#40, Weapon Desc#41, Status#42, Status Desc#43, Crm Cd 1#44, Crm Cd 2#45, Crm Cd 3#46, Crm Cd 4#47, ... 5 more fields]
:  +- Union false, false
:     :- Relation [DR_NO#24,Date Rptd#25,DATE OCC#26,TIME OCC#27,AREA#28,AREA NAME#29,Rpt Dist No#30,Part 1-2#31,Crm Cd#32,Crm Cd Desc#33,Mocodes#34,Vict Age#35,Vict Sex#36,Vict Descent#37,Premis Cd#38,Premis Desc#39,Weapon Used Cd#40,Weapon Desc#41,Status#42,Status Desc#43,Crm Cd 1#44,Crm Cd 2#45,Crm Cd 3#46,Crm Cd 4#47,... 4 more fields] csv
:     +- Relation [DR_NO#80,Date Rptd#81,DATE OCC#82,TIME OCC#83,AREA#84,AREA NAME#85,Rpt Dist No#86,Part 1-2#87,Crm Cd#88,Crm Cd Desc#89,Mocodes#90,Vict Age#91,Vict Sex#92,Vict Descent#93,Premis Cd#94,Premis Desc#95,Weapon Used Cd#9

## 4.2 DataFrame + Sedona – 2 cores, 4 GB per executor

In [1]:
from sedona.spark import *                     # ST_Point, ST_DistanceSphere, ...
from sedona.register import SedonaRegistrator
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.sql.functions import round, col, asc, row_number, avg, count
from pyspark.sql.window import Window
import time

# ---------------------------------------------------------------------
# Spark session configuration (2 cores, 4 GB per executor)
# ---------------------------------------------------------------------
spark = SparkSession.builder \
    .appName("Crime Data - Query4 DataFrame API (2 cores, 4 GB)") \
    .config("spark.executor.instances", "2") \
    .config("spark.executor.cores", "2") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

SedonaRegistrator.registerAll(spark)

# Schemas
crime_schema = StructType([
    StructField("DR_NO", StringType()),
    StructField("Date Rptd", StringType()),
    StructField("DATE OCC", StringType()),
    StructField("TIME OCC", StringType()),
    StructField("AREA", StringType()),
    StructField("AREA NAME", StringType()),
    StructField("Rpt Dist No", StringType()),
    StructField("Part 1-2", StringType()),
    StructField("Crm Cd", StringType()),
    StructField("Crm Cd Desc", StringType()),
    StructField("Mocodes", StringType()),
    StructField("Vict Age", IntegerType()),
    StructField("Vict Sex", StringType()),
    StructField("Vict Descent", StringType()),
    StructField("Premis Cd", StringType()),
    StructField("Premis Desc", StringType()),
    StructField("Weapon Used Cd", StringType()),
    StructField("Weapon Desc", StringType()),
    StructField("Status", StringType()),
    StructField("Status Desc", StringType()),
    StructField("Crm Cd 1", StringType()),
    StructField("Crm Cd 2", StringType()),
    StructField("Crm Cd 3", StringType()),
    StructField("Crm Cd 4", StringType()),
    StructField("LOCATION", StringType()),
    StructField("Cross Street", StringType()),
    StructField("LAT", StringType()),
    StructField("LON", StringType())
])


police_schema = StructType([
    StructField("X", DoubleType()),          
    StructField("Y", DoubleType()),          
    StructField("FID", IntegerType()),
    StructField("DIVISION", StringType()),
    StructField("LOCATION", StringType()),
    StructField("PREC", IntegerType())
])


start = time.time()

# Load crime data and union
crime_df1 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2010_2019.csv",
    header=True,
    schema=crime_schema
)
crime_df2 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv",
    header=True,
    schema=crime_schema
)

crimes_df = crime_df1.union(crime_df2)

# Load stations data
stations_df = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Police_Stations.csv", header=True, schema=police_schema)


# Create POINT geometries
crimes_df = crimes_df \
    .withColumn("crime_point", ST_Point(col("LON"), col("LAT")))

stations_df = stations_df \
    .withColumn("station_point", ST_Point(col("X"), col("Y")))

# Cross join crimes ↔ stations
joined_df = crimes_df.crossJoin(stations_df)

joined_df.explain(True)

# Distance computation
joined_df = joined_df.withColumn(
    "distance_m",
    ST_DistanceSphere(col("crime_point"), col("station_point"))
)

# Nearest station per crime
window = Window.partitionBy("DR_NO").orderBy(asc("distance_m"))

closest_df = joined_df.withColumn(
    "rn",
    row_number().over(window)
).filter(col("rn") == 1).drop("rn")


# Aggregation per division
result_df = closest_df.groupBy("DIVISION").agg(
    round(avg("distance_m"),2).alias("avg_distance_m"),
    count("*").alias("crime_count")
).orderBy(col("crime_count").desc())


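# Reorder and show
result_df = result_df.select(
    "DIVISION",
    "avg_distance_m",
    "crime_count"   # crime count as the last column
)

result_df.show()

# Stop the timer only after show(): transformations are lazy, so
# no Spark job runs until an action such as show() is triggered
end = time.time()
print(f"\nOverall time: {end - start:.2f} sec")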

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
303,application_1761923966900_0318,pyspark,idle,Link,Link,,✔


SparkSession available as 'spark'.

== Parsed Logical Plan ==
Join Cross
:- Project [DR_NO#24, Date Rptd#25, DATE OCC#26, TIME OCC#27, AREA#28, AREA NAME#29, Rpt Dist No#30, Part 1-2#31, Crm Cd#32, Crm Cd Desc#33, Mocodes#34, Vict Age#35, Vict Sex#36, Vict Descent#37, Premis Cd#38, Premis Desc#39, Weapon Used Cd#40, Weapon Desc#41, Status#42, Status Desc#43, Crm Cd 1#44, Crm Cd 2#45, Crm Cd 3#46, Crm Cd 4#47, ... 5 more fields]
:  +- Union false, false
:     :- Relation [DR_NO#24,Date Rptd#25,DATE OCC#26,TIME OCC#27,AREA#28,AREA NAME#29,Rpt Dist No#30,Part 1-2#31,Crm Cd#32,Crm Cd Desc#33,Mocodes#34,Vict Age#35,Vict Sex#36,Vict Descent#37,Premis Cd#38,Premis Desc#39,Weapon Used Cd#40,Weapon Desc#41,Status#42,Status Desc#43,Crm Cd 1#44,Crm Cd 2#45,Crm Cd 3#46,Crm Cd 4#47,... 4 more fields] csv
:     +- Relation [DR_NO#80,Date Rptd#81,DATE OCC#82,TIME OCC#83,AREA#84,AREA NAME#85,Rpt Dist No#86,Part 1-2#87,Crm Cd#88,Crm Cd Desc#89,Mocodes#90,Vict Age#91,Vict Sex#92,Vict Descent#93,Premis Cd#94,Premis Desc#95,Weapon Used Cd#9

## 4.3 DataFrame + Sedona – 4 cores, 8 GB per executor

In [1]:
from sedona.spark import *                     # ST_Point, ST_DistanceSphere, ...
from sedona.register import SedonaRegistrator
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.sql.functions import round, col, asc, row_number, avg, count
from pyspark.sql.window import Window
import time

# ---------------------------------------------------------------------
# Spark session configuration (4 cores, 8 GB per executor)
# ---------------------------------------------------------------------
spark = SparkSession.builder \
    .appName("Crime Data - Query4 DataFrame API (4 cores, 8 GB)") \
    .config("spark.executor.instances", "2") \
    .config("spark.executor.cores", "4") \
    .config("spark.executor.memory", "8g") \
    .getOrCreate()

SedonaRegistrator.registerAll(spark)

# Schemas
crime_schema = StructType([
    StructField("DR_NO", StringType()),
    StructField("Date Rptd", StringType()),
    StructField("DATE OCC", StringType()),
    StructField("TIME OCC", StringType()),
    StructField("AREA", StringType()),
    StructField("AREA NAME", StringType()),
    StructField("Rpt Dist No", StringType()),
    StructField("Part 1-2", StringType()),
    StructField("Crm Cd", StringType()),
    StructField("Crm Cd Desc", StringType()),
    StructField("Mocodes", StringType()),
    StructField("Vict Age", IntegerType()),
    StructField("Vict Sex", StringType()),
    StructField("Vict Descent", StringType()),
    StructField("Premis Cd", StringType()),
    StructField("Premis Desc", StringType()),
    StructField("Weapon Used Cd", StringType()),
    StructField("Weapon Desc", StringType()),
    StructField("Status", StringType()),
    StructField("Status Desc", StringType()),
    StructField("Crm Cd 1", StringType()),
    StructField("Crm Cd 2", StringType()),
    StructField("Crm Cd 3", StringType()),
    StructField("Crm Cd 4", StringType()),
    StructField("LOCATION", StringType()),
    StructField("Cross Street", StringType()),
    StructField("LAT", StringType()),
    StructField("LON", StringType())
])


police_schema = StructType([
    StructField("X", DoubleType()),          
    StructField("Y", DoubleType()),          
    StructField("FID", IntegerType()),
    StructField("DIVISION", StringType()),
    StructField("LOCATION", StringType()),
    StructField("PREC", IntegerType())
])


start = time.time()

# Load crime data and union
crime_df1 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2010_2019.csv",
    header=True,
    schema=crime_schema
)
crime_df2 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv",
    header=True,
    schema=crime_schema
)

crimes_df = crime_df1.union(crime_df2)

# Load stations
stations_df = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Police_Stations.csv", header=True, schema=police_schema)

# Create POINT geometries
crimes_df = crimes_df \
    .withColumn("crime_point", ST_Point(col("LON"), col("LAT")))

stations_df = stations_df \
    .withColumn("station_point", ST_Point(col("X"), col("Y")))

# Cross join crimes ↔ stations
joined_df = crimes_df.crossJoin(stations_df)

joined_df.explain(True)

# Distance computation
joined_df = joined_df.withColumn(
    "distance_m",
    ST_DistanceSphere(col("crime_point"), col("station_point"))
)

# Nearest station per crime
window = Window.partitionBy("DR_NO").orderBy(asc("distance_m"))

closest_df = joined_df.withColumn(
    "rn",
    row_number().over(window)
).filter(col("rn") == 1).drop("rn")

# Aggregation per division
result_df = closest_df.groupBy("DIVISION").agg(
    round(avg("distance_m"),2).alias("avg_distance_m"),
    count("*").alias("crime_count")
).orderBy(col("crime_count").desc())


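# Reorder and show
result_df = result_df.select(
    "DIVISION",
    "avg_distance_m",
    "crime_count"   # crime count as the last column
)

result_df.show()

# Stop the timer only after show(): transformations are lazy, so
# no Spark job runs until an action such as show() is triggered
end = time.time()
print(f"\nOverall time: {end - start:.2f} sec")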

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
305,application_1761923966900_0320,pyspark,idle,Link,Link,,✔


SparkSession available as 'spark'.

== Parsed Logical Plan ==
Join Cross
:- Project [DR_NO#24, Date Rptd#25, DATE OCC#26, TIME OCC#27, AREA#28, AREA NAME#29, Rpt Dist No#30, Part 1-2#31, Crm Cd#32, Crm Cd Desc#33, Mocodes#34, Vict Age#35, Vict Sex#36, Vict Descent#37, Premis Cd#38, Premis Desc#39, Weapon Used Cd#40, Weapon Desc#41, Status#42, Status Desc#43, Crm Cd 1#44, Crm Cd 2#45, Crm Cd 3#46, Crm Cd 4#47, ... 5 more fields]
:  +- Union false, false
:     :- Relation [DR_NO#24,Date Rptd#25,DATE OCC#26,TIME OCC#27,AREA#28,AREA NAME#29,Rpt Dist No#30,Part 1-2#31,Crm Cd#32,Crm Cd Desc#33,Mocodes#34,Vict Age#35,Vict Sex#36,Vict Descent#37,Premis Cd#38,Premis Desc#39,Weapon Used Cd#40,Weapon Desc#41,Status#42,Status Desc#43,Crm Cd 1#44,Crm Cd 2#45,Crm Cd 3#46,Crm Cd 4#47,... 4 more fields] csv
:     +- Relation [DR_NO#80,Date Rptd#81,DATE OCC#82,TIME OCC#83,AREA#84,AREA NAME#85,Rpt Dist No#86,Part 1-2#87,Crm Cd#88,Crm Cd Desc#89,Mocodes#90,Vict Age#91,Vict Sex#92,Vict Descent#93,Premis Cd#94,Premis Desc#95,Weapon Used Cd#9

# Query 5 – Income vs crime per capita across LA communities (COMM)

Goal:

Using LA census blocks, income data, and crime data:

1. Compute per-capita income per community (COMM) based on:

 - Census block population (POP20)

 - Housing units (HOUSING20)

 - Median income per ZIP (MedianIncome)

2. Spatially join crimes (2020–2021) with census blocks and:

 - Count crimes per COMM

 - Compute crime per capita per year per community

3. Study the correlation between:

 - Per-capita income

 - Crime-per-capita-per-year

4. Repeat under three cluster configurations and analyze separately:

 - All communities

 - Top-10 richest communities

 - Bottom-10 poorest communities
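You can check the per-community metrics and the correlation step by hand on toy numbers. The cells below compute:

- PerCapitaIncome per COMM as Σ(HOUSING20 × MedianIncome) / Σ POP20.
- CrimePerCapitaPerYear as (crimes in 2020–2021 ÷ 2) / Σ POP20.

`DataFrame.stat.corr` then returns the Pearson coefficient between the two. The sketch below is a hypothetical plain-Python restatement of those formulas. The `community_metrics` and `pearson` helpers and all sample values are invented, and skipping communities without income data or with zero population only approximates the notebook's NULL filtering.

```python
import math


def community_metrics(blocks, crimes_2yrs, years=2.0):
    """blocks: (comm, pop20, housing20, median_income) tuples; crimes_2yrs: {comm: count}.

    Returns {comm: (per_capita_income, crime_per_capita_per_year)}.
    """
    income, population = {}, {}
    for comm, pop20, housing20, median_income in blocks:
        population[comm] = population.get(comm, 0) + pop20
        if median_income is not None:  # like F.sum, ignore blocks whose income is unknown
            income[comm] = income.get(comm, 0.0) + housing20 * median_income

    metrics = {}
    for comm, pop in population.items():
        if pop == 0 or comm not in income:
            continue  # these rows end up NULL in Spark and are filtered out before corr
        crime_per_year = crimes_2yrs.get(comm, 0) / years  # coalesce(CrimePerYear, 0)
        metrics[comm] = (income[comm] / pop, crime_per_year / pop)
    return metrics


def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)


blocks = [
    ("A", 100, 40, 50000.0),
    ("A", 100, 10, 50000.0),
    ("B", 50, 20, 20000.0),
    ("C", 80, 30, None),       # ZIP code without income data
    ("D", 100, 50, 30000.0),
]
crimes_2yrs = {"A": 40, "B": 30, "C": 12, "D": 10}

metrics = community_metrics(blocks, crimes_2yrs)
xs = [v[0] for v in metrics.values()]
ys = [v[1] for v in metrics.values()]
print(metrics)
print("Correlation:", pearson(xs, ys))
```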

## 5.1 All communities – Config 1 (2 executors × 4 cores, 8 GB)

In [1]:
import pyspark.sql.functions as F
from sedona.sql import ST_Point, ST_GeomFromGeoJSON, ST_Contains
from sedona.register import SedonaRegistrator
from sedona.utils import SedonaKryoRegistrator, KryoSerializer
from sedona.spark import *
from pyspark.sql import SparkSession
import time  

# ---------------------------------------------------------------------
# Spark & Sedona configuration (Config 1)
# ---------------------------------------------------------------------
spark = (
    SparkSession.builder
    .appName("Query5 - Config1 (2x4, 8GB)")
    .config("spark.executor.instances", "2")
    .config("spark.executor.cores", "4")
    .config("spark.executor.memory", "8g")
    .config("spark.serializer", KryoSerializer.getName)
    .config("spark.kryo.registrator", SedonaKryoRegistrator.getName)
    .getOrCreate()
)

SedonaRegistrator.registerAll(spark)

# Start timing AFTER Spark is ready
t_query_start = time.time()  

# ====================== INCOME / BLOCKS ======================

# 1. Read the GeoJSON FeatureCollection as JSON
raw_blocks_df = (
    spark.read
    .option("multiline", "true")
    .json("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Census_Blocks_2020.geojson")
)

# 2. Explode "features" → one row per feature
features_df = raw_blocks_df.select(F.explode("features").alias("feat"))

# 3. Select needed properties + geometry, then convert geometry struct → GeoJSON string
#    and FIX malformed coordinate strings like "[-118.53,32.90]" → [-118.53,32.90]
blocks_df = (
    features_df
    .select(
        F.col("feat.properties.COMM").alias("COMM"),
        F.col("feat.properties.POP20").alias("POP20"),
        F.col("feat.properties.HOUSING20").alias("HOUSING20"),
        F.col("feat.properties.ZCTA20").alias("ZCTA20"),
        F.to_json(F.col("feat.geometry")).alias("geometry_json")
    )
    # fix the "coordinates" string issue WITHOUT UDFs
    .withColumn(
        "geometry_json",
        F.regexp_replace(
            F.regexp_replace(
                F.col("geometry_json"),
                '"\\[',   # turn "[-118,... into [-118,...
                '['
            ),
            '\\]"',      # turn ...]" into ...]
            ']'
        )
    )
    .withColumn(
        "block_geom",
        ST_GeomFromGeoJSON(F.col("geometry_json"))
    )
    .filter(F.col("block_geom").isNotNull())
)

# 4. Income CSV - median income per ZIP code
income_df = (
    spark.read.csv(
        "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_income_2021.csv",
        header=True,
        sep=";"
    )
    .select(
        F.col("Zip Code").alias("ZCTA20"),
        F.col("Estimated Median Income").alias("MedianIncome")
    )
)

# Clean "MedianIncome" from strings like "$75,123" → 75123.0
income_df = income_df.withColumn(
    "MedianIncome",
    F.regexp_replace("MedianIncome", "\\$", "")
)
income_df = income_df.withColumn(
    "MedianIncome",
    F.regexp_replace("MedianIncome", ",", "").cast("double")
)

# 5. Join blocks with income by ZCTA20
blocks_income_df = blocks_df.join(income_df, on="ZCTA20", how="left")

# ---- EXPLAIN for Join 1 (blocks ⨝ income) ----
print("\n=== EXPLAIN: Join blocks_df ⨝ income_df (Config 1) ===")
blocks_income_df.explain(True)

# 6. Block income = HOUSING20 * MedianIncome
blocks_income_df = blocks_income_df.withColumn(
    "BlockIncome",
    F.col("HOUSING20") * F.col("MedianIncome")
)

# 7. Aggregate per COMM
comm_income_df = blocks_income_df.groupBy("COMM").agg(
    F.sum("BlockIncome").alias("TotalIncome"),
    F.sum("POP20").alias("TotalPopulation")
)

# 8. Per capita income per COMM
comm_income_df = comm_income_df.withColumn(
    "PerCapitaIncome",
    F.col("TotalIncome") / F.col("TotalPopulation")
)

# ====================== CRIMES ======================

crime_df = (
    spark.read.csv(
        "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv",
        header=True,
        inferSchema=True
    )
    .select(
        "DR_NO", "LAT", "LON", "Date Rptd", "DATE OCC"
    )
)

# Parse DATE OCC as timestamp
crime_df = crime_df.withColumn(
    "OCC_TS",
    F.to_timestamp(F.col("DATE OCC"), "yyyy MMM dd hh:mm:ss a")
)

# Year of occurrence
crime_df = crime_df.withColumn("YEAR_OCC", F.year(F.col("OCC_TS")))

# Keep only crimes in 2020 & 2021
crime_df_filtered = crime_df.filter(F.col("YEAR_OCC").isin(2020, 2021))

# Filter out (0,0) coordinates
crime_df_filtered = crime_df_filtered.filter(
    ~((F.col("LAT") == 0) & (F.col("LON") == 0))
)

# Create crime point geometry (lon, lat)
crime_df_filtered = crime_df_filtered.withColumn(
    "crime_point",
    ST_Point(F.col("LON"), F.col("LAT"))
)

# ====================== SPATIAL JOIN & METRICS ======================

# Spatial join: assign each crime to the block whose geometry contains it
crime_blocks_df = crime_df_filtered.join(
    blocks_df,
    ST_Contains(F.col("block_geom"), F.col("crime_point")),
    how="left"
)

# ---- EXPLAIN for spatial Join 2 (crimes ⨝ blocks) ----
print("\n=== EXPLAIN: Spatial Join crime_df_filtered ⨝ blocks_df (Config 1) ===")
crime_blocks_df.explain(True)

# Crimes per COMM over 2 years
comm_crime_df = crime_blocks_df.groupBy("COMM").agg(
    F.count("*").alias("CrimeCount_2yrs")
)

# Annual crimes per COMM (average over 2020–2021)
comm_crime_df = comm_crime_df.withColumn(
    "CrimePerYear",
    F.col("CrimeCount_2yrs") / F.lit(2.0)
)

# Final table: income + crime metrics
comm_final_df = comm_income_df.join(comm_crime_df, on="COMM", how="left")

# ---- EXPLAIN for Join 3 (comm_income ⨝ comm_crime) ----
print("\n=== EXPLAIN: Join comm_income_df ⨝ comm_crime_df (Config 1) ===")
comm_final_df.explain(True)

# Replace NULL crime counts with 0 and compute crime per capita per year
comm_final_df = comm_final_df.withColumn(
    "CrimePerYear",
    F.coalesce(F.col("CrimePerYear"), F.lit(0.0))
).withColumn(
    "CrimePerCapitaPerYear",
    F.col("CrimePerYear") / F.col("TotalPopulation")
)

# Show top 20 COMM by income, with crime per capita
comm_final_df.select("COMM", "PerCapitaIncome", "CrimePerCapitaPerYear") \
    .orderBy(F.col("PerCapitaIncome").desc()) \
    .show(20, truncate=False)

# ===== Correlation (all communities) =====

comm_final_df_clean = comm_final_df.filter(
    F.col("PerCapitaIncome").isNotNull() &
    F.col("CrimePerCapitaPerYear").isNotNull()
)

corr_all = comm_final_df_clean.stat.corr("PerCapitaIncome", "CrimePerCapitaPerYear")
print("Correlation (all COMM):", corr_all)

# End timing AFTER last action
t_query_end = time.time()
print(f"[Config 1] Total Query 5 runtime: {t_query_end - t_query_start:.2f} seconds")


Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
164,application_1765289937462_0161,pyspark,idle,Link,Link,,✔


SparkSession available as 'spark'.


=== EXPLAIN: Join blocks_df ⨝ income_df (Config 1) ===
== Parsed Logical Plan ==
'Join UsingJoin(LeftOuter, [ZCTA20])
:- Filter isnotnull(block_geom#57)
:  +- Project [COMM#36, POP20#37L, HOUSING20#38L, ZCTA20#39, geometry_json#51, st_geomfromgeojson(geometry_json#51) AS block_geom#57]
:     +- Project [COMM#36, POP20#37L, HOUSING20#38L, ZCTA20#39, regexp_replace(regexp_replace(geometry_json#40, "\[, [, 1), \]", ], 1) AS geometry_json#51]
:        +- Project [feat#33.properties.COMM AS COMM#36, feat#33.properties.POP20 AS POP20#37L, feat#33.properties.HOUSING20 AS HOUSING20#38L, feat#33.properties.ZCTA20 AS ZCTA20#39, to_json(feat#33.geometry, Some(UTC)) AS geometry_json#40]
:           +- Project [feat#33]
:              +- Generate explode(features#25), false, [feat#33]
:                 +- Relation [crs#24,features#25,name#26,type#27] json
+- Project [ZCTA20#88, cast(regexp_replace(MedianIncome#93, ,, , 1) as double) AS MedianIncome#96]
   +- Project [ZCTA20#88, regexp_replace(Medi
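You can try the two string repairs in this cell locally with Python's `re` module before running them on the cluster. This is a minimal sketch; `fix_geometry_json`, `parse_income` and the sample polygon are hypothetical. As the code comment in the cell says, it assumes that `to_json` renders `coordinates` as a quoted string. The two substitutions (`"[` → `[` and `]"` → `]`) turn that string back into a JSON array. For the income column, a single character class `[$,]` does the work of the two chained `regexp_replace` calls.

```python
import json
import re


def fix_geometry_json(geometry_json):
    # '"['  -> '['   (drop the quote opening the coordinate string)
    fixed = re.sub(r'"\[', "[", geometry_json)
    # ']"'  -> ']'   (drop the quote closing the coordinate string)
    return re.sub(r'\]"', "]", fixed)


def parse_income(raw):
    # Strip "$" and thousands separators, e.g. "$75,123" -> 75123.0
    cleaned = re.sub(r"[$,]", "", raw)
    try:
        return float(cleaned)
    except ValueError:
        return None  # cast("double") likewise yields NULL for unparseable strings


broken = '{"type":"Polygon","coordinates":"[[[-118.53,32.90],[-118.52,32.91],[-118.53,32.90]]]"}'
geometry = json.loads(fix_geometry_json(broken))
print(geometry["type"], geometry["coordinates"][0][0])
print(parse_income("$75,123"))
```

The geometry fix is only safe because the geometry JSON contains no other string values that begin with `[` or end with `]`.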

## 5.2 Top-10 richest communities – Config 1

In [1]:
import pyspark.sql.functions as F
from sedona.sql import ST_Point, ST_GeomFromGeoJSON, ST_Contains
from sedona.register import SedonaRegistrator
from sedona.utils import SedonaKryoRegistrator, KryoSerializer
from sedona.spark import *
from pyspark.sql import SparkSession
import time

# ---------------------------------------------------------------------
# Spark & Sedona configuration (Config 1 – Top 10)
# ---------------------------------------------------------------------
spark = (
    SparkSession.builder
    .appName("Query5 - Config1 (2x4, 8GB) - Top10")
    .config("spark.executor.instances", "2")
    .config("spark.executor.cores", "4")
    .config("spark.executor.memory", "8g")
    .config("spark.serializer", KryoSerializer.getName)
    .config("spark.kryo.registrator", SedonaKryoRegistrator.getName)
    .getOrCreate()
)

SedonaRegistrator.registerAll(spark)

t_query_start = time.time()

# ====================== INCOME / BLOCKS ======================

# 1. Read the GeoJSON FeatureCollection as JSON
raw_blocks_df = (
    spark.read
    .option("multiline", "true")
    .json("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Census_Blocks_2020.geojson")
)

# 2. Explode "features" → one row per feature
features_df = raw_blocks_df.select(F.explode("features").alias("feat"))

# 3. Select needed properties + geometry, then convert geometry struct → GeoJSON string
#    and FIX malformed coordinate strings like "[-118.53,32.90]" → [-118.53,32.90]
blocks_df = (
    features_df
    .select(
        F.col("feat.properties.COMM").alias("COMM"),
        F.col("feat.properties.POP20").alias("POP20"),
        F.col("feat.properties.HOUSING20").alias("HOUSING20"),
        F.col("feat.properties.ZCTA20").alias("ZCTA20"),
        F.to_json(F.col("feat.geometry")).alias("geometry_json")
    )
    # fix the "coordinates" string issue WITHOUT UDFs
    .withColumn(
        "geometry_json",
        F.regexp_replace(
            F.regexp_replace(
                F.col("geometry_json"),
                '"\\[',   # turn "[-118,... into [-118,...
                '['
            ),
            '\\]"',      # turn ...]" into ...]
            ']'
        )
    )
    .withColumn(
        "block_geom",
        ST_GeomFromGeoJSON(F.col("geometry_json"))
    )
    .filter(F.col("block_geom").isNotNull())
)

# 4. Income CSV - median income per ZIP code
income_df = (
    spark.read.csv(
        "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_income_2021.csv",
        header=True,
        sep=";"
    )
    .select(
        F.col("Zip Code").alias("ZCTA20"),
        F.col("Estimated Median Income").alias("MedianIncome")
    )
)

# Clean MedianIncome to numeric
income_df = income_df.withColumn(
    "MedianIncome",
    F.regexp_replace("MedianIncome", "\\$", "")
)
income_df = income_df.withColumn(
    "MedianIncome",
    F.regexp_replace("MedianIncome", ",", "").cast("double")
)

# 5. Join blocks with income by ZCTA20
blocks_income_df = blocks_df.join(income_df, on="ZCTA20", how="left")

# 6. Block income = HOUSING20 * MedianIncome
blocks_income_df = blocks_income_df.withColumn(
    "BlockIncome",
    F.col("HOUSING20") * F.col("MedianIncome")
)

# 7. Aggregate per COMM
comm_income_df = blocks_income_df.groupBy("COMM").agg(
    F.sum("BlockIncome").alias("TotalIncome"),
    F.sum("POP20").alias("TotalPopulation")
)

# 8. Per capita income per COMM
comm_income_df = comm_income_df.withColumn(
    "PerCapitaIncome",
    F.col("TotalIncome") / F.col("TotalPopulation")
)

# ====================== CRIMES ======================

crime_df = (
    spark.read.csv(
        "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv",
        header=True,
        inferSchema=True
    )
    .select(
        "DR_NO", "LAT", "LON", "Date Rptd", "DATE OCC"
    )
)

# Parse DATE OCC as timestamp
crime_df = crime_df.withColumn(
    "OCC_TS",
    F.to_timestamp(F.col("DATE OCC"), "yyyy MMM dd hh:mm:ss a")
)

# Year of occurrence
crime_df = crime_df.withColumn("YEAR_OCC", F.year(F.col("OCC_TS")))

# Keep only crimes in 2020 & 2021
crime_df_filtered = crime_df.filter(F.col("YEAR_OCC").isin(2020, 2021))

# Filter out (0,0) coordinates
crime_df_filtered = crime_df_filtered.filter(
    ~((F.col("LAT") == 0) & (F.col("LON") == 0))
)

# Create crime point geometry (lon, lat)
crime_df_filtered = crime_df_filtered.withColumn(
    "crime_point",
    ST_Point(F.col("LON"), F.col("LAT"))
)

# ====================== SPATIAL JOIN & METRICS ======================

# Spatial join: which block each crime falls into
crime_blocks_df = crime_df_filtered.join(
    blocks_df,
    ST_Contains(F.col("block_geom"), F.col("crime_point")),
    how="left"
)

# EXPLAIN for SPATIAL JOIN
print("\n=== EXPLAIN: Spatial Join Plan (Config 1 - Top 10) ===")
crime_blocks_df.explain(True)


# Crimes per COMM over 2 years
comm_crime_df = crime_blocks_df.groupBy("COMM").agg(
    F.count("*").alias("CrimeCount_2yrs")
)

# Annual crimes per COMM (average over 2020–2021)
comm_crime_df = comm_crime_df.withColumn(
    "CrimePerYear",
    F.col("CrimeCount_2yrs") / F.lit(2.0)
)

# Final table: income + crime metrics
comm_final_df = comm_income_df.join(comm_crime_df, on="COMM", how="left")

comm_final_df = comm_final_df.withColumn(
    "CrimePerYear",
    F.coalesce(F.col("CrimePerYear"), F.lit(0.0))
).withColumn(
    "CrimePerCapitaPerYear",
    F.col("CrimePerYear") / F.col("TotalPopulation")
)

# ===== Correlation base DF (non-null) =====
comm_final_df_clean = comm_final_df.filter(
    F.col("PerCapitaIncome").isNotNull() &
    F.col("CrimePerCapitaPerYear").isNotNull()
)

# === TOP 10 richest COMMs ===
top10 = (
    comm_final_df_clean
    .orderBy(F.col("PerCapitaIncome").desc())
    .limit(10)
    .select("COMM")
)

top10_list = [r["COMM"] for r in top10.collect()]

top10_df = comm_final_df_clean.filter(F.col("COMM").isin(top10_list))


# EXPLAIN for final TOP 10
print("\n=== EXPLAIN: Final Top 10 Plan (Config 1) ===")
top10_df.explain(True)

print("=== TOP 10 richest COMMs ===")
top10_df.select(
    "COMM", "PerCapitaIncome", "CrimePerCapitaPerYear"
).orderBy(F.col("PerCapitaIncome").desc()).show(truncate=False)

corr_top10 = top10_df.stat.corr("PerCapitaIncome", "CrimePerCapitaPerYear")
print("Correlation (Top 10 richest):", corr_top10)

t_query_end = time.time()
print(f"[Config 1 - Top10] Total Query 5 runtime: {t_query_end - t_query_start:.2f} seconds")


=== EXPLAIN: Spatial Join Plan (Config 1 - Top 10) ===
== Parsed Logical Plan ==
Join LeftOuter,  **org.apache.spark.sql.sedona_sql.expressions.ST_Contains**
:- Project [DR_NO#153, LAT#179, LON#180, Date Rptd#154, DATE OCC#155, OCC_TS#215, YEAR_OCC#222,  **org.apache.spark.sql.sedona_sql.expressions.ST_Point**   AS crime_point#230]
:  +- Filter NOT ((LAT#179 = cast(0 as double)) AND (LON#180 = cast(0 as double)))
:     +- Filter YEAR_OCC#222 IN (2020,2021)
:        +- Project [DR_NO#153, LAT#179, LON#180, Date Rptd#154, DATE OCC#155, OCC_TS#215, year(cast(OCC_TS#215 as date)) AS YEAR_OCC#222]
:           +- Project [DR_NO#153, LAT#179, LON#180, Date Rptd#154, DATE OCC#155, to_timestamp(DATE OCC#155, Some(yyyy MMM dd hh:mm:ss a), TimestampType, Some(UTC), false) AS OCC_TS#215]
:              +- Project [DR_NO#153, LAT#179, LON#180, Date Rptd#154, DATE OCC#155]
:                 +- Relation [DR_NO#153,Date Rptd#154,DATE OCC#155,TIME OCC#156,AREA#157,AREA NAME#158,Rpt Dist No#159,Part 1-
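According to the code comments, `F.to_json` on the `geometry` struct yields `coordinates` as a quoted string such as `"[-118.53,32.90]"`, which is not a valid GeoJSON coordinate array. The two nested `regexp_replace` calls delete the quote in front of `[` and the quote after `]`. The sketch below writes the same two substitutions with Python's `re` module and applies them to a hypothetical polygon string to show the effect.

```python
import json
import re


def fix_coordinates(geometry_json: str) -> str:
    """Apply the same two substitutions as the nested F.regexp_replace calls."""
    step1 = re.sub(r'"\[', "[", geometry_json)  # '"[' -> '['
    return re.sub(r'\]"', "]", step1)           # ']"' -> ']'


# Hypothetical to_json output in which "coordinates" arrived as a string
broken = ('{"type":"Polygon","coordinates":'
          '"[[[-118.5,34.0],[-118.4,34.0],[-118.4,34.1],[-118.5,34.0]]]"}')
fixed = fix_coordinates(broken)
geom = json.loads(fixed)  # would still be a str for "coordinates" without the fix
print(geom["coordinates"][0][0])
```

The patterns are not anchored to the `coordinates` key. They would also unquote any other string value that starts with `[` or ends with `]`. For a geometry object that holds only `type` and `coordinates`, this should not matter.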

### 5.3 Bottom-10 poorest communities – Config 1

In [1]:
import pyspark.sql.functions as F
from sedona.sql import ST_Point, ST_GeomFromGeoJSON, ST_Contains
from sedona.register import SedonaRegistrator
from sedona.utils import SedonaKryoRegistrator, KryoSerializer
from sedona.spark import *
from pyspark.sql import SparkSession
import time 

# ---------------------------------------------------------------------
# Spark & Sedona configuration (Config 1 – Bottom 10)
# ---------------------------------------------------------------------
spark = (
    SparkSession.builder
    .appName("Query5 - Config1 (2x4, 8GB) - Bottom10")
    .config("spark.executor.instances", "2")
    .config("spark.executor.cores", "4")
    .config("spark.executor.memory", "8g")
    .config("spark.serializer", KryoSerializer.getName)
    .config("spark.kryo.registrator", SedonaKryoRegistrator.getName)
    .getOrCreate()
)

SedonaRegistrator.registerAll(spark)

# ---- Start timing (after Spark is ready) ----
t_query_start = time.time()

# ====================== INCOME / BLOCKS ======================

# 1. Read the GeoJSON FeatureCollection as JSON
raw_blocks_df = (
    spark.read
    .option("multiline", "true")
    .json("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Census_Blocks_2020.geojson")
)

# 2. Explode "features" → one row per feature
features_df = raw_blocks_df.select(F.explode("features").alias("feat"))

# 3. Select needed properties + geometry, then convert geometry struct → GeoJSON string
#    and FIX malformed coordinate strings like "[-118.53,32.90]" → [-118.53,32.90]
blocks_df = (
    features_df
    .select(
        F.col("feat.properties.COMM").alias("COMM"),
        F.col("feat.properties.POP20").alias("POP20"),
        F.col("feat.properties.HOUSING20").alias("HOUSING20"),
        F.col("feat.properties.ZCTA20").alias("ZCTA20"),
        F.to_json(F.col("feat.geometry")).alias("geometry_json")
    )
    # fix the "coordinates" string issue WITHOUT UDFs
    .withColumn(
        "geometry_json",
        F.regexp_replace(
            F.regexp_replace(
                F.col("geometry_json"),
                '"\\[',   # turn "[-118,... into [-118,...
                '['
            ),
            '\\]"',      # turn ...]" into ...]
            ']'
        )
    )
    .withColumn(
        "block_geom",
        ST_GeomFromGeoJSON(F.col("geometry_json"))
    )
    .filter(F.col("block_geom").isNotNull())
)

# 4. Income CSV - median income per ZIP code
income_df = (
    spark.read.csv(
        "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_income_2021.csv",
        header=True,
        sep=";"
    )
    .select(
        F.col("Zip Code").alias("ZCTA20"),
        F.col("Estimated Median Income").alias("MedianIncome")
    )
)

# Clean MedianIncome to numeric
income_df = income_df.withColumn(
    "MedianIncome",
    F.regexp_replace("MedianIncome", "\\$", "")
)
income_df = income_df.withColumn(
    "MedianIncome",
    F.regexp_replace("MedianIncome", ",", "").cast("double")
)

# 5. Join blocks with income by ZCTA20
blocks_income_df = blocks_df.join(income_df, on="ZCTA20", how="left")

# 6. Block income = HOUSING20 * MedianIncome
blocks_income_df = blocks_income_df.withColumn(
    "BlockIncome",
    F.col("HOUSING20") * F.col("MedianIncome")
)

# 7. Aggregate per COMM
comm_income_df = blocks_income_df.groupBy("COMM").agg(
    F.sum("BlockIncome").alias("TotalIncome"),
    F.sum("POP20").alias("TotalPopulation")
)

# 8. Per capita income per COMM
comm_income_df = comm_income_df.withColumn(
    "PerCapitaIncome",
    F.col("TotalIncome") / F.col("TotalPopulation")
)

# ====================== CRIMES ======================

crime_df = (
    spark.read.csv(
        "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv",
        header=True,
        inferSchema=True
    )
    .select(
        "DR_NO", "LAT", "LON", "Date Rptd", "DATE OCC"
    )
)

# Parse DATE OCC as timestamp
crime_df = crime_df.withColumn(
    "OCC_TS",
    F.to_timestamp(F.col("DATE OCC"), "yyyy MMM dd hh:mm:ss a")
)

# Year of occurrence
crime_df = crime_df.withColumn("YEAR_OCC", F.year(F.col("OCC_TS")))

# Keep only crimes in 2020 & 2021
crime_df_filtered = crime_df.filter(F.col("YEAR_OCC").isin(2020, 2021))

# Filter out (0,0) coordinates
crime_df_filtered = crime_df_filtered.filter(
    ~((F.col("LAT") == 0) & (F.col("LON") == 0))
)

# Create crime point geometry (lon, lat)
crime_df_filtered = crime_df_filtered.withColumn(
    "crime_point",
    ST_Point(F.col("LON"), F.col("LAT"))
)

# ====================== SPATIAL JOIN & METRICS ======================

# Spatial join: which block each crime falls into
crime_blocks_df = crime_df_filtered.join(
    blocks_df,
    ST_Contains(F.col("block_geom"), F.col("crime_point")),
    how="left"
)

print("\n=== EXPLAIN: Spatial Join Plan (Config 1 - Bottom 10) ===")
crime_blocks_df.explain(True)

# Crimes per COMM over 2 years
comm_crime_df = crime_blocks_df.groupBy("COMM").agg(
    F.count("*").alias("CrimeCount_2yrs")
)

# Annual crimes per COMM (average over 2020–2021)
comm_crime_df = comm_crime_df.withColumn(
    "CrimePerYear",
    F.col("CrimeCount_2yrs") / F.lit(2.0)
)

# Final table: income + crime metrics
comm_final_df = comm_income_df.join(comm_crime_df, on="COMM", how="left")

comm_final_df = comm_final_df.withColumn(
    "CrimePerYear",
    F.coalesce(F.col("CrimePerYear"), F.lit(0.0))
).withColumn(
    "CrimePerCapitaPerYear",
    F.col("CrimePerYear") / F.col("TotalPopulation")
)

# ===== Correlation base DF (non-null) =====
comm_final_df_clean = comm_final_df.filter(
    F.col("PerCapitaIncome").isNotNull() &
    F.col("CrimePerCapitaPerYear").isNotNull()
)

# === BOTTOM 10 poorest COMMs ===
bottom10 = (
    comm_final_df_clean
    .orderBy(F.col("PerCapitaIncome").asc())
    .limit(10)
    .select("COMM")
)

bottom10_list = [r["COMM"] for r in bottom10.collect()]

bottom10_df = comm_final_df_clean.filter(F.col("COMM").isin(bottom10_list))

print("\n=== EXPLAIN: Final Bottom 10 Plan (Config 1) ===")
bottom10_df.explain(True)


print("=== BOTTOM 10 poorest COMMs ===")
bottom10_df.select(
    "COMM", "PerCapitaIncome", "CrimePerCapitaPerYear"
).orderBy(F.col("PerCapitaIncome").asc()).show(truncate=False)

corr_bottom10 = bottom10_df.stat.corr("PerCapitaIncome", "CrimePerCapitaPerYear")
print("Correlation (Bottom 10 poorest):", corr_bottom10)

# ---- End timing ----
t_query_end = time.time()
print(f"[Config 1 - Bottom10] Total Query 5 runtime: {t_query_end - t_query_start:.2f} seconds")

=== EXPLAIN: Spatial Join Plan (Config 1 - Bottom 10) ===
== Parsed Logical Plan ==
Join LeftOuter,  **org.apache.spark.sql.sedona_sql.expressions.ST_Contains**
:- Project [DR_NO#153, LAT#179, LON#180, Date Rptd#154, DATE OCC#155, OCC_TS#215, YEAR_OCC#222,  **org.apache.spark.sql.sedona_sql.expressions.ST_Point**   AS crime_point#230]
:  +- Filter NOT ((LAT#179 = cast(0 as double)) AND (LON#180 = cast(0 as double)))
:     +- Filter YEAR_OCC#222 IN (2020,2021)
:        +- Project [DR_NO#153, LAT#179, LON#180, Date Rptd#154, DATE OCC#155, OCC_TS#215, year(cast(OCC_TS#215 as date)) AS YEAR_OCC#222]
:           +- Project [DR_NO#153, LAT#179, LON#180, Date Rptd#154, DATE OCC#155, to_timestamp(DATE OCC#155, Some(yyyy MMM dd hh:mm:ss a), TimestampType, Some(UTC), false) AS OCC_TS#215]
:              +- Project [DR_NO#153, LAT#179, LON#180, Date Rptd#154, DATE OCC#155]
:                 +- Relation [DR_NO#153,Date Rptd#154,DATE OCC#155,TIME OCC#156,AREA#157,AREA NAME#158,Rpt Dist No#159,Part
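`DataFrame.stat.corr` computes the Pearson correlation coefficient, which PySpark documents as its only supported method. The metric it is applied to is `CrimePerCapitaPerYear = (CrimeCount_2yrs / 2) / TotalPopulation`. The plain-Python sketch below implements both and runs them on four hypothetical communities, so the computation is explicit. The Top-10 and Bottom-10 variants have only ten points, so a single outlying community can move the coefficient substantially.

```python
import math


def crime_per_capita_per_year(crime_count_2yrs, population, years=2.0):
    """(count / years) / population.

    A missing count is treated as 0, like coalesce(CrimePerYear, 0.0).
    A zero population returns None, like Spark's null on division by zero.
    """
    per_year = (crime_count_2yrs or 0) / years
    return per_year / population if population else None


def pearson(xs, ys):
    """Pearson correlation coefficient, the statistic behind DataFrame.stat.corr."""
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sx = math.sqrt(sum((x - mx) ** 2 for x in xs))
    sy = math.sqrt(sum((y - my) ** 2 for y in ys))
    return cov / (sx * sy)


# Hypothetical communities: (per-capita income, crimes in 2020-2021, population)
rows = [(20000, 400, 1000), (40000, 300, 1000), (60000, 200, 1000), (80000, 100, 1000)]
incomes = [r[0] for r in rows]
rates = [crime_per_capita_per_year(r[1], r[2]) for r in rows]
print(pearson(incomes, rates))
```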

### 5.4 All communities – Config 2 (4 executors × 2 cores, 4 GB)

In [1]:
import pyspark.sql.functions as F
from sedona.sql import ST_Point, ST_GeomFromGeoJSON, ST_Contains
from sedona.register import SedonaRegistrator
from sedona.utils import SedonaKryoRegistrator, KryoSerializer
from sedona.spark import *
from pyspark.sql import SparkSession
import time  

# ---------------------------------------------------------------------
# Spark & Sedona configuration (Config 2)
# ---------------------------------------------------------------------
spark = (
    SparkSession.builder
    .appName("Query5 - Config2 (4x2, 4GB) - All COMM")
    .config("spark.executor.instances", "4")
    .config("spark.executor.cores", "2")
    .config("spark.executor.memory", "4g")
    .config("spark.serializer", KryoSerializer.getName)
    .config("spark.kryo.registrator", SedonaKryoRegistrator.getName)
    .getOrCreate()
)

SedonaRegistrator.registerAll(spark)

# ---- Start timing AFTER Spark is ready ----
t_query_start = time.time()

# ====================== INCOME / BLOCKS ======================

# 1. Read the GeoJSON FeatureCollection as JSON
raw_blocks_df = (
    spark.read
    .option("multiline", "true")
    .json("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Census_Blocks_2020.geojson")
)

# 2. Explode "features" → one row per feature
features_df = raw_blocks_df.select(F.explode("features").alias("feat"))

# 3. Select needed properties + geometry, then convert geometry struct → GeoJSON string
#    and FIX malformed coordinate strings like "[-118.53,32.90]" → [-118.53,32.90]
blocks_df = (
    features_df
    .select(
        F.col("feat.properties.COMM").alias("COMM"),
        F.col("feat.properties.POP20").alias("POP20"),
        F.col("feat.properties.HOUSING20").alias("HOUSING20"),
        F.col("feat.properties.ZCTA20").alias("ZCTA20"),
        F.to_json(F.col("feat.geometry")).alias("geometry_json")
    )
    # fix the "coordinates" string issue WITHOUT UDFs
    .withColumn(
        "geometry_json",
        F.regexp_replace(
            F.regexp_replace(
                F.col("geometry_json"),
                '"\\[',   # turn "[-118,... into [-118,...
                '['
            ),
            '\\]"',      # turn ...]" into ...]
            ']'
        )
    )
    .withColumn(
        "block_geom",
        ST_GeomFromGeoJSON(F.col("geometry_json"))
    )
    .filter(F.col("block_geom").isNotNull())
)

# 4. Income CSV - median income per ZIP code
income_df = (
    spark.read.csv(
        "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_income_2021.csv",
        header=True,
        sep=";"
    )
    .select(
        F.col("Zip Code").alias("ZCTA20"),
        F.col("Estimated Median Income").alias("MedianIncome")
    )
)

# Clean MedianIncome to numeric
income_df = income_df.withColumn(
    "MedianIncome",
    F.regexp_replace("MedianIncome", "\\$", "")
)
income_df = income_df.withColumn(
    "MedianIncome",
    F.regexp_replace("MedianIncome", ",", "").cast("double")
)

# 5. Join blocks with income by ZCTA20
blocks_income_df = blocks_df.join(income_df, on="ZCTA20", how="left")

# 6. Block income = HOUSING20 * MedianIncome
blocks_income_df = blocks_income_df.withColumn(
    "BlockIncome",
    F.col("HOUSING20") * F.col("MedianIncome")
)

# 7. Aggregate per COMM
comm_income_df = blocks_income_df.groupBy("COMM").agg(
    F.sum("BlockIncome").alias("TotalIncome"),
    F.sum("POP20").alias("TotalPopulation")
)

# 8. Per capita income per COMM
comm_income_df = comm_income_df.withColumn(
    "PerCapitaIncome",
    F.col("TotalIncome") / F.col("TotalPopulation")
)

# ====================== CRIMES ======================

crime_df = (
    spark.read.csv(
        "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv",
        header=True,
        inferSchema=True
    )
    .select(
        "DR_NO", "LAT", "LON", "Date Rptd", "DATE OCC"
    )
)

# Parse DATE OCC as timestamp
crime_df = crime_df.withColumn(
    "OCC_TS",
    F.to_timestamp(F.col("DATE OCC"), "yyyy MMM dd hh:mm:ss a")
)

# Year of occurrence
crime_df = crime_df.withColumn("YEAR_OCC", F.year(F.col("OCC_TS")))

# Keep only crimes in 2020 & 2021
crime_df_filtered = crime_df.filter(F.col("YEAR_OCC").isin(2020, 2021))

# Filter out (0,0) coordinates
crime_df_filtered = crime_df_filtered.filter(
    ~((F.col("LAT") == 0) & (F.col("LON") == 0))
)

# Create crime point geometry (lon, lat)
crime_df_filtered = crime_df_filtered.withColumn(
    "crime_point",
    ST_Point(F.col("LON"), F.col("LAT"))
)

# ====================== SPATIAL JOIN & METRICS ======================

# Spatial join: which block each crime falls into
crime_blocks_df = crime_df_filtered.join(
    blocks_df,
    ST_Contains(F.col("block_geom"), F.col("crime_point")),
    how="left"
)

# Crimes per COMM over 2 years
comm_crime_df = crime_blocks_df.groupBy("COMM").agg(
    F.count("*").alias("CrimeCount_2yrs")
)

# Annual crimes per COMM (average over 2020–2021)
comm_crime_df = comm_crime_df.withColumn(
    "CrimePerYear",
    F.col("CrimeCount_2yrs") / F.lit(2.0)
)

# Final table: income + crime metrics
comm_final_df = comm_income_df.join(comm_crime_df, on="COMM", how="left")

comm_final_df = comm_final_df.withColumn(
    "CrimePerYear",
    F.coalesce(F.col("CrimePerYear"), F.lit(0.0))
).withColumn(
    "CrimePerCapitaPerYear",
    F.col("CrimePerYear") / F.col("TotalPopulation")
)

# ====================== EXPLAIN PLANS (for the report) ======================

print("\n=== EXPLAIN plan for spatial join crime_blocks_df (Config 2) ===")
crime_blocks_df.explain(mode="extended")

print("\n=== EXPLAIN plan for final join comm_final_df (Config 2) ===")
comm_final_df.explain(mode="extended")

# ====================== RESULTS & CORRELATION ======================

# Show top 20 COMM by income, with crime per capita
comm_final_df.select("COMM", "PerCapitaIncome", "CrimePerCapitaPerYear") \
    .orderBy(F.col("PerCapitaIncome").desc()) \
    .show(20, truncate=False)

# ===== Correlation =====

comm_final_df_clean = comm_final_df.filter(
    F.col("PerCapitaIncome").isNotNull() &
    F.col("CrimePerCapitaPerYear").isNotNull()
)

corr_all = comm_final_df_clean.stat.corr("PerCapitaIncome", "CrimePerCapitaPerYear")
print("Correlation (all COMM):", corr_all)

# ---- End timing ----
t_query_end = time.time()
print(f"[Config 2 - All COMM] Total Query 5 runtime: {t_query_end - t_query_start:.2f} seconds")

=== EXPLAIN plan for spatial join crime_blocks_df (Config 2) ===
== Parsed Logical Plan ==
Join LeftOuter,  **org.apache.spark.sql.sedona_sql.expressions.ST_Contains**
:- Project [DR_NO#153, LAT#179, LON#180, Date Rptd#154, DATE OCC#155, OCC_TS#215, YEAR_OCC#222,  **org.apache.spark.sql.sedona_sql.expressions.ST_Point**   AS crime_point#230]
:  +- Filter NOT ((LAT#179 = cast(0 as double)) AND (LON#180 = cast(0 as double)))
:     +- Filter YEAR_OCC#222 IN (2020,2021)
:        +- Project [DR_NO#153, LAT#179, LON#180, Date Rptd#154, DATE OCC#155, OCC_TS#215, year(cast(OCC_TS#215 as date)) AS YEAR_OCC#222]
:           +- Project [DR_NO#153, LAT#179, LON#180, Date Rptd#154, DATE OCC#155, to_timestamp(DATE OCC#155, Some(yyyy MMM dd hh:mm:ss a), TimestampType, Some(UTC), false) AS OCC_TS#215]
:              +- Project [DR_NO#153, LAT#179, LON#180, Date Rptd#154, DATE OCC#155]
:                 +- Relation [DR_NO#153,Date Rptd#154,DATE OCC#155,TIME OCC#156,AREA#157,AREA NAME#158,Rpt Dist No#1
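`ST_Point(F.col("LON"), F.col("LAT"))` builds each point as (x = longitude, y = latitude). That order has to match the axis order of the block polygons. If the two were swapped, essentially every LA crime would fall outside every block. Sedona evaluates the real `ST_Contains` predicate. The even-odd ray-casting test below is a simplified stand-in that illustrates the axis-order point on a hypothetical square block. Unlike a proper `contains` predicate, it makes no special provision for points lying exactly on the boundary.

```python
from typing import List, Tuple

Point = Tuple[float, float]  # (x, y) = (lon, lat)


def contains(polygon: List[Point], point: Point) -> bool:
    """Even-odd ray casting: count polygon edges crossed by a ray going in +x."""
    x, y = point
    inside = False
    n = len(polygon)
    for i in range(n):
        x1, y1 = polygon[i]
        x2, y2 = polygon[(i + 1) % n]
        if (y1 > y) != (y2 > y):  # edge straddles the horizontal line through the point
            x_cross = x1 + (y - y1) * (x2 - x1) / (y2 - y1)
            if x < x_cross:
                inside = not inside
    return inside


# Hypothetical block near downtown LA, vertices given as (lon, lat)
block = [(-118.26, 34.04), (-118.24, 34.04), (-118.24, 34.06), (-118.26, 34.06)]
lat, lon = 34.05, -118.25
print(contains(block, (lon, lat)))  # same order as ST_Point(LON, LAT)
print(contains(block, (lat, lon)))  # axis order swapped
```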

### 5.5 Top-10 richest communities – Config 2

In [1]:
import pyspark.sql.functions as F
from sedona.sql import ST_Point, ST_GeomFromGeoJSON, ST_Contains
from sedona.register import SedonaRegistrator
from sedona.utils import SedonaKryoRegistrator, KryoSerializer
from sedona.spark import *
from pyspark.sql import SparkSession
import time

# ---------------- Spark & Sedona ----------------

# Spark session with configuration 2
spark = (
    SparkSession.builder
    .appName("Query5 - Config2 (4x2, 4GB) - Top10 Richest")
    .config("spark.executor.instances", "4")
    .config("spark.executor.cores", "2")
    .config("spark.executor.memory", "4g")
    .config("spark.serializer", KryoSerializer.getName)
    .config("spark.kryo.registrator", SedonaKryoRegistrator.getName)
    .getOrCreate()
)

SedonaRegistrator.registerAll(spark)

# Start timing AFTER Spark is ready
t_query_start = time.time()

# ====================== INCOME / BLOCKS ======================

# 1. Read the GeoJSON FeatureCollection as JSON
raw_blocks_df = (
    spark.read
    .option("multiline", "true")
    .json("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Census_Blocks_2020.geojson")
)

# 2. Explode "features" → one row per feature
features_df = raw_blocks_df.select(F.explode("features").alias("feat"))

# 3. Select needed properties + geometry, then convert geometry struct → GeoJSON string
#    and FIX malformed coordinate strings like "[-118.53,32.90]" → [-118.53,32.90]
blocks_df = (
    features_df
    .select(
        F.col("feat.properties.COMM").alias("COMM"),
        F.col("feat.properties.POP20").alias("POP20"),
        F.col("feat.properties.HOUSING20").alias("HOUSING20"),
        F.col("feat.properties.ZCTA20").alias("ZCTA20"),
        F.to_json(F.col("feat.geometry")).alias("geometry_json")
    )
    # fix the "coordinates" string issue WITHOUT UDFs
    .withColumn(
        "geometry_json",
        F.regexp_replace(
            F.regexp_replace(
                F.col("geometry_json"),
                '"\\[',   # turn "[-118,... into [-118,...
                '['
            ),
            '\\]"',      # turn ...]" into ...]
            ']'
        )
    )
    .withColumn(
        "block_geom",
        ST_GeomFromGeoJSON(F.col("geometry_json"))
    )
    .filter(F.col("block_geom").isNotNull())
)

# 4. Income CSV - median income per ZIP code
income_df = (
    spark.read.csv(
        "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_income_2021.csv",
        header=True,
        sep=";"
    )
    .select(
        F.col("Zip Code").alias("ZCTA20"),
        F.col("Estimated Median Income").alias("MedianIncome")
    )
)

# Clean MedianIncome to numeric
income_df = income_df.withColumn(
    "MedianIncome",
    F.regexp_replace("MedianIncome", "\\$", "")
)
income_df = income_df.withColumn(
    "MedianIncome",
    F.regexp_replace("MedianIncome", ",", "").cast("double")
)

# 5. Join blocks with income by ZCTA20
blocks_income_df = blocks_df.join(income_df, on="ZCTA20", how="left")

# 6. Block income = HOUSING20 * MedianIncome
blocks_income_df = blocks_income_df.withColumn(
    "BlockIncome",
    F.col("HOUSING20") * F.col("MedianIncome")
)

# 7. Aggregate per COMM
comm_income_df = blocks_income_df.groupBy("COMM").agg(
    F.sum("BlockIncome").alias("TotalIncome"),
    F.sum("POP20").alias("TotalPopulation")
)

# 8. Per capita income per COMM
comm_income_df = comm_income_df.withColumn(
    "PerCapitaIncome",
    F.col("TotalIncome") / F.col("TotalPopulation")
)

# ====================== CRIMES ======================

crime_df = (
    spark.read.csv(
        "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv",
        header=True,
        inferSchema=True
    )
    .select(
        "DR_NO", "LAT", "LON", "Date Rptd", "DATE OCC"
    )
)

# Parse DATE OCC as timestamp
crime_df = crime_df.withColumn(
    "OCC_TS",
    F.to_timestamp(F.col("DATE OCC"), "yyyy MMM dd hh:mm:ss a")
)

# Year of occurrence
crime_df = crime_df.withColumn("YEAR_OCC", F.year(F.col("OCC_TS")))

# Keep only crimes in 2020 & 2021
crime_df_filtered = crime_df.filter(F.col("YEAR_OCC").isin(2020, 2021))

# Filter out (0,0) coordinates
crime_df_filtered = crime_df_filtered.filter(
    ~((F.col("LAT") == 0) & (F.col("LON") == 0))
)

# Create crime point geometry (lon, lat)
crime_df_filtered = crime_df_filtered.withColumn(
    "crime_point",
    ST_Point(F.col("LON"), F.col("LAT"))
)

# ====================== SPATIAL JOIN & METRICS ======================

# Spatial join: which block each crime falls into
crime_blocks_df = crime_df_filtered.join(
    blocks_df,
    ST_Contains(F.col("block_geom"), F.col("crime_point")),
    how="left"
)


# EXPLAIN for SPATIAL JOIN
print("\n=== EXPLAIN: Spatial Join Plan (Config 2 - Top 10) ===")
crime_blocks_df.explain(True)

# Crimes per COMM over 2 years
comm_crime_df = crime_blocks_df.groupBy("COMM").agg(
    F.count("*").alias("CrimeCount_2yrs")
)

# Annual crimes per COMM (average over 2020–2021)
comm_crime_df = comm_crime_df.withColumn(
    "CrimePerYear",
    F.col("CrimeCount_2yrs") / F.lit(2.0)
)

# Final table: income + crime metrics
comm_final_df = comm_income_df.join(comm_crime_df, on="COMM", how="left")

comm_final_df = comm_final_df.withColumn(
    "CrimePerYear",
    F.coalesce(F.col("CrimePerYear"), F.lit(0.0))
).withColumn(
    "CrimePerCapitaPerYear",
    F.col("CrimePerYear") / F.col("TotalPopulation")
)

# ===== Correlation base DF (non-null) =====
comm_final_df_clean = comm_final_df.filter(
    F.col("PerCapitaIncome").isNotNull() &
    F.col("CrimePerCapitaPerYear").isNotNull()
)

# === TOP 10 richest COMMs ===
top10 = (
    comm_final_df_clean
    .orderBy(F.col("PerCapitaIncome").desc())
    .limit(10)
    .select("COMM")
)

top10_list = [r["COMM"] for r in top10.collect()]

top10_df = comm_final_df_clean.filter(F.col("COMM").isin(top10_list))

# EXPLAIN for final TOP 10
print("\n=== EXPLAIN: Final Top 10 Plan (Config 2) ===")
top10_df.explain(True)

print("=== TOP 10 richest COMMs ===")
top10_df.select(
    "COMM", "PerCapitaIncome", "CrimePerCapitaPerYear"
).orderBy(F.col("PerCapitaIncome").desc()).show(truncate=False)

corr_top10 = top10_df.stat.corr("PerCapitaIncome", "CrimePerCapitaPerYear")
print("Correlation (Top 10 richest):", corr_top10)

# ---- End timing ----
t_query_end = time.time()
print(f"[Config 2 - Top10] Total Query 5 runtime: {t_query_end - t_query_start:.2f} seconds")

=== EXPLAIN: Spatial Join Plan (Config 2 - Top 10) ===
== Parsed Logical Plan ==
Join LeftOuter,  **org.apache.spark.sql.sedona_sql.expressions.ST_Contains**
:- Project [DR_NO#153, LAT#179, LON#180, Date Rptd#154, DATE OCC#155, OCC_TS#215, YEAR_OCC#222,  **org.apache.spark.sql.sedona_sql.expressions.ST_Point**   AS crime_point#230]
:  +- Filter NOT ((LAT#179 = cast(0 as double)) AND (LON#180 = cast(0 as double)))
:     +- Filter YEAR_OCC#222 IN (2020,2021)
:        +- Project [DR_NO#153, LAT#179, LON#180, Date Rptd#154, DATE OCC#155, OCC_TS#215, year(cast(OCC_TS#215 as date)) AS YEAR_OCC#222]
:           +- Project [DR_NO#153, LAT#179, LON#180, Date Rptd#154, DATE OCC#155, to_timestamp(DATE OCC#155, Some(yyyy MMM dd hh:mm:ss a), TimestampType, Some(UTC), false) AS OCC_TS#215]
:              +- Project [DR_NO#153, LAT#179, LON#180, Date Rptd#154, DATE OCC#155]
:                 +- Relation [DR_NO#153,Date Rptd#154,DATE OCC#155,TIME OCC#156,AREA#157,AREA NAME#158,Rpt Dist No#159,Part 1-
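The crime side applies two filters. It keeps only incidents whose `DATE OCC` year is 2020 or 2021, and it drops rows with `(LAT, LON) = (0, 0)`, which the code treats as missing coordinates. The Spark pattern `yyyy MMM dd hh:mm:ss a` corresponds roughly to Python's `%Y %b %d %I:%M:%S %p`. The sketch below applies the same two filters to a few hypothetical rows. The sample date strings are assumed to follow that pattern with English month abbreviations.

```python
from datetime import datetime

# Python counterpart of the Spark pattern "yyyy MMM dd hh:mm:ss a" (English month names assumed)
DATE_OCC_FORMAT = "%Y %b %d %I:%M:%S %p"


def keep_crime(date_occ: str, lat: float, lon: float) -> bool:
    """True if the year is 2020 or 2021 and the location is not the (0, 0) placeholder."""
    try:
        year = datetime.strptime(date_occ, DATE_OCC_FORMAT).year
    except ValueError:
        # In Spark an unparseable date gives a null YEAR_OCC, and isin(...) drops the row
        return False
    return year in (2020, 2021) and not (lat == 0 and lon == 0)


rows = [
    ("2020 Mar 01 12:00:00 AM", 34.05, -118.25),  # kept
    ("2021 Dec 31 11:30:00 PM", 0.0, 0.0),        # (0, 0) placeholder
    ("2022 Jan 15 08:00:00 AM", 34.10, -118.30),  # outside 2020-2021
    ("not a date", 34.00, -118.20),               # unparseable
]
print([keep_crime(*r) for r in rows])
```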

### 5.6 Bottom-10 poorest communities – Config 2

In [1]:
import pyspark.sql.functions as F
from sedona.sql import ST_Point, ST_GeomFromGeoJSON, ST_Contains
from sedona.register import SedonaRegistrator
from sedona.utils import SedonaKryoRegistrator, KryoSerializer
from sedona.spark import *
from pyspark.sql import SparkSession
import time   

# ---------------- Spark & Sedona ----------------

# Spark session with configuration 2
spark = (
    SparkSession.builder
    .appName("Query5 - Config2 (4x2, 4GB) - Bottom10 Poorest")
    .config("spark.executor.instances", "4")
    .config("spark.executor.cores", "2")
    .config("spark.executor.memory", "4g")
    .config("spark.serializer", KryoSerializer.getName)
    .config("spark.kryo.registrator", SedonaKryoRegistrator.getName)
    .getOrCreate()
)

SedonaRegistrator.registerAll(spark)

# ---- Start timing (after Spark is ready) ----
t_query_start = time.time()

# ====================== INCOME / BLOCKS ======================

# 1. Read the GeoJSON FeatureCollection as JSON
raw_blocks_df = (
    spark.read
    .option("multiline", "true")
    .json("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Census_Blocks_2020.geojson")
)

# 2. Explode "features" → one row per feature
features_df = raw_blocks_df.select(F.explode("features").alias("feat"))

# 3. Select needed properties + geometry, then convert geometry struct → GeoJSON string
#    and FIX malformed coordinate strings like "[-118.53,32.90]" → [-118.53,32.90]
blocks_df = (
    features_df
    .select(
        F.col("feat.properties.COMM").alias("COMM"),
        F.col("feat.properties.POP20").alias("POP20"),
        F.col("feat.properties.HOUSING20").alias("HOUSING20"),
        F.col("feat.properties.ZCTA20").alias("ZCTA20"),
        F.to_json(F.col("feat.geometry")).alias("geometry_json")
    )
    # fix the "coordinates" string issue WITHOUT UDFs
    .withColumn(
        "geometry_json",
        F.regexp_replace(
            F.regexp_replace(
                F.col("geometry_json"),
                '"\\[',   # turn "[-118,... into [-118,...
                '['
            ),
            '\\]"',      # turn ...]" into ...]
            ']'
        )
    )
    .withColumn(
        "block_geom",
        ST_GeomFromGeoJSON(F.col("geometry_json"))
    )
    .filter(F.col("block_geom").isNotNull())
)

# 4. Income CSV - median income per ZIP code
income_df = (
    spark.read.csv(
        "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_income_2021.csv",
        header=True,
        sep=";"
    )
    .select(
        F.col("Zip Code").alias("ZCTA20"),
        F.col("Estimated Median Income").alias("MedianIncome")
    )
)

# Clean MedianIncome to numeric
income_df = income_df.withColumn(
    "MedianIncome",
    F.regexp_replace("MedianIncome", "\\$", "")
)
income_df = income_df.withColumn(
    "MedianIncome",
    F.regexp_replace("MedianIncome", ",", "").cast("double")
)

# 5. Join blocks with income by ZCTA20
blocks_income_df = blocks_df.join(income_df, on="ZCTA20", how="left")

# 6. Block income = HOUSING20 * MedianIncome
blocks_income_df = blocks_income_df.withColumn(
    "BlockIncome",
    F.col("HOUSING20") * F.col("MedianIncome")
)

# 7. Aggregate per COMM
comm_income_df = blocks_income_df.groupBy("COMM").agg(
    F.sum("BlockIncome").alias("TotalIncome"),
    F.sum("POP20").alias("TotalPopulation")
)

# 8. Per capita income per COMM
comm_income_df = comm_income_df.withColumn(
    "PerCapitaIncome",
    F.col("TotalIncome") / F.col("TotalPopulation")
)

# ====================== CRIMES ======================

crime_df = (
    spark.read.csv(
        "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv",
        header=True,
        inferSchema=True
    )
    .select(
        "DR_NO", "LAT", "LON", "Date Rptd", "DATE OCC"
    )
)

# Parse DATE OCC as timestamp
crime_df = crime_df.withColumn(
    "OCC_TS",
    F.to_timestamp(F.col("DATE OCC"), "yyyy MMM dd hh:mm:ss a")
)

# Year of occurrence
crime_df = crime_df.withColumn("YEAR_OCC", F.year(F.col("OCC_TS")))

# Keep only crimes in 2020 & 2021
crime_df_filtered = crime_df.filter(F.col("YEAR_OCC").isin(2020, 2021))

# Filter out (0,0) coordinates
crime_df_filtered = crime_df_filtered.filter(
    ~((F.col("LAT") == 0) & (F.col("LON") == 0))
)

# Create crime point geometry (lon, lat)
crime_df_filtered = crime_df_filtered.withColumn(
    "crime_point",
    ST_Point(F.col("LON"), F.col("LAT"))
)

# ====================== SPATIAL JOIN & METRICS ======================

# Spatial join: which block each crime falls into
crime_blocks_df = crime_df_filtered.join(
    blocks_df,
    ST_Contains(F.col("block_geom"), F.col("crime_point")),
    how="left"
)

print("\n=== EXPLAIN: Spatial Join Plan (Config 2 - Bottom 10) ===")
crime_blocks_df.explain(True)

# Crimes per COMM over 2 years
comm_crime_df = crime_blocks_df.groupBy("COMM").agg(
    F.count("*").alias("CrimeCount_2yrs")
)

# Annual crimes per COMM (average over 2020–2021)
comm_crime_df = comm_crime_df.withColumn(
    "CrimePerYear",
    F.col("CrimeCount_2yrs") / F.lit(2.0)
)

# Final table: income + crime metrics
comm_final_df = comm_income_df.join(comm_crime_df, on="COMM", how="left")

comm_final_df = comm_final_df.withColumn(
    "CrimePerYear",
    F.coalesce(F.col("CrimePerYear"), F.lit(0.0))
).withColumn(
    "CrimePerCapitaPerYear",
    F.col("CrimePerYear") / F.col("TotalPopulation")
)

# ===== Correlation base DF (non-null) =====
comm_final_df_clean = comm_final_df.filter(
    F.col("PerCapitaIncome").isNotNull() &
    F.col("CrimePerCapitaPerYear").isNotNull()
)

# === BOTTOM 10 poorest COMMs ===
bottom10 = (
    comm_final_df_clean
    .orderBy(F.col("PerCapitaIncome").asc())
    .limit(10)
    .select("COMM")
)

bottom10_list = [r["COMM"] for r in bottom10.collect()]

bottom10_df = comm_final_df_clean.filter(F.col("COMM").isin(bottom10_list))


print("\n=== EXPLAIN: Final Bottom 10 Plan (Config 2) ===")
bottom10_df.explain(True)

print("=== BOTTOM 10 poorest COMMs ===")
bottom10_df.select(
    "COMM", "PerCapitaIncome", "CrimePerCapitaPerYear"
).orderBy(F.col("PerCapitaIncome").asc()).show(truncate=False)

corr_bottom10 = bottom10_df.stat.corr("PerCapitaIncome", "CrimePerCapitaPerYear")
print("Correlation (Bottom 10 poorest):", corr_bottom10)

# ---- End timing ----
t_query_end = time.time()
print(f"[Config 2 - Bottom10] Total Query 5 runtime: {t_query_end - t_query_start:.2f} seconds")

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
371,application_1765289937462_0368,pyspark,idle,Link,Link,,✔


SparkSession available as 'spark'.


=== EXPLAIN: Spatial Join Plan (Config 2 - Bottom 10) ===
== Parsed Logical Plan ==
Join LeftOuter,  **org.apache.spark.sql.sedona_sql.expressions.ST_Contains**
:- Project [DR_NO#153, LAT#179, LON#180, Date Rptd#154, DATE OCC#155, OCC_TS#215, YEAR_OCC#222,  **org.apache.spark.sql.sedona_sql.expressions.ST_Point**   AS crime_point#230]
:  +- Filter NOT ((LAT#179 = cast(0 as double)) AND (LON#180 = cast(0 as double)))
:     +- Filter YEAR_OCC#222 IN (2020,2021)
:        +- Project [DR_NO#153, LAT#179, LON#180, Date Rptd#154, DATE OCC#155, OCC_TS#215, year(cast(OCC_TS#215 as date)) AS YEAR_OCC#222]
:           +- Project [DR_NO#153, LAT#179, LON#180, Date Rptd#154, DATE OCC#155, to_timestamp(DATE OCC#155, Some(yyyy MMM dd hh:mm:ss a), TimestampType, Some(UTC), false) AS OCC_TS#215]
:              +- Project [DR_NO#153, LAT#179, LON#180, Date Rptd#154, DATE OCC#155]
:                 +- Relation [DR_NO#153,Date Rptd#154,DATE OCC#155,TIME OCC#156,AREA#157,AREA NAME#158,Rpt Dist No#159,Part
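
The Top-10 and Bottom-10 cells select communities in two passes: drop rows whose `PerCapitaIncome` or `CrimePerCapitaPerYear` is null, sort by `PerCapitaIncome`, collect the first 10 `COMM` names to the driver, and filter `comm_final_df_clean` back down with `isin`. The plain-Python sketch below reproduces that selection on a list of dictionaries; the helper `select_extremes` and the sample rows are hypothetical. One difference to keep in mind is that Python's sort is stable, whereas Spark does not promise which rows `limit(10)` keeps when several communities tie on the sort key.

```python
def select_extremes(rows, k=10, richest=True):
    """Return the COMM names of the k richest (or poorest) communities.

    Rows with a missing income or crime rate are dropped first, mirroring
    comm_final_df_clean in the notebook.
    """
    clean = [
        r for r in rows
        if r["PerCapitaIncome"] is not None and r["CrimePerCapitaPerYear"] is not None
    ]
    ranked = sorted(clean, key=lambda r: r["PerCapitaIncome"], reverse=richest)
    return [r["COMM"] for r in ranked[:k]]


# Hypothetical community rows (not taken from the dataset)
rows = [
    {"COMM": "A", "PerCapitaIncome": 52000.0, "CrimePerCapitaPerYear": 0.02},
    {"COMM": "B", "PerCapitaIncome": 18000.0, "CrimePerCapitaPerYear": 0.09},
    {"COMM": "C", "PerCapitaIncome": None, "CrimePerCapitaPerYear": 0.05},
    {"COMM": "D", "PerCapitaIncome": 31000.0, "CrimePerCapitaPerYear": 0.04},
]
top = select_extremes(rows, k=2, richest=True)
bottom = select_extremes(rows, k=2, richest=False)
selected_rows = [r for r in rows if r["COMM"] in bottom]  # the isin(...) filter
print(top, bottom)
```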

### 5.7 All communities – Config 3 (8 executors × 1 core, 2 GB)
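Configs 2 and 3 give the job the same total executor cores and heap and differ only in how those resources are packaged: Config 2 (as set in the 5.6 cell) requests 4 executors with 2 cores and 4 GB each, while Config 3 requests 8 executors with 1 core and 2 GB each. The sketch below does that arithmetic from the `spark.executor.*` values. `ExecutorConfig` is a hypothetical helper, not part of Spark, and the overhead figure assumes Spark's default per-executor memory overhead (10% of executor memory, at least 384 MiB); the cluster may be configured differently, and YARN's own rounding of container sizes and the driver are not included.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ExecutorConfig:
    """Hypothetical helper describing one spark.executor.* configuration."""
    name: str
    instances: int
    cores: int
    memory_mb: int  # spark.executor.memory, in MiB

    @property
    def total_cores(self) -> int:
        return self.instances * self.cores

    @property
    def total_heap_mb(self) -> int:
        return self.instances * self.memory_mb

    @property
    def overhead_mb(self) -> int:
        # Assumes Spark's default overhead: 10% of executor memory, at least 384 MiB
        return max(384, int(self.memory_mb * 0.10))

    @property
    def total_container_mb(self) -> int:
        # Heap + overhead per executor, before any YARN rounding
        return self.instances * (self.memory_mb + self.overhead_mb)


configs = [
    ExecutorConfig("Config 2", instances=4, cores=2, memory_mb=4096),
    ExecutorConfig("Config 3", instances=8, cores=1, memory_mb=2048),
]
for c in configs:
    print(f"{c.name}: {c.total_cores} cores, {c.total_heap_mb} MiB heap, "
          f"~{c.total_container_mb} MiB requested from YARN")
```

Under these assumptions the 384 MiB minimum makes the eight smaller executors of Config 3 reserve somewhat more container memory than Config 2, even though both expose 8 cores and 16 GiB of heap.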

In [1]:
import pyspark.sql.functions as F
from sedona.sql import ST_Point, ST_GeomFromGeoJSON, ST_Contains
from sedona.register import SedonaRegistrator
from sedona.utils import SedonaKryoRegistrator, KryoSerializer
from sedona.spark import *
from pyspark.sql import SparkSession
import time

# ---------------- Spark & Sedona ----------------

# Spark session with configuration 3
spark = (
    SparkSession.builder
    .appName("Query5 - Config3 (8x1, 2GB) - ALL COMM")
    .config("spark.executor.instances", "8")
    .config("spark.executor.cores", "1")
    .config("spark.executor.memory", "2g")
    .config("spark.serializer", KryoSerializer.getName)
    .config("spark.kryo.registrator", SedonaKryoRegistrator.getName)
    .getOrCreate()
)

SedonaRegistrator.registerAll(spark)

# ---- Start timing AFTER Spark is ready ----
t_query_start = time.time()

# ====================== INCOME / BLOCKS ======================

# 1. Read the GeoJSON FeatureCollection as JSON
raw_blocks_df = (
    spark.read
    .option("multiline", "true")
    .json("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Census_Blocks_2020.geojson")
)

# 2. Explode "features" → one row per feature
features_df = raw_blocks_df.select(F.explode("features").alias("feat"))

# 3. Select needed properties + geometry, then convert geometry struct → GeoJSON string
#    and FIX malformed coordinate strings like "[-118.53,32.90]" → [-118.53,32.90]
blocks_df = (
    features_df
    .select(
        F.col("feat.properties.COMM").alias("COMM"),
        F.col("feat.properties.POP20").alias("POP20"),
        F.col("feat.properties.HOUSING20").alias("HOUSING20"),
        F.col("feat.properties.ZCTA20").alias("ZCTA20"),
        F.to_json(F.col("feat.geometry")).alias("geometry_json")
    )
    # fix the "coordinates" string issue WITHOUT UDFs
    .withColumn(
        "geometry_json",
        F.regexp_replace(
            F.regexp_replace(
                F.col("geometry_json"),
                '"\\[',   # turn "[-118,... into [-118,...
                '['
            ),
            '\\]"',      # turn ...]" into ...]
            ']'
        )
    )
    .withColumn(
        "block_geom",
        ST_GeomFromGeoJSON(F.col("geometry_json"))
    )
    .filter(F.col("block_geom").isNotNull())
)

# 4. Income CSV - median income per ZIP code
income_df = (
    spark.read.csv(
        "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_income_2021.csv",
        header=True,
        sep=";"
    )
    .select(
        F.col("Zip Code").alias("ZCTA20"),
        F.col("Estimated Median Income").alias("MedianIncome")
    )
)

# Clean MedianIncome to numeric
income_df = income_df.withColumn(
    "MedianIncome",
    F.regexp_replace("MedianIncome", "\\$", "")
)
income_df = income_df.withColumn(
    "MedianIncome",
    F.regexp_replace("MedianIncome", ",", "").cast("double")
)

# 5. Join blocks with income by ZCTA20
blocks_income_df = blocks_df.join(income_df, on="ZCTA20", how="left")

# 6. Block income = HOUSING20 * MedianIncome
blocks_income_df = blocks_income_df.withColumn(
    "BlockIncome",
    F.col("HOUSING20") * F.col("MedianIncome")
)

# 7. Aggregate per COMM
comm_income_df = blocks_income_df.groupBy("COMM").agg(
    F.sum("BlockIncome").alias("TotalIncome"),
    F.sum("POP20").alias("TotalPopulation")
)

# 8. Per capita income per COMM
comm_income_df = comm_income_df.withColumn(
    "PerCapitaIncome",
    F.col("TotalIncome") / F.col("TotalPopulation")
)

# ====================== CRIMES ======================

crime_df = (
    spark.read.csv(
        "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv",
        header=True,
        inferSchema=True
    )
    .select(
        "DR_NO", "LAT", "LON", "Date Rptd", "DATE OCC"
    )
)

# Parse DATE OCC as timestamp
crime_df = crime_df.withColumn(
    "OCC_TS",
    F.to_timestamp(F.col("DATE OCC"), "yyyy MMM dd hh:mm:ss a")
)

# Year of occurrence
crime_df = crime_df.withColumn("YEAR_OCC", F.year(F.col("OCC_TS")))

# Keep only crimes in 2020 & 2021
crime_df_filtered = crime_df.filter(F.col("YEAR_OCC").isin(2020, 2021))

# Filter out (0,0) coordinates
crime_df_filtered = crime_df_filtered.filter(
    ~((F.col("LAT") == 0) & (F.col("LON") == 0))
)

# Create crime point geometry (lon, lat)
crime_df_filtered = crime_df_filtered.withColumn(
    "crime_point",
    ST_Point(F.col("LON"), F.col("LAT"))
)

# ====================== SPATIAL JOIN & METRICS ======================

# Spatial join: which block each crime falls into
crime_blocks_df = crime_df_filtered.join(
    blocks_df,
    ST_Contains(F.col("block_geom"), F.col("crime_point")),
    how="left"
)

# Crimes per COMM over 2 years
comm_crime_df = crime_blocks_df.groupBy("COMM").agg(
    F.count("*").alias("CrimeCount_2yrs")
)

# Annual crimes per COMM (average over 2020–2021)
comm_crime_df = comm_crime_df.withColumn(
    "CrimePerYear",
    F.col("CrimeCount_2yrs") / F.lit(2.0)
)

# Final table: income + crime metrics
comm_final_df = comm_income_df.join(comm_crime_df, on="COMM", how="left")

comm_final_df = comm_final_df.withColumn(
    "CrimePerYear",
    F.coalesce(F.col("CrimePerYear"), F.lit(0.0))
).withColumn(
    "CrimePerCapitaPerYear",
    F.col("CrimePerYear") / F.col("TotalPopulation")
)

# ====================== EXPLAIN PLANS (for the report) ======================

print("\n=== EXPLAIN plan for spatial join crime_blocks_df (Config 3) ===")
crime_blocks_df.explain(mode="extended")

print("\n=== EXPLAIN plan for final join comm_final_df (Config 3) ===")
comm_final_df.explain(mode="extended")

# ====================== RESULTS & CORRELATION ======================

# Show top 20 COMM by income, with crime per capita
comm_final_df.select("COMM", "PerCapitaIncome", "CrimePerCapitaPerYear") \
    .orderBy(F.col("PerCapitaIncome").desc()) \
    .show(20, truncate=False)

# ===== Correlation =====

comm_final_df_clean = comm_final_df.filter(
    F.col("PerCapitaIncome").isNotNull() &
    F.col("CrimePerCapitaPerYear").isNotNull()
)

corr_all = comm_final_df_clean.stat.corr("PerCapitaIncome", "CrimePerCapitaPerYear")
print("Correlation (all COMM):", corr_all)

# ---- End timing ----
t_query_end = time.time()
print(f"[Config 3 - All COMM] Total Query 5 runtime: {t_query_end - t_query_start:.2f} seconds")


Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
292,application_1765289937462_0289,pyspark,idle,Link,Link,,✔


SparkSession available as 'spark'.


=== EXPLAIN plan for spatial join crime_blocks_df (Config 2) ===
== Parsed Logical Plan ==
Join LeftOuter,  **org.apache.spark.sql.sedona_sql.expressions.ST_Contains**
:- Project [DR_NO#153, LAT#179, LON#180, Date Rptd#154, DATE OCC#155, OCC_TS#215, YEAR_OCC#222,  **org.apache.spark.sql.sedona_sql.expressions.ST_Point**   AS crime_point#230]
:  +- Filter NOT ((LAT#179 = cast(0 as double)) AND (LON#180 = cast(0 as double)))
:     +- Filter YEAR_OCC#222 IN (2020,2021)
:        +- Project [DR_NO#153, LAT#179, LON#180, Date Rptd#154, DATE OCC#155, OCC_TS#215, year(cast(OCC_TS#215 as date)) AS YEAR_OCC#222]
:           +- Project [DR_NO#153, LAT#179, LON#180, Date Rptd#154, DATE OCC#155, to_timestamp(DATE OCC#155, Some(yyyy MMM dd hh:mm:ss a), TimestampType, Some(UTC), false) AS OCC_TS#215]
:              +- Project [DR_NO#153, LAT#179, LON#180, Date Rptd#154, DATE OCC#155]
:                 +- Relation [DR_NO#153,Date Rptd#154,DATE OCC#155,TIME OCC#156,AREA#157,AREA NAME#158,Rpt Dist No#1
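
Steps 5 to 8 estimate a community's income as the sum over its census blocks of `HOUSING20 × MedianIncome` (the median income of the block's ZIP code) and divide it by the community's summed `POP20`. Because the income join is a left join, a block whose `ZCTA20` has no row in the income CSV gets a null `BlockIncome`; `F.sum` skips that null, but the block's population is still added to `TotalPopulation`, which pulls the per-capita figure down for communities with unmatched ZIP codes. The plain-Python sketch below mirrors those semantics; `per_capita_income` and the sample blocks are hypothetical.

```python
def per_capita_income(blocks, median_income_by_zip):
    """Mirror steps 5-8: left-join blocks to ZIP median income, aggregate per COMM."""
    total_income = {}  # COMM -> summed BlockIncome (absent if every block was null)
    total_pop = {}     # COMM -> summed POP20
    for b in blocks:
        comm = b["COMM"]
        total_pop[comm] = total_pop.get(comm, 0) + b["POP20"]
        median = median_income_by_zip.get(b["ZCTA20"])  # None = no match in left join
        if median is not None:
            # Like F.sum, only non-null BlockIncome values are added
            total_income[comm] = total_income.get(comm, 0.0) + b["HOUSING20"] * median
    result = {}
    for comm, pop in total_pop.items():
        income = total_income.get(comm)
        # Null income (or zero population) gives a null per-capita value
        result[comm] = None if income is None or pop == 0 else income / pop
    return result


# Hypothetical blocks and ZIP medians (illustrative only)
blocks = [
    {"COMM": "A", "ZCTA20": "90001", "POP20": 100, "HOUSING20": 40},
    {"COMM": "A", "ZCTA20": "90002", "POP20": 50, "HOUSING20": 20},
    {"COMM": "A", "ZCTA20": "99999", "POP20": 30, "HOUSING20": 10},  # unmatched ZIP
    {"COMM": "B", "ZCTA20": "99999", "POP20": 10, "HOUSING20": 5},
]
medians = {"90001": 50000.0, "90002": 60000.0}
print(per_capita_income(blocks, medians))
```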

### 5.8 Top-10 richest communities – Config 3

In [1]:
import pyspark.sql.functions as F
from sedona.sql import ST_Point, ST_GeomFromGeoJSON, ST_Contains
from sedona.register import SedonaRegistrator
from sedona.utils import SedonaKryoRegistrator, KryoSerializer
from sedona.spark import *
from pyspark.sql import SparkSession
import time  

# ---------------- Spark & Sedona ----------------

# Spark session with configuration 3
spark = (
    SparkSession.builder
    .appName("Query5 - Config3 (8x1, 2GB) - Top10 Richest")
    .config("spark.executor.instances", "8")
    .config("spark.executor.cores", "1")
    .config("spark.executor.memory", "2g")
    .config("spark.serializer", KryoSerializer.getName)
    .config("spark.kryo.registrator", SedonaKryoRegistrator.getName)
    .getOrCreate()
)
SedonaRegistrator.registerAll(spark)

# Start timing AFTER Spark is ready
t_query_start = time.time()

# ====================== INCOME / BLOCKS ======================

# 1. Read the GeoJSON FeatureCollection as JSON
raw_blocks_df = (
    spark.read
    .option("multiline", "true")
    .json("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Census_Blocks_2020.geojson")
)

# 2. Explode "features" → one row per feature
features_df = raw_blocks_df.select(F.explode("features").alias("feat"))

# 3. Select needed properties + geometry, then convert geometry struct → GeoJSON string
#    and FIX malformed coordinate strings like "[-118.53,32.90]" → [-118.53,32.90]
blocks_df = (
    features_df
    .select(
        F.col("feat.properties.COMM").alias("COMM"),
        F.col("feat.properties.POP20").alias("POP20"),
        F.col("feat.properties.HOUSING20").alias("HOUSING20"),
        F.col("feat.properties.ZCTA20").alias("ZCTA20"),
        F.to_json(F.col("feat.geometry")).alias("geometry_json")
    )
    # fix the "coordinates" string issue WITHOUT UDFs
    .withColumn(
        "geometry_json",
        F.regexp_replace(
            F.regexp_replace(
                F.col("geometry_json"),
                '"\\[',   # turn "[-118,... into [-118,...
                '['
            ),
            '\\]"',      # turn ...]" into ...]
            ']'
        )
    )
    .withColumn(
        "block_geom",
        ST_GeomFromGeoJSON(F.col("geometry_json"))
    )
    .filter(F.col("block_geom").isNotNull())
)

# 4. Income CSV - median income per ZIP code
income_df = (
    spark.read.csv(
        "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_income_2021.csv",
        header=True,
        sep=";"
    )
    .select(
        F.col("Zip Code").alias("ZCTA20"),
        F.col("Estimated Median Income").alias("MedianIncome")
    )
)

# Clean MedianIncome to numeric
income_df = income_df.withColumn(
    "MedianIncome",
    F.regexp_replace("MedianIncome", "\\$", "")
)
income_df = income_df.withColumn(
    "MedianIncome",
    F.regexp_replace("MedianIncome", ",", "").cast("double")
)

# 5. Join blocks with income by ZCTA20
blocks_income_df = blocks_df.join(income_df, on="ZCTA20", how="left")

# 6. Block income = HOUSING20 * MedianIncome
blocks_income_df = blocks_income_df.withColumn(
    "BlockIncome",
    F.col("HOUSING20") * F.col("MedianIncome")
)

# 7. Aggregate per COMM
comm_income_df = blocks_income_df.groupBy("COMM").agg(
    F.sum("BlockIncome").alias("TotalIncome"),
    F.sum("POP20").alias("TotalPopulation")
)

# 8. Per capita income per COMM
comm_income_df = comm_income_df.withColumn(
    "PerCapitaIncome",
    F.col("TotalIncome") / F.col("TotalPopulation")
)

# ====================== CRIMES ======================

crime_df = (
    spark.read.csv(
        "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv",
        header=True,
        inferSchema=True
    )
    .select(
        "DR_NO", "LAT", "LON", "Date Rptd", "DATE OCC"
    )
)

# Parse DATE OCC as timestamp
crime_df = crime_df.withColumn(
    "OCC_TS",
    F.to_timestamp(F.col("DATE OCC"), "yyyy MMM dd hh:mm:ss a")
)

# Year of occurrence
crime_df = crime_df.withColumn("YEAR_OCC", F.year(F.col("OCC_TS")))

# Keep only crimes in 2020 & 2021
crime_df_filtered = crime_df.filter(F.col("YEAR_OCC").isin(2020, 2021))

# Filter out (0,0) coordinates
crime_df_filtered = crime_df_filtered.filter(
    ~((F.col("LAT") == 0) & (F.col("LON") == 0))
)

# Create crime point geometry (lon, lat)
crime_df_filtered = crime_df_filtered.withColumn(
    "crime_point",
    ST_Point(F.col("LON"), F.col("LAT"))
)

# ====================== SPATIAL JOIN & METRICS ======================

# Spatial join: which block each crime falls into
crime_blocks_df = crime_df_filtered.join(
    blocks_df,
    ST_Contains(F.col("block_geom"), F.col("crime_point")),
    how="left"
)

# EXPLAIN for SPATIAL JOIN
print("\n=== EXPLAIN: Spatial Join Plan (Config 3 - Top 10) ===")
crime_blocks_df.explain(True)


# Crimes per COMM over 2 years
comm_crime_df = crime_blocks_df.groupBy("COMM").agg(
    F.count("*").alias("CrimeCount_2yrs")
)

# Annual crimes per COMM (average over 2020–2021)
comm_crime_df = comm_crime_df.withColumn(
    "CrimePerYear",
    F.col("CrimeCount_2yrs") / F.lit(2.0)
)

# Final table: income + crime metrics
comm_final_df = comm_income_df.join(comm_crime_df, on="COMM", how="left")

comm_final_df = comm_final_df.withColumn(
    "CrimePerYear",
    F.coalesce(F.col("CrimePerYear"), F.lit(0.0))
).withColumn(
    "CrimePerCapitaPerYear",
    F.col("CrimePerYear") / F.col("TotalPopulation")
)

# ===== Correlation base DF (non-null) =====
comm_final_df_clean = comm_final_df.filter(
    F.col("PerCapitaIncome").isNotNull() &
    F.col("CrimePerCapitaPerYear").isNotNull()
)

# === TOP 10 richest COMMs ===
top10 = (
    comm_final_df_clean
    .orderBy(F.col("PerCapitaIncome").desc())
    .limit(10)
    .select("COMM")
)

top10_list = [r["COMM"] for r in top10.collect()]


top10_df = comm_final_df_clean.filter(F.col("COMM").isin(top10_list))


# EXPLAIN for final TOP 10
print("\n=== EXPLAIN: Final Top 10 Plan (Config 3) ===")
top10_df.explain(True)

print("=== TOP 10 richest COMMs ===")
top10_df.select(
    "COMM", "PerCapitaIncome", "CrimePerCapitaPerYear"
).orderBy(F.col("PerCapitaIncome").desc()).show(truncate=False)

corr_top10 = top10_df.stat.corr("PerCapitaIncome", "CrimePerCapitaPerYear")
print("Correlation (Top 10 richest):", corr_top10)

# ---- End timing ----
t_query_end = time.time()
print(f"[Config 3 - Top10] Total Query 5 runtime: {t_query_end - t_query_start:.2f} seconds")


Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
344,application_1765289937462_0341,pyspark,idle,Link,Link,,✔


SparkSession available as 'spark'.


=== EXPLAIN: Spatial Join Plan (Config 3 - Top 10) ===
== Parsed Logical Plan ==
Join LeftOuter,  **org.apache.spark.sql.sedona_sql.expressions.ST_Contains**
:- Project [DR_NO#153, LAT#179, LON#180, Date Rptd#154, DATE OCC#155, OCC_TS#215, YEAR_OCC#222,  **org.apache.spark.sql.sedona_sql.expressions.ST_Point**   AS crime_point#230]
:  +- Filter NOT ((LAT#179 = cast(0 as double)) AND (LON#180 = cast(0 as double)))
:     +- Filter YEAR_OCC#222 IN (2020,2021)
:        +- Project [DR_NO#153, LAT#179, LON#180, Date Rptd#154, DATE OCC#155, OCC_TS#215, year(cast(OCC_TS#215 as date)) AS YEAR_OCC#222]
:           +- Project [DR_NO#153, LAT#179, LON#180, Date Rptd#154, DATE OCC#155, to_timestamp(DATE OCC#155, Some(yyyy MMM dd hh:mm:ss a), TimestampType, Some(UTC), false) AS OCC_TS#215]
:              +- Project [DR_NO#153, LAT#179, LON#180, Date Rptd#154, DATE OCC#155]
:                 +- Relation [DR_NO#153,Date Rptd#154,DATE OCC#155,TIME OCC#156,AREA#157,AREA NAME#158,Rpt Dist No#159,Part 1-
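
On the crime side, each cell keeps incidents from 2020 and 2021, drops rows with (0, 0) coordinates, counts the remaining incidents per `COMM` after the spatial join, divides by 2 to get a yearly average, replaces a missing count with 0 through `coalesce`, and finally divides by `TotalPopulation`. The plain-Python sketch below walks through the same arithmetic. It assumes each crime record already carries the `COMM` assigned by the spatial join, and it divides by the number of years rather than the hard-coded `2.0`; `crime_rate_per_capita` and the sample records are hypothetical. Incidents that fall in no block end up under a null `COMM`, and the final left join from the income side never matches that null key, so they do not affect any community's rate.

```python
from collections import Counter


def crime_rate_per_capita(crimes, population_by_comm, years=(2020, 2021)):
    """Yearly crimes per resident for each COMM, following the notebook's steps.

    Each crime dict is assumed to already carry the COMM found by the spatial join.
    """
    counts = Counter(
        c["COMM"]
        for c in crimes
        if c["YEAR_OCC"] in years  # isin(2020, 2021)
        and not (c["LAT"] == 0 and c["LON"] == 0)  # drop (0, 0) coordinates
    )
    rates = {}
    for comm, pop in population_by_comm.items():
        per_year = counts.get(comm, 0) / len(years)  # coalesce(CrimePerYear, 0.0)
        rates[comm] = None if not pop else per_year / pop
    return rates


# Hypothetical incidents and populations (illustrative only)
crimes = [
    {"COMM": "A", "YEAR_OCC": 2020, "LAT": 34.05, "LON": -118.25},
    {"COMM": "A", "YEAR_OCC": 2021, "LAT": 34.06, "LON": -118.24},
    {"COMM": "A", "YEAR_OCC": 2019, "LAT": 34.05, "LON": -118.25},  # outside 2020-2021
    {"COMM": "A", "YEAR_OCC": 2020, "LAT": 0.0, "LON": 0.0},        # missing location
]
rates = crime_rate_per_capita(crimes, {"A": 100, "B": 50})
print(rates)
```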

### 5.9 Bottom-10 poorest communities – Config 3

In [1]:
import pyspark.sql.functions as F
from sedona.sql import ST_Point, ST_GeomFromGeoJSON, ST_Contains
from sedona.register import SedonaRegistrator
from sedona.utils import SedonaKryoRegistrator, KryoSerializer
from sedona.spark import *
from pyspark.sql import SparkSession
import time  

# ---------------- Spark & Sedona ----------------

# Spark session with configuration 3
spark = (
    SparkSession.builder
    .appName("Query5 - Config3 (8x1, 2GB) - Bottom10 Poorest")
    .config("spark.executor.instances", "8")
    .config("spark.executor.cores", "1")
    .config("spark.executor.memory", "2g")
    .config("spark.serializer", KryoSerializer.getName)
    .config("spark.kryo.registrator", SedonaKryoRegistrator.getName)
    .getOrCreate()
)

SedonaRegistrator.registerAll(spark)

# ---- Start timing (after Spark is ready) ----
t_query_start = time.time()

# ====================== INCOME / BLOCKS ======================

# 1. Read the GeoJSON FeatureCollection as JSON
raw_blocks_df = (
    spark.read
    .option("multiline", "true")
    .json("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Census_Blocks_2020.geojson")
)

# 2. Explode "features" → one row per feature
features_df = raw_blocks_df.select(F.explode("features").alias("feat"))

# 3. Select needed properties + geometry, then convert geometry struct → GeoJSON string
#    and FIX malformed coordinate strings like "[-118.53,32.90]" → [-118.53,32.90]
blocks_df = (
    features_df
    .select(
        F.col("feat.properties.COMM").alias("COMM"),
        F.col("feat.properties.POP20").alias("POP20"),
        F.col("feat.properties.HOUSING20").alias("HOUSING20"),
        F.col("feat.properties.ZCTA20").alias("ZCTA20"),
        F.to_json(F.col("feat.geometry")).alias("geometry_json")
    )
    # fix the "coordinates" string issue WITHOUT UDFs
    .withColumn(
        "geometry_json",
        F.regexp_replace(
            F.regexp_replace(
                F.col("geometry_json"),
                '"\\[',   # turn "[-118,... into [-118,...
                '['
            ),
            '\\]"',      # turn ...]" into ...]
            ']'
        )
    )
    .withColumn(
        "block_geom",
        ST_GeomFromGeoJSON(F.col("geometry_json"))
    )
    .filter(F.col("block_geom").isNotNull())
)

# 4. Income CSV - median income per ZIP code
income_df = (
    spark.read.csv(
        "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_income_2021.csv",
        header=True,
        sep=";"
    )
    .select(
        F.col("Zip Code").alias("ZCTA20"),
        F.col("Estimated Median Income").alias("MedianIncome")
    )
)

# Clean MedianIncome to numeric
income_df = income_df.withColumn(
    "MedianIncome",
    F.regexp_replace("MedianIncome", "\\$", "")
)
income_df = income_df.withColumn(
    "MedianIncome",
    F.regexp_replace("MedianIncome", ",", "").cast("double")
)

# 5. Join blocks with income by ZCTA20
blocks_income_df = blocks_df.join(income_df, on="ZCTA20", how="left")

# 6. Block income = HOUSING20 * MedianIncome
blocks_income_df = blocks_income_df.withColumn(
    "BlockIncome",
    F.col("HOUSING20") * F.col("MedianIncome")
)

# 7. Aggregate per COMM
comm_income_df = blocks_income_df.groupBy("COMM").agg(
    F.sum("BlockIncome").alias("TotalIncome"),
    F.sum("POP20").alias("TotalPopulation")
)

# 8. Per capita income per COMM
comm_income_df = comm_income_df.withColumn(
    "PerCapitaIncome",
    F.col("TotalIncome") / F.col("TotalPopulation")
)

# ====================== CRIMES ======================

crime_df = (
    spark.read.csv(
        "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv",
        header=True,
        inferSchema=True
    )
    .select(
        "DR_NO", "LAT", "LON", "Date Rptd", "DATE OCC"
    )
)

# Parse DATE OCC as timestamp
crime_df = crime_df.withColumn(
    "OCC_TS",
    F.to_timestamp(F.col("DATE OCC"), "yyyy MMM dd hh:mm:ss a")
)

# Year of occurrence
crime_df = crime_df.withColumn("YEAR_OCC", F.year(F.col("OCC_TS")))

# Keep only crimes in 2020 & 2021
crime_df_filtered = crime_df.filter(F.col("YEAR_OCC").isin(2020, 2021))

# Filter out (0,0) coordinates
crime_df_filtered = crime_df_filtered.filter(
    ~((F.col("LAT") == 0) & (F.col("LON") == 0))
)

# Create crime point geometry (lon, lat)
crime_df_filtered = crime_df_filtered.withColumn(
    "crime_point",
    ST_Point(F.col("LON"), F.col("LAT"))
)

# ====================== SPATIAL JOIN & METRICS ======================

# Spatial join: which block each crime falls into
crime_blocks_df = crime_df_filtered.join(
    blocks_df,
    ST_Contains(F.col("block_geom"), F.col("crime_point")),
    how="left"
)

print("\n=== EXPLAIN: Spatial Join Plan (Config 3 - Bottom 10) ===")
crime_blocks_df.explain(True)

# Crimes per COMM over 2 years
comm_crime_df = crime_blocks_df.groupBy("COMM").agg(
    F.count("*").alias("CrimeCount_2yrs")
)

# Annual crimes per COMM (average over 2020–2021)
comm_crime_df = comm_crime_df.withColumn(
    "CrimePerYear",
    F.col("CrimeCount_2yrs") / F.lit(2.0)
)

# Final table: income + crime metrics
comm_final_df = comm_income_df.join(comm_crime_df, on="COMM", how="left")

comm_final_df = comm_final_df.withColumn(
    "CrimePerYear",
    F.coalesce(F.col("CrimePerYear"), F.lit(0.0))
).withColumn(
    "CrimePerCapitaPerYear",
    F.col("CrimePerYear") / F.col("TotalPopulation")
)

# ===== Correlation base DF (non-null) =====
comm_final_df_clean = comm_final_df.filter(
    F.col("PerCapitaIncome").isNotNull() &
    F.col("CrimePerCapitaPerYear").isNotNull()
)

# === BOTTOM 10 poorest COMMs ===
bottom10 = (
    comm_final_df_clean
    .orderBy(F.col("PerCapitaIncome").asc())
    .limit(10)
    .select("COMM")
)

bottom10_list = [r["COMM"] for r in bottom10.collect()]

bottom10_df = comm_final_df_clean.filter(F.col("COMM").isin(bottom10_list))

print("\n=== EXPLAIN: Final Bottom 10 Plan (Config 3) ===")
bottom10_df.explain(True)

print("=== BOTTOM 10 poorest COMMs ===")
bottom10_df.select(
    "COMM", "PerCapitaIncome", "CrimePerCapitaPerYear"
).orderBy(F.col("PerCapitaIncome").asc()).show(truncate=False)

corr_bottom10 = bottom10_df.stat.corr("PerCapitaIncome", "CrimePerCapitaPerYear")
print("Correlation (Bottom 10 poorest):", corr_bottom10)

# ---- End timing ----
t_query_end = time.time()
print(f"[Config 3 - Bottom10] Total Query 5 runtime: {t_query_end - t_query_start:.2f} seconds")

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
388,application_1765289937462_0384,pyspark,idle,Link,Link,,✔


SparkSession available as 'spark'.


=== EXPLAIN: Spatial Join Plan (Config 3 - Bottom 10) ===
== Parsed Logical Plan ==
Join LeftOuter,  **org.apache.spark.sql.sedona_sql.expressions.ST_Contains**
:- Project [DR_NO#153, LAT#179, LON#180, Date Rptd#154, DATE OCC#155, OCC_TS#215, YEAR_OCC#222,  **org.apache.spark.sql.sedona_sql.expressions.ST_Point**   AS crime_point#230]
:  +- Filter NOT ((LAT#179 = cast(0 as double)) AND (LON#180 = cast(0 as double)))
:     +- Filter YEAR_OCC#222 IN (2020,2021)
:        +- Project [DR_NO#153, LAT#179, LON#180, Date Rptd#154, DATE OCC#155, OCC_TS#215, year(cast(OCC_TS#215 as date)) AS YEAR_OCC#222]
:           +- Project [DR_NO#153, LAT#179, LON#180, Date Rptd#154, DATE OCC#155, to_timestamp(DATE OCC#155, Some(yyyy MMM dd hh:mm:ss a), TimestampType, Some(UTC), false) AS OCC_TS#215]
:              +- Project [DR_NO#153, LAT#179, LON#180, Date Rptd#154, DATE OCC#155]
:                 +- Relation [DR_NO#153,Date Rptd#154,DATE OCC#155,TIME OCC#156,AREA#157,AREA NAME#158,Rpt Dist No#159,Part
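
Step 3 of every cell repairs block geometries whose `coordinates` field was serialized by `to_json` as a quoted string: the two `regexp_replace` calls delete `"[` and `]"`, which turns the string back into a JSON array that `ST_GeomFromGeoJSON` can parse. The plain-Python sketch below applies the same two substitutions with `re.sub` and checks the result with `json.loads`; `fix_geometry_json` and the small polygon are hypothetical. The pattern would also strip quotes from any other string value that starts with `[` or ends with `]`, so it relies on the geometry struct containing only `type` and `coordinates`.

```python
import json
import re


def fix_geometry_json(geometry_json):
    """Apply the notebook's two substitutions: '"[' -> '[' and ']"' -> ']'."""
    without_open_quote = re.sub(r'"\[', '[', geometry_json)
    return re.sub(r'\]"', ']', without_open_quote)


# Hypothetical malformed polygon: coordinates stored as a string
malformed = (
    '{"type":"Polygon","coordinates":'
    '"[[[-118.5,34.0],[-118.4,34.0],[-118.4,34.1],[-118.5,34.0]]]"}'
)
geom = json.loads(fix_geometry_json(malformed))
print(geom["type"], geom["coordinates"][0][0])
```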