### Query 1

In [27]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, count
import time
import csv

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
#Both Dataframe and RDD implementations are to use 4 spark executors

spark = SparkSession.builder \
    .appName("Query1 DataFrame API") \
    .config("spark.executor.instances", "4") \
    .getOrCreate()

#### Dataframe Implementation

In [23]:
start_time = time.time()

# Load Crime Data
crime_data = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv", header=True, inferSchema=True)

# Filter for "AGGRAVATED ASSAULT"
assault_data = crime_data.filter(col("Crm Cd Desc").contains("AGGRAVATED ASSAULT"))

# Age Groups
categorized = assault_data.withColumn(
    "AgeGroup",
    when(col("Vict Age") < 18, "Children")
    .when((col("Vict Age") >= 18) & (col("Vict Age") <= 24), "Young Adults")
    .when((col("Vict Age") >= 25) & (col("Vict Age") <= 64), "Adults")
    .when(col("Vict Age") > 64, "Seniors")
)

# Group and count
result_df = categorized.groupBy("AgeGroup").agg(count("*").alias("Count")).orderBy(col("Count").desc())

# Show results
result_df.show()

# Stop timer and print elapsed time
elapsed_time = time.time() - start_time
print(f"Execution Time (DataFrame API): {elapsed_time:.2f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+-----+
|    AgeGroup|Count|
+------------+-----+
|      Adults|72610|
|Young Adults|23472|
|    Children|10724|
|     Seniors| 3099|
+------------+-----+

Execution Time (DataFrame API): 19.31 seconds

#### RDD Implementation

In [33]:
start_time = time.time()

# Load dataset as RDD
crime_rdd = spark.sparkContext.textFile("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv")

# Extract header and filter it out
header = crime_rdd.first()
crime_rdd = crime_rdd.filter(lambda row: row != header)

# Parse CSV rows


def parse_csv(line):
    return list(csv.reader([line]))[0]


parsed_rdd = crime_rdd.map(parse_csv)

# Filter for "AGGRAVATED ASSAULT"
assault_rdd = parsed_rdd.filter(lambda row: "AGGRAVATED ASSAULT" in row[9])

# Age groups
age_group_rdd = assault_rdd.map(lambda row: (
    "Children" if int(row[11]) < 18 else
    "Young Adults" if 18 <= int(row[11]) <= 24 else
    "Adults" if 25 <= int(row[11]) <= 64 else
    "Seniors"
))

# Group and count
result_rdd = age_group_rdd.map(lambda group: (group, 1)).reduceByKey(lambda a, b: a + b).sortBy(lambda x: x[1], ascending=False)

# Show results
for group, count in result_rdd.collect():
    print(f"{group}: {count}")

# Stop timer and print elapsed time
elapsed_time = time.time() - start_time
print(f"Execution Time (RDD API): {elapsed_time:.2f} seconds")

# Stop spark session
spark.stop()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Adults: 72610
Young Adults: 23472
Children: 10724
Seniors: 3099
Execution Time (RDD API): 21.01 seconds

### Query 2

#### i) Data_Frame API

In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, sum, when, row_number, expr
from pyspark.sql.window import Window
import time

# Start timer
start_time = time.time()

# Start Spark session
spark = SparkSession.builder \
    .appName("Query2 DataFrame API") \
    .config("spark.executor.instances", "4") \
    .getOrCreate()

# Load datasets
crime_data_2010_2019 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv",
    header=True, inferSchema=True
)
crime_data_2020_present = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv",
    header=True, inferSchema=True
)

# Combine both datasets
crime_data = crime_data_2010_2019.union(crime_data_2020_present)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [25]:
# Aggregation logic
aggregated = crime_data.groupBy(
    expr("substring(`DATE OCC`, 7, 4)").alias("YEAR"),  # Extract year from DATE OCC
    col("AREA NAME")
).agg(
    count("*").alias("total_cases"),
    sum(when(~col("Status Desc").isin("UNK", "Invest Cont"), 1).otherwise(0)).alias("closed_cases")  # Non-"UNK"/"Invest Cont" are closed
).withColumn("closed_case_rate", col("closed_cases") / col("total_cases"))

# Define window specification for ranking within each year
window_spec = Window.partitionBy("YEAR").orderBy(col("closed_case_rate").desc())

# Assign rank and filter top 3 precincts per year
ranked = aggregated.withColumn("ranking", row_number().over(window_spec)) \
    .filter(col("ranking") <= 3) \
    .orderBy("YEAR", "ranking")

# Measure the end time for the DataFrame API operations
dataframe_api_end_time = time.time()

# Count rows for showing all results
row_count = ranked.count()

# Show all rows in the output
ranked.show(truncate=False, n=row_count)

# Print DataFrame API execution time
print(f"DataFrame API Execution Time: {dataframe_api_end_time - dataframe_api_start_time:.2f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-----------+-----------+------------+-------------------+-------+
|YEAR|AREA NAME  |total_cases|closed_cases|closed_case_rate   |ranking|
+----+-----------+-----------+------------+-------------------+-------+
|2010|Rampart    |8707       |2860        |0.32847134489491214|1      |
|2010|Olympic    |8764       |2762        |0.3151528982199909 |2      |
|2010|Harbor     |9598       |2818        |0.2936028339237341 |3      |
|2011|Olympic    |7988       |2799        |0.35040060090135206|1      |
|2011|Rampart    |8444       |2744        |0.324964471814306  |2      |
|2011|Harbor     |9841       |2806        |0.2851336246316431 |3      |
|2012|Olympic    |8543       |2930        |0.3429708533302119 |1      |
|2012|Rampart    |8626       |2800        |0.3246000463714352 |2      |
|2012|Harbor     |9441       |2786        |0.29509585848956676|3      |
|2013|Olympic    |8305       |2789        |0.3358217940999398 |1      |
|2013|Rampart    |8148       |2616        |0.32106038291605304|2

#### ii) SQL API

In [61]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
import time

# Start timer
sql_api_start_time = time.time()
# Start Spark session
spark = SparkSession.builder \
    .appName("Query2 SQL API") \
    .config("spark.executor.instances", "4") \
    .getOrCreate()

# Load datasets
crime_data_2010_2019 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv",
    header=True, inferSchema=True
)
crime_data_2020_present = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv",
    header=True, inferSchema=True
)

# Combine datasets
crime_data = crime_data_2010_2019.union(crime_data_2020_present)

# Register the DataFrame as a temporary SQL table
crime_data.createOrReplaceTempView("crime_data")

# SQL query
sql_query = """
    WITH Aggregated AS (
        SELECT
            SUBSTRING(`DATE OCC`, 7, 4) AS YEAR,
            `AREA NAME` AS Precinct,
            COUNT(*) AS total_cases,
            SUM(CASE WHEN `Status Desc` NOT IN ('UNK', 'Invest Cont') THEN 1 ELSE 0 END) AS closed_cases,
            SUM(CASE WHEN `Status Desc` NOT IN ('UNK', 'Invest Cont') THEN 1 ELSE 0 END) / COUNT(*) AS closed_case_rate
        FROM crime_data
        GROUP BY SUBSTRING(`DATE OCC`, 7, 4), `AREA NAME`
    ),
    Ranked AS (
        SELECT
            YEAR,
            Precinct,
            total_cases,
            closed_cases,
            closed_case_rate,
            ROW_NUMBER() OVER (PARTITION BY YEAR ORDER BY closed_case_rate DESC) AS ranking
        FROM Aggregated
    )
    SELECT
        YEAR,
        Precinct,
        total_cases,
        closed_cases,
        closed_case_rate,
        ranking
    FROM Ranked
    WHERE ranking <= 3
    ORDER BY YEAR, ranking
"""



# Execute the SQL query
result = spark.sql(sql_query)

# Count rows to ensure all results are shown
row_count = result.count()

# Show the results
result.show(truncate=False, n=row_count)

# End timer
sql_api_end_time = time.time()

# Print SQL API execution time
print(f"SQL API Execution Time: {sql_api_end_time - sql_api_start_time:.2f} seconds")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-----------+-----------+------------+-------------------+-------+
|YEAR|Precinct   |total_cases|closed_cases|closed_case_rate   |ranking|
+----+-----------+-----------+------------+-------------------+-------+
|2010|Rampart    |8707       |2860        |0.32847134489491214|1      |
|2010|Olympic    |8764       |2762        |0.3151528982199909 |2      |
|2010|Harbor     |9598       |2818        |0.2936028339237341 |3      |
|2011|Olympic    |7988       |2799        |0.35040060090135206|1      |
|2011|Rampart    |8444       |2744        |0.324964471814306  |2      |
|2011|Harbor     |9841       |2806        |0.2851336246316431 |3      |
|2012|Olympic    |8543       |2930        |0.3429708533302119 |1      |
|2012|Rampart    |8626       |2800        |0.3246000463714352 |2      |
|2012|Harbor     |9441       |2786        |0.29509585848956676|3      |
|2013|Olympic    |8305       |2789        |0.3358217940999398 |1      |
|2013|Rampart    |8148       |2616        |0.32106038291605304|2

csv to parquet transition

In [53]:
from pyspark.sql import SparkSession

# Start Spark session
spark = SparkSession.builder \
    .appName("Save Crime Data as Parquet") \
    .config("spark.executor.instances", "4") \
    .getOrCreate()

# Load datasets
crime_data_2010_2019 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv",
    header=True, inferSchema=True
)
crime_data_2020_present = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv",
    header=True, inferSchema=True
)

# Combine datasets
crime_data = crime_data_2010_2019.union(crime_data_2020_present)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [28]:
# Save as a single Parquet file to the specified S3 bucket
output_path = "s3://groups-bucket-dblab-905418150721/group28/query2/"
crime_data.repartition(1).write.mode("overwrite").parquet(output_path)

print(f"Data successfully saved to {output_path} in Parquet format.")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Data successfully saved to s3://groups-bucket-dblab-905418150721/group28/query2/ in Parquet format.

In [30]:
#parquet solution
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
import time

# Start timer
sql_api_start_time = time.time()

# Start Spark session
spark = SparkSession.builder \
    .appName("Query2 SQL API") \
    .config("spark.executor.instances", "4") \
    .getOrCreate()

# Crime Dataset in Parquet format
crime_data = spark.read.parquet(
    "s3://groups-bucket-dblab-905418150721/group28/query2/part-00000-fea3c04b-7961-41ea-8e05-d62534cf766e-c000.snappy.parquet"
)

# Register the DataFrame as a temporary SQL table
crime_data.createOrReplaceTempView("crime_data")

# SQL query
sql_query = """
    WITH Aggregated AS (
        SELECT
            SUBSTRING(`DATE OCC`, 7, 4) AS YEAR,
            `AREA NAME` AS Precinct,
            COUNT(*) AS total_cases,
            SUM(CASE WHEN `Status Desc` NOT IN ('UNK', 'Invest Cont') THEN 1 ELSE 0 END) AS closed_cases,
            SUM(CASE WHEN `Status Desc` NOT IN ('UNK', 'Invest Cont') THEN 1 ELSE 0 END) / COUNT(*) AS closed_case_rate
        FROM crime_data
        GROUP BY SUBSTRING(`DATE OCC`, 7, 4), `AREA NAME`
    ),
    Ranked AS (
        SELECT
            YEAR,
            Precinct,
            total_cases,
            closed_cases,
            closed_case_rate,
            ROW_NUMBER() OVER (PARTITION BY YEAR ORDER BY closed_case_rate DESC) AS ranking
        FROM Aggregated
    )
    SELECT
        YEAR,
        Precinct,
        total_cases,
        closed_cases,
        closed_case_rate,
        ranking
    FROM Ranked
    WHERE ranking <= 3
    ORDER BY YEAR, ranking
"""

# Execute the SQL query
result = spark.sql(sql_query)

# Count rows to ensure all results are shown
row_count = result.count()

# Show the results
result.show(truncate=False, n=row_count)

# End timer
sql_api_end_time = time.time()

# Print SQL API execution time
print(f"SQL API Execution Time: {sql_api_end_time - sql_api_start_time:.2f} seconds")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-----------+-----------+------------+-------------------+-------+
|YEAR|Precinct   |total_cases|closed_cases|closed_case_rate   |ranking|
+----+-----------+-----------+------------+-------------------+-------+
|2010|Rampart    |8707       |2860        |0.32847134489491214|1      |
|2010|Olympic    |8764       |2762        |0.3151528982199909 |2      |
|2010|Harbor     |9598       |2818        |0.2936028339237341 |3      |
|2011|Olympic    |7988       |2799        |0.35040060090135206|1      |
|2011|Rampart    |8444       |2744        |0.324964471814306  |2      |
|2011|Harbor     |9841       |2806        |0.2851336246316431 |3      |
|2012|Olympic    |8543       |2930        |0.3429708533302119 |1      |
|2012|Rampart    |8626       |2800        |0.3246000463714352 |2      |
|2012|Harbor     |9441       |2786        |0.29509585848956676|3      |
|2013|Olympic    |8305       |2789        |0.3358217940999398 |1      |
|2013|Rampart    |8148       |2616        |0.32106038291605304|2

### Query 3

In [1]:
from pyspark.sql import SparkSession
from sedona.register import SedonaRegistrator
from sedona.sql.types import GeometryType
from pyspark.sql.functions import col, regexp_replace, round, avg, expr, udf, count, sum as spark_sum
from pyspark.sql.types import StringType
from shapely.geometry import shape
from shapely import wkt
import time

# Initialize Spark session with Sedona
spark = SparkSession.builder \
    .appName("Query3_LA_Analysis") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.kryo.registrator", "org.apache.sedona.core.serde.SedonaKryoRegistrator") \
    .getOrCreate()

# Register Sedona
SedonaRegistrator.registerAll(spark)

start_time = time.time()

# Load and preprocess income dataset
income_df = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/LA_income_2015.csv", header=True, inferSchema=True)
income_df = income_df.withColumn(
    "Estimated Median Income",
    regexp_replace(col("Estimated Median Income"), "[$,]", "").cast("float")
)

# Load and preprocess census dataset
census_df = spark.read.json("s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson")
census_df = census_df.select(
    col("properties.ZCTA10").alias("Zip Code"),
    col("properties.POP_2010").alias("Population"),
    col("properties.HOUSING10").alias("Housing"),
    col("properties.COMM").alias("COMM"),
    col("geometry.coordinates").alias("Coordinates")
)

# Convert GeoJSON to WKT for Sedona


def geojson_to_wkt(coords):
    if coords is None:
        return None
    try:
        geom = shape({"type": "Polygon", "coordinates": coords})
        return wkt.dumps(geom)
    except Exception as e:
        print(f"Error processing coordinates: {coords}, Error: {e}")
        return None


geojson_to_wkt_udf = udf(geojson_to_wkt, StringType())
census_df = census_df.withColumn("wkt_geometry", geojson_to_wkt_udf(census_df["Coordinates"]))
census_df = census_df.withColumn("geometry", expr("ST_GeomFromWKT(wkt_geometry)"))

# Load and preprocess crime dataset
crime_df = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv", header=True, inferSchema=True)
crime_df = crime_df.filter(col("DATE OCC").substr(7, 4) == "2010")
crime_df = crime_df.select(
    col("DR_NO"),
    col("LAT"),
    col("LON")
).withColumn("geometry", expr("ST_Point(LON, LAT)"))

# Perform spatial join using Sedona
census_df.createOrReplaceTempView("census")
crime_df.createOrReplaceTempView("crime")

crime_with_comm_df = spark.sql("""
    SELECT c.COMM, cr.DR_NO
    FROM crime cr
    JOIN census c
    ON ST_Contains(c.geometry, cr.geometry)
""")

# Precompute income by COMM to avoid recomputation
income_by_comm = income_df.select(
    col("Zip Code"),
    col("Estimated Median Income")
).join(
    census_df.select("Zip Code", "COMM"),
    on="Zip Code",
    how="inner"
).groupBy("COMM").agg(
    round(avg("Estimated Median Income"), 2).alias("Median Income")
)

# Calculate the final DataFrame with all required information
final_df = census_df.groupBy("COMM").agg(
    spark_sum("Population").alias("Total Population"),
    spark_sum("Housing").alias("Total Housing")
).join(
    crime_with_comm_df.groupBy("COMM").agg(
        count("DR_NO").alias("Crime Count")
    ),
    on="COMM",
    how="left"
).join(
    income_by_comm,
    on="COMM",
    how="inner"
).withColumn(
    "Crime Per Person",
    round(col("Crime Count") / col("Total Population"), 6)
).withColumn(
    "Income per Person",
    round((col("Median Income") * col("Total Housing")) / col("Total Population"), 2)
).select(
    col("COMM"),
    col("Total Population"),
    col("Crime Count"),
    col("Crime Per Person"),
    col("Median Income"),
    col("Income per Person")
).orderBy(col("Crime Per Person").desc())

final_df.explain(mode="extended")

# Save results for query 4
query3_df = final_df.select(
    col("COMM"),
    col("Income per Person")
)

query3_df.write.mode("ignore").option("header", "true").csv("s3://groups-bucket-dblab-905418150721/group28/query3/")

# Display final results
final_df_area = final_df.withColumnRenamed("COMM", "Area")
final_df_area.show()

end_time = time.time()

# Print execution time
print(f"Execution Time: {end_time - start_time:.2f} seconds")


Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
3840,application_1732639283265_3780,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Parsed Logical Plan ==
'Sort ['Crime Per Person DESC NULLS LAST], true
+- Project [COMM#72, Total Population#208L, Crime Count#217L, Crime Per Person#249, Median Income#197, Income per Person#256]
   +- Project [COMM#72, Total Population#208L, Total Housing#210L, Crime Count#217L, Median Income#197, Crime Per Person#249, round(((Median Income#197 * cast(Total Housing#210L as double)) / cast(Total Population#208L as double)), 2) AS Income per Person#256]
      +- Project [COMM#72, Total Population#208L, Total Housing#210L, Crime Count#217L, Median Income#197, round((cast(Crime Count#217L as double) / cast(Total Population#208L as double)), 6) AS Crime Per Person#249]
         +- Project [COMM#72, Total Population#208L, Total Housing#210L, Crime Count#217L, Median Income#197]
            +- Join Inner, (COMM#72 = COMM#241)
               :- Project [COMM#72, Total Population#208L, Total Housing#210L, Crime Count#217L]
               :  +- Join LeftOuter, (COMM#72 = COMM#227)
         

### Query 4

#### 1core/2GB

In [2]:
from pyspark.sql import SparkSession
from sedona.register import SedonaRegistrator
from sedona.sql.types import GeometryType
from pyspark.sql.functions import col, to_date, year, regexp_replace, count, expr, udf
from pyspark.sql.types import StringType
from shapely.geometry import shape
from shapely.geometry.polygon import Polygon
from shapely import wkt
import time

# Create Spark Session with Sedona enabled
spark = SparkSession.builder \
    .appName("Spatial Join - 1 Core, 2GB Memory") \
    .config("spark.executor.instances", "2") \
    .config("spark.executor.cores", 1) \
    .config("spark.executor.memory", "2g") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.kryo.registrator", "org.apache.sedona.core.serde.SedonaKryoRegistrator") \
    .getOrCreate()

# Register Sedona
SedonaRegistrator.registerAll(spark)

start_time = time.time()

# Load census.geojson
census_df = spark.read.json("s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson")

census_df = census_df.select(
    col("properties.ZCTA10").alias("Zip Code"),
    col("properties.POP_2010").alias("Population"),
    col("properties.HOUSING10").alias("Housing"),
    col("properties.COMM").alias("COMM"),
    col("geometry.coordinates").alias("Coordinates")
)

# Convert to wkt with udf


def geojson_to_wkt(coords):
    if coords is None:
        return None
    try:
        geom = shape({"type": "Polygon", "coordinates": coords})
        return wkt.dumps(geom)
    except Exception as e:
        print(f"Error processing coordinates: {coords}, Error: {e}")
        return None


geojson_to_wkt_udf = udf(geojson_to_wkt, StringType())

wkt_df = census_df.withColumn("wkt_geometry", geojson_to_wkt_udf(census_df["Coordinates"]))
wkt_df.select("wkt_geometry")
wkt_df = wkt_df.withColumn("geometry", expr("ST_GeomFromWKT(wkt_geometry)"))

wkt_census_df = wkt_df.select("COMM", "geometry")

# Load crime.csv
crime_df = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv", header=True, inferSchema=True)
crime_df = crime_df.filter(col("DATE OCC").substr(7, 4) == "2015")

# Select necessary columns and create point geometries
crime_df = crime_df.select(
    col("DR_NO"),
    col("Vict Descent"),
    col("LAT"),
    col("LON")
).withColumn("geometry", expr("ST_Point(LON, LAT)"))

# Load income per person from query3
income_per_person_df = spark.read.csv("s3://groups-bucket-dblab-905418150721/group28/query3/part-00000-5e1158bd-fb96-45fd-a2a9-7bf304269966-c000.csv", header=True, inferSchema=True)

# Load race.csv
race_df = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/RE_codes.csv", header=True, inferSchema=True)

# Perform Spatial Join
# Register spatial DataFrames as tables
wkt_census_df.createOrReplaceTempView("census")
crime_df.createOrReplaceTempView("crime")

# Perform spatial join using Sedona's ST_Contains
crime_with_zip_df = spark.sql("""
    SELECT c.COMM, cr.DR_NO, cr.`Vict Descent`
    FROM crime cr
    JOIN census c
    ON ST_Contains(c.geometry, cr.geometry)
""")

# Aggregate Crimes by Zip Code and Race
crime_with_race_df = crime_with_zip_df.join(race_df, "Vict Descent", "inner")
grouped_df = crime_with_race_df.groupBy("COMM", "Vict Descent Full").agg(count("*").alias("CrimeCount"))

# Filter Top and Bottom 3 Communities by Income
top_3_comm = income_per_person_df.orderBy(col("Income Per Person").desc()).select("COMM").limit(3)
bottom_3_comm = income_per_person_df.orderBy(col("Income Per Person").asc()).select("COMM").limit(3)

# Convert top and bottom communities to lists for filtering
top_3_comm_list = [row["COMM"] for row in top_3_comm.collect()]
bottom_3_comm_list = [row["COMM"] for row in bottom_3_comm.collect()]

# Filter crime data for Top and Bottom Communities
filtered_crime_top = crime_with_race_df.filter(col("COMM").isin(top_3_comm_list))
filtered_crime_bottom = crime_with_race_df.filter(col("COMM").isin(bottom_3_comm_list))


# Aggregate Crime Counts by Race for Each Group
top_crime_race_df = filtered_crime_top.groupBy("Vict Descent Full").agg(
    count("*").alias("VictCount")
).orderBy(col("VictCount").desc())

bottom_crime_race_df = filtered_crime_bottom.groupBy("Vict Descent Full").agg(
    count("*").alias("VictCount")
).orderBy(col("VictCount").desc())

# Display Results
print("Victim Counts by Race in Top 3 High-Income Communities:")
top_crime_race_df.show()

print("\nVictim Counts by Race in Bottom 3 Low-Income Communities:")
bottom_crime_race_df.show()

end_time = time.time()

runtime = end_time - start_time
print(f"Total execution time: {runtime:.2f} seconds")

# Stop Spark Session
spark.stop()


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Victim Counts by Race in Top 3 High-Income Communities:
+--------------------+---------+
|   Vict Descent Full|VictCount|
+--------------------+---------+
|               White|      544|
|               Other|       73|
|Hispanic/Latin/Me...|       60|
|             Unknown|       41|
|               Black|       37|
|         Other Asian|       15|
|             Chinese|        1|
|American Indian/A...|        1|
+--------------------+---------+


Victim Counts by Race in Bottom 3 Low-Income Communities:
+--------------------+---------+
|   Vict Descent Full|VictCount|
+--------------------+---------+
|Hispanic/Latin/Me...|     1494|
|               Black|      456|
|               Other|       53|
|               White|       29|
|         Other Asian|        4|
|             Unknown|        3|
|            Filipino|        1|
+--------------------+---------+

Total execution time: 40.14 seconds

#### 2cores/4GB

In [1]:
from pyspark.sql import SparkSession
from sedona.register import SedonaRegistrator
from sedona.sql.types import GeometryType
from pyspark.sql.functions import col, to_date, year, regexp_replace, count, expr, udf
from pyspark.sql.types import StringType
from shapely.geometry import shape
from shapely.geometry.polygon import Polygon
from shapely import wkt
import time

spark = SparkSession.builder \
    .appName("Spatial Join - 1 Core, 2GB Memory") \
    .config("spark.executor.instances", "2") \
    .config("spark.executor.cores", 2) \
    .config("spark.executor.memory", "4g") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.kryo.registrator", "org.apache.sedona.core.serde.SedonaKryoRegistrator") \
    .getOrCreate()

SedonaRegistrator.registerAll(spark)

start_time = time.time()

census_df = spark.read.json("s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson")

census_df = census_df.select(
    col("properties.ZCTA10").alias("Zip Code"),
    col("properties.POP_2010").alias("Population"),
    col("properties.HOUSING10").alias("Housing"),
    col("properties.COMM").alias("COMM"),
    col("geometry.coordinates").alias("Coordinates")
)


def geojson_to_wkt(coords):
    if coords is None:
        return None
    try:
        geom = shape({"type": "Polygon", "coordinates": coords})
        return wkt.dumps(geom)
    except Exception as e:
        print(f"Error processing coordinates: {coords}, Error: {e}")
        return None


geojson_to_wkt_udf = udf(geojson_to_wkt, StringType())

wkt_df = census_df.withColumn("wkt_geometry", geojson_to_wkt_udf(census_df["Coordinates"]))
wkt_df.select("wkt_geometry")
wkt_df = wkt_df.withColumn("geometry", expr("ST_GeomFromWKT(wkt_geometry)"))

wkt_census_df = wkt_df.select("COMM", "geometry")

crime_df = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv", header=True, inferSchema=True)
crime_df = crime_df.filter(col("DATE OCC").substr(7, 4) == "2015")

crime_df = crime_df.select(
    col("DR_NO"),
    col("Vict Descent"),
    col("LAT"),
    col("LON")
).withColumn("geometry", expr("ST_Point(LON, LAT)"))

income_per_person_df = spark.read.csv("s3://groups-bucket-dblab-905418150721/group28/query3/part-00000-5e1158bd-fb96-45fd-a2a9-7bf304269966-c000.csv", header=True, inferSchema=True)

race_df = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/RE_codes.csv", header=True, inferSchema=True)

wkt_census_df.createOrReplaceTempView("census")
crime_df.createOrReplaceTempView("crime")

crime_with_zip_df = spark.sql("""
    SELECT c.COMM, cr.DR_NO, cr.`Vict Descent`
    FROM crime cr
    JOIN census c
    ON ST_Contains(c.geometry, cr.geometry)
""")

crime_with_race_df = crime_with_zip_df.join(race_df, "Vict Descent", "inner")
grouped_df = crime_with_race_df.groupBy("COMM", "Vict Descent Full").agg(count("*").alias("CrimeCount"))

top_3_comm = income_per_person_df.orderBy(col("Income Per Person").desc()).select("COMM").limit(3)
bottom_3_comm = income_per_person_df.orderBy(col("Income Per Person").asc()).select("COMM").limit(3)

top_3_comm_list = [row["COMM"] for row in top_3_comm.collect()]
bottom_3_comm_list = [row["COMM"] for row in bottom_3_comm.collect()]

filtered_crime_top = crime_with_race_df.filter(col("COMM").isin(top_3_comm_list))
filtered_crime_bottom = crime_with_race_df.filter(col("COMM").isin(bottom_3_comm_list))

top_crime_race_df = filtered_crime_top.groupBy("Vict Descent Full").agg(
    count("*").alias("VictCount")
).orderBy(col("VictCount").desc())

bottom_crime_race_df = filtered_crime_bottom.groupBy("Vict Descent Full").agg(
    count("*").alias("VictCount")
).orderBy(col("VictCount").desc())

print("Victim Counts by Race in Top 3 High-Income Communities:")
top_crime_race_df.show()

print("\nVictim Counts by Race in Bottom 3 Low-Income Communities:")
bottom_crime_race_df.show()

end_time = time.time()

runtime = end_time - start_time
print(f"Total execution time: {runtime:.2f} seconds")

spark.stop()


Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
3672,application_1732639283265_3618,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Victim Counts by Race in Top 3 High-Income Communities:
+--------------------+---------+
|   Vict Descent Full|VictCount|
+--------------------+---------+
|               White|      544|
|               Other|       73|
|Hispanic/Latin/Me...|       60|
|             Unknown|       41|
|               Black|       37|
|         Other Asian|       15|
|             Chinese|        1|
|American Indian/A...|        1|
+--------------------+---------+


Victim Counts by Race in Bottom 3 Low-Income Communities:
+--------------------+---------+
|   Vict Descent Full|VictCount|
+--------------------+---------+
|Hispanic/Latin/Me...|     1494|
|               Black|      456|
|               Other|       53|
|               White|       29|
|         Other Asian|        4|
|             Unknown|        3|
|            Filipino|        1|
+--------------------+---------+

Total execution time: 34.99 seconds

#### 4cores/8GB

In [1]:
from pyspark.sql import SparkSession
from sedona.register import SedonaRegistrator
from sedona.sql.types import GeometryType
from pyspark.sql.functions import col, to_date, year, regexp_replace, count, expr, udf
from pyspark.sql.types import StringType
from shapely.geometry import shape
from shapely.geometry.polygon import Polygon
from shapely import wkt
import time

spark = SparkSession.builder \
    .appName("Spatial Join - 1 Core, 2GB Memory") \
    .config("spark.executor.instances", "2") \
    .config("spark.executor.cores", 4) \
    .config("spark.executor.memory", "8g") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.kryo.registrator", "org.apache.sedona.core.serde.SedonaKryoRegistrator") \
    .getOrCreate()

SedonaRegistrator.registerAll(spark)

start_time = time.time()

census_df = spark.read.json("s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson")

census_df = census_df.select(
    col("properties.ZCTA10").alias("Zip Code"),
    col("properties.POP_2010").alias("Population"),
    col("properties.HOUSING10").alias("Housing"),
    col("properties.COMM").alias("COMM"),
    col("geometry.coordinates").alias("Coordinates")
)

def geojson_to_wkt(coords):
    if coords is None:
        return None
    try:
        geom = shape({"type": "Polygon", "coordinates": coords})
        return wkt.dumps(geom)
    except Exception as e:
        print(f"Error processing coordinates: {coords}, Error: {e}")
        return None


geojson_to_wkt_udf = udf(geojson_to_wkt, StringType())

wkt_df = census_df.withColumn("wkt_geometry", geojson_to_wkt_udf(census_df["Coordinates"]))
wkt_df.select("wkt_geometry")
wkt_df = wkt_df.withColumn("geometry", expr("ST_GeomFromWKT(wkt_geometry)"))

wkt_census_df = wkt_df.select("COMM", "geometry")

crime_df = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv", header=True, inferSchema=True)
crime_df = crime_df.filter(col("DATE OCC").substr(7, 4) == "2015")

crime_df = crime_df.select(
    col("DR_NO"),
    col("Vict Descent"),
    col("LAT"),
    col("LON")
).withColumn("geometry", expr("ST_Point(LON, LAT)"))

income_per_person_df = spark.read.csv("s3://groups-bucket-dblab-905418150721/group28/query3/part-00000-5e1158bd-fb96-45fd-a2a9-7bf304269966-c000.csv", header=True, inferSchema=True)

race_df = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/RE_codes.csv", header=True, inferSchema=True)

wkt_census_df.createOrReplaceTempView("census")
crime_df.createOrReplaceTempView("crime")

crime_with_zip_df = spark.sql("""
    SELECT c.COMM, cr.DR_NO, cr.`Vict Descent`
    FROM crime cr
    JOIN census c
    ON ST_Contains(c.geometry, cr.geometry)
""")

crime_with_race_df = crime_with_zip_df.join(race_df, "Vict Descent", "inner")
grouped_df = crime_with_race_df.groupBy("COMM", "Vict Descent Full").agg(count("*").alias("CrimeCount"))

top_3_comm = income_per_person_df.orderBy(col("Income Per Person").desc()).select("COMM").limit(3)
bottom_3_comm = income_per_person_df.orderBy(col("Income Per Person").asc()).select("COMM").limit(3)

top_3_comm_list = [row["COMM"] for row in top_3_comm.collect()]
bottom_3_comm_list = [row["COMM"] for row in bottom_3_comm.collect()]

filtered_crime_top = crime_with_race_df.filter(col("COMM").isin(top_3_comm_list))
filtered_crime_bottom = crime_with_race_df.filter(col("COMM").isin(bottom_3_comm_list))

top_crime_race_df = filtered_crime_top.groupBy("Vict Descent Full").agg(
    count("*").alias("VictCount")
).orderBy(col("VictCount").desc())

bottom_crime_race_df = filtered_crime_bottom.groupBy("Vict Descent Full").agg(
    count("*").alias("VictCount")
).orderBy(col("VictCount").desc())

print("Victim Counts by Race in Top 3 High-Income Communities:")
top_crime_race_df.show()

print("\nVictim Counts by Race in Bottom 3 Low-Income Communities:")
bottom_crime_race_df.show()

end_time = time.time()

runtime = end_time - start_time
print(f"Total execution time: {runtime:.2f} seconds")

spark.stop()


Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
3674,application_1732639283265_3620,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Victim Counts by Race in Top 3 High-Income Communities:
+--------------------+---------+
|   Vict Descent Full|VictCount|
+--------------------+---------+
|               White|      544|
|               Other|       73|
|Hispanic/Latin/Me...|       60|
|             Unknown|       41|
|               Black|       37|
|         Other Asian|       15|
|             Chinese|        1|
|American Indian/A...|        1|
+--------------------+---------+


Victim Counts by Race in Bottom 3 Low-Income Communities:
+--------------------+---------+
|   Vict Descent Full|VictCount|
+--------------------+---------+
|Hispanic/Latin/Me...|     1494|
|               Black|      456|
|               Other|       53|
|               White|       29|
|         Other Asian|        4|
|             Unknown|        3|
|            Filipino|        1|
+--------------------+---------+

Total execution time: 29.24 seconds

### Query 5



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, sum, when, row_number, expr
from pyspark.sql.window import Window
import time

# Start timer
start_time = time.time()

# Start Spark session
spark = SparkSession.builder \
    .appName("Query2 DataFrame API") \
    .config("spark.executor.instances", "4") \
    .getOrCreate()

# Load datasets
crime_data_2010_2019 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv",
    header=True, inferSchema=True
)
crime_data_2020_present = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv",
    header=True, inferSchema=True
)

# Combine both datasets
crime_data = crime_data_2010_2019.union(crime_data_2020_present)

In [3]:
# Load the CSV file
csv_file_path = "s3://initial-notebook-data-bucket-dblab-905418150721/LA_Police_Stations.csv"
df = spark.read.csv(csv_file_path, header=True, inferSchema=True)

# Inspect columns
print("Columns in the CSV file:")
print(df.columns)

# Inspect schema
print("Schema of the CSV file:")
df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Columns in the CSV file:
['X', 'Y', 'FID', 'DIVISION', 'LOCATION', 'PREC']
Schema of the CSV file:
root
 |-- X: double (nullable = true)
 |-- Y: double (nullable = true)
 |-- FID: integer (nullable = true)
 |-- DIVISION: string (nullable = true)
 |-- LOCATION: string (nullable = true)
 |-- PREC: integer (nullable = true)

#### 2 executors × 4 cores/8GB memory

In [24]:
import time
from sedona.register.geo_registrator import SedonaRegistrator
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, col, mean, count, min, first

# Start the timer
start_time = time.time()

# Initialize Spark Session with Sedona
spark = SparkSession.builder \
    .appName("GeospatialQuery") \
    .config("spark.jars.packages", "org.apache.sedona:sedona-sql-3.0_2.12:1.6.1") \
    .config("spark.executor.instances", "2") \
    .config("spark.executor.cores", "4") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "8g") \
    .getOrCreate()

SedonaRegistrator.registerAll(spark)

# Create Spatial DataFrames
# Police stations use X (longitude) and Y (latitude)
df = df.withColumn("station_geometry", expr("ST_Point(cast(X as Decimal(24, 20)), cast(Y as Decimal(24, 20)))"))

# Crimes use LAT (latitude) and LON (longitude)
crime_data = crime_data.withColumn("crime_geometry", expr("ST_Point(cast(LON as Decimal(24, 20)), cast(LAT as Decimal(24, 20)))"))

# Perform Spatial Join to calculate distances from all crimes to all police stations
df_broadcast = df.select("DIVISION", "station_geometry").cache()

# Calculate distances
distances = crime_data.crossJoin(df_broadcast) \
    .withColumn("distance", expr("ST_Distance(crime_geometry, station_geometry)"))

# Assign each crime to its closest division
closest_crimes = distances.withColumn("rank", expr("ROW_NUMBER() OVER (PARTITION BY DR_NO ORDER BY distance ASC)")) \
    .filter(col("rank") == 1) \
    .select("DR_NO", "distance", "DIVISION")

# Aggregate by division to calculate the number of crimes and average distance
result = closest_crimes.groupBy("DIVISION").agg(
    count("*").alias("number_of_crimes"),
    mean("distance").alias("average_distance")
).orderBy(col("number_of_crimes").desc())

# Show all rows of the result
result.show(result.count(), truncate=False)

# End the timer
end_time = time.time()

# Calculate and display the runtime
runtime = end_time - start_time
print(f"Total execution time: {runtime:.2f} seconds")




FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------------+----------------+--------------------+
|DIVISION        |number_of_crimes|average_distance    |
+----------------+----------------+--------------------+
|HOLLYWOOD       |213080          |0.02043779072548565 |
|VAN NUYS        |211457          |0.028653154590629136|
|WILSHIRE        |198150          |0.026312166557481587|
|SOUTHWEST       |186742          |0.021577001184243143|
|OLYMPIC         |180463          |0.01729162112331338 |
|NORTH HOLLYWOOD |171159          |0.026115214222567722|
|77TH STREET     |167323          |0.016584871496068188|
|PACIFIC         |157468          |0.037495777088312074|
|CENTRAL         |154474          |0.009868086849235298|
|SOUTHEAST       |151999          |0.024150127195506455|
|RAMPART         |149675          |0.014730484635455721|
|TOPANGA         |147167          |0.03243890335156792 |
|WEST VALLEY     |130933          |0.02897360719640746 |
|HARBOR          |130206          |3.2997622866934675  |
|FOOTHILL        |122515       

#### 4 executors × 2 cores/4GB memory

In [25]:
import time
from sedona.register.geo_registrator import SedonaRegistrator
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, col, mean, count, min, first

# Start the timer
start_time = time.time()

# Initialize Spark Session with specified resources
spark = SparkSession.builder \
    .appName("GeospatialQuery") \
    .config("spark.executor.instances", "4") \
    .config("spark.executor.cores", "2") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

SedonaRegistrator.registerAll(spark)

# Create Spatial DataFrames
# Police stations use X (longitude) and Y (latitude)
df = df.withColumn("station_geometry", expr("ST_Point(cast(X as Decimal(24, 20)), cast(Y as Decimal(24, 20)))"))

# Crimes use LAT (latitude) and LON (longitude)
crime_data = crime_data.withColumn("crime_geometry", expr("ST_Point(cast(LON as Decimal(24, 20)), cast(LAT as Decimal(24, 20)))"))

# Perform Spatial Join to calculate distances from all crimes to all police stations
df_broadcast = df.select("DIVISION", "station_geometry").cache()

# Calculate distances
distances = crime_data.crossJoin(df_broadcast) \
    .withColumn("distance", expr("ST_Distance(crime_geometry, station_geometry)"))

# Assign each crime to its closest division
closest_crimes = distances.withColumn("rank", expr("ROW_NUMBER() OVER (PARTITION BY DR_NO ORDER BY distance ASC)")) \
    .filter(col("rank") == 1) \
    .select("DR_NO", "distance", "DIVISION")

# Aggregate by division to calculate the number of crimes and average distance
result = closest_crimes.groupBy("DIVISION").agg(
    count("*").alias("number_of_crimes"),
    mean("distance").alias("average_distance")
).orderBy(col("number_of_crimes").desc())

# Show all rows of the result
result.show(result.count(), truncate=False)

# End the timer
end_time = time.time()

# Calculate and display the runtime
runtime = end_time - start_time
print(f"Total execution time: {runtime:.2f} seconds")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------------+----------------+--------------------+
|DIVISION        |number_of_crimes|average_distance    |
+----------------+----------------+--------------------+
|HOLLYWOOD       |213080          |0.020437790725485655|
|VAN NUYS        |211457          |0.02865315459062913 |
|WILSHIRE        |198150          |0.026312166557481583|
|SOUTHWEST       |186742          |0.021577001184243143|
|OLYMPIC         |180463          |0.01729162112331337 |
|NORTH HOLLYWOOD |171159          |0.026115214222567722|
|77TH STREET     |167323          |0.016584871496068194|
|PACIFIC         |157468          |0.03749577708831207 |
|CENTRAL         |154474          |0.0098680868492353  |
|SOUTHEAST       |151999          |0.024150127195506462|
|RAMPART         |149675          |0.014730484635455718|
|TOPANGA         |147167          |0.03243890335156792 |
|WEST VALLEY     |130933          |0.028973607196407465|
|HARBOR          |130206          |3.2997622866934675  |
|FOOTHILL        |122515       

#### 8 executors × 1 core/2 GB memory

In [26]:
import time
from sedona.register.geo_registrator import SedonaRegistrator
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, col, mean, count, min, first

# Start the timer
start_time = time.time()

# Initialize Spark Session with specified resources
spark = SparkSession.builder \
    .appName("GeospatialQuery") \
    .config("spark.executor.instances", "8") \
    .config("spark.executor.cores", "1") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

SedonaRegistrator.registerAll(spark)

# Create Spatial DataFrames
# Police stations use X (longitude) and Y (latitude)
df = df.withColumn("station_geometry", expr("ST_Point(cast(X as Decimal(24, 20)), cast(Y as Decimal(24, 20)))"))

# Crimes use LAT (latitude) and LON (longitude)
crime_data = crime_data.withColumn("crime_geometry", expr("ST_Point(cast(LON as Decimal(24, 20)), cast(LAT as Decimal(24, 20)))"))

# Perform Spatial Join to calculate distances from all crimes to all police stations
df_broadcast = df.select("DIVISION", "station_geometry").cache()

# Calculate distances
distances = crime_data.crossJoin(df_broadcast) \
    .withColumn("distance", expr("ST_Distance(crime_geometry, station_geometry)"))

# Assign each crime to its closest division
closest_crimes = distances.withColumn("rank", expr("ROW_NUMBER() OVER (PARTITION BY DR_NO ORDER BY distance ASC)")) \
    .filter(col("rank") == 1) \
    .select("DR_NO", "distance", "DIVISION")

# Aggregate by division to calculate the number of crimes and average distance
result = closest_crimes.groupBy("DIVISION").agg(
    count("*").alias("number_of_crimes"),
    mean("distance").alias("average_distance")
).orderBy(col("number_of_crimes").desc())

# Show all rows of the result
result.show(result.count(), truncate=False)

# End the timer
end_time = time.time()

# Calculate and display the runtime
runtime = end_time - start_time
print(f"Total execution time: {runtime:.2f} seconds")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------------+----------------+--------------------+
|DIVISION        |number_of_crimes|average_distance    |
+----------------+----------------+--------------------+
|HOLLYWOOD       |213080          |0.02043779072548566 |
|VAN NUYS        |211457          |0.028653154590629126|
|WILSHIRE        |198150          |0.026312166557481583|
|SOUTHWEST       |186742          |0.021577001184243143|
|OLYMPIC         |180463          |0.017291621123313373|
|NORTH HOLLYWOOD |171159          |0.02611521422256772 |
|77TH STREET     |167323          |0.016584871496068188|
|PACIFIC         |157468          |0.037495777088312074|
|CENTRAL         |154474          |0.009868086849235298|
|SOUTHEAST       |151999          |0.02415012719550645 |
|RAMPART         |149675          |0.014730484635455718|
|TOPANGA         |147167          |0.03243890335156791 |
|WEST VALLEY     |130933          |0.02897360719640747 |
|HARBOR          |130206          |3.299762286693468   |
|FOOTHILL        |122515       

In [52]:
income_df = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/LA_income_2015.csv", header=True, encoding="UTF-8")
print("Preview of the Income DataFrame:")
income_df.show(20, truncate=False)



FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Preview of the Income DataFrame:
+--------+-------------------------------------------------------------------------------------------------------+-----------------------+
|Zip Code|Community                                                                                              |Estimated Median Income|
+--------+-------------------------------------------------------------------------------------------------------+-----------------------+
|90001   |Los Angeles (South Los Angeles), Florence-Graham                                                       |$33,887                |
|90002   |Los Angeles (Southeast Los Angeles, Watts)                                                             |$30,413                |
|90003   |Los Angeles (South Los Angeles, Southeast Los Angeles)                                                 |$30,805                |
|90004   |Los Angeles (Hancock Park, Rampart Village, Virgil Village, Wilshire Center, Windsor Square)           |$40,612            

In [53]:
from pyspark.sql.functions import col, trim, regexp_replace, lpad

# Correctly load the CSV with header information
income_df = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/LA_income_2015.csv", header=True, encoding="UTF-8")

# Rename columns for consistency
income_df = income_df.withColumnRenamed("Zip Code", "ZIP").withColumnRenamed("Estimated Median Income", "Income")

# Clean ZIP and Income columns
income_df = income_df.withColumn("ZIP", lpad(trim(col("ZIP")), 5, "0")) \
    .withColumn("Income", regexp_replace(col("Income"), "[^0-9.]", "").cast("double"))

# Preview the cleaned DataFrame
print("Cleaned Income DataFrame:")
income_df.show(20, truncate=False)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Cleaned Income DataFrame:
+-----+-------------------------------------------------------------------------------------------------------+-------+
|ZIP  |Community                                                                                              |Income |
+-----+-------------------------------------------------------------------------------------------------------+-------+
|90001|Los Angeles (South Los Angeles), Florence-Graham                                                       |33887.0|
|90002|Los Angeles (Southeast Los Angeles, Watts)                                                             |30413.0|
|90003|Los Angeles (South Los Angeles, Southeast Los Angeles)                                                 |30805.0|
|90004|Los Angeles (Hancock Park, Rampart Village, Virgil Village, Wilshire Center, Windsor Square)           |40612.0|
|90005|Los Angeles (Hancock Park, Koreatown, Wilshire Center, Wilshire Park, Windsor Square)                  |31142.0|
|90006|Los Ang

In [55]:
from pyspark.sql.functions import col, lpad, trim, sum as _sum, when

# Ensure ZIP codes in GeoJSON are formatted correctly
geojson_flat_df = geojson_flat_df.withColumn("ZIP", lpad(trim(col("ZIP")), 5, "0"))

# Perform the join on ZIP codes
matched_zips_df = geojson_flat_df.join(income_df, on="ZIP", how="inner")

# Check the matched ZIP codes and incomes
print("Matched ZIP Codes and Incomes:")
matched_zips_df.select("ZIP", "Income", "Neighborhood").show(100, truncate=False)

# Aggregate income and population by neighborhood
neighborhood_stats_df = matched_zips_df.groupBy("Neighborhood").agg(
    _sum("Income").alias("TotalIncome"),
    _sum("Population").alias("TotalPopulation")
)

# Calculate income per person
neighborhood_stats_df = neighborhood_stats_df.withColumn(
    "IncomePerPerson",
    when(col("TotalPopulation") > 0, col("TotalIncome") / col("TotalPopulation")).otherwise(None)
)

# Show final neighborhood stats
print("Neighborhood Stats:")
neighborhood_stats_df.show(truncate=False)

# Find the 3 richest neighborhoods
richest_neighborhoods_df = neighborhood_stats_df.orderBy(col("IncomePerPerson").desc()).limit(3)
print("3 Richest Neighborhoods:")
richest_neighborhoods_df.show(truncate=False)

# Find the 3 poorest neighborhoods
poorest_neighborhoods_df = neighborhood_stats_df.orderBy(col("IncomePerPerson").asc()).limit(3)
print("3 Poorest Neighborhoods:")
poorest_neighborhoods_df.show(truncate=False)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Matched ZIP Codes and Incomes:
+-----+--------+---------------------+
|ZIP  |Income  |Neighborhood         |
+-----+--------+---------------------+
|90732|84679.0 |San Pedro            |
|91789|93301.0 |Diamond Bar          |
|90275|118790.0|Rancho Palos Verdes  |
|90731|50879.0 |San Pedro            |
|90731|50879.0 |San Pedro            |
|90731|50879.0 |San Pedro            |
|90731|50879.0 |San Pedro            |
|90732|84679.0 |San Pedro            |
|90731|50879.0 |San Pedro            |
|90731|50879.0 |San Pedro            |
|90731|50879.0 |San Pedro            |
|90731|50879.0 |San Pedro            |
|90731|50879.0 |San Pedro            |
|90803|75197.0 |Long Beach           |
|90275|118790.0|Rancho Palos Verdes  |
|90731|50879.0 |San Pedro            |
|90802|42829.0 |Long Beach           |
|90803|75197.0 |Long Beach           |
|90803|75197.0 |Long Beach           |
|90802|42829.0 |Long Beach           |
|90803|75197.0 |Long Beach           |
|90731|50879.0 |Harbor City      

In [60]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("CrimeDataUnion").getOrCreate()

# File paths for the two datasets
crime_data_2010_to_2019_path = "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv"
crime_data_2020_to_present_path = "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv"

# Step 1: Load the two crime datasets
crime_data_2010_to_2019_df = spark.read.csv(crime_data_2010_to_2019_path, header=True, inferSchema=True)
crime_data_2020_to_present_df = spark.read.csv(crime_data_2020_to_present_path, header=True, inferSchema=True)

# Step 2: Union the two datasets
crimes_df = crime_data_2010_to_2019_df.union(crime_data_2020_to_present_df)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [62]:
from pyspark.sql.functions import col, broadcast
import geopandas as gpd
from shapely.geometry import Point

# Step 1: Load Census GeoJSON as a GeoPandas DataFrame
geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
geojson_gdf = gpd.read_file(geojson_path)

# Step 2: Create Point Geometry for Crimes Data
crimes_pdf = crimes_df.select("LAT", "LON", "DR_NO", "Crm Cd Desc", "Date Rptd").toPandas()
crimes_pdf["geometry"] = crimes_pdf.apply(lambda row: Point(float(row["LON"]), float(row["LAT"])), axis=1)
crimes_gdf = gpd.GeoDataFrame(crimes_pdf, geometry="geometry")

# Step 3: Perform Spatial Join
crimes_with_neighborhoods = gpd.sjoin(crimes_gdf, geojson_gdf, how="left", predicate="within")

# Step 4: Filter for Richest and Poorest Neighborhoods
richest_neighborhoods = ["Whittier Narrows", "Industry", "Franklin Canyon"]
poorest_neighborhoods = ["Bandini Islands", "Westlake", "Little Bangladesh"]

# Crimes in Richest Neighborhoods
richest_crimes = crimes_with_neighborhoods[crimes_with_neighborhoods["Neighborhood"].isin(richest_neighborhoods)]
print("Crimes in the 3 Richest Neighborhoods:")
print(richest_crimes[["DR_NO", "Neighborhood", "Crm Cd Desc", "Date Rptd"]])

# Crimes in Poorest Neighborhoods
poorest_crimes = crimes_with_neighborhoods[crimes_with_neighborhoods["Neighborhood"].isin(poorest_neighborhoods)]
print("Crimes in the 3 Poorest Neighborhoods:")
print(poorest_crimes[["DR_NO", "Neighborhood", "Crm Cd Desc", "Date Rptd"]])


An error was encountered:
Session 2741 unexpectedly reached final status 'dead'. See logs:
stdout: 

stderr: 
25/01/16 11:33:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/01/16 11:33:04 INFO DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at ip-192-168-1-36.eu-central-1.compute.internal/192.168.1.36:8032
25/01/16 11:33:05 INFO Configuration: resource-types.xml not found
25/01/16 11:33:05 INFO ResourceUtils: Unable to find 'resource-types.xml'.
25/01/16 11:33:05 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (12288 MB per container)
25/01/16 11:33:05 INFO Client: Will allocate AM container, with 1384 MB memory including 384 MB overhead
25/01/16 11:33:05 INFO Client: Setting up container launch context for our AM
25/01/16 11:33:05 INFO Client: Setting up the launch environment for our AM container
25/01/16 11:33:05 INFO Cl