###**Generative Adversarial Networks <br>**

(Riya Arora)


---

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [18]:
import torch
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import torch.nn.functional as F
import torch.nn as nn
import pandas as pd
import os
import kagglehub

In [3]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda





---


##### **Part A: Implement standard GAN on MNIST data.**

**Dataset Preparation**

In [4]:
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

train_dataset = datasets.MNIST(
    root="./data", train=True, transform=transforms.ToTensor(), download=True
)
test_dataset = datasets.MNIST(
    root="./data", train=False, transform=transforms.ToTensor(), download=True
)

Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
Failed to download (trying next):
HTTP Error 403: Forbidden

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-images-idx3-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-images-idx3-ubyte.gz to ./data/MNIST/raw/train-images-idx3-ubyte.gz


100%|██████████| 9.91M/9.91M [00:01<00:00, 5.06MB/s]


Extracting ./data/MNIST/raw/train-images-idx3-ubyte.gz to ./data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
Failed to download (trying next):
HTTP Error 403: Forbidden

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-labels-idx1-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-labels-idx1-ubyte.gz to ./data/MNIST/raw/train-labels-idx1-ubyte.gz


100%|██████████| 28.9k/28.9k [00:00<00:00, 65.8kB/s]


Extracting ./data/MNIST/raw/train-labels-idx1-ubyte.gz to ./data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
Failed to download (trying next):
HTTP Error 403: Forbidden

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-images-idx3-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-images-idx3-ubyte.gz to ./data/MNIST/raw/t10k-images-idx3-ubyte.gz


100%|██████████| 1.65M/1.65M [00:06<00:00, 245kB/s]


Extracting ./data/MNIST/raw/t10k-images-idx3-ubyte.gz to ./data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
Failed to download (trying next):
HTTP Error 403: Forbidden

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-labels-idx1-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-labels-idx1-ubyte.gz to ./data/MNIST/raw/t10k-labels-idx1-ubyte.gz


100%|██████████| 4.54k/4.54k [00:00<00:00, 4.64MB/s]

Extracting ./data/MNIST/raw/t10k-labels-idx1-ubyte.gz to ./data/MNIST/raw






In [5]:
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)

# 1 Batch is like
for images, labels in train_loader:
    print(images.shape)
    break

torch.Size([64, 1, 28, 28])


**Model Set-Up**

In [6]:
class Generator(nn.Module):
    def __init__(self, z_dim):
        super(Generator, self).__init__()

        # Using only fully connected layers for now
        # Hoping for results because MNIST dataset should be simple enough
        # Will try later with linear + deconvolutional layers
        self.fc1 = nn.Linear(z_dim, 256)
        self.bn1 = nn.BatchNorm1d(256)
        self.fc2 = nn.Linear(256, 512)
        self.bn2 = nn.BatchNorm1d(512)
        self.fc3 = nn.Linear(512, 1024)
        self.bn3 = nn.BatchNorm1d(1024)
        self.fc4 = nn.Linear(1024, 28*28)
        self.tanh = nn.Tanh()

    def forward(self, z):
        z = torch.relu(self.bn1(self.fc1(z)))
        z = torch.relu(self.bn2(self.fc2(z)))
        z = torch.relu(self.bn3(self.fc3(z)))
        z = self.fc4(z)
        z = z.view(z.size(0), 1, 28, 28)
        return self.tanh(z)

In [7]:
class Discriminator(nn.Module):
    def __init__(self):
        super(Discriminator, self).__init__()

        # Using only fully-connected layers for now
        self.fc1 = nn.Linear(28*28, 1024)
        self.fc2 = nn.Linear(1024, 512)
        self.fc3 = nn.Linear(512, 256)
        self.fc4 = nn.Linear(256, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = x.view(x.size(0), -1)
        x = F.leaky_relu(self.fc1(x), 0.2)
        x = F.leaky_relu(self.fc2(x), 0.2)
        x = F.leaky_relu(self.fc3(x), 0.2)
        x = self.fc4(x)
        x = self.sigmoid(x)
        return x

In [8]:
# Weight initialization
def xavier_weights_init(m):
    if isinstance(m, nn. Linear):
        nn.init.xavier_normal_(m.weight)
        if m.bias is not None:
            nn.init.constant_(m.bias, 0)

In [9]:
# Simulating a run of input through the model (Not training yet)

z_dim = 100
batch_size = 64
generator = Generator(z_dim)
discriminator = Discriminator()

generator.apply(xavier_weights_init)
discriminator.apply(xavier_weights_init)

random_noise_input = torch.randn(batch_size, z_dim)
generated_images = generator(random_noise_input)
generated_images = generated_images.view(generated_images.size(0), -1)

real_images, _ = next(iter(train_loader))
real_images = real_images.view(real_images.size(0), -1)

real_output = discriminator(real_images)
fake_output = discriminator(generated_images)

print(real_output)
print(fake_output)

tensor([[0.4890],
        [0.5220],
        [0.4803],
        [0.5215],
        [0.5541],
        [0.5255],
        [0.5715],
        [0.4819],
        [0.5318],
        [0.5445],
        [0.5103],
        [0.5019],
        [0.5331],
        [0.5844],
        [0.5473],
        [0.5365],
        [0.5453],
        [0.6126],
        [0.5601],
        [0.5649],
        [0.4705],
        [0.5220],
        [0.4873],
        [0.5491],
        [0.5012],
        [0.5125],
        [0.4858],
        [0.5520],
        [0.5335],
        [0.4719],
        [0.4686],
        [0.5530],
        [0.4709],
        [0.5541],
        [0.5103],
        [0.5493],
        [0.5518],
        [0.5052],
        [0.5095],
        [0.4569],
        [0.5292],
        [0.5135],
        [0.4784],
        [0.5516],
        [0.5123],
        [0.4908],
        [0.5297],
        [0.4989],
        [0.5263],
        [0.5214],
        [0.5207],
        [0.4960],
        [0.5250],
        [0.5770],
        [0.5534],
        [0

**Introducing Loss Functions**

In [10]:
# Introducing generator loss function

z_dim = 100
batch_size = 64
bce_loss = nn.BCELoss()
real_label = 0.9 # Trying with smoothening
fake_label = 0.0

g1 = Generator(z_dim).to(device)
g1.apply(xavier_weights_init)
d1 = Discriminator().to(device)
d1.apply(xavier_weights_init)

real_images, _ = next(iter(train_loader))
real_images = real_images.to(device).view(real_images.size(0), -1)
real_output = d1(real_images)
d1_real_loss = bce_loss(real_output, torch.full((batch_size, 1), real_label, device = device))

random_noise_input = torch.randn(batch_size, z_dim, device = device)
generated_images = g1(random_noise_input)
generated_images = generated_images.view(generated_images.size(0), -1)
fake_output = d1(generated_images.detach())
d1_fake_loss = bce_loss(fake_output, torch.full((batch_size, 1), fake_label, device = device))

d1_total_loss = d1_real_loss + d1_fake_loss

In [11]:
# Introducing discriminator loss function

fake_output = d1(generated_images)
g1_total_loss = bce_loss(fake_output, torch.full((batch_size, 1), real_label, device = device))


**GAN Training**

In [36]:
# Setting the hyperparameters

z_dim = 100
batch_size = 64
lr = 0.0002
momentum = 0.5
epochs = 50

In [37]:
# Model, Optimizers and Loss

G = Generator(z_dim).to(device)
D = Discriminator().to(device)
G.apply(xavier_weights_init)
D.apply(xavier_weights_init)

G_optimizer = torch.optim.Adam(G.parameters(), lr=lr, betas=(momentum, 0.999))
D_optimizer = torch.optim.Adam(D.parameters(), lr=lr, betas=(momentum, 0.999))

bce_loss = nn.BCELoss()

In [38]:
# Metrics through the epochs
G_losses = []
D_losses = []
D_accuracies = []

In [15]:
# Saving and Loading Checkpoints

import torch

def save_checkpoint(epoch, g_model, d_model, g_optimizer, d_optimizer,checkpoint_path):
    checkpoint = {
        'epoch': epoch,
        'g_model_state_dict': g_model.state_dict(),
        'd_model_state_dict': d_model.state_dict(),
        'g_optimizer_state_dict': g_optimizer.state_dict(),
        'd_optimizer_state_dict': d_optimizer.state_dict(),
    }
    torch.save(checkpoint, checkpoint_path)
    print(f"Checkpoint saved at epoch {epoch} to {checkpoint_path}")

def load_checkpoint(checkpoint_path, g_model, d_model, g_optimizer, d_optimizer):
    checkpoint = torch.load(checkpoint_path)
    g_model.load_state_dict(checkpoint['g_model_state_dict'])
    d_model.load_state_dict(checkpoint['d_model_state_dict'])
    g_optimizer.load_state_dict(checkpoint['g_optimizer_state_dict'])
    d_optimizer.load_state_dict(checkpoint['d_optimizer_state_dict'])
    epoch = checkpoint['epoch']

    print(f"Checkpoint loaded from epoch {epoch} from {checkpoint_path}")
    return epoch

In [39]:
# Training

checkpoint_path = '/content/drive/MyDrive/gen-ai/gan-results/gan_ckpt.pth'
for epoch in range(epochs):
    for real_images, _ in train_loader:
        real_images = real_images.to(device).view(real_images.size(0), -1)

        real_labels = torch.full((real_images.size(0), 1), 0.9, device=device)
        fake_labels = torch.zeros(real_images.size(0), 1, device=device)

        # Generator part
        real_output = D(real_images)
        D_real_loss = bce_loss(real_output, real_labels)

        random_noise = torch.randn(real_images.size(0), z_dim, device=device)
        generated_images = G(random_noise)
        fake_output = D(generated_images.detach())
        D_fake_loss = bce_loss(fake_output, fake_labels)
        D_total_loss = D_real_loss + D_fake_loss

        D_optimizer.zero_grad()
        D_total_loss.backward()
        D_optimizer.step()

        real_pred = (real_output > 0.5).float()
        fake_pred = (fake_output < 0.5).float()
        D_accuracy = (real_pred.sum() + fake_pred.sum()) / (2 * real_images.size(0))

        D_losses.append(D_total_loss.item())
        D_accuracies.append(D_accuracy.item())

        # Generator part
        fake_labels.fill_(0.9)
        fake_output = D(generated_images)
        G_loss = bce_loss(fake_output, fake_labels)

        G_optimizer.zero_grad()
        G_loss.backward()
        G_optimizer.step()

        G_losses.append(G_loss.item())

    if (epoch % 10 == 0 or epoch == (epochs-1)):
            save_checkpoint(epoch, G, D, G_optimizer, D_optimizer, checkpoint_path)

    print(f"Epoch [{epoch+1}/{epochs}]  D Loss: {D_total_loss.item():.4f}  G Loss: {G_loss.item():.4f}")


Checkpoint saved at epoch 0 to /content/drive/MyDrive/gen-ai/gan-results/gan_ckpt.pth
Epoch [1/50]  D Loss: 0.6283  G Loss: 4.1495
Epoch [2/50]  D Loss: 0.4473  G Loss: 5.7977
Epoch [3/50]  D Loss: 0.5119  G Loss: 6.6272
Epoch [4/50]  D Loss: 0.6486  G Loss: 4.6833
Epoch [5/50]  D Loss: 0.4388  G Loss: 5.5219
Epoch [6/50]  D Loss: 0.6262  G Loss: 3.0481
Epoch [7/50]  D Loss: 0.6435  G Loss: 4.4768
Epoch [8/50]  D Loss: 0.5399  G Loss: 8.2189
Epoch [9/50]  D Loss: 0.5735  G Loss: 8.1047
Epoch [10/50]  D Loss: 0.5209  G Loss: 5.7884
Checkpoint saved at epoch 10 to /content/drive/MyDrive/gen-ai/gan-results/gan_ckpt.pth
Epoch [11/50]  D Loss: 0.7038  G Loss: 6.9928
Epoch [12/50]  D Loss: 0.5890  G Loss: 3.3204
Epoch [13/50]  D Loss: 0.4904  G Loss: 6.6650
Epoch [14/50]  D Loss: 0.5411  G Loss: 4.9197
Epoch [15/50]  D Loss: 0.5010  G Loss: 4.4561
Epoch [16/50]  D Loss: 0.6221  G Loss: 6.4596
Epoch [17/50]  D Loss: 0.7377  G Loss: 5.4798
Epoch [18/50]  D Loss: 0.7804  G Loss: 7.7102
Epoch [1

In [40]:
# Saving metrics to disk

metrics_df = pd.DataFrame({
    "g_losses": G_losses,
    "d_losses": D_losses,
    "d_accuracies": D_accuracies
})

metrics_save_path = '/content/drive/MyDrive/gen-ai/gan-results/metrics.csv'
metrics_df.to_csv(metrics_save_path, index=False)

In [41]:
# Reading metrics from disk

metrics_read_path = '/content/drive/MyDrive/gen-ai/gan-results/metrics.csv'
metrics_df = pd.read_csv(metrics_read_path)

G_losses = metrics_df["g_losses"].tolist()
D_losses = metrics_df["d_losses"].tolist()
D_accuracies = metrics_df["d_accuracies"].tolist()
print(len(D_losses))

46900


In [42]:
print(len(train_loader))

938


In [43]:
group_size = 938

average_loss = []
for i in range(0, len(D_accuracies), group_size):
    chunk = D_accuracies[i:i + group_size]
    chunk_average = sum(chunk) / len(chunk)
    average_loss.append(chunk_average)

print(average_loss)

[0.9029850746268657, 0.9549323694029851, 0.9385244536247335, 0.9323027718550106, 0.9373084355010661, 0.9325193230277186, 0.9298707356076759, 0.9306536513859275, 0.9306203358208955, 0.9299207089552238, 0.9306786380597015, 0.927913446162047, 0.9252898454157783, 0.9237989738805971, 0.9216001465884861, 0.917952092217484, 0.9150786247334755, 0.9112723214285714, 0.9077242137526652, 0.9052671908315565, 0.9006946295309168, 0.8962719882729211, 0.8940481743070362, 0.8891008128997868, 0.884694829424307, 0.884694829424307, 0.8797391391257996, 0.8760660980810234, 0.8737173507462687, 0.8692947094882729, 0.8655800239872068, 0.8624150453091685, 0.8588252931769723, 0.8568430170575693, 0.8529867404051172, 0.8542610607675906, 0.8505380463752665, 0.8489139125799574, 0.8487639925373134, 0.8471565165245203, 0.8463236273987207, 0.8451159381663113, 0.8448077691897654, 0.8447661247334755, 0.8432252798507462, 0.8441997601279317, 0.8432002931769723, 0.8434668176972282, 0.8439582222814499, 0.8450493070362474]
