In [2]:
# Mount Google Drive inside the Colab VM; the TensorBoard logs, reconstructed
# images and the saved model are all written below '/content/drive'.
from google.colab import drive

drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
# Standard library
import copy
import os

# Libraries for array operations
import numpy as np
import matplotlib.pyplot as plt

# Libraries for Dataset
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split
from torchvision import transforms

#Libraries for ML
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

# Libraries for Pytorch dataset module
from torch.utils.data import Dataset, DataLoader

# Summary writer and path
from torch.utils.tensorboard import SummaryWriter

#Visualization and Saving Image
from torchvision.utils import save_image


In [4]:
# Fetching the MNIST dataset from OpenML (downloaded on first use)
mnist_data = fetch_openml('mnist_784', version=1)


# Separating data and target: pixels as float32, labels as int64
X = np.array(mnist_data.data.astype('float32'))
y = np.array(mnist_data.target.astype('int64'))

# 75% / 25% train / test split; the fixed random_state keeps the split reproducible
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=42)

In [5]:
# Scratch list used by the next cells to collect per-label index arrays
t = []

In [6]:
# Row indices of every training sample whose label is 0.
# NOTE: re-running this cell appends the same array again (not idempotent).
t.append(np.where(y_train==0)[0])

In [7]:
# Display the index array collected for label 0
t[0]

array([    5,    17,    21, ..., 52490, 52498, 52499])

In [8]:
# Display the label of the first training sample
y_train[0]

3

In [9]:
#copy.deepcopy(X_train[0])

In [11]:
class Mnist_Train_Dataset(Dataset):
  '''
  Triplet dataset built on the training split.

  Every item is a dict of four flat float tensors whose pixel values are
  divided by 255:

    anchor_vector   : the sample at the requested index (optionally noised)
    positive_vector : another sample carrying the same label as the anchor
    negative_vector : a sample carrying a different label
    target_vector   : clean copy of the anchor (reconstruction target)
  '''

  def __init__(self, noise=0, noise_mean=0, noise_std=0.2, data=None, targets=None):
    '''
    noise      : truthy -> Gaussian noise is added to the anchor; 0 (default) disables it
    noise_mean : mean of the Gaussian noise
    noise_std  : standard deviation of the Gaussian noise
    data       : 2-D array (samples x features); defaults to the notebook-level X_train
    targets    : 1-D array of integer labels in 0..9; defaults to the notebook-level y_train
    '''

    self.noise = noise
    self.noise_mean = noise_mean
    self.noise_std = noise_std

    # Capture the arrays once so the index tables below always match the data
    self.data = X_train if data is None else np.asarray(data)
    self.targets = y_train if targets is None else np.asarray(targets)

    self.size = np.shape(self.data)[1]

    # Largest raw pixel value, used for normalization
    self.max = 255

    # For every label: indices with the same label and indices with any other label
    self.similar_target = []
    self.negative_target = []

    for i in range(0, 10):
      self.similar_target.append(np.where(self.targets == i)[0])
      self.negative_target.append(np.where(self.targets != i)[0])

  def __len__(self):
    '''
    Returning number of datapoints in the training split
    '''

    return np.shape(self.data)[0]

  def _normalized(self, index):
    # Division allocates a new array, so the stored data is never modified
    return self.data[index] / self.max

  def _as_tensor(self, vector):
    # Flat float32 tensor of length self.size
    return torch.from_numpy(np.reshape(vector, self.size)).float()

  def __getitem__(self, idx):
    '''
    Return the anchor / positive / negative / target tensors for sample idx.

    The positive is drawn uniformly from the other samples of the anchor's
    class; when the anchor is the only member of its class it is used as its
    own positive instead of searching forever.
    Raises ValueError (from np.random.choice) when no sample of a different
    label exists.
    '''

    label = self.targets[idx]

    clean_anchor = self._normalized(idx)

    # Fetching positive data
    same_class = self.similar_target[label]
    positive_index = idx
    if len(same_class) > 1:
      while positive_index == idx:
        positive_index = np.random.choice(same_class, 1)[0]

    # Fetching negative data
    negative_index = np.random.choice(self.negative_target[label], 1)[0]

    # Adding noise to the ANCHOR VECTOR only; the target stays clean
    anchor_vector = clean_anchor
    if self.noise:
      gaussian = np.random.normal(self.noise_mean, self.noise_std, size=np.shape(clean_anchor))
      # Clip so the noisy anchor stays in the normalized [0, 1] range
      anchor_vector = np.clip(clean_anchor + gaussian, 0.0, 1.0)

    sample = {
      'anchor_vector': self._as_tensor(anchor_vector),
      'positive_vector': self._as_tensor(self._normalized(positive_index)),
      'negative_vector': self._as_tensor(self._normalized(negative_index)),
      'target_vector': self._as_tensor(clean_anchor),
    }

    return sample


In [12]:
class Mnist_Validation_Dataset(Dataset):
  '''
  Triplet dataset built on the held-out (test) split.

  Every item is a dict of four flat float tensors whose pixel values are
  divided by 255:

    anchor_vector   : the sample at the requested index (optionally noised)
    positive_vector : another sample carrying the same label as the anchor
    negative_vector : a sample carrying a different label
    target_vector   : clean copy of the anchor (reconstruction target)
  '''

  def __init__(self, noise=0, noise_mean=0, noise_std=0.2, data=None, targets=None):
    '''
    noise      : truthy -> Gaussian noise is added to the anchor; 0 (default) disables it
    noise_mean : mean of the Gaussian noise
    noise_std  : standard deviation of the Gaussian noise
    data       : 2-D array (samples x features); defaults to the notebook-level X_test
    targets    : 1-D array of integer labels in 0..9; defaults to the notebook-level y_test
    '''

    self.noise = noise
    self.noise_mean = noise_mean
    self.noise_std = noise_std

    # Capture the arrays once so the index tables below always match the data
    self.data = X_test if data is None else np.asarray(data)
    self.targets = y_test if targets is None else np.asarray(targets)

    self.size = np.shape(self.data)[1]

    # Largest raw pixel value, used for normalization
    self.max = 255

    # For every label: indices with the same label and indices with any other label
    self.similar_target = []
    self.negative_target = []

    for i in range(0, 10):
      self.similar_target.append(np.where(self.targets == i)[0])
      self.negative_target.append(np.where(self.targets != i)[0])

  def __len__(self):
    '''
    Returning number of datapoints in the validation split
    '''

    return np.shape(self.data)[0]

  def _normalized(self, index):
    # Division allocates a new array, so the stored data is never modified
    return self.data[index] / self.max

  def _as_tensor(self, vector):
    # Flat float32 tensor of length self.size
    return torch.from_numpy(np.reshape(vector, self.size)).float()

  def __getitem__(self, idx):
    '''
    Return the anchor / positive / negative / target tensors for sample idx.

    The positive is drawn uniformly from the other samples of the anchor's
    class; when the anchor is the only member of its class it is used as its
    own positive instead of searching forever.
    Raises ValueError (from np.random.choice) when no sample of a different
    label exists.
    '''

    label = self.targets[idx]

    clean_anchor = self._normalized(idx)

    # Fetching positive data
    same_class = self.similar_target[label]
    positive_index = idx
    if len(same_class) > 1:
      while positive_index == idx:
        positive_index = np.random.choice(same_class, 1)[0]

    # Fetching negative data
    negative_index = np.random.choice(self.negative_target[label], 1)[0]

    # Optional noise goes on the anchor only; the target stays clean
    anchor_vector = clean_anchor
    if self.noise:
      gaussian = np.random.normal(self.noise_mean, self.noise_std, size=np.shape(clean_anchor))
      # Clip so the noisy anchor stays in the normalized [0, 1] range
      anchor_vector = np.clip(clean_anchor + gaussian, 0.0, 1.0)

    sample = {
      'anchor_vector': self._as_tensor(anchor_vector),
      'positive_vector': self._as_tensor(self._normalized(positive_index)),
      'negative_vector': self._as_tensor(self._normalized(negative_index)),
      'target_vector': self._as_tensor(clean_anchor),
    }

    return sample


In [13]:
class Mnist_Visualization(Dataset):
  '''
  Helper dataset that produces one random triplet per digit for visual checks.

  Indexing returns a dict keyed by digit 0..9; each value is a dict holding
  'anchor_vector', 'positive_vector', 'negative_vector' and 'target_vector'
  as flat float tensors with pixel values divided by 255.
  '''

  def __init__(self, noise=0, noise_mean=0, noise_std=0.2, data=None, targets=None):
    '''
    noise      : truthy -> Gaussian noise is added to the anchor; 0 (default) disables it
    noise_mean : mean of the Gaussian noise
    noise_std  : standard deviation of the Gaussian noise
    data       : 2-D array (samples x features); defaults to the notebook-level X_test
    targets    : 1-D array of integer labels in 0..9; defaults to the notebook-level y_test
    '''

    self.noise = noise
    self.noise_mean = noise_mean
    self.noise_std = noise_std

    # Capture the arrays once so the index tables below always match the data
    self.data = X_test if data is None else np.asarray(data)
    self.targets = y_test if targets is None else np.asarray(targets)

    self.size = np.shape(self.data)[1]

    # Largest raw pixel value, used for normalization
    self.max = 255

    # For every label: indices with the same label and indices with any other label
    self.similar_target = []
    self.negative_target = []

    for i in range(0, 10):
      self.similar_target.append(np.where(self.targets == i)[0])
      self.negative_target.append(np.where(self.targets != i)[0])

  def __len__(self):
    '''
    Returning number of datapoints in the underlying split
    '''

    return np.shape(self.data)[0]

  def _normalized(self, index):
    # Division allocates a new array, so the stored data is never modified
    return self.data[index] / self.max

  def _as_tensor(self, vector):
    # Flat float32 tensor of length self.size
    return torch.from_numpy(np.reshape(vector, self.size)).float()

  def __getitem__(self, idx):
    '''
    Return {digit: triplet dict} for the digits 0..9.

    idx is accepted for Dataset compatibility but does not influence the
    result: anchors, positives and negatives are drawn at random per digit.
    Raises ValueError (from np.random.choice) when a digit has no sample.
    '''
    sample = {}

    for i in range(0, 10):

      # Fetching anchor and positive from the samples of digit i
      anchor_index = np.random.choice(self.similar_target[i], 1)[0]
      positive_index = np.random.choice(self.similar_target[i], 1)[0]

      # Fetching negative data: must come from the complement of digit i
      # (not from the complement of the label stored at idx)
      negative_index = np.random.choice(self.negative_target[i], 1)[0]

      clean_anchor = self._normalized(anchor_index)

      # Optional noise goes on the anchor only; the target stays clean
      anchor_vector = clean_anchor
      if self.noise:
        gaussian = np.random.normal(self.noise_mean, self.noise_std, size=np.shape(clean_anchor))
        # Clip so the noisy anchor stays in the normalized [0, 1] range
        anchor_vector = np.clip(clean_anchor + gaussian, 0.0, 1.0)

      sample[i] = {
        'anchor_vector': self._as_tensor(anchor_vector),
        'positive_vector': self._as_tensor(self._normalized(positive_index)),
        'negative_vector': self._as_tensor(self._normalized(negative_index)),
        'target_vector': self._as_tensor(clean_anchor),
      }
    return sample


In [14]:
# mnist_vis = Mnist_Visualization()
# mnist_vis[0]

In [15]:
class VAE(nn.Module):
  '''
  Fully connected variational auto-encoder for flattened 28x28 images.

  Encoder : 784 -> 500 -> 300 -> 200 -> (mean, log-variance), each of size latent_dimentions
  Decoder : latent_dimentions -> 200 -> 300 -> 500 -> 784 with a sigmoid output

  Every hidden layer is followed by ReLU and dropout with probability p.
  Layer attribute names are part of the checkpoint format (state_dict keys).
  '''

  def __init__(self,latent_dimentions=2, p=0.2):
    '''
    latent_dimentions : size of the latent code
    p                 : dropout probability used after every hidden layer
    '''

    super(VAE,self).__init__()

    # Encoder parameters
    en_inp_dim = 28*28
    en_hid1 = 500
    en_hid2 = 300
    en_hid3 = 200
    en_out = latent_dimentions

    self.encoder1 = nn.Linear(en_inp_dim,en_hid1)
    self.en_drop1 = nn.Dropout(p=p)

    self.encoder2 = nn.Linear(en_hid1,en_hid2)
    self.en_drop2 = nn.Dropout(p=p)

    self.encoder3 = nn.Linear(en_hid2,en_hid3)
    self.en_drop3 = nn.Dropout(p=p)

    # Two heads on top of the shared encoder trunk
    self.mean_layer = nn.Linear(en_hid3,en_out)
    self.log_var_layer = nn.Linear(en_hid3,en_out)

    # Decoder parameters (mirror of the encoder)
    dec_inp_dim = latent_dimentions
    dec_hid1 = 200
    dec_hid2 = 300
    dec_hid3 = 500
    dec_out = 28*28

    self.decoder1 = nn.Linear(dec_inp_dim,dec_hid1)
    self.dec_drop1 = nn.Dropout(p=p)

    self.decoder2 = nn.Linear(dec_hid1,dec_hid2)
    self.dec_drop2 = nn.Dropout(p=p)

    self.decoder3 = nn.Linear(dec_hid2,dec_hid3)
    self.dec_drop3 = nn.Dropout(p=p)

    self.decoder_out_layer = nn.Linear(dec_hid3,dec_out)

  def Encoder(self,x):
    '''
    Encoder pass: maps x (last dimension 784) to the pair (mean, log_var)
    '''

    x = self.en_drop1(F.relu(self.encoder1(x)))
    x = self.en_drop2(F.relu(self.encoder2(x)))
    x = self.en_drop3(F.relu(self.encoder3(x)))

    mean = self.mean_layer(x)
    log_var = self.log_var_layer(x)

    return mean,log_var

  def Decoder(self,z):
    '''
    Decoder pass: maps a latent code z to pixel values in [0, 1]
    '''

    x = self.dec_drop1(F.relu(self.decoder1(z)))
    x = self.dec_drop2(F.relu(self.decoder2(x)))
    x = self.dec_drop3(F.relu(self.decoder3(x)))

    # torch.sigmoid replaces the legacy F.sigmoid alias (same values)
    return torch.sigmoid(self.decoder_out_layer(x))

  def Sample(self,mean,log_var):
    '''
    Sampling from a gaussian distribution using the reparameterization trick:
    z = mean + eps * std with eps ~ N(0, I), so gradients reach mean and log_var
    '''
    # Standard deviation
    std = torch.exp(0.5*log_var)
    # Standard normal noise with the shape of std
    eps = torch.randn_like(std)
    # Shift and scale the noise
    return eps * std + mean

  def forward(self, x):
    '''
    Forward pass through VAE.

    Returns the tuple (reconstruction, mean, log_var).
    '''
    # Extracting mean and log variance from encoder
    mu, log_var = self.Encoder(x)
    # Fetching sample from distribution
    z = self.Sample(mu, log_var)

    # Return decoded signal with mean and log variance generated
    return (self.Decoder(z), mu, log_var)


In [16]:
def Euclidean_Distance(x1, x2):
  # Squared Euclidean distance between matching rows of x1 and x2:
  # element-wise difference, squared, then summed over dimension 1
  # (no square root is taken).
  squared_difference = torch.pow(x1 - x2, 2)
  return torch.sum(squared_difference, dim=1)

def Triplet_loss(anchor,positive,negative,margin=1.0):
  '''
  Triplet margin loss summed over the batch.

  For every row: relu(d(anchor, positive) - d(anchor, negative) + margin),
  where d is whatever Euclidean_Distance returns for the pair.

  anchor, positive, negative : tensors of identical shape (one embedding per row)
  margin                     : required gap between the two distances (default 1.0)

  Returns a scalar tensor.
  '''
  positive_distance = Euclidean_Distance(anchor, positive)
  negative_distance = Euclidean_Distance(anchor, negative)

  # Only triplets that violate the margin contribute to the loss
  violation = torch.relu(positive_distance - negative_distance + margin)

  return torch.sum(violation)
  



In [17]:
def Negative_ELBO_BCE_KL(pred, target, mean, log_var, batch_size, anchor, positive, negative):
  '''
  Per-sample training objective: negative ELBO plus the triplet penalty.

  pred, target               : reconstruction and its reference (binary cross entropy, summed)
  mean, log_var              : parameters of the approximate posterior (KL term, summed)
  batch_size                 : divisor applied to the summed loss
  anchor, positive, negative : embeddings handed to Triplet_loss

  Returns (BCE + KL + triplet) / batch_size.
  '''

  # Reconstruction quality, summed over every pixel of the batch
  reconstruction_term = F.binary_cross_entropy(pred, target, reduction='sum')

  # KL divergence between N(mean, exp(log_var)) and the standard normal prior
  kl_summand = 1 + log_var - mean.pow(2) - log_var.exp()
  kl_term = -0.5 * torch.sum(kl_summand)

  # Keeps same-label embeddings closer to the anchor than other-label ones
  triplet_term = Triplet_loss(anchor, positive, negative)

  total = reconstruction_term + kl_term + triplet_term
  return total / batch_size



In [18]:
#Setting up the device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Seeding NumPy (triplet sampling inside the datasets) and PyTorch (weight
# initialisation, dropout, shuffling) so that a fresh Run-All is repeatable.
# 42 matches the random_state already used for the train/test split.
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)

# Training dataset and its loader (reshuffled every epoch)
train_data = Mnist_Train_Dataset()
train_dataloader = DataLoader(train_data,batch_size=128,shuffle=True)

# Validation dataset and its loader; shuffling brings nothing for evaluation
validation_data = Mnist_Validation_Dataset()
validation_dataloader = DataLoader(validation_data,batch_size=128,shuffle=False)

# Visualization
mnist_vis = Mnist_Visualization()

# Defining VAE Model
net = VAE(latent_dimentions=20).to(device)

# NOTE(review): the training loop optimises Negative_ELBO_BCE_KL and never
# reads `criterion`; kept only in case another cell still refers to it.
criterion = nn.CrossEntropyLoss()

#Optimizer selection (Adam with its default hyper-parameters)
optimizer = optim.Adam(net.parameters())

In [19]:
#train_data[0]

In [20]:
# TensorBoard writer. The default `log_dir` would be "runs"; instead the events
# are logged to a folder on the mounted Google Drive (Drive must be mounted first).
writer = SummaryWriter('/content/drive/My Drive/Colab Notebooks/VAE/MNIST_NN_BCE_T/savings/LD20/')

In [21]:

# Root folder for the reconstruction snapshots (one sub-folder per epoch)
SAVE_DIR = '/content/drive/My Drive/Colab Notebooks/VAE/MNIST_NN_BCE_T/savings/LD20/'
NUM_EPOCHS = 201       # epochs 0..200
LOG_EVERY = 100        # mini-batches between two log / validation rounds
SNAPSHOT_EVERY = 50    # epochs between two reconstruction snapshots


def _triplet_vae_loss(batch):
  '''
  Loss of one batch produced by the triplet datasets.

  The anchor goes through the whole VAE; positives and negatives only need
  their latent mean, so they go through the encoder alone.
  '''
  anchor_inp = batch['anchor_vector'].to(device)
  positive_inp = batch['positive_vector'].to(device)
  negative_inp = batch['negative_vector'].to(device)
  target = batch['target_vector'].to(device)

  reconstruction, anchor_mean, anchor_log_var = net(anchor_inp)
  positive_mean = net.Encoder(positive_inp)[0]
  negative_mean = net.Encoder(negative_inp)[0]

  return Negative_ELBO_BCE_KL(pred=reconstruction, target=target, mean=anchor_mean, log_var=anchor_log_var, batch_size=target.shape[0], anchor=anchor_mean, positive=positive_mean, negative=negative_mean)


def _mean_validation_loss():
  '''
  Average batch loss over the whole validation loader (eval mode, no gradients).
  '''
  net.eval()
  total = 0.0
  batches = 0
  with torch.no_grad():
    for val_batch in validation_dataloader:
      total += _triplet_vae_loss(val_batch).item()
      batches += 1
  return total / batches


def _save_reconstructions(epoch):
  '''
  Store one input / reconstruction image pair per digit under SAVE_DIR/<epoch>/.
  '''
  out_dir = os.path.join(SAVE_DIR, str(epoch))
  # save_image does not create missing folders
  os.makedirs(out_dir, exist_ok=True)

  # Dropout off and no autograd graph while producing the snapshots
  net.eval()
  with torch.no_grad():
    for key, val in mnist_vis[0].items():
      reconstruction = net(val['anchor_vector'].to(device))[0]
      save_image(reconstruction.view(28, 28), os.path.join(out_dir, str(key) + '_pred' + '.png'))
      save_image(val['target_vector'].view(28, 28), os.path.join(out_dir, str(key) + '.png'))


for epoch in range(NUM_EPOCHS):

  running_loss = 0
  count = 0

  for i, data in enumerate(train_dataloader):
    # Validation and snapshots switch to eval mode, so switch back every step
    net.train()

    # zero the parameter gradients
    optimizer.zero_grad()

    # forward + backward + optimize
    loss = _triplet_vae_loss(data)
    loss.backward()
    optimizer.step()

    # running statistics since the last report
    running_loss += loss.item()
    count += 1

    if i % LOG_EVERY == 0:
      global_step = epoch * len(train_dataloader) + i

      print('------------------------------------------------------------------------------------------------------')
      print('Epoch:',epoch+1)
      print('Iteration: %d Average Training Loss: %.3f' % (i, running_loss / count))
      writer.add_scalar('Average Training Loss : ',running_loss / count,global_step)

      running_loss = 0.0
      count = 0

      average_val_loss = _mean_validation_loss()

      print('Average Validation Loss:',average_val_loss)
      writer.add_scalar('Average Validation Loss',average_val_loss,global_step)

  if epoch % SNAPSHOT_EVERY == 0:
    _save_reconstructions(epoch)





------------------------------------------------------------------------------------------------------
Epoch: 1
Iteration: 0 Average Training Loss: 544.277
Average Validation Loss: 535.6910168724338
------------------------------------------------------------------------------------------------------
Epoch: 1
Iteration: 100 Average Training Loss: 228.625
Average Validation Loss: 201.52909973590045
------------------------------------------------------------------------------------------------------
Epoch: 1
Iteration: 200 Average Training Loss: 199.506
Average Validation Loss: 192.70166338621266
------------------------------------------------------------------------------------------------------
Epoch: 1
Iteration: 300 Average Training Loss: 190.366
Average Validation Loss: 181.41122358558823
------------------------------------------------------------------------------------------------------
Epoch: 1
Iteration: 400 Average Training Loss: 182.421
Average Validation Loss: 173.96524304

In [23]:
# Close the TensorBoard writer and save the trained weights.
# Only the state_dict (parameter tensors) is written, so loading it later needs
# the VAE class definition: rebuild the model and call load_state_dict on it.
writer.close()
PATH = '/content/drive/My Drive/Colab Notebooks/VAE/MNIST_NN_BCE_T/savings/LD20/saved_model'
torch.save(net.state_dict(), PATH)

In [None]:
# Scratch arithmetic: 0.6949 scaled by 128 (the batch size of the DataLoaders).
# NOTE(review): presumably converts a per-sample loss into a per-batch sum;
# confirm the intent or delete this cell.
0.6949*128