## TensorFlow in an end-to-end example - Face Emotions Recognition

@fhlug 23.4.2019 | by David Baumgartner
---



### Get the dataset/data

- Emotion recognition
  - Source: images with a resolution of 48x48px
  - Target: 6 classes (originally 7 classes)


In [0]:
# Labels
# Gitlab: https://gitlab.com/2er0/pres/raw/aeb84257634be88267f79b2cde7414d29216e38f/workshop/small_filtered_labels.npy
# Drive: https://drive.google.com/file/d/1OFzaoNiY_E9TXhp-pZSQBCgJTjnKyEMq/view?usp=sharing
!wget --no-check-certificate 'https://gitlab.com/2er0/pres/raw/aeb84257634be88267f79b2cde7414d29216e38f/workshop/small_filtered_labels.npy' -O small_filtered_labels.npy

# Images
# Gitlab: https://gitlab.com/2er0/pres/raw/aeb84257634be88267f79b2cde7414d29216e38f/workshop/small_filtered_images.npy
# Drive: https://drive.google.com/file/d/1Jsoi8BysEbaiHijTFYmzS_6yJZqmXwUw/view?usp=sharing
!wget --no-check-certificate 'https://gitlab.com/2er0/pres/raw/aeb84257634be88267f79b2cde7414d29216e38f/workshop/small_filtered_images.npy' -O small_filtered_images.npy

---

### Util

- calculate the baseline: the accuracy of always predicting the most frequent class (a trained model should be better than this kind of guessing)

In [0]:
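import collections
from typing import Tuple

import numpy as np

def calc_baseline(lab: np.ndarray) -> Tuple[float, int]:
    """Return (accuracy in %, label) for always predicting the most frequent label.

    Expects a 1-D array of class labels.
    """
    counters = collections.Counter(lab)
    baseline = 0
    label = -1
    lab_len = len(lab)
    # keep the label with the largest share of all samples
    for l, c in counters.items():
        b = c / lab_len
        if b > baseline:
            baseline = b
            label = l
    return baseline * 100, label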
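
As a worked example of what the baseline means, the sketch below computes it for a handful of made-up labels. `majority_baseline` and `toy_labels` are hypothetical names used only for this illustration; the function is a vectorised variant of the same idea, not a replacement for `calc_baseline`.

```python
import numpy as np

def majority_baseline(labels: np.ndarray) -> tuple:
    """Return (accuracy in percent, label) for always predicting the most frequent class."""
    values, counts = np.unique(labels, return_counts=True)
    top = int(np.argmax(counts))
    return counts[top] / labels.size * 100, int(values[top])

# Toy labels (made up for illustration): class 2 appears 5 times out of 10.
toy_labels = np.array([0, 2, 2, 1, 2, 3, 2, 0, 2, 1])
baseline_acc, majority_label = majority_baseline(toy_labels)
print(baseline_acc, majority_label)
```

A classifier that always answers with the majority class reaches exactly this accuracy, so a trained model that scores below it has learned nothing useful beyond the class frequencies.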

---

### Load the dataset

- load the sources and targets into the system memory or GPU memory
- calculate the baseline for the loaded dataset for simple verification

In [0]:
# load dataset

# load images
features = 'small_filtered_images.npy' # 48x48
X = np.load(features)
X = np.expand_dims(X, axis=3)

# load labels
labels = 'small_filtered_labels.npy' # 1
Y = np.load(labels)
baseline = calc_baseline(Y)
Y = np.expand_dims(Y, axis=1)

print(X.shape, Y.shape)
print(baseline)

### Encode the target

- the target should be one-hot encoded so that the network can output one probability value per class

In [0]:
# encode labels -> OneHotEncoder

raise RuntimeError("TODO")
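
To make the idea of the encoding concrete, here is a minimal NumPy sketch. It assumes the labels are integers from 0 to 5; `one_hot` and `decode` are hypothetical helpers, not part of the notebook. Later cells call `encoder.inverse_transform`, so the workshop solution presumably keeps an encoder object (the cell comment names OneHotEncoder) rather than plain functions like these.

```python
import numpy as np

def one_hot(labels: np.ndarray, num_classes: int) -> np.ndarray:
    """Turn integer class ids of shape (n,) or (n, 1) into rows of 0/1 flags."""
    flat = labels.reshape(-1).astype(int)
    return np.eye(num_classes, dtype=np.float32)[flat]

def decode(probabilities: np.ndarray) -> np.ndarray:
    """Map each row of class probabilities back to the most likely class id."""
    return probabilities.argmax(axis=1)

# Made-up labels in the (n, 1) shape that the loading cell produces.
toy = np.array([[0], [2], [5], [2]])
encoded = one_hot(toy, num_classes=6)
print(encoded.shape)
print(decode(encoded))
```

Each row contains a single 1 at the position of its class. The same `argmax` step also works on the probability vectors a softmax output layer returns.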

### Split the data

- split the dataset into separate datasets
  - training (80%)
  - testing (20%)

use the testing dataset __only__ for testing, never during the training lifecycle

extract a __third dataset__ for validation during the training process; if the _tf.data.Dataset_ module is in use, provide a manually created validation set

the final data split could then look like 70:10:20 (training:validation:testing)

In [0]:
# split dataset to training-, testing-set

raise RuntimeError("TODO")
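
One way to obtain a 70:10:20 split is to shuffle the sample indices once and cut them into three disjoint parts. This is only a sketch: `split_indices`, its default fractions and the seed are assumptions made for illustration, and the split is not stratified by class.

```python
import numpy as np

def split_indices(n_samples, val_fraction=0.1, test_fraction=0.2, seed=42):
    """Shuffle the indices 0..n_samples-1 and cut them into train/val/test parts."""
    rng = np.random.default_rng(seed)
    order = rng.permutation(n_samples)
    n_test = int(round(n_samples * test_fraction))
    n_val = int(round(n_samples * val_fraction))
    test_idx = order[:n_test]
    val_idx = order[n_test:n_test + n_val]
    train_idx = order[n_test + n_val:]
    return train_idx, val_idx, test_idx

train_idx, val_idx, test_idx = split_indices(1000)
print(len(train_idx), len(val_idx), len(test_idx))
```

The index arrays can then be applied to both arrays, for example `X[train_idx]` and `Y[train_idx]`, so that images and labels stay aligned.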

### Define the input and output dimensions

- what is the input
- what is the output
- what batch size would be nice



In [0]:
# define input and output shape for neural network

raise RuntimeError("TODO")
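
The dimensions can be read directly from the arrays. The sketch below uses zero-filled stand-ins with the shapes this notebook produces (48x48 images with one channel, six one-hot columns). The sample count and the batch size are made-up values for illustration.

```python
import math
import numpy as np

# Stand-ins with the shapes expected after loading and one-hot encoding;
# the sample count of 10 is made up.
X_demo = np.zeros((10, 48, 48, 1), dtype=np.float32)
Y_demo = np.zeros((10, 6), dtype=np.float32)

input_shape = X_demo.shape[1:]   # (height, width, channels) of one image
num_classes = Y_demo.shape[1]    # one output unit per emotion class
batch_size = 4                   # assumed value; tune to the available memory

# number of batches needed to see every sample once
steps_per_epoch = math.ceil(len(X_demo) / batch_size)
print(input_shape, num_classes, steps_per_epoch)
```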

### Define a neural network model

model definition depends on the data and the target

In [0]:
raise RuntimeError("TODO")

### Create a model instance

define which optimizer you want and how to calculate the error

- params
  - optimizer
  - loss
  - metrics
  - ...

In [0]:
# create actual model for training

raise RuntimeError("TODO")

### Start training the created model

for training, define additional parameters such as the number of epochs

- params
  - validation size
  - epochs
  - shuffle
  - early stopping
  - patience
  - save
  - ...

In [0]:
# setup training

raise RuntimeError("TODO")
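
The interplay of early stopping and patience is easiest to see on a list of validation losses. The following is a plain-Python illustration of the idea, not the Keras callback itself (Keras provides `tf.keras.callbacks.EarlyStopping` with a `patience` argument). `early_stopping_epoch` and the loss values are made up for this sketch.

```python
def early_stopping_epoch(val_losses, patience=3, min_delta=0.0):
    """Return (stop_epoch, best_epoch), both 0-based.

    Training stops at the first epoch where the validation loss has not
    improved by more than min_delta for `patience` consecutive epochs.
    If that never happens, stop_epoch is the last epoch.
    """
    best_loss = float("inf")
    best_epoch = 0
    epochs_without_improvement = 0
    for epoch, loss in enumerate(val_losses):
        if loss < best_loss - min_delta:
            best_loss = loss
            best_epoch = epoch
            epochs_without_improvement = 0
        else:
            epochs_without_improvement += 1
            if epochs_without_improvement >= patience:
                return epoch, best_epoch
    return len(val_losses) - 1, best_epoch

# Made-up validation losses: the best value is reached at epoch 2.
losses = [1.00, 0.80, 0.70, 0.72, 0.71, 0.75, 0.60]
stop_epoch, best_epoch = early_stopping_epoch(losses, patience=3)
print(stop_epoch, best_epoch)
```

With a patience of 3, training stops at epoch 5 and never sees the better loss at epoch 6. This is the trade-off behind the parameter: a larger patience tolerates plateaus but costs more training time.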

### Plot some metric information and the graph

In [0]:
from IPython.display import clear_output, Image, display, HTML
import numpy as np
import tensorflow as tf  # the helpers below use the TF 1.x API (tf.GraphDef)

def strip_consts(graph_def, max_const_size=32):
    """Strip large constant values from graph_def."""
    strip_def = tf.GraphDef()
    for n0 in graph_def.node:
        n = strip_def.node.add() 
        n.MergeFrom(n0)
        if n.op == 'Const':
            tensor = n.attr['value'].tensor
            size = len(tensor.tensor_content)
            if size > max_const_size:
                # tensor_content is a bytes field, so the placeholder must be encoded
                tensor.tensor_content = ("<stripped %d bytes>" % size).encode()
    return strip_def

def show_graph(graph_def, max_const_size=32):
    """Visualize TensorFlow graph."""
    if hasattr(graph_def, 'as_graph_def'):
        graph_def = graph_def.as_graph_def()
    strip_def = strip_consts(graph_def, max_const_size=max_const_size)
    code = """
        <script>
          function load() {{
            document.getElementById("{id}").pbtxt = {data};
          }}
        </script>
        <link rel="import" href="https://tensorboard.appspot.com/tf-graph-basic.build.html" onload=load()>
        <div style="height:600px">
          <tf-graph-basic id="{id}"></tf-graph-basic>
        </div>
    """.format(data=repr(str(strip_def)), id='graph'+str(np.random.rand()))

    iframe = """
        <iframe seamless style="width:1200px;height:620px;border:0" srcdoc="{}"></iframe>
    """.format(code.replace('"', '&quot;'))
    display(HTML(iframe))

In [0]:
# generate plots
import matplotlib.pyplot as plt

# plot history for accuracy
plt.figure()
plt.ioff()

raise RuntimeError("TODO")

plt.legend(['train', 'validation'], loc='upper left')
plt.show()

# plot history for loss
plt.figure()
plt.ioff()

raise RuntimeError("TODO")

plt.legend(['train', 'validation'], loc='upper left')
plt.show()

# show the current graph with TensorBoard
show_graph(tf.get_default_graph().as_graph_def())

## Load a model with Keras

- not frozen
- training can continue


In [0]:
# load a trained keras model

raise RuntimeError("TODO")

---

# Face detection in images

In [0]:
# face detector
# Gitlab: https://gitlab.com/2er0/pres/raw/aeb84257634be88267f79b2cde7414d29216e38f/workshop/haarcascade_frontalface_default.xml
# Drive: https://drive.google.com/file/d/1xSUK0snZeeJ_e1ekRaI-8rYHm_MNeuwh
!wget --no-check-certificate 'https://gitlab.com/2er0/pres/raw/aeb84257634be88267f79b2cde7414d29216e38f/workshop/haarcascade_frontalface_default.xml' -O haarcascade_frontalface_default.xml

## Classification

1. find face(s) in an image
2. clip face(s) out of the image
3. predict a label for each face
4. render the result as an image
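
The clipping step and the preparation of the model input can be sketched without OpenCV. This is an illustration under assumptions: `square_crop` and `to_model_input` are hypothetical helpers, the nearest-neighbour resize is a NumPy stand-in for an OpenCV resize, and scaling pixel values to [0, 1] is a guess that has to match whatever preprocessing was used during training.

```python
import numpy as np

def square_crop(gray, x, y, w, h):
    """Cut a square region centred on the box (x, y, w, h), clamped to the image."""
    side = min(max(w, h), gray.shape[0], gray.shape[1])
    cx, cy = x + w // 2, y + h // 2
    x0 = min(max(cx - side // 2, 0), gray.shape[1] - side)
    y0 = min(max(cy - side // 2, 0), gray.shape[0] - side)
    return gray[y0:y0 + side, x0:x0 + side]

def to_model_input(face, size=48):
    """Nearest-neighbour resize to size x size, scale to [0, 1], add a channel axis."""
    rows = np.arange(size) * face.shape[0] // size
    cols = np.arange(size) * face.shape[1] // size
    resized = face[rows][:, cols].astype(np.float32) / 255.0
    return resized[..., np.newaxis]

# Synthetic 120x160 grayscale image and a made-up detection box.
image = (np.arange(120 * 160) % 256).astype(np.uint8).reshape(120, 160)
face = square_crop(image, x=100, y=10, w=70, h=50)
batch = np.stack([to_model_input(face)])
print(face.shape, batch.shape)
```

The resulting batch has the shape `(number of faces, 48, 48, 1)`, which matches the image shape used for training.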

In [0]:
import cv2 as cv
import numpy as np

face_cascade = cv.CascadeClassifier('haarcascade_frontalface_default.xml')
clahe = cv.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))

emoMap = {0: "angry",
          1: "fear",
          2: "happy",
          3: "sad",
          4: "surprise",
          5: "neutral",
          6: "contempt"}

In [0]:
# detect face in image

def detect_face_and_classify(img, m):
  # convert to gray
  raise RuntimeError("TODO")
  
  # detect faces
  faces = face_cascade.detectMultiScale(gray, 1.3, 5)
  
  # emotiondetection
  if len(faces) == 0:
    return img
  
  face_features = []
  x_y_pos = []
  for face in faces:
      (x, y, w, h) = face

      # rescale crop border to be a perfect square, centred on the detection
      if w > h:
          diff = w - h
          y = max(0, y - diff // 2)
          h = w
      else:
          diff = h - w
          x = max(0, x - diff // 2)
          w = h

      # draw crop rectangle
      cv.rectangle(img, (x, y), (x + w, y + h), (255, 0, 0), 2)

      # crop image
      c = gray[y:y + h, x:x + w]

      # scale image for classification
      raise RuntimeError("TODO")

      # hist equalisation
      raise RuntimeError("TODO")

      # expand dimensions
      raise RuntimeError("TODO")

      face_features.append(c)
      x_y_pos.append((x, y, w))

  face_features = np.asarray(face_features)
  # predict class
  # class_predictions = m.predict_classes(face_features)
  each_class_predictions = m.predict(face_features)
  # show mapped class
  for all_classes, (x, y, w) in zip(each_class_predictions, x_y_pos):
      class_pred = int(encoder.inverse_transform([all_classes])[0])

      cv.putText(img, emoMap[class_pred], (x + 4, y - 4),
                 cv.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2, cv.LINE_AA)
  
  return img

In [0]:
# import precoded take_photo sample

raise RuntimeError("TODO")

In [0]:
# run - collect photo and classify it
from google.colab.patches import cv2_imshow

raise RuntimeError("TODO")

# Freeze & Optimize Model (Production)

In [0]:
# https://www.dlology.com/blog/how-to-convert-trained-keras-model-to-tensorflow-and-make-prediction/

def freeze_session_1(session, keep_var_names=None, output_names=None, clear_devices=True):
    """
    Freezes the state of a session into a pruned computation graph.

    Creates a new computation graph where variable nodes are replaced by
    constants taking their current value in the session. The new graph will be
    pruned so subgraphs that are not necessary to compute the requested
    outputs are removed.
    @param session The TensorFlow session to be frozen.
    @param keep_var_names A list of variable names that should not be frozen,
                          or None to freeze all the variables in the graph.
    @param output_names Names of the relevant graph outputs.
    @param clear_devices Remove the device directives from the graph for better portability.
    @return The frozen graph definition.
    """
    from tensorflow.graph_util import convert_variables_to_constants
    graph = session.graph
    with graph.as_default():
        freeze_var_names = list(set(v.op.name for v in tf.global_variables()).difference(keep_var_names or []))
        output_names = output_names or []
        output_names += [v.op.name for v in tf.global_variables()]
        print(output_names)
        input_graph_def = graph.as_graph_def()
        if clear_devices:
            for node in input_graph_def.node:
                node.device = ""
        frozen_graph = convert_variables_to_constants(session, input_graph_def, 
                                                      output_names, freeze_var_names)
        return frozen_graph
      
def freeze_session_2(model_dir, output_node_names):
    """Extract the sub graph defined by the output nodes and convert 
    all its variables into constant 
    Args:
        model_dir: the root folder containing the checkpoint state file
        output_node_names: a string, containing all the output node's names, 
                            comma separated
    """
    if not tf.gfile.Exists(model_dir):
        raise AssertionError(
            "Export directory doesn't exist. Please specify an export "
            "directory: %s" % model_dir)

    if not output_node_names:
        print("You need to supply the name of a node to --output_node_names.")
        return -1

    # We retrieve our checkpoint fullpath
    checkpoint = tf.train.get_checkpoint_state(model_dir)
    input_checkpoint = checkpoint.model_checkpoint_path
    
    # We build the full file name of our frozen graph
    absolute_model_dir = "/".join(input_checkpoint.split('/')[:-1])
    output_graph = absolute_model_dir + "/frozen_model.pb"

    # We clear devices to allow TensorFlow to control on which device it will load operations
    clear_devices = True

    # We start a session using a temporary fresh Graph
    with tf.Session(graph=tf.Graph()) as sess:
        # We import the meta graph in the current default Graph
        saver = tf.train.import_meta_graph(input_checkpoint + '.meta', clear_devices=clear_devices)

        # We restore the weights
        saver.restore(sess, input_checkpoint)

        # We use a built-in TF helper to export variables to constants
        output_graph_def = tf.graph_util.convert_variables_to_constants(
            sess, # The session is used to retrieve the weights
            tf.get_default_graph().as_graph_def(), # The graph_def is used to retrieve the nodes 
            output_node_names.split(",") # The output node names are used to select the useful nodes
        ) 

        # Finally we serialize and dump the output graph to the filesystem
        with tf.gfile.GFile(output_graph, "wb") as f:
            f.write(output_graph_def.SerializeToString())
        print("%d ops in the final graph." % len(output_graph_def.node))

    return output_graph_def

### Version 1: freeze-session

In [0]:
from tensorflow.keras import backend as K
from tensorflow.python.tools import optimize_for_inference_lib
from tensorflow.tools.graph_transforms import TransformGraph

# Create, compile and train model...
print('Input: ', [i.op.name for i in model.inputs], model.input)
print('Output: ', [o.op.name for o in model.outputs], model.output)

K.set_learning_phase(0)
k_sess = K.get_session()

model_path = './out2'
model_name = 'model'
input_node_name = [i.op.name for i in model.inputs][0]
output_node_name = [o.op.name for o in model.outputs][0]

frozen_graph = freeze_session_1(k_sess, output_names=[out.op.name for out in model.outputs])
tf.train.write_graph(frozen_graph, model_path, f'frozen_{model_name}.pb', as_text=False)

### Version 2: optimize for inference

In [0]:
graph_def = optimize_for_inference_lib.optimize_for_inference(k_sess.graph.as_graph_def(), [input_node_name], [output_node_name], tf.float32.as_datatype_enum, True)
graph_def = TransformGraph(graph_def, [input_node_name], [output_node_name], ['fold_constants', 'sort_by_execution_order'])
with tf.gfile.GFile(f'{model_path}/opti_{model_name}.pb', 'wb') as f:
  f.write(graph_def.SerializeToString())

# Training Checkpoints

In [0]:
tf.train.Saver().save(k_sess, f'{model_path}/{model_name}.ckpt')

In [0]:
show_graph(graph_def)

# Face detection in Images with optimized model

based on classifier within OpenCV or native TensorFlow

---

## Util 2

In [0]:
import tensorflow as tf

def load_graph(frozen_graph_filename):
  # We load the protobuf file from the disk and parse it to retrieve the 
  # unserialized graph_def
  with tf.gfile.GFile(frozen_graph_filename, "rb") as f:
      graph_def = tf.GraphDef()
      graph_def.ParseFromString(f.read())

  # Then we import the graph_def into a new Graph and return it
  with tf.Graph().as_default() as graph:
      # The name argument prefixes every op/node in the imported graph;
      # the cells below look tensors up as 'prefix/<node name>:0'
      tf.import_graph_def(graph_def, name="prefix")
  return graph

---

## Load a native TensorFlow graph

A TensorFlow graph can be loaded with the provided API or with a third-party library

- TensorFlow API (C++, Java, Go, Python, ...)
- OpenCV has a load function (does not work 100%)


In [0]:
# detect face in image
import cv2 as cv
import numpy as np

predict_with_cv = False
frozen_name = 'todo'
raise RuntimeError("TODO: set model path for loading")

# works only if only one model is in the current session
if predict_with_cv:
  cv_model = cv.dnn.readNetFromTensorflow(frozen_name)

else:
  graph = load_graph(frozen_name)
  layer_names = [o.name for o in graph.get_operations()]
  print(list(filter(lambda x : x.count('nput')>0, layer_names)))
  print(list(filter(lambda x : x.count('oftmax')>0, layer_names)))

  # We access the input and output nodes 
  tensor_input = graph.get_tensor_by_name(f'prefix/{input_node_name}:0')
  tensor_output = graph.get_tensor_by_name(f'prefix/{output_node_name}:0')
  sess = tf.Session(graph=graph)

In [0]:
show_graph(tf.get_default_graph().as_graph_def())

## Update Classification

switch the classification to the frozen and optimized model

In [0]:
def detect_face_and_classify_freezed(img):
  # convert to gray
  raise RuntimeError("TODO")
  
  # detect faces
  faces = face_cascade.detectMultiScale(gray, 1.3, 5)
  
  # emotion detection (keep the return type consistent when no face is found)
  if len(faces) == 0:
    return img, None
  
  face_features = []
  x_y_pos = []
  for face in faces:
      (x, y, w, h) = face

      # rescale crop border to be a perfect square, centred on the detection
      if w > h:
          diff = w - h
          y = max(0, y - diff // 2)
          h = w
      else:
          diff = h - w
          x = max(0, x - diff // 2)
          w = h

      # draw crop rectangle
      cv.rectangle(img, (x, y), (x + w, y + h), (255, 0, 0), 2)

      # crop image
      c = gray[y:y + h, x:x + w]

      # scale image for classification
      raise RuntimeError("TODO")

      # hist equalisation
      raise RuntimeError("TODO")

      # expand dimensions
      raise RuntimeError("TODO")

      face_features.append(c)
      x_y_pos.append((x, y, w))

  face_features = np.asarray(face_features)
  
  # predict class
  if predict_with_cv:
    cv_model.setInput(cv.dnn.blobFromImages(face_features, size=(48, 48), swapRB=True, crop=False))
    each_class_predictions = cv_model.forward()
  else:
    each_class_predictions = sess.run(tensor_output, {tensor_input: face_features})
    
  # show mapped class
  for all_classes, (x, y, w) in zip(each_class_predictions, x_y_pos):
      class_pred = int(encoder.inverse_transform([all_classes])[0])

      cv.putText(img, emoMap[class_pred], (x + 4, y - 4),
                 cv.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2, cv.LINE_AA)
  
  return img, each_class_predictions

## Get a picture and classify it, part 2

In [0]:
# use frozen model
from google.colab.patches import cv2_imshow

raise RuntimeError("TODO")

# Finish up

how much disk space do the models use now?


In [0]:
import os

# The folder containing files.
directory = "todo"
raise RuntimeError("TODO: set dir path to check")

# Get all files.
items = os.listdir(directory)

# Loop and add files to list.
pairs = []
for file in items:

    # Use join to get full file path.
    location = os.path.join(directory, file)

    # Get size and add to list of tuples.
    size = os.path.getsize(location)
    pairs.append((size, file))

# Sort list of tuples by the first element, size.
pairs.sort(key=lambda s: s[0])

# Display pairs.
for pair in pairs:
    print(pair)

# Thanks for all the support and feedback

- Andi Haghofer
- Daniel Knittl-Frank
- Rainhard Findling
- Gerald Zwettler
- ...
