In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pandas
import matplotlib
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))


spark_session = SparkSession\
        .builder\
        .master("spark://192.168.2.113:7077") \
        .appName("AdityaShirke_B")\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.shuffle.service.enabled", True)\
        .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
        .config("spark.executor.cores",2)\
        .config("spark.cores.max",4)\
        .config("spark.driver.port",9998)\
        .config("spark.blockManager.port",10005)\
        .getOrCreate()

In [2]:
#B.1 Load the CSV file from HDFS, and call show() to verify the data is loaded correctly.
citations = spark_session.read.csv("hdfs://192.168.2.113:9000/parking-citations.csv", header = True, sep = ",")
citations.take(10)

[Row(Ticket number='1103341116', Issue Date='2015-12-21T00:00:00', Issue time='1251', Meter Id=None, Marked Time=None, RP State Plate='CA', Plate Expiry Date='200304', VIN=None, Make='HOND', Body Style='PA', Color='GY', Location='13147 WELBY WAY', Route='01521', Agency='1', Violation code='4000A1', Violation Description='NO EVIDENCE OF REG', Fine amount='50', Latitude='99999', Longitude='99999'),
 Row(Ticket number='1103700150', Issue Date='2015-12-21T00:00:00', Issue time='1435', Meter Id=None, Marked Time=None, RP State Plate='CA', Plate Expiry Date='201512', VIN=None, Make='GMC', Body Style='VN', Color='WH', Location='525 S MAIN ST', Route='1C51', Agency='1', Violation code='4000A1', Violation Description='NO EVIDENCE OF REG', Fine amount='50', Latitude='99999', Longitude='99999'),
 Row(Ticket number='1104803000', Issue Date='2015-12-21T00:00:00', Issue time='2055', Meter Id=None, Marked Time=None, RP State Plate='CA', Plate Expiry Date='201503', VIN=None, Make='NISS', Body Style='P

In [3]:
#B.2 Print the schema for the DataFrame.
citations.show()

+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+
|Ticket number|         Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date| VIN|Make|Body Style|Color|            Location|Route|Agency|Violation code|Violation Description|Fine amount| Latitude|Longitude|
+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+
|   1103341116|2015-12-21T00:00:00|      1251|    null|       null|            CA|           200304|null|HOND|        PA|   GY|     13147 WELBY WAY|01521|     1|        4000A1|   NO EVIDENCE OF REG|         50|    99999|    99999|
|   1103700150|2015-12-21T00:00:00|      1435|    null|       null|         

In [4]:
#B.3 Count the number of rows in the CSV file.
citations.count()

9257460

In [5]:
#B.4 Count the number of partitions in the underlying RDD.
citations.rdd.getNumPartitions()

10

In [6]:
#B.5 Drop the columns VIN, Latitude and Longitude.
citations_new = citations.drop("VIN", "Latitude", "Longitude")
citations_new.show()

#Another way
#citations_new = citations.drop(citations.VIN).drop(citations.Latitude).drop(citations.Longitude)
#citations_new.show()

+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+
|Ticket number|         Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date|Make|Body Style|Color|            Location|Route|Agency|Violation code|Violation Description|Fine amount|
+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+
|   1103341116|2015-12-21T00:00:00|      1251|    null|       null|            CA|           200304|HOND|        PA|   GY|     13147 WELBY WAY|01521|     1|        4000A1|   NO EVIDENCE OF REG|         50|
|   1103700150|2015-12-21T00:00:00|      1435|    null|       null|            CA|           201512| GMC|        VN|   WH|       525 S MAIN ST| 1C51|     1|        4000A1|   NO

In [7]:
#B.6 Find the maximum fine amount. How many fines have this amount?
citations_new_floatAmount = citations_new.withColumn("Fine amount", citations_new['Fine amount'].cast('float'))

max_float_amount = citations_new_floatAmount.agg({"Fine amount": "max"}).collect()[0]
print("Maximum fine amount: ", max_float_amount["max(Fine amount)"])

fine_amounts_counts = citations_new_floatAmount.groupBy("Fine amount").count()
fines_with_max_amount = fine_amounts_counts.filter(fine_amounts_counts['Fine amount'] == max_float_amount["max(Fine amount)"])
fines_with_max_amount.show()


Maximum fine amount:  505.0
+-----------+-----+
|Fine amount|count|
+-----------+-----+
|      505.0|    6|
+-----------+-----+



In [8]:
#B.7 Show the top 20 most frequent vehicle makes, and their frequencies.
#from pyspark.sql.functions import desc
frquent_vehicle_make = citations_new_floatAmount.groupBy("Make").count().orderBy("count", ascending=0)
frquent_vehicle_make.show(20)

+----+-------+
|Make|  count|
+----+-------+
|TOYT|1531949|
|HOND|1043276|
|FORD| 807498|
|NISS| 662097|
|CHEV| 631413|
| BMW| 422916|
|MERZ| 376830|
|VOLK| 316002|
|HYUN| 285286|
|DODG| 271590|
|LEXS| 263269|
| KIA| 217795|
|JEEP| 214965|
|AUDI| 179718|
|MAZD| 169811|
|OTHR| 154376|
| GMC| 132788|
|INFI| 120340|
|CHRY| 120317|
|ACUR| 111265|
+----+-------+
only showing top 20 rows



In [9]:
#B.8 Let’s expand some abbreviations in the color column. Create a User Defined Function to create a new column, ‘color long’, mapping the original colors to their corresponding values in the dictionary below. 
#If there is no key matching the original color, use the original color.

COLORS = {
'AL':'Aluminum', 'AM':'Amber', 'BG':'Beige', 'BK':'Black',
'BL':'Blue', 'BN':'Brown', 'BR':'Brown', 'BZ':'Bronze',
'CH':'Charcoal', 'DK':'Dark', 'GD':'Gold', 'GO':'Gold',
'GN':'Green', 'GY':'Gray', 'GT':'Granite', 'IV':'Ivory',
'LT':'Light', 'OL':'Olive', 'OR':'Orange', 'MR':'Maroon',
'PK':'Pink', 'RD':'Red', 'RE':'Red', 'SI':'Silver', 'SL':'Silver',
'SM':'Smoke', 'TN':'Tan', 'VT':'Violet', 'WT':'White',
'WH':'White', 'YL':'Yellow', 'YE':'Yellow', 'UN':'Unknown'
}

#from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
def translate(colors):
    def translate_(mapped_color):
        return_value = mapped_color
        if (COLORS.get(mapped_color) is not None): #If there is no key matching the original color, use the original color.
            return_value = COLORS.get(mapped_color)
        return return_value
    return udf(translate_, StringType())


new_citations_color = citations_new_floatAmount.withColumn("color long", translate(COLORS)("Color"))
new_citations_color.show(10)

#new_citations_color.filter(new_citations_color['color long'] == 'null').show()



+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----------+-----+------------------+-----+------+--------------+---------------------+-----------+----------+
|Ticket number|         Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date|Make|Body Style|Color|          Location|Route|Agency|Violation code|Violation Description|Fine amount|color long|
+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----------+-----+------------------+-----+------+--------------+---------------------+-----------+----------+
|   1103341116|2015-12-21T00:00:00|      1251|    null|       null|            CA|           200304|HOND|        PA|   GY|   13147 WELBY WAY|01521|     1|        4000A1|   NO EVIDENCE OF REG|       50.0|      Gray|
|   1103700150|2015-12-21T00:00:00|      1435|    null|       null|            CA|           201512| GMC|        VN|   WH|     525 S MAIN ST

In [10]:
#B.9 Using this new column, what’s the most frequent colour value for Toyotas (TOYT)?
filtered_toyt = new_citations_color.filter(new_citations_color['Make'] == "TOYT")
most_frequent_color_toyt = filtered_toyt.groupBy('Make', 'color long').count().orderBy("count", ascending=0)
most_frequent_color_toyt.show(1)
most_frequent_color_toyt.collect()[0][1]

+----+----------+------+
|Make|color long| count|
+----+----------+------+
|TOYT|      Gray|346822|
+----+----------+------+
only showing top 1 row



'Gray'