In [1]:
import os
import cv2
import torch
import numpy as np
import torch.nn as nn
import mediapipe as mp
import warnings
from ultralytics import YOLO
from tqdm import tqdm
from torch.nn.functional import softmax
from scipy.spatial import distance

In [None]:
# Load the YOLOv8s model
# NOTE(review): absolute, machine-specific weights path; the notebook only runs where this file exists.
yolo_model = YOLO('D:\\Falldown\\code-git\\runs\\detect\\human_fall_s\\weights\\best.pt')

# Set up MediaPipe Pose
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(static_image_mode=False, min_detection_confidence=0.3) # same threshold as was used for the training data
mp_drawing = mp.solutions.drawing_utils

# Indices of the pose landmarks fed to the model (11 landmarks)
LANDMARKS = [0, 11, 12, 15, 16, 23, 24, 25, 26, 27, 28]

In [3]:
# GRU-based fall detection model
class FallDetectionGRU(nn.Module):
    """GRU followed by a linear layer that classifies the last time step.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of the GRU hidden state.
        num_layers: Number of stacked GRU layers.
        num_classes: Number of output logits.
    """

    def __init__(self, input_size, hidden_size=128, num_layers=2, num_classes=3):
        super().__init__()
        self.gru = nn.GRU(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Return class logits of shape (batch, num_classes).

        `x` is batch-first (batch, seq_len, input_size), matching
        `batch_first=True` on the GRU.
        """
        # Size the initial hidden state from the GRU itself instead of the
        # literals (2, 128), so non-default num_layers / hidden_size work.
        h_0 = torch.zeros(
            self.gru.num_layers,
            x.size(0),
            self.gru.hidden_size,
            device=x.device,
            dtype=x.dtype,
        )
        out, _ = self.gru(x, h_0)
        # Only the output of the last time step is classified.
        return self.fc(out[:, -1, :])

def load_gru_model(model_path):
    """Build a FallDetectionGRU matching the checkpoint name and load its weights.

    The feature layout is inferred from a marker in the checkpoint's file
    name: "full" -> 25 features, "simplified" -> 3, "mediapipe" -> 22
    (checked in that order).

    Args:
        model_path: Path to a state-dict checkpoint readable by `torch.load`.

    Returns:
        Tuple `(gru_model, input_size)`; the model is in eval mode with
        weights mapped to the CPU.

    Raises:
        ValueError: If the file name contains none of the known markers.
    """
    # Match on the file name only (either separator style), so a directory
    # such as ".../full/..." cannot select the wrong input size.
    filename = str(model_path).replace("\\", "/").rsplit("/", 1)[-1]

    if "full" in filename:
        input_size = 25
    elif "simplified" in filename:
        input_size = 3
    elif "mediapipe" in filename:
        input_size = 22
    else:
        raise ValueError("Unknown model type in filename. Please check the model name.")

    # Create the model with the matching input size and load the weights.
    gru_model = FallDetectionGRU(input_size=input_size, hidden_size=128, num_layers=2, num_classes=3)
    gru_model.load_state_dict(torch.load(model_path, map_location='cpu'))
    gru_model.eval()

    return gru_model, input_size

def resize_and_pad_frame(frame, target_size=(640, 640)):
    """Resize `frame` to fit inside `target_size` and pad it to that size.

    The aspect ratio is kept; the remaining border is filled with the
    constant value [128, 128, 128], split as evenly as possible between
    the two sides.

    Args:
        frame: Image whose `shape[:2]` is (height, width).
        target_size: (width, height) of the padded output.

    Returns:
        Tuple `(padded_frame, new_w, new_h, pad_w, pad_h)` where `new_w`/`new_h`
        are the resized dimensions and `pad_w`/`pad_h` the left/top padding.
    """
    target_w, target_h = target_size[0], target_size[1]
    src_h, src_w = frame.shape[:2]

    # The smaller of the two ratios keeps the whole image inside the target.
    ratio = min(target_w / src_w, target_h / src_h)
    new_w = int(src_w * ratio)
    new_h = int(src_h * ratio)
    resized_frame = cv2.resize(frame, (new_w, new_h), interpolation=cv2.INTER_LINEAR)

    pad_w = (target_w - new_w) // 2
    pad_h = (target_h - new_h) // 2
    # The bottom/right sides absorb any odd leftover pixel.
    top, bottom = pad_h, target_h - new_h - pad_h
    left, right = pad_w, target_w - new_w - pad_w

    padded_frame = cv2.copyMakeBorder(
        resized_frame, top, bottom, left, right,
        cv2.BORDER_CONSTANT, value=[128, 128, 128],
    )
    return padded_frame, new_w, new_h, pad_w, pad_h

In [None]:
def calculate_yolo_xy_ratio(bbox):
    """Return the box's height divided by its width, rounded to 3 decimals.

    `bbox` is indexed as (x1, y1, x2, y2). An empty `bbox` or a zero-width
    box yields 0.0.
    """
    if len(bbox) == 0:
        return 0.0

    box_width = bbox[2] - bbox[0]
    if box_width == 0:
        return 0.0

    box_height = bbox[3] - bbox[1]
    return round(box_height / box_width, 3)

def bbox_ratio_class(ratio):
    """Binarize the box ratio: 0 when below 0.7, otherwise 1.

    Per the original author's note, a ratio of 0.7 or more is likely Normal.
    """
    if ratio < 0.7:
        return 0
    return 1

# Upper-body speed
def calculate_head_upper_body_speed(sequence):
    """Average per-step displacement of the head/shoulder center.

    For each pair of consecutive entries in `sequence`, the center of
    keypoints 0 (head), 11 (left shoulder) and 12 (right shoulder) is
    computed from their (x, y) values, and the Euclidean distance between
    the two centers is taken. Returns the mean of those distances, or 0.0
    when `sequence` has fewer than two entries.
    """
    def _upper_body_center(points):
        head = np.array([points[0][0], points[0][1]])
        left_shoulder = np.array([points[11][0], points[11][1]])
        right_shoulder = np.array([points[12][0], points[12][1]])
        return (head + left_shoulder + right_shoulder) / 3

    step_distances = [
        distance.euclidean(
            _upper_body_center(sequence[idx]),
            _upper_body_center(sequence[idx - 1]),
        )
        for idx in range(1, len(sequence))
    ]

    if not step_distances:
        return 0.0
    return sum(step_distances) / len(step_distances)

# Compute the additional (non-joint) features
def calculate_additional_features(bbox, joint_sequence):
    """Return `(yolo_xy_ratio, ratio_class, speed)` for one detection.

    The ratio comes from `calculate_yolo_xy_ratio(bbox)`, the class from
    `bbox_ratio_class` applied to that ratio, and the speed from
    `calculate_head_upper_body_speed(joint_sequence)`.
    """
    ratio = calculate_yolo_xy_ratio(bbox)
    return (
        ratio,
        bbox_ratio_class(ratio),
        calculate_head_upper_body_speed(joint_sequence),
    )

In [5]:
# Resize a bounding box (the caller enlarges it by 20%)
def adjust_bbox(bbox, scale_factor, frame_shape):
    """Scale a box about its center and clamp it to the frame.

    Args:
        bbox: (x1, y1, x2, y2) box.
        scale_factor: Multiplier applied to both width and height.
        frame_shape: Shape whose first two entries are (height, width).

    Returns:
        `[x1, y1, x2, y2]` as ints, with the top-left clamped to 0 and the
        bottom-right clamped to the frame width/height.
    """
    left, top, right, bottom = bbox
    center_x = (left + right) / 2
    center_y = (top + bottom) / 2
    half_w = (right - left) * scale_factor / 2
    half_h = (bottom - top) * scale_factor / 2

    return [
        max(0, int(center_x - half_w)),
        max(0, int(center_y - half_h)),
        min(int(center_x + half_w), frame_shape[1]),  # do not exceed the image width
        min(int(center_y + half_h), frame_shape[0]),  # do not exceed the image height
    ]

In [None]:
# Process a single video frame
def process_frame(frame, yolo_model, pose, target_size):
    """Run detection and pose estimation on one frame.

    The frame is resized/padded to `target_size`, passed to `yolo_model`,
    and every detected box is enlarged by a factor of 1.2. Pose estimation
    runs only on the crop of the first box; its landmarks are converted
    from crop-relative to coordinates in the resized frame.

    Args:
        frame: Input image.
        yolo_model: Callable detector, invoked as `yolo_model(img, verbose=False)`.
        pose: Object exposing `process(rgb_image)`.
        target_size: (width, height) handed to `resize_and_pad_frame`.

    Returns:
        An 8-tuple `(results_pose, bbox, joint_coords, joint_sequence,
        new_width, new_height, pad_w, pad_h)`. `results_pose` is None and the
        three lists are empty when nothing was detected or when detection
        failed; the tuple has the same length on every path.
    """
    resized_frame, new_width, new_height, pad_w, pad_h = resize_and_pad_frame(frame, target_size)

    # Defaults for every path, so the return value always has the same
    # shape (previously `results_pose` was unbound without a detection and
    # the error path returned only three values).
    results_pose = None
    bbox = []
    joint_coords = []
    joint_sequence = []

    try:
        results = yolo_model(resized_frame, verbose=False)
    except Exception as e:
        tqdm.write(f"Error during YOLO inference: {e}")
        return results_pose, bbox, joint_coords, joint_sequence, new_width, new_height, pad_w, pad_h

    # Collect the detected boxes, enlarged and clamped to the resized frame.
    for result in results[0].boxes:
        x1, y1, x2, y2 = map(int, result.xyxy[0])
        adjusted_bbox = adjust_bbox([x1, y1, x2, y2], scale_factor=1.2, frame_shape=resized_frame.shape)
        bbox.append((adjusted_bbox[0], adjusted_bbox[1], adjusted_bbox[2], adjusted_bbox[3]))

    # Run pose estimation on the first box only.
    if len(bbox) > 0:
        box_x1, box_y1, box_x2, box_y2 = bbox[0]
        person_image = resized_frame[box_y1:box_y2, box_x1:box_x2]

        # A degenerate box gives an empty crop, which cannot be color-converted.
        if person_image.size > 0:
            person_image_rgb = cv2.cvtColor(person_image, cv2.COLOR_BGR2RGB)
            results_pose = pose.process(person_image_rgb)

            if results_pose.pose_landmarks:
                for landmark in results_pose.pose_landmarks.landmark:
                    # Convert crop-relative coordinates to resized-frame coordinates.
                    global_x = box_x1 + landmark.x * (box_x2 - box_x1)
                    global_y = box_y1 + landmark.y * (box_y2 - box_y1)
                    joint_coords.append((global_x, global_y))
                joint_sequence.append(joint_coords)

    return results_pose, bbox, joint_coords, joint_sequence, new_width, new_height, pad_w, pad_h

In [None]:
# Build the input feature vector for the GRU model
def generate_input_features(bbox, joint_coords, joint_sequence, input_size):
    """Assemble one feature vector in the layout selected by `input_size`.

    * 25: (x, y) of each index in LANDMARKS followed by ratio, ratio class
      and speed; needs at least 11 joint coordinates and one box.
    * 22: (x, y) of each index in LANDMARKS; needs at least 11 joint
      coordinates.
    * 3: ratio, ratio class and speed; needs one box.

    Landmark indices beyond the end of `joint_coords` contribute (0.0, 0.0).

    Returns:
        A NumPy array of length `input_size`, or None when `input_size` is
        not one of the above or the required inputs are missing.

    Raises:
        ValueError: If the assembled vector does not have the expected length.
    """
    def _flatten_selected_joints():
        flat = []
        for landmark_idx in LANDMARKS:
            if landmark_idx < len(joint_coords):
                x, y = joint_coords[landmark_idx]
                flat.extend([x, y])
            else:
                flat.extend([0.0, 0.0])
        return flat

    if input_size == 25 and len(joint_coords) >= 11 and len(bbox) > 0:
        joint_part = np.array(_flatten_selected_joints())
        yolo_xy_ratio, ratio_class, speed = calculate_additional_features(bbox[0], joint_sequence)
        input_features = np.concatenate((joint_part, [yolo_xy_ratio, ratio_class, speed]))
        if len(input_features) != 25:
            raise ValueError(f"Expected input_features length to be 25, but got {len(input_features)}. Please check the feature extraction.")
        return input_features

    if input_size == 22 and len(joint_coords) >= 11:
        input_features = np.array(_flatten_selected_joints())
        if len(input_features) != 22:
            raise ValueError(f"Expected input_features length to be 22, but got {len(input_features)}. Please check the joint coordinates.")
        return input_features

    if input_size == 3 and len(bbox) > 0:
        yolo_xy_ratio, ratio_class, speed = calculate_additional_features(bbox[0], joint_sequence)
        input_features = np.array([yolo_xy_ratio, ratio_class, speed])
        if len(input_features) != 3:
            raise ValueError(f"Expected input_features length to be 3, but got {len(input_features)}. Please check the bbox features.")
        return input_features

    # Unknown layout or missing inputs.
    return None

In [None]:
# Draw the bounding boxes and pose landmarks
def visualize_results(frame, bbox, previous_pred_class, results_pose, new_width, new_height, pad_w, pad_h, original_width, original_height):
    """Draw detections, the predicted class label and pose landmarks on `frame`.

    Box and landmark coordinates are given in the resized/padded frame; they
    are mapped back to the original frame by removing the padding
    (`pad_w`, `pad_h`) and rescaling by `original / new` size.

    Args:
        frame: Original-resolution image, drawn on in place.
        bbox: Sequence of (x1, y1, x2, y2) boxes in resized-frame coordinates.
        previous_pred_class: 0 (Normal), 1 (Danger) or 2 (Fall); any other
            value is drawn in white and labeled 'Unknown'.
        results_pose: Pose result for the crop of `bbox[0]`, or None.
        new_width, new_height: Size of the resized image before padding.
        pad_w, pad_h: Left/top padding added to the resized image.
        original_width, original_height: Size of the original frame.

    Returns:
        The same `frame` object.
    """
    color_map = {
        0: (0, 255, 0),      # Normal: green
        1: (0, 165, 255),    # Danger: orange
        2: (0, 0, 255)       # Fall: red
    }
    label_map = {
        0: 'Normal',    # no fall
        1: 'Danger',    # at risk of falling
        2: 'Fall'       # fallen
    }
    color = color_map.get(previous_pred_class, (255, 255, 255))  # white by default
    label_text = label_map.get(previous_pred_class, 'Unknown')   # 'Unknown' by default
    label = f"Class: {label_text}"

    def _to_original(x, y):
        # Undo the padding, then the resize, to get original-frame pixels.
        return (int((x - pad_w) * (original_width / new_width)),
                int((y - pad_h) * (original_height / new_height)))

    # Draw every detected box with the class label above it.
    for x1, y1, x2, y2 in bbox:
        top_left = _to_original(x1, y1)
        bottom_right = _to_original(x2, y2)
        cv2.rectangle(frame, top_left, bottom_right, color, thickness=3)
        cv2.putText(frame, label, (top_left[0], top_left[1] - 10), cv2.FONT_HERSHEY_SIMPLEX, 1.0, color, thickness=3)

    # Draw the pose landmarks. Pose is estimated on the crop of the first
    # box, so the landmarks must be projected through bbox[0], not through
    # whichever box the loop above visited last.
    if len(bbox) > 0 and results_pose and results_pose.pose_landmarks:
        crop_x1, crop_y1, crop_x2, crop_y2 = bbox[0]
        for landmark in results_pose.pose_landmarks.landmark:
            global_x = crop_x1 + landmark.x * (crop_x2 - crop_x1)
            global_y = crop_y1 + landmark.y * (crop_y2 - crop_y1)
            cv2.circle(frame, _to_original(global_x, global_y), radius=3, color=(0, 255, 255), thickness=-1)

    return frame

In [None]:
# Main function that processes one video
def process_video(video_path, output_path, model_path):
    """Run fall detection over a video and write an annotated copy.

    Every 6th frame is passed through `process_frame`, turned into a feature
    vector and classified by the GRU model loaded from `model_path`. Every
    frame is annotated with the most recent boxes, landmarks and predicted
    class, and written to `output_path` at the source resolution and FPS.

    Args:
        video_path: Path of the input video.
        output_path: Path of the annotated output video (mp4v codec).
        model_path: Checkpoint path handed to `load_gru_model`.

    Raises:
        IOError: If the input video cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    out = None
    try:
        # Fail loudly instead of silently writing an empty output file.
        if not cap.isOpened():
            raise IOError(f"Could not open video: {video_path}")

        fps = cap.get(cv2.CAP_PROP_FPS)
        original_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        original_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        target_size = (640, 640)
        # Load the model before creating the writer so a bad checkpoint
        # does not leave an empty output file behind.
        gru_model, input_size = load_gru_model(model_path)
        # The writer uses the original size so the output keeps the source resolution.
        out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (original_width, original_height))

        frame_idx = 0
        previous_pred_class = -1  # predicted class carried over from the last processed frame

        # Detection state reused on the frames between two processed frames.
        # Defined up front so drawing still works if the first processed
        # frame is missing.
        results_pose, bbox = None, []
        new_width, new_height, pad_w, pad_h = original_width, original_height, 0, 0

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                if frame_idx >= total_frames:  # reached the end of the stream
                    tqdm.write("End of video stream. Processing completed.")
                    break
                # Dropped frame: count it and try the next one.
                tqdm.write(f"Warning: Failed to read frame {frame_idx}. Skipping to the next frame.")
                frame_idx += 1
                continue

            # Process every 6th frame.
            if frame_idx % 6 == 0:
                # Feature extraction and inference work in 640x640 coordinates.
                # NOTE(review): process_frame returns a fresh joint_sequence with
                # at most one entry, so the speed feature is always 0.0 here.
                # Confirm this matches how the model was trained.
                results_pose, bbox, joint_coords, joint_sequence, new_width, new_height, pad_w, pad_h = process_frame(frame, yolo_model, pose, target_size)
                if joint_sequence and bbox:
                    input_features = generate_input_features(bbox, joint_coords, joint_sequence, input_size)
                    if input_features is not None:
                        # [input_size] -> [1, 1, input_size]
                        input_tensor = torch.tensor(input_features, dtype=torch.float32).unsqueeze(0).unsqueeze(0)
                        # Inference only: skip autograd bookkeeping.
                        with torch.no_grad():
                            output = gru_model(input_tensor)
                        previous_pred_class = torch.argmax(softmax(output, dim=1), dim=1).item()
                # Otherwise the previous prediction is kept.

            # Draw the boxes and pose on the original-resolution frame.
            frame = visualize_results(frame, bbox, previous_pred_class, results_pose, new_width, new_height, pad_w, pad_h, original_width, original_height)
            out.write(frame)

            if frame_idx % 120 == 0:  # report progress every 120 frames
                tqdm.write(f"Processing {os.path.basename(video_path)}: Frame {frame_idx}/{total_frames}")
            frame_idx += 1
    finally:
        # Release handles even on errors or a KeyboardInterrupt.
        cap.release()
        if out is not None:
            out.release()

In [18]:
model_group = ['best_fall_detection_gru_001_full.pt', 'best_fall_detection_gru_001_mediapipe.pt',
               'best_fall_detection_gru_0001_simplified.pt'] # , 'best_fall_detection_cnn_mediapipe_0001.pt']

# Every backslash is escaped: the former "\V" and "\T" were invalid escape
# sequences that only worked because Python passes unknown escapes through.
video_group = ['D:\\Falldown\\Dataset\\Video_Dataset\\Video\\Test\\00028_H_A_FY_C1.mp4', #1
               'D:\\Falldown\\Dataset\\Video_Dataset\\Video\\Test\\00130_H_A_FY_C2.mp4', #2
               'D:\\Falldown\\Dataset\\Video_Dataset\\Video\\Test\\00712_H_D_BY_C3.mp4', #3
               'D:\\Falldown\\Dataset\\Video_Dataset\\Video\\Test\\02900_Y_C_BY_C4.mp4', #4
               'D:\\Falldown\\Dataset\\Video_Dataset\\Video\\Test\\02087_H_A_SY_C5.mp4', #5
               'D:\\Falldown\\Dataset\\Video_Dataset\\Video\\Test\\01757_Y_E_SY_C6.mp4', #6
               'D:\\Falldown\\Dataset\\Video_Dataset\\Video\\Test\\00690_H_D_N_C7.mp4', #7
               'D:\\Falldown\\Dataset\\Video_Dataset\\Video\\Test\\00799_O_E_N_C8.mp4'  #8
]

output_directory = 'D:\\Falldown\\code-git'

# Process every sample video with every model
for model_path in model_group:
    for video_path in video_group:
        # Resolve the name before the try block so the error message below
        # always refers to the video that actually failed.
        video_name, _ = os.path.splitext(os.path.basename(video_path))
        try:
            # Output suffix from the model name: "_<arch>_<variant>" for GRU
            # checkpoints, "_<arch>" otherwise.
            name_parts = model_path.split('_')
            if '_gru_' in model_path:
                suffix = f"_{name_parts[3]}_{name_parts[5].replace('.pt', '')}.mp4"
            else:
                suffix = f"_{name_parts[3]}.mp4"
            output_filename = f"{video_name}{suffix}"
            output_path = os.path.join(output_directory, output_filename)

            process_video(video_path, output_path, model_path)
            tqdm.write(f"✅ {video_name} 처리 완료!")
        except Exception as e:
            tqdm.write(f"❌ {video_name} 처리 중 오류 발생: {e}")

Processing 00028_H_A_FY_C1.mp4: Frame 0/600


KeyboardInterrupt: 