### Ньяти Каелиле БВТ2201 - Курсовая Работа

### 1. AutoEncoder (AE) for Noise Reduction

In [1]:

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import matplotlib.pyplot as plt
import numpy as np
import torch.nn.functional as F
import random
from torchvision.utils import make_grid
import math


In [9]:
class Autoencoder(nn.Module):
    """Fully connected autoencoder with a symmetric encoder/decoder.

    The encoder maps ``input_dim`` features through ``hidden_dims`` down to
    ``latent_dim``; the decoder mirrors the hidden widths in reverse order and
    ends with a sigmoid so reconstructions lie in [0, 1].

    Args:
        input_dim: Number of features of a flattened input sample.
        hidden_dims: Widths of the encoder's hidden layers, in order. Any
            sequence (list or tuple) is accepted.
        latent_dim: Size of the bottleneck representation.
    """

    # A tuple default avoids the shared-mutable-default pitfall of a list.
    def __init__(self, input_dim=784, hidden_dims=(512, 256, 128), latent_dim=64):
        super(Autoencoder, self).__init__()
        # Private copy: never alias (or mutate) the caller's sequence.
        hidden_dims = list(hidden_dims)

        # Encoder: hidden stack followed by a linear projection to the latent.
        encoder_layers, encoder_out = self._hidden_stack(input_dim, hidden_dims)
        encoder_layers.append(nn.Linear(encoder_out, latent_dim))
        self.encoder = nn.Sequential(*encoder_layers)

        # Decoder: mirrored hidden stack, then projection back to input size.
        decoder_layers, decoder_out = self._hidden_stack(latent_dim, hidden_dims[::-1])
        decoder_layers.append(nn.Linear(decoder_out, input_dim))
        decoder_layers.append(nn.Sigmoid())
        self.decoder = nn.Sequential(*decoder_layers)

    @staticmethod
    def _hidden_stack(in_dim, widths):
        """Build Linear -> ReLU -> Dropout(0.2) triples; return (layers, last width)."""
        layers = []
        for width in widths:
            layers.extend([
                nn.Linear(in_dim, width),
                nn.ReLU(),
                nn.Dropout(0.2)
            ])
            in_dim = width
        return layers, in_dim

    def forward(self, x):
        """Encode ``x`` to the latent space and return its reconstruction."""
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded

In [10]:
class AETrainer:
    """Trains and evaluates a denoising autoencoder.

    Inputs are corrupted with clamped Gaussian noise and the model is asked to
    reproduce the clean sample; the objective is mean squared error, optimised
    with Adam.
    """

    def __init__(self, model, lr=0.001):
        self.model = model
        self.optimizer = optim.Adam(model.parameters(), lr=lr)
        self.criterion = nn.MSELoss()

    def add_noise(self, images, noise_factor=0.5):
        """Return ``images`` plus scaled standard-normal noise, clamped to [0, 1]."""
        perturbation = torch.randn_like(images)
        return (images + noise_factor * perturbation).clamp(0., 1.)

    def _denoising_loss(self, batch, device):
        """Flatten ``batch``, corrupt it, and return the MSE against the clean data."""
        clean = batch.view(batch.size(0), -1).to(device)
        corrupted = self.add_noise(clean)
        return self.criterion(self.model(corrupted), clean)

    def train_epoch(self, dataloader, device):
        """Run one optimisation pass over ``dataloader``; return the mean batch loss."""
        self.model.train()
        running = 0

        for batch, _ in dataloader:
            self.optimizer.zero_grad()
            loss = self._denoising_loss(batch, device)
            loss.backward()
            self.optimizer.step()
            running += loss.item()

        return running / len(dataloader)

    def evaluate(self, dataloader, device):
        """Return the mean batch loss over ``dataloader`` without updating weights."""
        self.model.eval()
        running = 0

        with torch.no_grad():
            for batch, _ in dataloader:
                running += self._denoising_loss(batch, device).item()

        return running / len(dataloader)

In [11]:
def train_ae():
    """Train the denoising autoencoder on MNIST for 20 epochs.

    Prints the train/test loss after every epoch and saves a denoising figure
    every fifth epoch.

    Returns:
        Tuple ``(model, trainer, train_losses, test_losses)`` where the loss
        lists hold one value per epoch.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Images are only converted to tensors; no further normalisation.
    to_tensor = transforms.Compose([transforms.ToTensor()])
    train_dataset = datasets.MNIST('./data', train=True, download=True, transform=to_tensor)
    test_dataset = datasets.MNIST('./data', train=False, transform=to_tensor)
    train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=128, shuffle=False)

    model = Autoencoder().to(device)
    trainer = AETrainer(model)

    epochs = 20
    train_losses, test_losses = [], []

    for epoch_number in range(1, epochs + 1):
        train_loss = trainer.train_epoch(train_loader, device)
        test_loss = trainer.evaluate(test_loader, device)
        train_losses.append(train_loss)
        test_losses.append(test_loss)

        print(f'Epoch {epoch_number}/{epochs}, Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}')

        # Periodic qualitative check of the denoising quality.
        if epoch_number % 5 == 0:
            visualize_denoising(model, test_loader, device, epoch_number)

    return model, trainer, train_losses, test_losses

In [12]:
def visualize_denoising(model, test_loader, device, epoch):
    """Save a figure comparing original, noisy and denoised test digits.

    Takes the first 8 images of the first batch of ``test_loader``, corrupts
    them, runs them through ``model`` and writes a 3x8 grid to
    ``ae_denoising_epoch_{epoch}.png``.

    Args:
        model: Autoencoder taking flattened images and returning flattened
            reconstructions.
        test_loader: Iterable yielding ``(images, labels)`` batches.
        device: Device on which to run the model.
        epoch: Epoch number used in the figure title and file name.
    """
    model.eval()
    with torch.no_grad():
        data, _ = next(iter(test_loader))
        data = data[:8].to(device)
        # Clamp to [0, 1] exactly like AETrainer.add_noise so the model sees
        # the same input distribution it is trained and evaluated on.
        noisy_data = torch.clamp(data + 0.5 * torch.randn_like(data), 0., 1.)

        noisy_flat = noisy_data.view(noisy_data.size(0), -1)
        reconstructed = model(noisy_flat)
        reconstructed = reconstructed.view(-1, 1, 28, 28)

        fig, axes = plt.subplots(3, 8, figsize=(12, 5))
        for i in range(8):
            axes[0, i].imshow(data[i].cpu().squeeze(), cmap='gray')
            axes[0, i].set_title('Original')
            axes[0, i].axis('off')

            axes[1, i].imshow(noisy_data[i].cpu().squeeze(), cmap='gray')
            axes[1, i].set_title('Noisy')
            axes[1, i].axis('off')

            axes[2, i].imshow(reconstructed[i].cpu().squeeze(), cmap='gray')
            axes[2, i].set_title('Denoised')
            axes[2, i].axis('off')

        plt.suptitle(f'AE Denoising - Epoch {epoch}')
        plt.tight_layout()
        plt.savefig(f'ae_denoising_epoch_{epoch}.png')
        # Close the figure so repeated calls do not accumulate open figures.
        plt.close()

In [13]:
# Entry point: train the autoencoder and keep the results as module-level names.
if __name__ == "__main__":
    model, trainer, train_losses, test_losses = train_ae()
    print("Autoencoder training completed!")

Epoch 1/20, Train Loss: 0.0629, Test Loss: 0.0475
Epoch 2/20, Train Loss: 0.0435, Test Loss: 0.0353
Epoch 3/20, Train Loss: 0.0374, Test Loss: 0.0323
Epoch 4/20, Train Loss: 0.0353, Test Loss: 0.0303
Epoch 5/20, Train Loss: 0.0339, Test Loss: 0.0289
Epoch 6/20, Train Loss: 0.0329, Test Loss: 0.0279
Epoch 7/20, Train Loss: 0.0322, Test Loss: 0.0272
Epoch 8/20, Train Loss: 0.0316, Test Loss: 0.0267
Epoch 9/20, Train Loss: 0.0311, Test Loss: 0.0259
Epoch 10/20, Train Loss: 0.0306, Test Loss: 0.0253
Epoch 11/20, Train Loss: 0.0303, Test Loss: 0.0250
Epoch 12/20, Train Loss: 0.0300, Test Loss: 0.0248
Epoch 13/20, Train Loss: 0.0297, Test Loss: 0.0243
Epoch 14/20, Train Loss: 0.0294, Test Loss: 0.0241
Epoch 15/20, Train Loss: 0.0292, Test Loss: 0.0238
Epoch 16/20, Train Loss: 0.0290, Test Loss: 0.0237
Epoch 17/20, Train Loss: 0.0288, Test Loss: 0.0236
Epoch 18/20, Train Loss: 0.0286, Test Loss: 0.0233
Epoch 19/20, Train Loss: 0.0284, Test Loss: 0.0228
Epoch 20/20, Train Loss: 0.0284, Test Lo

### 2. Variational AutoEncoder (VAE)

In [14]:
# Set random seeds for reproducibility (PyTorch, NumPy and Python's `random`)
torch.manual_seed(42)
np.random.seed(42)
random.seed(42)

# Device configuration: use CUDA when available, otherwise fall back to CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# Hyperparameters (module-level; read by the VAE cells that follow)
batch_size = 128
learning_rate = 1e-3
epochs = 50
latent_dim = 20



Using device: cpu


In [None]:
# Data loading and preprocessing: images are only converted to tensors
transform = transforms.Compose([
    transforms.ToTensor(),
])

# Only the training split requests a download; the test split reuses ./data
train_dataset = datasets.MNIST('./data', train=True, download=True, transform=transform)
test_dataset = datasets.MNIST('./data', train=False, transform=transform)

# Shuffle only the training data; keep the test order fixed
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
# VAE Model Definition
class VAE(nn.Module):
    """Fully connected variational autoencoder.

    Args:
        input_dim: Number of features of a flattened input sample.
        hidden_dim: Width of the hidden layers in encoder and decoder.
        latent_dim: Dimensionality of the latent Gaussian.
    """

    def __init__(self, input_dim=784, hidden_dim=400, latent_dim=20):
        super(VAE, self).__init__()
        # Remembered so forward() flattens to the configured size instead of
        # a hard-coded 784.
        self.input_dim = input_dim

        # Encoder
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
        )

        # Latent space: separate heads for the mean and the log-variance
        self.fc_mu = nn.Linear(hidden_dim, latent_dim)
        self.fc_logvar = nn.Linear(hidden_dim, latent_dim)

        # Decoder
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, input_dim),
            nn.Sigmoid()  # Output between 0 and 1
        )

    def encode(self, x):
        """Return ``(mu, logvar)`` for flattened input ``x``."""
        h = self.encoder(x)
        return self.fc_mu(h), self.fc_logvar(h)

    def reparameterize(self, mu, logvar):
        """Sample ``z = mu + eps * std`` with ``eps`` drawn from N(0, I)."""
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def decode(self, z):
        """Map latent codes ``z`` back to flattened reconstructions."""
        return self.decoder(z)

    def forward(self, x):
        """Flatten ``x``, encode, sample, decode.

        Returns:
            Tuple ``(reconstruction, mu, logvar)``; the reconstruction has
            shape ``(batch, input_dim)``.
        """
        mu, logvar = self.encode(x.view(-1, self.input_dim))
        z = self.reparameterize(mu, logvar)
        return self.decode(z), mu, logvar

In [None]:
# Loss function
def vae_loss(recon_x, x, mu, logvar):
    """Compute the VAE objective summed over the batch.

    Args:
        recon_x: Reconstructions with values in [0, 1].
        x: Target images; reshaped to ``recon_x``'s shape, so any layout with
            the same number of elements is accepted.
        mu: Latent means.
        logvar: Latent log-variances.

    Returns:
        Tuple ``(total, BCE, KLD)`` of scalar tensors, where
        ``total = BCE + KLD``.
    """
    # Match the target to the reconstruction instead of assuming 784 features.
    target = x.reshape(recon_x.shape)

    # Reconstruction loss (binary cross entropy)
    BCE = F.binary_cross_entropy(recon_x, target, reduction='sum')

    # KL divergence between N(mu, exp(logvar)) and the standard normal prior
    KLD = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())

    return BCE + KLD, BCE, KLD

# Initialize model, optimizer (uses the module-level latent_dim, device and learning_rate)
model = VAE(latent_dim=latent_dim).to(device)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Function to add noise to images
def add_noise(images, noise_factor=0.5):
    """Corrupt ``images`` with additive Gaussian noise.

    Standard-normal noise scaled by ``noise_factor`` is added and the result
    is clamped to the range [0, 1].
    """
    gaussian = torch.randn_like(images)
    return (images + noise_factor * gaussian).clamp(0., 1.)

In [None]:
# Training function
def train(model, dataloader, optimizer, epoch):
    """Run one training epoch of the denoising VAE.

    Each batch is corrupted with noise (factor 0.5) before being fed to the
    model, while the loss is computed against the clean batch. Progress is
    printed every 100 batches.

    Returns:
        Tuple ``(avg_loss, avg_bce, avg_kld)``, each averaged per sample of
        ``dataloader.dataset``.
    """
    model.train()
    num_samples = len(dataloader.dataset)
    loss_sum = 0
    bce_sum = 0
    kld_sum = 0

    for step, (clean, _) in enumerate(dataloader):
        # Noise is added before the tensors are moved to the device.
        corrupted = add_noise(clean, noise_factor=0.5).to(device)
        clean = clean.to(device)

        optimizer.zero_grad()
        recon, mu, logvar = model(corrupted)
        loss, bce, kld = vae_loss(recon, clean, mu, logvar)
        loss.backward()
        loss_sum += loss.item()
        bce_sum += bce.item()
        kld_sum += kld.item()
        optimizer.step()

        if step % 100 == 0:
            seen = step * len(clean)
            percent = 100. * step / len(dataloader)
            print(f'Train Epoch: {epoch} [{seen}/{num_samples} '
                  f'({percent:.0f}%)]\tLoss: {loss.item() / len(clean):.6f}')

    return loss_sum / num_samples, bce_sum / num_samples, kld_sum / num_samples

In [None]:
# Test function
def test(model, dataloader):
    """Evaluate the denoising VAE on ``dataloader`` without gradient tracking.

    Batches are corrupted with noise (factor 0.5) and the loss is measured
    against the clean images. A summary line is printed.

    Returns:
        Tuple ``(avg_loss, avg_bce, avg_kld)``, each averaged per sample of
        ``dataloader.dataset``.
    """
    model.eval()
    num_samples = len(dataloader.dataset)
    loss_sum = 0
    bce_sum = 0
    kld_sum = 0

    with torch.no_grad():
        for clean, _ in dataloader:
            # Noise is added before the tensors are moved to the device.
            corrupted = add_noise(clean, noise_factor=0.5).to(device)
            clean = clean.to(device)

            recon, mu, logvar = model(corrupted)
            loss, bce, kld = vae_loss(recon, clean, mu, logvar)
            loss_sum += loss.item()
            bce_sum += bce.item()
            kld_sum += kld.item()

    avg_loss = loss_sum / num_samples
    avg_bce = bce_sum / num_samples
    avg_kld = kld_sum / num_samples

    print(f'====> Test set loss: {avg_loss:.4f}, BCE: {avg_bce:.4f}, KLD: {avg_kld:.4f}')
    return avg_loss, avg_bce, avg_kld

# Training loop: one train pass and one test pass per epoch; only the total
# per-sample loss of each is kept for the history plot further down.
train_losses = []
test_losses = []

print("Starting training...")
for epoch in range(1, epochs + 1):
    train_loss, train_bce, train_kld = train(model, train_loader, optimizer, epoch)
    test_loss, test_bce, test_kld = test(model, test_loader)
    
    train_losses.append(train_loss)
    test_losses.append(test_loss)
    
    print(f'Epoch {epoch}: Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}')

# Visualization function
def visualize_results(model, dataloader, num_images=8):
    """Plot original digits and noisy versions at several noise levels.

    The first row shows clean images from the first batch of ``dataloader``.
    Each further row shows the same images corrupted at one noise level,
    annotated with the MSE between the model's reconstruction of that noisy
    image and the clean original.

    Args:
        model: VAE whose forward pass returns ``(recon, mu, logvar)``.
        dataloader: Iterable yielding ``(images, labels)`` batches.
        num_images: Number of columns; capped at the batch size.
    """
    model.eval()
    with torch.no_grad():
        # Get a batch of test data
        data, _ = next(iter(dataloader))
        # Never index past the end of a short batch.
        num_images = min(num_images, data.size(0))

        # Add different levels of noise
        noise_levels = [0.3, 0.5, 0.7]

        # squeeze=False keeps `axes` 2-D even for a single column, so the
        # axes[row, col] indexing below always works.
        fig, axes = plt.subplots(len(noise_levels) + 1, num_images,
                                 figsize=(15, 10), squeeze=False)

        # Show original images
        for i in range(num_images):
            axes[0, i].imshow(data[i].squeeze(), cmap='gray')
            axes[0, i].set_title('Original')
            axes[0, i].axis('off')

        # Show noisy images, labelled with the reconstruction error, for
        # each noise level
        for j, noise_factor in enumerate(noise_levels):
            noisy_data = add_noise(data, noise_factor=noise_factor)
            noisy_data = noisy_data.to(device)
            recon_data, _, _ = model(noisy_data)
            recon_data = recon_data.cpu()

            for i in range(num_images):
                # Noisy image
                axes[j+1, i].imshow(noisy_data[i].cpu().squeeze(), cmap='gray')
                axes[j+1, i].set_title(f'Noisy (σ={noise_factor})')
                axes[j+1, i].axis('off')

                # MSE of the reconstruction against the clean image
                mse = F.mse_loss(recon_data[i].view_as(data[i]), data[i]).item()
                axes[j+1, i].text(0.5, -0.15, f'MSE: {mse:.4f}',
                                  transform=axes[j+1, i].transAxes,
                                  ha='center', fontsize=8)

        plt.tight_layout()
        plt.show()

# Calculate quantitative metrics
def calculate_metrics(model, dataloader, noise_factor=0.5):
    """Report average per-image MSE and PSNR of the denoised reconstructions.

    Every batch is corrupted with ``add_noise`` at ``noise_factor``, passed
    through ``model``, and compared against the clean images.

    Args:
        model: VAE whose forward pass returns ``(recon, mu, logvar)`` with a
            2-D ``recon`` of shape ``(batch, features)``.
        dataloader: Iterable yielding ``(images, labels)`` batches.
        noise_factor: Scale of the Gaussian noise added to the inputs.

    Returns:
        Tuple ``(avg_mse, avg_psnr)``; PSNR is in dB for a peak value of 1.0.
    """
    model.eval()
    total_mse = 0
    total_psnr = 0
    total_samples = 0

    with torch.no_grad():
        for data, _ in dataloader:
            noisy_data = add_noise(data, noise_factor=noise_factor)
            noisy_data = noisy_data.to(device)
            data = data.to(device)

            recon_data, _, _ = model(noisy_data)

            # Per-image MSE, computed once and reused for both metrics. The
            # target follows the reconstruction's shape rather than a
            # hard-coded 784 features.
            target = data.reshape(recon_data.shape)
            mse_batch = F.mse_loss(recon_data, target, reduction='none').mean(dim=1)
            total_mse += mse_batch.sum().item()

            # PSNR; the floor keeps a perfect reconstruction (MSE == 0) from
            # producing inf and poisoning the average.
            psnr_batch = 20 * torch.log10(1.0 / torch.sqrt(mse_batch.clamp(min=1e-10)))
            total_psnr += psnr_batch.sum().item()

            total_samples += data.size(0)

    avg_mse = total_mse / total_samples
    avg_psnr = total_psnr / total_samples

    print(f"Quantitative Results (noise σ={noise_factor}):")
    print(f"Average MSE: {avg_mse:.6f}")
    print(f"Average PSNR: {avg_psnr:.2f} dB")

    return avg_mse, avg_psnr

# Display results: qualitative grid first, then the quantitative metrics
print("\nVisualizing results...")
visualize_results(model, test_loader)

print("\nCalculating quantitative metrics...")
calculate_metrics(model, test_loader, noise_factor=0.5)

# Plot training history (per-sample total loss recorded once per epoch)
plt.figure(figsize=(10, 6))
plt.plot(range(1, epochs + 1), train_losses, label='Training Loss')
plt.plot(range(1, epochs + 1), test_losses, label='Test Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Training and Test Loss')
plt.legend()
plt.grid(True)
plt.show()

# Function to show individual examples with different noise levels
def show_detailed_examples(model, dataloader):
    """Plot a 4x5 grid of denoising examples with error maps.

    Row ``i`` uses image ``i`` of the first batch and the ``i``-th noise
    factor, and shows: original, noisy input, VAE reconstruction, absolute
    error of the noisy input, and absolute error of the reconstruction (the
    last two titled with their MSE against the original).
    """
    model.eval()
    with torch.no_grad():
        batch, _ = next(iter(dataloader))

        noise_factors = [0.2, 0.4, 0.6, 0.8]

        fig, axes = plt.subplots(4, 5, figsize=(15, 12))

        for row, sigma in enumerate(noise_factors):
            clean = batch[row]

            # The whole batch is corrupted and reconstructed; only image
            # `row` is displayed.
            noisy_batch = add_noise(batch, noise_factor=sigma)
            noisy = noisy_batch[row]
            recon_batch, _, _ = model(noisy_batch.to(device))
            denoised = recon_batch[row].cpu().view(28, 28)

            # Metrics against the clean image
            mse_noisy = F.mse_loss(noisy, clean).item()
            mse_denoised = F.mse_loss(denoised, clean.squeeze()).item()

            # Absolute error maps
            noisy_error = torch.abs(noisy - clean)
            denoised_error = torch.abs(denoised - clean.squeeze())

            panels = [
                (clean.squeeze(), 'gray', 'Original'),
                (noisy.squeeze(), 'gray', f'Noisy Input\n(σ={sigma})'),
                (denoised, 'gray', 'VAE Denoised'),
                (noisy_error.squeeze(), 'hot', f'Noisy Error\nMSE: {mse_noisy:.4f}'),
                (denoised_error, 'hot', f'Denoised Error\nMSE: {mse_denoised:.4f}'),
            ]
            for ax, (image, colormap, title) in zip(axes[row], panels):
                ax.imshow(image, cmap=colormap)
                ax.set_title(title)
                ax.axis('off')

        plt.tight_layout()
        plt.show()

# Render the per-noise-level example grid for the trained VAE on test data.
print("\nShowing detailed examples with error analysis...")
show_detailed_examples(model, test_loader)

### 3. Generative Adversarial Network (GAN)

In [None]:
# Set random seeds for reproducibility (PyTorch, NumPy and Python's `random`)
torch.manual_seed(42)
np.random.seed(42)
random.seed(42)

# Device configuration: use CUDA when available, otherwise fall back to CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# Hyperparameters (these rebind the module-level names set in the VAE section)
batch_size = 128
learning_rate = 0.0002
epochs = 50
latent_dim = 100


In [None]:
# Data loading and preprocessing
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))  # Normalize to [-1, 1]
])

# Only the training split is loaded in this section
train_dataset = datasets.MNIST('./data', train=True, download=True, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Fixed noise for visualization: the same 64 latent vectors are reused by
# generate_images so samples from different epochs can be compared
fixed_noise = torch.randn(64, latent_dim, device=device)

# Generator Network
class Generator(nn.Module):
    """MLP generator mapping latent vectors to 1x28x28 images.

    Three LeakyReLU(0.2) hidden layers of width 256, 512 and 1024 are followed
    by a 784-unit output layer with tanh, so pixel values lie in [-1, 1].
    """

    def __init__(self, latent_dim=100):
        super(Generator, self).__init__()

        widths = (latent_dim, 256, 512, 1024)
        stack = []
        for fan_in, fan_out in zip(widths, widths[1:]):
            stack.append(nn.Linear(fan_in, fan_out))
            stack.append(nn.LeakyReLU(0.2))
        # Output layer: one unit per pixel, squashed by tanh.
        stack.append(nn.Linear(widths[-1], 784))
        stack.append(nn.Tanh())

        self.main = nn.Sequential(*stack)

    def forward(self, x):
        """Return generated images of shape ``(batch, 1, 28, 28)``."""
        flat_pixels = self.main(x)
        return flat_pixels.view(-1, 1, 28, 28)

In [None]:
# Discriminator Network
class Discriminator(nn.Module):
    """MLP discriminator scoring 28x28 images as real (near 1) or fake (near 0).

    Three hidden layers (1024, 512, 256) each use LeakyReLU(0.2) and
    Dropout(0.3); the final single unit is passed through a sigmoid.
    """

    def __init__(self):
        super(Discriminator, self).__init__()

        widths = (784, 1024, 512, 256)
        modules = []
        for fan_in, fan_out in zip(widths, widths[1:]):
            modules.extend((
                nn.Linear(fan_in, fan_out),
                nn.LeakyReLU(0.2),
                nn.Dropout(0.3),
            ))
        # Probability head.
        modules.extend((nn.Linear(widths[-1], 1), nn.Sigmoid()))

        self.main = nn.Sequential(*modules)

    def forward(self, x):
        """Flatten ``x`` to 784 features per sample and return the score."""
        return self.main(x.view(-1, 784))

# Initialize networks and move them to the selected device
generator = Generator(latent_dim).to(device)
discriminator = Discriminator().to(device)

In [None]:
# Initialize weights
def weights_init(m):
    """Initialise a module for GAN training; intended for ``Module.apply``.

    Modules whose class name contains ``'Linear'`` get weights drawn from
    N(0, 0.02) and, if they have a bias, a zero bias. Other modules are left
    untouched.
    """
    if 'Linear' not in m.__class__.__name__:
        return
    nn.init.normal_(m.weight.data, 0.0, 0.02)
    if m.bias is None:
        return
    nn.init.constant_(m.bias.data, 0)

# Re-initialise every submodule of both networks with weights_init
generator.apply(weights_init)
discriminator.apply(weights_init)

# Two optimizers - one for generator, one for discriminator
optimizer_G = optim.Adam(generator.parameters(), lr=learning_rate, betas=(0.5, 0.999))
optimizer_D = optim.Adam(discriminator.parameters(), lr=learning_rate, betas=(0.5, 0.999))

# Loss function: binary cross entropy on the discriminator's sigmoid output
criterion = nn.BCELoss()

# Lists to track progress (train_gan appends to these every 50 batches)
G_losses = []
D_losses = []
D_real_losses = []
D_fake_losses = []

# Training function
def train_gan(epoch):
    """Run one epoch of adversarial training over ``train_loader``.

    For every batch the discriminator is updated first (real batch labelled 1,
    detached generated batch labelled 0), then the generator is updated on a
    freshly sampled batch so that the discriminator scores it as real.

    Works on the module-level ``generator``, ``discriminator``, their
    optimizers, ``criterion`` and the loss-history lists; nothing is returned.

    Args:
        epoch: Current epoch number, used only in the progress printout.
    """
    generator.train()
    discriminator.train()
    
    for i, (real_imgs, _) in enumerate(train_loader):
        # Local name: the actual size of this batch (shadows the module-level
        # batch_size inside this function)
        batch_size = real_imgs.size(0)
        
        # Move real images to device
        real_imgs = real_imgs.to(device)
        
        # Create labels
        real_labels = torch.ones(batch_size, 1, device=device)
        fake_labels = torch.zeros(batch_size, 1, device=device)
        
        # ========================
        #  Train Discriminator
        # ========================
        
        optimizer_D.zero_grad()
        
        # Loss with real images
        real_output = discriminator(real_imgs)
        d_loss_real = criterion(real_output, real_labels)
        
        # Generate fake images
        noise = torch.randn(batch_size, latent_dim, device=device)
        fake_imgs = generator(noise)
        
        # Loss with fake images; detach() keeps this backward pass from
        # propagating gradients into the generator
        fake_output = discriminator(fake_imgs.detach())
        d_loss_fake = criterion(fake_output, fake_labels)
        
        # Total discriminator loss
        d_loss = d_loss_real + d_loss_fake
        d_loss.backward()
        optimizer_D.step()
        
        # ========================
        #  Train Generator
        # ========================
        
        optimizer_G.zero_grad()
        
        # Generate new fake images
        noise = torch.randn(batch_size, latent_dim, device=device)
        fake_imgs = generator(noise)
        
        # Generator wants discriminator to think fake images are real
        fake_output = discriminator(fake_imgs)
        g_loss = criterion(fake_output, real_labels)
        
        g_loss.backward()
        optimizer_G.step()
        
        # Save losses for plotting (one sample every 50 batches)
        if i % 50 == 0:
            G_losses.append(g_loss.item())
            D_losses.append(d_loss.item())
            D_real_losses.append(d_loss_real.item())
            D_fake_losses.append(d_loss_fake.item())
        
        # Progress line every 100 batches; D(G(z)) here is the score from the
        # generator step, i.e. after the discriminator update
        if i % 100 == 0:
            print(f'[{epoch}/{epochs}][{i}/{len(train_loader)}] '
                  f'D_loss: {d_loss.item():.4f} '
                  f'G_loss: {g_loss.item():.4f} '
                  f'D(x): {real_output.mean().item():.4f} '
                  f'D(G(z)): {fake_output.mean().item():.4f}')

In [None]:
# Function to generate and save images
def generate_images(epoch):
    """Display an 8-column grid of samples generated from ``fixed_noise``.

    Args:
        epoch: Epoch number shown in the figure title.

    Returns:
        The image grid tensor produced by ``make_grid`` (values in [0, 1]).
    """
    generator.eval()
    with torch.no_grad():
        fake_images = generator(fixed_noise).detach().cpu()

        # Denormalize from [-1, 1] to [0, 1]
        fake_images = (fake_images + 1) / 2

        # Create grid of images. The images are already in [0, 1], so no
        # further min-max rescaling is applied (normalize=False); this keeps
        # the contrast comparable across epochs and consistent with the other
        # display helpers in this notebook.
        grid = make_grid(fake_images, nrow=8, normalize=False)

        # Plot
        plt.figure(figsize=(10, 10))
        plt.imshow(grid.permute(1, 2, 0), cmap='gray')
        plt.title(f'Generated Images - Epoch {epoch}')
        plt.axis('off')
        plt.show()

        return grid

# Function to show training progress
def plot_training_progress():
    """Plot the recorded GAN loss histories in two side-by-side panels.

    Left: generator vs. total discriminator loss. Right: the discriminator's
    real and fake loss components.
    """
    fig, (loss_ax, parts_ax) = plt.subplots(1, 2, figsize=(15, 5))

    # (axis, title, [(series, plot keyword arguments), ...])
    panels = (
        (loss_ax, 'Generator and Discriminator Losses', (
            (G_losses, {'label': 'Generator Loss'}),
            (D_losses, {'label': 'Discriminator Loss'}),
        )),
        (parts_ax, 'Discriminator Real vs Fake Losses', (
            (D_real_losses, {'label': 'D Real Loss', 'alpha': 0.7}),
            (D_fake_losses, {'label': 'D Fake Loss', 'alpha': 0.7}),
        )),
    )

    for ax, title, curves in panels:
        for series, style in curves:
            ax.plot(series, **style)
        ax.set_xlabel('Iterations')
        ax.set_ylabel('Loss')
        ax.set_title(title)
        ax.legend()
        ax.grid(True)

    plt.tight_layout()
    plt.show()

In [None]:
# Function to show real vs fake comparison
def show_real_vs_fake():
    """Show 32 real training digits next to 32 freshly generated ones."""
    # Real images: first 32 of one training batch
    real_batch, _ = next(iter(train_loader))
    real_batch = real_batch[:32]

    # Fake images from new random latent vectors
    generator.eval()
    with torch.no_grad():
        latent = torch.randn(32, latent_dim, device=device)
        fake_batch = generator(latent).detach().cpu()

    fig, axes_pair = plt.subplots(1, 2, figsize=(12, 6))

    columns = (
        (real_batch, 'Real MNIST Images'),
        (fake_batch, 'Generated Images'),
    )
    for ax, (images, title) in zip(axes_pair, columns):
        # Denormalize from [-1, 1] to [0, 1] and tile into an 8-wide grid
        tiled = make_grid((images + 1) / 2, nrow=8, normalize=False)
        ax.imshow(tiled.permute(1, 2, 0), cmap='gray')
        ax.set_title(title)
        ax.axis('off')

    plt.tight_layout()
    plt.show()

# Function to show interpolation in latent space
def show_latent_interpolation():
    """Display images generated along a straight line between two latent points.

    Ten evenly spaced blend weights from 0 to 1 are used; weight 0 yields the
    second random point and weight 1 the first.
    """
    generator.eval()
    with torch.no_grad():
        # Two random endpoints in latent space
        z1 = torch.randn(1, latent_dim, device=device)
        z2 = torch.randn(1, latent_dim, device=device)

        num_steps = 10
        frames = [
            generator(weight * z1 + (1 - weight) * z2).detach().cpu()
            for weight in torch.linspace(0, 1, num_steps)
        ]

        # Stack the frames, denormalize to [0, 1], and lay them out in one row
        strip = (torch.cat(frames, 0) + 1) / 2
        grid = make_grid(strip, nrow=num_steps, normalize=False)

        plt.figure(figsize=(12, 2))
        plt.imshow(grid.permute(1, 2, 0), cmap='gray')
        plt.title('Latent Space Interpolation')
        plt.axis('off')
        plt.show()

# Training loop: print both architectures, then train for `epochs` epochs
print("Starting GAN training...")
print("Generator architecture:")
print(generator)
print("\nDiscriminator architecture:")
print(discriminator)

for epoch in range(1, epochs + 1):
    train_gan(epoch)
    
    # Generate sample images after the first epoch and every 5 epochs
    if epoch % 5 == 0 or epoch == 1:
        print(f"\nGenerating sample images at epoch {epoch}...")
        generate_images(epoch)

# Final results
print("\n" + "="*60)
print("GAN TRAINING COMPLETE")
print("="*60)

# Show final generated images (same fixed noise as the periodic samples)
print("\nFinal generated images:")
final_images = generate_images(epochs)

# Show training progress
print("\nTraining progress:")
plot_training_progress()

# Show real vs fake comparison
print("\nReal vs Generated images comparison:")
show_real_vs_fake()

# Show latent space interpolation
print("\nLatent space interpolation:")
show_latent_interpolation()

In [None]:
# Function to generate multiple samples
def generate_multiple_samples(num_samples=64):
    """Generate ``num_samples`` images from random noise and display them.

    Returns:
        The generated images on the CPU, rescaled from [-1, 1] to [0, 1].
    """
    generator.eval()
    with torch.no_grad():
        latent = torch.randn(num_samples, latent_dim, device=device)
        samples = generator(latent).detach().cpu()
        samples = (samples + 1) / 2  # Denormalize

        plt.figure(figsize=(12, 12))
        tiled = make_grid(samples, nrow=8, normalize=False)
        plt.imshow(tiled.permute(1, 2, 0), cmap='gray')
        plt.title('Multiple Generated Samples')
        plt.axis('off')
        plt.show()

        return samples

# Generate multiple samples
print("\nGenerating multiple samples...")
generated_samples = generate_multiple_samples(64)

# Save model: only the state dicts (weights), not the full module objects
torch.save(generator.state_dict(), 'gan_generator.pth')
torch.save(discriminator.state_dict(), 'gan_discriminator.pth')
print("\nModels saved as 'gan_generator.pth' and 'gan_discriminator.pth'")

# Print final statistics
# NOTE: len(G_losses) counts the recorded loss samples (train_gan logs one
# every 50 batches), not the number of optimizer steps.
print(f"\nFinal Statistics:")
print(f"Total generator iterations: {len(G_losses)}")
print(f"Final generator loss: {G_losses[-1]:.4f}")
print(f"Final discriminator loss: {D_losses[-1]:.4f}")
print(f"Training completed on: {device}")

In [None]:
# Additional demonstration: Show evolution of generated images
def show_training_evolution():
    """Compare an untrained generator with the trained one on the same noise.

    A fresh ``Generator`` is created and initialised with ``weights_init``;
    both it and the trained module-level ``generator`` are fed the same 16
    latent vectors and their outputs are shown side by side.
    """
    banner = "=" * 50
    print("\n" + banner)
    print("TRAINING EVOLUTION DEMONSTRATION")
    print(banner)

    # Baseline: a newly initialised generator that has never been trained
    test_generator = Generator(latent_dim).to(device)
    test_generator.apply(weights_init)

    with torch.no_grad():
        noise = torch.randn(16, latent_dim, device=device)
        # Same noise for both networks; outputs denormalized to [0, 1]
        untrained_imgs = (test_generator(noise).detach().cpu() + 1) / 2
        trained_imgs = (generator(noise).detach().cpu() + 1) / 2

    fig, axes_pair = plt.subplots(1, 2, figsize=(10, 5))

    comparisons = (
        (untrained_imgs, 'Untrained Generator\n(Random Output)'),
        (trained_imgs, 'Trained Generator\n(Realistic Digits)'),
    )
    tiles = [make_grid(images, nrow=4, normalize=False) for images, _ in comparisons]
    for ax, tiled, (_, title) in zip(axes_pair, tiles, comparisons):
        ax.imshow(tiled.permute(1, 2, 0), cmap='gray')
        ax.set_title(title)
        ax.axis('off')

    plt.tight_layout()
    plt.show()

# Run the untrained-vs-trained comparison with the module-level generator.
show_training_evolution()

In [None]:
# Final demonstration
def demonstrate_generation():
    """Print a summary of the GAN's key points and show 25 generated samples."""
    rule = "=" * 50
    print("\n" + rule)
    print("FINAL DEMONSTRATION: GAN IMAGE GENERATION")
    print(rule)

    print("Key points to present during defense:")
    key_points = (
        "✓ The generator takes random noise (100-dimensional vector) as input",
        "✓ It transforms this noise into realistic 28x28 digit images",
        "✓ The discriminator learns to distinguish real from fake images",
        "✓ Two separate optimizers train the networks adversarially",
        "✓ No labels are used - purely unsupervised learning",
        "✓ The generated images show the model learned the MNIST data distribution",
    )
    for point in key_points:
        print(point)

    # One last batch of samples to close the demonstration
    print("\nFinal generated images demonstration:")
    generate_multiple_samples(25)

# Print the summary and display the final sample grid.
demonstrate_generation()

### 4. Denoising Diffusion Probabilistic Model (DDPM)

In [None]:
# Linear Layer Implementation
class LinearLayer(nn.Module):
    """Hand-written affine layer computing ``x @ W^T + b``.

    The weight has shape ``(out_features, in_features)`` and is initialised
    from a standard normal scaled by 0.1; the optional bias starts at zero.
    """

    def __init__(self, in_features, out_features, bias=True):
        super(LinearLayer, self).__init__()
        self.in_features = in_features
        self.out_features = out_features

        initial_weight = 0.1 * torch.randn(out_features, in_features)
        self.weight = nn.Parameter(initial_weight)
        if not bias:
            # Register an explicit None so `self.bias` always exists.
            self.register_parameter('bias', None)
        else:
            self.bias = nn.Parameter(torch.zeros(out_features))

    def forward(self, x):
        """Apply the affine map to the last dimension of ``x``."""
        projected = x @ self.weight.t()
        if self.bias is None:
            return projected
        return projected + self.bias

# Activation Functions
class ReLU(nn.Module):
    """Rectified linear unit: element-wise maximum of the input and zero."""

    def forward(self, x):
        floor = torch.zeros_like(x)
        return torch.maximum(x, floor)

class SiLU(nn.Module):
    """Sigmoid linear unit: the input multiplied by its own sigmoid."""

    def forward(self, x):
        gate = torch.sigmoid(x)
        return x * gate

# Sinusoidal Position Embedding
class SinusoidalPosEmb(nn.Module):
    """Sinusoidal embedding of a 1-D tensor of positions / timesteps.

    For each input value the output holds ``dim // 2`` sines followed by
    ``dim // 2`` cosines at geometrically spaced frequencies (from 1 down to
    1/10000). When ``dim`` is odd a trailing zero column is appended so the
    output always has exactly ``dim`` features.

    Args:
        dim: Size of the embedding produced for every input value.
    """

    def __init__(self, dim):
        super().__init__()
        self.dim = dim

    def forward(self, x):
        """Embed ``x`` of shape ``(n,)`` into a tensor of shape ``(n, dim)``."""
        device = x.device
        half_dim = self.dim // 2
        # max(..., 1) avoids a ZeroDivisionError when half_dim == 1
        # (dim of 2 or 3); larger dims are unaffected.
        emb = math.log(10000) / max(half_dim - 1, 1)
        emb = torch.exp(torch.arange(half_dim, device=device) * -emb)
        # Outer product: every position times every frequency.
        emb = x[:, None] * emb[None, :]
        emb = torch.cat((torch.sin(emb), torch.cos(emb)), dim=-1)
        if self.dim % 2 == 1:
            # sin/cos halves cover only dim - 1 columns for odd dim; pad one
            # zero column so downstream layers expecting `dim` features work.
            emb = F.pad(emb, (0, 1))
        return emb

In [None]:
# Improved U-Net Block with proper dimension handling
class UNetBlock(nn.Module):
    """Two 3x3 conv stages with a time-embedding shift and a residual shortcut.

    Args:
        in_channels: Channels of the incoming feature map.
        out_channels: Channels produced by the block.
        time_emb_dim: Width of the time embedding passed to ``forward``.
        is_upsample: Accepted for call-site symmetry; the block builds the
            same layers whichever value is given.
    """

    def __init__(self, in_channels, out_channels, time_emb_dim, is_upsample=False):
        super().__init__()
        # Projects the time embedding to one scalar per output channel.
        self.time_mlp = nn.Linear(time_emb_dim, out_channels)

        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1)
        self.activation = SiLU()

        self.norm1 = nn.BatchNorm2d(out_channels)
        self.norm2 = nn.BatchNorm2d(out_channels)

        # A 1x1 conv is only needed when the shortcut must change width.
        keeps_width = in_channels == out_channels
        self.res_conv = (
            nn.Identity() if keeps_width
            else nn.Conv2d(in_channels, out_channels, kernel_size=1)
        )

    def forward(self, x, t_emb):
        """Return the block output for feature map ``x`` and embedding ``t_emb``."""
        shortcut = self.res_conv(x)

        # Stage 1: conv -> norm -> activation.
        h = self.activation(self.norm1(self.conv1(x)))

        # Broadcast the per-channel time shift over both spatial axes.
        time_shift = self.time_mlp(t_emb)
        h = h + time_shift[:, :, None, None]

        # Stage 2: conv -> norm -> activation.
        h = self.activation(self.norm2(self.conv2(h)))

        return h + shortcut

In [None]:
# Robust DDPM Model with proper dimension handling
class RobustDiffusionModel(nn.Module):
    """Small U-Net that maps a noisy image and its timestep to a same-sized output.

    Three encoder blocks (each followed by 2x average pooling), a bottleneck
    and three decoder blocks fed by nearest-neighbour upsampling plus the
    matching encoder output as a skip connection.

    Args:
        image_size: Stored on the instance; not used by the layers.
        channels: Number of image channels in and out.
        dim: Base channel width of the network.
        time_emb_dim: Width of the timestep embedding.
    """

    def __init__(self, image_size=28, channels=1, dim=32, time_emb_dim=128):
        super().__init__()
        self.image_size = image_size
        self.channels = channels

        # Timestep -> sinusoidal features -> two-layer MLP.
        self.time_mlp = nn.Sequential(
            SinusoidalPosEmb(time_emb_dim),
            LinearLayer(time_emb_dim, time_emb_dim),
            SiLU(),
            LinearLayer(time_emb_dim, time_emb_dim),
        )

        self.init_conv = nn.Conv2d(channels, dim, kernel_size=3, padding=1)

        double, quad = dim * 2, dim * 4

        # Encoder: channel width grows dim -> 2*dim -> 4*dim.
        self.enc1 = UNetBlock(dim, dim, time_emb_dim)
        self.enc2 = UNetBlock(dim, double, time_emb_dim)
        self.enc3 = UNetBlock(double, quad, time_emb_dim)

        self.bottleneck = UNetBlock(quad, quad, time_emb_dim)

        # Decoder inputs are twice as wide because of the concatenated skip.
        self.dec1 = UNetBlock(quad * 2, double, time_emb_dim, is_upsample=True)
        self.dec2 = UNetBlock(double * 2, dim, time_emb_dim, is_upsample=True)
        self.dec3 = UNetBlock(dim * 2, dim, time_emb_dim, is_upsample=True)

        self.final_conv = nn.Sequential(
            nn.Conv2d(dim, dim, kernel_size=3, padding=1),
            SiLU(),
            nn.Conv2d(dim, channels, kernel_size=1),
        )

        self.pool = nn.AvgPool2d(2)

    @staticmethod
    def _upsample_and_concat(features, skip):
        """Resize ``features`` to ``skip``'s spatial size and stack along channels."""
        resized = F.interpolate(features, size=skip.shape[2:], mode='nearest')
        return torch.cat([resized, skip], dim=1)

    def forward(self, x, t):
        """Run the U-Net on images ``x`` with per-sample timesteps ``t``."""
        t_emb = self.time_mlp(t)

        # Encoder; for a 28x28 input the resolutions are 28 -> 14 -> 7 -> 3.
        skip1 = self.enc1(self.init_conv(x), t_emb)
        skip2 = self.enc2(self.pool(skip1), t_emb)
        skip3 = self.enc3(self.pool(skip2), t_emb)

        h = self.bottleneck(self.pool(skip3), t_emb)

        # Decoder: walk back up, pairing each stage with its encoder output.
        decoder_path = (
            (self.dec1, skip3),
            (self.dec2, skip2),
            (self.dec3, skip1),
        )
        for decoder, skip in decoder_path:
            h = decoder(self._upsample_and_concat(h, skip), t_emb)

        return self.final_conv(h)

In [None]:
# DDPM Trainer
class DDPM:
    """Forward noising, training step and ancestral sampling for a DDPM.

    Args:
        model: Callable ``model(x, t)`` that predicts the noise added to ``x``.
        timesteps: Number of diffusion steps.
        beta_start: First value of the linear beta schedule.
        beta_end: Last value of the linear beta schedule.
        device: Device on which the schedule tensors and samples are created.
    """

    def __init__(self, model, timesteps=200, beta_start=1e-4, beta_end=0.02, device='cuda'):
        self.model = model
        self.timesteps = timesteps
        self.device = device

        # Linear noise schedule and its cumulative products.
        self.betas = torch.linspace(beta_start, beta_end, timesteps, device=device)
        self.alphas = 1. - self.betas
        self.alpha_bars = torch.cumprod(self.alphas, dim=0)

        # Coefficients of the closed-form forward process, computed once.
        self.sqrt_alpha_bars = torch.sqrt(self.alpha_bars)
        self.sqrt_one_minus_alpha_bars = torch.sqrt(1. - self.alpha_bars)

    def sample_timesteps(self, n):
        """Draw ``n`` uniform integer timesteps in ``[0, timesteps)``."""
        return torch.randint(0, self.timesteps, (n,), device=self.device)

    def noise_images(self, x, t):
        """Diffuse a 4-D batch ``x`` to the timesteps ``t``.

        Returns:
            Tuple ``(x_t, epsilon)`` where ``epsilon`` is the Gaussian noise
            that was mixed in.
        """
        sqrt_alpha_bar = self.sqrt_alpha_bars[t][:, None, None, None]
        sqrt_one_minus_alpha_bar = self.sqrt_one_minus_alpha_bars[t][:, None, None, None]
        epsilon = torch.randn_like(x)
        return sqrt_alpha_bar * x + sqrt_one_minus_alpha_bar * epsilon, epsilon

    def train_step(self, x, optimizer):
        """Run one optimisation step on batch ``x`` and return the MSE loss as a float."""
        optimizer.zero_grad()

        t = self.sample_timesteps(x.shape[0])
        x_noisy, noise = self.noise_images(x, t)
        predicted_noise = self.model(x_noisy, t)

        loss = F.mse_loss(predicted_noise, noise)
        loss.backward()
        optimizer.step()

        return loss.item()

    @torch.no_grad()
    def sample(self, n_samples=16, img_size=28, channels=1):
        """Generate images by running the reverse process from pure noise.

        The model is switched to eval mode while sampling and put back into
        whatever mode it was in beforehand, even if sampling raises.

        Returns:
            Tensor of shape ``(n_samples, channels, img_size, img_size)``
            clamped to ``[-1, 1]``.
        """
        was_training = self.model.training
        self.model.eval()
        try:
            x = torch.randn((n_samples, channels, img_size, img_size), device=self.device)

            # NOTE(review): tqdm is not imported in the visible part of this
            # file -- confirm the import exists. ``total`` is passed because a
            # reversed range has no len() for the progress bar to read.
            steps = tqdm(reversed(range(self.timesteps)), desc='Sampling', total=self.timesteps)
            for i in steps:
                t = torch.full((n_samples,), i, device=self.device, dtype=torch.long)
                predicted_noise = self.model(x, t)

                alpha = self.alphas[t][:, None, None, None]
                alpha_bar = self.alpha_bars[t][:, None, None, None]
                beta = self.betas[t][:, None, None, None]

                # No fresh noise is injected on the final (i == 0) step.
                if i > 0:
                    noise = torch.randn_like(x)
                else:
                    noise = torch.zeros_like(x)

                # x_{t-1} = (x_t - (1 - a_t) / sqrt(1 - abar_t) * eps) / sqrt(a_t)
                #           + sqrt(beta_t) * z
                x = (1 / torch.sqrt(alpha)) * (
                    x - ((1 - alpha) / (torch.sqrt(1 - alpha_bar))) * predicted_noise
                ) + torch.sqrt(beta) * noise
        finally:
            self.model.train(was_training)

        return torch.clamp(x, -1., 1.)

In [None]:
# Training function
def train_ddpm():
    """Train the diffusion model on MNIST and periodically save sample grids.

    Returns:
        Tuple ``(model, ddpm, losses)`` where ``losses`` holds the mean batch
        loss of every epoch.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")

    # Pixels are rescaled from [0, 1] to [-1, 1].
    to_signed_range = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,)),
    ])
    mnist_train = datasets.MNIST('./data', train=True, download=True, transform=to_signed_range)
    train_loader = DataLoader(mnist_train, batch_size=128, shuffle=True, num_workers=2)

    model = RobustDiffusionModel().to(device)
    ddpm = DDPM(model, device=device, timesteps=200)
    optimizer = optim.Adam(model.parameters(), lr=1e-3)

    epochs = 15
    sample_every = 3
    losses = []

    for epoch_number in range(1, epochs + 1):
        model.train()
        running_loss = 0
        progress_bar = tqdm(train_loader, desc=f'Epoch {epoch_number}/{epochs}')

        for data, _ in progress_bar:
            loss = ddpm.train_step(data.to(device), optimizer)
            running_loss += loss
            progress_bar.set_postfix({'Loss': f'{loss:.4f}'})

        avg_loss = running_loss / len(train_loader)
        losses.append(avg_loss)
        print(f'Epoch {epoch_number}, Average Loss: {avg_loss:.4f}')

        # Write a grid of generated digits on every third epoch.
        if epoch_number % sample_every == 0:
            save_samples(ddpm.sample(n_samples=16), epoch_number)

    return model, ddpm, losses

In [None]:
def save_samples(samples, epoch, nrow=4):
    """Save up to ``nrow * nrow`` samples as a grid image.

    Args:
        samples: Batch of images with values in ``[-1, 1]``.
        epoch: Label inserted into the output file name.
        nrow: Number of rows and columns of the grid; samples beyond
            ``nrow * nrow`` are not drawn.
    """
    samples = samples.cpu()
    samples = ((samples + 1) / 2).clamp(0, 1)  # map [-1, 1] to [0, 1]

    # squeeze=False keeps ``axes`` a 2-D array even for nrow == 1, where
    # plt.subplots would otherwise return a bare Axes without ``.flat``.
    fig, axes = plt.subplots(nrow, nrow, figsize=(8, 8), squeeze=False)
    for i, ax in enumerate(axes.flat):
        if i < len(samples):
            ax.imshow(samples[i].squeeze(), cmap='gray')
        ax.axis('off')

    fig.tight_layout()
    fig.savefig(f'ddpm_samples_epoch_{epoch}.png')
    # Close this specific figure rather than whichever one is current.
    plt.close(fig)
    print(f"Saved samples for epoch {epoch}")


def test_model():
    """Smoke-test the U-Net forward pass for several batch sizes.

    Returns:
        True when every forward pass runs and preserves the spatial size,
        False as soon as one of them fails.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Testing on device: {device}")

    model = RobustDiffusionModel().to(device)

    for batch_size in (1, 4, 8):
        print(f"\nTesting with batch size {batch_size}:")

        # Random images and random timesteps of the matching batch size.
        x = torch.randn(batch_size, 1, 28, 28).to(device)
        t = torch.randint(0, 200, (batch_size,)).to(device)

        try:
            with torch.no_grad():
                output = model(x, t)

            print(f"  Input shape: {x.shape}")
            print(f"  Output shape: {output.shape}")
            print("  Forward pass successful")

            # The network must hand back an image of the same height/width.
            assert x.shape[2:] == output.shape[2:], "Spatial dimensions don't match!"
            print("  Spatial dimensions match")

        except Exception as e:
            # Any failure is reported and ends the check with a False result.
            print(f"  Error: {e}")
            return False

    print("\n All tests passed! Model architecture is correct.")
    return True

In [None]:
# Script entry for the DDPM section: check the architecture, train, plot the
# loss curve and save a final grid of generated digits.
if __name__ == "__main__":
    print("Testing model architecture...")
    success = test_model()

    if success:
        print("\nStarting training...")
        model, ddpm, losses = train_ddpm()

        # Plot training loss
        plt.figure(figsize=(10, 5))
        plt.plot(losses)
        plt.title('DDPM Training Loss')
        plt.xlabel('Epoch')
        plt.ylabel('MSE Loss')
        plt.grid(True)
        plt.savefig('ddpm_training_loss.png')
        plt.show()

        # Generate final samples
        print("Generating final samples...")
        n_final = 64
        final_samples = ddpm.sample(n_samples=n_final)
        # The default 4x4 grid would draw only 16 of the 64 samples, so size
        # the grid to hold all of them (8x8).
        grid_side = int(math.ceil(math.sqrt(n_final)))
        save_samples(final_samples, "final", nrow=grid_side)

        print("Training completed successfully!")
    else:
        print("Model testing failed. Please check the architecture.")

### 5. Vector Autoregressive Model

In [None]:
# VAR Model for MNIST with Linear Layers
class MNISTVARModel(nn.Module):
    """MLP that predicts the next flattened image from the previous ``lag_order`` ones.

    Args:
        input_dim: Number of values per time step (pixels per image).
        lag_order: Number of past time steps fed to the network.
        hidden_dims: Widths of the hidden linear layers, in order.
    """

    def __init__(self, input_dim, lag_order=5, hidden_dims=(512, 256, 128)):
        super().__init__()
        self.input_dim = input_dim
        self.lag_order = lag_order

        # All lagged steps are concatenated into one input vector.
        self.flattened_dim = lag_order * input_dim

        layers = []
        prev_dim = self.flattened_dim
        for hidden_dim in hidden_dims:
            layers.extend([
                nn.Linear(prev_dim, hidden_dim),
                nn.ReLU(),
                nn.Dropout(0.3),
            ])
            prev_dim = hidden_dim

        self.encoder = nn.Sequential(*layers)

        # Head that predicts every variable (pixel) of the next step; Tanh
        # bounds the prediction to [-1, 1].
        self.output_layer = nn.Sequential(
            nn.Linear(prev_dim, 256),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(256, input_dim),
            nn.Tanh(),
        )

    def forward(self, x):
        """Map ``x`` of shape ``(batch, lag_order, input_dim)`` to ``(batch, input_dim)``."""
        # reshape (unlike view) also accepts non-contiguous inputs such as
        # transposed or sliced tensors.
        x_flat = x.reshape(x.shape[0], -1)
        encoded = self.encoder(x_flat)
        return self.output_layer(encoded)

In [None]:
# MNIST VAR Trainer
class MNISTVARTrainer:
    """Data preparation, training loop and inference helpers for ``MNISTVARModel``.

    Args:
        model: Network to train; its ``lag_order`` attribute sets the window
            length used when building sequences.
        lr: Learning rate of the Adam optimiser.
    """

    def __init__(self, model, lr=0.001):
        self.model = model
        self.optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=1e-5)
        self.criterion = nn.MSELoss()
        # NOTE(review): this scaler is never used by the class, and
        # StandardScaler is not imported in the visible part of the file --
        # confirm the import exists.
        self.scaler = StandardScaler()

    def create_mnist_sequences(self, data, labels, lag_order, sequence_length=1000):
        """Build sliding windows that treat consecutive samples as time steps.

        Args:
            data: Array of flattened images, one row per sample.
            labels: Accepted but not used.
            lag_order: Number of consecutive rows in each input window.
            sequence_length: Maximum number of windows to create.

        Returns:
            Tuple ``(sequences, targets)`` of float tensors; ``targets[i]`` is
            the row that directly follows window ``sequences[i]``.
        """
        data_subset = data[:sequence_length + lag_order]
        n_windows = len(data_subset) - lag_order

        sequences = [data_subset[i:i + lag_order] for i in range(n_windows)]
        targets = [data_subset[i + lag_order] for i in range(n_windows)]

        return torch.FloatTensor(np.array(sequences)), torch.FloatTensor(np.array(targets))

    def prepare_mnist_data(self, batch_size=64, train_ratio=0.8):
        """Load MNIST and split the sliding-window sequences into train/test.

        Args:
            batch_size: Accepted but not used; training is full-batch.
            train_ratio: Fraction of the sequences placed in the training split.

        Returns:
            ``((X_train, y_train, X_test, y_test), all_data, all_labels)``
            where ``all_data`` holds the images scaled to ``[0, 1]``.
        """
        # The raw ``.data`` tensors are read directly below, so no torchvision
        # transform is attached: it would never be applied.
        train_dataset = datasets.MNIST(root='./data', train=True, download=True)
        test_dataset = datasets.MNIST(root='./data', train=False, download=True)

        # Combine train and test for more sequential data.
        all_data = torch.cat([train_dataset.data, test_dataset.data])
        all_labels = torch.cat([train_dataset.targets, test_dataset.targets])

        # Scale pixel values to [0, 1] and switch to numpy.
        all_data = (all_data.float() / 255.0).numpy()
        all_labels = all_labels.numpy()

        # One flat row per image.
        all_data_flat = all_data.reshape(len(all_data), -1)

        sequences, targets = self.create_mnist_sequences(
            all_data_flat, all_labels, self.model.lag_order
        )

        # Chronological split: the first part trains, the rest tests.
        split_idx = int(len(sequences) * train_ratio)
        X_train, X_test = sequences[:split_idx], sequences[split_idx:]
        y_train, y_test = targets[:split_idx], targets[split_idx:]

        print(f"Created {len(X_train)} training sequences and {len(X_test)} test sequences")
        print(f"Input shape: {X_train.shape}, Target shape: {y_train.shape}")

        return (X_train, y_train, X_test, y_test), all_data, all_labels

    def train(self, epochs=100, patience=10):
        """Train full-batch with early stopping on the test loss.

        Args:
            epochs: Maximum number of epochs.
            patience: Number of consecutive epochs without improvement after
                which training stops.

        Returns:
            ``(train_losses, test_losses, (X_train, y_train, X_test, y_test),
            original_data, labels)``.
        """
        checkpoint_path = 'best_mnist_var_model.pth'
        (X_train, y_train, X_test, y_test), original_data, labels = self.prepare_mnist_data()

        train_losses = []
        test_losses = []
        best_test_loss = float('inf')
        patience_counter = 0
        saved_checkpoint = False

        for epoch in range(epochs):
            # One full-batch optimisation step.
            self.model.train()
            self.optimizer.zero_grad()
            train_loss = self.criterion(self.model(X_train), y_train)
            train_loss.backward()
            self.optimizer.step()

            # Evaluate on the held-out sequences.
            self.model.eval()
            with torch.no_grad():
                test_loss = self.criterion(self.model(X_test), y_test)

            train_loss_value = train_loss.item()
            test_loss_value = test_loss.item()
            train_losses.append(train_loss_value)
            test_losses.append(test_loss_value)

            # Keep the weights of the best epoch seen so far.
            if test_loss_value < best_test_loss:
                best_test_loss = test_loss_value
                patience_counter = 0
                torch.save(self.model.state_dict(), checkpoint_path)
                saved_checkpoint = True
            else:
                patience_counter += 1

            if epoch % 10 == 0:
                print(f'Epoch {epoch:3d}/{epochs}: Train Loss: {train_loss_value:.6f}, Test Loss: {test_loss_value:.6f}')

            if patience_counter >= patience:
                print(f'Early stopping at epoch {epoch}')
                break

        # Restore the best weights, but only if this run actually wrote them;
        # otherwise a stale file from an earlier run could be loaded.
        if saved_checkpoint:
            self.model.load_state_dict(torch.load(checkpoint_path))

        return train_losses, test_losses, (X_train, y_train, X_test, y_test), original_data, labels

    @torch.no_grad()
    def predict_sequence(self, initial_sequence, steps=10):
        """Roll the model forward, feeding each prediction back as input.

        Args:
            initial_sequence: Tensor of shape ``(lag_order, input_dim)``.
            steps: Number of future steps to generate.

        Returns:
            Numpy array with one predicted row per step.
        """
        self.model.eval()

        predictions = []
        window = initial_sequence.clone()

        for _ in range(steps):
            pred = self.model(window.unsqueeze(0)).squeeze(0)
            # .cpu() makes the conversion work for tensors on any device.
            predictions.append(pred.cpu().numpy())

            # Slide the window: drop the oldest step, append the prediction.
            window = torch.cat([window[1:], pred.unsqueeze(0)])

        return np.array(predictions)

    @torch.no_grad()
    def reconstruct_images(self, test_sequences, num_samples=5):
        """Predict the image following each of the first test windows.

        The true next image of window ``i`` is read from the last row of
        window ``i + 1``, so the windows must be consecutive and the final
        window has no target. ``num_samples`` is therefore capped at
        ``len(test_sequences) - 1``.

        Returns:
            Tuple ``(reconstructions, originals)`` of numpy arrays.
        """
        self.model.eval()

        usable = max(min(num_samples, len(test_sequences) - 1), 0)

        reconstructions = []
        original_images = []

        for i in range(usable):
            pred_img = self.model(test_sequences[i].unsqueeze(0)).squeeze(0)
            target_img = test_sequences[i + 1][-1]  # the actual next image

            reconstructions.append(pred_img.cpu().numpy())
            original_images.append(target_img.cpu().numpy())

        return np.array(reconstructions), np.array(original_images)


In [None]:
# Visualization functions for MNIST
def plot_mnist_training_curves(train_losses, test_losses):
    """Plot train/test loss per epoch on linear and log axes, then save and show."""
    # (draw function, train label, test label, title, y-axis label) per panel.
    panels = (
        (plt.plot, 'Training Loss', 'Test Loss',
         'MNIST VAR Model Training Progress', 'MSE Loss'),
        (plt.semilogy, 'Training Loss (log)', 'Test Loss (log)',
         'Training Progress (Log Scale)', 'MSE Loss (log)'),
    )

    plt.figure(figsize=(12, 5))

    for position, (draw, train_label, test_label, title, y_label) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        draw(train_losses, label=train_label)
        draw(test_losses, label=test_label)
        plt.title(title)
        plt.xlabel('Epoch')
        plt.ylabel(y_label)
        plt.legend()
        plt.grid(True)

    plt.tight_layout()
    plt.savefig('mnist_var_training_curves.png', dpi=300, bbox_inches='tight')
    plt.show()

In [None]:
def plot_mnist_reconstructions(reconstructions, originals, num_samples=5):
    """Plot original images (top row) against model reconstructions (bottom row).

    Args:
        reconstructions: Sequence of predicted images, each reshapeable to 28x28.
        originals: Sequence of ground-truth images, each reshapeable to 28x28.
        num_samples: Number of columns to draw.
    """
    # squeeze=False keeps ``axes`` two-dimensional, so ``axes[row, col]``
    # also works when num_samples == 1.
    fig, axes = plt.subplots(2, num_samples, figsize=(15, 6), squeeze=False)

    rows = (
        (originals, 'Original'),
        (reconstructions, 'Reconstructed'),
    )
    for row, (images, label) in enumerate(rows):
        for i in range(num_samples):
            ax = axes[row, i]
            ax.imshow(images[i].reshape(28, 28), cmap='gray')
            ax.set_title(f'{label} {i+1}')
            ax.axis('off')

    plt.suptitle('MNIST VAR Model: Original vs Reconstructed Images')
    plt.tight_layout()
    plt.savefig('mnist_var_reconstructions.png', dpi=300, bbox_inches='tight')
    plt.show()

In [None]:
def plot_sequence_generation(initial_sequence, generated_sequence, original_data, lag_order):
    """Plot the seed images (top row) and the generated images (bottom row).

    Args:
        initial_sequence: The ``lag_order`` seed images, each reshapeable to 28x28.
        generated_sequence: Generated images, each reshapeable to 28x28.
        original_data: Accepted but not used.
        lag_order: Number of seed images to draw.
    """
    num_steps = len(generated_sequence)
    # Size the grid for the longer of the two rows (plus the spare trailing
    # column the layout has always had); sizing it from num_steps alone
    # raised IndexError whenever lag_order exceeded num_steps.
    n_cols = max(lag_order, num_steps) + 1
    fig, axes = plt.subplots(2, n_cols, figsize=(15, 6), squeeze=False)

    # Seed sequence
    for i in range(lag_order):
        axes[0, i].imshow(initial_sequence[i].reshape(28, 28), cmap='gray')
        axes[0, i].set_title(f'Initial {i+1}')

    # Generated sequence
    for i in range(num_steps):
        axes[1, i].imshow(generated_sequence[i].reshape(28, 28), cmap='gray')
        axes[1, i].set_title(f'Generated {i+1}')

    # Hide ticks and frames everywhere, including every unused cell.
    for ax in axes.flat:
        ax.axis('off')

    plt.suptitle('MNIST VAR Sequence Generation')
    plt.tight_layout()
    plt.savefig('mnist_var_sequence_generation.png', dpi=300, bbox_inches='tight')
    plt.show()

In [None]:
def plot_pixel_correlations(original_data, sample_size=1000):
    """Plot the correlation matrix of a random subset of pixels.

    Args:
        original_data: Array of images, one per leading index.
        sample_size: Maximum number of images used for the estimate.
    """
    # Reshape by the number of rows actually available: with fewer than
    # ``sample_size`` images, reshaping to (sample_size, -1) would either
    # fail or silently split images across rows.
    data_subset = original_data[:sample_size]
    data_subset = data_subset.reshape(len(data_subset), -1)

    # Sample up to 100 distinct pixel columns (fewer if the images are smaller).
    n_features = data_subset.shape[1]
    pixel_indices = np.random.choice(n_features, min(100, n_features), replace=False)
    # NOTE(review): pixels that are constant across the subset have zero
    # variance, so np.corrcoef presumably yields NaN for them -- confirm
    # whether that is acceptable in the plot.
    corr_matrix = np.corrcoef(data_subset[:, pixel_indices].T)

    plt.figure(figsize=(10, 8))
    im = plt.imshow(corr_matrix, cmap='coolwarm', vmin=-1, vmax=1)
    plt.colorbar(im)
    plt.title('Pixel Correlation Matrix (100 random pixels)')
    plt.xlabel('Pixel Index')
    plt.ylabel('Pixel Index')

    plt.tight_layout()
    plt.savefig('mnist_pixel_correlations.png', dpi=300, bbox_inches='tight')
    plt.show()

In [None]:
# Main execution
def main():
    """Build, train and evaluate the MNIST VAR model, printing and plotting results."""
    print("VAR Model for MNIST with Linear Layers")
    print("=" * 50)

    # Hyper-parameters
    input_dim = 28 * 28             # pixels per MNIST image
    lag_order = 5                   # past images used for each prediction
    hidden_dims = [1024, 512, 256]  # hidden layer widths

    model = MNISTVARModel(input_dim=input_dim, lag_order=lag_order, hidden_dims=hidden_dims)
    trainer = MNISTVARTrainer(model, lr=0.001)

    n_params = sum(p.numel() for p in model.parameters())
    print("Model Architecture:")
    print(f"- Input dimension: {input_dim} (28x28 MNIST images)")
    print(f"- Lag order: {lag_order}")
    print(f"- Hidden dimensions: {hidden_dims}")
    print(f"- Total parameters: {n_params:,}")
    print()

    # Training
    print("Training MNIST VAR model...")
    train_losses, test_losses, splits, original_data, labels = trainer.train(epochs=100)
    X_test = splits[2]  # splits is (X_train, y_train, X_test, y_test)

    print("\nTraining completed!")
    print(f"Final Training Loss: {train_losses[-1]:.6f}")
    print(f"Final Test Loss: {test_losses[-1]:.6f}")

    # Diagnostics
    plot_mnist_training_curves(train_losses, test_losses)
    plot_pixel_correlations(original_data)

    # One-step reconstructions on held-out windows
    print("\nGenerating image reconstructions...")
    n_shown = 8
    reconstructions, originals = trainer.reconstruct_images(X_test, num_samples=n_shown)
    plot_mnist_reconstructions(reconstructions, originals, num_samples=n_shown)

    # Multi-step roll-out seeded with the first test window
    print("Generating image sequence...")
    initial_sequence = X_test[0]
    generated_sequence = trainer.predict_sequence(initial_sequence, steps=8)
    plot_sequence_generation(initial_sequence, generated_sequence, original_data, lag_order)

    # Summary statistics
    print("\nStatistical Summary:")
    print(f"Best training loss: {min(train_losses):.6f}")
    print(f"Best test loss: {min(test_losses):.6f}")

    mse_reconstruction = np.mean((reconstructions - originals) ** 2)
    print(f"Average reconstruction MSE: {mse_reconstruction:.6f}")

    # Spot-check the first ten pixels of the first reconstruction
    print("\nSample Pixel-wise Comparison (first 10 pixels):")
    first_pairs = zip(reconstructions[0][:10], originals[0][:10])
    for i, (pred, actual) in enumerate(first_pairs):
        print(f"Pixel {i}: Pred {pred:.3f}, Actual {actual:.3f}, Diff {abs(pred-actual):.3f}")

# Script entry point: run main() when this module is executed directly
# (not when it is imported).
if __name__ == "__main__":
    main()