# Assignment 4
 - a) We will be working with the **AFHQ (Animal Faces-HQ)** dataset: https://github.com/clovaai/stargan-v2
    - Approx 15.000 images of animal faces (cat, dog, tiger, ...)
    - Downsample the images to 64x64
 - b) Write **Convolutional** Variational Autoencoder (ConvVAE)
      - Use Conv. layers for encoder and Transposed-Conv. layers for decoder.
      - You are only allowed to use linear layers for estimating the mean and standard deviation. Everything else should be convolutional.
 - c) Investigate the importance of the KL-divergence weight. For this purpose, train multiple models (at least 4) using different weighting values and investigate how this value affects the generation performance.
 - d) Generate new images by sampling latent vectors, investigate latent space and visualize some interpolations.
 - e) Compare the models from b) and c)
     - Qualitative comparison. Which images look better?
     - Quantitative comparison between models using the Fréchet Inception Distance: https://arxiv.org/abs/1706.08500
     - Log generated images and losses into the Tensorboard/W&B
     
     
**Extra Point:**
 - Extend your ConvVAE for Image generation conditioned on a given class. The AFHQ dataset has 3 classes: 'cat', 'dog', and 'wildlife'
 - Train your Conditional-ConvVAE
 - Show that you can generate images conditioned on a label
 - Tutorial: https://ijdykeman.github.io/ml/2016/12/21/cvae.html

In [1]:
import os
import shutil
from tqdm import tqdm
import numpy as np
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
from torchvision import datasets, models, transforms
from torchvision.utils import save_image
from utils import *
print(torch.cuda.is_available())

2025-06-03 15:29:16.633552: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-06-03 15:29:16.643916: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1748957356.656104   17896 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1748957356.659720   17896 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
W0000 00:00:1748957356.669089   17896 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking 

True


In [2]:
config={"data_dir": "data/AFHQ",
        "img_size": 64,
        "img_channels": 3,
        "batch_size": 256,
        "num_workers": 8,
        "savepath" : "imgs/vanilla_vae",
        "num_epochs": 15
        }
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

### Computing mean, std of dataset

In [3]:
dataset = datasets.ImageFolder(
    root=config["data_dir"]+"/train",
    transform=transforms.ToTensor()  # Converts images to [C, H, W] with values in [0, 1]
)
"""mean, std = compute_stats(dataset)
print(f"Mean (R, G, B): {mean.tolist()}")
print(f"Std  (R, G, B): {std.tolist()}")
"""

mean = torch.tensor([0.5018709301948547, 0.4601306915283203, 0.3988320827484131])
std = torch.tensor([0.22490206360816956, 0.2184346467256546, 0.21783076226711273])
"""computed:
Mean (R, G, B): [0.5018709301948547, 0.4601306915283203, 0.3988320827484131]
Std  (R, G, B): [0.22490206360816956, 0.2184346467256546, 0.21783076226711273]
"""

'computed:\nMean (R, G, B): [0.5018709301948547, 0.4601306915283203, 0.3988320827484131]\nStd  (R, G, B): [0.22490206360816956, 0.2184346467256546, 0.21783076226711273]\n'

## Preparing data loaders

In [4]:
data_transforms = {
    'train': transforms.Compose([
        transforms.Resize(config["img_size"]),
        transforms.ToTensor(),
        transforms.Normalize(mean,std)
    ]),
    'val': transforms.Compose([
        transforms.Resize(config["img_size"]),
        transforms.ToTensor(),
        transforms.Normalize(mean,std)
    ]),
}

train_dataset = datasets.ImageFolder(os.path.join(config["data_dir"], "train"), data_transforms["train"])
valid_dataset = datasets.ImageFolder(os.path.join(config["data_dir"], "test"), data_transforms["val"])

N_train = len(train_dataset)
N_valid = len(valid_dataset)
print(f"Training set size: {N_train} images")
print(f"Valdiation set size: {N_valid} images")

train_loader = torch.utils.data.DataLoader(train_dataset, config["batch_size"], shuffle=True, num_workers=config["num_workers"])
valid_loader = torch.utils.data.DataLoader(valid_dataset, config["batch_size"], shuffle=False, num_workers=config["num_workers"])

#for i, (images, _) in enumerate(valid_loader):
#    print(images.shape)
#    break

class_names = train_dataset.classes
print(class_names)

Training set size: 14336 images
Valdiation set size: 1467 images
['cat', 'dog', 'wild']


In [5]:
def show_grid(data, titles=None):
    """Imshow for Tensor."""
    data = data.numpy().transpose((0, 2, 3, 1))
    data = std * data + mean  # IMPORTANT! If you normalize imgs in the DataLoader, undo the norm. for visualization
    data = np.clip(data, 0, 1)
    
    plt.figure(figsize=(8*2, 4*2))
    for i in range(32):
        plt.subplot(4,8,i+1)
        plt.imshow(data[i])
        plt.axis("off")
        if titles is not None:
            plt.title(titles[i])
    plt.tight_layout()
    plt.show()
            
# Get a batch of training data and displaying it
#inputs, classes = next(iter(train_loader))
#inputs, classes = next(iter(valid_loader))
#titles = [class_names[x] for x in classes]
#inputs.shape
#show_grid(inputs, titles=titles)

## Convolutional Variational Auto Encoders

In [None]:
class ConvolutionalVAE(nn.Module):
    def __init__(self, in_size=(3, 32, 32), 
                 channel_sizes=[3,16,32,64,128],
                 latent_dim=64,
                 act_final="Sigmoid",
                 act_hidden="ReLU", 
                 kernel_size=3, 
                 padding=1,
                 stride=1):
        
        super().__init__()
        self.in_size = in_size
        self.channel_sizes = channel_sizes
        self.activation_hidden = get_activation(act_hidden)
        self.activation_final= get_activation(act_final)
        self.kernel_size = kernel_size
        self.padding = padding
        self.stride = stride
        self.latent_dim = latent_dim

        self.encoder,self.latent_input_size = self._make_encoder()
        self.decoder = self._make_decoder()
        self.fc_mu = nn.Linear(self.latent_input_size, latent_dim)
        self.fc_sigma = nn.Linear(self.latent_input_size, latent_dim)
        return
        
    def _make_encoder(self):
        """ Defining encoder """
        #layers = [nn.Flatten()]
        layers = []
        for i in range(len(self.channel_sizes)-1):
            layers.append( nn.Conv2d(in_channels=self.channel_sizes[i], 
                                     out_channels=self.channel_sizes[i+1], 
                                     kernel_size=self.kernel_size, 
                                     padding=self.padding,
                                     stride=self.stride) )
            layers.append(nn.BatchNorm2d(self.channel_sizes[i+1]))
            layers.append(self.activation_hidden)
            if i ==0:
                img_size = compute_image_size(list(self.in_size[1:3]), np.ones(2)*self.kernel_size, np.ones(2)*self.padding, np.ones(2)*self.stride)
            else:
                img_size = compute_image_size(img_size, np.ones(2)*self.kernel_size, np.ones(2)*self.padding, np.ones(2)*self.stride)
        
        layers.append(nn.Flatten())
        encoder = nn.Sequential(*layers)
        latent_input_size=int(np.prod(img_size)*self.channel_sizes[-1])
        return encoder, latent_input_size
    
    def _make_decoder(self):
        """ Defining decoder """
        layers = [nn.Linear(in_features=self.latent_dim,out_features=self.latent_input_size),self.activation_hidden]
        
        for i in range(1, len(self.channel_sizes)):
            layers.append( nn.ConvTranspose2d(in_channels=self.channel_sizes[-i], 
                                              out_channels=self.channel_sizes[-i-1], 
                                              kernel_size=self.kernel_size, 
                                              padding=self.padding,
                                              stride=self.stride) )
            layers.append(nn.BatchNorm2d(self.channel_sizes[-i-1]))
            layers.append(self.activation_hidden)

        layers = layers[:-1] + [self.activation_final]
        decoder = nn.Sequential(*layers)
        return decoder
    
    def reparameterize(self, mu, log_var):
        """ Reparametrization trick"""
        std = torch.exp(0.5*log_var)  # we can also predict the std directly, but this works best
        eps = torch.randn_like(std)  # random sampling happens here
        z = mu + std * eps
        return z
    
    def forward(self, x):
        """ Forward pass """
        print(f"x Input shape: {x.shape}")
        x_enc = self.encoder(x)
        print(f"x_enc Input shape: {x_enc.shape}")
        
        mu = self.fc_mu(x_enc)
        log_var = self.fc_sigma(x_enc)
        print(f"mu shape: {mu.shape}, log_var shape: {log_var.shape}")
        z = self.reparameterize(mu, log_var)
        print(f"z shape: {z.shape}")
        x_hat_flat = self.decoder(z)
        print(f"x_hat_flat shape: {x_hat_flat.shape}")
        x_hat = x_hat_flat.view(-1, *self.in_size)
        print(f"x_hat shape: {x_hat.shape}")
        return x_hat, (z, mu, log_var)

### Training

In [7]:
def cvae_loss_function(recons, target, mu, log_var, lambda_kld=1e-3):
    recons_loss = F.mse_loss(recons, target)
    kld = (-0.5 * (1 + log_var - mu**2 - log_var.exp()).sum(dim=1)).mean(dim=0)  # closed-form solution of KLD in Gaussian
    loss = recons_loss + lambda_kld * kld
    return loss, (recons_loss, kld)

In [8]:
def train_epoch(model, train_loader, optimizer, criterion, epoch, device):
    """ Training a model for one epoch """
    
    loss_list = []
    recons_loss = []
    vae_loss = []
    
    progress_bar = tqdm(enumerate(train_loader), total=len(train_loader))
    for i, (images, _) in progress_bar:
        images = images.to(device)
        
        # Clear gradients w.r.t. parameters
        optimizer.zero_grad()
         
        # Forward pass
        recons, (z, mu, log_var) = model(images)
         
        # Calculate Loss
        loss, (mse, kld) = criterion(recons, images, mu, log_var)
        loss_list.append(loss.item())
        recons_loss.append(mse.item())
        vae_loss.append(kld.item())
        
        # Getting gradients w.r.t. parameters
        loss.backward()
         
        # Updating parameters
        optimizer.step()
        
        progress_bar.set_description(f"Epoch {epoch+1} Iter {i+1}: loss {loss.item():.5f}. ")
        
    mean_loss = np.mean(loss_list)
    
    return mean_loss, loss_list


@torch.no_grad()
def eval_model(model, eval_loader, criterion, device, epoch=None, savefig=False, savepath="", writer=None):
    """ Evaluating the model for either validation or test """
    loss_list = []
    recons_loss = []
    kld_loss = []
    
    for i, (images, _) in enumerate(eval_loader):
        images = images.to(device)
        print(f"****{images.shape=}")
        # Forward pass 
        recons, (z, mu, log_var) = model(images)
                 
        loss, (mse, kld) = criterion(recons, images, mu, log_var)
        loss_list.append(loss.item())
        recons_loss.append(mse.item())
        kld_loss.append(kld.item())
        
        if(i==0 and savefig):
            save_image( recons[:64].cpu(), os.path.join(savepath, f"recons{epoch}.png") )
            if writer is not None:
                grid = torchvision.utils.make_grid(images[:64].cpu())
                writer.add_image('images', grid, epoch)
                grid = torchvision.utils.make_grid(recons[:64].cpu())
                writer.add_image('output_images', grid, epoch)
            
    # Total correct predictions and loss
    loss = np.mean(loss_list)
    recons_loss = np.mean(recons_loss)
    kld_loss = np.mean(kld_loss)
    return loss, recons_loss, kld_loss


def train_model(model, optimizer, scheduler, criterion, train_loader, valid_loader,
                num_epochs, savepath, writer, save_frequency=5, vis_frequency=2):
    """ Training a model for a given number of epochs"""
    
    train_loss = []
    val_loss =  []
    val_loss_recons =  []
    val_loss_kld =  []
    loss_iters = []
    
    for epoch in range(num_epochs):
           
        # validation epoch
        model.eval()  # important for dropout and batch norms
        log_epoch = (epoch % vis_frequency == 0 or epoch == num_epochs - 1)
        loss, recons_loss, kld_loss = eval_model(
                model=model, eval_loader=valid_loader, criterion=criterion,
                device=device, epoch=epoch, savefig=log_epoch, savepath=savepath,
                writer=writer
            )
        val_loss.append(loss)
        val_loss_recons.append(recons_loss)
        val_loss_kld.append(kld_loss)

        writer.add_scalar(f'Loss/Valid', loss, global_step=epoch)
        writer.add_scalars(f'Loss/All_Valid_Loss', {"recons": recons_loss.item(), "kld": kld_loss.item()}, global_step=epoch)
        
        # training epoch
        model.train()  # important for dropout and batch norms
        mean_loss, cur_loss_iters = train_epoch(
                model=model, train_loader=train_loader, optimizer=optimizer,
                criterion=criterion, epoch=epoch, device=device
            )
        writer.add_scalar(f'Loss/Train', mean_loss, global_step=epoch)
        writer.add_scalars(f'Loss/Comb', {"train": mean_loss.item(), "valid": loss.item()}, global_step=epoch)
        
        # PLATEAU SCHEDULER
        scheduler.step(val_loss[-1])
        train_loss.append(mean_loss)
        loss_iters = loss_iters + cur_loss_iters
        
        if(epoch % save_frequency == 0):
            stats = {
                "train_loss": train_loss,
                "valid_loss": val_loss,
                "loss_iters": loss_iters
            }
            save_model(model=model, optimizer=optimizer, epoch=epoch, stats=stats)
        
        if(log_epoch):
            print(f"    Train loss: {round(mean_loss, 5)}")
            print(f"    Valid loss: {round(loss, 5)}")
            print(f"       Valid loss recons: {round(val_loss_recons[-1], 5)}")
            print(f"       Valid loss KL-D:   {round(val_loss_kld[-1], 5)}")
    
    print(f"Training completed")
    return train_loss, val_loss, loss_iters, val_loss_recons, val_loss_kld



In [9]:
model = ConvolutionalVAE(in_size=(config["img_channels"],config["img_size"],config["img_size"]), 
                         channel_sizes=[3,16,32,64,128], 
                         act_hidden="ReLU",
                         latent_dim=64,
                         stride=2,
                         padding=1,
                         kernel_size=3).to(device)
#print(model)
print(f"model param count={count_model_params(model)}")
writer=getTensorboardWriter(params=[os.getcwd(),"tboard_logs", "cvae"])
optimizer = torch.optim.Adam(model.parameters(), lr=3e-3)
# Decay LR by a factor of 10 after 5 epochs with no improvement
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=5, factor=0.1, verbose=True)
model

model param count=590857




ConvolutionalVAE(
  (activation_hidden): ReLU()
  (activation_final): Sigmoid()
  (encoder): Sequential(
    (0): Conv2d(3, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): Conv2d(16, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (4): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): ReLU()
    (6): Conv2d(32, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (7): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (8): ReLU()
    (9): Conv2d(64, 128, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (10): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (11): ReLU()
    (12): Flatten(start_dim=1, end_dim=-1)
  )
  (decoder): Sequential(
    (0): Linear(in_features=64, out_features=2048, bias=True)
    (1): ReLU()
    (2): ConvTranspose

In [10]:
train_loss, val_loss, loss_iters, val_loss_recons, val_loss_kld = train_model(
        model=model, optimizer=optimizer, scheduler=scheduler, criterion=vae_loss_function,
        train_loader=train_loader, valid_loader=valid_loader, num_epochs=config["num_epochs"], savepath=config["savepath"],
        writer=writer
    )

****images.shape=torch.Size([256, 3, 64, 64])
x Input shape: torch.Size([256, 3, 64, 64])
x_enc Input shape: torch.Size([256, 2048])


TypeError: cannot unpack non-iterable NoneType object

In [None]:
compute_image_size(config["img_size"], model.kernel_size, model.padding, model.stride)