In [None]:
from ultralytics import YOLOv10
import torch

# 1) Загружаем предобученный COCO-чекпоинт с HF
model = YOLOv10.from_pretrained("jameslahm/yolov10m")

device = 0 if torch.cuda.is_available() else "cpu"
print(device)


import torch
import numpy as np
import cv2

from torchreid.utils import FeatureExtractor

# ===== ReID: OSNet для людей =====

REID_PERSON_CLASS = 0  # COCO: 0 = person

osnet_device = "cuda" if torch.cuda.is_available() else "cpu"
print(osnet_device)

person_reid_extractor = FeatureExtractor(
    model_name="osnet_x1_0",
    # путь к скачанному .pt с ReID-весами (MSMT17 / Market1501 и т.д.)
    model_path=r"osnet_x1_0_msmt17.pt",
    device=osnet_device,
)
print(f"[ReID] OSNet инициализирован на {osnet_device}")



0


In [None]:
def extract_person_reid_features(frame_bgr: np.ndarray, det: dict):
    """
    frame_bgr : np.ndarray (H, W, 3), BGR
    det       : результат run_yolo10_on_frame

    Возвращает:
      - features: (N, 512) float32, L2-нормированные
      - valid_mask: (N,) bool — True только для людей с успешно посчитанным признаком
    """
    boxes = det["xyxy"]      # (N,4)
    cls   = det["cls"]       # (N,)
    N = boxes.shape[0]

    if N == 0:
        return np.zeros((0, 512), dtype=np.float32), np.zeros((0,), dtype=bool)

    H, W, _ = frame_bgr.shape

    crops = []
    idx_map = []   # индексы детекций, для которых реально считаем ReID

    # --- готовим кропы только для людей ---
    for i in range(N):
        if cls[i] != REID_PERSON_CLASS:
            continue

        x1, y1, x2, y2 = boxes[i]
        x1 = max(0, min(W - 1, int(x1)))
        y1 = max(0, min(H - 1, int(y1)))
        x2 = max(0, min(W - 1, int(x2)))
        y2 = max(0, min(H - 1, int(y2)))

        if x2 <= x1 or y2 <= y1:
            continue

        crop_bgr = frame_bgr[y1:y2, x1:x2, :]
        if crop_bgr.size == 0:
            continue

        # OSNet стандартно обучен на 256x128 (H,W)
        crop_bgr = cv2.resize(crop_bgr, (128, 256), interpolation=cv2.INTER_LINEAR)
        crop_rgb = cv2.cvtColor(crop_bgr, cv2.COLOR_BGR2RGB)

        crops.append(crop_rgb)
        idx_map.append(i)

    features = np.zeros((N, 512), dtype=np.float32)
    valid_mask = np.zeros((N,), dtype=bool)

    if len(crops) == 0:
        return features, valid_mask

    # torchreid FeatureExtractor умеет принимать список np.ndarray
    with torch.no_grad():
        feats = person_reid_extractor(crops)  # torch.Tensor или np.ndarray, (M,512)
        if isinstance(feats, torch.Tensor):
            feats = feats.cpu().numpy()

    # L2-нормировка и расклад по исходным индексам
    for j, det_idx in enumerate(idx_map):
        v = feats[j].astype(np.float32)
        norm = np.linalg.norm(v) + 1e-12
        v /= norm
        features[det_idx] = v
        valid_mask[det_idx] = True

    return features, valid_mask


In [34]:
import subprocess as sp
import json
import numpy as np

def get_video_info(video_path: str):
    """
    Возвращает (width, height, fps) для первого видеопотока.
    Требует установленного ffprobe.
    """
    cmd = [
        "ffprobe",
        "-v", "error",
        "-select_streams", "v:0",
        "-show_entries", "stream=width,height,avg_frame_rate",
        "-of", "json",
        video_path,
    ]

    result = sp.run(cmd, stdout=sp.PIPE, stderr=sp.PIPE, text=True, check=True)
    info = json.loads(result.stdout)
    stream = info["streams"][0]

    width = int(stream["width"])
    height = int(stream["height"])

    fps_str = stream.get("avg_frame_rate", "0/1")
    num, den = map(int, fps_str.split("/"))
    fps = num / den if den != 0 else 0.0

    return width, height, fps


In [35]:
def ffmpeg_frame_generator(video_path: str, resize_to: tuple[int, int] | None = None):
    """
    Читает видео через ffmpeg и выдаёт кадры как np.ndarray (H, W, 3) в BGR.

    :param video_path: путь к .mp4
    :param resize_to: (W, H) целевого кадра или None, чтобы оставить оригинал
    """
    orig_w, orig_h, fps = get_video_info(video_path)
    print(f"Видео: {orig_w}x{orig_h}, fps={fps:.3f}")

    if resize_to is None:
        out_w, out_h = orig_w, orig_h
        scale_filter = "scale=iw:ih"   # без изменения размера
    else:
        out_w, out_h = resize_to
        scale_filter = f"scale={out_w}:{out_h}"

    cmd = [
        "ffmpeg",
        "-hide_banner",
        "-loglevel", "error",
        "-nostdin",
        "-i", video_path,
        "-vf", scale_filter,
        "-f", "rawvideo",
        "-pix_fmt", "bgr24",
        "-"
    ]

    # Открываем ffmpeg-процесс
    proc = sp.Popen(
        cmd,
        stdout=sp.PIPE,
        stderr=sp.DEVNULL   # чтобы не забивался буфер stderr
    )

    frame_size = out_w * out_h * 3  # 3 канала BGR по 1 байту

    try:
        while True:
            # читаем ровно один кадр
            raw = proc.stdout.read(frame_size)
            if not raw or len(raw) < frame_size:
                break

            frame = np.frombuffer(raw, np.uint8).reshape((out_h, out_w, 3))
            yield frame

    finally:
        if proc.stdout is not None:
            proc.stdout.close()
        proc.wait()


In [36]:
import cv2

# Статический ROI в координатах кадра (x1, y1, x2, y2) или None для всего кадра
# Пример: вырезать центральную полосу по высоте
ROI = None
ROI = (0, 360, 1280, 720)

# Минимальная высота бокса в пикселях, чтобы бокс считался «валидным» для трекинга
MIN_BOX_HEIGHT = 24.0

# Порог уверенности для трекинга (Калман и т.д.)
TRACK_CONF_THR = 0.85


def run_yolo10_on_frame(
    frame_bgr,
    conf_thr: float = 0.25,           # порог для самой YOLO
    roi=ROI,
    min_box_height: float = MIN_BOX_HEIGHT,
    track_conf_thr: float = TRACK_CONF_THR,  # порог для трекинга
):
    """
    Запускает YOLOv10 на полном кадре и возвращает детекции.

    Возвращаемый словарь:
      - xyxy: (N, 4) — все боксы после порога по conf_thr;
      - scores: (N,) — вероятности;
      - cls: (N,) — индексы классов;
      - valid_size_mask: (N,) bool — True, если бокс достаточно высокий;
      - inside_roi_mask: (N,) bool — True, если бокс целиком внутри ROI
                                  (или все True, если ROI=None);
      - conf_track_mask: (N,) bool — True, если score >= track_conf_thr;
      - track_mask: (N,) bool — True, если бокс годен для трекинга
                                (размер + ROI + уверенность);
      - xyah: (M, 4) — (x, y, a, h) только для боксов с track_mask=True.
    """
    H, W, _ = frame_bgr.shape

    # --- 1. YOLO по всему кадру ---
    results = model.predict(
        source=frame_bgr,
        imgsz=960,
        device=device,
        half=(device != "cpu"),
        conf=conf_thr,
        verbose=False,
    )
    res = results[0]

    if res.boxes is None or len(res.boxes) == 0:
        return {
            "xyxy": np.zeros((0, 4), dtype=np.float32),
            "scores": np.zeros((0,), dtype=np.float32),
            "cls": np.zeros((0,), dtype=np.int32),
            "valid_size_mask": np.zeros((0,), dtype=bool),
            "inside_roi_mask": np.zeros((0,), dtype=bool),
            "conf_track_mask": np.zeros((0,), dtype=bool),
            "track_mask": np.zeros((0,), dtype=bool),
            "xyah": np.zeros((0, 4), dtype=np.float32),
        }

    boxes = res.boxes.xyxy.cpu().numpy().astype(np.float32)   # (N, 4)
    scores = res.boxes.conf.cpu().numpy().astype(np.float32)  # (N,)
    cls = res.boxes.cls.cpu().numpy().astype(np.int32)        # (N,)

    # Дополнительная фильтрация по conf_thr (на всякий случай)
    mask_conf = scores >= conf_thr
    boxes = boxes[mask_conf]
    scores = scores[mask_conf]
    cls = cls[mask_conf]

    if boxes.shape[0] == 0:
        return {
            "xyxy": boxes,
            "scores": scores,
            "cls": cls,
            "valid_size_mask": np.zeros((0,), dtype=bool),
            "inside_roi_mask": np.zeros((0,), dtype=bool),
            "conf_track_mask": np.zeros((0,), dtype=bool),
            "track_mask": np.zeros((0,), dtype=bool),
            "xyah": np.zeros((0, 4), dtype=np.float32),
        }

    # --- 2. Геометрия ---
    w = boxes[:, 2] - boxes[:, 0]
    h = boxes[:, 3] - boxes[:, 1]
    x = boxes[:, 0] + w / 2.0
    y = boxes[:, 1] + h / 2.0
    a = w / np.maximum(h, 1e-6)

    # --- 3. Маска по размеру ---
    valid_size_mask = h >= float(min_box_height)

    # --- 4. Маска по ROI (полностью внутри) ---
    if roi is not None:
        x1_roi, y1_roi, x2_roi, y2_roi = roi

        # ограничиваем ROI границами кадра
        x1_roi = max(0, min(W, x1_roi))
        x2_roi = max(0, min(W, x2_roi))
        y1_roi = max(0, min(H, y1_roi))
        y2_roi = max(0, min(H, y2_roi))

        if x2_roi <= x1_roi or y2_roi <= y1_roi:
            raise ValueError("ROI имеет неположительный размер после обрезки по кадру")

        inside_roi_mask = (
            (boxes[:, 0] >= x1_roi) &
            (boxes[:, 1] >= y1_roi) &
            (boxes[:, 2] <= x2_roi) &
            (boxes[:, 3] <= y2_roi)
        )
    else:
        inside_roi_mask = np.ones_like(valid_size_mask, dtype=bool)

    # --- 5. Маска по уверенности для трекинга ---
    conf_track_mask = scores >= float(track_conf_thr)

    # --- 6. Итоговая маска для трекинга ---
    track_mask = valid_size_mask & inside_roi_mask & conf_track_mask

    # xyah только для тех, кто пойдёт в трекер
    if np.any(track_mask):
        xyah = np.stack(
            [x[track_mask], y[track_mask], a[track_mask], h[track_mask]],
            axis=1,
        ).astype(np.float32)
    else:
        xyah = np.zeros((0, 4), dtype=np.float32)

    return {
        "xyxy": boxes,
        "scores": scores,
        "cls": cls,
        "valid_size_mask": valid_size_mask,
        "inside_roi_mask": inside_roi_mask,
        "conf_track_mask": conf_track_mask,
        "track_mask": track_mask,
        "xyah": xyah,
    }



# ---------- Отрисовка детекций на кадре ----------

def draw_detections(frame_bgr: np.ndarray, det, roi=ROI):
    """
    Рисует:
      - полупрозрачный бирюзовый ROI (если задан),
      - все детекции:
          * красный  — слишком маленький по размеру (valid_size_mask=False);
          * оранжевый — размер норм, но conf < TRACK_CONF_THR (conf_track_mask=False);
          * зелёный — годный для трекинга (track_mask=True);
          * синий — размер норм, conf >= TRACK_CONF_THR, но вне ROI.

    Толщина линий = 1, fontScale = 0.42.
    """
    img = frame_bgr.copy()

    # --- 1. Полупрозрачный ROI ---
    if roi is not None:
        x1_roi, y1_roi, x2_roi, y2_roi = roi
        H, W, _ = img.shape

        x1_roi = max(0, min(W, x1_roi))
        x2_roi = max(0, min(W, x2_roi))
        y1_roi = max(0, min(H, y1_roi))
        y2_roi = max(0, min(H, y2_roi))

        if x2_roi > x1_roi and y2_roi > y1_roi:
            overlay = img.copy()
            roi_color = (255, 255, 0)  # бирюзовый-ish в BGR
            cv2.rectangle(
                overlay,
                (x1_roi, y1_roi),
                (x2_roi, y2_roi),
                roi_color,
                thickness=-1,
            )
            alpha = 0.2
            img = cv2.addWeighted(overlay, alpha, img, 1 - alpha, 0)

    # --- 2. Отрисовка боксов ---
    boxes = det["xyxy"]
    scores = det["scores"]
    cls = det["cls"]
    valid_size_mask = det.get("valid_size_mask", np.ones_like(scores, dtype=bool))
    inside_roi_mask = det.get("inside_roi_mask", np.ones_like(scores, dtype=bool))
    conf_track_mask = det.get("conf_track_mask", np.ones_like(scores, dtype=bool))
    track_mask = det.get("track_mask", valid_size_mask & inside_roi_mask & conf_track_mask)

    names = getattr(model, "names", None)

    for i, ((x1, y1, x2, y2), sc, cl) in enumerate(zip(boxes, scores, cls)):
        is_valid_size = bool(valid_size_mask[i])
        is_inside_roi = bool(inside_roi_mask[i])
        is_conf_ok = bool(conf_track_mask[i])
        is_track = bool(track_mask[i])

        # Цвета в BGR
        red = (0, 0, 255)
        green = (0, 160, 0)
        blue = (255, 0, 0)
        orange = (0, 165, 255)

        if not is_valid_size:
            color = red               # маленький
        elif not is_conf_ok:
            color = orange            # размер норм, но уверенность < TRACK_CONF_THR
        elif is_track:
            color = green             # годен для трекинга (в ROI + conf + размер)
        else:
            color = blue              # размер и conf ок, но вне ROI

        p1 = (int(x1), int(y1))
        p2 = (int(x2), int(y2))
        cv2.rectangle(img, p1, p2, color, 1)  # толщина 1

        # Подпись: имя класса + score
        if names is not None and int(cl) in names:
            cls_name = names[int(cl)]
        else:
            cls_name = str(int(cl))

        label = f"{cls_name} {sc:.2f}"
        cv2.putText(
            img,
            label,
            (int(x1), int(y1) - 5),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.42,     # fontScale
            color,
            1,        # толщина шрифта
            cv2.LINE_AA,
        )

    return img


# ---------- ffmpeg writer для выхода ----------

def create_ffmpeg_writer(output_path: str, width: int, height: int, fps: float):
    """
    Создаёт ffmpeg-процесс, принимающий сырые BGR кадры (rawvideo) на stdin
    и записывающий mp4 (H.264).
    """
    cmd = [
        "ffmpeg",
        "-y",
        "-loglevel", "error",          # <− чтобы ffmpeg почти ничего не писал
        "-f", "rawvideo",
        "-pix_fmt", "bgr24",
        "-s", f"{width}x{height}",
        "-r", f"{fps}",
        "-i", "-",
        "-an",
        "-c:v", "libx264",
        "-pix_fmt", "yuv420p",
        output_path,
    ]

    # Критично: не PIPE, а DEVNULL (или вообще не перенаправлять stderr)
    proc = sp.Popen(cmd, stdin=sp.PIPE, stderr=sp.DEVNULL)
    return proc


In [37]:
# ---------- Основной цикл: читаем → детектим → рисуем → пишем ----------

video_path = r"vid.mp4"
output_path_boxes = r"vid_with_boxes.mp4"

orig_w, orig_h, fps = get_video_info(video_path)
print(f"Исходное видео: {orig_w}x{orig_h}, fps={fps:.3f}")

# Для корректного letterbox лучше оставить оригинальное разрешение,
# YOLO сам приведёт к imgsz=960 внутри. Поэтому resize_to=None.
gen = ffmpeg_frame_generator(video_path, resize_to=None)

writer_proc = create_ffmpeg_writer(output_path_boxes, orig_w, orig_h, fps)

try:
    for idx, frame in enumerate(gen):
        det = run_yolo10_on_frame(frame, conf_thr=0.25)  # порог можно менять
        frame_drawn = draw_detections(frame, det)

        # запись кадра в ffmpeg writer
        writer_proc.stdin.write(frame_drawn.tobytes())

        if idx % 50 == 0:
            print(f"Обработан кадр {idx}")
finally:
    if writer_proc.stdin is not None:
        writer_proc.stdin.close()
    writer_proc.wait()
    print("Готово, выходное видео:", output_path_boxes)


Исходное видео: 1280x720, fps=25.000
Видео: 1280x720, fps=25.000
Обработан кадр 0
Обработан кадр 50
Обработан кадр 100
Обработан кадр 150
Обработан кадр 200
Обработан кадр 250
Обработан кадр 300
Обработан кадр 350
Обработан кадр 400
Обработан кадр 450
Обработан кадр 500
Обработан кадр 550
Обработан кадр 600
Обработан кадр 650
Обработан кадр 700
Обработан кадр 750
Обработан кадр 800
Обработан кадр 850
Обработан кадр 900
Обработан кадр 950
Обработан кадр 1000
Обработан кадр 1050
Обработан кадр 1100
Обработан кадр 1150
Готово, выходное видео: vid_with_boxes.mp4


In [38]:
import numpy as np

# Размерности для удобства
STATE_DIM = 7   # [x, y, a, h, vx, vy, va]
MEAS_DIM  = 4   # [x, y, a, h]


def xyxy_to_xyah(boxes_xyxy: np.ndarray) -> np.ndarray:
    """
    boxes_xyxy: (N, 4) [x1, y1, x2, y2]
    → (N, 4) [x, y, a, h], где:
      x, y — центр,
      a    — aspect ratio = w / h,
      h    — высота.
    """
    boxes_xyxy = np.asarray(boxes_xyxy, dtype=np.float32)
    if boxes_xyxy.size == 0:
        return np.zeros((0, 4), dtype=np.float32)

    x1 = boxes_xyxy[:, 0]
    y1 = boxes_xyxy[:, 1]
    x2 = boxes_xyxy[:, 2]
    y2 = boxes_xyxy[:, 3]

    w = x2 - x1
    h = y2 - y1
    x = x1 + 0.5 * w
    y = y1 + 0.5 * h
    a = w / np.maximum(h, 1e-6)

    xyah = np.stack([x, y, a, h], axis=1).astype(np.float32)
    return xyah


def xyah_to_xyxy(boxes_xyah: np.ndarray) -> np.ndarray:
    """
    boxes_xyah: (N, 4) [x, y, a, h]
    → (N, 4) [x1, y1, x2, y2].
    """
    boxes_xyah = np.asarray(boxes_xyah, dtype=np.float32)
    if boxes_xyah.size == 0:
        return np.zeros((0, 4), dtype=np.float32)

    x = boxes_xyah[:, 0]
    y = boxes_xyah[:, 1]
    a = boxes_xyah[:, 2]
    h = boxes_xyah[:, 3]

    w = a * h
    x1 = x - 0.5 * w
    x2 = x + 0.5 * w
    y1 = y - 0.5 * h
    y2 = y + 0.5 * h

    xyxy = np.stack([x1, y1, x2, y2], axis=1).astype(np.float32)
    return xyxy


In [39]:
def make_F(dt: float) -> np.ndarray:
    """
    Матрица перехода состояния F для [x, y, a, h, vx, vy, va].
    """
    F = np.eye(STATE_DIM, dtype=np.float32)
    F[0, 4] = dt  # x += vx * dt
    F[1, 5] = dt  # y += vy * dt
    F[2, 6] = dt  # a += va * dt
    # h считаем квазипостоянной (нет скорости)
    return F


def make_H() -> np.ndarray:
    """
    Матрица измерения H: берём только [x, y, a, h] из состояния.
    """
    H = np.zeros((MEAS_DIM, STATE_DIM), dtype=np.float32)
    H[0, 0] = 1.0  # x
    H[1, 1] = 1.0  # y
    H[2, 2] = 1.0  # a
    H[3, 3] = 1.0  # h
    return H


def make_R() -> np.ndarray:
    """
    Матрица ковариации измерения R (шум детектора).
    Простейшая диагональная настройка:

      - x,y   — базовый шум (1.0)
      - a,h   — считаем более шумными (10×)
    """
    R = np.eye(MEAS_DIM, dtype=np.float32)
    R[2, 2] *= 10.0
    R[3, 3] *= 10.0
    return R


def make_Q() -> np.ndarray:
    """
    Матрица процессного шума Q.
    Близко к настройкам SORT/OC-SORT:

      - положение/форма (первые 4) — единичный шум;
      - скорости (последние 3)     — гораздо меньший шум (0.01),
        чтобы они не разъезжались слишком быстро.

    Для начала этого достаточно; позже можно будет отдельно
    тюнить под конкретные сцены.
    """
    Q = np.eye(STATE_DIM, dtype=np.float32)
    Q[4:, 4:] *= 0.01  # скорости
    return Q


def make_initial_P() -> np.ndarray:
    """
    Начальная ковариация P для нового трека.

    Аналогично SORT/OC-SORT:
      - по [x, y, a, h] неопределённость умеренная;
      - по скоростям огромная (1000×), т.к. мы их не наблюдаем напрямую,
        только выводим из серии измерений.
    Всё дополнительно масштабируем ×10 (как в SORT).
    """
    P = np.eye(STATE_DIM, dtype=np.float32)
    P[4:, 4:] *= 1000.0  # большие неопределённости по скоростям
    P *= 10.0
    return P


In [40]:
class KalmanFilterXYAH:
    """
    Калман-фильтр в формате OC-SORT / DeepSORT для состояния:
      x = [x, y, a, h, vx, vy, va]^T.

    dt задаётся снаружи (обычно dt = 1 / FPS видео).
    """

    def __init__(self, dt: float):
        self.dt = float(dt)

        self.dim_x = STATE_DIM
        self.dim_z = MEAS_DIM

        # Состояние и ковариации
        self.x = np.zeros(self.dim_x, dtype=np.float32)          # вектор состояния
        self.P = make_initial_P()                                # ковариация

        # Матричная структура
        self.F = make_F(self.dt)                                 # переход
        self.H = make_H()                                        # измерение
        self.Q = make_Q()                                        # процессный шум
        self.R = make_R()                                        # шум измерения

        # Булевый флаг: инициализирован ли фильтр
        self.initialized = False

    # --- служебные методы ---

    def _ensure_shape_meas(self, z_xyah: np.ndarray) -> np.ndarray:
        z = np.asarray(z_xyah, dtype=np.float32).reshape(-1)
        if z.shape[0] != self.dim_z:
            raise ValueError(f"Ожидается измерение размерности {self.dim_z}, "
                             f"получено {z.shape[0]}")
        return z

    # --- публичный интерфейс ---

    def initiate(self, meas_xyah: np.ndarray):
        """
        Инициализация фильтра по первой детекции [x, y, a, h].

        Скорости (vx, vy, va) выставляются в 0, а неопределённость по ним
        уже заложена в P (очень большая).
        """
        z = self._ensure_shape_meas(meas_xyah)
        self.x = np.zeros(self.dim_x, dtype=np.float32)
        self.x[:4] = z
        self.P = make_initial_P()
        self.initialized = True

    def predict(self) -> np.ndarray:
        """
        Шаг предсказания:
          x' = F x
          P' = F P F^T + Q

        Возвращает предсказанное измерение в формате [x, y, a, h].
        """
        if not self.initialized:
            raise RuntimeError("KalmanFilterXYAH: вызван predict() до initiate()")

        # x' = F x
        self.x = self.F @ self.x
        # P' = F P F^T + Q
        self.P = self.F @ self.P @ self.F.T + self.Q

        # возвращаем только измеряемую часть
        return self.x[:4].copy()

    def project(self) -> tuple[np.ndarray, np.ndarray]:
        """
        Проецирует текущее состояние в пространство измерений:
          z_pred = H x
          S      = H P H^T + R

        Это будет нужно для Махаланобис-гейтинга на шаге 3.
        """
        if not self.initialized:
            raise RuntimeError("KalmanFilterXYAH: вызван project() до initiate()")

        z_pred = self.H @ self.x
        S = self.H @ self.P @ self.H.T + self.R
        return z_pred, S

    def update(self, meas_xyah: np.ndarray):
        """
        Шаг коррекции (обновления) по новому измерению z = [x, y, a, h].

        Используется на шаге 4 после ассоциации детекций и треков.
        """
        if not self.initialized:
            raise RuntimeError("KalmanFilterXYAH: вызван update() до initiate()")

        z = self._ensure_shape_meas(meas_xyah)

        # Инновация: y = z - H x'
        z_pred, S = self.project()
        y = z - z_pred

        # Калмановский коэффициент: K = P H^T S^{-1}
        K = self.P @ self.H.T @ np.linalg.inv(S)

        # Обновление состояния: x = x' + K y
        self.x = self.x + K @ y

        # Обновление ковариации: P = (I - K H) P
        I = np.eye(self.dim_x, dtype=np.float32)
        self.P = (I - K @ self.H) @ self.P

        # для удобства вернуть обновлённое "измерение"
        return self.x[:4].copy()


In [41]:
import numpy as np

# Пытаемся использовать SciPy, при отсутствии — простой жадный fallback
try:
    from scipy.optimize import linear_sum_assignment as _linear_sum_assignment
    SCIPY_AVAILABLE = True
except Exception:
    SCIPY_AVAILABLE = False
    _linear_sum_assignment = None


def bbox_iou_matrix(boxes1_xyxy: np.ndarray, boxes2_xyxy: np.ndarray) -> np.ndarray:
    """
    Вычисляет IoU для всех пар боксов из boxes1 и boxes2.

    boxes1_xyxy: (T, 4) [x1, y1, x2, y2]
    boxes2_xyxy: (N, 4) [x1, y1, x2, y2]
    → (T, N) матрица IoU.
    """
    boxes1 = np.asarray(boxes1_xyxy, dtype=np.float32)
    boxes2 = np.asarray(boxes2_xyxy, dtype=np.float32)
    T = boxes1.shape[0]
    N = boxes2.shape[0]
    if T == 0 or N == 0:
        return np.zeros((T, N), dtype=np.float32)

    x11 = boxes1[:, 0:1]
    y11 = boxes1[:, 1:2]
    x12 = boxes1[:, 2:3]
    y12 = boxes1[:, 3:4]

    x21 = boxes2[:, 0]
    y21 = boxes2[:, 1]
    x22 = boxes2[:, 2]
    y22 = boxes2[:, 3]

    inter_x1 = np.maximum(x11, x21)  # (T, N)
    inter_y1 = np.maximum(y11, y21)
    inter_x2 = np.minimum(x12, x22)
    inter_y2 = np.minimum(y12, y22)

    inter_w = np.clip(inter_x2 - inter_x1, a_min=0.0, a_max=None)
    inter_h = np.clip(inter_y2 - inter_y1, a_min=0.0, a_max=None)
    inter_area = inter_w * inter_h

    area1 = (x12 - x11) * (y12 - y11)  # (T, 1)
    area2 = (x22 - x21) * (y22 - y21)  # (N,)

    union_area = area1 + area2 - inter_area  # broadcasting → (T, N)
    iou = np.where(union_area > 0.0, inter_area / union_area, 0.0).astype(np.float32)
    return iou


def mahalanobis_gating_matrix(
    tracks_xyah: np.ndarray,
    tracks_S: np.ndarray,
    dets_xyah: np.ndarray
) -> np.ndarray:
    """
    Махаланобисовы расстояния для всех пар (трек, детекция).

    tracks_xyah: (T, 4)  — предсказанное измерение трека [x, y, a, h]
    tracks_S:    (T, 4, 4) — ковариация измерения S = H P H^T + R
    dets_xyah:   (N, 4)  — измерения детекций [x, y, a, h]

    Возвращает матрицу d^2 (T, N), где d^2_ij — расстояние от трека i до детекции j.
    """
    tracks_xyah = np.asarray(tracks_xyah, dtype=np.float32)
    dets_xyah = np.asarray(dets_xyah, dtype=np.float32)
    S = np.asarray(tracks_S, dtype=np.float32)

    T = tracks_xyah.shape[0]
    N = dets_xyah.shape[0]
    if T == 0 or N == 0:
        return np.zeros((T, N), dtype=np.float32)

    # Инвертируем ковариации треков
    S_inv = np.linalg.inv(S)  # (T, 4, 4)

    # diff[t, n, d] = z_det[n, d] - z_pred[t, d]
    diff = dets_xyah[None, :, :] - tracks_xyah[:, None, :]  # (T, N, 4)

    # temp[t, n, k] = Σ_d diff[t,n,d] * S_inv[t,d,k]
    temp = np.einsum("tnd,tdk->tnk", diff, S_inv)
    # d2[t, n]      = Σ_k temp[t,n,k] * diff[t,n,k]
    d2 = np.einsum("tnk,tnk->tn", temp, diff)
    return d2.astype(np.float32)


def linear_assignment(cost_matrix: np.ndarray, cost_limit: float) -> np.ndarray:
    """
    Обёртка над Венгерским алгоритмом.

    cost_matrix: (T, N)  — матрица стоимостей (меньше = лучше)
    cost_limit:  максимальная допустимая стоимость для матча.
                 Если стоимость > cost_limit — пара считается нематчируемой.

    Возвращает массив shape (K, 2), пары индексов (row_idx, col_idx).
    """
    cost = np.asarray(cost_matrix, dtype=np.float32)
    T, N = cost.shape
    if T == 0 or N == 0:
        return np.zeros((0, 2), dtype=np.int32)

    if SCIPY_AVAILABLE:
        row_ind, col_ind = _linear_sum_assignment(cost)
        matches = []
        for r, c in zip(row_ind, col_ind):
            if cost[r, c] <= cost_limit:
                matches.append((int(r), int(c)))
        if not matches:
            return np.zeros((0, 2), dtype=np.int32)
        return np.asarray(matches, dtype=np.int32)

    # Fallback: простой жадный выбор минимальных стоимостей
    matches = []
    used_rows = set()
    used_cols = set()
    while True:
        best_val = float("inf")
        best_r = best_c = None
        for r in range(T):
            if r in used_rows:
                continue
            for c in range(N):
                if c in used_cols:
                    continue
                v = cost[r, c]
                if v < best_val:
                    best_val = v
                    best_r, best_c = r, c
        if best_r is None or best_val > cost_limit:
            break
        matches.append((best_r, best_c))
        used_rows.add(best_r)
        used_cols.add(best_c)

    if not matches:
        return np.zeros((0, 2), dtype=np.int32)
    return np.asarray(matches, dtype=np.int32)


In [42]:
def byte_maha_associate(
    tracks_xyah: np.ndarray,      # (T, 4)  предсказанное измерение трека [x, y, a, h]
    tracks_S: np.ndarray,         # (T, 4, 4) ковариация S каждого трека
    dets_xyxy: np.ndarray,        # (N, 4)  боксы детекций [x1,y1,x2,y2]
    dets_xyah: np.ndarray,        # (N, 4)  измерения детекций [x, y, a, h]
    dets_scores: np.ndarray,      # (N,)    score детекций
    valid_size_mask: np.ndarray,  # (N,) bool — достаточная высота бокса
    inside_roi_mask: np.ndarray,  # (N,) bool — бокс целиком внутри ROI
    *,
    thr_high: float = 0.5,
    thr_low: float = 0.1,
    iou_thresh_high: float = 0.1,
    iou_thresh_low: float = 0.1,
    gate_thresh: float = 9.4877,  # χ²_0.99(dof=4)
    alpha: float = 0.5,
    beta: float = 0.5,
    appearance_cost_high: np.ndarray | None = None,  # (T, N) или None
    track_classes: np.ndarray | None = None,         # (T,)  или None
    det_classes: np.ndarray | None = None,           # (N,)  или None
):
    """
    Ассоциация по схеме:
      A) High-conf: IoU + Махаланобис-гейтинг + Венгерский;
      B) Остаток (tracks × high-conf dets): стоимость α·(1−IoU)+β·d_app;
      C) ByteTrack: оставшиеся треки × low-conf dets по IoU (с гейтингом).

    Важно:
      - На вход подаются ВСЕ детекции текущего кадра;
      - Каких именно детекций мы касаемся, определяют:
           valid_size_mask, inside_roi_mask, thr_high, thr_low.

    Возвращает словарь:
      - matches_high: (K1, 2) пары (track_idx, det_idx) для high-conf (A+B);
      - matches_low:  (K2, 2) пары (track_idx, det_idx) для low-conf (C);
      - unmatched_tracks:        (T_u,) индексы треков без матчей;
      - unmatched_dets_high:     (Nh_u,) индексы high-conf детекций без матчей;
      - unmatched_dets_low:      (Nl_u,) индексы low-conf детекций без матчей.
    """
    tracks_xyah = np.asarray(tracks_xyah, dtype=np.float32)
    tracks_S = np.asarray(tracks_S, dtype=np.float32)
    dets_xyxy = np.asarray(dets_xyxy, dtype=np.float32)
    dets_xyah = np.asarray(dets_xyah, dtype=np.float32)
    dets_scores = np.asarray(dets_scores, dtype=np.float32)
    valid_size_mask = np.asarray(valid_size_mask, dtype=bool)
    inside_roi_mask = np.asarray(inside_roi_mask, dtype=bool)

    T = tracks_xyah.shape[0]
    N = dets_xyxy.shape[0]
    assert dets_xyah.shape[0] == N == dets_scores.shape[0]

    # --- 0. Если треков нет, ничего не ассоциируем ---
    if T == 0:
        # Разбиваем детекции просто по порогам
        det_valid = valid_size_mask & inside_roi_mask & (dets_scores >= thr_low)
        high_mask = det_valid & (dets_scores >= thr_high)
        low_mask = det_valid & (dets_scores < thr_high)

        det_ids_high = np.where(high_mask)[0]
        det_ids_low = np.where(low_mask)[0]

        return {
            "matches_high": np.zeros((0, 2), dtype=np.int32),
            "matches_low": np.zeros((0, 2), dtype=np.int32),
            "unmatched_tracks": np.zeros((0,), dtype=np.int32),
            "unmatched_dets_high": det_ids_high,
            "unmatched_dets_low": det_ids_low,
        }

    # --- 1. Разделяем детекции по ByteTrack-порогам и ROI/размеру ---

    det_valid = valid_size_mask & inside_roi_mask & (dets_scores >= thr_low)
    high_mask = det_valid & (dets_scores >= thr_high)
    low_mask = det_valid & (dets_scores < thr_high)

    det_ids_high = np.where(high_mask)[0]
    det_ids_low = np.where(low_mask)[0]

    # Для удобства
    all_track_ids = np.arange(T, dtype=np.int32)

    # Если нет ни одной валидной детекции
    if det_ids_high.size == 0 and det_ids_low.size == 0:
        return {
            "matches_high": np.zeros((0, 2), dtype=np.int32),
            "matches_low": np.zeros((0, 2), dtype=np.int32),
            "unmatched_tracks": all_track_ids,
            "unmatched_dets_high": det_ids_high,
            "unmatched_dets_low": det_ids_low,
        }

    # Для всех треков заранее посчитаем bbox-форму (XYXY) из XYAH
    def xyah_to_xyxy(boxes_xyah: np.ndarray) -> np.ndarray:
        boxes_xyah = np.asarray(boxes_xyah, dtype=np.float32)
        if boxes_xyah.size == 0:
            return np.zeros((0, 4), dtype=np.float32)
        x = boxes_xyah[:, 0]
        y = boxes_xyah[:, 1]
        a = boxes_xyah[:, 2]
        h = boxes_xyah[:, 3]
        w = a * h
        x1 = x - 0.5 * w
        x2 = x + 0.5 * w
        y1 = y - 0.5 * h
        y2 = y + 0.5 * h
        return np.stack([x1, y1, x2, y2], axis=1).astype(np.float32)

    tracks_xyxy = xyah_to_xyxy(tracks_xyah)

    INF = 1e9

    # ===================================================================
    #  A) High-conf: IoU + Махаланобис-гейтинг
    # ===================================================================

    matches_high_list = []

    matched_tracks_mask = np.zeros(T, dtype=bool)
    matched_high_mask = np.zeros(det_ids_high.shape[0], dtype=bool)

    if det_ids_high.size > 0:
        dets_xyxy_high = dets_xyxy[det_ids_high]
        dets_xyah_high = dets_xyah[det_ids_high]

        # IoU и Махаланобис
        iou_high = bbox_iou_matrix(tracks_xyxy, dets_xyxy_high)  # (T, Nh)
        maha_high = mahalanobis_gating_matrix(tracks_xyah, tracks_S, dets_xyah_high)

        valid_pair = (maha_high <= gate_thresh) & (iou_high >= iou_thresh_high)

        cost_high = 1.0 - iou_high
        cost_high[~valid_pair] = INF

        # Классовые ограничения (если заданы)
        if track_classes is not None and det_classes is not None:
            track_classes = np.asarray(track_classes)
            det_classes_high = np.asarray(det_classes)[det_ids_high]
            class_mismatch = (track_classes[:, None] != det_classes_high[None, :])
            cost_high[class_mismatch] = INF

        # Ограничиваем стоимость реальных матчей диапазоном [0, 1]
        matches_stage1_local = linear_assignment(cost_high, cost_limit=1.0)

        for t_local, d_local in matches_stage1_local:
            matched_tracks_mask[t_local] = True
            matched_high_mask[d_local] = True
            t_global = all_track_ids[t_local]
            d_global = det_ids_high[d_local]
            matches_high_list.append((t_global, d_global))

    # --- треки и high-dets, оставшиеся после шага A ---
    unmatched_tracks_A = all_track_ids[~matched_tracks_mask]
    unmatched_high_ids_A = det_ids_high[~matched_high_mask]

    # ===================================================================
    #  B) Остаток (tracks × high-conf) с cost = α·(1−IoU)+β·d_app
    # ===================================================================

    if unmatched_tracks_A.size > 0 and unmatched_high_ids_A.size > 0:
        t_ids_B = unmatched_tracks_A
        d_ids_B = unmatched_high_ids_A

        tracks_xyah_B = tracks_xyah[t_ids_B]
        tracks_xyxy_B = xyah_to_xyxy(tracks_xyah_B)
        tracks_S_B = tracks_S[t_ids_B]

        dets_xyxy_B = dets_xyxy[d_ids_B]
        dets_xyah_B = dets_xyah[d_ids_B]

        iou_B = bbox_iou_matrix(tracks_xyxy_B, dets_xyxy_B)
        maha_B = mahalanobis_gating_matrix(tracks_xyah_B, tracks_S_B, dets_xyah_B)

        valid_pair_B = (maha_B <= gate_thresh) & (iou_B >= iou_thresh_high)

        # appearance cost, если задана полная матрица (T, N)
        if appearance_cost_high is not None:
            appearance_cost_high = np.asarray(appearance_cost_high, dtype=np.float32)
            d_app_B = appearance_cost_high[np.ix_(t_ids_B, d_ids_B)]
        else:
            d_app_B = np.zeros_like(iou_B, dtype=np.float32)

        cost_B = alpha * (1.0 - iou_B) + beta * d_app_B
        cost_B[~valid_pair_B] = INF

        # Классовые ограничения (если заданы)
        if track_classes is not None and det_classes is not None:
            track_classes_B = np.asarray(track_classes)[t_ids_B]
            det_classes_B = np.asarray(det_classes)[d_ids_B]
            class_mismatch_B = (track_classes_B[:, None] != det_classes_B[None, :])
            cost_B[class_mismatch_B] = INF

        matches_stage2_local = linear_assignment(cost_B, cost_limit=INF - 1.0)

        for idx in range(matches_stage2_local.shape[0]):
            t_loc_B, d_loc_B = matches_stage2_local[idx]
            t_global = t_ids_B[t_loc_B]
            d_global = d_ids_B[d_loc_B]

            # помечаем как заматченные
            matched_tracks_mask[t_global] = True

            # найдём позицию d_global в det_ids_high, чтобы отметить matched_high_mask
            # (unmatched_high_ids_A — подмножество det_ids_high)
            high_pos = np.where(det_ids_high == d_global)[0]
            if high_pos.size > 0:
                matched_high_mask[high_pos[0]] = True

            matches_high_list.append((t_global, d_global))

    # --- high-conf детекции, не использованные ни в A, ни в B ---
    unmatched_high_ids_final = det_ids_high[~matched_high_mask]

    # ===================================================================
    #  C) ByteTrack: оставшиеся треки × low-conf детекции
    # ===================================================================

    matches_low_list = []
    matched_low_mask = np.zeros(det_ids_low.shape[0], dtype=bool)

    unmatched_tracks_after_AB = all_track_ids[~matched_tracks_mask]

    if unmatched_tracks_after_AB.size > 0 and det_ids_low.size > 0:
        t_ids_C = unmatched_tracks_after_AB
        d_ids_C = det_ids_low

        tracks_xyah_C = tracks_xyah[t_ids_C]
        tracks_xyxy_C = xyah_to_xyxy(tracks_xyah_C)
        tracks_S_C = tracks_S[t_ids_C]

        dets_xyxy_C = dets_xyxy[d_ids_C]
        dets_xyah_C = dets_xyah[d_ids_C]

        iou_C = bbox_iou_matrix(tracks_xyxy_C, dets_xyxy_C)
        maha_C = mahalanobis_gating_matrix(tracks_xyah_C, tracks_S_C, dets_xyah_C)

        valid_pair_C = (maha_C <= gate_thresh) & (iou_C >= iou_thresh_low)

        cost_C = 1.0 - iou_C
        cost_C[~valid_pair_C] = INF

        # Классовые ограничения (если заданы)
        if track_classes is not None and det_classes is not None:
            track_classes_C = np.asarray(track_classes)[t_ids_C]
            det_classes_C = np.asarray(det_classes)[d_ids_C]
            class_mismatch_C = (track_classes_C[:, None] != det_classes_C[None, :])
            cost_C[class_mismatch_C] = INF

        matches_stage3_local = linear_assignment(cost_C, cost_limit=1.0)

        for idx in range(matches_stage3_local.shape[0]):
            t_loc_C, d_loc_C = matches_stage3_local[idx]
            t_global = t_ids_C[t_loc_C]
            d_global = d_ids_C[d_loc_C]

            matches_low_list.append((t_global, d_global))

            # отметим конкретный low-conf детектор как использованный
            low_pos = np.where(det_ids_low == d_global)[0]
            if low_pos.size > 0:
                matched_low_mask[low_pos[0]] = True

    # ===================================================================
    #  Итоговые множества unmatched
    # ===================================================================

    matches_high = (
        np.asarray(matches_high_list, dtype=np.int32)
        if matches_high_list else np.zeros((0, 2), dtype=np.int32)
    )
    matches_low = (
        np.asarray(matches_low_list, dtype=np.int32)
        if matches_low_list else np.zeros((0, 2), dtype=np.int32)
    )

    # треки, которые не получили ни high-, ни low-match
    if matches_high.shape[0] > 0:
        matched_tracks_high = np.unique(matches_high[:, 0])
    else:
        matched_tracks_high = np.zeros((0,), dtype=np.int32)

    if matches_low.shape[0] > 0:
        matched_tracks_low = np.unique(matches_low[:, 0])
    else:
        matched_tracks_low = np.zeros((0,), dtype=np.int32)

    matched_tracks_all = np.union1d(matched_tracks_high, matched_tracks_low)
    unmatched_tracks = np.setdiff1d(all_track_ids, matched_tracks_all, assume_unique=True)

    # unmatched dets (high/low)
    unmatched_dets_high = unmatched_high_ids_final
    unmatched_dets_low = det_ids_low[~matched_low_mask]

    return {
        "matches_high": matches_high,
        "matches_low": matches_low,
        "unmatched_tracks": unmatched_tracks,
        "unmatched_dets_high": unmatched_dets_high,
        "unmatched_dets_low": unmatched_dets_low,
    }


In [43]:
import numpy as np

# Классы, для которых мы используем appearance (ReID)
# COCO: 0 = person, 2 = car, 7 = truck и т.п.
REID_ENABLED_CLASSES = {0}  # пока только person

# Сколько последних эмбеддингов учитывать на треке
REID_MAX_FEATURES = 20


In [None]:
from dataclasses import dataclass, field
from enum import IntEnum
import numpy as np

class TrackState(IntEnum):
    TENTATIVE = 0
    CONFIRMED = 1
    LOST = 2
    REMOVED = 3


@dataclass
class Track:
    track_id: int
    kf: "KalmanFilterXYAH"
    class_id: int
    n_init: int
    max_time_lost: int

    state: TrackState = TrackState.TENTATIVE
    hits: int = 1
    age: int = 1
    time_since_update: int = 0
    score: float = 0.0

    last_xyah: np.ndarray = field(default_factory=lambda: np.zeros(4, dtype=np.float32))
    last_S:   np.ndarray = field(default_factory=lambda: np.eye(4, dtype=np.float32))

    trajectory_xyxy: list = field(default_factory=list)
    features: list = field(default_factory=list)  # список np.ndarray (appearance-фичи)

    # --- состояния ---

    def is_tentative(self) -> bool:
        return self.state == TrackState.TENTATIVE

    def is_confirmed(self) -> bool:
        return self.state == TrackState.CONFIRMED

    def is_lost(self) -> bool:
        return self.state == TrackState.LOST

    def is_removed(self) -> bool:
        return self.state == TrackState.REMOVED

    # --- инициализация ---

    def initiate_from_detection(self, meas_xyah: np.ndarray, score: float, feature: np.ndarray | None = None):
        self.kf.initiate(meas_xyah)
        z_pred, S = self.kf.project()
        self.last_xyah = z_pred
        self.last_S = S
        self.score = float(score)
        self.age = 1
        self.hits = 1
        self.time_since_update = 0

        xyxy = xyah_to_xyxy(self.last_xyah[None, :])[0]
        self.trajectory_xyxy.append(xyxy)

        if feature is not None:
            self.features.append(feature)

    # --- предсказание ---

    def predict(self):
        if not self.kf.initialized:
            return

        self.kf.predict()
        z_pred, S = self.kf.project()
        self.last_xyah = z_pred
        self.last_S = S

        self.age += 1
        self.time_since_update += 1

        xyxy = xyah_to_xyxy(self.last_xyah[None, :])[0]
        self.trajectory_xyxy.append(xyxy)

    # --- обновление по детекции ---

    def update(
        self,
        meas_xyah: np.ndarray,
        score: float,
        feature: np.ndarray | None = None,
        use_kalman: bool = True,
    ):
        self.time_since_update = 0
        self.hits += 1
        self.score = float(score)

        if use_kalman:
            self.kf.update(meas_xyah)

        z_pred, S = self.kf.project()
        self.last_xyah = z_pred
        self.last_S = S

        xyxy = xyah_to_xyxy(self.last_xyah[None, :])[0]
        if self.trajectory_xyxy:
            self.trajectory_xyxy[-1] = xyxy
        else:
            self.trajectory_xyxy.append(xyxy)

        if feature is not None:
            self.features.append(feature)

        if self.state == TrackState.TENTATIVE and self.hits >= self.n_init:
            self.state = TrackState.CONFIRMED
        elif self.state == TrackState.LOST:
            self.state = TrackState.CONFIRMED

    # --- "пропуск" кадра ---

    def mark_missed(self):
        if self.state == TrackState.TENTATIVE:
            self.state = TrackState.REMOVED
        elif self.state in (TrackState.CONFIRMED, TrackState.LOST):
            if self.time_since_update > self.max_time_lost:
                self.state = TrackState.REMOVED
            else:
                self.state = TrackState.LOST

    # --- геттеры ---

    def current_xyxy(self) -> np.ndarray:
        return xyah_to_xyxy(self.last_xyah[None, :])[0]

    # --- ID-банк (усреднение последних K фич) ---

    def get_feature_centroid(self, max_k: int = REID_MAX_FEATURES) -> np.ndarray | None:
        """
        Возвращает L2-нормированный центроид последних max_k appearance-фич
        или None, если фич ещё нет.
        """
        if not self.features:
            return None

        feats = self.features[-int(max_k):]
        arr = np.stack(feats, axis=0).astype(np.float32)

        # на всякий случай ещё раз L2-нормируем каждую фичу
        norms = np.linalg.norm(arr, axis=1, keepdims=True) + 1e-12
        arr = arr / norms

        centroid = arr.mean(axis=0)
        c_norm = float(np.linalg.norm(centroid))
        if c_norm > 0.0:
            centroid /= c_norm
        return centroid
    
    


In [45]:
def compute_appearance_cost_matrix(
    tracks: list[Track],
    det_classes: np.ndarray,
    det_features: np.ndarray,
    enabled_classes: set[int] = REID_ENABLED_CLASSES,
    max_features: int = REID_MAX_FEATURES,
) -> np.ndarray:
    """
    Строит матрицу d_app (T, N) по косинусной дистанции:
      d_app = 1 - cos_sim(track_centroid, det_feature).

    Для пар (трек, детекция), где:
      - класс не в enabled_classes,
      - или классы не совпадают,
      - или у трека нет фич
    → d_app = 0 (appearance там не используется, решает motion).
    """
    if not tracks:
        return np.zeros((0, det_features.shape[0]), dtype=np.float32)

    det_classes = np.asarray(det_classes, dtype=np.int32)
    det_features = np.asarray(det_features, dtype=np.float32)

    T = len(tracks)
    N = det_features.shape[0]

    if N == 0:
        return np.zeros((T, 0), dtype=np.float32)

    # предполагаем, что det_features уже L2-нормированы (как в ColorHistAppearance)
    cost_app = np.zeros((T, N), dtype=np.float32)

    for ti, tr in enumerate(tracks):
        if tr.class_id not in enabled_classes:
            continue

        centroid = tr.get_feature_centroid(max_k=max_features)
        if centroid is None:
            continue

        # только детекции того же класса
        same_cls = (det_classes == tr.class_id)
        idxs = np.where(same_cls)[0]
        if idxs.size == 0:
            continue

        # cos_sim = <track_centroid, det_feature>
        sims = det_features[idxs] @ centroid  # (K,)
        sims = np.clip(sims, -1.0, 1.0)
        d_app = 1.0 - sims  # 0 — "очень похожи", 2 — совсем разные

        cost_app[ti, idxs] = d_app.astype(np.float32)

    return cost_app


In [46]:
class Tracker:
    """
    Трекер DeepSORT/OC-SORT-стиля с appearance:

      - Калман [x,y,a,h,vx,vy,va]
      - ByteTrack + Махаланобис + Венгерский
      - Жизненный цикл треков
      - ID-банк appearance (усреднение последних фич)
    """

    def __init__(
        self,
        fps: float,
        n_init: int = 3,
        max_time_lost: int = 30,
        thr_high: float = 0.5,
        thr_low: float = 0.1,
        iou_thresh_high: float = 0.1,
        iou_thresh_low: float = 0.1,
        gate_thresh: float = 9.4877,
        alpha: float = 0.5,
        beta: float = 0.5,
        track_conf_thr: float = 0.85,
    ):
        self.dt = 1.0 / float(fps)
        self.n_init = int(n_init)
        self.max_time_lost = int(max_time_lost)

        self.thr_high = float(thr_high)
        self.thr_low = float(thr_low)
        self.iou_thresh_high = float(iou_thresh_high)
        self.iou_thresh_low = float(iou_thresh_low)
        self.gate_thresh = float(gate_thresh)
        self.alpha = float(alpha)
        self.beta = float(beta)
        self.track_conf_thr = float(track_conf_thr)

        self.tracks: list[Track] = []
        self._next_id: int = 1

    # --- создание нового трека ---

    def _spawn_track(
        self,
        meas_xyah: np.ndarray,
        score: float,
        class_id: int,
        feature: np.ndarray | None = None,
    ):
        kf = KalmanFilterXYAH(dt=self.dt)
        tr = Track(
            track_id=self._next_id,
            kf=kf,
            class_id=int(class_id),
            n_init=self.n_init,
            max_time_lost=self.max_time_lost,
        )
        tr.initiate_from_detection(meas_xyah, score, feature=feature)
        self.tracks.append(tr)
        self._next_id += 1

    # --- основной шаг ---

    def update(
        self,
        det: dict,
        det_features: np.ndarray | None = None,
    ):
        """
        Обновление трекера по детекциям одного кадра.

        det — словарь от run_yolo10_on_frame(...):
            "xyxy", "scores", "cls",
            "valid_size_mask", "inside_roi_mask", ...

        det_features — (N, D) appearance-фичи детекций (L2-нормированные).
                       Если None, ассоциация работает только по motion/IoU.
        """
        # 1. Удаляем REMOVED-треки
        self.tracks = [t for t in self.tracks if not t.is_removed()]

        # 2. Предсказание
        for t in self.tracks:
            t.predict()

        T = len(self.tracks)

        if T > 0:
            tracks_xyah = np.stack([t.last_xyah for t in self.tracks], axis=0)
            tracks_S = np.stack([t.last_S for t in self.tracks], axis=0)
            track_classes = np.array([t.class_id for t in self.tracks], dtype=np.int32)
        else:
            tracks_xyah = np.zeros((0, 4), dtype=np.float32)
            tracks_S = np.zeros((0, 4, 4), dtype=np.float32)
            track_classes = np.zeros((0,), dtype=np.int32)

        # 3. Детекции кадра
        dets_xyxy = np.asarray(det["xyxy"], dtype=np.float32)
        scores = np.asarray(det["scores"], dtype=np.float32)
        det_classes = np.asarray(det["cls"], dtype=np.int32)
        valid_size_mask = np.asarray(det["valid_size_mask"], dtype=bool)
        inside_roi_mask = np.asarray(det["inside_roi_mask"], dtype=bool)

        N = dets_xyxy.shape[0]

        if N == 0:
            for t in self.tracks:
                t.mark_missed()
            self.tracks = [t for t in self.tracks if not t.is_removed()]
            return self.tracks

        dets_xyah = xyxy_to_xyah(dets_xyxy)

        # 4. appearance-cost (ReID) для high-conf части
        appearance_cost_high = None
        if det_features is not None and T > 0:
            appearance_cost_high = compute_appearance_cost_matrix(
                tracks=self.tracks,
                det_classes=det_classes,
                det_features=det_features,
                enabled_classes=REID_ENABLED_CLASSES,
                max_features=REID_MAX_FEATURES,
            )

        # 5. Ассоциация
        assoc = byte_maha_associate(
            tracks_xyah=tracks_xyah,
            tracks_S=tracks_S,
            dets_xyxy=dets_xyxy,
            dets_xyah=dets_xyah,
            dets_scores=scores,
            valid_size_mask=valid_size_mask,
            inside_roi_mask=inside_roi_mask,
            thr_high=self.thr_high,
            thr_low=self.thr_low,
            iou_thresh_high=self.iou_thresh_high,
            iou_thresh_low=self.iou_thresh_low,
            gate_thresh=self.gate_thresh,
            alpha=self.alpha,
            beta=self.beta,
            appearance_cost_high=appearance_cost_high,
            track_classes=track_classes,
            det_classes=det_classes,
        )

        matches_high = assoc["matches_high"]
        matches_low  = assoc["matches_low"]
        unmatched_tracks_idx = assoc["unmatched_tracks"]
        unmatched_dets_high = assoc["unmatched_dets_high"]
        unmatched_dets_low = assoc["unmatched_dets_low"]

        matched_tracks = np.zeros(T, dtype=bool)

        # 6. High-conf матчи
        for t_idx, d_idx in matches_high:
            matched_tracks[t_idx] = True
            meas_xyah = dets_xyah[d_idx]
            sc = scores[d_idx]
            feat = None
            if det_features is not None:
                feat = det_features[d_idx]

            use_kalman = (sc >= self.track_conf_thr)

            self.tracks[t_idx].update(
                meas_xyah,
                score=sc,
                feature=feat,
                use_kalman=use_kalman,
            )

        # 7. Low-conf ByteTrack-цепочка
        for t_idx, d_idx in matches_low:
            matched_tracks[t_idx] = True
            meas_xyah = dets_xyah[d_idx]
            sc = scores[d_idx]
            feat = None
            if det_features is not None:
                feat = det_features[d_idx]

            use_kalman = (sc >= self.track_conf_thr)

            self.tracks[t_idx].update(
                meas_xyah,
                score=sc,
                feature=feat,
                use_kalman=use_kalman,
            )

        # 8. Треки без матча
        for t_idx, tr in enumerate(self.tracks):
            if not matched_tracks[t_idx]:
                tr.mark_missed()

        # 9. Новые треки из unmatched high-conf детекций
        for d_idx in unmatched_dets_high:
            meas_xyah = dets_xyah[d_idx]
            sc = scores[d_idx]
            class_id = det_classes[d_idx]
            feat = None
            if det_features is not None:
                feat = det_features[d_idx]
            self._spawn_track(meas_xyah, sc, int(class_id), feature=feat)

        # 10. Чистка REMOVED
        self.tracks = [t for t in self.tracks if not t.is_removed()]
        return self.tracks


In [47]:
# Пример: инициализация трекера (после определения fps)
orig_w, orig_h, fps = get_video_info(video_path)
tracker = Tracker(
    fps=fps,
    n_init=3,
    max_time_lost=30,
    thr_high=0.5,
    thr_low=0.1,
    track_conf_thr=TRACK_CONF_THR,  # тот же, что в детекторе
)

# В основном цикле по кадрам:
# for idx, frame in enumerate(gen):
#     det = run_yolo10_on_frame(frame, conf_thr=0.25)
#     tracks = tracker.update(det)
#
#     # дальше можно рисовать треки вместо "сырых" боксов:
#     # например, только CONFIRMED:
#     # for tr in tracks:
#     #     if tr.is_confirmed():
#     #         box = tr.current_xyxy()
#     #         track_id = tr.track_id
#     #         class_id = tr.class_id
#     #         ...


In [48]:
import cv2
import numpy as np

def draw_tracks_on_frame(
    frame_bgr: np.ndarray,
    tracks: list,
    draw_tentative: bool = True,
    draw_lost: bool = False,
    traj_len: int = 30,
):
    """
    Рисует треки поверх кадра.

    Цвета по состоянию трека:
      - CONFIRMED — ярко-зелёный
      - TENTATIVE — жёлтый
      - LOST      — фиолетовый (опционально, draw_lost=True)
    Толщина линий = 1, fontScale = 0.42 (как ты просил).

    Дополнительно:
      - рисуем короткую траекторию (последние traj_len центров бокса).
    """
    img = frame_bgr.copy()

    for tr in tracks:
        if tr.is_removed():
            continue

        if tr.is_confirmed():
            color = (0, 255, 0)      # зелёный
        elif tr.is_tentative():
            if not draw_tentative:
                continue
            color = (0, 255, 255)    # жёлтый
        elif tr.is_lost():
            if not draw_lost:
                continue
            color = (255, 0, 255)    # фиолетовый
        else:
            # на всякий случай
            color = (255, 255, 255)

        # Текущий бокс
        box_xyxy = tr.current_xyxy()
        x1, y1, x2, y2 = box_xyxy
        p1 = (int(x1), int(y1))
        p2 = (int(x2), int(y2))

        # Рисуем рамку трека
        cv2.rectangle(img, p1, p2, color, 1)

        # Подпись: ID и класс
        label = f"ID {tr.track_id} | c{tr.class_id}"
        cv2.putText(
            img,
            label,
            (int(x1), int(y1) - 7),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.42,
            color,
            1,
            cv2.LINE_AA,
        )

        # Траектория (центры боксов за последние traj_len шагов)
        if tr.trajectory_xyxy:
            traj = tr.trajectory_xyxy[-traj_len:]
            pts = []
            for bx in traj:
                cx = 0.5 * (bx[0] + bx[2])
                cy = 0.5 * (bx[1] + bx[3])
                pts.append((int(cx), int(cy)))

            if len(pts) >= 2:
                pts_arr = np.array(pts, dtype=np.int32).reshape((-1, 1, 2))
                cv2.polylines(img, [pts_arr], isClosed=False, color=color, thickness=1)

    return img


In [49]:
# Путь к входному видео и выходному видео с треками
# video_path = r"vid.mp4"
# output_path_1_4 = r"output_tracks(1_4).mp4"

# # 1. Узнаём fps и размер
# orig_w, orig_h, fps = get_video_info(video_path)
# print(f"Видео: {orig_w}x{orig_h}, fps={fps:.3f}")

# # 2. Инициализируем трекер
# tracker = Tracker(
#     fps=fps,
#     n_init=3,
#     max_time_lost=30,
#     thr_high=0.5,
#     thr_low=0.1,
#     iou_thresh_high=0.3,   # было 0.1
#     iou_thresh_low=0.1,
#     gate_thresh=5.99,      # вместо 9.48
#     alpha=0.5,
#     beta=0.5,
#     track_conf_thr=TRACK_CONF_THR,
# )


# # 3. ffmpeg-генератор и writer
# gen = ffmpeg_frame_generator(video_path, resize_to=None)
# writer_proc = create_ffmpeg_writer(output_path_1_4, orig_w, orig_h, fps)

# try:
#     for idx, frame in enumerate(gen):
#         # --- Шаг 1: детекция ---
#         det = run_yolo10_on_frame(frame, conf_thr=0.25)

#         # --- Шаги 2–4: Калман + ассоциация + жизненный цикл ---
#         tracks = tracker.update(det)

#         # --- Визуализация ---
#         # Сначала нарисуем ROI и детекции (цвета по твоей логике)
#         frame_det = draw_detections(frame, det)
#         # Сверху нарисуем треки (ID, траектории)
#         frame_out = draw_tracks_on_frame(frame_det, tracks)

#         # Записываем кадр в ffmpeg
#         writer_proc.stdin.write(frame_out.tobytes())

#         if idx % 50 == 0:
#             print(f"Обработан кадр {idx}")
# finally:
#     if writer_proc.stdin is not None:
#         writer_proc.stdin.close()
#     ret = writer_proc.wait()
#     print("ffmpeg завершился с кодом", ret)
#     print("Готово, выходное видео:", output_path)


In [50]:
import cv2
import numpy as np

class ColorHistAppearance:
    """
    Очень простой appearance-признак:
      - обрезаем bbox
      - переводим в HSV
      - считаем 2D гистограмму по (H, S)
      - нормируем (L1 + L2)
    На выходе: L2-нормированный вектор длины h_bins * s_bins.
    """

    def __init__(self, h_bins: int = 16, s_bins: int = 16):
        self.h_bins = int(h_bins)
        self.s_bins = int(s_bins)
        self.dim = self.h_bins * self.s_bins

    def extract_for_detections(self, frame_bgr: np.ndarray, det: dict) -> np.ndarray:
        """
        det["xyxy"]: (N,4) — боксы.
        Возвращает (N, D) массив эмбеддингов (D = self.dim), L2-нормированных.
        Для некорректных/слишком маленьких боксов — нулевой вектор.
        """
        boxes = np.asarray(det["xyxy"], dtype=np.float32)
        N = boxes.shape[0]
        if N == 0:
            return np.zeros((0, self.dim), dtype=np.float32)

        H, W, _ = frame_bgr.shape
        feats = []

        for (x1, y1, x2, y2) in boxes:
            x1_i = int(round(x1))
            y1_i = int(round(y1))
            x2_i = int(round(x2))
            y2_i = int(round(y2))

            x1_i = max(0, min(W - 1, x1_i))
            x2_i = max(0, min(W, x2_i))
            y1_i = max(0, min(H - 1, y1_i))
            y2_i = max(0, min(H, y2_i))

            # совсем маленькие или вырожденные боксы
            if x2_i <= x1_i + 1 or y2_i <= y1_i + 1:
                feats.append(np.zeros(self.dim, dtype=np.float32))
                continue

            crop = frame_bgr[y1_i:y2_i, x1_i:x2_i]
            hsv = cv2.cvtColor(crop, cv2.COLOR_BGR2HSV)

            hist = cv2.calcHist(
                [hsv],
                channels=[0, 1],
                mask=None,
                histSize=[self.h_bins, self.s_bins],
                ranges=[0, 180, 0, 256],
            )
            hist = hist.flatten().astype(np.float32)

            # L1-нормировка
            s = float(hist.sum())
            if s > 0.0:
                hist /= s

            # L2-нормировка
            norm = float(np.linalg.norm(hist))
            if norm > 0.0:
                hist /= norm

            feats.append(hist)

        feats = np.stack(feats, axis=0)
        return feats


In [51]:
# Инициализация
output_path_1_5 = r"output_1_5.mp4"
orig_w, orig_h, fps = get_video_info(video_path)
print(f"Видео: {orig_w}x{orig_h}, fps={fps:.3f}")

tracker = Tracker(
    fps=fps,
    n_init=3,
    max_time_lost=30,
    thr_high=0.5,
    thr_low=0.1,
    iou_thresh_high=0.3,   # чуть жёстче, чем 0.1
    iou_thresh_low=0.1,
    gate_thresh=5.99,      # можно немного ужать гейтинг
    alpha=0.5,
    beta=0.5,
    track_conf_thr=TRACK_CONF_THR,
)

appearance_model = ColorHistAppearance(h_bins=16, s_bins=16)

gen = ffmpeg_frame_generator(video_path, resize_to=None)
writer_proc = create_ffmpeg_writer(output_path_1_5, orig_w, orig_h, fps)

try:
    for idx, frame in enumerate(gen):
        # 1) детектор
        det = run_yolo10_on_frame(frame, conf_thr=0.25)

        # 2) appearance-фичи для всех детекций этого кадра
        det_features = appearance_model.extract_for_detections(frame, det)

        # 3) трекер с учётом appearance
        tracks = tracker.update(det, det_features=det_features)

        # 4) визуализация
        frame_det = draw_detections(frame, det)
        frame_out = draw_tracks_on_frame(frame_det, tracks)

        writer_proc.stdin.write(frame_out.tobytes())

        if idx % 50 == 0:
            print(f"Обработан кадр {idx}")
finally:
    if writer_proc.stdin is not None:
        writer_proc.stdin.close()
    ret = writer_proc.wait()
    print("ffmpeg завершился с кодом", ret)
    print("Готово, выходное видео:", output_path_1_5)


Видео: 1280x720, fps=25.000
Видео: 1280x720, fps=25.000
Обработан кадр 0
Обработан кадр 50
Обработан кадр 100
Обработан кадр 150
Обработан кадр 200
Обработан кадр 250
Обработан кадр 300
Обработан кадр 350
Обработан кадр 400
Обработан кадр 450
Обработан кадр 500
Обработан кадр 550
Обработан кадр 600
Обработан кадр 650
Обработан кадр 700
Обработан кадр 750
Обработан кадр 800
Обработан кадр 850
Обработан кадр 900
Обработан кадр 950
Обработан кадр 1000
Обработан кадр 1050
Обработан кадр 1100
Обработан кадр 1150
ffmpeg завершился с кодом 0
Готово, выходное видео: output_1_5.mp4
