### **Install & Import necessary libraries**

In [None]:
!pip install -q kaggle
!pip install torch torchvision tqdm imageio
!pip install torchinfo


In [None]:
import os
import csv
import random
import numpy as np
import pandas as pd
from tqdm import tqdm
from PIL import Image
import imageio
import itertools
from google.colab import files
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, roc_curve, auc
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import roc_curve, auc

import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import transforms, models
from torch.utils.data import Dataset, DataLoader
from torchinfo import summary
import timm


In [None]:
# Seed every random number generator the notebook draws from
def set_seed(seed=42):
    """Seed Python's `random`, NumPy and PyTorch (CPU and all CUDA devices).

    Args:
        seed: Integer seed handed to each generator. Defaults to 42.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)

set_seed()


In [None]:
from google.colab import drive

# Mount Google Drive at /content/drive. The training cell saves its best
# checkpoint under this mount and the loading cell reads it back from there.
drive.mount('/content/drive')


### **Kaggle API Configuration**

In [None]:
# Prompt the user to upload the kaggle.json file.
# The return value is kept in `uploaded` but is not read again in this notebook;
# the next cell copies `kaggle.json` from the working directory.
print("Please upload your kaggle.json file.")
uploaded = files.upload()


In [None]:
# Create ~/.kaggle, copy (not move) kaggle.json into it, and restrict the
# copied file to owner read/write (mode 600).
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json


### **Download and Extract the Dataset**

In [None]:
# Kaggle dataset identifier in "owner/dataset" form (a slug rather than a full URL).
# It is interpolated into the `kaggle datasets download -d` command in the next cell.
dataset_url = "leslietiong/cmfpdb"


In [None]:
# Download the dataset
!kaggle datasets download -d {dataset_url}

# Unzip the dataset
!unzip -q cmfpdb.zip -d ./cmfpdb



**Dataset Structure:**
```
Cross-modal Face-Periocular Dataset/
|---- Hispanic/
|---- |---- Alice
|---- |---- |---- face
|---- |---- |---- |---- 10.jpg
|---- |---- |---- |---- 105.jpg
|---- |---- |---- |---- ......
|---- |---- |---- Ocular_left
|---- |---- |---- |---- 10.jpg
|---- |---- |---- |---- 105.jpg
|---- |---- |---- |---- ......
|---- |---- |---- Ocular_right
|---- |---- |---- |---- 10.jpg
|---- |---- |---- |---- 105.jpg
|---- |---- |---- |---- ......
|---- |---- ....
|---- East Asian/
|---- South Asian/
|---- Caucasian/
|---- Middle Eastern/
|---- Melanesian/
|---- African/
```



### **Preprocess the Data**


*   Label-1: same person, both images taken from the same facial region (e.g. face vs. face)
*   Label-0: different persons, both images taken from the same facial region

In [None]:
dataset_dir = '/content/cmfpdb/Cross-modal Face-Periocular Dataset'

person_images = {}  # {person_id: [image_paths]}
region_persons = {}  # {region: [person_ids]} (collected for reference; not used by the sampling below)

person_id_counter = 0
subfolder = 'face'
pairs_per_person = 100  # cap for same-person pairs, exact count of different-person pairs

# Traverse the dataset directory.
# os.listdir() returns entries in arbitrary order, so every listing is sorted;
# otherwise the seeded sampling below could still pick different pairs from
# one run or machine to the next.
print("Collecting image paths...")
for region in sorted(os.listdir(dataset_dir)):
    region_path = os.path.join(dataset_dir, region)
    if not os.path.isdir(region_path):
        continue
    region_persons[region] = []
    for person in sorted(os.listdir(region_path)):
        person_path = os.path.join(region_path, person)
        if not os.path.isdir(person_path):
            continue
        # The counter runs across all regions, so ids stay unique even if two
        # regions contain a folder with the same name.
        person_id = f"{region}_{person_id_counter}"
        person_id_counter += 1

        subfolder_path = os.path.join(person_path, subfolder)
        images = []
        if os.path.isdir(subfolder_path):
            images = [
                os.path.join(subfolder_path, img_file)
                for img_file in sorted(os.listdir(subfolder_path))
                if img_file.lower().endswith(('.png', '.jpg', '.jpeg'))
            ]
        # A person needs at least two images to form a same-person pair.
        if len(images) >= 2:
            person_images[person_id] = images
            region_persons[region].append(person_id)
print("Finished collecting image paths.")


print("Generating Pairs...")
all_pairs = []

# Create same-person pairs (at most `pairs_per_person` per person)
for person_id, images in tqdm(person_images.items(), desc="Generating Same-Person Pairs"):
    possible_pairs = list(itertools.combinations(images, 2))
    if len(possible_pairs) > pairs_per_person:
        possible_pairs = random.sample(possible_pairs, pairs_per_person)
    all_pairs.extend([img1, img2, '1'] for img1, img2 in possible_pairs)

# Create different-person pairs (`pairs_per_person` per person).
# The partner is drawn from every other person, whatever folder they come from;
# repeated pairs are possible because each draw is independent.
all_person_ids = list(person_images)
if len(all_person_ids) < 2:
    # With fewer than two persons there is nobody to pair against.
    print("Fewer than two persons with usable images; skipping different-person pairs.")
else:
    for person_id, images in tqdm(person_images.items(), desc="Generating Different-Person Pairs"):
        other_person_ids = [pid for pid in all_person_ids if pid != person_id]
        for _ in range(pairs_per_person):
            other_person_id = random.choice(other_person_ids)
            other_img = random.choice(person_images[other_person_id])
            img1 = random.choice(images)
            all_pairs.append([img1, other_img, '0'])

print(f"Total pairs generated: {len(all_pairs)}")

output_csv = 'image_pairs_face.csv'
print(f"Writing pairs to {output_csv}...")
with open(output_csv, 'w', newline='') as csvfile:
    csvwriter = csv.writer(csvfile)
    csvwriter.writerow(['Image1', 'Image2', 'Label'])
    csvwriter.writerows(all_pairs)

print("Done!")



In [None]:
# Load the existing CSV file
face_csv = '/content/image_pairs_face.csv'
df_face = pd.read_csv(face_csv)

# Subfolders whose pair lists are derived from the face pairs.
# NOTE(review): the dataset tree shown earlier in this notebook spells these
# folders `Ocular_left` / `Ocular_right`; paths are case-sensitive on Linux,
# so confirm the spelling against the extracted archive.
ocular_subfolders = ('ocular_left', 'ocular_right')


def _swap_subfolder(df, subfolder_name):
    """Return a copy of `df` whose image paths use `subfolder_name` in place of `face`."""
    swapped = df.copy()
    for column in ('Image1', 'Image2'):
        swapped[column] = swapped[column].str.replace('/face/', f'/{subfolder_name}/', regex=False)
    return swapped


def _both_images_exist(df):
    """Return a boolean Series that is True where both files of a pair exist on disk."""
    first_exists = df['Image1'].map(os.path.exists).astype(bool)
    second_exists = df['Image2'].map(os.path.exists).astype(bool)
    return first_exists & second_exists


# Function to generate new CSV by replacing 'face' with the specified subfolder
def generate_ocular_csv(subfolder_name):
    """Write `image_pairs_<subfolder_name>.csv`, derived from the global `df_face`.

    Every path in `df_face` has its `/face/` component replaced by
    `/<subfolder_name>/`; pairs with a missing file are dropped.

    Args:
        subfolder_name: Name of the sibling subfolder, e.g. 'ocular_left'.
    """
    df_ocular = _swap_subfolder(df_face, subfolder_name)
    df_existing = df_ocular[_both_images_exist(df_ocular)]
    output_csv = f'image_pairs_{subfolder_name}.csv'
    df_existing.to_csv(output_csv, index=False)
    print(f"Generated {output_csv} with {len(df_existing)} valid pairs.")


# The dataset cell reads row i of the face, left and right CSVs as one sample.
# Filtering each CSV on its own would give files of different length whose
# rows no longer match, so first keep only the pairs present in every
# modality and write that selection back to the face CSV as well.
valid_everywhere = pd.Series(True, index=df_face.index, dtype=bool)
for name in ocular_subfolders:
    valid_everywhere &= _both_images_exist(_swap_subfolder(df_face, name))

if len(df_face) > 0 and not valid_everywhere.any():
    # Refuse to overwrite the face CSV with nothing; this almost certainly
    # means the subfolder names do not match what is on disk.
    raise RuntimeError(
        "No pair has images in all of: " + ", ".join(ocular_subfolders)
        + ". Check the subfolder names (paths are case-sensitive)."
    )

num_dropped = int((~valid_everywhere).sum())
if num_dropped:
    df_face = df_face[valid_everywhere].reset_index(drop=True)
    df_face.to_csv(face_csv, index=False)
    print(f"Dropped {num_dropped} pairs lacking an ocular image; rewrote {face_csv}.")

# Generate 'image_pairs_ocular_left.csv' and 'image_pairs_ocular_right.csv'
for name in ocular_subfolders:
    generate_ocular_csv(name)


### **Define Dataset and DataLoader**

In [None]:
def _build_augmenting_transform():
    """Build the shared preprocessing pipeline.

    Steps: resize to 128x128, random horizontal flip, random rotation of up to
    10 degrees, conversion to a tensor, per-channel normalization.
    """
    steps = [
        transforms.Resize((128, 128)),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(10),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406],
                             std=[0.229, 0.224, 0.225]),
    ]
    return transforms.Compose(steps)


# Face and ocular images go through the same steps, each with its own pipeline object.
transform_face = _build_augmenting_transform()
transform_eyes = _build_augmenting_transform()


In [None]:
class SiameseDataset(Dataset):
    """Pairs of (face, left ocular, right ocular) images with a same/different label.

    Row `i` of the three dataframes is read as one sample, so the frames must
    describe the same pairs in the same order. Each frame needs `Image1` and
    `Image2` path columns; the label is taken from `face_df['Label']`.

    Args:
        face_df: Face image pairs and labels.
        ocular_left_df: Left-ocular image pairs, row-aligned with `face_df`.
        ocular_right_df: Right-ocular image pairs, row-aligned with `face_df`.
        transform_face: Optional callable applied to each face image.
        transform_eyes: Optional callable applied to each ocular image.

    Raises:
        ValueError: If the three dataframes do not have the same number of rows.
    """

    def __init__(self, face_df, ocular_left_df, ocular_right_df, transform_face=None, transform_eyes=None):
        lengths = {
            'face_df': len(face_df),
            'ocular_left_df': len(ocular_left_df),
            'ocular_right_df': len(ocular_right_df),
        }
        # Different lengths mean the rows cannot be aligned; fail here rather
        # than pairing the wrong images (or hitting a lookup error) mid-epoch.
        if len(set(lengths.values())) != 1:
            raise ValueError(f"Dataframes must have the same number of rows, got {lengths}")

        self.face_df = face_df
        self.ocular_left_df = ocular_left_df
        self.ocular_right_df = ocular_right_df
        self.transform_face = transform_face
        self.transform_eyes = transform_eyes

    def __len__(self):
        return len(self.face_df)

    @staticmethod
    def _load_rgb(path):
        """Open `path`, convert it to RGB and close the underlying file."""
        with Image.open(path) as image:
            return image.convert('RGB')

    def __getitem__(self, idx):
        # Positional lookup: works whatever index the dataframes carry.
        face_row = self.face_df.iloc[idx]
        left_row = self.ocular_left_df.iloc[idx]
        right_row = self.ocular_right_df.iloc[idx]

        person1_face = self._load_rgb(face_row['Image1'])
        person1_ocular_left = self._load_rgb(left_row['Image1'])
        person1_ocular_right = self._load_rgb(right_row['Image1'])
        person2_face = self._load_rgb(face_row['Image2'])
        person2_ocular_left = self._load_rgb(left_row['Image2'])
        person2_ocular_right = self._load_rgb(right_row['Image2'])

        # Apply transformations
        if self.transform_face:
            person1_face = self.transform_face(person1_face)
            person2_face = self.transform_face(person2_face)
        if self.transform_eyes:
            person1_ocular_left = self.transform_eyes(person1_ocular_left)
            person1_ocular_right = self.transform_eyes(person1_ocular_right)
            person2_ocular_left = self.transform_eyes(person2_ocular_left)
            person2_ocular_right = self.transform_eyes(person2_ocular_right)

        # Create label (1 for same person, 0 for different person)
        label = torch.tensor(int(face_row['Label']), dtype=torch.float32)

        return (person1_face, person1_ocular_left, person1_ocular_right,
                person2_face, person2_ocular_left, person2_ocular_right, label)


In [None]:
# Read the three pair lists (face, left ocular, right ocular)
csv_paths = [
    '/content/image_pairs_face.csv',
    '/content/image_pairs_ocular_left.csv',
    '/content/image_pairs_ocular_right.csv',
]
face_df, ocular_left_df, ocular_right_df = map(pd.read_csv, csv_paths)

# Wrap them in the paired dataset, with the face and ocular transforms attached
siamese_dataset = SiameseDataset(
    face_df,
    ocular_left_df,
    ocular_right_df,
    transform_face=transform_face,
    transform_eyes=transform_eyes,
)


In [None]:
# Split the dataset 70 / 15 / 15; the test split absorbs the rounding remainder.
num_samples = len(siamese_dataset)
train_size = int(0.7 * num_samples)
val_size = int(0.15 * num_samples)
test_size = num_samples - train_size - val_size

train_dataset, val_split, test_split = torch.utils.data.random_split(
    siamese_dataset, [train_size, val_size, test_size],
    generator=torch.Generator().manual_seed(42)  # Ensure reproducibility
)


def _without_random_augmentation(transform):
    """Return `transform` without its random flip/rotation steps.

    `None` and objects without a `transforms` list are returned unchanged.
    """
    steps = getattr(transform, 'transforms', None)
    if steps is None:
        return transform
    random_ops = (transforms.RandomHorizontalFlip, transforms.RandomRotation)
    return transforms.Compose([step for step in steps if not isinstance(step, random_ops)])


# Validation and test samples must not be randomly flipped or rotated, or the
# reported metrics change from one evaluation to the next. They are served
# from a second dataset over the same rows with only the deterministic steps,
# reusing the indices chosen by random_split.
eval_dataset = SiameseDataset(
    siamese_dataset.face_df,
    siamese_dataset.ocular_left_df,
    siamese_dataset.ocular_right_df,
    transform_face=_without_random_augmentation(siamese_dataset.transform_face),
    transform_eyes=_without_random_augmentation(siamese_dataset.transform_eyes),
)
val_dataset = torch.utils.data.Subset(eval_dataset, val_split.indices)
test_dataset = torch.utils.data.Subset(eval_dataset, test_split.indices)


batch_size = 32
num_workers = 0

# Create DataLoaders (only the training loader shuffles)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers)




### **Define the Model (Siamese mViT)**

In [None]:
class MobileViTEncoder(nn.Module):
    """Encode a (face, left ocular, right ocular) triple into one embedding.

    A single `mobilevit_xxs` backbone from timm (created with
    `pretrained=True, num_classes=0`) is applied to each of the three images;
    the three feature vectors are concatenated and projected to
    `embedding_dim` by a linear layer followed by ReLU.

    Args:
        embedding_dim: Size of the returned embedding. Defaults to 256.
    """

    def __init__(self, embedding_dim=256):
        super().__init__()
        # One backbone instance, reused for all three regions
        self.mobilevit = timm.create_model('mobilevit_xxs', pretrained=True, num_classes=0)
        self.feature_dim = self.mobilevit.num_features
        self.fc = nn.Linear(self.feature_dim * 3, embedding_dim)
        self.embedding_dim = embedding_dim

    def forward(self, face, ocular_left, ocular_right):
        """Return the ReLU-activated projection of the concatenated region features."""
        region_features = [
            self.mobilevit(region)
            for region in (face, ocular_left, ocular_right)
        ]
        # Concatenate along dim=1, in the order face, left, right
        fused = torch.cat(region_features, dim=1)
        return F.relu(self.fc(fused))



class SiameseMobileViTFace(nn.Module):
    """Siamese verification head on top of a shared encoder.

    Both triples are embedded by the same `encoder`; the element-wise absolute
    difference of the two embeddings goes through a hidden linear layer with
    ReLU and a final linear layer that yields one logit per pair.

    Args:
        encoder: Module called as `encoder(face, ocular_left, ocular_right)`
            and exposing `embedding_dim`.
        hidden_dim: Width of the hidden layer. Defaults to 512.
    """

    def __init__(self, encoder, hidden_dim=512):
        super().__init__()
        self.encoder = encoder  # used for both sides of the pair
        self.hidden_dim = hidden_dim

        self.fc1 = nn.Linear(encoder.embedding_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, 1)

    def forward(self, face1, ocular_left1, ocular_right1, face2, ocular_left2, ocular_right2):
        """Return one raw logit per pair (the trailing size-1 dimension is squeezed away)."""
        first = self.encoder(face1, ocular_left1, ocular_right1)
        second = self.encoder(face2, ocular_left2, ocular_right2)

        hidden = F.relu(self.fc1((first - second).abs()))
        return self.fc2(hidden).squeeze(1)



### **Training the Model**

In [None]:
# Pick the GPU when CUDA is available, otherwise fall back to the CPU
device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(device_name)
print(f"Using device: {device}")


In [None]:
# Capture the output lines of `nvidia-smi` and join them into one string.
gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
# The word 'failed' in the output is treated as "no GPU attached".
if gpu_info.find('failed') >= 0:
  print('Not connected to a GPU')
else:
  print(gpu_info)


In [None]:
def train_model(model, train_loader, val_loader, num_epochs=20, lr=1e-4):
    """Train the model with class-weighted BCE and keep the best checkpoint.

    Runs on the module-level `device`. After every epoch the model is scored
    with `evaluate_model` on `val_loader`; whenever validation accuracy
    improves, the state dict is saved locally and to Google Drive.

    Args:
        model: Network called with six image batches, returning one logit per pair.
        train_loader: DataLoader over the training pairs. Its dataset is either
            a `Subset` of a dataset exposing `face_df`, or such a dataset itself.
        val_loader: DataLoader over the validation pairs.
        num_epochs: Number of passes over `train_loader`.
        lr: Learning rate for Adam.

    Raises:
        ValueError: If the training labels do not contain both classes.
    """
    model.to(device)

    # Class balance is measured on the training samples only, so the loss
    # weighting is not influenced by validation/test rows.
    train_set = train_loader.dataset
    if hasattr(train_set, 'indices'):
        label_column = train_set.dataset.face_df['Label'].iloc[list(train_set.indices)]
    else:
        label_column = train_set.face_df['Label']
    label_values = np.asarray(label_column).astype(int)
    num_pos = int((label_values == 1).sum())
    num_neg = int((label_values == 0).sum())
    if num_pos == 0 or num_neg == 0:
        raise ValueError(
            f"Training labels need both classes (got {num_pos} positive, {num_neg} negative)."
        )
    # Equals the ratio of sklearn's 'balanced' weights for class 1 over class 0.
    pos_weight = torch.tensor(num_neg / num_pos, dtype=torch.float, device=device)

    # Loss function and optimizer
    criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)

    # Create the Drive folder up front so a missing directory fails before
    # training starts rather than after the first epoch.
    model_path_drive = '/content/drive/My Drive/Research/FacialRecognition/best_siamese_mobilevitface_model.pth'
    os.makedirs(os.path.dirname(model_path_drive), exist_ok=True)

    # Start below any reachable accuracy so the first epoch always writes a checkpoint.
    best_val_acc = float('-inf')
    for epoch in range(num_epochs):
        print(f"\nEpoch {epoch+1}/{num_epochs}")
        model.train()
        running_loss = 0.0

        for batch in tqdm(train_loader, desc="Training"):
            face1, ocular_left1, ocular_right1, face2, ocular_left2, ocular_right2, batch_labels = batch
            face1, ocular_left1, ocular_right1 = face1.to(device), ocular_left1.to(device), ocular_right1.to(device)
            face2, ocular_left2, ocular_right2 = face2.to(device), ocular_left2.to(device), ocular_right2.to(device)
            batch_labels = batch_labels.to(device)

            optimizer.zero_grad()
            outputs = model(face1, ocular_left1, ocular_right1, face2, ocular_left2, ocular_right2)
            loss = criterion(outputs, batch_labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        avg_loss = running_loss / len(train_loader)
        print(f"Training Loss: {avg_loss:.4f}")

        # Validation step
        val_acc, val_loss = evaluate_model(model, val_loader, device, criterion)
        print(f"Validation Accuracy: {val_acc:.4f}, Validation Loss: {val_loss:.4f}")

        # Step the scheduler (learning rate is multiplied by 0.1 every 10 epochs)
        scheduler.step()

        # Save the best model based on validation accuracy
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            torch.save(model.state_dict(), "best_siamese_mobilevitface_model.pth")
            torch.save(model.state_dict(), model_path_drive)
            print(f"Best model saved to Google Drive at: {model_path_drive}")

    print("Training Complete!")

def evaluate_model(model, data_loader, device, criterion):
    """Score `model` on `data_loader` without tracking gradients.

    Args:
        model: Network called with six image batches, returning one logit per pair.
        data_loader: Yields batches of six image tensors followed by the labels.
        device: Device the batch tensors are moved to.
        criterion: Loss applied to (logits, labels).

    Returns:
        Tuple `(accuracy, avg_loss)`: accuracy of the sigmoid outputs
        thresholded at 0.5, and the loss averaged over the batches.
    """
    model.eval()
    predictions, targets = [], []
    total_loss = 0.0

    with torch.no_grad():
        for batch in tqdm(data_loader, desc="Evaluating"):
            *images, labels = batch
            images = [tensor.to(device) for tensor in images]
            labels = labels.to(device)

            logits = model(*images)
            total_loss += criterion(logits, labels).item()

            # A probability of at least 0.5 counts as "same person"
            predicted_same = torch.sigmoid(logits) >= 0.5
            predictions.extend(predicted_same.cpu().numpy())
            targets.extend(labels.cpu().numpy())

    return accuracy_score(targets, predictions), total_loss / len(data_loader)

def evaluate_on_test_set(model, test_loader, device):
    """Print test metrics and plot the confusion matrix and ROC curve.

    Predictions are the sigmoid outputs thresholded at 0.5. Accuracy,
    precision, recall, F1 and the confusion matrix are printed; the confusion
    matrix is drawn as a heatmap and the ROC curve via `plot_roc_curve`.

    Args:
        model: Network called with six image batches, returning one logit per pair.
        test_loader: Yields batches of six image tensors followed by the labels.
        device: Device the batch tensors are moved to.
    """
    model.eval()
    all_preds = []
    all_labels = []
    all_probs = []

    with torch.no_grad():
        for batch in tqdm(test_loader, desc="Testing"):
            face1, ocular_left1, ocular_right1, face2, ocular_left2, ocular_right2, labels = batch
            face1, ocular_left1, ocular_right1 = face1.to(device), ocular_left1.to(device), ocular_right1.to(device)
            face2, ocular_left2, ocular_right2 = face2.to(device), ocular_left2.to(device), ocular_right2.to(device)
            labels = labels.to(device)

            outputs = model(face1, ocular_left1, ocular_right1, face2, ocular_left2, ocular_right2)
            probs = torch.sigmoid(outputs)
            preds = (probs >= 0.5).int()

            all_preds.extend(preds.cpu().numpy())
            # Labels arrive as float 0.0/1.0; store them as ints like the predictions.
            all_labels.extend(labels.cpu().numpy().astype(int))
            all_probs.extend(probs.cpu().numpy())

    # Compute metrics
    accuracy = accuracy_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds, zero_division=0)
    recall = recall_score(all_labels, all_preds, zero_division=0)
    f1 = f1_score(all_labels, all_preds, zero_division=0)
    # Fix the class order so the matrix is always 2x2 and matches the two tick
    # labels below, even when only one class shows up in labels and predictions.
    cm = confusion_matrix(all_labels, all_preds, labels=[0, 1])

    # Print metrics
    print(f"\nTest Accuracy: {accuracy:.4f}")
    print(f"Test Precision: {precision:.4f}")
    print(f"Test Recall: {recall:.4f}")
    print(f"Test F1-Score: {f1:.4f}")
    print("Confusion Matrix:")
    print(cm)

    # Plot Confusion Matrix
    plt.figure(figsize=(6,4))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=['Different', 'Same'],
                yticklabels=['Different', 'Same'])
    plt.xlabel('Predicted')
    plt.ylabel('Actual')
    plt.title('Confusion Matrix')
    plt.show()

    # Plot ROC Curve
    plot_roc_curve(all_labels, all_probs)

def plot_roc_curve(labels, probs):
    """Draw the ROC curve for `probs` against `labels`, with the AUC in the legend.

    Args:
        labels: True binary labels.
        probs: Predicted scores for the positive class.
    """
    false_pos_rate, true_pos_rate, _ = roc_curve(labels, probs)
    area = auc(false_pos_rate, true_pos_rate)

    _, ax = plt.subplots(figsize=(8,6))
    ax.plot(false_pos_rate, true_pos_rate, color='darkorange',
            lw=2, label=f'ROC curve (AUC = {area:.2f})')
    # Dashed diagonal for visual reference
    ax.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
    ax.set_xlim([0.0, 1.0])
    ax.set_ylim([0.0, 1.05])
    ax.set_xlabel('False Positive Rate')
    ax.set_ylabel('True Positive Rate')
    ax.set_title('Receiver Operating Characteristic')
    ax.legend(loc="lower right")
    plt.show()


In [None]:
import gc
# Run Python's garbage collector first so unreferenced objects are freed.
gc.collect()

# Then ask PyTorch to release the CUDA memory it is caching.
torch.cuda.empty_cache()
# To drop a specific large object, `del` it before the calls above.


In [None]:
# Build the encoder and the Siamese model that uses it for both sides of a pair
encoder = MobileViTEncoder(embedding_dim=256)
model = SiameseMobileViTFace(encoder=encoder, hidden_dim=512)

# One dummy input per forward() argument, in order:
# face1, ocular_left1, ocular_right1, face2, ocular_left2, ocular_right2
image_shape = (1, 3, 128, 128)
summary(model, input_size=[image_shape] * 6)


### **Start training**

In [None]:
# Train for 20 epochs with Adam at lr=1e-4. train_model saves the checkpoint
# with the best validation accuracy itself.
train_model(model, train_loader, val_loader, num_epochs=20, lr=1e-4)


### **Load trained Model**

In [None]:
def load_siamese_model(model_path, device):
    """Rebuild the Siamese model and load saved weights into it.

    The architecture is recreated with the same settings used for training
    (`embedding_dim=256`, `hidden_dim=512`), then the state dict stored at
    `model_path` is loaded.

    Args:
        model_path: Path to a state dict written with `torch.save`.
        device: Device to map the weights to and to place the model on.

    Returns:
        The model on `device`, switched to evaluation mode.
    """
    encoder = MobileViTEncoder(embedding_dim=256)
    model = SiameseMobileViTFace(encoder=encoder, hidden_dim=512)
    # The checkpoint is a plain state dict, so restrict unpickling to tensors
    # and containers instead of allowing arbitrary pickled objects.
    state_dict = torch.load(model_path, map_location=device, weights_only=True)
    model.load_state_dict(state_dict)
    model.to(device)
    model.eval()
    return model

# Specify the path to the best model saved on Google Drive
model_path = '/content/drive/My Drive/Research/FacialRecognition/best_siamese_mobilevitface_model.pth'

# Load the model
model = load_siamese_model(model_path, device)
print("Model loaded successfully.")


### **Evaluation**

In [None]:
# Print accuracy, precision, recall and F1 on the test split, then plot the
# confusion matrix and the ROC curve.
evaluate_on_test_set(model, test_loader, device)
