# Import files and directories

In [42]:
from PIL import Image
import torch
import os
import uuid
import cv2
import torchvision.transforms.functional as F
import random
from torch.utils.data import Dataset
from torchvision import transforms
from torch.utils.data import ConcatDataset, DataLoader, SubsetRandomSampler
from torch import nn
import math

In [43]:
POS_PATH = os.path.join("..","test",'data','positive')
NEG_PATH = os.path.join("..","test",'data','negetive')
ANC_PATH = os.path.join("..","test",'data','anchor')

In [8]:
os.makedirs(POS_PATH)
os.makedirs(NEG_PATH)
os.makedirs(ANC_PATH)

# Wild Dataset

In [14]:
# Establish a connection to the webcam
cap = cv2.VideoCapture(0)
while cap.isOpened(): 
    ret, frame = cap.read()
   
    # Cut down frame to 250x250px
    frame = frame[120:120+250,200:200+250, :]
    
    # Collect anchors 
    if cv2.waitKey(1) & 0XFF == ord('a'):
        # Create the unique file path 
        imgname = os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1()))
        # Write out anchor image
        cv2.imwrite(imgname, frame)
    
    # Collect positives
    if cv2.waitKey(1) & 0XFF == ord('p'):
        # Create the unique file path 
        imgname = os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1()))
        # Write out positive image
        cv2.imwrite(imgname, frame)
    
    # Show image back to screen
    cv2.imshow('Image Collection', frame)
    
    # Breaking gracefully
    if cv2.waitKey(1) & 0XFF == ord('q'):
        break
        
# Release the webcam
cap.release()
# Close the image show frame
cv2.destroyAllWindows()

# Data Augmention and Dataloaders

In [44]:
import torchvision.transforms.functional as F
import random
def data_aug(img):

    data = []
    
    img = F.adjust_brightness(img, brightness_factor=1.05)
    data.append(img)

    img = F.adjust_contrast(img, contrast_factor=torch.empty(1).uniform_(0.6, 1).item())
    data.append(img)
            
    img = F.hflip(img)
    data.append(img)
    
    img = F.adjust_saturation(img, saturation_factor=torch.empty(1).uniform_(0.9, 1).item())
    data.append(img)

    return data

In [45]:
count = 0
files = os.listdir(os.path.join(POS_PATH))
length = len(os.listdir(NEG_PATH)) - len(files)     

while count < length:
    for file_name in files:
        img_path = os.path.join(POS_PATH, file_name)
        img = Image.open(img_path).convert('RGB')
        img_tensor = F.to_tensor(img)
        augmented_images = data_aug(img_tensor)
        for i, image in enumerate(augmented_images):
            image = F.to_pil_image(image)
            image.save(os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1())))
        else:
            count+=4
            
files = os.listdir(os.path.join(POS_PATH))            
while len(os.listdir(NEG_PATH)) - len(files)  >0:
    file_name = files[len(files) - 4]

    img_path = os.path.join(POS_PATH, file_name)
    img = Image.open(img_path).convert('RGB')
    img_tensor = F.to_tensor(img)
    image.save(os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1())))
    files = os.listdir(os.path.join(POS_PATH))   


In [46]:
count = 0
files = os.listdir(os.path.join(ANC_PATH))
length = len(os.listdir(NEG_PATH)) - len(files)     

while count < length:
    for file_name in files:
        img_path = os.path.join(ANC_PATH, file_name)
        img = Image.open(img_path).convert('RGB')
        img_tensor = F.to_tensor(img)
        augmented_images = data_aug(img_tensor)
        for i, image in enumerate(augmented_images):
            image = F.to_pil_image(image)
            image.save(os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1())))
        else:
            count+=4
            
files = os.listdir(os.path.join(ANC_PATH))            
while len(os.listdir(NEG_PATH)) - len(files)  >0:
    file_name = files[len(files) - 4]

    img_path = os.path.join(ANC_PATH, file_name)
    img = Image.open(img_path).convert('RGB')
    img_tensor = F.to_tensor(img)
    image.save(os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1())))
    files = os.listdir(os.path.join(ANC_PATH))   


In [47]:
class MergeImageDataset(Dataset):
    def __init__(self,ANC_PATH  ,POS_PATH, NEG_PATH, types ,transform = None):
        
        self.POS_PATH = POS_PATH
        self.NEG_PATH = NEG_PATH
        self.ANC_PATH = ANC_PATH
        
        self.types = types
        
        self.transform = transform
        
        self.POS_IMG = os.listdir(POS_PATH)
        self.NEG_IMG = os.listdir(NEG_PATH)
        self.ANC_IMG = os.listdir(ANC_PATH)
        
    def __len__(self):
        return len(self.ANC_IMG)

    def __getitem__(self, idx):
        
        anc_dir  = os.path.join(self.ANC_PATH, self.ANC_IMG[idx])
        anc_image = Image.open(anc_dir).convert('RGB')
        
        if self.types == 1: 
            pos_dir  = os.path.join(self.POS_PATH, self.POS_IMG[idx])
            pos_image = Image.open(pos_dir).convert('RGB')

        if self.types == 0: 
            neg_dir  = os.path.join(self.NEG_PATH, self.NEG_IMG[idx])
            neg_image = Image.open(neg_dir).convert('RGB')
        
        data = [anc_image, pos_image if self.types == 1 else neg_image, torch.ones(1) if self.types == 1 else torch.zeros(1) ]

        if self.transform:
            data[0] = self.transform(data[0])
            data[1] = self.transform(data[1])
        
        return data

In [48]:
transform = transforms.Compose([transforms.Resize((100,100)),
                                transforms.ToTensor()])
positive = MergeImageDataset(ANC_PATH, POS_PATH, NEG_PATH, types = 1, transform = transform)
negtive = MergeImageDataset(ANC_PATH, POS_PATH, NEG_PATH, types = 0, transform = transform)
data = ConcatDataset([negtive,positive])

In [49]:
num_samples = len(data)
indices = list(range(num_samples))
random.shuffle(indices)
split = int(math.floor(0.4 * num_samples))  # Use 20% of data for validation
print(split)
train_indices, val_indices = indices[split:], indices[:split]

# Create samplers for the training and validation parts
train_sampler = SubsetRandomSampler(train_indices)
val_sampler = SubsetRandomSampler(val_indices)
train_loader = DataLoader(data, batch_size=32, sampler=train_sampler)
val_loader = DataLoader(data, batch_size=32, sampler=val_sampler)

1600


# Model

In [50]:
class Embedding(nn.Module):
    def __init__(self):
            super(Embedding, self).__init__()

            self.conv_1 = nn.Conv2d(3, 64,kernel_size=(10, 10))
            self.relu_1 = nn.ReLU()
            self.maxpool_1 = nn.MaxPool2d(2, stride = 2 ,padding = 1)
            
            self.conv_2 =  nn.Conv2d(64, 128,kernel_size=(7, 7))
            self.relu_2 =  nn.ReLU()
            self.maxpool_2 = nn.MaxPool2d(2 , 2)
            
            self.conv_3 =  nn.Conv2d(128, 128,kernel_size=(4, 4))
            self.relu_3 =  nn.ReLU()
            self.maxpool_3 =  nn.MaxPool2d(2, 2, padding = 1)
            
            self.conv_4 =  nn.Conv2d(128, 256,kernel_size=(4, 4))
            self.relu_4 =  nn.ReLU()
            self.flatten = nn.Flatten(start_dim=1)
            
            self.normalization_64 =  nn.BatchNorm2d(64)
            self.normalization_128 =  nn.BatchNorm2d(128)

            self.linear = nn.Linear(256 * 6 * 6, 4096)
        
    def forward(self, x):
        x = self.conv_1(x)
        x = self.relu_1(x)
        x = self.maxpool_1(x)
        x = self.normalization_64(x)
        
        x = self.conv_2(x)
        x = self.relu_2(x)
        x = self.maxpool_2(x)
        x = self.normalization_128(x)
        
        x = self.conv_3(x)
        x = self.relu_3(x)
        x = self.maxpool_3(x)
        x = self.normalization_128(x)
        
        x = self.conv_4(x)
        x = self.relu_4(x)
        
        x = self.flatten(x)
        x = self.linear(x)
        
        return x
        

In [51]:
class SiameseNetwork(nn.Module):
    def __init__(self):
        super(SiameseNetwork, self).__init__()
        
        self.embeder = Embedding()
        self.classifier = nn.Linear(4096,1)
        self.sigmoid = nn.Sigmoid()
        
    def forward(self, input_image, validation_image):
        input_image = self.embeder(input_image)
        validation_image = self.embeder(validation_image)
        result = torch.abs(input_image - validation_image)
        x = self.classifier(result)
        x = self.sigmoid(x)
        return x 
        

In [52]:
def round(data):
    for i, value in enumerate(data):
        data[i] = 0 if value < 0.5 else 1
    return data

In [53]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchmetrics


torch.manual_seed(34)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Create an instance of the SiameseNetwork
siamese_net = SiameseNetwork().to(device)

# Define the loss function (e.g., Binary Cross Entropy)
criterion =  nn.BCELoss()

# Define the optimizer (e.g., Stochastic Gradient Descent)
optimizer = optim.Adam(siamese_net.parameters(), lr=1e-4)

# Training loop
num_epochs = 5

print(len(train_loader))

accuracies = []
losses = []
for epoch in range(num_epochs):
    train_loss = 0.0
    total_samples = 0
    # Iterate over the training dataset
    for i, (input_image, validation_image, label) in enumerate(train_loader):     
        optimizer.zero_grad()  # Zero the gradients
        
        input_image = input_image.to(device)
        validation_image = validation_image.to(device)
        label = label.to(device)
        # Forward pass
        output = siamese_net(input_image, validation_image)

        # Compute the loss
        loss = criterion(output, label)

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

        # Update the running loss
        train_loss += loss.item()
        total_samples += label.size(0)  # Increment total_samples by the batch size
    
    avg_train_loss = train_loss / total_samples
    
    print(f"Epoch [{epoch+1}/{num_epochs}]")

    
        # Evaluate the model on the validation set

    acc_batches = 0
    val_loss = 0.0
    val_total = 0
    total_samples = 0
    correct_val = 0
    val_correct = 0
    with torch.no_grad():
        for i, (input_image, validation_image, label) in enumerate(val_loader):
            
            input_image = input_image.to(device)
            validation_image = validation_image.to(device)
            label = label.to(device)
            
            # Move inputs and labels to the GPU
            output = siamese_net(input_image, validation_image)
            loss = criterion(output, label)
            result = round(output)
            
            val_loss += loss.item()
            total_samples += label.size(0)  # Increment total_samples by the batch size
            val_correct += (result == label).sum().item()
   
            
        avg_val_loss = val_loss / total_samples
        
        print(f"Val Total Loss %f :" %(val_loss))
        print(f"Val Average Loss %f :" % (avg_val_loss))
        print(f"Val Accuracy %.2f :" % (100 * (val_correct / total_samples ) ))
        accuracies.append(100 * (val_correct / total_samples ))
        losses.append(avg_val_loss)


75
Epoch [1/10]
Val Total Loss 1.973756 :
Val Average Loss 0.001234 :
Val Accuracy 98.94 :
Epoch [2/10]
Val Total Loss 0.762965 :
Val Average Loss 0.000477 :
Val Accuracy 99.56 :
Epoch [3/10]
Val Total Loss 0.477422 :
Val Average Loss 0.000298 :
Val Accuracy 99.75 :
Epoch [4/10]
Val Total Loss 0.618737 :
Val Average Loss 0.000387 :
Val Accuracy 99.62 :
Epoch [5/10]
Val Total Loss 0.443144 :
Val Average Loss 0.000277 :
Val Accuracy 99.69 :
Epoch [6/10]
Val Total Loss 0.451739 :
Val Average Loss 0.000282 :
Val Accuracy 99.75 :
Epoch [7/10]
Val Total Loss 0.544950 :
Val Average Loss 0.000341 :
Val Accuracy 99.75 :
Epoch [8/10]
Val Total Loss 0.527080 :
Val Average Loss 0.000329 :
Val Accuracy 99.75 :
Epoch [9/10]
Val Total Loss 0.502910 :
Val Average Loss 0.000314 :
Val Accuracy 99.75 :
Epoch [10/10]
Val Total Loss 0.515173 :
Val Average Loss 0.000322 :
Val Accuracy 99.75 :


In [57]:
TEST_NEG_PATH = os.path.join("..","test","testing","negetive")
TEST_POS_PATH = os.path.join("..","test","testing","positive")
TEST_ANC_PATH = os.path.join("..","test","testing","anchor")

transform = transforms.Compose([transforms.Resize((100,100)),
                                transforms.ToTensor()])

postive = MergeImageDataset(TEST_ANC_PATH, TEST_POS_PATH, TEST_NEG_PATH, types = 1, transform = transform)
negtive = MergeImageDataset(TEST_ANC_PATH, TEST_POS_PATH, TEST_NEG_PATH, types = 0, transform = transform)
data = ConcatDataset([negtive,positive])

In [58]:
test_loader_pos = DataLoader(postive)
test_loader_neg = DataLoader(negtive)

In [59]:
with torch.no_grad():
    val_correct = 0
    total_samples = 0
    for i, (input_image, validation_image, label) in enumerate(test_loader_pos):
            input_image = input_image.to(device)
            validation_image = validation_image.to(device)
            output = siamese_net(input_image, validation_image)
            result = 1 if output >= 0.5 else 0
            val_correct += (result == label).sum().item()
            total_samples += label.size(0)
            print(f"predicted {result} and true is {label} with this output{output}")
            print('-------------------------------------------')

    print("---------------------NEGATIVES-----------------------------------------")
    for i, (input_image, validation_image, label) in enumerate(test_loader_neg):
            input_image = input_image.to(device)
            validation_image = validation_image.to(device)
            output = siamese_net(input_image, validation_image)
            result = 1 if output >= 0.5 else 0
            val_correct += (result == label).sum().item()
            total_samples += label.size(0)
            print(f"predicted {result} and true is {label} with this output{output}")
            print('-------------------------------------------')
            
    print(f"Accuracy %.2f :" % (100 * (val_correct / total_samples) ))

predicted 0 and true is tensor([[1.]]) with this outputtensor([[0.0022]], device='cuda:0')
-------------------------------------------
predicted 0 and true is tensor([[1.]]) with this outputtensor([[4.5994e-05]], device='cuda:0')
-------------------------------------------
predicted 0 and true is tensor([[1.]]) with this outputtensor([[0.0269]], device='cuda:0')
-------------------------------------------
predicted 0 and true is tensor([[1.]]) with this outputtensor([[1.7364e-05]], device='cuda:0')
-------------------------------------------
predicted 0 and true is tensor([[1.]]) with this outputtensor([[0.0006]], device='cuda:0')
-------------------------------------------
predicted 0 and true is tensor([[1.]]) with this outputtensor([[7.5005e-05]], device='cuda:0')
-------------------------------------------
predicted 0 and true is tensor([[1.]]) with this outputtensor([[0.0067]], device='cuda:0')
-------------------------------------------
predicted 0 and true is tensor([[1.]]) with