In [8]:
import os
import cv2
import numpy as np 
from ultralytics import YOLO
from deep_sort.deep_sort import nn_matching
from deep_sort.deep_sort.detection import Detection
from deep_sort.deep_sort.tracker import Tracker
from deep_sort.tools import generate_detections as gdet

In [9]:
class YOLOv8:
    """Thin wrapper around an Ultralytics ``YOLO`` model used as the detector."""

    def __init__(self, path):
        """Create the underlying model; ``path`` is passed straight to ``YOLO``."""
        self.model = YOLO(path)

    def detect(self, source_img):
        """Run the model on one image and return its detections as NumPy arrays.

        Only the first element of the prediction list is used.

        Args:
            source_img: Image handed unchanged to ``self.model.predict``.

        Returns:
            ``(bboxes, scores, class_ids)``:
            * ``bboxes`` - the result's ``boxes.xywh`` with the first two
              columns shifted back by half of the last two columns. Assuming
              ``xywh`` holds box centres (as the Ultralytics docs describe),
              each row becomes ``(left, top, width, height)``.
            * ``scores`` - ``boxes.conf`` as an array.
            * ``class_ids`` - ``boxes.cls`` as an array (not cast to int here).
        """
        first_result = self.model.predict(source_img, verbose=False)[0]
        boxes = first_result.boxes

        tlwh = boxes.xywh.cpu().numpy()
        half_extent = tlwh[:, 2:] / 2
        tlwh[:, :2] = tlwh[:, :2] - half_extent

        return tlwh, boxes.conf.cpu().numpy(), boxes.cls.cpu().numpy()

In [10]:
class DeepSort():
    """Bundles a box encoder, a cosine nearest-neighbour metric and a ``Tracker``."""

    def __init__(
        self,
        model_path="networks/mars-small128.pb",
        max_consine_distance=0.7,
        nn_budget=None,
        classes=['object']
    ):
        """Build the encoder, the distance metric and the tracker.

        Args:
            model_path: Passed to ``gdet.create_box_encoder`` (with
                ``batch_size=1``).
            max_consine_distance: Second argument of
                ``NearestNeighborDistanceMetric('cosine', ...)``.
            nn_budget: Third argument of ``NearestNeighborDistanceMetric``.
            classes: Class names; stored in ``val_lists`` with their
                positional indices stored in ``key_lists``.
        """
        self.encoder = gdet.create_box_encoder(model_path, batch_size=1)
        self.metric = nn_matching.NearestNeighborDistanceMetric(
            'cosine', max_consine_distance, nn_budget
        )
        self.tracker = Tracker(self.metric)

        # Parallel lists: key_lists[i] is the index of the name in val_lists[i].
        self.val_lists = list(classes)
        self.key_lists = list(range(len(self.val_lists)))

    def tracking(self, origin_frame, bboxes, scores, class_ids):
        """Advance the tracker by one frame and report the tracks worth drawing.

        Args:
            origin_frame: Frame given to the encoder together with ``bboxes``.
            bboxes, scores, class_ids: Per-detection values; they are zipped
                with the encoder output to build one ``Detection`` each.

        Returns:
            ``np.ndarray`` with one row per reported track: the values of
            ``track.to_tlbr()`` followed by the track's class, confidence
            score and track id. A track is reported only if it is confirmed
            and its ``time_since_update`` is not above 5. The array is empty
            when no track qualifies.
        """
        appearance = self.encoder(origin_frame, bboxes)

        detections = []
        for det_args in zip(bboxes, scores, class_ids, appearance):
            detections.append(Detection(*det_args))

        self.tracker.predict()
        self.tracker.update(detections)

        rows = [
            track.to_tlbr().tolist()
            + [track.get_class(), track.get_conf_score(), track.track_id]
            for track in self.tracker.tracks
            if track.is_confirmed() and not track.time_since_update > 5
        ]

        return np.array(rows)

In [11]:
def draw_detection(img, bboxes, scores, class_ids, ids, classes=['objects'], mask_alpha=0.3):
    """Draw tracked boxes and their captions on a copy of ``img``.

    Two copies of the image are annotated: one with box outlines and one with
    filled boxes. They are blended with ``cv2.addWeighted`` so the filled
    boxes appear as a translucent overlay. ``img`` itself is not modified.

    Args:
        img: Image array; only ``img.shape[:2]`` and ``img.copy()`` are used
            directly here.
        bboxes: One ``(x1, y1, x2, y2)`` corner box per object; values are
            truncated to int before drawing.
        scores: One confidence per box, shown as an integer percentage.
        class_ids: One class index per box. Values are coerced with ``int()``
            so float ids can be used as indices.
        ids: One tracking id per box, shown after ``ID:`` in the caption.
        classes: Class names indexed by class id. An id with no entry here is
            captioned with its number instead of raising ``IndexError``.
        mask_alpha: Blend weight of the filled-box image; the outlined image
            gets ``1 - mask_alpha``.

    Returns:
        The blended, annotated image.
    """
    height, width = img.shape[:2]

    class_indices = [int(class_id) for class_id in class_ids]

    # Size the palette for the largest class index actually present, not just
    # for len(classes); otherwise an id outside `classes` cannot be coloured.
    # A local generator is used so the global NumPy RNG state is left alone.
    palette_size = max([len(classes)] + [index + 1 for index in class_indices])
    colors = np.random.default_rng(3).uniform(0, 255, size=(palette_size, 3))

    mask_img = img.copy()
    det_img = img.copy()

    # Font scale and stroke thickness are proportional to the shorter side.
    size = min([height, width]) * 0.0006
    text_thickness = int(min([height, width]) * 0.001)

    for bbox, score, class_id, id_ in zip(bboxes, scores, class_indices, ids):
        color = tuple(colors[class_id].tolist())
        x1, y1, x2, y2 = np.asarray(bbox).astype(int).tolist()

        cv2.rectangle(det_img, (x1, y1), (x2, y2), color, 2)
        cv2.rectangle(mask_img, (x1, y1), (x2, y2), color, -1)

        label = classes[class_id] if 0 <= class_id < len(classes) else str(class_id)
        caption = f"{label} {int(score * 100)}% ID:{id_}"
        (tw, th), _ = cv2.getTextSize(
            text=caption,
            fontFace=cv2.FONT_HERSHEY_COMPLEX,
            fontScale=size,
            thickness=text_thickness,
        )

        # Slightly taller than the text so the caption background has padding.
        th = int(th * 1.2)

        # Filled caption background sitting directly above the box's top edge.
        cv2.rectangle(det_img, (x1, y1), (x1 + tw, y1 - th), color, -1)
        cv2.rectangle(mask_img, (x1, y1), (x1 + tw, y1 - th), color, -1)

        cv2.putText(det_img, caption, (x1, y1), cv2.FONT_HERSHEY_COMPLEX, size, (255, 255, 255), text_thickness, cv2.LINE_AA)
        cv2.putText(mask_img, caption, (x1, y1), cv2.FONT_HERSHEY_COMPLEX, size, (255, 255, 255), text_thickness, cv2.LINE_AA)

    return cv2.addWeighted(mask_img, mask_alpha, det_img, 1 - mask_alpha, 0)

In [12]:
def tracking_video(vid_path, detector, tracker, is_save_result=False, save_dir='tracking_results', classes=None):
    """Run detection and tracking on every frame of a video.

    Args:
        vid_path: Source passed to ``cv2.VideoCapture``.
        detector: Object whose ``detect(frame)`` returns
            ``(bboxes, scores, class_ids)``.
        tracker: Object whose ``tracking(origin_frame=..., bboxes=...,
            scores=..., class_ids=...)`` returns a 2-D array. Columns 0-3 are
            read as the box, 4 as class id, 5 as confidence and 6 as track id.
            It returns an empty array when there is nothing to draw.
        is_save_result: When true, annotated frames are written to
            ``<save_dir>/out1.mp4`` using the ``mp4v`` codec.
        save_dir: Output directory, created if missing.
        classes: Optional class names forwarded to ``draw_detection``. When
            ``None`` the drawing function's own default is used.

    Returns:
        List with the tracker output of every processed frame, in order.
    """
    cap = cv2.VideoCapture(vid_path)
    out = None
    all_tracking_res = []

    # try/finally so the capture (and writer) are released even if detection,
    # tracking or drawing raises part-way through the video.
    try:
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        if is_save_result:
            os.makedirs(save_dir, exist_ok=True)

            # Keep the fractional rate (e.g. 29.97); truncating it to int
            # makes the written video play at a different speed.
            fps = cap.get(cv2.CAP_PROP_FPS)
            if not fps > 0:
                # Source reported no usable rate; fall back to a fixed one.
                fps = 25.0

            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            save_result_path = os.path.join(save_dir, 'out1.mp4')
            out = cv2.VideoWriter(save_result_path, fourcc, fps, (width, height))

        draw_kwargs = {} if classes is None else {'classes': classes}

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            bboxes, scores, class_ids = detector.detect(frame)
            tracker_pred = tracker.tracking(
                origin_frame=frame, bboxes=bboxes, scores=scores, class_ids=class_ids
            )

            if tracker_pred.size > 0:
                # Use the ids of THIS frame's tracks, row-aligned with the
                # boxes, so each box is captioned with its own track id.
                res_img = draw_detection(
                    img=frame,
                    bboxes=tracker_pred[:, :4],
                    scores=tracker_pred[:, 5],
                    class_ids=tracker_pred[:, 4].astype(int),
                    ids=tracker_pred[:, 6].astype(int),
                    **draw_kwargs,
                )
            else:
                res_img = frame

            all_tracking_res.append(tracker_pred)

            if out is not None:
                out.write(res_img)

            # NOTE(review): no window is opened in this function, so this key
            # check presumably never fires; kept as in the original - confirm.
            if cv2.waitKey(25) & 0xFF == ord('q'):
                break
    finally:
        cap.release()
        if out is not None:
            out.release()
        cv2.destroyAllWindows()

    return all_tracking_res


In [13]:
# Inputs for this run: detector weights and the video to process.
WEIGHTS_PATH = "best.pt"
VIDEO_PATH = "CityRoam.mp4"

detector = YOLOv8(WEIGHTS_PATH)
tracker = DeepSort()  # all defaults, including the default encoder model path

# is_save_result=True also writes the annotated video to the function's
# default output directory; the per-frame tracker outputs are kept here.
all_tracking_res = tracking_video(
    vid_path=VIDEO_PATH,
    detector=detector,
    tracker=tracker,
    is_save_result=True,
)