In [1]:
# Importing libraries

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, countDistinct
from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql.functions import *
from pyspark.sql.types import StructField, StructType, StringType, LongType
from pyspark.sql.types import DateType, IntegerType

In [2]:
# Start (or reuse) the Spark session used throughout this notebook.

spark = (
    SparkSession.builder
    .appName("Case-Study")
    .getOrCreate()
)

In [3]:
# Load the CSV with a header row and let Spark infer the column types.

raw_reader = spark.read.format("csv")
raw_reader = raw_reader.option("header", "true").option("inferSchema", "true")
df_raw = raw_reader.load("/common_folder/nyc_parking/Parking_Violations_Issued_-_Fiscal_Year_2017.csv")

In [4]:
# Inspecting the dataframe: evaluating the bare name displays the column names and inferred types.
df_raw

DataFrame[Summons Number: bigint, Plate ID: string, Registration State: string, Issue Date: timestamp, Violation Code: int, Vehicle Body Type: string, Vehicle Make: string, Violation Precinct: int, Issuer Precinct: int, Violation Time: string]

In [5]:
# Preview the first three rows of the raw data.
df_raw.show(n=3)

+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+
|Summons Number|Plate ID|Registration State|         Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|
+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+
|    5092469481| GZH7067|                NY|2016-07-10 00:00:00|             7|             SUBN|       TOYOT|                 0|              0|         0143A|
|    5092451658| GZH7067|                NY|2016-07-08 00:00:00|             7|             SUBN|       TOYOT|                 0|              0|         0400P|
|    4006265037| FZX9232|                NY|2016-08-23 00:00:00|             5|             SUBN|        FORD|                 0|              0|         0233P|
+--------------+--------+---------

In [6]:
# Number of rows read with the inferred schema.
df_raw.count()

10803028

#### Let's create schema manually, so that we can change the datatypes and name of the columns.

In [7]:
# Explicit schema: renames the columns to underscore-separated names and
# fixes their data types instead of relying on inference.

mySchema = (
    StructType()
    .add("Summons_Number", LongType(), True)
    .add("Plate_ID", StringType(), True)
    .add("Reg_State", StringType(), True)
    .add("Issue_Date", DateType(), True)
    .add("Violation_code", IntegerType(), True)
    .add("V_body_Type", StringType(), True)
    .add("V_Make", StringType(), True)
    .add("Violation_precinct", IntegerType(), True)
    .add("Issuer_precinct", IntegerType(), True)
    .add("Violation_time", StringType(), True)
)

# Read the same file again, this time applying the user-defined schema.

df = (
    spark.read.format("csv")
    .schema(mySchema)
    .option("header", "true")
    .load("/common_folder/nyc_parking/Parking_Violations_Issued_-_Fiscal_Year_2017.csv")
)

In [8]:
# Number of rows read with the user-defined schema.
df.count()

# The count is the same for the inferSchema read and the user-defined-schema read.

10803028

In [9]:
# Confirm the renamed columns and the data types from the user-defined schema.
df.printSchema()

root
 |-- Summons_Number: long (nullable = true)
 |-- Plate_ID: string (nullable = true)
 |-- Reg_State: string (nullable = true)
 |-- Issue_Date: date (nullable = true)
 |-- Violation_code: integer (nullable = true)
 |-- V_body_Type: string (nullable = true)
 |-- V_Make: string (nullable = true)
 |-- Violation_precinct: integer (nullable = true)
 |-- Issuer_precinct: integer (nullable = true)
 |-- Violation_time: string (nullable = true)



In [10]:
# Preview the first five rows without truncating cell values.
df.show(n=5, truncate=False)

+--------------+--------+---------+----------+--------------+-----------+------+------------------+---------------+--------------+
|Summons_Number|Plate_ID|Reg_State|Issue_Date|Violation_code|V_body_Type|V_Make|Violation_precinct|Issuer_precinct|Violation_time|
+--------------+--------+---------+----------+--------------+-----------+------+------------------+---------------+--------------+
|5092469481    |GZH7067 |NY       |2016-07-10|7             |SUBN       |TOYOT |0                 |0              |0143A         |
|5092451658    |GZH7067 |NY       |2016-07-08|7             |SUBN       |TOYOT |0                 |0              |0400P         |
|4006265037    |FZX9232 |NY       |2016-08-23|5             |SUBN       |FORD  |0                 |0              |0233P         |
|8478629828    |66623ME |NY       |2017-06-14|47            |REFG       |MITSU |14                |14             |1120A         |
|7868300310    |37033JV |NY       |2016-11-21|69            |DELV       |INTER |13 

In [11]:
# Summary statistics (count, mean, stddev, min, max) for the numeric columns.
numeric_columns = ['Summons_Number', 'Violation_code', 'Violation_precinct', 'Issuer_precinct']
df.describe(*numeric_columns).show()

+-------+--------------------+------------------+------------------+-----------------+
|summary|      Summons_Number|    Violation_code|Violation_precinct|  Issuer_precinct|
+-------+--------------------+------------------+------------------+-----------------+
|  count|            10803028|          10803028|          10803028|         10803028|
|   mean|6.8174470290656595E9|34.599430455979565| 45.01216260848347|46.82931211508477|
| stddev| 2.320233962328229E9|19.359868716323483|40.552560268435805|62.66703577269466|
|    min|          1002884949|                 0|                 0|                0|
|    max|          8585600044|                99|               933|              997|
+-------+--------------------+------------------+------------------+-----------------+



In [12]:
# Issue_Date also contains dates before 2017 (see the preview above).
# Only tickets issued in calendar year 2017 are used in the analysis.

issued_in_2017 = year(col("Issue_Date")) == 2017
df_2017 = df.where(issued_in_2017)

In [13]:
# Counting the number of rows that remain after keeping only Issue_Date in 2017.

df_2017.count()

5431918

In [14]:
# Earliest and latest Issue_Date left after the 2017 filter.
# min/max here are the Spark column functions from the wildcard import, not the Python builtins.
df_2017.agg(min("Issue_Date"), max("Issue_Date")).first()

Row(min(Issue_Date)=datetime.date(2017, 1, 1), max(Issue_Date)=datetime.date(2017, 12, 31))

In [15]:
# Register the 2017 rows as the temp view "parking_data" so they can be queried with spark.sql.
df_2017.createOrReplaceTempView("parking_data")

## Some Data Quality checks

In [16]:
# Rows whose Summons_Number is null or the text 'nan' (ignoring case), grouped by value.
summons_number_missing = spark.sql("SELECT Summons_Number, count(*) as count FROM parking_data \
          where (Summons_Number is null or lower(Summons_Number)='nan') \
          group by Summons_Number")
summons_number_missing.show()

+--------------+-----+
|Summons_Number|count|
+--------------+-----+
+--------------+-----+



In [17]:
# Rows whose Reg_State is null or the text 'nan' (ignoring case), grouped by value.
reg_state_missing = spark.sql("SELECT Reg_State, count(*) as count FROM parking_data where (Reg_State is null or lower(Reg_State)='nan') \
          group by Reg_State")
reg_state_missing.show()

+---------+-----+
|Reg_State|count|
+---------+-----+
+---------+-----+



In [18]:
# Rows whose Issue_Date is null or the text 'nan' (ignoring case), grouped by value.
issue_date_missing = spark.sql("SELECT Issue_Date, count(*) as count FROM parking_data where (Issue_Date is null or lower(Issue_Date)='nan') \
          group by Issue_Date")
issue_date_missing.show()

+----------+-----+
|Issue_Date|count|
+----------+-----+
+----------+-----+



In [19]:
# Rows whose Violation_code is null or the text 'nan' (ignoring case), grouped by value.
violation_code_missing = spark.sql("SELECT Violation_code, count(*) as count FROM parking_data \
          where (Violation_code is null or lower(Violation_code)='nan') \
          group by Violation_code")
violation_code_missing.show()

+--------------+-----+
|Violation_code|count|
+--------------+-----+
+--------------+-----+



In [20]:
# Rows whose V_body_Type is null or the text 'nan' (ignoring case), grouped by value.
body_type_missing = spark.sql("SELECT V_body_Type, count(*) as count FROM parking_data where (V_body_Type is null or lower(V_body_Type)='nan') \
          group by V_body_Type")
body_type_missing.show()

+-----------+-----+
|V_body_Type|count|
+-----------+-----+
|        nan|20201|
|        NAN|    1|
+-----------+-----+



In [21]:
# Rows whose V_Make is null or the text 'nan' (ignoring case), grouped by value.
make_missing = spark.sql("SELECT V_Make, count(*) as count FROM parking_data where (V_Make is null or lower(V_Make)='nan') \
          group by V_Make")
make_missing.show()

+------+-----+
|V_Make|count|
+------+-----+
|   nan|38509|
+------+-----+



In [22]:
# Rows whose Violation_precinct is null or the text 'nan' (ignoring case), grouped by value.
violation_precinct_missing = spark.sql("SELECT Violation_precinct, count(*) as count FROM parking_data \
          where (Violation_precinct is null or lower(Violation_precinct)='nan') \
          group by Violation_precinct")
violation_precinct_missing.show()

+------------------+-----+
|Violation_precinct|count|
+------------------+-----+
+------------------+-----+



In [23]:
# Rows whose Issuer_precinct is null or the text 'nan' (ignoring case), grouped by value.
issuer_precinct_missing = spark.sql("SELECT Issuer_precinct, count(*) as count FROM parking_data \
          where (Issuer_precinct is null or lower(Issuer_precinct)='nan') \
          group by Issuer_precinct")
issuer_precinct_missing.show()

+---------------+-----+
|Issuer_precinct|count|
+---------------+-----+
+---------------+-----+



In [24]:
# Rows whose Violation_time is null or the text 'nan' (ignoring case), grouped by value.
violation_time_missing = spark.sql("SELECT Violation_time, count(*) as count FROM parking_data \
          where (Violation_time is null or lower(Violation_time)='nan') \
          group by Violation_time")
violation_time_missing.show()

+--------------+-----+
|Violation_time|count|
+--------------+-----+
|           nan|   16|
+--------------+-----+



#### No column contains nulls. The text value 'nan' appears only in V_body_Type, V_Make and Violation_time, which we will handle (if required) later in the notebook.

In [25]:
### We do not need Plate_ID in our analysis, so we drop this column.

In [26]:
# Remove the columns that the analysis does not use.
columns_to_drop = ['Plate_ID']
for unused_column in columns_to_drop:
    df_2017 = df_2017.drop(unused_column)

In [27]:
# Updating the view definition so "parking_data" reflects df_2017 after the column drop.
df_2017.createOrReplaceTempView("parking_data")

### 1. Find the total number of tickets for the year.

In [28]:
# Number of distinct summons numbers in the 2017 data.
distinct_summons = countDistinct("Summons_Number").alias("count")
df_2017.select(distinct_summons).show()

+-------+
|  count|
+-------+
|5431918|
+-------+



We can see that summons_number is unique as the number of records and distinct count is the same. 
So, we can say that total number of tickets in the year = 5431918.


### 2. Find out the number of unique states from where the cars that got parking tickets came.

In [29]:
# Ticket count per registration state, largest first.
state_counts = spark.sql('SELECT Reg_State,count(*) as count FROM parking_data group by Reg_State order by count(*) desc')
state_counts.show(100)

# show(100) is used because there are fewer than 100 states.

+---------+-------+
|Reg_State|  count|
+---------+-------+
|       NY|4273951|
|       NJ| 475825|
|       PA| 140286|
|       CT|  70403|
|       FL|  69468|
|       IN|  45525|
|       MA|  38941|
|       VA|  34367|
|       MD|  30213|
|       NC|  27152|
|       TX|  18827|
|       IL|  18666|
|       GA|  17537|
|       99|  16055|
|       AZ|  12379|
|       OH|  12281|
|       CA|  12153|
|       ME|  10806|
|       SC|  10395|
|       MN|  10083|
|       OK|   9088|
|       TN|   8514|
|       DE|   7905|
|       MI|   7231|
|       RI|   5814|
|       NH|   4119|
|       VT|   3683|
|       AL|   3178|
|       WA|   3052|
|       OR|   2622|
|       MO|   2483|
|       ON|   2460|
|       WI|   2127|
|       QB|   1998|
|       IA|   1938|
|       DC|   1929|
|       CO|   1841|
|       KY|   1795|
|       DP|   1794|
|       LA|   1689|
|       MS|   1582|
|       WV|   1265|
|       AR|    994|
|       SD|    859|
|       NM|    792|
|       ID|    763|
|       NV|    725|


In [30]:
# Number of distinct Reg_State values.
distinct_states = spark.sql('SELECT distinct Reg_State FROM parking_data')
distinct_states.count()

# The individual state codes are listed by the per-state count query above.

65

In [31]:
# Replacing Reg_State = 99 with NY, as NY has the highest number of records.
# Both branches read the column from df_2017 (the frame being transformed),
# not from its unfiltered parent df.

df_2017_state = df_2017.withColumn(
    "Reg_State",
    when(df_2017["Reg_State"] == '99', 'NY').otherwise(df_2017["Reg_State"])
)

In [32]:
# Updating the view definition so "parking_data" now points at df_2017_state (Reg_State '99' mapped to 'NY').
df_2017_state.createOrReplaceTempView("parking_data")

In [33]:
# Per-state counts again after the replacement; the NY count has increased.
state_counts_after = spark.sql('SELECT Reg_State,count(*) as count FROM parking_data group by Reg_State order by count(*) desc')
state_counts_after.show(100)

+---------+-------+
|Reg_State|  count|
+---------+-------+
|       NY|4290006|
|       NJ| 475825|
|       PA| 140286|
|       CT|  70403|
|       FL|  69468|
|       IN|  45525|
|       MA|  38941|
|       VA|  34367|
|       MD|  30213|
|       NC|  27152|
|       TX|  18827|
|       IL|  18666|
|       GA|  17537|
|       AZ|  12379|
|       OH|  12281|
|       CA|  12153|
|       ME|  10806|
|       SC|  10395|
|       MN|  10083|
|       OK|   9088|
|       TN|   8514|
|       DE|   7905|
|       MI|   7231|
|       RI|   5814|
|       NH|   4119|
|       VT|   3683|
|       AL|   3178|
|       WA|   3052|
|       OR|   2622|
|       MO|   2483|
|       ON|   2460|
|       WI|   2127|
|       QB|   1998|
|       IA|   1938|
|       DC|   1929|
|       CO|   1841|
|       KY|   1795|
|       DP|   1794|
|       LA|   1689|
|       MS|   1582|
|       WV|   1265|
|       AR|    994|
|       SD|    859|
|       NM|    792|
|       ID|    763|
|       NV|    725|
|       KS|    706|


In [34]:
# Number of distinct Reg_State values after the replacement.
distinct_states_after = spark.sql('SELECT distinct Reg_State FROM parking_data')
distinct_states_after.count()

# The individual state codes are listed by the per-state count query above.
# The number of distinct states has decreased by 1.

64

### 1. How often does each violation code occur? Display the frequency of the top five violation codes.

In [35]:
# Five most frequent violation codes.
violation_code_counts = spark.sql('SELECT Violation_code,count(*) as count FROM parking_data group by Violation_code order by count(*) desc')
violation_code_counts.show(5)

+--------------+------+
|Violation_code| count|
+--------------+------+
|            21|768087|
|            36|662765|
|            38|542079|
|            14|476664|
|            20|319646|
+--------------+------+
only showing top 5 rows



### 2. How often does each 'vehicle body type' get a parking ticket? How about the 'vehicle make'? 
#### a. 'vehicle body type'

In [36]:
# Five most frequently ticketed vehicle body types.
body_type_counts = spark.sql('SELECT V_body_Type,count(*) as count FROM parking_data group by V_body_Type order by count(*) desc')
body_type_counts.show(5)

+-----------+-------+
|V_body_Type|  count|
+-----------+-------+
|       SUBN|1883954|
|       4DSD|1547312|
|        VAN| 724029|
|       DELV| 358984|
|        SDN| 194197|
+-----------+-------+
only showing top 5 rows



### 2. How often does each 'vehicle body type' get a parking ticket? How about the 'vehicle make'? 
#### b. 'vehicle make'

In [37]:
# Five most frequently ticketed vehicle makes.
make_counts = spark.sql('SELECT V_Make,count(*) as count FROM parking_data group by V_Make order by count(*) desc')
make_counts.show(5)

+------+------+
|V_Make| count|
+------+------+
|  FORD|636844|
| TOYOT|605291|
| HONDA|538884|
| NISSA|462017|
| CHEVR|356032|
+------+------+
only showing top 5 rows



### 3. A precinct is a police station that has a certain zone of the city under its command.
#### a. 'Violation Precinct' 

In [38]:
# Most frequent Violation_precinct values; six rows are shown so that five remain besides precinct 0.
violation_precinct_counts = spark.sql('SELECT Violation_precinct,count(*) as count FROM parking_data group by Violation_precinct order by count(*) desc')
violation_precinct_counts.show(6)

+------------------+------+
|Violation_precinct| count|
+------------------+------+
|                 0|925596|
|                19|274445|
|                14|203553|
|                 1|174702|
|                18|169131|
|               114|147444|
+------------------+------+
only showing top 6 rows



### 3. A precinct is a police station that has a certain zone of the city under its command.
#### b. 'Issuer Precinct'

In [39]:
# Most frequent Issuer_precinct values.
issuer_precinct_counts = spark.sql('SELECT Issuer_precinct,count(*) as count FROM parking_data group by Issuer_precinct order by count(*) desc')
issuer_precinct_counts.show(6)

# Six rows are shown because precinct 0 is treated as erroneous records.

+---------------+-------+
|Issuer_precinct|  count|
+---------------+-------+
|              0|1078406|
|             19| 266961|
|             14| 200495|
|              1| 168740|
|             18| 162994|
|            114| 144054|
+---------------+-------+
only showing top 6 rows



### Observations: 'Violation Precinct' & 'Issuer Precinct' 
1. There is a high number of erroneous records (precinct 0) in both violation precinct and issuer precinct.
2. Most parking violations occurred in precincts 19, 14, 1, 18 and 114, and the same precincts issued the most tickets.
3. These precincts appear to be the busiest in New York in terms of parking violations.

### 4. Find the violation code frequencies for three precincts that have issued the most number of tickets. Do these precinct zones have an exceptionally high frequency of certain violation codes? Are these codes common across precincts? 

In [40]:
# Find the violation code frequencies for the three precincts that have issued the most tickets.
# Ignoring the erroneous precinct 0, the top 3 issuing precincts are 19, 14 and 1.


In [41]:
# Checking for precinct = 19
# fraction_of_total is each code's percentage of all tickets issued by this precinct.
# The precinct total is computed in the query (window sum over the grouped counts)
# instead of being hard-coded; it was 266961 in the recorded run.
spark.sql("""
    SELECT Violation_code,
           Issuer_precinct,
           count(*) AS count,
           round(count(*) / (sum(count(*)) OVER ()) * 100, 2) AS fraction_of_total
    FROM parking_data
    WHERE Issuer_precinct = 19
    GROUP BY Violation_code, Issuer_precinct
    ORDER BY count DESC
""").show(5)

+--------------+---------------+-----+-----------------+
|Violation_code|Issuer_precinct|count|fraction_of_total|
+--------------+---------------+-----+-----------------+
|            46|             19|48445|            18.15|
|            38|             19|36386|            13.63|
|            37|             19|36056|            13.51|
|            14|             19|29797|            11.16|
|            21|             19|28415|            10.64|
+--------------+---------------+-----+-----------------+
only showing top 5 rows



In [42]:
# Checking for precinct = 14
# fraction_of_total is each code's percentage of all tickets issued by this precinct.
# The precinct total is computed in the query (window sum over the grouped counts)
# instead of being hard-coded; it was 200495 in the recorded run.
spark.sql("""
    SELECT Violation_code,
           Issuer_precinct,
           count(*) AS count,
           round(count(*) / (sum(count(*)) OVER ()) * 100, 2) AS fraction_of_total
    FROM parking_data
    WHERE Issuer_precinct = 14
    GROUP BY Violation_code, Issuer_precinct
    ORDER BY count DESC
""").show(5)

+--------------+---------------+-----+-----------------+
|Violation_code|Issuer_precinct|count|fraction_of_total|
+--------------+---------------+-----+-----------------+
|            14|             14|45036|            22.46|
|            69|             14|30464|            15.19|
|            31|             14|22555|            11.25|
|            47|             14|18364|             9.16|
|            42|             14|10027|              5.0|
+--------------+---------------+-----+-----------------+
only showing top 5 rows



In [43]:
# Checking for precinct = 1
# fraction_of_total is each code's percentage of all tickets issued by this precinct.
# The precinct total is computed in the query (window sum over the grouped counts)
# instead of being hard-coded; it was 168740 in the recorded run.
spark.sql("""
    SELECT Violation_code,
           Issuer_precinct,
           count(*) AS count,
           round(count(*) / (sum(count(*)) OVER ()) * 100, 2) AS fraction_of_total
    FROM parking_data
    WHERE Issuer_precinct = 1
    GROUP BY Violation_code, Issuer_precinct
    ORDER BY count DESC
""").show(5)

+--------------+---------------+-----+-----------------+
|Violation_code|Issuer_precinct|count|fraction_of_total|
+--------------+---------------+-----+-----------------+
|            14|              1|38354|            22.73|
|            16|              1|19081|            11.31|
|            20|              1|15408|             9.13|
|            46|              1|12745|             7.55|
|            38|              1| 8535|             5.06|
+--------------+---------------+-----+-----------------+
only showing top 5 rows



### Observations: Violation_code
#### a. Do these precinct zones have an exceptionally high frequency of certain violation codes?
1. Across all precincts, the top violation codes (from the earlier query) are 21, 36, 38, 14 and 20, in descending order of frequency.
2. For precinct 19, the top 5 violation codes (46, 38, 37, 14, 21), in that order, account for 67.09% of the tickets issued by this precinct. Codes 46, 38, 37 and 14 have slightly higher shares than the others.
3. For precinct 14, the top 5 violation codes (14, 69, 31, 47, 42), in that order, account for 63.06% of the tickets issued by this precinct. Codes 14, 69 and 31 have slightly higher shares than the others.
4. For precinct 1, the top 5 violation codes (14, 16, 20, 46, 38), in that order, account for 55.78% of the tickets issued by this precinct. Codes 14 and 16 have slightly higher shares than the others.
    
#### b. Are these codes common across precincts? 
Violation code 14 is in top 5 of all three precincts 19, 14 and 1.
 

### 5. Find out the properties of parking violations across different times of the day.

### 5(a). Find a way to deal with missing values, if any.


In [44]:
# Count the rows where Violation_time is null.
null_time_count = spark.sql('SELECT count(*) as count FROM parking_data where Violation_time is null ')
null_time_count.show()

+-----+
|count|
+-----+
|    0|
+-----+



In [45]:
# The same null check through the DataFrame API, using isNull().
time_is_null = df_2017_state.Violation_time.isNull()
df_2017_state.filter(time_is_null).show()

+--------------+---------+----------+--------------+-----------+------+------------------+---------------+--------------+
|Summons_Number|Reg_State|Issue_Date|Violation_code|V_body_Type|V_Make|Violation_precinct|Issuer_precinct|Violation_time|
+--------------+---------+----------+--------------+-----------+------+------------------+---------------+--------------+
+--------------+---------+----------+--------------+-----------+------+------------------+---------------+--------------+



In [46]:
# Violation_time has no nulls, so next count the rows holding the text 'nan' (ignoring case).

nan_time_count = spark.sql("SELECT count(*) as count FROM parking_data where lower(Violation_time) = 'nan' ")
nan_time_count.show()

+-----+
|count|
+-----+
|   16|
+-----+



In [47]:
# Only 16 rows have 'nan' as Violation_time, so they are dropped before further analysis.

time_is_not_nan = lower(col("Violation_time")) != 'nan'
parking_date_noNull = df_2017_state.where(time_is_not_nan)
parking_date_noNull.count()

5431902

In [48]:
# Replacing the existing "parking_data" view with the dataframe that excludes the 'nan' Violation_time rows.

parking_date_noNull.createOrReplaceTempView("parking_data")

In [49]:
# Look for inconsistencies: smallest and largest hour (first two characters)
# for each value of the fifth character, which should be 'A' or 'P'.

hour_ranges = spark.sql("SELECT min(cast(substr(Violation_time, 1,2) as int)) as min_hour, \
          max(cast(substr(Violation_time, 1,2) as int)) as max_hour, \
          substr(Violation_time, 5,1) as AM_PM \
          FROM parking_data  group by substr(Violation_time, 5,1) ")
hour_ranges.show()

+--------+--------+-----+
|min_hour|max_hour|AM_PM|
+--------+--------+-----+
|       4|       4|    0|
|       0|      12|    A|
|       0|      87|    P|
|       3|      10|     |
+--------+--------+-----+



In [50]:
# Count the rows whose fifth character is not 'A'/'P' or whose hour is greater than 12.

inconsistent_times = spark.sql("SELECT count(*) as count FROM parking_data where \
          (substr(Violation_time, 5,1) not in ('A', 'P') \
          or cast(substr(Violation_time, 1,2) as int) > 12 )")
inconsistent_times.show()

+-----+
|count|
+-----+
|   63|
+-----+



In [51]:
# Only 63 rows have an inconsistent Violation_time, so they are dropped before further analysis.

consistent_time_query = "SELECT * FROM parking_data where not \
          (substr(Violation_time, 5,1) not in ('A', 'P') \
          or cast(substr(Violation_time, 1,2) as int) > 12 )"
df_filtered_timestamp = spark.sql(consistent_time_query)

In [52]:
# Updating the view definition so "parking_data" holds only the rows with a consistent Violation_time.

df_filtered_timestamp.createOrReplaceTempView("parking_data")

In [53]:
# Row count after dropping the inconsistent timestamps.
remaining_rows = spark.sql('select count(*) as count from parking_data')
remaining_rows.show()

# Only 63 rows were removed.

+-------+
|  count|
+-------+
|5431839|
+-------+



In [54]:
# Re-run the hour-range check on the cleaned view.

hour_ranges_after = spark.sql("SELECT min(cast(substr(Violation_time, 1,2) as int)) as min_hour, \
          max(cast(substr(Violation_time, 1,2) as int)) as max_hour, \
          substr(Violation_time, 5,1) as AM_PM \
          FROM parking_data  group by substr(Violation_time, 5,1) ")
hour_ranges_after.show()

+--------+--------+-----+
|min_hour|max_hour|AM_PM|
+--------+--------+-----+
|       0|      12|    A|
|       0|      12|    P|
+--------+--------+-----+



### 5(b). The Violation Time field is specified in a strange format. Find a way to make this a time attribute that you can use to divide into groups.

In [55]:
# Creating buckets based on hours: six four-hour bins over the day.
# Violation_time is read as two hour digits at positions 1-2 and an 'A'/'P' marker
# at position 5 (e.g. 0143A, 0400P).
# Assumption: a time of the form 00xxP is treated as 12xx PM, i.e. After_Noon.
# The hour is cast to int explicitly (as in the validation queries) rather than
# relying on implicit string-to-number coercion in the comparisons.

hour_sql = "cast(substr(Violation_time, 1, 2) as int)"
am_pm_sql = "substr(Violation_time, 5, 1)"

df_bucketed = spark.sql("""
    SELECT parking_data.*,
           CASE
               WHEN {am_pm} = 'A' AND {hour} IN (12, 0, 1, 2, 3) THEN 'Late_Night'
               WHEN {am_pm} = 'A' AND {hour} BETWEEN 4 AND 7     THEN 'Early_Morning'
               WHEN {am_pm} = 'A' AND {hour} BETWEEN 8 AND 11    THEN 'Morning'
               WHEN {am_pm} = 'P' AND {hour} IN (12, 0, 1, 2, 3) THEN 'After_Noon'
               WHEN {am_pm} = 'P' AND {hour} BETWEEN 4 AND 7     THEN 'Evening'
               WHEN {am_pm} = 'P' AND {hour} BETWEEN 8 AND 11    THEN 'Night'
               ELSE 'No_time'
           END AS Violation_Time_Of_Day
    FROM parking_data
""".format(hour=hour_sql, am_pm=am_pm_sql))

In [56]:
# Updating the view so "parking_data" includes the Violation_Time_Of_Day column.
df_bucketed.createOrReplaceTempView("parking_data")

### 5(c).  Divide 24 hours into six equal discrete bins of time. Choose the intervals as you see fit. For each of these groups, find the three most commonly occurring violations.

#### The different time of the day

In [57]:
# List the time-of-day bins that actually occur in the data.
time_bins = spark.sql("SELECT distinct Violation_Time_Of_Day FROM parking_data ")
time_bins.show()

+---------------------+
|Violation_Time_Of_Day|
+---------------------+
|              Evening|
|           After_Noon|
|              Morning|
|           Late_Night|
|        Early_Morning|
|                Night|
+---------------------+



### 5(c). Divide 24 hours into six equal discrete bins of time. Choose the intervals as you see fit. For each of these groups, find the three most commonly occurring violations.

#### For each of these groups, find the three most commonly occurring violations.

In [58]:
# Three most common violation codes in the Early_Morning bin.
early_morning_codes = spark.sql('SELECT Violation_Time_Of_Day, Violation_code, count(*) as count FROM parking_data  \
            where Violation_Time_Of_Day="Early_Morning"\
            group by Violation_code, Violation_Time_Of_Day order by Violation_Time_Of_Day, count desc, Violation_code ')
early_morning_codes.show(3)

+---------------------+--------------+-----+
|Violation_Time_Of_Day|Violation_code|count|
+---------------------+--------------+-----+
|        Early_Morning|            14|74114|
|        Early_Morning|            40|60652|
|        Early_Morning|            21|57897|
+---------------------+--------------+-----+
only showing top 3 rows



In [59]:
# Three most common violation codes in the Morning bin.
morning_codes = spark.sql('SELECT Violation_Time_Of_Day, Violation_code, count(*) as count FROM parking_data  \
            where Violation_Time_Of_Day="Morning"\
            group by Violation_code, Violation_Time_Of_Day order by Violation_Time_Of_Day, count desc, Violation_code ')
morning_codes.show(3)

+---------------------+--------------+------+
|Violation_Time_Of_Day|Violation_code| count|
+---------------------+--------------+------+
|              Morning|            21|598069|
|              Morning|            36|348165|
|              Morning|            38|176570|
+---------------------+--------------+------+
only showing top 3 rows



In [60]:
# Three most common violation codes in the After_Noon bin.
after_noon_codes = spark.sql('SELECT Violation_Time_Of_Day, Violation_code, count(*) as count FROM parking_data  \
            where Violation_Time_Of_Day="After_Noon"\
            group by Violation_code, Violation_Time_Of_Day order by Violation_Time_Of_Day, count desc, Violation_code ')
after_noon_codes.show(3)

+---------------------+--------------+------+
|Violation_Time_Of_Day|Violation_code| count|
+---------------------+--------------+------+
|           After_Noon|            36|286284|
|           After_Noon|            38|240721|
|           After_Noon|            37|167026|
+---------------------+--------------+------+
only showing top 3 rows



In [61]:
# Three most common violation codes in the Evening bin.
evening_codes = spark.sql('SELECT Violation_Time_Of_Day, Violation_code, count(*) as count FROM parking_data  \
            where Violation_Time_Of_Day="Evening"\
            group by Violation_code, Violation_Time_Of_Day order by Violation_Time_Of_Day, count desc, Violation_code ')
evening_codes.show(3)

+---------------------+--------------+------+
|Violation_Time_Of_Day|Violation_code| count|
+---------------------+--------------+------+
|              Evening|            38|102855|
|              Evening|            14| 75902|
|              Evening|            37| 70345|
+---------------------+--------------+------+
only showing top 3 rows



In [62]:
# Three most common violation codes in the Night bin.
night_codes = spark.sql('SELECT Violation_Time_Of_Day, Violation_code, count(*) as count FROM parking_data  \
            where Violation_Time_Of_Day="Night"\
            group by Violation_code, Violation_Time_Of_Day order by Violation_Time_Of_Day, count desc, Violation_code ')
night_codes.show(3)

+---------------------+--------------+-----+
|Violation_Time_Of_Day|Violation_code|count|
+---------------------+--------------+-----+
|                Night|             7|26293|
|                Night|            40|22337|
|                Night|            14|21045|
+---------------------+--------------+-----+
only showing top 3 rows



In [63]:
# Three most common violation codes in the Late_Night bin.
late_night_codes = spark.sql('SELECT Violation_Time_Of_Day, Violation_code, count(*) as count FROM parking_data  \
            where Violation_Time_Of_Day="Late_Night"\
            group by Violation_code, Violation_Time_Of_Day order by Violation_Time_Of_Day, count desc, Violation_code ')
late_night_codes.show(3)

+---------------------+--------------+-----+
|Violation_Time_Of_Day|Violation_code|count|
+---------------------+--------------+-----+
|           Late_Night|            21|36958|
|           Late_Night|            40|25867|
|           Late_Night|            78|15528|
+---------------------+--------------+-----+
only showing top 3 rows



### 5(d).  Now, try another direction. For the three most commonly occurring violation codes, find the most common time of the day (in terms of the bins from the previous part).

In [64]:
# Tickets per time-of-day bin, restricted to the three most common violation codes.
# Those codes are 21, 36 and 38 (from the earlier top-violation-code query).

top_code_time_counts = spark.sql('SELECT Violation_Time_Of_Day,count(*) as count FROM parking_data  \
            WHERE Violation_code in (21,36,38) \
            group by Violation_Time_Of_Day order by count desc ')
top_code_time_counts.show()

+---------------------+-------+
|Violation_Time_Of_Day|  count|
+---------------------+-------+
|              Morning|1122804|
|           After_Noon| 601700|
|              Evening| 116648|
|        Early_Morning|  73952|
|           Late_Night|  37270|
|                Night|  20531|
+---------------------+-------+



#### The highest number of these violations occurred in the Morning bin, i.e. between 08:00 am and 11:59 am.

In [65]:
# Break the same three codes (21, 36, 38) down by time-of-day bin and code.
top_code_breakdown = spark.sql('SELECT Violation_Time_Of_Day, Violation_code, count(*) as count FROM parking_data  \
            WHERE Violation_code in (21,36,38) \
            group by Violation_Time_Of_Day, Violation_code order by Violation_Time_Of_Day, count desc ')
top_code_breakdown.show()

+---------------------+--------------+------+
|Violation_Time_Of_Day|Violation_code| count|
+---------------------+--------------+------+
|           After_Noon|            36|286284|
|           After_Noon|            38|240721|
|           After_Noon|            21| 74695|
|        Early_Morning|            21| 57897|
|        Early_Morning|            36| 14782|
|        Early_Morning|            38|  1273|
|              Evening|            38|102855|
|              Evening|            36| 13534|
|              Evening|            21|   259|
|           Late_Night|            21| 36958|
|           Late_Night|            38|   312|
|              Morning|            21|598069|
|              Morning|            36|348165|
|              Morning|            38|176570|
|                Night|            38| 20347|
|                Night|            21|   184|
+---------------------+--------------+------+



### 6. Let’s try and find some seasonality in this data:


#### Creating  buckets based on months
This data is for New York. Hence defining seasons based on following:
1. Winter: December, January, February
2. Spring: March, April, May
3. Summer: June, July, August
4. Fall: September, October, November

In [66]:
# Add a Violation_Season column derived from the month of Issue_Date.
#
# Season mapping (New York):
#   Winter = Dec, Jan, Feb | Spring = Mar-May | Summer = Jun-Aug | Fall = Sep-Nov
# Anything whose month cannot be parsed falls through to 'No_Season'.
#
# The month is taken from characters 6-7 of Issue_Date, exactly as before
# (substr(substr(x, 6, 7), 1, 2) is the same two characters as substr(x, 6, 2)).
# NOTE(review): this assumes Issue_Date renders as 'yyyy-MM-dd...' - confirm
# against the schema defined earlier in the notebook.
# The cast to INT is now explicit instead of relying on Spark's implicit
# string-to-number coercion in the comparisons.
#
# NOTE: this cell reads the `parking_data` view, which the following cell
# replaces with df_bucketed. Re-running this cell after that replacement would
# add a second Violation_Season column.
issue_month_sql = "CAST(substr(Issue_Date, 6, 2) AS INT)"

df_bucketed = spark.sql("""
    SELECT parking_data.*,
           CASE WHEN {month} IN (12, 1, 2)      THEN 'Winter'
                WHEN {month} BETWEEN 3 AND 5    THEN 'Spring'
                WHEN {month} BETWEEN 6 AND 8    THEN 'Summer'
                WHEN {month} BETWEEN 9 AND 11   THEN 'Fall'
                ELSE 'No_Season'
           END AS Violation_Season
    FROM parking_data
""".format(month=issue_month_sql))

In [67]:
# Re-point the `parking_data` temp view at df_bucketed so that later SQL cells
# can query the Violation_Season column added above.
df_bucketed.createOrReplaceTempView("parking_data")

### 6(a). First, divide the year into a certain number of seasons, and find the frequencies of tickets for each season. 

#### The different seasons of the year

In [68]:
# List the distinct season labels present in the view. A 'No_Season' row here
# would mean some Issue_Date values did not yield a month between 1 and 12.
spark.sql("SELECT distinct Violation_Season FROM parking_data ").show()

+----------------+
|Violation_Season|
+----------------+
|          Spring|
|          Summer|
|            Fall|
|          Winter|
+----------------+



In [69]:
# Ticket frequency per season, busiest season first.
spark.sql(
    'SELECT Violation_Season, count(*) as count FROM parking_data  '
    '            group by Violation_Season order by count desc '
).show()

+----------------+-------+
|Violation_Season|  count|
+----------------+-------+
|          Spring|2873337|
|          Winter|1704669|
|          Summer| 852854|
|            Fall|    979|
+----------------+-------+



The highest number of violations occurs in the Spring months, followed by Winter.

### 6(b). Then, find the three most common violations for each of these seasons.


In [70]:
# Three most frequent violation codes in Spring.
# The result is sorted by count descending and show(3) prints only the first
# three rows.
spark.sql(
    'SELECT Violation_Season, Violation_code, count(*) as count FROM parking_data  '
    '            WHERE Violation_Season="Spring"'
    '            group by Violation_Season, Violation_code '
    '          order by Violation_Season, count desc '
).show(3)

+----------------+--------------+------+
|Violation_Season|Violation_code| count|
+----------------+--------------+------+
|          Spring|            21|402407|
|          Spring|            36|344834|
|          Spring|            38|271167|
+----------------+--------------+------+
only showing top 3 rows



In [71]:
# Three most frequent violation codes in Winter.
# The result is sorted by count descending and show(3) prints only the first
# three rows.
spark.sql(
    'SELECT Violation_Season, Violation_code, count(*) as count FROM parking_data  '
    '            WHERE Violation_Season="Winter"'
    '            group by Violation_Season, Violation_code '
    '          order by Violation_Season, count desc '
).show(3)

+----------------+--------------+------+
|Violation_Season|Violation_code| count|
+----------------+--------------+------+
|          Winter|            21|238180|
|          Winter|            36|221268|
|          Winter|            38|187385|
+----------------+--------------+------+
only showing top 3 rows



In [72]:
# Three most frequent violation codes in Summer.
# The result is sorted by count descending and show(3) prints only the first
# three rows.
spark.sql(
    'SELECT Violation_Season, Violation_code, count(*) as count FROM parking_data  '
    '            WHERE Violation_Season="Summer"'
    '            group by Violation_Season, Violation_code '
    '          order by Violation_Season, count desc '
).show(3)

+----------------+--------------+------+
|Violation_Season|Violation_code| count|
+----------------+--------------+------+
|          Summer|            21|127347|
|          Summer|            36| 96663|
|          Summer|            38| 83518|
+----------------+--------------+------+
only showing top 3 rows



In [73]:
# Three most frequent violation codes in Fall.
# The result is sorted by count descending and show(3) prints only the first
# three rows.
spark.sql(
    'SELECT Violation_Season, Violation_code, count(*) as count FROM parking_data  '
    '            WHERE Violation_Season="Fall"'
    '            group by Violation_Season, Violation_code '
    '          order by Violation_Season, count desc '
).show(3)

+----------------+--------------+-----+
|Violation_Season|Violation_code|count|
+----------------+--------------+-----+
|            Fall|            46|  231|
|            Fall|            21|  128|
|            Fall|            40|  116|
+----------------+--------------+-----+
only showing top 3 rows



The top violation codes are the same for Spring, Summer and Winter: 21, 36 and 38, in descending order of frequency. In the Fall season, however, the top violation codes are 46, 21 and 40, in descending order of frequency.

### 7. The fines collected from all the instances of parking violation constitute a source of revenue for the NYC Police Department. Let’s take an example of estimating this for the three most commonly occurring codes:

Then, visit the website:
http://www1.nyc.gov/site/finance/vehicles/services-violation-codes.page
It lists the fines associated with different violation codes. They’re divided into two categories: one for the highest-density locations in the city and the other for the rest of the city. For the sake of simplicity, take the average of the two.
Using this information, 


### 7(a). Find the total occurrences of the three most common violation codes.

In [74]:
# Overall ticket count per violation code on the current `parking_data` view,
# sorted by count descending; show(3) prints the three most frequent codes.
spark.sql(
    'SELECT Violation_code, count(*) as count FROM parking_data  '
    '            group by Violation_code '
    '          order by count desc '
).show(3)

+--------------+------+
|Violation_code| count|
+--------------+------+
|            21|768062|
|            36|662765|
|            38|542078|
+--------------+------+
only showing top 3 rows



### The top three violations are 
##### 21: Street Cleaning: No parking where parking is not allowed by sign, street marking or traffic control device.
##### 36: Exceeding the posted speed limit in or near a designated school zone.
##### 38: Failing to show a receipt or tag in the windshield.

#### The respective fines for each of the violations:

We are taking average amounts for each of the violation codes based on the above page.

21: $55

36: $50

38: $50

### 7(b). Find the total amount collected for the three violation codes with the maximum tickets. State the code that has the highest total collection.

In [75]:
# Estimated fine revenue for the three most frequently ticketed codes (21, 36, 38).
#
# Per-ticket fines are the averaged amounts listed in the markdown above
# (21: $55, 36: $50, 38: $50).
#
# Query structure, innermost first:
#   code_counts - ticket count per code, restricted to the three codes
#   code_fines  - attaches the textual definition and the numeric fine per ticket
#   outer       - formats the "$..." display strings and sorts the rows
#
# The rows are ordered by the numeric total (count * fine). Ordering by the
# formatted "$..." string would compare text, so e.g. "$9000000" would sort
# above "$42243410".
spark.sql("""
    SELECT Violation_code,
           count,
           Parking_Violation_Definition,
           concat('$', CAST(fine AS STRING))         AS Parking_Fine_Per_Violation,
           concat('$', CAST(count * fine AS STRING)) AS Total_Parking_Fine
    FROM (
        SELECT Violation_code,
               count,
               CASE Violation_code
                    WHEN 21 THEN 'Street Cleaning: No parking where parking is not allowed by sign, street marking or traffic control device.'
                    WHEN 36 THEN 'Exceeding the posted speed limit in or near a designated school zone.'
                    WHEN 38 THEN 'Failing to show a receipt or tag in the windshield.'
               END AS Parking_Violation_Definition,
               CASE Violation_code
                    WHEN 21 THEN 55
                    WHEN 36 THEN 50
                    WHEN 38 THEN 50
               END AS fine
        FROM (
            SELECT Violation_code, count(*) AS count
            FROM parking_data
            WHERE Violation_code IN (21, 36, 38)
            GROUP BY Violation_code
        ) AS code_counts
    ) AS code_fines
    ORDER BY count * fine DESC
""").show()

+--------------+------+----------------------------+--------------------------+------------------+
|Violation_code| count|Parking_Violation_Definition|Parking_Fine_Per_Violation|Total_Parking_Fine|
+--------------+------+----------------------------+--------------------------+------------------+
|            21|768062|        Street Cleaning: ...|                       $55|         $42243410|
|            36|662765|        Exceeding the pos...|                       $50|         $33138250|
|            38|542078|        Failing to show a...|                       $50|         $27103900|
+--------------+------+----------------------------+--------------------------+------------------+



#### Violation code 21 (Street Cleaning: No parking where parking is not allowed by sign, street marking or traffic control device) has the highest total collection: $42,243,410.

### 7(c). What can you intuitively infer from these findings?

#### Inference:
     1) The most commonly occurring parking violations are codes 21, 36 and 38.
     2) Parking violations are highest in Spring, followed by the Winter months.
     3) Most parking violations occur in the morning, i.e. from 08 am to 11 am.
     

In [76]:
# Stop the SparkSession now that the analysis is complete.
spark.stop()