Enrico Convento -- id:2023572

If you want to avoid to retrain the model load the files :


*   generator.pth
*   losses




# DCGAN

In [1]:
import torch
import torchvision
import matplotlib.pyplot as plt
import numpy as np
from torch import nn
import random
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim
from time import time

import pandas as pd # this module is useful to work with tabular data
import random # this module will be used to select random samples from a collection
import os # this module will be used just to create directories in the local filesystem
from tqdm import tqdm # this module is useful to plot progress bars

from torchvision.utils import make_grid

## Dataset

In [None]:
### Download the data and create dataset
data_dir = 'dataset'
dataset = torchvision.datasets.FashionMNIST(data_dir, train=True, download=True)


transform = transforms.Compose([
        transforms.Grayscale(num_output_channels=1), #reduce channels to 1
        transforms.ToTensor(), # img to tensor: channels × height × width
        transforms.Normalize((0.5,), (0.5,)) #Scale and translate the pixel values from the range [0.0, 1.0] to [-1.0, 1.0]. The first argument is μ and the second argument is σ
        ])

# Set the train transform
dataset.transform = transform

use_gpu = True
# Check if the GPU is available
device = torch.device("cuda") if (torch.cuda.is_available() and use_gpu) else torch.device("cpu")
if use_gpu:
  print(f"Training device: {device}: {torch.cuda.get_device_name(0)}")
else:
  print(f'Training device: {device}')


## Generator

In [3]:
class Generator(nn.Module):
    def __init__(self, latent_dim=100, batchnorm=True):
        """A generator for mapping a latent space to a sample space.
        The sample space for this generator is single-channel, 28x28 images
        with pixel intensity ranging from -1 to +1.
        Args:
            latent_dim (int): latent dimension ("noise vector")
            batchnorm (bool): Whether or not to use batch normalization
        """
        super(Generator, self).__init__()
        self.latent_dim = latent_dim
        self.batchnorm = batchnorm
        self._init_modules()

    def _init_modules(self):
        """Initialize the modules."""
        # Project the input
        self.linear1 = nn.Linear(self.latent_dim, 256*7*7)
        self.bn1d1 = nn.BatchNorm1d(256*7*7) if self.batchnorm else None
        self.leaky_relu = nn.LeakyReLU()

        # Convolutions
        self.conv1 = nn.ConvTranspose2d(
                in_channels=256,
                out_channels=128,
                kernel_size=5,
                stride=1,
                padding=2
                )
        self.bn2d1 = nn.BatchNorm2d(128) if self.batchnorm else None
        
        self.conv2 = nn.ConvTranspose2d(
                in_channels=128,
                out_channels=64,
                kernel_size=4,
                stride=2,
                padding=1
                ) # used for upscaling the image
        self.bn2d2 = nn.BatchNorm2d(64) if self.batchnorm else None

        self.conv3 = nn.ConvTranspose2d(
                in_channels=64,
                out_channels=1,
                kernel_size=4,
                stride=2,
                padding=1
              )
        self.tanh = nn.Tanh() # rescaling our images to the range [-1, 1], so our generator output activation should reflect that.


    def forward(self, input_tensor):
        """Forward pass; map latent vectors to samples."""
        intermediate = self.linear1(input_tensor)
        intermediate = self.bn1d1(intermediate)
        intermediate = self.leaky_relu(intermediate)

        intermediate = intermediate.view((-1, 256, 7, 7)) #reshape the Tensor operation “view”
        intermediate = self.conv1(intermediate)
        if self.batchnorm:
            intermediate = self.bn2d1(intermediate)
        intermediate = self.leaky_relu(intermediate)

        intermediate = self.conv2(intermediate)
        if self.batchnorm:
            intermediate = self.bn2d2(intermediate)
        intermediate = self.leaky_relu(intermediate)

        intermediate = self.conv3(intermediate)
        output_tensor = self.tanh(intermediate)
        return output_tensor

## Discriminator

In [4]:
class Discriminator(nn.Module):
    def __init__(self):
        """A discriminator for discerning real from generated images.
        Images must be single-channel and 28x28 pixels.
        Output activation is Sigmoid.
        """
        super(Discriminator, self).__init__()
        self._init_modules()

    def _init_modules(self):
        """Initialize the modules."""
        self.conv1 = nn.Conv2d(
                in_channels=1,
                out_channels=64,
                kernel_size=5,
                stride=2,
                padding=2)
        self.leaky_relu = nn.LeakyReLU()
        self.dropout_2d = nn.Dropout2d(0.3)

        self.conv2 = nn.Conv2d(
                in_channels=64,
                out_channels=128,
                kernel_size=5,
                stride=2,
                padding=2)

        self.linear1 = nn.Linear(128*7*7, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, input_tensor):
        """Forward pass; map samples to confidence they are real [0, 1]."""
        intermediate = self.conv1(input_tensor)
        intermediate = self.leaky_relu(intermediate)
        intermediate = self.dropout_2d(intermediate)

        intermediate = self.conv2(intermediate)
        intermediate = self.leaky_relu(intermediate)
        intermediate = self.dropout_2d(intermediate)

        intermediate = intermediate.view((-1, 128*7*7))
        intermediate = self.linear1(intermediate)
        output_tensor = self.sigmoid(intermediate)

        return output_tensor

## Training

In [5]:
class Training():
    def __init__(self, latent_dim, noise_fn, dataloader,
                 batch_size=32, device='cpu', lr_d=1e-3, lr_g=2e-4):
        """
        Args:
            generator: a Ganerator network
            discriminator: A Discriminator network
            noise_fn: function f(num: int) -> pytorch tensor, (latent vectors)
            dataloader: a pytorch dataloader for loading images
            batch_size: training batch size. Must match that of dataloader
            device: cpu or CUDA
            lr_d: learning rate for the discriminator
            lr_g: learning rate for the generator
        """
        self.generator = Generator(latent_dim).to(device)
        self.discriminator = Discriminator().to(device)
        self.noise_fn = noise_fn
        self.dataloader = dataloader
        self.batch_size = batch_size
        self.device = device
        self.criterion = nn.BCELoss()
        self.optim_d = optim.Adam(self.discriminator.parameters(),
                                  lr=lr_d, betas=(0.5, 0.999))
        self.optim_g = optim.Adam(self.generator.parameters(),
                                  lr=lr_g, betas=(0.5, 0.999))
        self.losses = []
        '''
        Targets for training, set to the specified device. Remember, the 
        Discriminator is trying to classify real samples as 1 and generated 
        samples as 0, while the Generator is trying to get the the Discriminator 
        to misclassify generated samples as 1. 
        '''
        self.target_ones = torch.ones((batch_size, 1), device=device)
        self.target_zeros = torch.zeros((batch_size, 1), device=device)
    def generate_samples(self, latent_vec=None, num=None):
        """Sample images from the generator.
        Images are returned as a 4D tensor of values between -1 and 1.
        Dimensions are (number, channels, height, width). Returns the tensor
        on cpu.
        Args:
            latent_vec: A pytorch latent vector or None
            num: The number of samples to generate if latent_vec is None
        If latent_vec and num are None then use self.batch_size
        random latent vectors.
        """
        num = self.batch_size if num is None else num
        latent_vec = self.noise_fn(num) if latent_vec is None else latent_vec
        with torch.no_grad():
            samples = self.generator(latent_vec)
        samples = samples.cpu()  # move images to cpu
        return samples

    def train_step_generator(self):
        """Train the generator one step and return the loss."""
        self.generator.zero_grad() # Clear the generator’s gradient
 
        latent_vec = self.noise_fn(self.batch_size)  # Get a batch of latent vectors
        generated = self.generator(latent_vec) # use them to generate samples
        classifications = self.discriminator(generated) # discriminate how realistic each samples is
        loss = self.criterion(classifications, self.target_ones)
        loss.backward()
        self.optim_g.step() #generator params update
        return loss.item() # item method, returning a float instead of a tensor

    def train_step_discriminator(self, real_samples):
        """Train the discriminator one step and return the losses."""
        self.discriminator.zero_grad()

        # real samples
        pred_real = self.discriminator(real_samples)
        # loss_real is the Discriminator’s loss for the real samples 
        # loss_real = self.criterion(pred_real, self.target_ones)
        loss_real = self.criterion(pred_real, torch.ones((pred_real.shape[0], 1), device=device))
        # generated samples
        latent_vec = self.noise_fn(self.batch_size)
        with torch.no_grad(): # reduce computations since only the discriminator is being trained
            fake_samples = self.generator(latent_vec)
        pred_fake = self.discriminator(fake_samples)
        # loss_fake is the loss for the fake samples
        loss_fake = self.criterion(pred_fake, self.target_zeros)

        # combine
        loss = (loss_real + loss_fake) / 2
        loss.backward()
        self.optim_d.step()
        return loss_real.item(), loss_fake.item()

    def train_epoch(self, print_frequency=10, max_steps=0):
        """Train both networks for one epoch and return the losses.
        Args:
            print_frequency (int): print stats every `print_frequency` steps.
            max_steps (int): End epoch after `max_steps` steps, or set to 0
                             to do the full epoch.
        """
        g_loss_log, d_loss_real_log, d_loss_fake_log = [], [], []
        loss_g_running, loss_d_real_running, loss_d_fake_running = 0, 0, 0
        # assign the batch of images tensor to real_samples, and ignore the labels since don’t needed 
        batch = 0
        for real_samples, _ in self.dataloader: #  wrap the dataloader in an enumerator so that we can keep track of the batch number
            real_samples = real_samples.to(self.device)
            ldr_, ldf_ = self.train_step_discriminator(real_samples)
            loss_d_real_running += ldr_
            loss_d_fake_running += ldf_
            loss_g_running += self.train_step_generator()
            g_loss_log.append(loss_g_running / (batch+1))
            d_loss_real_log.append(loss_d_real_running / (batch+1))
            d_loss_fake_log.append(loss_d_fake_running / (batch+1))
            if print_frequency and (batch+1) % print_frequency == 0:
              #print(batch+1)
              print(f"{batch+1}/{len(self.dataloader)}:",
                    f" G={loss_g_running / (batch+1):.3f},",
                    f" Dr={loss_d_real_running / (batch+1):.3f},",
                    f" Df={loss_d_fake_running / (batch+1):.3f}")
            if max_steps and batch == max_steps:
                break

            batch+= 1
        if print_frequency:
            print()
        loss_g_running /= batch
        loss_d_real_running /= batch
        loss_d_fake_running /= batch
        return (loss_g_running, loss_d_real_running, loss_d_fake_running)
        #return (g_loss_log, d_loss_real_log, d_loss_fake_log)
    def train(self, epochs):
        start = time()
        for i in range(epochs):
            print(f"Epoch {i+1}; Elapsed time = {int(time() - start)}s")
            self.losses.append(self.train_epoch())

## Setup

In [6]:
BATCH_SIZE = 512
EPOCHS = 100
LATENT_DIM = 16
dataloader = DataLoader(dataset,
        batch_size=BATCH_SIZE,
        shuffle=True,
        num_workers=2
        )

noise_fn = lambda x: torch.randn((x, LATENT_DIM), device=device) # generating random, normally-distributed noise

trainer = Training(LATENT_DIM, noise_fn, dataloader, device=device, batch_size=BATCH_SIZE) #Build and train the GAN



### Training
!!! Run this cell only to perform the training !!!

It is possibile to load directly the models below

In [None]:
#trainer.train(EPOCHS)



### Loading models

In [9]:
## Saving model ##
#torch.save(trainer.generator.state_dict(), 'generator.pth')
#torch.save(trainer.discriminator.state_dict(), 'discriminator.pth')

## Load model ##
trainer.generator.load_state_dict(torch.load('generator.pth'))
#trainer.discriminator.load_state_dict(torch.load('discriminator.pth'))

<All keys matched successfully>

### Loading losses

In [10]:
import pickle


#with open("losses", "wb") as fp:   #Pickling
#   pickle.dump(trainer.losses, fp)

with open("losses", "rb") as fp:   # Unpickling
     losses = pickle.load(fp)

## Generate images

In [None]:
images = trainer.generate_samples()
ims = make_grid(images[0:49], normalize=True)
plt.figure(figsize=(20,10))
plt.imshow(ims.numpy().transpose((1,2,0)))
plt.show()

## Losses

In [None]:
g_loss, d_loss =[], []
for i in range(0, len(losses)):
  g_loss.append(losses[i][0])
  d_loss.append((losses[i][1] + losses[i][2])/2)
  
plt.figure(figsize=(10,5))
plt.title("Generator and Discriminator Loss During Training")
plt.plot(g_loss,label="G loss")
plt.plot(d_loss,label="D loss ")
plt.xlabel("iterations")
plt.ylabel("Loss")
plt.legend()
plt.show()