<h1>Imports</h1>

In [None]:
from glob import glob
import os
import PIL
from tqdm import tqdm
import random
import warnings
import glob
from pickletools import optimize
import time
from __future__ import print_function

import numpy as np
import pandas as pd
from sklearn.decomposition import PCA
from sklearn.mixture import GaussianMixture
from skimage import io, transform
from scipy.stats import multivariate_normal
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from mpl_toolkits.axes_grid1 import ImageGrid
from scipy.signal import savgol_filter

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision.utils as vutils
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader,TensorDataset
from torchvision.io import read_image
from torchvision import transforms
from torchvision.utils import make_grid
import torchvision.datasets as datasets
from torch.autograd import Variable

warnings.filterwarnings("ignore")

<h1>VAE</h1>

In [None]:
## Global Variables
# Mini-batch size used by the DataLoaders below that are built with `batch_size=batch_size`.
batch_size  = 100
# Device string used by every `.to(device)` call in this notebook: the CUDA device
# with index 1 when CUDA is available, otherwise the CPU.
# NOTE(review): the index 1 is hard-coded; on a host with a single GPU 'cuda:1'
# does not exist — confirm the target machine has at least two GPUs.
device = 'cuda:1' if torch.cuda.is_available() else 'cpu'

<h3>Supporting functions</h3>

In [None]:
def show(img):
    """Display an image tensor with matplotlib and hide both axes.

    Args:
        img: 3-D tensor laid out as (channels, height, width); axis 0 is
            moved to the end before plotting.
    """
    # detach() drops the autograd graph and cpu() copies off the GPU; without
    # them .numpy() raises for tensors that require grad or live on CUDA
    # (e.g. reconstructions produced outside a torch.no_grad() block).
    npimg = img.detach().cpu().numpy()
    fig = plt.imshow(np.transpose(npimg, (1, 2, 0)), interpolation='nearest')
    fig.axes.get_xaxis().set_visible(False)
    fig.axes.get_yaxis().set_visible(False)


## Get Variance of a dataloader
def get_variance(loader):
    """Return the variance of every scalar element yielded by ``loader``.

    The sums are weighted by the number of elements in each batch, so a
    shorter final batch does not bias the estimate (averaging per-batch
    means, as done previously, over-weights the short batch).

    Args:
        loader: iterable yielding tensors (one batch per iteration).

    Returns:
        0-dim tensor holding ``E[x^2] - E[x]^2`` over all elements, cast back
        to the dtype of the batches when that dtype is floating point.

    Raises:
        ValueError: if the loader yields no elements at all.
    """
    element_count = 0
    total = 0.0
    squared_total = 0.0
    batch_dtype = None

    for data in loader:
        batch_dtype = data.dtype
        # Accumulate in float64 to limit rounding error over many batches.
        total = total + data.sum(dtype=torch.float64)
        squared_total = squared_total + (data ** 2).sum(dtype=torch.float64)
        element_count += data.numel()

    if element_count == 0:
        raise ValueError("get_variance() received a loader with no data")

    mean = total / element_count
    variance = squared_total / element_count - mean ** 2
    if batch_dtype.is_floating_point:
        # Keep the caller-visible dtype identical to that of the data.
        variance = variance.to(batch_dtype)
    return variance

def get_variance_batch(batch):
    """Return the variance of all elements of a single batch tensor.

    Args:
        batch: floating-point tensor of any shape.

    Returns:
        0-dim tensor holding ``mean(batch**2) - mean(batch)**2``.
    """
    # Use the argument itself; the previous body read a global named `data`
    # and only worked when such a global happened to exist.
    mean = torch.mean(batch)
    mean_of_squares = torch.mean(batch ** 2)
    return mean_of_squares - mean ** 2

<h3>Data Loaders</h3>

In [None]:
class Dsprites_Dataset(Dataset):
    """Dataset view over a tensor whose first axis indexes the samples.

    ``transform`` and ``target_transform`` are stored on the instance but are
    not applied by ``__getitem__``.
    """

    def __init__(self, data , transform=None, target_transform=None):
        self.target_transform = target_transform
        self.transform = transform
        self.data = data

    def __len__(self):
        # Number of samples == length of the leading axis.
        return self.data.size(0)

    def __getitem__(self, idx):
        sample = self.data[idx]
        return sample

class CelebA_Dataset(Dataset):
    """Dataset that opens image files from one directory on demand.

    Args:
        data_list: sequence of file names relative to ``img_dir``.
        img_dir: directory that contains the image files.
        transform: optional callable applied to the RGB PIL image.
        target_transform: stored on the instance; not used by ``__getitem__``.
    """

    def __init__(self, data_list , img_dir, transform=None, target_transform=None):
        self.target_transform = target_transform
        self.transform = transform
        self.img_dir = img_dir
        self.img_titles = data_list

    def __len__(self):
        return len(self.img_titles)

    def __getitem__(self, idx):
        file_name = self.img_titles[idx]
        rgb_image = PIL.Image.open(os.path.join(self.img_dir, file_name)).convert('RGB')
        # Without a transform the PIL image is returned untouched.
        if not self.transform:
            return rgb_image
        return self.transform(rgb_image)

<h3>Vanilla VAE class implementation</h3>

In [None]:
class VanillaVAE(nn.Module):
    """Convolutional variational auto-encoder.

    The encoder is a stack of stride-2 convolutions (one per entry of
    ``hidden_dims``); the decoder mirrors it with transposed convolutions and
    ends in a ``Tanh``.  The linear layers are sized for a 2x2 feature map at
    the bottleneck, i.e. the input side length must equal
    ``2 * 2**len(hidden_dims)`` (64 for the default five layers).
    """

    def __init__(self,
                 in_channels,
                 latent_dim,
                 hidden_dims,
                 **kwargs) -> None:
        """
        Args:
            in_channels: number of channels of the input images.
            latent_dim: size of the latent vector.
            hidden_dims: encoder channel widths, or None for
                ``[32, 64, 128, 256, 512]``.  The sequence is copied and never
                modified.
            **kwargs: accepted and ignored.
        """
        super(VanillaVAE, self).__init__()

        self.latent_dim = latent_dim
        self.in_channels = in_channels

        # Work on a private copy: the decoder needs the reversed order and the
        # caller's list must not be reversed as a side effect.
        if hidden_dims is None:
            hidden_dims = [32, 64, 128, 256, 512]
        else:
            hidden_dims = list(hidden_dims)

        # Channel count at the bottleneck; decode() needs it to un-flatten.
        self._bottleneck_channels = hidden_dims[-1]

        # Build Encoder
        modules = []
        for h_dim in hidden_dims:
            modules.append(
                nn.Sequential(
                    nn.Conv2d(in_channels, out_channels=h_dim,
                              kernel_size=3, stride=2, padding=1),
                    nn.BatchNorm2d(h_dim),
                    nn.LeakyReLU())
            )
            in_channels = h_dim

        self.encoder = nn.Sequential(*modules)
        # "* 4" == 2 x 2 spatial positions at the bottleneck.
        self.fc_mu = nn.Linear(hidden_dims[-1] * 4, latent_dim)
        self.fc_var = nn.Linear(hidden_dims[-1] * 4, latent_dim)

        # Build Decoder
        self.decoder_input = nn.Linear(latent_dim, hidden_dims[-1] * 4)

        decoder_dims = hidden_dims[::-1]
        modules = []
        for i in range(len(decoder_dims) - 1):
            modules.append(
                nn.Sequential(
                    nn.ConvTranspose2d(decoder_dims[i],
                                       decoder_dims[i + 1],
                                       kernel_size=3,
                                       stride=2,
                                       padding=1,
                                       output_padding=1),
                    nn.BatchNorm2d(decoder_dims[i + 1]),
                    nn.LeakyReLU())
            )

        self.decoder = nn.Sequential(*modules)

        self.final_layer = nn.Sequential(
            nn.ConvTranspose2d(decoder_dims[-1],
                               decoder_dims[-1],
                               kernel_size=3,
                               stride=2,
                               padding=1,
                               output_padding=1),
            nn.BatchNorm2d(decoder_dims[-1]),
            nn.LeakyReLU(),
            nn.Conv2d(decoder_dims[-1], out_channels=self.in_channels,
                      kernel_size=3, padding=1),
            nn.Tanh())

    def encode(self, input):
        """Return ``[mu, log_var]`` of the approximate posterior for ``input``."""
        result = self.encoder(input)
        result = torch.flatten(result, start_dim=1)

        mu = self.fc_mu(result)
        log_var = self.fc_var(result)

        return [mu, log_var]

    def decode(self, z):
        """Map latent vectors ``z`` back to image space."""
        result = self.decoder_input(z)
        # Un-flatten with the configured bottleneck width rather than a
        # hard-coded 512, so custom ``hidden_dims`` work as well.
        result = result.view(-1, self._bottleneck_channels, 2, 2)
        result = self.decoder(result)
        result = self.final_layer(result)
        return result

    def reparameterize(self, mu, logvar):
        """Sample ``mu + eps * std`` with ``eps ~ N(0, I)``."""
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return eps * std + mu

    def forward(self, input, **kwargs):
        """Return ``[reconstruction, input, mu, log_var]``."""
        mu, log_var = self.encode(input)
        z = self.reparameterize(mu, log_var)
        return [self.decode(z), input, mu, log_var]

    def loss_function(self, *args):
        """Compute the training loss.

        Positional args: reconstruction, input, mu, log_var, variance.

        Returns:
            ``[loss, recons_loss, kl_loss]`` where ``recons_loss`` is the mean
            squared error divided by ``2 * variance``, ``kl_loss`` is the
            batch mean of the per-sample KL term, and the last two entries
            are detached from the graph.
        """
        recons = args[0]
        input_ = args[1]
        mu = args[2]
        log_var = args[3]
        variance = args[4]

        recons_loss = F.mse_loss(recons, input_) / (2 * variance)
        kl_loss = torch.mean(-0.5 * torch.sum(1 + log_var - mu ** 2 - log_var.exp(), dim=1), dim=0)

        loss = recons_loss + kl_loss
        return [loss, recons_loss.detach(), kl_loss.detach()]

    def sample(self,
               num_samples: int,
               current_device: int, **kwargs):
        """Decode ``num_samples`` latent vectors drawn from a standard normal.

        ``current_device`` is passed to ``Tensor.to`` before decoding.
        """
        z = torch.randn(num_samples,
                        self.latent_dim)

        z = z.to(current_device)

        samples = self.decode(z)
        return samples

<h3>Early Stopping</h3>

In [None]:
class EarlyStopping:
    """Early stops the training if validation loss doesn't improve after a given patience."""
    def __init__(self, patience=7, verbose=False, delta=0, path='checkpoint.pt', trace_func=print):
        """
        Args:
            patience (int): How long to wait after last time validation loss improved.
                            Default: 7
            verbose (bool): If True, prints a message for each validation loss improvement.
                            Default: False
            delta (float): Minimum change in the monitored quantity to qualify as an improvement.
                            Default: 0
            path (str): Path for the checkpoint to be saved to.
                            Default: 'checkpoint.pt'
            trace_func (function): trace print function.
                            Default: print
        """
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        # np.inf (lowercase) is the supported spelling; the np.Inf alias was
        # removed in NumPy 2.0 and raises AttributeError there.
        self.val_loss_min = np.inf
        self.delta = delta
        self.path = path
        self.trace_func = trace_func

    def __call__(self, val_loss, model):
        """Record one validation result.

        Saves a checkpoint when ``-val_loss`` is at least
        ``best_score + delta`` (or on the first call); otherwise increments
        the counter and sets ``early_stop`` once it reaches ``patience``.
        """
        score = -val_loss

        if self.best_score is None:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
        elif score < self.best_score + self.delta:
            self.counter += 1
            self.trace_func(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
            self.counter = 0

    def save_checkpoint(self, val_loss, model):
        '''Saves model when validation loss decrease.'''
        if self.verbose:
            self.trace_func(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}).  Saving model ...')
        torch.save(model.state_dict(), self.path)
        self.val_loss_min = val_loss

<h3>Train Function</h3>

In [None]:
def train(in_channels,latent_dim,train_dataloader,val_dataloader,checkpoint_path,train_variance,val_variance):
    """Train a VanillaVAE for up to 10 epochs and plot the training curves.

    Args:
        in_channels: number of image channels.
        latent_dim: size of the latent vector.
        train_dataloader: iterable of training batches (image tensors).
        val_dataloader: iterable of validation batches (image tensors).
        checkpoint_path: file the model ``state_dict`` is written to.
        train_variance: variance passed to the loss for training batches.
        val_variance: variance passed to the loss for validation batches.

    The mean validation loss of each epoch drives ``EarlyStopping`` (patience
    5), which checkpoints on improvement; training stops once it signals
    ``early_stop``.
    """
    model = VanillaVAE(in_channels=in_channels,latent_dim=latent_dim,hidden_dims=None).to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    epochs = 10
    train_loss, train_kl, train_recons = [], [], []
    earlystopping = EarlyStopping(patience=5, verbose=False, delta=0, path=checkpoint_path)

    for epoch in range(epochs):
        model.train()
        sum_loss, sum_kl, sum_recon = 0.0, 0.0, 0.0
        num_batches = 0
        start = time.time()
        for data in train_dataloader:
            data = data.to(device)
            optimizer.zero_grad()
            output, input_, mu, log_var = model(data)
            loss, recons_loss, kl_loss = model.loss_function(output, input_, mu, log_var, train_variance)
            loss.backward()
            optimizer.step()
            sum_loss += loss.item()
            sum_kl += kl_loss.item()
            sum_recon += recons_loss.item()
            num_batches += 1

        # Divide by the number of batches; the last enumerate index is one
        # short of it and is zero for a single-batch loader.
        denom = max(num_batches, 1)
        avg_loss, avg_kl, avg_recon = sum_loss / denom, sum_kl / denom, sum_recon / denom
        train_loss.append(avg_loss)
        train_kl.append(avg_kl)
        train_recons.append(avg_recon)
        end = time.time()
        print(f' Time taken for epoch = {epoch} = {end-start}, Loss = {avg_loss}, KL Loss = {avg_kl}, recons_loss = {avg_recon}')

        # Validate on the whole validation set, not only its last batch.
        model.eval()
        val_total, val_batches = 0.0, 0
        with torch.no_grad():
            for data in val_dataloader:
                data = data.to(device)
                output, input_, mu, log_var = model(data)
                val_loss, _, _ = model.loss_function(output, input_, mu, log_var, val_variance)
                val_total += val_loss.item()
                val_batches += 1

        if val_batches:
            earlystopping(val_total / val_batches, model)
            if earlystopping.early_stop:
                print(f'Early stopping at epoch {epoch}')
                break

        # NOTE(review): kept from the original — when all epochs run, the
        # final weights overwrite the best checkpoint written by
        # EarlyStopping. Confirm that this is the intended artefact.
        if epoch == epochs - 1:
            torch.save(model.state_dict(), checkpoint_path)

    plt.figure(0)
    plt.plot(np.arange(len(train_loss)), train_loss, label='Train Loss')
    plt.plot(np.arange(len(train_kl)), train_kl, label='KL Loss')
    # Plot the reconstruction series itself (train_loss was plotted twice).
    plt.plot(np.arange(len(train_recons)), train_recons, label='Train Recons')
    plt.legend()
    plt.show()
    plt.close()

<h3>Initialization, training and visualization for the dSprites dataset</h3>

In [None]:
## Load dSprites, split 90/10, train the VAE and show reconstructions
dsprites_checkpoint = '/home/hiren/Apoorv Pandey/ADRL/Ass1/VanillaVAE_dsprites.pt'

data = np.load('dsprites_ndarray_co1sh3sc6or40x32y32_64x64.npz')

# Add a channel axis: (N, 64, 64) -> (N, 1, 64, 64), then shuffle once.
imgs = torch.tensor(data['imgs'].reshape(-1,1,64,64),dtype = torch.float32)
imgs = imgs[torch.randperm(imgs.size()[0])]
train_len = int(0.9*imgs.size()[0])   ## 90-10 train val split
train_imgs,val_imgs = imgs[0:train_len],imgs[train_len:]

train_dataset = Dsprites_Dataset(train_imgs)
val_dataset = Dsprites_Dataset(val_imgs)
train_dataloader = DataLoader(train_dataset,batch_size = batch_size,shuffle=True)
val_dataloader = DataLoader(val_dataset,batch_size = batch_size,shuffle=True)

train_variance = get_variance(train_dataloader).item()
val_variance = get_variance(val_dataloader).item()


train(1,128,train_dataloader,val_dataloader,dsprites_checkpoint,train_variance,val_variance)


## Visualize Generated Images of Dsprites
model = VanillaVAE(in_channels=1,latent_dim=128,hidden_dims = None).to(device)
# map_location lets the checkpoint load even if it was saved from another device.
model.load_state_dict(torch.load(dsprites_checkpoint, map_location=device))

model.eval()

val_originals = next(iter(val_dataloader))
val_originals = val_originals.to(device)
# Inference only: without no_grad the reconstruction requires grad and
# cannot be converted to NumPy for plotting.
with torch.no_grad():
    val_recons,_,_,_ = model(val_originals)

# One figure per grid, otherwise the second imshow overwrites the first.
plt.figure()
show(make_grid(val_originals.cpu(),nrow=10))
plt.figure()
show(make_grid(val_recons.cpu(),nrow=10))

<h3>Initialization, training and visualization for the CelebA dataset</h3>

In [None]:
## Build the CelebA splits, train the VAE and show reconstructions
celeba_img_dir = '/home/hiren/Apoorv Pandey/ADRL/Ass1/img_align_celeba/img_align_celeba'
celeba_checkpoint = '/home/hiren/Apoorv Pandey/ADRL/Ass1/VanillaVAE_Celeb.pt'

data = pd.read_csv('/home/hiren/Apoorv Pandey/ADRL/Ass1/list_eval_partition.csv')
# The 'partition' column selects the split: 0 = train, 1 = val, 2 = test.
train_data = data[data['partition']==0]['image_id'].to_list()
val_data = data[data['partition']==1]['image_id'].to_list()
test_data = data[data['partition']==2]['image_id'].to_list()


transform = transforms.Compose([transforms.ToTensor(),transforms.Resize((64,64))])
train_dataset = CelebA_Dataset(train_data,celeba_img_dir,transform)
val_dataset = CelebA_Dataset(val_data,celeba_img_dir,transform)
# The test split needs the same transform: raw PIL images cannot be batched
# by the DataLoader's default collate function.
test_dataset = CelebA_Dataset(test_data,celeba_img_dir,transform)

train_loader = DataLoader(train_dataset,batch_size=batch_size,shuffle=True)
val_loader = DataLoader(val_dataset,batch_size=batch_size,shuffle=True)
test_loader = DataLoader(test_dataset,batch_size=batch_size,shuffle=True)

train_variance = get_variance(train_loader)
val_variance = get_variance(val_loader)
train(3,128,train_loader,val_loader,celeba_checkpoint,train_variance,val_variance) ## Train Celeb A dataset


## Visualize Generated Images of Celeb A
model = VanillaVAE(in_channels=3,latent_dim=128,hidden_dims=None).to(device)
model.load_state_dict(torch.load(celeba_checkpoint, map_location=device))

model.eval()

val_originals = next(iter(val_loader))
val_originals = val_originals.to(device)
# Inference only: without no_grad the reconstruction requires grad and
# cannot be converted to NumPy for plotting.
with torch.no_grad():
    val_recons,_,_,_ = model(val_originals)

# One figure per grid, otherwise the second imshow overwrites the first.
plt.figure()
show(make_grid(val_originals.cpu(),nrow=10))
plt.figure()
show(make_grid(val_recons.cpu(),nrow=10))

<h3>Calculating the marginal likelihood of the data</h3>

In [None]:
## Harmonic-mean style estimate of the marginal likelihood on one CelebA batch:
##   p(x) ~= L / sum_l [ q(z_l) / (p(z_l) * p(x | z_l)) ]
## evaluated in log-space, because the 64*64*3-dimensional decoder density
## underflows to 0 when computed as a plain probability.
celeba_checkpoint = '/home/hiren/Apoorv Pandey/ADRL/Ass1/VanillaVAE_Celeb.pt'
train_loader = DataLoader(train_dataset,batch_size = 10,shuffle=False)


with torch.no_grad():
    model = VanillaVAE(in_channels=3,latent_dim=128,hidden_dims=None).to(device)
    model.load_state_dict(torch.load(celeba_checkpoint, map_location=device))
    # Evaluation mode so BatchNorm uses its running statistics.
    model.eval()

    L = 5  ## number of posterior draws per image
    data = next(iter(train_loader))
    data = data.to(device)
    train_var = get_variance_batch(data).item()
    print(train_var)
    batch_size = data.size()[0]
    mu , logvar = model.encode(data)

    def _draw_latents():
        # Stack the draws on axis 1 -> (batch, L, latent) so that, after
        # flattening, rows [b*L, (b+1)*L) all belong to image b.
        draws = torch.stack([model.reparameterize(mu,logvar) for _ in range(L)], dim=1)
        return draws.reshape(batch_size*L,-1).cpu()

    # First set of draws: fit the PCA projection and the density q on it.
    sampled_z_np = _draw_latents().numpy()
    latent_pca = PCA(n_components=4).fit(sampled_z_np)
    sampled_z_pca = latent_pca.transform(sampled_z_np)
    posterior_density = GaussianMixture(n_components=32, random_state=0).fit(sampled_z_pca)

    # Second set of draws: projected with the SAME PCA so q is evaluated in
    # the space it was fitted in.
    sampled_z_new = _draw_latents()
    sampled_z_new_pca = latent_pca.transform(sampled_z_new.numpy())

    outputs = model.decode(sampled_z_new.to(device))
    print(outputs.size())
    log_q = posterior_density.score_samples(sampled_z_new_pca)  ## Fitted density log-likelihood

    data_np = data.cpu().numpy().reshape(batch_size,-1).astype(np.float64)
    outputs_np = outputs.cpu().numpy().reshape(batch_size*L,-1).astype(np.float64)

    num_pixels = data_np.shape[1]
    pca_dim = sampled_z_new_pca.shape[1]
    # Normalisation term of an isotropic Gaussian with variance train_var.
    log_norm = num_pixels*np.log(2*np.pi*train_var)

    log_sample_likelihoods = []
    for i in tqdm(range(0,batch_size*L,L)):
        rows = slice(i, i+L)
        target = data_np[i//L]

        log_prior = np.atleast_1d(multivariate_normal.logpdf(
            sampled_z_new_pca[rows], mean=np.zeros(pca_dim), cov=np.identity(pca_dim)))
        # Closed-form log N(output; target, train_var * I); avoids building a
        # dense num_pixels x num_pixels covariance matrix.
        squared_error = np.sum((outputs_np[rows] - target)**2, axis=1)
        log_decoder = -0.5*(log_norm + squared_error/train_var)

        log_ratio = log_q[rows] - log_prior - log_decoder
        for j in range(L):
            print(f'log Q score = {log_q[i+j]}, log prior = {log_prior[j]}, log decoder prob = {log_decoder[j]}')

        # log( L / sum_l exp(log_ratio_l) )
        log_sample_likelihood = np.log(L) - np.logaddexp.reduce(log_ratio)
        print(f'Log-likelihood for {i} sample = {log_sample_likelihood}')
        log_sample_likelihoods.append(log_sample_likelihood)

    # log of the mean per-image likelihood over the batch.
    overall_log_likelihood = np.logaddexp.reduce(log_sample_likelihoods) - np.log(batch_size)
    print(f'Log Marginal Likelihood = {overall_log_likelihood}')

<h1>VQ-VAE</h1>

<h3>Dataloaders</h3>

In [None]:
class TinyImageNetDataset(Dataset):
    """Dataset that opens images on demand from a list of file paths.

    Args:
        data_path_list: sequence of image file paths.
        transform: optional callable applied to the RGB PIL image.
        target_transform: stored on the instance; not used by ``__getitem__``.
    """

    def __init__(self, data_path_list, transform=None, target_transform=None):
        self.target_transform = target_transform
        self.transform = transform
        self.img_titles = data_path_list

    def __len__(self):
        return len(self.img_titles)

    def __getitem__(self, idx):
        rgb_image = PIL.Image.open(self.img_titles[idx]).convert('RGB')
        # Without a transform the PIL image is returned untouched.
        if not self.transform:
            return rgb_image
        return self.transform(rgb_image)

class LatentDataset(Dataset):
    """Dataset view over a tensor whose first axis indexes the samples.

    ``transform`` and ``target_transform`` are stored on the instance but are
    not applied by ``__getitem__``.
    """

    def __init__(self, data , transform=None, target_transform=None):
        self.target_transform = target_transform
        self.transform = transform
        self.data = data

    def __len__(self):
        # Number of samples == length of the leading axis.
        return self.data.size(0)

    def __getitem__(self, idx):
        sample = self.data[idx]
        return sample

<h3>VQVAE architecture</h3>

In [None]:
class VQVAE(nn.Module):
    """Vector-quantisation layer with a learnable codebook.

    Each spatial position of the input is replaced by its nearest codebook
    vector (squared Euclidean distance).
    """

    def __init__(self, num_embeddings, embedding_dim, commitment_cost):
        super(VQVAE, self).__init__()

        self.embedding_dim = embedding_dim
        self.codebook_dim = num_embeddings
        self.commitment_cost = commitment_cost

        self.codebook = nn.Embedding(self.codebook_dim, self.embedding_dim)
        # Initialise uniformly in [-1/K, 1/K] with K the codebook size.
        self.codebook.weight.data.uniform_(-1/self.codebook_dim, 1/self.codebook_dim)

    def forward(self, inputs):
        """Quantise ``inputs`` of shape (B, C, H, W) with C == embedding_dim.

        Returns:
            ``(loss, quantized)``: the codebook loss plus ``commitment_cost``
            times the commitment loss, and the quantised tensor in
            (B, C, H, W) layout with straight-through gradients.
        """
        # Channels last, so that every row of the flattened view is one vector.
        channels_last = inputs.permute(0, 2, 3, 1).contiguous()
        original_shape = channels_last.shape
        vectors = channels_last.view(-1, self.embedding_dim)
        codes = self.codebook.weight

        # ||v||^2 + ||c||^2 - 2 v.c for every (vector, code) pair.
        vector_norms = torch.sum(vectors**2, dim=1, keepdim=True)
        code_norms = torch.sum(codes**2, dim=1)
        cross_terms = torch.matmul(vectors, codes.t())
        distances = (vector_norms + code_norms - 2 * cross_terms)

        # One-hot selection of the nearest code per vector.
        nearest = torch.argmin(distances, dim=1).unsqueeze(1)
        one_hot = torch.zeros(nearest.shape[0], self.codebook_dim, device=channels_last.device)
        one_hot.scatter_(1, nearest, 1)

        quantized = torch.matmul(one_hot, codes).view(original_shape)

        commitment_term = F.mse_loss(quantized.detach(), channels_last)
        codebook_term = F.mse_loss(quantized, channels_last.detach())
        loss = codebook_term + self.commitment_cost * commitment_term

        ## Passing encoder gradients directly to decoder
        straight_through = channels_last + (quantized - channels_last).detach()

        return loss, straight_through.permute(0, 3, 1, 2).contiguous()

<h3>Encoder</h3>

In [None]:
class Encoder(nn.Module):
    """Three-layer convolutional encoder.

    Two stride-2 convolutions (kernel 4) followed by one stride-1 convolution
    (kernel 3); a ReLU follows every convolution.
    """

    def __init__(self, in_channels, num_hiddens):
        super(Encoder, self).__init__()
        half_width = num_hiddens//2

        self.conv_1 = nn.Conv2d(in_channels, half_width,
                                kernel_size=4, stride=2, padding=1)
        self.conv_2 = nn.Conv2d(half_width, num_hiddens,
                                kernel_size=4, stride=2, padding=1)
        self.conv_3 = nn.Conv2d(num_hiddens, num_hiddens,
                                kernel_size=3, stride=1, padding=1)

    def forward(self, inputs):
        hidden = inputs
        # conv -> ReLU, three times in a row.
        for conv in (self.conv_1, self.conv_2, self.conv_3):
            hidden = F.relu(conv(hidden))
        return hidden

<h3>Decoder</h3>

In [None]:
class Decoder(nn.Module):
    """Convolutional decoder mirroring ``Encoder``.

    One stride-1 convolution (kernel 3) followed by two stride-2 transposed
    convolutions (kernel 4); a ReLU follows every layer, including the last.
    """

    def __init__(self, in_channels, num_hiddens,final_out_channels=3):
        super(Decoder, self).__init__()
        self.out_channels = final_out_channels
        half_width = num_hiddens//2

        self.conv_1 = nn.Conv2d(in_channels, num_hiddens,
                                kernel_size=3, stride=1, padding=1)
        self.conv_trans_1 = nn.ConvTranspose2d(num_hiddens, half_width,
                                               kernel_size=4, stride=2, padding=1)
        self.conv_trans_2 = nn.ConvTranspose2d(half_width, self.out_channels,
                                               kernel_size=4, stride=2, padding=1)

    def forward(self, inputs):
        hidden = inputs
        # layer -> ReLU, three times in a row.
        for layer in (self.conv_1, self.conv_trans_1, self.conv_trans_2):
            hidden = F.relu(layer(hidden))
        return hidden

<h3>Model</h3>

In [None]:
class Model(nn.Module):
    """VQ-VAE for 3-channel images: encoder -> 1x1 conv -> quantiser -> decoder.

    ``decay`` is accepted but not used.
    """

    def __init__(self, num_hiddens,
                 num_embeddings, embedding_dim, commitment_cost, decay=0):
        super(Model, self).__init__()

        self.encoder = Encoder(3, num_hiddens)
        # 1x1 convolution projecting encoder features to the codebook width.
        self.pre_vq_conv = nn.Conv2d(num_hiddens, embedding_dim,
                                     kernel_size=1, stride=1)
        self.vq_vae = VQVAE(num_embeddings, embedding_dim, commitment_cost)
        self.decoder = Decoder(embedding_dim, num_hiddens)

    def forward(self, x):
        """Return ``(vq_loss, reconstruction, quantized, pre_quantisation)``."""
        z = self.pre_vq_conv(self.encoder(x))
        loss, quantized = self.vq_vae(z)
        return loss, self.decoder(quantized), quantized, z

<h3>VanillaVAE_Conv</h3>

In [None]:
class VanillaVAE_Conv(nn.Module):
    """VAE over 16x16 feature maps, built from ``Encoder`` and ``Decoder``.

    The linear layers assume a 4x4 spatial map after the encoder (two
    stride-2 convolutions applied to a 16x16 input).  For the configuration
    used in this notebook (in_channels=64, num_hiddens=128) both flattened
    sizes are 2048, the value that was previously hard-coded.
    """

    # Spatial positions (4 x 4) on either side of the latent bottleneck.
    _SPATIAL = 4

    def __init__(self,
                 in_channels,
                 latent_dim,num_hiddens,out_channels,
                 **kwargs) -> None:
        """
        Args:
            in_channels: channels of the input feature map.
            latent_dim: size of the latent vector.
            num_hiddens: channel width used inside Encoder and Decoder.
            out_channels: channels of the reconstructed feature map.
            **kwargs: accepted and ignored.
        """
        super(VanillaVAE_Conv, self).__init__()

        self.latent_dim = latent_dim
        self.in_channels = in_channels
        self.num_hiddens = num_hiddens

        # The decoder starts from a map with twice the input channels.
        self._decoder_channels = 2*in_channels
        encoder_features = num_hiddens * self._SPATIAL * self._SPATIAL
        decoder_features = self._decoder_channels * self._SPATIAL * self._SPATIAL

        self.encoder = Encoder(in_channels,num_hiddens)
        self.decoder = Decoder(self._decoder_channels,num_hiddens,out_channels)
        self.fc_mu = nn.Linear(encoder_features,latent_dim)
        self.fc_var = nn.Linear(encoder_features,latent_dim)
        self.decoder_input = nn.Linear(latent_dim, decoder_features)

    def encode(self, input):
        """Return ``[mu, log_var]`` of the approximate posterior for ``input``."""
        result = self.encoder(input)
        # The activations already live on the module's device; no transfer to
        # the notebook-global `device` is needed (or wanted) here.
        result = torch.flatten(result, start_dim=1)

        mu = self.fc_mu(result)
        log_var = self.fc_var(result)

        return [mu, log_var]

    def decode(self, z):
        """Map latent vectors ``z`` back to a feature map."""
        result = self.decoder_input(z)
        result = result.view(-1, self._decoder_channels, self._SPATIAL, self._SPATIAL)
        result = self.decoder(result)
        return result

    def reparameterize(self, mu, logvar):
        """Sample ``mu + eps * std`` with ``eps ~ N(0, I)``."""
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return eps * std + mu

    def forward(self, input, **kwargs):
        """Return ``[reconstruction, input, mu, log_var]``."""
        mu, log_var = self.encode(input)
        z = self.reparameterize(mu, log_var)
        return [self.decode(z), input, mu, log_var]

    def loss_function(self,*args):
        """Compute the training loss.

        Positional args: reconstruction, input, mu, log_var, variance.  The
        variance is accepted for signature parity with ``VanillaVAE`` but is
        not used: the reconstruction term is the plain MSE.

        Returns:
            ``[loss, recons_loss, kl_loss]`` with the last two detached.
        """
        recons = args[0]
        input_ = args[1]
        mu = args[2]
        log_var = args[3]

        recons_loss = F.mse_loss(recons, input_)
        kl_loss = torch.mean(-0.5 * torch.sum(1 + log_var - mu ** 2 - log_var.exp(), dim = 1), dim = 0)

        loss = recons_loss+kl_loss
        return [loss, recons_loss.detach(), kl_loss.detach()]

    def sample(self,
               num_samples:int,
               current_device: int, **kwargs):
        """Decode ``num_samples`` latent vectors drawn from a standard normal.

        ``current_device`` is passed to ``Tensor.to`` before decoding.
        """
        z = torch.randn(num_samples,
                        self.latent_dim)

        z = z.to(current_device)

        samples = self.decode(z)
        return samples

<h3>VanillaVAE_Linear</h3>

In [None]:
class VanillaVAE_Linear(nn.Module):
    """Fully-connected VAE.

    ``hidden_dims`` lists the layer widths from the input size down to the
    last encoder width; the decoder uses the same widths in reverse order.
    """

    def __init__(self,
                 hidden_dims,latent_dim) -> None:
        """
        Args:
            hidden_dims: sequence of widths, first entry = input size.  It is
                read but never modified.
            latent_dim: size of the latent vector.
        """
        super(VanillaVAE_Linear, self).__init__()
        # sample() draws vectors of this size; it was never stored before,
        # so sample() raised AttributeError.
        self.latent_dim = latent_dim

        widths = list(hidden_dims)

        layers = []
        for i in range(len(widths)-1):
            layers.append(nn.Sequential(nn.Linear(widths[i],widths[i+1]),nn.ReLU()))
        self.encoder = nn.Sequential(*layers)

        # Reverse a copy instead of reversing the caller's list in place.
        reversed_widths = widths[::-1]
        layers = []
        for i in range(len(reversed_widths)-1):
            layers.append(nn.Sequential(nn.Linear(reversed_widths[i],reversed_widths[i+1]),nn.ReLU()))
        self.decoder = nn.Sequential(*layers)

        self.fc_mu = nn.Linear(widths[-1],latent_dim)
        self.fc_var = nn.Linear(widths[-1],latent_dim)
        self.decoder_input = nn.Linear(latent_dim,widths[-1])

    def encode(self, input):
        """Return ``[mu, log_var]`` of the approximate posterior for ``input``."""
        result = self.encoder(input)
        mu = self.fc_mu(result)
        log_var = self.fc_var(result)
        return [mu, log_var]

    def decode(self, z):
        """Map latent vectors ``z`` back to the input space."""
        result = F.relu(self.decoder_input(z))
        result = self.decoder(result)
        return result

    def reparameterize(self, mu, logvar):
        """Sample ``mu + eps * std`` with ``eps ~ N(0, I)``."""
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return eps * std + mu

    def forward(self, input, **kwargs):
        """Return ``[reconstruction, input, mu, log_var]``."""
        mu, log_var = self.encode(input)
        z = self.reparameterize(mu, log_var)
        return [self.decode(z), input, mu, log_var]

    def loss_function(self,*args):
        """Compute the training loss.

        Positional args: reconstruction, input, mu, log_var, variance.  The
        variance is accepted but not used: the reconstruction term is the
        plain MSE.

        Returns:
            ``[loss, recons_loss, kl_loss]`` with the last two detached.
        """
        recons = args[0]
        input_ = args[1]
        mu = args[2]
        log_var = args[3]

        recons_loss = F.mse_loss(recons, input_)
        kl_loss = torch.mean(-0.5 * torch.sum(1 + log_var - mu ** 2 - log_var.exp(), dim = 1), dim = 0)

        loss = recons_loss + kl_loss
        return [loss, recons_loss.detach(), kl_loss.detach()]

    def sample(self,
               num_samples:int,
               current_device: int, **kwargs):
        """Decode ``num_samples`` latent vectors drawn from a standard normal.

        ``current_device`` is passed to ``Tensor.to`` before decoding.
        """
        z = torch.randn(num_samples,
                        self.latent_dim)

        z = z.to(current_device)

        samples = self.decode(z)
        return samples

<h3>Train Function</h3>

In [None]:
def train_VAE(model,in_channels,latent_dim,num_hiddens,train_dataloader,train_variance,path):
    """Train ``model`` for 10 epochs, save its weights and plot the curves.

    Args:
        model: VAE exposing ``forward`` -> (output, input, mu, log_var) and
            ``loss_function(output, input, mu, log_var, variance)``.
        in_channels, latent_dim, num_hiddens: not used here; kept so existing
            call sites keep working.
        train_dataloader: iterable of training batches.
        train_variance: variance forwarded to ``model.loss_function``.
        path: file the final ``state_dict`` is written to.
    """
    optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3)
    epochs = 10
    train_loss, train_kl, train_recons = [], [], []

    for epoch in range(epochs):
        model.train()
        sum_loss, sum_kl, sum_recon = 0.0, 0.0, 0.0
        num_batches = 0
        for data in train_dataloader:
            data = data.to(device)
            optimizer.zero_grad()
            output, input_, mu, log_var = model(data)
            loss, recons_loss, kl_loss = model.loss_function(output, data, mu, log_var, train_variance)
            loss.backward()
            optimizer.step()
            sum_loss += loss.item()
            sum_kl += kl_loss.item()
            sum_recon += recons_loss.item()
            num_batches += 1

            torch.cuda.empty_cache()

        # max(..., 1) keeps an empty loader from raising.
        denom = max(num_batches, 1)
        train_loss.append(sum_loss / denom)
        train_kl.append(sum_kl / denom)
        train_recons.append(sum_recon / denom)
        print(f' loss = {sum_loss / denom}')

    torch.save(model.state_dict(), path)

    plt.figure(0)
    plt.plot(np.arange(len(train_loss)), train_loss, label='Train Loss')
    plt.plot(np.arange(len(train_kl)), train_kl, label='KL Loss')
    # Plot the reconstruction series itself (train_loss was plotted twice).
    plt.plot(np.arange(len(train_recons)), train_recons, label='Train Recons')
    plt.legend()
    plt.show()
    plt.close()

<h3>Initialization, training and visualization on Tiny ImageNet</h3>

In [None]:
## Collect the Tiny ImageNet image paths for each split
train_images_path = glob.glob('/home/hiren/Apoorv Pandey/ADRL/Ass1/tiny-imagenet-200/train/*/images/*.JPEG')
val_images_path = glob.glob('/home/hiren/Apoorv Pandey/ADRL/Ass1/tiny-imagenet-200/val/images/*.JPEG')
test_images_path = glob.glob('/home/hiren/Apoorv Pandey/ADRL/Ass1/tiny-imagenet-200/test/images/*.JPEG')

transform = transforms.Compose([transforms.ToTensor()])
train_dataset = TinyImageNetDataset(train_images_path,transform)
val_dataset = TinyImageNetDataset(val_images_path,transform)
test_dataset = TinyImageNetDataset(test_images_path,transform)


train_loader = DataLoader(train_dataset,batch_size=100,shuffle=True)
val_loader = DataLoader(val_dataset,batch_size=100,shuffle=True)
test_loader = DataLoader(test_dataset,batch_size=100,shuffle=True)

train_variance = get_variance(train_loader)
val_variance =  get_variance(val_loader)

batch_size = 100
num_epochs = 10

num_hiddens = 128  ## channel width after passing through encoder

embedding_dim = 64
num_embeddings = 128  ## codebook dimension

commitment_cost = 0.25


learning_rate = 1e-3

vqvae_checkpoint = f'/home/hiren/Apoorv Pandey/ADRL/Ass1/VQ_VAE_TinyImageNet_{num_embeddings}.pt'

model = Model(num_hiddens,num_embeddings, embedding_dim, commitment_cost).to(device)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

## Train the VQ-VAE: variance-scaled reconstruction error plus the VQ loss
train_res_recon_error = []
model.train()
for i in range(num_epochs):
    avg_recon_error,avg_perplexity = 0,0
    steps = 0
    for data in train_loader:
        data = data.to(device)
        optimizer.zero_grad()

        vq_loss, data_recon, quantized,encoded = model(data)
        recon_error = F.mse_loss(data_recon, data)/(2*train_variance)
        loss = recon_error + vq_loss
        loss.backward()
        avg_recon_error += recon_error.item()
        optimizer.step()
        steps += 1
    train_res_recon_error.append(avg_recon_error/steps)

    print(f' iterations {i+1}')
    # Apply the 3-decimal format to the value; the previous f-string printed
    # the literal text "%.3f" followed by the unformatted number.
    print(f'recon_error: {avg_recon_error/steps:.3f}')
    print()
torch.save(model.state_dict(),vqvae_checkpoint)

## Visualize images

model = Model(num_hiddens,num_embeddings, embedding_dim, commitment_cost).to(device)
model.load_state_dict(torch.load(vqvae_checkpoint, map_location=device))

model.eval()

valid_originals = next(iter(val_loader))
valid_originals = valid_originals.to(device)

# Inference only: no autograd graph is needed for the reconstructions.
with torch.no_grad():
    vq_output_eval = model.pre_vq_conv(model.encoder(valid_originals))
    _, valid_quantize = model.vq_vae(vq_output_eval)
    valid_reconstructions = model.decoder(valid_quantize)

# Display originals and reconstructions in separate figures.
plt.figure()
show(make_grid(valid_originals.cpu(),nrow=10))
plt.figure()
show(make_grid(valid_reconstructions.cpu(),nrow=10))

<h3>Getting latent vectors output as a dataset</h3>

In [None]:
## Encode the whole training set with the trained VQ-VAE, then fit a
## convolutional VAE (VanillaVAE_Conv) on the quantized latent maps.
# NOTE(review): `in_dim` is not read anywhere in the visible source — confirm before removing.
in_dim = embedding_dim
latent_dim = 512
# Rebuild the VQ-VAE and restore the weights saved by the training cell.
model = Model(num_hiddens,num_embeddings, embedding_dim, commitment_cost).to(device)
model.load_state_dict(torch.load(f'/home/hiren/Apoorv Pandey/ADRL/Ass1/VQ_VAE_TinyImageNet_{num_embeddings}.pt'))
model.eval()
latent_vectors_list = []
encoded_vectors_list = []

### Getting latent vectors output as a dataset
with torch.no_grad():
    for data in train_loader:
        data = data.to(device)  ## move the current batch to the target device
        # Model.forward returns (vq_loss, reconstruction, quantized, pre-quantization output).
        vq_loss, data_recon, latent_vectors,encoded = model(data)
        
        # Keep the per-batch results on the CPU so they can be concatenated afterwards.
        latent_vectors_list.append(latent_vectors.cpu())
        encoded_vectors_list.append(encoded.cpu())
        torch.cuda.empty_cache()
    latent_vectors = torch.cat(latent_vectors_list, dim=0)
    encoded_vectors = torch.cat(encoded_vectors_list, dim=0)
    latent_vectors_np = latent_vectors.cpu().numpy()
    encoded_vectors_np = encoded_vectors.cpu().numpy()
    



#gm_e = GaussianMixture(n_components=5, random_state=0).fit(encoded_vectors_pca)  ## Fitting a GMM on latent space of quantized outputs
# Channel count of the quantized maps (axis 1 of the concatenated tensor).
in_channels = latent_vectors.size()[1]
## Fitting a VAE on latent space

train_quantized_dataset = LatentDataset(latent_vectors)
train_quantized_dataloader = DataLoader(train_quantized_dataset,batch_size=100,shuffle=True)
train_q_variance = get_variance(train_quantized_dataloader)

#train_VAE(in_channels,latent_dim,128,train_quantized_dataloader,train_q_variance,f'/home/hiren/Apoorv Pandey/ADRL/Ass1/Latent_VAE_Q_{latent_dim}.pt')  ## Fitting a VAE on latent space of quantized outputs


# NOTE(review): this rebinds the global `model` from the VQ-VAE to the latent
# VAE; any later cell that uses `model` gets the latent VAE unless it reloads
# the VQ-VAE first.
model = VanillaVAE_Conv(in_channels,latent_dim,num_hiddens,in_channels).to(device)

train_VAE(model,in_channels,latent_dim,128,train_quantized_dataloader,train_q_variance,f'/home/hiren/Apoorv Pandey/ADRL/Ass1/Latent_VAE_Q_{latent_dim}.pt') 


<h3>Visualize reconstructed images</h3>

In [None]:
# Load the VAE that was trained on the quantized latents.
# NOTE(review): training above instantiates VanillaVAE_Conv while this cell
# builds VanillaVAE with hard-coded sizes — confirm the checkpoint matches.
model_latentVAE = VanillaVAE(64,512,128,64).to(device)
model_latentVAE.load_state_dict(torch.load(f'/home/hiren/Apoorv Pandey/ADRL/Ass1/Latent_VAE_Q_{latent_dim}.pt'))
model_latentVAE.eval()

train_originals = next(iter(train_loader))
train_originals = train_originals.to(device)

# Inference only: without no_grad the result requires grad and show() fails,
# because it calls .numpy() on the grid tensor.
with torch.no_grad():
    _, _, quantized, encoded = model(train_originals)
    recons_quantized, _, _, _ = model_latentVAE(quantized)
    train_reconstruction = model.decoder(recons_quantized)

show(make_grid(train_reconstruction.cpu(),nrow=10), )

<h3>Fitting a GMM on latent space and sampling from it</h3>

In [None]:
model.eval()
data = next(iter(train_loader)).to(device)
with torch.no_grad():
    vq_loss, data_recon, latent_vectors, encoded = model(data)
latent_vectors_np = latent_vectors.cpu().numpy()
n = latent_vectors_np.shape[0]
## Fitting a GMM on latent space of quantized outputs (one flattened vector per image)
gm_q = GaussianMixture(n_components=64, random_state=0).fit(latent_vectors_np.reshape(n,-1))

## sampling from gmm
# GaussianMixture.sample returns (samples, component_labels); only the samples are needed.
samples, _ = gm_q.sample(100)
# Restore the latent layout the decoder expects and move the samples to the model's device.
samples = torch.tensor(samples, dtype=torch.float).reshape(-1, *latent_vectors.shape[1:]).to(device)
with torch.no_grad():
    train_reconstruction = model.decoder(samples)

show(make_grid(train_reconstruction.cpu(),nrow=10), )

<h1>DCGAN</h1>

In [None]:
def weights_init(w):
    """Initialise one module in place; intended for ``net.apply(weights_init)``.

    Convolution and transposed-convolution weights are drawn from N(0, 0.02).
    Batch-norm scales are drawn from N(1, 0.02) and their biases set to 0.
    Every other module type is left untouched.

    The previous version searched the class name for the lowercase strings
    'conv' and 'bn', which never occur in 'Conv2d', 'ConvTranspose2d' or
    'BatchNorm2d', so no layer was ever re-initialised.
    """
    conv_types = (nn.Conv1d, nn.Conv2d, nn.Conv3d,
                  nn.ConvTranspose1d, nn.ConvTranspose2d, nn.ConvTranspose3d)
    norm_types = (nn.BatchNorm1d, nn.BatchNorm2d, nn.BatchNorm3d)

    if isinstance(w, conv_types):
        nn.init.normal_(w.weight.data, 0.0, 0.02)
    elif isinstance(w, norm_types) and w.weight is not None:
        # weight/bias are None when the layer was built with affine=False
        nn.init.normal_(w.weight.data, 1.0, 0.02)
        nn.init.constant_(w.bias.data, 0)

In [None]:
class BitmojiDataset(Dataset):
    """Dataset over every file directly inside ``root_dir``.

    Each item is the image resized to 64x64x3 by skimage and then reshaped
    to (3, 64, 64).
    """

    def __init__(self, root_dir):
        # Import locally: the notebook header runs `from glob import glob`
        # followed by `import glob`, which leaves the module-level name `glob`
        # bound to the module, so calling glob(...) raises TypeError.
        from glob import glob as list_paths

        self.root_dir = root_dir
        # Sorted so that index -> file is stable between runs.
        self.paths = sorted(list_paths(self.root_dir+"/*"))

    def __len__(self):
        return len(self.paths)

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()

        img_name = self.paths[idx]
        # NOTE: reshape (not transpose) reinterprets the HWC buffer as (3, 64, 64).
        # The plotting cells undo it with reshape(..., 64, 64, 3), so the two
        # must be changed together if a real channel-first layout is wanted.
        image = transform.resize(io.imread(img_name), (64,64,3)).reshape(3,64,64)

        return image

In [None]:
def full_scale_contrast_streching(image):
    """Linearly stretch ``image`` to the full 8-bit range.

    The minimum value maps to 0 and the maximum to 255; results are truncated
    to ``uint8``.

    Args:
        image: array-like of numeric pixel values, any shape.

    Returns:
        ``uint8`` array of the same shape. A constant image (max == min) has
        no contrast to stretch and is returned as all zeros instead of
        dividing by zero.
    """
    image = np.asarray(image, dtype=np.float64)
    a = image.min()
    b = image.max()
    if b == a:
        return np.zeros(image.shape, dtype='uint8')
    # Normalise to [0, 1] first so that the maximum maps to exactly 255.
    contrast_enhanced_image = ((image - a) / (b - a) * 255).astype('uint8')
    return contrast_enhanced_image

<h3>Generator</h3>

In [None]:
class Generator(nn.Module):
    """DCGAN generator: maps a latent tensor (nz x 1 x 1) to an (nc x 64 x 64) image.

    Reads 'nz', 'ngf' and 'nc' from ``params``. Four transposed-conv +
    batch-norm + ReLU stages are followed by a final transposed conv with tanh.
    """

    def __init__(self, params):
        super().__init__()
        nz, ngf, nc = params['nz'], params['ngf'], params['nc']

        # nz x 1 x 1 -> (ngf*8) x 4 x 4
        self.tconv1 = nn.ConvTranspose2d(nz, ngf*8, kernel_size=4, stride=1, padding=0, bias=False)
        self.bn1 = nn.BatchNorm2d(ngf*8)

        # (ngf*8) x 4 x 4 -> (ngf*4) x 8 x 8
        self.tconv2 = nn.ConvTranspose2d(ngf*8, ngf*4, 4, 2, 1, bias=False)
        self.bn2 = nn.BatchNorm2d(ngf*4)

        # (ngf*4) x 8 x 8 -> (ngf*2) x 16 x 16
        self.tconv3 = nn.ConvTranspose2d(ngf*4, ngf*2, 4, 2, 1, bias=False)
        self.bn3 = nn.BatchNorm2d(ngf*2)

        # (ngf*2) x 16 x 16 -> ngf x 32 x 32
        self.tconv4 = nn.ConvTranspose2d(ngf*2, ngf, 4, 2, 1, bias=False)
        self.bn4 = nn.BatchNorm2d(ngf)

        # ngf x 32 x 32 -> nc x 64 x 64
        self.tconv5 = nn.ConvTranspose2d(ngf, nc, 4, 2, 1, bias=False)

    def forward(self, x):
        stages = (
            (self.tconv1, self.bn1),
            (self.tconv2, self.bn2),
            (self.tconv3, self.bn3),
            (self.tconv4, self.bn4),
        )
        for upsample, norm in stages:
            x = F.relu(norm(upsample(x)))

        # tanh keeps the generated pixels in [-1, 1]
        return torch.tanh(self.tconv5(x))

<h3>Discriminator</h3>

In [None]:
class Discriminator(nn.Module):
    """DCGAN discriminator: maps an (nc x 64 x 64) image to a sigmoid score of shape 1 x 1 x 1.

    Reads 'nc' and 'ndf' from ``params``.
    """

    def __init__(self, params):
        super().__init__()
        nc, ndf = params['nc'], params['ndf']

        # nc x 64 x 64 -> ndf x 32 x 32 (no batch norm on the first layer)
        self.conv1 = nn.Conv2d(nc, ndf, 4, 2, 1, bias=False)

        # ndf x 32 x 32 -> (ndf*2) x 16 x 16
        self.conv2 = nn.Conv2d(ndf, ndf*2, 4, 2, 1, bias=False)
        self.bn2 = nn.BatchNorm2d(ndf*2)

        # (ndf*2) x 16 x 16 -> (ndf*4) x 8 x 8
        self.conv3 = nn.Conv2d(ndf*2, ndf*4, 4, 2, 1, bias=False)
        self.bn3 = nn.BatchNorm2d(ndf*4)

        # (ndf*4) x 8 x 8 -> (ndf*8) x 4 x 4
        self.conv4 = nn.Conv2d(ndf*4, ndf*8, 4, 2, 1, bias=False)
        self.bn4 = nn.BatchNorm2d(ndf*8)

        # (ndf*8) x 4 x 4 -> 1 x 1 x 1
        self.conv5 = nn.Conv2d(ndf*8, 1, 4, 1, 0, bias=False)

    def forward(self, x):
        x = F.leaky_relu(self.conv1(x), negative_slope=0.2, inplace=True)
        for conv, norm in ((self.conv2, self.bn2),
                           (self.conv3, self.bn3),
                           (self.conv4, self.bn4)):
            x = F.leaky_relu(norm(conv(x)), negative_slope=0.2, inplace=True)

        # sigmoid turns the final response into a value in (0, 1)
        return torch.sigmoid(self.conv5(x))

<h3>Hyperparameters</h3>

In [None]:
# Seed Python's and PyTorch's RNGs. NOTE: NumPy's RNG is not seeded here.
seed = 369
random.seed(seed)
torch.manual_seed(seed)
print("Random Seed: ", seed)

# Hyperparameters shared by the networks and the training loop below.
params = {
    "bsize" : 512,# Batch size during training.
    'imsize' : 128,# Nominal image size. NOTE(review): BitmojiDataset resizes to 64x64 and this key is not read in the cells shown here.
    'nc' : 3,# Number of channels in the training images. For coloured images this is 3.
    'nz' : 100,# Size of the Z latent vector (the input to the generator).
    'ngf' : 128,# Size of feature maps in the generator. The depth will be multiples of this.
    'ndf' : 128, # Size of features maps in the discriminator. The depth will be multiples of this.
    'nepochs' : 10,# Number of training epochs.
    'lr' : 0.0002,# Learning rate for optimizers
    'beta1' : 0.5,# Beta1 hyperparam for Adam optimizer
    'save_epoch' : 2}# A checkpoint is written every `save_epoch` epochs.

# Use the first GPU when CUDA is available, otherwise the CPU.
device = torch.device("cuda:0" if(torch.cuda.is_available()) else "cpu")

# Dataset over the files in ./bitmojis, served in shuffled batches of params['bsize'].
data = BitmojiDataset('bitmojis')
dataloader = DataLoader(data, params['bsize'], shuffle=True)

<h3>Initialization</h3>

In [None]:
# Build generator and discriminator on the target device and run weights_init on every sub-module.
netG = Generator(params).to(device)
netG.apply(weights_init)

netD = Discriminator(params).to(device)
netD.apply(weights_init)

# Binary cross-entropy between the discriminator output and the real/fake labels.
criterion = nn.BCELoss()

# Fixed latent batch, reused during training to snapshot the generator's progress.
fixed_noise = torch.randn(64, params['nz'], 1, 1, device=device)

real_label = 1
fake_label = 0

# One Adam optimizer per network, both using params['lr'] and beta1 = params['beta1'].
optimizerD = optim.Adam(netD.parameters(), lr=params['lr'], betas=(params['beta1'], 0.999))
optimizerG = optim.Adam(netG.parameters(), lr=params['lr'], betas=(params['beta1'], 0.999))

# Training history: generator snapshots and per-iteration losses.
img_list = []
G_losses = []
D_losses = []

<h3>Training</h3>

In [None]:
# Global iteration counter across epochs (drives the snapshot schedule below).
iters = 0

print("Starting Training Loop...")
print("-"*25)

for epoch in range(params['nepochs']):
    for i, data in enumerate(dataloader, 0):
        real_data = data.float().to(device)
        b_size = real_data.size(0)

        # ---- Discriminator step, part 1: real batch against real_label ----
        netD.zero_grad()
        label = torch.full((b_size, ), real_label, device=device).float()
        output = netD(real_data).view(-1)
        errD_real = criterion(output, label)
        errD_real.backward()
        D_x = output.mean().item()

        # ---- Discriminator step, part 2: generated batch against fake_label ----
        noise = torch.randn(b_size, params['nz'], 1, 1, device=device)
        fake_data = netG(noise)
        label.fill_(fake_label)
        # detach() keeps this backward pass from reaching the generator
        output = netD(fake_data.detach()).view(-1)
        errD_fake = criterion(output, label)
        errD_fake.backward()
        D_G_z1 = output.mean().item()

        # Gradients of both parts have accumulated in netD; apply them in one step.
        errD = errD_real + errD_fake
        optimizerD.step()

        # ---- Generator step: score the same fakes with the updated netD against real_label ----
        netG.zero_grad()
        label.fill_(real_label)
        output = netD(fake_data).view(-1)
        errG = criterion(output, label)
        errG.backward()

        D_G_z2 = output.mean().item()
        optimizerG.step()

        # Progress report every 50 batches.
        if i%50 == 0:
            print(torch.cuda.is_available())
            print('[%d/%d][%d/%d]\tLoss_D: %.4f\tLoss_G: %.4f\tD(x): %.4f\tD(G(z)): %.4f / %.4f'
                  % (epoch, params['nepochs'], i, len(dataloader),
                     errD.item(), errG.item(), D_x, D_G_z1, D_G_z2))

        G_losses.append(errG.item())
        D_losses.append(errD.item())

        # Snapshot the generator on fixed_noise every 100 iterations and on the very last batch.
        if (iters % 100 == 0) or ((epoch == params['nepochs']-1) and (i == len(dataloader)-1)):
            with torch.no_grad():
                fake_data = netG(fixed_noise).detach().cpu()
            img_list.append(vutils.make_grid(fake_data, padding=2, normalize=True))

        iters += 1

    # Checkpoint both networks and optimizers on epochs divisible by params['save_epoch'].
    if epoch % params['save_epoch'] == 0:
        torch.save({
            'generator' : netG.state_dict(),
            'discriminator' : netD.state_dict(),
            'optimizerG' : optimizerG.state_dict(),
            'optimizerD' : optimizerD.state_dict(),
            'params' : params
            }, 'model_epoch_{}.pth'.format(epoch))

<h3>Saving plot and models</h3>

In [None]:
# Final checkpoint: both networks, both optimizers and the hyperparameters.
torch.save({
            'generator' : netG.state_dict(),
            'discriminator' : netD.state_dict(),
            'optimizerG' : optimizerG.state_dict(),
            'optimizerD' : optimizerD.state_dict(),
            'params' : params
            }, 'model_final.pth')

# savefig does not create missing directories.
os.makedirs('DCGan models', exist_ok=True)

plt.figure(figsize=(10,5))
plt.title("Generator and Discriminator Loss During Training")
plt.plot(G_losses,label="G")
plt.plot(D_losses,label="D")
plt.xlabel("iterations")
plt.ylabel("Loss")
# Without legend() the "G"/"D" labels given above are never displayed.
plt.legend()
plt.savefig('DCGan models/train_plot.png')

<h3>Generating images for the grid and saving it</h3>

In [None]:
# Draw 100 latent vectors and generate one image per vector without tracking gradients.
noise = torch.randn(100, params['nz'], 1, 1, device=device)
with torch.no_grad():
    generated_img = netG(noise).detach().cpu()
# reshape (not transpose) reinterprets each (3, 64, 64) buffer as (64, 64, 3);
# this mirrors the reshape BitmojiDataset applies when it loads an image.
generated_img = generated_img.numpy().reshape(100,64,64,3)

In [None]:
# Lay the 100 generated images out on a 10x10 grid and save the figure.
# NOTE(review): figsize is 200x200 inches, which produces a very large image — confirm intended.
fig = plt.figure(figsize=(200., 200.))
grid = ImageGrid(fig, 111,  # similar to subplot(111)
                 nrows_ncols=(10, 10),  # creates a 10x10 grid of axes
                 axes_pad=0.1,  # pad between axes in inch.
                 )

# Values outside [0, 1] are clipped before display.
for ax, im in zip(grid, [np.clip(im,0,1) for im in generated_img]):
    # Iterating over the grid returns the Axes.
    ax.imshow(im)

plt.savefig('DCGan models/grid.png')

In [None]:
# Latent traversal: for 10 distinct random latent dimensions, sweep that single
# coordinate over 10 evenly spaced values in [0, 1] and generate an image for each value.
generated_images = []
dims = []
for i in range(10):
    noise = torch.randn(1,100, 1, 1).to(device)
    # Rejection-sample a latent index that has not been used yet.
    # NOTE(review): 100 is hard-coded here rather than read from params['nz'].
    r = np.random.randint(100)
    while(r in dims):
        r = np.random.randint(100)
    dims.append(r)
    # Alias, not a copy: the writes below also modify `noise` (which is re-drawn for every i).
    temp_noise = noise
    for j in np.linspace(0,1,10):
        temp_noise[0][r][0][0] = j
        generated_images.append(netG(temp_noise).cpu().detach().numpy())
# Same reshape convention as the sample grid: (3, 64, 64) buffers are viewed as (64, 64, 3).
generated_images = np.array(generated_images).reshape(100,64,64,3)

fig = plt.figure(figsize=(200., 200.))
grid = ImageGrid(fig, 111,  # similar to subplot(111)
                 nrows_ncols=(10, 10),  # creates a 10x10 grid of axes
                 axes_pad=0.1,  # pad between axes in inch.
                 )

for ax, im in zip(grid, [np.clip(im,0,1) for im in generated_images]):
    # Iterating over the grid returns the Axes.
    ax.imshow(im)

# NOTE(review): the other DCGAN figures are saved under 'DCGan models/'; this one goes to 'DCGan/'.
plt.savefig('DCGan/latent_grid.png')

<h3>FID Calculations</h3>

In [None]:
# Local alias: in this notebook the module-level name `glob` ends up bound to the
# glob module (see the import cell), so calling glob(...) directly fails.
from glob import glob as _list_paths

# Select up to 1000 distinct real images. Sampling with replacement would pick
# duplicates that overwrite each other on disk, leaving fewer than 1000 files.
image_paths = np.array(sorted(_list_paths('bitmojis/*')))
n_real = min(1000, len(image_paths))
image_paths = image_paths[np.random.choice(len(image_paths), n_real, replace=False)]
for path in image_paths:
    image = transform.resize(io.imread(path), (64,64,3))
    io.imsave("FID/real/"+os.path.basename(path), full_scale_contrast_streching(image))

# Generate 1000 images with the trained generator; no_grad avoids building an
# autograd graph for the whole 1000-image batch.
noise = torch.FloatTensor(np.random.randn(1000,100, 1, 1)).to(device)
with torch.no_grad():
    generated_img = netG(noise).cpu().numpy().reshape(1000,64,64,3)
for idx in range(len(generated_img)):
    io.imsave('FID/generated/'+str(idx)+'.png',full_scale_contrast_streching(generated_img[idx]))

In [None]:
!python -m pytorch_fid FID/real FID/generated

<h1>LSGAN</h1>

Since only the loss function changes, everything up to the initialization step is the same as for DCGAN; the same functions and classes can be reused.

<h3>Initialization</h3>

In [None]:
# Fresh generator and discriminator for the LSGAN run, initialised via weights_init.
netG = Generator(params).to(device)
netG.apply(weights_init)

netD = Discriminator(params).to(device)
netD.apply(weights_init)

# Least-squares objective: mean squared error between the discriminator output and the labels.
criterion = nn.MSELoss()

fixed_noise = torch.randn(64, params['nz'], 1, 1, device=device)

# NOTE(review): if the sigmoid-output Discriminator defined above is the one in scope,
# its outputs lie in (0, 1) and can never reach the fake target of -1 — confirm intended.
real_label = 1
fake_label = -1

# RMSprop with params['lr']; the Adam betas are not used for this run.
optimizerD = optim.RMSprop(netD.parameters(), lr=params['lr'])#, betas=(params['beta1'], 0.999))
optimizerG = optim.RMSprop(netG.parameters(), lr=params['lr'])#, betas=(params['beta1'], 0.999))

# Training history: generator snapshots and per-iteration losses.
img_list = []
G_losses = []
D_losses = []

In [None]:
# Global iteration counter across epochs (drives the snapshot schedule below).
iters = 0

print("Starting Training Loop...")
print("-"*25)

# Same loop structure as the DCGAN run; the loss, labels and optimizers come from the cell above.
for epoch in range(params['nepochs']):
    for i, data in enumerate(dataloader, 0):
        real_data = data.float().to(device)
        b_size = real_data.size(0)

        # ---- Discriminator step, part 1: real batch against real_label ----
        netD.zero_grad()
        label = torch.full((b_size, ), real_label, device=device).float()
        output = netD(real_data).view(-1)
        errD_real = criterion(output, label)
        errD_real.backward()
        D_x = output.mean().item()

        # ---- Discriminator step, part 2: generated batch against fake_label ----
        noise = torch.randn(b_size, params['nz'], 1, 1, device=device)
        fake_data = netG(noise)
        label.fill_(fake_label)
        # detach() keeps this backward pass from reaching the generator
        output = netD(fake_data.detach()).view(-1)
        errD_fake = criterion(output, label)
        errD_fake.backward()
        D_G_z1 = output.mean().item()

        errD = errD_real + errD_fake
        optimizerD.step()

        # ---- Generator step: score the same fakes with the updated netD against real_label ----
        netG.zero_grad()
        label.fill_(real_label)
        output = netD(fake_data).view(-1)
        errG = criterion(output, label)
        errG.backward()

        D_G_z2 = output.mean().item()
        optimizerG.step()

        # Progress report every 50 batches.
        if i%50 == 0:
            print(torch.cuda.is_available())
            print('[%d/%d][%d/%d]\tLoss_D: %.4f\tLoss_G: %.4f\tD(x): %.4f\tD(G(z)): %.4f / %.4f'
                  % (epoch, params['nepochs'], i, len(dataloader),
                     errD.item(), errG.item(), D_x, D_G_z1, D_G_z2))

        G_losses.append(errG.item())
        D_losses.append(errD.item())

        # Snapshot the generator on fixed_noise every 100 iterations and on the very last batch.
        if (iters % 100 == 0) or ((epoch == params['nepochs']-1) and (i == len(dataloader)-1)):
            with torch.no_grad():
                fake_data = netG(fixed_noise).detach().cpu()
            img_list.append(vutils.make_grid(fake_data, padding=2, normalize=True))

        iters += 1

    # Checkpoint into lsgan/ on epochs divisible by params['save_epoch'].
    if epoch % params['save_epoch'] == 0:
        torch.save({
            'generator' : netG.state_dict(),
            'discriminator' : netD.state_dict(),
            'optimizerG' : optimizerG.state_dict(),
            'optimizerD' : optimizerD.state_dict(),
            'params' : params
            }, 'lsgan/model_epoch_{}.pth'.format(epoch))

In [None]:
# Final LSGAN checkpoint: both networks, both optimizers and the hyperparameters.
torch.save({
            'generator' : netG.state_dict(),
            'discriminator' : netD.state_dict(),
            'optimizerG' : optimizerG.state_dict(),
            'optimizerD' : optimizerD.state_dict(),
            'params' : params
            }, 'lsgan/model_final.pth')
# Plot the per-iteration generator and discriminator losses and save the figure.
plt.figure(figsize=(10,5))
plt.title("Generator and Discriminator Loss During Training")
plt.plot(G_losses,label="G")
plt.plot(D_losses,label="D")
plt.xlabel("iterations")
plt.ylabel("Loss")
plt.legend()
plt.savefig('lsgan/train_plot_lsgan.png')

In [None]:
# Draw 100 latent vectors and generate one image per vector without tracking gradients.
noise = torch.randn(100, params['nz'], 1, 1, device=device)
with torch.no_grad():
    generated_img = netG(noise).detach().cpu()
# reshape (not transpose) reinterprets each (3, 64, 64) buffer as (64, 64, 3);
# this mirrors the reshape BitmojiDataset applies when it loads an image.
generated_img = generated_img.numpy().reshape(100,64,64,3)


# Lay the 100 images out on a 10x10 grid and save the figure.
fig = plt.figure(figsize=(200., 200.))
grid = ImageGrid(fig, 111,  # similar to subplot(111)
                 nrows_ncols=(10, 10),  # creates a 10x10 grid of axes
                 axes_pad=0.1,  # pad between axes in inch.
                 )

# Values outside [0, 1] are clipped before display.
for ax, im in zip(grid, [np.clip(im,0,1) for im in generated_img]):
    # Iterating over the grid returns the Axes.
    ax.imshow(im)

plt.savefig('lsgan/grid.png')

In [None]:
# Latent traversal: for 10 distinct random latent dimensions, sweep that single
# coordinate over 10 evenly spaced values in [0, 1] and generate an image for each value.
generated_images = []
dims = []
for i in range(10):
    noise = torch.randn(1,100, 1, 1).to(device)
    # Rejection-sample a latent index that has not been used yet.
    # NOTE(review): 100 is hard-coded here rather than read from params['nz'].
    r = np.random.randint(100)
    while(r in dims):
        r = np.random.randint(100)
    dims.append(r)
    # Alias, not a copy: the writes below also modify `noise` (which is re-drawn for every i).
    temp_noise = noise
    for j in np.linspace(0,1,10):
        temp_noise[0][r][0][0] = j
        generated_images.append(netG(temp_noise).cpu().detach().numpy())
# Same reshape convention as the sample grid: (3, 64, 64) buffers are viewed as (64, 64, 3).
generated_images = np.array(generated_images).reshape(100,64,64,3)

fig = plt.figure(figsize=(200., 200.))
grid = ImageGrid(fig, 111,  # similar to subplot(111)
                 nrows_ncols=(10, 10),  # creates a 10x10 grid of axes
                 axes_pad=0.1,  # pad between axes in inch.
                 )

for ax, im in zip(grid, [np.clip(im,0,1) for im in generated_images]):
    # Iterating over the grid returns the Axes.
    ax.imshow(im)

plt.savefig('lsgan/latent_grid.png')

<h1>BiGAN</h1>

Most things stay the same except that the discriminator accepts 4 channels now instead of 3. And we have a new encoder model.

<h3>Discriminator</h3>

In [None]:
# Define the Discriminator Network
class Discriminator(nn.Module):
    """BiGAN discriminator scoring an (image, code) pair.

    The code ``z`` is viewed as one extra 64x64 channel and concatenated to the
    image, so the first convolution takes nc + 1 input channels. Reads 'nc'
    and 'ndf' from ``params``.
    """

    def __init__(self, params):
        super().__init__()
        nc, ndf = params['nc'], params['ndf']

        # (nc+1) x 64 x 64 -> ndf x 32 x 32
        self.conv1 = nn.Conv2d(nc+1, ndf, 4, 2, 1, bias=False)

        # ndf x 32 x 32 -> (ndf*2) x 16 x 16
        self.conv2 = nn.Conv2d(ndf, ndf*2, 4, 2, 1, bias=False)
        self.bn2 = nn.BatchNorm2d(ndf*2)

        # (ndf*2) x 16 x 16 -> (ndf*4) x 8 x 8
        self.conv3 = nn.Conv2d(ndf*2, ndf*4, 4, 2, 1, bias=False)
        self.bn3 = nn.BatchNorm2d(ndf*4)

        # (ndf*4) x 8 x 8 -> (ndf*8) x 5 x 5 (stride 1, no padding)
        self.conv4 = nn.Conv2d(ndf*4, ndf*8, 4, 1, 0, bias=False)
        self.bn4 = nn.BatchNorm2d(ndf*8)

        # (ndf*8) x 5 x 5 -> 1 x 1 x 1
        self.conv5 = nn.Conv2d(ndf*8, 1, 4, 2, 0, bias=False)

    def forward(self, x, z):
        code_plane = z.view(x.shape[0], 1, 64, 64)
        x = torch.cat((x, code_plane), dim=1)

        x = F.leaky_relu(self.conv1(x), negative_slope=0.2, inplace=True)
        for conv, norm in ((self.conv2, self.bn2),
                           (self.conv3, self.bn3),
                           (self.conv4, self.bn4)):
            x = F.leaky_relu(norm(conv(x)), negative_slope=0.2, inplace=True)

        return torch.sigmoid(self.conv5(x))

<h3>Encoder</h3>

In [None]:
class Encoder(nn.Module):
    """BiGAN encoder: maps an (nc x 64 x 64) image to a non-negative code of shape 1 x 64 x 64.

    Reads 'nc' and 'ndf' from ``params``.
    """

    def __init__(self, params):
        super().__init__()
        nc, ndf = params['nc'], params['ndf']

        # nc x 64 x 64 -> ndf x 32 x 32
        self.conv1 = nn.Conv2d(nc, ndf, 4, 2, 1, bias=False)

        # ndf x 32 x 32 -> (ndf*2) x 16 x 16
        self.conv2 = nn.Conv2d(ndf, ndf*2, 4, 2, 1, bias=False)
        self.bn2 = nn.BatchNorm2d(ndf*2)

        # (ndf*2) x 16 x 16 -> (ndf*4) x 8 x 8
        self.conv3 = nn.Conv2d(ndf*2, ndf*4, 4, 2, 1, bias=False)
        self.bn3 = nn.BatchNorm2d(ndf*4)

        # (ndf*4) x 8 x 8 -> (ndf*8) x 5 x 5 (stride 1, no padding)
        self.conv4 = nn.Conv2d(ndf*4, ndf*8, 4, 1, 0, bias=False)
        self.bn4 = nn.BatchNorm2d(ndf*8)

        # (ndf*8) x 5 x 5 -> 1024 x 2 x 2, i.e. 4096 values per image
        self.conv5 = nn.Conv2d(ndf*8, 32*32, 4, 1, 0, bias=False)

    def forward(self, x):
        batch = x.shape[0]

        x = F.leaky_relu(self.conv1(x), negative_slope=0.2, inplace=True)
        for conv, norm in ((self.conv2, self.bn2),
                           (self.conv3, self.bn3),
                           (self.conv4, self.bn4)):
            x = F.leaky_relu(norm(conv(x)), negative_slope=0.2, inplace=True)

        code = F.relu(self.conv5(x))
        # the 4096 values are laid out as one 64x64 plane for the discriminator
        return code.view(batch, 1, 64, 64)

<h3>Hyperparameters</h3>
A few new hyperparameters are introduced.

In [None]:
# Seed Python's and PyTorch's RNGs. NOTE: NumPy's RNG is not seeded here.
seed = 369
random.seed(seed)
torch.manual_seed(seed)
print("Random Seed: ", seed)

params = {
    "bsize" : 1024,# Batch size during training.
    'imsize' : 128,# Nominal image size. NOTE(review): BitmojiDataset resizes to 64x64 and this key is not read in the cells shown here.
    'nc' : 3,# Number of channels in the training images. For coloured images this is 3.
    'nz' : 100,# Size of the Z latent vector (the input to the generator).
    'ngf' : 128,# Size of feature maps in the generator. The depth will be multiples of this.
    'ndf' : 128, # Size of features maps in the discriminator. The depth will be multiples of this.
    'nepochs' : 5,# Number of training epochs.
    'lrg':0.002,# Learning rate of the joint encoder/generator optimizer.
    'lrd' : 0.0002,# Learning rate of the discriminator optimizer.
    'beta1' : 0.5,# Beta1 hyperparam for Adam optimizer
    'save_epoch' : 2,# Save step.
    'num_classes':10,#number of classes
    'n_descriminator':5,# Discriminator-to-generator update ratio (read by the WGAN loop).
    'label_embeddings':100}#dimension of label embeddings

device = torch.device("cuda:0" if(torch.cuda.is_available()) else "cpu")
data = BitmojiDataset('bitmojis')
# Take the batch size from params (as the DCGAN cell does) so the two cannot drift apart.
dataloader = DataLoader(data, params['bsize'], shuffle=True)

In [None]:
# Build generator, encoder and discriminator on the target device and run weights_init on each.
netG = Generator(params).to(device)
netG.apply(weights_init)

netE = Encoder(params).to(device)
netE.apply(weights_init)

netD = Discriminator(params).to(device)
netD.apply(weights_init)

criterion = nn.BCELoss()

# Fixed latent batch, reused during training to snapshot the generator's progress.
fixed_noise = torch.randn(100, params['nz'],1,1, device=device)
real_label = 1
fake_label = 0

# The discriminator has its own optimizer; encoder and generator share one (optimizerG)
# that runs with the separate learning rate params['lrg'].
optimizerD = optim.Adam(netD.parameters(), lr=params['lrd'], betas=(params['beta1'], 0.999))
optimizerG = optim.Adam(list(netE.parameters()) + list(netG.parameters()), lr=params['lrg'], betas=(params['beta1'], 0.999))
#optimizerE = optim.Adam(netE.parameters(), lr=params['lr'], betas=(params['beta1'], 0.999))

# Training history. NOTE: E_losses is created here but not filled by the training loop below.
img_list = []
G_losses = []
D_losses = []
E_losses = []

In [None]:
iters = 0

print("Starting Training Loop...")
print("-"*25)

# Epoch numbering continues after a first run of params['nepochs'] epochs.
start_epoch = params['nepochs']
end_epoch = params['nepochs']*2

for epoch in range(start_epoch, end_epoch):
    for i, data in enumerate(dataloader, 0):
        real_data = data.float().to(device)
        b_size = real_data.size(0)

        noise = torch.randn(b_size, params['nz'],1,1, device=device)

        # ---- Discriminator step ----
        # Encoder and generator outputs are detached so this backward pass
        # only produces gradients for netD (no retain_graph needed).
        netD.zero_grad()
        label = torch.full((b_size,), real_label, device=device).float()
        real_z = netE(real_data).detach()
        output_real = netD(real_data, real_z).view(-1)
        errD_real = criterion(output_real, label)
        errD_real.backward()
        D_x = output_real.mean().item()

        fake_data = netG(noise)
        label.fill_(fake_label)
        fake_z = netE(fake_data.detach()).detach()
        output = netD(fake_data.detach(), fake_z).view(-1)
        errD_fake = criterion(output, label)
        errD_fake.backward()
        D_G_z1 = output.mean().item()

        errD = errD_real + errD_fake
        optimizerD.step()

        # ---- Encoder/generator step ----
        # optimizerG owns both netE and netG, so clear both. Clearing only netG
        # left stale discriminator-loss gradients in the encoder, which
        # optimizerG.step() then applied.
        optimizerG.zero_grad()
        label.fill_(real_label)
        output = netD(fake_data, netE(fake_data)).view(-1)
        errG = criterion(output, label)
        errG.backward()

        optimizerG.step()
        D_G_z2 = output.mean().item()
        if i%50 == 0:
            print(torch.cuda.is_available())
            print('[%d/%d][%d/%d]\tLoss_D: %.4f\tLoss_G: %.4f\tD(x): %.4f\tD(G(z)): %.4f / %.4f'
                    % (epoch+1, end_epoch, i, len(dataloader),
                        errD.item(), errG.item(), D_x, D_G_z1, D_G_z2))

        G_losses.append(errG.item())
        D_losses.append(errD.item())

        # Snapshot every 100 iterations and on the very last batch of the run.
        # The old test compared against params['nepochs']-1, which this epoch range never reaches.
        if (iters % 100 == 0) or ((epoch == end_epoch-1) and (i == len(dataloader)-1)):
            with torch.no_grad():
                fake_data = netG(fixed_noise).detach().cpu()
            img_list.append(vutils.make_grid(fake_data, padding=2, normalize=True))

        iters += 1

    if epoch % params['save_epoch'] == 0:
        torch.save({
            'generator' : netG.state_dict(),
            'encoder' : netE.state_dict(),  # previously missing from the checkpoint
            'discriminator' : netD.state_dict(),
            'optimizerG' : optimizerG.state_dict(),
            'optimizerD' : optimizerD.state_dict(),
            'params' : params
            }, 'bigan2/model_epoch_{}.pth'.format(epoch))

In [None]:
# Final BiGAN checkpoint. The encoder is trained together with the generator
# (both are in optimizerG), so its weights are saved as well; without them the
# optimizerG state cannot be matched to a complete model when reloading.
torch.save({
            'generator' : netG.state_dict(),
            'encoder' : netE.state_dict(),
            'discriminator' : netD.state_dict(),
            'optimizerG' : optimizerG.state_dict(),
            'optimizerD' : optimizerD.state_dict(),
            'params' : params
            }, 'bigan2/model_final.pth')

# Plot the per-iteration generator and discriminator losses and save the figure.
plt.figure(figsize=(10,5))
plt.title("Generator and Discriminator Loss During Training")
plt.plot(G_losses,label="G")
plt.plot(D_losses,label="D")
plt.xlabel("iterations")
plt.ylabel("Loss")
plt.legend()
plt.savefig('bigan2/training_plots.png')

In [None]:
# Draw 100 latent vectors and generate one image per vector without tracking gradients.
noise = torch.randn(100, params['nz'],1,1, device=device)
# NOTE: `labels` is not used anywhere in this cell.
labels = torch.randint(low = 0, high=10, size=(100,)).to(device)
with torch.no_grad():
    generated_img = netG(noise).detach().cpu()
# reshape (not transpose) reinterprets each (3, 64, 64) buffer as (64, 64, 3);
# this mirrors the reshape BitmojiDataset applies when it loads an image.
generated_img = generated_img.numpy().reshape(100,64,64,3)

# Lay the 100 images out on a 10x10 grid and save the figure.
fig = plt.figure(figsize=(200., 200.))
grid = ImageGrid(fig, 111,  # similar to subplot(111)
                 nrows_ncols=(10, 10),  # creates a 10x10 grid of axes
                 axes_pad=0.1,  # pad between axes in inch.
                 )

# Values outside [0, 1] are clipped before display.
for ax, im in zip(grid, [np.clip(im,0,1) for im in generated_img]):
    # Iterating over the grid returns the Axes.
    ax.imshow(im)
plt.savefig('bigan2/grid.png')

In [None]:
# Latent traversal: for 10 distinct random latent dimensions, sweep that single
# coordinate over 10 evenly spaced values in [0, 1] and generate an image for each value.
generated_images = []
dims = []
for i in range(10):
    noise = torch.randn(1,100, 1, 1).to(device)
    # Rejection-sample a latent index that has not been used yet.
    # NOTE(review): 100 is hard-coded here rather than read from params['nz'].
    r = np.random.randint(100)
    while(r in dims):
        r = np.random.randint(100)
    dims.append(r)
    # Alias, not a copy: the writes below also modify `noise` (which is re-drawn for every i).
    temp_noise = noise
    for j in np.linspace(0,1,10):
        temp_noise[0][r][0][0] = j
        generated_images.append(netG(temp_noise).cpu().detach().numpy())
# Same reshape convention as the sample grid: (3, 64, 64) buffers are viewed as (64, 64, 3).
generated_images = np.array(generated_images).reshape(100,64,64,3)

fig = plt.figure(figsize=(200., 200.))
grid = ImageGrid(fig, 111,  # similar to subplot(111)
                 nrows_ncols=(10, 10),  # creates a 10x10 grid of axes
                 axes_pad=0.1,  # pad between axes in inch.
                 )

for ax, im in zip(grid, [np.clip(im,0,1) for im in generated_images]):
    # Iterating over the grid returns the Axes.
    ax.imshow(im)

plt.savefig('bigan2/latent_grid.png')

<h1>WGAN</h1><h4>(EXTRA, using BitMoji dataset)</h4>


Everything stays the same except that the discriminator now has a tanh output activation and RMSprop is used as the optimizer.

<h3>Gradient penalty calculation function</h3>

In [None]:
def calculate_gradient_penalty(model, real_images, fake_images, device):
    """Calculates the gradient penalty loss for WGAN GP.

    The critic is evaluated at random points on the straight line between each
    real/fake pair, and the squared deviation of the input-gradient norm from 1
    is averaged over the batch.

    Args:
        model: critic; called with a batch of images and returns one score per sample.
        real_images: batch of real images.
        fake_images: batch of generated images with the same shape as ``real_images``.
        device: device on which the interpolation weights are created.

    Returns:
        Scalar tensor ``mean((||grad||_2 - 1) ** 2)``; it keeps its graph
        (``create_graph=True``) so it can be added to the critic loss.
    """
    # The penalty is a critic-only term: cut both inputs off from their producers.
    real_images = real_images.detach()
    fake_images = fake_images.detach()

    # Interpolation weights must be uniform in [0, 1] so every point lies on
    # the segment between the pair; normal samples (randn) fall outside it.
    alpha = torch.rand((real_images.size(0), 1, 1, 1), device=device)
    # Get random interpolation between real and fake data
    interpolates = (alpha * real_images + ((1 - alpha) * fake_images)).requires_grad_(True)

    model_interpolates = model(interpolates)
    grad_outputs = torch.ones(model_interpolates.size(), device=device, requires_grad=False)

    # Get gradient w.r.t. interpolates
    gradients = torch.autograd.grad(
        outputs=model_interpolates,
        inputs=interpolates,
        grad_outputs=grad_outputs,
        create_graph=True,
        retain_graph=True,
        only_inputs=True,
    )[0]
    # One flattened gradient vector per sample.
    gradients = gradients.reshape(gradients.size(0), -1)
    gradient_penalty = torch.mean((gradients.norm(2, dim=1) - 1) ** 2)
    return gradient_penalty

<h3>Discriminator</h3>

In [None]:
class Discriminator(nn.Module):
    """WGAN critic: same convolutional stack as the DCGAN discriminator, with a tanh output.

    Maps an (nc x 64 x 64) image to a score of shape 1 x 1 x 1 in [-1, 1].
    Reads 'nc' and 'ndf' from ``params``.
    """

    def __init__(self, params):
        super().__init__()
        nc, ndf = params['nc'], params['ndf']

        # nc x 64 x 64 -> ndf x 32 x 32 (no batch norm on the first layer)
        self.conv1 = nn.Conv2d(nc, ndf, 4, 2, 1, bias=False)

        # ndf x 32 x 32 -> (ndf*2) x 16 x 16
        self.conv2 = nn.Conv2d(ndf, ndf*2, 4, 2, 1, bias=False)
        self.bn2 = nn.BatchNorm2d(ndf*2)

        # (ndf*2) x 16 x 16 -> (ndf*4) x 8 x 8
        self.conv3 = nn.Conv2d(ndf*2, ndf*4, 4, 2, 1, bias=False)
        self.bn3 = nn.BatchNorm2d(ndf*4)

        # (ndf*4) x 8 x 8 -> (ndf*8) x 4 x 4
        self.conv4 = nn.Conv2d(ndf*4, ndf*8, 4, 2, 1, bias=False)
        self.bn4 = nn.BatchNorm2d(ndf*8)

        # (ndf*8) x 4 x 4 -> 1 x 1 x 1
        self.conv5 = nn.Conv2d(ndf*8, 1, 4, 1, 0, bias=False)

    def forward(self, x):
        x = F.leaky_relu(self.conv1(x), negative_slope=0.2, inplace=True)
        for conv, norm in ((self.conv2, self.bn2),
                           (self.conv3, self.bn3),
                           (self.conv4, self.bn4)):
            x = F.leaky_relu(norm(conv(x)), negative_slope=0.2, inplace=True)

        # tanh bounds the critic score to [-1, 1]
        return torch.tanh(self.conv5(x))

In [None]:
# Fresh generator and critic for the WGAN run, initialised via weights_init.
netG = Generator(params).to(device)
netG.apply(weights_init)

netD = Discriminator(params).to(device)
netD.apply(weights_init)

# NOTE: the WGAN training loop below computes its losses with torch.mean and never calls `criterion`.
criterion = nn.BCELoss()

fixed_noise = torch.randn(64, params['nz'], 1, 1, device=device)

real_label = 1
fake_label = 0

# NOTE(review): the section text says RMSprop is used, but Adam is created here — confirm which is intended.
# NOTE(review): params['lr'] exists in the DCGAN params dict but not in the BiGAN one
# ('lrg'/'lrd'); confirm which params dict is in scope when this cell runs.
optimizerD = optim.Adam(netD.parameters(), lr=params['lr'], betas=(params['beta1'], 0.999))
optimizerG = optim.Adam(netG.parameters(), lr=params['lr'], betas=(params['beta1'], 0.999))

# Training history: generator snapshots and per-iteration losses.
img_list = []
G_losses = []
D_losses = []

In [None]:
iters = 0

# Critic updates per generator update, and the weight-clipping bound.
# Neither key is guaranteed to be in `params` (no params dict shown above defines
# 'clip'), so fall back to the defaults from the WGAN paper (n_critic=5, c=0.01).
n_critic = params.get('n_descriminator', 5)
clip_value = params.get('clip', 0.01)

print("Starting Training Loop...")
print("-"*25)

for epoch in range(params['nepochs']):
    for i, data in enumerate(dataloader, 0):
        real_data = data.float().to(device)
        b_size = real_data.size(0)

        # ---- Critic step: maximise D(real) - D(fake), i.e. minimise the negative ----
        netD.zero_grad()
        output = netD(real_data).view(-1)
        errD_real = torch.mean(output)
        D_x = errD_real.item()

        noise = torch.randn(b_size, params['nz'], 1, 1, device=device)
        fake_data = netG(noise)
        # detach() keeps the critic's backward pass from reaching the generator
        output = netD(fake_data.detach()).view(-1)
        errD_fake = torch.mean(output)
        D_G_z1 = errD_fake.item()

        errD = -errD_real + errD_fake
        errD.backward()
        optimizerD.step()
        # Weight clipping keeps the critic's parameters inside [-clip_value, clip_value].
        for p in netD.parameters():
            p.data.clamp_(-clip_value, clip_value)

        # ---- Generator step: once every n_critic iterations ----
        # The schedule used to test `epoch`, so the generator was only trained
        # in epochs divisible by n_critic and sat idle in all the others.
        if iters % n_critic == 0:
            netG.zero_grad()
            output = netD(fake_data).view(-1)
            errG = -torch.mean(output)
            errG.backward()
            optimizerG.step()

            D_G_z2 = output.mean().item()

        # errG / D_G_z2 hold the values of the most recent generator update.
        if i%50 == 0:
            print(torch.cuda.is_available())
            print('[%d/%d][%d/%d]\tLoss_D: %.4f\tLoss_G: %.4f\tD(x): %.4f\tD(G(z)): %.4f / %.4f'
                  % (epoch, params['nepochs'], i, len(dataloader),
                     errD.item(), errG.item(), D_x, D_G_z1, D_G_z2))

        G_losses.append(errG.item())
        D_losses.append(errD.item())

        iters += 1

    # NOTE(review): epoch checkpoints go to 'WGan models/' while the final
    # checkpoint and figures use 'wgan/' — confirm both directories are intended.
    if epoch % params['save_epoch'] == 0:
        torch.save({
            'generator' : netG.state_dict(),
            'discriminator' : netD.state_dict(),
            'optimizerG' : optimizerG.state_dict(),
            'optimizerD' : optimizerD.state_dict(),
            'params' : params
            }, 'WGan models/model_epoch_{}.pth'.format(epoch))

In [None]:
# Final WGAN checkpoint: both networks, both optimizers and the hyperparameters.
# NOTE(review): this writes to 'wgan/' while the per-epoch checkpoints go to 'WGan models/'.
torch.save({
            'generator' : netG.state_dict(),
            'discriminator' : netD.state_dict(),
            'optimizerG' : optimizerG.state_dict(),
            'optimizerD' : optimizerD.state_dict(),
            'params' : params
            }, 'wgan/model_final.pth')

# Plot the per-iteration generator and critic losses and save the figure.
plt.figure(figsize=(10,5))
plt.title("Generator and Discriminator Loss During Training")
plt.plot(G_losses,label="G")
plt.plot(D_losses,label="D")
plt.xlabel("iterations")
plt.ylabel("Loss")
plt.legend()
plt.savefig('wgan/training_plots.png')

In [None]:
# Draw 100 latent vectors and generate one image per vector without tracking gradients.
noise = torch.randn(100, params['nz'],1,1, device=device)
# NOTE: `labels` is not used anywhere in this cell.
labels = torch.randint(low = 0, high=10, size=(100,)).to(device)
with torch.no_grad():
    generated_img = netG(noise).detach().cpu()
# reshape (not transpose) reinterprets each (3, 64, 64) buffer as (64, 64, 3);
# this mirrors the reshape BitmojiDataset applies when it loads an image.
generated_img = generated_img.numpy().reshape(100,64,64,3)

# Lay the 100 images out on a 10x10 grid and save the figure.
fig = plt.figure(figsize=(200., 200.))
grid = ImageGrid(fig, 111,  # similar to subplot(111)
                 nrows_ncols=(10, 10),  # creates a 10x10 grid of axes
                 axes_pad=0.1,  # pad between axes in inch.
                 )

# Values outside [0, 1] are clipped before display.
for ax, im in zip(grid, [np.clip(im,0,1) for im in generated_img]):
    # Iterating over the grid returns the Axes.
    ax.imshow(im)
plt.savefig('wgan/grid.png')

In [None]:
# Latent traversal: for 10 distinct random latent dimensions, sweep that single
# coordinate over 10 evenly spaced values in [0, 1] and generate an image for each value.
generated_images = []
dims = []
for i in range(10):
    noise = torch.randn(1,100, 1, 1).to(device)
    # Rejection-sample a latent index that has not been used yet.
    # NOTE(review): 100 is hard-coded here rather than read from params['nz'].
    r = np.random.randint(100)
    while(r in dims):
        r = np.random.randint(100)
    dims.append(r)
    # Alias, not a copy: the writes below also modify `noise` (which is re-drawn for every i).
    temp_noise = noise
    for j in np.linspace(0,1,10):
        temp_noise[0][r][0][0] = j
        generated_images.append(netG(temp_noise).cpu().detach().numpy())
# Same reshape convention as the sample grid: (3, 64, 64) buffers are viewed as (64, 64, 3).
generated_images = np.array(generated_images).reshape(100,64,64,3)

fig = plt.figure(figsize=(200., 200.))
grid = ImageGrid(fig, 111,  # similar to subplot(111)
                 nrows_ncols=(10, 10),  # creates a 10x10 grid of axes
                 axes_pad=0.1,  # pad between axes in inch.
                 )

for ax, im in zip(grid, [np.clip(im,0,1) for im in generated_images]):
    # Iterating over the grid returns the Axes.
    ax.imshow(im)

plt.savefig('wgan/latent_grid.png')

<h1>Conditional DCGAN</h1><h4>(Extra, didn't get perceptually good results)</h4>

<h3>DataLoader</h3>

In [None]:
class svnhDataLoader(Dataset):
    """Dataset over the concatenated SVHN train and test splits.

    Loads ``X_train.npy``, ``y_train.npy``, ``X_test.npy`` and ``y_test.npy``
    from ``root_dir`` and exposes ``(image, label)`` pairs, where ``image`` is
    a channel-first array.

    The image arrays are concatenated along axis 3, i.e. the last axis indexes
    samples. They are assumed to be laid out as (H, W, C, N), the layout of the
    original SVHN files -- TODO confirm for these ``.npy`` exports.

    NOTE(review): pixel values and labels are returned exactly as stored; no
    scaling or label remapping is applied here.

    Args:
        root_dir: Directory containing the four ``.npy`` files.
    """

    def __init__(self, root_dir):
        self.root_dir = root_dir
        X_train = np.load(os.path.join(root_dir, 'X_train.npy'))
        y_train = np.load(os.path.join(root_dir, 'y_train.npy'))
        X_test = np.load(os.path.join(root_dir, 'X_test.npy'))
        y_test = np.load(os.path.join(root_dir, 'y_test.npy'))
        stacked = np.concatenate([X_train, X_test], axis=3)
        # Move the sample axis first and the channel axis second:
        # (H, W, C, N) -> (N, C, H, W). A reshape would keep the memory order
        # and mix pixels from different images and channels; it also required
        # a hard-coded sample count.
        self.X = np.ascontiguousarray(stacked.transpose(3, 2, 0, 1))
        # np.append flattens both label arrays into a single 1-D vector.
        self.y = np.append(y_train, y_test)

    def __len__(self):
        """Return the total number of images (train + test)."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return the ``(image, label)`` pair at position ``idx``."""
        return self.X[idx], self.y[idx]

<h3>Generator</h3>

In [None]:
class Generator(nn.Module):
    """Label-conditioned DCGAN generator producing ``nc`` x 32 x 32 images.

    The class label is embedded, concatenated with the noise vector, and the
    result is upsampled by four transposed convolutions (1 -> 4 -> 8 -> 16 -> 32).

    Args:
        params: Dict providing ``num_classes``, ``label_embeddings``, ``nz``,
            ``ngf`` and ``nc``.
    """

    def __init__(self, params):
        super().__init__()

        # Channel count of the concatenated [noise, label embedding] input.
        # Stored on the instance so forward() does not depend on a
        # module-level `params` variable.
        self.in_channels = params['nz'] + params['label_embeddings']

        self.embed = nn.Embedding(params['num_classes'], params['label_embeddings'])

        # Input: (nz + label_embeddings) x 1 x 1  ->  (ngf*8) x 4 x 4
        self.tconv1 = nn.ConvTranspose2d(self.in_channels, params['ngf']*8,
            kernel_size=4, stride=1, padding=0, bias=False)
        self.bn1 = nn.BatchNorm2d(params['ngf']*8)

        # (ngf*8) x 4 x 4  ->  (ngf*4) x 8 x 8
        self.tconv2 = nn.ConvTranspose2d(params['ngf']*8, params['ngf']*4,
            4, 2, 1, bias=False)
        self.bn2 = nn.BatchNorm2d(params['ngf']*4)

        # (ngf*4) x 8 x 8  ->  (ngf*2) x 16 x 16
        self.tconv3 = nn.ConvTranspose2d(params['ngf']*4, params['ngf']*2,
            4, 2, 1, bias=False)
        self.bn3 = nn.BatchNorm2d(params['ngf']*2)

        # (ngf*2) x 16 x 16  ->  (nc) x 32 x 32 (final layer, no batch norm)
        self.tconv4 = nn.ConvTranspose2d(params['ngf']*2, params['nc'],
            4, 2, 1, bias=False)

    def forward(self, z, y):
        """Generate images from noise ``z`` conditioned on labels ``y``.

        Args:
            z: Noise tensor of shape (batch, nz).
            y: Class indices of shape (batch,); cast to ``long`` before lookup.

        Returns:
            Tensor of shape (batch, nc, 32, 32) with values in [-1, 1] (tanh).
        """
        y = self.embed(y.long())
        z = torch.cat([z, y], 1)
        z = z.view(-1, self.in_channels, 1, 1)
        x = F.relu(self.bn1(self.tconv1(z)))
        x = F.relu(self.bn2(self.tconv2(x)))
        x = F.relu(self.bn3(self.tconv3(x)))

        x = torch.tanh(self.tconv4(x))

        return x

<h3>Discriminator</h3>

In [None]:
class Discriminator(nn.Module):
    """Label-conditioned DCGAN discriminator for 32 x 32 images.

    The label is embedded into a 32*32 vector, viewed as one extra 32 x 32
    input plane and concatenated with the image channels. Three stride-2
    convolutions then halve the spatial size (32 -> 16 -> 8 -> 4) and a final
    4 x 4 convolution reduces it to a single value per sample.

    Args:
        params: Dict providing ``nc``, ``ndf`` and ``num_classes``.
    """

    def __init__(self, params):
        super().__init__()
        ndf = params['ndf']

        # (nc + 1) x 32 x 32  ->  (ndf) x 16 x 16; the +1 is the label plane.
        self.conv1 = nn.Conv2d(params['nc'] + 1, ndf,
                               kernel_size=4, stride=2, padding=1, bias=False)

        # (ndf) x 16 x 16  ->  (ndf*2) x 8 x 8
        self.conv2 = nn.Conv2d(ndf, ndf * 2,
                               kernel_size=4, stride=2, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(ndf * 2)

        # (ndf*2) x 8 x 8  ->  (ndf*4) x 4 x 4
        self.conv3 = nn.Conv2d(ndf * 2, ndf * 4,
                               kernel_size=4, stride=2, padding=1, bias=False)
        self.bn3 = nn.BatchNorm2d(ndf * 4)

        # (ndf*4) x 4 x 4  ->  1 x 1 x 1
        self.conv4 = nn.Conv2d(ndf * 4, 1,
                               kernel_size=4, stride=1, padding=0, bias=False)

        # One learned 32 x 32 plane per class.
        self.embed = nn.Embedding(params['num_classes'], 32 * 32)

    def forward(self, x, labels):
        """Score images ``x`` given their class ``labels``.

        Args:
            x: Image batch of shape (batch, nc, 32, 32).
            labels: Class indices of shape (batch,); cast to ``long``.

        Returns:
            Tensor of shape (batch, 1, 1, 1) with sigmoid outputs.
        """
        label_plane = self.embed(labels.long()).view(labels.shape[0], 1, 32, 32)
        h = torch.cat([x, label_plane], dim=1)
        h = F.leaky_relu(self.conv1(h), negative_slope=0.2, inplace=True)
        h = F.leaky_relu(self.bn2(self.conv2(h)), negative_slope=0.2, inplace=True)
        h = F.leaky_relu(self.bn3(self.conv3(h)), negative_slope=0.2, inplace=True)
        return torch.sigmoid(self.conv4(h))

In [None]:
# Fix the Python and PyTorch RNG seeds for this experiment.
seed = 369
random.seed(seed)
torch.manual_seed(seed)
print("Random Seed: ", seed)

# Hyperparameters for the conditional DCGAN.
params = {
    "bsize" : 512,# Batch size during training.
    # NOTE(review): 'imsize' is not read anywhere in this section and no resizing
    # is applied; the models in this section operate on 32x32 images.
    'imsize' : 128,# Spatial size of training images.
    'nc' : 3,# Number of channels in the training images. For coloured images this is 3.
    'nz' : 100,# Size of the Z latent vector (the input to the generator).
    'ngf' : 128,# Size of feature maps in the generator. The depth will be multiples of this.
    'ndf' : 128, # Size of features maps in the discriminator. The depth will be multiples of this.
    'nepochs' : 25,# Number of training epochs.
    'lrg' : 0.002,#Learning rate for generator
    'lrd' : 0.00002,#Learning rate for discriminator
    'beta1' : 0.5,# Beta1 hyperparam for Adam optimizer
    'save_epoch' : 2,# Save step.
    'num_classes':10,#number of classes
    'n_descriminator':5,
    'label_embeddings':50}#dimension of label embeddings

device = torch.device("cuda:0" if(torch.cuda.is_available()) else "cpu")

data = svnhDataLoader('svnh')
# Take the batch size from params so the configured value is the one actually used
# (it was previously hard-coded to 512 here as well).
dataloader = DataLoader(data, batch_size=params['bsize'], shuffle=True)

In [None]:
# Build both networks on the target device and apply the custom weight init.
netG = Generator(params).to(device)
netG.apply(weights_init)

netD = Discriminator(params).to(device)
netD.apply(weights_init)

# Binary cross-entropy on the discriminator's sigmoid outputs.
criterion = nn.BCELoss()

# Fixed noise/labels reused during training to snapshot generator progress.
fixed_noise = torch.randn(100, params['nz'], device=device)
# Class count comes from params instead of a hard-coded 10.
fixed_labels = torch.randint(low=0, high=params['num_classes'], size=(100,), device=device)
real_label = 1
fake_label = 0

optimizerD = optim.Adam(netD.parameters(), lr=params['lrd'], betas=(params['beta1'], 0.999))
optimizerG = optim.Adam(netG.parameters(), lr=params['lrg'], betas=(params['beta1'], 0.999))

# Histories filled by the training loop.
img_list = []
G_losses = []
D_losses = []

In [None]:
iters = 0

# torch.save does not create missing directories, so make sure the checkpoint
# folder exists before the first save.
os.makedirs('cDCGAN outputs', exist_ok=True)

print("Starting Training Loop...")
print("-"*25)

# NOTE(review): the generator ends in tanh (outputs in [-1, 1]) while real images are
# fed to the discriminator exactly as the dataset returns them -- confirm that the
# stored pixel values are on a comparable scale.
for epoch in range(params['nepochs']):
    # `batch` (not `data`) so the dataset object bound to `data` is not overwritten.
    for i, batch in enumerate(dataloader):
        real_data = batch[0].float().to(device)
        # Both networks cast labels to long before the embedding lookup.
        labels = batch[1].long().to(device)
        b_size = real_data.size(0)

        # ---- Discriminator update: real batch ----
        netD.zero_grad()
        label = torch.full((b_size,), real_label, device=device).float()
        output = netD(real_data, labels).view(-1)
        errD_real = criterion(output, label)
        errD_real.backward()
        D_x = output.mean().item()

        # ---- Discriminator update: fake batch (same labels as the real batch) ----
        noise = torch.randn(b_size, params['nz'], device=device)
        fake_data = netG(noise, labels)
        label.fill_(fake_label)
        # detach() keeps this backward pass from reaching the generator.
        output = netD(fake_data.detach(), labels).view(-1)
        errD_fake = criterion(output, label)
        errD_fake.backward()
        D_G_z1 = output.mean().item()

        errD = errD_real + errD_fake
        optimizerD.step()

        # ---- Generator update: try to make the discriminator output "real" ----
        netG.zero_grad()
        label.fill_(real_label)
        output = netD(fake_data, labels).view(-1)
        errG = criterion(output, label)
        errG.backward()

        D_G_z2 = output.mean().item()
        optimizerG.step()

        if i%50 == 0:
            print('[%d/%d][%d/%d]\tLoss_D: %.4f\tLoss_G: %.4f\tD(x): %.4f\tD(G(z)): %.4f / %.4f'
                    % (epoch, params['nepochs'], i, len(dataloader),
                        errD.item(), errG.item(), D_x, D_G_z1, D_G_z2))

        G_losses.append(errG.item())
        D_losses.append(errD.item())

        # Snapshot the generator on the fixed inputs every 100 iterations and at the very end.
        if (iters % 100 == 0) or ((epoch == params['nepochs']-1) and (i == len(dataloader)-1)):
            with torch.no_grad():
                fake_data = netG(fixed_noise, fixed_labels).detach().cpu()
            img_list.append(vutils.make_grid(fake_data, padding=2, normalize=True))

        iters += 1

    # Periodic checkpoint of both networks, both optimizers and the hyperparameters.
    if epoch % params['save_epoch'] == 0:
        torch.save({
            'generator' : netG.state_dict(),
            'discriminator' : netD.state_dict(),
            'optimizerG' : optimizerG.state_dict(),
            'optimizerD' : optimizerD.state_dict(),
            'params' : params
            }, 'cDCGAN outputs/model_epoch_{}.pth'.format(epoch))

In [None]:
# Final checkpoint: weights of both networks, both optimizer states and the
# hyperparameters used for the run.
_final_state = {
    'generator': netG.state_dict(),
    'discriminator': netD.state_dict(),
    'optimizerG': optimizerG.state_dict(),
    'optimizerD': optimizerD.state_dict(),
    'params': params,
}
torch.save(_final_state, 'cDCGAN outputs/model_final.pth')

# Plot the recorded generator and discriminator losses and save the figure.
plt.figure(figsize=(10, 5))
for _curve, _tag in ((G_losses, "G"), (D_losses, "D")):
    plt.plot(_curve, label=_tag)
plt.title("Generator and Discriminator Loss During Training")
plt.xlabel("iterations")
plt.ylabel("Loss")
plt.legend()
plt.savefig('cDCGAN outputs/training_plots.png')

In [None]:
# Sample 100 (noise, label) pairs and show the generated images as a 10x10 grid.
noise = torch.randn(100, params['nz'], device=device)
# Class count comes from params instead of a hard-coded 10.
labels = torch.randint(low=0, high=params['num_classes'], size=(100,)).to(device)

with torch.no_grad():
    generated_img = netG(noise, labels).detach().cpu()

# The generator returns channel-first images (N, C, 32, 32); imshow needs
# channel-last. A plain reshape to (100, 32, 32, 3) would reinterpret memory
# and scramble the pixels, so the channel axis is moved explicitly.
generated_img = generated_img.permute(0, 2, 3, 1).numpy()

fig = plt.figure(figsize=(200., 200.))
grid = ImageGrid(fig, 111,  # similar to subplot(111)
                 nrows_ncols=(10, 10),  # creates a 10x10 grid of axes
                 axes_pad=0.1,  # pad between axes in inch.
                 )

# NOTE(review): the generator's tanh output lies in [-1, 1]; clipping to [0, 1]
# drops the negative half. Confirm whether rescaling is intended instead.
for ax, im in zip(grid, [np.clip(im, 0, 1) for im in generated_img]):
    # Iterating over the grid returns the Axes.
    ax.imshow(im)

# Save before show(): once show() returns, the figure may already be closed,
# in which case savefig would write an empty image.
plt.savefig('cDCGAN outputs/grid.png')
plt.show()