In [1]:
from pyspark.ml import feature
from pyspark.ml import clustering
from pyspark.ml import Pipeline
from pyspark.ml import pipeline
from pyspark.sql import functions as fn
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml import feature, regression, evaluation, Pipeline
from pyspark.sql import functions as fn, Row
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import matplotlib.pyplot as plt
import pandas as pd
from pyspark.sql.functions import when, lit, col,isnull,split, udf
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler, OneHotEncoder
from pyspark.sql.functions import year, month, dayofmonth, hour
from pyspark.ml.feature import Bucketizer
from pyspark.sql.types import *

from pyspark.sql.types import IntegerType
from pyspark.ml.feature import PCA
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml import classification
from pyspark.ml.evaluation import BinaryClassificationEvaluator, \
    MulticlassClassificationEvaluator, \
    RegressionEvaluator
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [2]:
#Read the csv file
data = spark.read.csv('data.csv',inferSchema=True, header=True)

In [3]:
data.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Crash Descriptor: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Day of Week: string (nullable = true)
 |-- Police Report: string (nullable = true)
 |-- Lighting Conditions: string (nullable = true)
 |-- Municipality: string (nullable = true)
 |-- Collision Type Descriptor: string (nullable = true)
 |-- County Name: string (nullable = true)
 |-- Road Descriptor: string (nullable = true)
 |-- Weather Conditions: string (nullable = true)
 |-- Traffic Control Device: string (nullable = true)
 |-- Road Surface Conditions: string (nullable = true)
 |-- DOT Reference Marker Location: string (nullable = true)
 |-- Pedestrian Bicyclist Action: string (nullable = true)
 |-- Event Descriptor: string (nullable = true)
 |-- Number of Vehicles Involved: integer (nullable = true)



In [4]:
# Label the rows with injury accident = 1 and property damage accident = 0
df = data.withColumn('AccidentDescriptor', 
                     when((data['Crash Descriptor'] == 'Property Damage & Injury Accident') | 
                          (data['Crash Descriptor'] == 'Injury Accident')| 
                          (data['Crash Descriptor'] == 'Fatal Accident'), 1).otherwise(0))

In [5]:
## HOUR BUCKET

#Extract the hour from the time column
df = df.withColumn('Hour', hour('Time'))

#Bucket the hour column: 
# 5 Buckets: 0-4a.m. = 0.0, 5-11 = 1.0, 12-16 = 2.0, 17-22 = 3.0, 23-34 = 4.0
# 5 Buckets: 0-4 = Night, 5-11 = Morning, 12-16 = Afternoon, 17-22 = Evening, 23-34 = Night

bucketizer = Bucketizer(splits=[0,5,12,17,23,24],inputCol="Hour", outputCol="Hour_buckets")
df = bucketizer.setHandleInvalid("keep").transform(df)

# convert 5 buckets into 4 buckets (we have twice Night): 1.0 = Morning, 2.0 = Afternoon, 3.0 = Evening, 4.0 = Night
t = {0.0:4, 1.0: 1, 2.0: 2, 3.0: 3, 4.0: 4}
udf_foo = udf(lambda x: t[x], IntegerType())
df = df.withColumn("Hour", udf_foo("Hour_buckets"))

#Extract the month from the Date column
df = df.withColumn('Month', split(df['Date'], '/')[0])
df = df.withColumn('Month',df.Month.cast('int'))


In [6]:
# removing features
df= df.drop('Time','Date','Hour_buckets','Municipality','DOT Reference Marker Location', 'Crash Descriptor',
            'Number of Vehicles Involved', 'Year', 'Police Report', 'Pedestrian Bicyclist Action', 'Event Descriptor')

# let us take a look at it with pandas
df_pd = df.toPandas()

In [7]:
# display table
df_pd.T

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,895906,895907,895908,895909,895910,895911,895912,895913,895914,895915
Day of Week,Saturday,Saturday,Saturday,Saturday,Saturday,Saturday,Saturday,Saturday,Saturday,Saturday,...,Wednesday,Wednesday,Wednesday,Wednesday,Wednesday,Wednesday,Wednesday,Wednesday,Wednesday,Wednesday
Lighting Conditions,Dark-Road Unlighted,Dark-Road Unlighted,Daylight,Unknown,Daylight,Dusk,Daylight,Daylight,Daylight,Dark-Road Lighted,...,Dark-Road Unlighted,Daylight,Dark-Road Lighted,Dark-Road Lighted,Daylight,Dark-Road Lighted,Unknown,Daylight,Daylight,Daylight
Collision Type Descriptor,OTHER,SIDESWIPE,LEFT TURN (3),Unknown,LEFT TURN (3),OTHER,OTHER,REAR END,REAR END,RIGHT ANGLE,...,OTHER,RIGHT ANGLE,Unknown,OVERTAKING,OTHER,OTHER,OTHER,OTHER,OTHER,OTHER
County Name,LEWIS,SUFFOLK,OTSEGO,KINGS,RENSSELAER,ERIE,ALBANY,WESTCHESTER,GREENE,NASSAU,...,OTSEGO,SUFFOLK,BRONX,KINGS,ERIE,ERIE,MONROE,ROCKLAND,QUEENS,SUFFOLK
Road Descriptor,Curve and Grade,Straight and Level,Straight and Level,Unknown,Straight and Grade,Straight and Grade,Curve and Level,Curve and Level,Straight at Hill Crest,Straight and Level,...,Curve and Level,Straight and Level,Straight at Hill Crest,Straight and Level,Curve and Level,Straight and Level,Unknown,Straight and Level,Straight and Level,Straight and Level
Weather Conditions,Cloudy,Clear,Cloudy,Unknown,Clear,Clear,Clear,Clear,Cloudy,Clear,...,Snow,Clear,Clear,Clear,Clear,Clear,Unknown,Clear,Clear,Cloudy
Traffic Control Device,,,No Passing Zone,Unknown,Traffic Signal,Unknown,,,,,...,No Passing Zone,Stop Sign,,,,,Unknown,,Traffic Signal,
Road Surface Conditions,Wet,Dry,Wet,Unknown,Dry,Snow/Ice,Snow/Ice,Dry,Snow/Ice,Dry,...,Snow/Ice,Dry,Dry,Dry,Snow/Ice,Wet,Unknown,Dry,Dry,Dry
AccidentDescriptor,0,0,1,1,0,0,1,1,0,1,...,0,0,1,0,0,1,0,1,0,0
Hour,3,1,1,2,1,3,1,2,2,3,...,3,1,3,1,1,4,3,2,1,2


In [8]:
df.columns

['Day of Week',
 'Lighting Conditions',
 'Collision Type Descriptor',
 'County Name',
 'Road Descriptor',
 'Weather Conditions',
 'Traffic Control Device',
 'Road Surface Conditions',
 'AccidentDescriptor',
 'Hour',
 'Month']

In [9]:
#Using StringIndexer to change the column as categorical variable 
#or to convert the textual data to numeric data keeping the categorical context.

pipe_feat= Pipeline(stages=[StringIndexer(inputCol = 'Day of Week', outputCol = 'Day of Week_index'),
                           StringIndexer(inputCol = 'Lighting Conditions', outputCol = 'Lighting Conditions_index'),
                           StringIndexer(inputCol = 'Collision Type Descriptor', outputCol = 'Collision Type Descriptor_index'),
                           StringIndexer(inputCol = 'County Name', outputCol = 'County Name_index'),
                           StringIndexer(inputCol = 'Road Descriptor', outputCol = 'Road Descriptor_index'),
                           StringIndexer(inputCol = 'Weather Conditions', outputCol = 'Weather Conditions_index'),
                            StringIndexer(inputCol = 'Traffic Control Device', outputCol = 'Traffic Control Device_index'),
                            StringIndexer(inputCol = 'Road Surface Conditions', outputCol = 'Road Surface Conditions_index'),
                    OneHotEncoder(inputCol = 'Day of Week_index', outputCol = 'Day of Week_feat', dropLast=False),
                           OneHotEncoder(inputCol = 'Lighting Conditions_index', outputCol = 'Lighting Conditions_feat', dropLast=False),
                           OneHotEncoder(inputCol = 'Collision Type Descriptor_index', outputCol = 'Collision Type Descriptor_feat', dropLast=False),
                           OneHotEncoder(inputCol = 'County Name_index', outputCol = 'County Name_feat', dropLast=False),
                           OneHotEncoder(inputCol = 'Road Descriptor_index', outputCol = 'Road Descriptor_feat', dropLast=False),
                           OneHotEncoder(inputCol = 'Weather Conditions_index', outputCol = 'Weather Conditions_feat', dropLast=False),
                            OneHotEncoder(inputCol = 'Traffic Control Device_index', outputCol = 'Traffic Control Device_feat', dropLast=False),
                            OneHotEncoder(inputCol = 'Road Surface Conditions_index', outputCol = 'Road Surface Conditions_feat', dropLast=False),
                            OneHotEncoder(inputCol = 'Hour', outputCol = 'Hour_feat', dropLast=False),
                            OneHotEncoder(inputCol = 'Month', outputCol = 'Month_feat', dropLast=False),
            
                           VectorAssembler(inputCols=['Hour','Month','Day of Week_feat','Lighting Conditions_feat','Collision Type Descriptor_feat',
                                                    'County Name_feat', 'Road Descriptor_feat','Weather Conditions_feat','Traffic Control Device_feat',
                                                     'Road Surface Conditions_feat'], outputCol = 'features',handleInvalid="keep")]).fit(df)

In [10]:
df_feat = pipe_feat.transform(df)

In [11]:
training_df, validation_df, testing_df = df_feat.randomSplit([0.6, 0.3, 0.1], seed=0)

In [12]:
pipe_lr = Pipeline(stages=[classification.LogisticRegression(labelCol='AccidentDescriptor', featuresCol='features',regParam = 0.1,elasticNetParam = 0.001)])

In [13]:
pipe_rf = Pipeline(stages=[classification.RandomForestClassifier(labelCol='AccidentDescriptor', featuresCol='features',numTrees = 20,maxDepth = 6)])


In [14]:
pipe_gbt = Pipeline(stages=[classification.GBTClassifier(labelCol='AccidentDescriptor', featuresCol='features',maxIter = 10)])

In [15]:
fit_lr = pipe_lr.fit(training_df)
dataframe_lr = fit_lr.transform(validation_df)

fit_rf = pipe_rf.fit(training_df)
dataframe_rf = fit_rf.transform(validation_df)

fit_gbt = pipe_gbt.fit(training_df)
dataframe_gbt = fit_gbt.transform(validation_df)

In [16]:
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='AccidentDescriptor')
AUC_LR = evaluator.evaluate(dataframe_lr)
AUC_RF = evaluator.evaluate(dataframe_rf)
AUC_GBT = evaluator.evaluate(dataframe_gbt)
print("The AUC of LR, RF, GBT is %s, %s, %s"%(AUC_LR,AUC_RF,AUC_GBT))

The AUC of LR, RF, GBT is 0.7424899334743599, 0.7193723960482135, 0.7416339208036482


In [17]:
df_feature =['Hour','Month']
label1 = pipe_feat.stages[0].labels
for i in label1:
    lab1 = i.replace(' ','_')
    df_feature.append('Day of Week_'+lab1)
label2 = pipe_feat.stages[1].labels
for i in label2:
    lab2 = i.replace(' ','_')
    df_feature.append('Lighting Conditions_'+lab2)
label3 = pipe_feat.stages[2].labels
for i in label3:
    lab3 = i.replace(' ','_')
    df_feature.append('Collision Type Descriptor_'+lab3)
label4 = pipe_feat.stages[3].labels
for i in label4:
    lab4 = i.replace(' ','_')
    df_feature.append('County Name_'+lab4)
label5 = pipe_feat.stages[4].labels
for i in label5:
    lab5 = i.replace(' ','_')
    df_feature.append('Road Descriptor_'+lab5)
label6 = pipe_feat.stages[5].labels
for i in label6:
    lab6= i.replace(' ','_')
    df_feature.append('Weather Conditions_'+lab6)
label7 = pipe_feat.stages[6].labels
for i in label7:
    lab7= i.replace(' ','_')
    df_feature.append('Traffic Control Device_'+lab7)
label8 = pipe_feat.stages[7].labels
for i in label8:
    lab8= i.replace(' ','_')
    df_feature.append('Road Surface Conditions_'+lab8)


In [18]:
lrm = fit_lr.stages[-1]
lrmcoeff = lrm.coefficients

In [26]:
list(zip(df_feature , lrmcoeff))

[('Hour', -0.013157370734570555),
 ('Month', 0.004923748102720214),
 ('Day of Week_Friday', -0.01218352015007196),
 ('Day of Week_Thursday', -0.0014269815025219439),
 ('Day of Week_Wednesday', -0.004722000121662202),
 ('Day of Week_Tuesday', -4.22839751601461e-06),
 ('Day of Week_Monday', -0.0033413897305520153),
 ('Day of Week_Saturday', 0.01617622620160546),
 ('Day of Week_Sunday', 0.02150701100343517),
 ('Lighting Conditions_Daylight', 0.10380190209961163),
 ('Lighting Conditions_Dark-Road_Lighted', 0.1537059013538393),
 ('Lighting Conditions_Dark-Road_Unlighted', -0.39059752606527254),
 ('Lighting Conditions_Unknown', -0.11071602450241778),
 ('Lighting Conditions_Dusk', 0.0019059273013549702),
 ('Lighting Conditions_Dawn', -0.21955436158037828),
 ('Collision Type Descriptor_OTHER', 0.28546154291516884),
 ('Collision Type Descriptor_REAR_END', 0.08629483590315458),
 ('Collision Type Descriptor_RIGHT_ANGLE', 0.061122875655107),
 ('Collision Type Descriptor_OVERTAKING', -0.67024733393

In [35]:
lr_coeff_feature = spark.createDataFrame([(tup[0], float(tup[1])) for tup in list(zip(df_feature , lrmcoeff))],['feature', 'LR Coeff']).sort('LR Coeff', ascending=False)
lr_coeff_feature.show(20,False)

+-------------------------------------------------------------+-------------------+
|feature                                                      |LR Coeff           |
+-------------------------------------------------------------+-------------------+
|County Name_BRONX                                            |1.0112682191971294 |
|County Name_KINGS                                            |0.9663865128796042 |
|County Name_QUEENS                                           |0.8297977524673253 |
|County Name_NEW_YORK                                         |0.7200593496065046 |
|Collision Type Descriptor_HEAD_ON                            |0.6428571750747638 |
|County Name_RICHMOND                                         |0.6379616520702633 |
|Traffic Control Device_Traffic_Signal                        |0.35555146799778875|
|Traffic Control Device_Flashing_Light                        |0.3029962107665864 |
|Collision Type Descriptor_OTHER                              |0.28546154291

In [36]:
importance= fit_rf.stages[-1].featureImportances.toArray()

In [37]:
feature_importance = spark.createDataFrame([(tup[0], float(tup[1])) for tup in list(zip(df_feature , importance))],['feature', 'importance']).sort('importance', ascending=False)

# feature_importance =pd.DataFrame(list(zip(df_feature, importance)),columns = ['feature', 'importance']).sort_values('importance', ascending=False)

In [38]:
feature_importance.show(20,False)

+---------------------------------------+--------------------+
|feature                                |importance          |
+---------------------------------------+--------------------+
|County Name_KINGS                      |0.1714645269124162  |
|Traffic Control Device_Traffic_Signal  |0.1479077575930924  |
|County Name_QUEENS                     |0.12859961412755544 |
|Lighting Conditions_Dark-Road_Unlighted|0.10331683672682393 |
|County Name_NEW_YORK                   |0.07468475094062997 |
|Collision Type Descriptor_OVERTAKING   |0.06033181286309536 |
|County Name_BRONX                      |0.05718543971510933 |
|Weather Conditions_Clear               |0.027539003793519576|
|Road Surface Conditions_Snow/Ice       |0.0247519782659003  |
|Traffic Control Device_None            |0.02341717528667362 |
|Collision Type Descriptor_OTHER        |0.02183743244870458 |
|Road Descriptor_Straight_and_Level     |0.020893419148421615|
|Collision Type Descriptor_Unknown      |0.012850138442

In [39]:
pipe_pca = Pipeline(stages=[feature.VectorAssembler(inputCols= ['Day of Week_feat',
 'Lighting Conditions_feat',
 'Collision Type Descriptor_feat',
 'County Name_feat',
 'Road Descriptor_feat',
 'Weather Conditions_feat',
 'Traffic Control Device_feat',
 'Road Surface Conditions_feat',
 'Hour',
 'Month'], outputCol = 'pcafeatures'), feature.PCA(k=120, inputCol='pcafeatures',outputCol='pc')]).fit(df_feat)


In [40]:
pc_120= pipe_pca.transform(df_feat)
training_pc, validation_pc, testing_pc = pc_120.randomSplit([0.6, 0.3, 0.1], seed=0)

In [41]:
pipe_lr_pc = Pipeline(stages=[classification.LogisticRegression(labelCol='AccidentDescriptor', featuresCol='pc',regParam = 0.1,elasticNetParam = 0.001)]).fit(training_pc)
pc_120_pred = pipe_lr_pc.transform(validation_pc)
AUC_LR_pc = evaluator.evaluate(pc_120_pred)
AUC_LR_pc

0.7447646775048541

In [42]:
pipe_rf_pc = Pipeline(stages=[classification.RandomForestClassifier(labelCol='AccidentDescriptor', featuresCol='pc',numTrees = 20,maxDepth = 6)]).fit(training_pc)
pc_120_pred_rf = pipe_rf_pc.transform(validation_pc)
AUC_RF_pc = evaluator.evaluate(pc_120_pred_rf)
AUC_RF_pc

0.7262452548102839

In [43]:
pipe_gbt_pc = Pipeline(stages=[classification.GBTClassifier(labelCol='AccidentDescriptor', featuresCol='pc',maxIter = 10)]).fit(training_pc)
pc_120_pred_gbt = pipe_gbt_pc.transform(validation_pc)
AUC_GBT_pc = evaluator.evaluate(pc_120_pred_gbt)
AUC_GBT_pc

0.7426137291984092

In [44]:
pca_model = pipe_pca.stages[-1]
pc1 = abs(pca_model.pc.toArray()[:, 0]).tolist()
pc2 = abs(pca_model.pc.toArray()[:, 1]).tolist()
pc1_df = spark.createDataFrame(list(zip(df_feature , pc1)),['feature', 'loadings']).sort('loadings', ascending=False)
pc2_df = spark.createDataFrame(list(zip(df_feature , pc2)),['feature', 'loadings']).sort('loadings', ascending=False)

# pc1_df = pd.DataFrame(list(zip(df_feature , pc1)),columns = ['feature', 'loadings']).sort_values('loadings', ascending=False)
# pc2_df = pd.DataFrame(list(zip(df_feature , pc2)),columns = ['feature', 'loadings']).sort_values('loadings', ascending=False)


In [46]:
pc1_df.show(5,False)

+----------------------------------------+--------------------+
|feature                                 |loadings            |
+----------------------------------------+--------------------+
|Road Surface Conditions_Flooded_Water   |0.9993768546684466  |
|Road Surface Conditions_Dry             |0.02038259078795679 |
|Traffic Control Device_Utility_Work_Area|0.017515872197374414|
|Weather Conditions_Rain                 |0.011091831440936766|
|Road Surface Conditions_Muddy           |0.010625329356173795|
+----------------------------------------+--------------------+
only showing top 5 rows



In [47]:
pc2_df.show(5,False)

+-----------------------------+-------------------+
|feature                      |loadings           |
+-----------------------------+-------------------+
|Road Surface Conditions_Muddy|0.9081255530803878 |
|Day of Week_Saturday         |0.319676337772738  |
|Day of Week_Sunday           |0.17017860138085694|
|Lighting Conditions_Daylight |0.09039427394613768|
|Lighting Conditions_Dusk     |0.07310966950066862|
+-----------------------------+-------------------+
only showing top 5 rows



In [48]:
test_df = spark.createDataFrame([
    ('Saturday','Daylight','SIDESWIPE','SUFFOLK','Curve and Grade','Cloudy','Traffic Signal','Wet',2,3,12)
], ['Day of Week',
 'Lighting Conditions',
 'Collision Type Descriptor',
 'County Name',
 'Road Descriptor',
 'Weather Conditions',
 'Traffic Control Device',
 'Road Surface Conditions',
 'AccidentDescriptor',
 'Hour',
 'Month'])

In [49]:
test_feat = pipe_feat.transform(test_df)

In [50]:
pc_test= pipe_pca.transform(test_feat)

In [51]:
pred = pipe_rf_pc.transform(pc_test)

In [52]:
pred.select('prediction').show()

+----------+
|prediction|
+----------+
|       0.0|
+----------+

