In [1]:
from pyspark.sql import SparkSession

# (8 cores, 16gb per machine) x 5 = 40 cores

# DataFrame-era entry point: build the session, or reuse one that already exists.
spark_session = (
    SparkSession.builder
    .master("spark://192.168.2.87:7077")
    .appName("Part_B_Raheel_Ali")
    # Dynamic allocation is enabled together with the shuffle service;
    # the executor idle timeout is set to 30 seconds.
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.shuffle.service.enabled", True)
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
    # Four cores are requested per executor.
    .config("spark.executor.cores", 4)
    .getOrCreate()
)

# RDD-era entry point, obtained from the same session.
spark_context = spark_session.sparkContext

In [3]:
### B.1
# Load the parking-citations CSV from HDFS. The first line of the file is
# treated as the header; no schema inference is requested. The DataFrame is
# marked for caching because the following cells reuse it.
dataSet = (
    spark_session.read
    .option("header", "true")
    .csv("hdfs://192.168.2.87:9000/parking-citations.csv")
    .cache()
)

# Preview the first rows. The former `if(dataSet != 0)` guard was removed:
# a DataFrame never compares equal to 0, so the condition was always true
# and validated nothing.
dataSet.show()

+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+-----------------+----------------------+
|Ticket number|          Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date| VIN|Make|Body Style|Color|            Location|Route|Agency|Violation code|Violation Description|Fine amount| Latitude|Longitude|Agency Description|Color Description|Body Style Description|
+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+-----------------+----------------------+
|   1103341116|2015-12-21T00:00:...|      1251|    null|       null|            CA|           200304|null|HOND|        PA|  

In [4]:
### B.2
# Total number of rows in the DataFrame.
dataSet.count()

9881842

In [5]:
### B.3
# Print each column's name, type and nullability as a tree.
dataSet.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Agency Description: string (nullable = true)
 |-- Color Description: string (nullable = true)
 |-- Body Style Description: string (nullable = true)



In [6]:
### B.4
# Number of partitions of the RDD underlying the DataFrame.
dataSet.rdd.getNumPartitions()

11

In [7]:
### B.5
# Columns to remove from the working DataFrame.
columnsToDrop = [
    'Agency Description',
    'Agency',
    'Route',
]
# drop() takes the column names as separate arguments, hence the unpacking.
dataSet = dataSet.drop(*columnsToDrop)

In [8]:
### B.5
# Print the schema again to inspect the columns after the drop.
dataSet.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Color Description: string (nullable = true)
 |-- Body Style Description: string (nullable = true)



In [9]:
## B.6
# Mean of the 'Fine amount' column. Only the "mean" statistic is requested:
# a bare summary() also computes count, stddev, min, max and approximate
# quartiles, all of which the previous filter on `summary == "mean"` threw away.
dataSet2 = dataSet.select('Fine amount').summary("mean")
dataSet2.show()

+-------+----------------+
|summary|     Fine amount|
+-------+----------------+
|   mean|70.1855354220642|
+-------+----------------+



In [10]:
### B.7
# Number of rows per vehicle make, most frequent first.
vehiclesToShow = (
    dataSet
    .groupBy('Make')
    .count()
    .sort('count', ascending=False)
)
# First ten rows of that ranking, returned as a list of Row objects.
vehiclesToShow.take(10)

[Row(Make='TOYT', count=1633266),
 Row(Make='HOND', count=1113834),
 Row(Make='FORD', count=860828),
 Row(Make='NISS', count=709250),
 Row(Make='CHEV', count=674422),
 Row(Make='BMW', count=450909),
 Row(Make='MERZ', count=402126),
 Row(Make='VOLK', count=335618),
 Row(Make='HYUN', count=304934),
 Row(Make='DODG', count=290979)]

In [11]:
### B.8
import pyspark
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

# Color code -> full color name. Built once here rather than inside the
# function, which is called once per row when it is used as a UDF.
_COLOR_NAMES = {
    'AL': 'Aluminum', 'AM': 'Amber', 'BG': 'Beige', 'BK': 'Black',
    'BL': 'Blue', 'BN': 'Brown', 'BR': 'Brown', 'BZ': 'Bronze',
    'CH': 'Charcoal', 'DK': 'Dark', 'GD': 'Gold', 'GO': 'Gold',
    'GN': 'Green', 'GY': 'Gray', 'GT': 'Granite', 'IV': 'Ivory',
    'LT': 'Light', 'OL': 'Olive', 'OR': 'Orange', 'MR': 'Maroon',
    'PK': 'Pink', 'RD': 'Red', 'RE': 'Red', 'SI': 'Silver', 'SL': 'Silver',
    'SM': 'Smoke', 'TN': 'Tan', 'VT': 'Violet', 'WT': 'White',
    'WH': 'White', 'YL': 'Yellow', 'YE': 'Yellow', 'UN': 'Unknown',
}


def color_colorlong(color):
    """Expand an abbreviated color code to its full color name.

    Args:
        color: Color code such as 'BK' or 'WH'; may be None.

    Returns:
        The full color name when the code is in the lookup table; otherwise
        the input is returned unchanged (None stays None, unknown codes pass
        through as-is). The lookup is case-sensitive.
    """
    return _COLOR_NAMES.get(color, color)

# Wrap the Python function as a Spark UDF with a string return type.
userdf_color_colorlong = udf(color_colorlong, returnType=StringType())

# Derive the "color long" column from "Color" and mark the result for caching
# (cache() returns the same DataFrame, so it can be chained).
dataSet_colorlong = dataSet.withColumn(
    "color long", userdf_color_colorlong("Color")
).cache()

# Show each color code next to its expanded name.
dataSet_colorlong.select('Color', 'color long').show()

+-----+----------+
|Color|color long|
+-----+----------+
|   GY|      Gray|
|   WH|     White|
|   BK|     Black|
|   WH|     White|
|   BK|     Black|
|   GY|      Gray|
|   BL|      Blue|
|   BK|     Black|
|   BR|     Brown|
|   SI|    Silver|
|   WH|     White|
|   GO|      Gold|
|   BK|     Black|
|   BK|     Black|
|   BK|     Black|
|   BK|     Black|
|   WH|     White|
| null|      null|
|   BK|     Black|
|   BK|     Black|
+-----+----------+
only showing top 20 rows



In [12]:
### B.8
# Print the schema to confirm the "color long" column is present.
dataSet_colorlong.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Color Description: string (nullable = true)
 |-- Body Style Description: string (nullable = true)
 |-- color long: string (nullable = true)



In [13]:
### B.9
# Rows whose make is 'HOND', counted per expanded color, most frequent first.
hondaColor = (
    dataSet_colorlong
    .where("Make = 'HOND'")
    .groupBy('color long')
    .count()
    .sort('count', ascending=False)
)
hondaColor.show()

+----------+------+
|color long| count|
+----------+------+
|      Gray|266135|
|     Black|234627|
|    Silver|180219|
|     White|160165|
|      Blue|100208|
|       Red| 46802|
|     Green| 45293|
|      Gold| 17679|
|    Maroon| 17079|
|     Brown|  9205|
|       Tan|  8114|
|        OT|  7232|
|     Beige|  5395|
|    Orange|  3318|
|        PR|  3287|
|        GR|  3253|
|    Yellow|   978|
|        TA|   976|
|   Unknown|   818|
|        TU|   644|
+----------+------+
only showing top 20 rows



In [14]:
# Stop the SparkContext so the cores are released for other applications.
spark_context.stop()