In [1]:
# We import the required modules from pyspark and create a SparkSession object

from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import StructField, StructType, StringType, LongType
from pyspark.sql.functions import *
from datetime import datetime
from pyspark.sql.types import DateType

spark = SparkSession \
    .builder \
    .appName("PySpark Assignment") \
    .getOrCreate()

In [2]:
# The dataset is imported in nyc dataframe

nyc = spark.read.csv("/common_folder/nyc_parking/Parking_Violations_Issued_-_Fiscal_Year_2017.csv", header=True)

In [3]:
# printSchema returns schema in tree format

nyc.printSchema()

root
 |-- Summons Number: string (nullable = true)
 |-- Plate ID: string (nullable = true)
 |-- Registration State: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Violation Code: string (nullable = true)
 |-- Vehicle Body Type: string (nullable = true)
 |-- Vehicle Make: string (nullable = true)
 |-- Violation Precinct: string (nullable = true)
 |-- Issuer Precinct: string (nullable = true)
 |-- Violation Time: string (nullable = true)



In [4]:
nyc.schema

# Returns the schema of this DataFrame as a pyspark.sql.types.StructType.

StructType(List(StructField(Summons Number,StringType,true),StructField(Plate ID,StringType,true),StructField(Registration State,StringType,true),StructField(Issue Date,StringType,true),StructField(Violation Code,StringType,true),StructField(Vehicle Body Type,StringType,true),StructField(Vehicle Make,StringType,true),StructField(Violation Precinct,StringType,true),StructField(Issuer Precinct,StringType,true),StructField(Violation Time,StringType,true)))

In [5]:
# We print the first 5 rows of the dataframe

nyc.show(5)

+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|Summons Number|Plate ID|Registration State|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|    5092469481| GZH7067|                NY|2016-07-10|             7|             SUBN|       TOYOT|                 0|              0|         0143A|
|    5092451658| GZH7067|                NY|2016-07-08|             7|             SUBN|       TOYOT|                 0|              0|         0400P|
|    4006265037| FZX9232|                NY|2016-08-23|             5|             SUBN|        FORD|                 0|              0|         0233P|
|    8478629828| 66623ME|                NY|2017-06-14|            47|             REFG|

In [6]:
# Computing the missing values in the dataframe

nyc.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in nyc.columns]).show()

+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|Summons Number|Plate ID|Registration State|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|             0|       0|                 0|         0|             0|                0|           0|                 0|              0|             0|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+



In [7]:
# We filter the dataframe to consist of data only from the year 2017 as per the problem statement

nyc = nyc.select('Summons Number','Plate ID','Issue Date','Registration State','Violation Code','Vehicle Body Type','Vehicle Make','Violation Precinct','Issuer Precinct','Violation Time').where(year(nyc['Issue Date']) == 2017)

In [8]:
# We print the first 5 rows of the dataframe which has only 2017 data

nyc.show(5)

+--------------+--------+----------+------------------+--------------+-----------------+------------+------------------+---------------+--------------+
|Summons Number|Plate ID|Issue Date|Registration State|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|
+--------------+--------+----------+------------------+--------------+-----------------+------------+------------------+---------------+--------------+
|    8478629828| 66623ME|2017-06-14|                NY|            47|             REFG|       MITSU|                14|             14|         1120A|
|    5096917368| FZD8593|2017-06-13|                NY|             7|             SUBN|       ME/BE|                 0|              0|         0852P|
|    1407740258| 2513JMG|2017-01-11|                NY|            78|             DELV|       FRUEH|               106|            106|         0015A|
|    1413656420|T672371C|2017-02-04|                NY|            40|             TAXI|

# Examine the data

## 1. Find the total number of tickets for the year.

In [9]:
# Finding the total number of tickets for the year

nyc.count()

5431918

Total number of tickets for the year 2017 = 5431918

## 2. Find out the number of unique states from where the cars that got parking tickets came.

In [10]:
nyc_new = nyc.groupBy("Registration State").count()
nyc_new.show()

+------------------+------+
|Registration State| count|
+------------------+------+
|                SC| 10395|
|                AZ| 12379|
|                NS|   322|
|                LA|  1689|
|                MN| 10083|
|                NJ|475825|
|                DC|  1929|
|                OR|  2622|
|                99| 16055|
|                VA| 34367|
|                RI|  5814|
|                WY|   188|
|                KY|  1795|
|                BC|    54|
|                NH|  4119|
|                MI|  7231|
|                GV|   348|
|                NV|   725|
|                QB|  1998|
|                WI|  2127|
+------------------+------+
only showing top 20 rows



In [11]:
# NJ has maximum state count. Hence, we will replace 99 by NJ

nyc_unique = nyc.withColumn("Registration State", when(col("Registration State")=="99","NJ").otherwise(nyc["Registration State"]))

In [12]:
nyc_df = nyc_unique.groupBy("Registration State").count()
nyc_df.show()

+------------------+------+
|Registration State| count|
+------------------+------+
|                SC| 10395|
|                AZ| 12379|
|                NS|   322|
|                LA|  1689|
|                MN| 10083|
|                NJ|491880|
|                DC|  1929|
|                OR|  2622|
|                VA| 34367|
|                RI|  5814|
|                KY|  1795|
|                WY|   188|
|                BC|    54|
|                NH|  4119|
|                MI|  7231|
|                NV|   725|
|                GV|   348|
|                QB|  1998|
|                WI|  2127|
|                ID|   763|
+------------------+------+
only showing top 20 rows



In [13]:
nyc_unique.select("Registration State").distinct().count()

64

Number of unique states from where the cars that got parking tickets came = 64

# Aggregation tasks

## 1. How often does each violation code occur? Display the frequency of the top five violation codes.

In [14]:
nyc_unique.groupBy("Violation Code").count().sort(desc("count")).show(5)

+--------------+------+
|Violation Code| count|
+--------------+------+
|            21|768087|
|            36|662765|
|            38|542079|
|            14|476664|
|            20|319646|
+--------------+------+
only showing top 5 rows



## 2.  How often does each 'vehicle body type' get a parking ticket? How about the 'vehicle make'? 

In [15]:
nyc_unique.groupBy("Vehicle Body Type").count().sort(desc("count")).show(5)

+-----------------+-------+
|Vehicle Body Type|  count|
+-----------------+-------+
|             SUBN|1883954|
|             4DSD|1547312|
|              VAN| 724029|
|             DELV| 358984|
|              SDN| 194197|
+-----------------+-------+
only showing top 5 rows



In [16]:
nyc_unique.groupBy("Vehicle Make").count().sort(desc("count")).show(5)

+------------+------+
|Vehicle Make| count|
+------------+------+
|        FORD|636844|
|       TOYOT|605291|
|       HONDA|538884|
|       NISSA|462017|
|       CHEVR|356032|
+------------+------+
only showing top 5 rows



## 3.  A precinct is a police station that has a certain zone of the city under its command. Find the (5 highest) frequencies of tickets for each of the following:

### 'Violation Precinct' (This is the precinct of the zone where the violation occurred). Using this, can you draw any insights for parking violations in any specific areas of the city?

In [17]:
nyc_unique.groupBy("Violation Precinct").count().sort(desc("count")).show(6)

+------------------+------+
|Violation Precinct| count|
+------------------+------+
|                 0|925596|
|                19|274445|
|                14|203553|
|                 1|174702|
|                18|169131|
|               114|147444|
+------------------+------+
only showing top 6 rows



### 'Issuer Precinct' (This is the precinct that issued the ticket.)

In [18]:
nyc_unique.groupBy("Issuer Precinct").count().sort(desc("count")).show(6)

+---------------+-------+
|Issuer Precinct|  count|
+---------------+-------+
|              0|1078406|
|             19| 266961|
|             14| 200495|
|              1| 168740|
|             18| 162994|
|            114| 144054|
+---------------+-------+
only showing top 6 rows



## OBSERVATION:
For the all three years, following Precincts have highest frequencies of Violating Precincts & Issuing Precincts.
 ### 19
 ### 14
 ### 1
 ### 18
 ### 114

## 4. Find the violation code frequencies for three precincts that have issued the most number of tickets. Do these precinct zones have an exceptionally high frequency of certain violation codes? Are these codes common across precincts? 

In [19]:
nyc_unique.groupBy("Violation Precinct").count().sort(desc("count")).show(4)

+------------------+------+
|Violation Precinct| count|
+------------------+------+
|                 0|925596|
|                19|274445|
|                14|203553|
|                 1|174702|
+------------------+------+
only showing top 4 rows



In [20]:
nyc_unique.createOrReplaceTempView("violation_code_frequencies")

In [21]:
# Violation Codes for Violation Precinct = 19

violation_code_frequency_19 = spark.sql("SELECT `Violation Code`, COUNT(1) as count \
                                 FROM violation_code_frequencies WHERE `Violation Precinct` == 19 GROUP BY `Violation Code`  \
                                 ORDER BY count DESC").show(5)

+--------------+-----+
|Violation Code|count|
+--------------+-----+
|            46|50785|
|            38|37483|
|            37|36468|
|            14|30376|
|            21|29415|
+--------------+-----+
only showing top 5 rows



In [22]:
# Violation Codes for Violation Precinct = 14

violation_code_frequency_14 = spark.sql("SELECT `Violation Code`, COUNT(1) as count \
                                 FROM violation_code_frequencies WHERE `Violation Precinct` == 14 GROUP BY `Violation Code`  \
                                 ORDER BY count DESC").show(5)

+--------------+-----+
|Violation Code|count|
+--------------+-----+
|            14|45885|
|            69|30465|
|            31|22649|
|            47|18691|
|            42|10027|
+--------------+-----+
only showing top 5 rows



In [23]:
# Violation Codes for Violation Precinct = 1

violation_code_frequency_1 = spark.sql("SELECT `Violation Code`, COUNT(1) as count \
                                 FROM violation_code_frequencies WHERE `Violation Precinct` == 1 GROUP BY `Violation Code`  \
                                 ORDER BY count DESC").show(5)

+--------------+-----+
|Violation Code|count|
+--------------+-----+
|            14|40226|
|            16|19278|
|            20|15743|
|            46|13534|
|            38| 8588|
+--------------+-----+
only showing top 5 rows



## 5.  Find out the properties of parking violations across different times of the day:

### Find a way to deal with missing values, if any.

In [24]:
# ASSUMPTION:
# Ignored Missing Values

In [25]:
# Computing the missing values in the dataframe

nyc_unique.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in nyc_unique.columns]).show()

+--------------+--------+----------+------------------+--------------+-----------------+------------+------------------+---------------+--------------+
|Summons Number|Plate ID|Issue Date|Registration State|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|
+--------------+--------+----------+------------------+--------------+-----------------+------------+------------------+---------------+--------------+
|             0|       0|         0|                 0|             0|                0|           0|                 0|              0|             0|
+--------------+--------+----------+------------------+--------------+-----------------+------------+------------------+---------------+--------------+



### The Violation Time field is specified in a strange format. Find a way to make this a time attribute that you can use to divide into groups.

In [26]:
nyc_unique.createOrReplaceTempView("violation_time_view") 

In [27]:
# New column 'ViolationTime24Hr' is made which has only the Hour value of time in 24 Hours format

violation_time_format = spark.sql("SELECT *, \
                                CASE \
                                    WHEN SUBSTR(TRIM(`Violation Time`), -1) == 'P' \
                                         THEN IF(SUBSTR(TRIM(`Violation Time`), 1, 2) == 12, SUBSTR(TRIM(`Violation Time`), 1, 2) + 0, SUBSTR(TRIM(`Violation Time`), 1, 2) + 12) \
                                    WHEN SUBSTR(TRIM(`Violation Time`), -1) == 'A' THEN SUBSTR(TRIM(`Violation Time`), 1, 2) + 0 \
                                END  AS ViolationTime24Hr \
                                FROM violation_time_view") \

In [28]:
# We print the first 5 rows of the new dataframe which consists of the entire given dataframe along with column 'ViolationTime24Hr'

violation_time_format.show(5)

+--------------+--------+----------+------------------+--------------+-----------------+------------+------------------+---------------+--------------+-----------------+
|Summons Number|Plate ID|Issue Date|Registration State|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|ViolationTime24Hr|
+--------------+--------+----------+------------------+--------------+-----------------+------------+------------------+---------------+--------------+-----------------+
|    8478629828| 66623ME|2017-06-14|                NY|            47|             REFG|       MITSU|                14|             14|         1120A|             11.0|
|    5096917368| FZD8593|2017-06-13|                NY|             7|             SUBN|       ME/BE|                 0|              0|         0852P|             20.0|
|    1407740258| 2513JMG|2017-01-11|                NY|            78|             DELV|       FRUEH|               106|            106|         0015A

### Divide 24 hours into six equal discrete bins of time. Choose the intervals as you see fit. For each of these groups, find the three most commonly occurring violations.

### Considered Hour bins as below
### Bin 1 -- 0 to 3 hrs
### Bin 2 -- 4 to 7 hrs
### Bin 3 -- 8 to 11 hrs
### Bin 4 -- 12 to 15 hrs
### Bin 5 -- 16 to 19 hrs
### Bin 6 -- 20 to 23 hrs

In [29]:
violation_time_format.createOrReplaceTempView("nyc_view") 

In [30]:
# Hour Bin are created below

violation_time_bin = spark.sql("SELECT *, \
                                CASE \
                                    WHEN (ViolationTime24Hr >= 0 AND ViolationTime24Hr <= 3) THEN 1  \
                                    WHEN (ViolationTime24Hr >= 4 AND ViolationTime24Hr <= 5) THEN 2  \
                                    WHEN (ViolationTime24Hr >= 6 AND ViolationTime24Hr <= 11) THEN 3 \
                                    WHEN (ViolationTime24Hr >= 12 AND ViolationTime24Hr <= 15) THEN 4 \
                                    WHEN (ViolationTime24Hr >= 16 AND ViolationTime24Hr <= 19) THEN 5 \
                                    WHEN (ViolationTime24Hr >= 20 AND ViolationTime24Hr <= 23) THEN 6 \
                                END  AS Hour_Bin \
                                FROM nyc_view")

In [31]:
# We print the first 5 rows of the new dataframe which consists of the dataframe from previous step along with column 'Hour_Bin'

violation_time_bin.show(5)

+--------------+--------+----------+------------------+--------------+-----------------+------------+------------------+---------------+--------------+-----------------+--------+
|Summons Number|Plate ID|Issue Date|Registration State|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|ViolationTime24Hr|Hour_Bin|
+--------------+--------+----------+------------------+--------------+-----------------+------------+------------------+---------------+--------------+-----------------+--------+
|    8478629828| 66623ME|2017-06-14|                NY|            47|             REFG|       MITSU|                14|             14|         1120A|             11.0|       3|
|    5096917368| FZD8593|2017-06-13|                NY|             7|             SUBN|       ME/BE|                 0|              0|         0852P|             20.0|       6|
|    1407740258| 2513JMG|2017-01-11|                NY|            78|             DELV|       FRUEH|    

In [32]:
violation_time_bin.createOrReplaceTempView("nyc_view_1")

In [33]:
# Three most commonly occurring violations for Hour_bin=1

violation_time_bin_1 = spark.sql("SELECT `Hour_Bin`, `Violation Code`, COUNT(1) As count \
                                        FROM nyc_view_1 \
                                        WHERE `Hour_Bin` = 1 \
                                        GROUP BY `Hour_Bin`, `Violation Code` \
                                        ORDER BY count DESC \
                                        LIMIT 3").show()

+--------+--------------+-----+
|Hour_Bin|Violation Code|count|
+--------+--------------+-----+
|       1|            21|34704|
|       1|            40|23629|
|       1|            14|14168|
+--------+--------------+-----+



In [34]:
# Three most commonly occurring violations for Hour_bin=2

violation_time_bin_2 = spark.sql("SELECT `Hour_Bin`, `Violation Code`, COUNT(1) As count \
                                        FROM nyc_view_1 \
                                        WHERE `Hour_Bin` = 2 \
                                        GROUP BY `Hour_Bin`, `Violation Code` \
                                        ORDER BY count DESC \
                                        LIMIT 3").show()

+--------+--------------+-----+
|Hour_Bin|Violation Code|count|
+--------+--------------+-----+
|       2|            40|14175|
|       2|            14| 7860|
|       2|            20| 7046|
+--------+--------------+-----+



In [35]:
# Three most commonly occurring violations for Hour_bin=3

violation_time_bin_3 = spark.sql("SELECT `Hour_Bin`, `Violation Code`, COUNT(1) As count \
                                        FROM nyc_view_1 \
                                        WHERE `Hour_Bin` = 3 \
                                        GROUP BY `Hour_Bin`, `Violation Code` \
                                        ORDER BY count DESC \
                                        LIMIT 3").show()

+--------+--------------+------+
|Hour_Bin|Violation Code| count|
+--------+--------------+------+
|       3|            21|652968|
|       3|            36|362947|
|       3|            14|215279|
+--------+--------------+------+



In [36]:
# Three most commonly occurring violations for Hour_bin=4

violation_time_bin_4 = spark.sql("SELECT `Hour_Bin`, `Violation Code`, COUNT(1) As count \
                                        FROM nyc_view_1 \
                                        WHERE `Hour_Bin` = 4 \
                                        GROUP BY `Hour_Bin`, `Violation Code` \
                                        ORDER BY count DESC \
                                        LIMIT 3").show()

+--------+--------------+------+
|Hour_Bin|Violation Code| count|
+--------+--------------+------+
|       4|            36|286284|
|       4|            38|240795|
|       4|            37|167044|
+--------+--------------+------+



In [37]:
# Three most commonly occurring violations for Hour_bin=5

violation_time_bin_5 = spark.sql("SELECT `Hour_Bin`, `Violation Code`, COUNT(1) As count \
                                        FROM nyc_view_1 \
                                        WHERE `Hour_Bin` = 5 \
                                        GROUP BY `Hour_Bin`, `Violation Code` \
                                        ORDER BY count DESC \
                                        LIMIT 3").show()

+--------+--------------+------+
|Hour_Bin|Violation Code| count|
+--------+--------------+------+
|       5|            38|102855|
|       5|            14| 75902|
|       5|            37| 70345|
+--------+--------------+------+



In [38]:
# Three most commonly occurring violations for Hour_bin=6

violation_time_bin_6 = spark.sql("SELECT `Hour_Bin`, `Violation Code`, COUNT(1) As count \
                                        FROM nyc_view_1 \
                                        WHERE `Hour_Bin` = 6 \
                                        GROUP BY `Hour_Bin`, `Violation Code` \
                                        ORDER BY count DESC \
                                        LIMIT 3").show()

+--------+--------------+-----+
|Hour_Bin|Violation Code|count|
+--------+--------------+-----+
|       6|             7|26293|
|       6|            40|22337|
|       6|            14|21045|
+--------+--------------+-----+



### Now, try another direction. For the three most commonly occurring violation codes, find the most common time of the day (in terms of the bins from the previous part)

In [39]:
violation_time_bin.createOrReplaceTempView("nyc_view_2")

In [40]:
violation_time_bin.groupBy("Violation Code").count().sort(desc("count")).show(3)

+--------------+------+
|Violation Code| count|
+--------------+------+
|            21|768087|
|            36|662765|
|            38|542079|
+--------------+------+
only showing top 3 rows



In [41]:
# The most common time of the day for Violation Code = 21

violation_time_bin_21 = spark.sql("SELECT `Violation Code`, ViolationTime24Hr , COUNT(1) As count \
                                        FROM nyc_view_1 \
                                        WHERE `Violation Code` = 21 \
                                        GROUP BY `Violation Code`, ViolationTime24Hr \
                                        ORDER BY count DESC \
                                        LIMIT 6").show()

+--------------+-----------------+------+
|Violation Code|ViolationTime24Hr| count|
+--------------+-----------------+------+
|            21|              8.0|190820|
|            21|              9.0|187017|
|            21|             11.0|167647|
|            21|             12.0| 74616|
|            21|              7.0| 52835|
|            21|             10.0| 52585|
+--------------+-----------------+------+



In [42]:
# The most common time of the day for Violation Code = 36

violation_time_bin_36 = spark.sql("SELECT `Violation Code`, ViolationTime24Hr , COUNT(1) As count \
                                        FROM nyc_view_1 \
                                        WHERE `Violation Code` = 36 \
                                        GROUP BY `Violation Code`, ViolationTime24Hr \
                                        ORDER BY count DESC \
                                        LIMIT 6").show()

+--------------+-----------------+------+
|Violation Code|ViolationTime24Hr| count|
+--------------+-----------------+------+
|            36|             12.0|101991|
|            36|             13.0| 98247|
|            36|             11.0| 98077|
|            36|             10.0| 97661|
|            36|              9.0| 91261|
|            36|              8.0| 61166|
+--------------+-----------------+------+



In [43]:
# The most common time of the day for Violation Code = 38

violation_time_bin_38 = spark.sql("SELECT `Violation Code`, ViolationTime24Hr , COUNT(1) As count \
                                        FROM nyc_view_1 \
                                        WHERE `Violation Code` = 38 \
                                        GROUP BY `Violation Code`, ViolationTime24Hr \
                                        ORDER BY count DESC \
                                        LIMIT 6").show()

+--------------+-----------------+-----+
|Violation Code|ViolationTime24Hr|count|
+--------------+-----------------+-----+
|            38|             13.0|73310|
|            38|             14.0|68117|
|            38|             12.0|56040|
|            38|             10.0|54230|
|            38|              9.0|53839|
|            38|             16.0|49694|
+--------------+-----------------+-----+



## 6. Let’s try and find some seasonality in this data:

### First, divide the year into a certain number of seasons, and find the frequencies of tickets for each season.

In [44]:
# ASSUMPTION:
# Considered Seasons as below
# Season 1 -- January to March
# Season 2 -- April to June
# Season 3 -- July to September
# Season 4 -- October to December

In [45]:
nyc_month = violation_time_bin.select('Summons Number','Plate ID','Issue Date','Registration State','Violation Code','Vehicle Body Type','Vehicle Make','Violation Precinct','Issuer Precinct','Violation Time','ViolationTime24Hr','Hour_Bin',month(nyc['Issue Date']).alias('Month'))

In [46]:
# A new column is extracted from the issue date column which stores the corresponding month

nyc_month.show(5)

+--------------+--------+----------+------------------+--------------+-----------------+------------+------------------+---------------+--------------+-----------------+--------+-----+
|Summons Number|Plate ID|Issue Date|Registration State|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|ViolationTime24Hr|Hour_Bin|Month|
+--------------+--------+----------+------------------+--------------+-----------------+------------+------------------+---------------+--------------+-----------------+--------+-----+
|    8478629828| 66623ME|2017-06-14|                NY|            47|             REFG|       MITSU|                14|             14|         1120A|             11.0|       3|    6|
|    5096917368| FZD8593|2017-06-13|                NY|             7|             SUBN|       ME/BE|                 0|              0|         0852P|             20.0|       6|    6|
|    1407740258| 2513JMG|2017-01-11|                NY|            78|     

In [47]:
nyc_month.createOrReplaceTempView("nyc_view_season")

In [48]:
# A new column is made from the which stores the corresponding season bin number

season_bin = spark.sql("SELECT *, \
                                CASE \
                                    WHEN (`Month` >= 1 AND `Month` <= 3) THEN 1 \
                                    WHEN (`Month` >= 4 AND `Month` <= 6) THEN 2 \
                                    WHEN (`Month` >= 7 AND `Month` <= 9) THEN 3 \
                                    WHEN (`Month` >= 10 AND `Month` <= 12) THEN 4 \
                                END  AS bin_number \
                                FROM nyc_view_season")

In [49]:
season_bin.show(5)

+--------------+--------+----------+------------------+--------------+-----------------+------------+------------------+---------------+--------------+-----------------+--------+-----+----------+
|Summons Number|Plate ID|Issue Date|Registration State|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|ViolationTime24Hr|Hour_Bin|Month|bin_number|
+--------------+--------+----------+------------------+--------------+-----------------+------------+------------------+---------------+--------------+-----------------+--------+-----+----------+
|    8478629828| 66623ME|2017-06-14|                NY|            47|             REFG|       MITSU|                14|             14|         1120A|             11.0|       3|    6|         2|
|    5096917368| FZD8593|2017-06-13|                NY|             7|             SUBN|       ME/BE|                 0|              0|         0852P|             20.0|       6|    6|         2|
|    1407740258| 251

In [50]:
season_bin.createOrReplaceTempView("nyc_view_season_freq")

In [51]:
spark.sql("SELECT bin_number, COUNT(`Summons Number`) AS total_tickets \
                                        FROM nyc_view_season_freq  \
                                        GROUP BY bin_number  \
                                        ORDER BY bin_number ASC").show()

+----------+-------------+
|bin_number|total_tickets|
+----------+-------------+
|         1|      2669069|
|         2|      2760833|
|         3|         1046|
|         4|          970|
+----------+-------------+



### Then, find the three most common violations for each of these seasons.

In [52]:
# Below commands will return 3 most common violation codes for season bin_number=1 

top_3_vios_bin_1 = spark.sql("select bin_number, `Violation Code`, count(`Summons Number`) as count_tckts \
from nyc_view_season_freq where bin_number == 1 \
group by bin_number, `Violation Code` order by bin_number, count_tckts desc limit 3")

In [53]:
top_3_vios_bin_1.show()

+----------+--------------+-----------+
|bin_number|Violation Code|count_tckts|
+----------+--------------+-----------+
|         1|            21|     373874|
|         1|            36|     348240|
|         1|            38|     287000|
+----------+--------------+-----------+



In [54]:
# Below commands will return 3 most common violation codes for season bin_number=2

top_3_vios_bin_2 = spark.sql("select bin_number, `Violation Code`, count(`Summons Number`) as count_tckts \
from nyc_view_season_freq where bin_number == 2 \
group by bin_number, `Violation Code` order by bin_number, count_tckts desc limit 3")

In [55]:
top_3_vios_bin_2.show()

+----------+--------------+-----------+
|bin_number|Violation Code|count_tckts|
+----------+--------------+-----------+
|         2|            21|     393885|
|         2|            36|     314525|
|         2|            38|     255064|
+----------+--------------+-----------+



In [56]:
# Below commands will return 3 most common violation codes for season bin_number=3

top_3_vios_bin_3 = spark.sql("select bin_number, `Violation Code`, count(`Summons Number`) as count_tckts \
from nyc_view_season_freq where bin_number == 3 \
group by bin_number, `Violation Code` order by bin_number, count_tckts desc limit 3")

In [57]:
top_3_vios_bin_3.show()

+----------+--------------+-----------+
|bin_number|Violation Code|count_tckts|
+----------+--------------+-----------+
|         3|            21|        228|
|         3|            46|        219|
|         3|            40|        109|
+----------+--------------+-----------+



In [58]:
# Below commands will return 3 most common violation codes for season bin_number=4 

top_3_vios_bin_4 = spark.sql("select bin_number, `Violation Code`, count(`Summons Number`) as count_tckts \
from nyc_view_season_freq where bin_number == 4 \
group by bin_number, `Violation Code` order by bin_number, count_tckts desc limit 3")

In [59]:
top_3_vios_bin_4.show()

+----------+--------------+-----------+
|bin_number|Violation Code|count_tckts|
+----------+--------------+-----------+
|         4|            46|        219|
|         4|            40|        121|
|         4|            21|        100|
+----------+--------------+-----------+



Combinining All Results.

In [60]:
bin_1_2 = top_3_vios_bin_1.union(top_3_vios_bin_2)

In [61]:
bin_3_4 = top_3_vios_bin_3.union(top_3_vios_bin_4)

In [62]:
total_bin = bin_1_2.union(bin_3_4)

In [63]:
total_bin.show()

+----------+--------------+-----------+
|bin_number|Violation Code|count_tckts|
+----------+--------------+-----------+
|         1|            21|     373874|
|         1|            36|     348240|
|         1|            38|     287000|
|         2|            21|     393885|
|         2|            36|     314525|
|         2|            38|     255064|
|         3|            21|        228|
|         3|            46|        219|
|         3|            40|        109|
|         4|            46|        219|
|         4|            40|        121|
|         4|            21|        100|
+----------+--------------+-----------+



## 7. The fines collected from all the instances of parking violation constitute a source of revenue for the NYC Police Department. Let’s take an example of estimating this for the three most commonly occurring codes:

### Find the total occurrences of the three most common violation codes.

In [64]:
season_bin.show(5)

+--------------+--------+----------+------------------+--------------+-----------------+------------+------------------+---------------+--------------+-----------------+--------+-----+----------+
|Summons Number|Plate ID|Issue Date|Registration State|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|ViolationTime24Hr|Hour_Bin|Month|bin_number|
+--------------+--------+----------+------------------+--------------+-----------------+------------+------------------+---------------+--------------+-----------------+--------+-----+----------+
|    8478629828| 66623ME|2017-06-14|                NY|            47|             REFG|       MITSU|                14|             14|         1120A|             11.0|       3|    6|         2|
|    5096917368| FZD8593|2017-06-13|                NY|             7|             SUBN|       ME/BE|                 0|              0|         0852P|             20.0|       6|    6|         2|
|    1407740258| 251

In [65]:
season_bin.createOrReplaceTempView("nyc_most_violation_code")

In [66]:
# Total occurrences of the three most common violation codes

violation_code_top_3 = spark.sql("SELECT `Violation Code`, COUNT(1) As count \
                                        FROM nyc_most_violation_code \
                                        GROUP BY `Violation Code` \
                                        ORDER BY count DESC \
                                        LIMIT 3").show()

+--------------+------+
|Violation Code| count|
+--------------+------+
|            21|768087|
|            36|662765|
|            38|542079|
+--------------+------+



### Then, visit the website:
### http://www1.nyc.gov/site/finance/vehicles/services-violation-codes.page
### It lists the fines associated with different violation codes. They’re divided into two categories: one for the highest-density locations in the city and the other for the rest of the city. For the sake of simplicity, take the average of the two.

In [67]:
# As per the Website, for Violation code = 21, 
# Manhattan 96th St. & below = $65 
# All Other Areas = $45
# Average fine for Violation code 21 = $55

# As per the Website, for Violation code = 36, 
# Manhattan 96th St. & below = $50 
# All Other Areas = $50
# Average fine for Violation code 36 = $50

# As per the Website, for Violation code = 38, 
# Manhattan 96th St. & below = $65 
# All Other Areas = $35
# Average fine for Violation code 38 = $50

### Using this information, find the total amount collected for the three violation codes with the maximum tickets. State the code that has the highest total collection.

In [68]:
season_bin.createOrReplaceTempView("amount_violation_code")

In [69]:
# A new column fine_amount is created which will store the fine amount for corresponding Violation Code as per the website given

fine_df = spark.sql("SELECT *, \
                           CASE WHEN `Violation Code` = 21 THEN 55  \
                           WHEN `Violation Code` = 36 THEN 50 \
                           WHEN `Violation Code` = 38 THEN 50 \
                           END  as fine_amount FROM amount_violation_code \
                           WHERE `Violation Code` = 21 OR `Violation Code` = 36 OR `Violation Code` = 38 ")

In [70]:
fine_df.show(5)

+--------------+--------+----------+------------------+--------------+-----------------+------------+------------------+---------------+--------------+-----------------+--------+-----+----------+-----------+
|Summons Number|Plate ID|Issue Date|Registration State|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|ViolationTime24Hr|Hour_Bin|Month|bin_number|fine_amount|
+--------------+--------+----------+------------------+--------------+-----------------+------------+------------------+---------------+--------------+-----------------+--------+-----+----------+-----------+
|    4630524241|  HJBP29|2017-02-03|                FL|            36|               4D|         BMW|                 0|              0|         1034A|             10.0|       3|    2|         1|         50|
|    8505131836| 87155MA|2017-05-27|                NY|            38|              VAN|       CHEVR|                 1|              1|         1021A|             10.0

In [71]:
fine_df.createOrReplaceTempView("fine_total")

In [72]:
total_fine_sql = spark.sql("select `Violation Code`, SUM(`fine_amount`) as Sum_Fine_Amount FROM fine_total \
                           GROUP BY `Violation Code`").show()

+--------------+---------------+
|Violation Code|Sum_Fine_Amount|
+--------------+---------------+
|            38|       27103950|
|            36|       33138250|
|            21|       42244785|
+--------------+---------------+



Violation Code that has the highest total collection = 21