# Spark Case Study
Citizens of New York City, a thriving metropolis face many challenges, one of the biggest being parking. They are frequently served parking tickets due to high number of cars and limited parking spaces in the city.

#### Purpose of the Case study:

The NYPD (NYC Police Department) has collected data for parking tickets to enable them to serve citizens better. Of these, the data files for multiple years are publicly available on Kaggle. This case study attempts to perform some exploratory analysis on a part of this data over the year **2017**. We will use Spark, a Big Data tool, to analyse the full files provided instead of taking a series of random samples that will approximate the population. We will answer questions as asked in the problem statement. <br>




## Assumptions and Directions in Assignment

##### Please ensure Kernel is `PySpark`

1. The data for this case study is present in HDFS at the following path: `/common_folder/nyc_parking/Parking_Violations_Issued_-_Fiscal_Year_2017.csv` 
2. The exploratory data analysis is specifically for year 2017 and hence any other years will be ignored
3. We need to clean data as we progress through the questions due to nature of the questions asked
4. We are free to use `Spark SQL` Queries and PySpark DataFrame Methods as required.
5. To enable us to answer the questions posed in a more efficient way, we can use other `Python libraries` such as `pandas`, `numpy` and `matplotlib`
6. Data will be cleaned in a particular column if no directions/hint is provided for any particular question


In [1]:
# Import required libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, countDistinct, when, isnan


#### Import data

In [2]:
# Create a spark session. We prfer use of session
# instead of context as it is more efficient
spark = SparkSession \
    .builder \
    .appName('Spark Case Study') \
    .getOrCreate()

In [3]:
# Load the file from specified location
# Ensure to filter header and infer schema
# along with specifying date format
nyc_df = spark.read \
           .format("csv") \
           .options(header='true', inferSchema='true', dateFormat='yyyy-MM-dd')\
           .load("/common_folder/nyc_parking/Parking_Violations_Issued_-_Fiscal_Year_2017.csv")

In [4]:
# Replace white spaces with _ charecter for easier access
alias_list = [col(column).alias(column.replace(' ', '_')) for column in nyc_df.columns]
nyc_df = nyc_df.select(*alias_list)

In [5]:
# Display a sample of 5 records in a pretty format
nyc_df.show(5,vertical=True)

-RECORD 0---------------------------------
 Summons_Number     | 5092469481          
 Plate_ID           | GZH7067             
 Registration_State | NY                  
 Issue_Date         | 2016-07-10 00:00:00 
 Violation_Code     | 7                   
 Vehicle_Body_Type  | SUBN                
 Vehicle_Make       | TOYOT               
 Violation_Precinct | 0                   
 Issuer_Precinct    | 0                   
 Violation_Time     | 0143A               
-RECORD 1---------------------------------
 Summons_Number     | 5092451658          
 Plate_ID           | GZH7067             
 Registration_State | NY                  
 Issue_Date         | 2016-07-08 00:00:00 
 Violation_Code     | 7                   
 Vehicle_Body_Type  | SUBN                
 Vehicle_Make       | TOYOT               
 Violation_Precinct | 0                   
 Issuer_Precinct    | 0                   
 Violation_Time     | 0400P               
-RECORD 2---------------------------------
 Summons_Nu

In [6]:
# Force DateTime record of Issue_Date to be a Date only Record
# There is a difference between Issue and Violation time stamps
# As per the kaggle dictionary. We will retain this.
# Reason: A ticket can be issued post violation by mail.
nyc_df = nyc_df.withColumn('Issue_Date',col('Issue_Date').cast("Date"))
nyc_df.show(5,vertical=True)

-RECORD 0------------------------
 Summons_Number     | 5092469481 
 Plate_ID           | GZH7067    
 Registration_State | NY         
 Issue_Date         | 2016-07-10 
 Violation_Code     | 7          
 Vehicle_Body_Type  | SUBN       
 Vehicle_Make       | TOYOT      
 Violation_Precinct | 0          
 Issuer_Precinct    | 0          
 Violation_Time     | 0143A      
-RECORD 1------------------------
 Summons_Number     | 5092451658 
 Plate_ID           | GZH7067    
 Registration_State | NY         
 Issue_Date         | 2016-07-08 
 Violation_Code     | 7          
 Vehicle_Body_Type  | SUBN       
 Vehicle_Make       | TOYOT      
 Violation_Precinct | 0          
 Issuer_Precinct    | 0          
 Violation_Time     | 0400P      
-RECORD 2------------------------
 Summons_Number     | 4006265037 
 Plate_ID           | FZX9232    
 Registration_State | NY         
 Issue_Date         | 2016-08-23 
 Violation_Code     | 5          
 Vehicle_Body_Type  | SUBN       
 Vehicle_Make 

In [7]:
#Display the Schema
nyc_df.printSchema()

root
 |-- Summons_Number: long (nullable = true)
 |-- Plate_ID: string (nullable = true)
 |-- Registration_State: string (nullable = true)
 |-- Issue_Date: date (nullable = true)
 |-- Violation_Code: integer (nullable = true)
 |-- Vehicle_Body_Type: string (nullable = true)
 |-- Vehicle_Make: string (nullable = true)
 |-- Violation_Precinct: integer (nullable = true)
 |-- Issuer_Precinct: integer (nullable = true)
 |-- Violation_Time: string (nullable = true)



## Examine the data
For the scope of this analysis, we will analyse the parking tickets over the year 2017.

**Question 1:**`Find the total number of tickets for the year`

In [8]:
# register temp view
#nyc_df.createOrReplaceTempView("nyc_dfTable")
# Find Ticket grouped by Years to get correct count

# Uncomment if you want a SQL way of getting the answer
# Both solutions are equivalent

# spark.sql("""
#                 SELECT Year(issue_date)                   AS Year,
#                        Count(DISTINCT ( summons_number )) AS Tickets
#                 FROM   nyc_dftable
#                 GROUP  BY year
#                 ORDER  BY year  
#            """).show(55)

#Using pyspark dataframe functions to achieve one of the assignment goals
nyc_df.select(year(col('Issue_Date')).alias('Year'),'Summons_Number')\
              .groupBy('Year').agg(countDistinct('Summons_Number').alias('Tickets'))\
              .show(55)#.count() yields 55. So, show all 

+----+-------+
|Year|Tickets|
+----+-------+
|1990|      2|
|2025|      6|
|1977|      1|
|2027|     50|
|2003|      1|
|2007|     18|
|2018|   1057|
|1974|      1|
|2015|    419|
|2023|      5|
|2047|      2|
|2069|      4|
|2062|      2|
|2006|      8|
|2022|      4|
|2031|      5|
|2013|     70|
|1997|      1|
|2026|     24|
|1994|      1|
|2014|    120|
|1973|      2|
|2041|      1|
|2019|    472|
|2036|      1|
|2060|      2|
|2004|      2|
|1991|      3|
|2029|      2|
|2030|     12|
|1996|      1|
|2053|      1|
|2068|      1|
|2020|     22|
|1985|      1|
|2012|     87|
|2033|      2|
|2009|      3|
|2016|5368391|
|2061|      1|
|2001|      2|
|2028|      8|
|2024|      3|
|1972|      2|
|2005|      1|
|1984|      1|
|2000|    185|
|2010|     48|
|2011|     22|
|1976|      1|
|2008|      4|
|2017|5431918|
|2063|      2|
|2002|      1|
|2021|     22|
+----+-------+



In [9]:
# Just to demonstrate  SQL returns same answer , uncomment SQL and comment Python code:  
#nyc_df = spark.sql(""" 
#                        SELECT *
#                        FROM   nyc_dftable
#                        WHERE  Year(issue_date) = 2017 
#                    """) 
nyc_df = nyc_df.filter(year(col('Issue_Date')) == 2017)

In [10]:
nyc_df.select(year(col('Issue_Date')).alias('Year'),'Summons_Number')\
              .groupBy('Year').agg(countDistinct('Summons_Number').alias('Tickets'))\
              .show()#

+----+-------+
|Year|Tickets|
+----+-------+
|2017|5431918|
+----+-------+



In [11]:
print('Statistics of violations:')
nyc_df.describe(['Violation_Code','Summons_Number']).show()
# There is no violation code 0. Only 1-99
# as per https://www1.nyc.gov/site/finance/vehicles/services-violation-codes.page
# We need to discard this set as it is erroneous set of values
nyc_df = nyc_df.filter(col('Violation_Code') > 0)
print('Statistics of violations:')
nyc_df.describe(['Violation_Code','Summons_Number']).show()

Statistics of violations:
+-------+------------------+--------------------+
|summary|    Violation_Code|      Summons_Number|
+-------+------------------+--------------------+
|  count|           5431918|             5431918|
|   mean| 35.05772196855696| 7.116377503346856E9|
| stddev|19.332822159461614|2.2941358232098784E9|
|    min|                 0|          1002884949|
|    max|                99|          8585600044|
+-------+------------------+--------------------+

Statistics of violations:
+-------+------------------+--------------------+
|summary|    Violation_Code|      Summons_Number|
+-------+------------------+--------------------+
|  count|           5431691|             5431691|
|   mean| 35.05918709293294|7.1163223769480095E9|
| stddev|19.331897645688496| 2.294161894739209E9|
|    min|                 1|          1002884949|
|    max|                99|          8585600044|
+-------+------------------+--------------------+



In [12]:
#  SQL style - preferred as its easy
nyc_df.createOrReplaceTempView("nyc_dfTable")
#Query yielded a length of 65 rows in count. So, showing 65 rows
spark.sql("""
                SELECT *
                FROM   nyc_dftable
                WHERE  summons_number IS NULL  
            """).count()
# THERE ARE NO NULL SUMMONS NUMBERS
# We can count and even replace easily if we use PySpark DataFrame functions
# nyc_df.select('Summons_Number').na.fill(0).where(nyc_df.Summons_Number == 0).count()
# THERE ARE NO NULL SUMMONS NUMBERS

0

In [13]:
# PySpark Data Frame methods
nyc_df.select(year(col('Issue_Date')).alias('Year'),'Summons_Number')\
              .groupBy('Year').agg(countDistinct('Summons_Number').alias('Tickets'))\
              .show()

+----+-------+
|Year|Tickets|
+----+-------+
|2017|5431691|
+----+-------+



1. There were no NULL values in Tickets
2. Erroneous violaion codes were removed
3. We only counted distinct summons number to avoid duplicates.
4. `Violation Precinct` is not treated as a later question expects errors.

The actual number of tickets in 2017 with valid information is **5431691**.

**Question 2.** `Find out the number of unique states from where the cars that got parking tickets came.`

(Hint: Use the column 'Registration State'.)

In [14]:
#SQL style
nyc_df.createOrReplaceTempView("nyc_dfTable")
#Query yielded a length of 65 rows in count. this count is automatically pulled into count variable before printing
count = spark.sql("""
                SELECT registration_state                 AS Origin_State,
                       Count(DISTINCT ( summons_number )) AS Tickets
                FROM   nyc_dftable
                GROUP  BY origin_state
                ORDER  BY tickets DESC 
           """).count()
print("total number of records: ", count)
spark.sql("""
                SELECT registration_state                 AS Origin_State,
                       Count(DISTINCT ( summons_number )) AS Tickets
                FROM   nyc_dftable
                GROUP  BY origin_state
                ORDER  BY tickets DESC 
           """).show(count)
#PySpark DataFrame Methonds
#Query yielded a length of 65 rows in count. So, showing 65 rows
#nyc_df.select(col('Registration_State').alias('Origin_State'),'Summons_Number')\
#              .groupBy('Origin_State').agg(countDistinct('Summons_Number').alias('Tickets'))\
#              .orderBy('Tickets',ascending=False)\
#              .show(65)#.count()

total number of records:  65
+------------+-------+
|Origin_State|Tickets|
+------------+-------+
|          NY|4273870|
|          NJ| 475813|
|          PA| 140286|
|          CT|  70401|
|          FL|  69468|
|          IN|  45525|
|          MA|  38940|
|          VA|  34366|
|          MD|  30212|
|          NC|  27152|
|          TX|  18827|
|          IL|  18665|
|          GA|  17537|
|          99|  15930|
|          AZ|  12378|
|          OH|  12281|
|          CA|  12153|
|          ME|  10806|
|          SC|  10395|
|          MN|  10083|
|          OK|   9088|
|          TN|   8514|
|          DE|   7905|
|          MI|   7231|
|          RI|   5813|
|          NH|   4119|
|          VT|   3683|
|          AL|   3178|
|          WA|   3052|
|          OR|   2622|
|          MO|   2483|
|          ON|   2460|
|          WI|   2127|
|          QB|   1998|
|          IA|   1938|
|          DC|   1929|
|          CO|   1841|
|          KY|   1795|
|          DP|   1794|
|    

**Direction in Question 2:**

There is a numeric entry '99' in the column, which should be corrected. Replace it with the state having the maximum entries. Provide the number of unique states again.

In [15]:
#99 has 16055 Tickets. NY has most tickets - 4273951.
# Replace 99 by NY. It is simple to do in PySpark
nyc_df = nyc_df.withColumn("Registration_State", \
              when(nyc_df["Registration_State"] == '99', 'NY')\
                           .otherwise(nyc_df["Registration_State"]))

In [16]:
#nyc_df.printSchema()
# refresh temp view
nyc_df.createOrReplaceTempView("nyc_dfTable")

states = spark.sql("""
                        SELECT registration_state                 AS Origin_State,
                               Count(DISTINCT ( summons_number )) AS Tickets
                        FROM   nyc_dftable
                        GROUP  BY origin_state
                        ORDER  BY tickets DESC 
                    """)

#Get states and print pretty!
#states = nyc_df.select(col('Registration_State').alias('Origin_State'),'Summons_Number')\
#              .groupBy('Origin_State').agg(countDistinct('Summons_Number').alias('Tickets'))\
#              .orderBy('Tickets',ascending=False)
print('There are ',states.count(),' unique jurisdictions of origin for offeneders who were issued tickets.')
print ('The Top 5 states in order of tickets issued are:')
states.show(5)#use 64 to list all

There are  64  unique jurisdictions of origin for offeneders who were issued tickets.
The Top 5 states in order of tickets issued are:
+------------+-------+
|Origin_State|Tickets|
+------------+-------+
|          NY|4289800|
|          NJ| 475813|
|          PA| 140286|
|          CT|  70401|
|          FL|  69468|
+------------+-------+
only showing top 5 rows



**Answers and Observations:**
1. The total number of tickets issued in 2017 was 5431691.
 
2. There are ony 50 states in the USA. AS a result, there are a total of 64 jurisdictions of origin for vehicles with parking tickets. New York has most tickets (4290006), followed by New Jersey,Pennsylvania, Connecticut and Florida. 

Most of the tickets are issued to Newyorkers. The data shows that the city has many vehicles coming in from other states, especially those that are very near by. This traffic forces Nywyorkers to violate parking regulations and get Tickets issued. If these visitors can be made to use taxis and public transport, the challenges due to high traffic can be reduced.

## Aggregation Queries
**Question 1:**

`How often does each violation code occur? Display the frequency of the top five violation codes.`
(Hint: Find the top 5)

In [17]:
# refresh temp view
print('Statistics of violations:')
nyc_df.describe(['Violation_Code','Summons_Number']).show()

#Display Frequency of tickets per code.
print('Listing frequency of top 5 violation codes in the year 2017')
nyc_df.select(col('Violation_Code'),'Summons_Number')\
              .groupBy('Violation_Code').agg(countDistinct('Summons_Number').alias('Tickets'))\
              .orderBy('Tickets',ascending=False)\
              .show(5) #Replace with 99 for all as count() yielded 99 records
#SQL style...
# refresh temp view
#nyc_df.createOrReplaceTempView("nyc_dfTable")
#print('Listing frequency of violations in a year')
#Display Frequency of violations in the year.
#spark.sql("""
#                SELECT violation_code,
#                       Count(DISTINCT ( summons_number )) AS Tickets
#                FROM   nyc_dftable
#                GROUP  BY violation_code
#                ORDER  BY tickets DESC 
#            """).show(5)#Replace with 99 for all as count() yielded 99 records

Statistics of violations:
+-------+------------------+--------------------+
|summary|    Violation_Code|      Summons_Number|
+-------+------------------+--------------------+
|  count|           5431691|             5431691|
|   mean| 35.05918709293294|7.1163223769480095E9|
| stddev|19.331897645688496|2.2941618947392097E9|
|    min|                 1|          1002884949|
|    max|                99|          8585600044|
+-------+------------------+--------------------+

Listing frequency of top 5 violation codes in the year 2017
+--------------+-------+
|Violation_Code|Tickets|
+--------------+-------+
|            21| 768087|
|            36| 662765|
|            38| 542079|
|            14| 476664|
|            20| 319646|
+--------------+-------+
only showing top 5 rows



Codes 21, 36, 38, 14 and 20 are the most frequent violations in the `year 2017`

**Question 2.1 :**`How often does each 'vehicle body type' get a parking ticket?`
(Hint: Find the top 5)

In [18]:
print('Listing top 5 vehicle body types base on ticket counts')
nyc_df.select(col('Vehicle_Body_Type'),'Summons_Number')\
              .groupBy('Vehicle_Body_Type').agg(countDistinct('Summons_Number').alias('Tickets'))\
              .orderBy('Tickets',ascending=False)\
              .show(5) #Replace with 1165 for all as count() yielded 1165 records

#Display Top 5 in SQL style query...
#nyc_df.createOrReplaceTempView("nyc_dfTable")
#print('Listing top 5 vehicle body types with ticket counts')
#Display Top 5
#spark.sql("""
#                SELECT vehicle_body_type,
#                       COUNT(DISTINCT ( summons_number )) AS Tickets 
#                FROM   nyc_dftable 
#                GROUP  BY vehicle_body_type 
#                ORDER  BY tickets desc  
#           
#           """).show(5)#Replace with 1165 for all

Listing top 5 vehicle body types base on ticket counts
+-----------------+-------+
|Vehicle_Body_Type|Tickets|
+-----------------+-------+
|             SUBN|1883933|
|             4DSD|1547283|
|              VAN| 724022|
|             DELV| 358980|
|              SDN| 194195|
+-----------------+-------+
only showing top 5 rows



In [19]:
# Reference: https://data.ny.gov/Transportation/Vehicle-Makes-and-Body-Types-Most-Popular-in-New-Y/3pxy-wy2i
# On listing, we found codes that were mistyped
# Assuming closest matches and
# replacing them correctly to get exact counts
# SUBU SUB SUBA are SUBN
# FDSD is 4DSD
# VANT, VAND, VAN. VANF, VANH are VAN
# DELI, DELT, DEL, DELA, DELO are DELV
# SEDA (Found boat make, but, no body type. Correcting to SEDN),SEDN,SDN to be SEDN
nyc_df = nyc_df.withColumn("Vehicle_Body_Type", \
              when(nyc_df["Vehicle_Body_Type"].isin(['SUBU', 'SUB', 'SUBA']), 'SUBN')\
                           .otherwise(nyc_df["Vehicle_Body_Type"]))
nyc_df = nyc_df.withColumn("Vehicle_Body_Type", \
              when(nyc_df["Vehicle_Body_Type"] == 'FDSD', '4DSD')\
                           .otherwise(nyc_df["Vehicle_Body_Type"]))
nyc_df = nyc_df.withColumn("Vehicle_Body_Type", \
              when(nyc_df["Vehicle_Body_Type"].isin(['VANT', 'VAND', 'VAN.', 'VANF', 'VANH']), 'VAN')\
                           .otherwise(nyc_df["Vehicle_Body_Type"]))
nyc_df = nyc_df.withColumn("Vehicle_Body_Type", \
              when(nyc_df["Vehicle_Body_Type"].isin(['DELI', 'DELT', 'DEL', 'DELA', 'DELO']), 'DELV')\
                           .otherwise(nyc_df["Vehicle_Body_Type"]))
nyc_df = nyc_df.withColumn("Vehicle_Body_Type", \
              when(nyc_df["Vehicle_Body_Type"].isin(['SEDA','SEDN','SDN']), 'SEDN')\
                           .otherwise(nyc_df["Vehicle_Body_Type"]))
print('Listing top 5 vehicle body types base on ticket counts')
nyc_df.select(col('Vehicle_Body_Type'),'Summons_Number')\
              .groupBy('Vehicle_Body_Type').agg(countDistinct('Summons_Number').alias('Tickets'))\
              .orderBy('Tickets',ascending=False)\
              .show(5)

Listing top 5 vehicle body types base on ticket counts
+-----------------+-------+
|Vehicle_Body_Type|Tickets|
+-----------------+-------+
|             SUBN|1884040|
|             4DSD|1547292|
|              VAN| 724128|
|             DELV| 359041|
|             SEDN| 197293|
+-----------------+-------+
only showing top 5 rows



**Answer and Observation**
Suburban (SUBN), four-door sedan (4DSD), Van Truck (VAN),Delivery Truck (DELV) and Sedan (SEDN) are the top 5 body types in records. The highest violation frequency is of Suburban (almost 1.9 million times in a year).

Most of these vehicles are used for transport and delivery. This indicates that drivers are driven by required delivery/arrival time to violate regulations. Evenly distributed special parking zones for such vehicles can ease traffic challenges.

**Question 2.2 :**`How about the 'vehicle make'?`(Hint: Find the top 5)

In [20]:
print('Listing top 5 vehicle makes base on ticket counts')
nyc_df.select(col('Vehicle_Make'),'Summons_Number')\
              .groupBy('Vehicle_Make').agg(countDistinct('Summons_Number').alias('Tickets'))\
              .orderBy('Tickets',ascending=False)\
              .show(5) #Replace with 3179 for all as count() yielded 3179 records

#Display Top 5 in SQL style query...
#nyc_df.createOrReplaceTempView("nyc_dfTable")
#print('Listing top 5 vehicle makes with ticket counts')
#Display Top 5
#spark.sql('SELECT Vehicle_Make, COUNT(DISTINCT (Summons_Number)) as Tickets \
#           FROM nyc_dfTable \
#           GROUP BY Vehicle_Make \
#           ORDER BY Tickets DESC').show(5)

Listing top 5 vehicle makes base on ticket counts
+------------+-------+
|Vehicle_Make|Tickets|
+------------+-------+
|        FORD| 636838|
|       TOYOT| 605279|
|       HONDA| 538875|
|       NISSA| 462011|
|       CHEVR| 356026|
+------------+-------+
only showing top 5 rows



In [21]:
# Reference: https://data.ny.gov/Transportation/Vehicle-Makes-and-Body-Types-Most-Popular-in-New-Y/3pxy-wy2i
# On listing, we found codes that were mistyped
# Assuming closest matches and
# replacing them correctly to get exact counts
# FORDB, FORDU, Ford, FORE, FOREL, FORLE,  FOREN are FORD
# HONSA, HONRA are HONDA
# NISN, NISSZ, NISSR are NISSA
# Chevr,CHEVO,CHEVG,CHEVV, CHV to be CHEVR
nyc_df = nyc_df.withColumn("Vehicle_Make", \
              when(nyc_df["Vehicle_Make"].isin(['FORDB', 'FORDU', 'Ford', 'FORE', 'FOREL', 'FORLE',  'FOREN']), 'FORD')\
                           .otherwise(nyc_df["Vehicle_Make"]))
nyc_df = nyc_df.withColumn("Vehicle_Make", \
              when(nyc_df["Vehicle_Make"].isin(['HONSA', 'HONRA']), 'HONDA')\
                           .otherwise(nyc_df["Vehicle_Make"]))
nyc_df = nyc_df.withColumn("Vehicle_Make", \
              when(nyc_df["Vehicle_Make"].isin(['NISN', 'NISSZ', 'NISSR']), 'NISSA')\
                           .otherwise(nyc_df["Vehicle_Make"]))
nyc_df = nyc_df.withColumn("Vehicle_Make", \
              when(nyc_df["Vehicle_Make"].isin(['Chevr','CHEVO','CHEVG','CHEVV', 'CHV']), 'CHEVR')\
                           .otherwise(nyc_df["Vehicle_Make"]))

print('Listing top 5 vehicle body types base on ticket counts')
nyc_df.select(col('Vehicle_Make'),'Summons_Number')\
              .groupBy('Vehicle_Make').agg(countDistinct('Summons_Number').alias('Tickets'))\
              .orderBy('Tickets',ascending=False)\
              .show(5)

Listing top 5 vehicle body types base on ticket counts
+------------+-------+
|Vehicle_Make|Tickets|
+------------+-------+
|        FORD| 636847|
|       TOYOT| 605279|
|       HONDA| 538878|
|       NISSA| 462015|
|       CHEVR| 356050|
+------------+-------+
only showing top 5 rows



 `FORD, TOYOTA, HONDA, NISSAN and CHEVROLET are the top 5 body types in records `


##### 3) `A precinct is a police station that has a certain zone of the city under its command. Find the (5 highest) frequencies of tickets for:`
##### a. `Violation Precinct` (This is the precinct of the zone where the violation occurred).

##### NOTE: Here, you would have noticed that the dataframe has the'Violating Precinct' as '0'. These are erroneous entries. Hence, you need to provide the records for five correct precincts. (Hint: Print the top six entries after sorting.)

In [22]:
nyc_df.createOrReplaceTempView("nyc_dfTable")
print('Listing top 5 Precinct (count includes 0 hence top 6) where violations occured with ticket counts')
#Display Top 5  
spark.sql("""
                SELECT violation_precinct,
                       COUNT(DISTINCT ( summons_number )) AS Tickets 
                FROM   nyc_dftable 
                GROUP  BY violation_precinct 
                ORDER  BY tickets desc  
            """).show(6)#Replace with (count) for showing all records

Listing top 5 Precinct (count includes 0 hence top 6) where violations occured with ticket counts
+------------------+-------+
|violation_precinct|Tickets|
+------------------+-------+
|                 0| 925405|
|                19| 274443|
|                14| 203552|
|                 1| 174702|
|                18| 169129|
|               114| 147443|
+------------------+-------+
only showing top 6 rows



`Using this, can you draw any insights for parking violations in any specific areas of the city?`

##### b. `Issuer Precinct` (This is the precinct that issued the ticket.)

NOTE: Here, you would have noticed that the dataframe has the'Issuer Precinct' as '0'. These are erroneous entries. Hence, you need to provide the records for five correct precincts. 

(Hint: Print the top six entries after sorting.)

In [23]:
nyc_df.createOrReplaceTempView("nyc_dfTable")
print('Listing top 5 Precinct (count includes 0 hence top 6) Precinct by tickets issued with ticket counts')
#Display Top 5  
# 
spark.sql("""
             SELECT issuer_precinct,
                   COUNT(DISTINCT ( summons_number )) AS Tickets 
            FROM   nyc_dftable 
            GROUP  BY issuer_precinct 
            ORDER  BY tickets desc 

            """).show(6)#Replace with (count) for showing all records

Listing top 5 Precinct (count includes 0 hence top 6) Precinct by tickets issued with ticket counts
+---------------+-------+
|issuer_precinct|Tickets|
+---------------+-------+
|              0|1078215|
|             19| 266959|
|             14| 200494|
|              1| 168740|
|             18| 162992|
|            114| 144053|
+---------------+-------+
only showing top 6 rows



##### `Using this, can you draw any insights for parking violations in any specific areas of the city?`

##### `4) Find the violation code frequencies for three precincts that have issued the most number of tickets.` 
(Hint: In the SQL view, use the 'where' attribute to filter among three precincts.)
`Do these precinct zones have an exceptionally high frequency of certain violation codes?` `Are these codes common across precincts?` <br>

In [24]:
import numpy as np
import pandas as pd
from matplotlib.pyplot import show

nyc_df.createOrReplaceTempView("nyc_dfTable")
print('Listing violation code frequencies per precinct with ticket counts')
#Displays all the violation codes for precincts 19, 14 and 1 
# uncomment the code below if you would like to see all
# 
#nyc_pdf = spark.sql("""
#                        SELECT issuer_precinct,
#                               violation_code,
#                               COUNT(DISTINCT ( summons_number )) AS Tickets 
#                        FROM   nyc_dftable 
#                        WHERE  issuer_precinct in ( 19  ,14 , 1  )
#                        GROUP  BY issuer_precinct,
#                                  violation_code 
#                        ORDER  BY issuer_precinct,
#                                  tickets desc    
#       """).toPandas()
# nyc_pdf.sort_values(by='Tickets',ascending=False)
# ax = nyc_pdf.plot.scatter(x='violation_code', y='tickets', c='issuer_precinct', colormap='viridis')
# ax.set_xticks(np.arange(0, 100, 1))
# show()


# Print top 5 violation codes across three precincts 

nyc_pdf = spark.sql("""
                         SELECT *
                        FROM   (SELECT '19' as issuer_precinct, * 
                                        FROM   (SELECT  violation_code,
                                               Count(DISTINCT summons_number) AS Tickets
                                        FROM   nyc_dftable
                                        WHERE  issuer_precinct = 19
                                        GROUP  BY violation_code
                                        ORDER by Tickets desc
                                        limit 5) inner_q) outer_q  -- Display top 5 violation codes 
                        union all
                                SELECT *
                        FROM   (SELECT '14' as issuer_precinct, * 
                                        FROM   (SELECT  violation_code,
                                               Count(DISTINCT summons_number) AS Tickets
                                        FROM   nyc_dftable
                                        WHERE  issuer_precinct = 14
                                        GROUP  BY violation_code
                                        ORDER by Tickets desc
                                        limit 5) inner_q) outer_q    -- Display top 5 violation codes 
                        union all
                                SELECT *
                        FROM   (SELECT '1' as issuer_precinct, * 
                                        FROM   (SELECT  violation_code,
                                               Count(DISTINCT summons_number) AS Tickets
                                        FROM   nyc_dftable
                                        WHERE  issuer_precinct = 1
                                        GROUP  BY violation_code
                                        ORDER by Tickets desc
                                        limit 5) inner_q) outer_q   -- Display top 5 violation codes 
                   """).show()



Listing violation code frequencies per precinct with ticket counts
+---------------+--------------+-------+
|issuer_precinct|violation_code|Tickets|
+---------------+--------------+-------+
|             19|            46|  48445|
|             19|            38|  36386|
|             19|            37|  36056|
|             19|            14|  29797|
|             19|            21|  28415|
|             14|            14|  45036|
|             14|            69|  30464|
|             14|            31|  22555|
|             14|            47|  18364|
|             14|            42|  10027|
|              1|            14|  38354|
|              1|            16|  19081|
|              1|            20|  15408|
|              1|            46|  12745|
|              1|            38|   8535|
+---------------+--------------+-------+



##### Inference
1) Violation codes `46` and  `14` tops the list of violations in precincts `19` and `14 & 1` respectively <br>
2) Violation code `14` is common among all three precincts considering `Top 5 violation codes`



##### 5) `Find out the properties of parking violations across different times of the day.` 
a) `Find a way to deal with missing values, if any.` <br>

Hint: Check for the null values using 'isNull' under the SQL. Also, to remove the null values, check the 'dropna' command in the API documentation

In [25]:
nyc_df.where(col("Violation_Time").isNull()).count()

0

In [26]:
nyc_df.where(col("Violation_Time")=='nan').count()

16

In [27]:
#dropNA is not useful as it is nan string
nyc_df = nyc_df.where(col("Violation_Time")!='nan')

In [28]:
nyc_df.select('Violation_Time').where('NOT Violation_Time rlike "^[0-9]{4}[A|P]$"').show(37)

+--------------+
|Violation_Time|
+--------------+
|          0557|
|         110+A|
|         093+A|
|          0855|
|         075/P|
|         09.5A|
|         073/A|
|         09+1A|
|         10.3P|
|          0515|
|         06.5A|
|         09.5A|
|         06+0P|
|          0316|
|         0.47A|
|          0651|
|         065+A|
|         074/A|
|         10+1A|
|         125+A|
|         093+A|
|         09+2A|
|          1037|
|         083.A|
|         08+7A|
|          0446|
|         04080|
|         081*A|
|         015.A|
|         103/P|
|         10.0P|
|         .933A|
|         094/P|
|         121/P|
|         115+A|
|         100.P|
|         07.8A|
+--------------+



In [29]:
#dropNA is not useful as it is nan string
nyc_df = nyc_df.where('Violation_Time rlike "^[0-9]{4}[A|P]$"')

 b) `The Violation Time field is specified in a strange format. Find a way to make this a time attribute that you can use to divide into groups.`

In [30]:
from pyspark.sql.functions import substring
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
nyc_df=nyc_df.withColumn('Violation_Time_AM_PM',substring(nyc_df.Violation_Time, 5, 1))
nyc_df=nyc_df.withColumn('Violation_Time_Hour_12',substring(nyc_df.Violation_Time, 0, 2).cast(IntegerType()))
nyc_df=nyc_df.withColumn('Violation_Time_Minute',substring(nyc_df.Violation_Time, 3, 2).cast(IntegerType()))
nyc_df.describe(['Violation_Time_AM_PM','Violation_Time_Hour_12','Violation_Time_Minute']).show()

+-------+--------------------+----------------------+---------------------+
|summary|Violation_Time_AM_PM|Violation_Time_Hour_12|Violation_Time_Minute|
+-------+--------------------+----------------------+---------------------+
|  count|             5431638|               5431638|              5431638|
|   mean|                null|     6.843028751179663|   29.093529060662732|
| stddev|                null|      3.73763347917092|    17.20503471486344|
|    min|                   A|                     0|                    0|
|    max|                   P|                    87|                   59|
+-------+--------------------+----------------------+---------------------+



In [31]:
nyc_df.where(col("Violation_Time_AM_PM").isNull()).count()

0

In [32]:
nyc_df.where(col("Violation_Time_Hour_12").isNull()).count()

0

In [33]:
nyc_df.where(col("Violation_Time_Minute").isNull()).count()

0

In [34]:
h12_udf = udf(lambda h: h if h < 12 else 0, IntegerType())
nyc_df = nyc_df.withColumn('Violation_Time_Hour_12',h12_udf('Violation_Time_Hour_12'))
h24_udf = udf(lambda h, t: h+12 if t=='P' else h, IntegerType())
nyc_df = nyc_df.withColumn('Violation_Time_Hour_24',h24_udf('Violation_Time_Hour_12','Violation_Time_AM_PM'))
nyc_df.select('Violation_Time','Violation_Time_Hour_24','Violation_Time_Minute').show()

+--------------+----------------------+---------------------+
|Violation_Time|Violation_Time_Hour_24|Violation_Time_Minute|
+--------------+----------------------+---------------------+
|         1120A|                    11|                   20|
|         0852P|                    20|                   52|
|         0015A|                     0|                   15|
|         0525A|                     5|                   25|
|         0256P|                    14|                   56|
|         1232A|                     0|                   32|
|         1034A|                    10|                   34|
|         1021A|                    10|                   21|
|         0721A|                     7|                   21|
|         0940A|                     9|                   40|
|         1223P|                    12|                   23|
|         1028A|                    10|                   28|
|         0148A|                     1|                   48|
|       

In [35]:
nyc_df = nyc_df.drop('Violation_Time','Violation_Time_AM_PM','Violation_Time_Hour_12')
nyc_df.printSchema()

root
 |-- Summons_Number: long (nullable = true)
 |-- Plate_ID: string (nullable = true)
 |-- Registration_State: string (nullable = true)
 |-- Issue_Date: date (nullable = true)
 |-- Violation_Code: integer (nullable = true)
 |-- Vehicle_Body_Type: string (nullable = true)
 |-- Vehicle_Make: string (nullable = true)
 |-- Violation_Precinct: integer (nullable = true)
 |-- Issuer_Precinct: integer (nullable = true)
 |-- Violation_Time_Minute: integer (nullable = true)
 |-- Violation_Time_Hour_24: integer (nullable = true)



 c) `Divide 24 hours into six equal discrete bins of time. Choose the intervals as you see fit. For each of these groups, find the three most commonly occurring violations.`

(Hint: Use the CASE-WHEN in SQL view to segregate into bins. To find the most commonly occurring violations, you can use an approach similar to the one mentioned in the hint for question 4.)

#####  let's print out total number of entries per bin / bucketed values

In [36]:
nyc_df.createOrReplaceTempView("nyc_dfTable")
time_nyc_df = spark.sql("""
                            SELECT Count(1),
                                   time_bin
                            FROM   (SELECT summons_number,
                                           ( CASE
                                               WHEN ( violation_time_hour_24 BETWEEN 0 AND 3 ) THEN 1
                                               WHEN ( violation_time_hour_24 BETWEEN 4 AND 7 ) THEN 2
                                               WHEN ( violation_time_hour_24 BETWEEN 8 AND 11 ) THEN 3
                                               WHEN ( violation_time_hour_24 BETWEEN 12 AND 15 ) THEN 4
                                               WHEN ( violation_time_hour_24 BETWEEN 16 AND 19 ) THEN 5
                                               WHEN ( violation_time_hour_24 BETWEEN 20 AND 24 ) THEN 6
                                               ELSE 7
                                             -- Bin with invalid timeframe , let's check if we have some rows?
                                             END ) AS Time_Bin
                                    FROM   nyc_dftable)
                            GROUP  BY 2
                            ORDER  BY 2 ASC  
                        """).show()

+--------+--------+
|count(1)|time_bin|
+--------+--------+
|  164531|       1|
|  449861|       2|
| 2163455|       3|
| 1839925|       4|
|  637513|       5|
|  176353|       6|
+--------+--------+



In [37]:
nyc_df.createOrReplaceTempView("nyc_dfTable")
time_nyc_df = spark.sql("""
                            SELECT summons_number,
                                    Violation_Code ,
                                   ( CASE
                                       WHEN ( violation_time_hour_24 BETWEEN 0 AND 3 ) THEN 1
                                       WHEN ( violation_time_hour_24 BETWEEN 4 AND 7 ) THEN 2
                                       WHEN ( violation_time_hour_24 BETWEEN 8 AND 11 ) THEN 3
                                       WHEN ( violation_time_hour_24 BETWEEN 12 AND 15 ) THEN 4
                                       WHEN ( violation_time_hour_24 BETWEEN 16 AND 19 ) THEN 5
                                       WHEN ( violation_time_hour_24 BETWEEN 20 AND 24 ) THEN 6
                                       ELSE 7
                                     END ) AS Time_Bin
                            FROM   nyc_dftable    
                        """)
# time_nyc_df.printSchema()

In [38]:
time_nyc_df.show(5)

+--------------+--------------+--------+
|summons_number|Violation_Code|Time_Bin|
+--------------+--------------+--------+
|    8478629828|            47|       3|
|    5096917368|             7|       6|
|    1407740258|            78|       1|
|    1413656420|            40|       2|
|    8480309064|            64|       4|
+--------------+--------------+--------+
only showing top 5 rows



In [39]:
time_nyc_df.cache()
time_nyc_df.createOrReplaceTempView("time_nyc_dfTable")
spark.sql("""

              SELECT *
                FROM   (SELECT violation_code,
                               time_bin,
                               tickets,
                               Dense_rank()
                                 OVER (
                                   partition BY time_bin
                                   ORDER BY tickets DESC) AS RANK
                        FROM   (SELECT Count(summons_number) AS tickets,
                                       violation_code,
                                       time_bin
                                FROM   time_nyc_dftable
                                GROUP  BY violation_code,
                                          time_bin) inner_q) outer_q
                WHERE  rank <= 3
                ORDER  BY time_bin ASC , RANK asc  

            """).show()

+--------------+--------+-------+----+
|violation_code|time_bin|tickets|RANK|
+--------------+--------+-------+----+
|            21|       1|  36957|   1|
|            40|       1|  25865|   2|
|            78|       1|  15528|   3|
|            14|       2|  74113|   1|
|            40|       2|  60652|   2|
|            21|       2|  57894|   3|
|            21|       3| 598062|   1|
|            36|       3| 348165|   2|
|            38|       3| 176570|   3|
|            36|       4| 286284|   1|
|            38|       4| 240722|   2|
|            37|       4| 167026|   3|
|            38|       5| 102855|   1|
|            14|       5|  75902|   2|
|            37|       5|  70345|   3|
|             7|       6|  26293|   1|
|            40|       6|  22336|   2|
|            14|       6|  21045|   3|
+--------------+--------+-------+----+



##### `Three most common occuring violations are listed for each of the time slot above ` <br>

d) `Now, try another direction. For the three most commonly occurring violation codes, find the most common time of the day (in terms of the bins from the previous part).`

In [40]:
time_nyc_df.cache()
time_nyc_df.createOrReplaceTempView("time_nyc_dfTable")
spark.sql("""
                SELECT time_bin,
                       violation_code,
                       Count(summons_number) AS tickets
                FROM   time_nyc_dftable
                WHERE  violation_code IN (SELECT violation_code
                                          FROM   (SELECT Count(summons_number) AS tickets,
                                                         violation_code
                                                  FROM   time_nyc_dftable
                                                  GROUP  BY violation_code
                                                  ORDER  BY tickets DESC
                                                  LIMIT  3))
                GROUP  BY time_bin,
                          violation_code
                ORDER  BY tickets DESC,
                          time_bin ASC  

            """).show()

+--------+--------------+-------+
|time_bin|violation_code|tickets|
+--------+--------------+-------+
|       3|            21| 598062|
|       3|            36| 348165|
|       4|            36| 286284|
|       4|            38| 240722|
|       3|            38| 176570|
|       5|            38| 102855|
|       4|            21|  74718|
|       2|            21|  57894|
|       1|            21|  36957|
|       6|            38|  20347|
|       2|            36|  14782|
|       5|            36|  13534|
|       2|            38|   1273|
|       1|            38|    312|
|       5|            21|    259|
|       6|            21|    184|
+--------+--------------+-------+



Time bin `3` i.e. time between 8AM and 11AM are the time when there are most violations and violation codes 21, 36 and 38 are the most frequently occuring violations (Top 3)` <br>
Code description is towards the end of the jupyter workbook

##### 6) Let’s try and find some seasonality in this data:

 a) `First, divide the year into a certain number of seasons, and find the frequencies of tickets for each season.` (Hint: Use Issue Date to segregate into seasons.)

In [41]:
nyc_df.createOrReplaceTempView("nyc_dfTable")
season_nyc_df = spark.sql("""
                                 SELECT violation_code,
                                       summons_number,
                                       CASE
                                         WHEN ( Month(issue_date) IN ( 12, 1, 2 ) ) THEN "winter"
                                         WHEN ( Month(issue_date) BETWEEN 3 AND 5 ) THEN "spring"
                                         WHEN ( Month(issue_date) BETWEEN 6 AND 7 ) THEN "summer"
                                         ELSE "fall"
                                       END AS Season
                                FROM   nyc_dftable  
                            """)
season_nyc_df.printSchema()

root
 |-- violation_code: integer (nullable = true)
 |-- summons_number: long (nullable = true)
 |-- Season: string (nullable = false)



##### b) `Then, find the three most common violations for each of these seasons.`

(Hint: You can use an approach similar to the one mentioned in the hint for question 4.)

In [42]:
season_nyc_df.createOrReplaceTempView("season_nyc_dfTable")
spark.sql("""
                SELECT *
                FROM   (SELECT season,
                               violation_code,
                               tickets,
                               DENSE_RANK() OVER ( PARTITION BY season ORDER BY tickets desc) AS RANK 
                        FROM   (SELECT season,
                                       violation_code,
                                       COUNT(DISTINCT summons_number) AS Tickets 
                                FROM   season_nyc_dftable 
                                GROUP  BY season,
                                          violation_code) inner_q  ) outer_q
                WHERE  RANK <= 3  



            """).show()            

+------+--------------+-------+----+
|season|violation_code|tickets|RANK|
+------+--------------+-------+----+
|winter|            21| 238182|   1|
|winter|            36| 221268|   2|
|winter|            38| 187386|   3|
|summer|            21| 127265|   1|
|summer|            36|  96663|   2|
|summer|            38|  83517|   3|
|spring|            21| 402415|   1|
|spring|            36| 344834|   2|
|spring|            38| 271167|   3|
|  fall|            46|    288|   1|
|  fall|            21|    212|   2|
|  fall|            40|    149|   3|
+------+--------------+-------+----+



1) `Winter, Summer and Spring` have violation codes `21, 36 and 38` that tops the list <br>
2) During `Fall` we have violation codes `46,  21 and 40` that tops the list 


##### 7)  The fines collected from all the instances of parking violation constitute a source of revenue for the NYC Police Department. Let’s take an example of estimating this for the three most commonly occurring codes:

a) `Find the total occurrences of the three most common violation code`

In [43]:
nyc_df.createOrReplaceTempView("nyc_dfTable")
violation_pdf = spark.sql("""
                                SELECT violation_code,
                                       COUNT(DISTINCT summons_number) AS Tickets 
                                FROM   season_nyc_dftable 
                                GROUP  BY violation_code 
                                ORDER  BY tickets desc  
                            """).limit(3).toPandas()

In [44]:
violation_pdf = violation_pdf.set_index('violation_code')
violation_pdf

Unnamed: 0_level_0,Tickets
violation_code,Unnamed: 1_level_1
21,768074
36,662765
38,542079


`b) Then, visit the website:  http://www1.nyc.gov/site/finance/vehicles/services-violation-codes.page . It lists the fines associated with different violation codes. They’re divided into two categories: one for the highest-density locations in the city and the other for the rest of the city. For the sake of simplicity, take the average of the two.`

|CODE|DEFINITION|Manhattan 96th St. & below|All Other Areas|Average|
|------|------|------|------|------|
|21|Street Cleaning: No parking where parking is not allowed by sign, street marking or traffic control device.|\$65|\$45|\$55|
|36|Exceeding the posted speed limit in or near a designated school zone.|\$50|\$50|\$50|
|38|Failing to show a receipt or tag in the windshield.Drivers get a 5-minute grace period past the expired time on parking meter receipts.|\$65|\$35|\$50|

`c) Using this information, find the total amount collected for the three violation codes with the maximum tickets. State the code that has the highest total collection`

In [45]:
fines = {'violation_Code': [21,36,38], \
        'Fine': [55,50,50]}
fines_pdf = pd.DataFrame(fines)
fines_pdf = fines_pdf.set_index('violation_Code')
violation_pdf=violation_pdf.merge(fines_pdf,left_index=True,right_index=True)
violation_pdf

Unnamed: 0_level_0,Tickets,Fine
violation_code,Unnamed: 1_level_1,Unnamed: 2_level_1
21,768074,55
36,662765,50
38,542079,50


In [46]:
violation_pdf['Collected_Amount'] = violation_pdf['Tickets'] * violation_pdf['Fine']
violation_pdf.sort_values(by='Collected_Amount',ascending=False)

Unnamed: 0_level_0,Tickets,Fine,Collected_Amount
violation_code,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
21,768074,55,42244070
36,662765,50,33138250
38,542079,50,27103950


`d)What can you intuitively infer from these findings?` <br>

1) Code 21 tops the list : `Street Cleaning: No parking where parking is not allowed by sign, street marking or traffic control device.`Increasing the fines increase the revenue here, this can be used by department for civic amenity spending. Increasing the fine would also prevent irregular parking. <br>
2) Code 36 is second in the list: `Exceeding the posted speed limit in or near a designated school zone.`. Increased tickets for this code indicates an issue and a hazard for school kids. More analyis can be made why there is exceeded speed and more speed breakers should be introduced in the school zone <br>
3) Code 38 is third on the list : `Failing to show a receipt or tag in the windshield.Drivers get a 5-minute grace period past the expired time on parking meter receipts.`. Revenue from the NYPD needs to be spent on citizen education drive to prevent this violation 


#### Summary and conclusion / Inferences derived from the case study


1) `FORD`, `TOYOTA`, `HONDA`, `NISSAN` and `CHEVROLET` are the top 5 body types in records. This indicates violation has been recorded for a commoner and not any special kind of vehicles  <br>
2) The highest violation frequency is of `almost 1.9 million times` in a year <br>
3) Most of these vehicles are used for `transport and delivery`. This indicates that drivers are driven by required `delivery/arrival time` to `violate regulations`. Evenly distributed `special parking zones` for such vehicles can ease traffic challenges <br>
4) Violation codes `21, 36 and 38` tops the list and description is provided above <br>
5) Observation `Excluding incorrectly tagged 0 ` shows that Number of tickets issued by `Violation Precinct` is less than the number of tickets `Issuer precinct `. This indicates that there is a probability of shortage of manpower in `Issuer precinct `  <br>



In [47]:
spark.stop()