##### 위험 상황(낙상, 화재)을 감지하기 위해 GRU 모델 사용

### 1. 데이터 준비
#### a. 데이터 수집 (낙상 데이터)

In [5]:
##########################
## Mediapipe Pose Point ##
##########################

import os
import cv2
import mediapipe as mp
import concurrent.futures

# Mediapipe 포즈 초기화
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(static_image_mode=True, min_detection_confidence=0.5)

def process_and_display_pose(image_path, pose, pointer_color=(0, 0, 255), pointer_thickness=10):
    raw_img = cv2.imread(image_path)
    if raw_img is None:
        raise FileNotFoundError(f"이미지를 찾을 수 없습니다: {image_path}")

    image = cv2.cvtColor(raw_img, cv2.COLOR_BGR2RGB)
    results = pose.process(image)

    if results.pose_landmarks:
        for idx in range(33):
            landmark = results.pose_landmarks.landmark[idx]
            x = int(landmark.x * raw_img.shape[1])
            y = int(landmark.y * raw_img.shape[0])
            cv2.circle(raw_img, (x, y), pointer_thickness, pointer_color, -1)

    # 크기 조정
    height, width = raw_img.shape[:2]
    resize_factor = 0.5
    small_img = cv2.resize(raw_img, (int(width * resize_factor), int(height * resize_factor)))

    return small_img

def save_image(path, image):
    cv2.imwrite(path, image, [int(cv2.IMWRITE_JPEG_QUALITY), 90])

# 이미지 디렉토리 경로 설정
base_Y_path = "../data/Emergency_Data/Validation/Raw_Data/image/Y/"
base_N_path = "../data/Emergency_Data/Validation/Raw_Data/image/N/"
Y_file_tag = ["BY/", "FY/", "SY/"]
N_file_tag = ["N/"]

executor = concurrent.futures.ThreadPoolExecutor()
for tag in N_file_tag:
    file_dir = os.path.join(base_N_path, tag)
    file_list = sorted(os.listdir(file_dir))

    for each_file in file_list:
        img_dir = os.path.join(file_dir, each_file)
        each_img = sorted(os.listdir(img_dir))

        for idx in each_img:
            total_img_path = os.path.join(img_dir, idx)

            try:
                processed_img = process_and_display_pose(total_img_path, pose)
                save_path = "/home/cho/NAHONLAB_data/normal_state_data/"
                os.makedirs(save_path, exist_ok=True)
                save_img_path = os.path.join(save_path, os.path.basename(total_img_path))
                executor.submit(save_image, save_img_path, processed_img)
            except Exception as e:
                print(f"Error processing {total_img_path}: {e}")

        print(f"####### {each_file} file process complete! #######")
executor.shutdown()


I0000 00:00:1733987112.382496    4212 gl_context_egl.cc:85] Successfully initialized EGL. Major : 1 Minor: 5
I0000 00:00:1733987112.383486   69124 gl_context.cc:357] GL version: 3.2 (OpenGL ES 3.2 Mesa 23.2.1-1ubuntu3.1~22.04.2), renderer: Mesa Intel(R) UHD Graphics (CML GT2)
W0000 00:00:1733987112.471883   69111 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1733987112.502637   69118 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.


####### 00005_H_A_N_C1 file process complete! #######
####### 00005_H_A_N_C2 file process complete! #######
####### 00005_H_A_N_C3 file process complete! #######
####### 00005_H_A_N_C4 file process complete! #######
####### 00005_H_A_N_C5 file process complete! #######
####### 00005_H_A_N_C6 file process complete! #######
####### 00005_H_A_N_C7 file process complete! #######
####### 00005_H_A_N_C8 file process complete! #######
####### 00023_H_A_N_C1 file process complete! #######
####### 00023_H_A_N_C2 file process complete! #######
####### 00023_H_A_N_C3 file process complete! #######
####### 00023_H_A_N_C4 file process complete! #######
####### 00023_H_A_N_C5 file process complete! #######
####### 00023_H_A_N_C6 file process complete! #######
####### 00023_H_A_N_C7 file process complete! #######
####### 00023_H_A_N_C8 file process complete! #######
####### 00095_H_A_N_C1 file process complete! #######
####### 00095_H_A_N_C2 file process complete! #######
####### 00095_H_A_N_C3 file 

In [9]:
import os

def count_images_in_directory(directory):
    # 이미지 파일 확장자를 정의
    image_extensions = {'.jpg'}
    count = 0

    try:
        # 디렉토리 내 파일 확인
        for file in os.listdir(directory):
            # 파일 확장자를 확인하고 이미지 파일만 카운트
            if os.path.splitext(file)[1].lower() in image_extensions:
                count += 1
    except FileNotFoundError:
        print(f"Error: Directory '{directory}' not found.")
    except Exception as e:
        print(f"Error: {e}")

    return count

if __name__ == "__main__":
    base_path = "/home/cho/NAHONLAB_data"  # NAHONLAB_data 디렉토리 경로
    falling_data_path = os.path.join(base_path, "falling_data")
    normal_state_data_path = os.path.join(base_path, "normal_state_data")

    # 각각의 디렉토리에서 이미지 파일 개수 확인
    falling_count = count_images_in_directory(falling_data_path)
    normal_state_count = count_images_in_directory(normal_state_data_path)

    print(f"Number of images in 'falling_data': {falling_count} 개")
    print(f"Number of images in 'normal_state_data': {normal_state_count} 개")


Number of images in 'falling_data': 17040 개
Number of images in 'normal_state_data': 5680 개


#### b. 라벨링
- 데이터를 정상 상태(0)와 위험 상태(1)로 라벨링.
- 낙상 데이터: 0 = 정상, 1 = 낙상

In [10]:
import os
import numpy as np
from PIL import Image

def load_and_label_images(directory, label, image_size=(64, 64)):
    """
    디렉토리에서 이미지를 로드하고 라벨링합니다.

    Args:
        directory (str): 이미지 파일이 저장된 디렉토리 경로.
        label (int): 해당 디렉토리의 라벨 (예: 0 또는 1).
        image_size (tuple): 이미지를 리사이즈할 크기 (기본값: 64x64).

    Returns:
        images (list): 이미지 데이터 리스트.
        labels (list): 이미지 라벨 리스트.
    """
    image_extensions = {'.jpg'}
    images = []
    labels = []

    try:
        for file in os.listdir(directory):
            if os.path.splitext(file)[1].lower() in image_extensions:
                # 이미지 로드 및 리사이즈
                image_path = os.path.join(directory, file)
                with Image.open(image_path) as img:
                    img = img.resize(image_size).convert('RGB')
                    images.append(np.array(img))
                    labels.append(label)
    except FileNotFoundError:
        print(f"Error: Directory '{directory}' not found.")
    except Exception as e:
        print(f"Error: {e}")

    return images, labels

if __name__ == "__main__":
    base_path = "/home/cho/NAHONLAB_data/"  # NAHONLAB_data 디렉토리 경로
    falling_data_path = os.path.join(base_path, "falling_data")
    normal_state_data_path = os.path.join(base_path, "normal_state_data")

    # 각 디렉토리에서 이미지와 라벨 로드
    falling_images, falling_labels = load_and_label_images(falling_data_path, label=1)
    normal_images, normal_labels = load_and_label_images(normal_state_data_path, label=0)

    # 데이터와 라벨을 합치기
    all_images = np.array(falling_images + normal_images)
    all_labels = np.array(falling_labels + normal_labels)

    print(f"Total images: {len(all_images)}")
    print(f"Total labels: {len(all_labels)}")

    # 데이터 저장 (예: NumPy 배열로 저장)
    np.save(os.path.join(base_path, "images.npy"), all_images)
    np.save(os.path.join(base_path, "labels.npy"), all_labels)

    print("Data and labels saved successfully!")


Total images: 22720
Total labels: 22720
Data and labels saved successfully!


#### c. 데이터 전처리
- 데이터를 시간 단계(sequence)로 변환. 예를 들어서 한번에 10초동안의 데이터를 하나의 시퀀스로 구성.

In [12]:
import os
import numpy as np

def create_sequences(images, labels, sequence_length):
    """
    이미지를 시퀀스로 구성하고 해당 라벨을 반환합니다.

    Args:
        images (numpy.ndarray): 이미지 데이터 배열 (N, H, W, C).
        labels (numpy.ndarray): 라벨 배열 (N, ).
        sequence_length (int): 한 시퀀스에 포함될 프레임 수.

    Returns:
        sequences (numpy.ndarray): 시퀀스 데이터 배열 (M, sequence_length, H, W, C).
        sequence_labels (numpy.ndarray): 시퀀스 라벨 배열 (M, ).
    """
    num_sequences = len(images) // sequence_length
    sequences = []
    sequence_labels = []

    for i in range(num_sequences):
        start_idx = i * sequence_length
        end_idx = start_idx + sequence_length

        # 시퀀스 데이터와 라벨 생성
        sequence = images[start_idx:end_idx]
        label = labels[start_idx:end_idx]

        sequences.append(sequence)
        # 시퀀스의 라벨은 다수결(가장 빈도 높은 값)로 결정
        sequence_labels.append(np.bincount(label).argmax())

    return np.array(sequences), np.array(sequence_labels)

if __name__ == "__main__":
    base_path = "/home/cho/NAHONLAB_data/"

    # NumPy 데이터 로드
    images = np.load(os.path.join(base_path, "images.npy"))
    labels = np.load(os.path.join(base_path, "labels.npy"))

    sequence_length = 10

    # 시퀀스 생성
    sequences, sequence_labels = create_sequences(images, labels, sequence_length)

    print(f"Total sequences: {len(sequences)}")
    print(f"Sequence shape: {sequences.shape}")
    print(f"Sequence labels shape: {sequence_labels.shape}")

    # 시퀀스 데이터 저장
    np.save(os.path.join(base_path, "sequences.npy"), sequences)
    np.save(os.path.join(base_path, "sequence_labels.npy"), sequence_labels)

    print("Sequences and labels saved successfully!")

Total sequences: 2272
Sequence shape: (2272, 10, 64, 64, 3)
Sequence labels shape: (2272,)
Sequences and labels saved successfully!
