In [142]:
# Given Data Loader in Siamese_Sample folder
# Train Libraries
from __future__ import print_function
import argparse, random, copy
import numpy as np

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
from torch.utils.data import Dataset
from torchvision import datasets
from torchvision import transforms as T
from torch.optim.lr_scheduler import StepLR
# from dataloader import train_test_split, FingerprintDataset, ImageTransform
# from model import SiameseNetwork
import json


# Model Libraries
import torch
import torch.nn as nn
import torchvision


# Dataloader libraries
import os
import random
import albumentations
import torch
import torchvision
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset
from PIL import Image
import numpy as np


import matplotlib.pyplot as plt
import numpy as np
import random


In [143]:
def imshow(img, text=None):
    npimg = img.numpy()
    plt.axis("off")
    if text:
        plt.text(75, 8, text, style='italic',fontweight='bold',
            bbox={'facecolor':'white', 'alpha':0.8, 'pad':10})
        
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.show()    

# Plotting data
def show_plot(iteration,loss):
    plt.plot(iteration,loss)
    plt.show()

In [144]:
class ContrastiveLoss(torch.nn.Module):
    def __init__(self, margin=2.0):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin

    def forward(self, output1, output2, label):
        euclidean_dist = F.pairwise_distance(output1, output2, keepdim=True)

        loss_contrastive = torch.mean((1-label) * torch.pow(euclidean_dist, 2) +
                                      (label) * torch.pow(torch.clamp(self.margin - euclidean_dist, min=0.0), 2))
        return loss_contrastive

In [145]:

def train(model, device, train_loader, optimizer, epoch, criterion):
    counter = []
    loss_history = []
    iteration_number = 0

    for i, (img0, img1, label, _, _) in enumerate(train_loader, 0):

        # Send the images and labels to CUDA
        img0, img1, label = img0.cuda(), img1.cuda(), label.cuda()

        # Zero the gradients
        optimizer.zero_grad()

        # Pass in the two images into the network and obtain two outputs
        output1, output2 = model(img0, img1)

        # Pass the outputs of the networks and label into the loss function
        loss_contrastive = criterion(output1, output2, label)

        # Calculate the backpropagation
        loss_contrastive.backward()

        # Optimize
        optimizer.step()

        # Every 10 batches print out the loss
        if i % 10 == 0 :
            print(f"Epoch number {epoch}\n Current loss {loss_contrastive.item()}\n")
            iteration_number += 10
            counter.append(iteration_number)
            print(iteration_number)
            loss_history.append(loss_contrastive.item())
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
				epoch, i * len(img0), len(train_loader.dataset),
				100. * i / len(train_loader), loss_contrastive.item()))
        # if iteration_number==:
        #     show_plot(counter, loss_history)

In [146]:
# The FingerPrintDataset used to load into pytorch dataloader
class FingerprintDataset(Dataset):
  def __init__(self, data_nums, data_list, transform=None, transAug=None):
    self.data_nums = data_nums
    self.data_list = data_list
    self.transform = transform
    self.transAug = transAug
    
  
  def __getitem__(self, idx):
  
    real_dir = '../../SOCOFingBMP/Real'
    real_imgs = os.listdir(real_dir)

    # img will be a case from altered image
    img = self.data_list[idx]
    # find a corresponding name that can be used for identifying the real image
    img_name_real = img[img.rfind('/')+1:img.rfind('_')] + '.BMP'
    
    # This will set probability for creating matching or non-matching case at 50%
    print_match = idx % 2 == 0

    # This will set probability for applying augmentation at 50%
    apply_aug = random.randint(0,1)
    
    # print_match : 0 means non-matching fingerprints
    # print_match : 1 means matching fingerprints
    if print_match : # Define the directory for which we will find the matching real case (same individual)
      real_img_dir = os.path.join(real_dir, img_name_real)
    else :
      while True: # Define the directory for which we will find the non-matching real case (different individual)
        real_img_num = random.choice(self.data_nums)-1 
        img_name_real_unmatch = real_imgs[real_img_num]
        
        if img_name_real != img_name_real_unmatch:
          real_img_dir = os.path.join(real_dir, img_name_real_unmatch)
          break

    real_directory = real_img_dir
    # print(real_directory)
    altered_directory = img
    # print(altered_directory)
    real = Image.open(real_img_dir).convert("L")
    altered = Image.open(img).convert("L")
    
    real = self.transform(real)

    if self.transAug is None :
      altered = self.transform(altered)
    else : 
      if apply_aug :
        altered_np = np.array(altered)
        altered_augmented = self.transAug(altered_np)
        altered = Image.fromarray(altered_augmented['image'])
        altered = self.transform(altered)
      else : 
        altered = self.transform(altered)
    
    # label = torch.from_numpy(np.array([int(print_match)], dtype=np.long))
    label = torch.tensor(print_match, dtype=torch.float)
    # label = print_match
    # pairset = {"real": real, "altered": altered, "label": torch.from_numpy(np.array([int(print_match)], dtype=np.float32))}
    
    # return pairset
    return real, altered, label, real_directory, altered_directory

  def __len__(self):
    length = len(self.data_list)
    return length

In [147]:
def train_test_split(base_dir, train_ratio, num_people = 600, randomize = True, tests = None):

  # Splitting the train & test data according to the 'train_ratio' (ex. 0.9 : 0.1) 
  train_size = num_people * train_ratio
  test_size = num_people - train_size

  # Setting train_nums to be a list composed of 1 ~ 600
  train_nums = [i+1 for i in range(0,num_people)]
  # Setting test_nums to be a list composed of 600 zeros ([0,0,.....,0])
  test_nums = [0 for i in range(0, num_people)]

  train_list = []
  test_list = []

  altered_root = os.path.join(base_dir, "Altered")
  altered_list = ["Altered-Easy", "Altered-Medium", "Altered-Hard"]

  test_check = []

  # Randomly select people who will be used as test subjects
  if randomize :
    while len(test_check) < test_size:
      test = random.randint(1,num_people)
      if test not in test_check :
        test_check.append(test)
        test_nums[test-1] = test
    
    for i in range(0, len(train_nums)):
      train_nums[i] = train_nums[i]-test_nums[i]

    train_nums = set(train_nums)
    train_nums.remove(0)
    train_nums = list(train_nums)

    test_nums = set(test_nums)
    test_nums.remove(0)
    test_nums = list(test_nums)

    print("Num Train Data : ",len(train_nums))
    print("Num Test Data : ", len(test_nums))
  # If we pre-select test subjects, then just remove these from the train subjects
  # train subjects will initially contain all individuals (1 ~ 600)
  else :
    test_nums = tests
    for i in range(0, len(test_nums)):
      if test_nums[i] in train_nums:
        train_nums.remove(test_nums[i])
  
  # Let's go through the altered images and organize them depending on 
  # whether they can be used for training or testing of the model
  for altered in altered_list : 
    altered_dir = os.path.join(altered_root, altered)
    altered_imgs = os.listdir(altered_dir)
    
    for img in altered_imgs :
      person_num = int(img[:img.find('__')])
      if person_num in test_nums :
        test_img_path = os.path.join(altered_dir, img)
        test_list.append(test_img_path)
      else:
        train_img_path = os.path.join(altered_dir, img)
        train_list.append(train_img_path)

  # train_list & test_list are made of altered images
  return train_nums, train_list, test_nums, test_list

class ImageTransform():
  def __init__(self, mean, std):
    self.data_transform = transforms.Compose([
        transforms.Resize((100, 100)),
        # transforms.Resize((95, 95)),
        transforms.ToTensor(),
        transforms.Normalize(mean, std),
    ])

  def __call__(self, img):
    # print(type(img))
    return self.data_transform(img)
      
class ImageAugTransform_train():
  def __init__(self, mean, std):
    self.albumentations_transform_oneof = albumentations.Compose([
                                                                  albumentations.Resize(100, 100),
                                                                  albumentations.Normalize(mean=mean, std=std, max_pixel_value=255),
                                                                  albumentations.OneOf([
                                                                                        albumentations.HorizontalFlip(p=0.5),
                                                                                        albumentations.RandomRotate90(p=0.5)
                                                                                        ], p=0.5),
                                                                  albumentations.OneOf([
                                                                  albumentations.GaussNoise(p=0.5)
                                                                  ], p=0.5),
                                                                  albumentations.RandomBrightness(limit=0.2, always_apply=False, p=0.5)
                                                                  ])

  def __call__(self, img):
      return self.albumentations_transform_oneof(image=img)


In [148]:
## ORIGINAL SIAMESE NETWORK 
class SiameseNetwork(nn.Module):
    def __init__(self):
        super(SiameseNetwork, self).__init__()

        self.resnet = torchvision.models.resnet18(pretrained=False)
        self.resnet.conv1 = nn.Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
        self.fc_in_features = self.resnet.fc.in_features
        self.resnet = torch.nn.Sequential(*(list(self.resnet.children())[:-1]))

        # convolutional neural network
        # self.cnn1 = nn.Sequential(
        #     nn.Conv2d(1, 96, kernel_size=11, stride=4),
        #     nn.ReLU(inplace=True),
        #     nn.MaxPool2d(3, stride=2),

        #     nn.Conv2d(96, 256, kernel_size=5, stride=1),
        #     nn.ReLU(inplace=True),
        #     nn.MaxPool2d(2, stride=2),

        #     nn.Conv2d(256, 384, kernel_size=3, stride=1),
        #     nn.ReLU(inplace=True)
        # )

        # fully connected layers
        self.fc1 = nn.Sequential(
            # nn.Linear(512, 1024),
            # nn.ReLU(inplace=True),

            # nn.Linear(1024, 256), 
            # nn.ReLU(inplace=True),

            # nn.Linear(256, 2)
            nn.Linear(self.fc_in_features, 256),
            nn.ReLU(inplace=True),
            nn.Linear(256, 1),
        )
        self.sigmoid = nn.Sigmoid()
        self.resnet.apply(self.init_weights)
        self.fc1.apply(self.init_weights)


    def init_weights(self, m):
        if isinstance(m, nn.Linear):
            torch.nn.init.xavier_uniform(m.weight)
            m.bias.data.fill_(0.01)
    
    def forward_once(self, x):
        output = self.resnet(x)
        # output = self.cnn1(output)
        output = output.view(output.size()[0], -1)
        output = self.fc1(output)
        return output
    
    def forward(self, input1, input2):
        # output1 = self.forward_once(input1)
        # output2 = self.forward_once(input2)
        output1 = self.forward_once(input1)
        output2 = self.forward_once(input2)

        # concatenate both images' features
        # output = torch.cat((output1, output2), 1)

        # pass the concatenation to the linear layers
        # output = self.fc1(output)

        # pass the out of the linear layers to sigmoid layer
        # output = self.sigmoid(output)
        
        # return output
        return output1, output2

In [149]:
def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0

    # we aren't using `TripletLoss` as the MNIST dataset is simple, so `BCELoss` can do the trick.
    criterion = ContrastiveLoss()

    with torch.no_grad():
        for (images_1, images_2, targets, real_addr, altered_addr) in test_loader:
            images_1, images_2, targets = images_1.to(device), images_2.to(device), targets.to(device)
            output1, output2 = model(images_1, images_2)

            # Calculate the distance between output1 and output2
            euclidean_dist = F.pairwise_distance(output1, output2)

            # Calculate the loss
            loss = criterion(output1, output2, targets)

            test_loss += loss.item()  # Sum up batch loss

            # Convert distances to binary predictions
            pred = (euclidean_dist > 0.5).float()

            # Calculate the number of correct predictions
            correct += pred.eq(targets).sum().item()
            # test_loss += criterion(output1, output2, targets).sum().item()  # sum up batch loss
            # outputs = (output1+output2)/2.0
            # ones = torch.ones(outputs.shape[0],).to(device)
            # zeros = torch.zeros(outputs.shape[0],).to(device)
            # pred = torch.where(outputs > 0.5, ones, zeros)  # get the index of the max log-probability
            # correct += pred.eq(targets.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)

    # for the 1st epoch, the average loss is 0.0001 and the accuracy 97-98%
    # using default settings. After completing the 10th epoch, the average
    # loss is 0.0000 and the accuracy 99.5-100% using default settings.
    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))

In [150]:
def main():
    
    batch_size = 128
    # parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
    #                     help='input batch size for testing (default: 1000)')
    test_batch_size = 1000
    # parser.add_argument('--epochs', type=int, default=14, metavar='N',
    #                     help='number of epochs to train (default: 14)')
    epochs = 3
    # parser.add_argument('--lr', type=float, default=1.0, metavar='LR',
    #                     help='learning rate (default: 1.0)')
    lr = 0.01
    # parser.add_argument('--gamma', type=float, default=0.7, metavar='M',
    #                     help='Learning rate step gamma (default: 0.7)')
    gamma = 0.7
    # parser.add_argument('--no-cuda', action='store_true', default=False,
    #                     help='disables CUDA training')
    no_cuda = False
    # parser.add_argument('--no-mps', action='store_true', default=False,
    #                     help='disables macOS GPU training')
    no_mps = False
    # parser.add_argument('--dry-run', action='store_true', default=False,
    #                     help='quickly check a single pass')
    dry_run = False
    # parser.add_argument('--seed', type=int, default=1, metavar='S',
    #                     help='random seed (default: 1)')
    seed = 1
    # parser.add_argument('--log-interval', type=int, default=10, metavar='N',
    #                     help='how many batches to wait before logging training status')
    log_interval = 10
    # parser.add_argument('--save-model', action='store_true', default=False,
    #                     help='For Saving the current Model')
    save_model = True
    # args = parser.parse_args()
    cuda_num = 1
    use_cuda = not no_cuda and torch.cuda.is_available()
    torch.cuda.set_device(cuda_num)
    torch.manual_seed(seed)
    device = torch.device("cuda:{}".format(cuda_num) if torch.cuda.is_available() else "cpu")
    # device = torch.device("cpu")
    train_kwargs = {'batch_size': batch_size}
    test_kwargs = {'batch_size': test_batch_size}
    if use_cuda:
        cuda_kwargs = {'num_workers': 1,
                       'pin_memory': True,
                       'shuffle': True}
        train_kwargs.update(cuda_kwargs)
        test_kwargs.update(cuda_kwargs)


    mean = (0.5,) #q1
    std = (0.5,)

    # device = torch.device("cuda:1" if torch.cuda.is_available() else "cpu")
    
    # torch.cuda.set_device(2)
    # Change the directory to where SOCOFing is located. 
    # Direcotories should look as the following
    # SOCOFing
    # -- Altered
    #    -- ...
    # -- Real
    #    -- ...
    base_dir = '../../SOCOFingBMP/'

    nums = {}
    train_nums, train_list, test_nums, test_list = train_test_split(base_dir, 0.7)
    print("Train List Length : ", len(train_list))
    print("Test List Length : ", len(test_list))

    nums['train'] = train_nums
    nums['test'] = test_nums
    
    # state_dict save file name : siamese_network_train%_test%_lr.pt
    outpath = 'train_test_split_7_3_1.json'

    with open(outpath, 'w') as f:
        json.dump(nums, f)

    print(device)
    train_dataset = FingerprintDataset(train_nums, train_list,transform=ImageTransform(mean,std), transAug=None)
    test_dataset = FingerprintDataset(test_nums, test_list, transform=ImageTransform(mean,std))
    
    train_loader = torch.utils.data.DataLoader(train_dataset,**train_kwargs)
    test_loader = torch.utils.data.DataLoader(test_dataset, **test_kwargs)

    model = SiameseNetwork().to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)
            
    criterion = ContrastiveLoss()
    scheduler = StepLR(optimizer, step_size=1, gamma=gamma)

    # Create a simple dataloader just for simple visualization
    # vis_dataloader = DataLoader(trainset,
    #                         shuffle=True,
    #                         num_workers=2,
    #                         batch_size=8)

    # # Extract one batch
    # example_batch = next(iter(vis_dataloader))

    # # Example batch is a list containing 2x8 images, indexes 0 and 1, an also the label
    # # If the label is 1, it means that it is not the same person, label is 0, same person in both images
    # concatenated = torch.cat((example_batch[0], example_batch[1]),0)

    # imshow(torchvision.utils.make_grid(concatenated))
    # print(example_batch[2].numpy().reshape(-1))

    #######################################################

    for epoch in range(epochs):
        print(epoch)
        train(model, device, train_loader, optimizer, epoch, criterion)
        # test(model, device, test_loader)
        scheduler.step()



    # # Locate the test dataset and load it into the SiameseNetworkDataset
    siamese_dataset = FingerprintDataset(test_nums, test_list, transform=ImageTransform(mean,std))
    test_dataloader = DataLoader(siamese_dataset, batch_size=1, shuffle=True)

    # Grab one image that we are going to test
    dataiter = iter(test_dataloader)
    x0, x1, label, _, _ = next(dataiter)

    while label!=0:
        x0, x1, label, _, _ = next(dataiter)
        
    first = 0
    success = 0
    total = 0
    for i in range(1000):
        # Iterate over 5 images and test them with the first image (x0)
        if first>0:
            x0, x1, label, _, _ = next(dataiter)
        first+=1
        # Concatenate the two images together
        concatenated = torch.cat((x0, x1), 0)
        print(concatenated.size())
        print("label:", label)
        output1, output2 = model(x0.cuda(), x1.cuda())
        print("Outputs: ", output1, output2)
        euclidean_distance = F.pairwise_distance(output1, output2)[0]
        print("euclidean dist:", euclidean_distance)
        # imshow(torchvision.utils.make_grid(concatenated), f'Dissimilarity: {euclidean_distance.item():.2f}')
        if (euclidean_distance.item() < 0.75 and label==1) or (euclidean_distance.item() >= 0.75 and label==0):
            success+=1
        total+=1
    

    print("Accuracy: ", success/total)

if __name__ == "__main__":
    main()

Num Train Data :  420
Num Test Data :  180
Train List Length :  34401
Test List Length :  14869
cuda:1




0


  torch.nn.init.xavier_uniform(m.weight)


Epoch number 0
 Current loss 1.2771819829940796

10
Epoch number 0
 Current loss 2.279956817626953

20
Epoch number 0
 Current loss 7.030306816101074

30


KeyboardInterrupt: 

In [None]:
## Tasks
# 0. Figure out why the resnet18 structure is not learning properly (test to at least 50 to 100 epochs)
# 0.5. Tweak code to output accuracy of test cases of each level (easy, med, hard) not altogether bc right now it is randomized 
# 1. Change Structures: Test Contrastive Loss with Resnet 50 structures 
# 2. Change Structures: Test BCE Loss with Resnet 50 structures
# 3. Learn about the matrix calculations in CNNs
# 4. Learn about vision transformers 
# 5. Try different optimizers: ADAM, AdaDelta