In [35]:
%%configure -f
{
    "conf":{
        "spark.executor.instances": "4",
        "spark.executor.memory": "2g",
        "spark.executor.cores": "1"
    }
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
850,application_1761923966900_0862,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
847,application_1761923966900_0859,pyspark,idle,Link,Link,,
848,application_1761923966900_0860,pyspark,idle,Link,Link,,
849,application_1761923966900_0861,pyspark,idle,Link,Link,,
850,application_1761923966900_0862,pyspark,idle,Link,Link,,✔


In [37]:
# Implementation 1: DataFrame API
from pyspark.sql.functions import regexp_extract, col
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, FloatType, StringType
from pyspark.sql.functions import split, explode, trim, col, collect_list, concat_ws
import time


spark = SparkSession \
    .builder \
    .appName("Query 3 implementation w Dataframe") \
    .getOrCreate()

#Πρωτα πρέπει να μετατρέψουμε το txt αρχειο που περιεχει τις περιγραφές και τα MO codes σε ενα dataframe
codes_df = spark.read.text("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/MO_codes.txt")

# Extract first 4 digits → code
codes_df = codes_df.withColumn("mocodes", regexp_extract(col("value"), r"^(\d{4})", 1))

# Extract the rest → description
codes_df = codes_df.withColumn("description", regexp_extract(col("value"), r"^\d{4}\s+(.*)$", 1))

# Drop original raw column
codes_df = codes_df.drop("value")
#codes_df.show()
#mexri edw eimaste top

# Define the schema for the employees DataFrame
crimes_schema = StructType([
    StructField("dr_no", StringType()),
    StructField("date_rptd", StringType()),
    StructField("date_occ", StringType()),
    StructField("time_occ", StringType()),
    StructField("area", StringType()),
    StructField("area_name", StringType()),
    StructField("rpt_dist_no", StringType()),
    StructField("part_1_2", IntegerType()),
    StructField("crm_cd", StringType()),
    StructField("crm_cd_desc", StringType()),
    StructField("mocodes", StringType()),
    StructField("vict_age", StringType()),
    StructField("vict_sex", StringType()),
    StructField("vict_descent", StringType()),
    StructField("premis_cd", StringType()),
    StructField("premis_desc", StringType()),
    StructField("weapon_used_cd", StringType()),
    StructField("weapon_desc", StringType()),
    StructField("status", StringType()),
    StructField("status_desc", StringType()),
    StructField("crm_cd_1",StringType()),
    StructField("crm_cd_2",StringType()),
    StructField("crm_cd_3",StringType()),
    StructField("crm_cd_4",StringType()),
    StructField("location", StringType()),
    StructField("cross_street", StringType()),
    StructField("lat", FloatType()),
    StructField("lon", FloatType()),
])

crimes_2010_2019_df = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2010_2019.csv", \
    header=False, \
    schema=crimes_schema)

crimes_2020_2025_df = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv", \
                                     header=False, \
                                     schema=crimes_schema)

#Union both datasets to have all the data available for the query
crimes_total_df = crimes_2010_2019_df.union(crimes_2020_2025_df)

sorted_mo_df = crimes_total_df.groupBy("mocodes").count().orderBy(col("count").desc())
#sorted_mo_df.show()
#Επειδη το MOcodes εχει πολλές τετραδες, θα φτιάξουμε ενα extended dataset με μεμονομενα mocodes για να βρουμε ολα τα description με left outer join και μετα θα τα μετρησουμε για βρουμε εμφανιση για καθε code

# 1. Split mocodes σε array και explode σε ξεχωριστές γραμμές
expanded_df = crimes_total_df \
    .withColumn("mo_array", split(col("mocodes"), " ")) \
    .withColumn("mo_code", explode(col("mo_array"))) \
    .withColumn("mo_code", trim(col("mo_code"))) \
    .filter(col("mo_code") != "")

start_time=time.time()
# 2. Join με το λεξικό των περιγραφών
joined_df = expanded_df.join(
    codes_df,
    expanded_df["mo_code"] == codes_df["mocodes"],
    "left"
)
end_time=time.time()
result_df = joined_df.groupBy("mo_code","description").count().orderBy(col("count").desc())

result_df.show(truncate=False)
result_df.explain("cost")

print("Dataframe API Execution time for Query 3 (join): {:.4f} sec".format(end_time - start_time))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+--------------------------------------------------------------------------------+-------+
|mo_code|description                                                                     |count  |
+-------+--------------------------------------------------------------------------------+-------+
|0344   |Removes vict property                                                           |1002900|
|1822   |Stranger                                                                        |548422 |
|0416   |Hit-Hit w/ weapon                                                               |404773 |
|0329   |Vandalized                                                                      |377536 |
|0913   |Victim knew Suspect                                                             |278618 |
|2000   |Domestic violence                                                               |256188 |
|1300   |Vehicle involved                                                                |219082 |
|0400   |F

In [43]:
#broadcast
from pyspark.sql.functions import broadcast
from pyspark.sql.functions import regexp_extract, col
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, FloatType, StringType
from pyspark.sql.functions import split, explode, trim, col, collect_list, concat_ws
import time


spark = SparkSession \
    .builder \
    .appName("Query 3 implementation w Dataframe") \
    .getOrCreate()

#Πρωτα πρέπει να μετατρέψουμε το txt αρχειο που περιεχει τις περιγραφές και τα MO codes σε ενα dataframe
codes_df = spark.read.text("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/MO_codes.txt")

# Extract first 4 digits → code
codes_df = codes_df.withColumn("mocodes", regexp_extract(col("value"), r"^(\d{4})", 1))

# Extract the rest → description
codes_df = codes_df.withColumn("description", regexp_extract(col("value"), r"^\d{4}\s+(.*)$", 1))

# Drop original raw column
codes_df = codes_df.drop("value")
#codes_df.show()
#mexri edw eimaste top

# Define the schema for the employees DataFrame
crimes_schema = StructType([
    StructField("dr_no", StringType()),
    StructField("date_rptd", StringType()),
    StructField("date_occ", StringType()),
    StructField("time_occ", StringType()),
    StructField("area", StringType()),
    StructField("area_name", StringType()),
    StructField("rpt_dist_no", StringType()),
    StructField("part_1_2", IntegerType()),
    StructField("crm_cd", StringType()),
    StructField("crm_cd_desc", StringType()),
    StructField("mocodes", StringType()),
    StructField("vict_age", StringType()),
    StructField("vict_sex", StringType()),
    StructField("vict_descent", StringType()),
    StructField("premis_cd", StringType()),
    StructField("premis_desc", StringType()),
    StructField("weapon_used_cd", StringType()),
    StructField("weapon_desc", StringType()),
    StructField("status", StringType()),
    StructField("status_desc", StringType()),
    StructField("crm_cd_1",StringType()),
    StructField("crm_cd_2",StringType()),
    StructField("crm_cd_3",StringType()),
    StructField("crm_cd_4",StringType()),
    StructField("location", StringType()),
    StructField("cross_street", StringType()),
    StructField("lat", FloatType()),
    StructField("lon", FloatType()),
])

crimes_2010_2019_df = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2010_2019.csv", \
    header=False, \
    schema=crimes_schema)

crimes_2020_2025_df = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv", \
                                     header=False, \
                                     schema=crimes_schema)

#Union both datasets to have all the data available for the query
crimes_total_df = crimes_2010_2019_df.union(crimes_2020_2025_df)

sorted_mo_df = crimes_total_df.groupBy("mocodes").count().orderBy(col("count").desc())
#sorted_mo_df.show()
#Επειδη το MOcodes εχει πολλές τετραδες, θα φτιάξουμε ενα extended dataset με μεμονομενα mocodes για να βρουμε ολα τα description με left outer join και μετα θα τα μετρησουμε για βρουμε εμφανιση για καθε code

# 1. Split mocodes σε array και explode σε ξεχωριστές γραμμές
expanded_df = crimes_total_df \
    .withColumn("mo_array", split(col("mocodes"), " ")) \
    .withColumn("mo_code", explode(col("mo_array"))) \
    .withColumn("mo_code", trim(col("mo_code"))) \
    .filter(col("mo_code") != "")

start_time=time.time()
# 2. Join με το λεξικό των περιγραφών
broadcast_join = expanded_df.join(
    broadcast(codes_df),
    expanded_df["mo_code"] == codes_df["mocodes"],
    "left"
)
end_time=time.time()
broadcast_join.explain(mode="formatted")

result_df = broadcast_join.groupBy("mo_code","description").count().orderBy(col("count").desc())

result_df.show(truncate=False)
result_df.explain("cost")

print("Dataframe API Execution time for Query 3 (broadcast): {:.4f} sec".format(end_time - start_time))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Physical Plan ==
AdaptiveSparkPlan (16)
+- BroadcastHashJoin LeftOuter BuildRight (15)
   :- Project (10)
   :  +- Filter (9)
   :     +- Generate (8)
   :        +- Union (7)
   :           :- Project (3)
   :           :  +- Filter (2)
   :           :     +- Scan csv  (1)
   :           +- Project (6)
   :              +- Filter (5)
   :                 +- Scan csv  (4)
   +- BroadcastExchange (14)
      +- Project (13)
         +- Filter (12)
            +- Scan text  (11)


(1) Scan csv 
Output [28]: [dr_no#2767, date_rptd#2768, date_occ#2769, time_occ#2770, area#2771, area_name#2772, rpt_dist_no#2773, part_1_2#2774, crm_cd#2775, crm_cd_desc#2776, mocodes#2777, vict_age#2778, vict_sex#2779, vict_descent#2780, premis_cd#2781, premis_desc#2782, weapon_used_cd#2783, weapon_desc#2784, status#2785, status_desc#2786, crm_cd_1#2787, crm_cd_2#2788, crm_cd_3#2789, crm_cd_4#2790, location#2791, cross_street#2792, lat#2793, lon#2794]
Batched: false
Location: InMemoryFileIndex [s3://initia

In [47]:
#Merge
from pyspark.sql.functions import regexp_extract, col
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, FloatType, StringType
from pyspark.sql.functions import split, explode, trim, col, collect_list, concat_ws
import time


spark = SparkSession \
    .builder \
    .appName("Query 3 implementation w Dataframe") \
    .getOrCreate()

#Πρωτα πρέπει να μετατρέψουμε το txt αρχειο που περιεχει τις περιγραφές και τα MO codes σε ενα dataframe
codes_df = spark.read.text("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/MO_codes.txt")

# Extract first 4 digits → code
codes_df = codes_df.withColumn("mocodes", regexp_extract(col("value"), r"^(\d{4})", 1))

# Extract the rest → description
codes_df = codes_df.withColumn("description", regexp_extract(col("value"), r"^\d{4}\s+(.*)$", 1))

# Drop original raw column
codes_df = codes_df.drop("value")
#codes_df.show()
#mexri edw eimaste top

# Define the schema for the employees DataFrame
crimes_schema = StructType([
    StructField("dr_no", StringType()),
    StructField("date_rptd", StringType()),
    StructField("date_occ", StringType()),
    StructField("time_occ", StringType()),
    StructField("area", StringType()),
    StructField("area_name", StringType()),
    StructField("rpt_dist_no", StringType()),
    StructField("part_1_2", IntegerType()),
    StructField("crm_cd", StringType()),
    StructField("crm_cd_desc", StringType()),
    StructField("mocodes", StringType()),
    StructField("vict_age", StringType()),
    StructField("vict_sex", StringType()),
    StructField("vict_descent", StringType()),
    StructField("premis_cd", StringType()),
    StructField("premis_desc", StringType()),
    StructField("weapon_used_cd", StringType()),
    StructField("weapon_desc", StringType()),
    StructField("status", StringType()),
    StructField("status_desc", StringType()),
    StructField("crm_cd_1",StringType()),
    StructField("crm_cd_2",StringType()),
    StructField("crm_cd_3",StringType()),
    StructField("crm_cd_4",StringType()),
    StructField("location", StringType()),
    StructField("cross_street", StringType()),
    StructField("lat", FloatType()),
    StructField("lon", FloatType()),
])

crimes_2010_2019_df = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2010_2019.csv", \
    header=False, \
    schema=crimes_schema)

crimes_2020_2025_df = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv", \
                                     header=False, \
                                     schema=crimes_schema)

#Union both datasets to have all the data available for the query
crimes_total_df = crimes_2010_2019_df.union(crimes_2020_2025_df)

sorted_mo_df = crimes_total_df.groupBy("mocodes").count().orderBy(col("count").desc())
#sorted_mo_df.show()
#Επειδη το MOcodes εχει πολλές τετραδες, θα φτιάξουμε ενα extended dataset με μεμονομενα mocodes για να βρουμε ολα τα description με left outer join και μετα θα τα μετρησουμε για βρουμε εμφανιση για καθε code

# 1. Split mocodes σε array και explode σε ξεχωριστές γραμμές
expanded_df = crimes_total_df \
    .withColumn("mo_array", split(col("mocodes"), " ")) \
    .withColumn("mo_code", explode(col("mo_array"))) \
    .withColumn("mo_code", trim(col("mo_code"))) \
    .filter(col("mo_code") != "")

# 2. Join με το λεξικό των περιγραφών
start_time=time.time()
merge_join = expanded_df.hint("merge") \
    .join(codes_df.hint("merge"), expanded_df["mo_code"] == codes_df["mocodes"], "left")
end_time=time.time()
merge_join.explain(mode="formatted")

result_df = merge_join.groupBy("mo_code","description").count().orderBy(col("count").desc())

result_df.show(truncate=False)
#result_df.explain("cost")

print("Dataframe API Execution time for Query 3 (merge): {:.4f} sec".format(end_time - start_time))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Physical Plan ==
AdaptiveSparkPlan (19)
+- SortMergeJoin LeftOuter (18)
   :- Sort (12)
   :  +- Exchange (11)
   :     +- Project (10)
   :        +- Filter (9)
   :           +- Generate (8)
   :              +- Union (7)
   :                 :- Project (3)
   :                 :  +- Filter (2)
   :                 :     +- Scan csv  (1)
   :                 +- Project (6)
   :                    +- Filter (5)
   :                       +- Scan csv  (4)
   +- Sort (17)
      +- Exchange (16)
         +- Project (15)
            +- Filter (14)
               +- Scan text  (13)


(1) Scan csv 
Output [28]: [dr_no#4588, date_rptd#4589, date_occ#4590, time_occ#4591, area#4592, area_name#4593, rpt_dist_no#4594, part_1_2#4595, crm_cd#4596, crm_cd_desc#4597, mocodes#4598, vict_age#4599, vict_sex#4600, vict_descent#4601, premis_cd#4602, premis_desc#4603, weapon_used_cd#4604, weapon_desc#4605, status#4606, status_desc#4607, crm_cd_1#4608, crm_cd_2#4609, crm_cd_3#4610, crm_cd_4#4611, locati

In [45]:
#shuffle hash
from pyspark.sql.functions import regexp_extract, col
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, FloatType, StringType
from pyspark.sql.functions import split, explode, trim, col, collect_list, concat_ws
import time


spark = SparkSession \
    .builder \
    .appName("Query 3 implementation w Dataframe") \
    .getOrCreate()

#Πρωτα πρέπει να μετατρέψουμε το txt αρχειο που περιεχει τις περιγραφές και τα MO codes σε ενα dataframe
codes_df = spark.read.text("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/MO_codes.txt")

# Extract first 4 digits → code
codes_df = codes_df.withColumn("mocodes", regexp_extract(col("value"), r"^(\d{4})", 1))

# Extract the rest → description
codes_df = codes_df.withColumn("description", regexp_extract(col("value"), r"^\d{4}\s+(.*)$", 1))

# Drop original raw column
codes_df = codes_df.drop("value")
#codes_df.show()
#mexri edw eimaste top

# Define the schema for the employees DataFrame
crimes_schema = StructType([
    StructField("dr_no", StringType()),
    StructField("date_rptd", StringType()),
    StructField("date_occ", StringType()),
    StructField("time_occ", StringType()),
    StructField("area", StringType()),
    StructField("area_name", StringType()),
    StructField("rpt_dist_no", StringType()),
    StructField("part_1_2", IntegerType()),
    StructField("crm_cd", StringType()),
    StructField("crm_cd_desc", StringType()),
    StructField("mocodes", StringType()),
    StructField("vict_age", StringType()),
    StructField("vict_sex", StringType()),
    StructField("vict_descent", StringType()),
    StructField("premis_cd", StringType()),
    StructField("premis_desc", StringType()),
    StructField("weapon_used_cd", StringType()),
    StructField("weapon_desc", StringType()),
    StructField("status", StringType()),
    StructField("status_desc", StringType()),
    StructField("crm_cd_1",StringType()),
    StructField("crm_cd_2",StringType()),
    StructField("crm_cd_3",StringType()),
    StructField("crm_cd_4",StringType()),
    StructField("location", StringType()),
    StructField("cross_street", StringType()),
    StructField("lat", FloatType()),
    StructField("lon", FloatType()),
])

crimes_2010_2019_df = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2010_2019.csv", \
    header=False, \
    schema=crimes_schema)

crimes_2020_2025_df = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv", \
                                     header=False, \
                                     schema=crimes_schema)

#Union both datasets to have all the data available for the query
crimes_total_df = crimes_2010_2019_df.union(crimes_2020_2025_df)

sorted_mo_df = crimes_total_df.groupBy("mocodes").count().orderBy(col("count").desc())
#sorted_mo_df.show()
#Επειδη το MOcodes εχει πολλές τετραδες, θα φτιάξουμε ενα extended dataset με μεμονομενα mocodes για να βρουμε ολα τα description με left outer join και μετα θα τα μετρησουμε για βρουμε εμφανιση για καθε code

# 1. Split mocodes σε array και explode σε ξεχωριστές γραμμές
expanded_df = crimes_total_df \
    .withColumn("mo_array", split(col("mocodes"), " ")) \
    .withColumn("mo_code", explode(col("mo_array"))) \
    .withColumn("mo_code", trim(col("mo_code"))) \
    .filter(col("mo_code") != "")

# 2. Join με το λεξικό των περιγραφών
start_time=time.time()
shuffle_hash_join = expanded_df.hint("shuffle_hash") \
    .join(codes_df.hint("shuffle_hash"), expanded_df["mo_code"] == codes_df["mocodes"], "left")
end_time=time.time()
shuffle_hash_join.explain(mode="formatted")

result_df = shuffle_hash_join.groupBy("mo_code","description").count().orderBy(col("count").desc())

result_df.show(truncate=False)
#result_df.explain("cost")

print("Dataframe API Execution time for Query 3 (shuffle hash): {:.4f} sec".format(end_time - start_time))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Physical Plan ==
AdaptiveSparkPlan (17)
+- ShuffledHashJoin LeftOuter BuildRight (16)
   :- Exchange (11)
   :  +- Project (10)
   :     +- Filter (9)
   :        +- Generate (8)
   :           +- Union (7)
   :              :- Project (3)
   :              :  +- Filter (2)
   :              :     +- Scan csv  (1)
   :              +- Project (6)
   :                 +- Filter (5)
   :                    +- Scan csv  (4)
   +- Exchange (15)
      +- Project (14)
         +- Filter (13)
            +- Scan text  (12)


(1) Scan csv 
Output [28]: [dr_no#3678, date_rptd#3679, date_occ#3680, time_occ#3681, area#3682, area_name#3683, rpt_dist_no#3684, part_1_2#3685, crm_cd#3686, crm_cd_desc#3687, mocodes#3688, vict_age#3689, vict_sex#3690, vict_descent#3691, premis_cd#3692, premis_desc#3693, weapon_used_cd#3694, weapon_desc#3695, status#3696, status_desc#3697, crm_cd_1#3698, crm_cd_2#3699, crm_cd_3#3700, crm_cd_4#3701, location#3702, cross_street#3703, lat#3704, lon#3705]
Batched: false


In [46]:
#shuffle replicate nl
from pyspark.sql.functions import regexp_extract, col
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, FloatType, StringType
from pyspark.sql.functions import split, explode, trim, col, collect_list, concat_ws
import time


spark = SparkSession \
    .builder \
    .appName("Query 3 implementation w Dataframe") \
    .getOrCreate()

#Πρωτα πρέπει να μετατρέψουμε το txt αρχειο που περιεχει τις περιγραφές και τα MO codes σε ενα dataframe
codes_df = spark.read.text("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/MO_codes.txt")

# Extract first 4 digits → code
codes_df = codes_df.withColumn("mocodes", regexp_extract(col("value"), r"^(\d{4})", 1))

# Extract the rest → description
codes_df = codes_df.withColumn("description", regexp_extract(col("value"), r"^\d{4}\s+(.*)$", 1))

# Drop original raw column
codes_df = codes_df.drop("value")
#codes_df.show()
#mexri edw eimaste top

# Define the schema for the employees DataFrame
crimes_schema = StructType([
    StructField("dr_no", StringType()),
    StructField("date_rptd", StringType()),
    StructField("date_occ", StringType()),
    StructField("time_occ", StringType()),
    StructField("area", StringType()),
    StructField("area_name", StringType()),
    StructField("rpt_dist_no", StringType()),
    StructField("part_1_2", IntegerType()),
    StructField("crm_cd", StringType()),
    StructField("crm_cd_desc", StringType()),
    StructField("mocodes", StringType()),
    StructField("vict_age", StringType()),
    StructField("vict_sex", StringType()),
    StructField("vict_descent", StringType()),
    StructField("premis_cd", StringType()),
    StructField("premis_desc", StringType()),
    StructField("weapon_used_cd", StringType()),
    StructField("weapon_desc", StringType()),
    StructField("status", StringType()),
    StructField("status_desc", StringType()),
    StructField("crm_cd_1",StringType()),
    StructField("crm_cd_2",StringType()),
    StructField("crm_cd_3",StringType()),
    StructField("crm_cd_4",StringType()),
    StructField("location", StringType()),
    StructField("cross_street", StringType()),
    StructField("lat", FloatType()),
    StructField("lon", FloatType()),
])

crimes_2010_2019_df = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2010_2019.csv", \
    header=False, \
    schema=crimes_schema)

crimes_2020_2025_df = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv", \
                                     header=False, \
                                     schema=crimes_schema)

#Union both datasets to have all the data available for the query
crimes_total_df = crimes_2010_2019_df.union(crimes_2020_2025_df)

sorted_mo_df = crimes_total_df.groupBy("mocodes").count().orderBy(col("count").desc())
#sorted_mo_df.show()
#Επειδη το MOcodes εχει πολλές τετραδες, θα φτιάξουμε ενα extended dataset με μεμονομενα mocodes για να βρουμε ολα τα description με left outer join και μετα θα τα μετρησουμε για βρουμε εμφανιση για καθε code

# 1. Split mocodes σε array και explode σε ξεχωριστές γραμμές
expanded_df = crimes_total_df \
    .withColumn("mo_array", split(col("mocodes"), " ")) \
    .withColumn("mo_code", explode(col("mo_array"))) \
    .withColumn("mo_code", trim(col("mo_code"))) \
    .filter(col("mo_code") != "")

# 2. Join με το λεξικό των περιγραφών
start_time=time.time()
shuffle_repl_nl = expanded_df.hint("shuffle_replicate_nl") \
    .join(codes_df.hint("shuffle_replicate_nl"), expanded_df["mo_code"] == codes_df["mocodes"], "left")
end_time=time.time()
shuffle_repl_nl.explain(mode="formatted")
result_df = shuffle_repl_nl.groupBy("mo_code","description").count().orderBy(col("count").desc())

result_df.show(truncate=False)
#result_df.explain("cost")

print("Dataframe API Execution time for Query 3 (shuffle replicate nl): {:.4f} sec".format(end_time - start_time))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Physical Plan ==
AdaptiveSparkPlan (16)
+- BroadcastHashJoin LeftOuter BuildRight (15)
   :- Project (10)
   :  +- Filter (9)
   :     +- Generate (8)
   :        +- Union (7)
   :           :- Project (3)
   :           :  +- Filter (2)
   :           :     +- Scan csv  (1)
   :           +- Project (6)
   :              +- Filter (5)
   :                 +- Scan csv  (4)
   +- BroadcastExchange (14)
      +- Project (13)
         +- Filter (12)
            +- Scan text  (11)


(1) Scan csv 
Output [28]: [dr_no#4133, date_rptd#4134, date_occ#4135, time_occ#4136, area#4137, area_name#4138, rpt_dist_no#4139, part_1_2#4140, crm_cd#4141, crm_cd_desc#4142, mocodes#4143, vict_age#4144, vict_sex#4145, vict_descent#4146, premis_cd#4147, premis_desc#4148, weapon_used_cd#4149, weapon_desc#4150, status#4151, status_desc#4152, crm_cd_1#4153, crm_cd_2#4154, crm_cd_3#4155, crm_cd_4#4156, location#4157, cross_street#4158, lat#4159, lon#4160]
Batched: false
Location: InMemoryFileIndex [s3://initia

In [42]:
#implementation using RDD
from pyspark.sql import SparkSession
import time


sc = SparkSession \
    .builder \
    .appName("Query 3 implemented with RDD") \
    .getOrCreate() \
    .sparkContext

#Φορτώνουμε τα δεδομενα   
crimes_2010_2019 = sc.textFile("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2010_2019.csv") \
                .map(lambda x: (x.split(","))) # Split lines into a list of elements -> delimiter: ","

crimes_2020_2025 = sc.textFile("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv") \
                .map(lambda x: (x.split(","))) # Split lines into a list of elements -> delimiter: ","

crimes=crimes_2010_2019.union(crimes_2020_2025)

codes = spark.read.text("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/MO_codes.txt")
codes = codes.withColumn("mocodes", regexp_extract(col("value"), r"^(\d{4})", 1)) \
                   .withColumn("description", regexp_extract(col("value"), r"^\d{4}\s+(.*)$", 1)) \
                   .drop("value")

codes_rdd = codes.rdd.map(lambda row: (row['mocodes'], row['description']))


def extract_mocodes(row):
    mocodes_str = row[10]  # 11η στήλη στο CSV
    if not mocodes_str:
        return []
    # split σε πολλούς κωδικούς αν υπάρχουν
    return [(c.strip(), 1) for c in mocodes_str.split() if c.strip()]

crimes_rdd = crimes.flatMap(extract_mocodes)

# --- 4. Μετράμε συχνότητες εμφάνισης ανά MO code ---
counts_rdd = crimes_rdd.reduceByKey(lambda a, b: a + b)  # (mocode, count)
start_time=time.time()
# --- 5. Κλασικό join με το codes_rdd για descriptions ---
joined_rdd = counts_rdd.join(codes_rdd)  # (mocode, (count, description))

# --- 6. Ταξινόμηση κατά φθίνουσα συχνότητα ---
result_rdd = joined_rdd.map(lambda x: (x[0], x[1][1], x[1][0])) \
                       .sortBy(lambda x: x[2], ascending=False)
end_time = time.time()
# --- 7. Προβολή αποτελεσμάτων ---
for row in result_rdd.take(10):
    print(row)


print("RDD classic join execution time: {:.4f} sec".format(end_time - start_time))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

('0344', 'Removes vict property', 207949)
('0416', 'Hit-Hit w/ weapon', 164631)
('1822', 'Stranger', 123039)
('0913', 'Victim knew Suspect', 104781)
('0400', 'Force used', 75879)
('1814', 'Susp is/was current/former boyfriend/girlfriend', 74044)
('2000', 'Domestic violence', 64107)
('1300', 'Vehicle involved', 63441)
('0444', 'Pushed', 62748)
('1813', 'Susp is/was current/former spouse/co-habitant', 52548)
RDD classic join execution time: 9.7663 sec