In [126]:
import os
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
import torch.nn as nn
import torch.nn.functional as F
import pandas as pd

# 1. Define data paths
root_dir = '/Users/dionbaldsing/Documents/CMU/AIFORMED/projectFahad/archiveLFW'
images_dir = os.path.join(root_dir, 'lfw-deepfunneled', 'lfw-deepfunneled')

# Paths to the  CSV files
matchpairs_train_csv = os.path.join(root_dir, 'matchpairsDevTrain.csv')
mismatchpairs_train_csv = os.path.join(root_dir, 'mismatchpairsDevTrain.csv')
matchpairs_test_csv = os.path.join(root_dir, 'matchpairsDevTest.csv')
mismatchpairs_test_csv = os.path.join(root_dir, 'mismatchpairsDevTest.csv')

#  transformations
transform = transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.ToTensor(),
    # Add normalization if needed
    # transforms.Normalize(mean=[0.5], std=[0.5])
])

#  Dataset class boiler code from snippets taken from kaggle.com
class LFWDataset(Dataset):
    def __init__(self, matches_csv, mismatches_csv, images_dir, transform=None):
        self.images_dir = images_dir
        self.transform = transform

        # Load matched pairs
        self.matches = pd.read_csv(matches_csv)
        self.matches['label'] = 1  # Matched pairs have label 1

        # Adjusted column names for matched pairs
        self.matches['img1_path'] = self.matches.apply(
            lambda row: os.path.join(
                self.images_dir,
                row['name'].strip(),
                f"{row['name'].strip()}_{int(row['imagenum1']):04d}.jpg"
            ), axis=1)
        self.matches['img2_path'] = self.matches.apply(
            lambda row: os.path.join(
                self.images_dir,
                row['name'].strip(),
                f"{row['name'].strip()}_{int(row['imagenum2']):04d}.jpg"
            ), axis=1)

        # Load mismatched pairs
        self.mismatches = pd.read_csv(mismatches_csv)
        self.mismatches['label'] = 0  # Mismatched pairs have label 0

        # Adjusted column names for mismatched pairs
        self.mismatches['img1_path'] = self.mismatches.apply(
            lambda row: os.path.join(
                self.images_dir,
                row['name'].strip(),
                f"{row['name'].strip()}_{int(row['imagenum1']):04d}.jpg"
            ), axis=1)
        self.mismatches['img2_path'] = self.mismatches.apply(
            lambda row: os.path.join(
                self.images_dir,
                row['name.1'].strip(),
                f"{row['name.1'].strip()}_{int(row['imagenum2']):04d}.jpg"
            ), axis=1)

        # Combine matches and mismatches
        self.data = pd.concat(
            [self.matches[['img1_path', 'img2_path', 'label']],
             self.mismatches[['img1_path', 'img2_path', 'label']]],
            ignore_index=True)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img1_path = self.data.iloc[idx]['img1_path']
        img2_path = self.data.iloc[idx]['img2_path']
        label = self.data.iloc[idx]['label']

        # Load images with error handling
        try:
            img1 = Image.open(img1_path).convert('RGB')
        except FileNotFoundError:
            print(f"File not found: {img1_path}")
            return self.__getitem__((idx + 1) % len(self))

        try:
            img2 = Image.open(img2_path).convert('RGB')
        except FileNotFoundError:
            print(f"File not found: {img2_path}")
            return self.__getitem__((idx + 1) % len(self))

        # Apply transformations
        if self.transform:
            img1 = self.transform(img1)
            img2 = self.transform(img2)

        label = torch.tensor([label], dtype=torch.float32)

        return img1, img2, label

# 4. Creating dataloaders
train_dataset = LFWDataset(
    matches_csv=matchpairs_train_csv,
    mismatches_csv=mismatchpairs_train_csv,
    images_dir=images_dir,
    transform=transform
)

test_dataset = LFWDataset(
    matches_csv=matchpairs_test_csv,
    mismatches_csv=mismatchpairs_test_csv,
    images_dir=images_dir,
    transform=transform
)

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, num_workers=0)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False, num_workers=0)

#  implementing the Siamese Network
class SiameseNetwork(nn.Module):
    def __init__(self):
        super(SiameseNetwork, self).__init__()
        self.cnn = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=10),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2),  # Output: (batch_size, 64, 59, 59)
            nn.Conv2d(64, 128, kernel_size=7),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2),  # Output: (batch_size, 128, 26, 26)
            nn.Conv2d(128, 128, kernel_size=4),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2),  # Output: (batch_size, 128, 11, 11)
            nn.Conv2d(128, 256, kernel_size=4),
            nn.ReLU(inplace=True),  # Output: (batch_size, 256, 8, 8)
            # No MaxPool here
        )
        self.fc = nn.Sequential(
            nn.Linear(256 * 8 * 8, 4096),  # input size
            nn.Sigmoid()
        )

    def forward_once(self, x):
        output = self.cnn(x)
        output = output.view(output.size(0), -1)  # flattenning the image
        output = self.fc(output)
        return output

    def forward(self, input1, input2):
        output1 = self.forward_once(input1)
        output2 = self.forward_once(input2)
        return output1, output2

# 6. Loss function and optimizer
class ContrastiveLoss(nn.Module):
    def __init__(self, margin=1.0):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin

    def forward(self, output1, output2, label):
        euclidean_distance = F.pairwise_distance(output1, output2)
        # Using constrastive Loss function
        loss = torch.mean(
            (1 - label) * torch.pow(euclidean_distance, 2) +  # For matched pairs
            label * torch.pow(torch.clamp(self.margin - euclidean_distance, min=0.0), 2)  # For mismatched pairs
        )
        return loss

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = SiameseNetwork().to(device)
criterion = ContrastiveLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0005)


num_epochs = 5  # Adjust as needed
for epoch in range(1, num_epochs + 1):
    model.train()
    running_loss = 0.0
    for batch_idx, (img1, img2, label) in enumerate(train_loader):
        img1, img2, label = img1.to(device), img2.to(device), label.to(device)
        optimizer.zero_grad()
        output1, output2 = model(img1, img2)
        loss = criterion(output1, output2, label)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

        print(f"Epoch [{epoch}/{num_epochs}], Batch [{batch_idx+1}/{len(train_loader)}], Loss: {loss.item():.4f}")

    epoch_loss = running_loss / len(train_loader)
    print(f"Epoch [{epoch}/{num_epochs}] completed. Average Loss: {epoch_loss:.4f}\n")
    
# 8. accuracy calculation
def evaluate(model, dataloader, device, threshold=1.0):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for batch_idx, (img1, img2, label) in enumerate(dataloader):
            img1, img2, label = img1.to(device), img2.to(device), label.to(device)
            output1, output2 = model(img1, img2)
            euclidean_distance = F.pairwise_distance(output1, output2)
            # Predict similar if distance < threshold
            prediction = (euclidean_distance < threshold).float()
            correct += (prediction == label).sum().item()
            total += label.size(0)

            if (batch_idx + 1) % 10 == 0:
                print(f"Evaluation Batch [{batch_idx+1}/{len(dataloader)}]")

    accuracy = correct / total
    return accuracy

test_accuracy = evaluate(model, test_loader, device)
print(f"Test Accuracy: {test_accuracy * 10:.2f}%")

Epoch [1/5], Batch [1/138], Loss: 0.3268
Epoch [1/5], Batch [2/138], Loss: 1.8375
Epoch [1/5], Batch [3/138], Loss: 0.3281
Epoch [1/5], Batch [4/138], Loss: 1.0777
Epoch [1/5], Batch [5/138], Loss: 0.3257
Epoch [1/5], Batch [6/138], Loss: 0.6352
Epoch [1/5], Batch [7/138], Loss: 0.3866
Epoch [1/5], Batch [8/138], Loss: 0.4977
Epoch [1/5], Batch [9/138], Loss: 0.5233
Epoch [1/5], Batch [10/138], Loss: 0.3650
Epoch [1/5], Batch [11/138], Loss: 0.2257
Epoch [1/5], Batch [12/138], Loss: 0.3076
Epoch [1/5], Batch [13/138], Loss: 0.2805
Epoch [1/5], Batch [14/138], Loss: 0.4130
Epoch [1/5], Batch [15/138], Loss: 0.3232
Epoch [1/5], Batch [16/138], Loss: 0.3479
Epoch [1/5], Batch [17/138], Loss: 0.3024
Epoch [1/5], Batch [18/138], Loss: 0.2857
Epoch [1/5], Batch [19/138], Loss: 0.3265
Epoch [1/5], Batch [20/138], Loss: 0.2774
Epoch [1/5], Batch [21/138], Loss: 0.3592
Epoch [1/5], Batch [22/138], Loss: 0.2836
Epoch [1/5], Batch [23/138], Loss: 0.2872
Epoch [1/5], Batch [24/138], Loss: 0.2616
E

In [127]:
#  Saving the trained model
model_save_path = 'siamese_network.pth'
torch.save(model.state_dict(), model_save_path)
print(f"Model saved to {model_save_path}")


Model saved to siamese_network.pth


In [125]:
inference_model = SiameseNetwork().to(device)

# Load the saved state dictionary
inference_model.load_state_dict(torch.load(model_save_path, map_location=device))
inference_model.eval()

# 12. Testing with two images
def compare_images(model, image1_path, image2_path, transform, device, threshold=1.0):
    # Load and preprocess images
    img1 = Image.open(image1_path).convert('RGB')
    img2 = Image.open(image2_path).convert('RGB')

    img1 = transform(img1).unsqueeze(0).to(device)
    img2 = transform(img2).unsqueeze(0).to(device)

    # Obtain embeddings
    with torch.no_grad():
        output1, output2 = model(img1, img2)

    # Compute Euclidean distance
    euclidean_distance = F.pairwise_distance(output1, output2)
    distance = euclidean_distance.item()

    # Make prediction
    is_same_person = distance < threshold

    # Output the result
    print(f"Euclidean Distance: {distance:.4f}")
    if is_same_person:
        print("The images are of the **same** person.")
    else:
        print("The images are of **different** people.")

    return distance, is_same_person

# Example usage:
# Paths to your test images
test_image1_path = '/Users/dionbaldsing/Downloads/dion/d1.jpeg'
test_image2_path = '/Users/dionbaldsing/Downloads/dion/d2.jpeg'

# Compare the images
distance, is_same = compare_images(
    model=inference_model,
    image1_path=test_image1_path,
    image2_path=test_image2_path,
    transform=transform,
    device=device,
    threshold=0.65  # Adjust the threshold if necessary
)

RuntimeError: Error(s) in loading state_dict for SiameseNetwork:
	Missing key(s) in state_dict: "cnn.1.weight", "cnn.1.bias", "cnn.1.running_mean", "cnn.1.running_var", "cnn.4.weight", "cnn.4.bias", "cnn.5.weight", "cnn.5.bias", "cnn.5.running_mean", "cnn.5.running_var", "cnn.8.weight", "cnn.8.bias", "cnn.9.running_mean", "cnn.9.running_var", "cnn.12.weight", "cnn.12.bias", "cnn.13.weight", "cnn.13.bias", "cnn.13.running_mean", "cnn.13.running_var", "fc.1.weight", "fc.1.bias", "fc.1.running_mean", "fc.1.running_var", "fc.4.weight", "fc.4.bias", "fc.5.weight", "fc.5.bias", "fc.5.running_mean", "fc.5.running_var", "fc.8.weight", "fc.8.bias", "fc.9.weight", "fc.9.bias", "fc.9.running_mean", "fc.9.running_var", "fc.12.weight", "fc.12.bias". 
	Unexpected key(s) in state_dict: "cnn.3.weight", "cnn.3.bias", "cnn.6.weight", "cnn.6.bias". 
	size mismatch for cnn.9.weight: copying a param with shape torch.Size([256, 128, 4, 4]) from checkpoint, the shape in current model is torch.Size([128]).
	size mismatch for cnn.9.bias: copying a param with shape torch.Size([256]) from checkpoint, the shape in current model is torch.Size([128]).
	size mismatch for fc.0.weight: copying a param with shape torch.Size([4096, 16384]) from checkpoint, the shape in current model is torch.Size([1024, 16384]).
	size mismatch for fc.0.bias: copying a param with shape torch.Size([4096]) from checkpoint, the shape in current model is torch.Size([1024]).