## dataset prep

In [1]:
# import pandas as pd
# import shutil
# import os

# # Load the dataset
# df = pd.read_csv('pet_biometric_challenge_2022/train/train_data.csv')

# # Count the number of images for each dog ID
# image_counts = df['dog ID'].value_counts()

# # Filter IDs with 5 or more images
# ids_with_enough_images = image_counts[image_counts >= 8].index

# # Create directories and copy files
# for dog_id in ids_with_enough_images:
#     # Create a directory for the dog ID if it doesn't exist
#     directory_path = f'./dataset/train/{dog_id}'
#     os.makedirs(directory_path, exist_ok=True)
    
#     # Get all images for this dog ID
#     images_to_copy = df[df['dog ID'] == dog_id]['nose print image']
    
#     # Copy each image
#     for image in images_to_copy:
#         src_path = f'pet_biometric_challenge_2022/train/images/{image}'  # Adjust this path
#         dst_path = f'{directory_path}/{image}'
#         shutil.copy(src_path, dst_path)

#     print(f'Copied {len(images_to_copy)} images for dog ID {dog_id}')


In [2]:
# len(os.listdir('dataset/train'))

## Model

In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision.models import resnet152
from torch.utils.data import DataLoader, Dataset
import numpy as np


In [4]:
from torchvision.datasets import ImageFolder
from torchvision import transforms
from PIL import Image
import random

class PairedDogNoseDataset(Dataset):
    def __init__(self, image_folder, transform=None):
        self.dataset = ImageFolder(root=image_folder, transform=transform)
        self.transform = transform

        # Create a dictionary of lists for each class
        self.class_to_images = {}
        for img, label in self.dataset.imgs:
            if label not in self.class_to_images:
                self.class_to_images[label] = []
            self.class_to_images[label].append(img)

    def __getitem__(self, index):
        # Randomly select whether to get a positive or negative pair
        should_get_same_class = random.randint(0, 1) == 0
        
        first_image, label1 = self.dataset.imgs[index]
        if should_get_same_class:
            second_image = random.choice(self.class_to_images[label1])
            label = 1
        else:
            different_class = random.choice(list(set(self.dataset.class_to_idx.values()) - {label1}))
            second_image = random.choice(self.class_to_images[different_class])
            label = 0

        img1 = Image.open(first_image)
        img2 = Image.open(second_image)

        if self.transform is not None:
            img1 = self.transform(img1)
            img2 = self.transform(img2)

        return img1, img2, label

    def __len__(self):
        return len(self.dataset.imgs)


In [5]:
# Define your transformations - you can add more augmentations as necessary
transform = transforms.Compose([
    transforms.Resize((256, 256)),  # Resize images to 256x256
    transforms.ToTensor(),          # Convert images to Tensor
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Normalize with mean and std
])

paired_train_dataset = PairedDogNoseDataset('dataset/train', transform=transform)
paired_train_dataloader = DataLoader(paired_train_dataset, batch_size=8, shuffle=True)


In [6]:
class ArcFaceLoss(nn.Module):
    def __init__(self, s=30.0, m=0.50):
        super(ArcFaceLoss, self).__init__()
        self.s = s
        self.m = m

    def forward(self, cosine, labels):
        # Add margin
        phi = cosine - self.m
        # Apply the softmax on the adjusted scores
        one_hot = torch.zeros(cosine.size(), device=cosine.device)
        one_hot.scatter_(1, labels.view(-1, 1).long(), 1)
        output = (one_hot * phi) + ((1.0 - one_hot) * cosine)  # only adjust the angles for correct class
        output *= self.s

        loss = nn.CrossEntropyLoss()(output, labels)
        return loss


In [7]:
class ContrastiveLoss(nn.Module):
    def __init__(self, margin=2.0):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin

    def forward(self, output1, output2, label):
        euclidean_distance = nn.functional.pairwise_distance(output1, output2)
        loss_contrastive = torch.mean((1 - label) * torch.pow(euclidean_distance, 2) +
                                      label * torch.pow(torch.clamp(self.margin - euclidean_distance, min=0.0), 2))
        return loss_contrastive


In [8]:
class ResNetBackbone(nn.Module):
    def __init__(self):
        super(ResNetBackbone, self).__init__()
        # Load a pre-trained ResNet-152 and remove the last GAP and FC
        base_model = resnet152(pretrained=True)
        self.features = nn.Sequential(*list(base_model.children())[:-2])

        # Additional blocks to reduce channel dimensions
        self.reduce_channels = nn.Sequential(
            nn.Conv2d(2048, 1024, kernel_size=1),
            nn.BatchNorm2d(1024),
            nn.ReLU(),
            nn.Conv2d(1024, 512, kernel_size=1),
            nn.BatchNorm2d(512),
            nn.ReLU()
        )

    def forward(self, x):
        x = self.features(x)
        x = self.reduce_channels(x)
        return x


In [9]:
class AttentionModule(nn.Module):
    def __init__(self):
        super(AttentionModule, self).__init__()
        self.channel_attention = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Conv2d(512, 512, kernel_size=1),
            nn.Sigmoid()
        )
        self.spatial_attention = nn.Sequential(
            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.Sigmoid()
        )

    def forward(self, x):
        # Channel attention
        ca = self.channel_attention(x) * x

        # Spatial attention
        sa = self.spatial_attention(ca) * ca

        return sa


In [10]:
class SiameseNetwork(nn.Module):
    def __init__(self):
        super(SiameseNetwork, self).__init__()
        self.backbone = ResNetBackbone()
        self.attention = AttentionModule()
        self.pooling = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Linear(512, 1024)  # Output 1024-dimensional embedding

    def forward(self, x1, x2):
        out1 = self.backbone(x1)
        out1 = self.attention(out1)
        out1 = self.pooling(out1)
        out1 = out1.view(out1.size(0), -1)
        out1 = self.fc(out1)

        out2 = self.backbone(x2)
        out2 = self.attention(out2)
        out2 = self.pooling(out2)
        out2 = out2.view(out2.size(0), -1)
        out2 = self.fc(out2)

        return out1, out2


In [11]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [12]:
# Assuming model, optimizers, and loss functions are already defined
model = SiameseNetwork().to(device)
optimizer_contrastive = optim.Adam(model.parameters(), lr=0.0001, betas=(0.5, 0.999))
optimizer_arcface = optim.SGD(model.parameters(), lr=0.0001, momentum=0.9, weight_decay=0.0005)

criterion_contrastive = ContrastiveLoss(margin=2.0)
criterion_arcface = ArcFaceLoss(s=30, m=0.5)

# Scheduler for learning rate decay of the ArcFace optimizer
scheduler_arcface = optim.lr_scheduler.LinearLR(optimizer_arcface, start_factor=1.0, end_factor=0, total_iters=200)

# Training loop
for epoch in range(10):  # Total epochs set as 200
    model.train()  # Set the model to training mode
    total_loss = 0
    
    for img1, img2, labels in paired_train_dataloader:
        img1, img2, labels = img1.to(device), img2.to(device), labels.to(device)
        
        # Forward pass: Compute predicted outputs by passing inputs to the model
        output1, output2 = model(img1, img2)
        
        # Compute Contrastive Loss
        loss_contrastive = criterion_contrastive(output1, output2, labels)
        
        # Calculate the mean embedding for ArcFace Loss (simplified version)
        embeddings = (output1 + output2) / 2
        loss_arcface = criterion_arcface(embeddings, labels)
        
        # Combine losses
        total_loss = loss_contrastive + loss_arcface
        print(f'Epoch {epoch+1}, Loss: {total_loss.item():.4f}')
        
        # Zero gradients, perform a backward pass, and update the weights.
        optimizer_contrastive.zero_grad()
        optimizer_arcface.zero_grad()
        total_loss.backward()
        optimizer_contrastive.step()
        optimizer_arcface.step()
    
    scheduler_arcface.step()  # Update the learning rate
    
    print(f'Epoch {epoch+1}/200, Loss: {total_loss.item():.4f}')






Epoch 1, Loss: 26.1961
Epoch 1, Loss: 25.2341
Epoch 1, Loss: 21.9405
Epoch 1, Loss: 19.3104
Epoch 1, Loss: 20.3678
Epoch 1, Loss: 18.0179
Epoch 1, Loss: 16.7515
Epoch 1, Loss: 12.9856
Epoch 1, Loss: 14.7783
Epoch 1, Loss: 23.6225
Epoch 1, Loss: 16.8969
Epoch 1, Loss: 17.0347
Epoch 1, Loss: 20.6166
Epoch 1, Loss: 17.8448
Epoch 1, Loss: 14.6770
Epoch 1, Loss: 16.6490
Epoch 1, Loss: 16.5824
Epoch 1, Loss: 15.5156
Epoch 1, Loss: 11.3931
Epoch 1, Loss: 10.7938
Epoch 1, Loss: 19.6931
Epoch 1, Loss: 16.4525
Epoch 1, Loss: 14.0477
Epoch 1, Loss: 14.0073
Epoch 1, Loss: 16.5781
Epoch 1, Loss: 21.7767
Epoch 1, Loss: 23.6572
Epoch 1, Loss: 19.4430
Epoch 1, Loss: 17.2542
Epoch 1, Loss: 22.2114
Epoch 1, Loss: 15.6215
Epoch 1, Loss: 18.2482
Epoch 1, Loss: 15.4398
Epoch 1, Loss: 15.3676
Epoch 1, Loss: 16.5256
Epoch 1, Loss: 18.0599
Epoch 1, Loss: 17.0484
Epoch 1, Loss: 16.5431
Epoch 1, Loss: 17.5243
Epoch 1, Loss: 15.0646
Epoch 1, Loss: 17.0666
Epoch 1, Loss: 16.3739
Epoch 1, Loss: 12.1146
Epoch 1, Lo

## inference

In [None]:
# Load the trained model
model = SiameseNetwork().cuda()
model.load_state_dict(torch.load('path_to_saved_model.pth'))
model.eval()  # Set the model to evaluation mode


In [30]:
model.eval()  # Set the model to evaluation mode
# Change the device to CPU
device = torch.device('cpu')
model.to(device)
transform_val = transforms.Compose([
    transforms.Resize((256, 256)),  # Resize the image to 256x256
    transforms.ToTensor(),          # Convert the image to a tensor
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Normalize the image
])


In [31]:
def infer_similarity(img_path1, img_path2):
    # Load and transform images
    img1 = Image.open(img_path1).convert('RGB')
    img2 = Image.open(img_path2).convert('RGB')
    img1 = transform(img1).unsqueeze(0).to(device) # Add batch dimension and send to GPU
    img2 = transform(img2).unsqueeze(0).to(device)  # Add batch dimension and send to GPU

    # Perform inference
    with torch.no_grad():  # No need to track gradients
        output1, output2 = model(img1, img2)
        euclidean_distance = torch.nn.functional.pairwise_distance(output1, output2)

    return euclidean_distance.item()


In [32]:
# Paths to the images you want to compare
image_path1 = 'dataset/train/1771/A*d-fSRoB7LOAAAAAAAAAAAAAAAQAAAQ.jpg'
image_path2 = 'dataset/train/1775/A*8dSMQ6vUnmUAAAAAAAAAAAAAAQAAAQ.jpg'

# Perform inference
distance = infer_similarity(image_path1, image_path2)
print(f'The Euclidean distance between the images is: {distance}')


The Euclidean distance between the images is: 1.481567621231079


In [37]:
def find_most_similar(query_img_path, reference_img_paths):
    min_distance = float('inf')
    most_similar_img = None

    for ref_path in reference_img_paths:
        distance = infer_similarity(query_img_path, ref_path)
        if distance < min_distance:
            min_distance = distance
            most_similar_img = ref_path

    return most_similar_img, min_distance

# List of reference image paths
reference_images = ['dataset/train/1781/0Tolu7cpRQurgl7b87DAOwAAACMAARAD.jpg','dataset/train/1790/A*Kf4tQrPJVrgAAAAAAAAAAAAAAQAAAQ.jpg', 'dataset/train/1771/A*jP8SQ4cdHVDm2wJBthDn0AAAAQAAAQ.jpg', 'dataset/train/1775/A*cSJSR4QTq74AAAAAAAAAAAAAAQAAAQ.jpg', 'dataset/train/1780/A*83ofSqE-8DMAAAAAAAAAAAAAAQAAAQ.jpg']
most_similar_image, similarity_score = find_most_similar('dataset/train/1790/A*o2r2TqdLuOoAAAAAAAAAAAAAAQAAAQ.jpg', reference_images)
print(f'Most similar image: {most_similar_image}, Distance: {similarity_score}')


Most similar image: dataset/train/1790/A*Kf4tQrPJVrgAAAAAAAAAAAAAAQAAAQ.jpg, Distance: 2.301255226135254
