In [None]:
import os
# Remember the directory the notebook was started in; later cells use it as
# the root for the ByteTrack checkout and for the output video path.
HOME = os.getcwd()
print(HOME)

In [None]:
# Video file that the detection/tracking loop reads frame by frame.
# (Plain string: the original f-string had no placeholders to interpolate.)
SOURCE_VIDEO_PATH = "/content/14101338_3840_2160_60fps.mp4"

In [None]:
# Install Ultralytics, capped at release 8.3.40.
!pip install "ultralytics<=8.3.40"

from IPython import display
# Clear the pip log so the cell output stays short.
display.clear_output()
# Set the Ultralytics `sync` setting to False via the `yolo` CLI.
!yolo settings sync=False

import ultralytics
# Run Ultralytics' built-in environment check.
ultralytics.checks()

In [None]:
# Clone ByteTrack next to the notebook and work inside the checkout.
%cd {HOME}
!git clone https://github.com/ifzhang/ByteTrack.git
%cd {HOME}/ByteTrack

# Rewrite the onnx pin in requirements.txt from 1.8.1 to 1.9.0 before installing.
# NOTE(review): presumably 1.8.1 no longer installs on this runtime — confirm.
!sed -i 's/onnx==1.8.1/onnx==1.9.0/g' requirements.txt

# Install ByteTrack's requirements, the package itself in develop mode,
# and the extra packages this notebook installs alongside it.
!pip3 install -q -r requirements.txt
!python3 setup.py -q develop
!pip install -q cython_bbox
!pip install -q onemetric
!pip install -q loguru lap thop

from IPython import display
# Clear the installation log from the cell output.
display.clear_output()


import sys
# Make the checkout importable so `import yolox` below resolves to it.
sys.path.append(f"{HOME}/ByteTrack")


import yolox
print("yolox.__version__:", yolox.__version__)

In [None]:
from yolox.tracker.byte_tracker import BYTETracker, STrack
from onemetric.cv.utils.iou import box_iou_batch
from dataclasses import dataclass


@dataclass(frozen=True)
class BYTETrackerArgs:
    """Immutable settings object handed to ``BYTETracker`` at construction.

    The notebook builds it with all defaults (``BYTETracker(BYTETrackerArgs())``).
    Field meanings below are inferred from the names unless stated otherwise;
    confirm them against ``yolox.tracker.byte_tracker`` before tuning.
    """
    # presumably the detection-score threshold used when starting/associating tracks — TODO confirm
    track_thresh: float = 0.25
    # Also read by the notebook's main loop (via `byte_tracker.args.track_buffer`)
    # as the number of frames an ID may be missing before it is logged as gone.
    track_buffer: int = 30
    # presumably the matching threshold for track/detection association — TODO confirm
    match_thresh: float = 0.8
    # presumably a limit on box aspect ratio — TODO confirm how the tracker uses it
    aspect_ratio_thresh: float = 3.0
    # presumably the minimum box area kept — TODO confirm how the tracker uses it
    min_box_area: float = 1.0
    # presumably toggles MOT20-specific behaviour inside the tracker — TODO confirm
    mot20: bool = False

In [None]:
# Install torchreid, which the next cell imports to build the ReID model.
# NOTE(review): unpinned — consider fixing a version so reruns are reproducible.
!pip install torchreid

In [None]:
import torch
import torchreid
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Build torchreid's 'osnet_x1_0' architecture with pretrained weights.
# NOTE(review): num_classes=1000 sizes the classifier head; presumably it does
# not matter when the network is only used to produce embeddings — confirm.
model_reid = torchreid.models.build_model(
    name='osnet_x1_0',
    num_classes=1000,
    pretrained=True
)
# Switch to evaluation mode and move the weights to the selected device.
model_reid.eval().to(device)

In [None]:
# Install supervision pinned to 0.1.0; the import cell that follows uses
# module paths such as `supervision.tools.detections` from this package.
!pip install supervision==0.1.0


from IPython import display
# Clear the pip log from the cell output.
display.clear_output()


import supervision
print("supervision.__version__:", supervision.__version__)

In [None]:
from supervision.draw.color import ColorPalette
from supervision.geometry.dataclasses import Point
from supervision.video.dataclasses import VideoInfo
from supervision.video.source import get_video_frames_generator
from supervision.video.sink import VideoSink
from supervision.notebook.utils import show_frame_in_notebook
from supervision.tools.detections import Detections, BoxAnnotator
from supervision.tools.line_counter import LineCounter, LineCounterAnnotator

In [None]:
from typing import List
import numpy as np
def detections2boxes(detections: Detections) -> np.ndarray:
    """Pack detections into one 2-D array of box coordinates plus confidence.

    The confidence vector is turned into a single column and appended to the
    right of ``detections.xyxy``, giving one row per detection.
    """
    box_coords = detections.xyxy
    score_column = detections.confidence[:, None]
    return np.hstack([box_coords, score_column])
def tracks2boxes(tracks: List[STrack]) -> np.ndarray:
    """Collect the ``tlbr`` box of every track into a single float array."""
    corner_boxes = []
    for single_track in tracks:
        corner_boxes.append(single_track.tlbr)
    return np.array(corner_boxes, dtype=float)
def match_detections_with_tracks(
    detections: Detections,
    tracks: List[STrack]
) -> list:
    """Assign a (wrapped) tracker ID to each detection by best IoU overlap.

    Args:
        detections: detections for the current frame (``xyxy`` boxes are used).
        tracks: tracks returned by the tracker for the same frame.

    Returns:
        A list with exactly ``len(detections)`` entries. Entry ``i`` is the
        wrapped tracker ID (1..50) matched to detection ``i``, or ``None`` when
        no track overlaps it. Keeping the list aligned with the detections
        lets the caller build a boolean mask of the right length even when
        there are no tracks (the previous early exit returned a zero-length
        array, which broke that mask for non-empty detections).
    """
    # One slot per detection; None means "no track matched".
    tracker_ids = [None] * len(detections)

    # Nothing to match: no tracks, or no non-zero box coordinates at all.
    if len(tracks) == 0 or not np.any(detections.xyxy):
        return tracker_ids

    tracks_boxes = tracks2boxes(tracks=tracks)
    iou = box_iou_batch(tracks_boxes, detections.xyxy)
    # For every track, the index of the detection it overlaps most.
    track2detection = np.argmax(iou, axis=1)

    for tracker_index, detection_index in enumerate(track2detection):
        if iou[tracker_index, detection_index] != 0:
            original_id = tracks[tracker_index].track_id
            # Fold the ever-growing tracker ID into the range 1..50.
            wrapped_id = (original_id - 1) % 50 + 1
            tracker_ids[detection_index] = wrapped_id

    return tracker_ids

In [None]:
# Weights file handed to the Ultralytics YOLO constructor.
MODEL = "yolov8x.pt"
from ultralytics import YOLO

# Detector used by the main loop (`model(frame)`) and for the class-name lookup.
model = YOLO(MODEL)
# NOTE(review): fuse() presumably merges layers to speed up inference — confirm in the Ultralytics docs.
model.fuse()

In [None]:
from PIL import Image
import torch
from torchvision import transforms

# Preprocessing applied to every person crop before it is fed to the ReID model:
# resize to 256x128, convert to a tensor, then normalise each channel.
# NOTE(review): the mean/std values look like the usual ImageNet statistics — confirm
# they match what the pretrained ReID weights expect.
reid_transform = transforms.Compose([
    transforms.Resize((256, 128)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406],
                         [0.229, 0.224, 0.225]),
])

def extract_reid_features(frame, bboxes, model):
    """Compute one appearance feature vector per bounding box.

    Args:
        frame: image array indexed as ``frame[y, x, channel]``. The channel
            axis is reversed before conversion to PIL (assumes OpenCV-style
            BGR frames — TODO confirm against the frame source).
        bboxes: iterable of ``(x1, y1, x2, y2)`` boxes in pixel coordinates.
        model: torch module that maps a batch of preprocessed crops to a 2-D
            batch of feature vectors.

    Returns:
        ``np.ndarray`` with one row per input box, in the same order as
        ``bboxes``. Boxes whose crop is empty get an all-zero row so the
        result can be stacked next to the boxes. When no box yields a usable
        crop (including empty ``bboxes``) an empty 1-D array is returned.
    """
    # Follow the model's own device instead of assuming CUDA is available.
    try:
        target_device = next(model.parameters()).device
    except (StopIteration, AttributeError):
        target_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    frame_h, frame_w = frame.shape[:2]
    crops = []
    kept_rows = []  # index into bboxes for every crop that was kept
    n_boxes = 0
    for row, box in enumerate(bboxes):
        n_boxes = row + 1
        x1, y1, x2, y2 = map(int, box)
        # Clamp to the image so negative coordinates cannot wrap around.
        x1, y1 = max(x1, 0), max(y1, 0)
        x2, y2 = min(x2, frame_w), min(y2, frame_h)
        crop = frame[y1:y2, x1:x2]
        if crop.size == 0:
            continue
        crop = Image.fromarray(crop[..., ::-1])
        crops.append(reid_transform(crop))
        kept_rows.append(row)

    if not crops:
        return np.array([])

    with torch.no_grad():
        # Build the batch once and move it to the device in a single transfer.
        inputs = torch.stack(crops, dim=0).to(target_device)
        kept_features = model(inputs).cpu().numpy()

    # Re-align with the input boxes; skipped boxes keep a zero vector.
    features = np.zeros((n_boxes, kept_features.shape[1]), dtype=kept_features.dtype)
    features[kept_rows] = kept_features
    return features

In [None]:
# End points of the counting line handed to LineCounter in the main loop.
# NOTE(review): 3840 matches the width suggested by the source file name
# (3840_2160) — confirm against the VideoInfo output if the input changes.
LINE_START = Point(50, 1500)
LINE_END = Point(3840-50, 1500)

# Destination of the annotated video written by the main loop.
# NOTE(review): the file name says "vehicle" although the loop filters on CLASS_ID — confirm the intended name.
TARGET_VIDEO_PATH = f"{HOME}/vehicle-counting-result.mp4"

In [None]:
# Display the metadata supervision reads from the source video (cell output only).
VideoInfo.from_video_path(SOURCE_VIDEO_PATH)

In [None]:
import numpy as np

# Re-create the `np.float` name as an alias of the built-in float.
# NOTE(review): presumably needed because a dependency still references
# `np.float`, which recent NumPy releases no longer provide — confirm.
setattr(np, "float", float)

In [None]:
# Class-index -> class-name mapping taken from the loaded detector; used for box labels.
CLASS_NAMES_DICT = model.model.names
# Detector class indices to keep; every other class is filtered out in the main loop.
# NOTE(review): presumably index 0 is "person" (the CSV column is `person_id`) — confirm via CLASS_NAMES_DICT[0].
CLASS_ID = [0]

In [None]:
from tqdm.notebook import tqdm

# Tracker, video helpers and annotators used by the processing loop.
byte_tracker = BYTETracker(BYTETrackerArgs())
video_info = VideoInfo.from_video_path(SOURCE_VIDEO_PATH)
generator = get_video_frames_generator(SOURCE_VIDEO_PATH)
line_counter = LineCounter(start=LINE_START, end=LINE_END)
box_annotator = BoxAnnotator(color=ColorPalette(), thickness=4, text_thickness=4, text_scale=2)
line_annotator = LineCounterAnnotator(thickness=4, text_thickness=4, text_scale=2)

import csv
from collections import defaultdict

# Presence bookkeeping, keyed by the wrapped (1..50) track ID.
entry_times = dict()       # ID -> frame index at which the ID was first seen
last_seen_frame = dict()   # ID -> latest frame index with an activated track for the ID
fps = video_info.fps
frame_idx = 0
final_log = []             # finished (ID, entry_sec, exit_sec) rows

with VideoSink(TARGET_VIDEO_PATH, video_info) as sink:
    for frame in tqdm(generator, total=video_info.total_frames):
        # --- Detection: run the detector and keep only the configured classes.
        results = model(frame)
        detections = Detections(
            xyxy=results[0].boxes.xyxy.cpu().numpy(),
            confidence=results[0].boxes.conf.cpu().numpy(),
            class_id=results[0].boxes.cls.cpu().numpy().astype(int)
        )
        mask = np.array([class_id in CLASS_ID for class_id in detections.class_id], dtype=bool)
        detections.filter(mask=mask, inplace=True)

        # --- Tracking.
        # The upstream ByteTrack `BYTETracker.update` takes only
        # (output_results, img_info, img_size) and expects an (N, 5) array of
        # x1, y1, x2, y2, score. Passing `frame=`/`reid_model=` or appending
        # ReID feature columns makes the call fail, so only the supported
        # inputs are passed. Appearance features from `extract_reid_features`
        # can be reintroduced here once a tracker that consumes them is used.
        tracks = byte_tracker.update(
            output_results=detections2boxes(detections),
            img_info=frame.shape,
            img_size=frame.shape
        )
        current_ids = set()

        # --- Record first/last sighting for every activated track.
        for track in tracks:
            if not track.is_activated:
                continue
            original_id = track.track_id
            # Fold the ever-growing tracker ID into the range 1..50.
            track_id = (original_id - 1) % 50 + 1
            current_ids.add(track_id)

            if track_id not in entry_times:
                entry_times[track_id] = frame_idx

            last_seen_frame[track_id] = frame_idx

        # --- Close out IDs that have been missing longer than the track buffer.
        inactive_ids = []
        for track_id, last_frame in last_seen_frame.items():
            if track_id not in current_ids and frame_idx - last_frame > byte_tracker.args.track_buffer:
                entry_sec = entry_times[track_id] / fps
                exit_sec = last_frame / fps
                final_log.append((track_id, entry_sec, exit_sec))
                inactive_ids.append(track_id)

        # Removal happens after the scan so the dict is not mutated while iterating.
        for track_id in inactive_ids:
            entry_times.pop(track_id, None)
            last_seen_frame.pop(track_id, None)

        # --- Attach tracker IDs to detections and drop unmatched ones.
        tracker_id = match_detections_with_tracks(detections=detections, tracks=tracks)
        # Keep the ID list aligned with the detections so the mask below
        # always has one entry per detection.
        if len(tracker_id) != len(detections):
            tracker_id = [None] * len(detections)
        detections.tracker_id = np.array(tracker_id)
        mask = np.array([matched_id is not None for matched_id in detections.tracker_id], dtype=bool)
        detections.filter(mask=mask, inplace=True)

        # --- Annotate and write the frame.
        labels = [
            f"#{tracker_id} {CLASS_NAMES_DICT[class_id]} {confidence:0.2f}"
            for _, confidence, class_id, tracker_id
            in detections
        ]
        line_counter.update(detections=detections)
        frame = box_annotator.annotate(frame=frame, detections=detections, labels=labels)
        sink.write_frame(frame)
        frame_idx += 1
import pandas as pd

# IDs still open when the video ends get their last sighting as exit time.
for track_id in list(last_seen_frame):
    entry_sec = entry_times[track_id] / fps
    exit_sec = last_seen_frame[track_id] / fps
    final_log.append((track_id, entry_sec, exit_sec))

# Convert the second offsets to HH:MM:SS strings and save the log.
df = pd.DataFrame(final_log, columns=["person_id", "entry_time", "exit_time"])
df["entry_time"] = pd.to_datetime(df["entry_time"], unit='s').dt.strftime('%H:%M:%S')
df["exit_time"] = pd.to_datetime(df["exit_time"], unit='s').dt.strftime('%H:%M:%S')

df.to_csv("person_entry_exit_log.csv", index=False)

In [None]:
from google.colab import files
# Hand the CSV log written by the processing cell to Colab's download helper.
files.download("person_entry_exit_log.csv")