In [1]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from operator import add
# Build (or reuse) the Spark session for this notebook against the standalone
# master, then keep a handle on the underlying SparkContext.
spark_session = (
    SparkSession.builder
    .master("spark://192.168.2.119:7077")
    .appName("adam-axelsson-part-b")
    # Dynamic allocation with shuffle tracking; the external shuffle service is off.
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.dynamicAllocation.shuffleTracking.enabled", True)
    .config("spark.shuffle.service.enabled", False)
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
    # Resource limits for this application.
    .config("spark.executor.cores", 2)
    .config("spark.cores.max", 4)
    # Fixed ports for the driver and the block manager.
    .config("spark.driver.port", 9998)
    .config("spark.blockManager.port", 10005)
    .getOrCreate()
)
spark_context = spark_session.sparkContext

# Only log errors from here on.
spark_context.setLogLevel("ERROR")

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/02/24 16:46:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/02/24 16:46:59 WARN ExecutorAllocationManager: Dynamic allocation without a shuffle service is an experimental feature.


In [2]:
#B.1
# Read the citations CSV from HDFS with the first line treated as the header,
# and mark the DataFrame for caching because the following cells reuse it.
parking = (
    spark_session.read
    .option("header", "true")
    .csv('hdfs://192.168.2.119:9000/parking-citations.csv')
    .cache()
)

parking.show()


                                                                                

+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+-----------------+----------------------+
|Ticket number|          Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date| VIN|Make|Body Style|Color|            Location|Route|Agency|Violation code|Violation Description|Fine amount| Latitude|Longitude|Agency Description|Color Description|Body Style Description|
+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+-----------------+----------------------+
|   1103341116|2015-12-21T00:00:...|      1251|    null|       null|            CA|           200304|null|HOND|        PA|  

In [3]:
#B.2
# Print the column names and the types Spark assigned when reading the CSV.
parking.printSchema()


root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Agency Description: string (nullable = true)
 |-- Color Description: string (nullable = true)
 |-- Body Style Description: string (nullable = true)



In [4]:
#B.3
# Total number of rows in the DataFrame.
row_count = parking.count()
print("Number of rows: %d \n" % row_count)

#B.4
# Number of partitions of the DataFrame's underlying RDD.
partition_count = parking.rdd.getNumPartitions()
print("Number of partitions: %d" % partition_count)



Number of rows: 13077724 

Number of partitions: 16


                                                                                

In [5]:
#B.5
# Remove the three columns that the remaining tasks do not use.
parking = parking.drop("VIN", "Latitude", "Longitude")

#B.6
from pyspark.sql.functions import col

# "Fine amount" was read as a string; cast it so max() compares numbers.
fine_as_float = col("Fine amount").cast('float')
parking = parking.withColumn("Fine amount", fine_as_float)

# The aggregation yields a single row whose first field is the maximum.
max_fine = parking.agg({'Fine amount': 'max'}).collect()[0]
highest_fine = max_fine[0]
print("Maximum fine: %d \n" % highest_fine)

# Every citation whose fine equals that maximum.
all_max_fines = parking.filter(col("Fine amount") == highest_fine)
print("Number of fines for %d: %d \n " % (highest_fine, all_max_fines.count()))

                                                                                

Maximum fine: 1100 





Number of fines for 1100: 626 
 


                                                                                

In [6]:
#B.7
# Count citations per make, order by that count (largest first) and keep
# the first twenty rows.
count_make = parking.groupby("Make").count()
count_make_sorted = count_make.sort("count", ascending=False)
top_20_make = count_make_sorted.limit(20)
top_20_make.show()



+----+-------+
|Make|  count|
+----+-------+
|TOYT|2150768|
|HOND|1479996|
|FORD|1116235|
|NISS| 945133|
|CHEV| 892676|
| BMW| 603092|
|MERZ| 543298|
|VOLK| 432030|
|HYUN| 404917|
|DODG| 391686|
|LEXS| 368420|
| KIA| 328155|
|JEEP| 316300|
|AUDI| 255395|
|MAZD| 242344|
|OTHR| 205546|
| GMC| 184889|
|INFI| 174315|
|CHRY| 159948|
|SUBA| 154640|
+----+-------+



                                                                                

In [7]:
#B.8
from pyspark.sql.functions import udf
from pyspark.sql.types import *

# Colour abbreviations mapped to their full names. Defined once at module level
# so the table is not rebuilt on every call (the function runs once per row
# when used as a UDF).
_COLOR_NAMES = {
    'AL':'Aluminum', 'AM':'Amber', 'BG':'Beige', 'BK':'Black',
    'BL':'Blue', 'BN':'Brown', 'BR':'Brown', 'BZ':'Bronze',
    'CH':'Charcoal', 'DK':'Dark', 'GD':'Gold', 'GO':'Gold',
    'GN':'Green', 'GY':'Gray', 'GT':'Granite', 'IV':'Ivory',
    'LT':'Light', 'OL':'Olive', 'OR':'Orange', 'MR':'Maroon',
    'PK':'Pink', 'RD':'Red', 'RE':'Red', 'SI':'Silver', 'SL':'Silver',
    'SM':'Smoke', 'TN':'Tan', 'VT':'Violet', 'WT':'White', 'WH':'White',
    'YL':'Yellow', 'YE':'Yellow', 'UN':'Unknown'
}


def color_column(color):
    """Expand a colour abbreviation to its full name.

    Args:
        color: Colour code to look up; may be None.

    Returns:
        The full colour name when the code is in the lookup table, otherwise
        the input value unchanged (unknown codes and None pass through).
    """
    # Single lookup with the input itself as the fallback value.
    return _COLOR_NAMES.get(color, color)
    
# Wrap the mapper as a Spark UDF returning strings, add the expanded colour
# name as a new column, and show it beside the original code.
udfColorColumn = udf(color_column, returnType=StringType())
color_df = parking.withColumn("color_long", udfColorColumn("color"))
color_df.select("color", "color_long").show()


[Stage 14:>                                                         (0 + 1) / 1]

+-----+----------+
|color|color_long|
+-----+----------+
|   GY|      Gray|
|   WH|     White|
|   BK|     Black|
|   WH|     White|
|   BK|     Black|
|   GY|      Gray|
|   BL|      Blue|
|   BK|     Black|
|   BR|     Brown|
|   SI|    Silver|
|   WH|     White|
|   GO|      Gold|
|   BK|     Black|
|   BK|     Black|
|   BK|     Black|
|   BK|     Black|
|   WH|     White|
| null|      null|
|   BK|     Black|
|   BK|     Black|
+-----+----------+
only showing top 20 rows



                                                                                

In [8]:
#B.9
# Keep only the make and expanded colour, restrict to rows whose make is
# "TOYT", then count per colour and order by that count (largest first).
make_and_color_long = color_df.select("Make", "color_long")
color_toyota = make_and_color_long.filter(col("Make") == "TOYT")
count_color_toyota = color_toyota.groupby("color_long").count()
count_color_toyota_sorted = count_color_toyota.sort("count", ascending=False)

count_color_toyota_sorted.show()



+----------+------+
|color_long| count|
+----------+------+
|      Gray|489697|
|     White|434595|
|     Black|353812|
|    Silver|347894|
|      Blue|180091|
|       Red|119074|
|     Green| 74968|
|      Gold| 40646|
|    Maroon| 26242|
|       Tan| 23355|
|     Beige| 15723|
|        OT| 15719|
|     Brown| 11454|
|    Yellow|  4372|
|        PR|  4272|
|    Orange|  3575|
|   Unknown|  2012|
|        TU|  1647|
|      null|   771|
|        CO|   730|
+----------+------+
only showing top 20 rows



                                                                                

In [9]:
# Stop the Spark session created in the first cell.
spark_session.stop()