<a href="https://colab.research.google.com/github/altaga/Open-Driving-Monitor/blob/main/Emotions/train/Train_Test_and_Deploy_Emotions.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Importing modules

In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
import cv2
from matplotlib import pyplot as plt
from keras.layers import Conv2D, Flatten, Dense, AveragePooling2D, Dropout, MaxPooling2D,Activation,BatchNormalization
from keras.models import Sequential
from keras.losses import SparseCategoricalCrossentropy
from keras.regularizers import l2
from tensorflow.python.keras.utils import np_utils
from tensorflow.python.tools import freeze_graph
from tensorflow.python.framework.convert_to_constants import convert_variables_to_constants_v2
from sklearn.model_selection import train_test_split

Download the Dataset

In [None]:
# Fetch fer2013.csv from a pinned revision of the Hugging Face space; the training cell reads it by this file name.
!wget https://huggingface.co/spaces/mxz/emtion/resolve/c697775e0adc35a9cec32bd4d3484b5f5a263748/fer2013.csv

Setting up training

In [None]:
# Number of emotion classes; width of the one-hot label vectors built by process_emotion.
NUM_CLASSES = 7
# Default side length (in pixels) of the square images decoded by process_pixels.
IMG_SIZE = 48
# Training Parameters
# Epochs for the first fit on the training split; the follow-up fits use half of this.
n_epochs = 30
# Batch size passed to every model.fit / model.evaluate call.
batch_size = 128
# Factor handed to the l2 kernel regularizer of the 2nd and 3rd convolution layers.
weight_decay = 1e-4

Setup support functions for processing and training.

In [None]:
def pandas_vector_to_list(pandas_df):
    """Return the first column of ``pandas_df`` as a plain Python list.

    Args:
        pandas_df: DataFrame whose first column holds the values of interest
            (typically a single-column selection such as ``df[['emotion']]``).

    Returns:
        A list with one entry per row, taken from column 0.
    """
    rows = pandas_df.values.tolist()
    first_column = []
    for row in rows:
        # each row is itself a list of cell values; keep only the first cell
        first_column.append(row[0])
    return first_column

def process_emotion(emotion):
    """One-hot encode the emotion labels.

    Args:
        emotion: Single-column DataFrame of integer class ids.

    Returns:
        Array of shape ``(n_rows, NUM_CLASSES)`` produced by
        ``np_utils.to_categorical``.
    """
    # Copy the labels into a fresh list before encoding
    y_data = list(pandas_vector_to_list(emotion))

    # Y data
    return np_utils.to_categorical(y_data, NUM_CLASSES)


def process_pixels(pixels, img_size=IMG_SIZE):
    """Decode space-separated pixel strings into normalized square images.

    Args:
        pixels: Single-column DataFrame; each cell is a string of
            space-separated integer grey levels in row-major order.
        img_size: Side length of the square image to build from each string.

    Returns:
        float32 array with one ``img_size`` x ``img_size`` image per input
        row, values divided by 255.
    """
    images = []
    for pixel_string in pandas_vector_to_list(pixels):
        # split space separated ints (still strings at this point)
        values = pixel_string.split()

        # blank img_size x img_size canvas, filled one image row at a time
        image = np.zeros((img_size, img_size), dtype=np.uint8)
        # row r covers values[r * img_size : (r + 1) * img_size],
        # i.e. offsets 0, 48, 96, ... when img_size is 48
        for row, start in enumerate(range(0, img_size * img_size, img_size)):
            image[row] = values[start:start + img_size]

        images.append(image)

    # stack, convert to float and divide by 255
    stacked = np.array(images)
    return stacked.astype('float32') / 255.0


def duplicate_input_layer(array_input, size):
    """Replicate single-channel images into three identical channels.

    The spatial size is taken from ``array_input`` instead of being fixed at
    48x48, so images of any common height/width are supported.

    Args:
        array_input: Array-like holding at least ``size`` 2-D images of one
            shape, e.g. shape ``(n, height, width)``.
        size: Number of leading images to convert.

    Returns:
        float64 array of shape ``(size, height, width, 3)`` in which every
        channel of image ``i`` equals ``array_input[i]``.

    Raises:
        IndexError: If ``size`` is larger than the number of images.
        ValueError: If ``size`` is negative.
    """
    images = np.asarray(array_input)
    if images.ndim == 3:
        height, width = images.shape[1:]
    else:
        # Input is not a stack of 2-D images: keep the historical 48x48
        # canvas and let broadcasting fill it, as the fixed-size version did.
        height, width = 48, 48

    # np.empty defaults to float64, which is the dtype callers have always received
    vg_input = np.empty([size, height, width, 3])
    for index in range(size):
        # the trailing length-1 axis broadcasts the image over all 3 channels
        vg_input[index] = images[index][..., np.newaxis]
    return vg_input

def wrap_frozen_graph(graph_def, inputs, outputs, print_graph=False):
    """Import a GraphDef and return a callable pruned to the given tensors.

    Args:
        graph_def: GraphDef to import (imported with an empty name prefix).
        inputs: Name, or nested structure of names, of the graph elements to
            feed.
        outputs: Name, or nested structure of names, of the graph elements to
            fetch.
        print_graph: When ``True``, print the name of every operation in the
            imported graph between the two separator lines.

    Returns:
        The wrapped function pruned to ``inputs`` -> ``outputs``.
    """
    def _import_into_current_graph():
        tf.compat.v1.import_graph_def(graph_def, name="")

    wrapped = tf.compat.v1.wrap_function(_import_into_current_graph, [])
    graph = wrapped.graph

    separator = "-" * 50
    print(separator)
    print("Frozen model layers: ")
    if print_graph == True:
        for operation in graph.get_operations():
            print(operation.name)
    print(separator)

    # Resolve the requested names to graph elements before pruning
    feeds = tf.nest.map_structure(graph.as_graph_element, inputs)
    fetches = tf.nest.map_structure(graph.as_graph_element, outputs)
    return wrapped.prune(feeds, fetches)

Creating the model and training with the entire data set.

In [None]:
# Load the FER2013 CSV into a Pandas dataframe
raw_data = pd.read_csv("fer2013.csv")

# Labels -> one-hot vectors
emotion_array = process_emotion(raw_data[['emotion']])

# Pixel strings -> square float32 images divided by 255
pixel_array = process_pixels(raw_data[['pixels']])

# Data Split, step 1: 60% for training, 40% held out
X_train, X_test, y_train, y_test = train_test_split(
    pixel_array, emotion_array,
    test_size=0.4, shuffle=True, random_state=104)
# Data Split, step 2: the held-out part becomes 70% validation / 30% test
X_valid, X_test, y_valid, y_test = train_test_split(
    X_test, y_test,
    test_size=0.3, shuffle=True, random_state=104)

n_train = len(X_train)
n_test = len(X_test)
n_valid = len(X_valid)

# The network takes 3-channel input, so copy the grey channel three times
x_train_input = duplicate_input_layer(X_train, n_train)
x_valid_input = duplicate_input_layer(X_valid, n_valid)
x_test_input = duplicate_input_layer(X_test, n_test)
print(x_train_input.dtype)

# Model: three conv -> ELU -> batch-norm -> max-pool -> dropout stages,
# followed by two 512-unit dense layers and a softmax classifier.
# The input size and class count come from IMG_SIZE / NUM_CLASSES so the
# network stays in sync with the data-processing helpers.
model = Sequential([
    # 1st convolution stage
    # NOTE(review): this layer applies ReLU and is then followed by ELU; ELU is
    # the identity on ReLU's non-negative output, so the stage is effectively
    # ReLU. Kept as-is to leave the trained architecture unchanged.
    Conv2D(64, (8, 8), activation='relu', input_shape=(IMG_SIZE, IMG_SIZE, 3)),
    Activation('elu'),
    BatchNormalization(),
    MaxPooling2D(pool_size=(3, 3)),
    Dropout(0.1),
    # 2nd convolution stage (L2-regularized kernel)
    Conv2D(128, (8, 8), padding='same', kernel_regularizer=l2(weight_decay)),
    Activation('elu'),
    BatchNormalization(),
    MaxPooling2D(pool_size=(3, 3)),
    Dropout(0.2),
    # 3rd convolution stage (L2-regularized kernel)
    Conv2D(256, (8, 8), padding='same', kernel_regularizer=l2(weight_decay)),
    Activation('elu'),
    BatchNormalization(),
    MaxPooling2D(pool_size=(3, 3)),
    Dropout(0.4),
    # Flatten
    Flatten(),
    # 1st Dense
    Dense(512, activation="linear"),
    Activation('elu'),
    Dropout(0.2),
    # 2nd Dense
    Dense(512, activation="linear"),
    Activation('elu'),
    Dropout(0.2),
    # Output Layer: one probability per emotion class
    Dense(NUM_CLASSES, activation='softmax'),
])
# Labels are one-hot vectors, hence categorical (not sparse) cross-entropy
model.compile(loss='categorical_crossentropy', optimizer="adam", metrics=['accuracy'])

model.summary()

# Fit on the training split, monitoring the validation split each epoch
model.fit(x_train_input, y_train,
          validation_data=(x_valid_input, y_valid),
          epochs=n_epochs, batch_size=batch_size)

# Evaluate on the test split, which the model has not been fitted on yet.
# With the compile settings used here the result is [loss, accuracy].
score = model.evaluate(x_test_input, y_test, batch_size=batch_size)

# This is the test-split score, so label it as such
print("Test Score: {}".format(score))

# Continue training on the test and validation splits so the exported model
# has seen every sample. The score printed above was measured before this
# step and does not describe the final weights.
fine_tune_epochs = int(n_epochs / 2)
model.fit(x_test_input, y_test,
          validation_data=(x_valid_input, y_valid),
          epochs=fine_tune_epochs, batch_size=batch_size)
model.fit(x_valid_input, y_valid, epochs=fine_tune_epochs, batch_size=batch_size)

print("Model trained with all data")

Download test image

In [None]:
# Fetch the sample image Happy1.png that the testing cell loads from the working directory.
!wget https://raw.githubusercontent.com/altaga/Open-Driving-Monitor/main/Emotions/test/testImages/Happy1.png

Testing the model

In [None]:
# Class names indexed by the label id the model predicts
class_names = ["Angry", "Disgust", "Fear", "Happy", "Sad", "Surprise", "Neutral"]

# load the image from disk
image = cv2.imread('Happy1.png')
if image is None:
    # cv2.imread signals a missing or unreadable file by returning None
    raise FileNotFoundError("Could not read 'Happy1.png'; run the download cell first.")
color = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
resized = cv2.resize(color, (48, 48),  interpolation=cv2.INTER_AREA)
plt.imshow(color)
# Scale to [0, 1] exactly as process_pixels does for the training data;
# feeding raw 0-255 values would not match what the network was trained on.
# NOTE(review): training images are grey levels copied into 3 channels while
# this feeds the RGB image directly - confirm this matches deployment.
data = np.expand_dims(resized.astype('float32') / 255.0, 0)
outputs = model.predict(data)
# Get Values
final_outputs = outputs[0]
# get the class label
label_id = np.argmax(final_outputs)
out_name = class_names[label_id]
print(out_name)

Creating a frozen graph that can be used in OpenCV DNN

In [None]:
# Convert Keras model to ConcreteFunction; the input tensor is named "emotion"
full_model = tf.function(lambda x: model(x))
full_model = full_model.get_concrete_function(
    tf.TensorSpec(model.inputs[0].shape, model.inputs[0].dtype, name="emotion"))
# Get frozen ConcreteFunction (variables replaced by constants)
frozen_func = convert_variables_to_constants_v2(full_model)
# List every operation so the input/output node names can be looked up
layers = [op.name for op in frozen_func.graph.get_operations()]
separator = "-" * 50
print(separator)
print("Frozen model layers: ")
for layer in layers:
    print(layer)
print(separator)
print("Frozen model inputs: ")
print(frozen_func.inputs)
print("Frozen model outputs: ")
print(frozen_func.outputs)
# Save frozen graph from frozen ConcreteFunction to hard drive
# (binary protobuf at ./frozen_models/frozen_graph.pb)
tf.io.write_graph(graph_or_graph_def=frozen_func.graph,
                  logdir="./frozen_models",
                  name="frozen_graph.pb",
                  as_text=False)