In [24]:
%%capture
!pip install albumentations --upgrade -qqq

In [25]:
import os
import pandas as pd
import random
import matplotlib.pyplot as plt
import cv2
from torch.utils.data import DataLoader, Dataset
import albumentations as A
from PIL import Image
import numpy as np
from torchvision import models
from torchsummary import summary
import torch
from torch import nn
from torch.optim import Adam
from albumentations.pytorch import ToTensorV2
import torch.nn.functional as F

In [26]:
old_path = '/content/Images_folder/'

In [27]:
image_name = []
for i in os.listdir(old_path):
  image_name.append(i)  

image_df = pd.DataFrame()
image_df['image_names'] = image_name

image_df.head()

Unnamed: 0,image_names
0,image2_rotated.jpg
1,image1_colorchange.jpg
2,image1_orig.jpg
3,image2_colorchange.jpg
4,image1_rotated.jpg


In [29]:
image_df['id'] =  image_df['image_names'].str.split('_', expand=True)[0]
image_df['suffix'] = image_df['image_names'].str.split('_', expand=True)[1]
print('shape of the input file:', image_df.shape)
image_df.head()

shape of the input file: (6, 3)


Unnamed: 0,image_names,id,suffix
0,image2_rotated.jpg,image2,rotated.jpg
1,image1_colorchange.jpg,image1,colorchange.jpg
2,image1_orig.jpg,image1,orig.jpg
3,image2_colorchange.jpg,image2,colorchange.jpg
4,image1_rotated.jpg,image1,rotated.jpg


In [31]:
#let's take unique ids from the list
unique_id = image_df['id'].unique()
print('Total number of unique ids:', len(unique_id))
print('\nList of unique ids:', unique_id)

Total number of unique ids: 2

List of unique ids: ['image2' 'image1']


In [33]:
PN = []
PP = []

for i, row in image_df.iterrows():
  id = row['id']
  sample = row['image_names']

  #we need to create two list now
  remaining_id = unique_id[unique_id != id]
  pos_pair = list(image_df[image_df['id'] == id]['suffix'])
  neg_pair = list(image_df[image_df['id'].isin(remaining_id)]['image_names'])

  pos = random.sample(pos_pair,3)
  neg = random.sample(neg_pair,3)

  #now prefix each value with the image identity and form the pairs
  pp_sub = [sample + ',' + id + '_' + x for x in pos]
  pn_sub = [sample + ',' + x for x in neg]

  PP.extend(pp_sub)
  PN.extend(pn_sub)

In [37]:
PP_df = pd.DataFrame()
PN_df = pd.DataFrame()

PP_df['positive_positive'] = PP
PN_df['positive_negative'] = PN

PP_df = PP_df['positive_positive'].str.split(',', expand=True)
PP_df.columns = ['Anchor_image', 'Positive_image']

PP_df.head()

Unnamed: 0,Anchor_image,Positive_image
0,image2_rotated.jpg,image2_orig.jpg
1,image2_rotated.jpg,image2_colorchange.jpg
2,image2_rotated.jpg,image2_rotated.jpg
3,image1_colorchange.jpg,image1_colorchange.jpg
4,image1_colorchange.jpg,image1_orig.jpg


In [38]:
PN_df = PN_df['positive_negative'].str.split(',', expand=True)
PN_df.columns = ['Anchor_image', 'Negative_image']

PN_df.head()

Unnamed: 0,Anchor_image,Negative_image
0,image2_rotated.jpg,image1_rotated.jpg
1,image2_rotated.jpg,image1_orig.jpg
2,image2_rotated.jpg,image1_colorchange.jpg
3,image1_colorchange.jpg,image2_rotated.jpg
4,image1_colorchange.jpg,image2_orig.jpg


In [40]:
image_df = PP_df.join(PN_df['Negative_image'])
image_df.head()

Unnamed: 0,Anchor_image,Positive_image,Negative_image
0,image2_rotated.jpg,image2_orig.jpg,image1_rotated.jpg
1,image2_rotated.jpg,image2_colorchange.jpg,image1_orig.jpg
2,image2_rotated.jpg,image2_rotated.jpg,image1_colorchange.jpg
3,image1_colorchange.jpg,image1_colorchange.jpg,image2_rotated.jpg
4,image1_colorchange.jpg,image1_orig.jpg,image2_orig.jpg


In [41]:
print('Total count:', image_df.shape[0])

Total count: 18


In [43]:
folder_dir = '/content/Images_folder/'

#defining custom dataset for the specific requirement
class TrainDataset(Dataset):
  def __init__(self, image_file, folder_dir, transform):
    
    '''
    Read the csv file and set the corresponding variables
    '''
    self.image_df = image_file    
    self.folder_dir = folder_dir
    self.transform = transform

  def __len__(self):
    '''
    Defining the length of the dataset
    '''    
    return len(self.image_df)

  def __getitem__(self, idx):
    '''
    Process the images from the dataset
    Perform the transformation and return the result to the dataloader
    '''       
    anchor_name = self.image_df.loc[idx, 'Anchor_image']    
    positive_name = self.image_df.loc[idx, 'Positive_image']
    negative_name = self.image_df.loc[idx, 'Negative_image']    

    anchor_image = np.array(Image.open(folder_dir + anchor_name))
    positive_image = np.array(Image.open(folder_dir + positive_name))
    negative_image = np.array(Image.open(folder_dir + negative_name))

    anchor = self.transform(image = anchor_image)
    positive = self.transform(image = positive_image)
    negative = self.transform(image = negative_image)

    return (anchor, positive, negative)

In [44]:
train_transform = A.Compose([
A.Resize(width=224, height=224),
A.Rotate(limit=10, p=0.9, border_mode=cv2.BORDER_CONSTANT),
A.RandomBrightnessContrast(),
#A.HorizontalFlip(p=0.5),
#A.RGBShift(r_shift_limit=25, g_shift_limit=25, b_shift_limit=25, p=0.9),
A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
ToTensorV2(),])

In [45]:
train_data = TrainDataset(image_df, folder_dir, train_transform)
train_loader = DataLoader(train_data, batch_size = 16, shuffle=True)

In [46]:
#define a siamese network using ResNet152
custom_model = models.resnet152(pretrained=True)

#display the names of the individual blocks
for name, model in custom_model.named_children():
  print(name)

Downloading: "https://download.pytorch.org/models/resnet152-394f9c45.pth" to /root/.cache/torch/hub/checkpoints/resnet152-394f9c45.pth


  0%|          | 0.00/230M [00:00<?, ?B/s]

conv1
bn1
relu
maxpool
layer1
layer2
layer3
layer4
avgpool
fc


In [47]:
#the last layer is the one we want to modify to 128 as this will be used for distance metric comparison
custom_model.fc

Linear(in_features=2048, out_features=1000, bias=True)

In [48]:
in_ftrs = custom_model.fc.in_features
out_ftrs = custom_model.fc.out_features
print('number of input features of the model:', in_ftrs)
print('number of output features of the model:', out_ftrs )

number of input features of the model: 2048
number of output features of the model: 1000


In [49]:
#now update the last layer output to a length of 128
custom_model.fc = nn.Linear(in_ftrs, 128)

In [50]:
#print the updated model
custom_model.fc

Linear(in_features=2048, out_features=128, bias=True)

In [51]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

#converting to cuda format
custom_model = custom_model.to(device)

In [72]:
#define triplet loss function
class triplet_loss(nn.Module):
  def __init__(self):
    super(triplet_loss, self).__init__()
    self.margin = 1
  
  def forward(self, anchor, positive, negative):
    pos_dist = (anchor - positive).pow(2).sum(1)
    neg_dist = (anchor - negative).pow(2).sum(1)
    loss = F.relu(pos_dist - neg_dist + self.margin)
    return loss.mean()

In [78]:
def train_accuracy(model, train_loader):
  #set the model in the evaluation mode
  model.eval()

  margin = 1

  total_accuracy = 0

  with torch.no_grad():
    for i, (anchor, positive, negative) in enumerate(train_loader):
      anchor = anchor['image'].to(device)
      positive = positive['image'].to(device)
      negative = negative['image'].to(device)

      anchor_feature = custom_model(anchor)
      positive_feature = custom_model(positive)
      negative_feature = custom_model(negative)

      pos_dist = (anchor_feature - positive_feature).pow(2).sum(1)
      neg_dist = (anchor_feature - negative_feature).pow(2).sum(1)     
      
      pos_neg_diff = (pos_dist - neg_dist + margin).cpu().data      

      total_accuracy += (pos_neg_diff <= 0).sum()*1.0/pos_dist.size()[0]

  return total_accuracy

In [79]:
%%capture
!pip install wandb

In [80]:
import wandb

In [77]:
#logon to the wandb
wandb.login()
wandb.init(project='Siamese_wandbtrail')



VBox(children=(Label(value=' 0.00MB of 0.00MB uploaded (0.00MB deduped)\r'), FloatProgress(value=1.0, max=1.0)…

0,1
accuracy,▅███▂▇██▁███▁▁██████
loss,▁▁▁▁▁▃▁▂▁▁█▁▁▁▁▂▁▁▁▂

0,1
accuracy,1.0
loss,1.65721


In [81]:
loss_fun = triplet_loss()
optimizer = Adam(custom_model.parameters(), lr = 0.0001)
custom_model.train()
wandb_log = {}

for epoch in range(10):
  total_loss = 0
  accuracy = 0
  for i, (anchor, positive, negative) in enumerate(train_loader):
    anchor = anchor['image'].to(device)
    positive = positive['image'].to(device)
    negative = negative['image'].to(device)

    anchor_feature = custom_model(anchor)
    positive_feature = custom_model(positive)
    negative_feature = custom_model(negative)

    optimizer.zero_grad()
    loss = loss_fun(anchor_feature, positive_feature, negative_feature)
    loss.backward()
    optimizer.step()      
    
    total_loss += loss
  
  print('total loss for the epoch %d'%epoch, total_loss.item()/2)
  wandb_log['loss'] = total_loss.item()/2

  accuracy = train_accuracy(model, train_loader)
  print('accuracy for the epoch %d'%epoch, accuracy/2)
  wandb_log['accuracy'] = accuracy/2

  wandb.log(wandb_log)  

total loss for the epoch 0 0.39048638939857483
accuracy for the epoch 0 tensor(1.)
total loss for the epoch 1 0.0
accuracy for the epoch 1 tensor(0.4688)
total loss for the epoch 2 0.16112101078033447
accuracy for the epoch 2 tensor(1.)
total loss for the epoch 3 0.0
accuracy for the epoch 3 tensor(1.)
total loss for the epoch 4 0.0
accuracy for the epoch 4 tensor(0.5000)
total loss for the epoch 5 0.0
accuracy for the epoch 5 tensor(0.4688)
total loss for the epoch 6 0.0
accuracy for the epoch 6 tensor(1.)
total loss for the epoch 7 2.34348726272583
accuracy for the epoch 7 tensor(1.)
total loss for the epoch 8 0.0
accuracy for the epoch 8 tensor(0.9688)
total loss for the epoch 9 0.178547203540802
accuracy for the epoch 9 tensor(0.9375)
