In [None]:
# Written by Horia-Gabriel Radu - 2021-2022
# for Third Year Project at University of Manchester
#
#  Dataset ("basic") used for emotion detection model training, from:
#   Reliable crowdsourcing and deep locality-preserving learning for expression recognition in the wild,
#   Li, Shan and Deng, Weihong and Du, JunPing,
#   Computer Vision and Pattern Recognition (CVPR), 2017 IEEE Conference on,
#   2584--2593,
#   2017,
#   IEEE

In [None]:
import os
import cv2
from tqdm.notebook import tqdm
import numpy as np
import random
#import object_detection
import tensorflow as tf
from tensorflow import keras
import tensorflow_hub as hub
import tensorflow_addons as tfa
from tensorflow.keras import layers
import functools

import matplotlib.pyplot as plt

In [None]:
# Seed every random number generator used in this notebook (TensorFlow, NumPy
# and Python's `random`) with the same value.
SEED = 1234
for seed_fn in (tf.random.set_seed, np.random.seed, random.seed):
    seed_fn(SEED)

In [None]:
def augmentation_color(image, brightness=0, contrast=0, saturation=0, hue=0):
    """Apply random colour jitter to ``image`` and return the result.

    Each strength that is greater than 0 enables one ``tf.image.random_*``
    transformation; strengths that are 0 (or negative) disable it. The enabled
    transformations are applied in an order picked by ``random.shuffle``.

    Args:
        image: Image passed through the enabled ``tf.image`` transformations.
        brightness: ``max_delta`` for ``tf.image.random_brightness``.
        contrast: Contrast factor is drawn from ``[max(0, 1 - contrast), 1 + contrast]``.
        saturation: Saturation factor is drawn from ``[max(0, 1 - saturation), 1 + saturation]``.
        hue: ``max_delta`` for ``tf.image.random_hue``.

    Returns:
        The transformed image, or ``image`` itself when nothing is enabled.
    """
    pipeline = []
    if brightness > 0:
        pipeline.append(functools.partial(tf.image.random_brightness, max_delta=brightness))
    if contrast > 0:
        contrast_bounds = dict(lower=max(0, 1 - contrast), upper=1 + contrast)
        pipeline.append(functools.partial(tf.image.random_contrast, **contrast_bounds))
    if saturation > 0:
        saturation_bounds = dict(lower=max(0, 1 - saturation), upper=1 + saturation)
        pipeline.append(functools.partial(tf.image.random_saturation, **saturation_bounds))
    if hue > 0:
        pipeline.append(functools.partial(tf.image.random_hue, max_delta=hue))

    # Randomise the order in which the enabled transformations run.
    random.shuffle(pipeline)
    return functools.reduce(lambda current, transform: transform(current), pipeline, image)



In [None]:
# #!pip list
# #!pip install --upgrade tensorflow_hub
# config = tf.compat.v1.ConfigProto()
# config.gpu_options.allow_growth = True
# sess= tf.compat.v1.Session(config=config)

In [None]:
# Directory prefix for the face images; the loading cell reads
# DATA_PATH + <file name without its last 4 characters> + "_aligned.jpg".
DATA_PATH = "data - basic/basic/AllFaces/aligned/"
# Label file: each line is split on whitespace into (file name, label).
# NOTE(review): "patition" is presumably the file's real name on disk — verify before "fixing" it.
LABELS = "data - basic/basic/EmoLabel/list_patition_label.txt"

In [None]:
# Class id -> emotion name, using the 0-based ids stored in `tflabels`.
labels_dict = dict(enumerate(["Surprise", "Fear", "Disgust", "Happiness", "Sadness", "Anger", "Neutral"]))
# The same mapping as a list of {'name', 'id'} records, in id order.
labels_list = [{'name': emotion, 'id': class_id} for class_id, emotion in labels_dict.items()]


In [None]:
# Load every labelled image, add a mirrored copy, degrade the resolution and
# apply colour augmentation. Results are kept in memory:
file_labels = []  # (image file name, label string) pairs read from LABELS
tfdata = []       # prepared image tensors, index-aligned with `tflabels` and `sets`
tflabels = []     # class ids (label from the file minus 1)
sets = []         # (decoded image, class id) pairs: each original followed by its mirror

with open(LABELS, 'r') as f:
    for line in f:
        words = line.split()
        if len(words) < 2:
            # Blank or malformed line: skip it instead of failing with IndexError.
            continue
        file_labels.append((words[0], words[1]))

# Shuffle the *source* images, not the augmented list. Each original and its
# mirrored copy then stay adjacent in `sets`, so the later index-based
# train/validation split separates at most one such pair. Shuffling `sets`
# itself would scatter mirrored near-duplicates of training images into the
# validation data and inflate the validation accuracy.
random.shuffle(file_labels)

for file_name, label_text in file_labels:
    # The label file's ids are shifted by -1 to match the keys of `labels_dict`.
    label = int(label_text) - 1
    # Drop the last 4 characters of the listed name and read the "_aligned.jpg" file.
    file_contents = tf.io.read_file(DATA_PATH + file_name[:-4] + "_aligned.jpg")
    image = tf.io.decode_jpeg(file_contents)
    rev = tf.reverse(image, [1])  # copy reversed along axis 1 (horizontal mirror)
    sets.append((image, label))
    sets.append((rev, label))

for image, label in sets:
    # Down-sample to 50x50 and back up to 100x100; the webcam cell applies the
    # same two resizes to detected faces before running the model.
    image = cv2.resize(image.numpy(), (50, 50))
    image = cv2.resize(image, (100, 100))
    image = tf.convert_to_tensor(image)
    # NOTE: colour augmentation is applied to every sample here, including the
    # ones that later end up in the validation split.
    tfdata.append(augmentation_color(image, 0.25, 0.2, 0.2, 0.1))
    tflabels.append(label)

print(len(tfdata))
print(len(tflabels))

In [None]:
# features = tfds.features.FeaturesDict({
#     'image': Image(shape=(None, None, 3), dtype=tf.uint8),
#     'label': ClassLabel(shape=(), dtype=tf.int64, num_classes=7),
# })

# Wrap the prepared images and labels in a tf.data.Dataset of (image, label) pairs,
# then print its length and its description.
dataset = tf.data.Dataset.from_tensor_slices((tfdata, tflabels))
for dataset_info in (len(dataset), dataset):
    print(dataset_info)

In [None]:
# Split the dataset by position: the first 85 % of the samples are used for
# training, everything after them for validation.
total_size = len(dataset)
print(total_size)

train_size = int(0.85 * total_size)
# Use the remainder so no sample is lost to integer truncation.
val_size = total_size - train_size
print(train_size)
print(val_size)

train_dataset = dataset.take(train_size)
# The validation samples must be the ones *after* the training samples.
# Taking `val_size` elements from the start of `dataset` instead would make the
# validation set a subset of the training set.
val_dataset = dataset.skip(train_size)
# There is no separate hold-out split: the test set is the validation set.
test_dataset = val_dataset

# The two splits must partition the dataset without overlap or loss.
assert len(train_dataset) + len(val_dataset) == total_size

print(len(train_dataset))
print(len(val_dataset))
print(len(test_dataset))

In [None]:
# Batch and prefetch the three splits. Training and validation data are shuffled
# with a buffer of half their length; the test data keeps its order.
# NOTE: the names are rebound in place, so re-running this cell batches the
# already-batched datasets again.
BATCH_SIZE = 32

train_dataset = (
    train_dataset
    .shuffle(len(train_dataset) // 2)
    .batch(batch_size=BATCH_SIZE)
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)

test_dataset = (
    test_dataset
    .batch(batch_size=BATCH_SIZE)
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)

val_dataset = (
    val_dataset
    .shuffle(len(val_dataset) // 2)
    .batch(batch_size=BATCH_SIZE)
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)

#DATASET DONE

In [None]:
# Preview up to nine training images in a 3x3 grid, titled with their class names.
plt.figure(figsize=(16, 16))
# Pull ONE batch and plot its first images. Restarting the shuffled pipeline for
# every subplot would refill the (large) shuffle buffer nine times.
preview_images, preview_labels = next(iter(train_dataset))
for i in range(min(9, len(preview_labels))):
    plt.subplot(3, 3, i + 1)
    # Scale pixel values by 1/255 for display.
    plt.imshow(preview_images[i] / 255)
    plt.title("Class_label: " + str(labels_dict[int(preview_labels[i])]))
    plt.axis(False);

In [None]:
def tensorboard_callback(directory, name):
    """Return a Keras TensorBoard callback that logs to ``directory + "/" + name``."""
    return tf.keras.callbacks.TensorBoard(log_dir="/".join((directory, name)))

def model_checkpoint(directory, name):
    """Return a Keras ModelCheckpoint callback writing to ``directory + "/" + name``.

    The callback monitors ``val_accuracy``, keeps only the best result and
    saves weights only.
    """
    checkpoint_options = dict(
        filepath="/".join((directory, name)),
        monitor="val_accuracy",
        save_best_only=True,
        save_weights_only=True,
        verbose=1,
    )
    return tf.keras.callbacks.ModelCheckpoint(**checkpoint_options)

def scheduler(epoch, lr):
    """Learning-rate schedule: multiply ``lr`` by 0.1 at epochs 20 and 40.

    Args:
        epoch: Index of the epoch about to start.
        lr: Current learning rate.

    Returns:
        ``lr * 0.1`` when ``epoch`` is 20 or 40, otherwise ``lr`` unchanged.
    """
    decay_epochs = (20, 40)
    return lr * 0.1 if epoch in decay_epochs else lr


# -- OPTION 1 -- EFFICIENTNET --

In [None]:
# Instructions on using tensorflow for machine learning from (Accessed 10 March 2022):
# https://towardsdatascience.com/image-classification-transfer-learning-and-fine-tuning-using-tensorflow-a791baf9dbf3

In [None]:
# EfficientNetB0 backbone created without its top classification layers (include_top=False).
base_model_efficientnet = tf.keras.applications.efficientnet.EfficientNetB0(include_top=False)
# Start frozen; a later cell sets `trainable` back to True before training.
base_model_efficientnet.trainable = False

In [None]:
# Classifier on top of the EfficientNet backbone:
# 100x100x3 input -> backbone (called with training=False) -> global average
# pooling -> 7-unit dense layer -> float32 softmax.
# The layer names are referenced by the export cell, so they must not change.
inputs = layers.Input(shape=(100, 100, 3), name='inputLayer')
backbone_features = base_model_efficientnet(inputs, training=False)
pooled_features = layers.GlobalAveragePooling2D(name='poolingLayer')(backbone_features)
class_scores = layers.Dense(7, name='outputLayer')(pooled_features)
outputs = layers.Activation(activation="softmax", dtype=tf.float32, name='activationLayer')(class_scores)

model_efficientnet = tf.keras.Model(inputs, outputs, name="EfficientNet")

In [None]:
# Make the backbone trainable for fine-tuning, but keep every
# BatchNormalization layer inside it frozen.
base_model_efficientnet.trainable = True
efficientnet_backbone = model_efficientnet.layers[1]
for bn_layer in filter(lambda candidate: isinstance(candidate, layers.BatchNormalization),
                       efficientnet_backbone.layers):
    bn_layer.trainable = False

In [None]:
# Print the backbone summary, then one line per layer:
# index, name, trainable flag, dtype and dtype policy.
base_model_efficientnet.summary()
for layer_index, backbone_layer in enumerate(base_model_efficientnet.layers):
    layer_info = (backbone_layer.name, backbone_layer.trainable, backbone_layer.dtype, backbone_layer.dtype_policy)
    print(layer_index, *layer_info)

In [None]:
# Print the full model summary, then one line per top-level layer:
# index, name, trainable flag, dtype and dtype policy.
model_efficientnet.summary()
for layer_index, model_layer in enumerate(model_efficientnet.layers):
    layer_info = (model_layer.name, model_layer.trainable, model_layer.dtype, model_layer.dtype_policy)
    print(layer_index, *layer_info)

In [None]:
# Compile and fine-tune the EfficientNet classifier.
# `scheduler` multiplies the learning rate by 0.1 at epochs 20 and 40.
lr_callback = tf.keras.callbacks.LearningRateScheduler(scheduler)

model_efficientnet.compile(
    loss=tf.keras.losses.SparseCategoricalCrossentropy(),
    optimizer=tfa.optimizers.AdamW(learning_rate=1e-4, weight_decay=1e-4),
    metrics=["accuracy"],
)
hist_model = model_efficientnet.fit(
    train_dataset,
    epochs=50,
    steps_per_epoch=len(train_dataset),
    validation_data=val_dataset,
    # Validate on roughly 10 % of the validation batches each epoch; never 0 steps.
    validation_steps=max(1, int(0.1 * len(val_dataset))),
    callbacks=[
        # The schedule only takes effect if the callback is handed to fit().
        lr_callback,
        tensorboard_callback("Tensorboard_E", "model_tuned"),
        model_checkpoint("Checkpoints_E", "model_tuned.ckpt"),
    ],
)

In [None]:
# Evaluate on `test_dataset` (assigned from `val_dataset` in the split cell, so this is not an independent hold-out set).
model_efficientnet.evaluate(test_dataset)

In [None]:
# Export the trained model as a SavedModel; the freezing cell below loads it from this path.
tf.saved_model.save(model_efficientnet, "saved_models/model_efficient")

In [None]:
# from tensorflow.python.framework.convert_to_constants import convert_variables_to_constants_v2# Get keras model
# # model = ...# Convert Keras model to ConcreteFunction    
# full_model = tf.function(lambda inputs: model_efficientnet(inputs))    
# full_model = full_model.get_concrete_function([tf.TensorSpec(model_input.shape, model_input.dtype) for model_input in model_efficientnet.inputs])# Get frozen ConcreteFunction    
# frozen_func = convert_variables_to_constants_v2(full_model)
# frozen_func.graph.as_graph_def()# Save frozen graph from frozen ConcreteFunction to hard drive
# tf.io.write_graph(graph_or_graph_def=frozen_func.graph,logdir="./frozen_models",name="frozen_efficient.pb",as_text=False)

In [None]:

# lista = [n.name for n in graph_def.node]
# print(lista[-20:])

In [None]:
import tensorflow as tf
print(tf.__version__)

from tensorflow.python.framework.convert_to_constants import convert_variables_to_constants_v2
from tensorflow.python.tools import optimize_for_inference_lib

# Convert the EfficientNet SavedModel into a single frozen GraphDef (.pb):
# load the serving signature, trace it for a [None, 100, 100, 3] float32 input,
# fold the variables into constants and clean up the resulting graph.
loaded = tf.saved_model.load('saved_models/model_efficient')
infer = loaded.signatures['serving_default']

f = tf.function(infer).get_concrete_function(inputLayer=tf.TensorSpec(shape=[None, 100, 100, 3], dtype=tf.float32))
f2 = convert_variables_to_constants_v2(f)
graph_def = f2.graph.as_graph_def()

# Remove NoOp nodes. Iterate backwards so deleting an entry does not shift the
# indices that are still to be visited.
for i in reversed(range(len(graph_def.node))):
    if graph_def.node[i].op == 'NoOp':
        del graph_def.node[i]

# Remove inputs whose name starts with '^' from every node.
# startswith() is also safe for an empty input name.
for node in graph_def.node:
    for i in reversed(range(len(node.input))):
        if node.input[i].startswith('^'):
            del node.input[i]

graph_def = optimize_for_inference_lib.optimize_for_inference(graph_def,
                                                              ['inputLayer'],
                                                              ['StatefulPartitionedCall/StatefulPartitionedCall/EfficientNet/activationLayer/Softmax'],
                                                              tf.float32.as_datatype_enum)

# Make sure the output directory exists before writing into it.
tf.io.gfile.makedirs('frozen_models')
# Use a dedicated name for the file handle so the concrete function `f` is not clobbered.
with tf.io.gfile.GFile('frozen_models/frozen_graph_efficient.pb', 'wb') as graph_file:
    graph_file.write(graph_def.SerializeToString())
    


In [None]:
import cv2
print(cv2.__version__)
# Write a text graph (.pbtxt) for the frozen EfficientNet graph next to the .pb file.
# cv2.dnn.writeTextGraph("frozen_models/frozen_efficient.pb", "frozen_models/frozen_efficient.pbtxt")
cv2.dnn.writeTextGraph("frozen_models/frozen_graph_efficient.pb", "frozen_models/frozen_graph_efficient.pbtxt")

# -- OPTION 2 -- RESNET --

In [None]:
# ResNet-v2-50 loaded from TensorFlow Hub and wrapped in a Sequential model.
# NOTE(review): the handle ends in "classification/5", so this presumably outputs
# ImageNet class scores rather than pooled features — confirm that this is the
# intended backbone output for the 7-class head built in the next cell.
base_model_resnet = tf.keras.Sequential([
    hub.KerasLayer("https://tfhub.dev/google/imagenet/resnet_v2_50/classification/5",
                   trainable=True, arguments=dict(batch_norm_momentum=0.997))

])
# Start frozen; a later cell sets `trainable` back to True before training.
base_model_resnet.trainable = False

In [None]:
# Classifier on top of the ResNet backbone:
# 100x100x3 input -> backbone (called with training=False) -> 7-unit dense
# layer -> float32 softmax.
# The layer names are referenced by the export cell, so they must not change.
inputs = layers.Input(shape=([100, 100, 3]), name='inputLayer')
backbone_output = base_model_resnet(inputs, training=False)
class_scores = layers.Dense(7, name='outputLayer')(backbone_output)
outputs = layers.Activation(activation="softmax", dtype=tf.float32, name='activationLayer')(class_scores)

model_resnet = tf.keras.Model(inputs, outputs, name="ResNet")

In [None]:
# Make the backbone trainable for fine-tuning and freeze any
# BatchNormalization layers found directly inside it.
# NOTE(review): the Sequential backbone was built from a single hub.KerasLayer,
# so this filter presumably matches nothing — verify.
base_model_resnet.trainable = True
resnet_backbone = model_resnet.layers[1]
for bn_layer in filter(lambda candidate: isinstance(candidate, layers.BatchNormalization),
                       resnet_backbone.layers):
    bn_layer.trainable = False

In [None]:
# Print the backbone summary, then one line per layer:
# index, name, trainable flag, dtype and dtype policy.
base_model_resnet.summary()
for layer_index, backbone_layer in enumerate(base_model_resnet.layers):
    layer_info = (backbone_layer.name, backbone_layer.trainable, backbone_layer.dtype, backbone_layer.dtype_policy)
    print(layer_index, *layer_info)

In [None]:
# Print the full model summary, then one line per top-level layer:
# index, name, trainable flag, dtype and dtype policy.
model_resnet.summary()
for layer_index, model_layer in enumerate(model_resnet.layers):
    layer_info = (model_layer.name, model_layer.trainable, model_layer.dtype, model_layer.dtype_policy)
    print(layer_index, *layer_info)

In [None]:
# Report how many physical GPU devices TensorFlow lists.
print("Available GPUs: ", len(tf.config.list_physical_devices('GPU')))

In [None]:
# Compile and fine-tune the ResNet classifier.
lr_callback = tf.keras.callbacks.LearningRateScheduler(scheduler)

model_resnet.compile(
    loss=tf.keras.losses.SparseCategoricalCrossentropy(),
    optimizer=tfa.optimizers.AdamW(learning_rate=1e-4, weight_decay=1e-4),
    metrics=["accuracy"],
)
hist_model_tuned = model_resnet.fit(
    train_dataset,
    epochs=20,
    steps_per_epoch=len(train_dataset),
    validation_data=val_dataset,
    # Validate on roughly 10 % of the validation batches each epoch; never 0 steps.
    validation_steps=max(1, int(0.1 * len(val_dataset))),
    callbacks=[
        # Pass the schedule to fit() so it is actually used. `scheduler` only
        # changes the rate at epochs 20 and 40, so it starts to matter once
        # `epochs` is raised above 20.
        lr_callback,
        tensorboard_callback("Tensorboard_R", "model_tuned"),
        model_checkpoint("Checkpoints_R", "model_tuned.ckpt"),
    ],
)

In [None]:
# Evaluate on `test_dataset` (assigned from `val_dataset` in the split cell, so this is not an independent hold-out set).
model_results = model_resnet.evaluate(test_dataset)

In [None]:
# Predict on the test set and show nine random samples with their true and
# predicted labels (green title = correct, red = wrong) and the top score.
preds = model_resnet.predict(test_dataset, verbose = 1)
pred_labels = tf.argmax(preds, axis=1)
# Computed once here instead of once per subplot.
pred_ids = pred_labels.numpy()
confidences = np.asarray(tf.reduce_max(preds, axis = 1))

# Collect images and labels in a single pass over the dataset. `test_dataset`
# is batched without shuffling, so this order matches the order of `preds`.
test_image_batches = []
test_label_batches = []
for batch_images, batch_labels in test_dataset:
    test_image_batches.append(batch_images.numpy())
    test_label_batches.append(batch_labels.numpy())

test_images = np.concatenate(test_image_batches, axis=0)
test_labels = np.concatenate(test_label_batches, axis=0)

plt.figure(figsize = (20,20))
for i in range(9):
    random_int_index = random.choice(range(len(test_images)))
    true_id = int(test_labels[random_int_index])
    pred_id = int(pred_ids[random_int_index])
    plt.subplot(3,3,i+1)
    plt.imshow(test_images[random_int_index]/255.)
    color = "g" if true_id == pred_id else "r"
    plt.title("True Label: " + str(labels_dict[true_id]) + " || " + "Predicted Label: " +
              str(labels_dict[pred_id]) + "\n" +
              str(confidences[random_int_index]), c=color)
    plt.axis(False);

In [None]:
# Save the trained ResNet model; the freezing cell below loads it from this path.
model_resnet.save("saved_models/model_resnet")

In [None]:
import tensorflow as tf
print(tf.__version__)

from tensorflow.python.framework.convert_to_constants import convert_variables_to_constants_v2
from tensorflow.python.tools import optimize_for_inference_lib

# Convert the ResNet SavedModel into a single frozen GraphDef (.pb):
# load the serving signature, trace it for a [None, 100, 100, 3] float32 input,
# fold the variables into constants and clean up the resulting graph.
loaded = tf.saved_model.load('saved_models/model_resnet')
infer = loaded.signatures['serving_default']

f = tf.function(infer).get_concrete_function(inputLayer=tf.TensorSpec(shape=[None, 100, 100, 3], dtype=tf.float32))
f2 = convert_variables_to_constants_v2(f)
graph_def = f2.graph.as_graph_def()

# Remove NoOp nodes. Iterate backwards so deleting an entry does not shift the
# indices that are still to be visited.
for i in reversed(range(len(graph_def.node))):
    if graph_def.node[i].op == 'NoOp':
        del graph_def.node[i]

# Remove inputs whose name starts with '^' from every node.
# startswith() is also safe for an empty input name.
for node in graph_def.node:
    for i in reversed(range(len(node.input))):
        if node.input[i].startswith('^'):
            del node.input[i]

graph_def = optimize_for_inference_lib.optimize_for_inference(graph_def,
                                                              ['inputLayer'],
                                                              ['StatefulPartitionedCall/StatefulPartitionedCall/ResNet/activationLayer/Softmax'],
                                                              tf.float32.as_datatype_enum)

# Make sure the output directory exists before writing into it.
tf.io.gfile.makedirs('frozen_models')
# Use a dedicated name for the file handle so the concrete function `f` is not clobbered.
with tf.io.gfile.GFile('frozen_models/frozen_graph_resnet.pb', 'wb') as graph_file:
    graph_file.write(graph_def.SerializeToString())
    


In [None]:
import cv2
print(cv2.__version__)
# Write a text graph (.pbtxt) for the frozen ResNet graph next to the .pb file.
# cv2.dnn.writeTextGraph("frozen_models/frozen_efficient.pb", "frozen_models/frozen_efficient.pbtxt")
cv2.dnn.writeTextGraph("frozen_models/frozen_graph_resnet.pb", "frozen_models/frozen_graph_resnet.pbtxt")

# _________________ TESTING _________________

In [None]:
import cv2
import numpy as np
import matplotlib.pyplot as plt

In [None]:
# Face detector model files. Only the TensorFlow pair is loaded below; the
# Caffe paths are defined but not used in this cell.
caffeWeightFile = "../detectFace/models/res10_300x300_ssd_iter_140000_fp16.caffemodel"
caffeConfigFile = "../detectFace/models/deploy.prototxt"
tensorflowWeightFile = "../detectFace/models/opencv_face_detector_uint8.pb"
tensorflowConfigFile = "../detectFace/models/opencv_face_detector.pbtxt"
netFace = cv2.dnn.readNetFromTensorflow(tensorflowWeightFile, tensorflowConfigFile)

# Emotion classifier: the frozen ResNet graph exported earlier in this notebook.
netEmotion = cv2.dnn.readNet("frozen_models/frozen_graph_resnet.pb")
# Class id -> emotion name (same mapping as used for training).
labels_dict = dict(enumerate(("Surprise", "Fear", "Disgust", "Happiness", "Sadness", "Anger", "Neutral")))

In [None]:
# Live webcam demo: detect faces in each frame, classify each face's emotion
# and draw the predicted label next to that face.
# Keys: ESC quits, SPACE saves the current (un-annotated) frame as a PNG.
cam = cv2.VideoCapture(0)

cv2.namedWindow("test")

img_counter = 0

try:
    while True:
        ret, frame = cam.read()
        if not ret:
            # Check this before touching `frame`: flipping a failed grab would raise.
            print("failed to grab frame")
            break
        frame = np.flip(frame, 1)  # mirror the image horizontally

        frameCopy = frame.copy()
        faces = []

        (h, w) = frame.shape[:2]
        inputBlob = cv2.dnn.blobFromImage(cv2.resize(frame, (300, 300)), 1.0, (300, 300), (104.0, 177.0, 123.0))

        netFace.setInput(inputBlob, "data")
        detections = netFace.forward("detection_out")

        for i in range(0, detections.shape[2]):
            confidence = detections[0, 0, i, 2]
            if confidence > 0.7:
                # Box coordinates are scaled by the frame width/height to get pixels.
                box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
                (startX, startY, endX, endY) = box.astype("int").tolist()

                # Clamp the box to the frame and drop empty boxes, so the crop
                # below is never empty (cv2.resize fails on an empty image).
                startX, startY = max(0, startX), max(0, startY)
                endX, endY = min(w, endX), min(h, endY)
                if endX <= startX or endY <= startY:
                    continue

                faces.append([startX, startY, endX, endY])

        for f in faces:
            startX, startY, endX, endY = f
            face = frameCopy[startY:endY, startX:endX]
            # Same 50x50 -> 100x100 resize that was applied to the training images.
            faceInput = cv2.resize(face, (50, 50))
            faceInput = cv2.resize(faceInput, (100, 100))
            # OpenCV frames are BGR, but the classifier was trained on images
            # decoded by tf.io.decode_jpeg (RGB): swap the channels.
            emotion = cv2.dnn.blobFromImage(faceInput, 1.0, (100, 100), swapRB=True)
            netEmotion.setInput(emotion)

            outEmotion = netEmotion.forward()
            # Label position is computed per face: just above the box, or just
            # below its top edge when the box is too close to the top of the frame.
            y = startY - 10 if startY - 10 > 10 else startY + 10
            cv2.putText(frameCopy, labels_dict[int(outEmotion.argmax())], (startX, y), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)
            # No plt.imshow() here: it would add a new image to the current
            # matplotlib axes on every frame and the artists would pile up.

        cv2.imshow("test", frameCopy)

        k = cv2.waitKey(1)
        if k%256 == 27:
            # ESC
            break
        elif k%256 == 32:
            # SPACE
            img_name = "opencv_frame_{}.png".format(img_counter)
            cv2.imwrite(img_name, frame)
            print("{} written!".format(img_name))
            img_counter += 1
finally:
    # Always free the camera and close the window, even if a frame raised.
    cam.release()
    cv2.destroyAllWindows()

In [None]:
# Classify the last face crop left over from the webcam cell.
# `face` only exists if that cell ran and detected at least one face.
n = 1001
test = cv2.resize(face, (100, 100))
test = cv2.resize(test, (50, 50))
test = cv2.resize(test, (100, 100))
#test = tfdata[n].numpy()

# `face` comes from an OpenCV frame (BGR), but the classifier was trained on
# images decoded by tf.io.decode_jpeg (RGB): swap the channels.
# If the `tfdata[n]` line above is used instead, that image is already RGB and
# swapRB must be False.
blb = cv2.dnn.blobFromImage(test, 1.0, (100, 100), swapRB=True)

# matplotlib expects RGB, so convert the BGR crop for display only.
plt.imshow(cv2.cvtColor(test, cv2.COLOR_BGR2RGB))

netEmotion.setInput(blb)
out = netEmotion.forward()

#print("true label - " + str(labels_dict[tflabels[n]]) + " | predicted label - " + labels_dict[out.argmax()])
print("predicted label - " + labels_dict[out.argmax()])
#print("true label - " + labels_dict[tflabels[n]])
#print("true label - " + labels_dict[tflabels[n]])

In [None]:
# Show `test` (the resized face crop from the previous cell) in an OpenCV window.
cv2.namedWindow("test")

cv2.imshow("test", test)

# Wait for a key press (delay argument 0) before closing the window.
cv2.waitKey(0)

cv2.destroyAllWindows()

In [None]:
# Index of the prepared training sample to inspect.
nr = 1010

# Apply the 50x50 -> 100x100 resize once more to the already-prepared sample and display it.
showImg = cv2.resize(tfdata[nr].numpy(), (50, 50))
showImg = cv2.resize(showImg, (100, 100))
#tfdata[nr] = tf.convert_to_tensor(showImg)
# showImg = tf.compat.v1.image.resize(tfdata[0],[43, 43], method=tf.compat.v1.image.ResizeMethod.AREA)
# showImg = tf.compat.v1.image.resize(showImg,[86, 86], method=tf.compat.v1.image.ResizeMethod.AREA)
plt.imshow(showImg)

In [None]:
# Display the prepared (resized and colour-augmented) sample at index `nr`; relies on `nr` from the previous cell.
plt.imshow(tfdata[nr])

In [None]:
# Display the decoded image (before resizing and augmentation) at index `nr`
# and print its label. `sets` and `tflabels` are index-aligned because
# `tflabels` is filled by iterating `sets` in order.
nr = 1010
# Index with `nr` rather than a repeated literal so image and label always match.
plt.imshow(sets[nr][0])
print(labels_dict[tflabels[nr]])