## Section B - Working with DataFrames and SQL

In [2]:
import pandas as pd
import numpy as np
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import * 

In [3]:
# Build (or reuse) the Spark session for this notebook. Dynamic allocation is
# switched on together with shuffle tracking, and the external shuffle service
# is switched off; executors use 2 cores and are released after 30s idle.
spark_session = (
    SparkSession.builder
    .master("spark://192.168.2.250:7077")
    .appName("Koushik_A3_sql")
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.dynamicAllocation.shuffleTracking.enabled", True)
    .config("spark.shuffle.service.enabled", False)
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
    .config("spark.executor.cores", 2)
    .config("spark.driver.port", 9999)
    .config("spark.blockManager.port", 10005)
    .getOrCreate()
)

# Handle to the session's SparkContext (entry point for the RDD API).
spark_context = spark_session.sparkContext

# Keep the notebook output readable: only ERROR-level log lines are emitted.
spark_context.setLogLevel("ERROR")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/20 19:26:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# SQL entry point built on the same SparkContext the session exposes.
# NOTE(review): SQLContext appears to be deprecated in Spark 3.x in favour of
# using the SparkSession directly - confirm before reusing this pattern.
sqlContext = SQLContext(spark_context)
sqlContext



<pyspark.sql.context.SQLContext at 0x7f6475345820>

### Question B.1 Load the CSV file from HDFS, and call show() to verify the data is loaded correctly

In [5]:
# Read the parking-citations CSV from HDFS. The first row supplies the column
# names, column types are inferred, and the resulting DataFrame is marked for
# caching because every later cell reuses it.
df = sqlContext.read.csv(
    'hdfs://192.168.2.250:9000/parking-citations.csv',
    header='true',
    inferSchema='true',
).cache()

                                                                                

In [6]:
# Preview the first 20 rows (long cell values truncated) to check the load.
df.show(n=20, truncate=True)



+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+-----------------+----------------------+
|Ticket number|         Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date| VIN|Make|Body Style|Color|            Location|Route|Agency|Violation code|Violation Description|Fine amount| Latitude|Longitude|Agency Description|Color Description|Body Style Description|
+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+-----------------+----------------------+
|   1103341116|2015-12-21 00:00:00|    1251.0|    NULL|       NULL|            CA|         200304.0|NULL|HOND|        PA|   GY|

                                                                                

### Question B.2 Print the schema for the DataFrame.

In [7]:
df.printSchema() # print the DataFrame schema as a tree: column name, type, nullability

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: timestamp (nullable = true)
 |-- Issue time: double (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: double (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: double (nullable = true)
 |-- VIN: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: double (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: double (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Agency Description: string (nullable = true)
 |-- Color Description: string (nullable = true)
 |-- Body Style Description: string (nullable = true)



### Question B.3 Count the number of rows in the CSV file.

In [8]:
# Number of rows in the DataFrame loaded from the CSV file.
df.count()

13077724

### Question B.4 Count the number of partitions in the underlying RDD.

In [9]:
# Number of partitions of the RDD underlying the DataFrame.
df.rdd.getNumPartitions()

16

### Question B.5 Drop the columns VIN, Latitude and Longitude

In [10]:
# Columns to discard; the trimmed frame is kept under a new name so the
# originally loaded `df` stays untouched.
cols_to_remove = [
    "VIN",
    "Latitude",
    "Longitude",
]
new_df = df.drop(*cols_to_remove)

### Question B.6 Find the maximum fine amount. How many fines have this amount?

In [11]:
# Replace 'Fine amount' with its float-cast version. withColumn already names
# the resulting column, so no alias is needed on the cast expression.
fine_as_float = new_df['Fine amount'].cast('float')
new_df = new_df.withColumn('Fine amount', fine_as_float)

In [12]:
# Largest value in the 'Fine amount' column. The aggregate yields a single row
# with a single field, hence the [0][0] after collect().
# The key uses the column's exact name ('Fine amount'); the previous spelling
# 'Fine Amount' only resolved because Spark's analyzer is case-insensitive by
# default and would fail with spark.sql.caseSensitive=true.
max_value = new_df.agg({'Fine amount': 'max'}).collect()[0][0]
print(max_value)

1100.0


In [13]:
# Count the citations whose fine equals the maximum found in the previous cell.
is_max_fine = new_df['Fine amount'] == max_value
count_max = new_df.where(is_max_fine).count()

print(f'The maximum fine amount: {max_value} and has been occured: {count_max} times')

The maximum fine amount: 1100.0 and has been occured: 626 times


### Question B.7 Show the top 20 most frequent vehicle makes, and their frequencies.

In [14]:
# Citations per vehicle make, most frequent first; show exactly the top 20.
(
    new_df
    .groupBy('Make')
    .count()
    .orderBy('count', ascending=False)
    .show(20)
)



+----+-------+
|Make|  count|
+----+-------+
|TOYT|2150768|
|HOND|1479996|
|FORD|1116235|
|NISS| 945133|
|CHEV| 892676|
| BMW| 603092|
|MERZ| 543298|
|VOLK| 432030|
|HYUN| 404917|
|DODG| 391686|
|LEXS| 368420|
| KIA| 328155|
|JEEP| 316300|
|AUDI| 255395|
|MAZD| 242344|
|OTHR| 205546|
| GMC| 184889|
|INFI| 174315|
|CHRY| 159948|
|SUBA| 154640|
+----+-------+
only showing top 20 rows



                                                                                

### Question B.8 Let’s expand some abbreviations in the color column. Create a User Defined Function to create a new column, 'color long', mapping the original colors to their corresponding values in the dictionary below. If there is no key matching the original color, use the original color.

In [15]:
def translate(dictionary):
    """Build a Spark UDF that expands a value through ``dictionary``.

    Parameters
    ----------
    dictionary : dict
        Mapping from abbreviation to its long form.

    Returns
    -------
    A string-typed UDF. For each input value it returns the mapped long form,
    or the input value itself when ``dictionary`` has no matching key (this
    includes null input, which stays null).
    """
    def lookup(value):
        # Fall back to the original value instead of None for unknown keys.
        return dictionary.get(value, value)

    return udf(lookup, StringType())

# Colour abbreviations found in the 'Color' column and their long forms.
COLORS = {
    'AL': 'Aluminum', 'AM': 'Amber', 'BG': 'Beige',
    'BK': 'Black', 'BL': 'Blue', 'BN': 'Brown',
    'BR': 'Brown', 'BZ': 'Bronze', 'CH': 'Charcoal',
    'DK': 'Dark', 'GD': 'Gold', 'GO': 'Gold',
    'GN': 'Green', 'GY': 'Gray', 'GT': 'Granite',
    'IV': 'Ivory', 'LT': 'Light', 'OL': 'Olive',
    'OR': 'Orange', 'MR': 'Maroon', 'PK': 'Pink',
    'RD': 'Red', 'RE': 'Red', 'SI': 'Silver',
    'SL': 'Silver', 'SM': 'Smoke', 'TN': 'Tan',
    'VT': 'Violet', 'WT': 'White', 'WH': 'White',
    'YL': 'Yellow', 'YE': 'Yellow', 'UN': 'Unknown',
}

# Add the 'color long' column by passing 'Color' through the translate UDF.
new_df = new_df.withColumn(
    'color long',
    translate(COLORS)(new_df['Color']),
)

In [16]:
# Print the schema of new_df to inspect its current columns and types.
new_df.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: timestamp (nullable = true)
 |-- Issue time: double (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: double (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: double (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: double (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: float (nullable = true)
 |-- Agency Description: string (nullable = true)
 |-- Color Description: string (nullable = true)
 |-- Body Style Description: string (nullable = true)
 |-- color long: string (nullable = true)



### Question B.9 Using this new column, what’s the most frequent colour value for Toyotas (TOYT)?

In [17]:
# Most frequent 'color long' value among Toyotas (Make == 'TOYT').
# The rows are filtered before aggregating, so the predicate refers to a column
# of the frame it is applied to and only Toyota rows are grouped. The column is
# referenced by its exact name ('Make'); the previous 'MAKE' only resolved
# through Spark's default case-insensitive analysis.
(
    new_df
    .where(new_df['Make'] == 'TOYT')
    .groupBy('Make')
    .agg(mode('color long'))
    .show()
)



+----+----------------+
|Make|mode(color long)|
+----+----------------+
|TOYT|            Gray|
+----+----------------+



                                                                                

In [18]:
# Stop the Spark session now that all questions have been answered.
spark_session.stop()