In [1]:
import pandas as pd
from numpy import expand_dims
import cv2
from keras.models import load_model
import numpy as np
import keras

Using TensorFlow backend.


In [2]:
# Table of image pairs: each row holds two image paths and a label column.
df = pd.read_csv(filepath_or_buffer='data.csv')
df.head(n=5)

Unnamed: 0,image1,image2,label
0,data/Billy_Crystal_0001.jpg,data/Billy_Crystal_0003.jpg,1
1,data/Britney_Spears_0008.jpg,data/Britney_Spears_0014.jpg,1
2,data/Joseph_Deiss_0002.jpg,data/Joseph_Deiss_0003.jpg,1
3,data/Barbra_Streisand_0001.jpg,data/Barbra_Streisand_0002.jpg,1
4,data/Rebekah_Chantay_Revels_0001.jpg,data/Rebekah_Chantay_Revels_0002.jpg,1


In [3]:
class DataGenerator(keras.utils.Sequence):
    """Keras ``Sequence`` that yields batches of image pairs and their labels.

    Each item is ``([X1, X2], y)``: ``X1`` and ``X2`` hold the preprocessed
    images named in the ``image1`` / ``image2`` columns of ``df`` and ``y``
    holds the matching ``label`` values.

    Args:
        list_IDs: Row identifiers to draw samples from. They are used
            positionally (``.iloc``) against ``df``.
            NOTE(review): callers that pass ``df.index`` rely on the frame
            having a default 0..n-1 index - confirm before filtering ``df``.
        df: DataFrame with ``image1``, ``image2`` and ``label`` columns.
        batch_size: Number of pairs per batch.
        shuffle: When truthy, the sample order is reshuffled at construction
            and every time ``on_epoch_end`` is called.
    """

    def __init__(self, list_IDs, df, batch_size=4, shuffle=False):
        """Store the configuration and build the initial sample order."""
        self.batch_size = batch_size
        self.df = df
        self.list_IDs = list_IDs
        self.shuffle = shuffle
        self.on_epoch_end()

    def __len__(self):
        """Return the number of full batches per epoch.

        Integer division means a trailing partial batch is dropped.
        """
        return len(self.list_IDs) // self.batch_size

    def __getitem__(self, index):
        """Build and return batch number ``index`` as ``([X1, X2], y)``."""
        # Positions (into list_IDs) that belong to this batch.
        start = index * self.batch_size
        batch_positions = self.indexes[start:start + self.batch_size]

        # Translate positions into the row IDs supplied by the caller.
        list_IDs_temp = [self.list_IDs[k] for k in batch_positions]

        X, y = self.__data_generation(list_IDs_temp)
        return X, y

    def on_epoch_end(self):
        """Reset the sample order, reshuffling it when ``shuffle`` is set."""
        self.indexes = np.arange(len(self.list_IDs))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def preprocess_face(self, filename, required_size=(160, 160)):
        """Load one image file and convert it to a scaled RGB float array.

        Args:
            filename: Path of the image to read.
            required_size: Target size handed to ``cv2.resize``.

        Returns:
            A float32 array with pixel values divided by 255.

        Raises:
            OSError: If OpenCV could not read ``filename``.
        """
        image = cv2.imread(filename)
        if image is None:
            # cv2.imread signals failure by returning None rather than raising;
            # fail here with the offending path instead of inside cvtColor.
            raise OSError(f"cv2.imread could not read image: {filename!r}")

        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = cv2.resize(image, required_size)
        # NOTE(review): only a 1/255 rescale is applied, no per-image mean/std
        # standardization - confirm this matches what the base model expects.
        return image.astype('float32') / 255.0

    def __data_generation(self, list_IDs_batch):
        """Load the image pairs and labels for the given row IDs.

        Args:
            list_IDs_batch: Row IDs making up one batch.

        Returns:
            ``([X1, X2], y)`` where ``X1``/``X2`` have shape
            ``(n, 160, 160, 3)`` and ``y`` has shape ``(n,)``, with ``n`` the
            number of IDs received.
        """
        # Size the buffers from the IDs actually received so a short batch can
        # never expose uninitialised np.empty rows.
        n_samples = len(list_IDs_batch)
        # float32 matches the dtype produced by preprocess_face; the 160x160
        # shape must agree with preprocess_face's default required_size.
        X1 = np.empty((n_samples, 160, 160, 3), dtype='float32')
        X2 = np.empty((n_samples, 160, 160, 3), dtype='float32')
        y = np.empty((n_samples), dtype=int)

        # Column lookups are loop-invariant, so do them once.
        first_paths = self.df['image1']
        second_paths = self.df['image2']
        labels = self.df['label']

        for i, ID in enumerate(list_IDs_batch):
            X1[i,] = self.preprocess_face(first_paths.iloc[ID])
            X2[i,] = self.preprocess_face(second_paths.iloc[ID])
            y[i] = labels.iloc[ID]

        return [X1, X2], y

In [4]:
from sklearn.model_selection import train_test_split

# Split the row index 80/20, stratifying on the label column.
train_idx, val_idx = train_test_split(
    df.index,
    test_size=0.2,
    random_state=42,
    stratify=df['label'].values,
)

# One batch generator per subset, both backed by the same DataFrame.
train_face_gen = DataGenerator(list_IDs=train_idx, df=df)
val_face_gen = DataGenerator(list_IDs=val_idx, df=df)

# for x,y in train_face_gen:
#     pass
# for x,y in val_face_gen:
#     pass


In [5]:
from keras.layers import Lambda, Input
from keras.models import Model
import keras.backend as K
from keras.layers.core import Dense, Dropout, Flatten, Activation
from keras.layers.normalization import BatchNormalization
from keras.layers.advanced_activations import LeakyReLU
from keras.layers.noise import GaussianNoise
from keras.optimizers import Adam, RMSprop

def euclidean_distance(vects):
    """Return the Euclidean distance between two batches of vectors.

    ``vects`` is a pair of tensors. The squared differences are summed over
    axis 1 with ``keepdims=True``, so each sample yields one distance value.
    """
    left, right = vects
    squared_diff = K.square(left - right)
    squared_norm = K.sum(squared_diff, axis=1, keepdims=True)
    # Clamp to epsilon so the square root is never taken of zero.
    clamped = K.maximum(squared_norm, K.epsilon())
    return K.sqrt(clamped)


def eucl_dist_output_shape(shapes):
    """Return the output shape of the distance layer: ``(batch, 1)``.

    ``shapes`` is a pair of input shapes; only the leading (batch) entry of
    the first one is used.
    """
    first_shape, _second_shape = shapes
    batch_dim = first_shape[0]
    return (batch_dim, 1)


def contrastive_loss(y_true, y_pred):
    """Contrastive loss with a margin of 1.

    Pairs with ``y_true == 1`` are penalised by the squared predicted
    distance; pairs with ``y_true == 0`` are penalised by the squared amount
    by which the distance falls short of the margin.
    """
    margin = 1
    similar_term = y_true * K.square(y_pred)
    shortfall = K.maximum(margin - y_pred, 0)
    dissimilar_term = (1 - y_true) * K.square(shortfall)
    return K.mean(similar_term + dissimilar_term)

def accuracy(y_true, y_pred):
    """Fraction of pairs whose thresholded distance matches the label.

    A predicted distance below 0.5 is counted as label 1, otherwise 0.
    """
    predicted_label = K.cast(y_pred < 0.5, y_true.dtype)
    matches = K.equal(y_true, predicted_label)
    return K.mean(matches)

def base(base_model):
    """Wrap ``base_model`` in a new model that takes a 160x160x3 input.

    A model spanning ``base_model``'s first layer input to its last layer
    output is applied to a fresh ``Input`` tensor, and the result is returned
    as a new ``Model``.
    """
    image_in = Input(shape=(160, 160, 3))
    first_layer = base_model.get_layer(index=0)
    last_layer = base_model.get_layer(index=-1)
    feature_extractor = Model(first_layer.input, last_layer.output)
    embedding = feature_extractor(image_in)
    return Model(image_in, embedding)

def face_detector(base_model):
    """Build a two-input model that outputs the distance between two embeddings.

    Args:
        base_model: Keras model used to embed each 160x160x3 image; it is
            first wrapped by ``base``.

    Returns:
        A ``Model`` taking ``[x1, x2]`` and returning the Euclidean distance
        between the two embeddings, one value per sample.
    """
    base_model = base(base_model)

    x1 = Input(shape=(160, 160, 3))
    x2 = Input(shape=(160, 160, 3))
    # The same model instance embeds both inputs, so its weights are shared.
    embd1 = base_model(x1)
    embd2 = base_model(x2)

    distance = Lambda(euclidean_distance,
                      output_shape=eucl_dist_output_shape)([embd1, embd2])

    # Keras 2 names these keyword arguments `inputs` / `outputs`; the singular
    # `input` / `output` spellings are legacy Keras 1 names.
    final_model = Model(inputs=[x1, x2], outputs=distance)
    return final_model

In [6]:
# Load the saved Keras model that serves as the embedding network for the pair model.
facenet_base = load_model('facenet.h5')

Instructions for updating:
If using Keras pass *_constraint arguments to layers.





In [7]:
# Print the layer-by-layer architecture of the loaded base model.
facenet_base.summary()

Model: "inception_resnet_v1"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            (None, 160, 160, 3)  0                                            
__________________________________________________________________________________________________
Conv2d_1a_3x3 (Conv2D)          (None, 79, 79, 32)   864         input_1[0][0]                    
__________________________________________________________________________________________________
Conv2d_1a_3x3_BatchNorm (BatchN (None, 79, 79, 32)   96          Conv2d_1a_3x3[0][0]              
__________________________________________________________________________________________________
Conv2d_1a_3x3_Activation (Activ (None, 79, 79, 32)   0           Conv2d_1a_3x3_BatchNorm[0][0]    
________________________________________________________________________________

In [9]:
# Freeze every layer of the base model except its last 19.
frozen_layers = facenet_base.layers[:-19]
for layer in frozen_layers:
    layer.trainable = False

# Build the two-input distance model on top of the partially frozen base.
engine = face_detector(facenet_base)

rms = RMSprop()
engine.compile(optimizer=rms, loss=contrastive_loss, metrics=[accuracy])
engine.summary()

Model: "model_3"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_2 (InputLayer)            (None, 160, 160, 3)  0                                            
__________________________________________________________________________________________________
input_3 (InputLayer)            (None, 160, 160, 3)  0                                            
__________________________________________________________________________________________________
model_2 (Model)                 (None, 128)          22808144    input_2[0][0]                    
                                                                 input_3[0][0]                    
__________________________________________________________________________________________________
lambda_1 (Lambda)               (None, 1)            0           model_2[1][0]              



In [None]:
# These callbacks were never imported anywhere in the notebook, so this cell
# raised NameError on a fresh kernel (Restart & Run All).
from keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau

callback = [
    # Keep only the best checkpoint on disk.
    # NOTE(review): no `monitor` is given here, so this callback uses its
    # default (val_loss) while the two below track val_accuracy - confirm
    # that the mismatch is intended.
    ModelCheckpoint('best_model.h5', save_best_only=True, verbose=2),
    # Stop after 15 epochs without a val_accuracy improvement and roll back
    # to the best weights seen.
    EarlyStopping(monitor='val_accuracy', mode='max', verbose=1,
                  patience=15, restore_best_weights=True),
    # Halve the learning rate after 10 epochs without a val_accuracy improvement.
    ReduceLROnPlateau(monitor='val_accuracy', factor=0.5, patience=10,
                      min_lr=1e-8, mode='max', verbose=1),
]

history = engine.fit_generator(
    train_face_gen,
    validation_data=val_face_gen,
    epochs=50,
    verbose=1,
    callbacks=callback,
)

Instructions for updating:
Use tf.where in 2.0, which has the same broadcast rule as np.where

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50