# Advanced Database Systems - NTUA - 2023


## Project Scope

## 

### Contributors

Dimitris Vasios 03119404

Thodoris - Angelos Mexis 03118408

### Script to enable the cluster
**Cluster Specification**

Namenode

Datanodes

In [1]:
# Pyspark Libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, IntegerType
from pyspark.sql.functions import col, count, when, to_timestamp, udf, rank
from pyspark.sql.functions import year, month, count, dense_rank
from pyspark.sql.window import Window
from pyspark.conf import SparkConf

# Other Libraries
import os
import subprocess as sp
import pandas as pd
import geopy.distance

In [None]:
# Download necessary data for the project
sp.call(['bash', '../scripts/import_data.sh'])

In [None]:
# Start Cluster
sp.call(['bash', '../scripts/cluster_initiate.sh'])

In [None]:
# Combine Primary Data into one csv
crime_data_2010_2019 = pd.read_csv('../data/primary/crime_data_2010_2019.csv')
crime_data_2020_present = pd.read_csv('../data/primary/crime_data_2020_present.csv')
crime_data = pd.concat([crime_data_2010_2019, crime_data_2020_present], ignore_index=True)
crime_data.to_csv('../data/primary/crime_data.csv', index=False)

In [None]:
# Load Data to HDFS
sp.call(['bash', '../scripts/load_data_to_hdfs.sh'])

## Create a Spark Session

In [10]:
# Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ['SPARK_HOME'] = "/home/user/opt/spark"
os.environ['HADOOP_HOME'] = "/home/user/opt/hadoop"
os.environ['PYSPARK PYTHON'] = "~/envs/venv/bin/python"

# Spark Config
conf = SparkConf() \
    .setAppName("AdvDB Project") \
    .set("spark.master", "yarn") \
    .set("spark.eventLog.enabled", "true") \
    .set("spark.eventLog.dir", "hdfs://okeanos-master:54310/spark.eventLog") \
    .set("spark.history.fs.logDirectory", "hdfs://okeanos-master:54310/spark.eventLog") \
    .set("spark.driver.memory", "1g") \
    .set("spark.executor.memory", "1g") \
    .set("spark.executor.cores", "1") \
    .set("spark.yarn.dist.archives", "pyspark_venv.tar.gz#venv") 

# Start a Spark Session
sc = SparkSession \
    .builder \
    .config(conf=conf) \
    .getOrCreate() 

In [13]:
# Crime Data Schema
crime_data_schema = StructType([
    StructField("DR_NO", StringType()),
    StructField("Date Rptd", StringType()),
    StructField("DATE OCC", StringType()),
    StructField("TIME OCC", StringType()),
    StructField("AREA", StringType()),
    StructField("AREA NAME", StringType()),
    StructField("Rpt Dist No", StringType()),
    StructField("Part 1-2", StringType()),
    StructField("Crm Cd", StringType()),
    StructField("Crm Cd Desc", StringType()),
    StructField("Mocodes", StringType()),
    StructField("Vict Age", StringType()),
    StructField("Vict Sex", StringType()),
    StructField("Vict Descent", StringType()),
    StructField("Premis Cd", StringType()),
    StructField("Premis Desc", StringType()),
    StructField("Weapon Used Cd", StringType()),
    StructField("Weapon Desc", StringType()),
    StructField("Status", StringType()),
    StructField("Status Desc", StringType()),
    StructField("Crm Cd 1", StringType()),
    StructField("Crm Cd 2", StringType()),
    StructField("Crm Cd 3", StringType()),
    StructField("Crm Cd 4", StringType()),
    StructField("LOCATION", StringType()),
    StructField("Cross Street", StringType()),
    StructField("LAT", StringType()),
    StructField("LON", StringType()),
])

crime_data_df = sc.read.format('csv') \
    .options(header='true') \
    .schema(crime_data_schema) \
    .load("hdfs://okeanos-master:54310/user/data/primary/crime_data.csv")

# Change Columns types
crime_data_df = crime_data_df.withColumn('Date Rptd', to_timestamp('Date Rptd', 'MM/dd/yyyy hh:mm:ss a')) \
                             .withColumn('DATE OCC', to_timestamp('DATE OCC', 'MM/dd/yyyy hh:mm:ss a')) \
                             .withColumn('TIME OCC', col('TIME OCC').cast('int')) \
                             .withColumn('Vict Age', col('Vict Age').cast('int')) \
                             .withColumn('LAT',col('LAT').cast('double')) \
                             .withColumn('LON', col('LON').cast('double'))

## Print some info

In [14]:
# Print Total Crime Data Rows
rows = crime_data_df.count()
print(f"Crime Data Total Rows : {rows}")

# Print Crime Data Types
crime_data_types = crime_data_df.dtypes
print(f"Date Rptd: {crime_data_types[1][1]}")
print(f"DATE OCC: {crime_data_types[2][1]}")
print(f"Vict Age: {crime_data_types[11][1]}")
print(f"LAT: {crime_data_types[26][1]}")
print(f"LON: {crime_data_types[27][1]}")



Crime Data Total Rows : 2993433
Date Rptd: timestamp
DATE OCC: timestamp
Vict Age: int
LAT: double
LON: double


                                                                                

In [15]:
# ---- QUERY 1 | DATAFRAME API ----

# Keep specific columns from the dataframe
crime_data_date = crime_data_df.select('Date Rptd')

# Extract year and month from the 'date_occ' column
crime_data_year_month = crime_data_date.withColumn('Year', year('Date Rptd')) \
                                       .withColumn('Month', month('Date Rptd'))

# Calculate counts for each year and month
counts = crime_data_year_month.groupBy('Year', 'Month').agg(count('*').alias('crimetotal'))

# Order by Year and Total Crimes Crimes
partitioned = Window.partitionBy('Year').orderBy(counts['crimetotal'].desc())

# Add a rank column to the DataFrame
ranked_df = counts.withColumn('rnk', dense_rank().over(partitioned))

# Filter the top 3 counts for each year
top3_df = ranked_df.filter('rnk <= 3')

# Rename the rank column
top3 = top3_df.withColumnRenamed('rnk', '#')

# Show the results
top3.show(50)

                                                                                

+----+-----+----------+---+
|Year|Month|crimetotal|  #|
+----+-----+----------+---+
|2010|    3|     17595|  1|
|2010|    7|     17520|  2|
|2010|    5|     17338|  3|
|2011|    8|     17139|  1|
|2011|    5|     17050|  2|
|2011|    3|     16951|  3|
|2012|    8|     17696|  1|
|2012|   10|     17477|  2|
|2012|    5|     17391|  3|
|2013|    8|     17329|  1|
|2013|    7|     16714|  2|
|2013|    5|     16671|  3|
|2014|    7|     14059|  1|
|2014|   10|     14031|  2|
|2014|    9|     13799|  3|
|2015|    8|     18951|  1|
|2015|   10|     18916|  2|
|2015|    7|     18528|  3|
|2016|    8|     19779|  1|
|2016|   10|     19615|  2|
|2016|    7|     19262|  3|
|2017|   10|     20400|  1|
|2017|    8|     20086|  2|
|2017|    7|     19997|  3|
|2018|    5|     20248|  1|
|2018|    7|     19972|  2|
|2018|   10|     19814|  3|
|2019|    7|     19338|  1|
|2019|    8|     19074|  2|
|2019|    3|     18932|  3|
|2020|    1|     18488|  1|
|2020|    2|     17436|  2|
|2020|    7|     172

In [None]:
# ---- Query 1 | SQL API ----
query_1_sql = """ with MonthlyCrimeCounts AS ( 
  SELECT  
    EXTRACT(YEAR FROM `Date Rptd`) AS Year, 
    EXTRACT(MONTH FROM `Date Rptd`) AS Month, 
    COUNT(*) AS crime_count,  
    ROW_NUMBER() OVER (PARTITION BY EXTRACT(YEAR FROM `Date Rptd`) ORDER BY COUNT(*) DESC) AS rn 
  FROM 
    crime_data
  GROUP BY 
    Year, 
    Month 
) 

SELECT 
  Year, 
  Month, 
  crime_count, 
  rn AS month_rank  
FROM 
  MonthlyCrimeCounts 
WHERE 
  rn <= 3 
ORDER BY 
  Year ASC, 
  crime_count DESC; """

crime_data_df.createOrReplaceTempView("crime_data")
crime_data_query_1 = sc.sql(query_1_sql)
crime_data_query_1.show()


In [16]:
# ---- Query 2 | Dataframe API ----

query_2 = crime_data_df \
    .filter(crime_data_df['Premis Desc'] == 'STREET') \
    .withColumn( 
        'PartOfDay', 
        when((crime_data_df['TIME OCC'] >= 500) & (crime_data_df['TIME OCC'] < 1200), 'Morning') \
        .when((crime_data_df['TIME OCC'] >= 1200) & (crime_data_df['TIME OCC'] < 1700), 'Noon') \
        .when((crime_data_df['TIME OCC'] >= 1700) & (crime_data_df['TIME OCC'] < 2100), 'Afternoon') \
        .when((crime_data_df['TIME OCC'] >= 2100) & (crime_data_df['TIME OCC'] < 2400) |
              (crime_data_df['TIME OCC'] >= 0) & (crime_data_df['TIME OCC'] < 500), 'Night') \
        .otherwise('NoPartOfDay')) \
    .select(col('TIME OCC').alias('time'), col('PartOfDay')) \
    .groupBy(col('PartOfDay')).agg(count('*').alias('NumberOfCrimes')) \
    .orderBy(col('NumberOfCrimes').desc()) 

# Print Output
query_2.show()




+---------+--------------+
|PartOfDay|NumberOfCrimes|
+---------+--------------+
|    Night|        237605|
|Afternoon|        187306|
|     Noon|        148180|
|  Morning|        123846|
+---------+--------------+



                                                                                

In [None]:
# ----- Query 2 | SQL API

crime_data_df.createOrReplaceTempView("crime_data")

query_2_sql = """

WITH OnlyInStreet AS (
  SELECT 
    `TIME OCC` as time,
    CASE 
      WHEN time >= 500 AND time < 1200 THEN 'Morning'
      WHEN time >= 1200 AND time < 1700 THEN 'Noon' 
      WHEN time >= 1700 AND time < 2100 THEN 'Afternoon' 
      WHEN time >= 2100 AND time < 2400 OR time >=0 AND time < 500 THEN 'Night' 
    END AS PartOfDay
  FROM
    crime_data
  WHERE
    `Premis Desc`='STREET'
)

SELECT 
  PartOfDay,
  COUNT(PartOFDay) as NumberOfCrimes
FROM
  OnlyInStreet
GROUP BY
  PartOfDay
ORDER BY
  NumberOfCrimes DESC;
"""

crime_data_query = sc.sql(query_2_sql).show()

In [None]:
# RDD Dataframe
rdd_from_df = crime_data_df.rdd

In [None]:
# ---- Query 2 | RDD API ----

def categorize(x): 
    if x[0] >= 500 and x[0] < 1200:
        return ('Morning', 1)
    elif x[0] >= 1200 and x[0] < 1700:
        return ('Noon', 1)
    elif x[0] >= 1700 and x[0] < 2100:
        return ('Afternoon', 1)
    elif (x[0] >= 2100 and x[0] < 2400) or (x[0] >= 0 and x[0] < 500):
        return ('Night', 1)
        
# rdd = sc.textFile("hdfs://okeanos-master:54310/user/data/primary/crime_data.csv") 
# rdd = sc.textFile("../data/primary/crime_data.csv") 

data_rdd = rdd_from_df \
                .map(lambda x: (int(x[3]), str(x[15]))) \
                .filter(lambda x: x[1] == "STREET") \
                .map(categorize) \
                .reduceByKey(lambda x, y: x+y) 

result = data_rdd.take(4)

In [None]:
columns = [ 'PartOfDay', 'NumberOfCrimes']
rows = []
for i in result: 
    rows.append([i[0], i[1]])
df = pd.DataFrame(rows, columns=columns)
print(df.sort_values(by='NumberOfCrimes', ascending=False, ignore_index=True))

In [17]:
# Load Secondary Data --- Median Income & Revgecoding

# --------------------------------------------------------------------------------------------------------- |

# Median Income 2015 Schema
median_income_schema = StructType([
    StructField("Zip Code", StringType()),
    StructField("Community", StringType()),
    StructField("Estimated Median Income", StringType())
])

# Load data from dfs
median_income_df = sc \
    .read.format('csv') \
    .options(header='true') \
    .schema(median_income_schema) \
    .load("hdfs://okeanos-master:54310/user/data/secondary/median_household_incomes/LA_income_2015.csv")

# Function to alter 'estimated median income' to integer | $XX,YYY -> XXYYY
alter_median_income_col = udf(lambda x: int(x.replace('$', '').replace(',', '')))

# Change Column Types & Drop Duplicates
median_income_df = median_income_df \
    .withColumn("Zip Code", col("Zip Code").cast('int')) \
    .withColumn("Estimated Median Income", alter_median_income_col(col("Estimated Median Income"))) \
    .drop_duplicates(["Zip Code"]) \
    .select(col("Zip Code").alias("zipcode"), col("Estimated Median Income").alias('median_income'))

# ---------------------------------------------------------------------------------------------------------- |

# Revgecoding Schema
revgecoding_schema = StructType([
    StructField("LAT", StringType()),
    StructField("LON", StringType()),
    StructField("ZIPcode", StringType()),
])

# Load Data from DFS
revgecoding_df =  sc \
    .read.format('csv') \
    .options(header='true') \
    .schema(revgecoding_schema) \
    .load("hdfs://okeanos-master:54310/user/data/secondary/revgecoding.csv")

# Change Column Types
revgecoding_df = revgecoding_df \
    .withColumn("LAT", col("LAT").cast("double")) \
    .withColumn("LON", col("LON").cast("double")) \
    .withColumn("ZIPcode", col("ZIPcode").cast("int")) \
    .select(col("LAT"), col("LON"), col("ZIPcode").alias("zipcode"))

In [18]:
# Join Secondary Dataframes to the main
income_from_coordinates_df = median_income_df.join(revgecoding_df, on="zipcode")

joined_crime_data_df = crime_data_df \
    .join(income_from_coordinates_df, (crime_data_df["LAT"] == income_from_coordinates_df["LAT"]) & (crime_data_df["LON"] == income_from_coordinates_df["LON"])) \
    .select(col('Vict Descent').alias('descent'), col('median_income'))

In [19]:
# Dictionary for Descent
descent_dict = {
    'A': 'Other Asian',
    'B': 'Black',
    'C': 'Chinese',
    'D': 'Cambodian',
    'F': 'Filipino',
    'G': 'Guamanian',
    'H': 'Hispanic/Latin/Mexican',
    'I': 'American Indian/Alaskan Native',
    'J': 'Japanese',
    'K': 'Korean',
    'L': 'Laotian',
    'O': 'Other',
    'P': 'Pacific Islander',
    'S': 'Samoan',
    'U': 'Hawaiian',
    'V': 'Vietnamese',
    'W': 'White',
    'X': 'Unknown',
    'Z': 'Asian Indian'
}

get_descent = udf(lambda x: descent_dict[x])

In [20]:
# ----- Query 3 | Dataframe API
query_3_filtered = joined_crime_data_df \
    .filter((col("descent").isNotNull()) & (col("descent") != '')) 

# Ranked Desc
window_spec_desc = Window.orderBy(col("median_income").desc())
ranked_desc_df = query_3_filtered \
    .withColumn("Rank", dense_rank().over(window_spec_desc)) 

# Ranked Asc
window_spec_asc = Window.orderBy(col("median_income").asc())
ranked_asc_df = query_3_filtered \
    .withColumn("Rank", dense_rank().over(window_spec_asc))

# Query
def query_3 (ranked, num): 
    return ranked \
    .filter(col("Rank") == num) \
    .groupBy("descent").agg(count('*').alias("#")) \
    .withColumn("descent", get_descent(col("descent"))) \
    .orderBy(col("#").desc())

# Print first 3 
for i in range(1,4):
    q = query_3(ranked_desc_df, i)
    q.show()

# Print last 3 
for i in range(1,4):
    q = query_3(ranked_asc_df, i)
    q.show()

24/01/17 15:32:02 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/17 15:32:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/17 15:32:07 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/17 15:32:07 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/17 15:32:08 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/17 15:32:08 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/17 1

+--------------------+---+
|             descent|  #|
+--------------------+---+
|               White| 73|
|               Other|  9|
|Hispanic/Latin/Me...|  8|
|             Unknown|  3|
|               Black|  2|
|         Other Asian|  1|
+--------------------+---+



24/01/17 15:32:42 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/17 15:32:42 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/17 15:32:42 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/17 15:32:42 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/17 15:32:43 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/17 15:32:43 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/17 1

+--------------------+----+
|             descent|   #|
+--------------------+----+
|               White|1927|
|               Other| 698|
|             Unknown| 534|
|Hispanic/Latin/Me...| 369|
|               Black| 265|
|         Other Asian| 141|
|             Chinese|  16|
|              Korean|   7|
|            Japanese|   5|
|        Asian Indian|   4|
|           Guamanian|   1|
|          Vietnamese|   1|
|            Hawaiian|   1|
+--------------------+----+



24/01/17 15:33:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/17 15:33:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/17 15:33:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/17 15:33:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/17 15:33:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/17 15:33:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/17 1

+--------------------+----+
|             descent|   #|
+--------------------+----+
|               White|5556|
|               Other|1714|
|Hispanic/Latin/Me...|1155|
|               Black| 734|
|             Unknown| 622|
|         Other Asian| 237|
|            Filipino|  19|
|American Indian/A...|   9|
|        Asian Indian|   5|
|              Korean|   4|
|            Japanese|   4|
|             Chinese|   3|
|    Pacific Islander|   2|
+--------------------+----+



24/01/17 15:33:47 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/17 15:33:47 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/17 15:33:48 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/17 15:33:48 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/17 15:33:48 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/17 15:33:48 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/17 1

+--------------------+----+
|             descent|   #|
+--------------------+----+
|               White|5301|
|Hispanic/Latin/Me...|1400|
|               Other|1286|
|             Unknown|1131|
|               Black| 471|
|         Other Asian| 242|
|            Filipino|  19|
|             Chinese|   8|
|American Indian/A...|   7|
|        Asian Indian|   7|
|              Korean|   5|
|            Japanese|   5|
|          Vietnamese|   3|
|    Pacific Islander|   2|
+--------------------+----+



24/01/17 15:34:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/17 15:34:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/17 15:34:17 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/17 15:34:17 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/17 15:34:17 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/17 15:34:17 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/17 1

+--------------------+----+
|             descent|   #|
+--------------------+----+
|               White|5557|
|Hispanic/Latin/Me...| 914|
|               Other| 834|
|             Unknown| 783|
|               Black| 723|
|         Other Asian| 291|
|              Korean|  10|
|             Chinese|   9|
|            Filipino|   4|
|            Japanese|   4|
|American Indian/A...|   2|
|            Hawaiian|   2|
|          Vietnamese|   1|
+--------------------+----+



24/01/17 15:34:46 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/17 15:34:46 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/17 15:34:47 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/17 15:34:47 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/17 15:34:47 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/17 15:34:47 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/01/17 1

+--------------------+---+
|             descent|  #|
+--------------------+---+
|               White|677|
|               Other|310|
|Hispanic/Latin/Me...|181|
|         Other Asian|116|
|               Black| 54|
|             Unknown| 50|
|             Chinese|  5|
|              Korean|  5|
|            Filipino|  4|
|            Japanese|  3|
|American Indian/A...|  2|
|          Vietnamese|  1|
+--------------------+---+



                                                                                

In [None]:
# ----- Query 3 | SQL API

joined_crime_data_df.createOrReplaceTempView("joined_crime_data")

def query_3_sql(num, type):
  return f"""

  with ranked as (
    select 
      descent,
      median_income,
      dense_rank() over (order by `median_income` {type}) as rank
    from joined_crime_data
    where 
      descent is not null and trim(descent) != ''
  ) 

  select 
    descent,
    count(`median_income`) as rnk
  from 
    ranked
  where
    rank={num}
  group by descent
  order by rnk desc;
"""

# Print first three
for i in range(1,4):
  sc.sql(query_3_sql(i, 'desc')).show()

# Print last three
for i in range(1,4):
  sc.sql(query_3_sql(i, 'desc')).show()

In [21]:
# Pre-process Secondary Police Stations Data

#  Median Income 2015 Schema
police_stations_schema = StructType([
    StructField("X", StringType()),
    StructField("Y", StringType()),
    StructField("FID", StringType()),
    StructField("DIVISION", StringType()),
    StructField("LOCATION", StringType()),
    StructField("PREC", StringType())
])

# Load data from dfs
police_stations_df = sc \
    .read.format('csv') \
    .options(header='true') \
    .schema(police_stations_schema) \
    .load("hdfs://okeanos-master:54310/user/data/secondary/LAPD_Police_Stations.csv")

# Change Column Types and Select with aliased names
police_stations_df = police_stations_df \
    .withColumn("X", col("X").cast("double")) \
    .withColumn("Y", col("Y").cast("double")) \
    .select(col("X").alias("police_station_lat"), col("Y").alias("police_station_lon"), col("PREC").alias("police_station")) 

# Join Data
joined_police_station_df = crime_data_df \
    .withColumn("AREA", col("AREA").cast('int')) \
    .select(col("Weapon Used Cd").alias("weapon"), col("LAT").alias("crime_lat"), col("LON").alias("crime_lon"), col("AREA").alias("police_station")) \
    .join(police_stations_df, on="police_station")

In [22]:
# Register Distance Function
def get_distance_geopy (lat1, lon1, lat2, lon2):
    return geopy.distance.geodesic((lat1, lon2), (lat2, lon2))

get_distance = udf(get_distance_geopy)

In [23]:
# ---- Query 4 | Dataframe API ----

filtered_df =  joined_police_station_df \
    .filter(joined_police_station_df['weapon'] != 'NULL') \
    .withColumn("Distance", get_distance(col("crime_lat"), col("crime_lon"), col("police_station_lat"), col("police_station_lon")))

filtered_df.limit(100).show()

24/01/17 15:35:37 WARN TaskSetManager: Lost task 0.0 in stage 61.0 (TID 97) (okeanos-master executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/user/opt/data/hadoop/nm-local-dir/usercache/user/appcache/application_1705486182927_0023/container_1705486182927_0023_01_000003/pyspark.zip/pyspark/worker.py", line 1231, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
  File "/home/user/opt/data/hadoop/nm-local-dir/usercache/user/appcache/application_1705486182927_0023/container_1705486182927_0023_01_000003/pyspark.zip/pyspark/worker.py", line 1067, in read_udfs
    udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
  File "/home/user/opt/data/hadoop/nm-local-dir/usercache/user/appcache/application_1705486182927_0023/container_1705486182927_0023_01_000003/pyspark.zip/pyspark/worker.py", line 529, in read_single_udf
    f, return_type = read_command(pickleSer, 

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/home/user/opt/data/hadoop/nm-local-dir/usercache/user/appcache/application_1705486182927_0023/container_1705486182927_0023_01_000003/pyspark.zip/pyspark/worker.py", line 1231, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
  File "/home/user/opt/data/hadoop/nm-local-dir/usercache/user/appcache/application_1705486182927_0023/container_1705486182927_0023_01_000003/pyspark.zip/pyspark/worker.py", line 1067, in read_udfs
    udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
  File "/home/user/opt/data/hadoop/nm-local-dir/usercache/user/appcache/application_1705486182927_0023/container_1705486182927_0023_01_000003/pyspark.zip/pyspark/worker.py", line 529, in read_single_udf
    f, return_type = read_command(pickleSer, infile)
  File "/home/user/opt/data/hadoop/nm-local-dir/usercache/user/appcache/application_1705486182927_0023/container_1705486182927_0023_01_000003/pyspark.zip/pyspark/worker.py", line 90, in read_command
    command = serializer._read_with_length(file)
  File "/home/user/opt/data/hadoop/nm-local-dir/usercache/user/appcache/application_1705486182927_0023/container_1705486182927_0023_01_000003/pyspark.zip/pyspark/serializers.py", line 174, in _read_with_length
    return self.loads(obj)
  File "/home/user/opt/data/hadoop/nm-local-dir/usercache/user/appcache/application_1705486182927_0023/container_1705486182927_0023_01_000003/pyspark.zip/pyspark/serializers.py", line 472, in loads
    return cloudpickle.loads(obj, encoding=encoding)
  File "/home/user/opt/data/hadoop/nm-local-dir/usercache/user/appcache/application_1705486182927_0023/container_1705486182927_0023_01_000003/pyspark.zip/pyspark/cloudpickle/cloudpickle.py", line 649, in subimport
    __import__(name)
ModuleNotFoundError: No module named 'geopy'


In [None]:
# ----- Query 4 | SQL API


# Σύνδεση μεταξύ των τμημάτων και των εγκλημάτων
joined_df = crime_df.join(police_station_df, col("AREA") == col("PRECINCT"), "inner")

# 3a : Year
query_3a = """
    SELECT
        EXTRACT(YEAR FROM to_date(`DATE OCC`, 'MM/dd/yyyy hh:mm:ss a')) AS Year,
        COUNT(*) AS CrimeCount,
        AVG(distance_from_police) AS AvgDistance
    FROM (
        SELECT
            `DATE OCC`,
            CASE WHEN `Crm Cd 1` LIKE '1%' OR `Crm Cd 2` LIKE '1%' OR `Crm Cd 3` LIKE '1%' OR `Crm Cd 4` LIKE '1%' THEN 1 ELSE 0 END AS is_firearm_used,
            ST_DISTANCE(ST_POINT(`LON`, `LAT`), ST_POINT(`police_lon`, `police_lat`)) AS distance_from_police
        FROM joined_data
        WHERE `LON` IS NOT NULL AND `LAT` IS NOT NULL
    ) AS subquery
    GROUP BY Year
    ORDER BY Year
"""

result_a = spark.sql(query_3a)

# 3b : Police Station
query_3b = """
    SELECT
        `PRECINCT`,
        COUNT(*) AS CrimeCount,
        AVG(distance_from_police) AS AvgDistance
    FROM (
        SELECT
            `PRECINCT`,
            CASE WHEN `Crm Cd 1` LIKE '1%' OR `Crm Cd 2` LIKE '1%' OR `Crm Cd 3` LIKE '1%' OR `Crm Cd 4` LIKE '1%' THEN 1 ELSE 0 END AS is_firearm_used,
            ST_DISTANCE(ST_POINT(`LON`, `LAT`), ST_POINT(`police_lon`, `police_lat`)) AS distance_from_police
        FROM joined_data
        WHERE `LON` IS NOT NULL AND `LAT` IS NOT NULL
    ) AS subquery
    GROUP BY `PRECINCT`
    ORDER BY CrimeCount DESC
"""

result_b = spark.sql(query_3b)

# Εμφάνιση αποτελεσμάτων
print("per Year")
result_a.show()

print("per Police Station")
result_b.show()
