In [14]:
!nvidia-smi

/bin/bash: line 1: nvidia-smi: command not found


In [1]:
import os
HOME = os.getcwd()
print(HOME)

/content


In [2]:
SOURCE_VIDEO_PATH =  f"{HOME}/resized_video.mp4"


In [3]:
%cd {HOME}
!git clone https://github.com/ifzhang/ByteTrack.git
%cd {HOME}/ByteTrack


!sed -i 's/onnx==1.8.1/onnx==1.9.0/g' requirements.txt

!pip3 install -q -r requirements.txt
!python3 setup.py -q develop
!pip install -q cython_bbox
!pip install -q onemetric

!pip install -q loguru lap thop

from IPython import display
display.clear_output()


import sys
sys.path.append(f"{HOME}/ByteTrack")


import yolox
print("yolox.__version__:", yolox.__version__)

yolox.__version__: 0.1.0


In [10]:
# Pip install method (recommended)

!pip install ultralytics

from IPython import display
display.clear_output()

import ultralytics
ultralytics.checks()

Ultralytics YOLOv8.0.172 🚀 Python-3.10.12 torch-2.0.1+cu118 CPU (AMD EPYC 7B12)
Setup complete ✅ (2 CPUs, 12.7 GB RAM, 26.4/107.7 GB disk)


In [4]:
from yolox.tracker.byte_tracker import BYTETracker, STrack
from onemetric.cv.utils.iou import box_iou_batch
from dataclasses import dataclass


@dataclass(frozen=True)
class BYTETrackerArgs:
    track_thresh: float = 0.25
    track_buffer: int = 30
    match_thresh: float = 0.8
    aspect_ratio_thresh: float = 3.0
    min_box_area: float = 1.0
    mot20: bool = False

In [5]:
!pip install supervision==0.1.0


from IPython import display
display.clear_output()


import supervision
print("supervision.__version__:", supervision.__version__)

supervision.__version__: 0.1.0


In [6]:

from IPython import display
display.clear_output()


In [7]:
%%capture
from supervision.draw.color import ColorPalette
from supervision.draw.color import Color
from supervision.geometry.dataclasses import Point
from supervision.video.dataclasses import VideoInfo
from supervision.video.source import get_video_frames_generator
from supervision.video.sink import VideoSink
from supervision.notebook.utils import show_frame_in_notebook

In [8]:
# settings
MODEL = "yolov8x.pt"

In [11]:
from ultralytics import YOLO

model = YOLO(MODEL)
model.fuse()

Downloading https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8x.pt to 'yolov8x.pt'...
100%|██████████| 131M/131M [00:00<00:00, 373MB/s]
YOLOv8x summary (fused): 268 layers, 68200608 parameters, 0 gradients, 257.8 GFLOPs


In [12]:
# dict maping class_id to class_name
CLASS_NAMES_DICT = model.model.names
# class_ids of interest - car, motorcycle, bus and truck
CLASS_ID = [ 0, 1, 2, 3, 5, 7]

In [16]:
from typing import List, Optional, Tuple, Dict,Union
import pandas
import cv2
import numpy as np
import math
from supervision.draw.color import Color, ColorPalette


class Detections:
    def __init__(
        self,
        xyxy: np.ndarray,
        confidence: np.ndarray,
        class_id: np.ndarray,
        tracker_id: Optional[np.ndarray] = None,
        frame_rate: Optional[float] = None,
        pixels_per_meter: Optional[float] = None,
    ):
        """
        Data class containing information about the detections in a video frame.

        :param xyxy: np.ndarray : An array of shape (n, 4) containing the bounding boxes coordinates in format [x1, y1, x2, y2]
        :param confidence: np.ndarray : An array of shape (n,) containing the confidence scores of the detections.
        :param class_id: np.ndarray : An array of shape (n,) containing the class ids of the detections.
        :param tracker_id: Optional[np.ndarray] : An array of shape (n,) containing the tracker ids of the detections.
        :param frame_rate: Optional[float] : Frame rate of the video (frames per second).
        :param pixels_per_meter: Optional[float] : Pixels per meter calibration information.
        """
        self.xyxy: np.ndarray = xyxy
        self.confidence: np.ndarray = confidence
        self.class_id: np.ndarray = class_id
        self.tracker_id: Optional[np.ndarray] = tracker_id
        self.speeds_mph: Optional[np.ndarray] = None  # Store estimated speeds in mph

        n = len(self.xyxy)
        validators = [
            (isinstance(self.xyxy, np.ndarray) and self.xyxy.shape == (n, 4)),
            (isinstance(self.confidence, np.ndarray) and self.confidence.shape == (n,)),
            (isinstance(self.class_id, np.ndarray) and self.class_id.shape == (n,)),
            self.tracker_id is None
            or (
                isinstance(self.tracker_id, np.ndarray)
                and self.tracker_id.shape == (n,)
            ),
        ]
        if not all(validators):
            raise ValueError(
                "xyxy must be 2d np.ndarray with (n, 4) shape, "
                "confidence must be 1d np.ndarray with (n,) shape, "
                "class_id must be 1d np.ndarray with (n,) shape, "
                "tracker_id must be None or 1d np.ndarray with (n,) shape"
            )

        if frame_rate is not None and pixels_per_meter is not None:
            self.calculate_speeds(frame_rate, pixels_per_meter)

    def __len__(self):
        """
        Returns the number of detections in the Detections object.
        """
        return len(self.xyxy)

    def __iter__(self):
        """
        Iterates over the Detections object and yield a tuple of (xyxy, confidence, class_id, tracker_id) for each detection.
        """
        for i in range(len(self.xyxy)):
            yield (
                self.xyxy[i],
                self.confidence[i],
                self.class_id[i],
                self.tracker_id[i] if self.tracker_id is not None else None,
            )

    @classmethod
    def from_yolov5(cls, yolov5_output: np.ndarray):
        """
        Creates a Detections instance from a YOLOv5 output tensor

        :param yolov5_output: np.ndarray : The output tensor from YOLOv5
        :return: Detections : A Detections instance representing the detections in the frame

        Example:
        detections = Detections.from_yolov5(yolov5_output)
        """
        xyxy = yolov5_output[:, :4]
        confidence = yolov5_output[:, 4]
        class_id = yolov5_output[:, 5].astype(int)
        return cls(xyxy, confidence, class_id)

    def filter(self, mask: np.ndarray, inplace: bool = False) -> Optional[np.ndarray]:
        """
        Filter the detections by applying a mask

        :param mask: np.ndarray : A mask of shape (n,) containing a boolean value for each detection indicating if it should be included in the filtered detections
        :param inplace: bool : If True, the original data will be modified and self will be returned.
        :return: Optional[np.ndarray] : A new instance of Detections with the filtered detections, if inplace is set to False. None otherwise.
        """
        if inplace:
            self.xyxy = self.xyxy[mask]
            self.confidence = self.confidence[mask]
            self.class_id = self.class_id[mask]
            self.tracker_id = (
                self.tracker_id[mask] if self.tracker_id is not None else None
            )
            return self
        else:
            return Detections(
                xyxy=self.xyxy[mask],
                confidence=self.confidence[mask],
                class_id=self.class_id[mask],
                tracker_id=self.tracker_id[mask]
                if self.tracker_id is not None
                else None,
            )

    def calculate_speeds(self, frame_rate: float, pixels_per_meter: float):
        """
        Calculate estimated speeds for detections based on frame rate and pixels per meter.

        :param frame_rate: float : Frame rate of the video (frames per second).
        :param pixels_per_meter: float : Pixels per meter calibration information.
        """
        self.speeds_mph = []
        time_constant = 1 / frame_rate  # Time constant (1 frame duration in seconds)

        for i in range(len(self.xyxy)):
            location1 = (self.xyxy[i][0], self.xyxy[i][1])
            location2 = (self.xyxy[i][2], self.xyxy[i][3])

            d_pixel = math.sqrt(math.pow(location2[0] - location1[0], 2) + math.pow(location2[1] - location1[1], 2))
            d_meters = d_pixel / pixels_per_meter

            speed_mps = d_meters / time_constant
            speed_mph = speed_mps * 2.23694  # 1 m/s = 2.23694 mph
            self.speeds_mph.append(speed_mph)

        self.speeds_mph = np.array(self.speeds_mph)



class BoxAnnotator:
    def __init__(
        self,
        color: Union[Color, ColorPalette],
        thickness: int = 2,
        text_color: Color = Color.white(),
        text_scale: float = 0.5,
        text_thickness: int = 1,
        text_padding: int = 10,
    ):
        """
        A class for drawing bounding boxes on an image using detections provided.

        :param color: Union[Color, ColorPalette] :  The color to draw the bounding box, can be a single color or a color palette
        :param thickness: int :  The thickness of the bounding box lines, default is 2
        :param text_color: Color :  The color of the text on the bounding box, default is white
        :param text_scale: float :  The scale of the text on the bounding box, default is 0.5
        :param text_thickness: int :  The thickness of the text on the bounding box, default is 1
        :param text_padding: int :  The padding around the text on the bounding box, default is 5
        """
        self.color: Union[Color, ColorPalette] = color
        self.thickness: int = thickness
        self.text_color: Color = text_color
        self.text_scale: float = text_scale
        self.text_thickness: int = text_thickness
        self.text_padding: int = text_padding

    def annotate(
        self,
        frame: np.ndarray,
        detections: Detections,
        labels: Optional[List[str]] = None,
    ) -> np.ndarray:
        """
        Draws bounding boxes on the frame using the detections provided.

        :param frame: np.ndarray : The image on which the bounding boxes will be drawn
        :param detections: Detections : The detections for which the bounding boxes will be drawn
        :param labels: Optional[List[str]] :  An optional list of labels corresponding to each detection. If labels is provided, the confidence score of the detection will be replaced with the label.
        :return: np.ndarray : The image with the bounding boxes drawn on it
        """
        font = cv2.FONT_HERSHEY_SIMPLEX
        for i, (xyxy, confidence, class_id, tracker_id) in enumerate(detections):
            color = (
                self.color.by_idx(class_id)
                if isinstance(self.color, ColorPalette)
                else self.color
            )
            text = (
                f"{confidence:0.2f}"
                if (labels is None or len(detections) != len(labels))
                else labels[i]
            )

            x1, y1, x2, y2 = xyxy.astype(int)
            text_width, text_height = cv2.getTextSize(
                text=text,
                fontFace=font,
                fontScale=self.text_scale,
                thickness=self.text_thickness,
            )[0]

            text_x = x1 + self.text_padding
            text_y = y1 - self.text_padding

            text_background_x1 = x1
            text_background_y1 = y1 - 2 * self.text_padding - text_height

            text_background_x2 = x1 + 2 * self.text_padding + text_width
            text_background_y2 = y1

            cv2.rectangle(
                img=frame,
                pt1=(x1, y1),
                pt2=(x2, y2),
                color=color.as_bgr(),
                thickness=self.thickness,
            )
            cv2.rectangle(
                img=frame,
                pt1=(text_background_x1, text_background_y1),
                pt2=(text_background_x2, text_background_y2),
                color=color.as_bgr(),
                thickness=cv2.FILLED,
            )
            cv2.putText(
                img=frame,
                text=text,
                org=(text_x, text_y),
                fontFace=font,
                fontScale=1,
                color=self.text_color.as_rgb(),
                thickness=1,
                lineType=cv2.LINE_AA,
            )
        return frame


In [17]:
from typing import List

import numpy as np
import pandas

# converts Detections into format that can be consumed by match_detections_with_tracks function
def detections2boxes(detections: Detections) -> np.ndarray:
    return np.hstack((
        detections.xyxy,
        detections.confidence[:, np.newaxis]
    ))


# converts List[STrack] into format that can be consumed by match_detections_with_tracks function
def tracks2boxes(tracks: List[STrack]) -> np.ndarray:
    return np.array([
        track.tlbr
        for track
        in tracks
    ], dtype=float)


# matches our bounding boxes with predictions
def match_detections_with_tracks(
    detections: Detections,
    tracks: List[STrack]
) -> Detections:
    if not np.any(detections.xyxy) or len(tracks) == 0:
        return np.empty((0,))

    tracks_boxes = tracks2boxes(tracks=tracks)
    iou = box_iou_batch(tracks_boxes, detections.xyxy)
    track2detection = np.argmax(iou, axis=1)

    tracker_ids = [None] * len(detections)

    for tracker_index, detection_index in enumerate(track2detection):
        if iou[tracker_index, detection_index] != 0:
            tracker_ids[detection_index] = tracks[tracker_index].track_id

    return tracker_ids

def estimatespeed(Location1, Location2):
    #Euclidean Distance Formula
    d_pixel = math.sqrt(math.pow(Location2[0] - Location1[0], 2) + math.pow(Location2[1] - Location1[1], 2))
    # defining the pixels per meter
    ppm = 8
    d_meters = d_pixel / ppm

    time_constant = 15 * 0.8
    # distance = speed/time
    speed = d_meters * time_constant

    # Convert meters to miles
    speed_mph = speed * 0.621371192

    return int(speed_mph)


In [21]:
import cv2

# Open the input video file
video = input("Enter the path of the input video:")
input_video = cv2.VideoCapture(video)  # Replace 'input_video.mp4' with your input video filename

# Get the original video's frames per second (fps), width, and height
fps = int(input_video.get(cv2.CAP_PROP_FPS))
original_width = int(input_video.get(cv2.CAP_PROP_FRAME_WIDTH))
original_height = int(input_video.get(cv2.CAP_PROP_FRAME_HEIGHT))

# Create VideoWriter object to save the resized video
desired_width = 1280
desired_height = 960

output_video = cv2.VideoWriter(f"{HOME}/resized_video.mp4", cv2.VideoWriter_fourcc(*'mp4v'), fps, (desired_width, desired_height))

while True:
    ret, frame = input_video.read()
    if not ret:
        break

    # Resize the frame
    sample = cv2.resize(frame, (desired_width, desired_height))

    # Write the resized frame to the output video
    output_video.write(sample)

    #cv2.imshow('Resized Video', resized_frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# Release resources
input_video.release()
output_video.release()
cv2.destroyAllWindows()
video_path=f"{HOME}/resized_video.mp4"

Enter the path of the input video:class_sample1.mp4


In [24]:

from tqdm.notebook import tqdm
import cv2
import numpy as np
from IPython.display import display, clear_output
import time
import csv
from collections import defaultdict

# Define the classes and other variables
# dict maping class_id to class_name
CLASS_NAMES_DICT = model.model.names

# class_ids of interest - car, motorcycle, bus and truck
CLASS_ID = [0, 1 ,2, 3, 5, 7]
arrow_line_length = 20  # You can adjust this value

# Create BYTETracker instance
byte_tracker = BYTETracker(BYTETrackerArgs())
#video_path = input("Enter the path of the input video: ")
# Create VideoInfo instance
video_info = VideoInfo.from_video_path(video_path)
# Create frame generator
generator = get_video_frames_generator(video_path)
# Create instance of BoxAnnotator and LineCounterAnnotator
box_annotator = BoxAnnotator(color=ColorPalette(), thickness=1, text_thickness=1, text_scale=0.5)
vehicle_counts = defaultdict(int)
unique_vehicle_counts = defaultdict(set)
prev_midpoint = None
prev_time = None
font = cv2.FONT_HERSHEY_SIMPLEX
font_scale = 0.5
font_thickness = 1
# Open target video file
lines = {}
previous_midpoints = {}
# Define a threshold for determining movement direction
MIDPOINT_THRESHOLD = 30  # Adjust this value as needed

# Create a CSV file for output
csv_file = open('output.csv', 'w', newline='')  # Open a CSV file in write mode
csv_writer = csv.writer(csv_file)  # Create a CSV writer object
csv_writer.writerow(['Class', 'ID',"direction","speed"])

# Open target video file
with VideoSink(TARGET_VIDEO_PATH, video_info) as sink:
    # Initialize lines dictionary
    lines = {}

    # Loop over video frames
    for frame in tqdm(generator, total=video_info.total_frames):
        # Model prediction on single frame and conversion to Detections
        frame1=frame.copy()
        results = model(frame)
        detections1 = results[0].boxes  # Assuming 'boxes' attribute contains detection info

        detected_frame = frame.copy()
        result = model(frame1)
        detections = Detections(
            xyxy=result[0].boxes.xyxy.cpu().numpy(),
            confidence=result[0].boxes.conf.cpu().numpy(),
            class_id=result[0].boxes.cls.cpu().numpy().astype(int)
        )
        # Filtering out detections with unwanted classes
        mask = np.array([class_id in CLASS_ID for class_id in detections.class_id], dtype=bool)
        detections.filter(mask=mask, inplace=True)

        # tracking detections
        tracks = byte_tracker.update(
            output_results=detections2boxes(detections=detections),
            img_info=frame1.shape,
            img_size=frame1.shape
        )
        tracker_id = match_detections_with_tracks(detections=detections, tracks=tracks)
        detections.tracker_id = np.array(tracker_id)
        # filtering out detections without trackers
        mask = np.array([tracker_id is not None for tracker_id in detections.tracker_id], dtype=bool)
        detections.filter(mask=mask, inplace=True)

        labels = [
            f"{tracker_id} {CLASS_NAMES_DICT[class_id]}"
            for _, confidence, class_id, tracker_id
            in detections
        ]
        for _, _, class_id, tracker_id in detections:
            if tracker_id is not None:
                vehicle_counts[class_id] += 1
                unique_vehicle_counts[class_id].add(tracker_id)

        # Add unique count labels to the frame
        unique_count_labels = [
            f"{CLASS_NAMES_DICT[class_id]} Unique Count: {len(unique_vehicle_counts[class_id])}"
            for class_id in unique_vehicle_counts.keys()
        ]
        label_y = 60
        for label in unique_count_labels:
            cv2.putText(frame1, label, (10, label_y), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
            label_y += 30

        # Draw labels on the frame
        for label in labels:
            #cv2.putText(frame, label, (10, label_y), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
            label_y += 30

        detected_frame= box_annotator.annotate(frame=frame1, detections=detections, labels=labels)
        # Loop through each detection in the frame
        # ... (your existing code)
        # Loop through each detection in the frame
        for detection_id, detection in enumerate(detections1.data):
          class_id = int(detection[5])
          xyxy = detection[:4]
          x1, y1, x2, y2 = map(int, xyxy)
          class_name = CLASS_NAMES_DICT.get(class_id, "Unknown")
          if class_name in ['person','motorcycle','car', 'truck', 'bus', 'van', 'bicycle']:
            # Calculate midpoint
            midpoint_x = (x1 + x2) // 2
            midpoint_y = (y1 + y2) // 2

            if detection_id in previous_midpoints:
              prev_midpoint_x, _ = previous_midpoints[detection_id]
              x_change = midpoint_x - prev_midpoint_x
              speed = estimatespeed((midpoint_x, midpoint_y), previous_midpoints[detection_id])
              #csv_writer.writerow([CLASS_NAMES_DICT[class_id], tracker_id,speed])
              if abs(x_change) > MIDPOINT_THRESHOLD:
                direction = "right" if x_change > 0 else "left"
                speed = estimatespeed((midpoint_x, midpoint_y), previous_midpoints[detection_id])
                #print((midpoint_x, midpoint_y),(midpoint_x, midpoint_y)-5)

                  #cv2.putText(frame1, f"Speed: {speed} mph", (x1+10, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)
                cv2.putText(frame1, f"Direction: {direction}", (x1+10, y1 - 60), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)
                csv_writer.writerow([CLASS_NAMES_DICT[class_id], tracker_id, direction,speed])
                #if speed <= 100:  # Check if speed is not above 100 mph
                   #cv2.putText(frame1, f"Speed: {speed} mph", (x1, y1 - 40), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)


        previous_midpoints[detection_id] = (midpoint_x, midpoint_y)



        sink.write_frame(detected_frame)
csv_file.close()
print("Annotated video saved at:", TARGET_VIDEO_PATH)

  0%|          | 0/784 [00:00<?, ?it/s]


0: 480x640 1 person, 15 cars, 2 trucks, 13 traffic lights, 74.3ms
Speed: 2.5ms preprocess, 74.3ms inference, 2.9ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 person, 15 cars, 2 trucks, 13 traffic lights, 73.5ms
Speed: 6.2ms preprocess, 73.5ms inference, 2.0ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 14 cars, 2 trucks, 13 traffic lights, 72.7ms
Speed: 3.1ms preprocess, 72.7ms inference, 1.7ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 14 cars, 2 trucks, 13 traffic lights, 46.1ms
Speed: 2.6ms preprocess, 46.1ms inference, 1.6ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 person, 14 cars, 2 trucks, 12 traffic lights, 45.5ms
Speed: 2.2ms preprocess, 45.5ms inference, 1.5ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 person, 14 cars, 2 trucks, 12 traffic lights, 46.5ms
Speed: 2.2ms preprocess, 46.5ms inference, 1.4ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 person, 14 cars, 2 tru

Annotated video saved at: /content/output.mp4
