In [None]:
import os

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.optim as optim
from sklearn.metrics import accuracy_score
from torch import nn
from torch.utils.data import DataLoader
from torch.utils.data.sampler import SubsetRandomSampler
from torchvision import transforms
from torchvision.datasets import MNIST # Training dataset
from torchvision.utils import make_grid
from tqdm.auto import tqdm

def show_tensor_images(image_tensor, num_images=25, size=(1, 28, 28)):
    '''
    Display the leading images of a batch as a grid with five images per row.
    Parameters:
        image_tensor: tensor holding the images; it is detached, moved to the
            CPU and viewed as (-1, *size), so flattened batches are accepted
        num_images: maximum number of images taken from the front of the batch
        size: shape of a single image, passed to view() after the batch axis
    '''
    images = image_tensor.detach().cpu().view(-1, *size)[:num_images]
    grid = make_grid(images, nrow=5)
    # Move the first axis of the grid to the end before handing it to imshow.
    channels_last = grid.permute(1, 2, 0)
    plt.imshow(channels_last.squeeze())
    plt.show()

In [None]:
# obtain mnist data and process
batch_size = 128
num_of_classes = 10

def get_indices(dataset,ind_array):
    '''
    Return the positions of all samples whose label is one of the requested classes.
    Parameters:
        dataset: object with a `targets` attribute holding one label per sample
            (anything np.asarray accepts, e.g. a CPU tensor or a list)
        ind_array: iterable of class labels to keep
    Returns:
        a list of Python ints in ascending order; every sample appears at most
        once, even when ind_array lists a class more than once
    '''
    # One vectorised membership test replaces a Python loop that compared
    # every sample against every class individually.
    targets = np.asarray(dataset.targets)
    wanted = np.asarray(ind_array)
    return np.flatnonzero(np.isin(targets, wanted)).tolist()

# def get_indices(dataset,class_name1,class_name2,class_name3):
#     indices =  []
#     for i in range(len(dataset.targets)):
#         if dataset.targets[i] == class_name1 or dataset.targets[i] == class_name2 or dataset.targets[i] == class_name3:
#             indices.append(i)
#     return indices

# Labels of the classes kept for the experiment.
classes = np.arange(num_of_classes)

# MNIST training split read from the current directory; download=False means
# the files must already be present there.
dataset = MNIST('.', train=True, download=False, transform=transforms.ToTensor())

# Both loaders sample (without replacement, in random order) from the same
# subset of the training split.
idx = get_indices(dataset, classes)
data_loader_target = DataLoader(dataset, batch_size=batch_size, sampler=SubsetRandomSampler(idx))
data_loader_gan = DataLoader(dataset, batch_size=batch_size, sampler=SubsetRandomSampler(idx))

In [None]:
# functions and classes for the target model
def tar_block(input_dim, output_dim):
    '''
    Build one hidden block of the target classifier.
    Parameters:
        input_dim: the dimension of the input vector, a scalar
        output_dim: the dimension of the output vector, a scalar
    Returns:
        an nn.Sequential holding, in order, a linear transformation,
          a 1-d batch normalization and an in-place relu activation
    '''
    linear = nn.Linear(input_dim, output_dim)
    norm = nn.BatchNorm1d(output_dim)
    activation = nn.ReLU(inplace=True)
    return nn.Sequential(linear, norm, activation)

class target(nn.Module):
    '''
    Target Class: the fully connected classifier that is trained on the images
    and later attacked by the generator.
    Values:
        num_of_classes: the number of output classes, a scalar
        im_dim: the dimension of the flattened images, a scalar
          (MNIST images are 28 x 28 = 784 so that is the default)
        hidden_dim: the base width of the hidden layers, a scalar
    '''
    def __init__(self, num_of_classes, im_dim=784, hidden_dim=128):
        super(target, self).__init__()
        # The network deliberately ends in a plain Linear layer: the model is
        # trained with nn.CrossEntropyLoss, which applies log-softmax itself
        # and therefore expects raw logits. A trailing Sigmoid would squash
        # every score into (0, 1) and cap how confident the loss can become.
        # Sigmoid has no parameters, so state_dict keys are unchanged, and it
        # is monotonic, so argmax predictions of existing weights are too.
        self.trmod = nn.Sequential(
            tar_block(im_dim, hidden_dim*2),
            tar_block(hidden_dim*2, hidden_dim),
            tar_block(hidden_dim, 64),
            tar_block(64, 32),
            nn.Linear(32, num_of_classes),
        )

    def forward(self, img):
        '''
        Function for completing a forward pass of the target model: Given an image tensor,
        returns one unnormalized score (logit) per class.
        Parameters:
            img: an image tensor with dimensions (n_samples, im_dim)
        Returns:
            a tensor with dimensions (n_samples, num_of_classes)
        '''
        return self.trmod(img)
    
# Target classifier together with the loss and the optimiser used to fit it.
net = target(num_of_classes=num_of_classes)
criterion_tar = nn.CrossEntropyLoss()
optimizer = optim.SGD(params=net.parameters(), lr=1e-3, momentum=0.9)

In [None]:
# train the target model 
n_target_epochs = 25  # number of passes over the training subset

# BatchNorm layers must use batch statistics while the classifier is fitted.
net.train()
for epoch in range(n_target_epochs):

    running_loss = 0.0
    n_batches = 0
    for inputs, labels in data_loader_target:
        # flatten every image of the batch into a vector of 28*28 values
        inputs = inputs.reshape(len(inputs), 28*28)

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = net(inputs)
        loss = criterion_tar(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        n_batches += 1

    # Report once per epoch. The previous "every 2000 mini-batches" condition
    # could only fire when an epoch had at least 2000 batches, so shorter
    # epochs trained without printing any statistics.
    print('[%d] mean loss: %.3f' % (epoch + 1, running_loss / max(n_batches, 1)))

print('Finished Training')

In [None]:
# Location of the target model's weights.
PATH = './torch_gan_data/dist_net_'+str(num_of_classes)+'classes.pth'

if os.path.exists(PATH):
    # A checkpoint already exists: use it instead of the freshly trained
    # weights and never overwrite it.
    net = target(num_of_classes)
    net.load_state_dict(torch.load(PATH))
else:
    # First run: keep the network trained above and store its weights so
    # later sessions can load them (torch.save does not create directories).
    os.makedirs(os.path.dirname(PATH), exist_ok=True)
    torch.save(net.state_dict(), PATH)

# From here on the classifier is only queried, so BatchNorm must use its
# running statistics and must stop updating them.
net.eval()

In [None]:
# test target model accuracy 
# Evaluate with BatchNorm in inference mode so a prediction does not depend on
# the other images in the batch and the running statistics are left untouched.
net.eval()

correct = 0
total = 0
with torch.no_grad():
    for images, labels in data_loader_gan:
        images = images.reshape(len(images), 28*28)
        predicted = net(images).argmax(dim=1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

# data_loader_gan samples from the training split, so this is a training
# accuracy; the number of images actually evaluated is reported as well.
print('Accuracy of the network on the %d training images: %.2f %%' % (
    total, 100. * correct / max(total, 1)))

In [None]:
# functions and classes for the GAN
def get_generator_block(input_dim, output_dim):
    '''
    Build one hidden block of the generator.
    Parameters:
        input_dim: the dimension of the input vector, a scalar
        output_dim: the dimension of the output vector, a scalar
    Returns:
        an nn.Sequential holding, in order, a linear transformation,
          a 1-d batch normalization and an in-place relu activation
    '''
    layers = [
        nn.Linear(input_dim, output_dim),
        nn.BatchNorm1d(output_dim),
        nn.ReLU(inplace=True),
    ]
    return nn.Sequential(*layers)

class Generator(nn.Module):
    '''
    Generator Class: maps a flattened image to a tensor of the same size whose
    entries lie in (0, 1) because the last layer is a Sigmoid.
    Values:
        im_dim: the dimension of the images, fitted for the dataset used, a scalar
          (MNIST images are 28 x 28 = 784 so that is your default)
        hidden_dim: the inner dimension, a scalar
    '''
    def __init__(self, im_dim=784, hidden_dim=392):
        super().__init__()
        # Widths of the consecutive hidden blocks, starting at the input size.
        widths = [im_dim, hidden_dim * 2, hidden_dim * 4, hidden_dim * 6, hidden_dim * 2]
        hidden_blocks = [
            get_generator_block(n_in, n_out)
            for n_in, n_out in zip(widths[:-1], widths[1:])
        ]
        self.gen = nn.Sequential(
            *hidden_blocks,
            nn.Linear(widths[-1], im_dim),
            nn.Sigmoid(),
        )

    def forward(self, img):
        '''
        Function for completing a forward pass of the generator: Given an image tensor,
        returns the generator's output for every image.
        Parameters:
            img: an image tensor with dimensions (n_samples, im_dim)
        '''
        return self.gen(img)
    
def get_discriminator_block(input_dim, output_dim):
    '''
    Discriminator Block
    Build one hidden block of the discriminator.
    Parameters:
        input_dim: the dimension of the input vector, a scalar
        output_dim: the dimension of the output vector, a scalar
    Returns:
        an nn.Sequential holding a linear transformation followed by an
          nn.LeakyReLU activation with negative slope of 0.2
          (https://pytorch.org/docs/master/generated/torch.nn.LeakyReLU.html)
    '''
    linear = nn.Linear(input_dim, output_dim)
    activation = nn.LeakyReLU(negative_slope=0.2)
    return nn.Sequential(linear, activation)

class Discriminator(nn.Module):
    '''
    Discriminator Class: scores flattened images with a single unbounded value
    per image (there is no activation after the last linear layer).
    Values:
        im_dim: the dimension of the images, fitted for the dataset used, a scalar
            (MNIST images are 28x28 = 784 so that is your default)
        hidden_dim: the inner dimension, a scalar
    '''
    def __init__(self, im_dim=784, hidden_dim=128):
        super().__init__()
        # Widths shrink from 4x to 1x hidden_dim before the single output unit.
        widths = [im_dim, hidden_dim * 4, hidden_dim * 2, hidden_dim]
        hidden_blocks = [
            get_discriminator_block(n_in, n_out)
            for n_in, n_out in zip(widths[:-1], widths[1:])
        ]
        self.disc = nn.Sequential(*hidden_blocks, nn.Linear(widths[-1], 1))

    def forward(self, image):
        '''
        Function for completing a forward pass of the discriminator: Given an image tensor,
        returns one value per image representing fake/real.
        Parameters:
            image: a flattened image tensor whose last dimension is im_dim
        '''
        return self.disc(image)

    # Needed for grading
    def get_disc(self):
        '''
        Returns:
            the sequential model
        '''
        return self.disc

In [None]:
# Hyper-parameters of the GAN training run
n_epochs = 200        # passes over data_loader_gan
display_step = 500    # number of steps between progress reports
batch_size = 128
lr = 1e-5             # learning rate shared by both Adam optimisers
device = 'cpu'

# Real/fake loss; it takes the discriminator's raw (unbounded) outputs.
criterion = nn.BCEWithLogitsLoss()

# Generator first, discriminator second, each with its own optimiser.
gen = Generator().to(device)
gen_opt = optim.Adam(gen.parameters(), lr=lr)
disc = Discriminator().to(device)
disc_opt = optim.Adam(disc.parameters(), lr=lr)

In [None]:
# Weights of the individual loss terms (they also end up in the checkpoint file name).
disc_coeff = 1850.0   # multiplies the averaged discriminator loss
hinge_coeff = 75.0    # multiplies the hinge penalty on the perturbation norm
adv_coeff = 400.0     # multiplies the target-model (adversarial) loss
c = 0.2               # perturbation L2 norm above which the hinge penalty applies

def get_disc_loss(gen, disc, criterion, real, num_images, device):
    '''
    Return the loss of the discriminator given inputs.
    Parameters:
        gen: the generator model, which returns a perturbation for every real image
        disc: the discriminator model, which returns a single-dimensional prediction of real/fake
        criterion: the loss function, which should be used to compare
               the discriminator's predictions to the ground truth reality of the images
               (e.g. fake = 0, real = 1)
        real: a batch of real images
        num_images: unused; kept so existing callers keep working
        device: unused; the label tensors are created on the device of the
               discriminator's predictions. Kept so existing callers keep working
    Returns:
        disc_loss: a torch scalar loss value for the current batch, i.e. the mean of
               the fake and the real loss multiplied by the global disc_coeff
    '''
    # The generator is not updated by this loss, so its forward pass runs
    # without autograd instead of building a graph that is detached right away.
    with torch.no_grad():
        fake = gen(real) + real  # adversarial image = real image + perturbation

    fake_pred = disc(fake)
    fake_loss = criterion(fake_pred, torch.zeros_like(fake_pred))  # fake -> 0

    real_pred = disc(real)
    real_loss = criterion(real_pred, torch.ones_like(real_pred))   # real -> 1

    return 0.5 * (fake_loss + real_loss) * disc_coeff

def get_gen_loss(gen, disc, criterion, target_model, tar_criterion, images, labels, num_images, device):
    '''
    Return the loss of the generator given inputs.
    Parameters:
        gen: the generator model, which returns a perturbation for every image
        disc: the discriminator model, which returns a single-dimensional prediction of real/fake
        criterion: the loss function, which should be used to compare
               the discriminator's predictions to the ground truth reality of the images
               (e.g. fake = 0, real = 1)
        target_model: the classifier under attack; called on the perturbed images
        tar_criterion: the classification loss comparing the target model's output
               with the label the attack aims for
        images: a batch of real, flattened images
        labels: the true integer class labels of images
        num_images: unused; kept so existing callers keep working
        device: unused; every helper tensor is derived from the inputs and therefore
               lives on their device. Kept so existing callers keep working
    Returns:
        gen_loss: a torch scalar loss value for the current batch, the sum of
               the GAN loss, hinge_coeff * hinge loss and adv_coeff * adversarial loss
               (hinge_coeff, adv_coeff and the norm bound c are read from globals)
    '''
    # GAN term: the discriminator should label the perturbed images as real (1).
    pert = gen(images)
    fake = pert + images
    fake_pred = disc(fake)
    gan_loss = criterion(fake_pred, torch.ones_like(fake_pred))

    # Hinge term: only the part of each perturbation's L2 norm above c is penalised.
    pert_norm = torch.norm(pert, 2, -1)
    hinge_loss = torch.clamp(pert_norm - c, min=0).mean()

    # Adversarial term: push the target model towards the next class (cyclically).
    # The class count is taken from the model output rather than from a global.
    preds = target_model(fake)
    opp_lbl = (labels + 1) % preds.shape[-1]
    adv_loss = tar_criterion(preds, opp_lbl)

    return gan_loss + hinge_coeff*hinge_loss + adv_coeff*adv_loss

In [None]:
cur_step = 0                  # number of completed optimisation steps
mean_generator_loss = 0       # running means over the current display window
mean_discriminator_loss = 0
gen_path = './torch_advgan_model/advgan_torch_disc_'+str(num_of_classes)+'classes_'+str(disc_coeff).replace('.','p')+'_'+str(hinge_coeff).replace('.','p')+'hinge_'+str(c).replace('.','p')+'c_'+str(adv_coeff).replace('.','p')+'adv.pt'

# torch.save does not create missing directories.
os.makedirs(os.path.dirname(gen_path), exist_ok=True)

for epoch in range(n_epochs):

    for inputs, labels in data_loader_gan:
        cur_batch_size = len(inputs)

        # Flatten the batch of real images from the dataset
        lbl = labels
        real = inputs.reshape(cur_batch_size, 28*28)

        ### Update discriminator ###
        disc_opt.zero_grad()
        disc_loss = get_disc_loss(gen, disc, criterion, real, cur_batch_size, device)
        # The generator step below runs its own forward pass, so this graph
        # does not have to be retained.
        disc_loss.backward()
        disc_opt.step()

        ### Update generator ###
        gen_opt.zero_grad()
        gen_loss = get_gen_loss(gen, disc, criterion, net, criterion_tar, real, lbl, cur_batch_size, device)
        gen_loss.backward()
        gen_opt.step()

        # Keep track of the average losses of the current display window
        mean_discriminator_loss += disc_loss.item() / display_step
        mean_generator_loss += gen_loss.item() / display_step

        # Counting the step before the check makes every window hold exactly
        # display_step steps, matching the divisor used above.
        cur_step += 1

        ### Visualization code ###
        if cur_step % display_step == 0:
            print('epoch: ', epoch)
            print(f"Step {cur_step}: Generator loss: {mean_generator_loss}, discriminator loss: {mean_discriminator_loss}")
            # Diagnostics only: no autograd graph is needed here.
            with torch.no_grad():
                perc_correct = accuracy_score(lbl, torch.argmax(net(real), dim=1))
                fake = gen(real) + real
                perc_wrong = 1 - accuracy_score(lbl, torch.argmax(net(fake), dim=1))
            print('% wrong: '+str(perc_wrong)+' | target model % correct: '+str(perc_correct))

            show_tensor_images(fake)
            show_tensor_images(real)
            mean_generator_loss = 0
            mean_discriminator_loss = 0

    # Checkpoint the generator once per epoch; writing the whole model after
    # every single step made disk I/O a large part of each step.
    torch.save(gen, gen_path)

In [None]:
gen_path = './torch_advgan_model/advgan_torch_disc_'+str(num_of_classes)+'classes_'+str(disc_coeff).replace('.','p')+'_'+str(hinge_coeff).replace('.','p')+'hinge_'+str(c).replace('.','p')+'c_'+str(adv_coeff).replace('.','p')+'adv.pt'

# The file holds a fully pickled Generator object (written with
# torch.save(gen, ...)), so unpickling it executes code: only load checkpoints
# you created yourself.
# NOTE(review): newer PyTorch releases may refuse full-module pickles unless
# weights_only=False is passed to torch.load - confirm for the installed version.
model = torch.load(gen_path)

# The loaded generator is only used for inference below; eval mode makes
# BatchNorm use its running statistics so the perturbation of an image does
# not depend on the other images in the batch.
model.eval()

In [None]:
# For every fifth batch, show the first image, its adversarial version and
# their difference together with the target model's predictions.
with torch.no_grad():
    for i, (images, labels) in enumerate(data_loader_gan):
        # Skip the forward passes for batches that are never displayed.
        if i % 5 != 0:
            continue

        images = images.reshape(len(images), 28*28)
        outputs = net(images)
        pert = model(images)
        adv_images = pert + images
        adv_pred = net(adv_images)

        fig, (ax1, ax2, ax3) = plt.subplots(1, 3)
        fig.suptitle('actual label: '+str(int(labels[0]))+'\n target model output: '+str(torch.argmax(outputs[0]).detach().numpy())+
                    '\n target model output on adv example: '+str(torch.argmax(adv_pred[0]).detach().numpy())+
                    '\n frobenius norm of pertubation: '+str(torch.norm(pert[0],'fro').detach().numpy()))
        ax1.imshow(images[0].reshape(28,28).detach().numpy(),cmap='gray')      # original
        ax2.imshow(adv_images[0].reshape(28,28).detach().numpy(),cmap='gray')  # adversarial
        ax3.imshow((adv_images[0]-images[0]).reshape(28,28).detach().numpy(),cmap='gray')  # difference
        plt.show()
        # Release the figure; this loop creates one figure per displayed batch.
        plt.close(fig)

In [None]:
# Inspect one image of the first batch: it is copied into a full batch and
# every copy is pushed through the generator and the target model.
sample_idx = 2  # position, inside the first batch, of the image to inspect

with torch.no_grad():
    # Only the first batch is needed, so fetch it directly instead of
    # iterating over the whole loader and ignoring every later batch.
    images, labels = next(iter(data_loader_gan))
    images = images.reshape(len(images), 28*28)

    img = images[sample_idx]
    # Take the label at the same position as the image; the title used to
    # show the label of a different image of the batch.
    true_label = int(labels[sample_idx])
    # NOTE(review): the batch consists of 128 copies of the same image,
    # presumably so the BatchNorm layers receive a full batch - confirm.
    images = img.unsqueeze(0).repeat(128,1)

    outputs = net(images)
    pert = model(images)
    adv_images = pert + images
    adv_pred = net(adv_images)

    for img,adimg,out,adpred,p in zip(images,adv_images,outputs,adv_pred,pert):
        print('target model output: ', torch.argmax(out))
        print('target model output on adv example: ', torch.argmax(adpred))
        print('frobenius norm of pertubation: ',torch.norm(p,'fro'))
        fig, (ax1, ax2) = plt.subplots(1, 2)
        fig.suptitle('actual label: '+str(true_label))
        ax1.imshow(img.reshape(28,28).detach().numpy(),cmap='gray')
        ax2.imshow(adimg.reshape(28,28).detach().numpy(),cmap='gray')
        plt.show()
        plt.close(fig)