Import libraries

In [None]:
# Import libraries
# Standard library
import itertools
from math import sqrt

# Third-party
import cv2
import h5py
import keras
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.misc
import tensorflow
from google.colab import files
from IPython.display import display
from keras.callbacks import ModelCheckpoint
from keras.layers import (Activation, Add, BatchNormalization, Dense, Dropout,
                          Flatten, GlobalAveragePooling2D, Input,
                          SeparableConv2D)
from keras.layers.convolutional import Conv2D
from keras.layers.merge import concatenate
from keras.layers.pooling import MaxPooling2D
from keras.models import Model
from keras.optimizers import Adam, SGD
from keras.preprocessing.image import ImageDataGenerator
from keras.regularizers import l1, l2
from keras.utils import np_utils
from keras.utils import plot_model
from sklearn.metrics import confusion_matrix
from sklearn.metrics import f1_score

Set random seeds to ensure consistent results

In [None]:
# produce stable results
# Seed NumPy's global random number generator.
from numpy.random import seed 
seed(1)
# Seed TensorFlow's global random number generator.
# NOTE(review): the name `random` imported on the next line is never used in this
# cell; the seed is set through the `tensorflow` module imported in the first cell.
from tensorflow import random
tensorflow.random.set_seed(1)

Define paths


In [None]:
# Google Drive folder that holds the FER2013 CSV and the model artefacts
fer2013_dir = "/content/drive/My Drive/Colab Notebooks/FER2013"
# CSV file read by the dataset-loading cell
dataset_path = fer2013_dir + "/fer2013.csv"
# file the weights checkpoint is written to
weight_path = fer2013_dir + "/model02/weights_min_loss.hdf5"
# file the trained model is saved to
model_path = fer2013_dir + "/model02/FER2013.hdf5"

Read the dataset and split it into training/validation/test sets

In [None]:
# Load the dataset CSV into a DataFrame
data = pd.read_csv(dataset_path)

# The "Usage" column decides which subset a row belongs to
train_data = data[data.Usage == "Training"]      # training set
val_data = data[data.Usage == "PublicTest"]      # validation set
test_data = data[data.Usage == "PrivateTest"]    # test set

# Every "pixels" entry is a whitespace-separated string of values; splitting each
# one gives a 2-D array with one flattened image per row (reshaped to 48x48 later).
# The training pixels stay uint8 here, validation/test are loaded as float32.
X_train = np.array([str.split(pixels) for pixels in train_data.pixels], dtype=np.uint8)
X_val = np.array([str.split(pixels) for pixels in val_data.pixels], dtype=np.float32)
X_test = np.array([str.split(pixels) for pixels in test_data.pixels], dtype=np.float32)

Histogram Equalization

In [None]:
#HE processing on training set
def he_preprocessing(img: np.ndarray):
  """Run OpenCV histogram equalization over every entry of ``img``.

  Args:
    img: Array whose first axis indexes the samples; each sample is handed to
      ``cv2.equalizeHist`` on its own.
      NOTE(review): this notebook passes flattened uint8 pixel rows - confirm
      that is the intended input layout.

  Returns:
    A NumPy array stacking the equalized samples in input order.
  """
  equalized = [cv2.equalizeHist(sample) for sample in img]
  return np.array(equalized)

# Apply the histogram-equalization helper defined in this cell to the training
# images (clahe_preprocessing belongs to the alternative CLAHE cell below).
X_train = he_preprocessing(X_train)
#convert to float32 for data augmentation
X_train = np.array(X_train, dtype=np.float32)

OR: Contrast Limited Adaptive Histogram Equalization (CLAHE), as an alternative to the Histogram Equalization cell above


In [None]:
#CLAHE processing on training set
def clahe_preprocessing(img: np.ndarray, clip_limit: float = 2.0, tile_grid_size=(48, 48)):
  """Run OpenCV CLAHE over every entry of ``img``.

  Args:
    img: Array whose first axis indexes the samples; each sample is handed to
      the CLAHE object's ``apply`` on its own.
      NOTE(review): this notebook passes flattened uint8 pixel rows, so the
      tile grid is not laid over a 48x48 image - confirm whether each row
      should be reshaped to 2-D first.
    clip_limit: ``clipLimit`` forwarded to ``cv2.createCLAHE``.
    tile_grid_size: ``tileGridSize`` forwarded to ``cv2.createCLAHE``. The
      default equals the ``(width, height)`` pair set in the "Machine Learning
      Variables" cell; it is spelled out here because that cell runs after
      this one, so reading the globals fails on a fresh kernel.

  Returns:
    A NumPy array stacking the processed samples in input order.
  """
  # The CLAHE object does not depend on the sample, so build it once.
  clahe = cv2.createCLAHE(clipLimit=clip_limit, tileGridSize=tile_grid_size)
  return np.array([clahe.apply(sample) for sample in img])

# run CLAHE over the training images
X_train = clahe_preprocessing(X_train)

# cast to float32 for data augmentation
X_train = X_train.astype(np.float32)

Data Augmentation and Min-max Normalization

In [None]:
# give every subset the 4-dimensional layout (n, 48, 48, 1), n = number of samples
X_train = X_train.reshape((len(X_train), 48, 48, 1))
X_val = X_val.reshape((len(X_val), 48, 48, 1))
X_test = X_test.reshape((len(X_test), 48, 48, 1))

# sample count of each subset
num_train, num_val, num_test = len(X_train), len(X_val), len(X_test)

# One-hot encode the emotion labels of every subset.
# num_classes is declared here because the "Machine Learning Variables" cell that
# sets it (to the same value) comes later in the notebook; without this line a
# fresh Restart & Run All stops with a NameError.
num_classes = 7
y_train = np_utils.to_categorical(train_data.emotion, num_classes)
y_val = np_utils.to_categorical(val_data.emotion, num_classes)
y_test = np_utils.to_categorical(test_data.emotion, num_classes)

# Generator for the training data: min-max normalization plus random augmentation
data_gen = ImageDataGenerator(
    rescale=1. / 255,          # min-max normalization
    rotation_range=10,         # random rotation range in degrees
    width_shift_range=0.1,     # random horizontal shift (fraction of total width)
    height_shift_range=0.1,    # random vertical shift (fraction of total height)
    horizontal_flip=True,      # random horizontal flips
    fill_mode='nearest',
)

# Generator for validation/test data: min-max normalization only
test_gen = ImageDataGenerator(rescale=1. / 255)

Machine Learning Variables

In [None]:
# batch size used by the data generators
batch_size = 128
# number of training epochs
num_epochs = 200
# number of emotion classes
num_classes = 7
# emotion labels
emotion_labels = ["Angry", "Disgust", "Fear", "Happy", "Sad", "Surprise", "Neutral"]
# the same labels as a NumPy array (used as tick labels by the confusion-matrix cell)
classes = np.array(emotion_labels)
# input image size and model input shape
width = height = 48
input_shape = (width, height, 1)  # trailing 1 = single channel (greyscale)



Define Shallow CNN Architectures

In [None]:
def _fer_conv_block(x, filters, num_convs, block_id, dropout_rate=0.3):
    """Stack ``num_convs`` Conv2D+BatchNormalization pairs, then max-pool and apply dropout."""
    for conv_id in range(1, num_convs + 1):
        x = Conv2D(filters, kernel_size=3, activation='relu', padding='same',
                   name='conv{}_{}'.format(block_id, conv_id))(x)
        # Normalize every convolution output, including the last one of the block.
        x = BatchNormalization()(x)
    x = MaxPooling2D(pool_size=(2, 2), name='pool{}_1'.format(block_id))(x)
    return Dropout(dropout_rate, name='drop{}_1'.format(block_id))(x)


def FER_model(input_shape = (48,48,1)):
    """Build the five-block CNN used for emotion classification.

    Each block is a run of 3x3 'same' convolutions (every one followed by batch
    normalization), a 2x2 max-pooling layer and dropout of 0.3. The blocks use
    64x2, 128x3, 256x4, 256x4 and 512x4 filters x convolutions. The result is
    flattened and fed to a softmax layer with ``num_classes`` units
    (``num_classes`` is read from the notebook's global scope).

    Note: the last convolution of blocks 2 and 5 is now batch-normalized before
    pooling like every other convolution. Weight files saved from the earlier
    graph, where those two normalizations were left disconnected, will not
    match this topology.

    Args:
        input_shape: Shape of one input image, without the batch axis.

    Returns:
        The uncompiled Keras ``Model``; its summary is printed as a side effect.
    """
    visible = Input(shape=input_shape, name='input')

    # (filters, number of convolutions) for blocks 1..5
    block_specs = [(64, 2), (128, 3), (256, 4), (256, 4), (512, 4)]
    x = visible
    for block_id, (filters, num_convs) in enumerate(block_specs, start=1):
        x = _fer_conv_block(x, filters, num_convs, block_id)

    # Flatten and output
    flatten = Flatten(name='flatten')(x)
    output = Dense(num_classes, activation='softmax', name='output')(flatten)

    # create model
    model = Model(inputs=visible, outputs=output)
    # summary() prints the table itself; wrapping it in print() adds a stray "None"
    model.summary()

    return model

Define Xception

In [None]:
def entry_flow(inputs):
    """Xception-style entry flow: a two-convolution stem followed by three residual blocks.

    Args:
        inputs: Keras tensor fed to the first convolution.

    Returns:
        The output tensor of the last residual block.
    """
    # Stem: a strided 3x3 convolution followed by a second 3x3 convolution.
    x = Conv2D(32, 3, strides=2, padding='same')(inputs)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)

    x = Conv2D(64, 3, padding='same')(x)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)

    # Tensor the shortcut branch of the next block starts from.
    previous_block_activation = x

    for size in [64, 128, 256]:
        # Two ReLU -> separable conv -> batch-norm stages on the main branch.
        for _ in range(2):
            x = Activation('relu')(x)
            x = SeparableConv2D(size, 3, padding='same')(x)
            x = BatchNormalization()(x)

        x = MaxPooling2D(3, strides=2, padding='same')(x)

        # Strided 1x1 projection of the block input so it can be added to the pooled branch.
        residual = Conv2D(size, 1, strides=2, padding='same')(previous_block_activation)

        # Use the same keras.layers namespace as every other layer in this notebook.
        x = Add()([x, residual])
        previous_block_activation = x

    return x

def middle_flow(x, num_blocks=8):
    """Xception-style middle flow: ``num_blocks`` identity-shortcut residual blocks.

    Every block runs three ReLU -> SeparableConv2D(256) -> BatchNormalization
    stages and adds the block input back onto the result.

    Args:
        x: Input Keras tensor. It is added to a 256-channel branch, so it is
           expected to carry 256 channels itself.
        num_blocks: Number of residual blocks to stack.

    Returns:
        The output tensor of the last block.
    """
    previous_block_activation = x

    for _ in range(num_blocks):
        for _stage in range(3):
            x = Activation('relu')(x)
            x = SeparableConv2D(256, 3, padding='same')(x)
            x = BatchNormalization()(x)

        # Use the same keras.layers namespace as every other layer in this notebook.
        x = Add()([x, previous_block_activation])
        previous_block_activation = x

    return x

def exit_flow(x):
    """Xception-style exit flow ending in the classification layer.

    One residual block (separable convolutions with 256 and 1024 filters, max
    pooling, strided 1x1 shortcut) is followed by two more separable
    convolutions (728 and 1024 filters), global average pooling and a dense
    layer with ``num_classes`` units (``num_classes`` is read from the
    notebook's global scope).

    Args:
        x: Input Keras tensor.

    Returns:
        The output tensor of the final dense layer.
    """
    previous_block_activation = x

    for filters in (256, 1024):
        x = Activation('relu')(x)
        x = SeparableConv2D(filters, 3, padding='same')(x)
        x = BatchNormalization()(x)

    x = MaxPooling2D(3, strides=2, padding='same')(x)

    # Strided 1x1 projection of the block input so it can be added to the pooled branch.
    residual = Conv2D(1024, 1, strides=2, padding='same')(previous_block_activation)
    # Use the same keras.layers namespace as every other layer in this notebook.
    x = Add()([x, residual])

    for filters in (728, 1024):
        x = Activation('relu')(x)
        x = SeparableConv2D(filters, 3, padding='same')(x)
        x = BatchNormalization()(x)

    x = GlobalAveragePooling2D()(x)

    # NOTE(review): the output activation is linear while every other model in this
    # notebook ends in softmax and is trained with categorical_crossentropy - confirm
    # that logits are intended before compiling this model.
    x = Dense(num_classes, activation='linear', activity_regularizer=l2(0.001))(x)

    return x

# Create the input tensor the three flows are chained on; `inputs` was used
# here without ever being defined.
inputs = Input(shape=input_shape)
outputs = exit_flow(middle_flow(entry_flow(inputs)))
xception = Model(inputs, outputs)
xception.summary()

Define MobileNetV2 

In [None]:
# MobileNetV2 backbone without pretrained weights and without its classifier top
model = keras.applications.MobileNetV2(input_shape=input_shape, include_top=False, weights=None)

# classification head: global average pooling -> dropout -> softmax
x = GlobalAveragePooling2D()(model.output)
x = Dropout(0.7)(x)
predictions = Dense(num_classes, activation="softmax")(x)
model = Model(inputs=model.input, outputs=predictions)

# compile with Adam and categorical cross-entropy, tracking accuracy
opt = Adam(lr=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-7, decay=1e-6)
model.compile(optimizer=opt, loss="categorical_crossentropy", metrics=["acc"])
model.summary()

Define ResNet50

In [None]:
# ResNet50 backbone without pretrained weights and without its classifier top
model = keras.applications.resnet.ResNet50(input_shape=input_shape, include_top=False, weights=None)

# classification head: global average pooling -> softmax
x = GlobalAveragePooling2D()(model.output)
predictions = Dense(num_classes, activation="softmax")(x)
model = Model(inputs=model.input, outputs=predictions)

# compile with Adam and categorical cross-entropy, tracking accuracy
opt = Adam(lr=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-7, decay=1e-6)
model.compile(optimizer=opt, loss="categorical_crossentropy", metrics=["acc"])
model.summary()

Define InceptionV3

In [None]:
# InceptionV3 backbone without pretrained weights and without its classifier top.
# `input_shape` (the shape tuple from the "Machine Learning Variables" cell) is
# passed here; the previous argument `inputs` is not a shape.
# NOTE(review): the Keras documentation gives 75x75 as the smallest input size for
# InceptionV3, so the 48x48 shape used in this notebook is expected to be
# rejected - confirm, and upscale the images if this model is to be used.
model = keras.applications.inception_v3.InceptionV3(weights=None, include_top=False, input_shape=input_shape)
x = model.output
x = GlobalAveragePooling2D()(x)
# The dense layer must be applied to the tensor; without the call `x` became a
# layer object instead of a tensor.
x = Dense(1024, activation='relu')(x)
predictions = Dense(num_classes, activation = 'softmax')(x)
model = Model(inputs = model.input, outputs = predictions)
# compile with Adam and categorical cross-entropy, tracking accuracy
opt = Adam(lr=0.001, beta_1 = 0.9, beta_2 = 0.999, epsilon = 1e-7, decay = 1e-6)
model.compile(loss="categorical_crossentropy", optimizer = opt, metrics = ["acc"])
model.summary()

Define EfficientNetB0

In [None]:
import efficientnet.keras as efn

# EfficientNetB0 backbone without pretrained weights and without its classifier top
model = efn.EfficientNetB0(input_shape=input_shape, include_top=False, weights=None)

# classification head: global average pooling -> dropout -> softmax
x = GlobalAveragePooling2D()(model.output)
x = Dropout(0.7)(x)
predictions = Dense(num_classes, activation="softmax")(x)
model = Model(inputs=model.input, outputs=predictions)

# compile with Adam and categorical cross-entropy, tracking accuracy
opt = Adam(lr=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-7, decay=1e-6)
model.compile(optimizer=opt, loss="categorical_crossentropy", metrics=["acc"])
model.summary()

Training the model

In [None]:
# takes data & label arrays, generate batches of (augmented) data
train_flow = data_gen.flow(X_train, y_train, batch_size=batch_size)
val_flow = test_gen.flow(X_val, y_val, batch_size=batch_size)
test_flow = test_gen.flow(X_test, y_test, batch_size=batch_size)

# build the CNN and compile it with the optimizer, loss function, and accuracy metric
model = FER_model()
opt = Adam(lr=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-7, decay=1e-6)
model.compile(loss='categorical_crossentropy', optimizer=opt, metrics=['acc'])

# checkpoint the weights to weight_path whenever the validation loss improves
checkpoint = ModelCheckpoint(weight_path, monitor='val_loss', verbose=1, save_best_only=True, mode='min')
callbacks_list = [checkpoint]

# start the training
# len(flow) is the whole number of batches needed to cover the data once; the
# previous len(X) / batch_size produced a float step count.
history = model.fit_generator(train_flow,
                    steps_per_epoch=len(train_flow),
                    epochs=num_epochs,
                    verbose=2,
                    callbacks=callbacks_list,
                    validation_data=val_flow,
                    validation_steps=len(val_flow))

# record training and validation loss / accuracy per epoch
train_loss = history.history['loss']
val_loss = history.history['val_loss']
train_acc = history.history['acc']
val_acc = history.history['val_acc']

Plot the loss and accuracy curves and evaluate on the test set

In [None]:
# get epochs
epochs = range(len(train_acc))

# plot training vs validation loss
fig_loss, ax_loss = plt.subplots()
ax_loss.plot(epochs, train_loss, 'r', label='train_loss')
ax_loss.plot(epochs, val_loss, 'b', label='val_loss')
ax_loss.set_title('train_loss vs val_loss')
ax_loss.set_xlabel('epoch')
ax_loss.set_ylabel('loss')
ax_loss.legend()
# Save the figure that holds the curves; calling plt.figure() before savefig
# used to write a new, empty figure to disk instead.
fig_loss.savefig('loss.png')

# plot training vs validation accuracy
fig_acc, ax_acc = plt.subplots()
ax_acc.plot(epochs, train_acc, 'r', label='train_acc')
ax_acc.plot(epochs, val_acc, 'b', label='val_acc')
ax_acc.set_title('train_acc vs val_acc')
ax_acc.set_xlabel('epoch')
ax_acc.set_ylabel('acc')
ax_acc.legend()
fig_acc.savefig('accuracy.png')

plt.show()

# Evaluate on the test generator; `loss` holds [test loss, test accuracy].
# len(test_flow) is the whole number of batches covering the test set once
# (len(X_test) / batch_size was a float).
loss = model.evaluate_generator(test_flow, steps=len(test_flow))
print("Test Loss " + str(loss[0]))
print("Test Acc: " + str(loss[1]))

Save trained model

In [None]:
# save trained shallow CNN model
# `model` is whatever model the training cell left in scope; it is written to
# `model_path`, the .hdf5 path set in the "Define paths" cell.
model.save(model_path)

Plot confusion matrix

In [None]:
def plot_confusion_matrix(y_test, y_pred, classes,
                          normalize=False,
                          title='Unnormalized confusion matrix',
                          cmap=plt.cm.Blues):
    """Draw the confusion matrix of ``y_pred`` against ``y_test`` and save it to 'cm.png'.

    Args:
        y_test: True class indices.
        y_pred: Predicted class indices.
        classes: Class names used as tick labels on both axes.
        normalize: If True, divide every row by its total (rounded to two
            decimals) so each cell shows the fraction of that true class.
            Rows without any sample are shown as zeros.
        title: Figure title.
        cmap: Matplotlib colormap used for the cells.

    Returns:
        The matplotlib ``Figure`` that was drawn.
    """
    cm = confusion_matrix(y_test, y_pred)

    if normalize:
        row_totals = cm.sum(axis=1)[:, np.newaxis]
        # Divide only where a row has samples so empty classes give 0 instead of NaN.
        cm = np.round(
            np.divide(cm.astype('float'), row_totals,
                      out=np.zeros(cm.shape, dtype='float'),
                      where=row_totals != 0),
            2)

    # Kept from the original implementation: changes NumPy's global print precision.
    np.set_printoptions(precision=2)

    fig, ax = plt.subplots()
    # Honor the caller's colormap (it used to be hard-coded to Blues).
    image = ax.imshow(cm, interpolation='nearest', cmap=cmap)
    ax.set_title(title)
    fig.colorbar(image, ax=ax)

    tick_marks = np.arange(len(classes))
    ax.set_xticks(tick_marks)
    ax.set_xticklabels(classes, rotation=45)
    ax.set_yticks(tick_marks)
    ax.set_yticklabels(classes)

    # Cells darker than the midpoint get white text, the rest black.
    thresh = cm.min() + (cm.max() - cm.min()) / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        ax.text(j, i, cm[i, j],
                horizontalalignment="center",
                color="white" if cm[i, j] > thresh else "black")

    ax.set_ylabel('True expression')
    ax.set_xlabel('Predicted expression')
    fig.tight_layout()

    # Save before showing: once plt.show() has run, savefig writes an empty canvas.
    fig.savefig('cm.png')
    plt.show()

    return fig

# class probabilities for the test images (pixels divided by 255, as the generators do)
y_pred_ = model.predict(X_test / 255., verbose=1)
# predicted and true class indices
y_pred = y_pred_.argmax(axis=1)
t_te = y_test.argmax(axis=1)

# fraction of test samples whose predicted class equals the true class
accuracy = np.sum(y_pred == t_te) / len(t_te)

fig = plot_confusion_matrix(
    t_te,
    y_pred,
    classes,
    normalize=True,
    title='Average accuracy: ' + str(accuracy) + '\n',
    cmap=plt.cm.Greys,
)
