# PROBLEM STATEMENT

#### New York City is a thriving metropolis. Just like most other metros its size, one of the biggest problems its citizens face is parking. The classic combination of a huge number of cars and cramped geography leads to a huge number of parking tickets.
#### In an attempt to scientifically analyse this phenomenon, the NYC Police Department has collected data for parking tickets. Of these, the data files for multiple years are publicly available on Kaggle. We will try and perform some exploratory analysis on #a part of this data. Spark will allow us to analyse the full files at high speeds as opposed to taking a series of random #samples that will approximate the population. For the scope of this analysis, we will analyse the parking tickets over the year 2017. 

In [21]:
# class pyspark.sql.SparkSession, The entry point to programming Spark with the Dataset and DataFrame API.
#A SparkSession can be used create DataFrame, register DataFrame as tables, execute SQL over tables, cache tables, and 
# read parquet files.To create a SparkSession, use the following builder pattern:

from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("NYC Parking Ticket Assignment") \
    .getOrCreate()

In [22]:
# Datafram can be created by by calling read method on spark object

tickets = spark.read.csv("/common_folder/nyc_parking/Parking_Violations_Issued_-_Fiscal_Year_2017.csv", inferSchema=True,header=True)
tickets.show(5)

+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+
|Summons Number|Plate ID|Registration State|         Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|
+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+
|    5092469481| GZH7067|                NY|2016-07-10 00:00:00|             7|             SUBN|       TOYOT|                 0|              0|         0143A|
|    5092451658| GZH7067|                NY|2016-07-08 00:00:00|             7|             SUBN|       TOYOT|                 0|              0|         0400P|
|    4006265037| FZX9232|                NY|2016-08-23 00:00:00|             5|             SUBN|        FORD|                 0|              0|         0233P|
|    8478629828| 66623ME|         

In [23]:
tickets.printSchema()

root
 |-- Summons Number: long (nullable = true)
 |-- Plate ID: string (nullable = true)
 |-- Registration State: string (nullable = true)
 |-- Issue Date: timestamp (nullable = true)
 |-- Violation Code: integer (nullable = true)
 |-- Vehicle Body Type: string (nullable = true)
 |-- Vehicle Make: string (nullable = true)
 |-- Violation Precinct: integer (nullable = true)
 |-- Issuer Precinct: integer (nullable = true)
 |-- Violation Time: string (nullable = true)



### PART 1: Examine the data

In [24]:
#1.the total number of tickets for the year 2017
from pyspark.sql.functions import count, year

total_ticket_count_2017 = tickets.filter(year("Issue Date")==2017).agg(count("Summons Number"))
total_ticket_count_2017 = total_ticket_count_2017.withColumnRenamed("count(Summons Number)", "Total Count Of Tickets(2017)")
total_ticket_count_2017.show()

+----------------------------+
|Total Count Of Tickets(2017)|
+----------------------------+
|                     5431918|
+----------------------------+



In [26]:
# the number of parking tickets per unique states from where the cars that got parking tickets came.
from pyspark.sql.functions import count, countDistinct, desc, regexp_replace, sum, col

unique_states = tickets.filter(year("Issue Date")==2017).groupby("Registration State")\
                .agg(countDistinct("Summons Number"))
unique_states = unique_states.withColumnRenamed("count(DISTINCT Summons Number)","count")
unique_states.sort(desc("count")).show(5)

+------------------+-------+
|Registration State|  count|
+------------------+-------+
|                NY|4273951|
|                NJ| 475825|
|                PA| 140286|
|                CT|  70403|
|                FL|  69468|
+------------------+-------+
only showing top 5 rows



In [27]:
# replacing 99 in Registration State with NY
state_replace = unique_states.withColumn("Registration State", regexp_replace("Registration State", "99", "NY"))
state_replace = state_replace.where(col("Registration State") == "NY").groupby("Registration State")\
                        .agg(sum("count").alias("count"))
# total number of cars from NY
state_replace.show(100)

+------------------+-------+
|Registration State|  count|
+------------------+-------+
|                NY|4290006|
+------------------+-------+



In [28]:
temp = unique_states.select("Registration State", "count")\
                    .where((col("Registration State") != "NY") & (col("Registration State") != "99"))\
                    .sort(desc("count"))
# combining dataframes with NY only count and remaining state counts to get the final dataframe
unique_states_count = state_replace.union(temp)
unique_states_count.sort(desc("count")).show(64)

+------------------+-------+
|Registration State|  count|
+------------------+-------+
|                NY|4290006|
|                NJ| 475825|
|                PA| 140286|
|                CT|  70403|
|                FL|  69468|
|                IN|  45525|
|                MA|  38941|
|                VA|  34367|
|                MD|  30213|
|                NC|  27152|
|                TX|  18827|
|                IL|  18666|
|                GA|  17537|
|                AZ|  12379|
|                OH|  12281|
|                CA|  12153|
|                ME|  10806|
|                SC|  10395|
|                MN|  10083|
|                OK|   9088|
|                TN|   8514|
|                DE|   7905|
|                MI|   7231|
|                RI|   5814|
|                NH|   4119|
|                VT|   3683|
|                AL|   3178|
|                WA|   3052|
|                OR|   2622|
|                MO|   2483|
|                ON|   2460|
|             

In [29]:
# the number of unique states from where the cars that got parking tickets came.
num_unique_states = unique_states_count.agg(countDistinct("Registration State").alias("unique_states_count"))
num_unique_states.show()

+-------------------+
|unique_states_count|
+-------------------+
|                 64|
+-------------------+



### PART 1: Examine the data: RESULTS
##### 1. Find the total number of tickets for the year.
##### 5431918

##### 2. Find out the number of unique states from where the cars that got parking tickets came.
##### 64
##### New York state has the highest number of parking tickets which is 4290006.

### Aggregation Tasks

In [30]:
# Converting the dataframe into a spark sql table/view
tickets = tickets.withColumn("Registration State", regexp_replace("Registration State", "99", "NY"))

tickets.cache()
tickets.createOrReplaceTempView("v_tickets")

In [31]:
# 1. How often does each violation code occur? Display the frequency of the top five violation codes.
violation_code_freq = spark.sql("""SELECT `Violation Code`, count(`Summons Number`) AS Frequency FROM v_tickets WHERE YEAR(`ISSUE DATE`) = 2017 GROUP BY `Violation Code` ORDER BY Frequency desc LIMIT 5""") 
violation_code_freq.show()

+--------------+---------+
|Violation Code|Frequency|
+--------------+---------+
|            21|   768087|
|            36|   662765|
|            38|   542079|
|            14|   476664|
|            20|   319646|
+--------------+---------+



In [32]:
#2(a).How often does each 'vehicle body type' get a parking ticket?
parking_tick_by_vehicle_body_type = spark.sql("""SELECT `Vehicle Body Type`, count(`Summons Number`) AS Frequency FROM v_tickets WHERE YEAR(`ISSUE DATE`) = 2017 GROUP BY `Vehicle Body Type` ORDER BY Frequency desc LIMIT 5""")
parking_tick_by_vehicle_body_type.show()

+-----------------+---------+
|Vehicle Body Type|Frequency|
+-----------------+---------+
|             SUBN|  1883954|
|             4DSD|  1547312|
|              VAN|   724029|
|             DELV|   358984|
|              SDN|   194197|
+-----------------+---------+



In [33]:
#2(b).How often does each 'Vehicle Make' get a parking ticket?
parking_tick_by_vehicle_Make = spark.sql("""SELECT `Vehicle Make`, count(*) AS Frequency FROM v_tickets WHERE YEAR(`ISSUE DATE`) = 2017 GROUP BY `Vehicle Make` ORDER BY Frequency desc LIMIT 5""")
parking_tick_by_vehicle_Make.show()

+------------+---------+
|Vehicle Make|Frequency|
+------------+---------+
|        FORD|   636844|
|       TOYOT|   605291|
|       HONDA|   538884|
|       NISSA|   462017|
|       CHEVR|   356032|
+------------+---------+



In [34]:
#3. Find the (5 highest) frequencies of tickets for each of the following:
# Ignoring the entry with Violation & Issue Precinct as 0.
#3(a): 'Violation Precinct'
parking_tick_by_Violation_Precint = spark.sql("""SELECT `Violation Precinct`, count(`Summons Number`) AS Frequency FROM v_tickets WHERE YEAR(`ISSUE DATE`) = 2017 GROUP BY `Violation Precinct` ORDER BY Frequency desc LIMIT 6""")
parking_tick_by_Violation_Precint.show()

#3(b): 'Issuer Precinct'
parking_tick_by_Issuer_Precint = spark.sql("""SELECT `Issuer Precinct`, count(`Summons Number`) AS Frequency FROM v_tickets WHERE YEAR(`ISSUE DATE`) = 2017 GROUP BY `Issuer Precinct` ORDER BY Frequency desc LIMIT 6""")
parking_tick_by_Issuer_Precint.show()

+------------------+---------+
|Violation Precinct|Frequency|
+------------------+---------+
|                 0|   925596|
|                19|   274445|
|                14|   203553|
|                 1|   174702|
|                18|   169131|
|               114|   147444|
+------------------+---------+

+---------------+---------+
|Issuer Precinct|Frequency|
+---------------+---------+
|              0|  1078406|
|             19|   266961|
|             14|   200495|
|              1|   168740|
|             18|   162994|
|            114|   144054|
+---------------+---------+



#### Precinct 19 tops as the highest number of parking tickets for a Violation Precinct. 
#### Precinct 19 also tops as the highest number of parking tickets for a Issuer Precinct.
#### Address for Precint 19 is 153 E 67th St, New York, NY 10065, USA which means there is a higher number of parking violations happening around the 67th Street in NYC i.e. this area has severe problem of parking space.

In [35]:
#4. Find the violation code frequencies for three precincts that have issued the most number of tickets
# violation code frequncies for the top three precincts with highest frequency of tickets

#From above question, we found out that precinct 19, 14 & 1 has issued the most number of parking tickets.
#1. Finding the frequency of violation codes for precinct 19
freq_prcnt_19_VCode = spark.sql("""SELECT `Violation Code`,count(`Violation Code`) AS `Freq of Violation Codes Prnct 19` 
                                FROM v_tickets WHERE YEAR(`ISSUE DATE`) = 2017 AND `Issuer Precinct` = 19 
                                GROUP BY `Violation Code` ORDER BY `Freq of Violation Codes Prnct 19` desc LIMIT 5""")

freq_prcnt_19_VCode.show()

#1. Finding the frequency of violation codes for precinct 19
freq_prcnt_14_VCode = spark.sql("""SELECT `Violation Code`,count(`Violation Code`) AS `Freq of Violation Codes Prnct 14` 
                                FROM v_tickets WHERE YEAR(`ISSUE DATE`) = 2017 AND `Issuer Precinct` = 14 
                                GROUP BY `Violation Code` ORDER BY `Freq of Violation Codes Prnct 14` desc LIMIT 5""")

freq_prcnt_14_VCode.show()

#1. Finding the frequency of violation codes for precinct 19
freq_prcnt_1_VCode = spark.sql("""SELECT `Violation Code`,count(`Violation Code`) AS `Freq of Violation Codes Prnct 1` 
                                FROM v_tickets WHERE YEAR(`ISSUE DATE`) = 2017 AND `Issuer Precinct` = 1
                                GROUP BY `Violation Code` ORDER BY `Freq of Violation Codes Prnct 1` desc LIMIT 5""")

freq_prcnt_1_VCode.show()

+--------------+--------------------------------+
|Violation Code|Freq of Violation Codes Prnct 19|
+--------------+--------------------------------+
|            46|                           48445|
|            38|                           36386|
|            37|                           36056|
|            14|                           29797|
|            21|                           28415|
+--------------+--------------------------------+

+--------------+--------------------------------+
|Violation Code|Freq of Violation Codes Prnct 14|
+--------------+--------------------------------+
|            14|                           45036|
|            69|                           30464|
|            31|                           22555|
|            47|                           18364|
|            42|                           10027|
+--------------+--------------------------------+

+--------------+-------------------------------+
|Violation Code|Freq of Violation Codes Prnct 1|


In [36]:
#5(a) Find a way to deal with missing values
from pyspark.sql.functions import isnan, isnull

# Filtering Record for year 2017 only
tickets_2017 = tickets.filter(year("Issue Date")==2017)

# Checking null or nan values in the Violation Time field
null_count = tickets_2017.where(col("Violation Time") == "null").count()
print("Count when column is null", null_count)

nan_count = tickets_2017.where(col("Violation Time") == "nan").count()
print("Count when column has 'nan' value", nan_count)

#Since, the count of nan values is quite low, so we are safe to drop the rows with nan values 
#as it won't affect our results much.
print("+++++++++++AFTER CLEANING THE DATA++++++++++++++")
tickets_2017_filtered = tickets_2017.filter(col("Violation Time") != "nan")
print("Count when column has 'nan':", tickets_2017_filtered.filter(col("Violation Time") == "nan").count())

Count when column is null 0
Count when column has 'nan' value 16
+++++++++++AFTER CLEANING THE DATA++++++++++++++
Count when column has 'nan': 0


In [44]:
#5(b)The Violation Time field is specified in a strange format. 
#    Find a way to make this a time attribute that you can use to divide into groups.

#converting the Violation Time column to timestamp
from pyspark.sql.functions import substring

tickets_2017_hour = tickets_2017_filtered.withColumn("Hour",col("Violation Time").substr(1,2))
tickets_2017_min = tickets_2017_hour.withColumn("Minutes", col("Violation Time").substr(3,2))
tickets_2017_new = tickets_2017_min.withColumn("TimeOfDay",col("Violation Time").substr(5,1))
tickets_2017_new.show(5)
tickets_2017_new.printSchema()

+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+----+-------+---------+
|Summons Number|Plate ID|Registration State|         Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|Hour|Minutes|TimeOfDay|
+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+----+-------+---------+
|    8478629828| 66623ME|                NY|2017-06-14 00:00:00|            47|             REFG|       MITSU|                14|             14|         1120A|  11|     20|        A|
|    5096917368| FZD8593|                NY|2017-06-13 00:00:00|             7|             SUBN|       ME/BE|                 0|              0|         0852P|  08|     52|        P|
|    1407740258| 2513JMG|                NY|2017-01-11 00:00:00|            78| 

In [84]:
#5(b)Divide 24 hours into six equal discrete bins of time. Choose the intervals as you see fit. 
#    For each of these groups, find the three most commonly occurring violations.
tickets_2017_new=tickets_2017_new.withColumn("Hour",col("Hour").cast("integer"))
tickets_2017_new=tickets_2017_new.withColumn("Minutes",col("Minutes").cast("integer"))

# creating view after removing the null or nan values from Violation Field
tickets_2017_new.createOrReplaceTempView("tickets_2017_new")

#5(b:1) Dividing Violation Time into 6 buckets
violation_time_interval = spark.sql("""SELECT *, CASE WHEN (HOUR IN (0,1,2,3,12)) THEN CASE WHEN (TimeOfDay=="A") THEN "DAWN" ELSE "AFTERNOON" END WHEN (HOUR IN (4,5,6,7)) THEN CASE WHEN (TimeOfDay=="A") THEN "MORNING" ELSE "DUSK" END WHEN (HOUR IN (8,9,10,11) AND (TimeOfDay=="A")) THEN "LATE MORNING" ELSE "NIGHT" END AS `Time of the Day` FROM tickets_2017_new""")

violation_time_interval.show(5)

+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+----+-------+---------+---------------+
|Summons Number|Plate ID|Registration State|         Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|Hour|Minutes|TimeOfDay|Time of the Day|
+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+----+-------+---------+---------------+
|    8478629828| 66623ME|                NY|2017-06-14 00:00:00|            47|             REFG|       MITSU|                14|             14|         1120A|  11|     20|        A|   LATE MORNING|
|    5096917368| FZD8593|                NY|2017-06-13 00:00:00|             7|             SUBN|       ME/BE|                 0|              0|         0852P|   8|     52|        P|          NIGHT|


#### Divided the Violation Time into 6 time intervals:
#### 1. DAWN - 00 HRS to 03 Hrs (AM)
#### 2. MORNING - 04 HRS to 07 Hrs (AM)
#### 3. LATE MORNING - 08 HRS to 11 Hrs (AM)
#### 4. AFTERNOON - 12 HRS to 03 Hrs (PM)
#### 5. DUSK - 04 HRS to 07 Hrs (PM)
#### 6. NIGHT - 08 HRS to 11 Hrs (PM)

In [94]:
#5(b:2) three most occuring violations for each time interval
violation_time_interval.cache()
violation_time_interval.createOrReplaceTempView("violation_per_time_interval")
# 3 most violation for dawn
three_most_violation_dawn = spark.sql("""SELECT `Violation Code`, count(`Summons Number`) As count_VCode_dawn FROM violation_per_time_interval WHERE `Time of the Day`== "DAWN" GROUP BY `Violation Code` ORDER BY `count_VCode_dawn` DESC LIMIT 3""")
print("Three most common violation in DAWN:\n")
three_most_violation_dawn.show()

# 3 most violation for morning
three_most_violation_morning = spark.sql("""SELECT `Violation Code`, count(`Summons Number`) As count_VCode_morning FROM violation_per_time_interval WHERE `Time of the Day`== "MORNING" GROUP BY `Violation Code` ORDER BY `count_VCode_morning` DESC LIMIT 3""")
print("Three most common violation in MORNING:\n")
three_most_violation_morning.show()

# 3 most violation for late morning
three_most_violation_late_morning = spark.sql("""SELECT `Violation Code`, count(`Summons Number`) As count_VCode_late_mor FROM violation_per_time_interval WHERE `Time of the Day`== "LATE MORNING" GROUP BY `Violation Code` ORDER BY `count_VCode_late_mor` DESC LIMIT 3""")
print("Three most common violation in LATE MORNING:\n")
three_most_violation_late_morning.show()

# 3 most violation for afternoon
three_most_violation_afternoon = spark.sql("""SELECT `Violation Code`, count(`Summons Number`) As count_VCode_aftrn FROM violation_per_time_interval WHERE `Time of the Day`== "AFTERNOON" GROUP BY `Violation Code` ORDER BY `count_VCode_aftrn` DESC LIMIT 3""")
print("Three most common violation in AFTERNOON:\n")
three_most_violation_afternoon.show()

# 3 most violation for dusk
three_most_violation_dusk= spark.sql("""SELECT `Violation Code`, count(`Summons Number`) As count_VCode_dusk FROM violation_per_time_interval WHERE `Time of the Day`== "DUSK" GROUP BY `Violation Code` ORDER BY `count_VCode_dusk` DESC LIMIT 3""")
print("Three most common violation in DUSK:\n")
three_most_violation_dusk.show()

# 3 most violation for night
three_most_violation_night = spark.sql("""SELECT `Violation Code`, count(`Summons Number`) As count_VCode_night FROM violation_per_time_interval WHERE `Time of the Day`== "NIGHT" GROUP BY `Violation Code` ORDER BY `count_VCode_night` DESC LIMIT 3""")
print("Three most common violation in NIGHT:\n")
three_most_violation_night.show()

Three most common violation in DAWN:

+--------------+----------------+
|Violation Code|count_VCode_dawn|
+--------------+----------------+
|            21|           36958|
|            40|           25867|
|            78|           15528|
+--------------+----------------+

Three most common violation in MORNING:

+--------------+-------------------+
|Violation Code|count_VCode_morning|
+--------------+-------------------+
|            14|              74114|
|            40|              60652|
|            21|              57897|
+--------------+-------------------+

Three most common violation in LATE MORNING:

+--------------+--------------------+
|Violation Code|count_VCode_late_mor|
+--------------+--------------------+
|            21|              598069|
|            36|              348165|
|            38|              176570|
+--------------+--------------------+

Three most common violation in AFTERNOON:

+--------------+-----------------+
|Violation Code|count_VCode_aft

In [122]:
#5(b:3) For the three most commonly occurring violation codes, find the most common time of the day
common_time_for_violation = spark.sql("""SELECT `Violation Code`, count(`Summons Number`) as Three_most_common_VCode 
            FROM violation_per_time_interval GROUP BY `Violation Code` ORDER BY `Three_most_common_VCode` DESC LIMIT 3""")
print("Three most common occuring violation codes \n")
common_time_for_violation.show()

#most common time of the day for violation code 21
print("Common time of the day for violation code 21")
common_time_vcode_21= spark.sql("""SELECT count(`Summons Number`) AS `VCode_21_freq`, `Time of the Day` 
                            FROM violation_per_time_interval WHERE `Violation Code` = 21 GROUP BY `Time of the Day` 
                            ORDER BY `VCode_21_freq` DESC""")
common_time_vcode_21.show()

#most common time of the day for violation code 36
print("Common time of the day for violation code 36")
common_time_vcode_36= spark.sql("""SELECT count(`Summons Number`) AS `VCode_36_freq`, `Time of the Day` 
                            FROM violation_per_time_interval WHERE `Violation Code` = 36 GROUP BY `Time of the Day` 
                            ORDER BY `VCode_36_freq` DESC""")
common_time_vcode_36.show()

#most common time of the day for violation code 38
print("Common time of the day for violation code 38")
common_time_vcode_38= spark.sql("""SELECT count(`Summons Number`) AS `VCode_38_freq`, `Time of the Day` 
                            FROM violation_per_time_interval WHERE `Violation Code` = 38 GROUP BY `Time of the Day` 
                            ORDER BY `VCode_38_freq` DESC""")
common_time_vcode_38.show()


Three most common occuring violation codes 

+--------------+-----------------------+
|Violation Code|Three_most_common_VCode|
+--------------+-----------------------+
|            21|                 768087|
|            36|                 662765|
|            38|                 542079|
+--------------+-----------------------+

Common time of the day for violation code 21
+-------------+---------------+
|VCode_21_freq|Time of the Day|
+-------------+---------------+
|       598069|   LATE MORNING|
|        74695|      AFTERNOON|
|        57897|        MORNING|
|        36958|           DAWN|
|          259|           DUSK|
|          209|          NIGHT|
+-------------+---------------+

Common time of the day for violation code 36
+-------------+---------------+
|VCode_36_freq|Time of the Day|
+-------------+---------------+
|       348165|   LATE MORNING|
|       286284|      AFTERNOON|
|        14782|        MORNING|
|        13534|           DUSK|
+-------------+---------------+


**6.Let’s try and find some seasonality in this data:
First, divide the year into a certain number of seasons, and find the frequencies of tickets for each season. (Hint: Use Issue Date to segregate into seasons.)
Then, find the three most common violations for each of these seasons..**

* For this first we convert string issue date into date 
* then we extract month from it 
* according to month we will assign season.
* count number of tickits for each season
* then we count top 3 code for each season.

In [123]:
# converting Issue Date as Date Type
tickets_2017=tickets_2017.withColumn("Issue Date", tickets_2017["Issue Date"].cast("date"))
tickets_2017.show(10)

+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|Summons Number|Plate ID|Registration State|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|    8478629828| 66623ME|                NY|2017-06-14|            47|             REFG|       MITSU|                14|             14|         1120A|
|    5096917368| FZD8593|                NY|2017-06-13|             7|             SUBN|       ME/BE|                 0|              0|         0852P|
|    1407740258| 2513JMG|                NY|2017-01-11|            78|             DELV|       FRUEH|               106|            106|         0015A|
|    1413656420|T672371C|                NY|2017-02-04|            40|             TAXI|

In [137]:
# extracting month from the Issue Date column
from pyspark.sql.functions import year, month, dayofmonth

ticket_month=tickets_2017.withColumn("Month", month("Issue Date"))
ticket_month.show(5)

+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+-----+
|Summons Number|Plate ID|Registration State|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|Month|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+-----+
|    8478629828| 66623ME|                NY|2017-06-14|            47|             REFG|       MITSU|                14|             14|         1120A|    6|
|    5096917368| FZD8593|                NY|2017-06-13|             7|             SUBN|       ME/BE|                 0|              0|         0852P|    6|
|    1407740258| 2513JMG|                NY|2017-01-11|            78|             DELV|       FRUEH|               106|            106|         0015A|    1|
|    1413656420|T672371C|                NY|2017-02-

In [138]:
#6(a:1) Assigning seasons based on month
#creating view from the dataframe
ticket_month.createOrReplaceTempView("v_tickets_month")

tickets_season=spark.sql("""SELECT *,CASE WHEN Month in (7,8,9,10) THEN "MONSOON" WHEN Month in (2,3,4,5,6) THEN "SUMMER" ELSE "WINTER" END AS Season FROM v_tickets_month""")
tickets_season.show(5)

#creating view from the dataframe
tickets_season.createOrReplaceTempView("v_tickets_season")

#6(a:2) finding the frequencies of tickets for each season
tickets_season_freq = spark.sql("""SELECT `Season`, count(`Summons Number`) as `Freq of Parking Tickets Per Season` 
                            FROM v_tickets_season GROUP BY `Season` ORDER BY `Freq of Parking Tickets Per Season` DESC""")
                                
print("Frequencies of tickets for each Season:\n")
tickets_season_freq.show()

+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+-----+------+
|Summons Number|Plate ID|Registration State|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|Month|Season|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+-----+------+
|    8478629828| 66623ME|                NY|2017-06-14|            47|             REFG|       MITSU|                14|             14|         1120A|    6|SUMMER|
|    5096917368| FZD8593|                NY|2017-06-13|             7|             SUBN|       ME/BE|                 0|              0|         0852P|    6|SUMMER|
|    1407740258| 2513JMG|                NY|2017-01-11|            78|             DELV|       FRUEH|               106|            106|         0015A|    1|WINTER|
|    14136

* Summer has the higest frequency of parking tickets

In [140]:
#6(b) 3 most common violations per season
#Winter
tickets_winter = spark.sql("""SELECT `Violation Code`, count(`Summons Number`) AS `VCode_wint_freq` 
                            FROM v_tickets_season WHERE  `Season`="WINTER" GROUP BY `Violation Code` 
                            ORDER BY `VCode_wint_freq` DESC LIMIT 3""")
print("3 common violation codes in Winter Season:\n")
tickets_winter.show()

#Summer
tickets_summer = spark.sql("""SELECT `Violation Code`, count(`Summons Number`) AS `VCode_sum_freq` 
                            FROM v_tickets_season WHERE  `Season`="SUMMER" GROUP BY `Violation Code` 
                            ORDER BY `VCode_sum_freq` DESC LIMIT 3""")
print("3 common violation codes in Summer Season:\n")
tickets_summer.show()

#Monsoon
tickets_monsoon = spark.sql("""SELECT `Violation Code`, count(`Summons Number`) AS `VCode_mon_freq` 
                            FROM v_tickets_season WHERE  `Season`="MONSOON" GROUP BY `Violation Code` 
                            ORDER BY `VCode_mon_freq` DESC LIMIT 3""")
print("3 common violation codes in Monsoon Season:\n")
tickets_monsoon.show()

3 common violation codes in Winter Season:

+--------------+---------------+
|Violation Code|VCode_wint_freq|
+--------------+---------------+
|            36|         129769|
|            21|         119931|
|            38|          95451|
+--------------+---------------+

3 common violation codes in Summer Season:

+--------------+--------------+
|Violation Code|VCode_sum_freq|
+--------------+--------------+
|            21|        647909|
|            36|        532996|
|            38|        446618|
+--------------+--------------+

3 common violation codes in Monsoon Season:

+--------------+--------------+
|Violation Code|VCode_mon_freq|
+--------------+--------------+
|            46|           287|
|            21|           247|
|            40|           146|
+--------------+--------------+



** Summer **

** Monsson **

* For Summer and Winter, we get common violation codes in top 3 which are 21,36,38.
* For Monsoon, common violation codes in top 3  are 46,21,40.

**7.The fines collected from all the instances of parking violation constitute a source of revenue for the NYC Police Department. Let’s take an example of estimating this for the three most commonly occurring codes:**

* We count number of ticket for each code then we multiply top 3 code with find which we get from previous question. 

In [144]:
#7(a) top 3 violation codes
tickets_fine=tickets_2017.cube('Violation code').count().sort(col('count').desc())
print("Occurrences of three most violation codes:\n")
tickets_fine.show(4)

Occurrences of three most violation codes:

+--------------+-------+
|Violation code|  count|
+--------------+-------+
|          null|5431918|
|            21| 768087|
|            36| 662765|
|            38| 542079|
+--------------+-------+
only showing top 4 rows



In [151]:
#7(b)
# creating table from dataframe
tickets_fine.createOrReplaceTempView("v_tickets_fine")

#Violation code 21
print("total amount of fine for violation code 21")
vc21_total_fine=spark.sql("""select `Violation code`,cast(`count`*((65+45)/2) as int) as `Total Fine Collected` 
                                  from v_tickets_fine where `Violation code` ="21" """)
vc21_total_fine.show()

#Violation code 36
print("total amount of fine for violation code 36")
vc36_total_fine=spark.sql("""select `Violation code`,cast(`count`*((50+50)/2) as int) as `Total Fine Collected` 
                              from v_tickets_fine where `Violation code` ="36" """)
vc36_total_fine.show()

#Violation code 38
print("total amount of fine for violation code 38")
vc38_total_fine=spark.sql("""select `Violation code`,cast(`count`*((65+35)/2) as int) as `Total Fine Collected` 
                                  from v_tickets_fine where `Violation code` ="38" """)
vc38_total_fine.show()

total amount of fine for violation code 21
+--------------+--------------------+
|Violation code|Total Fine Collected|
+--------------+--------------------+
|            21|            42244785|
+--------------+--------------------+

total amount of fine for violation code 36
+--------------+--------------------+
|Violation code|Total Fine Collected|
+--------------+--------------------+
|            36|            33138250|
+--------------+--------------------+

total amount of fine for violation code 38
+--------------+--------------------+
|Violation code|Total Fine Collected|
+--------------+--------------------+
|            38|            27103950|
+--------------+--------------------+



In [152]:
# stopping the spark session
spark.stop()