# Import pose estimation model

## Define output format

Let's load the JSON file that describes the human pose task. It is in COCO format: it is the category descriptor pulled from the annotations file, modified slightly to add a neck keypoint. We will use this task description to create a topology tensor, an intermediate data structure that describes the part linkages as well as which channels of the part affinity field each linkage corresponds to.

In [None]:
import os
os.environ['MPLCONFIGDIR'] = os.getcwd() + "/configs/" # Specify the Matplotlib config folder

import json
import numpy as np
# Requires https://github.com/NVIDIA-AI-IOT/trt_pose
import trt_pose.coco
from trt_pose.draw_objects import DrawObjects
from trt_pose.parse_objects import ParseObjects

# Load the human pose task description (a COCO-style category descriptor).
with open('human_pose.json', 'r') as f:
    human_pose = json.load(f)

# Convert the category descriptor into the topology used by the helpers below.
topology = trt_pose.coco.coco_category_to_topology(human_pose)

# Both helpers are built from the same topology: parse_objects is called on the
# model output in the main loop, draw_objects on the frame to display.
parse_objects = ParseObjects(topology)
draw_objects = DrawObjects(topology)

## Import TensorRT optimized model

Next, we'll load our model. It has been optimized in another notebook and saved, so we do not need to perform the optimization again; we can just load it. Please note that TensorRT applies device-specific optimizations, so an optimized model can only be used on a similar platform.

In [None]:
import torch
# Requires https://github.com/NVIDIA-AI-IOT/torch2trt
from torch2trt import TRTModule

# File holding the saved state of the already-optimized model.
OPTIMIZED_MODEL = 'resnet18_baseline_att_224x224_A_epoch_249_trt.pth'

# Create an empty TRTModule and restore the saved state into it.
model_trt = TRTModule()
model_trt.load_state_dict(torch.load(OPTIMIZED_MODEL))

# Define video-processing pipeline

## Pre-process image for TRT_Pose

Next, let's define a function that preprocesses the image, which is originally in BGR8 / HWC format. It is converted to the default Torch format (normalized RGB, CHW, with a leading batch dimension).

In [None]:
import cv2
import torchvision.transforms as transforms
import PIL.Image

device = torch.device('cuda')
# Per-channel normalization constants, created directly on the target device.
# NOTE(review): presumably the ImageNet statistics the model was trained
# with -- confirm against the training configuration.
mean = torch.tensor([0.485, 0.456, 0.406], device=device)
std = torch.tensor([0.229, 0.224, 0.225], device=device)


def preprocess(image):
    """Convert a BGR / HWC image into a normalized, batched tensor on ``device``.

    Args:
        image: Image array in BGR channel order and HWC layout, as produced
            by OpenCV.

    Returns:
        A tensor with a leading batch dimension of 1, normalized per channel
        with the module-level ``mean`` and ``std``.
    """
    # The module-level ``device`` is only read here; it is no longer rebound
    # on every call.
    rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    tensor = transforms.functional.to_tensor(PIL.Image.fromarray(rgb)).to(device)
    # Normalize in place; the [:, None, None] indexing broadcasts the
    # per-channel constants over the two spatial dimensions.
    tensor.sub_(mean[:, None, None]).div_(std[:, None, None])
    return tensor[None, ...]

## Access video feed

Access images streamed by a WiFi camera on the local network.

In [None]:
WIDTH, HEIGHT = 224, 224 # Imposed by the model

import ipywidgets
from IPython.display import display
import urllib.request

# Image widget that the main loop updates with JPEG-encoded frames.
image_w = ipywidgets.Image(format='jpeg')

display(image_w)

# Capture endpoint of the WiFi camera on the local network.
url_esp32 = 'http://192.168.0.163/capture'
def fetch_image(url):
    """Download one frame from ``url`` and resize it to the model input size.

    Args:
        url: HTTP endpoint that returns one encoded image per request.

    Returns:
        The decoded image as a NumPy array, resized to ``(WIDTH, HEIGHT)``.

    Raises:
        ValueError: If the response body cannot be decoded as an image.
    """
    # This runs once per frame in an endless loop, so the HTTP response must
    # be closed deterministically rather than left to the garbage collector.
    with urllib.request.urlopen(url) as response:
        payload = response.read()

    # frombuffer wraps the bytes without the extra copy a bytearray would make.
    encoded = np.frombuffer(payload, dtype=np.uint8)
    frame = cv2.imdecode(encoded, cv2.IMREAD_UNCHANGED)
    if frame is None:
        # imdecode reports failure by returning None instead of raising.
        raise ValueError('Could not decode an image from {}'.format(url))
    return cv2.resize(frame, (WIDTH, HEIGHT), interpolation=cv2.INTER_AREA)

## Get keypoints with TRT-Pose

In [None]:
def get_keypoints(counts, objects, peak, indexBody=0):
    """Collect the keypoint coordinates of one detected body.

    Args:
        counts: Detection counts returned by the parser. Currently unused;
            kept so existing callers keep working.
        objects: Indexed as ``objects[0][indexBody]`` to obtain, for every
            keypoint type, the index of its peak, or a negative value when
            that keypoint was not detected.
        peak: Indexed as ``peak[0][j][k]`` to obtain the (height, width)
            coordinates of the ``k``-th peak of keypoint type ``j``.
        indexBody: Index of the body to read from ``objects``.

    Returns:
        A dict mapping each keypoint index to ``[x, y]`` floats, or to
        ``[None, None]`` when the keypoint was not detected.
    """
    body = objects[0][indexBody]
    keypoints = {}
    for j in range(body.shape[0]):
        k = int(body[j])
        if k < 0:
            keypoints[j] = [None, None]
            continue
        # Read from the ``peak`` argument (not from a notebook global) and
        # keep it intact across iterations by using a separate local name.
        location = peak[0][j][k]  # location[1]: width, location[0]: height
        keypoints[j] = [float(location[1]), float(location[0])]
    return keypoints

def get_cmap_paf(image):
    """Run the pose model on one frame and return both outputs on the CPU.

    Args:
        image: Frame accepted by ``preprocess``.

    Returns:
        A ``(cmap, paf)`` tuple: the two model outputs, detached and moved
        to the CPU.
    """
    batch = preprocess(image)
    outputs = model_trt(batch)
    cmap, paf = outputs
    return cmap.detach().cpu(), paf.detach().cpu()

## Keypoints analysis

### Detect if left hand is raised

In [None]:
def isRaisingHand(poseKeypoints):
    """Tell whether the hand keypoint lies above the line through the shoulders.

    Args:
        poseKeypoints: Mapping from keypoint index to ``[x, y]``, where both
            values are ``None`` when the keypoint was not detected. Indices
            5, 6 and 10 are read.

    Returns:
        ``True`` when the hand's y coordinate is smaller than the y of the
        shoulder line at the hand's x position. ``False`` otherwise, including
        when a required keypoint is missing or the shoulders are vertically
        aligned (the line then has no finite slope).
    """
    # NOTE(review): in the standard COCO keypoint order index 10 is the right
    # wrist and 9 the left wrist -- confirm against human_pose.json that index
    # 10 is really the intended "left hand".
    left_hand_x, left_hand_y = poseKeypoints[10]
    right_shoulder_x, right_shoulder_y = poseKeypoints[6]
    left_shoulder_x, left_shoulder_y = poseKeypoints[5]

    coordinates = (
        left_hand_x, left_hand_y,
        right_shoulder_x, right_shoulder_y,
        left_shoulder_x, left_shoulder_y,
    )
    # Compare against None explicitly: 0.0 is a valid coordinate, not a
    # missing keypoint.
    if any(value is None for value in coordinates):
        return False

    shoulder_dx = right_shoulder_x - left_shoulder_x
    if shoulder_dx == 0:
        # Vertically aligned shoulders: the slope is undefined.
        return False

    slope = (right_shoulder_y - left_shoulder_y) / shoulder_dx
    intercept = right_shoulder_y - slope * right_shoulder_x
    return left_hand_y < slope * left_hand_x + intercept

## MQTT connection with IoT hub

In [None]:
import paho.mqtt.client as mqtt
# Create the MQTT client and connect it to the IoT hub broker.
# NOTE(review): the positional arguments are presumably host, port and
# keepalive in seconds -- confirm against the installed paho-mqtt version.
client = mqtt.Client()
client.connect('192.168.0.151', 1883, 60)

# Main Processing loop

- Read image
- Infer keypoints
- Detect raised left hands
- Send MQTT update
- Draw skeleton on the input image
- Update in output window.

In [None]:
# MQTT topics used by the processing loop.
STATUS_TOPIC = "devices/pose-estimation/jetson-nano-1/status"
POSTURES_TOPIC = "devices/pose-estimation/jetson-nano-1/postures-info"

try:
    client.publish(STATUS_TOPIC, "online")
    while True:
        # Video processing: fetch a frame, run the model, parse its outputs.
        image = fetch_image(url_esp32)
        cmap, paf = get_cmap_paf(image)
        counts, objects, peaks = parse_objects(cmap, paf)
        keypoints = get_keypoints(counts, objects, peaks)

        # Keypoints analysis
        label_pose = "No_Pose"
        if keypoints is not None and isRaisingHand(keypoints):
            label_pose = "RaisedHand"

        # Send MQTT update
        pose_info = {"PoseLabel": label_pose, "Attributes": {"Keypoints": keypoints}}
        client.publish(POSTURES_TOPIC, json.dumps(pose_info))

        # Display image locally: draw the skeleton on the frame, then show it
        # flipped horizontally.
        draw_objects(image, counts, objects, peaks)
        image_w.value = bytes(cv2.imencode('.jpg', image[:, ::-1, :])[1])
except KeyboardInterrupt:
    print('Video processing stopped')
finally:
    # Report "offline" however the loop ends, so that an unexpected error
    # (camera unreachable, undecodable frame, ...) does not leave the device
    # advertised as online. Such errors still propagate after this publish.
    client.publish(STATUS_TOPIC, "offline")