In [1]:
import pandas as pd
import cv2
import numpy as np
from ultralytics import YOLO
from ultralytics.engine.results import Results, Boxes
import matplotlib.pyplot as plt
from typing import List
import torch

torch.cuda.is_available()

True

In [8]:
model_path = './models/L-30ep_new_ds.pt'
classes = {0: 'divider-line', 1: 'dotted-line', 2: 'double-line', 3: 'random-line', 4: 'road-sign-line', 5: 'solid-line', 6: 'road-A'}
name_to_class = {v: k for k,v in classes.items()}
name_to_class

{'divider-line': 0,
 'dotted-line': 1,
 'double-line': 2,
 'random-line': 3,
 'road-sign-line': 4,
 'solid-line': 5,
 'road-A': 6}

In [9]:
model = YOLO(model_path, task='detect')
model

YOLO(
  (model): DetectionModel(
    (model): Sequential(
      (0): Conv(
        (conv): Conv2d(3, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
        (bn): BatchNorm2d(64, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
        (act): SiLU(inplace=True)
      )
      (1): Conv(
        (conv): Conv2d(64, 128, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
        (bn): BatchNorm2d(128, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
        (act): SiLU(inplace=True)
      )
      (2): C3k2(
        (cv1): Conv(
          (conv): Conv2d(128, 128, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn): BatchNorm2d(128, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
          (act): SiLU(inplace=True)
        )
        (cv2): Conv(
          (conv): Conv2d(256, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn): BatchNorm2d(256, eps=0.001, momentum=0.03, affine=True, track_

In [10]:
def put_text_on_image(img: np.ndarray, text: str, color: tuple, line: int=0) -> np.ndarray:
	# Определяем позицию текста (левый верхний угол)
	position = (10, 30+30*line)  # Координаты (x, y)

	# Определяем шрифт, размер, цвет и толщину текста
	font = cv2.FONT_HERSHEY_COMPLEX
	font_scale = 1
	thickness = 2

	# Добавляем текст на изображение
	return cv2.putText(img, text, position, font, font_scale, color, thickness, lineType=cv2.LINE_AA)

In [11]:
def public_trafic_driving(frame: Boxes) -> bool:
    """
    Проверка, если бокс с классом 'road-A' находится примерно по центру экрана.

    Args:
        frame (Boxes): размеченный кадр.
    """
    road_a_class = name_to_class['road-A']

    # Проверка наличия нужного класса
    if road_a_class not in frame.cls:
        return False

    # Проход по всем боксам
    for box in frame:
        if box.cls[0] != road_a_class:
            continue

        # Координаты центра бокса
        box_coords = box.xyxyn[0]
        box_center_x = (box_coords[2] + box_coords[0]) / 2
        box_center_y = (box_coords[3] + box_coords[1]) / 2

        # Проверка, если центр бокса примерно по центру экрана
        if 0.4 <= box_center_x <= 0.6 and 0.3 <= box_center_y <= 0.7:
            return True

    return False


In [12]:
def solid_line_crossing(frame: Boxes) -> bool:
    """
    Проверка переезда сплошной линии.

    Args:
        frame (Boxes): размеченный кадр.
    """
    solid_line_class = name_to_class['solid-line']
    double_solid_line_class = name_to_class['double-line']

    # Проверка наличия сплошных линий в кадре
    if not (solid_line_class in frame.cls or double_solid_line_class in frame.cls):
        return False

    # Проход по всем боксам
    for box in frame:
        if box.cls[0] not in (solid_line_class, double_solid_line_class):
            continue

        # Координаты бокса
        box_coords = box.xyxyn[0]
        box_center_x = (box_coords[2] + box_coords[0]) / 2
        box_center_y = (box_coords[3] + box_coords[1]) / 2

        # Пропускаем, если линия далеко от центра по вертикали
        if box_center_y >= 0.66:
            continue

        # Определение правой или левой линии по позиции x-координаты
        is_right_line = box_coords[0] >= 0.5
        is_left_line = box_coords[2] < 0.5

        # Проверка нарушения
        # Для правой линии проверяем, что она по центру или левее
        if is_right_line and box_center_x <= 0.5:
            return True
        # Для левой линии проверяем, что она по центру или правее
        elif is_left_line and box_center_x >= 0.5:
            return True

    return False


In [13]:
import cv2
import numpy as np

BOX_COLORS=np.random.uniform(0, 255, size=(len(model.names), 3))

def draw_bounding_boxes(result: Results, ):
    """
    Отрисовывает bounding boxes на изображении.

    :param result: Результат детекции модели YOLO, содержащий bounding boxes, классы и вероятности.
    :param image: Исходное изображение в формате NumPy.
    :return: Изображение с нарисованными bounding boxes в формате NumPy.
    """
    result = result.cpu().numpy()
    # Копируем изображение, чтобы не изменять оригинал
    image_with_boxes = result.orig_img

    # Предполагается, что result содержит списки boxes, scores и class_ids
    boxes = result.boxes.xyxy  # Список координат bounding boxes
    scores = result.boxes.conf  # Список вероятностей
    class_ids = result.boxes.cls.astype(int)  # Список идентификаторов классов

    for box, score, class_id in zip(boxes, scores, class_ids):
        # Получаем координаты bounding box
        x1, y1, x2, y2 = map(int, box)


        # Определяем цвет для текущего класса
        color = BOX_COLORS[class_id%len(BOX_COLORS)]

        # Рисуем прямоугольник
        cv2.rectangle(image_with_boxes, (x1, y1), (x2, y2), color, 2)

        # Подписываем bounding box
        class_id = classes[class_id]
        label = f'Class {class_id}: {score:.2f}'
        cv2.putText(image_with_boxes, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

    return image_with_boxes

In [18]:
import cv2
import os
from tqdm import tqdm
import time
import csv


def process_image(input_video_path: str, output_video_path: str, model: YOLO, every_frame: int=5, csv_file_path: str="violations.csv"):
	# Получаем список файлов изображений в указанной папке

	video = cv2.VideoCapture(input_video_path)
	width, height = int(video.get(cv2.CAP_PROP_FRAME_WIDTH)), int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
	fps = video.get(cv2.CAP_PROP_FPS)
	video_name = os.path.splitext(os.path.basename(input_video_path))[0]

	# Создаем объект VideoWriter для записи видео
	fourcc = cv2.VideoWriter_fourcc(*'mp4v')
	out_fps = fps # ФПС в выходном файле
	output_video = cv2.VideoWriter(output_video_path, fourcc, out_fps, (width, height))
	color = {False: (0, 0, 255), True: (0, 255, 0)}

	# Для хранения времени последнего обнаруженного нарушения
	last_violation_time = {"solid_line": -5, "public_trafic": -5}

	# Проверяем наличие CSV файла, если нет, создаем и записываем заголовок
	if not os.path.exists(csv_file_path):
		with open(csv_file_path, mode='w', newline='') as file:
			writer = csv.writer(file)
			writer.writerow(["video_name", "violation", "time"])


	frame_cnt = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
	print(frame_cnt, fps)
	print('Старт')
	# Проходим по всем изображениям и добавляем их в видео
	for i_frame in tqdm(range(frame_cnt)):
		ret, frame = video.read()
		if i_frame%every_frame!=0:
			continue
		if not ret: break
		result = model(frame, verbose=False, conf=0.75, device=0)[0]
		box = result.boxes
		# прогон изображениям по различным нарушениям
		solid_line_result = solid_line_crossing(box)
		public_trafic_result = public_trafic_driving(box)

		# Отметка времени в секундах
		current_time_sec = int(i_frame / fps)

		# Условие для записи в CSV
		with open(csv_file_path, mode='a', newline='') as file:
			writer = csv.writer(file)

      # Запись нарушений с учетом интервала в 5 секунд
			if solid_line_result and current_time_sec - last_violation_time["solid_line"] >= 5:
				writer.writerow([video_name, "Статья 12.17  часть 1.1 и 1.2. движение транспортных средств по полосе для маршрутных транспортных средств или остановка на указанной полосе в нарушение Правил дорожного движения", current_time_sec])
				last_violation_time["solid_line"] = current_time_sec

			if public_trafic_result and current_time_sec - last_violation_time["public_trafic"] >= 5:
				writer.writerow([video_name, "Статья 12.16. часть 1 Несоблюдение требований, предписанных дорожными знаками или разметкой проезжей части дороги", current_time_sec])
				last_violation_time["public_trafic"] = current_time_sec

		# отрисовка боксов
		frame = draw_bounding_boxes(result)

		# маркеры нарушений
		frame = put_text_on_image(frame, "Сплошная: "+str(solid_line_result), color[solid_line_result])
		frame = put_text_on_image(frame, "Езда по полосе для общ. транспорта: "+str(public_trafic_result), color[public_trafic_result], line=1)

		# запись видео
		output_video.write(frame)

	# Освобождаем ресурсы
	video.release()
	print(f"Video saved to {output_video_path}")
	print(f"Violations saved to {csv_file_path}")

# Пример использования
start = time.time()
input_video_path = '/content/4summerevning.mp4'
output_video_path = './output.mp4'
#process_image(input_video_path, output_video_path, model=model)
print(time.time()-start, ' секунд')

0.0  секунд


In [15]:
import os 

dir = './drive-download-20241110T073607Z-001/'
files = os.listdir(dir)
files

['akn00007_3C5sneri.mov', 'akn00083_65POhPKk.mov']

In [16]:
os.path.join(dir, files[0])

'./drive-download-20241110T073607Z-001/akn00007_3C5sneri.mov'

In [19]:
for i in files:
    process_image(os.path.join(dir, i), './output3.mp4', model=model)

8992 29.97002997002997
Старт


 17%|█▋        | 1505/8992 [00:35<02:58, 42.03it/s]


KeyboardInterrupt: 