In [None]:
%pip install numpy ultralytics scipy matplotlib scikit-learn jupyter
%pip install opencv-contrib-python

# Imports

In [None]:
import cv2
import numpy as np
import matplotlib.pyplot as plt
from scipy.optimize import linear_sum_assignment
from ultralytics import YOLO
import math
import csv
import os

# segmentation.py

In [None]:
class YoloSegmenter:
    def __init__(self, model_path: str):
        """
        Инициализация модели сегментации.
        model_path – путь к файлу модели.
        """
        self.model = YOLO(model_path)  # Загружаем модель через ultralytics

    def segment_frame(self, frame: np.ndarray) -> list:
        """
        Обрабатывает один кадр и возвращает список детекций.
        Каждая детекция – словарь с ключами:
            'bbox': [x1, y1, x2, y2],
            'mask': np.ndarray бинарная маска (значения 0 или 1),
            'class': int (0 – пузырь в фокусе, 1 – пузырь вне фокуса),
            'confidence': float
        """
        # Выполняем инференс для одного кадра
        results = self.model(frame)
        result = results[0]  # При обработке одного кадра возвращается список из одного результата

        detections = []

        # Извлекаем bounding boxes, оценки и классы
        if result.boxes is not None:
            boxes = result.boxes.xyxy.cpu().numpy()  # shape: (n, 4)
            scores = result.boxes.conf.cpu().numpy()   # shape: (n,)
            classes = result.boxes.cls.cpu().numpy()   # shape: (n,)
        else:
            boxes, scores, classes = [], [], []

        # Извлекаем маски (если модель обучена на segmentation)
        if result.masks is not None:
            # result.masks.data имеет размер (n, height, width) – значения от 0 до 1
            masks = result.masks.data.cpu().numpy()
        else:
            masks = [None] * len(boxes)

        for i, bbox in enumerate(boxes):
            mask = masks[i] if i < len(masks) else None
            detection = {
                'bbox': bbox.tolist(),
                'mask': (mask > 0.5).astype(np.uint8) if mask is not None else None,
                'class': int(classes[i]),
                'confidence': float(scores[i])
            }
            detections.append(detection)

        return detections


# tracker_bytetrack.py

In [None]:
def iou(bbox1, bbox2):
    """
    Вычисляет IoU (Intersection over Union) для двух bounding box.
    Формат bbox: [x1, y1, x2, y2]
    """
    x1 = max(bbox1[0], bbox2[0])
    y1 = max(bbox1[1], bbox2[1])
    x2 = min(bbox1[2], bbox2[2])
    y2 = min(bbox1[3], bbox2[3])
    inter_area = max(0, x2 - x1) * max(0, y2 - y1)
    area1 = max(0, bbox1[2] - bbox1[0]) * max(0, bbox1[3] - bbox1[1])
    area2 = max(0, bbox2[2] - bbox2[0]) * max(0, bbox2[3] - bbox2[1])
    union_area = area1 + area2 - inter_area
    if union_area <= 0:
        return 0.0
    return inter_area / union_area

class KalmanBoxTracker:
    """
    Отслеживает отдельный объект с использованием полноценного Калмана.
    Состояние: [x, y, s, r, vx, vy, vs]
      x, y – координаты центра,
      s – площадь (масштаб),
      r – соотношение сторон (предполагается относительно стабильным),
      vx, vy, vs – скорости соответствующих параметров.
    """
    count = 0

    def __init__(self, bbox, frame_idx, timestamp, detection=None):
        """
        Инициализация трека по начальному bbox.
        bbox: [x1, y1, x2, y2]
        """
        self.id = KalmanBoxTracker.count
        KalmanBoxTracker.count += 1

        # Преобразуем bbox в измерение: [x, y, s, r]
        x1, y1, x2, y2 = bbox
        w = x2 - x1
        h = y2 - y1
        x = x1 + w / 2.0
        y = y1 + h / 2.0
        s = w * h
        r = w / (h + 1e-6)

        # Инициализируем состояние: [x, y, s, r, vx, vy, vs]
        self.state = np.array([x, y, s, r, 0, 0, 0], dtype=np.float32)
        # Начальная ковариация. Допустим, относительно точны координаты, но неопределенность в площади и особенно в скорости больше.
        self.P = np.diag([10, 10, 100, 10, 1000, 1000, 1000]).astype(np.float32)

        # Шаг времени по умолчанию (если не задан динамически) — 1 единица (1 кадр)
        dt = 1.0  
        # Матрица перехода состояния F для модели постоянной скорости.
        # В дальнейшем будем обновлять компоненты, зависящие от dt.
        self.F = np.array([
            [1, 0, 0, 0, dt,  0,   0],
            [0, 1, 0, 0,  0, dt,   0],
            [0, 0, 1, 0,  0,  0,  dt],
            [0, 0, 0, 1,  0,  0,   0],
            [0, 0, 0, 0,  1,  0,   0],
            [0, 0, 0, 0,  0,  1,   0],
            [0, 0, 0, 0,  0,  0,   1]
        ], dtype=np.float32)

        # Матрица наблюдения H (мы измеряем только [x, y, s, r])
        self.H = np.array([
            [1, 0, 0, 0, 0, 0, 0],
            [0, 1, 0, 0, 0, 0, 0],
            [0, 0, 1, 0, 0, 0, 0],
            [0, 0, 0, 1, 0, 0, 0]
        ], dtype=np.float32)

        # Матрица шума процесса Q — увеличим шум для скоростных компонент.
        self.Q = np.diag([0.1, 0.1, 0.1, 0.001, 50, 50, 50]).astype(np.float32)
        # Матрица шума измерения R — предполагаем, что измерения x, y относительно точны, площадь менее точна.
        self.R = np.diag([0.5, 0.5, 10, 0.01]).astype(np.float32)

        self.frame_idx = frame_idx
        self.timestamp = timestamp
        self.time_since_update = 0
        self.history = [bbox]
        self.detection = detection

    def predict(self, dt=None):
        """
        Выполняет предсказание нового состояния.
        Если dt не задан, предполагаем dt = 1.0.
        Если задан, обновляем матрицу перехода F.
        """
        if dt is None:
            dt = 1.0
        # Обновляем dt в матрице перехода F
        self.F[0, 4] = dt
        self.F[1, 5] = dt
        self.F[2, 6] = dt

        self.state = np.dot(self.F, self.state)
        self.P = np.dot(np.dot(self.F, self.P), self.F.T) + self.Q
        self.time_since_update += 1
        return self.get_state()

    def update(self, bbox, frame_idx, timestamp, detection=None):
        """
        Обновление состояния по новой детекции с использованием стандартного уравнения Калмана.
        dt рассчитывается как разница между текущей временной меткой и предыдущей.
        """
        # Рассчитываем dt по времени между текущей и предыдущей детекцией.
        dt = timestamp - self.timestamp if self.timestamp is not None else 1.0
        if dt <= 0:
            dt = 1.0

        # Обновляем матрицу перехода F с учетом dt
        self.F[0, 4] = dt
        self.F[1, 5] = dt
        self.F[2, 6] = dt

        # Преобразуем bbox в измерение: [x, y, s, r]
        x1, y1, x2, y2 = bbox
        w = x2 - x1
        h = y2 - y1
        x = x1 + w / 2.0
        y = y1 + h / 2.0
        s = w * h
        r = w / (h + 1e-6)
        z = np.array([x, y, s, r], dtype=np.float32)

        # Измеренная инновация: y = z - Hx
        y_meas = z - np.dot(self.H, self.state)
        # Инновационная ковариация: S = HPH^T + R
        S = np.dot(np.dot(self.H, self.P), self.H.T) + self.R
        # Оптимальное усиление Калмана: K = PH^T S^-1
        K = np.dot(np.dot(self.P, self.H.T), np.linalg.inv(S))
        # Обновление состояния: x = x + Ky
        self.state = self.state + np.dot(K, y_meas)
        # Обновление ковариации: P = (I - KH)P
        I = np.eye(self.F.shape[0], dtype=np.float32)
        self.P = np.dot((I - np.dot(K, self.H)), self.P)

        self.time_since_update = 0
        self.frame_idx = frame_idx
        self.timestamp = timestamp
        self.history.append(bbox)
        self.detection = detection

    def get_state(self):
        """
        Возвращает текущий bbox в формате [x1, y1, x2, y2] на основании текущего состояния.
        Производится защита от отрицательных значений для s и r.
        """
        x, y, s, r = self.state[0:4]
        s = max(s, 1e-6)
        r = max(r, 1e-6)
        w = math.sqrt(s * r)
        h = s / (w + 1e-6)
        x1 = x - w / 2.0
        y1 = y - h / 2.0
        x2 = x + w / 2.0
        y2 = y + h / 2.0
        return [x1, y1, x2, y2]


class ByteTracker:
    def __init__(self, high_thresh=0.6, low_thresh=0.1, max_time_lost=10, iou_threshold=0.2, distance_threshold=50):
        """
        Инициализация ByteTracker.
          - high_thresh: порог для высокодоверенных детекций.
          - low_thresh: порог для низкодоверенных детекций.
          - max_time_lost: число кадров, в течение которых трек может оставаться без обновления.
          - iou_threshold: порог IoU для сопоставления детекций с треками.
          - distance_threshold: максимальное допустимое расстояние между центрами bbox для сопоставления.
        """
        self.high_thresh = high_thresh
        self.low_thresh = low_thresh
        self.max_time_lost = max_time_lost
        self.iou_threshold = iou_threshold
        self.distance_threshold = distance_threshold
        self.trackers = []
        self.finished_tracks = []  # Для хранения завершённых треков

    def update(self, detections, frame_idx, timestamp):
        """
        Обновляет трекер по новым детекциям.
        detections: список словарей с ключами 'bbox', 'confidence', 'mask', и т.д.
        Возвращает словарь активных треков: {tracker_id: tracker_instance}.
        """
        # Разбиваем детекции на высокодоверенные и низкодоверенные
        high_detections = [det for det in detections if det['confidence'] >= self.high_thresh]
        low_detections = [det for det in detections if self.low_thresh <= det['confidence'] < self.high_thresh]

        high_boxes = np.array([det['bbox'] for det in high_detections]) if high_detections else np.empty((0, 4))
        low_boxes = np.array([det['bbox'] for det in low_detections]) if low_detections else np.empty((0, 4))

        # Предсказываем новое положение для всех треков
        for tracker in self.trackers:
            dt = timestamp - tracker.timestamp if tracker.timestamp is not None else 1.0
            if dt <= 0:
                dt = 1.0
            tracker.predict(dt)
        predicted_boxes = np.array([tracker.get_state() for tracker in self.trackers]) if self.trackers else np.empty((0, 4))

        # 1. Сопоставляем высокодоверенные детекции с существующими треками
        matches, unmatched_trackers, unmatched_detections = self.associate_detections_to_trackers(predicted_boxes, high_boxes)

        # Обновляем треки, для которых найдено соответствие
        for tracker_idx, detection_idx in matches:
            self.trackers[tracker_idx].update(high_boxes[detection_idx], frame_idx, timestamp,
                                              detection=high_detections[detection_idx])

        # 2. Для оставшихся треков пытаемся сопоставить низкодоверенные детекции
        if len(unmatched_trackers) > 0 and low_boxes.shape[0] > 0:
            unmatched_predicted = predicted_boxes[unmatched_trackers]
            matches_low, unmatched_trackers_final, unmatched_low = self.associate_detections_to_trackers(unmatched_predicted, low_boxes)
            for local_tracker_idx, detection_idx in matches_low:
                global_tracker_idx = unmatched_trackers[local_tracker_idx]
                self.trackers[global_tracker_idx].update(low_boxes[detection_idx], frame_idx, timestamp,
                                                          detection=low_detections[detection_idx])
            unmatched_trackers = [unmatched_trackers[i] for i in unmatched_trackers_final]

        # 3. Для треков, которым не нашли соответствия, увеличиваем счетчик пропусков
        for idx in unmatched_trackers:
            self.trackers[idx].time_since_update += 1

        # 4. Создаем новые треки для высокодоверенных детекций, которым не нашли соответствия
        for detection_idx in unmatched_detections:
            det = high_detections[detection_idx]
            new_tracker = KalmanBoxTracker(det['bbox'], frame_idx, timestamp, detection=det)
            self.trackers.append(new_tracker)

        # 5. Переносим треки, которые не обновлялись слишком долго, в finished_tracks
        active_trackers = []
        for tracker in self.trackers:
            if tracker.time_since_update > self.max_time_lost:
                self.finished_tracks.append(tracker)
            else:
                active_trackers.append(tracker)
        self.trackers = active_trackers

        # Возвращаем активные треки – считаем активными те, у которых time_since_update <= 1
        active = {tracker.id: tracker for tracker in self.trackers if tracker.time_since_update <= 1}
        return active

    def associate_detections_to_trackers(self, trackers_boxes, detections_boxes):
        """
        Сопоставляет детекции с треками с использованием матрицы IoU и комбинированного критерия.
        Возвращает:
            matches: список пар (tracker_idx, detection_idx)
            unmatched_trackers: список индексов треков без сопоставления
            unmatched_detections: список индексов детекций без сопоставления
        """
        if trackers_boxes.shape[0] == 0 or detections_boxes.shape[0] == 0:
            return [], list(range(trackers_boxes.shape[0])), list(range(detections_boxes.shape[0]))

        # Вычисляем матрицу IoU
        iou_matrix = np.zeros((trackers_boxes.shape[0], detections_boxes.shape[0]), dtype=np.float32)
        for t, tb in enumerate(trackers_boxes):
            for d, db in enumerate(detections_boxes):
                iou_matrix[t, d] = iou(tb, db)

        row_indices, col_indices = linear_sum_assignment(-iou_matrix)
        matches = []
        unmatched_trackers = []
        unmatched_detections = []

        # Для каждой пары (tracker, детекция) проверяем, если либо IoU достаточно высокое,
        # либо центры bbox близки (евклидово расстояние меньше distance_threshold), то считаем их совпадающими.
        for t, d in zip(row_indices, col_indices):
            tb = trackers_boxes[t]
            db = detections_boxes[d]
            center_tracker = ((tb[0] + tb[2]) / 2.0, (tb[1] + tb[3]) / 2.0)
            center_detection = ((db[0] + db[2]) / 2.0, (db[1] + db[3]) / 2.0)
            dist = math.hypot(center_tracker[0] - center_detection[0],
                              center_tracker[1] - center_detection[1])
            if iou_matrix[t, d] >= self.iou_threshold or dist < self.distance_threshold:
                matches.append((t, d))
            else:
                unmatched_trackers.append(t)
                unmatched_detections.append(d)

        # Добавляем те треки и детекции, которые не попали в алгоритм Хунгера
        for t in range(trackers_boxes.shape[0]):
            if t not in row_indices:
                unmatched_trackers.append(t)
        for d in range(detections_boxes.shape[0]):
            if d not in col_indices:
                unmatched_detections.append(d)
        return matches, unmatched_trackers, unmatched_detections
    

# utils.py

In [None]:
def compute_centroid(bbox):
    """
    Вычисляет центр bounding box в формате [x1, y1, x2, y2].
    """
    x1, y1, x2, y2 = bbox
    return ((x1 + x2) / 2, (y1 + y2) / 2)

def euclidean_distance(p1, p2):
    """
    Вычисляет евклидово расстояние между двумя точками.
    """
    if p1 is None or p2 is None:
        return float('inf')
    return math.sqrt((p1[0] - p2[0])**2 + (p1[1] - p2[1])**2)

def draw_mask(frame, mask, color, alpha=0.5):
    """
    Накладывает полупрозрачную маску на кадр.
    mask: бинарная маска (0 или 1), размером, соответствующим кадру.
    color: кортеж (B, G, R) – цвет для маски.
    alpha: коэффициент прозрачности.
    """
    # Убедимся, что маска имеет тип uint8 и размеры совпадают с изображением
    mask = mask.astype(np.uint8)
    if mask.shape[:2] != frame.shape[:2]:
        mask = cv2.resize(mask, (frame.shape[1], frame.shape[0]), interpolation=cv2.INTER_NEAREST)
    colored_mask = np.zeros_like(frame, dtype=np.uint8)
    colored_mask[mask == 1] = color
    frame = cv2.addWeighted(frame, 1, colored_mask, alpha, 0)
    return frame


# tracking.py

In [None]:
class Bubble:
    def __init__(self, bubble_id: int, detection: dict, frame_idx: int, timestamp: float):
        """
        Инициализация нового пузырька с уникальным ID и сохранением первой детекции.
        """
        self.id = bubble_id
        self.history = []  # История обновлений: список словарей с данными за каждый кадр
        self.missed_frames = 0  # Счётчик пропущенных кадров (если объект не найден)
        self.update(detection, frame_idx, timestamp)

    def update(self, detection: dict, frame_idx: int, timestamp: float):
        """
        Обновление информации о пузырьке новой детекцией. Рассчитывается скорость (пикселей/сек).
        """
        bbox = detection.get('bbox', None)
        mask = detection.get('mask', None)
        detection_class = detection.get('class', None)
        centroid = compute_centroid(bbox) if bbox is not None else None

        # Вычисляем площадь: по маске (если есть) или по bbox
        if mask is not None:
            area = float(np.sum(mask > 0))
        elif bbox is not None:
            x1, y1, x2, y2 = bbox
            area = abs((x2 - x1) * (y2 - y1))
        else:
            area = 0

        # Вычисляем скорость (если имеется предыдущая информация)
        speed = 0.0
        if self.history:
            prev = self.history[-1]
            prev_centroid = prev.get('centroid')
            dt = timestamp - prev.get('timestamp', timestamp)
            if centroid is not None and prev_centroid is not None and dt > 0:
                speed = euclidean_distance(centroid, prev_centroid) / dt

        self.history.append({
            'frame_idx': frame_idx,
            'timestamp': timestamp,
            'centroid': centroid,
            'bbox': bbox,
            'area': area,
            'class': detection_class,
            'mask': mask,
            'speed': speed
        })
        self.missed_frames = 0

    def last_position(self):
        """
        Возвращает последний известный центр и bbox.
        """
        if self.history:
            return self.history[-1]['centroid'], self.history[-1]['bbox']
        return None, None

class BubbleTracker:
    def __init__(self, distance_threshold: float = 50.0, max_missed: int = 5):
        """
        Инициализация трекера:
          - distance_threshold: максимальное расстояние (в пикселях) для сопоставления детекции с существующим пузырьком;
          - max_missed: число кадров, в течение которых объект может не детектироваться, прежде чем его удалить.
        """
        self.distance_threshold = distance_threshold
        self.max_missed = max_missed
        self.tracked_bubbles = {}  # Словарь: bubble_id -> Bubble
        self.next_id = 0

    def update(self, detections: list, frame_idx: int, timestamp: float):
        """
        Обновляет состояние трекера с детекциями текущего кадра.
        Возвращает словарь текущих активных пузырьков.
        """
        # Вычисляем центры для каждой детекции
        detection_centroids = []
        for det in detections:
            bbox = det.get('bbox', None)
            centroid = compute_centroid(bbox) if bbox is not None else None
            detection_centroids.append(centroid)

        # Собираем последние центры уже отслеживаемых пузырьков
        tracked_ids = list(self.tracked_bubbles.keys())
        tracked_centroids = []
        for tid in tracked_ids:
            cent, _ = self.tracked_bubbles[tid].last_position()
            tracked_centroids.append(cent)

        used_detection_indices = set()

        # Жадное сопоставление: для каждого отслеживаемого объекта ищем ближайшую детекцию
        for tid, track_centroid in zip(tracked_ids, tracked_centroids):
            if track_centroid is None:
                continue
            min_dist = float('inf')
            min_idx = -1
            for i, det_centroid in enumerate(detection_centroids):
                if i in used_detection_indices or det_centroid is None:
                    continue
                dist = euclidean_distance(track_centroid, det_centroid)
                if dist < min_dist:
                    min_dist = dist
                    min_idx = i
            if min_idx != -1 and min_dist < self.distance_threshold:
                # Обновляем существующий пузырёк
                self.tracked_bubbles[tid].update(detections[min_idx], frame_idx, timestamp)
                used_detection_indices.add(min_idx)
            else:
                # Если сопоставление не найдено – увеличиваем счётчик пропущенных кадров
                self.tracked_bubbles[tid].missed_frames += 1

        # Создаём новые объекты для оставшихся детекций
        for i, det in enumerate(detections):
            if i not in used_detection_indices:
                bubble = Bubble(self.next_id, det, frame_idx, timestamp)
                self.tracked_bubbles[self.next_id] = bubble
                self.next_id += 1

        # Удаляем объекты, которые пропустили слишком много кадров
        remove_ids = [tid for tid, bubble in self.tracked_bubbles.items() if bubble.missed_frames > self.max_missed]
        for tid in remove_ids:
            del self.tracked_bubbles[tid]

        return self.tracked_bubbles


# video_processing

In [None]:
class VideoProcessor:
    def __init__(self, model_path: str):
        self.segmenter = YoloSegmenter(model_path)
        # Настраиваем трекер; параметры можно изменять
        self.tracker = ByteTracker(high_thresh=0.6, low_thresh=0.1, max_time_lost=10, iou_threshold=0.2, distance_threshold=50)
    def generate_histograms(self, hist_folder: str):
        """
        Собирает все треки (активные + завершённые), выбирает топ-20 по длине жизни,
        вычисляет для каждого скорость и площадь, строит гистограммы и сохраняет их как файлы.
        Возвращает пути к файлам гистограмм (speed_hist_file, area_hist_file).
        """
        # Объединяем завершённые и активные треки
        all_tracks = self.tracker.finished_tracks + self.tracker.trackers
        if not all_tracks:
            return None, None

        # Сортируем треки по числу кадров (длина истории)
        sorted_tracks = sorted(all_tracks, key=lambda tr: len(tr.history), reverse=True)
        top20 = sorted_tracks[:20]
        speeds = []
        areas = []
        for tr in top20:
            vx = tr.state[4]
            vy = tr.state[5]
            speed = math.sqrt(vx*vx + vy*vy)
            speeds.append(speed)
            areas.append(tr.state[2])

        # Строим гистограмму скорости
        plt.figure()
        plt.hist(speeds, bins=10, color='blue', alpha=0.7)
        plt.title("Гистограмма скорости (топ-20 долгоживущих)")
        plt.xlabel("Скорость (пикселей/кадр)")
        plt.ylabel("Частота")
        speed_hist_file = os.path.join(hist_folder, "histogram_speed.png")
        plt.savefig(speed_hist_file)
        plt.close()

        # Строим гистограмму площади
        plt.figure()
        plt.hist(areas, bins=10, color='green', alpha=0.7)
        plt.title("Гистограмма площади (топ-20 долгоживущих)")
        plt.xlabel("Площадь (пикселей^2)")
        plt.ylabel("Частота")
        area_hist_file = os.path.join(hist_folder, "histogram_area.png")
        plt.savefig(area_hist_file)
        plt.close()

        return speed_hist_file, area_hist_file
    

    def process_video(self, input_video_path: str, output_video_path: str, csv_path: str, hist_folder: str):
        cap = cv2.VideoCapture(input_video_path)
        if not cap.isOpened():
            print("Ошибка: не удалось открыть видеофайл.")
            return None, None

        fps = cap.get(cv2.CAP_PROP_FPS)
        width  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))

        csv_file = open(csv_path, mode='w', newline='')
        csv_writer = csv.writer(csv_file)
        csv_writer.writerow(['tracker_id', 'frame_idx', 'timestamp', 'centroid_x', 'centroid_y', 'area', 'class', 'speed'])

        frame_idx = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            timestamp = frame_idx / fps

            detections = self.segmenter.segment_frame(frame)
            tracked_objects = self.tracker.update(detections, frame_idx, timestamp)
            annotated_frame = frame.copy()

            for tracker in tracked_objects.values():
                bbox = tracker.get_state()  # [x1, y1, x2, y2]
                cx = int((bbox[0] + bbox[2]) / 2)
                cy = int((bbox[1] + bbox[3]) / 2)

                detection = tracker.detection
                if detection is None:
                    continue
                mask = detection.get('mask', None)
                detection_class = detection.get('class', None)
                color = (0, 255, 0) if detection_class == 0 else (0, 0, 255)

                cv2.putText(annotated_frame,
                            f"ID: {tracker.id}",
                            (cx, cy - 10),
                            cv2.FONT_HERSHEY_SIMPLEX,
                            0.5,
                            color,
                            2)
                if mask is not None:
                    annotated_frame = draw_mask(annotated_frame, mask, color)

                # Вычисляем скорость как sqrt(vx^2 + vy^2) из состояния трека
                vx = tracker.state[4]
                vy = tracker.state[5]
                speed = math.sqrt(vx*vx + vy*vy)
                # Площадь берем из state[2]
                area = tracker.state[2]

                csv_writer.writerow([
                    tracker.id,
                    frame_idx,
                    timestamp,
                    cx,
                    cy,
                    area,
                    detection_class,
                    speed
                ])

            out.write(annotated_frame)
            frame_idx += 1

        cap.release()
        out.release()
        csv_file.close()

        # Генерируем гистограммы для топ-20 долгоживущих треков
        speed_hist_file, area_hist_file = self.generate_histograms(hist_folder)
        return speed_hist_file, area_hist_file

# run

In [None]:
MODEL_PATH = "hf_model_repo/model.pt"
video_processor = VideoProcessor(MODEL_PATH)
input_video_path = "input_video/test3.mp4"
output_video_path = "results3/output_video.mp4"  
csv_path = "results3/tracking_data.csv"            
hist_folder = "results3"                         
video_processor.process_video(input_video_path, output_video_path, csv_path, hist_folder)