<a href="https://colab.research.google.com/github/Kaidavid/CS492A_Courseworks/blob/master/Coursework2_II.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive inside the Colab runtime at /gdrive.
from google.colab import drive

drive.mount('/gdrive')
# Project folder on the mounted drive (not referenced elsewhere in the visible notebook).
# NOTE(review): "Courserwork2" looks like a typo for "Coursework2" — confirm against the
# actual Drive folder name before changing it.
root = '/gdrive/My Drive/CS492A/Courserwork2'

In [None]:
import os, gc
import numpy as np
import pandas as pd
import math
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
import plotly.graph_objects as go
import matplotlib.pyplot as plt 
import plotly.express as px

from sklearn.manifold import TSNE
from plotly.subplots import make_subplots
from collections import Counter
from torchvision.utils import save_image
from torch.utils.data import DataLoader
from torchvision.datasets import MNIST
from torch.autograd import Variable
from torchvision.utils import make_grid
from torch.optim.lr_scheduler import StepLR
from torch.utils.data import DataLoader
from easydict import EasyDict as edict

In [None]:
# Seed torch's CPU and CUDA random generators.
torch.manual_seed(470)
torch.cuda.manual_seed(470)

# Hyper-parameters for the GAN part of the notebook.
opt = edict({
    'n_epochs': 200,         # passes over the training set
    'batch_size': 128,       # batch size of train_loader / test_loader
    'lr': 0.0002,            # Adam learning rate
    'b1': 0.5,               # Adam beta1
    'b2': 0.999,             # Adam beta2
    'n_cpu': 8,              # not referenced in the visible notebook
    'latent_dim': 100,       # length of the generator's noise vector
    'img_size': 32,          # images are resized to img_size x img_size
    'channels': 1,           # image channels
    'sample_interval': 400,  # not referenced in the visible notebook
})

# True when a CUDA device can be used.
cuda = torch.cuda.is_available()

In [None]:
# GAN preprocessing: resize to opt.img_size, convert to a tensor, then
# normalise with mean 0.5 / std 0.5 (i.e. (x - 0.5) / 0.5).
transform = transforms.Compose([
    transforms.Resize(size=opt.img_size),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5], std=[0.5]),
])

# MNIST train / test splits, downloaded into ./data when missing.
train_dataset = MNIST(root='./data', train=True, transform=transform, download=True)
test_dataset = MNIST(root='./data', train=False, transform=transform, download=True)

# Configure data loader: only the training split is shuffled.
train_loader = DataLoader(dataset=train_dataset, shuffle=True, batch_size=opt.batch_size)
test_loader = DataLoader(dataset=test_dataset, shuffle=False, batch_size=opt.batch_size)

In [None]:
# Read the labels directly from the datasets' target tensors. Iterating the
# dataset itself would decode and transform every image just to get its label.
train_labels = train_dataset.targets.tolist()
test_labels = test_dataset.targets.tolist()

# Per-class counts, indexed by digit 0..9. Looking each digit up explicitly
# (a Counter returns 0 for a missing key) keeps position k equal to digit k
# even if some class were absent.
train_label_counter = Counter(train_labels)
count_train_freq = [train_label_counter[digit] for digit in range(10)]
count_train_prop = [count / len(train_labels) for count in count_train_freq]

test_label_counter = Counter(test_labels)
count_test_freq = [test_label_counter[digit] for digit in range(10)]
count_test_prop = [count / len(test_labels) for count in count_test_freq]

In [None]:
# Class distribution of the train / test splits:
# left panel = absolute counts, right panel = proportions.
fig1 = make_subplots(rows=1, cols=2, subplot_titles=("Frequency", "Proportion"))

fig1_panels = (
    (1, count_train_freq, count_test_freq),
    (2, count_train_prop, count_test_prop),
)
for panel_col, train_values, test_values in fig1_panels:
    # Show each series once in the legend; legendgroup ties both panels together.
    first_panel = panel_col == 1
    # add_trace(row=, col=) replaces the deprecated append_trace.
    fig1.add_trace(
        go.Bar(name='Train Data', x=np.arange(10), y=train_values,
               marker_color='rgb(33, 75, 99)',
               legendgroup='train', showlegend=first_panel),
        row=1, col=panel_col,
    )
    fig1.add_trace(
        go.Bar(name='Test Data', x=np.arange(10), y=test_values,
               marker_color='rgb(79, 129, 102)',
               legendgroup='test', showlegend=first_panel),
        row=1, col=panel_col,
    )

fig1.update_layout(barmode='group', width=1200, height=500)
# update_xaxes applies to every subplot (layout.xaxis alone only covers the first one).
fig1.update_xaxes(
    tickmode='array',
    tickvals=list(range(10)),
    ticktext=[str(digit) for digit in range(10)],
)
fig1.show()

### **CNN**

In [None]:
# Settings for the MNIST CNN classifier.
args = edict()
args.batch_size = 64          # training batch size
args.test_batch_size = 1000   # evaluation batch size
args.epochs = 14              # training epochs
args.lr = 1.0                 # Adadelta learning rate
args.gamma = 0.7              # StepLR decay factor
args.dry_run = False          # stop train() after the first logged batch
args.log_interval = 10        # log every N batches
args.save_model = False       # save the state dict after training
device = torch.device("cuda" if cuda else "cpu")

# Shuffle the training data regardless of the device, and never shuffle the
# test data (previously shuffle=True was only set, for both loaders, under CUDA).
train_kwargs = {'batch_size': args.batch_size, 'shuffle': True}
test_kwargs = {'batch_size': args.test_batch_size, 'shuffle': False}

if cuda:
    cuda_kwargs = {'num_workers': 1,
                   'pin_memory': True}
    train_kwargs.update(cuda_kwargs)
    test_kwargs.update(cuda_kwargs)


# Classifier preprocessing: native 28x28 MNIST images normalised with
# mean 0.1307 / std 0.3081.
transform_ = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,))
    ])

train_dataset_ = MNIST('../data', train=True, download=True, transform=transform_)
test_dataset_ = MNIST('../data', train=False, download=True, transform=transform_)

train_loader_ = torch.utils.data.DataLoader(train_dataset_, **train_kwargs)
test_loader_ = torch.utils.data.DataLoader(test_dataset_, **test_kwargs)

In [None]:
class Net(nn.Module):
    """Small CNN digit classifier.

    Two 3x3 convolutions, a 2x2 max-pool and two fully connected layers.
    ``fc1`` expects 9216 = 64 * 12 * 12 features, which is what a 1x28x28
    input produces after the two unpadded convolutions and the pooling.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, stride=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1)
        self.dropout1 = nn.Dropout(p=0.25)
        self.dropout2 = nn.Dropout(p=0.5)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        """Return per-class log-probabilities of shape (batch, 10)."""
        features = F.relu(self.conv1(x))
        features = F.relu(self.conv2(features))
        features = self.dropout1(F.max_pool2d(features, 2))
        hidden = F.relu(self.fc1(torch.flatten(features, 1)))
        logits = self.fc2(self.dropout2(hidden))
        return F.log_softmax(logits, dim=1)

In [None]:
def train(args, model, device, train_loader, optimizer, epoch):
    """Run one training epoch with the negative log-likelihood loss.

    Args:
        args: settings object; ``log_interval`` and ``dry_run`` are read.
        model: network returning log-probabilities.
        device: device the batches are moved to.
        train_loader: a ``DataLoader`` or a re-iterable sequence of
            ``(data, target)`` batches (e.g. a list of generated batches).
        optimizer: optimizer updating ``model``'s parameters.
        epoch: epoch number, used only in the log line.
    """
    model.train()

    # Total sample count for the progress line. A plain list of batches has
    # no ``.dataset``, so fall back to summing the batch sizes.
    dataset = getattr(train_loader, 'dataset', None)
    if dataset is not None:
        total_samples = len(dataset)
    else:
        total_samples = sum(len(batch[0]) for batch in train_loader)
    num_batches = len(train_loader)

    seen = 0
    for batch_idx, (data, target) in enumerate(train_loader):
        # Detach the inputs: batches produced by a generator may still carry
        # its autograd graph. Without this, backward() would run into that
        # graph (which is why retain_graph=True used to be required).
        data, target = data.detach().to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % args.log_interval == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, seen, total_samples,
                100. * batch_idx / num_batches, loss.item()))
            if args.dry_run:
                break
        seen += len(data)


def test(model, device, test_loader):
    """Evaluate ``model`` on ``test_loader`` and print loss and accuracy.

    The loss is the summed negative log-likelihood divided by the number of
    samples in ``test_loader.dataset``; nothing is returned.
    """
    model.eval()
    loss_sum = 0
    num_correct = 0
    with torch.no_grad():
        for batch, labels in test_loader:
            batch = batch.to(device)
            labels = labels.to(device)
            log_probs = model(batch)
            # Sum (not mean) per batch so the final division gives a per-sample average.
            loss_sum += F.nll_loss(log_probs, labels, reduction='sum').item()
            # Index of the largest log-probability is the predicted class.
            predicted = log_probs.argmax(dim=1, keepdim=True)
            num_correct += predicted.eq(labels.view_as(predicted)).sum().item()

    num_samples = len(test_loader.dataset)
    mean_loss = loss_sum / num_samples

    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        mean_loss, num_correct, num_samples,
        100. * num_correct / num_samples))

In [None]:
def weights_init_noise(m):
    """Initialiser for ``Module.apply``: N(0, 1) conv weights.

    Modules whose class name contains "Conv" get weights drawn from N(0, 1);
    ``BatchNorm2d`` modules get N(1, 0.02) weights and zero bias. Any other
    module is left untouched.
    """
    layer_type = type(m).__name__
    if "Conv" in layer_type:
        torch.nn.init.normal_(m.weight.data, mean=0.0, std=1.0)
        return
    if "BatchNorm2d" in layer_type:
        torch.nn.init.normal_(m.weight.data, mean=1.0, std=0.02)
        torch.nn.init.constant_(m.bias.data, val=0.0)

In [None]:
# Train the CNN classifier on the real MNIST training data.
model = Net().to(device)
optimizer = torch.optim.Adadelta(params=model.parameters(), lr=args.lr)
# Optional experiment (disabled): start from N(0, 1) conv weights.
#model.apply(weights_init_noise)
# Multiply the learning rate by args.gamma after every epoch.
scheduler = StepLR(optimizer=optimizer, step_size=1, gamma=args.gamma)
for epoch in range(1, 1 + args.epochs):
    train(args, model, device, train_loader_, optimizer, epoch)
    test(model, device, test_loader_)
    scheduler.step()

if args.save_model:
    torch.save(obj=model.state_dict(), f="mnist_cnn.pt")

In [None]:
def check_images(loader, batch_id, how_many):
    """Show the first ``how_many`` images of one batch and classify them.

    Uses the module-level ``model`` and ``device``. Prints the true labels,
    the predicted labels and the number of correct predictions.

    Args:
        loader: iterable of ``(image, label)`` batches.
        batch_id: index of the batch to inspect.
        how_many: number of leading images of that batch to use.
    """
    for batch_idx, (image, label) in enumerate(loader):
        if batch_idx != batch_id:
            continue

        shown_images = image[:how_many]
        shown_labels = label[:how_many]
        actual_label = shown_labels.numpy()

        plt.figure(figsize=(15, 10))
        plt.subplot(1, 2, 2)
        plt.axis("off")
        plt.imshow(np.transpose(make_grid(shown_images), (1, 2, 0)))
        plt.show()

        print(f"Actual labels: {actual_label}")

        # Predict in eval mode (dropout off) and without building a graph;
        # restore the previous mode afterwards. Only the displayed images
        # are moved to the device.
        was_training = model.training
        model.eval()
        with torch.no_grad():
            prediction = model(shown_images.to(device))
        model.train(was_training)

        pred = prediction.argmax(dim=1)  # index of the max log-probability
        print(f"Predicted labels: {pred.cpu().numpy()}")
        correct = pred.eq(shown_labels.to(device)).sum().item()
        print(f"Number of correct classes: {correct}")
        break

check_images(test_loader_, 0, 8)

In [None]:
def model_accuracy(my_model):
    """Classify every batch of the module-level ``test_loader_``.

    Args:
        my_model: network returning per-class scores of shape (batch, classes).

    Returns:
        ``(test_labels, test_pred, total)``: the true labels, the predicted
        labels (both flat Python lists) and the number of correct predictions.
    """
    test_labels = []
    test_pred = []
    total = 0

    # Evaluate with dropout disabled and without autograd bookkeeping;
    # the model's previous train/eval mode is restored afterwards.
    was_training = my_model.training
    my_model.eval()
    with torch.no_grad():
        for image, label in test_loader_:
            prediction = my_model(image.to(device))
            pred = prediction.argmax(dim=1)  # index of the max log-probability
            test_labels += label.tolist()
            test_pred += pred.tolist()
            total += pred.eq(label.to(device)).sum().item()
    my_model.train(was_training)

    return test_labels, test_pred, total

### **GAN**

In [None]:
  def weights_init_normal(m):
      """Initialiser for ``Module.apply``: N(0, 0.02) conv weights.

      Modules whose class name contains "Conv" get weights drawn from
      N(0, 0.02); ``BatchNorm2d`` modules get N(1, 0.02) weights and zero
      bias. Any other module is left untouched.
      """
      layer_type = type(m).__name__
      if "Conv" in layer_type:
          torch.nn.init.normal_(m.weight.data, mean=0.0, std=0.02)
          return
      if "BatchNorm2d" in layer_type:
          torch.nn.init.normal_(m.weight.data, mean=1.0, std=0.02)
          torch.nn.init.constant_(m.bias.data, val=0.0)

In [None]:
class cGenerator(nn.Module):
    """Conditional generator: (noise, digit label) -> image.

    The label is embedded into 10 dimensions, concatenated with the noise
    vector, projected to a 128-channel map of side ``opt.img_size // 4`` and
    upsampled twice (x2 each) to ``opt.img_size``. Output goes through Tanh.
    """

    def __init__(self):
        super().__init__()

        self.label_embedding = nn.Embedding(10, 10)
        self.init_size = opt.img_size // 4
        # With latent_dim=100 and img_size=32 this maps 110 -> 8192.
        projected_features = 128 * self.init_size ** 2
        self.l1 = nn.Sequential(nn.Linear(opt.latent_dim + 10, projected_features))

        # NOTE(review): the second positional argument of BatchNorm2d is
        # ``eps``, so the original ``BatchNorm2d(n, 0.8)`` sets eps=0.8 (not
        # momentum). Kept as-is, only made explicit — confirm the intent.
        layers = [
            # 128 x init_size x init_size
            nn.BatchNorm2d(128),
            nn.Upsample(scale_factor=2),
            # 128 x (2 * init_size) x (2 * init_size)
            nn.Conv2d(128, 128, 3, stride=1, padding=1),
            nn.BatchNorm2d(128, eps=0.8),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Upsample(scale_factor=2),
            # 128 x img_size x img_size
            nn.Conv2d(128, 64, 3, stride=1, padding=1),
            nn.BatchNorm2d(64, eps=0.8),
            nn.LeakyReLU(0.2, inplace=True),
            # 64 x img_size x img_size
            nn.Conv2d(64, opt.channels, 3, stride=1, padding=1),
            # channels x img_size x img_size
            nn.Tanh(),
        ]
        self.conv_blocks = nn.Sequential(*layers)

    def forward(self, z, labels):
        """Generate one image per (noise row, label) pair."""
        conditioned = torch.cat((self.label_embedding(labels), z), -1)
        projected = self.l1(conditioned)
        feature_map = projected.view(projected.shape[0], 128, self.init_size, self.init_size)
        return self.conv_blocks(feature_map)

In [None]:
class Generator(nn.Module):
    """Unconditional generator: noise -> image.

    The noise vector is projected to a 128-channel map of side
    ``opt.img_size // 4`` and upsampled twice (x2 each) to ``opt.img_size``.
    Output goes through Tanh.
    """

    def __init__(self):
        super().__init__()

        self.init_size = opt.img_size // 4
        # With latent_dim=100 and img_size=32 this maps 100 -> 8192.
        projected_features = 128 * self.init_size ** 2
        self.l1 = nn.Sequential(nn.Linear(opt.latent_dim, projected_features))

        # NOTE(review): the second positional argument of BatchNorm2d is
        # ``eps``, so the original ``BatchNorm2d(n, 0.8)`` sets eps=0.8 (not
        # momentum). Kept as-is, only made explicit — confirm the intent.
        layers = [
            # 128 x init_size x init_size
            nn.BatchNorm2d(128),
            nn.Upsample(scale_factor=2),
            # 128 x (2 * init_size) x (2 * init_size)
            nn.Conv2d(128, 128, 3, stride=1, padding=1),
            nn.BatchNorm2d(128, eps=0.8),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Upsample(scale_factor=2),
            # 128 x img_size x img_size
            nn.Conv2d(128, 64, 3, stride=1, padding=1),
            nn.BatchNorm2d(64, eps=0.8),
            nn.LeakyReLU(0.2, inplace=True),
            # 64 x img_size x img_size
            nn.Conv2d(64, opt.channels, 3, stride=1, padding=1),
            # channels x img_size x img_size
            nn.Tanh(),
        ]
        self.conv_blocks = nn.Sequential(*layers)

    def forward(self, z):
        """Generate one image per noise row."""
        projected = self.l1(z)
        feature_map = projected.view(projected.shape[0], 128, self.init_size, self.init_size)
        return self.conv_blocks(feature_map)

In [None]:
class cDiscriminator(nn.Module):
    """Conditional discriminator: (image, digit label) -> probability of "real".

    The flattened image is concatenated with a 10-dimensional label
    embedding, mixed by a linear layer back to image size, and passed through
    four stride-2 convolution blocks followed by a sigmoid-activated linear
    layer.
    """

    def __init__(self):
        super(cDiscriminator, self).__init__()

        def discriminator_block(in_filters, out_filters, bn=True):
            """Conv (stride 2) -> [BatchNorm] -> LeakyReLU -> Dropout2d."""
            # Build the layer list once; the previous version constructed a
            # whole block and then threw it away (including a randomly
            # initialised Conv2d) whenever bn was True.
            layers = [nn.Conv2d(in_channels=in_filters, out_channels=out_filters,
                                kernel_size=4, stride=2, padding=1, bias=False)]
            if bn:
                # NOTE(review): the second positional argument of BatchNorm2d
                # is ``eps``; the original ``BatchNorm2d(n, 0.8)`` therefore
                # sets eps=0.8. Kept as-is, only made explicit.
                layers.append(nn.BatchNorm2d(out_filters, eps=0.8))
            layers.append(nn.LeakyReLU(0.2, inplace=True))
            layers.append(nn.Dropout2d(0.25))
            return layers

        self.label_embedding = nn.Embedding(10, 10)
        # NOTE(review): the in/out sizes assume opt.channels == 1 — confirm
        # before using multi-channel images.
        self.linear = nn.Linear(10 + opt.img_size**2, opt.img_size**2)
        self.model = nn.Sequential(
            *discriminator_block(opt.channels, 16, bn=False),
            *discriminator_block(16, 32),
            *discriminator_block(32, 64),
            *discriminator_block(64, 128),
        )

        # The height and width of downsampled image (four stride-2 convs)
        ds_size = opt.img_size // 2 ** 4
        self.adv_layer = nn.Sequential(nn.Linear(128 * ds_size ** 2, 1), nn.Sigmoid())

    def forward(self, img, labels):
        """Return a (batch, 1) tensor of "real" probabilities."""
        # Flattened image + label embedding, e.g. (B, 1024) + (B, 10) = (B, 1034)
        # for 1x32x32 images.
        img = torch.cat((img.view(img.size(0), -1), self.label_embedding(labels)), -1)
        img = self.linear(img)
        img = img.view(img.shape[0], opt.channels, opt.img_size, opt.img_size)
        out = self.model(img)
        out = out.view(out.shape[0], -1)
        validity = self.adv_layer(out)

        return validity

In [None]:
class Discriminator(nn.Module):
    """Unconditional discriminator: image -> probability of "real".

    Four stride-2 convolution blocks followed by a sigmoid-activated linear
    layer.
    """

    def __init__(self):
        super().__init__()

        def discriminator_block(in_filters, out_filters, bn=True):
            """Conv (stride 2) -> LeakyReLU -> Dropout2d -> [BatchNorm]."""
            layers = [
                nn.Conv2d(in_channels=in_filters, out_channels=out_filters,
                          kernel_size=4, stride=2, padding=1, bias=False),
                nn.LeakyReLU(0.2, inplace=True),
                nn.Dropout2d(0.25),
            ]
            if not bn:
                return layers
            # NOTE(review): the second positional argument of BatchNorm2d is
            # ``eps``; the original ``BatchNorm2d(n, 0.8)`` sets eps=0.8.
            # Kept as-is, only made explicit.
            return layers + [nn.BatchNorm2d(out_filters, eps=0.8)]

        channel_plan = [(opt.channels, 16, False), (16, 32, True), (32, 64, True), (64, 128, True)]
        stages = []
        for n_in, n_out, use_bn in channel_plan:
            stages.extend(discriminator_block(n_in, n_out, bn=use_bn))
        self.model = nn.Sequential(*stages)

        # Side length left after the four stride-2 convolutions.
        ds_size = opt.img_size // 2 ** 4
        self.adv_layer = nn.Sequential(nn.Linear(128 * ds_size ** 2, 1), nn.Sigmoid())

    def forward(self, img):
        """Return a (batch, 1) tensor of "real" probabilities."""
        features = self.model(img)
        flat = features.view(features.shape[0], -1)
        return self.adv_layer(flat)

In [None]:
# Binary cross-entropy between the discriminator output and the real/fake target.
adversarial_loss = nn.BCELoss()

# The conditional variants are the ones trained below.
generator = cGenerator()
discriminator = cDiscriminator()

# Move networks and loss to the GPU when one is available.
if cuda:
    generator.cuda()
    discriminator.cuda()
    adversarial_loss.cuda()

# Re-initialise conv / batch-norm weights (generator first, then discriminator).
generator.apply(fn=weights_init_normal)
discriminator.apply(fn=weights_init_normal)

In [None]:
# Both networks use Adam with the same learning rate and betas.
adam_settings = {'lr': opt.lr, 'betas': (opt.b1, opt.b2)}
optimizer_G = torch.optim.Adam(generator.parameters(), **adam_settings)
optimizer_D = torch.optim.Adam(discriminator.parameters(), **adam_settings)

# Tensor constructors that allocate on the GPU when CUDA is in use.
if cuda:
    FloatTensor, LongTensor = torch.cuda.FloatTensor, torch.cuda.LongTensor
else:
    FloatTensor, LongTensor = torch.FloatTensor, torch.LongTensor

# One fixed (noise, label) pair, reused to snapshot the generator during training.
fixed_noise = Variable(FloatTensor(np.random.normal(0, 1, size=(1, opt.latent_dim))))
fixed_label = Variable(LongTensor(np.random.randint(0, 10, size=1)))

In [None]:
# ----------
#  Training CGAN
# ----------

# Lists to keep track of progress
img_list = []   # generator snapshots on the fixed (noise, label) pair
G_losses = []   # generator loss per iteration
D_losses = []   # discriminator loss per iteration
iters = 0       # global iteration counter across epochs

for epoch in range(opt.n_epochs):
    for i, (imgs, labels) in enumerate(train_loader):

        # Adversarial ground truths: 1 = real, 0 = generated
        valid = Variable(FloatTensor(imgs.shape[0], 1).fill_(1.0), requires_grad=False)
        fake = Variable(FloatTensor(imgs.shape[0], 1).fill_(0.0), requires_grad=False)

        # Configure input
        real_imgs = Variable(imgs.type(FloatTensor))
        labels = Variable(labels.type(LongTensor))

        # -----------------
        #  Train Generator
        # -----------------

        optimizer_G.zero_grad()

        # Sample noise and random digit labels as generator input
        z = Variable(FloatTensor(np.random.normal(0, 1, (imgs.shape[0], opt.latent_dim))))
        g_labels = Variable(LongTensor(np.random.randint(0, 10, imgs.shape[0])))

        # Generate a batch of images
        gen_imgs = generator(z, g_labels)

        # Loss measures generator's ability to fool the discriminator
        g_loss = adversarial_loss(discriminator(gen_imgs, g_labels), valid)

        g_loss.backward()
        optimizer_G.step()

        # ---------------------
        #  Train Discriminator
        # ---------------------

        optimizer_D.zero_grad()

        # Measure discriminator's ability to classify real from generated samples.
        # gen_imgs is detached so this step does not backpropagate into the generator.
        real_loss = adversarial_loss(discriminator(real_imgs, labels), valid)
        fake_loss = adversarial_loss(discriminator(gen_imgs.detach(), g_labels), fake)
        d_loss = (real_loss + fake_loss) / 2

        d_loss.backward()
        optimizer_D.step()

        # Output training stats
        if i % 50 == 0:
            print(
                "[Epoch %d/%d] [Batch %d/%d] [D loss: %f] [G loss: %f]"
                % (epoch, opt.n_epochs, i, len(train_loader), d_loss.item(), g_loss.item())
            )

        # Save Losses for plotting later
        G_losses.append(g_loss.item())
        D_losses.append(d_loss.item())

        # Check how the generator is doing by saving G's output on fixed_noise:
        # every 500 iterations and once more on the very last iteration.
        is_last_iteration = (epoch == opt.n_epochs - 1) and (i == len(train_loader) - 1)
        if (iters % 500 == 0) or is_last_iteration:
            with torch.no_grad():
                # Use a dedicated name: the snapshot used to be stored in
                # ``fake``, clobbering the all-zeros target tensor above.
                snapshot = generator(fixed_noise, fixed_label).detach().cpu()
            img_list.append(make_grid(snapshot, padding=2, normalize=True))

        iters += 1

In [None]:
  # ----------
#  Training GAN
# ----------
"""
# Lists to keep track of progress
img_list = []
G_losses = []
D_losses = []
iters = 0

for epoch in range(opt.n_epochs):
    for i, (imgs, _) in enumerate(train_loader):

        # Adversarial ground truths
        valid = Variable(FloatTensor(imgs.shape[0], 1).fill_(1.0), requires_grad=False)
        fake = Variable(FloatTensor(imgs.shape[0], 1).fill_(0.0), requires_grad=False)

        # Configure input
        real_imgs = Variable(imgs.type(FloatTensor))

        # -----------------
        #  Train Generator
        # -----------------

        optimizer_G.zero_grad()

        # Sample noise as generator input
        z = Variable(Tensor(np.random.normal(0, 1, (imgs.shape[0], opt.latent_dim))))   # torch.Size([32, 100]

        # Generate a batch of images
        gen_imgs = generator(z)

        # Loss measures generator's ability to fool the discriminator
        g_loss = adversarial_loss(discriminator(gen_imgs), valid)

        g_loss.backward()
        optimizer_G.step()

        # ---------------------
        #  Train Discriminator
        # ---------------------

        optimizer_D.zero_grad()

        # Measure discriminator's ability to classify real from generated samples
        real_loss = adversarial_loss(discriminator(real_imgs), valid)
        fake_loss = adversarial_loss(discriminator(gen_imgs.detach()), fake)
        d_loss = (real_loss + fake_loss) / 2

        d_loss.backward()
        optimizer_D.step()

        # Output training stats
        if i % 50 == 0:
             print(
              "[Epoch %d/%d] [Batch %d/%d] [D loss: %f] [G loss: %f]"
              % (epoch, opt.n_epochs, i, len(train_loader), d_loss.item(), g_loss.item())
              )

        # Save Losses for plotting later
        G_losses.append(g_loss.item())
        D_losses.append(d_loss.item())

        # Check how the generator is doing by saving G's output on fixed_noise
        if (iters % 500 == 0) or ((epoch == opt.n_epochs-1) and (i == len(train_loader)-1)):
            with torch.no_grad():
                fake = generator(fixed_noise).detach().cpu()
            img_list.append(make_grid(fake, padding=2, normalize=True)) #stored on every 500 epoch

        iters += 1


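With a batch size of 128
  60,000 / 128 = 468.75, i.e. 469 batches per epoch (the last one is partial)
Number of gradient-descent iterations
  469 * 200 epochs = 93,800
img_list is updated every 500 iterations, plus once on the final iteration
  iterations 0, 500, ..., 93,500 give 188 snapshots, + 1 final = 189 = len(img_list)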
"""

In [None]:
# Per-iteration generator / discriminator losses recorded by the training loop.
plt.figure(figsize=(10, 5))
for loss_curve, curve_label in ((G_losses, "G"), (D_losses, "D")):
    plt.plot(loss_curve, label=curve_label)
plt.title("CGAN Generator and Discriminator Loss During Training")
plt.xlabel("iterations")
plt.ylabel("Loss")
plt.legend()
plt.show()

In [None]:
# Grab a batch of real images from the dataloader

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
real_batch = next(iter(train_loader))

# Plot the real images (first 64 of the batch, rescaled for display)
plt.figure(figsize=(10, 10))
plt.subplot(1, 2, 1)
plt.title("Real Images")
plt.axis("off")
plt.imshow(np.transpose(make_grid(real_batch[0].to(device)[:64], padding=5, normalize=True).cpu(), axes=(1, 2, 0)))

In [None]:
# Show the first two images of one training batch.
examples = enumerate(train_loader)
batch_idx, (example_data, example_targets) = next(examples)
plt.figure(figsize=(10, 10))
plt.subplot(1, 2, 1)
plt.axis("off")
plt.imshow(np.transpose(make_grid(example_data.to(device)[:2], padding=5, normalize=True).cpu(), axes=(1, 2, 0)))

In [None]:
# Five consecutive generator snapshots, starting at snapshot index 89.
_, axes = plt.subplots(nrows=1, ncols=5, figsize=(20, 8))
for offset, ax in enumerate(axes.flat):
    image = img_list[89 + offset]
    ax.imshow(np.transpose(make_grid(image), axes=(1, 2, 0)))
    # Hide the tick labels on both axes.
    ax.set_yticklabels([])
    ax.set_xticklabels([])

In [None]:
# 10 x 10 grid of generated digits: every row asks for the labels 0..9 in order.
z = Variable(FloatTensor(np.random.normal(0, 1, size=(10 ** 2, opt.latent_dim))))
labels = np.tile(np.arange(10), 10)
labels = Variable(LongTensor(labels))
images = generator(z, labels)
grid = make_grid(images, nrow=10, normalize=True)
fig, ax = plt.subplots(figsize=(10, 10))
ax.imshow(grid.permute(1, 2, 0).data.cpu(), cmap='binary')
ax.axis('off')

In [None]:
## Generate batches_per_class * batch_size (default 32 * 32 = 1024) images per class
def cgan_accuracy(batches_per_class=32, batch_size=32):
    """Check how often the CNN agrees with the label the CGAN was asked for.

    For every digit 0..9 the module-level ``generator`` produces
    ``batches_per_class`` batches of ``batch_size`` images, which are resized
    to 28x28 and classified by the module-level ``model``.

    Returns:
        ``(all_labels, all_pred, total)``: the requested labels, the CNN
        predictions (both flat Python lists) and the number of matches.
    """
    all_labels = []
    all_pred = []
    total = 0
    transform_back = transforms.Resize(28)

    # Pure inference: no autograd graph is needed for either network.
    with torch.no_grad():
        for digit in range(10):
            for _ in range(batches_per_class):
                z = FloatTensor(np.random.normal(0, 1, (batch_size, opt.latent_dim)))
                labels = LongTensor(np.full(batch_size, digit))
                images = generator(z, labels)
                prediction = model(transform_back(images))
                pred = prediction.argmax(dim=1)  # index of the max log-probability
                all_labels += [digit] * batch_size
                all_pred += pred.tolist()
                total += pred.eq(labels).sum().item()
    return all_labels, all_pred, total

In [None]:
# Label distribution of the test set versus the CNN's predictions on it.
test_labels, test_pred, test_acc = model_accuracy(model)
num_test_samples = len(test_labels)

# Index the counters by digit so position k always means class k, even if
# the model never predicts some class (a Counter returns 0 for missing keys).
test_label_counter = Counter(test_labels)
test_labels_freq = [test_label_counter[digit] for digit in range(10)]
test_labels_prop = [count / num_test_samples for count in test_labels_freq]

test_pred_counter = Counter(test_pred)
test_pred_freq = [test_pred_counter[digit] for digit in range(10)]
test_pred_prop = [count / len(test_pred) for count in test_pred_freq]

# Percentage from the actual sample count instead of a hard-coded /100.
print(f"Our CNN model accuracy: {100 * test_acc / num_test_samples}%")

In [None]:
# True vs. predicted class proportions on the test set.
fig2 = go.Figure(data=[
    go.Bar(name='True labels', x=np.arange(10), y=test_labels_prop,
           marker_color='rgb(33, 75, 99)'),
    go.Bar(name='Predicted labels CNN', x=np.arange(10), y=test_pred_prop,
           marker_color='rgb(79, 129, 102)'),
])

fig2.update_layout(
    title="Pre-trained CNN model",
    barmode='group',
    width=1200,
    height=500,
    xaxis={
        'tickmode': 'array',
        'tickvals': list(range(10)),
        'ticktext': [str(digit) for digit in range(10)],
    },
)
fig2.show()

In [None]:
# Requested labels vs. CNN predictions on CGAN-generated images.
cgan_generated_labels, cgan_generated_pred, cgan_generated_acc = cgan_accuracy()
num_generated = len(cgan_generated_labels)

# Index the counters by digit so position k always means class k, even if
# the CNN never predicts some class (a Counter returns 0 for missing keys).
cgan_label_counter = Counter(cgan_generated_labels)
cgan_generated_freq = [cgan_label_counter[digit] for digit in range(10)]
cgan_generated_prop = [count / num_generated for count in cgan_generated_freq]

cgan_pred_counter = Counter(cgan_generated_pred)
cgan_generated_pred_freq = [cgan_pred_counter[digit] for digit in range(10)]
cgan_generated_pred_prop = [count / len(cgan_generated_pred) for count in cgan_generated_pred_freq]

# cgan_accuracy() generates 10 * 32 * 32 = 10,240 images, so dividing the hit
# count by 100 overstated the percentage; divide by the real sample count.
print(f"Our CNN model accuracy on generated image: {100 * cgan_generated_acc / num_generated:.2f}%")

In [None]:
# Requested vs. CNN-predicted class proportions for CGAN-generated images.
fig3 = go.Figure(data=[
    go.Bar(name='Assigned labels', x=np.arange(10), y=cgan_generated_prop,
           marker_color='rgb(33, 75, 99)'),
    go.Bar(name='Predicted labels CNN', x=np.arange(10), y=cgan_generated_pred_prop,
           marker_color='rgb(79, 129, 102)'),
])

fig3.update_layout(
    title="CGAN Generated",
    barmode='group',
    width=1200,
    height=500,
    xaxis={
        'tickmode': 'array',
        'tickvals': list(range(10)),
        'ticktext': [str(digit) for digit in range(10)],
    },
)
fig3.show()

In [None]:
## Generate `size` batches of args.batch_size labelled CGAN images
def cgan_10000(size):
    """Build a list of generated ``(images, labels)`` training batches.

    Each batch holds ``args.batch_size`` images from the module-level
    ``generator`` for uniformly random digit labels, resized to 28x28 so they
    match the CNN's input size.

    Args:
        size: number of batches to generate.

    Returns:
        A list of ``(images, labels)`` tuples usable like a data loader.
    """
    cgan_loader = []
    transform_back = transforms.Resize(28)
    # Generate without autograd: the batches are plain training data, and
    # keeping the generator graph attached wastes memory and makes a later
    # backward() through them reach into the generator.
    with torch.no_grad():
        for _ in range(size):
            z = FloatTensor(np.random.normal(0, 1, (args.batch_size, opt.latent_dim)))
            labels = LongTensor(np.random.randint(0, 10, args.batch_size))
            images = generator(z, labels)
            cgan_loader.append((transform_back(images), labels))
    return cgan_loader

In [None]:
# Percentage of the real training batches to mix with the generated ones.
percent = 100
cgan_loader = cgan_10000(20)

# Mixed training set: all generated batches plus a random `percent`% of the
# real batches, in random order. This definition was commented out although
# the retraining cell below needs `train_loader_new`.
real_batches = list(train_loader_)
num_real_batches = int((percent / 100) * len(real_batches))
train_loader_new = cgan_loader + random.sample(real_batches, num_real_batches)
random.shuffle(train_loader_new)

In [None]:
# Train a fresh CNN on the generated batches plus a percentage of the real
# training data; evaluation still uses the real test set.
model_new = Net().to(device)
optimizer = torch.optim.Adadelta(params=model_new.parameters(), lr=args.lr)
scheduler = StepLR(optimizer=optimizer, step_size=1, gamma=args.gamma)
for epoch in range(1, 1 + args.epochs):
    train(args, model_new, device, train_loader_new, optimizer, epoch)
    test(model_new, device, test_loader_)
    scheduler.step()

In [None]:
# Predictions of the CNN trained on real + generated data, on the real test set.
_, test_pred_new, test_acc = model_accuracy(model_new)

# Index the counter by digit so position k always means class k, even if the
# model never predicts some class (a Counter returns 0 for missing keys).
test_pred_new_counter = Counter(test_pred_new)
test_pred_new_freq = [test_pred_new_counter[digit] for digit in range(10)]
test_pred_new_prop = [count / len(test_pred_new) for count in test_pred_new_freq]

# Percentage from the actual sample count instead of a hard-coded /100.
print(f"Our CNN model accuracy with Real + Fake: {100 * test_acc / len(test_pred_new)}%")

In [None]:
# (percent, accuracy) pairs plotted below.
# NOTE(review): presumably test accuracies recorded by hand from earlier runs
# with different `percent` values of real data — confirm the source.
recorded_runs = [
    (0, 95.27),
    (10, 98.07),
    (20, 98.61),
    (50, 99.05),
    (70, 99.03),
    (100, 99.1),
]
accuracy = [run_accuracy for _, run_accuracy in recorded_runs]
percent = [run_percent for run_percent, _ in recorded_runs]

In [None]:
# Accuracy as a function of the percentage on the x axis.
fig4 = go.Figure(data=[go.Scatter(x=percent, y=accuracy, mode="markers+lines")])
fig4.update_layout(title="6,400 Generated Images")

In [None]:
# pick 100 images from each source (10 per digit): first real, then generated
def _first_match_per_batch(loader, digit, limit=10):
    """Collect up to ``limit`` images of ``digit``, at most one per batch.

    From every batch of ``loader`` the first image whose label equals
    ``digit`` is taken; batches without that digit contribute nothing.
    """
    picked = []
    for images, labels in loader:
        # One vectorised comparison per batch instead of one per element.
        hits = torch.nonzero(labels == digit)
        if hits.numel() > 0:
            picked.append(images[int(hits[0])])
        if len(picked) == limit:
            break
    return picked


images_list = []
labels_list = []
# Real images first, then CGAN images; within each source digits 0..9 in order.
for source_loader in (train_loader_, cgan_loader):
    for digit in range(10):
        digit_images = _first_match_per_batch(source_loader, digit, limit=10)
        images_list += digit_images
        labels_list += [digit] * len(digit_images)



In [None]:
# Flatten every picked image into one row and embed the rows in 2-D with t-SNE.
labels_list2 = list(map(int, labels_list))
X = torch.cat([x.view(1, -1).cpu() for x in images_list])
X = X.detach().numpy()
tsne = TSNE(n_components=2, learning_rate='auto', init='random')
X_embedded = tsne.fit_transform(X)

In [None]:
# 2-D embedding plus a flag column: 0 for the first 100 rows, 1 for the next 100.
df = pd.DataFrame(X_embedded, columns=["dim1", "dim2"])
binary_1or0 = [0] * 100 + [1] * 100
df["binary"] = binary_1or0

In [None]:
# Scatter of the t-SNE embedding, coloured by the "binary" flag column.
fig5 = px.scatter(data_frame=df, x="dim1", y="dim2", color="binary")
fig5.update_layout(template="plotly_dark", title="Embedded by tSNE")
fig5.update_xaxes(title_text='X')
fig5.update_yaxes(title_text='Y')
fig5.show()

In [None]:
# Test-set class proportions: truth vs. both CNNs' predictions.
fig6 = go.Figure(data=[
    go.Bar(name='True labels', x=np.arange(10), y=test_labels_prop,
           marker_color='rgb(33, 75, 99)'),
    go.Bar(name='Predicted labels Real CNN', x=np.arange(10), y=test_pred_prop,
           marker_color='rgb(79, 129, 102)'),
    go.Bar(name='Predicted labels Real + Fake CNN', x=np.arange(10), y=test_pred_new_prop,
           marker_color='rgb(214, 39, 40)'),
])

fig6.update_layout(
    title="CNN model trained on Fake + 10% Real",
    barmode='group',
    width=1200,
    height=500,
    xaxis={
        'tickmode': 'array',
        'tickvals': list(range(10)),
        'ticktext': [str(digit) for digit in range(10)],
    },
)
fig6.show()

In [None]:
## Used on DCGAN (requires `generator` to be the unconditional Generator)
def label_generated(how_many):
    """Label unconditionally generated images with the CNN.

    Generates ``how_many`` batches of 32 images with the module-level
    ``generator`` (called with noise only), resizes them to 28x28 and
    classifies them with the module-level ``model``.

    Args:
        how_many: number of 32-image batches to generate.

    Returns:
        Flat list with the predicted class of every generated image.
    """
    generated_labels = []
    transform_back = transforms.Resize(28)

    # Pure inference: no autograd graph is needed for either network.
    with torch.no_grad():
        for _ in range(how_many):
            # `Tensor` was never defined in this notebook; FloatTensor is the
            # constructor used everywhere else.
            noise = FloatTensor(np.random.normal(0, 1, (32, opt.latent_dim)))
            fake = generator(noise)
            prediction = model(transform_back(fake.to(device)))
            pred = prediction.argmax(dim=1)
            generated_labels += pred.tolist()

    return generated_labels

In [None]:
# Number of 32-image batches to generate and label.
generate = 2**11
generated_labels = label_generated(generate)

# Index the counter by digit so position k always means class k: a GAN that
# never produces some digit would otherwise shift every later bar
# (a Counter returns 0 for missing keys).
generated_counter = Counter(generated_labels)
generated_freq = [generated_counter[digit] for digit in range(10)]
generated_prop = [count / len(generated_labels) for count in generated_freq]

In [None]:
# Absolute class counts: train split, test split and generated images.
fig7 = go.Figure(data=[
    go.Bar(name='Train Data', x=np.arange(10), y=count_train_freq,
           marker_color='rgb(33, 75, 99)'),
    go.Bar(name='Test Data', x=np.arange(10), y=count_test_freq,
           marker_color='rgb(79, 129, 102)'),
    go.Bar(name='Generated Data', x=np.arange(10), y=generated_freq,
           marker_color='rgb(214, 39, 40)'),
])

fig7.update_layout(
    title="Frequency",
    barmode='group',
    width=1200,
    height=500,
    xaxis={
        'tickmode': 'array',
        'tickvals': list(range(10)),
        'ticktext': [str(digit) for digit in range(10)],
    },
)
fig7.show()

In [None]:
# Class proportions: train split, test split and generated images.
fig8 = go.Figure()
fig8.add_trace(go.Bar(name='Train Data', x=np.arange(10), y=count_train_prop, marker_color='rgb(33, 75, 99)'))
fig8.add_trace(go.Bar(name='Test Data', x=np.arange(10), y=count_test_prop, marker_color='rgb(79, 129, 102)'))
fig8.add_trace(go.Bar(name='Generated Data', x=np.arange(10), y=generated_prop, marker_color= 'rgb(214, 39, 40)'))

fig8.update_layout(
    barmode='group',
    title="Ratio",
    width=1200,
    height=500,
    xaxis = dict(
        tickmode = 'array',
        tickvals = [x for x in range(10)],
        ticktext = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']
    )
)
# Show this figure; the cell previously re-displayed fig3 by mistake.
fig8.show()