# Imports

In [None]:
import functools
import time
from itertools import product

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, rank, count, sum as pyspark_sum, to_timestamp, regexp_replace, udf, broadcast, avg
from pyspark.sql.functions import expr
from pyspark.sql.types import StringType
from pyspark.sql.window import Window
from sedona.register import SedonaRegistrator
from sedona.spark import ST_Point, ST_Within, ST_Union_Aggr, ST_Distance
from sedona.spark import SedonaContext

# Preparation

In [None]:
def timed(func):
    """Decorate ``func`` so that every call prints its execution time.

    Args:
        func: The callable to wrap.

    Returns:
        A wrapper that forwards all positional and keyword arguments to
        ``func``, prints ``Execution time for <name>: <seconds> seconds``
        and returns whatever ``func`` returned. Nothing is printed when
        ``func`` raises.
    """
    # Preserve the wrapped function's name and docstring on the wrapper.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so the measured interval cannot be
        # skewed by system clock adjustments.
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start_time
        print(
            f"Execution time for {func.__name__}: {elapsed:.4f} seconds")
        return result
    return wrapper

In [None]:
def cleanup_crime_data(crime_data_df):
    """Normalise the raw crime DataFrame.

    Converts the "Date Rptd" and "DATE OCC" columns with ``to_timestamp``
    using the pattern ``MM/dd/yyyy hh:mm:ss a`` and renames the column
    "AREA " (with a trailing space) to "AREA".

    Args:
        crime_data_df: The crime DataFrame to clean up.

    Returns:
        The transformed DataFrame.
    """
    timestamp_pattern = "MM/dd/yyyy hh:mm:ss a"
    for date_column in ("Date Rptd", "DATE OCC"):
        crime_data_df = crime_data_df.withColumn(
            date_column, to_timestamp(date_column, timestamp_pattern))
    return crime_data_df.withColumnRenamed("AREA ", "AREA")

def load_crime_data_df(spark_session):
    """Read the crime CSV data from S3 and pass it through cleanup_crime_data.

    Args:
        spark_session: Session whose reader is used to load the CSV files.

    Returns:
        The cleaned crime DataFrame.
    """
    raw_crime_df = spark_session.read.csv(
        "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/",
        header=True,
        inferSchema=True,
    )
    return cleanup_crime_data(raw_crime_df)
    
    
def load_crime_data_from_single_parquet_file(spark_session):
    """Read the single-file Parquet copy of the crime data and clean it up.

    NOTE: a second definition with the same name appears later in this file
    and replaces this one once that cell has run.

    Args:
        spark_session: Session whose reader is used to load the Parquet file.

    Returns:
        The DataFrame returned by ``cleanup_crime_data``.
    """
    # Parquet files carry their own schema; the CSV-only options
    # ``header``/``inferSchema`` do not apply here and were dropped.
    parquet_df = spark_session \
        .read \
        .parquet("s3://groups-bucket-dblab-905418150721/group3/crime_data.parquet")
    return cleanup_crime_data(parquet_df)

    
def load_census_blocks_df(spark_session):
    """Load the 2010 census blocks GeoJSON into a flat DataFrame.

    Creates a Sedona context from ``spark_session``, reads the GeoJSON file,
    explodes its ``features`` array and promotes every field of each
    feature's ``properties`` struct to a top-level column next to
    ``geometry``.

    Args:
        spark_session: Session used to create the Sedona context.

    Returns:
        A DataFrame with one column per property field plus ``geometry``.
    """
    sedona = SedonaContext.create(spark_session)
    geojson_reader = sedona.read.format("geojson").option("multiline", "true")
    features_df = geojson_reader \
        .load("s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson") \
        .selectExpr("explode(features) as features") \
        .select("features.*")

    # Turn each nested properties.<name> field into a column called <name>.
    property_names = features_df.schema["properties"].dataType.fieldNames()
    selected_columns = [
        col(f"properties.{name}").alias(name) for name in property_names
    ]
    selected_columns.append("geometry")

    return features_df \
        .select(selected_columns) \
        .drop("properties") \
        .drop("type")

def load_income_df(spark_session):
    """Load the LA income CSV and make the median income column numeric.

    The "Estimated Median Income" column has every ``$`` and ``,`` character
    removed and is then cast to ``int``.

    Args:
        spark_session: Session whose reader is used to load the CSV file.

    Returns:
        The income DataFrame with the converted column.
    """
    income_column = "Estimated Median Income"
    raw_income_df = spark_session.read.csv(
        "s3://initial-notebook-data-bucket-dblab-905418150721/LA_income_2015.csv",
        header=True,
        inferSchema=True,
    )
    numeric_income = regexp_replace(col(income_column), "[$,]", "").cast("int")
    return raw_income_df.withColumn(income_column, numeric_income)

def load_re_codes_df(spark_session):
    """Read the RE_codes CSV from S3 with a header row and inferred schema.

    Args:
        spark_session: Session whose reader is used to load the CSV file.

    Returns:
        The loaded DataFrame.
    """
    csv_path = "s3://initial-notebook-data-bucket-dblab-905418150721/RE_codes.csv"
    return spark_session.read.csv(csv_path, header=True, inferSchema=True)

def load_police_stations_df(spark_session):
    """Read the LA police stations CSV from S3 with a header row and inferred schema.

    Args:
        spark_session: Session whose reader is used to load the CSV file.

    Returns:
        The loaded DataFrame.
    """
    csv_path = "s3://initial-notebook-data-bucket-dblab-905418150721/LA_Police_Stations.csv"
    return spark_session.read.csv(csv_path, header=True, inferSchema=True)

# Q1

In [None]:
@timed
def query1_dataframe(crime_data_df):
    """Show aggravated-assault counts per victim age group (DataFrame API).

    Keeps rows whose "Crm Cd Desc" contains "AGGRAVATED ASSAULT", buckets
    "Vict Age" into "<18", "18-24", "25-64" and ">64", counts the rows per
    bucket and prints the buckets in descending order of count.

    Args:
        crime_data_df: The crime DataFrame.
    """
    victim_age = col("Vict Age")
    age_group = (
        when(victim_age < 18, "<18")
        .when((victim_age >= 18) & (victim_age <= 24), "18-24")
        .when((victim_age >= 25) & (victim_age <= 64), "25-64")
        .when(victim_age > 64, ">64")
    )
    is_aggravated_assault = crime_data_df["Crm Cd Desc"].contains(
        "AGGRAVATED ASSAULT")

    counts_per_group = (
        crime_data_df
        .filter(is_aggravated_assault)
        .withColumn("Vict Age Group", age_group)
        .groupBy("Vict Age Group")
        .count()
        .orderBy("count", ascending=False)
    )
    counts_per_group.show()


@timed
def query1_rdd(crime_data_df):
    """Show aggravated-assault counts per victim age group (RDD API).

    Keeps rows whose "Crm Cd Desc" contains "AGGRAVATED ASSAULT", maps each
    row to its age group, sums the rows per group and prints the groups in
    descending order of count.

    Args:
        crime_data_df: The crime DataFrame.
    """
    def age_group(row):
        """Return the age-group label for a row, or None when the age is null."""
        raw_age = row["Vict Age"]
        if raw_age is None:
            # Mirrors query1_dataframe, whose when-chain has no otherwise()
            # and therefore yields a null group for a null age.
            return None
        age = int(raw_age)  # convert once instead of once per comparison
        if age < 18:
            return "<18"
        if age <= 24:
            return "18-24"
        if age <= 64:
            return "25-64"
        return ">64"

    def is_aggravated_assault(row):
        """Return True when the crime description mentions aggravated assault."""
        description = row["Crm Cd Desc"]
        # A null description cannot match; without this guard the ``in``
        # test raises TypeError.
        return description is not None and "AGGRAVATED ASSAULT" in description

    crime_data_df \
        .rdd \
        .filter(is_aggravated_assault) \
        .map(lambda row: (age_group(row), 1)) \
        .reduceByKey(lambda a, b: a + b) \
        .sortBy(lambda pair: -pair[1]) \
        .toDF(["Vict Age Group", "count"]) \
        .show()
        

# Q2

In [None]:
@timed
def query2_sql(spark_session, crime_data_df):
    """Show the per-year ranking of precincts by closed-case rate (SQL API).

    Registers ``crime_data_df`` as the temporary view ``crime_data``. For
    every ("AREA NAME", year of "DATE OCC") pair the closed-case rate is the
    number of rows whose "Status Desc" is neither 'Invest Cont' nor 'UNK'
    divided by the total number of rows. Precincts are ranked within each
    year by descending rate.

    Args:
        spark_session: Session used to run the SQL statement.
        crime_data_df: The crime DataFrame.
    """
    crime_data_df.createOrReplaceTempView("crime_data")
    # The final ORDER BY makes the printed rows deterministic and consistent
    # with query2_rdd, which orders its output by year and rank.
    query = """
        SELECT
            year,
            precinct,
            closed_case_rate,
            RANK() OVER (PARTITION BY year ORDER BY closed_case_rate DESC) as `#`
        FROM (
            SELECT
                YEAR(`DATE OCC`) as year,
                `AREA NAME` as precinct,
                COUNT(CASE WHEN `Status Desc` NOT IN ('Invest Cont', 'UNK') THEN 1 END) / COUNT(*) as closed_case_rate
            FROM crime_data
            GROUP BY `AREA NAME`, YEAR(`DATE OCC`)
        )
        ORDER BY year, `#`
    """
    result = spark_session.sql(query)
    result.show()


@timed
def query2_rdd(crime_data_df):
    """Show the per-year ranking of precincts by closed-case rate (RDD API).

    Each row contributes a (closed, total) pair keyed by
    ("AREA NAME", year of "DATE OCC"); a row counts as closed unless its
    "Status Desc" is "Invest Cont" or "UNK". The pairs are summed per key,
    turned into a rate, ranked within each year by descending rate and
    printed ordered by year and rank.

    Args:
        crime_data_df: The crime DataFrame; "DATE OCC" must already be a
            timestamp because its ``.year`` attribute is read.
    """
    open_statuses = ("Invest Cont", "UNK")

    def to_keyed_counts(row):
        """Map a row to ((precinct, year), (closed flag, 1))."""
        closed_flag = 0 if row["Status Desc"] in open_statuses else 1
        return (row["AREA NAME"], row["DATE OCC"].year), (closed_flag, 1)

    def add_counts(left, right):
        """Add two (closed, total) pairs component-wise."""
        return left[0] + right[0], left[1] + right[1]

    rates_rdd = crime_data_df.rdd \
        .map(to_keyed_counts) \
        .reduceByKey(add_counts) \
        .mapValues(lambda totals: totals[0] / totals[1]) \
        .map(lambda item: (item[0][1], item[0][0], item[1]))

    rank_within_year = Window \
        .partitionBy("year") \
        .orderBy(col("closed_case_rate").desc())

    rates_rdd \
        .toDF(["year", "precinct", "closed_case_rate"]) \
        .withColumn("#", rank().over(rank_within_year)) \
        .orderBy("year", "#") \
        .show()


def save_crime_data_to_single_parquet_file(crime_data_df):
    """Write the crime DataFrame to S3 as Parquet from a single partition.

    The DataFrame is repartitioned to one partition first and any existing
    output at the target path is overwritten.

    Args:
        crime_data_df: The crime DataFrame to persist.
    """
    target_path = "s3://groups-bucket-dblab-905418150721/group3/crime_data.parquet"
    single_partition_df = crime_data_df.repartition(1)
    single_partition_df.write.mode("overwrite").parquet(target_path)


def load_crime_data_from_single_parquet_file(spark_session):
    """Read the single-file Parquet copy of the crime data, without cleanup.

    Args:
        spark_session: Session whose reader is used to load the Parquet file.

    Returns:
        The DataFrame exactly as read from the Parquet file.
    """
    parquet_path = "s3://groups-bucket-dblab-905418150721/group3/crime_data.parquet"
    return spark_session.read.parquet(parquet_path)

# Q3

In [None]:
@timed
def query3(census_blocks_df, income_df, crime_data_df):
    """Show and return crimes per capita and income per capita per community.

    Steps:
      1. Keep census blocks whose CITY is "Los Angeles" and aggregate them
         per (ZCTA10, COMM): union of geometries, sum of HOUSING10 and sum
         of POP_2010.
      2. Join with ``income_df`` on ZCTA10 == "Zip Code", compute
         ``housing * Estimated Median Income`` per row and aggregate per
         COMM (geometry union, population sum, income sum).
      3. Spatially join crimes (point built from LON/LAT) that fall within
         a community geometry and count them per community.
      4. Divide crime count and income by the community population.

    Args:
        census_blocks_df: Census blocks with CITY, ZCTA10, COMM, HOUSING10,
            POP_2010 and geometry columns.
        income_df: Income data with "Zip Code" and "Estimated Median Income".
        crime_data_df: Crime data with LON and LAT columns.

    Returns:
        DataFrame with columns COMM, "#Crimes per capita" and
        "Income per capita", ordered by descending income per capita.
    """
    # Step 1: per (zip code, community) totals for the city of Los Angeles.
    la_blocks = census_blocks_df.filter(col("CITY") == "Los Angeles")
    zip_community_df = la_blocks.groupBy("ZCTA10", "COMM").agg(
        ST_Union_Aggr("geometry").alias("geometry"),
        pyspark_sum("HOUSING10").alias("housing"),
        pyspark_sum("POP_2010").alias("population"),
    )

    # Step 2: attach income per zip code and roll up to community level.
    same_zip_code = zip_community_df["ZCTA10"] == income_df["Zip Code"]
    zip_income_df = zip_community_df \
        .join(income_df, same_zip_code) \
        .withColumn("income", col("housing") * col("Estimated Median Income"))
    community_df = zip_income_df.groupBy("COMM").agg(
        ST_Union_Aggr("geometry").alias("comm_geometry"),
        pyspark_sum("population").alias("comm_population"),
        pyspark_sum("income").alias("comm_income"),
    )

    # Step 3: count the crimes located inside each community geometry.
    crimes_with_geom = crime_data_df.withColumn("geom", ST_Point("LON", "LAT"))
    crime_in_community = ST_Within(
        crimes_with_geom["geom"], community_df["comm_geometry"])
    crime_counts_df = community_df \
        .join(crimes_with_geom, crime_in_community) \
        .groupBy("COMM", "comm_population", "comm_income") \
        .agg(
            count(col("*")).alias("#crimes"),
        )

    # Step 4: per-capita figures, richest community first.
    result_df = crime_counts_df \
        .withColumn("#Crimes per capita", col("#crimes") / col("comm_population")) \
        .withColumn("Income per capita", col("comm_income") / col("comm_population")) \
        .select("COMM", "#Crimes per capita", "Income per capita") \
        .orderBy(col("Income per capita").desc())

    result_df.show()
    return result_df

@timed
def query3_with_strategies(
        census_blocks_df,
        income_df,
        crime_data_df,
        community_income_strategy,
        community_income_and_crime_strategy):
    """Run the query 3 pipeline with explicit join-strategy hints.

    Computes the same result as ``query3`` but attaches a join hint to the
    right-hand side of each of the two joins.

    Args:
        census_blocks_df: Census blocks with CITY, ZCTA10, COMM, HOUSING10,
            POP_2010 and geometry columns.
        income_df: Income data with "Zip Code" and "Estimated Median Income".
        crime_data_df: Crime data with LON and LAT columns.
        community_income_strategy: Hint name applied to ``income_df`` in the
            zip-code join.
        community_income_and_crime_strategy: Hint name applied to the crime
            DataFrame in the spatial join.

    Returns:
        DataFrame with columns COMM, "#Crimes per capita" and
        "Income per capita", ordered by descending income per capita.
    """
    la_census_comms = census_blocks_df \
        .filter(col("CITY") == "Los Angeles") \
        .groupBy("ZCTA10", "COMM") \
        .agg(
            ST_Union_Aggr("geometry").alias("geometry"),
            pyspark_sum("HOUSING10").alias("housing"),
            pyspark_sum("POP_2010").alias("population")
        )

    # The former trailing ``.hint("")`` carried an empty hint name, which is
    # not a recognised hint; it has been removed.
    community_income_df = la_census_comms \
        .join(income_df.hint(community_income_strategy), la_census_comms["ZCTA10"] == income_df["Zip Code"]) \
        .withColumn("income", col("housing") * col("Estimated Median Income")) \
        .groupBy("COMM") \
        .agg(
            ST_Union_Aggr("geometry").alias("comm_geometry"),
            pyspark_sum("population").alias("comm_population"),
            pyspark_sum("income").alias("comm_income")
        )

    # Build a point geometry for every crime from its coordinates.
    crime_data_df = crime_data_df.withColumn("geom", ST_Point("LON", "LAT"))

    community_income_and_crime_df = community_income_df \
        .join(crime_data_df.hint(community_income_and_crime_strategy), ST_Within(crime_data_df["geom"], community_income_df["comm_geometry"])) \
        .groupBy("COMM", "comm_population", "comm_income") \
        .agg(
            count(col("*")).alias("#crimes"),
        ) \
        .withColumn("#Crimes per capita", col("#crimes") / col("comm_population")) \
        .withColumn("Income per capita", col("comm_income") / col("comm_population")) \
        .select("COMM", "#Crimes per capita", "Income per capita") \
        .orderBy(col("Income per capita").desc())

    community_income_and_crime_df.show()
    return community_income_and_crime_df

# Q4

In [None]:
@timed
def query4(query3_result_df, crime_data_df, census_blocks_df, race_codes_df):
    """Show the victim-descent profile of the 3 richest and 3 poorest communities.

    Picks the three communities with the highest and the three with the
    lowest "Income per capita" from ``query3_result_df``, assigns each crime
    to the census block that contains it, and counts the crimes per full
    victim-descent description for both groups of communities.

    Args:
        query3_result_df: Result of query 3 (COMM, "Income per capita", ...).
        crime_data_df: Crime data with LON, LAT and "Vict Descent" columns.
        census_blocks_df: Census blocks with COMM and geometry columns.
        race_codes_df: Mapping with "Vict Descent" and "Vict Descent Full".

    Returns:
        Tuple ``(top_crimes_with_race, bottom_crimes_with_race)``: the crime
        rows of the top and bottom communities, each with an added
        "Vict Descent Full" column.
    """
    session = crime_data_df.sql_ctx.sparkSession

    income_per_capita = col("Income per capita")
    richest_df = query3_result_df.orderBy(income_per_capita.desc()).limit(3)
    poorest_df = query3_result_df.orderBy(income_per_capita.asc()).limit(3)

    top_comms = [row.COMM for row in richest_df.select("COMM").collect()]
    bottom_comms = [row.COMM for row in poorest_df.select("COMM").collect()]

    geocoded_crimes = crime_data_df.withColumn(
        "crime_geom", ST_Point("LON", "LAT"))

    # Keep only the columns needed from the census blocks and broadcast them.
    block_shapes = census_blocks_df.select("COMM", "geometry")
    crime_in_block = ST_Within(col("crime_geom"), col("geometry"))
    # Cached because the joined data is filtered twice below.
    crimes_with_comm = geocoded_crimes \
        .join(broadcast(block_shapes), crime_in_block, "left") \
        .cache()

    # Descent code -> full description, shared with the executors.
    descent_names = {
        row["Vict Descent"]: row["Vict Descent Full"]
        for row in race_codes_df.collect()
    }
    shared_descent_names = session.sparkContext.broadcast(descent_names)

    def lookup_descent(code):
        """Return the full description for a descent code, or None if unknown."""
        return shared_descent_names.value.get(code, None)

    lookup_descent_udf = udf(lookup_descent, StringType())

    def crimes_with_descent(communities):
        """Crimes in the given communities plus the full descent description."""
        return crimes_with_comm \
            .filter(col("COMM").isin(communities)) \
            .withColumn("Vict Descent Full", lookup_descent_udf(col("Vict Descent")))

    def descent_profile(crimes_df):
        """Crime count per full descent description, largest first."""
        return crimes_df \
            .groupBy("Vict Descent Full") \
            .count() \
            .orderBy(col("count").desc())

    # Crimes of the top and bottom communities with the descent description.
    top_crimes_with_race = crimes_with_descent(top_comms)
    bottom_crimes_with_race = crimes_with_descent(bottom_comms)

    # Group and count the crimes per descent profile.
    top_grouped = descent_profile(top_crimes_with_race)
    bottom_grouped = descent_profile(bottom_crimes_with_race)

    print("Race profile για τις Top 3 Κοινότητες (ανάλογα με Income per capita):")
    top_grouped.show()

    print("Race profile για τις Bottom 3 Κοινότητες (ανάλογα με Income per capita):")
    bottom_grouped.show()

    return top_crimes_with_race, bottom_crimes_with_race


In [None]:
# Benchmark cell: runs query 3 followed by query 4 once per resource
# configuration and prints the query 4 execution time for each.

# Initialize a shared SparkContext
conf = SparkConf().setAppName("SharedSparkContext").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf)

# Define configurations for Query 4:
configs = [
    {"app_name": "Q4_CONFIG_1", "executor_cores": 1, "executor_memory": "2G"},
    {"app_name": "Q4_CONFIG_2", "executor_cores": 2, "executor_memory": "4G"},
    {"app_name": "Q4_CONFIG_3", "executor_cores": 4, "executor_memory": "8G"}
]

for config in configs:
    print(f"Starting with SparkSession: {config['app_name']}")

    # NOTE(review): every iteration reuses the same SparkContext `sc`, and
    # getOrCreate() presumably returns an already-running session, so the
    # executor cores/memory set here likely do not take effect (the master is
    # also local[*]). Confirm; the context may need to be stopped and
    # recreated per configuration for the comparison to be meaningful.
    spark_session = (
        SparkSession(sc)
        .newSession()
        .builder
        .appName(config["app_name"])
        .config("spark.executor.cores", config["executor_cores"])
        .config("spark.executor.memory", config["executor_memory"])
        .getOrCreate()
    )

    # Reload all inputs through the session of this iteration.
    income_df = load_income_df(spark_session)
    census_blocks_df = load_census_blocks_df(spark_session)
    crime_data_df = load_crime_data_df(spark_session)    
    re_codes_df = load_re_codes_df(spark_session)

    # Query 4 consumes the per-community result of query 3.
    query3_result_df = query3(census_blocks_df, income_df, crime_data_df)

    # Manual timing of query 4; the @timed decorator on query4 prints its own
    # measurement as well.
    start_time = time.time()
    top_crimes_with_race, bottom_crimes_with_race = query4(
        query3_result_df, crime_data_df, census_blocks_df, re_codes_df
    )
    end_time = time.time()
    execution_time = end_time - start_time
    print(f"Query 4 execution time for {config['app_name']}: {execution_time:.2f} seconds")


# Q5

In [None]:
@timed
def query5(crime_df, police_stations_df):
    """Show and return, per police division, the crimes closest to it.

    Every crime with non-null LAT and LON is paired with every police
    station through a cross join; the pair(s) with the smallest
    ``ST_Distance`` per DR_NO are kept (rank 1). The kept pairs are then
    aggregated per DIVISION into a crime count and an average distance.

    Args:
        crime_df: Crime data with DR_NO, LAT and LON columns.
        police_stations_df: Police stations with X, Y and DIVISION columns.

    Returns:
        DataFrame with columns DIVISION, crime_count and average_distance,
        ordered by descending crime_count.
    """
    stations_df = police_stations_df.withColumn(
        "station_geom", ST_Point("X", "Y"))

    has_coordinates = (col("LAT").isNotNull()) & (col("LON").isNotNull())
    located_crimes_df = crime_df \
        .filter(has_coordinates) \
        .withColumn("crime_geom", ST_Point("LON", "LAT"))

    # Distance from every crime to every station.
    crime_to_station = ST_Distance(col("crime_geom"), col("station_geom"))
    all_pairs_df = located_crimes_df \
        .crossJoin(stations_df) \
        .withColumn("distance", crime_to_station)

    # Keep, for each crime, the station(s) ranked first by distance.
    nearest_first = Window.partitionBy("DR_NO").orderBy(col("distance"))
    nearest_pairs_df = all_pairs_df \
        .withColumn("rank", rank().over(nearest_first)) \
        .filter(col("rank") == 1)

    query5_result_df = nearest_pairs_df \
        .groupBy("DIVISION") \
        .agg(
            count("DR_NO").alias("crime_count"),
            avg("distance").alias("average_distance")
        ) \
        .orderBy(col("crime_count").desc())

    display_df = query5_result_df.select(
        col("DIVISION").alias("division"),
        col("average_distance"),
        col("crime_count").alias("#")
    )
    display_df.orderBy(col("#").desc()).show()

    return query5_result_df

In [None]:
@timed
def query5_KNN(spark_session, crime_df):
    """Show and return, per police division, the crimes closest to it (KNN join).

    Same output as ``query5`` but the nearest station of each crime is found
    with Sedona's ``ST_KNN`` join predicate (k = 1) instead of a cross join
    followed by ranking.

    Args:
        spark_session: Session used to register the Sedona SQL functions and
            to load the police stations CSV.
        crime_df: Crime data with DR_NO, LAT and LON columns.

    Returns:
        DataFrame with columns DIVISION, crime_count and average_distance,
        ordered by descending crime_count.
    """
    # Makes the Sedona SQL functions (including ST_KNN) available to expr().
    SedonaRegistrator.registerAll(spark_session)

    police_stations_df = spark_session.read.csv(
        "s3://initial-notebook-data-bucket-dblab-905418150721/LA_Police_Stations.csv",
        header=True, inferSchema=True
    )
    police_stations_df = police_stations_df.withColumn("station_geom", ST_Point("X", "Y"))

    crime_df = crime_df.filter((col("LAT").isNotNull()) & (col("LON").isNotNull()))
    crime_df = crime_df.withColumn("crime_geom", ST_Point("LON", "LAT"))

    # ST_KNN is a join predicate over two geometry columns (query side first,
    # object side second), not a function of two DataFrames. Arguments:
    # k = 1 neighbour, use_spheroid = false.
    # NOTE(review): requires a Sedona release that ships the KNN join — confirm
    # the cluster's Sedona version.
    knn_df = crime_df.join(
        police_stations_df,
        expr("ST_KNN(crime_geom, station_geom, 1, false)")
    )

    # Distance between each crime location and its nearest station.
    knn_df = knn_df.withColumn("distance", ST_Distance(col("crime_geom"), col("station_geom")))

    query5_result_df = knn_df.groupBy("DIVISION") \
        .agg(
            count("DR_NO").alias("crime_count"),
            avg("distance").alias("average_distance")
        ) \
        .orderBy(col("crime_count").desc())

    query5_result_df.select(
        col("DIVISION").alias("division"),
        col("average_distance"),
        col("crime_count").alias("#")
    ).orderBy(col("#").desc()).show()

    return query5_result_df

In [None]:
# Benchmark cell: runs query 5 once per resource configuration.

# Initialize a shared SparkContext
conf = SparkConf().setAppName("SharedSparkContext").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf)

# Workflow for each configuration
configs = [
    {"app_name": "Q5_CONFIG_1", "num_executors": 2, "executor_cores": 4, "executor_memory": "8G"},
    {"app_name": "Q5_CONFIG_2", "num_executors": 4, "executor_cores": 2, "executor_memory": "4G"},
    {"app_name": "Q5_CONFIG_3", "num_executors": 8, "executor_cores": 1, "executor_memory": "2G"},
]

for config in configs:
    # Create a new SparkSession using the shared SparkContext and apply resource configurations
    # NOTE(review): every iteration reuses the same SparkContext `sc`, and
    # getOrCreate() presumably returns an already-running session, so the
    # executor settings below likely do not take effect — confirm.
    spark_session = (
        SparkSession(sc)
        .newSession()
        .builder
        .appName(config["app_name"])
        .config("spark.executor.instances", config["num_executors"])
        .config("spark.executor.cores", config["executor_cores"])
        .config("spark.executor.memory", config["executor_memory"])
        .getOrCreate()
    )

    # Load the crime dataset
    crime_df = spark_session.read.csv(
        "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/",
        header=True,
        inferSchema=True
    )

    # query5 expects (crime_df, police_stations_df); the session was being
    # passed in place of the crime DataFrame, so load the stations and pass
    # both DataFrames in the declared order.
    police_stations_df = load_police_stations_df(spark_session)
    query5_result_df = query5(crime_df, police_stations_df)

# Main - Run everything

In [None]:
def main():
    """Run queries 1 to 5 end to end on a single Spark session.

    Loads the crime data, runs both variants of query 1 and query 2, writes
    the crime data to a single Parquet file and reruns the RDD variant of
    query 2 on the reloaded data, runs query 3 with the default plan and
    with every pair of join-strategy hints, then runs query 4 and query 5.
    """
    print("Initializing spark session")
    spark_session = SparkSession \
        .builder \
        .appName("LosAngelesCrime") \
        .getOrCreate()

    print("Loading crime data dataframe")
    crime_data_df = load_crime_data_df(spark_session)

    # Q1
    query1_dataframe(crime_data_df)
    query1_rdd(crime_data_df)

    # Q2
    print("Running query 2 RDD")
    query2_rdd(crime_data_df)
    print("Running query 2 SQL")
    query2_sql(spark_session, crime_data_df)
    print("Saving crime data to a single parquet file")
    save_crime_data_to_single_parquet_file(crime_data_df)
    print("Loading crime data from single parquet file")
    parquet_loaded_crime_data_df = load_crime_data_from_single_parquet_file(
        spark_session)
    parquet_loaded_crime_data_df = cleanup_crime_data(
        parquet_loaded_crime_data_df)
    print("Running query 2 RDD again with the dataframe being loaded from a single parquet file")
    query2_rdd(parquet_loaded_crime_data_df)

    # Q3
    print("Loading income dataframe")
    income_df = load_income_df(spark_session)
    print("Loading census blocks dataframe")
    census_blocks_df = load_census_blocks_df(spark_session)
    print("Running query 3 with default strategies")
    query3_result_df = query3(census_blocks_df, income_df, crime_data_df)
    print("Explaining query 3 result with default strategies")
    query3_result_df.explain()
    strategies = ("BROADCAST", "MERGE", "SHUFFLE_HASH", "SHUFFLE_REPLICATE_NL")
    # Try every combination of hints for the two joins of query 3.
    for (community_income_strategy, community_income_and_crime_strategy) in product(strategies, strategies):
        print(f"Running query 3 with community_income_strategy={community_income_strategy} and community_income_and_crime_strategy={community_income_and_crime_strategy}")
        # Each join gets its own strategy; previously the crime-join strategy
        # was passed for both, so the income-join hint was never varied.
        res = query3_with_strategies(
            census_blocks_df,
            income_df,
            crime_data_df,
            community_income_strategy,
            community_income_and_crime_strategy)
        res.explain()

    # Q4
    print("Running query 4")
    re_codes_df = load_re_codes_df(spark_session)
    # query4 takes the census blocks as its third argument.
    query4(query3_result_df, crime_data_df, census_blocks_df, re_codes_df)

    # Q5
    print("Running query 5")
    police_stations_df = load_police_stations_df(spark_session)
    query5_result_df = query5(crime_data_df, police_stations_df)
    query5_result_df.show()

# Entry point: run the full workflow when this module is executed as
# __main__ rather than imported.
if __name__ == "__main__":
    main()


In [None]:
# Unconditional call to main() from its own notebook cell.
# NOTE(review): the previous cell already calls main() under the __main__
# guard, so in a notebook the workflow presumably runs twice — confirm
# whether both calls are intended.
main()