### Importing libraries

In [1]:
#Importing image file paths
import os
import glob
import time

#For managing dataframe
import pandas as pd
import numpy as np

#PyTorch
import torchaudio
import torch
import timm
from torch.utils.tensorboard import SummaryWriter
from torch.utils.data import Dataset
from torch import nn
from torchvision import transforms
from fastai.vision.learner import create_body
from torchinfo import summary

from tqdm import tqdm


#Metrics
import seaborn as sb
import matplotlib.pyplot as plt
from torchmetrics.classification import MulticlassF1Score
from torchmetrics.classification import ConfusionMatrix

# Misc.
import warnings
import torchvision
from PIL import Image
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from sklearn.preprocessing import StandardScaler
from tqdm.notebook import tqdm_notebook
from collections import defaultdict

import torch.nn.functional as F
from torchvision import transforms
import glob
import time


warnings.filterwarnings("ignore")

#Selecting device
device="cuda" if torch.cuda.is_available() else "cpu"

print(f"Using {device}")

Using cuda


### Dataset

In [2]:
class Dataset(torch.utils.data.IterableDataset):
    def __init__(self, path, shuffle_pairs=True, augment=False):
        '''
        Create an iterable dataset from a directory containing sub-directories of 
        entities with their images contained inside each sub-directory.

            Parameters:
                    path (str):                 Path to directory containing the dataset.
                    shuffle_pairs (boolean):    Pass True when training, False otherwise. When set to false, the image pair generation will be deterministic
                    augment (boolean):          When True, images will be augmented using a standard set of transformations.

            where b = batch size

            Returns:
                    output (torch.Tensor): shape=[b, 1], Similarity of each pair of images
        '''
        self.path = path

        self.feed_shape = [3, 224, 224]
        self.shuffle_pairs = shuffle_pairs

        self.augment = augment

        if self.augment:
            # If images are to be augmented, add extra operations for it (first two).
            self.transform = transforms.Compose([
                transforms.RandomAffine(degrees=20, translate=(0.2, 0.2), scale=(0.8, 1.2), shear=0.2),
                transforms.RandomHorizontalFlip(p=0.5),
                transforms.ToTensor(),
                transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
                transforms.Resize(self.feed_shape[1:])
            ])
        else:
            # If no augmentation is needed then apply only the normalization and resizing operations.
            self.transform = transforms.Compose([
                transforms.ToTensor(),
                transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
                transforms.Resize(self.feed_shape[1:])
            ])

        self.create_pairs()

    def create_pairs(self):
        '''
        Creates two lists of indices that will form the pairs, to be fed for training or evaluation.
        '''

        self.image_paths = glob.glob(os.path.join(self.path, "*/*.jpg"))
        self.image_classes = []
        self.class_indices = {}

        for image_path in self.image_paths:
            image_class = image_path.split(os.path.sep)[-2]
            self.image_classes.append(image_class)

            if image_class not in self.class_indices:
                self.class_indices[image_class] = []
            self.class_indices[image_class].append(self.image_paths.index(image_path))

        self.indices1 = np.arange(len(self.image_paths))

        if self.shuffle_pairs:
            np.random.seed(int(time.time()))
            np.random.shuffle(self.indices1)
        else:
            # If shuffling is set to off, set the random seed to 1, to make it deterministic.
            np.random.seed(1)

        select_pos_pair = np.random.rand(len(self.image_paths)) < 0.5

        self.indices2 = []

        for i, pos in zip(self.indices1, select_pos_pair):
            class1 = self.image_classes[i]
            if pos:
                class2 = class1
            else:
                class2 = np.random.choice(list(set(self.class_indices.keys()) - {class1}))
            idx2 = np.random.choice(self.class_indices[class2])
            self.indices2.append(idx2)
        self.indices2 = np.array(self.indices2)

    def __iter__(self):
        self.create_pairs()

        for idx, idx2 in zip(self.indices1, self.indices2):

            image_path1 = self.image_paths[idx]
            image_path2 = self.image_paths[idx2]

            class1 = self.image_classes[idx]
            class2 = self.image_classes[idx2]

            image1 = Image.open(image_path1).convert("RGB")
            image2 = Image.open(image_path2).convert("RGB")

            if self.transform:
                image1 = self.transform(image1).float()
                image2 = self.transform(image2).float()

            yield (image1, image2), torch.FloatTensor([class1==class2]), (class1, class2)
        
    def __len__(self):
        return len(self.image_paths)


In [3]:
train_dataset   = Dataset("../DATASET/train/256_objectcategories/256_ObjectCategories/", shuffle_pairs=True, augment=True)
val_dataset     = Dataset("../DATASET/val", shuffle_pairs=False, augment=False)

print(f"For training dataset: {len(train_dataset)}")
print(f"For val dataset: {len(val_dataset)}")

train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=32, drop_last=True)
val_dataloader   = torch.utils.data.DataLoader(val_dataset, batch_size=32)

For training dataset: 30607
For val dataset: 188


### Model

In [4]:
timm.list_models("vit*",pretrained=True)

['vit_base_patch8_224.augreg2_in21k_ft_in1k',
 'vit_base_patch8_224.augreg_in21k',
 'vit_base_patch8_224.augreg_in21k_ft_in1k',
 'vit_base_patch8_224.dino',
 'vit_base_patch14_dinov2.lvd142m',
 'vit_base_patch14_reg4_dinov2.lvd142m',
 'vit_base_patch16_224.augreg2_in21k_ft_in1k',
 'vit_base_patch16_224.augreg_in1k',
 'vit_base_patch16_224.augreg_in21k',
 'vit_base_patch16_224.augreg_in21k_ft_in1k',
 'vit_base_patch16_224.dino',
 'vit_base_patch16_224.mae',
 'vit_base_patch16_224.orig_in21k',
 'vit_base_patch16_224.orig_in21k_ft_in1k',
 'vit_base_patch16_224.sam_in1k',
 'vit_base_patch16_224_miil.in21k',
 'vit_base_patch16_224_miil.in21k_ft_in1k',
 'vit_base_patch16_384.augreg_in1k',
 'vit_base_patch16_384.augreg_in21k_ft_in1k',
 'vit_base_patch16_384.orig_in21k_ft_in1k',
 'vit_base_patch16_clip_224.datacompxl',
 'vit_base_patch16_clip_224.dfn2b',
 'vit_base_patch16_clip_224.laion2b',
 'vit_base_patch16_clip_224.laion2b_ft_in1k',
 'vit_base_patch16_clip_224.laion2b_ft_in12k',
 'vit_base

In [5]:
class embeddings(nn.Module):
    def __init__(self, backbone="resnet18", input_size=(1,3,224,224)):
        super().__init__()

        
        # Create a backbone network from the pretrained models provided in torchvision.models 
        if backbone in torchvision.models.__dict__:
            self.backbone=create_body(torchvision.models.__dict__[backbone](pretrained=True, progress=True),pretrained=True, n_in=3, cut=-2)
        elif backbone in timm.list_models(pretrained=True):
            self.backbone=create_body(timm.create_model(backbone,pretrained=True),pretrained=True, n_in=3,cut=-2)
        else:
            raise Exception("No model named {} exists in torchvision.models or timm models.".format(backbone))
                  
        
        self.flatten=nn.Flatten()

        self.encoder=nn.Sequential(
            self.backbone,
            self.flatten
        )

        self.flattened_features=self.encoder(torch.rand(input_size)).shape[1]
 
        self.last_block=nn.Sequential(
            nn.Linear(self.flattened_features,1024),
            nn.ReLU(inplace=True)
            #nn.BatchNorm1d(1024)
        )
        
        self.fc = nn.Sequential(
            nn.Linear(2048, 1),
            nn.Sigmoid()
        )
        
    def forward(self,x):
        return(self.last_block(self.encoder(x)))

    def comparator(self,input1,target):
        
        output = torch.cat((input1, target), dim=1)
        output = self.fc(output)
        return output

### Contrastive Loss

In [6]:
class ContrastiveLoss(torch.nn.Module):

    def __init__(self, margin=1.0):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin

    def forward(self, input1, input2, y):
        diff = input1 - input2
        dist_sq = torch.sum(torch.pow(diff, 2), 1)
        dist = torch.sqrt(dist_sq)
        mdist = self.margin - dist
        dist = torch.clamp(mdist, min=0.0)
        loss = y * dist_sq + (1 - y) * torch.pow(dist, 2)
        loss = torch.sum(loss) / 2.0 / input1.size()[0]
        return loss

### Model and Optimizer

In [32]:
model = embeddings()
model.to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = ContrastiveLoss()

In [33]:
count=0
for layer in model.parameters():
    count+=1

In [34]:
print(count)

64


In [35]:
summary(model)

Layer (type:depth-idx)                             Param #
embeddings                                         --
├─Sequential: 1-1                                  --
│    └─Conv2d: 2-1                                 9,408
│    └─BatchNorm2d: 2-2                            128
│    └─ReLU: 2-3                                   --
│    └─MaxPool2d: 2-4                              --
│    └─Sequential: 2-5                             --
│    │    └─BasicBlock: 3-1                        73,984
│    │    └─BasicBlock: 3-2                        73,984
│    └─Sequential: 2-6                             --
│    │    └─BasicBlock: 3-3                        230,144
│    │    └─BasicBlock: 3-4                        295,424
│    └─Sequential: 2-7                             --
│    │    └─BasicBlock: 3-5                        919,040
│    │    └─BasicBlock: 3-6                        1,180,672
│    └─Sequential: 2-8                             --
│    │    └─BasicBlock: 3-7                

In [36]:
count=0
for layer in model.parameters():
    if(count==61):
        break
    layer.requires_grad=False
    count+=1

In [37]:
summary(model)

Layer (type:depth-idx)                             Param #
embeddings                                         --
├─Sequential: 1-1                                  --
│    └─Conv2d: 2-1                                 (9,408)
│    └─BatchNorm2d: 2-2                            (128)
│    └─ReLU: 2-3                                   --
│    └─MaxPool2d: 2-4                              --
│    └─Sequential: 2-5                             --
│    │    └─BasicBlock: 3-1                        (73,984)
│    │    └─BasicBlock: 3-2                        (73,984)
│    └─Sequential: 2-6                             --
│    │    └─BasicBlock: 3-3                        (230,144)
│    │    └─BasicBlock: 3-4                        (295,424)
│    └─Sequential: 2-7                             --
│    │    └─BasicBlock: 3-5                        (919,040)
│    │    └─BasicBlock: 3-6                        (1,180,672)
│    └─Sequential: 2-8                             --
│    │    └─BasicBlock: 3-7

### Training Loop

In [38]:
len(train_dataset)

30607

In [39]:
for epoch in tqdm_notebook(range(50)):
        print("[{} / {}]".format(epoch, 50))
        model.train()

        losses = []
        
        correct = 0
        total = 0


        # Training Loop Start
        for (img1, img2), y, (class1, class2) in tqdm_notebook(train_dataloader,desc="Trianing"):
    
            img1, img2, y = map(lambda x: x.to(device), [img1, img2, y])

            emb1 = model(img1)
            emb2 = model(img2)
            loss = criterion(emb1, emb2, y)
    
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
    
            losses.append(loss.item())
            total += len(y)
    
    
        print("\tTraining: Loss={:.2f}".format(sum(losses)/len(losses)))
        # Training Loop End

        # Evaluation Loop Start
        model.eval()

        losses = []
        correct = 0
        total = 0
    
        for (img1, img2), y, (class1, class2) in tqdm_notebook(val_dataloader, desc="Validation"):
            try:
                img1, img2, y = map(lambda x: x.to(device), [img1, img2, y])
    
                emb1 = model(img1)
                ebm2 = model(img2)
                #print(emb1.shape,emb2.shape)
                loss = criterion(emb1, emb2, y)
    
                losses.append(loss.item())
                total += len(y)
            except Exception as e:
                #print(e)
                continue

        val_loss = sum(losses)/max(1, len(losses))
        
        print("\tValidation: Loss={:.2f}".format(val_loss))
        # Evaluation Loop End


  0%|          | 0/50 [00:00<?, ?it/s]

[0 / 50]


Trianing:   0%|          | 0/956 [00:00<?, ?it/s]

	Training: Loss=2250.17


Validation:   0%|          | 0/6 [00:00<?, ?it/s]

	Validation: Loss=1226.04
[1 / 50]


Trianing:   0%|          | 0/956 [00:00<?, ?it/s]

	Training: Loss=647.15


Validation:   0%|          | 0/6 [00:00<?, ?it/s]

	Validation: Loss=361.09
[2 / 50]


Trianing:   0%|          | 0/956 [00:00<?, ?it/s]

	Training: Loss=215.86


Validation:   0%|          | 0/6 [00:00<?, ?it/s]

	Validation: Loss=155.65
[3 / 50]


Trianing:   0%|          | 0/956 [00:00<?, ?it/s]

	Training: Loss=91.08


Validation:   0%|          | 0/6 [00:00<?, ?it/s]

	Validation: Loss=75.72
[4 / 50]


Trianing:   0%|          | 0/956 [00:00<?, ?it/s]

	Training: Loss=44.48


Validation:   0%|          | 0/6 [00:00<?, ?it/s]

	Validation: Loss=40.47
[5 / 50]


Trianing:   0%|          | 0/956 [00:00<?, ?it/s]

	Training: Loss=24.23


Validation:   0%|          | 0/6 [00:00<?, ?it/s]

	Validation: Loss=24.06
[6 / 50]


Trianing:   0%|          | 0/956 [00:00<?, ?it/s]

	Training: Loss=14.70


Validation:   0%|          | 0/6 [00:00<?, ?it/s]

	Validation: Loss=15.20
[7 / 50]


Trianing:   0%|          | 0/956 [00:00<?, ?it/s]

	Training: Loss=10.26


Validation:   0%|          | 0/6 [00:00<?, ?it/s]

	Validation: Loss=11.96
[8 / 50]


Trianing:   0%|          | 0/956 [00:00<?, ?it/s]

	Training: Loss=8.23


Validation:   0%|          | 0/6 [00:00<?, ?it/s]

	Validation: Loss=10.74
[9 / 50]


Trianing:   0%|          | 0/956 [00:00<?, ?it/s]

	Training: Loss=7.27


Validation:   0%|          | 0/6 [00:00<?, ?it/s]

	Validation: Loss=8.33
[10 / 50]


Trianing:   0%|          | 0/956 [00:00<?, ?it/s]

	Training: Loss=6.88


Validation:   0%|          | 0/6 [00:00<?, ?it/s]

	Validation: Loss=8.69
[11 / 50]


Trianing:   0%|          | 0/956 [00:00<?, ?it/s]

	Training: Loss=6.71


Validation:   0%|          | 0/6 [00:00<?, ?it/s]

	Validation: Loss=7.73
[12 / 50]


Trianing:   0%|          | 0/956 [00:00<?, ?it/s]

	Training: Loss=6.61


Validation:   0%|          | 0/6 [00:00<?, ?it/s]

	Validation: Loss=7.36
[13 / 50]


Trianing:   0%|          | 0/956 [00:00<?, ?it/s]

	Training: Loss=6.55


Validation:   0%|          | 0/6 [00:00<?, ?it/s]

	Validation: Loss=8.03
[14 / 50]


Trianing:   0%|          | 0/956 [00:00<?, ?it/s]

	Training: Loss=6.48


Validation:   0%|          | 0/6 [00:00<?, ?it/s]

	Validation: Loss=7.59
[15 / 50]


Trianing:   0%|          | 0/956 [00:00<?, ?it/s]

	Training: Loss=6.44


Validation:   0%|          | 0/6 [00:00<?, ?it/s]

	Validation: Loss=7.27
[16 / 50]


Trianing:   0%|          | 0/956 [00:00<?, ?it/s]

	Training: Loss=6.43


Validation:   0%|          | 0/6 [00:00<?, ?it/s]

	Validation: Loss=7.27
[17 / 50]


Trianing:   0%|          | 0/956 [00:00<?, ?it/s]

KeyboardInterrupt: 

In [40]:
torch.save(model.state_dict(),"weights.pt")

### Testing the model

In [41]:
def test_model(image_path1,image_path2):
    img1=Image.open(image_path1)
    img2=Image.open(image_path2)

    feed_shape = [3, 224, 224]
    
    transform = transforms.Compose([
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        transforms.Resize(feed_shape[1:])
    ])

    convert=transforms.PILToTensor()

    img1=transform(convert(img1).type(torch.float32)).to(device)
    img2=transform(convert(img2).type(torch.float32)).to(device)

    img1=img1.unsqueeze(dim=0)
    img2=img2.unsqueeze(dim=0)

    print(img1.shape,img2.shape)
    
    input1=model(img1)
    input2=model(img2)

    diff = input1 - input2
    dist_sq = torch.sum(torch.pow(diff, 2), 1)
    dist = torch.sqrt(dist_sq)
    
    print(dist)

In [42]:
test_model("../DATASET/val/WALLET 2/WhatsApp Image 2024-04-07 at 16.11.40_2396d0c2.jpg","../DATASET/val/WALLET 2/WhatsApp Image 2024-04-07 at 16.11.39_f9f764e9.jpg")

torch.Size([1, 3, 224, 224]) torch.Size([1, 3, 224, 224])
tensor([0.], device='cuda:0', grad_fn=<SqrtBackward0>)


In [43]:
test_model("../DATASET/val/WALLET 2/WhatsApp Image 2024-04-07 at 16.11.40_2396d0c2.jpg","../DATASET/val/PEN4/WhatsApp Image 2024-04-08 at 00.12.30_41c732d4.jpg")

torch.Size([1, 3, 224, 224]) torch.Size([1, 3, 224, 224])
tensor([0.1969], device='cuda:0', grad_fn=<SqrtBackward0>)


In [44]:
test_model("../DATASET/val/WALLET 2/WhatsApp Image 2024-04-07 at 16.11.40_2396d0c2.jpg","../DATASET/val/LAPTOP CHARGER/WhatsApp Image 2024-04-07 at 16.35.44_ec9ce85c.jpg")

torch.Size([1, 3, 224, 224]) torch.Size([1, 3, 224, 224])
tensor([0.], device='cuda:0', grad_fn=<SqrtBackward0>)


In [46]:
test_model("../DATASET/val/WALLET 2/WhatsApp Image 2024-04-07 at 16.11.40_2396d0c2.jpg","../DATASET/val/WALLET/WhatsApp Image 2024-04-08 at 00.08.28_97faa39b.jpg")

torch.Size([1, 3, 224, 224]) torch.Size([1, 3, 224, 224])
tensor([0.0727], device='cuda:0', grad_fn=<SqrtBackward0>)


In [47]:
test_model("../DATASET/val/METAL BOTTLE/WhatsApp Image 2024-04-07 at 16.11.53_a66a4d7b.jpg","../DATASET/val/METAL BOTTLE/WhatsApp Image 2024-04-08 at 00.12.47_6dd4f8b8.jpg")

torch.Size([1, 3, 224, 224]) torch.Size([1, 3, 224, 224])
tensor([0.], device='cuda:0', grad_fn=<SqrtBackward0>)


In [48]:
test_model("../DATASET/val/METAL BOTTLE/WhatsApp Image 2024-04-07 at 16.11.53_a66a4d7b.jpg","../DATASET/val/PLASTIC BOTTLE/WhatsApp Image 2024-04-07 at 16.11.52_6516f3f0.jpg")

torch.Size([1, 3, 224, 224]) torch.Size([1, 3, 224, 224])
tensor([0.], device='cuda:0', grad_fn=<SqrtBackward0>)


In [49]:
test_model("../DATASET/val/LAPTOP CHARGER/WhatsApp Image 2024-04-07 at 16.35.43_6673c8d1.jpg","../DATASET/val/PLASTIC BOTTLE/WhatsApp Image 2024-04-07 at 16.11.52_6516f3f0.jpg")

torch.Size([1, 3, 224, 224]) torch.Size([1, 3, 224, 224])
tensor([0.3175], device='cuda:0', grad_fn=<SqrtBackward0>)


In [50]:
test_model("../DATASET/val/LAPTOP CHARGER/WhatsApp Image 2024-04-07 at 16.35.43_6673c8d1.jpg","../DATASET/val/HEADPHONES/WhatsApp Image 2024-04-07 at 16.11.04_428e33f8.jpg")

torch.Size([1, 3, 224, 224]) torch.Size([1, 3, 224, 224])
tensor([0.3175], device='cuda:0', grad_fn=<SqrtBackward0>)


In [51]:
test_model("../DATASET/val/LAPTOP CHARGER/WhatsApp Image 2024-04-07 at 16.35.43_6673c8d1.jpg","../DATASET/val/LAPTOP CHARGER/WhatsApp Image 2024-04-07 at 16.35.43_6673c8d1.jpg")

torch.Size([1, 3, 224, 224]) torch.Size([1, 3, 224, 224])
tensor([0.], device='cuda:0', grad_fn=<SqrtBackward0>)


In [52]:
test_model("../DATASET/val/LAPTOP CHARGER/WhatsApp Image 2024-04-07 at 16.35.43_6673c8d1.jpg","../DATASET/val/MOBILE CHARGER/WhatsApp Image 2024-04-07 at 16.11.48_60059a4a.jpg")

torch.Size([1, 3, 224, 224]) torch.Size([1, 3, 224, 224])
tensor([0.3175], device='cuda:0', grad_fn=<SqrtBackward0>)
