In [1]:
from pprint import pprint
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType, StringType

# DataFrame API entry point: build (or reuse) a SparkSession attached to the
# standalone master at 192.168.2.113:7077. Dynamic allocation is enabled
# together with the external shuffle service, executors idle for 30s are
# released, and each executor is given 3 cores.
# NOTE(review): the driver and block-manager ports are pinned to fixed values,
# presumably so they can be opened in a firewall -- confirm.
spark_session = (
    SparkSession.builder
    .master("spark://192.168.2.113:7077")
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.shuffle.service.enabled", True)
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
    .config("spark.executor.cores", 3)
    .config("spark.driver.port", 9998)
    .config("spark.blockManager.port", 10005)
    .appName("marcelloVendruscolo_Assignment3_pB")
    .getOrCreate()
)

# RDD-level API: the SparkContext that backs the session above. It is kept in
# its own name so the last cell can stop it and release the cluster resources.
spark_context = spark_session.sparkContext

In [2]:
#User Defined Functions.
def string_to_float(fine_amount):
    """Convert a raw 'Fine amount' cell into a float.

    Args:
        fine_amount: The raw value, normally a numeric string such as "50".
            May also be None or non-numeric text.

    Returns:
        float: The parsed amount, or 0.0 when the value is missing or cannot
        be parsed as a number.
    """
    try:
        return float(fine_amount)
    except (TypeError, ValueError, OverflowError):
        # Only conversion failures are treated as "no amount"; anything else
        # (e.g. KeyboardInterrupt) is allowed to propagate.
        # The fallback is a float, not the int 0, so that it matches the
        # FloatType() this function is registered with as a UDF.
        # NOTE(review): a Python int returned from a FloatType UDF is
        # understood to surface as null rather than 0.0 -- confirm.
        return 0.0

# Colour abbreviation -> full colour name. Built once when the cell runs
# rather than on every call, because expand_colour_names is applied to every
# row of the DataFrame as a UDF.
_COLOUR_NAMES = {
    'AL': 'Aluminum', 'AM': 'Amber', 'BG': 'Beige', 'BK': 'Black',
    'BL': 'Blue', 'BN': 'Brown', 'BR': 'Brown', 'BZ': 'Bronze',
    'CH': 'Charcoal', 'DK': 'Dark', 'GD': 'Gold', 'GO': 'Gold',
    'GN': 'Green', 'GY': 'Gray', 'GT': 'Granite', 'IV': 'Ivory',
    'LT': 'Light', 'OL': 'Olive', 'OR': 'Orange', 'MR': 'Maroon',
    'PK': 'Pink', 'RD': 'Red', 'RE': 'Red', 'SI': 'Silver',
    'SL': 'Silver', 'SM': 'Smoke', 'TN': 'Tan', 'VT': 'Violet',
    'WT': 'White', 'WH': 'White', 'YL': 'Yellow', 'YE': 'Yellow',
    'UN': 'Unknown',
}


def expand_colour_names(original_colour):
    """Expand a colour abbreviation into its full colour name.

    Args:
        original_colour: The abbreviation to look up (e.g. 'GY'). The match
            is exact: no case folding or whitespace stripping is applied.

    Returns:
        The full colour name when the abbreviation is known; otherwise the
        input value itself, unchanged (including None).
    """
    return _COLOUR_NAMES.get(original_colour, original_colour)

In [3]:
# B.1 - Load the CSV file from HDFS, treating its first line as the header
# row, and call show() to verify that the data is loaded correctly.
data_frame = (
    spark_session.read
    .option("header", "true")
    .csv('hdfs://192.168.2.113:9000/parking-citations.csv')
)
data_frame.show()

+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+
|Ticket number|         Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date| VIN|Make|Body Style|Color|            Location|Route|Agency|Violation code|Violation Description|Fine amount| Latitude|Longitude|
+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+
|   1103341116|2015-12-21T00:00:00|      1251|    null|       null|            CA|           200304|null|HOND|        PA|   GY|     13147 WELBY WAY|01521|     1|        4000A1|   NO EVIDENCE OF REG|         50|    99999|    99999|
|   1103700150|2015-12-21T00:00:00|      1435|    null|       null|         

In [4]:
# B.2 - Print the schema for the DataFrame.
# The reader in B.1 sets only the "header" option, so no schema inference is
# requested; the output below accordingly lists every column as a nullable string.
print("Schema for the DataFrame:\n")
data_frame.printSchema()

Schema for the DataFrame:

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [5]:
# B.3 - Count the number of rows in the CSV file.
# The header line is not counted: it was consumed as column names in B.1.
print("Number of rows in the CSV files: ", data_frame.count(), sep="")

Number of rows in the CSV files: 9257460


In [6]:
# B.4 - Count the number of partitions in the underlying RDD.
# The DataFrame exposes its backing RDD through the .rdd attribute.
print("Number of partitions in the underlying RDD: ", data_frame.rdd.getNumPartitions(), sep="")

Number of partitions in the underlying RDD: 10


In [7]:
# B.5 - Drop the columns VIN, Latitude and Longitude.
# The result is bound to a new name; data_frame itself is not reassigned here.
# NOTE(review): the following cells (B.6-B.9) keep working on data_frame, not
# on data_frame_reduced -- confirm that this is intended.
data_frame_reduced = data_frame.drop('VIN', 'Latitude', 'Longitude')

In [8]:
# B.6 - Find the maximum fine amount. How many fines have this amount?
# 'Fine amount' was loaded as a string column, so it is first converted into a
# numeric column ('Float fine amount') with the string_to_float UDF.
udf_string_to_float = udf(string_to_float, FloatType())
data_frame = data_frame.withColumn(
    'Float fine amount',
    udf_string_to_float('Fine amount'),
)

# The aggregation yields a single row whose only column is named
# 'max(Float fine amount)'.
max_fine = (
    data_frame
    .agg({'Float fine amount': 'max'})
    .collect()[0]['max(Float fine amount)']
)
print("The maximum fine amount: ", max_fine, sep="")

# Count the rows whose converted fine equals that maximum.
max_fine_freq = data_frame.where(data_frame['Float fine amount'] == max_fine).count()
print("The number of fines corresponding to the maximum fine: ", max_fine_freq, sep="")

The maximum fine amount: 505.0
The number of fines corresponding to the maximum fine: 6


In [9]:
# B.7 - Show the top 20 most frequent vehicle makes, and their frequencies.
# Rows are grouped by 'Make', counted, and sorted by descending frequency;
# show() is called without arguments and prints the first 20 rows.
print("The top 20 most frequent vehicle makes:\n")
(
    data_frame
    .groupBy('Make')
    .count()
    .withColumnRenamed('count', 'Frequency')
    .orderBy('Frequency', ascending=False)
    .show()
)

The top 20 most frequent vehicle makes:

+----+---------+
|Make|Frequency|
+----+---------+
|TOYT|  1531949|
|HOND|  1043276|
|FORD|   807498|
|NISS|   662097|
|CHEV|   631413|
| BMW|   422916|
|MERZ|   376830|
|VOLK|   316002|
|HYUN|   285286|
|DODG|   271590|
|LEXS|   263269|
| KIA|   217795|
|JEEP|   214965|
|AUDI|   179718|
|MAZD|   169811|
|OTHR|   154376|
| GMC|   132788|
|INFI|   120340|
|CHRY|   120317|
|ACUR|   111265|
+----+---------+
only showing top 20 rows



In [10]:
# B.8 - Expand abbreviations in the 'Color' column with a User Defined
# Function, creating a new 'Color Long' column that maps each original colour
# to its full name from the dictionary inside expand_colour_names. If no key
# matches the original colour, the original colour is used.
udf_expand_colour_names = udf(expand_colour_names, returnType=StringType())
data_frame = data_frame.withColumn(
    'Color Long',
    udf_expand_colour_names('Color'),
)

In [11]:
# B.9 - Using this new column, what's the most frequent colour value for
# Toyotas (TOYT)? Keep only the Toyota rows, count them per expanded colour
# and list the colours by descending frequency (first 20 rows are printed).
print("The top most frequent colour values for Toyotas (TOYT):\n")
(
    data_frame
    .select('Make', 'Color Long')
    .filter('Make == "TOYT"')
    .groupBy('Color Long')
    .count()
    .withColumnRenamed('count', 'Frequency')
    .orderBy('Frequency', ascending=False)
    .show()
)

The top most frequent colour values for Toyotas (TOYT):

+----------+---------+
|Color Long|Frequency|
+----------+---------+
|      Gray|   346822|
|     White|   304620|
|     Black|   252199|
|    Silver|   248685|
|      Blue|   128051|
|       Red|    84175|
|     Green|    57627|
|      Gold|    30154|
|    Maroon|    19882|
|       Tan|    17006|
|     Beige|    11572|
|        OT|    10805|
|     Brown|     8466|
|    Yellow|     3413|
|        PR|     3010|
|    Orange|     2527|
|   Unknown|     1343|
|        TU|     1077|
|        CO|      423|
|      Pink|       89|
+----------+---------+
only showing top 20 rows



In [12]:
# Release the cores for another application!
# Stops the SparkContext obtained from spark_session in the first cell; any
# further Spark work in this notebook needs that first cell to be run again.
spark_context.stop()