# Imports

In [2]:
import os
import cv2
import random
import numpy as np
import pandas as pd
from PIL import Image
import matplotlib.pyplot as plt
from itertools import combinations
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
from torch.utils.data import Dataset, DataLoader
import torchvision.models as models
from torchvision import datasets, transforms
from torchvision.datasets import ImageFolder
import albumentations as A
from albumentations.pytorch import ToTensorV2
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# Model Architecture

In [3]:
class DropPath(nn.Module):
    def __init__(self, drop_prob=0.):
        super().__init__()
        self.drop_prob = drop_prob

    def forward(self, x):
        if self.drop_prob == 0. or not self.training:
            return x
        keep_prob = 1 - self.drop_prob
        shape = (x.shape[0],) + (1,) * (x.ndim - 1)
        random_tensor = keep_prob + torch.rand(shape, dtype=x.dtype, device=x.device)
        random_tensor.floor_()
        return x.div(keep_prob) * random_tensor

class WindowAttention(nn.Module):
    def __init__(self, dim, heads=4):
        super().__init__()
        self.heads = heads
        self.scale = (dim // heads) ** -0.5
        self.to_qkv = nn.Linear(dim, dim * 3, bias=False)
        self.to_out = nn.Linear(dim, dim)

    def forward(self, x):
        B, N, C = x.shape
        qkv = self.to_qkv(x).chunk(3, dim=-1)
        q, k, v = map(
            lambda t: t.view(B, N, self.heads, C // self.heads).transpose(1, 2),
            qkv
        )
        dots = torch.matmul(q, k.transpose(-2, -1)) * self.scale
        attn = dots.softmax(dim=-1)
        out = torch.matmul(attn, v)
        out = out.transpose(1, 2).reshape(B, N, C)
        return self.to_out(out)

class MSFF_WinAttn_MobileNet_Embedding(nn.Module):
    def __init__(self, embedding_dim=128, drop_path_prob=0.1):
        super().__init__()
        mobilenet = models.mobilenet_v2(pretrained=True).features

        self.stage1 = mobilenet[:4]
        self.stage2 = mobilenet[4:7]
        self.stage3 = mobilenet[7:14]
        self.stage4 = mobilenet[14:]

        self.reduce1 = nn.Conv2d(24, 256, 1)
        self.reduce2 = nn.Conv2d(32, 256, 1)
        self.reduce3 = nn.Conv2d(96, 256, 1)
        self.reduce4 = nn.Conv2d(1280, 256, 1)

        self.attn1 = WindowAttention(256, heads=4)
        self.bn1 = nn.BatchNorm1d(256)
        self.drop_path1 = DropPath(drop_path_prob)

        self.attn2 = WindowAttention(256, heads=4)
        self.bn2 = nn.BatchNorm1d(256)
        self.drop_path2 = DropPath(drop_path_prob)

        self.embed_fc = nn.Linear(256 * 4, embedding_dim)

        self.dropout = nn.Dropout(0.3)
        self.bn_final = nn.BatchNorm1d(256 * 4)


    def forward(self, x):
        x1 = self.stage1(x)
        x2 = self.stage2(x1)
        x3 = self.stage3(x2)
        x4 = self.stage4(x3)

        x1 = self.dropout(F.adaptive_avg_pool2d(self.reduce1(x1), 1).flatten(1))
        x2 = self.dropout(F.adaptive_avg_pool2d(self.reduce2(x2), 1).flatten(1))
        x3 = self.dropout(F.adaptive_avg_pool2d(self.reduce3(x3), 1).flatten(1))
        x4 = self.dropout(F.adaptive_avg_pool2d(self.reduce4(x4), 1).flatten(1))


        feats = torch.stack([x1, x2, x3, x4], dim=1)
        feats = self.attn1(feats)
        feats = self.drop_path1(feats)
        B, N, C = feats.shape
        feats = feats.view(B * N, C)
        feats = self.bn1(feats)
        feats = F.relu(feats)
        feats = feats.view(B, N, C)

        feats = self.attn2(feats)
        feats = self.drop_path2(feats)
        feats = feats.view(B * N, C)
        feats = self.bn2(feats)
        feats = F.relu(feats)
        feats = feats.view(B, N, C)

        out = feats.flatten(1)
        # embed = self.embed_fc(out)
        out = self.dropout(out)
        out = F.relu(out)
        out = self.bn_final(out)  # <-- Add in __init__
        embed = self.embed_fc(out)

        embed = F.normalize(embed, p=2, dim=1)
        return embed

In [4]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = MSFF_WinAttn_MobileNet_Embedding(embedding_dim=128).to(device)

Downloading: "https://download.pytorch.org/models/mobilenet_v2-b0353104.pth" to /root/.cache/torch/hub/checkpoints/mobilenet_v2-b0353104.pth
100%|██████████| 13.6M/13.6M [00:00<00:00, 42.7MB/s]


# Testing

In [5]:
def load_images_from_folder(folder, image_size):
    images = []
    for filename in os.listdir(folder):
        img_path = os.path.join(folder, filename)
        if filename.lower().endswith((".jpg", ".jpeg", ".png")):  # Ensure only images
            img = cv2.imread(img_path)
            if img is not None:
                img = cv2.resize(img, image_size)
                images.append(img)
    return images

def create_pairs_with_distortions_balanced(base_path, save_dir, image_size=(128, 128)):
    os.makedirs(save_dir, exist_ok=True)
    os.makedirs(os.path.join(save_dir, "x1"), exist_ok=True)
    os.makedirs(os.path.join(save_dir, "x2"), exist_ok=True)

    people = [p for p in os.listdir(base_path) if os.path.isdir(os.path.join(base_path, p))]
    pair_records = []
    pair_id = 0
    positive_pairs = []

    print("Generating positive pairs...")

    # Collect all positive pairs
    person_to_images = {}
    for person in tqdm(people):
        person_path = os.path.join(base_path, person)
        normal_images = load_images_from_folder(person_path, image_size)
        distorted_path = os.path.join(person_path, "distorted")
        distorted_images = []
        if os.path.exists(distorted_path):
            distorted_images = load_images_from_folder(distorted_path, image_size)

        all_images = normal_images + distorted_images
        person_to_images[person] = all_images

        for img1, img2 in combinations(all_images, 2):
            positive_pairs.append((img1, img2))

    num_positive = len(positive_pairs)
    print(f"Total positive pairs: {num_positive}")

    #Save positive pairs
    for img1, img2 in positive_pairs:
        x1_path = os.path.join("x1", f"pair_{pair_id}.jpg")
        x2_path = os.path.join("x2", f"pair_{pair_id}.jpg")
        cv2.imwrite(os.path.join(save_dir, x1_path), img1)
        cv2.imwrite(os.path.join(save_dir, x2_path), img2)
        pair_records.append([x1_path, x2_path, 1])
        pair_id += 1

    # Generate negative pairs equal in number to positive pairs
    print("Generating balanced negative pairs...")
    negative_pairs = []
    attempts = 0
    max_attempts = num_positive * 10  # safety cap

    while len(negative_pairs) < num_positive and attempts < max_attempts:
        person1, person2 = random.sample(people, 2)
        imgs1 = person_to_images[person1]
        imgs2 = person_to_images[person2]

        if not imgs1 or not imgs2:
            attempts += 1
            continue

        img1 = random.choice(imgs1)
        img2 = random.choice(imgs2)

        negative_pairs.append((img1, img2))
        attempts += 1


    for img1, img2 in negative_pairs:
        x1_path = os.path.join("x1", f"pair_{pair_id}.jpg")
        x2_path = os.path.join("x2", f"pair_{pair_id}.jpg")
        cv2.imwrite(os.path.join(save_dir, x1_path), img1)
        cv2.imwrite(os.path.join(save_dir, x2_path), img2)
        pair_records.append([x1_path, x2_path, 0])
        pair_id += 1


    df = pd.DataFrame(pair_records, columns=["img1", "img2", "label"])
    df.to_csv(os.path.join(save_dir, "pairs_labels.csv"), index=False)
    print(f"Total pairs saved: {len(df)} (Positive: {df['label'].sum()}, Negative: {(df['label']==0).sum()})")


In [6]:
def test_model(model, model_path, test_folder, device='cuda' if torch.cuda.is_available() else 'cpu', batch_size=32):
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.to(device)
    model.eval()

    create_pairs_with_distortions_balanced(test_folder,"./test_pairs")
    test_dataset = FacePairsDataset("./test_pairs/pairs_labels.csv","./test_pairs",image_size=(128, 128),augment=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=2)

    all_preds = []
    all_labels = []
    val_loss = 0.0
    test_batches = tqdm(test_loader, desc="test", leave=False)

    with torch.no_grad():
        for img1, img2, labels in test_batches:
            img1 = img1.cuda()
            img2 = img2.cuda()
            labels = labels.cuda()

            emb1 = model(img1)
            emb2 = model(img2)

            loss = criterion(emb1, emb2, labels)
            val_loss += loss.item()

            dists = F.pairwise_distance(emb1, emb2)
            preds = (dists < threshold).long()

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    acc = accuracy_score(all_labels, all_preds)
    prec = precision_score(all_labels, all_preds, average='macro', zero_division=0)
    rec = recall_score(all_labels, all_preds, average='macro', zero_division=0)
    f1 = f1_score(all_labels, all_preds, average='macro', zero_division=0)

    print(f"\n Test Accuracy: {acc:.4f}")
    print(f" Precision:     {prec:.4f}")
    print(f" Recall:        {rec:.4f}")
    print(f" F1 Score:      {f1:.4f}")

    return acc, prec, rec, f1

In [7]:
import albumentations as A
from albumentations.pytorch import ToTensorV2
from torchvision import transforms

class FacePairsDataset(Dataset):
    def __init__(self, csv_path, base_dir, image_size=(128,128), augment=False):
        self.df = pd.read_csv(csv_path)
        self.base_dir = base_dir
        self.image_size = image_size
        self.augment = augment

        self.transform = A.Compose([
            A.Resize(*image_size),
            A.HorizontalFlip(p=0.5),
            A.Rotate(limit=15, p=0.5),
            A.Normalize(),
            ToTensorV2()
                ]) if augment else A.Compose([
                    A.Resize(*image_size),
                    A.Normalize(),
                    ToTensorV2()
                ])

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        img1_path = os.path.join(self.base_dir, row['img1'])
        img2_path = os.path.join(self.base_dir, row['img2'])

        img1 = cv2.imread(img1_path)
        img2 = cv2.imread(img2_path)

        img1 = cv2.cvtColor(img1, cv2.COLOR_BGR2RGB)
        img2 = cv2.cvtColor(img2, cv2.COLOR_BGR2RGB)

        img1 = self.transform(image=img1)['image']
        img2 = self.transform(image=img2)['image']

        label = torch.tensor(row['label'], dtype=torch.float32)

        return img1, img2, label

In [8]:
class ContrastiveLoss(nn.Module):
    def __init__(self, margin=1.0):
        super().__init__()
        self.margin = margin

    def forward(self, emb1, emb2, label):
        dist = F.pairwise_distance(emb1, emb2)
        loss_same = label * dist.pow(2)
        loss_diff = (1 - label) * F.relu(self.margin - dist).pow(2)
        return 0.5 * (loss_same + loss_diff).mean()

In [9]:
criterion = ContrastiveLoss(margin=1.0)


In [10]:
threshold = 0.5  # Distance threshold

In [11]:
test_folder = "/content/extracted_folder/Comys_Hackathon5/Task_B/train"
model_path = "/content/best_model_Task_B.pth"

acc, prec, rec, f1 = test_model(model, model_path, test_folder)

Generating positive pairs...


100%|██████████| 877/877 [00:06<00:00, 138.41it/s]


Total positive pairs: 34016
Generating balanced negative pairs...
Total pairs saved: 68032 (Positive: 34016, Negative: 34016)





 Test Accuracy: 0.9756
 Precision:     0.9765
 Recall:        0.9756
 F1 Score:      0.9756


In [None]:
test_folder = "/content/extracted_folder/Comys_Hackathon5/Task_B/val"
model_path = "/content/best_model_Task_B.pth"

acc, prec, rec, f1 = test_model(model, model_path, test_folder)

Generating positive pairs...


100%|██████████| 250/250 [00:02<00:00, 90.90it/s]


Total positive pairs: 619
Generating balanced negative pairs...
Total pairs saved: 1238 (Positive: 619, Negative: 619)


                                                     


 Test Accuracy: 0.7593
 Precision:     0.7850
 Recall:        0.7593
 F1 Score:      0.7537


