## Imports

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline

import numpy as np
import os
import sys
import random
import tensorflow as tf
from pathlib import Path
from six.moves import urllib
import tarfile
import shutil

from keras import applications
from keras import layers
from keras import losses
from keras import optimizers
from keras import metrics
from keras import Model

from keras.applications import inception_v3
from keras.applications.inception_v3 import InceptionV3

In [None]:
target_size = (140, 140)

inception_model = InceptionV3(weights='imagenet', input_shape = target_size + (3,), include_top=False)

## Layers

In [None]:
flat_layer = layers.Flatten()(inception_model.output)
dense_layer_1 = layers.Dense(512, activation='relu')(flat_layer)
dense_layer_1 = layers.BatchNormalization()(dense_layer_1)
dense_layer_2 = layers.Dense(256, activation='relu')(dense_layer_1)
dense_layer_2 = layers.BatchNormalization()(dense_layer_2)
dense_layer_3 = layers.Dense(256, activation='relu')(dense_layer_2)

inception_model.trainable = False

transfer_inception_model = Model(inputs = inception_model.inputs, outputs = dense_layer_3)

## Triplet function

In [None]:
class SimilarityLayer(layers.Layer):
    # compute and return the two distances:
    # d(anchor,positive) 
    # d(anchor,negative)
    
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        
    def call(self, anchor, positive, negative):
        d1 = tf.reduce_sum(tf.square(anchor-positive), -1)
        d2 = tf.reduce_sum(tf.square(anchor-negative), -1)
        return(d1,d2)
    
anchor = layers.Input(name='anchor', shape = target_size + (3,))
positive = layers.Input(name='positive', shape = target_size + (3,))
negative = layers.Input(name='negative', shape = target_size + (3,))

sim_layer_output = SimilarityLayer().call(
    transfer_inception_model(inputs = inception_v3.preprocess_input(anchor)),
    transfer_inception_model(inputs = inception_v3.preprocess_input(positive)),
    transfer_inception_model(inputs = inception_v3.preprocess_input(negative))
)

siamese_model = Model(inputs=[anchor, positive,negative], outputs=sim_layer_output)

## Siamese model

In [None]:
class SiameseModelClass(Model):
    def __init__(self, siamese_model, margin = 0.5):
        super(SiameseModelClass, self).__init__()
        
        self.siamese_model = siamese_model
        self.margin = margin
        
        # create a Metric instance to track the loss
        self.loss_tracker = metrics.Mean(name="loss")
        
    def call(self, inputs):
        return self.siamese_model(inputs)
    
    # customize the training process: providing our own training step
    def train_step(self, data):
        with tf.GradientTape() as tape:
            # call custom loss function
            loss = self.custom_loss(data)
            
        # Compute gradients
        trainable_vars = self.siamese_model.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)
        
        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))
        
        # Update our training loss metric
        self.loss_tracker.update_state(loss)
        
        return {"loss": self.loss_tracker.result()}
    
    # providing our own evaluation step
    def test_step(self, data):
        # call custom loss function
        loss = self.custom_loss(data)
        
        # Update our test loss metric
        self.loss_tracker.update_state(loss)
        
        return {"loss": self.loss_tracker.result()}
    
    # custom loss function
    def custom_loss(self, data):
        # get the distances tuple from the siamese model output
        d1, d2 = self.siamese_model(data)
        
        # compute the triplet loss
        loss = tf.maximum(d1 - d2 + self.margin, 0)
        
        return loss
    
    @property
    def metrics(self):
        # We list our `Metric` objects here so that `reset_states()` can be
        # called automatically at the start of each epoch
        # or at the start of `evaluate()`.
        # If you don't implement this property, you have to call
        # `reset_states()` yourself at the time of your choosing.
        return [self.loss_tracker]

## Download function

In [None]:
def download_and_uncompress_tarball(tarball_url, dataset_dir):
    """Downloads the `tarball_url` and uncompresses it locally.
    Args:
    tarball_url: The URL of a tarball file.
    dataset_dir: The directory where the temporary files are stored.
    """
    filename = tarball_url.split('/')[-1]
    filepath = os.path.join(dataset_dir, filename)

    def _progress(count, block_size, total_size):
        sys.stdout.write('\r>> Downloading %s %.1f%%' % (
            filename, float(count * block_size) / float(total_size) * 100.0))
        sys.stdout.flush()

    filepath, _ = urllib.request.urlretrieve(tarball_url, filepath, _progress)
    print()
    statinfo = os.stat(filepath)
    print('Successfully downloaded', filename, statinfo.st_size, 'bytes.')
    tarfile.open(filepath, 'r:gz').extractall(dataset_dir)

## Downloading the dataset

In [None]:


# URL for sourcing the funneled images
database_url = 'http://vis-www.cs.umass.edu/lfw/lfw-deepfunneled.tgz'

root_folder = '.'
download_folder = root_folder + '/'+ 'data/lfw_original'
selection_folder = root_folder + '/' + 'data/lfw_selection'
download_path = download_folder + '/lfw-deepfunneled.tgz'

if not os.path.exists(download_folder):
    os.makedirs(download_folder)

if not os.path.exists(selection_folder):
    os.makedirs(selection_folder)
    
if not os.path.exists(download_path):
    download_and_uncompress_tarball(database_url, download_folder)


## Extracting dataset

In [None]:
extracted_folder = download_folder + '/lfw-deepfunneled'

# images are organized into separate folders for each person
# get a list of subfolders 
subfolders = [x[0] for x in os.walk(extracted_folder)]

# first item is root the folder itself
subfolders.pop(0) 

## Images per person

In [None]:
people_list = []

for path in subfolders:
    image_count = len([f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))])
    people_list.append((path.split('\\')[-1], image_count))
    #people_count.append((path, image_count))
    
# Sort from max to min images per person
people_list = sorted(people_list, key=lambda x: x[1], reverse=True)

## Poeple list counting

In [None]:
print(f'Number of people: {len(subfolders)}')
print(f'Number of people with only one photo: {len([person for person, image_count in people_list if image_count==1])}')
print(f'Number of people with >=5 photos: {len([person for person, image_count in people_list if image_count>=5])}')

## People with 5+ images

In [None]:
selected_persons = {}
i = 0

for person,image_count in people_list:
    if image_count >=5:
        file_list = []
        
        # create new folder in selected images path
        newpath = selection_folder + '/' + person.split('/')[-1]
        if not os.path.exists(newpath):
            os.makedirs(newpath)
        
        # copy / paste first 5 images to the new location
        files = [os.path.join(person, f) for f in os.listdir(person) if os.path.isfile(os.path.join(person, f))]
        files = files[0:5] # select first 5 images
        for file in files:
            filename = file.split('/')[-1]
            shutil.copyfile(file, newpath + '/' + filename)
            file_list.append(newpath + '/' + filename)
            
        selected_persons[i] = file_list
        i = i + 1

## Triplets

In [None]:
triplets = []

for item in selected_persons.items():
    images = item[1]
    
    for i in range(len(images)-1):
        for j in range(i+1,len(images)):
            anchor = images[i]
            positive = images[j]
            
            # choose a random negative
            # first generate a random class rank and make sure we're not selecting the current class
            random_class = item[0]
            while random_class == item[0]:
                random_class = random.randint(0, len(selected_persons)-1)
            # selected a random image from the 5 that any of our classes has
            random_image = random.randint(0, 4)
            negative = selected_persons[random_class][random_image]
            
            triplets.append((anchor, positive, negative))

## Preprocessing images

In [None]:
def preprocess_image(filename):
    image_string = tf.io.read_file(filename)
    image = tf.image.decode_jpeg(image_string, channels = 3)
    image = tf.image.convert_image_dtype(image, tf.float32)
    image = tf.image.resize(image, target_size)
    return image

## Displaying triplets

In [None]:
def plot_images(triplets):
    def show(ax, image):
        ax.imshow(image)
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)
    
    fig = plt.figure(figsize=(7,12))
    axis = fig.subplots(5, 3)
    
    for i in range(0,5):
        anchor,positive,negative = triplets[40+i]
        show(axis[i,0], preprocess_image(anchor))
        show(axis[i,1], preprocess_image(positive))
        show(axis[i,2], preprocess_image(negative))


plot_images(triplets)

## Preprocessing triplets

In [None]:
def preprocess_triplets(anchor, positive, negative):
    """
    Inputs: a tuple of filenames
    Output: a tuple of preprocessed images 
    """

    return (
        preprocess_image(anchor),
        preprocess_image(positive),
        preprocess_image(negative)
    )

## Shuffling the triplets

In [None]:
rng = np.random.RandomState(seed=101)
rng.shuffle(triplets)

## Transformations

In [None]:
anchor_images = [a_tuple[0] for a_tuple in triplets]
positive_images = [a_tuple[1] for a_tuple in triplets]
negative_images = [a_tuple[2] for a_tuple in triplets]

anchor_dataset = tf.data.Dataset.from_tensor_slices(anchor_images)
positive_dataset = tf.data.Dataset.from_tensor_slices(positive_images)
negative_dataset = tf.data.Dataset.from_tensor_slices(negative_images)

dataset = tf.data.Dataset.zip((anchor_dataset, positive_dataset, negative_dataset))
dataset = dataset.shuffle(buffer_size=1024)
dataset = dataset.map(preprocess_triplets)

## Split for training and validation

In [None]:
training_data = dataset.take(round(image_count * 0.8))
validation_data = dataset.skip(round(image_count * 0.8))

training_data = training_data.batch(32, drop_remainder=False)
training_data = training_data.prefetch(8)

validation_data = validation_data.batch(32, drop_remainder=False)
validation_data = validation_data.prefetch(8)

## Training and saving the model

In [None]:
model_on_GPU = SiameseModelClass(siamese_model)
model_on_GPU.compile(optimizer = optimizers.Adam(0.0001))

import time
start = time.time()

epochs = 15

history = model_on_GPU.fit(
    training_data, 
    epochs=epochs, 
    validation_data = validation_data
)
stop = time.time()
print(f'Training on GPU took: {(stop-start)/60} minutes')

model_on_GPU.save("lfw.model.keras")

## Saving the model

In [None]:
model_on_GPU.save("lfw.model.keras")

## Training results

In [None]:
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Model loss during training')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'validation'], loc='upper left')
plt.show()

## Checking if the training was successful

In [None]:
sample = next(iter(training_data))
#plot_images(*sample)

anchor, positive, negative = sample
anchor_embedding, positive_embedding, negative_embedding = (
    transfer_inception_model(inputs = inception_v3.preprocess_input(anchor)),
    transfer_inception_model(inputs = inception_v3.preprocess_input(positive)),
    transfer_inception_model(inputs = inception_v3.preprocess_input(negative)),
)

d1 = np. sum(np. power((anchor_embedding-positive_embedding),2))
print(f'Anchor-positive difference = {d1}')

d2 = np. sum(np. power((anchor_embedding-negative_embedding),2))
print(f'Anchor-negative difference = {d2}')

cosine_similarity = metrics.CosineSimilarity()

positive_similarity = cosine_similarity(anchor_embedding, positive_embedding)
print("Positive similarity:", positive_similarity.numpy())

negative_similarity = cosine_similarity(anchor_embedding, negative_embedding)
print("Negative similarity", negative_similarity.numpy())

## Testing

In [58]:
evaluation_results = model_on_GPU.evaluate(validation_data)

print(f'Loss: {evaluation_results}')

Loss: 0.5031144022941589
