In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, udf, length
from pyspark.sql.types import FloatType, StringType

# Build (or reuse, via getOrCreate) the Spark session for this notebook.
# The session targets the standalone master at 192.168.2.87 and enables
# dynamic allocation together with the external shuffle service; executors
# are released after 30s idle and are sized at 4 cores each.
spark_session = (
    SparkSession.builder
    .master("spark://192.168.2.87:7077")
    .appName("jialun_song_part1_sectionB")
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.shuffle.service.enabled", True)
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
    .config("spark.executor.cores", 4)
    .getOrCreate()
)

# Last expression of the cell: display the underlying SparkContext.
spark_session.sparkContext

# B. 1

In [2]:
# Read the parking-citations CSV from HDFS, treating the first line as the
# header, and mark the resulting DataFrame for caching since every later
# cell queries it.
citations_path = 'hdfs://192.168.2.87:9000/parking-citations.csv'
parking_citation_df = (
    spark_session.read
    .option('header', True)
    .csv(citations_path)
    .cache()
)

# Preview the first 10 rows to verify the load.
parking_citation_df.show(10)

+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+-----------------+----------------------+
|Ticket number|          Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date| VIN|Make|Body Style|Color|          Location|Route|Agency|Violation code|Violation Description|Fine amount| Latitude|Longitude|Agency Description|Color Description|Body Style Description|
+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+-----------------+----------------------+
|   1103341116|2015-12-21T00:00:...|      1251|    null|       null|            CA|           200304|null|HOND|        PA|   GY|  

# B. 2

In [3]:
# Report how many partitions back the DataFrame's underlying RDD.
partition_count = parking_citation_df.rdd.getNumPartitions()
print(f'Number of partitions of underlying rdd: {partition_count} \n')

Number of partitions of underlying rdd: 11 



# B. 3

In [4]:
# Print the column names and types of the loaded DataFrame. The CSV was read
# with only header=True (no schema supplied), and the recorded output reports
# every column as a nullable string.
parking_citation_df.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Agency Description: string (nullable = true)
 |-- Color Description: string (nullable = true)
 |-- Body Style Description: string (nullable = true)



# B. 4

In [5]:
# Total number of rows (citations) in the DataFrame; shown as the cell result.
parking_citation_df.count()

9881842

# B. 5+

In [37]:
# Remove the three agency/route columns and turn 'Fine amount' from string
# into a float column. Re-running the cell is harmless: the result is bound
# back to the same name and the columns are already gone / already float.
#
# NOTE(review): this cell carries execution count 37 while its neighbours are
# numbered 5 and 6, and the schema recorded under the colour-mapping cell
# still lists Route / Agency / 'Fine amount' as string. The saved outputs
# therefore appear to come from an out-of-order run; confirm with
# Restart Kernel -> Run All before relying on them.
columns_to_drop = ('Agency Description', 'Agency', 'Route')
fine_as_float = col('Fine amount').cast(FloatType())

parking_citation_df = (
    parking_citation_df
    .drop(*columns_to_drop)
    .withColumn('Fine amount', fine_as_float)
)
parking_citation_df.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: float (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Color Description: string (nullable = true)
 |-- Body Style Description: string (nullable = true)



# B. 6

In [6]:
# Average of the 'Fine amount' column over all citations, shown as a
# one-row table labelled 'mean fine amount'.
mean_fine = avg('Fine amount').alias('mean fine amount')
parking_citation_df.select(mean_fine).show()

+----------------+
|mean fine amount|
+----------------+
|70.1855354220642|
+----------------+



# B. 7

In [7]:
# Ten vehicle makes with the most citations, most frequent first.
(
    parking_citation_df
    .groupBy('Make')
    .count()
    .orderBy(col('count').desc())
    .limit(10)
    .show()
)

+----+-------+
|Make|  count|
+----+-------+
|TOYT|1633266|
|HOND|1113834|
|FORD| 860828|
|NISS| 709250|
|CHEV| 674422|
| BMW| 450909|
|MERZ| 402126|
|VOLK| 335618|
|HYUN| 304934|
|DODG| 290979|
+----+-------+



# B. 8

In [8]:
# Lookup table from the abbreviated colour codes to their long names.
# Several long names are reachable through more than one code
# (e.g. 'WT' and 'WH' both map to 'White').
COLORS = {
    'AL': 'Aluminum',
    'AM': 'Amber',
    'BG': 'Beige',
    'BK': 'Black',
    'BL': 'Blue',
    'BN': 'Brown',
    'BR': 'Brown',
    'BZ': 'Bronze',
    'CH': 'Charcoal',
    'DK': 'Dark',
    'GD': 'Gold',
    'GO': 'Gold',
    'GN': 'Green',
    'GY': 'Gray',
    'GT': 'Granite',
    'IV': 'Ivory',
    'LT': 'Light',
    'OL': 'Olive',
    'OR': 'Orange',
    'MR': 'Maroon',
    'PK': 'Pink',
    'RD': 'Red',
    'RE': 'Red',
    'SI': 'Silver',
    'SL': 'Silver',
    'SM': 'Smoke',
    'TN': 'Tan',
    'VT': 'Violet',
    'WT': 'White',
    'WH': 'White',
    'YL': 'Yellow',
    'YE': 'Yellow',
    'UN': 'Unknown',
}


def full_color_name(color):
    """Translate a colour code into its long name.

    Args:
        color: The abbreviated colour code (may be None).

    Returns:
        The matching entry from COLORS, or ``color`` itself, unchanged,
        when the code has no entry (this includes None). The lookup is an
        exact, case-sensitive match.
    """
    if color in COLORS:
        return COLORS[color]
    return color

# Wrap the Python lookup as a Spark UDF returning a string, then add a
# 'color long' column derived from the existing 'Color' column.
udf_full_color_name = udf(full_color_name, StringType())

color_long = udf_full_color_name(col('Color'))
parking_citation_color_df = parking_citation_df.withColumn('color long', color_long)

# Codes without an entry in COLORS pass through unchanged; filtering on
# length(col('color long')) <= 2 is a quick way to inspect them.
parking_citation_color_df.printSchema()
parking_citation_color_df.show(3)

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Agency Description: string (nullable = true)
 |-- Color Description: string (nullable = true)
 |-- Body Style Description: string (nullable = true)
 |-- color long: string (nullable = true)

+---

# B. 9

In [9]:
# Most frequent long colour name among citations whose make is 'HOND':
# count rows per 'color long', sort by count descending, keep the top row.
(
    parking_citation_color_df
    .filter(col('Make') == 'HOND')
    .groupBy('color long')
    .count()
    .sort('count', ascending=False)
    .limit(1)
    .show()
)

+----------+------+
|color long| count|
+----------+------+
|      Gray|266135|
+----------+------+

