# Installations

# Imports

In [63]:
import os
import math
import random
import warnings
from collections import Counter

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.optim import AdamW
from torch.utils.data import DataLoader, Dataset, Subset, ConcatDataset, TensorDataset
from torchvision import transforms, models
from torchvision.datasets import ImageFolder, CelebA
from torchvision.datasets.folder import default_loader
from torchvision.transforms import ToTensor, Compose, Resize

from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.model_selection import StratifiedKFold
import torchvision.utils as vutils
from tqdm import tqdm

# Suppress warnings
warnings.filterwarnings("ignore")

In [64]:
# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Dataset: Pins Face Recognition

In [65]:
# Kaggle page the dataset comes from (kept for reference; not read by the code).
data_url = 'https://www.kaggle.com/datasets/hereisburak/pins-face-recognition/data'
dataset_path = '/kaggle/input/105_classes_pins_dataset'

# Resize every image to 224x224 and convert it to a tensor.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

dataset = ImageFolder(root=dataset_path, transform=transform)
class_to_idx = dataset.class_to_idx
num_classes = len(class_to_idx)

# Labels for stratification. ImageFolder already stores one label per sample
# in `targets`; iterating the dataset instead would decode and transform
# every image only to throw the pixels away.
labels = list(dataset.targets)

data_loader = DataLoader(dataset, batch_size=32, shuffle=True)

In [None]:
# Open the unfiltered dataset once more, purely to discover the class names.
full_dataset = ImageFolder(dataset_path, transform=transform)
class_names = [name for name in full_dataset.class_to_idx]  # one name per person

# Draw the 30 classes to keep; seeding first makes the draw repeatable.
random.seed(42)
included_classes = set(random.sample(class_names, k=30))
print(f"Including {len(included_classes)} classes: {included_classes}")

class FilteredImageFolder(ImageFolder):
    """ImageFolder restricted to a chosen subset of class folders.

    The directory under ``root`` is scanned by the parent constructor and the
    resulting sample list is then narrowed to the classes named in
    ``included_classes``. Labels keep the indices assigned by that full scan,
    so ``classes`` and ``class_to_idx`` still describe every folder, not only
    the retained ones.

    Args:
        root: Directory containing one sub-folder per class.
        transform: Optional transform applied to each loaded image.
        included_classes: Iterable of class (folder) names to keep. ``None``
            keeps nothing, matching the previous empty-set default.
    """

    def __init__(self, root, transform=None, included_classes=None):
        super().__init__(root, transform=transform)

        # A fresh set per call: the former ``set()`` default was a single
        # object shared by every call of the constructor.
        wanted = set(included_classes) if included_classes is not None else set()

        self.filtered_samples = [
            (path, label) for path, label in self.samples
            if self.classes[label] in wanted
        ]
        self.samples = self.filtered_samples
        # Keep the parent's `imgs` alias pointing at the filtered list too;
        # otherwise it would still reference the unfiltered samples.
        self.imgs = self.samples
        self.targets = [label for _, label in self.samples]

    def __len__(self):
        return len(self.samples)

# Build the dataset that only contains the sampled classes.
dataset = FilteredImageFolder(
    root=dataset_path,
    included_classes=included_classes,
    transform=transform,
)

# Shuffled mini-batches of 32 images over the filtered dataset.
data_loader = DataLoader(dataset, shuffle=True, batch_size=32)

# Report how many images survived the class filter.
print(f"Filtered dataset size: {len(dataset)}")

In [None]:
# Count images per class straight from the stored labels. Reading
# `dataset.targets` avoids decoding every image, and the Counter replaces the
# per-sample reverse lookup through `class_to_idx`.
label_counts = Counter(dataset.targets)
image_count_per_class = {
    class_name: label_counts.get(class_idx, 0)
    for class_name, class_idx in class_to_idx.items()
}

# Total number of images
total_images = len(dataset)

# Print dataset stats. The class count is taken from the labels actually
# present in `dataset`; `num_classes` was computed before any filtering and
# would overstate it.
print(f"Number of classes: {len(label_counts)}")
print(f"Total number of images: {total_images}")


In [None]:
# Bar chart: one bar per class, bar height = number of images in that class.
plt.figure(figsize=(20, 10))
plt.bar(list(image_count_per_class), list(image_count_per_class.values()))
plt.title('Distribution of Image Count per Class')
plt.xlabel('Classes')
plt.ylabel('Number of Images')
plt.xticks(rotation=90)  # class names are long, so draw them vertically
plt.tight_layout()
plt.show()

In [None]:
# Get a batch of data. The batch labels are not needed for the preview and
# are bound to a throwaway name so the module-level `labels` list (used for
# stratification) is not overwritten by a tensor.
images, _batch_labels = next(iter(data_loader))

# Plot the first 16 images of the batch as a 4x4 grid
plt.figure(figsize=(10, 10))
plt.axis("off")
plt.title("Batch of Images")
grid_img = vutils.make_grid(images[:16], nrow=4, normalize=True)
plt.imshow(grid_img.permute(1, 2, 0))  # (C, H, W) -> (H, W, C) for imshow
plt.show()

# Siamese Network (Triplet Loss)

## Triplet Dataset

In [73]:
class FilteredImageFolder(ImageFolder):
    """ImageFolder-style dataset built from an explicit ``(path, label)`` list.

    No directory is scanned: the caller supplies the samples directly, and the
    class list is derived from the labels found in them.

    Args:
        samples: Sequence of ``(path, label)`` pairs.
        transform: Optional transform applied to each loaded image.
    """

    def __init__(self, samples, transform=None):
        # The parent constructor is deliberately not called because it needs a
        # root directory to scan. The attributes it would otherwise provide
        # are therefore assigned by hand so inherited helpers keep working.
        self.root = None
        self.samples = list(samples)  # private copy; also accepts any iterable
        self.imgs = self.samples
        self.targets = [s[1] for s in self.samples]
        self.classes = sorted(set(self.targets))  # unique class labels
        self.class_to_idx = {cls: i for i, cls in enumerate(self.classes)}
        self.transform = transform
        self.target_transform = None
        self.loader = default_loader

    def __getitem__(self, idx):
        """Load the image at ``idx`` and return ``(image, label)``."""
        path, label = self.samples[idx]
        img = self.loader(path)
        if self.transform:
            img = self.transform(img)
        return img, label

    def __len__(self):
        return len(self.samples)
        
class TripletDataset(torch.utils.data.Dataset):
    """Serve ``(anchor, positive, negative)`` triplets from a labelled dataset.

    The wrapped dataset must return ``(item, label)`` pairs. For index ``idx``
    the anchor is item ``idx``, the positive is a random *other* item of the
    same class and the negative is a random item of a random different class.

    Attributes:
        subset: The wrapped dataset.
        labels: One hashable label per position of ``subset``.
        class_indices: Maps each label to a NumPy array of positions.
    """

    def __init__(self, subset):
        self.subset = subset
        self.labels = self._collect_labels(subset)
        self.class_indices = self._get_class_indices()

    @staticmethod
    def _stored_labels(dataset):
        """Return labels readable without loading any item, or ``None``."""
        targets = getattr(dataset, "targets", None)
        # A target_transform could make returned labels differ from `targets`.
        if targets is not None and getattr(dataset, "target_transform", None) is None:
            if torch.is_tensor(targets):
                return targets.tolist()
            return [t.item() if torch.is_tensor(t) else t for t in targets]
        if isinstance(dataset, Subset):
            parent = TripletDataset._stored_labels(dataset.dataset)
            if parent is None:
                return None
            return [parent[i] for i in dataset.indices]
        if isinstance(dataset, ConcatDataset):
            parts = [TripletDataset._stored_labels(part) for part in dataset.datasets]
            if any(part is None for part in parts):
                return None
            return [label for part in parts for label in part]
        return None

    @staticmethod
    def _collect_labels(dataset):
        """Return one hashable label per item of ``dataset``.

        Stored labels are used when available, so building the index does not
        load (and transform) every item. Otherwise the dataset is iterated.
        Tensor labels are converted to Python scalars because tensors hash by
        identity and cannot group equal labels as dictionary keys.
        """
        stored = TripletDataset._stored_labels(dataset)
        if stored is not None:
            return stored
        return [
            label.item() if torch.is_tensor(label) else label
            for _, label in dataset
        ]

    def _get_class_indices(self):
        """Create a mapping from class labels to dataset indices."""
        class_indices = {}
        for position, label in enumerate(self.labels):
            class_indices.setdefault(label, []).append(position)

        # NumPy arrays allow the boolean mask used to exclude the anchor
        return {k: np.array(v) for k, v in class_indices.items()}

    def __getitem__(self, idx):
        """Return an ``(anchor, positive, negative)`` triplet for ``idx``.

        Raises:
            ValueError: If the anchor's class has fewer than two items, or if
                there is no other class to draw a negative from.
        """
        idx = int(idx)
        if idx < 0:
            # Normalise so the anchor can be excluded from its own positives.
            idx += len(self)

        anchor, _ = self.subset[idx]
        anchor_label = self.labels[idx]

        # Ensure there are at least 2 images in this class
        same_class = self.class_indices[anchor_label]
        if len(same_class) < 2:
            raise ValueError(f"Not enough samples for anchor class {anchor_label}")

        # Positive: same class, different item. Candidates are turned into a
        # plain list because random.choice on a NumPy array is not dependable
        # across Python releases (some test the sequence's truth value).
        positive_idx = random.choice(same_class[same_class != idx].tolist())
        positive, _ = self.subset[positive_idx]

        # Negative: any item of a different class
        negative_labels = [label for label in self.class_indices if label != anchor_label]
        if not negative_labels:
            raise ValueError("No negative samples available for triplet loss training!")

        negative_label = random.choice(negative_labels)
        negative_idx = random.choice(self.class_indices[negative_label].tolist())
        negative, _ = self.subset[negative_idx]

        return anchor, positive, negative

    def __len__(self):
        return len(self.subset)

## Augmentation

### Pipeline

In [74]:
# Augmentation pipeline: random geometric and colour perturbations, applied in
# the order listed, followed by conversion to a tensor.
augmented_transform = transforms.Compose([
    transforms.RandomResizedCrop(size=224, scale=(0.8, 1.0)),  # crop 80-100% of the area, resize to 224
    transforms.RandomHorizontalFlip(0.5),                      # mirror half of the images
    transforms.RandomRotation(45),                             # rotate by up to 45 degrees either way
    transforms.RandomAffine(0, translate=(0.1, 0.1)),          # shift by up to 10% per axis, no rotation
    transforms.ColorJitter(0.2, 0.2, 0.2, 0.1),                # brightness, contrast, saturation, hue
    transforms.ToTensor(),                                     # convert to tensor
])

### Sample

In [75]:
def augment_dataset(X, y, augmentation_transform, num_augmentations=1):
    """Expand a dataset by adding augmented versions of its images.

    Each original image is kept and followed by ``num_augmentations`` augmented
    copies, every copy carrying the original's label.

    Args:
        X: Indexable collection of image tensors (all of the same shape).
        y: Labels, one per image in ``X``.
        augmentation_transform: Callable mapping an image to an augmented image.
        num_augmentations: Number of augmented copies to add per image.

    Returns:
        Tuple ``(images, labels)`` of stacked tensors. Empty input yields
        empty tensors instead of failing inside ``torch.stack``.

    Raises:
        ValueError: If ``X`` and ``y`` differ in length.
    """
    if len(X) != len(y):
        raise ValueError(f"X and y must have the same length, got {len(X)} and {len(y)}")

    if len(X) == 0:
        # torch.stack cannot stack an empty list, so return empty tensors.
        empty_X = X if torch.is_tensor(X) else torch.empty(0)
        empty_y = y if torch.is_tensor(y) else torch.empty(0, dtype=torch.long)
        return empty_X, empty_y

    augmented_X, augmented_y = [], []

    for original_image, label in zip(X, y):
        augmented_X.append(original_image)
        augmented_y.append(label)

        for _ in range(num_augmentations):
            augmented_X.append(augmentation_transform(original_image))
            augmented_y.append(label)  # augmented image has the same label

    return torch.stack(augmented_X), torch.tensor(augmented_y)

In [None]:
def plot_original_vs_augmented(dataset, augmentation_transform, num_samples=5):
    """Plot original images next to an augmented version of each.

    Args:
        dataset: Indexable dataset returning ``(image, label)`` where the
            image is a ``(C, H, W)`` tensor.
        augmentation_transform: Callable applied to each image tensor; it must
            accept and return a tensor.
        num_samples: Number of leading dataset items to show. Capped at the
            dataset length.
    """
    def _to_displayable(image):
        """Convert a (C, H, W) tensor to an (H, W, C) array scaled to [0, 1]."""
        array = image.permute(1, 2, 0).cpu().numpy()
        low, high = array.min(), array.max()
        if high == low:
            # Constant image: min-max scaling would divide by zero.
            return np.zeros_like(array)
        return (array - low) / (high - low)

    num_samples = min(num_samples, len(dataset))
    if num_samples <= 0:
        return

    # squeeze=False keeps `axes` two-dimensional even for a single row, so the
    # [row, column] indexing below also works when num_samples == 1.
    fig, axes = plt.subplots(num_samples, 2, figsize=(8, num_samples * 2), squeeze=False)

    for i in range(num_samples):
        original_image, _ = dataset[i]  # label is not needed
        augmented_image = augmentation_transform(original_image)

        # Left column: original image
        axes[i, 0].imshow(_to_displayable(original_image))
        axes[i, 0].set_title("Original")
        axes[i, 0].axis("off")

        # Right column: augmented image
        axes[i, 1].imshow(_to_displayable(augmented_image))
        axes[i, 1].set_title("Augmented")
        axes[i, 1].axis("off")

    plt.tight_layout()
    plt.show()

# Split the dataset 80/20 into train and test parts
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, test_size])

# The pipeline defined above is named `augmented_transform`; the previously
# referenced `augmentation_transform` does not exist. The dataset already
# yields tensors, and ToTensor only accepts PIL images or ndarrays, so that
# final step is dropped for this preview.
preview_transform = transforms.Compose([
    step for step in augmented_transform.transforms
    if not isinstance(step, transforms.ToTensor)
])

# Preview the augmentation on the first few training images
plot_original_vs_augmented(train_dataset, preview_transform, num_samples=5)

## Training Loop

In [90]:
def _run_triplet_epoch(model, loader, criterion, device, desc, optimizer=None):
    """Run one pass over ``loader`` and return ``(mean_loss, accuracy, f1)``.

    With an ``optimizer`` the model is put in training mode and updated after
    every batch; without one it is put in evaluation mode and gradients are
    disabled.

    A triplet counts as predicted-correct when the anchor is closer (L2) to the
    positive than to the negative. The expected outcome is "correct" for every
    triplet, so the ground truth passed to the metrics is all ones.
    """
    training = optimizer is not None
    model.train(training)

    total_loss = 0
    y_true, y_pred = [], []

    progress_bar = tqdm(loader, desc=desc, leave=False)
    with torch.set_grad_enabled(training):
        for anchor, positive, negative in progress_bar:
            anchor, positive, negative = anchor.to(device), positive.to(device), negative.to(device)

            if training:
                optimizer.zero_grad()

            anchor_out, positive_out, negative_out = model(anchor, positive, negative)
            loss = criterion(anchor_out, positive_out, negative_out)

            if training:
                loss.backward()
                optimizer.step()

            total_loss += loss.item()

            # Convert embeddings to binary predictions for classification metrics
            pos_dist = torch.norm(anchor_out - positive_out, p=2, dim=1)
            neg_dist = torch.norm(anchor_out - negative_out, p=2, dim=1)
            preds = (pos_dist < neg_dist).cpu().numpy()

            y_true.extend(np.ones_like(preds))
            y_pred.extend(preds)

            progress_bar.set_postfix(loss=loss.item())

    mean_loss = total_loss / len(loader)
    acc = accuracy_score(y_true, y_pred)
    f1 = f1_score(y_true, y_pred, zero_division=0)
    return mean_loss, acc, f1


def train_triplet_model(model, train_loader, val_loader=None, device='cuda', epochs=10, lr=1e-4, weight_decay=1e-4):
    """Train ``model`` with triplet margin loss and record per-epoch metrics.

    Args:
        model: Module called as ``model(anchor, positive, negative)`` and
            returning the three embeddings.
        train_loader: Iterable of ``(anchor, positive, negative)`` batches.
        val_loader: Optional iterable of the same form, evaluated after every
            training epoch.
        device: Device the batches are moved to.
        epochs: Number of training epochs.
        lr: AdamW learning rate.
        weight_decay: AdamW weight decay.

    Returns:
        Dict with the lists ``train_loss``, ``train_acc`` and ``train_f1`` and,
        when ``val_loader`` is given, ``val_loss``, ``val_acc`` and ``val_f1``;
        one entry per epoch.
    """
    optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=weight_decay)
    criterion = nn.TripletMarginLoss(margin=1.0, p=2)

    # Track loss and classification metrics
    history = {
        'train_loss': [], 'train_acc': [], 'train_f1': []
    }

    if val_loader is not None:
        history.update({'val_loss': [], 'val_acc': [], 'val_f1': []})

    for epoch in range(epochs):
        train_loss, train_acc, train_f1 = _run_triplet_epoch(
            model, train_loader, criterion, device,
            desc=f"Epoch {epoch+1}/{epochs}", optimizer=optimizer,
        )
        history['train_loss'].append(train_loss)
        history['train_acc'].append(train_acc)
        history['train_f1'].append(train_f1)

        # Validation phase (only when a validation loader was supplied)
        if val_loader is not None:
            val_loss, val_acc, val_f1 = _run_triplet_epoch(
                model, val_loader, criterion, device, desc="Validation",
            )
            history['val_loss'].append(val_loss)
            history['val_acc'].append(val_acc)
            history['val_f1'].append(val_f1)

            print(f"Epoch [{epoch+1}/{epochs}] | Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f}")
            print(f"Train: Acc: {train_acc:.4f}, F1: {train_f1:.4f}")
            print(f"Val: Acc: {val_acc:.4f}, F1: {val_f1:.4f}")

        else:
            print(f"Epoch [{epoch+1}/{epochs}] | Train Loss: {train_loss:.4f}")
            print(f"Train: Acc: {train_acc:.4f}, F1: {train_f1:.4f}")

    return history


## Architecture

In [97]:
class TripletNetwork(nn.Module):
    """Shared-weight embedding network for triplet training.

    A pretrained ResNet-50 with its final fully connected layer removed feeds
    two Linear -> BatchNorm -> ReLU -> Dropout stages (2048 -> 512 -> 256).
    The same weights embed the anchor, the positive and the negative.
    """

    def __init__(self):
        super().__init__()

        # Backbone: every ResNet-50 child module except the last (the FC layer).
        backbone_layers = list(models.resnet50(pretrained=True).children())
        backbone_layers.pop()
        self.feature_extractor = nn.Sequential(*backbone_layers)

        # First projection stage: 2048 -> 512
        self.fc1 = nn.Linear(2048, 512)
        self.bn1 = nn.BatchNorm1d(512)
        self.dropout1 = nn.Dropout(0.3)

        # Second projection stage: 512 -> 256
        self.fc2 = nn.Linear(512, 256)
        self.bn2 = nn.BatchNorm1d(256)
        self.dropout2 = nn.Dropout(0.3)

    def forward_once(self, x):
        """Embed one batch of images into 256-dimensional feature vectors."""
        features = self.feature_extractor(x).flatten(1)  # collapse all but the batch dim

        hidden = self.fc1(features)
        hidden = self.dropout1(F.relu(self.bn1(hidden)))

        embedding = self.fc2(hidden)
        embedding = self.dropout2(F.relu(self.bn2(embedding)))
        return embedding  # a feature vector, not a similarity score

    def forward(self, anchor, positive, negative):
        """Embed the three batches, in order, and return them as a tuple."""
        return tuple(self.forward_once(batch) for batch in (anchor, positive, negative))

## Triplet Selection Logic

In [None]:
def create_triplets(X, y):
    """Create (anchor, positive, negative) triplets for triplet-loss training.

    For every class with at least two samples, ``n // 2`` triplets are drawn
    (``n`` being the class size): anchor and positive are two distinct samples
    of the class, the negative is a random sample of a random other class.

    Args:
        X: Indexable collection of sample tensors (all of the same shape).
        y: Labels, one per sample. Any label values are accepted; they do not
            have to be the contiguous range ``0..K-1``.

    Returns:
        Tuple ``(anchors, positives, negatives)`` of stacked tensors.

    Raises:
        ValueError: If fewer than two classes are present, or if no class has
            enough samples to form a triplet.
    """
    y = torch.as_tensor(y)

    # Index samples by the label values actually present. Assuming labels
    # 0..K-1 silently drops samples whenever the labels are not contiguous.
    class_labels = torch.unique(y).tolist()
    if len(class_labels) < 2:
        raise ValueError("At least two classes are required to build triplets")
    class_indices = {
        label: torch.where(y == label)[0].tolist() for label in class_labels
    }

    triplets = []
    for label in class_labels:
        indices = class_indices[label]
        num_samples = len(indices)

        if num_samples < 2:
            continue  # an anchor needs a distinct positive

        other_labels = [other for other in class_labels if other != label]

        for _ in range(num_samples // 2):
            anchor, positive = np.random.choice(indices, 2, replace=False)
            negative_class = random.choice(other_labels)
            negative = random.choice(class_indices[negative_class])

            triplets.append((X[anchor], X[positive], X[negative]))

    if not triplets:
        raise ValueError("No class has at least two samples; cannot build triplets")

    # Convert to PyTorch tensors
    anchor_tensor = torch.stack([t[0] for t in triplets])
    positive_tensor = torch.stack([t[1] for t in triplets])
    negative_tensor = torch.stack([t[2] for t in triplets])

    return anchor_tensor, positive_tensor, negative_tensor

## Metrics Plot

In [None]:
def plot_training_metrics(history):
    """Draw loss, accuracy and F1-score curves side by side, one point per epoch.

    ``history`` is the dict of per-epoch lists keyed ``train_loss``,
    ``train_acc`` and ``train_f1``. When it also contains ``val_loss`` the
    matching ``val_*`` curves are added as dashed lines.
    """
    epoch_axis = range(1, len(history["train_loss"]) + 1)
    with_validation = "val_loss" in history

    fig, axes = plt.subplots(1, 3, figsize=(18, 5))
    fig.suptitle("Training Metrics Over Epochs", fontsize=16, fontweight="bold")

    # (history key suffix, y-axis label, panel title, train legend, validation legend)
    panels = (
        ("loss", "Loss", "Loss Curve", "Train Loss", "Validation Loss"),
        ("acc", "Accuracy", "Accuracy Curve", "Train Accuracy", "Validation Accuracy"),
        ("f1", "F1-score", "F1-score Curve", "Train F1-score", "Validation F1-score"),
    )

    for ax, (suffix, y_label, title, train_legend, val_legend) in zip(axes, panels):
        ax.plot(epoch_axis, history[f"train_{suffix}"], label=train_legend)
        if with_validation:
            ax.plot(epoch_axis, history[f"val_{suffix}"], label=val_legend, linestyle="dashed")
        ax.set_title(title)
        ax.set_xlabel("Epochs")
        ax.set_ylabel(y_label)
        ax.legend()

    # Leave the top 5% of the figure free for the suptitle.
    plt.tight_layout(rect=[0, 0, 1, 0.95])
    plt.show()

## Dataset Preparation

In [None]:
# `filtered_samples` and `class_to_idx_filtered` are not defined anywhere in
# the visible notebook, so they are derived here from the filtered dataset.
# NOTE(review): assumes `dataset` is still the class-filtered dataset whose
# labels index `full_dataset.classes` - confirm against the intended cell order.
filtered_samples = list(dataset.samples)
class_to_idx_filtered = {
    class_name: new_idx
    for new_idx, class_name in enumerate(sorted(included_classes))
}

# Map old labels to new, contiguous ones
new_samples = [(path, class_to_idx_filtered[full_dataset.classes[label]]) for path, label in filtered_samples]

# Define transformations without augmentation for original dataset
original_transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize images
    transforms.ToTensor(),  # Convert to tensor
])

# Load datasets separately: same samples, different transforms
original_dataset = FilteredImageFolder(new_samples, transform=original_transform)  # Original images
augmented_dataset = FilteredImageFolder(new_samples, transform=augmented_transform)  # Augmented images

# Combine both datasets (original items first, then their augmented twins)
combined_dataset = ConcatDataset([original_dataset, augmented_dataset])

# Labels for stratification, in the same order as `combined_dataset`
labels = list(original_dataset.targets) + list(augmented_dataset.targets)

## Cross Validation


In [None]:
# 5-Fold Cross Validation
kf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

# `combined_dataset` holds every image twice: first un-augmented, then with the
# augmentation transform. Folds are therefore drawn over the images themselves
# rather than over the combined items, so an image and its augmented twin
# always land on the same side of the split and the test fold stays
# augmentation-free.
original_part, augmented_part = combined_dataset.datasets
base_labels = list(original_part.targets)

for fold_idx, (train_idx, test_idx) in enumerate(kf.split(range(len(base_labels)), base_labels)):
    print(f"\n⭕ Training Fold {fold_idx + 1}...\n")

    train_positions = train_idx.tolist()
    test_positions = test_idx.tolist()

    # Train on original + augmented versions of the training images only
    train_subset = ConcatDataset([
        Subset(original_part, train_positions),
        Subset(augmented_part, train_positions),
    ])
    # Test on the un-augmented held-out images
    test_subset = Subset(original_part, test_positions)

    train_dataset = TripletDataset(train_subset)
    test_dataset = TripletDataset(test_subset)

    # Create DataLoaders
    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4, pin_memory=True)
    test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=4, pin_memory=True)

    # Define a fresh model for every fold
    model = TripletNetwork().to(device)

    # Train model
    history = train_triplet_model(model, train_loader, test_loader, device, epochs=20, lr=1e-4)

    # Save the fold's weights. `save_model` is not defined in the visible
    # notebook, so the state dict is written directly.
    torch.save(model.state_dict(), f"triplet_model_fold{fold_idx + 1}.pth")

    # Plot training metrics
    plot_training_metrics(history)

## Train On Full Dataset

In [None]:
# Wrap the combined (original + augmented) dataset so it yields triplets
train_dataset = TripletDataset(combined_dataset)

# Create DataLoader
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4)

# Define model
model = TripletNetwork().to(device)

# Train model (no validation set)
history = train_triplet_model(model, train_loader, None, device, epochs=20, lr=1e-4)

# Save the trained weights. `save_model` is not defined in the visible
# notebook, so the state dict is written directly.
torch.save(model.state_dict(), "triplet_model.pth")

# Plot training metrics
plot_training_metrics(history)