In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import os
import numpy as np

# --- 1. Define Model Parameters ---
# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {DEVICE}")

# Network dimensions.
INPUT_DIM = 784  # one flattened 28 x 28 image
HIDDEN_DIM = 400  # width of the single hidden layer in encoder and decoder
LATENT_DIM = 20  # size of the latent code z

# Optimisation hyperparameters.
LEARNING_RATE = 0.001
BATCH_SIZE = 128
EPOCHS = 10

# --- 2. VAE Model Architecture ---
class VAE(nn.Module):
    """Fully connected variational autoencoder with a diagonal Gaussian latent.

    The encoder is one ReLU hidden layer followed by two linear heads that
    produce the mean and the log-variance of q(z|x). The decoder is one ReLU
    hidden layer followed by a sigmoid output layer, so every reconstructed
    value lies in [0, 1].

    Args:
        input_dim: Number of features per flattened sample. Defaults to the
            module-level ``INPUT_DIM``.
        hidden_dim: Width of the hidden layer used by both encoder and
            decoder. Defaults to the module-level ``HIDDEN_DIM``.
        latent_dim: Size of the latent code. Defaults to the module-level
            ``LATENT_DIM``.
    """

    def __init__(self, input_dim=None, hidden_dim=None, latent_dim=None):
        super().__init__()

        # Defaults are resolved here (not in the signature) so that ``VAE()``
        # keeps following the module-level constants, while explicit sizes
        # let the same class be reused for other input shapes.
        self.input_dim = INPUT_DIM if input_dim is None else input_dim
        self.hidden_dim = HIDDEN_DIM if hidden_dim is None else hidden_dim
        self.latent_dim = LATENT_DIM if latent_dim is None else latent_dim

        # --- Encoder ---
        # input -> hidden
        self.fc1 = nn.Linear(self.input_dim, self.hidden_dim)
        # hidden -> latent mean
        self.fc2_mu = nn.Linear(self.hidden_dim, self.latent_dim)
        # hidden -> latent log-variance
        self.fc2_logvar = nn.Linear(self.hidden_dim, self.latent_dim)

        # --- Decoder ---
        # latent -> hidden
        self.fc3 = nn.Linear(self.latent_dim, self.hidden_dim)
        # hidden -> reconstructed input
        self.fc4 = nn.Linear(self.hidden_dim, self.input_dim)

    def encode(self, x):
        """Map flattened inputs to the mean and log-variance of q(z|x)."""
        h = F.relu(self.fc1(x))
        mu = self.fc2_mu(h)
        logvar = self.fc2_logvar(h)
        return mu, logvar

    def reparameterize(self, mu, logvar):
        """Sample z = mu + std * epsilon with epsilon ~ N(0, 1).

        Writing the sample as a deterministic function of ``mu`` and
        ``logvar`` plus independent noise lets gradients flow back into the
        encoder.
        """
        # logvar = log(sigma^2), hence sigma = exp(0.5 * logvar).
        std = torch.exp(0.5 * logvar)
        epsilon = torch.randn_like(std)
        return mu + std * epsilon

    def decode(self, z):
        """Map latent codes to reconstructions with values in [0, 1]."""
        h = F.relu(self.fc3(z))
        # Sigmoid keeps the outputs usable as Bernoulli probabilities.
        return torch.sigmoid(self.fc4(h))

    def forward(self, x):
        """Encode, sample and decode a batch.

        Args:
            x: Input tensor whose elements can be regrouped into rows of
                ``input_dim`` values; it is flattened to
                ``(-1, input_dim)`` before encoding.

        Returns:
            Tuple ``(recon_x, mu, logvar)``.
        """
        # reshape (rather than view) also accepts non-contiguous inputs.
        flat = x.reshape(-1, self.input_dim)
        mu, logvar = self.encode(flat)
        z = self.reparameterize(mu, logvar)
        recon_x = self.decode(z)
        return recon_x, mu, logvar

# --- 3. VAE Loss Function (The ELBO) ---
def vae_loss_function(recon_x, x, mu, logvar):
    """Compute the negative ELBO for a batch.

    Loss = reconstruction loss + KL divergence. Both terms are summed over
    the whole batch, not averaged.

    Args:
        recon_x: Decoder output with values in [0, 1]; its last dimension is
            the number of features per sample.
        x: Original inputs. They are flattened to rows of
            ``recon_x.size(-1)`` values before comparison.
        mu: Mean of the approximate posterior q(z|x).
        logvar: Log-variance of the approximate posterior q(z|x).

    Returns:
        Tuple ``(total, BCE, KLD)`` of scalar tensors, where
        ``total = BCE + KLD``.
    """
    # Flatten the target to the width of the reconstruction instead of a
    # fixed constant, so the loss works for any input size.
    target = x.reshape(-1, recon_x.size(-1))

    # 1. Reconstruction loss: binary cross-entropy summed over every pixel
    #    of every sample in the batch.
    BCE = F.binary_cross_entropy(recon_x, target, reduction='sum')

    # 2. KL divergence between q(z|x) = N(mu, sigma^2) and the prior N(0, 1),
    #    in closed form:
    #    KLD = -0.5 * sum(1 + log(sigma^2) - mu^2 - sigma^2)
    KLD = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())

    return BCE + KLD, BCE, KLD

# --- 4. Training Step Function ---
def train_step(epoch, model, train_loader, optimizer):
    """Train ``model`` for one pass over ``train_loader``.

    Every batch is moved to ``DEVICE``, pushed through the model, scored with
    ``vae_loss_function`` and used for a single optimizer update of all model
    parameters. Progress is printed every 100 batches and a summary line is
    printed at the end.

    Args:
        epoch: Epoch number; only used in the printed messages.
        model: Module returning ``(recon_batch, mu, logvar)`` when called on
            a batch.
        train_loader: Iterable yielding ``(data, label)`` pairs and exposing
            a ``dataset`` attribute with a length; labels are ignored.
        optimizer: Optimizer holding the model's parameters.

    Returns:
        The average per-sample loss over the samples processed in this
        epoch, or ``nan`` if the loader produced no samples.
    """
    model.train()
    train_loss = 0.0
    # Count samples as they are seen: len(train_loader.dataset) overstates
    # the total when the loader drops the last batch or samples a subset.
    num_samples = 0

    for batch_idx, (data, _) in enumerate(train_loader):
        data = data.to(DEVICE)
        num_samples += len(data)

        # Gradients accumulate across backward() calls, so clear them first.
        optimizer.zero_grad()

        # Forward pass
        recon_batch, mu, logvar = model(data)

        # Negative ELBO, summed over the batch
        loss, bce, kld = vae_loss_function(recon_batch, data, mu, logvar)

        # Backward pass through decoder and encoder
        loss.backward()

        train_loss += loss.item()

        # Single update of all parameters (encoder + decoder)
        optimizer.step()

        if batch_idx % 100 == 0:
            print(f'Epoch: {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)}] '
                  f'Loss: {loss.item() / len(data):.4f} '
                  f'(BCE: {bce.item() / len(data):.4f}, '
                  f'KLD: {kld.item() / len(data):.4f})')

    # Guard against an empty loader instead of raising ZeroDivisionError.
    avg_loss = train_loss / num_samples if num_samples else float("nan")
    print(f'====> Epoch: {epoch} Average loss: {avg_loss:.4f}')
    return avg_loss

# --- 5. Main Execution ---
def main():
    """Download MNIST, build the VAE and train it for ``EPOCHS`` epochs."""
    # NOTE(review): the run recorded with this file ended in
    # "RuntimeError: Numpy is not available". Presumably the torchvision
    # MNIST pipeline needs a NumPy build compatible with the installed
    # torch - confirm the environment before debugging the code itself.

    # ToTensor converts each image to a float tensor scaled to [0, 1].
    transform = transforms.ToTensor()

    # exist_ok avoids the check-then-create race and the extra branch.
    data_path = './data'
    os.makedirs(data_path, exist_ok=True)

    train_dataset = datasets.MNIST(data_path, train=True, download=True, transform=transform)
    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)

    # Initialize model and optimizer
    model = VAE().to(DEVICE)
    # One optimizer updates encoder and decoder parameters together.
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

    # Run training
    for epoch in range(1, EPOCHS + 1):
        train_step(epoch, model, train_loader, optimizer)

    print("Baseline training complete.")
    # Saving the model, evaluating on the test split and generating sample
    # images are not implemented here.


# Run training only when executed as a script (or notebook cell), not on import.
if __name__ == "__main__":
    main()


Using device: cpu


RuntimeError: Numpy is not available