In [5]:
# we will train a conditional wgan on svhn dataset
# we will use the gradient penalty to stabilize the training


In [6]:

model_name = "wgan4"
#check model saving path is there

# imports

In [7]:

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.datasets as datasets
import torchvision.transforms as transforms
import torchvision.transforms as transformations
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter

ModuleNotFoundError: No module named 'torch'

In [None]:
import PIL.Image as Image
import torchvision.models as models
#numpy
import numpy as np

In [None]:
import tqdm
from ignite.metrics.gan import FID

# hyperparameters for the dataset and the MOdel

In [None]:
# Hyperparameters etc.
device = "cuda:1" if torch.cuda.is_available() else "cpu"
LEARNING_RATE = 1e-4
BATCH_SIZE = 3200
IMAGE_SIZE = 64
CHANNELS_IMG = 3
Z_DIM = 100
NUM_EPOCHS = 1
FEATURES_CRITIC = 16
FEATURES_GEN = 16
CRITIC_ITERATIONS = 5
LAMBDA_GP = 10

# prepare the dataset

### we will use SVHN dataset for this example
### we will combine the train, test and extra datasets to make a bigger dataset

In [None]:
transforms = transforms.Compose(
    [
        transforms.Resize(IMAGE_SIZE),
        transforms.ToTensor(),
        transforms.Normalize(
            [0.5 for _ in range(CHANNELS_IMG)], [0.5 for _ in range(CHANNELS_IMG)]),
    ]
)

In [None]:
#get the dataset
#train part of svhn
train_dataset = datasets.SVHN(root="dataset_svhm/", split='train', transform=transforms, download=True)
#test part of svhn
test_dataset = datasets.SVHN(root="dataset_svhm/", split='test', transform=transforms, download=True)
#extra part of svhn
extra_dataset = datasets.SVHN(root="dataset_svhm/", split='extra', transform=transforms, download=True)
#concatenate the train, test and extra dataset
dataset = torch.utils.data.ConcatDataset([train_dataset, test_dataset, extra_dataset])
loader = DataLoader(
    dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
)


Using downloaded and verified file: dataset_svhm/train_32x32.mat
Using downloaded and verified file: dataset_svhm/test_32x32.mat
Using downloaded and verified file: dataset_svhm/extra_32x32.mat


In [None]:
#print the total number of images in the dataset
print(len(dataset))
# print the shape of the images
print(dataset[0][0].shape)
# print the label of the image
print(dataset[0][1])

630420
torch.Size([3, 64, 64])
1


# Model


## generator

In [None]:
class Generator(nn.Module):
    def __init__(self, channels_noise, channels_img, features_g):
        super(Generator, self).__init__()
        self.net = nn.Sequential(
            # Input: N x channels_noise x 1 x 1
            self.block_architecture_generator(channels_noise, features_g * 16, 4, 1, 0),  # img: 4x4
            self.block_architecture_generator(features_g * 16, features_g * 8, 4, 2, 1),  # img: 8x8
            self.block_architecture_generator(features_g * 8, features_g * 4, 4, 2, 1),  # img: 16x16
            self.block_architecture_generator(features_g * 4, features_g * 2, 4, 2, 1),  # img: 32x32
            nn.ConvTranspose2d(
                features_g * 2, channels_img, kernel_size=4, stride=2, padding=1
            ),
            # Output: N x channels_img x 64 x 64
            nn.Tanh(),
        )

    def block_architecture_generator(self, in_channels, out_channels, kernel_size, stride, padding):
        return nn.Sequential(
            nn.ConvTranspose2d(
                in_channels, out_channels, kernel_size, stride, padding, bias=False,
            ),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
        )

    def forward(self, x):
        return self.net(x)

In [None]:
class Discriminator(nn.Module):
    def __init__(self, channels_img, features_d):
        super(Discriminator, self).__init__()
        self.disc = nn.Sequential(
            # input: N x channels_img x 64 x 64
            nn.Conv2d(channels_img, features_d, kernel_size=4, stride=2, padding=1),
            nn.LeakyReLU(0.2),
            # _block(in_channels, out_channels, kernel_size, stride, padding)
            self.block_architecture_critic(features_d, features_d * 2, 4, 2, 1),
            self.block_architecture_critic(features_d * 2, features_d * 4, 4, 2, 1),
            self.block_architecture_critic(features_d * 4, features_d * 8, 4, 2, 1),
            # After all _block img output is 4x4 (Conv2d below makes into 1x1)
            nn.Conv2d(features_d * 8, 1, kernel_size=4, stride=2, padding=0),
        )

    def block_architecture_critic(self, in_channels, out_channels, kernel_size, stride, padding):
        return nn.Sequential(
            nn.Conv2d(
                in_channels, out_channels, kernel_size, stride, padding, bias=False,
            ),
            nn.InstanceNorm2d(out_channels, affine=True),
            nn.LeakyReLU(0.2),
        )

    def forward(self, x):
        return self.disc(x)

--------------------------------

# model initialization

In [None]:

def initialize_weights(model):
    # Initializes weights according to the DCGAN paper
    for m in model.modules():
        if isinstance(m, (nn.Conv2d, nn.ConvTranspose2d, nn.BatchNorm2d)):
            nn.init.normal_(m.weight.data, 0.0, 0.02)



In [None]:
# initialize gen and critic
gen = Generator(Z_DIM, CHANNELS_IMG, FEATURES_GEN).to(device)
critic = Discriminator(CHANNELS_IMG, FEATURES_CRITIC).to(device)
initialize_weights(gen)
initialize_weights(critic)

### initialize optimizer

In [None]:
# initializate optimizer
opt_gen = optim.Adam(gen.parameters(), lr=LEARNING_RATE, betas=(0.0, 0.9))
opt_critic = optim.Adam(critic.parameters(), lr=LEARNING_RATE, betas=(0.0, 0.9))

### inintialize tensorboard

In [None]:
# for tensorboard plotting
fixed_noise = torch.randn(100, Z_DIM, 1, 1).to(device)
#plot loss of generator and critic
writer_loss = SummaryWriter(f"runs/"+model_name+"/loss")
writer_real = SummaryWriter(f"logs/"+model_name+"/real")
writer_fake = SummaryWriter(f"logs/"+model_name+"/fake")

2022-10-03 11:42:08.493929: I tensorflow/core/util/util.cc:169] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.


--------------------------------

### initialize FID wrapper

In [None]:
fid_score = FID()

In [None]:
#interpolate function to resize images to 299,299,3  which is the input size of inception network
def interpolate(batch):
    arr = []
    for img in batch:
        pil_img = transformations.ToPILImage()(img)
        resized_img = pil_img.resize((299,299), Image.BILINEAR)
        arr.append(transformations.ToTensor()(resized_img))
    return torch.stack(arr)

## start training

In [None]:


gen.train()
critic.train()

Discriminator(
  (disc): Sequential(
    (0): Conv2d(3, 16, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1))
    (1): LeakyReLU(negative_slope=0.2)
    (2): Sequential(
      (0): Conv2d(16, 32, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
      (1): InstanceNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=False)
      (2): LeakyReLU(negative_slope=0.2)
    )
    (3): Sequential(
      (0): Conv2d(32, 64, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
      (1): InstanceNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=False)
      (2): LeakyReLU(negative_slope=0.2)
    )
    (4): Sequential(
      (0): Conv2d(64, 128, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
      (1): InstanceNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=False)
      (2): LeakyReLU(negative_slope=0.2)
    )
    (5): Conv2d(128, 1, kernel_size=(4, 4), stride=(2, 2))
  )
)

#### gradient penalty function for WGAN-GP

In [None]:
def gradient_penalty(critic, real, fake, device="cpu"):
    BATCH_SIZE, C, H, W = real.shape
    alpha = torch.rand((BATCH_SIZE, 1, 1, 1)).repeat(1, C, H, W).to(device)
    interpolated_images = real * alpha + fake * (1 - alpha)

    # Calculate critic scores
    mixed_scores = critic(interpolated_images)

    # Take the gradient of the scores with respect to the images
    gradient = torch.autograd.grad(
        inputs=interpolated_images,
        outputs=mixed_scores,
        grad_outputs=torch.ones_like(mixed_scores),
        create_graph=True,
        retain_graph=True,
    )[0]
    gradient = gradient.view(gradient.shape[0], -1)
    gradient_norm = gradient.norm(2, dim=1)
    gradient_penalty = torch.mean((gradient_norm - 1) ** 2)
    return gradient_penalty

In [None]:
step = 0

for epoch in range(NUM_EPOCHS):
    #we will track the total loss of the generator and critic for each epoch over the entire dataset
    #initialize the total loss of the generator and critic for each epoch to 0
    total_loss_gen = 0
    total_loss_critic = 0
    
    #move these to device

    #have no gradient for these losse
    # total_loss_gen = torch.no_grad()
    # total_loss_critic = torch.no_grad()
    
    # Target labels not needed! <3 unsupervised
    for batch_idx, (real, _) in enumerate(loader):
        batch_step = 0
        real = real.to(device)
        cur_batch_size = real.shape[0]

        # Train Critic: max E[critic(real)] - E[critic(fake)]
        # equivalent to minimizing the negative of that
        for _ in range(CRITIC_ITERATIONS):
            noise = torch.randn(cur_batch_size, Z_DIM, 1, 1).to(device)
            fake = gen(noise)
            critic_real = critic(real).reshape(-1)
            critic_fake = critic(fake).reshape(-1)
            gp = gradient_penalty(critic, real, fake, device=device)
            loss_critic = (
                -(torch.mean(critic_real) - torch.mean(critic_fake)) + LAMBDA_GP * gp
            )
            
            
            critic.zero_grad()
            loss_critic.backward(retain_graph=True)
            opt_critic.step()
            
        #trained critic
        

        # Train Generator: max E[critic(gen_fake)] <-> min -E[critic(gen_fake)]
        gen_fake = critic(fake).reshape(-1)
        loss_gen = -torch.mean(gen_fake)
        gen.zero_grad()
        loss_gen.backward()
        opt_gen.step()
        
        
        
        
        
        #update the total loss of the generator and critic for each batch in the epoch
        #add just the value no gradients
        #have no gradient for these losse
        #add just the value no gradientsfrom the loss_gen tensor
      
        with torch.no_grad():
            total_loss_gen += loss_gen.item()
            total_loss_critic += loss_critic.item()
        
        # Print losses occasionally and print to tensorboard in a batch
        if batch_idx % 10 == 0 and batch_idx > 0:
            print(
                f"Epoch [{epoch}/{NUM_EPOCHS}] Batch {batch_idx}/{len(loader)} \
                  Loss D: {loss_critic:.4f}, loss G: {loss_gen:.4f}"
            )
            
            with torch.no_grad():
                
                #AVERAGE LOSS---
        
                #get average loss of generator and critic for each epoch
                avg_loss_gen = total_loss_gen / len(loader)
                avg_loss_critic = total_loss_critic / len(loader)
                #write loss to tensorboard
                writer_loss.add_scalar("Generator loss Epoch", avg_loss_gen, global_step=batch_step)
                writer_loss.add_scalar("Critic loss Epoch", avg_loss_critic, global_step=batch_step)
                
                #AVERAGE LOSS----
                
                #FID--
                #calculate FID score of this batch
                #update the fid_score with real and fake images
                fid_score.update(interpolate(real), interpolate(fake))
                computed_fid_score = fid_score.compute()
                print("FID score: ", computed_fid_score)
                writer_loss.add_scalar("FID Score", computed_fid_score, global_step=batch_step)
                #reset the fid score
                fid_score.reset()
                ##FID--
                
                batch_step += 1
            
            
        
        

        # Print losses occasionally and print to tensorboard per epoch
        # if batch_idx % 100 == 0 and batch_idx > 0:
        
            ## PRINT for few epochs %10
            # print(
            #     f"Epoch [{epoch}/{NUM_EPOCHS}]  \
            #         Loss D: {loss_critic:.4f}, loss G: {loss_gen:.4f}"
            # )
            
    #calculate the Fréchet Inception Distance (FID) to evaluate the performance of the generator
    

    with torch.no_grad():
        fake = gen(fixed_noise)
        # take out (up to) 32 examples
        img_grid_real = torchvision.utils.make_grid(real[:100], normalize=True)
        img_grid_fake = torchvision.utils.make_grid(fake[:100], normalize=True)

        writer_real.add_image("Real", img_grid_real, global_step=step)
        writer_fake.add_image("Fake", img_grid_fake, global_step=step)
        
        
        
        
        #BATCH LOSS-----
        
        #write gen_loss and critic_loss to tensorboard
        writer_loss.add_scalar("Generator loss Batch", loss_gen, global_step=step)
        writer_loss.add_scalar("Critic loss Batch", loss_critic, global_step=step)
        
        #BATCH LOSS-------
        
        #we will plot the gradient of critic output with respect to the input image
        #get the gradient of the critic output with respect to the input image
        gradient = torch.autograd.grad(
        inputs=real,
        outputs=critic_real,
        grad_outputs=torch.ones_like(critic_real),
        create_graph=True,
        retain_graph=True,
        )[0]
        #flatten the gradient
        gradient = gradient.view(gradient.shape[0], -1)
        #get the norm of the gradient
        gradient_norm = gradient.norm(2, dim=1)
        #write gradient norm to tensorboard
        writer_loss.add_scalar("Gradient norm Critic Real", gradient_norm.mean(), global_step=step)
        
        #----------------
        #we will plot the gradient of critic output with respect to the input image
        #get the gradient of the critic output with respect to the input image
        gradient = torch.autograd.grad(
        inputs=fake,
        outputs=critic_fake,
        grad_outputs=torch.ones_like(critic_fake),
        create_graph=True,
        retain_graph=True,
        )[0]
        #flatten the gradient
        gradient = gradient.view(gradient.shape[0], -1)
        #get the norm of the gradient
        gradient_norm = gradient.norm(2, dim=1)
        #write gradient norm to tensorboard
        writer_loss.add_scalar("Gradient norm Critic Fake", gradient_norm.mean(), global_step=step)
        
        #----------------
        #we will plot the gradient of genrator output with respect to the input 
        #we will plot the gradient of genrator output with respect to the input 
        #get the gradient of the generator output with respect to the input noise
        gradient = torch.autograd.grad(
        inputs=noise,
        outputs=gen_fake,
        grad_outputs=torch.ones_like(gen_fake),
        create_graph=True,
        retain_graph=True,
        )[0]
        #flatten the gradient
        gradient = gradient.view(gradient.shape[0], -1)
        #get the norm of the gradient
        gradient_norm = gradient.norm(2, dim=1)
        #write gradient norm to tensorboard
        writer_loss.add_scalar("Gradient norm Generator", gradient_norm.mean(), global_step=step)
        
        #----------------

        
        
        # we will plot the gradient penalty
        writer_loss.add_scalar("GP", gp, global_step=step)
        #we will analyze for vanishing gradient
        writer_loss.add_scalar("Critic Real", critic_real.mean(), global_step=step)
        writer_loss.add_scalar("Critic Fake", critic_fake.mean(), global_step=step)
        
        #get the gradient of the critic for the parameters weights of first layer
        #we will write the norm of the gardient of weights of the first layer of the critic
        for name, param in critic.named_parameters():
            if name == "disc.0.weight":
                writer_loss.add_scalar("Critic Gradient w.r.t 1st layer", param.grad.norm(), global_step=step)
            #also plot the norm of gradient of 2nd layer
            elif name == "disc.2.0.weight":
                writer_loss.add_scalar("Critic Gradient w.r.t 2nd layer", param.grad.norm(), global_step=step)
        
       
        
        
        

    step += 1
        
        #save the trained model
        #check if trained_model folder exists
    if not os.path.exists("trained_models"):
        os.mkdir("trained_models")
    
    #now trained_model folder exists
    if not os.path.exists("trained_models/"+model_name):
        os.mkdir("trained_models/"+model_name)
    #check if "trained_models/"+model_name     
    torch.save(gen.state_dict(), "trained_models/"+model_name+"/gen.pth")
    torch.save(critic.state_dict(), "trained_models/"+model_name+"/critic.pth")
    
    
    


NameError: name 'NUM_EPOCHS' is not defined

In [None]:
#save the tensorboard
writer_real.close()
writer_fake.close()
writer_loss.close()

In [None]:
#calculate the Fréchet Inception Distance (FID) to evaluate the performance of the generator

#(https://github.com/mseitzer/pytorch-fid)
#imports for FID


ModuleNotFoundError: No module named 'tensorflow'