In [1]:
#Creating Spark Session Driver Program
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName(name= "NycTicket_App").getOrCreate()

In [2]:
#Loading Data in dataframe
df = spark.read.format("csv").option(
                "inferSchema","True").option(
                "header","True").load("/common_folder/nyc_parking/Parking_Violations_Issued_-_Fiscal_Year_2017.csv")

df.printSchema()

root
 |-- Summons Number: long (nullable = true)
 |-- Plate ID: string (nullable = true)
 |-- Registration State: string (nullable = true)
 |-- Issue Date: timestamp (nullable = true)
 |-- Violation Code: integer (nullable = true)
 |-- Vehicle Body Type: string (nullable = true)
 |-- Vehicle Make: string (nullable = true)
 |-- Violation Precinct: integer (nullable = true)
 |-- Issuer Precinct: integer (nullable = true)
 |-- Violation Time: string (nullable = true)



### Data Cleaning

In [3]:
#filter dataframe to get only 2017 finacial year data as described in Problem statement
from pyspark.sql.functions import year

df = df.filter(year("Issue Date") == 2017)


In [44]:
# Count null columns
from pyspark.sql.functions import isnan, when, count, col
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()


+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|Summons Number|Plate ID|Registration State|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|             0|       0|                 0|         0|             0|                0|           0|                 0|              0|             0|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+



As you see there's no null value across all columns, we can proceed with Data Analysis now.

### Examine the data

In [45]:
#1.	Find the total number of tickets for the year.
from pyspark.sql.functions import countDistinct
df.agg(countDistinct("Summons Number")).show()

+------------------------------+
|count(DISTINCT Summons Number)|
+------------------------------+
|                       5431918|
+------------------------------+



In [46]:
#2.	Find out the number of unique states from where the cars that got parking tickets came.
from pyspark.sql.functions import desc,col
df.groupBy("Registration State").agg(
   count("Registration State").alias("Total_Tickets_Frequency")
    ).sort(desc("Total_Tickets_Frequency")).show(5)

+------------------+-----------------------+
|Registration State|Total_Tickets_Frequency|
+------------------+-----------------------+
|                NY|                4273951|
|                NJ|                 475825|
|                PA|                 140286|
|                CT|                  70403|
|                FL|                  69468|
+------------------+-----------------------+
only showing top 5 rows



In [47]:
#There is a numeric entry '99' in the column, which should be corrected. Replace it with the state having the maximum entries.
# it is NY in our case.
from pyspark.sql.functions import when

df = df.withColumn("Registration State",when(df["Registration State"] == "99","NY").otherwise(df["Registration State"]))

In [48]:
#Recalculate number of unique states from where the cars that got parking tickets came.
df.groupBy("Registration State").agg(
   count("Registration State").alias("Total_Tickets_Frequency")
    ).sort(desc("Total_Tickets_Frequency")).show(5)

+------------------+-----------------------+
|Registration State|Total_Tickets_Frequency|
+------------------+-----------------------+
|                NY|                4290006|
|                NJ|                 475825|
|                PA|                 140286|
|                CT|                  70403|
|                FL|                  69468|
+------------------+-----------------------+
only showing top 5 rows



### Aggregation tasks

1.	How often does each violation code occur? Display the frequency of the top five violation codes.

In [56]:
df.groupBy("Violation Code").agg(
    count("Violation Code").alias("ViolationCode_Frequency")
    ).sort(desc("ViolationCode_Frequency")).show(5)

+--------------+-----------------------+
|Violation Code|ViolationCode_Frequency|
+--------------+-----------------------+
|            21|                 768087|
|            36|                 662765|
|            38|                 542079|
|            14|                 476664|
|            20|                 319646|
+--------------+-----------------------+
only showing top 5 rows



2.	How often does each 'vehicle body type' get a parking ticket? How about the 'vehicle make'? 

In [57]:
#2.	How often does each 'vehicle body type' get a parking ticket?
#(Hint: Find the top 5 for both.)
df.groupBy("Vehicle Body Type").agg(
    count("Vehicle Body Type").alias("VehicleBody_Frequency")
    ).sort(desc("VehicleBody_Frequency")).show(5)

#How about the 'vehicle make'? 
df.groupBy("Vehicle Make").agg(
    count("Vehicle Make").alias("VehicleMake_Frequency")
    ).sort(desc("VehicleMake_Frequency")).show(5)

+-----------------+---------------------+
|Vehicle Body Type|VehicleBody_Frequency|
+-----------------+---------------------+
|             SUBN|              1883954|
|             4DSD|              1547312|
|              VAN|               724029|
|             DELV|               358984|
|              SDN|               194197|
+-----------------+---------------------+
only showing top 5 rows

+------------+---------------------+
|Vehicle Make|VehicleMake_Frequency|
+------------+---------------------+
|        FORD|               636844|
|       TOYOT|               605291|
|       HONDA|               538884|
|       NISSA|               462017|
|       CHEVR|               356032|
+------------+---------------------+
only showing top 5 rows



3.	A precinct is a police station that has a certain zone of the city under its command. Find the (5 highest) frequencies of tickets for each of the following:

i)'Violation Precinct' (This is the precinct of the zone where the violation occurred).Using this, can you draw any insights for parking violations in any specific area of the city?

In [60]:
#1.	'Violation Precinct' (This is the precinct of the zone where the violation occurred).
# Using this, can you draw any insights for parking violations in any specific area of the city?
df.filter(col("Issuer Precinct") != 0
        ).groupBy("Violation Precinct","Registration State").agg(
        count("Violation Precinct").alias("Violation_Frequency")
        ).sort(desc("Violation_Frequency")).show(5)

+------------------+------------------+-------------------+
|Violation Precinct|Registration State|Violation_Frequency|
+------------------+------------------+-------------------+
|                19|                NY|             211375|
|                14|                NY|             133169|
|               114|                NY|             122394|
|                 1|                NY|             120535|
|                18|                NY|             110121|
+------------------+------------------+-------------------+
only showing top 5 rows



 Violation Frequencies are high in 19 Precinct Area

ii)'Issuer Precinct' (This is the precinct that issued the ticket.)
Here, you would have noticed that the dataframe has the'Violating Precinct' or 'Issuing Precinct' as '0'.
These are erroneous entries. Hence, you need to provide records for five correct precincts.

In [61]:
#2.	'Issuer Precinct' (This is the precinct that issued the ticket.)
#Here, you would have noticed that the dataframe has the'Violating Precinct' or 'Issuing Precinct' as '0'.
#These are erroneous entries. Hence, you need to provide records for five correct precincts.

df.filter(col("Issuer Precinct") != 0
         ).groupBy("Issuer Precinct","Registration State").agg(
        count("Issuer Precinct").alias("Violation_Frequency")
        ).sort(desc("Violation_Frequency")).show(5)

+---------------+------------------+-------------------+
|Issuer Precinct|Registration State|Violation_Frequency|
+---------------+------------------+-------------------+
|             19|                NY|             206463|
|             14|                NY|             131462|
|            114|                NY|             121652|
|              1|                NY|             120077|
|             18|                NY|             106446|
+---------------+------------------+-------------------+
only showing top 5 rows



4.	Find the violation code frequency for three precincts that have issued the most number of tickets. Do these precinct zones have an exceptionally high frequency of certain violation codes? Are these codes common across precincts? 

In [53]:
df.filter(col("Violation Precinct") ==19 ).groupBy("Violation Code").agg(
    count("Violation Code").alias("ViolationCode_Frequency")
    ).sort(desc("ViolationCode_Frequency")).show(5)

df.filter(col("Violation Precinct") ==14 ).groupBy("Violation Code").agg(
    count("Violation Code").alias("ViolationCode_Frequency")
    ).sort(desc("ViolationCode_Frequency")).show(5)

df.filter(col("Violation Precinct") ==114 ).groupBy("Violation Code").agg(
    count("Violation Code").alias("ViolationCode_Frequency")
    ).sort(desc("ViolationCode_Frequency")).show(5)

+--------------+-----------------------+
|Violation Code|ViolationCode_Frequency|
+--------------+-----------------------+
|            46|                  50785|
|            38|                  37483|
|            37|                  36468|
|            14|                  30376|
|            21|                  29415|
+--------------+-----------------------+
only showing top 5 rows

+--------------+-----------------------+
|Violation Code|ViolationCode_Frequency|
+--------------+-----------------------+
|            14|                  45885|
|            69|                  30465|
|            31|                  22649|
|            47|                  18691|
|            42|                  10027|
+--------------+-----------------------+
only showing top 5 rows

+--------------+-----------------------+
|Violation Code|ViolationCode_Frequency|
+--------------+-----------------------+
|            21|                  35317|
|            38|                  27123|
|      

In [54]:
# Py SQL Method :
df.registerTempTable("violation_frequency_table")
temp_table = spark.sql(
    "select `Violation Code`,`Violation Precinct`, count(`Violation Code`) as violationCode_Frequency" 
    +" from violation_frequency_table GROUP BY  `Violation Code`, `Violation Precinct` order by violationCode_Frequency desc"
)
temp_table.where('`Violation Precinct`= 19 or `Violation Precinct`= 14 or `Violation Precinct`= 114').show(30)

+--------------+------------------+-----------------------+
|Violation Code|Violation Precinct|violationCode_Frequency|
+--------------+------------------+-----------------------+
|            46|                19|                  50785|
|            14|                14|                  45885|
|            38|                19|                  37483|
|            37|                19|                  36468|
|            21|               114|                  35317|
|            69|                14|                  30465|
|            14|                19|                  30376|
|            21|                19|                  29415|
|            38|               114|                  27123|
|            31|                14|                  22649|
|            47|                14|                  18691|
|            37|               114|                  18636|
|            20|                19|                  15132|
|            20|               114|     

precinct zones 19 & 14 have extreamly high frequencies for violation code 46 & 14. Violation Code 14 is common across precincts

### 5.	Find out the properties of parking violations across different times of the day:

i)	Find a way to deal with missing values, if any.
(Hint: Check for the null values using 'isNull' under the SQL. Also, to remove the null values, check the 'dropna' command in the API documentation.)


In [62]:
#Drop records with null value
df = df.na.drop()
df.count()

5431918

ii)	The Violation Time field is specified in a strange format. Find a way to make this a time attribute that you can use to divide into groups

In [63]:
df.registerTempTable("violation_Time_tbl")

formatted_timeTBL= spark.sql("Select `Violation Code`, Case"
+" When `Violation Time` like '%A' THEN" 
+" to_timestamp(REPLACE(CONCAT(SUBSTRING(`Violation Time`,1,2),':',SUBSTRING(`Violation Time`,3)),'A',' AM'),'h:mm a')"
+" When `Violation Time` like '%P' THEN"
+" to_timestamp(REPLACE(CONCAT(SUBSTRING(`Violation Time`,1,2),':',SUBSTRING(`Violation Time`,3)),'P',' PM'),'h:mm a')"
+" END AS formatted_Time FROM violation_Time_tbl")


In [64]:
# Data Cleaning to have only 24 hrs time
from pyspark.sql.functions import hour, col
formatted_timeTBL = formatted_timeTBL.filter(hour("formatted_Time")<24)
formatted_timeTBL.show(3)

+--------------+-------------------+
|Violation Code|     formatted_Time|
+--------------+-------------------+
|            47|1970-01-01 11:20:00|
|             7|1970-01-01 20:52:00|
|            40|1970-01-01 05:25:00|
+--------------+-------------------+
only showing top 3 rows



ii) Divide 24 hours into six equal discrete bins of time. Choose the intervals as you see fit. For each of these groups, find the three most commonly occurring violations.

In [65]:
formatted_timeTBL.registerTempTable("bucketing_table")

bucketing_table =  spark.sql("SELECT `Violation Code`, CASE"
     +" WHEN EXTRACT(HOUR FROM formatted_Time) BETWEEN 0 AND 6 THEN '0-6'"
     +" WHEN EXTRACT(HOUR FROM formatted_Time) BETWEEN 7 AND 12 THEN '7-12'"
     +" WHEN EXTRACT(HOUR FROM formatted_Time) BETWEEN 13 AND 18 THEN '13-18'"
     +" WHEN EXTRACT(HOUR FROM formatted_Time) BETWEEN 19 AND 24 THEN '19-24'"
     +" END AS time_Interval from bucketing_table")

In [66]:
from pyspark.sql.functions import count, desc
bucketing_table.groupBy("time_Interval","Violation Code").agg(
                count("Violation Code").alias("ViolationCode_Frequency")
                ).select(col("Violation Code"), col("time_Interval"),
                 col("ViolationCode_Frequency")
                ).sort(desc("ViolationCode_Frequency")).show(5)

+--------------+-------------+-----------------------+
|Violation Code|time_Interval|ViolationCode_Frequency|
+--------------+-------------+-----------------------+
|            21|         7-12|                 723254|
|            36|         7-12|                 464933|
|            38|        13-18|                 286253|
|            38|         7-12|                 233585|
|            14|         7-12|                 227728|
+--------------+-------------+-----------------------+
only showing top 5 rows



iii) Now, try another direction. For the three most commonly occurring violation codes, find the most common time of the day (in terms of the bins from the previous part).

In [67]:
bucketing_table.registerTempTable("commontime_table")
commontime_table = spark.sql(
    "select `Violation Code`,time_Interval, count(time_Interval) as timeInterval_Frequency" 
    +" from commontime_table GROUP BY  time_Interval, `Violation Code` order by time_Interval desc"
)
commontime_table.where('`Violation Code`= 21 or `Violation Code`= 36 or `Violation Code`= 38').show(3)

+--------------+-------------+----------------------+
|Violation Code|time_Interval|timeInterval_Frequency|
+--------------+-------------+----------------------+
|            38|         7-12|                233585|
|            36|         7-12|                464933|
|            21|         7-12|                723254|
+--------------+-------------+----------------------+
only showing top 3 rows



### 6.	Let’s try and find some seasonality in this data:

i)	First, divide the year into a certain number of seasons, and find frequencies of tickets for each season. (Hint: Use Issue Date to segregate into seasons.)

In [68]:
df.registerTempTable('seasonality_table')

spark.sql("SELECT CASE"
    +" WHEN MONTH(`Issue Date`) in (3,4,5) THEN 'SPRING'"
    +" WHEN MONTH(`Issue Date`) in (6,7,8) THEN 'SUMMER'"
    +" ELSE 'WINTER'"
    +" END AS Season , COUNT(DISTINCT `Summons Number`) AS ticket_frequency"      
    +" FROM seasonality_table GROUP BY Season ORDER BY ticket_frequency DESC").show()

+------+----------------+
|Season|ticket_frequency|
+------+----------------+
|SPRING|         2873383|
|WINTER|         1705669|
|SUMMER|          852866|
+------+----------------+



ii) Find the three most common violations for each of these seasons.

In [69]:
spark.sql("SELECT CASE"
    +" WHEN MONTH(`Issue Date`) in (3,4,5) THEN 'SPRING'"
    +" WHEN MONTH(`Issue Date`) in (6,7,8) THEN 'SUMMER'"
    +" ELSE 'WINTER'"
    +" END AS Season ,`Violation Code`, COUNT(`Violation Code`) AS violation_frequency"      
    +" FROM seasonality_table GROUP BY Season,`Violation Code` ORDER BY violation_frequency DESC"
         ).show()

+------+--------------+-------------------+
|Season|Violation Code|violation_frequency|
+------+--------------+-------------------+
|SPRING|            21|             402424|
|SPRING|            36|             344834|
|SPRING|            38|             271167|
|SPRING|            14|             256397|
|WINTER|            21|             238311|
|WINTER|            36|             221268|
|WINTER|            38|             187394|
|SPRING|            46|             173440|
|SPRING|            20|             157122|
|SPRING|            37|             151049|
|SPRING|            40|             147408|
|WINTER|            14|             142356|
|SPRING|            71|             135351|
|SUMMER|            21|             127352|
|SPRING|             7|             122644|
|WINTER|            20|              98050|
|WINTER|            37|              97814|
|SUMMER|            36|              96663|
|WINTER|            46|              96222|
|WINTER|            40|         

The most common violation code is 21

#### 7. The fines collected from all instances of parking violation constitute a source of revenue for the NYC Police Department. Let’s take an example of estimating that for the three most commonly occurring codes.

7.1 Find the total occurrences of the three most common violation codes.

In [70]:
df.groupBy("Violation Code").agg(
    count("Violation Code").alias("ViolationCode_Frequency")
    ).sort(desc("ViolationCode_Frequency")).show(3)

+--------------+-----------------------+
|Violation Code|ViolationCode_Frequency|
+--------------+-----------------------+
|            21|                 768087|
|            36|                 662765|
|            38|                 542079|
+--------------+-----------------------+
only showing top 3 rows



7.2 visit the website:
http://www1.nyc.gov/site/finance/vehicles/services-violation-codes.page
It lists the fines associated with different violation codes. They’re divided into two categories: one for the highest-density locations in the city and the other for the rest of the city. For the sake of simplicity, take the average of the two.


7.3 Using this information, find the total amount collected for the three violation codes with the maximum tickets. State the code that has the highest total collection.

In [71]:
from pyspark.sql.functions import sum
df.registerTempTable('total_Collection_table')

total_amountTBL=spark.sql("SELECT `Violation Code`,CASE"
    +" WHEN  `Violation Code` = 21 THEN 86.5"
    +" WHEN  `Violation Code` = 36 THEN 50"
    +" WHEN  `Violation Code` = 38 THEN 82.5"
    +" END AS ViolationAmount, COUNT(`Violation Code`) as ViolationCode_Frequency"
    +" FROM total_Collection_table"
    +" GROUP BY `Violation Code` ORDER BY ViolationCode_Frequency DESC LIMIT 3"
)

total_amountTBL.select(
    "Violation Code","ViolationAmount","ViolationCode_Frequency",
    (col("ViolationAmount") * col("ViolationCode_Frequency")).alias("TotalAmountCollected")
    ).show()


+--------------+---------------+-----------------------+--------------------+
|Violation Code|ViolationAmount|ViolationCode_Frequency|TotalAmountCollected|
+--------------+---------------+-----------------------+--------------------+
|            21|           86.5|                 768087|          66439525.5|
|            36|           50.0|                 662765|          33138250.0|
|            38|           82.5|                 542079|          44721517.5|
+--------------+---------------+-----------------------+--------------------+



Violation Code 21 has maximum collection of 66439525.5

In [5]:
spark.stop()