# Siamese Network on the MNIST Dataset

In [None]:
import os
import numpy as np
import torch
from torch.utils.data import DataLoader
from torch.utils.data.dataset import Dataset
import torchvision.transforms as transforms
from torchvision.datasets import MNIST

class DATA():
    """Loads MNIST and serves pairs of flattened image batches for Siamese training.

    Call ``read()`` once to build the loaders, then ``generate_batch()`` /
    ``generate_batch_test()`` to draw a pair of batches together with a
    per-row "same digit" indicator.
    """

    def __init__(self):
        self.batch_size = 128
        self.transform = None
        self.data_index = 0
        self.dataset = None
        self.dataset_test = None
        self.train_dataloader = None
        self.test_dataloader = None

    def read(self):
        """Create the MNIST train/test datasets (downloading if needed) and their shuffled loaders."""
        trans = transforms.Compose([transforms.ToTensor()])
        self.dataset = MNIST(root="~/torch_datasets", train=True, transform=trans, download=True)
        self.dataset_test = MNIST(root="~/torch_datasets", train=False, transform=trans, download=True)
        self.train_dataloader = DataLoader(self.dataset, self.batch_size, shuffle=True)
        self.test_dataloader = DataLoader(self.dataset_test, self.batch_size, shuffle=True)

    def read_test(self):
        """Placeholder kept for interface compatibility; does nothing and returns None."""
        return None

    @staticmethod
    def _pair_batch(loader):
        """Draw two consecutive batches from a fresh pass over ``loader`` and pair them row by row.

        Returns:
            (input_1, input_2, label): the two image batches flattened to
            ``(rows, features)`` tensors, and a float32 NumPy array holding
            1.0 where the two rows carry the same label and 0.0 otherwise.

        Raises:
            StopIteration: if ``loader`` yields fewer than two batches.
        """
        batches = iter(loader)
        input_1, label_1 = next(batches)
        input_2, label_2 = next(batches)

        # The second batch can be shorter than the first on small datasets;
        # keep only the rows that have a partner so the label comparison
        # cannot broadcast a short batch against a long one.
        rows = min(input_1.size(0), input_2.size(0))
        input_1 = input_1[:rows].reshape(rows, -1)
        input_2 = input_2[:rows].reshape(rows, -1)

        same = label_1[:rows].numpy() == label_2[:rows].numpy()
        return input_1, input_2, same.astype('float32')

    def generate_batch(self):
        """Return a paired batch drawn from the training loader (see ``_pair_batch``)."""
        return self._pair_batch(self.train_dataloader)

    def generate_batch_test(self):
        """Return a paired batch drawn from the test loader (see ``_pair_batch``)."""
        return self._pair_batch(self.test_dataloader)

In [None]:
import torch 
import torch.nn as nn
from torch.autograd import Variable
import os
    

class MODEL(nn.Module):
    """Fully connected embedding network applied with shared weights to both inputs of a pair."""

    def __init__(self):
        super().__init__()
        self.HiddenLayer_1 = nn.Linear(28*28, 1024)
        self.HiddenLayer_2 = nn.Linear(1024, 1024)
        self.OutputLayer = nn.Linear(1024, 2)

    def forward_once(self, X):
        """Embed a single batch: two ReLU hidden layers followed by a linear 2-unit output."""
        activation = X
        for hidden_layer in (self.HiddenLayer_1, self.HiddenLayer_2):
            activation = nn.functional.relu(hidden_layer(activation))
        return self.OutputLayer(activation)

    def forward(self, X1, X2):
        """Embed both batches with the same weights and return the two embeddings as a tuple."""
        return self.forward_once(X1), self.forward_once(X2)
        
class ContrastiveLoss(torch.nn.Module):
    """Contrastive loss over pairs of embeddings.

    Rows with ``Y == 1`` are penalised by their squared distance; rows with
    ``Y == 0`` are penalised by the squared amount by which their distance
    falls short of ``margin``.
    """

    def __init__(self, margin = 5.0):
        super().__init__()
        self.eps = 1e-6
        self.margin = margin

    def forward(self, out_1, out_2, Y):
        """Return the mean contrastive loss for the embedding pairs ``out_1``/``out_2``."""
        distance = nn.functional.pairwise_distance(out_1, out_2)
        # Term that is active where Y is 1: squared distance.
        attract = Y * distance.pow(2)
        # Term that is active where Y is 0: squared shortfall below the margin.
        shortfall = (self.margin - distance).clamp(min=0.0)
        repel = (1 - Y) * shortfall.pow(2)
        return (attract + repel).mean()


class Operators():
    """Bundles a Siamese network with its contrastive loss and SGD optimizer.

    ``train`` optimises the network on paired batches; ``test`` only measures
    the loss and never updates the weights.
    """

    def __init__(self, net):
        self.net = net
        self.loss = ContrastiveLoss()
        self.optimizer = torch.optim.SGD(self.net.parameters(), lr = 0.01)

    @staticmethod
    def _as_float_tensors(batch):
        """Convert an (input_1, input_2, label) batch to float32 tensors."""
        return tuple(torch.as_tensor(part, dtype=torch.float32) for part in batch)

    def train(self, data, iterations=5000, log_every=500):
        """Run ``iterations`` optimisation steps on batches from ``data.generate_batch()``.

        Args:
            data: object exposing ``generate_batch()`` that returns
                ``(input_1, input_2, label)``.
            iterations: number of gradient steps to take.
            log_every: print the loss every this many steps; a falsy value
                disables logging.
        """
        self.net.train()
        for epoch in range(iterations):
            X_1, X_2, Y = self._as_float_tensors(data.generate_batch())
            self.optimizer.zero_grad()
            out_1, out_2 = self.net(X_1, X_2)
            loss_val = self.loss(out_1, out_2, Y)
            loss_val.backward()
            self.optimizer.step()
            if log_every and epoch % log_every == 0:
                print('Epoch: %d Loss: %.3f' % (epoch, loss_val.item()))

    def test(self, dataX, iterations=5000):
        """Print and return the mean loss over batches from ``dataX.generate_batch_test()``.

        Args:
            dataX: object exposing ``generate_batch_test()`` that returns
                ``(input_1, input_2, label)``.
            iterations: number of batches to average over; must be positive.

        Returns:
            The mean loss as a Python float.

        Raises:
            ValueError: if ``iterations`` is not positive.
        """
        if iterations < 1:
            raise ValueError('iterations must be positive')
        self.net.eval()
        val_loss = 0.0
        # Evaluation must not touch the weights: no backward pass, no optimizer
        # step, and no autograd graph kept alive across iterations.
        with torch.no_grad():
            for _ in range(iterations):
                X_1, X_2, Y = self._as_float_tensors(dataX.generate_batch_test())
                out_1, out_2 = self.net(X_1, X_2)
                val_loss += self.loss(out_1, out_2, Y).item()
        val_loss /= iterations
        print('Test Loss: %.3f' % (val_loss))
        return val_loss

In [None]:
# Instantiate the Siamese embedding network with freshly initialised weights.
net = MODEL()

In [None]:
# Build the MNIST helper and create its train/test loaders (downloads MNIST if missing).
data = DATA()
data.read()

In [None]:
# Pair the network with its contrastive loss and SGD optimizer.
modeloperator = Operators(net)

In [None]:
# Train on paired batches; the loss is printed periodically (output recorded below).
modeloperator.train(data)

Epoch: 0 Loss: 21.979
Epoch: 500 Loss: 1.297
Epoch: 1000 Loss: 1.204
Epoch: 1500 Loss: 1.301
Epoch: 2000 Loss: 1.160
Epoch: 2500 Loss: 0.669
Epoch: 3000 Loss: 0.691
Epoch: 3500 Loss: 0.553
Epoch: 4000 Loss: 1.293
Epoch: 4500 Loss: 1.092


In [None]:
# Report the mean contrastive loss over paired batches drawn from the test loader.
modeloperator.test(data)

Test Loss: 0.324


# Siamese Network Using Triplet Loss

In [None]:
from torch.utils.data import Dataset
from PIL import Image
from torchvision import datasets, transforms
import torchvision
import numpy as np


class TripletMNIST(Dataset):
    """Dataset wrapper that serves MNIST samples as (anchor, positive, negative) triplets.

    Train mode: the positive (same label) and negative (different label)
    partners are drawn at random on every access.
    Test mode: one triplet per sample is fixed at construction time from a
    seeded RNG, so evaluation is reproducible across runs.
    """

    def __init__(self, mnist_dataset):
        self.mnist_dataset = mnist_dataset
        self.train = self.mnist_dataset.train
        self.transform = self.mnist_dataset.transform

        # `data` / `targets` are the current names of the image and label
        # tensors; the train_*/test_* aliases are deprecated in torchvision.
        images = self.mnist_dataset.data
        labels = self.mnist_dataset.targets

        labels_np = labels.numpy()
        self.labels_set = set(labels_np)
        self.label_to_indices = {label: np.where(labels_np == label)[0]
                                 for label in self.labels_set}

        if self.train:
            self.train_labels = labels
            self.train_data = images
        else:
            self.test_labels = labels
            self.test_data = images
            # Generate fixed triplets for testing. Every draw goes through the
            # seeded generator; mixing in the global np.random state would make
            # the "fixed" triplets differ from run to run.
            random_state = np.random.RandomState(29)
            self.test_triplets = [self._fixed_triplet(i, random_state)
                                  for i in range(len(images))]

    def _fixed_triplet(self, anchor_index, random_state):
        """Pick a positive and a negative index for ``anchor_index`` using ``random_state``."""
        anchor_label = self.test_labels[anchor_index].item()
        positive_index = random_state.choice(self.label_to_indices[anchor_label])
        negative_label = random_state.choice(list(self.labels_set - {anchor_label}))
        negative_index = random_state.choice(self.label_to_indices[negative_label])
        return [anchor_index, int(positive_index), int(negative_index)]

    def __getitem__(self, index):
        """Return ``((anchor, positive, negative), [])`` for the sample at ``index``."""
        if self.train:
            img1, label1 = self.train_data[index], self.train_labels[index].item()
            same_label = self.label_to_indices[label1]
            positive_index = index
            # Re-draw until the positive differs from the anchor; with a single
            # sample in the class that can never happen, so reuse the anchor.
            if len(same_label) > 1:
                while positive_index == index:
                    positive_index = np.random.choice(same_label)
            negative_label = np.random.choice(list(self.labels_set - {label1}))
            negative_index = np.random.choice(self.label_to_indices[negative_label])
            img2 = self.train_data[positive_index]
            img3 = self.train_data[negative_index]
        else:
            anchor, positive, negative = self.test_triplets[index]
            img1 = self.test_data[anchor]
            img2 = self.test_data[positive]
            img3 = self.test_data[negative]

        # Convert to PIL images so the wrapped dataset's transform can be applied.
        img1 = Image.fromarray(img1.numpy(), mode='L')
        img2 = Image.fromarray(img2.numpy(), mode='L')
        img3 = Image.fromarray(img3.numpy(), mode='L')
        if self.transform is not None:
            img1 = self.transform(img1)
            img2 = self.transform(img2)
            img3 = self.transform(img3)
        # The empty list stands in for a target; triplets carry no label.
        return (img1, img2, img3), []

    def __len__(self):
        return len(self.mnist_dataset)


In [None]:
class TripletNet(nn.Module):
    """Applies one shared MLP embedding to the anchor, positive and negative inputs."""

    def __init__(self):
        super().__init__()
        stages = [
            nn.Linear(28*28, 256),
            nn.PReLU(),
            nn.Linear(256, 256),
            nn.PReLU(),
            nn.Linear(256, 2),
        ]
        self.embedding_net = nn.Sequential(*stages)

    def forward(self, x1, x2, x3):
        """Return the embeddings of the three inputs, in input order."""
        embed = self.embedding_net
        return embed(x1), embed(x2), embed(x3)

    def get_embedding(self, x):
        """Return the embedding of a single batch."""
        return self.embedding_net(x)

In [None]:
# Wrap the MNIST train and test splits so that indexing yields image triplets.
# NOTE: despite their names, `data_loader` and `val_loader` are TripletMNIST
# Dataset objects, not torch DataLoaders; iterating them yields one triplet at a time.
trans = transforms.Compose([transforms.ToTensor()])
data_loader = TripletMNIST(torchvision.datasets.MNIST(root="~/torch_datasets", train = True, transform = trans, download=True))
val_loader = TripletMNIST(torchvision.datasets.MNIST(root="~/torch_datasets", train = False, transform = trans, download=True))



In [None]:
# Train a fresh TripletNet with Adadelta for two passes over the triplet dataset.
triplet_loss = nn.TripletMarginLoss(margin=1.0, p=2)
model = TripletNet()
optimizer = torch.optim.Adadelta(model.parameters(), lr=0.01)
model.train()
for epoch in range(2):
  running_loss = 0.0
  # `data_loader` is a TripletMNIST dataset, so each step sees a single triplet.
  # The loop variables deliberately avoid the name `data`, which holds the
  # DATA instance used by the contrastive-loss cells above.
  for triplet, _target in data_loader:
    optimizer.zero_grad()
    # Flatten every image of the triplet into rows of 784 values.
    images = tuple(img.reshape(-1, 784) for img in triplet)
    output = model(*images)
    loss = triplet_loss(*output)
    loss.backward()
    optimizer.step()
    running_loss += loss.item()
  running_loss /= len(data_loader)
  print('\nTrain set: Average loss: {:.4f} , epoch: {}'.format(running_loss,epoch))


Train set: Average loss: 0.3311 , epoch: 0

Train set: Average loss: 0.2199 , epoch: 1


In [None]:
 # Evaluate the current model on the fixed test triplets without tracking gradients.
 with torch.no_grad():
   model.eval()
   val_loss = 0.0
   # `val_loader` is a TripletMNIST dataset, so this visits one triplet at a time.
   for triplet, _target in val_loader:
     images = tuple(img.reshape(-1, 784) for img in triplet)
     outputs = model(*images)
     # Score the embeddings just computed, not the last training-step output.
     loss = triplet_loss(*outputs)
     val_loss += loss.item()
   val_loss /= len(val_loader)
   # Report the validation average (not the training running_loss).
   print('test set: Average loss: {:.4f} '.format(val_loss))

test set: Average loss: 0.2199 


# Using RMSprop

In [None]:
# Train a fresh TripletNet with RMSprop for two passes over the triplet dataset.
# NOTE(review): the recorded run below shows the loss growing between epochs;
# lr=0.01 may be too large for RMSprop here — confirm before relying on it.
triplet_loss = nn.TripletMarginLoss(margin=1.0, p=2)
model = TripletNet()
optimizer = torch.optim.RMSprop(model.parameters(), lr=0.01)
model.train()
for epoch in range(2):
  running_loss = 0.0
  # `data_loader` is a TripletMNIST dataset, so each step sees a single triplet.
  # The loop variables deliberately avoid the name `data`, which holds the
  # DATA instance used by the contrastive-loss cells above.
  for triplet, _target in data_loader:
    optimizer.zero_grad()
    # Flatten every image of the triplet into rows of 784 values.
    images = tuple(img.reshape(-1, 784) for img in triplet)
    output = model(*images)
    loss = triplet_loss(*output)
    loss.backward()
    optimizer.step()
    running_loss += loss.item()
  running_loss /= len(data_loader)
  print('\nTrain set: Average loss: {:.4f} , epoch: {}'.format(running_loss,epoch))



Train set: Average loss: 41.7002 , epoch: 0

Train set: Average loss: 82.6408 , epoch: 1


In [None]:
 # Evaluate the current model on the fixed test triplets without tracking gradients.
 with torch.no_grad():
   model.eval()
   val_loss = 0.0
   # `val_loader` is a TripletMNIST dataset, so this visits one triplet at a time.
   for triplet, _target in val_loader:
     images = tuple(img.reshape(-1, 784) for img in triplet)
     outputs = model(*images)
     # Score the embeddings just computed, not the last training-step output.
     loss = triplet_loss(*outputs)
     val_loss += loss.item()
   val_loss /= len(val_loader)
   # Report the validation average (not the training running_loss).
   print('test set: Average loss: {:.4f} '.format(val_loss))

test set: Average loss: 82.6408 


# Using Mini-Batch Gradient Descent (SGD)

In [None]:
# Train a fresh TripletNet with plain SGD for two passes over the triplet dataset.
triplet_loss = nn.TripletMarginLoss(margin=1.0, p=2)
model = TripletNet()
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
model.train()
for epoch in range(2):
  running_loss = 0.0
  # `data_loader` is a TripletMNIST dataset, so each step sees a single triplet.
  # The loop variables deliberately avoid the name `data`, which holds the
  # DATA instance used by the contrastive-loss cells above.
  for triplet, _target in data_loader:
    optimizer.zero_grad()
    # Flatten every image of the triplet into rows of 784 values.
    images = tuple(img.reshape(-1, 784) for img in triplet)
    output = model(*images)
    loss = triplet_loss(*output)
    loss.backward()
    optimizer.step()
    running_loss += loss.item()
  running_loss /= len(data_loader)
  print('\nTrain set: Average loss: {:.4f} , epoch: {}'.format(running_loss,epoch))


Train set: Average loss: 0.1925 , epoch: 0

Train set: Average loss: 0.1176 , epoch: 1


In [None]:
 # Evaluate the current model on the fixed test triplets without tracking gradients.
 with torch.no_grad():
   model.eval()
   val_loss = 0.0
   # `val_loader` is a TripletMNIST dataset, so this visits one triplet at a time.
   for triplet, _target in val_loader:
     images = tuple(img.reshape(-1, 784) for img in triplet)
     outputs = model(*images)
     # Score the embeddings just computed, not the last training-step output.
     loss = triplet_loss(*outputs)
     val_loss += loss.item()
   val_loss /= len(val_loader)
   # Report the validation average (not the training running_loss).
   print('test set: Average loss: {:.4f} '.format(val_loss))

test set: Average loss: 0.1176 


# changing the margin

In [None]:
# Same SGD setup as before, but with the triplet margin raised to 2.0.
triplet_loss = nn.TripletMarginLoss(margin=2.0, p=2)
model = TripletNet()
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
model.train()
for epoch in range(2):
  running_loss = 0.0
  # `data_loader` is a TripletMNIST dataset, so each step sees a single triplet.
  # The loop variables deliberately avoid the name `data`, which holds the
  # DATA instance used by the contrastive-loss cells above.
  for triplet, _target in data_loader:
    optimizer.zero_grad()
    # Flatten every image of the triplet into rows of 784 values.
    images = tuple(img.reshape(-1, 784) for img in triplet)
    output = model(*images)
    loss = triplet_loss(*output)
    loss.backward()
    optimizer.step()
    running_loss += loss.item()
  running_loss /= len(data_loader)
  print('\nTrain set: Average loss: {:.4f} , epoch: {}'.format(running_loss,epoch))


Train set: Average loss: 0.4423 , epoch: 0

Train set: Average loss: 0.3482 , epoch: 1


In [None]:
 # Evaluate the current model on the fixed test triplets without tracking gradients.
 with torch.no_grad():
   model.eval()
   val_loss = 0.0
   # `val_loader` is a TripletMNIST dataset, so this visits one triplet at a time.
   for triplet, _target in val_loader:
     images = tuple(img.reshape(-1, 784) for img in triplet)
     outputs = model(*images)
     # Score the embeddings just computed, not the last training-step output.
     loss = triplet_loss(*outputs)
     val_loss += loss.item()
   val_loss /= len(val_loader)
   # Report the validation average (not the training running_loss).
   print('test set: Average loss: {:.4f} '.format(val_loss))

test set: Average loss: 0.3482 


# Changing the p value

In [None]:
# Same SGD setup as before, but measuring distances with the 1-norm (p=1).
triplet_loss = nn.TripletMarginLoss(margin=1.0, p=1)
model = TripletNet()
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
model.train()
for epoch in range(2):
  running_loss = 0.0
  # `data_loader` is a TripletMNIST dataset, so each step sees a single triplet.
  # The loop variables deliberately avoid the name `data`, which holds the
  # DATA instance used by the contrastive-loss cells above.
  for triplet, _target in data_loader:
    optimizer.zero_grad()
    # Flatten every image of the triplet into rows of 784 values.
    images = tuple(img.reshape(-1, 784) for img in triplet)
    output = model(*images)
    loss = triplet_loss(*output)
    loss.backward()
    optimizer.step()
    running_loss += loss.item()
  running_loss /= len(data_loader)
  print('\nTrain set: Average loss: {:.4f} , epoch: {}'.format(running_loss,epoch))


Train set: Average loss: 0.2361 , epoch: 0

Train set: Average loss: 0.1957 , epoch: 1


In [None]:
 # Evaluate the current model on the fixed test triplets without tracking gradients.
 with torch.no_grad():
   model.eval()
   val_loss = 0.0
   # `val_loader` is a TripletMNIST dataset, so this visits one triplet at a time.
   for triplet, _target in val_loader:
     images = tuple(img.reshape(-1, 784) for img in triplet)
     outputs = model(*images)
     # Score the embeddings just computed, not the last training-step output.
     loss = triplet_loss(*outputs)
     val_loss += loss.item()
   val_loss /= len(val_loader)
   # Report the validation average (not the training running_loss).
   print('test set: Average loss: {:.4f} '.format(val_loss))

test set: Average loss: 0.1957 


# Pros of Siamese Network
## More Robust to class Imbalance
## Works well in an ensemble with the best classifier
## Learning from Semantic Similarity


# Cons of Siamese Network
## Needs more training time than normal networks
## Doesn’t output probabilities