In [None]:
!pip install ikomia



In [None]:
!pip install ultralytics deep_sort_realtime

Collecting ultralytics
  Downloading ultralytics-8.3.146-py3-none-any.whl.metadata (37 kB)
Collecting deep_sort_realtime
  Downloading deep_sort_realtime-1.3.2-py3-none-any.whl.metadata (12 kB)
Collecting ultralytics-thop>=2.0.0 (from ultralytics)
  Downloading ultralytics_thop-2.0.14-py3-none-any.whl.metadata (9.4 kB)
Downloading ultralytics-8.3.146-py3-none-any.whl (1.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.0/1.0 MB[0m [31m16.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading deep_sort_realtime-1.3.2-py3-none-any.whl (8.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m8.4/8.4 MB[0m [31m82.9 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading ultralytics_thop-2.0.14-py3-none-any.whl (26 kB)
Installing collected packages: deep_sort_realtime, ultralytics-thop, ultralytics
Successfully installed deep_sort_realtime-1.3.2 ultralytics-8.3.146 ultralytics-thop-2.0.14


# Задача 1

На основе алгоритма DeepSORT реализуйте возможность подсчета количества объектов и какого класса пересекает заданную линию в обеих направлениях.

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
def ccw(A, B, C):
    """Проверка направления поворота"""
    return (C[1]-A[1]) * (B[0]-A[0]) > (B[1]-A[1]) * (C[0]-A[0])

def intersect(A, B, C, D):
    """Пересекаются ли отрезки AB и CD"""
    return ccw(A, C, D) != ccw(B, C, D) and ccw(A, B, C) != ccw(A, B, D)

In [None]:
import cv2
import numpy as np
import csv
from ultralytics import YOLO
from deep_sort_realtime.deepsort_tracker import DeepSort
from collections import defaultdict

# Функция проверки пересечения двух отрезков
def intersect(p1, p2, line_p1, line_p2):
    def ccw(A, B, C):
        return (C[1] - A[1]) * (B[0] - A[0]) > (B[1] - A[1]) * (C[0] - A[0])

    return ccw(p1, line_p1, line_p2) != ccw(p2, line_p1, line_p2) and \
           ccw(p1, p2, line_p1) != ccw(p1, p2, line_p2)

# Путь к видео
video_input = "/content/drive/MyDrive/3 курс/2 семестр/Машинное зрение/15/car_jaaam.mp4"
output_video = "counted_objects_output.mp4"

# Инициализация модели и трекера
model = YOLO("yolov8n.pt")
tracker = DeepSort(max_age=30)

# Словари и списки для хранения данных
object_positions = {}  # Хранит последние позиции треков
counts_left = defaultdict(int)  # Подсчет пересечений слева направо
counts_right = defaultdict(int)  # Подсчет пересечений справа налево
metadata_list = []  # Метаданные для CSV

# Открытие видеопотока
cap = cv2.VideoCapture(video_input)
if not cap.isOpened():
    print(f"Ошибка: Не удалось открыть видео {video_input}")
    exit()

# Параметры видео
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
frame_rate = cap.get(cv2.CAP_PROP_FPS)

# Проверка корректности параметров
if frame_width == 0 or frame_height == 0:
    print("Ошибка: Неверные размеры видео")
    cap.release()
    exit()

# Определение вертикальной линии по центру
center_line_x = frame_width // 2
line_start = (center_line_x, 0)
line_end = (center_line_x, frame_height)

# Настройка выходного видео
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_video, fourcc, frame_rate, (frame_width, frame_height))

current_frame = 0
while True:
    success, frame = cap.read()
    if not success:
        print(f"Инфо: Конец видео или ошибка на кадре {current_frame}")
        break

    # Детекция объектов с помощью YOLOv8
    results = model(frame)[0]
    detected_objects = []
    for detection in results.boxes.data.tolist():
        x1, y1, x2, y2, confidence, class_id = detection
        class_label = model.names[int(class_id)]
        detected_objects.append(([x1, y1, x2 - x1, y2 - y1], confidence, class_label))

    # Обновление треков
    tracked_objects = tracker.update_tracks(detected_objects, frame=frame)

    # Обработка треков
    for track in tracked_objects:
        if not track.is_confirmed():
            continue

        track_id = track.track_id
        bbox = track.to_ltrb()
        x1, y1, x2, y2 = map(int, bbox)
        center_x = (x1 + x2) // 2
        center_y = (y1 + y2) // 2
        class_label = track.get_det_class()

        # Сохранение метаданных
        metadata_list.append([current_frame, track_id, class_label, center_x, center_y])

        # Обновление позиций трека
        if track_id not in object_positions:
            object_positions[track_id] = []
        object_positions[track_id].append((center_x, center_y))

        # Проверка пересечения вертикальной линии
        if len(object_positions[track_id]) > 2:
            object_positions[track_id] = object_positions[track_id][-2:]
            prev_pos, curr_pos = object_positions[track_id]
            if intersect(prev_pos, curr_pos, line_start, line_end):
                if prev_pos[0] > curr_pos[0]:  # Справа налево
                    counts_left[class_label] += 1
                else:  # Слева направо
                    counts_right[class_label] += 1
                object_positions[track_id] = [(0, 0)]  # Сброс после пересечения

        # Отрисовка bounding box и идентификатора
        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
        cv2.putText(frame, f"{class_label} ID:{track_id}", (x1, y1 - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

    # Отрисовка счетчиков
    y_offset = 30
    all_classes = set(counts_left.keys()) | set(counts_right.keys())
    for cls in sorted(all_classes):
        text = f"{cls}: Left: {counts_left[cls]} Right: {counts_right[cls]}"
        cv2.putText(frame, text, (30, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2)
        y_offset += 25

    # Отрисовка вертикальной линии
    cv2.line(frame, line_start, line_end, (0, 0, 255), 2)

    # Сохранение кадра
    out.write(frame)
    current_frame += 1

# Освобождение ресурсов
cap.release()
out.release()

# Сохранение метаданных в CSV
with open("object_tracking_data.csv", "w", newline="") as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow(["Frame", "ObjectID", "Class", "CenterX", "CenterY"])
    writer.writerows(metadata_list)

# Вывод итоговой статистики
print("Итоговая статистика:")
print("Слева направо:", dict(counts_right) if counts_right else "Нет пересечений")
print("Справа налево:", dict(counts_left) if counts_left else "Нет пересечений")

Creating new Ultralytics Settings v0.0.6 file ✅ 
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.
Downloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolov8n.pt to 'yolov8n.pt'...


100%|██████████| 6.25M/6.25M [00:00<00:00, 16.4MB/s]



0: 384x640 7 cars, 1 truck, 252.3ms
Speed: 9.6ms preprocess, 252.3ms inference, 23.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 7 cars, 1 truck, 198.7ms
Speed: 3.2ms preprocess, 198.7ms inference, 1.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 6 cars, 1 truck, 188.5ms
Speed: 3.0ms preprocess, 188.5ms inference, 1.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 6 cars, 1 truck, 199.3ms
Speed: 3.3ms preprocess, 199.3ms inference, 1.8ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 6 cars, 1 truck, 204.0ms
Speed: 3.2ms preprocess, 204.0ms inference, 1.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 6 cars, 1 truck, 124.2ms
Speed: 2.9ms preprocess, 124.2ms inference, 1.2ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 6 cars, 1 truck, 128.2ms
Speed: 3.8ms preprocess, 128.2ms inference, 1.2ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 8 cars, 1 truck, 132.5ms
Speed: 3.1ms prep

In [None]:
from google.colab import drive
from IPython.display import Video
import cv2
import os

# Монтируем Google Drive
drive.mount('/content/drive')

# Путь к видео
video_path = '/content/drive/MyDrive/3 курс/2 семестр /Машинное зрение /15/counted_output.avi'

# Проверка существования файла
if not os.path.exists(video_path):
    print(f"Ошибка: Файл {video_path} не существует")
else:
    # Проверка возможности открытия видео
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Ошибка: Не удалось открыть видео {video_path}. Попробуем перекодировать...")
        !apt-get install ffmpeg
        !ffmpeg -i "{video_path}" -c:v libx264 -c:a aac /content/converted_video.mp4
        video_path = '/content/converted_video.mp4'
    else:
        # Проверка параметров видео
        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        duration = frame_count / fps
        print(f'FPS: {fps}, Frames: {frame_count}, Duration: {duration:.2f} seconds')
        cap.release()

        # Конвертация, если не соответствует требованиям (10 секунд, 100–150 кадров)
        if duration > 13 or fps > 30:
            print("Конвертируем видео до 10 секунд и 10 FPS...")
            !apt-get install ffmpeg
            !ffmpeg -i "{video_path}" -t 10 -r 30 /content/output_10fps.mp4
            video_path = '/content/output_10fps.mp4'

# Отображение видео
Video(video_path, embed=True, width=640, height=360, html_attributes="controls autoplay")

Output hidden; open in https://colab.research.google.com to view.

# Задача 2

На основе алгоритма DeepSORT реализуйте сохранение метаданных трекинга в файл (id, координаты, класс)

#Задача 3

Выведите время жизни каждого трека. То есть отслеживаете отдельный id и считаете его время жизни.