## Project 1: Road Traffic Severity Classification

In [None]:
import pandas as pd

from sklearn.ensemble import ExtraTreesClassifier
from mlxtend.evaluate import bias_variance_decomp

In [None]:
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

In [None]:
df=pd.read_csv('../data/raw/RTA Dataset.csv')

# **** EDA ****

In [None]:
df.head()

In [None]:
df.shape

In [None]:
df.columns

In [None]:
df.dtypes

In [None]:
df.info()

In [None]:
df.isnull().sum()

In [None]:
df.duplicated().sum()

### A bit of descriptive statistics

In [None]:
df.describe()

In [None]:
df.describe(include="all")

### Now some graphs

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

We verify the target variable

In [None]:
df['Accident_severity'].value_counts()

In [None]:
sns.barplot(x=df['Accident_severity'].value_counts().index,
            y=df['Accident_severity'].value_counts().values)
plt.show()

In [None]:
import missingno as mi

In [None]:
mi.bar(df)

In [None]:
mi.matrix(df)

# **** PREPROCESSING ****

Para los faltantes aplicaremos un SimpleImputer 

In [None]:
import numpy as np
from sklearn.impute import SimpleImputer

We impute the null values of the columns

In [None]:
df['Educational_level'].replace({np.nan : pd.NA})
df['Vehicle_driver_relation'].replace({np.nan : pd.NA})
df['Driving_experience'].replace({np.nan : pd.NA})
df['Type_of_vehicle'].replace({np.nan : pd.NA})
df['Owner_of_vehicle'].replace({np.nan : pd.NA})
df['Area_accident_occured'].replace({np.nan : pd.NA})
df['Lanes_or_Medians'].replace({np.nan : pd.NA})
df['Road_allignment'].replace({np.nan : pd.NA})
df['Types_of_Junction'].replace({np.nan : pd.NA})
df['Road_surface_type'].replace({np.nan : pd.NA})
df['Type_of_collision'].replace({np.nan : pd.NA})
df['Vehicle_movement'].replace({np.nan : pd.NA})

In [None]:
impute_mode = SimpleImputer(strategy = 'most_frequent', missing_values=pd.NA)

In [None]:
impute_mode.fit(df[['Educational_level',
                    'Vehicle_driver_relation',
                    'Driving_experience',
                    'Type_of_vehicle',
                    'Owner_of_vehicle',
                    'Area_accident_occured',
                    'Lanes_or_Medians',
                    'Road_allignment',
                    'Types_of_Junction',
                    'Road_surface_type',
                    'Type_of_collision',
                    'Vehicle_movement']])

df[['Educational_level',
                    'Vehicle_driver_relation',
                    'Driving_experience',
                    'Type_of_vehicle',
                    'Owner_of_vehicle',
                    'Area_accident_occured',
                    'Lanes_or_Medians',
                    'Road_allignment',
                    'Types_of_Junction',
                    'Road_surface_type',
                    'Type_of_collision',
                    'Vehicle_movement']] = impute_mode.transform(df[['Educational_level',
                                            'Vehicle_driver_relation',
                                            'Driving_experience',
                                            'Type_of_vehicle',
                                            'Owner_of_vehicle',
                                            'Area_accident_occured',
                                            'Lanes_or_Medians',
                                            'Road_allignment',
                                            'Types_of_Junction',
                                            'Road_surface_type',
                                            'Type_of_collision',
                                            'Vehicle_movement']])

In [None]:
df['Service_year_of_vehicle'].replace({np.nan : pd.NA})
df['Defect_of_vehicle'].replace({np.nan : pd.NA})
df['Work_of_casuality'].replace({np.nan : pd.NA})
df['Fitness_of_casuality'].replace({np.nan : pd.NA})

In [None]:
impute_mode.fit(df[['Service_year_of_vehicle',
                    'Defect_of_vehicle',
                    'Work_of_casuality',
                    'Fitness_of_casuality']])

df[['Service_year_of_vehicle',
                    'Defect_of_vehicle',
                    'Work_of_casuality',
                    'Fitness_of_casuality']] = impute_mode.transform(df[['Service_year_of_vehicle',
                                            'Defect_of_vehicle',
                                            'Work_of_casuality',
                                            'Fitness_of_casuality']])

In [None]:
impute_mode = SimpleImputer(strategy = 'most_frequent', missing_values='na')
impute_mode.fit(df[['Casualty_class','Sex_of_casualty','Age_band_of_casualty', 'Casualty_severity']])
df[['Casualty_class',
    'Sex_of_casualty',
    'Age_band_of_casualty', 
    'Casualty_severity']] = impute_mode.transform(df[['Casualty_class',
                                                      'Sex_of_casualty',
                                                      'Age_band_of_casualty',
                                                      'Casualty_severity']])

Now we will work with the transformation of categorical data to numbers

In [None]:
df.sample(10)

In [None]:
# una rapida transofrmacion de tipo de dato
df['Casualty_severity'] = df['Casualty_severity'].astype('int64')

Now LabelEncoder

In [None]:
from sklearn.preprocessing import LabelEncoder

In [None]:
df['TEMP_Day_of_week']=LabelEncoder().fit_transform(df.Day_of_week)
df['TEMP_Age_band_of_driver']=LabelEncoder().fit_transform(df.Age_band_of_driver)
df['TEMP_Educational_level']=LabelEncoder().fit_transform(df.Educational_level )
df['TEMP_Vehicle_driver_relation']=LabelEncoder().fit_transform(df.Vehicle_driver_relation )
df['TEMP_Driving_experience']=LabelEncoder().fit_transform(df.Driving_experience )
df['TEMP_Type_of_vehicle']=LabelEncoder().fit_transform(df.Type_of_vehicle )
df['TEMP_Owner_of_vehicle']=LabelEncoder().fit_transform(df.Owner_of_vehicle )
df['TEMP_Service_year_of_vehicle']=LabelEncoder().fit_transform(df.Service_year_of_vehicle )
df['TEMP_Defect_of_vehicle']=LabelEncoder().fit_transform(df.Defect_of_vehicle )
df['TEMP_Area_accident_occured']=LabelEncoder().fit_transform(df.Area_accident_occured )
df['TEMP_Lanes_or_Medians']=LabelEncoder().fit_transform(df.Lanes_or_Medians )
df['TEMP_Road_allignment']=LabelEncoder().fit_transform(df.Road_allignment )
df['TEMP_Types_of_Junction']=LabelEncoder().fit_transform(df.Types_of_Junction )
df['TEMP_Road_surface_type']=LabelEncoder().fit_transform(df.Road_surface_type )
df['TEMP_Road_surface_conditions']=LabelEncoder().fit_transform(df.Road_surface_conditions )
df['TEMP_Light_conditions']=LabelEncoder().fit_transform(df.Light_conditions )
df['TEMP_Weather_conditions']=LabelEncoder().fit_transform(df.Weather_conditions )
df['TEMP_Type_of_collision']=LabelEncoder().fit_transform(df.Type_of_collision)
df['TEMP_Vehicle_movement']=LabelEncoder().fit_transform(df.Vehicle_movement )
df['TEMP_Casualty_class']=LabelEncoder().fit_transform(df.Casualty_class)
df['TEMP_Age_band_of_casualty']=LabelEncoder().fit_transform(df.Age_band_of_casualty)
df['TEMP_Work_of_casuality']=LabelEncoder().fit_transform(df.Work_of_casuality )
df['TEMP_Fitness_of_casuality']=LabelEncoder().fit_transform(df.Fitness_of_casuality )
df['TEMP_Pedestrian_movement']=LabelEncoder().fit_transform(df.Pedestrian_movement )
df['TEMP_Cause_of_accident']=LabelEncoder().fit_transform(df.Cause_of_accident)

OneHotEncoder Sex_of_driver and Sex_of_casualty columns

In [None]:
from sklearn.preprocessing import OneHotEncoder

In [None]:
ohc = OneHotEncoder()
ohs1 = ohc.fit_transform(df.Sex_of_driver.values.reshape(-1,1)).toarray()
dfs1 = pd.DataFrame(ohs1, columns = ["Temp_sex_driver_"+str(ohc.categories_[0][i])
                               for i in range(len(ohc.categories_[0]))])
df = pd.concat([df, dfs1], axis = 1)                                   

In [None]:
ohc2 = OneHotEncoder()
ohs2 = ohc2.fit_transform(df.Sex_of_casualty.values.reshape(-1,1)).toarray()
dfs2 = pd.DataFrame(ohs2, columns = ["Temp_sex_casualty_"+str(ohc2.categories_[0][i])
                                for i in range(len(ohc2.categories_[0]))])
df = pd.concat([df, dfs2], axis = 1)   

The Time column is separated into Hours and Minutes respectively

In [None]:
df['Time_hour'] = pd.to_datetime(df['Time'], format='%H:%M:%S').dt.hour
df['Time_minute'] = pd.to_datetime(df['Time'], format='%H:%M:%S').dt.minute

In [None]:
df_encoded = df[['Time_hour', 'Time_minute', 'Number_of_vehicles_involved', 'Number_of_casualties','Casualty_severity',
                'TEMP_Day_of_week', 'TEMP_Age_band_of_driver', 'TEMP_Educational_level', 'TEMP_Vehicle_driver_relation', 
               'TEMP_Driving_experience', 'TEMP_Type_of_vehicle', 'TEMP_Owner_of_vehicle', 'TEMP_Service_year_of_vehicle', 
               'TEMP_Defect_of_vehicle', 'TEMP_Area_accident_occured', 'TEMP_Lanes_or_Medians', 'TEMP_Road_allignment',
               'TEMP_Types_of_Junction', 'TEMP_Road_surface_type', 'TEMP_Road_surface_conditions', 'TEMP_Light_conditions', 
               'TEMP_Weather_conditions', 'TEMP_Type_of_collision', 'TEMP_Vehicle_movement', 'TEMP_Casualty_class', 
               'TEMP_Age_band_of_casualty', 'TEMP_Work_of_casuality', 'TEMP_Fitness_of_casuality', 'TEMP_Pedestrian_movement', 
               'TEMP_Cause_of_accident', 'Temp_sex_driver_Female', 'Temp_sex_driver_Male', 'Temp_sex_driver_Unknown', 
               'Temp_sex_casualty_Female', 'Temp_sex_casualty_Male', 'Accident_severity']].copy()

df_encoded.sample(5)

In [None]:
df_encoded.dtypes

At this point we have the dataset, without null values and encoded in numeric values

Now we are going to work on the imbalance in the classes of the target variable 'Accident_severity'

In [None]:
import imblearn as il

In [None]:
from sklearn.model_selection import train_test_split

In [None]:
X = df_encoded.iloc[:, :-1]
y = df_encoded.iloc[:, -1]

In [None]:
X.shape

In [None]:
y.shape

SMOTETomek is used to balance the target variable

In [None]:
import collections
from imblearn.combine import SMOTETomek

smote_tomek = SMOTETomek(random_state=0)
X_resampled, y_resampled = smote_tomek.fit_resample(X, y)

In [None]:
y_resampled.shape

In [None]:
from sklearn.decomposition import PCA

In [None]:
pca = PCA().fit(X_resampled)

plt.rcParams["figure.figsize"] = (12,6)

fig, ax = plt.subplots()
xi = np.arange(1, 36, step=1)
y = np.cumsum(pca.explained_variance_ratio_)

plt.ylim(0.0,1.1)
plt.plot(xi, y, marker='o', linestyle='--', color='b')

plt.xlabel('Number of Components')
plt.xticks(np.arange(0, 36, step=1)) #change from 0-based array index to 1-based human-readable label
plt.ylabel('Cumulative variance (%)')
plt.title('The number of components needed to explain variance')

plt.axhline(y=0.95, color='r', linestyle='-')
plt.text(0.5, 0.85, '95% cut-off threshold', color = 'red', fontsize=16)

ax.grid(axis='x')
plt.show()

# **** FEATURE ENGINEERING ****

In [None]:
from sklearn.feature_selection import SelectKBest
from sklearn.feature_selection import f_classif

Since in this project the input is number and the output is categorical, ANOVA F measure via f_classif() function is used to get the best features.

In [None]:
#fs = SelectKBest(score_func=f_classif, k='all')
#X_selected = fs.fit_transform(X_resampled, y_resampled)
#X_selected

In [None]:
pcaFinal = PCA(n_components=9)
X_selected = pcaFinal.fit_transform(X_resampled)
X_selected

In [None]:
X_selected.shape

# **** MODELING ****

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.neighbors import KNeighborsClassifier
from sklearn import tree

In [None]:
X_train, X_test, y_train, y_test = train_test_split(X_selected , 
                                                    y_resampled, 
                                                    shuffle = True, 
                                                    test_size=0.2, 
                                                    random_state=1)

In [None]:
X_train.shape

In [None]:
y_train.shape

In [None]:
X_test.shape

In [None]:
y_test.shape

In [None]:
def check_bias_variance (m, X_train, y_train, X_test, y_test,  r):
    avg_expected_loss, avg_bias,avg_var  = bias_variance_decomp(m, 
                                                                X_train,
                                                                y_train.to_numpy(), 
                                                                X_test,
                                                                y_test.to_numpy(), 
                                                                loss='0-1_loss', 
                                                                num_rounds=r, 
                                                                random_seed=1)
    print('Bias Variance analisys')
    print('Average expected loss: %.3f' % avg_expected_loss)
    print('Average bias: %.3f' % avg_bias)
    print('Average variance: %.3f' % avg_var ) 

In [None]:
def evaluate_model(model, x_test, y_test):
    from sklearn import metrics
    y_pred = model.predict(x_test)
    f1 = metrics.f1_score(y_test, y_pred, average='micro')
    cm = metrics.confusion_matrix(y_test, y_pred)
    return {'f1': f1, 'cm': cm}

In [None]:
def generate_model(model, X_train, X_test, y_train, y_test ):
    
    if model == 'RandomForestClassifier':
        rf = RandomForestClassifier(random_state=0)
        rf.fit(X_train, y_train)
        rf_eval = evaluate_model(rf, X_test, y_test)
        print('RandomForestClassifier - F1 Score:', rf_eval['f1'])
        print('RandomForestClassifier - CM:', rf_eval['cm'])

    if model == 'GaussianNB':
            nb = GaussianNB()
            nb.fit(X_train, y_train)            
            nb_eval = evaluate_model(nb, X_test, y_test)
            # Print result
            print('GaussianNB - F1 Score:', nb_eval['f1'])

    if model == 'KNeighborsClassifier':
            knn = KNeighborsClassifier()
            knn.fit(X_train, y_train)            
            knn_eval = evaluate_model(knn, X_test, y_test)
            # Print result
            print('KNeighborsClassifier - F1 Score:', knn_eval['f1'])
            
    if model == 'DecisionTreeClassifier':
            dtc = tree.DecisionTreeClassifier(random_state=0)
            dtc.fit(X_train, y_train)            
            # Evaluate Model
            dtc_eval = evaluate_model(dtc, X_test, y_test)
            # Print result
            print('DecisionTreeClassifier - F1 Score:', dtc_eval['f1'])
    if model == 'xtree':
            xt = ExtraTreesClassifier()
            xt.fit(X_train, y_train)            
            # Evaluate Model
            dtc_eval = evaluate_model(xt, X_test, y_test)
            check_bias_variance (xt, X_train, y_train, X_test, y_test,  5)
            # Print result
            print('xtree - F1 Score:', dtc_eval['f1'])
            print('xtree - CM:', dtc_eval['cm'])
            
            

In [None]:
generate_model('RandomForestClassifier', X_train, X_test, y_train, y_test)

In [None]:
#generate_model('DecisionTreeClassifier', X_train, X_test, y_train, y_test)

In [None]:
#generate_model('KNeighborsClassifier', X_train, X_test, y_train, y_test)

In [None]:
#generate_model('GaussianNB', X_train, X_test, y_train, y_test)

In [None]:
generate_model('xtree', X_train, X_test, y_train, y_test)

In [None]:
check_bias_variance (xt, X_train, y_train, X_test, y_test,  5)

The best model was RandomForestClassifier, this is the one we will take as a base to do the hyperparameter tuning

A continuacion se hace un Hyperparameter tunning

In [None]:
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import RandomizedSearchCV
from sklearn.model_selection import RepeatedStratifiedKFold

In [None]:
# space = dict()
# space['max_depth'] = [11,15]
# space['min_samples_split'] = [2,3]
# space['class_weight'] = ['balanced', None]

# model = ExtraTreesClassifier()

# cv = RepeatedStratifiedKFold(n_splits=10, n_repeats=3, random_state=1)

# search = GridSearchCV(model, space, cv = cv, scoring='f1_micro', n_jobs=-1)

# result = search.fit(X_train, y_train)

# print("Best: %f using %s" % (result.best_score_, result.best_params_))
# means = result.cv_results_['mean_test_score']
# stds = result.cv_results_['std_test_score']
# params = result.cv_results_['params']
# for mean, stdev, param in zip(means, stds, params):
#     print("%f (%f) with: %r" % (mean, stdev, param))

Here then we define a new model with the same train and test sets but with the hyperparameters found

In [None]:
rf = ExtraTreesClassifier(max_depth=15, min_samples_split = 3, class_weight = None)
rf.fit(X_train, y_train)

In [None]:
rf_eval = evaluate_model(rf, X_test, y_test)
print('FINAL RandomForestClassifier - F1 Score:', rf_eval['f1'])
print('FINAL RandomForestClassifier - Confusion Matrix: ', rf_eval['cm'])

Here are some graphs related to the feature imprtance and confusion matrix

In [None]:
importances = rf.feature_importances_
indices = np.argsort(importances)
features = df_encoded.columns
plt.figure(figsize=(20,10))
plt.title('Feature Importances')
plt.barh(range(len(indices)), importances[indices], color='g', align='center')
plt.yticks(range(len(indices)), [features[i] for i in indices])
plt.xlabel('Relative Importance')
plt.show()

In [None]:
from sklearn.metrics import ConfusionMatrixDisplay
class_names = ['Slight Injury', 'Serious Injury', 'Fata Injury' ]

disp = ConfusionMatrixDisplay.from_estimator(
        rf,
        X_test,
        y_test,
        display_labels=class_names,
        cmap=plt.cm.Blues)

# **** SHAP ****

In [None]:
import shap as sh

In [None]:
data_sample = X_train[:1000]
shap_values = sh.TreeExplainer(rf).shap_values(data_sample[:100])
y_hat = rf.predict(data_sample)
sh.summary_plot(shap_values, data_sample, plot_type="bar")

# **** Save the model to joblib format ****

In [None]:
#import pickle

In [None]:
#pickle.dump(rf, open('../models/TrafficSeverityClassificationModel.pkl', 'wb'))