# TP1 : modèles de diffusion en 2D

### Définition des données

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader, TensorDataset, random_split

# Générer des données 2D
def generate_2d_data(n_samples=1000, noise_level=0.1):
    angles = np.linspace(0, 2 * np.pi, n_samples)
    x = np.stack([np.cos(angles), np.sin(angles)], axis=1)  # Points du cercle
    x += np.random.normal(0, noise_level, x.shape)  # Ajouter du bruit
    return torch.tensor(x, dtype=torch.float32)

# Préparer le dataset
data = generate_2d_data(10000)
dataset = TensorDataset(data)

# Diviser les données en ensembles d'entraînement, de validation et de test
train_size = int(0.8 * len(dataset))
val_size = int(0.1 * len(dataset))
test_size = len(dataset) - train_size - val_size
train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size])

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

### Définition du modèle de diffusion

In [None]:
# Modèle de Diffusion
class DiffusionModel(nn.Module):
    def __init__(self, input_dim, hidden_dim):
        super().__init__()
        pass

    def forward(self, x, t):
        pass

# Fonction de perte
def diffusion_loss(noise_pred, noise):
    pass

### Entrainement du modèle de diffusion

In [None]:
# Initialiser le modèle et l'optimiseur
input_dim = 2  # Données en 2D
hidden_dim = 128
epochs = 2000
T = 100  # Nombre de pas de temps

diffusion_model = DiffusionModel(input_dim=input_dim, hidden_dim=hidden_dim)
optimizer = optim.Adam(diffusion_model.parameters(), lr=1e-4)

# Ajouter un learning rate scheduler
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=50, gamma=0.1)

# Fonction pour sauvegarder le modèle
def save_model(model, path):
    torch.save(model.state_dict(), path)

# Fonction pour charger le modèle
def load_model(model, path):
    model.load_state_dict(torch.load(path))

# Chemin pour sauvegarder le meilleur modèle
best_model_path = 'best_diffusion_model.pth'
best_val_loss = float('inf')

# Entraînement du modèle
for epoch in range(epochs):
    diffusion_model.train()
    train_loss = 0
    for batch in train_loader:
        # FIXME 
        continue

    train_loss /= len(train_loader)

    # Validation
    diffusion_model.eval()
    val_loss = 0
    with torch.no_grad():
        for batch in val_loader:
            ## FIXME
            continue

    val_loss /= len(val_loader)

    # Mettre à jour le learning rate scheduler
    scheduler.step()

    # Sauvegarder le meilleur modèle
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        save_model(diffusion_model, best_model_path)

    if epoch % 10 == 0:
        print(f"Epoch {epoch}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")

### Evaluation du modèle, génération de données

In [None]:
# Fonction pour débruiter les données en plusieurs étapes
def denoise_data_multi_step(model, x_noisy, T, steps):
    with torch.no_grad():
        x_denoised = x_noisy
        for step in range(steps):
            # FIXME
            continue
        return x_denoised

# Générer des données bruitées et débruitées par le modèle en plusieurs étapes
with torch.no_grad():
    t = torch.tensor([1.0] * data.size(0), dtype=torch.float32)  # Temps initial pour visualisation
    noise = torch.randn_like(data)
    x_noisy = data + noise * torch.sqrt(t).unsqueeze(1)
    steps = 100  # Nombre d'étapes de débruitage
    denoised_data = denoise_data_multi_step(diffusion_model, x_noisy, T, steps)

# Visualisation des résultats
plt.figure(figsize=(8, 6))
plt.scatter(data[:, 0], data[:, 1], color='red', alpha=0.6, label="Données originales", s=10)
plt.scatter(x_noisy[:, 0], x_noisy[:, 1], color='blue', alpha=0.6, label="Données bruitées", s=10)
plt.scatter(denoised_data[:, 0], denoised_data[:, 1], color='green', alpha=0.6, label="Données débruitées", s=10)

plt.title("Données 2D : Originales, Bruitées et Débruitées par le Modèle")
plt.xlabel("x1")
plt.ylabel("x2")
plt.legend()
plt.axis("equal")
plt.show()
