### 1 Core , 2GB Memory


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, when, expr
from pyspark.sql.functions import regexp_replace
from pyspark.sql.types import StructField, StructType, IntegerType, FloatType, StringType
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType, ArrayType
from pyspark.sql.functions import col, year, to_timestamp , desc
from sedona.spark import *
from pyspark.sql.functions import count
from pyspark.sql.functions import broadcast

import time

# Δημιουργούμε SparkSession

spark = SparkSession.builder \
    .appName("VictimsAnalysis") \
    .config("spark.executor.instances", "2") \
    .config("spark.executor.cores", "1") \
    .config("spark.executor.memory", "2g") \
    .config("spark.sql.debug.maxToStringFields", 1000) \
    .getOrCreate()

sedona = SedonaContext.create(spark)
SedonaRegistrator.registerAll(spark)
spark.sparkContext.setLogLevel("DEBUG")

start_time = time.time()


CrimeDataFile = "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv"
CensusBlockFile = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
IncomeFile = "s3://initial-notebook-data-bucket-dblab-905418150721/LA_income_2015.csv"
RacesFile = "s3://initial-notebook-data-bucket-dblab-905418150721/RE_codes.csv"

crimeSchema = StructType([
    StructField("DR_NO", StringType(), True),
    StructField("Date Rptd", StringType(), True),
    StructField("DATE OCC", StringType(), True),
    StructField("TIME OCC", StringType(), True),
    StructField("AREA", StringType(), True),
    StructField("AREA NAME", StringType(), True),
    StructField("Rpt Dist No", StringType(), True),
    StructField("Part 1-2", StringType(), True),
    StructField("Crm Cd", StringType(), True),
    StructField("Crm Cd Desc", StringType(), True),
    StructField("Mocodes", StringType(), True),
    StructField("Vict Age", StringType(), True),
    StructField("Vict Sex", StringType(), True),
    StructField("Vict Descent", StringType(), True),
    StructField("Premis Cd", StringType(), True),
    StructField("Premis Desc", StringType(), True),
    StructField("Weapon Used Cd", StringType(), True),
    StructField("Weapon Desc", StringType(), True),
    StructField("Status", StringType(), True),
    StructField("Status Desc", StringType(), True),
    StructField("Crm Cd 1", StringType(), True),
    StructField("Crm Cd 2", StringType(), True),
    StructField("Crm Cd 3", StringType(), True),
    StructField("Crm Cd 4", StringType(), True),
    StructField("LOCATION", StringType(), True),
    StructField("Cross Street", StringType(), True),
    StructField("LAT", StringType(), True),
    StructField("LON", StringType(), True)
])



incomeSchema = StructType([
    StructField("Zip Code", StringType(), True),
    StructField("Community", StringType(), True),
    StructField("Estimated Median Income", StringType(), True)
])

raceSchema = StructType([
    StructField("Vict Descent", StringType(), True),
    StructField("Vict Descent Full", StringType(), True) 
])

#Φορτώνουμε τα DataSets

CrimeDataFrame = spark.read.csv(CrimeDataFile, header=True, schema=crimeSchema)
RaceDataFrame = spark.read.csv(RacesFile, header=True, schema=raceSchema)

#Κρατάμε μόνο τα εγκλήματα που έγιναν το 2015
CrimeDataFrame = CrimeDataFrame \
    .withColumn("parsed_date", to_timestamp(col("DATE OCC"), "MM/dd/yyyy hh:mm:ss a")) \
    .filter(year(col("parsed_date")) == 2015) \
    .drop("parsed_date")  

IncomeDataFrame = spark.read.csv(IncomeFile, header=True, schema = incomeSchema)



#Για το blocks χρησιμοποιούμε το Sedona και το κάνουμε flatten

blocks_df = sedona.read.format("geojson") \
            .option("multiLine", "true").load(CensusBlockFile) \
            .selectExpr("explode(features) as features") \
            .select("features.*")

BlocksDataFrame = blocks_df.select( \
                [col(f"properties.{col_name}").alias(col_name) for col_name in \
                blocks_df.schema["properties"].dataType.fieldNames()] + ["geometry"]) \
            .drop("properties") \
            .drop("type")

#Κρατάμε μόνο τα blocks της πόλης του Los Angeles
BlocksDataFrame = BlocksDataFrame.filter(col("CITY") == "Los Angeles")


#Φτιάχνουμε Point Geometry για κάθε έγκλημα

CrimeDataFrame = CrimeDataFrame.filter(col("LAT").isNotNull() & col("LON").isNotNull())
CrimeDataFrame = CrimeDataFrame.withColumn(
    "crime_geometry",
    expr("ST_Point(CAST(LON AS DOUBLE), CAST(LAT AS DOUBLE))")
).withColumn("crime_geometry", col("crime_geometry").cast(GeometryType()))



#Μετατρέπουμε το geometry του κάθε block σε κατάλληλο format για το sedona

BlocksDataFrame = BlocksDataFrame.withColumn(
    "geometry",
    col("geometry").cast(GeometryType())
)


# Repartition crimes and blocks for better parallelism
#CrimeDataFrame = CrimeDataFrame.repartition(200, "LON", "LAT")  # Example partitioning by longitude and latitude
#BlocksDataFrame = BlocksDataFrame.repartition(50, "geometry")     # Partitioning by geometry if available

#Αρχίζουμε γκρουπάροντας τα blocks που έχουν κοινό zip code ώστε να βγάλουμε συνολικό housing και population για κάθε zip code


aggregatedBlocksDataFrame = (
     BlocksDataFrame
    .groupBy("ZCTA10","COMM")
    .agg(
        sum(col("HOUSING10")).alias("total_housing"),
        sum(col("POP_2010")).alias("total_population"),
    )
)



#Για το Income φτιάχνουμε τη μορφή του ώστε να μπορεί να γίνει double για πράξεις και στα blocks δίνουμε στον ταχυδρομικό
#κώδικα το όνομα "Zip Code" ώστε να είναι ίδιο με του LA_Income

aggregatedBlocksDataFrame = aggregatedBlocksDataFrame.withColumnRenamed("ZCTA10", "Zip Code")

IncomeDataFrame = IncomeDataFrame.withColumn(
    "Estimated Median Income",
    regexp_replace(col("Estimated Median Income"), "[$,]", "")
)

#Κάνουμε Join το blocks dataset με το income dataset πάνω στο κοινό Zip Code ώστε να πάρουμε μέσο εισόδημα ανά σπίτι για κάθε zip code

CombinedDataFrame = (
    aggregatedBlocksDataFrame
    .join(IncomeDataFrame, on="Zip Code", how="inner")
    .withColumn("Estimated Median Income", col("Estimated Median Income").cast("double"))
    .withColumn("total_housing", col("total_housing").cast("double"))
    .withColumn("total_population", col("total_population").cast("double"))
)

# Για κάθε Zip Code πολλαπλασιάζουμε σπίτια επί μέσο εισόδημα ανά σπίτι για να βρούμε το συνολικό εισόδημα 

CombinedDataFrame = CombinedDataFrame.withColumn(
    "income_per_zip",
    col("total_housing") * col("Estimated Median Income")  
)

#Επειδή κάθε Comm έχει πολλά Zip Codes , βρίσκουμε για κάθε Comm το άθροισμα
#του εισοδήματος, του πληθυσμού  των Zip Codes του


CommAggregatesDF = (
    CombinedDataFrame
    .groupBy("COMM")
    .agg(
        sum("income_per_zip").alias("total_income_in_comm"),
        sum("total_population").alias("total_population_in_comm"),
    )
)

#Για κάθε Comm κάνουμε τις διαιρέσεις με το πληθυσμό για να βρούμε
#εισόδημα ανά άτομο 

CommFinalDF = CommAggregatesDF.withColumn(
    "income_per_person",
    col("total_income_in_comm") / col("total_population_in_comm")
)

#Κρατάμε μόνο τα 3 Comm με το υψηλότερο εισόδημα και τα 3 με το χαμηλότερο

CommFinalDF = CommFinalDF.orderBy(col("income_per_person").asc())


last_3 = CommFinalDF.limit(3)

first_3 = CommFinalDF.orderBy(col("income_per_person").desc()).limit(3)

resultDF = last_3.union(first_3)


resultDF = resultDF.orderBy(col("income_per_person").asc())





#Κρατάμε μόνο τα blocks που ανήκουν σε ένα από τα 6 Comms που μας ενδιαφέρουν

communities_of_interest_df = resultDF.select("COMM").distinct()

filteredBlocks = BlocksDataFrame.join(
    broadcast(communities_of_interest_df),
    on="COMM",
    how="inner"
)

#Spatial Join μεταξύ crime και filtered blocks

filteredBlocks = filteredBlocks.withColumn("geometry", col("geometry").cast(GeometryType()))
CrimeDataFrame = CrimeDataFrame.withColumn("crime_geometry", col("crime_geometry").cast(GeometryType()))


joinedDF = CrimeDataFrame.join(
    filteredBlocks,
    expr("ST_Contains(geometry, crime_geometry)"),
    "inner"
)



#Group τα blocks by Comm , για να βρούμε θύματα από κάθε φυλετική ομάδα ανά Comm 
descentCounts = joinedDF.groupBy("COMM", "Vict Descent").agg(count("*").alias("crime_count"))





#Find Sum for 3 Highest Income Comms together

first_3_communities_df = first_3.select("COMM").distinct()


filteredDescentCounts = descentCounts.join(
    broadcast(first_3_communities_df),
    on="COMM",
    how="inner"
)


aggregatedDescentCounts = filteredDescentCounts.groupBy("Vict Descent").agg(
    sum("crime_count").alias("total_crime_count")
    
)


final_firsts_DF = aggregatedDescentCounts.join(
    RaceDataFrame, 
    aggregatedDescentCounts["Vict Descent"] == RaceDataFrame["Vict Descent"], 
    "inner"
).select(
    RaceDataFrame["Vict Descent Full"], 
    aggregatedDescentCounts["total_crime_count"]
)

final_firsts_DF = final_firsts_DF.orderBy(desc("total_crime_count"))
final_firsts_DF.show(truncate=False)




#Find Sum for 3 Lowest Income Comms together

last_3_communities_df = last_3.select("COMM").distinct()


filteredDescentCounts = descentCounts.join(
    broadcast(last_3_communities_df),
    on="COMM",
    how="inner"
)

aggregatedDescentCounts = filteredDescentCounts.groupBy("Vict Descent").agg(
    sum("crime_count").alias("total_crime_count")
)

final_lasts_DF = aggregatedDescentCounts.join(
    RaceDataFrame, 
    aggregatedDescentCounts["Vict Descent"] == RaceDataFrame["Vict Descent"], 
    "inner"
).select(
    RaceDataFrame["Vict Descent Full"], 
    aggregatedDescentCounts["total_crime_count"]
)

final_lasts_DF = final_lasts_DF.orderBy(desc("total_crime_count"))
final_lasts_DF.show(truncate=False)
        

end_time = time.time()
elapsed_time = end_time - start_time
print(f"\n\nTime taken: {elapsed_time:.2f} seconds")



