### Requirement Libraries

In [194]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras.applications import resnet
from tensorflow.keras import layers
from tensorflow import keras
import cv2
import os
import random
from mtcnn.mtcnn import MTCNN

### Creating Configuration

### Build pair of images dataset

In [105]:
def images_to_array(root_path, img_target_size=(300, 300), numeric_labels=True):
    """
    read images from directories as a numpy array.
    
    :param root_path(str): root path refering to the path containing sub-folders
    :param img_target_size(tuple): a tuple with size(W, L) to convert image sizes
    :param numeric_labels: if true, output labels will be int. if false, the folder names considered as label
    :return img_array: a numpy array containing all image with shape (#images, img_target_size, #channels)
    """
    img_list = []
    label_list = []
    class_names = os.listdir(root_path)
    #class_names = [str(i) for i in sorted([int(j) for j in os.listdir(root_path)])]
    for i, class_name in enumerate(class_names):
        img_names = os.listdir(os.path.join(root_path, class_name))

        for img_name in img_names:
            path = os.path.join(root_path, class_name, img_name)
            img = cv2.imread(path)
            img = cv2.resize(img, img_target_size)
            img_list.append(img)
            if numeric_labels:
                label_list.append(i)
            else:
                label_list.append(class_name)
        
    img_array = np.array(img_list)
    label_array = np.array(label_list)
    
    return img_array, label_array

In [106]:
root_path = 'C:\\Users\Lenovo\Documents\Github\Datasets\Facial-Emotion-Recognition\\Data'
img_array, label_array = images_to_array(root_path)
print(f'img_array.shape : {img_array.shape}')
print(f'label_array.shape : {label_array.shape}')

img_array.shape : (152, 300, 300, 3)
label_array.shape : (152,)


In [107]:
def make_pairs(images, labels):
    # initialize two empty lists to hold the (image, image) pairs and
    # labels to indicate if a pair is positive or negative
    pair_images = []
    pair_labels = []
    
    # calculate the total number of classes present in the dataset
    # and then build a list of indexes for each class label that
    # provides the indexes for all examples with a given label
    num_classes = len(np.unique(labels))
    idx = [np.where(labels == i)[0] for i in range(0, num_classes)] 
    
    # loop over all images
    for idxA in range(len(images)):
        # grab the current image and label belonging to the current
        # iteration
        current_image = images[idxA]
        label = labels[idxA]
        
        # randomly pick an image that belongs to the *same* class
        # label
        idxB = np.random.choice(idx[label])
        pos_image = images[idxB]
        
        # prepare a positive pair and update the images and labels
        # lists, respectively
        pair_images.append([current_image, pos_image])
        pair_labels.append([1])
        
        # grab the indices for each of the class labels *not* equal to
        # the current label and randomly pick an image corresponding
        # to a label *not* equal to the current label
        negIdx = np.where(labels != label)[0]
        neg_image = images[np.random.choice(negIdx)]
        
        # prepare a negative pair of images and update our lists
        pair_images.append([current_image, neg_image])
        pair_labels.append([0])
        
    # return a 2-tuple of our image pairs and labels
    return (np.array(pair_images), np.array(pair_labels))

In [108]:
pair_images, pair_labels = make_pairs(img_array, label_array)
print(f'pair_images : {pair_images.shape}')
print(f'pair_labels : {pair_labels.shape}')

pair_images : (304, 2, 300, 300, 3)
pair_labels : (304, 1)


### Build triplet images dataset

In [174]:
class MapFunction():
    def __init__(self, imageSize):
        # define the image width and height
        self.imageSize = imageSize
    def decode_and_resize(self, imagePath):
        # read and decode the image path
        image = tf.io.read_file(imagePath)
        image = tf.image.decode_jpeg(image, channels=3)
        # convert the image data type from uint8 to float32 and then resize
        # the image to the set image size
        image = tf.image.convert_image_dtype(image, dtype=tf.float32)
        image = tf.image.resize(image, self.imageSize)
        # return the image
        return image
    def __call__(self, anchor, positive, negative):
        anchor = self.decode_and_resize(anchor)
        positive = self.decode_and_resize(positive)
        negative = self.decode_and_resize(negative)
        # return the anchor, positive and negative processed images
        return (anchor, positive, negative)

In [175]:
class TripletGenerator:
    def __init__(self, datasetPath):
        # create an empty list which will contain the subdirectory
        # names of the `dataset` directory with more than one image
        # in it
        self.peopleNames = list()
        # iterate over the subdirectories in the dataset directory
        for folderName in os.listdir(datasetPath):
            # build the subdirectory name
            absoluteFolderName = os.path.join(datasetPath, folderName)
            # get the number of images in the subdirectory
            numImages = len(os.listdir(absoluteFolderName))
            # if the number of images in the current subdirectory
            # is more than one, append into the `peopleNames` list
            if numImages > 1:
                self.peopleNames.append(absoluteFolderName)
        # create a dictionary of people name to their image names
        self.allPeople = self.generate_all_people_dict()
    def generate_all_people_dict(self):
        # create an empty dictionary that will be populated with
        # directory names as keys and image names as values
        allPeople = dict()
        # iterate over all the directory names with more than one
        # image in it
        for personName in self.peopleNames:
            # get all the image names in the current directory
            imageNames = os.listdir(personName)
            # build the image paths and populate the dictionary
            personPhotos = [
                os.path.join(personName, imageName) for imageName in imageNames
            ]
            allPeople[personName] = personPhotos
        # return the dictionary
        return allPeople
    def get_next_element(self):
        # create an infinite generator
        while True:
            # draw a person at random which will be our anchor and
            # positive person
            anchorName = random.choice(self.peopleNames)
            # copy the list of people names and remove the anchor
            # from the list
            temporaryNames = self.peopleNames.copy()
            temporaryNames.remove(anchorName)
            # draw a person at random from the list of people without
            # the anchor, which will act as our negative sample
            negativeName = random.choice(temporaryNames)
            # draw two images from the anchor folder without replacement
            (anchorPhoto, positivePhoto) = np.random.choice(
                a=self.allPeople[anchorName],
                size=2,
                replace=False
            )
            # draw an image from the negative folder
            negativePhoto = random.choice(self.allPeople[negativeName])
            # yield the anchor, positive and negative photos
            yield (anchorPhoto, positivePhoto, negativePhoto)

In [136]:
path = 'C:\\Users\\Lenovo\\Documents\\Github\\Datasets\\LFW\\lfw-deepfunneled\\lfw-deepfunneled'

In [179]:
IMAGE_SIZE = (224, 224)
BATCH_SIZE = 256
BUFFER_SIZE = BATCH_SIZE * 2
# define autotune
AUTO = tf.data.AUTOTUNE

### Create detected face dataset

In [192]:
for person_name in os.listdir(path):
    for img_name in os.listdir(os.path.join(path, person_name)):
        img_path = os.listdir(os.path.join(path, person_name, img_name))
        img = cv2.imread(img_path)
        detector = MTCNN()
        
    print(person_name)
    break

Aaron_Eckhart


### Create the data pipeline

In [177]:
# create the data input pipeline for train and val dataset
trainTripletGenerator = TripletGenerator(
    datasetPath=path)

valTripletGenerator = TripletGenerator(
    datasetPath=path)

trainTfDataset = tf.data.Dataset.from_generator(
    generator=trainTripletGenerator.get_next_element,
    output_signature=(
        tf.TensorSpec(shape=(), dtype=tf.string),
        tf.TensorSpec(shape=(), dtype=tf.string),
        tf.TensorSpec(shape=(), dtype=tf.string),
    )
)

valTfDataset = tf.data.Dataset.from_generator(
    generator=valTripletGenerator.get_next_element,
    output_signature=(
        tf.TensorSpec(shape=(), dtype=tf.string),
        tf.TensorSpec(shape=(), dtype=tf.string),
        tf.TensorSpec(shape=(), dtype=tf.string),
    )
)

In [190]:
mapFunction = MapFunction(imageSize=IMAGE_SIZE)

print("[INFO] building the train and validation `tf.data` pipeline...")
trainDs = (trainTfDataset
    .map(mapFunction)
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE)
    .prefetch(AUTO)
)
valDs = (valTfDataset
    .map(mapFunction)
    .batch(BATCH_SIZE)
    .prefetch(AUTO)
)

[INFO] building the train and validation `tf.data` pipeline...


### Create Model

In [191]:
IMAGE_SIZE = (300, 300)

LEARNING_RATE = 0.0001
STEPS_PER_EPOCH = 50
VALIDATION_STEPS = 10
EPOCHS = 10

In [127]:
def get_embedding_module(imageSize):
    # construct the input layer and pass the inputs through a
    # pre-processing layer
    inputs = keras.Input(imageSize + (3,))
    x = resnet.preprocess_input(inputs)
    
    # fetch the pre-trained resnet 50 model and freeze the weights
    baseCnn = keras.models.load_model(model_path)
    baseCnn.trainable=False
    
    # pass the pre-processed inputs through the base cnn and get the
    # extracted features from the inputs
    extractedFeatures = baseCnn(x)
    # pass the extracted features through a number of trainable layers
    x = layers.GlobalAveragePooling2D()(extractedFeatures)
    x = layers.Dense(units=1024, activation="relu")(x)
    x = layers.Dropout(0.2)(x)
    x = layers.BatchNormalization()(x)
    x = layers.Dense(units=512, activation="relu")(x)
    x = layers.Dropout(0.2)(x)
    x = layers.BatchNormalization()(x)
    x = layers.Dense(units=256, activation="relu")(x)
    x = layers.Dropout(0.2)(x)
    outputs = layers.Dense(units=128)(x)
    # build the embedding model and return it
    embedding = keras.Model(inputs, outputs, name="embedding")
    return embedding

In [128]:
def get_siamese_network(imageSize, embeddingModel):
    # build the anchor, positive and negative input layer
    anchorInput = keras.Input(name="anchor", shape=imageSize + (3,))
    positiveInput = keras.Input(name="positive", shape=imageSize + (3,))
    negativeInput = keras.Input(name="negative", shape=imageSize + (3,))
    # embed the anchor, positive and negative images
    anchorEmbedding = embeddingModel(anchorInput)
    positiveEmbedding = embeddingModel(positiveInput)
    negativeEmbedding = embeddingModel(negativeInput)
    # build the siamese network and return it
    siamese_network = keras.Model(
        inputs=[anchorInput, positiveInput, negativeInput],
        outputs=[anchorEmbedding, positiveEmbedding, negativeEmbedding]
    )
    return siamese_network

In [129]:
class SiameseModel(keras.Model):
    
    def __init__(self, siameseNetwork, margin, lossTracker):
        super().__init__()
        self.siameseNetwork = siameseNetwork
        self.margin = margin
        self.lossTracker = lossTracker
        
    def _compute_distance(self, inputs):
        (anchor, positive, negative) = inputs
        # embed the images using the siamese network
        embeddings = self.siameseNetwork((anchor, positive, negative))
        anchorEmbedding = embeddings[0]
        positiveEmbedding = embeddings[1]
        negativeEmbedding = embeddings[2]
        # calculate the anchor to positive and negative distance
        apDistance = tf.reduce_sum(
            tf.square(anchorEmbedding - positiveEmbedding), axis=-1
        )
        anDistance = tf.reduce_sum(
            tf.square(anchorEmbedding - negativeEmbedding), axis=-1
        )
        
        # return the distances
        return (apDistance, anDistance)
    
    def _compute_loss(self, apDistance, anDistance):
        loss = apDistance - anDistance
        loss = tf.maximum(loss + self.margin, 0.0)
        return loss
    
    def call(self, inputs):
        # compute the distance between the anchor and positive,
        # negative images
        (apDistance, anDistance) = self._compute_distance(inputs)
        return (apDistance, anDistance)
    
    def train_step(self, inputs):
        with tf.GradientTape() as tape:
            # compute the distance between the anchor and positive,
            # negative images
            (apDistance, anDistance) = self._compute_distance(inputs)
            # calculate the loss of the siamese network
            loss = self._compute_loss(apDistance, anDistance)
        # compute the gradients and optimize the model
        gradients = tape.gradient(
            loss,
            self.siameseNetwork.trainable_variables)
        self.optimizer.apply_gradients(
            zip(gradients, self.siameseNetwork.trainable_variables)
        )
        # update the metrics and return the loss
        self.lossTracker.update_state(loss)
        return {"loss": self.lossTracker.result()}
    
    def test_step(self, inputs):
        # compute the distance between the anchor and positive,
        # negative images
        (apDistance, anDistance) = self._compute_distance(inputs)
        # calculate the loss of the siamese network
        loss = self._compute_loss(apDistance, anDistance)
        
        # update the metrics and return the loss
        self.lossTracker.update_state(loss)
        return {"loss": self.lossTracker.result()}
    
    @property
    def metrics(self):
        return [self.lossTracker]

In [130]:
# build the embedding module and the siamese network

embeddingModule = get_embedding_module(imageSize=IMAGE_SIZE)

siameseNetwork =  get_siamese_network(
    imageSize=IMAGE_SIZE,
    embeddingModel=embeddingModule,
)



In [131]:
#create siamese model instance
siameseModel = SiameseModel(
    siameseNetwork=siameseNetwork,
    margin=0.5,
    lossTracker=keras.metrics.Mean(name="loss"),
)

In [134]:
siameseModel.compile(
    optimizer=keras.optimizers.Adam(0.0001)
)

In [None]:
# train and validate the siamese model

siameseModel.fit(
	trainDs,
	steps_per_epoch=config.STEPS_PER_EPOCH,
	validation_data=valDs,
	validation_steps=config.VALIDATION_STEPS,
	epochs=config.EPOCHS,
)