# Entraînement du Modèle de Dynamique du Drone

Ce notebook:
1. Charge les données d'entrée/sortie (séries temporelles)
2. Prépare et normalise les données
3. Entraîne un modèle (LSTM, GRU ou Transformer) au choix
4. Évalue les performances
5. Exporte le modèle pour utilisation dans le contrôleur MPC

## 1. Imports et Configuration

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import warnings
warnings.filterwarnings('ignore')

# Configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Device: {device}')
np.random.seed(42)
torch.manual_seed(42)

## 2. Chargement des Données

**Format**: Chaque ligne = UNE série temporelle (séquence complète)

In [None]:
# Charger les données
data_in = np.loadtxt('bdd_in_mat_05.csv', delimiter=',')
data_out = np.loadtxt('bdd_out_mat_05.csv', delimiter=',')

print(f'Shape données entrée (inputs): {data_in.shape}')
print(f'Shape données sortie (outputs): {data_out.shape}')
print(f'Nombre de séries temporelles: {data_in.shape[0]}')
print(f'Longueur de chaque série: {data_in.shape[1]}')

# Afficher quelques statistiques
print(f'\nStats données entrée:')
print(f'  Min: {data_in.min():.4f}, Max: {data_in.max():.4f}, Mean: {data_in.mean():.4f}')
print(f'\nStats données sortie:')
print(f'  Min: {data_out.min():.4f}, Max: {data_out.max():.4f}, Mean: {data_out.mean():.4f}')

## 3. Préparation des Données

Créer des séquences chevauchantes (sliding window) pour l'entraînement séquentiel

In [None]:
# Paramètres de séquence
seq_length = 50  # Longueur des séquences d'entrée
pred_steps = 1   # Nombre de pas à prédire (1 = prédiction 1 pas en avant)
batch_size = 32
train_ratio = 0.8

def create_sequences(X, y, seq_len, pred_steps):
    """
    Crée des séquences chevauchantes à partir des données.
    
    Args:
        X: array (N_series, T) - données d'entrée
        y: array (N_series, T) - données de sortie
        seq_len: longueur des séquences d'entrée
        pred_steps: nombre de pas à prédire
    
    Returns:
        X_seq, y_seq: séquences pour l'entraînement
    """
    X_seq, y_seq = [], []
    
    for series_idx in range(X.shape[0]):
        for t in range(X.shape[1] - seq_len - pred_steps + 1):
            # Fenêtre d'entrée: [t, t+seq_len)
            X_seq.append(X[series_idx, t:t+seq_len])
            # Cible: sortie à t+seq_len
            y_seq.append(y[series_idx, t+seq_len:t+seq_len+pred_steps])
    
    return np.array(X_seq), np.array(y_seq)

# Créer les séquences
print('Création des séquences chevauchantes...')
X_seq, y_seq = create_sequences(data_in, data_out, seq_length, pred_steps)
print(f'X_seq shape: {X_seq.shape}')  # (N_samples, seq_length)
print(f'y_seq shape: {y_seq.shape}')  # (N_samples, pred_steps)

# Normalisation
scaler_X = StandardScaler()
scaler_y = StandardScaler()

X_seq_flat = X_seq.reshape(-1, 1)
X_seq_norm = scaler_X.fit_transform(X_seq_flat).reshape(X_seq.shape)

y_seq_flat = y_seq.reshape(-1, 1)
y_seq_norm = scaler_y.fit_transform(y_seq_flat).reshape(y_seq.shape)

print(f'\nAprès normalisation:')
print(f'X_seq_norm - Mean: {X_seq_norm.mean():.4f}, Std: {X_seq_norm.std():.4f}')
print(f'y_seq_norm - Mean: {y_seq_norm.mean():.4f}, Std: {y_seq_norm.std():.4f}')

## 4. Train / Test Split

In [None]:
# Split train/test
n_train = int(len(X_seq_norm) * train_ratio)
X_train = X_seq_norm[:n_train]
y_train = y_seq_norm[:n_train]
X_test = X_seq_norm[n_train:]
y_test = y_seq_norm[n_train:]

print(f'Train set: {X_train.shape[0]} samples')
print(f'Test set: {X_test.shape[0]} samples')

# Convertir en tensors PyTorch
X_train_t = torch.FloatTensor(X_train).unsqueeze(-1).to(device)  # (N, seq_len, 1)
y_train_t = torch.FloatTensor(y_train).to(device)                # (N, pred_steps)
X_test_t = torch.FloatTensor(X_test).unsqueeze(-1).to(device)
y_test_t = torch.FloatTensor(y_test).to(device)

print(f'\nX_train_t shape: {X_train_t.shape}')
print(f'y_train_t shape: {y_train_t.shape}')

# DataLoader
train_dataset = TensorDataset(X_train_t, y_train_t)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

test_dataset = TensorDataset(X_test_t, y_test_t)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

## 5. Définition des Modèles

In [None]:
class LSTMModel(nn.Module):
    """Modèle LSTM pour prédiction de séries temporelles"""
    def __init__(self, input_size=1, hidden_size=64, num_layers=2, output_size=1):
        super(LSTMModel, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, dropout=0.2)
        self.fc = nn.Sequential(
            nn.Linear(hidden_size, 32),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(32, output_size)
        )
    
    def forward(self, x):
        lstm_out, _ = self.lstm(x)
        # Utiliser la dernière sortie du LSTM
        last_hidden = lstm_out[:, -1, :]
        output = self.fc(last_hidden)
        return output

class GRUModel(nn.Module):
    """Modèle GRU pour prédiction de séries temporelles"""
    def __init__(self, input_size=1, hidden_size=64, num_layers=2, output_size=1):
        super(GRUModel, self).__init__()
        self.gru = nn.GRU(input_size, hidden_size, num_layers, batch_first=True, dropout=0.2)
        self.fc = nn.Sequential(
            nn.Linear(hidden_size, 32),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(32, output_size)
        )
    
    def forward(self, x):
        gru_out, _ = self.gru(x)
        last_hidden = gru_out[:, -1, :]
        output = self.fc(last_hidden)
        return output

class TransformerModel(nn.Module):
    """Modèle Transformer pour prédiction de séries temporelles"""
    def __init__(self, input_size=1, d_model=64, nhead=4, num_layers=2, output_size=1, seq_length=50):
        super(TransformerModel, self).__init__()
        self.embedding = nn.Linear(input_size, d_model)
        self.pos_encoder = nn.Parameter(torch.randn(1, seq_length, d_model))
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model, nhead=nhead, dim_feedforward=128, 
            dropout=0.2, batch_first=True
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.fc = nn.Sequential(
            nn.Linear(d_model, 32),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(32, output_size)
        )
    
    def forward(self, x):
        # Embedding
        x = self.embedding(x)
        # Positional encoding
        x = x + self.pos_encoder
        # Transformer
        trans_out = self.transformer(x)
        # Utiliser le dernier token
        last_hidden = trans_out[:, -1, :]
        output = self.fc(last_hidden)
        return output

print('Modèles définis: LSTM, GRU, Transformer')

## 6. Sélection du Modèle et Paramètres d'Entraînement

⬇️ **CHOISISSEZ ICI VOTRE MODÈLE**

In [None]:
# ====== CHOISIR LE MODÈLE ======
MODEL_TYPE = 'LSTM'  # Options: 'LSTM', 'GRU', 'Transformer'

# Hyperparamètres
hidden_size = 64
num_layers = 2
learning_rate = 1e-3
epochs = 50

# Créer le modèle
if MODEL_TYPE == 'LSTM':
    model = LSTMModel(input_size=1, hidden_size=hidden_size, num_layers=num_layers, output_size=1)
elif MODEL_TYPE == 'GRU':
    model = GRUModel(input_size=1, hidden_size=hidden_size, num_layers=num_layers, output_size=1)
elif MODEL_TYPE == 'Transformer':
    model = TransformerModel(input_size=1, d_model=hidden_size, nhead=4, num_layers=num_layers, 
                            output_size=1, seq_length=seq_length)
else:
    raise ValueError(f"Modèle non reconnu: {MODEL_TYPE}")

model = model.to(device)
print(f'Modèle: {MODEL_TYPE}')
print(f'Nombre de paramètres: {sum(p.numel() for p in model.parameters()):,}')
print(f'\n{model}')

## 7. Entraînement du Modèle

In [None]:
# Loss et optimiseur
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5, verbose=True)

# Historique
train_losses = []
test_losses = []

print(f'Entraînement sur {epochs} epochs...')
print(f'Learning rate: {learning_rate}')
print('-' * 60)

for epoch in range(epochs):
    # ====== TRAIN ======
    model.train()
    train_loss = 0.0
    
    for X_batch, y_batch in train_loader:
        # Forward
        outputs = model(X_batch)
        loss = criterion(outputs, y_batch)
        
        # Backward
        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        
        train_loss += loss.item() * X_batch.size(0)
    
    train_loss /= len(train_loader.dataset)
    train_losses.append(train_loss)
    
    # ====== TEST ======
    model.eval()
    test_loss = 0.0
    
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            test_loss += loss.item() * X_batch.size(0)
    
    test_loss /= len(test_loader.dataset)
    test_losses.append(test_loss)
    
    scheduler.step(test_loss)
    
    if (epoch + 1) % 5 == 0:
        print(f'Epoch {epoch+1:3d}/{epochs} | Train Loss: {train_loss:.6f} | Test Loss: {test_loss:.6f}')

print('-' * 60)
print('✓ Entraînement terminé!')

## 8. Visualisation des Pertes d'Entraînement

In [None]:
plt.figure(figsize=(10, 6))
plt.plot(train_losses, label='Train Loss', linewidth=2, alpha=0.8)
plt.plot(test_losses, label='Test Loss', linewidth=2, alpha=0.8)
plt.xlabel('Epoch', fontsize=12)
plt.ylabel('Loss (MSE)', fontsize=12)
plt.title(f'Courbes d\'entraînement - {MODEL_TYPE}', fontsize=14, fontweight='bold')
plt.legend(fontsize=11)
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(f'training_loss_{MODEL_TYPE}.png', dpi=150, bbox_inches='tight')
plt.show()

print(f'Train Loss initial: {train_losses[0]:.6f}')
print(f'Train Loss final:   {train_losses[-1]:.6f}')
print(f'Test Loss initial:  {test_losses[0]:.6f}')
print(f'Test Loss final:    {test_losses[-1]:.6f}')

## 9. Évaluation du Modèle

In [None]:
model.eval()

with torch.no_grad():
    # Prédictions train
    y_train_pred_norm = model(X_train_t).cpu().numpy()
    y_train_true_norm = y_train_t.cpu().numpy()
    
    # Prédictions test
    y_test_pred_norm = model(X_test_t).cpu().numpy()
    y_test_true_norm = y_test_t.cpu().numpy()

# Inverse transform pour échelle originale
y_train_pred = scaler_y.inverse_transform(y_train_pred_norm)
y_train_true = scaler_y.inverse_transform(y_train_true_norm)
y_test_pred = scaler_y.inverse_transform(y_test_pred_norm)
y_test_true = scaler_y.inverse_transform(y_test_true_norm)

# Métriques
train_mse = mean_squared_error(y_train_true, y_train_pred)
train_mae = mean_absolute_error(y_train_true, y_train_pred)
train_r2 = r2_score(y_train_true, y_train_pred)

test_mse = mean_squared_error(y_test_true, y_test_pred)
test_mae = mean_absolute_error(y_test_true, y_test_pred)
test_r2 = r2_score(y_test_true, y_test_pred)

print('=' * 60)
print(f'MÉTRIQUES D\'ÉVALUATION - {MODEL_TYPE}')
print('=' * 60)
print(f'\nTRAIN SET:')
print(f'  MSE:  {train_mse:.6f}')
print(f'  MAE:  {train_mae:.6f}')
print(f'  R²:   {train_r2:.6f}')
print(f'\nTEST SET:')
print(f'  MSE:  {test_mse:.6f}')
print(f'  MAE:  {test_mae:.6f}')
print(f'  R²:   {test_r2:.6f}')
print('=' * 60)

## 10. Visualisation des Prédictions

In [None]:
fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# ===== Train vs Prédictions (Train) =====
axes[0, 0].scatter(y_train_true, y_train_pred, alpha=0.5, s=20)
axes[0, 0].plot([y_train_true.min(), y_train_true.max()], 
                [y_train_true.min(), y_train_true.max()], 
                'r--', lw=2, label='Perfect Prediction')
axes[0, 0].set_xlabel('Vraie Valeur', fontsize=11)
axes[0, 0].set_ylabel('Prédiction', fontsize=11)
axes[0, 0].set_title(f'Train Set (R² = {train_r2:.4f})', fontsize=12, fontweight='bold')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)

# ===== Test vs Prédictions (Test) =====
axes[0, 1].scatter(y_test_true, y_test_pred, alpha=0.5, s=20, color='orange')
axes[0, 1].plot([y_test_true.min(), y_test_true.max()], 
               [y_test_true.min(), y_test_true.max()], 
               'r--', lw=2, label='Perfect Prediction')
axes[0, 1].set_xlabel('Vraie Valeur', fontsize=11)
axes[0, 1].set_ylabel('Prédiction', fontsize=11)
axes[0, 1].set_title(f'Test Set (R² = {test_r2:.4f})', fontsize=12, fontweight='bold')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)

# ===== Résidus (Train) =====
residuals_train = y_train_true - y_train_pred
axes[1, 0].hist(residuals_train, bins=30, alpha=0.7, color='blue', edgecolor='black')
axes[1, 0].axvline(0, color='r', linestyle='--', linewidth=2)
axes[1, 0].set_xlabel('Résidu', fontsize=11)
axes[1, 0].set_ylabel('Fréquence', fontsize=11)
axes[1, 0].set_title(f'Distribution des Résidus Train (σ = {residuals_train.std():.4f})', fontsize=12, fontweight='bold')
axes[1, 0].grid(True, alpha=0.3, axis='y')

# ===== Résidus (Test) =====
residuals_test = y_test_true - y_test_pred
axes[1, 1].hist(residuals_test, bins=30, alpha=0.7, color='orange', edgecolor='black')
axes[1, 1].axvline(0, color='r', linestyle='--', linewidth=2)
axes[1, 1].set_xlabel('Résidu', fontsize=11)
axes[1, 1].set_ylabel('Fréquence', fontsize=11)
axes[1, 1].set_title(f'Distribution des Résidus Test (σ = {residuals_test.std():.4f})', fontsize=12, fontweight='bold')
axes[1, 1].grid(True, alpha=0.3, axis='y')

plt.tight_layout()
plt.savefig(f'model_evaluation_{MODEL_TYPE}.png', dpi=150, bbox_inches='tight')
plt.show()

## 11. Séquences Temporelles - Quelques Exemples

In [None]:
# Afficher quelques prédictions sur des séquences test
n_examples = 5
fig, axes = plt.subplots(n_examples, 1, figsize=(14, 12))

indices = np.random.choice(len(y_test_true), n_examples, replace=False)

for idx, i in enumerate(indices):
    # Prédiction vs vrai
    axes[idx].bar(['Vraie Valeur', 'Prédiction'], 
                   [y_test_true[i, 0], y_test_pred[i, 0]], 
                   color=['blue', 'orange'], alpha=0.7, edgecolor='black', linewidth=1.5)
    axes[idx].set_ylabel('Sortie (accélération)', fontsize=11)
    axes[idx].set_title(f'Exemple {idx+1} - Erreur: {abs(y_test_true[i, 0] - y_test_pred[i, 0]):.6f}', fontsize=11)
    axes[idx].grid(True, alpha=0.3, axis='y')

plt.tight_layout()
plt.savefig(f'prediction_examples_{MODEL_TYPE}.png', dpi=150, bbox_inches='tight')
plt.show()

## 12. Exportation du Modèle

In [None]:
# Créer un dictionnaire avec tous les éléments nécessaires
export_dict = {
    'model_type': MODEL_TYPE,
    'model_state_dict': model.state_dict(),
    'hidden_size': hidden_size,
    'num_layers': num_layers,
    'seq_length': seq_length,
    'scaler_X_mean': scaler_X.mean_,
    'scaler_X_scale': scaler_X.scale_,
    'scaler_y_mean': scaler_y.mean_,
    'scaler_y_scale': scaler_y.scale_,
    'metrics': {
        'train_mse': float(train_mse),
        'train_mae': float(train_mae),
        'train_r2': float(train_r2),
        'test_mse': float(test_mse),
        'test_mae': float(test_mae),
        'test_r2': float(test_r2)
    }
}

# Sauvegarder
model_filename = f'drone_model_{MODEL_TYPE}.pt'
torch.save(export_dict, model_filename)
print(f'✓ Modèle sauvegardé: {model_filename}')
print(f'\nTaille du fichier: {np.prod(export_dict["model_state_dict"][list(export_dict["model_state_dict"].keys())[0]].shape)} paramètres')

## 13. Résumé du Modèle Exporté

In [None]:
print('\n' + '='*70)
print('RÉSUMÉ - MODÈLE EXPORTÉ')
print('='*70)
print(f'\nType de modèle: {MODEL_TYPE}')
print(f'Fichier: {model_filename}')
print(f'\nArchitecture:')
print(f'  - Taille cachée: {hidden_size}')
print(f'  - Nombre de couches: {num_layers}')
print(f'  - Longueur de séquence: {seq_length}')
print(f'\nPerformances:')
print(f'  Test MAE: {test_mae:.6f}')
print(f'  Test R²:  {test_r2:.6f}')
print(f'\nNormalisation stockée:')
print(f'  - Scaler entrée (X): mean={scaler_X.mean_[0]:.6f}, scale={scaler_X.scale_[0]:.6f}')
print(f'  - Scaler sortie (y): mean={scaler_y.mean_[0]:.6f}, scale={scaler_y.scale_[0]:.6f}')
print(f'\n✓ Le modèle est prêt à être utilisé dans le contrôleur MPC!')
print('='*70)