In [2]:
#QUERY 1 WITH DF
from pyspark.sql.functions import col, when, count, lower
from pyspark.sql import SparkSession

# Δημιουργία SparkSession
spark = SparkSession.builder \
    .appName("Query 1") \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.cores", "2") \
    .config("spark.executor.instances", "4") \
    .getOrCreate()

# Φόρτωση δεδομένων από τα δύο αρχεία CSV
crime_data_path_2010_2019 = "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv"
crime_data_path_2020_present = "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv"

crime_df_2010_2019 = spark.read.csv(crime_data_path_2010_2019, header=True, inferSchema=True)
crime_df_2020_present = spark.read.csv(crime_data_path_2020_present, header=True, inferSchema=True)

# Ένωση των δύο DataFrames
combined_crime_df = crime_df_2010_2019.union(crime_df_2020_present)

# Φιλτράρισμα εγγραφών που αναφέρονται στο Null Island
filtered_crime_df = combined_crime_df.filter((col("LAT") != 0) & (col("LON") != 0))

# Φιλτράρισμα για περιστατικά που περιέχουν τον όρο "aggravated assault"
filtered_df = combined_crime_df.filter(lower(col("Crm Cd Desc")).like("%aggravated assault%"))

# Προσθήκη στήλης για την κατηγοριοποίηση των θυμάτων σε ηλικιακές ομάδες
age_grouped_df = filtered_df.withColumn(
    "Age Group",
    when(col("Vict Age") < 18, "Children")
    .when((col("Vict Age") >= 18) & (col("Vict Age") <= 24), "Young Adults")
    .when((col("Vict Age") >= 25) & (col("Vict Age") <= 64), "Adults")
    .when(col("Vict Age") > 64, "Seniors")
)

# Ομαδοποίηση ανά ηλικιακή ομάδα και καταμέτρηση
result_df = age_grouped_df.groupBy("Age Group").agg(count("*").alias("Total Cases"))

# Ταξινόμηση σε φθίνουσα σειρά
sorted_result_df = result_df.orderBy(col("Total Cases").desc())

# Εμφάνιση αποτελεσμάτων
sorted_result_df.show()


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+-----------+
|   Age Group|Total Cases|
+------------+-----------+
|      Adults|     121093|
|Young Adults|      33605|
|    Children|      15928|
|     Seniors|       5985|
+------------+-----------+

In [52]:
#QUERY 2 WITH DF
from pyspark.sql.functions import col, count, when, year, to_date, row_number
from pyspark.sql.window import Window

spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Μετατροπή "DATE OCC" σε ημερομηνία και εξαγωγή του έτους
combined_crime_df = combined_crime_df.withColumn("DATE OCC", to_date(col("DATE OCC"), "MM/dd/yyyy"))
combined_crime_df = combined_crime_df.withColumn("YEAR", year(col("DATE OCC")))

# Υπολογισμός του ποσοστού κλεισμένων υποθέσεων
closed_rate_df = combined_crime_df.groupBy("YEAR", "AREA NAME").agg(
    count(when(col("Status").isin("JA", "AA", "JO", "AO"), True)).alias("closed_cases"),
    count("*").alias("total_cases")
).withColumn(
    "closed_case_rate", (col("closed_cases") / col("total_cases")) * 100
)

# Δημιουργία παραθύρου για ταξινόμηση
window_spec = Window.partitionBy("YEAR").orderBy(col("closed_case_rate").desc())

# Υπολογισμός του ranking
ranked_df = closed_rate_df.withColumn("rank", row_number().over(window_spec))

# Επιλογή των 3 πρώτων τμημάτων ανά έτος
top3_df = ranked_df.filter(col("rank") <= 3).orderBy("YEAR", "rank")

# Εμφάνιση αποτελεσμάτων
top3_df.show(50)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-----------+------------+-----------+------------------+----+
|YEAR|  AREA NAME|closed_cases|total_cases|  closed_case_rate|rank|
+----+-----------+------------+-----------+------------------+----+
|2010|    Rampart|        2860|       8707| 32.84713448949121|   1|
|2010|    Olympic|        2762|       8764|31.515289821999087|   2|
|2010|     Harbor|        2818|       9598| 29.36028339237341|   3|
|2011|    Olympic|        2799|       7988|35.040060090135206|   1|
|2011|    Rampart|        2744|       8444|  32.4964471814306|   2|
|2011|     Harbor|        2806|       9841| 28.51336246316431|   3|
|2012|    Olympic|        2930|       8543| 34.29708533302119|   1|
|2012|    Rampart|        2800|       8626| 32.46000463714352|   2|
|2012|     Harbor|        2786|       9441|29.509585848956675|   3|
|2013|    Olympic|        2789|       8305| 33.58217940999398|   1|
|2013|    Rampart|        2616|       8148|  32.1060382916053|   2|
|2013|     Harbor|        2506|       8431|29.72

In [None]:
crime_df.select("Status", "Status Desc").distinct().show(20, truncate=False)


In [4]:
#QUERY 2 WITH SQL API
from pyspark.sql.functions import to_date, year

# Ρύθμιση legacy time parser
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Μετατροπή της στήλης "DATE OCC" σε ημερομηνία και εξαγωγή του έτους
combined_crime_df = combined_crime_df.withColumn("DATE OCC", to_date(col("DATE OCC"), "MM/dd/yyyy"))
combined_crime_df = combined_crime_df.withColumn("YEAR", year(col("DATE OCC")))

# Δημιουργία προσωρινού view για χρήση SQL
combined_crime_df.createOrReplaceTempView("crime_data")

# Υλοποίηση του query
sql_query = """
WITH closed_cases_data AS (
    SELECT
        YEAR,
        `AREA NAME`,
        COUNT(CASE WHEN Status IN ('JA', 'AA', 'JO', 'AO') THEN 1 END) AS closed_cases,
        COUNT(*) AS total_cases,
        (COUNT(CASE WHEN Status IN ('JA', 'AA', 'JO', 'AO') THEN 1 END) * 100.0 / COUNT(*)) AS closed_case_rate
    FROM crime_data
    GROUP BY YEAR, `AREA NAME`
),
ranked_data AS (
    SELECT
        YEAR,
        `AREA NAME`,
        closed_cases,
        total_cases,
        closed_case_rate,
        ROW_NUMBER() OVER (PARTITION BY YEAR ORDER BY closed_case_rate DESC) AS rank
    FROM closed_cases_data
)
SELECT
    YEAR,
    `AREA NAME`,
    closed_cases,
    total_cases,
    closed_case_rate,
    rank
FROM ranked_data
WHERE rank <= 3
ORDER BY YEAR, rank
"""

# Εκτέλεση του SQL query
top3_sql_df = spark.sql(sql_query)

# Εμφάνιση των αποτελεσμάτων
top3_sql_df.show(50)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-----------+------------+-----------+-----------------+----+
|YEAR|  AREA NAME|closed_cases|total_cases| closed_case_rate|rank|
+----+-----------+------------+-----------+-----------------+----+
|2010|    Rampart|        2860|       8707|32.84713448949121|   1|
|2010|    Olympic|        2762|       8764|31.51528982199909|   2|
|2010|     Harbor|        2818|       9598|29.36028339237341|   3|
|2011|    Olympic|        2799|       7988|35.04006009013520|   1|
|2011|    Rampart|        2744|       8444|32.49644718143060|   2|
|2011|     Harbor|        2806|       9841|28.51336246316431|   3|
|2012|    Olympic|        2930|       8543|34.29708533302119|   1|
|2012|    Rampart|        2800|       8626|32.46000463714352|   2|
|2012|     Harbor|        2786|       9441|29.50958584895668|   3|
|2013|    Olympic|        2789|       8305|33.58217940999398|   1|
|2013|    Rampart|        2616|       8148|32.10603829160530|   2|
|2013|     Harbor|        2506|       8431|29.72363895148855| 

In [None]:
#QUERY3 APO EDO KAI KATO
census_df = spark.read.option("multiline", "true").json("s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson")

# Εμφάνιση schema και δεδομένων
census_df.printSchema()
census_df.show()


In [None]:
income_df.printSchema()
income_df.show(5)


In [None]:
filtered_census_df.printSchema()
filtered_census_df.show(5, truncate=False)


In [None]:
census_df.printSchema()


In [41]:
#QUERY3
from sedona.spark import *
from pyspark.sql.functions import col
from pyspark.sql import SparkSession

# Create sedona context
sedona = SedonaContext.create(spark)
# Read the file from s3
geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
blocks_df = sedona.read.format("geojson") \
            .option("multiLine", "true").load(geojson_path) \
            .selectExpr("explode(features) as features") \
            .select("features.*")
# Formatting magic
flattened_df = blocks_df.select( \
                [col(f"properties.{col_name}").alias(col_name) for col_name in \
                blocks_df.schema["properties"].dataType.fieldNames()] + ["geometry"]) \
            .drop("properties") \
            .drop("type")
# Print schema
flattened_df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- BG10: string (nullable = true)
 |-- BG10FIP10: string (nullable = true)
 |-- BG12: string (nullable = true)
 |-- CB10: string (nullable = true)
 |-- CEN_FIP13: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- CITYCOM: string (nullable = true)
 |-- COMM: string (nullable = true)
 |-- CT10: string (nullable = true)
 |-- CT12: string (nullable = true)
 |-- CTCB10: string (nullable = true)
 |-- HD_2012: long (nullable = true)
 |-- HD_NAME: string (nullable = true)
 |-- HOUSING10: long (nullable = true)
 |-- LA_FIP10: string (nullable = true)
 |-- OBJECTID: long (nullable = true)
 |-- POP_2010: long (nullable = true)
 |-- PUMA10: string (nullable = true)
 |-- SPA_2012: long (nullable = true)
 |-- SPA_NAME: string (nullable = true)
 |-- SUP_DIST: string (nullable = true)
 |-- SUP_LABEL: string (nullable = true)
 |-- ShapeSTArea: double (nullable = true)
 |-- ShapeSTLength: double (nullable = true)
 |-- ZCTA10: string (nullable = true)
 |-- geometry: geometry (nulla

In [6]:
from pyspark.sql.functions import regexp_replace

# Διαδρομή του αρχείου εισοδήματος
income_path = "s3://initial-notebook-data-bucket-dblab-905418150721/LA_income_2015.csv"

# Φόρτωση του αρχείου CSV στο Spark
income_df = spark.read.csv(income_path, header=True, inferSchema=True)

# Καθαρισμός της στήλης "Estimated Median Income"
income_df = income_df.withColumn(
    "Estimated Median Income",
    regexp_replace(col("Estimated Median Income"), "[^0-9.]", "").cast("double")
)

# Εμφάνιση schema και πρώτων γραμμών
income_df.printSchema()
income_df.show(5, truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- Zip Code: integer (nullable = true)
 |-- Community: string (nullable = true)
 |-- Estimated Median Income: double (nullable = true)

+--------+--------------------------------------------------------------------------------------------+-----------------------+
|Zip Code|Community                                                                                   |Estimated Median Income|
+--------+--------------------------------------------------------------------------------------------+-----------------------+
|90001   |Los Angeles (South Los Angeles), Florence-Graham                                            |33887.0                |
|90002   |Los Angeles (Southeast Los Angeles, Watts)                                                  |30413.0                |
|90003   |Los Angeles (South Los Angeles, Southeast Los Angeles)                                      |30805.0                |
|90004   |Los Angeles (Hancock Park, Rampart Village, Virgil Village, Wilshire Center, Wi

In [28]:
from pyspark.sql.functions import col, sum


# Εγγραφή Sedona functions
SedonaRegistrator.registerAll(spark)

# Υπολογισμός συνολικού πληθυσμού και αριθμού κατοικιών
census_population_df = flattened_df.groupBy("COMM", "ZCTA10").agg(
    sum("POP_2010").alias("Total Population"),
    sum("HOUSING10").alias("Total Housing Units"),
     ST_Union_Aggr("geometry").alias("geometry")
)

# Υπολογισμός μέσου μεγέθους νοικοκυριού
census_population_df = census_population_df.withColumn(
    "Average Household Size",
    (col("Total Population") / col("Total Housing Units")).cast("double")
)

# Φιλτράρισμα μηδενικών τιμών (προαιρετικό)
census_population_df = census_population_df.filter(
    (col("Total Population") > 0) & (col("Total Housing Units") > 0)
)

# Εμφάνιση αποτελεσμάτων
census_population_df.show(1, truncate=False)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+------+----------------+-------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [42]:
# Σύνδεση GeoJSON και εισοδηματικών δεδομένων με βάση το Zip Code
joined_income_df = income_df.join(
    broadcast(census_population_df),
    income_df["Zip Code"] == census_population_df["ZCTA10"],
    "inner"
)

# Υπολογισμός μέσου εισοδήματος ανά άτομο
final_income_df = joined_income_df.withColumn(
    "Mean Income Per Person",
    (col("Estimated Median Income").cast("double") / col("Average Household Size")).cast("double")
)

# Εμφάνιση αποτελεσμάτων
final_income_df.select(
    "COMM", "Zip Code", "Estimated Median Income", "Average Household Size", "Mean Income Per Person"
).show(20, truncate=False)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------------+--------+-----------------------+----------------------+----------------------+
|COMM                 |Zip Code|Estimated Median Income|Average Household Size|Mean Income Per Person|
+---------------------+--------+-----------------------+----------------------+----------------------+
|Florence-Firestone   |90001   |33887.0                |4.142286751361161     |8180.747020679986     |
|Green Meadows        |90001   |33887.0                |3.3846153846153846    |10012.068181818182    |
|Lynwood              |90002   |30413.0                |11.0                  |2764.818181818182     |
|Century Palms/Cove   |90002   |30413.0                |3.8961748633879782    |7805.861150070126     |
|Green Meadows        |90002   |30413.0                |3.7476370510396975    |8115.246910466582     |
|Florence-Firestone   |90002   |30413.0                |4.414702581369248     |6889.025803991356     |
|Watts                |90002   |30413.0                |4.038418079096045

In [14]:
# Εμφάνιση του σχήματος του DataFrame
combined_crime_df.printSchema()


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- DR_NO: integer (nullable = true)
 |-- Date Rptd: string (nullable = true)
 |-- DATE OCC: date (nullable = true)
 |-- TIME OCC: integer (nullable = true)
 |-- AREA : integer (nullable = true)
 |-- AREA NAME: string (nullable = true)
 |-- Rpt Dist No: integer (nullable = true)
 |-- Part 1-2: integer (nullable = true)
 |-- Crm Cd: integer (nullable = true)
 |-- Crm Cd Desc: string (nullable = true)
 |-- Mocodes: string (nullable = true)
 |-- Vict Age: integer (nullable = true)
 |-- Vict Sex: string (nullable = true)
 |-- Vict Descent: string (nullable = true)
 |-- Premis Cd: integer (nullable = true)
 |-- Premis Desc: string (nullable = true)
 |-- Weapon Used Cd: integer (nullable = true)
 |-- Weapon Desc: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Status Desc: string (nullable = true)
 |-- Crm Cd 1: integer (nullable = true)
 |-- Crm Cd 2: integer (nullable = true)
 |-- Crm Cd 3: integer (nullable = true)
 |-- Crm Cd 4: integer (nullable = true)
 |-- LO

In [29]:
census_population_df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- COMM: string (nullable = true)
 |-- ZCTA10: string (nullable = true)
 |-- Total Population: long (nullable = true)
 |-- Total Housing Units: long (nullable = true)
 |-- geometry: geometry (nullable = true)
 |-- Average Household Size: double (nullable = true)

In [15]:
# Εμφάνιση δείγματος δεδομένων από τη στήλη LOCATION
combined_crime_df.select("LOCATION").show(10, truncate=False)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------------------------------+
|LOCATION                               |
+---------------------------------------+
|300 E  GAGE                         AV |
|SEPULVEDA                    BL        |
|1300 E  21ST                         ST|
|CAHUENGA                     BL        |
|8TH                          ST        |
|700 W  7TH                          ST |
|PICO                         BL        |
|500    CROCKER                      ST |
|800 W  OLYMPIC                      BL |
|200 S  OLIVE                        ST |
+---------------------------------------+
only showing top 10 rows

In [55]:
flattened_df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- BG10: string (nullable = true)
 |-- BG10FIP10: string (nullable = true)
 |-- BG12: string (nullable = true)
 |-- CB10: string (nullable = true)
 |-- CEN_FIP13: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- CITYCOM: string (nullable = true)
 |-- COMM: string (nullable = true)
 |-- CT10: string (nullable = true)
 |-- CT12: string (nullable = true)
 |-- CTCB10: string (nullable = true)
 |-- HD_2012: long (nullable = true)
 |-- HD_NAME: string (nullable = true)
 |-- HOUSING10: long (nullable = true)
 |-- LA_FIP10: string (nullable = true)
 |-- OBJECTID: long (nullable = true)
 |-- POP_2010: long (nullable = true)
 |-- PUMA10: string (nullable = true)
 |-- SPA_2012: long (nullable = true)
 |-- SPA_NAME: string (nullable = true)
 |-- SUP_DIST: string (nullable = true)
 |-- SUP_LABEL: string (nullable = true)
 |-- ShapeSTArea: double (nullable = true)
 |-- ShapeSTLength: double (nullable = true)
 |-- ZCTA10: string (nullable = true)
 |-- geometry: geometry (nulla

In [43]:
# Δημιουργία γεωμετρικών σημείων από LAT και LON
crime_points_df = combined_crime_df.withColumn(
    "geom", ST_Point("LON", "LAT")
)

# Geospatial join μεταξύ σημείων εγκλημάτων και περιοχών απογραφής
crime_population_df = crime_points_df.join(
    census_population_df.hint("shuffle_hash"),
    ST_Within(crime_points_df["geom"], census_population_df["geometry"]),
    "inner"
)

# Υπολογισμός συνολικών εγκλημάτων και πληθυσμού ανά περιοχή
crime_population_df = crime_population_df.groupBy("COMM").agg(
    count("*").alias("Total Crimes"),
    sum("Total Population").alias("Total Population")
).withColumn(
    "Crime Per Capita",
    (col("Total Crimes") / col("Total Population")).cast("double")
)

# Εμφάνιση αποτελεσμάτων
crime_population_df.show(20, truncate=False)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------------------+------------+----------------+---------------------+
|COMM                          |Total Crimes|Total Population|Crime Per Capita     |
+------------------------------+------------+----------------+---------------------+
|Green Meadows                 |23975       |238932800       |1.0034202085272511E-4|
|Carson                        |421         |10786794        |3.902920552668383E-5 |
|Glendale                      |136         |2308681         |5.890809514177143E-5 |
|Silverlake                    |29840       |324462899       |9.196737159153595E-5 |
|Century Palms/Cove            |38877       |458033889       |8.487799906002151E-5 |
|Temple-Beaudry                |26923       |502672145       |5.3559761104327756E-5|
|Angelino Heights              |1497        |2347290         |6.377567322316374E-4 |
|Atwater Village               |8429        |118857329       |7.091695624423799E-5 |
|South Park                    |31378       |561459264       |5.5

In [44]:
from pyspark.sql.functions import col, avg

# Υπολογισμός μέσου εισοδήματος ανά περιοχή
income_aggregated_df = final_income_df.groupBy("COMM").agg(
    avg("Estimated Median Income").alias("Mean Estimated Income")
)

# Σύνδεση των δύο DataFrames με βάση το COMM
#final_df = crime_population_df.join(
 #   final_income_df,
  #  crime_population_df["COMM"] == final_income_df["COMM"],  # Join στη στήλη COMM
   # "inner"
#).select(
 #   crime_population_df["COMM"],
  #  col("Crime Per Capita"),
   # col("Estimated Median Income")
#)

# Σύνδεση του Crime Per Capita με το μέσο εισόδημα
final_df = crime_population_df.hint("merge").join(
    income_aggregated_df,
    "COMM",
    "inner"
).select(
    "COMM",
    "Crime Per Capita",
    "Mean Estimated Income"
)

# Εμφάνιση αποτελεσμάτων
final_df.show(20, truncate=False)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------------------+---------------------+---------------------+
|COMM                   |Crime Per Capita     |Mean Estimated Income|
+-----------------------+---------------------+---------------------+
|                       |0.001976284584980237 |82810.42857142857    |
|Acton                  |1.271294177472667E-4 |78868.25             |
|Adams-Normandie        |2.5365100088999503E-4|28084.0              |
|Agoura Hills           |4.9188391539596654E-5|114417.0             |
|Alhambra               |2.0250581156764403E-5|51822.333333333336   |
|Alsace                 |8.526603001364257E-5 |38330.0              |
|Altadena               |2.8049704075622E-5   |80547.66666666667    |
|Anaverde               |0.0012106537530266344|69699.66666666667    |
|Angeles National Forest|0.009002934877449036 |73013.5294117647     |
|Angelino Heights       |6.377567322316374E-4 |39784.5              |
|Arcadia                |3.1530821377896896E-5|66056.0              |
|Arleta             

In [45]:
from pyspark.sql.functions import format_number

# Μορφοποίηση της στήλης "Crime Per Capita" σε δεκαδικούς αριθμούς
formatted_df = final_df.withColumn(
    "Crime Per Capita", format_number("Crime Per Capita", 6)  # Διατήρηση 6 δεκαδικών ψηφίων
)

# Εμφάνιση αποτελεσμάτων
formatted_df.show(20, truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------------------+----------------+---------------------+
|COMM                   |Crime Per Capita|Mean Estimated Income|
+-----------------------+----------------+---------------------+
|                       |0.001976        |82810.42857142857    |
|Acton                  |0.000127        |78868.25             |
|Adams-Normandie        |0.000254        |28084.0              |
|Agoura Hills           |0.000049        |114417.0             |
|Alhambra               |0.000020        |51822.333333333336   |
|Alsace                 |0.000085        |38330.0              |
|Altadena               |0.000028        |80547.66666666667    |
|Anaverde               |0.001211        |69699.66666666667    |
|Angeles National Forest|0.009003        |73013.5294117647     |
|Angelino Heights       |0.000638        |39784.5              |
|Arcadia                |0.000032        |66056.0              |
|Arleta                 |0.000030        |44588.0              |
|Athens Village         |

In [40]:
from pyspark.sql.functions import broadcast

# Προετοιμασία DataFrames (χρησιμοποιούμε τα crime_population_df και income_aggregated_df)
# Ενδεικτικό Join χωρίς hint
basic_join = crime_population_df.join(
    income_aggregated_df,
    "COMM",
    "inner"
)

# Εμφάνιση του φυσικού πλάνου εκτέλεσης
print("Basic Join:")
basic_join.explain()

# Προσθήκη Broadcast Hint
broadcast_join = crime_population_df.join(
    broadcast(income_aggregated_df),
    "COMM",
    "inner"
)

print("\nBroadcast Join:")
broadcast_join.explain()

# Προσθήκη Merge Hint
merge_join = crime_population_df.hint("merge").join(
    income_aggregated_df.hint("merge"),
    "COMM",
    "inner"
)

print("\nMerge Join:")
merge_join.explain()

# Προσθήκη Shuffle Hash Hint
shuffle_hash_join = crime_population_df.hint("shuffle_hash").join(
    income_aggregated_df.hint("shuffle_hash"),
    "COMM",
    "inner"
)

print("\nShuffle Hash Join:")
shuffle_hash_join.explain()

# Προσθήκη Shuffle Replicate NL Hint
shuffle_replicate_nl_join = crime_population_df.hint("shuffle_replicate_nl").join(
    income_aggregated_df.hint("shuffle_replicate_nl"),
    "COMM",
    "inner"
)

print("\nShuffle Replicate NL Join:")
shuffle_replicate_nl_join.explain()


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Basic Join:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [COMM#2242, Total Crimes#3506L, Total Population#3508L, Crime Per Capita#3512, Mean Estimated Income#3864]
   +- SortMergeJoin [COMM#2242], [COMM#4488], Inner
      :- Sort [COMM#2242 ASC NULLS FIRST], false, 0
      :  +- Project [COMM#2242, Total Crimes#3506L, Total Population#3508L, (cast(Total Crimes#3506L as double) / cast(Total Population#3508L as double)) AS Crime Per Capita#3512]
      :     +- HashAggregate(keys=[COMM#2242], functions=[count(1), sum(Total Population#3286L)], schema specialized)
      :        +- Exchange hashpartitioning(COMM#2242, 1000), ENSURE_REQUIREMENTS, [plan_id=9337]
      :           +- HashAggregate(keys=[COMM#2242], functions=[partial_count(1), partial_sum(Total Population#3286L)], schema specialized)
      :              +- Project [COMM#2242, Total Population#3286L]
      :                 +- RangeJoin geom#3366: geometry, geometry#3293: geometry, WITHIN
      :         

In [47]:
#QUERY5

police_stations_path = "s3://initial-notebook-data-bucket-dblab-905418150721/LA_Police_Stations.csv"

# Φόρτωση δεδομένων αστυνομικών τμημάτων
police_stations_df = spark.read.csv(police_stations_path, header=True, inferSchema=True)



FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [51]:
from sedona.register import SedonaRegistrator
from sedona.sql.types import GeometryType

# Εγγραφή Sedona functions
SedonaRegistrator.registerAll(spark)

# Μετατροπή δεδομένων εγκλημάτων σε γεωμετρικά σημεία
filtered_crime_df = filtered_crime_df.withColumn("geom", ST_Point("LON", "LAT"))

# Μετατροπή δεδομένων αστυνομικών τμημάτων σε γεωμετρικά σημεία
police_stations_df = police_stations_df.withColumn("geom", ST_Point("X", "Y"))

police_stations_df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- X: double (nullable = true)
 |-- Y: double (nullable = true)
 |-- FID: integer (nullable = true)
 |-- DIVISION: string (nullable = true)
 |-- LOCATION: string (nullable = true)
 |-- PREC: integer (nullable = true)
 |-- geom: geometry (nullable = true)

In [54]:
# Υπολογισμός απόστασης μεταξύ σημείων εγκλημάτων και αστυνομικών τμημάτων
crime_with_distance = filtered_crime_df.crossJoin(police_stations_df).withColumn(
    "distance",
    ST_DistanceSphere(filtered_crime_df["geom"], police_stations_df["geom"]) / 1000  # Απόσταση σε χιλιόμετρα
)

# Εύρεση του πλησιέστερου τμήματος για κάθε έγκλημα
window_spec = Window.partitionBy("DR_NO").orderBy("distance")  # Υποθέτω ότι "DR_NO" είναι το μοναδικό ID εγκλήματος
crime_with_nearest_station = crime_with_distance.withColumn(
    "row_num",
    row_number().over(window_spec)
).filter("row_num = 1").drop("row_num")

# Ομαδοποίηση ανά αστυνομικό τμήμα
result_df = crime_with_nearest_station.groupBy("DIVISION").agg(
    count("*").alias("Total Crimes"),
    avg("distance").alias("Average Distance")
).orderBy("Total Crimes", ascending=False)

# Εμφάνιση των αποτελεσμάτων
result_df.show(truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------------+------------+------------------+
|DIVISION        |Total Crimes|Average Distance  |
+----------------+------------+------------------+
|HOLLYWOOD       |224340      |2.0762639601787183|
|VAN NUYS        |210134      |2.953369742819781 |
|SOUTHWEST       |188901      |2.191398805780884 |
|WILSHIRE        |185996      |2.592665532978764 |
|77TH STREET     |171827      |1.716544971970095 |
|OLYMPIC         |170897      |1.7236036971780955|
|NORTH HOLLYWOOD |167854      |2.6430060941415636|
|PACIFIC         |161359      |3.850070655307917 |
|CENTRAL         |153871      |0.9924764374568908|
|RAMPART         |152736      |1.5345341879190124|
|SOUTHEAST       |152176      |2.4218662158881803|
|WEST VALLEY     |138643      |3.0356712163140696|
|TOPANGA         |138217      |3.2969548417555536|
|FOOTHILL        |134896      |4.250921708424981 |
|HARBOR          |126747      |3.702561599356522 |
|HOLLENBECK      |115837      |2.680181237706819 |
|WEST LOS ANGELES|115781      |