In [1]:
# create Spark session
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("NYC Parking tkt") \
    .getOrCreate()

In [2]:
# import necessary packages
import pandas as pd
from datetime import datetime
import pyspark.sql.functions as F
from pyspark.sql.window import Window

In [3]:
# read the input data
df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/common_folder/nyc_parking/Parking_Violations_Issued_-_Fiscal_Year_2017.csv")

In [4]:
df.show(10)

+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+
|Summons Number|Plate ID|Registration State|         Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|
+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+
|    5092469481| GZH7067|                NY|2016-07-10 00:00:00|             7|             SUBN|       TOYOT|                 0|              0|         0143A|
|    5092451658| GZH7067|                NY|2016-07-08 00:00:00|             7|             SUBN|       TOYOT|                 0|              0|         0400P|
|    4006265037| FZX9232|                NY|2016-08-23 00:00:00|             5|             SUBN|        FORD|                 0|              0|         0233P|
|    8478629828| 66623ME|         

### Preprocess data

In [5]:
# create year column from Issue date
df = df.withColumn('issue_year', F.year(F.col('Issue Date')))
df = df.withColumn('issue_month', F.month(F.col('Issue Date')))

In [6]:
# keep data only for year=2017
df = df.where(F.col('issue_year') == 2017)

### Examine the data

#### Find the total number of tickets for the year.

Assuming the data contains only 1 year of data

In [8]:
#### Find the total number of tickets for the year.

##Assuming the data contains only 1 year of data

total_tickets_ty = df.select('Summons Number').distinct().count()
print("Total number of summons in a year: ", total_tickets_ty)

Total number of summons in a year:  5431918


##There is a numeric entry '99' in the column, which should be corrected. Replace it with the state having the maximum entries. Provide the number of unique states again.

In [10]:
total_dist_states = df.select('Registration State').distinct().count()
print("Total number distinct states: ", total_dist_states)

Total number distinct states:  65


In [11]:
# find the state with highest frequency
highest_frequency_state = df.groupBy('Registration State').agg(F.countDistinct(F.col('Summons Number')).alias('ticket_count'))

highest_frequency_state = highest_frequency_state.orderBy(F.col('ticket_count').desc())

highest_frequency_state = highest_frequency_state.collect()[0][0]
print('State with highest tickets count is: ', highest_frequency_state)

State with highest tickets count is:  NY


In [12]:
# replace 99 with highest ticket count state and get distinct state count again
df = df.replace('99', highest_frequency_state, 'Registration State')

total_dist_states = df.select('Registration State').distinct().count()
print("Total number distinct states after replacing 99: ", total_dist_states)

Total number distinct states after replacing 99:  64


### Aggregation tasks

#### 1. How often does each violation code occur? Display the frequency of the top five violation codes.

In [24]:
violation_code_freq = df.groupBy("Violation Code").agg(F.countDistinct(F.col('Summons Number')).alias('ticket_count'))

violation_code_freq = violation_code_freq.orderBy(F.col('ticket_count').desc())

violation_code_freq.show(5, truncate=False)

+--------------+------------+
|Violation Code|ticket_count|
+--------------+------------+
|21            |768087      |
|36            |662765      |
|38            |542079      |
|14            |476664      |
|20            |319646      |
+--------------+------------+
only showing top 5 rows



#### 2. How often does each 'vehicle body type' get a parking ticket? How about the 'vehicle make'? (Hint: Find the top 5 for both.)

In [14]:
vehicle_body_freq = df.groupBy("Vehicle Body Type").agg(F.countDistinct(F.col('Summons Number')).alias('ticket_count'))

vehicle_body_freq = vehicle_body_freq.orderBy(F.col('ticket_count').desc())

vehicle_body_freq.show(5, truncate=False)

+-----------------+------------+
|Vehicle Body Type|ticket_count|
+-----------------+------------+
|SUBN             |1883954     |
|4DSD             |1547312     |
|VAN              |724029      |
|DELV             |358984      |
|SDN              |194197      |
+-----------------+------------+
only showing top 5 rows



In [15]:
vehicle_make_freq = df.groupBy("Vehicle Make").agg(F.countDistinct(F.col('Summons Number')).alias('ticket_count'))

vehicle_make_freq = vehicle_make_freq.orderBy(F.col('ticket_count').desc())

vehicle_make_freq.show(5, truncate=False)

+------------+------------+
|Vehicle Make|ticket_count|
+------------+------------+
|FORD        |636844      |
|TOYOT       |605291      |
|HONDA       |538884      |
|NISSA       |462017      |
|CHEVR       |356032      |
+------------+------------+
only showing top 5 rows



#### 3. A precinct is a police station that has a certain zone of the city under its command. Find the (5 highest) frequencies of tickets for each of the following:
+ 'Violation Precinct' (This is the precinct of the zone where the violation occurred). Using this, can you draw any insights for parking violations in any specific areas of the city?

In [16]:
# let's check in which precinct most tickets were cut

violation_pre_freq = df.groupBy("Violation Precinct").agg(F.countDistinct(F.col('Summons Number')).alias('ticket_count'))

violation_pre_freq = violation_pre_freq.orderBy(F.col('ticket_count').desc())

violation_pre_freq.show(5, truncate=False)

+------------------+------------+
|Violation Precinct|ticket_count|
+------------------+------------+
|0                 |925596      |
|19                |274445      |
|14                |203553      |
|1                 |174702      |
|18                |169131      |
+------------------+------------+
only showing top 5 rows



In [17]:
# now let's check what are the top Vehicle-Make and Precinct combination

vio_make_pre_freq = df.groupBy("Violation Precinct", "Vehicle Make").agg(F.countDistinct(F.col('Summons Number')).alias('ticket_count'))

vio_make_pre_freq = vio_make_pre_freq.orderBy(F.col('ticket_count').desc())

vio_make_pre_freq.show(5, truncate=False)

+------------------+------------+------------+
|Violation Precinct|Vehicle Make|ticket_count|
+------------------+------------+------------+
|0                 |TOYOT       |136752      |
|0                 |HONDA       |111818      |
|0                 |NISSA       |106306      |
|0                 |FORD        |85597       |
|19                |FORD        |55222       |
+------------------+------------+------------+
only showing top 5 rows



##+ 'Issuer Precinct' (This is the precinct that issued the ticket.)
Here, you would have noticed that the dataframe has the'Violating Precinct' or 'Issuing Precinct' as '0'. These are erroneous entries. Hence, you need to provide the records for five correct precincts. (Hint: Print the top six entries after sorting.)

In [18]:
issuer_pre_freq = df.groupBy("Issuer Precinct").agg(F.countDistinct(F.col('Summons Number')).alias('ticket_count'))

issuer_pre_freq = issuer_pre_freq.orderBy(F.col('ticket_count').desc())

issuer_pre_freq.show(6, truncate=False)

+---------------+------------+
|Issuer Precinct|ticket_count|
+---------------+------------+
|0              |1078406     |
|19             |266961      |
|14             |200495      |
|1              |168740      |
|18             |162994      |
|114            |144054      |
+---------------+------------+
only showing top 6 rows



#### 4. Find the violation code frequencies for three precincts that have issued the most number of tickets. Do these precinct zones have an exceptionally high frequency of certain violation codes? Are these codes common across precincts? 
(Hint: In the SQL view, use the 'where' attribute to filter among three precincts.)

In [19]:
# find top 3 violation precinct
violation_pre_freq = df.groupBy("Violation Precinct").agg(F.countDistinct(F.col('Summons Number')).alias('ticket_count'))

violation_pre_freq = violation_pre_freq.orderBy(F.col('ticket_count').desc()).limit(3)

violation_pre_freq = violation_pre_freq.select('Violation Precinct')

violation_pre_freq.show(100, truncate=False)

+------------------+
|Violation Precinct|
+------------------+
|0                 |
|19                |
|14                |
+------------------+



In [20]:
# get the violation codes for those 3 violation precinct

violation_pre_code_freq = df.groupBy("Violation Precinct", "Violation Code").agg(F.countDistinct(F.col('Summons Number')).alias('ticket_count'))

violation_pre_code_freq = violation_pre_code_freq.join(violation_pre_freq, on="Violation Precinct", how="inner")

violation_pre_code_freq = violation_pre_code_freq.orderBy(F.col('ticket_count').desc())

violation_pre_code_freq.show(100, truncate=False)

+------------------+--------------+------------+
|Violation Precinct|Violation Code|ticket_count|
+------------------+--------------+------------+
|0                 |36            |662765      |
|0                 |7             |210174      |
|19                |46            |50785       |
|0                 |5             |48076       |
|14                |14            |45885       |
|19                |38            |37483       |
|19                |37            |36468       |
|14                |69            |30465       |
|19                |14            |30376       |
|19                |21            |29415       |
|14                |31            |22649       |
|14                |47            |18691       |
|19                |20            |15132       |
|19                |40            |11519       |
|14                |42            |10027       |
|19                |16            |10006       |
|14                |46            |8411        |
|19                |

#### 5.Find out the properties of parking violations across different times of the day:

Find a way to deal with missing values, if any.
(Hint: Check for the null values using 'isNull' under the SQL. Also, to remove the null values, check the 'dropna' command in the API documentation.)

In [7]:
df = df.dropna()

In [16]:
def get_time(x):
    try:
        return datetime.strptime(x+"M", "%I%M%p").hour
    except:
        return None

time_udf = F.udf(get_time)

time_udf2 = F.udf(get_time)

The Violation Time field is specified in a strange format. Find a way to make this a time attribute that you can use to divide into groups.

In [17]:
#time_udf = F.udf(lambda x: datetime.strptime(x+"M", "%H%M%p").hour)

df = df.withColumn('violation_hour', time_udf2(F.col('Violation Time')))
df = df.withColumn('violation_hour_groups', (F.col('violation_hour')/4).cast('int'))

df.show(2)

+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+----------+-----------+--------------+---------------------+
|Summons Number|Plate ID|Registration State|         Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|issue_year|issue_month|violation_hour|violation_hour_groups|
+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+----------+-----------+--------------+---------------------+
|    8478629828| 66623ME|                NY|2017-06-14 00:00:00|            47|             REFG|       MITSU|                14|             14|         1120A|      2017|          6|            11|                    2|
|    5096917368| FZD8593|                NY|2017-06-13 00:00:00|             7|             SUBN|       ME/BE|      

In [18]:
format_udf = F.udf(lambda x: "{}-{}".format(x*4, x*4+3) if type(x) == int else None)

df = df.withColumn('violation_hour_groups', 
                   F.when(F.isnull(F.col('violation_hour_groups'))==False, format_udf(F.col('violation_hour_groups')))\
                   .otherwise(F.lit(None)))

df.show(2, truncate=False)

+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+----------+-----------+--------------+---------------------+
|Summons Number|Plate ID|Registration State|Issue Date         |Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|issue_year|issue_month|violation_hour|violation_hour_groups|
+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+----------+-----------+--------------+---------------------+
|8478629828    |66623ME |NY                |2017-06-14 00:00:00|47            |REFG             |MITSU       |14                |14             |1120A         |2017      |6          |11            |8-11                 |
|5096917368    |FZD8593 |NY                |2017-06-13 00:00:00|7             |SUBN             |ME/BE       |0     

#### Divide 24 hours into six equal discrete bins of time. Choose the intervals as you see fit. For each of these groups, find the three most commonly occurring violations.
(Hint: Use the CASE-WHEN in SQL view to segregate into bins. To find the most commonly occurring violations, you can use an approach similar to the one mentioned in the hint for question 4.)

In [21]:
# get ticket count for each time group and violation code
group_violation_freq = df.groupBy(['violation_hour_groups', 'Violation Code']).agg(F.countDistinct(F.col('Summons Number')).alias('ticket_count'))

# filter top 3 violation code for each of the time groups
w = Window.partitionBy('violation_hour_groups').orderBy(F.col('ticket_count').desc())

group_violation_freq = group_violation_freq.withColumn('violation_rank', F.dense_rank().over(w))

top_group_violation_freq = group_violation_freq.where(F.col('violation_rank')<4)

top_group_violation_freq = top_group_violation_freq.orderBy('violation_hour_groups')

top_group_violation_freq = top_group_violation_freq.dropna()

top_group_violation_freq.show(100, truncate=False)

+---------------------+--------------+------------+--------------+
|violation_hour_groups|Violation Code|ticket_count|violation_rank|
+---------------------+--------------+------------+--------------+
|0-3                  |21            |26444       |1             |
|0-3                  |40            |22420       |2             |
|0-3                  |78            |13737       |3             |
|12-15                |38            |240721      |2             |
|12-15                |37            |167025      |3             |
|12-15                |36            |286284      |1             |
|16-19                |38            |102855      |1             |
|16-19                |14            |75902       |2             |
|16-19                |37            |70345       |3             |
|20-23                |7             |26293       |1             |
|20-23                |40            |22336       |2             |
|20-23                |14            |21045       |3          

In [20]:
# df.select('Violation Time'). distinct ().orderBy ('violation Time').show(100)

#### Now, try another direction. For the three most commonly occurring violation codes, find the most common time of the day (in terms of the bins from the previous part).
time_udf = F.udf()#### Now, try another direction. For the three most commonly occurring violation codes, find the most common time of the day (in terms of the bins from the previous part).

In [27]:
# get top 3 violation codes

w = Window.partitionBy().orderBy(F.col('ticket_count').desc())

top3_violation_code_freq = violation_code_freq.withColumn('rank', F.dense_rank().over(w))

top3_violation_code_freq = top3_violation_code_freq.where(F.col('rank')<4)

top3_violation_code_freq = top3_violation_code_freq.select('Violation Code').distinct()

# merge these top violation codes with time group-violation frequency data we created above
top3_violation_code_group_freq = top3_violation_code_freq.join(group_violation_freq, on="Violation Code", how="inner")

# for each violcation code get top time group
w = Window.partitionBy('Violation Code').orderBy(F.col('ticket_count').desc())

top3_violation_code_group_freq = top3_violation_code_group_freq.withColumn('violation_rank', F.dense_rank().over(w))

top3_violation_code_group_freq = top3_violation_code_group_freq.where(F.col('violation_rank')==1)

top3_violation_code_group_freq = top3_violation_code_group_freq.drop('violation_rank')

top3_violation_code_group_freq.show(100, truncate=False)

+--------------+---------------------+------------+
|Violation Code|violation_hour_groups|ticket_count|
+--------------+---------------------+------------+
|38            |12-15                |240721      |
|21            |8-11                 |598062      |
|36            |8-11                 |348165      |
+--------------+---------------------+------------+



### 6. Let’s try and find some seasonality in this data:

#### First, divide the year into a certain number of seasons, and find the frequencies of tickets for each season. (Hint: Use Issue Date to segregate into seasons.)

In [28]:
# create seasons
number_seasons = 4

df = df.withColumn('season', (F.col('issue_month')/number_seasons).cast('int'))

In [29]:
seasons_freq = df.groupBy('season').agg(F.countDistinct(F.col('Summons Number')).alias('ticket_count'))

seasons_freq = seasons_freq.orderBy('season')

seasons_freq.show(10, truncate=False)

+------+------------+
|season|ticket_count|
+------+------------+
|0     |2669069     |
|1     |2761203     |
|2     |1288        |
|3     |358         |
+------+------------+



#### Then, find the three most common violations for each of these seasons.
(Hint: You can use an approach similar to the one mentioned in the hint for question 4.)

In [33]:
# get ticket count for each time group and violation code
season_violation_freq = df.groupBy(['season', 'Violation Code']).agg(F.countDistinct(F.col('Summons Number')).alias('ticket_count'))

# filter top 3 violation code for each of the time groups
w = Window.partitionBy('season').orderBy(F.col('ticket_count').desc())

season_violation_freq = season_violation_freq.withColumn('violation_rank', F.dense_rank().over(w))

top_season_violation_freq = season_violation_freq.where(F.col('violation_rank')<4)

top_season_violation_freq = top_season_violation_freq.orderBy('season')

top_season_violation_freq.show(100, truncate=False)

+------+--------------+------------+--------------+
|season|Violation Code|ticket_count|violation_rank|
+------+--------------+------------+--------------+
|0     |21            |373874      |1             |
|0     |36            |348240      |2             |
|0     |38            |287000      |3             |
|1     |36            |314525      |2             |
|1     |38            |255067      |3             |
|1     |21            |393957      |1             |
|2     |40            |149         |3             |
|2     |46            |288         |1             |
|2     |21            |212         |2             |
|3     |46            |77          |1             |
|3     |21            |44          |2             |
|3     |40            |44          |2             |
|3     |20            |33          |3             |
+------+--------------+------------+--------------+



#### 7. The fines collected from all the instances of parking violation constitute a source of revenue for the NYC Police Department. Let’s take an example of estimating this for the three most commonly occurring codes:

In [34]:
# printing top 3 violation codes

w = Window.partitionBy().orderBy(F.col('ticket_count').desc())

top3_violation_code_freq = violation_code_freq.withColumn('rank', F.dense_rank().over(w))

top3_violation_code_freq = top3_violation_code_freq.where(F.col('rank')<4)

top3_violation_code_freq.show(100, truncate=False)

+--------------+------------+----+
|Violation Code|ticket_count|rank|
+--------------+------------+----+
|21            |768087      |1   |
|36            |662765      |2   |
|38            |542079      |3   |
+--------------+------------+----+

