In [1]:
import cv2
import os
import random
import numpy as np
from matplotlib import pyplot as plt
import glob

In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F

from torch.utils.data import Dataset, DataLoader, ConcatDataset, random_split, Subset
# from torch.nn import Linear, Conv2d, MaxPool2d, Flatten, BatchNorm2d, LayerNorm

import torch.optim as optim

from torchvision import transforms

from torchsummary import summary
from torch.utils.tensorboard import SummaryWriter
%load_ext tensorboard

In [3]:
# GPU Details
if torch.cuda.is_available():
    print("CUDA is available. Details of available GPU(s):")
    num_gpus = torch.cuda.device_count()
    print(f"Number of GPU(s) available: {num_gpus}")
    for i in range(num_gpus):
        print(f"GPU {i}: {torch.cuda.get_device_name(i)}")
        print(f"  Memory Allocated: {torch.cuda.memory_allocated(i) / 1e9} GB")
        print(f"  Memory Cached: {torch.cuda.memory_reserved(i) / 1e9} GB")
else:
    print("CUDA is not available.")


CUDA is available. Details of available GPU(s):
Number of GPU(s) available: 1
GPU 0: NVIDIA GeForce RTX 4090
  Memory Allocated: 0.0 GB
  Memory Cached: 0.0 GB


In [4]:
    # Setup Paths
POS_PATH = os.path.join('data', 'positive')
NEG_PATH = os.path.join('data', 'negative')
ANC_PATH = os.path.join('data', 'anchor')

In [5]:
# Create Folder Structure
os.makedirs(POS_PATH, exist_ok=True)
os.makedirs(NEG_PATH, exist_ok=True)
os.makedirs(ANC_PATH, exist_ok=True)

In [6]:
# unzip Tar GZ Labelled Faces in the Wild Dataset
# dataset: https://vis-www.cs.umass.edu/lfw/
# !tar -xf lfw.tgz

In [7]:
# move images from the dataset directory to data\negative
# for directory in os.listdir('lfw'):
#     for file in os.listdir(os.path.join('lfw', directory)):
#         FROM_PATH = os.path.join('lfw', directory, file)
#         TO_PATH = os.path.join(NEG_PATH, file)
#         os.replace(FROM_PATH, TO_PATH)

In [8]:
# universally unique identifier for naming collected images
import uuid

In [9]:
# # Collect positive and anchor images
# cap = cv2.VideoCapture(0, cv2.CAP_DSHOW)

# while cap.isOpened():
#     _, frame = cap.read()
    
#     # display image
#     cv2.imshow('Frame', frame)
    
#     # extract 250, 250 block from image
#     frame = frame[60:310, 200:450 :]
    
#     # display extracted section
#     cv2.imshow('Tiny Frame' , frame)
    
#     # Collect anchors
#     if cv2.waitKey(1) & 0xFF == ord('a'):
#         # generate file path
#         IMG_PATH = os.path.join(ANC_PATH, "{}.jpg".format(uuid.uuid1()))
#         # save file to anchors
#         cv2.imwrite(IMG_PATH, frame)
    
#     # Collect positives
#     if cv2.waitKey(1) & 0xFF == ord('p'):
#         # generate file path
#         IMG_PATH = os.path.join(POS_PATH, "{}.jpg".format(uuid.uuid1()))
#         # save file to positives
#         cv2.imwrite(IMG_PATH, frame)
    
#     # Break loop
#     if cv2.waitKey(1) & 0xFF == ord('q'):
#         break

# # release webcam and destroy window
# cap.release()
# cv2.destroyAllWindows()

In [10]:
# grab first 300 file paths in anchors, negatives and positives
anchor_files = glob.glob(ANC_PATH + '\\*.jpg')[:320]
negative_files = glob.glob(NEG_PATH + '\\*.jpg')[:320]
positive_files = glob.glob(POS_PATH + '\\*.jpg')[:320]

In [11]:
class ImageDataset(Dataset):
    def __init__(self, anchor_paths, other_paths, label):
        """
        anchor_paths: List of paths to anchor images
        other_paths: List of paths to either positive or negative images
        label: 0 or 1 (0 for negative pairs, 1 for positive pairs)
        """
        self.anchor_paths = anchor_paths
        self.other_paths = other_paths
        self.label = label
        self.transform = transforms.Compose([
            # Convert to PIL Image
            transforms.ToPILImage(),
            # Resize to match the Siamese paper input size
            transforms.Resize((105, 105)),
            # Convert to PyTorch tensor of shape (channels, height, width) which also scales values between [0, 1]
            transforms.ToTensor(),
        ])
        
    def __len__(self):
        return len(self.anchor_paths)
    
    def __getitem__(self, index):
        # Load the anchor image
        anchor_img = cv2.imread(self.anchor_paths[index])
        anchor_img = cv2.cvtColor(anchor_img, cv2.COLOR_BGR2RGB)  # Convert from BGR to RGB
        anchor_img = self.transform(anchor_img)
        
        # Load the other image (positive or negative)
        other_img = cv2.imread(self.other_paths[index])
        other_img = cv2.cvtColor(other_img, cv2.COLOR_BGR2RGB)
        other_img = self.transform(other_img)
        
        return anchor_img, other_img, torch.tensor(self.label, dtype=torch.float32)

In [12]:
# Create the DataSets
negative_dataset = ImageDataset(anchor_files, negative_files, 0)
positive_dataset = ImageDataset(anchor_files, positive_files, 1)

In [14]:
# Concatenate datasets
combined_dataset = ConcatDataset([positive_dataset, negative_dataset])

# Calculate absolutes for a 70-30 split
total_size = len(combined_dataset)
train_size = int(0.7 * total_size)
test_size = total_size - train_size

# Randomly split dataset into train and test
train_dataset, test_dataset = random_split(combined_dataset, [train_size, test_size])

# Create DataLoader for training data
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)

# Create DataLoader for testing data
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False) # no shuffling for repeatability

In [17]:
class Embdedding(nn.Module):
    def __init__(self):
        super(Embdedding, self).__init__()
        
        # Define layers of the network
        self.embedding = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=64, kernel_size=10),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2, ceil_mode=True),
            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=7),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2, ceil_mode=True),
            nn.Conv2d(in_channels=128, out_channels=128, kernel_size=4),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2, ceil_mode=True),
            nn.Conv2d(in_channels=128, out_channels=256, kernel_size=4),
            nn.Flatten(),
            nn.Linear(256*6*6, 4096),
            nn.Sigmoid(),
        )
        
    def forward(self, x):
        x = self.embedding(x)
        return x
        

In [18]:
model = Embdedding().to('cuda')
model

Embdedding(
  (embedding): Sequential(
    (0): Conv2d(3, 64, kernel_size=(10, 10), stride=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=True)
    (3): Conv2d(64, 128, kernel_size=(7, 7), stride=(1, 1))
    (4): ReLU()
    (5): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=True)
    (6): Conv2d(128, 128, kernel_size=(4, 4), stride=(1, 1))
    (7): ReLU()
    (8): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=True)
    (9): Conv2d(128, 256, kernel_size=(4, 4), stride=(1, 1))
    (10): Flatten(start_dim=1, end_dim=-1)
    (11): Linear(in_features=9216, out_features=4096, bias=True)
    (12): Sigmoid()
  )
)

In [19]:
# # print out tensorflow like summary using torchsummary
# summary(model, (3, 105, 105))

In [21]:
class L1Distance(nn.Module):
    def __init__(self):
        super(L1Distance, self).__init__()
        
    def forward(self, input_embedding, validation_embedding):
        return torch.abs(input_embedding - validation_embedding)

In [24]:
class SiameseNetwork(nn.Module):
    def __init__(self):
        super(SiameseNetwork, self).__init__()
        
        # Embedding layers
        self.embedding = Embdedding()
        
        # Custom layer to calculate l1 distance
        self.l1dist = L1Distance()
        
        # Classification layer
        self.seq = nn.Sequential(
            nn.Linear(4096, 1),
            nn.Sigmoid()
        )
      
    def forward(self, anchor_image, validation_image):
        # Create embedding for anchor image
        anchor_embedding = self.embedding(anchor_image)
        
        # Create embedding for image to validate
        validation_embedding = self.embedding(validation_image)
        
        # Calculate L1 Distance
        distances = self.l1dist(anchor_embedding, validation_embedding)
        
        # Classify
        classification = self.seq(distances)
        
        return classification    
        

In [25]:
model = SiameseNetwork()
model
# summary(model, input_data=[(3, 105, 105), (3, 105, 105)])

SiameseNetwork(
  (embedding): Embdedding(
    (embedding): Sequential(
      (0): Conv2d(3, 64, kernel_size=(10, 10), stride=(1, 1))
      (1): ReLU()
      (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=True)
      (3): Conv2d(64, 128, kernel_size=(7, 7), stride=(1, 1))
      (4): ReLU()
      (5): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=True)
      (6): Conv2d(128, 128, kernel_size=(4, 4), stride=(1, 1))
      (7): ReLU()
      (8): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=True)
      (9): Conv2d(128, 256, kernel_size=(4, 4), stride=(1, 1))
      (10): Flatten(start_dim=1, end_dim=-1)
      (11): Linear(in_features=9216, out_features=4096, bias=True)
      (12): Sigmoid()
    )
  )
  (l1dist): L1Distance()
  (seq): Sequential(
    (0): Linear(in_features=4096, out_features=1, bias=True)
    (1): Sigmoid()
  )
)

In [28]:
def train_step(model, train_loader, criterion, optimizer, epoch, writer):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    
    for batch_idx, (anchors, others, labels) in enumerate(train_loader):
        anchors, others, labels = anchors.cuda(), others.cuda(), labels.unsqueeze(1).cuda()
        
        # Zero the parameter gradients
        optimizer.zero_grad()
        
        # Forward pass to get output
        outputs = model(anchors, others)
        # Calculate loss
        loss = criterion(outputs, labels)
        running_loss += loss.item()
        # Calculate gradient
        loss.backward()
        # Update parameters
        optimizer.step()
        
        # Calculate Accuracy
        predicted = (outputs > 0.5).float()
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        
        if batch_idx % 10 == 6: # Log every 7 batches
            writer.add_scalar('Training Loss', running_loss / 7, epoch * len(train_loader) + batch_idx)
            writer.add_scalar('Training Accuracy', (correct / total) * 100, epoch * len(train_loader) + batch_idx)
            running_loss = 0.0
            correct = 0
            total = 0

In [29]:
def evaluate(model, validation_loader, criterion, epoch, writer):
    model.eval()
    total_loss = 0.0
    correct = 0
    total = 0
    
    with torch.no_grad():
        for batch_idx, (anchors, others, labels) in enumerate(validation_loader):
            anchors, others, labels = anchors.cuda(), others.cuda(), labels.unsqueeze(1).cuda()
            
            # Forward pass to get output
            outputs = model(anchors, others)
            # Calculate loss
            loss = criterion(outputs, labels)
            total_loss += loss.item()
            
            predicted = (outputs > 0.5).float()
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            
    avg_loss = total_loss / len(validation_loader)
    accuracy = (correct / total) * 100
    writer.add_scalar('Validation Loss', avg_loss, epoch)
    writer.add_scalar('Validation Accuracy', accuracy, epoch)
    print(f'Validation Loss: {avg_loss:.4f}, Accuracy: {accuracy:.2f}%')
    # return avg_loss, accuracy   
            

In [30]:
model = SiameseNetwork().cuda()

# Binary Cross-Entropy loss for the output of the final sigmoid layer
criterion = nn.BCELoss()

# Optimizer
optimizer = optim.Adam(model.parameters(), lr=0.0001)

# Initialize TensorBoard writer
writer = SummaryWriter('runs/siamese_experiment')

In [31]:
epochs = 50

for epoch in range(1, epochs + 1):
    train_step(model, train_loader, criterion, optimizer, epoch, writer)
    evaluate(model, test_loader, criterion, epoch, writer)
    
writer.close()

Validation Loss: 0.2125, Accuracy: 94.27%
Validation Loss: 0.0923, Accuracy: 98.44%
Validation Loss: 0.0621, Accuracy: 98.96%
Validation Loss: 0.0511, Accuracy: 99.48%
Validation Loss: 0.0238, Accuracy: 99.48%
Validation Loss: 0.0154, Accuracy: 99.48%
Validation Loss: 0.0196, Accuracy: 99.48%
Validation Loss: 0.0155, Accuracy: 99.48%
Validation Loss: 0.0248, Accuracy: 99.48%
Validation Loss: 0.0131, Accuracy: 98.96%
Validation Loss: 0.0128, Accuracy: 99.48%
Validation Loss: 0.0117, Accuracy: 99.48%
Validation Loss: 0.0113, Accuracy: 99.48%
Validation Loss: 0.0104, Accuracy: 99.48%
Validation Loss: 0.0088, Accuracy: 99.48%
Validation Loss: 0.0084, Accuracy: 99.48%
Validation Loss: 0.0078, Accuracy: 99.48%
Validation Loss: 0.0070, Accuracy: 99.48%
Validation Loss: 0.0063, Accuracy: 99.48%
Validation Loss: 0.0062, Accuracy: 100.00%
Validation Loss: 0.0055, Accuracy: 100.00%
Validation Loss: 0.0048, Accuracy: 100.00%
Validation Loss: 0.0049, Accuracy: 100.00%
Validation Loss: 0.0042, Accur

In [37]:
%tensorboard --logdir=runs

Reusing TensorBoard on port 6006 (pid 22296), started 2 days, 6:11:22 ago. (Use '!kill 22296' to kill it.)

In [33]:
# Save the entire model
torch.save(model, 'Siamese_Network.pth')

In [34]:
# Save the model state and metrics
torch.save({
    'epoch': epoch,
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'validation_loss': 0.0219,
    'validation_accuracy': "98.96%",
}, 'Siamese_Network_State.pth')

In [35]:
# # To load the entire model
# model = torch.load('Siamese_Network.pth')

# # To load the model from state dictionary
# model = SiameseNetwork() # Re-instantiate the model using its class
# checkpoint = torch.load('model_checkpoint.pth')
# model.load_state_dict(checkpoint['model_state_dictionary'])
# model.cuda()

In [None]:
# for inference
model.eval()

def sample_gen(test_loader=test_loader):
    for anchors, others, labels in test_loader:
        anchors, others, labels = anchors.cuda(), others.cuda(), labels.unsqueeze(1).cuda()
        
        outputs = model(anchors, others)
        
        for idx, output in enumerate(outputs):
            anchor = anchors[idx]
            other = others[idx]
            output = outputs[idx]
            
            fig, ax = plt.subplots(1, 2)
            ax[0].imshow(anchor.cpu().squeeze().permute(1, 2, 0))
            ax[1].imshow(other.cpu().squeeze().permute(1, 2, 0))
            
            print('Confidence: ' + str(round(output.item())))
            
            yield None # to make the method a generator
sample = sample_gen()

In [None]:
next(sample)