# PART B
(Or better known as part B: Thank god for abstractions)
### CONFIGURATION

In [1]:
from pyspark.sql import SparkSession

spark_session = SparkSession\
        .builder\
        .master("spark://192.168.2.113:7077") \
        .appName("EricJonsson_B")\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.shuffle.service.enabled", True)\
        .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
        .config("spark.executor.cores",2)\
        .config("spark.driver.port",9998)\
        .config("spark.blockManager.port",10005)\
        .getOrCreate()


### B.1 - B.9

In [2]:
# B.1

# Load CSV file
citations_original_df = spark_session.read\
    .option("header", "true")\
    .csv('hdfs://192.168.2.113:9000/parking-citations.csv')\
    .cache()
# Show Dataframe
citations_original_df.show()

+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+
|Ticket number|         Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date| VIN|Make|Body Style|Color|            Location|Route|Agency|Violation code|Violation Description|Fine amount| Latitude|Longitude|
+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+
|   1103341116|2015-12-21T00:00:00|      1251|    null|       null|            CA|           200304|null|HOND|        PA|   GY|     13147 WELBY WAY|01521|     1|        4000A1|   NO EVIDENCE OF REG|         50|    99999|    99999|
|   1103700150|2015-12-21T00:00:00|      1435|    null|       null|         

In [3]:
# B.2

# Print Schema
citations_original_df.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [4]:
# B.3

# Count Number of Rows
citations_original_df.count()

9257460

In [5]:
# B.4

# Count number of partitions
citations_original_df.rdd.getNumPartitions()

10

In [6]:
# B.5

# Drop VIN, Long, Lat
citations_df = citations_original_df.drop('VIN', 'Latitude', 'Latitude')
# Print Schema after Drop
citations_df.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [7]:
# B.6

from pyspark.sql.types import FloatType

# Since dataframes are immutable, we drop the 'Fine amount' column, and create a new dataframe with the same column name but float type
citations_converted_df = citations_df.drop('Fine amount')
citations_converted_df = citations_df.withColumn('Fine amount', citations_df['Fine amount'].cast(FloatType()))
# Print Schema of updated dataframe to verify that the column 'Fine amount' is indeed of type float
citations_converted_df.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: float (nullable = true)
 |-- Longitude: string (nullable = true)



In [8]:
# B.6

# Group and order by fine amount and show in descending order
citations_converted_df.select('Fine amount').groupBy('Fine amount').count().orderBy('Fine amount', ascending=False).show()

+-----------+-------+
|Fine amount|  count|
+-----------+-------+
|      505.0|      6|
|      363.0|  63366|
|      353.0|     15|
|      345.0|     40|
|      330.0|      1|
|      293.0|  10401|
|      255.0|     30|
|      163.0| 106748|
|      155.0|      1|
|      143.0|    373|
|      133.0|   9185|
|      128.0|    338|
|      123.0|      3|
|      113.0|      2|
|      105.0|    729|
|      103.0|   7401|
|       98.0|    333|
|       93.0|1097437|
|       88.0|    158|
|       85.0|      5|
+-----------+-------+
only showing top 20 rows



In [9]:
# B.7

# Group by make and order by the count in descending order
citations_df.select('Make').groupBy('Make').count().orderBy('count', ascending=False).show()

+----+-------+
|Make|  count|
+----+-------+
|TOYT|1531949|
|HOND|1043276|
|FORD| 807498|
|NISS| 662097|
|CHEV| 631413|
| BMW| 422916|
|MERZ| 376830|
|VOLK| 316002|
|HYUN| 285286|
|DODG| 271590|
|LEXS| 263269|
| KIA| 217795|
|JEEP| 214965|
|AUDI| 179718|
|MAZD| 169811|
|OTHR| 154376|
| GMC| 132788|
|INFI| 120340|
|CHRY| 120317|
|ACUR| 111265|
+----+-------+
only showing top 20 rows



In [12]:
# B.8

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Function that returns the long name when given a shorthand color, return the original name if no match is found
def colorToLong(col):
    COLORS={'AL':'Aluminum','AM':'Amber','BG':'Beige','BK':'Black','BL':'Blue','BN':'Brown','BR':'Brown','BZ':'Bronze','CH':'Charcoal','DK':'Dark','GD':'Gold','GO':'Gold','GN':'Green','GY':'Gray','GT':'Granite','IV':'Ivory','LT':'Light','OL':'Olive','OR':'Orange','MR':'Maroon','PK':'Pink','RD':'Red','RE':'Red','SI':'Silver','SL':'Silver','SM':'Smoke','TN':'Tan','VT':'Violet','WT':'White','WH':'White','YL':'Yellow','YE':'Yellow','UN':'Unknown'}
    if col in COLORS:
        return COLORS[col]
    else:
        return col
# Create udf version of the function
udf_colorToLong = udf(colorToLong,StringType())

# Create new dataframe with new column 'Color Long'
citations_colorLong_df = citations_df.withColumn('Color Long', udf_colorToLong('Color'))

In [13]:
# B.9

# Filter to only show Make == TOYT, group by 'Color Long', count and order in descending order
citations_colorLong_df.filter(citations_colorLong_df['Make'] == 'TOYT').groupBy('Color Long').count().orderBy('count', ascending=False).show()

+----------+------+
|Color Long| count|
+----------+------+
|      Gray|346822|
|     White|304620|
|     Black|252199|
|    Silver|248685|
|      Blue|128051|
|       Red| 84175|
|     Green| 57627|
|      Gold| 30154|
|    Maroon| 19882|
|       Tan| 17006|
|     Beige| 11572|
|        OT| 10805|
|     Brown|  8466|
|    Yellow|  3413|
|        PR|  3010|
|    Orange|  2527|
|   Unknown|  1343|
|        TU|  1077|
|        CO|   423|
|      Pink|    89|
+----------+------+
only showing top 20 rows

