Objectives of the Case Study
Primarily, this case study is meant as a deep dive into the usage of Spark. As you saw while working with Spark, its syntax behaves differently from a regular Python syntax. One of the major objectives of this case study is to gain familiarity with how analysis works in PySpark as opposed to base Python.

Learning the basic idea behind using functions in PySpark can help in using other libraries like SparkR. If you are in a company where R is a primary language, you can easily pick up SparkR syntax and use Spark’s processing power.

The process of running a model-building command boils down to a few lines of code. While drawing inferences from data, the most time-consuming step is preparing the data up to the point of model building. So, this case study will focus more on exploratory analysis.

Problem Statement
Big data analytics allows you to analyse data at scale. It has applications in almost every industry in the world. Let’s consider an unconventional application that you wouldn’t ordinarily encounter.

 

New York City is a thriving metropolis. Just like most other metros its size, one of the biggest problems its citizens face is parking. The classic combination of a huge number of cars and cramped geography leads to a huge number of parking tickets.

 

In an attempt to scientifically analyse this phenomenon, the NYC Police Department has collected data for parking tickets. Of these, the data files for multiple years are publicly available on Kaggle. We will try and perform some exploratory analysis on a part of this data. Spark will allow us to analyse the full files at high speeds as opposed to taking a series of random samples that will approximate the population. For the scope of this analysis, we will analyse the parking tickets over the year 2017. 

 

Note: Although the broad goal of any analysis of this type is to have better parking and fewer tickets, we are not looking for recommendations on how to reduce the number of parking tickets—there are no specific points reserved for this.

 

The purpose of this case study is to conduct an exploratory data analysis that will help you understand the data. Since the size of the dataset is large, your queries will take some time to run, and you will need to identify the correct queries quicker. The questions given below will guide your analysis.

 

The dataset structure is available on this page along with the data.

https://www.kaggle.com/new-york-city/nyc-parking-tickets/data

General Guidelines:
The queries may take time to get executed. Please have some patience. If you are getting errors with correct queries, restart the PySpark session and try again, as the session may have expired.
Keep a copy of the commands on your local drive or S3 so that you do not lose any work in case of session expiry or you need to close the EMR instance for the timebeing. 
If you want to run SQL commands, create an SQL view first. Also, if you make any changes in the table (like substitution or dropping null values), please ensure that you update the SQL view related to that table for further analysis.
Remember to stop Spark whenever you finish working on the cluster. Use spark.stop().

# Questions to Be Answered in the Analysis
The following analysis should be performed on PySpark mounted on your EMR notebook instance, using the PySpark library. Remember that you need to summarise the analysis with your insights along with the code.

 

## Examine the data

- Find the total number of tickets for the year.
- Find out the number of unique states from where the cars that got parking tickets came. (Hint: Use the column 'Registration State'.)
    There is a numeric entry '99' in the column, which should be corrected. Replace it with the state having the maximum entries. Provide the number of unique states again.
 

### Aggregation tasks

- How often does each violation code occur? Display the frequency of the top five violation codes.
- How often does each 'vehicle body type' get a parking ticket? How about the 'vehicle make'? (Hint: Find the top 5 for both.)
- A precinct is a police station that has a certain zone of the city under its command. Find the (5 highest) frequencies of tickets for each of the following:
    - 'Violation Precinct' (This is the precinct of the zone where the violation occurred). Using this, can you draw any insights for parking violations in any specific areas of the city?
    - 'Issuer Precinct' (This is the precinct that issued the ticket.) Here, you would have noticed that the dataframe has the 'Violating Precinct' or 'Issuing Precinct' as '0'. These are erroneous entries. Hence, you need to provide the records for five correct precincts. (Hint: Print the top six entries after sorting.)
- Find the violation code frequencies for three precincts that have issued the most number of tickets. Do these precinct zones have an exceptionally high frequency of certain violation codes? Are these codes common across precincts? 
    (Hint: In the SQL view, use the 'where' attribute to filter among three precincts.)
- Find out the properties of parking violations across different times of the day:
    - Find a way to deal with missing values, if any.
        (Hint: Check for the null values using 'isNull' under the SQL. Also, to remove the null values, check the 'dropna' command in the API documentation.)

    - The Violation Time field is specified in a strange format. Find a way to make this a time attribute that you can use to divide into groups.

    - Divide 24 hours into six equal discrete bins of time. Choose the intervals as you see fit. For each of these groups, find the three most commonly occurring violations.
        (Hint: Use the CASE-WHEN in SQL view to segregate into bins. To find the most commonly occurring violations, you can use an approach similar to the one mentioned in the hint for question 4.)

    - Now, try another direction. For the three most commonly occurring violation codes, find the most common time of the day (in terms of the bins from the previous part).

- Let’s try and find some seasonality in this data:

    - First, divide the year into a certain number of seasons, and find the frequencies of tickets for each season. (Hint: Use Issue Date to segregate into seasons.)

    - Then, find the three most common violations for each of these seasons.
        (Hint: You can use an approach similar to the one mentioned in the hint for question 4.)

- The fines collected from all the instances of parking violation constitute a source of revenue for the NYC Police Department. Let’s take an example of estimating this for the three most commonly occurring codes:
    - Find the total occurrences of the three most common violation codes.
    - Then, visit the website:
        http://www1.nyc.gov/site/finance/vehicles/services-violation-codes.page
        It lists the fines associated with different violation codes. They’re divided into two categories: one for the highest-density locations in the city and the other for the rest of the city. For the sake of simplicity, take the average of the two.
    - Using this information, find the total amount collected for the three violation codes with the maximum tickets. State the code that has the highest total collection.
    - What can you intuitively infer from these findings?

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [2]:
spark = SparkSession.builder.appName('NYC EDA').getOrCreate()

In [3]:
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

In [4]:
df = spark.read.csv("Parking_Violations_2017.csv",inferSchema=True, header=True,sep=',')

In [5]:
df.printSchema()

root
 |-- Summons Number: long (nullable = true)
 |-- Plate ID: string (nullable = true)
 |-- Registration State: string (nullable = true)
 |-- Plate Type: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Violation Code: integer (nullable = true)
 |-- Vehicle Body Type: string (nullable = true)
 |-- Vehicle Make: string (nullable = true)
 |-- Issuing Agency: string (nullable = true)
 |-- Street Code1: integer (nullable = true)
 |-- Street Code2: integer (nullable = true)
 |-- Street Code3: integer (nullable = true)
 |-- Vehicle Expiration Date: integer (nullable = true)
 |-- Violation Location: integer (nullable = true)
 |-- Violation Precinct: integer (nullable = true)
 |-- Issuer Precinct: integer (nullable = true)
 |-- Issuer Code: integer (nullable = true)
 |-- Issuer Command: string (nullable = true)
 |-- Issuer Squad: string (nullable = true)
 |-- Violation Time: string (nullable = true)
 |-- Time First Observed: string (nullable = true)
 |-- Violation Coun

In [6]:
df.show(3,False)

+--------------+--------+------------------+----------+----------+--------------+-----------------+------------+--------------+------------+------------+------------+-----------------------+------------------+------------------+---------------+-----------+--------------+------------+--------------+-------------------+----------------+---------------------------------+------------+--------------------+-------------------+-------------------+-----------+------------+--------------------+--------------------------+--------------------+------------------+-------------+---------------------+------------+------------+--------------+-------------------+----------------------------+---------------------------------+-----------------+------------------------+
|Summons Number|Plate ID|Registration State|Plate Type|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Issuing Agency|Street Code1|Street Code2|Street Code3|Vehicle Expiration Date|Violation Location|Violation Precinct|Issuer Prec

Examine the data
Find the total number of tickets for the year.
Find out the number of unique states from where the cars that got parking tickets came. (Hint: Use the column 'Registration State'.) There is a numeric entry '99' in the column, which should be corrected. Replace it with the state having the maximum entries. Provide the number of unique states again.

In [7]:
date_pattern = 'dd/MM/YYYY'
df2 = df.withColumn('Issue Date',to_timestamp(df['Issue Date'],date_pattern))

In [8]:
df2.printSchema()

root
 |-- Summons Number: long (nullable = true)
 |-- Plate ID: string (nullable = true)
 |-- Registration State: string (nullable = true)
 |-- Plate Type: string (nullable = true)
 |-- Issue Date: timestamp (nullable = true)
 |-- Violation Code: integer (nullable = true)
 |-- Vehicle Body Type: string (nullable = true)
 |-- Vehicle Make: string (nullable = true)
 |-- Issuing Agency: string (nullable = true)
 |-- Street Code1: integer (nullable = true)
 |-- Street Code2: integer (nullable = true)
 |-- Street Code3: integer (nullable = true)
 |-- Vehicle Expiration Date: integer (nullable = true)
 |-- Violation Location: integer (nullable = true)
 |-- Violation Precinct: integer (nullable = true)
 |-- Issuer Precinct: integer (nullable = true)
 |-- Issuer Code: integer (nullable = true)
 |-- Issuer Command: string (nullable = true)
 |-- Issuer Squad: string (nullable = true)
 |-- Violation Time: string (nullable = true)
 |-- Time First Observed: string (nullable = true)
 |-- Violation C

In [9]:
df2.show(3,truncate=False)

+--------------+--------+------------------+----------+-------------------+--------------+-----------------+------------+--------------+------------+------------+------------+-----------------------+------------------+------------------+---------------+-----------+--------------+------------+--------------+-------------------+----------------+---------------------------------+------------+--------------------+-------------------+-------------------+-----------+------------+--------------------+--------------------------+--------------------+------------------+-------------+---------------------+------------+------------+--------------+-------------------+----------------------------+---------------------------------+-----------------+------------------------+
|Summons Number|Plate ID|Registration State|Plate Type|Issue Date         |Violation Code|Vehicle Body Type|Vehicle Make|Issuing Agency|Street Code1|Street Code2|Street Code3|Vehicle Expiration Date|Violation Location|Violation Pr

In [10]:
df2 = df2.withColumn("Year_Column",year("Issue Date"))

In [11]:
df2.printSchema()

root
 |-- Summons Number: long (nullable = true)
 |-- Plate ID: string (nullable = true)
 |-- Registration State: string (nullable = true)
 |-- Plate Type: string (nullable = true)
 |-- Issue Date: timestamp (nullable = true)
 |-- Violation Code: integer (nullable = true)
 |-- Vehicle Body Type: string (nullable = true)
 |-- Vehicle Make: string (nullable = true)
 |-- Issuing Agency: string (nullable = true)
 |-- Street Code1: integer (nullable = true)
 |-- Street Code2: integer (nullable = true)
 |-- Street Code3: integer (nullable = true)
 |-- Vehicle Expiration Date: integer (nullable = true)
 |-- Violation Location: integer (nullable = true)
 |-- Violation Precinct: integer (nullable = true)
 |-- Issuer Precinct: integer (nullable = true)
 |-- Issuer Code: integer (nullable = true)
 |-- Issuer Command: string (nullable = true)
 |-- Issuer Squad: string (nullable = true)
 |-- Violation Time: string (nullable = true)
 |-- Time First Observed: string (nullable = true)
 |-- Violation C

In [12]:
df2.select("Year_Column").groupBy('Year_Column').count().orderBy('count',ascending=False).show(truncate=False)

+-----------+-------+
|Year_Column|count  |
+-----------+-------+
|2017       |5432975|
|2015       |5368391|
|2018       |472    |
|2014       |419    |
|1999       |185    |
|2012       |157    |
|2013       |120    |
|2026       |50     |
|2009       |48     |
|2006       |26     |
|2025       |24     |
|2019       |22     |
|2020       |22     |
|2010       |22     |
|2029       |12     |
|2027       |8      |
|2023       |8      |
|2024       |6      |
|2030       |5      |
|2068       |5      |
+-----------+-------+
only showing top 20 rows



In [13]:
# Find the total number of tickets for the year.
df2.select('Issue Date').filter(df2['Year_Column']==2068).show(truncate=False)

+-------------------+
|Issue Date         |
+-------------------+
|2068-01-01 00:00:00|
|2068-12-30 00:00:00|
|2068-12-30 00:00:00|
|2068-12-30 00:00:00|
|2068-12-30 00:00:00|
+-------------------+



In [14]:
#Find out the number of unique states from where the cars that got parking tickets came. 
#(Hint: Use the column 'Registration State'.) There is a numeric entry '99' in the column, which should be 
#corrected. Replace it with the state having the maximum entries. Provide the number of unique states again.

In [15]:
df2.select("Registration State").distinct().count()

67

In [16]:
df2.select("Registration State").groupBy('Registration State').count().orderBy('count',ascending=False).show(67,truncate=False)


+------------------+-------+
|Registration State|count  |
+------------------+-------+
|NY                |8481061|
|NJ                |925965 |
|PA                |285419 |
|FL                |144556 |
|CT                |141088 |
|MA                |85547  |
|IN                |80749  |
|VA                |72626  |
|MD                |61800  |
|NC                |55806  |
|IL                |37329  |
|GA                |36852  |
|99                |36625  |
|TX                |36516  |
|AZ                |26426  |
|OH                |25302  |
|CA                |24260  |
|SC                |21836  |
|ME                |21574  |
|MN                |18227  |
|OK                |18165  |
|TN                |17275  |
|DE                |16325  |
|MI                |15703  |
|RI                |12224  |
|NH                |8752   |
|VT                |7367   |
|AL                |6891   |
|WA                |6311   |
|ON                |5601   |
|OR                |5483   |
|MO           

In [17]:
#replaced 99 with NY as NY is the mostly occuring state in the dataset.

df2 = df2.withColumn('Registration State',when(col('Registration State')=='99','NY').otherwise(col('Registration State')))

In [18]:
df2.select("Registration State").groupBy('Registration State').count().orderBy('count',ascending=False).show(67,truncate=False)

+------------------+-------+
|Registration State|count  |
+------------------+-------+
|NY                |8517686|
|NJ                |925965 |
|PA                |285419 |
|FL                |144556 |
|CT                |141088 |
|MA                |85547  |
|IN                |80749  |
|VA                |72626  |
|MD                |61800  |
|NC                |55806  |
|IL                |37329  |
|GA                |36852  |
|TX                |36516  |
|AZ                |26426  |
|OH                |25302  |
|CA                |24260  |
|SC                |21836  |
|ME                |21574  |
|MN                |18227  |
|OK                |18165  |
|TN                |17275  |
|DE                |16325  |
|MI                |15703  |
|RI                |12224  |
|NH                |8752   |
|VT                |7367   |
|AL                |6891   |
|WA                |6311   |
|ON                |5601   |
|OR                |5483   |
|MO                |4454   |
|QB           

In [19]:
df2.select("Registration State").distinct().count()

66

In [20]:
df2.createOrReplaceTempView("data")

In [21]:
spark.sql("SELECT `Registration State`, count(*) as count \
        FROM data group by `Registration State` order by `count` desc").show()

+------------------+-------+
|Registration State|  count|
+------------------+-------+
|                NY|8517686|
|                NJ| 925965|
|                PA| 285419|
|                FL| 144556|
|                CT| 141088|
|                MA|  85547|
|                IN|  80749|
|                VA|  72626|
|                MD|  61800|
|                NC|  55806|
|                IL|  37329|
|                GA|  36852|
|                TX|  36516|
|                AZ|  26426|
|                OH|  25302|
|                CA|  24260|
|                SC|  21836|
|                ME|  21574|
|                MN|  18227|
|                OK|  18165|
+------------------+-------+
only showing top 20 rows



In [22]:
# Aggregation tasks
# How often does each violation code occur? Display the frequency of the top five violation codes.
# How often does each 'vehicle body type' get a parking ticket? How about the 'vehicle make'?
#(Hint: Find the top 5 for both.)

In [23]:
df2.select("Violation Code").show(5)

+--------------+
|Violation Code|
+--------------+
|             7|
|             7|
|             5|
|            47|
|            69|
+--------------+
only showing top 5 rows



In [24]:
# How often does each violation code occur? Display the frequency of the top five violation codes.
df2.select("Violation Code").groupBy("Violation Code").count().orderBy('count',ascending=False).show(5,False)

+--------------+-------+
|Violation Code|count  |
+--------------+-------+
|21            |1528588|
|36            |1400614|
|38            |1062304|
|14            |893498 |
|20            |618593 |
+--------------+-------+
only showing top 5 rows



In [25]:
# How often does each 'vehicle body type' get a parking ticket?
df2.select("Vehicle Body Type").groupBy("Vehicle Body Type").count().orderBy('count',ascending=False).show(5,False)

+-----------------+-------+
|Vehicle Body Type|count  |
+-----------------+-------+
|SUBN             |3719802|
|4DSD             |3082020|
|VAN              |1411970|
|DELV             |687330 |
|SDN              |438191 |
+-----------------+-------+
only showing top 5 rows



In [26]:
# How about the 'vehicle make'?
df2.select("Vehicle Make").groupBy("Vehicle Make").count().orderBy('count',ascending=False).show(5,False)

+------------+-------+
|Vehicle Make|count  |
+------------+-------+
|FORD        |1280958|
|TOYOT       |1211451|
|HONDA       |1079238|
|NISSA       |918590 |
|CHEVR       |714655 |
+------------+-------+
only showing top 5 rows



In [27]:
# A precinct is a police station that has a certain zone of the city under its command.
# Find the (5 highest) frequencies of tickets for each of the following:
# 'Violation Precinct' (This is the precinct of the zone where the violation occurred).
# Using this, can you draw any insights for parking violations in any specific areas of the city?
# 'Issuer Precinct' (This is the precinct that issued the ticket).
# Here, you would have noticed that the dataframe has the 'Violating Precinct' or 'Issuing Precinct' as '0'.
# These are erroneous entries. Hence, you need to provide the records for five correct precincts.
# (Hint: Print the top six entries after sorting.)

In [28]:
df2.select("Violation Precinct").groupBy("Violation Precinct").count().orderBy("count",ascending=False).show(6,False)

+------------------+-------+
|Violation Precinct|count  |
+------------------+-------+
|0                 |2072400|
|19                |535671 |
|14                |352450 |
|1                 |331810 |
|18                |306920 |
|114               |296514 |
+------------------+-------+
only showing top 6 rows



In [29]:
df2.select("Issuer Precinct").groupBy("Issuer Precinct").count().orderBy("count",ascending=False).show(6,False)

+---------------+-------+
|Issuer Precinct|count  |
+---------------+-------+
|0              |2388479|
|19             |521513 |
|14             |344977 |
|1              |321170 |
|18             |296553 |
|114            |289950 |
+---------------+-------+
only showing top 6 rows



In [30]:
# Find the violation code frequencies for three precincts that have issued the most number of tickets. 
# Do these precinct zones have an exceptionally high frequency of certain violation codes? 
# Are these codes common across precincts? (Hint: In the SQL view, use the 'where' attribute to filter 
# among three precincts.)

In [31]:
spark.sql("SELECT `Issuer Precinct`, `Violation Code`, count(*) as count_tickets \
                    FROM data where `Issuer Precinct` = '19'\
                    group by `Issuer Precinct`, `Violation Code` \
                    order by `count_tickets` desc limit 5").show()

+---------------+--------------+-------------+
|Issuer Precinct|Violation Code|count_tickets|
+---------------+--------------+-------------+
|             19|            46|        86390|
|             19|            37|        72437|
|             19|            38|        72344|
|             19|            14|        57563|
|             19|            21|        54700|
+---------------+--------------+-------------+



In [32]:
spark.sql("SELECT `Issuer Precinct`, `Violation Code`, count(*) as count_tickets \
                    FROM data where `Issuer Precinct` = '14'\
                    group by `Issuer Precinct`, `Violation Code` \
                    order by `count_tickets` desc limit 5").show()

+---------------+--------------+-------------+
|Issuer Precinct|Violation Code|count_tickets|
+---------------+--------------+-------------+
|             14|            14|        73837|
|             14|            69|        58026|
|             14|            31|        39857|
|             14|            47|        30540|
|             14|            42|        20663|
+---------------+--------------+-------------+



In [33]:
spark.sql("SELECT `Issuer Precinct`, `Violation Code`, count(*) as count_tickets \
                    FROM data where `Issuer Precinct` = '1'\
                    group by `Issuer Precinct`, `Violation Code` \
                    order by `count_tickets` desc limit 5").show()

+---------------+--------------+-------------+
|Issuer Precinct|Violation Code|count_tickets|
+---------------+--------------+-------------+
|              1|            14|        73522|
|              1|            16|        38937|
|              1|            20|        27841|
|              1|            46|        22534|
|              1|            38|        16989|
+---------------+--------------+-------------+



In [34]:
#Violation code 14 is common.

In [35]:
# Find out the properties of parking violations across different times of the day:
# Find a way to deal with missing values, if any. (Hint: Check for the null values using
#'isNull' under the SQL. Also, to remove the null values, check the 'dropna' command in the API documentation.)
# The Violation Time field is specified in a strange format. Find a way to make this a time attribute that you 
# can use to divide into groups.
# Divide 24 hours into six equal discrete bins of time. Choose the intervals as you see fit. For each of these 
# groups, find the three most commonly occurring violations. (Hint: Use the CASE-WHEN in SQL view to segregate 
# into bins. To find the most commonly occurring violations, you can use an approach similar to the one mentioned 
# in the hint for question 4.)
# Now, try another direction. For the three most commonly occurring violation codes, find the most common 
# time of the day (in terms of the bins from the previous part).

In [36]:
df2.columns

['Summons Number',
 'Plate ID',
 'Registration State',
 'Plate Type',
 'Issue Date',
 'Violation Code',
 'Vehicle Body Type',
 'Vehicle Make',
 'Issuing Agency',
 'Street Code1',
 'Street Code2',
 'Street Code3',
 'Vehicle Expiration Date',
 'Violation Location',
 'Violation Precinct',
 'Issuer Precinct',
 'Issuer Code',
 'Issuer Command',
 'Issuer Squad',
 'Violation Time',
 'Time First Observed',
 'Violation County',
 'Violation In Front Of Or Opposite',
 'House Number',
 'Street Name',
 'Intersecting Street',
 'Date First Observed',
 'Law Section',
 'Sub Division',
 'Violation Legal Code',
 'Days Parking In Effect    ',
 'From Hours In Effect',
 'To Hours In Effect',
 'Vehicle Color',
 'Unregistered Vehicle?',
 'Vehicle Year',
 'Meter Number',
 'Feet From Curb',
 'Violation Post Code',
 'Violation Description',
 'No Standing or Stopping Violation',
 'Hydrant Violation',
 'Double Parking Violation',
 'Year_Column']

In [37]:
df2.select("Violation Description").distinct().show(100,False)

+------------------------------+
|Violation Description         |
+------------------------------+
|49-Excavation (obstruct traff)|
|80-Missing Equipment (specify)|
|74B-Covered Plate             |
|70B-Impropr Dsply of Reg (NYS)|
|71A-Insp Sticker Expired (NYS)|
|50-Crosswalk                  |
|16A-No Std (Com Veh) Non-COM  |
|16-No Std (Com Veh) Com Plate |
|42-Exp. Muni-Mtr (Com. Mtr. Z)|
|04-Downtown Bus Area, 3 Hr Lim|
|09-Blocking the Box           |
|96-Railroad Crossing          |
|91-Veh for Sale (Dealer Only) |
|71B-Improp Safety Stkr (NYS)  |
|02-No operator N/A/PH         |
|75-No Match-Plate/Reg. Sticker|
|14-No Standing                |
|24-No Parking (exc auth veh)  |
|70-Reg. Sticker Missing (NYS) |
|null                          |
|70A-Reg. Sticker Expired (NYS)|
|47A-Angle PKG - Midtown       |
|22-No Parking (exc hotel load)|
|84-Platform lifts in low posit|
|89-No Stand Exc Com Plate     |
|03-Unauth passenger pick-up   |
|78-Nighttime PKG on Res Street|
|48-Bike L

In [39]:
df2.show(5,truncate=False)

+--------------+--------+------------------+----------+-------------------+--------------+-----------------+------------+--------------+------------+------------+------------+-----------------------+------------------+------------------+---------------+-----------+--------------+------------+--------------+-------------------+----------------+---------------------------------+------------+--------------------+-------------------+-------------------+-----------+------------+--------------------+--------------------------+--------------------+------------------+-------------+---------------------+------------+------------+--------------+-------------------+-----------------------------+---------------------------------+-----------------+------------------------+-----------+
|Summons Number|Plate ID|Registration State|Plate Type|Issue Date         |Violation Code|Vehicle Body Type|Vehicle Make|Issuing Agency|Street Code1|Street Code2|Street Code3|Vehicle Expiration Date|Violation Location

- Find out the properties of parking violations across different times of the day:
    - Find a way to deal with missing values, if any.
        (Hint: Check for the null values using 'isNull' under the SQL. Also, to remove the null values, check the 'dropna' command in the API documentation.)

    - The Violation Time field is specified in a strange format. Find a way to make this a time attribute that you can use to divide into groups.

    - Divide 24 hours into six equal discrete bins of time. Choose the intervals as you see fit. For each of these groups, find the three most commonly occurring violations.
        (Hint: Use the CASE-WHEN in SQL view to segregate into bins. To find the most commonly occurring violations, you can use an approach similar to the one mentioned in the hint for question 4.)

    - Now, try another direction. For the three most commonly occurring violation codes, find the most common time of the day (in terms of the bins from the previous part).

- Let’s try and find some seasonality in this data:

    - First, divide the year into a certain number of seasons, and find the frequencies of tickets for each season. (Hint: Use Issue Date to segregate into seasons.)

    - Then, find the three most common violations for each of these seasons.
        (Hint: You can use an approach similar to the one mentioned in the hint for question 4.)

- The fines collected from all the instances of parking violation constitute a source of revenue for the NYC Police Department. Let’s take an example of estimating this for the three most commonly occurring codes:
    - Find the total occurrences of the three most common violation codes.
    - Then, visit the website:
        http://www1.nyc.gov/site/finance/vehicles/services-violation-codes.page
        It lists the fines associated with different violation codes. They’re divided into two categories: one for the highest-density locations in the city and the other for the rest of the city. For the sake of simplicity, take the average of the two.
    - Using this information, find the total amount collected for the three violation codes with the maximum tickets. State the code that has the highest total collection.
    - What can you intuitively infer from these findings?

In [42]:
spark.sql("Select count(*) as cnt from data where `Violation Time` is Null ").show(truncate=False)

+---+
|cnt|
+---+
|63 |
+---+



In [44]:
df2.where(col('Violation Time').isNull()).count()

63

In [45]:
df2.select([count(when(col(c).isNull(), c)).alias(c) for c in df2.columns]).show()

+--------------+--------+------------------+----------+----------+--------------+-----------------+------------+--------------+------------+------------+------------+-----------------------+------------------+------------------+---------------+-----------+--------------+------------+--------------+-------------------+----------------+---------------------------------+------------+-----------+-------------------+-------------------+-----------+------------+--------------------+--------------------------+--------------------+------------------+-------------+---------------------+------------+------------+--------------+-------------------+---------------------+---------------------------------+-----------------+------------------------+-----------+
|Summons Number|Plate ID|Registration State|Plate Type|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Issuing Agency|Street Code1|Street Code2|Street Code3|Vehicle Expiration Date|Violation Location|Violation Precinct|Issuer Precinct

In [47]:
df2.where(df2['Violation Time'].isNull()).count()

63

In [48]:
df_filtered = df2.dropna(subset=['Violation Time'])

In [49]:
df_filtered.where(col('Violation Time').isNull()).count()

0

In [51]:
#The Violation Time field is specified in a strange format.
#Find a way to make this a time attribute that you can use to divide into groups.
df_filtered.select("Violation Time").show(5,truncate=False)

+--------------+
|Violation Time|
+--------------+
|0143A         |
|0400P         |
|0233P         |
|1120A         |
|0555P         |
+--------------+
only showing top 5 rows



In [52]:
from pyspark.sql import functions as F

In [65]:
df_filtered = df_filtered.withColumn(
    "Violation Time Formatted",
    F.concat(
        F.expr('substring(`Violation Time`, 1, 2)'),  # Hour part
        F.lit(":"),
        F.expr('substring(`Violation Time`, 3, 2)'),  # Minute part
        F.lit(" "),
        F.when(F.expr('substring(`Violation Time`, 5, 1)') == "A", F.lit("AM"))
         .otherwise(F.lit("PM"))  # Append "AM" or "PM" based on the last character
    )
)


In [69]:
df_filtered.select("Violation Time","Violation Time Formatted").show()

+--------------+------------------------+
|Violation Time|Violation Time Formatted|
+--------------+------------------------+
|         0143A|                01:43 AM|
|         0400P|                04:00 PM|
|         0233P|                02:33 PM|
|         1120A|                11:20 AM|
|         0555P|                05:55 PM|
|         0852P|                08:52 PM|
|         0215A|                02:15 AM|
|         0758A|                07:58 AM|
|         1005A|                10:05 AM|
|         0845A|                08:45 AM|
|         0015A|                00:15 AM|
|         0707A|                07:07 AM|
|         1022A|                10:22 AM|
|         1150A|                11:50 AM|
|         0525A|                05:25 AM|
|         0645P|                06:45 PM|
|         1122A|                11:22 AM|
|         0256P|                02:56 PM|
|         1232A|                12:32 AM|
|         1034A|                10:34 AM|
+--------------+------------------

In [83]:
df_converted = df_filtered.withColumn(
    "Violation Time 24 Hour",
    F.when((F.expr("substring(`Violation Time`,5,1)") == "A") & (F.expr("substring(`Violation Time`,1,2)") == "12"),
        F.concat(F.lit("00"), F.lit(":"), F.expr("substring(`Violation Time`, 3, 2)"))
    ).otherwise(        
        F.when(
            F.expr("substring(`Violation Time`, 5, 1)") == 'A',
            # If it's AM, just use the hour and minutes as is
            F.concat(F.expr("substring(`Violation Time`, 1, 2)"), 
                     F.lit(":"), 
                     F.expr("substring(`Violation Time`, 3, 2)"))
        ).otherwise(
            # Convert PM time to 24-hour format by adding 12 to the hour
            F.concat(
                (F.expr("cast(substring(`Violation Time`, 1, 2) as int)") + 12).cast("string"), 
                F.lit(":"), 
                F.expr("substring(`Violation Time`, 3, 2)")
            )
        )
    )
)




In [None]:
df_converted = df_filtered.withColumn(
    "Violation Time 24 Hour",
    F.when(F.expr("substring()"))


)



In [81]:
df_converted.columns

['Summons Number',
 'Plate ID',
 'Registration State',
 'Plate Type',
 'Issue Date',
 'Violation Code',
 'Vehicle Body Type',
 'Vehicle Make',
 'Issuing Agency',
 'Street Code1',
 'Street Code2',
 'Street Code3',
 'Vehicle Expiration Date',
 'Violation Location',
 'Violation Precinct',
 'Issuer Precinct',
 'Issuer Code',
 'Issuer Command',
 'Issuer Squad',
 'Violation Time',
 'Time First Observed',
 'Violation County',
 'Violation In Front Of Or Opposite',
 'House Number',
 'Street Name',
 'Intersecting Street',
 'Date First Observed',
 'Law Section',
 'Sub Division',
 'Violation Legal Code',
 'Days Parking In Effect    ',
 'From Hours In Effect',
 'To Hours In Effect',
 'Vehicle Color',
 'Unregistered Vehicle?',
 'Vehicle Year',
 'Meter Number',
 'Feet From Curb',
 'Violation Post Code',
 'Violation Description',
 'No Standing or Stopping Violation',
 'Hydrant Violation',
 'Double Parking Violation',
 'Year_Column',
 'Violation Time Formatted',
 'Violation Time 24 Hour']

In [75]:
df_converted.select(
 'Violation Time',
 'Violation Time Formatted',
 'Violation Time 24 Hour').show(truncate=False)

+--------------+------------------------+----------------------+
|Violation Time|Violation Time Formatted|Violation Time 24 Hour|
+--------------+------------------------+----------------------+
|0143A         |01:43 AM                |01:43                 |
|0400P         |04:00 PM                |16:00                 |
|0233P         |02:33 PM                |14:33                 |
|1120A         |11:20 AM                |11:20                 |
|0555P         |05:55 PM                |17:55                 |
|0852P         |08:52 PM                |20:52                 |
|0215A         |02:15 AM                |02:15                 |
|0758A         |07:58 AM                |07:58                 |
|1005A         |10:05 AM                |10:05                 |
|0845A         |08:45 AM                |08:45                 |
|0015A         |00:15 AM                |00:15                 |
|0707A         |07:07 AM                |07:07                 |
|1022A         |10:22 AM 

In [84]:
df_converted.select(
 'Violation Time',
 'Violation Time Formatted',
 'Violation Time 24 Hour').show(truncate=False)

+--------------+------------------------+----------------------+
|Violation Time|Violation Time Formatted|Violation Time 24 Hour|
+--------------+------------------------+----------------------+
|0143A         |01:43 AM                |01:43                 |
|0400P         |04:00 PM                |16:00                 |
|0233P         |02:33 PM                |14:33                 |
|1120A         |11:20 AM                |11:20                 |
|0555P         |05:55 PM                |17:55                 |
|0852P         |08:52 PM                |20:52                 |
|0215A         |02:15 AM                |02:15                 |
|0758A         |07:58 AM                |07:58                 |
|1005A         |10:05 AM                |10:05                 |
|0845A         |08:45 AM                |08:45                 |
|0015A         |00:15 AM                |00:15                 |
|0707A         |07:07 AM                |07:07                 |
|1022A         |10:22 AM 

    - Divide 24 hours into six equal discrete bins of time. Choose the intervals as you see fit. For each of these groups, find the three most commonly occurring violations.
        (Hint: Use the CASE-WHEN in SQL view to segregate into bins. To find the most commonly occurring violations, you can use an approach similar to the one mentioned in the hint for question 4.)

    - Now, try another direction. For the three most commonly occurring violation codes, find the most common time of the day (in terms of the bins from the previous part).

In [85]:
df_converted.createOrReplaceTempView("time_violation_data")

time_violation_analysis = spark.sql('''select case
                                       when int(substring(`Violation Time 24 Hour`,1,2)) between 00 and 03
                                       then '00:00-03:59'
                                       when int(substring(`Violation Time 24 Hour`,1,2)) between 04 and 07
                                       then '04:00-07:59'
                                       when int(substring(`Violation Time 24 Hour`,1,2)) between 08 and 11
                                       then '08:00-11:59'
                                       when int(substring(`Violation Time 24 Hour`,1,2)) between 12 and 15
                                       then '12:00-15:59'
                                       when int(substring(`Violation Time 24 Hour`,1,2)) between 16 and 19
                                       then '16:00-19:59'
                                       else '20:00-23:59'
                                       end as bins,  `Violation Time`, `Violation Code`,`Violation Time Formatted`,
                                       `Violation Time 24 Hour`
                                       from time_violation_data''')


In [86]:
time_violation_analysis.show()

+-----------+--------------+--------------+------------------------+----------------------+
|       bins|Violation Time|Violation Code|Violation Time Formatted|Violation Time 24 Hour|
+-----------+--------------+--------------+------------------------+----------------------+
|00:00-03:59|         0143A|             7|                01:43 AM|                 01:43|
|16:00-19:59|         0400P|             7|                04:00 PM|                 16:00|
|12:00-15:59|         0233P|             5|                02:33 PM|                 14:33|
|08:00-11:59|         1120A|            47|                11:20 AM|                 11:20|
|16:00-19:59|         0555P|            69|                05:55 PM|                 17:55|
|20:00-23:59|         0852P|             7|                08:52 PM|                 20:52|
|00:00-03:59|         0215A|            40|                02:15 AM|                 02:15|
|04:00-07:59|         0758A|            36|                07:58 AM|            

In [None]:
# - Divide 24 hours into six equal discrete bins of time. Choose the intervals as you see fit. 
# For each of these groups, find the three most commonly occurring violations.
# (Hint: Use the CASE-WHEN in SQL view to segregate into bins.
# To find the most commonly occurring violations, you can use an approach similar to the one 
# mentioned in the hint for question 4.)


In [89]:
time_violation_analysis.groupBy("bins",'Violation Code').count().orderBy("count",ascending=False).show(100,False)

+-----------+--------------+-------+
|bins       |Violation Code|count  |
+-----------+--------------+-------+
|08:00-11:59|21            |1182689|
|08:00-11:59|36            |751422 |
|12:00-15:59|36            |376961 |
|12:00-15:59|38            |356253 |
|08:00-11:59|38            |346518 |
|08:00-11:59|14            |274288 |
|12:00-15:59|37            |265848 |
|08:00-11:59|46            |213696 |
|20:00-23:59|36            |211434 |
|16:00-19:59|38            |203232 |
|12:00-15:59|14            |201379 |
|08:00-11:59|71            |192307 |
|12:00-15:59|20            |177007 |
|08:00-11:59|20            |175688 |
|12:00-15:59|46            |172962 |
|12:00-15:59|71            |155062 |
|20:00-23:59|38            |153537 |
|16:00-19:59|37            |145784 |
|16:00-19:59|14            |144749 |
|20:00-23:59|21            |144082 |
|04:00-07:59|14            |141276 |
|08:00-11:59|40            |132488 |
|16:00-19:59|7             |131768 |
|04:00-07:59|21            |119469 |
|

In [90]:
time_violation_analysis.groupBy("bins",'Violation Code').count().orderBy("bins",ascending=False).show(100,False)

+-----------+--------------+------+
|bins       |Violation Code|count |
+-----------+--------------+------+
|20:00-23:59|98            |10154 |
|20:00-23:59|23            |2590  |
|20:00-23:59|10            |5327  |
|20:00-23:59|55            |44    |
|20:00-23:59|65            |73    |
|20:00-23:59|39            |242   |
|20:00-23:59|46            |81583 |
|20:00-23:59|24            |6177  |
|20:00-23:59|13            |3030  |
|20:00-23:59|56            |234   |
|20:00-23:59|3             |31    |
|20:00-23:59|19            |43978 |
|20:00-23:59|89            |398   |
|20:00-23:59|69            |17651 |
|20:00-23:59|43            |62    |
|20:00-23:59|48            |9801  |
|20:00-23:59|99            |729   |
|20:00-23:59|6             |139   |
|20:00-23:59|2             |14    |
|20:00-23:59|52            |90    |
|20:00-23:59|80            |523   |
|20:00-23:59|93            |6     |
|20:00-23:59|83            |1153  |
|20:00-23:59|82            |3892  |
|20:00-23:59|11            |

In [93]:
# Updating the SQL view
time_violation_analysis.createOrReplaceTempView('time_violation_data')

In [94]:
spark.sql("SELECT bins, `Violation Code`, count(*) as `count`\
                    FROM time_violation_data where bins = '00:00-03:59'\
                    group by bins, `Violation Code`\
                    order by `count` desc limit 3").show()

+-----------+--------------+-----+
|       bins|Violation Code|count|
+-----------+--------------+-----+
|00:00-03:59|            21|77461|
|00:00-03:59|            40|50948|
|00:00-03:59|            78|32243|
+-----------+--------------+-----+



In [95]:
spark.sql("SELECT bins, `Violation Code`, count(*) as `count`\
                    FROM time_violation_data where bins = '04:00-07:59'\
                    group by bins, `Violation Code`\
                    order by `count` desc limit 3").show()

+-----------+--------------+------+
|       bins|Violation Code| count|
+-----------+--------------+------+
|04:00-07:59|            14|141276|
|04:00-07:59|            21|119469|
|04:00-07:59|            40|112186|
+-----------+--------------+------+



In [96]:
spark.sql("SELECT bins, `Violation Code`, count(*) as `count`\
                    FROM time_violation_data where bins = '08:00-11:59'\
                    group by bins, `Violation Code`\
                    order by `count` desc limit 3").show()

+-----------+--------------+-------+
|       bins|Violation Code|  count|
+-----------+--------------+-------+
|08:00-11:59|            21|1182689|
|08:00-11:59|            36| 751422|
|08:00-11:59|            38| 346518|
+-----------+--------------+-------+



In [97]:
spark.sql("SELECT bins, `Violation Code`, count(*) as `count`\
                    FROM time_violation_data where bins = '12:00-15:59'\
                    group by bins, `Violation Code`\
                    order by `count` desc limit 3").show()

+-----------+--------------+------+
|       bins|Violation Code| count|
+-----------+--------------+------+
|12:00-15:59|            36|376961|
|12:00-15:59|            38|356253|
|12:00-15:59|            37|265848|
+-----------+--------------+------+



In [98]:
spark.sql("SELECT bins, `Violation Code`, count(*) as `count`\
                    FROM time_violation_data where bins = '16:00-20:59'\
                    group by bins, `Violation Code`\
                    order by `count` desc limit 3").show()

+----+--------------+-----+
|bins|Violation Code|count|
+----+--------------+-----+
+----+--------------+-----+



In [99]:
spark.sql("SELECT bins, `Violation Code`, count(*) as `count`\
                    FROM time_violation_data where bins = '21:00-23:59'\
                    group by bins, `Violation Code`\
                    order by `count` desc limit 3").show()

+----+--------------+-----+
|bins|Violation Code|count|
+----+--------------+-----+
+----+--------------+-----+

