## Setup

In [67]:
!pip install ultralytics

# Drop the pip install log so that only the environment check below remains
# in the cell output.
from IPython import display
display.clear_output()

# Print the ultralytics environment summary (library/Python/torch versions,
# the CUDA device and CPU/RAM/disk figures).
import ultralytics
ultralytics.checks()

Ultralytics YOLOv8.1.47  Python-3.12.3 torch-2.2.2+cu121 CUDA:0 (NVIDIA GeForce RTX 3050, 8192MiB)
Setup complete  (32 CPUs, 31.8 GB RAM, 479.6/930.7 GB disk)


In [68]:
!pip install supervision

# Drop the pip install log so that only the version line below remains in the
# cell output.
from IPython import display
display.clear_output()

import supervision as sv
# Show which supervision release this notebook ran against.
print("supervision.__version__:", sv.__version__)

supervision.__version__: 0.19.0


In [69]:
# Checkpoint name handed to the ultralytics YOLO loader.
MODEL = "yolov8x.pt"
from ultralytics import YOLO

model = YOLO(MODEL)
# Fuse the model's layers for inference (the summary printed by this cell
# reports the model as "fused").
model.fuse()
# dict mapping class_id to class_name
CLASS_NAMES_DICT = model.model.names

# class_ids of interest.
# NOTE(review): only class id 1 is selected, although this comment used to say
# "car, motorcycle, bus and truck". With the COCO label set that the stock
# yolov8 checkpoints presumably use, id 1 is "bicycle" and car / motorcycle /
# bus / truck would be 2 / 3 / 5 / 7 - confirm against CLASS_NAMES_DICT and
# fix whichever of the comment or the list is wrong.
selected_classes = [1]

YOLOv8x summary (fused): 268 layers, 68200608 parameters, 0 gradients


In [70]:
import os
import csv
from tqdm.notebook import tqdm
import numpy as np
import math
import torch
from IPython import display
import subprocess


## Utils

In [71]:
import VARIABLES2
import importlib
importlib.reload(VARIABLES2)
from VARIABLES2 import *

In [72]:
def calculate_hypotenuse(a, b):
  """Return the length of the hypotenuse of a right triangle with legs a and b.

  Uses math.hypot instead of sqrt(a**2 + b**2) so that very large (or very
  small) legs do not overflow/underflow while being squared.

  Args:
    a: Length of the first leg.
    b: Length of the second leg.

  Returns:
    float: sqrt(a^2 + b^2).
  """
  return math.hypot(a, b)
  

In [73]:
class VideoInfoHandler():
  """Holds the per-video supervision objects used for tracking and line counting.

  Call re_init() with a video path before processing that video. It rebuilds
  the annotators, the tracker and the counting lines for the video's
  resolution and frame rate, while carrying the in/out counts of the previous
  counting lines over to the new ones, so counts keep accumulating across
  consecutive videos.
  """

  # One vertical counting line per entry, placed at
  # x = width * (1/2 + offset * LINE_SPACING), i.e. at 35%, 50% and 65% of the
  # frame width for the values below.
  LINE_OFFSETS = (-1, 0, 1)
  LINE_SPACING = 0.15
  # Frame size (width, height) that the *_DEFAULT drawing parameters refer to.
  REFERENCE_SIZE = (1920, 1080)

  def __init__(self) -> None:
    """Create an empty handler; nothing is usable until re_init() is called."""
    self.video_info = None
    self.va_params = {}
    self.line_zone_annotators = []
    self.label_annotator = None
    self.trace_annotator = None
    self.byte_tracker = None
    self.line_zones = []

  def re_init(self, SOURCE_VIDEO_PATH):
    """Rebuild every per-video object for the video at SOURCE_VIDEO_PATH.

    Args:
      SOURCE_VIDEO_PATH: Path of the video that is about to be processed.
    """
    self.video_info = sv.VideoInfo.from_video_path(SOURCE_VIDEO_PATH)
    # The drawing parameters must exist before any annotator is constructed.
    self.init_va_params()
    self.init_line_zone_annotators()
    self.label_annotator = sv.LabelAnnotator(
      text_thickness=self.va_params["text_thickness"],
      text_scale=self.va_params["text_scale"],
    )
    self.trace_annotator = sv.TraceAnnotator(
      thickness=self.va_params["thickness"],
      trace_length=self.va_params["trace_length"],
    )
    # A fresh tracker per video, configured with that video's frame rate.
    self.byte_tracker = sv.ByteTrack(
      track_activation_threshold=0.25,
      lost_track_buffer=30,
      minimum_matching_threshold=0.8,
      frame_rate=self.video_info.fps,
    )
    self.init_line_zones()

  def init_va_params(self):
    """Scale the default drawing parameters to the current video's size.

    The scale factor is the ratio between the current frame diagonal and the
    diagonal of REFERENCE_SIZE. Integer parameters are clamped to at least 1:
    for small videos plain rounding would yield 0, which is not a usable line
    or text thickness.
    """
    reference_diagonal = calculate_hypotenuse(*self.REFERENCE_SIZE)
    current_diagonal = calculate_hypotenuse(self.video_info.width, self.video_info.height)
    proportion = current_diagonal / reference_diagonal
    self.va_params = {
        "thickness": max(1, round(THICKNESS_DEFAULT * proportion)),
        "text_thickness": max(1, round(TEXT_THICKNESS_DEFAULT * proportion)),
        "text_scale": TEXT_SCALE_DEFAULT * proportion,
        "trace_length": max(1, round(TRACE_LENGTH_DEFAULT * proportion)),
    }

  def init_line_zone_annotators(self):
    """Create one LineZoneAnnotator per counting line, using the scaled parameters."""
    self.line_zone_annotators = [
      sv.LineZoneAnnotator(
        thickness=self.va_params["thickness"],
        text_thickness=self.va_params["text_thickness"],
        text_scale=self.va_params["text_scale"],
      )
      for _ in self.LINE_OFFSETS
    ]

  def get_line_zones(self):
    """Build fresh, zero-count vertical counting lines for the current video.

    Returns:
      list: One sv.LineZone per entry of LINE_OFFSETS, each spanning the full
      frame height.
    """
    width = self.video_info.width
    height = self.video_info.height
    line_zones = []
    for offset in self.LINE_OFFSETS:
      x = width * (1 / 2 + offset * self.LINE_SPACING)
      line_zones.append(
        sv.LineZone(start=sv.Point(x, 0), end=sv.Point(x, height))
      )
    return line_zones

  def init_line_zones(self):
    """Replace the counting lines, keeping the counts accumulated so far.

    New lines are created for the current video size and the in/out counts of
    the previous lines (if any) are copied onto them position by position.
    """
    new_line_zones = self.get_line_zones()
    # zip() is a no-op on the first call (no previous lines) and cannot index
    # past the end of either list.
    for previous_line, new_line in zip(self.line_zones, new_line_zones):
      new_line.in_count = previous_line.in_count
      new_line.out_count = previous_line.out_count

    self.line_zones = new_line_zones
    
    
    

  

In [74]:

def process_video(
    source_path: str,
    target_path: str,
    callback,
    stride: int = 1,
) -> None:
    """
    Process a video file by applying a callback function on each frame
        and saving the result to a target video file.

    Args:
        source_path (str): The path to the source video file.
        target_path (str): The path to the target video file.
        callback (Callable[[np.ndarray, int], np.ndarray]): A function that takes in
            a numpy ndarray representation of a video frame and an
            int index of the frame and returns a processed numpy ndarray
            representation of the frame. The index counts the frames handed
            to the callback (0, 1, 2, ...), so with `stride > 1` it is not the
            frame position in the source video.
        stride (int): Forwarded to `sv.get_video_frames_generator`; only every
            `stride`-th source frame is processed and written. Defaults to 1
            (every frame).

    Examples:
        ```python
        import supervision as sv

        def callback(scene: np.ndarray, index: int) -> np.ndarray:
            ...

        process_video(
            source_path=<SOURCE_VIDEO_PATH>,
            target_path=<TARGET_VIDEO_PATH>,
            callback=callback
        )
        ```
    """
    source_video_info = sv.VideoInfo.from_video_path(video_path=source_path)

    # The progress bar must count the frames that are actually yielded, which
    # is ceil(total_frames / stride); leave it open-ended if the frame count
    # is unknown.
    total_frames = source_video_info.total_frames
    expected_frames = None if total_frames is None else -(-total_frames // stride)

    with sv.VideoSink(target_path=target_path, video_info=source_video_info) as sink:
        frames = sv.get_video_frames_generator(source_path=source_path, stride=stride)
        progress = tqdm(
            enumerate(frames),
            desc=" Video processing",
            position=1,
            leave=False,
            total=expected_frames,
        )
        for index, frame in progress:
            result_frame = callback(frame, index)
            sink.write_frame(frame=result_frame)



In [75]:
import VARIABLES2
import importlib
importlib.reload(VARIABLES2)
from VARIABLES2 import *


# Shared handler; the driver loop calls vih.re_init(path) before each video so
# that the callback below always sees objects matching the current video.
vih = VideoInfoHandler()

def callback(frame: np.ndarray, index:int) -> np.ndarray:
    """Detect, track, count and annotate a single video frame.

    Args:
        frame: The frame to process.
        index: Index of the frame as supplied by the caller (not used here).

    Returns:
        A copy of the frame with traces, labels and the counting lines drawn.
    """
    # Prefer the first GPU, but fall back to the CPU instead of crashing on
    # hosts without CUDA.
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    # model prediction on single frame and conversion to supervision Detections
    results = model(frame, verbose=False, device=device)[0]
    detections = sv.Detections.from_ultralytics(results)
    # only consider class ids listed in selected_classes
    detections = detections[np.isin(detections.class_id, selected_classes)]
    # tracking detections
    detections = vih.byte_tracker.update_with_detections(detections)
    labels = [
        f"#{tracker_id} {model.model.names[class_id]} {confidence:0.2f}"
        for confidence, class_id, tracker_id
        in zip(detections.confidence, detections.class_id, detections.tracker_id)
    ]
    # Draw on a copy so the caller's frame is left untouched.
    annotated_frame = vih.trace_annotator.annotate(
        scene=frame.copy(),
        detections=detections
    )
    annotated_frame = vih.label_annotator.annotate(
        scene=annotated_frame,
        detections=detections,
        labels=labels
    )

    # Update every line counter and draw it. Pairing the zones with their
    # annotators keeps this independent of how many lines the handler has.
    for line_zone, line_zone_annotator in zip(vih.line_zones, vih.line_zone_annotators):
        line_zone.trigger(detections)
        annotated_frame = line_zone_annotator.annotate(annotated_frame, line_counter=line_zone)
    return annotated_frame


videos_folder = "3-3"
videos_folder_path = os.path.join("full_recordings", videos_folder)

# Make sure the output folder exists before any result file is opened.
results_folder = "results"
os.makedirs(results_folder, exist_ok=True)

# CSV rows: header first, then one row of per-video in/out counts.
data = [["file_name", "in", "out"]]

# Cumulative totals after the most recently completed video. The line zones
# keep accumulating across videos (see VideoInfoHandler.init_line_zones), so
# the per-video numbers are the difference to these values.
prev_in, prev_out = 0, 0

with open(os.path.join(results_folder, f"{videos_folder}.csv"), "w", newline="") as csv_output:
  writer = csv.writer(csv_output)
  try:
    # os.listdir returns entries in arbitrary order; sort so that videos are
    # processed (and reported) in a reproducible order.
    for file_name in tqdm(sorted(os.listdir(videos_folder_path)), desc=" Videos remaining", position=0):
      file_path = os.path.join(videos_folder_path, file_name)
      vih.re_init(file_path)
      process_video(
        source_path = file_path,
        target_path = TARGET_DUMMY_VIDEO_PATH,
        callback=callback,
      )
      # Use the counting line that registered the most crossings in each
      # direction as the total for that direction.
      total_in = max(line_zone.in_count for line_zone in vih.line_zones)
      total_out = max(line_zone.out_count for line_zone in vih.line_zones)

      data.append([file_name, total_in - prev_in, total_out - prev_out])
      prev_in, prev_out = total_in, total_out

  except KeyError as e:
    # NOTE(review): only KeyError is reported and swallowed here; where it is
    # expected to come from is not visible in this notebook - confirm.
    print(e)
  finally:
    # Always persist what was collected, even after a failure. prev_in and
    # prev_out are defined before the loop, so this cannot raise NameError
    # when the first video fails or the folder is empty.
    writer.writerows(data)
    with open(os.path.join(results_folder, f"{videos_folder}.txt"), "w") as totals_output:
      totals_output.write(f"{prev_in}, {prev_out}\n")
    print(prev_in, prev_out)



 Videos remaining:   0%|          | 0/5 [00:00<?, ?it/s]

 Video processing:   0%|          | 0/18032 [00:00<?, ?it/s]

 Video processing:   0%|          | 0/18031 [00:00<?, ?it/s]

 Video processing:   0%|          | 0/18032 [00:00<?, ?it/s]

 Video processing:   0%|          | 0/18031 [00:00<?, ?it/s]

 Video processing:   0%|          | 0/2682 [00:00<?, ?it/s]


153 114
