# Fall Detection Full Pipeline (Training, Evaluation, Inference)

In [2]:
# ✅ 1. Import Libraries
import os, cv2, glob
import numpy as np
import mediapipe as mp
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, recall_score, f1_score, classification_report


In [3]:
# ✅ 2. Pose Extractor
class PoseExtractor:
    """Turn a video file into a fixed-length sequence of pose features.

    Each sampled frame contributes 33 landmarks x (x, y, z, visibility) = 132
    values. The per-step difference of those values ("velocity") is appended,
    so every row of the returned array holds 264 features.
    """

    NUM_LANDMARKS = 33
    VALUES_PER_LANDMARK = 4

    def __init__(self, num_frames=16):
        """Create the MediaPipe pose estimator.

        Args:
            num_frames: Number of frames sampled from each video, which is
                also the number of rows in the returned feature array.
        """
        self.pose = mp.solutions.pose.Pose(static_image_mode=False)
        self.num_frames = num_frames

    def extract_keypoints(self, video_path):
        """Sample frames evenly across the video and extract pose features.

        Args:
            video_path: Path of the video file to read.

        Returns:
            A float array of shape ``(num_frames, 264)`` holding keypoints
            followed by their frame-to-frame differences, or ``None`` when no
            frame could be read. Frames without a detected pose are encoded
            as all zeros; if fewer than ``num_frames`` frames were collected,
            the last row is repeated to pad the sequence.
        """
        feature_dim = self.NUM_LANDMARKS * self.VALUES_PER_LANDMARK
        keypoints_sequence = []

        cap = cv2.VideoCapture(video_path)
        try:
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            # A set gives O(1) membership tests; the original scanned the
            # NumPy index array once per decoded frame.
            wanted = set(
                np.linspace(0, total_frames - 1, self.num_frames)
                .astype(int)
                .tolist()
            )

            for i in range(total_frames):
                ret, frame = cap.read()
                if not ret:
                    # The container's frame count can overstate what is
                    # actually decodable; stop at the first failed read.
                    break
                if i not in wanted:
                    continue

                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                result = self.pose.process(frame_rgb)

                if result.pose_landmarks:
                    keypoints = [
                        value
                        for lm in result.pose_landmarks.landmark
                        for value in (lm.x, lm.y, lm.z, lm.visibility)
                    ]
                else:
                    # No person detected in this frame: use a zero vector.
                    keypoints = [0.0] * feature_dim

                keypoints_sequence.append(keypoints)
        finally:
            # Release the capture even if decoding or pose estimation raises.
            cap.release()

        if not keypoints_sequence:
            print(f"[❗] No keypoints extracted in: {video_path}")
            return None

        # Pad short sequences by repeating the last observed frame.
        missing = self.num_frames - len(keypoints_sequence)
        if missing > 0:
            keypoints_sequence.extend([keypoints_sequence[-1]] * missing)

        # Force a float dtype so a clip with no detections is not saved as ints.
        keypoints_sequence = np.array(keypoints_sequence, dtype=np.float64)
        # Prepending the first row makes the first velocity row all zeros and
        # keeps the velocity array the same length as the keypoint array.
        velocity = np.diff(keypoints_sequence, axis=0, prepend=keypoints_sequence[0:1])
        combined = np.concatenate([keypoints_sequence, velocity], axis=1)
        return combined


In [4]:
# ✅ 3. Custom Dataset
class FallDataset(Dataset):
    """Dataset that serves pose-feature arrays stored as ``.npy`` files.

    Depending on how it is constructed, an item is one of:
      * ``(features, label)`` when ``labels`` is given,
      * ``(features, file_name)`` when ``labels`` is None and
        ``return_fname`` is true,
      * ``features`` alone otherwise.
    """

    def __init__(self, npy_files, labels=None, return_fname=False):
        """Store the file list and the optional per-file labels.

        Args:
            npy_files: Sequence of paths to ``.npy`` feature files.
            labels: Optional sequence of integer labels aligned with
                ``npy_files``.
            return_fname: When true and no labels are given, return the
                file's base name alongside the features.
        """
        self.files = npy_files
        self.labels = labels
        self.return_fname = return_fname

    def __len__(self):
        """Return the number of feature files."""
        return len(self.files)

    def __getitem__(self, idx):
        """Load the ``idx``-th feature file as a float32 tensor."""
        path = self.files[idx]
        features = torch.tensor(np.load(path), dtype=torch.float32)

        # Labels take precedence over the file-name option.
        if self.labels is not None:
            return features, torch.tensor(self.labels[idx], dtype=torch.long)
        if self.return_fname:
            return features, os.path.basename(path)
        return features


In [5]:
# ✅ 4. GRU Classifier
class FallGRUClassifier(nn.Module):
    """GRU sequence encoder followed by a linear classification head."""

    def __init__(self, input_size=264, hidden_size=64, num_layers=1, num_classes=2):
        """Build the recurrent encoder and the output layer.

        Args:
            input_size: Number of features per time step.
            hidden_size: Width of the GRU hidden state.
            num_layers: Number of stacked GRU layers.
            num_classes: Number of output logits.
        """
        super().__init__()
        # Attribute names are part of the checkpoint format; keep them stable.
        self.gru = nn.GRU(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_size, out_features=num_classes)

    def forward(self, x):
        """Return class logits for a batch-first input of sequences.

        The final hidden state of the top GRU layer is used as the summary
        of each sequence.
        """
        final_hidden = self.gru(x)[1]
        top_layer_state = final_hidden[-1]
        return self.fc(top_layer_state)


In [6]:
# ✅ 5. Training and Evaluation Functions
def train(model, dataloader, criterion, optimizer, device):
    """Run one optimisation pass over ``dataloader``.

    Args:
        model: Network to optimise; it is switched to training mode.
        dataloader: Iterable yielding ``(inputs, labels)`` batches.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        optimizer: Optimiser stepping the model's parameters.
        device: Device the batches are moved to before the forward pass.

    Returns:
        Tuple ``(mean_batch_loss, accuracy, recall, f1)`` computed over every
        sample seen during the pass.
    """
    model.train()
    running_loss = 0
    predictions = []
    targets = []

    for inputs, batch_labels in dataloader:
        inputs = inputs.to(device)
        batch_labels = batch_labels.to(device)

        optimizer.zero_grad()
        logits = model(inputs)
        batch_loss = criterion(logits, batch_labels)
        batch_loss.backward()
        optimizer.step()

        running_loss += batch_loss.item()
        predictions.extend(logits.argmax(dim=1).cpu().tolist())
        targets.extend(batch_labels.cpu().tolist())

    # The loss is averaged over batches, not over individual samples.
    mean_loss = running_loss / len(dataloader)
    return (
        mean_loss,
        accuracy_score(targets, predictions),
        recall_score(targets, predictions),
        f1_score(targets, predictions),
    )

def evaluate(model, dataloader, criterion, device):
    """Score the model on ``dataloader`` without updating its weights.

    Prints a per-class classification report as a side effect.

    Args:
        model: Network to evaluate; it is switched to evaluation mode.
        dataloader: Iterable yielding ``(inputs, labels)`` batches.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        device: Device the batches are moved to before the forward pass.

    Returns:
        Tuple ``(mean_batch_loss, accuracy, recall, f1)``.
    """
    model.eval()
    all_preds, all_labels = [], []
    total_loss = 0
    with torch.no_grad():
        for x, y in dataloader:
            x, y = x.to(device), y.to(device)
            outputs = model(x)
            loss = criterion(outputs, y)
            total_loss += loss.item()
            all_preds += outputs.argmax(dim=1).cpu().tolist()
            all_labels += y.cpu().tolist()

    acc = accuracy_score(all_labels, all_preds)
    # zero_division=0 yields the same values as the default ("warn") but
    # without an UndefinedMetricWarning each time a class is never predicted,
    # which is common in early epochs on a small validation set.
    recall = recall_score(all_labels, all_preds, zero_division=0)
    f1 = f1_score(all_labels, all_preds, zero_division=0)
    print(classification_report(all_labels, all_preds, zero_division=0))
    return total_loss / len(dataloader), acc, recall, f1


In [7]:

# ✅ 6. Prepare Data
# Directory where extracted keypoint arrays are cached as .npy files.
os.makedirs("data/npy", exist_ok=True)

# glob returns paths in arbitrary, platform-dependent order. Sorting makes the
# file lists deterministic, so the seeded train/validation split that follows
# is reproducible across runs and machines.
fall_videos = sorted(glob.glob("data/video/Y/*/*/*.mp4"))
not_fall_videos = sorted(glob.glob("data/video/N/N/*/*.mp4"))


def get_npy_path_from_video_path(video_path):
    """Map a video path to its cached keypoint file under ``data/npy``.

    Only the video's base name (without extension) is kept, so videos in
    different directories that share a file name map to the same ``.npy``.
    """
    stem, _extension = os.path.splitext(os.path.basename(video_path))
    return os.path.join("data/npy", stem + ".npy")

extractor = PoseExtractor(num_frames=16)


def _cache_keypoints(pose_extractor, video_paths, failure_tag):
    """Extract and save keypoints for every video without a cached .npy file."""
    for video_path in video_paths:
        npy_path = get_npy_path_from_video_path(video_path)
        if os.path.exists(npy_path):
            # Already extracted on a previous run; skip the expensive pose pass.
            continue
        keypoints = pose_extractor.extract_keypoints(video_path)
        if keypoints is None:
            print(f"[⚠️] {failure_tag} 실패: {video_path}")
        else:
            np.save(npy_path, keypoints)


def _existing_npy_files(video_paths):
    """Return the cached .npy paths that actually exist for ``video_paths``."""
    candidates = (get_npy_path_from_video_path(p) for p in video_paths)
    return [npy_path for npy_path in candidates if os.path.exists(npy_path)]


# Fall videos -> cached .npy files
_cache_keypoints(extractor, fall_videos, "낙상")

# Non-fall (normal) videos -> cached .npy files
_cache_keypoints(extractor, not_fall_videos, "정상")

# Collect the .npy paths used for training; videos whose extraction failed
# have no file on disk and are dropped here.
fall_files = _existing_npy_files(fall_videos)
not_fall_files = _existing_npy_files(not_fall_videos)

# Label convention: 1 = fall, 0 = not fall.
all_files = fall_files + not_fall_files
labels = [1] * len(fall_files) + [0] * len(not_fall_files)

# NOTE(review): the split is not stratified; with a dataset this small the
# class ratio can differ noticeably between train and validation.
train_files, val_files, train_labels, val_labels = train_test_split(
    all_files, labels, test_size=0.2, random_state=42
)

train_dataset = FallDataset(train_files, labels=train_labels)
val_dataset = FallDataset(val_files, labels=val_labels)
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False)


In [8]:
# ✅ 7. Train Model
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = FallGRUClassifier().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()
# Start below any attainable F1 so the first epoch always writes a checkpoint.
# With an initial value of 0.0 and a strict ">" comparison, a run whose
# validation F1 stayed at 0 never saved anything, and the inference step that
# loads this file would then fail or silently pick up a stale checkpoint.
best_f1 = float("-inf")
save_path = 'fall_gru_best.pt'

for epoch in range(5):
    train_loss, train_acc, train_recall, train_f1 = train(model, train_loader, criterion, optimizer, device)
    val_loss, val_acc, val_recall, val_f1 = evaluate(model, val_loader, criterion, device)
    print(f"\n📘 Epoch {epoch+1}")
    print(f"Train | Loss: {train_loss:.4f}, Acc: {train_acc:.2%}, Recall: {train_recall:.2%}, F1: {train_f1:.2%}")
    print(f"Val   | Loss: {val_loss:.4f}, Acc: {val_acc:.2%}, Recall: {val_recall:.2%}, F1: {val_f1:.2%}")
    # Keep only the weights with the best validation F1 seen so far.
    if val_f1 > best_f1:
        best_f1 = val_f1
        torch.save(model.state_dict(), save_path)
        print(f"✅ Best model saved (F1: {val_f1:.2%})")


INFO: Created TensorFlow Lite XNNPACK delegate for CPU.
W0000 00:00:1746512812.797802  114718 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1746512812.818717  114718 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.


              precision    recall  f1-score   support

           0       0.00      0.00      0.00         3
           1       0.57      1.00      0.73         4

    accuracy                           0.57         7
   macro avg       0.29      0.50      0.36         7
weighted avg       0.33      0.57      0.42         7


📘 Epoch 1
Train | Loss: 0.4662, Acc: 80.00%, Recall: 100.00%, F1: 88.89%
Val   | Loss: 0.5669, Acc: 57.14%, Recall: 100.00%, F1: 72.73%
✅ Best model saved (F1: 72.73%)
              precision    recall  f1-score   support

           0       0.00      0.00      0.00         3
           1       0.57      1.00      0.73         4

    accuracy                           0.57         7
   macro avg       0.29      0.50      0.36         7
weighted avg       0.33      0.57      0.42         7


📘 Epoch 2
Train | Loss: 0.3674, Acc: 80.00%, Recall: 100.00%, F1: 88.89%
Val   | Loss: 0.6115, Acc: 57.14%, Recall: 100.00%, F1: 72.73%
              precision    recall  f1-sc

  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


In [9]:
# ✅ 8. Inference (from .mp4)
def infer_video_file(model, video_path, device, num_frames=16):
    """Classify a single video file with ``model``.

    Args:
        model: Classifier called on a ``(1, num_frames, features)`` tensor.
        video_path: Path of the video to analyse.
        device: Device the input tensor is moved to.
        num_frames: Number of frames sampled from the video.

    Returns:
        ``(pred_class, prob)`` where ``prob`` is the softmax probability
        vector and ``pred_class`` is the index of its largest entry, or
        ``None`` when no keypoints could be extracted from the video.
    """
    features = PoseExtractor(num_frames).extract_keypoints(video_path)
    if features is None:
        print(f"[⚠️] 관절이 감지되지 않아 추론 불가: {video_path}")
        return None

    # Add the batch dimension expected by the model.
    batch = torch.tensor(features, dtype=torch.float32).unsqueeze(0).to(device)
    with torch.no_grad():
        logits = model(batch)
        probabilities = torch.softmax(logits, dim=1)

    prob = probabilities.cpu().numpy()[0]
    return np.argmax(prob), prob
    


# Rebuild the network and load the best checkpoint written during training.
model = FallGRUClassifier().to(device)
model.load_state_dict(torch.load('fall_gru_best.pt', map_location=device))
model.eval()

# Alternative sample (a fall clip):
#video_path = "data/video/Y/FY/00003_H_A_FY_C1/00003_H_A_FY_C1.mp4"
video_path = "data/video/N/N/00047_H_A_N_C1/00047_H_A_N_C1.mp4"

# infer_video_file returns None (not a tuple) when pose extraction fails, so
# check before unpacking; unpacking directly raised TypeError in that case and
# the failure branch below could never run.
result = infer_video_file(model, video_path, device)

if result is not None:
    pred, prob = result
    print(f"📍 예측 결과: {'낙상' if pred == 1 else '정상'}")
    print(f"→ 클래스 확률 (정상, 낙상): {prob}")
else:
    print("⚠️ 관절 추출 실패로 추론 불가")



W0000 00:00:1746512813.257539  114763 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1746512813.288548  114778 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1746512813.457657  114764 landmark_projection_calculator.cc:186] Using NORM_RECT without IMAGE_DIMENSIONS is only supported for the square ROI. Provide IMAGE_DIMENSIONS or use PROJECTION_MATRIX.


📍 예측 결과: 낙상
→ 클래스 확률 (정상, 낙상): [0.16869348 0.8313066 ]


In [None]:
# ✅ 9. Real-Time Inference
def realtime_fall_detection(model, device, num_frames=16):
    """Run fall detection on the default webcam until 'q' is pressed.

    Frames are collected into consecutive, non-overlapping windows of
    ``num_frames``. Each full window is converted into keypoint + velocity
    features (132 + 132 = 264 values per frame, the same layout
    ``PoseExtractor.extract_keypoints`` produces) and classified.

    Args:
        model: Trained classifier; called on a ``(1, num_frames, 264)`` tensor.
        device: Device the input tensor is moved to.
        num_frames: Number of frames per prediction window.

    NOTE(review): windows here are built from consecutive webcam frames,
    while the file-based extractor samples frames spread across a whole
    clip, so the time scale of the velocity features may differ from what
    the model saw during training - confirm this is acceptable.
    """
    feature_dim = 33 * 4  # 33 landmarks x (x, y, z, visibility)
    cap = cv2.VideoCapture(0)
    keypoints_sequence = []
    print("🟢 실시간 낙상 감지를 시작합니다. 'q'를 누르면 종료됩니다.")
    try:
        # The context manager closes the MediaPipe graph on exit.
        with mp.solutions.pose.Pose(static_image_mode=False) as pose:
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    print("❗ 프레임 읽기 실패")
                    break

                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                result = pose.process(frame_rgb)
                if result.pose_landmarks:
                    keypoints = [
                        val
                        for lm in result.pose_landmarks.landmark
                        for val in (lm.x, lm.y, lm.z, lm.visibility)
                    ]
                else:
                    # No person detected: use a zero vector for this frame.
                    keypoints = [0.0] * feature_dim
                keypoints_sequence.append(keypoints)

                if len(keypoints_sequence) == num_frames:
                    window = np.array(keypoints_sequence, dtype=np.float64)
                    # The model is built for keypoints plus their differences;
                    # feeding raw keypoints alone (132 features) caused a
                    # shape mismatch against the 264-feature GRU input.
                    velocity = np.diff(window, axis=0, prepend=window[0:1])
                    features = np.concatenate([window, velocity], axis=1)
                    input_tensor = (
                        torch.tensor(features, dtype=torch.float32)
                        .unsqueeze(0)
                        .to(device)
                    )
                    with torch.no_grad():
                        output = model(input_tensor)
                    pred = torch.argmax(output, dim=1).item()
                    print("📍 예측 결과:", "낙상" if pred == 1 else "정상")
                    # Start a fresh, non-overlapping window.
                    keypoints_sequence = []

                cv2.imshow("Real-time Fall Detection", frame)
                if cv2.waitKey(1) & 0xFF == ord("q"):
                    break
    finally:
        # Free the camera and close the preview window even on errors.
        cap.release()
        cv2.destroyAllWindows()


In [11]:
import cv2
import mediapipe as mp
import numpy as np

def visualize_pose_on_video_debug(video_path, save_dir="vis_frames", num_frames=16):
    """Save evenly sampled frames of a video with the detected pose drawn on.

    Frames are written to ``save_dir`` as ``frame_00.jpg``, ``frame_01.jpg``
    and so on. Frames without a detected pose are still saved (without an
    overlay) and reported on stdout.

    Args:
        video_path: Path of the video to inspect.
        save_dir: Output directory; created if it does not exist.
        num_frames: Number of frame positions sampled across the video.
    """
    os.makedirs(save_dir, exist_ok=True)

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"❌ Failed to open video: {video_path}")
        return

    mp_drawing = mp.solutions.drawing_utils
    mp_style = mp.solutions.drawing_styles

    idx = 0
    try:
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # A set gives O(1) membership tests instead of scanning the NumPy
        # index array once per decoded frame.
        wanted = set(
            np.linspace(0, total_frames - 1, num_frames).astype(int).tolist()
        )

        # The context manager closes the MediaPipe graph when done.
        with mp.solutions.pose.Pose() as pose:
            for i in range(total_frames):
                ret, frame = cap.read()
                if not ret:
                    print(f"⚠️ Frame {i} read failed")
                    break
                if i not in wanted:
                    continue

                rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                result = pose.process(rgb)

                if result.pose_landmarks:
                    mp_drawing.draw_landmarks(
                        frame, result.pose_landmarks, mp.solutions.pose.POSE_CONNECTIONS,
                        landmark_drawing_spec=mp_style.get_default_pose_landmarks_style()
                    )
                else:
                    print(f"⚠️ No landmarks detected in frame {i}")

                save_path = os.path.join(save_dir, f"frame_{idx:02d}.jpg")
                success = cv2.imwrite(save_path, frame)
                if not success:
                    print(f"❌ Failed to save: {save_path}")
                # The counter advances per sampled frame, whether or not the
                # write succeeded.
                idx += 1
    finally:
        # Release the capture even if pose estimation or writing raises.
        cap.release()

    print(f"✅ 저장 완료된 프레임 수: {idx}")



# Dump pose-overlay frames for one non-fall sample clip into the default
# "vis_frames" directory.
visualize_pose_on_video_debug("data/video/N/N/00047_H_A_N_C1/00047_H_A_N_C1.mp4")


W0000 00:00:1746512843.597399  114870 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1746512843.632767  114881 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.


✅ 저장 완료된 프레임 수: 16
