In [1]:
import cv2
import os
import random
import numpy as np
from matplotlib import pyplot as plt
from PIL import Image
import random

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
from torchsummary import summary

In [2]:
#use the GPU when PyTorch can see one, otherwise stay on the CPU
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

In [3]:
#folder structure
#each image role gets its own sub-folder under 'data'
POS_PATH, NEG_PATH, ANC_PATH = (
    os.path.join('data', role) for role in ('positive', 'negative', 'anchor')
)

In [None]:
#make directories
#exist_ok=True keeps this cell re-runnable once the folders already exist
for folder in (POS_PATH, NEG_PATH, ANC_PATH):
    os.makedirs(folder, exist_ok=True)

In [None]:
#dataset - https://www.kaggle.com/datasets/jessicali9530/lfw-dataset?resource=download
#extracting lfw dataset
#expects lfw.zip in the notebook's working directory; the following cell reads the extracted 'lfw' folder
!tar -xf lfw.zip

In [None]:
# move lfw images to data/negative
# every sub-directory of 'lfw' is flattened into NEG_PATH; stray files sitting
# directly under 'lfw' are skipped instead of crashing the inner os.listdir
for directory in os.listdir('lfw'):
    person_dir = os.path.join('lfw', directory)
    if not os.path.isdir(person_dir):
        continue
    for file in os.listdir(person_dir):
        EX_PATH = os.path.join(person_dir, file)
        NEW_PATH = os.path.join(NEG_PATH, file)
        os.replace(EX_PATH, NEW_PATH)

In [5]:
#uuid to generate unique image names
import uuid

In [4]:
#image capturing for anchors, positives
#keys: 'a' saves the current crop as an anchor, 'p' saves it as a positive, 'q' quits
cap = cv2.VideoCapture(0)
try:
    while cap.isOpened():
        ret, frame = cap.read()

        #a failed grab returns ret=False and frame=None; stop instead of slicing None
        if not ret:
            break

        #cut down frame to 250x250
        frame = frame[120:120+250, 200:200+250, :]

        cv2.imshow('Image Collection', frame)

        #poll the keyboard once per frame: each waitKey call consumes the pending
        #key event, so polling several times per frame drops key presses
        key = cv2.waitKey(1) & 0xFF

        #collect anchors
        if key == ord('a'):
            imgname = os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1()))
            cv2.imwrite(imgname, frame)

        #collect positives
        elif key == ord('p'):
            imgname = os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1()))
            cv2.imwrite(imgname, frame)

        elif key == ord('q'):
            break
finally:
    #always free the camera and close the preview window, even after an error
    cap.release()
    cv2.destroyAllWindows()

In [6]:
#load data paths
def _list_jpgs(folder, limit=300):
    """Return the paths of at most `limit` .jpg files from `folder`.

    File names are sorted before truncating because os.listdir returns entries
    in arbitrary order, which would make the selected subset differ between runs.
    """
    names = sorted(f for f in os.listdir(folder) if f.endswith(".jpg"))
    return [os.path.join(folder, f) for f in names[:limit]]

anchor = _list_jpgs(ANC_PATH)
positive = _list_jpgs(POS_PATH)
negative = _list_jpgs(NEG_PATH)

In [7]:
#preprocessing
#resize every image to 100x100, then convert it to a tensor
preprocess = transforms.Compose(
    [transforms.Resize((100,100)), transforms.ToTensor()]
)

def load_and_preprocess(file_path):
    """Open the image at `file_path`, convert it to RGB and apply `preprocess`."""
    rgb_image = Image.open(file_path).convert("RGB")
    return preprocess(rgb_image)

In [8]:
#probe the pipeline with the first collected anchor instead of a machine-specific file path
img1 = load_and_preprocess(anchor[0])

In [9]:
#sanity check: smallest value in the preprocessed tensor
img1.min()

tensor(0.2784)

In [10]:
#creating labelled dataset
#label 1 = (anchor, positive) pair, label 0 = (anchor, negative) pair
positives = [pair + (1,) for pair in zip(anchor, positive)]
negatives = [pair + (0,) for pair in zip(anchor, negative)]

data = [*positives, *negatives]

In [11]:
#splitting dataset
#shuffle a copy with a fixed seed: the split is then identical on every run and
#re-running this cell does not reshuffle an already shuffled list
SPLIT_SEED = 42
shuffled_data = list(data)
random.Random(SPLIT_SEED).shuffle(shuffled_data)

#70% of the pairs for training, the remainder for testing
split = int(0.7 * len(shuffled_data))
train_data = shuffled_data[:split]
test_data = shuffled_data[split:]

In [12]:
#dataset
class SiameseDataset(Dataset):
    """Dataset over (image path, image path, label) triples.

    Images are read from disk and preprocessed when an item is requested.
    """

    def __init__(self, pairs):
        #sequence of (img1_path, img2_path, label) tuples
        self.pairs = pairs

    def __len__(self):
        """Number of pairs in the dataset."""
        return len(self.pairs)

    def __getitem__(self, idx):
        """Return (image tensor, image tensor, float32 label tensor) for pair `idx`."""
        first_path, second_path, target = self.pairs[idx]
        first_image = load_and_preprocess(first_path)
        second_image = load_and_preprocess(second_path)
        target_tensor = torch.tensor(target, dtype=torch.float32)
        return first_image, second_image, target_tensor

In [13]:
#loading dataset
#wrap each split of path pairs in a SiameseDataset
train_dataset, test_dataset = (
    SiameseDataset(pairs) for pairs in (train_data, test_data)
)

In [14]:
#dataloaders
BATCH_SIZE = 32

#training pairs are reshuffled by the loader; test pairs keep their order
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

In [15]:
#embedding layer
class EmbeddingNet(nn.Module):
    """Convolutional encoder producing a 4096-dimensional sigmoid embedding.

    The fully connected layer expects a 256x5x5 feature map, which is what a
    3x100x100 input yields after the four convolution stages.
    """

    def __init__(self):
        super().__init__()

        #stage 1: 10x10 convolution followed by 2x2 max-pooling
        self.conv1 = nn.Conv2d(3, 64, kernel_size=10)
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)

        #stage 2: 7x7 convolution followed by 2x2 max-pooling
        self.conv2 = nn.Conv2d(64, 128, kernel_size=7)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)

        #stage 3: 4x4 convolution followed by 2x2 max-pooling
        self.conv3 = nn.Conv2d(128, 128, kernel_size=4)
        self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2)

        #stage 4: last 4x4 convolution, no pooling afterwards
        self.conv4 = nn.Conv2d(128, 256, kernel_size=4)

        #dense projection of the flattened feature map
        self.fc = nn.Linear(256 * 5 * 5, 4096)

    def forward(self, x):
        """Map a batch of images to embeddings of shape (batch, 4096)."""
        pooled_stages = (
            (self.conv1, self.pool1),
            (self.conv2, self.pool2),
            (self.conv3, self.pool3),
        )
        for conv, pool in pooled_stages:
            x = pool(F.relu(conv(x)))

        feature_map = F.relu(self.conv4(x))

        #flatten everything except the batch dimension
        flat = feature_map.view(feature_map.size(0), -1)

        return torch.sigmoid(self.fc(flat))

In [16]:
#instantiate the embedding network; it is passed to SiameseNetwork further down
embedding = EmbeddingNet()

In [17]:
#torchsummary defaults to device="cuda" and then feeds CUDA tensors to the model,
#so put the model on `device` and tell summary which device that is
summary(embedding.to(device), input_size=(3, 100, 100), device=device)

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [-1, 64, 91, 91]          19,264
         MaxPool2d-2           [-1, 64, 45, 45]               0
            Conv2d-3          [-1, 128, 39, 39]         401,536
         MaxPool2d-4          [-1, 128, 19, 19]               0
            Conv2d-5          [-1, 128, 16, 16]         262,272
         MaxPool2d-6            [-1, 128, 8, 8]               0
            Conv2d-7            [-1, 256, 5, 5]         524,544
            Linear-8                 [-1, 4096]      26,218,496
Total params: 27,426,112
Trainable params: 27,426,112
Non-trainable params: 0
----------------------------------------------------------------
Input size (MB): 0.11
Forward/backward pass size (MB): 7.26
Params size (MB): 104.62
Estimated Total Size (MB): 112.00
----------------------------------------------------------------


In [18]:
#siamese l1 distance layer
class L1Dist(nn.Module):
    """Parameter-free layer returning the element-wise absolute difference of two embeddings."""

    def forward(self, input_embedding, validation_embedding):
        difference = input_embedding - validation_embedding
        return difference.abs()

In [19]:
#siamese model
class SiameseNetwork(nn.Module):
    """Scores a pair of images with a shared embedding network.

    Both images pass through the same `embedding_model`; the element-wise L1
    distance between the two embeddings is fed to a linear layer with a sigmoid,
    giving one value in (0, 1) per pair.
    """

    def __init__(self, embedding_model):
        super().__init__()

        self.embedding = embedding_model
        self.l1 = L1Dist()
        #4096 matches the size of the embedding vector
        self.classifier = nn.Sequential(nn.Linear(4096, 1), nn.Sigmoid())

    def forward(self, img1, img2):
        """Return a (batch, 1) tensor of scores for the image pairs."""
        #the same weights embed both inputs
        emb1, emb2 = self.embedding(img1), self.embedding(img2)

        #distance -> classifier
        return self.classifier(self.l1(emb1, emb2))

In [20]:
#final siamese model
#move the network to `device`: train_step sends every batch there, and a model
#left on the CPU fails with a device mismatch as soon as CUDA is available
siamese_model = SiameseNetwork(embedding_model=embedding).to(device)

In [21]:
#training hyper-parameters
LEARNING_RATE = 0.0001
epochs = 50

#loss function: binary cross-entropy on the model's sigmoid output
criterion = nn.BCELoss()
#optimizer
optimizer = optim.Adam(siamese_model.parameters(), lr=LEARNING_RATE)

In [22]:
#create checkpoint directory
checkpoint_dir = "./training_checkpoints"
os.makedirs(checkpoint_dir, exist_ok=True)

#path prefix for checkpoint files inside that directory
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")

In [23]:
#saving checkpoint
#write the current model/optimizer state to checkpoint_prefix and read it straight back
#'epoch' stores the configured number of epochs, not a count of completed epochs
torch.save({
    'epoch': epochs,
    'model_state_dict': siamese_model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict()
}, checkpoint_prefix)

#map_location makes the checkpoint loadable on a machine without the device it was saved from
checkpoint = torch.load(checkpoint_prefix, map_location=device)
siamese_model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
epochs = checkpoint['epoch']

In [24]:
#training step
def train_step(batch, model, criterion, optimizer):
    """Run one optimisation step on a single batch and return its loss.

    Args:
        batch: (img1, img2, labels) tensors; labels has shape (B,).
        model: network called as model(img1, img2), returning shape (B, 1).
        criterion: loss called as criterion(outputs, targets).
        optimizer: optimizer that updates the parameters of `model`.

    Returns:
        The batch loss as a Python float.
    """
    model.train()

    img1, img2, labels = batch      #unpacking

    #send the batch to wherever the model's weights live rather than to a global
    #device, so inputs and weights can never end up on different devices
    first_param = next(model.parameters(), None)
    if first_param is not None:
        target_device = first_param.device
        img1 = img1.to(target_device)
        img2 = img2.to(target_device)
        labels = labels.to(target_device)

    optimizer.zero_grad()

    outputs = model(img1, img2)     #forward pass

    loss = criterion(outputs, labels.unsqueeze(1))   #BCE loss requires shape (B,1)

    loss.backward()
    optimizer.step()

    return loss.item()

In [25]:
#training loop
def train(model, train_loader, criterion, optimizer, epochs):
    """Train `model` for `epochs` passes over `train_loader`.

    Prints the loss of every batch and the mean batch loss of every epoch.
    """
    for epoch in range(1, epochs + 1):
        print(f"\nEpoch {epoch}/{epochs}")

        batch_losses = []
        for batch_number, batch in enumerate(train_loader, start=1):
            batch_loss = train_step(batch, model, criterion, optimizer)
            batch_losses.append(batch_loss)

            print(f"Batch {batch_number}/{len(train_loader)} | Loss: {batch_loss:.4f}")

        #mean over the number of batches the loader reports
        avg_loss = sum(batch_losses, 0.0) / len(train_loader)
        print(f"→ Epoch {epoch} Average Loss: {avg_loss:.4f}")

In [33]:
#run the training loop; `epochs` is the value read back from the checkpoint
train(siamese_model, train_loader, criterion, optimizer, epochs)


Epoch 1/50
Batch 1/14 | Loss: 0.6658
Batch 2/14 | Loss: 0.6387
Batch 3/14 | Loss: 0.5964
Batch 4/14 | Loss: 0.5323
Batch 5/14 | Loss: 0.5686
Batch 6/14 | Loss: 0.5947
Batch 7/14 | Loss: 0.4655
Batch 8/14 | Loss: 0.3756
Batch 9/14 | Loss: 0.4273
Batch 10/14 | Loss: 0.2563
Batch 11/14 | Loss: 0.3962
Batch 12/14 | Loss: 0.3203
Batch 13/14 | Loss: 0.2781
Batch 14/14 | Loss: 0.2625
→ Epoch 1 Average Loss: 0.4556

Epoch 2/50
Batch 1/14 | Loss: 0.3931
Batch 2/14 | Loss: 0.3871
Batch 3/14 | Loss: 0.2609
Batch 4/14 | Loss: 0.2451
Batch 5/14 | Loss: 0.2123
Batch 6/14 | Loss: 0.2913
Batch 7/14 | Loss: 0.2795
Batch 8/14 | Loss: 0.1457
Batch 9/14 | Loss: 0.2578
Batch 10/14 | Loss: 0.3228
Batch 11/14 | Loss: 0.2225
Batch 12/14 | Loss: 0.2058
Batch 13/14 | Loss: 0.2067
Batch 14/14 | Loss: 0.0378
→ Epoch 2 Average Loss: 0.2477

Epoch 3/50
Batch 1/14 | Loss: 0.2685
Batch 2/14 | Loss: 0.1643
Batch 3/14 | Loss: 0.1274
Batch 4/14 | Loss: 0.1315
Batch 5/14 | Loss: 0.0974
Batch 6/14 | Loss: 0.1874
Batch 7/

In [34]:
#import metrics
from sklearn.metrics import precision_score, recall_score

In [35]:
#get one batch from test loader
test_batch = next(iter(test_loader))
test_img1, test_img2, y_true = test_batch

#images go to the compute device; labels become a numpy array for sklearn
test_img1, test_img2 = test_img1.to(device), test_img2.to(device)
y_true_np = y_true.numpy()

In [36]:
#evaluation
#eval mode, with gradient tracking switched off for the forward pass
siamese_model.eval()
with torch.no_grad():
    scores = siamese_model(test_img1, test_img2)
y_hat = scores.cpu().numpy()

In [45]:
#adding threshold of 50% to get 0/1 result
THRESHOLD = 0.5
above_threshold = np.greater(y_hat, THRESHOLD)
y_pred = above_threshold.astype(int)
print("Predictions:", y_pred.tolist())

Predictions: [[1], [0], [0], [1], [1], [1], [0], [0], [0], [1], [1], [0], [1], [0], [1], [0], [1], [0], [1], [0], [1], [0], [1], [1], [0], [1], [0], [1], [0], [0], [1], [1]]


In [46]:
#model metrics
#NOTE: these are computed on the single batch taken from test_loader, not on the whole test set
precision = precision_score(y_true=y_true_np, y_pred=y_pred)
recall = recall_score(y_true=y_true_np, y_pred=y_pred)

for metric_label, metric_value in (("Precision:", precision), ("Recall:", recall)):
    print(metric_label, metric_value)

Precision: 1.0
Recall: 1.0


In [39]:
#saving model
#only the weights are stored, wrapped in a dict under "model_state_dict"
model_weights = {"model_state_dict": siamese_model.state_dict()}
torch.save(model_weights, "siamese_model.pth")

In [None]:
#reloading model

#rebuild the same architecture the weights were saved from
embedding_model = EmbeddingNet()
loaded_model = SiameseNetwork(embedding_model)

#load weights
checkpoint = torch.load("siamese_model.pth", map_location=device)
saved_weights = checkpoint["model_state_dict"]
loaded_model.load_state_dict(saved_weights)

#move to the compute device and switch to eval mode
loaded_model.to(device).eval()

In [44]:
#strict=True makes load_state_dict fail on any missing or unexpected key; the printed result reports the key match
#NOTE(review): this loads into siamese_model, not loaded_model — confirm which model was meant to be checked
print(siamese_model.load_state_dict(checkpoint["model_state_dict"], strict=True))

<All keys matched successfully>
