In [79]:
import sys
sys.path.append(f"/home/bowen68/projects/bisque/ByteTrack")
from IPython import display
display.clear_output()
import supervision
import os
%matplotlib inline
import torch
from utils.general import non_max_suppression

In [100]:
from supervision.draw.color import ColorPalette
from supervision.geometry.dataclasses import Point
from supervision.video.dataclasses import VideoInfo
from supervision.video.source import get_video_frames_generator
from supervision.video.sink import VideoSink
from supervision.notebook.utils import show_frame_in_notebook
# from supervision.tools.detections import BoxAnnotator
from supervision.tools.detections import Detections, BoxAnnotator
from supervision.tools.line_counter import LineCounter, LineCounterAnnotator
# from tracking_utils import Detection

In [148]:
from yolox.tracker.byte_tracker import BYTETracker, STrack
from onemetric.cv.utils.iou import box_iou_batch
from dataclasses import dataclass


@dataclass(frozen=True)
class BYTETrackerArgs:
    """Immutable settings object passed to ``BYTETracker`` when the tracker is created.

    An instance built with all defaults is handed to ``BYTETracker(...)`` in the
    tracking cell. The field names presumably mirror the attributes the tracker
    reads from its ``args`` object -- confirm against
    ``yolox.tracker.byte_tracker`` before renaming or removing any of them.
    """
    # NOTE(review): the meaning of each value is defined by the ByteTrack
    # implementation, not by this notebook; the comments below are hedged.
    track_thresh: float = 0.25  # presumably a detection-score threshold -- TODO confirm
    track_buffer: int = 30  # presumably counted in frames -- TODO confirm
    match_thresh: float = 0.8  # presumably an association/matching threshold -- TODO confirm
    aspect_ratio_thresh: float = 3.0
    min_box_area: float = 1.0
    mot20: bool = False

## Track utils

In [6]:
from typing import List

import numpy as np


# packs Detections into the per-row array that is passed to BYTETracker.update as output_results
def detections2boxes(detections: Detections) -> np.ndarray:
    """Return one row per detection: the ``xyxy`` box followed by its confidence.

    Args:
        detections: object exposing ``xyxy`` (2-D array of boxes) and
            ``confidence`` (1-D array with one score per box).

    Returns:
        Array made of the ``xyxy`` columns with ``confidence`` appended as the
        last column.
    """
    boxes = detections.xyxy
    # lift the 1-D scores into a column so they can sit next to the boxes
    scores = detections.confidence[:, np.newaxis]
    return np.concatenate((boxes, scores), axis=1)


# collects the boxes of a List[STrack] for IoU matching in match_detections_with_tracks
def tracks2boxes(tracks: List[STrack]) -> np.ndarray:
    """Stack the ``tlbr`` box of every track into a single float array.

    Args:
        tracks: tracks whose ``tlbr`` attribute holds the box to export.

    Returns:
        Float array with one entry per track, in the same order as ``tracks``.
    """
    boxes = []
    for track in tracks:
        boxes.append(track.tlbr)
    return np.array(boxes, dtype=float)


# matches our bounding boxes with predictions
def match_detections_with_tracks(
    detections: Detections, 
    tracks: List[STrack]
) -> list:
    """Assign a tracker id to every detection by best IoU overlap with the tracks.

    For each track the detection with the highest IoU is looked up; if that IoU
    is non-zero the detection receives the track's ``track_id``. When several
    tracks pick the same detection, the track that comes last in ``tracks`` wins.

    Args:
        detections: detections of the current frame (``xyxy`` boxes).
        tracks: tracks returned by the tracker for the same frame.

    Returns:
        A list with exactly ``len(detections)`` entries, aligned with
        ``detections``: the matched ``track_id`` or ``None`` when no track
        overlaps that detection.
    """
    # One slot per detection, so the result always lines up with `detections`.
    tracker_ids = [None] * len(detections)

    # Nothing to match: no detections (np.any is False for an empty or all-zero
    # box array) or no tracks. Returning the all-None list instead of an empty
    # array keeps the result the same length as `detections`; callers build a
    # boolean mask from it and use that mask to filter the detections.
    if not np.any(detections.xyxy) or len(tracks) == 0:
        return tracker_ids

    tracks_boxes = tracks2boxes(tracks=tracks)
    # iou is indexed below as [track, detection]
    iou = box_iou_batch(tracks_boxes, detections.xyxy)
    track2detection = np.argmax(iou, axis=1)

    for tracker_index, detection_index in enumerate(track2detection):
        # argmax also returns an index for rows that are all zero; skip those
        if iou[tracker_index, detection_index] != 0:
            tracker_ids[detection_index] = tracks[tracker_index].track_id

    return tracker_ids

## Load YOLOv5 model

In [7]:
from utils.torch_utils import select_device, time_sync
from models.common import DetectMultiBackend

In [127]:
# Checkpoint path is used as given; the dataset yaml is anchored to the working directory.
weights = 'runs/train/bowen-run-27-new/weights/best.pt'
root_dir = os.getcwd()
data = os.path.join(root_dir, 'data/mare.yaml')

# An empty string leaves the device choice to select_device.
device = select_device('')

model = DetectMultiBackend(weights, device=device, data=data)
stride = model.stride
names = model.names
pt = model.pt

# Inference resolution (height, width); the warm-up call gets a batch of one 3-channel image.
imgsz = (1088, 1920)
warmup_shape = (1, 3, *imgsz)
model.warmup(imgsz=warmup_shape)
print(warmup_shape)

YOLOv5 🚀 2023-2-20 torch 1.13.1 CUDA:0 (NVIDIA GeForce RTX 2080 Ti, 11020MiB)

Fusing layers... 
YOLOv5l summary: 367 layers, 46156743 parameters, 0 gradients, 107.8 GFLOPs


(1, 3, 1088, 1920)


### Data

In [145]:
from utils.datasets import IMG_FORMATS, VID_FORMATS, LoadImages, LoadStreams
from utils.augmentations import letterbox #padded resize 
# Class names as stored on the underlying model; indexed by class id when building labels.
CLASS_NAMES_DICT = model.model.names
# Class ids 0..9 (only referenced by the optional class filter).
CLASS_ID = list(range(10))

# Input and output videos live under <working directory>/examples.
HOME = os.getcwd()
EXAMPLES_DIR = f"{HOME}/examples"
# alternative input: f"{HOME}/examples/test_video.mp4"
SOURCE_VIDEO_PATH = f"{EXAMPLES_DIR}/example30s.mp4"
TARGET_VIDEO_PATH = f"{EXAMPLES_DIR}/output/example30s.mp4"

# Last expression of the cell: displays the metadata of the source video.
VideoInfo.from_video_path(SOURCE_VIDEO_PATH)

VideoInfo(width=1920, height=1080, fps=30, total_frames=882)

### Detections

In [None]:
conf_thres = 0.25
iou_thres = 0.45
max_det = 1000

# create frame generator
generator = get_video_frames_generator(SOURCE_VIDEO_PATH)
# create instance of BoxAnnotator
box_annotator = BoxAnnotator(color=ColorPalette(), thickness=4, text_thickness=4, text_scale=2)
# acquire first video frame
iterator = iter(generator)
frame0 = next(iterator)
# padded resize from (1080 X 1920) to (1088 X 1920)
frame = frame0 / 255
frame = letterbox(frame, imgsz, stride=32, auto=True)[0]
# (1088 x 1920 x 3) -> (3 x 1088 x 1920)
frame = frame.transpose((2, 0, 1))
frame = np.ascontiguousarray(frame)
frame = torch.from_numpy(frame).to(device).float()
frame = frame[None] # expand for batch dim
# model prediction on single frame and conversion to supervision Detections
pred0 = model(frame, augment=None, visualize=False)

# pred = non_max_suppression(pred0, conf_thres, iou_thres, max_det=max_det)
pred = non_max_suppression(pred0, conf_thres, iou_thres, None, False, max_det=max_det)
# x_min, y_min, x_max, y_max, confidence, class_id in pred[0].cpu().numpy():
det = pred[0].cpu().numpy()
xyxy = det[:,:4]
confidence = det[:,4]
class_id = det[:,5].astype(int)

detections = Detections(
    xyxy=xyxy,
    confidence=confidence,
    class_id=class_id
)
# format custom labels
labels = [
    f"{CLASS_NAMES_DICT[class_id]} {confidence:0.2f}"
    for _, confidence, class_id, tracker_id
    in detections
]
# # annotate and display frame
frame = box_annotator.annotate(frame=frame0, detections=detections, labels=labels)

%matplotlib inline
show_frame_in_notebook(frame0, (16, 16))

### Tracking

In [147]:
from tqdm.notebook import tqdm

SOURCE_VIDEO_PATH = f"{HOME}/examples/example30s.mp4"
TARGET_VIDEO_PATH = f"{HOME}/examples/output/example30s.mp4"

# NMS settings passed to non_max_suppression
conf_thres = 0.25
iou_thres = 0.45
max_det = 1000
# end points of the counting line, in pixels of the 1920-wide source frame
LINE_START = Point(50, 800)
LINE_END = Point(1920-50, 800)


def _detect_frame(frame0: np.ndarray) -> Detections:
    """Run the detector on one raw video frame.

    Args:
        frame0: frame as yielded by the video generator (H x W x 3, 0..255).

    Returns:
        Detections whose ``xyxy`` boxes are expressed in ``frame0`` pixels, so
        they can be tracked and drawn on the original frame directly.
    """
    # Letterbox BEFORE normalising: the border is filled with letterbox's
    # default grey (114 on a 0..255 scale), which would be far out of range on
    # an image already divided by 255.
    padded, ratio, (dw, dh) = letterbox(frame0, imgsz, stride=stride, auto=True)
    # HWC -> CHW, and [::-1] flips the channel axis BGR -> RGB as YOLOv5's own
    # loaders do (video frames are decoded by OpenCV).
    chw = np.ascontiguousarray(padded.transpose((2, 0, 1))[::-1])
    batch = torch.from_numpy(chw).to(device).float() / 255  # 0..255 -> 0..1
    batch = batch[None]  # expand for batch dim

    # inference only: skip autograd bookkeeping
    with torch.no_grad():
        raw_pred = model(batch, augment=False, visualize=False)
    pred = non_max_suppression(raw_pred, conf_thres, iou_thres, None, False, max_det=max_det)
    # rows of det: x_min, y_min, x_max, y_max, confidence, class_id
    det = pred[0].cpu().numpy()

    # Undo the letterbox padding/scaling so boxes are in frame0 coordinates.
    xyxy = det[:, :4].copy()
    xyxy[:, [0, 2]] = (xyxy[:, [0, 2]] - dw) / ratio[0]
    xyxy[:, [1, 3]] = (xyxy[:, [1, 3]] - dh) / ratio[1]
    frame_h, frame_w = frame0.shape[:2]
    xyxy[:, [0, 2]] = np.clip(xyxy[:, [0, 2]], 0, frame_w)
    xyxy[:, [1, 3]] = np.clip(xyxy[:, [1, 3]], 0, frame_h)

    return Detections(
        xyxy=xyxy,
        confidence=det[:, 4],
        class_id=det[:, 5].astype(int)
    )


# make sure the output folder exists before the sink opens the target file
os.makedirs(os.path.dirname(TARGET_VIDEO_PATH), exist_ok=True)

# create BYTETracker instance
byte_tracker = BYTETracker(BYTETrackerArgs())
# create VideoInfo instance
video_info = VideoInfo.from_video_path(SOURCE_VIDEO_PATH)
# create frame generator
generator = get_video_frames_generator(SOURCE_VIDEO_PATH)
# create LineCounter instance
line_counter = LineCounter(start=LINE_START, end=LINE_END)
# create instance of BoxAnnotator and LineCounterAnnotator
box_annotator = BoxAnnotator(color=ColorPalette(), thickness=4, text_thickness=4, text_scale=2)
line_annotator = LineCounterAnnotator(thickness=4, text_thickness=4, text_scale=2)

# open target video file
with VideoSink(TARGET_VIDEO_PATH, video_info) as sink:
    # loop over video frames
    for frame0 in tqdm(generator, total=video_info.total_frames):
        # model prediction on single frame and conversion to supervision Detections
        detections = _detect_frame(frame0)

        # tracking detections; boxes are already in frame0 pixels, so the same
        # shape is passed as both img_info and img_size
        tracks = byte_tracker.update(
            output_results=detections2boxes(detections=detections),
            img_info=frame0.shape,
            img_size=frame0.shape
        )
        tracker_id = match_detections_with_tracks(detections=detections, tracks=tracks)
        detections.tracker_id = np.array(tracker_id)
        # filtering out detections without trackers
        mask = np.array([tid is not None for tid in detections.tracker_id], dtype=bool)
        detections.filter(mask=mask, inplace=True)
        # format custom labels
        labels = [
            f"#{tracker_id} {CLASS_NAMES_DICT[class_id]} {confidence:0.2f}"
            for _, confidence, class_id, tracker_id
            in detections
        ]
        # updating line counter
        line_counter.update(detections=detections)
        # annotate frame and append it to the output video
        frame0 = box_annotator.annotate(frame=frame0, detections=detections, labels=labels)
        line_annotator.annotate(frame=frame0, line_counter=line_counter)
        sink.write_frame(frame0)

  0%|          | 0/882 [00:00<?, ?it/s]

KeyboardInterrupt: 