In [13]:
from pyspark.sql import SparkSession

spark_session = SparkSession\
        .builder\
        .master("spark://192.168.2.250:7077") \
        .appName("Part_B_Simon_Pislar_A3")\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.dynamicAllocation.shuffleTracking.enabled",True)\
        .config("spark.shuffle.service.enabled", False)\
        .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
        .config("spark.executor.cores",2)\
        .config("spark.driver.port",9999)\
        .config("spark.blockManager.port",10005)\
        .getOrCreate()

# RDD  API
spark_context = spark_session.sparkContext

spark_context.setLogLevel("ERROR")

In [14]:
# Read the csv-file into a DataFrame, solves B.1
df = spark_session.read.csv("hdfs://192.168.2.250:9000/parking-citations.csv", header=True, inferSchema=True)
df.show()

                                                                                

+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+-----------------+----------------------+
|Ticket number|         Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date| VIN|Make|Body Style|Color|            Location|Route|Agency|Violation code|Violation Description|Fine amount| Latitude|Longitude|Agency Description|Color Description|Body Style Description|
+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+-----------------+----------------------+
|   1103341116|2015-12-21 00:00:00|    1251.0|    NULL|       NULL|            CA|         200304.0|NULL|HOND|        PA|   GY|

In [15]:
# Prints the schema of the DataFrame, solves B.2
df.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: timestamp (nullable = true)
 |-- Issue time: double (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: double (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: double (nullable = true)
 |-- VIN: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: double (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: double (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Agency Description: string (nullable = true)
 |-- Color Description: string (nullable = true)
 |-- Body Style Description: string (nullable = true)


In [16]:
# Count the number of rows in the DataFrame, solves B.3
row_count = df.count()
print(f"Number of rows: {row_count}")



Number of rows: 13077724


                                                                                

In [17]:
# Count the number of partitions in the DataFrame, solves B.4
partition_count = df.rdd.getNumPartitions()
print(f"Number of partitions: {partition_count}")

Number of partitions: 16


In [18]:
# Drop the columns VIN, Latitude and Longitude, solves B.5
df = df.drop('VIN', 'Latitude', 'Longitude')

In [20]:
# Finds the maximum fine amount and the number of fines with the maximum amount, solves B.6
from pyspark.sql.functions import col
from pyspark.sql.types import FloatType

df = df.withColumn("Fine amount", col("Fine amount").cast(FloatType()))
max_fine_amount = df.agg({"Fine amount": "max"}).collect()[0][0]
count_max_fine = df.where(col("Fine amount") == max_fine_amount).count()

print(f"Maximum fine amount: {max_fine_amount}")
print(f"Number of fines with maximum amount: {count_max_fine}")



Maximum fine amount: 1100.0
Number of fines with maximum amount: 626


                                                                                

In [21]:
# Find the top 20 vehicle makes, solves B.7
from pyspark.sql.functions import desc
df.groupBy("Make").count().orderBy(desc("count")).show(20)



+----+-------+
|Make|  count|
+----+-------+
|TOYT|2150768|
|HOND|1479996|
|FORD|1116235|
|NISS| 945133|
|CHEV| 892676|
| BMW| 603092|
|MERZ| 543298|
|VOLK| 432030|
|HYUN| 404917|
|DODG| 391686|
|LEXS| 368420|
| KIA| 328155|
|JEEP| 316300|
|AUDI| 255395|
|MAZD| 242344|
|OTHR| 205546|
| GMC| 184889|
|INFI| 174315|
|CHRY| 159948|
|SUBA| 154640|
+----+-------+


                                                                                

In [22]:
# Defines a function to map color abbreviations to their full values, solves B.8
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Dictonary of color abbreviations provided by the assignment
COLORS = {
'AL':'Aluminum', 'AM':'Amber', 'BG':'Beige', 'BK':'Black',
'BL':'Blue', 'BN':'Brown', 'BR':'Brown', 'BZ':'Bronze',
'CH':'Charcoal', 'DK':'Dark', 'GD':'Gold', 'GO':'Gold',
'GN':'Green', 'GY':'Gray', 'GT':'Granite', 'IV':'Ivory',
'LT':'Light', 'OL':'Olive', 'OR':'Orange', 'MR':'Maroon',
'PK':'Pink', 'RD':'Red', 'RE':'Red', 'SI':'Silver', 'SL':'Silver',
'SM':'Smoke', 'TN':'Tan', 'VT':'Violet', 'WT':'White', 'WH':'White',
'YL':'Yellow', 'YE':'Yellow', 'UN':'Unknown'
}

# Define a UDF to map color abbreviations to full words
def map_color(color_abbr):
    return COLORS.get(color_abbr, color_abbr)

map_color_udf = udf(map_color, StringType())

df = df.withColumn("color long", map_color_udf(col("Color")))

In [23]:
# Find the most common color of vehicles made by Toyota, solves B.9
df.filter(col("Make") == "TOYT").groupBy("color long").count().orderBy(desc("count")).show(1)



+----------+------+
|color long| count|
+----------+------+
|        GY|489697|
+----------+------+


                                                                                

In [None]:
# Stops the spark context
spark_context.stop()