## Imports

In [None]:
import os
from google.colab import drive

# Google Drive is mounted at MOUNT_POINT; checkpoints and the training log are
# written below DATA_DIR.
MOUNT_POINT = '/content/gdrive'
DATA_DIR = os.path.join(MOUNT_POINT, 'My Drive', 'Colab Notebooks', 'Checkpoints')
drive.mount(MOUNT_POINT)

# Create the output folder up front (after the mount, so it lands on Drive) so
# that nothing writing into it depends on the folder having been made by hand.
os.makedirs(DATA_DIR, exist_ok=True)

Mounted at /content/gdrive


In [None]:
import math
import numpy as np
import matplotlib.pyplot as plt

from keras.models import Model, load_model
from keras.optimizers import Adam
from keras.regularizers import l2
from keras.layers import Input, Add, Flatten, Activation
from keras.layers import Dense, BatchNormalization
from keras.layers import Conv2D, AveragePooling2D
from keras.callbacks import ModelCheckpoint
from keras.callbacks import ReduceLROnPlateau, CSVLogger
from keras.preprocessing.image import ImageDataGenerator
from keras.utils import plot_model

## Hyperparams

In [None]:
# Seed NumPy's global generator (only NumPy is seeded here).
SEED = 1
np.random.seed(SEED)

# Data / model / optimisation settings
TRAIN_DATA_PERCENT = 90               # share of the dataset used for training
DEPTH = 56                            # my_model requires depth = 9n + 2
HEIGHT, WIDTH, CHANNELS = 48, 48, 1   # shape of each of the two model inputs
CLASSES = 7
BATCH_SIZE = 64
EPOCHS = 30
LR_INIT = 0.0004
AUGMENTATION = True
MODEL_TYPE = "FER_ResNet{}_RoI".format(DEPTH)

# Set LOAD = True (and LAST_EPOCH to the epoch of the checkpoint) to resume
# training from LOAD_PATH instead of building a new model.
LOAD = False
LAST_EPOCH = 0

# All file names are derived from MODEL_TYPE so that changing DEPTH cannot make
# the run write into (or resume from) files that belong to another depth.
LOAD_PATH = os.path.join(DATA_DIR, MODEL_TYPE + "_002-2.22.hdf5")
CSV_PATH = os.path.join(DATA_DIR, MODEL_TYPE + ".csv")
# The {epoch}/{val_loss} placeholders are intentionally left unformatted:
# ModelCheckpoint fills them in each time it saves.
SAVE_PATH = os.path.join(DATA_DIR, MODEL_TYPE + "_{epoch:03d}-{val_loss:.2f}.hdf5")

OPTIMIZER = Adam(learning_rate=LR_INIT)

# Halve the learning rate when val_loss has not improved by at least
# min_delta for `patience` consecutive epochs.
LR_REDUCER = ReduceLROnPlateau(
  monitor='val_loss',
  factor=0.5,
  patience=6,
  min_delta=0.0001,
  verbose=1
)

# Save the complete model (not only the weights) after every epoch.
checkpoint = ModelCheckpoint(
  filepath=SAVE_PATH,
  monitor='val_loss',
  save_best_only=False,
  save_weights_only=False,
  mode='auto',
  verbose=1
)
# append=True keeps rows from earlier runs in the same CSV file.
csv_logger = CSVLogger(CSV_PATH, append=True)
callbacks = [checkpoint, csv_logger, LR_REDUCER]

## Dataset

In [None]:
!gdown 1t1GH1o5t9WgTE1ODB4QfpzhmuKImFY1w
!gdown 1LldCqbvgwQSt2uBrU9L33Oq5SmnQxq_Z
!gdown 1eUKhOrl_jD44oLzywE2Q6ei9uWWvc1aW

dataset_images = np.load('fer-2013-images.npy')
dataset_labels = np.load('fer-2013-labels.npy')
dataset_landmarks = np.load('fer-2013-dlib-landmarks.npz')['landmarks']

Downloading...
From: https://drive.google.com/uc?id=1t1GH1o5t9WgTE1ODB4QfpzhmuKImFY1w
To: /content/fer-2013-images.npy
100% 82.7M/82.7M [00:00<00:00, 226MB/s]
Downloading...
From: https://drive.google.com/uc?id=1LldCqbvgwQSt2uBrU9L33Oq5SmnQxq_Z
To: /content/fer-2013-labels.npy
100% 1.00M/1.00M [00:00<00:00, 137MB/s]


In [None]:
# Create train and test data and labels
sample_count = len(dataset_images)
assert len(dataset_labels) == sample_count, "images and labels differ in length"
assert len(dataset_landmarks) == sample_count, "images and landmarks differ in length"

train_data_count = math.floor(sample_count * TRAIN_DATA_PERCENT / 100)
test_data_count = sample_count - train_data_count

# Shuffle with a dedicated generator seeded from SEED instead of NumPy's global
# state: the split is then the same no matter which cells ran before this one
# or how often this cell is re-run (important when resuming from a checkpoint,
# where a different split would leak test samples into training).
indexes = np.arange(sample_count)
np.random.RandomState(SEED).shuffle(indexes)
train_indexes = indexes[:train_data_count]
test_indexes = indexes[train_data_count:]

# Split train and test data. The loaded dataset_* arrays are indexed, not
# overwritten, so this cell is idempotent.
X_train = dataset_images[train_indexes].astype('float32')
X_train_landmarks = dataset_landmarks[train_indexes].astype('float32')
Y_train = dataset_labels[train_indexes]
X_test = dataset_images[test_indexes].astype('float32')
X_test_landmarks = dataset_landmarks[test_indexes].astype('float32')
Y_test = dataset_labels[test_indexes]

print(X_train.shape)
print(X_train_landmarks.shape)
print(Y_train.shape)
print(X_test.shape)
print(X_test_landmarks.shape)
print(Y_test.shape)

In [None]:
# Display one training image next to the landmark array stored at the same
# position; each is drawn after dropping its trailing axis.
index = 3
image_sample = np.squeeze(X_train[index], axis=2)
landmark_sample = np.squeeze(X_train_landmarks[index], axis=2)
fig, (ax1, ax2) = plt.subplots(nrows=1, ncols=2)
ax1.imshow(image_sample)
ax2.imshow(landmark_sample)

In [None]:
def generate_generator_multiple(
  gen_args,
  in1,
  in2,
  labels,
  batch_size,
  seed,
):
  """
  Endless batch generator for a model with two inputs

  One ImageDataGenerator is built from `gen_args` and used to create two
  flows, one over `in1` and one over `in2`. Both flows receive the same
  labels, batch size and seed, and both shuffle.

  # Arguments
    gen_args (dict): keyword arguments for ImageDataGenerator
    in1 (array): samples for the first model input
    in2 (array): samples for the second model input
    labels (array): targets, passed to both flows
    batch_size (int): number of samples per batch
    seed (int): seed handed to both flows

  # Yields
    ([batch1, batch2], batch_labels): the next batch of each flow and the
    labels that came with the second flow's batch
  """
  generator = ImageDataGenerator(**gen_args)
  flow_options = dict(
    y=labels,
    batch_size=batch_size,
    shuffle=True,
    seed=seed,
  )
  genX1 = generator.flow(x=in1, **flow_options)
  genX2 = generator.flow(x=in2, **flow_options)

  while True:
    # Use the builtin next() rather than the iterator's own .next() method:
    # it only relies on the iterator protocol, so it keeps working with
    # iterator classes that do not expose a public .next().
    X1i = next(genX1)
    X2i = next(genX2)
    yield [X1i[0], X2i[0]], X2i[1]

In [None]:
## Train Data ##
# Rescaling is always applied; the remaining options are only added when
# AUGMENTATION is enabled.
train_args = dict(rescale=1./255)

if AUGMENTATION:
  train_args.update(
    # --- random geometric transforms ---
    zoom_range=0.3,                        # range for random zoom
    horizontal_flip=True,                  # random left-right flips
    vertical_flip=False,                   # no up-down flips
    rotation_range=0,                      # no random rotation
    width_shift_range=0.1,                 # random horizontal shift
    height_shift_range=0.1,                # random vertical shift
    shear_range=0.,                        # no random shear
    channel_shift_range=0.,                # no random channel shift
    # --- normalisation (all switched off) ---
    featurewise_center=False,              # dataset-wide mean subtraction
    samplewise_center=False,               # per-sample mean subtraction
    featurewise_std_normalization=False,   # divide by dataset std
    samplewise_std_normalization=False,    # divide by per-sample std
    zca_whitening=False,                   # ZCA whitening
    zca_epsilon=1e-06,                     # epsilon for ZCA whitening
    # --- border handling and miscellaneous ---
    fill_mode='nearest',                   # fill for points outside the input
    cval=0.,                               # value used for fill_mode = "constant"
    preprocessing_function=None,           # no extra per-image function
    data_format=None,                      # "channels_first"/"channels_last"; None = default
    validation_split=0.0                   # no fraction reserved for validation
  )

train_set = generate_generator_multiple(
  gen_args=train_args,
  in1=X_train,
  in2=X_train_landmarks,
  labels=Y_train,
  seed=SEED,
  batch_size=BATCH_SIZE,
)

## Test Data ##
# The held-out split is only rescaled.
test_args = dict(rescale=1./255)
test_set = generate_generator_multiple(
  gen_args=test_args,
  in1=X_test,
  in2=X_test_landmarks,
  labels=Y_test,
  seed=SEED,
  batch_size=BATCH_SIZE,
)

## Model Definition

In [None]:
def resnet_layer(
  inputs,
  num_filters=16,
  kernel_size=3,
  strides=1,
  activation='relu',
  batch_normalization=True,
  conv_first=True
):
  """
  Build one convolution unit with optional batch normalization and activation

  The convolution uses 'same' padding, 'he_normal' initialization and an
  L2 kernel regularizer of 1e-4.

  # Arguments
    inputs (tensor): tensor to apply the unit to
    num_filters (int): number of convolution filters
    kernel_size (int): side length of the square kernel
    strides (int): stride of the convolution
    activation (string): activation name, or None to skip the activation
    batch_normalization (bool): whether a BatchNormalization layer is added
    conv_first (bool): True gives conv -> bn -> activation,
      False gives bn -> activation -> conv

  # Returns
    x (tensor): output tensor of the unit
  """

  # The convolution layer is always instantiated first, whichever order the
  # layers are applied in.
  convolution = Conv2D(
    num_filters,
    kernel_size,
    strides,
    padding='same',
    kernel_initializer='he_normal',
    kernel_regularizer=l2(1e-4)
  )

  def normalize_and_activate(tensor):
    # Optional BN followed by the optional activation.
    if batch_normalization:
      tensor = BatchNormalization()(tensor)
    if activation is not None:
      tensor = Activation(activation)(tensor)
    return tensor

  if not conv_first:
    return convolution(normalize_and_activate(inputs))
  return normalize_and_activate(convolution(inputs))

In [None]:
# Two-input ResNet V2 (with bottleneck blocks) model for small datasets.
def my_model(input1, input2, depth, num_classes):
  """
  Two-input ResNet Version 2 model builder

  Each input goes through its own 3 x 3 convolution; the two results are
  added and fed to a single ResNet v2 trunk made of
  (1 x 1)-(3 x 3)-(1 x 1) BN-ReLU-Conv2D bottleneck units.
  First shortcut connection per stage is 1 x 1 Conv2D.
  Second and onwards shortcut connection is identity.
  At the beginning of stages 1 and 2 the feature map size is halved
  (down-sampled) by a convolution with strides=2, while the number of
  filters is doubled. Within each stage, the units have the same number of
  filters and the same feature map size.
  Feature map sizes for an H x W input:
  conv1  : H   x W,    16
  stage 0: H   x W,    64
  stage 1: H/2 x W/2, 128
  stage 2: H/4 x W/4, 256

  # Arguments
    input1 (tuple): shape of the first input (height, width, channels)
    input2 (tuple): shape of the second input; the two branches are added
      element-wise, so the spatial sizes must match
    depth (int): network depth, must be 9n + 2
    num_classes (int): number of classes

  # Returns
    model (Model): Keras model instance, named after the global MODEL_TYPE

  # Raises
    ValueError: if depth is not of the form 9n + 2
  """

  if (depth - 2) % 9 != 0:
    raise ValueError('depth should be 9n+2 (eg 56 or 110 in [b])')

  # Start model definition.
  num_filters_in = 16
  num_res_blocks = (depth - 2) // 9

  X_input1 = Input(input1)
  X_input2 = Input(input2)

  # One plain convolution per input; BN and ReLU are applied once, after the
  # two branches have been merged.
  x1 = resnet_layer(
    inputs=X_input1,
    num_filters=num_filters_in,
    kernel_size=3,
    strides=1,
    activation=None,
    batch_normalization=None,
    conv_first=True
  )

  x2 = resnet_layer(
    inputs=X_input2,
    num_filters=num_filters_in,
    kernel_size=3,
    strides=1,
    activation=None,
    batch_normalization=None,
    conv_first=True
  )

  x = Add()([x1, x2])
  x = BatchNormalization()(x)
  x = Activation('relu')(x)
  num_filters_out = 0

  # Instantiate the stack of residual units
  for stage in range(3):
    for res_block in range(num_res_blocks):
      activation = 'relu'
      batch_normalization = True
      strides = 1
      if stage == 0:
        num_filters_out = num_filters_in * 4
        if res_block == 0:  # first layer and first stage
          # the merged stem output has just been through BN-ReLU
          activation = None
          batch_normalization = False
      else:
        num_filters_out = num_filters_in * 2
        if res_block == 0:  # first layer but not first stage
          strides = 2    # down-sample

      # bottleneck residual unit
      y = resnet_layer(
        inputs=x,
        num_filters=num_filters_in,
        kernel_size=1,
        strides=strides,
        activation=activation,
        batch_normalization=batch_normalization,
        conv_first=False
      )

      y = resnet_layer(
        inputs=y,
        num_filters=num_filters_in,
        conv_first=False
      )

      y = resnet_layer(
        inputs=y,
        num_filters=num_filters_out,
        kernel_size=1,
        conv_first=False
      )

      if res_block == 0:
        # linear projection residual shortcut connection to match
        # changed dims
        x = resnet_layer(
          inputs=x,
          num_filters=num_filters_out,
          kernel_size=1,
          strides=strides,
          activation=None,
          batch_normalization=False
        )

      x = Add()([x, y])

    num_filters_in = num_filters_out

  # Add classifier on top.
  # v2 has BN-ReLU before Pooling
  x = BatchNormalization()(x)
  x = Activation('relu')(x)

  # Pool over the whole final feature map. A fixed window of 8 only matches
  # 32 x 32 inputs; for e.g. 48 x 48 inputs the final map is 12 x 12 and an
  # 8 x 8 window would average the top-left corner only.
  pool_height, pool_width = x.shape[1], x.shape[2]
  if pool_height is None or pool_width is None:
    # spatial size unknown at build time: keep the historical window
    pool_size = 8
  else:
    pool_size = (int(pool_height), int(pool_width))
  x = AveragePooling2D(pool_size=pool_size)(x)
  y = Flatten()(x)

  outputs = Dense(
    num_classes,
    activation='softmax',
    kernel_initializer='he_normal'
  )(y)

  # Instantiate model.
  model = Model(inputs=[X_input1, X_input2], outputs=outputs, name=MODEL_TYPE)
  return model

## Model Summary

In [None]:
# Build and compile a new network, unless LOAD asks for the model saved at
# LOAD_PATH (which is used as loaded, without compiling it again here).
if not LOAD:
  input_shape = (HEIGHT, WIDTH, CHANNELS)
  model = my_model(
    input1=input_shape,
    input2=input_shape,
    depth=DEPTH,
    num_classes=CLASSES
  )
  model.compile(
    optimizer=OPTIMIZER,
    loss='categorical_crossentropy',
    metrics=['accuracy']
  )
else:
  model = load_model(LOAD_PATH)

In [None]:
# Print the layer table (type, output shape, parameter count, connections).
model.summary()

Model: "FER_ResNet56_RoI"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 48, 48, 1)]  0           []                               
                                                                                                  
 input_2 (InputLayer)           [(None, 48, 48, 1)]  0           []                               
                                                                                                  
 conv2d (Conv2D)                (None, 48, 48, 16)   160         ['input_1[0][0]']                
                                                                                                  
 conv2d_1 (Conv2D)              (None, 48, 48, 16)   160         ['input_2[0][0]']                
                                                                                   

In [None]:
# Draw the layer graph, with shapes and layer names, into a PNG named after
# the model type.
structure_png = "./{}_structure.png".format(MODEL_TYPE)
plot_model(model, to_file=structure_png, show_shapes=True, show_layer_names=True)

## Train and Evaluate

In [None]:
# Number of batches that make up one pass over each split
# (ceiling division: a final, smaller batch still counts as a step).
steps_per_epoch = -(-train_data_count // BATCH_SIZE)
validation_steps = -(-test_data_count // BATCH_SIZE)

# Both data sources are endless generators, so the step counts above define
# where an epoch / a validation run ends. Training starts at LAST_EPOCH.
history = model.fit(
  x=train_set,
  steps_per_epoch=steps_per_epoch,
  validation_data=test_set,
  validation_steps=validation_steps,
  initial_epoch=LAST_EPOCH,
  epochs=EPOCHS,
  callbacks=callbacks
)

Epoch 1/30
Epoch 1: saving model to /content/gdrive/My Drive/Colab Notebooks/Checkpoints/FER_ResNet56_RoI_001-2.36.hdf5
Epoch 2/30
Epoch 2: saving model to /content/gdrive/My Drive/Colab Notebooks/Checkpoints/FER_ResNet56_RoI_002-2.21.hdf5
Epoch 3/30
Epoch 3: saving model to /content/gdrive/My Drive/Colab Notebooks/Checkpoints/FER_ResNet56_RoI_003-2.04.hdf5
Epoch 4/30
Epoch 4: saving model to /content/gdrive/My Drive/Colab Notebooks/Checkpoints/FER_ResNet56_RoI_004-2.16.hdf5
Epoch 5/30
Epoch 5: saving model to /content/gdrive/My Drive/Colab Notebooks/Checkpoints/FER_ResNet56_RoI_005-1.81.hdf5
Epoch 6/30
Epoch 6: saving model to /content/gdrive/My Drive/Colab Notebooks/Checkpoints/FER_ResNet56_RoI_006-1.76.hdf5
Epoch 7/30
Epoch 7: saving model to /content/gdrive/My Drive/Colab Notebooks/Checkpoints/FER_ResNet56_RoI_007-1.91.hdf5
Epoch 8/30
Epoch 8: saving model to /content/gdrive/My Drive/Colab Notebooks/Checkpoints/FER_ResNet56_RoI_008-1.58.hdf5
Epoch 9/30
Epoch 9: saving model to /con

In [None]:
# Score the training split on rescaled-only images. train_set applies the
# random augmentation whenever AUGMENTATION is on, so evaluating on it would
# report accuracy on distorted images, which is not comparable with the
# validation figure below.
train_eval_set = generate_generator_multiple(
  gen_args=dict(rescale=1./255),
  in1=X_train,
  in2=X_train_landmarks,
  labels=Y_train,
  batch_size=BATCH_SIZE,
  seed=SEED,
)

# The generators already yield batches, so no batch_size is passed to
# evaluate(); `steps` alone bounds each run to one pass over the split.
train_loss, train_acc = model.evaluate(
  train_eval_set,
  steps=steps_per_epoch
)
test_loss, test_acc = model.evaluate(
  test_set,
  steps=validation_steps
)
print("final train accuracy = {:.2f} , validation accuracy = {:.2f}".format(train_acc*100, test_acc*100))