# Facial Keypoint Detection


## Initialize Environment

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from keras.backend.tensorflow_backend import set_session
from keras.backend.tensorflow_backend import clear_session
from keras.backend.tensorflow_backend import get_session
from tensorflow.keras.layers import LeakyReLU
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Activation, Convolution2D, BatchNormalization, Flatten
from tensorflow.keras.layers import Dense, Dropout, Input, MaxPool2D
from tensorflow.keras.optimizers import Nadam
from keras import backend
import cv2
import os, gc, json, math

In [None]:
import keras
print(keras.__version__)
import tensorflow
print(tensorflow.__version__)

## Functions

In [None]:
def convert_pixels(data):
    """
    Convert pixels to the right intensity 0-1 and in a square matrix.
    """
    data = np.array([row.split(' ') for row in data['Image']],dtype='float') / 255.0
    data = data.reshape(-1,96,96,1)
    return(data)

In [None]:
def view_img(sample_img,coord=None):
    """
    Display an image. For debugging, display a coordinate on the image.
    input:
        - sample_img: numpy array. image to be displayed
        - coord: lst. of coordinates in form [[x_coordinate,y_coordinate],[x_coordinate,y_coordinate]]
    TODO handle multiple coordinates. Work out bugs with multiple coordinates
    """
    plt.figure()
    plt.imshow(sample_img.reshape(96,96),cmap='gray')
    if coord is not None:
        plt.scatter(coord[0],coord[1],marker = '*',c='r')
    plt.show()

In [None]:
def get_facial_keypoints(data,ind):
    """
    Structure the coordinates for all facial keypoints for a single image.
    inputs:
        - data: numpy array containing rows as each image sample and columns as facial keypoint coordinates
        - ind: index of the image
    output:
        - numpy array with format [[list of x-coordinates],[list of y-coordinates]]
    """
    data[ind]
    it = iter(data[ind])
    x_coord = []
    y_coord = []

    for x in it:
        x_coord.append(x)
        y_coord.append(next(it))
    
    return(np.array([x_coord,y_coord]))


## Import Data

In [None]:
# Reset Keras Session
def reset_keras():
    sess = get_session()
    clear_session()
    sess.close()
    sess = get_session()

    try:
        del model # this is from global space - change this as you need
    except:
        pass

    print(gc.collect()) # if it's done something you should see a number being outputted

    # use the same config as you used to create the session
    config = tensorflow.ConfigProto()
    config.gpu_options.per_process_gpu_memory_fraction = 1
    config.gpu_options.visible_device_list = "0"
    set_session(tensorflow.Session(config=config))
    
reset_keras()    

In [None]:
if not os.path.exists('../output/'):
    os.makedirs('../output/model')
    os.makedirs('../output/history')
    
    
model_dir = "../output/model/"
history_dir = "../output/history/"

train_file = '../input/training/training.csv'
test_file = '../input/test/test.csv'
train_data = pd.read_csv(train_file)  
test_data = pd.read_csv(test_file)

## Exploratory Data Analysis

In [None]:
train_data.head().T

Create training vector with images and normalize thee

In [None]:
x_train = convert_pixels(train_data)
x_test = convert_pixels(test_data)

Generate labels 

In [None]:
y_train = train_data[[col for col in train_data.columns if col != 'Image']].to_numpy()

## Feature Engineering

### Set feature engineering parameters

In [None]:
fill_na = False
add_flip_horiz = True
add_blur_img = True
orig_x_train = x_train.copy()
orig_y_train = y_train.copy()

### Fill NA in the training labels.

In [None]:
if fill_na:
    # https://stackoverflow.com/questions/18689235/numpy-array-replace-nan-values-with-average-of-columns
    # get column means
    col_mean = np.nanmean(y_train,axis=0)

    # find the x,y indices that are missing from y_train
    inds = np.where(np.isnan(y_train))

    # fill in missing values in y_train with the column means. "take" is much more efficient than fancy indexing
    y_train[inds] = np.take(col_mean, inds[1])


### Flip images horizontally and add to the training data

In [None]:
def flip_img_horiz():
    """
    Flip images horizontally for all training images
    """
    # Flip images
    flip_img = np.array([np.fliplr(orig_x_train[[ind]][0]) for ind in range(orig_x_train.shape[0])])
    
    # Flip x coordinates
    flip_train_data = train_data.copy()
    x_columns = [col for col in flip_train_data.columns if '_x' in col]
    flip_train_data[x_columns] = flip_train_data[x_columns].applymap(lambda x: 96-x)

    # Flip left and right values
    real_left_val = flip_train_data[[col for col in flip_train_data if 'right_' in col]]
    real_right_val = flip_train_data[[col for col in flip_train_data if 'left_' in col]]
    flip_train_data[[col for col in flip_train_data if 'right_' in col]] = real_right_val
    flip_train_data[[col for col in flip_train_data if 'left_' in col]] = real_left_val
    
    # Output in correct format
    flip_coord = flip_train_data[[col for col in flip_train_data if col != 'Image']].to_numpy()
    return(flip_img,flip_coord)

if add_flip_horiz:
    # Apply the augmentation and add the new data to the training set
    flipped_img,flipped_coord = flip_img_horiz()
    x_train = np.append(x_train,flipped_img,axis=0)
    y_train = np.append(y_train,flipped_coord,axis=0)

### Add Gaussian blurring with a 5x5 filter with $\sigma$ = 2.

In [None]:
def blur_img():
    """
    Add Gaussian blurring to the images
    """
    # https://opencv-python-tutroals.readthedocs.io/en/latest/py_tutorials/py_imgproc/py_filtering/py_filtering.html
    blur_img = np.array([cv2.GaussianBlur(orig_x_train[[ind]][0],(5,5),2).reshape(96,96,1) for ind in range(orig_x_train.shape[0])])
    
    return(blur_img)

if add_blur_img:
    x_train = np.append(x_train,blur_img(),axis=0)
    y_train = np.append(y_train,orig_y_train,axis=0)

## Modeling

In [None]:
# Define callback function if detailed log required
class History(tensorflow.keras.callbacks.Callback):
    def on_train_begin(self, logs={}):
        self.train_loss = []
        self.train_rmse = []
        self.val_rmse = []
        self.val_loss = []

    def on_batch_end(self, batch, logs={}):
        self.train_loss.append(logs.get('loss'))
        self.train_rmse.append(logs.get('rmse'))
        
    def on_epoch_end(self, batch, logs={}):    
        self.val_rmse.append(logs.get('val_rmse'))
        self.val_loss.append(logs.get('val_loss'))
        
# Implement ModelCheckPoint callback function to save CNN model
class CNN_ModelCheckpoint(tensorflow.keras.callbacks.Callback):

    def __init__(self, model, filename):
        self.filename = filename
        self.cnn_model = model

    def on_train_begin(self, logs={}):
        self.max_val_rmse = math.inf
        
 
    def on_epoch_end(self, batch, logs={}):    
        val_rmse = logs.get('val_rmse')
        if(val_rmse < self.max_val_rmse):
           self.max_val_rmse = val_rmse
           self.cnn_model.save(self.filename)


In [None]:
def base_model():
    model_input = Input(shape=(96,96,1))

    x = Convolution2D(32, (3,3), padding='same', use_bias=False)(model_input)
    x = LeakyReLU(alpha = 0.1)(x)
    x = BatchNormalization()(x)
    x = MaxPool2D(pool_size=(2, 2))(x)
    x = Convolution2D(64, (3,3), padding='same', use_bias=False)(x)
    x = LeakyReLU(alpha = 0.1)(x)
    x = BatchNormalization()(x)
    x = MaxPool2D(pool_size=(2, 2))(x)
    x = Convolution2D(96, (3,3), padding='same', use_bias=False)(x)
    x = LeakyReLU(alpha = 0.1)(x)
    x = BatchNormalization()(x)
    x = MaxPool2D(pool_size=(2, 2))(x)
    x = Convolution2D(128, (3,3),padding='same', use_bias=False)(x)
    x = LeakyReLU(alpha = 0.1)(x)
    x = BatchNormalization()(x)
    x = MaxPool2D(pool_size=(2, 2))(x)
    x = Convolution2D(256, (3,3),padding='same',use_bias=False)(x)
    x = LeakyReLU(alpha = 0.1)(x)
    x = BatchNormalization()(x)
    x = MaxPool2D(pool_size=(2, 2))(x)
    x = Convolution2D(512, (3,3), padding='same', use_bias=False)(x)
    x = LeakyReLU(alpha = 0.1)(x)
    x = BatchNormalization()(x)
    x = Flatten()(x)
    x = Dense(512,activation='relu')(x)
    x = Dropout(0.2)(x)
    model_output = Dense(30)(x)
    model = Model(model_input, model_output, name="base_model")
    return model

model = base_model()
model.summary()

In [None]:
# Custom RMSE metric
def rmse(y_true, y_pred):
    return backend.sqrt(backend.mean(backend.square(y_pred - y_true), axis=-1))

#from tensorflow.keras.optimizers.schedules import InverseTimeDecay

# Use Nadam optimizer with variable learning rate
optimizer = Nadam(lr=0.00001,
                  beta_1=0.9,
                  beta_2=0.999,
                  epsilon=1e-08,
                  schedule_decay=0.004)


# Loss: MSE and Metric = RMSE
model.compile(optimizer= optimizer, 
              loss='mean_squared_error',
              metrics=[rmse])

#Callback to save the best model
saveBase_Model = CNN_ModelCheckpoint(model, model_dir+"base_model.h5")

#define callback functions
callbacks = [#EarlyStopping(monitor='val_rmse', patience=3, verbose=2),
             saveBase_Model]

Run for 1000 epochs and keeping 20% train-valid split

In [None]:
history = model.fit(x_train,
                    y_train,
                    epochs = 1000,
                    batch_size = 256,
                    validation_split = 0.2,
                    callbacks = callbacks
                    )

In [None]:
plt.style.use('seaborn-darkgrid')
plt.plot(history.history['rmse'])
plt.xlabel('Epochs')
plt.ylabel('Root Mean Square Error')
plt.title('Model Training Error')
plt.show()

In [None]:
sess = get_session()
clear_session()
sess.close()
sess = get_session()
try:
    del model # this is from global space - change this as you need
except:
    print("Model clear Failed")
print(gc.collect())    