In [None]:
import os
import re
import shutil
import signal
import statistics
import subprocess
import threading
import time
import urllib.parse
from dataclasses import dataclass, field
from pathlib import Path

import cv2
import numpy as np
import torch
from dotenv import load_dotenv
from ultralytics import YOLO

In [None]:
# Fail fast with a clear message when the configuration file is missing:
# HOST and GO2RTC_RTSP_PORT below are read from the environment it populates.
if not os.path.exists(os.path.join(os.getcwd(), ".env")):
    raise FileNotFoundError("'.env' file not found at current directory.")

load_dotenv()

# Host name used to build every RTSP URL in this notebook.
HOST = os.getenv("HOST")
if not HOST:
    # Without this check a missing value silently yields "rtsp://None:...".
    raise RuntimeError("HOST is not set (expected in '.env' or the environment).")

# RTSP port; validated here so a missing or malformed value is reported by
# name instead of surfacing as an opaque TypeError from int(None).
try:
    GO2RTC_RTSP_PORT = int(os.getenv("GO2RTC_RTSP_PORT", ""))
except ValueError as exc:
    raise RuntimeError(
        "GO2RTC_RTSP_PORT is missing or not an integer: "
        f"{os.getenv('GO2RTC_RTSP_PORT')!r}"
    ) from exc

In [None]:
# (input stream name, output stream name) pairs. The first element is passed
# to rtsp_in() and the second to rtsp_out(); each output name is the input
# name with an "_AI" suffix.
CAMERAS = [
    (source_name, f"{source_name}_AI")
    for source_name in ("cam_09_стол", "cam_10_склад", "cam_11_улица")
]

In [None]:
def rtsp_in(name: str) -> str:
    """Return the RTSP URL used to read stream `name` from go2rtc.

    The name is percent-encoded so non-ASCII stream names are valid in the
    URL path, and the `?video` query is appended.
    """
    stream_path = urllib.parse.quote(name)
    return "rtsp://{}:{}/{}?video".format(HOST, GO2RTC_RTSP_PORT, stream_path)

In [None]:
def rtsp_out(name: str) -> str:
    """Return the RTSP URL used to push stream `name` back into go2rtc.

    The name is percent-encoded so non-ASCII stream names are valid in the
    URL path. No query string is appended.
    """
    stream_path = urllib.parse.quote(name)
    return "rtsp://{}:{}/{}".format(HOST, GO2RTC_RTSP_PORT, stream_path)

In [None]:
# Sometimes improves RTSP stability in OpenCV/FFmpeg.
# The value is a list of "key;value" pairs separated by "|".
os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = (
    "rtsp_transport;tcp"
    "|max_delay;500000"
    "|stimeout;5000000"
    "|loglevel;error"
)

# Show the URLs that will be used for the first configured camera.
example_in, example_out = CAMERAS[0]
print("IN URL example:", rtsp_in(example_in))
print("OUT URL example:", rtsp_out(example_out))

In [None]:
# Model weights are looked up relative to the current working directory.
MODEL_PATH = Path.cwd().joinpath(
    "from_GitHub/dmmmit_smoking_detection/models/final_model.pt"
).resolve()
assert MODEL_PATH.is_file(), f"Model not found: {MODEL_PATH}"

model = YOLO(str(MODEL_PATH))

# Use CUDA when it is available.
print("CUDA available:", torch.cuda.is_available())
if torch.cuda.is_available():
    # Let cuDNN benchmark and pick convolution algorithms.
    torch.backends.cudnn.benchmark = True


In [None]:
def iou(boxA, boxB):
    """Intersection-over-union of two boxes given as (x1, y1, x2, y2).

    Returns 0.0 when the boxes do not overlap (or only touch). A small
    epsilon in the denominator guards against division by zero.
    """
    a_left, a_top, a_right, a_bottom = boxA[0], boxA[1], boxA[2], boxA[3]
    b_left, b_top, b_right, b_bottom = boxB[0], boxB[1], boxB[2], boxB[3]

    # Width/height of the overlap rectangle; negative means no overlap.
    overlap_w = min(a_right, b_right) - max(a_left, b_left)
    overlap_h = min(a_bottom, b_bottom) - max(a_top, b_top)
    inter = max(0, overlap_w) * max(0, overlap_h)

    if inter == 0:
        return 0.0

    area_a = (a_right - a_left) * (a_bottom - a_top)
    area_b = (b_right - b_left) * (b_bottom - b_top)
    union = area_a + area_b - inter + 1e-9

    return inter / union


In [None]:
def smoking_score(results):
    """Heuristic smoking score for one prediction result.

    The score is the maximum of:
    - the confidence of any "hands_with_cigarettes" detection, and
    - the confidence of any "cigarette" detection whose box overlaps a
      "Person" box with IoU above 0.01.

    Returns 0.0 when neither kind of evidence is present.
    """
    label_of = results.names

    person_boxes = []
    cigarettes = []   # (box, confidence) pairs
    hand_confs = []

    for det in results.boxes:
        class_id = int(det.cls.item())
        confidence = float(det.conf.item())
        label = label_of.get(class_id, str(class_id))
        corners = det.xyxy[0].cpu().numpy().tolist()

        if label == "hands_with_cigarettes":
            hand_confs.append(confidence)
        elif label == "cigarette":
            cigarettes.append((corners, confidence))
        elif label == "Person":
            person_boxes.append(corners)

    score = max(hand_confs, default=0.0)

    # NOTE(review): the IoU of a small box lying fully inside a large one is
    # the ratio of their areas, so a cigarette much smaller than 1% of the
    # person box does not pass this threshold -- confirm that is intended.
    for cig_box, cig_conf in cigarettes:
        if any(iou(cig_box, person_box) > 0.01 for person_box in person_boxes):
            score = max(score, cig_conf)

    return score


In [None]:
def open_video_capture(stream_url: str) -> cv2.VideoCapture:
    """Open `stream_url` with OpenCV's FFmpeg backend and return the capture.

    Args:
        stream_url: URL of the stream to open.

    Returns:
        An opened `cv2.VideoCapture` with its buffer size set to 1.

    Raises:
        RuntimeError: if the capture cannot be created or does not open.
    """
    try:
        video_capture = cv2.VideoCapture(stream_url, cv2.CAP_FFMPEG)
    except Exception as e:
        # Previously the exception was only printed and the code went on to
        # call .isOpened() on None, hiding the real cause behind an
        # AttributeError.
        print(f"Exception: {e}")
        raise RuntimeError(f"RTSP stream not opened: {stream_url}") from e

    if not video_capture.isOpened():
        # Free the backend resources of the failed capture before reporting.
        video_capture.release()
        raise RuntimeError(f"RTSP stream not opened: {stream_url}")

    video_capture.set(cv2.CAP_PROP_BUFFERSIZE, 1)

    return video_capture


In [None]:
def start_ffmpeg_proc(out_url: str, w: int, h: int, fps: int) -> subprocess.Popen:
    """Start an ffmpeg process that encodes raw frames from stdin to `out_url`.

    ffmpeg is told to read raw `bgr24` video of size `w`x`h` at `fps` from its
    stdin, encode it with libx264 and publish it as RTSP over TCP.
    The returned process has an unbuffered stdin pipe; stdout and stderr are
    discarded.
    """
    rate = str(fps)

    # How to interpret the bytes arriving on stdin.
    raw_input_args = [
        "-f", "rawvideo",
        "-pix_fmt", "bgr24",
        "-s", f"{w}x{h}",
        "-r", rate,
        "-i", "-",
    ]
    # Encoder settings (no audio, low-latency x264).
    encoder_args = [
        "-an",
        "-c:v", "libx264",
        "-pix_fmt", "yuv420p",
        "-preset", "ultrafast",
        "-tune", "zerolatency",
        "-crf", "30",
        "-g", rate,
        "-bf", "0",
    ]
    # Where and how to publish.
    output_args = [
        "-f", "rtsp",
        "-rtsp_transport", "tcp",
        out_url,
    ]

    command = ["ffmpeg", "-hide_banner", "-loglevel", "error"]
    command += raw_input_args
    command += encoder_args
    command += output_args

    return subprocess.Popen(
        command,
        stdin=subprocess.PIPE,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        bufsize=0,
    )

In [None]:
def stop_ffmpeg_proc(ffmpeg_proc: subprocess.Popen | None):
    if ffmpeg_proc is None:
        return

    try:
        if ffmpeg_proc.stdin:
            ffmpeg_proc.stdin.close()
    except Exception as e:
        print(f"Exception: {e}")
        pass

    try:
        ffmpeg_proc.send_signal(signal.SIGINT)
        ffmpeg_proc.wait(timeout=2)
    except Exception as e:
        ffmpeg_proc.kill()
        print(f"Exception: {e}")


In [None]:
def is_downscale_needed(h, w, max_w=1920, max_h=1080) -> bool:
    """Return True when a frame of height `h` and width `w` exceeds the
    `max_w` x `max_h` bounds in either dimension."""
    if w > max_w:
        return True
    return h > max_h


In [None]:
def downscale_frame(frame: np.ndarray, h: int, w: int, max_w: int = 1920, max_h: int = 1080) -> np.ndarray:
    """Resize `frame` to fit inside `max_w` x `max_h`, keeping its aspect ratio.

    Args:
        frame: image to resize.
        h: height of `frame` in pixels.
        w: width of `frame` in pixels.
        max_w: maximum allowed output width.
        max_h: maximum allowed output height.

    Returns:
        The resized image (a new array). Both output dimensions are rounded
        down to an even number and are at least 2.
    """
    scale = min(max_w / w, max_h / h)

    # The frames are later encoded by libx264 as yuv420p (see
    # start_ffmpeg_proc), which rejects odd widths/heights. Plain int()
    # truncation produces odd sizes for some inputs (e.g. 2688x1520 ->
    # 1273x720), so snap to even. The lower bound also keeps cv2.resize from
    # being called with a zero dimension on extreme aspect ratios.
    new_w = max(2, int(w * scale) // 2 * 2)
    new_h = max(2, int(h * scale) // 2 * 2)

    return cv2.resize(frame, (new_w, new_h), interpolation=cv2.INTER_AREA)


In [None]:
def draw_predictions(frame, predictions):
    """Draw detection boxes, labels and the smoking heuristic onto `frame`.

    The drawing is done in place on `frame`, and the same array is returned.
    Each box gets a green rectangle and a "<class name> <confidence>" label;
    the heuristic score from `smoking_score` is written near the top-left
    corner.
    """
    green = (0, 255, 0)
    font = cv2.FONT_HERSHEY_SIMPLEX
    class_names = predictions.names

    for det in predictions.boxes:
        left, top, right, bottom = (int(v) for v in det.xyxy[0].tolist())
        class_id = int(det.cls.item())
        confidence = float(det.conf.item())
        label = class_names.get(class_id, str(class_id))

        cv2.rectangle(
            img=frame,
            pt1=(left, top),
            pt2=(right, bottom),
            color=green,
            thickness=2,
        )

        # Keep the label at least 20 px from the top edge so it stays visible.
        label_origin = (left, max(20, top - 5))
        cv2.putText(
            img=frame,
            text=f"{label} {confidence:.2f}",
            org=label_origin,
            fontFace=font,
            fontScale=0.6,
            color=green,
            thickness=2,
            lineType=cv2.LINE_AA,
        )

    heuristic = smoking_score(predictions)
    cv2.putText(
        img=frame,
        text=f"smoking (heuristic): {heuristic:.2f}",
        org=(10, 60),
        fontFace=font,
        fontScale=0.9,
        color=(0, 0, 0),
        thickness=2,
        lineType=cv2.LINE_AA,
    )

    return frame

In [None]:
@dataclass
class CamState:
    """Runtime state of one camera: its streams, latest frame and counters."""

    # Input/output stream names and the RTSP URLs built from them.
    name_in: str
    name_out: str
    url_in: str
    url_out: str

    # Capture handle (set by capture_worker) and ffmpeg publisher process
    # (set by run_multicam); None until started.
    cap: cv2.VideoCapture | None = None
    ffmpeg: subprocess.Popen | None = None

    # One lock per camera. A plain `threading.Lock()` default is evaluated
    # once, when the class is defined, so every CamState would share the same
    # lock object; default_factory creates a fresh lock for each instance.
    # compare=False keeps two otherwise-equal states equal despite having
    # distinct locks.
    lock: threading.Lock = field(default_factory=threading.Lock, compare=False)

    # Most recent captured frame and the perf_counter() time it was stored;
    # written by capture_worker while holding `lock`.
    last_frame: np.ndarray | None = None
    last_frame_ts: float = 0.0

    # stats
    cap_frames: int = 0        # frames read by the capture thread
    out_frames: int = 0        # frames written to ffmpeg
    misses: int = 0            # frames that could not be written to ffmpeg
    last_proc_dt: float = 0.0  # seconds spent on the last per-camera processing step

In [None]:
def _release_capture(cs: CamState):
    """Release `cs.cap` if it is set, reporting (not raising) any error."""
    if cs.cap is None:
        return
    try:
        cs.cap.release()
    except Exception as e:
        print(f"[{cs.name_in}] capture release failed: {e}")


def capture_worker(
        cs: CamState,
        stop_evt: threading.Event,
        max_read_failures: int = 100,
        reconnect_delay: float = 1.0,
        ):
    """Thread body: keep `cs.last_frame` updated with the newest frame of `cs.url_in`.

    The stream is opened with `open_video_capture`. Every successfully read
    frame replaces `cs.last_frame` (under `cs.lock`) and bumps
    `cs.cap_frames`. If the stream cannot be opened, or `max_read_failures`
    reads fail in a row, the capture is released and reopened after
    `reconnect_delay` seconds instead of polling a dead capture forever.

    Args:
        cs: camera state to update.
        stop_evt: set by the caller to make the worker exit.
        max_read_failures: consecutive failed reads that trigger a reconnect.
        reconnect_delay: seconds to wait before (re)opening the stream.
    """
    failures = 0
    connected = False
    try:
        while not stop_evt.is_set():
            if not connected:
                try:
                    cs.cap = open_video_capture(cs.url_in)
                except Exception as e:
                    # Do not let one unreachable camera kill its thread for
                    # the rest of the run; report and retry.
                    print(f"[{cs.name_in}] capture open failed: {e}")
                    # Event.wait doubles as an interruptible sleep, so a stop
                    # request is honoured without waiting out the delay.
                    stop_evt.wait(reconnect_delay)
                    continue
                connected = True
                failures = 0

            ok, frame = cs.cap.read()
            if not ok or frame is None:
                failures += 1
                if failures >= max_read_failures:
                    print(f"[{cs.name_in}] {failures} failed reads in a row, reconnecting")
                    _release_capture(cs)
                    connected = False
                    stop_evt.wait(reconnect_delay)
                else:
                    stop_evt.wait(0.05)
                continue

            failures = 0
            with cs.lock:
                cs.last_frame = frame
                cs.last_frame_ts = time.perf_counter()
                cs.cap_frames += 1
    finally:
        _release_capture(cs)

In [None]:
def _read_tegrastats_line(timeout: float = 2.0) -> str:
    """Return one line of `tegrastats` output, or "" if none could be read."""
    # Some tegrastats builds understand --interval/--count, some do not.
    # Attempt 1: ask for exactly one sample and let the tool exit by itself.
    try:
        done = subprocess.run(
            ["tegrastats", "--interval", "1000", "--count", "1"],
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        # The tool kept running instead of exiting after one sample; fall
        # through to attempt 2 rather than giving up on Jetson stats.
        done = None

    if done is not None and done.returncode == 0 and done.stdout:
        lines = done.stdout.strip().splitlines()
        if lines:
            return lines[-1]

    # Attempt 2: start the tool, take its first line, then stop it.
    proc = subprocess.Popen(
        ["tegrastats"],
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        text=True,
    )
    # readline() has no timeout of its own; the watchdog kills the process
    # (which ends the read) if nothing is printed in time.
    watchdog = threading.Timer(timeout, proc.kill)
    watchdog.daemon = True
    watchdog.start()
    try:
        return (proc.stdout.readline() or "").strip()
    finally:
        watchdog.cancel()
        proc.terminate()
        # Wait so the child is reaped instead of lingering as a zombie.
        try:
            proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()
        proc.stdout.close()


def get_sys_stats():
    """Collect a best-effort snapshot of system load.

    Every probe is optional; a probe that is unavailable or fails simply
    contributes no keys.

    Returns:
        dict that may contain:
        - "cpu_pct", "ram_used_gb", "ram_total_gb" (via psutil, if importable)
        - "cuda_mem_alloc_gb", "cuda_mem_reserved_gb" (if CUDA is available)
        - "jetson_ram_used_mb", "jetson_ram_total_mb", "jetson_gr3d_pct"
          (parsed from `tegrastats`, if that executable is on PATH)
    """
    # Minimal, works the same on Jetson and non-Jetson machines.
    stats = {}

    # psutil is an optional dependency, hence the function-scope import.
    try:
        import psutil
        stats["cpu_pct"] = psutil.cpu_percent(interval=None)
        vm = psutil.virtual_memory()
        stats["ram_used_gb"] = vm.used / (1024 ** 3)
        stats["ram_total_gb"] = vm.total / (1024 ** 3)
    except Exception:
        # Deliberately best-effort: stats must never break the caller.
        pass

    if torch.cuda.is_available():
        try:
            stats["cuda_mem_alloc_gb"] = torch.cuda.memory_allocated() / (1024 ** 3)
            stats["cuda_mem_reserved_gb"] = torch.cuda.memory_reserved() / (1024 ** 3)
        except Exception:
            pass

    # Jetson tegrastats (if present): extract RAM and GR3D_FREQ.
    if shutil.which("tegrastats"):
        try:
            line = _read_tegrastats_line()
        except Exception:
            line = ""

        # Expected shape: "RAM 1234/7777MB ... GR3D_FREQ 76%"
        ram = re.search(r"RAM\s+(\d+)\s*/\s*(\d+)MB", line)
        if ram:
            stats["jetson_ram_used_mb"] = int(ram.group(1))
            stats["jetson_ram_total_mb"] = int(ram.group(2))
        gpu = re.search(r"GR3D_FREQ\s+(\d+)%", line)
        if gpu:
            stats["jetson_gr3d_pct"] = int(gpu.group(1))

    return stats

In [None]:
def _percentile(sorted_values, p):
    """Linearly interpolated `p`-th percentile (0-100) of a non-empty ascending list."""
    k = (len(sorted_values) - 1) * (p / 100.0)
    lo = int(k)
    hi = min(lo + 1, len(sorted_values) - 1)
    if lo == hi:
        return sorted_values[lo]
    return sorted_values[lo] + (sorted_values[hi] - sorted_values[lo]) * (k - lo)


def run_multicam(
        target_fps: int = 5,
        seconds: int | None = None,
        conf: float = 0.25,
        imgsz: int = 640,
        down_w: int = 1280,
        down_h: int = 720,
        print_every_sec: float = 2.0,
        ):
    """Run batched detection over all CAMERAS and publish annotated streams.

    One capture thread per camera keeps the newest frame available. The main
    loop ticks at `target_fps`: it takes the latest frame of every camera,
    downscales it to fit `down_w` x `down_h`, runs one batched
    `model.predict`, draws the predictions and writes each annotated frame to
    that camera's ffmpeg publisher. Statistics are printed every
    `print_every_sec` seconds.

    Frames are downscaled BEFORE inference so that the predicted box
    coordinates refer to the same image the boxes are drawn on.

    Args:
        target_fps: loop rate and output frame rate; must be positive.
        seconds: stop after this many seconds; None runs until interrupted.
        conf: confidence threshold passed to `model.predict`.
        imgsz: inference image size passed to `model.predict`.
        down_w: maximum width of processed/published frames.
        down_h: maximum height of processed/published frames.
        print_every_sec: interval between statistics reports.

    Returns:
        The list of CamState objects, with their counters filled in.

    Raises:
        ValueError: if `target_fps` is not positive.
    """
    if target_fps <= 0:
        raise ValueError(f"target_fps must be positive, got {target_fps}")

    # Seconds to wait before respawning an ffmpeg publisher that has died,
    # so a permanently failing output does not stall every tick.
    ffmpeg_restart_delay = 2.0

    cams = [
        CamState(n_in, n_out, rtsp_in(n_in), rtsp_out(n_out))
        for (n_in, n_out) in CAMERAS
    ]

    stop_evt = threading.Event()
    threads = [
        threading.Thread(target=capture_worker, args=(cs, stop_evt), daemon=True)
        for cs in cams
    ]
    for t in threads:
        t.start()

    period = 1.0 / target_fps
    next_t = time.perf_counter()
    t0 = time.perf_counter()
    last_print = t0

    infer_dts = []  # batch inference durations, for p50/p90/p99
    ffmpeg_retry_at = [0.0] * len(cams)  # earliest respawn time per camera

    try:
        while True:
            now = time.perf_counter()
            if seconds is not None and (now - t0) >= seconds:
                break

            # pacing
            next_t += period
            sleep = next_t - time.perf_counter()
            if sleep > 0:
                time.sleep(sleep)
            else:
                # Running behind: restart the schedule from now instead of
                # trying to catch up.
                next_t = time.perf_counter()

            # Collect the latest frame of every camera that has one.
            frames = []
            idx_map = []
            for i, cs in enumerate(cams):
                with cs.lock:
                    fr = cs.last_frame
                if fr is None:
                    continue

                h, w = fr.shape[:2]
                if is_downscale_needed(h, w, down_w, down_h):
                    fr = downscale_frame(fr, h, w, down_w, down_h)

                frames.append(fr)
                idx_map.append(i)

            if not frames:
                continue

            t_inf0 = time.perf_counter()
            preds = model.predict(frames, conf=conf, imgsz=imgsz, verbose=False)
            t_inf1 = time.perf_counter()
            infer_dt = (t_inf1 - t_inf0)
            infer_dts.append(infer_dt)

            # Annotate and publish per camera.
            for j, res in enumerate(preds):
                cam_idx = idx_map[j]
                cs = cams[cam_idx]
                t_p0 = time.perf_counter()

                # Copy: a frame that was not downscaled is still the array
                # held in cs.last_frame and may be picked up again next tick.
                annotated = draw_predictions(frames[j].copy(), res)

                if cs.ffmpeg is None and time.perf_counter() >= ffmpeg_retry_at[cam_idx]:
                    h, w = annotated.shape[:2]
                    cs.ffmpeg = start_ffmpeg_proc(cs.url_out, w, h, target_fps)
                    time.sleep(0.2)

                if cs.ffmpeg is None:
                    # Publisher is down and still in its restart back-off.
                    cs.misses += 1
                else:
                    try:
                        cs.ffmpeg.stdin.write(annotated.tobytes())
                        cs.out_frames += 1
                    except (OSError, ValueError):
                        cs.misses += 1
                        # If ffmpeg has exited, drop it so a new publisher is
                        # started instead of failing every write from now on.
                        if cs.ffmpeg.poll() is not None:
                            stop_ffmpeg_proc(cs.ffmpeg)
                            cs.ffmpeg = None
                            ffmpeg_retry_at[cam_idx] = time.perf_counter() + ffmpeg_restart_delay

                cs.last_proc_dt = time.perf_counter() - t_p0

            # Periodic report.
            if (time.perf_counter() - last_print) >= print_every_sec:
                wall = time.perf_counter() - t0
                sys_stats = get_sys_stats()

                print(f"\n=== t={wall:.1f}s target_fps={target_fps} cams={len(cams)} ===")
                if sys_stats:
                    print("SYS:", sys_stats)

                for cs in cams:
                    out_fps = cs.out_frames / wall if wall > 0 else 0.0
                    cap_fps = cs.cap_frames / wall if wall > 0 else 0.0
                    print(
                        f"{cs.name_in}: cap_fps≈{cap_fps:.2f} out_fps≈{out_fps:.2f} "
                        f"misses={cs.misses}/{cs.out_frames} last_proc={cs.last_proc_dt * 1000:.1f}ms"
                    )

                # Inference latency statistics (per batch) since the last report.
                if len(infer_dts) >= 5:
                    s = sorted(infer_dts)
                    print(
                        f"INFER batch_dt: p50={_percentile(s, 50) * 1000:.1f}ms p90={_percentile(s, 90) * 1000:.1f}ms "
                        f"p99={_percentile(s, 99) * 1000:.1f}ms max={max(s) * 1000:.1f}ms n={len(s)}"
                    )
                    infer_dts.clear()

                last_print = time.perf_counter()

    finally:
        stop_evt.set()
        for t in threads:
            t.join(timeout=1)

        for cs in cams:
            stop_ffmpeg_proc(cs.ffmpeg)

    return cams

In [None]:
# 30-second run over all configured cameras at 5 FPS.
cams_result = run_multicam(
    target_fps=5, seconds=30,
    conf=0.25, imgsz=640,
    down_w=1280, down_h=720,
    print_every_sec=2.0,
)


In [None]:
def capacity_sweep(fps_list=(2, 3, 4, 5, 6, 7, 8, 10), seconds=20):
    """Run `run_multicam` at increasing target FPS until a camera falls behind.

    For each value in `fps_list` a run of `seconds` seconds is performed and
    the per-camera output rate is computed as `out_frames / seconds`. The
    sweep stops after the first run in which the slowest camera delivers
    less than 90% of the target FPS.

    Returns:
        A list of `(fps, [(camera input name, output fps), ...])` tuples,
        one per run that was performed.
    """
    sweep_log = []

    for target in fps_list:
        print("\n\n########### TEST FPS =", target, "###########")
        cam_states = run_multicam(target_fps=target, seconds=seconds, print_every_sec=2.0)

        # Achieved output rate per camera, based on the nominal run length.
        per_cam = []
        for cam in cam_states:
            achieved = cam.out_frames / seconds if seconds > 0 else 0.0
            per_cam.append((cam.name_in, achieved))
        slowest = min((rate for _, rate in per_cam), default=0.0)

        sweep_log.append((target, per_cam))
        print("OUT_FPS:", per_cam)

        # Criterion: the slowest camera must hold at least 90% of the target.
        if slowest < target * 0.90:
            print("STOP: capacity reached (min_out_fps too low)")
            break

    return sweep_log


# Raise the target FPS step by step, 20 seconds per step, until capacity is reached.
sweep = capacity_sweep(
    fps_list=(2, 3, 4, 5, 6, 7, 8),
    seconds=20,
)
