In [21]:
%%configure -f
{
    "conf": {
        "spark.executor.instances": "1",
        "spark.executor.memory": "1g",
        "spark.executor.cores": "1",
        "spark.driver.memory": "2g"
    }
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
2729,application_1732639283265_2688,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
2601,application_1732639283265_2560,pyspark,idle,Link,Link,,
2704,application_1732639283265_2663,pyspark,idle,Link,Link,,
2715,application_1732639283265_2674,pyspark,idle,Link,Link,,
2719,application_1732639283265_2678,pyspark,idle,Link,Link,,
2726,application_1732639283265_2685,pyspark,idle,Link,Link,,
2727,application_1732639283265_2686,pyspark,idle,Link,Link,,
2728,application_1732639283265_2687,pyspark,idle,Link,Link,,
2729,application_1732639283265_2688,pyspark,idle,Link,Link,,✔


In [22]:
# Spark Dataframe code
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, FloatType, StringType
from pyspark.sql.functions import col
# To log our application's execution time:
import time

spark = SparkSession \
    .builder \
    .appName("query3") \
    .getOrCreate()

start_time = time.time()


income_schema = StructType([
    StructField("ZIP", StringType(), True),
    StructField("Comm", StringType(), True),
    StructField("income", StringType(), True),
])

income_df = spark.read.format('csv') \
                .options(header='true') \
                .schema(income_schema) \
                .load("s3://initial-notebook-data-bucket-dblab-905418150721/LA_income_2015.csv")

income_df.printSchema()
income_df.show(20, truncate=False)

end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken: {elapsed_time:.2f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- ZIP: string (nullable = true)
 |-- Comm: string (nullable = true)
 |-- income: string (nullable = true)

+-----+-------------------------------------------------------------------------------------------------------+-------+
|ZIP  |Comm                                                                                                   |income |
+-----+-------------------------------------------------------------------------------------------------------+-------+
|90001|Los Angeles (South Los Angeles), Florence-Graham                                                       |$33,887|
|90002|Los Angeles (Southeast Los Angeles, Watts)                                                             |$30,413|
|90003|Los Angeles (South Los Angeles, Southeast Los Angeles)                                                 |$30,805|
|90004|Los Angeles (Hancock Park, Rampart Village, Virgil Village, Wilshire Center, Windsor Square)           |$40,612|
|90005|Los Angeles (Hancock Park, Koreatown, 

In [23]:
census_schema = StructType([
    StructField("CT10", StringType(), True),
    StructField("BG10", StringType(), True),
    StructField("CB10", StringType(), True),
    StructField("CTCB10", StringType(), True),
    StructField("BG10FIP10", StringType(), True),
    StructField("CEN_FIP13", StringType(), True),
    StructField("LA_FIP10", StringType(), True),
    StructField("CITY", StringType(), True),
    StructField("COMM", StringType(), True),
    StructField("CITYCOM", StringType(), True),
    StructField("ZCTA10", StringType(), True),
    StructField("PUMA10", StringType(), True),
    StructField("HD_2012", StringType(), True),
    StructField("HD_NAME", StringType(), True),
    StructField("SPA_2012", StringType(), True),
    StructField("SPA_NAME", StringType(), True),
    StructField("SUP_DIST", StringType(), True),
    StructField("SUP_LABEL", StringType(), True),
    StructField("HOUSING10", IntegerType(), True),
    StructField("POP_2010", IntegerType(), True),
    StructField("CT12", StringType(), True),
    StructField("BG12", StringType(), True),
    StructField("Shape.STArea()", FloatType(), True),
    StructField("Shape.STLength()", FloatType(), True)
])

census_df = spark.read.format("json").load("s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson")

census_df.printSchema()
census_df.show(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- _corrupt_record: string (nullable = true)
 |-- geometry: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: array (containsNull = true)
 |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- properties: struct (nullable = true)
 |    |-- BG10: string (nullable = true)
 |    |-- BG10FIP10: string (nullable = true)
 |    |-- BG12: string (nullable = true)
 |    |-- CB10: string (nullable = true)
 |    |-- CEN_FIP13: string (nullable = true)
 |    |-- CITY: string (nullable = true)
 |    |-- CITYCOM: string (nullable = true)
 |    |-- COMM: string (nullable = true)
 |    |-- CT10: string (nullable = true)
 |    |-- CT12: string (nullable = true)
 |    |-- CTCB10: string (nullable = true)
 |    |-- HD_2012: long (nullable = true)
 |    |-- HD_NAME: string (nullable = true)
 |    |-- HOUSING10: long (nullable = true)
 |    |-- 

In [24]:
from sedona.spark import *
from pyspark.sql.functions import collect_set,sum

# Create sedona context
sedona = SedonaContext.create(spark)
# Read the file from s3
geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
blocks_df = sedona.read.format("geojson") \
            .option("multiLine", "true").load(geojson_path) \
            .selectExpr("explode(features) as features") \
            .select("features.*")

# Formatting magic
flattened_df = blocks_df.select( \
                [col(f"properties.{col_name}").alias(col_name) for col_name in \
                blocks_df.schema["properties"].dataType.fieldNames()] + ["geometry"]) \
            .drop("properties") \
            .drop("type")


la_df = flattened_df.filter(
    (col("CITY") == "Los Angeles") &
    (col("HOUSING10").isNotNull()) & (col("HOUSING10") > 0) &
    (col("POP_2010").isNotNull()) & (col("POP_2010") > 0)
)


LA_areas = la_df \
    .groupBy("COMM") \
    .agg(
        ST_Union_Aggr("geometry").alias("geometry"),   # Aggregate geometries
        collect_set("ZCTA10").alias("ZIPCodes"),       # Collect unique ZIP codes as a list for each area
        sum("HOUSING10").alias("TotalHousing"),        # Sum of HOUSING10 for each community
        sum("POP_2010").alias("TotalPopulation")       # Sum of POP_2010 for each community
    )

LA_areas.show()


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------+--------------------+--------------------+------------+---------------+
|              COMM|            geometry|            ZIPCodes|TotalHousing|TotalPopulation|
+------------------+--------------------+--------------------+------------+---------------+
|    Toluca Terrace|POLYGON ((-118.35...|             [91601]|         541|           1301|
|      Elysian Park|MULTIPOLYGON (((-...|      [90026, 90012]|        1992|           5261|
|          Longwood|MULTIPOLYGON (((-...|             [90016]|        1474|           4210|
|     Green Meadows|POLYGON ((-118.27...|[90001, 90003, 90...|        5204|          19814|
|  Cadillac-Corning|POLYGON ((-118.37...|      [90034, 90035]|        2215|           6665|
|          Mid-city|MULTIPOLYGON (((-...|             [90019]|        6692|          14339|
|   Lincoln Heights|MULTIPOLYGON (((-...|[90065, 90032, 90...|        9197|          31101|
|          Van Nuys|MULTIPOLYGON (((-...|[91402, 91406, 91...|       29170|     

In [25]:
from sedona.spark import *
from sedona.sql import *
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

# Define crime data schema
crimes_schema = StructType([
    StructField("DR_NO", StringType(), True),
    StructField("Date_Rptd", StringType(), True),
    StructField("DATE_OCC", StringType(), True),
    StructField("TIME_OCC", StringType(), True),
    StructField("AREA", StringType(), True),
    StructField("AREA_NAME", StringType(), True),
    StructField("Rpt_Dist_No", StringType(), True),
    StructField("Part_1-2", StringType(), True),
    StructField("Crm_Cd", StringType(), True),
    StructField("Crm_Cd_Desc", StringType(), True),
    StructField("Mocodes", StringType(), True),
    StructField("Vict_Age", StringType(), True),
    StructField("Vict_Sex", StringType(), True),
    StructField("Vict_Descent", StringType(), True),
    StructField("Premis_Cd", StringType(), True),
    StructField("Premis_Desc", StringType(), True),
    StructField("Weapon_Used_Cd", StringType(), True),
    StructField("Weapon_Desc", StringType(), True),
    StructField("Status", StringType(), True),
    StructField("Status_Desc", StringType(), True),
    StructField("Crm_Cd_1", StringType(), True),
    StructField("Crm_Cd_2", StringType(), True),
    StructField("Crm_Cd_3", StringType(), True),
    StructField("Crm_Cd_4", StringType(), True),
    StructField("LOCATION", StringType(), True),
    StructField("Cross_Street", StringType(), True),
    StructField("LAT", StringType(), True),
    StructField("LON", StringType(), True)
])

# Read crime data
crimes_df1 = spark.read.format('csv') \
    .options(header='false') \
    .schema(crimes_schema) \
    .load("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv")

crimes_df2 = spark.read.format('csv') \
    .options(header='true') \
    .schema(crimes_schema) \
    .load("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv")

# Combine datasets
crimes_df = crimes_df1.union(crimes_df2)

# Filter valid coordinates and create geometry column for crimes
crimes_df = crimes_df.filter((col("LAT").isNotNull()) & (col("LON").isNotNull()))
crimes_df = crimes_df.withColumn("geometry", ST_Point(col("LON").cast("double"), col("LAT").cast("double")))


# Perform spatial join: match crime locations with LA areas
crimes_within_areas = crimes_df.alias("crimes").join(
    LA_areas.alias("LA_areas"),
    ST_Contains(col("LA_areas.geometry"), col("crimes.geometry")),
    "inner"
)

# Count the number of crimes in each area and collect ZIP codes
crime_counts_with_zip = crimes_within_areas.groupBy(
    col("LA_areas.COMM").alias("Community")
).agg(
    collect_set("LA_areas.ZIPCodes").alias("ZIPCodes"),  # Collect ZIP codes
    count("*").alias("CrimeCount")                    # Count crimes
).orderBy(col("CrimeCount").desc())

# Flatten the collected ZIPCodes list for better readability
crime_counts_with_zip = crime_counts_with_zip.withColumn(
    "ZIPCodes", col("ZIPCodes").cast("string")  # Convert list of ZIPs to string format
)

# Show the crime counts with ZIP codes
crime_counts_with_zip.show(truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------+------------------------------------------------------------------------+----------+
|Community         |ZIPCodes                                                                |CrimeCount|
+------------------+------------------------------------------------------------------------+----------+
|North Hollywood   |[[91601, 91604, 91605, 91352, 91607, 91602, 91606]]                     |96565     |
|Hollywood         |[[90046, 90027, 90038, 90028, 90069]]                                   |86140     |
|Van Nuys          |[[91402, 91406, 91405, 91411, 91401]]                                   |63459     |
|Melrose           |[[90048, 90046, 90036, 90038, 90004, 90069, 90029]]                     |59302     |
|Sherman Oaks      |[[90210, 91604, 90077, 91607, 91403, 91411, 91401, 91423]]              |52555     |
|West Vernon       |[[90062, 90037, 90047]]                                                 |52195     |
|Boyle Heights     |[[90063, 90033, 90023]]            

In [26]:
from pyspark.sql.functions import *


# Join crime counts with population data
crime_with_population = crime_counts_with_zip.join(
    LA_areas,
    crime_counts_with_zip["Community"] == LA_areas["COMM"],
    how="inner"
).select(
    col("Community"),
    col("CrimeCount"),
    col("TotalPopulation")
)

crime_with_population = crime_with_population.withColumn(
    "CrimesPerCapita",
    round(col("CrimeCount") / col("TotalPopulation"), 6)  # Round to 6 decimal places for clarity
)

# Show all results sorted by CrimeCount in descending order
crime_with_population.select(
    col("Community"),
    col("CrimeCount"),
    col("TotalPopulation"),
    col("CrimesPerCapita")
).orderBy(col("CrimeCount").desc()).show(truncate=False)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------+----------+---------------+---------------+
|Community         |CrimeCount|TotalPopulation|CrimesPerCapita|
+------------------+----------+---------------+---------------+
|North Hollywood   |96565     |142347         |0.678377       |
|Hollywood         |86140     |62213          |1.384598       |
|Van Nuys          |63459     |85959          |0.738247       |
|Melrose           |59302     |73442          |0.807467       |
|Sherman Oaks      |52555     |81334          |0.646163       |
|West Vernon       |52195     |50300          |1.037674       |
|Boyle Heights     |50578     |81949          |0.617189       |
|San Pedro         |50365     |73394          |0.686228       |
|Vermont Vista     |47099     |37525          |1.255137       |
|Florence-Firestone|46208     |43638          |1.058894       |
|Wholesale District|45021     |29156          |1.544142       |
|Westlake          |43672     |54626          |0.799473       |
|Northridge        |43024     |62225    

In [27]:
# Clean and prepare the income column in income_df
income_df_cleaned = income_df.withColumn(
    "income_cleaned",
    regexp_replace(col("income"), "[$,]", "").cast("double")  # Remove dollar signs and commas, cast to double
)
income_df_LA = income_df_cleaned.filter(col("Comm").contains("Los Angeles"))
# Rename the columns to avoid ambiguity
income_df_LA = income_df_LA.withColumnRenamed("Comm", "Community")
income_df_LA = income_df_LA.withColumnRenamed("ZIP", "ZIP_Income")
income_df_LA.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+--------------------+-------+--------------+
|ZIP_Income|           Community| income|income_cleaned|
+----------+--------------------+-------+--------------+
|     90001|Los Angeles (Sout...|$33,887|       33887.0|
|     90002|Los Angeles (Sout...|$30,413|       30413.0|
|     90003|Los Angeles (Sout...|$30,805|       30805.0|
|     90004|Los Angeles (Hanc...|$40,612|       40612.0|
|     90005|Los Angeles (Hanc...|$31,142|       31142.0|
|     90006|Los Angeles (Byza...|$31,521|       31521.0|
|     90007|Los Angeles (Sout...|$22,304|       22304.0|
|     90008|Los Angeles (Bald...|$36,564|       36564.0|
|     90010|Los Angeles (Hanc...|$45,786|       45786.0|
|     90011|Los Angeles (Sout...|$30,251|       30251.0|
|     90012|Los Angeles (Down...|$31,576|       31576.0|
|     90013|Los Angeles (Down...|$19,887|       19887.0|
|     90014|Los Angeles (Down...|$23,642|       23642.0|
|     90015|Los Angeles (Dowt...|$29,684|       29684.0|
|     90016|Los Angeles (West..

In [28]:
from pyspark.sql.functions import explode, col, sum

start_time = time.time()

# Join the exploded DataFrame with the income DataFrame on ZipCodes, since comm doesm't match
joined_df = la_df.join(
    income_df_LA,
    la_df["ZCTA10"] == income_df_LA["ZIP_Income"],
    how="inner"
)

# Add the new column : avg_income for each zip
joined_df_zip = joined_df.withColumn(
    "IncomePerCapita_zip",
     col("income_cleaned") * col("HOUSING10") / col("POP_2010")
)

# avg_income for each community (each community has multiple zip codes)
aggregated_df = joined_df_zip.groupBy("COMM").agg(
    round(avg("IncomePerCapita_zip") ,3).alias("IncomePerCapita")
)

result_df = aggregated_df.join(
    crime_with_population,
    crime_with_population["Community"] == aggregated_df["COMM"],
    how="inner"
).select(
    col("Community"),
    col("CrimesPerCapita"),
    col("IncomePerCapita"),
)

# Sort the results in descending order by IncomePerCapita
result_df = result_df.orderBy(col("IncomePerCapita").desc())

# Show the results
result_df.show(truncate=False)

end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken (default join): {elapsed_time:.2f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------------------+---------------+---------------+
|Community              |CrimesPerCapita|IncomePerCapita|
+-----------------------+---------------+---------------+
|Beverly Crest          |0.368961       |80637.577      |
|Pacific Palisades      |0.379676       |71315.178      |
|Bel Air                |0.399225       |70879.469      |
|Venice                 |1.040428       |67888.88       |
|Palisades Highlands    |0.187842       |67392.081      |
|Marina Peninsula       |0.599954       |63010.679      |
|Mandeville Canyon      |0.261058       |62110.61       |
|Brentwood              |0.405864       |56551.715      |
|Playa Vista            |0.500448       |55354.295      |
|Carthay                |0.762896       |47221.492      |
|Playa Del Rey          |0.742559       |45486.718      |
|Hollywood Hills        |0.747616       |41672.817      |
|Studio City            |0.783407       |41151.145      |
|Century City           |0.632969       |40671.613      |
|South Carthay

In [29]:
result_df.explain(mode="formatted")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Physical Plan ==
AdaptiveSparkPlan (55)
+- Sort (54)
   +- Exchange (53)
      +- Project (52)
         +- SortMergeJoin Inner (51)
            :- Sort (15)
            :  +- HashAggregate (14)
            :     +- Exchange (13)
            :        +- HashAggregate (12)
            :           +- Project (11)
            :              +- BroadcastHashJoin Inner BuildRight (10)
            :                 :- Project (5)
            :                 :  +- Filter (4)
            :                 :     +- Generate (3)
            :                 :        +- Filter (2)
            :                 :           +- Scan geojson  (1)
            :                 +- BroadcastExchange (9)
            :                    +- Project (8)
            :                       +- Filter (7)
            :                          +- Scan csv  (6)
            +- Filter (50)
               +- Project (49)
                  +- SortMergeJoin Inner (48)
                     :- Sort (38)
        

In [30]:
# BROADCAST JOIN
from pyspark.sql.functions import explode, col, sum

start_time = time.time()

# Join the exploded DataFrame with the income DataFrame on ZipCodes, since comm doesm't match
joined_df = la_df.hint("broadcast").join(
    income_df_LA,
    la_df["ZCTA10"] == income_df_LA["ZIP_Income"],
    how="inner"
)

# Add the new column : avg_income for each zip. AvgPpeoplePerHouse = POP_2010 / HOUSING10 
# IncomePerCapita_zip = IncomePerHouse / AvgPpeoplePerHouse = IncomePerHouse * HOUSING10 / POP_2010

joined_df_zip = joined_df.withColumn(
    "IncomePerCapita_zip",
     col("income_cleaned") * col("HOUSING10") / col("POP_2010")
)

# IncomePerCapita = avg(IncomePerCapita_zip) for each community (each community has multiple zip codes)

aggregated_df = joined_df_zip.groupBy("COMM").agg(
    round(avg("IncomePerCapita_zip") ,3).alias("IncomePerCapita")
)

result_df = aggregated_df.hint("broadcast").join(
    crime_with_population,
    crime_with_population["Community"] == aggregated_df["COMM"],
    how="inner"
).select(
    col("Community"),
    col("CrimesPerCapita"),
    col("IncomePerCapita"),
)

# Sort the results in descending order by IncomePerCapita
result_df = result_df.orderBy(col("IncomePerCapita").desc())

# Show the results
result_df.show(truncate=False)

end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken (broadcast join): {elapsed_time:.2f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------------------+---------------+---------------+
|Community              |CrimesPerCapita|IncomePerCapita|
+-----------------------+---------------+---------------+
|Beverly Crest          |0.368961       |80637.577      |
|Pacific Palisades      |0.379676       |71315.178      |
|Bel Air                |0.399225       |70879.469      |
|Venice                 |1.040428       |67888.88       |
|Palisades Highlands    |0.187842       |67392.081      |
|Marina Peninsula       |0.599954       |63010.679      |
|Mandeville Canyon      |0.261058       |62110.61       |
|Brentwood              |0.405864       |56551.715      |
|Playa Vista            |0.500448       |55354.295      |
|Carthay                |0.762896       |47221.492      |
|Playa Del Rey          |0.742559       |45486.718      |
|Hollywood Hills        |0.747616       |41672.817      |
|Studio City            |0.783407       |41151.145      |
|Century City           |0.632969       |40671.613      |
|South Carthay

In [32]:
result_df.explain(mode="formatted")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Physical Plan ==
AdaptiveSparkPlan (54)
+- Sort (53)
   +- Exchange (52)
      +- Project (51)
         +- BroadcastHashJoin Inner BuildLeft (50)
            :- BroadcastExchange (15)
            :  +- HashAggregate (14)
            :     +- Exchange (13)
            :        +- HashAggregate (12)
            :           +- Project (11)
            :              +- BroadcastHashJoin Inner BuildLeft (10)
            :                 :- BroadcastExchange (6)
            :                 :  +- Project (5)
            :                 :     +- Filter (4)
            :                 :        +- Generate (3)
            :                 :           +- Filter (2)
            :                 :              +- Scan geojson  (1)
            :                 +- Project (9)
            :                    +- Filter (8)
            :                       +- Scan csv  (7)
            +- Project (49)
               +- SortMergeJoin Inner (48)
                  :- Sort (38)
            

In [33]:
# MERGE JOIN
from pyspark.sql.functions import explode, col, sum

start_time = time.time()

# Join the exploded DataFrame with the income DataFrame on ZipCodes, since comm doesm't match
joined_df = la_df.hint("merge").join(
    income_df_LA,
    la_df["ZCTA10"] == income_df_LA["ZIP_Income"],
    how="inner"
)

# Add the new column : avg_income for each zip
joined_df_zip = joined_df.withColumn(
    "IncomePerCapita_zip",
     col("income_cleaned") * col("HOUSING10") / col("POP_2010")
)

# avg_income for each community (each community has multiple zip codes)
aggregated_df = joined_df_zip.groupBy("COMM").agg(
    round(avg("IncomePerCapita_zip") ,3).alias("IncomePerCapita")
)

result_df = aggregated_df.hint("merge").join(
    crime_with_population,
    crime_with_population["Community"] == aggregated_df["COMM"],
    how="inner"
).select(
    col("Community"),
    col("CrimesPerCapita"),
    col("IncomePerCapita"),
)

# Sort the results in descending order by IncomePerCapita
result_df = result_df.orderBy(col("IncomePerCapita").desc())

# Show the results
result_df.show(truncate=False)

end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken (merge join): {elapsed_time:.2f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------------------+---------------+---------------+
|Community              |CrimesPerCapita|IncomePerCapita|
+-----------------------+---------------+---------------+
|Beverly Crest          |0.368961       |80637.577      |
|Pacific Palisades      |0.379676       |71315.178      |
|Bel Air                |0.399225       |70879.469      |
|Venice                 |1.040428       |67888.88       |
|Palisades Highlands    |0.187842       |67392.081      |
|Marina Peninsula       |0.599954       |63010.679      |
|Mandeville Canyon      |0.261058       |62110.61       |
|Brentwood              |0.405864       |56551.715      |
|Playa Vista            |0.500448       |55354.295      |
|Carthay                |0.762896       |47221.492      |
|Playa Del Rey          |0.742559       |45486.718      |
|Hollywood Hills        |0.747616       |41672.817      |
|Studio City            |0.783407       |41151.145      |
|Century City           |0.632969       |40671.613      |
|South Carthay

In [34]:
result_df.explain(mode="formatted")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Physical Plan ==
AdaptiveSparkPlan (58)
+- Sort (57)
   +- Exchange (56)
      +- Project (55)
         +- SortMergeJoin Inner (54)
            :- Sort (18)
            :  +- HashAggregate (17)
            :     +- Exchange (16)
            :        +- HashAggregate (15)
            :           +- Project (14)
            :              +- SortMergeJoin Inner (13)
            :                 :- Sort (7)
            :                 :  +- Exchange (6)
            :                 :     +- Project (5)
            :                 :        +- Filter (4)
            :                 :           +- Generate (3)
            :                 :              +- Filter (2)
            :                 :                 +- Scan geojson  (1)
            :                 +- Sort (12)
            :                    +- Exchange (11)
            :                       +- Project (10)
            :                          +- Filter (9)
            :                             +- Scan c

In [35]:
# SHUFFLE HASH JOIN

from pyspark.sql.functions import explode, col, sum

start_time = time.time()

# Join the exploded DataFrame with the income DataFrame on ZipCodes, since comm doesm't match
joined_df = la_df.hint("shuffle_hash").join(
    income_df_LA,
    la_df["ZCTA10"] == income_df_LA["ZIP_Income"],
    how="inner"
)

# Add the new column : avg_income for each zip
joined_df_zip = joined_df.withColumn(
    "IncomePerCapita_zip",
     col("income_cleaned") * col("HOUSING10") / col("POP_2010")
)

# avg_income for each community (each community has multiple zip codes)
aggregated_df = joined_df_zip.groupBy("COMM").agg(
    round(avg("IncomePerCapita_zip") ,3).alias("IncomePerCapita")
)

result_df = aggregated_df.hint("shuffle_hash").join(
    crime_with_population,
    crime_with_population["Community"] == aggregated_df["COMM"],
    how="inner"
).select(
    col("Community"),
    col("CrimesPerCapita"),
    col("IncomePerCapita"),
)

# Sort the results in descending order by IncomePerCapita
result_df = result_df.orderBy(col("IncomePerCapita").desc())

# Show the results
result_df.show(truncate=False)

end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken (shuffle_hash join): {elapsed_time:.2f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------------------+---------------+---------------+
|Community              |CrimesPerCapita|IncomePerCapita|
+-----------------------+---------------+---------------+
|Beverly Crest          |0.368961       |80637.577      |
|Pacific Palisades      |0.379676       |71315.178      |
|Bel Air                |0.399225       |70879.469      |
|Venice                 |1.040428       |67888.88       |
|Palisades Highlands    |0.187842       |67392.081      |
|Marina Peninsula       |0.599954       |63010.679      |
|Mandeville Canyon      |0.261058       |62110.61       |
|Brentwood              |0.405864       |56551.715      |
|Playa Vista            |0.500448       |55354.295      |
|Carthay                |0.762896       |47221.492      |
|Playa Del Rey          |0.742559       |45486.718      |
|Hollywood Hills        |0.747616       |41672.817      |
|Studio City            |0.783407       |41151.145      |
|Century City           |0.632969       |40671.613      |
|South Carthay

In [38]:
result_df.explain(mode="formatted")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Physical Plan ==
AdaptiveSparkPlan (55)
+- Sort (54)
   +- Exchange (53)
      +- Project (52)
         +- ShuffledHashJoin Inner BuildLeft (51)
            :- HashAggregate (15)
            :  +- Exchange (14)
            :     +- HashAggregate (13)
            :        +- Project (12)
            :           +- ShuffledHashJoin Inner BuildLeft (11)
            :              :- Exchange (6)
            :              :  +- Project (5)
            :              :     +- Filter (4)
            :              :        +- Generate (3)
            :              :           +- Filter (2)
            :              :              +- Scan geojson  (1)
            :              +- Exchange (10)
            :                 +- Project (9)
            :                    +- Filter (8)
            :                       +- Scan csv  (7)
            +- Filter (50)
               +- Project (49)
                  +- SortMergeJoin Inner (48)
                     :- Sort (38)
              

In [39]:
# SHUFFLE_REPLICATE_NL JOIN

from pyspark.sql.functions import explode, col, sum

start_time = time.time()

# Join the exploded DataFrame with the income DataFrame on ZipCodes, since comm doesm't match
joined_df = la_df.hint("shuffle_replicate_nl").join(
    income_df_LA,
    la_df["ZCTA10"] == income_df_LA["ZIP_Income"],
    how="inner"
)

# Add the new column : avg_income for each zip
joined_df_zip = joined_df.withColumn(
    "IncomePerCapita_zip",
     col("income_cleaned") * col("HOUSING10") / col("POP_2010")
)

# avg_income for each community (each community has multiple zip codes)
aggregated_df = joined_df_zip.groupBy("COMM").agg(
    round(avg("IncomePerCapita_zip") ,3).alias("IncomePerCapita")
)

result_df = aggregated_df.hint("shuffle_replicate_nl").join(
    crime_with_population,
    crime_with_population["Community"] == aggregated_df["COMM"],
    how="inner"
).select(
    col("Community"),
    col("CrimesPerCapita"),
    col("IncomePerCapita"),
)

# Sort the results in descending order by IncomePerCapita
result_df = result_df.orderBy(col("IncomePerCapita").desc())

# Show the results
result_df.show(truncate=False)

end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken (shuffle_replicate_nl join): {elapsed_time:.2f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------------------+---------------+---------------+
|Community              |CrimesPerCapita|IncomePerCapita|
+-----------------------+---------------+---------------+
|Beverly Crest          |0.368961       |80637.577      |
|Pacific Palisades      |0.379676       |71315.178      |
|Bel Air                |0.399225       |70879.469      |
|Venice                 |1.040428       |67888.88       |
|Palisades Highlands    |0.187842       |67392.081      |
|Marina Peninsula       |0.599954       |63010.679      |
|Mandeville Canyon      |0.261058       |62110.61       |
|Brentwood              |0.405864       |56551.715      |
|Playa Vista            |0.500448       |55354.295      |
|Carthay                |0.762896       |47221.492      |
|Playa Del Rey          |0.742559       |45486.718      |
|Hollywood Hills        |0.747616       |41672.817      |
|Studio City            |0.783407       |41151.145      |
|Century City           |0.632969       |40671.613      |
|South Carthay

In [40]:
result_df.explain(mode="formatted")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Physical Plan ==
AdaptiveSparkPlan (53)
+- Sort (52)
   +- Exchange (51)
      +- Project (50)
         +- CartesianProduct Inner (49)
            :- HashAggregate (13)
            :  +- Exchange (12)
            :     +- HashAggregate (11)
            :        +- Project (10)
            :           +- CartesianProduct Inner (9)
            :              :- Project (5)
            :              :  +- Filter (4)
            :              :     +- Generate (3)
            :              :        +- Filter (2)
            :              :           +- Scan geojson  (1)
            :              +- Project (8)
            :                 +- Filter (7)
            :                    +- Scan csv  (6)
            +- Filter (48)
               +- Project (47)
                  +- SortMergeJoin Inner (46)
                     :- Sort (36)
                     :  +- HashAggregate (35)
                     :     +- Exchange (34)
                     :        +- HashAggregate (33)
    