In [80]:
# Widen the notebook container to the full browser width so wide Spark `show()`
# tables are easier to read. `IPython.display` is the public import location;
# importing these names from `IPython.core.display` is deprecated.
from IPython.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

In [81]:
from pprint import pprint
from pyspark.sql import SparkSession

# New API: one SparkSession is the entry point for all DataFrame work below.
# Master URL, ports and core limits are specific to this course cluster.
spark_session = (
    SparkSession.builder
    .master("spark://192.168.2.113:7077")
    .appName("alexistubulekasA3_partB")
    # Let Spark grow/shrink the executor count; paired with the shuffle service.
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.shuffle.service.enabled", True)
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
    .config("spark.executor.cores", 2)
    # Fixed driver / block-manager ports (presumably so they can be opened in a
    # firewall or security group — TODO confirm).
    .config("spark.driver.port", 9998)
    .config("spark.blockManager.port", 10005)
    # Cap total cores so other applications can share the cluster.
    .config("spark.cores.max", 2)
    .getOrCreate()
)

# Old API (RDD): the SparkContext that backs the session above.
spark_context = spark_session.sparkContext

In [82]:
# B.1 Load the CSV file from HDFS, and call show() to verify the data is loaded correctly
# The first line of the file is a header. No schema inference is requested, so
# every column arrives as a string. The frame is cached because the following
# cells run several actions over it.
PARKING_CSV_PATH = 'hdfs://192.168.2.113:9000/parking-citations.csv'
data_frame = (
    spark_session.read
    .option("header", "true")
    .csv(PARKING_CSV_PATH)
    .cache()
)

data_frame.show(5)

+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+------------------+-----+------+--------------+---------------------+-----------+---------+---------+
|Ticket number|         Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date| VIN|Make|Body Style|Color|          Location|Route|Agency|Violation code|Violation Description|Fine amount| Latitude|Longitude|
+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+------------------+-----+------+--------------+---------------------+-----------+---------+---------+
|   1103341116|2015-12-21T00:00:00|      1251|    null|       null|            CA|           200304|null|HOND|        PA|   GY|   13147 WELBY WAY|01521|     1|        4000A1|   NO EVIDENCE OF REG|         50|    99999|    99999|
|   1103700150|2015-12-21T00:00:00|      1435|    null|       null|            CA|  

In [83]:
# B.2 Print the schema for the DataFrame.
# The CSV was read without schema inference, so every column is expected to be
# a nullable string; numeric columns are cast explicitly where needed (B.6).
data_frame.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [84]:
# B.3 Count the number of rows in the CSV file
# (the header line is consumed by the reader, so it is not included).
row_count = data_frame.count()
row_count

9257460

In [85]:
# B.4 Count the number of partitions in the underlying RDD.
num_partitions = data_frame.rdd.getNumPartitions()
num_partitions

10

In [86]:
# B.5 Drop the columns VIN, Latitude and Longitude.
COLUMNS_TO_DROP = ('VIN', 'Latitude', 'Longitude')
data_frame = data_frame.drop(*COLUMNS_TO_DROP)

In [99]:
# B.6 Find the maximum fine amount.
from pyspark.sql.functions import col

# The loader read every column as a string. Cast fines to a numeric type so
# max() compares numerically instead of lexicographically (as strings "99" > "505").
col_name = "Fine amount"
data_frame = data_frame.withColumn(col_name, col(col_name).cast('float'))  # recast fine amount column
# summary('max') renders a one-row table holding the largest fine.
data_frame.select(col_name).summary('max').show()

+-------+-----------+
|summary|Fine amount|
+-------+-----------+
|    max|      505.0|
+-------+-----------+



In [100]:
# B.6 How many fines have this amount?
from pyspark.sql.functions import col, max as spark_max

# Compute the maximum from the data rather than hard-coding the value printed by
# the previous cell, so this answer stays correct if the data or cast changes.
max_fine = data_frame.agg(spark_max(col('Fine amount'))).first()[0]
max_fine_df = data_frame.filter(col('Fine amount') == max_fine)
max_fine_df.count()

6

In [101]:
# B.7 Show the top 20 most frequent vehicle makes, and their frequencies.
make_counts = data_frame.groupBy("Make").count()
make_counts.orderBy(col("count").desc()).show(20)

+----+-------+
|Make|  count|
+----+-------+
|TOYT|1531949|
|HOND|1043276|
|FORD| 807498|
|NISS| 662097|
|CHEV| 631413|
| BMW| 422916|
|MERZ| 376830|
|VOLK| 316002|
|HYUN| 285286|
|DODG| 271590|
|LEXS| 263269|
| KIA| 217795|
|JEEP| 214965|
|AUDI| 179718|
|MAZD| 169811|
|OTHR| 154376|
| GMC| 132788|
|INFI| 120340|
|CHRY| 120317|
|ACUR| 111265|
+----+-------+
only showing top 20 rows



In [102]:
# B.8 Let’s expand some abbreviations in the color column. Create a User Defined Function to create a new column, ‘color long’, 
# mapping the original colors to their corresponding values in the dictionary below. If there is no key matching the original color, use the original color.

import pyspark
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Step 1: a plain-Python lookup from colour abbreviation to its long name,
# plus the function that applies it (wrapped as a Spark UDF in step 2).
COLORS = {
    'AL': 'Aluminum', 'AM': 'Amber', 'BG': 'Beige', 'BK': 'Black',
    'BL': 'Blue', 'BN': 'Brown', 'BR': 'Brown', 'BZ': 'Bronze',
    'CH': 'Charcoal', 'DK': 'Dark', 'GD': 'Gold', 'GO': 'Gold',
    'GN': 'Green', 'GY': 'Gray', 'GT': 'Granite', 'IV': 'Ivory',
    'LT': 'Light', 'OL': 'Olive', 'OR': 'Orange', 'MR': 'Maroon',
    'PK': 'Pink', 'RD': 'Red', 'RE': 'Red', 'SI': 'Silver', 'SL': 'Silver',
    'SM': 'Smoke', 'TN': 'Tan', 'VT': 'Violet', 'WT': 'White',
    'WH': 'White', 'YL': 'Yellow', 'YE': 'Yellow', 'UN': 'Unknown'
}

def Color_to_Color_Long(abbreviation):
    """Expand a colour abbreviation using ``COLORS``.

    Args:
        abbreviation: Raw value from the ``Color`` column; may be None when
            the cell is missing.

    Returns:
        The long colour name if ``abbreviation`` is a key of ``COLORS``,
        otherwise ``abbreviation`` unchanged (including None).
    """
    # dict.get with a default does a single lookup. It does not depend on every
    # mapped value being truthy, unlike the `if COLORS.get(...)` pattern.
    return COLORS.get(abbreviation, abbreviation)

    
    

# Step 2: wrap the plain-Python mapper as a Spark UDF producing a string column.
udf_Color_to_Color_Long = udf(f=Color_to_Color_Long, returnType=StringType())

# Add the expanded colour next to the original abbreviation and preview both.
data_frame_with_color_long = data_frame.withColumn(
    "color long",
    udf_Color_to_Color_Long("Color"),
)
data_frame_with_color_long.select(['Color', 'color long']).show()



+-----+----------+
|Color|color long|
+-----+----------+
|   GY|      Gray|
|   WH|     White|
|   BK|     Black|
|   WH|     White|
|   BK|     Black|
|   GY|      Gray|
|   BL|      Blue|
|   BK|     Black|
|   BR|     Brown|
|   SI|    Silver|
|   WH|     White|
|   GO|      Gold|
|   BK|     Black|
|   BK|     Black|
|   BK|     Black|
|   BK|     Black|
|   WH|     White|
| null|      null|
|   BK|     Black|
|   BK|     Black|
+-----+----------+
only showing top 20 rows



In [103]:
# B.9 Using this new column, what’s the most frequent colour value for Toyotas (TOYT)?
# Count Toyota citations per expanded colour, most frequent first; show only the top row.
toyota_color_counts = (
    data_frame_with_color_long
    .where(col('Make') == 'TOYT')
    .groupBy('color long')
    .count()
    .orderBy(col("count").desc())
)
toyota_color_counts.show(1)



+----------+------+
|color long| count|
+----------+------+
|      Gray|346822|
+----------+------+
only showing top 1 row



In [104]:
# Release the cores for another application!
# Stop the SparkSession created at the top rather than only its SparkContext.
# SparkSession.stop() stops the underlying context and also tears down the
# session-level state, so the kernel is left without a stale session.
spark_session.stop()