# INFOSYS 722 - BDAS ITERATION

# Predicting Crash Severity On New Zealand Roads

Ferdinand Djohar (adjo446)

## PREREQUISITES
Initialise and start a Spark session.

In [1]:
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('bdas').getOrCreate()

## DATA UNDERSTANDING & DATA PREPARATION

### 2.2 Data Description
The data set consists of 104,032 records in total, sourced from three CSV files (one file per year), with 38 attributes.

The documentation provided by NZTA tells us that most variables in the data set are either categorical or numeric. Some variables are derived from other variables: for example, **URBAN** is derived from **SPD_LIM**, taking the value *'Urban'* where **SPD_LIM** is less than 80 and *'Open Road'* where **SPD_LIM** is greater than or equal to 80.
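
The derivation rule described above can be sketched in plain Python. This is a minimal illustration, not NZTA's implementation: the function name `derive_urban` is hypothetical, and the labels follow the wording of this section (the stored spelling may differ, since the preview in section 2.3 shows *Openroad*).

```python
def derive_urban(spd_lim):
    """Return the URBAN label implied by a speed limit (SPD_LIM)."""
    # Below 80 is 'Urban'; 80 and above is 'Open Road' (rule as stated in the text)
    return "Urban" if spd_lim < 80 else "Open Road"

for limit in (50, 70, 80, 100):
    print(limit, derive_urban(limit))
```

The boundary case matters here: a limit of exactly 80 falls on the open-road side of the rule.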

Please refer to *Appendix A* in the report for a more detailed list of the data set's attributes, extracted from the NZTA documentation.

In [2]:
df = spark.read.csv("Data/*.csv", header = True, inferSchema = True)

# Print out the dimension of the data frame
print(df.count()) #rows
print(len(df.columns)) #columns

104032
38


### 2.3 Data Exploration

View the first few records of the data set.

In [3]:
df.limit(5).toPandas()

| | CRASH_YEAR | CRASH_SEV | MULTI_VEH | HOLIDAY | LG_REGION_DESC | EASTING | NORTHING | CRASH_LOCN1 | CRASH_LOCN2 | OUTDTD_LOCN_DESC | ... | ROAD_SURFACE | ROAD_WET | NUM_LANES | TRAFFIC_CTRL | SPD_LIM | URBAN | LIGHT | STREET_LIGHT | WEATHER_A | WEATHER_B |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2017 | F | Vehicle(s)+Pedestrian(s) | Christmas/New Year | Northland | 1688160 | 6101368 | KERIKERI INLET ROAD | PA ROAD | Current location | ... | Sealed | Dry | 2 | | 80 | Openroad | Dark | On | Fine | Unknown |
| 1 | 2017 | F | Vehicle(s)+Pedestrian(s) | Christmas/New Year | Northland | 1642352 | 6126632 | ORURU ROAD | RYDER ROAD | Current location | ... | Sealed | Dry | 2 | | 100 | Openroad | Dark | | Fine | Unknown |
| 2 | 2017 | F | Multi vehicle | Christmas/New Year | Bay of Plenty | 1942531 | 5794171 | THORNTON ROAD | POWDRELL ROAD | Current location | ... | Sealed | Dry | 2 | | 100 | Openroad | Overcast | | Fine | Unknown |
| 3 | 2017 | F | Multi vehicle | | Canterbury | 1564498 | 5161943 | SH 75 | RIFLE RANGE CV | Current location | ... | Sealed | Wet | 2 | | 100 | Openroad | Overcast | Unknown | Fine | Strong Wind |
| 4 | 2017 | F | Vehicle(s)+Pedestrian(s) | | Otago | 1313029 | 4986865 | SH 8 | AIRPORT ROAD | Current location | ... | Sealed | Dry | 2 | | 100 | Openroad | Overcast | Unknown | Fine | Unknown |


View the schema of the data set.

In [4]:
df.printSchema()

root
 |-- CRASH_YEAR: integer (nullable = true)
 |-- CRASH_SEV: string (nullable = true)
 |-- MULTI_VEH: string (nullable = true)
 |-- HOLIDAY: string (nullable = true)
 |-- LG_REGION_DESC: string (nullable = true)
 |-- EASTING: integer (nullable = true)
 |-- NORTHING: integer (nullable = true)
 |-- CRASH_LOCN1: string (nullable = true)
 |-- CRASH_LOCN2: string (nullable = true)
 |-- OUTDTD_LOCN_DESC: string (nullable = true)
 |-- CRASH_RP_RS: integer (nullable = true)
 |-- INTERSECTION: string (nullable = true)
 |-- JUNCTION_TYPE: string (nullable = true)
 |-- CR_RD_SIDE_RD: integer (nullable = true)
 |-- CRASH_DIRN_DESC: string (nullable = true)
 |-- CRASH_DIST: integer (nullable = true)
 |-- CRASH_RP_DIRN_DESC: string (nullable = true)
 |-- DIRN_ROLE1_DESC: string (nullable = true)
 |-- CRASH_RP_DISP: integer (nullable = true)
 |-- CRASH_SH_DESC: string (nullable = true)
 |-- CRASH_RP_SH: string (nullable = true)
 |-- CRASH_RP_NEWS_DESC: string (nullable = true)
 |-- INTSN_MIDBLOCK:

### 2.4 Data Quality Verification
The quality of the data set appears to be fairly high. Only three of the 38 variables have missing data (see the result of the code chunk below).

We also noticed that some variables have a high proportion of *"Unknown"* and zero values. Such variables may be identified as the least important ones and therefore be excluded in later stages of our work.

Apart from the minor issues mentioned above, we found nothing of concern regarding the quality of the data set.

In [5]:
import pandas as pd

from pyspark.sql.functions import isnan, when, count, col, regexp_replace

# Use spark to calculate the number of missing values for each column and convert the result to pandas.
# Note: the result will contain a single row with 38 columns, small enough to be handled by pandas in memory
missing_values = pd.concat(
    [
        df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).toPandas().transpose(),
        df.select([count(when(col(c) == " ", c)).alias(c) for c in df.columns]).toPandas().transpose()
    ],
    axis = 1
)

missing_values.columns = ["null", "whitespace"]

missing_values["total"] = df.count()
missing_values["missing"] = missing_values["null"] + missing_values["whitespace"]
missing_values["percent"] = missing_values["missing"] / missing_values["total"] * 100

In [6]:
# Show columns that have missing values
missing_values.loc[missing_values['percent'] > 0].sort_values("percent", ascending = False)

| | null | whitespace | total | missing | percent |
|---|---|---|---|---|---|
| CRASH_DIRN_DESC | 0 | 35316 | 104032 | 35316 | 33.947247 |
| ROAD_LANE | 0 | 54 | 104032 | 54 | 0.051907 |
| CR_RD_SIDE_RD | 1 | 0 | 104032 | 1 | 0.000961 |
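
The logic behind this summary can be shown on a handful of rows in plain Python. This is a minimal sketch, not the Spark code itself: the helper `count_missing` and the sample rows are made up for illustration. Like the check above, it only treats a value of exactly one space as white space, so empty strings or longer runs of spaces would not be counted.

```python
import math

def count_missing(rows, columns):
    """Count nulls/NaNs and single-space values per column."""
    summary = {}
    for c in columns:
        values = [row.get(c) for row in rows]
        nulls = sum(
            1 for v in values
            if v is None or (isinstance(v, float) and math.isnan(v))
        )
        blanks = sum(1 for v in values if v == " ")
        missing = nulls + blanks
        summary[c] = {
            "null": nulls,
            "whitespace": blanks,
            "missing": missing,
            "percent": 100.0 * missing / len(rows),
        }
    return summary

# Made-up sample rows, not taken from the crash data
sample = [
    {"CRASH_DIRN_DESC": "North", "CR_RD_SIDE_RD": 1},
    {"CRASH_DIRN_DESC": " ", "CR_RD_SIDE_RD": None},
    {"CRASH_DIRN_DESC": " ", "CR_RD_SIDE_RD": 2},
    {"CRASH_DIRN_DESC": "East", "CR_RD_SIDE_RD": 1},
]
result = count_missing(sample, ["CRASH_DIRN_DESC", "CR_RD_SIDE_RD"])
print(result["CRASH_DIRN_DESC"]["percent"])  # 50.0
print(result["CR_RD_SIDE_RD"]["percent"])    # 25.0
```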


### 3.1 Data Selection
A variable called **OUTDTD_LOCN_DESC** specifies whether a crash record has an outdated location. A crash is said to have an outdated location when the road may have moved or no longer exists. We decided to exclude outdated records from our data set because their data may no longer be valid.

In [7]:
# Exclude outdated data
crash_data = df.filter("OUTDTD_LOCN_DESC <> 'Outdated Location'")

print(crash_data.count())

104006
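
The behaviour of the filter can be sketched in plain Python. This is an illustration under stated assumptions: `keep_current` and the sample rows are hypothetical. One detail worth knowing is that in SQL a comparison with a null value is neither true nor false, so a `<>` filter also drops rows where the column is null. Section 2.4 found no nulls in **OUTDTD_LOCN_DESC**, so this does not affect the count here.

```python
def keep_current(rows, column="OUTDTD_LOCN_DESC", outdated="Outdated Location"):
    """Keep rows whose value is known and differs from the outdated marker."""
    # A missing value is dropped too, mirroring how SQL treats NULL <> 'x'
    return [r for r in rows if r.get(column) is not None and r[column] != outdated]

# Made-up sample rows, not taken from the crash data
sample = [
    {"id": 1, "OUTDTD_LOCN_DESC": "Current location"},
    {"id": 2, "OUTDTD_LOCN_DESC": "Outdated Location"},
    {"id": 3, "OUTDTD_LOCN_DESC": None},
]
kept = keep_current(sample)
print([r["id"] for r in kept])  # [1]

# Row accounting from the counts reported in this notebook
removed = 104032 - 104006
print(removed)  # 26
```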


### 3.2 Data Cleansing
In section 2.4, we identified three variables with missing values. **CRASH_DIRN_DESC** and **ROAD_LANE** both contain white-space values, while **CR_RD_SIDE_RD** has one null value. We decided to replace the white-space values with the text constant *"Unknown"* and to discard records with null values.

In [8]:
# Drop records with null values
crash_data = crash_data.na.drop()

# Impute white spaces with text constant "Unknown"
# The pattern "^ $" only matches values that consist of exactly one space,
# so spaces inside longer values are left untouched
crash_data = crash_data.withColumn("ROAD_LANE", regexp_replace(crash_data["ROAD_LANE"], "^ $", "Unknown"))
crash_data = crash_data.withColumn("CRASH_DIRN_DESC", regexp_replace(crash_data["CRASH_DIRN_DESC"], "^ $", "Unknown"))

# Check the number of remaining rows
print(crash_data.count())

104005
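
One point to be aware of when replacing white space by pattern: a pattern of a single space matches every space in a value, not only values that consist of a single space. The two columns cleaned here hold single-word values, so the counts are unaffected either way. The sketch below uses Python's `re` module as a stand-in for Spark's *regexp_replace* to show the difference; the function names and the value "Open Road" are hypothetical.

```python
import re

def impute_blank(value, placeholder="Unknown"):
    """Replace a value that is exactly one space; leave everything else untouched."""
    return re.sub(r"^ $", placeholder, value)

def impute_any_space(value, placeholder="Unknown"):
    """Replace every space in the value (unanchored pattern)."""
    return re.sub(r" ", placeholder, value)

print(impute_blank(" "))              # Unknown
print(impute_blank("North"))          # North
print(impute_blank("Open Road"))      # Open Road
print(impute_any_space("Open Road"))  # OpenUnknownRoad
```

Anchoring the pattern keeps the replacement safe if the same step is later applied to columns whose values contain spaces.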


In [9]:
crash_data.groupBy("ROAD_LANE").count().show()
crash_data.groupBy("CRASH_DIRN_DESC").count().show()

+---------+-----+
|ROAD_LANE|count|
+---------+-----+
|  Unknown|   54|
|        O|  674|
|        1|14251|
|        2|89026|
+---------+-----+

+---------------+-----+
|CRASH_DIRN_DESC|count|
+---------------+-----+
|        Unknown|35295|
|          South|20977|
|           East|13574|
|           West|13554|
|          North|20605|
+---------------+-----+



### 3.3 New Data Construction
In an attempt to reduce the granularity within our data set, we decided to reclassify two variables (**LIGHT** and **CRASH_SEV**) into two new variables (**DARK_LIGHT** and **FATAL_OR_SERIOUS**).

**DARK_LIGHT** is a categorical variable derived from **LIGHT** which tells us how much natural light there was when the crash happened. Its possible values are *Light*, *Dark* or *Unknown*.

**FATAL_OR_SERIOUS** is a boolean flag derived from **CRASH_SEV** which tells us whether a record is a fatal or serious-injury crash.

In [10]:
crash_data = crash_data.withColumn(
    "DARK_LIGHT",
    when(crash_data["LIGHT"].isin(["Bright Sun", "Overcast"]), "Light")
    .when(crash_data["LIGHT"].isin(["Dark", "Twilight"]), "Dark")
    .otherwise("Unknown")
)

crash_data = crash_data.withColumn(
    "FATAL_OR_SERIOUS",
    when(
        crash_data["CRASH_SEV"].isin(["F", "S"]),
        1
    ).otherwise(0)
)

In [11]:
crash_data.groupBy("DARK_LIGHT").count().show()
crash_data.groupBy("FATAL_OR_SERIOUS").count().show()

+----------+-----+
|DARK_LIGHT|count|
+----------+-----+
|   Unknown|  214|
|     Light|69535|
|      Dark|34256|
+----------+-----+

+----------------+-----+
|FATAL_OR_SERIOUS|count|
+----------------+-----+
|               1| 6915|
|               0|97090|
+----------------+-----+
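
The counts above imply that the target classes are heavily imbalanced. As a quick worked calculation (plain Python, using only the two counts printed above):

```python
# Counts of FATAL_OR_SERIOUS as printed above
counts = {1: 6915, 0: 97090}

total = sum(counts.values())
positive_share = counts[1] / total
majority_share = counts[0] / total

print(total)                            # 104005
print(round(positive_share * 100, 2))   # 6.65
print(round(majority_share * 100, 2))   # 93.35
```

Roughly 6.65% of records are fatal or serious crashes. A model that always predicted 0 would therefore already be about 93.35% accurate, which is worth keeping in mind when accuracy is used as an evaluation measure.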



### 3.4 Data Integration
As mentioned in section 2.2, the data set used in this work was sourced from three CSV files. We performed the data integration step by importing all three files with Spark's CSV reader (*spark.read.csv*), which combined them into a single data frame.

We let the CSV reader determine the data type of each column automatically by setting the *inferSchema* flag to *True*, and confirmed that each column was assigned the correct data type.

No issues were encountered during the execution of this step.
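
To give an intuition for what schema inference does, here is a deliberately simplified sketch in plain Python. It is not Spark's actual algorithm: the function `infer_type` is hypothetical and only distinguishes integers, doubles and strings. The idea is that a column receives the narrowest type that every one of its values can be parsed as.

```python
def infer_type(values):
    """Return 'integer', 'double' or 'string' for a column of raw CSV strings."""
    def all_parse(cast):
        for v in values:
            if v == "":
                continue  # treat empty fields as nulls, which fit any type
            try:
                cast(v)
            except ValueError:
                return False
        return True

    if all_parse(int):
        return "integer"
    if all_parse(float):
        return "double"
    return "string"

print(infer_type(["2017", "2017"]))        # integer
print(infer_type(["1.5", "2"]))            # double
print(infer_type(["Northland", "Otago"]))  # string
print(infer_type(["1", "2", "O", " "]))    # string
```

The last call mirrors **ROAD_LANE**: a single non-numeric value is enough for the whole column to be treated as a string, which is why it is worth confirming the inferred types after loading.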

### 3.5 Data Formatting
Because the new variables (**DARK_LIGHT** and **FATAL_OR_SERIOUS**) are derived from existing variables (**LIGHT** and **CRASH_SEV**), the source variables are now redundant and are excluded from our data set. Retaining **CRASH_SEV** in particular would leak the target, since **FATAL_OR_SERIOUS** is computed directly from it.

In [12]:
# Exclude the variables from which DARK_LIGHT and FATAL_OR_SERIOUS were derived
crash_data = crash_data.drop("LIGHT", "CRASH_SEV")

# Confirm data types of new variables
crash_data.printSchema()

root
 |-- CRASH_YEAR: integer (nullable = true)
 |-- MULTI_VEH: string (nullable = true)
 |-- HOLIDAY: string (nullable = true)
 |-- LG_REGION_DESC: string (nullable = true)
 |-- EASTING: integer (nullable = true)
 |-- NORTHING: integer (nullable = true)
 |-- CRASH_LOCN1: string (nullable = true)
 |-- CRASH_LOCN2: string (nullable = true)
 |-- OUTDTD_LOCN_DESC: string (nullable = true)
 |-- CRASH_RP_RS: integer (nullable = true)
 |-- INTERSECTION: string (nullable = true)
 |-- JUNCTION_TYPE: string (nullable = true)
 |-- CR_RD_SIDE_RD: integer (nullable = true)
 |-- CRASH_DIRN_DESC: string (nullable = true)
 |-- CRASH_DIST: integer (nullable = true)
 |-- CRASH_RP_DIRN_DESC: string (nullable = true)
 |-- DIRN_ROLE1_DESC: string (nullable = true)
 |-- CRASH_RP_DISP: integer (nullable = true)
 |-- CRASH_SH_DESC: string (nullable = true)
 |-- CRASH_RP_SH: string (nullable = true)
 |-- CRASH_RP_NEWS_DESC: string (nullable = true)
 |-- INTSN_MIDBLOCK: string (nullable = true)
 |-- FLAT_HILL:

We then encode all categorical variables in our data set using the *StringIndexer* class from the *pyspark.ml.feature* package. This encoding is needed because most machine learning models cannot handle categorical variables unless they are converted to numerical values.

In [13]:
from pyspark.ml.feature import VectorAssembler, VectorIndexer, OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline

In [14]:
# Get list of categorical columns
cat_cols = [f.name for f in crash_data.schema.fields if f.dataType.simpleString() == "string"]

# Get list of non-categorical columns and remove our target variable from the list
noncat_cols = [f.name for f in crash_data.schema.fields if f.dataType.simpleString() != "string"]
noncat_cols.remove("FATAL_OR_SERIOUS")

# Build StringIndexer stages
strIdx = [StringIndexer(inputCol = c, outputCol = "STRIDX_" + c) for c in cat_cols]

pipe = Pipeline(stages = strIdx)

indexed_data = pipe.fit(crash_data).transform(crash_data)
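
To illustrate what the indexing stage produces, the sketch below mimics *StringIndexer*'s default behaviour in plain Python: labels are ordered by frequency, so the most frequent label gets index 0. The function `fit_string_index` is hypothetical, and breaking ties alphabetically is an assumption made here for determinism. The counts are the **ROAD_LANE** counts from section 3.2.

```python
from collections import Counter

def fit_string_index(values):
    """Map each label to a float index, most frequent label first."""
    counts = Counter(values)
    # Sort by descending frequency; ties broken alphabetically (assumption)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# ROAD_LANE counts as printed in section 3.2
road_lane_counts = {"2": 89026, "1": 14251, "O": 674, "Unknown": 54}
values = [label for label, n in road_lane_counts.items() for _ in range(n)]

mapping = fit_string_index(values)
print(mapping)  # {'2': 0.0, '1': 1.0, 'O': 2.0, 'Unknown': 3.0}
```

Note that the resulting indices reflect frequency only and carry no meaningful order. Tree-based models can usually work with them directly, whereas models that interpret the values numerically generally need a further step such as one-hot encoding.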