In [1]:
#1. Business and/or Situation understanding

#The objective of this research is to identify clusters of factors that contribute 
#to high probability of death or injury in a motor vehicle. 
#Prepared by Michael Gerlikhman, InfoSys722.
#GitHub: https://github.com/Gerlikhman/INFOSYS722_BDAS

#Initiation and Import
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import (StructField,StringType,IntegerType,StructType)
spark = SparkSession.builder.appName('Contributing_Factors_to_Motor_Vehicle_Crashes_in_New_Zealand').getOrCreate()

In [2]:
#2. Data understanding

#Loading the datasets
crash = spark.read.load('finaldata_201809.csv', format = 'csv', header = 'true')
driver = spark.read.load('driverdata_simulated.csv', format = 'csv', header = 'true')

#Tableau Was Used for initial visualisations

In [3]:
#View Unique values in categorical variables
crash.createOrReplaceTempView('crash')

#Preview
spark.sql("""SELECT distinct
        HOLIDAY, MULTI_VEH, TLA_NAME      
    FROM
        crash
    """).show() 

#Full View (optional)
#spark.sql("""SELECT distinct
#        HOLIDAY, MULTI_VEH, TLA_NAME, CRASH_LOCN1, INTERSECTION, JUNCTION_TYPE, INTSN_MIDBLOCK,
#        FLAT_HILL, ROAD_CHARACTER, ROAD_CURVATURE, ROAD_SURFACE, LIGHT, WEATHER_A, WEATHER_B       
#    FROM
#        crash
#    """).show(20, False) 

#crash.select("TLA_NAME").distinct().show() #exploring unique values for a single column

+------------------+--------------------+--------------------+
|           HOLIDAY|           MULTI_VEH|            TLA_NAME|
+------------------+--------------------+--------------------+
|Christmas/New Year|      Single vehicle|       Tauranga City|
|   Queens Birthday|Vehicle(s)+Cyclis...|        Dunedin City|
|Christmas/New Year|Vehicle(s)+Pedest...|   Christchurch City|
|Christmas/New Year|       Multi vehicle|     Clutha District|
|    Labour Weekend|       Multi vehicle|  Whakatane District|
|              None|Others without no...|     Wellington City|
|              None|Vehicle(s)+Cyclis...|     Clutha District|
|    Labour Weekend|Vehicle(s)+Pedest...|Waimakariri District|
|Christmas/New Year|Vehicle(s)+Pedest...|     Wellington City|
|    Labour Weekend|      Single vehicle|         Nelson City|
|              None|       Multi vehicle|     Timaru District|
|   Queens Birthday|      Single vehicle|  Whangarei District|
|Christmas/New Year|       Multi vehicle|    Opotiki Di

In [4]:
#Data Exploration

#crash.count()
crash.printSchema() #all strings?

#crash.describe().show() #very slow with large dataset, not practical
#crash.show() #too large to display in a practical way

#merged.select("TLA_NAME").distinct().show() #exploring unique values for columns

root
 |-- CRASH_YEAR: string (nullable = true)
 |-- CRASH_FIN_YEAR: string (nullable = true)
 |-- CRASH_SEV: string (nullable = true)
 |-- FATAL_COUNT: string (nullable = true)
 |-- SERIOUSINJ_COUNT: string (nullable = true)
 |-- MINORINJ_COUNT: string (nullable = true)
 |-- MULTI_VEH: string (nullable = true)
 |-- HOLIDAY: string (nullable = true)
 |-- LG_REGION_DESC: string (nullable = true)
 |-- TLA_ID: string (nullable = true)
 |-- TLA_NAME: string (nullable = true)
 |-- AU_ID: string (nullable = true)
 |-- MB_ID: string (nullable = true)
 |-- EASTING: string (nullable = true)
 |-- NORTHING: string (nullable = true)
 |-- CRASH_LOCN1: string (nullable = true)
 |-- CRASH_LOCN2: string (nullable = true)
 |-- OUTDTD_LOCN_DESC: string (nullable = true)
 |-- CRASH_RP_RS: string (nullable = true)
 |-- INTERSECTION: string (nullable = true)
 |-- JUNCTION_TYPE: string (nullable = true)
 |-- CR_RD_SIDE_RD: string (nullable = true)
 |-- CRASH_DIRN_DESC: string (nullable = true)
 |-- CRASH_DIS

In [5]:
#Removing Irrelevant Columns (based on meta description)

crash = crash.drop(*['CRASH_FIN_YEAR','LG_REGION_DESC','TLA_ID','AU_ID', 'MB_ID', 
                     'EASTING','NORTHING','CRASH_LOCN2', 'OUTDTD_LOCN_DESC',
                     'CRASH_RP_RS','CR_RD_SIDE_RD','CRASH_DIRN_DESC','CRASH_DIST',
                     'CRASH_RP_DIRN_DESC','DIRN_ROLE1_DESC','CRASH_RP_DISP', 'TRAFFIC_CTRL',
                     'CRASH_SH_DESC','CRASH_RP_SH','CRASH_RP_NEWS_DESC', 'ROAD_WET', 'URBAN',
                     'ROAD_LANE','DARK_LIGHT','OTHER','OTHER_VEHICLE_TYPE','UNKNOWN_VEHICLE_TYPE'])

In [6]:
#Data Exploration and Drop for Driver Set

driver.columns
driver = driver.drop(*['_c0', 'CRASH_YEAR', 'CRASH_SEV']) #these were a part of simulation, and are now duplicate
driver.show() #remaining columns

+------+-----+---+
|   Sex|  Age|DUI|
+------+-----+---+
|Female|20-24|  F|
|Female|60-64|  F|
|  Male|40-44|  F|
|  Male|15-19|  T|
|Female|35-39|  F|
|Female|15-19|  T|
|  Male|45-49|  F|
|  Male|30-34|  F|
|  Male|35-39|  F|
|  Male|25-29|  F|
|  Male|45-49|  F|
|Female|65-69|  F|
|  Male|40-44|  T|
|Female|40-44|  T|
|  Male|45-49|  T|
|  Male|  80+|  F|
|  Male|30-34|  F|
|  Male|30-34|  F|
|  Male|65-69|  F|
|Female|  80+|  F|
+------+-----+---+
only showing top 20 rows



In [7]:
#Unfortunately there is no 'easy' (at least for me) method to combine the two very large dataframes 
#on index or id in spark due to partitioning features, when applied this method 
#is unreliable  and outputs only half of the original rows (partitions > 1 are ignored). 
#I have left my attempts here for the record.

#from pyspark.sql.functions import monotonically_increasing_id

#crash = crash.withColumn('id', monotonically_increasing_id())
#driver = driver.withColumn('id', monotonically_increasing_id())

#merged = crash.join(driver, crash.id == driver.id)
#or
#merged_data = crash.join(driver, 'id', 'inner').drop('id')
#or
#merged = crash.join(driver, on=['row_index'])

#merged_data.count()
#reports only 332963 rows when 674811 are expected!

In [8]:
#Loading a merged (elsewhere) dataset

#Adding schema to the dataset
from pyspark.sql.types import (StructField,StringType,IntegerType,BooleanType,StructType)

data_schema = [StructField('_c0',IntegerType(),True),
              StructField('CRASH_YEAR',StringType(),True),
              StructField('CRASH_SEV',StringType(),True),
              StructField('FATAL_COUNT',IntegerType(),True),
              StructField('SERIOUSINJ_COUNT',IntegerType(),True),
              StructField('MINORINJ_COUNT',IntegerType(),True),
              StructField('MULTI_VEH',StringType(),True),
              StructField('HOLIDAY',StringType(),True),
              StructField('TLA_NAME',StringType(),True),
              StructField('CRASH_LOCN1',StringType(),True),
              StructField('INTERSECTION',StringType(),True),
              StructField('JUNCTION_TYPE',StringType(),True),
              StructField('INTSN_MIDBLOCK',StringType(),True),
              StructField('FLAT_HILL',StringType(),True),
              StructField('ROAD_CHARACTER',StringType(),True), 
              StructField('ROAD_CURVATURE',StringType(),True), 
              StructField('ROAD_MARKINGS',StringType(),True),
              StructField('ROAD_SURFACE',StringType(),True),
              StructField('NUM_LANES',StringType(),True),
              StructField('SPD_LIM',StringType(),True),
              StructField('ADV_SPD',StringType(),True),
              StructField('TMP_SPD_LIM',StringType(),True),
              StructField('LIGHT',StringType(),True),
              StructField('STREET_LIGHT',StringType(),True),
              StructField('WEATHER_A',StringType(),True),
              StructField('WEATHER_B',StringType(),True),
              StructField('ANIMALS',IntegerType(),True),
              StructField('BRIDGE',IntegerType(),True),
              StructField('CLIFF_BANK',IntegerType(),True),
              StructField('DEBRIS',IntegerType(),True),
              StructField('DITCH',IntegerType(),True),
              StructField('FENCE',IntegerType(),True),
              StructField('GUARD_RAIL',IntegerType(),True),
              StructField('HOUSE_OR_BLDG',IntegerType(),True),
              StructField('KERB',IntegerType(),True),
              StructField('OBJ_THROWN_DROPPED',IntegerType(),True),
              StructField('OVER_BANK',IntegerType(),True),
              StructField('PARKED_VEHICLE',IntegerType(),True),
              StructField('PHONE_BOX_ETC',IntegerType(),True),
              StructField('POST_OR_POLE',IntegerType(),True),
              StructField('ROADWORKS',IntegerType(),True),
              StructField('SLIP_OR_FLOOD',IntegerType(),True),
              StructField('STRAY_ANIMAL',IntegerType(),True),
              StructField('TRAFFIC_ISLAND',IntegerType(),True),
              StructField('TRAFFIC_SIGN',IntegerType(),True),
              StructField('TRAIN',IntegerType(),True),
              StructField('TREE',IntegerType(),True),
              StructField('VEHICLE',IntegerType(),True),
              StructField('WATER_RIVER',IntegerType(),True),
              StructField('BICYCLE',IntegerType(),True),
              StructField('BUS',IntegerType(),True),
              StructField('CAR_STN_WAGON',IntegerType(),True),
              StructField('MOPED',IntegerType(),True),
              StructField('MOTOR_CYCLE',IntegerType(),True),
              StructField('SCHOOL_BUS',IntegerType(),True),
              StructField('SUV',IntegerType(),True),
              StructField('TAXI',IntegerType(),True),
              StructField('TRUCK',IntegerType(),True),
              StructField('VAN_OR_UTILITY',IntegerType(),True),
              StructField('PEDESTRIAN',IntegerType(),True),
              StructField('Sex',StringType(),True),
              StructField('Age',StringType(),True), 
              StructField('DUI',StringType(),True)]

final_struct = StructType(fields=data_schema)

merged = spark.read.load('merged.csv', format = 'csv', header = 'true', schema=final_struct)

In [9]:
merged.printSchema() #now types are correct

root
 |-- _c0: integer (nullable = true)
 |-- CRASH_YEAR: string (nullable = true)
 |-- CRASH_SEV: string (nullable = true)
 |-- FATAL_COUNT: integer (nullable = true)
 |-- SERIOUSINJ_COUNT: integer (nullable = true)
 |-- MINORINJ_COUNT: integer (nullable = true)
 |-- MULTI_VEH: string (nullable = true)
 |-- HOLIDAY: string (nullable = true)
 |-- TLA_NAME: string (nullable = true)
 |-- CRASH_LOCN1: string (nullable = true)
 |-- INTERSECTION: string (nullable = true)
 |-- JUNCTION_TYPE: string (nullable = true)
 |-- INTSN_MIDBLOCK: string (nullable = true)
 |-- FLAT_HILL: string (nullable = true)
 |-- ROAD_CHARACTER: string (nullable = true)
 |-- ROAD_CURVATURE: string (nullable = true)
 |-- ROAD_MARKINGS: string (nullable = true)
 |-- ROAD_SURFACE: string (nullable = true)
 |-- NUM_LANES: string (nullable = true)
 |-- SPD_LIM: string (nullable = true)
 |-- ADV_SPD: string (nullable = true)
 |-- TMP_SPD_LIM: string (nullable = true)
 |-- LIGHT: string (nullable = true)
 |-- STREET_LIGHT

In [10]:
merged.count() #correct number of rows

674811

In [11]:
#3.2 Data Clean
#Filtering out NA's and some Unknown observations (not all, as some geniuinly describe a crash)
filtered_data = merged.filter((merged.FLAT_HILL != 'Unknown') &
                                   (merged.ROAD_CURVATURE != 'Unknown') & 
                                   (merged.ROAD_SURFACE != 'Unknown') & 
                                   (merged.LIGHT != 'Unknown') &
                                   (merged.WEATHER_A != 'Unknown') &
                                   (merged.SPD_LIM != ''))

filtered_data.na.fill('None')

filtered_data.count()

665055

In [12]:
#Check for any NAs (optional, takes a while)

#from pyspark.sql.functions import isnan, when, count, col
#filtered_data.select([count(when(isnan(c), c)).alias(c) for c in filtered_data.columns]).show()

In [13]:
#Creating Response Varialbes
from pyspark.sql.functions import when

#Creating a count variable that is the sum of fatal and serious injuries
filtered_data = filtered_data.withColumn('FS',filtered_data['SERIOUSINJ_COUNT']+filtered_data['FATAL_COUNT'])

#Creating a flag variable if crash had any fatal and serious injuries
filtered_data = filtered_data.withColumn('FSL',when(filtered_data.FS > 0, 1).otherwise(0))

In [14]:
#CHECKPOINT (save) *OPTIONAL
#filtered_data.repartition(1).write.format('com.databricks.spark.csv').save("filtered_save",header = 'true')

In [15]:
#CHECKPOINT (load) *OPTIONAL

from pyspark.sql.types import (StructField,StringType,IntegerType,BooleanType,StructType)

data_schema2 = [StructField('_c0',IntegerType(),True),
              StructField('CRASH_YEAR',StringType(),True),
              StructField('CRASH_SEV',StringType(),True),
              StructField('FATAL_COUNT',IntegerType(),True),
              StructField('SERIOUSINJ_COUNT',IntegerType(),True),
              StructField('MINORINJ_COUNT',IntegerType(),True),
              StructField('MULTI_VEH',StringType(),True),
              StructField('HOLIDAY',StringType(),True),
              StructField('TLA_NAME',StringType(),True),
              StructField('CRASH_LOCN1',StringType(),True),
              StructField('INTERSECTION',StringType(),True),
              StructField('JUNCTION_TYPE',StringType(),True),
              StructField('INTSN_MIDBLOCK',StringType(),True),
              StructField('FLAT_HILL',StringType(),True),
              StructField('ROAD_CHARACTER',StringType(),True), 
              StructField('ROAD_CURVATURE',StringType(),True), 
              StructField('ROAD_MARKINGS',StringType(),True),
              StructField('ROAD_SURFACE',StringType(),True),
              StructField('NUM_LANES',StringType(),True),
              StructField('SPD_LIM',StringType(),True),
              StructField('ADV_SPD',StringType(),True),
              StructField('TMP_SPD_LIM',StringType(),True),
              StructField('LIGHT',StringType(),True),
              StructField('STREET_LIGHT',StringType(),True),
              StructField('WEATHER_A',StringType(),True),
              StructField('WEATHER_B',StringType(),True),
              StructField('ANIMALS',IntegerType(),True),
              StructField('BRIDGE',IntegerType(),True),
              StructField('CLIFF_BANK',IntegerType(),True),
              StructField('DEBRIS',IntegerType(),True),
              StructField('DITCH',IntegerType(),True),
              StructField('FENCE',IntegerType(),True),
              StructField('GUARD_RAIL',IntegerType(),True),
              StructField('HOUSE_OR_BLDG',IntegerType(),True),
              StructField('KERB',IntegerType(),True),
              StructField('OBJ_THROWN_DROPPED',IntegerType(),True),
              StructField('OVER_BANK',IntegerType(),True),
              StructField('PARKED_VEHICLE',IntegerType(),True),
              StructField('PHONE_BOX_ETC',IntegerType(),True),
              StructField('POST_OR_POLE',IntegerType(),True),
              StructField('ROADWORKS',IntegerType(),True),
              StructField('SLIP_OR_FLOOD',IntegerType(),True),
              StructField('STRAY_ANIMAL',IntegerType(),True),
              StructField('TRAFFIC_ISLAND',IntegerType(),True),
              StructField('TRAFFIC_SIGN',IntegerType(),True),
              StructField('TRAIN',IntegerType(),True),
              StructField('TREE',IntegerType(),True),
              StructField('VEHICLE',IntegerType(),True),
              StructField('WATER_RIVER',IntegerType(),True),
              StructField('BICYCLE',IntegerType(),True),
              StructField('BUS',IntegerType(),True),
              StructField('CAR_STN_WAGON',IntegerType(),True),
              StructField('MOPED',IntegerType(),True),
              StructField('MOTOR_CYCLE',IntegerType(),True),
              StructField('SCHOOL_BUS',IntegerType(),True),
              StructField('SUV',IntegerType(),True),
              StructField('TAXI',IntegerType(),True),
              StructField('TRUCK',IntegerType(),True),
              StructField('VAN_OR_UTILITY',IntegerType(),True),
              StructField('PEDESTRIAN',IntegerType(),True),
              StructField('Sex',StringType(),True),
              StructField('Age',StringType(),True), 
              StructField('DUI',StringType(),True),
              StructField('FS',IntegerType(),True),
              StructField('FSL',IntegerType(),True)]

final_struct2 = StructType(fields=data_schema2)


#df = spark.read.load('filtered.csv', format = 'csv', header = 'true', schema=final_struct2) #load from checkpoint if needed
#or
df = filtered_data


In [16]:
#6.1 Conduct exploratory analysis and discuss

#Creating a small (10k) test subset for algorithm selection
df_t = df.limit(10000).drop('_c0')
#df_t.columns

responce_vars = ['FS','FSL']
variable_list_emblem = ['Sex','Age','DUI']
df_t = df_t.select(responce_vars + variable_list_emblem)

df_t.show()

+---+---+------+-----+---+
| FS|FSL|   Sex|  Age|DUI|
+---+---+------+-----+---+
|  3|  1|Female|20-24|  F|
|  2|  1|Female|60-64|  F|
|  3|  1|  Male|40-44|  F|
|  6|  1|  Male|15-19|  T|
|  1|  1|Female|35-39|  F|
|  1|  1|Female|15-19|  T|
|  1|  1|  Male|45-49|  F|
|  1|  1|  Male|30-34|  F|
|  1|  1|  Male|35-39|  F|
|  2|  1|  Male|25-29|  F|
|  1|  1|  Male|45-49|  F|
|  3|  1|Female|65-69|  F|
|  1|  1|  Male|40-44|  T|
|  1|  1|Female|40-44|  T|
|  1|  1|  Male|45-49|  T|
|  2|  1|  Male|  80+|  F|
|  1|  1|  Male|30-34|  F|
|  2|  1|  Male|30-34|  F|
|  1|  1|  Male|65-69|  F|
|  1|  1|Female|  80+|  F|
+---+---+------+-----+---+
only showing top 20 rows



In [17]:
#Data indexing -> encoding -> assembly pipeline

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

categorical_columns= ['Sex', 'Age', 'DUI']

# The index of string vlaues multiple columns
indexers = [
    StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
    for c in categorical_columns]

# The encode of indexed vlaues multiple columns
encoders = [OneHotEncoder(dropLast=False,inputCol=indexer.getOutputCol(),
            outputCol="{0}_encoded".format(indexer.getOutputCol())) 
    for indexer in indexers]

# Vectorizing encoded values
assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders],outputCol="features")

#Pipeline
pipeline = Pipeline(stages=indexers + encoders+[assembler])

#Call to transform
model_t = pipeline.fit(df_t)
transformed_t = model_t.transform(df_t)

In [18]:
#Selection and test/train Split
final_t = transformed_t.select('FS','FSL','features')
final_t.show()
train_t, test_t = final_t.randomSplit([0.7,.3])

+---+---+--------------------+
| FS|FSL|            features|
+---+---+--------------------+
|  3|  1|(18,[1,2,16],[1.0...|
|  2|  1|(18,[1,11,16],[1....|
|  3|  1|(18,[0,6,16],[1.0...|
|  6|  1|(18,[0,4,17],[1.0...|
|  1|  1|(18,[1,5,16],[1.0...|
|  1|  1|(18,[1,4,17],[1.0...|
|  1|  1|(18,[0,8,16],[1.0...|
|  1|  1|(18,[0,7,16],[1.0...|
|  1|  1|(18,[0,5,16],[1.0...|
|  2|  1|(18,[0,3,16],[1.0...|
|  1|  1|(18,[0,8,16],[1.0...|
|  3|  1|(18,[1,13,16],[1....|
|  1|  1|(18,[0,6,17],[1.0...|
|  1|  1|(18,[1,6,17],[1.0...|
|  1|  1|(18,[0,8,17],[1.0...|
|  2|  1|(18,[0,12,16],[1....|
|  1|  1|(18,[0,7,16],[1.0...|
|  2|  1|(18,[0,7,16],[1.0...|
|  1|  1|(18,[0,13,16],[1....|
|  1|  1|(18,[1,12,16],[1....|
+---+---+--------------------+
only showing top 20 rows



In [19]:
#Logistic Regression vs GLM Gaussian vs GLM Poisson test
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.regression import GeneralizedLinearRegression

lr = LogisticRegression(featuresCol='features',labelCol='FSL')

glrg = GeneralizedLinearRegression(family="gaussian", link="identity", labelCol='FS', 
                                   featuresCol='features', maxIter=10, regParam=0.3, fitIntercept = True)

glrp = GeneralizedLinearRegression(family="poisson", link="log", labelCol='FS', 
                                   featuresCol='features', maxIter=10, regParam=0.3, fitIntercept = True)


In [20]:
#Fitting and Evaluating Logistic Regression

from pyspark.ml.evaluation import BinaryClassificationEvaluator

model_lr = lr.fit(train_t)
results_lr = model_lr.transform(test_t)
eval_lr = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='FSL')

summary_lr = model_lr.summary
AUC = eval_lr.evaluate(results_lr)
print("AUC:" + str(AUC))

AUC:0.5829316900397581


In [21]:
#Fitting and Evaluating GLM Gaussian

model_glrg = glrg.fit(train_t)
results_glrg = model_glrg.transform(test_t)
eval_glrg = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='FS')

summary_glrg = model_glrg.summary
AUC = eval_glrg.evaluate(results_glrg)
print("AUC:" + str(AUC))

# Print the coefficients and intercept for generalized linear regression model
print("Coefficients: " + str(model_glrg.coefficients))
print("Intercept: " + str(model_glrg.intercept))

# Summarize the model over the training set and print out some metrics
print("Coefficient Standard Errors: " + str(summary_glrg.coefficientStandardErrors))
#print("T Values: " + str(summary_glrg.tValues))
print("P Values: " + str(summary_glrg.pValues))
print("Dispersion: " + str(summary_glrg.dispersion))
#print("Null Deviance: " + str(summary_glrg.nullDeviance))
#print("Residual Degree Of Freedom Null: " + str(summary_glrg.residualDegreeOfFreedomNull))
print("Deviance: " + str(summary_glrg.deviance))
#print("Residual Degree Of Freedom: " + str(summary_glrg.residualDegreeOfFreedom))
print("AIC: " + str(summary_glrg.aic))
#print("Deviance Residuals: ")
#summary_glrg.residuals().show()

AUC:0.6066056662916736
Coefficients: [0.008173267972252929,-0.008173267972252853,-0.03292222194291687,-0.00479969758229749,-0.006605992753457887,-0.017860123348540972,0.01138630757600587,-0.013372948122147064,0.01208063423219443,0.015651129838158983,0.029857147598803386,0.008803853086716494,0.02274530104898066,-0.0015808928669626854,0.004452438336725364,0.08989853212993383,-0.18372236258616684,0.18372236258616648]
Intercept: 0.40779289590689205
Coefficient Standard Errors: [0.015572543527485423, 0.015572543527485423, 0.018306934485625037, 0.018890007369784936, 0.019115162267565825, 0.021827562090597438, 0.022189333221026775, 0.022895123955247, 0.023153033738146825, 0.024006909349673176, 0.024572415655036612, 0.025520418561882086, 0.025803006850404587, 0.027770147114092737, 0.03335051087208101, 0.051260799827331796, 0.02306396804543514, 0.023063968045435115, 0.029165808144352982]
P Values: [0.5997032201784012, 0.5997032201784012, 0.072164976492475, 0.7994361047922021, 0.7296618549501619

In [22]:
#Fitting and Evaluating GLM Poisson

model_glrp = glrp.fit(train_t)
results_glrp = model_glrp.transform(test_t)
eval_glrp = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='FS')

summary_glrp = model_glrp.summary
AUC = eval_glrp.evaluate(results_glrp)
print("AUC:" + str(AUC))

# Print the coefficients and intercept for generalized linear regression model
print("Coefficients: " + str(model_glrp.coefficients))
print("Intercept: " + str(model_glrp.intercept))

# Summarize the model over the training set and print out some metrics
print("Coefficient Standard Errors: " + str(summary_glrp.coefficientStandardErrors))
#print("T Values: " + str(summary_glrp.tValues))
print("P Values: " + str(summary_glrp.pValues))
print("Dispersion: " + str(summary_glrp.dispersion))
#print("Null Deviance: " + str(summary_glrp.nullDeviance))
#print("Residual Degree Of Freedom Null: " + str(summary_glrp.residualDegreeOfFreedomNull))
print("Deviance: " + str(summary_glrp.deviance))
#print("Residual Degree Of Freedom: " + str(summary_glrp.residualDegreeOfFreedom))
print("AIC: " + str(summary_glrp.aic))
#print("Deviance Residuals: ")
#summary_glrp.residuals().show()

AUC:0.6086519940846774
Coefficients: [0.024199027970040175,-0.02419902796987876,-0.04199291535883224,-0.005570384912688892,-0.0025359562488697536,-0.021691595651703414,0.013002378973480493,-0.015565344812879614,0.008449973334513536,0.009315874946861594,0.024551961996780967,0.004012231803960074,0.013814326681536282,-0.005527130685534327,-8.452267313886252e-05,0.019821102606597803,-0.31950996706601564,0.31950996706614854]
Intercept: -1.0920154089808356
Coefficient Standard Errors: [0.03483425067039704, 0.03483425067039701, 0.035736154855385366, 0.03609069693853523, 0.036103461787580615, 0.03752519350542378, 0.03750080212866343, 0.037879058073075116, 0.03794537562595549, 0.0382583601803345, 0.03829327408230291, 0.03860154483021337, 0.03871720061446873, 0.039151041669655724, 0.03989961972592014, 0.040939431582935074, 0.03588609925720829, 0.03588609925720844, 0.050769377719240436]
P Values: [0.48724919370518815, 0.48724919370809205, 0.2399618772285561, 0.8773384538675197, 0.944001555255136,

In [23]:
#Feature selection method test

from pyspark.ml.linalg import Vectors
from pyspark.sql.types import Row
from pyspark.ml.feature import VectorSlicer
from pyspark.ml.feature import ChiSqSelector

#Vector Slicer
slicer = VectorSlicer(inputCol="features", outputCol="features2", indices=[1])
output = slicer.transform(final_t)
output.select("features", "features2").show()

#ChiSq Selector
selector = ChiSqSelector(numTopFeatures=1, featuresCol="features",
                         outputCol="selectedFeatures", labelCol="FS")

result = selector.fit(final_t).transform(final_t)

print("ChiSqSelector output with top %d features selected" % selector.getNumTopFeatures())
result.show()

+--------------------+-------------+
|            features|    features2|
+--------------------+-------------+
|(18,[1,2,16],[1.0...|(1,[0],[1.0])|
|(18,[1,11,16],[1....|(1,[0],[1.0])|
|(18,[0,6,16],[1.0...|    (1,[],[])|
|(18,[0,4,17],[1.0...|    (1,[],[])|
|(18,[1,5,16],[1.0...|(1,[0],[1.0])|
|(18,[1,4,17],[1.0...|(1,[0],[1.0])|
|(18,[0,8,16],[1.0...|    (1,[],[])|
|(18,[0,7,16],[1.0...|    (1,[],[])|
|(18,[0,5,16],[1.0...|    (1,[],[])|
|(18,[0,3,16],[1.0...|    (1,[],[])|
|(18,[0,8,16],[1.0...|    (1,[],[])|
|(18,[1,13,16],[1....|(1,[0],[1.0])|
|(18,[0,6,17],[1.0...|    (1,[],[])|
|(18,[1,6,17],[1.0...|(1,[0],[1.0])|
|(18,[0,8,17],[1.0...|    (1,[],[])|
|(18,[0,12,16],[1....|    (1,[],[])|
|(18,[0,7,16],[1.0...|    (1,[],[])|
|(18,[0,7,16],[1.0...|    (1,[],[])|
|(18,[0,13,16],[1....|    (1,[],[])|
|(18,[1,12,16],[1....|(1,[0],[1.0])|
+--------------------+-------------+
only showing top 20 rows

ChiSqSelector output with top 1 features selected
+---+---+--------------------+------

In [24]:
#Trimmied data set with all features
dft = df.limit(10000).drop('_c0') 

In [25]:
#Full Model (~1min)


categorical_columns = ['CRASH_YEAR','MULTI_VEH','HOLIDAY','TLA_NAME','INTERSECTION','JUNCTION_TYPE',
                      'INTSN_MIDBLOCK','FLAT_HILL','ROAD_CHARACTER','ROAD_CURVATURE','ROAD_MARKINGS',
                      'ROAD_SURFACE','NUM_LANES','SPD_LIM','ADV_SPD','TMP_SPD_LIM','LIGHT',
                      'STREET_LIGHT','WEATHER_A','WEATHER_B','Sex','Age','DUI']
numerical_columns = ['ANIMALS','BRIDGE','CLIFF_BANK','DEBRIS','DITCH','FENCE','GUARD_RAIL','HOUSE_OR_BLDG',
                     'KERB','OBJ_THROWN_DROPPED','OVER_BANK','PARKED_VEHICLE','PHONE_BOX_ETC','POST_OR_POLE',
                     'ROADWORKS','SLIP_OR_FLOOD','STRAY_ANIMAL','TRAFFIC_ISLAND','TRAFFIC_SIGN','TRAIN',
                     'TREE','VEHICLE','WATER_RIVER','BICYCLE','BUS','CAR_STN_WAGON','MOPED','MOTOR_CYCLE',
                     'SCHOOL_BUS','SUV','TAXI','TRUCK','VAN_OR_UTILITY','PEDESTRIAN']

# The index of string vlaues multiple columns
indexers = [
    StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
    for c in categorical_columns]

# The encode of indexed vlaues multiple columns
encoders = [OneHotEncoder(dropLast=False,inputCol=indexer.getOutputCol(),
            outputCol="{0}_encoded".format(indexer.getOutputCol())) 
    for indexer in indexers]

# Vectorizing encoded values
assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders] + numerical_columns,outputCol="features")

#Pipeline
pipeline = Pipeline(stages=indexers + encoders+[assembler])

#Call to transform
model_f = pipeline.fit(dft)
transformed_f = model_f.transform(dft)

#Selection and test/train Split
final_f = transformed_f.select('FS','features')
train_f, test_f = final_f.randomSplit([0.7,.3])
train_f.show()

+---+--------------------+
| FS|            features|
+---+--------------------+
|  0|(228,[0,1,9,14,81...|
|  0|(228,[0,1,9,14,81...|
|  0|(228,[0,1,9,14,81...|
|  0|(228,[0,1,9,14,81...|
|  0|(228,[0,1,9,14,81...|
|  0|(228,[0,1,9,14,81...|
|  0|(228,[0,1,9,14,81...|
|  0|(228,[0,1,9,14,81...|
|  0|(228,[0,1,9,14,81...|
|  0|(228,[0,1,9,14,81...|
|  0|(228,[0,1,9,14,81...|
|  0|(228,[0,1,9,14,81...|
|  0|(228,[0,1,9,14,81...|
|  0|(228,[0,1,9,14,81...|
|  0|(228,[0,1,9,14,81...|
|  0|(228,[0,1,9,14,81...|
|  0|(228,[0,1,9,14,81...|
|  0|(228,[0,1,9,14,81...|
|  0|(228,[0,1,9,14,81...|
|  0|(228,[0,1,9,14,81...|
+---+--------------------+
only showing top 20 rows



In [26]:
#Fitting and Evaluating Full Model (~4min)

model_glrp = glrp.fit(train_f)
results_glrp = model_glrp.transform(test_f)
eval_glrp = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='FS')

summary_glrp = model_glrp.summary
AUC = eval_glrp.evaluate(results_glrp)
print("AUC:" + str(AUC))

# Print the coefficients and intercept for generalized linear regression model
#print("Coefficients: " + str(model_glrp.coefficients))
print("Intercept: " + str(model_glrp.intercept))

# Summarize the model over the training set and print out some metrics
#print("Coefficient Standard Errors: " + str(summary_glrp.coefficientStandardErrors))
#print("T Values: " + str(summary_glrp.tValues))
#print("P Values: " + str(summary_glrp.pValues))
#print("Dispersion: " + str(summary_glrp.dispersion))
#print("Null Deviance: " + str(summary_glrp.nullDeviance))
#print("Residual Degree Of Freedom Null: " + str(summary_glrp.residualDegreeOfFreedomNull))
#print("Deviance: " + str(summary_glrp.deviance))
#print("Residual Degree Of Freedom: " + str(summary_glrp.residualDegreeOfFreedom))
print("AIC: " + str(summary_glrp.aic))
#print("Deviance Residuals: ")
#summary_glrp.residuals().show()

AUC:0.7671447485684412
Intercept: -0.9550110032756095
AIC: 8841.511578903093


In [27]:
#ChiSq Selector
selector = ChiSqSelector(numTopFeatures=25, featuresCol="features",
                         outputCol="selectedFeatures", labelCol="FS")

result = selector.fit(final_f).transform(final_f)

print("ChiSqSelector output with top %d features selected" % selector.getNumTopFeatures())
result.show()
#result.show(20, False) #Full view, hard to read

ChiSqSelector output with top 25 features selected
+---+--------------------+--------------------+
| FS|            features|    selectedFeatures|
+---+--------------------+--------------------+
|  3|(228,[0,2,10,14,8...|(25,[1,4,6,9,12,1...|
|  2|(228,[0,1,10,22,8...|(25,[0,7,8,10,13,...|
|  3|(228,[0,1,10,27,8...|(25,[0,7,8,10,11,...|
|  6|(228,[0,1,10,33,8...|(25,[0,7,8,10,13,...|
|  1|(228,[0,1,9,79,82...|(25,[0,6,9,10,16,...|
|  1|(228,[0,1,9,14,81...|(25,[0,4,7,8,11,1...|
|  1|(228,[0,2,9,33,81...|(25,[1,7,8,13,19,...|
|  1|(228,[0,3,9,14,81...|(25,[2,4,8,10,12,...|
|  1|(228,[0,3,9,19,81...|(25,[2,7,8,10,12,...|
|  2|(228,[0,2,9,14,81...|(25,[1,4,7,8,17,2...|
|  1|(228,[0,4,9,32,81...|(25,[8,10,13,19,2...|
|  3|(228,[0,2,9,50,81...|(25,[1,7,8,13,16,...|
|  1|(228,[0,2,9,14,81...|(25,[1,4,7,8,13,1...|
|  1|(228,[0,1,9,18,82...|(25,[0,6,9,17,21,...|
|  1|(228,[0,1,9,14,81...|(25,[0,4,7,8,13,1...|
|  2|(228,[0,1,9,14,82...|(25,[0,4,6,9,10,1...|
|  1|(228,[0,3,9,14,82...|(25,[2,4,6,

In [28]:
#7 Data Mining
#Final Model (~2min)

categorical_columns = ['HOLIDAY','JUNCTION_TYPE','ROAD_CHARACTER','ROAD_CURVATURE',
                       'SPD_LIM','LIGHT','Sex','Age','DUI']

#categorical_columns = ['TLA_NAME'] #Due to large number of levels it's best to run this seperatly

numerical_columns = ['BRIDGE','CLIFF_BANK','DITCH','FENCE','OVER_BANK','POST_OR_POLE','TRAFFIC_SIGN',
                     'TRAIN','TREE','BICYCLE','BUS','MOPED','MOTOR_CYCLE','WATER_RIVER','SUV',
                     'TRUCK','VAN_OR_UTILITY','PEDESTRIAN']

# The index of string vlaues multiple columns
indexers = [
    StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
    for c in categorical_columns]

# The encode of indexed vlaues multiple columns
encoders = [OneHotEncoder(dropLast=False,inputCol=indexer.getOutputCol(),
            outputCol="{0}_encoded".format(indexer.getOutputCol())) 
    for indexer in indexers]

# Vectorizing encoded values
assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders] + numerical_columns, outputCol="features")

#Pipeline
pipeline = Pipeline(stages=indexers + encoders+[assembler])

#Call to transform
model = pipeline.fit(df)
transformed = model.transform(df)

#Selection and test/train Split
final = transformed.select('FS','features')
train, test = final.randomSplit([0.9,.1])
train.show()

+---+--------------------+
| FS|            features|
+---+--------------------+
|  0|(82,[0,5,12,16,20...|
|  0|(82,[0,5,12,16,20...|
|  0|(82,[0,5,12,16,20...|
|  0|(82,[0,5,12,16,20...|
|  0|(82,[0,5,12,16,20...|
|  0|(82,[0,5,12,16,20...|
|  0|(82,[0,5,12,16,20...|
|  0|(82,[0,5,12,16,20...|
|  0|(82,[0,5,12,16,20...|
|  0|(82,[0,5,12,16,20...|
|  0|(82,[0,5,12,16,20...|
|  0|(82,[0,5,12,16,20...|
|  0|(82,[0,5,12,16,20...|
|  0|(82,[0,5,12,16,20...|
|  0|(82,[0,5,12,16,20...|
|  0|(82,[0,5,12,16,20...|
|  0|(82,[0,5,12,16,20...|
|  0|(82,[0,5,12,16,20...|
|  0|(82,[0,5,12,16,20...|
|  0|(82,[0,5,12,16,20...|
+---+--------------------+
only showing top 20 rows



In [29]:
#Fitting and Evaluating Final Model (~15min with full dataset)

model_glrp = glrp.fit(train)
results_glrp = model_glrp.transform(test)
eval_glrp = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='FS')

summary_glrp = model_glrp.summary
AUC = eval_glrp.evaluate(results_glrp)
print("AUC:" + str(AUC))

# Print the coefficients and intercept for generalized linear regression model
print("Coefficients: " + str(model_glrp.coefficients))
print("Intercept: " + str(model_glrp.intercept))

# Summarize the model over the training set and print out some metrics
print("Coefficient Standard Errors: " + str(summary_glrp.coefficientStandardErrors))
print("P Values: " + str(summary_glrp.pValues))
print("Dispersion: " + str(summary_glrp.dispersion))
print("AIC: " + str(summary_glrp.aic))

AUC:0.7471969567836736
Coefficients: [-0.028136810422688348,0.01937140050436939,0.002011382699577969,0.0025027798476085616,0.004251247371048675,0.12704157106332828,-0.016162128133622874,-0.00594384881960432,-0.022519201123155832,-0.06297712807056198,-0.0136459432050499,-0.00579332170828007,0.010610041506717877,0.009096858865215544,-0.025580176163049133,0.005873275791373888,-0.13874619517755174,0.047364074844803476,0.08870028421536819,0.002681836121231098,-0.2668763781621775,0.3019734836597745,-0.006095590888973929,0.015970416715005136,-0.03145545337854414,-0.012112675115051595,-0.0067750900815662235,-0.002215249436427502,0.004069589469237835,-0.0012295933560959285,0.0012262496788667955,8.953881544077225e-05,0.0016623743350506882,0.00184364746234318,-3.567391834077336e-05,-2.7368754940059366e-05,-7.76909013807928e-05,8.664259947364489e-05,-3.196093093393474e-05,3.837905917560747e-05,-2.2258452612387222e-05,-5.338413763649706e-06,0.011914941342896598,-0.08070629673475369,0.06552591448381