In [1]:
import cv2
import numpy as np
import pandas as pd
from ultralytics import YOLO
import torch
import json
from typing import List
from ultralytics.engine.results import Results
import os

In [2]:
def draw_lines(annotated_frame, data: dict) -> list[tuple]:
    areas = data["areas"][0]
    new_areas: list = []
    for point in areas:
        x, y = int(point[0] * annotated_frame.shape[1]), int(point[1] * annotated_frame.shape[0])
        new_areas.append((x, y))
    new_areas.sort(key=lambda x: x[1])
    for i in range(0, len(new_areas), 2):
        cv2.line(annotated_frame, new_areas[i], new_areas[i+1], (0, 255, 0), 2)
    return new_areas


def is_intersecting(line_start, line_end, point):
    x1, y1 = line_start
    x2, y2 = line_end
    x, y = point
    k = (y2-y1)/(x2-x1)
    b = y1 - k*x1
    if abs(int(y) - int(k*x + b)) < 10:
        return True
    return False


def is_inside(area, point_x, point_y):
    vertx = [points[0] for points in area]
    verty = [points[1] for points in area]
    c = False
    j = len(vertx)-1
    for i in range(len(vertx)):
        if ( ((verty[i]>point_y) != (verty[j]>point_y)) and
        (point_x < (vertx[j]-vertx[i]) * (point_x-verty[i]) / (verty[j]-verty[i]) + vertx[i]) ):
            c = not c
        j = i
    return c

In [9]:
FRAME_ANALYSIS_FREQ = 4
VIDEO_DIR = "vid"
JSON_DIR = "jsons"
USED_CLASSES = [2, 5, 7] # 2 - car, 5 - bus, 7 - truck

init_keys = ["file_name", "car", "quantity_car", "average_speed_car", "van", "quantity_van", "average_speed_van", "bus", "quantity_bus", "average_speed_bus"]
result_df = pd.DataFrame(dict.fromkeys(init_keys, []))
print(result_df)

vids = os.listdir(VIDEO_DIR)
print("Доступные видео: ", vids)
for vid_fname_idx in range(len(vids)):
    vid_fname = vids[vid_fname_idx]
    json_name = vid_fname.split('.mp4')[0]

    with open(f'{JSON_DIR}/{json_name}.json', 'r', encoding='utf-8') as f:
        data = json.load(f)

    model = YOLO('yolov8x.pt')

    vid_path = f"{VIDEO_DIR}/{vid_fname}"
    vid_capture = cv2.VideoCapture(vid_path)
    vid_fps = vid_capture.get(cv2.CAP_PROP_FPS)
    print(f"=== {vid_fname} - {vid_fps} FPS ===")

    appeared_ids = []
    count_dict = dict.fromkeys(USED_CLASSES, 0)

    ids_average_speed : dict = {}
    last_center_bbox : dict = {}

    frame_idx = 0

    while vid_capture.isOpened():
        success, frame = vid_capture.read()

        if success:
            if frame_idx % FRAME_ANALYSIS_FREQ == 0:
                # !!! Изменить verbose на True если нужна инфа от модели !!!
                track_results : List[Results] = model.track(frame, persist=True, classes=USED_CLASSES, verbose=False, conf=0.5) 
                annotated_frame = track_results[0].plot()
                
                try:
                    areas = draw_lines(annotated_frame=annotated_frame, data=data)

                    bb_on_frame_ids : np.ndarray = track_results[0].boxes.id.numpy()
                    bb_on_frame_cls : np.ndarray = track_results[0].boxes.cls.numpy()
                    bb_center : np.ndarray = track_results[0].boxes.xywh.numpy()

                    for idx in range(len(bb_on_frame_ids)):
                        if (bb_on_frame_ids[idx] not in appeared_ids) and is_inside(areas, bb_center[idx][0], bb_center[idx][1]):
                            appeared_ids.append(bb_on_frame_ids[idx])
                            count_dict[bb_on_frame_cls[idx]] += 1

                        # TODO: Фиксануть рассчет скорости (траблы с is_intersecting)
                        for j in range(0, len(areas), 2):
                            if is_intersecting(areas[j], areas[j+1], bb_center[idx][:2]) and is_inside(areas, bb_center[idx][0], bb_center[idx][1]):
                                ids_average_speed.setdefault(f"{str(bb_on_frame_ids[idx])}", []).append(frame_idx)
                    
                    # print(ids_average_speed)

                    with open(f"{JSON_DIR}/{json_name}_average_speed.json", "w", encoding='utf-8') as json_data:
                        json.dump(ids_average_speed, json_data)

                except: pass

                # print(f"Количество обьектов класса на {frame_idx} фрейм", count_dict)

                annotated_frame = cv2.resize(annotated_frame, (1280, 640))                
                cv2.imshow("YOLOv8 Tracking", annotated_frame)
                key = cv2.waitKey(2)
    
                if  key == 27:
                    break
            frame_idx += 1
        else:
            break
    
    print(f"Финальное количество объектов в видео {vid_fname}", count_dict)
    vid_fname_formatless = vid_fname.split(".")[0]
    # !!! Там где нолики места для средней скорости !!!
    data_inserted = [vid_fname_formatless, "car", count_dict[2], 0, "van", count_dict[7], 0, "bus", count_dict[5], 0] 

    result_df.loc[vid_fname_idx] = data_inserted

    vid_capture.release()
    cv2.destroyAllWindows()

print(result_df)

Empty DataFrame
Columns: [file_name, car, quantity_car, average_speed_car, van, quantity_van, average_speed_van, bus, quantity_bus, average_speed_bus]
Index: []
Доступные видео:  ['KRA-25-73-2023-08-24-morning.mp4']
=== KRA-25-73-2023-08-24-morning.mp4 - 30.0 FPS ===
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
Финальное количество объектов в видео KRA-25-73-2023-08-24-morning.mp4 {2: 7, 5: 2, 7: 1}
                      file_name  car  quantity_car  average_speed_car  van  \
0  KRA-25-73-2023-08-24-morning  car             7                  0  van   

   quantity_van  average_speed_van  bus  quantity_bus  average_speed_bus  
0             1                  0  bus             2                  0  


In [19]:
VIDEO_NAME = "KRA-25-73-2023-08-24-morning.mp4"
SAVE_DIR = "vid_frames"

np.random.seed(10)

vid_capture = cv2.VideoCapture(f'{VIDEO_DIR}/{VIDEO_NAME}')
total_frame_count = vid_capture.get(cv2.CAP_PROP_FRAME_COUNT)

frame_range = np.arange(total_frame_count)
chosen_frames = np.random.choice(frame_range, size = 50)

print(chosen_frames)

try:
    os.mkdir(f"{SAVE_DIR}/{VIDEO_NAME.split('.')[0]}")
except FileExistsError:
    pass

for c_f in chosen_frames:
    vid_capture.set(cv2.CAP_PROP_POS_FRAMES, c_f)
    success, frame = vid_capture.read()
    
    if success:
        cv2.imwrite(f'{SAVE_DIR}/{VIDEO_NAME.split(".")[0]}/frame_{int(c_f)}.jpg', frame)
    else: break


vid_capture.release()

[       5758         363       34581       12697       12392       31713        5958       23497       10419       26060       27146       28075       35145       27319       11123       32571       15296       32617       11694       16627       19094       30127        7113       33873       16400       12804
       34826       15251       21188        2281       11727        3570        4807       32985         751       28721       22281        3258       14522         951       34372       27108        7176        9402       12199        9485        3127         325        8621       29296]
