In [3]:
import pandas as pd
import random
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score

Handling data

In [8]:
df = pd.read_csv("dataset_traffic_accident_prediction1.csv")
df = df.drop("Accident_Severity", axis=1).drop("Number_of_Vehicles", axis=1) # Not predicting those labels; only predicting if accident occurs or not
missing_data = df[df.isnull().any(axis=1)]  # Preserved for predicting the missing data
df = df.dropna()
#pd.concat([X_test, y_test], axis=1)

In [263]:
def class_counts(df):
    counts = {}
    def count(row):
        if(row.iloc[-1] not in counts):
            counts[row.iloc[-1]] = 0
        counts[row.iloc[-1]] += 1
    df.apply(count, axis=1)
    
    return counts

class DecisionTree:
    def __init__(self):
        self.forest_tree = False
        self.n_random_features = 0
        self.min_samples_split = 0
        self.min_gain = 0

    class Question:
        def __init__(self, feature, value):
            self.feature = feature
            self.value = value

        def match(self, selected_row):
            val = selected_row[self.feature]
            return val >= self.value
    
    def information_gain(self, current_impurity, true_df, false_df):
        p = float(true_df.shape[0]) / (true_df.shape[0] + false_df.shape[0])
        return current_impurity - p * self.gini_impurity(true_df) - (1-p) * self.gini_impurity(false_df)
        
    def gini_impurity(self, df):
        impurity = 1

        if(df.shape[0] == 0):
            return 0

        counts = class_counts(df)

        for label in counts:
            prob_of_label = counts[label] / float(df.shape[0])
            impurity -= prob_of_label**2 

        return impurity
        
    def partition(self, df, question):
        matches = df.apply(lambda row: question.match(row), axis=1)
        true_df, false_df = df[matches], df[~matches]
        return true_df, false_df

    def find_best_split(self, df):
        best_gain = 0
        best_question = None
        best_true_df = None
        best_false_df = None
        current_impurity = self.gini_impurity(df)
        features = df.columns[:-1] # Excluding label - important

        if(self.forest_tree == True):
            features = random.sample(list(features), self.n_random_features)
            #print(f"Took: {features}")
        
        for feature in features:
            
            values = None

            if(feature in self.categorical_features):
                values = set(df[feature])
            else:
                sorted_pre_values = sorted(set(df[feature]))
                values = []
                for i in range(len(sorted_pre_values) - 1):
                    midpoint = (sorted_pre_values[i+1] + sorted_pre_values[i] ) / 2
                    values.append(midpoint)

                    #print(f"midpoint between {sorted_pre_values[i]} and {sorted_pre_values[i+1]} is {midpoint} ({feature})")



            #print(f"Candidates: {values}")

            for value in values:
                question = self.Question(feature, value)
                true_df, false_df = self.partition(df, question)

                if(true_df.shape[0] == 0 or false_df.shape[0] == 0):
                    continue

                gain = self.information_gain(current_impurity, true_df, false_df)

                if(gain > best_gain):
                    best_gain = gain
                    best_question = question
                    best_true_df = true_df
                    best_false_df = false_df

        #if(best_gain != 0):
            #print(f"Best gain: {best_gain} and best question: {best_question.feature, best_question.value}")

        return best_gain, best_question, best_true_df, best_false_df

    class Decision_Node:
        def __init__(self, question, true_branch, false_branch):
            self.question = question
            self.true_branch = true_branch
            self.false_branch = false_branch

    class Leaf:
        def __init__(self, df):
            self.predictions = class_counts(df)

    def build_tree(self, df, current_depth):

        if(current_depth == self.max_depth or df.shape[0] < self.min_samples_split):
            return self.Leaf(df)

        gain, question, true_df, false_df = self.find_best_split(df)

        if(gain == 0):
            return self.Leaf(df)
        
        true_branch = self.build_tree(true_df, current_depth+1)
        false_branch = self.build_tree(false_df, current_depth+1)
        
        #if(current_depth == 0):
        #    print("Tree done!")
        return self.Decision_Node(question, true_branch, false_branch)
    
    def set_to_forest_tree(self, n_random_features):
        self.forest_tree = True
        self.n_random_features = n_random_features
    
    def fit(self, df, max_depth, min_samples_split, categorical_features):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.categorical_features = categorical_features
        self.root = self.build_tree(df, 0)
    
    def predict(self, df):
        def classify(row, current_node):
            if(isinstance(current_node, self.Leaf)):
                return current_node.predictions
            
            if(current_node.question.match(row)):
                return classify(row, current_node.true_branch)
            else:
                return classify(row, current_node.false_branch)
            
        predictions = []

        #print(f"Predicting: {df}")

        predictions = df.apply(lambda row: classify(row, self.root), axis=1)

        #print(predictions.values)
        predictions = predictions.apply(lambda prediction: max(prediction, key=prediction.get))

        return predictions.values

In [254]:
categorical_features = df.select_dtypes(include=["object"]).columns

for column in categorical_features:
    print(column, df[column].unique())
    

Weather ['Rainy' 'Clear' 'Foggy' 'Stormy' 'Snowy']
Road_Type ['City Road' 'Highway' 'Mountain Road' 'Rural Road']
Time_of_Day ['Morning' 'Evening' 'Afternoon' 'Night']
Road_Condition ['Wet' 'Icy' 'Under Construction' 'Dry']
Vehicle_Type ['Car' 'Bus' 'Truck' 'Motorcycle']
Road_Light_Condition ['Artificial Light' 'Daylight' 'No Light']


In [255]:
label_encoders = {}
for column in categorical_features:
    label_encoders[column] = LabelEncoder()
    df[column] = label_encoders[column].fit_transform(df[column])

df

Unnamed: 0,Weather,Road_Type,Time_of_Day,Traffic_Density,Speed_Limit,Driver_Alcohol,Road_Condition,Vehicle_Type,Driver_Age,Driver_Experience,Road_Light_Condition,Accident
0,2,0,2,1.0,100.0,0.0,3,1,51.0,48.0,0,0.0
2,2,1,1,1.0,60.0,0.0,1,1,54.0,52.0,0,0.0
3,0,0,0,2.0,60.0,0.0,2,0,34.0,31.0,1,0.0
4,2,1,2,1.0,195.0,0.0,0,1,62.0,55.0,0,1.0
6,1,1,0,0.0,60.0,0.0,0,3,27.0,26.0,1,1.0
...,...,...,...,...,...,...,...,...,...,...,...,...
832,3,0,1,1.0,50.0,0.0,3,1,18.0,12.0,0,0.0
835,0,1,3,2.0,30.0,0.0,0,1,23.0,15.0,1,0.0
836,2,3,1,2.0,60.0,0.0,0,2,52.0,46.0,1,1.0
838,1,1,0,2.0,60.0,0.0,0,1,25.0,19.0,0,0.0


In [286]:
X = df.drop(["Accident"], axis=1)
y = df["Accident"]

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)


In [276]:
def create_bootstrapped_data(df):
    return df.sample(n=df.shape[0]).reset_index(drop=True)

n_random_features = 4
max_depth = 12
min_samples_split = 6
n_trees = 10
trees = []

for i in range(n_trees):
    random_sample = create_bootstrapped_data(pd.concat([X_train, y_train], axis=1))
    tree = DecisionTree()
    tree.set_to_forest_tree(n_random_features)
    tree.fit(random_sample, max_depth, min_samples_split, categorical_features)
    trees.append(tree)
    print(i)

0
1
2
3
4
5
6
7
8
9


In [302]:
test_data_extreme = {
    "Weather": ["Stormy"],
    "Road_Type": ["Highway"],
    "Time_of_Day": ["Night"],
    "Traffic_Density": [5.0],
    "Speed_Limit": [100.0],
    "Driver_Alcohol": [1.0],
    "Road_Condition": ["Icy"],
    "Vehicle_Type": ["Motorcycle"],
    "Driver_Age": [30.0],
    "Driver_Experience": [3.0],
    "Road_Light_Condition": ["No Light"]
}
test_df_extreme = pd.DataFrame(test_data_extreme)
for column in categorical_features:
    test_df_extreme[column] = label_encoders[column].transform(test_df_extreme[column])

predictions = []
for tree in trees:
    predictions.append(tree.predict(X_test))

final_predictions = []
for i in range(len(X_test)):
    row_predictions = [predictions[j][i] for j in range(len(trees))]
    
    label_counts = {}

    for row_prediction in row_predictions: 
        if(row_prediction not in label_counts):
            label_counts[row_prediction] = 0
        label_counts[row_prediction] += 1

    final_predictions.append(max(label_counts, key=label_counts.get))

y_test = y_test.reset_index(drop=True)
true_pred = 0
for i in range(len(y_test)):
    print(f"Prediction: {final_predictions[i]} - Label: {y_test.iloc[i]}")
    if(final_predictions[i] == y_test.iloc[i]):
        true_pred += 1

print("True predictions: ", true_pred)
print("False predictions: ", len(y_test) - true_pred)

Prediction: 0.0 - Label: 1.0
Prediction: 0.0 - Label: 1.0
Prediction: 0.0 - Label: 0.0
Prediction: 0.0 - Label: 0.0
Prediction: 0.0 - Label: 1.0
Prediction: 0.0 - Label: 0.0
Prediction: 0.0 - Label: 0.0
Prediction: 0.0 - Label: 0.0
Prediction: 0.0 - Label: 0.0
Prediction: 0.0 - Label: 0.0
Prediction: 1.0 - Label: 1.0
Prediction: 0.0 - Label: 0.0
Prediction: 0.0 - Label: 0.0
Prediction: 1.0 - Label: 0.0
Prediction: 0.0 - Label: 1.0
Prediction: 0.0 - Label: 1.0
Prediction: 0.0 - Label: 0.0
Prediction: 0.0 - Label: 1.0
Prediction: 0.0 - Label: 1.0
Prediction: 0.0 - Label: 1.0
Prediction: 1.0 - Label: 0.0
Prediction: 0.0 - Label: 0.0
Prediction: 0.0 - Label: 0.0
Prediction: 0.0 - Label: 0.0
Prediction: 0.0 - Label: 1.0
Prediction: 0.0 - Label: 1.0
Prediction: 0.0 - Label: 0.0
Prediction: 0.0 - Label: 0.0
Prediction: 0.0 - Label: 0.0
Prediction: 0.0 - Label: 1.0
Prediction: 0.0 - Label: 1.0
Prediction: 1.0 - Label: 0.0
Prediction: 0.0 - Label: 0.0
Prediction: 0.0 - Label: 0.0
Prediction: 0.

In [314]:
random_forest = RandomForestClassifier(n_estimators=1000,
                                       criterion="gini",
                                       min_samples_split=10,
                                       max_depth=14)

random_forest.fit(X_train, y_train)
y_pred = random_forest.predict(X_test)
print(y_pred)
accuracy_score(y_test, y_pred)

[0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]


0.6985294117647058