In [1]:
## Importing SparkSession and creating a spark object
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("NYC Case Study Assigment Shubham").getOrCreate()

In [2]:
## Reading the data into DataFrame
df = spark.read.format("csv").load('/common_folder/nyc_parking/Parking_Violations_Issued_-_Fiscal_Year_2017.csv', header=True,inferSchema=True)
df

DataFrame[Summons Number: bigint, Plate ID: string, Registration State: string, Issue Date: timestamp, Violation Code: int, Vehicle Body Type: string, Vehicle Make: string, Violation Precinct: int, Issuer Precinct: int, Violation Time: string]

## Examining the df

In [3]:
# Checking Schema
df.printSchema()

root
 |-- Summons Number: long (nullable = true)
 |-- Plate ID: string (nullable = true)
 |-- Registration State: string (nullable = true)
 |-- Issue Date: timestamp (nullable = true)
 |-- Violation Code: integer (nullable = true)
 |-- Vehicle Body Type: string (nullable = true)
 |-- Vehicle Make: string (nullable = true)
 |-- Violation Precinct: integer (nullable = true)
 |-- Issuer Precinct: integer (nullable = true)
 |-- Violation Time: string (nullable = true)



In [4]:
# Checking Columns
df.columns

['Summons Number',
 'Plate ID',
 'Registration State',
 'Issue Date',
 'Violation Code',
 'Vehicle Body Type',
 'Vehicle Make',
 'Violation Precinct',
 'Issuer Precinct',
 'Violation Time']

In [5]:
# Printing Top 5 rows 
df.show(5)

+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+
|Summons Number|Plate ID|Registration State|         Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|
+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+
|    5092469481| GZH7067|                NY|2016-07-10 00:00:00|             7|             SUBN|       TOYOT|                 0|              0|         0143A|
|    5092451658| GZH7067|                NY|2016-07-08 00:00:00|             7|             SUBN|       TOYOT|                 0|              0|         0400P|
|    4006265037| FZX9232|                NY|2016-08-23 00:00:00|             5|             SUBN|        FORD|                 0|              0|         0233P|
|    8478629828| 66623ME|         

In [6]:
# Checking the count of records in the dataframe
df.count()

10803028

#### Total no. of tickets for the year are 10,803,028

In [7]:
# Counting Distinct Registration States 
from pyspark.sql.functions import col, mean,  countDistinct, count

df.select('Registration State').agg(countDistinct('Registration State')).show()

+----------------------------------+
|count(DISTINCT Registration State)|
+----------------------------------+
|                                67|
+----------------------------------+



#### There are 67 distinct Registration State but it has 99 value it in. Let's take care of that as well. 

In [8]:
# Checking Frequency of tickets in each Registration State
df.groupBy('Registration State').agg(count('Summons Number').alias('frequency')).sort(col('frequency').desc()).show()

+------------------+---------+
|Registration State|frequency|
+------------------+---------+
|                NY|  8481061|
|                NJ|   925965|
|                PA|   285419|
|                FL|   144556|
|                CT|   141088|
|                MA|    85547|
|                IN|    80749|
|                VA|    72626|
|                MD|    61800|
|                NC|    55806|
|                IL|    37329|
|                GA|    36852|
|                99|    36625|
|                TX|    36516|
|                AZ|    26426|
|                OH|    25302|
|                CA|    24260|
|                SC|    21836|
|                ME|    21574|
|                MN|    18227|
+------------------+---------+
only showing top 20 rows



#### Since, NY has the maximum number of tickets replacing 99 with NY 

In [9]:
# Replacing 99 with the NY as it has most number of tickets issued in the given year 
from pyspark.sql.functions import * 
df = df.withColumn('Registration State', regexp_replace('Registration State', '99', 'NY'))

In [10]:
df.groupBy('Registration State').agg(count('Summons Number').alias('frequency')).sort(col('frequency').desc()).show()

+------------------+---------+
|Registration State|frequency|
+------------------+---------+
|                NY|  8517686|
|                NJ|   925965|
|                PA|   285419|
|                FL|   144556|
|                CT|   141088|
|                MA|    85547|
|                IN|    80749|
|                VA|    72626|
|                MD|    61800|
|                NC|    55806|
|                IL|    37329|
|                GA|    36852|
|                TX|    36516|
|                AZ|    26426|
|                OH|    25302|
|                CA|    24260|
|                SC|    21836|
|                ME|    21574|
|                MN|    18227|
|                OK|    18165|
+------------------+---------+
only showing top 20 rows



In [11]:
# Counting Distinct Registration States Post fixing 99 column value
df.select('Registration State').agg(countDistinct('Registration State')).show()

+----------------------------------+
|count(DISTINCT Registration State)|
+----------------------------------+
|                                66|
+----------------------------------+



#### Now we have 66 distinct Registration States

## Aggregation Tasks

### Q1. How often does each violation code occur? Display the frequency of the top five violation codes.

In [12]:
df.groupBy('Violation Code').agg(count('Summons Number').alias('frequency')).sort(col('frequency').desc()).show(5)

+--------------+---------+
|Violation Code|frequency|
+--------------+---------+
|            21|  1528588|
|            36|  1400614|
|            38|  1062304|
|            14|   893498|
|            20|   618593|
+--------------+---------+
only showing top 5 rows



#### Most violated code is '21' about 1.53 Million times followed by '36' about 1.40 Million times

### Q2. How often does each 'vehicle body type' get a parking ticket? How about the 'vehicle make'? 

In [13]:
# group by Vehicle Body type and printing top 5 rows 
df.groupBy('Vehicle Body Type').agg(count('Summons Number').alias('frequency')).sort(col('frequency').desc()).show(5)

+-----------------+---------+
|Vehicle Body Type|frequency|
+-----------------+---------+
|             SUBN|  3719802|
|             4DSD|  3082020|
|              VAN|  1411970|
|             DELV|   687330|
|              SDN|   438191|
+-----------------+---------+
only showing top 5 rows



In [14]:
# Group by Vehicle maker and printing top 5 rows
df.groupBy('Vehicle Make').agg(count('Summons Number').alias('frequency')).sort(col('frequency').desc()).show(5)

+------------+---------+
|Vehicle Make|frequency|
+------------+---------+
|        FORD|  1280958|
|       TOYOT|  1211451|
|       HONDA|  1079238|
|       NISSA|   918590|
|       CHEVR|   714655|
+------------+---------+
only showing top 5 rows



#### SUBN vehicle body type got most number of tickets about 3.72 Mn followed by 4DSD 3.08 Mn
#### FORD vehicles have the highest frequency of tickets about 1.28 Mn

### Q3.1 'Violation Precinct' tickets frequency 

In [15]:
# Checking Violation Precinct wise Tickets frequency
df.groupBy('Violation Precinct').agg(count('Summons Number').alias('frequency')).sort(col('frequency').desc()).show(6)

+------------------+---------+
|Violation Precinct|frequency|
+------------------+---------+
|                 0|  2072400|
|                19|   535671|
|                14|   352450|
|                 1|   331810|
|                18|   306920|
|               114|   296514|
+------------------+---------+
only showing top 6 rows



#### Ignoring '0' as these are erroreous entries. The most tickets are from 19 (5.35 Mn) followed by 14 (3.52 Mn) Precinct 

### Q3.2 'Issuer Precinct' tickets frequency

In [16]:
# Checking Issuer Precint wise Tickets frequency 
df.groupBy('Issuer Precinct').agg(count('Summons Number').alias('frequency')).sort(col('frequency').desc()).show(6)

+---------------+---------+
|Issuer Precinct|frequency|
+---------------+---------+
|              0|  2388479|
|             19|   521513|
|             14|   344977|
|              1|   321170|
|             18|   296553|
|            114|   289950|
+---------------+---------+
only showing top 6 rows



#### Ignoring '0' as these are erroreous entries. The most tickets are issued by 19 (5.21 Mn) followed by 14 (3.45 Mn) Precinct 

### Q4. Find the violation code frequencies for three precincts that have issued the most number of tickets. 

In [17]:

# register DataFrame as temp table
df.createOrReplaceTempView("nyc_table")
# after registering temp table you we run your sql queries
spark.sql('''

select `Violation Code`, count(*) as frequency
from nyc_table
where `Violation Precinct` in ('19', '14', '1')
group by `Violation Code`
order by frequency desc

''').show()

+--------------+---------+
|Violation Code|frequency|
+--------------+---------+
|            14|   210865|
|            46|   129758|
|            38|    97816|
|            37|    89584|
|            69|    73875|
|            16|    72237|
|            21|    68782|
|            20|    61902|
|            31|    54868|
|            19|    36819|
|            40|    36506|
|            47|    32672|
|            71|    28177|
|            42|    27740|
|            84|    25420|
|            17|    23484|
|            10|    20706|
|            70|    15199|
|            82|    12569|
|             9|    11122|
+--------------+---------+
only showing top 20 rows



#### Violation Code# 14 (~211K) is the most frequent reason for a ticket followed by 46 (~130K) & 38 (~98K)

In [18]:
# SQL query to get Violation code for Top Violation Precinct i.e 19, 14 & 1
spark.sql('''
select * 
from 
(select *, dense_rank() over (partition by vp order by frequency desc ) as rank
from 
(select `Violation Precinct` as vp, `Violation Code` as vc, count(*) as frequency
from nyc_table
where `Violation Precinct` in ('19', '14', '1')
group by `Violation Precinct`, `Violation Code`
order by frequency desc
)a
)b 
where rank <= 3 

''').show()

+---+---+---------+----+
| vp| vc|frequency|rank|
+---+---+---------+----+
|  1| 14|    76375|   1|
|  1| 16|    39197|   2|
|  1| 20|    28768|   3|
| 19| 46|    90530|   1|
| 19| 38|    74926|   2|
| 19| 37|    73359|   3|
| 14| 14|    75850|   1|
| 14| 69|    58032|   2|
| 14| 31|    40150|   3|
+---+---+---------+----+



#### In Pricent 19, Most violated code is 46 (90.5K) followed by 38 (74.9K) 
#### In Pricent 1, Most violated code is 14 (76.3K) followed by 16 (~39K) 
#### In Pricent 14, Most violated code is 14 (75.8K) followed by 69 (~58K) 

#### Violation Code 14 is highest in Pricent 1 & 14 leading to becoming the most frequent reason for a violation 

### Q5. Find out the properties of parking violations across different times of the day:

#### Checking for null values and handling them by dropping it 

In [19]:
# Checking null values 
from pyspark.sql.functions import isnan, when, count, col

df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).show()

+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|Summons Number|Plate ID|Registration State|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|             0|       0|                 0|         0|             0|                0|           0|                 0|              0|             0|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+



In [20]:
# Dropping any null value, if any
df =  df.dropna(how='any')
df.count()

10803028

#### Looks like we don't have any null value. Let's continue with correcting Violation time format

In [21]:
# Converting Violation Time into suitable format 
import pyspark.sql.functions as F 
df2 = df.withColumn('Violation Time', F.concat(F.col('Violation Time'), F.lit('M')))
df3 = df2.withColumn('ts', F.to_timestamp('Violation Time','hhmma'))
df3.show(5)

+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+-------------------+
|Summons Number|Plate ID|Registration State|         Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|                 ts|
+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+-------------------+
|    5092469481| GZH7067|                NY|2016-07-10 00:00:00|             7|             SUBN|       TOYOT|                 0|              0|        0143AM|1970-01-01 01:43:00|
|    5092451658| GZH7067|                NY|2016-07-08 00:00:00|             7|             SUBN|       TOYOT|                 0|              0|        0400PM|1970-01-01 16:00:00|
|    4006265037| FZX9232|                NY|2016-08-23 00:00:00|             5|             SUB

In [22]:
# Inspecting top 5 rows post time format treatment
df4 = df3.withColumn('time', F.date_format(F.col('ts'), format='HH:mm'))
df4.show(5)

+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+-------------------+-----+
|Summons Number|Plate ID|Registration State|         Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|                 ts| time|
+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+-------------------+-----+
|    5092469481| GZH7067|                NY|2016-07-10 00:00:00|             7|             SUBN|       TOYOT|                 0|              0|        0143AM|1970-01-01 01:43:00|01:43|
|    5092451658| GZH7067|                NY|2016-07-08 00:00:00|             7|             SUBN|       TOYOT|                 0|              0|        0400PM|1970-01-01 16:00:00|16:00|
|    4006265037| FZX9232|                NY|2016-08-23 00:00:00| 

### Q5.1 Time Bucket wise Most Violated Code Frequency

In [23]:

# register df4 as temp table
df4.createOrReplaceTempView("nyc_table")

# Getting top three Violation codes in each time bucket 
spark.sql('''
select * 
from 
(select *, dense_rank() over (partition by time_bucket order by frequency desc) as rank 
from 
(
select (case 
when time < '04:00' then '0-4 Hr' 
when time < '08:00' then '4-8 Hr'
when time < '12:00' then '8-12 Hr'
when time < '16:00' then '12-16 Hr'
when time < '20:00' then '16-20 Hr'
when time <= '24:00' then '20-24 Hr'
else 'Others'
end) as time_bucket, `Violation Code` as vc, count(*) as frequency
from nyc_table
group by time_bucket, `Violation Code`
order by frequency desc
)a
)b 
where rank <= 3 


''').show(25)

+-----------+---+---------+----+
|time_bucket| vc|frequency|rank|
+-----------+---+---------+----+
|   16-20 Hr| 38|   203232|   1|
|   16-20 Hr| 37|   145784|   2|
|   16-20 Hr| 14|   144748|   3|
|     0-4 Hr| 21|    53600|   1|
|     0-4 Hr| 40|    44737|   2|
|     0-4 Hr| 78|    28716|   3|
|     Others| 21|    23925|   1|
|     Others| 14|     8029|   2|
|     Others| 40|     6263|   3|
|     4-8 Hr| 14|   141275|   1|
|     4-8 Hr| 21|   119466|   2|
|     4-8 Hr| 40|   112186|   3|
|   20-24 Hr|  7|    65593|   1|
|   20-24 Hr| 38|    47029|   2|
|   20-24 Hr| 14|    44778|   3|
|   12-16 Hr| 36|   588395|   1|
|   12-16 Hr| 38|   462756|   2|
|   12-16 Hr| 37|   337074|   3|
|    8-12 Hr| 21|  1182676|   1|
|    8-12 Hr| 36|   751422|   2|
|    8-12 Hr| 38|   346518|   3|
+-----------+---+---------+----+



#### We can see that in different time bucket we have different violation code. During late night hours i.e 0-4 Hr & 4-8 Hr bucket we can see 21 & 14 respectively. 

### Q5.2 Time Bucket wise Tickets Frequency

In [24]:
# SQL query for getting Time Bucket wise Tickets frequency 
spark.sql('''
select (case 
when time < '04:00' then '0-4 Hr' 
when time < '08:00' then '4-8 Hr'
when time < '12:00' then '8-12 Hr'
when time < '16:00' then '12-16 Hr'
when time < '20:00' then '16-20 Hr'
when time <= '24:00' then '20-24 Hr'
else 'Others'
end) as time_bucket,  count(*) as frequency
from nyc_table
group by time_bucket
order by frequency desc


''').show(25)

+-----------+---------+
|time_bucket|frequency|
+-----------+---------+
|    8-12 Hr|  4306112|
|   12-16 Hr|  3591491|
|   16-20 Hr|  1296358|
|     4-8 Hr|   883502|
|   20-24 Hr|   382297|
|     0-4 Hr|   284430|
|     Others|    58838|
+-----------+---------+



#### Most of the tickets are issued using Morning & Afternoon hours i.e 8-12 Hr & 12-16 Hr

In [29]:
# SQL query for getting Time Bucket wise Tickets frequency for most common Violation Code i.e 21, 14 & 38
spark.sql('''
select (case 
when time < '04:00' then '0-4 Hr' 
when time < '08:00' then '4-8 Hr'
when time < '12:00' then '8-12 Hr'
when time < '16:00' then '12-16 Hr'
when time < '20:00' then '16-20 Hr'
when time <= '24:00' then '20-24 Hr'
else 'Others'
end) as time_bucket,  count(*) as frequency
from nyc_table
where `Violation Code` in ('14','21', '38')
group by time_bucket
order by frequency desc


''').show(25)

+-----------+---------+
|time_bucket|frequency|
+-----------+---------+
|    8-12 Hr|  1803478|
|   12-16 Hr|   867065|
|   16-20 Hr|   348531|
|     4-8 Hr|   263041|
|   20-24 Hr|    92170|
|     0-4 Hr|    78134|
|     Others|    31971|
+-----------+---------+



#### For most common Violation Code which is 14, 21 & 38. Most Tickets were issued in the morning 8-12 Hr (~1.80 Mn) followed by Aternoon 12-16 Hr (~867K)

### Q6. Checking Seasonality in the Data

In [25]:
# SQL query to get month wise tickets frequency 
spark.sql('''

select extract(month from `Issue Date`) as month,  count(*) as frequency
from nyc_table
group by month
order by frequency desc


''').show(25)

+-----+---------+
|month|frequency|
+-----+---------+
|    6|  1103400|
|    5|  1026534|
|   10|   969825|
|    3|   965247|
|    9|   961128|
|   11|   899849|
|    4|   888906|
|    1|   878580|
|    2|   827505|
|    8|   801774|
|   12|   779246|
|    7|   701034|
+-----+---------+



#### So, June is the month in which most of the tickets were issued i.e ~1.10 Mn followed by May (~1.03 Mn) and then October ~9.70 Mn

In [26]:
# SQL query for season wise Tickets frequency 
spark.sql('''

select (case 
when month in (3,4,5) then 'Spring' 
when month in (6,7,8) then 'Summer'
when month in (9,10,11) then 'Autumn'
when month in (12,1,2) then 'Winter' 
else null end ) as seasons, sum(frequency) as frequency  
from 
(select extract(month from `Issue Date`) as month,  count(*) as frequency
from nyc_table
group by month
order by frequency desc
)a
group by seasons
order by frequency desc 


''').show(25)

+-------+---------+
|seasons|frequency|
+-------+---------+
| Spring|  2880687|
| Autumn|  2830802|
| Summer|  2606208|
| Winter|  2485331|
+-------+---------+



#### The highest number of tickets are issued in Spring season but there is not much of a significant difference in frequency across seasons

In [27]:
# SQL query for getting top Violation codes for each season based on tickets issued 
spark.sql('''
select * 
from 
(
select *, dense_rank() over (partition by seasons order by frequency desc) as rank 
from 
(
select (case 
when month in (3,4,5) then 'Spring' 
when month in (6,7,8) then 'Summer'
when month in (9,10,11) then 'Autumn'
when month in (12,1,2) then 'Winter' 
else null end ) as seasons, vc, sum(frequency) as frequency  
from 
(
select extract(month from `Issue Date`) as month, `Violation Code` as vc, count(*) as frequency
from nyc_table
group by month, vc
order by frequency desc
)c
group by seasons, vc
order by frequency desc 
)a
)b 
where rank <= 3 


''').show(25)

+-------+---+---------+----+
|seasons| vc|frequency|rank|
+-------+---+---------+----+
| Spring| 21|   402807|   1|
| Spring| 36|   344834|   2|
| Spring| 38|   271192|   3|
| Summer| 21|   405961|   1|
| Summer| 38|   247561|   2|
| Summer| 36|   240396|   3|
| Autumn| 36|   456046|   1|
| Autumn| 21|   357479|   2|
| Autumn| 38|   283828|   3|
| Winter| 21|   362341|   1|
| Winter| 36|   359338|   2|
| Winter| 38|   259723|   3|
+-------+---+---------+----+



#### In Spring Season Top 3 Violation Codes are 21, 23 & 38 
#### In Summer Season Top 3 Violation Codes are 21, 38 & 36
#### In Autumn Season Top 3 Violation Codes are 36, 21 & 38 
#### In Winter Season Top 3 Violation Codes are 21, 36 & 38 

#### 21 & 38 Violation Code has occured among Top 3 Violation Code across seasons 


### Q7. Revenue Collection from Tickets

In [28]:
# SQL query for Violation Code tickets frequency 
spark.sql('''
select `Violation Code`,  count(*) as frequency
from nyc_table
group by `Violation Code`
order by frequency desc 

''').show(5)

+--------------+---------+
|Violation Code|frequency|
+--------------+---------+
|            21|  1528588|
|            36|  1400614|
|            38|  1062304|
|            14|   893498|
|            20|   618593|
+--------------+---------+
only showing top 5 rows



#### Top 3 Violation Codes are 21, 36 & 38 with  1,528,588, 1,400,614 & 1,062,304 respectively
#### Violation Code 21 (No Parking) Fine is USD 55
#### Violation Code 36 (Exceeding the posted speed limit in or near a designated school zone) Fine is USD 50
#### Violation Code 38 (Failing to show a receipt or tag in the windshield) is USD 50

#### Total Revenue collection from Top 3 Tickets is USD (55x1,528,588, + 50x1,400,614  + 50x1,062,304)  = USD 207,218,240



### Observations:
#### 1. Most of the Tickets were due No Parking, Over speeding or Exceeding Parking Time Limit / showing relevant documents for the same
#### 2. There no much of a significant impact of season on tickets count 
#### 3. Most of the tickets were issued during the morning peak hours 8-12 Hr followed by afternoon hours of 12-4 Hrs
#### 4. Most Tickets were issued in 19, 1 & 14 Precincts

In [None]:
# Stopping Spark
spark.stop()