# SFF

In [1]:
import os
import struct
from PIL import Image
from typing import Optional, Tuple, Generator
import io


class SFFReader:
    """ Класс для чтения и обработки данных из файлов формата SFF."""

    def __init__(self, sff_file: str, dat_file: str = None):
        """
        Инициализирует объект SFFReader.

        Args:
            sff_file: Путь к файлу .sff.
            dat_file: Путь к индексному файлу .dat (опционально).
                      Если не указан, предполагается, что он имеет то же имя,
                      что и файл .sff, но с расширением .dat.
        """
        self.sff_file = sff_file
        self.dat_file = dat_file or sff_file.replace('.sff', '.dat')
        self.header = self._read_header()
        self.frame_data = self._read_dat()  # Читаем данные о кадрах из DAT
        self.frame_count = len(self.frame_data)

        # Проверка и корекция пикетов
        self.recalculate_pickets()

    def _read_header(self) -> dict:
        """
        Читает и разбирает заголовок файла .sff.

        Returns:
            Словарь с данными из заголовка.
        """
        with open(self.sff_file, 'rb') as f:
            header = {
                'road_code': struct.unpack('<h', f.read(2))[0], # код дороги 
                'road_name_length': struct.unpack('<h', f.read(2))[0], # длина названия дороги
                'road_name': f.read(200).decode('cp1251').rstrip('\x00'), # название дороги
                'direction': struct.unpack('<h', f.read(2))[0], # направление
                'recording_date': struct.unpack('<d', f.read(8))[0], # дата записи
                'start_km': struct.unpack('<d', f.read(8))[0], # начало отрезка
                'end_km': struct.unpack('<d', f.read(8))[0], # конец отрезка
                'reserved': f.read(5), # резерв
            }
        return header

    def _read_dat(self) -> list:
        """
        Читает информацию о кадрах из DAT-файла.

        Returns:
            Список словарей с информацией о каждом кадре.
        """
        frame_data = []

        with open(self.dat_file, 'rb') as f_dat:
            while True:
                dat_data = f_dat.read(24)  # 24 байта на кадр
                if len(dat_data) < 24:
                    break
                jpeg_size, offset, picket, timestamp = struct.unpack('<iqid', dat_data) 
                
                # frame_data - список словарей с информацией о кадрах
                frame_data.append({
                    'jpeg_size': jpeg_size,
                    'offset': offset,
                    'picket': picket,
                    'timestamp': timestamp,
                })

        return frame_data

    def get_frames_count(self):
        return self.frame_count

    def get_frame_by_number(self, frame_number: int, as_bytes: bool = False):
        """
        Возвращает изображение кадра или его байты по номеру, используя смещение из DAT файла.

        Args:
            frame_number: Номер кадра.
            as_bytes: Если True — вернуть байты JPEG, иначе — PIL Image.

        Returns:
            PIL Image или bytes: Изображение или байты JPEG.
        """
        if frame_number >= self.frame_count:
            raise ValueError(f"Номер кадра {frame_number} превышает количество кадров в видео.")

        frame_info = self.frame_data[frame_number]
        offset = frame_info['offset']
        with open(self.sff_file, 'rb') as f:
            f.seek(offset)
            jpeg_size = frame_info['jpeg_size']
            jpeg_data = f.read(jpeg_size).rstrip(b'\x00')
        if as_bytes:
            return jpeg_data
        else:
            return Image.open(io.BytesIO(jpeg_data))
    
    def get_frame_by_meter(self, meter: int) -> Tuple[Optional[Image.Image], int]:
        """
        Возвращает кадр, ближайший к заданному значению метра, 
        и его номер в видео.

        Args:
            meter: Значение метра.

        Returns:
            Tuple[Optional[Image.Image], int]: Кортеж, содержащий:
                - Изображение кадра (PIL Image) или None, если кадр не найден.
                - Номер кадра.
        """
        closest_frame_num = None
        min_distance = float('inf')
        for i, frame_data in enumerate(self.frame_data):
            distance = abs(frame_data['picket'] - meter)
            if distance < min_distance:
                closest_frame_num = i
                min_distance = distance

        if closest_frame_num is not None:
            return self.get_frame_by_number(closest_frame_num), closest_frame_num
        else:
            return None, -1
        
    def get_frames(self, direction: int=0, start_km: float=None, end_km:float=None,
                   reverse: bool = False, picket_thr: int = 2  
                   ) -> Generator[Tuple[int, Image.Image, int, float], None, None]:
        """
        Генератор, возвращающий кадры из видео SFF.

        Args:
            direction: Направление проезда.
            start_km: Начало отрезка дороги для обработки.
            end_km: Конец отрезка дороги для обработки.
            reverse: Флаг, указывающий, нужно ли инвертировать порядок кадров.
            picket_thr: Минимальная разница в пикетах (в метрах)

        Yields:
            Tuple[Image.Image, int, float]: Кортеж, содержащий:
                - Номер кадра.
                - Кадр изображения (PIL Image).
                - Значение пикета.
                - Временную метку кадра.
        """

        previous_picket = None
        prev_frame_skipped = False # Флаг пропуска кадра

        with open(self.sff_file, 'rb') as f:

            frame_indices = list(range(self.frame_count)) # Создаем список индексов

            if reverse:
                frame_indices = frame_indices[::-1]  # Инвертируем список индексов, если reverse=True

            for frame_num in frame_indices:
                frame_info = self.frame_data[frame_num]
                km = frame_info['picket'] / 1000

                if not reverse: # Переднее расположение камер
                    if direction == 0:  # Прямое направление
                        if start_km is not None and km < start_km:
                            continue
                        if end_km is not None and km > end_km:
                            break
                    else:  # Обратное направление
                        if start_km is not None and km > start_km:
                            continue
                        if end_km is not None and km < end_km:
                            break
                else: # Заднее расположение камер
                    if direction == 0:  # Прямое направление
                        if start_km is not None and km > end_km:
                            continue
                        if end_km is not None and km < start_km:
                            break
                    else:  # Обратное направление
                        if start_km is not None and km < end_km:
                            continue
                        if end_km is not None and km > start_km:
                            break

                # Пропуск кадров в зависимости от расстояния
                current_picket = frame_info['picket']
                if previous_picket is not None:
                    picket_difference = abs(current_picket - previous_picket)
                    if picket_difference < picket_thr and not prev_frame_skipped:
                        prev_frame_skipped = True
                        continue
                    else:
                        prev_frame_skipped = False
                previous_picket = current_picket

                f.seek(frame_info['offset']) 
                jpeg_size = frame_info['jpeg_size']
                jpeg_data = f.read(jpeg_size).rstrip(b'\x00') # считываем JPEG данные и убираем нулевые байты
                image = Image.open(io.BytesIO(jpeg_data)) 
                yield frame_num, image, frame_info['picket'], frame_info['timestamp']

    def create_sff_and_dat(self, output_folder: str,
                           frames_generator: Generator[Tuple[Image.Image, int, float], None, None]):
        """
        Создает файлы .sff и .dat, записывая кадры по одному.

        Args:
            output_folder: Путь к папке для сохранения выходных файлов.
            frames_generator: Генератор, возвращающий кортежи (frame, picket, float).
        """
        sff_file = os.path.join(output_folder, 'Video.sff')
        dat_file = os.path.join(output_folder, 'Video.dat')

        os.makedirs(output_folder, exist_ok=True)

        with open(sff_file, 'wb') as f_sff, open(dat_file, 'wb') as f_dat:
            # Записываем заголовок SFF из исходного файла
            f_sff.write(struct.pack('<h', self.header['road_code']))
            f_sff.write(struct.pack('<h', self.header['road_name_length']))
            f_sff.write(self.header['road_name'].encode('cp1251').ljust(200, b'\x00'))
            f_sff.write(struct.pack('<h', self.header['direction']))
            f_sff.write(struct.pack('<d', self.header['recording_date']))
            f_sff.write(struct.pack('<d', self.header['start_km']))
            f_sff.write(struct.pack('<d', self.header['end_km']))
            f_sff.write(self.header['reserved'])

            offset = 239  # Начальное смещение для первого кадра
            frame_num = 0

            for frame, picket, timestamp in frames_generator:
                # Сохраняем кадр в JPEG байты
                output_buffer = io.BytesIO()
                frame.save(output_buffer, format='JPEG')
                jpeg_data = output_buffer.getvalue()
                jpeg_size = len(jpeg_data)

                # Записываем данные кадра в SFF
                f_sff.write(struct.pack('<i', jpeg_size))
                f_sff.write(jpeg_data)
                f_sff.write(struct.pack('<i', picket))
                f_sff.write(struct.pack('<d', timestamp))

                # Записываем данные кадра в DAT
                f_dat.write(struct.pack('<i', jpeg_size))
                f_dat.write(struct.pack('<q', offset))
                f_dat.write(struct.pack('<i', picket))
                f_dat.write(struct.pack('<d', timestamp))

                offset += jpeg_size + 16  # Обновляем смещение для следующего кадра
                frame_num += 1

            # Записываем общее количество кадров в SFF
            f_sff.write(struct.pack('<i', frame_num))

    def recalculate_pickets(self):
        """
        Проверяет и пересчитывает пикеты в frame_data, если есть расхождение между заголовком SFF и данными DAT.
        """
        start_m_header = self.header.get('start_km', 0) * 1000
        end_m_header = self.header.get('end_km', 0) * 1000

        start_m_dat = self.frame_data[0]['picket']
        end_m_dat = self.frame_data[-1]['picket']

        if start_m_header == 0 and end_m_header == 0:
            self.header['start_km'] = round(start_m_dat / 1000, 3)
            self.header['end_km'] = round(end_m_dat / 1000, 3)

        if start_m_header != start_m_dat or end_m_header != end_m_dat:
            for frame_info in self.frame_data:
                current_m_dat = frame_info['picket']
                picket_offset = (end_m_header - start_m_header) * (current_m_dat - start_m_dat) / (end_m_dat - start_m_dat)
                new_picket = start_m_header + picket_offset
                frame_info['picket'] = int(round(new_picket))



# SORT

автомобилей на видео 75 

In [2]:
import cv2
import numpy as np
import random
import csv
from collections import defaultdict, deque, Counter
from sklearn.linear_model import RANSACRegressor
from ultralytics import YOLO
from deep_sort_realtime.deepsort_tracker import DeepSort
from datetime import datetime

import torch
# import torch_tensorrt
import torch.nn as nn
from torchvision import models, transforms

# =====================  HELPERS  =====================

COLOR_PALETTE = {
    2: (105, 110, 255),  # car
    3: (255,   0,   0),  # motorcycle
    5: (62,  255,  88),  # bus
    7: (255,  81, 243),  # truck
}

def dbg(debug, msg="", *args, **kwargs):
    if debug:
        print(msg.format(*args, **kwargs))

def draw_box(frame, x1, y1, x2, y2, color, text=" ", text_scale=1.5, text_thickness=4):
    cv2.rectangle(frame, (x1, y1), (x2, y2), color, 4)
    if text is not None:
        ts = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, text_scale, text_thickness)[0]
        bg_x1, bg_y1 = x1, max(0, y1 - ts[1] - 20)
        bg_x2, bg_y2 = x1 + ts[0] + 5, y1 - 10
        cv2.rectangle(frame, (bg_x1, bg_y1), (bg_x2, bg_y2), (255, 255, 255), -1)
        cv2.putText(frame, text, (x1, y1 - 15),
                    cv2.FONT_HERSHEY_SIMPLEX, text_scale, color, text_thickness)

def should_calc_axle(x2, y2, frame_w, frame_h,
                     axle_for_all,
                     hz_start, hz_stop,
                     vz_start, vz_stop):
    if axle_for_all:
        return True
    ok_h = True
    ok_v = True
    if hz_start is not None and hz_stop is not None:
        d_right = frame_w - x2
        ok_h = (d_right <= hz_start) and (d_right >= hz_stop)
    if vz_start is not None and vz_stop is not None:
        d_bottom = frame_h - y2
        ok_v = (d_bottom <= vz_start) and (d_bottom >= vz_stop)
    return ok_h and ok_v

def crop_vehicle(frame, x1, y1, x2, y2, padding=0):
    h, w, _ = frame.shape
    x1_p = max(0, x1 - padding)
    y1_p = max(0, y1 - padding)
    x2_p = min(w, x2 + padding)
    y2_p = min(h, y2 + padding)
    if x2_p <= x1_p or y2_p <= y1_p:
        return None, x1_p, y1_p
    return frame[y1_p:y2_p, x1_p:x2_p], x1_p, y1_p

def detect_wheels(crop, wheel_model):
    if crop is None or crop.size == 0 or wheel_model is None:
        class Dummy: boxes = []
        return Dummy()
    return wheel_model(crop, verbose=False)[0]

def fit_line_with_ransac(points, residual_threshold=3, max_trials=100):
    n = len(points)
    if n < 2:
        return np.zeros(n, dtype=bool), None
    X = points[:, 0].reshape(-1, 1)
    y = points[:, 1]
    if np.var(X) < 1e-6 or np.var(y) < 1e-6:
        return np.ones(n, dtype=bool), None
    try:
        ransac = RANSACRegressor(residual_threshold=residual_threshold,
                                 max_trials=max_trials,
                                 min_samples=2,
                                 random_state=0)
        ransac.fit(X, y)
        return ransac.inlier_mask_, ransac
    except ValueError:
        return np.ones(n, dtype=bool), None

def draw_wheels_and_centers(frame, wheel_results, ox, oy,
                            inlier_mask=None, box_color=(0,255,255), radius=4):
    if not hasattr(wheel_results, 'boxes'):
        return
    for i, box in enumerate(wheel_results.boxes):
        x1, y1, x2, y2 = map(int, box.xyxy[0])
        cv2.rectangle(frame, (x1+ox, y1+oy), (x2+ox, y2+oy), box_color, 1)
        cx = int((x1+x2)//2) + ox
        cy = int((y1+y2)//2) + oy
        c = (0,255,0) if (inlier_mask is not None and i < len(inlier_mask) and inlier_mask[i]) else (0,0,255)
        cv2.circle(frame, (cx, cy), radius, c, -1)

def draw_axle_line(frame, centers, ox, oy, ransac_model, color=(255,0,0)):
    if centers is None or len(centers) < 2 or ransac_model is None:
        return
    X = centers[:,0].reshape(-1,1)
    x_vals = np.array([X.min(), X.max()]).reshape(-1,1)
    y_vals = ransac_model.predict(x_vals)
    x_start, y_start = int(x_vals[0][0]) + ox, int(y_vals[0]) + oy
    x_end,   y_end   = int(x_vals[1][0]) + ox, int(y_vals[1]) + oy
    cv2.line(frame, (x_start, y_start), (x_end, y_end), color, 2)

def process_wheels_on_vehicle(frame, box, wheel_model,
                              padding=0, residual_threshold=5):
    x1, y1, x2, y2 = box
    crop, ox, oy = crop_vehicle(frame, x1, y1, x2, y2, padding)
    wheel_results = detect_wheels(crop, wheel_model)

    centers = []
    if hasattr(wheel_results, 'boxes'):
        for wb in wheel_results.boxes:
            wx1, wy1, wx2, wy2 = map(int, wb.xyxy[0])
            centers.append([(wx1+wx2)/2, (wy1+wy2)/2])
    centers = np.array(centers)

    if len(centers) == 0:
        return " ax:2", 2

    if len(centers) >= 2:
        mask, ransac_model = fit_line_with_ransac(centers, residual_threshold=residual_threshold)
        draw_wheels_and_centers(frame, wheel_results, ox, oy, inlier_mask=mask)
        draw_axle_line(frame, centers, ox, oy, ransac_model)
        ax_cnt = max(int(mask.sum()), 2)
        return f" ax:{ax_cnt}", ax_cnt
    else:
        draw_wheels_and_centers(frame, wheel_results, ox, oy)
        return " ax:2", 2

# =====================  FRAME SOURCES  =====================

def frames_from_reader(reader):
    for frame_num, image, picket, timestamp in reader.get_frames():
        frame = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
        yield frame, {"frame_num": frame_num, "picket": picket, "timestamp": timestamp}

# def frames_from_video(video_path):
#     cap = cv2.VideoCapture(video_path)
#     if not cap.isOpened():
#         raise RuntimeError(f"Cannot open video: {video_path}")
#     fps = cap.get(cv2.CAP_PROP_FPS)
#     w  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
#     h  = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
#     i = 0
#     while True:
#         ret, frame = cap.read()
#         if not ret:
#             break
#         yield frame, {"frame_num": i, "fps": fps, "size": (w,h)}
#         i += 1
#     cap.release()

def frames_from_video(video_path, target_fps=None): 
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise RuntimeError(f"Cannot open video: {video_path}")
    
    original_fps = cap.get(cv2.CAP_PROP_FPS)
    w  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    h  = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    
    # --- Рассчитываем интервал пропуска ---
    frame_interval = 1
    if target_fps is not None and original_fps > target_fps and target_fps > 0:
        # Рассчитываем, сколько кадров нужно пропускать
        frame_interval = round(original_fps / target_fps)
        print(f"[INFO] Original FPS: {original_fps:.2f}, Target FPS: {target_fps:.2f}. Processing every {frame_interval} frame(s).")
        # Обновляем FPS, который будет передан дальше, чтобы отражать изменение
        # Это важно для правильного расчета времени и сохранения видео
        effective_fps = original_fps / frame_interval 
    else:
        effective_fps = original_fps
        print(f"[INFO] Original FPS: {original_fps:.2f}. Processing all frames (no FPS reduction).")

    i = 0 # Счетчик всех кадров
    processed_i = 0 # Счетчик обработанных кадров
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        # --- Логика пропуска кадров ---
        if i % frame_interval == 0:
            # Передаем информацию о "реальном" времени кадра и эффективном FPS
            # frame_num теперь отражает номер обработанного кадра
            # timestamp можно рассчитать, если нужно точное время
            yield frame, {"frame_num": processed_i, "original_frame_num": i, "fps": effective_fps, "original_fps": original_fps, "size": (w,h)}
            processed_i += 1
        i += 1
        
    cap.release()
    print(f"[INFO] Total frames read: {i}, Total frames processed: {processed_i}")

# =====================  AXLE STORAGE  =====================

AXLE_HISTORY      = defaultdict(lambda: deque(maxlen=5))
AXLE_VALUE        = {}
LAST_AXLE_FRAME   = {}
DEFAULT_AXLES     = 2

def update_axles(track_id, count):
    if count is None or count == 0:
        return
    count = max(count, 2)
    AXLE_HISTORY[track_id].append(count)
    c = Counter(AXLE_HISTORY[track_id]).most_common(1)[0][0]
    AXLE_VALUE[track_id] = c

def ensure_default_axles(track_id):
    if track_id not in AXLE_VALUE:
        AXLE_VALUE[track_id] = DEFAULT_AXLES

# =====================  ZONES & LINES  =====================

def point_in_poly(x, y, poly):
    inside = False
    n = len(poly)
    px, py = x, y
    x1, y1 = poly[0]
    for i in range(1, n + 1):
        x2, y2 = poly[i % n]
        if min(y1, y2) < py <= max(y1, y2) and px <= max(x1, x2):
            if y1 != y2:
                xinters = (py - y1) * (x2 - x1) / (y2 - y1 + 1e-9) + x1
            if x1 == x2 or px <= xinters:
                inside = not inside
        x1, y1 = x2, y2
    return inside

def get_zone_for_point(x, y, zones):
    if not zones:
        return None
    for name, poly in zones.items():
        if point_in_poly(x, y, poly):
            return name
    return None

def side_of_line(px, py, p1, p2):
    return np.sign((px - p1[0]) * (p2[1] - p1[1]) - (py - p1[1]) * (p2[0] - p1[0]))

def point_between_segment(px, py, p1, p2, margin=15):
    xmin, xmax = sorted([p1[0], p2[0]])
    ymin, ymax = sorted([p1[1], p2[1]])
    return (xmin - margin <= px <= xmax + margin) and (ymin - margin <= py <= ymax + margin)

def crossed(prev_side, cur_side):
    return prev_side != 0 and cur_side != 0 and prev_side != cur_side

LINE_SIDE = defaultdict(dict)

# =====================  TRACK INFO  =====================

TRACK_INFO = {}

def init_track_info(tid, frame_idx):
    TRACK_INFO[tid] = {
        "track_id": tid,
        "class_id": None,
        "first_frame": frame_idx,
        "last_frame": frame_idx,
        "start_zone": None,
        "end_zone": None,
        "start_line": None,
        "end_line": None,
        "axles": DEFAULT_AXLES,
        "length_frames": 0,
        "enter_time": None,
        "exit_time": None
    }

def finalize_track(tid, csv_rows):
    if tid not in TRACK_INFO:
        return
    info = TRACK_INFO[tid]
    start = info["start_line"]
    end   = info["end_line"]

    # Записываем только если есть обе линии и они разные
    if start is None or end is None or start == end:
        # просто чистим все данные без записи
        TRACK_INFO.pop(tid, None)
        AXLE_VALUE.pop(tid, None)
        AXLE_HISTORY.pop(tid, None)
        LAST_AXLE_FRAME.pop(tid, None)
        LINE_SIDE.pop(tid, None)
        return
    row = [
        info["track_id"],
        info["class_id"],
        info["axles"],
        info["start_zone"],
        info["end_zone"],
        info["start_line"],
        info["end_line"],
        info["first_frame"],
        info["last_frame"],
        info["length_frames"],
        info["enter_time"],
        info["exit_time"]
    ]
    csv_rows.append(row)
    TRACK_INFO.pop(tid, None)
    AXLE_VALUE.pop(tid, None)
    AXLE_HISTORY.pop(tid, None)
    LAST_AXLE_FRAME.pop(tid, None)
    LINE_SIDE.pop(tid, None)

# =====================  RESNET-34 CLASSIFIER  =====================

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class_names = [
    'bus', 
    'car', 
    'motorcycle', 
    'truck'
]

clf = models.resnet34(weights=None)
in_features = clf.fc.in_features
clf.fc = nn.Linear(in_features, len(class_names))
ckpt = torch.load(r"D:\учеба\test\best_model.pth", map_location=device)
clf.load_state_dict(ckpt['model_state_dict'])
clf.to(device).eval()

# clf = torch_tensorrt.compile(
#     clf,
#     inputs=[
#         torch_tensorrt.Input(
#             min_shape=(1, 3, 224, 224),
#             opt_shape=(1, 3, 224, 224),
#             max_shape=(1, 3, 224, 224),
#             dtype=torch.float32,
#             )
#         ],
#     enabled_precisions={torch.float32, torch.float16},
#     workspace_size=1 << 30,  # 1 GB
# )






preprocess = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
])
# =====================  CORE PROCESSING  =====================
write_count = 0
def _run_core(frame_gen,
              model,
              target_classes=None,
              debug=False,
              log_every=10,
              use_tracking=True,
              show_track_id=True,
              use_axle=False,
              wheel_model=None,
              axle_for_all=True,
              hz_start=700, hz_stop=5,
              vz_start=None, vz_stop=None,
              axle_period=1,
              save_out=False,
              out_path="output.mp4",
              out_fps=None,
              zones=None,
              lines=None,
              draw_regions=True,
              csv_path="objects.csv"):

    dbg(debug, "[INIT] use_tracking={}", use_tracking)

    tracker = None
    if use_tracking:
        tracker = DeepSort(
            max_iou_distance=0.8,
            max_age=30,
            n_init=3,
            nms_max_overlap=1,
            max_cosine_distance=0.2,
            nn_budget=200,
            gating_only_position=True,
            embedder='mobilenet',
            half=True,
            bgr=False,
            embedder_gpu=True,
        )

    if target_classes is None:
        target_classes = [2]

    writer = None
    rand_palette = {}
    frame_count = 0
    width = height = None
    fps_out = out_fps

    csv_rows = []
    csv_header = [
        "track_id", "class_id", "axles",
        "start_zone", "end_zone",
        "start_line", "end_line",
        "first_frame", "last_frame", "frames_total",
        "enter_time", "exit_time"
    ]

    try:
        for frame_bgr, meta in frame_gen:
            if width is None:
                height, width = frame_bgr.shape[:2]
            if save_out and writer is None:
                chosen_fps = out_fps if (out_fps is not None) else meta.get("fps", 25)
                writer = cv2.VideoWriter(out_path, cv2.VideoWriter_fourcc(*"mp4v"),
                                         chosen_fps, (width, height))

            # draw zones/lines
            if draw_regions:
                if zones:
                    for zname, poly in zones.items():
                        pts = np.array(poly, dtype=np.int32)
                        cv2.polylines(frame_bgr, [pts], True, (0,255,255), 2)
                        cv2.putText(frame_bgr, zname, tuple(pts[0]),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,255,255), 2)
                if lines:
                    for lname, (p1, p2) in lines.items():
                        cv2.line(frame_bgr, p1, p2, (0,165,255), 2)
                        cv2.putText(frame_bgr, lname, p1,
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,165,255), 2)

            frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
            results = model(frame_rgb, conf=0.2, iou=0.3,
                            classes=target_classes,
                            half=True, device='0',
                            max_det=50, verbose=False)[0]

            detections = []
            for box in results.boxes:
                x1, y1, x2, y2 = map(int, box.xyxy[0].tolist())
                conf = float(box.conf[0])
                cls_id = int(box.cls[0])
                if cls_id in target_classes:
                    detections.append(([x1, y1, x2 - x1, y2 - y1], conf, cls_id))

            if use_tracking:
                tracks = tracker.update_tracks(detections, frame=frame_rgb)
                alive_ids = set()

                for track in tracks:
                    if not track.is_confirmed():
                        continue
                    ltrb = track.to_ltrb(orig=True, orig_strict=True)
                    if ltrb is None:
                        continue

                    tid = track.track_id
                    alive_ids.add(tid)
                    ensure_default_axles(tid)
                    if tid not in TRACK_INFO:
                        init_track_info(tid, frame_count)

                    x1, y1, x2, y2 = map(int, ltrb)
                    cx, cy = (x1 + x2)//2, (y1 + y2)//2

                    # --- RESNET CLASSIFIER ---
                    crop_img, _, _ = crop_vehicle(frame_bgr, x1, y1, x2, y2, padding=0)
                    label = None
                    if crop_img is not None:
                        rgb_crop = cv2.cvtColor(crop_img, cv2.COLOR_BGR2RGB)
                        inp = preprocess(rgb_crop).unsqueeze(0).to(device)
                        with torch.no_grad():
                            out = clf(inp)
                            idx = out.argmax(1).item()
                        label = class_names[idx]
                    TRACK_INFO[tid]["class_id"] = label

                    TRACK_INFO[tid]["last_frame"] = frame_count
                    TRACK_INFO[tid]["length_frames"] += 1

                    # zones
                    cur_zone = get_zone_for_point(cx, cy, zones)
                    if TRACK_INFO[tid]["start_zone"] is None and cur_zone is not None:
                        TRACK_INFO[tid]["start_zone"] = cur_zone
                        TRACK_INFO[tid]["enter_time"] = datetime.now().isoformat()
                    TRACK_INFO[tid]["end_zone"] = cur_zone

                    # lines
                    if lines:
                        for lname, (p1, p2) in lines.items():
                            cur_side = side_of_line(cx, cy, p1, p2)
                            prev_side = LINE_SIDE[tid].get(lname, cur_side)
                            LINE_SIDE[tid][lname] = cur_side
                            if crossed(prev_side, cur_side) and point_between_segment(cx, cy, p1, p2):
                                if TRACK_INFO[tid]["start_line"] is None:
                                    TRACK_INFO[tid]["start_line"] = lname
                                    if TRACK_INFO[tid]["enter_time"] is None:
                                        TRACK_INFO[tid]["enter_time"] = datetime.now().isoformat()
                                TRACK_INFO[tid]["end_line"] = lname
                                TRACK_INFO[tid]["exit_time"] = datetime.now().isoformat()

                    # axle counting
                    do_calc = (use_axle
                               and should_calc_axle(x2, y2, width, height,
                                                    axle_for_all,
                                                    hz_start, hz_stop,
                                                    vz_start, vz_stop)
                               and (tid not in LAST_AXLE_FRAME
                                    or frame_count - LAST_AXLE_FRAME[tid] >= axle_period))
                    if do_calc:
                        _, num_ax = process_wheels_on_vehicle(frame_bgr, (x1, y1, x2, y2), wheel_model)
                        update_axles(tid, num_ax)
                        TRACK_INFO[tid]["axles"] = AXLE_VALUE.get(tid, DEFAULT_AXLES)
                        LAST_AXLE_FRAME[tid] = frame_count

                    stable_ax = max(AXLE_VALUE.get(tid, DEFAULT_AXLES), 2)
                    TRACK_INFO[tid]["axles"] = stable_ax

                    # draw box + label + axles
                    color = track.get_det_class()  # or rand_palette[tid]
                    color = rand_palette.setdefault(tid, (
                        random.randint(50,200),
                        random.randint(50,200),
                        random.randint(50,200)
                    ))
                    text = f"ID:{tid}"
                    if label:
                        text += f" {label}"
                    if use_axle:
                        text += f" ax:{stable_ax}"
                    draw_box(frame_bgr, x1, y1, x2, y2, color, text=text)

                # finalize dead tracks
                dead = [tid for tid in list(TRACK_INFO) if tid not in alive_ids]
                for tid in dead:
                    if TRACK_INFO[tid]["exit_time"] is None:
                        TRACK_INFO[tid]["exit_time"] = datetime.now().isoformat()
                    finalize_track(tid, csv_rows)

            else:
                # detection only
                for tlwh, conf, cls_id in detections:
                    x, y, w, h = tlwh
                    x1, y1, x2, y2 = x, y, x + w, y + h

                    # classify
                    crop_img, _, _ = crop_vehicle(frame_bgr, x1, y1, x2, y2, padding=0)
                    label = None
                    if crop_img is not None:
                        rgb_crop = cv2.cvtColor(crop_img, cv2.COLOR_BGR2RGB)
                        inp = preprocess(rgb_crop).unsqueeze(0).to(device)
                        with torch.no_grad():
                            out = clf(inp)
                            idx = out.argmax(1).item()
                        label = class_names[idx]

                    # draw
                    color = COLOR_PALETTE.get(cls_id, (255,0,0))
                    text = label if label else f"cls:{cls_id}"
                    if use_axle:
                        text += f" ax:{DEFAULT_AXLES}"
                    draw_box(frame_bgr, x1, y1, x2, y2, color, text=text)

            # display / save
            if frame_count == 0:
                cv2.namedWindow('Tracking', cv2.WINDOW_NORMAL)
            disp_w = 960
            disp_h = int(frame_bgr.shape[0] * disp_w / frame_bgr.shape[1])
            cv2.imshow('Tracking', cv2.resize(frame_bgr, (disp_w, disp_h),
                                             interpolation=cv2.INTER_AREA))
            if save_out and writer is not None:
                writer.write(frame_bgr)
            if cv2.waitKey(1) & 0xFF == 27:  # ESC key
                break

            frame_count += 1
            if not debug and frame_count % log_every == 0:
                print(f"[INFO] Processed {frame_count} frames")

    except KeyboardInterrupt:
        print("[EXIT] Interrupted by user")
    finally:
        # finalize remaining
        for tid in list(TRACK_INFO):
            if TRACK_INFO[tid]["exit_time"] is None:
                TRACK_INFO[tid]["exit_time"] = datetime.now().isoformat()
            finalize_track(tid, csv_rows)

        if csv_path and csv_rows:
            with open(csv_path, "w", newline="", encoding="utf-8") as f:
                writer_csv = csv.writer(f)
                writer_csv.writerow(csv_header)
                writer_csv.writerows(csv_rows)
            print(f"[CSV] Saved {len(csv_rows)} rows to {csv_path}")

        if 'writer' in locals() and writer:
            writer.release()
        cv2.destroyAllWindows()
        print(f"[END] Total frames processed: {frame_count}")


# =====================  PUBLIC API  =====================

def deepsort_reader(reader=None, **kwargs):
    if reader is None:
        raise ValueError("reader is None")
    kwargs.setdefault("axle_period", 1)
    return _run_core(
        frame_gen=frames_from_reader(reader),
        **kwargs
    )

def deepsort_video(video_path=None, target_fps=None, **kwargs):
    if video_path is None:
        raise ValueError("video_path is None")
    kwargs.setdefault("axle_period", 1)
    return _run_core(
        frame_gen=frames_from_video(video_path, target_fps=target_fps),
        **kwargs
    )

# Save

In [6]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
class_names = ['bus', 'car', 'motorcycle', 'truck']

clf = models.resnet34(weights=None)
in_features = clf.fc.in_features
clf.fc = nn.Linear(in_features, len(class_names))
ckpt = torch.load(r"D:\учеба\test\best_model.pth", map_location=device)
clf.load_state_dict(ckpt['model_state_dict'])
clf.to(device).eval()

preprocess = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
])

In [None]:
# def annotated_frames(reader,
#                      model,
#                      wheel_model,
#                      target_classes=None,
#                      use_tracking=True,
#                      show_track_id=True,
#                      use_axle=True,
#                      axle_for_all=True,
#                      hz_start=700, hz_stop=5,
#                      vz_start=None, vz_stop=None,
#                      axle_period=3,
#                      debug=False,
#                      log_every=10,
#                      lines=None,
#                      zones=None):
#     """
#     Генератор кадров с уже нарисованными:
#       - детекцией (YOLO)
#       - трекингом (DeepSort)
#       - классификацией (ResNet)
#       - подсчётом и отрисовкой осей (wheel_model + RANSAC)
#       - зонами и линиями
#     Возвращает кортеж: (PIL.Image, picket, timestamp).
#     """
#     # Инициализация трекера
#     tracker = None
#     if use_tracking:
#         tracker = DeepSort(
#             max_iou_distance=0.5, max_age=20, n_init=3,
#             nms_max_overlap=1, max_cosine_distance=0.2,
#             nn_budget=100, gating_only_position=True,
#             embedder='mobilenet', half=True, bgr=False,
#             embedder_gpu=True
#         )

#     # Для логов
#     frame_count = 0

#     # Пустой writer — мы не сохраняем видео
#     cvs_rows = []

#     # Обходим все кадры из reader
#     for frame_bgr, meta in frames_from_reader(reader):
#         h, w = frame_bgr.shape[:2]

#         # --- Рисуем зоны и линии, если заданы ---
#         if zones:
#             for zname, poly in zones.items():
#                 pts = np.array(poly, dtype=np.int32)
#                 cv2.polylines(frame_bgr, [pts], True, (0,255,255), 2)
#                 cv2.putText(frame_bgr, zname, tuple(pts[0]),
#                             cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,255,255), 2)
#         if lines:
#             for lname, (p1, p2) in lines.items():
#                 cv2.line(frame_bgr, p1, p2, (0,165,255), 2)
#                 cv2.putText(frame_bgr, lname, p1,
#                             cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,165,255), 2)

#         # --- Детекция YOLO ---
#         frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
#         results = model(frame_rgb, conf=0.5, iou=0.3,
#                         classes=target_classes, half=True,
#                         device='0', max_det=20, verbose=False)[0]

#         # Собираем детекции в формат DeepSort
#         dets = []
#         for box in results.boxes:
#             x1, y1, x2, y2 = map(int, box.xyxy[0].tolist())
#             conf = float(box.conf[0])
#             cls_id = int(box.cls[0])
#             if target_classes is None or cls_id in target_classes:
#                 dets.append(([x1, y1, x2-x1, y2-y1], conf, cls_id))

#         alive_ids = set()
#         if use_tracking:
#             tracks = tracker.update_tracks(dets, frame=frame_rgb)
#             for track in tracks:
#                 if not track.is_confirmed(): 
#                     continue
#                 tid = track.track_id
#                 alive_ids.add(tid)
#                 ensure_default_axles(tid)
#                 if tid not in TRACK_INFO:
#                     init_track_info(tid, frame_count)

#                 # Координаты бокса
#                 ltrb = track.to_ltrb(orig=True, orig_strict=True)
#                 if ltrb is None:
#                     continue
#                 x1, y1, x2, y2 = map(int, ltrb)
#                 cx, cy = (x1+x2)//2, (y1+y2)//2

#                 # --- Классификация ResNet ---
#                 crop, _, _ = crop_vehicle(frame_bgr, x1, y1, x2, y2)
#                 label = None
#                 if crop is not None:
#                     inp = preprocess(cv2.cvtColor(crop, cv2.COLOR_BGR2RGB))\
#                               .unsqueeze(0).to(next(clf.parameters()).device)
#                     with torch.no_grad():
#                         out = clf(inp)
#                         idx = out.argmax(1).item()
#                     label = class_names[idx]
#                 TRACK_INFO[tid]["class_id"] = label

#                 # Обновляем инфо трека
#                 TRACK_INFO[tid]["last_frame"] = frame_count
#                 TRACK_INFO[tid]["length_frames"] += 1

#                 # --- Зоны ---
#                 cur_zone = get_zone_for_point(cx, cy, zones or {})
#                 if TRACK_INFO[tid]["start_zone"] is None and cur_zone:
#                     TRACK_INFO[tid]["start_zone"] = cur_zone
#                     TRACK_INFO[tid]["enter_time"] = datetime.now().isoformat()
#                 TRACK_INFO[tid]["end_zone"] = cur_zone

#                 # --- Линии ---
#                 if lines:
#                     for lname, (p1, p2) in lines.items():
#                         prev = LINE_SIDE[tid].get(lname, 0)
#                         cur  = side_of_line(cx, cy, p1, p2)
#                         LINE_SIDE[tid][lname] = cur
#                         if crossed(prev, cur) and point_between_segment(cx, cy, p1, p2):
#                             if TRACK_INFO[tid]["start_line"] is None:
#                                 TRACK_INFO[tid]["start_line"] = lname
#                                 if TRACK_INFO[tid]["enter_time"] is None:
#                                     TRACK_INFO[tid]["enter_time"] = datetime.now().isoformat()
#                             TRACK_INFO[tid]["end_line"] = lname
#                             TRACK_INFO[tid]["exit_time"] = datetime.now().isoformat()

#                 # --- Подсчёт осей ---
#                 do_axle = (use_axle
#                            and should_calc_axle(x2, y2, w, h,
#                                                  axle_for_all,
#                                                  hz_start, hz_stop,
#                                                  vz_start, vz_stop)
#                            and (tid not in LAST_AXLE_FRAME
#                                 or frame_count - LAST_AXLE_FRAME[tid] >= axle_period))
#                 if do_axle:
#                     _, num_ax = process_wheels_on_vehicle(frame_bgr,
#                                                           (x1, y1, x2, y2),
#                                                           wheel_model)
#                     update_axles(tid, num_ax)
#                     TRACK_INFO[tid]["axles"] = AXLE_VALUE.get(tid, 2)
#                     LAST_AXLE_FRAME[tid] = frame_count

#                 stable_ax = max(AXLE_VALUE.get(tid, 2), 2)
#                 TRACK_INFO[tid]["axles"] = stable_ax

#                 # --- Рисуем бокс, текст, оси ---
#                 color = COLOR_PALETTE.get(track.get_det_class(), None) \
#                         or tuple(random.randint(50,200) for _ in range(3))
#                 text = f"ID:{tid}"
#                 if label:      text += f" {label}"
#                 if use_axle:   text += f" ax:{stable_ax}"
#                 draw_box(frame_bgr, x1, y1, x2, y2, color, text=text)

#             # финализируем вышедшие из кадра треки
#             dead = [tid for tid in TRACK_INFO if tid not in alive_ids]
#             for tid in dead:
#                 if TRACK_INFO[tid]["exit_time"] is None:
#                     TRACK_INFO[tid]["exit_time"] = datetime.now().isoformat()
#                 finalize_track(tid, cvs_rows)

#         else:
#             # Detektion-only ветка (можете скопировать аналогично)
#             for tlwh, conf, cls_id in dets:
#                 x, y, w, h = tlwh
#                 x1, y1, x2, y2 = x, y, x + w, y + h

#                 # classify
#                 crop_img, _, _ = crop_vehicle(frame_bgr, x1, y1, x2, y2, padding=0)
#                 label = None
#                 if crop_img is not None:
#                     rgb_crop = cv2.cvtColor(crop_img, cv2.COLOR_BGR2RGB)
#                     inp = preprocess(rgb_crop).unsqueeze(0).to(device)
#                     with torch.no_grad():
#                         out = clf(inp)
#                         idx = out.argmax(1).item()
#                     label = class_names[idx]

#                 # draw
#                 color = COLOR_PALETTE.get(cls_id, (255,0,0))
#                 text = label if label else f"cls:{cls_id}"
#                 if use_axle:
#                     text += f" ax:{DEFAULT_AXLES}"
#                 draw_box(frame_bgr, x1, y1, x2, y2, color, text=text)
#             pass

#         # --- Конвертация в PIL и yield ---
#         pil_img = Image.fromarray(cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB))
#         yield pil_img, meta["picket"], meta["timestamp"]

#         frame_count += 1
#         if not debug and frame_count % log_every == 0:
#             print(f"[INFO] Processed {frame_count} frames")

#     # По выходу из цикла финализируем оставшиеся треки
#     for tid in list(TRACK_INFO):
#         if TRACK_INFO[tid]["exit_time"] is None:
#             TRACK_INFO[tid]["exit_time"] = datetime.now().isoformat()
#         finalize_track(tid, cvs_rows)





def annotated_frames(reader,
                     model: YOLO,
                     wheel_model: YOLO,
                     target_classes=None,
                     use_tracking=True,
                     use_axle=True,
                     axle_for_all=True,
                     hz_start=700, hz_stop=5,
                     vz_start=None, vz_stop=None,
                     axle_period=3,
                     lines=None,
                     zones=None,
                     debug=False,
                     log_every=50):
    """
    Генератор кадров с:
      • детекцией YOLO,
      • трекингом DeepSort,
      • классификацией ResNet,
      • подсчётом осей колес,
      • отрисовкой зон и линий.
    Возвращает (PIL.Image, picket, timestamp).
    """
    # инициализация трекера
    tracker = None
    if use_tracking:
        tracker = DeepSort(
            max_iou_distance=0.5, max_age=20, n_init=3,
            nms_max_overlap=1, max_cosine_distance=0.2,
            nn_budget=100, gating_only_position=True,
            embedder='mobilenet', half=True, bgr=False,
            embedder_gpu=True
        )

    frame_count = 0
    cvs_rows = []

    for frame_bgr, meta in frames_from_reader(reader):
        h, w = frame_bgr.shape[:2]

        # — отрисовка зон/линий —
        if zones:
            for name, poly in zones.items():
                pts = np.array(poly, np.int32)
                cv2.polylines(frame_bgr, [pts], True, (0,255,255), 2)
                cv2.putText(frame_bgr, name, tuple(pts[0]),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,255,255), 2)
        if lines:
            for name, (p1, p2) in lines.items():
                cv2.line(frame_bgr, p1, p2, (0,165,255), 2)
                cv2.putText(frame_bgr, name, p1,
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,165,255), 2)

        # — детекция YOLO —
        rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
        results = model(rgb, conf=0.5, iou=0.3,
                        classes=target_classes,
                        half=True, device='0',
                        max_det=50, verbose=False)[0]

        detections = []
        for box in results.boxes:
            x1,y1,x2,y2 = map(int, box.xyxy[0].tolist())
            conf = float(box.conf[0])
            cls_id = int(box.cls[0])
            if target_classes is None or cls_id in target_classes:
                detections.append(([x1,y1,x2-x1,y2-y1], conf, cls_id))

        alive = set()
        if use_tracking:
            tracks = tracker.update_tracks(detections, frame=rgb)
            for track in tracks:
                if not track.is_confirmed():
                    continue
                tid = track.track_id
                alive.add(tid)
                ensure_default_axles(tid)
                if tid not in TRACK_INFO:
                    init_track_info(tid, frame_count)

                ltrb = track.to_ltrb(orig=True, orig_strict=True)
                if ltrb is None:
                    continue
                x1,y1,x2,y2 = map(int, ltrb)
                cx, cy = (x1+x2)//2, (y1+y2)//2

                # — классификация ResNet —
                crop, _, _ = crop_vehicle(frame_bgr, x1,y1,x2,y2)
                label = None
                if crop is not None and crop.size != 0:
                    inp = preprocess(cv2.cvtColor(crop, cv2.COLOR_BGR2RGB))\
                              .unsqueeze(0).to(device)
                    with torch.no_grad():
                        out = clf(inp)
                        idx = out.argmax(1).item()
                    label = class_names[idx]
                TRACK_INFO[tid]["class_id"] = label

                # обновление мета-инфо трека
                TRACK_INFO[tid]["last_frame"] = frame_count
                TRACK_INFO[tid]["length_frames"] += 1

                # — зоны —
                zone = get_zone_for_point(cx, cy, zones or {})
                if TRACK_INFO[tid]["start_zone"] is None and zone:
                    TRACK_INFO[tid]["start_zone"] = zone
                    TRACK_INFO[tid]["enter_time"] = datetime.now().isoformat()
                TRACK_INFO[tid]["end_zone"] = zone

                # — линии —
                if lines:
                    for name, (p1,p2) in lines.items():
                        prev = LINE_SIDE[tid].get(name, 0)
                        cur  = side_of_line(cx, cy, p1, p2)
                        LINE_SIDE[tid][name] = cur
                        if crossed(prev, cur) and point_between_segment(cx,cy,p1,p2):
                            if TRACK_INFO[tid]["start_line"] is None:
                                TRACK_INFO[tid]["start_line"] = name
                                if TRACK_INFO[tid]["enter_time"] is None:
                                    TRACK_INFO[tid]["enter_time"] = datetime.now().isoformat()
                            TRACK_INFO[tid]["end_line"] = name
                            TRACK_INFO[tid]["exit_time"] = datetime.now().isoformat()

                # — подсчёт осей —
                do_axle = (use_axle
                           and should_calc_axle(x2,y2,w,h, axle_for_all,
                                                hz_start,hz_stop,
                                                vz_start,vz_stop)
                           and (tid not in LAST_AXLE_FRAME
                                or frame_count - LAST_AXLE_FRAME[tid] >= axle_period))
                if do_axle:
                    _, cnt = process_wheels_on_vehicle(frame_bgr, (x1,y1,x2,y2), wheel_model)
                    update_axles(tid, cnt)
                    TRACK_INFO[tid]["axles"] = AXLE_VALUE[tid]
                    LAST_AXLE_FRAME[tid] = frame_count

                stable = max(AXLE_VALUE.get(tid, 2), 2)
                TRACK_INFO[tid]["axles"] = stable

                # — отрисовка —
                color = tuple(random.randint(50,200) for _ in range(3))
                text  = f"ID:{tid}"
                if label:    text += f" {label}"
                if use_axle: text += f" ax:{stable}"
                draw_box(frame_bgr, x1,y1,x2,y2, color, text=text)

            # финализируем ушедшие треки
            dead = [tid for tid in TRACK_INFO if tid not in alive]
            for tid in dead:
                if TRACK_INFO[tid]["exit_time"] is None:
                    TRACK_INFO[tid]["exit_time"] = datetime.now().isoformat()
                finalize_track(tid, cvs_rows)

        # преобразуем в PIL и отдаем
        pil = Image.fromarray(cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB))
        yield pil, meta["picket"], meta["timestamp"]

        frame_count += 1
        if not debug and frame_count % log_every == 0:
            print(f"[INFO] {frame_count} frames processed")

    # финализируем оставшихся
    for tid in list(TRACK_INFO):
        if TRACK_INFO[tid]["exit_time"] is None:
            TRACK_INFO[tid]["exit_time"] = datetime.now().isoformat()
        finalize_track(tid, cvs_rows)

In [8]:
# 1) Инициализируем reader
reader = SFFReader(r"D:\учеба\test\input\noyabrsk\Camera_1\Video.sff")

# 2) Загружаем YOLO-модели
yolo = YOLO(r"D:\учеба\test\yolo11x.pt")
wheels = YOLO(r"D:\учеба\test\danya\best.pt")
LINES = {
    "LINE_A": ((1900,400), (1900,1600)),
    "LINE_B": ((0,200), (0,1000)),
}

# 3) Создаём генератор
gen = annotated_frames(
    reader=reader,
    model=yolo,
    wheel_model=wheels,
    target_classes=[2,3,5,7],
    use_tracking=True,
    use_axle=True,
    axle_period=3,
    lines=LINES,
    zones=None
)
sff_file = r'D:\учеба\Практика\расчет_интенсивности\мониторинг_видео\Нефтеюганск_2023_ул.Набережная-Ленина\Camera_2\Video.sff'

# 4) Сохраняем в SFF/DAT
reader.create_sff_and_dat(
    output_folder="output/noyabrsk",
    frames_generator=gen
)
print("Готово!")

[INFO] 50 frames processed
[INFO] 100 frames processed
[INFO] 150 frames processed
[INFO] 200 frames processed
[INFO] 250 frames processed
[INFO] 300 frames processed
[INFO] 350 frames processed
[INFO] 400 frames processed
[INFO] 450 frames processed
[INFO] 500 frames processed
[INFO] 550 frames processed
[INFO] 600 frames processed
[INFO] 650 frames processed
[INFO] 700 frames processed
[INFO] 750 frames processed
[INFO] 800 frames processed
[INFO] 850 frames processed
[INFO] 900 frames processed
[INFO] 950 frames processed
[INFO] 1000 frames processed
[INFO] 1050 frames processed
[INFO] 1100 frames processed
[INFO] 1150 frames processed
[INFO] 1200 frames processed
[INFO] 1250 frames processed
[INFO] 1300 frames processed
[INFO] 1350 frames processed
[INFO] 1400 frames processed
[INFO] 1450 frames processed
[INFO] 1500 frames processed
[INFO] 1550 frames processed
[INFO] 1600 frames processed
[INFO] 1650 frames processed
[INFO] 1700 frames processed
[INFO] 1750 frames processed
[INFO

# Test

In [3]:
# =====================  EXAMPLE USAGE  =====================

LINES = {
    "LINE_A": ((1020, 450), (370, 690)),
    "LINE_B": ((610, 380), (310, 400)),
}

model = YOLO(r'D:\учеба\test\yolo11x.pt')
wheel_model = YOLO(r"D:\учеба\test\danya\best.pt")
reader = SFFReader(r"D:\учеба\test\input\noyabrsk\Camera_1\Video.sff")

# model = model.export(format="tensorrt", dynamic=True, opset=17)
# wheel_model = wheel_model.export(format="tensorrt", dynamic=True, opset=17)
video_path = r"D:\учеба\test\input\2)_30fps_720p.mp4"

In [4]:
deepsort_reader(
    reader=reader,
    model=model,
    target_classes=[2,3,5,7],
    use_tracking=True,
    use_axle=True,
    wheel_model=wheel_model,
    axle_for_all=True,
    axle_period=3,
    lines=LINES,
    draw_regions=True,
    csv_path="objects.csv"
)

  import pkg_resources


[INFO] Processed 10 frames
[INFO] Processed 20 frames
[INFO] Processed 30 frames
[INFO] Processed 40 frames
[INFO] Processed 50 frames
[INFO] Processed 60 frames
[INFO] Processed 70 frames
[INFO] Processed 80 frames
[INFO] Processed 90 frames
[INFO] Processed 100 frames
[INFO] Processed 110 frames
[INFO] Processed 120 frames
[INFO] Processed 130 frames
[INFO] Processed 140 frames
[INFO] Processed 150 frames
[INFO] Processed 160 frames
[INFO] Processed 170 frames
[INFO] Processed 180 frames
[INFO] Processed 190 frames
[INFO] Processed 200 frames
[INFO] Processed 210 frames
[INFO] Processed 220 frames
[INFO] Processed 230 frames
[INFO] Processed 240 frames
[INFO] Processed 250 frames
[INFO] Processed 260 frames
[INFO] Processed 270 frames
[INFO] Processed 280 frames
[INFO] Processed 290 frames
[INFO] Processed 300 frames
[INFO] Processed 310 frames
[INFO] Processed 320 frames
[INFO] Processed 330 frames
[INFO] Processed 340 frames
[INFO] Processed 350 frames
[INFO] Processed 360 frames
[

In [4]:
deepsort_video(
        video_path=video_path,
        model=model,
        target_classes=[2, 3, 5, 7],   # классы COCO: 2=авто,3=мото,5=bus,7=truck
        use_tracking=True,              # включить DeepSort
        show_track_id=True,             # рисовать ID трека
        use_axle=True,                  # считать оси
        wheel_model=wheel_model,        # модель для детекции колёс
        axle_for_all=True,              # считать для всех размеров боксов
        axle_period=3,                  # раз в 3 кадра
        # hz_start=700, hz_stop=5,        # зоны для подсчёта осей
        # vz_start=None, vz_stop=None,
        # zones=ZONES,                    # полигоны
        lines=LINES,                    # линии пересечения
        draw_regions=True,              # рисовать зоны/линии
        save_out=True,                  # сохранить видео
        out_path="out_with_tracks.mp4",
        out_fps=30,                    # фпс выходного видео
        csv_path="counts.csv",         # путь для CSV-таблицы            
)

  import pkg_resources


[INFO] Original FPS: 30.00. Processing all frames (no FPS reduction).
[INFO] Processed 10 frames
[INFO] Processed 20 frames
[INFO] Processed 30 frames
[INFO] Processed 40 frames
[INFO] Processed 50 frames
[INFO] Processed 60 frames
[INFO] Processed 70 frames
[INFO] Processed 80 frames
[INFO] Processed 90 frames
[INFO] Processed 100 frames
[INFO] Processed 110 frames
[INFO] Processed 120 frames
[INFO] Processed 130 frames
[INFO] Processed 140 frames
[INFO] Processed 150 frames
[INFO] Processed 160 frames
[INFO] Processed 170 frames
[INFO] Processed 180 frames
[INFO] Processed 190 frames
[INFO] Processed 200 frames
[INFO] Processed 210 frames
[INFO] Processed 220 frames
[INFO] Processed 230 frames
[INFO] Processed 240 frames
[INFO] Processed 250 frames
[INFO] Processed 260 frames
[INFO] Processed 270 frames
[INFO] Processed 280 frames
[INFO] Processed 290 frames
[INFO] Processed 300 frames
[INFO] Processed 310 frames
[INFO] Processed 320 frames
[INFO] Processed 330 frames
[INFO] Processe

In [11]:
import os
import pandas as pd
import ace_tools_open as tools

csv_path = "counts.csv"

if not os.path.isfile(csv_path):
    print(f"❗ Файл '{csv_path}' не найден. Убедитесь, что путь указан правильно.")
else:
    # Читаем CSV
    df = pd.read_csv(csv_path)
    # Отображаем интерактивную таблицу пользователю
    tools.display_dataframe_to_user("Counts Table", df)

Counts Table


0
Loading ITables v2.4.4 from the internet...  (need help?)


In [8]:
import csv

csv_path = "counts.csv"

with open(csv_path, newline='', encoding='utf-8') as f:
    reader = csv.reader(f)
    # Считаем заголовок
    header = next(reader)
    print("Заголовки столбцов:", header)
    # Пробегаем по строкам и печатаем
    for i, row in enumerate(reader, start=1):
        print(f"Строка {i}:", row)

Заголовки столбцов: ['track_id', 'class_id', 'axles', 'start_zone', 'end_zone', 'start_line', 'end_line', 'first_frame', 'last_frame', 'frames_total', 'enter_time', 'exit_time']
Строка 1: ['3', 'car', '2', '', '', '', '', '2', '11', '10', '', '2025-07-26T16:27:01.359153']
Строка 2: ['1', 'car', '2', '', '', '', '', '2', '12', '11', '', '2025-07-26T16:27:01.422705']
Строка 3: ['2', 'car', '2', '', '', '', '', '2', '13', '12', '', '2025-07-26T16:27:01.477832']
Строка 4: ['3', 'car', '2', '', '', '', '', '13', '13', '1', '', '2025-07-26T16:27:01.477832']
Строка 5: ['2', 'car', '2', '', '', '', '', '18', '18', '1', '', '2025-07-26T16:27:01.814843']
Строка 6: ['3', 'car', '2', '', '', '', '', '15', '26', '12', '', '2025-07-26T16:27:02.299594']
Строка 7: ['2', 'car', '2', '', '', '', '', '30', '30', '1', '', '2025-07-26T16:27:02.603831']
Строка 8: ['11', 'car', '2', '', '', '', '', '32', '36', '5', '', '2025-07-26T16:27:03.142841']
Строка 9: ['1', 'car', '2', '', '', '', '', '14', '41', '28'

# ResNet

In [None]:
# import torch
# import torch.nn as nn
# from torchvision import models, transforms
# from PIL import Image
# import torch.nn.functional as F

# # Список классов в порядке, на котором обучали
# class_names = ['bus', 'auto', 'tractor', 'tractor-trailer', 'truck', 'truck-trailer']

# # Устройство для вычислений
# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# # 1) Функция для загрузки модели и весов
# def load_model(weights_path: str) -> nn.Module:
#     model = models.resnet34(weights=None)
#     model.fc = nn.Linear(model.fc.in_features, len(class_names))
#     state = torch.load(weights_path, map_location=device)
#     model.load_state_dict(state)
#     model.to(device)
#     model.eval()
#     return model

# # 2) Препроцессинг изображений
# preprocess = transforms.Compose([
#     transforms.Resize((224, 224)),
#     transforms.ToTensor(),
#     transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
# ])

# # 3) Функция предсказания
# def predict(model: nn.Module, image_path: str, topk: int = 6):
#     # Читаем и приводим к RGB
#     image = Image.open(image_path).convert('RGB')
#     # Преобразуем и добавляем батч
#     tensor = preprocess(image).unsqueeze(0).to(device)
#     with torch.no_grad():
#         logits = model(tensor)[0]
#         probs = F.softmax(logits, dim=0)
#     # Основной прогноз
#     pred_idx = probs.argmax().item()
#     pred_label = class_names[pred_idx]
#     # Топ-k для отладки
#     top_probs, top_idxs = probs.topk(topk)
#     topk_list = [(class_names[i], p.item()) for p, i in zip(top_probs, top_idxs)]
#     return pred_label, topk_list

# # 4) Пример использования
# if __name__ == '__main__':
#     # Поменяйте пути на свои
#     weights_path = r'C:\Users\acehm\OneDrive\Рабочий стол\baseline\resnet34_vehicle_classifier_224.pth'
#     test_image = r'E:\dataset\merged\images\2_000060.jpg'

#     model = load_model(weights_path)
#     pred, topk_list = predict(model, test_image)

#     print(f"Image: {test_image}")
#     print(f"Prediction: {pred}")
#     print("Top-k predictions:")
#     for label, prob in topk_list:
#         print(f"  {label}: {prob*100:.2f}%")
