In [1]:
import math
import os
from ultralytics import YOLO
import cv2
import numpy as np
from collections import defaultdict
import json
import matplotlib.path as mplPath

In [2]:
# Шаг кадра
FRAME_STEP = 3

# Путь до модели YOLO
MODEL_PATH = '/home/jupyter/datasphere/project/CERT/best.pt'

# Путь для записи файла результатов
RESULTS_PATH = '/home/jupyter/datasphere/project/CERT/results.txt'

# Путь к файлу задач
WORKS_FILE_PATH = 'works.txt'

In [3]:
def convert_to_real_coordinates(coordinates, resolution):
    new_coordinates = []

    for coord in coordinates:
        x = int(coord[0] * resolution[0])
        y = int(coord[1] * resolution[1])
        new_coordinates.append([x, y])

    return new_coordinates

In [4]:
file = open(WORKS_FILE_PATH, 'r')
array_of_video_names = file.read().splitlines()
file.close()

In [5]:
print(array_of_video_names)

['000', '001', '002', '003', '004', '005', '006', '007', '008', '009', '010', '011', '012', '013']


In [None]:
for video_name in array_of_video_names:
    # Чтение данных из JSON-файла
    print(os.path.join('/home/jupyter/datasphere/project/CERT/markup', 'jsons', video_name + '.json'))
    json_file = open(os.path.join('/home/jupyter/datasphere/project/CERT/markup', 'jsons', video_name + '.json'))
    json_data = json.load(json_file)
    areas = json_data['areas']

    
    # Инициализация модели YOLO
    model = YOLO(MODEL_PATH)
    model.to('cuda')

    # Загрузка видео
    cap = cv2.VideoCapture(os.path.join('/home/jupyter/datasphere/project/CERT/video', video_name + '.mp4'))
    fps = cap.get(cv2.CAP_PROP_FPS)

    # Инициализация структур для хранения истории движения объектов
    track_history = defaultdict(lambda: [])

    storage = []
    class_names = ['tractor', 'bike', 'bus', 'car', 'motorcycle', 'trolleybus', 'truck']

    frame_number = 1

    frame_size = ()

    # Цикл обработки каждого кадра видео
    while cap.isOpened():
        success, frame = cap.read()

        if frame_number == 1:
            frame_size = (frame.shape[1], frame.shape[0])



        frame_number += 1
        if frame_number % FRAME_STEP != 0:
            continue

        if success:
            # Применение модели YOLO для отслеживания объектов на кадре
            results = model.track(frame, persist=True, verbose=False)

            if results[0].boxes.id is not None:

                boxes = results[0].boxes.xywh.cpu()
                track_ids = results[0].boxes.id.int().cpu().tolist()
                classes = results[0].boxes.cls.int().cpu().tolist()

                # Перебор распознанных объектов и их треков
                for box, track_id, class_id in zip(boxes, track_ids, classes):
                    x, y, w, h = box

                    track = track_history[track_id]
                    point = (float(x), float(y) + float(h/2))
                    track.append(point)

                    storage.append((track_id, class_names[class_id], point))

                if cv2.waitKey(1) & 0xFF == ord("q"):
                    break

        else:
            break

    ids = list(set(item[0] for item in storage))

    cars, vans, buses = [], [], []

    for id in ids:
        desired_objects = [obj for obj in storage if obj[0] == id]

        class_count = {}  # создаем пустой словарь для подсчета частоты каждого класса

        for item in desired_objects:
            class_name = item[1]  # получаем класс из элемента
            if class_name in class_count:
                class_count[class_name] += 1  # увеличиваем счетчик, если класс уже есть в словаре
            else:
                class_count[class_name] = 1  # инициализируем счетчик для нового класса

        most_common_class = max(class_count, key=class_count.get)  # выбираем класс с наибольшей частотой

        class_name = most_common_class
        points_array = [item[2] for item in desired_objects]

        max_frames = 0

        for area in areas:
            real_coordinates_areas = convert_to_real_coordinates(area, frame_size)
            pts = np.array(real_coordinates_areas, np.int32)

            bbPath = mplPath.Path(pts)

            start_point, end_point = None, None
            track_frames_count = 0

            for point in points_array:
                if bbPath.contains_point(point):
                    track_frames_count += 1
                    if start_point is None:
                        start_point = point
                else:
                    if start_point is not None:
                        end_point = point

            if track_frames_count > max_frames:
                max_frames = track_frames_count

        speed = 0

        if max_frames > 0:
            speed = 20 / ((max_frames * FRAME_STEP) / fps) * 3.6

            if class_name == 'car':
                cars.append(speed)

            if class_name == 'bus':
                buses.append(speed)

            if class_name == 'truck' or class_name == 'tractor':
                vans.append(speed)

    result = (video_name + ';' + str(len(cars)) + ';' + str(round(np.mean(cars), 2)) + ';' + str(len(vans)) + ';' + str(round(np.mean(vans), 2)) + ';' + str(len(buses)) + ';' + str(round(np.mean(buses), 2)))

    print(result)

    f = open(RESULTS_PATH, 'a+')  # open file in append mode
    f.write(result + '\n')
    f.close()

    with open(WORKS_FILE_PATH, 'r') as file:
        lines = file.readlines()
        # Пропустить первую строку
        lines = lines[1:]

    with open(WORKS_FILE_PATH, 'w') as file:
        file.writelines(lines)

    cap.release()
    cv2.destroyAllWindows()

/home/jupyter/datasphere/project/CERT/markup/jsons/000.json
000;320;95.31;12;39.09;8;117.82
/home/jupyter/datasphere/project/CERT/markup/jsons/001.json
001;362;62.72;48;141.31;18;136.46
/home/jupyter/datasphere/project/CERT/markup/jsons/002.json
002;262;56.17;21;114.58;8;32.55
/home/jupyter/datasphere/project/CERT/markup/jsons/003.json
