In [1]:
#getting the spark session object and providing the appliaction name
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("NYC_Park_assignment") \
    .getOrCreate()

In [2]:
#reading the input file 
df=spark.read.format('csv').option("header", "true").option("inferSchema", "true").\
load('/common_folder/nyc_parking/Parking_Violations_Issued_-_Fiscal_Year_2017.csv')

In [3]:
# checking few records of the dataframe
df.show(10)

+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+
|Summons Number|Plate ID|Registration State|         Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|
+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+
|    5092469481| GZH7067|                NY|2016-07-10 00:00:00|             7|             SUBN|       TOYOT|                 0|              0|         0143A|
|    5092451658| GZH7067|                NY|2016-07-08 00:00:00|             7|             SUBN|       TOYOT|                 0|              0|         0400P|
|    4006265037| FZX9232|                NY|2016-08-23 00:00:00|             5|             SUBN|        FORD|                 0|              0|         0233P|
|    8478629828| 66623ME|         

In [4]:
#checking the metadata of the dataframe
df.printSchema()

root
 |-- Summons Number: long (nullable = true)
 |-- Plate ID: string (nullable = true)
 |-- Registration State: string (nullable = true)
 |-- Issue Date: timestamp (nullable = true)
 |-- Violation Code: integer (nullable = true)
 |-- Vehicle Body Type: string (nullable = true)
 |-- Vehicle Make: string (nullable = true)
 |-- Violation Precinct: integer (nullable = true)
 |-- Issuer Precinct: integer (nullable = true)
 |-- Violation Time: string (nullable = true)



In [5]:
# register dataframe as temp table to use spark-sql
df.registerTempTable("nyc")

In [6]:
# getting the time range for which the data belongs
spark.sql("select min(`Issue Date`), max(`Issue Date`) from nyc").show()

# so we see that parking tickets data exits not only for year 2017, but it exists for other years as well.
# Assumption - But as per TA, We are considering all records belong to Fiscal_Year_2017.

+-------------------+-------------------+
|    min(Issue Date)|    max(Issue Date)|
+-------------------+-------------------+
|1972-03-30 00:00:00|2069-11-19 00:00:00|
+-------------------+-------------------+



In [7]:
# checking mean, meadin  and other attributes
df.describe().show()

+-------+--------------------+--------+------------------+------------------+------------------+------------------+------------------+-----------------+-----------------+
|summary|      Summons Number|Plate ID|Registration State|    Violation Code| Vehicle Body Type|      Vehicle Make|Violation Precinct|  Issuer Precinct|   Violation Time|
+-------+--------------------+--------+------------------+------------------+------------------+------------------+------------------+-----------------+-----------------+
|  count|            10803028|10803028|          10803028|          10803028|          10803028|          10803028|          10803028|         10803028|         10803028|
|   mean|6.8174470290656595E9|Infinity|              99.0|34.599430455979565|3.9258887134586864| 6519.974025974026| 45.01216260848347|46.82931211508477|909.2857142857143|
| stddev| 2.320233962328229E9|     NaN|               0.0|19.359868716323483|0.5013415469252523|18091.257389147086|40.552560268435805|62.66703577

In [8]:
#checking Bins, Percentile and some other attributes to perform eda
df.summary().show()

+-------+-------------------+---------+------------------+------------------+------------------+-----------------+------------------+-----------------+-----------------+
|summary|     Summons Number| Plate ID|Registration State|    Violation Code| Vehicle Body Type|     Vehicle Make|Violation Precinct|  Issuer Precinct|   Violation Time|
+-------+-------------------+---------+------------------+------------------+------------------+-----------------+------------------+-----------------+-----------------+
|  count|           10803028| 10803028|          10803028|          10803028|          10803028|         10803028|          10803028|         10803028|         10803028|
|   mean|6.817447029065661E9| Infinity|              99.0|34.599430455979565|3.9258887134586864|6519.974025974026| 45.01216260848347|46.82931211508477|909.2857142857143|
| stddev|2.320233962328229E9|      NaN|               0.0|19.359868716323483|0.5013415469252522|18091.25738914709|40.552560268435805|62.66703577269466

In [12]:
# Check amount of null or NaN columns in data
# Null values are found in Plate ID, Vehicle Body Type, Vehicle Make and Violation Time
from pyspark.sql.functions import isnan, when, count, col
parking_without_issuedate = df.drop('Issue Date')
parking_without_issuedate\
        .select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in parking_without_issuedate.columns]).show()

+--------------+--------+------------------+--------------+-----------------+------------+------------------+---------------+--------------+
|Summons Number|Plate ID|Registration State|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|
+--------------+--------+------------------+--------------+-----------------+------------+------------------+---------------+--------------+
|             0|     728|                 0|             0|            42711|       73050|                 0|              0|            63|
+--------------+--------+------------------+--------------+-----------------+------------+------------------+---------------+--------------+



In [11]:
# nan is replaced with None in Plate ID, Vehicle Body Type, Vehicle Make and Violation Time
from pyspark.sql.functions import isnan, when, count, col
df = df.withColumn("Vehicle Body Type",\
                             when(col("Vehicle Body Type") == "nan", None).otherwise(col("Vehicle Body Type")))\
                             .withColumn("Vehicle Make",\
                             when(col("Vehicle Make") == "nan", None).otherwise(col("Vehicle Make")))\
                             .withColumn("Violation Time",\
                             when(col("Violation Time") == "nan", None).otherwise(col("Violation Time")))\
                             .withColumn("Plate ID",\
                             when(col("Plate ID") == "nan", None).otherwise(col("Plate ID")))

In [13]:
#to use spark-sql, register dataframe as temp view 
df.createOrReplaceTempView("nyc")

#### Examine the data
###### 1.Find the total number of tickets for the year.

In [14]:
# Assumption - But as per TA, We are considering all records belong to Fiscal_Year_2017.

spark.sql("select count(*) as total_tickets from nyc").show()

# So total tickets for the year is 10803028

+-------------+
|total_tickets|
+-------------+
|     10803028|
+-------------+



##### 2. Find out the number of unique states from where the cars that got parking tickets came. 

In [15]:
#checkout distinct Registration State
spark.sql("select distinct(`Registration State`) as total_tickets from nyc").show(10)


+-------------+
|total_tickets|
+-------------+
|           SC|
|           AZ|
|           NS|
|           LA|
|           MN|
|           NJ|
|           MX|
|           DC|
|           OR|
|           99|
+-------------+
only showing top 10 rows



In [16]:
#Checking the frequecncy of the various state and finding most frequent state
spark.sql("select `Registration State`, count(*) as freq from nyc group by `Registration State` order by freq desc").show(10)

# 'NY' comes as most frequent state

+------------------+-------+
|Registration State|   freq|
+------------------+-------+
|                NY|8481061|
|                NJ| 925965|
|                PA| 285419|
|                FL| 144556|
|                CT| 141088|
|                MA|  85547|
|                IN|  80749|
|                VA|  72626|
|                MD|  61800|
|                NC|  55806|
+------------------+-------+
only showing top 10 rows



In [17]:
#replace 99 to NY as NY is most frequent
from pyspark.sql.functions import *
df_state=df.withColumn('Registration State', \
                       when(col('Registration State') == ("99"), "NY").otherwise(col("Registration State")))

In [25]:
# make another view which has the above imputed values
df_state.createOrReplaceTempView('imputed_table')

In [26]:
#Finding unique states
spark.sql("select count(*) as Unique_states from (select distinct(`Registration State`) \
          as total_tickets from imputed_table) temp ").show()
# So number of unique states from where the cars that got parking tickets came is 66

+-------------+
|Unique_states|
+-------------+
|           66|
+-------------+



### Aggregation tasks

#### 1. How often does each violation code occur? Display the frequency of the top five violation codes.

In [27]:
spark.sql("select `Violation Code`, count(*) as freq_violation_code from \
imputed_table group by `Violation Code` order by freq_violation_code desc").show(5)

# top five Violation codes are - 21,36,38,14,20

+--------------+-------------------+
|Violation Code|freq_violation_code|
+--------------+-------------------+
|            21|            1528588|
|            36|            1400614|
|            38|            1062304|
|            14|             893498|
|            20|             618593|
+--------------+-------------------+
only showing top 5 rows



#### 2. How often does each 'vehicle body type' get a parking ticket? 
####     How about the 'vehicle make'? (Hint: Find the top 5 for both.)

In [28]:
# Top 5 'vehicle body type' which get parking tickets
spark.sql("select `Vehicle Body Type`, count(*) as freq_veh_body_type \
from imputed_table group by `Vehicle Body Type` order by freq_veh_body_type desc").show(5)

+-----------------+------------------+
|Vehicle Body Type|freq_veh_body_type|
+-----------------+------------------+
|             SUBN|           3719802|
|             4DSD|           3082020|
|              VAN|           1411970|
|             DELV|            687330|
|              SDN|            438191|
+-----------------+------------------+
only showing top 5 rows



In [29]:
# Top 5 'Vehicle Make' which get parking tickets
spark.sql("select `Vehicle Make`, count(*) as freq_veh_make \
from imputed_table group by `Vehicle Make` order by freq_veh_make desc").show(5)

+------------+-------------+
|Vehicle Make|freq_veh_make|
+------------+-------------+
|        FORD|      1280958|
|       TOYOT|      1211451|
|       HONDA|      1079238|
|       NISSA|       918590|
|       CHEVR|       714655|
+------------+-------------+
only showing top 5 rows



### 3. A precinct is a police station that has a certain zone of the city under its command. Find the (5 highest) frequencies of tickets for each of the following:
#### 3.1 'Violation Precinct' (This is the precinct of the zone where the violation occurred). Using this, can you draw any insights for parking violations in any specific areas of the city?
#### 3.2 'Issuer Precinct' (This is the precinct that issued the ticket.)


In [31]:
spark.sql("select `Violation Precinct`, count(*) as freq_voilation_pre \
from imputed_table where `Violation Precinct` !=0 group by `Violation Precinct` order by freq_voilation_pre desc").show(5)

# Top 5 Violation Precinct are 19, 14, 1, 18, 114 
# Violation precinct 19 is top among all percinct when it comes to violations of the parking laws

+------------------+------------------+
|Violation Precinct|freq_voilation_pre|
+------------------+------------------+
|                19|            535671|
|                14|            352450|
|                 1|            331810|
|                18|            306920|
|               114|            296514|
+------------------+------------------+
only showing top 5 rows



In [30]:
spark.sql("select `Issuer Precinct`, count(*) as freq_issuer_pre \
from imputed_table where `Issuer Precinct` !=0 group by `Issuer Precinct` order by freq_issuer_pre desc").show(5)

## Top 5 Issuer Precinct are 19, 14, 1, 18, 114 

+---------------+---------------+
|Issuer Precinct|freq_issuer_pre|
+---------------+---------------+
|             19|         521513|
|             14|         344977|
|              1|         321170|
|             18|         296553|
|            114|         289950|
+---------------+---------------+
only showing top 5 rows



#### 4. Find the violation code frequencies for three precincts that have issued the most number of tickets. 
#### Do these precinct zones have an exceptionally high frequency of certain violation codes? 
#### Are these codes common across precincts? 

In [88]:
# Top 3 Precinct are same in Issuer Precinct and Violation Precinct 
# i.e 19, 14 and 1
# Note: 0 is taken as erroneaous as mentioned in the question earlier, so 0 is not taken among the top 3

#First we calculate violation precinct frequency in Violation Precinct

spark.sql("select `Violation Precinct`, `Violation Code`,  count(*) as freq_voilation_code \
from imputed_table where `Violation Precinct` !=0 and `Violation Precinct` in (19,14,1) \
group by `Violation Precinct`,`Violation Code` order by freq_voilation_code desc").show(5)

+------------------+--------------+-------------------+
|Violation Precinct|Violation Code|freq_voilation_code|
+------------------+--------------+-------------------+
|                19|            46|              90530|
|                 1|            14|              76375|
|                14|            14|              75850|
|                19|            38|              74926|
|                19|            37|              73359|
+------------------+--------------+-------------------+
only showing top 5 rows



In [32]:
# we also check violation precinct frequency in Issuer Precinct
spark.sql("select `Issuer Precinct`, `Violation Code`,  count(*) as freq_voilation_code \
from imputed_table where `Issuer Precinct` !=0 and `Issuer Precinct` in (19,14,1) \
group by `Issuer Precinct`,`Violation Code` order by freq_voilation_code desc").show(5)

# we see top three Precinct has most violation code as 46 and 14. These are common in both type of precinct.

+---------------+--------------+-------------------+
|Issuer Precinct|Violation Code|freq_voilation_code|
+---------------+--------------+-------------------+
|             19|            46|              86390|
|             14|            14|              73837|
|              1|            14|              73522|
|             19|            37|              72437|
|             19|            38|              72344|
+---------------+--------------+-------------------+
only showing top 5 rows



### 5. Find out the properties of parking violations across different times of the day:
#### 5.1 Find a way to deal with missing values, if any.


In [33]:
#counting of null Violation Time
spark.sql("select count(*) as Null_Violation_Time from imputed_table where `Violation Time` is null").show()

+-------------------+
|Null_Violation_Time|
+-------------------+
|                 63|
+-------------------+



In [34]:
## Count non null rows before deleting them
spark.sql("select count(*) as Not_Null_Violation_Time from imputed_table where `Violation Time` is not null").show()

+-----------------------+
|Not_Null_Violation_Time|
+-----------------------+
|               10802965|
+-----------------------+



In [35]:
# removing the rows which are having null violation time
df_parking=df_state.dropna(subset=("Violation Time"))
df_parking.count()

## dataframe row count has reduced from 10803028 to 10802965
# 63 rows have been deleted

10802965

In [37]:
df_parking.createOrReplaceTempView("cleansed_time")

#### 5.2 The Violation Time field is specified in a strange format. Find a way to make this a time attribute that you can use to divide into groups.

In [38]:
#checking the current time format
spark.sql("select `Violation Time` from cleansed_time ").show(10)

+--------------+
|Violation Time|
+--------------+
|         0143A|
|         0400P|
|         0233P|
|         1120A|
|         0555P|
|         0852P|
|         0215A|
|         0758A|
|         1005A|
|         0845A|
+--------------+
only showing top 10 rows



In [39]:
#converting the time format
#Assuming the time format is first 2 character as hour, next 2 char is Minutes , next 1 char is A or P 
# A stands for AM
# P stands for PM
# hour is between 1-12
# minute is in 0-59
# # all other records which are not compliant with above assumptions will be filterred out

spark.sql("select `Violation Time`,from_unixtime(unix_timestamp(concat(`Violation Time`,'M'),'hhmmaa'),'HH:mm') \
as converted_time from cleansed_time ").show(10)

+--------------+--------------+
|Violation Time|converted_time|
+--------------+--------------+
|         0143A|         01:43|
|         0400P|         16:00|
|         0233P|         14:33|
|         1120A|         11:20|
|         0555P|         17:55|
|         0852P|         20:52|
|         0215A|         02:15|
|         0758A|         07:58|
|         1005A|         10:05|
|         0845A|         08:45|
+--------------+--------------+
only showing top 10 rows



In [40]:
# After conversion we found that some Violation Time can not be converted into proper format because of invalid input format.
# for example- 8510P - is invalid time so convertion of this type of data gives null values. we discard such value. 
# checking if we have enough records after discarding invalid values.

spark.sql("select count(*) from (select `Violation Time`, \
from_unixtime(unix_timestamp(concat(`Violation Time`,'M'),'hhmmaa'),'HH:mm') \
as converted_time from cleansed_time) t where t.converted_time is not null ").show()

#10.7 million records seems enough for analysis


+--------+
|count(1)|
+--------+
|10744190|
+--------+



#### 5.3 Divide 24 hours into six equal discrete bins of time. Choose the intervals as you see fit. For each of these groups, find the three most commonly occurring violations.

In [41]:
#checking only hours
spark.sql("select `Violation Time`, \
from_unixtime(unix_timestamp(concat(`Violation Time`,'M'),'hhmmaa'),'HH') \
as converted_time from cleansed_time").show(10)

+--------------+--------------+
|Violation Time|converted_time|
+--------------+--------------+
|         0143A|            01|
|         0400P|            16|
|         0233P|            14|
|         1120A|            11|
|         0555P|            17|
|         0852P|            20|
|         0215A|            02|
|         0758A|            07|
|         1005A|            10|
|         0845A|            08|
+--------------+--------------+
only showing top 10 rows



In [42]:
# dividing the records into bins
#Assumotion- we divided the time in six bins (1,2,3,4,5,6). Below is the time slot information
# 0-4 ->1
# 4-8 ->2
# 8-12 ->3
# 12-16 ->4
# 16-20 ->5
# 20-24 ->6

spark.sql("select t2.hour_bin,t2.`Violation Code`, count(t2.`Violation Code`) as count_violation_code from \
          (select *,\
          case \
          when (converted_time >=0 and converted_time <4) then 1 \
          when (converted_time >=4 and converted_time <8) then 2 \
          when (converted_time >=8 and converted_time <12) then 3 \
          when (converted_time >=12 and converted_time <16) then 4 \
          when (converted_time >=16 and converted_time <20) then 5 \
          when (converted_time >=20 and converted_time <24) then 6 \
          else null \
          end as hour_bin \
          from \
          (select *, \
from_unixtime(unix_timestamp(concat(`Violation Time`,'M'),'hhmmaa'),'HH') \
as converted_time from cleansed_time ) t where t.converted_time is not null) t2 \
group by t2.hour_bin,t2.`Violation Code` order by count_violation_code desc").show(50)

# for each of the hour_bin, we can get the most frequent violations by looking into the below output
# Alternate way- I will apply filter on the hour_bin for each bin and execute the above query six time

+--------+--------------+--------------------+
|hour_bin|Violation Code|count_violation_code|
+--------+--------------+--------------------+
|       3|            21|             1182676|
|       3|            36|              751422|
|       4|            36|              588395|
|       4|            38|              462756|
|       3|            38|              346518|
|       4|            37|              337074|
|       3|            14|              274284|
|       4|            14|              256302|
|       4|            46|              229325|
|       4|            20|              219182|
|       3|            46|              213694|
|       5|            38|              203232|
|       4|            71|              201379|
|       3|            71|              192307|
|       3|            20|              175688|
|       4|            21|              148007|
|       5|            37|              145784|
|       5|            14|              144748|
|       4|   

In [44]:
# Alternate Answer - finding three most commonly occurring violations for hour_bin=1
spark.sql("select t2.hour_bin,t2.`Violation Code`, count(t2.`Violation Code`) as count_violation_code from \
          (select *,\
          case \
          when (converted_time >=0 and converted_time <4) then 1 \
          when (converted_time >=4 and converted_time <8) then 2 \
          when (converted_time >=8 and converted_time <12) then 3 \
          when (converted_time >=12 and converted_time <16) then 4 \
          when (converted_time >=16 and converted_time <20) then 5 \
          when (converted_time >=20 and converted_time <24) then 6 \
          else null \
          end as hour_bin \
          from \
          (select *, \
from_unixtime(unix_timestamp(concat(`Violation Time`,'M'),'hhmmaa'),'HH') \
as converted_time from cleansed_time ) t where t.converted_time is not null) t2 \
where t2.hour_bin=1  group by t2.hour_bin,t2.`Violation Code` order by count_violation_code desc").show(3)

+--------+--------------+--------------------+
|hour_bin|Violation Code|count_violation_code|
+--------+--------------+--------------------+
|       1|            21|               53600|
|       1|            40|               44737|
|       1|            78|               28716|
+--------+--------------+--------------------+
only showing top 3 rows



In [46]:
# Alternate Answer - finding three most commonly occurring violations for hour_bin=2
spark.sql("select t2.hour_bin,t2.`Violation Code`, count(t2.`Violation Code`) as count_violation_code from \
          (select *,\
          case \
          when (converted_time >=0 and converted_time <4) then 1 \
          when (converted_time >=4 and converted_time <8) then 2 \
          when (converted_time >=8 and converted_time <12) then 3 \
          when (converted_time >=12 and converted_time <16) then 4 \
          when (converted_time >=16 and converted_time <20) then 5 \
          when (converted_time >=20 and converted_time <24) then 6 \
          else null \
          end as hour_bin \
          from \
          (select *, \
from_unixtime(unix_timestamp(concat(`Violation Time`,'M'),'hhmmaa'),'HH') \
as converted_time from cleansed_time ) t where t.converted_time is not null) t2 \
where t2.hour_bin=2  group by t2.hour_bin,t2.`Violation Code` order by count_violation_code desc").show(3)

+--------+--------------+--------------------+
|hour_bin|Violation Code|count_violation_code|
+--------+--------------+--------------------+
|       2|            14|              141275|
|       2|            21|              119466|
|       2|            40|              112186|
+--------+--------------+--------------------+
only showing top 3 rows



In [48]:
# Alternate Answer - finding three most commonly occurring violations for hour_bin=3
spark.sql("select t2.hour_bin,t2.`Violation Code`, count(t2.`Violation Code`) as count_violation_code from \
          (select *,\
          case \
          when (converted_time >=0 and converted_time <4) then 1 \
          when (converted_time >=4 and converted_time <8) then 2 \
          when (converted_time >=8 and converted_time <12) then 3 \
          when (converted_time >=12 and converted_time <16) then 4 \
          when (converted_time >=16 and converted_time <20) then 5 \
          when (converted_time >=20 and converted_time <24) then 6 \
          else null \
          end as hour_bin \
          from \
          (select *, \
from_unixtime(unix_timestamp(concat(`Violation Time`,'M'),'hhmmaa'),'HH') \
as converted_time from cleansed_time ) t where t.converted_time is not null) t2 \
where t2.hour_bin=3  group by t2.hour_bin,t2.`Violation Code` order by count_violation_code desc").show(3)

+--------+--------------+--------------------+
|hour_bin|Violation Code|count_violation_code|
+--------+--------------+--------------------+
|       3|            21|             1182676|
|       3|            36|              751422|
|       3|            38|              346518|
+--------+--------------+--------------------+
only showing top 3 rows



In [49]:
# Alternate Answer - finding three most commonly occurring violations for hour_bin=4
spark.sql("select t2.hour_bin,t2.`Violation Code`, count(t2.`Violation Code`) as count_violation_code from \
          (select *,\
          case \
          when (converted_time >=0 and converted_time <4) then 1 \
          when (converted_time >=4 and converted_time <8) then 2 \
          when (converted_time >=8 and converted_time <12) then 3 \
          when (converted_time >=12 and converted_time <16) then 4 \
          when (converted_time >=16 and converted_time <20) then 5 \
          when (converted_time >=20 and converted_time <24) then 6 \
          else null \
          end as hour_bin \
          from \
          (select *, \
from_unixtime(unix_timestamp(concat(`Violation Time`,'M'),'hhmmaa'),'HH') \
as converted_time from cleansed_time ) t where t.converted_time is not null) t2 \
where t2.hour_bin=4  group by t2.hour_bin,t2.`Violation Code` order by count_violation_code desc").show(3)

+--------+--------------+--------------------+
|hour_bin|Violation Code|count_violation_code|
+--------+--------------+--------------------+
|       4|            36|              588395|
|       4|            38|              462756|
|       4|            37|              337074|
+--------+--------------+--------------------+
only showing top 3 rows



In [50]:
# Alternate Answer - finding three most commonly occurring violations for hour_bin=5
spark.sql("select t2.hour_bin,t2.`Violation Code`, count(t2.`Violation Code`) as count_violation_code from \
          (select *,\
          case \
          when (converted_time >=0 and converted_time <4) then 1 \
          when (converted_time >=4 and converted_time <8) then 2 \
          when (converted_time >=8 and converted_time <12) then 3 \
          when (converted_time >=12 and converted_time <16) then 4 \
          when (converted_time >=16 and converted_time <20) then 5 \
          when (converted_time >=20 and converted_time <24) then 6 \
          else null \
          end as hour_bin \
          from \
          (select *, \
from_unixtime(unix_timestamp(concat(`Violation Time`,'M'),'hhmmaa'),'HH') \
as converted_time from cleansed_time ) t where t.converted_time is not null) t2 \
where t2.hour_bin=5  group by t2.hour_bin,t2.`Violation Code` order by count_violation_code desc").show(3)

+--------+--------------+--------------------+
|hour_bin|Violation Code|count_violation_code|
+--------+--------------+--------------------+
|       5|            38|              203232|
|       5|            37|              145784|
|       5|            14|              144748|
+--------+--------------+--------------------+
only showing top 3 rows



In [51]:
# Alternate Answer - finding three most commonly occurring violations for hour_bin=6
spark.sql("select t2.hour_bin,t2.`Violation Code`, count(t2.`Violation Code`) as count_violation_code from \
          (select *,\
          case \
          when (converted_time >=0 and converted_time <4) then 1 \
          when (converted_time >=4 and converted_time <8) then 2 \
          when (converted_time >=8 and converted_time <12) then 3 \
          when (converted_time >=12 and converted_time <16) then 4 \
          when (converted_time >=16 and converted_time <20) then 5 \
          when (converted_time >=20 and converted_time <24) then 6 \
          else null \
          end as hour_bin \
          from \
          (select *, \
from_unixtime(unix_timestamp(concat(`Violation Time`,'M'),'hhmmaa'),'HH') \
as converted_time from cleansed_time ) t where t.converted_time is not null) t2 \
where t2.hour_bin=6  group by t2.hour_bin,t2.`Violation Code` order by count_violation_code desc").show(3)

+--------+--------------+--------------------+
|hour_bin|Violation Code|count_violation_code|
+--------+--------------+--------------------+
|       6|             7|               65593|
|       6|            38|               47029|
|       6|            14|               44778|
+--------+--------------+--------------------+
only showing top 3 rows



#### 5.4 Now, try another direction. For the three most commonly occurring violation codes, find the most common time of the day (in terms of the bins from the previous part).

In [52]:
#finding 3 most occuring violaiton codes
spark.sql("select t2.`Violation Code`, count(t2.`Violation Code`) as count_violation_code from \
          (select *,\
          case \
          when (converted_time >=0 and converted_time <4) then 1 \
          when (converted_time >=4 and converted_time <8) then 2 \
          when (converted_time >=8 and converted_time <12) then 3 \
          when (converted_time >=12 and converted_time <16) then 4 \
          when (converted_time >=16 and converted_time <20) then 5 \
          when (converted_time >=20 and converted_time <24) then 6 \
          else null \
          end as hour_bin \
          from \
          (select *, \
from_unixtime(unix_timestamp(concat(`Violation Time`,'M'),'hhmmaa'),'HH') \
as converted_time from cleansed_time ) t where t.converted_time is not null) t2 \
group by t2.`Violation Code` order by count_violation_code desc").show(5)

# so 3 most occuring violation codes are 21,36,38

+--------------+--------------------+
|Violation Code|count_violation_code|
+--------------+--------------------+
|            21|             1504663|
|            36|             1400614|
|            38|             1062287|
|            14|              885469|
|            20|              614453|
+--------------+--------------------+
only showing top 5 rows



In [53]:
#finding the most common time of the day when most violation is occuring

spark.sql("select t2.`hour_bin`, count(t2.`hour_bin`) as count_hour_bin from \
          (select *,\
          case \
          when (converted_time >=0 and converted_time <4) then '1' \
          when (converted_time >=4 and converted_time <8) then '2' \
          when (converted_time >=8 and converted_time <12) then '3' \
          when (converted_time >=12 and converted_time <16) then '4' \
          when (converted_time >=16 and converted_time <20) then '5' \
          when (converted_time >=20 and converted_time <24) then '6' \
          else null \
          end as hour_bin \
          from \
          (select *, \
from_unixtime(unix_timestamp(concat(`Violation Time`,'M'),'hhmmaa'),'HH') \
as converted_time from cleansed_time ) t where t.converted_time is not null) t2 \
where t2.`Violation Code` in (21,36,38)  group by t2.`hour_bin` order by count_hour_bin desc").show(6)
# most common time is 3rd slot, that is morning 8-12 most violations are happening.

+--------+--------------+
|hour_bin|count_hour_bin|
+--------+--------------+
|       3|       2280616|
|       4|       1199158|
|       5|        230641|
|       2|        155705|
|       1|         54052|
|       6|         47392|
+--------+--------------+



#### 6.1. First, divide the year into a certain number of seasons, and find the frequencies of tickets for each season.

In [54]:
#Assumptios
# there will be four seasons in a year
#1. Spring - runs from March 1 to May 31;
#2. Summer - runs from June 1 to August 31;
#3. Autumn - runs from September 1 to November 30
#4. Winter - runs from December 1 to February 28 (February 29 in a leap year).
# we will negelect the time to calculate season. we will only consider day and month to calculate season
# Issue data is in yyyy-MM-dd format

spark.sql("select `Issue Date`, from_unixtime(unix_timestamp(`Issue Date`,'yyyy-MM-dd')) from imputed_table where `Issue Date` is null ").show(5)

spark.sql("select `Issue Date`, from_unixtime(unix_timestamp(`Issue Date`,'yyyy-MM-dd')) from  imputed_table  ").show(5)

# we can see there is no null dates and spark has inferred the `Issue Date` as date correctly


+----------+--------------------------------------------------------------------------+
|Issue Date|from_unixtime(unix_timestamp(Issue Date, yyyy-MM-dd), yyyy-MM-dd HH:mm:ss)|
+----------+--------------------------------------------------------------------------+
+----------+--------------------------------------------------------------------------+

+-------------------+--------------------------------------------------------------------------+
|         Issue Date|from_unixtime(unix_timestamp(Issue Date, yyyy-MM-dd), yyyy-MM-dd HH:mm:ss)|
+-------------------+--------------------------------------------------------------------------+
|2016-07-10 00:00:00|                                                       2016-07-10 00:00:00|
|2016-07-08 00:00:00|                                                       2016-07-08 00:00:00|
|2016-08-23 00:00:00|                                                       2016-08-23 00:00:00|
|2017-06-14 00:00:00|                                            

In [55]:
#finding ticket count for each season
spark.sql(" select t2.season,count(t2.season) as ticket_count_per_season from \
    (select *,\
    case \
    when (t1.month in (3,4,5)) then 'Spring' \
    when (t1.month in (6,7,8)) then 'Summer' \
    when (t1.month in (9,10,11)) then 'Autumn' \
    when (t1.month in (12,1,2)) then 'Winter' \
    else null \
    end as season \
    from \
(select *, month(from_unixtime(unix_timestamp(`Issue Date`,'yyyy-MM-dd'))) as month \
from  imputed_table) t1 ) t2 group by season order by ticket_count_per_season desc").show(5)

+------+-----------------------+
|season|ticket_count_per_season|
+------+-----------------------+
|Spring|                2880687|
|Autumn|                2830802|
|Summer|                2606208|
|Winter|                2485331|
+------+-----------------------+



In [56]:
# Verification step to use season count as count the tickets
spark.sql("select month(from_unixtime(unix_timestamp(`Issue Date`,'yyyy-MM-dd'))) as month \
from  imputed_table  where `Issue Date` is null ").show(5)
# we check that there is no null for month so we can count season (which is derived from month) as no of ticketes

+-----+
|month|
+-----+
+-----+



#### 6.2 Then, find the three most common violations for each of these seasons

In [57]:
#three most common violations for Spring
spark.sql(" select t2.`Violation Code`,count(t2.`Violation Code`) as count_violation_code from \
    (select *,\
    case \
    when (t1.month in (3,4,5)) then 'Spring' \
    when (t1.month in (6,7,8)) then 'Summer' \
    when (t1.month in (9,10,11)) then 'Autumn' \
    when (t1.month in (12,1,2)) then 'Winter' \
    else null \
    end as season \
    from \
(select *, month(from_unixtime(unix_timestamp(`Issue Date`,'yyyy-MM-dd'))) as month \
from  imputed_table) t1 ) t2 where season='Spring' group by `Violation Code` \
order by count_violation_code desc").show(3)
#Most common violation for spring is 21,36,38

+--------------+--------------------+
|Violation Code|count_violation_code|
+--------------+--------------------+
|            21|              402807|
|            36|              344834|
|            38|              271192|
+--------------+--------------------+
only showing top 3 rows



In [58]:
#three most common violations for Autumn
spark.sql(" select t2.`Violation Code`,count(t2.`Violation Code`) as count_violation_code from \
    (select *,\
    case \
    when (t1.month in (3,4,5)) then 'Spring' \
    when (t1.month in (6,7,8)) then 'Summer' \
    when (t1.month in (9,10,11)) then 'Autumn' \
    when (t1.month in (12,1,2)) then 'Winter' \
    else null \
    end as season \
    from \
(select *, month(from_unixtime(unix_timestamp(`Issue Date`,'yyyy-MM-dd'))) as month \
from  imputed_table) t1 ) t2 where season='Autumn' group by `Violation Code` \
order by count_violation_code desc").show(3)
#Most common violation for Autumn is 36,21,38

+--------------+--------------------+
|Violation Code|count_violation_code|
+--------------+--------------------+
|            36|              456046|
|            21|              357479|
|            38|              283828|
+--------------+--------------------+
only showing top 3 rows



In [59]:
#three most common violations for Summer
spark.sql(" select t2.`Violation Code`,count(t2.`Violation Code`) as count_violation_code from \
    (select *,\
    case \
    when (t1.month in (3,4,5)) then 'Spring' \
    when (t1.month in (6,7,8)) then 'Summer' \
    when (t1.month in (9,10,11)) then 'Autumn' \
    when (t1.month in (12,1,2)) then 'Winter' \
    else null \
    end as season \
    from \
(select *, month(from_unixtime(unix_timestamp(`Issue Date`,'yyyy-MM-dd'))) as month \
from  imputed_table) t1 ) t2 where season='Summer' group by `Violation Code` \
order by count_violation_code desc").show(3)
#Most common violation for Summer is 21,38,36

+--------------+--------------------+
|Violation Code|count_violation_code|
+--------------+--------------------+
|            21|              405961|
|            38|              247561|
|            36|              240396|
+--------------+--------------------+
only showing top 3 rows



In [52]:
#three most common violations for Winter
spark.sql(" select t2.`Violation Code`,count(t2.`Violation Code`) as count_violation_code from \
    (select *,\
    case \
    when (t1.month in (3,4,5)) then 'Spring' \
    when (t1.month in (6,7,8)) then 'Summer' \
    when (t1.month in (9,10,11)) then 'Autumn' \
    when (t1.month in (12,1,2)) then 'Winter' \
    else null \
    end as season \
    from \
(select *, month(from_unixtime(unix_timestamp(`Issue Date`,'yyyy-MM-dd'))) as month \
from  imputed_table) t1 ) t2 where season='Winter' group by `Violation Code` \
order by count_violation_code desc").show(3)
#Most common violation for spring is 21,36,38

+--------------+--------------------+
|Violation Code|count_violation_code|
+--------------+--------------------+
|            21|              362341|
|            36|              359338|
|            38|              259723|
+--------------+--------------------+
only showing top 3 rows



### 7. The fines collected from all the instances of parking violation constitute a source of revenue for the NYC Police Department. Let’s take an example of estimating this for the three most commonly occurring codes:
   #### 7.1 Find the total occurrences of the three most common violation codes.

In [60]:
# finding three most common vioaltions

spark.sql("select `Violation Code`,count(`Violation Code`) as count_violation_code \
from  imputed_table  group by `Violation Code` \
order by count_violation_code desc").show(5)

# so three most common violation codes are - 21, 36, 38

+--------------+--------------------+
|Violation Code|count_violation_code|
+--------------+--------------------+
|            21|             1528588|
|            36|             1400614|
|            38|             1062304|
|            14|              893498|
|            20|              618593|
+--------------+--------------------+
only showing top 5 rows



In [61]:
# As per the question info - Charges given for top 3 violation codes are as follows
#code 21 - charges((65+45)/2=55)
#code 36 - charges((50+50)/2=50)
#code 38 - charges((65+35)/2=50)

spark.sql("select t.`Violation Code`,t.count_violation_code*t.charges as Revenue from \
( select `Violation Code`,count(`Violation Code`) as count_violation_code, \
case \
when (`Violation Code`=21) then 55 \
when (`Violation Code`=36) then 50 \
when (`Violation Code`=38) then 50 \
else null \
end as charges \
from  imputed_table  group by `Violation Code` \
order by count_violation_code desc) t").show(3)

# So we see that Violaton code 21 has highest total collection which is 84072340 dollars.

+--------------+--------+
|Violation Code| Revenue|
+--------------+--------+
|            21|84072340|
|            36|70030700|
|            38|53115200|
+--------------+--------+
only showing top 3 rows



In [62]:
#Stooping the spark session to release the resources
spark.stop()