In [243]:
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.models import load_model
from stgcn_model import STGCN
import scipy.signal

keypoints = [
    "Point_0", "Point_7", "Point_8", "Point_11", "Point_12", "Point_13",
    "Point_14", "Point_15", "Point_16", "Point_17", "Point_18", "Point_21",
    "Point_22","Point_23", "Point_24", "Point_25", "Point_26", "Point_27",
    "Point_28", "Point_29", "Point_30"
]

def load_json_skeleton(file_path):
    with open(file_path, "r", encoding="utf-8") as f:
        data = json.load(f)

    num_frames = len(data["frames"])
    num_joints = len(keypoints)
    num_features = 2  # (x, y)
    num_views = 1

    # ✅ (1, 프레임, 뷰, 관절, 좌표) 형태로 데이터 배열 생성
    X_data = np.zeros((1, num_frames, num_views, num_joints, num_features), dtype=np.float32)

    views = ["view3"]

    # ✅ JSON 데이터 -> 배열 변환
    for frame_idx, frame in enumerate(data["frames"]):
        for view_idx, view in enumerate(views):
            pts = frame.get(view, {}).get("pts", {})
            for joint_idx, joint_name in enumerate(keypoints):
                if joint_name in pts:
                    X_data[0, frame_idx, view_idx, joint_idx, 0] = pts[joint_name]["x"]
                    X_data[0, frame_idx, view_idx, joint_idx, 1] = pts[joint_name]["y"]

    return X_data
        
class PushUpPostureAnalyzer:
    def __init__(self, model):
        """
        ST-GCN 모델을 활용한 푸쉬업 자세 분석기.
        """
        self.model = model
        self.joint_indices = {
            "head": keypoints.index("Point_0"),
            "upper_back": keypoints.index("Point_11"),
            "lower_back": keypoints.index("Point_23"),
            "shoulder": keypoints.index("Point_11"),
            "elbow": keypoints.index("Point_13"),
            "wrist": keypoints.index("Point_15"),
            "left_wrist": keypoints.index("Point_15"),
            "right_wrist": keypoints.index("Point_16"),
            "left_elbow": keypoints.index("Point_13"),
            "right_elbow": keypoints.index("Point_14"),
            "left_hip": keypoints.index("Point_23"),
            "right_hip": keypoints.index("Point_24"),
            "left_knee": keypoints.index("Point_25"),
            "right_knee": keypoints.index("Point_26"),
            "left_ankle": keypoints.index("Point_27"),
            "right_ankle": keypoints.index("Point_28"),
            "chest": keypoints.index("Point_11")  # 가슴 (왼쪽 어깨)
        }

    
    def detect_faulty_posture(self, skeleton_sequence):
        """푸쉬업 동작을 분석하고 잘못된 자세를 감지합니다."""
        predictions = self.model.predict(skeleton_sequence)
        predicted_label = np.argmax(predictions, axis=-1)[0]
        confidence = predictions[0][predicted_label]
        
        # ✅ 결과 저장
        if predicted_label == 0:
            print(f"✅ 올바른 자세 ({confidence * 100:.2f}% 확신)")
        else:
            print(f"❌ 잘못된 자세 감지 ({confidence * 100:.2f}% 확신)")
                
        faults = {}
        
        # ✅ 2. 뷰 차원이 1이면 squeeze() 적용
        if skeleton_sequence.shape[2] == 1:
            skeleton_sequence = np.squeeze(skeleton_sequence, axis=2)  # (batch, frames, joints, features)
            
        if predicted_label == 1:  # 잘못된 자세로 분류된 경우
            faults["척추"] = self.check_neutral_spine(skeleton_sequence)
            # faults["팔꿈치"] = self.check_elbow_angle(skeleton_sequence)
            faults["가슴"] = self.check_chest_movement(skeleton_sequence)
            faults["손 위치"] = self.check_hand_position(skeleton_sequence)
            faults["머리 정렬"] = self.check_head_alignment(skeleton_sequence)
        
        return {k: v for k, v in faults.items() if v is not None}
    
    def check_neutral_spine(self, skeleton_sequence):
        spine_joints = [self.joint_indices['upper_back'], self.joint_indices['lower_back']]
    
        upper_back = skeleton_sequence[..., spine_joints[0], :]
        lower_back = skeleton_sequence[..., spine_joints[1], :]
    
        spine_vector = upper_back - lower_back
        spine_angle = np.arctan2(spine_vector[..., 1], spine_vector[..., 0]) * (180 / np.pi)
    
        avg_spine_angle = np.mean(spine_angle)
        # std_spine_angle = np.std(spine_angle)
    
        # print(f"평균 척추 각도: {avg_spine_angle:.2f}, 표준 편차: {std_spine_angle:.2f}")
    
        # ✅ 허용 범위 확대 (±20 → ±25), 표준 편차 기준 완화 (7 → 10)
        threshold_angle = 15  # 허용되는 최대 각도 차이
        # threshold_std = 10  # 허용되는 표준 편차
        min_faulty_frames_ratio = 0.3  # 최소 30% 프레임 이상 벗어나야 경고
    
        # ✅ 몇 개의 프레임이 기준을 벗어났는지 계산
        faulty_frames = np.sum(spine_angle > threshold_angle)
        faulty_ratio = faulty_frames / 16
    
        # print(f"기준 초과 프레임 비율: {faulty_ratio:.2f}")
    
        # ✅ 전체 프레임 중 30% 이상이 기준을 벗어난 경우에만 경고
        if faulty_ratio > min_faulty_frames_ratio and avg_spine_angle > threshold_angle:
            return "척추가 중립적이지 않습니다. 허리를 곧게 펴세요."
    
        return None

    # def check_elbow_angle(self, skeleton_sequence):
    #     """팔꿈치가 최저점에서 90도를 이루는지 확인합니다."""
    #     elbow_joints = [self.joint_indices['shoulder'], self.joint_indices['elbow'], self.joint_indices['wrist']]
    #     elbow_angles = self.calculate_joint_angle(skeleton_sequence, elbow_joints)
        
    #     if np.min(elbow_angles) > 100:
    #         return "팔꿈치가 충분히 구부러지지 않았습니다. 90도까지 구부리세요."
    #     return None

    def check_chest_movement(self, skeleton_sequence):
        """푸쉬업 중 가슴이 충분히 아래로 내려가는지 확인합니다."""
        chest_index = self.joint_indices['chest']

        # ✅ 1. 전체 프레임 수 계산
        num_frames = skeleton_sequence.shape[1]
        # start_frame = int(num_frames * 0.2)  # 10% 지점 (푸쉬업 시작 구간 제외)
        # end_frame = int(num_frames * 0.8)  # 90% 지점 (푸쉬업 끝 구간 제외)
    
        # ✅ 3. 푸쉬업 동작 중 가슴 높이(Y좌표)만 가져오기
        chest_positions = skeleton_sequence[:, :, chest_index, 1]  # Y좌표(높이) 추출
    
        # ✅ 4. 데이터 스무딩 적용 (Moving Average)
        chest_positions_smoothed = scipy.signal.savgol_filter(chest_positions, window_length=5, polyorder=2, axis=1)
    
        # ✅ 5. 이상치 제거: 하위 10% 백분위수를 `min_height`로 사용
        min_height = np.percentile(chest_positions_smoothed, 10)  
        max_height = np.percentile(chest_positions_smoothed, 90)  # 최대 높이
        # median_height = np.median(chest_positions_smoothed)  # 중앙값 (수정된 비교 기준)
        movement_range = max_height - min_height  # 가슴이 이동한 거리
    
        # ✅ 6. 기준값을 조정 (기존 15% → 12%)
        threshold = np.median(chest_positions_smoothed) * 0.04 
    
        # print(f"가슴 높이 변화: {movement_range:.3f}, 허용 기준: {threshold:.3f}")
    
        # ✅ 9. 최소 65%의 프레임이 기준을 넘으면 정상으로 판단
        if movement_range < threshold:
            return "가슴이 충분히 내려가지 않았습니다. 몸을 더 낮추세요."
    
        return None

    def check_hand_position(self, skeleton_sequence):
        """손의 위치가 가슴과 일직선상에 있는지 확인합니다."""
        
        wrist_indices = self.joint_indices['left_wrist']
        chest_index = self.joint_indices['chest']
        
        # ✅ 손목과 가슴의 X좌표 가져오기
        hand_positions = skeleton_sequence[:, :, wrist_indices, 0]  # (batch, frames)
        chest_position = skeleton_sequence[:, :, chest_index, 0]  # (batch, frames)
        
        # ✅ 각 프레임별 손-가슴 정렬 차이 계산 (평균값을 먼저 내지 않음)
        hand_misalignment_per_frame = np.abs(hand_positions - chest_position)
        
        # ✅ 모든 프레임에서 평균 오차 계산
        avg_hand_misalignment = np.mean(hand_misalignment_per_frame)
        
        if avg_hand_misalignment > 0.03:
            return "손이 가슴과 정렬되지 않았습니다. 손의 위치를 조정하세요."
    
        return None
        
    def check_head_alignment(self, skeleton_sequence):
        """머리가 바르게 정렬되어 있는지 확인합니다."""
        head_index = self.joint_indices['head']
        neck_index = self.joint_indices['upper_back']
    
        # ✅ Head와 Neck 위치 비교 (Y축 및 X축 차이 계산)
        head_y_movement = np.abs(skeleton_sequence[:, :, head_index, 1] - skeleton_sequence[:, :, neck_index, 1])
    
        # ✅ 머리 움직임 스무딩 적용 (노이즈 제거)
        head_y_movement_smoothed = scipy.signal.savgol_filter(head_y_movement, window_length=5, polyorder=2, axis=1)

        # ✅ 머리가 너무 위아래로 흔들린 경우 감지 (기준: Y축 차이가 0.1 초과)
        head_y_misalignment_ratio = np.sum(head_y_movement_smoothed > 0.1) / head_y_movement.shape[1]
    
        # print(f"머리 전방 기울기 비율: {head_forward_ratio:.2f}, 머리 상하 움직임 비율: {head_y_misalignment_ratio:.2f}")
    
        # ✅ 60% 이상의 프레임에서 머리 정렬이 틀어졌다면 오류 발생
        if head_y_misalignment_ratio > 0.6:
            return "머리 위치가 올바르지 않습니다. 머리를 중립적으로 유지하세요."
    
        return None
    
    def provide_feedback(self, skeleton_sequence):
        """감지된 자세 오류를 기반으로 실시간 피드백을 제공합니다."""
        faults = self.detect_faulty_posture(skeleton_sequence)
        
        if not faults:
            return "측정 불가"
        
        feedback = "다음 사항을 수정하세요: "
        for key, message in faults.items():
            feedback += f"\n- {message}"
        
        return feedback

In [244]:
# num_joints = 21  # 사용 중인 관절 개수
# num_features = 2  # (x, y)
# num_classes = 2  # (올바른 자세 / 잘못된 자세)
# adjacency_matrix_norm = np.load("adjacency_matrix.npy")

# model = STGCN(num_joints, num_features, adjacency_matrix_norm, num_classes)

# dummy_input = np.random.rand(1, 10, num_joints, num_features).astype(np.float32)
# model(dummy_input)

# model.load_weights("D:/Studying/gradu/stgcn_model6.weights.h5")


In [245]:
analyzer = PushUpPostureAnalyzer(model)
count = 561
file_path = "D:/Studying/gradu/013.피트니스자세/2.Validation/검증데이터/body_v-3-561.json"
skeleton_sequence = load_json_skeleton(file_path)
feedback = analyzer.provide_feedback(skeleton_sequence)
print(feedback)

file_paths = [
    "D:/Studying/gradu/013.피트니스자세/2.Validation/검증데이터/16/body_v-1-561.json",
    "D:/Studying/gradu/013.피트니스자세/2.Validation/검증데이터/16/body_v-2-561.json",
    "D:/Studying/gradu/013.피트니스자세/2.Validation/검증데이터/16/body_v-3-561.json",
    "D:/Studying/gradu/013.피트니스자세/2.Validation/검증데이터/16/body_v-4-561.json",
    "D:/Studying/gradu/013.피트니스자세/2.Validation/검증데이터/16/body_v-5-561.json",
    "D:/Studying/gradu/013.피트니스자세/2.Validation/검증데이터/16/body_v-6-561.json",
    "D:/Studying/gradu/013.피트니스자세/2.Validation/검증데이터/16/body_v-7-561.json",
    "D:/Studying/gradu/013.피트니스자세/2.Validation/검증데이터/16/body_v-1-562.json",
    "D:/Studying/gradu/013.피트니스자세/2.Validation/검증데이터/16/body_v-1-563.json",
    "D:/Studying/gradu/013.피트니스자세/2.Validation/검증데이터/16/body_v-1-564.json",
    "D:/Studying/gradu/013.피트니스자세/2.Validation/검증데이터/16/body_v-1-565.json",
    "D:/Studying/gradu/013.피트니스자세/2.Validation/검증데이터/16/body_v-1-566.json",
    "D:/Studying/gradu/013.피트니스자세/2.Validation/검증데이터/16/body_v-1-567.json",
    "D:/Studying/gradu/013.피트니스자세/2.Validation/검증데이터/16/body_v-1-568.json",
    "D:/Studying/gradu/013.피트니스자세/2.Validation/검증데이터/16/body_v-1-569.json",
    "D:/Studying/gradu/013.피트니스자세/2.Validation/검증데이터/16/body_v-1-570.json",
]

for file_path in file_paths:
    print(file_path)
    skeleton_sequence = load_json_skeleton(file_path)
    feedback = analyzer.provide_feedback(skeleton_sequence)
    print(feedback)
    count += 1
    file_path = file_path.replace(str(count - 1), str(count))
    print()
    print()

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 44ms/step
✅ 올바른 자세 (71.16% 확신)
측정 불가
D:/Studying/gradu/013.피트니스자세/2.Validation/검증데이터/16/body_v-1-561.json
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 31ms/step
✅ 올바른 자세 (66.93% 확신)
측정 불가


D:/Studying/gradu/013.피트니스자세/2.Validation/검증데이터/16/body_v-2-561.json
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 31ms/step
✅ 올바른 자세 (61.21% 확신)
측정 불가


D:/Studying/gradu/013.피트니스자세/2.Validation/검증데이터/16/body_v-3-561.json
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 31ms/step
✅ 올바른 자세 (69.01% 확신)
측정 불가


D:/Studying/gradu/013.피트니스자세/2.Validation/검증데이터/16/body_v-4-561.json
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 31ms/step
✅ 올바른 자세 (69.68% 확신)
측정 불가


D:/Studying/gradu/013.피트니스자세/2.Validation/검증데이터/16/body_v-5-561.json
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 37ms/step
✅ 올바른 자세 (55.63% 확신)
측정 불가


D:/Studying/gradu/013.피트니스자세/2.Validation/검증데이터/16/

In [246]:
# def predict_multiple_json_skeleton(file_paths):
#     results = {}

#     for file_path in file_paths:
#         try:
#             # ✅ JSON 데이터 로드
#             X_data = load_json_skeleton(file_path)
            
#             # ✅ 모델 예측
#             prediction = model.predict(X_data)
            
#             # ✅ 예측 결과 처리
#             predicted_class = np.argmax(prediction, axis=-1)[0]
#             confidence = prediction[0][predicted_class]
            

#             # ✅ 결과 저장
#             if predicted_class == 0:
#                 results[file_path] = f"✅ 올바른 자세 ({confidence * 100:.2f}% 확신)"
#             else:
#                 results[file_path] = f"❌ 잘못된 자세 감지 ({confidence * 100:.2f}% 확신)"

#         except Exception as e:
#             results[file_path] = f"❌ 예측 실패 (오류: {e})"

#     return results



# # ✅ 여러 개의 JSON 파일 리스트
# file_paths = [
#     "D:/Studying/gradu/013.피트니스자세/2.Validation/검증데이터/16/body_v-1-561.json",
#     "D:/Studying/gradu/013.피트니스자세/2.Validation/검증데이터/16/body_v-2-561.json",
#     "D:/Studying/gradu/013.피트니스자세/2.Validation/검증데이터/16/body_v-3-561.json",
#     "D:/Studying/gradu/013.피트니스자세/2.Validation/검증데이터/16/body_v-4-561.json",
#     "D:/Studying/gradu/013.피트니스자세/2.Validation/검증데이터/16/body_v-5-561.json",
#     "D:/Studying/gradu/013.피트니스자세/2.Validation/검증데이터/16/body_v-6-561.json",
#     "D:/Studying/gradu/013.피트니스자세/2.Validation/검증데이터/16/body_v-7-561.json",
#     "D:/Studying/gradu/013.피트니스자세/2.Validation/검증데이터/16/body_v-1-562.json",
#     "D:/Studying/gradu/013.피트니스자세/2.Validation/검증데이터/16/body_v-1-563.json",
#     "D:/Studying/gradu/013.피트니스자세/2.Validation/검증데이터/16/body_v-1-564.json",
#     "D:/Studying/gradu/013.피트니스자세/2.Validation/검증데이터/16/body_v-1-565.json",
#     "D:/Studying/gradu/013.피트니스자세/2.Validation/검증데이터/16/body_v-1-566.json",
#     "D:/Studying/gradu/013.피트니스자세/2.Validation/검증데이터/16/body_v-1-567.json",
#     "D:/Studying/gradu/013.피트니스자세/2.Validation/검증데이터/16/body_v-1-568.json",
# ]

# # ✅ 예측 결과 얻기
# prediction_results = predict_multiple_json_skeleton(file_paths)

# # ✅ 결과 출력
# for file, result in prediction_results.items():
#     print(f"{file}: {result}")