# In [None]:
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
from sklearn.datasets import make_swiss_roll
from torch.optim import Adam

# Generate Swiss roll dataset
def generate_swiss_roll(n_samples=1000, noise=0.1, random_state=None):
    """Sample a 2-D Swiss roll point cloud.

    Args:
        n_samples: Number of points to draw.
        noise: Standard deviation of the Gaussian noise that
            ``make_swiss_roll`` adds to the points.
        random_state: Seed (or ``RandomState``) forwarded to
            ``make_swiss_roll``. ``None`` keeps the draw unseeded.

    Returns:
        Array of shape ``(n_samples, 2)`` holding columns 0 and 2 of the
        3-D roll, i.e. the plane in which the spiral is visible.
    """
    points, _ = make_swiss_roll(n_samples, noise=noise, random_state=random_state)
    # Column 1 is the roll's "depth" axis; dropping it leaves the 2-D spiral.
    return points[:, [0, 2]]

# Forward diffusion process: Add Gaussian noise
def forward_diffusion(x, t, beta, noise=None):
    """Draw ``x_t`` from the closed-form forward process ``q(x_t | x_0)``.

    Computes ``sqrt(alpha_bar_t) * x + sqrt(1 - alpha_bar_t) * noise`` where
    ``alpha_bar_t`` is the product of ``1 - beta`` over the first ``t`` steps.

    Args:
        x: Clean samples, array-like of shape ``(batch, ...)``.
        t: Number of noising steps, ``0 <= t <= len(beta)``. Either a single
            integer applied to every sample, or an integer array of shape
            ``(batch,)`` with one step count per sample. ``t = 0`` returns
            ``x`` unchanged.
        beta: 1-D noise schedule (one variance per step).
        noise: Optional noise array with the shape of ``x``. Drawn from
            ``np.random.normal`` when omitted.

    Returns:
        Array with the shape of ``x`` containing the noised samples.

    Raises:
        ValueError: If any entry of ``t`` lies outside ``[0, len(beta)]``.
    """
    x = np.asarray(x, dtype=float)
    t = np.asarray(t)
    beta = np.asarray(beta, dtype=float)
    if np.any(t < 0) or np.any(t > len(beta)):
        raise ValueError(f"t must lie in [0, {len(beta)}]")

    # Prepend 1.0 so that index k holds the signal retained after k steps
    # (index 0 == untouched input); this makes `t` a direct lookup index.
    alpha_bar = np.concatenate(([1.0], np.cumprod(1.0 - beta)))[t]
    if alpha_bar.ndim:
        # One value per sample: add trailing singleton axes so it broadcasts
        # across the feature dimensions of x.
        alpha_bar = alpha_bar.reshape(
            alpha_bar.shape + (1,) * (x.ndim - alpha_bar.ndim)
        )

    if noise is None:
        noise = np.random.normal(size=x.shape)
    return np.sqrt(alpha_bar) * x + np.sqrt(1.0 - alpha_bar) * noise

# Neural network to predict noise
class DiffusionModel(nn.Module):
    """MLP that predicts the noise present in a diffused sample.

    The network receives the sample concatenated with its timestep, so its
    first layer takes ``input_dim + 1`` features and its last layer emits
    ``input_dim`` values (one noise estimate per data coordinate).

    Args:
        input_dim: Number of features per data sample.
        hidden_dim: Width of the two hidden layers.
    """

    def __init__(self, input_dim, hidden_dim=128):
        super().__init__()
        self.net = nn.Sequential(
            # +1 accounts for the timestep column appended in forward().
            nn.Linear(input_dim + 1, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, input_dim),
        )

    def forward(self, x, t):
        """Return the predicted noise for samples ``x`` at timesteps ``t``.

        Args:
            x: Tensor of shape ``(batch, input_dim)``.
            t: Tensor of shape ``(batch, 1)`` or ``(batch,)`` with the
                timestep of each sample.

        Returns:
            Tensor of shape ``(batch, input_dim)``.
        """
        if t.dim() == x.dim() - 1:
            # Accept a flat (batch,) timestep vector as well.
            t = t.unsqueeze(-1)
        return self.net(torch.cat([x, t], dim=-1))

# Training loop
def train_diffusion_model(model, optimizer, x, beta, n_steps, epochs, batch_size=64):
    """Train ``model`` to predict the noise added by the forward process.

    Each iteration draws a random mini-batch (with replacement), a random
    timestep ``t`` in ``[1, n_steps]`` per sample and standard-normal noise,
    builds ``x_t = sqrt(alpha_bar_t) * x0 + sqrt(1 - alpha_bar_t) * noise``
    and minimises the MSE between ``model(x_t, t / n_steps)`` and that noise.

    Args:
        model: Callable ``model(x_t, t)`` returning a tensor shaped like
            ``x_t``; ``t`` is passed as a ``(batch, 1)`` float tensor holding
            ``t / n_steps``.
        optimizer: Torch optimizer over the model's parameters.
        x: Training data, array-like of shape ``(n_samples, ...)``.
        beta: 1-D noise schedule with at least ``n_steps`` entries.
        n_steps: Number of diffusion steps to sample timesteps from.
        epochs: Number of passes; each pass runs
            ``max(1, n_samples // batch_size)`` iterations.
        batch_size: Samples per iteration.

    Returns:
        List with the summed batch loss of every epoch (the same value that
        is printed).

    Raises:
        ValueError: If ``x`` is empty, ``batch_size < 1`` or ``n_steps`` is
            not in ``[1, len(beta)]``.
    """
    x = np.asarray(x, dtype=np.float32)
    n_samples = len(x)
    if n_samples == 0:
        raise ValueError("x must contain at least one sample")
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    beta = np.asarray(beta, dtype=np.float64)
    if not 1 <= n_steps <= len(beta):
        raise ValueError("n_steps must lie in [1, len(beta)]")

    # Precompute the per-step scaling factors once; entry k belongs to t = k + 1.
    alpha_bar = np.cumprod(1.0 - beta)
    signal_scale = torch.tensor(np.sqrt(alpha_bar), dtype=torch.float32)
    noise_scale = torch.tensor(np.sqrt(1.0 - alpha_bar), dtype=torch.float32)

    mse_loss = nn.MSELoss()
    # Run at least one batch even when the data set is smaller than a batch.
    batches_per_epoch = max(1, n_samples // batch_size)
    epoch_losses = []
    for epoch in range(epochs):
        total_loss = 0.0
        for _ in range(batches_per_epoch):  # Mini-batch gradient descent
            idx = np.random.choice(n_samples, batch_size)
            t = np.random.randint(1, n_steps + 1, size=batch_size)

            x0 = torch.from_numpy(x[idx])
            # The noise is sampled here so the regression target is exactly
            # the noise that produced x_t, instead of being reconstructed.
            noise = torch.tensor(
                np.random.normal(size=tuple(x0.shape)), dtype=torch.float32
            )
            step = torch.as_tensor(t - 1, dtype=torch.long)
            per_sample = (-1,) + (1,) * (x0.dim() - 1)
            x_t = (
                signal_scale[step].view(per_sample) * x0
                + noise_scale[step].view(per_sample) * noise
            )
            t_in = torch.tensor(t[:, None] / n_steps, dtype=torch.float32)

            optimizer.zero_grad()
            predicted_noise = model(x_t, t_in)
            loss = mse_loss(predicted_noise, noise)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        print(f"Epoch {epoch + 1}/{epochs}, Loss: {total_loss:.4f}")
        epoch_losses.append(total_loss)
    return epoch_losses

# Experiment configuration: data set size, number of diffusion steps and a
# linearly spaced noise schedule with one entry per step.
n_samples, n_steps = 1000, 100
beta = np.linspace(start=1e-4, stop=0.02, num=n_steps)

# Build the training set, the noise-prediction network and its optimizer.
data = generate_swiss_roll(n_samples=n_samples)
model = DiffusionModel(2)
optimizer = Adam(params=model.parameters(), lr=0.001)

# Fit the network for 50 epochs.
train_diffusion_model(
    model=model,
    optimizer=optimizer,
    x=data,
    beta=beta,
    n_steps=n_steps,
    epochs=50,
)

# Visualize results
def visualize_results(data, beta, model):
    """Plot the original, fully noised and denoised data side by side.

    The data is diffused for ``len(beta)`` steps with ``forward_diffusion``
    and then denoised with the DDPM ancestral sampling step

        x_{t-1} = (x_t - beta_t / sqrt(1 - alpha_bar_t) * eps) / sqrt(alpha_t)
                  + sqrt(beta_t) * z,

    where ``eps`` is the model's noise prediction and ``z`` is fresh
    standard-normal noise (omitted on the final step).

    Args:
        data: Array of shape ``(n_samples, 2)`` with the clean samples.
        beta: 1-D noise schedule; its length is used as the number of steps.
        model: Callable ``model(x_t, t)`` taking a float tensor ``x_t`` and a
            ``(n_samples, 1)`` tensor holding ``t / len(beta)``.
            NOTE(review): assumes the model was trained with
            ``n_steps == len(beta)`` -- confirm against the training call.
    """
    beta = np.asarray(beta, dtype=float)
    n_steps = len(beta)
    alpha = 1.0 - beta
    alpha_bar = np.cumprod(alpha)

    plt.figure(figsize=(12, 4))

    # Original data
    plt.subplot(1, 3, 1)
    plt.scatter(data[:, 0], data[:, 1], s=10, alpha=0.7)
    plt.title("Original Data")

    # Noisy data
    noisy_data = forward_diffusion(data, n_steps, beta)
    plt.subplot(1, 3, 2)
    plt.scatter(noisy_data[:, 0], noisy_data[:, 1], s=10, alpha=0.7)
    plt.title("Noisy Data")

    # Reconstructed data
    with torch.no_grad():
        x_t = torch.tensor(noisy_data, dtype=torch.float32)
        # Walk t = n_steps .. 1 so the timesteps match those seen in training;
        # schedule entry k = t - 1 belongs to step t.
        for t in range(n_steps, 0, -1):
            k = t - 1
            t_tensor = torch.full((len(data), 1), t / n_steps, dtype=torch.float32)
            predicted_noise = model(x_t, t_tensor)
            noise_coef = float(beta[k] / np.sqrt(1.0 - alpha_bar[k]))
            x_t = (x_t - noise_coef * predicted_noise) / float(np.sqrt(alpha[k]))
            if t > 1:
                # No noise is injected on the last step so the output is the
                # posterior mean itself.
                x_t = x_t + float(np.sqrt(beta[k])) * torch.randn_like(x_t)
        reconstructed = x_t.numpy()

    plt.subplot(1, 3, 3)
    plt.scatter(reconstructed[:, 0], reconstructed[:, 1], s=10, alpha=0.7)
    plt.title("Reconstructed Data")

    plt.tight_layout()
    plt.show()

# Render the three-panel comparison for the data, schedule and model above.
visualize_results(data=data, beta=beta, model=model)