In [None]:
import os
import warnings
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' # suppress Tensorflow verbose prints
warnings.simplefilter(action='ignore', category=FutureWarning)

%load_ext autoreload
%autoreload 2

from form_train_data import load_data
import matplotlib.pyplot as plt
from scipy import spatial
from pathlib import Path
import tensorflow as tf
from tqdm import tqdm
import pandas as pd
import numpy as np
import itertools
import random
import math
import cv2

data_path = Path.cwd().parent / 'datasets'
train_dataset_path = data_path / 'CASIA-WebFace-112x96'
pairs_count = 1200

In [None]:
print("CASIA pairs for training")
print("LFW pairs for testing")
casia_pairs, lfw_pairs = load_data(data_path)

print("shuffle casia list")
np.random.shuffle(casia_pairs)

casia_pairs_short = []
lfw_pairs_short = []

i_0 = 0 # number of pairs with 'flag' == 0
i_1 = 0
for i, pair in enumerate(casia_pairs):
    if pair['flag'] == 0 and i_0 < (pairs_count // 2):
        casia_pairs_short.append(pair)
        i_0 += 1
    elif pair['flag'] == 1 and i_1 < (pairs_count // 2):
        casia_pairs_short.append(pair)
        i_1 += 1
    if (i_0 + i_1) >= pairs_count:
        break

i_0 = 0
i_1 = 0
for i, pair in enumerate(lfw_pairs):
    if pair['flag'] == 0 and i_0 < (pairs_count // 2):
        lfw_pairs_short.append(pair)
        i_0 += 1
    elif pair['flag'] == 1 and i_1 < (pairs_count // 2):
        lfw_pairs_short.append(pair)
        i_1 += 1
    if (i_0 + i_1) >= pairs_count:
        break
        
# convert to pandas dataframe
test_dataframe_short = pd.DataFrame(lfw_pairs_short)
train_dataframe_short = pd.DataFrame(casia_pairs_short) # takes long time

train_dataframe_short['flag'] = train_dataframe_short['flag'].astype(str)

print("short test dataframe")
print(test_dataframe_short)
print()
print("short train dataframe")
print(train_dataframe_short)

In [None]:
# https://machinelearningmastery.com/how-to-load-large-datasets-from-directories-for-deep-learning-with-keras/
# https://medium.com/@vijayabhaskar96/tutorial-on-keras-flow-from-dataframe-1fd4493d237c
# https://stackoverflow.com/questions/49404993/keras-how-to-use-fit-generator-with-multiple-inputs

def preprocess_image(image):
    image = (image - 127.5) / 128
    return image


def train_generator(train_dataframe, batch_size_):
    class_mode_ = "binary"
    generator = ImageDataGenerator(preprocessing_function=preprocess_image,
                                   validation_split=0.01)
    
    train_generator_X1 = generator.flow_from_dataframe(
                             dataframe=train_dataframe,
                             directory=str(train_dataset_path) + "/",
                             x_col="fileL",
                             y_col="flag",
                             subset="training",
                             batch_size=batch_size_,
                             seed=42,
                             shuffle=True,
                             class_mode=class_mode_,
                             color_mode='rgb',
                             target_size=(112, 96))
    
    train_generator_X2 = generator.flow_from_dataframe(
                             dataframe=train_dataframe,
                             directory=str(train_dataset_path) + "/",
                             x_col="fileR",
                             y_col="flag",
                             subset="training",
                             batch_size=batch_size_,
                             seed=42,
                             shuffle=True,
                             class_mode=class_mode_,
                             color_mode='rgb',
                             target_size=(112, 96))
    while True:
        X1i = train_generator_X1.next()
        X2i = train_generator_X2.next()
        yield [X1i[0], X2i[0]], X1i[1]

In [None]:
from tensorflow.keras.layers import Conv2D, Add, Activation, PReLU, Dense, Input, ZeroPadding2D, Lambda, GlobalAveragePooling2D, Dot 
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.initializers import TruncatedNormal
from tensorflow.keras.losses import CosineSimilarity
from tensorflow.keras.optimizers import RMSprop
# from keras.engine.topology import Layer
from tensorflow.keras import backend as K
from tensorflow.keras.models import Model
from tensorflow import keras

def conv_3_block(input, filters):
    x = ZeroPadding2D(padding=(1, 1))(input)
    x = Conv2D(filters, 3, strides=2, padding='valid', kernel_initializer='glorot_uniform')(x)
    r1 = PReLU()(x)

    x = ZeroPadding2D(padding=(1, 1))(r1)
    x = Conv2D(filters, 3, strides=1, padding='valid', kernel_initializer=TruncatedNormal(stddev=0.01))(x)
    r2 = PReLU()(x)

    x = ZeroPadding2D(padding=(1, 1))(r2)
    x = Conv2D(filters, 3, strides=1, padding='valid', kernel_initializer=TruncatedNormal(stddev=0.01))(x)
    r3 = PReLU()(x)

    x = Add()([r1, r3])
    return x 

def conv_2_block(input, filters):
    x = ZeroPadding2D(padding=(1, 1))(input)
    x = Conv2D(filters, 3, strides=1, padding='valid', kernel_initializer=TruncatedNormal(stddev=0.01))(x)
    x = PReLU()(x)

    x = ZeroPadding2D(padding=(1, 1))(x)
    x = Conv2D(filters, 3, strides=1, padding='valid', kernel_initializer=TruncatedNormal(stddev=0.01))(x)
    x = PReLU()(x)

    x = Add()([input, x])
    return x

def sphereface20(input_shape):
    input = Input(shape=input_shape)
    x = conv_3_block(input, 64)
    x = conv_3_block(x, 128)
    x = conv_2_block(x, 128)
    x = conv_3_block(x, 256)
    x = conv_2_block(x, 256)
    x = conv_2_block(x, 256)
    x = conv_2_block(x, 256)
    x = conv_3_block(x, 512)
    
    x = GlobalAveragePooling2D()(x)
    x = Dense(512, kernel_initializer='glorot_uniform')(x)
    
    model = Model(input, x)
    return model

# https://keras.io/examples/mnist_siamese/
def euclidean_distance(vects):
    x, y = vects
    sum_square = K.sum(K.square(x - y), axis=1, keepdims=True)
    return K.sqrt(K.maximum(sum_square, K.epsilon()))

def eucl_dist_output_shape(shapes):
    shape1, shape2 = shapes
    return (shape1[0], 1)

# https://stackoverflow.com/questions/51003027/computing-cosine-similarity-between-two-tensors-in-keras
def cosine_distance(vests):
    x, y = vests
    x = K.l2_normalize(x, axis=-1)
    y = K.l2_normalize(y, axis=-1)
    return -K.mean(x * y, axis=-1, keepdims=True)

def cos_dist_output_shape(shapes):
    shape1, shape2 = shapes
    return (shape1[0],1)


# network definition
input_shape = (112, 96, 3)
base_network = sphereface20(input_shape)

input_a = Input(shape=input_shape)
input_b = Input(shape=input_shape)

# because we re-use the same instance `base_network`,
# the weights of the network
# will be shared across the two branches
processed_a = base_network(input_a)
processed_b = base_network(input_b)
distance = Lambda(euclidean_distance, output_shape=eucl_dist_output_shape)([processed_a, processed_b])

model = Model([input_a, input_b], distance)

# serialize model to JSON
model_path = str(Path.cwd().parent / 'models' / 'sphereface_20_keras' / 'sphereface_20.json')
model_json = base_network.to_json()
with open(model_path, "w") as json_file:
    json_file.write(model_json)

In [None]:
def accuracy(y_true, y_pred):
    return K.mean(K.equal(y_true, K.cast(y_pred < 0.5, y_true.dtype)))

def contrastive_loss(y_true, y_pred):
    '''Contrastive loss from Hadsell-et-al.'06
    http://yann.lecun.com/exdb/publis/pdf/hadsell-chopra-lecun-06.pdf
    '''
    margin = 1
    square_pred = K.square(y_pred)
    margin_square = K.square(K.maximum(margin - y_pred, 0))
    return K.mean(y_true * square_pred + (1 - y_true) * margin_square)

def custom_loss(yTrue,yPred):
    return K.sum(K.log(yTrue) - K.log(yPred))

# def cosine_similarity(y_true, y_pred):
#     y = tf.constant([c1,c2])
#     x = K.l2_normalize(y_true, -1)
#     y = K.l2_normalize(y_pred, -1)
#     s = K.mean(x * y, axis=-1, keepdims=False) * 10
#     return s

lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    0.0001,
    decay_steps=10000,
    decay_rate=0.96,
    staircase=True)
# optimizer_ = keras.optimizers.SGD(learning_rate=lr_schedule)

# rms_ = RMSprop()
optimizer_ = RMSprop(learning_rate=0.001, rho=0.9, epsilon=1e-07)
# loss_ = custom_loss
# loss_ = CosineSimilarity(axis=-1, name='cosine_similarity')

model.compile(loss=contrastive_loss, optimizer=optimizer_, metrics=[accuracy])
# model.compile(loss=loss_, optimizer=rms_, metrics=['accuracy'])

In [None]:
batch_size_ = 32
epochs_ = 1

# https://github.com/keras-team/keras/issues/10855
hist = model.fit(train_generator(train_dataframe_short, batch_size_), epochs=epochs_, shuffle=True)