# Import pose estimation model

## Define output format

Let's load the JSON file that describes the human pose task.  It is in COCO format: it is the category descriptor pulled from the annotations file, modified slightly to add a neck keypoint.  We will use this task description JSON to create a topology tensor, which is an intermediate data structure that describes the part linkages, as well as which channels in the part affinity field each linkage corresponds to.

In [1]:
import os
# NOTE(review): presumably this must be set before any package imports
# Matplotlib (the cell output shows the font cache being built) — confirm.
os.environ['MPLCONFIGDIR'] = os.getcwd() + "/configs/" # Specify MatplotLib config folder

import json
import numpy as np
# Requires https://github.com/NVIDIA-AI-IOT/trt_pose
import trt_pose.coco
from trt_pose.draw_objects import DrawObjects
from trt_pose.parse_objects import ParseObjects

# Load the COCO-style category descriptor of the human pose task.
with open('human_pose.json', 'r') as f:
    human_pose = json.load(f)

# Topology built from the category descriptor; it is shared by the parser
# and the drawer below so both agree on the skeleton definition.
topology = trt_pose.coco.coco_category_to_topology(human_pose)

# Callables used later in this file: parse_objects(cmap, paf) and
# draw_objects(image, counts, objects, peaks).
parse_objects = ParseObjects(topology)
draw_objects = DrawObjects(topology)

Matplotlib is building the font cache; this may take a moment.


## Import TensorRT optimized model

Next, we'll load our model. It has been optimized in another notebook and saved, so we do not need to perform the optimization again; we can just load it. Please note that TensorRT applies device-specific optimizations, so an optimized model can only be used on similar platforms.

In [2]:
import torch
# Requires https://github.com/NVIDIA-AI-IOT/torch2trt
from torch2trt import TRTModule

# File holding the state dict of the TensorRT-optimized pose model.
OPTIMIZED_MODEL = 'resnet18_baseline_att_224x224_A_epoch_249_trt.pth'

model_trt = TRTModule()
# NOTE(review): torch.load unpickles the file, so only load model files that
# come from a trusted source.
model_trt.load_state_dict(torch.load(OPTIMIZED_MODEL))

<All keys matched successfully>

# Define video-processing pipeline

## Pre-process image for TRT_Pose

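Next, let's define a function that will preprocess the image, which is originally in BGR8 / HWC format. It is converted to the layout the Torch model expects: an RGB, CHW float tensor on the GPU, normalized per channel, with a leading batch dimension.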

In [3]:
import cv2
import torchvision.transforms as transforms
import PIL.Image

# Per-channel (R, G, B) offset and scale applied in preprocess().
# NOTE(review): these look like the usual ImageNet statistics — confirm
# against the configuration the pose model was trained with.
mean = torch.Tensor([0.485, 0.456, 0.406]).cuda()
std = torch.Tensor([0.229, 0.224, 0.225]).cuda()
# Device on which preprocess() places the input tensor.
device = torch.device('cuda')

def preprocess(image):
    """Convert one BGR / HWC frame into a normalized batch tensor.

    The frame is converted to RGB, turned into a tensor with
    ``transforms.functional.to_tensor``, moved to the module-level ``device``
    and normalized per channel with the module-level ``mean`` and ``std``.

    Args:
        image: Frame as a NumPy array in BGR channel order (as returned by
            OpenCV decoding).

    Returns:
        The normalized tensor with an extra leading batch dimension.
    """
    # The module-level `device` is only read here; the previous version
    # re-bound it through `global` on every call, which was redundant.
    rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    tensor = transforms.functional.to_tensor(PIL.Image.fromarray(rgb)).to(device)
    # In-place normalization; mean/std are broadcast over height and width.
    tensor.sub_(mean[:, None, None]).div_(std[:, None, None])
    return tensor[None, ...]

## Access video feed

Access images streamed by a WiFi camera on the local network.

In [4]:
# Size (in pixels) of the frames returned by fetch_image().
WIDTH_INPUT, HEIGHT_INPUT = 224, 224 # Imposed by the model

import ipywidgets
from IPython.display import display
import urllib.request

# Output widget; the main loop writes JPEG-encoded frames to `image_w.value`.
image_w = ipywidgets.Image(format='jpeg')

display(image_w)

# HTTP endpoints of the cameras on the local network. Only url_esp32 is used
# by the main loop in this file.
url_esp32 = 'http://192.168.0.163/capture'
url_IPcam = 'http://192.168.0.244:8080/photo.jpg'
def fetch_image(url):
    """Download one image from *url* and return a centered model-sized crop.

    The image is resized so that its shorter side matches the model input
    size (keeping the aspect ratio), then the longer side is center-cropped
    so the result is HEIGHT_INPUT rows by WIDTH_INPUT columns.

    Args:
        url: HTTP address that returns a single encoded image.

    Returns:
        The cropped image as a NumPy array, as decoded by OpenCV.

    Raises:
        ValueError: If the downloaded payload cannot be decoded as an image.
        urllib.error.URLError: If the request itself fails.
    """
    # Close the HTTP response deterministically instead of leaking it.
    with urllib.request.urlopen(url) as response:
        payload = response.read()

    img = cv2.imdecode(np.frombuffer(payload, dtype=np.uint8), cv2.IMREAD_UNCHANGED)
    if img is None:
        raise ValueError("Could not decode an image from {}".format(url))

    # Only the first two dimensions are needed; this also tolerates images
    # that are not decoded with exactly three channels.
    img_height, img_width = img.shape[:2]

    if img_width > img_height:
        # Landscape: fit the height, then crop the horizontal center.
        # Integer arithmetic avoids float truncation producing a short side.
        scaled_width = HEIGHT_INPUT * img_width // img_height
        img = cv2.resize(img, (scaled_width, HEIGHT_INPUT), interpolation=cv2.INTER_AREA)
        left = img.shape[1] // 2 - WIDTH_INPUT // 2
        img = img[:, left:left + WIDTH_INPUT]
    else:
        # Portrait or square: fit the width, then crop the vertical center.
        scaled_height = WIDTH_INPUT * img_height // img_width
        img = cv2.resize(img, (WIDTH_INPUT, scaled_height), interpolation=cv2.INTER_AREA)
        # The crop must be centered on half the height; using the full height
        # here kept only the bottom HEIGHT_INPUT // 2 rows.
        top = img.shape[0] // 2 - HEIGHT_INPUT // 2
        img = img[top:top + HEIGHT_INPUT, :]
    return img

Image(value=b'', format='jpeg')

## Get keypoints with TRT-Pose

In [5]:
def get_keypoints(counts, objects, peak, indexBody=0):
    """Collect the keypoint coordinates of one detected body.

    Args:
        counts: Detection counts returned by the parser. Currently unused:
            no check is made that ``indexBody`` refers to a detected body.
        objects: Per-body table where ``objects[0][indexBody][j]`` is the
            index of the peak used for keypoint ``j``, or a negative value
            when that keypoint was not found.
        peak: Peak locations, indexed as ``peak[0][j][k]`` and giving
            ``(height, width)`` for peak ``k`` of keypoint ``j``.
        indexBody: Index of the body to extract.

    Returns:
        A NumPy array with one ``[width, height]`` row per keypoint;
        keypoints that were not found are ``[None, None]``.
    """
    human = objects[0][indexBody]
    kpoint = []
    for j in range(human.shape[0]):
        k = int(human[j])
        if k >= 0:
            # Read from the `peak` argument; the previous version read a
            # global named `peaks` and ignored the argument.
            location = peak[0][j][k]  # location[1]: width, location[0]: height
            kpoint.append([float(location[1]), float(location[0])])
        else:
            kpoint.append([None, None])
    return np.array(kpoint)

def get_cmap_paf(image):
    """Run the pose model on one frame and return its two outputs on the CPU.

    Args:
        image: Frame accepted by ``preprocess``.

    Returns:
        A ``(cmap, paf)`` pair, each detached from the graph and moved to
        the CPU.
    """
    cmap, paf = model_trt(preprocess(image))
    return cmap.detach().cpu(), paf.detach().cpu()

## Get label with pose-classification-kit

In [6]:
from tensorflow import keras
# NOTE(review): the next import resolves the standalone `keras` package, not
# the `tensorflow.keras` module bound above; `keras.utils.get_file` from the
# TensorFlow module may be the intended source — confirm against the
# installed versions.
from keras.utils.data_utils import get_file

# Fetch the pose-classification model; get_file returns the local file path.
classificationModelURL = "https://github.com/ArthurFDLR/pose-classification-kit/blob/master/pose_classification_kit/models/Body/CNN_BODY18_1/CNN_BODY18_1.h5?raw=true"
classificationModelPath = get_file(
    "CNN_BODY18_1",
    classificationModelURL
)
classificationModel = keras.models.load_model(classificationModelPath)
classificationModel.summary()

# Fetch the JSON file that lists the class labels of the model above.
classificationLabelsURL = "https://raw.githubusercontent.com/ArthurFDLR/pose-classification-kit/master/pose_classification_kit/models/Body/CNN_BODY18_1/class.json"
classificationLabelsPath = get_file(
    "CNN_BODY18_1_Info",
    classificationLabelsURL
)

# The main loop indexes this list with the argmax of the model prediction.
with open(classificationLabelsPath) as f:
    classificationLabels = json.load(f)['labels']
print("labels:", classificationLabels)

def getLengthLimb(data, keypoint1: int, keypoint2: int):
    """Return the distance between two keypoints, or 0 if either is missing.

    Args:
        data: 2-D NumPy array with one ``[x, y]`` row per keypoint; a missing
            keypoint has ``None`` as its x entry.
        keypoint1: Row index of the first keypoint.
        keypoint2: Row index of the second keypoint.

    Returns:
        The Euclidean distance between the two rows, or ``0`` when at least
        one of the keypoints is missing.
    """
    if data[keypoint1, 0] is None or data[keypoint2, 0] is None:
        return 0
    # Cast to float so the norm is computed numerically even when `data`
    # is an object array (which it is whenever some keypoints are None).
    delta = (data[keypoint1, 0:2] - data[keypoint2, 0:2]).astype(float)
    return np.linalg.norm(delta)

# Limbs used to estimate the body scale: (keypoint a, keypoint b, factor).
# Each measured limb length is multiplied by its factor before averaging.
# NOTE(review): the factors presumably encode relative body proportions —
# confirm against the pose-classification-kit training code.
_SCALE_LIMBS = (
    (6, 12, 16.0 / 5.2),   # Torso right
    (5, 11, 16.0 / 5.2),   # Torso left
    (0, 17, 16.0 / 2.5),   # Neck
    (12, 14, 16.0 / 3.6),  # Right thigh
    (14, 16, 16.0 / 3.5),  # Right lower leg
    (11, 13, 16.0 / 3.6),  # Left thigh
    (13, 15, 16.0 / 3.5),  # Left lower leg
)


def preprocess_keypoints(keypoints:np.ndarray):
    """Center and scale 18 body keypoints for the classification model.

    Detected keypoints are centered on their bounding box (with the y axis
    flipped), divided by a scale factor estimated from the limbs listed in
    ``_SCALE_LIMBS``, and missing keypoints are set to 0. The input array is
    left untouched.

    Args:
        keypoints: Array of shape ``(18, 2)`` with one ``[x, y]`` row per
            keypoint, where a missing keypoint has ``None`` entries; or
            ``None``.

    Returns:
        A new ``float32`` array of shape ``(18, 2)``, or ``None`` when the
        input is ``None``, when no reference limb is fully detected, or when
        any normalized coordinate falls outside ``[-1, 1]``.
    """
    if keypoints is None:
        return None
    assert keypoints.shape == (18,2)

    # A keypoint counts as detected when its x entry is not None.
    detected = np.array([k[0] is not None for k in keypoints], dtype=bool)
    if not detected.any():
        return None

    # Work on a float copy so the caller's array is never modified;
    # missing keypoints stay at 0.
    points = np.zeros((18, 2), dtype=np.float64)
    points[detected] = keypoints[detected].astype(np.float64)

    # Center on the bounding box of the detected keypoints. The true min and
    # max are used, so negative coordinates are handled as well.
    found = points[detected]
    center = (found.min(axis=0) + found.max(axis=0)) / 2.
    points[detected, 0] = found[:, 0] - center[0]
    points[detected, 1] = center[1] - found[:, 1]  # flip the y axis

    # Scale factor: mean of the weighted lengths of the limbs whose two
    # end points were both detected.
    lengths = np.array(
        [
            np.linalg.norm(points[a] - points[b]) * factor
            if detected[a] and detected[b] else 0.0
            for a, b, factor in _SCALE_LIMBS
        ]
    )
    lengths = lengths[lengths > 0.0]
    if lengths.size == 0:
        return None

    points /= np.mean(lengths)

    # Reject poses whose normalized coordinates leave the expected range.
    if np.any((points > 1.0) | (points < -1.0)):
        return None

    return points.astype('float32')

Model: "CNN_BODY18_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv1d_3 (Conv1D)            (None, 16, 16)            112       
_________________________________________________________________
dropout_8 (Dropout)          (None, 16, 16)            0         
_________________________________________________________________
conv1d_4 (Conv1D)            (None, 14, 32)            1568      
_________________________________________________________________
dropout_9 (Dropout)          (None, 14, 32)            0         
_________________________________________________________________
conv1d_5 (Conv1D)            (None, 12, 32)            3104      
_________________________________________________________________
dropout_10 (Dropout)         (None, 12, 32)            0         
_________________________________________________________________
flatten_2 (Flatten)          (None, 384)              

# Main Processing loop

- Read image
- Infer keypoints
- Infer label
- Send MQTT update
- Draw skeleton on the input image
- Update in output window.

In [8]:
try:
    while True:
        # 1. Grab the latest frame from the ESP32 camera endpoint
        image = fetch_image(url_esp32)

        # 2. Pose estimation, then normalization of the first body's keypoints
        cmap, paf = get_cmap_paf(image)
        counts, objects, peaks = parse_objects(cmap, paf)
        keypoints = preprocess_keypoints(get_keypoints(counts, objects, peaks))

        # 3. Pose classification, skipped when normalization returned None
        if keypoints is None:
            label_pose = None
        else:
            prediction = classificationModel.predict(x=np.array([keypoints]))
            label_pose = classificationLabels[np.argmax(prediction)]

        # 4. Draw the skeleton and the label, then refresh the output widget
        draw_objects(image, counts, objects, peaks)
        if label_pose:
            image = cv2.putText(
                image, label_pose, (10,20), cv2.FONT_HERSHEY_SIMPLEX,
                .7, (255, 0, 0), 1, cv2.LINE_AA,
            )
        image_w.value = cv2.imencode('.jpg', image)[1].tobytes()

except KeyboardInterrupt:
    # Interrupting the kernel is the intended way to stop the loop
    print('Video processing stopped')

Video processing stopped
