Configuration settings for queries 1,2 and 3

In [1]:
%%configure -f
{
  "conf": {
    "spark.executor.instances": "4",
    "spark.executor.cores": "1",
    "spark.executor.memory": "2g"
  }
}


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
426,application_1761923966900_0441,pyspark,idle,Link,Link,,
430,application_1761923966900_0445,pyspark,idle,Link,Link,,


General initialization for the whole notebook

In [2]:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
import time

# store timings here
timings_q1 = {}
timings_q2 = {}
timings_q3 = {}
timings_q4 = {}
timings_q5 = {}

# Paths
path_crime_2010_2019 = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2010_2019.csv"
path_crime_2020_2025 = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv"

# Load both CSVs
crime_2010_2019_df = spark.read.csv(path_crime_2010_2019, header=True, inferSchema=True)
crime_2020_2025_df = spark.read.csv(path_crime_2020_2025, header=True, inferSchema=True)

# Union & cache
crime_df = crime_2010_2019_df.unionByName(crime_2020_2025_df).cache()

print("Total rows in combined crime_df:", crime_df.count())
crime_df.show(5)


Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
431,application_1761923966900_0446,pyspark,idle,Link,Link,,‚úî


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

Total rows in combined crime_df: 3138128
+---------+--------------------+--------------------+--------+----+---------+-----------+--------+------+--------------------+--------------+--------+--------+------------+---------+--------------------+--------------+--------------------+------+------------+--------+--------+--------+--------+--------------------+--------------------+-------+---------+
|    DR_NO|           Date Rptd|            DATE OCC|TIME OCC|AREA|AREA NAME|Rpt Dist No|Part 1-2|Crm Cd|         Crm Cd Desc|       Mocodes|Vict Age|Vict Sex|Vict Descent|Premis Cd|         Premis Desc|Weapon Used Cd|         Weapon Desc|Status| Status Desc|Crm Cd 1|Crm Cd 2|Crm Cd 3|Crm Cd 4|            LOCATION|        Cross Street|    LAT|      LON|
+---------+--------------------+--------------------+--------+----+---------+-----------+--------+------+--------------------+--------------+--------+--------+------------+---------+--------------------+--------------+--------------------+------+-

Query 1

In [3]:
# 1. Filter to aggravated assault incidents
aggr_df = crime_df.filter(
    F.lower(F.col("Crm Cd Desc")).contains("aggravated assault")
)

print("Rows with aggravated assault:", aggr_df.count())

# 2. Add integer age column and keep only valid ages
aggr_age_df = (
    aggr_df
    .withColumn("Vict_Age_int", F.col("Vict Age").cast("int"))
    .filter(F.col("Vict_Age_int").isNotNull() & (F.col("Vict_Age_int") > 0))
)

aggr_age_df.select("Vict Age", "Vict_Age_int", "Crm Cd Desc").show(10, truncate=False)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

Rows with aggravated assault: 177443
+--------+------------+----------------------------------------------+
|Vict Age|Vict_Age_int|Crm Cd Desc                                   |
+--------+------------+----------------------------------------------+
|51      |51          |ASSAULT WITH DEADLY WEAPON, AGGRAVATED ASSAULT|
|30      |30          |ASSAULT WITH DEADLY WEAPON, AGGRAVATED ASSAULT|
|18      |18          |INTIMATE PARTNER - AGGRAVATED ASSAULT         |
|61      |61          |ASSAULT WITH DEADLY WEAPON, AGGRAVATED ASSAULT|
|40      |40          |ASSAULT WITH DEADLY WEAPON, AGGRAVATED ASSAULT|
|23      |23          |INTIMATE PARTNER - AGGRAVATED ASSAULT         |
|62      |62          |ASSAULT WITH DEADLY WEAPON, AGGRAVATED ASSAULT|
|34      |34          |ASSAULT WITH DEADLY WEAPON, AGGRAVATED ASSAULT|
|27      |27          |ASSAULT WITH DEADLY WEAPON, AGGRAVATED ASSAULT|
|49      |49          |ASSAULT WITH DEADLY WEAPON, AGGRAVATED ASSAULT|
+--------+------------+-----------------

DF no UDF

In [4]:
# Build the age_group column using only built-in expressions
df_no_udf = (
    aggr_age_df
    .withColumn(
        "age_group",
        F.when(F.col("Vict_Age_int") < 18, "Children")
         .when((F.col("Vict_Age_int") >= 18) & (F.col("Vict_Age_int") <= 24), "Young adults")
         .when((F.col("Vict_Age_int") >= 25) & (F.col("Vict_Age_int") <= 64), "Adults")
         .otherwise("Elderly")
    )
)

q1_df_no_udf = (
    df_no_udf
    .groupBy("age_group")
    .agg(F.count("*").alias("victim_count"))
    .orderBy(F.col("victim_count").desc())
)

# Time the first action (collect)
start = time.perf_counter()
q1_df_no_udf_results = q1_df_no_udf.collect()
end = time.perf_counter()

timings_q1["df_no_udf"] = end - start
print(f"DataFrame (no UDF) execution time: {end - start:.3f} seconds\n")

print("Query 1 : DataFrame (no UDF) results:")
for row in q1_df_no_udf_results:
    print(row["age_group"], row["victim_count"])


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

DataFrame (no UDF) execution time: 3.489 seconds

Query 1 : DataFrame (no UDF) results:
Adults 121660
Young adults 33758
Children 10904
Elderly 6011

DF with UDF

In [5]:
# Python function for age ‚Üí group
def age_to_group(age):
    if age is None:
        return None
    try:
        a = int(age)
    except (TypeError, ValueError):
        return None

    if a <= 0:
        return None
    if a < 18:
        return "Children"
    if 18 <= a <= 24:
        return "Young adults"
    if 25 <= a <= 64:
        return "Adults"
    return "Elderly"

age_to_group_udf = F.udf(age_to_group, StringType())

# Apply UDF
df_with_udf = (
    aggr_age_df
    .withColumn("age_group_udf", age_to_group_udf(F.col("Vict_Age_int")))
    .filter(F.col("age_group_udf").isNotNull())
)

q1_df_with_udf = (
    df_with_udf
    .groupBy("age_group_udf")
    .agg(F.count("*").alias("victim_count"))
    .orderBy(F.col("victim_count").desc())
)

# Time the action
start = time.perf_counter()
q1_df_with_udf_results = q1_df_with_udf.collect()
end = time.perf_counter()

timings_q1["df_with_udf"] = end - start
print(f"DataFrame (with UDF) execution time: {end - start:.3f} seconds\n")

print("Query 1 : DataFrame (with UDF) results:")
for row in q1_df_with_udf_results:
    print(row["age_group_udf"], row["victim_count"])


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

DataFrame (with UDF) execution time: 5.215 seconds

Query 1 : DataFrame (with UDF) results:
Adults 121660
Young adults 33758
Children 10904
Elderly 6011

RDD

In [6]:
# RDD mapping function, reusing the same logic
def age_to_group_for_rdd(row):
    age_val = row["Vict Age"]
    try:
        a = int(age_val)
    except (TypeError, ValueError):
        return None

    if a <= 0:
        return None
    if a < 18:
        group = "Children"
    elif 18 <= a <= 24:
        group = "Young adults"
    elif 25 <= a <= 64:
        group = "Adults"
    else:
        group = "Elderly"
    return (group, 1)

# Start from the *unprocessed* aggravated-assault DF (aggr_df),
# because we assume RDD does its own casting/filtering
aggr_rdd = aggr_df.select("Vict Age").rdd

age_group_pairs_rdd = (
    aggr_rdd
    .map(age_to_group_for_rdd)
    .filter(lambda x: x is not None)
)

age_group_counts_rdd = age_group_pairs_rdd.reduceByKey(lambda x, y: x + y)
q1_rdd = age_group_counts_rdd.sortBy(lambda x: x[1], ascending=False)

# Time the action
start = time.perf_counter()
q1_rdd_results = q1_rdd.collect()
end = time.perf_counter()

timings_q1["rdd"] = end - start
print(f"RDD execution time: {end - start:.3f} seconds\n")

print("Query 1 : RDD results (age_group, victim_count):")
for group, count in q1_rdd_results:
    print(group, count)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

RDD execution time: 0.843 seconds

Query 1 : RDD results (age_group, victim_count):
Adults 121660
Young adults 33758
Children 10904
Elderly 6011

In [7]:
print("=== Query 1 : Timing Summary (seconds) ===")
for name, t in timings_q1.items():
    print(f"{name}: {t:.3f}")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

=== Query 1 : Timing Summary (seconds) ===
df_no_udf: 3.489
df_with_udf: 5.215
rdd: 0.843

Save in s3 (run once)

In [8]:
q1_df_no_udf.select("age_group", "victim_count") \
    .coalesce(1) \
    .write.mode("overwrite") \
    .option("header", "true") \
    .csv("s3a://groups-bucket-dblab-905418150721/group9/query1_results/")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

In [9]:
path_re_codes = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/RE_codes.csv"

re_df = spark.read.csv(path_re_codes, header=True, inferSchema=True)
re_df.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

+------------+-----------------+
|Vict Descent|Vict Descent Full|
+------------+-----------------+
|           A|      Other Asian|
|           B|            Black|
|           C|          Chinese|
|           D|        Cambodian|
|           F|         Filipino|
+------------+-----------------+
only showing top 5 rows

Query 2

In [10]:
#Query 2
from pyspark.sql.window import Window

# 1. Œ†Œ±ŒØœÅŒΩŒøœÖŒºŒµ ŒºœåŒΩŒø œÑŒø Œ∫ŒøŒºŒºŒ¨œÑŒπ "yyyy MMM dd" Œ±œÄœå œÑŒø DATE OCC Œ∫Œ±Œπ œÜœÑŒπŒ¨œáŒΩŒøœÖŒºŒµ occ_date + year
df_with_year = (
    crime_df
    .withColumn(
        "occ_date",
        F.to_date(
            F.substring(F.col("DATE OCC"), 1, 11),  # "2020 Nov 07"
            "yyyy MMM dd"
        )
    )
    .withColumn("year", F.year(F.col("occ_date")))
)

# 2. Œ¶ŒπŒªœÑœÅŒ¨œÅŒøœÖŒºŒµ Œ≥œÅŒ±ŒºŒºŒ≠œÇ œáœâœÅŒØœÇ year ŒÆ œáœâœÅŒØœÇ Vict Descent code
df_q2_base = (
    df_with_year
    .filter(F.col("year").isNotNull())
    .filter(F.col("Vict Descent").isNotNull() & (F.col("Vict Descent") != ""))
)

# 3. Join ŒºŒµ RE_codes Œ≥ŒπŒ± ŒΩŒ± œÄŒ¨œÅŒøœÖŒºŒµ œÑŒ∑ŒΩ œÄŒªŒÆœÅŒ∑ œÄŒµœÅŒπŒ≥œÅŒ±œÜŒÆ œÜœÖŒªŒÆœÇ
#    Œ§Œø join Œ≥ŒØŒΩŒµœÑŒ±Œπ œÄŒ¨ŒΩœâ œÉœÑŒ∑ œÉœÑŒÆŒªŒ∑ "Vict Descent"
df_q2_joined = df_q2_base.join(
    re_df.select("Vict Descent", "Vict Descent Full"),
    on="Vict Descent",
    how="left"
)

df_q2_joined.select("DATE OCC", "occ_date", "year", "Vict Descent", "Vict Descent Full").show(10, truncate=False)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

+-----------------------+----------+----+------------+----------------------+
|DATE OCC               |occ_date  |year|Vict Descent|Vict Descent Full     |
+-----------------------+----------+----+------------+----------------------+
|2010 Feb 20 12:00:00 AM|2010-02-20|2010|H           |Hispanic/Latin/Mexican|
|2010 Sep 12 12:00:00 AM|2010-09-12|2010|W           |White                 |
|2010 Aug 09 12:00:00 AM|2010-08-09|2010|H           |Hispanic/Latin/Mexican|
|2010 Jan 05 12:00:00 AM|2010-01-05|2010|W           |White                 |
|2010 Jan 02 12:00:00 AM|2010-01-02|2010|H           |Hispanic/Latin/Mexican|
|2010 Jan 04 12:00:00 AM|2010-01-04|2010|B           |Black                 |
|2010 Jan 07 12:00:00 AM|2010-01-07|2010|H           |Hispanic/Latin/Mexican|
|2010 Jan 08 12:00:00 AM|2010-01-08|2010|B           |Black                 |
|2010 Jan 09 12:00:00 AM|2010-01-09|2010|H           |Hispanic/Latin/Mexican|
|2010 Jan 06 12:00:00 AM|2010-01-06|2010|W           |White     

DF API

In [11]:
# 4. ŒúŒµœÑœÅŒ¨ŒºŒµ victims Œ±ŒΩŒ¨ (year, Vict Descent Full)
df_counts = (
    df_q2_joined
    .groupBy("year", "Vict Descent Full")
    .agg(F.count("*").alias("victim_count"))
)

# 5. Œ†œÅŒøœÉŒ∏Œ≠œÑŒøœÖŒºŒµ total victims per year
w_year = Window.partitionBy("year")

df_with_totals = (
    df_counts
    .withColumn("total_victims_year", F.sum("victim_count").over(w_year))
    .withColumn(
        "percentage",
        (F.col("victim_count") / F.col("total_victims_year") * 100.0)
    )
)

# 6. Top-3 Œ±ŒΩŒ¨ Œ≠œÑŒøœÇ (Œ≤Œ¨œÉŒµŒπ victim_count)
w_rank = Window.partitionBy("year").orderBy(F.col("victim_count").desc())

df_ranked = (
    df_with_totals
    .withColumn("rn", F.row_number().over(w_rank))
    .filter(F.col("rn") <= 3)
)

# 7. timing Œ†Œ°ŒôŒù œÑŒø .show()
start = time.perf_counter()
q2_df_results = df_ranked.collect()
end = time.perf_counter()

timings_q2["df"] = end - start
print(f"Query 2 : DataFrame execution time: {end - start:.3f} s\n")

# 8. pretty output (Œ±œÄœå œÑŒø ŒØŒ¥ŒπŒø df_ranked)
print("=== Query 2 : DataFrame Results (Pretty Table) ===")
(
    df_ranked
    .select(
        F.col("year").alias("Year"),
        F.col("Vict Descent Full").alias("Victim Descent"),
        F.col("victim_count").alias("Victims"),
        F.round("percentage", 2).alias("Percent")
    )
    .orderBy(
        F.col("Year").desc(),
        F.col("Victims").desc()
    )
    .show(50, truncate=False)
)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

Query 2 : DataFrame execution time: 4.670 s

=== Query 2 : DataFrame Results (Pretty Table) ===
+----+----------------------+-------+-------+
|Year|Victim Descent        |Victims|Percent|
+----+----------------------+-------+-------+
|2025|Hispanic/Latin/Mexican|34     |40.48  |
|2025|Unknown               |24     |28.57  |
|2025|White                 |13     |15.48  |
|2024|Hispanic/Latin/Mexican|28576  |29.05  |
|2024|White                 |22958  |23.34  |
|2024|Unknown               |19984  |20.32  |
|2023|Hispanic/Latin/Mexican|69401  |34.55  |
|2023|White                 |44615  |22.21  |
|2023|Black                 |30504  |15.19  |
|2022|Hispanic/Latin/Mexican|73111  |35.64  |
|2022|White                 |46695  |22.76  |
|2022|Black                 |34634  |16.88  |
|2021|Hispanic/Latin/Mexican|63676  |35.08  |
|2021|White                 |44523  |24.53  |
|2021|Black                 |30173  |16.62  |
|2020|Hispanic/Latin/Mexican|61606  |35.33  |
|2020|White                 |4

SQL API

In [12]:
# 1. ŒöŒ¨ŒΩŒøœÖŒºŒµ œÑŒ± DataFrames Œ¥ŒπŒ±Œ∏Œ≠œÉŒπŒºŒ± œâœÇ œÄœÅŒøœÉœâœÅŒπŒΩŒ¨ views
crime_df.createOrReplaceTempView("crime")
re_df.createOrReplaceTempView("re_codes")

q2_sql = """
WITH base AS (
    SELECT
        -- ŒíŒ≥Œ¨Œ∂ŒøœÖŒºŒµ œÑŒø Œ≠œÑŒøœÇ Œ±œÄœå œÑŒ± œÄœÅœéœÑŒ± 11 chars œÑŒ∑œÇ DATE OCC ("yyyy MMM dd")
        YEAR(
            TO_DATE(
                SUBSTRING(c.`DATE OCC`, 1, 11),
                'yyyy MMM dd'
            )
        ) AS year,
        r.`Vict Descent Full` AS victim_descent_full,
        COUNT(*) AS victim_count
    FROM crime c
    JOIN re_codes r
      ON c.`Vict Descent` = r.`Vict Descent`
    WHERE c.`DATE OCC` IS NOT NULL
      AND c.`Vict Descent` IS NOT NULL
      AND c.`Vict Descent` <> ''
    GROUP BY
        YEAR(
            TO_DATE(
                SUBSTRING(c.`DATE OCC`, 1, 11),
                'yyyy MMM dd'
            )
        ),
        r.`Vict Descent Full`
),
with_totals AS (
    SELECT
        year,
        victim_descent_full,
        victim_count,
        SUM(victim_count) OVER (PARTITION BY year) AS total_victims_year,
        victim_count * 100.0 / SUM(victim_count) OVER (PARTITION BY year) AS percentage
    FROM base
),
ranked AS (
    SELECT
        year,
        victim_descent_full,
        victim_count,
        percentage,
        ROW_NUMBER() OVER (PARTITION BY year ORDER BY victim_count DESC) AS rn
    FROM with_totals
)
SELECT
    year,
    victim_descent_full AS `Victim Descent`,
    victim_count,
    percentage
FROM ranked
WHERE rn <= 3
"""

# 2. Œ§œÅŒ≠œáŒøœÖŒºŒµ œÑŒø query Œ∫Œ±Œπ ŒºŒµœÑœÅŒ¨ŒºŒµ œáœÅœåŒΩŒø (SQL implementation)
start = time.perf_counter()
q2_sql_df = spark.sql(q2_sql)
q2_sql_results = q2_sql_df.collect()
end = time.perf_counter()

timings_q2["sql"] = end - start
print(f"Query 2 : SQL execution time: {end - start:.3f} s\n")

# 3. ŒåŒºŒøœÅœÜŒø output, ŒºŒµ Œ≠œÑŒ∑ œÉŒµ œÜŒ∏ŒØŒΩŒøœÖœÉŒ± œÉŒµŒπœÅŒ¨ Œ∫Œ±Œπ œÄŒøœÉŒøœÉœÑŒ¨ œÉœÑœÅŒøŒ≥Œ≥œÖŒªŒµŒºŒ≠ŒΩŒ±
print("=== Query 2 : SQL Results (Pretty Table) ===")
(
    q2_sql_df
    .select(
        F.col("year").alias("Year"),
        F.col("Victim Descent").alias("Victim Descent"),
        F.col("victim_count").alias("Victims"),
        F.round("percentage", 2).alias("Percent")
    )
    .orderBy(
        F.col("Year").desc(),
        F.col("Victims").desc()
    )
    .show(50, truncate=False)
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

Query 2 : SQL execution time: 3.227 s

=== Query 2 : SQL Results (Pretty Table) ===
+----+----------------------+-------+-------+
|Year|Victim Descent        |Victims|Percent|
+----+----------------------+-------+-------+
|2025|Hispanic/Latin/Mexican|34     |40.48  |
|2025|Unknown               |24     |28.57  |
|2025|White                 |13     |15.48  |
|2024|Hispanic/Latin/Mexican|28576  |29.05  |
|2024|White                 |22958  |23.34  |
|2024|Unknown               |19984  |20.32  |
|2023|Hispanic/Latin/Mexican|69401  |34.55  |
|2023|White                 |44615  |22.21  |
|2023|Black                 |30504  |15.19  |
|2022|Hispanic/Latin/Mexican|73111  |35.64  |
|2022|White                 |46695  |22.76  |
|2022|Black                 |34634  |16.88  |
|2021|Hispanic/Latin/Mexican|63676  |35.08  |
|2021|White                 |44523  |24.53  |
|2021|Black                 |30173  |16.62  |
|2020|Hispanic/Latin/Mexican|61606  |35.33  |
|2020|White                 |42638  |24.45

In [13]:
print("=== Query 2: Timing Summary (seconds) ===")
for name, t in timings_q2.items():
    print(f"{name}: {t:.3f}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

=== Query 2: Timing Summary (seconds) ===
df: 4.670
sql: 3.227

Save in S3 (run once)

In [14]:
q2_sorted_output = (
    df_ranked
    .select(
        F.col("year").alias("Year"),
        F.col("Vict Descent Full").alias("Victim_Descent"),
        F.col("victim_count").alias("Victims"),
        F.round("percentage", 2).alias("Percent")
    )
    .orderBy(
        F.col("Year").desc(),
        F.col("Victims").desc()
    )
)

q2_sorted_output.coalesce(1).write.mode("overwrite").option("header", "true") \
    .csv("s3a://groups-bucket-dblab-905418150721/group9/query2_results/")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

Query 3

In [15]:
path_mo_codes = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/MO_codes.txt"

# ŒîŒπŒ±Œ≤Œ¨Œ∂ŒøœÖŒºŒµ œâœÇ Œ±œÄŒªœå text (ŒºŒØŒ± œÉœÑŒÆŒªŒ∑: value)
mo_raw = spark.read.text(path_mo_codes)

# Œ£œÄŒ¨ŒºŒµ œÑŒ∑ Œ≥œÅŒ±ŒºŒºŒÆ œÉŒµ 2 ŒºŒ≠œÅŒ∑:
#   - œÄœÅœéœÑŒø token = Œ∫œâŒ¥ŒπŒ∫œåœÇ
#   - œÖœÄœåŒªŒøŒπœÄŒø string = œÄŒµœÅŒπŒ≥œÅŒ±œÜŒÆ
mo_df = (
    mo_raw
    .withColumn(
        "parts",
        F.split(F.col("value"), r"\s+", 2)   # limit=2 ‚Üí Œ∫œÅŒ±œÑŒ¨ŒºŒµ: [code, full description]
    )
    .select(
        F.col("parts").getItem(0).alias("mo_code"),
        F.col("parts").getItem(1).alias("mo_description")
    )
)

mo_df.show(10, truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

+-------+-------------------+
|mo_code|mo_description     |
+-------+-------------------+
|0100   |Suspect Impersonate|
|0101   |Aid victim         |
|0102   |Blind              |
|0103   |Physically disabled|
|0104   |Customer           |
|0105   |Delivery           |
|0106   |Doctor             |
|0107   |God                |
|0108   |Infirm             |
|0109   |Inspector          |
+-------+-------------------+
only showing top 10 rows

DF API

In [16]:
# 1. ŒöœÅŒ±œÑŒ¨ŒºŒµ ŒºœåŒΩŒø œÑŒ∑ œÉœÑŒÆŒªŒ∑ Mocodes Œ∫Œ±Œπ œÄŒµœÑŒ¨ŒºŒµ null / Œ∫ŒµŒΩŒ≠œÇ œÑŒπŒºŒ≠œÇ
df_mo_base = (
    crime_df
    .select("Mocodes")
    .filter(F.col("Mocodes").isNotNull() & (F.col("Mocodes") != ""))
)

# 2. Œ£œÄŒ¨ŒºŒµ œÑŒø string œÉŒµ array Œ±œÄœå Œ∫œâŒ¥ŒπŒ∫ŒøœçœÇ ŒºŒµ Œ≤Œ¨œÉŒ∑ œÑŒ± Œ∫ŒµŒΩŒ¨
df_mo_array = df_mo_base.withColumn(
    "mo_code",
    F.explode(
        F.split(F.col("Mocodes"), r"\s+")   # split œÉŒµ Œ≠ŒΩŒ± ŒÆ œÄŒµœÅŒπœÉœÉœåœÑŒµœÅŒ± Œ∫ŒµŒΩŒ¨
    )
)

# 3. ŒöŒ±Œ∏Œ±œÅŒØŒ∂ŒøœÖŒºŒµ Œ∫ŒµŒΩŒ¨ / Œ∫ŒµŒΩŒ≠œÇ œÑŒπŒºŒ≠œÇ
df_mo_clean = (
    df_mo_array
    .withColumn("mo_code", F.trim(F.col("mo_code")))
    .filter(F.col("mo_code") != "")
)

# 4. Œ•œÄŒøŒªŒøŒ≥ŒØŒ∂ŒøœÖŒºŒµ œÉœÖœáŒΩœåœÑŒ∑œÑŒ± ŒµŒºœÜŒ¨ŒΩŒπœÉŒ∑œÇ Œ±ŒΩŒ¨ Œ∫œâŒ¥ŒπŒ∫œå
df_mo_counts = (
    df_mo_clean
    .groupBy("mo_code")
    .agg(F.count("*").alias("freq"))
)

# 5. Join ŒºŒµ mo_df Œ≥ŒπŒ± ŒΩŒ± œÄŒ¨œÅŒøœÖŒºŒµ œÑŒ∑ŒΩ œÄŒµœÅŒπŒ≥œÅŒ±œÜŒÆ œÑŒ∑œÇ ŒºŒµŒ∏œåŒ¥ŒøœÖ
df_mo_joined = df_mo_counts.join(
    mo_df.select("mo_code", "mo_description"),
    on="mo_code",
    how="left"
)

# 6. Œ§ŒµŒªŒπŒ∫œå DataFrame Œ≥ŒπŒ± Query 3 (ŒºŒøŒΩŒø Œ∫œâŒ¥ŒπŒ∫œåœÇ, œÄŒµœÅŒπŒ≥œÅŒ±œÜŒÆ, œÉœÖœáŒΩœåœÑŒ∑œÑŒ±)
df_q3_df = (
    df_mo_joined
    .select("mo_code", "mo_description", "freq")
    .orderBy(F.col("freq").desc(), F.col("mo_code"))
)

# 7. Timing DataFrame œÖŒªŒøœÄŒøŒØŒ∑œÉŒ∑œÇ
start = time.perf_counter()
q3_df_results = df_q3_df.collect()   # action ‚Üí œÑœÅŒ≠œáŒµŒπ œåŒªŒø œÑŒø query
end = time.perf_counter()

timings_q3["df"] = end - start
print(f"Query 3 : DataFrame execution time: {end - start:.3f} s\n")

# 8. ŒåŒºŒøœÅœÜŒø output (top 50)
print("=== Query 3 : DataFrame Results (top 50) ===")
(
    df_q3_df
    .select(
        F.col("mo_code").alias("MO Code"),
        F.col("mo_description").alias("Description"),
        F.col("freq").alias("Frequency")
    )
    .orderBy(F.col("Frequency").desc(), F.col("MO Code"))
    .show(50, truncate=False)
)

#Save Ouyput in S3
q3_sorted_output = (
    df_q3_df
    .select(
        F.col("mo_code").alias("MO Code"),
        F.col("mo_description").alias("Description"),
        F.col("freq").alias("Frequency")
    )
    .orderBy(F.col("Frequency").desc(), F.col("MO Code"))
)

q3_sorted_output.coalesce(1).write.mode("overwrite").option("header", "true") \
    .csv("s3a://groups-bucket-dblab-905418150721/group9/query3_results/")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

Query 3 : DataFrame execution time: 3.325 s

=== Query 3 : DataFrame Results (top 50) ===
+-------+--------------------------------------------------------------------------------+---------+
|MO Code|Description                                                                     |Frequency|
+-------+--------------------------------------------------------------------------------+---------+
|0344   |Removes vict property                                                           |1002900  |
|1822   |Stranger                                                                        |548422   |
|0416   |Hit-Hit w/ weapon                                                               |404773   |
|0329   |Vandalized                                                                      |377536   |
|0913   |Victim knew Suspect                                                             |278618   |
|2000   |Domestic violence                                                               |256188   |
|

RDD API

In [17]:
# 1. Œ¶œÑŒπŒ¨œáŒΩŒøœÖŒºŒµ dict Œ±œÄœå œÑŒø mo_df Œ≥ŒπŒ± lookup œÄŒµœÅŒπŒ≥œÅŒ±œÜŒÆœÇ œÉœÑŒø RDD
mo_dict = {
    row["mo_code"]: row["mo_description"]
    for row in mo_df.select("mo_code", "mo_description").collect()
}

mo_bcast = spark.sparkContext.broadcast(mo_dict)

# 2. RDD œÄŒ¨ŒΩœâ Œ±œÄœå œÑŒ∑ŒΩ œÉœÑŒÆŒªŒ∑ Mocodes
mo_rdd = (
    crime_df
    .select("Mocodes")
    .rdd
    .flatMap(
        lambda row: row["Mocodes"].split() if row["Mocodes"] is not None and row["Mocodes"].strip() != "" else []
    )
    .map(lambda code: code.strip())
    .filter(lambda code: code != "")
)

# 3. Count Œ±ŒΩŒ¨ Œ∫œâŒ¥ŒπŒ∫œå ŒºŒµ reduceByKey
mo_rdd_counts = (
    mo_rdd
    .map(lambda code: (code, 1))
    .reduceByKey(lambda x, y: x + y)
)

# 4. Œ†œÅŒøœÉŒ∏Œ≠œÑŒøœÖŒºŒµ œÄŒµœÅŒπŒ≥œÅŒ±œÜŒÆ Œ±œÄœå œÑŒø broadcast dict
mo_rdd_with_desc = mo_rdd_counts.map(
    lambda kv: (kv[0], mo_bcast.value.get(kv[0]), kv[1])  # (code, description, freq)
)

# 5. Œ§Œ±ŒæŒπŒΩœåŒºŒ∑œÉŒ∑ Œ∫Œ±œÑŒ¨ œÉœÖœáŒΩœåœÑŒ∑œÑŒ± (œÜŒ∏ŒØŒΩŒøœÖœÉŒ±)
mo_rdd_sorted = mo_rdd_with_desc.sortBy(lambda x: x[2], ascending=False)

# 6. Timing RDD œÖŒªŒøœÄŒøŒØŒ∑œÉŒ∑œÇ
start = time.perf_counter()
q3_rdd_results = mo_rdd_sorted.collect()
end = time.perf_counter()

timings_q3["rdd"] = end - start
print(f"Query 3 : RDD execution time: {end - start:.3f} s\n")

print("=== Query 3 : RDD Results (top 50) ===")
for code, desc, freq in q3_rdd_results[:50]:
    print(f"{code} | {desc} | {freq}")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

Query 3 : RDD execution time: 0.966 s

=== Query 3 : RDD Results (top 50) ===
0344 | Removes vict property | 1002900
1822 | Stranger | 548422
0416 | Hit-Hit w/ weapon | 404773
0329 | Vandalized | 377536
0913 | Victim knew Suspect | 278618
2000 | Domestic violence | 256188
1300 | Vehicle involved | 219082
0400 | Force used | 213165
1402 | Evidence Booked (any crime) | 177470
1609 | Smashed | 131229
1309 | Susp uses vehicle | 122108
1202 | Victim was aged (60 & over) or blind/physically disabled/unable to care for self | 120238
0325 | Took merchandise | 120159
1814 | Susp is/was current/former boyfriend/girlfriend | 118073
0444 | Pushed | 116763
1501 | Other MO (see rpt) | 115589
1307 | Breaks window | 113609
0334 | Brandishes weapon | 105665
2004 | Suspect is homeless/transient | 93426
0432 | Intimidation | 83562
0342 | Multi-susps overwhelm | 81230
0421 | Threaten to kill | 81036
0906 | Gangs | 78910
0429 | Vict knocked to ground | 77442
1813 | Susp is/was current/former spouse/co-habi

Different join strategies. Saved the plans in s3

In [18]:
import io
import contextlib

timings_q3_joins = {}
plans_text = ""  

plans_s3_path = "s3a://groups-bucket-dblab-905418150721/group9/query3_plan_details/"


def run_join_and_capture(name, right_df_with_hint):
    global plans_text
    
    joined = df_mo_counts.join(
        right_df_with_hint,
        on="mo_code",
        how="left"
    )
    
    # Œ†ŒπŒ¨ŒΩŒøœÖŒºŒµ œÑŒø explain œÉŒµ string (ŒîŒïŒù œÑŒø œÑœÖœÄœéŒΩŒøœÖŒºŒµ œÉœÑŒ∑ŒΩ ŒøŒ∏œåŒΩŒ∑)
    buf = io.StringIO()
    with contextlib.redirect_stdout(buf):
        joined.explain("formatted")
    plan_str = buf.getvalue()
    
    # Œ†œÅŒøœÉŒ∏Œ≠œÑŒøœÖŒºŒµ header Œ∫Œ±Œπ œÑŒø ŒØŒ¥ŒπŒø œÑŒø œÄŒªŒ¨ŒΩŒø œÉœÑŒø ŒºŒµŒ≥Œ¨ŒªŒø Œ∫ŒµŒØŒºŒµŒΩŒø
    plans_text += f"\n\n=== {name} ===\n{plan_str}"
    
    # ŒúŒµœÑœÅŒ¨ŒºŒµ œáœÅœåŒΩŒø ŒºŒµ Œ≠ŒΩŒ± Œ±œÄŒªœå action (count)
    start = time.perf_counter()
    _ = joined.count()
    end = time.perf_counter()
    
    timings_q3_joins[name] = end - start


# Œ§œÅŒ≠œáŒøœÖŒºŒµ œåŒªŒµœÇ œÑŒπœÇ œÉœÑœÅŒ±œÑŒ∑Œ≥ŒπŒ∫Œ≠œÇ
run_join_and_capture("default", mo_df)
run_join_and_capture("broadcast", mo_df.hint("broadcast"))
run_join_and_capture("merge", mo_df.hint("merge"))
run_join_and_capture("shuffle_hash", mo_df.hint("shuffle_hash"))
run_join_and_capture("shuffle_replicate_nl", mo_df.hint("shuffle_replicate_nl"))

## ŒëœÄŒøŒ∏ŒÆŒ∫ŒµœÖœÉŒ∑ ŒüŒõŒüŒ• œÑŒøœÖ Œ∫ŒµŒπŒºŒ≠ŒΩŒøœÖ ŒºŒµ œÑŒ± plans œÉŒµ ŒïŒùŒë txt œÉœÑŒø S3 
#(œÑŒø ŒµŒ∫Œ±ŒΩŒ± comment Œ≥ŒπŒ±œÑŒπ ŒºŒøœÖ ŒµŒ≤Œ≥Œ±Œ∂Œµ error Œ±ŒΩ ŒµŒπœáŒµ œÑŒø ŒπŒ¥ŒπŒø ŒøŒΩŒøŒºŒ± œÉœÑŒø path vœâœÉœÑŒµ ŒΩŒ± ŒºŒ∑ŒΩ œÑœÅŒµœáŒµŒπ ŒºŒµ Œ∫Œ±Œ∏Œµ 
#Invoke œÑŒøœÖ notebook) 
# plans_rdd = spark.sparkContext.parallelize([plans_text], 1)  # 1 partition ŒºœåŒΩŒø
# plans_rdd.saveAsTextFile(plans_s3_path)


# Œ£œÑŒø notebook Œ¥ŒµŒØœáŒΩŒøœÖŒºŒµ ŒúŒüŒùŒü œÑŒøœÖœÇ œáœÅœåŒΩŒøœÖœÇ
print("=== Query 3 : Join Strategy Timings (seconds) ===")
for name, t in timings_q3_joins.items():
    print(f"{name}: {t:.3f} s")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

=== Query 3 : Join Strategy Timings (seconds) ===
default: 1.510 s
broadcast: 0.943 s
merge: 1.277 s
shuffle_hash: 1.090 s
shuffle_replicate_nl: 0.860 s

Query 4
TŒø Query 4 ŒµœáŒµŒπ ŒµŒΩŒ±ŒΩ ŒπŒ¥ŒπŒ±ŒπœÑŒµœÅŒø œÑœÅŒøœÄŒø ŒΩŒ± œÑŒø œÑœÅŒµœáŒµŒπœÇ.Œ†œÅœâœÑŒ± ŒµœÄŒπŒªŒµŒ≥ŒµŒπœÇ œÑŒø configuration œÄŒøœÖ Œ∏ŒµŒªŒµŒπœÇ œÑœÅŒµŒæŒµŒπœÇ œÑŒø Œ∫ŒµŒªŒπ ŒºŒµ œÑŒøŒΩ Œ∫œéŒ¥ŒπŒ∫Œ± Œ∫Œ±Œπ ŒºŒµœÑŒ± œÑŒø Œ±ŒΩœÑŒπœÉœÑŒøŒπœáœâ Œ±œÄŒø œÑŒ± 3 function Œ±ŒºŒµœÉœâœÇ ŒºŒµœÑŒ± Œ≥ŒπŒ± ŒΩŒ± œÉŒøœÖ ŒµŒºœÜŒ±ŒΩŒπœÉœÑŒøœçŒΩ œÑŒ± Œ±ŒΩœÑŒØœÉœÑŒøŒπœáŒ± Œ¥ŒµŒ¥ŒøŒºŒ≠ŒΩŒ±. ŒüœÄŒøœÑŒµ Œ±ŒΩ Œ∏ŒµŒªœâ ŒΩŒ± œÑœÅŒµŒæœâ œÑŒø œÄœÅœâœÑŒø configuration œÑœÅŒµœáœâ œÑŒø Œ∫Œ±œÑŒ¨ŒªŒªŒ∑ŒªŒø Œ∫ŒµŒªŒπ Œ∫Œ±Œπ œÑŒ≠ŒªŒøœÇ œÑœÅŒ≠œáœâ œÑŒø Œ±ŒΩœÑŒØœÉœÑŒøŒπœáŒø function œÑŒ∑œÇ ŒºŒøœÅœÜŒÆœÇ q4_df_2x1_2g = run_q4("2x1_2g").Œ§Œ± œÖœÄœåŒªŒøŒπœÄŒ± ŒµŒπŒΩŒ±Œπ œÉŒµ ŒºŒøœÅœÜŒ∑ comment œâœÉœÑŒµ ŒΩŒ± ŒºŒ∑ŒΩ Œ¥ŒπŒ±Œ∫œåœÄœÑŒøœÖŒΩ œÑŒ∑ŒΩ œÅŒøŒ∑ œÑŒøœÖ Notebook Œ±ŒΩ œÑœÅŒµœáŒ∏ŒµŒπ ŒøŒªŒø ŒºŒ±Œ∂Œπ.

In [19]:
# %%configure -f

# {
#   "conf": {
#     "spark.executor.instances": "2",
#     "spark.executor.cores": "1",
#     "spark.executor.memory": "2g"
#   }
# }


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

In [20]:
# %%configure -f
# {
#   "conf": {
#     "spark.executor.instances": "2",
#     "spark.executor.cores": "2",
#     "spark.executor.memory": "4g"
#   }
# }


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

In [21]:
# %%configure -f
# {
#   "conf": {
#     "spark.executor.instances": "2",
#     "spark.executor.cores": "4",
#     "spark.executor.memory": "8g"
#   }
# }


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

In [39]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import time
from sedona.register import SedonaRegistrator

SedonaRegistrator.registerAll(spark)

timings_q4 = {}
print("Imports & Sedona OK")

# Œ¶ŒøœÅœÑœéŒΩŒµŒπœÇ crime_df, stations_df

path_crime_2010_2019 = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2010_2019.csv"
path_crime_2020_2025 = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv"

crime_2010_2019_df = spark.read.csv(path_crime_2010_2019, header=True, inferSchema=True)
crime_2020_2025_df = spark.read.csv(path_crime_2020_2025, header=True, inferSchema=True)

crime_df = crime_2010_2019_df.unionByName(crime_2020_2025_df)

path_stations = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Police_Stations.csv"
stations_df = spark.read.csv(path_stations, header=True, inferSchema=True)

print("Loaded crime_df and stations_df")

STATION_NAME_COL = "DIVISION"
STATION_LAT_COL  = "Y"
STATION_LON_COL  = "X"

crime_geo_df = (
    crime_df
    .filter(F.col("LAT").isNotNull() & F.col("LON").isNotNull())
    .filter(~((F.col("LAT") == 0) & (F.col("LON") == 0)))
    .withColumn("crime_geom", F.expr("ST_Point(CAST(LON AS DOUBLE), CAST(LAT AS DOUBLE))"))
    .select(F.col("DR_NO").alias("crime_id"), "LAT", "LON", "crime_geom")
)

stations_geo_df = (
    stations_df
    .filter(F.col(STATION_LAT_COL).isNotNull() & F.col(STATION_LON_COL).isNotNull())
    .filter(~((F.col(STATION_LAT_COL) == 0) & (F.col(STATION_LON_COL) == 0)))
    .withColumn("station_geom", F.expr(f"ST_Point(CAST({STATION_LON_COL} AS DOUBLE), CAST({STATION_LAT_COL} AS DOUBLE))"))
    .select(
        F.col(STATION_NAME_COL).alias("division"),   
        F.col(STATION_LAT_COL).alias("station_lat"),
        F.col(STATION_LON_COL).alias("station_lon"),
        "station_geom"
    )
)

print("crime_geo_df rows:", crime_geo_df.count())
print("stations_geo_df rows:", stations_geo_df.count())

q4_s3_path = "s3a://groups-bucket-dblab-905418150721/group9/query4_results/"

def build_q4_df():
    crime_station_pairs = crime_geo_df.crossJoin(stations_geo_df)

    crime_station_with_dist = crime_station_pairs.withColumn(
        "distance_deg",
        F.expr("ST_Distance(crime_geom, station_geom)")
    ).withColumn(
        "distance_km",
        F.col("distance_deg") * F.lit(111.0)
    )

    w_nearest = Window.partitionBy("crime_id").orderBy(F.col("distance_deg").asc())

    nearest_station_per_crime = (
        crime_station_with_dist
        .withColumn("rn", F.row_number().over(w_nearest))
        .filter(F.col("rn") == 1)
        .select("crime_id", "division", "distance_km")
    )

    # ŒïŒ¥œé Œ∫œÅŒ±œÑŒ¨ŒºŒµ œÑŒ∑ŒΩ "œâŒºŒÆ" ŒºŒ≠œÉŒ∑ Œ±œÄœåœÉœÑŒ±œÉŒ∑ œâœÇ avg_distance_km
    q4_result_df = (
        nearest_station_per_crime
        .groupBy("division")
        .agg(
            F.count("*").alias("crime_count"),
            F.avg("distance_km").alias("avg_distance_km")
        )
        .orderBy(F.col("crime_count").desc())
    )

    return q4_result_df


def run_q4(config_label, save_to_s3=False, show_sample=False):
    global timings_q4
    q4_result_df = build_q4_df()

    # ŒßœÅŒøŒΩŒπœÉŒºœåœÇ œÄŒ¨ŒΩœâ œÉœÑŒø "œâŒºœå" DF
    start = time.perf_counter()
    _ = q4_result_df.count()
    end = time.perf_counter()
    elapsed = end - start
    timings_q4[config_label] = elapsed
    print(f"[{config_label}] Query 4 execution time: {elapsed:.3f} s")

    # üîπ Œ¶œÑŒπŒ¨œáŒΩŒøœÖŒºŒµ ŒúŒüŒ°Œ¶ŒüŒ†ŒüŒôŒóŒúŒïŒùŒü output:
    # 1) division
    # 2) average_distance (3 Œ¥ŒµŒ∫Œ±Œ¥ŒπŒ∫Œ¨)
    # 3) crime_count (count)
    q4_formatted_df = (
        q4_result_df
        .select(
            F.col("division"),
            F.round("avg_distance_km", 3).alias("average_distance"),
            F.col("crime_count")
        )
        .orderBy(F.col("crime_count").desc())
    )

    if show_sample:
        q4_formatted_df.show(20, truncate=False)

    if save_to_s3:
        (
            q4_formatted_df
            .coalesce(1)
            .write
            .mode("overwrite")
            .option("header", "true")
            .csv(q4_s3_path)
        )
        print("Saved formatted results to:", q4_s3_path)

    return q4_formatted_df


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

Imports & Sedona OK
Loaded crime_df and stations_df
crime_geo_df rows: 3134980
stations_geo_df rows: 21

In [23]:
q4_df_2x1_2g = run_q4("2x1_2g", save_to_s3=True, show_sample=True)
#q4_df_2x2_4g = run_q4("2x2_4g")
#q4_df_2x4_8g = run_q4("2x4_8g")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

[2x1_2g] Query 4 execution time: 11.048 s
+----------------+----------------+-----------+
|division        |average_distance|crime_count|
+----------------+----------------+-----------+
|HOLLYWOOD       |2.266           |212904     |
|VAN NUYS        |3.167           |209295     |
|WILSHIRE        |2.921           |198499     |
|SOUTHWEST       |2.395           |186976     |
|OLYMPIC         |1.927           |172255     |
|NORTH HOLLYWOOD |2.898           |171399     |
|77TH STREET     |1.842           |166133     |
|PACIFIC         |4.165           |158098     |
|CENTRAL         |1.096           |155274     |
|RAMPART         |1.636           |150293     |
|SOUTHEAST       |2.703           |143597     |
|TOPANGA         |3.6             |139462     |
|WEST VALLEY     |3.207           |129467     |
|HARBOR          |3.85            |127073     |
|WEST LOS ANGELES|3.31            |121301     |
|FOOTHILL        |4.588           |120663     |
|HOLLENBECK      |2.928           |119726     

ŒïŒ¥œé œÄŒ±œÅŒøœÖœÉŒπŒ¨Œ∂ŒøŒΩœÑŒ±Œπ œÄŒ±œÅŒ±Œ¥ŒµŒπŒ≥ŒºŒ±œÑŒπŒ∫Œ¨ Œ∫Œ¨œÄŒøŒπŒøŒπ œáœÅœåŒΩŒøŒπ Œ±œÄŒø ŒºŒØŒ± Œ¥ŒøŒ∫ŒπŒºŒÆ. Œ§ŒøœÖœÇ œÄŒ±œÅŒ±Œ∏Œ≠œÑœâ Œ≥ŒπŒ±œÑŒØ œáœâœÅŒØœÇ œÉœÜŒ±ŒªŒºŒ± Œ≥ŒµŒΩŒØŒ∫ŒµœÖœÉŒ∑œÇ ŒºœÄ[ŒøœÅŒøœÖ ŒΩŒ± Œ≤Œ≥ŒøœÖŒΩ Œ±œÄŒøœÑŒµŒªŒ≠œÉŒºŒ±œÑŒ± Œ≥ŒπŒ± œÑŒøœÖœÇ Œ¥ŒπŒ±œÜŒøœÅŒøœÖœÇ œÑœÅœåœÄŒøœÖœÇ ŒµŒ∫œÑŒ≠ŒªŒµœÉŒ∑œÇ.

[2x1_2g] Query 4 execution time: 16.256 s

[2x2_4g] Query 4 execution time: 10.809 s

[2x4_8g] Query 4 execution time: 8.450 s



Query 5

In [24]:
# %%configure -f

# {
#   "conf": {
#     "spark.executor.instances": "2",
#     "spark.executor.cores": "4",
#     "spark.executor.memory": "8g"
#   }
# }


Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
432,application_1761923966900_0447,pyspark,idle,Link,Link,,‚úî


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
426,application_1761923966900_0441,pyspark,idle,Link,Link,,
430,application_1761923966900_0445,pyspark,idle,Link,Link,,
432,application_1761923966900_0447,pyspark,idle,Link,Link,,‚úî


In [50]:
# %%configure -f

# {
#   "conf": {
#     "spark.executor.instances": "4",
#     "spark.executor.cores": "2",
#     "spark.executor.memory": "4g"
#   }
# }


Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
444,application_1761923966900_0459,pyspark,idle,Link,Link,,‚úî


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
426,application_1761923966900_0441,pyspark,idle,Link,Link,,
430,application_1761923966900_0445,pyspark,idle,Link,Link,,
433,application_1761923966900_0448,pyspark,idle,Link,Link,,
434,application_1761923966900_0449,pyspark,idle,Link,Link,,
436,application_1761923966900_0451,pyspark,idle,Link,Link,,
438,application_1761923966900_0453,pyspark,idle,Link,Link,,
440,application_1761923966900_0455,pyspark,idle,Link,Link,,
443,application_1761923966900_0458,pyspark,idle,Link,Link,,
444,application_1761923966900_0459,pyspark,idle,Link,Link,,‚úî
445,application_1761923966900_0460,pyspark,starting,,,,


In [53]:
%%configure -f

{
  "conf": {
    "spark.executor.instances": "8",
    "spark.executor.cores": "1",
    "spark.executor.memory": "2g"
  }
}


Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
450,application_1761923966900_0464,pyspark,idle,Link,Link,,‚úî


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
426,application_1761923966900_0441,pyspark,idle,Link,Link,,
430,application_1761923966900_0445,pyspark,idle,Link,Link,,
433,application_1761923966900_0448,pyspark,idle,Link,Link,,
434,application_1761923966900_0449,pyspark,idle,Link,Link,,
436,application_1761923966900_0451,pyspark,idle,Link,Link,,
438,application_1761923966900_0453,pyspark,idle,Link,Link,,
440,application_1761923966900_0455,pyspark,idle,Link,Link,,
443,application_1761923966900_0458,pyspark,idle,Link,Link,,
448,application_1761923966900_0462,pyspark,idle,Link,Link,,
450,application_1761923966900_0464,pyspark,idle,Link,Link,,‚úî


In [54]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import corr
from sedona.register import SedonaRegistrator

SedonaRegistrator.registerAll(spark)

print("Imports & Sedona OK")

# ==========================================================
# 1. Load Crime Data (2010‚Äì2019 + 2020‚Äì2025) ‚Üí crime_df
# ==========================================================
path_crime_2010_2019 = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2010_2019.csv"
path_crime_2020_2025 = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv"

crime_2010_2019_df = spark.read.csv(path_crime_2010_2019, header=True, inferSchema=True)
crime_2020_2025_df = spark.read.csv(path_crime_2020_2025, header=True, inferSchema=True)

crime_df = crime_2010_2019_df.unionByName(crime_2020_2025_df).cache()
print("Loaded crime_df")

# ==========================================================
# 2. Keep crimes in 2020‚Äì2021 & clean coordinates
# ==========================================================
# Œ†œÅŒøœÉœÄŒ±Œ∏ŒøœçŒºŒµ ŒºŒµ Œ¥ŒπŒ¨œÜŒøœÅŒ± œÄŒπŒ∏Œ±ŒΩŒ¨ formats Œ≥ŒπŒ± DATE OCC
# STEP 1 ‚Äî Extract the true date from "DATE OCC"
crime_with_year = (
    crime_df
    # extract "yyyy MMM dd" from the first 11 chars
    .withColumn(
        "occ_date",
        F.to_date(
            F.substring(F.col("DATE OCC"), 1, 11),
            "yyyy MMM dd"
        )
    )
    .withColumn("year", F.year(F.col("occ_date")))
)

# STEP 2 ‚Äî Filter for years 2020‚Äì2021 + valid coordinates
crime_2020_2021 = (
    crime_with_year
    .filter((F.col("year") == 2020) | (F.col("year") == 2021))
    .filter(
        F.col("LAT").isNotNull() &
        F.col("LON").isNotNull() &
        (~((F.col("LAT") == 0.0) & (F.col("LON") == 0.0)))
    )
)


print("Filtered crimes for years 2020‚Äì2021")

# ==========================================================
# 3. Load Census Blocks 2020 from GeoJSON
#    (use properties.COMM, properties.POP20, properties.ZCTA20)
# ==========================================================
census_geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Census_Blocks_2020.geojson"

# ŒîŒπŒ±Œ≤Œ¨Œ∂ŒøœÖŒºŒµ œÑŒø GeoJSON œâœÇ JSON (œÑœÖœÄŒπŒ∫œå schema: type, properties, geometry)
census_raw = spark.read.json(census_geojson_path)

# ŒïŒ¥œé œáœÅŒ∑œÉŒπŒºŒøœÄŒøŒπŒøœçŒºŒµ œÑŒ± ŒøŒΩœåŒºŒ±œÑŒ± Œ±œÄœå œÑŒø fields CSV:
# COMM, POP20, ZCTA20, Œ∫Œ±Œπ ŒºŒµœÑŒ±œÑœÅŒ≠œÄŒøœÖŒºŒµ œÑŒø GeoJSON geometry œÉŒµ Sedona geometry
census_geom = (
    census_raw
    .selectExpr(
        "properties.COMM as COMM",
        "properties.POP20 as POP20",
        "properties.ZCTA20 as ZCTA20",
        "ST_GeomFromGeoJSON(to_json(geometry)) as block_geom"
    )
    # ŒúœÄŒøœÅŒµŒØœÇ œÄœÅŒøŒ±ŒπœÅŒµœÑŒπŒ∫Œ¨ ŒΩŒ± œÜŒπŒªœÑœÅŒ¨œÅŒµŒπœÇ blocks œáœâœÅŒØœÇ œÄŒªŒ∑Œ∏œÖœÉŒºœå
    .filter(F.col("POP20").isNotNull() & (F.col("POP20") > 0))
)

print("Loaded census blocks from GeoJSON")

# ==========================================================
# 4. Build crime points (Sedona geometries)
# ==========================================================
crime_points = (
    crime_2020_2021
    .withColumn(
        "crime_geom",
        F.expr("ST_Point(CAST(LON AS double), CAST(LAT AS double))")
    )
    .select("DR_NO", "year", "crime_geom")
)

crime_points.createOrReplaceTempView("crime_points")
census_geom.createOrReplaceTempView("census_blocks")

print("Created crime_points and census_blocks views")

# ==========================================================
# 5. Spatial join: assign each crime to a COMM (via census block)
# ==========================================================
crime_comm = spark.sql("""
    SELECT
        c.DR_NO,
        c.year,
        b.COMM,
        b.POP20
    FROM crime_points c
    JOIN census_blocks b
      ON ST_Contains(b.block_geom, c.crime_geom)
""")

print("Performed spatial join crime : census blocks")

# ==========================================================
# 6. Crimes per COMM in 2020‚Äì2021 + annual crime rate
# ==========================================================
crimes_per_comm_2y = (
    crime_comm
    .groupBy("COMM")
    .agg(
        F.countDistinct("DR_NO").alias("crime_count_2y"),
        F.first("POP20").alias("pop_2020")
    )
    .withColumn(
        "avg_annual_crime_rate",
        F.col("crime_count_2y") / (F.col("pop_2020") * F.lit(2.0))
    )
)

print("Computed crimes per COMM and avg annual crime rate")

# ==========================================================
# 7. Load Income 2021 per ZIP and clean values
# ==========================================================
income_path = "s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_income_2021.csv"

income_df = (
    spark.read
         .option("header", True)
         .option("sep", ";")
         .csv(income_path)
)

income_clean = (
    income_df
    .withColumn("zip_str", F.col("Zip Code").cast("string"))
    .withColumn(
        "median_income",
        F.regexp_replace("Estimated Median Income", "[$,]", "").cast("double")
    )
    .select("zip_str", "median_income")
)

print("Loaded and cleaned income data")

# ==========================================================
# 8. Join Census Blocks with Income via ZCTA20 ‚Üî ZIP
# ==========================================================
census_zip = (
    census_geom
    .select(
        "COMM",
        "POP20",
        F.col("ZCTA20").cast("string").alias("zip_str")
    )
)

census_income = census_zip.join(income_clean, on="zip_str", how="inner")

# Œ†ŒªŒ∑Œ∏œÖœÉŒºŒøœÉœÑŒ±Œ∏ŒºŒπœÉŒºŒ≠ŒΩŒø ŒºŒ≠œÉŒø ŒµŒπœÉœåŒ¥Œ∑ŒºŒ± Œ±ŒΩŒ¨ COMM
income_per_comm = (
    census_income
    .groupBy("COMM")
    .agg(
        F.sum("POP20").alias("pop_2020_comm"),
        F.sum(F.col("POP20") * F.col("median_income")).alias("income_weighted_sum")
    )
    .withColumn(
        "income_per_capita",
        F.col("income_weighted_sum") / F.col("pop_2020_comm")
    )
    .select("COMM", "income_per_capita")
)

print("Computed population-weighted income per COMM")

# ==========================================================
# 9. Final COMM-level dataframe for correlation
# ==========================================================
comm_stats = (
    crimes_per_comm_2y.alias("cr")
    .join(income_per_comm.alias("inc"), on="COMM", how="inner")
    .select(
        "COMM",
        "pop_2020",
        "crime_count_2y",
        F.round("avg_annual_crime_rate", 4).alias("avg_annual_crime_rate"),
        F.round("income_per_capita", 2).alias("income_per_capita")
    )
    .filter(F.col("income_per_capita").isNotNull())
)

print("Built final COMM stats dataframe")

# Optional: inspect a few rows
comm_stats.show(20, truncate=False)

# ==========================================================
# 10. Correlations: all / top-10 / bottom-10
# ==========================================================
# (Œ±) All COMM
corr_all = comm_stats.select(
    corr("avg_annual_crime_rate", "income_per_capita").alias("corr_all")
).first()["corr_all"]

print("Correlation (all COMM):", corr_all)

# (Œ≤) Top-10 richest COMM (by income_per_capita)
top10 = comm_stats.orderBy(F.col("income_per_capita").desc()).limit(10)
corr_top10 = top10.select(
    corr("avg_annual_crime_rate", "income_per_capita").alias("corr_top10")
).first()["corr_top10"]

print("Correlation (top 10 income):", corr_top10)

# (Œ≥) Bottom-10 poorest COMM
bottom10 = comm_stats.orderBy(F.col("income_per_capita").asc()).limit(10)
corr_bottom10 = bottom10.select(
    corr("avg_annual_crime_rate", "income_per_capita").alias("corr_bottom10")
).first()["corr_bottom10"]

print("Correlation (bottom 10 income):", corr_bottom10)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

Imports & Sedona OK
Loaded crime_df
Filtered crimes for years 2020?2021
Loaded census blocks from GeoJSON
Created crime_points and census_blocks views
Performed spatial join crime ? census blocks
Computed crimes per COMM and avg annual crime rate
Loaded and cleaned income data
Computed population-weighted income per COMM
Built final COMM stats dataframe
+------------------+--------+--------------+---------------------+-----------------+
|COMM              |pop_2020|crime_count_2y|avg_annual_crime_rate|income_per_capita|
+------------------+--------+--------------+---------------------+-----------------+
|Toluca Terrace    |726     |37            |0.0255               |61761.0          |
|Elysian Park      |113     |608           |2.6903               |57377.12         |
|Longwood          |400     |371           |0.4638               |53659.0          |
|Green Meadows     |108     |2883          |13.3472              |46869.78         |
|Cadillac-Corning  |304     |442           |0.727

In [55]:
import time

print("Measuring TOTAL execution time for Query 5...")

start = time.perf_counter()

# Force full execution of the final DF
_ = comm_stats.count()

end = time.perf_counter()

total_time_q5 = end - start

print(f"\n[Query 5] TOTAL execution time: {total_time_q5:.3f} s")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

Measuring TOTAL execution time for Query 5...

[Query 5] TOTAL execution time: 6.756 s

conf1 = 4.626s
conf2 = 5.192s
conf3 = 6.756s

In [47]:
# Path œÉœÑŒø S3 Œ≥ŒπŒ± œÑŒ± Œ±œÄŒøœÑŒµŒªŒ≠œÉŒºŒ±œÑŒ± œÑŒøœÖ Query 5
q5_s3_path = "s3a://groups-bucket-dblab-905418150721/group9/query5_results/"

# ŒúœÄŒøœÅŒøœçŒºŒµ ŒΩŒ± Œ∫Œ¨ŒΩŒøœÖŒºŒµ Œ∫Œ±Œπ ŒªŒØŒ≥Œ∑ ŒºŒøœÅœÜŒøœÄŒøŒØŒ∑œÉŒ∑ (œÄœÅŒøŒ±ŒπœÅŒµœÑŒπŒ∫Œ¨)
q5_output_df = (
    comm_stats
    .select(
        "COMM",
        "pop_2020",
        "crime_count_2y",
        F.round("avg_annual_crime_rate", 8).alias("avg_annual_crime_rate"),
        F.round("income_per_capita", 2).alias("income_per_capita")
    )
    .orderBy(F.col("income_per_capita").desc())
)

# ŒëœÄŒøŒ∏ŒÆŒ∫ŒµœÖœÉŒ∑ œÉŒµ S3 œâœÇ CSV ŒºŒµ header, œÉŒµ Œ≠ŒΩŒ± Œ±œÅœáŒµŒØŒø (coalesce(1))
(
    q5_output_df
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("header", "true")
    .csv(q5_s3_path)
)

print("Query 5 results saved to:", q5_s3_path)



FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

Query 5 results saved to: s3a://groups-bucket-dblab-905418150721/group9/query5_results/