# Περιεχόμενα

## Setup

In [1]:
# Files to be used

# Paths for .csv
fcrime = "../Data/Crime_Data_from_2010_to_2019_20241101.csv"
fstations = "../Data/LA_Police_Stations.csv"

# Paths for .parquet
fcrime_parq = '../Data/Crime.parquet'
fstations_parq = '../Data/Stations.parquet'

In [2]:
# Create the spark session

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Advanced DB") \
    .config('spark.executor.instances','4') \
    .getOrCreate()

25/01/17 13:20:15 WARN Utils: Your hostname, takis-TUF-Gaming-FX505DT-FX505DT resolves to a loopback address: 127.0.1.1; using 192.168.1.217 instead (on interface enp2s0)
25/01/17 13:20:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/17 13:20:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Imports

from pyspark.sql.types import StructField, StructType, StringType, IntegerType, FloatType, DateType
from pyspark.sql.functions import year, when, count, sum, col, row_number, to_timestamp
from pyspark.sql.window import Window
import time

Schemas:

In [4]:
# Crimes table

crimes_schema = StructType([
    StructField("DR_NO", StringType()),
    StructField("DateRptd", DateType()),
    StructField("DATEOCC", DateType()),
    StructField("TIMEOCC", StringType()),
    StructField("AREA", StringType()),
    StructField("AREANAME", StringType()),
    StructField("RptDistNo", StringType()),
    StructField("Part", IntegerType()),
    StructField("CrmCd", StringType()),
    StructField("Crm Cd Desc", StringType()),
    StructField("Mocodes", StringType()),
    StructField("Vict Age", StringType()),
    StructField("VictSex", StringType()),
    StructField("VictDescent", StringType()),
    StructField("PremisCd", StringType()),
    StructField("PremisDesc", StringType()),
    StructField("WeaponUsedCd", StringType()),
    StructField("WeaponDesc", StringType()),
    StructField("Status", StringType()),
    StructField("CrmCd1", StringType()),
    StructField("CrmCd2", StringType()),
    StructField("CrmCd3", StringType()),
    StructField("CrmCd4", StringType()),
    StructField("LOCATION", StringType()),
    StructField("CrossStreet", StringType()),
    StructField("LAT", FloatType()),
    StructField("LON", FloatType()),
])

crimes_df = spark.read.csv(fcrime, header=True, schema=crimes_schema, dateFormat='MM/dd/yyyy hh:mm:ss a')

In [5]:
# Stations table

stations_schema = StructType([
    StructField("X", FloatType()),
    StructField("Y", FloatType()),
    StructField("FID", IntegerType()),
    StructField("DIVISION", StringType()),
    StructField("LOCATION", StringType()),
    StructField("PREC", IntegerType()),
])

stations_df = spark.read.csv(fstations, header=True, schema=stations_schema, dateFormat='MM/dd/yyyy hh:mm:ss a')

## Query 1

### Να υλοποιηθεί το __Query 1__ χρησιμοποιώντας τα DataFrame και RDD APIs. Να εκτελέσετε και τις δύο υλοποιήσεις με 4 Spark executors. Υπάρχει διαφορά στην επίδοση μεταξύ των δύο APIs; Αιτιολογήσετε την απάντησή σας.

#### DataFrame API

In [6]:
# Start timer
time_start = time.time()

In [7]:
# Categorize into age groups
categorized_df = crimes_df.withColumn('age_group',
                    when(col('Vict Age').cast('int') < 18, 'Children')
                    .when(
                        ((col('Vict Age').cast('int') >= 18) & (col('Vict Age').cast('int') <= 24)), 'Young Adults'
                        )
                    .when(
                        ((col('Vict Age').cast('int') >= 25) & (col('Vict Age').cast('int') <= 64)), 'Adults'
                        )
                    .when(col('Vict Age').cast('int') > 64, 'Elderly')
                    )

In [8]:
# Filter for 'AGGRAVATED ASSAULT'
assault_df = categorized_df.filter(
    col('Crm Cd Desc').contains('AGGRAVATED ASSAULT')
    ) \
    .groupby('age_group') \
    .agg(count('*').alias('victim_count')) \
    .orderBy(col('victim_count').desc())

25/01/17 13:20:38 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [9]:
# Show results
assault_df.show()

time_end = time.time()
time_df = time_end - time_start

                                                                                

+------------+------------+
|   age_group|victim_count|
+------------+------------+
|      Adults|       72610|
|Young Adults|       23472|
|    Children|       10724|
|     Elderly|        3099|
+------------+------------+



#### RDD API

Δυστυχώς στα δεδομένα μας έχουμε μερικές περιπτώσεις όπου υπάρχει υποδιαστολή εντός quote marks (""), δημιουργώντας έτσι θέμα στο parse.
Για αυτό θα χρησιμοποιήσουμε μια βιβλιοθήκη της Python για την ανάγνωση των csv αρχείων και θα την περάσουμε μέσω mapping σε κάθε δεδομένο:

In [10]:
import csv
from datetime import datetime

def parse_line(line):
    reader = csv.reader([line])
    fields = next(reader)
    return {
        'DR_NO': fields[0],
        'DateRptd': fields[1],
        'DATEOCC': fields[2],
        'TIMEOCC': fields[3],
        'AREA': fields[4],
        'AREANAME': fields[5],
        'RptDistNo': fields[6],
        'Part': fields[7],
        'CrmCd': fields[8],
        'CrmCdDesc': fields[9],
        'Mocodes': fields[10],
        'VictAge': int(fields[11]),
        'VictSex': fields[12],
        'VictDescent': fields[13],
        'PremisCd': fields[14],
        'PremisDesc': fields[15],
        'WeaponUsedCd': fields[16],
        'WeaponDesc': fields[17],
        'Status': fields[18],
        'CrmCd1': fields[19],
        'CrmCd2': fields[20],
        'CrmCd3': fields[21],
        'CrmCd4': fields[22],
        'LOCATION': fields[23],
        'CrossStreet': fields[24],
        'LAT': fields[25],
        'LON': fields[26]
    }


In [11]:
# Read the .csv as Text
rdd = spark.sparkContext.textFile(fcrime)

# Remove the header
header = rdd.first()
crimes_rdd = rdd.filter(lambda line: line != header).map(parse_line)

                                                                                

In [12]:
# Start timer
time_start = time.time()

In [13]:
# Map function to categorize age groups
def categorize_age(crime):
    age = crime['VictAge']
    if age < 18:
        return 'Children'
    elif age >= 18 and age <= 24:
        return 'Young Adults'
    elif age >= 25 and age <= 64:
        return 'Adults'
    else:
        return 'Elderly'
    
categorized_rdd = crimes_rdd.filter(lambda x: 'AGGRAVATED ASSAULT' in x['CrmCdDesc']) \
                    .map(lambda x: (categorize_age(x), 1)) \
                    .reduceByKey(lambda a, b: a + b) \
                    .sortBy(lambda x: -x[1])

categorized_rdd.collect()
time_end = time.time()
time_rdd = time_end - time_start

                                                                                

### Παρατηρήσεις:

Η διαφορά στην απόδοση μεταξύ των 2 μεθόδων είναι τόσο εμφανής όσο αναμέναμε.
Τα DataFrames αξιοποιούν το optimization και προσφέρουν γενικά μεγαλύτερη ταχύτητα, αν και στη συγκεκριμένη περίπτωση η διαφορά είναι μικρή.
Αντιθέτως, τα RDD προσφέρουν μεγαλύτερη ευελιξία, αφού είναι low level, στην επεξεργασία των δεδομένων. 

Πιθανόν το μέγεθος των δεδομένων να μην είναι αρκετά μεγάλο ώστε να αρχίσει να φαίνεται μια ουσιαστική διαφορά.

In [34]:
print(f"""Time taken for DataFrame API: {time_df}.
Time taken for RDD API: {time_rdd}
""")

Time taken for DataFrame API: 11.356709003448486.
Time taken for RDD API: 14.037628889083862



## Query 2

### α) Να υλοποιηθεί το __Query 2__ χρησιμοποιώντας τα DataFrame και SQL APIs. Να αναφέρετε και να συγκρίνετε τους χρόνους εκτέλεσης μεταξύ των δύο υλοποιήσεων.

#### DataFrame API

In [15]:
# Start timer
time_start = time.time()

In [16]:
# Create table yearly_precincts

yearly_precincts_df = crimes_df.join(
    stations_df,
    crimes_df.AREA.cast("int") == stations_df.FID
).groupBy(
    year(crimes_df.DateRptd).alias("year"),
    stations_df.DIVISION.alias("precinct")
).agg(
    sum(when(col('Status') != "IC", 1).otherwise(0)).alias("closed_cases"),
    count("*").alias("total_cases"),
    (
        sum(when(col('Status') != "IC", 1).otherwise(0)) * 100.0 / count("*")
    ).alias("closed_case_rate")
)

In [17]:
# Create table ranked precincts

windowSpec = Window.partitionBy('year').orderBy(col('closed_case_rate').desc())

ranked_precincts_df = yearly_precincts_df.withColumn(
    'ranking', row_number().over(windowSpec)
).select(
    col('year'),
    col('precinct'),
    col('closed_case_rate'),
    col('ranking')
)

In [18]:
# Create table rsults

results_df = ranked_precincts_df.filter(
    col('ranking') <= 3
).select(
    'year',
    'precinct',
    'closed_case_rate',
    'ranking'
).orderBy('year','ranking')

In [19]:
# Final results:

results_df.show()
time_end = time.time()
df_time = time_end - time_start
print(f'Time taken since creation of DF spark session to completion: {df_time:.2f} seconds')

25/01/17 13:21:06 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Date Rptd, AREA , Status
 Schema: DateRptd, AREA, Status
Expected: DateRptd but found: Date Rptd
CSV file: file:///home/takis/Documents/GitHub/Advanced-DB-2025/Data/Crime_Data_from_2010_to_2019_20241101.csv

+----+----------+------------------+-------+
|year|  precinct|  closed_case_rate|ranking|
+----+----------+------------------+-------+
|2010| SOUTHEAST|32.947355855318136|      1|
|2010|DEVONSHIRE|31.962706191728426|      2|
|2010| SOUTHWEST| 29.63203463203463|      3|
|2011|DEVONSHIRE|35.212167689161554|      1|
|2011| SOUTHEAST|32.511779630300836|      2|
|2011| SOUTHWEST| 28.65220520201501|      3|
|2012|DEVONSHIRE|34.414818310523835|      1|
|2012| SOUTHEAST|  32.9464181029429|      2|
|2012| SOUTHWEST|29.815133276010318|      3|
|2013|DEVONSHIRE| 33.52812271731191|      1|
|2013| SOUTHEAST| 32.08287360549222|      2|
|2013| SOUTHWEST|29.164224592662055|      3|
|2014|HOLLENBECK| 31.80567315834039|      1|
|2014|  WILSHIRE|31.311989956057754|      2|
|2014|  FOOTHILL|31.162790697674417|      3|
|2015|HOLLENBECK|32.641346981727736|      1|
|2015|  WILSHIRE|30.275974025974026|      2|
|2015|  FOOTHILL|30.179460678380156|      3|
|2016|HOLLENBECK|31.880755720117726|      1|
|2016|  WI

                                                                                

#### SQL API

In [20]:
# Start timer
time_start = time.time()

In [21]:
# Crimes Table

crimes_schema_sql = StructType([
    StructField("DR_NO", StringType()),
    StructField("DateRptd", StringType()),
    StructField("DATEOCC", StringType()),
    StructField("TIMEOCC", StringType()),
    StructField("AREA", StringType()),
    StructField("AREANAME", StringType()),
    StructField("RptDistNo", StringType()),
    StructField("Part", IntegerType()),
    StructField("CrmCd", StringType()),
    StructField("CrmCdDesc", StringType()),
    StructField("Mocodes", StringType()),
    StructField("VictAge", StringType()),
    StructField("VictSex", StringType()),
    StructField("VictDescent", StringType()),
    StructField("PremisCd", StringType()),
    StructField("PremisDesc", StringType()),
    StructField("WeaponUsedCd", StringType()),
    StructField("WeaponDesc", StringType()),
    StructField("Status", StringType()),
    StructField("CrmCd1", StringType()),
    StructField("CrmCd2", StringType()),
    StructField("CrmCd3", StringType()),
    StructField("CrmCd4", StringType()),
    StructField("LOCATION", StringType()),
    StructField("CrossStreet", StringType()),
    StructField("LAT", FloatType()),
    StructField("LON", FloatType()),
])

crimes_df = spark.read.format('csv') \
    .options(header='true', dateFormat='MM/dd/yyyy hh:mm:ss a') \
    .schema(crimes_schema_sql) \
    .load(fcrime)

crimes_df = crimes_df.withColumn("DateRptd", to_timestamp("DateRptd", "MM/dd/yyyy hh:mm:ss a")) \
                     .withColumn("DATEOCC", to_timestamp("DATEOCC", "MM/dd/yyyy hh:mm:ss a"))

crimes_df.createOrReplaceTempView("crimes")

25/01/17 13:21:11 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [22]:
# Stations Table

stations_schema_sql = StructType([
    StructField("X", FloatType()),
    StructField("Y", FloatType()),
    StructField("FID", IntegerType()),
    StructField("DIVISION", StringType()),
    StructField("LOCATION", StringType()),
    StructField("PREC", IntegerType()),
])

stations_df = spark.read.format('csv') \
    .options(header='true') \
    .schema(stations_schema_sql) \
    .load(fstations)

stations_df.createOrReplaceTempView("stations")

In [23]:
query2_sql = """WITH YearlyPrecinctStats AS ( 
    SELECT 
        YEAR(c.DateRptd) AS year,
        s.DIVISION AS precinct,
        COUNT(*) AS total_cases,
        SUM(CASE WHEN c.Status != 'IC' THEN 1 ELSE 0 END) AS closed_cases,
        SUM(CASE WHEN c.Status != 'IC' THEN 1 ELSE 0 END) * 100.0 / COUNT(*) AS closed_case_rate
    FROM crimes c
    JOIN stations s
        ON CAST(c.AREA as INTEGER) = s.FID
    GROUP BY YEAR(c.DateRptd), s.DIVISION
    ),
    rankedPrecincts AS (
        SELECT
            year,
            precinct,
            closed_case_rate,
            ROW_NUMBER() OVER (PARTITION BY year ORDER BY closed_case_rate DESC) AS ranking
        FROM YearlyPrecinctStats
    )
    SELECT
        year,
        precinct,
        closed_case_rate,
        ranking
    FROM RankedPrecincts
    WHERE ranking <= 3
    ORDER BY year, ranking;"""

__Explanation:__ 

YearlyPrecinctStats
- We need to group our data by year and department
- Keep a count of all cases and a count of closed cases
- Create the rate as a percentage

RankedPrecincts
- From YearlyPrecinctStats keep: year, precinct and closed_case_rate
- We will need to create the ranking based on the closed cases rate of each department
- For each year assign a ranking (starting at 1) in descending order

Notes:

The symbol '#' is not supported in SQL so it was changed to ranking

In [24]:
spark.sql(query2_sql).show()
time_end = time.time()
sql_time = time_end - time_start
print(f"Time taken since creation of spark session to query completion: {sql_time:.2f} seconds")

25/01/17 13:21:12 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Date Rptd, AREA , Status
 Schema: DateRptd, AREA, Status
Expected: DateRptd but found: Date Rptd
CSV file: file:///home/takis/Documents/GitHub/Advanced-DB-2025/Data/Crime_Data_from_2010_to_2019_20241101.csv

+----+----------+-----------------+-------+
|year|  precinct| closed_case_rate|ranking|
+----+----------+-----------------+-------+
|2010| SOUTHEAST|32.94735585531813|      1|
|2010|DEVONSHIRE|31.96270619172842|      2|
|2010| SOUTHWEST|29.63203463203463|      3|
|2011|DEVONSHIRE|35.21216768916155|      1|
|2011| SOUTHEAST|32.51177963030083|      2|
|2011| SOUTHWEST|28.65220520201501|      3|
|2012|DEVONSHIRE|34.41481831052383|      1|
|2012| SOUTHEAST|32.94641810294290|      2|
|2012| SOUTHWEST|29.81513327601032|      3|
|2013|DEVONSHIRE|33.52812271731191|      1|
|2013| SOUTHEAST|32.08287360549222|      2|
|2013| SOUTHWEST|29.16422459266206|      3|
|2014|HOLLENBECK|31.80567315834039|      1|
|2014|  WILSHIRE|31.31198995605775|      2|
|2014|  FOOTHILL|31.16279069767442|      3|
|2015|HOLLENBECK|32.64134698172773|      1|
|2015|  WILSHIRE|30.27597402597403|      2|
|2015|  FOOTHILL|30.17946067838016|      3|
|2016|HOLLENBECK|31.88075572011773|      1|
|2016|  WILSHIRE|31.547987616099

                                                                                

#### Συμπεράσματα

In [25]:
print(f'''Time taken for DataFrame API: {df_time:.2f}
Time taken for SQL API: {sql_time:.2f}''')

Time taken for DataFrame API: 6.33
Time taken for SQL API: 6.38


Παρατηρούμε ότι οι χρόνοι εκτέλεσης δεν παρουσιάζουν μεγάλες αποκλίσεις.
Παραδόξως, οι χρόνοι εκτέλεσης για το SQL API είναι συνήθως σταθεροί για όσες φορές τρέξουμε τον κώδικα, ενώ στο DataFrame API παρατηρούμε μεγάλη απόκλιση μεταξύ των τιμών για πολλαπλές εκτελέσεις του κώδικα.

### β) Να γράψετε κώδικα Spark που μετατρέπει το κυρίως data set σε parquet file format και αποθηκεύει ένα μοναδικό .parquet αρχείο στο S3 bucket της ομάδας σας. Επιλέξτε μία από τις δύο υλοποιήσεις του υποερωτήματος α) (DataFrame ή SQL) και συγκρίνετε τους χρόνους εκτέλεσης της εφαρμογής σας όταν τα δεδομένα εισάγονται σαν .csv και σαν .parquet.

In [26]:
crimes_df = spark.read.csv(fcrime,header=True, inferSchema=True)
crimes_df.write.mode('overwrite').parquet(fcrime_parq)

stations_df = spark.read.csv(fstations,header=True, inferSchema=True)
stations_df.write.mode('overwrite').parquet(fstations_parq)

                                                                                

Θα χρησιμοποιήσουμε το DataFrame:

In [27]:
time_start = time.time()

In [28]:
crimes_df = spark.read.parquet(fcrime_parq)
stations_df = spark.read.parquet(fstations_parq)

In [29]:
crimes_df = crimes_df.withColumn('Date Rptd', to_timestamp('Date Rptd', 'MM/dd/yyyy hh:mm:ss a'))

In [30]:
# Create table yearly_precincts

yearly_precincts_df = crimes_df.join(
    stations_df,
    col('AREA ').cast("int") == stations_df.FID
).groupBy(
    year(col('Date Rptd')).alias("year"),
    stations_df.DIVISION.alias("precinct")
).agg(
    sum(when(col('Status') != "IC", 1).otherwise(0)).alias("closed_cases"),
    count("*").alias("total_cases"),
    (
        sum(when(col('Status') != "IC", 1).otherwise(0)) * 100.0 / count("*")
    ).alias("closed_case_rate")
)

In [31]:
# Create table ranked precincts

windowSpec = Window.partitionBy('year').orderBy(col('closed_case_rate').desc())

ranked_precincts_df = yearly_precincts_df.withColumn(
    'ranking', row_number().over(windowSpec)
).select(
    col('year'),
    col('precinct'),
    col('closed_case_rate'),
    col('ranking')
)

In [32]:
# Create table results

results_df = ranked_precincts_df.filter(
    col('ranking') <= 3
).select(
    'year',
    'precinct',
    'closed_case_rate',
    'ranking'
).orderBy('year','ranking')

In [33]:
# Final results:

results_df.show()
time_end = time.time()
df_time_parq = time_end - time_start
print(f'Time taken for DF with parquet to completion: {df_time_parq:.2f} seconds')

[Stage 34:>                                                         (0 + 2) / 2]

+----+----------+------------------+-------+
|year|  precinct|  closed_case_rate|ranking|
+----+----------+------------------+-------+
|2010| SOUTHEAST|32.947355855318136|      1|
|2010|DEVONSHIRE|31.962706191728426|      2|
|2010| SOUTHWEST| 29.63203463203463|      3|
|2011|DEVONSHIRE|35.212167689161554|      1|
|2011| SOUTHEAST|32.511779630300836|      2|
|2011| SOUTHWEST| 28.65220520201501|      3|
|2012|DEVONSHIRE|34.414818310523835|      1|
|2012| SOUTHEAST|  32.9464181029429|      2|
|2012| SOUTHWEST|29.815133276010318|      3|
|2013|DEVONSHIRE| 33.52812271731191|      1|
|2013| SOUTHEAST| 32.08287360549222|      2|
|2013| SOUTHWEST|29.164224592662055|      3|
|2014|HOLLENBECK| 31.80567315834039|      1|
|2014|  WILSHIRE|31.311989956057754|      2|
|2014|  FOOTHILL|31.162790697674417|      3|
|2015|HOLLENBECK|32.641346981727736|      1|
|2015|  WILSHIRE|30.275974025974026|      2|
|2015|  FOOTHILL|30.179460678380156|      3|
|2016|HOLLENBECK|31.880755720117726|      1|
|2016|  WI

                                                                                

Χρησιμοποιώντας τα parquet δεδομένα, παρατηρούμε σταθερά μια αρκετά μεγάλη βελτίωση στην ταχύτητα.

## Query 3

## Query 4

## Query 5