# Pose Detection

In [7]:
import numpy as np
import cv2
import torch
from torch2trt import TRTModule
import torchvision.transforms as transforms
import PIL.Image
from trt_pose.draw_objects import DrawObjects
from trt_pose.parse_objects import ParseObjects


print("Loading Pose Detection Model")
OPTIMIZED_MODEL = 'resnet18_baseline_att_224x224_A_epoch_249_trt.pth'
model_trt = TRTModule()
model_trt.load_state_dict(torch.load(OPTIMIZED_MODEL))
print("Model Loaded")

print("Loading coco topology")
def get_topology():
    import trt_pose.coco
    import json
    with open('human_pose.json', 'r') as f:
        human_pose = json.load(f)

    return trt_pose.coco.coco_category_to_topology(human_pose)

topology = get_topology()

print("Initializing pose estimation pipeline")
#Preprocessing constants
WIDTH, HEIGHT = 224, 224
mean = torch.Tensor([0.485, 0.456, 0.406]).cuda()
std = torch.Tensor([0.229, 0.224, 0.225]).cuda()
device = torch.device('cuda')

#The actual pose estimation
parse_objects = ParseObjects(topology)
draw_objects = DrawObjects(topology)
def preprocess(image):
    global device
    device = torch.device('cuda')
    
    image = cv2.resize(image, (WIDTH, HEIGHT))
    image = PIL.Image.fromarray(image)
    image = transforms.functional.to_tensor(image).to(device)
    image.sub_(mean[:, None, None]).div_(std[:, None, None])
    return image[None, ...]

def draw_pose(input_img):
    W, H, _ = input_img.shape
    cmap, paf = model_trt(preprocess(input_img))
    cmap, paf = cmap.detach().cpu(), paf.detach().cpu()
    counts, objects, peaks = parse_objects(cmap, paf)#, cmap_threshold=0.15, link_threshold=0.15)
    draw_objects(input_img, counts, objects, peaks)
    
    return cv2.resize(input_img, (H, W))
print("All done!")

Loading Pose Detection Model
Model Loaded
Loading coco topology
Initializing pose estimation pipeline
All done!


In [8]:
#Camera streaming
import pyrealsense2 as rs

pipeline = rs.pipeline()
config = rs.config()
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)

# Create an align object
align_to = rs.stream.color
align = rs.align(align_to)

# Start streaming
pipeline.start(config)

try:
    while True:

        # Wait for a coherent pair of frames: depth and color
        frames = pipeline.wait_for_frames()
        # Align the depth frame to color frame
        aligned_frames = align.process(frames)

        # Get aligned frames
        depth_frame = aligned_frames.get_depth_frame() # aligned_depth_frame is a 640x480 depth image
        color_frame = aligned_frames.get_color_frame()

        if not depth_frame or not color_frame:
            continue

        # Convert images to numpy arrays
        depth_image = np.asanyarray(depth_frame.get_data())
        color_image = np.asanyarray(color_frame.get_data())
        
        #Do pose estimation
        color_image = draw_pose(color_image)

        #Show image somehow
        # Apply colormap on depth image (image must be converted to 8-bit per pixel first)
        depth_colormap = cv2.applyColorMap(cv2.convertScaleAbs(depth_image, alpha=0.03), cv2.COLORMAP_JET)

        # Stack both images horizontally
        images = np.hstack((color_image, depth_colormap))

        # Show images
        cv2.namedWindow('RealSense', cv2.WINDOW_AUTOSIZE)
        cv2.imshow('RealSense', images)
        cv2.waitKey(1)


finally:

    # Stop streaming
    pipeline.stop()


KeyboardInterrupt: 