# 🚀 Spark 


In [34]:
from pyspark.sql import SparkSession

# YARN resources requested for this notebook's Spark application:
# 4 executors, each with 1 core and 1 GB of memory.
SPARK_SETTINGS = {
    "spark.master": "yarn",
    "spark.executor.instances": "4",
    "spark.executor.cores": "1",
    "spark.executor.memory": "1g",
}

session_builder = SparkSession.builder.appName('Project')
for setting, value in SPARK_SETTINGS.items():
    session_builder = session_builder.config(setting, value)
spark = session_builder.getOrCreate()

23/12/28 20:57:44 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


# Ζητούμενο 2

In [13]:
from pyspark.sql.functions import col, to_date, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType

# Both crime files live in the same HDFS project directory.
CRIME_DATA_DIR = 'hdfs://okeanos-master:54310/user/project'
# Timestamp pattern of the "Date Rptd" / "DATE OCC" columns in the raw CSVs.
CRIME_TS_PATTERN = 'MM/dd/yyyy hh:mm:ss a'

# Load each file with header-derived column names and inferred column types.
df1, df2 = (
    spark.read.csv(f'{CRIME_DATA_DIR}/{file_name}', header=True, inferSchema=True)
    for file_name in ('Crime_Data_from_2010_to_2019.csv', 'Crime_Data_from_2020_to_Present.csv')
)

# union() lines the two files up by column position, not by column name.
df = df1.union(df2)

# Keep only the calendar date of both timestamp columns.
for date_column in ("Date Rptd", "DATE OCC"):
    df = df.withColumn(date_column, to_date(col(date_column), CRIME_TS_PATTERN))

                                                                                

In [16]:
# Size of the combined 2010-present crime dataset.
print("Number of rows in the DataFrame:")
total_rows = df.count()
total_rows

Number of rows in the DataFrame:


                                                                                

2988445

# Ζητούμενο 3

## DataFrame API


In [57]:
from pyspark.sql.functions import year, month, count, row_number
from pyspark.sql import Window

# (Year, Month) of every crime's report date.
date_rptd = df.select(
    year("Date Rptd").alias("Year"),
    month("Date Rptd").alias("Month"),
)

# Number of reported crimes in each month of each year.
crime_total = (
    date_rptd
    .groupBy("Year", "Month")
    .agg(count("*").alias("crime_total"))
)

# Rank the months of each year from most to fewest crimes (1 = busiest).
# row_number() breaks ties between equal counts in no guaranteed order.
window_spec = Window.partitionBy("Year").orderBy(col("crime_total").desc())
df_sorted = crime_total.withColumn("row_number", row_number().over(window_spec))

# The three busiest months of every year.
df_top_three_DF = df_sorted.where(col("row_number") <= 3)

df_top_three_DF.show(truncate=False)



+----+-----+-----------+----------+
|Year|Month|crime_total|row_number|
+----+-----+-----------+----------+
|2010|3    |17595      |1         |
|2010|7    |17520      |2         |
|2010|5    |17338      |3         |
|2011|8    |17139      |1         |
|2011|5    |17050      |2         |
|2011|3    |16951      |3         |
|2012|8    |17696      |1         |
|2012|10   |17477      |2         |
|2012|5    |17391      |3         |
|2013|8    |17329      |1         |
|2013|7    |16714      |2         |
|2013|5    |16671      |3         |
|2014|10   |12789      |1         |
|2014|7    |12696      |2         |
|2014|9    |12498      |3         |
|2015|8    |18951      |1         |
|2015|10   |18916      |2         |
|2015|7    |18528      |3         |
|2016|8    |19779      |1         |
|2016|10   |19615      |2         |
+----+-----+-----------+----------+
only showing top 20 rows



                                                                                

In [21]:
import subprocess

# Write the per-year top-3 months as a single CSV part file. The relative path is
# presumably resolved under the HDFS home directory (the copy below assumes /user/user).
df_top_three_DF \
  .coalesce(1) \
  .write \
  .mode('overwrite') \
  .option('header', 'true') \
  .csv('results/q1Dt.csv')

hdfs_path = "hdfs://okeanos-master:54310/user/user/results/q1Dt.csv"
local_path = "/home/user/Project/results/"

# -f overwrites an existing local copy, so re-running the cell does not fail;
# check=True raises CalledProcessError instead of silently ignoring a failed copy.
subprocess.run(["hadoop", "fs", "-copyToLocal", "-f", hdfs_path, local_path], check=True)

                                                                                

CompletedProcess(args=['hadoop', 'fs', '-copyToLocal', 'hdfs://okeanos-master:54310/user/user/results/q1Dt.csv', '/home/user/Project/results/'], returncode=0)

## SQL API

In [14]:
# Make the combined crime DataFrame queryable from Spark SQL as `crime_data`.
df.createOrReplaceTempView("crime_data")

# Innermost query: crimes per (Year, Month) of the report date.
# Middle query:    rank the months within each year by descending count.
# Outer query:     keep the three highest-ranked months of each year.
sql_query = """
    SELECT Year, Month, crime_total, row_number
    FROM (
        SELECT Year, Month, crime_total,
               ROW_NUMBER() OVER (PARTITION BY Year ORDER BY crime_total DESC) AS row_number
        FROM (
            SELECT YEAR(`Date Rptd`) AS Year, MONTH(`Date Rptd`) AS Month, COUNT(*) AS crime_total
            FROM crime_data
            GROUP BY Year, Month
        ) tmp
    ) tmp2
    WHERE row_number <= 3
"""

df_top_three_sql = spark.sql(sql_query)
df_top_three_sql.show(truncate=False)



+----+-----+-----------+----------+
|Year|Month|crime_total|row_number|
+----+-----+-----------+----------+
|2010|3    |17595      |1         |
|2010|7    |17520      |2         |
|2010|5    |17338      |3         |
|2011|8    |17139      |1         |
|2011|5    |17050      |2         |
|2011|3    |16951      |3         |
|2012|8    |17696      |1         |
|2012|10   |17477      |2         |
|2012|5    |17391      |3         |
|2013|8    |17329      |1         |
|2013|7    |16714      |2         |
|2013|5    |16671      |3         |
|2014|10   |12789      |1         |
|2014|7    |12696      |2         |
|2014|9    |12498      |3         |
|2015|8    |18951      |1         |
|2015|10   |18916      |2         |
|2015|7    |18528      |3         |
|2016|8    |19779      |1         |
|2016|10   |19615      |2         |
+----+-----+-----------+----------+
only showing top 20 rows



                                                                                

In [22]:
import subprocess

# Write the SQL result as a single CSV part file (coalesce(1)), matching how the
# DataFrame-API result is saved, with a header row and overwrite semantics.
df_top_three_sql \
  .coalesce(1) \
  .write \
  .mode('overwrite') \
  .option('header', 'true') \
  .csv('results/q1SQL.csv')

hdfs_path = "hdfs://okeanos-master:54310/user/user/results/q1SQL.csv"
local_path = "/home/user/Project/results/"

# -f overwrites an existing local copy, so re-running the cell does not fail;
# check=True raises CalledProcessError instead of silently ignoring a failed copy.
subprocess.run(["hadoop", "fs", "-copyToLocal", "-f", hdfs_path, local_path], check=True)

                                                                                

CompletedProcess(args=['hadoop', 'fs', '-copyToLocal', 'hdfs://okeanos-master:54310/user/user/results/q1SQL.csv', '/home/user/Project/results/'], returncode=0)

In [23]:
# exceptAll() is a one-directional multiset difference: A.exceptAll(B) being empty
# only shows that every row of A is in B. Check both directions so an extra row
# in either result is detected.
# NOTE(review): row_number() breaks ties arbitrarily, so months with equal counts
# could legitimately be ranked differently by the two queries.
rows_only_in_df = df_top_three_DF.exceptAll(df_top_three_sql).count()
rows_only_in_sql = df_top_three_sql.exceptAll(df_top_three_DF).count()
is_same = rows_only_in_df == 0 and rows_only_in_sql == 0
if is_same:
    print("The DataFrames are identical.")
else:
    print("The DataFrames are different.")



The DataFrames are identical.


                                                                                

# Ζητούμενο 4

## DataFrame API

In [13]:
from pyspark.sql.functions import col, unix_timestamp, from_unixtime, date_format, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Read both crime files as plain strings (no schema inference) and keep only the
# occurrence time and the premise code.
df1 = spark.read.csv('hdfs://okeanos-master:54310/user/project/Crime_Data_from_2010_to_2019.csv', header=True, inferSchema=False).select("TIME OCC","Premis Cd")
df2 = spark.read.csv('hdfs://okeanos-master:54310/user/project/Crime_Data_from_2020_to_Present.csv', header=True, inferSchema=False).select("TIME OCC","Premis Cd")

df = df1.union(df2)

# TIME OCC is an "HHmm" string (e.g. "1350"). Parse it once and render it as a
# zero-padded "HH:mm:ss" string, so that later interval filters can compare
# strings. This replaces an equivalent but redundant
# unix_timestamp -> from_unixtime -> timestamp -> timestamp round trip.
# Unparseable values presumably become null under default (non-ANSI) settings.
df = df.withColumn(
    "TIME OCC",
    date_format(to_timestamp(col("TIME OCC"), "HHmm"), "HH:mm:ss"),
)

df = df.withColumn("Premis Cd", col("Premis Cd").cast("int"))

df.show()

[Stage 2:>                                                          (0 + 1) / 1]

+--------+---------+
|TIME OCC|Premis Cd|
+--------+---------+
|13:50:00|      501|
|00:45:00|      101|
|15:15:00|      103|
|01:50:00|      101|
|21:00:00|      103|
|16:50:00|      404|
|20:05:00|      101|
|21:00:00|      710|
|02:30:00|      108|
|21:00:00|      710|
|14:45:00|      101|
|20:00:00|      101|
|02:45:00|      102|
|17:45:00|      738|
|20:30:00|      102|
|17:35:00|      103|
|12:25:00|      502|
|11:00:00|      101|
|20:00:00|      502|
|18:20:00|      102|
+--------+---------+
only showing top 20 rows



                                                                                

In [14]:
from pyspark.sql.functions import col, when, sum

# NOTE(review): importing `sum` from pyspark shadows Python's built-in sum() for the
# rest of the kernel session; `from pyspark.sql import functions as F` with `F.sum`
# would avoid that.

# Occurrence times of crimes with premise code 101.
filtered_df = df.where(col("Premis Cd") == 101).select("TIME OCC")

# Part-of-day intervals, compared on "HH:mm:ss" strings.
time_occ = col("TIME OCC")
morning_interval = (time_occ >= "05:00:00") & (time_occ < "12:00:00")
afternoon_interval = (time_occ >= "12:00:00") & (time_occ < "17:00:00")
evening_interval = (time_occ >= "17:00:00") & (time_occ < "21:00:00")
# Night wraps around midnight: 21:00 onwards, or before 05:00.
night_interval = (time_occ >= "21:00:00") | (time_occ < "05:00:00")

# One global aggregation: for each interval, count the rows that fall inside it.
labelled_intervals = [
    ("Morning", morning_interval),
    ("Afternoon", afternoon_interval),
    ("Evening", evening_interval),
    ("Night", night_interval),
]
result_df = filtered_df.agg(
    *[sum(when(condition, 1).otherwise(0)).alias(label) for label, condition in labelled_intervals]
)

result_df.show(truncate=False)




+-------+---------+-------+------+
|Morning|Afternoon|Evening|Night |
+-------+---------+-------+------+
|123748 |148077   |186896 |237137|
+-------+---------+-------+------+



                                                                                

In [None]:
# NOTE(review): stale, hand-pasted result list. These counts do not match the
# DataFrame result or the RDD result computed in this notebook; consider deleting this cell.
[('Afternoon', 126476), ('Night', 205687), ('Morning', 107927), ('Evening', 165672)]

## RDD API

In [17]:
# Stop the current Spark application before a new one ("RDD query") is started
# for the RDD-API version of the query.
spark.stop()

In [18]:
from pyspark.sql import SparkSession

# Start a new session named "RDD query" and keep only its SparkContext, which the
# RDD code uses via `spark.textFile`.
# NOTE(review): this binds `spark` to a SparkContext, not a SparkSession. Later cells
# that call `spark.read` fail with AttributeError on a top-to-bottom run; consider
# naming this `sc` and keeping `spark` as the session.
spark = SparkSession \
    .builder \
    .appName("RDD query") \
    .getOrCreate() \
    .sparkContext

23/12/27 23:23:05 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [34]:
import datetime
import csv

from pyspark import SparkContext

# The RDD API needs a SparkContext. Fetch the active one explicitly so this cell
# works whether `spark` currently holds a SparkSession or a SparkContext.
sc = SparkContext.getOrCreate()


def _load_csv_rdd(path):
    """Read a headered CSV file into an RDD of field lists, with the header row removed.

    Each text line is parsed on its own with csv.reader, so quoted commas are handled,
    but a quoted field spanning several lines would be split (assumed not to occur).
    """
    rows = sc.textFile(path).map(lambda line: next(csv.reader([line])))
    header = rows.first()
    return rows.filter(lambda row: row != header)


# Load both crime files and merge them.
rdd1 = _load_csv_rdd("hdfs://okeanos-master:54310/user/project/Crime_Data_from_2010_to_2019.csv")
rdd2 = _load_csv_rdd("hdfs://okeanos-master:54310/user/project/Crime_Data_from_2020_to_Present.csv")
rdd = rdd1.union(rdd2)

# Keep fields 3 and 14 by position: presumably (TIME OCC, Premis Cd), the same two
# columns the DataFrame version of this query selects.
rdd = rdd.map(lambda fields: (fields[3], fields[14]))

# csv.reader only produces strings, so comparing against '101' is sufficient.
filtered_rdd = rdd.filter(lambda row: row[1] == '101')

def get_interval(time_occ):
    """Return the part-of-day label for an "HHMM" time string.

    Morning is [05:00, 12:00), Afternoon [12:00, 17:00), Evening [17:00, 21:00),
    and Night is everything else (21:00 onwards or before 05:00).
    Raises ValueError if `time_occ` does not match the "%H%M" format.
    """
    clock = datetime.datetime.strptime(time_occ, "%H%M").time()

    # Night wraps around midnight, so handle it first; every remaining time lies
    # in [05:00, 21:00) and only needs an upper bound per label.
    if clock < datetime.time(5, 0) or clock >= datetime.time(21, 0):
        return "Night"
    if clock < datetime.time(12, 0):
        return "Morning"
    if clock < datetime.time(17, 0):
        return "Afternoon"
    return "Evening"

    
# Tag every premise-101 record with its part-of-day label and a count of one,
# then add the counts up per label.
mapped_rdd = filtered_rdd.map(lambda record: (get_interval(record[0]), 1))
result_rdd = mapped_rdd.reduceByKey(lambda left, right: left + right)

interval_totals = result_rdd.collect()
print(interval_totals)



[('Afternoon', 148077), ('Night', 237137), ('Morning', 123748), ('Evening', 186896)]


                                                                                

# Ζητούμενο 5

In [21]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, regexp_replace
from pyspark.sql.functions import year

# This cell needs a SparkSession (`spark.read`). The RDD section rebinds `spark`
# to a SparkContext, so fetch the active session explicitly.
spark = SparkSession.builder.getOrCreate()

# Crimes reported in 2015 that have a recorded victim descent.
df1 = spark.read.csv('hdfs://okeanos-master:54310/user/project/Crime_Data_from_2010_to_2019.csv', header=True, inferSchema=True)
df2 = spark.read.csv('hdfs://okeanos-master:54310/user/project/Crime_Data_from_2020_to_Present.csv', header=True, inferSchema=True)

df = df1.union(df2)
df = df.select('Date Rptd', 'Vict Descent','LAT', 'LON').filter(col('Vict Descent').isNotNull())
df = df.withColumn("Date Rptd", to_date(col("Date Rptd"), 'MM/dd/yyyy hh:mm:ss a'))
df = df.withColumn("Year", year("Date Rptd")).drop("Date Rptd").filter(col("Year") == 2015).drop('Year')

# 2015 income per ZIP code; keep only the digits of the income string before casting to int.
income = spark.read.csv('hdfs://okeanos-master:54310/user/project/income/LA_income_2015.csv', header=True, inferSchema=True)
income = income.withColumn("Estimated Median Income", regexp_replace("Estimated Median Income", "[^0-9]", "").cast("int"))
geocoding = spark.read.csv('hdfs://okeanos-master:54310/user/project/revgecoding.csv', header=True, inferSchema=True)

geocoding = geocoding \
    .withColumnRenamed("LAT", "LAT_g") \
    .withColumnRenamed("LON", "LON_g")

# Attach a ZIPcode to each crime by exact (LAT, LON) match with the reverse-geocoding table.
df = df.join(geocoding, (df.LAT == geocoding.LAT_g) & (df.LON == geocoding.LON_g)).drop("LAT","LON","LAT_g","LON_g")

# Rank only ZIP codes that appear in the reverse-geocoding table.
distinct_geocoding = geocoding.select("ZIPcode").distinct()
filtered_income = income.join(distinct_geocoding, income["Zip Code"] == distinct_geocoding["ZIPcode"])

# Three highest- and three lowest-income ZIP codes.
top3 = filtered_income.orderBy(col("Estimated Median Income").desc()).limit(3)
tail3 = filtered_income.orderBy(col("Estimated Median Income")).limit(3)

# Join on the ZIPcode key by name so each join matches against its own selection.
# The tail3 join previously built its condition from top3.ZIPcode.
join_top3 = df.join(top3.select("ZIPcode"), on="ZIPcode")
count_top3 = join_top3.groupBy("Vict Descent").count().orderBy(col("count").desc())

join_tail3 = df.join(tail3.select("ZIPcode"), on="ZIPcode")
count_tail3 = join_tail3.groupBy("Vict Descent").count().orderBy(col("count").desc())

descent_mapping = {
    "A": "Other Asian",
    "B": "Black",
    "C": "Chinese",
    "D": "Cambodian",
    "F": "Filipino",
    "G": "Guamanian",
    "H": "Hispanic/Latin/Mexican",
    "I": "American Indian/Alaskan Native",
    "J": "Japanese",
    "K": "Korean",
    "L": "Laotian",
    "O": "Other",
    "P": "Pacific Islander",
    "S": "Samoan",
    "U": "Hawaiian",
    "V": "Vietnamese",
    "W": "White",
    "X": "Unknown",
    "Z": "Asian Indian"
}

columns_order = ["Victim Descent", "count"]


def _label_descent(counts):
    """Replace the 'Vict Descent' codes of a count table with readable 'Victim Descent' labels.

    Codes missing from descent_mapping are kept as-is. Returns columns ["Victim Descent", "count"].
    """
    return (
        counts
        .withColumn("Victim Descent", col("Vict Descent").cast("string"))
        .replace(descent_mapping, subset=["Victim Descent"])
        .select(columns_order)
    )


count_top3 = _label_descent(count_top3)
count_tail3 = _label_descent(count_tail3)

                                                                                

In [23]:
# Victim-descent counts for crimes in the three highest-income ZIP codes.
count_top3.show(truncate=False)



+----------------------+-----+
|Victim Descent        |count|
+----------------------+-----+
|White                 |312  |
|Other                 |102  |
|Hispanic/Latin/Mexican|53   |
|Unknown               |26   |
|Other Asian           |16   |
|Black                 |14   |
+----------------------+-----+



                                                                                

In [24]:
# Victim-descent counts for crimes in the three lowest-income ZIP codes.
count_tail3.show(truncate=False)



+------------------------------+-----+
|Victim Descent                |count|
+------------------------------+-----+
|Hispanic/Latin/Mexican        |1503 |
|Black                         |1078 |
|White                         |690  |
|Other                         |382  |
|Other Asian                   |100  |
|Unknown                       |63   |
|Korean                        |7    |
|American Indian/Alaskan Native|3    |
|Japanese                      |3    |
|Chinese                       |2    |
|Filipino                      |1    |
+------------------------------+-----+



                                                                                

In [136]:
# NOTE(review): the recorded output of this cell is stale. It shows raw
# 'Vict Descent' codes, while count_tail3 now has a 'Victim Descent' label column.
# This repeats the previous display cell and could be removed.
count_tail3.show()



+------------+-----+
|Vict Descent|count|
+------------+-----+
|           H| 1503|
|           B| 1078|
|           W|  690|
|           O|  382|
|           A|  100|
|           X|   63|
|           K|    7|
|           I|    3|
|           J|    3|
|           C|    2|
|           F|    1|
+------------+-----+



                                                                                

# Ζητούμενο 6

In [59]:
import geopy.distance

from pyspark.sql.functions import col, to_date, year, udf
from pyspark.sql.types import FloatType


def _geodesic_km(lat1, lon1, lat2, lon2):
    """Geodesic distance in kilometres between (lat1, lon1) and (lat2, lon2).

    Returns None when any coordinate is null, so the UDF yields a null distance
    instead of failing the whole task on one incomplete row.
    """
    if None in (lat1, lon1, lat2, lon2):
        return None
    return geopy.distance.geodesic((lat1, lon1), (lat2, lon2)).km


# Spark UDF wrapper; the plain Python function stays callable as _geodesic_km.
# NOTE(review): geopy must be importable on the executors (installed there or
# shipped, e.g. via SparkContext.addPyFile). Otherwise any action using this UDF
# fails with "ModuleNotFoundError: No module named 'geopy'".
get_distance = udf(_geodesic_km, FloatType())

# Read both crime files; union() combines them by column position.
df1 = spark.read.csv('hdfs://okeanos-master:54310/user/project/Crime_Data_from_2010_to_2019.csv', header=True, inferSchema=True)
df2 = spark.read.csv('hdfs://okeanos-master:54310/user/project/Crime_Data_from_2020_to_Present.csv', header=True, inferSchema=True)

df = (
    df1.union(df2)
    # Report year, derived from the report date.
    .withColumn("Date Rptd", to_date(col("Date Rptd"), 'MM/dd/yyyy hh:mm:ss a'))
    .withColumn("Year", year("Date Rptd"))
    .drop("Date Rptd")
    # "AREA " has a trailing space in the combined header (presumably from the first file).
    .select("Year", "AREA ", "Weapon Used Cd", "LAT", "LON")
    # Firearm crimes: weapon codes 100-199.
    .filter(col("Weapon Used Cd").cast("int").between(100, 199))
    # Drop "Null Island" rows whose coordinates were recorded as 0.
    .filter((col("LAT") != 0) & (col("LON") != 0))
)

# Station coordinates (X, Y) and precinct number.
police_stations = (
    spark.read.csv('hdfs://okeanos-master:54310/user/project/LAPD_Police_Stations.csv', header=True, inferSchema=True)
    .select("X", "Y", "PREC")
)

# Pair each crime with the station of its area; the result keeps Year, LAT, LON, X, Y.
join = (
    df.join(police_stations, df["AREA "] == police_stations["PREC"])
    .drop("AREA ", "Weapon Used Cd", "PREC")
)

                                                                                

In [61]:
# Distance in km from each firearm crime to the police station of its area;
# the station's Y and X columns are passed as latitude and longitude.
distance_df = join.withColumn("Distance", get_distance(col("LAT"), col("LON"), col("Y"), col("X")))

In [62]:
# NOTE(review): in the recorded run this action failed on the executors with
# "ModuleNotFoundError: No module named 'geopy'"; geopy must be available to the
# Python workers for the distance UDF to run.
distance_df.show()

23/12/28 21:20:59 WARN TaskSetManager: Lost task 0.0 in stage 79.0 (TID 176) (okeanos-worker executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/user/opt/data/hadoop/nm-local-dir/usercache/user/appcache/application_1703788084972_0005/container_1703788084972_0005_01_000002/pyspark.zip/pyspark/worker.py", line 1231, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
  File "/home/user/opt/data/hadoop/nm-local-dir/usercache/user/appcache/application_1703788084972_0005/container_1703788084972_0005_01_000002/pyspark.zip/pyspark/worker.py", line 1067, in read_udfs
    udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
  File "/home/user/opt/data/hadoop/nm-local-dir/usercache/user/appcache/application_1703788084972_0005/container_1703788084972_0005_01_000002/pyspark.zip/pyspark/worker.py", line 529, in read_single_udf
    f, return_type = read_command(pickleSer,

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/home/user/opt/data/hadoop/nm-local-dir/usercache/user/appcache/application_1703788084972_0005/container_1703788084972_0005_01_000004/pyspark.zip/pyspark/worker.py", line 1231, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
  File "/home/user/opt/data/hadoop/nm-local-dir/usercache/user/appcache/application_1703788084972_0005/container_1703788084972_0005_01_000004/pyspark.zip/pyspark/worker.py", line 1067, in read_udfs
    udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
  File "/home/user/opt/data/hadoop/nm-local-dir/usercache/user/appcache/application_1703788084972_0005/container_1703788084972_0005_01_000004/pyspark.zip/pyspark/worker.py", line 529, in read_single_udf
    f, return_type = read_command(pickleSer, infile)
  File "/home/user/opt/data/hadoop/nm-local-dir/usercache/user/appcache/application_1703788084972_0005/container_1703788084972_0005_01_000004/pyspark.zip/pyspark/worker.py", line 90, in read_command
    command = serializer._read_with_length(file)
  File "/home/user/opt/data/hadoop/nm-local-dir/usercache/user/appcache/application_1703788084972_0005/container_1703788084972_0005_01_000004/pyspark.zip/pyspark/serializers.py", line 174, in _read_with_length
    return self.loads(obj)
  File "/home/user/opt/data/hadoop/nm-local-dir/usercache/user/appcache/application_1703788084972_0005/container_1703788084972_0005_01_000004/pyspark.zip/pyspark/serializers.py", line 472, in loads
    return cloudpickle.loads(obj, encoding=encoding)
  File "/home/user/opt/data/hadoop/nm-local-dir/usercache/user/appcache/application_1703788084972_0005/container_1703788084972_0005_01_000004/pyspark.zip/pyspark/cloudpickle/cloudpickle.py", line 649, in subimport
    __import__(name)
ModuleNotFoundError: No module named 'geopy'


# Ζητούμενο 7