
### Author : Ashutosh Kumar
### Contact : ashutoshind2017@outlook.com
### Kernel: PySpark 
### Document Name: NYC Parking Tickets Analysis

## Problem Statement
Big data analytics allows you to analyse data at scale. It has applications in almost every industry in the world. Let’s consider an unconventional application that you wouldn’t ordinarily encounter.

New York City is a thriving metropolis. Just like most other metros its size, one of the biggest problems its citizens face is parking. The classic combination of a huge number of cars and cramped geography leads to a huge number of parking tickets.
In an attempt to scientifically analyse this phenomenon, the NYC Police Department has collected data for parking tickets. Of these, the data files for multiple years are publicly available on Kaggle. We will try and perform some exploratory analysis on a part of this data. Spark will allow us to analyse the full files at high speeds as opposed to taking a series of random samples that will approximate the population. For the scope of this analysis, we will analyse the parking tickets over the year 2017.
Note: Although the broad goal of any analysis of this type is to have better parking and fewer tickets, we are not looking for recommendations on how to reduce the number of parking tickets—there are no specific points reserved for this.
The purpose of this assignment is to conduct an exploratory data analysis that will help you understand the data. Since the dataset is large, your queries will take some time to run, so you will need to identify the correct queries quickly.
The dataset structure is available on this page along with the data.
https://www.kaggle.com/new-york-city/nyc-parking-tickets

### Accessing the Dataset
The data for this assignment has been placed in HDFS at the following path:
#### '/common_folder/nyc_parking/Parking_Violations_Issued_-_Fiscal_Year_2017.csv'
 
 
The analysis should be performed on PySpark mounted on your CoreStack cluster, using the PySpark library. Remember that we need to summarise the analysis with insights along with the code.


## Step 1: Data Understanding, Visualisation and Preparation:


In [58]:
# Instantiating the spark session :

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PySpark ML NYC Tickets Analysis").getOrCreate()

In [None]:
# Loading the data:

# Create a dataframe through the `read` interface of the SparkSession (spark) object:
# (the header option set to true makes Spark use the first CSV row as column names instead of data)

NYCParkingdf = spark.read.format("csv").option("header", "true").option("inferSchema", "true")\
             .load("/common_folder/nyc_parking/Parking_Violations_Issued_-_Fiscal_Year_2017.csv")



In [None]:
# Verify that the data is loaded in the dataframe :

NYCParkingdf.show(5)

In [None]:
# Examining the dataset imported:

# A DataFrame has named, typed columns, and its schema describes them.
# printSchema prints the schema in tree format
NYCParkingdf.printSchema()


In [None]:
# Finding the number of records and the number of columns in the dataframe:
print((NYCParkingdf.count(), len(NYCParkingdf.columns)))

In [None]:
# So the dataset is large: 10803028 rows and 10 columns.

In [None]:
# Importing the Spark SQL functions under an alias to find the date range of the tickets
# (the alias avoids shadowing Python's built-in min and max):
from pyspark.sql import functions as F

NYCParkingdf.select(F.min("Issue Date"), F.max("Issue Date")).show()

In [None]:
# The issue dates range from 1972 in the past to 2069 in the future, so we need to keep only the records for the year 2017.

# To filter the data by year, let's create a new column in the dataframe holding the year of Issue Date:

from pyspark.sql.functions import col, year

NYCParkingdf2 = NYCParkingdf.withColumn("Year", year(col("Issue Date")))
NYCParkingdf2.show(5)


In [None]:
# Now filtering the data only for the year 2017 as per the requirement
# (year() returns an integer, so we compare against the integer 2017):
NYCParkingdf3 = NYCParkingdf2.filter(NYCParkingdf2.Year == 2017)
NYCParkingdf3.show(5)

In [None]:
# Find the total number of tickets for the year :

# Register the PySpark dataframe as a SQL temp view for analysis:
NYCParkingdf3.createOrReplaceTempView("dfNYCTable")

# After registering temp table we can run sql queries:
spark.sql('SELECT count(distinct `Summons Number`) FROM dfNYCTable').show()

# Backticks are used to escape the column name `Summons Number` because it contains a space.

In [None]:
# Find the number of unique states in which the ticketed cars were registered:
# The Registration State column provides this information in the dataframe:

spark.sql('SELECT `Registration State`,count(*) as count1 FROM dfNYCTable group by `Registration State` order by count1 desc').collect()

In [None]:
NYCParkingdf3.select('Registration State').distinct().count()

In [None]:
# NY has the highest number of tickets (4273951) and FO has the lowest (8).
# In total, 65 distinct Registration State values are present.

# One of them is erroneous: Registration State = 99 is not a valid state code.

In [None]:
# Replacing state 99 with the state having the maximum entries

# Importing required sql function:

from pyspark.sql.functions import when

NYCParkingdf4 = NYCParkingdf3.withColumn("Registration State", \
              when(NYCParkingdf3["Registration State"] == "99", 'NY').otherwise(NYCParkingdf3["Registration State"]))
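The replacement rule used here (swap an invalid code for the most frequent valid value) can be illustrated outside Spark. The following is a minimal plain-Python sketch, not the notebook's Spark code; the helper name `replace_with_mode` and the sample list are hypothetical and do not come from the dataset.

```python
from collections import Counter


def replace_with_mode(values, bad_value="99"):
    """Replace every occurrence of bad_value with the most frequent valid value."""
    valid = [v for v in values if v != bad_value]
    # most_common(1) returns a list holding one (value, count) pair
    mode, _ = Counter(valid).most_common(1)[0]
    return [mode if v == bad_value else v for v in values]


# Hypothetical sample of registration states (illustration only).
sample = ["NY", "NJ", "99", "NY", "PA", "99", "NY"]
cleaned = replace_with_mode(sample)
print(cleaned)
```

Computing the mode from the data, rather than hard-coding 'NY', keeps the rule valid if the input file changes.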

In [None]:
# Now that state 99 has been replaced with the state having the most tickets, let's check the distinct count again:
NYCParkingdf4.select('Registration State').distinct().count()

In [None]:
# So now we have 64 distinct Registration State values, and the invalid code 99 is gone.

In [None]:
# How often does each violation code occur? Display the frequency of the top five violation codes.

violation_code = NYCParkingdf4.select('Violation Code').distinct().count()
violation_code

In [None]:
# So we have 100 different violation codes.

# Re-register the PySpark dataframe as a SQL temp view for analysis:
NYCParkingdf4.createOrReplaceTempView("dfNYCTable")

# After registering temp table we can run sql queries:
spark.sql("SELECT `Violation Code`,count(*) as violation_count FROM dfNYCTable group by `Violation Code` order by\
           violation_count desc limit 5").show()

In [None]:
# The top 5 violation codes are 21,36,38,14 and 20.


In [None]:
# How often does each 'vehicle body type' get a parking ticket? 

spark.sql("SELECT `Vehicle Body Type`,count(*) as body_count FROM dfNYCTable group by `Vehicle Body Type` order by\
           body_count desc limit 5").show()

In [None]:
# So the top 5 vehicle body types fined are SUBN, 4DSD, VAN, DELV and SDN.

In [None]:
# How about the 'vehicle make'? 

spark.sql("SELECT `Vehicle Make`,count(*) as vehicle_count FROM dfNYCTable group by `Vehicle Make` order by\
           vehicle_count desc limit 5").show()

In [None]:
# So the top 5 vehicle makes fined are FORD, TOYOT, HONDA, NISSAN and CHEVR.

In [None]:
# A precinct is a police station that has a certain zone of the city under its command.
# 'Violation Precinct' (This is the precinct of the zone where the violation occurred)
# Find the top 5 violation precincts (limit 6 leaves room for one invalid precinct value):

spark.sql("SELECT `Violation Precinct`,count(*) as VP_count FROM dfNYCTable group by `Violation Precinct` order by\
           VP_count desc limit 6").show()


In [None]:
# A violation precinct of 0 is invalid, so the top 5 violation precincts are 19, 13, 1, 18 and 114.

In [None]:
# 'Issuer Precinct' (This is the precinct that issued the ticket.)
# Find the top 5 issuer precincts:

spark.sql("SELECT `Issuer Precinct`,count(*) as IP_count FROM dfNYCTable group by `Issuer Precinct` order by\
           IP_count desc limit 6").show()


In [None]:
# An issuer precinct of 0 is invalid, so the top 5 issuer precincts are 19, 13, 1, 18 and 114.

# These match the top violation precincts, so tickets are mostly issued by the precinct in whose zone the violation occurred.
# This suggests that the ticketing system might be automated/fast.


In [None]:
# Find the violation code frequencies for the three precincts that have issued the most tickets. Do these precinct zones
# have an exceptionally high frequency of certain violation codes? Are these codes common across precincts?

from pyspark.sql.functions import col

spark.sql("SELECT `Violation Code`,count(*) as violation_count FROM dfNYCTable where `Issuer Precinct` in (19,13,1) \
            group by `Violation Code` order by violation_count desc limit 5").show()


In [None]:
# The top 5 violation codes for the top 3 precinct zones are 14, 46, 38, 37 and 21.
# The top 5 violation codes across all precincts are 21, 36, 38, 14 and 20.

# We can infer that codes 46 and 37 are much more frequent in the top 3 ticketing zones than in
# the data for all zones.
# Three codes are common to both lists: 14, 21 and 38.
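The comparison of the two top-5 lists can be checked with a set intersection. This is a small plain-Python sketch that uses only the codes quoted in the comments above; the variable names are illustrative.

```python
# Top-5 violation codes as reported in the comments above.
top_codes_top3_precincts = [14, 46, 38, 37, 21]
top_codes_all_precincts = [21, 36, 38, 14, 20]

# Codes that appear in both lists.
common = sorted(set(top_codes_top3_precincts) & set(top_codes_all_precincts))

# Codes that are prominent only in the three busiest precincts.
only_top3 = sorted(set(top_codes_top3_precincts) - set(top_codes_all_precincts))

print("common:", common)
print("only in top 3 precincts:", only_top3)
```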



In [None]:
# Find out the properties of parking violations across different times of the day:

# Let's try to find the details on the 'Violation Time':

spark.sql("SELECT count(1) as null_count FROM dfNYCTable where `Violation Time` IS NULL").show()

In [None]:
# No record for the year 2017 has a NULL Violation Time, i.e. the violation time is always recorded.

In [None]:
# The Violation Time field is stored as a string in an unusual format: four digits (hour and minute) followed by A or P.
# We need to turn it into a time attribute that can be divided into groups.

# To read the AM/PM marker we use the pattern letter 'a'. But in SimpleDateFormat, 'a' matches AM or PM,
# not A or P, so we first append 'M' to the string:

import pyspark.sql.functions as F 

df2 = NYCParkingdf4.withColumn('Violation Time', F.concat(F.col('Violation Time'), F.lit('M')))

# We use K as the hour pattern letter (hour in am/pm, 0-11), following the Oracle Java documentation below:
# https://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html
df3 = df2.withColumn('ts', F.to_timestamp('Violation Time',format='KKmma'))

df4 = df3.withColumn('time', F.date_format(F.col('ts'), format='HH:mm'))
df4[['Violation Time','time','ts']].show(5)
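To make the conversion rule explicit, here is a minimal plain-Python sketch of the same idea, independent of Spark's pattern letters. It assumes the raw value is four digits followed by A or P, and it treats an hour part of 00 or 12 as the start of the half-day; values with an hour of 12 lie outside the 0-11 range of K, so how a pattern-based parser treats them depends on its leniency. The function name `parse_violation_time` is hypothetical.

```python
from datetime import time


def parse_violation_time(raw):
    """Convert a string such as '0143A' or '1215P' to datetime.time, or None if malformed."""
    if raw is None:
        return None
    raw = raw.strip().upper()
    if len(raw) != 5 or not raw[:4].isdecimal() or raw[4] not in ("A", "P"):
        return None
    hour, minute = int(raw[:2]), int(raw[2:4])
    if hour > 12 or minute > 59:
        return None
    hour %= 12  # '00' and '12' both mean the first hour of the half-day
    if raw[4] == "P":
        hour += 12
    return time(hour, minute)


for value in ["0143A", "1215P", "0730P", "7530P"]:
    print(value, "->", parse_violation_time(value))
```

Returning None for malformed values mirrors how a failed timestamp parse yields a null rather than an error, so bad rows can be counted or filtered afterwards.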


In [None]:
# Dropping the columns Violation Time, Year and ts as they are not needed anymore:

columns_to_drop = ['Violation Time', 'Year','ts']
df4 = df4.drop(*columns_to_drop)

In [None]:
# Validating that the unnecessary columns have been dropped successfully:
df4.printSchema()

In [None]:
# Changing the type of the time column from string to timestamp:

from pyspark.sql.functions import col
from pyspark.sql.types import TimestampType

df4 = df4.withColumn("time", col("time").cast(TimestampType()))

In [None]:
# Rename the column from time to time_stamp:
df4 = df4.withColumnRenamed("time", "time_stamp")

In [None]:
df4.show(5)

In [None]:
# Divide 24 hours into six equal discrete bins of time. Choose the intervals as you see fit. For each of these groups, find the
# three most commonly occurring violations.

# Six equal bins means 240 minutes (4 hours) each:

from pyspark.sql.functions import col,hour,minute,second

# Re-register the dataframe as sql view:
df4.createOrReplaceTempView("dfNYCTable")

spark.sql(""" SELECT CASE when hour(time_stamp) BETWEEN 0 and 3 then '1' 
                          when hour(time_stamp) BETWEEN 4 and 7 then '2' 
                          when hour(time_stamp) BETWEEN 8 and 11 then '3' 
                          when hour(time_stamp) BETWEEN 12 and 15 then '4' 
                          when hour(time_stamp) BETWEEN 16 and 19 then '5' 
                          ELSE '6' 
                      END 
                          as day_bin, count(*) as bin_count 
                             FROM dfNYCTable group by day_bin order by bin_count desc """).show()

# We can use triple quotes """""" for multiline SQL statements in PySpark
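Because the six bins are equal four-hour blocks, the CASE expression is equivalent to an integer division of the hour by 4. The plain-Python sketch below shows that mapping; the function name `day_bin` is illustrative and the bin labels follow the query above.

```python
def day_bin(hour):
    """Map an hour of the day (0-23) to one of six 4-hour bins labelled '1' to '6'."""
    if not 0 <= hour <= 23:
        raise ValueError("hour must be between 0 and 23")
    # 0-3 -> '1', 4-7 -> '2', 8-11 -> '3', 12-15 -> '4', 16-19 -> '5', 20-23 -> '6'
    return str(hour // 4 + 1)


for h in (0, 3, 4, 11, 12, 19, 20, 23):
    print(h, "->", day_bin(h))
```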

In [None]:
# Most violations happen between 8:00 AM and 11:59 AM, i.e. the morning peak office-commute time.
# This is followed by violations in the evening between 4:00 PM and 7:59 PM.
# The fewest violations fall between midnight and 3:59 AM (bin 1), which makes sense: there is less traffic at night.

In [None]:
# Now, try another direction. For the three most commonly occurring violation codes, find the most common time of the day 
# (in terms of the bins from the previous part).

# We know from the previous analysis that the three most common violation codes are 21, 36 and 38

spark.sql(""" SELECT CASE when hour(time_stamp) BETWEEN 0 and 3 then '1' 
                          when hour(time_stamp) BETWEEN 4 and 7 then '2' 
                          when hour(time_stamp) BETWEEN 8 and 11 then '3' 
                          when hour(time_stamp) BETWEEN 12 and 15 then '4' 
                          when hour(time_stamp) BETWEEN 16 and 19 then '5' 
                          ELSE '6' 
                      END 
                          as day_bin, count(*) as bin_count 
                             FROM dfNYCTable 
                             WHERE `Violation Code` in (21,36,38)
                             group by day_bin order by bin_count desc """).show()



In [None]:
print(1122797/371383 ,2163554/1329942)

In [None]:
# For the top 3 violation codes, the busiest part of the day is the same as for all violations: 8:00 AM to 11:59 AM.
# The difference is in how dominant the morning is: for the top 3 codes the morning count is about 3 times the
# second count in the ratio above (1122797/371383), versus about 1.6 times for all violation codes (2163554/1329942).

In [None]:
# Let’s try and find some seasonality in this data:

# First, divide the year into a certain number of seasons, and find the frequencies of tickets for each season. 

According to the meteorological definition, the seasons begin on the first day of the months that include the equinoxes and solstices:
Source : https://www.timeanddate.com/calendar/aboutseasons.html

Spring runs from March 1 to May 31;
Summer runs from June 1 to August 31;
Fall (autumn) runs from September 1 to November 30; and
Winter runs from December 1 to February 28 (February 29 in a leap year).

So let's use the following seasons, which together cover all 12 months:
Spring(March, April, May)
Summer(June, July, August)
Fall(September, October, November)
Winter(December, January, February)
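The month-to-season assignment listed above can be written as a simple lookup. This is a plain-Python sketch of the mapping only; the names `SEASON_BY_MONTH` and `season_of` are illustrative.

```python
from datetime import date

# Meteorological seasons as listed above, keyed by month number.
SEASON_BY_MONTH = {
    3: "Spring", 4: "Spring", 5: "Spring",
    6: "Summer", 7: "Summer", 8: "Summer",
    9: "Fall", 10: "Fall", 11: "Fall",
    12: "Winter", 1: "Winter", 2: "Winter",
}


def season_of(issue_date):
    """Return the meteorological season for a datetime.date."""
    return SEASON_BY_MONTH[issue_date.month]


print(season_of(date(2017, 4, 15)))
print(season_of(date(2017, 12, 1)))
```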

In [None]:
spark.sql(""" SELECT CASE when month(`Issue Date`) IN (3,4,5) then 'Spring' 
                          when month(`Issue Date`) IN (6,7,8) then 'Summer' 
                          when month(`Issue Date`) IN (9,10,11) then 'Fall' 
                          ELSE 'Winter' 
                      END 
                          as Season, count(*) as bin_count 
                             FROM dfNYCTable group by Season order by bin_count desc """).show()


In [None]:
# The Spring season has the most violations (2873383), followed by the Winter season (1704690).
# The Fall season has by far the fewest tickets, only 979. A likely reason is that the fiscal-year 2017 file
# covers roughly July 2016 to June 2017, so it contains very few records dated in the autumn of calendar year 2017.

In [None]:
# Find the three most common violations for each of these seasons, starting with Spring:

spark.sql(""" SELECT `Violation Code`,count(*) as violation_count, "Spring" as Season
                           FROM dfNYCTable where month(`Issue Date`) IN (3,4,5)
                             group by `Violation Code` order by violation_count desc limit 3""").show()



In [None]:
# The three most common violations for Summer:

spark.sql(""" SELECT `Violation Code`,count(*) as violation_count, "Summer" as Season
                           FROM dfNYCTable where month(`Issue Date`) IN (6,7,8)
                             group by `Violation Code` order by violation_count desc limit 3""").show()



In [None]:
# The three most common violations for Fall:

spark.sql(""" SELECT `Violation Code`,count(*) as violation_count, "Fall" as Season
                           FROM dfNYCTable where month(`Issue Date`) IN (9,10,11)
                             group by `Violation Code` order by violation_count desc limit 3""").show()



In [None]:
# The three most common violations for Winter:

spark.sql(""" SELECT `Violation Code`,count(*) as violation_count, "Winter" as Season
                           FROM dfNYCTable where month(`Issue Date`) IN (12,1,2)
                             group by `Violation Code` order by violation_count desc limit 3""").show()
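The four per-season queries all apply one pattern: count codes within a group and keep the top N. The plain-Python sketch below illustrates that pattern in a single pass; the function name `top_codes_by_group` and the sample records are hypothetical and are not taken from the dataset.

```python
from collections import Counter, defaultdict


def top_codes_by_group(records, n=3):
    """Given (group, code) pairs, return the n most frequent codes for each group."""
    counts = defaultdict(Counter)
    for group, code in records:
        counts[group][code] += 1
    return {
        group: [code for code, _ in counter.most_common(n)]
        for group, counter in counts.items()
    }


# Hypothetical (season, violation code) pairs, for illustration only.
records = (
    [("Spring", 21)] * 3 + [("Spring", 36)] * 2 + [("Spring", 38)]
    + [("Fall", 46)] * 2 + [("Fall", 21)]
)
result = top_codes_by_group(records, n=2)
print(result)
```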



In [None]:
# The top 3 violation codes are the same (21, 36 and 38) for the Summer, Winter and Spring seasons.
# In contrast, the top 3 violation codes for the Fall season are 46, 21 and 40.

The fines collected from all the instances of parking violation constitute a source of revenue for the NYC Police Department. Let’s take an example of estimating this for the three most commonly occurring codes:

#### Find the total occurrences of the three most common violation codes.


In [None]:
spark.sql("SELECT count(1) as violation_count FROM dfNYCTable where \
             `Violation Code` in ('21','36','38') ").show()

In [None]:
# The three most common violation codes occur 1972931 times in total in the year 2017.

Then, visit the website:
http://www1.nyc.gov/site/finance/vehicles/services-violation-codes.page
It lists the fines associated with different violation codes. They’re divided into two categories: one for the highest-density locations in the city and the other for the rest of the city. For the sake of simplicity, take the average of the two.
Using this information, find the total amount collected for the three violation codes with the maximum tickets. State the code that has the highest total collection.
What can you intuitively infer from these findings?


The top 3 violation codes, their fines and the corresponding calculations:
(Averaging the fines of the two location categories for each code)

21 	Street Cleaning: No parking where parking is not allowed by sign, street marking or traffic control device ($65, $45)
    Resultant fine = $55
36 	Exceeding the posted speed limit in or near a designated school zone. ($50, $50)
    Resultant fine = $50
38 	Failing to show a receipt or tag in the windshield. Drivers get a 5-minute grace period past the expired time on parking meter receipts. ($65, $35)
    Resultant fine = $50
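The averaging step can be written out as a short calculation. This sketch uses only the fine pairs listed above; the dictionary names are illustrative.

```python
# (fine in the highest-density locations, fine in the rest of the city), in dollars
FINES_BY_CODE = {
    21: (65, 45),
    36: (50, 50),
    38: (65, 35),
}

# Average of the two location categories for each violation code.
average_fine = {code: sum(pair) / len(pair) for code, pair in FINES_BY_CODE.items()}

for code, fine in average_fine.items():
    print(f"code {code}: average fine = ${fine:.0f}")
```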

In [None]:
# Let's create a new PySpark dataframe to update the fine amount :

df5 = df4.withColumn(
    'fine_amount',
    F.when((F.col("Violation Code") == '21') , 55)\
    .when((F.col("Violation Code") == '36') , 50)\
    .when((F.col("Violation Code") == '38') , 50)\
    .otherwise(0)
)

df5.show(5)

In [None]:
# Let's rebuild the sql view from new dataframe:

df5.createOrReplaceTempView("dfNYCTable")


In [None]:
# No filter is needed here: fine_amount is 0 for every code outside the top 3, so the sum covers only those three codes.
spark.sql(""" SELECT sum(fine_amount) as violation_amount_total
                           FROM dfNYCTable """).show()

In [None]:
# The total amount collected for the three violation codes with the most tickets is $102,486,985 for the year 2017.

In [None]:
spark.sql(""" SELECT `Violation Code`, sum(fine_amount) as violation_amount_total
                           FROM dfNYCTable 
                           group by `Violation Code` order by violation_amount_total desc limit 3 """).show()

In [None]:
# So the code with the highest total collection is 21, amounting to $42,244,785.
# Violation code 21 is the Street Cleaning "No Parking" rule.

# We can infer that street-cleaning parking restrictions are a major source of violations in NYC, as in other large
# cities, and cars parked in violation of them may also add to traffic congestion and commute times for city dwellers.
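As a sanity check, the totals reported in this notebook can be cross-checked against each other with plain arithmetic. The sketch below uses only figures quoted in the comments (the ticket count for the top 3 codes, the two fine totals and the average fines of $55 and $50); the variable names are illustrative.

```python
# Figures quoted in this notebook's comments.
total_tickets_top3 = 1972931     # occurrences of codes 21, 36 and 38
total_fines_top3 = 102486985     # dollars collected for the three codes
total_fines_code21 = 42244785    # dollars collected for code 21 alone

FINE_CODE_21 = 55        # average fine for code 21
FINE_CODES_36_38 = 50    # average fine for codes 36 and 38

# Recover ticket counts from the fine totals.
tickets_code21, rem21 = divmod(total_fines_code21, FINE_CODE_21)
fines_36_38 = total_fines_top3 - total_fines_code21
tickets_36_38, rem_rest = divmod(fines_36_38, FINE_CODES_36_38)

print("code 21 tickets:", tickets_code21)
print("codes 36 + 38 tickets:", tickets_36_38)
print("sum matches reported count:", tickets_code21 + tickets_36_38 == total_tickets_top3)
```

Both divisions leave no remainder and the recovered counts add up to the reported number of tickets, so the three reported figures are consistent with one another.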
