# IST718 Big Data Final Project - SF_crime
##### Project1 - Chiau Yin Yang, Qing Chen, Zilong Chen

We ran our models on Databricks and downloaded the Python notebook for submission.

In [2]:
# import package
from csv import reader
from pyspark.sql import SparkSession
import pyspark

# Obtain the active SparkSession (creating one if none exists) and keep a
# handle to its SparkContext.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)
sc = spark.sparkContext

from pyspark.ml import feature, regression, Pipeline
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import functions as fn, Row
from pyspark.sql.types import *
from pyspark import sql
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier

import numpy as np
import pandas as pd
import seaborn as sb
import matplotlib.pyplot as plt
import warnings

import os

## Convert Data to Spark DataFrame

In [4]:
from pyspark.sql import SparkSession
# Get (or create) the session used by the rest of the notebook, tagged with an
# application name.
session_builder = SparkSession.builder.appName("crime analysis")
session_builder = session_builder.config("spark.some.config.option", "some-value")
spark = session_builder.getOrCreate()

# Read the incident CSV, taking column names from its header row. No schema is
# supplied, so the columns are read as strings.
df = spark.read.csv(
    "FileStore/tables/Police_Department_Incident_Reports__Historical_2003_to_May_2018.csv",
    header=True,
)


In [5]:
# List the files under FileStore/tables to confirm both CSV inputs are present
%fs ls FileStore/tables

path,name,size
dbfs:/FileStore/tables/Police_Department_Incident_Reports__Historical_2003_to_May_2018.csv,Police_Department_Incident_Reports__Historical_2003_to_May_2018.csv,300601428
dbfs:/FileStore/tables/crime_type_predefined-1fa1e.csv,crime_type_predefined-1fa1e.csv,615


In [6]:
# Preview the first 3 rows, then print the total number of rows loaded
df.show(3)
print(df.count())

In [7]:
# Discard every row that has a null in any column, then report how many remain
df1 = df.dropna()
df1.count()

## Data Preprocessing - converting date-time
- Analyze the data by time

In [9]:
from pyspark.sql.types import DateType
from pyspark.sql.functions import year, month, dayofmonth
from pyspark.sql.functions import udf
from datetime import datetime
# Parse the 'Date' strings (MM/DD/YYYY) into a Spark DateType column
func = udf(lambda x: datetime.strptime(x, '%m/%d/%Y'), DateType())
# Apply the parser to df1's own 'Date' column (the frame being extended) rather
# than reaching back to the pre-dropna frame `df`.
df_with_timestamp = df1.withColumn('Spark_Date', func(df1.Date))
# Derive 'Year', 'Month' and 'Day' columns from the parsed date
df_with_timestamp = df_with_timestamp.withColumn('Year', year(df_with_timestamp.Spark_Date)).withColumn('Month', month(df_with_timestamp.Spark_Date))
df_with_timestamp = df_with_timestamp.withColumn('Day', dayofmonth(df_with_timestamp.Spark_Date))


In [10]:
# Preview the frame now that Spark_Date, Year, Month and Day have been added
df_with_timestamp.show(3)

In [12]:
# Convert the Date and Time from String to Integer
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import year, month, dayofmonth, hour, minute
from datetime import datetime
# Parse the 'HH:MM' strings in 'Time' into a Spark timestamp column
func_timestamp = udf(lambda raw: datetime.strptime(raw, '%H:%M'), TimestampType())
df_with_timestamp = df_with_timestamp.withColumn('Spark_Time', func_timestamp(df_with_timestamp['Time']))
# Derive 'Hour' and 'Minute' columns from the parsed time
spark_time_col = df_with_timestamp['Spark_Time']
df_with_timestamp = (
    df_with_timestamp
    .withColumn('Hour', hour(spark_time_col))
    .withColumn('Minute', minute(spark_time_col))
)

In [13]:
# To reduce processing time, keep only the columns used downstream.
# 'Day' is kept as well: the numeric feature list and the Naive Bayes
# assembler later in the notebook both reference it.
final_df = df_with_timestamp.select("Category", "DayOfWeek", "PdDistrict", "Year", "Month", "Day", "Hour", "Resolution")

In [14]:
# Binary target: 0 when Resolution is 'NONE' or 'UNFOUNDED', 1 for anything else
unresolved = fn.col('Resolution').isin('NONE', 'UNFOUNDED')
final_df = final_df.withColumn('Res_num', fn.when(unresolved, 0).otherwise(1))

In [16]:
# Show how many rows fall into each value of the Res_num target
final_df.groupby('Res_num').count().show()

In [17]:
# Read the hand-made crime-type lookup table, taking column names from its header row
df_crime_type = spark.read.csv(
    "FileStore/tables/crime_type_predefined-1fa1e.csv",
    header=True,
)

In [18]:
# Peek at the first 3 rows of the crime-type lookup table
df_crime_type.head(3)

In [19]:
import pyspark.sql
from pyspark.sql import functions as fn


In [20]:
# Preview the modelling frame, which now carries the Res_num target
final_df.show(5)

## Feature engineering
- Split hours into 5 levels and use morning as baseline reference
- Let OneHotEncoder process the rest of the categorical columns - location, month, crime category, day of the week

#### Variables that are considered:
- location
- Crime category
- Incident time
    - Hour (5 levels)
    - Month (12 levels)
    - Day of the week (7 levels)

In [22]:
# Flag hours 1-8 as 'Morning'; this bucket is the baseline level for the hour dummies
morning_hours = list(range(1, 9))
final_df = final_df.withColumn('Morning', fn.when(fn.col('Hour').isin(morning_hours), 1).otherwise(0))


In [23]:
# Remaining hour-of-day dummies (0/1), one column per bucket:
#   Near noon 9-12, Afternoon 13-17, evening 18-20, Night 21-23 and 0
hour_col = fn.col('Hour')
hour_bucket_conditions = [
    ('Near noon', hour_col.isin(9, 10, 11, 12)),
    ('Afternoon', hour_col.isin(13, 14, 15, 16, 17)),
    ('evening', hour_col.between(18, 20)),
    ('Night', hour_col.isin(21, 22, 23, 0)),
]
for bucket_name, in_bucket in hour_bucket_conditions:
    final_df = final_df.withColumn(bucket_name, fn.when(in_bucket, 1).otherwise(0))

final_df.show(3)

In [24]:
# Drop 'Morning': it is the baseline level, so the other four hour dummies already encode it
final_df = final_df.drop('Morning')
final_df.show(2)

In [26]:
# Print the (row count, column count) of the modelling frame.
# The frame is named final_df; `df_final` is not defined anywhere in this notebook.
print((final_df.count(), len(final_df.columns)))

In [27]:
# Rename the lookup columns so the table can be joined on 'Category'
df_crime_type = df_crime_type.select(
    fn.col("crime_type").alias("Category"),
    fn.col("level").alias("Level"),
)
df_crime_type.show(2)

In [28]:
# Left join: keep every incident and attach the severity Level of its Category
crime_join = final_df.join(df_crime_type, 'Category', 'left')
crime_join.show(3)


In [29]:
# One 0/1 column per severity level '2'-'5'; level '1' gets no column and so
# acts as the reference. Level is compared as a string.
severity_dummies = [('Minor', '2'), ('Medium', '3'), ('Servere', '4'), ('Very Servere', '5')]
for dummy_name, level_code in severity_dummies:
    crime_join = crime_join.withColumn(dummy_name, fn.when(fn.col('Level') == level_code, 1).otherwise(0))

crime_join.show(3)

In [30]:
# Count incidents per severity Level. Because the join is a left join, a null
# Level group here would mean some Category has no match in the lookup table.
crime_join.groupby('Level').count().show()

In [31]:
# Summary statistics for final_df (describe() returns a new DataFrame)
final_df.describe()

## Split dataset into training, validation and testing by the year of the incidents
- Follow 0.6, 0.3, and 0.1 rule to split (6 years of data as training etc)

In [33]:
# Split by incident year: 2008-2013 train, 2014-2016 validation, 2017+ test.
# The split is taken from the joined frame so that the crime 'Level' is
# available to the later pipelines that list it as a feature. Level is read
# from CSV as a string, so it is cast to an integer before being assembled.
model_df = crime_join.withColumn('Level', fn.col('Level').cast('int'))
training_df = model_df.where('Year >= 2008 and Year < 2014')
validation_df = model_df.where('Year >= 2014 and Year < 2017')
testing_df = model_df.where('Year >= 2017')

In [34]:
# Report the row count of each split
for split_caption, split_frame in [
    ("# points in training: ", training_df),
    ("# points in validation: ", validation_df),
    ("# points in testing: ", testing_df),
]:
    print(split_caption, split_frame.count())

In [35]:
# Compare the joined frame's row count with the combined size of the three
# splits; rows with Year before 2008 belong to none of the splits.
print(crime_join.count())
training_df.count() + validation_df.count() + testing_df.count()

In [36]:
# Functionality for computing features
from pyspark.ml import feature
# Functionality for regression
from pyspark.ml import regression
# Funcionality for classification
from pyspark.ml import classification
# Object for creating sequences of transformations
from pyspark.ml import Pipeline, evaluation

In [37]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.util import MLUtils
from pyspark.mllib.evaluation import MulticlassMetrics

In [38]:
from pyspark.sql.functions import isnan
# List the columns available to the modelling pipelines
training_df.columns

In [39]:
from pyspark.ml.feature import OneHotEncoderEstimator
from pyspark.ml.feature import StringIndexer


## Modeling - trying different methods

In [40]:
# Columns that must be indexed and one-hot encoded before they can be assembled
categoricalColumns = ['Category', 'DayOfWeek', 'PdDistrict']
stages = []  # pipeline stages, extended by the next few cells
for categoricalCol in categoricalColumns:
    indexedCol = categoricalCol + "Index"
    # Index the string values, then one-hot encode the index into a sparse
    # vector. Nothing is fitted here; the stages run when the pipeline is fit.
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=indexedCol)
    encoder = OneHotEncoderEstimator(inputCols=[indexedCol], outputCols=[categoricalCol + "classVec"])
    stages.extend([stringIndexer, encoder])

In [41]:
# Assemble the one-hot vectors and the numeric columns into a single 'features' vector
numericCols = ['Year', 'Month', 'Day', 'Hour', 'Level']
assemblerInputs = ["{}classVec".format(name) for name in categoricalColumns]
assemblerInputs.extend(numericCols)
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages.append(assembler)

In [42]:
# Index the Res_num target into a 'label' column and add that step to the pipeline
label_stringIdx = StringIndexer(outputCol="label", inputCol="Res_num")
stages.append(label_stringIdx)

In [43]:
from pyspark.ml.classification import LogisticRegression
  
# Fit the feature-preparation stages on the training split, then apply the
# fitted stages to the validation split.
partialPipeline = Pipeline(stages=stages)
pipelineModel = partialPipeline.fit(training_df)
preppedDataDF = pipelineModel.transform(validation_df)

In [44]:
# Fit a default LogisticRegression on the prepared frame.
# NOTE(review): preppedDataDF was produced from validation_df, so this model is
# fit on, and its ROC drawn from, the validation split rather than the
# training split. Confirm that this is intended.
lrModel = LogisticRegression().fit(preppedDataDF)

# ROC curve of lrModel over preppedDataDF
display(lrModel, preppedDataDF, "ROC")

False Positive Rate,True Positive Rate,Threshold
0.0,0.0,0.9425262878280136
0.0,0.03125,0.9425262878280136
0.0,0.0625,0.930316496802224
0.0,0.09375,0.9292849222832752
0.0,0.125,0.9258791069127278
0.0,0.15625,0.9201190275232932
0.0,0.1875,0.9157598696966304
0.0,0.21875,0.9142448951429124
0.0,0.25,0.9009023445034938
0.0,0.28125,0.8957767532993453


In [45]:
# Narrow the prepared frame down to the two columns the classifier needs
dataset = preppedDataDF.select("label", "features")
display(dataset)

label,features
0.0,"List(0, 58, List(2, 43, 45, 53, 54, 55, 56, 57), List(1.0, 1.0, 1.0, 2015.0, 1.0, 19.0, 14.0, 1.0))"
0.0,"List(0, 58, List(11, 48, 53, 54, 55, 56, 57), List(1.0, 1.0, 2015.0, 2.0, 1.0, 15.0, 4.0))"
0.0,"List(0, 58, List(3, 48, 53, 54, 55, 56, 57), List(1.0, 1.0, 2015.0, 2.0, 1.0, 15.0, 4.0))"
0.0,"List(0, 58, List(13, 48, 53, 54, 55, 56, 57), List(1.0, 1.0, 2015.0, 2.0, 1.0, 15.0, 5.0))"
0.0,"List(0, 58, List(5, 41, 46, 53, 54, 55, 56, 57), List(1.0, 1.0, 1.0, 2015.0, 1.0, 27.0, 19.0, 4.0))"
0.0,"List(0, 58, List(2, 53, 54, 55, 56, 57), List(1.0, 2015.0, 2.0, 1.0, 16.0, 1.0))"
0.0,"List(0, 58, List(13, 40, 47, 53, 54, 55, 56, 57), List(1.0, 1.0, 1.0, 2015.0, 1.0, 31.0, 21.0, 5.0))"
0.0,"List(0, 58, List(5, 40, 47, 53, 54, 55, 56, 57), List(1.0, 1.0, 1.0, 2015.0, 1.0, 31.0, 21.0, 4.0))"
0.0,"List(0, 58, List(7, 40, 49, 53, 54, 55, 56, 57), List(1.0, 1.0, 1.0, 2015.0, 1.0, 31.0, 16.0, 4.0))"
0.0,"List(0, 58, List(0, 40, 49, 53, 54, 55, 56, 57), List(1.0, 1.0, 1.0, 2015.0, 1.0, 31.0, 17.0, 3.0))"


In [46]:
from pyspark.ml.classification import LogisticRegression

# Create the LogisticRegression estimator (at most 10 optimizer iterations)
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)

# Train on the prepared *training* split. `dataset` is derived from
# validation_df, so fitting on it would turn the AUC that is computed on
# `dataset` afterwards into an in-sample score.
trainingPrepped = pipelineModel.transform(training_df).select("label", "features")
lrModel = lr.fit(trainingPrepped)

In [47]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Score the rows of `dataset` and compute the evaluator's default metric
# (area under the ROC curve) against the 'label' column.
predictions = lrModel.transform(dataset)
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

# Modeling - run with different variables
- Random Forest (referred to as rf)
- Logistic regression (referred to as lr)
- Gradient boosting (referred to as gb)

###### Differentiated by model number

Parameters used in model if any:
1. (RF) number of trees (10, 20, 30, 50, 80, 100)
2. (RF) cacheNodeIds
3. (GB, RF) seed = 0 for same result
4. (GB, LR) maxIter=10
5. (LR) regParam, elasticNetParam

In [None]:
# Random forest M1 — features: the four hour-of-day dummies plus one-hot
# DayOfWeek, PdDistrict, Category and Month. Res_num is used directly as the
# label (no label indexer in this pipeline).
# StringIndexer handles one column at a time, hence one indexer per column.
rf_m1_indexers = [
    feature.StringIndexer(inputCol=src, outputCol=idx)
    for src, idx in [("DayOfWeek", "day_wk_ind"), ("PdDistrict", "dist_ind"),
                     ("Category", "cat_ind"), ("Month", "mon_ind")]
]
rf_m1_encoder = feature.OneHotEncoderEstimator(
    inputCols=['day_wk_ind', 'dist_ind', 'cat_ind', 'mon_ind'],
    outputCols=['day_wk_hot', 'dist_hot', 'cat_hot', 'mon_hot'])
rf_m1_assembler = feature.VectorAssembler(
    inputCols=['Near noon', 'Afternoon', 'evening', 'Night',
               'day_wk_hot', 'dist_hot', 'cat_hot', 'mon_hot'],
    outputCol='features')
rf_m1_clf = classification.RandomForestClassifier(
    labelCol='Res_num', featuresCol='features', seed=0, numTrees=10, cacheNodeIds=True)
rf_m1 = Pipeline(stages=rf_m1_indexers + [rf_m1_encoder, rf_m1_assembler, rf_m1_clf])
rf_m1_fit = rf_m1.fit(training_df)

In [None]:
# Score the validation split with the fitted random forest
rf_m1_fitted_trans = rf_m1_fit.transform(validation_df)
# rf_m1 was trained with labelCol='Res_num', so evaluate against the same column
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='Res_num')
AUC1 = evaluator.evaluate(rf_m1_fitted_trans)
# Display the evaluator's score for rf_m1 on the validation split
AUC1

#### Checking the Naive Bayes result (not used in the end)

In [50]:
# Naive Bayes M1 — features: numeric Year, Month, Day, Hour and Level plus
# one-hot DayOfWeek, PdDistrict and Category; the label is the indexed Res_num.
# StringIndexer handles one column at a time, hence one indexer per column.
nb_m1_indexers = [
    feature.StringIndexer(inputCol=src, outputCol=idx)
    for src, idx in [("DayOfWeek", "day_wk_ind"), ("PdDistrict", "dist_ind"),
                     ("Category", "cat_ind")]
]
nb_m1_encoder = feature.OneHotEncoderEstimator(
    inputCols=['day_wk_ind', 'dist_ind', 'cat_ind'],
    outputCols=['day_wk_hot', 'dist_hot', 'cat_hot'])
nb_m1_label = feature.StringIndexer(inputCol="Res_num", outputCol="label")
nb_m1_assembler = feature.VectorAssembler(
    inputCols=['Year', 'Month', 'Day', 'Hour', 'Level', 'day_wk_hot', 'dist_hot', 'cat_hot'],
    outputCol='features')
nb_m1_clf = classification.NaiveBayes(smoothing=1.0, modelType="multinomial")
nb_m1 = Pipeline(stages=nb_m1_indexers + [nb_m1_encoder, nb_m1_label, nb_m1_assembler, nb_m1_clf])
nb_m1_fit = nb_m1.fit(training_df)
nb_m1_fitted_trans = nb_m1_fit.transform(validation_df)


In [51]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Score the validation split with the fitted Naive Bayes pipeline
nb_m1_fitted_trans = nb_m1_fit.transform(validation_df)
# Accuracy of 'prediction' against the indexed 'label'
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(nb_m1_fitted_trans)
accuracy

In [52]:
# Show the first 30 label/prediction pairs produced by the Naive Bayes model
nb_m1_fitted_trans.select('label', 'prediction').show(30)

In [53]:
# Preview the first 3 rows of the validation split
validation_df.show(3)

In [54]:
# Logistic regression M1 — features: the four hour-of-day dummies (Morning is
# the baseline) plus one-hot DayOfWeek, PdDistrict, Category and Month; the
# label is the indexed Res_num.
lr_m1_indexers = [
    feature.StringIndexer(inputCol=src, outputCol=idx)
    for src, idx in [("DayOfWeek", "day_wk_ind"), ("PdDistrict", "dist_ind"),
                     ("Category", "cat_ind"), ("Month", "mon_ind")]
]
lr_m1_encoder = feature.OneHotEncoderEstimator(
    inputCols=['day_wk_ind', 'dist_ind', 'cat_ind', 'mon_ind'],
    outputCols=['day_wk_hot', 'dist_hot', 'cat_hot', 'mon_hot'])
lr_m1_label = feature.StringIndexer(inputCol="Res_num", outputCol="label")
lr_m1_assembler = feature.VectorAssembler(
    inputCols=['Near noon', 'Afternoon', 'evening', 'Night',
               'day_wk_hot', 'dist_hot', 'cat_hot', 'mon_hot'],
    outputCol='features')
lr_m1_clf = classification.LogisticRegression(labelCol='label', featuresCol='features', maxIter=10)
lr_m1 = Pipeline(stages=lr_m1_indexers + [lr_m1_encoder, lr_m1_label, lr_m1_assembler, lr_m1_clf])

# Fit on the training split, score the validation split, report the evaluator's score
lr_m1_fit = lr_m1.fit(training_df)
lr_m1_fitted_trans = lr_m1_fit.transform(validation_df)
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='label')
lr_AUC1 = evaluator.evaluate(lr_m1_fitted_trans)
print('Lr M1=', lr_AUC1)

In [55]:
# Gradient-boosted trees M1 — features: the four hour-of-day dummies (Morning
# is the baseline) plus one-hot DayOfWeek, PdDistrict, Category and Month; the
# label is the indexed Res_num.
gb_m1_indexers = [
    feature.StringIndexer(inputCol=src, outputCol=idx)
    for src, idx in [("DayOfWeek", "day_wk_ind"), ("PdDistrict", "dist_ind"),
                     ("Category", "cat_ind"), ("Month", "mon_ind")]
]
gb_m1_encoder = feature.OneHotEncoderEstimator(
    inputCols=['day_wk_ind', 'dist_ind', 'cat_ind', 'mon_ind'],
    outputCols=['day_wk_hot', 'dist_hot', 'cat_hot', 'mon_hot'])
gb_m1_label = feature.StringIndexer(inputCol="Res_num", outputCol="label")
gb_m1_assembler = feature.VectorAssembler(
    inputCols=['Near noon', 'Afternoon', 'evening', 'Night',
               'day_wk_hot', 'dist_hot', 'cat_hot', 'mon_hot'],
    outputCol='features')
gb_m1_clf = classification.GBTClassifier(labelCol='label', featuresCol='features', maxIter=10)
gb_m1 = Pipeline(stages=gb_m1_indexers + [gb_m1_encoder, gb_m1_label, gb_m1_assembler, gb_m1_clf])

# Fit on the training split, score the validation split, report the evaluator's score
gb_m1_fit = gb_m1.fit(training_df)
gb_m1_fitted_trans = gb_m1_fit.transform(validation_df)
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='label')
gb_AUC1 = evaluator.evaluate(gb_m1_fitted_trans)
print('gb M1=', gb_AUC1)

In [56]:
# Logistic regression M2 — features: the four hour-of-day dummies plus one-hot
# DayOfWeek, PdDistrict, Category and Month (Category only, no severity Level).
# NOTE(review): the stage list is identical to lr_m1's.
lr_m2_indexers = [
    feature.StringIndexer(inputCol=src, outputCol=idx)
    for src, idx in [("DayOfWeek", "day_wk_ind"), ("PdDistrict", "dist_ind"),
                     ("Category", "cat_ind"), ("Month", "mon_ind")]
]
lr_m2_encoder = feature.OneHotEncoderEstimator(
    inputCols=['day_wk_ind', 'dist_ind', 'cat_ind', 'mon_ind'],
    outputCols=['day_wk_hot', 'dist_hot', 'cat_hot', 'mon_hot'])
lr_m2_label = feature.StringIndexer(inputCol="Res_num", outputCol="label")
lr_m2_assembler = feature.VectorAssembler(
    inputCols=['Near noon', 'Afternoon', 'evening', 'Night',
               'day_wk_hot', 'dist_hot', 'cat_hot', 'mon_hot'],
    outputCol='features')
lr_m2_clf = classification.LogisticRegression(labelCol='label', featuresCol='features', maxIter=10)
lr_m2 = Pipeline(stages=lr_m2_indexers + [lr_m2_encoder, lr_m2_label, lr_m2_assembler, lr_m2_clf])

# Fit on the training split, score the validation split, report the evaluator's score
lr_m2_fit = lr_m2.fit(training_df)
lr_m2_fitted_trans = lr_m2_fit.transform(validation_df)
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='label')
lr_AUC2 = evaluator.evaluate(lr_m2_fitted_trans)
print('Lr M2=', lr_AUC2)

## Live demo on poster day - input data and evaluate result

In [57]:
# One hand-made incident for the live demo: a Sunday robbery in BAYVIEW at
# hour 12 in March 2018, with the hour dummies set to match ('Near noon' = 1).
test_d = [{
    'Category': 'ROBBERY',
    'DayOfWeek': 'Sunday',
    'PdDistrict': 'BAYVIEW',
    'Year': 2018,
    'Month': 3,
    'Hour': 12,
    'Near noon': 1,
    'Afternoon': 0,
    'evening': 0,
    'Night': 0,
}]
test_df = spark.createDataFrame(test_d)


In [58]:
# Display the single demo row
test_df.show()

In [59]:
# Run the demo row through the fitted lr_m2 pipeline and show the predicted
# class together with the class probabilities
trial_result = lr_m2_fit.transform(test_df)
trial_result.select('prediction','probability').show()

In [60]:
# Preview the first 2 rows of the training split
training_df.show(2)

# Modeling - run with different variables (differentiated by model#) - continued

In [61]:
# Gradient-boosted trees M2 — features: the four hour-of-day dummies plus
# one-hot DayOfWeek, PdDistrict, Category and Month (Category only, no Level).
# NOTE(review): the stage list is identical to gb_m1's.
gb_m2_indexers = [
    feature.StringIndexer(inputCol=src, outputCol=idx)
    for src, idx in [("DayOfWeek", "day_wk_ind"), ("PdDistrict", "dist_ind"),
                     ("Category", "cat_ind"), ("Month", "mon_ind")]
]
gb_m2_encoder = feature.OneHotEncoderEstimator(
    inputCols=['day_wk_ind', 'dist_ind', 'cat_ind', 'mon_ind'],
    outputCols=['day_wk_hot', 'dist_hot', 'cat_hot', 'mon_hot'])
gb_m2_label = feature.StringIndexer(inputCol="Res_num", outputCol="label")
gb_m2_assembler = feature.VectorAssembler(
    inputCols=['Near noon', 'Afternoon', 'evening', 'Night',
               'day_wk_hot', 'dist_hot', 'cat_hot', 'mon_hot'],
    outputCol='features')
gb_m2_clf = classification.GBTClassifier(labelCol='label', featuresCol='features', maxIter=10)
gb_m2 = Pipeline(stages=gb_m2_indexers + [gb_m2_encoder, gb_m2_label, gb_m2_assembler, gb_m2_clf])

# Fit on the training split, score the validation split, report the evaluator's score
gb_m2_fit = gb_m2.fit(training_df)
gb_m2_fitted_trans = gb_m2_fit.transform(validation_df)
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='label')
gb_AUC2 = evaluator.evaluate(gb_m2_fitted_trans)
print('gb M2=', gb_AUC2)

In [62]:
# Random forest M2 — features: the four hour-of-day dummies plus one-hot
# DayOfWeek, PdDistrict, Category and Month (Category only, no Level); the
# label is the indexed Res_num.
# StringIndexer handles one column at a time, hence one indexer per column.
rf_m2_indexers = [
    feature.StringIndexer(inputCol=src, outputCol=idx)
    for src, idx in [("DayOfWeek", "day_wk_ind"), ("PdDistrict", "dist_ind"),
                     ("Category", "cat_ind"), ("Month", "mon_ind")]
]
rf_m2_encoder = feature.OneHotEncoderEstimator(
    inputCols=['day_wk_ind', 'dist_ind', 'cat_ind', 'mon_ind'],
    outputCols=['day_wk_hot', 'dist_hot', 'cat_hot', 'mon_hot'])
rf_m2_label = feature.StringIndexer(inputCol="Res_num", outputCol="label")
rf_m2_assembler = feature.VectorAssembler(
    inputCols=['Near noon', 'Afternoon', 'evening', 'Night',
               'day_wk_hot', 'dist_hot', 'cat_hot', 'mon_hot'],
    outputCol='features')
rf_m2_clf = classification.RandomForestClassifier(
    labelCol='label', featuresCol='features', seed=0, numTrees=10, cacheNodeIds=True)
rf_m2 = Pipeline(stages=rf_m2_indexers + [rf_m2_encoder, rf_m2_label, rf_m2_assembler, rf_m2_clf])

# Fit on the training split, score the validation split, report the evaluator's score
rf_m2_fit = rf_m2.fit(training_df)
rf_m2_fitted_trans = rf_m2_fit.transform(validation_df)
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='label')
AUC2 = evaluator.evaluate(rf_m2_fitted_trans)
print('rf_Model2 = ', AUC2)

In [63]:
# Gradient-boosted trees M4 — as written, the features are the four
# hour-of-day dummies plus one-hot DayOfWeek, PdDistrict, Category and Month.
# NOTE(review): the stage list is identical to gb_m1's; neither Level nor Year
# is among the assembled inputs.
gb_m4_indexers = [
    feature.StringIndexer(inputCol=src, outputCol=idx)
    for src, idx in [("DayOfWeek", "day_wk_ind"), ("PdDistrict", "dist_ind"),
                     ("Category", "cat_ind"), ("Month", "mon_ind")]
]
gb_m4_encoder = feature.OneHotEncoderEstimator(
    inputCols=['day_wk_ind', 'dist_ind', 'cat_ind', 'mon_ind'],
    outputCols=['day_wk_hot', 'dist_hot', 'cat_hot', 'mon_hot'])
gb_m4_label = feature.StringIndexer(inputCol="Res_num", outputCol="label")
gb_m4_assembler = feature.VectorAssembler(
    inputCols=['Near noon', 'Afternoon', 'evening', 'Night',
               'day_wk_hot', 'dist_hot', 'cat_hot', 'mon_hot'],
    outputCol='features')
gb_m4_clf = classification.GBTClassifier(labelCol='label', featuresCol='features', maxIter=10)
gb_m4 = Pipeline(stages=gb_m4_indexers + [gb_m4_encoder, gb_m4_label, gb_m4_assembler, gb_m4_clf])

# Fit on the training split, score the validation split, report the evaluator's score
gb_m4_fit = gb_m4.fit(training_df)
gb_m4_fitted_trans = gb_m4_fit.transform(validation_df)
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='label')
gb_AUC4 = evaluator.evaluate(gb_m4_fitted_trans)
print('gb_Model4 = ', gb_AUC4)

In [64]:
# Random forest M4 — as written, the features are the four hour-of-day dummies
# plus one-hot DayOfWeek, PdDistrict, Category and Month.
# NOTE(review): the stage list is identical to rf_m2's; neither Level nor Year
# is among the assembled inputs.
rf_m4_indexers = [
    feature.StringIndexer(inputCol=src, outputCol=idx)
    for src, idx in [("DayOfWeek", "day_wk_ind"), ("PdDistrict", "dist_ind"),
                     ("Category", "cat_ind"), ("Month", "mon_ind")]
]
rf_m4_encoder = feature.OneHotEncoderEstimator(
    inputCols=['day_wk_ind', 'dist_ind', 'cat_ind', 'mon_ind'],
    outputCols=['day_wk_hot', 'dist_hot', 'cat_hot', 'mon_hot'])
rf_m4_label = feature.StringIndexer(inputCol="Res_num", outputCol="label")
rf_m4_assembler = feature.VectorAssembler(
    inputCols=['Near noon', 'Afternoon', 'evening', 'Night',
               'day_wk_hot', 'dist_hot', 'cat_hot', 'mon_hot'],
    outputCol='features')
rf_m4_clf = classification.RandomForestClassifier(
    labelCol='label', featuresCol='features', seed=0, numTrees=10, cacheNodeIds=True)
rf_m4 = Pipeline(stages=rf_m4_indexers + [rf_m4_encoder, rf_m4_label, rf_m4_assembler, rf_m4_clf])

# Fit on the training split, score the validation split, report the evaluator's score
rf_m4_fit = rf_m4.fit(training_df)
rf_m4_fitted_trans = rf_m4_fit.transform(validation_df)
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='label')
AUC4 = evaluator.evaluate(rf_m4_fitted_trans)
print('Model4 = ', AUC4)

In [65]:
# Logistic regression M4 — as written, the features are the four hour-of-day
# dummies plus one-hot DayOfWeek, PdDistrict, Category and Month.
# NOTE(review): the stage list is identical to lr_m1's; neither Level nor Year
# is among the assembled inputs.
lr_m4_indexers = [
    feature.StringIndexer(inputCol=src, outputCol=idx)
    for src, idx in [("DayOfWeek", "day_wk_ind"), ("PdDistrict", "dist_ind"),
                     ("Category", "cat_ind"), ("Month", "mon_ind")]
]
lr_m4_encoder = feature.OneHotEncoderEstimator(
    inputCols=['day_wk_ind', 'dist_ind', 'cat_ind', 'mon_ind'],
    outputCols=['day_wk_hot', 'dist_hot', 'cat_hot', 'mon_hot'])
lr_m4_label = feature.StringIndexer(inputCol="Res_num", outputCol="label")
lr_m4_assembler = feature.VectorAssembler(
    inputCols=['Near noon', 'Afternoon', 'evening', 'Night',
               'day_wk_hot', 'dist_hot', 'cat_hot', 'mon_hot'],
    outputCol='features')
lr_m4_clf = classification.LogisticRegression(labelCol='label', featuresCol='features', maxIter=10)
lr_m4 = Pipeline(stages=lr_m4_indexers + [lr_m4_encoder, lr_m4_label, lr_m4_assembler, lr_m4_clf])

# Fit on the training split, score the validation split, report the evaluator's score
lr_m4_fit = lr_m4.fit(training_df)
lr_m4_fitted_trans = lr_m4_fit.transform(validation_df)
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='label')
lr_AUC4 = evaluator.evaluate(lr_m4_fitted_trans)
print('lr_Model4 = ', lr_AUC4)

In [66]:
# Random forest M5 — the only feature is the one-hot encoded crime Category;
# the label is the indexed Res_num.
rf_m5 = Pipeline(stages=[
    feature.StringIndexer(inputCol="Category", outputCol="cat_ind"),
    feature.OneHotEncoderEstimator(inputCols=['cat_ind'], outputCols=['cat_hot']),
    feature.StringIndexer(inputCol="Res_num", outputCol="label"),
    feature.VectorAssembler(inputCols=['cat_hot'], outputCol='features'),
    classification.RandomForestClassifier(labelCol='label', featuresCol='features', seed=0,
                                          numTrees=10, cacheNodeIds=True)])
rf_m5_fit = rf_m5.fit(training_df)
rf_m5_fitted_trans = rf_m5_fit.transform(validation_df)
# Evaluate against 'label', the column the classifier was trained on.
# StringIndexer numbers values by frequency, so 'label' is not guaranteed to
# equal Res_num; scoring against Res_num could silently invert the metric.
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='label')
AUC5 = evaluator.evaluate(rf_m5_fitted_trans)
print('Model5 = ', AUC5)

In [67]:
# Logistic regression M5 — the only feature is the one-hot encoded crime
# Category; the label is the indexed Res_num.
lr_m5 = Pipeline(stages=[
    feature.StringIndexer(inputCol="Category", outputCol="cat_ind"),
    feature.OneHotEncoderEstimator(inputCols=['cat_ind'], outputCols=['cat_hot']),
    feature.StringIndexer(inputCol="Res_num", outputCol="label"),
    feature.VectorAssembler(inputCols=['cat_hot'], outputCol='features'),
    classification.LogisticRegression(labelCol='label', featuresCol='features', maxIter=10)])
lr_m5_fit = lr_m5.fit(training_df)
lr_m5_fitted_trans = lr_m5_fit.transform(validation_df)
# Evaluate against 'label', the column the classifier was trained on.
# StringIndexer numbers values by frequency, so 'label' is not guaranteed to
# equal Res_num; scoring against Res_num could silently invert the metric.
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='label')
lr_AUC5 = evaluator.evaluate(lr_m5_fitted_trans)
print('Lr M5 =', lr_AUC5)

In [68]:
# Gradient-boosted trees M5 — the only feature is the one-hot encoded crime
# Category; the label is the indexed Res_num.
gb_m5 = Pipeline(stages=[
    feature.StringIndexer(inputCol="Category", outputCol="cat_ind"),
    feature.OneHotEncoderEstimator(inputCols=['cat_ind'], outputCols=['cat_hot']),
    feature.StringIndexer(inputCol="Res_num", outputCol="label"),
    feature.VectorAssembler(inputCols=['cat_hot'], outputCol='features'),
    classification.GBTClassifier(labelCol='label', featuresCol='features', maxIter=10)])
gb_m5_fit = gb_m5.fit(training_df)
gb_m5_fitted_trans = gb_m5_fit.transform(validation_df)
# Evaluate against 'label', the column the classifier was trained on.
# StringIndexer numbers values by frequency, so 'label' is not guaranteed to
# equal Res_num; scoring against Res_num could silently invert the metric.
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='label')
gb_AUC5 = evaluator.evaluate(gb_m5_fitted_trans)
print('gb M5 =', gb_AUC5)

In [69]:
# Random forest M6 — the only feature is the one-hot encoded police district;
# the label is the indexed Res_num.
rf_m6 = Pipeline(stages=[
    feature.StringIndexer(inputCol="PdDistrict", outputCol="dist_ind"),
    feature.OneHotEncoderEstimator(inputCols=['dist_ind'], outputCols=['dist_hot']),
    feature.StringIndexer(inputCol="Res_num", outputCol="label"),
    feature.VectorAssembler(inputCols=['dist_hot'], outputCol='features'),
    classification.RandomForestClassifier(labelCol='label', featuresCol='features', seed=0,
                                          numTrees=10, cacheNodeIds=True)])
rf_m6_fit = rf_m6.fit(training_df)
rf_m6_fitted_trans = rf_m6_fit.transform(validation_df)
# Evaluate against 'label', the column the classifier was trained on.
# StringIndexer numbers values by frequency, so 'label' is not guaranteed to
# equal Res_num; scoring against Res_num could silently invert the metric.
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='label')
AUC6 = evaluator.evaluate(rf_m6_fitted_trans)
print('Model6 = ', AUC6)

In [70]:
# Gradient-boosted trees M6 — the only feature is the one-hot encoded police
# district; the label is the indexed Res_num.
gb_m6 = Pipeline(stages=[
    feature.StringIndexer(inputCol="PdDistrict", outputCol="dist_ind"),
    feature.OneHotEncoderEstimator(inputCols=['dist_ind'], outputCols=['dist_hot']),
    feature.StringIndexer(inputCol="Res_num", outputCol="label"),
    feature.VectorAssembler(inputCols=['dist_hot'], outputCol='features'),
    classification.GBTClassifier(labelCol='label', featuresCol='features', maxIter=10)])
gb_m6_fit = gb_m6.fit(training_df)
gb_m6_fitted_trans = gb_m6_fit.transform(validation_df)
# Evaluate against 'label', the column the classifier was trained on.
# StringIndexer numbers values by frequency, so 'label' is not guaranteed to
# equal Res_num; scoring against Res_num could silently invert the metric.
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='label')
gb_AUC6 = evaluator.evaluate(gb_m6_fitted_trans)
print('gb M6 =', gb_AUC6)

In [71]:
# Model 6 - logistic regression using location (PdDistrict) only.
# A StringIndexer handles one column at a time, so every categorical column needs its own
# indexer before it can be one-hot encoded.
lr_m6_stages = [
    feature.StringIndexer(inputCol='PdDistrict', outputCol='dist_ind'),
    feature.OneHotEncoderEstimator(inputCols=['dist_ind'], outputCols=['dist_hot']),
    # Index the target so the classifier trains on a 'label' column.
    feature.StringIndexer(inputCol='Res_num', outputCol='label'),
    feature.VectorAssembler(inputCols=['dist_hot'], outputCol='features'),
    # regParam / elasticNetParam are left at their defaults.
    classification.LogisticRegression(labelCol='label', featuresCol='features', maxIter=10),
]
lr_m6 = Pipeline(stages=lr_m6_stages)

# Fit on the training split and score the validation split.
lr_m6_fit = lr_m6.fit(training_df)
lr_m6_fitted_trans = lr_m6_fit.transform(validation_df)
# Evaluate against 'label' (what the model was trained on) instead of raw 'Res_num':
# StringIndexer orders classes by frequency, so the two encodings are not guaranteed to match.
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='label')
lr_AUC6 = evaluator.evaluate(lr_m6_fitted_trans)
print('Lr M6 =',lr_AUC6)

In [72]:
# Model 7 - random forest on time features only.
# Inputs: the 'Near noon'/'Afternoon'/'evening'/'Night' columns (presumably time-of-day
# indicators built earlier in the notebook) plus one-hot day of week and month.
# Each categorical column gets its own StringIndexer; one encoder then handles them together.
rf_m7_stages = [
    feature.StringIndexer(inputCol='DayOfWeek', outputCol='day_wk_ind'),
    feature.StringIndexer(inputCol='Month', outputCol='mon_ind'),
    feature.OneHotEncoderEstimator(
        inputCols=['day_wk_ind', 'mon_ind'],
        outputCols=['day_wk_hot', 'mon_hot'],
    ),
    # Index the target so the classifier trains on a 'label' column.
    feature.StringIndexer(inputCol='Res_num', outputCol='label'),
    feature.VectorAssembler(
        inputCols=['Near noon', 'Afternoon', 'evening', 'Night', 'day_wk_hot', 'mon_hot'],
        outputCol='features',
    ),
    classification.RandomForestClassifier(
        labelCol='label', featuresCol='features',
        numTrees=10, seed=0, cacheNodeIds=True,
    ),
]
rf_m7 = Pipeline(stages=rf_m7_stages)

# Fit on the training split, score the validation split, report the evaluator's metric.
rf_m7_fit = rf_m7.fit(training_df)
rf_m7_fitted_trans = rf_m7_fit.transform(validation_df)
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='label')
AUC7 = evaluator.evaluate(rf_m7_fitted_trans)
print('Model7 = ', AUC7)

In [73]:
# Model 7 - logistic regression on time features only.
# Inputs: the 'Near noon'/'Afternoon'/'evening'/'Night' columns (presumably time-of-day
# indicators built earlier in the notebook) plus one-hot day of week and month.
# Each categorical column gets its own StringIndexer; one encoder then handles them together.
lr_m7_stages = [
    feature.StringIndexer(inputCol='DayOfWeek', outputCol='day_wk_ind'),
    feature.StringIndexer(inputCol='Month', outputCol='mon_ind'),
    feature.OneHotEncoderEstimator(
        inputCols=['day_wk_ind', 'mon_ind'],
        outputCols=['day_wk_hot', 'mon_hot'],
    ),
    # Index the target so the classifier trains on a 'label' column.
    feature.StringIndexer(inputCol='Res_num', outputCol='label'),
    feature.VectorAssembler(
        inputCols=['Near noon', 'Afternoon', 'evening', 'Night', 'day_wk_hot', 'mon_hot'],
        outputCol='features',
    ),
    # regParam / elasticNetParam are left at their defaults.
    classification.LogisticRegression(labelCol='label', featuresCol='features', maxIter=10),
]
lr_m7 = Pipeline(stages=lr_m7_stages)

# Fit on the training split, score the validation split, report the evaluator's metric.
lr_m7_fit = lr_m7.fit(training_df)
lr_m7_fitted_trans = lr_m7_fit.transform(validation_df)
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='label')
lr_AUC7 = evaluator.evaluate(lr_m7_fitted_trans)
print('Lr M7 =', lr_AUC7)

In [74]:
# Model 7 - gradient-boosted trees on time features only.
# Inputs: the 'Near noon'/'Afternoon'/'evening'/'Night' columns (presumably time-of-day
# indicators built earlier in the notebook) plus one-hot day of week and month.
# Each categorical column gets its own StringIndexer; one encoder then handles them together.
gb_m7_stages = [
    feature.StringIndexer(inputCol='DayOfWeek', outputCol='day_wk_ind'),
    feature.StringIndexer(inputCol='Month', outputCol='mon_ind'),
    feature.OneHotEncoderEstimator(
        inputCols=['day_wk_ind', 'mon_ind'],
        outputCols=['day_wk_hot', 'mon_hot'],
    ),
    # Index the target so the classifier trains on a 'label' column.
    feature.StringIndexer(inputCol='Res_num', outputCol='label'),
    feature.VectorAssembler(
        inputCols=['Near noon', 'Afternoon', 'evening', 'Night', 'day_wk_hot', 'mon_hot'],
        outputCol='features',
    ),
    classification.GBTClassifier(labelCol='label', featuresCol='features', maxIter=10),
]
gb_m7 = Pipeline(stages=gb_m7_stages)

# Fit on the training split, score the validation split, report the evaluator's metric.
gb_m7_fit = gb_m7.fit(training_df)
gb_m7_fitted_trans = gb_m7_fit.transform(validation_df)
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='label')
gb_AUC7 = evaluator.evaluate(gb_m7_fitted_trans)
print('gb M7 =', gb_AUC7)

In [75]:
# Model 8 - random forest on time features plus crime category.
# Inputs: the 'Near noon'/'Afternoon'/'evening'/'Night' columns (presumably time-of-day
# indicators built earlier in the notebook) plus one-hot day of week, category and month.
# Each categorical column gets its own StringIndexer; one encoder then handles them together.
rf_m8_stages = [
    feature.StringIndexer(inputCol='DayOfWeek', outputCol='day_wk_ind'),
    feature.StringIndexer(inputCol='Category', outputCol='cat_ind'),
    feature.StringIndexer(inputCol='Month', outputCol='mon_ind'),
    feature.OneHotEncoderEstimator(
        inputCols=['day_wk_ind', 'cat_ind', 'mon_ind'],
        outputCols=['day_wk_hot', 'cat_hot', 'mon_hot'],
    ),
    # Index the target so the classifier trains on a 'label' column.
    feature.StringIndexer(inputCol='Res_num', outputCol='label'),
    feature.VectorAssembler(
        inputCols=['Near noon', 'Afternoon', 'evening', 'Night',
                   'day_wk_hot', 'cat_hot', 'mon_hot'],
        outputCol='features',
    ),
    classification.RandomForestClassifier(
        labelCol='label', featuresCol='features',
        numTrees=10, seed=0, cacheNodeIds=True,
    ),
]
rf_m8 = Pipeline(stages=rf_m8_stages)

# Fit on the training split, score the validation split, report the evaluator's metric.
rf_m8_fit = rf_m8.fit(training_df)
rf_m8_fitted_trans = rf_m8_fit.transform(validation_df)
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='label')
AUC8 = evaluator.evaluate(rf_m8_fitted_trans)
print('Model8 = ', AUC8)

In [76]:
# Model 8 - logistic regression on time features plus crime category.
# Inputs: the 'Near noon'/'Afternoon'/'evening'/'Night' columns (presumably time-of-day
# indicators built earlier in the notebook) plus one-hot day of week, category and month.
# Each categorical column gets its own StringIndexer; one encoder then handles them together.
lr_m8_stages = [
    feature.StringIndexer(inputCol='DayOfWeek', outputCol='day_wk_ind'),
    feature.StringIndexer(inputCol='Category', outputCol='cat_ind'),
    feature.StringIndexer(inputCol='Month', outputCol='mon_ind'),
    feature.OneHotEncoderEstimator(
        inputCols=['day_wk_ind', 'cat_ind', 'mon_ind'],
        outputCols=['day_wk_hot', 'cat_hot', 'mon_hot'],
    ),
    # Index the target so the classifier trains on a 'label' column.
    feature.StringIndexer(inputCol='Res_num', outputCol='label'),
    feature.VectorAssembler(
        inputCols=['Near noon', 'Afternoon', 'evening', 'Night',
                   'day_wk_hot', 'cat_hot', 'mon_hot'],
        outputCol='features',
    ),
    # regParam / elasticNetParam are left at their defaults.
    classification.LogisticRegression(labelCol='label', featuresCol='features', maxIter=10),
]
lr_m8 = Pipeline(stages=lr_m8_stages)

# Fit on the training split, score the validation split, report the evaluator's metric.
lr_m8_fit = lr_m8.fit(training_df)
lr_m8_fitted_trans = lr_m8_fit.transform(validation_df)
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='label')
lr_AUC8 = evaluator.evaluate(lr_m8_fitted_trans)
print('Lr M8 =', lr_AUC8)

In [77]:
# Model 8 - gradient-boosted trees on time features plus crime category.
# Inputs: the 'Near noon'/'Afternoon'/'evening'/'Night' columns (presumably time-of-day
# indicators built earlier in the notebook) plus one-hot day of week, category and month.
# Each categorical column gets its own StringIndexer; one encoder then handles them together.
gb_m8_stages = [
    feature.StringIndexer(inputCol='DayOfWeek', outputCol='day_wk_ind'),
    feature.StringIndexer(inputCol='Category', outputCol='cat_ind'),
    feature.StringIndexer(inputCol='Month', outputCol='mon_ind'),
    feature.OneHotEncoderEstimator(
        inputCols=['day_wk_ind', 'cat_ind', 'mon_ind'],
        outputCols=['day_wk_hot', 'cat_hot', 'mon_hot'],
    ),
    # Index the target so the classifier trains on a 'label' column.
    feature.StringIndexer(inputCol='Res_num', outputCol='label'),
    feature.VectorAssembler(
        inputCols=['Near noon', 'Afternoon', 'evening', 'Night',
                   'day_wk_hot', 'cat_hot', 'mon_hot'],
        outputCol='features',
    ),
    classification.GBTClassifier(labelCol='label', featuresCol='features', maxIter=10),
]
gb_m8 = Pipeline(stages=gb_m8_stages)

# Fit on the training split, score the validation split, report the evaluator's metric.
gb_m8_fit = gb_m8.fit(training_df)
gb_m8_fitted_trans = gb_m8_fit.transform(validation_df)
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='label')
gb_AUC8 = evaluator.evaluate(gb_m8_fitted_trans)
print('gb M8 =', gb_AUC8)

In [78]:
# Model 9 - random forest on time features plus location.
# Inputs: the 'Near noon'/'Afternoon'/'evening'/'Night' columns (presumably time-of-day
# indicators built earlier in the notebook) plus one-hot day of week, district and month.
# Each categorical column gets its own StringIndexer; one encoder then handles them together.
rf_m9_stages = [
    feature.StringIndexer(inputCol='DayOfWeek', outputCol='day_wk_ind'),
    feature.StringIndexer(inputCol='PdDistrict', outputCol='dist_ind'),
    feature.StringIndexer(inputCol='Month', outputCol='mon_ind'),
    feature.OneHotEncoderEstimator(
        inputCols=['day_wk_ind', 'dist_ind', 'mon_ind'],
        outputCols=['day_wk_hot', 'dist_hot', 'mon_hot'],
    ),
    # Index the target so the classifier trains on a 'label' column.
    feature.StringIndexer(inputCol='Res_num', outputCol='label'),
    feature.VectorAssembler(
        inputCols=['Near noon', 'Afternoon', 'evening', 'Night',
                   'day_wk_hot', 'dist_hot', 'mon_hot'],
        outputCol='features',
    ),
    classification.RandomForestClassifier(
        labelCol='label', featuresCol='features',
        numTrees=10, seed=0, cacheNodeIds=True,
    ),
]
rf_m9 = Pipeline(stages=rf_m9_stages)

# Fit on the training split, score the validation split, report the evaluator's metric.
rf_m9_fit = rf_m9.fit(training_df)
rf_m9_fitted_trans = rf_m9_fit.transform(validation_df)
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='label')
AUC9 = evaluator.evaluate(rf_m9_fitted_trans)
print('Model9 = ', AUC9)

In [79]:
# Model 9 - logistic regression on time features plus location.
# Inputs: the 'Near noon'/'Afternoon'/'evening'/'Night' columns (presumably time-of-day
# indicators built earlier in the notebook) plus one-hot day of week, district and month.
# Each categorical column gets its own StringIndexer; one encoder then handles them together.
lr_m9_stages = [
    feature.StringIndexer(inputCol='DayOfWeek', outputCol='day_wk_ind'),
    feature.StringIndexer(inputCol='PdDistrict', outputCol='dist_ind'),
    feature.StringIndexer(inputCol='Month', outputCol='mon_ind'),
    feature.OneHotEncoderEstimator(
        inputCols=['day_wk_ind', 'dist_ind', 'mon_ind'],
        outputCols=['day_wk_hot', 'dist_hot', 'mon_hot'],
    ),
    # Index the target so the classifier trains on a 'label' column.
    feature.StringIndexer(inputCol='Res_num', outputCol='label'),
    feature.VectorAssembler(
        inputCols=['Near noon', 'Afternoon', 'evening', 'Night',
                   'day_wk_hot', 'dist_hot', 'mon_hot'],
        outputCol='features',
    ),
    # regParam / elasticNetParam are left at their defaults.
    classification.LogisticRegression(labelCol='label', featuresCol='features', maxIter=10),
]
lr_m9 = Pipeline(stages=lr_m9_stages)

# Fit on the training split, score the validation split, report the evaluator's metric.
lr_m9_fit = lr_m9.fit(training_df)
lr_m9_fitted_trans = lr_m9_fit.transform(validation_df)
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='label')
lr_AUC9 = evaluator.evaluate(lr_m9_fitted_trans)
print('Lr M9 =', lr_AUC9)

In [80]:
# Model 9 - gradient-boosted trees on time features plus location.
# Inputs: the 'Near noon'/'Afternoon'/'evening'/'Night' columns (presumably time-of-day
# indicators built earlier in the notebook) plus one-hot day of week, district and month.
# Each categorical column gets its own StringIndexer; one encoder then handles them together.
gb_m9_stages = [
    feature.StringIndexer(inputCol='DayOfWeek', outputCol='day_wk_ind'),
    feature.StringIndexer(inputCol='PdDistrict', outputCol='dist_ind'),
    feature.StringIndexer(inputCol='Month', outputCol='mon_ind'),
    feature.OneHotEncoderEstimator(
        inputCols=['day_wk_ind', 'dist_ind', 'mon_ind'],
        outputCols=['day_wk_hot', 'dist_hot', 'mon_hot'],
    ),
    # Index the target so the classifier trains on a 'label' column.
    feature.StringIndexer(inputCol='Res_num', outputCol='label'),
    feature.VectorAssembler(
        inputCols=['Near noon', 'Afternoon', 'evening', 'Night',
                   'day_wk_hot', 'dist_hot', 'mon_hot'],
        outputCol='features',
    ),
    classification.GBTClassifier(labelCol='label', featuresCol='features', maxIter=10),
]
gb_m9 = Pipeline(stages=gb_m9_stages)

# Fit on the training split, score the validation split, report the evaluator's metric.
gb_m9_fit = gb_m9.fit(training_df)
gb_m9_fitted_trans = gb_m9_fit.transform(validation_df)
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='label')
gb_AUC9 = evaluator.evaluate(gb_m9_fitted_trans)
print('gb M9 =', gb_AUC9)

In [81]:
# Model 10 - random forest on crime category and location (PdDistrict) only.
# Each categorical column gets its own StringIndexer; one encoder then handles them together.
rf_m10_stages = [
    feature.StringIndexer(inputCol='PdDistrict', outputCol='dist_ind'),
    feature.StringIndexer(inputCol='Category', outputCol='cat_ind'),
    feature.OneHotEncoderEstimator(
        inputCols=['dist_ind', 'cat_ind'],
        outputCols=['dist_hot', 'cat_hot'],
    ),
    # Index the target so the classifier trains on a 'label' column.
    feature.StringIndexer(inputCol='Res_num', outputCol='label'),
    feature.VectorAssembler(inputCols=['dist_hot', 'cat_hot'], outputCol='features'),
    classification.RandomForestClassifier(
        labelCol='label', featuresCol='features',
        numTrees=10, seed=0, cacheNodeIds=True,
    ),
]
rf_m10 = Pipeline(stages=rf_m10_stages)

# Fit on the training split, score the validation split, report the evaluator's metric.
rf_m10_fit = rf_m10.fit(training_df)
rf_m10_fitted_trans = rf_m10_fit.transform(validation_df)
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='label')
AUC10 = evaluator.evaluate(rf_m10_fitted_trans)
print('Model10 = ', AUC10)

In [82]:
# Model 10 - logistic regression on crime category and location (PdDistrict) only.
# Each categorical column gets its own StringIndexer; one encoder then handles them together.
lr_m10_stages = [
    feature.StringIndexer(inputCol='PdDistrict', outputCol='dist_ind'),
    feature.StringIndexer(inputCol='Category', outputCol='cat_ind'),
    feature.OneHotEncoderEstimator(
        inputCols=['dist_ind', 'cat_ind'],
        outputCols=['dist_hot', 'cat_hot'],
    ),
    # Index the target so the classifier trains on a 'label' column.
    feature.StringIndexer(inputCol='Res_num', outputCol='label'),
    feature.VectorAssembler(inputCols=['dist_hot', 'cat_hot'], outputCol='features'),
    # regParam / elasticNetParam are left at their defaults.
    classification.LogisticRegression(labelCol='label', featuresCol='features', maxIter=10),
]
lr_m10 = Pipeline(stages=lr_m10_stages)

# Fit on the training split, score the validation split, report the evaluator's metric.
lr_m10_fit = lr_m10.fit(training_df)
lr_m10_fitted_trans = lr_m10_fit.transform(validation_df)
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='label')
lr_AUC10 = evaluator.evaluate(lr_m10_fitted_trans)
print('Lr M10 =', lr_AUC10)

In [83]:
# Model 10 - gradient-boosted trees on crime category and location (PdDistrict) only.
# Each categorical column gets its own StringIndexer; one encoder then handles them together.
gb_m10_stages = [
    feature.StringIndexer(inputCol='PdDistrict', outputCol='dist_ind'),
    feature.StringIndexer(inputCol='Category', outputCol='cat_ind'),
    feature.OneHotEncoderEstimator(
        inputCols=['dist_ind', 'cat_ind'],
        outputCols=['dist_hot', 'cat_hot'],
    ),
    # Index the target so the classifier trains on a 'label' column.
    feature.StringIndexer(inputCol='Res_num', outputCol='label'),
    feature.VectorAssembler(inputCols=['dist_hot', 'cat_hot'], outputCol='features'),
    classification.GBTClassifier(labelCol='label', featuresCol='features', maxIter=10),
]
gb_m10 = Pipeline(stages=gb_m10_stages)

# Fit on the training split, score the validation split, report the evaluator's metric.
gb_m10_fit = gb_m10.fit(training_df)
gb_m10_fitted_trans = gb_m10_fit.transform(validation_df)
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='label')
gb_AUC10 = evaluator.evaluate(gb_m10_fitted_trans)
print('gb M10 =', gb_AUC10)

In [84]:
# Model 11 - random forest on category + location + time (no day-of-month feature).
# Inputs: the 'Near noon'/'Afternoon'/'evening'/'Night' columns (presumably time-of-day
# indicators built earlier in the notebook) plus one-hot day of week, district, category, month.
# NOTE(review): no year feature is assembled here, so these stages are identical to rf_m12.
# Each categorical column gets its own StringIndexer; one encoder then handles them together.
rf_m11_stages = [
    feature.StringIndexer(inputCol='DayOfWeek', outputCol='day_wk_ind'),
    feature.StringIndexer(inputCol='PdDistrict', outputCol='dist_ind'),
    feature.StringIndexer(inputCol='Category', outputCol='cat_ind'),
    feature.StringIndexer(inputCol='Month', outputCol='mon_ind'),
    feature.OneHotEncoderEstimator(
        inputCols=['day_wk_ind', 'dist_ind', 'cat_ind', 'mon_ind'],
        outputCols=['day_wk_hot', 'dist_hot', 'cat_hot', 'mon_hot'],
    ),
    # Index the target so the classifier trains on a 'label' column.
    feature.StringIndexer(inputCol='Res_num', outputCol='label'),
    feature.VectorAssembler(
        inputCols=['Near noon', 'Afternoon', 'evening', 'Night',
                   'day_wk_hot', 'dist_hot', 'cat_hot', 'mon_hot'],
        outputCol='features',
    ),
    classification.RandomForestClassifier(
        labelCol='label', featuresCol='features',
        numTrees=10, seed=0, cacheNodeIds=True,
    ),
]
rf_m11 = Pipeline(stages=rf_m11_stages)

# Fit on the training split, score the validation split, report the evaluator's metric.
rf_m11_fit = rf_m11.fit(training_df)
rf_m11_fitted_trans = rf_m11_fit.transform(validation_df)
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='label')
AUC11 = evaluator.evaluate(rf_m11_fitted_trans)
print('Model11 = ', AUC11)


In [85]:
# Model 11 - logistic regression on category + location + time (no day-of-month feature).
# Inputs: the 'Near noon'/'Afternoon'/'evening'/'Night' columns (presumably time-of-day
# indicators built earlier in the notebook) plus one-hot day of week, district, category, month.
# NOTE(review): no year feature is assembled here, so these stages are identical to lr_m12.
# Each categorical column gets its own StringIndexer; one encoder then handles them together.
lr_m11_stages = [
    feature.StringIndexer(inputCol='DayOfWeek', outputCol='day_wk_ind'),
    feature.StringIndexer(inputCol='PdDistrict', outputCol='dist_ind'),
    feature.StringIndexer(inputCol='Category', outputCol='cat_ind'),
    feature.StringIndexer(inputCol='Month', outputCol='mon_ind'),
    feature.OneHotEncoderEstimator(
        inputCols=['day_wk_ind', 'dist_ind', 'cat_ind', 'mon_ind'],
        outputCols=['day_wk_hot', 'dist_hot', 'cat_hot', 'mon_hot'],
    ),
    # Index the target so the classifier trains on a 'label' column.
    feature.StringIndexer(inputCol='Res_num', outputCol='label'),
    feature.VectorAssembler(
        inputCols=['Near noon', 'Afternoon', 'evening', 'Night',
                   'day_wk_hot', 'dist_hot', 'cat_hot', 'mon_hot'],
        outputCol='features',
    ),
    # regParam / elasticNetParam are left at their defaults.
    classification.LogisticRegression(labelCol='label', featuresCol='features', maxIter=10),
]
lr_m11 = Pipeline(stages=lr_m11_stages)

# Fit on the training split, score the validation split, report the evaluator's metric.
lr_m11_fit = lr_m11.fit(training_df)
lr_m11_fitted_trans = lr_m11_fit.transform(validation_df)
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='label')
lr_AUC11 = evaluator.evaluate(lr_m11_fitted_trans)
print('Lr M11 =', lr_AUC11)


In [86]:
# Model 11 - gradient-boosted trees on category + location + time (no day-of-month feature).
# Inputs: the 'Near noon'/'Afternoon'/'evening'/'Night' columns (presumably time-of-day
# indicators built earlier in the notebook) plus one-hot day of week, district, category, month.
# NOTE(review): no year feature is assembled here, so these stages are identical to gb_m12.
# Each categorical column gets its own StringIndexer; one encoder then handles them together.
gb_m11_stages = [
    feature.StringIndexer(inputCol='DayOfWeek', outputCol='day_wk_ind'),
    feature.StringIndexer(inputCol='PdDistrict', outputCol='dist_ind'),
    feature.StringIndexer(inputCol='Category', outputCol='cat_ind'),
    feature.StringIndexer(inputCol='Month', outputCol='mon_ind'),
    feature.OneHotEncoderEstimator(
        inputCols=['day_wk_ind', 'dist_ind', 'cat_ind', 'mon_ind'],
        outputCols=['day_wk_hot', 'dist_hot', 'cat_hot', 'mon_hot'],
    ),
    # Index the target so the classifier trains on a 'label' column.
    feature.StringIndexer(inputCol='Res_num', outputCol='label'),
    feature.VectorAssembler(
        inputCols=['Near noon', 'Afternoon', 'evening', 'Night',
                   'day_wk_hot', 'dist_hot', 'cat_hot', 'mon_hot'],
        outputCol='features',
    ),
    classification.GBTClassifier(labelCol='label', featuresCol='features', maxIter=10),
]
gb_m11 = Pipeline(stages=gb_m11_stages)

# Fit on the training split, score the validation split, report the evaluator's metric.
gb_m11_fit = gb_m11.fit(training_df)
gb_m11_fitted_trans = gb_m11_fit.transform(validation_df)
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='label')
gb_AUC11 = evaluator.evaluate(gb_m11_fitted_trans)
print('gb M11 =', gb_AUC11)


In [87]:
# Model 12 - random forest on category + location + time, without day-of-month or year.
# Inputs: the 'Near noon'/'Afternoon'/'evening'/'Night' columns (presumably time-of-day
# indicators built earlier in the notebook) plus one-hot day of week, district, category, month.
# NOTE(review): these stages are the same as the rf_m11 pipeline defined above.
# Each categorical column gets its own StringIndexer; one encoder then handles them together.
rf_m12_stages = [
    feature.StringIndexer(inputCol='DayOfWeek', outputCol='day_wk_ind'),
    feature.StringIndexer(inputCol='PdDistrict', outputCol='dist_ind'),
    feature.StringIndexer(inputCol='Category', outputCol='cat_ind'),
    feature.StringIndexer(inputCol='Month', outputCol='mon_ind'),
    feature.OneHotEncoderEstimator(
        inputCols=['day_wk_ind', 'dist_ind', 'cat_ind', 'mon_ind'],
        outputCols=['day_wk_hot', 'dist_hot', 'cat_hot', 'mon_hot'],
    ),
    # Index the target so the classifier trains on a 'label' column.
    feature.StringIndexer(inputCol='Res_num', outputCol='label'),
    feature.VectorAssembler(
        inputCols=['Near noon', 'Afternoon', 'evening', 'Night',
                   'day_wk_hot', 'dist_hot', 'cat_hot', 'mon_hot'],
        outputCol='features',
    ),
    classification.RandomForestClassifier(
        labelCol='label', featuresCol='features',
        numTrees=10, seed=0, cacheNodeIds=True,
    ),
]
rf_m12 = Pipeline(stages=rf_m12_stages)

# Fit on the training split, score the validation split, report the evaluator's metric.
rf_m12_fit = rf_m12.fit(training_df)
rf_m12_fitted_trans = rf_m12_fit.transform(validation_df)
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='label')
AUC12 = evaluator.evaluate(rf_m12_fitted_trans)
print('Model12 = ', AUC12)


In [88]:
# Model 12 - logistic regression on category + location + time, without day-of-month or year.
# Inputs: the 'Near noon'/'Afternoon'/'evening'/'Night' columns (presumably time-of-day
# indicators built earlier in the notebook) plus one-hot day of week, district, category, month.
# NOTE(review): these stages are the same as the lr_m11 pipeline defined above.
# Each categorical column gets its own StringIndexer; one encoder then handles them together.
lr_m12_stages = [
    feature.StringIndexer(inputCol='DayOfWeek', outputCol='day_wk_ind'),
    feature.StringIndexer(inputCol='PdDistrict', outputCol='dist_ind'),
    feature.StringIndexer(inputCol='Category', outputCol='cat_ind'),
    feature.StringIndexer(inputCol='Month', outputCol='mon_ind'),
    feature.OneHotEncoderEstimator(
        inputCols=['day_wk_ind', 'dist_ind', 'cat_ind', 'mon_ind'],
        outputCols=['day_wk_hot', 'dist_hot', 'cat_hot', 'mon_hot'],
    ),
    # Index the target so the classifier trains on a 'label' column.
    feature.StringIndexer(inputCol='Res_num', outputCol='label'),
    feature.VectorAssembler(
        inputCols=['Near noon', 'Afternoon', 'evening', 'Night',
                   'day_wk_hot', 'dist_hot', 'cat_hot', 'mon_hot'],
        outputCol='features',
    ),
    # regParam / elasticNetParam are left at their defaults.
    classification.LogisticRegression(labelCol='label', featuresCol='features', maxIter=10),
]
lr_m12 = Pipeline(stages=lr_m12_stages)

# Fit on the training split, score the validation split, report the evaluator's metric.
lr_m12_fit = lr_m12.fit(training_df)
lr_m12_fitted_trans = lr_m12_fit.transform(validation_df)
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='label')
lr_AUC12 = evaluator.evaluate(lr_m12_fitted_trans)
print('Lr M12 =', lr_AUC12)


In [89]:
# Model 12 - gradient-boosted trees on category + location + time, without day-of-month or year.
# Inputs: the 'Near noon'/'Afternoon'/'evening'/'Night' columns (presumably time-of-day
# indicators built earlier in the notebook) plus one-hot day of week, district, category, month.
# NOTE(review): these stages are the same as the gb_m11 pipeline defined above.
# Each categorical column gets its own StringIndexer; one encoder then handles them together.
gb_m12_stages = [
    feature.StringIndexer(inputCol='DayOfWeek', outputCol='day_wk_ind'),
    feature.StringIndexer(inputCol='PdDistrict', outputCol='dist_ind'),
    feature.StringIndexer(inputCol='Category', outputCol='cat_ind'),
    feature.StringIndexer(inputCol='Month', outputCol='mon_ind'),
    feature.OneHotEncoderEstimator(
        inputCols=['day_wk_ind', 'dist_ind', 'cat_ind', 'mon_ind'],
        outputCols=['day_wk_hot', 'dist_hot', 'cat_hot', 'mon_hot'],
    ),
    # Index the target so the classifier trains on a 'label' column.
    feature.StringIndexer(inputCol='Res_num', outputCol='label'),
    feature.VectorAssembler(
        inputCols=['Near noon', 'Afternoon', 'evening', 'Night',
                   'day_wk_hot', 'dist_hot', 'cat_hot', 'mon_hot'],
        outputCol='features',
    ),
    classification.GBTClassifier(labelCol='label', featuresCol='features', maxIter=10),
]
gb_m12 = Pipeline(stages=gb_m12_stages)

# Fit on the training split, score the validation split, report the evaluator's metric.
gb_m12_fit = gb_m12.fit(training_df)
gb_m12_fitted_trans = gb_m12_fit.transform(validation_df)
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='label')
gb_AUC12 = evaluator.evaluate(gb_m12_fitted_trans)
print('gb M12 =', gb_AUC12)


In [90]:
# Model 13 - random forest on category + location + time, without day-of-month, year or month.
# Inputs: the 'Near noon'/'Afternoon'/'evening'/'Night' columns (presumably time-of-day
# indicators built earlier in the notebook) plus one-hot day of week, district and category.
# Each categorical column gets its own StringIndexer; one encoder then handles them together.
rf_m13_stages = [
    feature.StringIndexer(inputCol='DayOfWeek', outputCol='day_wk_ind'),
    feature.StringIndexer(inputCol='PdDistrict', outputCol='dist_ind'),
    feature.StringIndexer(inputCol='Category', outputCol='cat_ind'),
    feature.OneHotEncoderEstimator(
        inputCols=['day_wk_ind', 'dist_ind', 'cat_ind'],
        outputCols=['day_wk_hot', 'dist_hot', 'cat_hot'],
    ),
    # Index the target so the classifier trains on a 'label' column.
    feature.StringIndexer(inputCol='Res_num', outputCol='label'),
    feature.VectorAssembler(
        inputCols=['Near noon', 'Afternoon', 'evening', 'Night',
                   'day_wk_hot', 'dist_hot', 'cat_hot'],
        outputCol='features',
    ),
    # The classifier stays the last stage: later cells read it via rf_m13_fit.stages[-1].
    classification.RandomForestClassifier(
        labelCol='label', featuresCol='features',
        numTrees=10, seed=0, cacheNodeIds=True,
    ),
]
rf_m13 = Pipeline(stages=rf_m13_stages)

# Fit on the training split and score the validation split.
rf_m13_fit = rf_m13.fit(training_df)
rf_m13_fitted_trans = rf_m13_fit.transform(validation_df)
# Evaluate against 'label' (what the model was trained on) instead of raw 'Res_num':
# StringIndexer orders classes by frequency, so the two encodings are not guaranteed to match.
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='label')
AUC13 = evaluator.evaluate(rf_m13_fitted_trans)
print('Model13 = ',AUC13)


In [91]:
# Model 13 - logistic regression on category + location + time, without day-of-month, year or month.
# Inputs: the 'Near noon'/'Afternoon'/'evening'/'Night' columns (presumably time-of-day
# indicators built earlier in the notebook) plus one-hot day of week, district and category.
# Each categorical column gets its own StringIndexer; one encoder then handles them together.
lr_m13_stages = [
    feature.StringIndexer(inputCol='DayOfWeek', outputCol='day_wk_ind'),
    feature.StringIndexer(inputCol='PdDistrict', outputCol='dist_ind'),
    feature.StringIndexer(inputCol='Category', outputCol='cat_ind'),
    feature.OneHotEncoderEstimator(
        inputCols=['day_wk_ind', 'dist_ind', 'cat_ind'],
        outputCols=['day_wk_hot', 'dist_hot', 'cat_hot'],
    ),
    # Index the target so the classifier trains on a 'label' column.
    feature.StringIndexer(inputCol='Res_num', outputCol='label'),
    feature.VectorAssembler(
        inputCols=['Near noon', 'Afternoon', 'evening', 'Night',
                   'day_wk_hot', 'dist_hot', 'cat_hot'],
        outputCol='features',
    ),
    # Train on the indexed 'label' column, like the other logistic-regression pipelines in this
    # notebook; previously this pointed at raw 'Res_num', leaving the 'label' stage unused.
    # regParam / elasticNetParam are left at their defaults.
    classification.LogisticRegression(labelCol='label', featuresCol='features', maxIter=10),
]
lr_m13 = Pipeline(stages=lr_m13_stages)

# Fit on the training split and score the validation split against the same 'label' column.
lr_m13_fit = lr_m13.fit(training_df)
lr_m13_fitted_trans = lr_m13_fit.transform(validation_df)
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='label')
lr_AUC13 = evaluator.evaluate(lr_m13_fitted_trans)
print('Lr M13 =',lr_AUC13)


In [92]:
# Model 13 - gradient-boosted trees on category + location + time, without day-of-month, year or month.
# Inputs: the 'Near noon'/'Afternoon'/'evening'/'Night' columns (presumably time-of-day
# indicators built earlier in the notebook) plus one-hot day of week, district and category.
# Each categorical column gets its own StringIndexer; one encoder then handles them together.
gb_m13_stages = [
    feature.StringIndexer(inputCol='DayOfWeek', outputCol='day_wk_ind'),
    feature.StringIndexer(inputCol='PdDistrict', outputCol='dist_ind'),
    feature.StringIndexer(inputCol='Category', outputCol='cat_ind'),
    feature.OneHotEncoderEstimator(
        inputCols=['day_wk_ind', 'dist_ind', 'cat_ind'],
        outputCols=['day_wk_hot', 'dist_hot', 'cat_hot'],
    ),
    # Index the target so the classifier trains on a 'label' column.
    feature.StringIndexer(inputCol='Res_num', outputCol='label'),
    feature.VectorAssembler(
        inputCols=['Near noon', 'Afternoon', 'evening', 'Night',
                   'day_wk_hot', 'dist_hot', 'cat_hot'],
        outputCol='features',
    ),
    # Train on the indexed 'label' column, like the other GBT pipelines in this notebook;
    # previously this pointed at raw 'Res_num', leaving the 'label' stage unused.
    classification.GBTClassifier(labelCol='label', featuresCol='features', maxIter=10),
]
gb_m13 = Pipeline(stages=gb_m13_stages)

# Fit on the training split and score the validation split against the same 'label' column.
gb_m13_fit = gb_m13.fit(training_df)
gb_m13_fitted_trans = gb_m13_fit.transform(validation_df)
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='label')
gb_AUC13 = evaluator.evaluate(gb_m13_fitted_trans)
print('gb M13 =',gb_AUC13)


In [93]:
# Model 15 - random forest on category + location + day of week + month
# (no day-of-month and none of the time-of-day columns).
# Each categorical column gets its own StringIndexer; one encoder then handles them together.
rf_m15_stages = [
    feature.StringIndexer(inputCol='DayOfWeek', outputCol='day_wk_ind'),
    feature.StringIndexer(inputCol='PdDistrict', outputCol='dist_ind'),
    feature.StringIndexer(inputCol='Category', outputCol='cat_ind'),
    feature.StringIndexer(inputCol='Month', outputCol='mon_ind'),
    feature.OneHotEncoderEstimator(
        inputCols=['day_wk_ind', 'dist_ind', 'cat_ind', 'mon_ind'],
        outputCols=['day_wk_hot', 'dist_hot', 'cat_hot', 'mon_hot'],
    ),
    # Index the target so the classifier trains on a 'label' column.
    feature.StringIndexer(inputCol='Res_num', outputCol='label'),
    feature.VectorAssembler(
        inputCols=['day_wk_hot', 'dist_hot', 'cat_hot', 'mon_hot'],
        outputCol='features',
    ),
    classification.RandomForestClassifier(
        labelCol='label', featuresCol='features',
        numTrees=10, seed=0, cacheNodeIds=True,
    ),
]
rf_m15 = Pipeline(stages=rf_m15_stages)

# Fit on the training split, score the validation split, report the evaluator's metric.
rf_m15_fit = rf_m15.fit(training_df)
rf_m15_fitted_trans = rf_m15_fit.transform(validation_df)
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='label')
AUC15 = evaluator.evaluate(rf_m15_fitted_trans)
print('Model15 = ', AUC15)


In [94]:
# Model 15 - logistic regression on category + location + day of week + month
# (no day-of-month and none of the time-of-day columns).
# Each categorical column gets its own StringIndexer; one encoder then handles them together.
lr_m15_stages = [
    feature.StringIndexer(inputCol='DayOfWeek', outputCol='day_wk_ind'),
    feature.StringIndexer(inputCol='PdDistrict', outputCol='dist_ind'),
    feature.StringIndexer(inputCol='Category', outputCol='cat_ind'),
    feature.StringIndexer(inputCol='Month', outputCol='mon_ind'),
    feature.OneHotEncoderEstimator(
        inputCols=['day_wk_ind', 'dist_ind', 'cat_ind', 'mon_ind'],
        outputCols=['day_wk_hot', 'dist_hot', 'cat_hot', 'mon_hot'],
    ),
    # Index the target so the classifier trains on a 'label' column.
    feature.StringIndexer(inputCol='Res_num', outputCol='label'),
    feature.VectorAssembler(
        inputCols=['day_wk_hot', 'dist_hot', 'cat_hot', 'mon_hot'],
        outputCol='features',
    ),
    # regParam / elasticNetParam are left at their defaults.
    classification.LogisticRegression(labelCol='label', featuresCol='features', maxIter=10),
]
lr_m15 = Pipeline(stages=lr_m15_stages)

# Fit on the training split, score the validation split, report the evaluator's metric.
lr_m15_fit = lr_m15.fit(training_df)
lr_m15_fitted_trans = lr_m15_fit.transform(validation_df)
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='label')
lr_AUC15 = evaluator.evaluate(lr_m15_fitted_trans)
print('Lr M15 =', lr_AUC15)

In [95]:
# Gradient-boosted trees, model 15: category + location + time but not day and hour.
# A StringIndexer handles a single column at a time, so every categorical predictor gets
# its own indexer; one OneHotEncoderEstimator stage then encodes all indexed columns.
gb_m15 = Pipeline(stages=[
    feature.StringIndexer(inputCol=raw_col, outputCol=indexed_col)
    for raw_col, indexed_col in (
        ("DayOfWeek", "day_wk_ind"),
        ("PdDistrict", "dist_ind"),
        ("Category", "cat_ind"),
        ("Month", "mon_ind"),
    )
] + [
    feature.OneHotEncoderEstimator(
        inputCols=['day_wk_ind', 'dist_ind', 'cat_ind', 'mon_ind'],
        outputCols=['day_wk_hot', 'dist_hot', 'cat_hot', 'mon_hot']),
    # the target column is indexed into the numeric 'label' column
    feature.StringIndexer(inputCol="Res_num", outputCol="label"),
    feature.VectorAssembler(
        inputCols=['day_wk_hot', 'dist_hot', 'cat_hot', 'mon_hot'],
        outputCol='features'),
    classification.GBTClassifier(maxIter=10, featuresCol='features', labelCol='label'),
])

# Train on the training split and score the validation split.
gb_m15_fit = gb_m15.fit(training_df)
gb_m15_fitted_trans = gb_m15_fit.transform(validation_df)

evaluator = evaluation.BinaryClassificationEvaluator(labelCol='label')
gb_AUC15 = evaluator.evaluate(gb_m15_fitted_trans)
print('gb M15 =', gb_AUC15)

## Evaluate models and get feature importance

In [96]:
# The last stage of the fitted rf_m13 pipeline is the trained classifier itself.
rf_m13_fit_model = rf_m13_fit.stages[-1]

In [97]:
# Validation split run through the fitted rf_m13 pipeline; its 'features' column metadata
# is what ExtractFeatureImp reads to recover feature names.
m13_trans = rf_m13_fit.transform(validation_df)

In [98]:
# Map per-index importance scores back to the named features stored in a vector column's metadata
# (this is how features expanded by the one-hot encoder get their names back).
def ExtractFeatureImp(featureImp, dataset, featuresCol):
    """Return a pandas table of features ranked by score.

    featureImp  -- indexable collection of scores, addressed by feature index
    dataset     -- object whose ``schema[featuresCol].metadata["ml_attr"]["attrs"]``
                   maps attribute-group names to lists of ``{"idx": ..., "name": ...}`` records
    featuresCol -- name of the assembled feature-vector column

    The result has one row per attribute record plus a ``score`` column
    (``featureImp[idx]``), sorted by ``score`` in descending order.
    """
    attr_groups = dataset.schema[featuresCol].metadata["ml_attr"]["attrs"]

    # Flatten every attribute group into one list of records.
    records = []
    for group_name in attr_groups:
        records.extend(attr_groups[group_name])

    ranked = pd.DataFrame(records)
    ranked['score'] = ranked['idx'].apply(lambda position: featureImp[position])
    return ranked.sort_values('score', ascending=False)

In [99]:
# random forest: the 20 features with the highest importance score
ExtractFeatureImp(rf_m13_fit_model.featureImportances, m13_trans, "features").head(20)


In [100]:
# logistic regression: map the coefficients of the fitted lr_m2 model back to feature names
lr_m2_fit_model = lr_m2_fit.stages[-1]
m2_trans = lr_m2_fit.transform(validation_df)
# ExtractFeatureImp sorts by the signed score (descending), so the first rows are the most
# positive coefficients, not the largest in absolute value.
ExtractFeatureImp(lr_m2_fit_model.coefficients, m2_trans, "features").head(60)

In [101]:
# number of trees in the fitted random-forest model
len(rf_m13_fit_model.trees)

In [102]:
# view one tree of the forest (fixed index 1, i.e. the second tree) as a debug string
print(rf_m13_fit_model.trees[1].toDebugString)

In [103]:
# extract features for gradient boosting
gb_m13_fit_model = gb_m13_fit.stages[-1]
# NOTE(review): this rebinds m13_trans, which earlier held the random-forest (rf_m13_fit)
# transform of validation_df; re-running the random-forest cell afterwards will see this frame.
m13_trans = gb_m13_fit.transform(validation_df)
ExtractFeatureImp(gb_m13_fit_model.featureImportances, m13_trans, "features").head(20)


In [104]:
# randomForest - category + location + time but not day
# Test-split score of rf_m13_fit. Uses whichever `evaluator` was bound most recently; in
# notebook order that is BinaryClassificationEvaluator(labelCol='label').
rf_AUC_best = evaluator.evaluate(rf_m13_fit.transform(testing_df))
rf_AUC_best

In [105]:
# best model from gradient boosting
# Test-split score of gb_m13_fit, computed with the shared `evaluator` bound by an earlier cell.
gb_AUC_best = evaluator.evaluate(gb_m13_fit.transform(testing_df))
gb_AUC_best

In [106]:
# Logistic Regression - category + location + time but not day and year
# Test-split score of lr_m12_fit, computed with the shared `evaluator` bound by an earlier cell.
lr_AUC_best = evaluator.evaluate(lr_m12_fit.transform(testing_df))
lr_AUC_best

## Evaluate models by confusion matrix
- Calculating recall and precision for each best model respectively

In [107]:
# Random forest (rf_m13_fit): confusion-matrix counts on the test split, with class 1 as positive.
rf_model_13 = rf_m13_fit.transform(testing_df)
# true positives: actual 1 and predicted 1
rf_TP = rf_model_13.where((fn.col("label") == 1) & (fn.col("prediction") == 1)).count()
# all actual positives (TP + FN) and all predicted positives (TP + FP)
rf_TP_FN = rf_model_13.where(fn.col("label") == 1).count()
rf_TP_FP = rf_model_13.where(fn.col("prediction") == 1).count()
rf_recall = rf_TP / rf_TP_FN
rf_precision = rf_TP / rf_TP_FP


#rf_m4_fitted_trans = rf_m4_fit.transform(validation_df)


In [108]:
# Gradient boosting (gb_m13_fit): confusion-matrix counts on the test split, with class 1 as positive.
gb_model_13 = gb_m13_fit.transform(testing_df)
# true positives: actual 1 and predicted 1
gb_TP = gb_model_13.where((fn.col("label") == 1) & (fn.col("prediction") == 1)).count()
# all actual positives (TP + FN) and all predicted positives (TP + FP)
gb_TP_FN = gb_model_13.where(fn.col("label") == 1).count()
gb_TP_FP = gb_model_13.where(fn.col("prediction") == 1).count()
gb_recall = gb_TP / gb_TP_FN
gb_precision = gb_TP / gb_TP_FP


#rf_m4_fitted_trans = rf_m4_fit.transform(validation_df)


In [109]:
# Logistic regression (lr_m2_fit): confusion-matrix counts on the test split, with class 1 as positive.
lr_model_2 = lr_m2_fit.transform(testing_df)
# true positives: actual 1 and predicted 1
lr_TP = lr_model_2.filter((lr_model_2["label"] == 1) & (lr_model_2["prediction"] == 1)).count()
# all actual positives (TP + FN) and all predicted positives (TP + FP)
lr_TP_FN = lr_model_2.filter(lr_model_2["label"] == 1).count()
lr_TP_FP = lr_model_2.filter(lr_model_2["prediction"] == 1).count()
# Recall and precision must come from the logistic-regression counts; using the rf_* counts
# here would simply repeat the random-forest metrics under the LR name.
lr_recall = lr_TP / lr_TP_FN
lr_precision = lr_TP / lr_TP_FP


#rf_m4_fitted_trans = rf_m4_fit.transform(validation_df)


In [110]:
# Recall and precision per model: the bare tag ('RF', 'LR', 'GB') is recall,
# the tag with the '-P' suffix is precision.
print('RF',rf_recall)
print('RF-P',rf_precision)
print('LR',lr_recall)
print('LR-P',lr_precision)
print('GB',gb_recall)
print('GB-P',gb_precision)


### Review prediction for best model

In [111]:
# First 30 test rows: the actual Res_num next to the lr_m12 prediction.
lr_m12_fit.transform(testing_df).select('Res_num', 'prediction').show(30)

In [112]:
# First 30 test rows: the actual Res_num next to the rf_m4 prediction.
# NOTE(review): the random-forest test AUC above was computed for rf_m13_fit — confirm rf_m4 is the intended model here.
rf_m4_fit.transform(testing_df).select('Res_num', 'prediction').show(30)

In [113]:
# Fraction of test rows for which rf_m12_fit predicts class 1.
rf_m12_fit.transform(testing_df).select(fn.avg(fn.expr('prediction = 1').cast('float'))).show()


In [114]:
# Mean of rf_m4_fit's predictions on the test split
# (presumably the share predicted as class 1, if predictions are 0/1 — confirm).
rf_m4_fit.transform(testing_df).select(fn.avg('prediction')).show()


### Inference
- Feature importance for random forest and gradient boosting
- coefficients for logistic regression

In [115]:
# Last stage of the fitted rf_m12 pipeline (the trained model); display its predictionCol param.
rf_m12_model = rf_m12_fit.stages[-1]
rf_m12_model.predictionCol

In [116]:
# Display the minInfoGain param of the rf_m12 model.
rf_m12_model.minInfoGain

In [117]:
# Coefficients of the last stage of crime_m4_fitted
# (crime_m4_fitted is not defined in this part of the notebook — it must come from an earlier cell).
crime_m4_fitted.stages[-1].coefficients

In [118]:

# Fit the lr_m12 logistic-regression pipeline and score it on the validation split.
# NOTE(review): lr_m12_fit is already used by earlier cells (test-set evaluation), so this
# refit looks redundant — confirm before removing.
lr_m12_fit = lr_m12.fit(training_df)
lr_m12_fitted_trans = lr_m12_fit.transform(validation_df)
# Score against the indexed 'label' column, as every other evaluation cell does: the
# classifier's raw predictions refer to the indexed classes, and `evaluator` is a shared name,
# so binding it to a different label column would change what other cells compute on re-run.
# Assumes the lr_m12 pipeline emits 'label' (the earlier test-set evaluation of lr_m12_fit
# relies on the same column) — confirm.
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='label')
lr_AUC12 = evaluator.evaluate(lr_m12_fitted_trans)
# regParam and elasticNetParam are not set here, i.e. regParam=0, elasticNetParam=0.
print('Lr M12 =',lr_AUC12)


In [119]:
# Intercept (printed) and coefficient vector (displayed) of the fitted lr_m12 model stage.
print(lr_m12_fit.stages[-1].intercept)
lr_m12_fit.stages[-1].coefficients


In [120]:
# Coefficients of the fitted lr_m12 model as a plain Python list.
# NOTE(review): the following cells work with m9_coe / m9_feature, not m12_coe — confirm which model is intended.
m12_coe = lr_m12_fit.stages[-1].coefficients.tolist()


In [121]:
# Display names (19 entries) to pair with the coefficients in m9_coe by position.
# NOTE(review): presumably listed in the same order as the model's coefficient vector — verify;
# zip() in a later cell silently truncates to the shorter sequence if the lengths differ.
m9_feature = ['Minor', 'Medium', 'Servere',
                    'Very Servere','Year','Month','SOUTHERN','MISSION','NORTHERN','CENTRAL',
                    'BAYVIEW','TENDERLOIN', 'INGLESIDE','TARAVAL','PARK','Near noon','Afternoon',
                    'evening','Night']


In [122]:
# Display the coefficient values paired with m9_feature
# (m9_coe is not defined in this part of the notebook — it must come from an earlier cell).
m9_coe

In [123]:
# Pair each feature name with the coefficient at the same position.
mapping = dict(zip(m9_feature, m9_coe))
mapping

In [124]:
# Feature names as a one-column pandas frame. Built directly in pandas: the list is small and
# already local, so a Spark round trip followed by an in-place rename is unnecessary.
m9_feature_df = pd.DataFrame({'feature': m9_feature})
m9_feature_df.head(3)

In [125]:
# Coefficients as a one-column pandas frame; the column is named 'value' and renamed in a later cell.
# Built directly in pandas with float64: going through a Spark FloatType column stores the
# values in single precision and truncates the coefficients.
# Assumes m9_coe is a plain sequence of numbers (e.g. from coefficients.tolist()) — confirm.
m9_coe_df = pd.DataFrame({'value': m9_coe}, dtype='float64')

In [126]:
# Give the coefficient column a descriptive name.
m9_coe_df = m9_coe_df.rename(columns={'value': 'coefficients'})

In [127]:
# Put feature names and coefficients side by side (aligned on the row index; rows missing
# from either frame would show up as NaN).
result = pd.concat([m9_feature_df, m9_coe_df], axis=1)
# NOTE(review): the divisor 4 is unexplained — presumably the "divide by 4" rule of thumb for
# reading a logistic coefficient as an approximate maximum change in probability; confirm.
result['weights'] = (result['coefficients'] / 4)

In [128]:
# Display the feature / coefficient / weight table.
result

In [129]:
# Fit the lr_m12 logistic-regression pipeline and score it on the validation split.
# NOTE(review): an identical fit-and-evaluate cell appears earlier in the notebook; this
# repeat retrains the same pipeline on the same data — confirm whether it is still needed.
lr_m12_fit = lr_m12.fit(training_df)
lr_m12_fitted_trans = lr_m12_fit.transform(validation_df)
# Score against the indexed 'label' column, as the other evaluation cells do: the classifier's
# raw predictions refer to the indexed classes, and `evaluator` is a shared name, so binding it
# to a different label column would change what other cells compute on re-run.
# Assumes the lr_m12 pipeline emits 'label' (the test-set evaluation of lr_m12_fit relies on
# the same column) — confirm.
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='label')
lr_AUC12 = evaluator.evaluate(lr_m12_fitted_trans)
# regParam and elasticNetParam are not set here, i.e. regParam=0, elasticNetParam=0.
print('Lr M12 =',lr_AUC12)
