# Load Files
Loading:
*   pairsDevTest.txt
*   pairsDevTrain.txt
*   lfwa.zip



In [None]:
!unzip 'lfwa.zip'

# Imports

In [17]:
import cv2
import numpy as np
from tensorflow.keras.layers import Dense, Conv2D,Lambda, Flatten, MaxPooling2D, BatchNormalization, Activation
from tensorflow.keras.models import Sequential,Model
from tensorflow.keras import initializers
from tensorflow.keras import Input
from tensorflow.keras.regularizers import l2
import tensorflow.keras.backend as K
from tensorflow.keras.optimizers.schedules import ExponentialDecay
from tensorflow.keras.optimizers import Adam, SGD, Nadam
import matplotlib.pyplot as plt
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.callbacks import TensorBoard


import gc
import time
import cv2
import os

import google.colab.patches as colab_cv2


# Data Loader

In [4]:
def _assemble_split(equal_pairs, not_equal_pairs):
    """
      Description: merges equal and not-equal pairs into one shuffled split

      Input:
        equal_pairs - list of [first_image, second_image] pairs labelled 1
        not_equal_pairs - list of [first_image, second_image] pairs labelled 0

      Output:
        inputs - two-element list of image arrays, one array per network input
        labels - numpy array holding 1 for equal pairs and 0 for not-equal pairs

    """
    labels = np.concatenate(
        (np.ones(len(equal_pairs)), np.zeros(len(not_equal_pairs))), axis=0)
    pairs = np.concatenate((equal_pairs, not_equal_pairs), axis=0)

    # A single permutation is applied to both arrays so each pair keeps its label.
    order = np.random.permutation(labels.shape[0])
    pairs = pairs[order]
    labels = labels[order]

    # Axis 1 is the position inside the pair. The second image is listed first,
    # exactly as the original code did, so downstream indexing is unchanged.
    inputs = [pairs[:, 1], pairs[:, 0]]
    return inputs, labels


def load_data(train_path, test_path, image_size, image_path):
    """
      Description: loads the data, splits it to labels and features

      Input:
        train_path - the path to the train dataset
        test_path - the path to the test dataset
        image_size - the image size
        image_path - path to image folders

      Output:
        train, test - two-element lists of numpy image arrays (one array per
                      network input), shuffled
        train_y, test_y - numpy arrays of labels (1 = equal pair, 0 = not equal),
                          shuffled with the same order as the images

    """
    train_equal, train_not_equal = load_files(train_path, image_path, image_size)
    test_equal, test_not_equal = load_files(test_path, image_path, image_size)

    # Train is assembled (and shuffled) before test, as before.
    train, train_y = _assemble_split(train_equal, train_not_equal)
    test, test_y = _assemble_split(test_equal, test_not_equal)

    return train, test, train_y, test_y

def _read_grayscale_image(path, image_size):
    """
      Description: reads one image as grayscale and resizes it to a square

      Input:
        path - path of the image file
        image_size - the image size (used for both width and height)

      Output:
        numpy array of shape (image_size, image_size, 1)

    """
    image = cv2.imread(path, 0)
    # cv2.imread signals an unreadable/missing file by returning None instead of
    # raising, which would otherwise surface as an opaque error in cv2.resize.
    if image is None:
        raise FileNotFoundError("Could not read image: " + path)
    image = cv2.resize(image, (image_size, image_size))
    return np.array(image).reshape(image_size, image_size, 1)


def load_images(sample, equal, image_path, image_size):
    """
      Description: loads pairs of images (equal or not)

      Input:
        sample - tab separated string describing the pair to load:
                 "name<TAB>idx1<TAB>idx2" for an equal pair,
                 "name1<TAB>idx1<TAB>name2<TAB>idx2" for a non equal pair
        equal - indication on whether the pair of persons in the sample are equal or not
        image_path - path to image folders (used as a plain string prefix)
        image_size - the image size

      Output:
        [first_image, second_image] - the images of the first and second person

      Raises:
        ValueError - the sample has too few fields or a non numeric image index
        FileNotFoundError - one of the image files could not be read

    """
    fields = sample.rstrip("\r\n").split("\t")
    required_fields = 3 if equal else 4
    if len(fields) < required_fields:
        raise ValueError(
            "Malformed pair line (expected at least %d tab separated fields): %r"
            % (required_fields, sample))

    first_person_name = fields[0]
    first_image_name = str(int(fields[1])).zfill(4)

    if equal:
        second_person_name = first_person_name
        second_image_name = str(int(fields[2])).zfill(4)
    else:
        second_person_name = fields[2]
        second_image_name = str(int(fields[3])).zfill(4)

    # Paths are built by plain concatenation, so image_path is expected to end
    # with a path separator (this matches the original behavior).
    first_image_path = image_path + first_person_name + "/" + first_person_name + "_" + first_image_name + ".jpg"
    second_image_path = image_path + second_person_name + "/" + second_person_name + "_" + second_image_name + ".jpg"

    first_image = _read_grayscale_image(first_image_path, image_size)
    second_image = _read_grayscale_image(second_image_path, image_size)

    return [first_image, second_image]


def load_files(file_path, image_path, image_size):
    """
      Description: splits the text file into equal and non equal lists of pairs

      The first line of the file holds the number of pairs per class. It is
      followed by that many equal-pair lines and then that many non-equal-pair
      lines.

      Input:
        file_path - path to the samples data (train or test)
        image_path - path to image folders
        image_size - the image size

      Output:
        data_equals - list of all equal pairs
        data_not_equals - list of all non equal pairs

      Raises:
        ValueError - the header is not an integer or the file ends before all
                     the announced pairs were read

    """
    with open(file_path) as fp:
        pairs_per_class = int(fp.readline())

        def read_pairs(equal):
            """Reads the next pairs_per_class lines as pairs of the given kind."""
            pairs = []
            for _ in range(pairs_per_class):
                line = fp.readline()
                # readline() returns "" only at end of file: the file is shorter
                # than its header claims.
                if not line:
                    raise ValueError(
                        "Unexpected end of file in %s: header announces %d pairs per class"
                        % (file_path, pairs_per_class))
                pairs.append(load_images(line, equal, image_path, image_size))
            return pairs

        data_equals = read_pairs(True)
        data_not_equals = read_pairs(False)

    return data_equals, data_not_equals


# Siamese model (according to the architecture in the paper)

In [9]:
def create_model_with_batchnorm(image_shape, optimizer):
    """
      Description: builds and compiles the Siamese network variant that places a
                   batch normalization layer between every convolution and its
                   relu activation

      Input:
        image_shape - the images shape
        optimizer - the optimizer to use in the training

      Output:
        siamese_net - the compiled network model

    """
    kernel_init = initializers.RandomNormal(mean=0.0, stddev=0.01)
    bias_init = initializers.RandomNormal(mean=0.5, stddev=0.01)
    left_input = Input(image_shape)
    right_input = Input(image_shape)

    # (filters, kernel size, extra Conv2D arguments, followed by max pooling?)
    # The first convolution is the only one without a custom bias initializer.
    conv_specs = [
        (64, (10, 10), {}, True),
        (128, (7, 7), {"bias_initializer": bias_init}, True),
        (128, (4, 4), {"bias_initializer": bias_init}, True),
        (256, (4, 4), {"bias_initializer": bias_init}, False),
    ]

    # Shared encoder: both inputs go through the very same weights.
    encoder = Sequential()
    for filters, kernel_size, extra_args, has_pooling in conv_specs:
        encoder.add(Conv2D(filters, kernel_size, input_shape=image_shape,
                           kernel_initializer=kernel_init,
                           kernel_regularizer=l2(2e-4), **extra_args))
        encoder.add(BatchNormalization())
        encoder.add(Activation("relu"))
        if has_pooling:
            encoder.add(MaxPooling2D(pool_size=(2, 2)))

    # fully connected embedding layer
    encoder.add(Flatten())
    encoder.add(Dense(4096, activation='sigmoid',
                      kernel_regularizer=l2(2e-4),
                      bias_initializer=bias_init,
                      kernel_initializer=kernel_init))

    left_embedding = encoder(left_input)
    right_embedding = encoder(right_input)

    # Element-wise absolute difference of the two embeddings, scored by a
    # single sigmoid unit.
    abs_difference = Lambda(lambda tensors: K.abs(tensors[0] - tensors[1]))
    distance = abs_difference([left_embedding, right_embedding])
    similarity = Dense(1, activation='sigmoid', bias_initializer=bias_init,
                       kernel_initializer=kernel_init)(distance)

    siamese_net = Model(inputs=[left_input, right_input], outputs=similarity)
    siamese_net.compile(loss='binary_crossentropy',
                        metrics=['binary_accuracy'],
                        optimizer=optimizer)
    return siamese_net

In [10]:
def create_model(image_shape, optimizer):
    """
      Description: builds and compiles the Siamese network according to the
                   architecture in the paper (relu convolutions, no batchnorm)

      Input:
        image_shape - the images shape
        optimizer - the optimizer to use in the training

      Output:
        siamese_net - the compiled network model

    """
    kernel_init = initializers.RandomNormal(mean=0.0, stddev=0.01)
    bias_init = initializers.RandomNormal(mean=0.5, stddev=0.01)
    left_input = Input(image_shape)
    right_input = Input(image_shape)

    # (filters, kernel size, extra Conv2D arguments, followed by max pooling?)
    # The first convolution is the only one without a custom bias initializer.
    conv_specs = [
        (64, (10, 10), {}, True),
        (128, (7, 7), {"bias_initializer": bias_init}, True),
        (128, (4, 4), {"bias_initializer": bias_init}, True),
        (256, (4, 4), {"bias_initializer": bias_init}, False),
    ]

    # Shared encoder: both inputs go through the very same weights.
    encoder = Sequential()
    for filters, kernel_size, extra_args, has_pooling in conv_specs:
        encoder.add(Conv2D(filters, kernel_size, activation='relu',
                           input_shape=image_shape,
                           kernel_initializer=kernel_init,
                           kernel_regularizer=l2(2e-4), **extra_args))
        if has_pooling:
            encoder.add(MaxPooling2D(pool_size=(2, 2)))

    # fully connected embedding layer
    encoder.add(Flatten())
    encoder.add(Dense(4096, activation='sigmoid', kernel_regularizer=l2(2e-4),
                      bias_initializer=bias_init,
                      kernel_initializer=kernel_init))

    left_embedding = encoder(left_input)
    right_embedding = encoder(right_input)

    # Element-wise absolute difference of the two embeddings, scored by a
    # single sigmoid unit.
    abs_difference = Lambda(lambda tensors: K.abs(tensors[0] - tensors[1]))
    distance = abs_difference([left_embedding, right_embedding])
    similarity = Dense(1, activation='sigmoid', bias_initializer=bias_init,
                       kernel_initializer=kernel_init)(distance)

    siamese_net = Model(inputs=[left_input, right_input], outputs=similarity)
    siamese_net.compile(loss='binary_crossentropy',
                        metrics=['binary_accuracy'],
                        optimizer=optimizer)
    return siamese_net

In [27]:
import datetime

def train_model(train, test, y_train, y_test, image_shape, learning_rate, optimizer_name, max_epochs, batch_size, use_batchnorm):
    """
      Description: training the Siamese network

      Input:
        train - train dataset
        test -  test dataset
        y_train -  train labels
        y_test - test labels
        image_shape - the images shape
        learning_rate - the learning rate
        optimizer_name - the optimizer to use for the model training
        max_epochs -  the maximum amount of epochs
        batch_size - the batch size for the input in the network
        use_batchnorm - boolean variable, if the CNN network will contain batch normalization layers
      Output:
        history - dict with the per-epoch training history, extended with the
                  keys "test_binary_accuracy", "test_loss" and "time" (seconds)
        model - the trained model

    """
    start = time.time()
    optimizer = get_optimizer(optimizer_name, learning_rate)

    # Build only the requested variant. Previously the plain model was always
    # built and compiled first, then discarded when use_batchnorm was set.
    build_model = create_model_with_batchnorm if use_batchnorm else create_model
    model = build_model(image_shape, optimizer)

    # Define Tensorboard as a Keras callback; every run logs to its own
    # timestamped sub-directory of "logs".
    logdir = os.path.join("logs", datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))
    tensorboard = TensorBoard(
        log_dir=logdir,
        histogram_freq=1)
    callbacks = [
        tensorboard,
        # patience=0: training stops at the first epoch whose validation loss
        # does not improve.
        EarlyStopping(monitor='val_loss',
                      min_delta=0,
                      patience=0,
                      verbose=0, mode='auto'),
    ]

    # 10% of the training data is held out for validation.
    fit_result = model.fit(train, y_train, epochs=max_epochs, batch_size=batch_size,
                           validation_split=0.1, verbose=1, callbacks=callbacks)
    loss, acc = model.evaluate(test, y_test, verbose=0)

    history = fit_result.history
    history["test_binary_accuracy"] = acc
    history["test_loss"] = loss

    # Elapsed time covers model construction, training and test evaluation.
    end = time.time()
    print(end - start)
    history["time"] = end - start

    return history, model

# Utils

In [28]:
def get_optimizer(optimizer_name, learning_rate):
    """
      Description: this is a helper function that returns the optimizer object

      Input:
        optimizer_name - the optimizer name: 'adam' or 'nadam'; any other value
                         falls back to SGD with momentum 0.9
        learning_rate - the learning rate for the optimizer init

      Output:
        The optimizer object
    """
    # `learning_rate` is the supported keyword; `lr` is a deprecated alias that
    # newer Keras releases reject.
    if optimizer_name == 'adam':
        return Adam(learning_rate=learning_rate)
    elif optimizer_name == "nadam":
        return Nadam(learning_rate=learning_rate)
    else:
        return SGD(learning_rate=learning_rate, momentum=0.9)


def create_accuracy_vs_epoch_plot(learning_rates_history, learning_rates):
    """
      Description: plots the graphs of the accuracy vs epochs values, one figure
                   per learning rate and one subplot per optimizer, and saves
                   each figure to 'accuracy_vs_epoch_plot_lr_<learning rate>.png'

      Input:
        learning_rates_history - mapping learning rate -> optimizer name ->
                                 {'history': training history}; the history must
                                 hold 'binary_accuracy' and 'val_binary_accuracy'
        learning_rates - the list of examined learning rates

      Output:
        the plots
    """
    for learning_rate in learning_rates:
        optimizer_history = learning_rates_history[learning_rate]

        # At least two columns (the original layout), more when more optimizers
        # were trained. The old fixed 2-axes grid raised IndexError on a third.
        n_columns = max(2, len(optimizer_history))
        fig, axs = plt.subplots(1, n_columns, sharex=False, sharey=False, figsize=(20, 5))

        for ax, (optimizer, record) in zip(axs, optimizer_history.items()):
            history = record['history']
            ax.plot(history['binary_accuracy'])
            ax.plot(history['val_binary_accuracy'])
            ax.set_title(optimizer)
            ax.legend(['train', 'validation'], loc='upper right')
            ax.set(ylabel='accuracy', xlabel='epoch')

        title = 'accuracy vs epoch, learning rate =' + str(learning_rate)
        fig.suptitle(title, fontsize=16)
        plt.savefig(f'accuracy_vs_epoch_plot_lr_{learning_rate}.png')


def create_loss_vs_epoch_plot(learning_rates_history, learning_rates):
    """
      Description: plots the graphs of the loss vs epochs values, one figure per
                   learning rate and one subplot per optimizer, and saves each
                   figure to 'loss_vs_epoch_plot_lr_<learning rate>.png'

      Input:
        learning_rates_history - mapping learning rate -> optimizer name ->
                                 {'history': training history}; the history must
                                 hold 'loss' and 'val_loss'
        learning_rates - the list of examined learning rates

      Output:
        the plots
    """
    for learning_rate in learning_rates:
        optimizer_history = learning_rates_history[learning_rate]

        # At least two columns (the original layout), more when more optimizers
        # were trained. The old fixed 2-axes grid raised IndexError on a third.
        n_columns = max(2, len(optimizer_history))
        fig, axs = plt.subplots(1, n_columns, sharex=False, sharey=False, figsize=(20, 5))

        for ax, (optimizer, record) in zip(axs, optimizer_history.items()):
            history = record['history']
            ax.plot(history['loss'])
            ax.plot(history['val_loss'])
            ax.set_title(optimizer)
            ax.legend(['train', 'validation'], loc='upper right')
            ax.set(ylabel='loss', xlabel='epoch')

        title = 'loss vs epoch, learning rate =' + str(learning_rate)
        fig.suptitle(title, fontsize=16)
        plt.savefig(f'loss_vs_epoch_plot_lr_{learning_rate}.png')

def print_results_summary(learning_rates_history, learning_rates):
    """
      Description: prints one comma separated summary line per trained model

      Input:
        learning_rates_history - mapping learning rate -> optimizer name ->
                                 {'history': history of the model results along
                                 the training process}
        learning_rates - the list of examined learning rates

      Output:
        a summary of the results (printed, nothing is returned)
    """
    for learning_rate in learning_rates:
        for optimizer, record in learning_rates_history[learning_rate].items():
            history = record['history']

            # (label, value) pairs in the order they appear on the line; the
            # per-epoch series report their last recorded value.
            fields = [
                ('learning rate: ', learning_rate),
                ("optimizer: ", str(optimizer).upper()),
                ('Epochs to early stop: ', len(history['loss'])),
                ('time: ', history['time']),
                ('train_loss: ', history['loss'][-1]),
                ('train_acc: ', history['binary_accuracy'][-1]),
                ('val_loss: ', history['val_loss'][-1]),
                ('val_acc: ', history['val_binary_accuracy'][-1]),
                ('test_loss: ', history['test_loss']),
                ('test_acc: ', history['test_binary_accuracy']),
            ]

            print(",".join(label + str(value) for label, value in fields))

def create_classifications_examples(image_shape, test, y_test, model):
    """
      Description: prints how many test pairs fall in each of the four
                   prediction/label combinations and shows one example pair
                   (the middle one) for every non empty combination

      Input:
        image_shape - the images size (not used by this function)
        test - the test dataset, a two-element list of image arrays
        y_test -  the testset labels
        model - the model in which the function will search for the examples

      Output:
        images of misclassifications and right classifications
    """
    # Threshold the model scores at 0.5 into hard 0/1 predictions.
    predictions = model.predict(test)
    predictions[predictions < 0.5] = 0
    predictions[predictions >= 0.5] = 1

    missed_equals = []       # predicted 0, label 1
    false_equals = []        # predicted 1, label 0
    correct_equals = []      # predicted 1, label 1
    correct_not_equals = []  # predicted 0, label 0

    for index, row in enumerate(predictions):
        predicted = row[0]
        actual = y_test[index]

        if predicted < actual:
            missed_equals.append(index)
        elif predicted > actual:
            false_equals.append(index)
        elif predicted == 1:
            correct_equals.append(index)
        elif predicted == 0:
            correct_not_equals.append(index)

    # Counts are reported in this fixed order.
    count_report = [
        ("Predicted not equals but equals", missed_equals),
        ("Predicted equals but not equals", false_equals),
        ("Predicted equals and equals", correct_equals),
        ("Predicted not equals and not equals", correct_not_equals),
    ]
    for caption, bucket in count_report:
        print(caption)
        print(len(bucket))

    # Examples are shown in this (different) fixed order.
    example_report = [
        ("Misclassification - predicted not equals but equals", missed_equals),
        ("Misclassification - predicted equals but not equals", false_equals),
        ("Right classification - predicted not equals and not equals", correct_not_equals),
        ("Right classification - predicted equals and equals", correct_equals),
    ]
    for caption, bucket in example_report:
        if not bucket:
            continue
        print(caption)
        middle = bucket[len(bucket) // 2]
        colab_cv2.cv2_imshow(test[0][middle])
        colab_cv2.cv2_imshow(test[1][middle])



# Main

In [None]:
# Experiment driver: trains one model per (learning rate, optimizer) combination
# and keeps every training history and trained model for the report cells below.
from collections import defaultdict
# Both containers are indexed as [learning_rate][optimizer_name][key].
all_models_histories= defaultdict(lambda: defaultdict(dict))
all_models = defaultdict(lambda: defaultdict(dict))
learning_rates = [0.0005, 5e-05, 5e-06]
optimizers = ["nadam", "adam"]

# Images are loaded as image_size x image_size grayscale (single channel).
image_size = 105
shape = (image_size, image_size, 1)
epochs_size = 100  # upper bound on epochs; train_model also uses early stopping
batch_s = 100
# NOTE(review): `dropout` is not referenced anywhere in the visible code.
dropout = 0.3

test_path="pairsDevTest.txt" 
train_path="pairsDevTrain.txt" 
# load_images concatenates this prefix directly, so the trailing "/" is required.
image_path="lfw2/lfw2/"
# NOTE(review): `verbose` is not referenced anywhere in the visible code.
verbose = 0
use_batchnorm = True

train, test, y_train, y_test = load_data(train_path, test_path, image_size, image_path)

# Full grid: every learning rate is trained with every optimizer.
for learning_rate in learning_rates:
  for optimizer in optimizers:
    history, model = train_model(train, test, y_train, y_test, shape, learning_rate, optimizer, epochs_size, batch_s, use_batchnorm)
    all_models_histories[learning_rate][optimizer]['history'] = history
    all_models[learning_rate][optimizer]['model'] = model


In [None]:
%load_ext tensorboard

In [None]:
%tensorboard --logdir logs

# Report and visualizations

In [None]:
print_results_summary(all_models_histories, learning_rates)

In [None]:
create_loss_vs_epoch_plot(all_models_histories, learning_rates)

In [None]:
create_accuracy_vs_epoch_plot(all_models_histories, learning_rates)

# Classification examples

In [None]:
# Hard-coded pick of the model trained with learning rate 0.0005 and Nadam.
# NOTE(review): presumably chosen from the printed results summary - confirm
# the choice still holds after re-running the training cell.
best_model = all_models[0.0005]['nadam']['model']
create_classifications_examples(shape, test, y_test, best_model)

# Data exploration

In [None]:
# Class balance of the train split (label 1 = equal pair, 0 = not equal pair).
print(f'Number of pairs in train: {len(y_train)}')
print(f'Number of equal pairs in train: {len(y_train[y_train==1])}')
print(f'Number of not equal pairs in train: {len(y_train[y_train==0])}')

# Class balance of the test split. The last two messages used to say
# "in train" although they report counts taken from y_test.
print(f'Number of pairs in test: {len(y_test)}')
print(f'Number of equal pairs in test: {len(y_test[y_test==1])}')
print(f'Number of not equal pairs in test: {len(y_test[y_test==0])}')