# Установка и базовые импорты

In [1]:
import gdown, os, sys, glob, math, time
from pathlib import Path
import cv2, subprocess, shlex, numpy as np, os
from pathlib import Path
import numpy as np
import torch
from torchvision import transforms
from torchvision.models.video import r3d_18, R3D_18_Weights

# Using a different, generally compatible HTML5 video
#html_video = "https://www.learningcontainer.com/wp-content/uploads/2020/05/sample-mp4-file.mp4"
html_video = 'https://cdn.pixabay.com/video/2024/05/08/211188_large.mp4'
#html_video = "https://cdn.pixabay.com/video/2016/10/11/5871-186389068_large.mp4"
path_video = '/content/test_video.mp4'


 # ← укажите папку с видео
video_path = Path(path_video)
video_path.parent.mkdir(parents=True, exist_ok=True)

# Remove existing video if it clashes with the new download
if video_path.exists():
    os.remove(video_path)

try:
    gdown.download(html_video, str(video_path), quiet=False)
except Exception:
    pass


# небольшие хелперы
#VIDEOS_DIR = video_path  # initial value
OUT_DIR = Path("out"); OUT_DIR.mkdir(exist_ok=True, parents=True)

def list_videos(vdir: Path):
    vids = sorted([p for ext in ("*.mp4","*.avi","*.mov","*.mkv") for p in vdir.glob(ext)])
    if not vids:
        raise FileNotFoundError(f"В {vdir} не найдено видеофайлов.")
    return vids

# If path_video is a file that exists, use it directly; otherwise treat as directory
if video_path.exists() and video_path.is_file():
    videos = [video_path]
else:
    VIDEOS_DIR = video_path if video_path.is_dir() else video_path.parent
    videos = list_videos(VIDEOS_DIR)

videos[:3]

Downloading...
From: https://cdn.pixabay.com/video/2024/05/08/211188_large.mp4
To: /content/test_video.mp4
100%|██████████| 45.7M/45.7M [00:00<00:00, 58.5MB/s]


[PosixPath('/content/test_video.mp4')]

In [2]:
import base64
from IPython.display import HTML, display
def display_video_from_path(video_path: str) -> HTML:
    """
    Reads a video file, encodes it as base64, and embeds it within an IPython.display.HTML object for display.
    This method is often more robust for displaying local video files in Colab.

    Args:
        video_path (str): The path to the video file.

    Returns:
        IPython.display.HTML: An HTML object containing the embedded video.
    """
    with open(video_path, 'rb') as video_file:
        encoded_video = base64.b64encode(video_file.read()).decode('utf-8')

    video_html = f"""
    <video width='512' controls autoplay>
        <source src='data:video/mp4;base64,{encoded_video}' type='video/mp4'>
        Your browser does not support the video tag.
    </video>
    """
    return HTML(video_html)

src_path = str(videos[0])
display(display_video_from_path(src_path))

Output hidden; open in https://colab.research.google.com to view.

# Работа с видео в CV (чтение, запись, кадры, FPS)

Ключевые вещи: открыть поток, прочитать/обработать кадры, сохранить результат.

In [3]:
import cv2, subprocess, shlex, numpy as np, os
from pathlib import Path

# Вспомогательная функция: открыть ffmpeg-пайп под H.264
def open_h264_writer(dst_path: str, w: int, h: int, fps: float, crf: int = 20, preset: str = "veryfast"):
    """
    Создаёт процесс ffmpeg, который примет сырые кадры (BGR24) через stdin
    и запишет MP4 c H.264 (yuv420p, +faststart), совместимый с HTML5.
    Возвращает subprocess.Popen с открытым stdin.
    """
    Path(dst_path).parent.mkdir(parents=True, exist_ok=True)
    cmd = (
        f'ffmpeg -y -f rawvideo -pix_fmt bgr24 -s {w}x{h} -r {fps} -i - '
        f'-an -c:v libx264 -preset {preset} -crf {crf} -pix_fmt yuv420p '
        f'-movflags +faststart "{dst_path}"'
    )
    return subprocess.Popen(shlex.split(cmd), stdin=subprocess.PIPE)

# Куда сохранить результирующее видео (совместимое с HTML5)
dst_path = "/content/out/basic_h264_repaired.mp4"
Path(dst_path).parent.mkdir(parents=True, exist_ok=True)

# Открываем источник видео (должна быть определена переменная src_path)
cap = cv2.VideoCapture(src_path)
assert cap.isOpened(), f"Не открыть {src_path}"

# Извлекаем параметры входного видео
# fps: частота кадров, w/h: ширина/высота кадра (используем их для корректного пайпа в ffmpeg)
fps = cap.get(cv2.CAP_PROP_FPS) or 25  # на некоторых файлах CAP_PROP_FPS может вернуть 0 → подставляем 25
w   = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h   = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

# Используем open_h264_writer
# Готовим команду ffmpeg и создаём процесс
proc = open_h264_writer(dst_path, w, h, fps)

# Читаем видео из OpenCV по кадрам и передаём их в stdin ffmpeg
# Важно: frame.tobytes() → последовательность байт в формате bgr24, как указано в командной строке
while True:
    ok, frame = cap.read()
    if not ok:
        break
    proc.stdin.write(frame.tobytes())

# Корректно закрываем ресурсы
cap.release()           # освобождаем видеодекодер OpenCV
proc.stdin.close()      # закрываем stdin, чтобы ffmpeg понял, что поток завершён
proc.wait()             # дожидаемся завершения ffmpeg и записи файла на диск

print("Готово:", dst_path)


Готово: /content/out/basic_h264_repaired.mp4


In [4]:
display(display_video_from_path(dst_path))

Output hidden; open in https://colab.research.google.com to view.

# Оптический поток: визуализация и применения

**Оптический поток** — это векторное поле смещений между соседними кадрами: для каждого «наблюдаемого» элемента сцены оцениваем, куда он сдвинулся за $\Delta t$.

## Базовые предпосылки

* **Неизменность яркости (brightness constancy):**

  $$
  I(x,y,t) \approx I(x+u,,y+v,,t+\Delta t),
  $$

  где $(u,v)$ — смещение пикселя.

* **Малость движения и гладкость поля** (для локальной аппроксимации и устойчивости).

Из первой предпосылки, после линеаризации (Тейлор) получаем **основное уравнение оптического потока**:

$$
I_x,u + I_y,v + I_t \approx 0,
$$

где $I_x, I_y$ — пространственные производные, $I_t$ — временная.

---

## А) Разреженный Lucas–Kanade (LK)

Идея: в небольшой окрестности $W$ вокруг точки считаем смещение **одним и тем же**, решаем задачу наименьших квадратов:

$$
\min_{u,v}\ \sum_{(x,y)\in W} \big(I_x(x,y),u + I_y(x,y),v + I_t(x,y)\big)^2.
$$

Нормальные уравнения:

Тогда
$$
\underbrace{
\begin{pmatrix}
\sum I_x^2 & \sum I_x I_y\\[4pt]
\sum I_x I_y & \sum I_y^2
\end{pmatrix}
}_{\mathbf{A}}
\begin{pmatrix} u\\ v \end{pmatrix}
=
-\underbrace{
\begin{pmatrix}
\sum I_x I_t\\[4pt]
\sum I_y I_t
\end{pmatrix}
}_{\mathbf{b}} \, .
$$

Если \(\det(\mathbf{A})\) не близок к нулю, то
$$
\begin{pmatrix} u\\ v \end{pmatrix}
= -\,\mathbf{A}^{-1}\mathbf{b}.
$$
Чтобы система была устойчива, нужны «текстурные» точки, где есть градиент и по $x$, и по $y$. Поэтому выбираем точки по **Ши–Томаси** (good features to track).

**На практике:**

* Окно $W$ (напр. $21\times21$), пирамиды (image pyramids) для больших смещений.
* Выдаёт смещения только в выбранных **точках** (разреженно).
* Визуализация: стрелки/траектории по «углам» (Shi–Tomasi) — компактно и быстро.

**Когда выбирать LK:** нужно быстро отследить опорные точки/ключевые объекты; важна скорость и простота.

---

## Б) Плотный Farnebäck

Идея: аппроксимируем локальные окрестности **квадратичными полиномами** яркости и подбираем смещение между кадрами, которое согласует эти аппроксимации.

Локальная модель (первый кадр):
$$
I_1(\mathbf{x}) \approx \mathbf{x}^\top \mathbf{A},\mathbf{x} + \mathbf{b}^\top \mathbf{x} + c,
$$
во втором кадре $I_2(\mathbf{x}) \approx I_1(\mathbf{x}-\mathbf{d})$; подбираем $\mathbf{d}=(u,v)$, минимизируя несоответствие полиномиальных приближений. Далее уточняем на пирамиде (coarse-to-fine) и сглаживаем поле.

**Свойства:**

* Даёт **вектор** для **каждого пикселя** (плотный поток).
* Хорошо работает на «мягких» текстурах за счёт полиномиальной модели.
* Параметры: масштаб пирамиды, число уровней, размер окна, число итераций.

**Визуализация (HSV):**

* Hue (оттенок) — направление вектора $\angle (u,v)$,
* Value (яркость) — величина $,\sqrt{u^2+v^2}$,
* Saturation — фиксируем 255.
  Так получаем «карту движения» с интуитивной цветовой кодировкой.

**Когда выбирать Farnebäck:** нужен **полный** поток для сегментации движения, оценки фона/фора, построения масок и последующего трекинга.

---

## Типичные применения

* **Сегментация движения**: порог по величине $|{\bf d}|$ → бинарная маска «где движется».
* **Инициализация/поддержка трекинга**: маска движения + bbox (мы показали на практике: поток → маска → контуры → рамки).
* **Стабилизация**: оценка глобального смещения/деформации (выравнивание кадров).
* **Аналитика жестов/активности**: суммарные векторы по ROI, гистограммы направлений и т. п.

---

## Что помнить на практике

* **Освещение/блики** нарушают «неизменность яркости» → возможны артефакты.
* **Большие смещения** → используем **пирамиду** и/или даунскейл + итерации.
* **Текстурность** важна для LK: выбираем точки (Shi–Томаси) там, где есть градиент в обеих координатах.
* Для «шумных» сцен — постобработка: сглаживание, морфология, NMS по bbox (как в ноутбуке).

---

## Куда «крутить» ручки

* **LK:** размер окна $W$, число уровней пирамиды, критерий сходимости.
  Больше окно/уровней → устойчивее к шуму и крупным смещениям, но медленнее.
* **Farnebäck:** `pyrScale`, `numLevels`, `winSize`, `numIters`, `polyN`, `polySigma`.
  Увеличение `winSize`/`numIters` — более гладкий и устойчивый поток; `polyN/polySigma` — компромисс «детали↔шум».

---

### Коротко: когда что использовать

* **Нужно быстро и наглядно понять траектории ключевых точек** → LK (разреженно).
* **Нужна карта движения и производные маски/боксы** → Farnebäck (плотно, с HSV-визуализацией).


In [5]:
# ВАЖНО: как и раньше, пишем результат через ffmpeg-пайп в H.264 (yuv420p, +faststart),
# чтобы ролики гарантированно проигрывались встроенным HTML5-плеером в ноутбуке.

import cv2, numpy as np, subprocess, shlex
from pathlib import Path

src_path = str(videos[0])         # берём то же входное видео, что и раньше
OUT_DIR.mkdir(parents=True, exist_ok=True)

# ---------- Общие параметры видео ----------
cap = cv2.VideoCapture(src_path)
assert cap.isOpened(), f"Не открыть видео: {src_path}"
w   = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))   # ширина кадра
h   = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))  # высота кадра
fps = cap.get(cv2.CAP_PROP_FPS) or 25          # частота кадров (если 0 — подставляем 25)




## А) Lucas–Kanade (разреженный): трекинг «углов» между кадрами

Делаем визуал явным:

- для LK: стрелки, толщина, фильтр по длине вектора, цвет по направлению;

- для Farneback: цветная карта (HSV) уже есть — просто сохраним и покажем.

**Lucas–Kanade: заметная визуализация (стрелки + цвет)**

In [6]:
# Разреженный оптический поток (Lucas–Kanade) — две панели:
# слева оригинальный кадр, справа — стрелки/точки на чёрном фоне.
# Цвет стрелок кодирует направление (угол) через HSV→BGR.
# Параметры min_len / arrow_scale / max_points_to_draw управляют читаемостью.

import cv2, numpy as np, subprocess, shlex
from pathlib import Path

src_path = str(videos[0])
cap = cv2.VideoCapture(src_path); assert cap.isOpened(), f"Не открыть: {src_path}"
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS) or 25


# ===== Настройки визуализации =====
min_len = 1.5             # игнорировать «дрожь» короче порога (пиксели)
arrow_scale = 2.5         # во сколько раз удлинять стрелки для наглядности
tip_len = 0.35            # относительная длина наконечника стрелки (0..1)
thickness = 2             # толщина линий
max_points_to_draw = 300  # максимум стрелок на кадр (для читаемости)

# Выход: ширина в 2 раза больше (две панели рядом)
dst_path = str(OUT_DIR / "optical_flow_LK_side_by_side_h264.mp4")
proc = open_h264_writer(dst_path, w*2, h, fps)

# ===== Инициализация LK =====
ok, prev = cap.read(); assert ok, "Пустое видео?"
prev_gray = cv2.cvtColor(prev, cv2.COLOR_BGR2GRAY)

# Стартовые «углы» (Shi–Tomasi) — те точки, которые LK будет трекать
p0 = cv2.goodFeaturesToTrack(
    prev_gray,
    maxCorners=800,      # больше — гуще поле
    qualityLevel=0.01,
    minDistance=7,
    blockSize=7
)

lk_params = dict(
    winSize=(21, 21),    # окно для локальной аппроксимации
    maxLevel=3,          # пирамидальные уровни (для крупных смещений)
    criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 30, 0.01)
)

while True:
    ok, frame = cap.read()
    if not ok:
        break

    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

    # Если точки «иссякли», переинициализируем (не рвём поток)
    if p0 is None or len(p0) == 0:
        p0 = cv2.goodFeaturesToTrack(gray, maxCorners=800, qualityLevel=0.01, minDistance=7, blockSize=7)
        # Покажем пустую правую панель (стрелок пока нет)
        right = np.zeros((h, w, 3), dtype=np.uint8)
        stacked = np.hstack([frame, right])
        proc.stdin.write(stacked.tobytes())
        prev_gray = gray
        continue

    # LK: сопоставляем «углы» между prev_gray и текущим gray
    p1, st, err = cv2.calcOpticalFlowPyrLK(prev_gray, gray, p0, None, **lk_params)

    # Правая панель — «чистый» чёрный фон под стрелки
    right = np.zeros((h, w, 3), dtype=np.uint8)

    if p1 is not None and st is not None:
        good_new = p1[st == 1]   # новые позиции тех точек, что удалось сопоставить
        good_old = p0[st == 1]   # соответствующие старые позиции

        if good_new.size > 0:
            # Векторы смещения и углы
            vec = good_new - good_old
            dx, dy = vec[:, 0], vec[:, 1]
            mag = np.sqrt(dx*dx + dy*dy)          # длина вектора
            ang = np.arctan2(dy, dx) + np.pi      # угол [0..2π)

            # Отсеиваем «дрожь»
            keep = mag > min_len
            if np.any(keep):
                gn = good_new[keep]
                go = good_old[keep]
                ang_keep = ang[keep]
                mag_keep = mag[keep]

                # Прореживание, чтобы не «забить» кадр
                if len(gn) > max_points_to_draw:
                    idx = np.random.choice(len(gn), size=max_points_to_draw, replace=False)
                    gn, go, ang_keep, mag_keep = gn[idx], go[idx], ang_keep[idx], mag_keep[idx]

                # Цвет стрелок по направлению (Hue), насыщенность/яркость — максимальные
                hue = (ang_keep / (2*np.pi) * 179).astype(np.uint8).reshape(-1, 1)
                hsv = np.zeros((len(hue), 1, 3), dtype=np.uint8)
                hsv[..., 0] = hue
                hsv[..., 1] = 255
                hsv[..., 2] = 255
                colors = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR).reshape(-1, 3).tolist()

                # Рисуем стрелки и конечные точки на правой панели (оригинал — слева без изменений)
                for (x1, y1), (x0, y0), color in zip(gn, go, colors):
                    # Увеличим длину для наглядности
                    vx = (x1 - x0) * arrow_scale
                    vy = (y1 - y0) * arrow_scale
                    x0i, y0i = int(x0), int(y0)
                    x1i, y1i = int(x0 + vx), int(y0 + vy)

                    cv2.arrowedLine(right, (x0i, y0i), (x1i, y1i),
                                    color, thickness=thickness, tipLength=tip_len)
                    cv2.circle(right, (x1i, y1i), 3, color, -1)

    # Склеиваем две панели: [оригинал | стрелки]
    stacked = np.hstack([frame, right])
    proc.stdin.write(stacked.tobytes())

    # Обновляем «предыдущий кадр» и точки — продолжаем треки
    prev_gray = gray
    p0 = None if (p1 is None or st is None) else good_new.reshape(-1, 1, 2)

cap.release()
proc.stdin.close()
proc.wait()
print("Готово:", dst_path)


Готово: out/optical_flow_LK_side_by_side_h264.mp4


In [7]:
display(display_video_from_path(dst_path))

Output hidden; open in https://colab.research.google.com to view.

## B) Плотный оптический поток
визуализация потока на чёрном фоне

Делает быстро и робастно:
- Если доступен GPU (OpenCV с CUDA), использует cv2.cuda (TV-L1 или Farneback).
- Иначе — CPU Farneback на уменьшенной копии кадра, затем апскейл в исходный размер.
- Запись сразу в H.264 (yuv420p, +faststart), чтобы HTML5-плеер в ноутбуке играл без сюрпризов.

In [22]:
# Плотный поток + МАСКА + ОРИГИНАЛ С BBOX + NMS
# ┌──────────────┬─────────────────────┬────────────── ┐
# │  FLOW (HSV)  │  MOTION MASK (bin)  │ ORIGINAL+BBOX │
# └──────────────┴─────────────────────┴────────────── ┘
# - формируем маску движения по перцентилю;
# - чистим морфологией;
# - собираем контуры → боксы;
# - считаем "оценку" бокса = средняя скорость в mag внутри рамки;
# - применяем NMS (порог по IoU), чтобы убрать дубликаты/перекрытия;
# - рисуем более "жирные" боксы.

import cv2, numpy as np, subprocess, shlex
from pathlib import Path

src_path = str(videos[0])
OUT_DIR.mkdir(parents=True, exist_ok=True)
dst_path = str(OUT_DIR / "flow_mask_orig_bbox_nms_h264.mp4")

# --- тюнинг производительности ---
MAX_SIDE = 720
SKIP     = 0

# --- параметры маски/боксов/NMS ---
PERC       = 95           # порог по перцентилю скорости
K_OPEN     = 3            # морф. открытие (шум)
K_DILATE   = 5            # дилатация (слияние)
MIN_AREA   = 800          # минимальная площадь бокса
IOU_THR    = 0.5          # порог для NMS (чем выше, тем агрессивнее)
DRAW_COLOR = (0, 255, 255)
THICK      = 4            # толще рамки


def iou_xywh(a, b):
    """IoU для боксов в формате (x, y, w, h)."""
    ax1, ay1, aw, ah = a; ax2, ay2 = ax1 + aw, ay1 + ah
    bx1, by1, bw, bh = b; bx2, by2 = bx1 + bw, by1 + bh
    inter_x1, inter_y1 = max(ax1, bx1), max(ay1, by1)
    inter_x2, inter_y2 = min(ax2, bx2), min(ay2, by2)
    inter_w, inter_h = max(0, inter_x2 - inter_x1), max(0, inter_y2 - inter_y1)
    inter = inter_w * inter_h
    union = aw * ah + bw * bh - inter
    return inter / union if union > 0 else 0.0

def nms_xywh(boxes, scores, iou_thr=0.5):
    """Простая NMS: сортируем по score убыв., отбираем, гасим перекрывающиеся > iou_thr."""
    idxs = np.argsort(scores)[::-1]
    keep = []
    suppressed = np.zeros(len(idxs), dtype=bool)
    for i in range(len(idxs)):
        if suppressed[i]:
            continue
        keep.append(idxs[i])
        for j in range(i + 1, len(idxs)):
            if suppressed[j]:
                continue
            if iou_xywh(boxes[idxs[i]], boxes[idxs[j]]) > iou_thr:
                suppressed[j] = True
    return [boxes[k] for k in keep], [scores[k] for k in keep]

# --- исходное видео ---
cap = cv2.VideoCapture(src_path); assert cap.isOpened(), f"Не открыть: {src_path}"
W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
FPS = cap.get(cv2.CAP_PROP_FPS) or 25

# вычислительный даунскейл
short, long = (H, W) if H <= W else (W, H)
scale = min(1.0, MAX_SIDE / short)
proc_W = max(2, int(W * scale)) // 2 * 2
proc_H = max(2, int(H * scale)) // 2 * 2
sx = W / proc_W
sy = H / proc_H

# полотно из 3 панелей по ширине
out_w, out_h = W * 3, H
proc = open_h264_writer(dst_path, out_w, out_h, FPS)

# первый кадр
ok, prev = cap.read(); assert ok, "Пустое видео?"
for _ in range(SKIP): cap.grab()
prev_small = cv2.resize(prev, (proc_W, proc_H), interpolation=cv2.INTER_AREA) if scale < 1.0 else prev
prev_gray_small = cv2.cvtColor(prev_small, cv2.COLOR_BGR2GRAY)

kernel_open   = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (K_OPEN, K_OPEN))
kernel_dilate = cv2.getStructuringElement(cv2.MORPH_RECT,   (K_DILATE, K_DILATE))

while True:
    ok, frame = cap.read()
    if not ok:
        break
    for _ in range(SKIP): cap.grab()

    # расчёт плотного потока (на даунскейле для скорости)
    curr_small = cv2.resize(frame, (proc_W, proc_H), interpolation=cv2.INTER_AREA) if scale < 1.0 else frame
    curr_gray_small = cv2.cvtColor(curr_small, cv2.COLOR_BGR2GRAY)
    flow_small = cv2.calcOpticalFlowFarneback(prev_gray_small, curr_gray_small,
                                              None, 0.5, 3, 15, 3, 5, 1.2, 0)

    # апскейл вектора к оригиналу
    fx = cv2.resize(flow_small[..., 0], (W, H), interpolation=cv2.INTER_LINEAR) * sx
    fy = cv2.resize(flow_small[..., 1], (W, H), interpolation=cv2.INTER_LINEAR) * sy

    # панель 1: цветная карта потока (HSV→BGR)
    mag, ang = cv2.cartToPolar(fx, fy)
    hsv = np.zeros((H, W, 3), dtype=np.uint8)
    hsv[..., 0] = (ang * 180 / np.pi / 2).astype(np.uint8)
    hsv[..., 1] = 255
    hsv[..., 2] = cv2.normalize(mag, None, 0, 255, cv2.NORM_MINMAX)
    flow_vis = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)

    # панель 2: маска движения
    thr = np.percentile(mag, PERC) if np.any(mag) else 0.0
    mask = (mag > thr).astype(np.uint8) * 255
    if K_OPEN > 1:
        mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel_open, iterations=1)
    if K_DILATE > 1:
        mask = cv2.dilate(mask, kernel_dilate, iterations=1)
    mask_vis = cv2.cvtColor(mask, cv2.COLOR_GRAY2BGR)

    # панель 3: оригинал + боксы (после NMS)
    orig_with_bbox = frame.copy()

    # контуры → боксы → скор (средняя скорость в маг)
    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    boxes, scores = [], []
    for c in contours:
        x, y, w, h = cv2.boundingRect(c)
        if w * h < MIN_AREA:
            continue
        # оценка: средняя скорость внутри бокса — устойчивее площади/максимума
        mean_speed = float(mag[y:y+h, x:x+w].mean())
        boxes.append((x, y, w, h))
        scores.append(mean_speed)

    # NMS по IoU, чтобы убрать дубли/накладки
    if boxes:
        boxes_nms, scores_nms = nms_xywh(boxes, scores, iou_thr=IOU_THR)
        for (x, y, w, h) in boxes_nms:
            cv2.rectangle(orig_with_bbox, (x, y), (x + w, y + h), DRAW_COLOR, THICK)

    # склейка панелей
    canvas = np.hstack([flow_vis, mask_vis, orig_with_bbox])
    proc.stdin.write(canvas.tobytes())

    prev_gray_small = curr_gray_small

cap.release()
proc.stdin.close()
proc.wait()
print("Готово:", dst_path)


Готово: out/flow_mask_orig_bbox_nms_h264.mp4


In [23]:
display(display_video_from_path(dst_path))

Output hidden; open in https://colab.research.google.com to view.

# Калман-фильтр для сглаживания 2D-позиции объекта (центр bbox) + YOLO (person)

**Идея.** Отслеживаем центр рамки детектора как 2D-точку и считаем, что объект движется с почти постоянной скоростью. Детектор даёт шумные измерения позиции — Калман сглаживает и умеет предсказывать, когда измерений нет.

## Модель состояния (constant velocity)

Состояние:
$$
\mathbf{x}_k =
\begin{bmatrix}
x_k\\
y_k\\
v^x_k\\
v^y_k
\end{bmatrix},\quad \mathbf{u}_k \equiv 0.
$$

Динамика за шаг $\Delta t$:
$$
\mathbf{x}_k = \mathbf{F}\,\mathbf{x}_{k-1} + \mathbf{w}_k,\qquad
\mathbf{F}=
\begin{bmatrix}
1&0&\Delta t&0\\
0&1&0&\Delta t\\
0&0&1&0\\
0&0&0&1
\end{bmatrix},\quad
\mathbf{w}_k \sim \mathcal{N}(\mathbf{0},\,\mathbf{Q}).
$$

Обычно берут $\mathbf{Q}=q\,\mathbf{I}_4$ (чем больше $q$, тем быстрее реакция на манёвры).

## Модель измерения (YOLO → центр bbox)

Детектор возвращает центр рамки $(z^x_k, z^y_k)$:
$$
\mathbf{z}_k =
\begin{bmatrix}
z^x_k\\
z^y_k
\end{bmatrix}
=
\mathbf{H}\,\mathbf{x}_k + \mathbf{v}_k,\qquad
\mathbf{H}=
\begin{bmatrix}
1&0&0&0\\
0&1&0&0
\end{bmatrix},\quad
\mathbf{v}_k \sim \mathcal{N}(\mathbf{0},\,\mathbf{R}).
$$

Часто $\mathbf{R}=\sigma^2\,\mathbf{I}_2$ (чем больше $\sigma^2$, тем меньше доверие детектору).

## Рекурсии Калмана

**Прогноз:**
$$
\hat{\mathbf{x}}^-_k = \mathbf{F}\,\hat{\mathbf{x}}_{k-1},\qquad
\mathbf{P}^-_k = \mathbf{F}\,\mathbf{P}_{k-1}\,\mathbf{F}^\top + \mathbf{Q}.
$$

**Коррекция (при наличии $\mathbf{z}_k$):**
$$
\mathbf{S}_k = \mathbf{H}\,\mathbf{P}^-_k\,\mathbf{H}^\top + \mathbf{R},\quad
\mathbf{K}_k = \mathbf{P}^-_k\,\mathbf{H}^\top\,\mathbf{S}_k^{-1},
$$
$$
\hat{\mathbf{x}}_k = \hat{\mathbf{x}}^-_к + \mathbf{K}_k\!\left(\mathbf{z}_k-\mathbf{H}\hat{\mathbf{x}}^-_k\right),\quad
\mathbf{P}_k = (\mathbf{I}-\mathbf{K}_k\mathbf{H})\,\mathbf{P}^-_k.
$$

Если детектор молчит — делаем только прогноз.

## Как тюнить шумы

- Больше $q$ в $\mathbf{Q}$ → быстрее подстраиваемся к резким поворотам/ускорениям.
- Больше $\sigma^2$ в $\mathbf{R}$ → сильнее сглаживание «дрожащих» измерений.
- Нестрогий FPS → обновляй $\Delta t$ как фактический интервал между кадрами.




In [11]:
!pip install ultralytics

Collecting ultralytics
  Downloading ultralytics-8.3.227-py3-none-any.whl.metadata (37 kB)
Collecting ultralytics-thop>=2.0.18 (from ultralytics)
  Downloading ultralytics_thop-2.0.18-py3-none-any.whl.metadata (14 kB)
Downloading ultralytics-8.3.227-py3-none-any.whl (1.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m42.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading ultralytics_thop-2.0.18-py3-none-any.whl (28 kB)
Installing collected packages: ultralytics-thop, ultralytics
Successfully installed ultralytics-8.3.227 ultralytics-thop-2.0.18


In [12]:
# Практика:
#   - Детектируем людей (COCO class 0) самой лёгкой моделью "yolov8n.pt".
#   - Берём самый крупный bbox в кадре (устойчивее при множестве людей).
#   - Визуализация:
#       * зелёная точка — «сырое» измерение детектора,
#       * синяя точка — оценка Калмана (сглаженная/предсказанная).
#   - Сохранение: пишем сразу в H.264 (yuv420p, +faststart) через ffmpeg-пайп — корректно играет в ноутбуке.

import cv2, numpy as np, subprocess, shlex
from pathlib import Path
from ultralytics import YOLO


class Kalman2D:
    """
    Линейный Калман под модель «равномерного движения»:
        state = [x, y, vx, vy]^T
        measurement = [x, y]^T
    dt — шаг по времени (1/FPS), process_var — дисперсия процесса (насколько доверяем модели),
    meas_var — дисперсия измерения (насколько шумны наблюдения детектора).
    """
    def __init__(self, dt: float = 1.0, process_var: float = 1.0, meas_var: float = 25.0):
        self.kf = cv2.KalmanFilter(4, 2, 0)

        # Матрица перехода (constant velocity model)
        self.kf.transitionMatrix = np.array([
            [1, 0, dt, 0 ],
            [0, 1, 0 , dt],
            [0, 0, 1 , 0 ],
            [0, 0, 0 , 1 ],
        ], np.float32)

        # Матрица наблюдения: видим только позицию
        self.kf.measurementMatrix = np.array([
            [1, 0, 0, 0],
            [0, 1, 0, 0],
        ], np.float32)

        # Ковариация процессного шума (Q): чем больше — тем быстрее «догоняем» резкие манёвры
        q = float(process_var)
        self.kf.processNoiseCov = np.array([
            [q, 0, 0, 0],
            [0, q, 0, 0],
            [0, 0, q, 0],
            [0, 0, 0, q],
        ], np.float32)

        # Ковариация шума измерений (R): чем больше — тем меньше доверяем детектору
        r = float(meas_var)
        self.kf.measurementNoiseCov = np.array([
            [r, 0],
            [0, r],
        ], np.float32)

        # Начальная апостериорная ковариация и состояние
        self.kf.errorCovPost = np.eye(4, dtype=np.float32)
        self.kf.statePost = np.zeros((4, 1), np.float32)

        self.initialized = False  # выставим true при первом «валидном» измерении

    def _init_from_measurement(self, xy: np.ndarray):
        """Инициализация состояния первой валидной позицией (нулевая скорость)."""
        x, y = float(xy[0]), float(xy[1])
        self.kf.statePost[:] = np.array([[x], [y], [0.0], [0.0]], np.float32)
        self.initialized = True

    def predict(self) -> np.ndarray:
        """Прогноз позиции [x, y] без коррекции (когда нет наблюдения)."""
        pred = self.kf.predict()
        return pred[:2].reshape(-1)  # (x, y)

    def correct(self, xy: np.ndarray) -> np.ndarray:
        """
        Коррекция по измерению [x, y]. Если фильтр не инициализирован — инициализируем.
        Возвращает оценку позиции после коррекции.
        """
        xy = np.asarray(xy, dtype=np.float32).reshape(2, 1)
        if not self.initialized:
            self._init_from_measurement(xy.ravel())
            return xy.ravel()
        est = self.kf.correct(xy)
        return est[:2].reshape(-1)

def bbox_to_center_xyxy(b: np.ndarray) -> tuple[float, float]:
    """Перевод bbox формата [x1, y1, x2, y2] → центр (x, y)."""
    x1, y1, x2, y2 = b
    return (float((x1 + x2) * 0.5), float((y1 + y2) * 0.5))


Creating new Ultralytics Settings v0.0.6 file ✅ 
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.


In [13]:
# --- вход/выход ---
src_path = str(videos[0])
cap = cv2.VideoCapture(src_path); assert cap.isOpened(), f"Не открыть: {src_path}"
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS) or 25

dst_path = str(OUT_DIR / "kalman_demo_h264.mp4")
proc = open_h264_writer(dst_path, w, h, fps, crf=20, preset="veryfast")

# --- детектор и фильтр ---
yolo = YOLO("yolov8n.pt")                         # лёгкая COCO-модель
kf = Kalman2D(dt=1.0/float(fps), process_var=1.0, meas_var=50.0)  # базовые шумы: под ситуацию можно тюнить

while True:
    ok, frame = cap.read()
    if not ok:
        break

    # Быстрый детект по кадру (без накопления результатов в RAM)
    res = yolo(frame, stream=False, verbose=False)[0]

    # Собираем bbox людей (class 0 = person), оставляем уверенные
    bboxes = []
    if res.boxes is not None and len(res.boxes) > 0:
        xyxy = res.boxes.xyxy.cpu().numpy()
        cls   = res.boxes.cls.cpu().numpy().astype(int)
        conf  = res.boxes.conf.cpu().numpy()
        for b, c, p in zip(xyxy, cls, conf):
            if c == 0 and p > 0.3:       # человек с conf > 0.3
                bboxes.append(b)

    meas_xy = None
    if bboxes:
        # Берём самый большой bbox — устойчивее от случайных мелких срабатываний
        areas = [(b[2]-b[0])*(b[3]-b[1]) for b in bboxes]
        b = bboxes[int(np.argmax(areas))]
        x1, y1, x2, y2 = map(int, b)
        # Рисуем «сырое» измерение: зелёную рамку и зелёную точку в центре
        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 5)
        meas_xy = bbox_to_center_xyxy(b)
        #cv2.circle(frame, (int(meas_xy[0]), int(meas_xy[1])), 5, (0, 255, 0), -1)
        cv2.circle(frame, (int(meas_xy[0]), int(meas_xy[1])), 9, (0, 255, 0), -1)
        cv2.circle(frame, (int(meas_xy[0]), int(meas_xy[1])), 11, (0, 100, 0), 2)

    # Шаг предсказания
    pred_xy = kf.predict()

    # Шаг коррекции (если есть наблюдение)
    if meas_xy is not None:
        est_xy = kf.correct(meas_xy)
    else:
        est_xy = pred_xy  # нет измерения — остаёмся на предсказании

    # Визуализация оцененной позиции Калмана (синяя точка) и «хвостик» скорости
    ex, ey = int(est_xy[0]), int(est_xy[1])
    cv2.circle(frame, (ex, ey), 11, (255, 0, 0), -1)
    cv2.circle(frame, (ex, ey), 13, (60, 0, 0), 2)
    cv2.putText(frame, "Kalman (blue) vs measurement (green)", (20, h - 20),
                cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 0), 4)    # тень
    cv2.putText(frame, "Kalman (blue) vs measurement (green)", (20, h - 20),
                cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)  # текст


    # Пишем кадр в ffmpeg
    proc.stdin.write(frame.tobytes())

cap.release()
proc.stdin.close()
proc.wait()
print("Калман-демо записано в:", dst_path)


[KDownloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolov8n.pt to 'yolov8n.pt': 100% ━━━━━━━━━━━━ 6.2MB 241.3MB/s 0.0s
Калман-демо записано в: out/kalman_demo_h264.mp4


In [14]:
display(display_video_from_path(dst_path))

Output hidden; open in https://colab.research.google.com to view.

# Классификация действия: 3D-CNN r3d_18 (Kinetics-400)

**Задача.** По короткому клипу (последовательности кадров) определить **класс действия**.

## Почему 3D-свёртки?
Обычная 2D-CNN видит только пространственные паттерны внутри одного кадра.
3D-CNN (как r3d_18) накладывает **свёртку 3×k×k** по времени и пространству → учит
совместные **пространственно-временные признаки** (движения и их форму).

## Что делает r3d_18
- Архитектура: **ResNet-18**, где все свёртки заменены на **3D-свёртки**.
- Вход: тензор размера **(C, T, H, W)**, обычно `C=3`, `T≈8–32` кадров.
- Выход: логиты на **Kinetics-400** (400 классов: “dribbling basketball”, “playing guitar”, …).

## Мини-инференс для демонстрации
1) Читаем видео покадрово и поддерживаем скользящее окно из `T=16` кадров.  
2) Каждые `S` кадров (шаг) прогоняем окно через `r3d_18 (Kinetics-400)` → получаем вероятности классов.  
3) На кадрах между двумя инференсами держим последний предсказанный `top-1` (как «стабильную подпись»).  
4) Сохраняем аннотированный ролик (overlay: класс + вероятность) и простую временную диаграмму класса по кадрам.

> Для скорости перед подачей в сеть уменьшаем размер кадров (например, короткая сторона → 256) и даём модели `16×256×256`.
> На практике для надёжной оценки берут **несколько клипов** из разных мест ролика,
> иногда с разными кропами/масштабами, и усредняют предсказания.


In [15]:
# Классификация действия (r3d_18, Kinetics-400) + проигрывание видео с оверлеем top-5
# — выбираем 16 кадров равномерно по ролику
# — прогоняем через r3d_18 -> top-5 классов
# — сохраняем исходное видео c «табличкой» top-5 поверх (стабильно для HTML5)

import cv2, torch, numpy as np, subprocess, shlex
from pathlib import Path
from torchvision.models.video import r3d_18, R3D_18_Weights

# ----- входы/выходы -----
VIDEO_PATH = str(videos[0])  # иначе возьмём первый в списке

OUT_DIR.mkdir(parents=True, exist_ok=True)
ANN_PATH = str(OUT_DIR / "r3d18_action_overlay_h264.mp4")  # аннотированный ролик

# ----- модель/препроцесс -----
DEVICE   = "cuda" if torch.cuda.is_available() else "cpu"
WEIGHTS  = R3D_18_Weights.DEFAULT
MODEL    = r3d_18(weights=WEIGHTS).eval().to(DEVICE)
PREPROC  = WEIGHTS.transforms()
CLASSES  = WEIGHTS.meta["categories"]

# ----- утилиты -----
def sample_clip_uniform(path: str, num_frames: int = 16):
    """Равномерно выбираем num_frames кадров из ролика. Возвращает (T,H,W,3) RGB uint8 и индексы кадров."""
    cap = cv2.VideoCapture(path); assert cap.isOpened(), f"Не открыть: {path}"
    n = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    idxs = np.linspace(0, max(0, n-1), num_frames).astype(int)
    frames, picked = [], []
    i = 0
    ok = True
    while ok and i <= idxs[-1]:
        ok, bgr = cap.read()
        if not ok: break
        if i in idxs:
            frames.append(cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB))
            picked.append(i)
        i += 1
    cap.release()
    # дублируем последний, если видео слишком короткое
    while len(frames) < num_frames and len(frames)>0:
        frames.append(frames[-1]); picked.append(picked[-1])
    return np.stack(frames, axis=0), np.array(picked, dtype=int)

def to_model_tensor(clip_np: np.ndarray) -> torch.Tensor:
    """
    (T,H,W,3) uint8 -> PREPROC -> (1,3,T,H,W) float32 на DEVICE.
    У PREPROC оси могут быть (C,T,H,W) или (T,C,H,W) — унифицируем.
    """
    vid = torch.from_numpy(clip_np).permute(0,3,1,2).contiguous()  # (T,3,H,W)
    out = PREPROC(vid)  # обычно (3,T,H,W)
    if out.ndim != 4:
        raise RuntimeError(f"Ожидался 4D тензор после PREPROC, пришло: {tuple(out.shape)}")
    if out.shape[0] == 3:              # (C,T,H,W)
        clip = out.unsqueeze(0)        # -> (1,3,T,H,W)
    else:                              # (T,C,H,W)
        clip = out.permute(1,0,2,3).unsqueeze(0)
    return clip.to(DEVICE)


def draw_top5_panel(frame_bgr, top5, origin=(20, 20),
                    width=800, row_h=56,              # шире панель и выше строки
                    font_scale=2.0, thickness=3,      # крупный шрифт
                    alpha=0.55):                      # прозрачность подложки
    """
    Крупная полупрозрачная панель с Top-5.
    - width: ширина панели
    - row_h: высота строки (межстрочный интервал)
    - font_scale, thickness: размер/толщина шрифта
    - alpha: доля подложки (0..1)
    """
    x0, y0 = origin
    rows = len(top5) + 1
    pad  = 14
    h = pad + rows * row_h

    overlay = frame_bgr.copy()
    # фон-подложка
    cv2.rectangle(overlay, (x0, y0), (x0 + width, y0 + h), (0, 0, 0), -1)
    cv2.addWeighted(overlay, alpha, frame_bgr, 1 - alpha, 0, frame_bgr)

    # заголовок
    cv2.putText(frame_bgr, "R3D-18 / Kinetics-400 (Top-5)",
                (x0 + pad, y0 + int(row_h * 0.7)),
                cv2.FONT_HERSHEY_SIMPLEX, font_scale, (240, 240, 240),
                thickness, cv2.LINE_AA)

    # строки top-5
    for k, (lab, p) in enumerate(top5, start=1):
        text = f"{k}. {lab}  ({p:.2f})"
        y = y0 + int(row_h * (k + 0.7))
        cv2.putText(frame_bgr, text, (x0 + pad, y),
                    cv2.FONT_HERSHEY_SIMPLEX, font_scale, (255, 255, 255),
                    thickness, cv2.LINE_AA)

# ----- 1) Клип -> предсказание -----
NUM_FRAMES = 16
clip_np, picked_idx = sample_clip_uniform(VIDEO_PATH, NUM_FRAMES)  # (T,H,W,3), индексы кадров
print("clip shape (T,H,W,3):", clip_np.shape)

with torch.no_grad():
    clip_tensor = to_model_tensor(clip_np)          # (1,3,T,H,W) на DEVICE
    logits = MODEL(clip_tensor)                     # (1,400)
    probs  = torch.softmax(logits, dim=1)[0].cpu().numpy()

# top-5 списком [(label, prob), ...]
top5_idx = probs.argsort()[-5:][::-1]
TOP5 = [(CLASSES[i], float(probs[i])) for i in top5_idx]

print("\nTop-5 классов:")
for k,(lab,p) in enumerate(TOP5, start=1):
    print(f"{k}. {lab} — {p:.3f}")

# ----- 2) Проигрываем исходное видео с оверлеем top-5 -----
cap = cv2.VideoCapture(VIDEO_PATH); assert cap.isOpened()
fps = cap.get(cv2.CAP_PROP_FPS) or 25
w   = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h   = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
writer = open_h264_writer(ANN_PATH, w, h, fps, crf=20, preset="veryfast")

while True:
    ok, frame = cap.read()
    if not ok: break
    draw_top5_panel(frame, TOP5, origin=(20, 20),
                width=900, row_h=60, font_scale=2.2, thickness=4, alpha=0.6)
    writer.stdin.write(frame.tobytes())

cap.release(); writer.stdin.close(); writer.wait()
print("Аннотированное видео сохранено:", ANN_PATH)


Downloading: "https://download.pytorch.org/models/r3d_18-b3b3357e.pth" to /root/.cache/torch/hub/checkpoints/r3d_18-b3b3357e.pth


100%|██████████| 127M/127M [00:02<00:00, 53.1MB/s]


clip shape (T,H,W,3): (16, 3840, 2160, 3)

Top-5 классов:
1. faceplanting — 0.230
2. digging — 0.194
3. feeding birds — 0.136
4. crossing river — 0.071
5. slacklining — 0.066
Аннотированное видео сохранено: out/r3d18_action_overlay_h264.mp4


In [16]:
display(display_video_from_path(ANN_PATH))

Output hidden; open in https://colab.research.google.com to view.

# Комбинация детекции (YOLO) + трекинг (встроенный ByteTrack)

В учебных целях используем Ultralytics YOLO с готовым трекером bytetrack (простая и практичная связка).

In [17]:
# Детекция (YOLO) + трекинг (ByteTrack) с немедленной записью результата в H.264,
# чтобы видео гарантированно воспроизводилось в HTML5-плеере ноутбука.

from ultralytics import YOLO
import cv2, subprocess, shlex, numpy as np
from pathlib import Path

# 1) Загружаем лёгкую предобученную модель детекции (COCO)
yolo = YOLO("yolov8n.pt")

# 2) Источник видео и путь, куда писать итоговый ролик (совместимый с браузером)
src_path = str(videos[0])
dst_path = str(Path(OUT_DIR) / "yolo_bytetrack" / "yolo_track_run" / "tracked_direct_h264.mp4")
Path(dst_path).parent.mkdir(parents=True, exist_ok=True)

# 3) Получаем параметры входного видео для корректной конфигурации ffmpeg-пайпа
cap = cv2.VideoCapture(src_path)
fps = cap.get(cv2.CAP_PROP_FPS) or 25    # частота кадров (если 0 — подставим 25)
w   = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h   = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

# 4) Готовим ffmpeg-процесс, который будет принимать сырые кадры (BGR) по stdin и кодировать их в H.264
#    -f rawvideo      : вход — сырые кадры без контейнера
#    -pix_fmt bgr24   : формат пикселей OpenCV (3 канала по 8 бит)
#    -s {w}x{h}       : размер кадра, ffmpeg должен знать точные размеры RAW-потока
#    -r {fps}         : частота кадров
#    -i -             : читать из stdin
#    -an              : без аудио (для простоты учебного примера)
#    -c:v libx264     : кодек H.264 (совместим с HTML5)
#    -preset veryfast : ускоренная кодировка (хорошо для демо)
#    -crf 20          : целевое визуальное качество (чем выше CRF — тем сильнее сжатие)
#    -pix_fmt yuv420p : требуемый браузером формат
#    -movflags +faststart : перенести moov-атом в начало (быстрый старт стриминга в браузере)
cmd = (
    f'ffmpeg -y -f rawvideo -pix_fmt bgr24 -s {w}x{h} -r {fps} -i - '
    f'-an -c:v libx264 -preset veryfast -crf 20 -pix_fmt yuv420p '
    f'-movflags +faststart "{dst_path}"'
)
proc = subprocess.Popen(shlex.split(cmd), stdin=subprocess.PIPE)

# 5) Запускаем трекинг в потоковом режиме:
#    stream=True — выдаёт генератор результатов, не копит их в память (важно для длинных роликов)
#    tracker="bytetrack.yaml" — вшитая конфигурация ByteTrack для ассоциации детекций в треки
#    conf / iou — пороги детекции и NMS
for r in yolo.track(source=src_path, tracker="bytetrack.yaml",
                    conf=0.25, iou=0.5, stream=True, verbose=False):
    # r.plot() рисует на кадре рамки, метки классов, ID треков — получаем готовый кадр для записи
    frame = r.plot()
    # Пишем байты кадра (BGR) напрямую в stdin ffmpeg — он кодирует и пишет MP4 на диск
    proc.stdin.write(frame.tobytes())

# 6) Аккуратно закрываем ресурсы: видеопоток и stdin ffmpeg (чтобы он завершил файл корректно)
cap.release()
proc.stdin.close()
proc.wait()



[31m[1mrequirements:[0m Ultralytics requirement ['lap>=0.5.12'] not found, attempting AutoUpdate...
Using Python 3.12.12 environment at: /usr
Resolved 2 packages in 130ms
Prepared 1 package in 29ms
Installed 1 package in 5ms
 + lap==0.5.12

[31m[1mrequirements:[0m AutoUpdate success ✅ 0.8s



0

In [18]:
display(display_video_from_path(dst_path))

Output hidden; open in https://colab.research.google.com to view.

## Метрики по трекингу: средняя скорость и время в зоне

**Дано.** Для объекта с ID \(i\) известны центры его боксов по кадрам:
$$
c_t^{(i)} = \big(x_t^{(i)},\, y_t^{(i)}\big), \quad t=1,\dots,T_i,
$$
частота кадров — \(f\) FPS.

**Покадровое смещение (в пикселях):**
$$
\Delta d_t^{(i)} = \left\|\, c_t^{(i)} - c_{t-1}^{(i)} \,\right\|_2,\quad t=2,\dots,T_i.
$$

**Суммарный путь (px):**
$$
D^{(i)} = \sum_{t=2}^{T_i} \Delta d_t^{(i)}.
$$

**Мгновенная скорость (px/s) на кадре \(t\):**
$$
v_t^{(i)} = \Delta d_t^{(i)} \cdot f.
$$

**Средняя скорость (px/s) за трек:**
$$
\bar{v}^{(i)} = \frac{D^{(i)}}{\,(T_i-1)/f\,}
= f \cdot \frac{1}{T_i-1}\sum_{t=2}^{T_i}\Delta d_t^{(i)}.
$$

**Масштаб (опционально).** Если известен коэффициент \(\alpha\) — метров на пиксель:
$$
v_{t,\text{м/с}}^{(i)} = \alpha\, v_t^{(i)},
\qquad
\bar{v}_{\text{м/с}}^{(i)} = \alpha\, \bar{v}^{(i)}.
$$

**Время в зоне.** Пусть \(Z\) — зона интереса (прямоугольник/полигон). Тогда
$$
T_{\text{in}}^{(i)}
= \frac{1}{f}\sum_{t=1}^{T_i}\mathbf{1}\!\left\{\, c_t^{(i)} \in Z \,\right\}
\quad \text{сек}.
$$

**Что отображать поверх видео (практика):**
- контур зоны \(Z\);
- bbox и центр \((x_t,y_t)\) с подписью ID;
- мгновенную скорость \(v_t\) (px/s или м/с);
- итоговую плашку на кадре/в конце ролика: \(\bar{v}\), \(T_{\text{in}}\), общая длина трека.


In [19]:
# Метрики по трекингу: средняя скорость (px/s) и время в зоне (сек)
# Используем Ultralytics YOLO + встроенный ByteTrack (stream=True), считаем «на лету».
# Визуализируем: зона (ROI), толщенные bbox, подписи ID и instant speed.
from ultralytics import YOLO
import cv2, numpy as np, subprocess, shlex
from pathlib import Path
from IPython.display import HTML

# ----- входные/выходные пути -----
SRC = str(videos[0])                                 # исходный ролик (как и раньше)
OUT_MP4 = str(OUT_DIR / "metrics_yolo_bytetrack_overlay_h264.mp4")

# ----- параметры зоны (ROI) -----
# Вариант 1: прямоугольник (x1,y1,x2,y2). Подберите под своё видео.
ROI_RECT = (50, 50, 600, 400)
# Вариант 2 (опционально): произвольный многоугольник (если нужно)
ROI_POLY = None  # например: np.array([(100,100),(600,120),(620,420),(80,400)], np.int32)


def in_zone(xy):
    """Проверка попадания точки в ROI."""
    x, y = xy
    if ROI_POLY is not None:
        return cv2.pointPolygonTest(ROI_POLY, (float(x), float(y)), False) >= 0
    x1, y1, x2, y2 = ROI_RECT
    return (x1 <= x <= x2) and (y1 <= y <= y2)

def bbox_center_xyxy(b):
    x1,y1,x2,y2 = b
    return ((x1+x2)/2.0, (y1+y2)/2.0)

def draw_metrics_label(img, text_lines, anchor, scale=1.0, pad=8,
                       bg=(0,0,0), fg=(255,255,255), alpha=0.65,
                       font=cv2.FONT_HERSHEY_SIMPLEX):
    # было: font_scale = 1.2 * 4 * scale; thickness = 2 * 4
    font_scale = 1.9 * scale                     # ~0.9 для 1080p, до ~1.3 для 4K
    thickness  = max(2, int(2 * scale))          # 2..3 обычно
    pad        = max(6, int(6 * scale))          # адаптивные отступы

    x, y = anchor
    line_h = 0; widths = []; heights = []
    for s in text_lines:
        (w, h), _ = cv2.getTextSize(s, font, font_scale, thickness)
        widths.append(w); heights.append(h); line_h = max(line_h, h)

    box_w = max(widths) + 2 * pad
    box_h = int(len(text_lines) * (line_h + pad)) + pad

    overlay = img.copy()
    cv2.rectangle(overlay, (x, y), (x + box_w, y + box_h), bg, -1)
    cv2.addWeighted(overlay, alpha, img, 1 - alpha, 0, dst=img)

    ty = y + pad + line_h
    for s in text_lines:
        cv2.putText(img, s, (x + pad, ty), font, font_scale, fg, thickness, cv2.LINE_AA)
        ty += line_h + pad


# ----- инициализация -----
yolo = YOLO("yolov8n.pt")

cap = cv2.VideoCapture(SRC); assert cap.isOpened(), f"Не открыть {SRC}"
fps = cap.get(cv2.CAP_PROP_FPS) or 25
w   = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h   = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
writer = open_h264_writer(OUT_MP4, w, h, fps, crf=20, preset="veryfast")

# копия зоны в удобной форме
if ROI_POLY is not None:
    roi_poly_int = ROI_POLY.reshape(-1, 1, 2).astype(np.int32)
else:
    x1,y1,x2,y2 = ROI_RECT

# ----- аккумуляторы по ID -----
# Для каждого ID храним:
#  last_center  : последняя точка центра
#  dist_sum     : сумма пиксельных смещений по кадрам
#  frames       : сколько кадров видели ID
#  frames_in_roi: сколько кадров центр был в зоне
stats = {}

def update_stats(obj_id, center, inroi, step_dist):
    st = stats.setdefault(int(obj_id), dict(
        last_center=None, dist_sum=0.0, frames=0, frames_in_roi=0
    ))
    st["frames"] += 1
    if inroi:
        st["frames_in_roi"] += 1
    if step_dist is not None:
        st["dist_sum"] += float(step_dist)
    st["last_center"] = center

# ----- основной цикл: трекинг + визуализация -----
# stream=True — экономит RAM и позволяет обрабатывать кадры по одному
for r in yolo.track(source=SRC, tracker="bytetrack.yaml", conf=0.25, iou=0.5,
                    stream=True, verbose=False):
    frame = r.orig_img.copy()
    H, W = frame.shape[:2]
    #SCALE = max(W, H) / 1920.0  # ~1.0 для 1080p, ~2.0 для 4K
    SCALE = min(max(W, H) / 1920.0, 1.9)   # мягкий апскейл, верхняя граница 1.2


    # рисуем ROI
    if ROI_POLY is not None:
        cv2.polylines(frame, [roi_poly_int], isClosed=True, color=(0,255,255), thickness=3)
    else:
        cv2.rectangle(frame, (x1,y1), (x2,y2), (0,255,255), 3)

    # читаем боксы/ID
    boxes_xyxy = r.boxes.xyxy.cpu().numpy() if r.boxes.xyxy is not None else np.empty((0,4))
    ids        = r.boxes.id.cpu().numpy()   if (hasattr(r.boxes, "id") and r.boxes.id is not None) else np.array([])
    clses      = r.boxes.cls.cpu().numpy()  if r.boxes.cls is not None else np.array([])
    confs      = r.boxes.conf.cpu().numpy() if r.boxes.conf is not None else np.array([])

    # обрабатываем только людей (COCO class 0) — можно убрать фильтр при желании
    for b, oid, cls, cf in zip(boxes_xyxy, ids, clses, confs):
        if int(cls) != 0:
            continue

        # центр + мгновенная скорость (px/s)
        cx, cy = bbox_center_xyxy(b)
        has_prev = int(oid) in stats and stats[int(oid)]["last_center"] is not None
        step_dist = None
        if has_prev:
            px, py = stats[int(oid)]["last_center"]
            step_dist = np.hypot(cx - px, cy - py)    # пикс/кадр
        inst_speed = (step_dist * fps) if step_dist is not None else 0.0  # px/s

        # попадание в зону
        inside = in_zone((cx, cy))

        # обновляем статистику
        update_stats(oid, (cx, cy), inside, step_dist)

        # визуализация — более «жирные» bbox и надписи
        x1i, y1i, x2i, y2i = map(int, b)
        color = (0, 200, 0) if inside else (0, 120, 255)
        thick = max(4, int(4 * SCALE))                   # рамка чуть тоньше, но адаптивная
        cv2.rectangle(frame, (x1i, y1i), (x2i, y2i), color, thick)

        lines = [f"ID {int(oid)}",
                f"v={inst_speed:.0f} px/s",
                f"T_in={stats[int(oid)]['frames_in_roi'] / fps:.1f}s"]

        draw_metrics_label(
            frame,
            text_lines=lines,
            anchor=(x1i, max(0, y1i - int(20 * SCALE))), # плашка над bbox
            scale=SCALE * 1.9
        )

        cv2.circle(frame, (int(cx), int(cy)), max(5, int(5 * SCALE)), (255,255,255), -1)


        # точка центра
        #$cv2.circle(frame, (int(cx), int(cy)), 5, (255,255,255), -1)
        cv2.circle(frame, (int(cx), int(cy)), max(6, int(6 * SCALE)), (255, 255, 255), -1)


    writer.stdin.write(frame.tobytes())

# закрываем пайп
writer.stdin.close(); writer.wait()

# ----- сводка по объектам -----
summary = []
for oid, st in stats.items():
    if st["frames"] <= 1:
        continue
    avg_speed_pxps = (st["dist_sum"] / (st["frames"] - 1)) * fps
    time_in_zone_s = st["frames_in_roi"] / fps
    total_time_s   = st["frames"] / fps
    summary.append((oid, avg_speed_pxps, time_in_zone_s, total_time_s))

# сортируем по ID и печатаем
summary.sort(key=lambda x: x[0])
print("ID | avg_speed [px/s] | time_in_zone [s] | tracked_time [s]")
for oid, v, tin, tt in summary:
    print(f"{oid:2d} | {v:14.1f} | {tin:15.2f} | {tt:14.2f}")

print("\nВидео с оверлеем сохранено:", OUT_MP4)


ID | avg_speed [px/s] | time_in_zone [s] | tracked_time [s]
 1 |          496.6 |            0.00 |           5.47
12 |          575.7 |            0.00 |           0.27
13 |          142.6 |            0.00 |           0.23
21 |          627.3 |            0.00 |           0.10
25 |          828.9 |            0.00 |           0.80
39 |          750.0 |            0.00 |           7.07
58 |          708.4 |            0.00 |           0.07
59 |          639.5 |            0.00 |           1.13
61 |           99.9 |            0.00 |           3.80
73 |         1628.1 |            0.00 |           0.63
75 |          435.2 |            0.00 |           0.30
77 |          127.1 |            0.00 |           0.27
79 |         1046.7 |            0.00 |           0.07
100 |          269.2 |            0.00 |           0.13

Видео с оверлеем сохранено: out/metrics_yolo_bytetrack_overlay_h264.mp4


In [20]:
display(display_video_from_path(OUT_MP4))

Output hidden; open in https://colab.research.google.com to view.