In [87]:
from pyspark.sql import SparkSession

# Obtain the Spark session for this notebook (getOrCreate reuses an existing
# session if there is one); the application name is "NYC Parking".
spark = SparkSession.builder.appName("NYC Parking").getOrCreate()

In [88]:
spark

In [89]:
# Load the FY2017 parking-violations CSV, using the first row as column names.
# No schema is supplied and inferSchema is not enabled, so every column is read
# as a string (visible in the DataFrame repr printed by the bare `NYC` below).
NYC = spark.read.format("csv").option("header", "true").load("/common_folder/nyc_parking/Parking_Violations_Issued_-_Fiscal_Year_2017.csv")
NYC

DataFrame[Summons Number: string, Plate ID: string, Registration State: string, Issue Date: string, Violation Code: string, Vehicle Body Type: string, Vehicle Make: string, Violation Precinct: string, Issuer Precinct: string, Violation Time: string]

In [90]:
NYC.show(5)

+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|Summons Number|Plate ID|Registration State|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|    5092469481| GZH7067|                NY|2016-07-10|             7|             SUBN|       TOYOT|                 0|              0|         0143A|
|    5092451658| GZH7067|                NY|2016-07-08|             7|             SUBN|       TOYOT|                 0|              0|         0400P|
|    4006265037| FZX9232|                NY|2016-08-23|             5|             SUBN|        FORD|                 0|              0|         0233P|
|    8478629828| 66623ME|                NY|2017-06-14|            47|             REFG|

In [91]:
# summary statistics
# NOTE: all columns are strings at this point, so the mean/stddev rows come from
# implicit numeric casts and are not meaningful for identifier or text columns
# (e.g. the "Infinity"/"NaN" reported for Plate ID).
NYC.describe().show()

+-------+-------------------+--------+------------------+----------+------------------+------------------+------------------+------------------+-----------------+-----------------+
|summary|     Summons Number|Plate ID|Registration State|Issue Date|    Violation Code| Vehicle Body Type|      Vehicle Make|Violation Precinct|  Issuer Precinct|   Violation Time|
+-------+-------------------+--------+------------------+----------+------------------+------------------+------------------+------------------+-----------------+-----------------+
|  count|           10803028|10803028|          10803028|  10803028|          10803028|          10803028|          10803028|          10803028|         10803028|         10803028|
|   mean| 6.81744702906579E9|Infinity|              99.0|      null|34.599430455979565|3.9258887134586864| 6519.974025974026| 45.01216260848347|46.82931211508477|909.2857142857143|
| stddev|2.320233962328227E9|     NaN|               0.0|      null|19.359868716323483|0.501341

In [92]:
# datatype of columns
NYC.printSchema()

root
 |-- Summons Number: string (nullable = true)
 |-- Plate ID: string (nullable = true)
 |-- Registration State: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Violation Code: string (nullable = true)
 |-- Vehicle Body Type: string (nullable = true)
 |-- Vehicle Make: string (nullable = true)
 |-- Violation Precinct: string (nullable = true)
 |-- Issuer Precinct: string (nullable = true)
 |-- Violation Time: string (nullable = true)



In [93]:
# Number of Rows
NYC.count()

10803028

In [94]:
# Number of Columns
len(NYC.columns)

10

In [95]:
# Drop duplicate values
# dropDuplicates() is called without a column subset, so only rows that are
# identical across every column are removed; the count shows how many remain.
NYC = NYC.dropDuplicates()
NYC.count()

10803028

In [96]:
# If Any, drop null values
# dropna() with its default arguments drops a row when any column is null;
# the count shows how many rows remain afterwards.
NYC = NYC.dropna()
NYC.count()

10803028

In [97]:
# Total Number of summons
NYC.select('Summons Number').distinct().count()

10803028

In [98]:
# Normalise the column names: every space becomes an underscore
# (e.g. "Summons Number" -> "Summons_Number"), then preview the result.
underscored_names = [column_name.replace(' ', '_') for column_name in NYC.columns]
NYC = NYC.toDF(*underscored_names)
NYC.show(5)

+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|Summons_Number|Plate_ID|Registration_State|Issue_Date|Violation_Code|Vehicle_Body_Type|Vehicle_Make|Violation_Precinct|Issuer_Precinct|Violation_Time|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|    5093739601|   9L43B|                NY|2016-10-03|             7|             TAXI|       NISSA|                 0|              0|         0100A|
|    4632064841| GZJ3137|                NY|2017-03-24|            36|             4DSD|       DODGE|                 0|              0|         0100P|
|    4007108080|  S78CKS|                NJ|2017-03-29|             5|             4 DR|       NISSA|                 0|              0|         0100P|
|    4632094092|  V43GPX|                NJ|2017-03-24|            36|             4 DR|

In [99]:
# Create temporary table view
# Registers the current NYC DataFrame under the name "NYC_parkingTable" so the
# spark.sql(...) queries below can refer to it.
NYC.createOrReplaceTempView("NYC_parkingTable")

In [100]:
spark.sql('Select * from NYC_parkingTable')

DataFrame[Summons_Number: string, Plate_ID: string, Registration_State: string, Issue_Date: string, Violation_Code: string, Vehicle_Body_Type: string, Vehicle_Make: string, Violation_Precinct: string, Issuer_Precinct: string, Violation_Time: string]

In [None]:
spark.sql('Select * from NYC_parkingTable').show(5)

In [None]:
# Total Number of tickets for each year
# Issue_Date is a string column, so Year() relies on Spark casting it to a date.
# One row per issue year, newest year first; up to 50 rows are displayed.
year_wise_data =  spark.sql('Select Year(Issue_date) as year, count(Summons_Number) as Number_Of_Tickets from NYC_parkingTable group by year order by year desc')
year_wise_data.show(50)

### Summary : 
    - Data is from 1972 to 2069.
    - It is concentrated around 2016 & 2017.
    - We will consider data for 2017 only.

In [None]:
year_wise_data.count()

### We need data for 2017 Only. 
Filter the data for 2017 only

In [None]:
# Filtering data for 2017
# Rebinds NYC to the rows whose Issue_Date falls in 2017, so every later cell
# that uses NYC works on this subset; the count is the number of 2017 tickets.
NYC = spark.sql("select * from NYC_parkingTable where Year(Issue_Date) = 2017 ")
NYC.count()

In [None]:
NYC.show(5)

In [None]:
# Register the 2017-only DataFrame as the "NYC_2017" view used by later queries
NYC.createOrReplaceTempView("NYC_2017")

# Ticket count per (year, month), sorted so the busiest month comes first
Distribution_on_years= spark.sql("""
                                 SELECT year(Issue_Date) as year,month(Issue_Date) as month,count(*) as Ticket_Frequency
                                 FROM NYC_2017 GROUP BY year(Issue_Date),month(Issue_Date) order by Ticket_Frequency desc
                                 """)
Distribution_on_years.show()

#### Maximum number of violations are in the month of May. It has been observed that from July to December, there is a significant drop in number of violations.

In [None]:
Number_of_Violations_by_month = Distribution_on_years.toPandas()

In [None]:
import pandas as pd
import matplotlib.pyplot as plt

# Bar chart of 2017 tickets per month.
# The source query orders rows by Ticket_Frequency, so sort by month here to
# get a chronological x-axis instead of a frequency-ranked one.
monthly_counts = Number_of_Violations_by_month.sort_values('month')

# Draw on an explicit Axes. Calling plt.clf() first (as before) creates an empty
# current figure, and DataFrame.plot then opens a second figure of its own when
# no `ax` is passed, which leaves a stray blank figure in the cell output.
fig, ax = plt.subplots()
monthly_counts.plot(x='month', y='Ticket_Frequency', kind='bar', ax=ax)
ax.set_title("Violations on the basis of month in 2017")
ax.set_xlabel('month')
ax.set_ylabel('Ticket_Frequency')
plt.show()

In [None]:
# Count the 2017 rows in which at least one of the ten columns is NULL.
# A result of 0 means no column contains a null value.
Checking_null_values=spark.sql("""Select count(*) as Number_of_Null_Values from NYC_2017 
                                  where Summons_Number is NULL or Plate_ID is NULL 
                                  or Registration_State is NULL or Issue_Date is NULL 
                                  or Violation_Code is NULL 
                                  or Vehicle_Body_Type is NULL 
                                  or Vehicle_Make is NULL 
                                  or Violation_Precinct is NULL 
                                  or Issuer_Precinct is NUll 
                                  or Violation_Time is NULL """)
Checking_null_values.show()

**There is no field with null value.**

In [None]:
# Check the Plate_ID field for plates that appear on more than one ticket:
# one row per plate with at least two tickets, most-ticketed plate first.

Plate_Id_Check=spark.sql("""Select Plate_ID, count(*) as Ticket_Frequency 
                            from NYC_2017 group by Plate_ID having count(*)>1 order by Ticket_Frequency desc""")
Plate_Id_Check.show()

**There is one value, 'BLANKPLATE', which we cannot track. Therefore, we can remove it.**

In [None]:
# Keep only the tickets whose plate is not the placeholder 'BLANKPLATE',
# then report how many rows are left.
has_real_plate = NYC.Plate_ID != 'BLANKPLATE'
NYC = NYC.filter(has_real_plate)
NYC.count()

In [None]:
# Update the view
NYC.createOrReplaceTempView("NYC_2017")

In [None]:
# Plates with at least 500 tickets (the HAVING clause uses >= 500, so a plate
# with exactly 500 tickets is included), most-ticketed plate first.
Plate_Id_Above_500=spark.sql("""Select Plate_ID, count(*) as Ticket_Frequency from NYC_2017 
                                group by Plate_ID having count(*)>=500 order by Ticket_Frequency desc""")
Plate_Id_Above_500.show()

In [None]:
# Plot the ticket counts of the plates returned by the ">= 500 tickets" query.
Number_of_Violations_By_PlateID=Plate_Id_Above_500.toPandas()

# Draw on an explicit Axes. Calling plt.clf() first (as before) creates an empty
# current figure, and DataFrame.plot then opens a second figure of its own when
# no `ax` is passed, which leaves a stray blank figure in the cell output.
fig, ax = plt.subplots()
Number_of_Violations_By_PlateID.plot(x='Plate_ID', y='Ticket_Frequency', kind='bar', ax=ax)
ax.set_title("Number of Violations above 500 ")
ax.set_xlabel('Plate_ID')
ax.set_ylabel('Ticket_Frequency')
plt.show()

**There are 7 Plate IDs with 500 or more violations.**

###  Questions to Be Answered in the Analysis

The following analysis should be performed on PySpark mounted on your CoreStack cluster, using the PySpark library. Remember that you need to summarise the analysis with your insights along with the code

### Examine the data

#### Q1. Find the total number of tickets for the year

In [None]:
spark.sql('select count(*) as Number_Of_Tickets_for_Year from NYC_2017').show()

**Q2. Find out the number of unique states from where the cars that got parking tickets came.**

In [None]:
# One row per Registration_state with its ticket count (ascending by count).
# Because there is one row per state, q2.count() is the number of distinct
# registration states.
q2 = spark.sql("""
          SELECT Registration_state, count(*) as Number_of_records 
          FROM NYC_2017
          group by Registration_state
          order by Number_of_records
          """)
q2.count()

In [None]:
q2.show(65)

- There is a state which contains "99" as value. We will replace this with the state which has maximum number of tickets.
- NY is the state with Max number of tickets.
- Replacing "99" with "NY".

In [None]:
from pyspark.sql.functions import when,lit

# Overwrite Registration_State: the placeholder value "99" becomes 'NY',
# every other value is carried over unchanged.
state_column = NYC["Registration_State"]
state_without_99 = when(state_column == "99", lit('NY')).otherwise(state_column)
NYC = NYC.withColumn('Registration_State', state_without_99)

In [None]:
## Update the NYC_2017 Table
NYC.createOrReplaceTempView("NYC_2017")

In [None]:
# Same per-state aggregation as before, re-run against the refreshed NYC_2017
# view; q2.count() is again the number of distinct registration states.
q2 = spark.sql("""
          SELECT Registration_state, count(*) as Number_of_records 
          FROM NYC_2017
          group by Registration_state
          order by Number_of_records
          """)
q2.count()

The unique registration states are now 64. "99" has been replaced with "NY" 

### Aggregation tasks

#### Q1. How often does each violation code occur? Display the frequency of the top five violation codes

In [None]:
## Violation code count
# Number of distinct Violation_Code values in the 2017 data.
# (Only countDistinct is used in this cell; count and desc are imported but unused here.)
from pyspark.sql.functions import count,desc,countDistinct
NYC.select(countDistinct("Violation_Code")).show()

In [None]:
## Frequency of occurrence of each violation code
# One row per Violation_code with its ticket count, most frequent first;
# up to 100 rows are displayed.
spark.sql("""
          SELECT Violation_code, count(*) as Violation_code_count
          from NYC_2017
          group by Violation_code
          order by Violation_code_count desc
          """).show(100)

In [None]:
## Frequency top 5 violations
# Same aggregation as the previous cell, but only the five most frequent
# codes are displayed.
spark.sql("""
          SELECT Violation_code, count(*) as Violation_code_count
          from NYC_2017
          group by Violation_code
          order by Violation_code_count desc
          """).show(5)

**Q2. How often does each 'vehicle body type' get a parking ticket? How about the 'vehicle make'?**

In [None]:
# Ticket count per vehicle body type, most-ticketed type first;
# only the top five rows are displayed.
vehicleBodyType = spark.sql("SELECT Vehicle_Body_Type, count(*) as Ticket_Frequency from NYC_2017 group by Vehicle_Body_Type order by Ticket_Frequency desc")
vehicleBodyType.show(5)

In [None]:
# Vehicle Make
# Ticket count per vehicle make, most-ticketed make first;
# only the top five rows are displayed.

vehicleMake = spark.sql("SELECT Vehicle_Make, count(*) as Ticket_Frequency from NYC_2017 group by Vehicle_Make order by Ticket_Frequency desc")
vehicleMake.show(5)

**Q3. A precinct is a police station that has a certain zone of the city under its command. Find the (5 highest) frequencies of tickets for each of the following:**

**1.'Violation Precinct' (This is the precinct of the zone where the violation occurred). Using this, can you draw any insights for parking violations in any specific areas of the city?**

In [None]:
# Ticket count per Violation_Precinct, highest first. Six rows are shown
# because the notebook treats precinct '0' as an erroneous entry, which leaves
# five real precincts.
spark.sql("""SELECT Violation_Precinct, count(*) as Ticket_Frequency 
            from NYC_2017 group by Violation_Precinct order by Ticket_Frequency desc""").show(6)

Here, you would have noticed that the dataframe has the 'Violation Precinct' as '0'. These are erroneous entries. Hence, you need to provide the records for five correct precincts.

**2 'Issuer Precinct' (this is the precinct that issued the ticket)**

In [None]:
# Ticket count per Issuer_Precinct, highest first. Six rows are shown
# because the notebook treats precinct '0' as an erroneous entry, which leaves
# five real precincts.
spark.sql("""SELECT Issuer_Precinct, count(*) as Ticket_Frequency 
            from NYC_2017 group by Issuer_Precinct order by Ticket_Frequency desc""").show(6)

**Here, you would have noticed that the dataframe has the 'Issuing Precinct' as '0'. These are erroneous entries. Hence, you need to provide the records for five correct precincts. (Hint: Print the top six entries after sorting.)**

- So the top 5 precincts where the most violations occur are 19, 14, 1, 18 and 114.
- Similarly, the top 5 issuer precincts are 19, 14, 1, 18 and 114.

**Q4. Find the violation code frequencies for three precincts that have issued the most number of tickets. Do these precinct zones have an exceptionally high frequency of certain violation codes? Are these codes common across precincts? **

**4.1 Finding violation code frequency**

In [None]:

# Ticket count for every (Issuer_Precinct, Violation_Code) pair, largest first.
# Note that this ranks pairs, not precincts: a precinct's position here depends
# on its single most frequent code rather than on its total number of tickets.
violation_code_freq = spark.sql("""select Issuer_Precinct,Violation_Code, count(*) as Frequency 
                                    from NYC_2017 group by Issuer_Precinct, Violation_Code order by Frequency desc""" )
violation_code_freq.show(7)

We are not considering 0. Therefore 18,19,14 are the three issuer precincts with maximum number of violations.

In [None]:
# Look at the issuer precincts one at a time.
# Issuer Precinct 18: ticket count per violation code, most frequent first (top 10 shown).
# Issuer_Precinct is a string column compared with the integer literal 18, so
# the match relies on Spark's implicit cast.
violation_code_freq_18 = spark.sql("""select Violation_Code, count(*) as Frequency 
                                    from NYC_2017 where Issuer_Precinct=18 group by Violation_Code order by Frequency desc""" )
violation_code_freq_18.show(10)

In [None]:
# Issuer Precinct 19: ticket count per violation code, most frequent first (top 10 shown).
violation_code_freq_19 = spark.sql("select Violation_Code, count(*) as Frequency from NYC_2017 where Issuer_Precinct=19 group by Violation_Code order by Frequency desc" )
violation_code_freq_19.show(10)

In [None]:
# Issuer Precinct 14: ticket count per violation code, most frequent first (top 10 shown).
violation_code_freq_14 = spark.sql("select Violation_Code, count(*) as Frequency from NYC_2017 where Issuer_Precinct=14 group by Violation_Code order by Frequency desc" )
violation_code_freq_14.show(10)

**4.2 Common codes across precincts**

In [None]:
# Ticket count per violation code over the three precincts 18, 19 and 14 pooled
# together, most frequent first (top 5 shown).
# NOTE(review): this sums the three precincts; it does not check that a code
# is actually present in each of them, so a code dominant in a single precinct
# can still rank here.
common_codes =spark.sql("select Violation_Code, count(*) as Frequency from NYC_2017 where Issuer_Precinct in (18,19,14) group by Violation_Code order by Frequency desc")
common_codes.show(5)

### Summary:
- Precinct 18 and Precinct 14 have more or less similar top violation codes.
- But Precinct 19 has very different top violation codes.

#### Q5.Find out the properties of parking violations across different times of the day:
- Find a way to deal with missing values, if any. (Hint: Check for the null values using 'isNull' under the SQL. Also, to remove the null values, check the 'dropna' command in the API documentation.)

- The Violation Time field is specified in a strange format. Find a way to make this a time attribute that you can use to divide into groups.

- Divide 24 hours into six equal discrete bins of time. Choose the intervals as you see fit. For each of these groups, find the three most commonly occurring violations. (Hint: Use the CASE-WHEN in SQL view to segregate into bins. To find the most commonly occurring violations, you can use an approach similar to the one mentioned in the hint for question 4.)

- Now, try another direction. For the three most commonly occurring violation codes, find the most common time of the day (in terms of the bins from the previous part).

In [None]:
# Number of rows in NYC_2017 whose Violation_Time is NULL
spark.sql("SELECT count(*) as No_of_Count_Values from NYC_2017 WHERE Violation_Time is NULL").show()

In [None]:
# Same null check through the DataFrame API: display the rows (if any) whose
# Violation_Time is null.
from pyspark.sql.functions import col
NYC.where(col("Violation_Time").isNull()).show()

In [None]:

# Divide 24 hours into six equal discrete bins of time.
# Violation_Time values in the sample rows look like '0143A' / '0400P': two hour
# digits, two minute digits and an 'A'/'P' (AM/PM) suffix. The hour prefix and
# the suffix together select the bin:
#   1: 12AM-4AM   2: 4AM-8AM   3: 8AM-12PM   4: 12PM-4PM   5: 4PM-8PM   6: 8PM-12AM
# ('12' and '00' are both accepted as the first hour of each half-day.)
#
# The WHERE clause keeps only well-formed times. Its conditions must be joined
# with AND: with OR (as before) any non-null Violation_Time passed the filter,
# so the length/suffix/hour checks were never enforced and malformed values
# ended up as rows with a NULL Violation_Time_bin.
bins=spark.sql("""SELECT Summons_Number, Violation_Code , Violation_Time, Issuer_Precinct, 
                  case when substring(Violation_Time,1,2) in ('00','01','02','03','12') 
                  and upper(substring(Violation_Time,-1))='A' then 1 
                  when substring(Violation_Time,1,2) in ('04','05','06','07') 
                  and upper(substring(Violation_Time,-1))='A' then 2 
                  when substring(Violation_Time,1,2) in ('08','09','10','11') 
                  and upper(substring(Violation_Time,-1))='A' then 3 
                  when substring(Violation_Time,1,2) in ('12','00','01','02','03') 
                  and upper(substring(Violation_Time,-1))='P' then 4 
                  when substring(Violation_Time,1,2) in ('04','05','06','07') 
                  and upper(substring(Violation_Time,-1))='P' then 5 
                  when substring(Violation_Time,1,2) in ('08','09','10','11') 
                  and upper(substring(Violation_Time,-1))='P' then 6 
                  else null end as Violation_Time_bin from NYC_2017 
                  where Violation_Time is not null 
                  and length(Violation_Time)=5 
                  and upper(substring(Violation_Time,-1)) in ('A','P') 
                  and substring(Violation_Time,1,2) in ('00','01','02','03','04','05','06','07', '08','09','10','11','12')""")
bins.show()

### Bins Details

Bin       Time Interval
- 1         12:00 AM to 4:00 AM
- 2         4:00 AM to 8:00 AM
- 3         8:00 AM to 12:00 PM
- 4         12:00 PM to 4:00 PM
- 5         4:00 PM to 8:00 PM
- 6         8:00 PM to 12:00 AM

In [None]:
# Expose the binned DataFrame to SQL as the "NYC_bins" view.
bins.createOrReplaceTempView("NYC_bins")

In [None]:
# Ticket count for every (Violation_Code, Violation_Time_bin) combination.
# The result is not sorted, so show() displays an arbitrary 20 rows.
violation_code_time_count = spark.sql("SELECT Violation_Code,Violation_Time_bin, count(*) count from NYC_bins group by Violation_Code,Violation_Time_bin")
violation_code_time_count.show()

In [None]:
# Three most frequent violation codes in time bin 1 (12:00 AM to 4:00 AM).
bin1 = spark.sql("select Violation_Code,count(*) Vio_cnt from NYC_bins where Violation_Time_bin == 1 group by Violation_Code order by Vio_cnt desc")
bin1.show(3)

In [None]:
# Three most frequent violation codes in time bin 2 (4:00 AM to 8:00 AM).
bin2 = spark.sql("select Violation_Code,count(*) Vio_cnt from NYC_bins where Violation_Time_bin == 2 group by Violation_Code order by Vio_cnt desc")
bin2.show(3)

In [None]:
# Three most frequent violation codes in time bin 3 (8:00 AM to 12:00 PM).
bin3 = spark.sql("select Violation_Code,count(*) Vio_cnt from NYC_bins where Violation_Time_bin == 3 group by Violation_Code order by Vio_cnt desc")
bin3.show(3)

In [None]:
# Three most frequent violation codes in time bin 4 (12:00 PM to 4:00 PM).
bin4 = spark.sql("select Violation_Code,count(*) Vio_cnt from NYC_bins where Violation_Time_bin == 4 group by Violation_Code order by Vio_cnt desc")
bin4.show(3)

In [None]:
# Three most frequent violation codes in time bin 5 (4:00 PM to 8:00 PM).
bin5 = spark.sql("select Violation_Code,count(*) Vio_cnt from NYC_bins where Violation_Time_bin == 5 group by Violation_Code order by Vio_cnt desc")
bin5.show(3)

In [None]:
# Three most frequent violation codes in time bin 6 (8:00 PM to 12:00 AM).
bin6 = spark.sql("select Violation_Code,count(*) Vio_cnt from NYC_bins where Violation_Time_bin == 6 group by Violation_Code order by Vio_cnt desc")
bin6.show(3)

Now, try another direction. For the three most commonly occurring violation codes, find the most common time of the day (in terms of the bins from the previous part).

In [None]:
# Ticket count per time bin for violation codes 21, 36 and 38 pooled together,
# busiest bin first (top 3 shown).
# NOTE(review): the three codes are hard-coded; presumably they are the top
# three from the violation-code frequency cells. Re-check them if the data changes.
time_bin = spark.sql("select Violation_Time_bin, count(*) Vio_count from NYC_bins where Violation_Code in (21, 36, 38) group by Violation_Time_bin order by Vio_count desc")
time_bin.show(3)

#### Bins 3, 4 and 5 have the most violations

The likely reason is that significantly more vehicles are on the road during the daytime, and hence there are more violations.

**Q6.Let’s try and find some seasonality in this data:**

**a)First, divide the year into some number of seasons,and find frequencies of tickets for each season.**

In [None]:
# Tag every 2017 ticket with a season derived from the month of Issue_Date:
#   spring = Mar-May, summer = Jun-Aug, autumn = Sep-Nov, winter = Dec-Feb.
#
# Issue_Date holds ISO-style strings such as '2017-03-24' (see the sample rows),
# so MONTH() is applied to it directly, exactly as the earlier Year()/month()
# queries do. The previous TO_DATE(Issue_Date, 'MM/dd/yyyy') used a pattern that
# does not match these values, so the month could not be parsed and every row
# fell through to 'unknown' (or the parse failed, depending on the Spark version).
tickets_seasonality = spark.sql("""
                            select Violation_Code , Issuer_Precinct, 
                            case when MONTH(Issue_Date) between 3 and 5 then 'spring' 
                            when MONTH(Issue_Date) between 6 and 8 then 'summer' 
                            when MONTH(Issue_Date) between 9 and 11 then 'autumn' 
                            when MONTH(Issue_Date) in (1,2,12) then 'winter' 
                            else 'unknown' end  as season from NYC_2017""")
tickets_seasonality.show()

**Season Binning Details**
- Season :    Month interval

- spring    :March, April, May
- summer    :June, July, August
- autumn    :September, October, November
- winter    :December, January, February

In [None]:
# Expose the season-tagged DataFrame to SQL as the "NYC_tickets_seasonality" view.
tickets_seasonality.createOrReplaceTempView("NYC_tickets_seasonality")

In [None]:
# Number of tickets per season, busiest season first.
# "group by 1 order by 2" are positional references: 1 = season, 2 = no_of_tickets.
tickets_seasonality_freq = spark.sql("select season, count(*) as no_of_tickets from NYC_tickets_seasonality group by 1 order by 2 desc")
tickets_seasonality_freq.show()

In [None]:
# Spring season
# Three most frequent violation codes in spring, restricted to issuer precincts 19, 14 and 1.
# NOTE(review): the per-precinct analysis earlier in this notebook used
# precincts 18, 19 and 14. Confirm which set of three precincts is intended.
violation_spring = spark.sql("select Violation_Code, count(*) as Frequency from NYC_tickets_seasonality where Issuer_Precinct in (19, 14, 1) and season = 'spring' group by Violation_Code order by Frequency desc" )
violation_spring.show(3)

In [None]:
# Winter season
# Three most frequent violation codes in winter, restricted to issuer precincts 19, 14 and 1.
violation_winter = spark.sql("select Violation_Code, count(*) as Frequency from NYC_tickets_seasonality where Issuer_Precinct in (19, 14, 1) and season = 'winter' group by Violation_Code order by Frequency desc" )
violation_winter.show(3)

In [None]:
# Summer season
# Three most frequent violation codes in summer, restricted to issuer precincts 19, 14 and 1.
violation_summer = spark.sql("select Violation_Code, count(*) as Frequency from NYC_tickets_seasonality where Issuer_Precinct in (19, 14, 1) and season = 'summer' group by Violation_Code order by Frequency desc" )
violation_summer.show(3)

In [None]:
# Autumn season
# Three most frequent violation codes in autumn, restricted to issuer precincts 19, 14 and 1.
violation_autumn = spark.sql("select Violation_Code, count(*) as Frequency from NYC_tickets_seasonality where Issuer_Precinct in (19, 14, 1) and season = 'autumn' group by Violation_Code order by Frequency desc" )
violation_autumn.show(3)

**Q7. The fines collected from all the instances of parking violation constitute a source of revenue for the NYC Police Department. Let’s take an example of estimating this for the three most commonly occurring codes:**

**a). Find total occurrences of the three most common violation codes**

In [None]:
# Ticket count per violation code across all 2017 tickets, most frequent first;
# the three most common codes are displayed.
common_Violation = spark.sql("select Violation_Code, count(*) as Frequency from NYC_2017 group by Violation_Code order by Frequency desc")
common_Violation.show(3)

**b). Using this information, find the total amount collected for the three violation codes with maximum tickets. State the code which has the highest total collection.**

In [None]:
from pyspark.sql.functions import when
from pyspark.sql.functions import sum as spark_sum  # aliased so the builtin sum() is not shadowed

# Estimated collection per violation code: Frequency * fine amount, using 55 for
# code 21 and 50 for every other code.
# NOTE(review): 50 is applied to *any* code other than 21; this is only right
# for the displayed rows if the other two top codes really carry a 50 fine.
common_Violation_fine=common_Violation.withColumn("fine",when(common_Violation.Violation_Code == 21, (common_Violation.Frequency) *55).otherwise((common_Violation.Frequency)*50))
common_Violation_fine.show(3)

# Total for the three most frequent codes, computed from the data itself.
# common_Violation is ordered by Frequency desc, so limit(3) keeps the same three
# rows that show(3) displays. The earlier version printed hard-coded counts,
# which go stale whenever the input data or an upstream filter changes.
total_collection = common_Violation_fine.limit(3).agg(spark_sum("fine")).first()[0]
print('Total collection = ', total_collection)

Code 21 had the highest total collection.


c). What can you intuitively infer from these findings?
- January to June had the majority of the violations, and July to December shows a drastic drop.
- The highest violation count and collection were for Code 21 (no parking where parking is not allowed by sign, street marking or traffic control device).

In [None]:
# Shut down the Spark session; the DataFrames and temp views above are unusable after this.
spark.stop()