## vanilla GAN by Goodfellow et al., 2014
##### directories that belong with this JN = MNIST and runs


##### https://www.machinecurve.com/index.php/2021/07/17/building-a-simple-vanilla-gan-with-pytorch/

In [1]:
#Imports

import os
import torch
from torch import nn
from torchvision.datasets import MNIST
from torch.utils.data import DataLoader
from torchvision import transforms
import numpy as np
import matplotlib.pyplot as plt
import uuid


In [2]:
#Configurable variables 

num_epochs = 5
noise_dimension = 50
batch_size = 128
train_on_gpu = True
unique_run_id = str(uuid.uuid4())
stats_after_batch = 50
optimizer_lr = 0.0002
optimizer_betas = (0.5, 0.999)
generator_output_img_shape = 28*28*1 #784


In [3]:
#Speed ups; makes the code run faster

torch.autograd.set_detect_anomaly(False)
torch.autograd.profiler.profile(False)
torch.autograd.profiler.emit_nvtx(False)
torch.backends.cudnn.benchmark = True


In [4]:
#Generator 

class Generator(nn.Module):
    '''
    vanilla GAN generator 
    '''
    def __init__(self,):
        super().__init__()
        self.layers = nn.Sequential(
        #1st upsampling
        #nn.Linear(noise_dimension, 128, bias=False),
        nn.ConvTranspose2d(noise_dimension, 128, kernel_size=2, stride=1, bias=False),
        nn.BatchNorm1d(128, 0.8),
        nn.LeakyReLU(0.25),
        #2nd upsampling
        nn.ConvTranspose2d(128, 256, kernel_size=2, stride=1, bias=False),
        nn.BatchNorm1d(256, 0.8),
        nn.LeakyReLU(0.25),
        #3rd upsampling
        nn.ConvTranspose2d(256, 512, kernel_size=2, stride=1, bias=False),
        nn.BatchNorm1d(512, 0.8),
        nn.LeakyReLU(0.25),
        #Final upsampling
        nn.ConvTranspose2d(512, generator_output_img_shape, kernel_size=2, stride=1, bias=False),
        nn.Tanh()
    )
    
    def forward(self, x):
        '''Forward pass'''
        return self.layers(x)



In [5]:
# # CCGAN generator
# class cont_cond_cnn_generator(nn.Module):
# #    def __init__(self, nz=128, dim_embed=DIM_EMBED, ngf = GEN_SIZE):
#     def __init__(self, nz=128, dim_embed=DIM_EMBED, ngf = GEN_SIZE, image_size = IMAGE_SIZE): #new
#         super(cont_cond_cnn_generator, self).__init__()
#         self.nz = nz
#         self.ngf = ngf
#         self.dim_embed = dim_embed
#         self.image_size = IMAGE_SIZE #new

#         self.linear = nn.Linear(nz, 4 * 4 * ngf * 8) #4*4*512

#         self.deconv1 = nn.ConvTranspose2d(ngf * 8, ngf * 8, kernel_size=4, stride=2, padding=1)# bias=bias) #h=2h 8
#         self.deconv2 = nn.ConvTranspose2d(ngf * 8, ngf * 4, kernel_size=4, stride=2, padding=1)# bias=bias) #h=2h 16
#         self.deconv3 = nn.ConvTranspose2d(ngf * 4, ngf * 2, kernel_size=4, stride=2, padding=1)# bias=bias) #h=2h 32
#         self.deconv4 = nn.ConvTranspose2d(ngf * 2, ngf, kernel_size=4, stride=2, padding=1)# bias=bias) #h=2h 64
#         self.condbn1 = ConditionalBatchNorm2d(ngf * 8, dim_embed)
#         self.condbn2 = ConditionalBatchNorm2d(ngf * 4, dim_embed)
#         self.condbn3 = ConditionalBatchNorm2d(ngf * 2, dim_embed)
#         self.condbn4 = ConditionalBatchNorm2d(ngf, dim_embed)
#         self.relu = nn.ReLU()

#         self.final_conv = nn.Sequential(
#             nn.Conv2d(ngf, ngf, kernel_size=3, stride=1, padding=1, bias=bias), #h=h
#             nn.BatchNorm2d(ngf),
#             nn.ReLU(),
#             nn.Conv2d(ngf, 1, kernel_size=3, stride=1, padding=1, bias=bias), #h=h
#             nn.Tanh()
#         )
        

In [6]:
#Discriminator

class Discriminator(nn.Module):
    '''
    vanilla GAN discriminator
    '''
    def __init__(self):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Conv2d(generator_output_img_shape, 1024, kernel_size=2, stride=1),
            nn.LeakyReLU(0.25),
            nn.Conv2d(1024, 512, kernel_size=2, stride=1),
            nn.LeakyReLU(0.25),
            nn.Conv2d(512, 256, kernel_size=2, stride=1),
            nn.LeakyReLU(0.25),
            nn.Conv2d(256, 1, kernel_size=2, stride=1),
            nn.Sigmoid()
        )
    
    def forward(self, x):
        '''Forward pass'''
        return self.layers(x)
    

In [7]:
# # CCGAN discriminator
# class cont_cond_cnn_discriminator(nn.Module):
# #    def __init__(self, dim_embed=DIM_EMBED, ndf = DISC_SIZE):
#     def __init__(self, dim_embed=DIM_EMBED, ndf = DISC_SIZE, image_size = IMAGE_SIZE): #new
#         super(cont_cond_cnn_discriminator, self).__init__()
#         self.ndf = ndf
#         self.dim_embed = dim_embed
#         self.image_size = IMAGE_SIZE #new

#         # FLAG commenting out to remove linear layer
# #         self.linearB = nn.Linear(image_size*image_size, 64*64, bias=False) #STP 10/8
        
#         # input is (nc) x 64 x 64
#         self.conv1 = nn.Conv2d(1, self.ndf, kernel_size=4, stride=2, padding=1, bias=bias) #h=h/2
#         self.conv2 = nn.BatchNorm2d(self.ndf)
#         self.conv3 = nn.LeakyReLU(0.2, inplace=True)
#         # input is ndf x 32 x 32
#         self.conv4 = nn.Conv2d(self.ndf, self.ndf*2, kernel_size=4, stride=2, padding=1, bias=bias) #h=h/2
#         self.conv5 = nn.BatchNorm2d(self.ndf*2)
#         self.conv6 = nn.LeakyReLU(0.2, inplace=True)
#         # input is (ndf*2) x 16 x 16
#         self.conv7 = nn.Conv2d(self.ndf*2, self.ndf*4, kernel_size=4, stride=2, padding=1, bias=bias) #h=h/2
#         self.conv8 = nn.BatchNorm2d(self.ndf*4)
#         self.conv9 = nn.LeakyReLU(0.2, inplace=True)
#         # input is (ndf*4) x 8 x 8
#         self.conv10 = nn.Conv2d(self.ndf*4, self.ndf*8, kernel_size=4, stride=2, padding=1, bias=bias) #h=h/2
#         # nn.BatchNorm2d(self.ndf*8),
#         self.conv11 = nn.LeakyReLU(0.2, inplace=True)

#         # nn.Conv2d(self.ndf*8, self.ndf*8, 3, stride=1, padding=1, bias=bias),  #h=h                                                                                                       
#         # nn.LeakyReLU(0.2, inplace=True) 
        

In [8]:
#Housekeeping functions (device, directory creation, image generation, saving models, printing training progress)

def get_device():
    '''Gets device based on settings and availability'''
    return torch.device("cuda:0" if torch.cuda.is_available() and train_on_gpu else "cpu")

def make_directory_for_run():
    """ Make a directory for this training run. """
    print(f'Preparing training run {unique_run_id}')
    if not os.path.exists('./runs'):
        os.mkdir('./runs')
    os.mkdir(f'./runs/{unique_run_id}')

def generate_image(generator, epoch = 0, batch = 0, device=get_device()):
    """ Generate subplots with generated examples. """
    images = []
    noise = generate_noise(batch_size, device=device)
    generator.eval()
    images = generator(noise)
    plt.figure(figsize=(10, 10))
    for i in range(16):
    # Get image
        image = images[i]
    # Convert image back onto CPU and reshape
        image = image.cpu().detach().numpy()
        image = np.reshape(image, (28, 28))
    # Plot
        plt.subplot(4, 4, i+1)
        plt.imshow(image, cmap='gray')
        plt.axis('off')
    if not os.path.exists(f'./runs/{unique_run_id}/images'):
        os.mkdir(f'./runs/{unique_run_id}/images')
    print(f'./runs/{unique_run_id}/images/epoch{epoch}_batch{batch}.jpg') 
    plt.savefig(f'./runs/{unique_run_id}/images/epoch{epoch}_batch{batch}.jpg')

def save_models(generator, discriminator, epoch):
    """ Save models at specific point in time. """
    torch.save(generator.state_dict(), f'./runs/{unique_run_id}/generator_{epoch}.pth')
    torch.save(discriminator.state_dict(), f'./runs/{unique_run_id}/discriminator_{epoch}.pth')
    
def print_training_progress(batch, generator_loss, discriminator_loss):
    """ Print training progress. """
    print('Losses after mini-batch %5d: generator %e, discriminator %e' % (batch, generator_loss, discriminator_loss))


In [9]:
#Forwards and backwards pass

def generate_noise(number_of_images = 1, noise_dimension = noise_dimension, device=None):
    """ Generate noise for number_of_images images, with a specific noise_dimension """
    return torch.randn(number_of_images, noise_dimension, device=device)


def efficient_zero_grad(model):
    """ 
    Apply zero_grad more efficiently
    Source: https://betterprogramming.pub/how-to-make-your-pytorch-code-run-faster-93079f3c1f7b
    """
    for param in model.parameters():
        param.grad = None

        
def forward_and_backward(model, data, loss_function, targets):
    """
    Perform forward and backward pass in a generic way. Returns loss value.
    """
    outputs = model(data)
    error = loss_function(outputs, targets)
    error.backward()
    return error.item()


In [10]:
#Preparing dataset

def prepare_dataset():
    """ Prepare dataset through DataLoader """
  # Prepare MNIST dataset
    dataset = MNIST(os.getcwd(), download=True, train=True, transform=transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
  ]))
    
  # Batch and shuffle data with DataLoader
    trainloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True, num_workers=4, pin_memory=True)
  # print("here's the trainloader:", trainloader)
  # Return dataset through DataLoader
    return trainloader


In [11]:
#Initialization functions (models, loss and optimizers)

def initialize_models(device = get_device()):
    """ Initialize Generator and Discriminator models """
    generator = Generator()
    discriminator = Discriminator()
  # Move models to specific device
    generator.to(device)
    discriminator.to(device)
  # Return models
    return generator, discriminator

def initialize_loss():
    """ Initialize loss function. """
    return nn.BCELoss()

def initialize_optimizers(generator, discriminator):
    """ Initialize optimizers for Generator and Discriminator. """
    generator_optimizer = torch.optim.AdamW(generator.parameters(), lr=optimizer_lr,betas=optimizer_betas)
    discriminator_optimizer = torch.optim.AdamW(discriminator.parameters(), lr=optimizer_lr,betas=optimizer_betas)
    return generator_optimizer, discriminator_optimizer


In [12]:
#Training step

def perform_train_step(generator, discriminator, real_data, loss_function, generator_optimizer, discriminator_optimizer, device = get_device()):
    """ Perform a single training step. """
  
  ## 1. PREPARATION = Set real and fake labels.
    real_label, fake_label = 1.0, 0.0
  # Get images on CPU or GPU as configured and available
  # Also set 'actual batch size', whih can be smaller than BATCH_SIZE in some cases.
    real_images = real_data[0].to(device)
    actual_batch_size = real_images.size(0)
    label = torch.full((actual_batch_size, 1), real_label, device=device)
    
  ## 2. TRAINING THE DISCRIMINATOR = Zero the gradients for discriminator
    efficient_zero_grad(discriminator)
  # Forward + backward on real images, reshaped
    real_images = real_images.view(real_images.size(0), -1)
    error_real_images = forward_and_backward(discriminator, real_images, loss_function, label)
  # Forward + backward on generated images
    noise = generate_noise(actual_batch_size, device=device)
    generated_images = generator(noise)
    label.fill_(fake_label)
    error_generated_images =forward_and_backward(discriminator, generated_images.detach(), loss_function, label)
  # Optim for discriminator
    discriminator_optimizer.step()
  
  ## 3. TRAINING THE GENERATOR = Forward + backward + optim for generator, including zero grad
    efficient_zero_grad(generator)
    label.fill_(real_label)
    error_generator = forward_and_backward(discriminator, generated_images, loss_function, label)
    generator_optimizer.step()
  
  ## 4. COMPUTING RESULTS = Compute loss values in floats for discriminator, which is joint loss.
    error_discriminator = error_real_images + error_generated_images
  # Return generator and discriminator loss so that it can be printed.

    return error_generator, error_discriminator


In [13]:
#Perform epoch

def perform_epoch(dataloader, generator, discriminator, loss_function, generator_optimizer, discriminator_optimizer, epoch):
    """ Perform a single epoch. """
    for batch_no, real_data in enumerate(dataloader, 0):
        #print(batch_no, real_data)
        # Perform training step
        generator_loss_val, discriminator_loss_val = perform_train_step(generator, discriminator, real_data, loss_function, generator_optimizer, discriminator_optimizer)
        # Print statistics and generate image after every n-th batch
        if batch_no % stats_after_batch == 0:
            #print(batch_no)
            print_training_progress(batch_no, generator_loss_val, discriminator_loss_val)
            generate_image(generator, epoch, batch_no)
        
    # Save models on epoch completion.
    save_models(generator, discriminator, epoch)
    # Clear memory after every epoch
    torch.cuda.empty_cache()


In [14]:
#Training

def train_dcgan():
    """ Train the DCGAN. """
  # Make directory for unique run
    make_directory_for_run()
  # Set fixed random number seed
    torch.manual_seed(42)
  # Get prepared dataset
    dataloader = prepare_dataset()
  # Initialize models
    generator, discriminator = initialize_models()
  # Initialize loss and optimizers
    loss_function = initialize_loss()
    generator_optimizer, discriminator_optimizer = initialize_optimizers(generator, discriminator)
  # Train the model
    for epoch in range(num_epochs):
        print(f'Starting epoch {epoch}...')
        perform_epoch(dataloader, generator, discriminator, loss_function, generator_optimizer, discriminator_optimizer, epoch)
  # Finished :-)
        print(f'Finished unique run {unique_run_id}')

    return dataloader, generator, discriminator, loss_function, generator_optimizer, discriminator_optimizer, epoch


if __name__ == '__main__':
    train_dcgan()


Preparing training run a62230b3-41e9-4cd5-a0bb-477c78aac25d


  return torch.from_numpy(parsed.astype(m[2], copy=False)).view(*s)


Starting epoch 0...


RuntimeError: Expected 4-dimensional input for 4-dimensional weight [1024, 784, 2, 2], but got 2-dimensional input of size [128, 784] instead

### Edits to have it fit for stride=1 and conv2d layers

In [None]:
##Attempting to reshape the dataset with 4 dimensions since it has only two - [128, 784]

    # x = torch.randn(3, 4)
    # x = torch.unsqueeze(x, dim=-1)
    # x.shape

    # # Expected result
    # # torch.Size([3, 4, 1])

def prepare_dataset():
    """ Prepare dataset through DataLoader """
  # Prepare MNIST dataset
    dataset = MNIST(os.getcwd(), download=True, train=True, transform=transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
  ]))
    

In [None]:
##Attempting to reshape the dataset with 4 dimensions since it has only two - [128, 784]

dataset = prepare_dataset()
dataset = torch.reshape(dataset, 784)
dataset.shape


### How to load ("see") saved models

#### https://pytorch.org/tutorials/beginner/saving_loading_models.html

In [None]:
# model = Discriminator()
# model.load_state_dict(torch.load('/home/stp4007/exampleGAN/runs/53f66de5-8c5f-4a42-a2fc-7a2a8d5094d2/discriminator_4.pth'))
# model.eval()


In [None]:
# model = Discriminator()
# model.load_state_dict(torch.load('/home/stp4007/exampleGAN/runs/53f66de5-8c5f-4a42-a2fc-7a2a8d5094d2/discriminator_4.pth'))
# model.eval()
