In [1]:
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms, models
from torch.utils.data import DataLoader, Dataset, random_split
from tqdm import tqdm
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt

In [None]:
# ---- Reproducibility ------------------------------------------------------
# Seed the global RNGs once, up front, so that a Restart & Run All yields the
# same random train/val partition and the same initial head weights. (GPU
# kernels may still introduce small run-to-run numeric differences.)
SEED         = 42
torch.manual_seed(SEED)
np.random.seed(SEED)

# ---- Runtime / optimisation hyper-parameters ------------------------------
DEVICE       = torch.device("cuda" if torch.cuda.is_available() else "cpu")
BATCH_SIZE   = 64
EPOCHS       = 20        # total epochs across both training phases
LR_HEAD      = 1e-3      # learning rate while only the heads are trained
LR_FULL      = 3e-5      # learning rate for full fine-tuning
WEIGHT_DECAY = 1e-4
EMBEDDING_DIM = 512      # size of the shared face/fingerprint embedding
DROPOUT      = 0.4       # dropout probability used in the encoder heads
NUM_WORKERS  = 2         # DataLoader worker processes
VAL_SPLIT    = 0.2       # fraction of pairs held out for validation
TEMPERATURE  = 0.07      # softmax temperature of the contrastive loss
GRAD_CLIP    = 1.0       # max gradient norm used for clipping

# ---- Data locations -------------------------------------------------------
FACE_DIR        = "/kaggle/input/face-fingerprint-dataset/face"
FINGERPRINT_DIR = "/kaggle/input/face-fingerprint-dataset/fingerprint"

print(f"Using device: {DEVICE}")
print(f"Face samples:        {len(os.listdir(FACE_DIR))}")
print(f"Fingerprint samples: {len(os.listdir(FINGERPRINT_DIR))}")

In [None]:
class PairedFaceFingerDataset(Dataset):
    """
    Pairs face and fingerprint images by sorted filename index.

    Assumes filenames encode subject identity so that sorting aligns
    face[i] with fingerprint[i]. If the two directories hold a different
    number of entries, both sorted listings are truncated to the shorter
    length; no check is made that the remaining pairs belong together.

    If you have a CSV with (face_file, fingerprint_file, label)
    columns, replace __init__ accordingly.

    Args:
        face_dir: Directory containing the face images.
        fp_dir: Directory containing the fingerprint images.
        transform: Optional callable applied to each loaded image. It is
            called separately for the face and for the fingerprint.

    Each item is ``(face_img, fp_img, label)``, where ``label`` is a scalar
    ``torch.long`` tensor parsed from the face filename.
    """
    def __init__(self, face_dir, fp_dir, transform=None):
        self.face_dir  = face_dir
        self.fp_dir    = fp_dir
        self.transform = transform

        face_files = sorted(os.listdir(face_dir))
        fp_files   = sorted(os.listdir(fp_dir))

        # Keep only as many entries as both modalities can provide.
        n = min(len(face_files), len(fp_files))
        self.face_files = face_files[:n]
        self.fp_files   = fp_files[:n]

        # Label = the digits found in the part of the file stem that precedes
        # the first underscore; falls back to 0 when there are no digits.
        self.labels = []
        for fname in self.face_files:
            stem = os.path.splitext(fname)[0]
            digits = ''.join(filter(str.isdigit, stem.split('_')[0]))
            self.labels.append(int(digits) if digits else 0)

    def __len__(self):
        return len(self.face_files)

    @staticmethod
    def _load_rgb(path):
        """Load ``path`` as an RGB image and close the underlying file."""
        # Image.open is lazy and keeps the file open; convert() forces the
        # pixel data to be read and returns an independent image, so the
        # handle can be released as soon as the with-block exits.
        with Image.open(path) as img:
            return img.convert("RGB")

    def __getitem__(self, idx):
        face_img = self._load_rgb(
            os.path.join(self.face_dir, self.face_files[idx])
        )
        fp_img = self._load_rgb(
            os.path.join(self.fp_dir, self.fp_files[idx])
        )
        label = self.labels[idx]

        if self.transform:
            face_img = self.transform(face_img)
            fp_img   = self.transform(fp_img)

        return face_img, fp_img, torch.tensor(label, dtype=torch.long)


In [None]:
# Per-channel mean / std (the usual ImageNet statistics) shared by both pipelines.
_NORM_MEAN = [0.485, 0.456, 0.406]
_NORM_STD  = [0.229, 0.224, 0.225]

# Training pipeline: resize, then random crop / flip / rotation / colour
# jitter before converting to a normalised tensor.
_train_steps = [
    transforms.Resize((300, 300)),
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(15),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    transforms.ToTensor(),
    transforms.Normalize(mean=_NORM_MEAN, std=_NORM_STD),
]
train_transforms = transforms.Compose(_train_steps)

# Validation pipeline: deterministic resize straight to the 224x224 input
# size, no augmentation.
_val_steps = [
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=_NORM_MEAN, std=_NORM_STD),
]
val_transforms = transforms.Compose(_val_steps)


In [None]:
# Two views over the same files: one applying the augmenting training
# transforms and one applying the deterministic validation transforms.
# Both list the same directories in sorted order, so index i refers to the
# same (face, fingerprint) pair in each view.
full_dataset = PairedFaceFingerDataset(
    face_dir=FACE_DIR,
    fp_dir=FINGERPRINT_DIR,
    transform=train_transforms
)
eval_dataset = PairedFaceFingerDataset(
    face_dir=FACE_DIR,
    fp_dir=FINGERPRINT_DIR,
    transform=val_transforms
)

val_size   = int(len(full_dataset) * VAL_SPLIT)
train_size = len(full_dataset) - val_size

# Split the indices once. Both subsets returned by random_split wrap the SAME
# underlying dataset object, so mutating `subset.dataset.transform` would
# change the transform for training as well. Instead, re-point the validation
# indices at the separate validation-transform view.
train_dataset, _val_split = random_split(full_dataset, [train_size, val_size])
val_dataset = torch.utils.data.Subset(eval_dataset, _val_split.indices)

# NOTE(review): the split is per pair, not per subject. If a subject can have
# several pairs, the same identity may appear in both train and val — confirm.

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE,
                          shuffle=True,  num_workers=NUM_WORKERS, pin_memory=True)
val_loader   = DataLoader(val_dataset,   batch_size=BATCH_SIZE,
                          shuffle=False, num_workers=NUM_WORKERS, pin_memory=True)

print(f"Train pairs: {train_size} | Val pairs: {val_size}")


In [None]:
class SingleEncoder(nn.Module):
    """
    Image encoder: EfficientNet-B3 with its classifier swapped for a
    projection head, producing unit-length (L2-normalised) embeddings.

    Args:
        embedding_dim: Width of the output embedding.
        dropout: Dropout probability before the first head layer; half of
            this value is used before the second one.
    """
    def __init__(self, embedding_dim: int, dropout: float):
        super().__init__()
        net = models.efficientnet_b3(weights="IMAGENET1K_V1")
        head_in = net.classifier[1].in_features

        # Phase 1 trains the head only, so the convolutional trunk starts frozen.
        net.features.requires_grad_(False)

        head_layers = [
            nn.Dropout(dropout),
            nn.Linear(head_in, 1024),
            nn.ReLU(),
            nn.Dropout(dropout / 2),
            nn.Linear(1024, embedding_dim),
            nn.BatchNorm1d(embedding_dim),
        ]
        net.classifier = nn.Sequential(*head_layers)
        self.backbone = net

    def forward(self, x):
        """Return the backbone output for ``x``, L2-normalised along dim 1."""
        projected = self.backbone(x)
        return F.normalize(projected, dim=1)

    def unfreeze_backbone(self):
        """Make every parameter of the feature extractor trainable again."""
        self.backbone.features.requires_grad_(True)


class DualEncoderModel(nn.Module):
    """
    Holds one ``SingleEncoder`` per modality (face and fingerprint), each
    with its own weights, projecting into embeddings of the same size.

    Args:
        embedding_dim: Embedding width passed to both encoders.
        dropout: Dropout probability passed to both encoders.
    """
    def __init__(self, embedding_dim: int = EMBEDDING_DIM, dropout: float = DROPOUT):
        super().__init__()
        self.face_encoder = SingleEncoder(embedding_dim=embedding_dim, dropout=dropout)
        self.fp_encoder   = SingleEncoder(embedding_dim=embedding_dim, dropout=dropout)

    def forward(self, face, fingerprint):
        """Encode both inputs; returns ``(face_emb, fp_emb)``."""
        return self.face_encoder(face), self.fp_encoder(fingerprint)

    def unfreeze_all(self):
        """Unfreeze the backbone of both encoders (used before Phase 2)."""
        for encoder in (self.face_encoder, self.fp_encoder):
            encoder.unfreeze_backbone()


# Build the model and move it onto the selected device
# (Module.to moves the parameters in place).
model = DualEncoderModel()
model.to(DEVICE)
print(model)

In [None]:
class NTXentLoss(nn.Module):
    """
    Normalized Temperature-scaled Cross Entropy Loss.
    Treats each (face_i, fingerprint_i) pair as a positive;
    all other combinations within the batch are negatives.

    Args:
        temperature: Divisor applied to the similarity matrix before the
            cross-entropy.
    """
    def __init__(self, temperature: float = TEMPERATURE):
        super().__init__()
        self.temperature = temperature
        self.criterion   = nn.CrossEntropyLoss()

    def forward(self, face_emb, fp_emb):
        """
        Compute the loss for one batch of paired embeddings.

        Args:
            face_emb: 2-D tensor of face embeddings, one row per sample.
            fp_emb: 2-D tensor of fingerprint embeddings, row-aligned with
                ``face_emb``.

        Returns:
            Scalar loss tensor.
        """
        batch_size = face_emb.size(0)
        # Build helper tensors on the same device as the inputs rather than on
        # a module-level global, so the loss works wherever the embeddings live.
        device = face_emb.device

        # Stack both modalities: rows [0, B) are faces, rows [B, 2B) fingerprints.
        embeddings = torch.cat([face_emb, fp_emb], dim=0)
        sim = torch.mm(embeddings, embeddings.T) / self.temperature

        # A sample must never be its own candidate: push the diagonal to -inf
        # so it receives zero probability in the softmax.
        self_mask = torch.eye(2 * batch_size, dtype=torch.bool, device=device)
        sim = sim.masked_fill(self_mask, float('-inf'))

        # Row i (a face) should select row i + B (its fingerprint), and
        # row i + B should select row i.
        idx = torch.arange(batch_size, device=device)
        targets = torch.cat([idx + batch_size, idx])

        return self.criterion(sim, targets)


criterion = NTXentLoss(temperature=TEMPERATURE)


In [None]:
def get_phase1_optimizer(model):
    """Return an AdamW optimizer over the classifier heads of both encoders only (Phase 1)."""
    # Face-head parameters first, then fingerprint-head parameters.
    head_params = [
        param
        for encoder in (model.face_encoder, model.fp_encoder)
        for param in encoder.backbone.classifier.parameters()
    ]
    return optim.AdamW(head_params, lr=LR_HEAD, weight_decay=WEIGHT_DECAY)

def get_phase2_optimizer(model):
    """Return an AdamW optimizer over every parameter of ``model`` (Phase 2, full fine-tuning)."""
    every_param = model.parameters()
    return optim.AdamW(every_param, lr=LR_FULL, weight_decay=WEIGHT_DECAY)

def get_scheduler(optimizer, epochs):
    """Return a cosine-annealing LR scheduler for ``optimizer`` with ``T_max=epochs``."""
    cosine_cls = optim.lr_scheduler.CosineAnnealingLR
    return cosine_cls(optimizer, T_max=epochs)


In [None]:
# Gradient scaler for mixed-precision training; only enabled when DEVICE is CUDA.
scaler = torch.cuda.amp.GradScaler(enabled=(DEVICE.type == "cuda"))


def train_one_epoch(model, loader, optimizer):
    """
    Run one optimisation pass over ``loader``.

    Args:
        model: Called as ``model(face_batch, fp_batch)`` and expected to
            return the two embedding tensors.
        loader: Iterable yielding ``(face_imgs, fp_imgs, label)`` batches;
            the label is ignored.
        optimizer: Optimizer stepped once per batch through the grad scaler.

    Returns:
        The sum of the per-batch losses divided by ``len(loader)``.
    """
    model.train()
    amp_enabled = DEVICE.type == "cuda"
    batch_losses = []

    progress = tqdm(loader, desc="  Train", leave=False)
    for faces, prints, _ in progress:
        faces  = faces.to(DEVICE, non_blocking=True)
        prints = prints.to(DEVICE, non_blocking=True)

        optimizer.zero_grad()

        with torch.cuda.amp.autocast(enabled=amp_enabled):
            face_emb, fp_emb = model(faces, prints)
            loss = criterion(face_emb, fp_emb)

        scaler.scale(loss).backward()

        # Gradients must be unscaled before clipping so that the threshold
        # applies to their true magnitude.
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), GRAD_CLIP)

        scaler.step(optimizer)
        scaler.update()

        batch_losses.append(loss.item())

    return sum(batch_losses) / len(loader)


@torch.no_grad()
def validate(model, loader):
    """
    Evaluate ``model`` on ``loader`` without tracking gradients.

    Args:
        model: Called as ``model(face_batch, fp_batch)``.
        loader: Iterable yielding ``(face_imgs, fp_imgs, label)`` batches;
            the label is ignored.

    Returns:
        The sum of the per-batch losses divided by ``len(loader)``.
    """
    model.eval()
    amp_enabled = DEVICE.type == "cuda"
    batch_losses = []

    progress = tqdm(loader, desc="  Val  ", leave=False)
    for faces, prints, _ in progress:
        faces  = faces.to(DEVICE, non_blocking=True)
        prints = prints.to(DEVICE, non_blocking=True)

        with torch.cuda.amp.autocast(enabled=amp_enabled):
            face_emb, fp_emb = model(faces, prints)
            loss = criterion(face_emb, fp_emb)

        batch_losses.append(loss.item())

    return sum(batch_losses) / len(loader)

In [None]:
# Per-epoch losses, appended across both phases in order.
history = {"train_loss": [], "val_loss": []}


def _train_and_validate(optimizer, scheduler):
    """Run one training and one validation pass, step the scheduler and log both losses to ``history``."""
    epoch_train_loss = train_one_epoch(model, train_loader, optimizer)
    epoch_val_loss   = validate(model, val_loader)
    scheduler.step()
    history["train_loss"].append(epoch_train_loss)
    history["val_loss"].append(epoch_val_loss)
    return epoch_train_loss, epoch_val_loss


# ---- Phase 1: only the heads are optimised, backbones stay frozen ----------
PHASE1_EPOCHS = 5
print("\n=== PHASE 1: Training classifier heads (backbone frozen) ===")
optimizer  = get_phase1_optimizer(model)
scheduler  = get_scheduler(optimizer, PHASE1_EPOCHS)

for epoch in range(1, PHASE1_EPOCHS + 1):
    train_loss, val_loss = _train_and_validate(optimizer, scheduler)
    print(f"  [Phase 1] Epoch {epoch:02d}/{PHASE1_EPOCHS} | "
          f"Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f}")

# ---- Phase 2: unfreeze everything and fine-tune with a fresh optimizer -----
PHASE2_EPOCHS = EPOCHS - PHASE1_EPOCHS
print(f"\n=== PHASE 2: Full fine-tuning ({PHASE2_EPOCHS} epochs) ===")
model.unfreeze_all()
optimizer = get_phase2_optimizer(model)
scheduler = get_scheduler(optimizer, PHASE2_EPOCHS)

# Early-stopping bookkeeping (Phase 2 only).
best_val_loss  = float("inf")
patience       = 5
no_improve     = 0

for epoch in range(1, PHASE2_EPOCHS + 1):
    train_loss, val_loss = _train_and_validate(optimizer, scheduler)
    print(f"  [Phase 2] Epoch {epoch:02d}/{PHASE2_EPOCHS} | "
          f"Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f}")

    improved = val_loss < best_val_loss
    if not improved:
        # Count consecutive epochs without a new best; stop once patience runs out.
        no_improve += 1
        if no_improve >= patience:
            print(f"    ⚑ Early stopping at epoch {epoch} (no improvement for {patience} epochs)")
            break
        continue

    # New best validation loss: reset the counter and checkpoint the model.
    best_val_loss = val_loss
    no_improve    = 0
    checkpoint = {
        "epoch":                epoch,
        "model_state_dict":     model.state_dict(),
        "optimizer_state_dict": optimizer.state_dict(),
        "val_loss":             val_loss,
    }
    torch.save(checkpoint, "best_dual_encoder.pth")
    print(f"    ✔ New best model saved (val_loss={val_loss:.4f})")

In [None]:
# Draw train / val loss over all recorded epochs with the explicit Axes API.
fig, ax = plt.subplots(figsize=(9, 4))
ax.plot(history["train_loss"], label="Train Loss", linewidth=2)
ax.plot(history["val_loss"],   label="Val Loss",   linewidth=2, linestyle="--")
# Vertical marker at the index of the last Phase 1 epoch.
ax.axvline(PHASE1_EPOCHS - 1, color="gray", linestyle=":", label="Phase 1 → 2")
ax.set_xlabel("Epoch")
ax.set_ylabel("NT-Xent Loss")
ax.set_title("Dual-Encoder Training Curve")
ax.legend()
fig.tight_layout()
fig.savefig("training_curve.png", dpi=150)
plt.show()
print("Loss curve saved → training_curve.png")

In [None]:
# Export each encoder's current weights to its own file.
face_state = model.face_encoder.state_dict()
fp_state   = model.fp_encoder.state_dict()
torch.save(face_state, "face_encoder_final.pth")
torch.save(fp_state,   "fp_encoder_final.pth")
print("Saved: face_encoder_final.pth | fp_encoder_final.pth")


print("\n=== Embedding Similarity Demo (first batch) ===")
model.eval()
first_batch = next(iter(val_loader))
face_sample, fp_sample, labels = first_batch
face_sample, fp_sample = face_sample.to(DEVICE), fp_sample.to(DEVICE)

with torch.no_grad():
    face_emb, fp_emb = model(face_sample, fp_sample)

# The encoders return L2-normalised vectors, so the row-wise dot product is
# the cosine similarity of each (face, fingerprint) pair.
cosine_sims = torch.sum(face_emb * fp_emb, dim=1)
print(f"  Mean cosine similarity (positive pairs): {cosine_sims.mean().item():.4f}")
print(f"  Min:  {cosine_sims.min().item():.4f}  |  Max: {cosine_sims.max().item():.4f}")
