<a href="https://colab.research.google.com/github/KushalGajjar1/Siamese-Neural-Network/blob/main/Siamese_Network.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import torchvision
from torchvision import transforms

import numpy as np
from tqdm import tqdm

In [None]:
# Run on the first CUDA GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

In [None]:
class CustomDataset(Dataset):
    """Siamese pair dataset wrapping a labelled ``(image, label)`` dataset.

    Every item is an anchor image, a randomly drawn partner image and a float
    target that is ``0.0`` when both images share a label and ``1.0`` when
    they do not. Whether a same-label or a different-label partner is drawn is
    decided by a coin flip on each access, so reading the same index twice
    generally yields different pairs.

    Args:
        mnist_dataset: Indexable dataset supporting ``len()`` whose items are
            ``(image, label)`` tuples.
        transform: Optional callable applied to both images of the pair.
    """

    def __init__(self, mnist_dataset, transform=None):
        self.mnist_dataset = mnist_dataset
        self.transform = transform

    def __len__(self):
        """Return the number of anchors, i.e. the size of the wrapped dataset."""
        return len(self.mnist_dataset)

    def __getitem__(self, index):
        """Return ``(img0, img1, target)`` for the anchor at ``index``.

        ``target`` is a scalar ``float32`` tensor: ``0.0`` for a same-label
        pair, ``1.0`` for a different-label pair.

        Note:
            The partner is found by rejection sampling, so this call does not
            return if the wrapped dataset holds no sample of the requested
            kind (for example a dataset with a single class).
        """
        img0, label0 = self.mnist_dataset[index]

        want_same = bool(np.random.randint(2))
        num_items = len(self.mnist_dataset)

        while True:
            # randint's upper bound is exclusive, so pass num_items (not
            # num_items - 1); otherwise the last sample can never be drawn.
            img1, label1 = self.mnist_dataset[np.random.randint(0, num_items)]
            if bool(label0 == label1) == want_same:
                break

        if self.transform:
            img0 = self.transform(img0)
            img1 = self.transform(img1)

        return img0, img1, torch.tensor(int(label0 != label1), dtype=torch.float32)

In [None]:
class CustomDatasetOffice(Dataset):
    """Siamese pair dataset over in-memory image and label arrays.

    Every item is an anchor image, a randomly drawn partner image and a float
    target of shape ``(1,)`` that is ``0.0`` when both images share a label and
    ``1.0`` when they do not. Whether a same-label or a different-label
    partner is drawn is decided by a coin flip on each access.

    Args:
        data: Array of images indexed along axis 0 (must expose ``.shape``).
        label: Array of labels aligned with ``data`` along axis 0.
        transform: Stored on the instance but currently NOT applied to the
            returned images.
    """

    def __init__(self, data, label, transform=None):
        self.data = data
        self.label = label
        self.transform = transform

    def __len__(self):
        """Return the number of anchors, taken from the label array length."""
        return self.label.shape[0]

    def __getitem__(self, index):
        """Return ``(img0, img1, target)`` for the anchor at ``index``.

        Both images are converted with ``torch.tensor`` (a copy of the array
        data). ``target`` is a ``float32`` tensor of shape ``(1,)``.

        Note:
            The partner is found by rejection sampling, so this call does not
            return if the arrays hold no sample of the requested kind (for
            example when every label is identical).
        """
        img0 = self.data[index]
        label0 = self.label[index]

        want_same = bool(np.random.randint(2))
        num_items = self.data.shape[0]

        while True:
            # randint's upper bound is exclusive, so pass num_items (not
            # num_items - 1); otherwise the last sample can never be drawn.
            idx = np.random.randint(0, num_items)
            img1 = self.data[idx]
            label1 = self.label[idx]
            if bool(label0 == label1) == want_same:
                break

        target = torch.from_numpy(np.array([int(label0 != label1)], dtype=np.float32))
        return torch.tensor(img0), torch.tensor(img1), target


In [None]:
class ContrastiveLoss(nn.Module):
    """Margin-based contrastive loss for pairs of embeddings.

    For a pair at Euclidean distance ``d`` with label ``y`` the per-pair loss
    is ``(1 - y) * d**2 + y * clamp(margin - d, min=0)**2``; the batch loss is
    the mean over pairs. ``y = 0`` therefore pulls a pair together and
    ``y = 1`` pushes it apart until it is at least ``margin`` away.

    Args:
        margin: Distance beyond which a ``y = 1`` pair contributes no loss.
    """

    def __init__(self, margin=2.0):
        super().__init__()
        self.margin = margin

    def forward(self, output1, output2, label):
        """Return the mean contrastive loss of a batch as a scalar tensor.

        Args:
            output1: Embeddings of the first images, shape ``(B, D)``.
            output2: Embeddings of the second images, shape ``(B, D)``.
            label: Pair targets (0 = same class, 1 = different class), given
                either as ``(B, 1)`` or as ``(B,)``.
        """
        euclidean_distance = F.pairwise_distance(output1, output2, keepdim=True)

        # The distance is (B, 1). A (B,) label would silently broadcast the
        # product to (B, B) and mix every label with every pair, so align the
        # label with the distance whenever the element counts agree.
        if (
            torch.is_tensor(label)
            and label.shape != euclidean_distance.shape
            and label.numel() == euclidean_distance.numel()
        ):
            label = label.reshape(euclidean_distance.shape)

        pull_term = (1 - label) * torch.pow(euclidean_distance, 2)
        push_term = label * torch.pow(
            torch.clamp(self.margin - euclidean_distance, min=0.0), 2
        )
        loss_contrastive = torch.mean(pull_term + push_term)

        return loss_contrastive

In [None]:
class Encoder(nn.Module):
    """Convolutional encoder mapping a 3-channel image batch to 128-d vectors.

    Four 3x3 convolutions with stride 2 and padding 1 (3 -> 32 -> 64 -> 128 ->
    256 channels) are followed by two linear layers (4096 -> 256 -> 128).
    ``fc1`` expects ``256 * 4 * 4`` features per sample, which is what the
    convolution stack produces for 64x64 inputs.
    """

    def __init__(self):
        super().__init__()

        conv_kwargs = dict(kernel_size=3, stride=2, padding=1)
        self.conv1 = nn.Conv2d(3, 32, **conv_kwargs)
        self.conv2 = nn.Conv2d(32, 64, **conv_kwargs)
        self.conv3 = nn.Conv2d(64, 128, **conv_kwargs)
        self.conv4 = nn.Conv2d(128, 256, **conv_kwargs)
        self.fc1 = nn.Linear(256*4*4, 256)
        self.fc2 = nn.Linear(256, 128)

    def forward(self, x):
        """Encode ``x`` and return a ReLU-activated ``(batch, 128)`` tensor."""
        features = x
        for conv in (self.conv1, self.conv2, self.conv3):
            features = F.relu(conv(features))

        # The last convolution feeds the linear head without an activation.
        features = self.conv4(features)

        # Flatten everything except the batch axis.
        per_sample = features.nelement() // features.shape[0]
        flattened = features.view(-1, per_sample)

        hidden = F.relu(self.fc1(flattened))
        return F.relu(self.fc2(hidden))

In [None]:
class Network(nn.Module):
    """Siamese wrapper that runs one shared encoder over both inputs.

    Args:
        encoder: Module applied, with the same weights, to each input.
    """

    def __init__(self, encoder):
        super().__init__()
        self.encoder = encoder

    def forward(self, x1, x2):
        """Return the tuple ``(encoder(x1), encoder(x2))``."""
        return self.encoder(x1), self.encoder(x2)

In [None]:
# Number of image pairs per mini-batch (passed to the DataLoader as batch_size).
batchsize = 32

In [None]:
# Load the source-domain images and labels (requires Google Drive to be
# mounted at /content/drive).
data_source = np.load('/content/drive/MyDrive/Source/source_data.npy')
label_source = np.load('/content/drive/MyDrive/Source/source_label.npy')

# Scale the images by their global maximum.
data_source_norm = data_source / data_source.max()

# Convert to float32 and swap axes 1 and 3 so that axis 1 holds the channels.
# NOTE(review): if the arrays are stored as (N, H, W, C) this yields
# (N, C, W, H), i.e. height and width are swapped too - confirm that is intended.
data_source_tensor_initial = torch.tensor(data_source_norm).float()
data_source_tensor = data_source_tensor_initial.transpose(3, 1)

label_source_tensor = torch.tensor(label_source).long()

# Target-domain arrays ('data/target_data.npy', 'data/target_label.npy') would
# be prepared the same way; they are not loaded in this notebook.

train_label = label_source_tensor

# The pair dataset indexes NumPy arrays, so hand the tensors back as arrays.
# No transform is passed.
train_data = CustomDatasetOffice(data_source_tensor.numpy(), train_label.numpy())

dataloader = DataLoader(
    train_data,
    batch_size=batchsize,
    shuffle=True,
    drop_last=True,
)

In [None]:
# Sanity check: shape of the first-image tensor in one batch drawn from the DataLoader.
next(iter(dataloader))[0].shape

torch.Size([32, 3, 64, 64])

In [None]:
# Number of full passes over the training DataLoader.
epochs = 50

In [None]:
# Build the Siamese model around a freshly initialised encoder.
encoder = Encoder()
model = Network(encoder)

criterion = ContrastiveLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

model.to(device)

# One entry per epoch: the mean of that epoch's per-batch losses.
loss_history = []

for epoch in tqdm(range(epochs)):

    batch_losses = []

    for batch in dataloader:

        # Move the two image batches and the pair labels to the training device.
        img0, img1, label = (tensor.to(device) for tensor in batch)

        optimizer.zero_grad()
        output1, output2 = model(img0, img1)
        loss = criterion(output1, output2, label)
        loss.backward()
        optimizer.step()

        batch_losses.append(loss.item())

    loss_history.append(np.mean(batch_losses))

100%|██████████| 50/50 [00:05<00:00,  8.75it/s]


In [None]:
# Display the mean training loss of each epoch, in epoch order.
loss_history

[1.692101891224201,
 1.1597495675086975,
 1.0328848316119268,
 0.9660993218421936,
 1.0023952447451079,
 0.8978409721301153,
 0.8470766452642587,
 0.9910725125899682,
 0.882320564526778,
 0.9516353882276095,
 0.8627420159486624,
 0.7909680880033053,
 0.7555883389252883,
 0.7554859564854548,
 0.7713319063186646,
 0.7060986207081721,
 0.612116699035351,
 0.5164797306060791,
 0.5501061655007876,
 0.5283873677253723,
 0.508529917551921,
 0.4466073971528273,
 0.4909911820521721,
 0.4172105903808887,
 0.39559943400896513,
 0.4012790872500493,
 0.3801728578714224,
 0.3790355519606517,
 0.3521457199866955,
 0.33548829417962295,
 0.29831865200629604,
 0.3392949069921787,
 0.26507878188903516,
 0.2822979104060393,
 0.2342694986325044,
 0.20906077783841354,
 0.24810896584620842,
 0.20072853106718797,
 0.1740279455597584,
 0.14728253334760666,
 0.15751840288822466,
 0.1558479592204094,
 0.1292335385313401,
 0.13675192055793908,
 0.08847684929004082,
 0.09817819325969769,
 0.08351547930103081,
 0.0

In [None]:
# Move the model to the CPU (in place) and write its weights, so the saved
# state dict holds CPU tensors.
torch.save(model.to('cpu').state_dict(), 'network.pt')