In [None]:
!pip install ultralytics
import os
import cv2
import numpy as np
from collections import deque
from ultralytics import YOLO
import xgboost as xgb
from tqdm import tqdm

In [72]:
INPUT_VIDEO  = "/content/drive/MyDrive/[Projects]/AI Hub/Pose Detection 기반 실시간 낙상 감지 시스템 개발/Data/data/낙상사고 위험동작 영상-센서 쌍 데이터_병원,후면낙상/3.개방데이터/1.데이터/Training/01.원천데이터/TS/영상/Y/SY/00135_H_A_SY_C4/00135_H_A_SY_C4.mp4"
OUTPUT_VIDEO = "/content/drive/MyDrive/[Projects]/AI Hub/Pose Detection 기반 실시간 낙상 감지 시스템 개발/Data/output_fall_detected_00135_H_A_SY_C4.mp4"
T = 10

In [73]:
pose_model = YOLO("yolov8n-pose.pt")

clf = xgb.XGBClassifier()
clf.load_model("/content/drive/MyDrive/[Projects]/AI Hub/Pose Detection 기반 실시간 낙상 감지 시스템 개발/fall_xgb.json")

In [74]:
sk_buf   = deque(maxlen=T)
bb_buf   = deque(maxlen=T)
vid_buf  = deque(maxlen=T)
prev_gray = None

In [75]:
# COCO skeleton pairs
SKELETON_EDGES = [
    (0,1),(0,2),
    (1,3),(2,4),
    (5,6),
    (5,7),(7,9),
    (6,8),(8,10),
    (5,11),(6,12),
    (11,12),
    (11,13),(13,15),
    (12,14),(14,16)
]

In [76]:
def video_motion(prev_gray, gray):
    if prev_gray is None:
        return np.zeros(3)

    diff = cv2.absdiff(prev_gray, gray)
    mean_diff = np.mean(diff)
    max_diff = np.max(diff)

    return np.array([
        mean_diff / 255.0,
        max_diff / 255.0,
        np.std(diff) / 255.0
    ], dtype=np.float32)


def build_feature(sk, bb, vid):
    f = []
    for x in sk: f.extend(x)
    for x in bb: f.extend(x)
    f.extend(np.mean(np.array(vid), axis=0))
    return np.array(f, dtype=np.float32)

def draw_skeleton(frame, kp, conf, conf_th=0.3):
    kp = kp.astype(int)

    # joint
    for i, (x, y) in enumerate(kp):
        if conf[i] > conf_th:
            cv2.circle(frame, (x, y), 4, (0,255,255), -1)

    # bone
    for i, j in SKELETON_EDGES:
        if conf[i] > conf_th and conf[j] > conf_th:
            x1,y1 = kp[i].astype(int)
            x2,y2 = kp[j].astype(int)
            cv2.line(frame, (x1,y1), (x2,y2), (255,255,0), 2)

In [77]:
cap = cv2.VideoCapture(INPUT_VIDEO)

w = int(cap.get(3))
h = int(cap.get(4))
fps = cap.get(5)

writer = cv2.VideoWriter(
    OUTPUT_VIDEO,
    cv2.VideoWriter_fourcc(*"mp4v"),
    fps, (w, h)
)

In [79]:
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    res = pose_model(frame, verbose=False)[0]

    if (
        res.keypoints is not None and
        res.keypoints.xy is not None and
        len(res.keypoints.xy) > 0
    ):
        kp = res.keypoints.xy[0].cpu().numpy()
        conf = res.keypoints.conf[0].cpu().numpy()

        draw_skeleton(frame, kp, conf)

        sk = np.hstack([kp, conf[:, None]]).flatten()
        sk_buf.append(sk)

        box = res.boxes.xyxy[0].cpu().numpy()
        x1, y1, x2, y2 = map(int, box)
        cx, cy = (x1 + x2) / 2, (y1 + y2) / 2
        bb_buf.append([cx, cy, x2 - x1, y2 - y1, cx / w, cy / h])

        cv2.rectangle(frame, (x1, y1), (x2, y2), (255, 0, 0), 2)

    else:
        if len(sk_buf) > 0:
            sk_buf.append(sk_buf[-1])
            bb_buf.append(bb_buf[-1])

    # ❗ 사람 없어도 영상 모션은 계속
    vid_buf.append(video_motion(prev_gray, gray))
    prev_gray = gray

    if len(sk_buf) == T:
        feat = build_feature(sk_buf, bb_buf, vid_buf)
        prob = clf.predict_proba(feat.reshape(1, -1))[0, 1]

        label = "FALL" if prob > 0.7 else "NORMAL"
        color = (0, 0, 255) if label == "FALL" else (0, 255, 0)

        cv2.putText(
            frame, f"{label} {prob:.2f}",
            (30, 50),
            cv2.FONT_HERSHEY_SIMPLEX,
            1.2, color, 3
        )


    writer.write(frame)


In [80]:
cap.release()
writer.release()
print("✅ Done:", OUTPUT_VIDEO)

✅ Done: /content/drive/MyDrive/[Projects]/AI Hub/Pose Detection 기반 실시간 낙상 감지 시스템 개발/Data/output_fall_detected_00135_H_A_SY_C4.mp4


---

In [54]:
INPUT_DIR = "/content/drive/MyDrive/[Projects]/AI Hub/Pose Detection 기반 실시간 낙상 감지 시스템 개발/Data/data/낙상사고 위험동작 영상-센서 쌍 데이터_병원,후면낙상/3.개방데이터/1.데이터/Validation/01.원천데이터/VS/영상"

OUTPUT_DIR = "/content/drive/MyDrive/[Projects]/AI Hub/Pose Detection 기반 실시간 낙상 감지 시스템 개발/Data/output_validation"
os.makedirs(OUTPUT_DIR, exist_ok=True)

T = 10
THRESH = 0.7

In [45]:
pose_model = YOLO("yolov8n-pose.pt")

clf = xgb.XGBClassifier()
clf.load_model(
    "/content/drive/MyDrive/[Projects]/AI Hub/Pose Detection 기반 실시간 낙상 감지 시스템 개발/fall_xgb.json"
)

In [46]:
SKELETON_EDGES = [
    (0,1),(0,2),
    (1,3),(2,4),
    (5,6),
    (5,7),(7,9),
    (6,8),(8,10),
    (5,11),(6,12),
    (11,12),
    (11,13),(13,15),
    (12,14),(14,16)
]

In [47]:
def video_motion(prev_gray, gray):
    if prev_gray is None:
        return np.zeros(3)
    diff = cv2.absdiff(prev_gray, gray)
    m = np.mean(diff) / 255.0
    return np.array([m, m, m], dtype=np.float32)


def build_feature(sk, bb, vid):
    f = []
    for x in sk: f.extend(x)
    for x in bb: f.extend(x)
    f.extend(np.mean(np.array(vid), axis=0))
    return np.array(f, dtype=np.float32)


def draw_skeleton(frame, kp, conf, conf_th=0.3):
    kp = kp.astype(int)

    for i, (x, y) in enumerate(kp):
        if conf[i] > conf_th:
            cv2.circle(frame, (x, y), 4, (0,255,255), -1)

    for i, j in SKELETON_EDGES:
        if conf[i] > conf_th and conf[j] > conf_th:
            x1,y1 = kp[i].astype(int)
            x2,y2 = kp[j].astype(int)
            cv2.line(frame, (x1,y1), (x2,y2), (255,255,0), 2)

In [56]:
import random

Y_videos = []
N_videos = []

for root, _, files in os.walk(INPUT_DIR):
    for f in files:
        if not f.lower().endswith(".mp4"):
            continue

        full = os.path.join(root, f)

        if "/Y/" in full:
            Y_videos.append(full)
        elif "/N/" in full:
            N_videos.append(full)

print("Y:", len(Y_videos), "N:", len(N_videos))

Y: 280 N: 312


In [60]:
random.seed(2025)

Y_sample = random.sample(Y_videos, 5)
N_sample = random.sample(N_videos, 5)

sample_videos = Y_sample + N_sample

for v in sample_videos:
    print(v)

/content/drive/MyDrive/[Projects]/AI Hub/Pose Detection 기반 실시간 낙상 감지 시스템 개발/Data/data/낙상사고 위험동작 영상-센서 쌍 데이터_병원,후면낙상/3.개방데이터/1.데이터/Validation/01.원천데이터/VS/영상/Y/SY/00079_H_A_SY_C4/00079_H_A_SY_C4.mp4
/content/drive/MyDrive/[Projects]/AI Hub/Pose Detection 기반 실시간 낙상 감지 시스템 개발/Data/data/낙상사고 위험동작 영상-센서 쌍 데이터_병원,후면낙상/3.개방데이터/1.데이터/Validation/01.원천데이터/VS/영상/Y/SY/02547_H_D_SY_C5/02547_H_D_SY_C5.mp4
/content/drive/MyDrive/[Projects]/AI Hub/Pose Detection 기반 실시간 낙상 감지 시스템 개발/Data/data/낙상사고 위험동작 영상-센서 쌍 데이터_병원,후면낙상/3.개방데이터/1.데이터/Validation/01.원천데이터/VS/영상/Y/SY/00274_H_D_SY_C1/00274_H_D_SY_C1.mp4
/content/drive/MyDrive/[Projects]/AI Hub/Pose Detection 기반 실시간 낙상 감지 시스템 개발/Data/data/낙상사고 위험동작 영상-센서 쌍 데이터_병원,후면낙상/3.개방데이터/1.데이터/Validation/01.원천데이터/VS/영상/Y/SY/02659_H_A_SY_C7/02659_H_A_SY_C7.mp4
/content/drive/MyDrive/[Projects]/AI Hub/Pose Detection 기반 실시간 낙상 감지 시스템 개발/Data/data/낙상사고 위험동작 영상-센서 쌍 데이터_병원,후면낙상/3.개방데이터/1.데이터/Validation/01.원천데이터/VS/영상/Y/SY/00001_H_A_SY_C1/00001_H_A_SY_C1.mp4
/content/drive/

In [61]:
for input_path in tqdm(sample_videos):
    video_name = os.path.basename(input_path)
    output_path = os.path.join(
        OUTPUT_DIR,
        video_name.replace(".mp4", "_fall_detected.mp4")
    )

    cap = cv2.VideoCapture(input_path)
    if not cap.isOpened():
        print(f"❌ Failed to open {video_name}")
        continue

    w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)

    writer = cv2.VideoWriter(
        output_path,
        cv2.VideoWriter_fourcc(*"mp4v"),
        fps, (w, h)
    )

    sk_buf  = deque(maxlen=T)
    bb_buf  = deque(maxlen=T)
    vid_buf = deque(maxlen=T)
    prev_gray = None

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        res = pose_model(frame, verbose=False)[0]

        if res.keypoints is not None and len(res.keypoints.xy) > 0:
            kp = res.keypoints.xy[0].cpu().numpy()
            conf = res.keypoints.conf[0].cpu().numpy()

            draw_skeleton(frame, kp, conf)

            sk = np.hstack([kp, conf[:, None]]).flatten()
            sk_buf.append(sk)

            box = res.boxes.xyxy[0].cpu().numpy()
            x1,y1,x2,y2 = map(int, box)
            cx, cy = (x1+x2)/2, (y1+y2)/2
            bb_buf.append([cx, cy, x2-x1, y2-y1, cx/w, cy/h])

            cv2.rectangle(frame,(x1,y1),(x2,y2),(255,0,0),2)

        vid_buf.append(video_motion(prev_gray, gray))
        prev_gray = gray

        if len(sk_buf) == T:
            feat = build_feature(sk_buf, bb_buf, vid_buf)
            prob = clf.predict_proba(feat.reshape(1,-1))[0,1]

            label = "FALL" if prob > THRESH else "NORMAL"
            color = (0,0,255) if label=="FALL" else (0,255,0)

            cv2.putText(
                frame,
                f"{label} {prob:.2f}",
                (30,50),
                cv2.FONT_HERSHEY_SIMPLEX,
                1.2,
                color,
                3
            )

        writer.write(frame)

    cap.release()
    writer.release()

print("✅ All validation videos processed")

100%|██████████| 10/10 [03:16<00:00, 19.65s/it]

✅ All validation videos processed



