
# YOLO + DeepSORT 기반 사람 추적 & 쓰러짐 감지 (중복 카운트 방지)
- YOLO로 사람을 **탐지**
- DeepSORT로 **ID 추적**하여 **중복 카운트 방지**
- 박스가 가로로 길면 **쓰러짐(FALL)** 으로 판정
- 개발/운영 편의를 위해 **환경변수/파라미터 기반 설정**과 **시각화** 포함


In [None]:
# (선택) 의존성 설치: 로컬 환경에서 한 번만 실행하세요.
# Colab이라면 아래 주석 해제 후 실행
# %pip install ultralytics deep-sort-realtime opencv-python pillow

In [1]:
import os
import time
import json

import cv2
import numpy as np
from ultralytics import YOLO

# DeepSORT
from deep_sort_realtime.deepsort_tracker import DeepSort

print("Imports OK")

Imports OK


In [13]:

# =======================
# 설정값
# =======================
VIDEO_PATH = os.getenv("VIDEO_PATH", "../backend/videos/iho_beach.mp4")   # 처리할 비디오
YOLO_WEIGHTS = os.getenv("YOLO_WEIGHTS", "yolov8n.pt")         # 가벼운 기본 모델
YOLO_CONF = float(os.getenv("YOLO_CONF", "0.35"))
YOLO_IOU = float(os.getenv("YOLO_IOU", "0.5"))
YOLO_IMGSZ = int(os.getenv("YOLO_IMGSZ", "960"))
FRAME_STRIDE = int(os.getenv("FRAME_STRIDE", "1"))             # 프레임 샘플링

# 쓰러짐 휴리스틱
FALL_RATIO = float(os.getenv("FALL_RATIO", "1.8"))             # width/height >= 1.8
FALL_MAX_HEIGHT_RATIO = float(os.getenv("FALL_MAX_HEIGHT_RATIO", "0.35"))  # h/frame_h <= 0.35

# 추적기 설정(실무에서 환경에 맞게 조정)
DEEPSORT_CFG = dict(
    max_age=30,        # 사라진 후 유지 프레임
    n_init=3,          # 초기화에 필요한 연속 관측 수
    max_iou_distance=0.7,
    max_cosine_distance=0.2,
    nn_budget=None,    # ReID 메모리 버퍼
    embedder="mobilenet",
    half=True
)

# 쿨다운 설정(중복 알림 방지)
FALL_COOLDOWN_SEC = float(os.getenv("FALL_COOLDOWN_SEC", "5.0"))


In [14]:

# =======================
# 유틸 함수
# =======================

def compute_aspect_fall(l, t, r, b, frame_h):
    """LTRB 박스 기준으로 aspect ratio와 쓰러짐 여부 계산"""
    w = max(1, r - l)
    h = max(1, b - t)
    aspect = w / h
    fallen = (aspect >= FALL_RATIO) and ((h / max(1, frame_h)) <= FALL_MAX_HEIGHT_RATIO)
    return aspect, fallen

def draw_box_with_label(frame, l, t, r, b, label, fallen=False):
    color = (0, 0, 255) if fallen else (0, 255, 0)  # 쓰러짐: 빨강
    cv2.rectangle(frame, (l, t), (r, b), color, 2)
    cv2.putText(frame, label, (l, max(0, t - 8)), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

def density_level(n, th_low=5, th_mid=15):
    if n < th_low:
        return "낮음"
    elif n < th_mid:
        return "중간"
    return "높음"


In [15]:

# =======================
# 모델/추적기 초기화
# =======================
model = YOLO(YOLO_WEIGHTS)
tracker = DeepSort(**DEEPSORT_CFG)

print("Model & Tracker ready")


Model & Tracker ready


In [16]:

# =======================
# 메인 루프
# =======================
cap = cv2.VideoCapture(VIDEO_PATH)
if not cap.isOpened():
    raise RuntimeError(f"비디오를 열 수 없습니다: {VIDEO_PATH}")

unique_ids = set()           # 중복 카운트 방지용
last_fall_alert = dict()     # {track_id: last_timestamp}
frame_index = 0

# 결과 요약
stats = {
    "unique_person_count": 0,
    "last_visible_count": 0,
    "last_fallen_visible": 0,
    "total_fall_alerts": 0
}

while True:
    ret, frame = cap.read()
    if not ret:
        break

    if (frame_index % FRAME_STRIDE) != 0:
        frame_index += 1
        continue

    H, W = frame.shape[:2]

    # 1) YOLO 사람 탐지
    results = model(frame, classes=[0], imgsz=YOLO_IMGSZ, conf=YOLO_CONF, iou=YOLO_IOU, verbose=False)
    dets_xywh_conf_cls = []  # DeepSORT 입력: [[x,y,w,h], conf, cls]

    if len(results):
        r0 = results[0]
        for box in r0.boxes:
            x1, y1, x2, y2 = map(int, box.xyxy[0])
            conf = float(box.conf[0]) if hasattr(box, "conf") else 0.0
            # DeepSORT는 xywh 필요
            w = x2 - x1
            h = y2 - y1
            dets_xywh_conf_cls.append([[x1, y1, w, h], conf, 0])  # cls=0(person)

    # 2) DeepSORT 추적
    tracks = tracker.update_tracks(dets_xywh_conf_cls, frame=frame)

    visible_count = 0
    fallen_visible = 0

    for trk in tracks:
        if not trk.is_confirmed():
            continue
        l, t, r, b = map(int, trk.to_ltrb())
        track_id = trk.track_id

        # 고유 ID 카운트(최초 1회만)
        if track_id not in unique_ids:
            unique_ids.add(track_id)
            stats["unique_person_count"] = len(unique_ids)

        visible_count += 1

        # 3) 쓰러짐 판정 (트랙 박스 기준)
        aspect, fallen = compute_aspect_fall(l, t, r, b, H)
        if fallen:
            fallen_visible += 1
            now = time.time()
            if (track_id not in last_fall_alert) or (now - last_fall_alert[track_id] >= FALL_COOLDOWN_SEC):
                last_fall_alert[track_id] = now
                stats["total_fall_alerts"] += 1
                print(json.dumps({
                    "type": "fall_alert",
                    "track_id": int(track_id),
                    "timestamp": now,
                    "bbox": {"l": int(l), "t": int(t), "r": int(r), "b": int(b)},
                    "aspect_ratio": round(aspect, 2)
                }, ensure_ascii=False))

        # 4) 시각화
        label = f"ID:{track_id}"
        if fallen:
            label += " FALL"
        draw_box_with_label(frame, l, t, r, b, label, fallen=fallen)

    stats["last_visible_count"] = visible_count
    stats["last_fallen_visible"] = fallen_visible

    # 우상단 요약 표시
    info = f"Visible: {visible_count} | Unique: {stats['unique_person_count']} | Fallen: {fallen_visible}"
    cv2.putText(frame, info, (20, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255,255,255), 2)

    cv2.imshow("YOLO + DeepSORT (FALL)", frame)
    if cv2.waitKey(1) & 0xFF == 27:  # ESC 종료
        break

    frame_index += 1

cap.release()
cv2.destroyAllWindows()

print("== 요약 ==")
print(json.dumps(stats, ensure_ascii=False, indent=2))

== 요약 ==
{
  "unique_person_count": 8,
  "last_visible_count": 8,
  "last_fallen_visible": 0,
  "total_fall_alerts": 0
}
