# Road Safety Dataset
## Logistic Regression

by Bernardo Augusto and Miguel Cisneiros

In [2]:
# Imports
from pyspark import SparkFiles

import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
import plotly.express as px



from pyspark.sql import functions as F

from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


from sklearn import metrics
from sklearn.metrics import classification_report 


import findspark
findspark.init()
import pyspark # Call this only after findspark.init()
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

In [3]:
# Load the data
dataset = spark.read.csv("file://"+ SparkFiles.get("/Users/bernardoaugusto/Desktop/3º ano/1º semestre/Big Data/Project/2/Road Safety Data - Accidents 2019.csv"),header=True, sep=",", inferSchema=True)

#show top 10 rows
dataset.show(10) 

+--------------+---------------------+----------------------+---------+---------+------------+-----------------+------------------+--------------------+----------+-----------+-----+--------------------------+-------------------------+--------------+---------------+---------+-----------+---------------+----------------+--------------+---------------+---------------------------------+---------------------------------------+----------------+------------------+-----------------------+--------------------------+-------------------+-------------------+-------------------------------------------+-------------------------+
|Accident_Index|Location_Easting_OSGR|Location_Northing_OSGR|Longitude| Latitude|Police_Force|Accident_Severity|Number_of_Vehicles|Number_of_Casualties|      Date|Day_of_Week| Time|Local_Authority_(District)|Local_Authority_(Highway)|1st_Road_Class|1st_Road_Number|Road_Type|Speed_limit|Junction_Detail|Junction_Control|2nd_Road_Class|2nd_Road_Number|Pedestrian_Crossing-Human_

In [4]:
# Drop the -1 values from the spark dataset
dataset = dataset.filter((dataset["Light_Conditions"] != -1) & (dataset["Junction_Control"] != -1) & (dataset["2nd_Road_Class"] != -1)
                        & (dataset["Pedestrian_Crossing-Human_Control"] != -1) & (dataset["Pedestrian_Crossing-Physical_Facilities"] != -1)
                        & (dataset["Road_Surface_COnditions"] != -1) & (dataset["Special_Conditions_at_Site"] != -1)
                        & (dataset["Carriageway_Hazards"] != -1)
                        )

In [5]:
dataset.na.drop("all")

DataFrame[Accident_Index: string, Location_Easting_OSGR: int, Location_Northing_OSGR: int, Longitude: double, Latitude: double, Police_Force: int, Accident_Severity: int, Number_of_Vehicles: int, Number_of_Casualties: int, Date: string, Day_of_Week: int, Time: string, Local_Authority_(District): int, Local_Authority_(Highway): string, 1st_Road_Class: int, 1st_Road_Number: int, Road_Type: int, Speed_limit: int, Junction_Detail: int, Junction_Control: int, 2nd_Road_Class: int, 2nd_Road_Number: int, Pedestrian_Crossing-Human_Control: int, Pedestrian_Crossing-Physical_Facilities: int, Light_Conditions: int, Weather_Conditions: int, Road_Surface_Conditions: int, Special_Conditions_at_Site: int, Carriageway_Hazards: int, Urban_or_Rural_Area: int, Did_Police_Officer_Attend_Scene_of_Accident: int, LSOA_of_Accident_Location: string]

In [6]:
# drop the indexes and the correlated variables
dataset = dataset.drop("Accident_Index", "Location_Easting_OSGR", "Location_Northing_OSGR", "Police_Force", "LSOA_of_Accident_Location", 
                      "Local_Authority_(Highway)", "Time", "Date", "Longitude", "Latitude")

In [7]:
# Schema
dataset.printSchema()

root
 |-- Accident_Severity: integer (nullable = true)
 |-- Number_of_Vehicles: integer (nullable = true)
 |-- Number_of_Casualties: integer (nullable = true)
 |-- Day_of_Week: integer (nullable = true)
 |-- Local_Authority_(District): integer (nullable = true)
 |-- 1st_Road_Class: integer (nullable = true)
 |-- 1st_Road_Number: integer (nullable = true)
 |-- Road_Type: integer (nullable = true)
 |-- Speed_limit: integer (nullable = true)
 |-- Junction_Detail: integer (nullable = true)
 |-- Junction_Control: integer (nullable = true)
 |-- 2nd_Road_Class: integer (nullable = true)
 |-- 2nd_Road_Number: integer (nullable = true)
 |-- Pedestrian_Crossing-Human_Control: integer (nullable = true)
 |-- Pedestrian_Crossing-Physical_Facilities: integer (nullable = true)
 |-- Light_Conditions: integer (nullable = true)
 |-- Weather_Conditions: integer (nullable = true)
 |-- Road_Surface_Conditions: integer (nullable = true)
 |-- Special_Conditions_at_Site: integer (nullable = true)
 |-- Carriag

# ML Pipeline

In [20]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# transformer
vector_assembler = VectorAssembler(inputCols=["Number_of_Vehicles", "Number_of_Casualties", "Day_of_Week", 
                                              "Local_Authority_(District)", "1st_Road_Class", "1st_Road_Number", 
                                              "Road_Type", "Speed_limit", "Junction_Detail", "Junction_Control", "2nd_Road_Class", 
                                              "2nd_Road_Number", "Pedestrian_Crossing-Human_Control", "Pedestrian_Crossing-Physical_Facilities", 
                                              "Light_Conditions", "Weather_Conditions", "Road_Surface_Conditions", "Special_Conditions_at_Site", 
                                              "Carriageway_Hazards", "Urban_or_Rural_Area", "Did_Police_Officer_Attend_Scene_of_Accident"
                                             ],outputCol="features")
output = vector_assembler.transform(dataset)
output = output.withColumn('target', output.Accident_Severity)
output.show(5)

+-----------------+------------------+--------------------+-----------+--------------------------+--------------+---------------+---------+-----------+---------------+----------------+--------------+---------------+---------------------------------+---------------------------------------+----------------+------------------+-----------------------+--------------------------+-------------------+-------------------+-------------------------------------------+--------------------+------+
|Accident_Severity|Number_of_Vehicles|Number_of_Casualties|Day_of_Week|Local_Authority_(District)|1st_Road_Class|1st_Road_Number|Road_Type|Speed_limit|Junction_Detail|Junction_Control|2nd_Road_Class|2nd_Road_Number|Pedestrian_Crossing-Human_Control|Pedestrian_Crossing-Physical_Facilities|Light_Conditions|Weather_Conditions|Road_Surface_Conditions|Special_Conditions_at_Site|Carriageway_Hazards|Urban_or_Rural_Area|Did_Police_Officer_Attend_Scene_of_Accident|            features|target|
+-----------------+---

# Convert into Binary

In [21]:
from pyspark.sql import functions as F

output = output.withColumn("Accident_Severity", F.when(F.col("Accident_Severity")<=2,0).otherwise(F.when(F.col("Accident_Severity")>2,1)))

output = output.withColumn("Binary Target", output.Accident_Severity)

output.select("Target", "Binary Target").show(5)

+------+-------------+
|Target|Binary Target|
+------+-------------+
|     3|            1|
|     3|            1|
|     2|            0|
|     3|            1|
|     3|            1|
+------+-------------+
only showing top 5 rows



# Standardizing features

In [22]:
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=False)

scalerModel = scaler.fit(output).transform(output)

scalerModel.select("features","scaledFeatures").show(5)

+--------------------+--------------------+
|            features|      scaledFeatures|
+--------------------+--------------------+
|[2.0,3.0,2.0,1.0,...|[3.27972936072257...|
|[2.0,1.0,3.0,2.0,...|[3.27972936072257...|
|[1.0,1.0,3.0,2.0,...|[1.63986468036128...|
|[2.0,2.0,3.0,28.0...|[3.27972936072257...|
|[1.0,1.0,3.0,20.0...|[1.63986468036128...|
+--------------------+--------------------+
only showing top 5 rows



# Undersampling

In [23]:
from pyspark.sql.functions import col, explode, array, lit

major_df = scalerModel.filter(col("Binary Target") == 1)
minor_df = scalerModel.filter(col("Binary Target") == 0)
ratio = int(major_df.count()/minor_df.count())
print("ratio: {}".format(ratio))

ratio: 3


In [24]:
sampled_majority_df = major_df.sample(False, 1/ratio)
combined_df_2 = sampled_majority_df.unionAll(minor_df)
scalerModel = combined_df_2
scalerModel.show(5)

+-----------------+------------------+--------------------+-----------+--------------------------+--------------+---------------+---------+-----------+---------------+----------------+--------------+---------------+---------------------------------+---------------------------------------+----------------+------------------+-----------------------+--------------------------+-------------------+-------------------+-------------------------------------------+--------------------+------+-------------+--------------------+
|Accident_Severity|Number_of_Vehicles|Number_of_Casualties|Day_of_Week|Local_Authority_(District)|1st_Road_Class|1st_Road_Number|Road_Type|Speed_limit|Junction_Detail|Junction_Control|2nd_Road_Class|2nd_Road_Number|Pedestrian_Crossing-Human_Control|Pedestrian_Crossing-Physical_Facilities|Light_Conditions|Weather_Conditions|Road_Surface_Conditions|Special_Conditions_at_Site|Carriageway_Hazards|Urban_or_Rural_Area|Did_Police_Officer_Attend_Scene_of_Accident|            feat

In [25]:
final_data = scalerModel.select("scaledFeatures","Accident_Severity")

final_data.show(5)

+--------------------+-----------------+
|      scaledFeatures|Accident_Severity|
+--------------------+-----------------+
|[3.27972936072257...|                1|
|[3.27972936072257...|                1|
|[3.27972936072257...|                1|
|[3.27972936072257...|                1|
|[3.27972936072257...|                1|
+--------------------+-----------------+
only showing top 5 rows



In [26]:
train, test = final_data.randomSplit([0.7, 0.3],seed=1000)
print("training dataset:", str(train.count()))
print("test dataset:", str(test.count()))

training dataset: 20748
test dataset: 9036


# Logistic Regression

In [27]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from sklearn import metrics
from sklearn.metrics import classification_report 

lr = LogisticRegression(featuresCol = 'scaledFeatures',labelCol="Accident_Severity", maxIter=5)

lrModel = lr.fit(train)

In [28]:
predict_train=lrModel.transform(train)
predict_test=lrModel.transform(test)

In [29]:
evaluator=BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",labelCol="Accident_Severity")

predict_test.select("Accident_Severity","prediction").show(5)

print("Train score {}".format(evaluator.evaluate(predict_train)))
print("Test score {}".format(evaluator.evaluate(predict_test)))

+-----------------+----------+
|Accident_Severity|prediction|
+-----------------+----------+
|                1|       1.0|
|                1|       1.0|
|                1|       1.0|
|                1|       0.0|
|                1|       0.0|
+-----------------+----------+
only showing top 5 rows

Train score 0.6302466268000029
Test score 0.6337443454305766


In [30]:
print(classification_report(test.select("Accident_Severity").toPandas(), predict_test.select("prediction").toPandas()))

              precision    recall  f1-score   support

           0       0.56      0.37      0.44      3841
           1       0.63      0.78      0.70      5195

    accuracy                           0.61      9036
   macro avg       0.59      0.58      0.57      9036
weighted avg       0.60      0.61      0.59      9036

