In [3]:
import numpy as np
import torch
import cv2
import mediapipe as mp
from scipy.spatial import distance

In [4]:
# Pick the CUDA GPU when PyTorch reports one is available, otherwise the CPU,
# and echo the choice so it is visible in the cell output.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(device)

cuda


### MediaPipe로 낙상 감지

In [None]:
# Initialise MediaPipe: `pose` is the estimator whose process() is called on
# every frame, `mp_drawing` renders the detected skeleton onto the frame.
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(static_image_mode=False, min_detection_confidence=0.5)
mp_drawing = mp.solutions.drawing_utils

# Landmark indices of interest (e.g. nose, left shoulder, right shoulder, ...).
# NOTE(review): not referenced anywhere else in the visible part of this file.
LANDMARKS = [0, 11, 12, 15, 16, 23, 24, 25, 26, 27, 28]  # 11 landmarks in total

# Thresholds compared against the value returned by
# calculate_head_upper_body_speed (displacement between two consecutive
# processed frames, in the units of the scaled landmark coordinates).
threshold_normal = 10.5   # speeds below this can be classified as Normal
threshold_danger = 15.5   # speeds at or above this are classified as Fall

def calculate_head_upper_body_speed(keypoints, prev_keypoints):
    """Return how far the head/shoulder centroid moved since the last frame.

    The centroid is the mean of the (x, y) coordinates stored in rows 0, 11
    and 12 of the keypoint array (head, left shoulder, right shoulder). Any
    further columns are ignored.

    Args:
        keypoints: 2-D array for the current frame, indexed ``[row, column]``
            with x in column 0 and y in column 1.
        prev_keypoints: Same layout for the previous frame, or ``None`` when
            there is no previous frame.

    Returns:
        ``0.0`` when ``prev_keypoints`` is ``None``; otherwise the Euclidean
        distance between the current and previous centroids.
    """
    anchor_rows = (0, 11, 12)  # head, left shoulder, right shoulder

    def _centroid(points):
        head, left, right = [
            np.array([points[row, 0], points[row, 1]]) for row in anchor_rows
        ]
        return (head + left + right) / 3

    # The current frame is read before the guard below, so a malformed
    # keypoint array still raises even on the very first frame.
    current_center = _centroid(keypoints)

    # Without a previous frame there is no displacement to measure.
    if prev_keypoints is None:
        return 0.0

    previous_center = _centroid(prev_keypoints)
    return distance.euclidean(current_center, previous_center)

# Display names for the class ids produced by detect_fall (0, 1 and 2).
class_names = {0: 'Normal', 1: 'Fall', 2: 'Danger'}

def calculate_and_draw_bbox(frame, landmarks, padding=50):
    """Compute the bounding box of the landmarks, clamped to the frame.

    Despite the name, nothing is drawn here; the caller renders the box.

    Args:
        frame: Image array. Only ``frame.shape[0]`` (height) and
            ``frame.shape[1]`` (width) are read.
        landmarks: 2-D array with x in column 0 and y in column 1, expressed
            in the same pixel units as the frame size.
        padding: Margin in pixels added on every side of the returned box.

    Returns:
        ``((x1, y1, x2, y2), bbox_width, bbox_height)``. The tuple is the
        padded box; ``bbox_width`` and ``bbox_height`` are the extents of the
        tight (unpadded) box. All values are plain ints that lie inside the
        frame, so the extents are never negative.

    Raises:
        ValueError: If ``landmarks`` has no rows (raised by ``np.min``).
    """
    frame_h, frame_w = frame.shape[:2]
    xs = landmarks[:, 0]
    ys = landmarks[:, 1]

    # Clamp BOTH ends of each axis into the frame. Clamping only the minimum
    # at 0 and only the maximum at the frame size produces a negative width or
    # height when every landmark lies outside the frame, which in turn gives a
    # meaningless negative aspect ratio downstream.
    x1 = int(np.clip(np.min(xs), 0, frame_w))
    y1 = int(np.clip(np.min(ys), 0, frame_h))
    x2 = int(np.clip(np.max(xs), 0, frame_w))
    y2 = int(np.clip(np.max(ys), 0, frame_h))

    # Extents are measured on the tight box, before any padding is applied.
    bbox_width = x2 - x1
    bbox_height = y2 - y1

    # Widen the box a little in every direction, still staying in the frame.
    x1 = max(0, x1 - padding)
    y1 = max(0, y1 - padding)
    x2 = min(frame_w, x2 + padding)
    y2 = min(frame_h, y2 + padding)

    return (x1, y1, x2, y2), bbox_width, bbox_height

# Per-frame fall classifier
def detect_fall(frame, landmarks, prev_landmarks, fall_frame_counter):
    """Classify one frame as Normal (0), Fall (1) or Danger (2).

    The decision combines the head/shoulder speed returned by
    ``calculate_head_upper_body_speed`` with the width/height ratio of the
    tight landmark box returned by ``calculate_and_draw_bbox``.

    Args:
        frame: Current image, forwarded to ``calculate_and_draw_bbox``.
        landmarks: Landmark array of the current frame.
        prev_landmarks: Landmark array of the previous frame, or ``None``.
        fall_frame_counter: Number of consecutive frames classified as Fall
            so far.

    Returns:
        ``(class_id, fall_frame_counter)`` with the counter incremented on a
        Fall frame and reset to 0 otherwise.

    Side effects:
        Prints the speed, ratio and predicted class. Sets the module-level
        ``determine_fall`` flag to ``True`` once the counter reaches 10; after
        that every call returns class 1 immediately, leaving the counter
        unchanged and printing nothing.
    """
    global determine_fall

    # A confirmed fall is sticky: skip all further analysis.
    if determine_fall:
        return 1, fall_frame_counter

    motion = calculate_head_upper_body_speed(landmarks, prev_landmarks)
    _, box_w, box_h = calculate_and_draw_bbox(frame, landmarks)

    # A zero-height box is treated as infinitely wide.
    if box_h != 0:
        aspect = box_w / box_h
    else:
        aspect = float('inf')

    if motion < threshold_normal and aspect < 0.7:
        predicted = 0  # Normal: slow and clearly taller than wide
    else:
        # Fall when moving fast or wider than tall; Danger for the rest.
        predicted = 1 if (motion >= threshold_danger or aspect > 1) else 2

    print(f"Speed: {motion}, bbox_ratio: {aspect}")
    print(f"Predicted class: {predicted}")

    # Any non-Fall frame breaks the streak.
    if predicted != 1:
        return predicted, 0

    streak = fall_frame_counter + 1
    if streak >= 10:
        determine_fall = True
    return predicted, streak

# Input clip to analyse.
video_path = "D:\\041.낙상사고 위험동작 영상-센서 쌍 데이터\\3.개방데이터\\1.데이터\\Validation\\01.원천데이터\\VS\\영상\\N\\N\\01228_O_E_N_C7\\01228_O_E_N_C7.mp4"
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    # Fail loudly instead of silently skipping the loop and leaving an empty
    # output file behind.
    raise RuntimeError(f"Could not open video: {video_path}")

width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# Keep the fractional frame rate (e.g. 29.97): truncating it to an int makes
# the written video play at a different speed than the source. Fall back to
# 30 when the container does not report a rate (value 0).
fps = cap.get(cv2.CAP_PROP_FPS) or 30.0

# Annotated frames are written to this file.
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out_path = 'C:\\Users\\user\\Desktop\\prj_sample_vid\\mediapipe_sample_4.mp4'
out = cv2.VideoWriter(out_path, fourcc, fps, (width, height))

# Detection state for this run. `determine_fall` is read and written by
# detect_fall through a `global` statement, so it must stay at module level.
fall_frame_counter = 0
determine_fall = False
prev_landmarks = None

# Frame processing loop. The try/finally guarantees the capture, the writer
# and the preview window are released even if a frame raises.
try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # MediaPipe is fed RGB, OpenCV delivers BGR.
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results_pose = pose.process(rgb_frame)

        if results_pose.pose_landmarks:
            # Scale the landmark x/y by the frame size to get pixel units.
            landmarks = np.array([[lm.x * width, lm.y * height, lm.z] for lm in results_pose.pose_landmarks.landmark])

            label, fall_frame_counter = detect_fall(frame, landmarks, prev_landmarks, fall_frame_counter)
            print(f"Predicted Class: {label}")

            # Draw the bounding box and the class label:
            # green = Normal, yellow = Danger, red = Fall (BGR order).
            bbox, _, _ = calculate_and_draw_bbox(frame, landmarks)
            color = (0, 255, 0) if label == 0 else ((0, 255, 255) if label == 2 else (0, 0, 255))
            cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), color, 2)
            class_name = class_names[label]
            cv2.putText(frame, f'Class: {class_name}', (bbox[0], bbox[1] - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)

            if determine_fall:
                cv2.putText(frame, 'FALL', (10, 30), cv2.FONT_HERSHEY_DUPLEX, 1, (0, 0, 255), 3)
            # Overlay the pose skeleton.
            mp_drawing.draw_landmarks(frame, results_pose.pose_landmarks, mp_pose.POSE_CONNECTIONS)

            prev_landmarks = landmarks
        else:
            # The speed thresholds assume consecutive frames. After a frame
            # without a detection, drop the stale landmarks so the next
            # detection does not report a multi-frame jump as one-frame speed.
            prev_landmarks = None

        # Save the frame and show the live preview; 'q' stops early.
        out.write(frame)
        cv2.imshow('Fall Detection', frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    cap.release()
    out.release()
    cv2.destroyAllWindows()



Speed: 0.0, bbox_ratio: 0.4220314735336195
Predicted class: 0
Predicted Class: 0
Speed: 7.015447515145465, bbox_ratio: 0.45982142857142855
Predicted class: 0
Predicted Class: 0
Speed: 6.027793584328462, bbox_ratio: 0.4740853658536585
Predicted class: 0
Predicted Class: 0
Speed: 6.282396916298011, bbox_ratio: 0.4546875
Predicted class: 0
Predicted Class: 0
Speed: 6.1077580584347615, bbox_ratio: 0.43213728549141966
Predicted class: 0
Predicted Class: 0
Speed: 5.62019822793649, bbox_ratio: 0.4041204437400951
Predicted class: 0
Predicted Class: 0
Speed: 6.113938325964901, bbox_ratio: 0.3614649681528662
Predicted class: 0
Predicted Class: 0
Speed: 4.859646713604942, bbox_ratio: 0.34455128205128205
Predicted class: 0
Predicted Class: 0
Speed: 4.1484307461734184, bbox_ratio: 0.31758957654723124
Predicted class: 0
Predicted Class: 0
Speed: 2.871652617446551, bbox_ratio: 0.2719869706840391
Predicted class: 0
Predicted Class: 0
Speed: 5.268915408994188, bbox_ratio: 0.23366013071895425
Predicted 

## 웹캠으로 MediaPipe만을 사용하여 낙상 감지
* macOS + iPhone

In [None]:
# Pick Apple's Metal (MPS) backend when PyTorch reports it is available,
# otherwise the CPU, and echo the choice so it is visible in the cell output.
device = torch.device("mps") if torch.backends.mps.is_available() else torch.device("cpu")
print(device)

In [None]:
# Initialise MediaPipe: `pose` is the estimator whose process() is called on
# every frame, `mp_drawing` renders the detected skeleton onto the frame.
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(static_image_mode=False, min_detection_confidence=0.5)
mp_drawing = mp.solutions.drawing_utils

# Landmark indices of interest.
# NOTE(review): not referenced anywhere else in the visible part of this file.
LANDMARKS = [0, 11, 12, 15, 16, 23, 24, 25, 26, 27, 28]  # 11 landmarks in total

# Thresholds compared against the value returned by
# calculate_head_upper_body_speed (displacement between two consecutive
# processed frames, in the units of the scaled landmark coordinates).
threshold_normal = 10.5   # speeds below this can be classified as Normal
threshold_danger = 15.5   # speeds at or above this are classified as Fall

def calculate_head_upper_body_speed(keypoints, prev_keypoints):
    """Return how far the head/shoulder centroid moved since the last frame.

    The centroid is the mean of the (x, y) coordinates stored in rows 0, 11
    and 12 of the keypoint array. Any further columns are ignored.

    Args:
        keypoints: 2-D array for the current frame, indexed ``[row, column]``
            with x in column 0 and y in column 1.
        prev_keypoints: Same layout for the previous frame, or ``None`` when
            there is no previous frame.

    Returns:
        ``0.0`` when ``prev_keypoints`` is ``None``; otherwise the Euclidean
        distance between the current and previous centroids.
    """
    anchor_rows = (0, 11, 12)

    def _centroid(points):
        first, second, third = [
            np.array([points[row, 0], points[row, 1]]) for row in anchor_rows
        ]
        return (first + second + third) / 3

    # The current frame is read before the guard below, so a malformed
    # keypoint array still raises even on the very first frame.
    current_center = _centroid(keypoints)

    # Without a previous frame the speed is defined as 0.
    if prev_keypoints is None:
        return 0.0

    previous_center = _centroid(prev_keypoints)
    return distance.euclidean(current_center, previous_center)

# Display names for the class ids produced by detect_fall (0, 1 and 2).
class_names = {0: 'Normal', 1: 'Fall', 2: 'Danger'}

def calculate_and_draw_bbox(frame, landmarks, padding=50):
    """Compute the bounding box of the landmarks, clamped to the frame.

    Despite the name, nothing is drawn here; the caller renders the box.

    Args:
        frame: Image array. Only ``frame.shape[0]`` (height) and
            ``frame.shape[1]`` (width) are read.
        landmarks: 2-D array with x in column 0 and y in column 1, expressed
            in the same pixel units as the frame size.
        padding: Margin in pixels added on every side of the returned box.

    Returns:
        ``((x1, y1, x2, y2), bbox_width, bbox_height)``. The tuple is the
        padded box; ``bbox_width`` and ``bbox_height`` are the extents of the
        tight (unpadded) box. All values are plain ints that lie inside the
        frame, so the extents are never negative.

    Raises:
        ValueError: If ``landmarks`` has no rows (raised by ``np.min``).
    """
    frame_h, frame_w = frame.shape[:2]
    xs = landmarks[:, 0]
    ys = landmarks[:, 1]

    # Clamp BOTH ends of each axis into the frame. Clamping only the minimum
    # at 0 and only the maximum at the frame size produces a negative width or
    # height when every landmark lies outside the frame (easy to hit with a
    # webcam when the subject steps out of view).
    x1 = int(np.clip(np.min(xs), 0, frame_w))
    y1 = int(np.clip(np.min(ys), 0, frame_h))
    x2 = int(np.clip(np.max(xs), 0, frame_w))
    y2 = int(np.clip(np.max(ys), 0, frame_h))

    # Extents are measured on the tight box, before any padding is applied.
    bbox_width = x2 - x1
    bbox_height = y2 - y1

    # Widen the box a little in every direction, still staying in the frame.
    x1 = max(0, x1 - padding)
    y1 = max(0, y1 - padding)
    x2 = min(frame_w, x2 + padding)
    y2 = min(frame_h, y2 + padding)

    return (x1, y1, x2, y2), bbox_width, bbox_height

# Per-frame fall classifier
def detect_fall(frame, landmarks, prev_landmarks, fall_frame_counter):
    """Classify one frame as Normal (0), Fall (1) or Danger (2).

    The decision combines the head/shoulder speed returned by
    ``calculate_head_upper_body_speed`` with the width/height ratio of the
    tight landmark box returned by ``calculate_and_draw_bbox``.

    Args:
        frame: Current image, forwarded to ``calculate_and_draw_bbox``.
        landmarks: Landmark array of the current frame.
        prev_landmarks: Landmark array of the previous frame, or ``None``.
        fall_frame_counter: Number of consecutive frames classified as Fall
            so far.

    Returns:
        ``(class_id, fall_frame_counter)`` with the counter incremented on a
        Fall frame and reset to 0 otherwise.

    Side effects:
        Prints the speed, ratio and predicted class. Sets the module-level
        ``determine_fall`` flag to ``True`` once the counter reaches 10; after
        that every call returns class 1 immediately, leaving the counter
        unchanged and printing nothing.
    """
    global determine_fall

    # A confirmed fall is sticky: skip all further analysis.
    if determine_fall:
        return 1, fall_frame_counter

    motion = calculate_head_upper_body_speed(landmarks, prev_landmarks)
    _, box_w, box_h = calculate_and_draw_bbox(frame, landmarks)

    # A zero-height box is treated as infinitely wide.
    if box_h != 0:
        aspect = box_w / box_h
    else:
        aspect = float('inf')

    if motion < threshold_normal and aspect < 0.7:
        predicted = 0  # Normal: slow and clearly taller than wide
    else:
        # Fall when moving fast or wider than tall; Danger for the rest.
        predicted = 1 if (motion >= threshold_danger or aspect > 1) else 2

    print(f"Speed: {motion}, bbox_ratio: {aspect}")
    print(f"Predicted class: {predicted}")

    # Any non-Fall frame breaks the streak.
    if predicted != 1:
        return predicted, 0

    streak = fall_frame_counter + 1
    if streak >= 10:
        determine_fall = True
    return predicted, streak

# Open the webcam (device index 0).
cap = cv2.VideoCapture(0)
if not cap.isOpened():
    # Fail loudly instead of silently skipping the loop and leaving an empty
    # recording behind (e.g. camera busy or permission denied).
    raise RuntimeError("Could not open webcam (device index 0)")

width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = 30  # fixed frame rate used for the recording

# Annotated frames are written to this file in the working directory.
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out_path = 'webcam_fall_detection.mp4'
out = cv2.VideoWriter(out_path, fourcc, fps, (width, height))

# Detection state for this run. `determine_fall` is read and written by
# detect_fall through a `global` statement, so it must stay at module level.
fall_frame_counter = 0
determine_fall = False
prev_landmarks = None

# Frame processing loop. The try/finally guarantees the camera, the writer
# and the preview window are released even if a frame raises; otherwise the
# camera stays locked until the kernel is restarted.
try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # MediaPipe is fed RGB, OpenCV delivers BGR.
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results_pose = pose.process(rgb_frame)

        if results_pose.pose_landmarks:
            # Scale the landmark x/y by the frame size to get pixel units.
            landmarks = np.array([[lm.x * width, lm.y * height, lm.z] for lm in results_pose.pose_landmarks.landmark])

            label, fall_frame_counter = detect_fall(frame, landmarks, prev_landmarks, fall_frame_counter)
            print(f"Predicted Class: {label}")

            # Draw the bounding box and the class label:
            # green = Normal, yellow = Danger, red = Fall (BGR order).
            bbox, _, _ = calculate_and_draw_bbox(frame, landmarks)
            color = (0, 255, 0) if label == 0 else ((0, 255, 255) if label == 2 else (0, 0, 255))
            cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), color, 2)
            class_name = class_names[label]
            cv2.putText(frame, f'Class: {class_name}', (bbox[0], bbox[1] - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)

            if determine_fall:
                cv2.putText(frame, 'FALL', (10, 30), cv2.FONT_HERSHEY_DUPLEX, 1, (0, 0, 255), 3)
            # Overlay the pose skeleton.
            mp_drawing.draw_landmarks(frame, results_pose.pose_landmarks, mp_pose.POSE_CONNECTIONS)

            prev_landmarks = landmarks
        else:
            # The speed thresholds assume consecutive frames. After a frame
            # without a detection, drop the stale landmarks so the next
            # detection does not report a multi-frame jump as one-frame speed.
            prev_landmarks = None

        # Save the frame and show the live preview; 'q' stops early.
        out.write(frame)
        cv2.imshow('Fall Detection', frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    cap.release()
    out.release()
    cv2.destroyAllWindows()
    # NOTE(review): presumably these extra waitKey calls let the GUI event
    # loop run so the preview window really closes on macOS - confirm before
    # removing them.
    cv2.waitKey()
    cv2.waitKey(1)