In [58]:
# Итоговое задание по компьютерному зрению, ч. 2
# Колосов Максим

In [60]:
# Установка библиотек

!pip install lap ultralytics opencv-python



In [62]:
# Импорт библиотек

from ultralytics import YOLO
import cv2
import time
import numpy as np
import os

In [64]:
# Для выполнения задания взято видео с движущимися автомобилями в городе Владивостоке
# (видео из города Корсакова я не нашёл)

# Загрузим видео и узнаем, какие у него fps (количество кадров в секунду),
# ширина и высота

cap = cv2.VideoCapture("raw_video/raw_video.mp4")

fps = cap.get(cv2.CAP_PROP_FPS)
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

print(f"Параметры исходного видео:\nFPS: {int(fps)}\nWidth: {w}\nHeight: {h}")

Параметры исходного видео:
FPS: 25
Width: 1280
Height: 720


In [66]:
# Перепишем видео под меньший FPS, а разрешение оставим прежним

target_fps = 10
skip = max(1, round(fps / target_fps))
size = (w, h)
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
folder_10fps_video = "10fps_video"
if not os.path.exists(folder_10fps_video):
    os.makedirs(folder_10fps_video)
out = cv2.VideoWriter('10fps_video/10fps_video.mp4', fourcc, target_fps, size)

frame_id = 0
while True:
    ret, frame = cap.read()
    if not ret:
        break
    frame_id += 1
    if frame_id % skip:
        continue
    small = cv2.resize(frame, size)
    out.write(small)

cap.release()
out.release()

In [68]:
# Модель
model = YOLO("yolov8l.pt")

# Трекер
tracker_config = "custom_botsort.yaml"

# Класс "car"
classes = (2,)

# Видео для чтения
cap = cv2.VideoCapture("10fps_video/10fps_video.mp4")

In [70]:
# Настройки для записи видео
fps = cap.get(cv2.CAP_PROP_FPS)
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
folder_tracked_video = "tracked_video"
if not os.path.exists(folder_tracked_video):
    os.makedirs(folder_tracked_video)
out = cv2.VideoWriter("tracked_video/tracked_video.mp4", fourcc, fps, (w, h))

# Цвета для элементов
GREEN_COLOR = (0, 255, 0)  # Зелёный
BLUE_COLOR = (255, 0, 0)  # Синий
WHITE_COLOR = (255, 255, 255)  # Белый
RED_COLOR = (0, 0, 255)  # Красный

# Задаём отрезок для подсчёта пересечений
line_start = (778, 512)  # Начальная точка отрезка
line_end = (893, 403)    # Конечная точка отрезка

# Задаём точки красного четырёхугольника
quad_points = [
    (496, 570),  # Верхняя левая точка
    (575, 570),  # Верхняя правая точка
    (575, 710),  # Нижняя правая точка
    (392, 710)   # Нижняя левая точка
]
quad_contour = np.array(quad_points, dtype=np.int32)

# Функция для проверки нахождения точки внутри четырёхугольника
def point_in_polygon(point, poly):
    x, y = point
    n = len(poly)
    inside = False
    
    p1x, p1y = poly[0]
    for i in range(n + 1):
        p2x, p2y = poly[i % n]
        if y > min(p1y, p2y):
            if y <= max(p1y, p2y):
                if x <= max(p1x, p2x):
                    if p1y != p2y:
                        xinters = (y - p1y) * (p2x - p1x) / (p2y - p1y) + p1x
                    if p1x == p2x or x <= xinters:
                        inside = not inside
        p1x, p1y = p2x, p2y
    
    return inside

# Функции для проверки пересечения
def on_segment(A, B, P):
    # Проверяем, лежит ли точка P на отрезке AB
    return min(A[0], B[0]) <= P[0] <= max(A[0], B[0]) and (min(A[1], B[1]) <= P[1] <= max(A[1], B[1]))

def segments_intersect(A, B, C, D):
    # Проверяем пересечение отрезков AB и CD
    def orientation(O, P, Q):
        # Вычисляем ориентацию тройки точек (O, P, Q)
        val = (P[1] - O[1]) * (Q[0] - P[0]) - (P[0] - O[0]) * (Q[1] - P[1])
        if val == 0: return 0  # Коллинеарны
        return 1 if val > 0 else -1  # По или против часовой стрелки

    o1 = orientation(A, B, C)
    o2 = orientation(A, B, D)
    o3 = orientation(C, D, A)
    o4 = orientation(C, D, B)

    # Общее условие пересечения
    if o1 != o2 and o3 != o4:
        return True

    # Проверка случаев, когда точки лежат на отрезках
    if o1 == 0 and on_segment(A, B, C): return True
    if o2 == 0 and on_segment(A, B, D): return True
    if o3 == 0 and on_segment(C, D, A): return True
    if o4 == 0 and on_segment(C, D, B): return True

    return False

# Счётчик пересечений, история треков и множество пересекших объектов
cross_count = 0
prev_centers = {}  # Для хранения предыдущих позиций (cx, cy) по ID
crossed_ids = set()  # Множество для хранения ID пересекших объектов
in_red_zone = {}  # Для отслеживания машин в красной зоне

# Обработка видео
try:
    while cap.isOpened():
        success, frame = cap.read()
        if not success:
            break

        # Трекинг
        results = model.track(
            frame,
            persist=True,
            classes=classes,
            verbose=False,
            tracker=tracker_config
        )

        # Создаём чистую копию кадра для аннотаций
        annotated_frame = frame.copy()

        # Рисуем красный четырёхугольник
        cv2.polylines(
            img=annotated_frame,
            pts=[quad_contour],
            isClosed=True,
            color=RED_COLOR,
            thickness=2,
            lineType=cv2.LINE_AA
        )

        # Надпись RED ZONE
        cv2.putText(
            img=annotated_frame,
            text="RED ZONE",
            org=(440, 680),
            fontFace=cv2.FONT_HERSHEY_SIMPLEX,
            fontScale=0.7,
            color=RED_COLOR,
            thickness=1,
            lineType=cv2.LINE_AA
        )
        
        # Рисуем отрезок подсчёта
        cv2.line(
            img=annotated_frame,
            pt1=line_start,
            pt2=line_end,
            color=BLUE_COLOR,
            thickness=2,
            lineType=cv2.LINE_AA
        )
        
        # Отображаем счётчик
        cv2.putText(
            img=annotated_frame,
            text=f"Cross count: {cross_count}",
            org=(5, 710),
            fontFace=cv2.FONT_HERSHEY_SIMPLEX,
            fontScale=1,
            color=BLUE_COLOR,
            thickness=2,
            lineType=cv2.LINE_AA
        )
        
        # Обрабатываем только если есть обнаружения
        if results[0].boxes.id is not None:
            boxes = results[0].boxes.xyxy.cpu().numpy().astype(int)
            track_ids = results[0].boxes.id.cpu().numpy().astype(int)
            
            # Сбрасываем статус для всех машин перед новой проверкой
            current_frame_red_zone = set()
            
            for box, track_id in zip(boxes, track_ids):
                x1, y1, x2, y2 = box
                cx = (x1 + x2) // 2
                cy = (y1 + y2) // 2
                
                # Проверка пересечения отрезка
                if track_id in prev_centers:
                    prev_cx, prev_cy = prev_centers[track_id]
                    
                    # Проверяем пересечение траектории с отрезком
                    if segments_intersect(line_start, line_end, 
                                         (prev_cx, prev_cy), 
                                         (cx, cy)):
                        
                        # Вычисляем векторное произведение для определения стороны
                        Ax, Ay = line_start
                        Bx, By = line_end
                        # Вектор AB
                        AB = (Bx - Ax, By - Ay)
                        # Вектор от A до предыдущей точки
                        AP_prev = (prev_cx - Ax, prev_cy - Ay)
                        # Вектор от A до текущей точки
                        AP_curr = (cx - Ax, cy - Ay)
                        
                        # Векторное произведение AB x AP
                        cross_prev = AB[0] * AP_prev[1] - AB[1] * AP_prev[0]
                        cross_curr = AB[0] * AP_curr[1] - AB[1] * AP_curr[0]
                        
                        # Переход из отрицательной в положительную область
                        if cross_prev < 0 and cross_curr >= 0:
                            cross_count += 1
                            # Добавляем ID в множество пересекших объектов
                            if track_id not in crossed_ids:
                                crossed_ids.add(track_id)
                
                # Обновляем историю позиций
                prev_centers[track_id] = (cx, cy)
                
                # Проверяем, находится ли центр в красной зоне
                if point_in_polygon((cx, cy), quad_points):
                    current_frame_red_zone.add(track_id)
                    in_red_zone[track_id] = True
                else:
                    # Если машина вышла из зоны, сбрасываем статус
                    if track_id in in_red_zone:
                        in_red_zone[track_id] = False
                
                # Выбираем цвет в зависимости от факта пересечения
                if track_id in crossed_ids:
                    box_color = BLUE_COLOR
                    text_bg_color = BLUE_COLOR
                else:
                    box_color = BOX_COLOR
                    text_bg_color = BOX_COLOR
                
                # Рисуем bounding box
                cv2.rectangle(
                    img=annotated_frame,
                    pt1=(x1, y1),
                    pt2=(x2, y2),
                    color=box_color,
                    thickness=2,
                    lineType=cv2.LINE_AA
                )
                
                # Форматируем текст: ID, координаты центра
                text = f"ID:{track_id} ({cx},{cy})"
                
                # Рассчитываем размер текста
                text_size, baseline = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.7, 2)
                text_w, text_h = text_size
                
                # Позиционирование текста (над bounding box)
                text_y = y1 - 5 if y1 - text_h - 5 > 0 else y1 + text_h + 20
                
                # Рисуем фон для текста
                cv2.rectangle(
                    img=annotated_frame,
                    pt1=(x1, text_y - text_h - 5),
                    pt2=(x1 + text_w, text_y + 5),
                    color=text_bg_color,
                    thickness=cv2.FILLED
                )
                
                # Рисуем текст
                cv2.putText(
                    img=annotated_frame,
                    text=text,
                    org=(x1, text_y),
                    fontFace=cv2.FONT_HERSHEY_SIMPLEX,
                    fontScale=0.7,
                    color=WHITE_COLOR,
                    thickness=2,
                    lineType=cv2.LINE_AA
                )
            
            # Выводим сообщения о машинах в красной зоне
            red_zone_text_y = 100
            for track_id in current_frame_red_zone:
                text = f"Car ID:{track_id} is in RED ZONE"
                text_size = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.9, 2)[0]
                cv2.rectangle(
                    img=annotated_frame,
                    pt1=(10, red_zone_text_y - 30),
                    pt2=(20 + text_size[0], red_zone_text_y + 10),
                    color=(0, 0, 0),
                    thickness=cv2.FILLED
                )
                cv2.putText(
                    img=annotated_frame,
                    text=text,
                    org=(15, red_zone_text_y),
                    fontFace=cv2.FONT_HERSHEY_SIMPLEX,
                    fontScale=0.9,
                    color=RED_COLOR,
                    thickness=2,
                    lineType=cv2.LINE_AA
                )
                red_zone_text_y += 40

        # Запись кадра
        out.write(annotated_frame)

finally:
    # Освобождение ресурсов
    cap.release()
    out.release()