### ***Real-time visual recognition of "aircraft ramp hand-signals" applied to UAVs in airport ground operations.***
Author: M.Á. de Frutos Carro
(MAD, JUN-2020)

A cornerstone of this project, the "Convolutional Pose Machines" optimized for use in JetsonNano, builds on the work described in:

- **RUN TensorRT_Pose_Estimation:**
Real-time pose estimation accelerated with NVIDIA TensorRT
    https://github.com/NVIDIA-AI-IOT/trt_pose

Before running this Notebook, make sure you have visited that repository and followed all the instructions described in it.

A key step, which you should only execute once, is to download the model (for this project we will use *resnet18_baseline_att_224x224_A*) and optimize it. Follow the instructions described in the repository demo Notebook.

In [None]:
import json
import torch
import trt_pose.coco
import cv2
import torchvision.transforms as transforms
import PIL.Image
import numpy as np
from torch2trt import TRTModule
from trt_pose.draw_objects import DrawObjects
from trt_pose.parse_objects import ParseObjects
import os
import time
import tensorflow as tf
from tensorflow.python.compiler.tensorrt import trt_convert as trt

# Load the pose description file and derive the topology from it with trt_pose's COCO helper.
with open('human_pose.json', 'r') as f:
    human_pose = json.load(f)

topology = trt_pose.coco.coco_category_to_topology(human_pose)

# Frame size; the same values are passed to the camera when it is created.
WIDTH = 224
HEIGHT = 224

# All-zeros CUDA tensor of shape (1, 3, HEIGHT, WIDTH).
# NOTE(review): not read again in the visible cells (main() binds its own local `data`);
# presumably a leftover from the model-optimization step - confirm before removing.
data = torch.zeros((1, 3, HEIGHT, WIDTH)).cuda()

# Weights file loaded into the TRTModule below.
OPTIMIZED_MODEL = 'resnet18_baseline_att_224x224_A_epoch_249_trt.pth'

model_trt = TRTModule()
model_trt.load_state_dict(torch.load(OPTIMIZED_MODEL))

# Per-channel mean / std used by preprocess() to normalize frames
# (presumably the ImageNet statistics - TODO confirm against the training setup).
mean = torch.Tensor([0.485, 0.456, 0.406]).cuda()
std = torch.Tensor([0.229, 0.224, 0.225]).cuda()
device = torch.device('cuda')

def preprocess(image):
    """Turn a BGR frame into a normalized, batched tensor on the CUDA device.

    The frame is converted from BGR to RGB, turned into a tensor with
    torchvision's ``to_tensor``, moved to the module-level ``device`` and
    normalized with the module-level ``mean`` / ``std``.

    Args:
        image: frame in BGR channel order, as accepted by ``cv2.cvtColor``
            (assumed to be an HxWx3 array - TODO confirm against the camera).

    Returns:
        The normalized tensor with an extra leading axis of size 1.
    """
    # The module-level `device` is only read here; it used to be re-declared
    # global and rebound to the same value on every single frame.
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    tensor = transforms.functional.to_tensor(PIL.Image.fromarray(rgb_frame)).to(device)
    # In-place normalization; mean/std are expanded over the two trailing axes.
    tensor.sub_(mean[:, None, None]).div_(std[:, None, None])
    return tensor[None, ...]

# Helpers built from the topology; main() calls the first on the network output
# and the second to draw the result onto the frame.
parse_objects = ParseObjects(topology)
draw_objects = DrawObjects(topology)

def get_keypoint(humans, hnum, peaks):
    """Collect the (x, y) position of every body part of one detected person.

    Args:
        humans: parsed objects; ``humans[0][hnum]`` holds, per body part, the
            index of the matching peak, or a negative value when the part
            was not found.
        hnum: index of the person to read. It is not validated here.
        peaks: peak coordinates; ``peaks[0][part][index]`` is read as
            ``(y, x)``.

    Returns:
        A list with one ``(x, y)`` float tuple per body part; parts that were
        not found are reported as ``(0.0, 0.0)``.
    """
    person = humans[0][hnum]
    points = []
    for part in range(person.shape[0]):
        peak_index = int(person[part])
        if peak_index < 0:
            # Body part not detected: keep the slot so the layout stays fixed.
            points.append((0.0, 0.0))
            continue
        y_x = peaks[0][part][peak_index]
        points.append((float(y_x[1]), float(y_x[0])))  # stored as (x, y)
    return points

**CAMERA:**

Based on JetCAM: an easy to use Python camera interface for NVIDIA Jetson.
https://github.com/NVIDIA-AI-IOT/jetcam

In [2]:
#Camera

# Full reset of the camera
# NOTE(review): the sudo password is hard-coded on the next line and ends up stored
# in the notebook; consider prompting for it or configuring passwordless restart instead.
!echo 'dlinano' | sudo -S systemctl restart nvargus-daemon && printf '\n'
# Check device number
!ls -ltrh /dev/video*

from jetcam.usb_camera import USBCamera
from jetcam.utils import bgr8_to_jpeg
# Create the USB camera with the same WIDTH/HEIGHT used for the model input and start it.
camera = USBCamera(width=WIDTH, height=HEIGHT, capture_fps=30)
camera.running = True

[sudo] password for dlinano: 
crw-rw----+ 1 root video 81, 0 Jun 19 10:25 /dev/video0


**CONTROL PANEL:**

In [3]:
# Options offered by the Category, Dataset and Model dropdowns.
CATEGORIES = ['STOP', 'AHEAD', 'RIGHT', 'LEFT', 'NONE']
DATASETS = ['N1', 'N2', 'N3', 'N4']
MODELS = ['DNN','Random Forest']

#Poses
# One list of captured keypoint sets per category; filled by the CAPTURE button.
stop = []
ahead = []
right = []
left = []
none = []

# Currently selected category: 0=STOP, 1=AHEAD, 2=RIGHT, 3=LEFT, 4=NONE.
state=0
# Currently selected classifier: 0=DNN, 1=Random Forest.
modelo_type=0

#Widget
import ipywidgets
from IPython.display import display

# Output area; only referenced from commented-out debug prints in the handlers.
out = ipywidgets.Output()

###Video
# Image widget that main() fills with the annotated JPEG frame.
image_w = ipywidgets.Image(format='jpeg',width=300, height=300)


###Dataset
dataset_widget = ipywidgets.Dropdown(options=DATASETS, description='Dataset')
category_widget = ipywidgets.Dropdown(options=CATEGORIES, description='Category')
# Read-only counter showing how many samples the selected category holds.
count_widget = ipywidgets.IntText(description='Count',disabled=True)

def dataset_dropdown_eventhandler(change):
    """Refresh the sample counter when the Dataset dropdown changes.

    The change payload itself is not used; the counter is recomputed for the
    currently selected category (module-level ``state``).
    """
    update_count(state)


def category_dropdown_eventhandler(change):
    """Track the category picked in the Category dropdown.

    Maps the new dropdown value to its numeric code, stores it in the
    module-level ``state`` and refreshes the sample counter. Unknown values
    leave ``state`` untouched; the counter is refreshed either way.

    Args:
        change: change notification; only its ``new`` attribute is read.
    """
    global state
    code_by_name = {'STOP': 0, 'AHEAD': 1, 'RIGHT': 2, 'LEFT': 3, 'NONE': 4}
    if change.new in code_by_name:
        state = code_by_name[change.new]
    update_count(state)

def update_count(s):
    """Show in ``count_widget`` how many samples the given category holds.

    Args:
        s: category code (0=STOP, 1=AHEAD, 2=RIGHT, 3=LEFT, 4=NONE). Any other
            value leaves the counter unchanged.
    """
    samples = {0: stop, 1: ahead, 2: right, 3: left, 4: none}.get(s)
    if samples is not None:
        count_widget.value = len(samples)
        
# React to dropdown changes, then stack the three dataset controls vertically.
dataset_widget.observe(dataset_dropdown_eventhandler, names='value')
category_widget.observe(category_dropdown_eventhandler, names='value')
dataset_opt=ipywidgets.VBox([dataset_widget, category_widget, count_widget])

###Capture/Save
capture_button = ipywidgets.Button(description='CAPTURE')
save_button = ipywidgets.Button(description='SAVE')

def on_capture_button_clicked(_):
    """Append the most recent keypoints to the list of the selected category.

    The keypoints come from the module-level ``key`` that ``main`` sets when a
    person is detected. ``key`` does not exist until that has happened at
    least once, so a click before the first detection is ignored instead of
    raising ``NameError``. The sample counter is refreshed in every case.

    Args:
        _: the clicked button (unused).
    """
    latest = globals().get('key')
    if latest is not None:
        target = {0: stop, 1: ahead, 2: right, 3: left, 4: none}.get(state)
        if target is not None:
            target.append(latest)
    update_count(state)
        
def on_save_button_clicked(_):
    """Save the samples of the selected category as a ``.npy`` file.

    The file is written to ``<dataset>/<category>_<dataset>.npy``, where
    ``<dataset>`` is the current value of the Dataset dropdown and
    ``<category>`` is the lower-case name of the selected category. The
    dataset directory is created when missing.

    Args:
        _: the clicked button (unused).
    """
    dataset = dataset_widget.value

    # exist_ok avoids the separate exists() check and the race between that
    # check and the directory creation.
    os.makedirs(dataset, exist_ok=True)

    samples_by_state = {
        0: ('stop', stop),
        1: ('ahead', ahead),
        2: ('right', right),
        3: ('left', left),
        4: ('none', none),
    }
    entry = samples_by_state.get(state)
    if entry is None:
        return

    prefix, samples = entry
    np.save(dataset + '/' + prefix + '_' + dataset + '.npy', np.asarray(samples))
    
# Hook the click handlers to the CAPTURE / SAVE buttons and lay them out side by side.
capture_button.on_click(on_capture_button_clicked)
save_button.on_click(on_save_button_clicked)

capture_buttons = ipywidgets.HBox([capture_button, save_button])

###Live
pause_button = ipywidgets.Button(description='PAUSE')
live_button = ipywidgets.Button(description='LIVE')
# Read-only field that main() updates with the measured frames per second.
fps_widget = ipywidgets.FloatText(description='FPS:', disabled=True)

def on_pause_button_clicked(_):
    """Pause the live view by removing every observer registered on the camera."""
    camera.unobserve_all()
     
def on_live_button_clicked(_):
    """Resume the live view by observing the camera's ``value`` with ``main``.

    Args:
        _: the clicked button (unused).
    """
    camera.observe(main, names='value')
        
# Hook the PAUSE / LIVE handlers and group the FPS field with both buttons.
pause_button.on_click(on_pause_button_clicked)
live_button.on_click(on_live_button_clicked)
live_buttons = ipywidgets.VBox([fps_widget, ipywidgets.HBox([pause_button, live_button])])


### Score+Predict
# RUN toggle enables classification in main(); OUT shows the predicted label.
run_model_button=ipywidgets.ToggleButton(value=False, description='RUN', icon='check')
pred_widget = ipywidgets.Text(description='OUT:',value='', disabled=True)
model_widget = ipywidgets.Dropdown(options=MODELS, description='Model')

# One vertical slider per category (indices 0-4, in CATEGORIES order) ...
score_widgets = []
for category in CATEGORIES:
    score_widget = ipywidgets.FloatSlider(min=0.0, max=1.0, 
                                          description=category, orientation='vertical')
    score_widgets.append(score_widget)
# ... plus a sixth slider (index 5) for the "NO HUMAN" case.
score_widget = ipywidgets.FloatSlider(min=0.0, max=1.0, description='NO HUMAN', orientation='vertical')
score_widgets.append(score_widget)

def model_dropdown_eventhandler(change):
    """Track the classifier picked in the Model dropdown.

    Stores 0 for 'DNN' and 1 for 'Random Forest' in the module-level
    ``modelo_type``; any other value leaves it untouched.

    Args:
        change: change notification; only its ``new`` attribute is read.
    """
    global modelo_type
    code_by_model = {'DNN': 0, 'Random Forest': 1}
    if change.new in code_by_model:
        modelo_type = code_by_model[change.new]

# Keep modelo_type in sync with the Model dropdown.
model_widget.observe(model_dropdown_eventhandler, names='value')

def update_modelo(clase, output):
    """Refresh the prediction text and the six score sliders.

    Behaviour by case:
      * RUN toggle off: text "NO PREDICTION", all six sliders set to 0.
      * ``clase == 5``: text "NO HUMAN", the five category sliders set to 0
        and the "NO HUMAN" slider set to 1.
      * ``clase`` in 0..4: text is the category name, the five category
        sliders take ``output[0][0..4]`` and the "NO HUMAN" slider is 0.
      * any other ``clase``: nothing is changed.

    Args:
        clase: predicted class code (0=STOP, 1=AHEAD, 2=RIGHT, 3=LEFT,
            4=NONE, 5=no person in the frame).
        output: scores; only ``output[0][0]`` .. ``output[0][4]`` are read,
            and only for classes 0..4.
    """
    if not run_model_button.value:
        pred_widget.value = "NO PREDICTION"
        for idx in range(6):
            score_widgets[idx].value = 0.0
        return

    if clase == 5:
        pred_widget.value = "NO HUMAN"
        for idx in range(5):
            score_widgets[idx].value = 0.0
        score_widgets[5].value = 1.0
        return

    label = {0: "STOP", 1: "AHEAD", 2: "RIGHT", 3: "LEFT", 4: "NONE"}.get(clase)
    if label is None:
        return

    pred_widget.value = label
    for idx in range(5):
        score_widgets[idx].value = output[0][idx]
    score_widgets[5].value = 0.0
        
# Right-hand panel: score sliders, predicted label, then model selector + RUN toggle.
score=ipywidgets.VBox([ipywidgets.HBox(score_widgets),pred_widget,ipywidgets.HBox([model_widget,run_model_button])])        

###ALL
# Full control panel: video/live/dataset/capture controls on the left, scores on the right.
all_widget = ipywidgets.HBox([ipywidgets.VBox([ipywidgets.HBox([image_w]),live_buttons,
                              dataset_opt,capture_buttons]), ipywidgets.VBox([score])])

**PREDICTION MODEL:**

In [4]:
# Load the converted (TensorRT-optimized) TensorFlow model and take its
# default serving signature; main() calls it through `infer`.
loaded = tf.saved_model.load('models/model_n1n2n3_v10')  # loading the converted model
infer = loaded.signatures["serving_default"]

# Alternative: plain Keras model (original note: "Too Low" - presumably too slow; TODO confirm)
#model = tf.keras.models.load_model('models/MAF_ariba_abajo.h5')

# Load the Random Forest model
import pickle
import warnings
# NOTE: this silences every warning for the rest of the session.
warnings.filterwarnings('ignore')
# The file is opened in a with-block so the handle is closed after loading.
# NOTE(review): pickle.load executes arbitrary code from the file; only load
# model files that come from a trusted source.
with open('models/rand_forest_v2.sav', 'rb') as rf_file:
    model_rf = pickle.load(rf_file)

**LIVE EXECUTION:**

In [5]:
def main(change):
    """Camera callback: run pose estimation on a frame and classify the pose.

    Steps:
      1. Preprocess the new frame and run it through ``model_trt``.
      2. Parse the result, draw it onto the frame and show the horizontally
         flipped frame in ``image_w``.
      3. When at least one person is found, store the keypoints of person 0
         in the module-level ``key``. If the RUN toggle is on and more than
         0.25 s passed since the previous prediction, classify the keypoints
         with the selected model; otherwise reuse the previous class.
      4. Update the FPS field and the score panel.

    Module-level names written: ``key``, ``output``, ``o_clase``,
    ``ellapsed``. The last three must already exist when the callback runs.

    Args:
        change: change notification from the camera; ``change['new']`` is the
            new frame.
    """
    global key, output, o_clase, ellapsed

    t = time.time()
    image = change['new']
    data = preprocess(image)
    cmap, paf = model_trt(data)
    cmap, paf = cmap.detach().cpu(), paf.detach().cpu()
    counts, objects, peaks = parse_objects(cmap, paf)#, cmap_threshold=0.15, link_threshold=0.15)
    draw_objects(image, counts, objects, peaks)  # draws onto `image` before it is displayed
    image_w.value = bgr8_to_jpeg(image[:, ::-1, :])  # flipped along the width axis

    if int(counts[0]) > 0:  # at least one person detected
        # Only the first detected person is used.
        key = get_keypoint(objects, 0, peaks)

        if run_model_button.value and (time.time() - ellapsed > 0.25):
            # Flatten the (x, y) pairs into a single row of 36 values.
            inkey = np.array(key, dtype=np.float32).reshape(1, 36)
            if modelo_type == 1:
                # NOTE(review): the code below indexes output[0][i], so predict()
                # must return one score row per sample - confirm for this model.
                output = model_rf.predict(inkey)  # Random Forest model
            else:
                output = infer(tf.constant(inkey, dtype=float))['LastLayer']  # TRT model

            clase = np.argmax(output)
            ellapsed = time.time()
            o_clase = clase
        else:
            # Too soon (or RUN is off): keep showing the previous class.
            clase = o_clase
    else:
        clase = 5  # no human in the frame

    fps = 1.0 / (time.time() - t)
    fps_widget.value = round(fps, 2)
    update_modelo(clase, output)
    
#main({'new': camera.value})

In [7]:
#Init
# Module-level state that main() reads and updates; it must exist before the
# camera observer below starts firing.
ellapsed=time.time()                     # time of the last prediction
output=np.array([0,0,0,0,0], ndmin=2)    # last scores, one row of five values
o_clase=5                                # last class; 5 is the "no human" code
update_count(0)

# Execute
# Call main() on every new camera frame and show the control panel.
camera.observe(main, names='value')
display(all_widget)

HBox(children=(VBox(children=(HBox(children=(Image(value=b'', format='jpeg', height='300', width='300'),)), VB…