# Part B - Working with DataFrames and SQL

In [1]:
from pyspark.sql import SparkSession

In [2]:
# Connect to the standalone Spark master for this part of the assignment.
# Dynamic allocation is enabled together with shuffle tracking and the shuffle
# service, with a 30s executor idle timeout and spark.cores.max set to 4.
spark_session = (
    SparkSession.builder
    .master("spark://192.168.2.70:7077")
    .appName("derrick-adjei_app-partb")
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.dynamicAllocation.shuffleTracking.enabled", True)
    .config("spark.shuffle.service.enabled", True)
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
    .config("spark.cores.max", 4)
    .getOrCreate()
)



## B.1 Load the CSV file from HDFS and call show() to verify that the data loaded correctly

In [10]:
# Read the parking-citations CSV from HDFS, taking column names from the header
# row. No schema is supplied and inferSchema is not set, so every column is
# loaded as a string (see the schema printed in B.2).
df = spark_session.read\
        .option("header", True)\
        .csv("hdfs://192.168.2.70:9000/parking-citations.csv")

# B.1 asks for show() to verify the load; a few rows are enough for that and
# keep the 22-column output readable.
df.show(5)

## B.2 Print the schema for the DataFrame

In [11]:
# The CSV was read with only the header option (no schema, no inferSchema),
# so every column is expected to show up as a string.
df.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Agency Description: string (nullable = true)
 |-- Color Description: string (nullable = true)
 |-- Body Style Description: string (nullable = true)



## B.3 Count the number of rows in the CSV file

In [12]:
# count() is an action: it runs a Spark job and returns the number of data rows
# (the header line is not counted because header=True was set when reading).
row_count = df.count()

In [13]:
# Report the row count computed in the previous cell.
print(f'Row count: {row_count}')

Row count: 13077724


## B.4 Count the number of partitions in the underlying RDD.

In [15]:
# Number of partitions in the RDD underlying the DataFrame read from HDFS.
df.rdd.getNumPartitions()

16

## B.5 Drop the columns VIN, Latitude and Longitude

In [17]:
# Columns that B.5 asks us to remove; the raw `df` is left untouched.
COLUMNS_TO_DROP = ('VIN', 'Latitude', 'Longitude')
df_new = df.drop(*COLUMNS_TO_DROP)

In [18]:
# Verify that VIN, Latitude and Longitude are no longer in the schema.
df_new.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- Agency Description: string (nullable = true)
 |-- Color Description: string (nullable = true)
 |-- Body Style Description: string (nullable = true)



## B.6 Find the maximum fine amount. How many fines have this amount?

In [23]:
# Cast the fine to a numeric type so that max() and equality comparisons are
# numeric rather than lexicographic string comparisons.
# The column is taken from df_new itself (not the original df): once this cell
# has run, df_new's "Fine amount" is the new cast column, so referencing
# df['Fine amount'] would point at an attribute df_new no longer contains and
# re-running the cell would fail.
df_new = df_new.withColumn("Fine amount", df_new['Fine amount'].cast('float'))

In [24]:
# Confirm that 'Fine amount' is now a float column.
df_new.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: float (nullable = true)
 |-- Agency Description: string (nullable = true)
 |-- Color Description: string (nullable = true)
 |-- Body Style Description: string (nullable = true)



In [25]:
# Using max() function
from pyspark.sql.functions import max

In [39]:
max_fine = df_new.select(max(df_new['Fine amount']).alias('max_fine'))
max_fine.show()

+--------+
|max_fine|
+--------+
|  1100.0|
+--------+



In [43]:
# Pull the single aggregated value back to the driver as a Python float.
max_fine_val = max_fine.first().max_fine

In [45]:
# Tickets whose fine equals the maximum found above, then how many there are.
max_fine_rows = df_new.filter(df_new['Fine amount'] == max_fine_val)
rows_with_max_fine = max_fine_rows.count()

In [46]:
# Report how many tickets carry the maximum fine.
print(f'There are {rows_with_max_fine} rows with this fine value')

There are 626 rows with this fine value


## B.7 Show the top 20 most frequent vehicle makes and their frequencies 

In [51]:
# Ticket count per vehicle make.
# The aggregation is done with a DataFrame groupBy so it runs inside Spark
# instead of shipping every row to Python workers for map/reduceByKey. Only the
# small aggregated result is converted to an RDD of (make, count) tuples, the
# same shape the ranking cell below expects.
makes_rdd = df_new.groupBy('Make').count()\
            .rdd.map(tuple)

In [53]:
# Rank makes by ticket count (largest first) and bring the top 20
# (make, count) pairs back to the driver as a list.
makes_rdd.sortBy(lambda x: x[1], ascending=False).take(20)

[('TOYT', 2150768),
 ('HOND', 1479996),
 ('FORD', 1116235),
 ('NISS', 945133),
 ('CHEV', 892676),
 ('BMW', 603092),
 ('MERZ', 543298),
 ('VOLK', 432030),
 ('HYUN', 404917),
 ('DODG', 391686),
 ('LEXS', 368420),
 ('KIA', 328155),
 ('JEEP', 316300),
 ('AUDI', 255395),
 ('MAZD', 242344),
 ('OTHR', 205546),
 ('GMC', 184889),
 ('INFI', 174315),
 ('CHRY', 159948),
 ('SUBA', 154640)]

## B.8 Create a user-defined function (UDF) that adds a new column, ‘color long’, mapping the original colors to their corresponding values in the dictionary below. If no key matches the original color, use the original color.

In [55]:
# Two-letter color codes -> long color names (from the B.8 task statement).
# Several codes share a long name: BN/BR -> Brown, GD/GO -> Gold, RD/RE -> Red,
# SI/SL -> Silver, WT/WH -> White, YL/YE -> Yellow. 'UN' maps to 'Unknown'.
COLORS = dict(
    AL='Aluminum', AM='Amber', BG='Beige', BK='Black', BL='Blue',
    BN='Brown', BR='Brown', BZ='Bronze', CH='Charcoal', DK='Dark',
    GD='Gold', GO='Gold', GN='Green', GY='Gray', GT='Granite',
    IV='Ivory', LT='Light', OL='Olive', OR='Orange', MR='Maroon',
    PK='Pink', RD='Red', RE='Red', SI='Silver', SL='Silver',
    SM='Smoke', TN='Tan', VT='Violet', WT='White', WH='White',
    YL='Yellow', YE='Yellow', UN='Unknown',
)

In [68]:
# import the functions as F from pyspark.sql
import pyspark.sql.functions as F
from pyspark.sql.types import StringType

def colorLong(color, mapping=None):
    """Map a two-letter color code to its long color name.

    Args:
        color: Value from the 'Color' column, either a string code or None for
            a null value.
        mapping: Optional code -> long-name dictionary. Defaults to the
            notebook-level COLORS table. A custom mapping must contain 'UN',
            because that entry is returned for None.

    Returns:
        mapping['UN'] ('Unknown' for COLORS) when ``color`` is None, the long
        name when ``color`` is a known code, and otherwise ``color`` unchanged,
        as B.8 requires for codes with no matching key.
    """
    if mapping is None:
        mapping = COLORS
    # Nulls are reported as the 'unknown' color rather than passed through.
    if color is None:
        return mapping['UN']
    return mapping.get(color, color)

# Wrap colorLong as a string-returning Spark UDF and apply it to the 'Color'
# column, producing the new 'Color long' column.
color_udf = F.udf(colorLong, returnType=StringType())
df_with_color_long = df_new.withColumn('Color long', color_udf(F.col('Color')))

## B.9 Using this new column, what is the most frequent color value for Toyotas (TOYT)?

In [71]:
# Ticket count per long color name, restricted to Toyotas (Make == 'TOYT').
# The counting is a DataFrame groupBy instead of a per-row Python map and
# reduceByKey; only the color_udf itself runs in Python. The small result is
# converted to an RDD of (color, count) tuples for the ranking cell below.
toyt_colors_rdd = df_with_color_long.filter("Make == 'TOYT'")\
        .groupBy('Color long').count()\
        .rdd.map(tuple)

In [72]:
# Rank Toyota colors by ticket count (largest first); the first pair returned
# answers B.9.
toyt_colors_rdd.sortBy(lambda x: x[1], ascending=False).take(20)

[('Gray', 489697),
 ('White', 434595),
 ('Black', 353812),
 ('Silver', 347894),
 ('Blue', 180091),
 ('Red', 119074),
 ('Green', 74968),
 ('Gold', 40646),
 ('Maroon', 26242),
 ('Tan', 23355),
 ('Beige', 15723),
 ('OT', 15719),
 ('Brown', 11454),
 ('Yellow', 4372),
 ('PR', 4272),
 ('Orange', 3575),
 ('Unknown', 2783),
 ('TU', 1647),
 ('CO', 730),
 ('Pink', 117)]

The most frequently occurring color for TOYT is **Gray**.