In [12]:
from __future__ import print_function
import os
import argparse
import random
import torch
import torchvision
import torch.nn as nn
import torch.utils.data
import torch.nn.parallel
import torch.optim as optim
import torch.nn.functional as F
from IPython.display import HTML
import torchvision.utils as vutils
import torch.backends.cudnn as cudnn
import matplotlib.animation as animation
from torchvision import transforms, datasets
from torch.utils.data import DataLoader, random_split, Dataset
from sklearn.model_selection import train_test_split
import cv2
from torch.autograd import Variable
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import pickle
import time

import warnings
warnings.filterwarnings("ignore")

# Set random seed for reproducibility
manualSeed = 42
#manualSeed = random.randint(1, 10000) # use if you want new results
print("Random Seed: ", manualSeed)
random.seed(manualSeed)
torch.manual_seed(manualSeed)

# CUDA for PyTorch
use_cuda = torch.cuda.is_available()
device = torch.device("cuda:0" if use_cuda else "cpu")
torch.backends.cudnn.benchmark = True

Random Seed:  42


Cant get this fucking thing to work

In [13]:
# If training from scratch, set to True
train = True

# Batch size during training
batch_size = 1

# Spatial size of training images. All images will be resized to this size using a transformer.
img_size = 256

# Number of channels in the training images. For color images this is 3
n_channels = 3

# Size of z latent vector (i.e. size of generator input)
z_vector = 100

# Size of feature maps in generator
f_map_g = 64

# Size of feature maps in discriminator
f_map_d = 64

# Learning rate for optimizers
lrG , lrD = 0.0002 , 0.0002

# Beta1 hyperparam for Adam optimizers
beta1, beta2 = 0.5, 0.999

# Number of classes.
n_class = 51

# Root directory of dataset location
root = 'C:/Users/tfiel/Documents/Python Scripts/CS595A/datasets'

Lambda = 100 #lambda for L1 loss
epochs = 100


In [14]:
data_transforms = transforms.Compose([
        transforms.Resize((img_size, img_size)),
        transforms.ToTensor()
        ])

if train == True:
    # Define the data directory and the data transforms
    data_dir = root + '/top_artists/images/'

    # Load the dataset using ImageFolder
    top_artists = datasets.ImageFolder(data_dir, transform=data_transforms)
    n_class = len(top_artists.class_to_idx)

In [15]:
if train == True:
    train_set, test_set = train_test_split(top_artists, random_state=0, test_size=0.2)

    # Create a data loader for the dataset
    #Dtr = DataLoader(top_artists, batch_size=batch_size, shuffle=True)
    Dtr = DataLoader(train_set, batch_size=batch_size, shuffle=True)
    Dte = DataLoader(test_set, batch_size=batch_size, shuffle=True)

In [16]:
# Makes sure that we have a directory of clean and adversarial images
dir = root + '/test_data/images/adversarial_images'

img_name = []
for path, subdirs, files in os.walk(dir):
    for name in files:
        img_name.append(name)
dir = root + '/test_data/images/clean_images'
for path, subdirs, files in os.walk(dir):
    for name in files:
        if name not in img_name:
            img_path = (path+'/'+name).replace("\\", "/")
            os.remove(img_path)

In [17]:
class BuildConv(nn.Module):
    def __init__(self , input_size , output_size , kernel_size = 4 , stride = 2 , padding = 1 , batch_norm = True, activation = True):
        super(BuildConv , self).__init__()
        self.Conv = nn.Conv2d(input_size,  output_size , kernel_size , stride , padding)
        self.activation = activation
        self.leaky_relu = nn.LeakyReLU(0.2 , True)
        self.batch_norm = batch_norm
        self.BatchNorm = nn.BatchNorm2d(output_size)
    def forward(self , x):
        if self.activation:
            op = self.Conv(self.leaky_relu(x))
        else:
            op = self.Conv(x)
        if self.batch_norm:
            return self.BatchNorm(op)
        else:
            return op

#BuildDeconv is a utility class to help build deconv layers
class BuildDeconv(nn.Module):
    def __init__(self , input_size , output_size , kernel_size = 4 , stride = 2 , padding = 1 , batch_norm = True , dropout = False):
        super(BuildDeconv ,self).__init__()
        self.DeConv = nn.ConvTranspose2d(input_size , output_size , kernel_size , stride , padding)
        self.BatchNorm = nn.BatchNorm2d(output_size)
        self.Dropout = nn.Dropout2d(0.5)
        self.relu = nn.ReLU(True)
        self.dropout = dropout
        self.batch_norm = batch_norm
    def forward(self , x):
        if self.batch_norm:
            op = self.BatchNorm(self.DeConv(self.relu(x)))
        else:
            op = self.DeConv(self.relu(x))
        if self.dropout:
            return self.Dropout(op)
        else:
            return op
    
#Let's define our Generator class (U-net architecture)
class Generator(nn.Module):
    def __init__(self ,input_dim , num_filters , output_dim):
        super(Generator , self).__init__()
        #Encoder
        #self.conv1 
        self.conv1 = BuildConv(input_dim , num_filters , batch_norm = True  , activation = False)
        self.conv2 = BuildConv(num_filters , num_filters*2)
        self.conv3 = BuildConv(num_filters*2 , num_filters*4)
        self.conv4 = BuildConv(num_filters*4 , num_filters*8)
        self.conv5 = BuildConv(num_filters*8 , num_filters*8)
        self.conv6 = BuildConv(num_filters*8 , num_filters*8)
        self.conv7 = BuildConv(num_filters*8 , num_filters*8)
        self.conv8 = BuildConv(num_filters*8 , num_filters*8 , batch_norm= False)
        #Decoder
        self.deconv1 = BuildDeconv(num_filters*8 , num_filters*8 , dropout= True)
        self.deconv2 = BuildDeconv(num_filters*8*2 , num_filters*8 , dropout= True)
        self.deconv3 = BuildDeconv(num_filters*8*2 , num_filters*8 , dropout= True)
        self.deconv4 = BuildDeconv(num_filters*8*2 , num_filters*8)
        self.deconv5 = BuildDeconv(num_filters*8*2 , num_filters*4)
        self.deconv6 = BuildDeconv(num_filters*4*2 , num_filters*2)
        self.deconv7 = BuildDeconv(num_filters*2*2 , num_filters)
        self.deconv8 = BuildDeconv(num_filters*2 , output_dim , batch_norm= False)
    def forward(self , x):
        #Encoder
        enc1 = self.conv1(x)
        enc2 = self.conv2(enc1)
        enc3 = self.conv3(enc2)
        enc4 = self.conv4(enc3)
        enc5 = self.conv5(enc4)
        enc6 = self.conv6(enc5)
        enc7 = self.conv7(enc6)
        enc8 = self.conv8(enc7)
        #Decoder with skip connections
        dec1 = self.deconv1(enc8)
        dec1 = torch.cat([dec1, enc7], 1)
        dec2 = self.deconv2(dec1)
        dec2 = torch.cat([dec2, enc6], 1)
        dec3 = self.deconv3(dec2)
        dec3 = torch.cat([dec3, enc5], 1)
        dec4 = self.deconv4(dec3)
        dec4 = torch.cat([dec4, enc4], 1)
        dec5 = self.deconv5(dec4)
        dec5 = torch.cat([dec5, enc3], 1)
        dec6 = self.deconv6(dec5)
        dec6 = torch.cat([dec6, enc2], 1)
        dec7 = self.deconv7(dec6)
        dec7 = torch.cat([dec7, enc1], 1)
        dec8 = self.deconv8(dec7)
        out = torch.nn.Tanh()(dec8)
        return out

    def normal_weight_init(self , mean = 0.0 ,std = 0.02):
        ''' This function initializes weights of layers'''
        for m in self.children():
            if isinstance(m , BuildConv):
                nn.init.normal_(m.Conv.weight , mean , std)
            if isinstance(m , BuildDeconv):
                nn.init.normal_(m.DeConv.weight , mean , std)

#Lets define class Discriminator
class Discriminator(nn.Module):
    def __init__(self , input_dim , num_filters , output_dim):
        super(Discriminator , self).__init__()
        self.conv1 = BuildConv(input_dim , num_filters , batch_norm= False , activation= False)
        self.conv2 = BuildConv(num_filters , num_filters*2)
        self.conv3 = BuildConv(num_filters*2 , num_filters*4)
        self.conv4 = BuildConv(num_filters*4 , num_filters*8 , stride = 1)
        self.conv5 = BuildConv(num_filters*8 , output_dim , stride = 1 , batch_norm= False)
    def forward(self , x , label):
        print('0: {}'.format(x.shape))
        x = torch.cat([x , label] , 1)
        print('1: {}'.format(x.shape))
        x = self.conv1(x)
        print('2: {}'.format(x.shape))
        x = self.conv2(x)
        print('3: {}'.format(x.shape))
        x = self.conv3(x)
        x = self.conv4(x)
        x = self.conv5(x)
        op = torch.nn.Sigmoid()(x)
        return op

    def normal_weight_init(self , mean = 0.0 , std = 0.02):
        ''' This function initializes the weights for layers'''
        for m in self.children():
            if isinstance(m , BuildConv):
                nn.init.normal_(m.Conv.weight , mean , std)

In [18]:
data = iter(Dtr)
img , p = data.next()
print(img.shape , p.shape)

torch.Size([1, 3, 256, 256]) torch.Size([1])


In [19]:
#Plot Losses
def plot_loss(g_loss , d_loss ,num_epochs, save = False , save_dir = 'results/' , show = False):

    fig , ax = plt.subplots()
    ax.set_xlim(0 , num_epochs)
    ax.set_ylim(0 , max(np.max(g_loss) , np.max(d_loss))*1.1)
    plt.xlabel("Epochs")
    plt.ylabel("Loss")
    plt.plot(d_loss , label = 'Discriminator')
    plt.plot(g_loss , label = 'Generator')
    plt.legend()

    if save:

        if not os.path.exists(save_dir):
            os.mkdir(save_dir)
        save_fn  = save_dir + 'loss_values_epoch_{:d}'.format(num_epochs) + '.png'
        plt.savefig(save_fn)

    if save:
        plt.show()
    else:
        plt.close()
        

#Function to plot test results

def plot_test_result(input, target, gen_image, epoch, training=True, save=False, save_dir='results/', show=False, fig_size=(5, 5)):
    if not training:
        fig_size = (input.size(2) * 3 / 100, input.size(3)/100)

    fig, axes = plt.subplots(1, 3, figsize=fig_size)
    imgs = [input, gen_image, target]
    for ax, img in zip(axes.flatten(), imgs):
        ax.axis('off')
        ax.set_adjustable('box-forced')
        # Scale to 0-255
        img = (((img[0] - img[0].min()) * 255) / (img[0].max() - img[0].min())).numpy().transpose(1, 2, 0).astype(np.uint8)
        ax.imshow(img, cmap=None, aspect='equal')
    plt.subplots_adjust(wspace=0, hspace=0)

    if training:
        title = 'Epoch {0}'.format(epoch + 1)
        fig.text(0.5, 0.04, title, ha='center')

    # save figure
    if save:
        if not os.path.exists(save_dir):
            os.mkdir(save_dir)
        if training:
            save_fn = save_dir + 'Result_epoch_{:d}'.format(epoch+1) + '.png'
        else:
            save_fn = save_dir + 'Test_result_{:d}'.format(epoch+1) + '.png'
            fig.subplots_adjust(bottom=0)
            fig.subplots_adjust(top=1)
            fig.subplots_adjust(right=1)
            fig.subplots_adjust(left=0)
        plt.savefig(save_fn)

    if show:
        plt.show()
    else:
        plt.close()

In [20]:
netG = Generator(3 , f_map_g , 3)
netD = Discriminator(4 , f_map_d , 1)
netG.normal_weight_init(mean = 0.0 , std = 0.02)
netD.normal_weight_init(mean = 0.0 , std  = 0.02)

#Loss functions
bce_loss = torch.nn.BCELoss()
l1_loss = torch.nn.L1Loss()

#Defining the optimizers
G_optim = torch.optim.Adam(netG.parameters() , lr = lrG , betas = (beta1 , beta2))
D_optim = torch.optim.Adam(netD.parameters() , lr = lrD , betas = (beta1 , beta2))


test_input, test_target = Dte.__iter__().__next__()

In [21]:
########Training############
# Training Loop
def batch_gd(netG, netD, Dtr, epochs):
    # Lists to keep track of progress
    print_every = 50
    start = time.time()
    G_avg_loss , D_avg_loss = [] , []

    netD.to(device)
    netG.to(device)

    for epoch in range(epochs):
        
        g_loss , d_loss = [] , []
        for i , (inp , target) in enumerate(Dtr):
            x , y = Variable(inp.to(device)) , Variable(target.to(device))
            # Expand the label tensor to match the spatial dimensions of the input
            y = y.view(y.size(0), 1, 1, 1)
            y = y.repeat(1, 1, x.size(2), x.size(3))
            
            #Training the discriminator with real_data
            D_real = netD(x , y).squeeze()
            real_target = Variable(torch.ones(D_real.size()).to(device))
            D_real_loss = bce_loss(D_real , real_target)
            
            #Training the discriminator with fake images
            gen_img = netG(x)
            D_fake = netD(x , gen_img).squeeze()
            fake_target = Variable(torch.ones(D_fake.size()).to(device))
            
            D_fake_loss = bce_loss(D_fake , fake_target)
            
            #Back Propogation
            D_loss = (D_real_loss + D_fake_loss) * 0.5
            D_optim.zero_grad()
            D_loss.backward()
            D_optim.step()
            
            #Training the Generator
            gen_img = netG(x)
            D_fake_decision = netD(x , gen_img).squeeze()
            G_fake_loss = bce_loss(D_fake_decision , real_target)
            
            #L1 loss
            
            G_l1 = Lambda*l1_loss(gen_img, y)
            
            #total generator loss
            
            G_loss = G_l1 + G_fake_loss
            G_optim.zero_grad()
            G_loss.backward()
            G_optim.step()
            
            
            g_loss.append(G_loss.item())
            d_loss.append(D_loss.item())
            if(i % print_every) == 0 :
                print("Epoch[{}/{}]|| D_loss : {:.4f} || G_loss : {:.4f}".format(epoch + 1 , epochs
                                                                    , D_loss.item() , G_loss.item()))
            
        #avg_loss values
        G_avg_loss.append(torch.mean(torch.FloatTensor(g_loss)))
        D_avg_loss.append(torch.mean(torch.FloatTensor(d_loss)))
        
        #Showing result for test image
        gen_image = netG(Variable(test_input.cuda()))
        gen_image = gen_image.cpu().data
        plot_test_result(test_input , test_target , gen_image , epoch ,save = True ,save_dir = './generated_samples/cgan/')
    print(time.time() - start)
    
#Plot average losses
#Utility.plot_loss(G_avg_loss , D_avg_loss , num_epochs , save = True , save_dir = save_dir)

#make gif
#Utility.generate_gif('maps' , num_epochs , save_dir = save_dir)

In [22]:
if train == True:
    batch_gd(netG, netD, Dtr, epochs)
    # Save the trained models
    model_list = [netD,netG]
    pickle.dump(model_list, open('trained_gan_{}.pkl'.format(img_size), 'wb'))

else:
    # Load already trained models
    model_list = pickle.load(open('trained_gan_{}.pkl'.format(img_size), 'rb'))
    netD = model_list[0]
    netG = model_list[1]

0: torch.Size([1, 3, 256, 256])
1: torch.Size([1, 4, 256, 256])
2: torch.Size([1, 64, 128, 128])
3: torch.Size([1, 128, 64, 64])
0: torch.Size([1, 3, 256, 256])
1: torch.Size([1, 6, 256, 256])


RuntimeError: Given groups=1, weight of size [64, 4, 4, 4], expected input[1, 6, 256, 256] to have 4 channels, but got 6 channels instead

In [None]:
plt.plot(range(1 , epochs + 1),G_avg_loss , color = 'blue')

plt.xlabel('Epochs')
plt.ylabel("Generator Loss")
plt.plot()

NameError: name 'epochs' is not defined

In [None]:
plt.plot(range(1 , num_epochs + 1),D_avg_loss , color = 'blue')

plt.xlabel('Epochs')
plt.ylabel("Discriminator Loss")
plt.plot()

In [None]:
#Testing
for i ,(inp , target) in enumerate(Dte):
    
    x , y = Variable(inp.cuda()) , Variable(target.cuda())
    gen_image = Gen(x)
    gen_image = gen_image.cpu()
    
    Utility.plot_test_result(inp , target, gen_image , i ,training = False , save = True , save_dir = save_test_dir)
    print('%d images are generated.' % (i + 1))