<a href="https://colab.research.google.com/github/LironSimon/modular_dl_algos/blob/main/vae_and_svm_classifier.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Imports & Downloads**
*Imports all needed packages, and creates a function to load FashionMnist dataset from torchvision.*

***An action is needed in the 2nd and 3rd cells!***


In [None]:
import numpy as np
import pandas as pd
import tqdm.notebook as tq
import random
import math
import pickle
import time

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torch.autograd import Variable
from sklearn import svm, metrics

# check device
device = 'cuda' if torch.cuda.is_available() else 'cpu'

**2. Mount to Drive.** *Ensures we can save information to Drive*

In [None]:
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


**3. Change to a relevant path** *to be able to save or load data properly.*

In [None]:
##########################################
# ACTION NEEDED! Change to a relevant path
##########################################
path = '/content/gdrive/MyDrive/Deep Learning/EX3_302338470'

**4. Create a function to load FashionMnist data.** *Returns a labeled set, an unlabeled set or a testing set - as needed. The number of images in the labeled set is defined by 'Nlabels'. As in Kingsma et al., all classes have the same number of labelled points.*

In [None]:
def load_data(train_bool, Nlabels=None, batch_size=64):
  # transform PIL images to tensors with 0 or 1 values
  flatten_bernoulli = lambda x: transforms.ToTensor()(x).view(-1).bernoulli()
  # load trainig/ testing data from the internet to root path.
  FashionMNIST_set = datasets.FashionMNIST(root=f'{path}/Q3 - saved data', download=True, train=train_bool, transform=flatten_bernoulli)  
  
  # load data accourding to type and number of labeled observations:

  if not train_bool:  # FashionMNIST_set is a testing set
    dataset = torch.utils.data.DataLoader(FashionMNIST_set, batch_size=len(FashionMNIST_set))
  
  else: # FashionMNIST_set is a training set
    if Nlabels is None:  # VAE training
      dataset = torch.utils.data.DataLoader(FashionMNIST_set, batch_size=batch_size)
    else:  # SVM training
      # count classes and initialise a list to save Nlabels indices of labeled observations
      Nclasses = len(FashionMNIST_set.classes)
      sample_indices = []
      for Class in range(Nclasses):
        # find observation of this class and squeeze to a 1D aaray
        class_indices = np.argwhere(FashionMNIST_set.targets.numpy() == Class).squeeze()
        # generate (Nlabels // Nclasses) random samples for this class, and save thier indices
        labeled_data_indices = np.random.choice(class_indices, Nlabels // Nclasses)
        [sample_indices.append(x) for x in labeled_data_indices]
        # create a randm list of labeled data indices 
        sampler = torch.utils.data.SubsetRandomSampler(sample_indices)
        # load labeled data from the specific indices
        dataset = torch.utils.data.DataLoader(FashionMNIST_set, batch_size=batch_size, sampler=sampler)  

  return dataset



#**2. Setting the model** 
*As described in [Kingsma et al. [2014]](https://proceedings.neurips.cc/paper/2014/file/d523773c6b194f37b938d340d5d02232-Paper.pdf). Model is constructed using several classes defined seperatly such as the Encoder and Decoder.*


**1. Define hyperparameters.**

In [None]:
batch_size = 64
epochs = 20
learning_rate = 3e-4
Nlabels = [100, 600, 1000, 3000]

**2. Set classes.** *Creates a stochastic layer, an encoder, a decoder, and finally - the M1 class.*

In [None]:
class GaussianSample(nn.Module):
    """ A stochastic layer that represents a sample from a
        Gaussian distribution."""

    def __init__(self, in_features, out_features):
      super().__init__()
      self.in_features = in_features
      self.out_features = out_features
      self.mu = nn.Linear(in_features, out_features)
      self.log_var = nn.Linear(in_features, out_features)
    
    def reparametrize(self, mu, log_var):
      ''' Draws a sample from a distribution parametrised by mu and log_var.'''
      # std = exp(0.5 * log_var)
      std = log_var.mul(0.5).exp_().to(device)
      # z = std * epsilon + mu
      epsilon = Variable(torch.randn(mu.size()), requires_grad=False).to(device)
      mu = mu.to(device)
      z = mu.addcmul(std, epsilon)
      return z

    def forward(self, x):
      mu = self.mu(x)
      log_var = self.log_var(x)
      return self.reparametrize(mu, log_var), mu, log_var


In [None]:
class Encoder(nn.Module):
  """Network that attempts to infer p(z|x) from the data
     by fitting a variational distribution q_φ(z|x). 
     Returns the two parameters of the distribution (µ, log σ²)."""    
  def __init__(self, dims):
    super().__init__()
    [Xdim, Ydim, Zdim] = dims
    neurons = [Xdim, *Ydim]
    self.sample = GaussianSample(Ydim[-1], Zdim)
    linear_layers = [nn.Linear(neurons[i-1], neurons[i]) for i in range(1, len(neurons))]
    self.hidden = nn.ModuleList(linear_layers)

  def forward(self, x):
    for layer in self.hidden:
        x = F.softplus(layer(x))
    return self.sample(x)


In [None]:
class Decoder(nn.Module):
  '''Network that generates samples from p(x) by finding p_θ(x|z).'''
  def __init__(self, dims):
      super().__init__()
      [Zdim, Ydim, Xdim] = dims
      neurons = [Zdim, *Ydim]
      self.reconstruction = nn.Linear(Ydim[-1], Xdim)
      linear_layers = [nn.Linear(neurons[i-1], neurons[i]) for i in range(1, len(neurons))]
      self.hidden = nn.ModuleList(linear_layers)
      self.output_activation = nn.Sigmoid()

  def forward(self, x):
      for layer in self.hidden:
          x = F.softplus(layer(x))
      return self.output_activation(self.reconstruction(x))


**2. Create a calculative function** *needed for M1 model*

In [None]:
def logGaussian(x, mu, log_var):
    """Returns log N(x|µ,σ), the log pdf of a normal distribution parametrised
    by mu (mean of distribution) and log_var ( log variance of distribution) evaluated at point x.""" 
    log_pdf = - 0.5 * math.log(2 * math.pi) - log_var / 2 - (x - mu)**2 / (2 * torch.exp(log_var))
    return torch.sum(log_pdf, dim=-1)

**3. Set M1** *accourding to the cited paper*

In [None]:
class M1_model(nn.Module):
  """An encoder/decoder pair for which
     a variational distribution is fitted to the
     encoder."""
  def __init__(self, Xdim):
       super().__init__()
       Ydim = (600, 600)
       Zdim = 50
       self.z_dim = Zdim
       self.flow = None
       self.encoder = Encoder([Xdim, Ydim, Zdim]).to(device)
       self.decoder = Decoder([Zdim, Ydim, Xdim]).to(device)
       self.kl_divergence = 0
       for m in self.modules():
        if isinstance(m, nn.Linear):
          nn.init.normal_(m.weight.data, std=1e-3).to(device)
          if m.bias is not None:
             m.bias.data.zero_().to(device)


  def _kld(self, z, q_param, p_param=None):
        """Given:
           - z: sample from q-distribuion
           - q_param: (mu, log_var) of the q-distribution
           - p_param: (mu, log_var) of the p-distribution
        Returns KL(q||p), the KL-divergence of element z."""
        (mu, log_var) = q_param
        # define qz
        if self.flow is not None:
            f_z, log_det_z = self.flow(z)
            qz = logGaussian(z, mu, log_var) - sum(log_det_z)
            z = f_z
        else: qz = logGaussian(z, mu, log_var)
        # define pz
        if p_param is None:  pz = torch.sum(-0.5 * math.log(2 * math.pi) - z ** 2 / 2, dim=-1)
        else:
            (mu, log_var) = p_param
            pz = logGaussian(z, mu, log_var)
        # Return KL-divergence of elemnt z
        return qz - pz


  def forward(self, x):
        """Given a data point x,
           returns its reconstruction and q distribution parameters."""
        z, z_mu, z_log_var = self.encoder(x)
        self.kl_divergence = self._kld(z, (z_mu, z_log_var))
        x_mu = self.decoder(z)
        return x_mu


  def sample(self, z):
        """Given z ~ N(0, I) generates a sample from
        the learned distribution based on p_θ(x|z)."""
        return self.decoder(z)
  

  def add_flow(self, flow):
      self.flow = flow


# **3. Train VAE and prepare to test M1**

**1. Train VAE and save the weights.** *Uses Adam optimizer and BCE loss criterion.*

In [None]:
def trainVAE(epochs=epochs, batch_size=batch_size,lr=learning_rate):
    """Trains the implemented M1 VAE network using Adam optimizer and BCE Loss.
       Returs the trained weights"""
    # load the VAE training dataset in batches of 64 images
    dataset = load_data(train_bool=True, Nlabels=None, batch_size=batch_size)
    # initialize model
    VAE = M1_model(784)   # Xdim = size of an image
    VAE.to(device)
    # Set Adam optimizer, learning rate and  Binary Cross Entropy criterion
    optimizer = torch.optim.Adam(VAE.parameters(), lr=learning_rate)
    criterion = torch.nn.BCELoss(reduction='none')

    # train...
    loss_lst = []
    for epoch in tq.tqdm(range(epochs), desc="VAE Training Progress"):
        VAE.train()
        totLoss = 0
        for samples, _ in dataset:
            samples = samples.to(device)
            x = VAE(samples)
            likelihood = -criterion(x, samples)
            J = likelihood.sum(axis=1) - VAE.kl_divergence
            loss = -torch.mean(J)
            
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()
            totLoss += loss.item()
        loss_lst.append(totLoss)

    return VAE.state_dict()

In [None]:
VAE_weights = trainVAE()
torch.save(VAE_weights, f'{path}/Q3 - saved data/VAEweights.pt')

VAE Training Progress:   0%|          | 0/20 [00:00<?, ?it/s]

**2. Create a function to train the SVM** *and calculate accuracy in [%]*

In [None]:
def trainSVM(Nlabels, VEA_model):
    '''Given the desired number of labeled data,
       Returns the trained SVM and its accuracy[%] '''

    # load the SVM training dataset in batches
    dataset = load_data(train_bool=True, Nlabels=Nlabels, batch_size=Nlabels)

    # get latent representation of dataset:
    samples, labels = next(iter(dataset))
    samples = samples.to(device)
    latent, _, _ = VAE.encoder(samples)

    # train the SVM based on the latent representation:
    trainedSvm = svm.SVC(kernel='rbf')
    trainedSvm.fit(latent.detach().to('cpu').numpy(), labels)

    # Initialize test set:
    dataloader = load_data(train_bool=False)
    
    # get testSet's latent representation:
    samples, labels = next(iter(dataloader))
    
    # Get latent representation and predict class:
    samples = samples.to(device)
    latent, _, _ = VAE.encoder(samples)
    preds = trainedSvm.predict(latent.detach().to('cpu').numpy())
    
    # calculate accuracy:
    accuracy = metrics.accuracy_score(labels, preds)*100

    return trainedSvm, accuracy

# **4. Train and test M1 in 4 variations**
*Each time using a differrent number of labeled images.*

**1. Set needed parameters** 

In [None]:
accuracies = pd.Series(index=Nlabels, data=np.zeros_like(Nlabels), name='Accuracies', dtype=float)

seed = 10 # so the results would be consistent

##########################################
# ACTION NEEDED! Change to a relevant path
##########################################
# in case you didn't train the VAE and would
# like to load weights, remove hashtags from the next line:
#
# VAE_weights = torch.load(f'{path}/Q3 - saved data/VAEweights.pt')

**2. Loop over number of labels used** *from [100,600,1000,3000], save the trained SVM classifier to drive, and save the accuracy of each net to a dataframe.*

In [None]:
pbar = tq.tqdm(Nlabels)
for labels in pbar:
    pbar.set_description(f"Training with {labels} Labels")
    torch.manual_seed(seed)
    np.random.seed(seed)

    # initialize model and load the trained weights
    VAE = M1_model(784)
    VAE.load_state_dict(VAE_weights)
    VAE.to(device)

    ## Train and save SVM:   
    with torch.no_grad():
        VAE.eval()
        trainedSvm, accuracy = trainSVM(labels, VAE)

    pickle.dump(trainedSvm, open(f'{path}/Q3 - saved data/{str(labels)}.sav', 'wb'))
   
    accuracies[labels] = accuracy
    
    print(f'{labels} Labels Accuracy: {round(accuracies[labels],2)}')
    print('-'*100)


  0%|          | 0/4 [00:00<?, ?it/s]

100 Labels Accuracy: 29.95
----------------------------------------------------------------------------------------------------
600 Labels Accuracy: 52.62
----------------------------------------------------------------------------------------------------
1000 Labels Accuracy: 58.53
----------------------------------------------------------------------------------------------------
3000 Labels Accuracy: 68.11
----------------------------------------------------------------------------------------------------


#**5. Show results and compare to paper.**
*Shows the error count in [%] and compares to the error count presented in Kingsma et al. [2014] from using M1 model on MNIST dataset (vs FashionMNIST)*


**1. Create an np.series of classification error** *from the 'accuracies' series, and save it to drive.*

In [None]:
classErr = 100 - accuracies
classErr.name = 'Classification Error [%] of M1 over FashionMnist'

classErr.to_csv(f'{path}/Q3 - saved data/results.csv')

**2. Create an array with the results from the paper** *comparing the classification error of M1 on FashionMnist to that on Mnist*

In [None]:
paper_results = pd.Series([11.82,5.72,4.24,3.49], index=Nlabels, name='Classification Error [%] of M1 over Mnist')
pd.concat([classErr,paper_results], axis=1)

Unnamed: 0,Classification Error [%] of M1 over FashionMnist,Classification Error [%] of M1 over Mnist
100,70.05,11.82
600,47.38,5.72
1000,41.47,4.24
3000,31.89,3.49
