In [None]:
import pandas as pd
import numpy as np
import plotly.express as px
import matplotlib.pyplot as plt
import imblearn

L'objectif est de prédire si un accident donné est mortel ou non.

In [None]:
label = "mortal"

# Load the four 2022 tables (semicolon-separated CSV files)
df1 = pd.read_csv("dataset/usagers-2022.csv", sep=';')
df2 = pd.read_csv("dataset/lieux-2022.csv", sep=';')
df3 = pd.read_csv("dataset/carcteristiques-2022.csv", sep=';')
df4 = pd.read_csv("dataset/vehicules-2022.csv", sep=';')

# 'id_vehicule' is kept in df4 because it is needed as a join key below.
df4 = df4.drop(columns=['num_veh'])


df = df1.join(df2.set_index('Num_Acc'), on='Num_Acc')
df = df.join(df3.set_index('Accident_Id'), on='Num_Acc')
# Join vehicles on (accident, vehicle) rather than on the accident alone:
# joining on 'Num_Acc' only pairs every user with every vehicle of the accident,
# and the later de-duplication then keeps the attributes of an arbitrary vehicle.
# NOTE(review): assumes df1 carries an 'id_vehicule' column formatted like the
# one in df4 -- TODO confirm on the raw files.
df = df.join(df4.set_index(['Num_Acc', 'id_vehicule']), on=['Num_Acc', 'id_vehicule'], lsuffix='_')

In [None]:
# Drop the columns that are not used in the rest of the notebook
df = df.drop(columns=['v1', 'v2', 'pr', 'pr1', 'lartpc', 'larrout'
                      , 'num_veh', 'occutc', 'adr', 'senc','etatp','actp', 
                      'manv', 'jour', 'com', 'hrmn', 'motor', 'place', 'vosp', 'locp'])

df = df.drop_duplicates(subset=['id_usager']) # keep a single row per user

# Replace missing birth years with the most frequent value
df['an_nais'] = df['an_nais'].fillna(df['an_nais'].mode()[0])

# Convert to integers.
# The vehicle id is text; strip every non-digit character instead of slicing at
# fixed positions, so the conversion does not depend on one exact layout.
df['id_vehicule'] = df['id_vehicule'].str.replace(r'\D', '', regex=True).astype(int)
df['sexe'] = df['sexe'].astype(int)
# Show a preview only rather than dumping the whole frame
df.head()

In [None]:
# Print the name of every column of df that still holds at least one missing value
for column, has_missing in df.isna().any().items():
  if has_missing:
    print(column, has_missing)

On peut déjà remarquer qu'il y a des valeurs nulles pour l'attribut *an_nais*.

In [None]:
# Numerical columns
numerical_features = list(df.select_dtypes(include=np.number).columns)

# Categorical columns: everything else, kept in column order so that the
# result is identical from one run to the next (a set difference is unordered)
categorical_features = [name for name in df.columns if name not in numerical_features]
print("numerical : ", numerical_features)
print("categorical : ", categorical_features)

In [None]:
# Turn a per-user characteristic into a per-row attribute
def to_attribute(id_valid, cat_a, cat_b, frame=None):
    """Label each row according to whether its vehicle appears in ``id_valid``.

    Parameters
    ----------
    id_valid : DataFrame
        Rows whose 'id_vehicule' values identify the matching vehicles.
    cat_a : object
        Value emitted for rows whose 'id_vehicule' is found in ``id_valid``.
    cat_b : object
        Value emitted for all other rows.
    frame : DataFrame, optional
        Frame whose 'id_vehicule' column is scanned. Defaults to the
        notebook-level ``df`` to stay compatible with existing calls.

    Returns
    -------
    list
        One value (``cat_a`` or ``cat_b``) per row of ``frame``, in row order.
    """
    if frame is None:
        frame = df
    # Build the lookup once: the membership test is then constant-time per row
    # instead of rescanning the whole array on every iteration.
    valid_ids = set(id_valid['id_vehicule'].to_numpy().tolist())
    return [cat_a if vehicle_id in valid_ids else cat_b
            for vehicle_id in frame['id_vehicule']]

In [None]:
df_2 = df.copy()

# Fatal-accident flag: 1 for every row whose accident has at least one user
# with grav == 2, 0 otherwise.
with_death = df[df['grav'] == 2]
acc_with_death = with_death['Num_Acc']

# Vectorised membership test (replaces a Python loop that rescanned the whole
# array for each row); assigned positionally, as the original list was.
df_2[label] = df['Num_Acc'].isin(acc_with_death).astype(int).to_numpy()


# Rows describing a pedestrian (catu == 3)
has_pedestrian = df[df['catu'] == 3]
# Rows describing a driver (catu == 1) whose 'sexe' equals 1
driver = df[(df['catu'] == 1) & (df['sexe'] == 1)]

# 1 when the row's vehicle appears in the corresponding selection, else 0
df_2['pieton'] = to_attribute(has_pedestrian, 1, 0)
df_2['sexe_conducteur'] = to_attribute(driver, 1, 0)
# Keep one row per vehicle
df_2 = df_2.drop_duplicates(subset=['id_vehicule'])

In [None]:
# Reduced vehicle categories (target codes):
#   1 : bicycle
#   2 : moped
#   3 : light vehicle (car)
#   4 : utility vehicle
#   5 : motorcycle
#   0 : other

# Source codes that are kept, and the reduced code each one maps to
cat = [1, 2, 7, 33, 10]
corresp = {1:1, 2:2, 7:3, 10:4, 33:5}

# Every code outside `cat` falls into the "other" bucket (0)
r = [corresp[code] if code in cat else 0 for code in df_2['catv']]
df_2['catv'] = r


In [None]:
# Under-represented categories (codes above 4) are merged into a single "other" category (code 5)

r = [5 if code > 4 else code for code in df_2['catr']]
df_2['catr'] = r

In [None]:
# Split the speed values into 3 categories
# Codes:
#   1 -> < 50
#   2 -> >= 50 and < 100 (also the fallback when neither comparison holds)
#   3 -> >= 100

v = df_2['vma']
# The first matching condition wins; anything else gets the default code 2
cat = np.select([v < 50, v >= 100], [1, 3], default=2).tolist()
df_2['vma'] = cat

Il faut convertir les colonnes 'nbv', 'hrmn', 'dep', 'com', 'lat' et 'long', actuellement de type objet, respectivement en : int, int, int, int, float, float.

Pour dep et com : recoder les départements corses (2A/2B) afin de leur donner un code en chiffres et non en lettres.

In [None]:
# Keep only the hour of 'hrmn' after parsing it as a datetime.
# NOTE(review): the code below is disabled -- it is wrapped in a string literal,
# so nothing is executed -- and 'hrmn' is already dropped in an earlier cell.
"""
df_2['hrmn'] = pd.to_datetime(df_2['hrmn'], format='%H:%M')

df_2['hr'] = df_2['hrmn'].dt.hour
df_2['hr'] = pd.to_numeric(df_2['hr'], errors='coerce', downcast='integer')
"""

In [None]:
# Convert 'nbv' to an integer column.
# Values that cannot be parsed become NaN and are replaced by 2 (the original
# author noted that a single entry is concerned). The result is assigned back
# explicitly: calling fillna(inplace=True) on `df_2['nbv']` is a chained
# in-place operation that is not guaranteed to modify df_2.
nbv_numeric = pd.to_numeric(df_2['nbv'], errors='coerce').fillna(2)
# Downcast only once the NaN values are gone, otherwise the column stays float
df_2['nbv'] = pd.to_numeric(nbv_numeric, downcast='integer')

# invalid_entries = df_2[df_2['nbv2'].isna()]
# print("Entrées invalides : ")
# print(invalid_entries)

In [None]:
# les cas des dep

def transforme_dpt(n):
    """Map a department code to a purely numeric string.

    Codes ending with a letter are recoded from their last character:
    'D' -> '69' (Rhone), 'M' -> '96' (Lyon metropolis),
    'A' -> '97' (Corse-du-Sud), 'B' -> '98' (Haute-Corse).
    Any other code is returned as a string, stripped of surrounding whitespace.

    Parameters
    ----------
    n : object
        Department code; non-string values (e.g. integers) are accepted and
        converted with ``str``.

    Returns
    -------
    str
        The recoded department, or the code itself when no rule applies
        (an empty string is returned unchanged).
    """
    suffix_codes = {'D': '69', 'M': '96', 'A': '97', 'B': '98'}
    # Normalise first: indexing n[-1] directly fails on integers and on ''
    code = str(n).strip()
    if not code:
        return code
    return suffix_codes.get(code[-1].upper(), code)



# Recode each department, then convert the column to the smallest integer type
# that fits; codes that still cannot be parsed become NaN
df_2['dep'] = pd.to_numeric(
    df_2['dep'].apply(transforme_dpt),
    errors='coerce',
    downcast='integer',
)
"""
invalid_entries = df_2[df_2['dep2'].isna()]
print("Entrées invalides : ")
print(invalid_entries[['dep', 'dep2']])
"""

In [None]:
# Latitude and longitude as floats.
# Accept a comma as decimal separator: pd.to_numeric only understands '.', so
# comma-formatted values would otherwise all be coerced to NaN.
for coord in ('lat', 'long'):
    coord_text = df_2[coord].astype(str).str.replace(',', '.', regex=False)
    df_2[coord] = pd.to_numeric(coord_text, errors='coerce')

In [None]:
# Age of the vehicle's driver: 'an' minus 'an_nais' on rows describing a driver
# (catu == 1), missing everywhere else. df_2 holds one row per vehicle at this
# point, so this is computed row-wise instead of through a self-join.
is_driver = df_2['catu'] == 1
df_2 = df_2.assign(age=(df_2['an'] - df_2['an_nais']).where(is_driver))
# Rows without a driver age receive the most frequent age
df_2['age'] = df_2['age'].fillna(df_2['age'].mode()[0])

# Drop the attributes that are no longer needed
df_2 = df_2.drop(columns=['an_nais', 'an', 'grav', 'sexe', 'catu', 'Num_Acc', 'id_usager',
                          'id_vehicule', 'secu1', 'secu2', 'secu3', 'lat', 'long'])

In [None]:
# Print the name of every column of df_2 that still holds at least one missing value
for column, has_missing in df_2.isna().any().items():
  if has_missing:
    print(column, has_missing)


In [None]:
# Display the columns of df_2 at this stage
df_2.columns

In [None]:
# Show the lists as they stand before they are redefined by hand below
print("numerical : ", numerical_features)
print("categorical : ", categorical_features)

# Categorical attributes of df_2 (the 'mortal' target is part of this list)
categorical_features = [
    'trajet', 'catr', 'circ', 'nbv', 'prof', 'plan',
    'surf', 'infra', 'situ', 'vma', 'mois', 'lum',
    'dep', 'agg', 'int', 'atm', 'col', 'catv',
    'obs', 'obsm', 'choc', 'mortal', 'pieton', 'sexe_conducteur',
]

# Numerical attributes of df_2
numerical_features = ['age']

In [None]:
# Share of rows flagged fatal (mortal == 1) versus non fatal (mortal == 0)
labels = ['Accident mortel', 'Accident non mortel']
val = [int((df_2.mortal == flag).sum()) for flag in (1, 0)]

px.pie(values=val, names=labels)

In [None]:
# Share of rows with pieton == 1 versus pieton == 0
labels = ['Implique piéton', 'N\'implique pas de piéton']
val = [int((df_2.pieton == flag).sum()) for flag in (1, 0)]

px.pie(values=val, names=labels)

In [None]:
# Distribution of the reduced vehicle categories
fig = px.histogram(data_frame=df_2, x="catv")
fig.show()

In [None]:
# Spread of the 'age' attribute
fig = px.box(data_frame=df_2, x="age")
fig.show()

In [None]:
# Distribution of the 'sexe_conducteur' flag
fig = px.histogram(data_frame=df_2, x="sexe_conducteur")
fig.show()

In [None]:
# Distribution of the 'catr' attribute after recoding
fig = px.histogram(data_frame=df_2, x="catr")
fig.show()

In [None]:
# Distribution of the 'col' attribute
fig = px.histogram(data_frame=df_2, x="col")
fig.show()

In [None]:
def rapport_corr(x, y):
  '''
  Compute the correlation ratio between a qualitative variable (x)
  and a quantitative variable (y): the between-category variance of y
  divided by the sum of the between- and within-category variances.

  x : sequence of the qualitative variable observations
  y : sequence of the quantitative variable observations
  x and y must have the same length.

  Returns the ratio, -1 (after printing a message) when the lengths differ,
  and NaN when the ratio is undefined (no observation, or y has no variance).
  '''
  if len(x)!=len(y):
    # f-string prefix added: the lengths were previously printed as literal braces
    print(f"The two list doesn't have the same length {len(x)}!={len(y)}")
    return -1
  n_obs = len(y)
  if n_obs == 0:
    return float("nan")
  # Group the y observations by category in a single pass instead of
  # rescanning the whole input once per category
  y_by_cat = {}
  for category, value in zip(x, y):
    y_by_cat.setdefault(category, []).append(value)
  mean_y = np.mean(y)
  se2 = 0
  sr2 = 0
  for y_cat in y_by_cat.values():
    se2 += len(y_cat)*(np.mean(y_cat)-mean_y)**2
    sr2 += len(y_cat)*np.var(y_cat)
  se2/=n_obs
  sr2/=n_obs
  total = se2 + sr2
  if total == 0:
    # y is constant: the ratio is undefined
    return float("nan")
  return se2/total

In [None]:
def analyse_bi_quali_quanti(quali, quanti, df):
  """Bivariate view of a qualitative column against a quantitative one.

  Prints the correlation ratio of the two columns of ``df``, then shows a box
  plot of ``quanti`` per ``quali`` value followed by two overlaid histograms
  of ``quanti`` coloured by ``quali``: raw counts first, then normalised with
  histnorm="probability".
  """
  ratio = rapport_corr(df[quali].values, df[quanti].values)
  print(f"Rapport de corr {quali} X {quanti}: {ratio} ")

  # Box plot
  px.box(df, x=quali, y=quanti).show()

  # Histograms: None keeps plain counts, "probability" normalises them
  for norm in (None, "probability"):
    px.histogram(df, x=quanti, color=quali, barmode='overlay',
                 opacity=0.75, histnorm=norm).show()

In [None]:
# Compare the vehicle category with each attribute below
features = ['age', 'mois', 'catr', 'mortal', 'sexe_conducteur']

for exp in features:
    print(exp)
    analyse_bi_quali_quanti(quali="catv", quanti=exp, df=df_2)

In [None]:
# Compare the speed category with each attribute below
features = ['age', 'mois', 'catr', 'mortal']

for exp in features:
    print(exp)
    analyse_bi_quali_quanti(quali="vma", quanti=exp, df=df_2)

In [None]:
# Compare the 'col' attribute with each attribute below
features = ['age', 'vma', 'mois', 'catr', 'mortal']

for exp in features:
    print(exp)
    analyse_bi_quali_quanti(quali="col", quanti=exp, df=df_2)

In [None]:
from sklearn.model_selection import train_test_split

# Features: every column except the target (`label` instead of a second hard-coded name)
df_3 = df_2.drop(columns=[label])
# Stratify on the target so both subsets keep the same class proportions.
# NOTE(review): assumes the target is imbalanced, which makes a plain random
# split unreliable for the rare class -- confirm on the pie chart above.
X_train, X_test, y_train, y_test = train_test_split(
    df_3, df_2[label], test_size=0.33, random_state=42, stratify=df_2[label]
)

In [None]:
from sklearn import tree

# Fit a decision tree on the training split (fit() returns the estimator itself)
clf = tree.DecisionTreeClassifier(random_state=42).fit(X_train, y_train)

# Predictions on the held-out split, reused by the confusion matrix
preds = clf.predict(X_test)

# Mean accuracy on the held-out split
clf.score(X_test, y_test)

In [None]:
from sklearn.metrics import confusion_matrix
# Fix the label order explicitly so the matrix is always 2x2: without `labels`,
# a split in which a single class occurs yields a 1x1 matrix and the four-way
# unpacking below fails.
tn, fp, fn, tp = confusion_matrix(y_test, preds, labels=[0, 1]).ravel()
tn, fp, fn, tp

In [None]:
import plotly.express as px

# Heat-map of the confusion matrix: rows are the true classes, columns the predictions
class_names = ["False", "True"]
fig = px.imshow(
    [[tn, fp], [fn, tp]],
    text_auto=True,
    labels=dict(y="Truth", x="Pred"),
    x=list(class_names),
    y=list(class_names),
)
fig.show()

## Générer des contrefactuels

In [None]:
import dice_ml
from dice_ml.utils import helpers

In [None]:
# Build the DiCE dataset from the training split only.
# Starting from the full df_2 and assigning y_train aligns on the index, which
# leaves a missing outcome on every row that went to the test split.
train_dataset = X_train.copy()
train_dataset[label] = y_train
d = dice_ml.Data(dataframe=train_dataset, continuous_features=numerical_features, outcome_name=label)

# Wrap the fitted scikit-learn classifier
m = dice_ml.Model(model=clf, backend="sklearn")

# Counterfactual explainer combining the data description and the model
exp = dice_ml.Dice(d, m)

In [None]:
# Query: the first four rows of the DiCE dataset, without the outcome column
query_instance = train_dataset.drop(columns=[label]).iloc[:4]

# Ask for four counterfactuals per query row, each flipping the predicted class
dice_exp = exp.generate_counterfactuals(
    query_instance,
    total_CFs=4,
    desired_class="opposite",
)

# Show the counterfactual explanations as data frames
dice_exp.visualize_as_dataframe()