!wget https://www.dropbox.com/s/ic9ym6ckxq2lo6v/Dataset_Signature_Final.zip
#!wget https://www.dropbox.com/s/0n2gxitm2tzxr1n/lightCNN_51_checkpoint.pth
#!wget https://www.dropbox.com/s/9yd1yik7u7u3mse/light_cnn.py
import zipfile
sigtrain = zipfile.ZipFile('Dataset_Signature_Final.zip', mode='r')
sigtrain.extractall()

# http://pytorch.org/
# Work out which prebuilt torch wheel matches this interpreter and GPU runtime.
from os.path import exists
from wheel.pep425tags import get_abbr_impl, get_impl_ver, get_abi_tag
# Tag assembled as "<impl><version>-<abi>" from the wheel helper functions.
platform = '{}{}-{}'.format(get_abbr_impl(), get_impl_ver(), get_abi_tag())
# Shell escape: list cudart.so entries from the linker cache and rewrite each line
# that ends in ".<digits>.<digits>" into "cu<digits><digits>".
cuda_output = !ldconfig -p|grep cudart.so|sed -e 's/.*\.\([0-9]*\)\.\([0-9]*\)$/cu\1\2/'
# The first reported tag is used only when the /dev/nvidia0 device node exists;
# otherwise the CPU wheel is selected.
accelerator = cuda_output[0] if exists('/dev/nvidia0') else 'cpu'

# Install the torch 0.4.1 wheel for that accelerator/platform, plus torchvision.
!pip install -q http://download.pytorch.org/whl/{accelerator}/torch-0.4.1-{platform}-linux_x86_64.whl torchvision

In [0]:
import re
import os
import cv2
import random
import numpy as np
import collections

import torch
import torchvision
from torch.utils import data
from torchvision import models
import torch.nn as nn
from torch.utils.data import DataLoader,Dataset
import torch.nn.functional as F
from PIL import Image
import PIL

from numpy.random import choice, shuffle
from itertools import product, combinations, combinations_with_replacement, permutations

import torch.optim as optim
from torchvision import transforms

train_image_list = []
test_image_list = []

# Walk the extracted tree. Only leaf directories (those without sub-directories)
# contribute files, so one pass over os.walk is enough.
for root, dirs, files in os.walk('Dataset'):
    if dirs:
        continue
    # Any leaf whose path mentions 'dataset4' is held out for testing.
    target = test_image_list if 'dataset4' in root else train_image_list
    for file in files:
        target.append(os.path.join(root, file).rstrip('\n'))

# Group the training images by their dataset folder (second path component).
# The folder names are sorted so that index 0/1/2 always refers to the same
# folder: iterating a bare set of strings gives an order that changes between
# interpreter runs because of string hash randomisation.
dataset_names = sorted({re.split('/', image)[1] for image in train_image_list})

train_image_list_x = []
for i in dataset_names:
    # Compare the folder component exactly rather than by substring, so a name
    # that happens to be contained in another path cannot leak across groups.
    train_image_list_x.append(
        [image for image in train_image_list if re.split('/', image)[1] == i])

train_image_lis_dataset1 = train_image_list_x[0]
train_image_lis_dataset2 = train_image_list_x[1]
train_image_lis_dataset3 = train_image_list_x[2]


In [0]:
class PhiLoader(data.Dataset):
    """Dataset of signature image pairs for a Siamese network.

    Images are grouped into identities by the last three characters of the file
    stem (the basename up to its first '.'). Inside every identity, each image
    whose path contains 'real' is paired with all non-'real' images of that
    identity and then with all 'real' ones (itself included).

    Each item is ``(img1, img2, label)``: two grayscale image tensors resized
    to ``resize_shape`` and a scalar tensor that is 0.0 when the third
    '/'-separated path component of both files is equal and 1.0 otherwise.

    Args:
        image_list: '/'-separated image paths.
        resize_shape: two-element size handed to ``PIL.Image.resize``.
        transform: when true, each image independently receives one randomly
            chosen augmentation (small rotation, random crop, or none).
    """

    def __init__(self, image_list, resize_shape, transform=True):
        self.image_list = image_list

        # One pass over the list instead of rescanning it for every identity.
        # Insertion order also makes the pair order reproducible between runs.
        groups = self._group_by_identity(self.image_list)
        self.diff = list(groups)
        self.identity_image = list(groups.values())
        self.PairPool = self._build_pairs(self.identity_image)

        self.Dimensions = resize_shape
        self.transform = transform
        self.labels = []
        self.ToGray = transforms.Grayscale()
        self.RR = transforms.RandomRotation(degrees=10, resample=PIL.Image.CUBIC)
        self.Identity = transforms.Lambda(lambda x: x)
        self.RRC = transforms.Lambda(lambda x: self.RandomRCrop(x))
        self.Transform = transforms.RandomChoice([self.RR,
                                                  self.RRC,
                                                  self.Identity])
        self.T = transforms.ToTensor()

    @staticmethod
    def _group_by_identity(image_list):
        """Map each identity key (last 3 chars of the file stem) to its paths."""
        groups = {}
        for image in image_list:
            key = str(image).split('/')[-1].split('.')[0][-3:]
            groups.setdefault(key, []).append(image)
        return groups

    @staticmethod
    def _build_pairs(identity_groups):
        """Return (real, other) path pairs for every identity group."""
        pairs = []
        for user in identity_groups:
            real = [image for image in user if 'real' in image]
            forge = [image for image in user if 'real' not in image]
            pairs.extend(product(real, forge + real))
        return pairs

    def __len__(self):
        return len(self.PairPool)

    def RandomRCrop(self, image):
        """Randomly crop ``image`` to between 90% and 100% of its size."""
        width, height = image.size
        size = random.uniform(0.9, 1.00)
        newheight = size * height
        newwidth = size * width
        T = transforms.RandomCrop((int(newheight), int(newwidth)))
        return T(image)

    def __getitem__(self, index):
        # Wrap out-of-range indices back into the pool.
        index = index % len(self.PairPool)
        path_1, path_2 = self.PairPool[index]

        img1 = self.ToGray(Image.open(path_1))
        img2 = self.ToGray(Image.open(path_2))

        # Same third path component -> same class (0.0), otherwise 1.0.
        l = 0.0 if path_1.split('/')[2] == path_2.split('/')[2] else 1.0
        # NOTE(review): this list grows by one entry per fetched item and is
        # never read in this file; kept so external readers of `labels` still work.
        self.labels.append(l)

        if self.transform:
            img1 = self.Transform(img1)
            img2 = self.Transform(img2)

        # PIL documents the resize target as a 2-tuple.
        size = tuple(self.Dimensions)
        return self.T(img1.resize(size)), self.T(img2.resize(size)), torch.tensor(l)

    

In [0]:
class PhiNet(nn.Module):
    """Convolutional embedding network used as one branch of the Siamese pair.

    Takes single-channel images shaped (N, 1, H, W) and returns a
    128-dimensional embedding per image, shaped (N, 128). Five convolutional
    stages are followed by adaptive pooling to a fixed 6x6 map, so the two
    fully connected layers always see 128 * 6 * 6 = 4608 features.
    """

    def __init__(self):
        super().__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(1, 96, kernel_size=11, stride=1),
            nn.ReLU(),
            nn.LocalResponseNorm(5, alpha=1e-4, beta=0.75, k=2),
            nn.MaxPool2d(kernel_size=3, stride=2))

        self.layer2 = nn.Sequential(
            nn.Conv2d(96, 256, kernel_size=5, stride=1, padding=2),
            nn.ReLU(),
            nn.LocalResponseNorm(5, alpha=1e-4, beta=0.75, k=2),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Dropout2d(p=0.3))

        self.layer3 = nn.Sequential(
            nn.Conv2d(256, 384, kernel_size=3, stride=1, padding=1))

        self.layer4 = nn.Sequential(
            nn.Conv2d(384, 256, kernel_size=3, stride=1, padding=1),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Dropout2d(p=0.3))

        self.layer5 = nn.Sequential(
            nn.Conv2d(256, 128, kernel_size=3, stride=1, padding=1),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Dropout2d(p=0.3))

        # The activations here are 4-D (N, 128, H, W). A 3-D adaptive pool
        # with output (128, 6, 6) only worked by reading the batch axis as
        # channels; pooling the two spatial axes directly gives the same
        # (N, 128, 6, 6) result without that reinterpretation.
        self.adap = nn.AdaptiveAvgPool2d((6, 6))

        self.layer6 = nn.Sequential(
            nn.Linear(4608, 512),
            nn.ReLU(),
            nn.Dropout(p=0.5))

        self.layer7 = nn.Sequential(
            nn.Linear(512, 128),
            nn.ReLU())

    def forward(self, x):
        """Return the (N, 128) embedding for a batch of (N, 1, H, W) images."""
        out = self.layer1(x)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = self.layer5(out)
        out = self.adap(out)

        # Flatten everything except the batch axis for the linear layers.
        out = out.reshape(out.size()[0], -1)

        out = self.layer6(out)
        out = self.layer7(out)

        return out

In [0]:
import math

def set_optimizer_lr(optimizer, lr):
    """Overwrite the learning rate of every parameter group in place.

    Lets a schedule change the rate without constructing a new optimizer.
    Returns the same optimizer object that was passed in.
    """
    for group in optimizer.param_groups:
        group.update(lr=lr)
    return optimizer
  
def se(initial_lr,iteration,epoch_per_cycle):
    """Cosine-annealed learning rate for one position inside a cycle.

    Equals ``initial_lr`` at ``iteration == 0`` and falls along half a cosine
    wave towards 0 as ``iteration`` approaches ``epoch_per_cycle``.
    """
    phase = math.pi * iteration / epoch_per_cycle
    cosine_factor = math.cos(phase) + 1
    return initial_lr * cosine_factor / 2

In [0]:
class ContrastiveLoss(torch.nn.Module):
    """
    Contrastive loss over a batch of embedding pairs.
    Based on: http://yann.lecun.com/exdb/publis/pdf/hadsell-chopra-lecun-06.pdf

    Pairs labelled 0 are penalised by their squared distance; pairs labelled 1
    are penalised by the squared amount their distance falls short of `margin`.
    """

    def __init__(self, margin=2.0):
        super().__init__()
        self.margin = margin

    def forward(self, output1, output2, label):
        """Return the mean contrastive loss of the batch as a scalar tensor."""
        distance = F.pairwise_distance(output1, output2)

        # Term for label 0: squared distance.
        attract = (1 - label) * torch.pow(distance, 2)

        # Term for label 1: squared shortfall below the margin, never negative.
        shortfall = torch.clamp(self.margin - distance, min=0.0)
        repel = (label) * torch.pow(shortfall, 2)

        return torch.mean(attract + repel)

    
def contrastive_loss():
    """Factory returning a ContrastiveLoss built with its default margin."""
    criterion = ContrastiveLoss()
    return criterion

In [0]:
def compute_accuracy_roc(predictions, labels):
  
  '''
 Compute ROC accuracy with a range of thresholds on distances.
 '''

  dmax = np.max(predictions)

  dmin = np.min(predictions)

  nsame = np.sum(labels == 0)

  ndiff = np.sum(labels == 1)
  thresh=1.0

  step = 0.01
  max_acc = 0

  for d in np.arange(dmin, dmax+step, step):
       
    
    idx1 = predictions.ravel() <= d
    idx2 = predictions.ravel() > d

    tpr = float(np.sum(labels[idx1] == 0)) / nsame       
    tnr = float(np.sum(labels[idx2] == 1)) / ndiff
    acc = 0.5 * (tpr + tnr)       
    

    if (acc > max_acc):
      
      max_acc = acc
      thresh=d

  return max_acc,thresh

In [0]:
# Training loaders over dataset1 at three resolutions; batch size shrinks as
# the images grow.
trainloader1 = torch.utils.data.DataLoader(
    PhiLoader(image_list=train_image_lis_dataset1, resize_shape=[128, 64]),
    batch_size=32, num_workers=4, shuffle=True, pin_memory=False)

trainloader1_hr = torch.utils.data.DataLoader(
    PhiLoader(image_list=train_image_lis_dataset1, resize_shape=[256, 128]),
    batch_size=16, num_workers=4, shuffle=True, pin_memory=False)

trainloader1_uhr = torch.utils.data.DataLoader(
    PhiLoader(image_list=train_image_lis_dataset1, resize_shape=[512, 256]),
    batch_size=4, num_workers=0, shuffle=False, pin_memory=False)

trainloader3 = torch.utils.data.DataLoader(
    PhiLoader(image_list=train_image_lis_dataset3, resize_shape=[512, 256]),
    batch_size=32, num_workers=1, shuffle=False, pin_memory=False)

# Validation loader: random augmentation is switched off so that the loss used
# for checkpoint selection is computed on the unmodified images and is
# comparable from one evaluation to the next.
testloader = torch.utils.data.DataLoader(
    PhiLoader(image_list=test_image_list, resize_shape=[256, 128], transform=False),
    batch_size=32, num_workers=1, shuffle=True, pin_memory=False)

In [33]:
# Lowest validation loss seen so far; test() updates it whenever it saves.
best_loss = 99999999

# Everything in this notebook is placed on the first CUDA device.
device = torch.device("cuda:0")
print(device)

# Build the embedding network, then move its parameters to the device.
phinet = PhiNet()
phinet = phinet.to(device)



cuda:0


In [0]:
# Contrastive criterion (factory defined above) for the Siamese pair, placed on
# the same device as the network.
siamese_loss = contrastive_loss().to(device)


In [0]:
def test(epoch):
    """Evaluate `phinet` on `testloader` and checkpoint it when the loss improves.

    The mean contrastive loss over all batches is compared with the global
    `best_loss`; when lower, the whole module is saved under the key 'net' in
    'checkpoint/phinet_siamese.stdt' and `best_loss` is updated.

    Args:
        epoch: accepted for the caller's bookkeeping; not used in the body.

    Returns:
        The mean loss per batch.
    """
    global best_loss
    phinet.eval()

    running = 0
    with torch.no_grad():
        for inputs_1, inputs_2, targets in testloader:
            inputs_1 = inputs_1.to(device)
            inputs_2 = inputs_2.to(device)
            targets = targets.to(device)

            # Both images of a pair go through the same network.
            features_1 = phinet(inputs_1)
            features_2 = phinet(inputs_2)

            running += siamese_loss(features_1, features_2, targets.float()).item()

    mean_loss = running / len(testloader)

    # Keep only the model with the best loss so far.
    if mean_loss < best_loss:
        print('Saving..')
        if not os.path.isdir('checkpoint'):
            os.mkdir('checkpoint')
        torch.save({'net': phinet}, 'checkpoint/phinet_siamese.stdt')
        best_loss = mean_loss

    return mean_loss

In [0]:
def train_se(epochs_per_cycle,initial_lr,dl):
    """Train `phinet` for one cosine-annealing cycle on the given loader.

    At the start of every epoch the learning rate is reset to
    ``se(initial_lr, j, epochs_per_cycle)``, so it starts at ``initial_lr``
    and decays along a cosine within the cycle. The global `epoch` counter is
    advanced once per epoch and the global `optimizer` is updated in place.

    Args:
        epochs_per_cycle: number of epochs in this cycle.
        initial_lr: learning rate at the first epoch of the cycle.
        dl: DataLoader yielding ``(inputs_1, inputs_2, targets)`` batches.

    Returns:
        A one-element list holding a copy of the model's state dict taken at
        the end of the cycle.
    """
    global epoch
    global optimizer
    import copy

    phinet.train()
    snapshots = []
    cycle_loss = 0

    for j in range(epochs_per_cycle):
        epoch_loss = 0
        print('\nEpoch: %d' % epoch)
        lr = se(initial_lr, j, epochs_per_cycle)
        optimizer = set_optimizer_lr(optimizer, lr)

        # Iterate the loader that was passed in. It used to be replaced by a
        # hard-coded global loader, which made the `dl` argument a no-op.
        num_batches = len(dl)

        for inputs_1, inputs_2, targets in dl:
            inputs_1, inputs_2, targets = inputs_1.to(device), inputs_2.to(device), targets.to(device)

            optimizer.zero_grad()
            # Both images of a pair go through the same network.
            features_1 = phinet(inputs_1)
            features_2 = phinet(inputs_2)

            loss = siamese_loss(features_1, features_2, targets)

            loss.backward()
            optimizer.step()

            # Accumulate the per-batch mean so epoch_loss ends as the epoch average.
            epoch_loss += loss.item() / num_batches

        epoch += 1
        cycle_loss += epoch_loss / epochs_per_cycle
        print("e_Loss:", epoch_loss)

    print("c_loss:", cycle_loss)

    # state_dict() hands back references to the live parameters; deep-copy so
    # the snapshot is not overwritten by later training.
    snapshots.append(copy.deepcopy(phinet.state_dict()))

    return snapshots

In [0]:
# Learning rate that every cosine cycle starts from.
lr = 1e-4
# Running epoch counter; train_se reads and increments this global.
epoch = 0
# Plain SGD over all network parameters; train_se rewrites its lr every epoch.
optimizer = optim.SGD(params=phinet.parameters(), lr=lr)

In [0]:
# Six cycles of three epochs each with trainloader1, validating after each cycle.
for i in range(6):
    train_se(epochs_per_cycle=3, initial_lr=lr, dl=trainloader1)
    test_loss = test(epoch=i)
    print("Test Loss: ", test_loss)



Epoch: 0
e_Loss: 1.8136854987395439

Epoch: 1
e_Loss: 1.8160580522135685

Epoch: 2
e_Loss: 1.8135544312627692
c_loss: 1.8144326607386272
Saving..
Test Loss:  1.9583299776603436

Epoch: 3


  "type " + obj.__name__ + ". It won't be checked "


e_Loss: 1.8022973537445073

Epoch: 4
e_Loss: 1.8116473335968817

Epoch: 5
e_Loss: 1.8046352110410988
c_loss: 1.8061932994608294


In [0]:
# Second schedule: same cycle settings, with trainloader1_hr supplied as the
# loader argument; validation again follows every cycle.
for i in range(6):
    train_se(epochs_per_cycle=3, initial_lr=lr, dl=trainloader1_hr)
    test_loss = test(epoch=i)
    print("Test Loss: ", test_loss)

In [0]:
# Reload the checkpoint written by test(): its 'net' entry is the whole pickled
# module, not a state dict, so the result is ready to call.
# NOTE(review): torch.load unpickles arbitrary objects — only load files this notebook produced.
loaded=torch.load('checkpoint/phinet_siamese.stdt')['net']


In [0]:
import gc
def predict(model,dataloader,fn):
    """Score every pair from `dataloader` with `model` and hand the results to `fn`.

    Args:
        model: module mapping an image batch to an embedding batch.
        dataloader: iterable yielding ``(x0, x1, label)`` batches of CPU tensors.
        fn: callable taking ``(distances, labels)`` as 1-D NumPy arrays, e.g.
            ``compute_accuracy_roc``.

    Returns:
        Whatever ``fn`` returns.
    """
    # Use the GPU when there is one; otherwise run on the CPU instead of failing.
    target_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.eval()
    model.to(target_device)

    # NOTE(review): distances here use the L1 norm (p=1), while ContrastiveLoss
    # in this file uses F.pairwise_distance with its default norm — confirm the
    # mismatch is intended.
    pwd = torch.nn.PairwiseDistance(p=1)

    labels = []
    out = []
    # Inference only: no autograd graph is kept, whether or not the caller
    # already wrapped this call in no_grad.
    with torch.no_grad():
        for x0, x1, label in dataloader:
            labels.extend(label.numpy())
            a = model(x0.to(target_device))
            b = model(x1.to(target_device))
            # Bring the distances back to host memory before handing them to
            # NumPy; device tensors cannot be converted directly.
            out.extend(pwd(a, b).cpu().numpy())

    return fn(np.asarray(out), np.asarray(labels))
    

In [0]:
# Evaluation pairs built from train_image_lis_dataset2. Random augmentation is
# switched off so the reported accuracy and threshold are reproducible and
# measured on the unmodified images.
testloader_ = torch.utils.data.DataLoader(
    PhiLoader(image_list=train_image_lis_dataset2, resize_shape=[256, 128], transform=False),
    batch_size=16, num_workers=0, shuffle=False, pin_memory=False)

with torch.no_grad():
    maxacc, threshold = predict(loaded, testloader_, compute_accuracy_roc)
    print("Accuracy:{:0.3f}".format(maxacc*100),"Threshold:{:0.3f}".format(threshold))
    