# Position2Go keypoint localization network


In [None]:
import sys, os, warnings, time, glob

# TensorFlow and tf.keras
import tensorflow as tf
from tensorflow import keras as keras
from tensorflow.keras.layers import *
from tensorflow.keras.activations import *

# Helper libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

print("keras version {}".format(tf.keras.__version__)); del keras
print("tensorflow version {}".format(tf.__version__))

from utils.tf_records_generator import *
from models.res_net import *

%load_ext autoreload
%autoreload 2

## Define data loader

In [None]:
# Define global variables
EPOCHS = 15
SUM_OF_ALL_DATASAMPLES = 3688
BATCHSIZE = 32
SHUFFLE_BUFFER = 512

In [None]:
P2G_TEST_SET_PATH = "/home/kingkolibri/10_catkin_ws/test_records/"
P2G_TRAIN_SET_PATH = "/home/kingkolibri/10_catkin_ws/train_records/"

In [None]:
train_files = glob.glob(P2G_TRAIN_SET_PATH + "p2g_*.tfrecord")
test_files = glob.glob(P2G_TEST_SET_PATH + "p2g_*.tfrecord")
train_files

In [None]:
def create_dataset(filenames, repetitions=-1):
    
    # This works with arrays as well
    dataset = tf.data.TFRecordDataset(filenames=filenames)
    
    # Maps the parser on every filepath in the array
    dataset = dataset.map(parse_p2g_example, num_parallel_calls=8)
    dataset = dataset.repeat(repetitions) # will go on forever        
    # dataset = dataset.shuffle(SHUFFLE_BUFFER)
    dataset = dataset.batch(BATCHSIZE)
    
    # Create an iterator
    iterator = dataset.make_one_shot_iterator()
    
    # Create tf representation of the iterator
    rdm, heatmaps = iterator.get_next()
    
    return rdm, heatmaps

## Build and train model

In [None]:
fn_model = tf.keras.Sequential()
fn_model.add(Flatten(input_shape=(64, 64, 2)))
fn_model.add(Dense(64*64*5))
#fn_model.add(Activation('relu'))
#fn_model.add(Dense(64*64*13))
fn_model.add(Activation('sigmoid'))
fn_model.add(Reshape((64, 64, 13)))

In [None]:
STEPS_PER_EPOCH= int(SUM_OF_ALL_DATASAMPLES / BATCHSIZE)

#Get your datatensors
inputs, labels = create_dataset(train_files)


localizer_model = resnet((64, 64, 8), depth=14)

# Describe model
localizer_model.summary()

### Train model

In [None]:
#Compile model
localizer_model.compile(optimizer=tf.keras.optimizers.Adam(),
                        loss='mean_squared_error',
                        metrics=['acc'],
                        target_tensors=[labels]
                   )

In [None]:
#Train the model
train_hist = localizer_model.fit(
    inputs, labels,
    epochs=EPOCHS, 
    steps_per_epoch=STEPS_PER_EPOCH)

In [None]:
fig, ax1 = plt.subplots()

color = 'tab:red'
ax1.set_xlabel('Epoch')
ax1.set_ylabel('Loss', color=color)
ax1.plot(train_hist.history['loss'], color=color)
ax1.tick_params(axis='y', labelcolor=color)

ax2 = ax1.twinx()  # instantiate a second axes that shares the same x-axis

color = 'tab:blue'
ax2.set_ylabel('Accuracy', color=color)  # we already handled the x-label with ax1
ax2.plot(train_hist.history['acc'], color=color)
ax2.tick_params(axis='y', labelcolor=color)

fig.tight_layout()  # otherwise the right y-label is slightly clipped

plt.title("Training history")
plt.show()

### Test trained model

In [None]:
def create_dataset(filenames, repetitions=-1):
    
    # This works with arrays as well
    dataset = tf.data.TFRecordDataset(filenames=filenames)
    
    # Maps the parser on every filepath in the array
    dataset = dataset.map(parse_p2g_example, num_parallel_calls=8)
    dataset = dataset.repeat(repetitions) # will go on forever        
    dataset = dataset.shuffle(SHUFFLE_BUFFER)
    dataset = dataset.batch(BATCHSIZE)
    
    # Create an iterator
    iterator = dataset.make_one_shot_iterator()
    
    # Create tf representation of the iterator
    rdm, heatmaps = iterator.get_next()
    
    return rdm, heatmaps

In [None]:
test_files

In [None]:
#Get your datatensors
inputs, labels = create_dataset(test_files, repetitions=1)

#Combine it with keras
model_input = tf.keras.layers.Input(tensor=inputs)

localizer_model.evaluate(
    x=model_input,
    y=labels,
    verbose=1,
    steps=20
)


## Visualize result

In [None]:
with tf.Session() as sess:
    sess.run(tf.initialize_all_variables()) #execute init_op
    sample_rdm = sess.run(inputs)
    sample_heatmaps = sess.run(labels)
    sample_outputs = localizer_model.predict_on_batch(inputs)

In [None]:
64*64*8*4

In [None]:
plt.matshow(sample_outputs[0,:,:,5])

In [None]:
sample_outputs.shape

### Visualize sample heatmaps

In [None]:
keypoint_columns = [
    'LAnkle', 
    'LElbow', 
    'LHip', 
    'LKnee',
    'LShoulder',
    #['keypoint_0_LSmallToe_x', 'keypoint_0_LSmallToe_y'],
    'LWrist',
    #['keypoint_0_MidHip_x', 'keypoint_0_MidHip_y'],
    #['keypoint_0_Neck_x', 'keypoint_0_Neck_y'],
    'Nose',
    'RAnkle',
    #['keypoint_0_RBigToe_x','keypoint_0_RBigToe_y'], 
    #['keypoint_0_REar_x','keypoint_0_REar_y'], 
    'RElbow', 
    #['keypoint_0_REye_x','keypoint_0_REye_y'], 
    #['keypoint_0_RHeel_x','keypoint_0_RHeel_y'], 
    'RHip', 
    'RKnee', 
    'RShoulder',
    #['keypoint_0_RSmallToe_x', 'keypoint_0_RSmallToe_y'], 
    'RWrist' 
]

### Visualize estimated keypoints

In [None]:
y_test = sample_outputs
y_pred = sample_heatmaps

In [None]:
heigth = y_test.shape[1]
width = y_test.shape[2]
num_keypoints = y_test.shape[3]



for i in np.random.randint(y_test.shape[0], size=(2,)):
    
    
    fig = plt.figure(figsize=(3,3))
  #  ax = fig.add_subplot(1,1,1)
  #  ax.imshow(X_train[i,:,:,0], cmap="gray")
  #  ax.axis("off")
    
    fig = plt.figure(figsize=(20,3))
    count = 1
    
    for j, keypoint in enumerate(keypoint_columns):
        ax = fig.add_subplot(2, num_keypoints, count)
        ax.set_title(keypoint)
        ax.axis("off")
        count += 1
        ax.imshow(y_pred[i,:,:,j])
        if j == 0:
            ax.set_ylabel("prediction")
            
    for j, keypoint in enumerate(keypoint_columns):
        ax = fig.add_subplot(2, num_keypoints,count)
        count += 1
        ax.imshow(y_test[i,:,:,j])   
        ax.axis("off")
        if j == 0:
            ax.set_ylabel("true")
    plt.show()

### Visualize stick man

In [None]:
keypoint_pairs = [
    [('Neck', 'RShoulder'), None],
    [('Neck', 'LShoulder'), None],
    [('RShoulder', 'RElbow'), None],
    [('RElbow', 'RWrist'), None],
    [('LShoulder', 'LElbow'), None],
    [('LElbow', 'LWrist'), None],
    [('Neck', 'RHip'), None],
    [('RHip', 'RKnee'), None],
    [('RKnee', 'RAnkle'), None],
    [('Neck', 'LHip'), None],
    [('LHip', 'LKnee'), None],
    [('LKnee', 'LAnkle'), None],
    [('Neck', 'Nose'), None],
    [('Nose', 'REye'), None],
    [('REye', 'REar'), None],
    [('Nose', 'LEye'), None],
    [('LEye', 'LEar'), None],
    [('RShoulder', 'REar'), None],
    [('LShoulder', 'LEar'), None],
    [('LHip', 'LAnkle'), 'LKnee'],
    [('RHip', 'RAnkle'), 'RKnee'],
    [('RShoulder', 'RAnkle'), 'RElbow'],
    [('LShoulder', 'LAnkle'), 'LElbow'],
    [('RHip', 'LHip'), 'Neck'],
    [('RHip', 'RShoulder'), 'Neck'],
    [('LHip', 'LShoulder'), 'Neck'],
]

In [None]:
body_pred = {}
body_true = {}
for j, keypoint in enumerate(keypoint_columns):
    x_pred = y_pred[i,:,:,j]
    x_true = y_test[i,:,:,j]
    body_pred[keypoint] = np.unravel_index(y_test[i,:,:,j].argmax(), y_test[i,:,:,j].shape)
    body_true[keypoint] = np.unravel_index(y_pred[i,:,:,j].argmax(), y_pred[i,:,:,j].shape)

Let's first check the visualization of the samples

In [None]:
body_true

In [None]:
img = np.zeros((64,64,3), np.uint8)

import cv2
image_h, image_w = img.shape[:2]

centers = {}

# draw point
for key in keypoint_columns:
    if key not in body_true.keys():
        continue

    body_part = body_true[key]
    center = (body_part[1], body_part[0])
    centers[key] = center
    img[center[1], center[0], :] =  [0, 255, 0]

In [None]:
plt.imshow(img)

In [None]:
# draw line
for pair_order, pair in enumerate(keypoint_pairs):
    if pair[0][0] not in body_true.keys() or pair[0][1] not in body_true.keys() or pair[1] in body_true.keys():
        continue

    cv2.line(img, centers[pair[0][0]], centers[pair[0][1]], [255, 0, 0], 1)

In [None]:
plt.imshow(img)

Now lets take a look at the network output

In [None]:
img = np.zeros((64,64,3), np.uint8)

import cv2
image_h, image_w = img.shape[:2]

centers = {}

# draw point
for key in keypoint_columns:
    if key not in body_pred.keys():
        continue

    body_part = body_pred[key]
    center = (body_part[1], body_part[0])
    centers[key] = center
    img[center[1], center[0], :] =  [0, 255, 0]
    
# draw line
for pair_order, pair in enumerate(keypoint_pairs):
    if pair[0][0] not in body_pred.keys() or pair[0][1] not in body_pred.keys() or pair[1] in body_pred.keys():
        continue

    cv2.line(img, centers[pair[0][0]], centers[pair[0][1]], [255, 0, 0], 1)
    
plt.imshow(img)