In [1]:
# Paths for csv
fcrime_2010_2019 = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2010_2019.csv"
fcrime_2020_present = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv"
fstations = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Police_Stations.csv"
fincome = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_income_2021.csv"
fcodes = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/RE_codes.csv"

# Paths for GeoJSON
fgeo = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Census_Blocks_2020.geojson"

mo_path = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/MO_codes.txt"
fgeofields = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Census_Blocks_2020_fields.csv"

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1520,application_1765289937462_1506,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
# Imports
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructField, StructType, StringType, IntegerType, FloatType, DoubleType, 
    DateType, TimestampType
)
from pyspark.sql.functions import (
    col, sum, count, expr, coalesce, lit, when,
    year, avg, round, row_number, to_timestamp, 
    regexp_replace, to_date ,explode,split,trim,broadcast
)
from pyspark.sql.window import Window
import time

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [1]:
# Create the spark session
from sedona.spark import *
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Advanced DB") \
    .config('spark.executor.instances','4') \
    .config('spark.executor.cores','1') \
    .config('spark.executor.memory','2g') \
    .getOrCreate()

sedona = SedonaContext.create(spark)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1651,application_1765289937462_1635,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
# Crimes table

crimes_schema = StructType([
    StructField("DR_NO", StringType()),
    StructField("DateRptd", DateType()),
    StructField("DATEOCC", StringType()),
    StructField("TIMEOCC", StringType()),
    StructField("AREA", StringType()),
    StructField("AREANAME", StringType()),
    StructField("RptDistNo", StringType()),
    StructField("Part", IntegerType()),
    StructField("CrmCd", StringType()),
    StructField("Crm Cd Desc", StringType()),
    StructField("Mocodes", StringType()),
    StructField("Vict Age", StringType()),
    StructField("VictSex", StringType()),
    StructField("VictDescent", StringType()),
    StructField("PremisCd", StringType()),
    StructField("PremisDesc", StringType()),
    StructField("WeaponUsedCd", StringType()),
    StructField("WeaponDesc", StringType()),
    StructField("Status", StringType()),
    StructField("Status Desc", StringType()),
    StructField("CrmCd1", StringType()),
    StructField("CrmCd2", StringType()),
    StructField("CrmCd3", StringType()),
    StructField("CrmCd4", StringType()),
    StructField("LOCATION", StringType()),
    StructField("CrossStreet", StringType()),
    StructField("LAT", FloatType()),
    StructField("LON", FloatType()),
])

crimes_df_10_19 = spark.read.csv(fcrime_2010_2019, header=True, schema=crimes_schema, dateFormat='MM/dd/yyyy hh:mm:ss a')
crimes_df_20_present = spark.read.csv(fcrime_2020_present, header=True, schema=crimes_schema, dateFormat='MM/dd/yyyy hh:mm:ss a')

crimes_df_all = crimes_df_10_19.union(crimes_df_20_present)
print(f"total lines: {crimes_df_all.count()}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

total lines: 3138128

Query 1

Υλοποίηση με DataFrame (Χωρίς UDF)

In [5]:
from pyspark.sql.functions import year, when, count, sum, col, regexp_replace

start_time_df = time.time()

# 1. Καθαρισμός και μετατροπή της στήλης Vict Age σε αριθμητικό τύπο
crimes_df_cleaned = crimes_df_all.withColumn(
    "Vict_Age_Num",
    regexp_replace(col("Vict Age"), "[^0-9]", "").cast(IntegerType())
).filter(col("Vict_Age_Num").isNotNull()) 

# 2. Φιλτράρισμα για "aggravated assault"
assault_df = crimes_df_cleaned.filter(
    col("Crm Cd Desc").ilike("%aggravated assault%")
)

# 3. Ομαδοποίηση σε ηλικιακές κατηγορίες
age_groups_df = assault_df.withColumn(
    "Age_Group",
    when(col("Vict_Age_Num") < 18, "Children: < 18")
    .when((col("Vict_Age_Num") >= 18) & (col("Vict_Age_Num") <= 24), "Young Adults: 18 to 24")
    .when((col("Vict_Age_Num") >= 25) & (col("Vict_Age_Num") <= 64), "Adults: 25 to 64")
    .when(col("Vict_Age_Num") > 64, "Elderly: >64")
    .otherwise("Not specified") 
)

# 4. Καταμέτρηση και ταξινόμηση
result_df = age_groups_df.groupBy("Age_Group").agg(
    count("*").alias("Total_Victims")
).orderBy(col("Total_Victims").desc())

# Εμφάνιση αποτελεσμάτων και μέτρηση χρόνου
result_df.show(truncate=False)
end_time_df = time.time()
time_df = end_time_df - start_time_df
print(f"Execution time DataFrame (without UDF): {time_df:.4f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------------------+-------------+
|Age_Group             |Total_Victims|
+----------------------+-------------+
|Adults: 25 to 64      |121660       |
|Young Adults: 18 to 24|33758        |
|Children: < 18        |16014        |
|Elderly: >64          |6011         |
+----------------------+-------------+

Execution time DataFrame (without UDF): 10.1763 seconds

Υλοποίηση με DataFrame (Με UDF)

In [5]:
from pyspark.sql.functions import udf

def get_age_group(age):
    """Καθορίζει την ηλικιακή ομάδα βάσει της ηλικίας."""
    if age is None:
        return "Άγνωστη/Μη Καθορισμένη"
    if age < 18:
        return "Children: < 18"
    elif 18 <= age <= 24:
        return "Young Adults: 18 to 24"
    elif 25 <= age <= 64:
        return "Adults: 25 to 64"
    elif age > 64:
        return "Elderly: >64"
    else:
        return "Not specified"

# Καταχώρηση του UDF
get_age_group_udf = udf(get_age_group, StringType())

start_time_udf = time.time()

# Επανάληψη βημάτων 1 & 2 από πριν
crimes_df_cleaned_udf = crimes_df_all.withColumn(
    "Vict_Age_Num",
    regexp_replace(col("Vict Age"), "[^0-9]", "").cast(IntegerType())
).filter(col("Vict_Age_Num").isNotNull())

assault_df_udf = crimes_df_cleaned_udf.filter(
    col("Crm Cd Desc").ilike("%aggravated assault%")
)

# 3. Ομαδοποίηση σε ηλικιακές κατηγορίες με UDF
age_groups_df_udf = assault_df_udf.withColumn(
    "Age_Group",
    get_age_group_udf(col("Vict_Age_Num"))
)

# 4. Καταμέτρηση και ταξινόμηση
result_df_udf = age_groups_df_udf.groupBy("Age_Group").agg(
    count("*").alias("Total_Victims")
).orderBy(col("Total_Victims").desc())

# Εμφάνιση αποτελεσμάτων και μέτρηση χρόνου
result_df_udf.show(truncate=False)
end_time_udf = time.time()
time_udf = end_time_udf - start_time_udf
print(f"Execution time of DataFrame API (with UDF): {time_udf:.4f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------------------+-------------+
|Age_Group             |Total_Victims|
+----------------------+-------------+
|Adults: 25 to 64      |121660       |
|Young Adults: 18 to 24|33758        |
|Children: < 18        |16014        |
|Elderly: >64          |6011         |
+----------------------+-------------+

Execution time of DataFrame API (with UDF): 9.6997 seconds

Υλοποίηση με RDD API

In [9]:
start_time_rdd = time.time()

# 1. Μετατροπή σε RDD και επιλογή των απαραίτητων στηλών (Vict Age, Crm Cd Desc)
crimes_rdd = crimes_df_all.rdd.map(lambda row: (row["Vict Age"], row["Crm Cd Desc"]))

def get_age_group_rdd(age_str):
    """Καθαρίζει την ηλικία και βρίσκει την ηλικιακή ομάδα."""
    try:
        # Καθαρισμός και μετατροπή σε int
        age = int("".join(filter(str.isdigit, age_str)))
    except:
        return None # Αγνοούμε μη αριθμητικές τιμές

    if age < 18:
        return "Children: < 18"
    elif 18 <= age <= 24:
        return "Young Adults: 18 to 24"
    elif 25 <= age <= 64:
        return "Adults: 25 to 64"
    elif age > 64:
        return "Elderly: >64"
    else:
        return None


# 2. Φιλτράρισμα για "aggravated assault"
assault_rdd = crimes_rdd.filter(
    lambda row: row[1] is not None and "aggravated assault" in row[1].lower()
)

# 3. Ομαδοποίηση και καταμέτρηση
result_rdd = assault_rdd \
    .map(lambda row: get_age_group_rdd(row[0])) \
    .filter(lambda group: group is not None) \
    .map(lambda group: (group, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .sortBy(lambda pair: pair[1], ascending=False)

# Εμφάνιση αποτελεσμάτων και μέτρηση χρόνου
print("\n+-------------------------+-------------+")
print("|Age_Group                |Total_Victims|")
print("+-------------------------+-------------+")
for group, count in result_rdd.collect():
    print(f"|{group:<25}|{count:^13}|")
print("+-------------------------+-------------+")
end_time_rdd = time.time()
time_rdd = end_time_rdd - start_time_rdd
print(f"Execution time of RDD API: {time_rdd:.4f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…


+-------------------------+-------------+
|Age_Group                |Total_Victims|
+-------------------------+-------------+
|Adults: 25 to 64         |   121660    |
|Young Adults: 18 to 24   |    33758    |
|Children: < 18           |    16014    |
|Elderly: >64             |    6011     |
+-------------------------+-------------+
Execution time of RDD API: 19.8276 seconds

Query 2

Υλοποίηση με DataFrame API

In [10]:
from pyspark.sql.functions import year, count, col, sum, row_number, when, to_timestamp, coalesce, round, lit
from pyspark.sql.window import Window
import time

start_time_df = time.time()

# Νέα στήλη Effective_Date (from string to timestamp)
df_processing = crimes_df_all \
    .withColumn("Effective_Date", to_timestamp(col("DATEOCC"), "yyyy MMM dd hh:mm:ss a")) \
    .filter(col("Effective_Date").isNotNull())

# Προσθήκη στήλης 'year' 
df_with_year = df_processing.withColumn("year", year(col("Effective_Date")))

# Ομαδοποίηση των φυλών
df_grouped_descent = df_with_year.withColumn(
    "VictDescent_Code",
    coalesce(col("VictDescent"), lit("X")) # Αν η τιμή είναι NULL, βάζουμε 'X'
).withColumn(
    "VictDescent_Code",
    when(col("VictDescent_Code") == "", "X") 
    .otherwise(col("VictDescent_Code"))
)

# Αντιστοίχιση Κωδικού με Πλήρες Όνομα
df_with_names = df_grouped_descent.withColumn(
    "VictDescent_Name",
    when(col("VictDescent_Code") == "A", lit("Other Asian"))
    .when(col("VictDescent_Code") == "B", lit("Black"))
    .when(col("VictDescent_Code") == "C", lit("Chinese"))
    .when(col("VictDescent_Code") == "D", lit("Cambodian"))
    .when(col("VictDescent_Code") == "F", lit("Filipino"))
    .when(col("VictDescent_Code") == "G", lit("Guamanian"))
    .when(col("VictDescent_Code") == "H", lit("Hispanic/Latin/Mexican"))
    .when(col("VictDescent_Code") == "I", lit("American Indian/Alaskan Native"))
    .when(col("VictDescent_Code") == "J", lit("Japanese"))
    .when(col("VictDescent_Code") == "K", lit("Korean"))
    .when(col("VictDescent_Code") == "L", lit("Laotian"))
    .when(col("VictDescent_Code") == "O", lit("Other"))
    .when(col("VictDescent_Code") == "P", lit("Pacific Islander"))
    .when(col("VictDescent_Code") == "S", lit("Samoan"))
    .when(col("VictDescent_Code") == "U", lit("Hawaiian"))
    .when(col("VictDescent_Code") == "V", lit("Vietnamese"))
    .when(col("VictDescent_Code") == "W", lit("White"))
    .when(col("VictDescent_Code") == "X", lit("Unknown"))
    .when(col("VictDescent_Code") == "Z", lit("Asian Indian"))
    .otherwise(col("VictDescent_Code")))
    
# Υπολογισμός Συνολικού Αριθμού Θυμάτων ανά Έτος
total_victims_per_year = df_grouped_descent.groupBy("year").agg(
    count("*").alias("Total_Victims_Year")
)

# Υπολογισμός Θυμάτων ανά Έτος και Φυλετικό Γκρουπ
victims_per_group = df_with_names.groupBy("year", "VictDescent_Name").agg(
    count("*").alias("Victims_Count")
)

# Συνένωση, Υπολογισμός Ποσοστού και Window Function
df_joined = victims_per_group.join(total_victims_per_year, on="year")

df_with_rank = df_joined.withColumn(
    "Percentage",
    (col("Victims_Count") / col("Total_Victims_Year") * 100)
)

window_spec = Window.partitionBy("year").orderBy(col("Victims_Count").desc())

# Εφαρμογή της λειτουργίας row_number()
df_ranked = df_with_rank.withColumn(
    "rank",
    row_number().over(window_spec)
)

#  Τελικό Φιλτράρισμα, Ταξινόμηση και Επιλογή Στηλών
result_df_api = df_ranked.filter(col("rank") <= 3) \
    .orderBy(col("year").desc(), col("Victims_Count").desc()) \
    .select(
        col("year"),
        col("VictDescent_Name").alias("Victim Descent"), 
        col("Victims_Count").alias("#"),
        round(col("Percentage"), 1).alias("%")
    )

result_df_api.show(1000, truncate=False)
end_time_df = time.time()
time_df = end_time_df - start_time_df
print(f"Execution time for DataFrame API: {time_df:.4f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+----------------------+-----+----+
|year|Victim Descent        |#    |%   |
+----+----------------------+-----+----+
|2025|Unknown               |37   |38.1|
|2025|Hispanic/Latin/Mexican|34   |35.1|
|2025|White                 |13   |13.4|
|2024|Unknown               |49188|38.6|
|2024|Hispanic/Latin/Mexican|28576|22.4|
|2024|White                 |22958|18.0|
|2023|Hispanic/Latin/Mexican|69401|29.9|
|2023|Unknown               |59529|25.6|
|2023|White                 |44615|19.2|
|2022|Hispanic/Latin/Mexican|73111|31.1|
|2022|Unknown               |52130|22.2|
|2022|White                 |46695|19.8|
|2021|Hispanic/Latin/Mexican|63676|30.3|
|2021|Unknown               |46499|22.2|
|2021|White                 |44523|21.2|
|2020|Hispanic/Latin/Mexican|61606|30.8|
|2020|Unknown               |43958|22.0|
|2020|White                 |42638|21.3|
|2019|Hispanic/Latin/Mexican|72458|33.1|
|2019|White                 |48863|22.3|
|2019|Unknown               |36893|16.9|
|2018|Hispanic/L

Υλοποίηση με SQL API

In [11]:


start_time_sql = time.time()

# Καταχώρηση του ενωμένου DataFrame ως προσωρινό πίνακα
crimes_df_all.createOrReplaceTempView("crimes_table")

sql_query_final = """
WITH Cleaned_Crimes AS (
    -- Διόρθωση ημερομηνίας 
    SELECT
        YEAR(TO_TIMESTAMP(DATEOCC, 'yyyy MMM dd hh:mm:ss a')) AS year,
        
        -- Εξαγωγή του κωδικού φυλής
        CASE
            WHEN COALESCE(VictDescent, 'X') = '' THEN 'X'
            ELSE COALESCE(VictDescent, 'X')
        END AS VictDescent_Code,
        
        -- Αντιστοίχιση κωδικού σε πλήρες όνομα
        CASE COALESCE(VictDescent, 'X')
            WHEN 'A' THEN 'Other Asian'
            WHEN 'B' THEN 'Black'
            WHEN 'C' THEN 'Chinese'
            WHEN 'D' THEN 'Cambodian'
            WHEN 'F' THEN 'Filipino'
            WHEN 'G' THEN 'Guamanian'
            WHEN 'H' THEN 'Hispanic/Latin/Mexican'
            WHEN 'I' THEN 'American Indian/Alaskan Native'
            WHEN 'J' THEN 'Japanese'
            WHEN 'K' THEN 'Korean'
            WHEN 'L' THEN 'Laotian'
            WHEN 'O' THEN 'Other'
            WHEN 'P' THEN 'Pacific Islander'
            WHEN 'S' THEN 'Samoan'
            WHEN 'U' THEN 'Hawaiian'
            WHEN 'V' THEN 'Vietnamese'
            WHEN 'W' THEN 'White'
            WHEN 'Z' THEN 'Asian Indian'
            ELSE 'Unknown' 
        END AS VictDescent_Name
    FROM
        crimes_table
    WHERE
        -- Φιλτράρουμε τις μη έγκυρες ημερομηνίες
        TO_TIMESTAMP(DATEOCC, 'yyyy MMM dd hh:mm:ss a') IS NOT NULL
),
Victims_Per_Group AS (
    -- Υπολογισμός θυμάτων ανά έτος και πλήρες όνομα, και σύνολο θυμάτων ανά έτος 
    SELECT
        year,
        VictDescent_Name,
        COUNT(*) AS Victims_Count,
        SUM(COUNT(*)) OVER (PARTITION BY year) AS Total_Victims_Year
    FROM
        Cleaned_Crimes
    GROUP BY
        year, VictDescent_Name
),
Victims_With_Rank AS (
    -- Υπολογισμός ποσοστού και κατάταξης 
    SELECT
        year,
        VictDescent_Name,
        Victims_Count,
        (Victims_Count * 100.0 / Total_Victims_Year) AS Percentage,
        ROW_NUMBER() OVER (
            PARTITION BY year
            ORDER BY Victims_Count DESC
        ) AS rank
    FROM
        Victims_Per_Group
)
-- Τελικό Φιλτράρισμα και Ταξινόμηση
SELECT
    year,
    VictDescent_Name AS `Victim Descent`,
    Victims_Count AS `#`,
    ROUND(Percentage, 1) AS `%`
FROM
    Victims_With_Rank
WHERE
    rank <= 3
ORDER BY
    year DESC, Victims_Count DESC
"""

result_df_sql = spark.sql(sql_query_final)

result_df_sql.show(1000, truncate=False)
end_time_sql = time.time()
time_sql = end_time_sql - start_time_sql
print(f"Execution time for SQL API: {time_sql:.4f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+----------------------+-----+----+
|year|Victim Descent        |#    |%   |
+----+----------------------+-----+----+
|2025|Unknown               |37   |38.1|
|2025|Hispanic/Latin/Mexican|34   |35.1|
|2025|White                 |13   |13.4|
|2024|Unknown               |49188|38.6|
|2024|Hispanic/Latin/Mexican|28576|22.4|
|2024|White                 |22958|18.0|
|2023|Hispanic/Latin/Mexican|69401|29.9|
|2023|Unknown               |59531|25.6|
|2023|White                 |44615|19.2|
|2022|Hispanic/Latin/Mexican|73111|31.1|
|2022|Unknown               |52130|22.2|
|2022|White                 |46695|19.8|
|2021|Hispanic/Latin/Mexican|63676|30.3|
|2021|Unknown               |46499|22.2|
|2021|White                 |44523|21.2|
|2020|Hispanic/Latin/Mexican|61606|30.8|
|2020|Unknown               |43958|22.0|
|2020|White                 |42638|21.3|
|2019|Hispanic/Latin/Mexican|72458|33.1|
|2019|White                 |48863|22.3|
|2019|Unknown               |36894|16.9|
|2018|Hispanic/L

Query 3

In [3]:
#Paths
crime_paths = [
    "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2010_2019.csv",
    "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv"
]
mo_path = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/MO_codes.txt"



FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
from pyspark.sql.functions import explode,split,trim,broadcast


start_time=time.time();

crimes_schema = StructType([
    StructField("DR_NO", StringType()),
    StructField("DateRptd", DateType()),
    StructField("DATEOCC", StringType()),
    StructField("TIMEOCC", StringType()),
    StructField("AREA", StringType()),
    StructField("AREANAME", StringType()),
    StructField("RptDistNo", StringType()),
    StructField("Part", IntegerType()),
    StructField("CrmCd", StringType()),
    StructField("Crm Cd Desc", StringType()),
    StructField("Mocodes", StringType()),
    StructField("Vict Age", StringType()),
    StructField("VictSex", StringType()),
    StructField("VictDescent", StringType()),
    StructField("PremisCd", StringType()),
    StructField("PremisDesc", StringType()),
    StructField("WeaponUsedCd", StringType()),
    StructField("WeaponDesc", StringType()),
    StructField("Status", StringType()),
    StructField("Status Desc", StringType()),
    StructField("CrmCd1", StringType()),
    StructField("CrmCd2", StringType()),
    StructField("CrmCd3", StringType()),
    StructField("CrmCd4", StringType()),
    StructField("LOCATION", StringType()),
    StructField("CrossStreet", StringType()),
    StructField("LAT", FloatType()),
    StructField("LON", FloatType()),
])




# Load and union CSVs (infer schema automatically)
crime_df = spark.read.csv(
    crime_paths,
    header=True,
    schema=crimes_schema,
    timestampFormat='yyyy MMM dd hh:mm:ss a',
    quote='"',
    escape='"'
)



mocodes_df = crime_df.select(
    explode(
        split(
            regexp_replace(col("MOCODES"), r"[,\t;]", " "),
            " "
        )
    ).alias("MOCODE")
).filter(
    col("MOCODE").isNotNull() & (trim(col("MOCODE")) != "")
)

# Count occurrences
mocode_counts_df = mocodes_df.groupBy("MOCODE").count().withColumnRenamed("count", "total_count")

# Load descriptions
mocode_raw = spark.read.text(mo_path)


# Load raw MOCODE descriptions
mocode_raw = spark.read.text(mo_path)

mo_codes = (
    mocode_raw
    .withColumn("MOCODE", split(col("value"), " ")[0])
    # Extract Description as before
    .withColumn("Description", trim(regexp_replace(col("value"), r"^\S+\s+", "")))
    
    .filter(col("MOCODE").isNotNull()) 
    .filter(col("MOCODE") != "") 
    
    # Handle the Description edge case (if you still want "Unknown" for a missing description)
    .withColumn("Description", when(col("Description") == "", lit("Unknown")).otherwise(col("Description")))
    
    # Select and Deduplicate as before
    .select("MOCODE", "Description")
    .dropDuplicates(["MOCODE"])
)


# Join counts with descriptions
joined_df= mocode_counts_df.join(mo_codes,
    on="MOCODE",
    how="inner"
)
mocode_with_desc_df = joined_df.orderBy(col("total_count").desc())
mocode_with_desc_df.count()

# --- Show top 20 MOCODES with descriptions ---
end_time=time.time();
time_elapsed=end_time-start_time;
print(time_elapsed);
mocode_with_desc_df.show(20,truncate=False)
mocode_with_desc_df.explain()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

14.85282301902771
+------+-----------+--------------------------------------------------------------------------------+
|MOCODE|total_count|Description                                                                     |
+------+-----------+--------------------------------------------------------------------------------+
|0344  |1002900    |Removes vict property                                                           |
|1822  |548422     |Stranger                                                                        |
|0416  |404773     |Hit-Hit w/ weapon                                                               |
|0329  |377536     |Vandalized                                                                      |
|0913  |278618     |Victim knew Suspect                                                             |
|2000  |256188     |Domestic violence                                                               |
|1300  |219082     |Vehicle involved                            

In [5]:
# --- Setup & Read Data ---

start_time=time.time();

crimes_schema = StructType([
    StructField("DR_NO", StringType()),
    StructField("DateRptd", DateType()),
    StructField("DATEOCC", StringType()),
    StructField("TIMEOCC", StringType()),
    StructField("AREA", StringType()),
    StructField("AREANAME", StringType()),
    StructField("RptDistNo", StringType()),
    StructField("Part", IntegerType()),
    StructField("CrmCd", StringType()),
    StructField("Crm Cd Desc", StringType()),
    StructField("Mocodes", StringType()),
    StructField("Vict Age", StringType()),
    StructField("VictSex", StringType()),
    StructField("VictDescent", StringType()),
    StructField("PremisCd", StringType()),
    StructField("PremisDesc", StringType()),
    StructField("WeaponUsedCd", StringType()),
    StructField("WeaponDesc", StringType()),
    StructField("Status", StringType()),
    StructField("Status Desc", StringType()),
    StructField("CrmCd1", StringType()),
    StructField("CrmCd2", StringType()),
    StructField("CrmCd3", StringType()),
    StructField("CrmCd4", StringType()),
    StructField("LOCATION", StringType()),
    StructField("CrossStreet", StringType()),
    StructField("LAT", FloatType()),
    StructField("LON", FloatType()),
])

crime_df = spark.read.csv(
    crime_paths,
    header=True,
    schema=crimes_schema,
    timestampFormat='yyyy MMM dd hh:mm:ss a',
    quote='"',
    escape='"'
)

mocodes_df = crime_df.select(
    explode(
        split(
            regexp_replace(col("MOCODES"), r"[,\t;]", " "),
            " "
        )
    ).alias("MOCODE")
).filter(
    col("MOCODE").isNotNull() & (trim(col("MOCODE")) != "")
)

mocode_counts_df = mocodes_df.groupBy("MOCODE").count().withColumnRenamed("count", "total_count")

mocode_raw = spark.read.text(mo_path)

mo_codes = (
    mocode_raw
    .withColumn("MOCODE", split(col("value"), " ")[0])
    # Extract Description as before
    .withColumn("Description", trim(regexp_replace(col("value"), r"^\S+\s+", "")))
    
    .filter(col("MOCODE").isNotNull()) 
    .filter(col("MOCODE") != "") 
    
    # Handle the Description edge case (if you still want "Unknown" for a missing description)
    .withColumn("Description", when(col("Description") == "", lit("Unknown")).otherwise(col("Description")))
    
    # Select and Deduplicate as before
    .select("MOCODE", "Description")
    .dropDuplicates(["MOCODE"])
)
# --- Join with Broadcast ---
joined_df = mocode_counts_df.join(broadcast(mo_codes), on="MOCODE", how="inner")
mocode_with_desc_df = joined_df.orderBy(col("total_count").desc())

mocode_with_desc_df.count()
elapsed = time.time() - start_time
print(f"BROADCAST join time: {elapsed:.4f} sec")
print("BROADCAST Join Top 20:")
mocode_with_desc_df.show(20, truncate=False)
mocode_with_desc_df.explain()


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

BROADCAST join time: 12.5029 sec
BROADCAST Join Top 20:
+------+-----------+--------------------------------------------------------------------------------+
|MOCODE|total_count|Description                                                                     |
+------+-----------+--------------------------------------------------------------------------------+
|0344  |1002900    |Removes vict property                                                           |
|1822  |548422     |Stranger                                                                        |
|0416  |404773     |Hit-Hit w/ weapon                                                               |
|0329  |377536     |Vandalized                                                                      |
|0913  |278618     |Victim knew Suspect                                                             |
|2000  |256188     |Domestic violence                                                               |
|1300  |219082     |Vehicl

In [4]:
# --- Setup & Read Data ---

start_time=time.time();


crimes_schema = StructType([
    StructField("DR_NO", StringType()),
    StructField("DateRptd", DateType()),
    StructField("DATEOCC", StringType()),
    StructField("TIMEOCC", StringType()),
    StructField("AREA", StringType()),
    StructField("AREANAME", StringType()),
    StructField("RptDistNo", StringType()),
    StructField("Part", IntegerType()),
    StructField("CrmCd", StringType()),
    StructField("Crm Cd Desc", StringType()),
    StructField("Mocodes", StringType()),
    StructField("Vict Age", StringType()),
    StructField("VictSex", StringType()),
    StructField("VictDescent", StringType()),
    StructField("PremisCd", StringType()),
    StructField("PremisDesc", StringType()),
    StructField("WeaponUsedCd", StringType()),
    StructField("WeaponDesc", StringType()),
    StructField("Status", StringType()),
    StructField("Status Desc", StringType()),
    StructField("CrmCd1", StringType()),
    StructField("CrmCd2", StringType()),
    StructField("CrmCd3", StringType()),
    StructField("CrmCd4", StringType()),
    StructField("LOCATION", StringType()),
    StructField("CrossStreet", StringType()),
    StructField("LAT", FloatType()),
    StructField("LON", FloatType()),
])

crime_df = spark.read.csv(
    crime_paths,
    header=True,
    schema=crimes_schema,
    timestampFormat='yyyy MMM dd hh:mm:ss a',
    quote='"',
    escape='"'
)

mocodes_df = crime_df.select(
    explode(
        split(
            regexp_replace(col("MOCODES"), r"[,\t;]", " "),
            " "
        )
    ).alias("MOCODE")
).filter(
    col("MOCODE").isNotNull() & (trim(col("MOCODE")) != "")
)

mocode_counts_df = mocodes_df.groupBy("MOCODE").count().withColumnRenamed("count", "total_count")

mocode_raw = spark.read.text(mo_path)

mo_codes = (
    mocode_raw
    .withColumn("MOCODE", split(col("value"), " ")[0])
    # Extract Description as before
    .withColumn("Description", trim(regexp_replace(col("value"), r"^\S+\s+", "")))
    
    # Filter out rows where MOCODE is null or an empty string
    .filter(col("MOCODE").isNotNull()) 
    .filter(col("MOCODE") != "") 
    
    # Handle the Description edge case (if you still want "Unknown" for a missing description)
    .withColumn("Description", when(col("Description") == "", lit("Unknown")).otherwise(col("Description")))
    
    # Select and Deduplicate as before
    .select("MOCODE", "Description")
    .dropDuplicates(["MOCODE"])
)

# --- Join with Merge ---
joined_df = mocode_counts_df.join(mo_codes.hint("merge"), on="MOCODE", how="inner")
mocode_with_desc_df = joined_df.orderBy(col("total_count").desc())
mocode_with_desc_df.count()
elapsed = time.time() - start_time
print(f"MERGE join time: {elapsed:.4f} sec")
print("MERGE Join Top 20:")
mocode_with_desc_df.show(20, truncate=False)
mocode_with_desc_df.explain()


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

MERGE join time: 18.3418 sec
MERGE Join Top 20:
+------+-----------+--------------------------------------------------------------------------------+
|MOCODE|total_count|Description                                                                     |
+------+-----------+--------------------------------------------------------------------------------+
|0344  |1002900    |Removes vict property                                                           |
|1822  |548422     |Stranger                                                                        |
|0416  |404773     |Hit-Hit w/ weapon                                                               |
|0329  |377536     |Vandalized                                                                      |
|0913  |278618     |Victim knew Suspect                                                             |
|2000  |256188     |Domestic violence                                                               |
|1300  |219082     |Vehicle involv

In [3]:

# --- Setup & Read Data ---

start_time=time.time();


crimes_schema = StructType([
    StructField("DR_NO", StringType()),
    StructField("DateRptd", DateType()),
    StructField("DATEOCC", StringType()),
    StructField("TIMEOCC", StringType()),
    StructField("AREA", StringType()),
    StructField("AREANAME", StringType()),
    StructField("RptDistNo", StringType()),
    StructField("Part", IntegerType()),
    StructField("CrmCd", StringType()),
    StructField("Crm Cd Desc", StringType()),
    StructField("Mocodes", StringType()),
    StructField("Vict Age", StringType()),
    StructField("VictSex", StringType()),
    StructField("VictDescent", StringType()),
    StructField("PremisCd", StringType()),
    StructField("PremisDesc", StringType()),
    StructField("WeaponUsedCd", StringType()),
    StructField("WeaponDesc", StringType()),
    StructField("Status", StringType()),
    StructField("Status Desc", StringType()),
    StructField("CrmCd1", StringType()),
    StructField("CrmCd2", StringType()),
    StructField("CrmCd3", StringType()),
    StructField("CrmCd4", StringType()),
    StructField("LOCATION", StringType()),
    StructField("CrossStreet", StringType()),
    StructField("LAT", FloatType()),
    StructField("LON", FloatType()),
])


crime_df = spark.read.csv(
    crime_paths,
    header=True,
    schema=crimes_schema,
    timestampFormat='yyyy MMM dd hh:mm:ss a',
    quote='"',
    escape='"'
)

mocodes_df = crime_df.select(
    explode(
        split(
            regexp_replace(col("MOCODES"), r"[,\t;]", " "),
            " "
        )
    ).alias("MOCODE")
).filter(
    col("MOCODE").isNotNull() & (trim(col("MOCODE")) != "")
)

mocode_counts_df = mocodes_df.groupBy("MOCODE").count().withColumnRenamed("count", "total_count")

mocode_raw = spark.read.text(mo_path)

mo_codes = (
    mocode_raw
    .withColumn("MOCODE", split(col("value"), " ")[0])
    # Extract Description as before
    .withColumn("Description", trim(regexp_replace(col("value"), r"^\S+\s+", "")))
    
    .filter(col("MOCODE").isNotNull()) 
    .filter(col("MOCODE") != "") 
    
    # Handle the Description edge case (if you still want "Unknown" for a missing description)
    .withColumn("Description", when(col("Description") == "", lit("Unknown")).otherwise(col("Description")))
    
    # Select and Deduplicate as before
    .select("MOCODE", "Description")
    .dropDuplicates(["MOCODE"])
)
# --- Join with Shuffle Hash ---
joined_df = mocode_counts_df.join(mo_codes.hint("shuffle_hash"), on="MOCODE", how="inner")
mocode_with_desc_df = joined_df.orderBy(col("total_count").desc())
mocode_with_desc_df.count()

elapsed = time.time() - start_time
print("SHUFFLE_HASH Join Top 20:")
mocode_with_desc_df.show(20, truncate=False)
print(f"SHUFFLE_HASH join time: {elapsed:.4f} sec")
mocode_with_desc_df.explain()


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SHUFFLE_HASH Join Top 20:
+------+-----------+--------------------------------------------------------------------------------+
|MOCODE|total_count|Description                                                                     |
+------+-----------+--------------------------------------------------------------------------------+
|0344  |1002900    |Removes vict property                                                           |
|1822  |548422     |Stranger                                                                        |
|0416  |404773     |Hit-Hit w/ weapon                                                               |
|0329  |377536     |Vandalized                                                                      |
|0913  |278618     |Victim knew Suspect                                                             |
|2000  |256188     |Domestic violence                                                               |
|1300  |219082     |Vehicle involved                    

In [4]:
# --- Setup & Read Data ---

start_time=time.time();

crimes_schema = StructType([
    StructField("DR_NO", StringType()),
    StructField("DateRptd", DateType()),
    StructField("DATEOCC", StringType()),
    StructField("TIMEOCC", StringType()),
    StructField("AREA", StringType()),
    StructField("AREANAME", StringType()),
    StructField("RptDistNo", StringType()),
    StructField("Part", IntegerType()),
    StructField("CrmCd", StringType()),
    StructField("Crm Cd Desc", StringType()),
    StructField("Mocodes", StringType()),
    StructField("Vict Age", StringType()),
    StructField("VictSex", StringType()),
    StructField("VictDescent", StringType()),
    StructField("PremisCd", StringType()),
    StructField("PremisDesc", StringType()),
    StructField("WeaponUsedCd", StringType()),
    StructField("WeaponDesc", StringType()),
    StructField("Status", StringType()),
    StructField("Status Desc", StringType()),
    StructField("CrmCd1", StringType()),
    StructField("CrmCd2", StringType()),
    StructField("CrmCd3", StringType()),
    StructField("CrmCd4", StringType()),
    StructField("LOCATION", StringType()),
    StructField("CrossStreet", StringType()),
    StructField("LAT", FloatType()),
    StructField("LON", FloatType()),
])

crime_df = spark.read.csv(
    crime_paths,
    header=True,
    schema=crimes_schema,
    timestampFormat='yyyy MMM dd hh:mm:ss a',
    quote='"',
    escape='"'
)

mocodes_df = crime_df.select(
    explode(
        split(
            regexp_replace(col("MOCODES"), r"[,\t;]", " "),
            " "
        )
    ).alias("MOCODE")
).filter(
    col("MOCODE").isNotNull() & (trim(col("MOCODE")) != "")
)

mocode_counts_df = mocodes_df.groupBy("MOCODE").count().withColumnRenamed("count", "total_count")

mocode_raw = spark.read.text(mo_path)

mo_codes = (
    mocode_raw
    .withColumn("MOCODE", split(col("value"), " ")[0])
    # Extract Description as before
    .withColumn("Description", trim(regexp_replace(col("value"), r"^\S+\s+", "")))
    
    .filter(col("MOCODE").isNotNull()) 
    .filter(col("MOCODE") != "") 
    
    # Handle the Description edge case (if you still want "Unknown" for a missing description)
    .withColumn("Description", when(col("Description") == "", lit("Unknown")).otherwise(col("Description")))
    
    # Select and Deduplicate as before
    .select("MOCODE", "Description")
    .dropDuplicates(["MOCODE"])
)

# --- Join with Shuffle Replicate NL ---
joined_df = mocode_counts_df.join(mo_codes.hint("shuffle_replicate_nl"), on="MOCODE", how="inner")
mocode_with_desc_df = joined_df.orderBy(col("total_count").desc())
mocode_with_desc_df.count()
elapsed = time.time() - start_time
print("SHUFFLE_REPLICATE_NL Join Top 20:")
mocode_with_desc_df.show(20, truncate=False)
print(f"SHUFFLE_REPLICATE_NL join time: {elapsed:.4f} sec")
mocode_with_desc_df.explain()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SHUFFLE_REPLICATE_NL Join Top 20:
+------+-----------+--------------------------------------------------------------------------------+
|MOCODE|total_count|Description                                                                     |
+------+-----------+--------------------------------------------------------------------------------+
|0344  |1002900    |Removes vict property                                                           |
|1822  |548422     |Stranger                                                                        |
|0416  |404773     |Hit-Hit w/ weapon                                                               |
|0329  |377536     |Vandalized                                                                      |
|0913  |278618     |Victim knew Suspect                                                             |
|2000  |256188     |Domestic violence                                                               |
|1300  |219082     |Vehicle involved            

In [3]:
import time
import csv
from io import StringIO
from pyspark import SparkContext

start = time.time()

# Read CSV files as RDD ,every line is a string
crime_rdd = sc.textFile(
    "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2010_2019.csv," +
    "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv"
)

# CSV line parser,handle commas,quotes,escapes
def parse_csv_line(line):
    return next(csv.reader([line]))  #iterator

#each string a list of fields
parsed_rdd = crime_rdd.map(parse_csv_line)

#  Extract header and name for column and dictionary to refer to its column with its name
header = parsed_rdd.first()
col_index = {name: idx for idx, name in enumerate(header)}

# Now you can do: col_index["Mocodes"]

# Remove header row 
data_rdd = parsed_rdd.filter(lambda row: row != header)

#  Extract and explode MOCODES 
cleaned_data_rdd = data_rdd.filter(
    lambda row: (
        len(row) > col_index["Mocodes"] and   #ruined columns
        row[col_index["Mocodes"]] is not None and
        row[col_index["Mocodes"]].strip() != "" and  #after removing blanksandwhitespaces if there are still blanks
        row[col_index["Mocodes"]].strip().upper() != "NULL"
    )
)
#flatmap:each mocode an element of the RDD

mocode_rdd = cleaned_data_rdd.flatMap(
    lambda row: [
        code.strip() 
        for code in row[col_index["Mocodes"]].split()  # split for multiple codes
        if code.strip() != ""  # απορρίπτουμε κενά στοιχεία
    ]
)


#  Load descriptions 
mo_dict_rdd = sc.textFile("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/MO_codes.txt")
mo_dict = (
    mo_dict_rdd
    .map(lambda line: line.strip())
    .filter(lambda line: line != "")
    .map(lambda line: line.split(" ", 1))  #seperate the line in the first space, to have mocode and its description
    .filter(lambda parts: len(parts) == 2)  #avoid ruined lines with no description
    .map(lambda parts: (parts[0].strip(), parts[1].strip()))  #tuple:(mocode,description)
    .collectAsMap() #collect RDD as Python dictionary
)

# Count occurrences , each mocode is a tuple(mocode,1) 
mocode_counts = mocode_rdd.map(lambda code: (code, 1)).reduceByKey(lambda a, b: a + b)

#  Attach descriptions 
mocode_with_desc = mocode_counts.map(
    lambda x: (x[0], mo_dict.get(x[0], "Unknown"), x[1])
)
sorted_rdd = mocode_with_desc.sortBy(lambda x: x[2], ascending=False)
# Trigger execution
total = sorted_rdd.count()
end = time.time()
top20 = sorted_rdd.take(20)
# Show results
for code, desc, count in top20:
    print(f"{code} - {desc}: {count}")

print(f"\nTotal rows: {total}, Time = {end - start:.2f} sec\n")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

0344 - Removes vict property: 1002900
1822 - Stranger: 548422
0416 - Hit-Hit w/ weapon: 404773
0329 - Vandalized: 377536
0913 - Victim knew Suspect: 278618
2000 - Domestic violence: 256188
1300 - Vehicle involved: 219082
0400 - Force used: 213165
1402 - Evidence Booked (any crime): 177470
1609 - Smashed: 131229
1309 - Susp uses vehicle: 122108
1202 - Victim was aged (60 & over) or blind/physically disabled/unable to care for self: 120238
0325 - Took merchandise: 120159
1814 - Susp is/was current/former boyfriend/girlfriend: 118073
0444 - Pushed: 116763
1501 - Other MO (see rpt): 115589
1307 - Breaks window: 113609
0334 - Brandishes weapon: 105665
2004 - Suspect is homeless/transient: 93426
0432 - Intimidation: 83562

Total rows: 774, Time = 30.16 sec

Query 4

1 core, 2 GB memory

In [7]:
%%configure -f
{
    "conf":{
        "spark.executor.instances": "2",
        "spark.executor.memory": "2g",
        "spark.executor.cores": "1"
    }
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1527,application_1765289937462_1513,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1495,application_1765289937462_1481,pyspark,idle,Link,Link,,
1512,application_1765289937462_1498,pyspark,idle,Link,Link,,
1517,application_1765289937462_1503,pyspark,idle,Link,Link,,
1522,application_1765289937462_1508,pyspark,idle,Link,Link,,
1524,application_1765289937462_1510,pyspark,idle,Link,Link,,
1527,application_1765289937462_1513,pyspark,idle,Link,Link,,✔
1528,application_1765289937462_1514,pyspark,starting,Link,Link,,


In [8]:
# Paths for csv
fcrime_2010_2019 = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2010_2019.csv"
fcrime_2020_present = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv"
fstations = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Police_Stations.csv"
fincome = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_income_2021.csv"
fcodes = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/RE_codes.csv"

# Paths for GeoJSON
fgeo = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Census_Blocks_2020.geojson"
fgeofields = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Census_Blocks_2020_fields.csv"

# Imports again

from pyspark.sql.types import StructField, StructType, StringType, IntegerType, FloatType, DateType
from pyspark.sql.functions import year, when, count, sum, col, row_number, to_timestamp, regexp_replace, lit, expr, avg, round
from pyspark.sql.window import Window
from sedona.spark import *
from pyspark.sql import SparkSession
import time

# Crimes table

crimes_schema = StructType([
    StructField("DR_NO", StringType()),
    StructField("DateRptd", DateType()),
    StructField("DATEOCC", StringType()),
    StructField("TIMEOCC", StringType()),
    StructField("AREA", StringType()),
    StructField("AREANAME", StringType()),
    StructField("RptDistNo", StringType()),
    StructField("Part", IntegerType()),
    StructField("CrmCd", StringType()),
    StructField("Crm Cd Desc", StringType()),
    StructField("Mocodes", StringType()),
    StructField("Vict Age", StringType()),
    StructField("VictSex", StringType()),
    StructField("VictDescent", StringType()),
    StructField("PremisCd", StringType()),
    StructField("PremisDesc", StringType()),
    StructField("WeaponUsedCd", StringType()),
    StructField("WeaponDesc", StringType()),
    StructField("Status", StringType()),
    StructField("Status Desc", StringType()),
    StructField("CrmCd1", StringType()),
    StructField("CrmCd2", StringType()),
    StructField("CrmCd3", StringType()),
    StructField("CrmCd4", StringType()),
    StructField("LOCATION", StringType()),
    StructField("CrossStreet", StringType()),
    StructField("LAT", FloatType()),
    StructField("LON", FloatType()),
])

crimes_df_10_19 = spark.read.csv(fcrime_2010_2019, header=True, schema=crimes_schema, dateFormat='MM/dd/yyyy hh:mm:ss a')
crimes_df_20_present = spark.read.csv(fcrime_2020_present, header=True, schema=crimes_schema, dateFormat='MM/dd/yyyy hh:mm:ss a')

crimes_df_all = crimes_df_10_19.union(crimes_df_20_present)
print(f"total lines: {crimes_df_all.count()}")

stations_schema = StructType([
    StructField("X", FloatType()),
    StructField("Y", FloatType()),
    StructField("FID", IntegerType()),
    StructField("DIVISION", StringType()),
    StructField("LOCATION", StringType()),
    StructField("PREC", IntegerType())
])

stations_df = spark.read.csv(fstations, header=True, schema=stations_schema)

sedona = SedonaContext.create(spark)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

total lines: 3138128

In [9]:
start_time = time.time()

# Δημιουργία ID για το partition (χρήσιμο για το window function)
crimes_df_prep = crimes_df_all.withColumn("crime_id", expr("uuid()"))

crimes_filtered = crimes_df_prep.filter(
    ~((col("LAT") == 0.0) & (col("LON") == 0.0) | col("LAT").isNull() | col("LON").isNull())
)

# Μετατροπή των συντεταγμένων σε Sedona Point Geometry
crimes_with_geom = crimes_filtered.withColumn(
    "crime_geom",
    ST_Point(col("LON"), col("LAT"))
)

pd_with_geom = stations_df.withColumn(
    "pd_geom",
    ST_Point(col("X"), col("Y"))
).select(
    col("DIVISION").alias("division"),
    col("pd_geom")
)

# Κάθε έγκλημα συνδέεται με κάθε αστυνομικό τμήμα
cross_joined_df = crimes_with_geom.crossJoin(pd_with_geom)

# Υπολογισμός της απόστασης 
df_with_distance = cross_joined_df.withColumn(
    "distance",
    (ST_DistanceSphere(col("crime_geom"), col("pd_geom"))/1000)
)

# Διαμέριση ανά crime_id και ταξινόμηση ανά distance
window_spec = Window.partitionBy("crime_id").orderBy(col("distance").asc())

df_ranked = df_with_distance.withColumn(
    "rank",
    row_number().over(window_spec)
)

closest_crimes_df = df_ranked.filter(col("rank") == 1)

# Τελικός υπολογισμός: Αριθμός εγκλημάτων και μέση απόσταση ανά τμήμα
final_result_df = closest_crimes_df.groupBy(col("division")).agg(
    count("*").alias("#"),
    round(avg(col("distance")), 3).alias("average_distance")
)

final_sorted_result = final_result_df.orderBy(col("#").desc())

print("\n--- Spark Explain Output ---")
final_sorted_result.explain(True)

final_sorted_result.show(final_sorted_result.count(), truncate=False)

end_time = time.time()
time = end_time - start_time
print(f"Execution time : {time:.4f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…


--- Spark Explain Output ---
== Parsed Logical Plan ==
'Sort ['# DESC NULLS LAST], true
+- Aggregate [division#279], [division#279, count(1) AS ##419L, round(avg(distance#314), 3) AS average_distance#421]
   +- Filter (rank#349 = 1)
      +- Project [DR_NO#0, DateRptd#1, DATEOCC#2, TIMEOCC#3, AREA#4, AREANAME#5, RptDistNo#6, Part#7, CrmCd#8, Crm Cd Desc#9, Mocodes#10, Vict Age#11, VictSex#12, VictDescent#13, PremisCd#14, PremisDesc#15, WeaponUsedCd#16, WeaponDesc#17, Status#18, Status Desc#19, CrmCd1#20, CrmCd2#21, CrmCd3#22, CrmCd4#23, ... 10 more fields]
         +- Project [DR_NO#0, DateRptd#1, DATEOCC#2, TIMEOCC#3, AREA#4, AREANAME#5, RptDistNo#6, Part#7, CrmCd#8, Crm Cd Desc#9, Mocodes#10, Vict Age#11, VictSex#12, VictDescent#13, PremisCd#14, PremisDesc#15, WeaponUsedCd#16, WeaponDesc#17, Status#18, Status Desc#19, CrmCd1#20, CrmCd2#21, CrmCd3#22, CrmCd4#23, ... 11 more fields]
            +- Window [row_number() windowspecdefinition(crime_id#209, distance#314 ASC NULLS FIRST, sp

 2 cores, 4GB memory

In [35]:
%%configure -f
{
    "conf":{
        "spark.executor.instances": "2",
        "spark.executor.memory": "4g",
        "spark.executor.cores": "2"
    }
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
595,application_1765289937462_0588,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
550,application_1765289937462_0543,pyspark,idle,Link,Link,,
558,application_1765289937462_0551,pyspark,idle,Link,Link,,
571,application_1765289937462_0564,pyspark,idle,Link,Link,,
573,application_1765289937462_0566,pyspark,idle,Link,Link,,
575,application_1765289937462_0568,pyspark,idle,Link,Link,,
576,application_1765289937462_0569,pyspark,idle,Link,Link,,
584,application_1765289937462_0577,pyspark,idle,Link,Link,,
589,application_1765289937462_0582,pyspark,idle,Link,Link,,
592,application_1765289937462_0585,pyspark,busy,Link,Link,,
593,application_1765289937462_0586,pyspark,idle,Link,Link,,


In [36]:
# Paths for csv
fcrime_2010_2019 = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2010_2019.csv"
fcrime_2020_present = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv"
fstations = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Police_Stations.csv"
fincome = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_income_2021.csv"
fcodes = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/RE_codes.csv"

# Paths for GeoJSON
fgeo = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Census_Blocks_2020.geojson"
fgeofields = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Census_Blocks_2020_fields.csv"

# Imports again

from pyspark.sql.types import StructField, StructType, StringType, IntegerType, FloatType, DateType
from pyspark.sql.functions import year, when, count, sum, col, row_number, to_timestamp, regexp_replace, lit, expr, avg, round
from pyspark.sql.window import Window
from sedona.spark import *
from pyspark.sql import SparkSession
import time

# Crimes table

crimes_schema = StructType([
    StructField("DR_NO", StringType()),
    StructField("DateRptd", DateType()),
    StructField("DATEOCC", StringType()),
    StructField("TIMEOCC", StringType()),
    StructField("AREA", StringType()),
    StructField("AREANAME", StringType()),
    StructField("RptDistNo", StringType()),
    StructField("Part", IntegerType()),
    StructField("CrmCd", StringType()),
    StructField("Crm Cd Desc", StringType()),
    StructField("Mocodes", StringType()),
    StructField("Vict Age", StringType()),
    StructField("VictSex", StringType()),
    StructField("VictDescent", StringType()),
    StructField("PremisCd", StringType()),
    StructField("PremisDesc", StringType()),
    StructField("WeaponUsedCd", StringType()),
    StructField("WeaponDesc", StringType()),
    StructField("Status", StringType()),
    StructField("Status Desc", StringType()),
    StructField("CrmCd1", StringType()),
    StructField("CrmCd2", StringType()),
    StructField("CrmCd3", StringType()),
    StructField("CrmCd4", StringType()),
    StructField("LOCATION", StringType()),
    StructField("CrossStreet", StringType()),
    StructField("LAT", FloatType()),
    StructField("LON", FloatType()),
])

crimes_df_10_19 = spark.read.csv(fcrime_2010_2019, header=True, schema=crimes_schema, dateFormat='MM/dd/yyyy hh:mm:ss a')
crimes_df_20_present = spark.read.csv(fcrime_2020_present, header=True, schema=crimes_schema, dateFormat='MM/dd/yyyy hh:mm:ss a')

crimes_df_all = crimes_df_10_19.union(crimes_df_20_present)
print(f"total lines: {crimes_df_all.count()}")

stations_schema = StructType([
    StructField("X", FloatType()),
    StructField("Y", FloatType()),
    StructField("FID", IntegerType()),
    StructField("DIVISION", StringType()),
    StructField("LOCATION", StringType()),
    StructField("PREC", IntegerType())
])

stations_df = spark.read.csv(fstations, header=True, schema=stations_schema)

sedona = SedonaContext.create(spark)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

total lines: 3138128

In [37]:
start_time = time.time()

# Δημιουργία ID για το partition (χρήσιμο για το window function)
crimes_df_prep = crimes_df_all.withColumn("crime_id", expr("uuid()"))

crimes_filtered = crimes_df_prep.filter(
    ~((col("LAT") == 0.0) & (col("LON") == 0.0) | col("LAT").isNull() | col("LON").isNull())
)

# Μετατροπή των συντεταγμένων σε Sedona Point Geometry
crimes_with_geom = crimes_filtered.withColumn(
    "crime_geom",
    ST_Point(col("LON"), col("LAT"))
)

pd_with_geom = stations_df.withColumn(
    "pd_geom",
    ST_Point(col("X"), col("Y"))
).select(
    col("DIVISION").alias("division"),
    col("pd_geom")
)

# Κάθε έγκλημα συνδέεται με κάθε αστυνομικό τμήμα
cross_joined_df = crimes_with_geom.crossJoin(pd_with_geom)

# Υπολογισμός της απόστασης 
df_with_distance = cross_joined_df.withColumn(
    "distance",
    (ST_DistanceSphere(col("crime_geom"), col("pd_geom"))/1000)
)

# Διαμέριση ανά crime_id και ταξινόμηση ανά distance
window_spec = Window.partitionBy("crime_id").orderBy(col("distance").asc())

df_ranked = df_with_distance.withColumn(
    "rank",
    row_number().over(window_spec)
)

closest_crimes_df = df_ranked.filter(col("rank") == 1)

# Τελικός υπολογισμός: Αριθμός εγκλημάτων και μέση απόσταση ανά τμήμα
final_result_df = closest_crimes_df.groupBy(col("division")).agg(
    count("*").alias("#"),
    round(avg(col("distance")), 3).alias("average_distance")
)

final_sorted_result = final_result_df.orderBy(col("#").desc())

print("\n--- Spark Explain Output ---")
final_sorted_result.explain(True)

final_sorted_result.show(final_sorted_result.count(), truncate=False)

end_time = time.time()
time = end_time - start_time
print(f"Execution time : {time:.4f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…


--- Spark Explain Output ---
== Parsed Logical Plan ==
'Sort ['# DESC NULLS LAST], true
+- Aggregate [division#279], [division#279, count(1) AS ##419L, round(avg(distance#314), 3) AS average_distance#421]
   +- Filter (rank#349 = 1)
      +- Project [DR_NO#0, DateRptd#1, DATEOCC#2, TIMEOCC#3, AREA#4, AREANAME#5, RptDistNo#6, Part#7, CrmCd#8, Crm Cd Desc#9, Mocodes#10, Vict Age#11, VictSex#12, VictDescent#13, PremisCd#14, PremisDesc#15, WeaponUsedCd#16, WeaponDesc#17, Status#18, Status Desc#19, CrmCd1#20, CrmCd2#21, CrmCd3#22, CrmCd4#23, ... 10 more fields]
         +- Project [DR_NO#0, DateRptd#1, DATEOCC#2, TIMEOCC#3, AREA#4, AREANAME#5, RptDistNo#6, Part#7, CrmCd#8, Crm Cd Desc#9, Mocodes#10, Vict Age#11, VictSex#12, VictDescent#13, PremisCd#14, PremisDesc#15, WeaponUsedCd#16, WeaponDesc#17, Status#18, Status Desc#19, CrmCd1#20, CrmCd2#21, CrmCd3#22, CrmCd4#23, ... 11 more fields]
            +- Window [row_number() windowspecdefinition(crime_id#209, distance#314 ASC NULLS FIRST, sp

4 cores, 8GB memory

In [10]:
%%configure -f
{
    "conf":{
        "spark.executor.instances": "2",
        "spark.executor.memory": "8g",
        "spark.executor.cores": "4"
    }
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1535,application_1765289937462_1521,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1512,application_1765289937462_1498,pyspark,idle,Link,Link,,
1517,application_1765289937462_1503,pyspark,idle,Link,Link,,
1522,application_1765289937462_1508,pyspark,idle,Link,Link,,
1531,application_1765289937462_1517,pyspark,idle,Link,Link,,
1532,application_1765289937462_1518,pyspark,idle,Link,Link,,
1534,application_1765289937462_1520,pyspark,idle,Link,Link,,
1535,application_1765289937462_1521,pyspark,idle,Link,Link,,✔


In [11]:
# Paths for csv
fcrime_2010_2019 = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2010_2019.csv"
fcrime_2020_present = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv"
fstations = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Police_Stations.csv"
fincome = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_income_2021.csv"
fcodes = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/RE_codes.csv"

# Paths for GeoJSON
fgeo = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Census_Blocks_2020.geojson"
fgeofields = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Census_Blocks_2020_fields.csv"

# Imports again

from pyspark.sql.types import StructField, StructType, StringType, IntegerType, FloatType, DateType
from pyspark.sql.functions import year, when, count, sum, col, row_number, to_timestamp, regexp_replace, lit, expr, avg, round
from pyspark.sql.window import Window
from sedona.spark import *
from pyspark.sql import SparkSession
import time

# Crimes table

crimes_schema = StructType([
    StructField("DR_NO", StringType()),
    StructField("DateRptd", DateType()),
    StructField("DATEOCC", StringType()),
    StructField("TIMEOCC", StringType()),
    StructField("AREA", StringType()),
    StructField("AREANAME", StringType()),
    StructField("RptDistNo", StringType()),
    StructField("Part", IntegerType()),
    StructField("CrmCd", StringType()),
    StructField("Crm Cd Desc", StringType()),
    StructField("Mocodes", StringType()),
    StructField("Vict Age", StringType()),
    StructField("VictSex", StringType()),
    StructField("VictDescent", StringType()),
    StructField("PremisCd", StringType()),
    StructField("PremisDesc", StringType()),
    StructField("WeaponUsedCd", StringType()),
    StructField("WeaponDesc", StringType()),
    StructField("Status", StringType()),
    StructField("Status Desc", StringType()),
    StructField("CrmCd1", StringType()),
    StructField("CrmCd2", StringType()),
    StructField("CrmCd3", StringType()),
    StructField("CrmCd4", StringType()),
    StructField("LOCATION", StringType()),
    StructField("CrossStreet", StringType()),
    StructField("LAT", FloatType()),
    StructField("LON", FloatType()),
])

crimes_df_10_19 = spark.read.csv(fcrime_2010_2019, header=True, schema=crimes_schema, dateFormat='MM/dd/yyyy hh:mm:ss a')
crimes_df_20_present = spark.read.csv(fcrime_2020_present, header=True, schema=crimes_schema, dateFormat='MM/dd/yyyy hh:mm:ss a')

crimes_df_all = crimes_df_10_19.union(crimes_df_20_present)
print(f"total lines: {crimes_df_all.count()}")

stations_schema = StructType([
    StructField("X", FloatType()),
    StructField("Y", FloatType()),
    StructField("FID", IntegerType()),
    StructField("DIVISION", StringType()),
    StructField("LOCATION", StringType()),
    StructField("PREC", IntegerType())
])

stations_df = spark.read.csv(fstations, header=True, schema=stations_schema)

sedona = SedonaContext.create(spark)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

total lines: 3138128

In [12]:
start_time = time.time()

# Δημιουργία ID για το partition (χρήσιμο για το window function)
crimes_df_prep = crimes_df_all.withColumn("crime_id", expr("uuid()"))

crimes_filtered = crimes_df_prep.filter(
    ~((col("LAT") == 0.0) & (col("LON") == 0.0) | col("LAT").isNull() | col("LON").isNull())
)

# Μετατροπή των συντεταγμένων σε Sedona Point Geometry
crimes_with_geom = crimes_filtered.withColumn(
    "crime_geom",
    ST_Point(col("LON"), col("LAT"))
)

pd_with_geom = stations_df.withColumn(
    "pd_geom",
    ST_Point(col("X"), col("Y"))
).select(
    col("DIVISION").alias("division"),
    col("pd_geom")
)

# Κάθε έγκλημα συνδέεται με κάθε αστυνομικό τμήμα
cross_joined_df = crimes_with_geom.crossJoin(pd_with_geom)

# Υπολογισμός της απόστασης 
df_with_distance = cross_joined_df.withColumn(
    "distance",
    (ST_DistanceSphere(col("crime_geom"), col("pd_geom"))/1000)
)

# Διαμέριση ανά crime_id και ταξινόμηση ανά distance
window_spec = Window.partitionBy("crime_id").orderBy(col("distance").asc())

df_ranked = df_with_distance.withColumn(
    "rank",
    row_number().over(window_spec)
)

closest_crimes_df = df_ranked.filter(col("rank") == 1)

# Τελικός υπολογισμός: Αριθμός εγκλημάτων και μέση απόσταση ανά τμήμα
final_result_df = closest_crimes_df.groupBy(col("division")).agg(
    count("*").alias("#"),
    round(avg(col("distance")), 3).alias("average_distance")
)

final_sorted_result = final_result_df.orderBy(col("#").desc())

print("\n--- Spark Explain Output ---")
final_sorted_result.explain(True)

final_sorted_result.show(final_sorted_result.count(), truncate=False)

end_time = time.time()
time = end_time - start_time
print(f"Execution time : {time:.4f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…


--- Spark Explain Output ---
== Parsed Logical Plan ==
'Sort ['# DESC NULLS LAST], true
+- Aggregate [division#279], [division#279, count(1) AS ##419L, round(avg(distance#314), 3) AS average_distance#421]
   +- Filter (rank#349 = 1)
      +- Project [DR_NO#0, DateRptd#1, DATEOCC#2, TIMEOCC#3, AREA#4, AREANAME#5, RptDistNo#6, Part#7, CrmCd#8, Crm Cd Desc#9, Mocodes#10, Vict Age#11, VictSex#12, VictDescent#13, PremisCd#14, PremisDesc#15, WeaponUsedCd#16, WeaponDesc#17, Status#18, Status Desc#19, CrmCd1#20, CrmCd2#21, CrmCd3#22, CrmCd4#23, ... 10 more fields]
         +- Project [DR_NO#0, DateRptd#1, DATEOCC#2, TIMEOCC#3, AREA#4, AREANAME#5, RptDistNo#6, Part#7, CrmCd#8, Crm Cd Desc#9, Mocodes#10, Vict Age#11, VictSex#12, VictDescent#13, PremisCd#14, PremisDesc#15, WeaponUsedCd#16, WeaponDesc#17, Status#18, Status Desc#19, CrmCd1#20, CrmCd2#21, CrmCd3#22, CrmCd4#23, ... 11 more fields]
            +- Window [row_number() windowspecdefinition(crime_id#209, distance#314 ASC NULLS FIRST, sp

In [1]:
from sedona.spark import *

spark = spark.newSession().builder \
    .appName("Query 5 ") \
    .config('spark.executor.instances', '2') \
    .config('spark.executor.cores', '8') \
    .config('spark.executor.memory', '4g') \
    .getOrCreate()

# Initialize SedonaContext
sedona = SedonaContext.create(spark)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1563,application_1765289937462_1549,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructField, StructType, StringType, IntegerType, 
    FloatType, DoubleType, DateType, TimestampType  
)
from pyspark.sql.functions import (
    year, when, count, sum, col, row_number, 
    to_timestamp, regexp_replace, to_date, expr,avg,broadcast,first    
)
from pyspark.sql import functions as F
from pyspark.sql.functions import col, sum, count, expr, coalesce, lit, when,explode,collect_set
from pyspark.sql.window import Window
from pyspark.sql.functions import  to_date, year
import time
from pyspark.sql.functions import col, regexp_replace, trim


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
fgeo="s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Census_Blocks_2020.geojson"
fincome="s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_income_2021.csv"
fcrime="s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv"

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
crimes_schema = StructType([
    StructField("DR_NO", StringType()),
    StructField("DateRptd", TimestampType()),
    StructField("DATEOCC", TimestampType()),
    StructField("TIMEOCC", StringType()),
    StructField("AREA", StringType()),
    StructField("AREANAME", StringType()),
    StructField("RptDistNo", StringType()),
    StructField("Part", IntegerType()),
    StructField("CrmCd", StringType()),
    StructField("Crm Cd Desc", StringType()),
    StructField("Mocodes", StringType()),
    StructField("Vict Age", StringType()),
    StructField("VictSex", StringType()),
    StructField("VictDescent", StringType()),
    StructField("PremisCd", StringType()),
    StructField("PremisDesc", StringType()),
    StructField("WeaponUsedCd", StringType()),
    StructField("WeaponDesc", StringType()),
    StructField("Status", StringType()),
    StructField("Status Desc", StringType()),
    StructField("CrmCd1", StringType()),
    StructField("CrmCd2", StringType()),
    StructField("CrmCd3", StringType()),
    StructField("CrmCd4", StringType()),
    StructField("LOCATION", StringType()),
    StructField("CrossStreet", StringType()),
    StructField("LAT", DoubleType()),
    StructField("LON", DoubleType()),
])




FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:

start_time = time.time()


#Prepare the geojson file
blocks_df = sedona.read.format('geojson') \
    .option('multiLine','true').load(fgeo) \
    .selectExpr('explode(features) as features') \
    .select('features.*')

flattened_df = blocks_df.select(
    [col(f'properties.{col_name}').alias(col_name) for col_name in \
    blocks_df.schema['properties'].dataType.fieldNames()] + ['geometry']) \
    .drop('properties').drop('type')

la_comm_df = flattened_df.filter(col("CITY") == "Los Angeles") \
    .groupBy("COMM") \
    .agg(
        expr("ST_Union_Aggr(geometry)").alias("geometry"),
        first("POP20").alias("TotalPopulation")
    ) \
    .filter(
        (col("COMM").isNotNull()) &
        (col("COMM") != "") &
        (col("COMM") != "NULL") &
        (col("TotalPopulation") > 0)
    )


zip_codes_comm = flattened_df.filter(col("CITY") == "Los Angeles") \
    .select("ZCTA20", "COMM") \
    .filter(
        col("ZCTA20").isNotNull() &
        (col("ZCTA20") != "") &
        (col("ZCTA20") != "NULL")
    ) \
    .dropDuplicates()



# 
income_df = spark.read.csv(
    fincome,
    header=True,
    sep=';',
    quote='"',
    escape='"',
    inferSchema=False
)

# Trim the Zip Code column and rename
income_df = income_df.withColumn("ZipCode", trim(col("Zip Code")))

# 3️ Keep only rows where Community mentions Los Angeles
income_df = income_df.filter(col("Community").rlike("(?i)Los Angeles"))

# Clean Income column
income_df = income_df.withColumn(
    "Income",
    regexp_replace(col("Estimated Median Income"), r"[$,]", "").cast("float")
)

from pyspark.sql.functions import col

income_df = income_df.filter(
    col("ZipCode").isNotNull() &       # ZipCode not NULL
    (col("ZipCode") != "") &           # ZipCode not empty
    (col("ZipCode") != "NULL") &       # ZipCode not the string "NULL"
    col("Income").isNotNull()          # Income not NULL
)

#  Keep only ZipCode and Income column
income_df = income_df.select("ZipCode", "Income")

income_with_comm = income_df.join(
    zip_codes_comm,
    income_df.ZipCode == zip_codes_comm.ZCTA20,
    "inner"
)

#in each COMM there are nany ZipCodes with different Income so find average income per comm
income_per_comm = income_with_comm.groupBy("COMM") \
    .agg(F.avg("Income").alias("avg_income"))

#Crimes
crimes_df = spark.read.csv(
    fcrime,
    header=True,
    schema=crimes_schema,
    timestampFormat='yyyy MMM dd hh:mm:ss a',
    quote='"',
    escape='"'
)

#  Cast Lat/Lon to Double  and Filter Zeros
crimes_df = crimes_df.filter((col('LAT') != 0.0) & (col('LON') != 0.0))

# Extract Year
crimes_df = crimes_df.withColumn("year", year(col("DATEOCC")))

# Filter for years 2020 and 2021
crimes_df = crimes_df.filter(col("year").isin([2020, 2021]))

#  Create Geometry (Requires Apache Sedona)
#  ST_Point takes (Longitude, Latitude) -> (X, Y)
crimes_geo_df = crimes_df.withColumn("crime_point", expr("ST_Point(LON, LAT)"))

# Count crimes per community per year,schema with comm,year,pop,crimecounts
crime_per_comm_year = (
    crimes_geo_df
    .join(
        la_comm_df.select("COMM", "geometry", "TotalPopulation"),
        expr("ST_Within(crime_point, geometry)"),
        "inner"
    )
    .groupBy("COMM", "year", "TotalPopulation")
    .agg(F.count("*").alias("annual_crime_count"))
)
#to avoid probems with nans set 0 for the comms where no crimes happened
crime_per_comm_year = crime_per_comm_year.fillna({"annual_crime_count": 0})


# Crime rate per person per year
crime_rate_ = crime_per_comm_year.withColumn(
    "crime_rate_per_person",
    col("annual_crime_count") / col("TotalPopulation")
)

# Annual average crime rate per person
crime_per_comm = (
    crime_rate_
    .groupBy("COMM")   #crime rate per person for each community
    .agg(
        F.avg("crime_rate_per_person").alias("annual_avg_crime_rate_per_person")  #average for all the years 2020,2021
    )
)


final_df = (
    crime_per_comm   # COMM, annual_avg_crime_rate_per_person
    .join(
        income_per_comm,   # COMM, avg_income
        on="COMM",
        how="inner"
    )
)
# All areas
total_corr = final_df.stat.corr("avg_income", "annual_avg_crime_rate_per_person")

# Top 10 richest areas
top10 = final_df.orderBy(col("avg_income").desc()).limit(10)
top10_corr = top10.stat.corr("avg_income", "annual_avg_crime_rate_per_person")

# Bottom 10 poorest areas
bottom10 = final_df.orderBy(col("avg_income").asc()).limit(10)
bottom10_corr = bottom10.stat.corr("avg_income", "annual_avg_crime_rate_per_person")


elapsed = time.time() - start_time


# Show results

print("Total correlation:", total_corr)
print("Top 10 richest COMM correlation:", top10_corr)
print("Bottom 10 poorest COMM correlation:", bottom10_corr)
print(f"Elapsed time: {elapsed:.2f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Total correlation: 0.15027668303196612
Top 10 richest COMM correlation: -0.3735207290423716
Bottom 10 poorest COMM correlation: 0.018338409553787367
Elapsed time: 58.88 seconds

In [20]:
income_per_comm.explain(True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Parsed Logical Plan ==
'Aggregate ['COMM], ['COMM, avg('Income) AS avg_income#2259]
+- Join Inner, (ZipCode#2232 = ZCTA20#2037)
   :- Project [ZipCode#2232, Income#2238]
   :  +- Filter (((isnotnull(ZipCode#2232) AND NOT (ZipCode#2232 = )) AND NOT (ZipCode#2232 = NULL)) AND isnotnull(Income#2238))
   :     +- Project [Zip Code#2226, Community#2227, Estimated Median Income#2228, ZipCode#2232, cast(regexp_replace(Estimated Median Income#2228, [$,], , 1) as float) AS Income#2238]
   :        +- Filter RLIKE(Community#2227, (?i)Los Angeles)
   :           +- Project [Zip Code#2226, Community#2227, Estimated Median Income#2228, trim(Zip Code#2226, None) AS ZipCode#2232]
   :              +- Relation [Zip Code#2226,Community#2227,Estimated Median Income#2228] csv
   +- Deduplicate [ZCTA20#2037, COMM#2017]
      +- Filter ((isnotnull(ZCTA20#2037) AND NOT (ZCTA20#2037 = )) AND NOT (ZCTA20#2037 = NULL))
         +- Project [ZCTA20#2037, COMM#2017]
            +- Filter (CITY#2013 = Los Angel

In [24]:
crime_per_comm.explain(True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Parsed Logical Plan ==
'Aggregate ['COMM], ['COMM, avg('crime_rate_per_person) AS annual_avg_crime_rate_per_person#2509]
+- Project [COMM#2017, year#2319, TotalPopulation#2189L, annual_crime_count#2492L, (cast(annual_crime_count#2492L as double) / cast(TotalPopulation#2189L as double)) AS crime_rate_per_person#2497]
   +- Project [COMM#2017, year#2319, TotalPopulation#2189L, coalesce(annual_crime_count#2483L, cast(0 as bigint)) AS annual_crime_count#2492L]
      +- Aggregate [COMM#2017, year#2319, TotalPopulation#2189L], [COMM#2017, year#2319, TotalPopulation#2189L, count(1) AS annual_crime_count#2483L]
         +- Join Inner,  **org.apache.spark.sql.sedona_sql.expressions.ST_Within**
            :- Project [DR_NO#2262, DateRptd#2263, DATEOCC#2264, TIMEOCC#2265, AREA#2266, AREANAME#2267, RptDistNo#2268, Part#2269, CrmCd#2270, Crm Cd Desc#2271, Mocodes#2272, Vict Age#2273, VictSex#2274, VictDescent#2275, PremisCd#2276, PremisDesc#2277, WeaponUsedCd#2278, WeaponDesc#2279, Status#2280,

In [25]:
final_df.explain(True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Parsed Logical Plan ==
'Join UsingJoin(Inner, [COMM])
:- Aggregate [COMM#2017], [COMM#2017, avg(crime_rate_per_person#2497) AS annual_avg_crime_rate_per_person#2509]
:  +- Project [COMM#2017, year#2319, TotalPopulation#2189L, annual_crime_count#2492L, (cast(annual_crime_count#2492L as double) / cast(TotalPopulation#2189L as double)) AS crime_rate_per_person#2497]
:     +- Project [COMM#2017, year#2319, TotalPopulation#2189L, coalesce(annual_crime_count#2483L, cast(0 as bigint)) AS annual_crime_count#2492L]
:        +- Aggregate [COMM#2017, year#2319, TotalPopulation#2189L], [COMM#2017, year#2319, TotalPopulation#2189L, count(1) AS annual_crime_count#2483L]
:           +- Join Inner,  **org.apache.spark.sql.sedona_sql.expressions.ST_Within**
:              :- Project [DR_NO#2262, DateRptd#2263, DATEOCC#2264, TIMEOCC#2265, AREA#2266, AREANAME#2267, RptDistNo#2268, Part#2269, CrmCd#2270, Crm Cd Desc#2271, Mocodes#2272, Vict Age#2273, VictSex#2274, VictDescent#2275, PremisCd#2276, Premi

In [26]:
# Explain the final join
final_df.explain(True)
# Show top 20 rows
final_df.show(20, truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Parsed Logical Plan ==
'Join UsingJoin(Inner, [COMM])
:- Aggregate [COMM#2017], [COMM#2017, avg(crime_rate_per_person#2497) AS annual_avg_crime_rate_per_person#2509]
:  +- Project [COMM#2017, year#2319, TotalPopulation#2189L, annual_crime_count#2492L, (cast(annual_crime_count#2492L as double) / cast(TotalPopulation#2189L as double)) AS crime_rate_per_person#2497]
:     +- Project [COMM#2017, year#2319, TotalPopulation#2189L, coalesce(annual_crime_count#2483L, cast(0 as bigint)) AS annual_crime_count#2492L]
:        +- Aggregate [COMM#2017, year#2319, TotalPopulation#2189L], [COMM#2017, year#2319, TotalPopulation#2189L, count(1) AS annual_crime_count#2483L]
:           +- Join Inner,  **org.apache.spark.sql.sedona_sql.expressions.ST_Within**
:              :- Project [DR_NO#2262, DateRptd#2263, DATEOCC#2264, TIMEOCC#2265, AREA#2266, AREANAME#2267, RptDistNo#2268, Part#2269, CrmCd#2270, Crm Cd Desc#2271, Mocodes#2272, Vict Age#2273, VictSex#2274, VictDescent#2275, PremisCd#2276, Premi

In [86]:
blocks_df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- geometry: geometry (nullable = true)
 |-- properties: struct (nullable = true)
 |    |-- BG20: string (nullable = true)
 |    |-- BG20FIP_CURRENT: string (nullable = true)
 |    |-- BGFIP20: string (nullable = true)
 |    |-- CB20: string (nullable = true)
 |    |-- CITY: string (nullable = true)
 |    |-- CITYCOMM: string (nullable = true)
 |    |-- CITYCOMM_CURRENT: string (nullable = true)
 |    |-- CITY_CURRENT: string (nullable = true)
 |    |-- COMM: string (nullable = true)
 |    |-- COMM_CURRENT: string (nullable = true)
 |    |-- COUNTY: string (nullable = true)
 |    |-- CT20: string (nullable = true)
 |    |-- CTCB20: string (nullable = true)
 |    |-- FEAT_TYPE: string (nullable = true)
 |    |-- FIP20: string (nullable = true)
 |    |-- FIP_CURRENT: string (nullable = true)
 |    |-- HD22: long (nullable = true)
 |    |-- HD_NAME: string (nullable = true)
 |    |-- HOUSING20: long (nullable = true)
 |    |-- OBJECTID: long (nullable = true)
 |    |-- POP20: long 

In [88]:
flattened_df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- BG20: string (nullable = true)
 |-- BG20FIP_CURRENT: string (nullable = true)
 |-- BGFIP20: string (nullable = true)
 |-- CB20: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- CITYCOMM: string (nullable = true)
 |-- CITYCOMM_CURRENT: string (nullable = true)
 |-- CITY_CURRENT: string (nullable = true)
 |-- COMM: string (nullable = true)
 |-- COMM_CURRENT: string (nullable = true)
 |-- COUNTY: string (nullable = true)
 |-- CT20: string (nullable = true)
 |-- CTCB20: string (nullable = true)
 |-- FEAT_TYPE: string (nullable = true)
 |-- FIP20: string (nullable = true)
 |-- FIP_CURRENT: string (nullable = true)
 |-- HD22: long (nullable = true)
 |-- HD_NAME: string (nullable = true)
 |-- HOUSING20: long (nullable = true)
 |-- OBJECTID: long (nullable = true)
 |-- POP20: long (nullable = true)
 |-- SPA22: long (nullable = true)
 |-- SPA_NAME: string (nullable = true)
 |-- SUP21: string (nullable = true)
 |-- SUP_LABEL: string (nullable = true)
 |-- ShapeSTArea: 

In [90]:
la_flattened_df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- BG20: string (nullable = true)
 |-- BG20FIP_CURRENT: string (nullable = true)
 |-- BGFIP20: string (nullable = true)
 |-- CB20: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- CITYCOMM: string (nullable = true)
 |-- CITYCOMM_CURRENT: string (nullable = true)
 |-- CITY_CURRENT: string (nullable = true)
 |-- COMM: string (nullable = true)
 |-- COMM_CURRENT: string (nullable = true)
 |-- COUNTY: string (nullable = true)
 |-- CT20: string (nullable = true)
 |-- CTCB20: string (nullable = true)
 |-- FEAT_TYPE: string (nullable = true)
 |-- FIP20: string (nullable = true)
 |-- FIP_CURRENT: string (nullable = true)
 |-- HD22: long (nullable = true)
 |-- HD_NAME: string (nullable = true)
 |-- HOUSING20: long (nullable = true)
 |-- OBJECTID: long (nullable = true)
 |-- POP20: long (nullable = true)
 |-- SPA22: long (nullable = true)
 |-- SPA_NAME: string (nullable = true)
 |-- SUP21: string (nullable = true)
 |-- SUP_LABEL: string (nullable = true)
 |-- ShapeSTArea: 

In [92]:
income_df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- ZipCode: string (nullable = true)
 |-- Income: float (nullable = true)

In [97]:
crimes_df.printSchema()
crimes_geo_df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- DR_NO: string (nullable = true)
 |-- DateRptd: timestamp (nullable = true)
 |-- DATEOCC: timestamp (nullable = true)
 |-- TIMEOCC: string (nullable = true)
 |-- AREA: string (nullable = true)
 |-- AREANAME: string (nullable = true)
 |-- RptDistNo: string (nullable = true)
 |-- Part: integer (nullable = true)
 |-- CrmCd: string (nullable = true)
 |-- Crm Cd Desc: string (nullable = true)
 |-- Mocodes: string (nullable = true)
 |-- Vict Age: string (nullable = true)
 |-- VictSex: string (nullable = true)
 |-- VictDescent: string (nullable = true)
 |-- PremisCd: string (nullable = true)
 |-- PremisDesc: string (nullable = true)
 |-- WeaponUsedCd: string (nullable = true)
 |-- WeaponDesc: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Status Desc: string (nullable = true)
 |-- CrmCd1: string (nullable = true)
 |-- CrmCd2: string (nullable = true)
 |-- CrmCd3: string (nullable = true)
 |-- CrmCd4: string (nullable = true)
 |-- LOCATION: string (nullable = true

In [1]:
%%configure -f
{
    "conf": {
        "spark.executor.instances": "4",
        "spark.executor.cores": "2",
        "spark.executor.memory": "4g"
    }
}

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1562,application_1765289937462_1548,pyspark,idle,Link,Link,,
1564,application_1765289937462_1550,pyspark,idle,Link,Link,,


In [2]:
from pyspark.sql import SparkSession
from sedona.spark import SedonaContext

# Create a new Spark session
spark = SparkSession.builder \
    .appName("LA Crime Analysis") \
    .getOrCreate()

#  Initialize Sedona (if using spatial operations)
sedona = SedonaContext.create(spark)


Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1570,application_1765289937462_1556,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructField, StructType, StringType, IntegerType, 
    FloatType, DoubleType, DateType, TimestampType  
)
from pyspark.sql.functions import (
    year, when, count, sum, col, row_number, 
    to_timestamp, regexp_replace, to_date, expr,avg,broadcast,first    
)
from pyspark.sql import functions as F
from pyspark.sql.functions import col, sum, count, expr, coalesce, lit, when,explode,collect_set
from pyspark.sql.window import Window
from pyspark.sql.functions import  to_date, year
import time
from pyspark.sql.functions import col, regexp_replace, trim
from pyspark.sql import functions as F
from pyspark.sql.functions import col, expr, year, trim, regexp_replace, first, broadcast

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1569,application_1765289937462_1555,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
fgeo="s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Census_Blocks_2020.geojson"
fincome="s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_income_2021.csv"
fcrime="s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv"

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
crimes_schema = StructType([
    StructField("DR_NO", StringType()),
    StructField("DateRptd", TimestampType()),
    StructField("DATEOCC", TimestampType()),
    StructField("TIMEOCC", StringType()),
    StructField("AREA", StringType()),
    StructField("AREANAME", StringType()),
    StructField("RptDistNo", StringType()),
    StructField("Part", IntegerType()),
    StructField("CrmCd", StringType()),
    StructField("Crm Cd Desc", StringType()),
    StructField("Mocodes", StringType()),
    StructField("Vict Age", StringType()),
    StructField("VictSex", StringType()),
    StructField("VictDescent", StringType()),
    StructField("PremisCd", StringType()),
    StructField("PremisDesc", StringType()),
    StructField("WeaponUsedCd", StringType()),
    StructField("WeaponDesc", StringType()),
    StructField("Status", StringType()),
    StructField("Status Desc", StringType()),
    StructField("CrmCd1", StringType()),
    StructField("CrmCd2", StringType()),
    StructField("CrmCd3", StringType()),
    StructField("CrmCd4", StringType()),
    StructField("LOCATION", StringType()),
    StructField("CrossStreet", StringType()),
    StructField("LAT", DoubleType()),
    StructField("LON", DoubleType()),
])




FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, expr, year, trim, regexp_replace, first, broadcast


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:

start_time = time.time()


#Prepare the geojson file
blocks_df = sedona.read.format('geojson') \
    .option('multiLine','true').load(fgeo) \
    .selectExpr('explode(features) as features') \
    .select('features.*')

flattened_df = blocks_df.select(
    [col(f'properties.{col_name}').alias(col_name) for col_name in \
    blocks_df.schema['properties'].dataType.fieldNames()] + ['geometry']) \
    .drop('properties').drop('type')

la_comm_df = flattened_df.filter(col("CITY") == "Los Angeles") \
    .groupBy("COMM") \
    .agg(
        expr("ST_Union_Aggr(geometry)").alias("geometry"),
        first("POP20").alias("TotalPopulation")
    ) \
    .filter(
        (col("COMM").isNotNull()) &
        (col("COMM") != "") &
        (col("COMM") != "NULL") &
        (col("TotalPopulation") > 0)
    )


zip_codes_comm = flattened_df.filter(col("CITY") == "Los Angeles") \
    .select("ZCTA20", "COMM") \
    .filter(
        col("ZCTA20").isNotNull() &
        (col("ZCTA20") != "") &
        (col("ZCTA20") != "NULL")
    ) \
    .dropDuplicates()



#  Read CSV with correct delimiter
income_df = spark.read.csv(
    fincome,
    header=True,
    sep=';',
    quote='"',
    escape='"',
    inferSchema=False
)

# 2️Trim the Zip Code column and rename
income_df = income_df.withColumn("ZipCode", trim(col("Zip Code")))

# 3️ Keep only rows where Community mentions Los Angeles
income_df = income_df.filter(col("Community").rlike("(?i)Los Angeles"))

# Clean Income column
income_df = income_df.withColumn(
    "Income",
    regexp_replace(col("Estimated Median Income"), r"[$,]", "").cast("float")
)

from pyspark.sql.functions import col

income_df = income_df.filter(
    col("ZipCode").isNotNull() &       # ZipCode not NULL
    (col("ZipCode") != "") &           # ZipCode not empty
    (col("ZipCode") != "NULL") &       # ZipCode not the string "NULL"
    col("Income").isNotNull()          # Income not NULL
)

#  Keep only ZipCode and Income column
income_df = income_df.select("ZipCode", "Income")

income_with_comm = income_df.join(
    zip_codes_comm,
    income_df.ZipCode == zip_codes_comm.ZCTA20,
    "inner"
)

income_per_comm = income_with_comm.groupBy("COMM") \
    .agg(F.avg("Income").alias("avg_income"))

#Crimes
crimes_df = spark.read.csv(
    fcrime,
    header=True,
    schema=crimes_schema,
    timestampFormat='yyyy MMM dd hh:mm:ss a',
    quote='"',
    escape='"'
)

#  Cast Lat/Lon to Double  and Filter Zeros
crimes_df = crimes_df.filter((col('LAT') != 0.0) & (col('LON') != 0.0))

# Extract Year
crimes_df = crimes_df.withColumn("year", year(col("DATEOCC")))

# Filter for years 2020 and 2021
crimes_df = crimes_df.filter(col("year").isin([2020, 2021]))

#  Create Geometry (Requires Apache Sedona)
#  ST_Point takes (Longitude, Latitude) -> (X, Y)
crimes_geo_df = crimes_df.withColumn("crime_point", expr("ST_Point(LON, LAT)"))

# Count crimes per community per year
crime_per_comm_year = (
    crimes_geo_df
    .join(
        la_comm_df.select("COMM", "geometry", "TotalPopulation"),
        expr("ST_Within(crime_point, geometry)"),
        "inner"
    )
    .groupBy("COMM", "year", "TotalPopulation")
    .agg(F.count("*").alias("annual_crime_count"))
)
crime_per_comm_year = crime_per_comm_year.fillna({"annual_crime_count": 0})


# Crime rate per person per year
crime_rate_ = crime_per_comm_year.withColumn(
    "crime_rate_per_person",
    col("annual_crime_count") / col("TotalPopulation")
)

# Annual average crime rate per person
crime_per_comm = (
    crime_rate_
    .groupBy("COMM")
    .agg(
        F.avg("crime_rate_per_person").alias("annual_avg_crime_rate_per_person")
    )
)


final_df = (
    crime_per_comm   # COMM, annual_avg_crime_rate_per_person
    .join(
        income_per_comm,   # COMM, avg_income
        on="COMM",
        how="inner"
    )
)
# All areas
total_corr = final_df.stat.corr("avg_income", "annual_avg_crime_rate_per_person")

# Top 10 richest areas
top10 = final_df.orderBy(col("avg_income").desc()).limit(10)
top10_corr = top10.stat.corr("avg_income", "annual_avg_crime_rate_per_person")

# Bottom 10 poorest areas
bottom10 = final_df.orderBy(col("avg_income").asc()).limit(10)
bottom10_corr = bottom10.stat.corr("avg_income", "annual_avg_crime_rate_per_person")


elapsed = time.time() - start_time


# Show results

print("Total correlation:", total_corr)
print("Top 10 richest COMM correlation:", top10_corr)
print("Bottom 10 poorest COMM correlation:", bottom10_corr)
print(f"Elapsed time: {elapsed:.2f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Total correlation: 0.1502766830319662
Top 10 richest COMM correlation: -0.3735207290423716
Bottom 10 poorest COMM correlation: 0.018338409553787367
Elapsed time: 77.83 seconds

In [14]:
income_per_comm.explain(True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Parsed Logical Plan ==
'Aggregate ['COMM], ['COMM, avg('Income) AS avg_income#756]
+- Join Inner, (ZipCode#729 = ZCTA20#534)
   :- Project [ZipCode#729, Income#735]
   :  +- Filter (((isnotnull(ZipCode#729) AND NOT (ZipCode#729 = )) AND NOT (ZipCode#729 = NULL)) AND isnotnull(Income#735))
   :     +- Project [Zip Code#723, Community#724, Estimated Median Income#725, ZipCode#729, cast(regexp_replace(Estimated Median Income#725, [$,], , 1) as float) AS Income#735]
   :        +- Filter RLIKE(Community#724, (?i)Los Angeles)
   :           +- Project [Zip Code#723, Community#724, Estimated Median Income#725, trim(Zip Code#723, None) AS ZipCode#729]
   :              +- Relation [Zip Code#723,Community#724,Estimated Median Income#725] csv
   +- Deduplicate [ZCTA20#534, COMM#514]
      +- Filter ((isnotnull(ZCTA20#534) AND NOT (ZCTA20#534 = )) AND NOT (ZCTA20#534 = NULL))
         +- Project [ZCTA20#534, COMM#514]
            +- Filter (CITY#510 = Los Angeles)
               +- Project [p

In [18]:
crime_per_comm.explain(True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Parsed Logical Plan ==
'Aggregate ['COMM], ['COMM, avg('crime_rate_per_person) AS annual_avg_crime_rate_per_person#1006]
+- Project [COMM#514, year#816, TotalPopulation#686L, annual_crime_count#989L, (cast(annual_crime_count#989L as double) / cast(TotalPopulation#686L as double)) AS crime_rate_per_person#994]
   +- Project [COMM#514, year#816, TotalPopulation#686L, coalesce(annual_crime_count#980L, cast(0 as bigint)) AS annual_crime_count#989L]
      +- Aggregate [COMM#514, year#816, TotalPopulation#686L], [COMM#514, year#816, TotalPopulation#686L, count(1) AS annual_crime_count#980L]
         +- Join Inner,  **org.apache.spark.sql.sedona_sql.expressions.ST_Within**
            :- Project [DR_NO#759, DateRptd#760, DATEOCC#761, TIMEOCC#762, AREA#763, AREANAME#764, RptDistNo#765, Part#766, CrmCd#767, Crm Cd Desc#768, Mocodes#769, Vict Age#770, VictSex#771, VictDescent#772, PremisCd#773, PremisDesc#774, WeaponUsedCd#775, WeaponDesc#776, Status#777, Status Desc#778, CrmCd1#779, CrmCd2#7

In [19]:
final_df.explain(True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Parsed Logical Plan ==
'Join UsingJoin(Inner, [COMM])
:- Aggregate [COMM#514], [COMM#514, avg(crime_rate_per_person#994) AS annual_avg_crime_rate_per_person#1006]
:  +- Project [COMM#514, year#816, TotalPopulation#686L, annual_crime_count#989L, (cast(annual_crime_count#989L as double) / cast(TotalPopulation#686L as double)) AS crime_rate_per_person#994]
:     +- Project [COMM#514, year#816, TotalPopulation#686L, coalesce(annual_crime_count#980L, cast(0 as bigint)) AS annual_crime_count#989L]
:        +- Aggregate [COMM#514, year#816, TotalPopulation#686L], [COMM#514, year#816, TotalPopulation#686L, count(1) AS annual_crime_count#980L]
:           +- Join Inner,  **org.apache.spark.sql.sedona_sql.expressions.ST_Within**
:              :- Project [DR_NO#759, DateRptd#760, DATEOCC#761, TIMEOCC#762, AREA#763, AREANAME#764, RptDistNo#765, Part#766, CrmCd#767, Crm Cd Desc#768, Mocodes#769, Vict Age#770, VictSex#771, VictDescent#772, PremisCd#773, PremisDesc#774, WeaponUsedCd#775, WeaponDe

In [1]:
%%configure -f
{
    "conf": {
        "spark.executor.instances": "8",
        "spark.executor.cores": "1",
        "spark.executor.memory": "2g"
    }
}

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1564,application_1765289937462_1550,pyspark,idle,Link,Link,,
1571,application_1765289937462_1557,pyspark,idle,Link,Link,,


In [2]:
from pyspark.sql import SparkSession
from sedona.spark import SedonaContext

# Create a new Spark session
spark = SparkSession.builder \
    .appName("LA Crime Analysis") \
    .getOrCreate()

#  Initialize Sedona (if using spatial operations)
sedona = SedonaContext.create(spark)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1572,application_1765289937462_1558,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructField, StructType, StringType, IntegerType, 
    FloatType, DoubleType, DateType, TimestampType  
)
from pyspark.sql.functions import (
    year, when, count, sum, col, row_number, 
    to_timestamp, regexp_replace, to_date, expr,avg,broadcast,first    
)
from pyspark.sql import functions as F
from pyspark.sql.functions import col, sum, count, expr, coalesce, lit, when,explode,collect_set
from pyspark.sql.window import Window
from pyspark.sql.functions import  to_date, year
import time
from pyspark.sql.functions import col, regexp_replace, trim
from pyspark.sql import functions as F
from pyspark.sql.functions import col, expr, year, trim, regexp_replace, first, broadcast

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
fgeo="s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Census_Blocks_2020.geojson"
fincome="s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_income_2021.csv"
fcrime="s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv"

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
crimes_schema = StructType([
    StructField("DR_NO", StringType()),
    StructField("DateRptd", TimestampType()),
    StructField("DATEOCC", TimestampType()),
    StructField("TIMEOCC", StringType()),
    StructField("AREA", StringType()),
    StructField("AREANAME", StringType()),
    StructField("RptDistNo", StringType()),
    StructField("Part", IntegerType()),
    StructField("CrmCd", StringType()),
    StructField("Crm Cd Desc", StringType()),
    StructField("Mocodes", StringType()),
    StructField("Vict Age", StringType()),
    StructField("VictSex", StringType()),
    StructField("VictDescent", StringType()),
    StructField("PremisCd", StringType()),
    StructField("PremisDesc", StringType()),
    StructField("WeaponUsedCd", StringType()),
    StructField("WeaponDesc", StringType()),
    StructField("Status", StringType()),
    StructField("Status Desc", StringType()),
    StructField("CrmCd1", StringType()),
    StructField("CrmCd2", StringType()),
    StructField("CrmCd3", StringType()),
    StructField("CrmCd4", StringType()),
    StructField("LOCATION", StringType()),
    StructField("CrossStreet", StringType()),
    StructField("LAT", DoubleType()),
    StructField("LON", DoubleType()),
])



FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:

start_time = time.time()


#Prepare the geojson file
blocks_df = sedona.read.format('geojson') \
    .option('multiLine','true').load(fgeo) \
    .selectExpr('explode(features) as features') \
    .select('features.*')

flattened_df = blocks_df.select(
    [col(f'properties.{col_name}').alias(col_name) for col_name in \
    blocks_df.schema['properties'].dataType.fieldNames()] + ['geometry']) \
    .drop('properties').drop('type')

la_comm_df = flattened_df.filter(col("CITY") == "Los Angeles") \
    .groupBy("COMM") \
    .agg(
        expr("ST_Union_Aggr(geometry)").alias("geometry"),
        first("POP20").alias("TotalPopulation")
    ) \
    .filter(
        (col("COMM").isNotNull()) &
        (col("COMM") != "") &
        (col("COMM") != "NULL") &
        (col("TotalPopulation") > 0)
    )


zip_codes_comm = flattened_df.filter(col("CITY") == "Los Angeles") \
    .select("ZCTA20", "COMM") \
    .filter(
        col("ZCTA20").isNotNull() &
        (col("ZCTA20") != "") &
        (col("ZCTA20") != "NULL")
    ) \
    .dropDuplicates()



#  Read CSV with correct delimiter
income_df = spark.read.csv(
    fincome,
    header=True,
    sep=';',
    quote='"',
    escape='"',
    inferSchema=False
)

# 2️Trim the Zip Code column and rename
income_df = income_df.withColumn("ZipCode", trim(col("Zip Code")))

# 3️ Keep only rows where Community mentions Los Angeles
income_df = income_df.filter(col("Community").rlike("(?i)Los Angeles"))

# Clean Income column
income_df = income_df.withColumn(
    "Income",
    regexp_replace(col("Estimated Median Income"), r"[$,]", "").cast("float")
)

from pyspark.sql.functions import col

income_df = income_df.filter(
    col("ZipCode").isNotNull() &       # ZipCode not NULL
    (col("ZipCode") != "") &           # ZipCode not empty
    (col("ZipCode") != "NULL") &       # ZipCode not the string "NULL"
    col("Income").isNotNull()          # Income not NULL
)

#  Keep only ZipCode and Income column
income_df = income_df.select("ZipCode", "Income")

income_with_comm = income_df.join(
    zip_codes_comm,
    income_df.ZipCode == zip_codes_comm.ZCTA20,
    "inner"
)

income_per_comm = income_with_comm.groupBy("COMM") \
    .agg(F.avg("Income").alias("avg_income"))

#Crimes
crimes_df = spark.read.csv(
    fcrime,
    header=True,
    schema=crimes_schema,
    timestampFormat='yyyy MMM dd hh:mm:ss a',
    quote='"',
    escape='"'
)

#  Cast Lat/Lon to Double  and Filter Zeros
crimes_df = crimes_df.filter((col('LAT') != 0.0) & (col('LON') != 0.0))

# Extract Year
crimes_df = crimes_df.withColumn("year", year(col("DATEOCC")))

# Filter for years 2020 and 2021
crimes_df = crimes_df.filter(col("year").isin([2020, 2021]))

#  Create Geometry (Requires Apache Sedona)
#  ST_Point takes (Longitude, Latitude) -> (X, Y)
crimes_geo_df = crimes_df.withColumn("crime_point", expr("ST_Point(LON, LAT)"))

# Count crimes per community per year
crime_per_comm_year = (
    crimes_geo_df
    .join(
        la_comm_df.select("COMM", "geometry", "TotalPopulation"),
        expr("ST_Within(crime_point, geometry)"),
        "inner"
    )
    .groupBy("COMM", "year", "TotalPopulation")
    .agg(F.count("*").alias("annual_crime_count"))
)
crime_per_comm_year = crime_per_comm_year.fillna({"annual_crime_count": 0})


# Crime rate per person per year
crime_rate_ = crime_per_comm_year.withColumn(
    "crime_rate_per_person",
    col("annual_crime_count") / col("TotalPopulation")
)

# Annual average crime rate per person
crime_per_comm = (
    crime_rate_
    .groupBy("COMM")
    .agg(
        F.avg("crime_rate_per_person").alias("annual_avg_crime_rate_per_person")
    )
)


final_df = (
    crime_per_comm   # COMM, annual_avg_crime_rate_per_person
    .join(
        income_per_comm,   # COMM, avg_income
        on="COMM",
        how="inner"
    )
)
# All areas
total_corr = final_df.stat.corr("avg_income", "annual_avg_crime_rate_per_person")

# Top 10 richest areas
top10 = final_df.orderBy(col("avg_income").desc()).limit(10)
top10_corr = top10.stat.corr("avg_income", "annual_avg_crime_rate_per_person")

# Bottom 10 poorest areas
bottom10 = final_df.orderBy(col("avg_income").asc()).limit(10)
bottom10_corr = bottom10.stat.corr("avg_income", "annual_avg_crime_rate_per_person")


elapsed = time.time() - start_time


# Show results

print("Total correlation:", total_corr)
print("Top 10 richest COMM correlation:", top10_corr)
print("Bottom 10 poorest COMM correlation:", bottom10_corr)
print(f"Elapsed time: {elapsed:.2f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Total correlation: 0.1502766830319662
Top 10 richest COMM correlation: -0.3735207290423716
Bottom 10 poorest COMM correlation: 0.018338409553787367
Elapsed time: 84.02 seconds

In [9]:
income_per_comm.explain(True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Parsed Logical Plan ==
'Aggregate ['COMM], ['COMM, avg('Income) AS avg_income#292]
+- Join Inner, (ZipCode#265 = ZCTA20#70)
   :- Project [ZipCode#265, Income#271]
   :  +- Filter (((isnotnull(ZipCode#265) AND NOT (ZipCode#265 = )) AND NOT (ZipCode#265 = NULL)) AND isnotnull(Income#271))
   :     +- Project [Zip Code#259, Community#260, Estimated Median Income#261, ZipCode#265, cast(regexp_replace(Estimated Median Income#261, [$,], , 1) as float) AS Income#271]
   :        +- Filter RLIKE(Community#260, (?i)Los Angeles)
   :           +- Project [Zip Code#259, Community#260, Estimated Median Income#261, trim(Zip Code#259, None) AS ZipCode#265]
   :              +- Relation [Zip Code#259,Community#260,Estimated Median Income#261] csv
   +- Deduplicate [ZCTA20#70, COMM#50]
      +- Filter ((isnotnull(ZCTA20#70) AND NOT (ZCTA20#70 = )) AND NOT (ZCTA20#70 = NULL))
         +- Project [ZCTA20#70, COMM#50]
            +- Filter (CITY#46 = Los Angeles)
               +- Project [properties

In [11]:
crime_per_comm_year.explain(True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Parsed Logical Plan ==
Project [COMM#50, year#352, TotalPopulation#222L, coalesce(annual_crime_count#516L, cast(0 as bigint)) AS annual_crime_count#525L]
+- Aggregate [COMM#50, year#352, TotalPopulation#222L], [COMM#50, year#352, TotalPopulation#222L, count(1) AS annual_crime_count#516L]
   +- Join Inner,  **org.apache.spark.sql.sedona_sql.expressions.ST_Within**
      :- Project [DR_NO#295, DateRptd#296, DATEOCC#297, TIMEOCC#298, AREA#299, AREANAME#300, RptDistNo#301, Part#302, CrmCd#303, Crm Cd Desc#304, Mocodes#305, Vict Age#306, VictSex#307, VictDescent#308, PremisCd#309, PremisDesc#310, WeaponUsedCd#311, WeaponDesc#312, Status#313, Status Desc#314, CrmCd1#315, CrmCd2#316, CrmCd3#317, CrmCd4#318, ... 6 more fields]
      :  +- Filter year#352 IN (2020,2021)
      :     +- Project [DR_NO#295, DateRptd#296, DATEOCC#297, TIMEOCC#298, AREA#299, AREANAME#300, RptDistNo#301, Part#302, CrmCd#303, Crm Cd Desc#304, Mocodes#305, Vict Age#306, VictSex#307, VictDescent#308, PremisCd#309, Pr

In [13]:
crime_per_comm.explain(True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Parsed Logical Plan ==
'Aggregate ['COMM], ['COMM, avg('crime_rate_per_person) AS annual_avg_crime_rate_per_person#542]
+- Project [COMM#50, year#352, TotalPopulation#222L, annual_crime_count#525L, (cast(annual_crime_count#525L as double) / cast(TotalPopulation#222L as double)) AS crime_rate_per_person#530]
   +- Project [COMM#50, year#352, TotalPopulation#222L, coalesce(annual_crime_count#516L, cast(0 as bigint)) AS annual_crime_count#525L]
      +- Aggregate [COMM#50, year#352, TotalPopulation#222L], [COMM#50, year#352, TotalPopulation#222L, count(1) AS annual_crime_count#516L]
         +- Join Inner,  **org.apache.spark.sql.sedona_sql.expressions.ST_Within**
            :- Project [DR_NO#295, DateRptd#296, DATEOCC#297, TIMEOCC#298, AREA#299, AREANAME#300, RptDistNo#301, Part#302, CrmCd#303, Crm Cd Desc#304, Mocodes#305, Vict Age#306, VictSex#307, VictDescent#308, PremisCd#309, PremisDesc#310, WeaponUsedCd#311, WeaponDesc#312, Status#313, Status Desc#314, CrmCd1#315, CrmCd2#316, C

In [14]:
final_df.explain(True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Parsed Logical Plan ==
'Join UsingJoin(Inner, [COMM])
:- Aggregate [COMM#50], [COMM#50, avg(crime_rate_per_person#530) AS annual_avg_crime_rate_per_person#542]
:  +- Project [COMM#50, year#352, TotalPopulation#222L, annual_crime_count#525L, (cast(annual_crime_count#525L as double) / cast(TotalPopulation#222L as double)) AS crime_rate_per_person#530]
:     +- Project [COMM#50, year#352, TotalPopulation#222L, coalesce(annual_crime_count#516L, cast(0 as bigint)) AS annual_crime_count#525L]
:        +- Aggregate [COMM#50, year#352, TotalPopulation#222L], [COMM#50, year#352, TotalPopulation#222L, count(1) AS annual_crime_count#516L]
:           +- Join Inner,  **org.apache.spark.sql.sedona_sql.expressions.ST_Within**
:              :- Project [DR_NO#295, DateRptd#296, DATEOCC#297, TIMEOCC#298, AREA#299, AREANAME#300, RptDistNo#301, Part#302, CrmCd#303, Crm Cd Desc#304, Mocodes#305, Vict Age#306, VictSex#307, VictDescent#308, PremisCd#309, PremisDesc#310, WeaponUsedCd#311, WeaponDesc#312,