In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number, when, sum as _sum
from pyspark.sql.window import Window
import time

# Build the Spark session used by the rest of the notebook
# (getOrCreate() hands back an already-running session if there is one)
spark = (
    SparkSession.builder
    .appName("Query2 DataFrame and SQL with Multiple Datasets")
    .getOrCreate()
)

# Load both crime-data CSV extracts from S3.
# header: the first line supplies the column names;
# inferSchema: column types are derived from the data instead of defaulting to string.
data_2010_2019 = spark.read.options(header=True, inferSchema=True).csv(
    "s3a://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv"
)
data_2020_present = spark.read.options(header=True, inferSchema=True).csv(
    "s3a://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv"
)

# Stack the 2020-present rows under the 2010-2019 rows.
# union() matches columns by position, not by name.
# NOTE(review): assumes both extracts list their columns in the same order — confirm.
data_combined = data_2010_2019.union(data_2020_present)

# Start timer for DataFrame API implementation
start_df = time.time()

# Step 1: Keep only the columns the query needs, under snake_case aliases
data_filtered = data_combined.select(
    col("DATE OCC").alias("date_occ"),
    col("AREA NAME").alias("area_name"),
    col("Status Desc").alias("status_desc")
)

# Step 2: Add per-row indicator columns
#   closed_cases: 1 when the status is neither "UNK" nor "Invest Cont", else 0.
#                 A NULL status makes the predicate NULL, so it falls to otherwise(0).
#   total_cases:  1 for every row whose status is not NULL, else 0.
data_filtered = data_filtered.withColumn(
    "closed_cases",
    when(~col("status_desc").isin("UNK", "Invest Cont"), 1).otherwise(0)
).withColumn(
    "total_cases",
    when(col("status_desc").isNotNull(), 1).otherwise(0)
)

# Step 3: Extract year from "DATE OCC" (4 characters starting at 1-based position 7)
# NOTE(review): assumes the column is a string shaped like MM/DD/YYYY ... — confirm.
data_filtered = data_filtered.withColumn("year", col("date_occ").substr(7, 4))

# Step 4: Group by year and area_name, calculate closed_case_rate as a percentage
grouped_data = data_filtered.groupBy("year", "area_name").agg(
    (_sum("closed_cases") / _sum("total_cases") * 100).alias("closed_case_rate")
)

# Step 5: Define window for ranking: within each year, highest rate first
window_spec = Window.partitionBy("year").orderBy(col("closed_case_rate").desc())

# Step 6: Add ranking column (row_number gives every row in a year a distinct position)
ranked_data = grouped_data.withColumn("rank", row_number().over(window_spec))

# Step 7: Filter for top 3 precincts per year
top_3_precincts = ranked_data.filter(col("rank") <= 3)

# Step 8: Order results by year and rank
final_result_df = top_3_precincts.orderBy("year", "rank")

# Everything above only builds a lazy query plan; Spark runs nothing until an
# action is called. Force the job to run BEFORE stopping the timer, otherwise
# the measurement covers plan construction only and not the actual query.
df_rows = final_result_df.collect()

# Stop timer for DataFrame API implementation
end_df = time.time()

# Show final DataFrame results. The row count comes from the rows already
# collected, so no separate count() job is needed to size the output.
# (show() evaluates the plan again, but outside the timed region.)
final_result_df.show(n=len(df_rows), truncate=False)

# Calculate execution time for DataFrame API
execution_time_df = end_df - start_df
print(f"DataFrame API Execution Time: {execution_time_df} seconds")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-----------+------------------+----+
|year|area_name  |closed_case_rate  |rank|
+----+-----------+------------------+----+
|2010|Rampart    |32.84713448949121 |1   |
|2010|Olympic    |31.515289821999087|2   |
|2010|Harbor     |29.36028339237341 |3   |
|2011|Olympic    |35.040060090135206|1   |
|2011|Rampart    |32.4964471814306  |2   |
|2011|Harbor     |28.51336246316431 |3   |
|2012|Olympic    |34.29708533302119 |1   |
|2012|Rampart    |32.46000463714352 |2   |
|2012|Harbor     |29.509585848956675|3   |
|2013|Olympic    |33.58217940999398 |1   |
|2013|Rampart    |32.1060382916053  |2   |
|2013|Harbor     |29.723638951488557|3   |
|2014|Van Nuys   |32.0215235281705  |1   |
|2014|West Valley|31.49754809505847 |2   |
|2014|Mission    |31.224939855653567|3   |
|2015|Van Nuys   |32.265140677157845|1   |
|2015|Mission    |30.463762673676303|2   |
|2015|Foothill   |30.353001803658852|3   |
|2016|Van Nuys   |32.194518462124094|1   |
|2016|West Valley|31.40146437042384 |2   |
|2016|Footh

In [14]:
# Register combined dataset as a temporary SQL view
data_combined.createOrReplaceTempView("crime_data")

# Start timer for SQL API implementation
start_sql = time.time()

# SQL Query to calculate closed_case_rate and rank top 3 precincts.
#   innermost SELECT: derive year from `DATE OCC` and alias the needed columns
#   middle SELECT:    per (year, area_name), percentage of rows whose status is
#                     neither 'UNK' nor 'Invest Cont' over rows with a non-NULL
#                     status, plus a ROW_NUMBER rank within each year (highest first)
#   outer SELECT:     keep ranks 1-3, ordered by year and rank
query = """
SELECT year, area_name, closed_case_rate, rank
FROM (
    SELECT
        year,
        area_name,
        (SUM(CASE WHEN status_desc NOT IN ('UNK', 'Invest Cont') THEN 1 ELSE 0 END) * 100.0) /
        SUM(CASE WHEN status_desc IS NOT NULL THEN 1 ELSE 0 END) AS closed_case_rate,
        ROW_NUMBER() OVER (PARTITION BY year ORDER BY
        (SUM(CASE WHEN status_desc NOT IN ('UNK', 'Invest Cont') THEN 1 ELSE 0 END) * 100.0) /
        SUM(CASE WHEN status_desc IS NOT NULL THEN 1 ELSE 0 END) DESC) AS rank
    FROM (
        SELECT
            SUBSTRING(`DATE OCC`, 7, 4) AS year,
            `AREA NAME` AS area_name,
            `Status Desc` AS status_desc
        FROM crime_data
    ) AS year_data
    GROUP BY year, area_name
) AS ranked_data
WHERE rank <= 3
ORDER BY year, rank
"""

# Build the result DataFrame. spark.sql() returns a lazy DataFrame; the query
# itself does not run until an action is called on it.
final_result_sql = spark.sql(query)

# Force the query to run BEFORE stopping the timer, otherwise the measurement
# covers only building the DataFrame and not executing the query.
sql_rows = final_result_sql.collect()

# Stop timer for SQL API implementation
end_sql = time.time()

# Show final SQL results. The row count comes from the rows already collected,
# so no separate count() job is needed to size the output.
# (show() evaluates the plan again, but outside the timed region.)
final_result_sql.show(n=len(sql_rows), truncate=False)

# Calculate execution time for SQL API
execution_time_sql = end_sql - start_sql
print(f"SQL API Execution Time: {execution_time_sql} seconds")



FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-----------+-----------------+----+
|year|area_name  |closed_case_rate |rank|
+----+-----------+-----------------+----+
|2010|Rampart    |32.84713448949121|1   |
|2010|Olympic    |31.51528982199909|2   |
|2010|Harbor     |29.36028339237341|3   |
|2011|Olympic    |35.04006009013520|1   |
|2011|Rampart    |32.49644718143060|2   |
|2011|Harbor     |28.51336246316431|3   |
|2012|Olympic    |34.29708533302119|1   |
|2012|Rampart    |32.46000463714352|2   |
|2012|Harbor     |29.50958584895668|3   |
|2013|Olympic    |33.58217940999398|1   |
|2013|Rampart    |32.10603829160530|2   |
|2013|Harbor     |29.72363895148855|3   |
|2014|Van Nuys   |32.02152352817050|1   |
|2014|West Valley|31.49754809505847|2   |
|2014|Mission    |31.22493985565357|3   |
|2015|Van Nuys   |32.26514067715784|1   |
|2015|Mission    |30.46376267367630|2   |
|2015|Foothill   |30.35300180365885|3   |
|2016|Van Nuys   |32.19451846212410|1   |
|2016|West Valley|31.40146437042384|2   |
|2016|Foothill   |29.9086472281316

In [8]:
# Persist the combined dataset to S3 in Parquet format.
# mode="overwrite" replaces whatever already exists at the target path.
data_combined.write.parquet(
    path="s3://groups-bucket-dblab-905418150721/group6/query3/",
    mode="overwrite",
)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
# Read the Parquet file
parquet_data = spark.read.parquet("s3://groups-bucket-dblab-905418150721/group6/query3/")


# Register the Parquet DataFrame as a SQL view
parquet_data.createOrReplaceTempView("crime_data_parquet")
# Start timer for SQL API implementation
start_sql = time.time()
# SQL query run against the Parquet-backed view.
#   innermost SELECT: derive year from `DATE OCC` and alias the needed columns
#   middle SELECT:    per (year, area_name), percentage of rows whose status is
#                     neither 'UNK' nor 'Invest Cont' over rows with a non-NULL
#                     status, plus a ROW_NUMBER rank within each year (highest first)
#   outer SELECT:     keep ranks 1-3, ordered by year and rank
query = """
SELECT year, area_name, closed_case_rate, rank
FROM (
    SELECT
        year,
        area_name,
        (SUM(CASE WHEN status_desc NOT IN ('UNK', 'Invest Cont') THEN 1 ELSE 0 END) * 100.0) /
        SUM(CASE WHEN status_desc IS NOT NULL THEN 1 ELSE 0 END) AS closed_case_rate,
        ROW_NUMBER() OVER (PARTITION BY year ORDER BY
        (SUM(CASE WHEN status_desc NOT IN ('UNK', 'Invest Cont') THEN 1 ELSE 0 END) * 100.0) /
        SUM(CASE WHEN status_desc IS NOT NULL THEN 1 ELSE 0 END) DESC) AS rank
    FROM (
        SELECT
            SUBSTRING(`DATE OCC`, 7, 4) AS year,
            `AREA NAME` AS area_name,
            `Status Desc` AS status_desc
        FROM crime_data_parquet
    ) AS year_data
    GROUP BY year, area_name
) AS ranked_data
WHERE rank <= 3
ORDER BY year, rank
"""

# Build the result DataFrame. spark.sql() returns a lazy DataFrame; the query
# itself does not run until an action is called on it.
final_result_sql = spark.sql(query)

# Force the query to run BEFORE stopping the timer, otherwise the measurement
# covers only building the DataFrame and not executing the query.
parquet_rows = final_result_sql.collect()

# Stop timer for SQL API implementation
end_sql = time.time()

# Display all results. The row count comes from the rows already collected,
# so no separate count() job is needed to size the output.
# (show() evaluates the plan again, but outside the timed region.)
final_result_sql.show(n=len(parquet_rows), truncate=False)
# Calculate execution time for SQL API
execution_time_sql = end_sql - start_sql
print(f"SQL API Execution Time: {execution_time_sql} seconds")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-----------+-----------------+----+
|year|area_name  |closed_case_rate |rank|
+----+-----------+-----------------+----+
|2010|Rampart    |32.84713448949121|1   |
|2010|Olympic    |31.51528982199909|2   |
|2010|Harbor     |29.36028339237341|3   |
|2011|Olympic    |35.04006009013520|1   |
|2011|Rampart    |32.49644718143060|2   |
|2011|Harbor     |28.51336246316431|3   |
|2012|Olympic    |34.29708533302119|1   |
|2012|Rampart    |32.46000463714352|2   |
|2012|Harbor     |29.50958584895668|3   |
|2013|Olympic    |33.58217940999398|1   |
|2013|Rampart    |32.10603829160530|2   |
|2013|Harbor     |29.72363895148855|3   |
|2014|Van Nuys   |32.02152352817050|1   |
|2014|West Valley|31.49754809505847|2   |
|2014|Mission    |31.22493985565357|3   |
|2015|Van Nuys   |32.26514067715784|1   |
|2015|Mission    |30.46376267367630|2   |
|2015|Foothill   |30.35300180365885|3   |
|2016|Van Nuys   |32.19451846212410|1   |
|2016|West Valley|31.40146437042384|2   |
|2016|Foothill   |29.9086472281316