# Query 3: Income and Crime Analysis

## Description
Using the 2010 Census data for population and the 2015 Census data for household income, this analysis aims to compute the following for each area in Los Angeles:
1. **Average Annual Income Per Person**
2. **Ratio of Total Number of Crimes Per Person**

The results are consolidated into a table for easy interpretation.

Different join strategies (`BROADCAST`, `MERGE`, `SHUFFLE_HASH`, `SHUFFLE_REPLICATE_NL`) are experimented with using Spark's `hint` and `explain` methods to understand the Catalyst optimizer's behavior. The performance of each strategy is measured and compared to determine the most suitable join strategy for this scenario.

Finally, the results are saved as CSV files for further analysis.


In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum as spark_sum, col, expr, regexp_replace, round as spark_round, lower
from pyspark.sql.window import Window
from sedona.spark import SedonaContext
import time

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Query 3: Income and Crime Analysis") \
    .getOrCreate()

# Initialize Sedona
sedona = SedonaContext.create(spark)

# Function to measure execution time
def measure_execution_time(func, *args, **kwargs):
    start_time = time.time()
    results = func(*args, **kwargs)
    end_time = time.time()
    exec_time = end_time - start_time
    return exec_time, results


Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
2887,application_1732639283265_2846,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
# --- Load Crime Data ---
crime_data_2010_2019 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv",
    header=True,
    inferSchema=True
)

crime_data_2020_present = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv",
    header=True,
    inferSchema=True
)

# Union Crime Data
crime_df = crime_data_2010_2019.union(crime_data_2020_present) \
    .filter((col("LAT") != 0) | (col("LON") != 0)) \
    .withColumn("geom", expr("ST_Point(LON, LAT)")) \
    .select("geom")

# --- Load GeoJSON Census Blocks ---
geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
blocks_df = sedona.read.format("geojson") \
    .option("multiLine", "true") \
    .load(geojson_path) \
    .selectExpr("explode(features) as features") \
    .select("features.*")

# Flatten properties into columns
flattened_df = blocks_df.select(
    [col(f"properties.{col_name}").alias(col_name) for col_name in blocks_df.schema["properties"].dataType.fieldNames()]
    + ["geometry"]
).drop("properties").drop("type")

# Filter for Los Angeles
population_df = flattened_df.filter(col("CITY") == "Los Angeles").select(
    col("COMM"),
    col("ZCTA10").alias("ZIPCODE"),
    col("POP_2010").alias("POPULATION"),
    col("HOUSING10").alias("HOUSING_UNITS"),
    col("geometry")
)

# --- Load Income Data ---
income_df = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/LA_income_2015.csv",
    header=True,
    inferSchema=True
)

# Clean Income Data
income_data_cleaned = income_df.withColumn("ZIPCODE", col("Zip Code")) \
    .withColumn(
        "MEDIAN_INCOME",
        regexp_replace(col("Estimated Median Income"), "[$,]", "").cast("double")
    ) \
    .select("ZIPCODE", "MEDIAN_INCOME")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
# Function to perform joins with different strategies and measure execution time
def perform_join_with_strategy(population_df, income_data_cleaned, strategy_hint):
    start_time = time.time()
    
    if strategy_hint:
        income_df_hinted = income_data_cleaned.hint(strategy_hint)
        print(f"\nJoin Strategy: {strategy_hint}")
    else:
        income_df_hinted = income_data_cleaned
        print("\nJoin Strategy: Default")
    
    # Perform the join on ZIPCODE
    joined_df = population_df.join(
        income_df_hinted,
        on="ZIPCODE",
        how="inner"
    )
    
    
    # Explain the physical plan
    joined_df.explain()
    
    # Trigger join execution
    joined_df.show()
    
    
    exec_time = time.time() - start_time
    return exec_time, joined_df

# Define join strategies to test
join_strategies = ["broadcast", "merge", "shuffle_hash", "shuffle_replicate_nl"]

# Measure execution time for each join strategy
join_times = []
for strategy in join_strategies:
    exec_time, joined_df = perform_join_with_strategy(population_df, income_data_cleaned, strategy)
    join_times.append((strategy, exec_time))

# Perform default join without any hint
exec_time_default, joined_df_default = perform_join_with_strategy(population_df, income_data_cleaned, strategy_hint=None)
join_times.append(("default", exec_time_default))

# Display join times
print("\n=== Join Times for Population + Income ===")
for strat, t in join_times:
    print(f"{strat}: {t:.2f} seconds")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…


Join Strategy: broadcast
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [ZIPCODE#377, COMM#256, POPULATION#378L, HOUSING_UNITS#379L, geometry#243, MEDIAN_INCOME#415]
   +- BroadcastHashJoin [cast(ZIPCODE#377 as int)], [ZIPCODE#409], Inner, BuildRight, false
      :- Project [features#240.properties.COMM AS COMM#256, features#240.properties.ZCTA10 AS ZIPCODE#377, features#240.properties.POP_2010 AS POPULATION#378L, features#240.properties.HOUSING10 AS HOUSING_UNITS#379L, features#240.geometry AS geometry#243]
      :  +- Filter ((isnotnull(features#240.properties.CITY) AND (features#240.properties.CITY = Los Angeles)) AND isnotnull(features#240.properties.ZCTA10))
      :     +- Generate explode(features#232), false, [features#240]
      :        +- Filter ((size(features#232, true) > 0) AND isnotnull(features#232))
      :           +- FileScan geojson [features#232] Batched: false, DataFilters: [(size(features#232, true) > 0), isnotnull(features#232)], Format: GEO

In [10]:
# Perform Aggregation after Join
def aggregate_population_income(joined_df):
    pop_income_agg = joined_df.groupBy("COMM") \
        .agg(
            spark_sum("POPULATION").alias("TOTAL_POPULATION"),
            spark_sum(expr("MEDIAN_INCOME * HOUSING_UNITS")).alias("TOTAL_INCOME"),
            expr("ST_Union_Aggr(geometry) AS geometry")
        ) \
        .withColumn(
            "AVERAGE_INCOME_PER_PERSON",
            spark_round(col("TOTAL_INCOME") / col("TOTAL_POPULATION"), 2)
        ) \
        .select("COMM", "TOTAL_POPULATION", "AVERAGE_INCOME_PER_PERSON", "geometry")
    
    return pop_income_agg

# Aggregate using the default joined DataFrame
pop_income_agg = aggregate_population_income(joined_df_default)
pop_income_agg.show(3, truncate=False)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------+----------------+-------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [12]:
# Function to perform spatial join with different strategies and measure execution time
def perform_spatial_join_with_strategy(crime_df, pop_income_agg, strategy_hint):
    start_time = time.time()
    
    if strategy_hint:
        pop_income_hinted = pop_income_agg.hint(strategy_hint)
        print(f"\nSpatial Join Strategy: {strategy_hint}")
    else:
        pop_income_hinted = pop_income_agg
        print("\nSpatial Join Strategy: Default")
    
    # Perform spatial join using ST_Within
    joined_spatial_df = crime_df.join(
        pop_income_hinted,
        expr("ST_Within(geom, geometry)"),
        "inner"
    )
    
    # Explain the physical plan
    joined_spatial_df.explain()
    
    # Triger join execution
    joined_spatial_df.show()
    
    exec_time = time.time() - start_time
    return exec_time, joined_spatial_df

# Define spatial join strategies to test
spatial_join_strategies = ["broadcast", "merge", "shuffle_hash", "shuffle_replicate_nl"]

# Measure execution time for each spatial join strategy
spatial_join_times = []
for strategy in spatial_join_strategies:
    exec_time, joined_spatial_df = perform_spatial_join_with_strategy(crime_df, pop_income_agg, strategy)
    spatial_join_times.append((strategy, exec_time))

# Perform default spatial join without any hint
exec_time_spatial_default, joined_spatial_df_default = perform_spatial_join_with_strategy(crime_df, pop_income_agg, strategy_hint=None)
spatial_join_times.append(("default", exec_time_spatial_default))

# Display spatial join times
print("\n=== Spatial Join Times: Crime + Population Income ===")
for strat, t in spatial_join_times:
    print(f"{strat}: {t:.2f} seconds")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…


Spatial Join Strategy: broadcast
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastIndexJoin geom#200: geometry, RightSide, LeftSide, Inner, WITHIN ST_WITHIN(geom#200, geometry#1868)
   :- Union
   :  :- Project [ **org.apache.spark.sql.sedona_sql.expressions.ST_Point**   AS geom#200]
   :  :  +- Filter ((NOT (LAT#68 = 0.0) OR NOT (LON#69 = 0.0)) AND isnotnull( **org.apache.spark.sql.sedona_sql.expressions.ST_Point**  ))
   :  :     +- FileScan csv [LAT#68,LON#69] Batched: false, DataFilters: [(NOT (LAT#68 = 0.0) OR NOT (LON#69 = 0.0)), isnotnull( **org.apache.spark.sql.sedona_sql.express..., Format: CSV, Location: InMemoryFileIndex(1 paths)[s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_D..., PartitionFilters: [], PushedFilters: [Or(Not(EqualTo(LAT,0.0)),Not(EqualTo(LON,0.0)))], ReadSchema: struct<LAT:double,LON:double>
   :  +- Project [ **org.apache.spark.sql.sedona_sql.expressions.ST_Point**   AS geom#2315]
   :     +- Filter ((NOT (LAT#142 

In [13]:
# Aggregate Final Results
def aggregate_final_results(joined_spatial_df):
    final_df = joined_spatial_df.groupBy("COMM", "TOTAL_POPULATION", "AVERAGE_INCOME_PER_PERSON") \
        .agg(expr("count('*') as TOTAL_CRIMES")) \
        .withColumn(
            "CRIME_RATIO_PER_PERSON",
            spark_round(col("TOTAL_CRIMES") / col("TOTAL_POPULATION"), 2)
        ) \
        .select("COMM", "AVERAGE_INCOME_PER_PERSON", "CRIME_RATIO_PER_PERSON")
    
    return final_df

# Aggregate using the default spatial joined DataFrame
final_df = aggregate_final_results(joined_spatial_df_default)
print("\n=== Final DataFrame ===")
final_df.show(20, truncate=False)

# Save the final results to CSV
output_path = "s3://groups-bucket-dblab-905418150721/group21/crime_population_analysis/"
final_df.coalesce(1).write.mode("overwrite").csv(output_path, header=True)

print(f"\nFinal results saved to: {output_path}")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…


=== Final DataFrame ===
+-------------------+-------------------------+----------------------+
|COMM               |AVERAGE_INCOME_PER_PERSON|CRIME_RATIO_PER_PERSON|
+-------------------+-------------------------+----------------------+
|Thai Town          |26851.47                 |0.53                  |
|Harvard Heights    |11820.77                 |0.78                  |
|Valley Glen        |18271.77                 |0.55                  |
|Sycamore Square    |30116.82                 |1.13                  |
|Del Rey            |33675.36                 |0.56                  |
|Lakeview Terrace   |15991.26                 |0.62                  |
|Baldwin Hills      |17302.7                  |1.19                  |
|Winnetka           |19638.75                 |0.67                  |
|Toluca Woods       |24517.58                 |0.64                  |
|Harbor City        |21383.02                 |0.64                  |
|South Park         |6943.26                  |0.89 