## Query 1 DataFrame and RDD

In [1]:
from pyspark.sql import SparkSession
from pathlib import Path
from pyspark.sql import DataFrame
from pyspark.sql.window import Window
from pyspark.sql.functions import to_date, to_timestamp, col, year, month, desc, rank, when, count, lower,row_number, sum, round, avg
import time
import numpy as np

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
2886,application_1732639283265_2845,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
conf = spark.sparkContext.getConf()

# Print relevant executor settings
print("Executor Instances:", conf.get("spark.executor.instances"))
print("Executor Memory:", conf.get("spark.executor.memory"))
print("Executor Cores:", conf.get("spark.executor.cores"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Executor Instances: None
Executor Memory: 4743M
Executor Cores: 2

In [3]:

# Read data
spark = SparkSession \
    .builder \
    .appName("Query 1 dataframe execution from csv") \
    .getOrCreate()

df1 = spark.read.csv('s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv', header=True)
df2 = spark.read.csv('s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv', header=True)

# Union all the data
df_merged_csv = df1.union(df2).dropDuplicates()

df_merged_csv = df_merged_csv.withColumn('Date Rptd', to_timestamp(df_merged_csv['Date Rptd'], 'MM/dd/yyyy hh:mm:ss a'))
df_merged_csv = df_merged_csv.withColumn('DATE OCC', to_timestamp(df_merged_csv['DATE OCC'], 'MM/dd/yyyy hh:mm:ss a'))
df_merged_csv = df_merged_csv.withColumn("Vict Age", col("Vict Age").cast("int"))
df_merged_csv = df_merged_csv.withColumn("LAT", col("LAT").cast("double"))
df_merged_csv = df_merged_csv.withColumn("LON", col("LON").cast("double"))
df_merged_csv = df_merged_csv.withColumn("Crm Cd Desc", lower(col("Crm Cd Desc")))
df_merged_csv = df_merged_csv.withColumn("Status Desc", lower(col("Status Desc")))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
def query1_df(show_res=True):
    df_aggr_assault = df_merged_csv.filter(col("Crm Cd Desc").contains("aggravated assault"))
    
    # Divide each person to the group age
    df_age_groups = df_aggr_assault.withColumn(
        "Age Group",
        when(col("Vict Age") < 18, "Kids: < 18")
        .when((col("Vict Age") >= 18) & (col("Vict Age") <= 24), "Young Adults: 18-24")
        .when((col("Vict Age") >= 25) & (col("Vict Age") <= 64), "Adults: 25-64")
        .otherwise("Eldery People: > 64")
    )
    
    # Count each group age and sort it in desceding order
    age_group_counts = df_age_groups.groupBy("Age Group").agg(count("*").alias("Victim Count"))
    sorted_age_groups = age_group_counts.orderBy(col("Victim Count").desc())
    if show_res:
        sorted_age_groups.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
times_lst = []
for i in range(10):
    start_time = time.time()
    query1_df(show_res=False)
    times_lst.append(time.time() - start_time)
print(times_lst)
print(f'Average execution time: {np.mean(times_lst)}')
query1_df()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[0.09554266929626465, 0.05651450157165527, 0.05931401252746582, 0.07477617263793945, 0.06181502342224121, 0.06507563591003418, 0.06657075881958008, 0.036954402923583984, 0.032987356185913086, 0.03697800636291504]
Average execution time: 0.058652853965759276
+-------------------+------------+
|          Age Group|Victim Count|
+-------------------+------------+
|      Adults: 25-64|      121093|
|Young Adults: 18-24|       33605|
|         Kids: < 18|       15928|
|Eldery People: > 64|        5985|
+-------------------+------------+

In [6]:
def query1_rdd(show_res=True):
    # Convert to rdd
    rdd = df_merged_csv.rdd
    
    # Find the tuples where are aggravated assault crimes
    filtered_rdd = rdd.filter(lambda row: "aggravated assault" in row["Crm Cd Desc"])

    # Divide each person to the group age
    age_group_rdd = filtered_rdd.map(lambda row: (
        "Kids" if row["Vict Age"] < 18 else
        "Young Adults" if 18 <= row["Vict Age"] <= 24 else
        "Adults" if 25 <= row["Vict Age"] <= 64 else
        "Elderly People",
        1  # Δημιουργούμε ένα ζεύγος (κατηγορία, 1)
    ))

    # Group by the group age
    grouped_rdd = age_group_rdd.reduceByKey(lambda x, y: x + y)

    # Order in desceding order
    sorted_rdd = grouped_rdd.sortBy(lambda x: x[1], ascending=False)

    # Collect all results
    results = sorted_rdd.collect()
    if show_res:
        # Εμφάνιση αποτελεσμάτων
        for age_group, Victim_count_ in results:
            print(f"Age Group: {age_group}, Victim Count: {Victim_count_}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
times_lst = []
for i in range(10):
    start_time = time.time()
    query1_rdd(show_res=False)
    times_lst.append(time.time() - start_time)
print(times_lst)
print(f'Average execution time: {np.mean(times_lst)}')
query1_rdd()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[13.66264820098877, 5.616793155670166, 6.544066905975342, 6.268387794494629, 6.890103816986084, 4.886909246444702, 5.189765691757202, 6.201511383056641, 7.502830982208252, 6.399510145187378]
Average execution time: 6.9162527322769165
Age Group: Adults, Victim Count: 121093
Age Group: Young Adults, Victim Count: 33605
Age Group: Kids, Victim Count: 15928
Age Group: Elderly People, Victim Count: 5985

## Query 2 DataFrame


In [8]:
spark = SparkSession \
    .builder \
    .appName("Query 2 dataframe and SQL execution from csv") \
    .getOrCreate()

df1 = spark.read.csv('s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv', header=True)
df2 = spark.read.csv('s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv', header=True)

# Union all the data
df_merged_csv = df1.union(df2).dropDuplicates()

df_merged_csv = df_merged_csv.withColumn('Date Rptd', to_timestamp(df_merged_csv['Date Rptd'], 'MM/dd/yyyy hh:mm:ss a'))
df_merged_csv = df_merged_csv.withColumn('DATE OCC', to_timestamp(df_merged_csv['DATE OCC'], 'MM/dd/yyyy hh:mm:ss a'))
df_merged_csv = df_merged_csv.withColumn("Vict Age", col("Vict Age").cast("int"))
df_merged_csv = df_merged_csv.withColumn("LAT", col("LAT").cast("double"))
df_merged_csv = df_merged_csv.withColumn("LON", col("LON").cast("double"))
df_merged_csv = df_merged_csv.withColumn("Crm Cd Desc", lower(col("Crm Cd Desc")))
df_merged_csv = df_merged_csv.withColumn("Status Desc", lower(col("Status Desc")))

df_merged_csv = df_merged_csv.withColumn('Year', year('DATE OCC'))
df_merged_csv = df_merged_csv.withColumn('Year', year('DATE OCC'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
def query_2_dataframe(df, show_res=True):
    
    # Find all the closed cased
    df_with_closed_case = df.withColumn("IsClosedCase", when(~col("Status Desc").isin("unk", "invest cont"), 1).otherwise(0))
    
    # Group by the Year and the Area 
    df_counts = df_with_closed_case.groupBy("Year", "AREA NAME").agg(count("*").alias("TotalCases"), sum("IsClosedCase").alias("ClosedCases"))
    
    # Found the percentage of the closed Rate per area
    df_counts = df_counts.withColumn("ClosedCaseRate", round((col("ClosedCases") / col("TotalCases")) * 100, 2))
    
    # Using windowing on the year we order on the desceding order using the close case order
    window_per_year = Window.partitionBy("Year").orderBy(col("ClosedCaseRate").desc())
    
    # Using the windowing we found the rank for each area
    df_ranked = df_counts.withColumn("Rank", row_number().over(window_per_year))
    
    # Keep the top 3 ranked area per Year
    df_top3 = df_ranked.filter(col("Rank") <= 3)
    
    # Ordering based the year on Asceding order and after that based on rank on asceding order
    df_sorted = df_top3.orderBy(col("Year").asc(), col("Rank").asc())
    if show_res:
        df_sorted.show(50)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
times_lst = []
for i in range(10):
    start_time = time.time()
    query_2_dataframe(df_merged_csv, show_res=False)
    times_lst.append(time.time() - start_time)
print(times_lst)
print(f'Average execution time: {np.mean(times_lst)}')
query_2_dataframe(df_merged_csv, show_res=True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[0.0997164249420166, 0.06797242164611816, 0.05283188819885254, 0.06730365753173828, 0.0676872730255127, 0.05988359451293945, 0.04870343208312988, 0.04543328285217285, 0.059610843658447266, 0.0611264705657959]
Average execution time: 0.06302692890167236
+----+-----------+----------+-----------+--------------+----+
|Year|  AREA NAME|TotalCases|ClosedCases|ClosedCaseRate|Rank|
+----+-----------+----------+-----------+--------------+----+
|2010|    Rampart|      8707|       2860|         32.85|   1|
|2010|    Olympic|      8764|       2762|         31.52|   2|
|2010|     Harbor|      9598|       2818|         29.36|   3|
|2011|    Olympic|      7988|       2799|         35.04|   1|
|2011|    Rampart|      8444|       2744|          32.5|   2|
|2011|     Harbor|      9841|       2806|         28.51|   3|
|2012|    Olympic|      8543|       2930|          34.3|   1|
|2012|    Rampart|      8626|       2800|         32.46|   2|
|2012|     Harbor|      9441|       2786|         29.51|   3|
|20

In [11]:
# Sql implementation
def query_2_SQL(df, show_res=True):
    # Created a tamplete view on the dataframe
    df.createOrReplaceTempView("CrimeData")
    # SQL Query
    final_query = """
    WITH CaseStats AS (
        SELECT 
            Year,
            `AREA NAME` AS AreaName,
            COUNT(*) AS TotalCases,
            SUM(CASE WHEN `Status Desc` NOT IN ('unk', 'invest cont') THEN 1 ELSE 0 END) AS ClosedCases,
            ROUND(SUM(CASE WHEN `Status Desc` NOT IN ('unk', 'invest cont') THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) AS ClosedCaseRate
        FROM CrimeData
        GROUP BY Year, `AREA NAME`
    ),
    RankedStats AS (
        SELECT 
            Year,
            AreaName,
            TotalCases,
            ClosedCases,
            ClosedCaseRate,
            ROW_NUMBER() OVER (PARTITION BY Year ORDER BY ClosedCaseRate DESC) AS Rank
        FROM CaseStats
    )
    SELECT 
        Year,
        AreaName,
        TotalCases,
        ClosedCases,
        ClosedCaseRate,
        Rank
    FROM RankedStats
    WHERE Rank <= 3
    ORDER BY Year ASC, Rank ASC
    """

    # Execute the query
    df_result = spark.sql(final_query)

    if show_res:
        # Display the result
        df_result.show(50)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
times_lst = []
for i in range(10):
    start_time = time.time()
    query_2_SQL(df_merged_csv, show_res=False)
    times_lst.append(time.time() - start_time)
print(times_lst)
print(f'Average execution time: {np.mean(times_lst)}')
query_2_SQL(df_merged_csv, show_res=True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[0.24744963645935059, 0.04794168472290039, 0.039650917053222656, 0.032019615173339844, 0.038684844970703125, 0.039374589920043945, 0.039406776428222656, 0.03660869598388672, 0.034066200256347656, 0.03194475173950195]
Average execution time: 0.058714771270751955
+----+-----------+----------+-----------+--------------+----+
|Year|   AreaName|TotalCases|ClosedCases|ClosedCaseRate|Rank|
+----+-----------+----------+-----------+--------------+----+
|2010|    Rampart|      8707|       2860|         32.85|   1|
|2010|    Olympic|      8764|       2762|         31.52|   2|
|2010|     Harbor|      9598|       2818|         29.36|   3|
|2011|    Olympic|      7988|       2799|         35.04|   1|
|2011|    Rampart|      8444|       2744|         32.50|   2|
|2011|     Harbor|      9841|       2806|         28.51|   3|
|2012|    Olympic|      8543|       2930|         34.30|   1|
|2012|    Rampart|      8626|       2800|         32.46|   2|
|2012|     Harbor|      9441|       2786|         29.51|

In [13]:
group_number = "32"
s3_path = "s3://groups-bucket-dblab-905418150721/group"+group_number+"/assigment_parquet_files/"


df_parquet_file = spark.read.parquet(s3_path+"criminal_parquet.parquet")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
times_lst = []
for i in range(10):
    start_time = time.time()
    query_2_dataframe(df_merged_csv, show_res=False)
    times_lst.append(time.time() - start_time)
print(times_lst)
print(f'Average execution time: {np.mean(times_lst)}')
query_2_dataframe(df_merged_csv, show_res=True)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[0.09162378311157227, 0.05196857452392578, 0.05146384239196777, 0.06023216247558594, 0.040825605392456055, 0.04323244094848633, 0.03979945182800293, 0.053380727767944336, 0.041522979736328125, 0.03822016716003418]
Average execution time: 0.05122697353363037
+----+-----------+----------+-----------+--------------+----+
|Year|  AREA NAME|TotalCases|ClosedCases|ClosedCaseRate|Rank|
+----+-----------+----------+-----------+--------------+----+
|2010|    Rampart|      8707|       2860|         32.85|   1|
|2010|    Olympic|      8764|       2762|         31.52|   2|
|2010|     Harbor|      9598|       2818|         29.36|   3|
|2011|    Olympic|      7988|       2799|         35.04|   1|
|2011|    Rampart|      8444|       2744|          32.5|   2|
|2011|     Harbor|      9841|       2806|         28.51|   3|
|2012|    Olympic|      8543|       2930|          34.3|   1|
|2012|    Rampart|      8626|       2800|         32.46|   2|
|2012|     Harbor|      9441|       2786|         29.51|   3

#### Query 3

In [15]:
from sedona.spark import *

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
def query_3_join(show_res=True):
    spark = SparkSession.builder \
        .appName("Query 3") \
        .getOrCreate()

    sedona = SedonaContext.create(spark)

    # Read the file from s3
    geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
    blocks_df = sedona.read.format("geojson") \
                .option("multiLine", "true").load(geojson_path) \
                .selectExpr("explode(features) as features") \
                .select("features.*")
    # Formatting magic
    flattened_df = blocks_df.select( \
                    [col(f"properties.{col_name}").alias(col_name) for col_name in \
                    blocks_df.schema["properties"].dataType.fieldNames()] + ["geometry"]) \
                .drop("properties") \
                .drop("type")
        
    flattend_df = flattened_df.withColumn("ZCTA10", col("ZCTA10").cast("int"))
    # Keep only the data from Los Angeles and rename the Zip Code column name for better usabillity 
    flattend_df_LA = flattened_df.filter(col("CITY") == "Los Angeles") \
                .withColumnRenamed("ZCTA10", "ZIP Code")

    group_number = "32"
    s3_path = "s3://groups-bucket-dblab-905418150721/group"+group_number+"/assigment_parquet_files/"


    df_income_parquet = spark.read.parquet(s3_path+"LA_Income.parquet")
    df_income_parquet.select("Zip Code", "Estimated Median Income")
    
    # Joining the 2010 census data for population with the 2015 income data based the Zip Code and Calculate the Estimated Income 
    df_join = flattend_df_LA.join(df_income_parquet,'Zip Code', "inner").withColumn("EstimatedIncome", col("Estimated Median Income")*col("HOUSING10")) \
                .groupBy("COMM") \
                .agg(ST_Union_Aggr("geometry").alias("geometry"), sum("POP_2010").alias("Population"), sum("EstimatedIncome").alias("EstimatedIncome")) \
                .withColumn("PersonalMeanIncome", col("EstimatedIncome")/col("Population"))
    group_number = "32"
    s3_path = "s3://groups-bucket-dblab-905418150721/group"+group_number+"/assigment_parquet_files/"


    df_parquet_file = spark.read.parquet(s3_path+"criminal_parquet.parquet")
    
    # Each Lon and Lat point conver to ST_Point
    df_crime_Geom = df_parquet_file.withColumn("point", ST_Point("LON", "LAT"))
    # Join by find the closest area between the Lat and Lon on the polygon geom point 
    df_join2 = df_join.join(df_crime_Geom, ST_Within(df_crime_Geom.point, df_join.geometry), "inner")
    
    # Find the Total Crimes per area and calculate the Average Crimes per area
    df_final = df_join2.groupby("COMM", "PersonalMeanIncome", "Population") \
                    .agg(count("*").alias("TotalCrimes")) \
                    .withColumn("Average Crimes", col("TotalCrimes")/col("Population"))
    if show_res:
        df_final.show(100)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
times_lst = []
for i in range(10):
    start_time = time.time()
    query_3_join(show_res=False)
    times_lst.append(time.time() - start_time)
print(times_lst)
print(f'Average execution time: {np.mean(times_lst)}')
query_3_join(show_res=True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[5.360411882400513, 4.317512035369873, 4.002609968185425, 4.486432313919067, 3.5285050868988037, 4.4200356006622314, 3.5619730949401855, 3.511436939239502, 4.218752384185791, 3.783787727355957]
Average execution time: 4.119145703315735
+--------------------+------------------+----------+-----------+-------------------+
|                COMM|PersonalMeanIncome|Population|TotalCrimes|     Average Crimes|
+--------------------+------------------+----------+-----------+-------------------+
|   Faircrest Heights|20908.511762997387|      3443|       4408| 1.2802788266047052|
|           Los Feliz|30473.013522716217|     20558|      18659| 0.9076272010896002|
|    Cadillac-Corning|19572.784696174043|      6665|       4445| 0.6669167291822956|
|              Venice|47614.883340996166|     32625|      40942| 1.2549272030651342|
|      Vermont Square| 8329.565791341376|      7045|       5634| 0.7997161107168205|
|  Century Palms/Cove| 8552.220122507493|     30692|      38877| 1.2666818714974586|

In [18]:
from pyspark.sql.functions import broadcast
def query_3_broadcastJoin(show_res=True):
    spark = SparkSession.builder \
        .appName("Query 3 Broadcast Join") \
        .getOrCreate()

    sedona = SedonaContext.create(spark)

    # Read the file from s3
    geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
    blocks_df = sedona.read.format("geojson") \
                .option("multiLine", "true").load(geojson_path) \
                .selectExpr("explode(features) as features") \
                .select("features.*")
    # Formatting magic
    flattened_df = blocks_df.select( \
                    [col(f"properties.{col_name}").alias(col_name) for col_name in \
                    blocks_df.schema["properties"].dataType.fieldNames()] + ["geometry"]) \
                .drop("properties") \
                .drop("type")
    
    # Keep only the data from Los Angeles and rename the Zip Code column name for better usabillity 
    flattend_df = flattened_df.withColumn("ZCTA10", col("ZCTA10").cast("int"))
    flattend_df_LA = flattened_df.filter(col("CITY") == "Los Angeles") \
                .withColumnRenamed("ZCTA10", "ZIP Code")

    group_number = "32"
    s3_path = "s3://groups-bucket-dblab-905418150721/group"+group_number+"/assigment_parquet_files/"


    df_income_parquet = spark.read.parquet(s3_path+"LA_Income.parquet")
    df_income_parquet.select("Zip Code", "Estimated Median Income")
    
    # Apply broadcast join on the smallest file and estimate the Estimated Income
    df_join = flattend_df_LA.join(broadcast(df_income_parquet),'Zip Code', "inner").withColumn("EstimatedIncome", col("Estimated Median Income")*col("HOUSING10")) \
                .groupBy("COMM") \
                .agg(ST_Union_Aggr("geometry").alias("geometry"), sum("POP_2010").alias("Population"), sum("EstimatedIncome").alias("EstimatedIncome")) \
                .withColumn("PersonalMeanIncome", col("EstimatedIncome")/col("Population"))
    group_number = "32"
    s3_path = "s3://groups-bucket-dblab-905418150721/group"+group_number+"/assigment_parquet_files/"


    df_parquet_file = spark.read.parquet(s3_path+"criminal_parquet.parquet")
    # Each Lon and Lat point conver to ST_Point
    df_crime_Geom = df_parquet_file.withColumn("point", ST_Point("LON", "LAT"))
    # Join by find the closest area between the Lat and Lon on the polygon geom point 
    df_join2 = df_join.join(df_crime_Geom, ST_Within(df_crime_Geom.point, df_join.geometry), "inner") 
    
    # Find the Total Crimes per area and calculate the Average Crimes per area
    df_final = df_join2.groupby("COMM", "PersonalMeanIncome", "Population") \
                    .agg(count("*").alias("TotalCrimes")) \
                    .withColumn("Average Crimes", col("TotalCrimes")/col("Population"))
    if show_res:
        df_final.show(100)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
times_lst = []
for i in range(10):
    start_time = time.time()
    query_3_broadcastJoin(show_res=False)
    times_lst.append(time.time() - start_time)
print(times_lst)
print(f'Average execution time: {np.mean(times_lst)}')
query_3_broadcastJoin(show_res=True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[4.014760971069336, 2.9587676525115967, 3.063523530960083, 3.651716709136963, 3.177194118499756, 2.878145694732666, 2.901812791824341, 3.028357982635498, 3.127319097518921, 4.298944473266602]
Average execution time: 3.310054302215576
+--------------------+------------------+----------+-----------+-------------------+
|                COMM|PersonalMeanIncome|Population|TotalCrimes|     Average Crimes|
+--------------------+------------------+----------+-----------+-------------------+
|   Faircrest Heights|20908.511762997387|      3443|       4408| 1.2802788266047052|
|           Los Feliz|30473.013522716217|     20558|      18659| 0.9076272010896002|
|    Cadillac-Corning|19572.784696174043|      6665|       4445| 0.6669167291822956|
|              Venice|47614.883340996166|     32625|      40942| 1.2549272030651342|
|      Vermont Square| 8329.565791341376|      7045|       5634| 0.7997161107168205|
|  Century Palms/Cove| 8552.220122507493|     30692|      38877| 1.2666818714974586|
|

In [20]:
def query_3_MergeJoin(show_res=True):
    spark = SparkSession.builder \
        .appName("Query 3 Broadcast Join") \
        .getOrCreate()

    sedona = SedonaContext.create(spark)

    # Read the file from s3
    geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
    blocks_df = sedona.read.format("geojson") \
                .option("multiLine", "true").load(geojson_path) \
                .selectExpr("explode(features) as features") \
                .select("features.*")
    # Formatting magic
    flattened_df = blocks_df.select( \
                    [col(f"properties.{col_name}").alias(col_name) for col_name in \
                    blocks_df.schema["properties"].dataType.fieldNames()] + ["geometry"]) \
                .drop("properties") \
                .drop("type")

    flattend_df = flattened_df.withColumn("ZCTA10", col("ZCTA10").cast("int"))
    flattend_df_LA = flattened_df.filter(col("CITY") == "Los Angeles") \
                .withColumnRenamed("ZCTA10", "ZIP Code")

    group_number = "32"
    s3_path = "s3://groups-bucket-dblab-905418150721/group"+group_number+"/assigment_parquet_files/"


    df_income_parquet = spark.read.parquet(s3_path+"LA_Income.parquet")
    df_income_parquet.select("Zip Code", "Estimated Median Income")
    
    # Sort each data based the column name join atributed and apply the join (Sort merge join)
    flattend_df_LA_sorted = flattend_df_LA.sort("ZIP Code")
    df_income_parquet_sorted = df_income_parquet.sort("Zip Code")
    df_join = flattend_df_LA_sorted.join(df_income_parquet_sorted,'Zip Code', "inner").withColumn("EstimatedIncome", col("Estimated Median Income")*col("HOUSING10")) \
                .groupBy("COMM") \
                .agg(ST_Union_Aggr("geometry").alias("geometry"), sum("POP_2010").alias("Population"), sum("EstimatedIncome").alias("EstimatedIncome")) \
                .withColumn("PersonalMeanIncome", col("EstimatedIncome")/col("Population"))
    group_number = "32"
    s3_path = "s3://groups-bucket-dblab-905418150721/group"+group_number+"/assigment_parquet_files/"


    df_parquet_file = spark.read.parquet(s3_path+"criminal_parquet.parquet")
    # Each Lon and Lat point conver to ST_Point
    df_crime_Geom = df_parquet_file.withColumn("point", ST_Point("LON", "LAT"))
    # Join by find the closest area between the Lat and Lon on the polygon geom point 
    df_join2 = df_join.join(df_crime_Geom, ST_Within(df_crime_Geom.point, df_join.geometry), "inner")

    df_final = df_join2.groupby("COMM", "PersonalMeanIncome", "Population") \
                    .agg(count("*").alias("TotalCrimes")) \
                    .withColumn("Average Crimes", col("TotalCrimes")/col("Population"))
    if show_res:
        df_final.show(100)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
times_lst = []
for i in range(10):
    start_time = time.time()
    query_3_MergeJoin(show_res=False)
    times_lst.append(time.time() - start_time)
print(times_lst)
print(f'Average execution time: {np.mean(times_lst)}')
query_3_MergeJoin(show_res=True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[2.78084135055542, 3.0124707221984863, 3.4125232696533203, 2.753401279449463, 3.6871533393859863, 3.2210469245910645, 3.0289978981018066, 3.019221305847168, 2.8032970428466797, 3.1123452186584473]
Average execution time: 3.0831298351287844
+--------------------+------------------+----------+-----------+-------------------+
|                COMM|PersonalMeanIncome|Population|TotalCrimes|     Average Crimes|
+--------------------+------------------+----------+-----------+-------------------+
|   Faircrest Heights|20908.511762997387|      3443|       4408| 1.2802788266047052|
|           Los Feliz|30473.013522716217|     20558|      18659| 0.9076272010896002|
|    Cadillac-Corning|19572.784696174043|      6665|       4445| 0.6669167291822956|
|              Venice|47614.883340996166|     32625|      40942| 1.2549272030651342|
|      Vermont Square| 8329.565791341376|      7045|       5634| 0.7997161107168205|
|  Century Palms/Cove| 8552.220122507493|     30692|      38877| 1.2666818714974

In [22]:
def query3_HashJoin(show_res):
    # Initialize Spark session
    spark = SparkSession.builder \
        .appName("Query 3 SHUFFLE_REPLICATE_NL Join") \
        .config("spark.sql.autoBroadcastJoinThreshold", -1) \
        .getOrCreate()

    sedona = SedonaContext.create(spark)

    # Read the file from S3
    geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
    blocks_df = sedona.read.format("geojson") \
        .option("multiLine", "true").load(geojson_path) \
        .selectExpr("explode(features) as features") \
        .select("features.*")

    # Formatting
    flattened_df = blocks_df.select(
        [col(f"properties.{col_name}").alias(col_name)
         for col_name in blocks_df.schema["properties"].dataType.fieldNames()] + ["geometry"]
    ).drop("properties").drop("type")

    flattend_df = flattened_df.withColumn("ZCTA10", col("ZCTA10").cast("int"))
    flattend_df_LA = flattened_df.filter(col("CITY") == "Los Angeles") \
        .withColumnRenamed("ZCTA10", "ZIP Code")

    group_number = "32"
    s3_path = f"s3://groups-bucket-dblab-905418150721/group{group_number}/assigment_parquet_files/"

    df_income_parquet = spark.read.parquet(s3_path + "LA_Income.parquet")
    df_income_parquet = df_income_parquet.select("Zip Code", "Estimated Median Income")

    # Repartition both DataFrames by the join key
    flattend_df_LA_partitioned = flattend_df_LA.repartition("ZIP Code")
    df_income_parquet_partitioned = df_income_parquet.repartition("Zip Code")

    # Perform the shuffle hash join using the repartition on the key
    df_join = flattend_df_LA_partitioned.join(
        df_income_parquet_partitioned,
        flattend_df_LA_partitioned["ZIP Code"] == df_income_parquet_partitioned["Zip Code"],
        "inner").drop("Zip Code").withColumn("EstimatedIncome", col("Estimated Median Income") * col("HOUSING10")).groupBy("COMM").agg(ST_Union_Aggr("geometry").alias("geometry"),
        sum("POP_2010").alias("Population"),
        sum("EstimatedIncome").alias("EstimatedIncome")).withColumn("PersonalMeanIncome", col("EstimatedIncome") / col("Population"))


    # Read crime data parquet file
    df_parquet_file = spark.read.parquet(s3_path + "criminal_parquet.parquet")

    # Create geometry column for crime data
    df_crime_Geom = df_parquet_file.withColumn("point", ST_Point("LON", "LAT"))

    # Join by find the closest area between the Lat and Lon on the polygon geom point 
    df_join2 = df_join.join(df_crime_Geom, ST_Within(df_crime_Geom.point, df_join.geometry), "inner")

    # Final aggregation
    df_final = df_join2.groupby("COMM", "PersonalMeanIncome", "Population") \
        .agg(count("*").alias("TotalCrimes")) \
        .withColumn("Average Crimes", col("TotalCrimes") / col("Population"))

    if show_res:
        df_final.show(100)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [23]:
# times_lst = []
# for i in range(10):
#     start_time = time.time()
#     query3_HashJoin(show_res=False)
#     times_lst.append(time.time() - start_time)
# print(times_lst)
# print(f'Average execution time: {np.mean(times_lst)}')
# query3_HashJoin(show_res=True)

start_time = time.time()
query3_HashJoin(show_res=True)
print(f'Exec Time: {time.time() - start_time}')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+------------------+----------+-----------+-------------------+
|                COMM|PersonalMeanIncome|Population|TotalCrimes|     Average Crimes|
+--------------------+------------------+----------+-----------+-------------------+
|         Westchester|  33988.4574421559|     48017|      57124| 1.1896619947102067|
|             Del Rey|33675.359382921706|     27614|      15432| 0.5588469616861013|
|         Playa Vista| 50264.47187990141|      8926|       7281| 0.8157069235939951|
|       Playa Del Rey|   45522.596580114|      3158|       2454| 0.7770740975300824|
|        Harbor Pines|  19236.6966651439|      2189|       1220| 0.5573321151210598|
|          Wilmington|11169.220314136126|     52525|      39582| 0.7535840076154212|
|         Harbor City| 21383.02221233359|     26967|      17331| 0.6426743797975303|
|      Harbor Gateway|16154.823549930237|     40136|      26302| 0.6553219055212278|
|   Faircrest Heights|20908.511762997387|      3443|       4408| 

In [24]:
def query3_SHUFFLE_REPLICATE_NL_Join(show_res):
    # Initialize Spark session
    spark = SparkSession.builder \
        .appName("Query 3 SHUFFLE_REPLICATE_NL Join") \
        .config("spark.sql.autoBroadcastJoinThreshold", -1) \
        .getOrCreate()

    sedona = SedonaContext.create(spark)

    # Read the file from S3
    geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
    blocks_df = sedona.read.format("geojson") \
        .option("multiLine", "true").load(geojson_path) \
        .selectExpr("explode(features) as features") \
        .select("features.*")

    # Formatting
    flattened_df = blocks_df.select(
        [col(f"properties.{col_name}").alias(col_name)
         for col_name in blocks_df.schema["properties"].dataType.fieldNames()] + ["geometry"]
    ).drop("properties").drop("type")

    flattend_df = flattened_df.withColumn("ZCTA10", col("ZCTA10").cast("int"))
    flattend_df_LA = flattened_df.filter(col("CITY") == "Los Angeles") \
        .withColumnRenamed("ZCTA10", "ZIP Code")

    group_number = "32"
    s3_path = f"s3://groups-bucket-dblab-905418150721/group{group_number}/assigment_parquet_files/"
    df_income_parquet = spark.read.parquet(s3_path + "LA_Income.parquet")
    df_income_parquet = df_income_parquet.select("Zip Code", "Estimated Median Income")

    # Join flattened blocks data with income data
    df_join = flattend_df_LA.join(df_income_parquet,'Zip Code', "inner").withColumn("EstimatedIncome", col("Estimated Median Income")*col("HOUSING10")) \
                .groupBy("COMM") \
                .agg(ST_Union_Aggr("geometry").alias("geometry"), sum("POP_2010").alias("Population"), sum("EstimatedIncome").alias("EstimatedIncome")) \
                .withColumn("PersonalMeanIncome", col("EstimatedIncome")/col("Population"))

    df_parquet_file = spark.read.parquet(s3_path + "criminal_parquet.parquet")

    # Create geometry column for crime data
    df_crime_Geom = df_parquet_file.withColumn("point", ST_Point("LON", "LAT"))
    
    # # Join by find the closest area between the Lat and Lon on the polygon geom point 
    # Perform SHUFFLE_REPLICATE_NL Join (CROSS JOIN + Filter)
    df_join2 = df_join.crossJoin(df_crime_Geom) \
        .filter(ST_Within(df_crime_Geom.point, df_join.geometry))

    # Final aggregation
    df_final = df_join2.groupby("COMM", "PersonalMeanIncome", "Population") \
        .agg(count("*").alias("TotalCrimes")) \
        .withColumn("Average Crimes", col("TotalCrimes") / col("Population"))
    if show_res:
        df_final.show(100)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [25]:
times_lst = []
for i in range(10):
    start_time = time.time()
    query3_SHUFFLE_REPLICATE_NL_Join(show_res=False)
    times_lst.append(time.time() - start_time)
print(times_lst)
print(f'Average execution time: {np.mean(times_lst)}')
query3_SHUFFLE_REPLICATE_NL_Join(show_res=True)

start_time = time.time()
query3_SHUFFLE_REPLICATE_NL_Join(show_res=True)
print(f'Exec Time: {time.time() - start_time}')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[2.9575493335723877, 2.9806854724884033, 3.085684061050415, 2.6318600177764893, 3.2133727073669434, 2.8572897911071777, 2.9764645099639893, 2.8152236938476562, 3.032799482345581, 2.859243392944336]
Average execution time: 2.941017246246338
+--------------------+------------------+----------+-----------+-------------------+
|                COMM|PersonalMeanIncome|Population|TotalCrimes|     Average Crimes|
+--------------------+------------------+----------+-----------+-------------------+
|   Faircrest Heights|20908.511762997387|      3443|       4408| 1.2802788266047052|
|    Cadillac-Corning|19572.784696174043|      6665|       4445| 0.6669167291822956|
|      Vermont Square| 8329.565791341376|      7045|       5634| 0.7997161107168205|
|            Mid-city| 21734.64899923286|     14339|      11574| 0.8071692586651789|
|        Harbor Pines|  19236.6966651439|      2189|       1220| 0.5573321151210598|
|  Cloverdale/Cochran|14660.930872291905|     14032|       9934| 0.7079532497149

### Query 4 

In [26]:
%%configure -f
{
    "conf": {
        "spark.executor.instances": "2",
        "spark.executor.memory": "2g",
        "spark.executor.cores": "1",
        "spark.driver.memory": "2g"
    }
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
2888,application_1732639283265_2847,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
2728,application_1732639283265_2687,pyspark,idle,Link,Link,,
2762,application_1732639283265_2721,pyspark,idle,Link,Link,,
2767,application_1732639283265_2726,pyspark,idle,Link,Link,,
2773,application_1732639283265_2732,pyspark,idle,Link,Link,,
2776,application_1732639283265_2735,pyspark,idle,Link,Link,,
2779,application_1732639283265_2738,pyspark,idle,Link,Link,,
2780,application_1732639283265_2739,pyspark,idle,Link,Link,,
2783,application_1732639283265_2742,pyspark,idle,Link,Link,,
2784,application_1732639283265_2743,pyspark,idle,Link,Link,,
2786,application_1732639283265_2745,pyspark,idle,Link,Link,,


In [27]:
from sedona.spark import *
from pyspark.sql import SparkSession
from pathlib import Path
from pyspark.sql import DataFrame
from pyspark.sql.window import Window
from pyspark.sql.functions import to_date, to_timestamp, col, year, month, desc, rank, when, count, lower,row_number, sum, round, avg
import time
import numpy as np

def query_4(show_res=True):
    spark = SparkSession.builder.appName("Query4_DataFrame").getOrCreate()
    sedona = SedonaContext.create(spark)

    group_number = "32"
    s3_path = "s3://groups-bucket-dblab-905418150721/group"+group_number+"/assigment_parquet_files/"


    df_parquet_file = spark.read.parquet(s3_path+"criminal_parquet.parquet")
    geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
    blocks_df = sedona.read.format("geojson") \
                    .option("multiLine", "true").load(geojson_path) \
                    .selectExpr("explode(features) as features") \
                    .select("features.*")
    # Formatting magic
    flattened_df = blocks_df.select( \
                    [col(f"properties.{col_name}").alias(col_name) for col_name in \
                    blocks_df.schema["properties"].dataType.fieldNames()] + ["geometry"]) \
                    .drop("properties") \
                    .drop("type")

    flattend_df = flattened_df.withColumn("ZCTA10", col("ZCTA10").cast("int"))
    flattend_df_LA = flattened_df.filter(col("CITY") == "Los Angeles") \
                    .withColumnRenamed("ZCTA10", "ZIP Code")

    df_re = spark.read.parquet(s3_path+"RE_codes.parquet")
    df_income_parquet = spark.read.parquet(s3_path+"LA_Income.parquet")

    # Find the zip code of the area for each crime 
    df_parquet_file = df_parquet_file.filter(col("Vict Descent").isNotNull())
    df_crime_Geom = df_parquet_file.withColumn("point", ST_Point("LON", "LAT"))
    df_join = flattend_df_LA.join(df_crime_Geom, ST_Within(df_crime_Geom.point, flattend_df_LA.geometry), "inner")
    df_join_select = df_join.select("Zip Code", "LAT", "LON", "Vict Descent")

    # For each Area count the amount of crime per Race
    crime_data_per_zip = df_join_select.groupBy("Zip Code", "Vict Descent").count()
    crime_data_per_zip = crime_data_per_zip.filter(col("Vict Descent").isNotNull())
    # Keep only the LA columns
    sorted_LA_income = (df_income_parquet.filter(df_income_parquet["Community"].like("%Los Angeles (%"))
                        .orderBy(col("Estimated Median Income").desc()))

    # Find the top 3 area with the income 
    top_income_zip = sorted_LA_income.limit(3).select("Zip Code")
    # Find the last 3 lest income areas
    bottom_income_zip = sorted_LA_income.orderBy(col("Estimated Median Income"), ascending=True).limit(3).select("Zip Code")
    top_victims = top_income_zip.join(crime_data_per_zip, "Zip Code").orderBy(col("count").desc())
    bottom_victims = bottom_income_zip.join(crime_data_per_zip, "Zip Code").orderBy(col("count").desc())

    top_income_victims_res = top_victims.join(df_re, "Vict Descent", "inner").orderBy('Zip Code', col('count').desc())
    bottom_income_victims_res = bottom_victims.join(df_re, "Vict Descent", "inner").orderBy('Zip Code', col('count').desc())

    if show_res:
        print('Top income victims')
        top_income_victims_res.select("Zip Code", "Vict Descent Full", "count").show(200)

        print('Lowest income victims')
        bottom_income_victims_res.select("Zip Code", "Vict Descent Full", "count").show(200)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [28]:
import time
times_lst = []
for i in range(10):
    start_time = time.time()
    query_4(show_res=False)
    times_lst.append(time.time() - start_time)
print(times_lst)
print(f'Average execution time: {np.mean(times_lst)}')
query_4(show_res=True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[19.11129331588745, 11.547914028167725, 9.506970882415771, 8.310539960861206, 6.652687311172485, 5.815526485443115, 5.405937671661377, 6.302911996841431, 4.375793218612671, 4.9750611782073975]
Average execution time: 8.200463604927062
Top income victims
+--------+--------------------+-----+
|Zip Code|   Vict Descent Full|count|
+--------+--------------------+-----+
|   90077|               White| 1887|
|   90077|               Other|  585|
|   90077|Hispanic/Latin/Me...|  286|
|   90077|               Black|  136|
|   90077|             Unknown|  121|
|   90077|         Other Asian|  113|
|   90077|             Chinese|    6|
|   90077|              Korean|    4|
|   90077|            Japanese|    2|
|   90077|American Indian/A...|    1|
|   90077|         AsianIndian|    1|
|   90077|            Hawaiian|    1|
|   90077|            Filipino|    1|
|   90272|               White| 6090|
|   90272|               Other|  818|
|   90272|Hispanic/Latin/Me...|  610|
|   90272|             U

In [29]:
%%configure -f
{
    "conf": {
        "spark.executor.instances": "2",
        "spark.executor.memory": "4g",
        "spark.executor.cores": "2",
        "spark.driver.memory": "4g"
    }
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
2889,application_1732639283265_2848,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
2728,application_1732639283265_2687,pyspark,idle,Link,Link,,
2762,application_1732639283265_2721,pyspark,idle,Link,Link,,
2767,application_1732639283265_2726,pyspark,idle,Link,Link,,
2773,application_1732639283265_2732,pyspark,idle,Link,Link,,
2776,application_1732639283265_2735,pyspark,idle,Link,Link,,
2779,application_1732639283265_2738,pyspark,idle,Link,Link,,
2780,application_1732639283265_2739,pyspark,idle,Link,Link,,
2783,application_1732639283265_2742,pyspark,idle,Link,Link,,
2784,application_1732639283265_2743,pyspark,idle,Link,Link,,
2786,application_1732639283265_2745,pyspark,idle,Link,Link,,


In [30]:
from sedona.spark import *
from pyspark.sql import SparkSession
from pathlib import Path
from pyspark.sql import DataFrame
from pyspark.sql.window import Window
from pyspark.sql.functions import to_date, to_timestamp, col, year, month, desc, rank, when, count, lower,row_number, sum, round, avg
import time
import numpy as np

def query_4(show_res=True):
    spark = SparkSession.builder.appName("Query4_DataFrame").getOrCreate()
    sedona = SedonaContext.create(spark)

    group_number = "32"
    s3_path = "s3://groups-bucket-dblab-905418150721/group"+group_number+"/assigment_parquet_files/"


    df_parquet_file = spark.read.parquet(s3_path+"criminal_parquet.parquet")
    geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
    blocks_df = sedona.read.format("geojson") \
                    .option("multiLine", "true").load(geojson_path) \
                    .selectExpr("explode(features) as features") \
                    .select("features.*")
    # Formatting magic
    flattened_df = blocks_df.select( \
                    [col(f"properties.{col_name}").alias(col_name) for col_name in \
                    blocks_df.schema["properties"].dataType.fieldNames()] + ["geometry"]) \
                    .drop("properties") \
                    .drop("type")

    flattend_df = flattened_df.withColumn("ZCTA10", col("ZCTA10").cast("int"))
    flattend_df_LA = flattened_df.filter(col("CITY") == "Los Angeles") \
                    .withColumnRenamed("ZCTA10", "ZIP Code")

    df_re = spark.read.parquet(s3_path+"RE_codes.parquet")
    df_income_parquet = spark.read.parquet(s3_path+"LA_Income.parquet")

    # Find the zip code of the area for each crime 
    df_parquet_file = df_parquet_file.filter(col("Vict Descent").isNotNull())
    df_crime_Geom = df_parquet_file.withColumn("point", ST_Point("LON", "LAT"))
    df_join = flattend_df_LA.join(df_crime_Geom, ST_Within(df_crime_Geom.point, flattend_df_LA.geometry), "inner")
    df_join_select = df_join.select("Zip Code", "LAT", "LON", "Vict Descent")

    # For each Area count the amount of crime per Race
    crime_data_per_zip = df_join_select.groupBy("Zip Code", "Vict Descent").count()
    crime_data_per_zip = crime_data_per_zip.filter(col("Vict Descent").isNotNull())
    # Keep only the LA columns
    sorted_LA_income = (df_income_parquet.filter(df_income_parquet["Community"].like("%Los Angeles (%"))
                        .orderBy(col("Estimated Median Income").desc()))

    # Find the top 3 area with the income 
    top_income_zip = sorted_LA_income.limit(3).select("Zip Code")
    # Find the last 3 lest income areas
    bottom_income_zip = sorted_LA_income.orderBy(col("Estimated Median Income"), ascending=True).limit(3).select("Zip Code")
    top_victims = top_income_zip.join(crime_data_per_zip, "Zip Code").orderBy(col("count").desc())
    bottom_victims = bottom_income_zip.join(crime_data_per_zip, "Zip Code").orderBy(col("count").desc())

    top_income_victims_res = top_victims.join(df_re, "Vict Descent", "inner").orderBy('Zip Code', col('count').desc())
    bottom_income_victims_res = bottom_victims.join(df_re, "Vict Descent", "inner").orderBy('Zip Code', col('count').desc())

    if show_res:
        print('Top income victims')
        top_income_victims_res.select("Zip Code", "Vict Descent Full", "count").show(200)

        print('Lowest income victims')
        bottom_income_victims_res.select("Zip Code", "Vict Descent Full", "count").show(200)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [31]:
times_lst = []
for i in range(10):
    start_time = time.time()
    query_4(show_res=False)
    times_lst.append(time.time() - start_time)
print(times_lst)
print(f'Average execution time: {np.mean(times_lst)}')
query_4(show_res=True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[18.081260681152344, 10.015167474746704, 10.062018394470215, 9.04055404663086, 7.7061052322387695, 5.059884548187256, 5.351187467575073, 4.876883268356323, 5.271404266357422, 5.966294288635254]
Average execution time: 8.143075966835022
Top income victims
+--------+--------------------+-----+
|Zip Code|   Vict Descent Full|count|
+--------+--------------------+-----+
|   90077|               White| 1887|
|   90077|               Other|  585|
|   90077|Hispanic/Latin/Me...|  286|
|   90077|               Black|  136|
|   90077|             Unknown|  121|
|   90077|         Other Asian|  113|
|   90077|             Chinese|    6|
|   90077|              Korean|    4|
|   90077|            Japanese|    2|
|   90077|American Indian/A...|    1|
|   90077|         AsianIndian|    1|
|   90077|            Hawaiian|    1|
|   90077|            Filipino|    1|
|   90272|               White| 6090|
|   90272|               Other|  818|
|   90272|Hispanic/Latin/Me...|  610|
|   90272|             

In [32]:
%%configure -f
{
    "conf": {
        "spark.executor.instances": "2",
        "spark.executor.memory": "8g",
        "spark.executor.cores": "4",
        "spark.driver.memory": "4g"
    }
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
2890,application_1732639283265_2849,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
2728,application_1732639283265_2687,pyspark,idle,Link,Link,,
2762,application_1732639283265_2721,pyspark,idle,Link,Link,,
2767,application_1732639283265_2726,pyspark,idle,Link,Link,,
2773,application_1732639283265_2732,pyspark,idle,Link,Link,,
2776,application_1732639283265_2735,pyspark,idle,Link,Link,,
2779,application_1732639283265_2738,pyspark,idle,Link,Link,,
2780,application_1732639283265_2739,pyspark,idle,Link,Link,,
2783,application_1732639283265_2742,pyspark,idle,Link,Link,,
2784,application_1732639283265_2743,pyspark,idle,Link,Link,,
2786,application_1732639283265_2745,pyspark,idle,Link,Link,,


In [33]:
from sedona.spark import *
from pyspark.sql import SparkSession
from pathlib import Path
from pyspark.sql import DataFrame
from pyspark.sql.window import Window
from pyspark.sql.functions import to_date, to_timestamp, col, year, month, desc, rank, when, count, lower,row_number, sum, round, avg
import time
import numpy as np

def query_4(show_res=True):
    spark = SparkSession.builder.appName("Query4_DataFrame").getOrCreate()
    sedona = SedonaContext.create(spark)

    group_number = "32"
    s3_path = "s3://groups-bucket-dblab-905418150721/group"+group_number+"/assigment_parquet_files/"


    df_parquet_file = spark.read.parquet(s3_path+"criminal_parquet.parquet")
    geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
    blocks_df = sedona.read.format("geojson") \
                    .option("multiLine", "true").load(geojson_path) \
                    .selectExpr("explode(features) as features") \
                    .select("features.*")
    # Formatting magic
    flattened_df = blocks_df.select( \
                    [col(f"properties.{col_name}").alias(col_name) for col_name in \
                    blocks_df.schema["properties"].dataType.fieldNames()] + ["geometry"]) \
                    .drop("properties") \
                    .drop("type")

    flattend_df = flattened_df.withColumn("ZCTA10", col("ZCTA10").cast("int"))
    flattend_df_LA = flattened_df.filter(col("CITY") == "Los Angeles") \
                    .withColumnRenamed("ZCTA10", "ZIP Code")

    df_re = spark.read.parquet(s3_path+"RE_codes.parquet")
    df_income_parquet = spark.read.parquet(s3_path+"LA_Income.parquet")

    # Find the zip code of the area for each crime 
    df_parquet_file = df_parquet_file.filter(col("Vict Descent").isNotNull())
    df_crime_Geom = df_parquet_file.withColumn("point", ST_Point("LON", "LAT"))
    df_join = flattend_df_LA.join(df_crime_Geom, ST_Within(df_crime_Geom.point, flattend_df_LA.geometry), "inner")
    df_join_select = df_join.select("Zip Code", "LAT", "LON", "Vict Descent")

    # For each Area count the amount of crime per Race
    crime_data_per_zip = df_join_select.groupBy("Zip Code", "Vict Descent").count()
    crime_data_per_zip = crime_data_per_zip.filter(col("Vict Descent").isNotNull())
    # Keep only the LA columns
    sorted_LA_income = (df_income_parquet.filter(df_income_parquet["Community"].like("%Los Angeles (%"))
                        .orderBy(col("Estimated Median Income").desc()))

    # Find the top 3 area with the income 
    top_income_zip = sorted_LA_income.limit(3).select("Zip Code")
    # Find the last 3 lest income areas
    bottom_income_zip = sorted_LA_income.orderBy(col("Estimated Median Income"), ascending=True).limit(3).select("Zip Code")
    top_victims = top_income_zip.join(crime_data_per_zip, "Zip Code").orderBy(col("count").desc())
    bottom_victims = bottom_income_zip.join(crime_data_per_zip, "Zip Code").orderBy(col("count").desc())

    top_income_victims_res = top_victims.join(df_re, "Vict Descent", "inner").orderBy('Zip Code', col('count').desc())
    bottom_income_victims_res = bottom_victims.join(df_re, "Vict Descent", "inner").orderBy('Zip Code', col('count').desc())

    if show_res:
        print('Top income victims')
        top_income_victims_res.select("Zip Code", "Vict Descent Full", "count").show(200)

        print('Lowest income victims')
        bottom_income_victims_res.select("Zip Code", "Vict Descent Full", "count").show(200)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [34]:
times_lst = []
for i in range(10):
    start_time = time.time()
    query_4(show_res=False)
    times_lst.append(time.time() - start_time)
print(times_lst)
print(f'Average execution time: {np.mean(times_lst)}')
query_4(show_res=True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[16.47685956954956, 5.634891033172607, 6.166682004928589, 5.679811239242554, 3.4968721866607666, 4.055256605148315, 4.443585634231567, 3.9509100914001465, 5.057728052139282, 4.659974813461304]
Average execution time: 5.962257122993469
Top income victims
+--------+--------------------+-----+
|Zip Code|   Vict Descent Full|count|
+--------+--------------------+-----+
|   90077|               White| 1887|
|   90077|               Other|  585|
|   90077|Hispanic/Latin/Me...|  286|
|   90077|               Black|  136|
|   90077|             Unknown|  121|
|   90077|         Other Asian|  113|
|   90077|             Chinese|    6|
|   90077|              Korean|    4|
|   90077|            Japanese|    2|
|   90077|American Indian/A...|    1|
|   90077|         AsianIndian|    1|
|   90077|            Hawaiian|    1|
|   90077|            Filipino|    1|
|   90272|               White| 6090|
|   90272|               Other|  818|
|   90272|Hispanic/Latin/Me...|  610|
|   90272|             U

### Query 5 

In [35]:
%%configure -f
{
    "conf": {
        "spark.executor.instances": "2",
        "spark.executor.memory": "8g",
        "spark.executor.cores": "4",
        "spark.driver.memory": "4g"
    }
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
2892,application_1732639283265_2851,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
2728,application_1732639283265_2687,pyspark,idle,Link,Link,,
2762,application_1732639283265_2721,pyspark,idle,Link,Link,,
2767,application_1732639283265_2726,pyspark,idle,Link,Link,,
2773,application_1732639283265_2732,pyspark,idle,Link,Link,,
2776,application_1732639283265_2735,pyspark,idle,Link,Link,,
2779,application_1732639283265_2738,pyspark,idle,Link,Link,,
2780,application_1732639283265_2739,pyspark,idle,Link,Link,,
2783,application_1732639283265_2742,pyspark,idle,Link,Link,,
2784,application_1732639283265_2743,pyspark,idle,Link,Link,,
2786,application_1732639283265_2745,pyspark,idle,Link,Link,,


In [36]:
from sedona.spark import *
from pyspark.sql import SparkSession
from pathlib import Path
from pyspark.sql import DataFrame
from pyspark.sql.window import Window
from pyspark.sql.functions import to_date, to_timestamp, col, year, month, desc, rank, when, count, lower,row_number, sum, round, avg
import time
import numpy as np

def query_5(show_res=True):
    
    spark = SparkSession.builder.appName("Query5_DataFrame").getOrCreate()
    sedona = SedonaContext.create(spark)

    group_number = "32"
    s3_path = "s3://groups-bucket-dblab-905418150721/group"+group_number+"/assigment_parquet_files/"


    df_parquet_file = spark.read.parquet(s3_path+"criminal_parquet.parquet")
    
    df_Police_stations_parquet_file = spark.read.parquet(s3_path+"LA_Police_Stations.parquet")
    df_filtered_crimes = df_parquet_file.filter(
        (col('Weapon Used Cd') >= 100) &
        (col('Weapon Used Cd') < 200) &
        (col('LAT') != 0.0) &
        (col('LON') != 0.0)
    )

    df_joined = df_parquet_file.join(
        df_Police_stations_parquet_file,
        df_parquet_file["AREA"] == df_Police_stations_parquet_file["PREC"],
        "inner"
    )

    df_GeomPoints = df_joined.withColumn("pointPoliceStation", ST_Point("Y", "X")).withColumn("pointCrime", ST_Point("LAT", "LON"))
    df_distance = df_GeomPoints.withColumn("distance", ST_DistanceSphere("pointPoliceStation", "pointCrime")/1000) # divide with 1000 to conver into km
    df_distance = df_distance.select(
        col("LAT").alias("crime_LAT"),
        col("LON").alias("crime_LON"),
        col("Y").alias("station_LAT"),
        col("X").alias("station_LON"),
        col("DIVISION").alias("division"),
        col("distance")
    )

    # Calculate average distance by division
    average_distance_by_division = df_distance.groupBy("division").agg(
        avg("distance").alias("average_distance"),
        count("*").alias("number_of_incidents")
    ).orderBy(desc("number_of_incidents"))

    if show_res:
        average_distance_by_division.show(50)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [37]:
times_lst = []
for i in range(10):
    start_time = time.time()
    query_5(show_res=False)
    times_lst.append(time.time() - start_time)
print(times_lst)
print(f'Average execution time: {np.mean(times_lst)}')
query_5(show_res=True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[9.030415296554565, 1.5945732593536377, 1.3192253112792969, 0.9136602878570557, 0.852773904800415, 0.9498627185821533, 0.7466731071472168, 0.7304801940917969, 0.8031618595123291, 0.9451959133148193]
Average execution time: 1.7886021852493286
+----------------+------------------+-------------------+
|        division|  average_distance|number_of_incidents|
+----------------+------------------+-------------------+
|     77TH STREET|14.653158550211844|             206981|
|       SOUTHWEST| 12.00476110447696|             192367|
|         PACIFIC|22.692338743002296|             171166|
|         CENTRAL| 19.70834405698613|             166946|
| NORTH HOLLYWOOD|15.639629476751265|             164710|
|       SOUTHEAST| 17.91416858507957|             161256|
|       HOLLYWOOD| 34.06244353940189|             151053|
|          NEWTON|12.480201162658012|             148886|
|         OLYMPIC|  16.4541969949399|             145135|
|         MISSION| 18.63241087118478|             143777|
|   

In [38]:
%%configure -f
{
    "conf": {
        "spark.executor.instances": "4",
        "spark.executor.memory": "4g",
        "spark.executor.cores": "2",
        "spark.driver.memory": "4g"
    }
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
2893,application_1732639283265_2852,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
2728,application_1732639283265_2687,pyspark,idle,Link,Link,,
2762,application_1732639283265_2721,pyspark,idle,Link,Link,,
2767,application_1732639283265_2726,pyspark,idle,Link,Link,,
2773,application_1732639283265_2732,pyspark,idle,Link,Link,,
2776,application_1732639283265_2735,pyspark,idle,Link,Link,,
2779,application_1732639283265_2738,pyspark,idle,Link,Link,,
2780,application_1732639283265_2739,pyspark,idle,Link,Link,,
2783,application_1732639283265_2742,pyspark,idle,Link,Link,,
2784,application_1732639283265_2743,pyspark,idle,Link,Link,,
2786,application_1732639283265_2745,pyspark,idle,Link,Link,,


In [39]:
from sedona.spark import *
from pyspark.sql import SparkSession
from pathlib import Path
from pyspark.sql import DataFrame
from pyspark.sql.window import Window
from pyspark.sql.functions import to_date, to_timestamp, col, year, month, desc, rank, when, count, lower,row_number, sum, round, avg
import time
import numpy as np

def query_5(show_res=True):
    
    spark = SparkSession.builder.appName("Query5_DataFrame").getOrCreate()
    sedona = SedonaContext.create(spark)

    group_number = "32"
    s3_path = "s3://groups-bucket-dblab-905418150721/group"+group_number+"/assigment_parquet_files/"


    df_parquet_file = spark.read.parquet(s3_path+"criminal_parquet.parquet")
    
    df_Police_stations_parquet_file = spark.read.parquet(s3_path+"LA_Police_Stations.parquet")
    df_filtered_crimes = df_parquet_file.filter(
        (col('Weapon Used Cd') >= 100) &
        (col('Weapon Used Cd') < 200) &
        (col('LAT') != 0.0) &
        (col('LON') != 0.0)
    )

    df_joined = df_parquet_file.join(
        df_Police_stations_parquet_file,
        df_parquet_file["AREA"] == df_Police_stations_parquet_file["PREC"],
        "inner"
    )

    df_GeomPoints = df_joined.withColumn("pointPoliceStation", ST_Point("Y", "X")).withColumn("pointCrime", ST_Point("LAT", "LON"))
    df_distance = df_GeomPoints.withColumn("distance", ST_DistanceSphere("pointPoliceStation", "pointCrime")/1000) # divide with 1000 to conver into km
    df_distance = df_distance.select(
        col("LAT").alias("crime_LAT"),
        col("LON").alias("crime_LON"),
        col("Y").alias("station_LAT"),
        col("X").alias("station_LON"),
        col("DIVISION").alias("division"),
        col("distance")
    )

    # Calculate average distance by division
    average_distance_by_division = df_distance.groupBy("division").agg(
        avg("distance").alias("average_distance"),
        count("*").alias("number_of_incidents")
    ).orderBy(desc("number_of_incidents"))

    if show_res:
        average_distance_by_division.show(50)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [40]:
times_lst = []
for i in range(10):
    start_time = time.time()
    query_5(show_res=False)
    times_lst.append(time.time() - start_time)
print(times_lst)
print(f'Average execution time: {np.mean(times_lst)}')
query_5(show_res=True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[9.772949457168579, 0.9416513442993164, 1.0624849796295166, 1.0719728469848633, 0.8171465396881104, 0.815178632736206, 0.8497989177703857, 0.7264482975006104, 0.7394587993621826, 0.8641884326934814]
Average execution time: 1.7661278247833252
+----------------+------------------+-------------------+
|        division|  average_distance|number_of_incidents|
+----------------+------------------+-------------------+
|     77TH STREET|14.653158550211842|             206981|
|       SOUTHWEST| 12.00476110447696|             192367|
|         PACIFIC|22.692338743002292|             171166|
|         CENTRAL| 19.70834405698613|             166946|
| NORTH HOLLYWOOD|15.639629476751265|             164710|
|       SOUTHEAST| 17.91416858507957|             161256|
|       HOLLYWOOD| 34.06244353940189|             151053|
|          NEWTON|12.480201162658012|             148886|
|         OLYMPIC|  16.4541969949399|             145135|
|         MISSION|18.632410871184778|             143777|
|   

In [41]:
%%configure -f
{
    "conf": {
        "spark.executor.instances": "8",
        "spark.executor.memory": "2g",
        "spark.executor.cores": "1",
        "spark.driver.memory": "2g"
    }
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
2894,application_1732639283265_2853,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
2728,application_1732639283265_2687,pyspark,idle,Link,Link,,
2762,application_1732639283265_2721,pyspark,idle,Link,Link,,
2767,application_1732639283265_2726,pyspark,idle,Link,Link,,
2773,application_1732639283265_2732,pyspark,idle,Link,Link,,
2776,application_1732639283265_2735,pyspark,idle,Link,Link,,
2779,application_1732639283265_2738,pyspark,idle,Link,Link,,
2780,application_1732639283265_2739,pyspark,idle,Link,Link,,
2783,application_1732639283265_2742,pyspark,idle,Link,Link,,
2784,application_1732639283265_2743,pyspark,idle,Link,Link,,
2786,application_1732639283265_2745,pyspark,idle,Link,Link,,


In [42]:
from sedona.spark import *
from pyspark.sql import SparkSession
from pathlib import Path
from pyspark.sql import DataFrame
from pyspark.sql.window import Window
from pyspark.sql.functions import to_date, to_timestamp, col, year, month, desc, rank, when, count, lower,row_number, sum, round, avg
import time
import numpy as np

def query_5(show_res=True):
    
    spark = SparkSession.builder.appName("Query5_DataFrame").getOrCreate()
    sedona = SedonaContext.create(spark)

    group_number = "32"
    s3_path = "s3://groups-bucket-dblab-905418150721/group"+group_number+"/assigment_parquet_files/"


    df_parquet_file = spark.read.parquet(s3_path+"criminal_parquet.parquet")
    
    df_Police_stations_parquet_file = spark.read.parquet(s3_path+"LA_Police_Stations.parquet")
    df_filtered_crimes = df_parquet_file.filter(
        (col('Weapon Used Cd') >= 100) &
        (col('Weapon Used Cd') < 200) &
        (col('LAT') != 0.0) &
        (col('LON') != 0.0)
    )

    df_joined = df_parquet_file.join(
        df_Police_stations_parquet_file,
        df_parquet_file["AREA"] == df_Police_stations_parquet_file["PREC"],
        "inner"
    )

    df_GeomPoints = df_joined.withColumn("pointPoliceStation", ST_Point("Y", "X")).withColumn("pointCrime", ST_Point("LAT", "LON"))
    df_distance = df_GeomPoints.withColumn("distance", ST_DistanceSphere("pointPoliceStation", "pointCrime")/1000) # divide with 1000 to conver into km
    df_distance = df_distance.select(
        col("LAT").alias("crime_LAT"),
        col("LON").alias("crime_LON"),
        col("Y").alias("station_LAT"),
        col("X").alias("station_LON"),
        col("DIVISION").alias("division"),
        col("distance")
    )

    # Calculate average distance by division
    average_distance_by_division = df_distance.groupBy("division").agg(
        avg("distance").alias("average_distance"),
        count("*").alias("number_of_incidents")
    ).orderBy(desc("number_of_incidents"))

    if show_res:
        average_distance_by_division.show(50)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [43]:
times_lst = []
for i in range(10):
    start_time = time.time()
    query_5(show_res=False)
    times_lst.append(time.time() - start_time)
print(times_lst)
print(f'Average execution time: {np.mean(times_lst)}')
query_5(show_res=True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[10.835430145263672, 5.542808532714844, 5.285247325897217, 6.0536949634552, 4.799799680709839, 5.154601097106934, 3.6206283569335938, 4.260501384735107, 3.1572768688201904, 3.575212240219116]
Average execution time: 5.228520059585572
+----------------+------------------+-------------------+
|        division|  average_distance|number_of_incidents|
+----------------+------------------+-------------------+
|     77TH STREET|14.653158550211728|             206981|
|       SOUTHWEST|12.004761104476955|             192367|
|         PACIFIC|   22.692338743002|             171166|
|         CENTRAL| 19.70834405698558|             166946|
| NORTH HOLLYWOOD|15.639629476751335|             164710|
|       SOUTHEAST|17.914168585079434|             161256|
|       HOLLYWOOD| 34.06244353940196|             151053|
|          NEWTON|12.480201162658124|             148886|
|         OLYMPIC|16.454196994939856|             145135|
|         MISSION|18.632410871185165|             143777|
|       NORT

## 