In [None]:
# Print the NVIDIA driver / GPU status for this session
!nvidia-smi

In [None]:
# Install OpenCV (imported as `cv2` in the pipeline section)
# NOTE(review): the version is not pinned - consider pinning it for reproducible runs
!pip install opencv-python

# Установка YOLOv8

In [None]:
# Pip install method (recommended)
# NOTE(review): the version is not pinned, so a re-run may pull a different ultralytics release

!pip install ultralytics

# Clear the pip log from the cell output
from IPython import display
display.clear_output()

# Import the freshly installed package and call its `checks()` helper
import ultralytics
ultralytics.checks()

# Установка ByteTrack

In [None]:
import os
# Working directory at the time this cell runs; later cells use it as the base
# path for the ByteTrack checkout and for predict.csv
HOME = os.getcwd()
print(HOME)

In [None]:
# Clone ByteTrack into HOME and continue inside the checkout
# NOTE(review): on a re-run the clone target already exists and the same path is
# appended to sys.path again - presumably harmless, but confirm before relying on it
%cd {HOME}
!git clone https://github.com/ifzhang/ByteTrack.git
%cd {HOME}/ByteTrack

# workaround related to https://github.com/roboflow/notebooks/issues/80
!sed -i 's/onnx==1.8.1/onnx==1.9.0/g' requirements.txt

# Install ByteTrack's requirements, ByteTrack itself (develop mode) and the extra packages used below
!pip3 install -q -r requirements.txt
!python3 setup.py -q develop
!pip install -q cython_bbox
!pip install -q onemetric
# workaround related to https://github.com/roboflow/notebooks/issues/112 and https://github.com/roboflow/notebooks/issues/106
!pip install -q loguru lap thop

# Clear the installation log from the cell output
from IPython import display
display.clear_output()


# Make the checkout importable so that `import yolox` resolves to it
import sys
sys.path.append(f"{HOME}/ByteTrack")


import yolox
print("yolox.__version__:", yolox.__version__)

In [None]:
import re

# Path of the ByteTrack file that still uses the removed `np.float` alias
file_path = f"{HOME}/ByteTrack/yolox/tracker/byte_tracker.py"

# Deprecated dtype spelling and its replacement
old_attribute = "dtype=np.float"
new_attribute = "dtype=np.float64"

# Match the old spelling only when `float` is the complete name. A plain
# str.replace would also hit `dtype=np.float64` / `dtype=np.float32`, so running
# this cell twice would turn the already patched text into `dtype=np.float6464`.
deprecated_dtype = re.compile(re.escape(old_attribute) + r"\b")

# Read the file, substitute the deprecated attribute, and write the result back
with open(file_path, 'r') as file:
    file_content = file.read()

file_content = deprecated_dtype.sub(new_attribute, file_content)

with open(file_path, 'w') as file:
    file.write(file_content)

print(f"Attribute '{old_attribute}' was replaced with '{new_attribute}' in {file_path}")

In [None]:
import re

# Path of the ByteTrack file that still uses the removed `np.float` alias
file_path = f"{HOME}/ByteTrack/yolox/tracker/matching.py"

# Deprecated dtype spelling and its replacement
old_attribute = "dtype=np.float"
new_attribute = "dtype=np.float64"

# Match the old spelling only when `float` is the complete name. A plain
# str.replace would also hit `dtype=np.float64` / `dtype=np.float32`, so running
# this cell twice would turn the already patched text into `dtype=np.float6464`.
deprecated_dtype = re.compile(re.escape(old_attribute) + r"\b")

# Read the file, substitute the deprecated attribute, and write the result back
with open(file_path, 'r') as file:
    file_content = file.read()

file_content = deprecated_dtype.sub(new_attribute, file_content)

with open(file_path, 'w') as file:
    file.write(file_content)

print(f"Attribute '{old_attribute}' was replaced with '{new_attribute}' in {file_path}")

In [None]:
from yolox.tracker.byte_tracker import BYTETracker, STrack
from onemetric.cv.utils.iou import box_iou_batch
from dataclasses import dataclass


@dataclass(frozen=True)
class BYTETrackerArgs:
    """Immutable bundle of settings passed to ``BYTETracker`` in the pipeline cell.

    NOTE(review): the field names are presumably the attribute names that
    ByteTrack reads from its ``args`` object, and the meaning of each threshold
    is defined by ByteTrack rather than by this notebook - confirm against
    yolox/tracker/byte_tracker.py before renaming or retuning anything.
    """
    track_thresh: float = 0.25
    track_buffer: int = 30
    match_thresh: float = 0.8
    aspect_ratio_thresh: float = 3.0
    min_box_area: float = 1.0
    mot20: bool = False

# Установка Roboflow Supervision

In [None]:
# Install supervision, pinned to release 0.16.0
!pip install supervision==0.16.0


# Clear the pip log from the cell output
from IPython import display
display.clear_output()

# Import the package and print the version that was actually loaded
import supervision
print("supervision.__version__:", supervision.__version__)

In [None]:
from supervision.draw.color import ColorPalette, DEFAULT_COLOR_PALETTE
from supervision.geometry.core import Point, Position
from supervision.utils.image import crop_image
from supervision.utils.video import VideoInfo, VideoSink, get_video_frames_generator
from supervision.utils.notebook import plot_image
from supervision.detection.annotate import Detections, BoxAnnotator
from supervision.detection.tools.polygon_zone import PolygonZone, PolygonZoneAnnotator

# Утилиты отслеживания

In [None]:
from typing import List
import numpy as np


def detections2boxes(detections: Detections) -> np.ndarray:
    """Pack detections into the array layout handed to the tracker.

    Returns the ``xyxy`` box columns with the confidence appended as one extra
    trailing column, one row per detection.
    """
    boxes = detections.xyxy
    # turn the 1-D confidence vector into a single column so it can sit next to the boxes
    confidence_column = detections.confidence[:, None]
    return np.hstack([boxes, confidence_column])


def tracks2boxes(tracks: List[STrack]) -> np.ndarray:
    """Collect the ``tlbr`` box of every track into one float array.

    The result is the input expected by ``match_detections_with_tracks``; rows
    follow the order of ``tracks``.
    """
    boxes = []
    for track in tracks:
        boxes.append(track.tlbr)
    return np.array(boxes, dtype=float)


# matches our bounding boxes with predictions
def match_detections_with_tracks(
    detections: Detections,
    tracks: List[STrack]
) -> list:
    """Assign a tracker id to each detection by best IoU overlap.

    Every track is paired with the detection it overlaps most; a detection that
    no track selects keeps ``None``.

    Args:
        detections: detections of the current frame (``xyxy`` and ``len()`` are used).
        tracks: tracks reported by the tracker for the same frame.

    Returns:
        A list with exactly ``len(detections)`` entries, each a track id or
        ``None``. The length always matches the detections, also when there are
        no tracks, so the result can be used to build a per-detection mask.
    """
    tracker_ids = [None] * len(detections)

    # Nothing to match: every detection stays unassigned
    if len(detections) == 0 or len(tracks) == 0:
        return tracker_ids

    tracks_boxes = tracks2boxes(tracks=tracks)
    iou = box_iou_batch(tracks_boxes, detections.xyxy)
    # index of the best-overlapping detection for each track
    track2detection = np.argmax(iou, axis=1)

    for tracker_index, detection_index in enumerate(track2detection):
        # `> 0` instead of `!= 0` so that a NaN overlap is not treated as a match
        if iou[tracker_index, detection_index] > 0:
            tracker_ids[detection_index] = tracks[tracker_index].track_id

    return tracker_ids

# Загрузка модели YOLOv8

In [None]:
# Path of the weights file that the next cell passes to YOLO(); it points into a
# Kaggle input dataset, so adjust it when running elsewhere
MODEL = "/kaggle/input/yolov8s-xixixi/yolov8s640.pt"

In [None]:
from ultralytics import YOLO

# Build the detector from the weights at MODEL and call fuse() on it
model = YOLO(MODEL)
model.fuse()

In [None]:
# Display the model's class_id -> class_name mapping
model.names

In [None]:
# dict mapping class_id to class_name
CLASS_NAMES_DICT = model.names
# every class id listed in CLASS_ID_BY_NAME below
CLASS_ID = [0, 1, 2, 3, 4, 5]

# class ids kept by the pipeline: bus (0), car (1) and van (5) according to CLASS_ID_BY_NAME
NEEDED_CLASS_IDS = [0, 1, 5]

# hand-written class_name -> class_id table
# NOTE(review): presumably mirrors model.names - confirm against the output of the previous cell
CLASS_ID_BY_NAME = {
    'bus': 0,
    'car': 1,
    'other': 2,
    'tramvay': 3,
    'trolleybus': 4,
    'van': 5
}

# Формирование объектов размеченных видео

In [None]:
import os
import json


# Default frame rate. The pipeline loop rebinds this global to each video's fps,
# and ExtendedPolygonZone.calc_speed_kmh reads it when converting frames to time.
FPS = 30


class AnnotatedVideo:
    """A video file paired with its JSON markup and the video's metadata.

    Attributes:
        video_path: path of the video file.
        markup_path: path of the JSON markup file.
        annotation_data: parsed content of the markup file.
        video_info: result of ``VideoInfo.from_video_path(video_path)``.
    """

    # Extension removed by get_video_name()
    _VIDEO_SUFFIX = '.mp4'

    def __init__(self, video_path, markup_path):
        self.video_path = video_path
        self.markup_path = markup_path
        self.annotation_data = self.load_annotation_data()
        self.video_info = VideoInfo.from_video_path(video_path)

    def load_annotation_data(self):
        """Read and return the parsed JSON stored at ``markup_path``."""
        with open(self.markup_path, 'r') as json_file:
            annotation_data = json.load(json_file)
        return annotation_data

    def get_video_name(self):
        """Return the file name of the video without its ``.mp4`` extension.

        The suffix is removed as a whole. ``str.rstrip('.mp4')`` would instead
        strip any trailing run of the characters '.', 'm', 'p' and '4', which
        mangles names such as ``cam.mp4`` (-> ``ca``).
        """
        file_name = os.path.basename(self.video_path)
        if file_name.endswith(self._VIDEO_SUFFIX):
            return file_name[:-len(self._VIDEO_SUFFIX)]
        return file_name

    def get_video_wh(self):
        """Return ``video_info.resolution_wh``."""
        return self.video_info.resolution_wh

    def __str__(self):
        return f"Video Path: {self.video_path}\nMarkup Path: {self.markup_path}\nAnnotation Data: {self.annotation_data}"

In [None]:
# Root folder that contains the video folders and the markup folder
root_folder = '/kaggle/input/h1y1y1y1y1'

NEEDED_VIDEO_FOLDERS = ['Video4']

# Sub-folders of the root whose name contains 'Video'.
# Sorted because os.listdir returns entries in arbitrary order, which would make
# the processing order (and the row order of the output CSV) vary between runs.
video_folders = sorted(
    f for f in os.listdir(root_folder)
    if os.path.isdir(os.path.join(root_folder, f)) and 'Video' in f
)

# The markup folder is shared by all video folders, so it is listed only once
markup_folder = os.path.join(root_folder, 'Test_markup')
markup_files = [f for f in os.listdir(markup_folder) if f.endswith('.json')]

# Build the list of videos that have a matching markup file
annotated_videos = []
for video_folder in video_folders:
    if video_folder not in NEEDED_VIDEO_FOLDERS:
        continue
    # videos are stored in a nested folder of the same name: <root>/<VideoN>/<VideoN>
    videos_folder = os.path.join(root_folder, video_folder, video_folder)
    video_files = sorted(f for f in os.listdir(videos_folder) if f.endswith('.mp4'))

    for video_file in video_files:
        video_path = os.path.join(videos_folder, video_file)
        # swap only the extension; str.replace would rewrite every '.mp4' inside the name
        markup_file = os.path.splitext(video_file)[0] + '.json'
        if markup_file in markup_files:
            markup_path = os.path.join(markup_folder, markup_file)
            annotated_video = AnnotatedVideo(video_path, markup_path)
            annotated_videos.append(annotated_video)

# Polygons

In [None]:
# Distance in meters that ExtendedPolygonZone.calc_speed_kmh divides by the travel time
# NOTE(review): presumably the real-world distance between the start and end zones - confirm
LENGTH_AREA = 20 #meters

In [None]:
from typing import Tuple


class ExtendedPolygonZone(PolygonZone):
    """Polygon zone that also collects per-class counts and speeds.

    Besides the base ``PolygonZone`` area the object owns two further zones.
    A tracked detection seen in ``start_zone`` is remembered together with the
    frame index; when the same tracker id is later seen in ``end_zone`` the
    elapsed frames are converted to a speed and added to ``class_statistics``.

    Reads the module-level names ``LENGTH_AREA``, ``FPS`` and ``CLASS_NAMES_DICT``.
    """

    def __init__(
        self,
        area: np.ndarray,
        frame_resolution_wh: Tuple[int, int],
        start_zone: np.ndarray,
        end_zone: np.ndarray,
        triggering_position: Position = Position.CENTER,
    ):
        self.start_zone = PolygonZone(polygon=start_zone, frame_resolution_wh=frame_resolution_wh, triggering_position=triggering_position)
        self.end_zone = PolygonZone(polygon=end_zone, frame_resolution_wh=frame_resolution_wh, triggering_position=triggering_position)
        # tracker_id -> {'class_id': str, 'current_frame_id': int} for objects seen in the start zone
        self.start_zone_detections = {}
        # class name -> number of measured objects and the sum of their speeds
        self.class_statistics = {'car': {"count": 0, "total_speed": 0.0}, 'van': {"count": 0, "total_speed": 0.0}, 'bus': {"count": 0, "total_speed": 0.0}}
        super().__init__(area, frame_resolution_wh, triggering_position)

    def update_statistics(self, mclass: str, speed: float):
        """
        Add one measured object to the statistics of a class.

        Classes that are not keys of ``class_statistics`` are ignored.

        Parameters:
            mclass (str): The class name (car, van, bus)
            speed (float): The speed of the detected object
        """
        if mclass in self.class_statistics:
            self.class_statistics[mclass]["count"] += 1
            self.class_statistics[mclass]["total_speed"] += speed

    def check_start_zone(self, detections: Detections, current_frame_id: int):
        """
        Remember every detection that is currently inside the start zone.

        An object that stays in the zone is overwritten on each call, so the
        stored frame is the most recent one in which it was seen there.

        Parameters:
            detections (Detections): Tracked detections of the current frame
            current_frame_id (int): Index of the current frame
        """
        mask = self.start_zone.trigger(detections=detections)
        filtered_detections = detections[mask]
        for i in range(len(filtered_detections)):
            detection = filtered_detections[i]
            tracker_id = detection.tracker_id[0]
            self.start_zone_detections[tracker_id] = {
                'class_id': str(detection.class_id[0]),
                'current_frame_id': current_frame_id
            }

    def calc_speed_kmh(self, start_frame_id: int, end_frame_id: int):
        """
        Convert the frames needed to travel LENGTH_AREA meters into km/h.

        Parameters:
            start_frame_id (int): Frame in which the object was in the start zone
            end_frame_id (int): Frame in which the object reached the end zone

        Returns:
            float: The speed in km/h (m/s multiplied by 3.6)

        Raises:
            ZeroDivisionError: If both frame ids are equal
        """
        return (LENGTH_AREA / ((end_frame_id - start_frame_id) / FPS)) * 3.6

    def check_end_zone(self, detections: Detections, current_frame_id: int):
        """
        Record a speed for every detection in the end zone that was seen in the start zone.

        A matched tracker id is removed from ``start_zone_detections`` so it is
        counted once. Detections without a start record are ignored.

        Parameters:
            detections (Detections): Tracked detections of the current frame
            current_frame_id (int): Index of the current frame
        """
        mask = self.end_zone.trigger(detections=detections)
        filtered_detections = detections[mask]
        for i in range(len(filtered_detections)):
            detection = filtered_detections[i]
            class_name = CLASS_NAMES_DICT.get(int(detection.class_id[0]), 'unknown_class')
            tracker_id = detection.tracker_id[0]

            start_detection = self.start_zone_detections.get(tracker_id)
            if start_detection is None:
                continue

            start_frame_id = start_detection['current_frame_id']
            if current_frame_id <= start_frame_id:
                # The object is in both zones within the same frame, so there is no
                # travel time to measure (calc_speed_kmh would divide by zero).
                # The start record is kept so a later frame can still complete it.
                continue

            self.update_statistics(mclass=class_name,
                                   speed=self.calc_speed_kmh(start_frame_id, current_frame_id))
            del self.start_zone_detections[tracker_id]

    def get_average_speed(self, mclass: str) -> float:
        """
        Get the average speed for the specified class.

        Parameters:
            mclass (str): The class name (car, van, bus)

        Returns:
            float: The average speed for the specified class, or 0.0 when the
            class is unknown or nothing has been measured for it yet
        """
        if mclass in self.class_statistics and self.class_statistics[mclass]["count"] > 0:
            return self.class_statistics[mclass]["total_speed"] / self.class_statistics[mclass]["count"]
        else:
            return 0.0


In [None]:
import numpy as np
from typing import Tuple

class PolygonSink:
    """Builds one ExtendedPolygonZone per marked-up area and aggregates their statistics.

    The markup dict is read through its ``areas``, ``zones`` and ``directions``
    keys. ``zones[2*i]`` and ``zones[2*i + 1]`` are used as the start and end
    zone of ``areas[i]``.
    """

    def __init__(self, markup, frame_resolution_wh):
        self.frame_resolution_wh = frame_resolution_wh
        self.polygons = self.create_polygons(markup)
        self.count_polygon = len(markup['directions'])

    def scale_polygon(self, frame_resolution_wh, polygon_relative):
        """Scale a polygon by the frame size and return it as an integer array.

        The first column of ``polygon_relative`` is multiplied by the width and
        the second column by the height; the result is truncated to integers.
        """
        w, h = frame_resolution_wh
        scaled_polygon = np.array(polygon_relative, dtype=float)
        scaled_polygon[:, 0] *= w  # x coordinates
        scaled_polygon[:, 1] *= h  # y coordinates
        return scaled_polygon.astype(int)

    def create_polygons(self, markup):
        """Create an ExtendedPolygonZone for every entry of ``markup['areas']``.

        Raises:
            ValueError: If ``markup['zones']`` does not hold a start and an end
                zone for every area.
        """
        areas = markup["areas"]
        zones = markup["zones"]

        if len(zones) < 2 * len(areas):
            raise ValueError(
                f"markup has {len(areas)} areas but only {len(zones)} zones; "
                "two zones (start, end) are required per area"
            )

        polygons = []
        for i, area in enumerate(areas):
            extended_polygon_zone = ExtendedPolygonZone(
                area=self.scale_polygon(self.frame_resolution_wh, area),
                frame_resolution_wh=self.frame_resolution_wh,
                triggering_position=Position.CENTER,
                start_zone=self.scale_polygon(self.frame_resolution_wh, zones[i*2]),
                end_zone=self.scale_polygon(self.frame_resolution_wh, zones[i*2+1])
            )
            polygons.append(extended_polygon_zone)

        return polygons

    def get_statistics(self):
        """Sum the per-polygon statistics into one count and average speed per class.

        The average is the total speed of all measured objects divided by their
        total number, so a polygon without measurements does not pull the value
        down and the result does not depend on how many polygons there are.

        Returns:
            dict: ``{'car'|'van'|'bus': {'count': int, 'avg_speed': float}}``;
            ``avg_speed`` is 0.0 for a class without measurements.
        """
        class_statistics = {}
        for key in ('car', 'van', 'bus'):
            count = sum(polygon.class_statistics[key]['count'] for polygon in self.polygons)
            total_speed = sum(polygon.class_statistics[key]['total_speed'] for polygon in self.polygons)
            class_statistics[key] = {
                "count": count,
                "avg_speed": total_speed / count if count > 0 else 0.0,
            }
        return class_statistics
        

# CSV Logger

In [None]:
import csv

class CSVLogger:
    """Writes per-video vehicle counts and average speeds to a CSV file.

    Creating the logger truncates the file and writes the header row; every
    ``log`` call appends one row. With ``is_submission=False`` each class group
    is additionally prefixed with the class name.
    """

    # Correction factors applied to every logged value: counts are multiplied,
    # speeds are divided. NOTE(review): the notebook does not document where the
    # value 1.5 comes from - confirm before changing it.
    COUNT_MULTIPLIER = 1.5
    SPEED_DIVISOR = 1.5

    # Order of the class groups inside a row
    CLASS_NAMES = ('car', 'van', 'bus')

    def __init__(self, path_to_csv: str, is_submission: bool = True):
        self.path_to_csv = path_to_csv
        self.is_submission = is_submission
        if not is_submission:
            self.create_header()
        else:
            self.create_header_submission()

    def _write_row(self, row, mode: str) -> None:
        """Write one CSV row, opening the file with the given mode.

        The file is always opened with ``newline=''`` as the csv module
        requires; otherwise appended rows get an extra blank line on platforms
        that translate line endings.
        """
        with open(self.path_to_csv, mode, newline='') as f:
            csv.writer(f).writerow(row)

    def create_header(self):
        """(Re)create the file with the header that includes the class-name columns."""
        header = ['file_name', 'car', 'quantity_car', 'average_speed_car',
                  'van', 'quantity_van', 'average_speed_van',
                  'bus', 'quantity_bus', 'average_speed_bus']
        self._write_row(header, mode='w')

    def create_header_submission(self):
        """(Re)create the file with the submission header (no class-name columns)."""
        header = ['file_name', 'quantity_car', 'average_speed_car',
                  'quantity_van', 'average_speed_van',
                  'quantity_bus', 'average_speed_bus']
        self._write_row(header, mode='w')

    def log(self, name_video: str, quantity_car: int, average_speed_car: float,
            quantity_van: int,  average_speed_van: float,
            quantity_bus: int, average_speed_bus: float) -> None:
        """Append the statistics of one video as a single row.

        Non-submission example:
        KRA-2-7-2023-08-22-evening,car,430,32.64,van,15,25.30,bus,22,26.75
        """
        measurements = (
            (quantity_car, average_speed_car),
            (quantity_van, average_speed_van),
            (quantity_bus, average_speed_bus),
        )
        row = [name_video]
        for class_name, (quantity, average_speed) in zip(self.CLASS_NAMES, measurements):
            if not self.is_submission:
                row.append(class_name)
            row.append(int(quantity * self.COUNT_MULTIPLIER))
            row.append(average_speed / self.SPEED_DIVISOR)
        self._write_row(row, mode='a')

# Run Pipeline

In [None]:
from tqdm.notebook import tqdm
from collections import Counter
import json
import cv2

In [None]:
# CSV file that receives one row of statistics per processed video
logger = CSVLogger(path_to_csv=f"{HOME}/predict.csv")
# Only every SKIP_FRAMES-th frame is passed to the detector and the tracker
SKIP_FRAMES = 2

for video in annotated_videos:
    generator = get_video_frames_generator(video.video_path)
    # ExtendedPolygonZone.calc_speed_kmh reads the module-level FPS, so it has to
    # be rebound to the frame rate of the video that is being processed
    FPS = video.video_info.fps

    # create polygons and a fresh tracker for this video
    polygon_sink = PolygonSink(markup=video.annotation_data, frame_resolution_wh=video.get_video_wh())
    byte_tracker = BYTETracker(BYTETrackerArgs(), frame_rate=FPS/SKIP_FRAMES)

    # id_current_frame counts every frame of the video, including the skipped ones,
    # so frame differences can be converted to seconds with the video's FPS
    for id_current_frame, frame in enumerate(tqdm(generator, total=video.video_info.total_frames)):
        if id_current_frame % SKIP_FRAMES != 0:
            continue

        # model prediction on single frame and conversion to supervision Detections
        results = model(frame, verbose=False)
        detections = Detections(
            xyxy=results[0].boxes.xyxy.cpu().numpy(),
            confidence=results[0].boxes.conf.cpu().numpy(),
            class_id=results[0].boxes.cls.cpu().numpy().astype(int)
        )
        # keep only detections of the needed classes
        mask = np.array([class_id in NEEDED_CLASS_IDS for class_id in detections.class_id], dtype=bool)
        detections = detections[mask]

        # tracking detections
        tracks = byte_tracker.update(
            output_results=detections2boxes(detections=detections),
            img_info=frame.shape,
            img_size=frame.shape
        )
        tracker_ids = match_detections_with_tracks(detections=detections, tracks=tracks)
        # The matcher may hand back an empty result when there is nothing to match.
        # Without one entry per detection the boolean mask below would not fit the
        # detections and indexing would fail, so treat that case as "no ids assigned".
        if len(tracker_ids) != len(detections):
            tracker_ids = [None] * len(detections)
        detections.tracker_id = np.array(tracker_ids)

        # drop detections that were not matched to a track
        has_tracker = np.array([tracker_id is not None for tracker_id in tracker_ids], dtype=bool)
        detections = detections[has_tracker]

        # update the start/end-zone bookkeeping of every polygon with the
        # tracked detections that fall inside it
        for polygon in polygon_sink.polygons:
            mask = polygon.trigger(detections=detections)
            filtered_detections = detections[mask]
            polygon.check_start_zone(filtered_detections, id_current_frame)
            polygon.check_end_zone(filtered_detections, id_current_frame)

    statistics = polygon_sink.get_statistics()
    print(video.get_video_name())
    print(statistics)
    logger.log(name_video=video.get_video_name(), quantity_car=statistics['car']['count'],
               average_speed_car=statistics['car']['avg_speed'],
               quantity_van=statistics['van']['count'],
               average_speed_van=statistics['van']['avg_speed'],
               quantity_bus=statistics['bus']['count'],
               average_speed_bus=statistics['bus']['avg_speed'])


In [None]:
# `video` is the loop variable left over from the pipeline cell, i.e. the last
# processed video; this cell fails if that loop never ran
video.get_video_name()

In [None]:
# Display the class_id -> class_name mapping taken from model.names
CLASS_NAMES_DICT 