In [1]:
import os
import cv2
import numpy as np
import pandas as pd
import mediapipe as mp

def calculate_angle(a, b, c):
    """Return the unsigned 2D angle, in degrees, formed at ``b`` by ``a`` and ``c``.

    Only the first two components (x, y) of each point are used. The result
    is folded into the range [0, 180].

    Args:
        a: First end point, an indexable ``(x, y, ...)`` sequence.
        b: Vertex point at which the angle is measured.
        c: Second end point.

    Returns:
        The angle between the rays ``b -> a`` and ``b -> c`` in degrees.
    """
    first, vertex, last = (np.array(point) for point in (a, b, c))

    # Heading of each ray leaving the vertex, measured from the +x axis.
    heading_to_last = np.arctan2(last[1] - vertex[1], last[0] - vertex[0])
    heading_to_first = np.arctan2(first[1] - vertex[1], first[0] - vertex[0])

    degrees = np.abs((heading_to_last - heading_to_first) * 180.0 / np.pi)

    # The raw difference can reach 360; report the smaller of the two angles.
    return 360 - degrees if degrees > 180.0 else degrees

def _landmark_xy(landmarks, landmark_id):
    """Return ``[x, y]`` for the landmark selected by a ``PoseLandmark`` member."""
    point = landmarks[landmark_id.value]
    return [point.x, point.y]


def _joint_angle(landmarks, first, vertex, last):
    """Return the 2D angle in degrees at ``vertex`` between ``first`` and ``last``."""
    return calculate_angle(
        _landmark_xy(landmarks, first),
        _landmark_xy(landmarks, vertex),
        _landmark_xy(landmarks, last),
    )


def extract_features(video_path, exercise_name):
    """Extract one pose feature vector per frame of a video.

    Every frame is run through MediaPipe Pose; frames in which no pose is
    detected are skipped. Each returned vector has 116 values:

    * 99 joint coordinates (x, y, z for each of the 33 landmarks, flattened),
    * 8 joint angles (right/left knee, hip, elbow, shoulder),
    * torso lean angle, squat depth, centre of mass x and y,
    * 4 left/right angle differences (knee, hip, elbow, shoulder),
    * the frame-to-frame difference of the joint coordinates.

    The previous-frame state used for the frame difference is kept in a local
    variable, so every call starts fresh and the first detected frame always
    has a difference of 0. The ``extract_features.prev_landmarks`` function
    attribute is not read by this function.

    Args:
        video_path: Path of the video file passed to ``cv2.VideoCapture``.
        exercise_name: Exercise identifier; squat depth is only computed when
            this equals ``'squat'`` and is 0 otherwise.

    Returns:
        A list of 1-D numpy arrays, one per frame with a detected pose. The
        list is empty if the video cannot be read or no pose is ever found.
    """
    mp_pose = mp.solutions.pose
    joint = mp_pose.PoseLandmark
    pose = mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5)
    cap = cv2.VideoCapture(video_path)

    features = []
    # (33, 3) coordinates of the last frame in which a pose was detected.
    previous_coordinates = None

    # try/finally so the capture and the pose model are released even if a
    # frame fails to decode or process.
    try:
        while cap.isOpened():
            success, image = cap.read()
            if not success:
                break

            # OpenCV decodes frames as BGR; convert to RGB before pose estimation.
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            results = pose.process(image)
            if not results.pose_landmarks:
                continue

            landmarks = results.pose_landmarks.landmark

            # Normalized joint coordinates (x, y, z) for all landmarks.
            coordinates = np.array([[lm.x, lm.y, lm.z] for lm in landmarks])
            joint_coordinates = coordinates.flatten()

            # Key joint angles, each measured at the middle landmark.
            right_knee_angle = _joint_angle(
                landmarks, joint.RIGHT_HIP, joint.RIGHT_KNEE, joint.RIGHT_ANKLE)
            left_knee_angle = _joint_angle(
                landmarks, joint.LEFT_HIP, joint.LEFT_KNEE, joint.LEFT_ANKLE)
            right_hip_angle = _joint_angle(
                landmarks, joint.RIGHT_SHOULDER, joint.RIGHT_HIP, joint.RIGHT_KNEE)
            left_hip_angle = _joint_angle(
                landmarks, joint.LEFT_SHOULDER, joint.LEFT_HIP, joint.LEFT_KNEE)
            right_elbow_angle = _joint_angle(
                landmarks, joint.RIGHT_SHOULDER, joint.RIGHT_ELBOW, joint.RIGHT_WRIST)
            left_elbow_angle = _joint_angle(
                landmarks, joint.LEFT_SHOULDER, joint.LEFT_ELBOW, joint.LEFT_WRIST)
            right_shoulder_angle = _joint_angle(
                landmarks, joint.RIGHT_ELBOW, joint.RIGHT_SHOULDER, joint.RIGHT_HIP)
            left_shoulder_angle = _joint_angle(
                landmarks, joint.LEFT_ELBOW, joint.LEFT_SHOULDER, joint.LEFT_HIP)

            # Torso lean: angle between the shoulder-midpoint -> hip-midpoint
            # vector and the fixed vector (0, -1).
            # NOTE(review): if y grows downward in these coordinates (as in
            # image space), an upright torso gives a value near 180 rather
            # than 0 - confirm this is the intended convention.
            left_shoulder_xy = np.array(_landmark_xy(landmarks, joint.LEFT_SHOULDER))
            right_shoulder_xy = np.array(_landmark_xy(landmarks, joint.RIGHT_SHOULDER))
            left_hip_xy = np.array(_landmark_xy(landmarks, joint.LEFT_HIP))
            right_hip_xy = np.array(_landmark_xy(landmarks, joint.RIGHT_HIP))
            shoulder_midpoint = (left_shoulder_xy + right_shoulder_xy) / 2
            hip_midpoint = (left_hip_xy + right_hip_xy) / 2
            vertical_vector = np.array([0, -1])
            torso_vector = hip_midpoint - shoulder_midpoint
            # The 1e-6 guards against division by zero for a zero-length
            # torso vector; clip keeps arccos inside its valid domain.
            cosine = np.dot(torso_vector, vertical_vector) / (
                np.linalg.norm(torso_vector) * np.linalg.norm(vertical_vector) + 1e-6
            )
            torso_lean_angle = np.degrees(np.arccos(np.clip(cosine, -1.0, 1.0)))

            # Depth of squat (right hip y minus right knee y), squat only.
            depth_of_squat = 0
            if exercise_name == 'squat':
                depth_of_squat = (
                    landmarks[joint.RIGHT_HIP.value].y
                    - landmarks[joint.RIGHT_KNEE.value].y
                )

            # Centre of mass approximated by the mean of shoulders and hips.
            torso_points = [
                landmarks[joint.LEFT_SHOULDER.value],
                landmarks[joint.RIGHT_SHOULDER.value],
                landmarks[joint.LEFT_HIP.value],
                landmarks[joint.RIGHT_HIP.value],
            ]
            center_of_mass_x = np.mean([point.x for point in torso_points])
            center_of_mass_y = np.mean([point.y for point in torso_points])

            # Left vs right asymmetry for each joint angle.
            knee_angle_diff = abs(right_knee_angle - left_knee_angle)
            hip_angle_diff = abs(right_hip_angle - left_hip_angle)
            elbow_angle_diff = abs(right_elbow_angle - left_elbow_angle)
            shoulder_angle_diff = abs(right_shoulder_angle - left_shoulder_angle)

            # Euclidean norm of the change in all joint coordinates since the
            # previous frame with a detected pose (0 for the first one).
            frame_diff = 0
            if previous_coordinates is not None:
                frame_diff = np.linalg.norm(coordinates - previous_coordinates)
            previous_coordinates = coordinates

            # Joint coordinates first, then the derived scalars.
            feature_vector = np.concatenate([
                joint_coordinates,
                np.array([
                    right_knee_angle, left_knee_angle, right_hip_angle, left_hip_angle,
                    right_elbow_angle, left_elbow_angle, right_shoulder_angle, left_shoulder_angle,
                    torso_lean_angle, depth_of_squat,
                    center_of_mass_x, center_of_mass_y,
                    knee_angle_diff, hip_angle_diff, elbow_angle_diff, shoulder_angle_diff,
                    frame_diff
                ])
            ])

            features.append(feature_vector)
    finally:
        cap.release()
        pose.close()

    return features

# Function attribute reserved for the previous frame's landmarks (used for the
# Frame_Difference feature). Initialised to None here; process_dataset also
# resets it to None before each video.
extract_features.prev_landmarks = None

def save_features(features, exercise_name):
    """Write feature vectors to ``<exercise_name>_features.csv``.

    The file is created in the current working directory, without an index
    column. The header consists of 99 joint-coordinate columns
    (``Joint_1_X`` ... ``Joint_33_Z``) followed by 17 derived-feature columns.

    Args:
        features: Sequence of rows, each holding 116 values in column order.
        exercise_name: Prefix used for the output file name.
    """
    # Three coordinate columns per joint, joints numbered from 1 to 33.
    joint_headers = [
        f'Joint_{number}_{axis}'
        for number in range(1, 34)
        for axis in ('X', 'Y', 'Z')
    ]

    derived_headers = [
        'Right_Knee_Angle', 'Left_Knee_Angle', 'Right_Hip_Angle', 'Left_Hip_Angle',
        'Right_Elbow_Angle', 'Left_Elbow_Angle', 'Right_Shoulder_Angle', 'Left_Shoulder_Angle',
        'Torso_Lean_Angle', 'Depth_of_Squat', 'Center_of_Mass_X', 'Center_of_Mass_Y',
        'Knee_Angle_Diff', 'Hip_Angle_Diff', 'Elbow_Angle_Diff', 'Shoulder_Angle_Diff',
        'Frame_Difference',
    ]

    table = pd.DataFrame(features, columns=joint_headers + derived_headers)
    table.to_csv(f'{exercise_name}_features.csv', index=False)

def process_dataset(base_dir='dataset'):
    """Extract features for every exercise video and write one CSV per exercise.

    Expects the layout ``<base_dir>/<exercise>/<label>/<video>`` with
    exercises ``squat``, ``deadlift``, ``overhead_press`` and labels
    ``correct``, ``incorrect``. Missing label folders are reported and
    skipped. Files whose extension is not .mp4/.mov/.avi/.mkv
    (case-insensitive) are ignored. Videos are processed in sorted file-name
    order so the rows of the output are reproducible.

    For each exercise that yields at least one feature vector, a file
    ``<exercise>_features.csv`` is written to the current working directory
    with the 116 feature columns plus a ``label`` column.

    Args:
        base_dir: Root directory of the dataset.
    """
    exercises = ['squat', 'deadlift', 'overhead_press']
    labels = ['correct', 'incorrect']
    video_extensions = ('.mp4', '.mov', '.avi', '.mkv')

    # The column layout is the same for every exercise, so build it once.
    columns = []
    for i in range(33):
        columns.extend([f'Joint_{i+1}_X', f'Joint_{i+1}_Y', f'Joint_{i+1}_Z'])
    columns.extend([
        'Right_Knee_Angle', 'Left_Knee_Angle', 'Right_Hip_Angle', 'Left_Hip_Angle',
        'Right_Elbow_Angle', 'Left_Elbow_Angle', 'Right_Shoulder_Angle', 'Left_Shoulder_Angle',
        'Torso_Lean_Angle', 'Depth_of_Squat', 'Center_of_Mass_X', 'Center_of_Mass_Y',
        'Knee_Angle_Diff', 'Hip_Angle_Diff', 'Elbow_Angle_Diff', 'Shoulder_Angle_Diff',
        'Frame_Difference'
    ])

    for exercise in exercises:
        all_features = []
        all_labels = []
        for label in labels:
            folder_path = os.path.join(base_dir, exercise, label)
            # isdir rather than exists: a plain file at this path would make
            # os.listdir raise.
            if not os.path.isdir(folder_path):
                print(f"Folder not found: {folder_path}. Skipping...")
                continue

            # os.listdir returns entries in arbitrary order; sort for a
            # reproducible row order in the CSV.
            for video_file in sorted(os.listdir(folder_path)):
                if not video_file.lower().endswith(video_extensions):
                    continue
                video_path = os.path.join(folder_path, video_file)
                print(f"Processing {video_path} ...")
                # Clear any previous-frame state stored on the function
                # attribute so Frame_Difference never spans two videos.
                extract_features.prev_landmarks = None
                feats = extract_features(video_path, exercise)
                all_features.extend(feats)
                # One label per extracted frame keeps rows and labels aligned.
                all_labels.extend([label] * len(feats))

        # Nothing extracted for this exercise: write no file.
        if not all_features:
            continue

        df = pd.DataFrame(all_features, columns=columns)
        df['label'] = all_labels
        csv_path = f'{exercise}_features.csv'
        df.to_csv(csv_path, index=False)
        print(f"Saved features and labels for {exercise} to {csv_path}")

# Script entry point: run the extraction over the default 'dataset' directory.
if __name__ == '__main__':
    process_dataset()


Processing dataset\squat\correct\1.mp4 ...
Processing dataset\squat\correct\10.mp4 ...
Processing dataset\squat\correct\12.mp4 ...
Processing dataset\squat\correct\17.mp4 ...
Processing dataset\squat\correct\2.mp4 ...
Processing dataset\squat\correct\20.mp4 ...
Processing dataset\squat\correct\21.mp4 ...
Processing dataset\squat\correct\23.mp4 ...
Processing dataset\squat\correct\26.mp4 ...
Processing dataset\squat\correct\28.mp4 ...
Processing dataset\squat\correct\3.mp4 ...
Processing dataset\squat\correct\32.mp4 ...
Processing dataset\squat\correct\34.mp4 ...
Processing dataset\squat\correct\36.mp4 ...
Processing dataset\squat\correct\38.mp4 ...
Processing dataset\squat\correct\4.mp4 ...
Processing dataset\squat\correct\41.mp4 ...
Processing dataset\squat\correct\42.mp4 ...
Processing dataset\squat\correct\44.mp4 ...
Processing dataset\squat\correct\8.mp4 ...
Processing dataset\squat\correct\IMG_6091.MOV ...
Processing dataset\squat\correct\IMG_6092.MOV ...
Processing dataset\squat\