In [2]:
from google.colab import drive
drive.mount('/content/drive')

video_path = '/content/drive/MyDrive/Практика_ЦК/202505261047-202505261057.mp4'

Mounted at /content/drive


In [4]:
!pip install opencv-python
!pip install opencv-python-headless
!pip install filterpy
!pip install ipywidgets
!pip install ultralytics
!wget https://raw.githubusercontent.com/abewley/sort/master/sort.py

Collecting filterpy
  Downloading filterpy-1.4.5.zip (177 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m178.0/178.0 kB[0m [31m15.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: filterpy
  Building wheel for filterpy (setup.py) ... [?25l[?25hdone
  Created wheel for filterpy: filename=filterpy-1.4.5-py3-none-any.whl size=110460 sha256=468cf813f2ad025ba5efbaa16e1abf061630003096c953825874e96f1848dc01
  Stored in directory: /root/.cache/pip/wheels/12/dc/3c/e12983eac132d00f82a20c6cbe7b42ce6e96190ef8fa2d15e1
Successfully built filterpy
Installing collected packages: filterpy
Successfully installed filterpy-1.4.5
Collecting ultralytics
  Downloading ultralytics-8.3.148-py3-none-any.whl.metadata (37 kB)
Collecting ultralytics-thop>=2.0.0 (from ultralytics)
  Downloading ultralytics_thop-2.0.14-py3-none-any.whl.metadata (9.4 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch>=1.8.

In [5]:
max_frames = 2000   # кол-во кадров в финальном видео

In [7]:
import numpy as np
import matplotlib.pyplot as plt
import cv2
import torch
from pathlib import Path
from ultralytics import YOLO
from sort import Sort   # в sort.py закомментить строки с 22 по 25 про matplotlib
import numpy as np
from IPython.display import display, Video as ipyVideo
import math
import ipywidgets as widgets
from IPython.display import display, clear_output
import matplotlib.pyplot as plt

Вспомогательные функции сглаживания характеристик

In [8]:
def smooth_centers(points, window=3):
    smoothed = []
    for i in range(len(points)):
        if i < window:
            smoothed.append(points[i]["center"])
        else:
            avg_x = np.mean([p["center"][0] for p in points[i-window:i+1]])
            avg_y = np.mean([p["center"][1] for p in points[i-window:i+1]])
            smoothed.append((avg_x, avg_y))
    return smoothed

def recompute_speed_acceleration(track_data, fps, height, delta=10, smoothing_window=3):
    for track_id, points in track_data.items():
        if len(points) < delta + 1:
            continue

        centers = smooth_centers(points, window=smoothing_window)
        for i, pt in enumerate(points):
            pt["center_smoothed"] = centers[i]

        for i in range(delta, len(points)):
            curr = points[i]
            prev = points[i - delta]
            cx1, cy1 = prev["center_smoothed"]
            cx2, cy2 = curr["center_smoothed"]
            dx = cx2 - cx1
            dy = cy2 - cy1
            dist = (dx**2 + dy**2)**0.5

            relative_y = cy2 / height
            meters_per_pixel = get_meters_per_pixel(relative_y)
            dist_meters = dist * meters_per_pixel

            dt = (curr["frame"] - prev["frame"]) / fps
            if dt > 0:
                curr["speed"] = dist_meters / dt

        for i in range(delta * 2, len(points)):
            s1 = points[i - delta].get("speed")
            s2 = points[i].get("speed")
            t1 = points[i - delta]["frame"]
            t2 = points[i]["frame"]
            if s1 is not None and s2 is not None:
                dt = (t2 - t1) / fps
                if dt > 0:
                    acc = (s2 - s1) / dt
                    points[i]["acceleration"] = acc

    return track_data

In [9]:
def recompute_acceleration_smoothed(track_data, fps, window=5):
    for track_id, points in track_data.items():
        speeds = [p["speed"] if p["speed"] is not None else 0.0 for p in points]

        for i in range(len(points)):
            if i < window or i + window >= len(points):
                points[i]["acceleration"] = None
                continue

            past = speeds[i - window:i]
            future = speeds[i + 1:i + window + 1]

            if not past or not future:
                points[i]["acceleration"] = None
                continue

            v_before = np.mean(past)
            v_after = np.mean(future)
            dt = (window * 2) / fps

            acceleration = (v_after - v_before) / dt

            if abs(acceleration) > 10:
                acceleration = None

            points[i]["acceleration"] = acceleration

    return track_data

def smooth_acceleration(track_data, window=5):
    for points in track_data.values():
        accs = [p.get("acceleration") for p in points]
        smoothed = []
        for i in range(len(accs)):
            window_vals = accs[max(0, i - window + 1): i + 1]
            valid_vals = [a for a in window_vals if a is not None]
            if valid_vals:
                smoothed.append(sum(valid_vals) / len(valid_vals))
            else:
                smoothed.append(None)
        for i, val in enumerate(smoothed):
            points[i]["acceleration"] = val

    return track_data

In [10]:
def smooth_direction(track_data, window=5):
    for track_id, points in track_data.items():
        directions = [p.get("direction") for p in points]
        for i in range(len(points)):
            valid = [d for d in directions[max(0, i-window+1):i+1] if d is not None]
            if valid:
                points[i]["direction_smoothed"] = np.mean(valid)
            else:
                points[i]["direction_smoothed"] = None
    return track_data


def extract_events_interval(points, fps, accel_threshold=-2.5, turn_threshold=30, window=5, min_gap=10):
    events = []
    last_frame = -min_gap

    for i in range(window, len(points) - window):
        frame = points[i]["frame"]

        # Резкое торможение (как есть)
        acc = points[i].get("acceleration")
        if acc is not None and acc < accel_threshold and frame - last_frame >= min_gap:
            events.append((frame, "Резкое торможение"))
            last_frame = frame

        # Поворот по разности усреднённого направления
        past = [p.get("direction_smoothed") for p in points[i-window:i] if p.get("direction_smoothed") is not None]
        future = [p.get("direction_smoothed") for p in points[i+1:i+window+1] if p.get("direction_smoothed") is not None]
        if not past or not future:
            continue
        d1 = np.mean(past)
        d2 = np.mean(future)
        delta = abs(d2 - d1)
        delta = min(delta, 360 - delta)

        if delta > turn_threshold and frame - last_frame >= min_gap:
            events.append((frame, f"Поворот (Δθ={delta:.1f}°)"))
            last_frame = frame

    return events

Основной блок с обработкой

In [26]:
# Запрос выбора модели у пользователя
selection = input("Введите 1 для YOLOv11 (дообученная) или 2 для YOLOv5 (предобученная): ")

# Инициализация модели по выбору
if selection == "2":
    print("Загружается YOLOv5 (предобученная)...")
    model = torch.hub.load('ultralytics/yolov5', 'yolov5s', pretrained=True)
    model.classes = [2, 5, 7]  # car, bus, truck
    class_names = model.names

else:
    print("Загружается дообученная YOLOv11...")
    model_path = '/content/drive/MyDrive/Практика_ЦК/models/best.pt'
    model = YOLO(model_path)
    class_names = model.model.names

# Назначение цветов для классов
class_colors = {
    'car': (0, 255, 255),
    'truck': (255, 64, 0),
    'bus': (0, 181, 26)
}

Введите 1 для YOLOv11 (дообученная) или 2 для YOLOv5 (предобученная): 1
Загружается дообученная YOLOv11...


In [28]:
input_video = video_path
input_dir = str(Path(input_video).parent)
output_video = str(Path(input_dir) / 'processed_video.mp4')

cap = cv2.VideoCapture(input_video)
fps = cap.get(cv2.CAP_PROP_FPS)
new_width, new_height = 640, 480
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_video, fourcc, fps, (new_width, new_height))
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
print(f'Обработка видео: {frame_count} кадров, {fps} FPS')

processed_frames = 0

tracker = Sort()
track_data = {}

def compute_iou(box1, box2):
    x1, y1, x2, y2 = box1
    x1_p, y1_p, x2_p, y2_p = box2
    xi1, yi1 = max(x1, x1_p), max(y1, y1_p)
    xi2, yi2 = min(x2, x2_p), min(y2, y2_p)
    inter_area = max(0, xi2 - xi1) * max(0, yi2 - yi1)
    box1_area = (x2 - x1) * (y2 - y1)
    box2_area = (x2_p - x1_p) * (y2_p - y1_p)
    union_area = box1_area + box2_area - inter_area
    if union_area == 0:
        return 0
    return inter_area / union_area

def get_meters_per_pixel(relative_y):
    return 0.898 * relative_y**2 - 1.38 * relative_y + 0.56   # коэф-ы расчитаны вручную на основе входного видео

while processed_frames < max_frames:
    ret, frame = cap.read()
    if not ret:
        break

    frame = cv2.resize(frame, (new_width, new_height))
    results = model(frame)

    detections = []
    if selection == "2":  # YOLOv5
        detections = results.xyxy[0].cpu().numpy()
    else:  # YOLOv11
        result = results[0]
        for box in result.boxes:
            x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
            conf = box.conf[0].cpu().item()
            cls = box.cls[0].cpu().item()
            detections.append([x1, y1, x2, y2, conf, cls])
        detections = np.array(detections)

    dets_for_sort = []
    det_classes = []
    for det in detections:
        x1, y1, x2, y2, conf, cls = det
        dets_for_sort.append([x1, y1, x2, y2, conf])
        det_classes.append((x1, y1, x2, y2, class_names[int(cls)], conf))

    dets_for_sort = np.array(dets_for_sort)
    tracked_objects = tracker.update(dets_for_sort)

    for obj in tracked_objects:
        x1, y1, x2, y2, track_id = obj
        x1, y1, x2, y2 = map(int, [x1, y1, x2, y2])
        cx = (x1 + x2) / 2
        cy = (y1 + y2) / 2
        center = (cx, cy)

        if track_id not in track_data:
            track_data[track_id] = []

        last_point = track_data[track_id][-1] if track_data[track_id] else None
        speed = None
        acceleration = None
        direction = None

        relative_y = cy / new_height
        meters_per_pixel = get_meters_per_pixel(relative_y)

        if last_point:
            dx = center[0] - last_point['center'][0]
            dy = center[1] - last_point['center'][1]
            dist_pixels = (dx**2 + dy**2)**0.5
            dist_meters = dist_pixels * meters_per_pixel
            frame_diff = processed_frames - last_point['frame']
            time_diff = frame_diff / fps if frame_diff > 0 else 1 / fps
            speed = dist_meters / time_diff

            # Ускорение
            if last_point.get("speed") is not None and speed is not None:
                acceleration = (speed - last_point["speed"]) / time_diff
                if abs(acceleration) > 10:
                    acceleration = None

            # Направление (в градусах)
            motion_threshold = 2.0  # минимальное перемещение в пикселях для расчёта направления
            if dx**2 + dy**2 >= motion_threshold**2:
                angle_rad = np.arctan2(dy, dx)
                direction = np.degrees(angle_rad)
            else:
                direction = None  # слишком мало движения — игнорируем

        track_data[track_id].append({
            'frame': processed_frames,
            'center': center,
            'speed': speed,
            'acceleration': acceleration,
            'direction': direction
        })

        # Сглаживание скорости
        recent_speeds = [p.get('speed') for p in track_data[track_id][-3:] if p.get('speed') is not None]
        smoothed_speed = sum(recent_speeds) / len(recent_speeds) if recent_speeds else None
        if smoothed_speed is not None and smoothed_speed > 25:
            smoothed_speed = None

        matched_class = "object"
        color = (0, 255, 0)
        iou_threshold = 0.3
        for det in det_classes:
            dx1, dy1, dx2, dy2, cls_name, conf = det
            iou = compute_iou([x1, y1, x2, y2], [dx1, dy1, dx2, dy2])
            if iou > iou_threshold:
                matched_class = f"{cls_name} {conf:.2f}"
                color = class_colors.get(cls_name, (0, 255, 0))
                break

        label = f"ID {int(track_id)} | {matched_class}"
        if smoothed_speed is not None:
            label += f" | {smoothed_speed:.1f} m/s"

        cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
        cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

        directions = [p.get("direction") for p in track_data[int(track_id)][-5:] if p.get("direction") is not None]
        if directions:
            smoothed_dir = np.mean(directions)
            angle_rad = np.radians(smoothed_dir)
            length = 30
            x_start, y_start = int(cx), int(cy)
            x_end = int(x_start + length * np.cos(angle_rad))
            y_end = int(y_start + length * np.sin(angle_rad))
            cv2.arrowedLine(frame, (x_start, y_start), (x_end, y_end), color, 2, tipLength=0.3)

    out.write(frame)
    processed_frames += 1

cap.release()
out.release()

print('Видео сохранено:', output_video)
display(ipyVideo(output_video))

track_data = recompute_speed_acceleration(track_data, fps, height=new_height)

track_data = recompute_acceleration_smoothed(track_data, fps)

track_data = smooth_acceleration(track_data)

track_data = smooth_direction(track_data)

[1;30;43mВыходные данные были обрезаны до нескольких последних строк (5000).[0m
Speed: 2.0ms preprocess, 9.0ms inference, 1.3ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 13 cars, 2 trucks, 9.1ms
Speed: 3.0ms preprocess, 9.1ms inference, 1.3ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 bus, 13 cars, 1 truck, 9.4ms
Speed: 1.8ms preprocess, 9.4ms inference, 1.5ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 13 cars, 1 truck, 10.6ms
Speed: 1.7ms preprocess, 10.6ms inference, 1.3ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 13 cars, 1 truck, 9.9ms
Speed: 1.7ms preprocess, 9.9ms inference, 1.3ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 13 cars, 1 truck, 9.7ms
Speed: 1.7ms preprocess, 9.7ms inference, 1.3ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 13 cars, 1 truck, 9.8ms
Speed: 1.6ms preprocess, 9.8ms inference, 1.3ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 13 cars,

Отрисовка графиков + события

In [29]:
def enrich_with_direction(track_data, new_key='direction'):
    for track_id, points in track_data.items():
        for i in range(len(points)):
            if i == 0:
                points[i][new_key] = None
                continue

            x1, y1 = points[i - 1]['center']
            x2, y2 = points[i]['center']
            dx = x2 - x1
            dy = y2 - y1

            angle_rad = np.arctan2(dy, dx)
            angle_deg = np.degrees(angle_rad)

            points[i][new_key] = angle_deg

    return track_data

In [30]:
enrich_with_direction(track_data)

available_ids = sorted([int(k) for k in track_data.keys() if len(track_data[k]) >= 5])
dropdown = widgets.Dropdown(
    options=available_ids,
    description='Track ID:',
    style={'description_width': 'initial'},
    layout=widgets.Layout(width='50%')
)

output = widgets.Output()

def on_track_id_change(change):
    if change['type'] == 'change' and change['name'] == 'value':
        track_id = change['new']
        with output:
            clear_output(wait=True)

            data = track_data.get(track_id, [])
            if not data:
                print(f"ID {track_id} не найден.")
                return

            frames = [p["frame"] for p in data if p.get("speed") is not None]
            speeds = [p["speed"] for p in data if p.get("speed") is not None]
            accels = [p.get("acceleration") for p in data if p.get("acceleration") is not None]
            accel_frames = [p["frame"] for p in data if p.get("acceleration") is not None]

            fig, ax = plt.subplots(figsize=(12, 6))
            if frames:
                ax.plot(frames, speeds, label="Скорость (м/с)", linewidth=2)
            if accel_frames:
                ax.plot(accel_frames, accels, '--', label="Ускорение (м/с²)", alpha=0.7)

            ax.set_title(f"График движения объекта ID {track_id}")
            ax.set_xlabel("Кадр")
            ax.set_ylabel("Значение")
            ax.legend()
            ax.grid(True)
            fig.tight_layout()
            display(fig)
            plt.close(fig)

            events = extract_events_interval(data, fps)
            if events:
                print(f"\nОбнаруженные события для ID {track_id}:")
                for frame, event in events:
                    print(f"Кадр {frame}: {event}")
            else:
                print("\nСобытия не обнаружены.")

dropdown.observe(on_track_id_change)

display(dropdown, output)

Dropdown(description='Track ID:', layout=Layout(width='50%'), options=(1863, 1864, 1865, 1866, 1867, 1868, 186…

Output()

In [31]:
#enrich_with_direction(track_data)

dropdown = widgets.Dropdown(
    options=available_ids,
    description='Track ID:',
    style={'description_width': 'initial'},
    layout=widgets.Layout(width='50%')
)
output = widgets.Output()

def on_track_id_change(change):
    if change['type'] == 'change' and change['name'] == 'value':
        track_id = change['new']
        with output:
            clear_output(wait=True)

            data = track_data.get(track_id, [])
            if not data:
                print(f"ID {track_id} не найден.")
                return

            frames = [p["frame"] for p in data if p.get("speed") is not None]
            speeds = [p["speed"] for p in data if p.get("speed") is not None]
            accel_frames = [p["frame"] for p in data if p.get("acceleration") is not None]
            accels = [p.get("acceleration") for p in data if p.get("acceleration") is not None]

            fig, ax1 = plt.subplots(figsize=(12, 6))

            if frames:
                ax1.plot(frames, speeds, label="Скорость (м/с)", color='tab:blue', linewidth=2)
                ax1.set_ylabel("Скорость (м/с)", color='tab:blue')
                ax1.tick_params(axis='y', labelcolor='tab:blue')

            ax2 = ax1.twinx()
            if accel_frames:
                ax2.plot(accel_frames, accels, '--', label="Ускорение (м/с²)", color='tab:red', alpha=0.7)
                ax2.set_ylabel("Ускорение (м/с²)", color='tab:red')
                ax2.tick_params(axis='y', labelcolor='tab:red')

            ax1.set_title(f"График движения объекта ID {track_id}")
            ax1.set_xlabel("Кадр")
            ax1.grid(True)
            fig.tight_layout()
            display(fig)
            plt.close(fig)

            events = extract_events_interval(data, fps)
            if events:
                print(f"\nОбнаруженные события для ID {track_id}:")
                for frame, event in events:
                    print(f"Кадр {frame}: {event}")
            else:
                print("\nСобытия не обнаружены.")

dropdown.observe(on_track_id_change)

display(dropdown, output)

Dropdown(description='Track ID:', layout=Layout(width='50%'), options=(1863, 1864, 1865, 1866, 1867, 1868, 186…

Output()