# 3D skeleton tracking

Here we will create a 3d skeleton using OpenPifPaf to have the coordinates in the image coordinates and use the depth capacities of the intel Realsens d455 to add depth (3rd dimension).

Note: it's fairly important to use the included usb cable as others cables mignt not be recognized as usb 3.x. This is required in order to have everything working out smoothly.

In [1]:
import io
import numpy as np
import PIL
import torch
import openpifpaf
import matplotlib.pyplot as plt
import matplotlib.cm as cm
from timeit import timeit
from time import time
import os
import cv2
import pyrealsense2 as rs
import math as m

  from .autonotebook import tqdm as notebook_tqdm


#### check version and available hardware

In [2]:
print('OpenPifPaf version', openpifpaf.__version__)
print('PyTorch version', torch.__version__)

OpenPifPaf version 0.13.11
PyTorch version 1.13.1


In [3]:
print(torch.cuda.is_available())
print(torch.cuda.get_device_name())

True
NVIDIA GeForce RTX 3050 Laptop GPU


#### Openpifpaf predictor

In [4]:
predictor = openpifpaf.Predictor(checkpoint='shufflenetv2k16') #could use mobilenetv3small for betterperformances

#### Magic numbers and dicts

#### Utils

In [5]:
def unit_vector(vector):
    """ Returns the unit vector of the vector.  """
    return vector / np.linalg.norm(vector)

def compute_angle(shoudler, hip, elbow):
    """ Returns the angle in radiants between of the arm defined by its shoudler, hip and elbow"""
    v1 = unit_vector(hip - shoudler)
    v2 = unit_vector(elbow - shoudler)
    return np.arccos(np.clip(np.dot(v1, v2),-1.0,1.0))*360/(2*np.pi)

  
def unit_vector(vector):
    """ Returns the unit vector of the vector.  """
    return vector / np.linalg.norm(vector)

def compute_angle(shoudler, hip, elbow):
    """ Returns the angle in radiants between of the arm defined by its shoudler, hip and elbow"""
    v1 = unit_vector(hip - shoudler)
    v2 = unit_vector(elbow - shoudler)
    return np.arccos(np.clip(np.dot(v1, v2),-1.0,1.0))*360/(2*np.pi)

def compute_arms_angle(prediction):
    """ Returns the angle of left and right arm wrt to the torso (return nan if the arm is not seen) """
    
    pos_dict = {
        "nose": 0,
        "left_eye": 1,
        "right_eye": 2,
        "left_ear": 3,
        "right_ear": 4,
        "left_shoudler": 5,
        "right_shoudler": 6,
        "left_elbow": 7,
        "right_elbow": 8,
        "left_wrist": 9,
        "right_wrist": 10,
        "left_hip": 11,
        "right_hip": 12,
        "left_knee": 13,
        "right_knee": 14,
        "left_ankle": 15,
        "right_ankle": 16}
    
    # we suppose only on person on the image (index = 0)
    keypoints = predictions[0].data

    # left arm
    left_shoulder = keypoints[pos_dict["left_shoudler"],0:2]
    left_hip = keypoints[pos_dict["left_hip"],0:2]
    left_elbow = keypoints[pos_dict["left_elbow"],0:2]

    # right arm
    right_shoulder = keypoints[pos_dict["right_shoudler"],0:2]
    right_hip = keypoints[pos_dict["right_hip"],0:2]
    right_elbow = keypoints[pos_dict["right_elbow"],0:2]

    return(compute_angle(left_shoulder, left_hip, left_elbow), compute_angle(right_shoulder, right_hip, right_elbow))

def angles_to_command(left_angle, right_angle):
    """Returns the command corresponding to arms angle"""
    
    up_angle = 160
    horizontal_angle = 90
    down_angle = 20
    margin = 15

    # backward angle = 160 +/- 10
    if (left_angle > up_angle-margin and left_angle < up_angle+margin) and (right_angle > up_angle-margin and right_angle < up_angle+margin):
        return "backward"
    # forward angle = 90 +/- 10
    if (left_angle > horizontal_angle-margin and left_angle < horizontal_angle+margin) and (right_angle > horizontal_angle-margin and right_angle < horizontal_angle+margin):
        return "forward"
    # left = left: 90 +/- 10 right: 20 +/- 10
    if (left_angle > horizontal_angle-margin and left_angle < horizontal_angle+margin) and (right_angle > down_angle-margin and right_angle < down_angle+margin):
        return "left"
    # right = left: 20 +/- 10 right: 90 +/- 10
    if (left_angle > down_angle-margin and left_angle < down_angle+margin) and (right_angle > horizontal_angle-margin and right_angle < horizontal_angle+margin):
        return "right"
    else:
        return "stop"

In [6]:
def cart2sph(x,y,z):
    XsqPlusYsq = x**2 + y**2
    r = m.sqrt(XsqPlusYsq + z**2)               # r
    elev = m.atan2(z,m.sqrt(XsqPlusYsq))     # theta
    az = m.atan2(y,x)                           # phi
    return r, elev, az

def rad2deg(rad):
    return rad*180/m.pi

In [7]:
RED = (0, 0, 255)
GREEN = (0, 255, 0)
BLUE = (255, 0, 0)
YELLOW = (0, 255, 255)

pos_dict = {
        "nose": 0,
        "left_eye": 1,
        "right_eye": 2,
        "left_ear": 3,
        "right_ear": 4,
        "left_shoulder": 5,
        "right_shoulder": 6,
        "left_elbow": 7,
        "right_elbow": 8,
        "left_wrist": 9,
        "right_wrist": 10,
        "left_hip": 11,
        "right_hip": 12,
        "left_knee": 13,
        "right_knee": 14,
        "left_ankle": 15,
        "right_ankle": 16}



In [8]:
pipeline = rs.pipeline()
config = rs.config()

In [15]:
# Get device product line for setting a supporting resolution
pipeline_wrapper = rs.pipeline_wrapper(pipeline)
pipeline_profile = config.resolve(pipeline_wrapper)
device = pipeline_profile.get_device()
device_product_line = str(device.get_info(rs.camera_info.product_line))

In [14]:
print(device)

<pyrealsense2.device: Intel RealSense D455 (S/N: 141322250607  FW: 05.14.00.00  on USB3.2)>


In [11]:
found_rgb = False
for s in device.sensors:
    if s.get_info(rs.camera_info.name) == 'RGB Camera':
        found_rgb = True
        break
if not found_rgb:
    print("The demo requires Depth camera with Color sensor")
    exit(0)


In [16]:
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)

if device_product_line == 'L500':
    config.enable_stream(rs.stream.color, 960, 540, rs.format.bgr8, 30)
else:
    config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)


In [18]:
pipeline.stop()

In [17]:
np.set_printoptions(formatter={'float': lambda x: "{0:0.2f}".format(x)})

In [25]:
# Start streaming
#pipeline.stop()
cfg = pipeline.start(config)

debug = False
detection = True 

colorizer = rs.colorizer()

profile = cfg.get_stream(rs.stream.depth) # Fetch stream profile for depth stream
intr = profile.as_video_stream_profile().get_intrinsics() # Downcast to video_stream_profile and fetch intrinsics

while True:

    c = cv2.waitKey(1)
    if c == 27: # press escape to quit
        break

    # Wait for a coherent pair of frames: depth and color
    frames = pipeline.wait_for_frames()

    # Create alignment primitive with color as its target stream:
    align = rs.align(rs.stream.color)
    frameset = align.process(frames)

    # get aligned frames
    depth_frame = frameset.get_depth_frame()
    color_frame = frameset.get_color_frame()
    if not depth_frame or not color_frame:
        continue

    # Convert images to numpy arrays
    depth_image = np.asanyarray(depth_frame.get_data())
    color_image = np.asanyarray(color_frame.get_data())
    
    
    # ==== OPENPIFPAF =====
    if detection:
        # img = cv2.cvtColor(color_frame, cv2.COLOR_BGR2RGB)
        pil_img = PIL.Image.fromarray(color_image)
        frame = color_image

        predictions, _, _ = predictor.pil_image(pil_img)

        if debug:
            print(f"Angle of the left arm : {left_angle}\nAngle of the right arm : {right_angle}\n")

        try:
            left_angle, right_angle = compute_arms_angle(predictions)

            # writes desired command
            command = angles_to_command(left_angle, right_angle)
            # cv2.putText(img = frame, text=f"command: {command}", org = (0,60), fontFace=cv2.FONT_HERSHEY_TRIPLEX, fontScale=1, color=GREEN,thickness=3)
            
            # ==== skeleton drawing ====
            #draw arms and hips
            keypoints = predictions[0].data

            # left arm
            left_shoulder = keypoints[pos_dict["left_shoulder"],0:2]
            left_hip = keypoints[pos_dict["left_hip"],0:2]
            left_elbow = keypoints[pos_dict["left_elbow"],0:2]
            left_wrist = keypoints[pos_dict["left_wrist"],0:2]

            # right arm
            right_shoulder = keypoints[pos_dict["right_shoulder"],0:2]
            right_hip = keypoints[pos_dict["right_hip"],0:2]
            right_elbow = keypoints[pos_dict["right_elbow"],0:2]
            right_wrist = keypoints[pos_dict["right_wrist"],0:2]

            # cv2.line(frame, tuple(map(int, tuple(left_shoulder))), tuple(map(int, tuple(left_elbow))), RED, 5)
            # cv2.line(frame, tuple(map(int, tuple(left_shoulder))), tuple(map(int, tuple(left_hip))), YELLOW, 5)
            # cv2.line(frame, tuple(map(int, tuple(right_shoulder))), tuple(map(int, tuple(right_elbow))), RED, 5)
            # cv2.line(frame, tuple(map(int, tuple(right_shoulder))), tuple(map(int, tuple(right_hip))), YELLOW, 5)

            cv2.line(frame, tuple(map(int, tuple(right_elbow))), tuple(map(int, tuple(right_wrist))), RED, 5)
            # display_whole_skeleton(frame, keypoints)
            # ==== END skeleton drawing ====

            # get 3d coordinates of the left and right elbow
            left_elbow_3d = rs.rs2_deproject_pixel_to_point(intr, left_elbow, depth_frame.get_distance(int(left_elbow[0]), int(left_elbow[1])))
            right_elbow_3d = rs.rs2_deproject_pixel_to_point(intr, right_elbow, depth_frame.get_distance(int(right_elbow[0]), int(right_elbow[1])))

            #get 3d coordinates of the left and right wrists
            left_wrist_3d = rs.rs2_deproject_pixel_to_point(intr, left_wrist, depth_frame.get_distance(int(left_wrist[0]), int(left_wrist[1])))
            right_wrist_3d = rs.rs2_deproject_pixel_to_point(intr, right_wrist, depth_frame.get_distance(int(right_wrist[0]), int(right_wrist[1])))

            # compute vector from elbow to wrist
            left_forearm_vector = np.array(left_wrist_3d) - np.array(left_elbow_3d)
            right_forearm_vector = np.array(right_wrist_3d) - np.array(right_elbow_3d)

            right_forearm_vector[[1,2]] = right_forearm_vector[[2,1]] # swap y and z
            right_forearm_vector[2] = -right_forearm_vector[2] # invert z

            # compute length of the forearm
            left_forearm_length = np.linalg.norm(left_forearm_vector)
            right_forearm_length = np.linalg.norm(right_forearm_vector)

            # convert to spherical coordinates
            _, elevation, azimuth = cart2sph(*right_forearm_vector)
            elevation = rad2deg(elevation)
            azimuth = rad2deg(azimuth)

            cv2.putText(img = frame, text=f"elevation: {elevation:.1f}, azimuth: {azimuth:.1f}", org = (0,60), fontFace=cv2.FONT_HERSHEY_TRIPLEX, fontScale=0.5, color=GREEN,thickness=1)
            cv2.putText(img = frame, text=f"right forearm vector: {right_forearm_vector}", org = (0,120), fontFace=cv2.FONT_HERSHEY_TRIPLEX, fontScale=0.5, color=GREEN,thickness=1)

            # print(f"elevation: {rad2deg(elevation)}, azimuth: {rad2deg(azimuth)}", end = "\r")
            # print(f"right forearm vector: {right_forearm_vector}", end = "\r")

        except:
            cv2.putText(img = frame, text=f"no detection", org = (0,60), fontFace=cv2.FONT_HERSHEY_TRIPLEX, fontScale=1, color=GREEN,thickness=2)

    # ==== END OPENPIFPAF =====
    
    # Update color and depth frames:
    aligned_depth_frame = frameset.get_depth_frame()
    colorized_depth = np.asanyarray(colorizer.colorize(aligned_depth_frame).get_data())

    # Show the two frames together:
    images = np.hstack((color_image, colorized_depth))
    # plt.imshow(images)

    # Show images
    cv2.namedWindow('RealSense', cv2.WINDOW_NORMAL)
    cv2.imshow('RealSense', images)
    cv2.waitKey(1)

# Stop streaming
pipeline.stop()

# Release resources
cv2.destroyAllWindows()

  return vector / np.linalg.norm(vector)


In [None]:
rs.deproject_pixel_to_point()