In [2]:
import os
import sys
os.environ["PYSPARK_PYTHON"] = "/opt/cloudera/parcels/Anaconda/bin/python"
os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_232-cloudera/jre"
os.environ["SPARK_HOME"]="/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.6-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('NYC_Parking_Data').master("local").getOrCreate()
spark

In [4]:
## Data is frist loaded to EC2 from s3 and then to hadoop using thing commented out commands below
# wget https://upgrad-spark-data.s3.amazonaws.com/Parking_Violations_Issued_-_Fiscal_Year_2017.csv
# hadoop fs -put Parking_Violations_Issued_-_Fiscal_Year_2017.csv /user/root

df=spark.read.csv("Parking_Violations_Issued_-_Fiscal_Year_2017.csv", inferSchema = True, header=True)

In [4]:
df.printSchema()

root
 |-- Summons Number: long (nullable = true)
 |-- Plate ID: string (nullable = true)
 |-- Registration State: string (nullable = true)
 |-- Plate Type: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Violation Code: integer (nullable = true)
 |-- Vehicle Body Type: string (nullable = true)
 |-- Vehicle Make: string (nullable = true)
 |-- Issuing Agency: string (nullable = true)
 |-- Street Code1: integer (nullable = true)
 |-- Street Code2: integer (nullable = true)
 |-- Street Code3: integer (nullable = true)
 |-- Vehicle Expiration Date: integer (nullable = true)
 |-- Violation Location: integer (nullable = true)
 |-- Violation Precinct: integer (nullable = true)
 |-- Issuer Precinct: integer (nullable = true)
 |-- Issuer Code: integer (nullable = true)
 |-- Issuer Command: string (nullable = true)
 |-- Issuer Squad: string (nullable = true)
 |-- Violation Time: string (nullable = true)
 |-- Time First Observed: string (nullable = true)
 |-- Violation Coun

In [5]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DoubleType, LongType, TimestampType

In [6]:
fileSchema = StructType([
    StructField('Summons Number', LongType(), True),
    StructField('Plate ID', StringType(), True),
    StructField('Registration State', StringType(), True),
    StructField('Plate Type', StringType(), True),
    StructField('Issue Date', StringType(), True),
    StructField('Violation Code', IntegerType(), True),
    StructField('Vehicle Body Type', StringType(), True),
    StructField('Vehicle Make', StringType(), True),
    StructField('Issuing Agency', StringType(), True),
    StructField('Street Code1', IntegerType(), True),
    StructField('Street Code2', IntegerType(), True),
    StructField('Street Code3', IntegerType(), True),
    StructField('Vehicle Expiration Date', StringType(), True),
    StructField('Violation Location', IntegerType(), True),
    StructField('Violation Precinct', IntegerType(), True),
    StructField('Issuer Precinct', IntegerType(), True),
    StructField('Issuer Code', IntegerType(), True),
    StructField('Issuer Command', StringType(), True),
    StructField('Issuer Squad', StringType(), True),
    StructField('Violation Time', StringType(), True),
    StructField('Time First Observed', StringType(), True),
    StructField('Violation County', StringType(), True),
    StructField('Violation In Front Of Or Opposite', StringType(), True),
    StructField('House Number', StringType(), True),
    StructField('Street Name', StringType(), True),
    StructField('Intersecting Street', StringType(), True),
    StructField('Date First Observed', StringType(), True),
    StructField('Law Section', IntegerType(), True),
    StructField('Sub Division', StringType(), True),
    StructField('Violation Legal Code', StringType(), True),
    StructField('Days Parking In Effect', StringType(), True),
    StructField('From Hours In Effect', StringType(), True),
    StructField('To Hours In Effect', StringType(), True),
    StructField('Vehicle Color', StringType(), True),
    StructField('Unregistered Vehicle?', IntegerType(), True),
    StructField('Vehicle Year', StringType(), True),
    StructField('Meter Number', StringType(), True),
    StructField('Feet From Curb', IntegerType(), True),
    StructField('Violation Post Code', StringType(), True),
    StructField('Violation Description', StringType(), True),
    StructField('No Standing or Stopping Violation', StringType(), True),
    StructField('Hydrant Violation', StringType(), True),
    StructField('Double Parking Violation', StringType(), True),
])

In [7]:
df = spark.read.csv("Parking_Violations_Issued_-_Fiscal_Year_2017.csv", schema = fileSchema, header=True)
df.printSchema()

root
 |-- Summons Number: long (nullable = true)
 |-- Plate ID: string (nullable = true)
 |-- Registration State: string (nullable = true)
 |-- Plate Type: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Violation Code: integer (nullable = true)
 |-- Vehicle Body Type: string (nullable = true)
 |-- Vehicle Make: string (nullable = true)
 |-- Issuing Agency: string (nullable = true)
 |-- Street Code1: integer (nullable = true)
 |-- Street Code2: integer (nullable = true)
 |-- Street Code3: integer (nullable = true)
 |-- Vehicle Expiration Date: string (nullable = true)
 |-- Violation Location: integer (nullable = true)
 |-- Violation Precinct: integer (nullable = true)
 |-- Issuer Precinct: integer (nullable = true)
 |-- Issuer Code: integer (nullable = true)
 |-- Issuer Command: string (nullable = true)
 |-- Issuer Squad: string (nullable = true)
 |-- Violation Time: string (nullable = true)
 |-- Time First Observed: string (nullable = true)
 |-- Violation Count

In [8]:
from pyspark.sql.functions import *

In [9]:
# Selecting only columns that are required for analysis 'Registration State','Vehicle Body Type','Vehicle Make', 'Violation Code', 'Issue Date' and converting Issue Date to timestamp

pattern1 = 'MM/dd/yyyy'
df1 = df.select('Registration State','Vehicle Body Type','Vehicle Make', 'Violation Code', 'Issue Date').withColumn('Issue Date', unix_timestamp(df['Issue Date'], pattern1).cast('timestamp'))
df1.printSchema()

root
 |-- Registration State: string (nullable = true)
 |-- Vehicle Body Type: string (nullable = true)
 |-- Vehicle Make: string (nullable = true)
 |-- Violation Code: integer (nullable = true)
 |-- Issue Date: timestamp (nullable = true)



### **** Find the total number of tickets for the year 2017****


In [10]:
# Question: Find the total number of tickets for the year
# Solution: The total number of tickets for the year 2017 is 5431918 and is shown in a tabular format below

df2 = df1.groupBy(year('Issue Date').alias('year')).count().orderBy('count', ascending = False)
df2.where(df2['year'] == 2017).show()

+----+-------+
|year|  count|
+----+-------+
|2017|5431918|
+----+-------+



### **** Find out the number of unique states from where the cars that got parking tickets came. ****


In [11]:
# Question: Find out the number of unique states from where the cars that got parking tickets came. 
# Solution: There are 67 distinct states including '99' and is shown in a tabular format below

df2 = df1.select('Registration State').distinct()
df2.show(df2.count(),False)


+------------------+
|Registration State|
+------------------+
|AZ                |
|SC                |
|NS                |
|LA                |
|MN                |
|NJ                |
|MX                |
|DC                |
|OR                |
|99                |
|NT                |
|VA                |
|RI                |
|WY                |
|KY                |
|BC                |
|NH                |
|MI                |
|GV                |
|NV                |
|QB                |
|WI                |
|ID                |
|CA                |
|CT                |
|NE                |
|MT                |
|NC                |
|VT                |
|MD                |
|DE                |
|MO                |
|IL                |
|ME                |
|MB                |
|WA                |
|ND                |
|MS                |
|AL                |
|IN                |
|OH                |
|TN                |
|IA                |
|NM                |
|PA          

### **** Find out the number of unique states from where the cars that got parking tickets came after replacing bad               data '99' with the state having highest entries ****


In [12]:
# Question: Find out the number of unique states from where the cars that got parking tickets came after replacing bad data '99' with the state having highest entries
# Solution: There are 66 distinct states that got parking tickets after replacing bad data '99' with 'NY' and is shown in a tabular format below

# First finding the dataframe having only state with highest entries for replacing and converted to pandas
dp = df.groupBy('Registration State').count().orderBy('count', ascending = False).limit(1).toPandas()

# Selecting only columns that are required for analysis 'Registration State','Vehicle Body Type','Vehicle Make', 'Violation Code', 'Issue Date' and converting Issue Date to timestamp
# replaced the bad input '99' with the state calculated above
df1 = df.select('Registration State','Vehicle Body Type','Vehicle Make', 'Violation Code', 'Issue Date').withColumn('Registration State', regexp_replace('Registration State', '99', dp['Registration State'].values[0])).withColumn('Issue Date', unix_timestamp(df['Issue Date'], pattern1).cast('timestamp'))
df2 = df1.select('Registration State').distinct()
df2.show(df2.count(),False)


+------------------+
|Registration State|
+------------------+
|AZ                |
|SC                |
|NS                |
|LA                |
|MN                |
|NJ                |
|MX                |
|DC                |
|OR                |
|NT                |
|VA                |
|RI                |
|WY                |
|KY                |
|BC                |
|NH                |
|MI                |
|GV                |
|NV                |
|QB                |
|WI                |
|ID                |
|CA                |
|CT                |
|NE                |
|MT                |
|NC                |
|VT                |
|MD                |
|DE                |
|MO                |
|IL                |
|ME                |
|MB                |
|WA                |
|ND                |
|MS                |
|AL                |
|IN                |
|OH                |
|TN                |
|IA                |
|NM                |
|PA                |
|SD          

### **** Display the top 20 states with the most number of tickets along with their ticket count. ****


In [13]:
# Question: Display the top 20 states with the most number of tickets along with their ticket count.
# Solution: The top 20 states which got parking tickets are shown below

df1.groupBy('Registration State').count().orderBy('count', ascending = False).show(20,False)

+------------------+-------+
|Registration State|count  |
+------------------+-------+
|NY                |8517686|
|NJ                |925965 |
|PA                |285419 |
|FL                |144556 |
|CT                |141088 |
|MA                |85547  |
|IN                |80749  |
|VA                |72626  |
|MD                |61800  |
|NC                |55806  |
|IL                |37329  |
|GA                |36852  |
|TX                |36516  |
|AZ                |26426  |
|OH                |25302  |
|CA                |24260  |
|SC                |21836  |
|ME                |21574  |
|MN                |18227  |
|OK                |18165  |
+------------------+-------+
only showing top 20 rows



### **** How often does each violation code occur? Display the frequency of the top five violation codes. ****

In [14]:
# Question: How often does each violation code occur? Display the frequency of the top five violation codes.
# Solution: The top five violation codes are 21, 36, 38, 14, 20 and thier respectives counts are shown below

df1.groupBy('Violation Code').count().orderBy('count', ascending = False).show(5,False)

+--------------+-------+
|Violation Code|count  |
+--------------+-------+
|21            |1528588|
|36            |1400614|
|38            |1062304|
|14            |893498 |
|20            |618593 |
+--------------+-------+
only showing top 5 rows



### **** How often does each 'vehicle body type' get a parking ticket? ****

In [15]:
# Question: How often does each 'vehicle body type' get a parking ticket? 
# Solution: The top five vehicle body types that got parking tickets are SUBN, 4DSD, VAN, DELV, SDN and thier respectives counts are shown below

df1.groupBy('Vehicle Body Type').count().orderBy('count', ascending = False).show(5,False)


+-----------------+-------+
|Vehicle Body Type|count  |
+-----------------+-------+
|SUBN             |3719802|
|4DSD             |3082020|
|VAN              |1411970|
|DELV             |687330 |
|SDN              |438191 |
+-----------------+-------+
only showing top 5 rows



### **** How about the 'vehicle make'? Find the top 5 for both. ****

In [16]:
# Question: How about the 'vehicle make'? Find the top 5 for both.
# Solution: The top 5 vehicle makes that got parking tikcet are FORD, TOYOT, HONDA, NISSA, CHEVR and thier respective counts are shown below

df1.groupBy('Vehicle Make').count().orderBy('count', ascending = False).show(5,False)


+------------+-------+
|Vehicle Make|count  |
+------------+-------+
|FORD        |1280958|
|TOYOT       |1211451|
|HONDA       |1079238|
|NISSA       |918590 |
|CHEVR       |714655 |
+------------+-------+
only showing top 5 rows



### **** Finding the frequencies of tickets for each season ****

In [17]:
# Question: Finding the frequencies of tickets for each season
# Solution: Below is the udf to convert a month to a season winter(1,2,3),spring(4,5,6),summer(7,8,9),autumn(10,11,12) and thier respective counts are shown below

# UDF using annotation
@udf("string")
def monthToSeason(month):
    if month in (1, 2, 3):
        season = 'winter'
    elif month in (4, 5, 6):
        season = 'spring'
    elif month in (7, 8, 9):
        season = 'summer'
    else:
        season = 'autumn'
    return season


df1.groupBy(monthToSeason(month('Issue Date')).alias('Season')).count().orderBy('count', ascending = False).show()

+------+-------+
|Season|  count|
+------+-------+
|spring|3018840|
|winter|2671332|
|autumn|2648920|
|summer|2463936|
+------+-------+



### **** Finding the three most common violations for each of these seasons ****

In [18]:
# Question: Finding the three most common violations for each of these seasons
# Solution: Below listed are the top 3 most common violations per seasons which are (21,36,38)

df2 = df1.groupBy(monthToSeason(month('Issue Date')).alias('Season'), 'Violation Code').count().orderBy('count', ascending = False)
df2.where(df2['Season'] == 'winter').show(3)
df2.where(df2['Season'] == 'spring').show(3)
df2.where(df2['Season'] == 'summer').show(3)
df2.where(df2['Season'] == 'autumn').show(3)

+------+--------------+------+
|Season|Violation Code| count|
+------+--------------+------+
|winter|            21|374202|
|winter|            36|348240|
|winter|            38|287017|
+------+--------------+------+
only showing top 3 rows

+------+--------------+------+
|Season|Violation Code| count|
+------+--------------+------+
|spring|            21|421184|
|spring|            36|369902|
|spring|            38|266909|
+------+--------------+------+
only showing top 3 rows

+------+--------------+------+
|Season|Violation Code| count|
+------+--------------+------+
|summer|            21|385774|
|summer|            38|244985|
|summer|            36|239879|
+------+--------------+------+
only showing top 3 rows

+------+--------------+------+
|Season|Violation Code| count|
+------+--------------+------+
|autumn|            36|442593|
|autumn|            21|347428|
|autumn|            38|263393|
+------+--------------+------+
only showing top 3 rows



### **** Find the total occurrences of the three most common violation codes. ****

In [19]:
# Question: Find the total occurrences of the three most common violation codes.
# Solution: Below are the top most common violation codes (21,36,38) and thier respective counts

df2 = df1.groupBy('Violation Code').count().orderBy('count', ascending = False).limit(3)
df2.show()

+--------------+-------+
|Violation Code|  count|
+--------------+-------+
|            21|1528588|
|            36|1400614|
|            38|1062304|
+--------------+-------+



### **** Find the total amount collected for each of the three violation codes with the maximum tickets. ****

In [20]:
# Question: Find the total amount collected for each of the three violation codes with the maximum tickets.
# Solution: Below is the total amount collected for the top three violation codes (21, 36, 38) and thier average respective fines are 65$, 50$, 50$

@udf("int")
def getFine(code):
    if code == 21:
        fine = 65
    elif code == 36:
        fine = 50
    elif code == 38:
        fine = 50
    else:
        fine = 0
    return fine

df3 = df2.withColumn('Total $ amount collected', getFine(df2['Violation Code']) * df2['count']).select('Violation Code','Total $ amount collected')
df3.show()


+--------------+------------------------+
|Violation Code|Total $ amount collected|
+--------------+------------------------+
|            21|                99358220|
|            36|                70030700|
|            38|                53115200|
+--------------+------------------------+



### **** State the code that has the highest total collection (only based on the top 3 tickets) ****

In [21]:
# Question: State the code that has the highest total collection (only based on the top 3 tickets)
# Solution: Violation Code 21 has the highest total collection and is shown below and toal amount collected is already listed above 
dp1 = df3.toPandas()
dp1['Violation Code'].values[0]

21

### **** Find the top 3 states that have the highest ticket revenue based on the top 3 violation codes alone. ****

In [22]:
# Question: Find the top 3 states that have the highest ticket revenue based on the top 3 violation codes alone.
# Solution: Below are the top 3 states that had the most revenue from top three violation codes (21, 36, 38)

df2 = df1.groupBy('Registration State', 'Violation Code').count().where((df1['Violation Code'] == 21) | (df1['Violation Code'] == 36) | (df1['Violation Code'] == 38));
df3 = df2.withColumn('Total Revenue per violation', df2['count'] * getFine(df2['Violation Code']))
df4 = df3.groupBy('Registration State').agg(sum('Total Revenue per violation').alias('Total Revenue')).orderBy('Total Revenue', ascending = False)
df4.show(3)

+------------------+-------------+
|Registration State|Total Revenue|
+------------------+-------------+
|                NY|    177078880|
|                NJ|     14753770|
|                PA|      6944560|
+------------------+-------------+
only showing top 3 rows



## What can you intuitively infer from these findings?

### Top 3 Analysis

* The neighbouring states of NY like NJ, PA had the highest ticket count as well as the highest revenue generated. It means that the tickets and revenue generated is directly related to the nearness of the state 'NY
* No parking where parking is not allowed by sign (Violation Code: 21) is the most common violation
* Most of the parking tickets are happening in Season spring

### Other Analysis
* The vehicle body types SUBN, 4DSD, VAN, DELV, SDN had the most parking tickets
* The Vehicle makes FORD, TOYOT, HONDA, NISSA, CHEVR had the the most parking tickets
