In [1]:
import numpy as np
import pandas as pd
import os
import argparse
import errno
import dlib
import cv2
import imageio

In [2]:
image_height = 48
image_width = 48
window_size = 24
window_step = 6
SELECTED_LABELS = [0,1,2,3,4,5,6]
#IMAGES_PER_LABEL = 500
OUTPUT_FOLDER_NAME = "fer2013_features"

In [3]:
predictor = dlib.shape_predictor('/kaggle/input/shape-predictor-68-face-landmarksdat/shape_predictor_68_face_landmarks.dat')
original_labels = [0, 1, 2, 3, 4, 5, 6]
new_labels = list(set(original_labels) & set(SELECTED_LABELS))
nb_images_per_label = list(np.zeros(len(SELECTED_LABELS), 'uint8'))
try:
    os.makedirs(OUTPUT_FOLDER_NAME)
except OSError as e:
    if e.errno == errno.EEXIST and os.path.isdir(OUTPUT_FOLDER_NAME):
        pass
    else:
        raise

In [5]:
def get_landmarks(image, rects):
    if len(rects) > 1:
        raise BaseException("TooManyFaces")
    if len(rects) == 0:
        raise BaseException("NoFaces")
    return np.matrix([[p.x, p.y] for p in predictor(image, rects[0]).parts()])

def get_new_label(label):
    new_label = new_labels.index(label)
    label = list(np.zeros(len(new_labels), 'uint8'))
    label[new_label] = 1
    return label

'''def sliding_hog_windows(image):
    hog_windows = []
    for y in range(0, image_height, window_step):
        for x in range(0, image_width, window_step):
            window = image[y:y+window_size, x:x+window_size]
            hog_windows.extend(hog(window, orientations=8, pixels_per_cell=(8, 8),
                                            cells_per_block=(1, 1), visualise=False))
    return hog_windows'''

'def sliding_hog_windows(image):\n    hog_windows = []\n    for y in range(0, image_height, window_step):\n        for x in range(0, image_width, window_step):\n            window = image[y:y+window_size, x:x+window_size]\n            hog_windows.extend(hog(window, orientations=8, pixels_per_cell=(8, 8),\n                                            cells_per_block=(1, 1), visualise=False))\n    return hog_windows'

In [6]:
print( "importing csv file")
data = pd.read_csv('/kaggle/input/fer2013/fer2013.csv')

for category in data['Usage'].unique():
    print( "converting set: " + category + "...")
    # create folder
    if not os.path.exists(category):
        try:
            os.makedirs(OUTPUT_FOLDER_NAME + '/' + category)
        except OSError as e:
            if e.errno == errno.EEXIST and os.path.isdir(OUTPUT_FOLDER_NAME):
                pass
            else:
                raise
    category_data = data[data['Usage'] == category]
    samples = category_data['pixels'].values
    labels = category_data['emotion'].values
    images = []
    landmarks = []
    labels_list = []
    print(category)
    print(len(samples))
    for i in range(len(samples)):
        try:
            if labels[i] in SELECTED_LABELS :
                image = np.fromstring(samples[i], dtype=int, sep=" ").reshape((image_height, image_width))
                images.append(image)
                imageio.imwrite('temp.jpg', image)
                image2 = cv2.imread('temp.jpg')
                face_rects = [dlib.rectangle(left=1, top=1, right=47, bottom=47)]
                face_landmarks = get_landmarks(image2, face_rects)
                landmarks.append(face_landmarks)       
                labels_list.append(get_new_label(labels[i]))
        except Exception as e:
            print( "error in image: " + str(i) + " - " + str(e))
    np.save(OUTPUT_FOLDER_NAME + '/' + category + '/images.npy', images)
    print("done")
    np.save(OUTPUT_FOLDER_NAME + '/' + category + '/landmarks.npy', landmarks)
    print("done")
    np.save(OUTPUT_FOLDER_NAME + '/' + category + '/labels.npy', labels_list)
    print("done")

importing csv file
converting set: Training...
Training
28709
done
done
done
converting set: PublicTest...
PublicTest
3589
done
done
done
converting set: PrivateTest...
PrivateTest
3589
done
done
done


In [7]:
import os

class Dataset:
    name = 'Fer2013'
    train_folder = 'fer2013_features/Training'
    validation_folder = 'fer2013_features/PublicTest'
    test_folder = 'fer2013_features/PrivateTest'
    shape_predictor_path='/kaggle/input/shape-predictor-68-face-landmarksdat/shape_predictor_68_face_landmarks.dat'

class Network:
    model = 'B'
    input_size = 48
    output_size = 7
    activation = 'relu'
    loss = 'categorical_crossentropy'   
    use_batchnorm_after_conv_layers = True

class Hyperparams:
    keep_prob = 0.956   # dropout = 1 - keep_prob
    learning_rate = 0.016
    learning_rate_decay = 0.864
    decay_step = 50
    optimizer = 'adam'  # {'momentum', 'adam', 'rmsprop', 'adagrad', 'adadelta'}
    optimizer_param = 1   # momentum value for Momentum optimizer, or beta1 value for Adam

class Training:
    batch_size = 128
    epochs = 13
    snapshot_step = 500
    vizualize = True
    logs_dir = "logs"
    checkpoint_dir = "checkpoints/chk"
    best_checkpoint_path = "checkpoints/best/"
    max_checkpoints = 1
    checkpoint_frequency = 1.0 # in hours
    save_model = True
    save_model_path = "best_model/saved_model.bin"

class VideoPredictor:
    emotions = ["Angry", "Disgust", "Fear", "Happy", "Sad", "Surprise", "Neutral"]
    print_emotions = False
    camera_source = 0
    face_detection_classifier = "lbpcascade_frontalface.xml"
    show_confidence = False
    time_to_wait_between_predictions = 0.5

class OptimizerSearchSpace:
    learning_rate = {'min': 0.00001, 'max': 0.1}
    learning_rate_decay = {'min': 0.5, 'max': 0.99}
    optimizer = ['adam']   # ['momentum', 'adam', 'rmsprop', 'adagrad', 'adadelta']
    optimizer_param = {'min': 0.5, 'max': 0.99}
    keep_prob = {'min': 0.7, 'max': 0.99}

def make_dir(folder):
    if not os.path.exists(folder):
        os.makedirs(folder)

DATASET = Dataset()
NETWORK = Network()
TRAINING = Training()
HYPERPARAMS = Hyperparams()
VIDEO_PREDICTOR = VideoPredictor()
OPTIMIZER = OptimizerSearchSpace()

make_dir(TRAINING.logs_dir)
make_dir(TRAINING.checkpoint_dir)

In [8]:
import numpy as np

def load_data(validation=False, test=False):
    
    data_dict = dict()
    validation_dict = dict()
    test_dict = dict()

    if DATASET.name == "Fer2013":

        # load train set
        data_dict['X'] = np.load("/kaggle/working/fer2013_features/Training/images.npy")
        data_dict['X'] = data_dict['X'].reshape([-1, NETWORK.input_size, NETWORK.input_size, 1])
        data_dict['X2'] = np.load("/kaggle/working/fer2013_features/Training/landmarks.npy")
        data_dict['Y'] = np.load("/kaggle/working/fer2013_features/Training/labels.npy")

        if validation:
            # load validation set
            validation_dict['X'] = np.load("/kaggle/working/fer2013_features/PublicTest/images.npy")
            validation_dict['X'] = validation_dict['X'].reshape([-1, NETWORK.input_size, NETWORK.input_size, 1])
            validation_dict['X2'] = np.load("/kaggle/working/fer2013_features/PublicTest/landmarks.npy")
            validation_dict['Y'] = np.load("/kaggle/working/fer2013_features/PublicTest/labels.npy")
        
        if test:
            # load test set
            test_dict['X'] = np.load("/kaggle/working/fer2013_features/PrivateTest/images.npy")
            test_dict['X'] = test_dict['X'].reshape([-1, NETWORK.input_size, NETWORK.input_size, 1])
            test_dict['X2'] = np.load("/kaggle/working/fer2013_features/PrivateTest/landmarks.npy")
            test_dict['Y'] = np.load("/kaggle/working/fer2013_features/PrivateTest/labels.npy")
    
        if not validation and not test:
            return data_dict
        elif not test:
            return data_dict, validation_dict
        else: 
            return data_dict, validation_dict, test_dict
    else:
        print( "Unknown dataset")
        exit()

In [9]:
!pip install tflearn

Collecting tflearn
  Downloading tflearn-0.5.0.tar.gz (107 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m107.3/107.3 kB[0m [31m542.3 kB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
Building wheels for collected packages: tflearn
  Building wheel for tflearn (setup.py) ... [?25ldone
[?25h  Created wheel for tflearn: filename=tflearn-0.5.0-py3-none-any.whl size=127299 sha256=a10e355ee0fd27a38f4eb50f13ee27b43f3d93bb399469570ed685cda15998e5
  Stored in directory: /root/.cache/pip/wheels/5f/14/2e/1d8e28cc47a5a931a2fb82438c9e37ef9246cc6a3774520271
Successfully built tflearn
Installing collected packages: tflearn
Successfully installed tflearn-0.5.0
[0m

In [10]:
import tensorflow as tf 
from tflearn.layers.core import input_data, dropout, fully_connected
from tflearn.layers.conv import conv_2d, max_pool_2d
from tflearn.layers.merge_ops import merge_outputs, merge
from tflearn.layers.normalization import local_response_normalization, batch_normalization
from tflearn.layers.estimator import regression 
from tflearn.optimizers import Momentum, Adam

In [11]:
def build_model(optimizer=HYPERPARAMS.optimizer, optimizer_param=HYPERPARAMS.optimizer_param, 
    learning_rate=HYPERPARAMS.learning_rate, keep_prob=HYPERPARAMS.keep_prob,
    learning_rate_decay=HYPERPARAMS.learning_rate_decay, decay_step=HYPERPARAMS.decay_step):

    images_network = input_data(shape=[None, NETWORK.input_size, NETWORK.input_size, 1], name='input1')
    images_network = conv_2d(images_network, 64, 3, activation=NETWORK.activation)
    images_network = batch_normalization(images_network)
    images_network = max_pool_2d(images_network, 3, strides = 2)
    images_network = conv_2d(images_network, 128, 3, activation=NETWORK.activation)
    
    images_network = batch_normalization(images_network)
    images_network = max_pool_2d(images_network, 3, strides = 2)
    images_network = conv_2d(images_network, 256, 3, activation=NETWORK.activation)
    
    images_network = batch_normalization(images_network)
    images_network = max_pool_2d(images_network, 3, strides = 2)
    images_network = dropout(images_network, keep_prob=keep_prob)
    images_network = fully_connected(images_network, 4096, activation=NETWORK.activation)
    images_network = dropout(images_network, keep_prob=keep_prob)
    images_network = fully_connected(images_network, 1024, activation=NETWORK.activation)
    landmarks_network = input_data(shape=[None, 68, 2], name='input2')
    landmarks_network = fully_connected(landmarks_network, 1024, activation=NETWORK.activation)
    landmarks_network = fully_connected(landmarks_network, 128, activation=NETWORK.activation)
    images_network = fully_connected(images_network, 128, activation=NETWORK.activation)
    network = merge([images_network, landmarks_network], 'concat', axis=1)
    network = fully_connected(network,NETWORK.output_size, activation='softmax')
    optimizer = Adam(learning_rate=learning_rate, beta1=optimizer_param, beta2=learning_rate_decay)
    network = regression(network, optimizer=optimizer, loss=NETWORK.loss, learning_rate=learning_rate, name='output')

    return network

In [67]:
'''def build_model(optimizer=HYPERPARAMS.optimizer, optimizer_param=HYPERPARAMS.optimizer_param, 
    learning_rate=HYPERPARAMS.learning_rate, keep_prob=HYPERPARAMS.keep_prob,
    learning_rate_decay=HYPERPARAMS.learning_rate_decay, decay_step=HYPERPARAMS.decay_step):

    images_network = input_data(shape=[None, NETWORK.input_size, NETWORK.input_size, 1], name='input1')
    images_network = conv_2d(images_network, 64, 5, activation=NETWORK.activation)     
    images_network = batch_normalization(images_network)
    images_network = max_pool_2d(images_network, 3, strides = 2)
    images_network = conv_2d(images_network, 64, 5, activation=NETWORK.activation)
    images_network = batch_normalization(images_network)
    images_network = max_pool_2d(images_network, 3, strides = 2)
    images_network = conv_2d(images_network, 128, 4, activation=NETWORK.activation)
    images_network = batch_normalization(images_network)
    images_network = dropout(images_network, keep_prob=keep_prob)
    images_network = fully_connected(images_network, 1024, activation=NETWORK.activation) 
    landmarks_network = input_data(shape=[None, 68, 2], name='input2')
    landmarks_network = fully_connected(landmarks_network, 1024, activation=NETWORK.activation)
    landmarks_network = fully_connected(landmarks_network, 40, activation=NETWORK.activation)
    images_network = fully_connected(images_network, 40, activation=NETWORK.activation)
    network = merge([images_network, landmarks_network], 'concat', axis=1)
    network = fully_connected(network, NETWORK.output_size, activation='softmax')
    optimizer = Adam(learning_rate=learning_rate, beta1=optimizer_param, beta2=learning_rate_decay)
    network = regression(network, optimizer=optimizer, loss=NETWORK.loss, learning_rate=learning_rate, name='output')

    return network'''

In [12]:
import tensorflow as tf
from tflearn import DNN
import time
import argparse
import os

In [13]:
def train(optimizer=HYPERPARAMS.optimizer, optimizer_param=HYPERPARAMS.optimizer_param, 
        learning_rate=HYPERPARAMS.learning_rate, keep_prob=HYPERPARAMS.keep_prob, 
        learning_rate_decay=HYPERPARAMS.learning_rate_decay, decay_step=HYPERPARAMS.decay_step,
        train_model=True):

        print( "loading dataset " + DATASET.name + "...")
        if train_model:
                data, validation = load_data(validation=True)
        else:
                data, validation, test = load_data(validation=True, test=True)

        with tf.Graph().as_default():
                print( "building model...")
                network = build_model(optimizer, optimizer_param, learning_rate, 
                          keep_prob, learning_rate_decay, decay_step)
                model = DNN(network, tensorboard_dir=TRAINING.logs_dir, 
                        tensorboard_verbose=0, checkpoint_path=TRAINING.checkpoint_dir,
                        max_checkpoints=TRAINING.max_checkpoints)

                #tflearn.config.init_graph(seed=None, log_device=False, num_cores=6)

                if train_model:
                        # Training phase
                        print( "start training...")
                        print( "  - emotions = {}".format(NETWORK.output_size))
                        print( "  - model = {}".format(NETWORK.model))
                        print( "  - optimizer = '{}'".format(optimizer))
                        print( "  - learning_rate = {}".format(learning_rate))
                        print( "  - learning_rate_decay = {}".format(learning_rate_decay))
                        print( "  - otimizer_param ({}) = {}".format('beta1' if optimizer == 'adam' else 'momentum', optimizer_param))
                        print( "  - keep_prob = {}".format(keep_prob))
                        print( "  - epochs = {}".format(TRAINING.epochs))
                        #print( "  - use landmarks = {}".format(NETWORK.use_landmarks))

                        start_time = time.time()
                        
                        model.fit([data['X'], data['X2']], data['Y'],
                                    validation_set=([validation['X'], validation['X2']], validation['Y']),
                                    snapshot_step=TRAINING.snapshot_step,
                                    show_metric=TRAINING.vizualize,
                                    batch_size=TRAINING.batch_size,
                                    n_epoch=TRAINING.epochs)
            
                        training_time = time.time() - start_time
                        print( "training time = {0:.1f} sec".format(training_time))

                        if TRAINING.save_model:
                            print( "saving model...")
                            model.save(TRAINING.save_model_path)
                            if not(os.path.isfile(TRAINING.save_model_path)) and \
                                    os.path.isfile(TRAINING.save_model_path + ".meta"):
                                    os.rename(TRAINING.save_model_path + ".meta", TRAINING.save_model_path)
                        print( "evaluating...")
                        validation_accuracy = evaluate(model, validation['X'], validation['X2'], validation['Y'])
                        print( "  - validation accuracy = {0:.1f}".format(validation_accuracy*100))
                        return validation_accuracy
                else:
                        # Testing phase : load saved model and evaluate on test dataset
                        print( "start evaluation...")
                        print( "loading pretrained model...")
                        if os.path.isfile(TRAINING.save_model_path):
                                model.load(TRAINING.save_model_path)
                        else:
                                print( "Error: file '{}' not found".format(TRAINING.save_model_path))
                                exit()
                        
                        print( "--")
                        print( "Validation samples: {}".format(len(validation['Y'])))
                        print( "Test samples: {}".format(len(test['Y'])))
                        print( "--")
                        print( "evaluating...")
                        start_time = time.time()
                        validation_accuracy = evaluate(model, validation['X'], validation['X2'], validation['Y'])
                        print( "  - validation accuracy = {0:.1f}".format(validation_accuracy*100))
                        test_accuracy = evaluate(model, test['X'], test['X2'], test['Y'])
                        print( "  - test accuracy = {0:.1f}".format(test_accuracy*100))
                        print( "  - evalution time = {0:.1f} sec".format(time.time() - start_time))
                        return test_accuracy
               

In [14]:
def evaluate(model, X, X2, Y):
    accuracy = model.evaluate([X, X2], Y)
    return accuracy[0]

In [15]:
train()

Training Step: 2924  | total loss: [1m[32mnan[0m[0m | time: 9.619s
| Adam | epoch: 013 | loss: nan - acc: 0.1351 -- iter: 28672/28709
Training Step: 2925  | total loss: [1m[32mnan[0m[0m | time: 10.665s
| Adam | epoch: 013 | loss: nan - acc: 0.1349 | val_loss: nan - val_acc: 0.1301 -- iter: 28709/28709
--
training time = 187.9 sec
saving model...
evaluating...
  - validation accuracy = 13.0


0.13011981053218166

In [16]:
import tensorflow as tf
from tflearn import DNN
import time
import numpy as np
import argparse
import dlib
import cv2
import os
from skimage.feature import hog

def load_model():
    model = None
    with tf.Graph().as_default():
        print( "loading pretrained model...")
        network = build_model()
        model = DNN(network)
        if os.path.isfile(TRAINING.save_model_path):
            model.load(TRAINING.save_model_path)
        else:
            print( "Error: file '{}' not found".format(TRAINING.save_model_path))
    return model

def get_landmarks(image, rects, predictor):
    # this function have been copied from http://bit.ly/2cj7Fpq
    if len(rects) > 1:
        raise TooManyFaces
    if len(rects) == 0:
        raise NoFaces
    return np.matrix([[p.x, p.y] for p in predictor(image, rects[0]).parts()])


In [17]:
model = load_model()
image = cv2.imread("/kaggle/input/d/msambare/fer2013/test/angry/PrivateTest_10131363.jpg")
image.shape

loading pretrained model...


2022-12-25 21:41:40.845680: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-12-25 21:41:40.846550: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-12-25 21:41:40.847155: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-12-25 21:41:40.847787: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-12-25 21:41:40.848331: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from S

(48, 48, 3)

In [18]:
def get_emotion(label):
    if VIDEO_PREDICTOR.print_emotions:
        print( "- Angry: {0:.1f}%\n- Happy: {1:.1f}%\n- Sad: {2:.1f}%\n- Surprise: {3:.1f}%\n- Neutral: {4:.1f}%".format(
                label[0]*100, label[1]*100, label[2]*100, label[3]*100, label[4]*100))
    label = label.tolist()
    return VIDEO_PREDICTOR.emotions[label.index(max(label))],max(label)

In [20]:
face_rects = [dlib.rectangle(left=0, top=0, right=NETWORK.input_size, bottom=NETWORK.input_size)]
image = cv2.imread("/kaggle/input/d/msambare/fer2013/train/angry/Training_10120469.jpg")
shape_predictor = dlib.shape_predictor("/kaggle/input/shape-predictor-68-face-landmarksdat/shape_predictor_68_face_landmarks.dat")
face_landmarks = np.array([get_landmarks(image, face_rects, shape_predictor)])
tensor_image = image.reshape([-1, NETWORK.input_size, NETWORK.input_size, 1])

In [46]:
tensor_image.shape

(3, 48, 48, 1)

In [48]:
face_landmarks.shape

(1, 68, 2)

In [21]:
predicted_label = model.predict([tensor_image,face_landmarks])

TypeError: predict() got an unexpected keyword argument 'batch_size'

In [112]:
emotion=get_emotion(predicted_label[0])
print( "Prediction: {0} (confidence: {1:.1f}%)".format(emotion, confidence*100))

NameError: name 'predicted_label' is not defined