# Neural Network - Similarity Function

This notebook documents how we build the similarity function used to decide which images are similar to a given image.
The network follows the Siamese architecture: both images of a pair pass through the same shared-weight branch and their embeddings are compared. This class of networks is known to be more robust to class imbalance, which suits the data we train on.

In [4]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import torch
from torch import nn
from torch import optim
from torch.utils.data import DataLoader, Dataset
from torchvision import datasets, transforms
import torch.nn.functional as F
from PIL import Image
from tqdm import tqdm

  Referenced from: <5AA8DD3D-A2CC-31CA-8060-88B4E9C18B09> /Users/naomi/miniconda3/envs/Lab2_env/lib/python3.10/site-packages/torchvision/image.so
  warn(


In [5]:
# Read the table of labelled image pairs, then keep only the first 150 rows
# and the three columns the rest of the notebook uses.
data_paths = (
    pd.read_csv('datasets/house_styles/sampled_paired_labels_shuffled.csv')
    .iloc[:150]
    .loc[:, ['image1_path', 'image2_path', 'similarity']]
)
print(data_paths.head())

                                         image1_path  \
0  (datasets/house_styles/all_images/001_d2c7428a...   
1  (datasets/house_styles/all_images/453_d7b5d246...   
2  (datasets/house_styles/all_images/116_32f01ef6...   
3  (datasets/house_styles/all_images/301_b73b9663...   
4  (datasets/house_styles/all_images/042_06b56791...   

                                         image2_path  similarity  
0  (datasets/house_styles/all_images/366_08eff319...         3.0  
1  (datasets/house_styles/all_images/122_e44a0cb3...         0.0  
2  (datasets/house_styles/all_images/174_55a7b3f9...         0.0  
3  (datasets/house_styles/all_images/116_32f01ef6...         0.0  
4  (datasets/house_styles/all_images/069_d3bedc1f...         1.0  


In [6]:
# 80/20 split: draw the training rows at random with a fixed seed, then take
# every row whose index label was not drawn as the test set.
train_data = data_paths.sample(frac=0.8, random_state=42)
test_data = data_paths.loc[~data_paths.index.isin(train_data.index)]

print("Train: ", train_data.shape, "Test: ", test_data.shape)

Train:  (120, 3) Test:  (30, 3)


In [7]:
class ImageSimilarityDataset(Dataset):
    """Dataset of image pairs, each with a similarity score.

    Every row of ``dataframe`` describes one pair. Columns are read by
    position, not by name: column 0 is the path of the first image, column 1
    the path of the second image and column 2 the similarity score. Leading
    and trailing parentheses around a stored path are stripped before use.

    Args:
        dataframe: pandas DataFrame laid out as described above.
        transform: optional callable applied to each loaded PIL image.
        master_path: string prepended verbatim to every image path. Defaults
            to ``''`` so the stored paths are used as they are.
    """

    def __init__(self, dataframe, transform=None, master_path=''):
        self.data = dataframe
        self.transform = transform
        self.master_path = master_path

    def __len__(self):
        """Return the number of image pairs."""
        return len(self.data)

    def _load_rgb(self, raw_path):
        """Open the image stored under ``raw_path`` and return an RGB copy."""
        full_path = self.master_path + raw_path.strip("()")
        # The context manager closes the underlying file deterministically;
        # convert() returns an independent image that outlives the handle.
        with Image.open(full_path) as handle:
            return handle.convert("RGB")

    def __getitem__(self, idx):
        """Return ``(image1, image2, similarity)`` for the pair at position ``idx``.

        The images are transformed when a ``transform`` was supplied; the
        similarity is returned as a scalar float32 tensor.
        """
        image1 = self._load_rgb(self.data.iloc[idx, 0])
        image2 = self._load_rgb(self.data.iloc[idx, 1])

        # The same transform is applied to both images of the pair
        if self.transform:
            image1 = self.transform(image1)
            image2 = self.transform(image2)

        similarity = self.data.iloc[idx, 2]

        return image1, image2, torch.tensor(similarity, dtype=torch.float32)

# Preprocessing shared by both images of a pair: force a fixed 224x224 size,
# convert to a tensor, then normalise each channel with the mean/std values
# used for pre-trained models.
transform = transforms.Compose(
    [
        transforms.Resize(size=(224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)


In [8]:
# Use a CUDA GPU when PyTorch can see one, otherwise fall back to the CPU.
# The Apple MPS backend is not selected here (that branch was disabled).
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

print(device)

cpu


In [9]:
class SiameseNetwork(nn.Module):
    """Siamese network whose two inputs share a single CNN + MLP branch.

    The branch maps a batch of 3-channel images to 2-dimensional embeddings;
    ``forward`` applies it to both images of a pair and returns both
    embeddings.
    """

    def __init__(self):
        super().__init__()

        # Convolutional feature extractor. The order of the layers is fixed:
        # their positions inside the Sequential are part of the parameter names.
        conv_layers = [
            # Block 1: 3 -> 96 channels, 11x11 kernels
            nn.Conv2d(3, 96, kernel_size=11, stride=1),
            nn.ReLU(inplace=True),
            nn.LocalResponseNorm(5, alpha=0.0001, beta=0.75, k=2),
            nn.MaxPool2d(3, stride=2),
            nn.Dropout2d(p=0.3),

            # Block 2: 96 -> 256 channels, 5x5 kernels
            nn.Conv2d(96, 256, kernel_size=5, stride=1, padding=2),
            nn.ReLU(inplace=True),
            nn.LocalResponseNorm(5, alpha=0.0001, beta=0.75, k=2),
            nn.MaxPool2d(3, stride=2),
            nn.Dropout2d(p=0.3),

            # Block 3: two 3x3 convolutions (256 -> 384 -> 256), then pooling
            nn.Conv2d(256, 384, kernel_size=3, stride=1, padding=1),
            nn.ReLU(inplace=True),

            nn.Conv2d(384, 256, kernel_size=3, stride=1, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(3, stride=2),
            nn.Dropout2d(p=0.3),
        ]
        self.cnn1 = nn.Sequential(*conv_layers)

        # Bring the feature map to a fixed 6x6 grid so the first Linear layer
        # always receives 256 * 6 * 6 features
        self.adaptive_pool = nn.AdaptiveAvgPool2d((6, 6))

        # Fully connected head: 9216 -> 1024 -> 128 -> 2
        head_layers = [
            nn.Linear(256 * 6 * 6, 1024),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.5),

            nn.Linear(1024, 128),
            nn.ReLU(inplace=True),

            nn.Linear(128, 2),
        ]
        self.fc1 = nn.Sequential(*head_layers)

    def forward_once(self, x):
        """Run one batch of images through the shared branch."""
        feature_map = self.adaptive_pool(self.cnn1(x))
        # One flat feature vector per sample
        flat = feature_map.reshape(feature_map.size(0), -1)
        return self.fc1(flat)

    def forward(self, input1, input2):
        """Embed both images of each pair with the same weights."""
        return self.forward_once(input1), self.forward_once(input2)

# class SiameseNetwork(nn.Module):
    
#     def __init__(self):
#         super(SiameseNetwork, self).__init__()
#         # Setting up the Sequential of CNN Layers
#         self.cnn1 = nn.Sequential(
#             nn.Conv2d(3, 96, kernel_size=11, stride=1),  # Adjusted for RGB input
#             nn.ReLU(inplace=True),
#             nn.MaxPool2d(3, stride=2),
            
#             nn.Conv2d(96, 256, kernel_size=5, stride=1, padding=2),
#             nn.ReLU(inplace=True),
#             nn.MaxPool2d(3, stride=2),
#             nn.Dropout2d(p=0.3),

#             nn.Conv2d(256, 384, kernel_size=3, stride=1, padding=1),
#             nn.ReLU(inplace=True),
            
#             nn.Conv2d(384, 256, kernel_size=3, stride=1, padding=1),
#             nn.ReLU(inplace=True),
#             nn.MaxPool2d(3, stride=2),
#             nn.Dropout2d(p=0.3),
#         )
        
#         # Adaptive pooling layer to ensure the output size is consistent
#         self.adaptive_pool = nn.AdaptiveAvgPool2d((6, 6))  # Adjust pooling size to handle dynamic input
        
#         # Defining the fully connected layers
#         self.fc1 = nn.Sequential(
#             nn.Linear(256 * 6 * 6, 1024),  # Input size adjusted for adaptive pooling
#             nn.ReLU(inplace=True),
#             nn.Dropout(p=0.5),
            
#             nn.Linear(1024, 128),
#             nn.ReLU(inplace=True),
            
#             nn.Linear(128, 2)
#         )
        
#     def forward_once(self, x):
#         # Forward pass 
#         x = self.cnn1(x)
#         x = self.adaptive_pool(x)  # Adaptive pooling to standardize the feature map size
#         x = x.view(x.size(0), -1)  # Flatten the feature map
    #     x = self.fc1(x)
    #     return x

    # def forward(self, input1, input2):
    #     # Forward pass of input 1
    #     output1 = self.forward_once(input1)
    #     # Forward pass of input 2
    #     output2 = self.forward_once(input2)
    #     return output1, output2


In [10]:
class SmallSiameseNetwork(nn.Module):
    """Lighter Siamese network: three conv blocks and a small MLP head.

    As in the larger variant, one shared branch embeds each image of a pair
    into 2 dimensions.
    """

    def __init__(self):
        super().__init__()

        # Compact feature extractor: 3 -> 32 -> 64 -> 128 channels, each
        # convolution followed by ReLU, 2x2 max pooling and dropout.
        conv_layers = [
            nn.Conv2d(3, 32, kernel_size=7, stride=1, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, stride=2),
            nn.Dropout2d(p=0.2),

            nn.Conv2d(32, 64, kernel_size=5, stride=1, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, stride=2),
            nn.Dropout2d(p=0.2),

            nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, stride=2),
            nn.Dropout2d(p=0.2),
        ]
        self.cnn1 = nn.Sequential(*conv_layers)

        # Fixed 3x3 output grid, so the head always receives 128 * 3 * 3 features
        self.adaptive_pool = nn.AdaptiveAvgPool2d((3, 3))

        # Fully connected head: 1152 -> 512 -> 64 -> 2
        head_layers = [
            nn.Linear(128 * 3 * 3, 512),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.4),

            nn.Linear(512, 64),
            nn.ReLU(inplace=True),

            nn.Linear(64, 2),
        ]
        self.fc1 = nn.Sequential(*head_layers)

    def forward_once(self, x):
        """Run one batch of images through the shared branch."""
        feature_map = self.adaptive_pool(self.cnn1(x))
        # One flat feature vector per sample
        flat = feature_map.reshape(feature_map.size(0), -1)
        return self.fc1(flat)

    def forward(self, input1, input2):
        """Embed both images of each pair with the same weights."""
        return self.forward_once(input1), self.forward_once(input2)


In [11]:
# define the loss function
class ContrastiveLoss(torch.nn.Module):
    """Contrastive loss over a batch of embedding pairs.

    With ``d`` the Euclidean distance between the two embeddings of a pair
    and ``y`` its label, the per-pair term is

        y * d**2 + (1 - y) * max(margin - d, 0)**2

    and the returned value is the sum of these terms divided by
    ``2 * batch_size``. A label of 1 therefore pulls a pair together, while a
    label of 0 pushes it apart until the distance reaches ``margin``.

    NOTE(review): the formula only behaves as a loss for ``y`` in [0, 1];
    a label above 1 makes ``(1 - y)`` negative. Confirm the labels passed in
    are scaled accordingly.

    Args:
        margin: distance beyond which a ``y = 0`` pair contributes nothing.
        eps: lower bound applied to the squared distance before the square
            root is taken (see ``forward``).
    """

    def __init__(self, margin=1.0, eps=1e-12):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin
        self.eps = eps

    def forward(self, x0, x1, y):
        """Return the scalar loss for embeddings ``x0``, ``x1`` and labels ``y``.

        ``x0`` and ``x1`` are reduced over dimension 1, so they are expected
        to be 2-D (one embedding per row); ``y`` holds one label per row.
        """
        # Squared Euclidean distance of every pair
        diff = x0 - x1
        dist_sq = torch.sum(torch.pow(diff, 2), 1)

        # The derivative of sqrt is infinite at 0, so a pair with identical
        # embeddings would back-propagate inf * 0 = NaN. Clamping first keeps
        # the gradient finite and leaves every distance above eps unchanged.
        dist = torch.sqrt(torch.clamp(dist_sq, min=self.eps))

        # Hinge term: positive only while the pair is closer than the margin
        hinge = torch.clamp(self.margin - dist, min=0.0)

        loss = y * dist_sq + (1 - y) * torch.pow(hinge, 2)
        loss = torch.sum(loss) / 2.0 / x0.size()[0]

        return loss

In [12]:
def train_siamese_network(model, train_loader, criterion, optimizer, num_epochs):
    """Train a Siamese model on batches of image pairs.

    Args:
        model: network called as ``model(img1, img2)`` and returning the two
            embeddings.
        train_loader: sized iterable of ``(img1, img2, labels)`` batches.
        criterion: loss called as ``criterion(output1, output2, labels)``.
        optimizer: optimizer built over ``model``'s parameters.
        num_epochs: number of passes over ``train_loader``.

    Returns:
        The same ``model`` object after training.
    """
    # Move batches to wherever the model's weights live instead of relying on
    # a notebook-level variable being defined before this function is called.
    device = next(model.parameters()).device
    log_every = 10  # report the mean loss after this many batches

    model.train()  # enable dropout and other training-only behaviour

    for epoch in range(num_epochs):
        running_loss = 0.0
        pending = 0  # batches accumulated since the last report

        # tqdm wraps the loader itself (not enumerate) so that it can read the
        # loader's length and show a total on the progress bar.
        progress = tqdm(train_loader, desc=f"Epoch {epoch + 1}/{num_epochs}")
        for i, (img1, img2, labels) in enumerate(progress):
            img1, img2, labels = img1.to(device), img2.to(device), labels.to(device)

            optimizer.zero_grad()

            output1, output2 = model(img1, img2)
            loss = criterion(output1, output2, labels)

            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            pending += 1
            if pending == log_every:
                print(f"Epoch [{epoch + 1}/{num_epochs}], Step [{i + 1}/{len(train_loader)}], Loss: {running_loss / pending:.4f}")
                running_loss = 0.0
                pending = 0

        # Report the batches left over after the last full window, so short
        # loaders and trailing batches are not silently dropped from the log.
        if pending:
            print(f"Epoch [{epoch + 1}/{num_epochs}], Step [{len(train_loader)}/{len(train_loader)}], Loss: {running_loss / pending:.4f}")

    return model

In [13]:
# Seed PyTorch's global RNG so weight initialisation, dropout and the shuffled
# batch order are repeatable on a fresh run (same value as the split's seed).
torch.manual_seed(42)

# Model, loss and optimiser
siamese_net = SmallSiameseNetwork().to(device)
criterion = ContrastiveLoss(margin=1.0)
optimizer = optim.Adam(siamese_net.parameters(), lr=0.001)

# Wrap the train/test frames as datasets that load and preprocess the images
train_dataset = ImageSimilarityDataset(train_data, transform=transform)
test_dataset = ImageSimilarityDataset(test_data, transform=transform)

# Batch the pairs; only the training loader shuffles
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False)

# Number of passes over the training set
num_epochs = 100

In [None]:
# Run the training loop; the call returns the same model object it was given.
train_siamese_network(siamese_net, train_loader, criterion, optimizer, num_epochs)

In [None]:
# torch.save(siamese_net.state_dict(), "model.pt")
# print("Model Saved Successfully")