In [16]:
# !pip install ultralytics
# !pip install supervision==0.1.0
# !pip install onemetric
# !pip install numpy==1.23

# %cd {HOME}/third_party
# !git clone https://github.com/ifzhang/ByteTrack.git
# !cd ByteTrack && pip install -q -r requirements.txt
# !cd ByteTrack && python setup.py -q develop
# !pip install -q cython
# !pip install -q cython_bbox
# !pip install -q onemetric
# !pip install supervision==0.1.0
# !pip install ipywidgets

In [1]:
import os
import subprocess
from glob import glob

import cv2
import yaml

# Load the run parameters: video file name, target frame rate and YOLO model name.
# safe_load is used instead of yaml.load(..., Loader=yaml.FullLoader): the config
# only needs plain scalars, and safe_load never constructs arbitrary Python objects.
with open('param.yaml') as f:
    params = yaml.safe_load(f)
print(params)

video_name = params['video_name']
desired_fps = params['desired_fps']
MODEL = params['model'] + '.pt'

HOME = os.getcwd()
Video_path = HOME + "/Data"
os.makedirs(Video_path, exist_ok=True)

SOURCE_VIDEO_PATH = f"{Video_path}/{video_name}"

# Probe the original clip for its frame size and frame rate, then release the
# capture handle so the file is not kept open for the rest of the session.
video = cv2.VideoCapture(SOURCE_VIDEO_PATH)
try:
    video.set(cv2.CAP_PROP_POS_MSEC, 100)
    w = round(video.get(cv2.CAP_PROP_FRAME_WIDTH))
    h = round(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = video.get(cv2.CAP_PROP_FPS)
finally:
    video.release()

# Resample the clip named in param.yaml to the desired frame rate.
# The input is addressed by name rather than "first *.avi that glob returns":
# once the resampled file exists in Data/, a glob could pick that file instead
# and produce "<name>_fps15_fps15.avi" on the next run.
# The path is kept relative ("Data/<name>") because later cells derive the
# result folder from clip_name.split('/')[1].
clip = f"Data/{video_name}"
# splitext strips only the extension; split(".")[0] would cut at the first dot
# anywhere in the path.
clip_name = os.path.splitext(clip)[0] + f"_fps{desired_fps}"
# Argument list + check=True: no shell parsing of the file name, and a failed
# ffmpeg run raises CalledProcessError instead of being silently ignored.
command = ["ffmpeg", "-y", "-i", clip, "-filter:v", f"fps={desired_fps}", f"{clip_name}.avi"]
subprocess.run(command, check=True)
SOURCE_VIDEO_PATH = clip_name + '.avi'

{'video_name': 'street-4.avi', 'desired_fps': 15, 'model': 'yolov8x'}
/home/oem/workspace/Yolo_counting/Data
/home/oem/workspace/Yolo_counting/Data/street-4.avi


ffmpeg version 4.2.7-0ubuntu0.1 Copyright (c) 2000-2022 the FFmpeg developers
  built with gcc 9 (Ubuntu 9.4.0-1ubuntu1~20.04.1)
  configuration: --prefix=/usr --extra-version=0ubuntu0.1 --toolchain=hardened --libdir=/usr/lib/x86_64-linux-gnu --incdir=/usr/include/x86_64-linux-gnu --arch=amd64 --enable-gpl --disable-stripping --enable-avresample --disable-filter=resample --enable-avisynth --enable-gnutls --enable-ladspa --enable-libaom --enable-libass --enable-libbluray --enable-libbs2b --enable-libcaca --enable-libcdio --enable-libcodec2 --enable-libflite --enable-libfontconfig --enable-libfreetype --enable-libfribidi --enable-libgme --enable-libgsm --enable-libjack --enable-libmp3lame --enable-libmysofa --enable-libopenjpeg --enable-libopenmpt --enable-libopus --enable-libpulse --enable-librsvg --enable-librubberband --enable-libshine --enable-libsnappy --enable-libsoxr --enable-libspeex --enable-libssh --enable-libtheora --enable-libtwolame --enable-libvidstab --enable-libvorbis --e

In [42]:
from IPython import display
display.clear_output()

import sys

# Make the ByteTrack checkout under third_party/ importable; `yolox` is imported
# right below. The membership check keeps sys.path free of duplicate entries
# when this cell is run more than once.
BYTETRACK_PATH = f"{HOME}/third_party/ByteTrack"
if BYTETRACK_PATH not in sys.path:
    sys.path.append(BYTETRACK_PATH)

import yolox
print("yolox.__version__:", yolox.__version__)

yolox.__version__: 0.1.0


In [43]:
from yolox.tracker.byte_tracker import BYTETracker, STrack
from onemetric.cv.utils.iou import box_iou_batch
from dataclasses import dataclass


@dataclass(frozen=True)
class BYTETrackerArgs:
    """Immutable bundle of settings passed to ``BYTETracker`` when it is constructed.

    An instance with all defaults is created in the processing cell via
    ``BYTETracker(BYTETrackerArgs())``.

    NOTE(review): the per-field descriptions below are inferred from the field
    names, not from code visible in this notebook -- confirm against
    ``yolox.tracker.byte_tracker`` before relying on them.
    """
    # presumably the confidence threshold used by the tracker -- TODO confirm
    track_thresh: float = 0.25
    # presumably how long (in frames) a lost track is kept -- TODO confirm
    track_buffer: int = 30
    # presumably the threshold used when matching detections to tracks -- TODO confirm
    match_thresh: float = 0.8
    # presumably the maximum box aspect ratio accepted -- TODO confirm
    aspect_ratio_thresh: float = 3.0
    # presumably the minimum box area accepted -- TODO confirm
    min_box_area: float = 1.0
    # presumably toggles MOT20-specific behaviour in the tracker -- TODO confirm
    mot20: bool = False

In [44]:
# Clear whatever output is currently shown for this cell before printing.
from IPython import display
display.clear_output()


# Report the installed supervision version; the imports in the following cell
# use module paths from this release (the captured output shows 0.1.0).
import supervision
print("supervision.__version__:", supervision.__version__)

supervision.__version__: 0.1.0


In [45]:
from supervision.draw.color import ColorPalette
from supervision.geometry.dataclasses import Point
from supervision.video.dataclasses import VideoInfo
from supervision.video.source import get_video_frames_generator
from supervision.video.sink import VideoSink
from supervision.notebook.utils import show_frame_in_notebook
from supervision.tools.detections import Detections, BoxAnnotator
# from supervision.tools.line_counter import LineCounter, LineCounterAnnotator

In [46]:
from typing import List

import numpy as np


def detections2boxes(detections: Detections) -> np.ndarray:
    """Pack detections into a single array with one row per detection.

    Each row is the box taken from ``detections.xyxy`` followed by that
    detection's confidence, so the result has one more column than
    ``detections.xyxy``. In this notebook the array is handed to
    ``BYTETracker.update`` as ``output_results``.
    """
    confidence_column = detections.confidence[:, np.newaxis]
    return np.concatenate((detections.xyxy, confidence_column), axis=1)


def tracks2boxes(tracks: List[STrack]) -> np.ndarray:
    """Stack the ``tlbr`` box of every track into one float array.

    Row ``i`` of the result is ``tracks[i].tlbr``. The array is compared against
    ``detections.xyxy`` by ``match_detections_with_tracks``. An empty ``tracks``
    list yields an empty array.
    """
    boxes = []
    for track in tracks:
        boxes.append(track.tlbr)
    return np.array(boxes, dtype=float)


def match_detections_with_tracks(
    detections: Detections,
    tracks: List[STrack]
) -> list:
    """Assign a tracker id to each detection by IoU against the tracker's boxes.

    For every track, the detection with the highest IoU is looked up; when that
    IoU is non-zero the detection receives the track's ``track_id``. If several
    tracks select the same detection, the one that comes last in ``tracks`` wins.

    Args:
        detections: detections of the current frame; only ``xyxy`` is read.
        tracks: tracks reported by the tracker for the same frame.

    Returns:
        A list with exactly one entry per row of ``detections.xyxy``: the matched
        ``track_id``, or ``None`` when no track overlaps that detection.
    """
    tracker_ids = [None] * len(detections.xyxy)

    # Nothing to match. Still return one ``None`` per detection: the caller builds
    # a boolean mask from this result and applies it to the detections, so a
    # zero-length result for N > 0 detections would make that mask the wrong size.
    if not np.any(detections.xyxy) or len(tracks) == 0:
        return tracker_ids

    tracks_boxes = tracks2boxes(tracks=tracks)
    # iou[i, j] is the overlap between track i and detection j
    iou = box_iou_batch(tracks_boxes, detections.xyxy)
    # index of the best-overlapping detection for every track
    track2detection = np.argmax(iou, axis=1)

    for tracker_index, detection_index in enumerate(track2detection):
        # argmax also returns an index when every IoU in the row is 0; skip those
        if iou[tracker_index, detection_index] != 0:
            tracker_ids[detection_index] = tracks[tracker_index].track_id

    return tracker_ids

In [48]:
import torch
from ultralytics import YOLO

# Use the first GPU when CUDA is available; otherwise fall back to the CPU so the
# notebook still runs (more slowly) on machines without a CUDA device.
DEVICE = 'cuda:0' if torch.cuda.is_available() else 'cpu'

# MODEL is the weights file name derived from param.yaml ("<model>.pt").
model = YOLO(MODEL)
model.to(DEVICE)
# fuse() prints the "(fused)" model summary shown in this cell's output.
model.fuse()

YOLOv8x summary (fused): 268 layers, 68200608 parameters, 0 gradients, 257.8 GFLOPs


In [49]:
# dict mapping class_id to class_name, taken from the loaded model
CLASS_NAMES_DICT = model.model.names
# class_ids of interest - car, motorcycle, bus and truck;
# used to filter detections and passed to the line counter
CLASS_ID = [2, 3, 5, 7]

In [26]:
# Counting lines. The four segments share their end points
# (150, 380), (300, 700), (750, 300) and (1100, 350), so together they close a
# quadrilateral; segment i runs from LINE_START_POINTS[i] to LINE_END_POINTS[i].
LINE_START, LINE_END = Point(150, 380), Point(300, 700)
LINE_START_2, LINE_END_2 = Point(750, 300), Point(150, 380)
LINE_START_3, LINE_END_3 = Point(1100, 350), Point(750, 300)
LINE_START_4, LINE_END_4 = Point(300, 700), Point(1100, 350)

LINE_START_POINTS = [LINE_START, LINE_START_2, LINE_START_3, LINE_START_4]
LINE_END_POINTS = [LINE_END, LINE_END_2, LINE_END_3, LINE_END_4]

# Results for this clip go to Data/Result/<clip name>/; the clip name is the
# second path component of clip_name ("Data/<clip name>").
result_name = clip_name.split('/')[1]
folder_path = f"Data/Result/{result_name}"
if not os.path.exists(folder_path):
    os.makedirs(folder_path)

TARGET_VIDEO_PATH = f"{folder_path}/{result_name}-result.avi"
print(TARGET_VIDEO_PATH)

Data/Result/street-4_fps15/street-4_fps15-result.avi


In [27]:
# Display width, height, fps and total frame count of the clip that will be processed.
VideoInfo.from_video_path(SOURCE_VIDEO_PATH)

VideoInfo(width=1280, height=720, fps=15, total_frames=2702)

In [28]:
from tqdm.notebook import tqdm
from third_party.Line_counter.line_counter_mod import LineCounter, LineCounterAnnotator

# create BYTETracker instance, configured with the defaults of BYTETrackerArgs
byte_tracker = BYTETracker(BYTETrackerArgs())
# create VideoInfo instance; it is handed to VideoSink and supplies the progress-bar total
video_info = VideoInfo.from_video_path(SOURCE_VIDEO_PATH)
# create frame generator
generator = get_video_frames_generator(SOURCE_VIDEO_PATH)
# create LineCounter instance (custom version from third_party/Line_counter);
# it receives the lists of start/end points plus the class ids and class names
line_counter = LineCounter(start=LINE_START_POINTS, end=LINE_END_POINTS, class_id=CLASS_ID, class_name_dict=CLASS_NAMES_DICT)
# create instance of BoxAnnotator and LineCounterAnnotator
box_annotator = BoxAnnotator(color=ColorPalette(), thickness=1, text_thickness=1, text_scale=0.25)
line_annotator = LineCounterAnnotator(thickness=1, text_thickness=1, text_scale=0.25)

# open target video file; annotated frames are written to it one by one
with VideoSink(TARGET_VIDEO_PATH, video_info) as sink:
    # loop over video frames
    for frame in tqdm(generator, total=video_info.total_frames):
        # model prediction on single frame and conversion to supervision Detections
        results = model(frame)
        detections = Detections(
            xyxy=results[0].boxes.xyxy.cpu().numpy(),
            confidence=results[0].boxes.conf.cpu().numpy(),
            class_id=results[0].boxes.cls.cpu().numpy().astype(int)
        )
        # filtering out detections with unwanted classes (keep only ids listed in CLASS_ID)
        mask = np.array([class_id in CLASS_ID for class_id in detections.class_id], dtype=bool)
        detections.filter(mask=mask, inplace=True)
        # tracking detections: the tracker receives rows of box + confidence;
        # the frame shape is passed for both img_info and img_size
        tracks = byte_tracker.update(
            output_results=detections2boxes(detections=detections),
            img_info=frame.shape,
            img_size=frame.shape
        )

        # attach a tracker id (or None) to every detection
        # NOTE(review): the mask below has one entry per element of tracker_id, so
        # this step relies on match_detections_with_tracks returning exactly one
        # entry per detection.
        tracker_id = match_detections_with_tracks(detections=detections, tracks=tracks)
        detections.tracker_id = np.array(tracker_id)
        # filtering out detections without trackers

        mask = np.array([tracker_id is not None for tracker_id in detections.tracker_id], dtype=bool)

        detections.filter(mask=mask, inplace=True)
        # format custom labels: "#<tracker id> <class name> <confidence>"
        labels = [
            f"#{tracker_id} {CLASS_NAMES_DICT[class_id]} {confidence:0.2f}"
            for _, confidence, class_id, tracker_id
            in detections
        ]
        # updating line counter
        line_counter.update(detections=detections)
        # annotate the frame with boxes and line counts, then write it to the output video
        frame = box_annotator.annotate(frame=frame, detections=detections, labels=labels)
        line_annotator.annotate(frame=frame, line_counter=line_counter)
        sink.write_frame(frame)
        # debugging aids: uncomment to preview a single annotated frame and stop
        # show_frame_in_notebook(frame, (16, 16))
        # break
# final counts after the whole clip; the next cell turns them into the in/out table
result_batch = line_annotator.result(line_counter=line_counter)

  0%|          | 0/2702 [00:00<?, ?it/s]

Ultralytics YOLOv8.0.34 ðŸš€ Python-3.8.16 torch-1.13.1+cu117 CUDA:0 (NVIDIA GeForce RTX 3070 Ti, 7979MiB)

0: 384x640 20 cars, 1 bus, 2 trucks, 21.5ms
Speed: 0.2ms pre-process, 21.5ms inference, 0.9ms postprocess per image at shape (1, 3, 640, 640)

0: 384x640 19 cars, 2 buss, 2 trucks, 19.7ms
Speed: 0.6ms pre-process, 19.7ms inference, 1.4ms postprocess per image at shape (1, 3, 640, 640)

0: 384x640 18 cars, 1 bus, 3 trucks, 20.2ms
Speed: 0.2ms pre-process, 20.2ms inference, 1.7ms postprocess per image at shape (1, 3, 640, 640)

0: 384x640 19 cars, 1 bus, 2 trucks, 19.5ms
Speed: 0.2ms pre-process, 19.5ms inference, 1.3ms postprocess per image at shape (1, 3, 640, 640)

0: 384x640 21 cars, 1 bus, 2 trucks, 1 traffic light, 18.3ms
Speed: 0.2ms pre-process, 18.3ms inference, 0.7ms postprocess per image at shape (1, 3, 640, 640)

0: 384x640 20 cars, 2 buss, 2 trucks, 18.4ms
Speed: 0.2ms pre-process, 18.4ms inference, 0.8ms postprocess per image at shape (1, 3, 640, 640)

0: 384x640 20 c

In [29]:
import pandas as pd
from IPython.display import display


# Build the in/out table: one "<line>-in" / "<line>-out" column pair per entry of
# result_batch, with one row per class name.
df = pd.DataFrame()

# The loop variable is `line_index`, not `id`, so the builtin id() is not
# rebound at notebook scope.
for line_index, result in enumerate(result_batch):
    # result[0] feeds the "in" column and result[1] the "out" column
    df[f"{line_index}-in"] = pd.Series(result[0])
    df[f"{line_index}-out"] = pd.Series(result[1])

display(df)

df.to_csv(f"Data/Result/{clip_name.split('/')[1]}/in_out_table.csv")

{'car': 28, 'motorcycle': 0, 'bus': 1, 'truck': 2}
{'car': 76, 'motorcycle': 0, 'bus': 20, 'truck': 17}
{'car': 16, 'motorcycle': 0, 'bus': 6, 'truck': 5}
{'car': 96, 'motorcycle': 0, 'bus': 3, 'truck': 12}


Unnamed: 0,0-in,0-out,1-in,1-out,2-in,2-out,3-in,3-out
car,28,14,76,99,16,39,96,66
motorcycle,0,0,0,0,0,0,0,0
bus,1,3,20,20,6,2,3,3
truck,2,0,17,8,5,10,12,12


In [30]:
import pandas as pd
from IPython.display import display

tracker_line = line_counter.tracker_line

# Number of counting lines. A route "first line -> last line" is stored at index
# first_line * NUM_LINES + last_line, e.g. with 4 lines:
# 0 -> 0 : 0, 0 -> 1 : 1, 0 -> 2 : 2, 0 -> 3 : 3, 1 -> 0 : 4, ... 3 -> 3 : 15
NUM_LINES = len(LINE_START_POINTS)

# Per-class counters for a single route. The key order is kept unchanged because
# it determines the row order of the resulting table and CSV.
EMPTY_COUNTS = {'car' : 0, 'bus' : 0, 'motorcycle':0, "truck" : 0}

# one independent counter dict per (first line, last line) pair
dict_batch = [EMPTY_COUNTS.copy() for _ in range(NUM_LINES * NUM_LINES)]

for key in tracker_line:
    crossings = tracker_line[key]
    # Entry layout as used here: [class_id, first line crossed, ..., last line crossed].
    # With fewer than 3 items there is no second crossing, hence no route to count.
    if len(crossings) < 3:
        continue
    class_name = str(CLASS_NAMES_DICT[crossings[0]])
    route_id = crossings[1] * NUM_LINES + crossings[-1]
    dict_batch[route_id][class_name] += 1

df = pd.DataFrame()

# Loop variables are named `route_id` / `counts` rather than `id` / `dict`, so the
# builtins stay usable in the rest of the notebook session.
for route_id, counts in enumerate(dict_batch):
    first_line, last_line = divmod(route_id, NUM_LINES)
    # a route that starts and ends on the same line is left out of the table
    if first_line == last_line:
        continue
    df[f"{first_line}-to-{last_line}"] = pd.Series(counts)

display(df)

df.to_csv(f"Data/Result/{clip_name.split('/')[1]}/result_table.csv")

Unnamed: 0,0-to-1,0-to-2,0-to-3,1-to-0,1-to-2,...,2-to-1,2-to-3,3-to-0,3-to-1,3-to-2
car,10,10,4,0,15,...,0,4,5,59,7
bus,0,1,0,0,0,...,0,0,0,0,0
motorcycle,0,0,1,0,0,...,0,0,0,0,0
truck,3,4,0,0,3,...,1,0,0,3,1
