In [1]:
import time
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import col, year, count, when, desc, sum, to_timestamp, row_number, regexp_replace, expr, asc
from pyspark.sql.types import DecimalType
from pyspark.sql import functions as F
from sedona.spark import *
import itertools

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
3777,application_1732639283265_3721,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
### QUERY 3
APP_NAME = "(Income/person and crimes/person) / region"
spark = SparkSession.builder.appName(APP_NAME).getOrCreate()

sedona = SedonaContext.create(spark)

# crime data
d1 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv",
    header=True, inferSchema=True)
d2 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv",
    header=True, inferSchema=True)
crime_data = d1.union(d2)

# remove NULL ISLAND (0,0)
crime_data = crime_data.filter(~((col('LAT') == 0) & (col('LON') == 0))) \
        .select('DR_NO', 'LAT', 'LON').withColumn('geometry', ST_Point('LON', 'LAT')).drop('LON').drop('LAT')

# census block data
geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
census_blocks = sedona.read.format("geojson") \
            .option("multiLine", "true").load(geojson_path) \
            .selectExpr("explode(features) as features") \
            .select("features.*")
census_blocks = census_blocks.select(
            [col(f"properties.{col_name}").alias(col_name) for col_name in
            census_blocks.schema["properties"].dataType.fieldNames()] + ["geometry"],).drop("properties").drop("type")
census_blocks = census_blocks.filter((census_blocks['HOUSING10'] >= 0) & (census_blocks['POP_2010'] >= 0)).filter(F.col('CITY') == 'Los Angeles')

# median household income data
income_data = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721//LA_income_2015.csv",
    header=True, inferSchema=True)

income_data = income_data.withColumn(
    "median_hincome",
    regexp_replace(col("Estimated Median Income"), "[$,]", "").cast(DecimalType())
).drop('Estimated Median Income', 'Community')

# DATAFRAME API BEGIN #
start = time.time()
# crime data 
comm_join_crimes = crime_data.join(census_blocks, ST_Within(crime_data['geometry'], census_blocks['geometry']))
count_comm_crimes = comm_join_crimes.groupby('COMM').agg(F.count('*').alias('crimes_count'))
#income data 
zip_comm_houses_pop = census_blocks.groupBy('ZCTA10', 'COMM').agg(
    sum(col('POP_2010')).alias('population'),
    sum(col('HOUSING10')).alias('houses'),
).select('ZCTA10', 'COMM', 'population', 'houses')
# joined with median household income
zip_comm_houses_pop_hincome = zip_comm_houses_pop.join(
    income_data, 
    income_data['Zip Code'] == zip_comm_houses_pop['ZCTA10']
).drop('Zip Code')
# zip_total_income = houses * median_hincome per (zip, comm)
zip_comm_total_income = zip_comm_houses_pop_hincome.withColumn(
    "zip_total_income",
    col('median_hincome') * col('houses')
)
# total_population per COMM
comm_total_population_total_income = zip_comm_total_income.groupBy('COMM').agg(
    sum('population').alias('total_population'),
    sum('zip_total_income').alias('total_income')
)
# join crime and income data
comm_crime_income = comm_total_population_total_income.join(
    count_comm_crimes,
    on='COMM',
    how='right'
)
comm_mincome_person = comm_crime_income.withColumn(
    'average_income_per_person',
    when(col('total_population') > 0, col('total_income') / col('total_population'))
    .otherwise(0)).withColumn(
    'crime_rate_per_person',
    when(col('total_population') > 0, col('crimes_count') / col('total_population'))
    .otherwise(0)
).drop('total_income').drop('crimes_count').drop('total_population')
comm_mincome_person_collect = comm_mincome_person.collect()
end = time.time()
print("DATAFRAME performance:", end-start)
comm_mincome_person.show(n=comm_mincome_person.count(), truncate=False)
# DATAFRAME API END #

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DATAFRAME performance: 29.66372036933899
+-----------------------+-------------------------+---------------------+
|COMM                   |average_income_per_person|crime_rate_per_person|
+-----------------------+-------------------------+---------------------+
|San Pedro              |24244.561240             |0.7751021460858706   |
|South Park             |6943.255564              |0.8862840357021805   |
|Vernon Central         |6624.199012              |0.7288347583605012   |
|Florence-Firestone     |8079.274829              |1.1133874146386178   |
|Studio City            |44049.852630             |0.9295754238516157   |
|Valley Village         |28191.368349             |0.6809068681929318   |
|Sherman Oaks           |37767.445674             |0.7903687241383545   |
|Valley Glen            |18271.771022             |0.5546786523216308   |
|Park La Brea           |36619.900017             |1.025207944321847    |
|Miracle Mile           |38834.846111             |0.8820452139253908  

In [3]:
### QUERY 3 (join strategies)
def execute_query_with_join_strategy(strategy1, strategy2, strategy3):
    start = time.time()

    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
    spark.conf.set("spark.sql.join.preferSortMergeJoin", "true" if strategy1 == "MERGE" else "false")
    spark.conf.set("spark.sql.join.preferBroadcastJoin", "true" if strategy1 == "BROADCAST" else "false")

    # crime_data join census_blocks (strategy1)
    comm_join_crimes = crime_data.join(
        census_blocks.hint(strategy1),
        ST_Within(crime_data['geometry'], census_blocks['geometry'])
    )
    count_comm_crimes = comm_join_crimes.groupby('COMM').agg(F.count('*').alias('crimes_count'))

    # zip_comm_houses_pop_hincome join income_data (strategy2)
    zip_comm_houses_pop = census_blocks.groupBy('ZCTA10', 'COMM').agg(
        sum(col('POP_2010')).alias('population'),
        sum(col('HOUSING10')).alias('houses'),
    ).select('ZCTA10', 'COMM', 'population', 'houses')

    zip_comm_houses_pop_hincome = zip_comm_houses_pop.join(
        income_data.hint(strategy2),
        income_data['Zip Code'] == zip_comm_houses_pop['ZCTA10']
    ).drop('Zip Code')

    # zip_comm_total_income (strategy3)
    zip_comm_total_income = zip_comm_houses_pop_hincome.withColumn(
        "zip_total_income",
        col('median_hincome') * col('houses')
    )

    comm_total_population_total_income = zip_comm_total_income.groupBy('COMM').agg(
        sum('population').alias('total_population'),
        sum('zip_total_income').alias('total_income')
    )

    comm_crime_income = comm_total_population_total_income.join(
        count_comm_crimes.hint(strategy3),
        on='COMM',
        how='outer'
    )

    comm_mincome_person = comm_crime_income.withColumn(
        'average_income_per_person',
        when(col('total_population') > 0, col('total_income') / col('total_population'))
        .otherwise(0)
    ).withColumn(
        'crime_rate_per_person',
        when(col('total_population') > 0, col('crimes_count') / col('total_population'))
        .otherwise(0)
    ).drop('total_income').drop('crimes_count').drop('total_population')
    comm_mincome_person_collect = comm_mincome_person.collect()
    
    end = time.time()
    execution_time = end - start
    
    return execution_time

join_strategies = ["BROADCAST", "MERGE", "SHUFFLE_HASH", "SHUFFLE_REPLICATE_NL"]
strategy_combinations = list(itertools.product(join_strategies, repeat=3))

results = []

for strat1, strat2, strat3 in strategy_combinations:
    exec_time = execute_query_with_join_strategy(strat1, strat2, strat3)
    results.append((strat1, strat2, strat3, exec_time))

results_df = spark.createDataFrame(results, ["Strategy1", "Strategy2", "Strategy3", "ExecutionTime"])
results_df.show(n=results_df.count(), truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------------+--------------------+------------------+
|Strategy1           |Strategy2           |Strategy3           |ExecutionTime     |
+--------------------+--------------------+--------------------+------------------+
|BROADCAST           |BROADCAST           |BROADCAST           |17.746803045272827|
|BROADCAST           |BROADCAST           |MERGE               |15.800673723220825|
|BROADCAST           |BROADCAST           |SHUFFLE_HASH        |15.497687578201294|
|BROADCAST           |BROADCAST           |SHUFFLE_REPLICATE_NL|16.231744050979614|
|BROADCAST           |MERGE               |BROADCAST           |18.210407495498657|
|BROADCAST           |MERGE               |MERGE               |16.40001654624939 |
|BROADCAST           |MERGE               |SHUFFLE_HASH        |21.747425079345703|
|BROADCAST           |MERGE               |SHUFFLE_REPLICATE_NL|16.669507026672363|
|BROADCAST           |SHUFFLE_HASH        |BROADCAST           |14.974244832

In [4]:
results_df_sorted = results_df.orderBy("ExecutionTime", ascending=True)
results_df_sorted.show(n=results_df_sorted.count(), truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------------+--------------------+------------------+
|Strategy1           |Strategy2           |Strategy3           |ExecutionTime     |
+--------------------+--------------------+--------------------+------------------+
|SHUFFLE_REPLICATE_NL|SHUFFLE_HASH        |BROADCAST           |13.472944498062134|
|BROADCAST           |SHUFFLE_REPLICATE_NL|SHUFFLE_REPLICATE_NL|13.713311672210693|
|SHUFFLE_REPLICATE_NL|SHUFFLE_REPLICATE_NL|MERGE               |13.72116470336914 |
|SHUFFLE_HASH        |SHUFFLE_REPLICATE_NL|SHUFFLE_REPLICATE_NL|13.74719762802124 |
|SHUFFLE_HASH        |SHUFFLE_HASH        |BROADCAST           |13.748756647109985|
|SHUFFLE_REPLICATE_NL|MERGE               |BROADCAST           |14.009728908538818|
|SHUFFLE_REPLICATE_NL|BROADCAST           |BROADCAST           |14.104211568832397|
|SHUFFLE_HASH        |SHUFFLE_HASH        |SHUFFLE_HASH        |14.124082088470459|
|SHUFFLE_HASH        |SHUFFLE_HASH        |MERGE               |14.143094301