## Big data case study - NYC Parking Tickets: An Exploratory Analysis                                                                    
 Members:                                                                                                
 1.Bhanu Pratap                                                                                            
 2.Shadab Hussain                                                                                     
 3.Md. Liyakat                                                                                           
 4.Nidhi Tripathi
 
 ### Brief on subject: 
The NYC Department of Finance collects data on every parking ticket issued in NYC (~10M per year!). This data is made publicly 
available to aid in ticket resolution and to guide policymakers. New York City is a thriving metropolis. Just like most other 
metros that size, one of the biggest problems its citizens face, is parking. The classic combination of a huge number of cars,
and a cramped geography is the exact recipe that leads to a huge number of parking tickets. In an attempt to scientifically analyse
this phenomenon, the NYC Police Department has collected data for parking tickets. Out of these, the data files from 2014 to 2017
are publicly available on Kaggle. We will try and perform some exploratory analysis on this data.

### Importing and Initialising spark 

In [1]:
# class pyspark.sql.SparkSession, The entry point to programming Spark with the Dataset and DataFrame API.
#A SparkSession can be used create DataFrame, register DataFrame as tables, execute SQL over tables, cache tables, and 
# read parquet files.To create a SparkSession, use the following builder pattern:

from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("PySpark DataFrame and Sql") \
    .getOrCreate()

### Loading data if spark dataframe from the data stored in HDFS

In [2]:
df = spark.read.format('csv').options(header='false') \
     .load('/common_folder/nyc_parking/Parking_Violations_Issued_-_Fiscal_Year_2017.csv' )

### Dispalying first 5 rows of dataframe

In [3]:
df.show(5)

+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|           _c0|     _c1|               _c2|       _c3|           _c4|              _c5|         _c6|               _c7|            _c8|           _c9|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|Summons Number|Plate ID|Registration State|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|
|    5092469481| GZH7067|                NY|2016-07-10|             7|             SUBN|       TOYOT|                 0|              0|         0143A|
|    5092451658| GZH7067|                NY|2016-07-08|             7|             SUBN|       TOYOT|                 0|              0|         0400P|
|    4006265037| FZX9232|                NY|2016-08-23|             5|             SUBN|

In [4]:
#DataFame will have columns, and we use a schema to define them.
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)



In [5]:
df.show(1)

+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|           _c0|     _c1|               _c2|       _c3|           _c4|              _c5|         _c6|               _c7|            _c8|           _c9|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|Summons Number|Plate ID|Registration State|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
only showing top 1 row



In [6]:
# schema defines the column names and types of a DataFrame. We can either let a data source define the schema
# (called schema-on-read) or we can define it explicitly ourselves.
#A schema is a StructType made up of a number of fields, StructFields, that have a name, type, a Boolean flag which
#specifies whether that column can contain missing or null values, and, finally, users can optionally specify associated
#metadata with that column.

from pyspark.sql.types import StructField, StructType, StringType, LongType

myManualSchema = StructType([
  StructField("Summons_Number", StringType(), True),
  StructField("Plate_ID", StringType(), True),
  StructField("Registration_State", StringType(), True),
  StructField("Issue_Date", StringType(), True),
  StructField("Violation_Code", StringType(), True),    
  StructField("Vehicle_Body_Type", StringType(), True),    
  StructField("Vehicle_Make", StringType(), True),    
  StructField("Violation_Precinct", StringType(), True),
  StructField("Issuer_Precinct", StringType(), True),    
  StructField("Violation_Time", StringType(), True),
])

df = spark.read.format('csv').schema(myManualSchema).load('/common_folder/nyc_parking/Parking_Violations_Issued_-_Fiscal_Year_2017.csv' )

In [7]:
df.show(5)

+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|Summons_Number|Plate_ID|Registration_State|Issue_Date|Violation_Code|Vehicle_Body_Type|Vehicle_Make|Violation_Precinct|Issuer_Precinct|Violation_Time|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|Summons Number|Plate ID|Registration State|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|
|    5092469481| GZH7067|                NY|2016-07-10|             7|             SUBN|       TOYOT|                 0|              0|         0143A|
|    5092451658| GZH7067|                NY|2016-07-08|             7|             SUBN|       TOYOT|                 0|              0|         0400P|
|    4006265037| FZX9232|                NY|2016-08-23|             5|             SUBN|

#### Removing the 1st Row as it is redundant and contains columns names

In [8]:
df=df.filter(df['Summons_Number']!='Summons Number')
df.show(5)

+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|Summons_Number|Plate_ID|Registration_State|Issue_Date|Violation_Code|Vehicle_Body_Type|Vehicle_Make|Violation_Precinct|Issuer_Precinct|Violation_Time|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|    5092469481| GZH7067|                NY|2016-07-10|             7|             SUBN|       TOYOT|                 0|              0|         0143A|
|    5092451658| GZH7067|                NY|2016-07-08|             7|             SUBN|       TOYOT|                 0|              0|         0400P|
|    4006265037| FZX9232|                NY|2016-08-23|             5|             SUBN|        FORD|                 0|              0|         0233P|
|    8478629828| 66623ME|                NY|2017-06-14|            47|             REFG|

#### Converting Issue_Date to Issued data in pySpark Date format

In [9]:
from pyspark.sql.types import DateType
df = df.withColumn("Issued_date",df['Issue_Date'].cast(DateType()))

In [10]:
df.printSchema()

root
 |-- Summons_Number: string (nullable = true)
 |-- Plate_ID: string (nullable = true)
 |-- Registration_State: string (nullable = true)
 |-- Issue_Date: string (nullable = true)
 |-- Violation_Code: string (nullable = true)
 |-- Vehicle_Body_Type: string (nullable = true)
 |-- Vehicle_Make: string (nullable = true)
 |-- Violation_Precinct: string (nullable = true)
 |-- Issuer_Precinct: string (nullable = true)
 |-- Violation_Time: string (nullable = true)
 |-- Issued_date: date (nullable = true)



In [11]:
df.select('Issued_date','Issue_Date').show(5)

+-----------+----------+
|Issued_date|Issue_Date|
+-----------+----------+
| 2016-07-10|2016-07-10|
| 2016-07-08|2016-07-08|
| 2016-08-23|2016-08-23|
| 2017-06-14|2017-06-14|
| 2016-11-21|2016-11-21|
+-----------+----------+
only showing top 5 rows



#### Dropping Issue_Date Columns as we don't need it anymore

In [12]:
df = df.drop('Issue_Date')

### Selecting data only from Year 2017 in DataFrame

In [13]:
from pyspark.sql.functions import lit
df = df.filter((df["Issued_date"]>lit("2016-12-31")) & (df["Issued_date"]<lit("2017-12-31")) )
df.show(5)

+--------------+--------+------------------+--------------+-----------------+------------+------------------+---------------+--------------+-----------+
|Summons_Number|Plate_ID|Registration_State|Violation_Code|Vehicle_Body_Type|Vehicle_Make|Violation_Precinct|Issuer_Precinct|Violation_Time|Issued_date|
+--------------+--------+------------------+--------------+-----------------+------------+------------------+---------------+--------------+-----------+
|    8478629828| 66623ME|                NY|            47|             REFG|       MITSU|                14|             14|         1120A| 2017-06-14|
|    5096917368| FZD8593|                NY|             7|             SUBN|       ME/BE|                 0|              0|         0852P| 2017-06-13|
|    1407740258| 2513JMG|                NY|            78|             DELV|       FRUEH|               106|            106|         0015A| 2017-01-11|
|    1413656420|T672371C|                NY|            40|             TAXI|     

### Conveting Custom Violation Time column into correct time format

In [14]:
def time_formatting(time):
    if time[-1]=='P' and time[:2]!="12":
            return str(int(time[:2])+12)+":"+time[2:4]+":"+"00"
    else:
        return time[:2]+":"+time[2:4]+":"+"00"
time_formatting("1220P")    

'12:20:00'

In [15]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DateType, StringType

time_udf = udf(lambda time: time_formatting(time), StringType())

In [16]:
df = df.withColumn("Time", time_udf(df.Violation_Time))

In [17]:
df.select('Violation_Time', 'Time').show(5)

+--------------+--------+
|Violation_Time|    Time|
+--------------+--------+
|         1120A|11:20:00|
|         0852P|20:52:00|
|         0015A|00:15:00|
|         0525A|05:25:00|
|         0256P|14:56:00|
+--------------+--------+
only showing top 5 rows



In [18]:
df.printSchema()

root
 |-- Summons_Number: string (nullable = true)
 |-- Plate_ID: string (nullable = true)
 |-- Registration_State: string (nullable = true)
 |-- Violation_Code: string (nullable = true)
 |-- Vehicle_Body_Type: string (nullable = true)
 |-- Vehicle_Make: string (nullable = true)
 |-- Violation_Precinct: string (nullable = true)
 |-- Issuer_Precinct: string (nullable = true)
 |-- Violation_Time: string (nullable = true)
 |-- Issued_date: date (nullable = true)
 |-- Time: string (nullable = true)



In [19]:
# from pyspark.sql.functions import col, to_date
# df = df.withColumn("Violation_Time", to_date(col("Time"), "HH:mm:ss"))
# df.select("Violation_Time","Issued_date").show(5)

### Voilation Time is in Time Format Now

In [20]:
df.show(5)

+--------------+--------+------------------+--------------+-----------------+------------+------------------+---------------+--------------+-----------+--------+
|Summons_Number|Plate_ID|Registration_State|Violation_Code|Vehicle_Body_Type|Vehicle_Make|Violation_Precinct|Issuer_Precinct|Violation_Time|Issued_date|    Time|
+--------------+--------+------------------+--------------+-----------------+------------+------------------+---------------+--------------+-----------+--------+
|    8478629828| 66623ME|                NY|            47|             REFG|       MITSU|                14|             14|         1120A| 2017-06-14|11:20:00|
|    5096917368| FZD8593|                NY|             7|             SUBN|       ME/BE|                 0|              0|         0852P| 2017-06-13|20:52:00|
|    1407740258| 2513JMG|                NY|            78|             DELV|       FRUEH|               106|            106|         0015A| 2017-01-11|00:15:00|
|    1413656420|T672371C|   

## Creating RDMS temp Table from DataFrame for Executing SQL Queries

In [21]:
df.createOrReplaceTempView("dfTable")
# after registering temp table you can run your sql queries

In [22]:
spark.sql('Select * from dfTable limit 5').show()

+--------------+--------+------------------+--------------+-----------------+------------+------------------+---------------+--------------+-----------+--------+
|Summons_Number|Plate_ID|Registration_State|Violation_Code|Vehicle_Body_Type|Vehicle_Make|Violation_Precinct|Issuer_Precinct|Violation_Time|Issued_date|    Time|
+--------------+--------+------------------+--------------+-----------------+------------+------------------+---------------+--------------+-----------+--------+
|    8478629828| 66623ME|                NY|            47|             REFG|       MITSU|                14|             14|         1120A| 2017-06-14|11:20:00|
|    5096917368| FZD8593|                NY|             7|             SUBN|       ME/BE|                 0|              0|         0852P| 2017-06-13|20:52:00|
|    1407740258| 2513JMG|                NY|            78|             DELV|       FRUEH|               106|            106|         0015A| 2017-01-11|00:15:00|
|    1413656420|T672371C|   

## Examine the data:

### 1). Find the total number of tickets for the year

In [23]:
spark.sql('SELECT count(Summons_Number) as Ticket_Count FROM dfTable').show()

+------------+
|Ticket_Count|
+------------+
|     5431909|
+------------+



There are total `5431909` tickets for year 2017.

### 2.) Find out the number of unique states from where the cars that got parking tickets came.

#### Using SQL query 

In [24]:
spark.sql('SELECT count(Distinct(Registration_State)) as Distinct_State_Count FROM dfTable').show()

+--------------------+
|Distinct_State_Count|
+--------------------+
|                  65|
+--------------------+



#### Using PySpark inbuilt function

In [25]:
from pyspark.sql.functions import countDistinct
df.select(countDistinct("Registration_State").alias('Distinct_State_Count')).show()

+--------------------+
|Distinct_State_Count|
+--------------------+
|                  65|
+--------------------+



In [26]:
spark.sql('SELECT count(*) as State_Count , Registration_State  FROM dfTable group by Registration_State \
          ORDER by State_Count').show()

+-----------+------------------+
|State_Count|Registration_State|
+-----------+------------------+
|          8|                FO|
|          9|                SK|
|         17|                MB|
|         38|                PR|
|         54|                BC|
|         57|                NB|
|         61|                PE|
|         79|                AB|
|        156|                HI|
|        188|                WY|
|        254|                ND|
|        298|                AK|
|        322|                NS|
|        348|                GV|
|        505|                MT|
|        561|                UT|
|        704|                NE|
|        706|                KS|
|        725|                NV|
|        763|                ID|
+-----------+------------------+
only showing top 20 rows



#### Filtering State with Max Count of tickets for Replacement with 99

In [27]:
spark.sql('SELECT count(*) as State_Count , Registration_State  FROM dfTable group by Registration_State \
          ORDER by State_Count Desc limit 1').show()

+-----------+------------------+
|State_Count|Registration_State|
+-----------+------------------+
|    4273944|                NY|
+-----------+------------------+



Replacing the `99` in registration state with the `NY`.

In [28]:
from pyspark.sql.functions import when

targetDf = df.withColumn("Registration_State", \
              when(df["Registration_State"] == '99', 'NY').otherwise(df['Registration_State']))

In [29]:
targetDf.createOrReplaceTempView("dfTable")
spark.sql('SELECT count(*) as State_Count , Registration_State  FROM dfTable group by Registration_State \
          ORDER by State_Count Desc').show()

+-----------+------------------+
|State_Count|Registration_State|
+-----------+------------------+
|    4289999|                NY|
|     475824|                NJ|
|     140285|                PA|
|      70403|                CT|
|      69468|                FL|
|      45525|                IN|
|      38941|                MA|
|      34367|                VA|
|      30213|                MD|
|      27152|                NC|
|      18827|                TX|
|      18666|                IL|
|      17537|                GA|
|      12379|                AZ|
|      12281|                OH|
|      12153|                CA|
|      10806|                ME|
|      10395|                SC|
|      10083|                MN|
|       9088|                OK|
+-----------+------------------+
only showing top 20 rows



#### As we can see 99 is not present in the list anymore, we can evaluate count of Distinct State with this updated Table

In [30]:
targetDf.select(countDistinct("Registration_State").alias('Distinct_State_Count')).show()

+--------------------+
|Distinct_State_Count|
+--------------------+
|                  64|
+--------------------+



***So there are 64 unique state***

## Aggregation tasks

### 1). How often does each violation code occur? Display the frequency of the top five violation codes.


In [31]:
spark.sql('SELECT count(distinct(Violation_Code)) as Distinct_Violation_Code_Count FROM dfTable').show()

+-----------------------------+
|Distinct_Violation_Code_Count|
+-----------------------------+
|                          100|
+-----------------------------+



In [32]:
spark.sql('SELECT  Violation_Code, count(*) as Violation_Code_Count   FROM dfTable group by Violation_Code \
          ORDER by Violation_Code_Count Desc').show(5)

+--------------+--------------------+
|Violation_Code|Violation_Code_Count|
+--------------+--------------------+
|            21|              768085|
|            36|              662765|
|            38|              542079|
|            14|              476664|
|            20|              319644|
+--------------+--------------------+
only showing top 5 rows



#### The top 5 Violation_Code with frequency are following:-<br>
1. `21`  occurance `768085`
2. `36`  occurance `662765`
3. `38`  occurance `542079`
4. `14`  occurance `476664`
5. `20`  occurance `319644`

### 2). How often does each 'vehicle body type' get a parking ticket? How about the 'vehicle make'? (Hint: Find the top 5 for both)

#### Analysis for how often does each 'vehicle body type' get parking ticket

In [33]:
spark.sql('SELECT count(distinct(Vehicle_Body_Type)) as Distinct_Vehicle_Body_Type_Count FROM dfTable').show()

+--------------------------------+
|Distinct_Vehicle_Body_Type_Count|
+--------------------------------+
|                            1165|
+--------------------------------+



In [34]:
spark.sql('SELECT  Vehicle_Body_Type, count(*) as Vehicle_Body_Type_Count   FROM dfTable group by Vehicle_Body_Type \
          ORDER by Vehicle_Body_Type_Count Desc').show(5)

+-----------------+-----------------------+
|Vehicle_Body_Type|Vehicle_Body_Type_Count|
+-----------------+-----------------------+
|             SUBN|                1883953|
|             4DSD|                1547312|
|              VAN|                 724027|
|             DELV|                 358984|
|              SDN|                 194191|
+-----------------+-----------------------+
only showing top 5 rows



#### The top 5 vehicle body type which get parking tickets are followings:-<br>
1. `SUBN` gets parking ticktes `1883369` times
2. `4DSD` gets parking ticktes `1547312` times
3. `VAN`  gets parking ticktes `723783`  times
4. `DELV` gets parking ticktes `358779`  times
5. `SDN`  gets parking ticktes `193453` times

#### Analysis for how often does each 'vehicle Make' get parking ticket

In [35]:
spark.sql('SELECT count(distinct(Vehicle_Make)) as Distinct_Vehicle_Make_Count FROM dfTable').show()

+---------------------------+
|Distinct_Vehicle_Make_Count|
+---------------------------+
|                       3179|
+---------------------------+



In [36]:
spark.sql('SELECT  Vehicle_Make, count(*) as Vehicle_Make_Count   FROM dfTable group by Vehicle_Make \
          ORDER by Vehicle_Make_Count Desc').show(5)

+------------+------------------+
|Vehicle_Make|Vehicle_Make_Count|
+------------+------------------+
|        FORD|            636843|
|       TOYOT|            605288|
|       HONDA|            538884|
|       NISSA|            462017|
|       CHEVR|            356031|
+------------+------------------+
only showing top 5 rows



#### The top 5 vehicle make which get parking tickets are followings:-<br>
1. `FORD`  gets parking ticktes `636843` times
2. `TOYOT` gets parking ticktes `605288` times
3. `HONDA` gets parking ticktes `538884` times
4. `NISSA` gets parking ticktes `462017` times
5. `CHEVR` gets parking ticktes `356031` times

### 3). A precinct is a police station that has a certain zone of the city under its command. Find the (5 highest) frequencies of tickets for each of the following:
- **1). 'Violation Precinct' (This is the precinct of the zone where the violation occurred). <br>  Using this, can you draw any insights for parking violations in any specific areas of the city?<br>**
- **2). 'Issuer Precinct' (This is the precinct that issued the ticket.)**

Here, you would have noticed that the dataframe has the'Violating Precinct' or 'Issuing Precinct' as '0'. These are erroneous entries. Hence, you need to provide the records for five correct precincts. **
(Hint: Print the top six entries after sorting.)

In [37]:
spark.sql('SELECT  Violation_Precinct, count(*) as Violation_Precinct_Count   FROM dfTable  group by Violation_Precinct \
          ORDER by Violation_Precinct_Count Desc').show(6)

+------------------+------------------------+
|Violation_Precinct|Violation_Precinct_Count|
+------------------+------------------------+
|                 0|                  925595|
|                19|                  274444|
|                14|                  203553|
|                 1|                  174702|
|                18|                  169131|
|               114|                  147444|
+------------------+------------------------+
only showing top 6 rows



#### The top 5 Violation Precinct are `19`, `14`, `1`, `18`, `114`.

In [38]:
spark.sql('SELECT Issuer_Precinct, count(*) as Issuer_Precinct_Count   FROM dfTable group by Issuer_Precinct \
          ORDER by Issuer_Precinct_Count Desc').show(6)

+---------------+---------------------+
|Issuer_Precinct|Issuer_Precinct_Count|
+---------------+---------------------+
|              0|              1078404|
|             19|               266961|
|             14|               200495|
|              1|               168740|
|             18|               162994|
|            114|               144054|
+---------------+---------------------+
only showing top 6 rows



#### The top 5 Issuer Precinct are `19`, `14`, `1`, `18`, `114`.

### 4). Find the violation code frequencies for three precincts that have issued the most number of tickets. Do these precinct zones have an exceptionally high frequency of certain violation codes? Are these codes common across precincts? 

In [39]:
spark.sql('SELECT Issuer_Precinct, count(*) as Issuer_Precinct_Count FROM dfTable group by Issuer_Precinct \
          ORDER by Issuer_Precinct_Count Desc limit 4').show()

+---------------+---------------------+
|Issuer_Precinct|Issuer_Precinct_Count|
+---------------+---------------------+
|              0|              1078404|
|             19|               266961|
|             14|               200495|
|              1|               168740|
+---------------+---------------------+



In [40]:
spark.sql('SELECT  Issuer_Precinct,Violation_Code, count(*) as Violation_Code_Count  FROM dfTable \
            where Issuer_Precinct in ( "19", "14", "1") \
            group by Violation_Code,Issuer_Precinct \
            ORDER by Violation_Code_Count Desc').show(5)

+---------------+--------------+--------------------+
|Issuer_Precinct|Violation_Code|Violation_Code_Count|
+---------------+--------------+--------------------+
|             19|            46|               48445|
|             14|            14|               45036|
|              1|            14|               38354|
|             19|            38|               36386|
|             19|            37|               36056|
+---------------+--------------+--------------------+
only showing top 5 rows



#### Precinct 19

In [41]:
spark.sql('SELECT  Issuer_Precinct,Violation_Code, count(*) as Violation_Code_Count  FROM dfTable \
            where Issuer_Precinct = "19" \
            group by Violation_Code,Issuer_Precinct \
            ORDER by Violation_Code_Count Desc').show(5)

+---------------+--------------+--------------------+
|Issuer_Precinct|Violation_Code|Violation_Code_Count|
+---------------+--------------+--------------------+
|             19|            46|               48445|
|             19|            38|               36386|
|             19|            37|               36056|
|             19|            14|               29797|
|             19|            21|               28415|
+---------------+--------------+--------------------+
only showing top 5 rows



 In `Precinct 19`, top 5 Violation_Code are `46`, `38`, `37`, `14`, `21`

#### Precinct 14

In [42]:
spark.sql('SELECT  Issuer_Precinct,Violation_Code, count(*) as Violation_Code_Count  FROM dfTable \
            where Issuer_Precinct = "14" \
            group by Violation_Code,Issuer_Precinct \
            ORDER by Violation_Code_Count Desc').show(5)

+---------------+--------------+--------------------+
|Issuer_Precinct|Violation_Code|Violation_Code_Count|
+---------------+--------------+--------------------+
|             14|            14|               45036|
|             14|            69|               30464|
|             14|            31|               22555|
|             14|            47|               18364|
|             14|            42|               10027|
+---------------+--------------+--------------------+
only showing top 5 rows



In `Precinct 14` top 5 Violation_Code are `14`,`69`,`31`,`47`, `42`.

#### Precinct 1

In [43]:
spark.sql('SELECT  Issuer_Precinct,Violation_Code, count(*) as Violation_Code_Count  FROM dfTable \
            where Issuer_Precinct = "1" \
            group by Violation_Code,Issuer_Precinct \
            ORDER by Violation_Code_Count Desc').show(5)

+---------------+--------------+--------------------+
|Issuer_Precinct|Violation_Code|Violation_Code_Count|
+---------------+--------------+--------------------+
|              1|            14|               38354|
|              1|            16|               19081|
|              1|            20|               15408|
|              1|            46|               12745|
|              1|            38|                8535|
+---------------+--------------+--------------------+
only showing top 5 rows



In `Precinct 1` top 5 Violation_Code are `14`,`16`,`20`,`46`, `38`.

## 5. Find out the properties of parking violations across different times of the day:
- Find a way to deal with missing values, if any.
(Hint: Check for the null values using 'isNull' under the SQL. Also, to remove the null values, check the 'dropna' command in the API documentation.)

- The Violation Time field is specified in a strange format. Find a way to make this a time attribute that you can use to divide into groups.

- Divide 24 hours into six equal discrete bins of time. Choose the intervals as you see fit. For each of these groups, find the three most commonly occurring violations.
(Hint: Use the CASE-WHEN in SQL view to segregate into bins. To find the most commonly occurring violations, you can use an approach similar to the one mentioned in the hint for question 4.)

- Now, try another direction. For the three most commonly occurring violation codes, find the most common time of the day (in terms of the bins from the previous part).



In [44]:
df = df.na.drop()

In [45]:
df.select('Time').show()

+--------+
|    Time|
+--------+
|11:20:00|
|20:52:00|
|00:15:00|
|05:25:00|
|14:56:00|
|12:32:00|
|10:34:00|
|10:21:00|
|07:21:00|
|09:40:00|
|12:23:00|
|10:28:00|
|01:48:00|
|12:06:00|
|13:41:00|
|08:22:00|
|08:20:00|
|10:43:00|
|14:04:00|
|08:53:00|
+--------+
only showing top 20 rows



In [46]:
def time_bins(time):
    if time[:2] in ["00", "01", "02" , "03"]:
        return 'Slot_1'
    elif time[:2] in ["04", "05", "06" , "07"]:
        return 'Slot_2'
    elif time[:2] in ["08", "09", "10" , "11"]:
        return 'Slot_3'
    elif time[:2] in ["12", "13", "14" , "15"]:
        return 'Slot_4'
    elif time[:2] in ["16", "17", "18" , "19"]:
        return 'Slot_5'
    elif time[:2] in ["20", "21", "22" , "23"]:
        return 'Slot_6'
    else:
        return 'None'


In [47]:
timebin_udf = udf(lambda time : time_bins(time), StringType())

In [48]:
df = df.withColumn("Time_Bin" , timebin_udf(df.Time))

In [49]:
df.select('Time','Time_Bin').show(5)

+--------+--------+
|    Time|Time_Bin|
+--------+--------+
|11:20:00|  Slot_3|
|20:52:00|  Slot_6|
|00:15:00|  Slot_1|
|05:25:00|  Slot_2|
|14:56:00|  Slot_4|
+--------+--------+
only showing top 5 rows



In [50]:
df = df.filter(df['Time_Bin'] != 'None')

In [51]:
df.createOrReplaceTempView('binTab')

### Querying top 3 Violation Codes for each Time_Slot

#### Slot_1

In [52]:
spark.sql('SELECT  Violation_Code,  count(Violation_Code) as Count from binTab where Time_Bin="Slot_1" \
            group by Violation_Code order by Count desc limit 3' ).show()

+--------------+-----+
|Violation_Code|Count|
+--------------+-----+
|            21|34703|
|            40|23628|
|            14|14168|
+--------------+-----+



For slot_1, the top 3 Violation_Code are `21`, `40`, `14`.

#### Slot_2

In [53]:
spark.sql('SELECT  Violation_Code,  count(Violation_Code) as Count from binTab where Time_Bin="Slot_2" \
            group by Violation_Code order by Count desc limit 3' ).show()

+--------------+-----+
|Violation_Code|Count|
+--------------+-----+
|            14|74114|
|            40|60652|
|            21|57897|
+--------------+-----+



For slot_2, the top 3 Violation_Code are `14`, `40`, `21`.

#### Slot_3

In [54]:
spark.sql('SELECT  Violation_Code,  count(Violation_Code) as Count from binTab where Time_Bin="Slot_3" \
            group by Violation_Code order by Count desc limit 3' ).show()

+--------------+------+
|Violation_Code| Count|
+--------------+------+
|            21|598068|
|            36|348165|
|            38|176570|
+--------------+------+



For slot_3, the top 3 Violation_Code are `21`, `36`, `38`.

#### Slot_4

In [55]:
spark.sql('SELECT  Violation_Code,  count(Violation_Code) as Count from binTab where Time_Bin="Slot_4" \
            group by Violation_Code order by Count desc limit 3' ).show()

+--------------+------+
|Violation_Code| Count|
+--------------+------+
|            36|286284|
|            38|240795|
|            37|167044|
+--------------+------+



For slot_4, the top 3 Violation_Code are `36`, `38`, `37`.

#### Slot 5

In [56]:
spark.sql('SELECT  Violation_Code,  count(Violation_Code) as Count from binTab where Time_Bin="Slot_5" \
            group by Violation_Code order by Count desc limit 3' ).show()

+--------------+------+
|Violation_Code| Count|
+--------------+------+
|            38|102855|
|            14| 75902|
|            37| 70345|
+--------------+------+



For slot_5, the top 3 Violation_Code are `38`, `14`, `37`.

#### Slot_6

In [57]:
spark.sql('SELECT  Violation_Code,  count(Violation_Code) as Count from binTab where Time_Bin="Slot_6" \
            group by Violation_Code order by Count desc limit 3' ).show()

+--------------+-----+
|Violation_Code|Count|
+--------------+-----+
|             7|26293|
|            40|22337|
|            14|21045|
+--------------+-----+



For slot_6, the top 3 Violation_Code are `7`, `40`, `14`.

### Part 2:


In [59]:
spark.sql('SELECT  Violation_Code,  count(Violation_Code) as Count from binTab  \
            group by Violation_Code order by Count desc limit 3' ).show()

+--------------+------+
|Violation_Code| Count|
+--------------+------+
|            21|768060|
|            36|662765|
|            38|542078|
+--------------+------+



### So Most Frequent Codes are 21,36, 38, Let's Find out the most frequent time_slot for each of these codes

#### Violation Code 21

In [60]:
spark.sql('SELECT Time_Bin , count(Violation_Code) as Count from binTab \
            where Violation_Code = 21 group by Time_Bin Order by Count Desc Limit 1').show()

+--------+------+
|Time_Bin| Count|
+--------+------+
|  Slot_3|598068|
+--------+------+



the `Violation Code 21` is most frequent in `slot_3`.

#### Voilation Code 36

In [61]:
spark.sql('SELECT Time_Bin , count(Violation_Code) as Count from binTab \
            where Violation_Code = 36 group by Time_Bin Order by Count Desc Limit 1').show()

+--------+------+
|Time_Bin| Count|
+--------+------+
|  Slot_3|348165|
+--------+------+



the `Violation Code 36` is most frequent in `slot_3`.

#### Voilation Code 38

In [62]:
spark.sql('SELECT Time_Bin , count(Violation_Code) as Count from binTab \
            where Violation_Code = 38 group by Time_Bin Order by Count Desc Limit 1').show()

+--------+------+
|Time_Bin| Count|
+--------+------+
|  Slot_4|240795|
+--------+------+



the `Violation Code 38` is most frequent in `slot_4`.

## Question 6: Let’s try and find some seasonality in this data:

First, divide the year into a certain number of seasons, and find the frequencies of tickets for each season. (Hint: Use Issue Date to segregate into seasons.)

Then, find the three most common violations for each of these seasons.
(Hint: You can use an approach similar to the one mentioned in the hint for question 4.)

In [63]:
df.select('Issued_Date').show(5)

+-----------+
|Issued_Date|
+-----------+
| 2017-06-14|
| 2017-06-13|
| 2017-01-11|
| 2017-02-04|
| 2017-01-26|
+-----------+
only showing top 5 rows



### Function to divide data into American Seasons based on month of issued_date

In [64]:
def season_slot(year):
    year = str(year)
    if year[5:7] in ["03" , "04", "05"]:
        return "Spring"
    elif year[5:7] in ["06" , "07", "08"]:
        return "Summer"
    elif year[5:7] in ["09" , "10", "11"]:
        return "Fall"
    elif year[5:7] in ["12" , "01", "02"]:
        return "Winter"
    else:
        return "None"

In [65]:
season_udf = udf( lambda Issued_Date : season_slot((Issued_Date)),  StringType())

In [66]:
df = df.withColumn('Season', season_udf((df['Issued_Date'])))

df.select('Issued_Date', 'Season').show(5)

+-----------+------+
|Issued_Date|Season|
+-----------+------+
| 2017-06-14|Summer|
| 2017-06-13|Summer|
| 2017-01-11|Winter|
| 2017-02-04|Winter|
| 2017-01-26|Winter|
+-----------+------+
only showing top 5 rows



In [67]:
df.createOrReplaceTempView("SeasonTable")

#### Frequencies of tickets for each season

In [68]:
spark.sql('SELECT Season ,count(*) as Count from SeasonTable group BY season order by count(*) DESC' ).show()

+------+-------+
|Season|  Count|
+------+-------+
|Spring|2873339|
|Winter|1704663|
|Summer| 852855|
|  Fall|    979|
+------+-------+



##### Yes, we can clearly see seasonality as most number of Violations are in  `Spring` followed by  `Winter ` ,` Summer` and `Fall`  respectively.

### The three most common violations for each of these seasons

#### Summer

In [69]:
spark.sql('SELECT Season, Violation_Code ,count(*) as Count from SeasonTable  WHERE season = "Summer" \
group BY Violation_Code, Season ORDER by Count DESC LIMIT 3' ).show()

+------+--------------+------+
|Season|Violation_Code| Count|
+------+--------------+------+
|Summer|            21|127347|
|Summer|            36| 96663|
|Summer|            38| 83518|
+------+--------------+------+



For `Summer` top 3 Violation_Code are `21`, `36`, `38`.

#### Spring

In [70]:
spark.sql('SELECT Season, Violation_Code ,count(*) as Count from SeasonTable  WHERE season = "Spring" \
group BY Violation_Code, Season ORDER by Count DESC LIMIT 3' ).show()

+------+--------------+------+
|Season|Violation_Code| Count|
+------+--------------+------+
|Spring|            21|402408|
|Spring|            36|344834|
|Spring|            38|271167|
+------+--------------+------+



For `Spring` top 3 Violation_Code are `21`, `36`, `38`.

#### Winter

In [72]:
spark.sql('SELECT Season, Violation_Code ,count(*) as count from SeasonTable  WHERE season = "Winter" \
group BY Violation_Code, Season ORDER by count DESC LIMIT 3' ).show()

+------+--------------+------+
|Season|Violation_Code| count|
+------+--------------+------+
|Winter|            21|238177|
|Winter|            36|221268|
|Winter|            38|187385|
+------+--------------+------+



For `Winter` top 3 Violation_Code are `21`, `36`, `38`.

## Question : 7

In [73]:
spark.sql('SELECT  Violation_Code, count(*) as Violation_Code_Count   FROM dfTable group by Violation_Code \
          ORDER by Violation_Code_Count Desc limit 3').show()

+--------------+--------------------+
|Violation_Code|Violation_Code_Count|
+--------------+--------------------+
|            21|              768085|
|            36|              662765|
|            38|              542079|
+--------------+--------------------+



### Fines:
* For code 21  fine = 65+45/2 = 55 dollars
* For code 36 fine = 50+50/2 = 50 dollars
* For code 38 fine = (65+35)/2 = 50 dollars 


### Calulating revenue generated by each fine

In [74]:
spark.sql('SELECT  Violation_Code, count(*) as Violation_Code_Count , \
          CASE \
              WHEN Violation_Code="21" THEN count(*)*55 \
              WHEN Violation_Code="36" THEN count(*)*50 \
              WHen Violation_Code="38" THEN count(*)*50 \
          END as Revenue \
          FROM dfTable group by Violation_Code \
          ORDER by Violation_Code_Count Desc limit 3').show()

+--------------+--------------------+--------+
|Violation_Code|Violation_Code_Count| Revenue|
+--------------+--------------------+--------+
|            21|              768085|42244675|
|            36|              662765|33138250|
|            38|              542079|27103950|
+--------------+--------------------+--------+



### Selecting violation code which results in Maximum revenue for Traffic Department

In [75]:
spark.sql('SELECT  Violation_Code, count(*) as Violation_Code_Count , \
          CASE \
              WHEN Violation_Code="21" THEN count(*)*55 \
              WHEN Violation_Code="36" THEN count(*)*50 \
              WHEN Violation_Code="38" THEN count(*)*50 \
          END as Revenue \
          FROM dfTable where Violation_Code in ("21" , "36" , "38") \
          group by Violation_Code \
          ORDER by Revenue Desc limit 1').show()

+--------------+--------------------+--------+
|Violation_Code|Violation_Code_Count| Revenue|
+--------------+--------------------+--------+
|            21|              768085|42244675|
+--------------+--------------------+--------+



### Note: Ordered by Revenue

### Stopping Spark Session

In [76]:
spark.stop()