In [1]:
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

import numpy as np
from ultralytics import YOLO

from deep_sort.application_util import preprocessing
from deep_sort.deep_sort import nn_matching
from deep_sort.deep_sort.detection import Detection
from deep_sort.deep_sort.tracker import Tracker
from deep_sort.tools import generate_detections as gdet

# own
from utils.utils import (get_frames, SEQ_01, SEQ_02, SEQ_03)
from utils.deepsort_utils import LABELS_DICT, UNKNOWN_DEFAULT, resize_masks

Instructions for updating:
non-resource variables are not supported in the long term
Root directory is /Users/ellemcfarlane/Documents/dtu/Perception_AF/Pfas-finalProject


In [2]:
# Weight files for the appearance encoder (DeepSORT) and the detector (YOLO).
deepsort_model_ = './networks/mars-small128.pb' # TODO missing
yolo_model_ = '../../models/yolov8s-seg.pt' # TODO missing

# Tunables for the appearance metric, named so they are easy to find and change.
_matching_threshold = 0.4  # second positional argument of the distance metric
_nn_budget = None          # third positional argument of the distance metric

# Appearance-feature extractor, fed one image patch at a time.
encoder = gdet.create_box_encoder(deepsort_model_, batch_size=1)

# Cosine nearest-neighbour metric shared with the tracker.
_metric = nn_matching.NearestNeighborDistanceMetric("cosine", _matching_threshold, _nn_budget)
tracker = Tracker(_metric, n_init=0)

# Segmentation-capable YOLO model used for per-frame detection.
detector = YOLO(yolo_model_)

Metal device set to: Apple M2

systemMemory: 8.00 GB
maxCacheSize: 2.67 GB



In [3]:
@dataclass
class DeepSortObject:
    """
    One object recognised in a frame.

    Attributes:
        id: identifier of the object (class id before tracking, track id after).
        label: human-readable class label.
        confidence: detection confidence.
        xyxy: bounding box as [left, top, right, bottom].
        mask: optional segmentation mask; ``None`` when no mask is available.
    """
    id: int
    label: str
    confidence: float
    xyxy: List[float]
    # The notebook builds objects without a mask, so the field is optional.
    mask: Optional[np.ndarray] = None

    @property
    def tlwh(self) -> List[float]:
        """
        Top left corner, width, height representation of bounding box
        """
        # Only the first four entries are read, matching index-based access.
        left, top, right, bottom = self.xyxy[:4]
        return [left, top, right - left, bottom - top]

In [18]:
# Run detection + DeepSORT tracking over the first stereo pairs of the sequence.
# After the loop, `ds_objects` maps track id -> DeepSortObject for the last frame.
_n_frames = 2  # number of stereo pairs pushed through the pipeline
frames = [tuple(get_frames(frame_num_=i, seq_dir_=SEQ_01)) for i in range(_n_frames)]
detection_results = []  # per-frame YOLO boxes, indexed by frame; inspected by later cells

for _i_frame, (_frame_l, _frame_r) in enumerate(frames):
    # 1. Detect objects with YOLO (left image only).
    # The detector returns one result per input image; a single image is passed,
    # so the first entry holds all detections of this frame.
    _result_i = detector(_frame_l, classes=[0, 1, 2])[0]
    _boxes_i = _result_i.boxes if _result_i.boxes is not None else []
    detection_results.append(_boxes_i)

    if len(_boxes_i) == 0:
        # Keep going: the tracker still has to be stepped so existing tracks age.
        print(f"no detections in frame {_i_frame}")
        _masks_i = []
    elif _result_i.masks is None:
        _masks_i = [None] * len(_boxes_i)
    else:
        # assumes resize_masks returns one mask per detection along the first axis — TODO confirm
        _masks_i = resize_masks(
            masks=_result_i.masks.data.cpu().numpy(),
            orig_shape=_result_i.masks.orig_shape,
        )

    # One DeepSortObject per raw detection. `id` holds the class id here; the
    # final objects are rebuilt from the tracks further down, because a track's
    # box differs slightly from the YOLO box and cannot be matched back afterwards.
    dsobjects_i = []
    for _det, _mask in zip(_boxes_i, _masks_i):
        cls = int(_det.cls[0])
        dsobjects_i.append(DeepSortObject(
            id=cls,  # replaced by the track id once DeepSORT has run
            label=LABELS_DICT.get(cls, UNKNOWN_DEFAULT),
            confidence=float(_det.conf[0]),
            xyxy=_det.xyxy.tolist()[0],
            mask=None if _mask is None else _mask.astype(bool),
        ))

    # 2. Pass detections to deepsort

    # 2.1 Extract appearance features. deep_sort's patch extractor takes boxes as
    # (x, y, width, height), so the tlwh form is passed rather than xyxy.
    all_bboxes_i = [_dso.tlwh for _dso in dsobjects_i]
    features = encoder(_frame_l, all_bboxes_i) if all_bboxes_i else []

    # 2.2 Wrap objects + features as deepsort detections to feed the tracker.
    # The mask and class travel with the detection so they can be read back per track.
    _ds_detections = [
        Detection(
            tlwh=_dso.tlwh,
            feature=_feat,
            segmentation=_dso.mask,
            class_id=int(_dso.id),
            confidence=_dso.confidence,
        )
        for _dso, _feat in zip(dsobjects_i, features)
    ]

    # 2.3 Advance the tracker by one frame and associate the detections.
    tracker.predict()
    tracker.update(_ds_detections)

    # 3. Collect the confirmed tracks of this frame.
    # A track is tentative right after creation, confirmed once it has been
    # associated often enough (see n_init), or deleted.
    # https://github.com/nwojke/deep_sort/issues/48
    # TODO: track.time_since_update could be used to flag occluded objects.
    ds_objects = {}
    for track in tracker.tracks:
        if not track.is_confirmed():
            continue
        # Everything below comes from the track itself, not from leftover
        # variables of the detection loop above.
        bbox = [float(_v) for _v in track.to_tlbr()]  # top-left / bottom-right corners
        cls = track.get_class()
        mask = track.get_segmentation()
        # presumably the confidence of the track's latest detection — verify in the tracker code
        conf = float(track.get_confidence())
        ds_objects[track.track_id] = DeepSortObject(
            id=track.track_id,
            label=LABELS_DICT.get(int(cls), UNKNOWN_DEFAULT),
            confidence=conf,
            xyxy=bbox,
            mask=None if mask is None else mask.astype(bool),
        )


0: 224x640 4 persons, 4 bicycles, 124.9ms
Speed: 0.5ms preprocess, 124.9ms inference, 2.2ms postprocess per image at shape (1, 3, 640, 640)

0: 224x640 4 persons, 4 bicycles, 251.8ms
Speed: 2.4ms preprocess, 251.8ms inference, 4.0ms postprocess per image at shape (1, 3, 640, 640)


In [17]:
# Show the label of every tracked object, each followed by an empty line.
for obj, _tracked in ds_objects.items():
    print(_tracked.label, end="\n\n")

Pedestrian

Pedestrian

Pedestrian

Cyclist

Pedestrian

Cyclist

Cyclist

Cyclist



Nice, this is what we expect: 4 pedestrians and 4 "cyclists" (these are detected as bicycles, but the label is overwritten to "Cyclist").

# End goal
A function that takes a pair of left and right frames and returns a dictionary of `DeepSortObject`s for the objects recognized in the frame at time t, keyed by their ids.

In [None]:
def identify_objects(frames: Tuple[np.ndarray, np.ndarray]) -> Dict[int, "DeepSortObject"]:
    """
    Identify the objects visible in one stereo pair.

    Args:
        frames: the (left, right) images of a single time step.

    Returns:
        A dictionary mapping object id to its DeepSortObject.

    Raises:
        NotImplementedError: always, the function is still a stub.
    """
    # Fail loudly instead of silently returning None, which would contradict
    # the declared return type.
    raise NotImplementedError("identify_objects is not implemented yet")

In [None]:
# Inspect the first per-detection object built for the last processed frame.
_first_dso = dsobjects_i[0]
print(_first_dso)

In [None]:
# Take the stored detections of the first frame for inspection.
# NOTE(review): in the visible notebook `detection_results` is created empty and the
# only append to it is commented out, so this lookup raises IndexError unless the
# list is populated by the tracking cell — confirm before relying on this cell.
# NOTE(review): the draft lines below read `_dets_i`, not the `dets_i` assigned here.
dets_i = detection_results[0]

# _det_0 = _dets_i[0]

# print(f"confidences: {_det_0.conf[0]}")
# print(f"classes: {LABELS_DICT.get(int(_det_0.cls[0]))} ({_det_0.cls[0]})")



In [None]:
# Build one DeepSortObject per stored detection of the first frame.
# Each entry of `_dets_i` is expected to expose `cls`, `conf` and `xyxy`
# (presumably a single YOLO box — verify against what the tracking cell stores).
if detection_results:
    _dets_i = detection_results[0]
else:
    # Nothing was collected; report it instead of failing with an IndexError.
    print("detection_results is empty - no stored detections to convert")
    _dets_i = []

objects_i = []
for _det in _dets_i:
    _cls_id = int(_det.cls[0])
    # The dataclass has required fields, so it is constructed in one go
    # rather than created empty and filled in attribute by attribute.
    _dso = DeepSortObject(
        id=_cls_id,
        label=LABELS_DICT.get(_cls_id, UNKNOWN_DEFAULT),
        confidence=float(_det.conf[0]),
        xyxy=_det.xyxy.tolist()[0],
        mask=None,
    )
    objects_i.append(_dso)