In [13]:
%reset

In [14]:
import tensorflow as tf
import tensorflow_hub as hub
import cv2
import numpy as np
import matplotlib.pyplot as plt
%matplotlib qt
from matplotlib.animation import FuncAnimation
import tarfile

In [15]:
# Load the MoveNet multipose model from the local SavedModel directory 'model'.
# The tfhub.dev loader is kept for reference (it stopped working for some reason):
# model = hub.load('https://tfhub.dev/google/movenet/multipose/lightning/1')
model = tf.saved_model.load('model') # No trailing '\\' so the path is not tied to the Windows separator
movenet = model.signatures['serving_default'] # Callable used for inference in get_outputs

In [16]:
# Optional: use an NVIDIA GPU for speed (uncomment the lines below if you have an NVIDIA GPU; leave them commented out if this was already run in the current session)
# gpus = tf.config.experimental.list_physical_devices('GPU')
# for gpu in gpus:
#     tf.config.experimental.set_memory_growth(gpu, True)

# Functions

In [17]:
# Mark every sufficiently confident keypoint on the live frame with a filled green dot
def draw_keypoints(frame, keypoints, confidence_threshold):
    """Draw a filled circle on `frame` for each keypoint scoring above the threshold.

    frame: image passed straight to cv2.circle (drawn on in place).
    keypoints: iterable of (y, x, confidence) triplets.
    confidence_threshold: a keypoint is drawn only if its confidence is strictly greater.
    """
    radius, green, filled = 6, (0,255,0), -1
    for row, col, score in keypoints:
        if not (score > confidence_threshold):
            continue # Not confident enough, leave the frame untouched here
        center = (int(col), int(row)) # OpenCV wants (x, y), keypoints are stored (y, x)
        cv2.circle(frame, center, radius, green, filled)

# Joint name -> keypoint index lookup (from the google colab on tfhub).
# Each joint's index is its position in the tuple below.
keypoint_dict = {
    joint: index
    for index, joint in enumerate((
        'nose',
        'left_eye',
        'right_eye',
        'left_ear',
        'right_ear',
        'left_shoulder',
        'right_shoulder',
        'left_elbow',
        'right_elbow',
        'left_wrist',
        'right_wrist',
        'left_hip',
        'right_hip',
        'left_knee',
        'right_knee',
        'left_ankle',
        'right_ankle',
    ))
}

# Skeleton definition (from the google colab on tfhub): each key is a pair of keypoint
# indices joined by a line, each value is a one-letter color code for that line.
# Note: the drawing code in this notebook unpacks the color but draws every line in a fixed color.
edges = {
    (start, end): color
    for start, end, color in (
        (0, 1, 'm'),
        (0, 2, 'c'),
        (1, 3, 'm'),
        (2, 4, 'c'),
        (0, 5, 'm'),
        (0, 6, 'c'),
        (5, 7, 'm'),
        (7, 9, 'm'),
        (6, 8, 'c'),
        (8, 10, 'c'),
        (5, 6, 'y'),
        (5, 11, 'm'),
        (6, 12, 'c'),
        (11, 12, 'y'),
        (11, 13, 'm'),
        (13, 15, 'm'),
        (12, 14, 'c'),
        (14, 16, 'c'),
    )
}

# Join pairs of confident keypoints with red line segments
def draw_connections(frame, keypoints, edges, confidence_threshold):
    """Draw one line on `frame` per edge whose two endpoints both exceed the threshold.

    frame: image passed straight to cv2.line (drawn on in place).
    keypoints: indexable collection of (y, x, confidence) triplets.
    edges: mapping of (index_a, index_b) -> color; the color value is not used here.
    confidence_threshold: both endpoint confidences must be strictly greater.
    """
    red, thickness = (0,0,255), 2
    for (start, end), _unused_color in edges.items():
        y_start, x_start, score_start = keypoints[start]
        y_end, x_end, score_end = keypoints[end]

        # Skip the segment unless both of its endpoints are trusted
        if not ((score_start > confidence_threshold) & (score_end > confidence_threshold)):
            continue
        cv2.line(frame, (int(x_start), int(y_start)), (int(x_end), int(y_end)), red, thickness)

# Render the skeleton of every detected person (the model returns the 6 highest-confidence people)
def loop_through_people(frame, outputs, edges, confidence_threshold):
    """Draw connections first, then keypoints, for each person in `outputs`.

    frame: image that is drawn on in place.
    outputs: iterable with one keypoint collection per person.
    edges: keypoint index pairs handed to draw_connections.
    confidence_threshold: minimum confidence forwarded to both drawing helpers.
    """
    for person in outputs:
        # Lines go down before dots so the dots end up on top
        draw_connections(frame, person, edges, confidence_threshold)
        draw_keypoints(frame, person, confidence_threshold)

# Get outputs from the Movenet Model and return them in frame pixel coordinates
def get_outputs(frame):
    """Run the MoveNet multipose model on one frame.

    frame: image array whose first two dimensions are (height, width).

    Returns a (6, 17, 3) float array: for each of 6 people, 17 keypoints as
    (y, x, confidence), with y and x expressed in pixels of `frame`.

    The frame is letterboxed to the model input size by tf.image.resize_with_pad,
    so the model's normalized coordinates refer to the padded image. The padding
    and scale are undone here; for a frame with the same 4:3 aspect ratio as the
    model input there is no padding and this reduces to multiplying by the frame size.
    """
    # Model input size: a multiple of 32 per model definition (4:3, scaled according to webcam)
    input_height, input_width = 192, 256

    # Letterbox to the model input size and cast to int32 per model definition
    img = tf.image.resize_with_pad(tf.expand_dims(frame, axis=0), input_height, input_width)
    img = tf.cast(img, dtype=tf.int32)

    # Implement model
    outputs = movenet(img)
    outputs = outputs['output_0'].numpy()[:,:,:51].reshape((6,17,3)) # Get keypoints with scores

    # Geometry of the letterbox: uniform scale plus centered padding (up to integer rounding)
    frame_height, frame_width = frame.shape[:2]
    scale = min(input_height/frame_height, input_width/frame_width)
    pad_y = (input_height - frame_height*scale)/2
    pad_x = (input_width - frame_width*scale)/2

    # Normalized padded-image coordinates -> model-input pixels -> frame pixels
    keypoints = np.empty(outputs.shape, dtype=np.float64)
    keypoints[:,:,0] = (outputs[:,:,0]*input_height - pad_y)/scale
    keypoints[:,:,1] = (outputs[:,:,1]*input_width - pad_x)/scale
    keypoints[:,:,2] = outputs[:,:,2] # Confidence scores are left as-is
    return keypoints

# Grab one live frame, run pose estimation on it, and show the annotated frame
def get_video(cap, edges, confidence_interval, flag):
    """Read a frame from `cap`, detect keypoints, draw them, and display the result.

    cap: capture object exposing read() -> (ok, frame), e.g. cv2.VideoCapture.
    edges: keypoint index pairs used to draw the skeleton.
    confidence_interval: minimum confidence for a keypoint/connection to be drawn.
    flag: 0 or 1; selects the window 'Movenet Multipose 1' or 'Movenet Multipose 2'.

    Returns whatever get_outputs returns for the frame.
    Raises AssertionError for any other flag and RuntimeError if no frame could be read.
    """
    assert flag == 0 or flag == 1
    ret, frame = cap.read()
    if not ret or frame is None:
        # Fail with a clear message instead of crashing later on a None frame
        raise RuntimeError(f'Could not read a frame from camera {flag + 1}')

    outputs = get_outputs(frame)

    loop_through_people(frame, outputs, edges, confidence_interval)
    cv2.imshow(f'Movenet Multipose {flag + 1}', frame) # Show live frame in the window for this camera

    return outputs

# Boolean mask of the keypoints that are confidently detected in both images
def get_output_boolean(outputs1, outputs2, confidence_interval):
    """Return True where a keypoint's confidence exceeds the threshold in both outputs.

    outputs1, outputs2: 3-D arrays whose last axis holds confidence at index 2.
    confidence_interval: confidence must be strictly greater than this value.

    Returns a boolean array shaped like the first two axes of the inputs.
    """
    return np.logical_and(
        outputs1[:,:,2] > confidence_interval,
        outputs2[:,:,2] > confidence_interval,
    )

# Get the undistorted points from the outputs of the Movenet Model
def get_undistorted(outputs, calib):
    """Remove lens distortion from every keypoint and return pixel coordinates.

    outputs: (people, keypoints, >=2) array whose last axis starts with (y, x).
    calib: mapping with 'mtx' (3x3 camera matrix) and 'dist' (distortion coefficients).

    Returns a float32 array of shape (people, keypoints, 2) in the same (y, x)
    order as the input.

    cv2.undistortPoints works on (x, y) points, so the columns are swapped before
    the call and swapped back afterwards; otherwise the distortion would be
    corrected about the wrong principal point.
    """
    outputs = np.asarray(outputs)
    mtx, dist = calib['mtx'], calib['dist']
    fx, fy = mtx[0,0], mtx[1,1]
    cx, cy = mtx[0,2], mtx[1,2]

    # Preallocate so any number of people/keypoints (including none) is handled
    undistorted = np.empty(outputs.shape[:2] + (2,), dtype=np.float32)
    for i, person in enumerate(outputs):
        # (y, x) -> (x, y), shaped (N, 1, 2) as OpenCV expects
        pts_xy = np.stack((person[:,1], person[:,0]), axis=-1).astype(np.float32).reshape(-1, 1, 2)
        normalized = cv2.undistortPoints(pts_xy, mtx, dist).reshape(-1, 2)
        # Re-project the normalized points to pixels and store them back as (y, x)
        undistorted[i,:,0] = normalized[:,1]*fy + cy
        undistorted[i,:,1] = normalized[:,0]*fx + cx
    return undistorted

# Calculate depth using triangulation between the two images
# Source: https://link.springer.com/article/10.1007/s11263-017-1036-4
def get_depth(undistorted1, undistorted2, baseline, calib):
    """Estimate per-keypoint depth from the horizontal disparity between two views.

    undistorted1, undistorted2: (people, keypoints, 2) arrays of (y, x) pixel
        coordinates; only the x column (index 1) is used.
    baseline: distance between the two cameras; the result is in the same unit.
    calib: mapping with 'mtx' (3x3 camera matrix); mtx[0,0] is used as the focal length.

    Returns a (people, keypoints) array: focal_length * baseline / |x1 - x2|.
    A zero disparity yields inf, as a plain NumPy division does.
    """
    focal_length = calib['mtx'][0,0]
    # Vectorized over all people and keypoints at once, so no fixed 6x17 layout is assumed
    disparity = np.abs(np.asarray(undistorted1)[:,:,1] - np.asarray(undistorted2)[:,:,1])
    return focal_length*baseline/disparity

# Return 3D coordinates based on World Coordinate unit vector multiplied by depth
def get_3d_coord(undistorted, depth, calib, outputs_bool):
    """Back-project pixel keypoints to 3D camera coordinates.

    undistorted: (people, keypoints, 2) array of (y, x) pixel coordinates.
    depth: (people, keypoints) array of depths.
    calib: mapping with 'mtx' (3x3 camera matrix).
    outputs_bool: (people, keypoints) boolean mask of keypoints to keep.

    Returns a (people, 3, keypoints) array of [x, y, z] coordinates. Keypoints
    that are masked out, or whose depth is not finite, are returned as exactly 0
    so that callers can test for 0 to detect a missing point.
    """
    undistorted = np.asarray(undistorted)
    depth = np.asarray(depth)

    # Homogeneous pixel vectors [x, y, 1] per keypoint (input is stored as [y, x])
    ones = np.ones(undistorted.shape[:2])
    pixels = np.stack((undistorted[:,:,1], undistorted[:,:,0], ones), axis=1) # (people, 3, keypoints)

    # Inverse camera matrix maps each pixel to a ray with z = 1; scaling by depth gives the 3D point
    world = np.linalg.inv(calib['mtx']) @ pixels
    with np.errstate(invalid='ignore'): # 0 * inf is discarded by the mask below
        coords = world*depth[:, np.newaxis, :]

    # Select instead of multiplying by the mask: inf/nan times False would give nan, not 0
    valid = np.asarray(outputs_bool, dtype=bool) & np.isfinite(depth)
    return np.where(valid[:, np.newaxis, :], coords, 0.0)

# Video Capture

In [18]:
# INPUTS
confidence_interval = 0.4 # Confidence threshold for keypoints; adjust depending on accuracy requirements
baseline = 6 # note your unit (inches)

def _load_calibration(path):
    """Read every array of a calibration .npz file into a plain dict and close the file.

    np.load returns a lazy archive for .npz files; copying the arrays out once
    avoids keeping the file open and re-reading it on each calib[...] lookup
    inside the live loop.
    """
    with np.load(path) as data:
        return {name: data[name] for name in data.files}

calib1 = _load_calibration('internal_webcam_calibration.npz') # Load in calib 1
calib2 = _load_calibration('external_webcam_calibration.npz') # Load in calib 2
outputs_bool = np.full((6,17), False) # Instantiate outputs_bool

In [19]:
# Webcam
from IPython.display import display, clear_output

cap1 = cv2.VideoCapture(1) # (may have to adjust x in cv2.VideoCapture(x) to proper webcam (0, 1, 2... etc.))
cap2 = cv2.VideoCapture(0) # Note which webcam is which

fig = plt.figure()
plt.style.use('fivethirtyeight')
ax = fig.add_subplot(projection='3d')

# try/finally guarantees the cameras and OpenCV windows are released even if a frame fails
try:
    if not (cap1.isOpened() and cap2.isOpened()):
        raise RuntimeError('Could not open both webcams; check the indices passed to cv2.VideoCapture')

    # Both cameras are needed for triangulation, so stop as soon as either one closes
    while cap1.isOpened() and cap2.isOpened():
        outputs1 = get_video(cap1, edges, confidence_interval, 0)
        outputs2 = get_video(cap2, edges, confidence_interval, 1)

        # Return 3D coordinates in each camera coordinates
        outputs_bool = get_output_boolean(outputs1, outputs2, confidence_interval)
        undistorted1 = get_undistorted(outputs1, calib1)
        undistorted2 = get_undistorted(outputs2, calib2)
        depth1 = get_depth(undistorted1, undistorted2, baseline, calib1)
        depth2 = get_depth(undistorted1, undistorted2, baseline, calib2)
        coords1 = get_3d_coord(undistorted1, depth1, calib1, outputs_bool)
        coords2 = get_3d_coord(undistorted2, depth2, calib2, outputs_bool)

        # Plot 3D coordinates of the first person (camera 1 frame) using Matplotlib
        ax.cla()
        ax.plot(coords1[0,0,:], coords1[0,2,:], -coords1[0,1,:], 'xb') # reorient for correct visualization
        for edge, color in edges.items(): # Loop through every coordinate and connect points like in draw_connections
            p1, p2 = edge
            x1, y1, z1 = coords1[0,:,p1]
            x2, y2, z2 = coords1[0,:,p2]
            if x1 != 0 and x2 != 0: # A coordinate of 0 marks a point that was masked out
                ax.plot([x1, x2], [z1, z2], [-y1, -y2], '-r')
        ax.set_xlabel('X'); ax.set_ylabel('Z'); ax.set_zlabel('Y')
        ax.set_xlim([-30, 30]); ax.set_ylim([0, 50]); ax.set_zlim([-30, 30]) # Set axes of graph
        clear_output(wait = True)
        plt.pause(0.0001)

        if cv2.waitKey(10) & 0xFF==ord('q'): # Press q to quit live stream
            break
finally:
    cap1.release()
    cap2.release()
    cv2.destroyAllWindows()