<font color='green' size=4> <b> NYC Parking Tickets - Analysis </b> </font>

<font color='blue'> Analyze NYC parking ticket data with Spark </font>

  

In [2]:
# Create Spark Session

from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("PySpark DataFrame and Sql") \
    .getOrCreate()

In [3]:
# Read data into dataframe 'df'
df = spark.read.option("header","true").csv("/common_folder/nyc_parking/Parking_Violations_Issued_-_Fiscal_Year_2017.csv")

In [4]:
# Check the dataframe contents
df.show()

+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|Summons Number|Plate ID|Registration State|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|    5092469481| GZH7067|                NY|2016-07-10|             7|             SUBN|       TOYOT|                 0|              0|         0143A|
|    5092451658| GZH7067|                NY|2016-07-08|             7|             SUBN|       TOYOT|                 0|              0|         0400P|
|    4006265037| FZX9232|                NY|2016-08-23|             5|             SUBN|        FORD|                 0|              0|         0233P|
|    8478629828| 66623ME|                NY|2017-06-14|            47|             REFG|

In [5]:
# Check the number of records in dataframe

df.count()

10803028

In [4]:
# Count NaN values per column (isnan does not flag nulls)
from pyspark.sql.functions import isnan, when, count, col

df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).show()

+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|Summons Number|Plate ID|Registration State|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|             0|       0|                 0|         0|             0|                0|           0|                 0|              0|             0|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+



<B> Create sql view for the dataframe </B>

In [5]:
df.createOrReplaceTempView("sql_view")

<B> Select only the data from the year 2017 </B>

In [6]:
df_2017 = spark.sql("""select * from sql_view where `Issue Date` like '2017%' """)

<B>Replace the sql view with latest data  </B>

In [7]:
df_2017.createOrReplaceTempView("sql_view")

Examine the Data
<B> 1. Find the total number of tickets for the year. </B>


In [8]:
spark.sql("""select count(distinct(`Summons Number`)) from sql_view""").show()

+------------------------------+
|count(DISTINCT Summons Number)|
+------------------------------+
|                       5431918|
+------------------------------+



In [9]:
spark.sql("""select count((`Summons Number`)) from sql_view""").show()

+---------------------+
|count(Summons Number)|
+---------------------+
|              5431918|
+---------------------+



Observation: There are no duplicate values in the data; the distinct count of summons numbers equals the total count.

<B> 2.1 Find out the number of unique states from where the cars that got parking tickets came. (Hint: Use the column 'Registration State'.) </B> 


In [10]:
#Find out the number of unique states from where the cars that got parking tickets came


spark.sql("""select count(distinct(`Registration State`)) as unique_states from sql_view""").show()

+-------------+
|unique_states|
+-------------+
|           65|
+-------------+



<B> 2.2 There is a numeric entry '99' in the column, which should be corrected. Replace it with the state having the maximum entries. </B>

In [11]:
# Find the state with maximum entries
spark.sql("""select `Registration State`,count(*) as number_of_entries from sql_view group by `Registration State` order by count(*) desc limit 1""").show(100)

+------------------+-----------------+
|Registration State|number_of_entries|
+------------------+-----------------+
|                NY|          4273951|
+------------------+-----------------+



In [12]:
# Replace '99' with 'NY' (reference df_2017, the dataframe being transformed, rather than the unfiltered df)
df_2017_updated = df_2017.withColumn("Registration state",when(df_2017["Registration state"] == '99','NY').otherwise(df_2017["Registration state"]))


In [13]:
# Update the sql view with new data
df_2017_updated.createOrReplaceTempView("sql_view")

In [14]:
# Find the number of unique registration states after replacing the erroneous state '99'


spark.sql("""select count(distinct(`Registration State`)) as unique_states from sql_view""").show()

+-------------+
|unique_states|
+-------------+
|           64|
+-------------+
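
The replacement step can also be pictured outside Spark. The following is a minimal plain-Python sketch, assuming a small made-up list of state codes; `replace_with_mode` and the sample values are illustrative and not part of the notebook.

```python
from collections import Counter

def replace_with_mode(values, bad_value):
    """Replace every occurrence of bad_value with the most frequent other value."""
    # Count only the valid entries so the bad value can never win
    counts = Counter(v for v in values if v != bad_value)
    mode = counts.most_common(1)[0][0]
    return [mode if v == bad_value else v for v in values]

# Hypothetical sample, not taken from the dataset
states = ["NY", "NJ", "NY", "99", "PA", "NY", "99"]
cleaned = replace_with_mode(states, "99")

# The number of distinct values drops by one, as in the query result above
print(len(set(states)), len(set(cleaned)))
```

As in the notebook, the distinct count falls by exactly one because the placeholder is folded into an existing state.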



### Aggregation tasks



<b>1. How often does each violation code occur? Display the frequency of the top five violation codes. </b>

In [15]:
spark.sql("""select `Violation Code`,count(`Violation Code`) as Violation_Code_Frequency from sql_view group by `Violation Code` order by count(*) desc limit 5 """).show()

+--------------+------------------------+
|Violation Code|Violation_Code_Frequency|
+--------------+------------------------+
|            21|                  768087|
|            36|                  662765|
|            38|                  542079|
|            14|                  476664|
|            20|                  319646|
+--------------+------------------------+



<b> 2. How often does each 'vehicle body type' get a parking ticket? How about the 'vehicle make'?  </b>

In [16]:
# Vehicle body type
spark.sql("""select `vehicle body type`,count(`vehicle body type`) as Vehicle_Body_Frequency from sql_view group by `vehicle body type` order by count(*) desc limit 5 """).show()

+-----------------+----------------------+
|vehicle body type|Vehicle_Body_Frequency|
+-----------------+----------------------+
|             SUBN|               1883954|
|             4DSD|               1547312|
|              VAN|                724029|
|             DELV|                358984|
|              SDN|                194197|
+-----------------+----------------------+



In [17]:
# Vehicle make
spark.sql("""select `vehicle make`,count(`vehicle make`) as Vehicle_Make_Frequency from sql_view group by `vehicle make` order by count(*) desc limit 5 """).show()

+------------+----------------------+
|vehicle make|Vehicle_Make_Frequency|
+------------+----------------------+
|        FORD|                636844|
|       TOYOT|                605291|
|       HONDA|                538884|
|       NISSA|                462017|
|       CHEVR|                356032|
+------------+----------------------+



<b> 3.1 Find the top 5 Violation Precincts by number of tickets </b>

<b> Note: Violation Precinct entries with the value '0' are erroneous records </b>

In [18]:
# Top 5 violation precincts by ticket count (limit 6 because the erroneous precinct '0' takes one row)
spark.sql("""select `Violation Precinct`,count(`Violation Precinct`) as Violation_Precinct_Frequency from sql_view group by `Violation Precinct` order by count(*) desc limit 6 """).show()

+------------------+----------------------------+
|Violation Precinct|Violation_Precinct_Frequency|
+------------------+----------------------------+
|                 0|                      925596|
|                19|                      274445|
|                14|                      203553|
|                 1|                      174702|
|                18|                      169131|
|               114|                      147444|
+------------------+----------------------------+



<b> 3.2 Find the top 5 Issuer Precincts by number of tickets </b>

In [19]:
spark.sql("""select `Issuer Precinct`,count(`Issuer Precinct`) as Issuer_Precinct_Frequency from sql_view group by `Issuer Precinct` order by count(*) desc limit 6 """).show()

+---------------+-------------------------+
|Issuer Precinct|Issuer_Precinct_Frequency|
+---------------+-------------------------+
|              0|                  1078406|
|             19|                   266961|
|             14|                   200495|
|              1|                   168740|
|             18|                   162994|
|            114|                   144054|
+---------------+-------------------------+



<b> Assumption: Remove the erroneous records (records with precinct entry '0') </b>

In [20]:
# Removing erroneous records
df_2017_updated1 = spark.sql("""select * from sql_view where `Issuer Precinct` <> 0 and `Violation Precinct` <> 0""")

In [21]:
# Update the sql view with new data
df_2017_updated1.createOrReplaceTempView("sql_view")

In [22]:
spark.sql("""select count(*) from sql_view where `Violation Precinct` == 0""").show()

+--------+
|count(1)|
+--------+
|       0|
+--------+



In [23]:
spark.sql("""select count(*) from sql_view where `Issuer Precinct` == 0""").show()

+--------+
|count(1)|
+--------+
|       0|
+--------+



<b> 4. Find the violation code frequencies for three precincts that have issued the most number of tickets. Do these precinct zones have an exceptionally high frequency of certain violation codes? Are these codes common across precincts? </b>

From section 3, precincts 19, 14 and 1 issued the most tickets. Find the violation codes for these three precincts.

In [24]:
spark.sql("""select `Violation Code`,count(`Violation Code`) as Violation_Precinct_Frequency from sql_view where `Violation Precinct` = 19 group by `Violation Code` order by count(*) desc  limit 7""").show()

+--------------+----------------------------+
|Violation Code|Violation_Precinct_Frequency|
+--------------+----------------------------+
|            46|                       50705|
|            38|                       37482|
|            37|                       36468|
|            14|                       30330|
|            21|                       28666|
|            20|                       15087|
|            40|                       11503|
+--------------+----------------------------+



In [25]:
VC_19 = {46,38,37,14,21,20,40}
type(VC_19)

set

In [26]:
spark.sql("""select `Violation Code`,count(`Violation Code`) as Violation_Precinct_Frequency from sql_view where `Violation Precinct` = 14 group by `Violation Code` order by count(*) desc limit 7 """).show()

+--------------+----------------------------+
|Violation Code|Violation_Precinct_Frequency|
+--------------+----------------------------+
|            14|                       45824|
|            69|                       30464|
|            31|                       22644|
|            47|                       18655|
|            42|                       10027|
|            46|                        8400|
|            19|                        7448|
+--------------+----------------------------+



In [27]:
VC_14 = {14,69,31,47,42,46,19}
type(VC_14)

set

In [28]:
spark.sql("""select `Violation Code`,count(`Violation Code`) as Violation_Precinct_Frequency from sql_view where `Violation Precinct` = 1 group by `Violation Code` order by count(*) desc limit 7   """).show()

+--------------+----------------------------+
|Violation Code|Violation_Precinct_Frequency|
+--------------+----------------------------+
|            14|                       38774|
|            16|                       19131|
|            20|                       15295|
|            46|                       13434|
|            38|                        8549|
|            17|                        7575|
|            37|                        6461|
+--------------+----------------------------+



In [29]:
VC_1 = {14,16,20,46,38,17,37}
type(VC_1)

set

In [30]:
A1 = VC_19.intersection(VC_14)
A1

{14, 46}

In [31]:
A2 = A1.intersection(VC_1)
A2

{14, 46}

<b> Observation: Violation codes 14 and 46 appear among the top seven codes in all three precincts (19, 14 and 1) </b>

<B> 5. Find out the properties of parking violations across different times of the day: </B>


<b> 5.1 Find a way to deal with missing values, if any. </b>
    
    

In [32]:
df_2017_updated1.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_2017_updated1.columns]).show()


+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|Summons Number|Plate ID|Registration State|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|             0|       0|                 0|         0|             0|                0|           0|                 0|              0|             0|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+



<b> Observation: There are no null values observed in any column </b>

In [33]:
spark.sql("""select * from sql_view where `Violation Time` = 'nan' """).show()

+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|Summons Number|Plate ID|Registration state|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|    1404224350| FCJ5087|                NY|2017-05-04|            40|             SUBN|       NISSA|                70|            156|           nan|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+



Observation: <B> Remove the rows where Violation Time holds the string 'nan' </B>
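
The null check above reported zero while a row with the text 'nan' still turned up, because a null, a floating-point NaN and the string 'nan' are three different things. A minimal plain-Python sketch of that distinction follows; `classify_missing` and the sample values are hypothetical and only illustrate the idea.

```python
import math

def classify_missing(value):
    """Label a raw cell as 'null', 'float_nan', 'string_nan' or 'present'."""
    if value is None:
        return "null"
    if isinstance(value, float) and math.isnan(value):
        return "float_nan"
    # A CSV read without schema inference yields strings, so 'nan' arrives as text
    if isinstance(value, str) and value.strip().lower() == "nan":
        return "string_nan"
    return "present"

# Hypothetical sample cells
samples = [None, float("nan"), "nan", "0143A"]
labels = [classify_missing(v) for v in samples]
print(labels)
```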

In [34]:
# remove the rows with value 'nan'
df_2017_updated2 = spark.sql("""select * from sql_view where `Violation Time` <> 'nan' """)

In [35]:
# Update the new data
df_2017_updated2.createOrReplaceTempView("sql_view")

In [36]:
# Check the data after removing erroneous data

spark.sql("""select * from sql_view where `Violation Time` = 'nan' """).show()

+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|Summons Number|Plate ID|Registration state|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+



<b> 5.2 The Violation Time field is specified in a strange format. Find a way to make this a time attribute that you can use to divide into groups. </b>



In [37]:
# Verify the Violation time records before modification
spark.sql("""select `Violation Time` from sql_view""").show(10)

+--------------+
|Violation Time|
+--------------+
|         1120A|
|         0015A|
|         0525A|
|         0256P|
|         1232A|
|         1021A|
|         0721A|
|         0940A|
|         1223P|
|         1028A|
+--------------+
only showing top 10 rows



<B> Change the 12-hour format to a 24-hour format: append 'M' to complete the AM/PM marker, insert ':' between hours and minutes with a space before the marker, then parse the result </B>

In [38]:
# Add 'M' to timeformat to prepare it for conversion

df_2017_updated3 = spark.sql("""select *,concat(`Violation Time`,'M') as `Violation Time new` from sql_view""")
df_2017_updated3.show(2)
df_2017_updated3.createOrReplaceTempView("sql_view")

+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+------------------+
|Summons Number|Plate ID|Registration state|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|Violation Time new|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+------------------+
|    8478629828| 66623ME|                NY|2017-06-14|            47|             REFG|       MITSU|                14|             14|         1120A|            1120AM|
|    1407740258| 2513JMG|                NY|2017-01-11|            78|             DELV|       FRUEH|               106|            106|         0015A|            0015AM|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+---------

In [39]:
# Drop the old column and rename the new column
df_2017_updated3 = df_2017_updated3.drop('Violation Time')

df_2017_updated3 = df_2017_updated3.withColumnRenamed('Violation Time new','Violation Time')

In [40]:
# check the records after renaming column 
df_2017_updated3.show(2)

+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|Summons Number|Plate ID|Registration state|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|    8478629828| 66623ME|                NY|2017-06-14|            47|             REFG|       MITSU|                14|             14|        1120AM|
|    1407740258| 2513JMG|                NY|2017-01-11|            78|             DELV|       FRUEH|               106|            106|        0015AM|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
only showing top 2 rows



In [41]:
# Change the format from 'hhmmaa'  to 'hh:mm aa'
from pyspark.sql.functions import col, regexp_replace

df_2017_updated3 = df_2017_updated3.withColumn("Violation Time", regexp_replace(col("Violation Time") ,  "(\\d{2})(\\d{2})" , "$1:$2 " ) )
df_2017_updated3.show(5)

+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|Summons Number|Plate ID|Registration state|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|    8478629828| 66623ME|                NY|2017-06-14|            47|             REFG|       MITSU|                14|             14|      11:20 AM|
|    1407740258| 2513JMG|                NY|2017-01-11|            78|             DELV|       FRUEH|               106|            106|      00:15 AM|
|    1413656420|T672371C|                NY|2017-02-04|            40|             TAXI|       TOYOT|                73|             73|      05:25 AM|
|    8480309064| 51771JW|                NY|2017-01-26|            64|              VAN|

In [42]:
# Change the format from 12 hours to 24 hours
# Values with hour '00' (e.g. '00:15 AM') are not valid 12-hour times and come back as null
from pyspark.sql.functions import unix_timestamp,from_unixtime

df_2017_updated3 = df_2017_updated3.withColumn('Violation Time',from_unixtime(unix_timestamp(col('Violation Time'), "hh:mm a"), "HH:mm"))


In [43]:
# Check the new records
df_2017_updated3.show(2)

+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|Summons Number|Plate ID|Registration state|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|    8478629828| 66623ME|                NY|2017-06-14|            47|             REFG|       MITSU|                14|             14|         11:20|
|    1407740258| 2513JMG|                NY|2017-01-11|            78|             DELV|       FRUEH|               106|            106|          null|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
only showing top 2 rows
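
The null in the second row corresponds to the input '00:15 AM', which is not a valid 12-hour clock time. The same behaviour can be reproduced with a minimal plain-Python sketch; `to_24_hour` is a hypothetical helper, and Python's `%I` directive is used here as a stand-in for the `hh` pattern.

```python
from datetime import datetime

def to_24_hour(raw):
    """Convert 'hhmmA' / 'hhmmP' to 'HH:MM'; return None for invalid 12-hour times."""
    try:
        # Append 'M' so the marker reads AM/PM; %I accepts hours 01-12 only
        parsed = datetime.strptime(raw + "M", "%I%M%p")
    except ValueError:
        return None
    return parsed.strftime("%H:%M")

# Values taken from the Violation Time samples shown in this notebook
converted = {raw: to_24_hour(raw) for raw in ["1120A", "0015A", "0256P", "1232A"]}
print(converted)
```

Whether such rows should be dropped or repaired (for example by reading '00' as '12') is a judgment call; the notebook drops them.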



In [44]:
# Save the data to sql view
df_2017_updated3.createOrReplaceTempView("sql_view")

In [45]:
# There are null values present in the column 
spark.sql("""select `Violation Time` from sql_view""").show()

+--------------+
|Violation Time|
+--------------+
|         11:20|
|          null|
|         05:25|
|         14:56|
|         00:32|
|         10:21|
|         07:21|
|         09:40|
|         12:23|
|         10:28|
|         01:48|
|         12:06|
|         13:41|
|         08:20|
|         10:43|
|         14:04|
|         08:53|
|         10:26|
|         11:52|
|         10:31|
+--------------+
only showing top 20 rows



<B> Check the number of null values present in the Violation Time column; remove them if they are few compared to the non-null records </B>

In [46]:
# Check the number of null value records
spark.sql("""select count(*) from sql_view where `Violation Time` is null """).show()

+--------+
|count(1)|
+--------+
|   28397|
+--------+



In [47]:
# Check the number of non null value records
spark.sql("""select count(*) from sql_view where `Violation Time` is not null """).show()

+--------+
|count(1)|
+--------+
| 4321761|
+--------+



In [48]:
# remove the null values from dataframe 
df_2017_updated3 = spark.sql("""select * from sql_view where `Violation Time` is not null""")

In [49]:
# Save the sql view 
df_2017_updated3.createOrReplaceTempView("sql_view")

<b> 5.3 Divide 24 hours into six equal discrete bins of time. Choose the intervals as you see fit. For each of these groups, find the three most commonly occurring violations. </b>

In [50]:
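# Divide 24 hours into six discrete bins of time
# Note: BETWEEN is inclusive and the first matching WHEN wins, so the boundary hours 4, 8, 12, 16 and 20 fall in the earlier bin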
df_2017_updated3 = spark.sql("""select *,case
             when hour(`Violation Time`) between 0 and 4 then '0_to_4'
             when hour(`Violation Time`) between 4 and 8 then '4_to_8'
             when hour(`Violation Time`) between 8 and 12 then '8_to_12'
             when hour(`Violation Time`) between 12 and 16 then '12_to_16'
             when hour(`Violation Time`) between 16 and 20 then '16_to_20'
             when hour(`Violation Time`) between 20 and 24 then '20_to_24'
             end as Violation_Time_Intervels 
             FROM sql_view""")
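
One way to get six bins of exactly four hours each is to use half-open intervals, where each bin includes its start hour and excludes its end hour. A minimal plain-Python sketch, assuming integer hours from 0 to 23; `time_bin` is a hypothetical helper that reuses the label style of the query above.

```python
def time_bin(hour, width=4):
    """Map an hour (0-23) to a half-open bin label such as '4_to_8'."""
    if not 0 <= hour <= 23:
        raise ValueError(f"hour out of range: {hour}")
    # Integer division assigns each hour to exactly one bin of `width` hours
    start = (hour // width) * width
    return f"{start}_to_{start + width}"

bins = [time_bin(h) for h in (0, 3, 4, 8, 23)]
print(bins)
```

With this rule hour 4 lands in '4_to_8' and hour 23 in '20_to_24', so every bin covers the same number of hours.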

In [51]:
# Check the records in the dataframe
df_2017_updated3.show(2)

+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+------------------------+
|Summons Number|Plate ID|Registration state|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|Violation_Time_Intervels|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+------------------------+
|    8478629828| 66623ME|                NY|2017-06-14|            47|             REFG|       MITSU|                14|             14|         11:20|                 8_to_12|
|    1413656420|T672371C|                NY|2017-02-04|            40|             TAXI|       TOYOT|                73|             73|         05:25|                  4_to_8|
+--------------+--------+------------------+----------+--------------+-----------------+------------+--------------

In [52]:
# Save the data to sql view
df_2017_updated3.createOrReplaceTempView("sql_view")

In [53]:
df_2017_updated3.show()

+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+------------------------+
|Summons Number|Plate ID|Registration state|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|Violation_Time_Intervels|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+------------------------+
|    8478629828| 66623ME|                NY|2017-06-14|            47|             REFG|       MITSU|                14|             14|         11:20|                 8_to_12|
|    1413656420|T672371C|                NY|2017-02-04|            40|             TAXI|       TOYOT|                73|             73|         05:25|                  4_to_8|
|    8480309064| 51771JW|                NY|2017-01-26|            64|              VAN|       INTER|              

In [54]:

# For each of the six time intervals, find the three most frequent violation codes
spark.sql("""select  violation_code,Violation_Time
             FROM
             (
             (select `Violation Code` as violation_code ,  '0_to_4' as Violation_Time from sql_view
             where   Violation_Time_Intervels = '0_to_4'
             group by `Violation Code`
             order by count(*) desc
             limit 3)
             UNION
             (select `Violation Code` as violation_code ,  '4_to_8' as Violation_Time from sql_view
             where   Violation_Time_Intervels =  '4_to_8'
             group by `Violation Code`
             order by count(*) desc
             limit 3)
              UNION
             (select `Violation Code` as violation_code , '8_to_12' as Violation_Time  from sql_view
             where   Violation_Time_Intervels =  '8_to_12'
             group by `Violation Code`
             order by count(*) desc
             limit 3)
              UNION
             (select `Violation Code` as violation_code ,'12_to_16' as Violation_Time  from sql_view
             where   Violation_Time_Intervels = '12_to_16'
             group by `Violation Code`
             order by count(*) desc
             limit 3)
              UNION
             (select `Violation Code` as violation_code ,'16_to_20' as Violation_Time  from sql_view
             where   Violation_Time_Intervels = '16_to_20'
             group by `Violation Code`
             order by count(*) desc
             limit 3)
              UNION
             (select `Violation Code` as violation_code , '20_to_24' as Violation_Time from sql_view
             where   Violation_Time_Intervels = '20_to_24'
             group by `Violation Code`
             order by count(*) desc
             limit 3)
             ) t
             order by Violation_Time """).show()

+--------------+--------------+
|violation_code|Violation_Time|
+--------------+--------------+
|            21|        0_to_4|
|            40|        0_to_4|
|            14|        0_to_4|
|            14|      12_to_16|
|            38|      12_to_16|
|            37|      12_to_16|
|            37|      16_to_20|
|            14|      16_to_20|
|            38|      16_to_20|
|            38|      20_to_24|
|            40|      20_to_24|
|            14|      20_to_24|
|            40|        4_to_8|
|            21|        4_to_8|
|            14|        4_to_8|
|            38|       8_to_12|
|            14|       8_to_12|
|            21|       8_to_12|
+--------------+--------------+



<B> Observation: Violation code 14 is among the top three codes in every time interval </B>
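
The query above repeats the same "group, count, keep the top three" step once per interval. The underlying logic can be sketched in plain Python as a single pass; `top_n_per_group` and the sample rows are hypothetical and do not come from the dataset.

```python
from collections import Counter, defaultdict

def top_n_per_group(rows, n=3):
    """Return the n most frequent codes for each group from (group, code) pairs."""
    counts = defaultdict(Counter)
    for group, code in rows:
        counts[group][code] += 1
    return {group: [code for code, _ in counter.most_common(n)]
            for group, counter in counts.items()}

# Hypothetical (interval, violation code) pairs with no ties in the counts
rows = (
    [("8_to_12", 21)] * 4 + [("8_to_12", 38)] * 3
    + [("8_to_12", 14)] * 2 + [("8_to_12", 20)] * 1
    + [("0_to_4", 40)] * 2 + [("0_to_4", 21)] * 1
)
top_codes = top_n_per_group(rows)
print(top_codes)
```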

<B> 5.4 Now, try another direction. For the three most commonly occurring violation codes, find the most common time of the day (in terms of the bins from the previous part) </B>



In [55]:
# Check the top 3 violation codes
spark.sql("""select `Violation Code` from sql_view
             group by `Violation Code`
             order by count(*) desc
             limit 3""")

DataFrame[Violation Code: string]

In [56]:

# For the three most commonly occurring violation codes, find the most common time of the day
spark.sql("""select Violation_Time_Intervels,Violation_Code
             FROM
             ((select Violation_Time_Intervels, '21' as Violation_Code from sql_view
             where   `Violation Code` = 21
             group by Violation_Time_Intervels
             order by count(*) desc
             limit 3)
             UNION
             (select Violation_Time_Intervels, '38' as Violation_Code from sql_view
             where   `Violation Code` = 38
             group by Violation_Time_Intervels
             order by count(*) desc
             limit 3)
             UNION
             (select Violation_Time_Intervels, '14' as Violation_Code from sql_view
             where   `Violation Code` = 14
             group by Violation_Time_Intervels
             order by count(*) desc
             limit 3))t
             order by Violation_Code
             """).show()

+------------------------+--------------+
|Violation_Time_Intervels|Violation_Code|
+------------------------+--------------+
|                12_to_16|            14|
|                 8_to_12|            14|
|                  4_to_8|            14|
|                 8_to_12|            21|
|                  4_to_8|            21|
|                  0_to_4|            21|
|                16_to_20|            38|
|                12_to_16|            38|
|                 8_to_12|            38|
+------------------------+--------------+



<B> Observation: The 8_to_12 interval is among the three most common intervals for each of the top three violation codes </B>

<B> 6.1 Divide the year into a certain number of seasons, and find the frequencies of tickets for each season.  </B>

In [57]:
# Divide the year into seasons based on the month of the issue date
df_2017_updated3 = spark.sql("""select *,case
             when month(`Issue Date`) in (4,5,6) then 'Summer'
             when month(`Issue Date`) in (7,8,9) then 'Rainy'
             when month(`Issue Date`) in (10,11,12,1)  then 'Winter'
             when month(`Issue Date`) in (2,3)  then 'Spring'
             end as Seasons 
             FROM sql_view""")

In [58]:
# Check the data after adding seasons
df_2017_updated3.show(2)

+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+------------------------+-------+
|Summons Number|Plate ID|Registration state|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|Violation_Time_Intervels|Seasons|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+------------------------+-------+
|    8478629828| 66623ME|                NY|2017-06-14|            47|             REFG|       MITSU|                14|             14|         11:20|                 8_to_12| Summer|
|    1413656420|T672371C|                NY|2017-02-04|            40|             TAXI|       TOYOT|                73|             73|         05:25|                  4_to_8| Spring|
+--------------+--------+------------------+----------+--------------+-----

In [59]:
# Update the data with changes
df_2017_updated3.createOrReplaceTempView("sql_view")

In [60]:
# Find the frequencies of tickets for each season.
spark.sql("""select Seasons , count(*) as no_of_tickets
             from sql_view 
             group by Seasons
             order by count(*) desc""").show()


+-------+-------------+
|Seasons|no_of_tickets|
+-------+-------------+
| Summer|      2217486|
| Spring|      1422131|
| Winter|       681355|
|  Rainy|          789|
+-------+-------------+



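<B> Observation: Summer has more tickets than any other season. The view only holds tickets issued in 2017 from a fiscal-year 2017 file, so the months are not evenly covered (Rainy has only 789 tickets). </B>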
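
The month-to-season mapping used in the query can be written as a lookup table, which also makes it easy to see that the seasons do not span the same number of months. A minimal plain-Python sketch; `SEASON_BY_MONTH` and `season_of` are hypothetical names mirroring the CASE expression above.

```python
from collections import Counter

# Same month groups as the CASE expression in the query
SEASON_BY_MONTH = {
    4: "Summer", 5: "Summer", 6: "Summer",
    7: "Rainy", 8: "Rainy", 9: "Rainy",
    10: "Winter", 11: "Winter", 12: "Winter", 1: "Winter",
    2: "Spring", 3: "Spring",
}

def season_of(issue_date):
    """Return the season for an issue date given as 'YYYY-MM-DD'."""
    month = int(issue_date[5:7])
    return SEASON_BY_MONTH[month]

# Number of calendar months assigned to each season
months_per_season = Counter(SEASON_BY_MONTH.values())
print(season_of("2017-06-14"), season_of("2017-02-04"), dict(months_per_season))
```

Winter spans four months and Spring two in this scheme, which is worth keeping in mind when comparing raw ticket counts between seasons.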

<B> 6.2 Find the three most common violations for each of these seasons </B>

In [61]:
# Find the three most common violations for each of these seasons.

spark.sql("""select Violation_Code,Season,cnt
             FROM
             ((select `Violation Code` as Violation_Code, 'Summer' as Season ,count(*) as cnt from sql_view
             where   Seasons = 'Summer'
             group by `Violation Code`
             order by count(*) desc
             limit 3)
             UNION
             (select `Violation Code` as Violation_Code, 'Spring' as Season,count(*) as cnt from sql_view
             where   Seasons ='Spring'
             group by `Violation Code`
             order by count(*) desc
             limit 3)
             UNION
             (select `Violation Code` as Violation_Code, 'Winter' as Season,count(*) as cnt from sql_view
             where   Seasons = 'Winter'
             group by `Violation Code`
             order by count(*) desc
             limit 3))t
             order by Season
             """).show()

+--------------+------+------+
|Violation_Code|Season|   cnt|
+--------------+------+------+
|            14|Spring|150925|
|            38|Spring|191351|
|            21|Spring|203643|
|            38|Summer|254664|
|            14|Summer|246673|
|            21|Summer|329246|
|            38|Winter| 95370|
|            14|Winter| 70248|
|            21|Winter| 98673|
+--------------+------+------+



<B> Observation: Codes 14, 38 and 21 are the three most frequent violation codes in each of the three seasons queried (Spring, Summer and Winter) </B>

<B> 7.1 Find the total occurrences of the three most common violation codes. </B> 

In [64]:
spark.sql("""select `Violation Code`,count(*) as number_of_tickets
from sql_view
group by `Violation Code`
order by count(*) desc
limit 3""").show()

+--------------+-----------------+
|Violation Code|number_of_tickets|
+--------------+-----------------+
|            21|           631584|
|            38|           541392|
|            14|           467927|
+--------------+-----------------+



<B> 7.2 Fines associated with the codes: code 14 is 115 dollars, code 21 is 55 dollars, code 38 is 50 dollars </B>

<B> 7.3 Find the total amount collected for the three violation codes with the maximum tickets </B>

In [73]:
spark.sql("""select `Violation Code`, case 
when `Violation Code`=14 then count(*)*115
when `Violation Code`=38 then count(*)*50
when `Violation Code` = 21 then count(*)*55 end as total_money 
from sql_view
group by `Violation Code`
order by total_money desc
limit 3
 """).show()

+--------------+-----------+
|Violation Code|total_money|
+--------------+-----------+
|            14|   53811605|
|            21|   34737120|
|            38|   27069600|
+--------------+-----------+



Observation: Code 14 generated the largest total fine amount of the three codes, even though it has the fewest tickets of the three.
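
The totals can be cross-checked by hand from the ticket counts in 7.1 and the fines listed in 7.2. A minimal plain-Python sketch of that arithmetic; the dictionary names are illustrative, while the numbers are copied from the outputs above.

```python
# Ticket counts from the 7.1 output and fines per ticket from 7.2
ticket_counts = {21: 631584, 38: 541392, 14: 467927}
fine_per_ticket = {14: 115, 21: 55, 38: 50}

# Total collected per code = number of tickets * fine per ticket
totals = {code: ticket_counts[code] * fine_per_ticket[code] for code in ticket_counts}

# Rank the codes by total amount, largest first
ranked = sorted(totals.items(), key=lambda item: item[1], reverse=True)
print(ranked)
```

Code 14 ranks first despite having the fewest tickets because its fine is more than double the other two.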

<B> 7.4  What can you intuitively infer from these findings? </B>

<B> Observation: Based on the results above, code 14 (standing or parking where standing is not allowed by sign, street marking or traffic control device) is the root cause of the highest amount collected through tickets. </B>