
在这个 notebook 中，我们会展示如何使用 JetBot 跟踪对象。我们将使用在 [COCO dataset](http://cocodataset.org) 网站上预先训练好的神经网络，以检测90种不同的常见对象。例如包括了

* 人 (index 0)
* 杯子 (index 47)
 
和许多其他的常见对象。你可以点击 [这里查看](https://github.com/tensorflow/models/blob/master/research/object_detection/data/mscoco_complete_label_map.pbtxt) 所有的对象。

该模型来自 [TensorFlow 对象检测 API](https://github.com/tensorflow/models/tree/master/research/object_detection)，它还提供自定义训练任务，当训练完成，我们就能使用 Jetson Nano 上的 NVIDIA TensorRT 对其进行优化。

这样做使得这个神经网络非常快，能在 Jetson 上实时执行。但是，这 notebook 并不包含所说的训练与优化步骤。



In [1]:
from IPython.display import display
import ipywidgets.widgets as widgets

detectWidth, detectHeight = 300, 300
poseWidth, poseHeight = 224, 224

colorNormal, colorTarget = (255, 0, 0), (0, 255, 0)
thickNormal, thickTarget = 2, 5

# layout = widgets.Layout(width='auto')#, height='40px')
image_widget = widgets.Image(format='jpeg', width=detectWidth, height=detectHeight)
target_label = widgets.Label(value='tracked target:')
target_text = widgets.Text(value='Nothing')
object_list = widgets.Label(value='')
tolerance_widget = widgets.FloatSlider(value=0.2, min=0.0, max=0.5, description='Deviation tolerance')
speed_widget = widgets.FloatSlider(value=0.4, min=0.0, max=1.0, description='speed')
turn_gain_widget = widgets.FloatSlider(value=0.4, min=0.0, max=1.0, description='turn gain')
error_widget = widgets.Label(value='')
start_button = widgets.Button(description = "Start")
capture_button = widgets.Button(description = "Capture Now")
restart_button = widgets.Button(description = "Stop tracking")
calibration_button = widgets.Button(description = "Do calibration")
calib_load_button = widgets.Button(description = "Load calibration")
calib_abandon_button = widgets.Button(description = "Abandon calibration")

下面初始化相机。我需要300x300的图像像素作为输入。所以，我们需要设置我们的摄像头。
 
> 在内部，Jetson Nano的图像信号处理是使用Carmera类的GStreamer实现的。这是超级快速的方式，大大地减少了CPU的计算量

In [2]:
from jetbot import Camera
camera = Camera.instance(width=detectWidth, height=detectHeight)

In [3]:
import numpy as np
import cv2
from jetbot import bgr8_to_jpeg

mtx_cal, dist_cal = None, None
last_init_img = None

def calibration_init(pic=None):
    global mtx_cal, dist_cal, last_init_img
    if pic is None:
        pic = camera.value
        last_init_img = np.copy(pic)
#     patternSize = (19, 12)
    patternSize = (4, 4)
    patternWasFound, corners = cv2.findChessboardCorners(pic, patternSize)
    # prepare object points, like (0,0,0), (1,0,0), (2,0,0), ...(6,5,0)
    objp = np.zeros((patternSize[0] * patternSize[1], 3), np.float32)
    objp[:, :2] = np.mgrid[0:patternSize[0], 0:patternSize[1]].T.reshape(-1, 2)

    # Arrays to store object points and image points from all the images.
    objpoints = [objp]  # 3d points in real world space
    imgpoints = [corners]  # 2d pionts in image plane.
    imgSize = (pic.shape[1], pic.shape[0])
    rst, mtx_cal, dist_cal, rvecs, tvecs = cv2.calibrateCamera(objpoints, imgpoints, imgSize, None, None)
    
def calibration_do(change):
    change['new'] = cv2.undistort(change['new'], mtx_cal, dist_cal)

In [4]:
# if last_init_img is not None:
#     cv2.imwrite('best_calib_img.jpg', last_init_img)

如果摄像头的视野中有任何COCO对象时，它们会存储在``detections``变量中。


初始化JetBot，这样我们就可以控制JetBot的电机了。

In [5]:
from jetbot import Robot

robot = Robot()

First, let's load the JSON file which describes the human pose task.  This is in COCO format, it is the category descriptor pulled from the annotations file.  We modify the COCO category slightly, to add a neck keypoint.  We will use this task description JSON to create a topology tensor, which is an intermediate data structure that describes the part linkages, as well as which channels in the part affinity field each linkage corresponds to.

In [6]:
import json
import trt_pose.coco
# import trt_pose

# print(trt_pose.__path__)
with open('human_pose.json', 'r') as f:
    human_pose = json.load(f)

topology = trt_pose.coco.coco_category_to_topology(human_pose)
# tag: left:[7, 9] right:[8, 10]
# tag in topology [6, 8] [7, 9]
# tag index 6. 7

Next, we'll load our model.  Each model takes at least two parameters, *cmap_channels* and *paf_channels* corresponding to the number of heatmap channels
and part affinity field channels.  The number of part affinity field channels is 2x the number of links, because each link has a channel corresponding to the
x and y direction of the vector field for each link.

In [7]:
import trt_pose.models

num_parts = len(human_pose['keypoints'])
num_links = len(human_pose['skeleton'])

model = trt_pose.models.resnet18_baseline_att(num_parts, 2 * num_links).cuda().eval()

We could then load the saved model using *torch2trt* as follows.

In [8]:
import torch
from torch2trt import TRTModule

OPTIMIZED_MODEL = 'resnet18_baseline_att_224x224_A_epoch_249_trt.pth'
model_trt = TRTModule()
model_trt.load_state_dict(torch.load(OPTIMIZED_MODEL))

<All keys matched successfully>

In [9]:
class DrawObjects(object):
    
    def __init__(self, topology):
        self.topology = topology
        
    def __call__(self, image, object_counts, objects, normalized_peaks):
        topology = self.topology
        height = image.shape[0]
        width = image.shape[1]
        
        count = int(object_counts[0])
        K = topology.shape[0]
        
        rst = []
        
        for i in range(count):
            obj = objects[0][i]
            C = obj.shape[0]
            for j in range(C):
                k = int(obj[j])
                if k >= 0:
                    peak = normalized_peaks[0][j][k]
                    x = round(float(peak[1]) * width)
                    y = round(float(peak[0]) * height)
                    if j == 8 or j == 10:
                        cv2.circle(image, (x, y), 3, colorTarget, thickTarget)
                    else:
                        cv2.circle(image, (x, y), 3, colorNormal, thickNormal)

            for k in range(K):
                c_a = topology[k][2]
                c_b = topology[k][3]
                if obj[c_a] >= 0 and obj[c_b] >= 0:
                    peak0 = normalized_peaks[0][c_a][obj[c_a]]
                    peak1 = normalized_peaks[0][c_b][obj[c_b]]
                    x0 = round(float(peak0[1]) * width)
                    y0 = round(float(peak0[0]) * height)
                    x1 = round(float(peak1[1]) * width)
                    y1 = round(float(peak1[0]) * height)
                    if k == 8:
                        cv2.line(image, (x0, y0), (x1, y1), colorTarget, thickTarget)
                        rst.append((peak0[1], peak0[0], peak1[1], peak1[0]))
                    else:
                        cv2.line(image, (x0, y0), (x1, y1), colorNormal, thickNormal)
        return rst

Next, we'll define two callable classes that will be used to parse the objects from the neural network, as well as draw the parsed objects on an image.

In [10]:
# from trt_pose.draw_objects import DrawObjects    # 因为要修改，所以复制到上面了
from trt_pose.parse_objects import ParseObjects

parse_objects = ParseObjects(topology)
draw_objects = DrawObjects(topology)

Finally, we'll define the main execution loop.  This will perform the following steps

1.  Preprocess the camera image
2.  Execute the neural network
3.  Parse the objects from the neural network output
4.  Draw the objects onto the camera image
5.  Convert the image to JPEG format and stream to the display widget

In [11]:
def multiPersonWarning():
    global modified
    modified = True
    error_widget.value += 'Warning: More than one person found in the image.\n'

# counts, objects, peaks = None
def executePose(change):
#     global counts, objects, peaks
    image = change['new']
    if 'output' not in change:
        change['output'] = np.copy(change['new'])
    output = change['output']
    
    data = preprocess(image)
    cmap, paf = model_trt(data)
    cmap, paf = cmap.detach().cpu(), paf.detach().cpu()
    counts, objects, peaks = parse_objects(cmap, paf)#, cmap_threshold=0.15, link_threshold=0.15)
    arms = draw_objects(output, counts, objects, peaks)
    if len(arms) > 1:
        multiPersonWarning()
    change['lines'] = arms
#     print(objects)
#     image_widget.value = bgr8_to_jpeg(image[:, ::-1, :])
    
#     print("(%.2f, %.2f), (%.2f, %.2f)" % (x0, y0, x1, y1))
# executePose({'new': camera.value})

### 计算单个摄像头图像的信号

我们要导入``ObjectDetector``类，它采用我们预先训练过的SSD引擎。

In [12]:
from jetbot import ObjectDetector

detectModel = ObjectDetector('ssd_mobilenet_v2_coco.engine')

with open('detection_target.json', 'r') as f:
    detection_targets = [s if not s.isnumeric() else 'unknown' for s in json.load(f)['name']]
print(detection_targets)

['background', 'person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus', 'train', 'truck', 'boat', 'traffic light', 'fire hydrant', 'unknown', 'stop sign', 'parking meter', 'bench', 'bird', 'cat', 'dog', 'horse', 'sheep', 'cow', 'elephant', 'bear', 'zebra', 'giraffe', 'unknown', 'backpack', 'umbrella', 'unknown', 'unknown', 'handbag', 'tie', 'suitcase', 'frisbee', 'skis', 'snowboard', 'sports ball', 'kite', 'baseball bat', 'baseball glove', 'skateboard', 'surfboard', 'tennis racket', 'bottle', 'unknown', 'wine glass', 'cup', 'fork', 'knife', 'spoon', 'bowl', 'banana', 'apple', 'sandwich', 'orange', 'broccoli', 'carrot', 'hot dog', 'pizza', 'donut', 'cake', 'chair', 'couch', 'potted plant', 'bed', 'unknown', 'dining table', 'unknown', 'unknown', 'toilet', 'unknown', 'tv', 'laptop', 'mouse', 'remote', 'keyboard', 'cell phone', 'microwave', 'oven', 'toaster', 'sink', 'refrigerator', 'unknown', 'book', 'clock', 'vase', 'scissors', 'teddy bear', 'hair drier', 'toothbrush']


在内部，``ObjectDetector``类使用 TensorRT Python API执行我们提供的引擎。
  
它还负责预处理神经网络的输入，以及分析检测到的对象。

现在这只适用于``jetbot.ssd_tensorrt``package创建的引擎。该软件包具有转换阻最佳的TensorFlow对象模型给TensorRT引擎


现在，让我们使用一些摄像头输入执行我们的神经网络。默认情况下``ObjectDetector`` 类生成 ``bgr8``格式。然而，如果你想使用不同的格式作为输入，则你可以覆盖默认的预处理功能。 


In [13]:
# detections = detectModel(camera.value)
# print(detections)

### 显示检测对象的名称

我们将使用下面的代码打印出检测到的对象。

In [14]:

# detections_widget = widgets.Textarea()

# detections_widget.value = str(detections)

# display(detections_widget)

你应该看到每个图像中检测到的每个标签，相似度和边界框。但在这个例子只有一个图像。


要打印第一个图像中检测到的第一个对象，我们可以调用一下内容

> 如果未检测到任何对象，则可能会抛出错误

In [15]:
# from jetbot import bgr8_to_jpeg
# import traitlets
# import ipywidgets.widgets as widgets
# image = widgets.Image(format='jpeg', width=300, height=300)

# display(image)
# camera_link = traitlets.dlink((camera, 'value'), (image, 'value'), transform=bgr8_to_jpeg)

### 控制Jetbot去跟踪中心目标

现在我们希望我们的JetBot能跟随指定的对象。 为此，我们将执行以下操作

1. 检测与指定匹配的对象

2. 选择最接近摄像机视野中心的物体，就是 "目标" 对象

3. 将机器人转向目标物体，否则徘徊

4. 如果我们被障碍物阻挡，则左转  


In [16]:
import torchvision
import torch.nn.functional as F

# collision_model = torchvision.models.alexnet(pretrained=False)
# collision_model.classifier[6] = torch.nn.Linear(collision_model.classifier[6].in_features, 2)
# collision_model.load_state_dict(torch.load('best_model.pth'))
device = torch.device('cuda')
# collision_model = collision_model.to(device)

mean = 255.0 * np.array([0.485, 0.456, 0.406])
stdev = 255.0 * np.array([0.229, 0.224, 0.225])
normalize = torchvision.transforms.Normalize(mean, stdev)

def preprocess(camera_value):
    global device, normalize
    x = camera_value
    x = cv2.resize(x, (poseWidth, poseHeight))
    x = cv2.cvtColor(x, cv2.COLOR_BGR2RGB)
    x = x.transpose((2, 0, 1))
    x = torch.from_numpy(x).float()
    x = normalize(x)
    x = x.to(device)
    x = x[None, ...]
    return x

最后，让我们显示所有的控件在网络上执行代码更新摄像头。

In [17]:
# bbox = (left, bottom, right, top) \in [0, 1]

def drawRect(image, bbox, color, thickness):
    cv2.rectangle(
        image,
        (int(detectWidth * bbox[0]), int(detectHeight * bbox[1])), 
        (int(detectWidth * bbox[2]), int(detectHeight * bbox[3])),
        color,
        thickness
    )

def multiTargetWarning():
    global modified
    modified = True
    error_widget.value += 'Warning: More than one target found in the image.\n'
    
def executeDetect(change):
    image = change['new']
    if 'output' not in change:
        change['output'] = np.copy(change['new'])
    output = change['output']
    
    # 计算所有被检测到的目标，不包含人
    detections = [det for det in detectModel(image)[0] if det['label'] != 1]
    # why [0] ? might be batch index, where batch size is 1
    
    # 将所有目标绘制在图像上
    for det in detections:
        drawRect(output, det['bbox'], colorNormal, thickNormal)
        
        
    if 'target' in change:    # True only in status 2
        matching_detections = [d for d in detections if d['label'] == change['target']]
        for match in matching_detections:
            drawRect(output, match['bbox'], colorTarget, thickTarget)
        if len(matching_detections) > 1:
            multiTargetWarning()
    change['detections'] = detections
#     robot.forward(float(speed_widget.value))

In [18]:
from numpy import dot
from numpy.linalg import norm

def detection_center(detection):
    """计算目标的xy中心坐标"""
    bbox = detection['bbox']
#     print("bbox:", bbox)
    center_x = (bbox[0] + bbox[2]) / 2.0
    center_y = (bbox[1] + bbox[3]) / 2.0
#     print("cx, cy:", center_x, center_y)
    return center_x, center_y

def cosineSimilarity(a, b):
    return dot(a, b) / (norm(a) * norm(b))

def executeFind(change):
    if len(change['lines']) == 0 or len(change['detections']) == 0:
        return
    
    image = change['new']
    if 'output' not in change:
        change['output'] = np.copy(change['new'])
    output = change['output']
    detections = change['detections']
    
#     x0, y0, x1, y1 = 200, 0, 200, 10
    x0, y0, x1, y1 = change['lines'][0]
    p0 = np.array([x0, y0])
    p1 = np.array([x1, y1])
    
#     print('--------')
#     print('p0', p0)
#     print('p1', p1)
    
    bestApproch = None
    bestSimilarity = float('-inf')
    for det in detections:
        p = np.array(detection_center(det))
#         print('centerxy:', p)
        similarity =  cosineSimilarity(p1 - p0, p - p0)
        if similarity > bestSimilarity:
            bestApproch = det
            bestSimilarity = similarity
#     print(bestSimilarity)
    if bestSimilarity > 0.8:
        change['target'] = bestApproch['label']
        drawRect(output, bestApproch['bbox'], colorTarget, thickTarget)
#     if bestApproch is None:
#         print(detections)
#     else:
#         cv2.imwrite('image.jpg', output)
#         shutdown()

In [19]:
def detection_size(detection):
    # 计算边界框的面积
    bbox = detection['bbox']
    return (bbox[2] - bbox[0]) * (bbox[3] - bbox[1])

def robotForward(speed):
    if 0 < speed <= 0.2:
        robot.forward(0.2)
        time.sleep(0.01)
    if -0.2 <= speed < 0:
        robot.forward(-0.2)
        time.sleep(0.01)
    robot.forward(speed)
    
def robotRight(speed):
    if 0 < speed <= 0.2:
        robot.right(0.2)
        time.sleep(0.01)
    if -0.2 <= speed < 0:
        robot.right(-0.2)
        time.sleep(0.01)
    robot.right(speed)
    
def executeFollow(change):
    # 执行碰撞模型以确定是否被阻止
#     collision_output = collision_model(preprocess(change['new'])).detach().cpu()
#     prob_blocked = float(F.softmax(collision_output.flatten(), dim=0)[0])
#     change['blocked'] = prob_blocked
#     if prob_blocked > 0.5:
#         pass
    
    targets = [det for det in change['detections'] if det['label'] == change['target']]
    if len(targets) == 0:
#         print('no target found')
        robot.stop()
        return
#         shutdown()
    center_x, center_y = detection_center(targets[0])    # ignore all other targets except the first one
    center_x, center_y = center_x - 0.5, center_y - 0.5  # move origin to the center of image
    size = detection_size(targets[0])
#     robot.set_motors(
#         0.5 * float(speed_widget.value + turn_gain_widget.value * center_x),
#         0.5 * float(speed_widget.value - turn_gain_widget.value * center_x)
#     )
    if abs(center_x) > tolerance_widget.value:
        robotRight(turn_gain_widget.value * center_x / 0.5)
    else:
        robotForward(speed_widget.value * (0.5 - size) / 0.5)
        

In [20]:
import traceback

def targetNotFoundError():
    global modified
    modified = True
    error_widget.value += 'ERROR: No target object found, please click the button when appropriate.\n'


def clearAllWarnings():
    error_widget.value = ''

status = -1
target = None    # target label
modified = False
doCalib = False

def execute(change):
    try:
        modified = False
        global status, target
    #     print('output' in change)    # always False
        if doCalib:
            calibration_do(change)
        if status == -1:
            image_widget.value = bgr8_to_jpeg(change['new'])
            return
        if status == 0 or status == 1:
            executePose(change)
            executeDetect(change)
            executeFind(change)
            if 'target' in change:
                target_text.value = detection_targets[change['target']]
            else:
                target_text.value = 'Nothing'
            if status == 1:
                if 'target' in change:
                    target = change['target']
                    status = 2
                else:
                    targetNotFoundError()
                    status = 0
        elif status == 2:
            change['target'] = target
            executeDetect(change)
            executeFollow(change)
        if not modified:
            clearAllWarnings()
        image_widget.value = bgr8_to_jpeg(change['output'])
        object_list.value = ' '.join([detection_targets[det['label']] for det in change['detections']])
    
    except:
        traceback.print_exc()
        shutdown()

`camera.observe()` 在另一个线程里监视摄像头并调用 `execute()` ，所以摄像头停止监视后，要等待一小段时间，等待 `execute()` 退出，再最终令电机停止。

In [21]:
import time

def shutdown():
    camera.unobserve_all()
    time.sleep(0.05)
    robot.stop()

In [22]:
def calib_click(sender):
    global doCalib
    calibration_init()
    doCalib = True
    
def calib_load_click(sender):
    global doCalib
    calibration_init(cv2.imread('best_calib_img.jpg'))
    doCalib = True
    
def abdcalib_click(sender):
    global doCalib
    doCalib = False
    mtx_cal, dist_cal = None, None
    
def start_click(sender):
    global status
    status = 0
    
def cap_click(sender):
    global status
    status = 1
    
def restart_click(sender):
    global status
    status = 0
    robot.stop()
    

calibration_button.on_click(calib_click)
calib_load_button.on_click(calib_load_click)
calib_abandon_button.on_click(abdcalib_click)
start_button.on_click(start_click)
capture_button.on_click(cap_click)
restart_button.on_click(restart_click)

In [23]:
display(widgets.VBox([
    widgets.HBox([
        image_widget, 
        widgets.VBox([
            widgets.HBox([calib_load_button, calibration_button, calib_abandon_button]),
            speed_widget,
            turn_gain_widget,
            tolerance_widget,
            widgets.HBox([start_button, capture_button, restart_button]),
            error_widget,
        ]),
    ]),
    widgets.HBox([target_label, target_text]),
    object_list,
]))

VBox(children=(HBox(children=(Image(value=b'', format='jpeg', height='300', width='300'), VBox(children=(HBox(…

调用下面的代码，将执行功能连接到每帧图像更新。

In [24]:
camera.unobserve_all()
camera.observe(execute, names='value')

In [25]:
# shutdown()

In [26]:
# robot.forward(0.2)
# time.sleep(0.01)
# robot.forward(0.0)

In [29]:
time.sleep(0.2)
robot.stop()