Задание 1

In [36]:
from ultralytics import YOLO
import numpy as np
import cv2

In [37]:
# Загружаем модель YOLOv8x
model = YOLO("yolov8x.pt")

# Загружаем видео
video = cv2.VideoCapture("test_video_short.mp4")
width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))

track_ids = {}
kalman_filters = {} # фильтр Калмана делает отслеживание более точным и плавным

# Создание видеозаписи
fourcc = cv2.VideoWriter_fourcc(*'XVID')
writer = cv2.VideoWriter('output.mp4', fourcc, 20.0, (width, height))
   

In [None]:
while(video.isOpened()):
    ret, frame = video.read()
    if not ret:
        break

    results = model.predict(frame, classes=[0], conf=0.2, iou=0.5)

    # Получаем координаты распознанных людей
    detections = results[0].boxes.xyxy.cpu().numpy().astype(int)

    # Обновляем словарь track_ids
    for det in detections:
        x1, y1, x2, y2 = det
        cur_center_x = (x1 + x2) // 2
        cur_center_y = (y1 + y2) // 2

        # Проверяем, есть ли уже ID для этого человека
        found_id = False
        for id, (prev_center_x, prev_center_y) in track_ids.items():
            # Находим расстояние 
            distance = np.sqrt((cur_center_x - prev_center_x)**2 + (cur_center_y - prev_center_y)**2)
            if distance < 50:
                track_ids[id] = (cur_center_x, cur_center_y)
                found_id = True
                break

        # Если ID не найден, создаем новый
        if not found_id:
            new_id = len(track_ids) + 1
            track_ids[new_id] = (cur_center_x, cur_center_y)
            # Создаем Kalman filter для нового объекта
            kalman_filters[new_id] = cv2.KalmanFilter(4, 2)
            # Инициализируем фильтр
            kalman_filters[new_id].measurementMatrix = np.array([[1, 0, 0, 0], [0, 1, 0, 0]], np.float32)
            kalman_filters[new_id].transitionMatrix = np.array([[1, 0, 1, 0], [0, 1, 0, 1], [0, 0, 1, 0], [0, 0, 0, 1]], np.float32)
            # Начальное состояние
            kalman_filters[new_id].statePre = np.array([[cur_center_x], [cur_center_y], [0], [0]], np.float32)
            kalman_filters[new_id].statePost = np.array([[cur_center_x], [cur_center_y], [0], [0]], np.float32)

    # Обновляем Kalman filter для каждого объекта
    for id in track_ids:
        # Если объект был обнаружен в текущем кадре
        if id in track_ids:
            # Получаем центр объекта
            cur_center_x, cur_center_y = track_ids[id]
            # Обновляем Kalman filter
            measurement = np.array([[cur_center_x], [cur_center_y]], np.float32)
            kalman_filters[id].correct(measurement)

            prediction = kalman_filters[id].predict()
            track_ids[id] = (int(prediction[0]), int(prediction[1]))

    # Рисуем прямоугольники и ID
    for id, (cur_center_x, cur_center_y) in track_ids.items():
        for det in detections:
            x1, y1, x2, y2 = det
            if (x1 < cur_center_x < x2) and (y1 < cur_center_y < y2):
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.putText(frame, f"ID: {id}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

    cv2.putText(frame, f'People: {len(detections)}', (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
    cv2.imshow("Video", frame)
    writer.write(frame)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

video.release()
writer.release()
cv2.destroyAllWindows()

Задание 2

In [76]:
# Загружаем модель YOLOv8x
model = YOLO("yolov8x.pt")

# Загружаем видео
video = cv2.VideoCapture("test_video_short.mp4")
width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))

track_ids = {}
helmet_status = {}
no_helmet_trajectories = {}
# Цветовые модели HSV
hsv_orange_lower = np.array([10, 100, 100], dtype="uint8")
hsv_orange_upper = np.array([25, 255, 255], dtype="uint8")
hsv_green_lower = np.array([40, 50, 50], dtype="uint8")
hsv_green_upper = np.array([80, 255, 255], dtype="uint8")

# Создание видеозаписи
fourcc = cv2.VideoWriter_fourcc(*'XVID')
writer = cv2.VideoWriter('output.mp4', fourcc, 20.0, (width, height))
   

In [77]:
def person_has_helmet(det, frame):
    x1, y1, x2, y2 = det
    person_roi = frame[y1:int(y1 + (y2 - y1) / 3), x1:x2]  
    hsv = cv2.cvtColor(person_roi, cv2.COLOR_BGR2HSV)  
    mask_orange = cv2.inRange(hsv, hsv_orange_lower, hsv_orange_upper)
    mask_green = cv2.inRange(hsv, hsv_green_lower, hsv_green_upper)     
    return np.sum(mask_orange) > 100 or np.sum(mask_green) > 100

In [78]:
while(video.isOpened()):
    ret, frame = video.read()
    if not ret:
        break

    results = model.predict(frame, classes=[0], conf=0.2, iou=0.4)
    detections = results[0].boxes.xyxy.cpu().numpy().astype(int)

    for det in detections:
        x1, y1, x2, y2 = det
        cur_center_x = (x1 + x2) // 2
        cur_center_y = (y1 + y2) // 2

        has_helmet = person_has_helmet(det, frame)

        found_id = False
        for id, (prev_center_x, prev_center_y) in track_ids.items():
            distance = np.sqrt((cur_center_x - prev_center_x)**2 + (cur_center_y - prev_center_y)**2)
            if distance < 50:
                track_ids[id] = (cur_center_x, cur_center_y)
                helmet_status[id] = has_helmet 
                found_id = True
                break

        if not found_id:
            new_id = len(track_ids) + 1
            track_ids[new_id] = (cur_center_x, cur_center_y)
            helmet_status[new_id] = has_helmet  

        if id in helmet_status and not helmet_status[id]:
            if id not in no_helmet_trajectories:
                no_helmet_trajectories[id] = []
            no_helmet_trajectories[id].append((cur_center_x, cur_center_y))

    # Отрисовка результатов на кадре
    for id, (cur_center_x, cur_center_y) in track_ids.items():
        for det in detections:
            x1, y1, x2, y2 = det
            if (x1 < cur_center_x < x2) and (y1 < cur_center_y < y2):
                if id in helmet_status and helmet_status[id]:  
                    cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                    cv2.putText(frame, f"ID: {id}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
                else:
                    cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 2)
                    cv2.putText(frame, f"ID: {id}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)

    # Отрисовка траекторий сотрудников без касок
    for id, trajectory in no_helmet_trajectories.items():
        for i in range(1, len(trajectory)):
            # Рисуем линии только для точек одной траектории
            cv2.line(frame, trajectory[i-1], trajectory[i], (0, 0, 255), 2)

    # Вывод результата на экран
    cv2.imshow("Video", frame)
    writer.write(frame)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

video.release()
writer.release()
cv2.destroyAllWindows()



0: 384x640 6 persons, 1723.6ms
Speed: 2.0ms preprocess, 1723.6ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 6 persons, 1710.6ms
Speed: 2.0ms preprocess, 1710.6ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 6 persons, 1705.5ms
Speed: 2.0ms preprocess, 1705.5ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 6 persons, 1703.5ms
Speed: 2.0ms preprocess, 1703.5ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 6 persons, 1705.6ms
Speed: 2.0ms preprocess, 1705.6ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 6 persons, 1704.5ms
Speed: 3.0ms preprocess, 1704.5ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 6 persons, 1716.6ms
Speed: 1.0ms preprocess, 1716.6ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 6 persons, 1700.9ms
Speed: 2.0ms preprocess, 1700.9ms inference, 2.0ms 