# **EfficientNet-B3 Siamese-Network Thresholds**
* [Imports](#section-one)
* [Load model](#section-two)
* [Get thresholds](#section-three)
    - [IDs](#sub-section-three-one)
    - [Driving Licenses](#sub-section-three-two)
    - [Passports](#sub-section-three-three)

In [1]:
!pip install efficientnet_pytorch

<a id="section-one"></a>
## **Imports**

In [2]:
import sys
sys.path.append("../input/libraries")

import torch
import torchvision
from torch import nn
from efficientnet_pytorch import EfficientNet
import pandas as pd
import numpy as np
import os
import matplotlib.pyplot as plt
import random
import cv2
import albumentations as A
from albumentations.pytorch import ToTensorV2
import time
from torch.utils.data import Dataset,DataLoader
from torch.utils.data.sampler import SequentialSampler, RandomSampler
from tqdm.notebook import tqdm_notebook
from validation import plot, get_metrics, get_best_thresholds, validate_thresh

%matplotlib inline

In [3]:
ID = 'id'
DRIVING_LICENSE = 'driving_license'
PASSPORT = 'passport'

In [4]:
img_size = 200
n_epochs = 20
test_size = 0.2
BATCH_SIZE = 8

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

<a id="section-two"></a>
## **Load model**

In [5]:
def get_transforms(apply_augmentations=True):
    if apply_augmentations:
        return A.Compose([
            A.Rotate(limit=20),
            A.Flip(),
            A.OneOf([
                A.HueSaturationValue(), 
                A.RandomBrightnessContrast(),
            ], p=0.4),
            A.OneOf([
                A.Blur(blur_limit=3),
                A.MedianBlur(blur_limit=3),
                A.GaussNoise()
                ],p=0.4),
            A.Normalize(p=1.0),
            A.Resize(height=img_size, width=img_size, p=1),
            ToTensorV2(p=1.0),
        ], p=1.0)

    else:
        return A.Compose([
            A.Resize(height=img_size, width=img_size, p=1),
            A.Normalize(p=1.0),
            ToTensorV2(p=1.0),
        ])

In [6]:
class TripletsDataset(Dataset):
    def __init__(self, df):
        super().__init__()
        self.df = df
    
    def __get_image(self, image_path, aug_prob=0.7):
        image = cv2.imread(image_path, cv2.IMREAD_COLOR)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        
        prob = random.uniform(0.0, 1.0)
        transforms = get_transforms(True if prob<=aug_prob else False)
        transformed = transforms(image=image)
        image = transformed['image']
        
        return image
            
    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        anchor_path = row['anchor']
        positive_path = row['positive']
        negative_path = row['negative']
        
        return self.__get_image(anchor_path), self.__get_image(positive_path), self.__get_image(negative_path)
    
    def __len__(self):
        return len(self.df)

In [7]:
def data_loader(dataset, train):
    if train:
        sampler = RandomSampler(dataset)
    else:
        sampler = SequentialSampler(dataset)
    return torch.utils.data.DataLoader(dataset,
                                       batch_size=BATCH_SIZE,
                                       sampler=sampler,
                                       #pin_memory=False,
                                       #drop_last=True,
                                       shuffle=False)

In [8]:
class SiameseNetwork(nn.Module):
    def __init__(self):
        super(SiameseNetwork, self).__init__()
        
        self.base_cnn = EfficientNet.from_pretrained('efficientnet-b3') # 1536
        self.fc = nn.Sequential(nn.Linear(1536 , 512),
                                nn.BatchNorm1d(512),
                                nn.Dropout(0.3),
                                nn.Linear(512 , 256))
    
    def _get_vector(self, x):
        x = self.base_cnn.extract_features(x)
        x = self.base_cnn._avg_pooling(x)
        x = x.flatten(start_dim=1)
        x = self.base_cnn._dropout(x)
        x = self.fc(x)

        return x

    def forward(self, anchor, positive, negative):
        return self._get_vector(anchor), self._get_vector(positive), self._get_vector(negative)

In [9]:
class SiameseModel:
    def __init__(self, verbose=True):
        self.model = SiameseNetwork().to(device)
        
    def compile(self, loss_fn, path=None):
        self.loss_fn = loss_fn
        if path is not None:
            self.load(path)
            
    # load checkpoint from path
    def load(self, path):
        map_location = device.type if device.type=='cpu' else None
        self.model.load_state_dict(torch.load(path, map_location=map_location)['model_state_dict'])
        self.model.eval()
    
    
    def get_vector(self, image, dim=1):        
        vector = self.model._get_vector(image.to(device).float())
        if dim==1:
            return vector[0].cpu()
        
        return vector.cpu()
    
    def get_loss(self, input_data, true_data):
        with torch.no_grad():
            input_vector = self.get_vector(input_data, dim=None)
            true_vector = self.get_vector(true_data, dim=None)

            loss = self.loss_fn(input_vector.to(device), true_vector.to(device))
            
        return loss.detach().item()
    
    def is_match(self, input_doc, doc_vector):
        if not torch.is_tensor(doc_vector):
            if isinstance(doc_vector, list):
                doc_vector = np.array(doc_vector)
            assert isinstance(doc_vector, np.ndarray), f'Expected vector input to be of a list/numpy array/torch tensor, but got {type(doc_vector)}.'
            doc_vector = torch.as_tensor(doc_vector)
        
        with torch.no_grad():
            input_vector = self.get_vector(input_doc, dim=None)

            loss = self.loss_fn(input_vector.to(device), doc_vector.to(device)).detach().item()
        return loss   
       # if loss <= THRESHOLD:
       #     return True
        #return False

In [10]:
doc_siamese_model = SiameseModel()
loss_fn = nn.PairwiseDistance()
model_path = '../input/siamese-model/siamese_model.bin'
doc_siamese_model.compile(loss_fn, path=model_path)

<a id="section-three"></a>
## **Get thresholds**

In [24]:
test_docs_dir = '../input/documents/test_data'

In [27]:
def get_image(image_path, apply_transforms=True):
    image = cv2.imread(image_path, cv2.IMREAD_COLOR)
    assert image is not None, f'Document not found in path "{image_path}".'
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        
    transforms = get_transforms(apply_transforms)
    transformed = transforms(image=image)
    image = transformed['image']
    
    if len(image.shape)==2:
            image = image[None, :, :]
    if len(image.shape)==3:
        image = image[None, :, :, :]

    assert len(image.shape)==4, f'Expected input data of 4 dimensions, but got {len(image.shape)}.'
    return image
    
def get_distances(test_docs_dir, person, doc_type):
    distances = []
    docs = ['ids', 'driving_licenses', 'passports']
    assert doc_type in docs, f'Expected doc_type to be one of [ids, driving_licenses, passports] but got {doc_type}.'
    
    anchor = get_image(os.path.join(os.path.join(test_docs_dir, doc_type), person), False)
    others = [other_type for other_type in docs if other_type!=doc_type]
    for other_type in others:
        if person in os.listdir(os.path.join(test_docs_dir, other_type)):
            start = time.time()
            true_dist = doc_siamese_model.get_loss(anchor, get_image(os.path.join(os.path.join(test_docs_dir, doc_type), person)))
            end = time.time()
            distances.append({'dist':true_dist, 'match':True, 'time':end-start})
            
            start = time.time()
            false_dist = doc_siamese_model.get_loss(anchor, get_image(os.path.join(os.path.join(test_docs_dir, other_type), person)))
            end = time.time()
            distances.append({'dist':false_dist, 'match':False, 'time':end-start}) 
    
    other_person = random.choice(os.listdir(os.path.join(test_docs_dir, doc_type)))
    while other_person == person:
        other_person = random.choice(os.listdir(os.path.join(test_docs_dir, doc_type)))
    start = time.time()
    true_dist = doc_siamese_model.get_loss(anchor, get_image(os.path.join(os.path.join(test_docs_dir, doc_type), person)))
    end = time.time()
    distances.append({'dist':true_dist, 'match':True, 'time':end-start})
    
    start = time.time()
    false_dist = doc_siamese_model.get_loss(anchor, get_image(os.path.join(os.path.join(test_docs_dir, doc_type), other_person)))
    end = time.time()
    distances.append({'dist':false_dist, 'match':False, 'time':end-start}) 
    
    return distances

In [28]:
id_distances = []

for person in tqdm_notebook(os.listdir(os.path.join(test_docs_dir, 'ids'))):
    id_distances += get_distances(test_docs_dir, person, 'ids')

<a id="sub-section-three-one"></a>
### **IDs**

In [29]:
id_distances_df = pd.DataFrame(id_distances)
id_distances_df

In [30]:
id_distances_df.to_csv('id_distances_df.csv', index=False)

In [31]:
true_match = id_distances_df[id_distances_df['match']==True]
false_match = id_distances_df[id_distances_df['match']==False]

plot(true_match.drop(columns=['match','time']), false_match.drop(columns=['match','time']))

In [32]:
dist_field = {'name':'dist', 
              'mean': true_match.dist.mean(), 
              'std': true_match.dist.std(),
              'upper_bound': false_match.dist.mean()}
metrics = get_metrics(true_match.drop(columns='match'), false_match.drop(columns='match'), [dist_field])
best_thresholds = get_best_thresholds(metrics, ['accuracy', 'f1_score'], n=5)
best_thresholds

In [33]:
best_thresholds.to_csv('id_best_thresholds.csv', index=False)

<a id="sub-section-three-two"></a>

### **Driving Licenses**

In [36]:
driving_license_distances = []

for person in tqdm_notebook(os.listdir(os.path.join(test_docs_dir, 'driving_licenses'))):
    driving_license_distances += get_distances(test_docs_dir, person, 'driving_licenses')

In [37]:
driving_license_distances_df = pd.DataFrame(driving_license_distances)
driving_license_distances_df

In [38]:
driving_license_distances_df.to_csv('driving_license_distances.csv', index=False)

In [39]:
true_match = driving_license_distances_df[driving_license_distances_df['match']==True]
false_match = driving_license_distances_df[driving_license_distances_df['match']==False]

plot(true_match.drop(columns=['match','time']), false_match.drop(columns=['match','time']))

In [40]:
dist_field = {'name':'dist', 
              'mean': true_match.dist.mean(), 
              'std': true_match.dist.std(),
              'upper_bound': false_match.dist.mean()}
metrics = get_metrics(true_match.drop(columns='match'), false_match.drop(columns='match'), [dist_field])
best_thresholds = get_best_thresholds(metrics, ['accuracy', 'f1_score'], n=5)
best_thresholds

In [41]:
best_thresholds.to_csv('driving_license_best_thresholds.csv', index=False)

<a id="sub-section-three-three"></a>
## **Passports**

In [42]:
passport_distances = []

for person in tqdm_notebook(os.listdir(os.path.join(test_docs_dir, 'passports'))):
    passport_distances += get_distances(test_docs_dir, person, 'passports')

In [43]:
passport_distances_df = pd.DataFrame(passport_distances)
passport_distances_df

In [44]:
passport_distances_df.to_csv('passport_distances_df.csv', index=False)

In [45]:
true_match = passport_distances_df[passport_distances_df['match']==True]
false_match = passport_distances_df[passport_distances_df['match']==False]

plot(true_match.drop(columns=['match','time']), false_match.drop(columns=['match','time']))

In [46]:
dist_field = {'name':'dist', 
              'mean': true_match.dist.mean(), 
              'std': true_match.dist.std(),
              'upper_bound': false_match.dist.mean()}
metrics = get_metrics(true_match.drop(columns='match'), false_match.drop(columns='match'), [dist_field])
best_thresholds = get_best_thresholds(metrics, ['accuracy', 'f1_score'], n=5)
best_thresholds

In [47]:
best_thresholds.to_csv('passport_best_thresholds.csv', index=False)