In [None]:
#! wget https://upgrad-spark-data.s3.amazonaws.com/Parking_Violations_Issued_-_Fiscal_Year_2017.csv

New York City is a thriving metropolis. Just like most other metros its size, one of the biggest problems its citizens face is parking. The classic combination of a huge number of cars and cramped geography leads to a huge number of parking tickets.

 

In an attempt to scientifically analyse this phenomenon, the NYC Police Department has collected data for parking tickets. Of these, the data files for multiple years are publicly available on Kaggle. We will try and perform some exploratory analysis on a part of this data. Spark will allow us to analyse the full files at high speeds as opposed to taking a series of random samples that will approximate the population. For the scope of this analysis, we will analyse the parking tickets over the year 2017. 

 

In [1]:
#import libraries
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import *

In [2]:
spark= SparkSession.builder.appName('nycparking').master("local").getOrCreate()

22/12/18 11:40:24 WARN Utils: Your hostname, Dipanwitas-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.5 instead (on interface en0)
22/12/18 11:40:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/18 11:40:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
df=spark.read.csv('Parking_Violations_Issued_-_Fiscal_Year_2017.csv', header= True, inferSchema=True)

                                                                                

In [4]:
df.printSchema()

root
 |-- Summons Number: long (nullable = true)
 |-- Plate ID: string (nullable = true)
 |-- Registration State: string (nullable = true)
 |-- Plate Type: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Violation Code: integer (nullable = true)
 |-- Vehicle Body Type: string (nullable = true)
 |-- Vehicle Make: string (nullable = true)
 |-- Issuing Agency: string (nullable = true)
 |-- Street Code1: integer (nullable = true)
 |-- Street Code2: integer (nullable = true)
 |-- Street Code3: integer (nullable = true)
 |-- Vehicle Expiration Date: integer (nullable = true)
 |-- Violation Location: integer (nullable = true)
 |-- Violation Precinct: integer (nullable = true)
 |-- Issuer Precinct: integer (nullable = true)
 |-- Issuer Code: integer (nullable = true)
 |-- Issuer Command: string (nullable = true)
 |-- Issuer Squad: string (nullable = true)
 |-- Violation Time: string (nullable = true)
 |-- Time First Observed: string (nullable = true)
 |-- Violation Coun

In [5]:
# drop duplicates
df= df.dropDuplicates()

In [6]:
#Find the total number of tickets for the year.
df.select('Summons Number').count()

22/12/18 11:42:03 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


                                                                                

10803028

In [7]:
# There is a numeric entry '99' in the column'Registration State, which should be corrected. 
# Replace it with the state having the maximum entries. Provide the number of unique states again.
df.select('Registration State').where(df['Registration State']==99).count()

                                                                                

36625

In [8]:
max_tickets=df.groupby('Registration State').count().orderBy('count',ascending=False).first()['Registration State']

                                                                                

In [9]:
df=df.withColumn('Registration State', when(df['Registration State']==99, max_tickets).otherwise(df['Registration State']))

In [10]:
df.select('Registration State').where(df['Registration State']==99).count()

                                                                                

0

In [11]:
# Find out the number of unique states from where the cars that got parking tickets came. 
# (Hint: Use the column 'Registration State'.)
df.groupby('Registration State').count().orderBy('count', ascending=False).show()



+------------------+-------+
|Registration State|  count|
+------------------+-------+
|                NY|8517686|
|                NJ| 925965|
|                PA| 285419|
|                FL| 144556|
|                CT| 141088|
|                MA|  85547|
|                IN|  80749|
|                VA|  72626|
|                MD|  61800|
|                NC|  55806|
|                IL|  37329|
|                GA|  36852|
|                TX|  36516|
|                AZ|  26426|
|                OH|  25302|
|                CA|  24260|
|                SC|  21836|
|                ME|  21574|
|                MN|  18227|
|                OK|  18165|
+------------------+-------+
only showing top 20 rows



                                                                                

In [12]:
# How often does each violation code occur? Display the frequency of the top five violation codes.
df.groupBy('Violation Code').count().orderBy('count', ascending=False).show(5)



+--------------+-------+
|Violation Code|  count|
+--------------+-------+
|            21|1528588|
|            36|1400614|
|            38|1062304|
|            14| 893498|
|            20| 618593|
+--------------+-------+
only showing top 5 rows



                                                                                

In [13]:
#How often does each 'vehicle body type' get a parking ticket? 
df.groupBy('vehicle body type').count().orderBy('count', ascending=False).show(5)




+-----------------+-------+
|vehicle body type|  count|
+-----------------+-------+
|             SUBN|3719802|
|             4DSD|3082020|
|              VAN|1411970|
|             DELV| 687330|
|              SDN| 438191|
+-----------------+-------+
only showing top 5 rows



                                                                                

In [14]:

# How about the 'vehicle make'? (Hint: Find the top 5 for both.)
df.groupBy('vehicle make').count().orderBy('count', ascending=False).show(5)



+------------+-------+
|vehicle make|  count|
+------------+-------+
|        FORD|1280958|
|       TOYOT|1211451|
|       HONDA|1079238|
|       NISSA| 918590|
|       CHEVR| 714655|
+------------+-------+
only showing top 5 rows



                                                                                

In [15]:
#A precinct is a police station that has a certain zone of the city under its command. 
# Find the (5 highest) frequencies of tickets for each of the following:
#'Violation Precinct' (This is the precinct of the zone where the violation occurred). 
# Using this, can you draw any insights for parking violations in any specific areas of the city?
df.groupBy('Violation Precinct').count().orderBy('count', ascending=False).show(5)




+------------------+-------+
|Violation Precinct|  count|
+------------------+-------+
|                 0|2072400|
|                19| 535671|
|                14| 352450|
|                 1| 331810|
|                18| 306920|
+------------------+-------+
only showing top 5 rows



                                                                                

In [16]:

#'Issuer Precinct' (This is the precinct that issued the ticket.)
df.groupBy('Issuer Precinct').count().orderBy('count', ascending=False).show(5)




+---------------+-------+
|Issuer Precinct|  count|
+---------------+-------+
|              0|2388479|
|             19| 521513|
|             14| 344977|
|              1| 321170|
|             18| 296553|
+---------------+-------+
only showing top 5 rows



                                                                                

In [17]:
# Here, you would have noticed that the dataframe has the 'Violating Precinct' or 'Issuing Precinct' as '0'. 
# These are erroneous entries. Hence, you need to provide the records for five correct precincts. 
# (Hint: Print the top six entries after sorting.)
df.groupBy('Violation Precinct').count().orderBy('count', ascending=False).show(6)




+------------------+-------+
|Violation Precinct|  count|
+------------------+-------+
|                 0|2072400|
|                19| 535671|
|                14| 352450|
|                 1| 331810|
|                18| 306920|
|               114| 296514|
+------------------+-------+
only showing top 6 rows



                                                                                

In [18]:
df.groupBy('Issuer Precinct').count().orderBy('count', ascending=False).show(6)



+---------------+-------+
|Issuer Precinct|  count|
+---------------+-------+
|              0|2388479|
|             19| 521513|
|             14| 344977|
|              1| 321170|
|             18| 296553|
|            114| 289950|
+---------------+-------+
only showing top 6 rows



                                                                                

In [19]:
# Find the violation code frequencies for three precincts that have issued the most number of tickets. 
# Do these precinct zones have an exceptionally high frequency of certain violation codes? 
# Are these codes common across precincts? 
#(Hint: In the SQL view, use the 'where' attribute to filter among three precincts.)
df.createTempView('parking_violation')


In [22]:
query ='''Select * from
(select `Issuer Precinct`, `violation Code`, 
count(`Summons Number`) ,
row_number() over (partition by `Issuer Precinct` order by count(`Summons Number`) desc ) as rn
from parking_violation
where `Issuer Precinct` in 
    (select `Issuer Precinct` from parking_violation
    where `Issuer Precinct`!=0  
    group by `Issuer Precinct`
    order by count(`Issuer Precinct`) desc 
    limit 3)
group by `Issuer Precinct`, `violation Code`)
where rn<=3
'''

spark.sql(query).show()



+---------------+--------------+---------------------+---+
|Issuer Precinct|violation Code|count(Summons Number)| rn|
+---------------+--------------+---------------------+---+
|              1|            14|                73522|  1|
|              1|            16|                38937|  2|
|              1|            20|                27841|  3|
|             14|            14|                73837|  1|
|             14|            69|                58026|  2|
|             14|            31|                39857|  3|
|             19|            46|                86390|  1|
|             19|            37|                72437|  2|
|             19|            38|                72344|  3|
+---------------+--------------+---------------------+---+



                                                                                

In [23]:

# Find out the properties of parking violations across different times of the day:
#Find a way to deal with missing values, if any.
#(Hint: Check for the null values using 'isNull' under the SQL. 

query= '''select count(`Summons Number`) as counts from parking_violation
where `Violation Time` is NULL'''

spark.sql(query).show()



+------+
|counts|
+------+
|    63|
+------+



                                                                                

In [24]:
df=df.filter('`Violation Time` is not NULL')

In [25]:
df.createOrReplaceTempView('parking_violation')

In [26]:
query= '''select count(`Summons Number`) as counts from parking_violation
where `Violation Time` is NULL'''

spark.sql(query).show()



+------+
|counts|
+------+
|     0|
+------+



                                                                                

In [27]:
df.select('Violation Time').orderBy('Violation Time').show()



+--------------+
|Violation Time|
+--------------+
|         .240P|
|         .359A|
|         .933A|
|         0+45A|
|         0.22A|
|         0.47A|
|         0.51P|
|         0000A|
|         0000A|
|         0000A|
|         0000A|
|         0000A|
|         0000A|
|         0000A|
|         0000A|
|         0000A|
|         0000A|
|         0000A|
|         0000A|
|         0000A|
+--------------+
only showing top 20 rows



                                                                                

In [28]:
# there are a few data which does not look right. dropping those records
df= df.where(~(df['Violation Time'].isin(['.240P','.359A', '.933A','0+45A', '0.22A', '0.47A', '0.51P'])))

In [29]:
# The Violation Time field is specified in a strange format. 
# Find a way to make this a time attribute that you can use to divide into groups.
df=df.withColumn('Violation Time', lpad(df['Violation Time'], 5,'0'))


In [30]:

df=df.withColumn('Violation Time', \
    when(df['Violation Time'].contains('A'), \
        regexp_replace(df['Violation Time'], 'A', 'am')).\
            otherwise(regexp_replace(df['Violation Time'], 'P', 'pm')))

In [31]:
pattern= 'hhmma'
df=df.withColumn('Violationtime', to_timestamp(df['Violation Time'], pattern))


In [32]:
df.createOrReplaceTempView('parking_violation')

In [33]:
#Divide 24 hours into six equal discrete bins of time. Choose the intervals as you see fit. 
# For each of these groups, find the three most commonly occurring violations.
#(Hint: Use the CASE-WHEN in SQL view to segregate into bins. 
# To find the most commonly occurring violations, you can use an approach similar to the one mentioned in the hint for question 4.)
query= '''Select * from 
(select `violation Code`, TIME, 
count(`violation Code`) as counts,
row_number() over (partition by TIME order by count(`violation Code`) desc ) as rn
from        
 (select `violation Code`,  case when hour(`ViolationTime`) between 0 and 3 then '0-3'
    when hour(`ViolationTime`) between 4 and 7 then '4-7'
    when hour(`ViolationTime`) between 8 and 11 then '8-11'
    when hour(`ViolationTime`) between 12 and 15 then '12-15'
    when hour(`ViolationTime`) between 16 and 20 then '16-20'
    when hour(`ViolationTime`) between 21 and 23 then '21-23' END AS TIME
    from parking_violation) temp
group by `violation Code` ,TIME )
where rn<=3'''

spark.sql(query).show()



+--------------+-----+-------+---+
|violation Code| TIME| counts| rn|
+--------------+-----+-------+---+
|            21| null|  23924|  1|
|            14| null|   8028|  2|
|            40| null|   6260|  3|
|            21|  0-3|  53600|  1|
|            40|  0-3|  44737|  2|
|            78|  0-3|  28716|  3|
|            36|12-15| 588395|  1|
|            38|12-15| 462756|  2|
|            37|12-15| 337074|  3|
|            38|16-20| 221419|  1|
|            14|16-20| 156366|  2|
|             7|16-20| 152038|  3|
|             7|21-23|  45323|  1|
|            40|21-23|  34688|  2|
|            14|21-23|  33160|  3|
|            14|  4-7| 141275|  1|
|            21|  4-7| 119466|  2|
|            40|  4-7| 112186|  3|
|            21| 8-11|1182676|  1|
|            36| 8-11| 751422|  2|
+--------------+-----+-------+---+
only showing top 20 rows



                                                                                

In [34]:
# Let’s try and find some seasonality in this data:
#First, divide the year into a certain number of seasons, 
# and find the frequencies of tickets for each season. 
# (Hint: Use Issue Date to segregate into seasons.)

# cnvert the issue date to date
df.select('Issue Date').show()




+----------+
|Issue Date|
+----------+
|09/21/2016|
|07/07/2016|
|03/16/2017|
|10/17/2016|
|12/28/2016|
|05/17/2017|
|12/08/2016|
|09/22/2016|
|02/01/2017|
|01/26/2017|
|03/29/2017|
|09/07/2016|
|01/05/2017|
|07/25/2016|
|04/11/2017|
|03/02/2017|
|12/11/2016|
|01/19/2017|
|05/13/2017|
|05/27/2017|
+----------+
only showing top 20 rows



                                                                                

In [35]:
pattern= 'MM/dd/yyyy'
df=df.withColumn('Issue Date', to_timestamp(df['Issue Date'], pattern))

In [36]:
df=df.withColumn('Season', when (month(df['Issue Date']).isin(11,12,1,2),'winter').\
    when(month(df['Issue Date']).isin(3,4,5,6,),'summer').\
    when(month(df['Issue Date']).isin(7,8,9,10),'rainy'))

In [37]:
df.groupby('Season').count().orderBy('count', ascending=False).show()



+------+-------+
|Season|  count|
+------+-------+
|summer|3984066|
| rainy|3433735|
|winter|3385157|
+------+-------+



                                                                                

In [38]:

#Then, find the three most common violations for each of these seasons.
#(Hint: You can use an approach similar to the one mentioned in the hint for question 4.)
seasonal_violation=df.groupby('Season', 'violation Code').count()

In [39]:
seasonal_violation1=seasonal_violation.select('Season', 'violation Code', row_number().over(Window.partitionBy('Season').orderBy('count')).alias('rn'))

In [40]:
seasonal_violation1.filter(seasonal_violation1['rn']<='3').show()



+------+--------------+---+
|Season|violation Code| rn|
+------+--------------+---+
| rainy|            57|  1|
| rainy|             0|  2|
| rainy|            34|  3|
|summer|            87|  1|
|summer|            54|  2|
|summer|            57|  3|
|winter|            44|  1|
|winter|            57|  2|
|winter|            28|  3|
+------+--------------+---+



                                                                                

In [41]:
#The fines collected from all the instances of parking violation constitute a source of revenue for the NYC Police Department. 
# Let’s take an example of estimating this for the three most commonly occurring codes:

#Find the total occurrences of the three most common violation codes.

df.createOrReplaceTempView('parking_violation')

In [42]:
query= '''select `violation code`, 
            count(`violation code`) as counts 
            from parking_violation 
            group by `violation code` 
            order by count(`violation code`) desc
            limit 3;'''

most_violation=spark.sql(query)

In [43]:
most_violation.show()



+--------------+-------+
|violation code| counts|
+--------------+-------+
|            21|1528587|
|            36|1400614|
|            38|1062304|
+--------------+-------+



                                                                                

In [44]:
#Then, visit the website: http://www1.nyc.gov/site/finance/vehicles/services-violation-codes.page

#It lists the fines associated with different violation codes. 
# They’re divided into two categories: 
#   one for the highest-density locations in the city 
#   the other for the rest of the city. 
# 
# For the sake of simplicity, take the average of the two.

finedata=[{'code':21, 'metro':65, 'nonmetro':65},
{'code':36, 'metro':50, 'nonmetro':50},
{'code':38, 'metro':65, 'nonmetro':35}]

finedf= spark.createDataFrame(finedata)
finedf.show()



+----+-----+--------+
|code|metro|nonmetro|
+----+-----+--------+
|  21|   65|      65|
|  36|   50|      50|
|  38|   65|      35|
+----+-----+--------+



In [45]:
finedf=finedf.withColumn('averagefine', (finedf['metro']+finedf['nonmetro'])/2)
finedf.show()

+----+-----+--------+-----------+
|code|metro|nonmetro|averagefine|
+----+-----+--------+-----------+
|  21|   65|      65|       65.0|
|  36|   50|      50|       50.0|
|  38|   65|      35|       50.0|
+----+-----+--------+-----------+



In [46]:
#Using this information, find the total amount collected for the three violation codes with the maximum tickets. 
# State the code that has the highest total collection.
#
# What can you intuitively infer from these findings?
joined_table=most_violation.join(finedf, most_violation['violation code']==finedf['code']  , how= 'inner')

In [48]:
joined_table.select('violation code', (joined_table['counts']*joined_table['averagefine']).alias('fines collected')).orderBy('fines collected', ascending=False).show(truncate=False)



+--------------+---------------+
|violation code|fines collected|
+--------------+---------------+
|21            |9.9358155E7    |
|36            |7.00307E7      |
|38            |5.31152E7      |
+--------------+---------------+



                                                                                

Thus the top vilation code is 

- Code 21: Street Cleaning: No parking where parking is not allowed by sign, street marking or traffic control device.
- Code 36: Exceeding the posted speed limit in or near a designated school zone.
- Code 38: Failing to show a receipt or tag in the windshield.