### Adversarially trained CNN
The CNN below is trained on the MNIST dataset and follows an architecture almost identical to the one used in the TensorFlow CNN tutorial (99%+ accuracy on MNIST). The CNN is first trained on a limited portion of the MNIST dataset (NUMBER_OF_EXAMPLES). Then, using a GAN, it is trained further on artificially generated training examples, in the hope of achieving a higher evaluation accuracy. Inspired by https://arxiv.org/abs/1711.04340.

In [2]:
import torch
from torch import nn
import torchvision.datasets
import numpy as np
import matplotlib.pyplot as plt

In [12]:
# Mini-batch size used when constructing the MNIST DataLoaders.
batch_size = 16

In [92]:
# Load both MNIST splits from ./MNIST_data (downloading them if needed),
# convert every image to a tensor, and wrap each split in a DataLoader that
# yields batches of `batch_size` examples in dataset order.
transform = torchvision.transforms.ToTensor()

mnist_train = torchvision.datasets.MNIST(
    './MNIST_data', train=True, download=True, transform=transform,
)
mnist_test = torchvision.datasets.MNIST(
    './MNIST_data', train=False, download=True, transform=transform,
)

train_loader = torch.utils.data.DataLoader(mnist_train, batch_size=batch_size)
test_loader = torch.utils.data.DataLoader(mnist_test, batch_size=batch_size)


In [93]:
class Flatten(nn.Module):
    """Collapse every dimension after the first into a single dimension.

    An input of shape ``(N, d1, d2, ...)`` is returned with shape
    ``(N, d1 * d2 * ...)``.
    """

    def forward(self, input):
        # reshape() returns a view when the memory layout allows it and copies
        # otherwise, so non-contiguous inputs (e.g. transposed tensors) are
        # handled instead of raising as they would with view().
        return input.reshape(input.shape[0], -1)
    
class Unflatten(nn.Module):
    """Reinterpret a tensor as a batch of ``C x H x W`` feature maps.

    The leading (batch) dimension is inferred, so the input must contain a
    multiple of ``C * H * W`` elements.

    Args:
        C: Number of channels in the output.
        H: Height of the output feature maps.
        W: Width of the output feature maps.
    """

    def __init__(self, C=128, H=7, W=7):
        super().__init__()
        self.C, self.H, self.W = C, H, W

    def forward(self, input):
        # -1 lets view() infer the batch size from the element count.
        target_shape = (-1, self.C, self.H, self.W)
        return input.view(*target_shape)

In [4]:
def CNN():
    """Build the convolutional classifier as an ``nn.Sequential``.

    Two 5x5 convolution + LeakyReLU + 2x2 max-pool stages feed a two-layer
    fully connected head that outputs 10 logits. With 28x28 single-channel
    inputs the spatial size goes 28 -> 24 -> 12 -> 8 -> 4, so the flattened
    feature vector has 4 * 4 * 64 entries, matching the first Linear layer.

    Returns:
        A freshly initialized ``nn.Sequential`` model.
    """
    flat_features = 4 * 4 * 64
    layers = [
        # Feature extractor.
        nn.Conv2d(1, 32, [5, 5], stride=[1, 1]),
        nn.LeakyReLU(negative_slope=.01),
        nn.MaxPool2d([2, 2], stride=[2, 2]),
        nn.Conv2d(32, 64, [5, 5], stride=[1, 1]),
        nn.LeakyReLU(negative_slope=.01),
        nn.MaxPool2d([2, 2], stride=[2, 2]),
        # Classifier head.
        Flatten(),
        nn.Linear(flat_features, flat_features),
        nn.LeakyReLU(negative_slope=.01),
        nn.Linear(flat_features, 10),
    ]
    return nn.Sequential(*layers)

In [5]:
def create_optimizer(model, lr=.01, betas=None):
    """Create an Adam optimizer over all parameters of ``model``.

    Args:
        model: Module whose ``parameters()`` are to be optimized.
        lr: Learning rate passed to Adam.
        betas: Optional ``(beta1, beta2)`` coefficients. When ``None``,
            Adam's own default betas are used.

    Returns:
        A ``torch.optim.Adam`` instance.
    """
    # Identity check rather than ``== None``: equality would be evaluated
    # element-wise for array-like betas and make the condition ambiguous.
    extra_kwargs = {} if betas is None else {"betas": betas}
    return torch.optim.Adam(model.parameters(), lr=lr, **extra_kwargs)

In [6]:
def loss(logits, labels):
    """Return the mean cross-entropy between ``logits`` and ``labels``.

    Args:
        logits: Unnormalized class scores, one row per example.
        labels: Target class index for each example.

    Returns:
        A scalar tensor holding the cross-entropy loss.
    """
    # Previously the criterion was constructed but never applied or returned,
    # so the function always returned None.
    criterion = nn.CrossEntropyLoss()
    return criterion(logits, labels)

In [7]:
# Build a fresh classifier and an optimizer (create_optimizer's default
# learning rate) over its parameters.
model = CNN()
optimizer = create_optimizer(model)

NameError: name 'Flatten' is not defined

In [8]:
def train_CNN(model, train_loader, epochs, optimizer, num_train_batches=-1, print_results=True):
    """Fit ``model`` with cross-entropy loss and return it.

    Args:
        model: Network mapping a batch of examples to class logits.
        train_loader: Iterable yielding ``(examples, labels)`` batches.
        epochs: Number of passes over ``train_loader``.
        optimizer: Optimizer that updates ``model``'s parameters.
        num_train_batches: Per-epoch batch budget; the epoch ends when the
            batch index equals this value. The default of -1 never matches an
            index, so every batch is used.
        print_results: If true, print the cost every 250 batches.

    Returns:
        The same ``model`` object, trained in place.
    """
    criterion = nn.CrossEntropyLoss()

    for epoch in range(epochs):
        for batch_idx, (inputs, targets) in enumerate(train_loader):
            if batch_idx == num_train_batches:
                break

            batch_cost = criterion(model(inputs), targets)

            # Clear stale gradients before accumulating this batch's.
            optimizer.zero_grad()
            batch_cost.backward()
            optimizer.step()

            if print_results and batch_idx % 250 == 0:
                print("cost @ epoch", epoch, "@ batch", batch_idx, batch_cost.detach().numpy())

        print(f"Completed epoch #{epoch + 1}")

    return model

In [9]:
# Train for 10 epochs, using at most 200 batches of train_loader per epoch.
# NOTE(review): this rebinds the name `CNN` from the model factory function to
# the trained model instance (train_CNN returns the same object as `model`).
# Any later `CNN()` call will then invoke the trained model with no input
# instead of building a new network -- consider a distinct name here.
CNN = train_CNN(model, train_loader, 10, optimizer, num_train_batches=200, print_results=False)

NameError: name 'model' is not defined

In [10]:
def eval_CNN(model, test_loader, num_test_batches=-1):
    """Print and return the top-1 accuracy of ``model`` in percent.

    The train/eval mode of ``model`` is left untouched; callers whose model
    contains dropout or batch-norm layers should switch modes themselves.

    Args:
        model: Callable mapping a batch of examples to per-class logits with
            classes along dimension 1.
        test_loader: Iterable yielding ``(examples, labels)`` batches.
        num_test_batches: Evaluation stops when the batch index equals this
            value. The default of -1 never matches, so all batches are used.

    Returns:
        Accuracy as a Python float between 0 and 100, or 0.0 when no example
        was evaluated (empty loader or ``num_test_batches=0``).
    """
    total_examples = 0
    correct = 0

    # Evaluation needs no gradients; skipping autograd saves memory and time.
    with torch.no_grad():
        for i, (examples, labels) in enumerate(test_loader):
            if i == num_test_batches:
                break
            logits = model(examples)
            # Index of the largest logit is the predicted class.
            _, predictions = torch.max(logits, 1)
            total_examples += logits.shape[0]
            # .item() keeps the running count a plain int rather than a tensor.
            correct += (predictions == labels).sum().item()

    # Guard against division by zero when nothing was evaluated.
    accuracy = 100.0 * correct / total_examples if total_examples else 0.0
    print("Test accuracy: " + str(accuracy) + "%")
    return accuracy

In [11]:
# Quick sanity check on the first two test batches. `model` is the network
# that was trained in place, so it is passed directly rather than through
# another name bound to the same object.
eval_CNN(model, test_loader, num_test_batches=2)

NameError: name 'test_loader' is not defined

In [None]:
def generate_nosie(batch_size, dim=96):
    """Sample a ``(batch_size, dim)`` tensor of uniform noise in ``[-1, 1)``.

    Args:
        batch_size: Number of noise vectors to draw.
        dim: Length of each noise vector.

    Returns:
        A tensor of shape ``(batch_size, dim)``.
    """
    # torch.rand draws from [0, 1); scale to width 2, then shift down by 1.
    unit_samples = torch.rand(batch_size, dim)
    return unit_samples.mul(2).sub(1)

In [None]:
# Draw a small sample batch: 4 noise vectors of length 96.
test_noise = generate_nosie(4, 96)

In [None]:
def generator(noise_dim=96):
    """Build the generator network as an ``nn.Sequential``.

    Two fully connected layers expand the noise vector to 7 * 7 * 128
    features, which are reshaped into 128 x 7 x 7 maps and upsampled by two
    stride-2 transposed convolutions (7 -> 14 -> 28). The Tanh output is
    flattened again, so the result has one row of 28 * 28 values in (-1, 1)
    per noise vector.

    Args:
        noise_dim: Length of each input noise vector.

    Returns:
        A freshly initialized ``nn.Sequential`` model.
    """
    hidden_units = 1024
    map_features = 7 * 7 * 128
    layers = [
        # Fully connected expansion of the noise vector.
        nn.Linear(noise_dim, hidden_units),
        nn.ReLU(),
        nn.BatchNorm1d(hidden_units),
        nn.Linear(hidden_units, map_features),
        nn.ReLU(),
        nn.BatchNorm1d(map_features),
        # Reshape to feature maps and upsample to image resolution.
        Unflatten(),
        nn.ConvTranspose2d(128, 64, [4, 4], stride=[2, 2], padding=1),
        nn.ReLU(),
        nn.BatchNorm2d(64),
        nn.ConvTranspose2d(64, 1, [4, 4], stride=[2, 2], padding=1),
        nn.Tanh(),
        Flatten(),
    ]
    return nn.Sequential(*layers)

### Generator & Discriminator Loss
Generator Loss:
$$\ell_G  =  \frac{1}{2}\mathbb{E}_{z \sim p(z)}\left[\left(D(G(z))-1\right)^2\right]$$
Discriminator Loss:
$$ \ell_D = \frac{1}{2}\mathbb{E}_{x \sim p_\text{data}}\left[\left(D(x)-1\right)^2\right] + \frac{1}{2}\mathbb{E}_{z \sim p(z)}\left[ \left(D(G(z))\right)^2\right]$$
Loss functions from https://arxiv.org/abs/1611.04076 <br/>

In [None]:
def discriminator_loss(scores_real, scores_fake):
    """Compute the least-squares GAN loss for the discriminator.

    Real scores are penalized for deviating from 1 and fake scores for
    deviating from 0; each mean squared error is weighted by one half.

    Args:
        scores_real: Discriminator outputs for real examples.
        scores_fake: Discriminator outputs for generated examples.

    Returns:
        A scalar tensor: the sum of the real and fake terms.
    """
    real_target = torch.ones_like(scores_real)
    real_term = (scores_real - real_target).pow(2).mean() * .5
    fake_term = scores_fake.pow(2).mean() * .5
    return real_term + fake_term

In [None]:
def generator_loss(scores_fake):
    """Compute the least-squares GAN loss for the generator.

    The generator is penalized by half the mean squared distance between the
    discriminator's scores for its samples and the "real" target of 1.

    Args:
        scores_fake: Discriminator outputs for generated examples.

    Returns:
        A scalar tensor holding the loss.
    """
    real_target = torch.ones_like(scores_fake)
    squared_error = (scores_fake - real_target).pow(2)
    return squared_error.mean() * .5

In [None]:
def train_gan(generator, discriminator, image_loader, epochs, num_train_batches=-1):
    """Alternately train ``generator`` and ``discriminator`` with LSGAN losses.

    For each batch the generator is updated first, then the discriminator is
    updated on the real batch and on the same generated images.

    Args:
        generator: Network mapping noise vectors to images. Its output must
            contain as many elements as a real batch of the same size.
        discriminator: Network mapping images to scores.
        image_loader: Iterable yielding ``(examples, labels)`` batches; the
            labels are ignored.
        epochs: Number of passes over ``image_loader``.
        num_train_batches: Per-epoch batch budget; the epoch ends when the
            batch index equals this value. The default of -1 never matches an
            index, so every batch is used.

    Returns:
        The ``(generator, discriminator)`` pair, trained in place.
    """
    generator_optimizer = create_optimizer(generator)
    discriminator_optimizer = create_optimizer(discriminator)

    for iters in range(epochs):
        for i, (examples, _) in enumerate(image_loader):
            if i == num_train_batches:
                break

            # Match the noise batch to the real batch so a smaller final
            # batch (or a loader with a different batch size) still works.
            z = generate_nosie(examples.shape[0])
            # Give the fakes the same layout as the real images so both can
            # be fed to the same discriminator.
            images_fake = generator(z).reshape(examples.shape)

            ##TODO, fix scores_fake 10 class problem

            # --- Generator step ---
            generator_optimizer.zero_grad()
            g_cost = generator_loss(discriminator(images_fake))
            g_cost.backward()
            generator_optimizer.step()

            # --- Discriminator step ---
            # Zero here (not earlier): the generator's backward pass also
            # deposited gradients on the discriminator's parameters.
            discriminator_optimizer.zero_grad()
            scores_real = discriminator(examples)
            # detach() cuts the link to the generator: its graph was freed by
            # g_cost.backward() and its weights have since been updated.
            scores_fake = discriminator(images_fake.detach())
            d_cost = discriminator_loss(scores_real, scores_fake)
            d_cost.backward()
            discriminator_optimizer.step()

            if i % 100 == 0:
                print("Discriminator Cost", d_cost.item())
                print("Generator Cost", g_cost.item())

    return generator, discriminator

In [None]:
# Instantiate the generator and discriminator and run GAN training on the
# MNIST training loader.
# NOTE(review): this rebinds `generator` from the factory function to the
# built network, so the cell cannot be re-run without redefining the factory.
generator = generator()
# NOTE(review): the earlier `CNN = train_CNN(...)` cell rebinds `CNN` to a
# trained model; if that cell has run, `CNN()` here calls the model with no
# input instead of building a fresh discriminator -- confirm cell order/naming.
discriminator = CNN()
image_loader = train_loader
epochs = 10
num_train_batches = 100
# The returned (generator, discriminator) pair is discarded here.
train_gan(generator, discriminator, image_loader, epochs, num_train_batches=num_train_batches)