# Import packages

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import os
import pickle

from math import sin, cos, pi
import cv2

from sklearn.model_selection import train_test_split

import tensorflow as tf
from tensorflow.keras import models, layers
from tensorflow.keras.layers import Flatten, Conv2D, LeakyReLU, GlobalAveragePooling2D, MaxPooling2D, Dropout, Dense
from tensorflow.keras.preprocessing.image import array_to_img, img_to_array, load_img

from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.applications.resnet50 import ResNet50

# Fix the global TensorFlow and NumPy random seeds so that random steps later
# in the notebook (e.g. the np.random.randn noise in aug_noise) start from a
# known state on every fresh run.
tf.random.set_seed(18)
np.random.seed(18)

# Load data

In [None]:
!unzip ../input/facial-keypoints-detection/training.zip
!unzip ../input/facial-keypoints-detection/test.zip

train_data = pd.read_csv('./training.csv')
test_data = pd.read_csv('./test.csv')
lookid_data = pd.read_csv('../input/facial-keypoints-detection/IdLookupTable.csv')

print(train_data.shape, test_data.shape)

# First look at the data

In [None]:
# Peek at the first three training rows, transposed so every column is shown
# as a row (the column limit is raised so none of them is elided).
pd.set_option("display.max_columns", 100)

train_data.head(3).T

In [None]:
# Per-column dtype and non-null count of the training frame
train_data.info()

From the output above, we see that quite a number of columns contain missing values, and the number of missing values is large.

In [None]:
# First three rows of the test frame
test_data[:3]

Each cell in the 'Image' column of train_data/test_data is a string of 96*96=9216 pixel values separated from each other by spaces. This is X (the input features), while y is the other 30 columns in train_data.

# Handle missing values

In [None]:
# Missing-value overview: compute the null mask once and summarise it twice
null_mask = train_data.isnull()

print("How many columns contain missing values?")
print(null_mask.any().value_counts())
print("------------------")
print("The number of missing values for each column:")
print(null_mask.sum())

In [None]:
# Report the frame size before and after dropping every row that has a gap;
# the complete rows (train_aug) are the base for data augmentation.
print(f"Size of original dataframe: {len(train_data)}x{len(train_data.columns)}\n")
train_aug = train_data.dropna()
print("Size of dataframe after dropping rows of missing values, which is used for data augmentation: "
      f"{len(train_aug)}x{len(train_aug.columns)}\n")

In [None]:
# One option: forward-fill, i.e. replace each missing value with the last
# valid value found ABOVE it in the same column (the previous row's keypoint).
# DataFrame.ffill() is the supported spelling; fillna(method='ffill') is
# deprecated in recent pandas releases.
train_fill = train_data.ffill()
print("Size of dataframe after filling blanks: "
      +str(len(train_fill))+'x'+str(len(train_fill.columns))+'\n')

Since so many rows contain missing values, we cannot simply drop them. We still include the filled rows in the analysis, but we do not use them for augmentation; only the complete rows are augmented.

# Prepare X and y

In [None]:
def prep_x_y (train, size=96):
    """
    Function to extract X and y from training set:
    INPUT:
        train: dataframe with an 'Image' column holding whitespace-separated
               pixel values, plus one numeric column per keypoint coordinate
               (shape (n,31) for the full training set)
        size: side length in pixels of the square images (default 96)
    OUTPUT:
        X: numpy array which contains image data divided by 255,
           shape (n,size,size,1)
        y: numpy array with shape (n, number of keypoint columns),
           i.e. (n, 30) for the full training set; missing values stay NaN
    """
    # str.split() with no argument tolerates repeated, leading or trailing
    # whitespace, which split(' ') would turn into empty, unparsable tokens.
    pixel_rows = [pixels.split() for pixels in train['Image']]

    X_train = np.array(pixel_rows, dtype='float').reshape(-1, size, size, 1)
    X_train = X_train / 255.0

    # Every column except 'Image' is a keypoint coordinate; convert them in
    # one vectorised call instead of row by row.
    y_train = train.drop('Image', axis=1).to_numpy(dtype='float')

    return X_train, y_train

In [None]:
# Arrays built from the rows without missing values (train_aug); these are
# the inputs handed to the augmentation helpers further down.

X_aug, y_aug = prep_x_y(train_aug)

# Sanity check: shapes and value ranges of images and keypoints
print(X_aug.shape)
print(X_aug.min(), X_aug.max())
print("")

print(y_aug.shape)
print(y_aug.min(), y_aug.max())

In [None]:
# Arrays built from the forward-filled frame (train_fill); these rows go into
# the final training set as they are, without augmentation.

X_fill, y_fill = prep_x_y(train_fill)

# Sanity check: shapes and value ranges of images and keypoints
print(X_fill.shape)
print(X_fill.min(), X_fill.max())

print(y_fill.shape)
print(y_fill.min(), y_fill.max())

Note that the range of y is (0, 96). After image augmentation, we need to make sure the augmented keypoints still fall within this range.

# Plot some images

In [None]:
def plot_images(images, points, ncols, shrinkage=0.2):
  """
  Function to plot images and their labels:
  INPUT:
      images: numpy array with shape (N, d, d, c) dtype=float
      points: numpy array with shape (N, 2*m), dtype=float, laid out as
              x0, y0, x1, y1, ... in pixel coordinates
      ncols: number of columns in the resulting image grid
      shrinkage: how much each image is shrunk for display
  OUTPUT:
      None; the grid is shown with plt.show()
  """

  nindex, height, width, intensity = images.shape
  if nindex == 0:
    # plt.subplots cannot build a grid with zero rows
    print("No images to plot")
    return

  # Round up so a partially filled last row still gets axes; floor division
  # would leave the trailing images without a slot (IndexError).
  nrows = (nindex + ncols - 1) // ncols
  print(f"Number of rows: {nrows}, number of cols: {ncols}")

  # int() truncates, so keep at least 1 to avoid a zero-sized figure
  fig_width = max(1, int(width*ncols*shrinkage))
  fig_height = max(1, int(height*nrows*shrinkage))

  # squeeze=False always yields a 2-D array of Axes, even for a 1x1 grid,
  # so .flatten() below is always valid.
  fig, axes = plt.subplots(nrows, ncols,
                          figsize=(fig_width, fig_height),
                          squeeze=False)
  print(f"Figure width: {fig_width}, height: {fig_height}")
  axes = axes.flatten()

  for k in range(nindex):
    ax = axes[k]
    ax.imshow(array_to_img(images[k]), cmap="Greys_r")
    # Even positions are x coordinates, odd positions are y coordinates;
    # slicing works for any number of keypoints, not only 15.
    ax.scatter(points[k][0::2], points[k][1::2], s=50, c='r')
    ax.set_axis_off()

  # Blank out the unused cells of the last row
  for ax in axes[nindex:]:
    ax.set_axis_off()

  plt.tight_layout()
  plt.show()

In [None]:
# Show the first 12 complete-row images with their keypoints, 6 per row
plot_images(X_aug[:12], y_aug[:12], 6, shrinkage=0.1)

# Image Augmentation with cv2

## Define helper functions

In [None]:
def aug_rotation(X, y, rotation_angles=[15]):
    """
    Function to rotate images (X) and points (y) at the same time.
    INPUT:
        X: numpy array with shape (n, d, d, c); images are assumed square
        y: keypoints with shape (n, m), laid out as x0, y0, x1, y1, ...
        rotation_angles: a list of angles in degrees; every angle is applied
                         twice, once as +angle and once as -angle
                         (the default list is never mutated)
    OUTPUT:
        augmented images with shape (2*len(rotation_angles)*n, d, d, 1)
        augmented points with shape (2*len(rotation_angles)*n, m)
    Note:
        Rotated keypoints are not clipped; points near the border may end up
        outside the image.
    """

    size = X.shape[1] # suppose h == w
    center = (int(size/2), int(size/2))

    keypoints = np.asarray(y, dtype=float)
    n_rows, n_coords = keypoints.shape
    # (n, m) -> (n, m/2, 2): one (x, y) pair per keypoint
    xy_pairs = keypoints.reshape(n_rows, n_coords // 2, 2)

    rotated_images = []
    rotated_keypoints = []

    for base_angle in rotation_angles:
        for angle in (base_angle, -base_angle):
            rot = cv2.getRotationMatrix2D(center, angle, 1.)

            for image in X:
                rotated_image = cv2.warpAffine(image.reshape(size,size), rot, (size, size), flags=cv2.INTER_CUBIC)
                rotated_images.append(rotated_image)

            # Apply the very same 2x3 affine matrix to the keypoints so they
            # stay aligned with the warped image. Both output coordinates
            # are computed from the ORIGINAL (x, y); updating x in place
            # before computing y would corrupt y.
            rotated_xy = xy_pairs @ rot[:, :2].T + rot[:, 2]
            rotated_keypoints.append(rotated_xy.reshape(n_rows, n_coords))

    if rotated_keypoints:
        all_keypoints = np.concatenate(rotated_keypoints)
    else:
        # No angles requested: keep the column count so callers can still
        # concatenate the result with other keypoint arrays.
        all_keypoints = np.empty((0, n_coords))

    return np.reshape(rotated_images,(-1,size,size,1)), all_keypoints

In [None]:
def aug_shift(X, y, pixel_shifts=[15]):
    """
    Function to shift images (X) and points (y) at the same time.
    INPUT:
        X: numpy array with shape (n, d, d, c); images are assumed square
        y: keypoints with shape (n, m), laid out as x0, y0, x1, y1, ...
        pixel_shifts: a list of values indicating horizontal & vertical shift
                      amount in pixels; each value is applied along the four
                      diagonals (-s,-s), (-s,+s), (+s,-s), (+s,+s)
                      (the default list is never mutated)
    OUTPUT:
        augmented images with shape (k, d, d, 1)
        augmented points with shape (k, m)
        Only samples whose shifted keypoints ALL lie strictly inside
        (0, d) are kept, so k varies; rows containing NaN are dropped too.
        When nothing survives, k is 0 and both arrays keep their trailing
        dimensions.
    """
    size = X.shape[1]
    keypoints = np.asarray(y, dtype=float)
    n_coords = keypoints.shape[1]
    # True at x positions (even index), False at y positions (odd index)
    is_x = np.arange(n_coords) % 2 == 0

    shifted_images = []
    shifted_keypoints = []
    for shift in pixel_shifts:
        for (shift_x,shift_y) in [(-shift,-shift),(-shift,shift),(shift,-shift),(shift,shift)]:
            sh = np.float32([[1,0,shift_x],[0,1,shift_y]])

            # Shift all keypoints at once, then keep only the rows that stay
            # inside the image; comparisons with NaN are False, so rows with
            # missing keypoints are rejected.
            moved = keypoints + np.where(is_x, shift_x, shift_y)
            inside = np.all((moved > 0.0) & (moved < size), axis=1)

            # Warp only the images that are kept
            for idx in np.flatnonzero(inside):
                shifted_image = cv2.warpAffine(X[idx], sh, (size,size), flags=cv2.INTER_CUBIC)
                shifted_images.append(shifted_image.reshape(size,size,1))
                shifted_keypoints.append(moved[idx])

    if not shifted_images:
        # Keep the trailing dimensions so the result can still be
        # concatenated with the other augmented sets.
        return np.empty((0, size, size, 1)), np.empty((0, n_coords))

    # No clipping needed: every kept keypoint is already inside (0, size).
    return np.array(shifted_images), np.array(shifted_keypoints)

In [None]:
def aug_brightness(X, y, brightness_range=[0.6,1.2], max_value=None):
    """
    Function to adjust the brightness of images (X)
    INPUT:
        X: numpy array with shape (n, d, d, c)
        y: keypoints with shape (n, m); brightness does not move them
        brightness_range: a list of two factors, [decrease, increase]
                          (the default list is never mutated)
        max_value: upper bound of the pixel range used for clipping.
                   If None it is inferred from the data: 1.0 when no pixel
                   exceeds 1.0 (images scaled to [0,1]), otherwise 255.0.
    OUTPUT:
        augmented images with shape (2n, d, d, c): the brightened copies
        first, then the darkened copies
        augmented points with shape (2n, m): y repeated twice
    """
    X = np.asarray(X, dtype=float)
    if max_value is None:
        # Clipping at 255 is a no-op for images already scaled to [0,1] and
        # would let brightened pixels exceed 1.0, so match the clip bound to
        # the scale the data actually uses.
        max_value = 255.0 if X.size and X.max() > 1.0 else 1.0

    inc_brightness_images = np.clip(X*brightness_range[1], 0, max_value)
    dec_brightness_images = np.clip(X*brightness_range[0], 0, max_value)

    altered_brightness_images = np.concatenate((inc_brightness_images, dec_brightness_images))
    return altered_brightness_images, np.concatenate((y, y))

In [None]:
def aug_noise(X, y, noise=0.008):
    """
    Function to add random noise to images (X)
    INPUT:
        X: numpy array with shape (n, d, d, c)
        y: keypoints with shape (n, m); returned untouched
        noise: scale factor applied to standard-normal samples before they
               are added to each image
    OUTPUT:
        augmented images with shape (n, d, d, 1)
        the same y object that was passed in
    """
    side = X.shape[1]
    # One independent noise field per image, drawn in image order
    noisy_stack = np.array([
        cv2.add(picture, noise*np.random.randn(side,side,1)).reshape(side,side,1)
        for picture in X
    ])
    return noisy_stack, y

## Apply functions

In [None]:
# Rotate the complete-row images and keypoints (default angles), then
# eyeball the first 12 results.
X_rot, y_rot = aug_rotation(X_aug,y_aug)
print(X_rot.shape)
plot_images(X_rot[:12], y_rot[:12], 6, shrinkage=0.1)

In [None]:
# Shift the complete-row images and keypoints (default shift), then eyeball
# the first 12 results.
X_shift, y_shift = aug_shift(X_aug,y_aug)
print(X_shift.shape)
plot_images(X_shift[:12], y_shift[:12], 6, shrinkage=0.1)

In [None]:
# Brighten/darken the complete-row images (default factors), then eyeball
# the first 12 results.
X_brt, y_brt = aug_brightness(X_aug,y_aug)
print(X_brt.shape)
plot_images(X_brt[:12], y_brt[:12], 6, shrinkage=0.1)

In [None]:
# Add random noise to the complete-row images (default scale), then eyeball
# the first 12 results.
X_noise, y_noise = aug_noise(X_aug,y_aug)
print(X_noise.shape)
plot_images(X_noise[:12], y_noise[:12], 6, shrinkage=0.1)

# Generate final training data

In [None]:
# Print image/keypoint shapes of every augmented set before stacking them
for _images, _keypoints in ((X_noise, y_noise),
                            (X_brt, y_brt),
                            (X_shift, y_shift),
                            (X_rot, y_rot)):
    print(_images.shape, _keypoints.shape)

In [None]:
# Stack the forward-filled originals and all augmented sets along the sample
# axis; images and keypoints are stacked in the same order so rows stay paired.
X = np.concatenate([X_fill, X_noise, X_brt, X_shift, X_rot], axis=0)
y = np.concatenate([y_fill, y_noise, y_brt, y_shift, y_rot], axis=0)

# Final sanity check on shapes and value ranges
print(X.shape, y.shape)
print(X.min(), "----",X.max())
print(y.min(), "----", y.max())

# Build the model

In [None]:
# ResNet50 backbone with ImageNet weights and without its classification
# head; all of its layers are left trainable.
pretrained_model = ResNet50(input_shape=(96,96,3), include_top=False, weights='imagenet')
pretrained_model.trainable = True

# The 1x1 convolution maps the single grey channel to the 3 channels the
# backbone expects; the final Dense layer emits the 30 keypoint coordinates.
model = tf.keras.models.Sequential([
    Conv2D(3, (1,1), padding='same', input_shape=(96,96,1)),
    LeakyReLU(alpha=0.1),
    pretrained_model,
    GlobalAveragePooling2D(),
    Dropout(0.1),
    Dense(30),
])
model.summary()

In [None]:
# Define callbacks

# Early stopping if no improvement on the held-out data.
# Monitoring the training loss would almost never trigger (it keeps falling
# while the model overfits) and would disagree with the learning-rate
# schedule below, which already watches val_loss. restore_best_weights rolls
# the model back to the best validation epoch when training stops early.
early_stop = EarlyStopping(monitor = 'val_loss',
                           patience = 30,
                           mode = 'min',
                           baseline=None,
                           restore_best_weights = True)

# Reduce learning rate when a metric has stopped improving:
# multiply it by 0.7 after 5 epochs without a better val_loss.
reduce_lr = ReduceLROnPlateau(monitor = 'val_loss',
                              factor = 0.7,
                              patience = 5,
                              min_lr = 1e-15,
                              mode = 'min',
                              verbose = 1)

# Compile the model
# NOTE(review): 'acc' is kept because plot_acc_loss reads history['acc'];
# for keypoint regression the MSE loss is the meaningful number.

model.compile(optimizer='adam', loss='mean_squared_error', metrics=['acc'])

# Train the model

In [None]:
# validation_split carves the validation set from the LAST 15% of the arrays
# exactly as they are passed in (before Keras shuffles anything). X/y were
# concatenated block by block (filled originals, noise, brightness, shift,
# rotation), so without shuffling the validation set would consist of rotated
# images only. Shuffle once up front with a fixed, local seed so the split is
# mixed and identical on every run of this cell.
# Note: fancy indexing makes a shuffled copy of X and y.
shuffle_idx = np.random.RandomState(18).permutation(len(X))

history=model.fit(x=X[shuffle_idx],
                  y=y[shuffle_idx],
                  epochs=200,
                  batch_size=64,
                  validation_split=0.15,
                  callbacks=[early_stop,reduce_lr])

In [None]:
# Persist the trained network (HDF5) and the per-epoch metrics dictionary

model.save("FKD_KB1V1_E_Result-Model-ResNet50.h5")

with open('FKD_KB1V1_E_Result-History-ResNet50.pkl', 'wb') as history_file:
    pickle.dump(history.history, history_file)

# Plot Acc and Loss

In [None]:
def plot_acc_loss(history):
    """
    Function to plot acc and loss of the training results:
    INPUT:
        history: object of the model fitting results; its .history dict must
                 contain the keys 'acc', 'val_acc', 'loss' and 'val_loss'
    OUT:
        A plot of training and validation acc per epoch
        A plot of training and validation loss per epoch
    """
    logs = history.history
    train_acc, valid_acc = logs['acc'], logs['val_acc']
    train_loss, valid_loss = logs['loss'], logs['val_loss']

    epoch_axis = range(len(train_acc))

    def _draw_pair(train_curve, valid_curve, train_label, valid_label, title):
        # Training curve in red, validation curve in blue, on the current figure
        plt.plot(epoch_axis, train_curve, 'r', label=train_label)
        plt.plot(epoch_axis, valid_curve, 'b', label=valid_label)
        plt.title(title)
        plt.legend()

    # Accuracy goes on the current figure ...
    _draw_pair(train_acc, valid_acc,
               'Training acc', 'Validation acc',
               'Training and validation acc')

    # ... then a fresh figure is opened for the loss curves
    plt.figure()
    print("")

    _draw_pair(train_loss, valid_loss,
               'Training Loss', 'Validation Loss',
               'Training and validation loss')
    plt.show()

In [None]:
# Draw the accuracy and loss curves recorded during model.fit
plot_acc_loss(history)

# Predict the Test Dataset

In [None]:
# Parse every test image (space-separated pixel string -> list of values).
# Iterating over the column itself covers all rows, whatever their number,
# instead of relying on a hard-coded row count.
timag = [pixels.split(' ') for pixels in test_data['Image']]

# Same layout and scaling as the training images: (n, 96, 96, 1), divided by 255
timage_list = np.array(timag,dtype = 'float')
X_test = timage_list.reshape(-1,96,96,1)
X_test = X_test/255.0
print(X_test.shape)

In [None]:
# Predict the keypoint coordinates for every test image and print the range
# of the predicted values as a quick sanity check.
y_test = model.predict(X_test)
print(y_test.min(), y_test.max())

In [None]:
# Plot sample test images: the first 12 with their predicted keypoints
plot_images(X_test[:12], y_test[:12], 6, shrinkage=0.1)

# Prepare Submission File

In [None]:
feature_names = list(lookid_data['FeatureName'])
# The original code subtracts 1 to turn ImageId into a row index of y_test
# (presumably ImageId is 1-based and follows the row order of test.csv).
image_ids = lookid_data['ImageId'].to_numpy() - 1
row_ids = list(lookid_data['RowId'])

# Column of each keypoint in the network output. The model was trained on
# y = training columns minus 'Image', so that column order is authoritative.
# (Looking the name up inside the lookup table itself only works if its first
# rows happen to list every feature in exactly that order.)
feature_to_column = {name: col
                     for col, name in enumerate(train_data.drop('Image', axis=1).columns)}
# A feature name unknown to the training columns raises KeyError here
feature_list = np.array([feature_to_column[name] for name in feature_names])

# Pick one prediction per lookup row in a single vectorised step. No loop
# variables are used, so the training arrays X / y are not overwritten.
predictions = y_test[image_ids, feature_list]

row_ids = pd.Series(row_ids, name = 'RowId')
locations = pd.Series(predictions, name = 'Location')
# Keep the submitted coordinates inside the 96x96 image
locations = locations.clip(0.0,96.0)
submission_result = pd.concat([row_ids,locations],axis = 1)
submission_result.to_csv('FKD_KB1V1_E_Result-Submission.csv',index = False)