# Model Error - Phân loại lỗi trong golf swing sử dụng MLP
Mô hình sử dụng keypoint detection (MediaPipe) để trích xuất đặc trưng chuyển động (Joint Angles, Displacements, Velocities), mean pooling theo phase, và MLP để phân loại lỗi multi-label.

In [None]:
import cv2
import numpy as np
import json
import os
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.optimizers import Adam
import mediapipe as mp

# Dataset locations. Everything is derived from DATA_PATH: the videos
# directory, the annotations directory, and the possible_errors.json file.
DATA_PATH = r'C:\Users\USER\Desktop\golf_swing\golf_dataset'
VIDEO_PATH = os.path.join(DATA_PATH, 'videos')
ANNOTATION_PATH = os.path.join(DATA_PATH, 'annotations')
POSSIBLE_ERRORS_PATH = os.path.join(DATA_PATH, 'possible_errors.json')

# Module-level MediaPipe Pose instance; extract_keypoints() calls
# pose.process() on it for every frame.
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(static_image_mode=False, min_detection_confidence=0.5)

# Report how many frames a video file declares
def get_total_frames(video_path):
    """Return the frame count reported by OpenCV for ``video_path``.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.

    Returns:
        ``CAP_PROP_FRAME_COUNT`` converted to ``int``, or 0 (after printing
        an error message) when the capture cannot be opened.
    """
    capture = cv2.VideoCapture(video_path)
    if capture.isOpened():
        frame_total = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
        capture.release()
        return frame_total

    print(f'Error: Cannot open video file {video_path}')
    return 0

# Extract pose keypoints from a single frame
def extract_keypoints(frame):
    """Return the flattened pose landmarks detected in ``frame``.

    The frame is converted from BGR to RGB and passed to the module-level
    ``pose`` object.

    Args:
        frame: Image array accepted by ``cv2.cvtColor`` with
            ``COLOR_BGR2RGB``.

    Returns:
        1-D array of ``x, y, z`` values for each detected landmark, in
        landmark order. A zero vector of length ``33 * 3`` is returned when
        no landmarks are detected or the landmark list is empty.
    """
    detection = pose.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    landmarks = detection.pose_landmarks
    if not landmarks:
        return np.zeros(33 * 3)

    coords = np.array([[point.x, point.y, point.z] for point in landmarks.landmark]).flatten()
    if coords.size:
        return coords
    return np.zeros(33 * 3)  # 33 keypoints, 3 coords each

# Compute three joint angles from a flattened pose vector
def calculate_joint_angles(keypoints):
    """Return three joint angles, in degrees, for one frame of keypoints.

    The angles are measured at the left elbow, right elbow and left knee,
    using the MediaPipe Pose landmark numbering (11/12 shoulders, 13/14
    elbows, 15/16 wrists, 23 left hip, 25 left knee, 27 left ankle). The
    result is deterministic: the same keypoints always give the same angles.

    Args:
        keypoints: Array-like holding at least ``33 * 3`` values laid out as
            ``x, y, z`` per landmark, in landmark order.

    Returns:
        Array of shape ``(3,)`` with each angle in ``[0, 180]``. An angle is
        0 when either limb segment has zero length (for example the all-zero
        vector used when no pose was detected). All three are 0 when fewer
        than ``33 * 3`` values are supplied.
    """
    keypoints = np.asarray(keypoints, dtype=float)
    if keypoints.size < 33 * 3:
        return np.zeros(3)  # not enough landmarks to measure anything

    points = keypoints.ravel()[:33 * 3].reshape(33, 3)

    # (neighbour, joint, neighbour) landmark indices; the angle is taken at
    # the middle landmark.
    joint_triplets = (
        (11, 13, 15),  # left elbow: shoulder - elbow - wrist
        (12, 14, 16),  # right elbow: shoulder - elbow - wrist
        (23, 25, 27),  # left knee: hip - knee - ankle
    )

    angles = np.zeros(len(joint_triplets))
    for slot, (start, joint, end) in enumerate(joint_triplets):
        first = points[start] - points[joint]
        second = points[end] - points[joint]
        scale = np.linalg.norm(first) * np.linalg.norm(second)
        if scale > 0:
            # Clip guards arccos against rounding slightly outside [-1, 1].
            cosine = np.clip(np.dot(first, second) / scale, -1.0, 1.0)
            angles[slot] = np.degrees(np.arccos(cosine))
    return angles

# Build frame-to-frame motion features from a keypoint sequence
def calculate_motion_features(keypoints_seq):
    """Return one motion-feature row per consecutive pair of frames.

    Each row is the concatenation of the displacement between the two
    frames, the velocity (identical to the displacement, as the time step is
    taken to be one unit), and the joint angles of the later frame.

    Args:
        keypoints_seq: Sequence of per-frame keypoint arrays.

    Returns:
        Array with ``len(keypoints_seq) - 1`` rows; an empty array when the
        sequence has fewer than two frames.
    """
    rows = []
    for previous, current in zip(keypoints_seq, keypoints_seq[1:]):
        delta = current - previous
        # The velocity slot reuses the displacement (unit time step).
        angles = calculate_joint_angles(current)
        rows.append(np.concatenate([delta, delta, angles]))
    return np.array(rows)

# Extract frames, motion features, and per-frame error labels from one video
def extract_frames_and_error_labels(video_file, annotation_file, phase_classifier):
    """Read one video and build its frames, motion features and error labels.

    Every frame is resized to 128x128, passed through ``extract_keypoints``
    and classified into a phase by ``phase_classifier``. A frame receives the
    errors annotated for its predicted phase when the frame index lies in
    that phase's ``start_frame``..``end_frame`` range (inclusive).

    Args:
        video_file: Video file name; it is joined onto ``VIDEO_PATH``.
        annotation_file: Path to a UTF-8 JSON file mapping a phase name to an
            entry with ``start_frame``, ``end_frame`` and ``errors``.
        phase_classifier: Object whose ``predict`` is called with an array of
            shape ``(1, 1, 128 * 128 * 3)``. The argmax of its output is used
            as an index into the keys of ``possible_errors``.

    Returns:
        Tuple ``(frames, motion_features, labels)``. ``frames`` and ``labels``
        have one row per frame read; ``motion_features`` comes from
        ``calculate_motion_features`` and therefore has one row fewer. Three
        empty arrays are returned when the video cannot be opened or the
        annotation file is missing or not valid JSON.
    """
    video_path = os.path.join(VIDEO_PATH, video_file)
    cap = cv2.VideoCapture(video_path)

    # try/finally so the capture is released on every exit path, including
    # the early returns for a bad annotation file.
    try:
        if not cap.isOpened():
            print(f'Error: Cannot open video file {video_path}')
            return np.array([]), np.array([]), np.array([])

        try:
            # Explicit UTF-8 so non-ASCII error names load the same way on
            # every platform.
            with open(annotation_file, 'r', encoding='utf-8') as f:
                annotations = json.load(f)
        except FileNotFoundError:
            print(f'Error: Annotation file {annotation_file} not found')
            return np.array([]), np.array([]), np.array([])
        except json.JSONDecodeError:
            print(f'Error: Invalid JSON format in {annotation_file}')
            return np.array([]), np.array([]), np.array([])

        # Class index -> phase name; built once instead of once per frame.
        phase_names = list(possible_errors)
        total_frames = get_total_frames(video_path)

        frames = []
        keypoints_seq = []
        labels = []
        frame_count = 0
        labeled_frames = 0

        while cap.isOpened() and frame_count < total_frames:
            ret, frame = cap.read()
            if not ret:
                break

            # One resize serves both the stored frame and the classifier input.
            small = cv2.resize(frame, (128, 128))
            frames.append(small)
            keypoints_seq.append(extract_keypoints(frame))

            # Scale to [0, 1] and flatten to the (1, 1, features) layout the
            # phase classifier is fed with.
            frame_input = (small / 255.0).reshape(1, 1, 128 * 128 * 3)
            phase_pred = phase_classifier.predict(frame_input, verbose=0)
            phase = phase_names[np.argmax(phase_pred)]

            label = np.zeros(num_errors)
            # Only the annotation entry of the predicted phase can match.
            info = annotations.get(phase)
            if info is not None:
                try:
                    if info['start_frame'] <= frame_count <= info['end_frame']:
                        for error in info['errors']:
                            if error in error_to_index:
                                label[error_to_index[error]] = 1
                        labeled_frames += 1
                except KeyError:
                    print(f'Error: Invalid phase data in {annotation_file} for phase {phase}')
            labels.append(label)
            frame_count += 1
    finally:
        cap.release()

    print(f'Info: {labeled_frames}/{total_frames} frames labeled in {video_file}')
    if labeled_frames == 0:
        print(f'Error: No frames labeled in {video_file}')

    motion_features = calculate_motion_features(keypoints_seq)
    return np.array(frames), motion_features, np.array(labels)

# Mean-pool motion features per swing phase
def mean_pooling_by_phase(motion_features, phase_labels, total_frames):
    """Average the motion-feature rows that belong to each of four phases.

    Row ``i`` of ``motion_features`` describes the step into frame ``i + 1``,
    so it is grouped by the phase of frame ``i + 1``.

    Args:
        motion_features: 2-D array with one row per frame transition, or an
            empty array.
        phase_labels: Per-frame phase labels, either integer phase ids (1-D)
            or one row of scores / one-hot values per frame (2-D, reduced
            with argmax). Frames beyond the end of ``phase_labels`` count as
            phase 0; ids outside ``0..3`` are skipped.
        total_frames: Number of frames; at most ``total_frames - 1`` rows of
            ``motion_features`` are used.

    Returns:
        Array of shape ``(4, feature_size)`` holding the mean row of each
        phase, with a zero row for phases that received no frames.
        ``feature_size`` is the column count of ``motion_features``, or
        ``2 * 33 * 3 + 3`` when ``motion_features`` is empty.
    """
    num_phases = 4
    motion_features = np.asarray(motion_features)
    buckets = [[] for _ in range(num_phases)]

    # motion_features has no row for the first frame, hence total_frames - 1.
    usable_rows = min(total_frames - 1, len(motion_features))
    for i in range(usable_rows):
        if i + 1 < len(phase_labels):
            entry = phase_labels[i + 1]
            # A score / one-hot row is reduced with argmax; a scalar label
            # already is the phase id (argmax of a scalar would always be 0).
            phase = int(np.argmax(entry)) if np.ndim(entry) else int(entry)
        else:
            phase = 0
        if 0 <= phase < num_phases:
            buckets[phase].append(motion_features[i])

    # The fallback width matches what calculate_motion_features emits:
    # displacements + velocities (33 * 3 each) + 3 joint angles.
    feature_size = motion_features.shape[1] if motion_features.size else 2 * 33 * 3 + 3
    pooled_features = [
        np.mean(rows, axis=0) if rows else np.zeros(feature_size)
        for rows in buckets
    ]
    return np.array(pooled_features)

# Load the whole dataset: frames, phase-pooled motion features, error labels
def load_error_dataset(phase_classifier):
    """Load every annotated ``.mp4`` under ``VIDEO_PATH``.

    For each video with a matching ``<name>_error.json`` file in
    ``ANNOTATION_PATH``, frames, motion features and labels are extracted and
    collected. Videos without an annotation file, or that yield no data, are
    skipped with a message.

    Args:
        phase_classifier: Phase model forwarded to
            ``extract_frames_and_error_labels``.

    Returns:
        Tuple ``(frames, pooled_features, labels)``: all frames concatenated,
        the output of ``mean_pooling_by_phase`` over all motion features, and
        all per-frame labels concatenated. Three empty arrays are returned
        when the video directory is missing, holds no ``.mp4`` file, or no
        video produced data.
    """
    if not os.path.exists(VIDEO_PATH):
        print(f'Error: Directory {VIDEO_PATH} does not exist')
        return np.array([]), np.array([]), np.array([])

    # Sorted because os.listdir order is unspecified; a fixed order keeps the
    # dataset (and any seeded split of it) reproducible.
    video_files = sorted(f for f in os.listdir(VIDEO_PATH) if f.endswith('.mp4'))
    if not video_files:
        print(f'Error: No .mp4 files found in {VIDEO_PATH}')
        return np.array([]), np.array([]), np.array([])

    all_frames = []
    all_motion_features = []
    all_labels = []

    for video_file in video_files:
        stem = os.path.splitext(video_file)[0]
        annotation_file = os.path.join(ANNOTATION_PATH, f'{stem}_error.json')
        if not os.path.exists(annotation_file):
            print(f'Error: No annotation file for {video_file}')
            continue

        # Pass the bare file name: the callee joins it onto VIDEO_PATH itself.
        frames, motion_features, labels = extract_frames_and_error_labels(
            video_file, annotation_file, phase_classifier)
        if len(frames) > 0 and len(motion_features) > 0 and len(labels) > 0:
            all_frames.append(frames)
            all_motion_features.append(motion_features)
            all_labels.append(labels)
        else:
            print(f'Warning: No valid frames, motion features, or labels for {video_file}')

    if not all_frames:
        print('Error: No valid data loaded')
        return np.array([]), np.array([]), np.array([])

    labels = np.concatenate(all_labels)
    # NOTE(review): these "phase labels" are the argmax over the *error*
    # label columns, not phase predictions -- confirm this is intended.
    phase_labels = np.argmax(labels, axis=1)
    # NOTE(review): each video contributes one motion row fewer than label
    # rows, so after concatenation the row/label alignment drifts by one per
    # video -- confirm whether pooling should be done per video instead.
    motion = np.concatenate(all_motion_features)
    pooled_features = mean_pooling_by_phase(motion, phase_labels, len(phase_labels))

    return np.concatenate(all_frames), pooled_features, labels

# Load the possible-errors table and build the error-name -> label-index map.
# possible_errors is read as a mapping of phase name -> list of error names.
try:
    # Explicit UTF-8 so non-ASCII error names load the same on every platform.
    with open(POSSIBLE_ERRORS_PATH, 'r', encoding='utf-8') as f:
        possible_errors = json.load(f)
except FileNotFoundError:
    print(f'Error: {POSSIBLE_ERRORS_PATH} not found')
    # Non-zero status so callers can tell the run failed.
    raise SystemExit(1) from None
except json.JSONDecodeError:
    print(f'Error: Invalid JSON format in {POSSIBLE_ERRORS_PATH}')
    raise SystemExit(1) from None

error_to_index = {}
for phase, errors in possible_errors.items():
    for error in errors:
        # An error name listed under several phases keeps its first index, so
        # indices stay dense and always below num_errors.
        error_to_index.setdefault(error, len(error_to_index))
num_errors = len(error_to_index)

def main():
    """Train, save and evaluate the multi-label error classifier.

    Loads ``phase_classifier.h5``, builds the dataset with
    ``load_error_dataset``, trains a small MLP on the pooled features, saves
    it as ``error_classifier.h5`` and prints the test accuracy. Returns early
    with a message when the phase model or usable data is missing.
    """
    # Check for the file explicitly rather than relying on load_model raising
    # FileNotFoundError for a missing path.
    phase_model_path = 'phase_classifier.h5'
    if not os.path.exists(phase_model_path):
        print('Error: phase_classifier.h5 not found. Run model_phase.py first.')
        return
    phase_classifier = load_model(phase_model_path)

    # Prepare the data
    frames, pooled_features, labels = load_error_dataset(phase_classifier)
    if len(frames) == 0 or len(pooled_features) == 0 or len(labels) == 0:
        print('Exiting due to no valid data')
        return

    # NOTE(review): pooled_features has one row per phase while labels has
    # one row per frame; truncating both to the shorter length pairs phase
    # row k with the label of frame k -- confirm this pairing is intended.
    num_samples = min(len(pooled_features), len(labels))
    if num_samples < 2:
        # train_test_split needs at least one sample on each side.
        print('Exiting due to too few samples to split')
        return
    X_train, X_test, y_train, y_test = train_test_split(
        pooled_features[:num_samples], labels[:num_samples],
        test_size=0.2, random_state=42)

    # Build the MLP
    model = Sequential([
        Dense(128, activation='relu', input_shape=(X_train.shape[1],)),
        Dropout(0.5),
        Dense(64, activation='relu'),
        Dropout(0.3),
        Dense(num_errors, activation='sigmoid')  # Multi-label output
    ])

    # The metric is named explicitly so the per-label (multi-label) accuracy
    # does not depend on how the generic 'accuracy' alias is resolved.
    model.compile(optimizer=Adam(learning_rate=0.0001), loss='binary_crossentropy', metrics=['binary_accuracy'])

    # Train the model
    model.fit(X_train, y_train, epochs=15, batch_size=32, validation_data=(X_test, y_test))

    # Save the model
    model.save('error_classifier.h5')

    # Evaluate the model
    loss, accuracy = model.evaluate(X_test, y_test)
    print(f'Test accuracy: {accuracy:.4f}')

if __name__ == '__main__':
    main()

## Lưu ý
- Đảm bảo thư mục dữ liệu tồn tại và chứa thư mục `videos` (các file .mp4), thư mục `annotations` (mỗi video có một file chú thích `<tên_video>_error.json`), và file `possible_errors.json`.
- Cần chạy `model_phase.py` (ModelPhase) trước để tạo `phase_classifier.h5`.
- Kiểm tra lỗi nếu keypoint detection thất bại hoặc dữ liệu không hợp lệ.