In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

In [2]:
def load_and_process_data(data1, data2, data3, data4, common_key, target_column):
    """Load four semicolon-separated CSV files, join them and split features/target.

    The files are read in the order given and inner-joined one after another
    on ``common_key``. Columns that are entirely empty, or that hold at most
    one distinct non-null value, are removed afterwards.

    Args:
        data1, data2, data3, data4: Paths (or buffers) accepted by ``pd.read_csv``.
        common_key: Name of the join column present in every file.
        target_column: Name of the column to predict.

    Returns:
        tuple: ``(X, y)`` where ``y`` is the target column and ``X`` is the
        merged frame without the target and key columns.
    """
    tables = [
        pd.read_csv(source, sep=";", low_memory=False)
        for source in (data1, data2, data3, data4)
    ]

    # Chain the inner joins left to right, starting from the first table.
    merged_df = tables[0]
    for table in tables[1:]:
        merged_df = merged_df.merge(table, on=common_key, how="inner")

    print(f"Colonnes disponibles après fusion : {merged_df.columns.tolist()}")

    # Drop columns that carry no information: all-NaN first, then constant ones.
    merged_df = merged_df.dropna(axis=1, how='all')
    informative = merged_df.nunique() > 1
    merged_df = merged_df.loc[:, informative]

    features = merged_df.drop(columns=[target_column, common_key])
    target = merged_df[target_column]
    return features, target

def correlation_matrix(X, y, threshold=0.1):
    """Select numeric columns whose Pearson correlation with the target is strong enough.

    Args:
        X: Feature DataFrame; only numeric columns are considered.
        y: Target values, same length as ``X``.
        threshold: Minimum absolute correlation (exclusive) for a column to be kept.

    Returns:
        list: Names of the retained numeric columns. Columns whose correlation
        is NaN never pass the comparison and are therefore left out.
    """
    numeric_names = X.select_dtypes(include=[np.number]).columns
    correlations = {
        name: np.corrcoef(X[name], y)[0, 1]
        for name in numeric_names
    }

    selected = []
    for name, coefficient in correlations.items():
        if abs(coefficient) > threshold:
            selected.append(name)

    print(f"Colonnes numériques retenues (corrélation > {threshold}): {selected}")
    return selected

def categorical_analysis(X, y, threshold=0.2):
    """Select object-typed columns whose categories have well-separated target means.

    For every ``object`` column the target is averaged per category; the
    column is kept when the variance of those per-category means exceeds
    ``threshold``.

    Args:
        X: Feature DataFrame.
        y: Target values, assigned to a working copy of ``X`` under the name
            ``'target'``.
        threshold: Minimum variance (exclusive) of the per-category means.

    Returns:
        list: Names of the retained categorical columns.
    """
    # Work on a copy so the caller's frame does not gain a 'target' column.
    working = X.copy()
    working['target'] = y

    object_columns = X.select_dtypes(include='object').columns
    selected = [
        name
        for name in object_columns
        if working.groupby(name)['target'].mean().var() > threshold
    ]

    print(f"Colonnes catégorielles retenues (variance > {threshold}): {selected}")
    return selected

def low_variance_filter(X, threshold=0.01):
    """Return the numeric columns whose variance is strictly above ``threshold``.

    Args:
        X: Feature DataFrame; non-numeric columns are ignored.
        threshold: Minimum variance (exclusive) for a column to be kept.

    Returns:
        list: Names of the retained numeric columns, in their original order.
    """
    variances = X.select_dtypes(include=[np.number]).var()
    selected = [name for name, value in variances.items() if value > threshold]
    print(f"Colonnes numériques retenues (variance > {threshold}): {selected}")
    return selected


def auto_handle_nan(df, nan_threshold_delete=0.5, nan_threshold_impute=0.1, missing_sentinel=-1):
    """
    Automatically handle missing values in a DataFrame.

    Steps, in order:
      1. Cells equal to ``missing_sentinel`` are converted to NaN so they are
         counted and imputed like any other missing value. (The previous code
         replaced ``-1`` with ``1``, which silently turned the sentinel into a
         regular value.)
      2. Columns whose NaN ratio is above ``nan_threshold_delete`` are dropped.
      3. Remaining NaN are imputed per column:
         - text columns: the literal ``"Manquant"`` when the NaN ratio is above
           ``nan_threshold_impute``, otherwise the most frequent value;
         - other columns: the median when the NaN ratio is above
           ``nan_threshold_impute``, otherwise the mean.

    Args:
        df: Input DataFrame. It is not modified; a new frame is returned.
        nan_threshold_delete: NaN ratio (exclusive) above which a column is dropped.
        nan_threshold_impute: NaN ratio (exclusive) that switches the imputation strategy.
        missing_sentinel: Value that encodes "missing" in the raw data.
            Pass ``None`` to disable the conversion.

    Returns:
        pandas.DataFrame: The cleaned frame.
    """
    print("Analyse des NaN dans le dataset...\n")
    if missing_sentinel is not None:
        # Turn the sentinel into a real NaN *before* measuring missingness.
        df = df.replace(missing_sentinel, np.nan)
    nan_percent = df.isnull().mean()
    print("Pourcentage de valeurs manquantes par colonne :")
    print(nan_percent)

    cols_to_delete = nan_percent[nan_percent > nan_threshold_delete].index
    print(f"\nColonnes supprimées (trop de NaN > {nan_threshold_delete*100}%): {list(cols_to_delete)}")
    # drop() returns a new frame, so the assignments below never touch the caller's data.
    df = df.drop(columns=cols_to_delete)

    for col in df.columns:
        missing = df[col].isnull().sum()
        if missing == 0:
            continue

        heavily_missing = nan_percent[col] > nan_threshold_impute
        # Recognise both the classic object dtype and dedicated string dtypes.
        is_text = (
            pd.api.types.is_object_dtype(df[col])
            or pd.api.types.is_string_dtype(df[col])
        )

        if is_text:
            if heavily_missing:
                print(f"Colonne '{col}' : Imputation avec 'Manquant' (catégorielle)")
                df[col] = df[col].fillna("Manquant")
            else:
                print(f"Colonne '{col}' : Imputation avec la valeur la plus fréquente (mode)")
                df[col] = df[col].fillna(df[col].mode()[0])
        else:
            if heavily_missing:
                print(f"Colonne '{col}' : Imputation avec la médiane (numérique)")
                df[col] = df[col].fillna(df[col].median())
            else:
                print(f"Colonne '{col}' : Imputation avec la moyenne (numérique)")
                df[col] = df[col].fillna(df[col].mean())

    print("\nTraitement des NaN terminé.")
    return df


In [3]:
def preprocess_data(data1, data2, data3, data4, common_key, target_column):
    """Run the full preprocessing pipeline and return the selected features and target.

    Pipeline: load and merge the four CSV files, remove duplicated feature
    rows, handle missing values, then keep the numeric columns that pass the
    variance filter plus the categorical columns that pass the
    mean-variance filter.

    Args:
        data1, data2, data3, data4: Paths of the CSV files to merge.
        common_key: Join column shared by the four files.
        target_column: Column to predict.

    Returns:
        tuple: ``(X_filtered, y)`` with the same number of rows and the same index.
    """
    print("Chargement et fusion des données...")
    X, y = load_and_process_data(data1, data2, data3, data4, common_key, target_column)

    print("\nSuppression des doublons...")
    print("Nombre de doublons avant suppression :", X.duplicated().sum())
    X = X.drop_duplicates()
    # Keep the target aligned with the surviving rows; without this, X and y
    # have different lengths as soon as one duplicate is removed.
    y = y.loc[X.index]
    print("Nombre de doublons après suppression :", X.duplicated().sum())

    print("\nTraitement des valeurs NaN...")
    X = auto_handle_nan(X)

    print("\nSélection des colonnes numériques importantes...")
    # NOTE(review): the correlation-based selection is computed (and printed)
    # but is not part of the final column list below — confirm whether that
    # is intentional.
    numeric_cols_corr = correlation_matrix(X, y, threshold=0.1)
    numeric_cols_var = low_variance_filter(X, threshold=0.1)

    print("\nSélection des colonnes catégorielles importantes...")
    categorical_cols = categorical_analysis(X, y, threshold=2)

    # dict.fromkeys de-duplicates while preserving order, so the column order
    # is identical from one run to the next (a set does not guarantee that).
    selected_columns = list(dict.fromkeys(numeric_cols_var + categorical_cols))
    print(f"\nColonnes finales sélectionnées : {selected_columns}")

    X_filtered = X[selected_columns]

    print("\nRésumé des colonnes importantes pour la prédiction :")
    print(f"Nombre de colonnes finales : {len(X_filtered.columns)}")
    print(X_filtered.head())
    return X_filtered, y

In [None]:
import numpy as np

class NeuralNetwork:
    """One-hidden-layer classifier trained with mini-batch gradient descent.

    Both the hidden layer and the output layer use a softmax activation. The
    training signal is ``y_one_hot - output``, i.e. the negative gradient of
    the summed cross-entropy loss with respect to the output logits.

    Labels passed to ``fit`` must be integers in ``0 .. output_size - 1``
    because they are used directly as row indices of an identity matrix.
    """

    def __init__(self, input_size, hidden_size, output_size, learning_rate=0.01, epochs=500):
        """Store the hyper-parameters and initialise the parameters.

        Weights are drawn from a normal distribution scaled by 0.1; biases
        start at zero.
        """
        print("Initialisation du NeuralNetwork...")
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.learning_rate = learning_rate
        self.epochs = epochs

        self.weights_input_hidden = np.random.randn(self.input_size, self.hidden_size) * 0.1
        self.bias_hidden = np.zeros((1, self.hidden_size))
        self.weights_hidden_output = np.random.randn(self.hidden_size, self.output_size) * 0.1
        self.bias_output = np.zeros((1, self.output_size))

    def softmax(self, z):
        """Row-wise softmax; the row maximum is subtracted first for numerical stability."""
        exp_z = np.exp(z - np.max(z, axis=1, keepdims=True))
        return exp_z / np.sum(exp_z, axis=1, keepdims=True)

    def softmax_derivative(self, z):
        """Return ``z * (1 - z)`` element-wise.

        This is only the diagonal of the softmax Jacobian. It is kept for
        backward compatibility; ``fit`` uses ``_softmax_backward``, which
        applies the full Jacobian.
        """
        return z * (1 - z)

    def _softmax_backward(self, activation, upstream):
        """Propagate ``upstream`` through a row-wise softmax.

        For s = softmax(a) and an upstream signal g with respect to s, the
        signal with respect to a is ``s * (g - sum(g * s))`` on each row.
        """
        weighted = np.sum(upstream * activation, axis=1, keepdims=True)
        return activation * (upstream - weighted)

    def _forward(self, X):
        """Return ``(hidden_output, output)`` for the input rows ``X``."""
        hidden_layer_activation = np.dot(X, self.weights_input_hidden) + self.bias_hidden
        hidden_layer_output = self.softmax(hidden_layer_activation)
        final_layer_activation = np.dot(hidden_layer_output, self.weights_hidden_output) + self.bias_output
        output = self.softmax(final_layer_activation)
        return hidden_layer_output, output

    def fit(self, X, y, batch_size=32):
        """Train the network in place.

        Args:
            X: 2-D array-like of features, one sample per row.
            y: Integer class labels in ``0 .. output_size - 1``.
            batch_size: Number of consecutive rows per update (no shuffling).
        """
        print("Début de l'entraînement du NeuralNetwork...")
        X = np.asarray(X, dtype=float)
        y_one_hot = np.eye(self.output_size)[np.asarray(y)]

        for epoch in range(self.epochs):
            if epoch % 50 == 0:
                print(f"Époque {epoch}/{self.epochs}")
            for i in range(0, X.shape[0], batch_size):
                X_batch = X[i:i + batch_size]
                y_batch = y_one_hot[i:i + batch_size]

                hidden_layer_output, output = self._forward(X_batch)

                # Negative gradient of the cross-entropy w.r.t. the output logits.
                d_output = y_batch - output

                # Back-propagate through the hidden softmax with its full
                # Jacobian, using the output weights *before* they are updated.
                d_hidden_layer = self._softmax_backward(
                    hidden_layer_output,
                    np.dot(d_output, self.weights_hidden_output.T),
                )

                self.weights_hidden_output += np.dot(hidden_layer_output.T, d_output) * self.learning_rate
                self.bias_output += np.sum(d_output, axis=0, keepdims=True) * self.learning_rate
                self.weights_input_hidden += np.dot(X_batch.T, d_hidden_layer) * self.learning_rate
                # The learning rate was missing here, so the hidden bias moved
                # by the raw gradient regardless of the configured step size.
                self.bias_hidden += np.sum(d_hidden_layer, axis=0, keepdims=True) * self.learning_rate

    def predict(self, X):
        """Return the index of the most probable class for each row of ``X``."""
        _, output = self._forward(np.asarray(X, dtype=float))
        return np.argmax(output, axis=1)

class LogisticRegression:
    """Multinomial (softmax) logistic regression trained with full-batch gradient descent.

    Labels may be any sortable values (for example 1..4): they are mapped to
    column indices internally and mapped back by ``predict``. With labels
    that already are ``0 .. k-1`` the behaviour is unchanged.
    """

    def __init__(self, learning_rate=0.01, epochs=500):
        """Store the step size and the number of gradient steps."""
        print("Initialisation de la LogisticRegression...")
        self.learning_rate = learning_rate
        self.epochs = epochs

    def fit(self, X, y):
        """Fit the weights and bias on ``X`` (samples in rows) and labels ``y``.

        Sets ``self.classes_`` (sorted unique labels), ``self.weights`` of
        shape ``(n_features, n_classes)`` and ``self.bias`` of shape
        ``(1, n_classes)``.
        """
        print("Début de l'entraînement de la LogisticRegression...")
        X = np.asarray(X, dtype=float)
        # Indexing an identity matrix with raw labels only works when the
        # labels are exactly 0..k-1, so encode them first.
        self.classes_, y_index = np.unique(np.asarray(y), return_inverse=True)
        y_index = y_index.ravel()
        n_classes = len(self.classes_)

        self.weights = np.zeros((X.shape[1], n_classes))
        self.bias = np.zeros((1, n_classes))
        m = len(y_index)
        y_one_hot = np.eye(n_classes)[y_index]

        for epoch in range(self.epochs):
            if epoch % 50 == 0:
                print(f"Époque {epoch}/{self.epochs}")
            linear_model = np.dot(X, self.weights) + self.bias
            predictions = self.softmax(linear_model)

            # Gradient of the mean cross-entropy loss.
            gradient = predictions - y_one_hot
            dw = (1 / m) * np.dot(X.T, gradient)
            db = (1 / m) * np.sum(gradient, axis=0, keepdims=True)

            self.weights -= self.learning_rate * dw
            self.bias -= self.learning_rate * db

    def softmax(self, z):
        """Row-wise softmax; the row maximum is subtracted first for numerical stability."""
        exp_z = np.exp(z - np.max(z, axis=1, keepdims=True))
        return exp_z / np.sum(exp_z, axis=1, keepdims=True)

    def predict(self, X):
        """Return the most probable label (taken from ``classes_``) for each row of ``X``."""
        linear_model = np.dot(np.asarray(X, dtype=float), self.weights) + self.bias
        predictions = self.softmax(linear_model)
        return self.classes_[np.argmax(predictions, axis=1)]

class XGBoost:
    """Sequential ensemble of ``LogisticRegression`` models combined by majority vote.

    Despite its name this class does not build gradient-boosted trees: each
    round fits a ``LogisticRegression`` on the arg-max of a running residual
    matrix, and ``predict`` counts one vote per fitted model. ``max_depth``
    is stored but never used.
    """

    def __init__(self, learning_rate=0.1, n_estimators=100, max_depth=3):
        """Store the hyper-parameters; no model is fitted yet."""
        print("Initialisation de XGBoost...")
        self.learning_rate = learning_rate
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self.models = []

    def fit(self, X, y):
        """Fit ``n_estimators`` sub-models on ``X`` and labels ``y``.

        Labels are encoded to ``0 .. k-1`` internally so that any label
        values are accepted. Calling ``fit`` again discards the previous
        sub-models.
        """
        print("Début de l'entraînement de XGBoost...")
        # Encode labels so they can safely index an identity matrix.
        self.classes_, y_index = np.unique(np.asarray(y), return_inverse=True)
        y_index = y_index.ravel()
        n_classes = len(self.classes_)

        # Start from scratch: without this a second fit() keeps the old models.
        self.models = []
        residual = np.eye(n_classes)[y_index]

        for _ in range(self.n_estimators):
            model = LogisticRegression(learning_rate=self.learning_rate, epochs=100)
            model.fit(X, np.argmax(residual, axis=1))
            predictions = model.predict(X)
            residual -= np.eye(n_classes)[predictions]
            self.models.append(model)

    def predict(self, X):
        """Return the majority-vote label for each row of ``X``.

        Raises:
            RuntimeError: If ``fit`` has not been called.
        """
        if not self.models:
            raise RuntimeError("XGBoost.predict() called before fit().")

        X = np.asarray(X)
        n_classes = len(self.classes_)
        votes = np.zeros((X.shape[0], n_classes))
        for model in self.models:
            votes += np.eye(n_classes)[model.predict(X)]
        return self.classes_[np.argmax(votes, axis=1)]

def train_test_split(X, y, test_size=0.2, random_state=None):
    """Shuffle the samples and split them into training and test arrays.

    Args:
        X: Features, one sample per row (anything ``np.array`` accepts).
        y: Targets, same length as ``X``.
        test_size: Fraction of the samples placed in the test split.
        random_state: Seed for NumPy's global random generator. ``None``
            leaves the generator untouched; any integer, including 0, seeds it.

    Returns:
        tuple: ``(X_train, X_test, y_train, y_test)`` as NumPy arrays.
    """
    print("Division des données en ensembles d'entraînement et de validation...")
    # Compare with None explicitly: a plain truthiness test ignores seed 0.
    if random_state is not None:
        np.random.seed(random_state)
    indices = np.arange(len(X))
    np.random.shuffle(indices)

    split_idx = int(len(X) * (1 - test_size))
    train_indices = indices[:split_idx]
    test_indices = indices[split_idx:]

    X = np.array(X)
    y = np.array(y)

    return X[train_indices], X[test_indices], y[train_indices], y[test_indices]

def manual_hyperparameter_search(X, y):
    """Grid-search the hand-written models and return the best one by validation F1.

    The data is split 80/20 with a fixed seed, then every ``NeuralNetwork``
    and ``XGBoost`` configuration listed below is trained on the training
    part and scored on the validation part.

    Labels are encoded to ``0 .. k-1`` before training because the models and
    the confusion matrix use labels as array indices. The returned model and
    confusion matrix therefore work on encoded labels; the mapping is printed
    whenever it is not the identity.

    Args:
        X: Features (DataFrame or array), one sample per row.
        y: Class labels (Series or array).

    Returns:
        tuple: ``(best_model, best_f1, best_conf_matrix)``.
    """
    print("Début de la recherche manuelle des hyperparamètres...")
    if isinstance(X, (pd.DataFrame, pd.Series)):
        X = np.array(X)
    if isinstance(y, (pd.DataFrame, pd.Series)):
        y = np.array(y)

    # Encode labels as consecutive integers starting at 0.
    classes, y = np.unique(np.asarray(y).ravel(), return_inverse=True)
    y = y.ravel()
    n_classes = len(classes)
    if not np.array_equal(classes, np.arange(n_classes)):
        print(f"Classes encodées en indices 0..{n_classes - 1} : {classes.tolist()}")

    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

    print(f"Train size: {len(X_train)}, Validation size: {len(X_val)}")
    print(f"Proportion de classes dans y_train : {np.mean(y_train)}")
    print(f"Proportion de classes dans y_val : {np.mean(y_val)}")

    def _evaluate_candidate(model, description, best):
        """Train and score ``model``; return the better of it and ``best``."""
        model.fit(X_train, y_train)
        f1, conf_matrix = evaluate_model(model, X_val, y_val)
        print(f"Testé: {description} => F1={f1}")
        if f1 > best[0]:
            return f1, model, conf_matrix
        return best

    # Start below any reachable score so a model is returned even when every
    # candidate scores an F1 of exactly 0.
    best = (float("-inf"), None, None)

    for learning_rate in [0.01, 0.05, 0.1]:
        for epochs in [50, 100, 150]:  # Fewer epochs to limit over-fitting
            for hidden_size in [10, 20, 30]:
                model = NeuralNetwork(input_size=X_train.shape[1], hidden_size=hidden_size, output_size=n_classes, learning_rate=learning_rate, epochs=epochs)
                best = _evaluate_candidate(
                    model,
                    f"lr={learning_rate}, epochs={epochs}, hidden_size={hidden_size}",
                    best,
                )

    for learning_rate in [0.1, 0.2]:
        for n_estimators in [50, 100]:
            model = XGBoost(learning_rate=learning_rate, n_estimators=n_estimators, max_depth=3)
            best = _evaluate_candidate(
                model,
                f"lr={learning_rate}, n_estimators={n_estimators}",
                best,
            )

    best_f1, best_model, best_conf_matrix = best

    print("Recherche manuelle terminée.")
    print("Meilleur F1:", best_f1)
    print("Matrice de confusion:\n", best_conf_matrix)
    return best_model, best_f1, best_conf_matrix


def f1_score_manual(y_true, y_pred):
    """Compute the macro-averaged F1 score over the labels present in ``y_true``.

    A label with no true positive contributes 0. That covers the case where
    precision and recall are both 0, which would otherwise evaluate 0/0 and
    turn the whole average into NaN.

    Args:
        y_true: True labels (array-like).
        y_pred: Predicted labels (array-like, same length).

    Returns:
        float: Mean of the per-label F1 scores; 0.0 when ``y_true`` is empty.
    """
    # Accept plain lists: element-wise comparisons need arrays.
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)

    labels = np.unique(y_true)
    if labels.size == 0:
        return 0.0

    f1_scores = []
    for label in labels:
        tp = np.sum((y_true == label) & (y_pred == label))
        fp = np.sum((y_true != label) & (y_pred == label))
        fn = np.sum((y_true == label) & (y_pred != label))
        if tp == 0:
            # No correct prediction for this label: precision and recall are
            # 0 or undefined, so the F1 score is 0 by convention.
            f1_scores.append(0.0)
        else:
            precision = tp / (tp + fp)
            recall = tp / (tp + fn)
            f1_scores.append(2 * (precision * recall) / (precision + recall))
    return np.mean(f1_scores)

def confusion_matrix_manual(y_true, y_pred):
    """Build the confusion matrix of ``y_pred`` against ``y_true``.

    Rows are true labels and columns are predicted labels, both ordered by
    sorted label value. The label set is the union of the values found in
    ``y_true`` and ``y_pred``, so labels that do not start at 0, are not
    consecutive, or appear only in the predictions are counted correctly
    instead of raising ``IndexError`` or wrapping around.

    Args:
        y_true: True labels (array-like).
        y_pred: Predicted labels (array-like, same length).

    Returns:
        numpy.ndarray: Integer matrix of shape ``(n_labels, n_labels)``.
    """
    y_true = np.asarray(y_true).ravel()
    y_pred = np.asarray(y_pred).ravel()

    labels = np.unique(np.concatenate([y_true, y_pred]))
    conf_matrix = np.zeros((len(labels), len(labels)), dtype=int)

    # labels is sorted, so searchsorted gives each value's row/column index.
    true_idx = np.searchsorted(labels, y_true)
    pred_idx = np.searchsorted(labels, y_pred)
    # add.at accumulates repeated (row, column) pairs, unlike fancy-index +=.
    np.add.at(conf_matrix, (true_idx, pred_idx), 1)
    return conf_matrix

def evaluate_model(model, X_val, y_val):
    """Score ``model`` on a validation set.

    Prints the first ten predictions next to the first ten true values, then
    computes the F1 score and the confusion matrix.

    Args:
        model: Any object exposing ``predict(X)``.
        X_val: Validation features.
        y_val: Validation labels.

    Returns:
        tuple: ``(f1, conf_matrix)``.
    """
    y_hat = model.predict(X_val)
    print("Prédictions :", y_hat[:10])
    print("Vraies valeurs :", y_val[:10])
    return f1_score_manual(y_val, y_hat), confusion_matrix_manual(y_val, y_hat)

if __name__ == "__main__":
    # Entry point: preprocess the 2023 accident files, standardise the
    # features, search for the best model and save it to disk.
    import pickle

    print("Début du script...")
    X, y = preprocess_data("data/caract-2023.csv", "data/lieux-2023.csv", "data/usagers-2023.csv", "data/vehicules-2023.csv", "Num_Acc", "grav")

    # Standardise each column. A constant column has a standard deviation of
    # 0; dividing by 1 instead keeps it at 0 rather than producing NaN/inf.
    feature_mean = np.mean(X, axis=0)
    feature_std = np.std(X, axis=0)
    feature_std = feature_std.where(feature_std != 0, 1.0)
    X = (X - feature_mean) / feature_std

    best_model, best_f1, best_conf_matrix = manual_hyperparameter_search(X, y)

    # Write a real pickle so the ".pkl" file can be read back with
    # pickle.load (np.save wrapped the object in a .npy container instead).
    # Only unpickle files you trust: loading a pickle can run arbitrary code.
    with open("best_model.pkl", "wb") as f:
        pickle.dump(best_model, f)

    print("Modèle sauvegardé dans 'best_model.pkl'.")


Début du script...
Chargement et fusion des données...
Colonnes disponibles après fusion : ['Num_Acc', 'jour', 'mois', 'an', 'hrmn', 'lum', 'dep', 'com', 'agg', 'int', 'atm', 'col', 'adr', 'lat', 'long', 'catr', 'voie', 'v1', 'v2', 'circ', 'nbv', 'vosp', 'prof', 'pr', 'pr1', 'plan', 'lartpc', 'larrout', 'surf', 'infra', 'situ', 'vma', 'id_usager', 'id_vehicule_x', 'num_veh_x', 'place', 'catu', 'grav', 'sexe', 'an_nais', 'trajet', 'secu1', 'secu2', 'secu3', 'locp', 'actp', 'etatp', 'id_vehicule_y', 'num_veh_y', 'senc', 'catv', 'obs', 'obsm', 'choc', 'manv', 'motor', 'occutc']

Suppression des doublons...
Nombre de doublons avant suppression : 0
Nombre de doublons après suppression : 0

Traitement des valeurs NaN...
Analyse des NaN dans le dataset...

Pourcentage de valeurs manquantes par colonne :
jour             0.000000
mois             0.000000
hrmn             0.000000
lum              0.000000
dep              0.000000
com              0.000000
agg              0.000000
int       