In [1]:
import csv
from pathlib import Path
from typing import Tuple, Dict

import cv2
import numpy as np
from ultralytics import YOLO

# OCR
import easyocr



In [2]:
# Пути и параметры

# Входное видео
VIDEO_DIR = Path("../data")
INPUT_NAME = "Автотрафик.mp4"
VIDEO_PATH = VIDEO_DIR / INPUT_NAME

# Выходы
OUTPUT_VIDEO = VIDEO_DIR / f"{VIDEO_PATH.stem}_counted.mp4"
PLATES_CSV   = VIDEO_DIR / f"{VIDEO_PATH.stem}_plates.csv"
CROSS_CSV    = VIDEO_DIR / f"{VIDEO_PATH.stem}_crossings.csv"

YOLO_WEIGHTS = "yolov8s.pt"

# Детекция только машин (COCO: car = 2)
CAR_CLASS_ID = 2
CONF_THRES = 0.5
IOU_THRES  = 0.5

# Диагональная линия: позже вычислим от размеров кадра (0.1W,0.9H) -> (0.9W,0.1H)
LINE_START_RATIO = (0.10, 0.05)  # (x_ratio, y_ratio)
LINE_END_RATIO   = (0.95, 0.95)

# В каком направлении считаем пересечение:
# Для вектора (line_start -> line_end) учитываем только переход "с левой стороны" -> "на правую"
COUNT_DIRECTION_LEFT_TO_RIGHT = True

# OCR: распознаём 1 раз на трек (или несколько попыток)
OCR_LANGS = ['en', 'ru']
OCR_MAX_ATTEMPTS_PER_ID = 3  # ограничение попыток OCR на один ID (чтобы не тормозить)


In [3]:
# ЯЧЕЙКА 2 — Хелперы

def point_side_sign(point: Tuple[float, float], a: Tuple[float, float], b: Tuple[float, float]) -> int:
    """
    Знак ориентации точки относительно направленного отрезка a->b.
    >0: точка слева от направления (a->b)
    <0: точка справа
     0: на линии (коллинеарна)
    """
    (px, py) = point
    (ax, ay) = a
    (bx, by) = b
    cross = (bx - ax) * (py - ay) - (by - ay) * (px - ax)
    if cross > 0:
        return 1
    elif cross < 0:
        return -1
    else:
        return 0

def clamp_box(x1, y1, x2, y2, w, h):
    """Обрезка бокса к границам кадра."""
    x1 = max(0, min(int(x1), w - 1))
    y1 = max(0, min(int(y1), h - 1))
    x2 = max(0, min(int(x2), w - 1))
    y2 = max(0, min(int(y2), h - 1))
    if x2 <= x1: x2 = min(x1 + 1, w - 1)
    if y2 <= y1: y2 = min(y1 + 1, h - 1)
    return x1, y1, x2, y2

def preprocess_for_ocr(roi_bgr):
    """Небольшая предобработка ROI: в серый + порог Отсу (инверсия)."""
    gray = cv2.cvtColor(roi_bgr, cv2.COLOR_BGR2GRAY)
    # Подавление шума
    gray = cv2.bilateralFilter(gray, d=7, sigmaColor=50, sigmaSpace=50)
    # Порог
    _, thr = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV | cv2.THRESH_OTSU)
    return thr

def draw_label(img, text, org, color=(0, 255, 255), scale=0.6, thickness=2):
    """Подпись с лёгкой «обводкой» для читаемости."""
    x, y = org
    cv2.putText(img, text, (x, y), cv2.FONT_HERSHEY_SIMPLEX, scale, (0,0,0), thickness+2, cv2.LINE_AA)
    cv2.putText(img, text, (x, y), cv2.FONT_HERSHEY_SIMPLEX, scale, color, thickness, cv2.LINE_AA)


In [4]:
# ЯЧЕЙКА 3 — Инициализация

assert VIDEO_PATH.exists(), f"Видео не найдено: {VIDEO_PATH}"

# YOLOv8
model = YOLO(YOLO_WEIGHTS)

# OCR (EasyOCR)
reader = easyocr.Reader(OCR_LANGS)

print("Готово: модель YOLO и OCR инициализированы.")


Готово: модель YOLO и OCR инициализированы.


In [5]:
# ЯЧЕЙКА 4 — Основной цикл: читаем видео, считаем пересечения, распознаём номера, записываем результат

cap = cv2.VideoCapture(str(VIDEO_PATH))
assert cap.isOpened(), "Не удалось открыть видеофайл"

fps = cap.get(cv2.CAP_PROP_FPS) or 24.0
w   = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h   = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

# Диагональная линия относительно размеров кадра
line_start = (int(w * LINE_START_RATIO[0]), int(h * LINE_START_RATIO[1]))
line_end   = (int(w * LINE_END_RATIO[0]), int(h * LINE_END_RATIO[1]))

# Инициализация видеозаписи
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
out = cv2.VideoWriter(str(OUTPUT_VIDEO), fourcc, fps, (w, h))

# Служебные структуры
last_side: Dict[int, int] = {}        # track_id -> (-1/0/+1) сторона относительно линии на прошлом кадре
counted_ids = set()                   # уже посчитанные ID (пересечение в нужном направлении)
cross_count = 0

plate_by_id: Dict[int, str] = {}      # распознанный номер для ID
ocr_attempts_left: Dict[int, int] = {}# осталось попыток OCR для ID
plate_rows = []                       # для CSV: (time_sec, track_id, plate_text)
cross_rows = []                       # для CSV: (time_sec, track_id)

frame_idx = 0

while True:
    ret, frame = cap.read()
    if not ret:
        break
    frame_idx += 1

    # Детекция+трекинг (persist=True — состояние трекера сохраняется между кадрами)
    results = model.track(frame, persist=True, classes=[CAR_CLASS_ID], conf=CONF_THRES, iou=IOU_THRES)
    boxes = []
    ids   = []
    confs = []
    clss  = []

    if results and len(results) > 0 and results[0].boxes is not None and len(results[0].boxes) > 0:
        b = results[0].boxes
        # xyxy, id, conf, cls — тензоры; приводим к numpy/int
        boxes = b.xyxy.cpu().numpy()
        confs = b.conf.cpu().numpy() if b.conf is not None else np.zeros((len(boxes),))
        clss  = b.cls.cpu().numpy()  if b.cls  is not None else np.full((len(boxes),), CAR_CLASS_ID)
        if b.id is not None:
            ids = b.id.int().cpu().numpy()
        else:
            ids = np.array([-1] * len(boxes), dtype=int)

    # Рисуем диагональную линию
    cv2.line(frame, line_start, line_end, (255, 0, 0), 2)

    # Для каждого трека — центр, подсчёт, визуализация, OCR
    for i in range(len(boxes)):
        x1, y1, x2, y2 = boxes[i]
        score = float(confs[i])
        clsid = int(clss[i])
        track_id = int(ids[i])

        # Страховка по классу
        if clsid != CAR_CLASS_ID or score < CONF_THRES:
            continue

        # Центр бокса
        cx = int((x1 + x2) / 2)
        cy = int((y1 + y2) / 2)

        # Подсчёт пересечения: знак стороны относительно направленной линии (line_start -> line_end)
        side_now = point_side_sign((cx, cy), line_start, line_end)

        if track_id not in last_side.keys():
            last_side[track_id] = side_now
            # Инициализируем счётчик попыток OCR для трека
            ocr_attempts_left.setdefault(track_id, OCR_MAX_ATTEMPTS_PER_ID)
        else:
            prev = last_side[track_id]
            # Фиксируем переход слева -> справа
            if COUNT_DIRECTION_LEFT_TO_RIGHT:
                crossed_now = (prev > 0 and side_now < 0)
            else:
                crossed_now = (prev < 0 and side_now > 0)

            if crossed_now and (track_id not in counted_ids):
                counted_ids.add(track_id)
                cross_count += 1
                # отметим время события
                time_sec = (cap.get(cv2.CAP_PROP_POS_MSEC) or 0.0) / 1000.0
                cross_rows.append((f"{time_sec:.3f}", track_id))
            # Обновим сторону
            last_side[track_id] = side_now

        # Цвет бокса в зависимости от того, пересекал ли линию
        color = (0, 255, 0) if (track_id in counted_ids) else (0, 0, 255)
        x1i, y1i, x2i, y2i = clamp_box(x1, y1, x2, y2, w, h)
        cv2.rectangle(frame, (x1i, y1i), (x2i, y2i), color, 2)

        # Центр
        cv2.circle(frame, (cx, cy), 4, color, -1)

        # Подпись ID и класс
        label = f"ID {track_id} Car"
        draw_label(frame, label, (x1i, max(20, y1i - 10)), color=color)

        # OCR: если номер ещё не распознали и остались попытки — пробуем
        if (track_id not in plate_by_id) and (ocr_attempts_left.get(track_id, 0) > 0):
            roi = frame[y1i:y2i, x1i:x2i]
            if roi.size > 0:
                thr = preprocess_for_ocr(roi)
                # EasyOCR вернёт список [ (bbox, text, conf), ... ]
                results_ocr = reader.readtext(thr, detail=1)
                if results_ocr:
                    # Сортируем по уверености
                    results_ocr.sort(key=lambda x: x[2], reverse=True)
                    raw_text = results_ocr[0][1]
                    plate = "".join(ch for ch in raw_text if ch.isalnum()).upper()
                    if len(plate) >= 3:
                        plate_by_id[track_id] = plate
                        time_sec = (cap.get(cv2.CAP_PROP_POS_MSEC) or 0.0) / 1000.0
                        plate_rows.append((f"{time_sec:.3f}", track_id, plate))
                    else:
                        ocr_attempts_left[track_id] -= 1
                else:
                    ocr_attempts_left[track_id] -= 1

        # Если номер известен — показываем его
        if track_id in plate_by_id:
            draw_label(frame, plate_by_id[track_id], (x1i, max(20, y1i - 30)), color=(255, 255, 0))

    # Общий счётчик
    draw_label(frame, f"Count: {cross_count}", (10, 30), color=(0, 255, 255), scale=1.0)

    # Запись кадра
    out.write(frame)

cap.release()
out.release()

print(f"ГОТОВО: видео сохранено -> {OUTPUT_VIDEO}")
print(f"Пересечений: {cross_count}")



0: 384x640 2 cars, 59.3ms
Speed: 0.8ms preprocess, 59.3ms inference, 0.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 2 cars, 50.8ms
Speed: 0.6ms preprocess, 50.8ms inference, 0.3ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 car, 50.6ms
Speed: 0.6ms preprocess, 50.6ms inference, 0.4ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 car, 50.4ms
Speed: 0.6ms preprocess, 50.4ms inference, 0.4ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 car, 49.0ms
Speed: 0.6ms preprocess, 49.0ms inference, 0.3ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 2 cars, 49.6ms
Speed: 0.6ms preprocess, 49.6ms inference, 0.4ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 2 cars, 49.4ms
Speed: 0.8ms preprocess, 49.4ms inference, 0.3ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 2 cars, 49.9ms
Speed: 1.1ms preprocess, 49.9ms inference, 0.4ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640

In [6]:
# CSV с номерами и временем; CSV с событиями пересечения

# Номера
with open(PLATES_CSV, "w", newline="", encoding="utf-8") as f:
    w = csv.writer(f)
    w.writerow(["time_sec", "track_id", "plate_number"])
    for row in plate_rows:
        w.writerow(list(row))

# Пересечения
with open(CROSS_CSV, "w", newline="", encoding="utf-8") as f:
    w = csv.writer(f)
    w.writerow(["time_sec", "track_id"])
    for row in cross_rows:
        w.writerow(list(row))

print(f"CSV с номерами -> {PLATES_CSV}")
print(f"CSV с пересечениями -> {CROSS_CSV}")


CSV с номерами -> ../data/Автотрафик_plates.csv
CSV с пересечениями -> ../data/Автотрафик_crossings.csv
