In [1]:
# The code was removed by Watson Studio for sharing.

Waiting for a Spark session to start...
Spark Initialization Done! ApplicationId = app-20200418222308-0000
KERNEL_ID = e68ac198-67e6-4ebb-b9f9-7425d355a5e0


## NYC_Parking_Violations_IBM_Capstone
### Notebook 2
### Extract Transform Load (ETL)

In [2]:
import numpy as np
import pandas as pd
from pyspark.sql.functions import (dayofmonth, dayofyear, month, year, 
                                   weekofyear, datediff, to_date, col)

In [3]:
# The code was removed by Watson Studio for sharing.

[Row(Plate='126798R', State='NJ', License Type='PAS', Summons Number='8604887570', Issue Date='05/17/2018', Violation Time='04:12P', Violation='FIRE HYDRANT', Judgment Entry Date=None, Fine Amount='115.00', Penalty Amount='10.00', Interest Amount='0.00', Reduction Amount='0.00', Payment Amount='0.00', Amount Due='125.00', Precinct='115', County='Q', Issuing Agency='TRAFFIC', Violation Status=None, Summons Image='View Summons (http://nycserv.nyc.gov/NYCServWeb/ShowImage?searchID=VDBSWmQwNUVaelJPZWxVelRVRTlQUT09&locationName=_____________________)'),
 Row(Plate='HJZ2125', State='PA', License Type='PAS', Summons Number='8604887738', Issue Date='05/22/2018', Violation Time='02:44P', Violation='NO PARKING-DAY/TIME LIMITS', Judgment Entry Date=None, Fine Amount='60.00', Penalty Amount='10.00', Interest Amount='0.00', Reduction Amount='0.00', Payment Amount='0.00', Amount Due='70.00', Precinct='109', County='Q', Issuing Agency='TRAFFIC', Violation Status=None, Summons Image='View Summons (htt

In [4]:
df = df_data_1

In [5]:
df.columns

['Plate',
 'State',
 'License Type',
 'Summons Number',
 'Issue Date',
 'Violation Time',
 'Violation',
 'Judgment Entry Date',
 'Fine Amount',
 'Penalty Amount',
 'Interest Amount',
 'Reduction Amount',
 'Payment Amount',
 'Amount Due',
 'Precinct',
 'County',
 'Issuing Agency',
 'Violation Status',
 'Summons Image']

In [6]:
# Dropping id columns, columns with many missing values, etc
df_1 = df.drop('Plate',
 'Summons Number',
 'Violation Time',
 'Judgment Entry Date',
 'Interest Amount',
 'Violation Status',
 'Summons Image').na.drop()

In [7]:
# Renaming a few columns to avoid issue with filtering
df_2 = df_1.withColumnRenamed('License Type','License_Type').\
                withColumnRenamed('Issuing Agency','Issuing_Agency')

In [8]:
df_2.columns

['State',
 'License_Type',
 'Issue Date',
 'Violation',
 'Fine Amount',
 'Penalty Amount',
 'Reduction Amount',
 'Payment Amount',
 'Amount Due',
 'Precinct',
 'County',
 'Issuing_Agency']

In [9]:
# top 12 states
# top 4  violation categories
# top 10 counties
# top 10 License types
# top 5 Issusing agencies

df_3 = df_2.filter('State in ("NY", "NJ", "PA", "FL", "CT", "IN", "MA", "VA", "NC", "MD","TX","GA")')\
           .filter('Violation in ("NO PARKING-STREET CLEANING", \
                   "PHTO SCHOOL ZN SPEED VIOLATION", \
                   "FAIL TO DSPLY MUNI METER RECPT", \
                   "NO STANDING-DAY/TIME LIMITS")') \
          .filter('County in ("NY", "K", "Q", "BX", "BK", "QN", "ST", "R", "MN", "QUEEN") ')\
          .filter('License_Type in ("PAS", "COM", "OMT", "OMS", "SRF", "999","APP","IRP","MOT","TRC") ')\
          .filter('Issuing_Agency in ("TRAFFIC", "DEPARTMENT OF TRANSPORTATION", \
                                       "DEPARTMENT OF SANITATION", "POLICE DEPARTMENT",\
                                       "OTHER/UNKNOWN AGENCIES")')

In [10]:
df_4 = df_3.sample(True,0.005)

## Feature Engineering / Data Cleansing

## Transforming date entries

In [11]:
# Convert string dates to date_time format
df_5 = df_4.withColumn("Issue Date",to_date("Issue Date","MM/dd/yyyy"))

In [12]:
# Add feature columns: Year, Month, and Day
df_6 = df_5.withColumn('Year',year(df_5['Issue Date']))\
                    .withColumn('Month',month(df_5['Issue Date']))\
                         .withColumn('Day',dayofmonth(df_5['Issue Date']))

In [13]:
df_6.select('Issue Date','Year','Month','Day').show(5)

+----------+----+-----+---+
|Issue Date|Year|Month|Day|
+----------+----+-----+---+
|2018-02-09|2018|    2|  9|
|2018-06-01|2018|    6|  1|
|2019-10-19|2019|   10| 19|
|2018-07-06|2018|    7|  6|
|2019-07-11|2019|    7| 11|
+----------+----+-----+---+
only showing top 5 rows



In [14]:
# Remove non-sensible years, months, and days
# Drop Issue Date since Year, Monty, Day columns have been added already
df_7 = df_6[(df_6['Year']<=2020) & (df_6['Month']<= 12) & (df_6['Day']<= 31)].drop('Issue Date')

In [15]:
df_7.columns

['State',
 'License_Type',
 'Violation',
 'Fine Amount',
 'Penalty Amount',
 'Reduction Amount',
 'Payment Amount',
 'Amount Due',
 'Precinct',
 'County',
 'Issuing_Agency',
 'Year',
 'Month',
 'Day']

## Indexing Strings

In [16]:
# indexing string columns
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(df_7) for column in ['State',
 'License_Type',
 'Violation',
 'County',
 'Issuing_Agency'] ]

pipeline = Pipeline(stages=indexers)

# fit, transform
# dropping non-indexed string columns
df_8 = pipeline.fit(df_7).transform(df_7).drop('State',
 'License_Type',
 'Violation',
 'County',
 'Issuing_Agency')

In [17]:
df_pd = df_8.toPandas()

In [18]:
# Check if there are any null values left
df_pd.isnull().any()

Fine Amount             False
Penalty Amount          False
Reduction Amount        False
Payment Amount          False
Amount Due              False
Precinct                False
Year                    False
Month                   False
Day                     False
State_index             False
License_Type_index      False
Violation_index         False
County_index            False
Issuing_Agency_index    False
dtype: bool

In [19]:
df_pd.head()

Unnamed: 0,Fine Amount,Penalty Amount,Reduction Amount,Payment Amount,Amount Due,Precinct,Year,Month,Day,State_index,License_Type_index,Violation_index,County_index,Issuing_Agency_index
0,45.0,60.0,0.0,0.0,120.35,68,2018,2,9,0.0,1.0,0.0,0.0,0.0
1,45.0,10.0,0.0,0.0,55.0,48,2018,6,1,10.0,0.0,0.0,3.0,0.0
2,115.0,30.0,0.0,0.0,145.0,49,2019,10,19,2.0,0.0,3.0,3.0,0.0
3,45.0,10.0,0.0,0.0,55.0,48,2018,7,6,10.0,0.0,0.0,3.0,0.0
4,115.0,0.0,115.0,0.0,0.0,10,2019,7,11,9.0,0.0,3.0,1.0,0.0


### Save data for modeling

In [21]:
#project.save_data("sampled_data.csv", df_pd.to_csv())