### Configuration
#### Load dependencies

In [24]:
import cv2
import os
import face_recognition
from PIL import Image
import random
import numpy as np
import matplotlib.pyplot as plt
import uuid

# import tf dependencies
import tensorflow as tf
from keras.models import Model
from keras.layers import Layer, Input, Conv2D, MaxPooling2D, GlobalAveragePooling2D, Flatten, BatchNormalization, Dropout, Dense, LeakyReLU
from keras.optimizers import Adam
from sklearn.metrics import classification_report



#### Config GPU

In [2]:
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
    print(gpu)

#### Define constants

In [3]:
POSITIVE_PATH = os.path.normpath(os.path.join('data', 'positive'))
NEGATIVE_PATH = os.path.normpath(os.path.join('data', 'negative'))
ANCHOR_PATH = os.path.normpath(os.path.join('data', 'anchor'))
PIXELS_SIZE = 100

#### Create directories

In [4]:
os.makedirs(POSITIVE_PATH, exist_ok=True)
os.makedirs(NEGATIVE_PATH, exist_ok=True)
os.makedirs(ANCHOR_PATH, exist_ok=True)


### Data collection

In [5]:
cap = cv2.VideoCapture(0)
face_locations = None
capture_mode = None

while cap.isOpened():
    ret, frame = cap.read()
    if face_locations is None:
        face_locations = face_recognition.face_locations(frame)
        t, r, b, l = face_locations[0]
        y_margin = 250 - abs(t-b)
        x_margin = 250 - abs(l-r)
     # get 250 x 250 image basing on initial face position
    frame = frame[t-y_margin//2: b+y_margin//2, l - x_margin//2:r+x_margin//2, :]

    key = cv2.waitKey(1) & 0XFF
    # Change mode
    if key == ord('a'):
        capture_mode = 'a'
    elif key == ord('p'):
        capture_mode = 'p'
    elif key == ord('q'):
        break

    # collect samples based on current mode
    if capture_mode == 'a':
        cv2.imwrite(os.path.join(ANCHOR_PATH, f'{uuid.uuid1()}.jpg'), frame)
    elif capture_mode == 'p':
        cv2.imwrite(os.path.join(POSITIVE_PATH, f'{uuid.uuid1()}.jpg'), frame)

    cv2.imshow('Image collection', frame)

cap.release()
cv2.destroyAllWindows()

### Image preprocessing

In [6]:
anchor = tf.data.Dataset.list_files(os.path.join(ANCHOR_PATH, '*.jpg'))
positive = tf.data.Dataset.list_files(os.path.join(POSITIVE_PATH, '*.jpg'))
negative = tf.data.Dataset.list_files(os.path.join(NEGATIVE_PATH, '*.jpg'))

#### Define functions

In [7]:
def preprocess(path_to_file):
    """Function that read file from file, resize and rescale numpy array"""
    img = tf.io.decode_jpeg(tf.io.read_file(path_to_file))
    return tf.image.resize(img, (PIXELS_SIZE, PIXELS_SIZE)) / 255

def preprocess_twin(input_img, val_img, label):
    return preprocess(input_img), preprocess(val_img), label

#### Define dataset

In [8]:
positives = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
data = positives.concatenate(negatives)

In [9]:
data = data.map(preprocess_twin)
data = data.cache()
data.shuffle(buffer_size=1024)

<_ShuffleDataset element_spec=(TensorSpec(shape=(100, 100, None), dtype=tf.float32, name=None), TensorSpec(shape=(100, 100, None), dtype=tf.float32, name=None), TensorSpec(shape=(), dtype=tf.float32, name=None))>

In [10]:
TRAIN_RATIO = 0.7
TEST_RATIO = 1 - TRAIN_RATIO

In [11]:
#### Split dataset

In [12]:
train_data = data.take(round(len(data)*.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)

test_data = data.skip(round(len(data)*TRAIN_RATIO))
test_data = test_data.take(round(len(data)*TEST_RATIO))
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)

### Define the model
#### Create embedding layers

In [13]:
def make_embedding():
    inp = Input(shape=(PIXELS_SIZE,PIXELS_SIZE,3), name='input_image')
    leaky_relu = LeakyReLU()

    # First block
    c1 = Conv2D(64, (10,10))(inp)
    c1 = leaky_relu(c1)
    c1 = BatchNormalization()(c1)
    m1 = MaxPooling2D((2,2), padding='same')(c1)

    # Second block
    c2 = Conv2D(128, (7,7))(m1)
    c2 = leaky_relu(c2)
    c2 = BatchNormalization()(c2)
    m2 = MaxPooling2D((2,2), padding='same')(c2)

    # Third block
    c3 = Conv2D(128, (4,4))(m2)
    c3 = leaky_relu(c3)
    c3 = BatchNormalization()(c3)
    m3 = MaxPooling2D((2,2), padding='same')(c3)

    # Final embedding block
    c4 = Conv2D(256, (4,4))(m3)
    c4 = leaky_relu(c4)
    c4 = BatchNormalization()(c4)


    f1 = Flatten()(c4)
    d1 = Dense(1024, activation='relu')(f1)
    d1 = Dropout(0.5)(d1)
    d2 = Dense(4096, activation='sigmoid')(d1)

    return Model(inputs=[inp], outputs=[d2], name='embedding')

In [14]:
embedding = make_embedding()
embedding.summary()

Model: "embedding"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_image (InputLayer)       [(None, 100, 100, 3  0           []                               
                                )]                                                                
                                                                                                  
 conv2d (Conv2D)                (None, 91, 91, 64)   19264       ['input_image[0][0]']            
                                                                                                  
 leaky_re_lu (LeakyReLU)        multiple             0           ['conv2d[0][0]',                 
                                                                  'conv2d_1[0][0]',               
                                                                  'conv2d_2[0][0]',       

#### A distance layer - Siamese Distance class

In [15]:
class L1Dist(Layer):
    def __init__(self, **kwargs):
        super().__init__()

    def call(self, input_emb, val_emb):
        return tf.math.abs(input_emb-val_emb)

#### Create Siamese model

In [16]:
def make_siamese_model(num_classes=None):
    """
    Create a siamese network model.
    This network will have two inputs (anchor, validation),
    each one goes through the same embedding model and the output embeddings are compared.
    """
    # Anchor image input in the network
    input_img = Input(shape=(PIXELS_SIZE,PIXELS_SIZE,3), name='anchor_input')

    # Validation image in the network
    val_img = Input(shape=(PIXELS_SIZE,PIXELS_SIZE,3), name='validation_input')
    # Combine siamese distance components
    siam_layer =  L1Dist(name='distance')
    distances =siam_layer(embedding(input_img), embedding(val_img))

    if num_classes == None:
        classifier = Dense(1, activation='sigmoid')(distances)
    else:
        output = Dense(num_classes, activation='softmax')(distances)


    return Model(inputs=[input_img, val_img], outputs=classifier, name='SiameseNetwork')


In [17]:
siamese_model = make_siamese_model()

### Training

In [18]:
bce_loss = tf.losses.BinaryCrossentropy()
opt = Adam(0.0001)
checkpt_pref = os.path.normpath(os.path.join('checkpoint', 'checkpt'))
checkpt = tf.train.Checkpoint(opt=opt, siamese_model=siamese_model)

In [19]:
@tf.function
def train_step(batch):
    with tf.GradientTape() as tape:
        # get anchor & pos/neg image and label
        X = batch[:2]
        y = batch[2]

        y_pred= siamese_model(X, training=True) #
        loss = bce_loss(y, y_pred) # get loss
    # get gradients
    grad = tape.gradient(loss, siamese_model.trainable_variables)
    # update weights
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))

    return loss

#### Train step function

In [20]:
def train(train_data, EPOCHS=50):
    for epoch in range(EPOCHS):
        print(f'Epoch {epoch+1}')
        progbar = tf.keras.utils.Progbar(len(train_data))

        for idx, batch in enumerate(train_data):
            train_step(batch)
            progbar.update(idx+1)

        if not epoch % 10:
            checkpt.save(file_prefix=checkpt_pref)

### Train

In [26]:
train(train_data=train_data)

In [None]:
test_inp, test_val, y_true = test_data.as_numpy_iterator().next()

In [None]:
#### Save model

In [None]:
siamese_model.save(os.path.join('models', 'siamesemodel.h5'))
siamese_model = tf.keras.models.load_model('siamesemodelv2.h5', 
                                   custom_objects={'L1Dist':L1Dist, 'BinaryCrossentropy':tf.losses.BinaryCrossentropy})