<a href="https://colab.research.google.com/github/DEEPLEARNINGTP/Lenet-5/blob/main/TP9.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [5]:
import zipfile
import os

# Chemin du fichier ZIP
zip_path = 'amhcd-data-64.zip'
# Dossier de destination
extract_folder = '/content'
os.makedirs(extract_folder, exist_ok=True)

# Extraction
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(extract_folder)

print(f"Dossier extrait dans : {extract_folder}")

Dossier extrait dans : /content


In [15]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns
from PIL import Image
import os
import zipfile
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import pickle
import warnings
warnings.filterwarnings('ignore')

class LeNet5:
    def __init__(self, learning_rate=0.001, num_classes=33):
        """
        Initialisation du réseau LeNet-5
        """
        self.learning_rate = learning_rate
        self.num_classes = num_classes

        # Initialisation du cache
        self.cache = {}

        # Initialisation des poids et biais
        self.initialize_parameters()

        # Historique d'entraînement
        self.train_loss_history = []
        self.val_loss_history = []
        self.train_acc_history = []
        self.val_acc_history = []

    def initialize_parameters(self):
        """
        Initialisation des paramètres du réseau selon He initialization
        """
        # Couche C1: 6 filtres 5x5x1
        self.W1 = np.random.randn(6, 1, 5, 5) * np.sqrt(2.0 / (5*5*1))
        self.b1 = np.zeros((6, 1))

        # Couche C3: 16 filtres 5x5x6
        self.W3 = np.random.randn(16, 6, 5, 5) * np.sqrt(2.0 / (5*5*6))
        self.b3 = np.zeros((16, 1))

        # Couche C5: 120 neurones (16*5*5 = 400 entrées)
        self.W5 = np.random.randn(120, 400) * np.sqrt(2.0 / 400)
        self.b5 = np.zeros((120, 1))

        # Couche F6: 84 neurones
        self.W6 = np.random.randn(84, 120) * np.sqrt(2.0 / 120)
        self.b6 = np.zeros((84, 1))

        # Couche de sortie: 33 neurones
        self.W7 = np.random.randn(self.num_classes, 84) * np.sqrt(2.0 / 84)
        self.b7 = np.zeros((self.num_classes, 1))

    def tanh_activation(self, x):
        """Fonction d'activation tanh"""
        return np.tanh(x)

    def tanh_derivative(self, x):
        """Dérivée de la fonction tanh"""
        return 1 - np.tanh(x)**2

    def sigmoid_activation(self, x):
        """Fonction d'activation sigmoïde"""
        # Clip pour éviter overflow
        x = np.clip(x, -500, 500)
        return 1 / (1 + np.exp(-x))

    def sigmoid_derivative(self, x):
        """Dérivée de la fonction sigmoïde"""
        s = self.sigmoid_activation(x)
        return s * (1 - s)

    def softmax(self, x):
        """Fonction softmax pour la couche de sortie"""
        # Stabilité numérique
        x = x - np.max(x, axis=0, keepdims=True)
        exp_x = np.exp(x)
        return exp_x / np.sum(exp_x, axis=0, keepdims=True)

    def conv2d(self, input_data, kernel, bias, stride=1, padding=0):
        """
        Convolution 2D
        input_data: (batch_size, channels, height, width)
        kernel: (out_channels, in_channels, kernel_height, kernel_width)
        """
        batch_size, in_channels, in_h, in_w = input_data.shape
        out_channels, _, k_h, k_w = kernel.shape

        # Padding
        if padding > 0:
            input_data = np.pad(input_data, ((0, 0), (0, 0), (padding, padding), (padding, padding)), 'constant')
            in_h += 2 * padding
            in_w += 2 * padding

        # Calcul des dimensions de sortie
        out_h = (in_h - k_h) // stride + 1
        out_w = (in_w - k_w) // stride + 1

        # Initialisation de la sortie
        output = np.zeros((batch_size, out_channels, out_h, out_w))

        # Convolution
        for b in range(batch_size):
            for oc in range(out_channels):
                for i in range(out_h):
                    for j in range(out_w):
                        h_start = i * stride
                        h_end = h_start + k_h
                        w_start = j * stride
                        w_end = w_start + k_w

                        # Convolution sur toutes les couches d'entrée
                        conv_sum = 0
                        for ic in range(in_channels):
                            conv_sum += np.sum(input_data[b, ic, h_start:h_end, w_start:w_end] * kernel[oc, ic])

                        output[b, oc, i, j] = conv_sum + bias[oc, 0]

        return output

    def avg_pool2d(self, input_data, pool_size=2, stride=2):
        """
        Average Pooling 2D
        """
        batch_size, channels, in_h, in_w = input_data.shape
        out_h = (in_h - pool_size) // stride + 1
        out_w = (in_w - pool_size) // stride + 1

        output = np.zeros((batch_size, channels, out_h, out_w))

        for b in range(batch_size):
            for c in range(channels):
                for i in range(out_h):
                    for j in range(out_w):
                        h_start = i * stride
                        h_end = h_start + pool_size
                        w_start = j * stride
                        w_end = w_start + pool_size

                        output[b, c, i, j] = np.mean(input_data[b, c, h_start:h_end, w_start:w_end])

        return output

    def forward(self, x):
        """
        Propagation avant
        """
        # Initialisation du cache s'il n'existe pas
        if not hasattr(self, 'cache'):
            self.cache = {}

        # Couche C1: Convolution + Tanh
        self.cache['z1'] = self.conv2d(x, self.W1, self.b1)
        self.cache['a1'] = self.tanh_activation(self.cache['z1'])

        # Couche S2: Average Pooling
        self.cache['a2'] = self.avg_pool2d(self.cache['a1'])

        # Couche C3: Convolution + Tanh
        self.cache['z3'] = self.conv2d(self.cache['a2'], self.W3, self.b3)
        self.cache['a3'] = self.tanh_activation(self.cache['z3'])

        # Couche S4: Average Pooling
        self.cache['a4'] = self.avg_pool2d(self.cache['a3'])

        # Flatten pour les couches fully connected
        batch_size = x.shape[0]
        self.cache['a4_flat'] = self.cache['a4'].reshape(batch_size, -1).T

        # Couche C5: Fully Connected + Tanh
        self.cache['z5'] = np.dot(self.W5, self.cache['a4_flat']) + self.b5
        self.cache['a5'] = self.tanh_activation(self.cache['z5'])

        # Couche F6: Fully Connected + Tanh
        self.cache['z6'] = np.dot(self.W6, self.cache['a5']) + self.b6
        self.cache['a6'] = self.tanh_activation(self.cache['z6'])

        # Couche de sortie: Fully Connected + Softmax
        self.cache['z7'] = np.dot(self.W7, self.cache['a6']) + self.b7
        self.cache['a7'] = self.softmax(self.cache['z7'])

        return self.cache['a7']

    def compute_loss(self, y_pred, y_true):
        """
        Calcul de la perte cross-entropy
        """
        batch_size = y_true.shape[1]
        # Éviter log(0)
        y_pred = np.clip(y_pred, 1e-15, 1 - 1e-15)
        loss = -np.sum(y_true * np.log(y_pred)) / batch_size
        return loss

    def backward(self, y_pred, y_true):
        """
        Rétropropagation des gradients
        """
        batch_size = y_true.shape[1]

        # Gradient de la couche de sortie
        dz7 = y_pred - y_true

        # Gradients pour W7 et b7
        dW7 = np.dot(dz7, self.cache['a6'].T) / batch_size
        db7 = np.sum(dz7, axis=1, keepdims=True) / batch_size

        # Gradient pour la couche F6
        da6 = np.dot(self.W7.T, dz7)
        dz6 = da6 * self.tanh_derivative(self.cache['z6'])

        # Gradients pour W6 et b6
        dW6 = np.dot(dz6, self.cache['a5'].T) / batch_size
        db6 = np.sum(dz6, axis=1, keepdims=True) / batch_size

        # Gradient pour la couche C5
        da5 = np.dot(self.W6.T, dz6)
        dz5 = da5 * self.tanh_derivative(self.cache['z5'])

        # Gradients pour W5 et b5
        dW5 = np.dot(dz5, self.cache['a4_flat'].T) / batch_size
        db5 = np.sum(dz5, axis=1, keepdims=True) / batch_size

        # Reshape le gradient pour la couche S4
        da4_flat = np.dot(self.W5.T, dz5)
        da4 = da4_flat.T.reshape(self.cache['a4'].shape)

        # Gradient pour la couche C3 (à travers S4)
        da3 = self.avg_pool_backward(da4, self.cache['a3'].shape)
        dz3 = da3 * self.tanh_derivative(self.cache['z3'])

        # Gradients pour W3 et b3
        dW3, db3 = self.conv_backward(dz3, self.cache['a2'], self.W3)

        # Gradient pour la couche C1 (à travers S2)
        da2 = self.conv_backward_input(dz3, self.W3, self.cache['a2'].shape)
        da1 = self.avg_pool_backward(da2, self.cache['a1'].shape)
        dz1 = da1 * self.tanh_derivative(self.cache['z1'])

        # Gradients pour W1 et b1
        dW1, db1 = self.conv_backward(dz1, self.cache['x'], self.W1)

        # Stockage des gradients
        self.grads = {
            'dW7': dW7, 'db7': db7,
            'dW6': dW6, 'db6': db6,
            'dW5': dW5, 'db5': db5,
            'dW3': dW3, 'db3': db3,
            'dW1': dW1, 'db1': db1
        }

    def conv_backward(self, dout, x, w):
        """
        Rétropropagation pour la convolution
        """
        batch_size, out_channels, out_h, out_w = dout.shape
        _, in_channels, in_h, in_w = x.shape
        _, _, k_h, k_w = w.shape

        dw = np.zeros_like(w)
        db = np.zeros((out_channels, 1))

        for b in range(batch_size):
            for oc in range(out_channels):
                db[oc] += np.sum(dout[b, oc])
                for i in range(out_h):
                    for j in range(out_w):
                        for ic in range(in_channels):
                            dw[oc, ic] += dout[b, oc, i, j] * x[b, ic, i:i+k_h, j:j+k_w]

        return dw / batch_size, db / batch_size

    def conv_backward_input(self, dout, w, input_shape):
        """
        Rétropropagation pour l'entrée de la convolution
        """
        batch_size, in_channels, in_h, in_w = input_shape
        _, out_channels, out_h, out_w = dout.shape
        _, _, k_h, k_w = w.shape

        dx = np.zeros(input_shape)

        for b in range(batch_size):
            for ic in range(in_channels):
                for oc in range(out_channels):
                    for i in range(out_h):
                        for j in range(out_w):
                            dx[b, ic, i:i+k_h, j:j+k_w] += dout[b, oc, i, j] * w[oc, ic]

        return dx

    def avg_pool_backward(self, dout, input_shape):
        """
        Rétropropagation pour l'average pooling
        """
        batch_size, channels, in_h, in_w = input_shape
        _, _, out_h, out_w = dout.shape

        dx = np.zeros(input_shape)
        pool_size = 2
        stride = 2

        for b in range(batch_size):
            for c in range(channels):
                for i in range(out_h):
                    for j in range(out_w):
                        h_start = i * stride
                        h_end = h_start + pool_size
                        w_start = j * stride
                        w_end = w_start + pool_size

                        # Distribuer le gradient uniformément
                        dx[b, c, h_start:h_end, w_start:w_end] += dout[b, c, i, j] / (pool_size * pool_size)

        return dx

    def update_parameters(self, optimizer='sgd'):
        """
        Mise à jour des paramètres
        """
        if optimizer == 'sgd':
            self.W7 -= self.learning_rate * self.grads['dW7']
            self.b7 -= self.learning_rate * self.grads['db7']
            self.W6 -= self.learning_rate * self.grads['dW6']
            self.b6 -= self.learning_rate * self.grads['db6']
            self.W5 -= self.learning_rate * self.grads['dW5']
            self.b5 -= self.learning_rate * self.grads['db5']
            self.W3 -= self.learning_rate * self.grads['dW3']
            self.b3 -= self.learning_rate * self.grads['db3']
            self.W1 -= self.learning_rate * self.grads['dW1']
            self.b1 -= self.learning_rate * self.grads['db1']
        elif optimizer == 'adam':
            self.adam_update()

    def adam_update(self):
        """
        Optimiseur Adam
        """
        if not hasattr(self, 'adam_initialized'):
            self.adam_initialized = True
            self.beta1 = 0.9
            self.beta2 = 0.999
            self.epsilon = 1e-8
            self.t = 0

            # Initialisation des moments
            self.m = {}
            self.v = {}
            for param in ['W7', 'b7', 'W6', 'b6', 'W5', 'b5', 'W3', 'b3', 'W1', 'b1']:
                self.m[param] = np.zeros_like(getattr(self, param))
                self.v[param] = np.zeros_like(getattr(self, param))

        self.t += 1

        # Mise à jour des paramètres avec Adam
        for param in ['W7', 'b7', 'W6', 'b6', 'W5', 'b5', 'W3', 'b3', 'W1', 'b1']:
            grad = self.grads[f'd{param}']

            # Mise à jour des moments
            self.m[param] = self.beta1 * self.m[param] + (1 - self.beta1) * grad
            self.v[param] = self.beta2 * self.v[param] + (1 - self.beta2) * (grad ** 2)

            # Correction du biais
            m_corrected = self.m[param] / (1 - self.beta1 ** self.t)
            v_corrected = self.v[param] / (1 - self.beta2 ** self.t)

            # Mise à jour des paramètres
            setattr(self, param, getattr(self, param) - self.learning_rate * m_corrected / (np.sqrt(v_corrected) + self.epsilon))

    def train(self, X_train, y_train, X_val, y_val, epochs=50, batch_size=32, optimizer='adam'):
        """
        Entraînement du modèle
        """
        print(f"Début de l'entraînement avec {len(X_train)} échantillons")

        for epoch in range(epochs):
            # Mélange des données
            indices = np.random.permutation(len(X_train))
            X_train_shuffled = X_train[indices]
            y_train_shuffled = y_train[indices]

            epoch_loss = 0
            epoch_correct = 0

            # Entraînement par batch
            for i in range(0, len(X_train), batch_size):
                batch_X = X_train_shuffled[i:i+batch_size]
                batch_y = y_train_shuffled[i:i+batch_size]

                # Reshape pour le réseau
                batch_X = batch_X.reshape(batch_X.shape[0], 1, 32, 32)

                # Forward pass
                self.cache['x'] = batch_X
                y_pred = self.forward(batch_X)

                # Calcul de la perte
                loss = self.compute_loss(y_pred, batch_y.T)
                epoch_loss += loss

                # Calcul de la précision
                predictions = np.argmax(y_pred, axis=0)
                true_labels = np.argmax(batch_y, axis=1)
                epoch_correct += np.sum(predictions == true_labels)

                # Backward pass
                self.backward(y_pred, batch_y.T)

                # Mise à jour des paramètres
                self.update_parameters(optimizer)

            # Calcul des métriques d'entraînement
            train_loss = epoch_loss / (len(X_train) // batch_size)
            train_acc = epoch_correct / len(X_train)

            # Évaluation sur l'ensemble de validation
            val_loss, val_acc = self.evaluate(X_val, y_val)

            # Stockage de l'historique
            self.train_loss_history.append(train_loss)
            self.val_loss_history.append(val_loss)
            self.train_acc_history.append(train_acc)
            self.val_acc_history.append(val_acc)

            if epoch % 5 == 0:
                print(f"Epoch {epoch+1}/{epochs} - Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

    def evaluate(self, X, y):
        """
        Évaluation du modèle
        """
        # Vérification que le cache existe
        if not hasattr(self, 'cache'):
            self.cache = {}

        X = X.reshape(X.shape[0], 1, 32, 32)
        y_pred = self.forward(X)

        loss = self.compute_loss(y_pred, y.T)
        predictions = np.argmax(y_pred, axis=0)
        true_labels = np.argmax(y, axis=1)
        accuracy = np.mean(predictions == true_labels)

        return loss, accuracy

    def predict(self, X):
        """
        Prédiction
        """
        # Vérification que le cache existe
        if not hasattr(self, 'cache'):
            self.cache = {}

        X = X.reshape(X.shape[0], 1, 32, 32)
        y_pred = self.forward(X)
        return np.argmax(y_pred, axis=0)

# Classe pour la gestion des données
class TifinaghDataLoader:
    def __init__(self, zip_path):
        self.zip_path = zip_path
        self.label_encoder = LabelEncoder()

    def load_data(self):
        """
        Chargement des données depuis le fichier zip
        """
        images = []
        labels = []

        with zipfile.ZipFile(self.zip_path, 'r') as zip_ref:
            file_list = zip_ref.namelist()

            for file_name in file_list:
                if file_name.endswith(('.png', '.jpg', '.jpeg')):
                    # Extraction du label depuis le nom du fichier
                    label = file_name.split('/')[-2] if '/' in file_name else file_name.split('_')[0]

                    # Lecture de l'image
                    with zip_ref.open(file_name) as file:
                        img = Image.open(file).convert('L')  # Conversion en niveaux de gris
                        img = img.resize((32, 32))  # Redimensionnement
                        img_array = np.array(img) / 255.0  # Normalisation

                        images.append(img_array)
                        labels.append(label)

        images = np.array(images)
        labels = np.array(labels)

        # Encodage des labels
        labels_encoded = self.label_encoder.fit_transform(labels)

        return images, labels_encoded

    def one_hot_encode(self, labels, num_classes=33):
        """
        Encodage one-hot des labels
        """
        one_hot = np.zeros((len(labels), num_classes))
        for i, label in enumerate(labels):
            one_hot[i, label] = 1
        return one_hot

# Fonction de visualisation
def plot_training_curves(model):
    """
    Visualisation des courbes d'entraînement
    """
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))

    # Courbes de perte
    ax1.plot(model.train_loss_history, label='Train Loss')
    ax1.plot(model.val_loss_history, label='Validation Loss')
    ax1.set_title('Évolution de la Perte')
    ax1.set_xlabel('Époque')
    ax1.set_ylabel('Perte')
    ax1.legend()
    ax1.grid(True)

    # Courbes de précision
    ax2.plot(model.train_acc_history, label='Train Accuracy')
    ax2.plot(model.val_acc_history, label='Validation Accuracy')
    ax2.set_title('Évolution de la Précision')
    ax2.set_xlabel('Époque')
    ax2.set_ylabel('Précision')
    ax2.legend()
    ax2.grid(True)

    plt.tight_layout()
    plt.show()

def plot_confusion_matrix(y_true, y_pred, class_names):
    """
    Visualisation de la matrice de confusion
    """
    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(12, 10))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names, yticklabels=class_names)
    plt.title('Matrice de Confusion')
    plt.ylabel('Vraie Classe')
    plt.xlabel('Classe Prédite')
    plt.show()

def visualize_feature_maps(model, sample_image):
    """
    Visualisation des cartes de caractéristiques
    """
    # Passage avant pour obtenir les activations
    sample_image = sample_image.reshape(1, 1, 32, 32)
    model.forward(sample_image)

    # Visualisation des cartes de la couche C1
    fig, axes = plt.subplots(2, 3, figsize=(15, 10))
    fig.suptitle('Cartes de Caractéristiques - Couche C1', fontsize=16)

    for i in range(6):
        ax = axes[i//3, i%3]
        ax.imshow(model.cache['a1'][0, i], cmap='gray')
        ax.set_title(f'Filtre {i+1}')
        ax.axis('off')

    plt.tight_layout()
    plt.show()

    # Visualisation des cartes de la couche C3
    fig, axes = plt.subplots(4, 4, figsize=(16, 16))
    fig.suptitle('Cartes de Caractéristiques - Couche C3', fontsize=16)

    for i in range(16):
        ax = axes[i//4, i%4]
        ax.imshow(model.cache['a3'][0, i], cmap='gray')
        ax.set_title(f'Filtre {i+1}')
        ax.axis('off')

    plt.tight_layout()
    plt.show()

# Fonction principale
def main():
    # Chargement des données
    print("Chargement des données...")
    data_loader = TifinaghDataLoader('amhcd-data-64.zip')
    X, y = data_loader.load_data()

    print(f"Forme des données: {X.shape}")
    print(f"Nombre de classes: {len(np.unique(y))}")

    # Encodage one-hot
    y_one_hot = data_loader.one_hot_encode(y, num_classes=33)

    # Division des données
    X_train, X_temp, y_train, y_temp = train_test_split(X, y_one_hot, test_size=0.3, random_state=42)
    X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)

    print(f"Taille d'entraînement: {X_train.shape[0]}")
    print(f"Taille de validation: {X_val.shape[0]}")
    print(f"Taille de test: {X_test.shape[0]}")

    # Initialisation du modèle
    model = LeNet5(learning_rate=0.001, num_classes=33)

    # Entraînement
    print("\nDébut de l'entraînement...")
    model.train(X_train, y_train, X_val, y_val, epochs=50, batch_size=32, optimizer='adam')

    # Évaluation finale
    print("\nÉvaluation finale...")
    test_loss, test_acc = model.evaluate(X_test, y_test)
    print(f"Précision sur l'ensemble de test: {test_acc:.4f}")

    # Visualisation des résultats
    plot_training_curves(model)

    # Prédictions pour la matrice de confusion
    y_pred = model.predict(X_test)
    y_true = np.argmax(y_test, axis=1)

    # Noms des classes (à adapter selon vos données)
    class_names = [str(i) for i in range(33)]
    plot_confusion_matrix(y_true, y_pred, class_names)

    # Visualisation des cartes de caractéristiques
    visualize_feature_maps(model, X_test[0])

    # Rapport de classification
    print("\nRapport de classification:")
    print(classification_report(y_true, y_pred))

    # Sauvegarde du modèle
    with open('lenet5_tifinagh_model.pkl', 'wb') as f:
        pickle.dump(model, f)
    print("Modèle sauvegardé dans 'lenet5_tifinagh_model.pkl'")

if __name__ == "__main__":
    main()

Chargement des données...
Forme des données: (28182, 32, 32)
Nombre de classes: 33
Taille d'entraînement: 19727
Taille de validation: 4227
Taille de test: 4228

Début de l'entraînement...
Début de l'entraînement avec 19727 échantillons


KeyboardInterrupt: 