# Import Libraries

In [0]:
# Mount Google Drive into the Colab VM; the dataset, checkpoint and result
# paths used by later cells all live under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

In [0]:
from __future__ import print_function
#%matplotlib inline
import argparse
import os
import random
import torch
import torch.nn as nn
import torch.nn.parallel
import torch.backends.cudnn as cudnn
import torch.optim as optim
import torch.utils.data
import torchvision.datasets as dset
import torchvision.transforms as transforms
import torchvision.utils as vutils
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from IPython.display import HTML
from PIL import Image

# Seed every random number generator this notebook imports so that a
# "Restart & Run All" reproduces the same shuffling, weight init and noise.
manualSeed = 999
#manualSeed = random.randint(1, 10000) # use if you want new results
print("Random Seed: ", manualSeed)
random.seed(manualSeed)
# NumPy keeps its own global generator, independent of `random` and torch.
np.random.seed(manualSeed)
torch.manual_seed(manualSeed)

# Get Data and fix channel order

In [0]:
# Read the pre-extracted image arrays (real photos and fakes) from Drive.
celebAreal = np.load("/content/drive/My Drive/Deep Learning Project Spring 20/data/image_array_10000.npy")
celebAfakes = np.load("/content/drive/My Drive/Deep Learning Project Spring 20/data/image_array_10000_fakes.npy")

# Move the last axis of the real images to position 1
# (channels-last -> channels-first).
real = celebAreal.transpose(0, 3, 1, 2)
# The fakes are used exactly as stored.
# NOTE(review): presumably they are already channels-first - confirm.
fake = celebAfakes

In [0]:
# Build the labelled training / validation sets from the real and fake arrays.
# Label convention: 1 = real, 0 = fake. Within each split the real samples
# come first, so the leading `len(real_*)` labels are set to 1.
TRAIN_PER_CLASS = 8000  # samples taken from each class for training

real_train, real_val = real[:TRAIN_PER_CLASS], real[TRAIN_PER_CLASS:]
fake_train, fake_val = fake[:TRAIN_PER_CLASS], fake[TRAIN_PER_CLASS:]

training = np.concatenate((real_train, fake_train), 0)
training_labels = np.zeros(len(training))
training_labels[:len(real_train)] = 1

val = np.concatenate((real_val, fake_val), 0)
val_labels = np.zeros(len(val))
# Derived from the slice length instead of a hard-coded 2000 so the labels
# stay correct if the arrays are not exactly 10000 samples long.
val_labels[:len(real_val)] = 1

# Every sample must have exactly one label.
assert len(training) == len(training_labels)
assert len(val) == len(val_labels)

# Only the last expression of a cell is displayed, so show all shapes at once.
np.shape(training), np.shape(training_labels), np.shape(val), np.shape(val_labels)

# Create Datasets and Dataloaders

In [0]:
class MyDataset(torch.utils.data.Dataset):
    """Dataset pairing an indexable collection of images with integer labels.

    Args:
        data: indexable collection of samples; when a transform is supplied
            each sample must be an array that can be cast to uint8 and
            re-ordered with ``transpose(1, 2, 0)``.
        targets: labels, converted to a ``torch.LongTensor``.
        transform: optional callable applied to the PIL image built from a
            sample. When falsy, samples are returned untouched.
    """

    def __init__(self, data, targets, transform=None):
        self.data = data
        self.targets = torch.LongTensor(targets)
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        """Return ``(sample, label)`` for ``index``."""
        sample = self.data[index]
        label = self.targets[index]

        if not self.transform:
            return sample, label

        # The transform works on PIL images, which are channels-last uint8,
        # so the stored array is cast and its first axis moved to the end.
        channels_last = sample.astype(np.uint8).transpose(1, 2, 0)
        return self.transform(Image.fromarray(channels_last)), label

# Image preprocessing: convert the PIL image to a tensor, then normalise
# every channel with mean 0.5 and std 0.25.
# NOTE(review): with ToTensor's [0, 1] output this yields values in [-2, 2],
# while the Generator ends in Tanh ([-1, 1]) - confirm this is intended.
transform_norm = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.25, 0.25, 0.25)),
])

# Training set and its loader: mini-batches of 128, reshuffled every epoch.
dataset = MyDataset(training, training_labels, transform=transform_norm)
dataloader = torch.utils.data.DataLoader(dataset, batch_size=128, shuffle=True)

# Validation set and its loader, configured the same way.
dataset_val = MyDataset(val, val_labels, transform=transform_norm)
dataloader_val = torch.utils.data.DataLoader(dataset_val, batch_size=128, shuffle=True)

In [0]:
# ---- Data settings ----
# Worker-process count intended for the DataLoader.
workers = 2

# Mini-batch size used during training.
batch_size = 128

# Spatial size (height == width) of the training images.
image_size = 64

# Channels per training image; 3 for colour images.
nc = 3

# ---- Network sizes ----
# Length of the latent vector z fed to the generator.
nz = 100

# Base number of feature maps in the generator.
ngf = 64

# Base number of feature maps in the discriminator.
ndf = 64

# ---- Optimisation ----
# How many passes over the training set to run.
num_epochs = 30

# Learning rate shared by the Adam optimizers.
lr = 0.0002

# Beta1 coefficient for the Adam optimizers.
beta1 = 0.5

# ---- Hardware ----
# Number of GPUs to use; 0 forces CPU mode.
ngpu = 1
# Use the first CUDA device only when GPUs were requested and one is present.
device = torch.device("cuda:0" if (ngpu > 0 and torch.cuda.is_available()) else "cpu")

# Define Model

In [0]:
# Custom weight initialisation, meant to be passed to ``Module.apply``.
def weights_init(m):
    """Re-initialise ``m`` in place according to its class name.

    * Class name contains ``'Conv'``: weights ~ Normal(0.0, 0.02).
    * Class name contains ``'BatchNorm'``: weights ~ Normal(1.0, 0.02) and
      biases set to 0.
    * Anything else is left untouched.
    """
    layer_kind = type(m).__name__
    if 'Conv' in layer_kind:
        nn.init.normal_(m.weight.data, mean=0.0, std=0.02)
        return
    if 'BatchNorm' in layer_kind:
        nn.init.normal_(m.weight.data, mean=1.0, std=0.02)
        nn.init.constant_(m.bias.data, 0)

In [0]:
# Generator: latent vector -> image

class Generator(nn.Module):
    """DCGAN-style generator built from transposed convolutions.

    The network takes a latent tensor with ``nz`` channels and produces an
    ``nc``-channel image squashed into [-1, 1] by a final Tanh. For a
    1 x 1 latent input the spatial size grows 1 -> 4 -> 8 -> 16 -> 32 -> 64.

    Args:
        ngpu: number of GPUs requested; stored on the instance only.
    """

    def __init__(self, ngpu):
        super(Generator, self).__init__()
        self.ngpu = ngpu

        def upsample_stage(in_channels, out_channels, stride, padding):
            # One stage: 4x4 transposed conv -> batch norm -> in-place ReLU.
            return [
                nn.ConvTranspose2d(in_channels, out_channels, 4, stride, padding, bias=False),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(True),
            ]

        layers = []
        # Z (nz x 1 x 1) -> (ngf*8) x 4 x 4
        layers += upsample_stage(nz, ngf * 8, 1, 0)
        # -> (ngf*4) x 8 x 8
        layers += upsample_stage(ngf * 8, ngf * 4, 2, 1)
        # -> (ngf*2) x 16 x 16
        layers += upsample_stage(ngf * 4, ngf * 2, 2, 1)
        # -> (ngf) x 32 x 32
        layers += upsample_stage(ngf * 2, ngf, 2, 1)
        # -> (nc) x 64 x 64, values in [-1, 1]
        layers += [
            nn.ConvTranspose2d(ngf, nc, 4, 2, 1, bias=False),
            nn.Tanh(),
        ]
        # Same layer order as a literal nn.Sequential listing, so the
        # state_dict keys (main.0 ... main.13) are unchanged.
        self.main = nn.Sequential(*layers)

    def forward(self, input):
        """Run ``input`` through the layer stack and return the result."""
        return self.main(input)

In [0]:
# Instantiate the generator and move it to the selected device.
netG = Generator(ngpu).to(device)

# Wrap in DataParallel only when more than one GPU was requested and we are
# actually running on CUDA.
if (ngpu > 1) and (device.type == 'cuda'):
    netG = nn.DataParallel(netG, device_ids=list(range(ngpu)))

# Re-initialise the weights with weights_init: conv weights ~ N(0, 0.02),
# batch-norm weights ~ N(1, 0.02) with zero bias.
netG.apply(weights_init)

# Show the architecture.
print(netG)


In [0]:
# Discriminator: two-class image classifier
class Discriminator(nn.Module):
    """DCGAN-style convolutional classifier producing two class scores.

    For an ``nc`` x 64 x 64 input the output has shape (N, 2, 1, 1). The two
    channels are raw, unnormalised class scores (logits). There is no final
    activation because the notebook trains this network with
    ``nn.CrossEntropyLoss``, which applies log-softmax itself; squashing the
    scores with a Sigmoid first would confine them to [0, 1] and cap how
    confident the softmax can become. Arg-max predictions are unaffected by
    the removal, and since Sigmoid has no parameters the ``state_dict`` keys
    are unchanged, so existing checkpoints still load.

    Args:
        ngpu: number of GPUs requested; stored on the instance only.
    """

    def __init__(self, ngpu):
        super(Discriminator, self).__init__()
        self.ngpu = ngpu
        self.main = nn.Sequential(
            # input is (nc) x 64 x 64
            nn.Conv2d(nc, ndf, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ndf),
            nn.LeakyReLU(0.2, inplace=True),
            # state size. (ndf) x 32 x 32
            nn.Conv2d(ndf, ndf * 2, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ndf * 2),
            nn.LeakyReLU(0.2, inplace=True),
            # state size. (ndf*2) x 16 x 16
            nn.Conv2d(ndf * 2, ndf * 4, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ndf * 4),
            nn.LeakyReLU(0.2, inplace=True),
            # state size. (ndf*4) x 8 x 8
            nn.Conv2d(ndf * 4, ndf * 8, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ndf * 8),
            nn.LeakyReLU(0.2, inplace=True),
            # state size. (ndf*8) x 4 x 4  ->  2 x 1 x 1 class logits
            nn.Conv2d(ndf * 8, 2, 4, 1, 0, bias=False),
        )

    def forward(self, input):
        """Return the (N, 2, 1, 1) class logits for a batch of images."""
        return self.main(input)

In [0]:
# Instantiate the discriminator and move it to the selected device.
netD = Discriminator(ngpu).to(device)

# Wrap in DataParallel only when more than one GPU was requested and we are
# actually running on CUDA.
if (ngpu > 1) and (device.type == 'cuda'):
    netD = nn.DataParallel(netD, device_ids=list(range(ngpu)))

# Re-initialise the weights with weights_init: conv weights ~ N(0, 0.02),
# batch-norm weights ~ N(1, 0.02) with zero bias.
netD.apply(weights_init)

# Show the architecture.
print(netD)

In [0]:
class GeneratorLossEncoder(nn.Module):
    """Tiny head mapping a 2-element score vector to a single value.

    A ``Linear(2, 1)`` layer followed by an in-place ``LeakyReLU`` with
    negative slope 0.2.

    Args:
        ngpu: number of GPUs requested; stored on the instance only.
    """

    def __init__(self, ngpu):
        super(GeneratorLossEncoder, self).__init__()
        self.ngpu = ngpu
        projection = nn.Linear(2, 1)
        activation = nn.LeakyReLU(0.2, inplace=True)
        self.main = nn.Sequential(projection, activation)

    def forward(self, input):
        """Apply the linear projection and the leaky ReLU to ``input``."""
        return self.main(input)

In [0]:
# Instantiate the generator loss encoder (intended to map the two-class
# output onto a single value usable with BCE) and move it to the device.
netGLE = GeneratorLossEncoder(ngpu).to(device)

# Wrap in DataParallel only when more than one GPU was requested and we are
# actually running on CUDA.
if (ngpu > 1) and (device.type == 'cuda'):
    netGLE = nn.DataParallel(netGLE, device_ids=list(range(ngpu)))

# weights_init only touches modules whose class name contains 'Conv' or
# 'BatchNorm'; this network has neither, so its default init is kept.
netGLE.apply(weights_init)

# Show the architecture.
print(netGLE)

# Define Loss Functions

In [0]:
# Label convention used throughout training: 1 = real, 0 = fake.
real_label = 1
fake_label = 0

# Loss functions: cross-entropy over the discriminator's two class scores,
# plus a binary cross-entropy criterion.
criterion = nn.CrossEntropyLoss()
gen_criterion = nn.BCELoss()


# A fixed batch of 64 latent vectors, reused to visualise how the
# generator's output evolves.
fixed_noise = torch.randn(64, nz, 1, 1, device=device)

# One Adam optimizer per network (discriminator and generator), sharing the
# same learning rate and betas.
optimizerD = optim.Adam(params=netD.parameters(), lr=lr, betas=(beta1, 0.999))
optimizerG = optim.Adam(params=netG.parameters(), lr=lr, betas=(beta1, 0.999))

# Train Model

In [0]:
import pickle
from torch.autograd import Variable  # not used in this cell; kept in case other cells rely on it


def _batch_accuracy(logits, targets):
    """Return the fraction of rows of ``logits`` whose arg-max equals ``targets``."""
    predicted = logits.detach().argmax(dim=1)
    return (predicted == targets).sum().item() / targets.size(0)


# Lists to keep track of progress
img_list = []
G_losses = []
D_losses = []
iters = 0

print("Starting Training Loop...")
# For each epoch
for epoch in range(num_epochs):
    # For each batch in the dataloader
    for i, data in enumerate(dataloader, 0):

        ############################
        # (1) Update D network on a labelled dataset batch and on a batch
        #     of freshly generated images (labelled fake)
        ###########################
        ## Train with the dataset batch, using the labels it carries
        netD.zero_grad()
        # Format batch
        real_cpu = data[0].to(device)
        b_size = real_cpu.size(0)
        label = data[1].to(device).long()
        # Forward pass through D. reshape (rather than squeeze) keeps the
        # batch dimension even when the batch holds a single sample.
        output = netD(real_cpu).reshape(b_size, -1)
        # Calculate loss and gradients on the dataset batch
        errD_real = criterion(output, label)
        errD_real.backward()
        # Classification accuracy of D on the dataset batch
        D_x = _batch_accuracy(output, label)

        ## Train with all-fake batch
        # Generate batch of latent vectors
        noise = torch.randn(b_size, nz, 1, 1, device=device)
        # Generate fake image batch with G. Named fake_batch so that it does
        # not overwrite the `fake` dataset array defined in the data cell.
        fake_batch = netG(noise)
        label.fill_(fake_label)
        # Classify all fake batch with D; detach so that no gradient flows
        # back into G during the D update
        output = netD(fake_batch.detach()).reshape(b_size, -1)
        # Calculate D's loss and gradients on the all-fake batch
        errD_fake = criterion(output, label)
        errD_fake.backward()
        # Fraction of generated images D classified as fake
        D_G_z1 = _batch_accuracy(output, label)
        # Total D loss (gradients of both parts were already accumulated)
        errD = errD_real + errD_fake
        # Update D
        optimizerD.step()

        ############################
        # (2) Update G network: make D classify generated images as real
        ###########################
        netG.zero_grad()
        label.fill_(real_label)  # fake labels are real for generator cost
        # Since we just updated D, perform another forward pass of all-fake batch through D
        output = netD(fake_batch).reshape(b_size, -1)
        # Calculate G's loss based on this output
        errG = criterion(output, label)
        # Calculate gradients for G
        errG.backward()
        # Fraction of generated images D classified as real after its update
        D_G_z2 = _batch_accuracy(output, label)
        # Update G
        optimizerG.step()

        # Output training stats (the D(x) / D(G(z)) columns are accuracies)
        if i % 100 == 0:
            print('[%d/%d][%d/%d]\tLoss_D: %.4f\tLoss_G: %.4f\tD(x): %.4f\tD(G(z)): %.4f / %.4f'
                  % (epoch, num_epochs, i, len(dataloader),
                     errD.item(), errG.item(), D_x, D_G_z1, D_G_z2))

        # Save Losses for plotting later
        G_losses.append(errG.item())
        D_losses.append(errD.item())

        # Every 100 iterations (and on the very last one): checkpoint the
        # networks and optimizers and record G's output on fixed_noise
        if (iters % 100 == 0) or ((epoch == num_epochs-1) and (i == len(dataloader)-1)):
            torch.save({'D state_dict':netD.state_dict(), 'DOptim state_dict':optimizerD.state_dict(), 'G state_dict':netG.state_dict(), 'GOptim state_dict':optimizerG.state_dict()},"/content/drive/My Drive/Deep Learning Project Spring 20/data/CelebA_DCGAN_detect_binary_class_state_dict")
            with torch.no_grad():
                fixed_fake = netG(fixed_noise).detach().cpu()
            img_list.append(vutils.make_grid(fixed_fake, padding=2, normalize=True))

        iters += 1

# np.save stores ONE array; its third positional argument is `allow_pickle`,
# so both loss histories are stacked into a single (2, n_iterations) array:
# row 0 = generator losses, row 1 = discriminator losses.
np.save('/content/drive/My Drive/Deep Learning Project Spring 20/Results/DCGAN_binary_class_train_loss.npy', np.array([G_losses, D_losses]))
with open('/content/drive/My Drive/Deep Learning Project Spring 20/Results/DCGAN_binary_class_train_hist.pkl', 'wb') as f:
    pickle.dump(img_list, f)

# Load Saved Model

In [0]:
# Restore the discriminator, the generator and both optimizers from the
# checkpoint written during training. map_location remaps the stored tensors
# onto the current device, so a checkpoint saved on a GPU runtime also loads
# on a CPU-only runtime.
state_dict = torch.load("/content/drive/My Drive/Deep Learning Project Spring 20/data/CelebA_DCGAN_detect_binary_class_state_dict", map_location=device)
netD.load_state_dict(state_dict['D state_dict'])
optimizerD.load_state_dict(state_dict['DOptim state_dict'])

netG.load_state_dict(state_dict['G state_dict'])
optimizerG.load_state_dict(state_dict['GOptim state_dict'])

# Test Data Performance

In [0]:
def _to_channels_first(images):
    """Transpose a batch of 64 x 64 x 3 (channels-last) images to 3 x 64 x 64.

    Arrays whose per-image shape is anything other than (64, 64, 3) are
    returned unchanged.
    """
    if images.shape[1:] == (64, 64, 3):
        return np.transpose(images, (0, 3, 1, 2))
    return images


# Load the held-out fakes and the first 10000 real images.
nvidia_fakes = np.load('/content/drive/My Drive/CelebA/image_fake_array_10000.npy')
real_images = np.load('/content/drive/My Drive/CelebA/image_array_20000.npy')[:10000]

nvidia_fake_array = _to_channels_first(nvidia_fakes)
real_images_array = _to_channels_first(real_images)

# Create a mixture of real images and Nvidia fakes.
# Labels follow the convention used for training: 1 = real, 0 = fake.
test_x = np.concatenate([nvidia_fake_array, real_images_array])
test_y = np.concatenate([np.zeros(len(nvidia_fake_array)), np.ones(len(real_images_array))])
assert len(test_x) == len(test_y)

# Create a dataloader
dataset_test = MyDataset(test_x, test_y, transform=transform_norm)
dataloader_test = torch.utils.data.DataLoader(dataset_test, 50, shuffle=True)

In [0]:
# Measure the discriminator's classification accuracy on the test loader.
correct = 0
total = 0
# Evaluate in eval mode so BatchNorm uses its running statistics instead of
# per-batch statistics; the previous mode is restored afterwards so that
# re-running the training cell is unaffected.
was_training = netD.training
netD.eval()
try:
    with torch.no_grad():
        for i, data in enumerate(dataloader_test, 0):
            images, labels = data
            labels = labels.to(device)
            outputs = netD(images.to(device))
            # Flatten (N, 2, 1, 1) to (N, 2) and pick the higher-scoring class;
            # reshape keeps the batch dimension even for a single-sample batch.
            predicted = outputs.reshape(labels.size(0), -1).argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
finally:
    netD.train(was_training)
# Report the number of images actually evaluated rather than a fixed count.
print('Accuracy of the network on the %d test images: %f %%' % (
    total, 100 * correct / total))