In [1]:
from IPython.display import display
import ipywidgets.widgets as widgets
from jetbot import bgr8_to_jpeg

# Preview image shown in the notebook and the numeric field used to pick the
# tracker ID that should be highlighted.
img_widget = widgets.Image(width=300, height=300, format='jpeg')
ID_widget = widgets.IntText(description='ID to track:', value=0)

# Frame dimensions as plain ints, read back from the preview widget so the
# drawing code and the widget always agree.
width, height = (int(dimension) for dimension in (img_widget.width, img_widget.height))

Error loading module `ublox_gps`: No module named 'serial'


In [2]:
from scipy.spatial import distance as dist
from collections import OrderedDict
import numpy as np
from trackable_object import TrackableObject


class PersonTracker:
    """Assign persistent integer IDs to person detections across frames.

    On every ``update`` the centers of the incoming bounding boxes are matched
    (closest pairs first) against the positions returned by
    ``TrackableObject.predict()`` for the persons tracked so far. A pair is
    only accepted when its distance does not exceed ``max_distance``.
    Detections that match nobody become new tracks; tracks without a detection
    have their ``disappeared_count`` increased and are dropped once it exceeds
    ``max_disappeared``.

    Args:
        max_disappeared: number of consecutive updates a person may go
            unmatched before being dropped (dropped when the count is
            strictly greater than this value).
        max_distance: largest accepted distance between a predicted position
            and a detection center, in the units of the box coordinates.
    """

    def __init__(self, max_disappeared=50, max_distance=80):

        self.max_disappeared = max_disappeared
        self.max_distance = max_distance

        # next ID to hand out; IDs are never reused
        self.nextID = 0
        # ID -> TrackableObject, in registration order
        self.to_dict = OrderedDict()

    def _register(self, person_center, box=None):
        """Start tracking a new person at ``person_center`` (x, y).

        When ``box`` is given it is stored on the new object so that ``box``
        is available from the very first frame, not only after the first
        successful match.
        """
        to = TrackableObject(*person_center, self.nextID)
        if box is not None:
            to.box = box
        self.to_dict[self.nextID] = to
        self.nextID += 1

    def _deregister(self, ID):
        """Stop tracking the person with the given ID."""
        del self.to_dict[ID]

    def _mark_disappeared(self, ID):
        """Count one missed frame for ``ID`` and drop it past the threshold."""
        to = self.to_dict[ID]
        to.disappeared_count += 1
        if to.disappeared_count > self.max_disappeared:
            self._deregister(ID)

    def update(self, boxes):
        """Feed the detections of one frame into the tracker.

        Args:
            boxes: sequence of bounding boxes ``(x_min, y_min, x_max, y_max)``;
                may be empty.

        Returns:
            ``self.to_dict`` itself (not a copy): the ID -> TrackableObject
            mapping after this update.
        """
        boxes = list(boxes)

        # No detections at all: every tracked person missed this frame.
        if len(boxes) == 0:
            # Iterate over a snapshot of the IDs, because _mark_disappeared
            # may delete entries and a dict must not change size while it is
            # being iterated.
            for ID in list(self.to_dict.keys()):
                self._mark_disappeared(ID)
            return self.to_dict

        # Center of every detection, one row per box (same order as `boxes`).
        new_centers = np.zeros((len(boxes), 2))
        for i, box in enumerate(boxes):
            new_centers[i] = _calculate_center(box)

        # Nothing tracked yet: every detection starts a new track.
        if len(self.to_dict) == 0:
            for i, box in enumerate(boxes):
                self._register(new_centers[i], box)
            return self.to_dict

        # Snapshot IDs and predicted positions in the same order, so that a
        # row index of D identifies a tracked person.
        IDs = list(self.to_dict.keys())
        predicted_centroids = [self.to_dict[ID].predict() for ID in IDs]

        # D[r, c] = distance between tracked person r and detection c
        D = dist.cdist(np.array(predicted_centroids), new_centers)

        # Visit tracked persons in order of their best (smallest) distance and
        # pair each with its closest detection.
        rows = D.min(axis=1).argsort()
        cols = D.argmin(axis=1)[rows]

        used_rows = set()
        used_cols = set()

        for (row, col) in zip(rows, cols):
            row, col = int(row), int(col)

            # each person and each detection can be matched only once
            if row in used_rows or col in used_cols:
                continue

            # too far away to be the same person
            if D[row, col] > self.max_distance:
                continue

            to = self.to_dict[IDs[row]]
            # Index the box by column rather than looking it up by centroid,
            # so two boxes sharing a center cannot be confused.
            to.box = boxes[col]
            # apply_kf presumably runs the Kalman-filter correction and returns
            # the filtered centroid -- see TrackableObject.
            to.centroid = to.apply_kf(new_centers[col])
            to.disappeared_count = 0

            used_rows.add(row)
            used_cols.add(col)

        # Both kinds of leftovers are handled on every frame: an unmatched
        # person has missed a frame and an unmatched detection is a new person,
        # regardless of whether persons or detections are more numerous.
        for row in sorted(set(range(D.shape[0])) - used_rows):
            self._mark_disappeared(IDs[row])

        for col in sorted(set(range(D.shape[1])) - used_cols):
            self._register(new_centers[col], boxes[col])

        return self.to_dict

def _calculate_center(bbox):
        center_x = int((bbox[0] + bbox[2]) / 2.0 - 0.5)
        center_y = int((bbox[1] + bbox[3]) / 2.0 - 0.5)
        return (center_x, center_y)

In [None]:
import cv2 as cv
from jetbot import ObjectDetector


class Follower:
    """Detect persons in a frame and keep persistent IDs for them.

    Each call to ``track`` runs the object detector, draws the person boxes
    onto the frame, feeds the boxes to a ``PersonTracker`` and remembers the
    tracked object whose ID is currently selected in ``ID_widget``.

    Attributes set by ``track``:
        detections: raw output of the detector for the last frame.
        trackable_objects: mapping ID -> tracked object returned by the tracker.
        tracked_to: tracked object whose ID equals ``ID_widget.value``, or
            ``None`` when that ID is not being tracked.
    """

    # Label value of the detections that are kept; presumably the COCO
    # "person" class of the SSD model -- TODO confirm against the label map.
    PERSON_LABEL = 1

    def __init__(self, model_path='ssd_mobilenet_v2_coco.engine'):
        """Create the detector from ``model_path`` and a fresh PersonTracker."""
        self.model = ObjectDetector(model_path)
        self.results = None
        self.detections = None
        self.trackable_objects = None
        self.tracked_to = None
        self.pt = PersonTracker()

    def _predict(self, img):
        """Run the detector on ``img`` and store its output in ``self.detections``."""
        self.detections = self.model(img)

    def _post_process(self):
        """Return the bounding boxes of all person detections of the last frame.

        Only the first entry of ``self.detections`` is read. Returns an empty
        list when no prediction has been stored yet.
        """
        if self.detections is None or len(self.detections) == 0:
            return []
        return [
            detection['bbox']
            for detection in self.detections[0]
            if detection['label'] == Follower.PERSON_LABEL
        ]

    def track(self, img):
        """Detect and track persons in ``img``; return the annotated frame.

        The person boxes are drawn onto ``img`` in place. The tracker receives
        the boxes converted to pixel coordinates, so ``box``/``centroid`` of
        the tracked objects are in pixels.
        """
        self._predict(img)

        boxes = self._post_process()

        for box in boxes:
            Follower._draw_boxes(img, *box, color=(61, 254, 96))

        # _draw_boxes scales the detector boxes by width/height, i.e. they are
        # normalized. The tracker's center computation truncates to integers,
        # which would collapse every normalized center to 0, so the boxes are
        # converted to pixels before tracking.
        pixel_boxes = [Follower._scale_box(box, width, height) for box in boxes]
        self.trackable_objects = self.pt.update(pixel_boxes)

        # read the widget once so the whole frame uses the same selection
        selected_id = ID_widget.value
        self.tracked_to = self.trackable_objects.get(selected_id, None)

        for to in self.trackable_objects.values():
            if to.ID == selected_id:
                # the selected person
                img = Follower._draw_id(img, to.ID, to.centroid, (18, 13, 212))
            elif to.disappeared_count > 0:
                # currently not detected: use the predicted position
                img = Follower._draw_id(img, to.ID, to.predicted_centroid, (253, 63, 28))
            else:
                img = Follower._draw_id(img, to.ID, to.centroid, (61, 254, 96))

        return img

    @staticmethod
    def _draw_id(image, objectID, centroid, color):
        """Label a tracked object on ``image`` and return the image.

        Currently a no-op: the drawing calls below are commented out, so the
        image is returned unchanged.
        """
        # text = "ID {}".format(objectID)
        # cv.putText(image, text, (int(centroid[0]) - 10, int(centroid[1]) - 10), cv.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
        # cv.circle(image, (int(centroid[0]), int(centroid[1])), 4, color, -1)
        return image

    @staticmethod
    def _scale_box(bbox, frame_width, frame_height):
        """Convert a normalized box to integer pixel coordinates.

        Returns ``(x_min, y_min, x_max, y_max)`` with x values multiplied by
        ``frame_width`` and y values by ``frame_height``, truncated by ``int()``.
        """
        x_min, y_min, x_max, y_max = bbox
        return (
            int(frame_width * x_min),
            int(frame_height * y_min),
            int(frame_width * x_max),
            int(frame_height * y_max),
        )

    @staticmethod
    def _draw_boxes(image, x_min, y_min, x_max, y_max, color):
        """Draw a 2-px rectangle for a normalized box onto ``image`` (in place).

        The box is scaled with the notebook-level ``width`` and ``height``.
        """
        left, top, right, bottom = Follower._scale_box(
            (x_min, y_min, x_max, y_max), width, height
        )
        cv.rectangle(image, (left, top), (right, bottom), color, 2)
        return image

    @staticmethod
    def _center(bbox):
        """Return the integer center ``(x, y)`` of ``(x_min, y_min, x_max, y_max)``.

        Same formula as the module-level ``_calculate_center``.
        """
        center_x = int((bbox[0] + bbox[2]) / 2.0 - 0.5)
        center_y = int((bbox[1] + bbox[3]) / 2.0 - 0.5)
        return (center_x, center_y)
    
# Shared Follower instance used by the camera callback; its constructor
# creates the ObjectDetector and a fresh PersonTracker.
follower = Follower()

In [None]:
from jetcam.csi_camera import CSICamera

# Capture frames at the size stored in `width`/`height` (taken from the preview
# widget): Follower._draw_boxes scales boxes by these same values, so the
# camera resolution must agree with them.
camera = CSICamera(width=width, height=height)

In [None]:
# Switch the camera's `running` flag on.
# NOTE(review): presumably this starts jetcam's background capture so that
# camera.value keeps updating with new frames -- confirm against jetcam docs.
camera.running = True

def execute(change):
    """Camera observer: track persons in the new frame and refresh the preview.

    ``change`` is the notification dict handed to the observer; only its
    'new' entry (the latest frame) is read. The frame returned by
    ``follower.track`` is JPEG-encoded and assigned to ``img_widget.value``.
    """
    annotated_frame = follower.track(change['new'])
    img_widget.value = bgr8_to_jpeg(annotated_frame)
    
# Register execute() as an observer of the camera's `value` trait, so it is
# called with a change dict whenever that trait changes.
camera.observe(execute, names='value')

In [11]:
# Show the preview image and the "ID to track" input together in one VBox.
display(widgets.VBox([img_widget, ID_widget]))

VBox(children=(Image(value=b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x00\xff\xdb\x00C…

In [7]:
# Detach execute() from the camera's `value` trait so new frames are no longer
# processed. This cell does not change camera.running.
camera.unobserve(execute, names='value')