In [9]:
# Cellule 1: Imports et configuration
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import torchvision
import matplotlib.pyplot as plt
import numpy as np
import onnx
import onnxruntime as ort
from IPython.display import display, HTML
import seaborn as sns
from sklearn.metrics import confusion_matrix, classification_report
import time
from torch.utils.tensorboard import SummaryWriter
import os

# Configuration de matplotlib pour de meilleurs graphiques
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")

# Configuration TensorBoard
log_dir = "runs/mnist_experiment"
if os.path.exists(log_dir):
    import shutil
    shutil.rmtree(log_dir)
os.makedirs(log_dir, exist_ok=True)

# Configuration du device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Utilisation du device: {device}")
if torch.cuda.is_available():
    print(f"GPU: {torch.cuda.get_device_name(0)}")

print(f"TensorBoard logs seront sauvegardés dans: {log_dir}")
print("Pour visualiser les métriques: tensorboard --logdir=runs")

Utilisation du device: cuda
GPU: NVIDIA GeForce RTX 2070
TensorBoard logs seront sauvegardés dans: runs/mnist_experiment
Pour visualiser les métriques: tensorboard --logdir=runs


In [None]:
# Cellule 2: Définition du modèle OPTIMISÉ avec Sequential
from collections import OrderedDict

class EnhancedMNISTNet(nn.Module):
    def __init__(self):
        super(EnhancedMNISTNet, self).__init__()
        
        # Bloc convolutionnel 1 - Extraction de caractéristiques de bas niveau
        self.conv_block1 = nn.Sequential(OrderedDict([
            ('conv1', nn.Conv2d(1, 64, 3, padding=1)),#1, 28, 28 ->64, 28, 28
            ('bn1', nn.BatchNorm2d(64)),
            ('relu1', nn.ReLU(inplace=True)),
            ('conv2', nn.Conv2d(64, 64, 3, padding=1)), #64, 28, 28
            ('bn2', nn.BatchNorm2d(64)),
            ('relu2', nn.ReLU(inplace=True)),
            ('pool1', nn.MaxPool2d(2, 2)), #64, 14, 14
            ('dropout1', nn.Dropout2d(0.1))
        ]))
        
        # Bloc convolutionnel 2 - Caractéristiques de niveau moyen
        self.conv_block2 = nn.Sequential(OrderedDict([
            ('conv3', nn.Conv2d(64, 128, 3, padding=1)), #128, 14, 14
            ('bn3', nn.BatchNorm2d(128)),
            ('relu3', nn.ReLU(inplace=True)),
            ('conv4', nn.Conv2d(128, 128, 3, padding=1)),
            ('bn4', nn.BatchNorm2d(128)),
            ('relu4', nn.ReLU(inplace=True)),
            ('pool2', nn.MaxPool2d(2, 2)),
            ('dropout2', nn.Dropout2d(0.15))
        ]))
        
        # Bloc convolutionnel 3 - Caractéristiques de haut niveau
        self.conv_block3 = nn.Sequential(OrderedDict([
            ('conv5', nn.Conv2d(128, 256, 3, padding=1)),
            ('bn5', nn.BatchNorm2d(256)),
            ('relu5', nn.ReLU(inplace=True)),
            ('conv6', nn.Conv2d(256, 256, 3, padding=1)),
            ('bn6', nn.BatchNorm2d(256)),
            ('relu6', nn.ReLU(inplace=True)),
            ('pool3', nn.MaxPool2d(2, 2)),
            ('dropout3', nn.Dropout2d(0.2))
        ]))
        
        # Couches fully connected
        self.classifier = nn.Sequential(OrderedDict([
            ('flatten', nn.Flatten()),
            ('fc1', nn.Linear(256 * 3 * 3, 1024)),
            ('bn_fc1', nn.BatchNorm1d(1024)),
            ('relu_fc1', nn.ReLU(inplace=True)),
            ('dropout4', nn.Dropout(0.4)),
            ('fc2', nn.Linear(1024, 512)),
            ('bn_fc2', nn.BatchNorm1d(512)),
            ('relu_fc2', nn.ReLU(inplace=True)),
            ('dropout5', nn.Dropout(0.3)),
            ('fc3', nn.Linear(512, 256)),
            ('bn_fc3', nn.BatchNorm1d(256)),
            ('relu_fc3', nn.ReLU(inplace=True)),
            ('dropout6', nn.Dropout(0.2)),
            ('fc4', nn.Linear(256, 10)),
            ('log_softmax', nn.LogSoftmax(dim=1))
        ]))

    def forward(self, x):
        x = self.conv_block1(x)
        x = self.conv_block2(x)
        x = self.conv_block3(x)
        x = self.classifier(x)
        return x

# Créer le modèle optimisé
model = EnhancedMNISTNet()
print("Architecture du modèle OPTIMISÉ avec Sequential:")
print(model)

# Calculer le nombre de paramètres
total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"\nParamètres totaux: {total_params:,}")
print(f"Paramètres entraînables: {trainable_params:,}")

# Ajouter le modèle au TensorBoard
writer = SummaryWriter(log_dir)
dummy_input = torch.randn(1, 1, 28, 28)
writer.add_graph(model, dummy_input)
print("Graphe du modèle optimisé ajouté à TensorBoard")

Architecture du modèle OPTIMISÉ avec Sequential:
EnhancedMNISTNet(
  (conv_block1): Sequential(
    (conv1): Conv2d(1, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu1): ReLU(inplace=True)
    (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu2): ReLU(inplace=True)
    (pool1): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (dropout1): Dropout2d(p=0.1, inplace=False)
  )
  (conv_block2): Sequential(
    (conv3): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (bn3): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu3): ReLU(inplace=True)
    (conv4): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (bn4): BatchNorm2d(128, eps=1e-05, momentum

In [11]:
# Transformations d'augmentation pour l'entraînement
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,)),
])

train_dataset = datasets.MNIST('data', train=True, download=True, transform=transform)
test_dataset = datasets.MNIST('data', train=False, transform=transform)

# Suppression du multiprocessing pour éviter les erreurs de pickle
train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True, pin_memory=True)
test_loader = DataLoader(test_dataset, batch_size=1024, shuffle=False, pin_memory=True)

print(f"Données d'entraînement: {len(train_dataset)} échantillons")
print(f"Données de test: {len(test_dataset)} échantillons")
print("AUGMENTATION DE DONNÉES ACTIVÉE pour un surentraînement optimal!")

# Ajouter des échantillons à TensorBoard
dataiter = iter(train_loader)
images, labels = next(dataiter)

# Créer une grille d'images pour TensorBoard
img_grid = torchvision.utils.make_grid(images[:32])
writer.add_image('MNIST_Images_Sample_Enhanced', img_grid)

# Ajouter les embeddings des données
writer.add_embedding(images.view(-1, 28*28)[:100], 
                    metadata=labels[:100].tolist(),
                    tag='MNIST_Embeddings_Enhanced')

# Distribution des classes
labels_list = [train_dataset[i][1] for i in range(len(train_dataset))]
class_counts = np.bincount(labels_list)

# Ajouter l'histogramme des classes à TensorBoard
for i, count in enumerate(class_counts):
    writer.add_scalar(f'Dataset/Class_{i}_Count', count, 0)

print("Échantillons et statistiques d'entraînement renforcé ajoutés à TensorBoard")

Données d'entraînement: 60000 échantillons
Données de test: 10000 échantillons
AUGMENTATION DE DONNÉES ACTIVÉE pour un surentraînement optimal!
Échantillons et statistiques d'entraînement renforcé ajoutés à TensorBoard
Échantillons et statistiques d'entraînement renforcé ajoutés à TensorBoard


In [12]:
# Cellule 4: Fonctions d'entraînement et de test RENFORCÉES
class EnhancedTensorBoardTracker:
    def __init__(self, writer):
        self.writer = writer
        self.train_losses = []
        self.test_losses = []
        self.test_accuracies = []
        self.learning_rates = []
        self.best_accuracy = 0
        
    def update(self, epoch, train_loss, test_loss, test_acc, lr, model=None):
        self.train_losses.append(train_loss)
        self.test_losses.append(test_loss)
        self.test_accuracies.append(test_acc)
        self.learning_rates.append(lr)
        
        # Suivre la meilleure accuracy
        if test_acc > self.best_accuracy:
            self.best_accuracy = test_acc
            print(f"NOUVELLE MEILLEURE ACCURACY: {test_acc:.3f}%")
        
        # Enregistrer les métriques dans TensorBoard
        self.writer.add_scalar('Loss/Train', train_loss, epoch)
        self.writer.add_scalar('Loss/Test', test_loss, epoch)
        self.writer.add_scalar('Accuracy/Test', test_acc, epoch)
        self.writer.add_scalar('Accuracy/Best', self.best_accuracy, epoch)
        self.writer.add_scalar('Learning_Rate', lr, epoch)
        
        # Ajouter les histogrammes des poids et gradients
        if model is not None:
            for name, param in model.named_parameters():
                self.writer.add_histogram(f'Weights/{name}', param, epoch)
                if param.grad is not None:
                    self.writer.add_histogram(f'Gradients/{name}', param.grad, epoch)
                    # Surveillance des gradients
                    grad_norm = param.grad.norm().item()
                    self.writer.add_scalar(f'Gradient_Norms/{name}', grad_norm, epoch)

def train_epoch_enhanced(model, device, train_loader, optimizer, epoch, tracker=None, scheduler=None):
    model.train()
    total_loss = 0
    correct = 0
    processed = 0
    
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device, non_blocking=True), target.to(device, non_blocking=True)
        
        output = model(data)
        loss = F.nll_loss(output, target)
        
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
        
        total_loss += loss.item()
        pred = output.argmax(dim=1, keepdim=True)
        correct += pred.eq(target.view_as(pred)).sum().item()
        processed += len(data)
        
        if batch_idx % 100 == 0:  # Affichage toutes les 100 itérations
            current_acc = 100. * correct / processed
            print(f'Époque {epoch}, Batch {batch_idx}/{len(train_loader)}, '
                  f'Loss: {loss.item():.6f}, Accuracy: {current_acc:.2f}%')
            
            # Ajouter la loss par batch à TensorBoard
            global_step = (epoch - 1) * len(train_loader) + batch_idx
            if tracker and tracker.writer:
                tracker.writer.add_scalar('Loss/Train_Batch', loss.item(), global_step)
                tracker.writer.add_scalar('Accuracy/Train_Batch', current_acc, global_step)
    
    avg_loss = total_loss / len(train_loader)
    accuracy = 100. * correct / len(train_loader.dataset)
    
    return avg_loss, accuracy

def test_model_enhanced(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    all_preds = []
    all_targets = []
    all_confidences = []
    
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device, non_blocking=True), target.to(device, non_blocking=True)
            output = model(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
            
            # Calculer les confidences
            probs = torch.exp(output)
            confidences = probs.max(dim=1)[0]
            all_confidences.extend(confidences.cpu().numpy())
            
            all_preds.extend(pred.cpu().numpy().flatten())
            all_targets.extend(target.cpu().numpy())

    test_loss /= len(test_loader.dataset)
    accuracy = 100. * correct / len(test_loader.dataset)
    avg_confidence = np.mean(all_confidences)
    
    print(f'Test Loss: {test_loss:.4f}, Accuracy: {accuracy:.3f}%, Confiance moyenne: {avg_confidence:.3f}')
    
    return test_loss, accuracy, all_preds, all_targets, avg_confidence

In [13]:
# Cellule 5: Entraînement optimisé avec TensorBoard
def train_model_with_tensorboard():
    print("DÉBUT DE L'ENTRAÎNEMENT OPTIMISÉ AVEC TENSORBOARD")
    print("="*60)
    
    # Initialiser le modèle renforcé
    model = EnhancedMNISTNet().to(device)
    
    # Optimiseur avec paramètres équilibrés
    optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=1e-4, betas=(0.9, 0.999))
    
    # Scheduler adapté pour 10 époques
    scheduler = optim.lr_scheduler.OneCycleLR(
        optimizer, max_lr=0.005, epochs=10, steps_per_epoch=len(train_loader),
        pct_start=0.3, anneal_strategy='cos'
    )
    
    tracker = EnhancedTensorBoardTracker(writer)
    
    # Entraînement optimisé: 10 époques
    num_epochs = 10
    start_time = time.time()
    
    print(f"Entraînement configuré pour {num_epochs} époques avec {len(train_loader)} batchs par époque")
    
    for epoch in range(1, num_epochs + 1):
        print(f"\n{'='*50}")
        print(f"ÉPOQUE {epoch}/{num_epochs}")
        print(f"{'='*50}")
        
        # Entraînement avec la fonction améliorée
        train_loss, train_acc = train_epoch_enhanced(
            model, device, train_loader, optimizer, epoch, tracker, scheduler
        )
        
        # Test avec métriques étendues
        test_loss, test_acc, preds, targets, avg_conf = test_model_enhanced(
            model, device, test_loader
        )
        
        # Update du scheduler
        scheduler.step()
        current_lr = optimizer.param_groups[0]['lr']
        
        # Tracking avec TensorBoard
        tracker.update(epoch, train_loss, test_loss, test_acc, current_lr, model)
        
        # Ajouter métriques additionnelles
        writer.add_scalar('Confidence/Average', avg_conf, epoch)
        writer.add_scalar('Training/Train_Accuracy', train_acc, epoch)
        
        # Matrices de confusion toutes les 3 époques
        if epoch % 3 == 0 or epoch == 1:
            cm = confusion_matrix(targets, preds)
            fig, ax = plt.subplots(figsize=(10, 8))
            sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=ax, cbar_kws={'shrink': 0.8})
            ax.set_title(f'Matrice de Confusion - Époque {epoch}\nAccuracy: {test_acc:.3f}%')
            ax.set_ylabel('Vraie classe')
            ax.set_xlabel('Classe prédite')
            writer.add_figure(f'Confusion_Matrix/Epoch_{epoch}', fig, epoch)
            plt.close(fig)
        
        print(f"Époque {epoch} terminée - Train Acc: {train_acc:.3f}%, Test Acc: {test_acc:.3f}%")
        print(f"Confiance moyenne: {avg_conf:.3f}, LR: {current_lr:.6f}")
    
    total_time = time.time() - start_time
    print(f"\n{'='*50}")
    print(f"ENTRAÎNEMENT TERMINÉ en {total_time:.2f} secondes!")
    print(f"Accuracy finale: {tracker.test_accuracies[-1]:.3f}%")
    print(f"MEILLEURE ACCURACY ATTEINTE: {tracker.best_accuracy:.3f}%")
    print(f"{'='*50}")
    
    # Rapport de classification final
    print("\nRapport de classification FINAL:")
    report = classification_report(targets, preds, target_names=[str(i) for i in range(10)], digits=3)
    print(report)
    
    # Ajouter le rapport de classification à TensorBoard
    class_report = classification_report(targets, preds, target_names=[str(i) for i in range(10)], output_dict=True)
    for class_name, metrics in class_report.items():
        if isinstance(metrics, dict):
            for metric_name, value in metrics.items():
                if isinstance(value, (int, float)):
                    writer.add_scalar(f'Final_Classification_Report/{class_name}_{metric_name}', value, num_epochs)
    
    return model, tracker

In [14]:
# Cellule 7: Export ONNX avec validation
def export_and_validate_onnx(model, device, model_path='mnist_model.onnx'):
    print("Export du modèle en ONNX...")
    
    model.eval()
    dummy_input = torch.randn(1, 1, 28, 28).to(device)
    
    # Export ONNX
    torch.onnx.export(
        model,
        dummy_input,
        model_path,
        export_params=True,
        opset_version=11,
        do_constant_folding=True,
        input_names=['input'],
        output_names=['output'],
        dynamic_axes={
            'input': {0: 'batch_size'},
            'output': {0: 'batch_size'}
        }
    )
    
    print(f"Modèle exporté vers {model_path}")
    
    # Validation ONNX
    onnx_model = onnx.load(model_path)
    onnx.checker.check_model(onnx_model)
    print("Modèle ONNX validé!")
    
    # Test de comparaison PyTorch vs ONNX
    print("Comparaison PyTorch vs ONNX...")
    
    ort_session = ort.InferenceSession(model_path)
    
    # Test sur quelques échantillons
    test_data, test_targets = next(iter(test_loader))
    test_sample = test_data[:5].to(device)
    
    # Prédiction PyTorch
    with torch.no_grad():
        pytorch_output = model(test_sample)
        pytorch_pred = pytorch_output.argmax(dim=1).cpu().numpy()
    
    # Prédiction ONNX
    ort_inputs = {ort_session.get_inputs()[0].name: test_sample.cpu().numpy()}
    onnx_output = ort_session.run(None, ort_inputs)[0]
    onnx_pred = np.argmax(onnx_output, axis=1)
    
    # Comparaison
    print("Comparaison des prédictions:")
    matches = 0
    for i in range(5):
        match = pytorch_pred[i] == onnx_pred[i]
        matches += match
        print(f"Échantillon {i+1}: PyTorch={pytorch_pred[i]}, ONNX={onnx_pred[i]}, "
              f"Match={'OUI' if match else 'NON'}")
    
    # Différence dans les probabilités
    pytorch_probs = torch.exp(pytorch_output).cpu().numpy()
    onnx_probs = np.exp(onnx_output)  # Convertir log_softmax en probabilités
    
    max_diff = np.max(np.abs(pytorch_probs - onnx_probs))
    print(f"Différence maximale entre les probabilités: {max_diff:.6f}")
    
    if max_diff < 1e-5:
        print("Les modèles PyTorch et ONNX produisent des résultats identiques!")
    else:
        print("Petites différences détectées (normal due à la précision)")
    
    # Ajouter les résultats de comparaison à TensorBoard
    writer.add_scalar('ONNX_Validation/Prediction_Matches', matches, 0)
    writer.add_scalar('ONNX_Validation/Max_Probability_Difference', max_diff, 0)
    
    return model_path

In [None]:
# Cellule 9: Fonction principale pour exécuter tout le pipeline

print("DÉMARRAGE DU PIPELINE COMPLET MNIST")
print("="*60)

# 1. Entraînement OPTIMISÉ
model, tracker = train_model_with_tensorboard()

# 2. Analyse des prédictions
print("ANALYSE DES PRÉDICTIONS")
print("="*40)

# 3. Export ONNX
print("EXPORT ONNX")
print("="*40)
onnx_path = export_and_validate_onnx(model, device)

# 4. Sauvegarde PyTorch
torch.save(model.state_dict(), 'mnist_model.pth')
print("Modèle PyTorch sauvegardé: mnist_model.pth")

# 5. Instructions de déploiement
print("INSTRUCTIONS DE DÉPLOIEMENT")
print("="*40)

print("PIPELINE TERMINÉ AVEC SUCCÈS!")
print(f"Accuracy finale: {tracker.test_accuracies[-1]:.2f}%")
print(f"Fichiers générés:")
print(f"   - mnist_model.pth (modèle PyTorch)")
print(f"   - {onnx_path} (modèle ONNX pour le web)")

DÉMARRAGE DU PIPELINE COMPLET MNIST
DÉBUT DE L'ENTRAÎNEMENT OPTIMISÉ AVEC TENSORBOARD
Entraînement configuré pour 10 époques avec 469 batchs par époque

ÉPOQUE 1/10
Époque 1, Batch 0/469, Loss: 2.397455, Accuracy: 12.50%
Époque 1, Batch 0/469, Loss: 2.397455, Accuracy: 12.50%
Époque 1, Batch 100/469, Loss: 0.271298, Accuracy: 77.48%
Époque 1, Batch 100/469, Loss: 0.271298, Accuracy: 77.48%
Époque 1, Batch 200/469, Loss: 0.094768, Accuracy: 87.18%
Époque 1, Batch 200/469, Loss: 0.094768, Accuracy: 87.18%
Époque 1, Batch 300/469, Loss: 0.048194, Accuracy: 90.68%
Époque 1, Batch 300/469, Loss: 0.048194, Accuracy: 90.68%
Époque 1, Batch 400/469, Loss: 0.089169, Accuracy: 92.51%
Époque 1, Batch 400/469, Loss: 0.089169, Accuracy: 92.51%
Test Loss: 0.0299, Accuracy: 99.130%, Confiance moyenne: 0.988
NOUVELLE MEILLEURE ACCURACY: 99.130%
Test Loss: 0.0299, Accuracy: 99.130%, Confiance moyenne: 0.988
NOUVELLE MEILLEURE ACCURACY: 99.130%
Époque 1 terminée - Train Acc: 93.335%, Test Acc: 99.130%
C

: 