<a href="https://colab.research.google.com/github/Dina-Essam/PoseEstimation/blob/master/PoseEstimation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
drive.mount("/content/drive/")

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3Aietf%3Awg%3Aoauth%3A2.0%3Aoob&scope=email%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdocs.test%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive.photos.readonly%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fpeopleapi.readonly&response_type=code

Enter your authorization code:
··········
Mounted at /content/drive/


In [2]:
import os
import numpy as np
from scipy.ndimage import gaussian_filter, maximum_filter
import cv2
from keras.models import model_from_json

Using TensorFlow backend.


In [0]:
def loadModel(json, weights):
    json_file = open(json, 'r')
    loaded_model_json = json_file.read()
    json_file.close()
    loaded_model = model_from_json(loaded_model_json)
    loaded_model.load_weights(weights)  
    return loaded_model




def non_max_supression(plain, windowSize=3, threshold=1e-6):
    # clear value less than threshold
    under_th_indices = plain < threshold
    plain[under_th_indices] = 0
    return plain * (plain == maximum_filter(plain, footprint=np.ones((windowSize, windowSize))))

def render_joints(cvmat, joints, conf_th=0.01):
    for _joint in joints:
        _x, _y, _conf = _joint
        cv2.circle(cvmat, center=(int(_x), int(_y)), color=(0, 255, 0), radius=7, thickness=2)

    return cvmat

def post_process_heatmap(heatMap, kpConfidenceTh=0.2):
    kplst = list()
    for i in range(heatMap.shape[-1]): 
        # ignore last channel, background channel
        _map = heatMap[:, :, i]
        _map = gaussian_filter(_map, sigma=0.5)
        _nmsPeaks = non_max_supression(_map, windowSize=3, threshold=1e-6)

        y, x = np.where(_nmsPeaks == _nmsPeaks.max())
        if len(x) > 0 and len(y) > 0:
            kplst.append((int(x[0]), int(y[0]), _nmsPeaks[y[0], x[0]]))
        else:
            kplst.append((0, 0, 0))
    return kplst

def inference(img, loaded_model):
    #imgdata = cv2.imread(img)
    scale = (img.shape[0] * 1.0 / 256, img.shape[1] * 1.0 / 256)
    imgdata = cv2.resize(img, (256,256))
    
    mean = np.array([0.4404, 0.4440, 0.4327], dtype=np.float)
    imgdata = normalize(imgdata, mean)
    input = imgdata[np.newaxis, :, :, :]
    out = loaded_model.predict(input)
    return out[-1], scale
def normalize(imgdata, color_mean):

    imgdata = imgdata / 255.0

    for i in range(imgdata.shape[-1]):
        imgdata[:, :, i] -= color_mean[i]

    return imgdata


keys = ['r_ankle', 'r_knee', 'r_hip',
                'l_hip', 'l_knee', 'l_ankle',
                'plevis', 'thorax', 'upper_neck', 'head_top',
                'r_wrist', 'r_elbow', 'r_shoulder',
                'l_shoulder', 'l_elbow', 'l_wrist']


In [0]:
###Predect 2D
def predict_2dpose(path_to_video,path_to_model,path_to_weights):
    model = loadModel(path_to_model,path_to_weights)

    cap = cv2.VideoCapture(path_to_video)

    frame_joints=[]
    while(1):
        ret, frame = cap.read()
        if ret is False:
            break
        out, scale= inference(frame, model)
        keypoints = post_process_heatmap(out[0,:,:,:])
        joints=[]
        for _kp in keypoints:
            joints.append([_kp[0],_kp[1]])
        frame_joints.append(joints)
    cap.release()
    frame_joints = np.array(frame_joints)
    return frame_joints

In [0]:
import numpy as np
import copy
import tensorflow as tf
from tensorflow import keras
import pickle

# Stacked Hourglass produces 16 joints. These are the names.
SH_NAMES = ['']*16
SH_NAMES[0]  = 'RFoot'
SH_NAMES[1]  = 'RKnee'
SH_NAMES[2]  = 'RHip'
SH_NAMES[3]  = 'LHip'
SH_NAMES[4]  = 'LKnee'
SH_NAMES[5]  = 'LFoot'
SH_NAMES[6]  = 'Hip'
SH_NAMES[7]  = 'Spine'
SH_NAMES[8]  = 'Thorax'
SH_NAMES[9]  = 'Head'
SH_NAMES[10] = 'RWrist'
SH_NAMES[11] = 'RElbow'
SH_NAMES[12] = 'RShoulder'
SH_NAMES[13] = 'LShoulder'
SH_NAMES[14] = 'LElbow'
SH_NAMES[15] = 'LWrist'
# Joints in H3.6M -- data has 32 joints, but only 17 that move; these are the indices.
H36M_NAMES = ['']*32
H36M_NAMES[0]  = 'Hip'
H36M_NAMES[1]  = 'RHip'
H36M_NAMES[2]  = 'RKnee'
H36M_NAMES[3]  = 'RFoot'
H36M_NAMES[6]  = 'LHip'
H36M_NAMES[7]  = 'LKnee'
H36M_NAMES[8]  = 'LFoot'
H36M_NAMES[12] = 'Spine'
H36M_NAMES[13] = 'Thorax'
H36M_NAMES[14] = 'Neck/Nose'
H36M_NAMES[15] = 'Head'
H36M_NAMES[17] = 'LShoulder'
H36M_NAMES[18] = 'LElbow'
H36M_NAMES[19] = 'LWrist'
H36M_NAMES[25] = 'RShoulder'
H36M_NAMES[26] = 'RElbow'
H36M_NAMES[27] = 'RWrist'

In [0]:
import tensorflow as tf
from tensorflow.python.keras import layers
from tensorflow.python.keras.layers import Add
from tensorflow.python.keras.layers.normalization import BatchNormalization
import numpy as np


def Linear_Model(X,drop):

    X = tf.keras.Input(shape=(32,))
    inp = X
    # First layer, brings dimensionality up to linear_size
    X = layers.Dense(1024, use_bias=True, kernel_initializer=tf.keras.initializers.he_normal(seed=None))(X)
    X = BatchNormalization()(X)
    X = layers.Activation('relu')(X)
    X = layers.Dropout(drop)(X)

    X_shortcut = X
    # we can think of this chunk as the input layer
    X = layers.Dense(1024, use_bias=True, kernel_initializer=tf.keras.initializers.he_normal(seed=None))(X)
    X = BatchNormalization()(X)
    X = layers.Activation('relu')(X)
    X = layers.Dropout(drop)(X)

    # we can think of this chunk as the hidden layer
    X = layers.Dense(1024, use_bias=True, kernel_initializer=tf.keras.initializers.he_normal(seed=None))(X)
    X = BatchNormalization()(X)
    X = layers.Activation('relu')(X)
    X = layers.Dropout(drop)(X)

    # residual connection
    X = Add()([X, X_shortcut])

    X_shortcut = X
    # we can think of this chunk as the input layer
    X = layers.Dense(1024, use_bias=True, kernel_initializer=tf.keras.initializers.he_normal(seed=None))(X)
    X = BatchNormalization()(X)
    X = layers.Activation('relu')(X)
    X = layers.Dropout(drop)(X)

    # we can think of this chunk as the hidden layer
    X = layers.Dense(1024, use_bias=True, kernel_initializer=tf.keras.initializers.he_normal(seed=None))(X)
    X = BatchNormalization()(X)
    X = layers.Activation('relu')(X)
    X = layers.Dropout(drop)(X)

    # residual connection
    X = tf.keras.layers.Add()([X, X_shortcut])

    # Last Layer
    X = X = layers.Dense(42, use_bias=True, kernel_initializer=tf.keras.initializers.he_normal(seed=None))(X)
    predictions = layers.Activation('linear')(X)
    model = tf.keras.Model(inputs=inp, outputs=predictions)
    return model


In [0]:
def unNormalizeData(normalized_data, data_mean, data_std, dimensions_to_ignore):
  """
  Un-normalizes a matrix whose mean has been substracted and that has been divided by
  standard deviation. Some dimensions might also be missing
  Args
    normalized_data: nxd matrix to unnormalize
    data_mean: np vector with the mean of the data
    data_std: np vector with the standard deviation of the data
    dimensions_to_ignore: list of dimensions that were removed from the original data
  Returns
    orig_data: the input normalized_data, but unnormalized
  """
  T = normalized_data.shape[0] # Batch size
  D = data_mean.shape[0] # Dimensionality

  orig_data = np.zeros((T, D), dtype=np.float32)
  dimensions_to_use = np.array([dim for dim in range(D)
                                if dim not in dimensions_to_ignore])

  orig_data[:, dimensions_to_use] = normalized_data

  # Multiply times stdev and add the mean
  stdMat = data_std.reshape((1, D))
  stdMat = np.repeat(stdMat, T, axis=0)
  meanMat = data_mean.reshape((1, D))
  meanMat = np.repeat(meanMat, T, axis=0)
  orig_data = np.multiply(orig_data, stdMat) + meanMat
  return orig_data

In [0]:
def prediction3D(sh_2d,model_path):
    # PreProcessing
    #sh_2d=sh_2d.reshape(sh_2d.shape[:-2] + (-1,))#####(30,16,2)to(30,32)
    SH_TO_GT_PERM = np.array([SH_NAMES.index(h) for h in H36M_NAMES if h != '' and h in SH_NAMES])
    sh_2d = sh_2d[:, SH_TO_GT_PERM, :]
    # Reshape into n x (32*2) matrix
    sh_2d = np.reshape(sh_2d, [sh_2d.shape[0], -1])
    poses_final = np.zeros([sh_2d.shape[0], len(H36M_NAMES) * 2])
    dim_to_use_x = np.where(np.array([x != '' and x != 'Neck/Nose' for x in H36M_NAMES]))[0] * 2
    dim_to_use_y = dim_to_use_x + 1
    dim_to_use = np.zeros(len(SH_NAMES) * 2, dtype=np.int32)
    dim_to_use[0::2] = dim_to_use_x
    dim_to_use[1::2] = dim_to_use_y
    poses_final[:, dim_to_use] = sh_2d

    complete_train = copy.deepcopy(np.vstack(poses_final))
    data_mean = np.mean(complete_train, axis=0)
    data_std = np.std(complete_train, axis=0)
    dimensions_to_ignore=[]
    dimensions_to_use = np.where(np.array([x != '' and x != 'Neck/Nose' for x in H36M_NAMES]))[0]
    dimensions_to_use = np.sort(np.hstack((dimensions_to_use * 2, dimensions_to_use * 2 + 1)))
    dimensions_to_ignore = np.delete(np.arange(len(H36M_NAMES) * 2), dimensions_to_use)
    poses_final = poses_final[:, dimensions_to_use]
    mu = data_mean[dimensions_to_use]
    stddev = data_std[dimensions_to_use]
    encoder_inputs = np.divide((poses_final - mu), stddev)

    # Training
    model = Linear_Model(encoder_inputs, 0.5)#(30,32)
    model.compile(optimizer=tf.keras.optimizers.Adam(0.001),
                  loss='mse',  # mean squared error
                  metrics=['mean_absolute_error', 'mean_squared_error'])
    model=keras.models.load_model(model_path)
    predictions = model.predict(encoder_inputs)

    #PostProcessing
    enc_in = unNormalizeData(encoder_inputs, data_mean, data_std, dimensions_to_ignore)

    data_mean2 = np.load('./drive/My Drive/GP/weights/data_mean_3d.npy', allow_pickle=True)
    data_std2 = np.load('./drive/My Drive/GP/weights/data_std_3d.npy', allow_pickle=True)
    dimensions_to_ignore2 = np.load('./drive/My Drive/GP/weights/dim_to_ignore_3d.npy', allow_pickle=True)
    
    
    T = predictions.shape[0]  # Batch size
    D = 96  # Dimensionality
    orig_data = np.zeros((T, D), dtype=np.float32)
    dimensions_to_use = np.array([dim for dim in range(D) if dim not in dimensions_to_ignore2])
    orig_data[:, dimensions_to_use] = predictions
    # Multiply times stdev and add the mean
    stdMat = data_std2.reshape((1, D))
    stdMat = np.repeat(stdMat, T, axis=0)
    meanMat = data_mean2.reshape((1, D))
    meanMat = np.repeat(meanMat, T, axis=0)
    orig_data = np.multiply(orig_data, stdMat) + meanMat
    return orig_data,enc_in

In [0]:
path_to_video='./drive/My Drive/GP/Walking.60457274.mp4'
path_to_model='./drive/My Drive/GP/data/data/net_arch.json'
path_to_weights='./drive/My Drive/GP/data/data/weights_epoch96.h5'
model_path3D="./drive/My Drive/GP/weights/my_Finalmodel64batch201epoch.tfl"

frame_joints= predict_2dpose(path_to_video,path_to_model,path_to_weights)

POINTS3D,POINTS2D = prediction3D(frame_joints,model_path3D)


In [0]:
export_units = {}
for Frame in range(5):
  to_export = POINTS3D.tolist()[Frame]
  x,y,z = [[] for _ in range(3)]
  for o in range(0, len(to_export), 3):
      x.append(to_export[o])
      y.append(to_export[o+1])
      z.append(to_export[o+2])
  export_units[Frame]={}
  for jnt_index, (_x, _y, _z) in enumerate(zip(x,y,z)):
    export_units[Frame][jnt_index] = {"translate": [_x, _y, _z]}
print(export_units[4])

# Save JSON

In [0]:
with open(_out_file, 'w') as outfile:
  logger.info("exported maya json to {0}".format(_out_file))
  json.dump(export_units, outfile)