## NYC Parking Ticket Prediction Using Apache Spark ML

Abstract: _Poor traffic and parking management in large metropolitan cities negatively impacts our living conditions. Additionally, it can be a large source of revenue for these cities, amounting to half a billion dollars yearly in New York City. In this work, we analyze parking ticket data for the period 2015-2020 in NYC, aggregating it with weather, time, holiday and location features with the goal of predicting the number of tickets given hourly at a particular location. Due to the large volume of the original data and the existence of several data sources with different formats and frequencies, we constructed our pipeline in a distributed way using PySpark.
    Our goal is not only to predict the number of parking tickets, but also to gain insight into the data and models by extracting feature importance, which can be valuable in the context of urban planning and traffic management._

#### Importing necessary libraries

In [1]:
## Spark session libraries
import findspark
findspark.init()
import pyspark
findspark.find()

from pyspark import SparkContext
from pyspark import SQLContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import monotonically_increasing_id

## Required for parsing the file as csv
import csv
from io import StringIO
from itertools import islice, repeat

## For preprocessing
from re import search, split, sub, compile as comp
import numpy as np
from statistics import median

## For Plots & other
import matplotlib.pyplot as plt
from collections import defaultdict
from matplotlib import cm, colors
import pandas as pd
from numpy import allclose
import warnings
import requests, json, time, random
from collections import Counter

## for models
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import RandomForestRegressionModel
from pyspark.ml.feature import StringIndexer
from pyspark.sql import types
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql.types import Row
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.mllib.tree import RandomForest
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.clustering import KMeans
from pyspark.ml.clustering import KMeansModel

The following cell imports `folium`, which is a simple and interactive Python library which makes working with geographical data easy. To run the cell, first do: `conda install -c conda-forge folium`. Note: this is not necessary for model training, only visualization purposes.

In [None]:
# DO NOT RULL THIS CELL
import folium 
from folium.plugins import MarkerCluster

#### Initializing Spark context and session

Note: [*\] runs with the maximum available number of threads --- we used 12 logical threads on a 6 core PC.

In [2]:
sc = SparkContext(appName="SDDM", master='local[*,4]')
sc.setLocalProperty("spark.scheduler.pool", "pool1")
print(sc.pythonVer)
print (sc.master)
sc

3.8
local[*,4]


Internally threads on PVM and JVM are not synced, and JVM thread can be reused for multiple threads on PVM, which fails to isolate local properties for each thread on PVM. 
To work around this, you can set PYSPARK_PIN_THREAD to true (see SPARK-22340). However, note that it cannot inherit the local properties from the parent thread although it isolates each thread on PVM and JVM with its own local properties. 
To work around this, you should manually copy and set the local properties from the parent thread to the child thread when you create another thread.


In [3]:
ss = SparkSession.builder \
    .master('local[*]') \
    .config("spark.driver.memory", "64g") \
    .appName('SDDM2') \
    .config("spark.driver.maxResultSize", "32g")\
    .config("spark.executor.memory", "64g")\
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")\
    .getOrCreate()

#### Defining necessary functions

In [4]:
def parseCSV(csvRow) :
    '''Parses a row into a list of elements'''
    data = StringIO(csvRow)
    dataReader = csv.reader(data, lineterminator = '')
    return(next(dataReader))

def readFileAsCSV(session, filepath):
    '''Reads a files as text file and then parses each row and returns a list of list: 
        [[Row]
         [Row]
         [Row]]
     '''
    try:
        data = session.textFile(name = str(filepath))
        data = data.map(parseCSV)
    except:
        print('Failed to read the file!')
        data = []
    return data

#### Ticket data pre-processing

In [5]:
def tickTime(x, ind):
    '''Extracts the month and year from the issue date'''
    try:
        m = int(x[ind][0:2])
        y = int(x[ind][6:10])
    except:
        m = '0'
        y = '0'    
    x.append(str(m))
    x.append(str(y))
    return x

def street_preprocess(x, ind):
    '''Preprocess the street names'''
    s = x[ind]
    s = s.replace('AVENUE','AVE').replace('STREET','ST').replace('BLVD','BL')
    s = s.replace('\sEAST\s',' E ').replace('\sWEST\s',' W ').replace('\sNORTH\s',' N ').replace('\sSOUTH\s',' S ')
    s = s.replace('\sROAD\s',' RD ').replace('\sEXPY\s','EXWY').replace('\sPARKWY\s','PKWY').replace('\sISLAND\s','ISL')
    s = s.replace('\sFIRST\s','1').replace('\sSECOND\s','2').replace('\sTHRID\s','3')
    s = s.replace('\sFOURTH\s','4').replace('\sFIVETH\s','5').replace('\sSIXTH\s','6')
    s = s.replace('\sSEVENTH\s','7').replace('\sEIGHTH\s','8').replace('\sNINETH\s','9').replace('\sTENTH\s','10')
    s = s.split()
    result = [x if not search(r'\d', x) else sub('[^0-9]','', x) for x in s]
    result = ' '.join(result)
    x[ind] = result.lower()
    return x

def rState(x, ind):
    if x[ind] == 'NY':
        x.append('0')
    else:
        x.append('1')
    return x

def violationType(x, ind):
    mydict = {"Misc":[35,41,90,91,94],
                        "No Parking":[20,21,23,24,27],
                        "No Standing":[3,4,5,6,8,10,11,12,13,14,15,16,17,18,19,22,25,26,30,31,40,44,54,57,58,63,64,77,78,81,89,92],
                        "Permit/Doc Issue":[1,2,29,70,71,72,73,76,80,83,87,88,93,97],
                        "Plate Issues":[74,75,82],
                        "Obstructing Path":[7,9,36,45,46,47,48,49,50,51,52,53,55,56,59,60,61,62,66,67,68,79,84,96,98],
                        "Overtime":[28,32,33,34,37,38,39,42,43,65,69,85,86]
                        }
    label = ''
    try:
        for key, value in mydict.items():
             for y in value:
                    if y == int(x[ind]):
                        label = key
    except: 
        label = ''
    newLabs = {0:'',
               1:"Misc",
               2:"No Parking",
               3:"No Standing",
               4:"Permit/Doc Issue",
               5:"Plate Issues",
               6:"Obstructing Path",
               7:"Overtime"}
    x.append(str(list(newLabs.keys())[list(newLabs.values()).index(label)]))
    x[ind] = label
    return x
    
def sH(x, ind):
    ''' Extracts the street number and house number from House number column
        Some house numbers are: 123-34, 45-56 and some are 34, 45 etc
    '''
    house_num = x[ind]
    try:
        if house_num == '':
            s = '0'
            h = '0'
        else:
            cond = '-' in house_num
            if cond:
                s, h = house_num.split('-')
            else:
                s = int(house_num)
                h = '0'
    except:
        s = '0'
        h = '0'
    x.append(str(s))
    x.append(str(h))
    return x
    
def preprocessedCSV(session, filepath):
    '''
    Reads the csv files, and then converts Issue date to date and month
    '''
    data = readFileAsCSV(session, filepath)
    header = data.take(1)[0]
    data = data.map(lambda x: [x[0], x[2], x[3], x[4], x[5], x[6], x[7], x[19], x[23], x[24]])
    header = data.take(1)[0]
    #print(header)
    ## Extracting the month and year
    data = data.map(lambda x: tickTime(x, header.index('Issue Date')))
    header.append('Issue Month')
    header.append('Issue Year')
    
    ## Removing the header line
    data = data.mapPartitionsWithIndex(lambda idx, it: islice(it, 1, None) if idx == 0 else it)
    ## Preprocessing the street names
    data = data.map(lambda x: street_preprocess(x, header.index('Street Name')))    
    ## Extracts the street and house number
    data = data.map(lambda x: sH(x, header.index('House Number')))
    header.append('Street')
    header.append('House')
    data = data.map(lambda x: rState(x, header.index('Registration State')))
    header.append('RState')
    data = data.map(lambda x: violationType(x, header.index('Violation Code')))
    header.append('VType')
    return data, header

#### Coordinate file processing

In [6]:
def streetHouse(x):
    '''
        Extracts the street and house number ranges for a street name and coordinate
    '''
    house_num = x[0]
    try:
        if house_num == '':
            l_s_min = '0'
            l_s_max = '0'
            r_s_min = '0'
            r_s_max = '0'

            h_l_min = '0'
            h_l_max = '0'
            h_r_min = '0'
            h_r_max = '0'
        else:
            cond = '-' in house_num
            if cond:
                l_s_min, h_l_min = x[0].split('-')
                l_s_max, h_l_max = x[1].split('-')
                r_s_min, h_r_min = x[4].split('-')
                r_s_max, h_r_max = x[5].split('-')
            else:
                l_s_min = int(x[0])
                l_s_max = int(x[1])
                r_s_min = int(x[4])
                r_s_max = int(x[5])
                h_l_min = '0'
                h_l_max = '0'
                h_r_min = '0'
                h_r_max = '0'

    except:
        l_s_min = '0'
        l_s_max = '0'
        r_s_min = '0'
        r_s_max = '0'
        h_l_min = '0'
        h_l_max = '0'
        h_r_min = '0'
        h_r_max = '0'
        
    x.extend([l_s_min, l_s_max, r_s_min, r_s_max, h_l_min, h_l_max, h_r_min, h_r_max])
    return x

def geoms(x, ind):
    '''Extracting one single latitute and longitude values from the geometry '''
    coords = x[ind]
    try: 
        coords = coords.replace('MULTILINESTRING ', '').replace('(','').replace(')', '').split(', ')
        coords = [i.split(' ') for i in coords]
        coords = [[float(j), float(k)] for j,k in coords]
        lon = str(median([j for j,k in coords ]))
        lat = str(median([k for j,k in coords ]))
    except:
        lon = 'NA'
        lat = 'NA'
    x.append(lon)
    x.append(lat)
    return x
    

def createCoordsFiles(session, filepath):
    '''Reading the centerline data set and returns a preprocessed RDD'''
    coords = readFileAsCSV(session, filepath) 
    ## Removing the first line
    coords = coords.map(lambda x: [x[0],x[1],x[28],x[3],x[4],x[5]])
    ## Getting the header
    coords_header = coords.take(1)[0]
    coords = coords.mapPartitionsWithIndex(lambda idx, it: islice(it, 1, None) if idx == 0 else it)
    ## Preprocessing the street names
    coords = coords.map(lambda x: street_preprocess(x, coords_header.index('FULL_STREE'))) 
    ## Finding the street numbers and house number limits
    coords = coords.map(streetHouse)
    coords = coords.map(lambda x: geoms(x, coords_header.index('the_geom')))
    ## Extracting the required columns
    #coords = coords.map(lambda x: [x[2],x[6],x[7],x[8],x[9], x[10], x[11]])
    coords_header.extend(['L_S_min', 'L_S_max', 'R_S_min', 'R_S_max', 'L_H_min', 'L_H_max', 'R_H_min', 'R_H_max', 'lon', 'lat'] )
    return coords, coords_header

#### Match the ticket to their respectve coordinates

In [7]:
def matchstreet(t, c):
    '''
    Based on the street and house number in tickets dataframe finds the coordintae value in centerline dataframe
    and merges the coordinate value to it.
    '''
    ## Making the columns integers for comparision
    t =  t.withColumn("Issue_Month", t["Issue_Month"].cast('integer'))
    t =  t.withColumn("Issue_Year", t["Issue_Year"].cast('integer'))
    t =  t.withColumn("Street", t["Street"].cast('integer'))
    t =  t.withColumn("House", t["House"].cast('integer'))
    t =  t.withColumn("RState", t["RState"].cast('integer'))
    t =  t.withColumn("VType", t["VType"].cast('integer'))
    t =  t.withColumn("Ids", monotonically_increasing_id())
    t =  t.withColumn("Ids", t["Ids"].cast('string'))
    
    c = c.select('FULL_STREE', c.L_S_min.cast('integer'),c.L_S_max.cast('integer'),\
                 c.R_S_min.cast('integer'),c.R_S_max.cast('integer'),\
                 c.L_H_min.cast('integer'),c.L_H_max.cast('integer'),\
                 c.R_H_min.cast('integer'),c.R_H_max.cast('integer'),\
                 c.lon.cast('float'),c.lat.cast('float'))
    ## performs inner join on the tickets. 
    merged = t.join(c, [t.Street_Name == c.FULL_STREE,\
                        (t.Street>=c.L_S_min)  | (t.Street>=c.R_S_min),\
                        (t.Street<=c.L_S_max)  | (t.Street<=c.R_S_max),\
                        (t.House >=c.L_H_min)  | (t.House >=c.R_H_min),\
                        (t.House <= c.L_H_max) | (t.House <= c.R_H_max)],'inner').select('Ids', 'Summons_Number', 'Registration_State', 'Plate_Type', 'Issue_Date', 'Violation_Code', 'Vehicle_Body_Type', 'Vehicle_Make', 'House_Number', 'Street_Name', 'Issue_Month', 'Issue_Year', 'Violation_Time', 'Street', 'House', 'RState', 'VType', 'lon', 'lat')
    
    return merged.dropDuplicates(subset=['Ids'])

#### Data collection and EDA

In [8]:
def getData(sc, filepath, filename, year):
    '''Reads each file in a loop and returns a list of RDDs'''
    tickets = []
    for yr in year:
        filelocation = str(filepath)+str(filename)+str(yr)+".csv"
        print(filelocation)
        parking_data, header = preprocessedCSV(sc, filelocation)
        tickets.append(parking_data)
    return tickets, header

In [9]:
def group_data(data, Val, Key):
    pairs = data.map(lambda x: (x[Key], x[Val]))
    return pairs.groupByKey().collect()

def group_data_toList(y, groupby):
    '''
    Input:
        - y: Grouped pyspark data returned from group_data function
        - groupby: Column name to remove an extra element
    '''
    lab = list(map(lambda x:x[0], y))
    val = list(map(lambda x:len(x[1]), y))
    try:
        kick = lab.index(groupby)
        lab.pop(kick)
        val.pop(kick)
    except:
        0
    if groupby=='Issue Month' or groupby == 'Violation Code':
        lab = list(map(lambda x: int(x), lab))
    return [lab,val]

In [10]:
def plot(ax, header, data, year, groupby, pt, plot_data):
    '''
    Input:
        - data: PySpark parsed CSV
        - groupby: Column name for grouping the data 
        - pt: Plot type
        - plot data: Variables required for bar plot
        
    Output: 
        - Plot
    '''
    if pt=='bar':
        col, val, axis_labels, legend_labels = plot_data
        
        cat_index = header.index(col)
        
        cat1 = data.filter(lambda x: x[cat_index]==val)
        cat2 = data.filter(lambda x: x[cat_index]!=val)                   

        groupby_index = header.index(groupby)
        count_column = header.index('Summons Number')
        
        cat1 = group_data_toList(group_data(cat1 , Val=count_column, Key=groupby_index), groupby)
        cat2 = group_data_toList(group_data(cat2 , Val=count_column, Key=groupby_index), groupby)
            
        ## plotting the graph
        ax.bar(cat1[0], cat1[1], width = 0.5, label=legend_labels[0]) 
        ax.bar(cat2[0], cat2[1], width = 0.5, label=legend_labels[1])
        
        ax.legend()
        ax.set_xlabel(groupby, fontsize=18)
        ax.set_ylabel('Number of tickets', fontsize=18)
        ax.set_title('Parking tickets for the year '+str(year-1), fontsize=22)
    if pt == 'pie':
        #try:
            groupby_index = header.index(groupby)
            count_column = header.index('Summons Number')
            
            label, data = group_data_toList(group_data(data , Val=count_column, Key=groupby_index), groupby)
            title = 'Parking ticket '+str(groupby)+' for the year '+str(year-1)
            
            if groupby == 'Violation Code': 
                ## Defining the violation code merges as a dictionary
                labs = {"Misc":[35,41,90,91,94],
                        "No Parking":[20,21,23,24,27],
                        "No Standing":[3,4,5,6,8,10,11,12,13,14,15,16,17,18,19,22,25,26,30,31,40,44,54,57,58,63,64,77,78,81,89,92],
                        "Permit/Doc Issue":[1,2,29,70,71,72,73,76,80,83,87,88,93,97],
                        "Plate Issues":[74,75,82],
                        "Obstructing Path":[7,9,36,45,46,47,48,49,50,51,52,53,55,56,59,60,61,62,66,67,68,79,84,96,98],
                        "Overtime":[28,32,33,34,37,38,39,42,43,65,69,85,86]
                        }
                ## Count based on the grouping
            
                temp = defaultdict(list)
                for i in range(len(label)):
                    for key, val in labs.items():
                        if label[i] in val:
                            if temp[key] == []:
                                temp[key] = 0
                            else:
                                temp[key] = temp[key]+data[i]

                ## Ordering data based on the dictionary 
                label, data  = list(), list()
                for key in labs.keys():
                    label.append(key)
                    data.append(temp[key])    

            ## Defining color for each category
            temp = defaultdict(list)
            for l,c in zip(labs,cm.tab20(range(len(labs)))):
                temp[l]=c

            centre_circle = plt.Circle((0,0),0.85,fc='white') ## radius to make it like a donut
            explode = np.full(len(label), 0.04) ## Gaps between the categories

            pat = ax.pie(list(map(lambda x: x*100/sum(data), data)), labels=label, textprops={'fontsize': 20}, autopct='%1.1f%%', startangle=90, pctdistance=0.6, explode = explode)
            if groupby == 'Violation Code':
                for pie_wedge in pat[0]:
                    pie_wedge.set_edgecolor('white')
                    pie_wedge.set_facecolor(temp[pie_wedge.get_label()]) # Assigning color code for each catergory

            ax.axis('equal') # Equal aspect ratio ensures that pie is drawn as a circle
            ax.set_title(title, fontsize =22, pad=20)
            plt.gcf().gca().add_artist(centre_circle)
        #except:
        #    print('Failed to plot!')        
    return ax 

def EDA(header, tickets, year, groupby, pt, plotdim, plot_data):
    fig = plt.figure(figsize=(30, 25))
    axs=plt.GridSpec(plotdim[0], plotdim[1], hspace=0.15, wspace=0.1)
    for i in range(len(year)):
        plot(fig.add_subplot(axs[i]), header, tickets[i], year[i], groupby, pt, plot_data)
    plt.savefig('EDA_'+str(groupby)+'_'+str(pt)+'.png',  bbox_inches='tight')

#### Matching the ticket with their coordinate values
This is done using the join method.
First we convert `all_tickets` into a dataframe, then we read the Centerline dataset as a dataframe `coord_df` and then using the `matchStreet` function we match the coordinates based on the left and right range of street numbers and house numbers based on the street names. Then we select the data from the years of interest.

In [11]:
def read_ticket_files(year, filepath, filename, centerline):
    
    ## Returns a list of RDDs which have data for each year
    
    tickets, header = getData(sc, filepath, filename, year)
    
    ## Reading the Centerline data set
    
    coords, c_header = createCoordsFiles(sc, centerline)
    coord_df = ss.createDataFrame(coords, schema=c_header)
    
    tickets_with_coords = []
    
    for i in range(len(tickets)):
        
        ## Converting the RDD list to a pyspark dataframe
        
        tickets_with_coords.append(matchstreet(ss.createDataFrame(tickets[i], schema=[x.replace(' ', '_') for x in header]), coord_df))
        
        ## Keeping the dataset which is from 2015 to 2020
        
        tickets_with_coords[i] = tickets_with_coords[i].filter(F.col('Issue_Year').isin(year))  
        
    return header, tickets, tickets_with_coords

In [12]:
%%time

# reading files

# download data from: https://www.kaggle.com/new-york-city/nyc-parking-tickets
filepath = 'nyc-parking-tickets/'
filename = 'Parking_Violations_Issued_-_Fiscal_Year_'
centerline = 'Centerline.csv'
year = list(range(2015, 2021))

h, tickets, tc_df = read_ticket_files(year, filepath, filename, centerline)

nyc-parking-tickets/Parking_Violations_Issued_-_Fiscal_Year_2015.csv
nyc-parking-tickets/Parking_Violations_Issued_-_Fiscal_Year_2016.csv
nyc-parking-tickets/Parking_Violations_Issued_-_Fiscal_Year_2017.csv
nyc-parking-tickets/Parking_Violations_Issued_-_Fiscal_Year_2018.csv
nyc-parking-tickets/Parking_Violations_Issued_-_Fiscal_Year_2019.csv
nyc-parking-tickets/Parking_Violations_Issued_-_Fiscal_Year_2020.csv
Wall time: 15.8 s


### EDA: just informative at this stage, not necessary [DO NOT RUN]

#### EDA: Bar plot representing the distribution of tickets over the years 

In [None]:
%%time

## Bar plot

category='Registration State'
value='NY' 
axis_labels=['Jan','Feb','Mar','Apr','May','Jun','Jul','Aug','Sep','Oct','Nov','Dec']  
legend_labels= ['New York', 'Other Cities']
plot_data = [category, value, axis_labels, legend_labels]

groupby = 'Issue Month'
EDA(h, tickets, year, groupby, 'bar', [int(len(year)/2),2], plot_data)

#### EDA: Pie chart showing the distribution of tickets according to violation code over the years

In [None]:
%%time

## Pie chart

groupby = 'Violation Code'
plot_data = []
EDA(h, tickets, year, groupby, 'pie', [int(len(year)/2),2], plot_data)

#### Model Preparation

In [13]:
%%time

vecAssembler = VectorAssembler(inputCols=["lat", "lon"], outputCol="features")

for i in range(len(year)):
    tc_df[i] = vecAssembler.transform(tc_df[i]) #putting the lattitude and longitude together for each year

Wall time: 248 ms


In [14]:
%%time

# defining kmeans model

# defining parameters

NUMBER_OF_CLUSTERS = 796
RANDOM_SEED = 1

kmeans = KMeans(k=NUMBER_OF_CLUSTERS, seed=RANDOM_SEED)  # clusters here

Wall time: 21 ms


In [None]:
# DO NOT RUN THIS CELL (the model can be loaded from file directly)
# training kmeans model

model = kmeans.fit(tc_df[0].select('features'))

In [15]:
# loading cluster model from file

model = KMeansModel.load('nyc_parking_tickets_final/kmodel')

for i in range(len(year)):
    tc_df[i] = model.transform(tc_df[i]) # transforming every year adds cluster number to a new column added in rdd

In [16]:
# extracting cluster centers 

ctr = []
centers = model.clusterCenters()

for center in centers:
    ctr.append(center) #extracting centers is easy since while fitting and tansforming the model already saves this information

In [17]:
ctr[0] # just for check, the first cluster center 

array([ 40.84229289, -73.90501314])

In [None]:
# DO NOT RUN THIS CELL
# visualization only

mc = MarkerCluster()
m = folium.Map(
    location=[40.767937,-73.982155], # these are center cordinates of new york
    zoom_start=12,)


for i in range(len(ctr)):
    mc.add_child(folium.Marker(location=[ ctr[i][0],ctr[i][1] ])) # adding cluster centers

m.add_child(mc)
m.save('F:/marker_cluster_example_file.html') #visualising the cluster centers just to ensure their distribution is even 

#### Extracting amenities

The section below is the process of extracting amenities. For this process libraries like `Requests` (which handles different API requests) can be used. The actual request has to be sent in the format which is accepted on the actual website. Hence we opted to send it in the format of multiple strings added toghether. The time taken for this process is dependent on traffic on the API server. The usual time taken is 1-4 hours, as a result of the fact that a delay is neccessary after every few requests. This prevents the user from getting kicked out from the API server, since the server has a limit set for each user. To avoid performing the process multiple times, the file provided in the zip package can be used instead of sending requests again.

In [None]:
# DO NOT RUN THIS CELL (use the provided file instead)

amenity_names = []
amenity_count = []

Amenity_per_location =pd.DataFrame()
Amenity_per_location_2=pd.DataFrame()
overpass_url = "http://overpass-api.de/api/interpreter"
for i in range(len(ctr)):
    overpass_query = '[out:json];' +'(' + \
    '// query part for: “aminity=*”' +'\n'+\
    'node["amenity"](around:1000,' + str(ctr[i][0])+','+str(ctr[i][1])+');'+'\n'+\
    'way["amenity"](around:1000,' +  str(ctr[i][0])+','+str(ctr[i][1])+');'+'\n'+\
    'relation["amenity"](around:1000,' +  str(ctr[i][0])+','+str(ctr[i][1])+');'+'\n'+\
    ');' + '\n'+\
    '// print results'+'\n'+\
    'out;'+'\n'+\
    '>;'+'out count;'
     
    response = requests.get(overpass_url, 
                        params={'data': overpass_query})
    try:
        data  = response.json()
    
    except (requests.exceptions.ConnectionError, json.decoder.JSONDecodeError):
        time.sleep(2**1 + random.random()*0.01) #exponential backoff


    typeamenity =[]
    for l in range(len(data['elements'])):
        try:
            ind = list(data['elements'][l].keys()).index('tags')
        
            typeamenity.append(data['elements'][l]['tags']['amenity'])
        except:
            ind = False

    amenity_names.append(list(Counter(typeamenity).keys()))
    amenity_count.append(list(Counter(typeamenity).values()))
    dictionary = dict(zip(amenity_names[i], amenity_count[i]))
    Amenity_per_location =  pd.DataFrame.from_dict(dictionary, orient='index')
    Amenity_per_location_2 = pd.concat([Amenity_per_location, Amenity_per_location_2], axis=1, sort=True)

In [None]:
# DO NOT RUN THIS CELL (use the provided file instead)

Amenity_per_location_2 = Amenity_per_location_2.T
Amenity_per_location_2['prediction'] =np.arange(0,len(Amenity_per_location_2))
Amenity_per_location_2.to_csv('F:/Amenity_per_location_2_clusters.csv')

In [18]:
%%time

Amenity_per_location_2 = ss.read.csv("Amenity_per_location.csv", header=True, sep=",");

for i in range(len(year)):
    tc_df[i] = Amenity_per_location_2.join(tc_df[i], on=['prediction'], how='right_outer')
    tc_df[i] = tc_df[i].withColumn("Violation_Time", F.concat(F.col("Violation_Time"), F.lit("M")))
    tc_df[i] = tc_df[i].withColumn("Violation_Time", F.to_timestamp(tc_df[i].Violation_Time, 'KKmmaa'))
    tc_df[i] = tc_df[i].withColumn("Violation_Time", F.date_format('Violation_Time', 'HH'))
    tc_df[i] = tc_df[i].filter(tc_df[i].Violation_Time.isNotNull())

Wall time: 896 ms


In [19]:
filelocation_weather = 'merged_weather_holidays_fixedMissingValues.csv'
weather = ss.read.csv(filelocation_weather, header=True)
weather = weather.withColumn("time", F.to_timestamp(weather.time, 'HH:mm:ss'))
weather = weather.withColumn("date", F.to_timestamp(weather.date, 'yyyy-MM-dd'))
weather = weather.withColumn("time", F.date_format('time', 'HH'))
weather = weather.withColumn("date", F.date_format('date', 'MM/dd/yyyy'))
weather = weather.withColumnRenamed("time", "Violation_Time")
weather = weather.withColumnRenamed("date", "Issue_Date")

In [20]:
# since the API returns 150 amenities, we dropped some which we assumed would have the least correlation to parking tickets
# future work should consider adding these as well

amenities_to_drop = ['amenity|ice_cream', 
'animal_boarding', 
'animal_shelter', 
'art_centre', 
'arts_centre', 
'bicycle_parking',
'bicycle_rental', 
'bicycle_repair_station', 
'biergarten', 
'boat_rental', 
'boat_storage',
'car_rental', 
'car_service', 
'car_sharing',
 'car_wash',
 'charging_station',
 'clock',
 'community_centre',
 'compressed_air',
 'concert_hall',
 'cooking_school',
 'courthouse',
 'coworking_space',
 'dancing_school',
 'dentist',
 'disused',
 'dojo',
 'drinking_water',
 'driving_school',
 'ferry_terminal',
 'fire_station',
 'food_court',
 'fortune_teller',
 'fountain',
 'fuel',
 'graphic_design',
 'grave_yard',
 'ice_cream',
 'internet_cafe',
 'karaoke_box',
 'language_school',
 'library',
 'loading_dock',
 'meditation_centre',
 'monastery',
 'motorcycle_parking',
 'museum',
 'music_school',
 'music_venue',
 'nail salon',
 'nail_salon',
 'nursing_home',
 'outdoor_seating',
 'parking',
 'parking_entrance',
 'parking_space',
 'payment_centre',
 'payment_terminal', 
'picnic_table',
 'police',
 'post_box',
 'post_depot',
 'prep_school',
 'prison',
 'public_bath',
 'public_bookcase',
 'public_building',
 'radio station',
 'ranger_station',
 'recycling',
 'rescue_station',
 'research_institute',
 'salon',
 'self_storage',
 'shelter',
 'shoe_repair',
 'smoking_area',
 'social_centre',
 'social_facility',
 'spa',
 'stock_exchange',
 'stripclub',
 'studio', 'swimming_pool',
 'swingerclub',
 'taxi',
 'telephone',
 'theatre',
 'toilets',
 'tourism',
 'townhall',
 'training',
 'university',
 'urgent_care',
 'vehicle_inspection',
 'vending_machine',
 'veterinary',
 'waste_basket',
 'waste_disposal',
 'waste_transfer_station',
 'wifi;telephone;device_charging_station',
'_c0',
'summary']

In [21]:
%%time

merged = []

for i in range(len(year)):
    tc_df[i] = tc_df[i].join(weather, ['Issue_Date', 'Violation_Time'])
    merged.append(tc_df[i].groupBy('prediction','Issue_Date','Violation_Time').count())
    merged[i] = merged[i].join(weather, ["Issue_Date", "Violation_Time"])
    merged[i] = merged[i].join(Amenity_per_location_2, (merged[i].prediction == Amenity_per_location_2.prediction)).drop("prediction").drop("datetime")
    merged[i] = merged[i].withColumn("Issue_Date", F.to_timestamp(merged[i].Issue_Date, 'MM/dd/yyyy'))
    merged[i] = merged[i].withColumn('Day_of_week',F.dayofweek(merged[i].Issue_Date))
    merged[i] = merged[i].withColumn('Day_of_year',F.dayofyear(merged[i].Issue_Date))
    merged[i] = merged[i].withColumn('Day_of_month',F.dayofmonth(merged[i].Issue_Date))
    merged[i] = merged[i].withColumn("Month", F.date_format('Issue_Date', 'MM'))
    merged[i] = merged[i].withColumn("Year", F.date_format('Issue_Date', 'YYYY'))
    merged[i] = merged[i].drop("Issue_Date")
    for c in amenities_to_drop:
        merged[i] = merged[i].drop(c)

Wall time: 9.6 s


In [22]:
%%time

# indexing categorical features

# categorical_features = ["icon", "precipType", "summary"]
categorical_features = ["icon", "precipType"]

for cat in categorical_features:
    for i in range(len(year)):
        if i == 0:
            stringIndexer = StringIndexer(inputCol=cat, outputCol=cat+"_cat", stringOrderType="frequencyDesc")
            stringIndexer.setHandleInvalid("keep")
            model = stringIndexer.fit(merged[i])
            model.setHandleInvalid("keep")
        merged[i] = model.transform(merged[i])

Wall time: 7min 44s


In [23]:
# dropping non-indexed categorical feature columns

for i in range(len(year)):
    merged[i] = merged[i].drop("icon").drop("precipType")

In [24]:
# casting the variables to floats for input into the regression model 

cat_features = [] 

floats = [x for x in merged[0].columns if x not in cat_features]

for i in range(len(year)):
    for feature in floats:
        merged[i] = merged[i].withColumn(feature, merged[i][feature].cast(types.FloatType()))

#### Displaying the final feature vector structure

In [25]:
# checking feature vector types

print('The features used in the model are: ')
merged[0].printSchema()

The features used in the model are: 
root
 |-- Violation_Time: float (nullable = true)
 |-- count: float (nullable = false)
 |-- precipIntensity: float (nullable = true)
 |-- precipProbability: float (nullable = true)
 |-- temperature: float (nullable = true)
 |-- apparentTemperature: float (nullable = true)
 |-- dewPoint: float (nullable = true)
 |-- humidity: float (nullable = true)
 |-- pressure: float (nullable = true)
 |-- windSpeed: float (nullable = true)
 |-- windGust: float (nullable = true)
 |-- windBearing: float (nullable = true)
 |-- cloudCover: float (nullable = true)
 |-- uvIndex: float (nullable = true)
 |-- visibility: float (nullable = true)
 |-- precipAccumulation: float (nullable = true)
 |-- ozone: float (nullable = true)
 |-- Holiday: float (nullable = true)
 |-- atm: float (nullable = true)
 |-- bakery: float (nullable = true)
 |-- bank: float (nullable = true)
 |-- bar: float (nullable = true)
 |-- bbq: float (nullable = true)
 |-- bench: float (nullable = true)

In [26]:
# final number of features

print('The final number of features used in the model is: '+ str(len(merged[0].columns)))

The final number of features used in the model is: 56


### Training models

In [27]:
# defining parameters 

TRAINING_DATA_RATIO = 0.8
RANDOM_SEED = 3
splits = [TRAINING_DATA_RATIO, 1.0 - TRAINING_DATA_RATIO]

In [28]:
# creating vectors from the features for model input 

training_data, test_data = [], []

for i in  range(len(year)):
    feature_indices = [j for j, x in enumerate(merged[i].columns) if j!=1] # all columns except the label
    assembler = VectorAssembler(inputCols=list(np.array(merged[i].columns)[np.array(feature_indices)]),outputCol="features")\
                .setHandleInvalid("keep")
    spDF = assembler.transform(merged[i])
    tr, te = spDF.randomSplit(splits, RANDOM_SEED)
    training_data.append(tr)
    test_data.append(te)

In [29]:
# uniting the vectors per year in a single dataframe

train = training_data[0]
test = test_data[0]

for i in  range(1, len(year)):
    train.union(training_data[i])
    test.union(test_data[i])

1. Baseline

In [30]:
%%time

# extracting the mean count

mean_count = train.agg(F.avg(F.col("count")))
mean_count = mean_count.take(1)
mean_count = mean_count[0][0]
mean_count = int(mean_count)
print('The mean number of ticket per hour per location is: ' + str(mean_count))
test_withAvgCount = test.withColumn('avg_count', F.lit(mean_count))

The mean number of ticket per hour per location is: 4
Wall time: 4min 11s


In [31]:
%%time

# casting to float

test_withAvgCount = test_withAvgCount.withColumn('avg_count', test_withAvgCount['avg_count'].cast(types.FloatType()))
test_withAvgCount = test_withAvgCount.withColumn('count', test_withAvgCount['count'].cast(types.FloatType()))
baseline = test_withAvgCount.select(['avg_count','count'])

Wall time: 85.8 ms


In [32]:
%%time

# evaluating - RMSE

evaluator_rmse = RegressionEvaluator(labelCol="avg_count", predictionCol="count", metricName="rmse")
rmse = evaluator_rmse.evaluate(baseline)
print('RMSE: '+str(rmse))

RMSE: 5.175540554509165
Wall time: 4min 18s


In [33]:
%%time

# evaluating - MAE

evaluator_mae = RegressionEvaluator(labelCol="avg_count", predictionCol="count", metricName="mae")
mae = evaluator_mae.evaluate(baseline)
print('MAE: '+str(mae))

MAE: 3.055790505531438
Wall time: 4min 25s


2. Random Forest

In [34]:
%%time

# the following parameters should probably be pushed as far upwards as our machines can handle for better performance
# training for 1 year with current parameters takes <10min

RF_NUM_TREES = 40
RF_MAX_DEPTH = 7
RF_MAX_BINS = 10

Wall time: 0 ns


In [35]:
%%time

# training RF model

rf = RandomForestRegressor(numTrees=RF_NUM_TREES, maxDepth=RF_MAX_DEPTH, labelCol='count')
rf.setSeed(RANDOM_SEED)
model = rf.fit(train)

Wall time: 6min 30s


In [36]:
# extracting feature importance

featureImp = model.featureImportances

In [37]:
def ExtractFeatureImp(featureImp, dataset, featuresCol):
    """This function converts the extracted feature importance to a readable format by associating them to column names."""
    list_extract = []
    for i in dataset.schema[featuresCol].metadata["ml_attr"]["attrs"]:
        list_extract = list_extract + dataset.schema[featuresCol].metadata["ml_attr"]["attrs"][i]
    varlist = pd.DataFrame(list_extract)
    varlist['score'] = varlist['idx'].apply(lambda x: featureImp[x])
    return(varlist.sort_values('score', ascending = False))

In [38]:
# extracting feature importance in readable format with column names

scores = ExtractFeatureImp(featureImp, train, 'features')

In [None]:
%%time

# saving feature importance to CSV

scores.to_csv('rf_scores_whole_dataset_50_trees.csv')

In [39]:
# printing the scores

print(scores)

    idx                 name     score
49   49          Day_of_year  0.155417
0     0       Violation_Time  0.150248
42   42     place_of_worship  0.090122
51   51                Month  0.088712
12   12              uvIndex  0.080956
34   34            fast_food  0.047372
48   48          Day_of_week  0.039022
53   53             icon_cat  0.038392
41   41             pharmacy  0.033262
50   50         Day_of_month  0.023116
46   46               school  0.023037
31   31              doctors  0.022810
43   43          post_office  0.018548
22   22                bench  0.018518
40   40            nightclub  0.014902
20   20                  bar  0.013451
27   27               cinema  0.012589
45   45           restaurant  0.012133
38   38          marketplace  0.011920
36   36             hospital  0.011084
25   25                 cafe  0.011027
4     4  apparentTemperature  0.008448
26   26            childcare  0.007706
35   35                  gym  0.006552
3     3          temperat

In [40]:
%%time

# evaluating

predictions = model.transform(test)

rmse_evaluator = RegressionEvaluator(
    labelCol="count", predictionCol="prediction", metricName="rmse")

rmse = rmse_evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

mae_evaluator = RegressionEvaluator(
    labelCol="count", predictionCol="prediction", metricName="mae")

mae = mae_evaluator.evaluate(predictions)
print("Mean Absolute Error (MAE) on test data = %g" % mae)

Root Mean Squared Error (RMSE) on test data = 4.60102
Mean Absolute Error (MAE) on test data = 2.72508
Wall time: 11min 7s


In [None]:
%%time

# saving the trained model to file

model_name = "RF_model"
model.save(model_name)

3. Gradient Boosted Tree

In [41]:
%%time

# defining model parameters

MAX_ITERS = 10
GBT_MAX_DEPTH = 5

Wall time: 0 ns


In [42]:
%%time

# training GBT model

gbt = GBTRegressor(maxIter=MAX_ITERS, maxDepth=GBT_MAX_DEPTH, labelCol="count", seed=RANDOM_SEED, leafCol="leafId")
model = gbt.fit(train)

Wall time: 9min 35s


In [43]:
# extracting feature importance

featureImp = model.featureImportances

In [44]:
# extracting feature importance in readable format with column names

scores = ExtractFeatureImp(featureImp, train, 'features')
scores = scores.round(5)

In [None]:
%%time

# saving feature importance to CSV

scores.to_csv('gbt_scores_whole_dataset.csv')

In [45]:
# printing the scores

print(scores)

    idx                 name    score
42   42     place_of_worship  0.20747
12   12              uvIndex  0.13155
31   31              doctors  0.07304
19   19                 bank  0.06414
34   34            fast_food  0.05487
0     0       Violation_Time  0.05348
41   41             pharmacy  0.05040
20   20                  bar  0.04081
43   43          post_office  0.03842
36   36             hospital  0.03597
49   49          Day_of_year  0.03217
22   22                bench  0.02513
48   48          Day_of_week  0.02511
27   27               cinema  0.02013
46   46               school  0.01915
40   40            nightclub  0.01663
26   26            childcare  0.01554
45   45           restaurant  0.01132
25   25                 cafe  0.01123
28   28               clinic  0.00958
51   51                Month  0.00936
44   44                  pub  0.00922
17   17                  atm  0.00902
24   24          bus_station  0.00886
53   53             icon_cat  0.00741
33   33     

In [46]:
%%time

# evaluating

predictions = model.transform(test)

rmse_evaluator = RegressionEvaluator(
    labelCol="count", predictionCol="prediction", metricName="rmse")

rmse = rmse_evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

mae_evaluator = RegressionEvaluator(
    labelCol="count", predictionCol="prediction", metricName="mae")

mae = mae_evaluator.evaluate(predictions)
print("Mean Absolute Error (MAE) on test data = %g" % mae)

Root Mean Squared Error (RMSE) on test data = 4.367
Mean Absolute Error (MAE) on test data = 2.63164
Wall time: 13min 39s


In [47]:
%%time

# saving the trained model to file

model_name = "GBT_model"
model.save(model_name)

Wall time: 3.68 s


#### Stopping session

In [None]:
sc.stop()
ss.stop()