# Face Recognition with FaceNet and MTCNN
- This project evaluates the FaceNet system for face recognition. FaceNet was proposed by Florian Schroff et al. in the 2015 paper "FaceNet: A Unified Embedding for Face Recognition and Clustering".
- For Project-Based Learning 5 (PBL5), we use the MTCNN algorithm for face detection and the FaceNet algorithm for face recognition.
- Our dataset has about 20 classes, with roughly 130-150 images per class.


In [2]:
import numpy as np
import os
import numpy as np 
import cv2 as cv
from matplotlib import pyplot as plt
from keras.models import load_model
from PIL import Image
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from mtcnn import MTCNN

## Detection and Processing

- Detect face using MTCNN

### Paths and variables

In [3]:
# Size passed to cv.resize for every cropped face.
target_size = (160, 160)
# Detections whose 'confidence' is not above this threshold are discarded.
confidence_t = 0.99
# Shared MTCNN detector instance used by the video-processing code.
detector = MTCNN()
# Shared encoder used by encode_lables to turn class names into integer ids.
encode_lable = LabelEncoder()

In [4]:
def normalize(img):
    """
    Standardize an image to zero mean and unit standard deviation.

    The statistics are computed over the whole array (all pixels and
    channels together).

    img: numpy array of pixel values.
    Returns: a float array equal to ``(img - mean) / std``. For a constant
    image (std == 0) the centered array, i.e. all zeros, is returned.
    """
    mean, std = img.mean(), img.std()
    centered = img - mean
    if std == 0:
        # Every pixel equals the mean; dividing would produce NaN/inf.
        return centered
    return centered / std

In [5]:
def get_face(frame, box):
    """
    Crop the region described by ``box`` out of ``frame`` and resize it.

    frame: image array indexed as [row, column, ...].
    box: (x, y, width, height) of the detection, in pixels.
    Returns: (face, top_left, bottom_right) where ``face`` is the crop
    resized to ``target_size`` and the two corners are the (x, y) points
    of the region that was actually cropped.
    """
    x, y, width, height = box
    frame_h, frame_w = frame.shape[:2]
    # A detector can report a box that starts or ends outside the frame.
    # Clamp it to the frame instead of mirroring negative coordinates with
    # abs(), which would move the crop window away from the detection.
    x1, y1 = max(x, 0), max(y, 0)
    x2, y2 = min(x + width, frame_w), min(y + height, frame_h)
    face = frame[y1:y2, x1:x2]
    face = cv.resize(face, target_size)
    # normalize() is not applied here; callers receive unstandardized pixels.
    return face, (x1, y1), (x2, y2)

In [6]:
def dectect_face_of_video(path):
    """
    Collect face crops from every second frame of a video file.

    path: location of the video, passed to cv.VideoCapture.
    Returns: numpy array stacking the resized RGB face crops whose detection
    confidence is above ``confidence_t``; an empty array when nothing is found
    or the video cannot be opened.
    """
    faces_result = []
    video = cv.VideoCapture(path)
    frame_no = 0
    try:
        while video.isOpened():
            ret, frame = video.read()
            if not ret:
                # End of stream (or a read failure): stop before touching `frame`.
                break
            frame_no += 1
            if frame_no % 2 != 0:
                # Only even-numbered frames (2nd, 4th, ...) are analysed.
                continue
            # Convert the frame from BGR to RGB before running the detector.
            rgb_frame = cv.cvtColor(frame, cv.COLOR_BGR2RGB)
            for face in detector.detect_faces(rgb_frame):
                if face['confidence'] > confidence_t:
                    face_image, _, _ = get_face(rgb_frame, face['box'])
                    faces_result.append(np.array(face_image))
    finally:
        # Release the capture even if detection or cropping raises.
        video.release()
        cv.destroyAllWindows()
    return np.array(faces_result)

In [7]:
def dectect_faces(BASE_DIR):
    """
    Extract faces from every .mp4 under ``<BASE_DIR>/DataSet/raw``.

    The name of the folder that contains a video is used as its label. Each
    face crop is also written as a JPEG to
    ``<BASE_DIR>/DataSet/processed/<label>/<video name>_<index>.jpg``.

    BASE_DIR: project directory that contains the ``DataSet`` folder.
    Returns: (faces, labels) as two numpy arrays of equal length.
    """
    Faces = []
    Lables = []

    DataSet_dir = os.path.join(BASE_DIR, "DataSet")
    Raw_img_dir = os.path.join(DataSet_dir, "raw")
    # Build the output root explicitly. Replacing the substring "raw" in the
    # full path would also rewrite any "raw" that occurs inside BASE_DIR.
    Processed_img_dir = os.path.join(DataSet_dir, "processed")

    for root, dirs, files in os.walk(Raw_img_dir):
        for file in files:
            if not file.endswith("mp4"):
                continue

            path = os.path.join(root, file)
            label = os.path.basename(root)

            single_faces = dectect_face_of_video(path)

            Faces.extend(single_faces)
            Lables.extend([label] * len(single_faces))

            directory_path = os.path.join(Processed_img_dir, label)
            os.makedirs(directory_path, exist_ok=True)

            video_stem = os.path.splitext(file)[0]
            for i, face in enumerate(single_faces):
                new_filename = os.path.join(directory_path, video_stem) + "_" + str(i) + ".jpg"
                # Crops are RGB; convert to BGR before cv.imwrite.
                cv.imwrite(new_filename, cv.cvtColor(face, cv.COLOR_RGB2BGR))

    return np.array(Faces), np.array(Lables)

In [None]:
# Run face extraction over the raw videos below this machine-specific base directory.
# The returned (faces, labels) arrays are not assigned to a variable here.
dectect_faces("C:/Users/huuhu/Learning/Data_Analytics/PBL5_model/")

## Load Faces and Labels from the Processed Dataset

In [None]:
def extract_face(filename):
    """
    Read an image file and return its pixels as a numpy array.

    filename: path of the image to open.
    Returns: numpy array built from the decoded image.
    """
    # The context manager closes the underlying file once the pixel data has
    # been copied into the array, instead of leaving the handle open.
    with Image.open(filename) as image:
        return np.asarray(image)

In [None]:
def load_face(dir):
    """
    Load every image found directly inside ``dir``.

    dir: folder that holds the face images; a trailing separator is optional.
    Returns: list of pixel arrays, ordered by file name.
    """
    # os.path.join works with or without a trailing separator on `dir`;
    # sorting makes the order independent of the file system.
    return [
        extract_face(os.path.join(dir, filename))
        for filename in sorted(os.listdir(dir))
    ]

In [None]:
def load_dataset(dir):
    """
    Load a dataset laid out as one sub-folder of images per class.

    dir: root folder; each sub-folder name is used as the class label.
    Returns: (X, y) numpy arrays holding the images and their labels.
    """
    X, y = list(), list()
    # Sorted so that the sample order does not depend on the file system.
    for subdir in sorted(os.listdir(dir)):
        # The trailing "" keeps a path separator at the end of the class path.
        path = os.path.join(dir, subdir, "")
        if not os.path.isdir(path):
            # Skip stray files that sit next to the class folders.
            continue
        faces = load_face(path)
        print("load %d sample for class: %s" % (len(faces),subdir))
        X.extend(faces)
        y.extend([subdir] * len(faces))
    return np.asarray(X), np.asarray(y)

In [None]:
# Load every processed face image together with its class label (the sub-folder name).
X, y = load_dataset("/content/gdrive/MyDrive/Document/Colab_Notebooks/PBL5_model/DataSet/processed/")
# Hold out 20% of the samples for testing; stratify=y keeps the class proportions in both splits.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42,stratify=y)
print(X_train.shape, y_train.shape)
print(X_test.shape, y_test.shape)
# Save the four arrays positionally; they are read back below as arr_0 .. arr_3.
np.savez_compressed("/content/gdrive/MyDrive/Document/Colab_Notebooks/PBL5_model/dataset_faces_20cls.npz", X_train, y_train, X_test, y_test)

In [None]:
# Reload the split dataset from the archive written by np.savez_compressed
# (dataset_faces_20cls.npz). The arrays were saved positionally, so they come
# back under the keys arr_0 .. arr_3 in the order
# X_train, y_train, X_test, y_test.
# NOTE(review): this cell previously read dataset_faces_17cls.npz, a file the
# notebook never writes; switch the name back if the 17-class archive is the
# one that is really wanted.
with np.load("/content/gdrive/MyDrive/Document/Colab_Notebooks/PBL5_model/dataset_faces_20cls.npz") as data:
    # Indexing reads each array into memory, so the archive can be closed afterwards.
    X_train, y_train, X_test, y_test = data['arr_0'], data['arr_1'], data['arr_2'], data['arr_3']
print('Loaded: ', X_train.shape, y_train.shape, X_test.shape, y_test.shape)

**Setup**

In [None]:
# Seed NumPy's global random generator so np.random.* calls are repeatable.
np.random.seed(42)
# Presumably image width, height and channel count, matching the 160x160x3 model input.
IMG_W, IMG_H, IMG_C = (160, 160, 3)

## Data Visualization 

In [None]:
def show_data(
    images: np.ndarray,
    labels: np.ndarray,
    GRID: tuple=(15, 6),
    FIGSIZE: tuple=(25, 50),
    recog_fn = None,
    database = None,
) -> None:
    """
    Plot a grid of randomly chosen images with their labels.

    images, labels: parallel collections; an index is drawn at random (with
        replacement) for every cell of the grid.
    GRID: (rows, columns) of the subplot grid.
    FIGSIZE: figure size handed to matplotlib.
    recog_fn: optional callable ``recog_fn(image, database)``; when given, each
        title shows both the true label and the value it returns.
    database: forwarded unchanged to ``recog_fn``.
    """
    plt.figure(figsize=FIGSIZE)
    rows, cols = GRID

    for cell in range(1, rows * cols + 1):
        pick = np.random.randint(len(images))
        sample = images[pick]
        truth = labels[pick]

        plt.subplot(rows, cols, cell)
        plt.imshow(sample)
        plt.axis('off')

        if recog_fn is not None:
            predicted = recog_fn(sample, database)
            caption = f"True:{truth}\nPred:{predicted}"
        else:
            caption = truth
        plt.title(caption)

    plt.tight_layout()
    plt.show()
# Preview random training samples with their labels, using the default grid and figure size.
show_data(images=X_train, labels= y_train)

## Train an Inception-ResNet-v2 model for feature extraction

In [None]:
def encode_lables(y):
    """
    Fit the shared ``encode_lable`` encoder on ``y`` and return ``y`` encoded by it.

    The module-level encoder is refitted on every call.
    """
    encoder = encode_lable
    encoder.fit(y)
    encoded = encoder.transform(y)
    return encoded

In [None]:
from keras.layers import Conv2D, Activation, Input, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization, Concatenate, Lambda, add, GlobalAveragePooling2D
from keras.models import Model
from keras.utils import to_categorical
from keras.callbacks import EarlyStopping, ModelCheckpoint
from keras import backend as K
from sklearn.model_selection import train_test_split
from keras.optimizers import Adam

### Build the Inception-ResNet architecture

In [None]:
def scaling(x, scale):
    """
    Return ``x`` multiplied by the factor ``scale``.
    """
    scaled = x * scale
    return scaled


def conv2d_bn(x, filters, kernel_size, strides=1, padding='same', use_bias=False, name=None, activation='relu'):
    """
    Utility function to apply Conv2D + Batch normalization + activation.

    x: input tensor; batch normalization is applied over axis 3.
    filters, kernel_size, strides, padding, use_bias: forwarded to Conv2D.
    name: name of the convolution layer. The batch-norm and activation layers
        are named ``name + '_BatchNorm'`` and ``name + '_Activation'``. With
        the default ``None`` all three layers are auto-named by Keras.
    activation: activation applied after batch normalization.
    Returns: the output tensor of the activation layer.
    """
    # The suffixed names can only be built when a base name was given;
    # concatenating onto the default None would raise a TypeError.
    bn_name = None if name is None else name + '_BatchNorm'
    act_name = None if name is None else name + '_Activation'

    x = Conv2D(filters, kernel_size, strides=strides, padding=padding, use_bias=use_bias, name=name)(x)
    x = BatchNormalization(axis=3, momentum=0.995, epsilon=0.001, scale=False, name=bn_name)(x)

    x = Activation(activation, name=act_name)(x)
    return x
# def conv2d_branch()

def Stem(x):
    """
    Stem of the network: three plain convolutions followed by three
    two-branch stages whose outputs are concatenated on the channel axis.

    x: input image tensor (channels last).
    Returns: the tensor produced by the last concatenation ('5a_Concatenate').
    """
    x = conv2d_bn(x, 32, 3, strides=2, padding='valid', use_bias=False, name='Conv2d_1a_3x3')
    x = conv2d_bn(x, 32, 3, strides=1, padding='valid', use_bias=False, name='Conv2d_2a_3x3')
    x = conv2d_bn(x, 96, 3, strides=1, padding='same', use_bias=False, name='Conv2d_2b_3x3')

    # Stage 3a: stride-2 max-pool next to a stride-2 convolution.
    branch_0 = MaxPooling2D(3, strides=2, name='MaxPool_3a_3x3')(x)
    branch_1 = conv2d_bn(x, 96, 3, strides=2, padding='valid', use_bias=False, name='Conv2d_3a_3x3')
    x = Concatenate(axis=3, name='3a_Concatenate')([branch_0, branch_1])

    # Stage 4a: both branches must end with the same spatial size to be
    # concatenated. Branch 1 finishes with a 'valid' 3x3 convolution, which
    # trims one pixel from each border, so the 3x3 that closes branch 0 has to
    # use 'valid' padding as well ('same' would leave it two pixels larger).
    branch_0 = conv2d_bn(x, 64, 1, strides=1, padding='same', use_bias=False, name='Conv2d_branch_0_1a_1x1')
    branch_0 = conv2d_bn(branch_0, 96, 3, strides=1, padding='valid', use_bias=False, name='Conv2d_branch_0_2a_3x3')

    branch_1 = conv2d_bn(x, 64, 1, strides=1, padding='same', use_bias=False, name='Conv2d_branch_1_1b_1x1')
    branch_1 = conv2d_bn(branch_1, 64, [7, 1], strides=1, padding='same', use_bias=False, name='Conv2d_branch_1_2b_7x1')
    # The layer name says 7x1 although the kernel is [1, 7]; the name is left
    # untouched so that layer names stay stable.
    branch_1 = conv2d_bn(branch_1, 64, [1, 7], strides=1, padding='same', use_bias=False, name='Conv2d_branch_1_3b_7x1')
    branch_1 = conv2d_bn(branch_1, 96, 3, strides=1, padding='valid', use_bias=False, name='Conv2d_branch_1_4b_3x3')
    x = Concatenate(axis=3, name='4a_Concatenate')([branch_0, branch_1])

    # Stage 5a: stride-2 max-pool next to a stride-2 convolution.
    branch_0 = MaxPooling2D(3, strides=2, name='MaxPool_5a_3x3')(x)
    branch_1 = conv2d_bn(x, 192, 3, strides=2, padding='valid', use_bias=False, name='Conv2d_5a_3x3')
    x = Concatenate(axis=3, name='5a_Concatenate')([branch_0, branch_1])

    return x

def inception_ResNet_A(x):
    """
    Apply ten residual blocks named 'Block35_1' .. 'Block35_10'.

    Each block runs three convolution branches on ``x``, concatenates them,
    projects the result to 384 channels with a 1x1 convolution, scales it by
    0.17, adds it back onto ``x`` and applies ReLU.
    """
    for block_no in range(1, 11):
        tag = f'Block35_{block_no}'

        # Branch 0: a single 1x1 convolution.
        tower_0 = conv2d_bn(x, 32, 1, name=tag + '_Branch_0_Conv2d_1x1')

        # Branch 1: 1x1 followed by 3x3.
        tower_1 = conv2d_bn(x, 32, 1, name=tag + '_Branch_1_Conv2d_0a_1x1')
        tower_1 = conv2d_bn(tower_1, 32, 3, name=tag + '_Branch_1_Conv2d_0b_3x3')

        # Branch 2: 1x1 followed by two 3x3 convolutions.
        tower_2 = conv2d_bn(x, 32, 1, name=tag + '_Branch_2_Conv2d_0a_1x1')
        tower_2 = conv2d_bn(tower_2, 48, 3, name=tag + '_Branch_2_Conv2d_0b_3x3')
        tower_2 = conv2d_bn(tower_2, 64, 3, name=tag + '_Branch_2_Conv2d_0c_3x3')

        merged = Concatenate(axis=3, name=tag + '_Concatenate')([tower_0, tower_1, tower_2])
        residual = Conv2D(384, 1, strides=1, padding='same', use_bias=True, name=tag + '_Conv2d_1x1')(merged)
        residual = Lambda(scaling, output_shape=K.int_shape(residual)[1:], arguments={'scale': 0.17})(residual)
        x = add([x, residual])
        x = Activation('relu', name=tag + '_Activation')(x)
    return x

def reduction_A(x):
    """
    Mixed_6a (Reduction-A block).

    Three parallel branches, each ending in a stride-2 'valid' operation, are
    concatenated on the channel axis: a 3x3 convolution, a 1x1 -> 3x3 -> 3x3
    convolution stack, and a 3x3 max-pool of the input.
    """
    conv_branch = conv2d_bn(x, 384, 3, strides=2, padding='valid', name='Mixed_6a_Branch_0_Conv2d_1a_3x3')

    stack_branch = conv2d_bn(x, 192, 1, name='Mixed_6a_Branch_1_Conv2d_0a_1x1')
    stack_branch = conv2d_bn(stack_branch, 224, 3, name='Mixed_6a_Branch_1_Conv2d_0b_3x3')
    stack_branch = conv2d_bn(stack_branch, 384, 3, strides=2, padding='valid', name='Mixed_6a_Branch_1_Conv2d_1a_3x3')

    pooled = MaxPooling2D(3, strides=2, padding='valid', name='Mixed_6a_Branch_2_MaxPool_1a_3x3')(x)

    return Concatenate(axis=3, name='Mixed_6a')([conv_branch, stack_branch, pooled])

def inception_resNet_B(x):
    """
    Apply twenty residual blocks named 'Block17_1' .. 'Block17_20'.

    Each block concatenates a 1x1 branch with a 1x1 -> 1x7 -> 7x1 branch,
    projects the result to 1152 channels with a 1x1 convolution, scales it by
    0.1, adds it back onto ``x`` and applies ReLU.
    """
    for block_no in range(1, 21):
        tag = f'Block17_{block_no}'

        # Branch 0: a single 1x1 convolution.
        tower_0 = conv2d_bn(x, 192, 1, name=tag + '_Branch_0_Conv2d_1x1')

        # Branch 1: 1x1, then the 7x7 receptive field factorised as 1x7 and 7x1.
        tower_1 = conv2d_bn(x, 128, 1, name=tag + '_Branch_1_Conv2d_0a_1x1')
        tower_1 = conv2d_bn(tower_1, 160, [1, 7], name=tag + '_Branch_1_Conv2d_0b_1x7')
        tower_1 = conv2d_bn(tower_1, 192, [7, 1], name=tag + '_Branch_1_Conv2d_0c_7x1')

        merged = Concatenate(axis=3, name=tag + '_Concatenate')([tower_0, tower_1])
        residual = Conv2D(1152, 1, strides=1, padding='same', use_bias=True, name=tag + '_Conv2d_1x1')(merged)
        residual = Lambda(scaling, output_shape=K.int_shape(residual)[1:], arguments={'scale': 0.1})(residual)
        x = add([x, residual])
        x = Activation('relu', name=tag + '_Activation')(x)

    return x

def reduction_B(x):
    """
    Mixed_7a (Reduction-B block).

    Three convolution branches and one max-pool branch, each ending in a
    stride-2 'valid' operation, are concatenated on the channel axis. The
    output therefore has 384 + 288 + 320 channels plus the channels of ``x``.
    """
    tower_0 = conv2d_bn(x, 256, 1, name='Mixed_7a_Branch_0_Conv2d_0a_1x1')
    tower_0 = conv2d_bn(tower_0, 384, 3, strides=2, padding='valid', name='Mixed_7a_Branch_0_Conv2d_1a_3x3')

    tower_1 = conv2d_bn(x, 256, 1, name='Mixed_7a_Branch_1_Conv2d_0a_1x1')
    tower_1 = conv2d_bn(tower_1, 288, 3, strides=2, padding='valid', name='Mixed_7a_Branch_1_Conv2d_1a_3x3')

    tower_2 = conv2d_bn(x, 256, 1, name='Mixed_7a_Branch_2_Conv2d_0a_1x1')
    tower_2 = conv2d_bn(tower_2, 288, 3, name='Mixed_7a_Branch_2_Conv2d_0b_3x3')
    tower_2 = conv2d_bn(tower_2, 320, 3, strides=2, padding='valid', name='Mixed_7a_Branch_2_Conv2d_1a_3x3')

    pooled = MaxPooling2D(3, strides=2, padding='valid', name='Mixed_7a_Branch_3_MaxPool_1a_3x3')(x)

    return Concatenate(axis=3, name='Mixed_7a')([tower_0, tower_1, tower_2, pooled])

def inception_resNet_C(x):
    """
    Apply ten residual blocks named 'Block8_1' .. 'Block8_10'.

    Each block concatenates a 1x1 branch with a 1x1 -> 1x3 -> 3x1 branch,
    projects the result to 2144 channels with a 1x1 convolution, scales it by
    0.2, adds it back onto ``x`` and applies ReLU.
    """
    for block_no in range(1, 11):
        tag = f'Block8_{block_no}'

        # Branch 0: a single 1x1 convolution.
        tower_0 = conv2d_bn(x, 192, 1, name=tag + '_Branch_0_Conv2d_1x1')

        # Branch 1: 1x1, then the 3x3 receptive field factorised as 1x3 and 3x1.
        tower_1 = conv2d_bn(x, 192, 1, name=tag + '_Branch_1_Conv2d_0a_1x1')
        tower_1 = conv2d_bn(tower_1, 224, [1, 3], name=tag + '_Branch_1_Conv2d_0b_1x3')
        tower_1 = conv2d_bn(tower_1, 256, [3, 1], name=tag + '_Branch_1_Conv2d_0b_3x1')

        merged = Concatenate(axis=3, name=tag + '_Concatenate')([tower_0, tower_1])
        residual = Conv2D(2144, 1, strides=1, padding='same', use_bias=True, name=tag + '_Conv2d_1x1')(merged)
        residual = Lambda(scaling, output_shape=K.int_shape(residual)[1:], arguments={'scale': 0.2})(residual)
        x = add([x, residual])
        x = Activation('relu', name=tag + '_Activation')(x)

    return x

In [None]:
def create_inception_resnet_v2(input):
    """
    Build the Inception-ResNet graph on top of ``input``.

    input: Keras input tensor holding the face images.
    Returns: tensor with 128 values per image, the output of the
    'Bottleneck_BatchNorm' layer.
    """
    x = Stem(input)
    x = inception_ResNet_A(x)
    x = reduction_A(x)
    x = inception_resNet_B(x)
    x = reduction_B(x)
    x = inception_resNet_C(x)

    # Pool each feature map to a single value, then regularise.
    x = GlobalAveragePooling2D(name='AvgPool')(x)
    x = Dropout(1.0 - 0.8, name='Dropout')(x)
    # Bottleneck. Flatten is kept so the layer list stays unchanged.
    x = Flatten()(x)
    # The bottleneck is a linear projection. A softmax here would squash the
    # 128 values onto a probability simplex just before batch normalization
    # re-centres them, which is meaningless for a feature vector.
    x = Dense(128, use_bias=False, name='Bottleneck')(x)
    x = BatchNormalization(momentum=0.995, epsilon=0.001, scale=False, name='Bottleneck_BatchNorm')(x)
    return x

In [None]:
def trainning_weigth(X_data, y_data):
    """
    Train the network as a classifier and return the feature extractor.

    A softmax classification layer sized to the number of classes is placed on
    top of the 128-value bottleneck for training only. Cross-entropy needs
    probabilities, which the batch-normalized bottleneck output is not.

    X_data: array of face images shaped (n, 160, 160, 3).
    y_data: integer class ids in 0 .. n_classes - 1 (see encode_lables).
    Returns: Keras model mapping an image batch to the 128-value bottleneck
    output. It shares its layers with the trained classifier.
    """
    ip = Input(shape=(160, 160, 3))

    embedding = create_inception_resnet_v2(ip)

    # Size the classification layer from the labels instead of assuming 128.
    y_data = np.asarray(y_data)
    n_classes = int(y_data.max()) + 1
    probabilities = Dense(n_classes, activation='softmax', name='Classifier')(embedding)

    # Model that is actually optimised.
    classifier = Model(inputs=ip, outputs=probabilities, name='inception_resnet_v1_classifier')
    classifier.compile(optimizer=Adam(learning_rate=0.00001, beta_1=0.9, beta_2=0.999), loss='categorical_crossentropy', metrics=['accuracy'])

    # Split the training set into training and validation sets
    X_train, X_val, y_train, y_val = train_test_split(X_data, y_data, test_size=0.2, shuffle=True, random_state=5)
    y_train = to_categorical(y_train, n_classes)
    y_val = to_categorical(y_val, n_classes)

    # Stop when the validation loss stalls and keep the best weights on disk.
    # NOTE(review): the checkpoint now also contains the 'Classifier' layer;
    # load it into the embedding-only model by layer name.
    early_stopping = EarlyStopping(monitor='val_loss', patience=10, verbose=1, mode='min')
    model_checkpoint = ModelCheckpoint('/content/gdrive/MyDrive/Document/Colab_Notebooks/PBL5_model/Model/best_weights.h5', save_best_only=True, save_weights_only=True, monitor='val_loss', mode='min', verbose=1)

    # Train the model
    classifier.fit(X_train, y_train, epochs=20, batch_size=32, verbose=1, validation_data=(X_val, y_val), callbacks=[early_stopping, model_checkpoint])

    # Callers use the returned model to extract 128-value features.
    return Model(inputs=ip, outputs=embedding, name='inception_resnet_v1')

### Training the model

In [None]:
# Replace the class names in y_train with the integer ids produced by the shared label encoder.
y_train = encode_lables(y_train)

# Build and train the network on the training split; the returned model is stored in `model`.
model = trainning_weigth(X_train, y_train)

## Extract features -> 128-dimensional embedding

In [None]:
def get_encode(model, face):
    """
    Return the model's prediction for a single face.

    model: object exposing ``predict(batch)``.
    face: one image; a leading batch axis of size 1 is added before predicting.
    Returns: the first (and only) row of the prediction.
    """
    batch_of_one = np.expand_dims(face, axis=0)
    predictions = model.predict(batch_of_one)
    return predictions[0]

In [None]:
def get_encodes(model, faces):
    """
    Return the model's prediction for every face in ``faces``.

    All faces are sent through a single ``model.predict`` call instead of one
    call per image, which avoids the per-call overhead of ``predict``.

    model: object exposing ``predict(batch)``.
    faces: array or iterable of equally shaped images.
    Returns: numpy array with one prediction row per face; an empty array
    when ``faces`` is empty.
    """
    batch = np.asarray(faces if isinstance(faces, np.ndarray) else list(faces))
    if len(batch) == 0:
        # Nothing to predict; do not call the model with an empty batch.
        return np.asarray([])
    return np.asarray(model.predict(batch))