In [None]:
import torch
import numpy as np
from torchvision import datasets
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
from torch.autograd import Variable
import torchvision.utils as vutils
import torchvision as torchvision
!pip install barbar
from barbar import Bar
# !pip install cifar

from google.colab import drive

print(torch.__version__)

# drive.mount('/content/drive')

# Default root directory handed to the torchvision dataset constructors by the get_* loaders below
# (presumably on the mounted Google Drive -- the drive.mount call above is commented out).
DATASET_DIR = '/content/drive/My Drive/data/cifar/'
# DATASET_DIR = '/DATASET/SVHN/'
# DATASET_DIR = '/DATASET/CELEB_A/'
# Image directory on Drive. NOTE(review): not referenced by any of the visible cells.
IMAGE_DIR = '/content/drive/My Drive/data/images/'

# Name of the dataset in use ("NULL" = none selected); the 1-NN evaluation cell reads it
# to decide which label attribute of the underlying dataset to use.
DATASET = "NULL"

def get_cifar10(args, data_dir=DATASET_DIR):
    """Build shuffled train/test DataLoaders for CIFAR-10 and record the dataset name.

    Args:
        args: object exposing ``batch_size``.
        data_dir: root directory given to torchvision (called with ``download=True``).

    Returns:
        Tuple ``(train_dataloader, test_dataloader)``.
    """
    # Without this declaration the assignment below would only create a local
    # variable and the module-level DATASET would silently stay "NULL".
    global DATASET

    transform = transforms.Compose([transforms.Resize(32),  # 3x32x32 images.
                                    transforms.ToTensor()])
    train = datasets.CIFAR10(root=data_dir, train=True, download=True, transform=transform)
    test = datasets.CIFAR10(root=data_dir, train=False, download=True, transform=transform)
    train_dataloader = DataLoader(train, batch_size=args.batch_size, shuffle=True)
    test_dataloader = DataLoader(test, batch_size=args.batch_size, shuffle=True)
    DATASET = "cifar10"
    return train_dataloader, test_dataloader

def get_SVHN(args, data_dir=DATASET_DIR):
    """Build shuffled train/test DataLoaders for SVHN and record the dataset name.

    Args:
        args: object exposing ``batch_size``.
        data_dir: root directory given to torchvision (called with ``download=True``).

    Returns:
        Tuple ``(train_dataloader, test_dataloader)``.
    """
    # Without this declaration the assignment below would only create a local
    # variable and the module-level DATASET would silently stay "NULL".
    global DATASET

    transform = transforms.Compose([transforms.Resize(32),  # 3x32x32 images.
                                    transforms.ToTensor()])
    train = datasets.SVHN(root=data_dir, split='train', download=True, transform=transform)
    test = datasets.SVHN(root=data_dir, split='test', download=True, transform=transform)
    train_dataloader = DataLoader(train, batch_size=args.batch_size, shuffle=True)
    test_dataloader = DataLoader(test, batch_size=args.batch_size, shuffle=True)
    DATASET = "SVHN"
    return train_dataloader, test_dataloader

def get_CELEB_A(args, data_dir=DATASET_DIR):
    """Build shuffled train/test DataLoaders for CelebA and record the dataset name.

    Args:
        args: object exposing ``batch_size``.
        data_dir: root directory given to torchvision (called with ``download=True``).

    Returns:
        Tuple ``(train_dataloader, test_dataloader)``.
    """
    # Keep the module-level dataset tag in sync, like the sibling loaders intend to.
    global DATASET

    # Resize(32) only scales the shorter edge to 32, which leaves non-square
    # CelebA images non-square; the center crop yields the 3x32x32 input the
    # networks in this notebook are built for.
    transform = transforms.Compose([transforms.Resize(32),
                                    transforms.CenterCrop(32),
                                    transforms.ToTensor()])
    train = datasets.CelebA(root=data_dir, split='train', download=True, transform=transform)
    test = datasets.CelebA(root=data_dir, split='test', download=True, transform=transform)
    train_dataloader = DataLoader(train, batch_size=args.batch_size, shuffle=True)
    test_dataloader = DataLoader(test, batch_size=args.batch_size, shuffle=True)
    DATASET = "CELEB_A"
    return train_dataloader, test_dataloader


In [2]:
def weights_init_normal(m):
    """Initialise one module in place; intended for use with ``Module.apply``.

    Dispatch is by class name:
      * names containing "Conv" (other than exactly "Conv") or "Linear":
        weights drawn from N(0, 1e-3);
      * names containing "BatchNorm": weights drawn from N(1, 0.01);
      * anything else is left untouched.
    Whenever a module is initialised and has a bias, the bias is zeroed.
    """
    layer_name = type(m).__name__

    is_conv = "Conv" in layer_name and layer_name != 'Conv'
    if is_conv or "Linear" in layer_name:
        torch.nn.init.normal_(m.weight.data, 0.0, 1e-3)
    elif 'BatchNorm' in layer_name:
        m.weight.data.normal_(1.0, 0.01)
    else:
        # Not a layer type this initialiser knows about.
        return

    if m.bias is not None:
        m.bias.data.fill_(0)

In [3]:
class Discriminator(nn.Module):
    """Joint BiGAN discriminator scoring (image, latent) pairs.

    The image branch reduces a 3x32x32 input to 512x1x1, the latent branch maps
    a z_dim x1x1 code to 512x1x1, and the two are concatenated on the channel
    axis and passed through 1x1 convolutions down to a single score.

    Args:
        z_dim: number of channels of the latent code.
        wasserstein: if True, ``forward`` returns the raw score; otherwise the
            score is squashed with a sigmoid.
    """

    def __init__(self, z_dim=32, wasserstein=False):
        super(Discriminator, self).__init__()
        self.wass = wasserstein

        # Inference over x
        self.conv1x = nn.Conv2d(3, 32, 5, stride=1, bias=False)
        self.conv2x = nn.Conv2d(32, 64, 4, stride=2, bias=False)
        self.bn2x = nn.BatchNorm2d(64)
        self.conv3x = nn.Conv2d(64, 128, 4, stride=1, bias=False)
        self.bn3x = nn.BatchNorm2d(128)
        self.conv4x = nn.Conv2d(128, 256, 4, stride=2, bias=False)
        self.bn4x = nn.BatchNorm2d(256)
        self.conv5x = nn.Conv2d(256, 512, 4, stride=1, bias=False)
        self.bn5x = nn.BatchNorm2d(512)

        # Inference over z
        self.conv1z = nn.Conv2d(z_dim, 512, 1, stride=1, bias=False)
        self.conv2z = nn.Conv2d(512, 512, 1, stride=1, bias=False)

        # Joint inference
        self.conv1xz = nn.Conv2d(1024, 1024, 1, stride=1, bias=False)
        self.conv2xz = nn.Conv2d(1024, 1024, 1, stride=1, bias=False)
        self.conv3xz = nn.Conv2d(1024, 1, 1, stride=1, bias=False)

    # The functional dropout calls default to training=True, so the module's
    # mode must be forwarded explicitly; otherwise dropout stays active (and the
    # output stays random) even after .eval().

    def inf_x(self, x):
        """Image branch: five conv stages, each followed by leaky-ReLU and channel dropout."""
        x = F.dropout2d(F.leaky_relu(self.conv1x(x), negative_slope=0.1), 0.2,
                        training=self.training)
        x = F.dropout2d(F.leaky_relu(self.bn2x(self.conv2x(x)), negative_slope=0.1), 0.2,
                        training=self.training)
        x = F.dropout2d(F.leaky_relu(self.bn3x(self.conv3x(x)), negative_slope=0.1), 0.2,
                        training=self.training)
        x = F.dropout2d(F.leaky_relu(self.bn4x(self.conv4x(x)), negative_slope=0.1), 0.2,
                        training=self.training)
        x = F.dropout2d(F.leaky_relu(self.bn5x(self.conv5x(x)), negative_slope=0.1), 0.2,
                        training=self.training)
        return x

    def inf_z(self, z):
        """Latent branch: two 1x1 conv stages with leaky-ReLU and channel dropout."""
        z = F.dropout2d(F.leaky_relu(self.conv1z(z), negative_slope=0.1), 0.2,
                        training=self.training)
        z = F.dropout2d(F.leaky_relu(self.conv2z(z), negative_slope=0.1), 0.2,
                        training=self.training)
        return z

    def inf_xz(self, xz):
        """Joint branch: two 1x1 conv stages with element dropout, then the 1-channel score."""
        xz = F.dropout(F.leaky_relu(self.conv1xz(xz), negative_slope=0.1), 0.2,
                       training=self.training)
        xz = F.dropout(F.leaky_relu(self.conv2xz(xz), negative_slope=0.1), 0.2,
                       training=self.training)
        return self.conv3xz(xz)

    def forward(self, x, z):
        """Score the pair (x, z); returns raw scores in Wasserstein mode, probabilities otherwise."""
        x = self.inf_x(x)
        z = self.inf_z(z)
        xz = torch.cat((x, z), dim=1)
        out = self.inf_xz(xz)
        if self.wass:
            return out
        else:
            return torch.sigmoid(out)


class Generator(nn.Module):
    """Decoder network mapping a latent code of shape (B, z_dim, 1, 1) to a 3x32x32 image.

    Five transposed-convolution stages (each with batch norm and leaky-ReLU)
    upsample 1x1 -> 4 -> 10 -> 13 -> 28 -> 32; a final 1x1 convolution plus a
    learned per-pixel bias is squashed with a sigmoid.
    """

    def __init__(self, z_dim=32):
        super().__init__()
        self.z_dim = z_dim

        # Learned additive bias, one value per output pixel and channel.
        self.output_bias = nn.Parameter(torch.zeros(3, 32, 32))

        self.deconv1 = nn.ConvTranspose2d(z_dim, 256, 4, stride=1, bias=False)
        self.bn1 = nn.BatchNorm2d(256)

        self.deconv2 = nn.ConvTranspose2d(256, 128, 4, stride=2, bias=False)
        self.bn2 = nn.BatchNorm2d(128)

        self.deconv3 = nn.ConvTranspose2d(128, 64, 4, stride=1, bias=False)
        self.bn3 = nn.BatchNorm2d(64)

        self.deconv4 = nn.ConvTranspose2d(64, 32, 4, stride=2, bias=False)
        self.bn4 = nn.BatchNorm2d(32)

        self.deconv5 = nn.ConvTranspose2d(32, 32, 5, stride=1, bias=False)
        self.bn5 = nn.BatchNorm2d(32)

        # Despite the name this is a plain 1x1 convolution producing the RGB channels.
        self.deconv6 = nn.Conv2d(32, 3, 1, stride=1, bias=True)

    def forward(self, z):
        """Generate images in [0, 1] from the latent batch ``z``."""
        stages = (
            (self.deconv1, self.bn1),
            (self.deconv2, self.bn2),
            (self.deconv3, self.bn3),
            (self.deconv4, self.bn4),
            (self.deconv5, self.bn5),
        )
        hidden = z
        for upsample, norm in stages:
            hidden = F.leaky_relu(norm(upsample(hidden)), negative_slope=0.1)

        logits = self.deconv6(hidden) + self.output_bias
        return torch.sigmoid(logits)


class Encoder(nn.Module):
    """Stochastic encoder mapping a 3x32x32 image to a latent code of shape (B, z_dim, 1, 1).

    Six conv/batch-norm/leaky-ReLU stages reduce the image to 512x1x1; a final
    1x1 convolution predicts 2*z_dim values that are split into a mean and a
    log standard deviation, from which the code is sampled.
    """

    def __init__(self, z_dim=32):
        super(Encoder, self).__init__()
        self.z_dim = z_dim
        self.conv1 = nn.Conv2d(3, 32, 5, stride=1, bias=False)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, 4, stride=2, bias=False)
        self.bn2 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 128, 4, stride=1, bias=False)
        self.bn3 = nn.BatchNorm2d(128)
        self.conv4 = nn.Conv2d(128, 256, 4, stride=2, bias=False)
        self.bn4 = nn.BatchNorm2d(256)
        self.conv5 = nn.Conv2d(256, 512, 4, stride=1, bias=False)
        self.bn5 = nn.BatchNorm2d(512)
        self.conv6 = nn.Conv2d(512, 512, 1, stride=1, bias=False)
        self.bn6 = nn.BatchNorm2d(512)
        # Final projection to (mu, log_sigma). It is a convolution despite the
        # "bn" name; the attribute name is kept so state_dict keys do not change.
        self.bn7 = nn.Conv2d(512, z_dim*2, 1, stride=1, bias=True)

    def reparameterize(self, z):
        """Split ``z`` into (mu, log_sigma) halves and return ``mu + eps * exp(log_sigma)``."""
        z = z.view(z.size(0), -1)
        mu, log_sigma = z[:, :self.z_dim], z[:, self.z_dim:]
        std = torch.exp(log_sigma)
        eps = torch.randn_like(std)
        return mu + eps * std

    def forward(self, x):
        """Encode the image batch ``x`` into a sampled latent of shape (B, z_dim, 1, 1)."""
        x = F.leaky_relu(self.bn1(self.conv1(x)), negative_slope=0.1)
        x = F.leaky_relu(self.bn2(self.conv2(x)), negative_slope=0.1)
        x = F.leaky_relu(self.bn3(self.conv3(x)), negative_slope=0.1)
        x = F.leaky_relu(self.bn4(self.conv4(x)), negative_slope=0.1)
        x = F.leaky_relu(self.bn5(self.conv5(x)), negative_slope=0.1)
        x = F.leaky_relu(self.bn6(self.conv6(x)), negative_slope=0.1)
        # Use the dedicated 2*z_dim output head. Re-applying conv6 here left the
        # head unused and yields 512 channels, which only splits into equal
        # mu/log_sigma halves when z_dim == 256.
        z = self.reparameterize(self.bn7(x))
        return z.view(x.size(0), self.z_dim, 1, 1)


In [None]:
!pip install tqdm
from tqdm import tqdm
import matplotlib.image as mpimg
import os

def D_loss(DG, DE, eps=1e-6):
    """Discriminator loss: negative mean of ``log(DE) + log(1 - DG)``.

    Args:
        DG: discriminator outputs for generator pairs (G(z), z).
        DE: discriminator outputs for encoder pairs (x, E(x)).
        eps: small constant added inside both logarithms.
    """
    encoder_term = torch.log(DE + eps)
    generator_term = torch.log(1 - DG + eps)
    return -(encoder_term + generator_term).mean()

def EG_loss(DG, DE, eps=1e-6):
    """Generator/encoder loss: negative mean of ``log(DG) + log(1 - DE)``.

    This is the discriminator objective with the roles of the two pair types swapped.

    Args:
        DG: discriminator outputs for generator pairs (G(z), z).
        DE: discriminator outputs for encoder pairs (x, E(x)).
        eps: small constant added inside both logarithms.
    """
    generator_term = torch.log(DG + eps)
    encoder_term = torch.log(1 - DE + eps)
    return -(generator_term + encoder_term).mean()

# Directory that receives the per-epoch preview images written during training.
# exist_ok avoids the separate check-then-create step.
os.makedirs("/prj_gen_images", exist_ok=True)

class TrainerBiGAN:
    """Trains a BiGAN: generator G, encoder E and joint discriminator D.

    Args:
        args: namespace providing ``latent_dim``, ``wasserstein``, ``lr_adam``,
            ``lr_rmsprop`` and ``num_epochs``.
        data: training loader; iterated for ``(images, labels)`` batches and
            must support ``len()``.
        device: torch device on which the networks and batches are placed.
    """

    def __init__(self, args, data, device):
        self.args = args
        self.train_loader = data
        self.device = device
        # The networks are built by train(); they are declared here so that
        # getModel() does not raise AttributeError when called beforehand.
        self.G = None
        self.D = None
        self.E = None

    def getModel(self):
        """Return the tuple ``(G, D, E)``; every entry is None until train() has run."""
        return self.G, self.D, self.E

    def _sample_z(self, batch_size):
        """Draw a standard-normal latent batch of shape (batch_size, latent_dim, 1, 1)."""
        return torch.randn((batch_size, self.args.latent_dim, 1, 1)).to(self.device)

    def _show_progress(self, epoch, x, stored_ge_loss, stored_d_loss):
        """Save and display sample grids for this epoch together with the loss curves.

        Writes three PNG files under /prj_gen_images (generated samples, the
        real batch ``x`` and its reconstruction G(E(x))), then shows them in a
        figure whose last row plots the per-epoch mean losses.
        """
        # Imported here so the method does not depend on a later notebook cell
        # having imported pyplot first.
        import matplotlib.pyplot as plt

        with torch.no_grad():
            z_fake = self._sample_z(x.size(0))
            g_result = self.G(z_fake)
            ge_result = self.G(self.E(x.float().to(self.device)))

            vutils.save_image(g_result[:16], '/prj_gen_images/{}_G_fake.png'.format(epoch))
            vutils.save_image(x[:16].cpu(), '/prj_gen_images/{}_X_fake.png'.format(epoch))
            vutils.save_image(ge_result[:16], '/prj_gen_images/{}_G(E(x))_fake.png'.format(epoch))

        g_img = mpimg.imread('/prj_gen_images/{}_G_fake.png'.format(epoch))
        ge_img = mpimg.imread('/prj_gen_images/{}_G(E(x))_fake.png'.format(epoch))
        x_img = mpimg.imread('/prj_gen_images/{}_X_fake.png'.format(epoch))

        fig, ax = plt.subplots(4, 1, figsize=(15, 7))
        fig.subplots_adjust(wspace=0.05, hspace=0.4)
        plt.rcParams.update({'font.size': 20})
        fig.suptitle('Epoch {}'.format(epoch))
        fig.text(0.04, 0.80, 'G(z)', ha='left')
        fig.text(0.04, 0.60, 'x', ha='left')
        fig.text(0.04, 0.40, 'G(E(x))', ha='left')

        ax[0].imshow(g_img, cmap='gray')
        ax[0].axis('off')
        ax[1].imshow(x_img, cmap='gray')
        ax[1].axis('off')
        ax[2].imshow(ge_img, cmap='gray')
        ax[2].axis('off')
        ax[3].plot(stored_ge_loss, label='GE loss')
        ax[3].plot(stored_d_loss, label='D loss')
        ax[3].legend()
        plt.show()
        # One figure is created per epoch; release it so they do not pile up.
        plt.close(fig)

    def train(self):
        """Build G, E and D, then train them by alternating D and G/E updates.

        After every epoch the mean losses are recorded, sample images are
        written and displayed, and a summary line is printed.
        """
        self.G = Generator(self.args.latent_dim).to(self.device)
        self.E = Encoder(self.args.latent_dim).to(self.device)
        self.D = Discriminator(self.args.latent_dim, self.args.wasserstein).to(self.device)

        self.G.apply(weights_init_normal)
        self.E.apply(weights_init_normal)
        self.D.apply(weights_init_normal)

        ge_params = list(self.G.parameters()) + list(self.E.parameters())
        if self.args.wasserstein:
            # NOTE(review): in this mode the discriminator returns unsquashed
            # scores, yet the log-based losses below are still used and
            # args.clamp is never applied -- confirm this path is intended.
            optimizer_ge = optim.RMSprop(ge_params, lr=self.args.lr_rmsprop)
            optimizer_d = optim.RMSprop(self.D.parameters(), lr=self.args.lr_rmsprop)
        else:
            optimizer_ge = optim.Adam(ge_params, lr=self.args.lr_adam)
            optimizer_d = optim.Adam(self.D.parameters(), lr=self.args.lr_adam)

        stored_d_loss = []
        stored_ge_loss = []
        num_batches = len(self.train_loader)

        # NOTE(review): this runs num_epochs + 1 epochs (0..num_epochs inclusive); kept as is.
        for epoch in range(self.args.num_epochs + 1):
            ge_loss_total = 0.0
            d_loss_total = 0.0

            for x, _ in tqdm(self.train_loader):
                batch_size = x.size(0)
                x_true = x.float().to(self.device)

                # ---------------- Discriminator step ----------------
                optimizer_d.zero_grad()

                z_fake = self._sample_z(batch_size)
                # G and E are not updated in this step, so cut the graph here
                # instead of back-propagating through them for nothing.
                x_fake = self.G(z_fake).detach()   # G(z)
                z_true = self.E(x_true).detach()   # E(x)

                out_true = self.D(x_true, z_true)  # D(x, E(x))
                out_fake = self.D(x_fake, z_fake)  # D(G(z), z)

                loss_d = D_loss(out_fake, out_true)
                d_loss_total += loss_d.item()

                loss_d.backward()
                optimizer_d.step()

                # ---------------- Generator & encoder step ----------------
                optimizer_ge.zero_grad()

                z_fake = self._sample_z(batch_size)
                x_fake = self.G(z_fake)            # G(z)
                z_true = self.E(x_true)            # E(x)

                out_true = self.D(x_true, z_true)  # D(x, E(x))
                out_fake = self.D(x_fake, z_fake)  # D(G(z), z)

                loss_ge = EG_loss(out_fake, out_true)
                ge_loss_total += loss_ge.item()

                loss_ge.backward()
                optimizer_ge.step()

            # Average over the number of batches. Dividing by the last batch
            # index under-counts by one and fails when there is a single batch.
            mean_d_loss = d_loss_total / num_batches
            mean_ge_loss = ge_loss_total / num_batches
            stored_d_loss.append(mean_d_loss)
            stored_ge_loss.append(mean_ge_loss)

            # `x` is the last batch of the epoch; it is used for the preview grids.
            self._show_progress(epoch, x, stored_ge_loss, stored_d_loss)

            print("\nTraining... Epoch: {}, Discrimiantor Loss: {:.3f}, Generator Loss: {:.3f}".format(
                epoch, mean_d_loss, mean_ge_loss)
            )
             

        



In [None]:
import argparse 
import matplotlib.pyplot as plt

def str2bool(value):
    """Convert a command-line token such as "true"/"False"/"1"/"no" to a bool.

    argparse's ``type=bool`` calls ``bool()`` on the raw string, so every
    non-empty value -- including "False" -- becomes True. This parser accepts
    the usual spellings and rejects anything else.

    Raises:
        argparse.ArgumentTypeError: if the token is not a recognised boolean.
    """
    if isinstance(value, bool):
        return value
    token = str(value).strip().lower()
    if token in ('1', 'true', 't', 'yes', 'y'):
        return True
    if token in ('0', 'false', 'f', 'no', 'n'):
        return False
    raise argparse.ArgumentTypeError('expected a boolean value, got {!r}'.format(value))


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument("--num_epochs", type=int, default=200,
                        help="number of epochs")
    parser.add_argument('--lr_adam', type=float, default=1e-4,
                        help='learning rate')
    parser.add_argument('--lr_rmsprop', type=float, default=1e-4,
                        help='learning rate RMSprop if WGAN is True.')
    parser.add_argument("--batch_size", type=int, default=128,
                        help="Batch size")
    parser.add_argument('--latent_dim', type=int, default=256,
                        help='Dimension of the latent variable z')
    parser.add_argument('--wasserstein', type=str2bool, default=False,
                        help='If WGAN.')
    parser.add_argument('--clamp', type=float, default=1e-2,
                        help='Clipping gradients for WGAN.')

    # Parse an empty argument list: inside a notebook sys.argv belongs to the
    # kernel, so only the defaults above are used.
    args = parser.parse_args(args=[])

    # Use the GPU when one is available.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    train, test = get_cifar10(args)
    # train,test = get_SVHN(args)
    # train,test = get_CELEB_A(args)

    # Only the training loader is handed to the trainer; `test` is used by later cells.
    bigan = TrainerBiGAN(args, train, device)
    bigan.train()



In [None]:
from google.colab import files
import shutil

# Pack the image directory into /prj_gen_images.zip and hand the archive to
# the Colab file-download helper.
shutil.make_archive('/prj_gen_images', 'zip', '/prj_gen_images')
files.download('/prj_gen_images.zip') 

In [None]:
# File names used for the three saved state dicts.
model_D = 'D.pt'
model_G = 'G.pt'
model_E = 'E.pt'

# Fetch the networks from the trainer.
G, D, E = bigan.getModel()

if not os.path.exists("/prj_models"):
    os.makedirs("/prj_models")

# Write each network's state_dict, in the order D, G, E.
for _network, _filename in ((D, model_D), (G, model_G), (E, model_E)):
    path = F"/prj_models/{_filename}"
    torch.save(_network.state_dict(), path)

# Zip the saved weights and download the archive through Colab.
shutil.make_archive('/prj_models', 'zip', '/prj_models')
files.download('/prj_models.zip')


# 1-nearest-neighbour classification on the encoder features

In [None]:
from sklearn.neighbors import KNeighborsClassifier

# TODO: this cell was noted as not working; the rewrite below still needs to be
# validated on a trained model.

def encode_dataset(encoder, loader, device):
    """Encode every batch of ``loader`` and return ``(features, labels)`` as numpy arrays.

    Going through the DataLoader (instead of the dataset's raw ``.data`` array)
    applies the same transform used for training, keeps memory bounded by the
    batch size and yields the labels regardless of which attribute the
    underlying dataset stores them in.

    Args:
        encoder: module mapping an image batch to a latent batch.
        loader: iterable of ``(images, labels)`` batches.
        device: device the images are moved to before encoding.

    Returns:
        features: array of shape (N, D), latents flattened per sample.
        labels: array with the N labels in the same order.
    """
    features, labels = [], []
    with torch.no_grad():
        for images, targets in loader:
            latent = encoder(images.float().to(device))
            # sklearn expects 2-D numpy input on the host.
            features.append(latent.reshape(latent.size(0), -1).cpu().numpy())
            labels.append(torch.as_tensor(targets).cpu().numpy())
    return np.concatenate(features), np.concatenate(labels)


# Evaluation mode for the encoder's batch-norm layers. The encoder still samples
# its output, so the features are not fully deterministic.
bigan.E.eval()

EX_train, y_train = encode_dataset(bigan.E, train, bigan.device)
EX_test, y_test = encode_dataset(bigan.E, test, bigan.device)

KNN = KNeighborsClassifier(n_neighbors=1)
KNN.fit(EX_train, y_train)
predictions = KNN.predict(EX_test)
accuracy = np.mean(predictions == y_test)
print('Accuracy {:.2f}%'.format(accuracy*100))