# pip install

In [None]:
pip install tflearn

In [40]:
import tensorflow as tf
from tflearn import DNN
import time
import argparse
import os
import numpy as np

import tensorflow as tf 
from tflearn.layers.core import input_data, dropout, fully_connected
from tflearn.layers.conv import conv_2d, max_pool_2d
from tflearn.layers.merge_ops import merge_outputs, merge
from tflearn.layers.normalization import local_response_normalization, batch_normalization
from tflearn.layers.estimator import regression 
from tflearn.optimizers import Momentum, Adam

In [26]:
ls 'drive/MyDrive/AIHealthcare/Facial_expressions_model_data/Onlylandmark'

[0m[01;34mPrivateTest[0m/  [01;34mPublicTest[0m/  [01;34mTraining[0m/


In [17]:
# ls 'drive/MyDrive/AIHealthcare/Facial_expressions_model_data/Onlylandmark'
# ls 'drive/MyDrive/AIHealthcare/Facial_expressions_model_data/LANDMARKS_HOG'
# ls 'drive/MyDrive/AIHealthcare/Facial_expressions_model_data/Facelandmarks_HOG_sliding_window'

# Parameters

In [41]:
class Dataset:
    name = 'Fer2013'
    train_folder = 'drive/MyDrive/AIHealthcare/Facial_expressions_model_data/Onlylandmark/Training'
    validation_folder = 'drive/MyDrive/AIHealthcare/Facial_expressions_model_data/Onlylandmark/PublicTest'
    test_folder = 'drive/MyDrive/AIHealthcare/Facial_expressions_model_data/Onlylandmark/PrivateTest'
    shape_predictor_path='drive/MyDrive/AIHealthcare/facial_expression_recognition_using_cnn/shape_predictor_68_face_landmarks.dat'
    trunc_trainset_to = -1  # put the number of train images to use (-1 = all images of the train set)
    trunc_validationset_to = -1
    trunc_testset_to = -1

class Network:
    model = 'B'
    input_size = 48
    output_size = 7
    activation = 'relu'
    loss = 'categorical_crossentropy'
    use_landmarks = True
    use_hog_and_landmarks = False
    use_hog_sliding_window_and_landmarks = False
    use_batchnorm_after_conv_layers = True
    use_batchnorm_after_fully_connected_layers = False

class Hyperparams:
    keep_prob = 0.956   # dropout = 1 - keep_prob
    learning_rate = 0.016
    learning_rate_decay = 0.864
    decay_step = 50
    optimizer = 'momentum'  # {'momentum', 'adam', 'rmsprop', 'adagrad', 'adadelta'}
    optimizer_param = 0.95   # momentum value for Momentum optimizer, or beta1 value for Adam

class Training:
    batch_size = 128
    epochs = 13
    snapshot_step = 500
    vizualize = True
    logs_dir = "logs"
    checkpoint_dir = "checkpoints/chk"
    best_checkpoint_path = "checkpoints/best/"
    max_checkpoints = 1
    checkpoint_frequency = 1.0 # in hours
    save_model = True
    save_model_path = "best_model/saved_model.bin"

class VideoPredictor:
    emotions = ["Angry", "Disgust", "Fear", "Happy", "Sad", "Surprise", "Neutral"]
    print_emotions = False
    camera_source = 0
    face_detection_classifier = "lbpcascade_frontalface.xml"
    show_confidence = False
    time_to_wait_between_predictions = 0.5

class OptimizerSearchSpace:
    learning_rate = {'min': 0.00001, 'max': 0.1}
    learning_rate_decay = {'min': 0.5, 'max': 0.99}
    optimizer = ['momentum']   # ['momentum', 'adam', 'rmsprop', 'adagrad', 'adadelta']
    optimizer_param = {'min': 0.5, 'max': 0.99}
    keep_prob = {'min': 0.7, 'max': 0.99}

def make_dir(folder):
    if not os.path.exists(folder):
        os.makedirs(folder)

DATASET = Dataset()
NETWORK = Network()
TRAINING = Training()
HYPERPARAMS = Hyperparams()
VIDEO_PREDICTOR = VideoPredictor()
OPTIMIZER = OptimizerSearchSpace()

make_dir(TRAINING.logs_dir)
make_dir(TRAINING.checkpoint_dir)

# data_loader

In [42]:
def load_data(validation=False, test=False):
    
    data_dict = dict()
    validation_dict = dict()
    test_dict = dict()

    if DATASET.name == "Fer2013":

        # load train set
        data_dict['X'] = np.load(DATASET.train_folder + '/images.npy')
        data_dict['X'] = data_dict['X'].reshape([-1, NETWORK.input_size, NETWORK.input_size, 1])
        if NETWORK.use_landmarks:
            data_dict['X2'] = np.load(DATASET.train_folder + '/landmarks.npy')
        if NETWORK.use_hog_and_landmarks:
            data_dict['X2'] = np.load(DATASET.train_folder + '/landmarks.npy')
            data_dict['X2'] = np.array([x.flatten() for x in data_dict['X2']])
            data_dict['X2'] = np.concatenate((data_dict['X2'], np.load(DATASET.train_folder + '/hog_features.npy')), axis=1)
        data_dict['Y'] = np.load(DATASET.train_folder + '/labels.npy')
        if DATASET.trunc_trainset_to > 0:
            data_dict['X'] = data_dict['X'][0:DATASET.trunc_trainset_to, :, :]
            if NETWORK.use_landmarks and NETWORK.use_hog_and_landmarks:
                data_dict['X2'] = data_dict['X2'][0:DATASET.trunc_trainset_to, :]
            elif NETWORK.use_landmarks:
                data_dict['X2'] = data_dict['X2'][0:DATASET.trunc_trainset_to, :, :]
            data_dict['Y'] = data_dict['Y'][0:DATASET.trunc_trainset_to, :]

        if validation:
            # load validation set
            validation_dict['X'] = np.load(DATASET.validation_folder + '/images.npy')
            validation_dict['X'] = validation_dict['X'].reshape([-1, NETWORK.input_size, NETWORK.input_size, 1])
            if NETWORK.use_landmarks:
                validation_dict['X2'] = np.load(DATASET.validation_folder + '/landmarks.npy')
            if NETWORK.use_hog_and_landmarks:
                validation_dict['X2'] = np.load(DATASET.validation_folder + '/landmarks.npy')
                validation_dict['X2'] = np.array([x.flatten() for x in validation_dict['X2']])
                validation_dict['X2'] = np.concatenate((validation_dict['X2'], np.load(DATASET.validation_folder + '/hog_features.npy')), axis=1)
            validation_dict['Y'] = np.load(DATASET.validation_folder + '/labels.npy')
            if DATASET.trunc_validationset_to > 0:
                validation_dict['X'] = validation_dict['X'][0:DATASET.trunc_validationset_to, :, :]
                if NETWORK.use_landmarks and NETWORK.use_hog_and_landmarks:
                    validation_dict['X2'] = validation_dict['X2'][0:DATASET.trunc_validationset_to, :]
                elif NETWORK.use_landmarks:
                    validation_dict['X2'] = validation_dict['X2'][0:DATASET.trunc_validationset_to, :, :]
                validation_dict['Y'] = validation_dict['Y'][0:DATASET.trunc_validationset_to, :]
        
        if test:
            # load test set
            test_dict['X'] = np.load(DATASET.test_folder + '/images.npy')
            test_dict['X'] = test_dict['X'].reshape([-1, NETWORK.input_size, NETWORK.input_size, 1])
            if NETWORK.use_landmarks:
                test_dict['X2'] = np.load(DATASET.test_folder + '/landmarks.npy')
            if NETWORK.use_hog_and_landmarks:
                test_dict['X2'] = np.load(DATASET.test_folder + '/landmarks.npy')
                test_dict['X2'] = np.array([x.flatten() for x in test_dict['X2']])
                test_dict['X2'] = np.concatenate((test_dict['X2'], np.load(DATASET.test_folder + '/hog_features.npy')), axis=1)
            test_dict['Y'] = np.load(DATASET.test_folder + '/labels.npy')
            if DATASET.trunc_testset_to > 0:
                test_dict['X'] = test_dict['X'][0:DATASET.trunc_testset_to, :, :]
                if NETWORK.use_landmarks and NETWORK.use_hog_and_landmarks:
                    test_dict['X2'] = test_dict['X2'][0:DATASET.trunc_testset_to, :]
                elif NETWORK.use_landmarks:
                    test_dict['X2'] = test_dict['X2'][0:DATASET.trunc_testset_to, :, :]
                test_dict['Y'] = test_dict['Y'][0:DATASET.trunc_testset_to, :]

        if not validation and not test:
            return data_dict
        elif not test:
            return data_dict, validation_dict
        else: 
            return data_dict, validation_dict, test_dict
    else:
        print( "Unknown dataset")
        exit()

## model

In [43]:
def build_model(optimizer=HYPERPARAMS.optimizer, optimizer_param=HYPERPARAMS.optimizer_param, 
    learning_rate=HYPERPARAMS.learning_rate, keep_prob=HYPERPARAMS.keep_prob,
    learning_rate_decay=HYPERPARAMS.learning_rate_decay, decay_step=HYPERPARAMS.decay_step):

    if NETWORK.model == 'A':
        return build_modelA(optimizer, optimizer_param, learning_rate, keep_prob, learning_rate_decay, decay_step)
    elif NETWORK.model == 'B':
        return build_modelB(optimizer, optimizer_param, learning_rate, keep_prob, learning_rate_decay, decay_step)
    else:
        print( "ERROR: no model " + str(NETWORK.model))
        exit()

def build_modelB(optimizer=HYPERPARAMS.optimizer, optimizer_param=HYPERPARAMS.optimizer_param, 
    learning_rate=HYPERPARAMS.learning_rate, keep_prob=HYPERPARAMS.keep_prob,
    learning_rate_decay=HYPERPARAMS.learning_rate_decay, decay_step=HYPERPARAMS.decay_step):

    images_network = input_data(shape=[None, NETWORK.input_size, NETWORK.input_size, 1], name='input1')
    images_network = conv_2d(images_network, 64, 3, activation=NETWORK.activation)
    #images_network = local_response_normalization(images_network)
    if NETWORK.use_batchnorm_after_conv_layers:
        images_network = batch_normalization(images_network)
    images_network = max_pool_2d(images_network, 3, strides = 2)
    images_network = conv_2d(images_network, 128, 3, activation=NETWORK.activation)
    if NETWORK.use_batchnorm_after_conv_layers:
        images_network = batch_normalization(images_network)
    images_network = max_pool_2d(images_network, 3, strides = 2)
    images_network = conv_2d(images_network, 256, 3, activation=NETWORK.activation)
    if NETWORK.use_batchnorm_after_conv_layers:
        images_network = batch_normalization(images_network)
    images_network = max_pool_2d(images_network, 3, strides = 2)
    images_network = dropout(images_network, keep_prob=keep_prob)
    images_network = fully_connected(images_network, 4096, activation=NETWORK.activation)
    images_network = dropout(images_network, keep_prob=keep_prob)
    images_network = fully_connected(images_network, 1024, activation=NETWORK.activation)
    if NETWORK.use_batchnorm_after_fully_connected_layers:
        images_network = batch_normalization(images_network)

    if NETWORK.use_landmarks or NETWORK.use_hog_and_landmarks:
        if NETWORK.use_hog_sliding_window_and_landmarks:
            landmarks_network = input_data(shape=[None, 2728], name='input2')
        elif NETWORK.use_hog_and_landmarks:
            landmarks_network = input_data(shape=[None, 208], name='input2')
        else:
            landmarks_network = input_data(shape=[None, 68, 2], name='input2')
        landmarks_network = fully_connected(landmarks_network, 1024, activation=NETWORK.activation)
        if NETWORK.use_batchnorm_after_fully_connected_layers:
            landmarks_network = batch_normalization(landmarks_network)
        landmarks_network = fully_connected(landmarks_network, 128, activation=NETWORK.activation)
        if NETWORK.use_batchnorm_after_fully_connected_layers:
            landmarks_network = batch_normalization(landmarks_network)
        images_network = fully_connected(images_network, 128, activation=NETWORK.activation)
        network = merge([images_network, landmarks_network], 'concat', axis=1)
    else:
        network = images_network
    network = fully_connected(network, NETWORK.output_size, activation='softmax')

    if optimizer == 'momentum':
        optimizer = Momentum(learning_rate=learning_rate, momentum=optimizer_param, 
                    lr_decay=learning_rate_decay, decay_step=decay_step)
    elif optimizer == 'adam':
        optimizer = Adam(learning_rate=learning_rate, beta1=optimizer_param, beta2=learning_rate_decay)
    else:
        print( "Unknown optimizer: {}".format(optimizer))
    network = regression(network, optimizer=optimizer, loss=NETWORK.loss, learning_rate=learning_rate, name='output')

    return network

def build_modelA(optimizer=HYPERPARAMS.optimizer, optimizer_param=HYPERPARAMS.optimizer_param, 
    learning_rate=HYPERPARAMS.learning_rate, keep_prob=HYPERPARAMS.keep_prob,
    learning_rate_decay=HYPERPARAMS.learning_rate_decay, decay_step=HYPERPARAMS.decay_step):

    images_network = input_data(shape=[None, NETWORK.input_size, NETWORK.input_size, 1], name='input1')
    images_network = conv_2d(images_network, 64, 5, activation=NETWORK.activation)
    #images_network = local_response_normalization(images_network)
    if NETWORK.use_batchnorm_after_conv_layers:
        images_network = batch_normalization(images_network)
    images_network = max_pool_2d(images_network, 3, strides = 2)
    images_network = conv_2d(images_network, 64, 5, activation=NETWORK.activation)
    if NETWORK.use_batchnorm_after_conv_layers:
        images_network = batch_normalization(images_network)
    images_network = max_pool_2d(images_network, 3, strides = 2)
    images_network = conv_2d(images_network, 128, 4, activation=NETWORK.activation)
    if NETWORK.use_batchnorm_after_conv_layers:
        images_network = batch_normalization(images_network)
    images_network = dropout(images_network, keep_prob=keep_prob)
    images_network = fully_connected(images_network, 1024, activation=NETWORK.activation)
    if NETWORK.use_batchnorm_after_fully_connected_layers:
        images_network = batch_normalization(images_network)

    if NETWORK.use_landmarks or NETWORK.use_hog_and_landmarks:
        if NETWORK.use_hog_sliding_window_and_landmarks:
            landmarks_network = input_data(shape=[None, 2728], name='input2')
        elif NETWORK.use_hog_and_landmarks:
            landmarks_network = input_data(shape=[None, 208], name='input2')
        else:
            landmarks_network = input_data(shape=[None, 68, 2], name='input2')
        landmarks_network = fully_connected(landmarks_network, 1024, activation=NETWORK.activation)
        if NETWORK.use_batchnorm_after_fully_connected_layers:
            landmarks_network = batch_normalization(landmarks_network)
        landmarks_network = fully_connected(landmarks_network, 40, activation=NETWORK.activation)
        if NETWORK.use_batchnorm_after_fully_connected_layers:
            landmarks_network = batch_normalization(landmarks_network)
        images_network = fully_connected(images_network, 40, activation=NETWORK.activation)
        network = merge([images_network, landmarks_network], 'concat', axis=1)
    else:
        network = images_network
    network = fully_connected(network, NETWORK.output_size, activation='softmax')

    if optimizer == 'momentum':
        optimizer = Momentum(learning_rate=learning_rate, momentum=optimizer_param, 
                    lr_decay=learning_rate_decay, decay_step=decay_step)
    elif optimizer == 'adam':
        optimizer = Adam(learning_rate=learning_rate, beta1=optimizer_param, beta2=learning_rate_decay)
    else:
        print( "Unknown optimizer: {}".format(optimizer))
    network = regression(network, optimizer=optimizer, loss=NETWORK.loss, learning_rate=learning_rate, name='output')

    return network

# Train

In [44]:
def train(optimizer=HYPERPARAMS.optimizer, optimizer_param=HYPERPARAMS.optimizer_param, 
        learning_rate=HYPERPARAMS.learning_rate, keep_prob=HYPERPARAMS.keep_prob, 
        learning_rate_decay=HYPERPARAMS.learning_rate_decay, decay_step=HYPERPARAMS.decay_step,
        train_model=True):

        print( "loading dataset " + DATASET.name + "...")
        if train_model:
                data, validation = load_data(validation=True)
        else:
                data, validation, test = load_data(validation=True, test=True)

        with tf.Graph().as_default():
                print( "building model...")
                network = build_model(optimizer, optimizer_param, learning_rate, 
                          keep_prob, learning_rate_decay, decay_step)
                model = DNN(network, tensorboard_dir=TRAINING.logs_dir, 
                        tensorboard_verbose=0, checkpoint_path=TRAINING.checkpoint_dir,
                        max_checkpoints=TRAINING.max_checkpoints)

                #tflearn.config.init_graph(seed=None, log_device=False, num_cores=6)

                if train_model:
                        # Training phase
                        print( "start training...")
                        print( "  - emotions = {}".format(NETWORK.output_size))
                        print( "  - model = {}".format(NETWORK.model))
                        print( "  - optimizer = '{}'".format(optimizer))
                        print( "  - learning_rate = {}".format(learning_rate))
                        print( "  - learning_rate_decay = {}".format(learning_rate_decay))
                        print( "  - otimizer_param ({}) = {}".format('beta1' if optimizer == 'adam' else 'momentum', optimizer_param))
                        print( "  - keep_prob = {}".format(keep_prob))
                        print( "  - epochs = {}".format(TRAINING.epochs))
                        print( "  - use landmarks = {}".format(NETWORK.use_landmarks))
                        print( "  - use hog + landmarks = {}".format(NETWORK.use_hog_and_landmarks))
                        print( "  - use hog sliding window + landmarks = {}".format(NETWORK.use_hog_sliding_window_and_landmarks))
                        print( "  - use batchnorm after conv = {}".format(NETWORK.use_batchnorm_after_conv_layers))
                        print( "  - use batchnorm after fc = {}".format(NETWORK.use_batchnorm_after_fully_connected_layers))

                        start_time = time.time()
                        if NETWORK.use_landmarks:
                                model.fit([data['X'], data['X2']], data['Y'],
                                        validation_set=([validation['X'], validation['X2']], validation['Y']),
                                        snapshot_step=TRAINING.snapshot_step,
                                        show_metric=TRAINING.vizualize,
                                        batch_size=TRAINING.batch_size,
                                        n_epoch=TRAINING.epochs)
                        else:
                                model.fit(data['X'], data['Y'],
                                        validation_set=(validation['X'], validation['Y']),
                                        snapshot_step=TRAINING.snapshot_step,
                                        show_metric=TRAINING.vizualize,
                                        batch_size=TRAINING.batch_size,
                                        n_epoch=TRAINING.epochs)
                                validation['X2'] = None
                        training_time = time.time() - start_time
                        print( "training time = {0:.1f} sec".format(training_time))

                        if TRAINING.save_model:
                                print( "saving model...")
                                model.save(TRAINING.save_model_path)
                                if not(os.path.isfile(TRAINING.save_model_path)) and \
                                        os.path.isfile(TRAINING.save_model_path + ".meta"):
                                        os.rename(TRAINING.save_model_path + ".meta", TRAINING.save_model_path)

                        print( "evaluating...")
                        validation_accuracy = evaluate(model, validation['X'], validation['X2'], validation['Y'])
                        print( "  - validation accuracy = {0:.1f}".format(validation_accuracy*100))
                        return validation_accuracy
                else:
                        # Testing phase : load saved model and evaluate on test dataset
                        print( "start evaluation...")
                        print( "loading pretrained model...")
                        if os.path.isfile(TRAINING.save_model_path):
                                model.load(TRAINING.save_model_path)
                        else:
                                print( "Error: file '{}' not found".format(TRAINING.save_model_path))
                                exit()
                        
                        if not NETWORK.use_landmarks:
                                validation['X2'] = None
                                test['X2'] = None

                        print( "--")
                        print( "Validation samples: {}".format(len(validation['Y'])))
                        print( "Test samples: {}".format(len(test['Y'])))
                        print( "--")
                        print( "evaluating...")
                        start_time = time.time()
                        validation_accuracy = evaluate(model, validation['X'], validation['X2'], validation['Y'])
                        print( "  - validation accuracy = {0:.1f}".format(validation_accuracy*100))
                        test_accuracy = evaluate(model, test['X'], test['X2'], test['Y'])
                        print( "  - test accuracy = {0:.1f}".format(test_accuracy*100))
                        print( "  - evalution time = {0:.1f} sec".format(time.time() - start_time))
                        return test_accuracy

def evaluate(model, X, X2, Y):
        if NETWORK.use_landmarks:
                accuracy = model.evaluate([X, X2], Y)
        else:
                accuracy = model.evaluate(X, Y)
        return accuracy[0]

In [None]:
# train
train()
# evaluate
# train(train_model=False)

Training Step: 296  | total loss: [1m[32m0.26815[0m[0m | time: 1.098s
| Momentum | epoch: 011 | loss: 0.26815 - acc: 0.9336 -- iter: 3328/3436
Training Step: 297  | total loss: [1m[32m0.26389[0m[0m | time: 2.142s
| Momentum | epoch: 011 | loss: 0.26389 - acc: 0.9340 | val_loss: 1.90157 - val_acc: 0.6607 -- iter: 3436/3436
--
