In [1]:
# Import VectorAssembler and Vectors
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
# Imports
import os
import numpy as np
import pandas as pd
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.sql import functions as f
from pyspark.sql.functions import udf, StringType, col, when, max, min, rand, hour, minute, expr
from pyspark.sql import SparkSession, functions as F
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import OneHotEncoder, VectorAssembler, StringIndexer
from functools import reduce

In [2]:
# Start the Spark session used by every cell below, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName('ML_with_spark')
    .getOrCreate()
)

In [3]:
# Load the three accident extracts (no 2008 file is listed) and stack them into one frame.
# unionByName matches columns by header name rather than by position, so a column-order
# difference between the CSV files cannot silently misalign the data.
ACCIDENT_CSV_PATHS = [
    '/mnt/bdpa23-group14-pvc/accidents_2005_to_2007.csv',
    '/mnt/bdpa23-group14-pvc/accidents_2009_to_2011.csv',
    '/mnt/bdpa23-group14-pvc/accidents_2012_to_2014.csv',
]
df1, df2, df3 = [spark.read.csv(path, header=True, inferSchema=True) for path in ACCIDENT_CSV_PATHS]
# Combine the DataFrames into a single DataFrame
data = df1.unionByName(df2).unionByName(df3)

In [4]:
# Drop columns that are not used, then rename columns whose names contain
# parentheses or hyphens to plain identifier-style names.
data = data.drop("Junction_Detail", "Junction_Control", "LSOA_of_Accident_Location")
column_renames = {
    "Local_Authority_(District)": "Local_Authority_District",
    "Local_Authority_(Highway)": "Local_Authority_Highway",
    "Pedestrian_Crossing-Human_Control": "Pedestrian_Crossing_Human_Control",
    "Pedestrian_Crossing-Physical_Facilities": "Pedestrian_Crossing_Physical_Facilities",
}
for old_name, new_name in column_renames.items():
    data = data.withColumnRenamed(old_name, new_name)
# Keep only rows that have a value in every column.
data = data.dropna()

In [5]:
df = data
# One frame per Accident_Severity value. The printed counts show 3 is by far the most
# frequent class, followed by 2, then 1.
df_majority, df_minority_1, df_minority_2 = (
    df.filter(col("Accident_Severity") == severity) for severity in (3, 2, 1)
)
majority, minority_1, minority_2 = (
    frame.count() for frame in (df_majority, df_minority_1, df_minority_2)
)
for class_size in (majority, minority_1, minority_2):
    print(class_size)

1275966
204119
19415


In [6]:
# Sampling fractions that bring every class to roughly the size of severity class 2:
# class 3 is down-sampled by ratio_majority (< 1), class 1 up-sampled by ratio_minority_2 (> 1).
ratio_majority = minority_1 / majority
ratio_minority_2 = minority_1 / minority_2
print(ratio_majority, ratio_minority_2)

0.15997213091884893 10.51346896729333


In [7]:
# Resample towards the size of class 2 (seeded, so reruns draw the same rows):
#   class 1 -> sampled WITH replacement (fraction > 1, so rows get duplicated)
#   class 3 -> sampled WITHOUT replacement (fraction < 1)
# NOTE(review): the duplicated class-1 rows are created before the later random
# train/test split, so copies of one row can land in both splits and inflate the
# measured accuracy. Consider oversampling only the training split.
df_oversampled_minority_2 = df_minority_2.sample(withReplacement=True, fraction=ratio_minority_2, seed=42)
df_undersampled_majority = df_majority.sample(withReplacement=False, fraction=ratio_majority, seed=42)
for resampled in (df_undersampled_majority, df_oversampled_minority_2, df_minority_1):
    print(resampled.count())

203547
203954
204119


In [12]:
# Combine the resampled classes into a new dataframe and cache it. The feature handling
# below runs several Spark jobs per column against this frame (distinct / min / max
# collects). Without the cache, every one of those jobs re-reads the CSV files and
# redraws the samples.
balanced_df = (
    df_oversampled_minority_2
    .union(df_undersampled_majority)
    .union(df_minority_1)
    .cache()
)

In [13]:
#functions
def get_string_mapping(col_name, dataframe):
    """Map each distinct value of ``col_name`` to a consecutive integer code.

    The distinct values are sorted before they are numbered, so the same data always
    produces the same codes. The row order Spark returns from ``distinct()`` is not
    guaranteed, which previously made the codes change from run to run.

    Args:
        col_name: name of the column to encode.
        dataframe: Spark DataFrame containing ``col_name``.

    Returns:
        dict mapping each distinct value (``None`` included, last, if present) to an
        int in ``range(number_of_distinct_values)``.
    """
    print("Getting mapping")
    unique_values = [row[0] for row in dataframe.select(col_name).distinct().collect()]
    # None cannot be compared with strings, so it sorts last; other values sort by str().
    unique_values.sort(key=lambda value: (value is None, "" if value is None else str(value)))
    return {value: code for code, value in enumerate(unique_values)}

def get_embedding(col_name, mapping, dataframe):
    """Replace ``col_name`` with the integer codes given by ``mapping``.

    The lookup is one literal map expression (``create_map``) indexed by the column.
    Previously it was one ``CASE WHEN`` branch per distinct value, which grows into
    thousands of branches for a high-cardinality column such as ``Date``.

    Args:
        col_name: name of the column to encode.
        mapping: dict of column value -> int code (e.g. from ``get_string_mapping``).
        dataframe: Spark DataFrame containing ``col_name``.

    Returns:
        DataFrame in which ``col_name`` holds the code. The code is null when the
        original value is null or is not a key of ``mapping``. The column is moved to
        the last position.
    """
    from itertools import chain

    print("Getting embedding")
    # Spark map keys may not be null. A null input already yields null from the lookup,
    # which matches the old explicit isNull branch.
    pairs = [(key, code) for key, code in mapping.items() if key is not None]
    if pairs:
        lookup = F.create_map(*[F.lit(item) for item in chain.from_iterable(pairs)])
        encoded = lookup[F.col(col_name)]
    else:
        encoded = F.lit(None)
    dataframe = dataframe.withColumn(col_name + "_embedded", encoded)
    dataframe = dataframe.drop(col_name)
    dataframe = dataframe.withColumnRenamed(col_name + "_embedded", col_name)
    return dataframe

def normalize_column(col_name, dataframe):
    """Scale ``col_name`` by its maximum value (``x / max``).

    NOTE(review): handle_dataframe in this notebook calls normalize_column_2, not this.

    The scaling is a Column expression rather than a SQL string built with
    ``format``, so no column name ever has to be parsed as SQL. If the maximum is null
    (all-null column) or 0, the division would only produce nulls, so in that case the
    values are kept unscaled (cast to double). Like the other helpers, the result
    column is moved to the last position.
    """
    print("Normalizing")
    max_value = dataframe.select(F.max(col_name)).collect()[0][0]
    if max_value is None or max_value == 0:
        scaled = F.col(col_name).cast("double")
    else:
        scaled = F.col(col_name) / max_value
    dataframe = dataframe.withColumn(f"{col_name}_normalized", scaled).drop(col_name)
    dataframe = dataframe.withColumnRenamed(col_name + "_normalized", col_name)
    return dataframe

def embed_time(df):
    """Overwrite ``Time`` with seconds since midnight, computed as hour*3600 + minute*60.

    Any seconds component of the time is not included.
    """
    print("Embedding time")
    seconds_since_midnight = hour('Time') * 3600 + minute('Time') * 60
    return df.withColumn('Time', seconds_since_midnight)

def get_col_names(df):
    """Return the columns of ``df`` as a list of ``(name, type_string)`` tuples, in schema order."""
    return [(entry[0], entry[1]) for entry in df.dtypes]

def normalize_column_2(col_name, dataframe):
    """Min-max scale ``col_name`` into the range [-1, 1].

    Computes ``((x - min) / (max - min)) * 2 - 1`` from the column's min and max over
    ``dataframe``. Both statistics come from a single aggregation job.

    Edge cases:
      * A constant column (max == min) maps its non-null values to 0.0. Dividing by a
        zero range would instead give nulls (Spark's default non-ANSI behaviour).
      * Like the other helpers, the scaled column is moved to the last position.

    Raises:
        ValueError: if the column contains only nulls, so there is no range to scale by.
    """
    stats = dataframe.agg(F.min(col_name), F.max(col_name)).collect()[0]
    min_value, max_value = stats[0], stats[1]
    if min_value is None or max_value is None:
        raise ValueError(f"Cannot normalize {col_name!r}: column contains only nulls")
    diff = max_value - min_value
    if diff == 0:
        # keep nulls as nulls; every non-null value sits at the centre of [-1, 1]
        scaled = F.when(F.col(col_name).isNotNull(), F.lit(0.0))
    else:
        scaled = ((F.col(col_name) - min_value) / diff) * 2 - 1
    dataframe = dataframe.withColumn(f"{col_name}_normalized", scaled).drop(col_name)
    dataframe = dataframe.withColumnRenamed(col_name + "_normalized", col_name)
    return dataframe
def handle_dataframe(df):
    """Turn the raw accident frame into numeric model input.

    Per column, in schema order:
      * ``Accident_Severity`` is renamed to ``label`` and left unscaled;
      * ``Accident_Index`` is passed through untouched;
      * ``Time`` (when a string) becomes seconds since midnight;
      * every other string column is replaced by integer codes;
      * every handled column is then min-max scaled to [-1, 1] by normalize_column_2.
    """
    schema = get_col_names(df)
    print("Starting dataframe handling")
    for name, dtype in schema:
        if name == "Accident_Severity":
            df = df.withColumnRenamed("Accident_Severity", "label")
            continue
        if name == "Accident_Index":
            continue

        print(f"Handling {name}")
        if dtype == 'string':
            if name == "Time":
                df = embed_time(df)
            else:
                df = get_embedding(name, get_string_mapping(name, df), df)
        df = normalize_column_2(name, df)
    return df

In [14]:
# Encode and scale every feature column of the balanced data, then peek at three rows.
data = handle_dataframe(balanced_df)
data.show(n=3, vertical=True)

Starting dataframe handling
Handling Location_Easting_OSGR
Handling Location_Northing_OSGR
Handling Longitude
Handling Latitude
Handling Police_Force
Handling Number_of_Vehicles
Handling Number_of_Casualties
Handling Date
Getting mapping
Getting embedding
Handling Day_of_Week
Handling Time
Embedding time
Handling Local_Authority_District
Handling Local_Authority_Highway
Getting mapping
Getting embedding
Handling 1st_Road_Class
Handling 1st_Road_Number
Handling Road_Type
Getting mapping
Getting embedding
Handling Speed_limit
Handling 2nd_Road_Class
Handling 2nd_Road_Number
Handling Pedestrian_Crossing_Human_Control
Getting mapping
Getting embedding
Handling Pedestrian_Crossing_Physical_Facilities
Getting mapping
Getting embedding
Handling Light_Conditions
Getting mapping
Getting embedding
Handling Weather_Conditions
Getting mapping
Getting embedding
Handling Road_Surface_Conditions
Getting mapping
Getting embedding
Handling Special_Conditions_at_Site
Getting mapping
Getting embedding
Ha

In [15]:
# Column names after feature handling (the target is now called 'label').
list(data.columns)

['Accident_Index',
 'label',
 'Location_Easting_OSGR',
 'Location_Northing_OSGR',
 'Longitude',
 'Latitude',
 'Police_Force',
 'Number_of_Vehicles',
 'Number_of_Casualties',
 'Date',
 'Day_of_Week',
 'Time',
 'Local_Authority_District',
 'Local_Authority_Highway',
 '1st_Road_Class',
 '1st_Road_Number',
 'Road_Type',
 'Speed_limit',
 '2nd_Road_Class',
 '2nd_Road_Number',
 'Pedestrian_Crossing_Human_Control',
 'Pedestrian_Crossing_Physical_Facilities',
 'Light_Conditions',
 'Weather_Conditions',
 'Road_Surface_Conditions',
 'Special_Conditions_at_Site',
 'Carriageway_Hazards',
 'Urban_or_Rural_Area',
 'Did_Police_Officer_Attend_Scene_of_Accident',
 'Year']

In [16]:
# Final schema: 'label' keeps its integer type; the processed feature columns shown are double.
data.printSchema()

root
 |-- Accident_Index: string (nullable = true)
 |-- label: integer (nullable = true)
 |-- Location_Easting_OSGR: double (nullable = true)
 |-- Location_Northing_OSGR: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Police_Force: double (nullable = true)
 |-- Number_of_Vehicles: double (nullable = true)
 |-- Number_of_Casualties: double (nullable = true)
 |-- Date: double (nullable = true)
 |-- Day_of_Week: double (nullable = true)
 |-- Time: double (nullable = true)
 |-- Local_Authority_District: double (nullable = true)
 |-- Local_Authority_Highway: double (nullable = true)
 |-- 1st_Road_Class: double (nullable = true)
 |-- 1st_Road_Number: double (nullable = true)
 |-- Road_Type: double (nullable = true)
 |-- Speed_limit: double (nullable = true)
 |-- 2nd_Road_Class: double (nullable = true)
 |-- 2nd_Road_Number: double (nullable = true)
 |-- Pedestrian_Crossing_Human_Control: double (nullable = true)
 |-- Pedestrian_

In [17]:
# Every column except the row identifier and the target becomes a model feature.
# NOTE(review): the previous hand-written list left out 'Latitude' but kept 'Longitude',
# Easting and Northing. Deriving the list from the schema keeps it in sync with whatever
# handle_dataframe produces.
NON_FEATURE_COLUMNS = {'Accident_Index', 'label'}
feature_columns = [name for name in data.columns if name not in NON_FEATURE_COLUMNS]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

output = assembler.transform(data)

In [18]:
"""assembler = VectorAssembler(
  inputCols=[
             'Number_of_Vehicles',
             'Number_of_Casualties'],
              outputCol="features")

output = assembler.transform(data)"""

'assembler = VectorAssembler(\n  inputCols=[\n             \'Number_of_Vehicles\',\n             \'Number_of_Casualties\'],\n              outputCol="features")\n\noutput = assembler.transform(data)'

In [19]:
# StringIndexer for the row identifier column.
# NOTE(review): only 'features' and 'label' are selected into final_data afterwards, so
# the indexed column this would produce is not used by the models.
categorical_columns = ['Accident_Index']
indexer = StringIndexer(
    outputCol="Accident_Index_index",
    inputCol=categorical_columns[0],
)

In [20]:
# Keep only what the classifiers consume. The Accident_Index StringIndexer is no longer
# fitted here: the column it would add is discarded by this select anyway, and fitting it
# collects every distinct Accident_Index value onto the driver.
final_data = output.select("features", "label")

In [21]:
# 70/30 split, seeded so that the split and every metric derived from it are reproducible.
# NOTE(review): balanced_df contains duplicated class-1 rows (sampled with replacement),
# so identical rows can end up in both splits. Split before oversampling to avoid leakage.
train_data, test_data = final_data.randomSplit([0.7, 0.3], seed=42)

In [22]:
from pyspark.ml.classification import DecisionTreeClassifier,RandomForestClassifier
from pyspark.ml import Pipeline

In [23]:
# Tree-based classifiers on the assembled feature vector.
# The random forest bootstraps rows and subsamples features. Without an explicit seed,
# PySpark derives its default seed from hash() of the class name, and Python randomizes
# that per process, so every run would train a different forest.
dtc = DecisionTreeClassifier(labelCol='label', featuresCol='features')
rfc = RandomForestClassifier(labelCol='label', featuresCol='features', seed=42)
# NOTE(review): Spark ML's GBTClassifier only supports binary labels; this 3-class label would need one-vs-rest.
#gbt = GBTClassifier(labelCol='label',featuresCol='features')

In [24]:
# Fit both classifiers on the training split (decision tree first, then the forest).
# NOTE(review): the recorded run of this cell died with Py4JNetworkError ("Answer from
# Java side is empty"), meaning the JVM stopped responding during fit. A driver
# out-of-memory is a common cause. TODO confirm from the driver logs.
dtc_model, rfc_model = dtc.fit(train_data), rfc.fit(train_data)
#gbt_model = gbt.fit(train_data)

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1159, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 985, in send_command
    response = connection.send_command(command)
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1164, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving


Py4JError: An error occurred while calling o22086.fit

In [None]:
# Score the held-out split with each fitted model.
dtc_predictions, rfc_predictions = (
    fitted.transform(test_data) for fitted in (dtc_model, rfc_model)
)
#gbt_predictions = gbt_model.transform(test_data)

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
# Accuracy: the fraction of test rows whose 'prediction' equals 'label'.
acc_evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="label",
    predictionCol="prediction",
)

In [None]:
# Test-set accuracy of each model.
dtc_acc, rfc_acc = (
    acc_evaluator.evaluate(scored) for scored in (dtc_predictions, rfc_predictions)
)
#gbt_acc = acc_evaluator.evaluate(gbt_predictions)

In [None]:
# Report both accuracies as percentages, with a dashed separator before each line.
separator = '-' * 50
report_lines = [
    "Here are the results!",
    separator,
    'A single decision tree had an accuracy of: {0:2.2f}%'.format(dtc_acc * 100),
    separator,
    'A random forest ensemble had an accuracy of: {0:2.2f}%'.format(rfc_acc * 100),
]
for line in report_lines:
    print(line)

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import itertools
def plot_confusion_matrix(cm, classes, normalize=False, title='Confusion matrix', cmap=plt.cm.Blues):
    """
    Print a confusion matrix and draw it on the current matplotlib figure.

    Args:
        cm: square array of counts with rows = true label and columns = predicted label
            (the layout of sklearn.metrics.confusion_matrix, matching the axis labels below).
        classes: tick labels, in the same order as the rows/columns of ``cm``.
        normalize: if True, divide each row by its total so rows sum to 1. A row with no
            true samples stays all zeros instead of becoming NaN.
        title: figure title.
        cmap: matplotlib colormap used for the cells.
    """
    cm = np.asarray(cm)
    if normalize:
        row_totals = cm.sum(axis=1)[:, np.newaxis]
        cm = np.divide(cm.astype('float'), row_totals,
                       out=np.zeros(cm.shape, dtype=float), where=row_totals != 0)
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')
    # Print the matrix in both modes. Previously only the raw-count matrix was printed.
    print(cm)
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)
    fmt = '.2f' if normalize else 'd'
    # Use white text on dark (high-value) cells so the numbers stay readable.
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")
    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')

In [None]:
# Per-label row counts of each model's test predictions, in descending label order.
# The 'label' column of these frames is used below as the class list for the confusion matrices.
def _label_counts(scored):
    counts = scored.select("label").groupBy("label").count()
    return counts.sort('label', ascending=False).toPandas()

class_temp_dtc = _label_counts(dtc_predictions)
print(class_temp_dtc)
class_temp_rtc = _label_counts(rfc_predictions)
print(class_temp_rtc)

In [None]:
#Calculate confusion matrix for dtc
from sklearn.metrics import confusion_matrix
# Fetch label and prediction in ONE Spark job. Two separate toPandas() calls are two
# independent executions, and their row orders are not guaranteed to line up.
dtc_pairs = dtc_predictions.select("label", "prediction").toPandas()
y_true = dtc_pairs["label"]
y_pred = dtc_pairs["prediction"]
cnf_matrix = confusion_matrix(y_true, y_pred, labels=class_temp_dtc['label'])
cnf_matrix



In [None]:
# Plot the decision-tree matrix (raw counts).
# cnf_matrix holds the decision-tree result only if the previous cell ran last.
plt.figure()
plot_confusion_matrix(
    cnf_matrix,
    classes=class_temp_dtc['label'].values,
    title='Decision tree confusion matrix, without normalization',
)
plt.show()

In [None]:

#Calculate confusion matrix for rtc
from sklearn.metrics import confusion_matrix
# Fetch label and prediction in ONE Spark job. Two separate toPandas() calls are two
# independent executions, and their row orders are not guaranteed to line up.
rfc_pairs = rfc_predictions.select("label", "prediction").toPandas()
y_true = rfc_pairs["label"]
y_pred = rfc_pairs["prediction"]
cnf_matrix = confusion_matrix(y_true, y_pred, labels=class_temp_rtc['label'])
cnf_matrix

In [None]:
# Plot the random-forest matrix (raw counts).
# cnf_matrix holds the random-forest result only if the previous cell ran last.
plt.figure()
plot_confusion_matrix(
    cnf_matrix,
    classes=class_temp_rtc['label'].values,
    title='Random forest confusion matrix, without normalization',
)
plt.show()