<a href="https://colab.research.google.com/github/JoseFPortoles/Biometrics/blob/main/fingerprints_siamese_network.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Introduction

Development of a siamese network for fingerprint comparison. The dataset used for training and testing the model is FVC2002 (Second International Competition for Fingerprint Verification Algorithms) and it can be found in http://bias.csr.unibo.it/fvc2002/.

Databases present in FVC2002 are:

* DB1: optical sensor "TouchView II" by Identix
* DB2: optical sensor "FX2000" by Biometrika
* DB3: capacitive sensor "100 SC" by Precise Biometrics
* DB4: synthetic fingerprint generation

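Each full database contains 880 pictures corresponding to 110 fingers (8 pictures of each finger). This notebook only downloads the much smaller `_B` subsets (`DB1_B` and `DB2_B`); for example, `DB2_B` contains 80 pictures: 8 pictures of each of 10 fingers, labelled 101 to 110.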


## Import stuff
All the libraries used in this notebook

In [1]:
import cv2
from google.colab.patches import cv2_imshow
import matplotlib.pyplot as plt
import numpy as np
import os
import zipfile
import requests
from sklearn.model_selection import train_test_split
from imgaug import augmenters as iaa
from imgaug.augmentables.segmaps import SegmentationMapOnImage
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.preprocessing.image import load_img, img_to_array
import tensorflow.keras
from tensorflow.keras.applications import VGG16
from tensorflow.keras import models
from tensorflow.keras.models import Model
from tensorflow.keras import layers
from tensorflow.keras.layers import Flatten, Concatenate, Dense, Dropout, Subtract, Multiply
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import plot_model
import random
import tqdm.notebook as tqdm
import pandas as pd
from pandas.plotting import table 
from itertools import product

# Download database

In [2]:
# Paths
# url_dbN: zip archive of the FVC2002 DBN_B subset.
# dir_dbN: local folder the archive is downloaded to and extracted into
#          (relative to the current working directory).
url_db1 = 'http://bias.csr.unibo.it/fvc2002/Downloads/DB1_B.zip'
dir_db1 = './FVC2002/DB1_B'
url_db2 = 'http://bias.csr.unibo.it/fvc2002/Downloads/DB2_B.zip'
dir_db2 = './FVC2002/DB2_B'

# Define download function
def download_DBi_B(url, dir):
    '''Download a DBi_B zip archive from url and extract it into dir.

    Parameters
    ----------
    url : str
        Address of the zip archive. Its basename is used as the name of the
        temporary zip file written inside dir.
    dir : str
        Destination folder. It is created if missing and receives the
        extracted files.

    Raises
    ------
    requests.HTTPError
        If the server answers with an error status. Without this check the
        error page would be saved to disk and handed to zipfile.
    '''
    zip_name = os.path.basename(url)
    path_zip = os.path.join(dir, zip_name)

    # Create paths if not already existing
    os.makedirs(dir, exist_ok=True)

    # Download zip file; the timeout keeps a stalled connection from
    # blocking the notebook forever
    response = requests.get(url, timeout=60)
    response.raise_for_status()
    with open(path_zip, 'wb') as zip_file:
        zip_file.write(response.content)

    try:
        # Unzip database
        with zipfile.ZipFile(path_zip, 'r') as z:
            z.extractall(dir)
    finally:
        # Remove zip file, also when the extraction failed
        os.remove(path_zip)



# Fetch and extract every database listed in the paths section above
for db_url, db_dir in ((url_db1, dir_db1), (url_db2, dir_db2)):
    download_DBi_B(db_url, db_dir)

### SETTINGS

In [3]:
# Input height and width (pixels) of the images fed to the network;
# also the size the raw fingerprints are resized to
img_h = 224
img_w = 224

# Batch size and Epochs
batch_size = 32
epochs = 30

# Number of training/validation examples (pairs of images) generated
# per epoch; the number of steps per epoch is derived from these values
ntrainfiles = 3200
nvalfiles = 3200

## Preprocessing
Here, the folder of interest, **DB1_B**, is used as the training set. A second folder, **DB2_B** (acquired with a different instrument, but still by optical acquisition), is used as the validation set in order to assess the ability of the network to generalise, i.e. to compare fingerprints (FP) that are not in the database of interest.

This cell resizes the original raw FP images to 224 x 224 pixels so that they can be fed to the network during training without wasting GPU time while the CPU resizes the images.

In [5]:
def resize_DB(input, output):
    '''Resize the images at input and put the resized versions in output.

    Walks input recursively. Every file whose name contains '.tif' and does
    not contain 'checkpoint' is read, resized to img_w x img_h pixels and
    written as <name>.png directly inside output (the sub-folder structure
    of input is not reproduced). The names of skipped or unreadable files
    are printed.
    '''
    # Create output folders if needed
    os.makedirs(output, exist_ok=True)
    # Resize input images
    for root, _, files in os.walk(input):
        for file in files:
            if '.tif' not in file or 'checkpoint' in file:
                print(file)
                continue
            img = cv2.imread(os.path.join(root, file))
            if img is None:
                # cv2.imread signals an unreadable file by returning None
                # instead of raising; skip it rather than crash in resize
                print(file)
                continue
            # cv2.resize takes dsize as (width, height)
            resized_img = cv2.resize(img, (img_w, img_h))
            # The image is kept in OpenCV's BGR order because cv2.imwrite
            # expects BGR; converting to RGB first would store swapped
            # channels on disk
            filename = file.split(sep='.')[0] + '.png'
            cv2.imwrite(os.path.join(output, filename), resized_img)

# Training set (DB1_B): original images -> resized copies
input_train = '/content/FVC2002/DB1_B'
output_train = '/content/FVC2002/resized/DB1_B'

# Validation set (DB2_B). input_dir: original images; resized_dir: resized
input_dir = '/content/FVC2002/DB2_B'
resized_dir = '/content/FVC2002/resized/DB2_B'
output_val = resized_dir

# output_train / output_val are the folders the data generators read from,
# so both databases have to be resized before the generators are built
resize_DB(input_train, output_train)
resize_DB(input_dir, resized_dir)

In [20]:
# Keep only the resized .png files (os.listdir can also return notebook
# checkpoint entries) and sort them: os.listdir order is arbitrary, so
# without sorting the seeded split below would not be reproducible
resized_x = sorted(name for name in os.listdir(resized_dir) if name.endswith('.png'))
# The finger id is encoded in the 2nd and 3rd characters of the file name,
# e.g. '105_3.png' -> 5
resized_y = [int(name[1:3]) for name in resized_x]

x_train, x_test, y_train, y_test = train_test_split(resized_x, resized_y, test_size=0.35, random_state=42, shuffle=True)

print('x_train: ',len(x_train), '\n', x_train )
print('x_test: ', len(x_test), '\n', x_test)
print('y_train: ', len(y_train), '\n', y_train)
print('y_test: ', len(y_test), '\n', y_test)

x_train:  52 
 ['105_3.png', '110_4.png', '109_3.png', '104_1.png', '103_6.png', '108_4.png', '110_3.png', '102_2.png', '106_5.png', '106_4.png', '101_7.png', '104_2.png', '109_7.png', '104_4.png', '107_4.png', '109_8.png', '101_6.png', '103_4.png', '101_8.png', '102_5.png', '105_7.png', '108_5.png', '108_6.png', '105_2.png', '110_5.png', '103_7.png', '104_3.png', '107_7.png', '107_8.png', '104_6.png', '108_7.png', '106_8.png', '106_2.png', '105_1.png', '102_1.png', '109_5.png', '108_8.png', '102_3.png', '110_8.png', '109_6.png', '110_2.png', '104_8.png', '103_8.png', '103_5.png', '107_3.png', '105_5.png', '101_3.png', '105_4.png', '106_7.png', '102_4.png', '106_3.png', '105_6.png']
x_test:  28 
 ['107_5.png', '101_2.png', '106_6.png', '109_2.png', '105_8.png', '104_7.png', '110_6.png', '107_1.png', '107_6.png', '103_1.png', '101_1.png', '110_1.png', '103_3.png', '107_2.png', '108_1.png', '110_7.png', '109_4.png', '102_6.png', '109_1.png', '108_3.png', '106_1.png', '102_8.png', '102_7.

## Data generation
Here the iterators which generate the batches for training and validation are defined.

It starts by loading the images of the relevant database to a numpy array. The network works by comparing a pair of images and producing a similarity score between 0 and 1. Therefore the generator returns pairs of image batches x1 and x2 as well as the corresponding batch of labels y. Each label is either 0 (FPs belonging to different individuals) or 1 (FPs belonging to the same individual).

Pairs of FPs are produced by randomly pairing images from the database. The field **samefreq** is used to balance the dataset, as it controls the ratio of same-individual to different-individual pairs.

I have used the package **imgaug** to augment the images by producing geometric transformations to each image so the dataset has more variability and training makes the network more able to generalise. 

In [10]:
class FPData(tensorflow.keras.utils.Sequence):
    """Keras Sequence that yields random batches of fingerprint image pairs.

    All images listed in img_pathlist are loaded into memory once. Every
    batch is then built by drawing random pairs from that in-memory
    database: ([x1, x2], y), where x1 and x2 are float32 image batches and
    y holds 1 for a pair from the same individual and 0 otherwise.

    Parameters
    ----------
    batch_size : int
        Number of pairs per batch.
    img_size : tuple
        (height, width) of the images.
    img_pathlist : list of str
        Paths of the images. The individual id is read from the 2nd and
        3rd characters of each file name.
    augmentation : bool
        When True, each image of a pair is independently passed through
        the imgaug pipeline stored in self.seq.
    dataset_size : int
        Number of pairs regarded as one epoch; sets the number of batches.
    img_channels : int
        Number of channels of the image arrays.
    """

    def __init__(self, batch_size, img_size, img_pathlist, augmentation=True, dataset_size=3200, img_channels=3):
        self.batch_size = batch_size
        self.img_size = img_size
        self.img_channels = img_channels
        self.img_pathlist = img_pathlist
        # Geometric augmentation pipeline, applied in random order.
        # NOTE(review): the flips mirror the ridge pattern; confirm that
        # mirrored prints are wanted as "same individual" examples.
        self.seq = iaa.Sequential(
            [
             iaa.Fliplr(0.5),
             iaa.Flipud(0.5),
             #iaa.Sometimes(0.5, iaa.GaussianBlur(sigma=(0, 0.5))),
             #iaa.ContrastNormalization((0.75, 1.5)),
             #iaa.AdditiveGaussianNoise(
             #    loc=0, scale=(0.0, 0.05 * 255), per_channel=0.5),
             #iaa.Multiply((0.8, 1.2), per_channel=0.2),
             iaa.Affine(
                 scale={
                     "x": (0.8, 1.2),
                     "y": (0.8, 1.2)
                },
                translate_percent={
                    "x": (-0.1, 0.1),
                    "y": (-0.1, 0.1)
                },
                rotate=(-5, 5),
                shear=(-8, 8)
                )
             ],
             random_order=True)
        self.augmentation = augmentation
        # Fraction of the generated pairs that show the same individual
        self.samefreq = 0.5
        # Load database of images/id labels
        self.img_DB, self.labels_DB = self.load_DB()
        self.dataset_size = dataset_size

    def __len__(self):
        """Number of batches per epoch."""
        return self.dataset_size // self.batch_size

    def __getitem__(self, idx):
        """Return one batch ([x1, x2], y) of random image pairs.

        idx is ignored: every call draws a fresh random batch.

        Raises
        ------
        ValueError
            If a different-individual pair is requested but the database
            only contains a single individual.
        """
        batch_shape = (self.batch_size,) + self.img_size + (self.img_channels,)
        x1 = np.zeros(batch_shape, dtype="float32")
        x2 = np.zeros(batch_shape, dtype="float32")
        y = np.zeros((self.batch_size,), dtype="uint8")
        img_db = self.img_DB
        labels_db = self.labels_DB
        for i in range(self.batch_size):
            # Pick a random entry in db (img1)
            img1_idx = random.randint(0, img_db.shape[0]-1)
            person = labels_db[img1_idx]
            if random.random() < self.samefreq:
                # Entries of the same individual (img1 itself included)
                candidates = np.flatnonzero(labels_db == person)
                label = 1
            else:
                # Entries of any other individual
                candidates = np.flatnonzero(labels_db != person)
                label = 0
            if candidates.size == 0:
                raise ValueError(
                    "Cannot build a pair of different individuals: the "
                    "database only contains one individual")
            # pick one of the allowed entries at random (img2); a flat list
            # of plain ints keeps the indexing below a simple row lookup
            img2_idx = random.choice(candidates.tolist())
            # fetch images (copies, so the cached database is never touched
            # by the augmenters)
            img1 = img_db[img1_idx].copy()
            img2 = img_db[img2_idx].copy()
            if self.augmentation:
                # Each image gets its own random transformation
                img1 = self.seq.augment_image(img1)
                img2 = self.seq.augment_image(img2)
            # insert images and label in the batch
            x1[i] = img1
            x2[i] = img2
            y[i] = label
        return [x1, x2], y

    def load_DB(self):
        """Load every image of img_pathlist and its individual id.

        Returns
        -------
        img_DB : float32 array, one image per path, pixel values / 255
        labels_DB : uint8 array with the id parsed from each file name
        """
        pathlist = self.img_pathlist
        img_DB = np.zeros((len(pathlist),) + self.img_size + (self.img_channels,), dtype=np.float32)
        labels_DB = np.zeros((len(pathlist),), dtype=np.uint8)
        for i, path in enumerate(pathlist):
            filename = os.path.basename(path)
            # Individual ID
            labels_DB[i] = int(filename[1:3])
            # Image; target_size is (height, width) and only triggers a
            # resize when the file on disk has a different size
            img = load_img(path, color_mode='rgb', target_size=self.img_size)
            img_DB[i] = img_to_array(img)/255.
        return img_DB, labels_DB

In [11]:
def get_pathlist(dir):
    '''Return the paths of all usable .png files found below dir.

    The folder tree is walked recursively. A file is kept when its name
    contains '.png' and does not contain 'checkpoint'; the name of every
    other file is printed.
    '''
    collected = []
    for folder, _subfolders, names in os.walk(dir):
        for name in names:
            usable = '.png' in name and 'checkpoint' not in name
            if usable:
                collected.append(os.path.join(folder, name))
            else:
                print('Not included in pathlist: ', name)
    return collected
            


# Image paths of the training / validation folders (output_train and
# output_val must point to folders holding the resized .png files),
# shuffled in place
train_pathlist = get_pathlist(output_train)
val_pathlist = get_pathlist(output_val)

random.shuffle(train_pathlist)
random.shuffle(val_pathlist)

# Batches per epoch for each set
trainsteps_epoch = ntrainfiles // batch_size
valsteps_epoch = nvalfiles // batch_size

# Pair generators for training and validation
train_gen = FPData(
    batch_size=batch_size,
    img_size=(img_h, img_w),
    img_pathlist=train_pathlist,
    augmentation=True,
    dataset_size=ntrainfiles,
)

val_gen = FPData(
    batch_size=batch_size,
    img_size=(img_h, img_w),
    img_pathlist=val_pathlist,
    augmentation=False,
    dataset_size=nvalfiles,
)

## Model architecture
The concept of choice here is a siamese network, that is, two identical CNNs (sharing the same weights) that extract features from the two input images. Both branches are joined by a fully-connected head that produces a single similarity score for the comparison of the two images.

The way I have implemented this is by using a VGG16 network as the CNN base of the architecture. The VGG16 is pretrained on Imagenet so it can exploit the learned features in learning this new task (transfer learning). 

In [12]:
def siamese_model():
    """Build and compile the siamese fingerprint comparison network.

    Both inputs go through the same ImageNet-pretrained VGG16 convolutional
    base, whose layers are frozen. The two flattened feature maps are
    concatenated and fed to a small dense head ending in a sigmoid unit.
    The model is compiled with binary cross-entropy, its summary is printed
    and a diagram is written to ./model.png.

    Returns
    -------
    The compiled Keras model taking [image_1, image_2] as input.
    """
    input_1 = layers.Input(shape=(img_h, img_w, 3))
    input_2 = layers.Input(shape=(img_h, img_w, 3))

    # input_shape is (height, width, channels), matching the Input layers
    base_model = VGG16(weights='imagenet', include_top=False, input_shape=(img_h, img_w, 3))

    # Freeze layer by layer (not the base as a whole) so that individual
    # layers can be switched back to trainable later
    for layer in base_model.layers:
        layer.trainable = False

    # The same base instance is called twice: both branches share weights
    x1 = base_model(input_1)
    x2 = base_model(input_2)

    v1 = Flatten()(x1)
    v2 = Flatten()(x2)

    x = Concatenate(axis=-1)([v1, v2])
    x = Dense(128, activation="relu")(x)
    x = Dropout(0.3)(x)
    out = Dense(1, activation="sigmoid")(x)

    model = Model([input_1, input_2], out)

    model.compile(loss="binary_crossentropy", metrics=['acc'], optimizer=Adam(1e-5))

    model.summary()

    plot_model(model, "./model.png", show_shapes=True)

    return model

model = siamese_model()

Model: "model_1"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_4 (InputLayer)            [(None, 224, 224, 3) 0                                            
__________________________________________________________________________________________________
input_5 (InputLayer)            [(None, 224, 224, 3) 0                                            
__________________________________________________________________________________________________
vgg16 (Functional)              (None, 7, 7, 512)    14714688    input_4[0][0]                    
                                                                 input_5[0][0]                    
__________________________________________________________________________________________________
flatten_2 (Flatten)             (None, 25088)        0           vgg16[0][0]                

In [13]:
# Keep the weights of the epoch with the lowest training loss
callbacks = [
    tensorflow.keras.callbacks.ModelCheckpoint(
        "siamese.h5",
        monitor = 'loss',
        save_best_only=True,
    )]

# Model.fit accepts Sequence objects directly; fit_generator is deprecated
history = model.fit(train_gen,
                    steps_per_epoch=trainsteps_epoch,
                    validation_data=val_gen,
                    epochs=epochs,
                    verbose=1,
                    callbacks = callbacks
                    )



Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30

KeyboardInterrupt: ignored

In [None]:
# Per-epoch curves recorded during the first training run
curves = history.history
loss, val_loss = curves['loss'], curves['val_loss']
acc, val_acc = curves['acc'], curves['val_acc']
epoch = list(range(len(loss)))
print(epoch)

# Loss: validation curve first, then training curve
plt.figure()
plt.title("Siamese network, loss")
plt.plot(epoch, val_loss, label="validation loss")
plt.plot(epoch, loss, label="train loss")
plt.xlabel("Epoch")
plt.ylabel("loss")
plt.legend()
plt.savefig('./figure1')

# Accuracy: validation curve first, then training curve
plt.figure()
plt.title("Siamese network, accuracy")
plt.plot(epoch, val_acc, label="validation accuracy")
plt.plot(epoch, acc, label="train accuracy")
plt.xlabel("Epoch")
plt.ylabel("accuracy")
plt.legend()
plt.savefig('./figure2')

In [None]:
#model = models.load_model('siamese.h5')
# Probe fingerprint: resized to the network input size and scaled to [0, 1]
img_p = load_img('/content/fp_perpetrator.png', color_mode='rgb')
img_p_array = img_to_array(img_p)
# cv2.resize takes dsize as (width, height)
img_p_array = cv2.resize(img_p_array, (img_w, img_h))/255.
print(img_p_array.shape)
# Database fingerprint it is compared with (loaded without resizing)
img_q = load_img('/content/DB1_B/107_8.png', color_mode='rgb')
img_q_array = img_to_array(img_q)/255.
print(img_q_array.shape)
# Similarity score of the pair
print(model.predict([np.array([img_p_array]), np.array([img_q_array])], batch_size=1))
plt.figure(figsize=(8,4))
plt.subplot(121)
plt.imshow(img_p_array)
plt.subplot(122)
plt.imshow(img_q_array)

## Fine tuning

In [None]:
# Thaw VGG16's top block for fine tuning
# model.layers[2] is the shared VGG16 base (after the two input layers)
base_model = model.layers[2]
# The last layer of the base is left out of the slice; the three layers
# before it are made trainable
block5_layers = base_model.layers[-4:-1]
for layer in block5_layers:
    layer.trainable = True
    print(layer.name, " -> Trainable")

model.summary()

### Perform fine tuning

Fine-tune at a lower learning rate

In [None]:
# Recompile so the new trainable flags take effect. The learning rate is
# one order of magnitude below the 1e-5 used for the initial training, so
# that the thawed pretrained weights are only adjusted gently
model.compile(loss="binary_crossentropy", metrics=['acc'], optimizer=Adam(1e-6))

# Keep the weights of the epoch with the lowest training loss
callbacks = [
    tensorflow.keras.callbacks.ModelCheckpoint(
        "siamese_fine-tuned.h5",
        monitor = 'loss',
        save_best_only=True,
    )]

# Model.fit accepts Sequence objects directly; fit_generator is deprecated
history_fine_tuning = model.fit(train_gen,
                    steps_per_epoch=trainsteps_epoch,
                    validation_data=val_gen,
                    epochs=epochs,
                    verbose=1,
                    callbacks = callbacks
                    )

In [None]:
# Per-epoch curves recorded during fine-tuning
loss = history_fine_tuning.history['loss']
val_loss = history_fine_tuning.history['val_loss']
acc = history_fine_tuning.history['acc']
val_acc = history_fine_tuning.history['val_acc']
epoch = list(range(len(loss)))
print(epoch)

plt.figure()
plt.title("Fine-tuning siamese network, loss")
plt.plot(epoch, val_loss, label = "validation loss")
plt.plot(epoch, loss, label = "train loss")
plt.xlabel("Epoch")
plt.ylabel("loss")
plt.legend()
# Own file name: './figure1' already holds the loss plot of the first run
plt.savefig('./figure1_fine_tuning')

plt.figure()
plt.title("Fine-tuning siamese network, accuracy")
plt.plot(epoch, val_acc, label = "validation accuracy")
plt.plot(epoch, acc, label = "train accuracy")
plt.xlabel("Epoch")
plt.ylabel("accuracy")
plt.legend()
# Own file name: './figure2' already holds the accuracy plot of the first run
plt.savefig('./figure2_fine_tuning')

In [None]:
#model = models.load_model('siamese.h5')
# Probe fingerprint: resized to the network input size and scaled to [0, 1]
img_p = load_img('/content/fp_perpetrator.png', color_mode='rgb')
img_p_array = img_to_array(img_p)
# cv2.resize takes dsize as (width, height)
img_p_array = cv2.resize(img_p_array, (img_w, img_h))/255.
print(img_p_array.shape)
# Database fingerprint it is compared with (loaded without resizing)
img_q = load_img('/content/DB1_B/104_1.png', color_mode='rgb')
img_q_array = img_to_array(img_q)/255.
print(img_q_array.shape)
# Similarity score of the pair
print(model.predict([np.array([img_p_array]), np.array([img_q_array])], batch_size=1))
plt.figure(figsize=(8,4))
plt.subplot(121)
plt.imshow(img_p_array)
plt.subplot(122)
plt.imshow(img_q_array)

In [None]:
#model = models.load_model('siamese_fine-tuned.h5')

def similarity_prediction(img1, img2, model):
    """Return the score model predicts for the image pair (img1, img2).

    Each image is wrapped into a batch of one, both batches are passed to
    model.predict as a two-element list, and the first value of the first
    prediction row is returned.
    """
    pair = [np.array([image]) for image in (img1, img2)]
    scores = model.predict(pair)
    return scores[0][0]

# In-memory copy of the training images and their individual ids. The
# sizes come from the settings cell so this stays consistent with the
# resized images; no batches are drawn, so augmentation is switched off
data_gen = FPData(batch_size, (img_h, img_w), train_pathlist, augmentation=False)
images_db = data_gen.img_DB
labels_db = data_gen.labels_DB


In [None]:
# Probe fingerprint: resized to the network input size and scaled to [0, 1]
img_p = load_img('/content/fp_perpetrator.png', color_mode='rgb')
img_p_array = img_to_array(img_p)
# cv2.resize takes dsize as (width, height)
img_p_array = cv2.resize(img_p_array, (img_w, img_h))/255.

# Score the probe against every database image with a single predict call
# (one call per image is far slower): the probe is repeated so that row k
# of the first input is paired with database image k
probe_batch = np.repeat(img_p_array[np.newaxis], len(images_db), axis=0)
scores = model.predict([probe_batch, images_db], batch_size=batch_size)[:, 0]

# One [individual id, score] row per database image
data = [[str(label), score] for label, score in zip(labels_db, scores)]




In [None]:
# One row per database image: individual id ('label') and score ('d')
perpetrator_df = pd.DataFrame(data=data, columns=['label', 'd'])

In [None]:
# Preview of the first 17 scored images
perpetrator_df.iloc[:17]

In [None]:
# Function to plot mean scores and errors
def plot_mean_error(mean_series, error_series, figsize=(8,4)):
    """Plot the mean score of each individual with error bars.

    Parameters
    ----------
    mean_series : pandas.Series
        Mean score per individual; plotted in the order it is given.
    error_series : pandas.Series
        Error (e.g. standard deviation) per individual, used as yerr.
    figsize : tuple
        Figure size passed to plt.figure.
    """
    plt.figure(figsize=figsize)
    ax = mean_series.plot(yerr=error_series)
    ax.set_xlabel('Individual')
    ax.set_ylabel('Mean score')
    # One tick per plotted point, labelled in plotting order so that every
    # label stays under its own value whatever the order or length of
    # mean_series
    ax.set_xticks(range(len(mean_series)))
    ax.set_xticklabels(mean_series.index)
    
# Mean and standard deviation of the score per individual
scores_by_label = perpetrator_df.groupby('label').d
fp_mean = scores_by_label.mean().sort_values(ascending=False)
fp_std = scores_by_label.std()

# "mean+/-std" text summary, ordered by descending mean. It is printed
# explicitly because a notebook only displays the last expression of a
# cell, and re-ordered with reindex because sorting the formatted strings
# would order them alphabetically rather than numerically
fp_summary = fp_mean.combine(fp_std, lambda x,y: str(x)[:7]+"+/-"+str(y)[:7]).reindex(fp_mean.index)
print(fp_summary)

plot_mean_error(fp_mean, fp_std)