In [6]:
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
from torch.utils.data import DataLoader
from torchvision import transforms

# Utility For Model

In [31]:
def modelSummary(model, verbose=False):
    """Method provides a description of a model and its parameters

    Args:
        model (nn.Module): The model to summarize
        verbose (bool, optional): Describes the model with specification for each layers. Defaults to False.
    """
    if verbose:
        print(model)
    
    total_parameters = 0
        
    for name, param in model.named_parameters():
        num_params = param.size()[0]
        total_parameters += num_params
        if verbose:
            print(f"Layer: {name}")
            print(f"\tNumber of parameters: {num_params}")
            print(f"\tShape: {param.shape}")
    
    if total_parameters > 1e5:
        print(f"Total number of parameters: {total_parameters/1e6:.2f}M")
    else:
        print(f"Total number of parameters: {total_parameters/1e3:.2f}K") 

In [2]:
class ConvolutionBlock(nn.Module):
    def __init__(self, in_channel, out_channels, kernel_size, stride, padding):
        super(ConvolutionBlock, self).__init__()
        self.layers = nn.Sequential(
            nn.Conv2d(in_channel,
                      out_channels,
                      kernel_size,
                      stride,
                      padding,
                      bias=False), nn.BatchNorm2d(out_channels),
            nn.LeakyReLU(0.2, inplace=True))

    def forward(self, x):
        return self.layers(x)


class Convolution2dTransposeBlock(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, stride,
                 padding):
        super(Convolution2dTransposeBlock, self).__init__()
        self.layers = nn.Sequential(
            nn.ConvTranspose2d(in_channels,
                               out_channels,
                               kernel_size,
                               stride,
                               padding,
                               bias=False), nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True))

    def forward(self, x):
        return self.layers(x)

# Training Utility

In [38]:
def train_epoch(discriminator: nn.Module, generator: nn.Module, device: torch.device, train_dataloader: DataLoader, training_params: dict, metrics: dict):
    """Method to train a model for one epoch

    Args:
        discriminator (nn.Module): discriminator to be trained
        generator (nn.Module) : generator to be trained
        device (str): device to be trained on
        train_dataloader (nn.data.DataLoader): Dataloader object to load batches of dataset
        training_params (dict): Dictionary of training parameters containing "batch_size", "loss_function"
                                "optimizer".
        metrics (dict): Dictionary of functional methods that would compute the metric value

    Returns:
        run_results (dict): Dictionary of metrics computed for the epoch
    """
    OPTIMIZER_DISC = training_params["optimizer_discriminator"]
    OPTIMIZER_GEN = training_params["optimizer_generator"]
    CRITERION = training_params['loss_function']

    discriminator = discriminator.to(device)
    generator = generator.to(device)
    discriminator.train()
    generator.train()

    # Dictionary holding result of this epoch
    run_results = dict()
    for metric in metrics:
        run_results[metric] = 0.0
    run_results["loss_discriminator"] = 0.0
    run_results["loss_generator"] = 0.0

    # Iterate over batches
    num_batches = 0
    for x, target in tqdm(train_dataloader, desc="Training"):
        num_batches += 1

        # Move tensors to device
        real = x.to(device)
        noise = torch.randn(
            training_params['batch_size'], training_params['noise_dims']).to(device)
        fake = generator(noise)

        # Train Discriminator
        discriminator_real_output = discriminator(real).view(-1)
        # Detach because we dont want to accumalate gradients in the generator
        discriminator_fake_output = discriminator(fake.detach()).view(-1)

        loss_discriminator_real = CRITERION(
            discriminator_real_output, torch.ones_like(discriminator_real_output))
        loss_discriminator_fake = CRITERION(
            discriminator_fake_output, torch.zeros_like(discriminator_fake_output))

        loss_discriminator = (loss_discriminator_fake +
                              loss_discriminator_real) / 2

        discriminator.zero_grad()
        loss_discriminator.backward()
        OPTIMIZER_DISC.step()

        # Train Generator
        discriminator_fake = discriminator(fake).view(-1)
        loss_generator = CRITERION(
            discriminator_fake, torch.ones_like(discriminator_fake))

        generator.zero_grad()
        loss_generator.backward()
        OPTIMIZER_GEN.step()

        # Update metrics
        run_results["loss_generator"] += loss_generator.detach().item()
        run_results['loss_discriminator'] += loss_discriminator.detach().item()

        for key, func in metrics.items():
            run_results[key] += func(output, input).detach().item()

        # Clean up memory
        del real
        del noise
        del fake
        del loss_discriminator
        del loss_discriminator_real
        del loss_discriminator_fake
        del discriminator_fake
        del loss_generator

    for key in run_results:
        run_results[key] /= num_batches

    return run_results

In [37]:
def evaluate_epoch(discriminator: nn.Module, generator: nn.Module, device: torch.device, validation_dataloader: DataLoader, training_params: dict, metrics: dict):
    """Method to evaluate a model for one epoch

    Args:
        discriminator (nn.Module): discriminator to be evaluate
        generator (nn.Module) : generator to be evaluate
        device (str): device to evaluate on
        validation_dataloader (DataLoader): DataLoader for evaluation
        training_params (dict): Dictionary of training parameters containing "batch_size", "loss_function"
                                "optimizer".
        metrics (dict): Dictionary of functional methods that would compute the metric value

    Returns:
        run_results (dict): Dictionary of metrics computed for the epoch
    """
    discriminator = discriminator.to(device)
    generator = generator.to(device)

    # Dictionary holding result of this epoch
    run_results = dict()
    for metric in metrics:
        run_results[metric] = 0.0
    run_results["loss_discriminator"] = 0.0
    run_results["loss_generator"] = 0.0

    # Iterate over batches
    with torch.no_grad():
        discriminator.eval()
        generator.eval()
        num_batches = 0

        for x, target in tqdm(validation_dataloader, desc='Validation'):
            # Move tensors to device
            real = x.to(device)
            noise = torch.randn(
                training_params['batch_size'], training_params['noise_dims']).to(device)
            fake = generator(noise)

            # Evaluate Discriminator
            discriminator_real_output = discriminator(real).view(-1)
            # Detach because we dont want to accumalate gradients in the generator
            discriminator_fake_output = discriminator(fake.detach()).view(-1)

            loss_discriminator_real = CRITERION(
                discriminator_real_output, torch.ones_like(discriminator_real_output))
            loss_discriminator_fake = CRITERION(
                discriminator_fake_output, torch.zeros_like(discriminator_fake_output))

            loss_discriminator = (
                loss_discriminator_fake + loss_discriminator_real) / 2

            # Evaluate Generator
            discriminator_fake = discriminator(fake).view(-1)
            loss_generator = CRITERION(
                discriminator_fake, torch.ones_like(discriminator_fake))

            # Update metrics
            run_results["loss_generator"] += loss_generator.detach().item()
            run_results['loss_discriminator'] += loss_discriminator.detach().item()

            for key, func in metrics.items():
                run_results[key] += func(output, input).detach().item()

            # Clean up memory
            del real
            del noise
            del fake
            del loss_discriminator
            del loss_discriminator_real
            del loss_discriminator_fake
            del discriminator_fake
            del loss_generator

    for key in run_results:
        run_results[key] /= num_batches

    return run_results

In [36]:
def save_plots(fixed_noise, model, device, epoch, training_params):
    """Function to save plots of the model

    Args:
        fixed_samples (torch.Tensor): Samples to be plotted
        fixed_noise (torch.Tensor): Noise to be plotted
        model (nn.Module): Model to be tested
        epoch (int): Epoch number
        SAVE_PATH (str): Path to save plots
    """
    SAMPLE_SIZE = training_params["sample_size"]
    SAVE_PATH = training_params["save_path"]
    model = model.to(device)

    with torch.no_grad():
        model.eval()
        fixed_noise = fixed_noise.to(device)

        generated_images = model.decoder(fixed_noise)

        _, axs = plt.subplots(10, 10, figsize=(30, 20))
        axs = axs.flatten()

        for image, ax in zip(generated_images, axs):
            ax.imshow(image.cpu().numpy().reshape(28, 28))
            ax.axis('off')

        plt.savefig(f"{SAVE_PATH}/generated_images/epoch{epoch + 1}.png")
        plt.close("all")

        # Clean up memory
        del generated_images
        del image
        del _, axs

In [23]:
def train_evaluate(discriminator: nn.Module, generator: nn.Module, device: torch.device, train_dataloader: DataLoader, validation_dataloader: DataLoader, training_params: dict, metrics: dict):
    """Function to train a model and provide statistics during training

    Args:
        model (nn.Module): Model to be trained
        device (torch.device): Device to be trained on
        train_dataset (DataLoader): Dataset to be trained on
        validation_dataset (DataLoader): Dataset to be evaluated on
        training_params (dict): Dictionary of training parameters containing "num_epochs", "batch_size", "loss_function",
                                                                             "save_path", "optimizer"
        metrics (dict): Dictionary of functional methods that would compute the metric value

    Returns:
        _type_: _description_
    """
    NUM_EPOCHS = training_params["num_epochs"]
    SAVE_PATH = training_params["save_path"]
    SAMPLE_SIZE = training_params["sample_size"]
    PLOT_EVERY = training_params["plot_every"]
    SAVE_EVERY = training_params["save_every"]
    LATENT_DIMS = training_params["latent_dims"]

    # Initialize metrics
    train_results = dict()
    train_results['loss_generator'] = np.empty(1)
    train_results['loss_discriminator'] = np.empty(1)
    evaluation_results = dict()
    evaluation_results['loss'] = np.empty(1)
    evaluation_results['loss'] = np.empty(1)

    for metric in metrics:
        train_results[metric] = np.empty(1)
        evaluation_results[metric] = np.empty(1)

    batch = next(iter(validation_dataloader))
    idxs = []
    for i in range(SAMPLE_SIZE):
        idx = torch.where(batch[1] == i)[0].squeeze()[0]
        idxs.append(idx.item())

    FIXED_SAMPLES = batch[0][idxs].to(device).detach()

    FIXED_NOISE = torch.normal(0, 1, size=(
        100, LATENT_DIMS), device=device).detach()

    # Clean up
    del idxs
    del batch

    for epoch in range(NUM_EPOCHS):
        start = time.time()

        print(f"=========== Epoch {epoch+1}/{NUM_EPOCHS} ===========")

        # Train Model
        diff_train = psutil.virtual_memory().percent
        epoch_train_results = train_epoch(
            model, device, train_dataloader, training_params, metrics)
        diff_train = psutil.virtual_memory().percent - diff_train

        # Evaluate Model
        diff_eval = psutil.virtual_memory().percent
        epoch_evaluation_results = evaluate_epoch(
            model, device, validation_dataloader, training_params, metrics)
        diff_eval = psutil.virtual_memory().percent - diff_eval

        diff_metric = psutil.virtual_memory().percent
        for metric in metrics:
            np.append(train_results[metric], epoch_train_results[metric])
            np.append(evaluation_results[metric],
                      epoch_evaluation_results[metric])
        diff_metric = psutil.virtual_memory().percent - diff_metric

        # Print results of epoch
        print(
            f"Completed Epoch {epoch+1}/{NUM_EPOCHS} in {(time.time() - start):.2f}s")
        print(f"Train Generator Loss: {epoch_train_results['loss_generator']:.2f} \t Train Discriminator Loss: {epoch_train_results['loss_discriminator']:.2f}" +
              f"\nValidation Generator Loss: {epoch_evaluation_results['loss_generator']:.2f} \t Validation Discriminator Loss: {epoch_evaluation_results['loss_discriminator']:.2f}")

        diff_plot = psutil.virtual_memory().percent
        # Plot results
        if epoch % PLOT_EVERY == 0:
            save_plots(FIXED_NOISE, generator, device, epoch, training_params)
        diff_plot = psutil.virtual_memory().percent - diff_plot

        print(f"Items cleaned up: {gc.collect()}")

        print(f"Memory used in Training: {diff_train:.2f}%")
        print(f"Memory used in Evaluation: {diff_eval:.2f}%")
        print(f"Memory used in Metrics: {diff_metric:.2f}%")
        print(f"Memory used in Plotting: {diff_plot:.2f}%")
        # Save model
        if epoch % SAVE_EVERY == 0 and epoch != 0:
            SAVE = f"{SAVE_PATH}_epoch{epoch + 1}.pt"
            torch.save(model.state_dict(), SAVE)

    return train_results, evaluation_results

# Model

In [3]:
class Discriminator(nn.Module):
    def __init__(self, input_channels, features_dim):
        super(Discriminator, self).__init__()

        self.conv1 = nn.Conv2d(input_channels,
                               features_dim,
                               kernel_size=4,
                               stride=2,
                               padding=1)

        self.conv2 = ConvolutionBlock(features_dim,
                                      features_dim * 2,
                                      kernel_size=4,
                                      stride=2,
                                      padding=1)
        self.conv3 = ConvolutionBlock(features_dim * 2,
                                      features_dim * 4,
                                      kernel_size=4,
                                      stride=2,
                                      padding=1)
        self.conv4 = ConvolutionBlock(features_dim * 4,
                                      features_dim * 8,
                                      kernel_size=4,
                                      stride=2,
                                      padding=1)

        self.conv5 = nn.Conv2d(features_dim * 8,
                               1,
                               kernel_size=4,
                               stride=1,
                               padding=0)

    def forward(self, x):
        x = F.leaky_relu(self.conv1(x), 0.2)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)
        x = self.conv5(x)
        x = torch.sigmoid(x)
        return x


class Generator(nn.Module):
    def __init__(self, noise_channels, input_channels, features_gen_dim):
        super(Generator, self).__init__()
        self.conv2d1 = Convolution2dTransposeBlock(noise_channels,
                                                   features_gen_dim * 16,
                                                   kernel_size=4,
                                                   stride=1,
                                                   padding=0)
        self.conv2d2 = Convolution2dTransposeBlock(features_gen_dim * 16,
                                                   features_gen_dim * 8,
                                                   kernel_size=4,
                                                   stride=2,
                                                   padding=1)
        self.conv2d3 = Convolution2dTransposeBlock(features_gen_dim * 8,
                                                   features_gen_dim * 4,
                                                   kernel_size=4,
                                                   stride=2,
                                                   padding=1)
        self.conv2d4 = Convolution2dTransposeBlock(features_gen_dim * 4,
                                                   features_gen_dim * 2,
                                                   kernel_size=4,
                                                   stride=2,
                                                   padding=1)

        self.conv2d5 = nn.ConvTranspose2d(features_gen_dim * 2,
                                          input_channels,
                                          kernel_size=4,
                                          stride=2,
                                          padding=1)

    def forward(self, x):
        x = self.conv2d1(x)
        x = self.conv2d2(x)
        x = self.conv2d3(x)
        x = self.conv2d4(x)
        x = torch.tanh(self.conv2d5(x))
        return x

# Load Data

In [35]:
BATCH_SIZE = 256
IMG_CHANNELS = 3
IMAGE_SIZE = 64

dataset_transforms = transforms.Compose([
    transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize([0.5 for _ in range(IMG_CHANNELS)], [
                         0.5 for _ in range(IMG_CHANNELS)])
])

train_dataset = DataLoader(torchvision.datasets.CelebA(
    root='./data',
    split='train',
    download=True,
    transform=dataset_transforms),
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=2,
    pin_memory=True)

validation_dataset = DataLoader(torchvision.datasets.CelebA(
    root='./data',
    split='valid',
    download=True,
    transform=dataset_transforms),
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=2,
    pin_memory=True)

RuntimeError: The daily quota of the file img_align_celeba.zip is exceeded and it can't be downloaded. This is a limitation of Google Drive and can only be overcome by trying again later.

In [33]:
LEARNING_RATE = 2e-4
FEATURES_DISC_DIM = 1024
FEATURES_GEN_DIM = 1024
FEATURES_NOISE_CHANNELS = 100
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [34]:
discriminator = Discriminator(input_channels=3, features_dim=FEATURES_DISC_DIM).to(device)
generator = Generator(noise_channels=FEATURES_NOISE_CHANNELS, input_channels=3, features_gen_dim=FEATURES_GEN_DIM).to(device)
    
modelSummary(discriminator)
modelSummary(generator)

Total number of parameters: 45.06K
Total number of parameters: 92.26K
