# 1. 동영상에서 관절 좌표 추출 함수

In [1]:
from ultralytics import YOLO
import cv2
import numpy as np

# YOLOv8 Pose 모델 로드
model = YOLO('/home/heechun/final_ws/PoseEstimate/final_pose_esti/yolov8n-pose.pt')

# 고정된 시퀀스 길이 설정 (예: 30 프레임)
fixed_time_steps = 15
data_sequences = []
label_sequences = []

def extract_keypoints(video_path):
    cap = cv2.VideoCapture(video_path)
    keypoints_sequence = []  # 시퀀스 데이터를 저장할 리스트

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # 프레임 크기 확인
        height, width, _ = frame.shape

        # YOLOv8 Pose 모델을 이용해 관절 좌표 추출
        results = model(frame)
        keypoints = results[0].keypoints.xy  # 각 관절의 x, y 좌표 추출

        # 좌표를 하나의 벡터로 변환하여 저장 (예: [x1, y1, x2, y2, ..., x17, y17])
        if len(keypoints) > 0:  # keypoints가 비어 있지 않은지 확인
            keypoints_flat = []
            for kp in keypoints[0]:
                x = kp[0] / width  # x 좌표를 width로 나눠 정규화
                y = kp[1] / height  # y 좌표를 height로 나눠 정규화
                keypoints_flat.extend([x, y])
            
            keypoints_sequence.append(keypoints_flat)

    cap.release()
    return keypoints_sequence


# 2. 모든 클래스별 좌표 시퀀스를 수집 및 레이블링

In [2]:
# 각 클래스의 데이터 수집
classes = {
    "standing": ["/home/heechun/final_ws/PoseEstimate/1105_pose_estimate_1st/data_video/standing_front.MOV",
                  "/home/heechun/final_ws/PoseEstimate/1105_pose_estimate_1st/data_video/standing_left.MOV",
                  "/home/heechun/final_ws/PoseEstimate/1105_pose_estimate_1st/data_video/standing_right.MOV"],
    "running": ["/home/heechun/final_ws/PoseEstimate/1105_pose_estimate_1st/data_video/running_front.MOV",
                "/home/heechun/final_ws/PoseEstimate/1105_pose_estimate_1st/data_video/running_left.MOV",
                "/home/heechun/final_ws/PoseEstimate/1105_pose_estimate_1st/data_video/running_right.MOV"],
    "walking": ["/home/heechun/final_ws/PoseEstimate/1105_pose_estimate_1st/data_video/walking_front.MOV",
                "/home/heechun/final_ws/PoseEstimate/1105_pose_estimate_1st/data_video/walking_left.MOV",
                "/home/heechun/final_ws/PoseEstimate/1105_pose_estimate_1st/data_video/walking_right.MOV"],
    "sitting": ["/home/heechun/final_ws/PoseEstimate/1105_pose_estimate_1st/data_video/sitting_front.MOV",
                "/home/heechun/final_ws/PoseEstimate/1105_pose_estimate_1st/data_video/sitting_left.MOV",
                "/home/heechun/final_ws/PoseEstimate/1105_pose_estimate_1st/data_video/sitting_right.MOV"]    
}


# 각 비디오 파일에 대해 시퀀스 생성
for label, video_paths in classes.items():
    for video_path in video_paths:
        keypoints_sequence = extract_keypoints(video_path)
        
        # 시퀀스를 고정된 길이로 나누기
        for i in range(0, len(keypoints_sequence) - fixed_time_steps + 1, fixed_time_steps):
            data_seq = keypoints_sequence[i:i + fixed_time_steps]
            data_sequences.append(data_seq)
            label_sequences.append(label)  # 시퀀스에 대한 레이블 추가



0: 640x384 1 person, 32.7ms
Speed: 2.1ms preprocess, 32.7ms inference, 372.3ms postprocess per image at shape (1, 3, 640, 384)

0: 640x384 1 person, 7.5ms
Speed: 2.1ms preprocess, 7.5ms inference, 1.0ms postprocess per image at shape (1, 3, 640, 384)

0: 640x384 1 person, 7.6ms
Speed: 1.2ms preprocess, 7.6ms inference, 1.6ms postprocess per image at shape (1, 3, 640, 384)

0: 640x384 1 person, 6.7ms
Speed: 1.2ms preprocess, 6.7ms inference, 1.9ms postprocess per image at shape (1, 3, 640, 384)

0: 640x384 1 person, 6.6ms
Speed: 1.4ms preprocess, 6.6ms inference, 2.1ms postprocess per image at shape (1, 3, 640, 384)

0: 640x384 1 person, 5.4ms
Speed: 1.1ms preprocess, 5.4ms inference, 3.9ms postprocess per image at shape (1, 3, 640, 384)

0: 640x384 1 person, 5.2ms
Speed: 1.3ms preprocess, 5.2ms inference, 1.5ms postprocess per image at shape (1, 3, 640, 384)

0: 640x384 1 person, 5.2ms
Speed: 1.5ms preprocess, 5.2ms inference, 1.6ms postprocess per image at shape (1, 3, 640, 384)

0: 

In [3]:
import pandas as pd

# data_sequences와 label_sequences를 DataFrame으로 변환
data_df = pd.DataFrame([item for seq in data_sequences for item in seq])  # 각 시퀀스의 데이터를 평탄화하여 저장
labels_df = pd.DataFrame(label_sequences, columns=['label'])

# 파일로 저장
data_df.to_csv('/home/heechun/final_ws/PoseEstimate/final_pose_esti/data_frame/data_sequences.csv', index=False)
labels_df.to_csv('/home/heechun/final_ws/PoseEstimate/final_pose_esti/data_frame/label_sequences.csv', index=False)


# 3 파일 불러와서 학습 진행

In [1]:
import pandas as pd
import numpy as np
import re

fixed_time_steps = 15

# CSV 파일 불러오기
data_df = pd.read_csv('/home/heechun/final_ws/PoseEstimate/final_pose_esti/data_frame/data_sequences.csv')
labels_df = pd.read_csv('/home/heechun/final_ws/PoseEstimate/final_pose_esti/data_frame/label_sequences.csv')

# 숫자 부분만 추출하는 함수 정의
def extract_numeric(value):
    # tensor(숫자) 형식에서 숫자만 추출
    match = re.search(r"tensor\(([\d.]+)", str(value))
    return float(match.group(1)) if match else np.nan

# 데이터 프레임 전체에 함수 적용
data_df = data_df.applymap(extract_numeric)

# 필요한 형식으로 변환
data_sequences = data_df.values.reshape(-1, fixed_time_steps, 34)  # 시퀀스의 개수 x 시퀀스 길이 x 특징 수
label_sequences = labels_df['label'].values  # 라벨 데이터

# 확인
print("Data shape:", data_sequences.shape)
print("Labels shape:", label_sequences.shape)


  data_df = data_df.applymap(extract_numeric)


Data shape: (1010, 15, 34)
Labels shape: (1010,)


In [2]:
data_sequences.shape, label_sequences.shape

((1010, 15, 34), (1010,))

In [3]:
data_df.shape, labels_df.shape

((15150, 34), (1010, 1))

In [4]:
data_df.head()
data_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 15150 entries, 0 to 15149
Data columns (total 34 columns):
 #   Column  Non-Null Count  Dtype  
---  ------  --------------  -----  
 0   0       15148 non-null  float64
 1   1       15148 non-null  float64
 2   2       15148 non-null  float64
 3   3       15148 non-null  float64
 4   4       15148 non-null  float64
 5   5       15148 non-null  float64
 6   6       15148 non-null  float64
 7   7       15148 non-null  float64
 8   8       15148 non-null  float64
 9   9       15148 non-null  float64
 10  10      15148 non-null  float64
 11  11      15148 non-null  float64
 12  12      15148 non-null  float64
 13  13      15148 non-null  float64
 14  14      15148 non-null  float64
 15  15      15148 non-null  float64
 16  16      15148 non-null  float64
 17  17      15148 non-null  float64
 18  18      15148 non-null  float64
 19  19      15148 non-null  float64
 20  20      15148 non-null  float64
 21  21      15148 non-null  float64
 22

In [5]:
# NaN 값이 있는지 확인
print(np.isnan(data_sequences).any())


True


In [6]:
data_sequences = np.nan_to_num(data_sequences)


In [7]:
print(np.isnan(data_sequences).any())

False


In [8]:
data_sequences.shape

(1010, 15, 34)

LSTM 일반

In [12]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Bidirectional, Dropout, BatchNormalization
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.callbacks import ModelCheckpoint, TensorBoard
import os

# 레이블 인코딩
label_encoder = LabelEncoder()
encoded_labels = label_encoder.fit_transform(label_sequences)

# 데이터 분할 (학습/검증 데이터)
X_train, X_val, y_train, y_val = train_test_split(data_sequences, encoded_labels, test_size=0.2, random_state=42)

# LSTM 모델 정의
model = Sequential([
    LSTM(64, input_shape=(X_train.shape[1], X_train.shape[2]), return_sequences=True),
    Dropout(0.3),
    LSTM(64, return_sequences=False),
    Dropout(0.3),
    Dense(64, activation='relu'),
    BatchNormalization(),
    Dense(32, activation='relu'),
    Dense(len(set(encoded_labels)), activation='softmax')
])

# 모델 컴파일
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# 로그 디렉토리 설정
log_dir = "/home/heechun/final_ws/PoseEstimate/final_pose_esti/logs_final"
os.makedirs(log_dir, exist_ok=True)

# TensorBoard 콜백 설정
tensorboard_callback = TensorBoard(log_dir=log_dir, histogram_freq=1)

# ModelCheckpoint 콜백 설정
checkpoint_best = ModelCheckpoint(
    '/home/heechun/final_ws/PoseEstimate/final_pose_esti/pose_model/best_model.keras',
    monitor='val_loss',              # 검증 손실 기준으로 최고 모델 갱신
    save_best_only=True,             # 최상의 모델만 저장
    mode='min',                      # 손실이 최소일 때 최적
    verbose=1
)

# 모델 학습
history = model.fit(
    X_train, y_train,
    epochs=200,
    batch_size=32,
    validation_data=(X_val, y_val),
    callbacks=[checkpoint_best, tensorboard_callback]  # TensorBoard 콜백 추가
)

# 최종 모델 저장
model.save('/home/heechun/final_ws/PoseEstimate/final_pose_esti/pose_model/final_model.keras')


I0000 00:00:1730873980.355243  810063 cuda_executor.cc:1015] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
I0000 00:00:1730873980.356135  810063 cuda_executor.cc:1015] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
I0000 00:00:1730873980.356277  810063 cuda_executor.cc:1015] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
I0000 00:00:1730873980.356837  810063 cuda_executor.cc:1015] successful NUMA node read from SysFS ha

Epoch 1/200
[1m18/26[0m [32m━━━━━━━━━━━━━[0m[37m━━━━━━━[0m [1m0s[0m 6ms/step - accuracy: 0.2982 - loss: 1.4810
Epoch 1: val_loss improved from inf to 1.38328, saving model to /home/heechun/final_ws/PoseEstimate/final_pose_esti/pose_model/best_model.keras
[1m26/26[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 21ms/step - accuracy: 0.2878 - loss: 1.4887 - val_accuracy: 0.2525 - val_loss: 1.3833
Epoch 2/200
[1m21/26[0m [32m━━━━━━━━━━━━━━━━[0m[37m━━━━[0m [1m0s[0m 5ms/step - accuracy: 0.3566 - loss: 1.3412
Epoch 2: val_loss improved from 1.38328 to 1.35143, saving model to /home/heechun/final_ws/PoseEstimate/final_pose_esti/pose_model/best_model.keras
[1m26/26[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11ms/step - accuracy: 0.3591 - loss: 1.3365 - val_accuracy: 0.2871 - val_loss: 1.3514
Epoch 3/200
[1m21/26[0m [32m━━━━━━━━━━━━━━━━[0m[37m━━━━[0m [1m0s[0m 5ms/step - accuracy: 0.4188 - loss: 1.2194
Epoch 3: val_loss improved from 1.35143 to 1.26383,

LSTM 양방향

In [9]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Bidirectional, Dropout, BatchNormalization
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.callbacks import ModelCheckpoint, TensorBoard
import os

# 레이블 인코딩
label_encoder = LabelEncoder()
encoded_labels = label_encoder.fit_transform(label_sequences)

# 데이터 분할 (학습/검증 데이터)
X_train, X_val, y_train, y_val = train_test_split(data_sequences, encoded_labels, test_size=0.2, random_state=42)

# 양방향 LSTM 모델 정의
model = Sequential([
    Bidirectional(LSTM(64, return_sequences=True), input_shape=(X_train.shape[1], X_train.shape[2])),
    Dropout(0.3),
    Bidirectional(LSTM(64, return_sequences=False)),
    Dropout(0.3),
    Dense(64, activation='relu'),
    BatchNormalization(),
    Dense(32, activation='relu'),
    Dense(len(set(encoded_labels)), activation='softmax')
])

# 모델 컴파일
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# 로그 디렉토리 설정
log_dir = "/home/heechun/final_ws/PoseEstimate/final_pose_esti/logs_final_bidirectional"
os.makedirs(log_dir, exist_ok=True)

# TensorBoard 콜백 설정
tensorboard_callback = TensorBoard(log_dir=log_dir, histogram_freq=1)

# ModelCheckpoint 콜백 설정
checkpoint_best = ModelCheckpoint(
    '/home/heechun/final_ws/PoseEstimate/final_pose_esti/pose_model_bidirectional/best_model.keras',
    monitor='val_loss',              # 검증 손실 기준으로 최고 모델 갱신
    save_best_only=True,             # 최상의 모델만 저장
    mode='min',                      # 손실이 최소일 때 최적
    verbose=1
)

# 모델 학습
history = model.fit(
    X_train, y_train,
    epochs=200,
    batch_size=32,
    validation_data=(X_val, y_val),
    callbacks=[checkpoint_best, tensorboard_callback]  # TensorBoard 콜백 추가
)

# 최종 모델 저장
model.save('/home/heechun/final_ws/PoseEstimate/final_pose_esti/pose_model_bidirectional/final_model.keras')


2024-11-06 17:58:58.803777: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:485] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-11-06 17:58:58.817816: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:8454] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-11-06 17:58:58.823505: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1452] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-11-06 17:58:58.833634: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
I0000 00:00:1730883540.460075 1344164 cuda_executor.c

Epoch 1/200


2024-11-06 17:59:05.128058: I external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:531] Loaded cuDNN version 8907


[1m26/26[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 19ms/step - accuracy: 0.2585 - loss: 1.5643
Epoch 1: val_loss improved from inf to 1.37551, saving model to /home/heechun/final_ws/PoseEstimate/final_pose_esti/pose_model_bidirectional/best_model.keras
[1m26/26[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 44ms/step - accuracy: 0.2592 - loss: 1.5622 - val_accuracy: 0.3119 - val_loss: 1.3755
Epoch 2/200
[1m25/26[0m [32m━━━━━━━━━━━━━━━━━━━[0m[37m━[0m [1m0s[0m 19ms/step - accuracy: 0.3661 - loss: 1.3786
Epoch 2: val_loss improved from 1.37551 to 1.32355, saving model to /home/heechun/final_ws/PoseEstimate/final_pose_esti/pose_model_bidirectional/best_model.keras
[1m26/26[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 30ms/step - accuracy: 0.3680 - loss: 1.3751 - val_accuracy: 0.3465 - val_loss: 1.3236
Epoch 3/200
[1m25/26[0m [32m━━━━━━━━━━━━━━━━━━━[0m[37m━[0m [1m0s[0m 18ms/step - accuracy: 0.4608 - loss: 1.1712
Epoch 3: val_loss improved from 

In [13]:
!tensorboard --logdir=/home/heechun/final_ws/PoseEstimate/final_pose_esti/logs_final

I0000 00:00:1730874043.054596  835087 cuda_executor.cc:1015] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
I0000 00:00:1730874043.083239  835087 cuda_executor.cc:1015] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
I0000 00:00:1730874043.083430  835087 cuda_executor.cc:1015] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355

NOTE: Using experimental fast data loading logic. To disable, pass
    "--load_fast=false" and repo

# 예측코드 1 - 웹캠

In [None]:
import cv2
import numpy as np
from collections import deque
from ultralytics import YOLO
import tensorflow as tf
from sklearn.preprocessing import LabelEncoder

# YOLOv8 Pose 모델 로드
yolo_model = YOLO('/home/heechun/final_ws/PoseEstimate/final_pose_esti/yolov8n-pose.pt')

# 학습된 LSTM 모델 로드
lstm_model = tf.keras.models.load_model('/home/heechun/final_ws/PoseEstimate/final_pose_esti/pose_model/best_model.keras')

# LabelEncoder 객체 생성 및 클래스에 맞게 fit
label_encoder = LabelEncoder()
label_encoder.fit(["standing", "running", "walking", "sitting"])

# 고정된 시퀀스 길이 설정 (예: 30 프레임)
fixed_time_steps = 15
sequence = deque(maxlen=fixed_time_steps)  # 고정된 길이의 deque로 시퀀스 관리

# 관절 표시 여부를 결정하는 변수 (True: 표시, False: 비표시)
show_keypoints = True

# 웹캠 열기
cap = cv2.VideoCapture(0)  # 0은 기본 웹캠, 다른 번호는 추가 웹캠

# 프레임을 읽어 관절 좌표 추출 및 전처리
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    # 프레임 크기 가져오기
    height, width, _ = frame.shape

    # YOLOv8 Pose 모델을 이용해 관절 좌표 추출
    results = yolo_model(frame)
    keypoints = results[0].keypoints.xy.cpu().numpy() if results and results[0].keypoints else []

    # 좌표를 하나의 벡터로 변환 (예: [x1, y1, x2, y2, ..., x17, y17])
    keypoints_flat = []
    if len(keypoints) > 0:
        for kp in keypoints[0]:
            x = kp[0] / width  # x 좌표 정규화
            y = kp[1] / height  # y 좌표 정규화
            keypoints_flat.extend([x, y])

        # 관절 표시가 켜져 있는 경우 프레임에 표시
        if show_keypoints:
            for kp in keypoints[0]:
                cv2.circle(frame, (int(kp[0]), int(kp[1])), 3, (0, 0, 255), -1)

    # 시퀀스에 추가 (고정된 길이 유지)
    if keypoints_flat:
        sequence.append(keypoints_flat)

    # 시퀀스 길이가 고정된 길이에 도달하면 예측 수행
    if len(sequence) == fixed_time_steps:
        input_sequence = np.array(sequence).reshape(1, fixed_time_steps, 34)  # (1, 30, 34) 형태로 입력

        # 예측
        prediction = lstm_model.predict(input_sequence)
        predicted_class = np.argmax(prediction, axis=1)[0]

        # 예측 결과 표시
        class_name = label_encoder.inverse_transform([predicted_class])[0]  # 예측 클래스명 디코딩
        cv2.putText(frame, f"Predicted: {class_name}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

    # 프레임 표시
    cv2.imshow("Webcam Prediction", frame)

    # 'q' 키를 누르면 종료
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# 웹캠 및 창 닫기
cap.release()
cv2.destroyAllWindows()


# 정면, 다른방향 보는지 실시간 웹캠 검출 코드

In [None]:
import cv2
import numpy as np
from collections import deque
from ultralytics import YOLO
import tensorflow as tf
from sklearn.preprocessing import LabelEncoder

# YOLOv8 Pose 모델 로드
yolo_model = YOLO('/home/heechun/final_ws/PoseEstimate/yolov8n-pose.pt')

# 학습된 LSTM 모델 로드
lstm_model = tf.keras.models.load_model('/home/heechun/final_ws/PoseEstimate/1106_upgrade_model/no_lying/best_model.keras')

# LabelEncoder 객체 생성 및 클래스에 맞게 fit
label_encoder = LabelEncoder()
label_encoder.fit(["standing", "running", "walking", "sitting"])

# 고정된 시퀀스 길이 설정 (예: 30 프레임)
fixed_time_steps = 15
sequence = deque(maxlen=fixed_time_steps)  # 고정된 길이의 deque로 시퀀스 관리

# 관절 표시 여부를 결정하는 변수 (True: 표시, False: 비표시)
show_keypoints = True

# 웹캠 열기
cap = cv2.VideoCapture(0)  # 0은 기본 웹캠, 다른 번호는 추가 웹캠

# 프레임을 읽어 관절 좌표 추출 및 전처리
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    # 프레임 크기 가져오기
    height, width, _ = frame.shape

    # YOLOv8 Pose 모델을 이용해 관절 좌표 추출
    results = yolo_model(frame)
    keypoints = results[0].keypoints.xy.cpu().numpy() if results and results[0].keypoints else []

    # 좌표를 하나의 벡터로 변환 (예: [x1, y1, x2, y2, ..., x17, y17])
    keypoints_flat = []
    facing_forward = True  # 기본값을 정면(True)으로 설정
    if len(keypoints) > 0:
        for kp in keypoints[0]:
            x = kp[0] / width  # x 좌표 정규화
            y = kp[1] / height  # y 좌표 정규화
            keypoints_flat.extend([x, y])
            if x == 0 and y == 0:  # [0,0] 좌표가 있으면 다른 방향으로 간주
                facing_forward = False

        # 관절 표시가 켜져 있는 경우 프레임에 표시
        if show_keypoints:
            for kp in keypoints[0]:
                cv2.circle(frame, (int(kp[0]), int(kp[1])), 3, (0, 0, 255), -1)

    # 시퀀스에 추가 (고정된 길이 유지)
    if keypoints_flat:
        sequence.append(keypoints_flat)

    # 시퀀스 길이가 고정된 길이에 도달하면 예측 수행
    if len(sequence) == fixed_time_steps:
        input_sequence = np.array(sequence).reshape(1, fixed_time_steps, 34)  # (1, 30, 34) 형태로 입력

        # 예측
        prediction = lstm_model.predict(input_sequence)
        predicted_class = np.argmax(prediction, axis=1)[0]

        # 예측 결과 표시
        class_name = label_encoder.inverse_transform([predicted_class])[0]  # 예측 클래스명 디코딩
        direction_text = "Facing Forward" if facing_forward else "Not Facing Forward"
        cv2.putText(frame, f"Predicted: {class_name}, {direction_text}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

    # 프레임 표시
    cv2.imshow("Webcam Prediction", frame)

    # 'q' 키를 누르면 종료
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# 웹캠 및 창 닫기
cap.release()
cv2.destroyAllWindows()


# 검출 결과 - 필터링 해서 자주 바뀌지 않게, 좀 후처리로 몇개이상 쌓이면 그중 평균값이 결과로 들어가는 방식 활용하면 될듯하고
# 뛰고, 걷는 경우에는 정면을 바라보고 있을때 차량은 멈춤 - 나를 향해서 오고있으니 로봇 입장에선 조심히 하는게 최선
# 나머지 경우에는 피해가면 됨
# 특이 케이스로 - 쓰러져있는 클래스가 검출이 되면 - 신고하거나 메시지 표현하는데 - 이건 보류

running walking일때만 방향정보 표시하게 코드 수정 

In [None]:
import cv2
import numpy as np
from collections import deque
from ultralytics import YOLO
import tensorflow as tf
from sklearn.preprocessing import LabelEncoder

# YOLOv8 Pose 모델 로드
yolo_model = YOLO('/home/heechun/final_ws/PoseEstimate/yolov8n-pose.pt')

# 학습된 LSTM 모델 로드
lstm_model = tf.keras.models.load_model('/home/heechun/final_ws/PoseEstimate/1106_upgrade_model/no_lying/best_model.keras')

# LabelEncoder 객체 생성 및 클래스에 맞게 fit
label_encoder = LabelEncoder()
label_encoder.fit(["standing", "running", "walking", "sitting"])

# 고정된 시퀀스 길이 설정 (예: 30 프레임)
fixed_time_steps = 15
sequence = deque(maxlen=fixed_time_steps)  # 고정된 길이의 deque로 시퀀스 관리

# 관절 표시 여부를 결정하는 변수 (True: 표시, False: 비표시)
show_keypoints = True

# 웹캠 열기
cap = cv2.VideoCapture(0)  # 0은 기본 웹캠, 다른 번호는 추가 웹캠

# 프레임을 읽어 관절 좌표 추출 및 전처리
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    # 프레임 크기 가져오기
    height, width, _ = frame.shape

    # YOLOv8 Pose 모델을 이용해 관절 좌표 추출
    results = yolo_model(frame)
    keypoints = results[0].keypoints.xy.cpu().numpy() if results and results[0].keypoints else []

    # 좌표를 하나의 벡터로 변환 (예: [x1, y1, x2, y2, ..., x17, y17])
    keypoints_flat = []
    facing_forward = True  # 기본값을 정면(True)으로 설정
    if len(keypoints) > 0:
        for kp in keypoints[0]:
            x = kp[0] / width  # x 좌표 정규화
            y = kp[1] / height  # y 좌표 정규화
            keypoints_flat.extend([x, y])
            if x == 0 and y == 0:  # [0,0] 좌표가 있으면 다른 방향으로 간주
                facing_forward = False

        # 관절 표시가 켜져 있는 경우 프레임에 표시
        if show_keypoints:
            for kp in keypoints[0]:
                cv2.circle(frame, (int(kp[0]), int(kp[1])), 3, (0, 0, 255), -1)

    # 시퀀스에 추가 (고정된 길이 유지)
    if keypoints_flat:
        sequence.append(keypoints_flat)

    # 시퀀스 길이가 고정된 길이에 도달하면 예측 수행
    if len(sequence) == fixed_time_steps:
        input_sequence = np.array(sequence).reshape(1, fixed_time_steps, 34)  # (1, 30, 34) 형태로 입력

        # 예측
        prediction = lstm_model.predict(input_sequence)
        predicted_class = np.argmax(prediction, axis=1)[0]

        # 예측 결과 표시
        class_name = label_encoder.inverse_transform([predicted_class])[0]  # 예측 클래스명 디코딩

        # `running` 또는 `walking`일 때만 방향 정보 표시
        if class_name in ["running", "walking"]:
            direction_text = "Facing Forward" if facing_forward else "Not Facing Forward"
            cv2.putText(frame, f"Predicted: {class_name}, {direction_text}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        else:
            cv2.putText(frame, f"Predicted: {class_name}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

    # 프레임 표시
    cv2.imshow("Webcam Prediction", frame)

    # 'q' 키를 누르면 종료
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# 웹캠 및 창 닫기
cap.release()
cv2.destroyAllWindows()


필터링 적용 - deque에 5개 예측 저장해서 보정

검출 - class결과 최근 5개 예측 저장해서 보정
방향 - 최근 5개 방향을 저장하여 보정

In [None]:
import cv2
import numpy as np
from collections import deque, Counter
from ultralytics import YOLO
import tensorflow as tf
from sklearn.preprocessing import LabelEncoder

# YOLOv8 Pose 모델 로드
yolo_model = YOLO('/home/heechun/final_ws/PoseEstimate/yolov8n-pose.pt')

# 학습된 LSTM 모델 로드
lstm_model = tf.keras.models.load_model('/home/heechun/final_ws/PoseEstimate/1106_upgrade_model/no_lying/best_model.keras')

# LabelEncoder 객체 생성 및 클래스에 맞게 fit
label_encoder = LabelEncoder()
label_encoder.fit(["standing", "running", "walking", "sitting"])

# 고정된 시퀀스 길이 설정 (예: 30 프레임)
fixed_time_steps = 15
sequence = deque(maxlen=fixed_time_steps)  # 고정된 길이의 deque로 시퀀스 관리

# 예측 결과의 일관성을 위한 예측 결과 저장 deque
prediction_history = deque(maxlen=10)  # 최근 5개 예측을 저장하여 보정
direction_history = deque(maxlen=10)  # 최근 5개 방향을 저장하여 보정

# 관절 표시 여부를 결정하는 변수 (True: 표시, False: 비표시)
show_keypoints = True

# 웹캠 열기
cap = cv2.VideoCapture(0)  # 0은 기본 웹캠, 다른 번호는 추가 웹캠

# 프레임을 읽어 관절 좌표 추출 및 전처리
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    # 프레임 크기 가져오기
    height, width, _ = frame.shape

    # YOLOv8 Pose 모델을 이용해 관절 좌표 추출
    results = yolo_model(frame)
    keypoints = results[0].keypoints.xy.cpu().numpy() if results and results[0].keypoints else []

    # 좌표를 하나의 벡터로 변환 (예: [x1, y1, x2, y2, ..., x17, y17])
    keypoints_flat = []
    facing_forward = True  # 기본값을 정면(True)으로 설정
    if len(keypoints) > 0:
        for kp in keypoints[0]:
            x = kp[0] / width  # x 좌표 정규화
            y = kp[1] / height  # y 좌표 정규화
            keypoints_flat.extend([x, y])
            if x == 0 and y == 0:  # [0,0] 좌표가 있으면 다른 방향으로 간주
                facing_forward = False

        # 관절 표시가 켜져 있는 경우 프레임에 표시
        if show_keypoints:
            for kp in keypoints[0]:
                cv2.circle(frame, (int(kp[0]), int(kp[1])), 3, (0, 0, 255), -1)

    # 시퀀스에 추가 (고정된 길이 유지)
    if keypoints_flat:
        sequence.append(keypoints_flat)

    # 시퀀스 길이가 고정된 길이에 도달하면 예측 수행
    if len(sequence) == fixed_time_steps:
        input_sequence = np.array(sequence).reshape(1, fixed_time_steps, 34)  # (1, 15, 34) 형태로 입력

        # 예측
        prediction = lstm_model.predict(input_sequence)
        predicted_class = np.argmax(prediction, axis=1)[0]
        predicted_class_name = label_encoder.inverse_transform([predicted_class])[0]

        # 예측 결과를 deque에 추가
        prediction_history.append(predicted_class_name)

        # 예측 결과의 모드(최빈값)를 사용하여 결과 필터링
        final_class_name = Counter(prediction_history).most_common(1)[0][0]

        # 방향 정보도 보정하여 사용
        direction_history.append(facing_forward)
        final_direction = Counter(direction_history).most_common(1)[0][0]

        # `running` 또는 `walking`일 때만 방향 정보 표시
        if final_class_name in ["running", "walking"]:
            direction_text = "Facing Forward" if final_direction else "Not Facing Forward"
            cv2.putText(frame, f"Predicted: {final_class_name}, {direction_text}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        else:
            cv2.putText(frame, f"Predicted: {final_class_name}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

    # 프레임 표시
    cv2.imshow("Webcam Prediction", frame)

    # 'q' 키를 누르면 종료
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# 웹캠 및 창 닫기
cap.release()
cv2.destroyAllWindows()
