## Import Modules

In [None]:
import cv2
import mediapipe as mp
import csv
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
from tqdm import tqdm
import plotly.graph_objects as go
import imageio
import pickle

import tensorflow as tf
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import ModelCheckpoint
import keras_tuner as kt

from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, r2_score, accuracy_score, confusion_matrix, ConfusionMatrixDisplay
from sklearn.ensemble import VotingClassifier
from sklearn.utils.class_weight import compute_class_weight
from sklearn.preprocessing import LabelEncoder

## Constants

In [None]:
# Define all landmarks in uppercase
LANDMARKS = [
    "NOSE",
    "LEFT_EYE_INNER",
    "LEFT_EYE",
    "LEFT_EYE_OUTER",
    "RIGHT_EYE_INNER",
    "RIGHT_EYE",
    "RIGHT_EYE_OUTER",
    "LEFT_EAR",
    "RIGHT_EAR",
    "MOUTH_LEFT",
    "MOUTH_RIGHT",
    "LEFT_SHOULDER",
    "RIGHT_SHOULDER",
    "LEFT_ELBOW",
    "RIGHT_ELBOW",
    "LEFT_WRIST",
    "RIGHT_WRIST",
    "LEFT_PINKY",
    "RIGHT_PINKY",
    "LEFT_INDEX",
    "RIGHT_INDEX",
    "LEFT_THUMB",
    "RIGHT_THUMB",
    "LEFT_HIP",
    "RIGHT_HIP",
    "LEFT_KNEE",
    "RIGHT_KNEE",
    "LEFT_ANKLE",
    "RIGHT_ANKLE",
    "LEFT_HEEL",
    "RIGHT_HEEL",
    "LEFT_FOOT_INDEX",
    "RIGHT_FOOT_INDEX"
]

BODY_ANGLES = ['left_elbow_angle',
 'right_elbow_angle',
 'left_armpit_angle',
 'right_armpit_angle',
 'left_hip_angle',
 'right_hip_angle',
 'left_knee_angle',
 'right_knee_angle',
 'left_side_collarbone_angle',
 'right_side_collarbone_angle']

BODY_HEIGHTS = [
    'shoulder_width_x', 'shoulder_width_y',
    'hip_width_x', 'hip_width_y',
    'left_elbow_wrist_distance_x', 'left_elbow_wrist_distance_y',
    'right_elbow_wrist_distance_x', 'right_elbow_wrist_distance_y',
    'left_elbow_shoulder_distance_x', 'left_elbow_shoulder_distance_y',
    'right_elbow_shoulder_distance_x', 'right_elbow_shoulder_distance_y',
    'left_shoulder_hip_alignment_x', 'left_shoulder_hip_alignment_y',
    'right_shoulder_hip_alignment_x', 'right_shoulder_hip_alignment_y',
    'left_knee_ankle_distance_x', 'left_knee_ankle_distance_y',
    'right_knee_ankle_distance_x', 'right_knee_ankle_distance_y',
    'left_knee_hip_distance_x', 'left_knee_hip_distance_y',
    'right_knee_hip_distance_x', 'right_knee_hip_distance_y',
    'left_hand_shoulder_distance_x', 'left_hand_shoulder_distance_y',
    'right_hand_shoulder_distance_x', 'right_hand_shoulder_distance_y',
    'left_hand_nose_distance_x', 'left_hand_nose_distance_y',
    'right_hand_nose_distance_x', 'right_hand_nose_distance_y',
    'left_hand_hip_distance_x', 'left_hand_hip_distance_y',
    'right_hand_hip_distance_x', 'right_hand_hip_distance_y',
    'foot_spread_x', 'foot_spread_y'
]


## Helper Functions

In [None]:
# Helper function to get coordinates of a landmark
def get_coords(landmark, df_frame):
    coords = df_frame[df_frame['landmark'] == landmark][['x', 'y', 'z']].values
    return coords[0] if len(coords) > 0 else None

# Calculate vector between two points
def calc_vector(p1, p2):
    return np.array(p2) - np.array(p1)

# Calculate angle between two vectors using the dot product
def angle_between(v1, v2):
    dot_product = np.dot(v1, v2)
    norm_v1 = np.linalg.norm(v1)
    norm_v2 = np.linalg.norm(v2)
    cos_theta = dot_product / (norm_v1 * norm_v2)
    angle = np.arccos(np.clip(cos_theta, -1.0, 1.0))  # Clip to avoid numerical errors
    return np.degrees(angle)

def get_body_vecs(coordinates):
    # Upper arms
    left_upper_arm = calc_vector(coordinates['LEFT_SHOULDER'], coordinates['LEFT_ELBOW'])
    right_upper_arm = calc_vector(coordinates['RIGHT_SHOULDER'], coordinates['RIGHT_ELBOW'])

    # Forearms
    left_forearm = calc_vector(coordinates['LEFT_ELBOW'], coordinates['LEFT_WRIST'])
    right_forearm = calc_vector(coordinates['RIGHT_ELBOW'], coordinates['RIGHT_WRIST'])

    # Upper legs
    left_upper_leg = calc_vector(coordinates['LEFT_HIP'], coordinates['LEFT_KNEE'])
    right_upper_leg = calc_vector(coordinates['RIGHT_HIP'], coordinates['RIGHT_KNEE'])

    # Lower legs
    left_lower_leg = calc_vector(coordinates['LEFT_KNEE'], coordinates['LEFT_ANKLE'])
    right_lower_leg = calc_vector(coordinates['RIGHT_KNEE'], coordinates['RIGHT_ANKLE'])

    # Groin (hip-to-hip vector)
    groin = calc_vector(coordinates['LEFT_HIP'], coordinates['RIGHT_HIP'])

    # Collarbones (shoulder-to-shoulder vector)
    collarbones = calc_vector(coordinates['LEFT_SHOULDER'], coordinates['RIGHT_SHOULDER'])

    # Sides 
    right_side = calc_vector(coordinates['RIGHT_SHOULDER'], coordinates['RIGHT_HIP'])
    left_side = calc_vector(coordinates['LEFT_SHOULDER'], coordinates['LEFT_HIP'])

    # Return all vectors in a dictionary
    return {
        "left_upper_arm": left_upper_arm,
        "right_upper_arm": right_upper_arm,
        "left_forearm": left_forearm,
        "right_forearm": right_forearm,
        "left_upper_leg": left_upper_leg,
        "right_upper_leg": right_upper_leg,
        "left_lower_leg": left_lower_leg,
        "right_lower_leg": right_lower_leg,
        "groin": groin,
        "collarbones": collarbones,
        "right_side": right_side,
        "left_side": left_side
    }

# Calculate Angles
def calculate_body_angles(vectors):
    # Elbow angles (between upper arm and forearm)
    left_elbow_angle = angle_between(vectors['left_upper_arm'], vectors['left_forearm'])
    right_elbow_angle = angle_between(vectors['right_upper_arm'], vectors['right_forearm'])

    # Armpit angles (between upper arm and collarbones)
    left_armpit_angle = angle_between(vectors['left_upper_arm'], vectors['collarbones'])
    right_armpit_angle = angle_between(vectors['right_upper_arm'], vectors['collarbones'])

    # Hip angles (between upper leg and side vectors, representing torso alignment)
    left_hip_angle = angle_between(vectors['left_upper_leg'], vectors['left_side'])
    right_hip_angle = angle_between(vectors['right_upper_leg'], vectors['right_side'])

    # Knee angles (between upper leg and lower leg)
    left_knee_angle = angle_between(vectors['left_upper_leg'], vectors['left_lower_leg'])
    right_knee_angle = angle_between(vectors['right_upper_leg'], vectors['right_lower_leg'])

    # Side to collarbone angles (lateral torso tilt)
    left_side_collarbone_angle = angle_between(vectors['left_side'], vectors['collarbones'])
    right_side_collarbone_angle = angle_between(vectors['right_side'], vectors['collarbones'])

    # Return all angles in a dictionary
    return {
        "left_elbow_angle": left_elbow_angle,
        "right_elbow_angle": right_elbow_angle,
        "left_armpit_angle": left_armpit_angle,
        "right_armpit_angle": right_armpit_angle,
        "left_hip_angle": left_hip_angle,
        "right_hip_angle": right_hip_angle,
        "left_knee_angle": left_knee_angle,
        "right_knee_angle": right_knee_angle,
        "left_side_collarbone_angle": left_side_collarbone_angle,
        "right_side_collarbone_angle": right_side_collarbone_angle
    }

# Helper function to calculate differences in x and y directions between two points
def calc_xy_difference(p1, p2):
    return p2[0] - p1[0], p2[1] - p1[1] 

# Calculate Heights
def calculate_body_heights(coordinates):
    # Calculate relevant x and y differences
    heights = {
        # Shoulder width (left shoulder to right shoulder)
        "shoulder_width_x": calc_xy_difference(coordinates['LEFT_SHOULDER'], coordinates['RIGHT_SHOULDER'])[0],
        "shoulder_width_y": calc_xy_difference(coordinates['LEFT_SHOULDER'], coordinates['RIGHT_SHOULDER'])[1],

        # Hip width (left hip to right hip)
        "hip_width_x": calc_xy_difference(coordinates['LEFT_HIP'], coordinates['RIGHT_HIP'])[0],
        "hip_width_y": calc_xy_difference(coordinates['LEFT_HIP'], coordinates['RIGHT_HIP'])[1],

        # Elbow-Wrist distance for arms
        "left_elbow_wrist_distance_x": calc_xy_difference(coordinates['LEFT_ELBOW'], coordinates['LEFT_WRIST'])[0],
        "left_elbow_wrist_distance_y": calc_xy_difference(coordinates['LEFT_ELBOW'], coordinates['LEFT_WRIST'])[1],
        "right_elbow_wrist_distance_x": calc_xy_difference(coordinates['RIGHT_ELBOW'], coordinates['RIGHT_WRIST'])[0],
        "right_elbow_wrist_distance_y": calc_xy_difference(coordinates['RIGHT_ELBOW'], coordinates['RIGHT_WRIST'])[1],

        # Elbow-Shoulder distance for arms
        "left_elbow_shoulder_distance_x": calc_xy_difference(coordinates['LEFT_ELBOW'], coordinates['LEFT_SHOULDER'])[0],
        "left_elbow_shoulder_distance_y": calc_xy_difference(coordinates['LEFT_ELBOW'], coordinates['LEFT_SHOULDER'])[1],
        "right_elbow_shoulder_distance_x": calc_xy_difference(coordinates['RIGHT_ELBOW'], coordinates['RIGHT_SHOULDER'])[0],
        "right_elbow_shoulder_distance_y": calc_xy_difference(coordinates['RIGHT_ELBOW'], coordinates['RIGHT_SHOULDER'])[1],

        # Shoulder-Hip alignment for left and right sides
        "left_shoulder_hip_alignment_x": calc_xy_difference(coordinates['LEFT_SHOULDER'], coordinates['LEFT_HIP'])[0],
        "left_shoulder_hip_alignment_y": calc_xy_difference(coordinates['LEFT_SHOULDER'], coordinates['LEFT_HIP'])[1],
        "right_shoulder_hip_alignment_x": calc_xy_difference(coordinates['RIGHT_SHOULDER'], coordinates['RIGHT_HIP'])[0],
        "right_shoulder_hip_alignment_y": calc_xy_difference(coordinates['RIGHT_SHOULDER'], coordinates['RIGHT_HIP'])[1],

        # Knee-Ankle distance for legs
        "left_knee_ankle_distance_x": calc_xy_difference(coordinates['LEFT_KNEE'], coordinates['LEFT_ANKLE'])[0],
        "left_knee_ankle_distance_y": calc_xy_difference(coordinates['LEFT_KNEE'], coordinates['LEFT_ANKLE'])[1],
        "right_knee_ankle_distance_x": calc_xy_difference(coordinates['RIGHT_KNEE'], coordinates['RIGHT_ANKLE'])[0],
        "right_knee_ankle_distance_y": calc_xy_difference(coordinates['RIGHT_KNEE'], coordinates['RIGHT_ANKLE'])[1],

        # Knee-Hip distance for legs
        "left_knee_hip_distance_x": calc_xy_difference(coordinates['LEFT_KNEE'], coordinates['LEFT_HIP'])[0],
        "left_knee_hip_distance_y": calc_xy_difference(coordinates['LEFT_KNEE'], coordinates['LEFT_HIP'])[1],
        "right_knee_hip_distance_x": calc_xy_difference(coordinates['RIGHT_KNEE'], coordinates['RIGHT_HIP'])[0],
        "right_knee_hip_distance_y": calc_xy_difference(coordinates['RIGHT_KNEE'], coordinates['RIGHT_HIP'])[1],

        # Wrist-Shoulder distance for arms
        "left_hand_shoulder_distance_x": calc_xy_difference(coordinates['LEFT_WRIST'], coordinates['LEFT_SHOULDER'])[0],
        "left_hand_shoulder_distance_y": calc_xy_difference(coordinates['LEFT_WRIST'], coordinates['LEFT_SHOULDER'])[1],
        "right_hand_shoulder_distance_x": calc_xy_difference(coordinates['RIGHT_WRIST'], coordinates['RIGHT_SHOULDER'])[0],
        "right_hand_shoulder_distance_y": calc_xy_difference(coordinates['RIGHT_WRIST'], coordinates['RIGHT_SHOULDER'])[1],

        # Wrist-Nose distance
        "left_hand_nose_distance_x": calc_xy_difference(coordinates['LEFT_WRIST'], coordinates['NOSE'])[0],
        "left_hand_nose_distance_y": calc_xy_difference(coordinates['LEFT_WRIST'], coordinates['NOSE'])[1],
        "right_hand_nose_distance_x": calc_xy_difference(coordinates['RIGHT_WRIST'], coordinates['NOSE'])[0],
        "right_hand_nose_distance_y": calc_xy_difference(coordinates['RIGHT_WRIST'], coordinates['NOSE'])[1],

        # Wrist-Hip distance
        "left_hand_hip_distance_x": calc_xy_difference(coordinates['LEFT_WRIST'], coordinates['LEFT_HIP'])[0],
        "left_hand_hip_distance_y": calc_xy_difference(coordinates['LEFT_WRIST'], coordinates['LEFT_HIP'])[1],
        "right_hand_hip_distance_x": calc_xy_difference(coordinates['RIGHT_WRIST'], coordinates['RIGHT_HIP'])[0],
        "right_hand_hip_distance_y": calc_xy_difference(coordinates['RIGHT_WRIST'], coordinates['RIGHT_HIP'])[1],

        # Foot spread (left foot index to right foot index)
        "foot_spread_x": calc_xy_difference(coordinates['LEFT_FOOT_INDEX'], coordinates['RIGHT_FOOT_INDEX'])[0],
        "foot_spread_y": calc_xy_difference(coordinates['LEFT_FOOT_INDEX'], coordinates['RIGHT_FOOT_INDEX'])[1]
    }

    return heights


def calculate_angle(A, B, C):
    """
    Calculate the angle (in degrees) between vectors AB and BC.
    A, B, C are tuples representing (x, y) coordinates.
    """
    # Create vectors AB and BC
    AB = np.array([B[0] - A[0], B[1] - A[1]])
    BC = np.array([C[0] - B[0], C[1] - B[1]])

    # Calculate the dot product and magnitudes (norms) of the vectors
    dot_product = np.dot(AB, BC)
    magnitude_AB = np.linalg.norm(AB)
    magnitude_BC = np.linalg.norm(BC)

    # Calculate the cosine of the angle
    cos_theta = dot_product / (magnitude_AB * magnitude_BC)

    # Handle numerical precision issues (cosine should be between -1 and 1)
    cos_theta = np.clip(cos_theta, -1.0, 1.0)

    # Calculate the angle in radians and convert to degrees
    angle_rad = np.arccos(cos_theta)
    angle_deg = np.degrees(angle_rad)

    return angle_deg

def create_features_per_excercise(video_folder, excercise_type,mp_pose):
    excercise_path = os.path.join(video_folder, excercise_type)  # Construct the full path
    vid_count = 0
    csv_data = []
    if os.path.isdir(excercise_path):  # Ensure it's a directory
        for video_file in tqdm(os.listdir(excercise_path)):
            video_path = os.path.join(excercise_path, video_file)  
            # print(f"Processing video: {video_path}")

            # Open video file
            cap = cv2.VideoCapture(video_path)

            if not cap.isOpened():
                print(f"Error: Could not open video {video_path}.")
                continue
            vid_count += 1
            frame_number = 0

            # Initialize MediaPipe Pose with default settings
            with mp_pose.Pose() as pose:
                while cap.isOpened():
                    ret, frame = cap.read()
                    
                    if not ret:
                        # print("End of video or failed to read frame.")
                        break

                    # Convert the frame to RGB (MediaPipe works with RGB images)
                    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

                    # Process the frame to detect pose landmarks
                    result = pose.process(frame_rgb)

                    # Draw pose landmarks if detected
                    if result.pose_landmarks:
                        for idx, landmark in enumerate(result.pose_landmarks.landmark):
                            # print(f"{mp_pose.PoseLandmark(idx).name}: (x: {landmark.x}, y: {landmark.y}, z: {landmark.z})")
                            csv_data.append([excercise_type,vid_count,frame_number, mp_pose.PoseLandmark(idx).name, landmark.x, landmark.y, landmark.z])
                    # Increment frame number
                    frame_number += 1
            # Release video capture and close all windows
            cap.release()
            cv2.destroyAllWindows()
    return csv_data

def feat_to_parquet(csv_data, excercise_name):
    df_feat = pd.DataFrame(csv_data)
    df_feat = df_feat.rename(columns={
        0: 'exc_name',
        1: 'vid_num',
        2: 'frame',
        3: 'landmark',
        4: 'x',
        5: 'y',
        6: 'z'
    })
    df_feat.to_parquet(f'{excercise_name}_features.parquet')

def pad_sequence(sequence_chunk, desired_sequence_length):
    num_rows_sequence = sequence_chunk.shape[0]
    num_columns_sequence = sequence_chunk.shape[1]

    # Determine the label value from the existing sequence
    label_value = sequence_chunk['exc_name_encoded'].iloc[0]  # Assumes the label is the same for the whole chunk

    # Create an empty sequence with the remaining rows, filled with 0
    empty_sequence = pd.DataFrame(
        np.full((desired_sequence_length - num_rows_sequence, num_columns_sequence), 0), 
        columns=sequence_chunk.columns
    )
    
    # Set the label column in `empty_sequence` to the correct label
    empty_sequence['exc_name_encoded'] = label_value

    # Concatenate the original sequence with the padded rows
    padded_sequence = pd.concat([sequence_chunk, empty_sequence], axis=0).reset_index(drop=True)

    return padded_sequence

## Initial Mediapipe Trial

### Run landmark detection on a video

In [None]:
# Initialize MediaPipe Pose and Drawing modules
mp_pose = mp.solutions.pose
mp_drawing = mp.solutions.drawing_utils

# Video capture path (ensure it points to your video correctly)
video_path = r'excercise_videos\barbell biceps curl\0b43a151-8995-4f7e-8568-45d65996a19c.mp4'
excercise_type_temp = 'bicep curl'
vid_count_temp = 1
cap = cv2.VideoCapture(video_path)

frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
print(f'Total number of frames: {frame_count}')

# Get the frames per second (FPS)
fps = cap.get(cv2.CAP_PROP_FPS)
print(f'Frames per second (FPS): {fps}')

if not cap.isOpened():
    print(f"Error: Could not open video {video_path}.")
    exit()

frame_number = 0
csv_data = []
landmark_lst = []

# Initialize MediaPipe Pose with default settings
with mp_pose.Pose() as pose:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            print("End of video or failed to read frame.")
            break

        # Convert the frame to RGB (MediaPipe works with RGB images)
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        # Process the frame to detect pose landmarks
        result = pose.process(frame_rgb)

        # Draw pose landmarks if detected
        if result.pose_landmarks:
            mp_drawing.draw_landmarks(
                frame, result.pose_landmarks, mp_pose.POSE_CONNECTIONS
            )

            frame_landmarks = result.pose_landmarks.landmark
            right_shoulder = (frame_landmarks[12].x,frame_landmarks[12].y)
            right_elbow = (frame_landmarks[14].x,frame_landmarks[14].y)
            right_wrist = (frame_landmarks[16].x,frame_landmarks[16].y)
            frame_angle = -(calculate_angle(right_wrist, right_elbow, right_shoulder) - 180)

            # Optional: Add landmark coordinates to CSV data
            for idx, landmark in enumerate(result.pose_landmarks.landmark):
                # print(f"{mp_pose.PoseLandmark(idx).name}: (x: {landmark.x}, y: {landmark.y}, z: {landmark.z})")
                csv_data.append([excercise_type_temp,vid_count_temp,frame_number, mp_pose.PoseLandmark(idx).name, landmark.x, landmark.y, landmark.z])
            landmark_lst.append(result.pose_landmarks)

            # Add text to the frame
        text = f"Angle {round(frame_angle,2)}"
        org = (50, 50)  # Coordinates for the text (x, y)
        font = cv2.FONT_HERSHEY_SIMPLEX  # Font type
        fontScale = 1  # Font size
        color = (0, 255, 0)  # Green text in BGR
        thickness = 2  # Thickness of the text
        lineType = cv2.LINE_AA  # Anti-aliased text

        # Write the text on the frame
        cv2.putText(frame, text, org, font, fontScale, color, thickness, lineType)


        # cv2.putText(frame, frame_angle)
        # Display the frame in a window
        cv2.imshow('MediaPipe Pose', frame)

        # Increment frame number
        frame_number += 1

        # Wait for 1ms and exit if 'q' is pressed
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    # Release video capture and close all windows
    cap.release()
    cv2.destroyAllWindows()
    print("Video processing complete.")

### Collect data for analysis

In [None]:
df_trail_run = pd.DataFrame(csv_data)
# Rename the columns from 0-5 to meaningful names
df_trail_run = df_trail_run.rename(columns={
    0: 'exc_name',
    1: 'vid_num',
    2: 'frame',
    3: 'landmark',
    4: 'x',
    5: 'y',
    6: 'z'
})
df_trail_run

In [None]:
# Look at what one frame looks like
df_trail_run[df_trail_run['frame'] == 0]

### Look at angles

In [None]:
frame_angles = []
for frame_num in np.unique(df_trail_run['frame']):
    right_elbow = (df_trail_run[(df_trail_run['frame'] == frame_num) & (df_trail_run['landmark'] == 'RIGHT_ELBOW')].x.values[0], df_trail_run[(df_trail_run['frame'] == frame_num) & (df_trail_run['landmark'] == 'RIGHT_ELBOW')].y.values[0])
    right_shoulder = (df_trail_run[(df_trail_run['frame'] == frame_num) & (df_trail_run['landmark'] == 'RIGHT_SHOULDER')].x.values[0],df_trail_run[(df_trail_run['frame'] == frame_num) & (df_trail_run['landmark'] == 'RIGHT_SHOULDER')].y.values[0])
    right_wrist = (df_trail_run[(df_trail_run['frame'] == frame_num) & (df_trail_run['landmark'] == 'RIGHT_WRIST')].x.values[0],df_trail_run[(df_trail_run['frame'] == frame_num) & (df_trail_run['landmark'] == 'RIGHT_WRIST')].y.values[0])
    frame_angle = calculate_angle(right_shoulder, right_elbow, right_wrist)
    frame_angles.append(frame_angle)

In [None]:
plt.plot(range(len(frame_angles)),frame_angles)

### Working with the landmark list

In [None]:
right_shoulder = (landmark_lst[0].landmark[12].x,landmark_lst[0].landmark[12].y)
right_elbow = (landmark_lst[0].landmark[14].x,landmark_lst[0].landmark[14].y)
right_wrist = (landmark_lst[0].landmark[16].x,landmark_lst[0].landmark[16].y)
frame_angle = calculate_angle(right_shoulder, right_elbow, right_wrist)
frame_angle

### 3d plot

#### Initial 3d plot

In [None]:
# Initialize MediaPipe Pose module
mp_pose = mp.solutions.pose
POSE_CONNECTIONS = mp_pose.POSE_CONNECTIONS
frame_number = 0

# Filter the data for the desired frame
df_frame = df_trail_run[df_trail_run['frame'] == frame_number]

# Create a dictionary mapping landmark names to their (x, y, z) coordinates
landmarks = {row['landmark']: (row['x'], row['y'], row['z']) for _, row in df_frame.iterrows()}

# Initialize the 3D plot
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')

# Plot landmarks as scatter points
for name, (x, y, z) in landmarks.items():
    ax.scatter(-z, x, y, label=name, s=50)  # Adjusting axes to align with desired orientation

# Helper function to get coordinates by landmark name
def get_coord(name):
    """Returns the (x, y, z) coordinates of a given landmark."""
    return landmarks.get(name, (None, None, None))

# Plot lines connecting the landmarks using POSE_CONNECTIONS
for connection in POSE_CONNECTIONS:
    start, end = connection
    start_name = mp_pose.PoseLandmark(start).name
    end_name = mp_pose.PoseLandmark(end).name

    # Get the coordinates of the two landmarks
    start_coord = get_coord(start_name)
    end_coord = get_coord(end_name)

    # Check if both landmarks are available
    if None not in start_coord and None not in end_coord:
        ax.plot(
            [-start_coord[2], -end_coord[2]],  # x-coordinates
            [start_coord[0], end_coord[0]],    # y-coordinates
            [start_coord[1], end_coord[1]],    # z-coordinates
            color='blue'
        )

# Set axis labels
ax.set_zlabel('y')
ax.set_xlabel('z')
ax.set_ylabel('x')

# Flip the Z-axis to ensure head is at the top and legs at the bottom
ax.invert_zaxis()

# Rotate, tilt, and twist the graph
ax.view_init(elev=10, azim=20)  # Adjust 'elev' and 'azim' as needed

# Set plot title
plt.title(f'3D Pose for Frame {frame_number}')

# Display the plot
plt.show()

#### Interactive 3d plot

In [None]:
# Initialize MediaPipe Pose module
mp_pose = mp.solutions.pose
POSE_CONNECTIONS = mp_pose.POSE_CONNECTIONS
frame_number = 0

# Filter the data for the desired frame
df_frame = df_trail_run[df_trail_run['frame'] == frame_number]

# Create a dictionary mapping landmark names to their (x, y, z) coordinates
landmarks = {row['landmark']: (row['x'], row['y'], row['z']) for _, row in df_frame.iterrows()}

# Prepare lists to store points and connections
x_vals, y_vals, z_vals, names = [], [], [], []

# Collect the landmarks for plotting
for name, (x, y, z) in landmarks.items():
    x_vals.append(x)
    y_vals.append(y)
    z_vals.append(z)
    names.append(name)

# Create a 3D scatter plot for landmarks
scatter = go.Scatter3d(
    x=x_vals, y=z_vals, z=y_vals,  # Swap axes to align with your desired orientation
    mode='markers+text',
    marker=dict(size=5, color='blue'),
    text=names,  # Show names as text labels
    textposition="top center"
)

# Create line segments for each connection
connections = []
for connection in POSE_CONNECTIONS:
    start, end = connection
    start_name = mp_pose.PoseLandmark(start).name
    end_name = mp_pose.PoseLandmark(end).name

    start_coord = landmarks.get(start_name, None)
    end_coord = landmarks.get(end_name, None)

    # Ensure both landmarks exist
    if start_coord and end_coord:
        connections.append(
            go.Scatter3d(
                x=[start_coord[0], end_coord[0]],
                y=[start_coord[2], end_coord[2]],  # Swap Y and Z axes
                z=[start_coord[1], end_coord[1]],
                mode='lines',
                line=dict(color='blue', width=2)
            )
        )

# Combine scatter plot and connections into a figure
fig = go.Figure(data=[scatter] + connections)

# Set axis labels and layout
fig.update_layout(
    scene=dict(
        xaxis_title='X',
        yaxis_title='Z',
        zaxis_title='Y',
        zaxis=dict(autorange='reversed')  # Ensure head is at the top and legs at the bottom
    ),
    title=f"3D Pose for Frame {frame_number}",
    showlegend=False
)

# Display the interactive plot
fig.show()


#### Turn 3d plot into movie with multiple frames

In [None]:
# Initialize MediaPipe Pose module
mp_pose = mp.solutions.pose
POSE_CONNECTIONS = mp_pose.POSE_CONNECTIONS

# Create a directory to store individual frame images
os.makedirs('frames', exist_ok=True)

# Determine the overall min and max values across all frames to set consistent axis limits
x_min, x_max = df_trail_run['x'].min(), df_trail_run['x'].max()
y_min, y_max = df_trail_run['y'].min(), df_trail_run['y'].max()
z_min, z_max = df_trail_run['z'].min(), df_trail_run['z'].max()

def get_coord(name, landmarks):
    """Returns the (x, y, z) coordinates of a given landmark."""
    return landmarks.get(name, (None, None, None))

# Generate and save plots for each frame
for frame_number in range(df_trail_run['frame'].max() + 1):
    # Filter the data for the desired frame
    df_trail_run_frame = df_trail_run[df_trail_run['frame'] == frame_number]

    # Create a dictionary mapping landmark names to their (x, y, z) coordinates
    landmarks = {row['landmark']: (row['x'], row['y'], row['z']) for _, row in df_trail_run_frame.iterrows()}

    # Initialize the 3D plot
    fig = plt.figure()
    ax = fig.add_subplot(111, projection='3d')

    # Plot landmarks as scatter points
    for name, (x, y, z) in landmarks.items():
        ax.scatter(-z, x, y, label=name, s=50)  # Adjusting axes to align with desired orientation

    # Plot lines connecting the landmarks using POSE_CONNECTIONS
    for connection in POSE_CONNECTIONS:
        start, end = connection
        start_name = mp_pose.PoseLandmark(start).name
        end_name = mp_pose.PoseLandmark(end).name

        start_coord = get_coord(start_name, landmarks)
        end_coord = get_coord(end_name, landmarks)

        # Check if both landmarks are available
        if None not in start_coord and None not in end_coord:
            ax.plot(
                [-start_coord[2], -end_coord[2]],  # x-coordinates
                [start_coord[0], end_coord[0]],    # y-coordinates
                [start_coord[1], end_coord[1]],    # z-coordinates
                color='blue'
            )

    # Set consistent axis limits
    ax.set_xlim([-z_max, -z_min])
    ax.set_ylim([x_min, x_max])
    ax.set_zlim([y_min, y_max])

    # Set axis labels
    ax.set_xlabel('Z')
    ax.set_ylabel('X')
    ax.set_zlabel('Y')

    # Flip the Z-axis to ensure head is at the top and legs at the bottom
    ax.invert_zaxis()

    # Maintain aspect ratio
    ax.set_box_aspect([1, 1, 1])  # Same scaling for all axes

    # Rotate, tilt, and twist the graph
    ax.view_init(elev=10, azim=20)  # Adjust 'elev' and 'azim' as needed

    # Set plot title
    plt.title(f'3D Pose for Frame {frame_number}')

    # Save the plot as an image
    filename = f'frames/frame_{frame_number:03d}.png'
    plt.savefig(filename)
    plt.close(fig)  # Close the figure to free memory

# Create a video from the saved images
with imageio.get_writer('pose_video_2.mp4', fps=10) as writer:
    for frame_number in range(df_trail_run['frame'].max() + 1):
        filename = f'frames/frame_{frame_number:03d}.png'
        image = imageio.imread(filename)
        writer.append_data(image)

print("Video saved as 'pose_video_2.mp4'")

## Features

### Extract Mediapipe Landmark Features per Excercise

In [None]:
add_new_feat = True
feat_not_to_add_lst = [
    "barbell biceps curl",
    "bench press",
    "deadlift",
    "lat pulldown",
    "pull up",
    "push-up",
    "squat",
    "t bar row",
    "tricep dips",
    "tricep pushdown",
    "chest fly machine"
]

In [None]:
# Initialize MediaPipe Pose and Drawing modules
mp_pose = mp.solutions.pose
mp_drawing = mp.solutions.drawing_utils

# Directory containing videos
video_folder = 'excercise_videos'  # Update this with the path to your videos folder

# Process all videos in the directory
for excercise_type in os.listdir(video_folder):
    if add_new_feat:
        if excercise_type not in feat_not_to_add_lst:
            print(excercise_type)
            csv_data = create_features_per_excercise(video_folder, excercise_type,mp_pose)
            feat_to_parquet(csv_data, excercise_type)
            print(excercise_type)
            
print("All videos processed.")

### Feature Engineering

In [None]:
exclude_lst = [
    'decline bench press_features.parquet',
    'incline bench press_features.parquet'
]

df_excercise = pd.DataFrame()
data_dir = r'\data'
for prqt in tqdm(os.listdir(data_dir)):
    if prqt not in exclude_lst:
        prqt_path = data_dir + '\\' + prqt
        df_temp = pd.read_parquet(prqt_path)
        df_excercise = pd.concat((df_excercise,df_temp), axis=0)

In [None]:
df_excercise = pd.read_parquet(r'\pose_detection\video_features_20.parquet')

In [None]:
df_videos = df_excercise.copy()
df_videos.head()

### Make the Features

#### Trial Run --> One Video

In [None]:
vid_num = 1
frame_num = 10

df_trial = df_videos[(df_videos['exc_name'] == 'bench press') & 
    (df_videos['vid_num'] == vid_num) & 
    (df_videos['frame'] == frame_num)
    ]
df_trial

In [None]:
# Retrieve and store coordinates for all landmarks in a dictionary
coordinates = {landmark: get_coords(landmark, df_trial) for landmark in LANDMARKS}
body_vecs = get_body_vecs(coordinates)
body_angles = calculate_body_angles(body_vecs)
body_heights = calculate_body_heights(coordinates)

#### All Videos

In [None]:
# Reshape: Group by 'vid_num' to create sequences (each sequence = one video)
label_encoder = LabelEncoder()
df_videos['exc_name_encoded'] = label_encoder.fit_transform(df_videos['exc_name'])
xyz_feat = [f'{axis}_{i}' for i in range(33) for axis in ['x', 'y', 'z']]

# Parameters
NUM_LANDMARKS = 33  
FEATURES_PER_LANDMARK = 3  
FRAME_FEATURES = NUM_LANDMARKS * FEATURES_PER_LANDMARK

# Ensure consistent landmark order across frames
expected_landmarks = df_videos['landmark'].unique()

# Group by video number (vid_num)
grouped_videos = df_videos.groupby(['exc_name', 'vid_num'])

# Initialize lists for videos and labels
df_videos = pd.DataFrame()
count = 0
# Process each video
for (exercise_name, vid_num), group in tqdm(grouped_videos):
    try:
        count += 1
        # Sort by frame to ensure correct sequence
        group = group.sort_values(by='frame')

        # Store frames for this video
        df_vid = pd.DataFrame()

        # Group by frame number within each video
        for frame_num, frame in group.groupby('frame'):
            # Ensure all 33 landmarks are present; fill missing ones with zeros
            frame = frame.set_index('landmark').reindex(expected_landmarks, fill_value=0)
            # Flatten the landmarks for this frame (shape: (99,))
            df_frame_x = pd.DataFrame([frame[['x', 'y', 'z']].values.flatten()], columns=xyz_feat)
            # Get the body features
            coordinates = {landmark: get_coords(landmark, frame.reset_index()) for landmark in LANDMARKS}
            body_vecs = get_body_vecs(coordinates)
            # Get the angles
            body_angles = calculate_body_angles(body_vecs)
            df_angle_frame_x = pd.DataFrame([body_angles],columns=BODY_ANGLES)
            df_frame_x = pd.concat((df_frame_x,df_angle_frame_x), axis=1)
            # Get the heights
            body_heights = calculate_body_heights(coordinates)
            df_height_frame_x = pd.DataFrame([body_heights],columns=BODY_HEIGHTS)
            df_frame_x = pd.concat((df_frame_x,df_height_frame_x), axis=1)
            
            # Record the video number and frame number
            df_frame_vid = pd.DataFrame({
                'vid_num': [vid_num],
                'frame_num': [frame_num]
            })
            df_frame_x = pd.concat((df_frame_x,df_frame_vid), axis=1)

            # Append this frame's info to the video DataFrame
            df_vid = pd.concat((df_vid, df_frame_x), axis=0)
            
            # Add the label as a final column
            # Add the label column
            df_frame_x['label'] = exercise_name

            df_vid = pd.concat((df_vid,df_frame_x), axis = 0)
        
        df_videos = pd.concat((df_videos,df_vid), axis=0)
        # if count == 2:
        #     break

    except Exception as e:
        
        print(exercise_name, e)
        break

#### Pad and Split

In [None]:
df_videos = pd.read_parquet('video_features_22.parquet')

# Select Which Features to use
df_videos = df_videos[~df_videos['label'].isin(['decline bench press', 'incline bench press','romanian deadlift'])]

In [None]:
label_encoder = LabelEncoder()
df_videos['exc_name_encoded'] = label_encoder.fit_transform(df_videos['label'])


# Save the LabelEncoder as a pickle file
with open('label_encoder.pkl', 'wb') as file:
    pickle.dump(label_encoder, file)

In [None]:
# Define the sequence length
desired_sequence_length = 30

# Group by 'vid_num' and get the number of videos
grouped_videos = df_videos.groupby(['label','vid_num'])

# Initialize a list to store split data
all_sequences = []
all_labels = []

for excercise_type, group_data_excercise in grouped_videos:
    group_data_excercise = group_data_excercise.reset_index(drop=True)
    
    # Process in chunks of `desired_sequence_length`
    for i in range(0, group_data_excercise.shape[0], desired_sequence_length):
        # Select the first `desired_sequence_length` rows
        sequence_chunk = group_data_excercise.iloc[i:i + desired_sequence_length, :]
        if sequence_chunk.shape[0] == desired_sequence_length:
            # Append the sequence to our list
            all_sequences.append(sequence_chunk.drop(['label','exc_name_encoded','vid_num','frame_num'], axis=1))
            all_labels.append(sequence_chunk['exc_name_encoded'].reset_index(drop=True)[0])
        else:
            padded_sequence = pad_sequence(sequence_chunk,desired_sequence_length )
            all_sequences.append(padded_sequence.drop(['label','exc_name_encoded','vid_num','frame_num'], axis=1))
            all_labels.append(padded_sequence['exc_name_encoded'].reset_index(drop=True)[0])

all_sequences = np.array(all_sequences)
all_labels = np.array(all_labels)

## Model

### Split the data

In [None]:
# Train/test split (80% train, 20% test)
train_X, test_X, train_y, test_y = train_test_split(
    all_sequences, all_labels, test_size=0.2, random_state=42, stratify=all_labels
)

# Check shapes
print(f"Train X Shape: {train_X.shape}, Train y Shape: {train_y.shape}")
print(f"Test X Shape: {test_X.shape}, Test y Shape: {test_y.shape}")

# Ensure labels are in the correct dtype (int32)
train_y = train_y.astype(np.int32)
test_y = test_y.astype(np.int32)

### Train Models

#### Transformer Model

In [None]:
def transformer_classifier(sequence_len, input_dim, num_classes, 
                           d_model=64, num_heads=4, ff_dim=128, 
                           num_layers=2, dropout_rate=0.2):
    # Input layer
    inputs = tf.keras.Input(shape=(sequence_len, input_dim))

    # Project input features to d_model dimensions
    x = tf.keras.layers.Dense(d_model)(inputs)

    # Positional Encoding with learnable embeddings
    position_embedding = tf.keras.layers.Embedding(input_dim=sequence_len, output_dim=d_model)
    positions = tf.range(start=0, limit=sequence_len, delta=1)
    pos_encoding = position_embedding(positions)
    x = x + pos_encoding  # Add positional encoding to input

    # Transformer Encoder Layers
    for _ in range(num_layers):
        # Multi-Head Attention
        attn_output = tf.keras.layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=d_model, dropout=dropout_rate)(x, x)
        attn_output = tf.keras.layers.Dropout(dropout_rate)(attn_output)
        attn_output = tf.keras.layers.LayerNormalization(epsilon=1e-6)(x + attn_output)

        # Feedforward Network
        ff_output = tf.keras.Sequential([
            tf.keras.layers.Dense(ff_dim, activation='relu'),
            tf.keras.layers.Dropout(dropout_rate),
            tf.keras.layers.Dense(d_model),
        ])(attn_output)
        x = tf.keras.layers.LayerNormalization(epsilon=1e-6)(attn_output + ff_output)

    # Global Average Pooling
    x = tf.keras.layers.GlobalAveragePooling1D()(x)

    # Dropout before final output
    x = tf.keras.layers.Dropout(dropout_rate)(x)

    # Output layer for classification
    outputs = tf.keras.layers.Dense(num_classes, activation='softmax')(x)

    # Build and compile the model
    model = tf.keras.Model(inputs=inputs, outputs=outputs)
    return model


In [None]:
# Set hyperparameters
num_classes = len(np.unique(train_y))  # Number of exercise classes
sequence_len = train_X.shape[1]  # Number of frames (max_frames)
input_dim = train_X.shape[2]  # 99 features per frame

# Learning rate schedule
lr_schedule = tf.keras.optimizers.schedules.CosineDecay(
    initial_learning_rate=1e-3, decay_steps=10000, alpha=1e-5
)
optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)

# Build and compile the model
transform_model = transformer_classifier(sequence_len, input_dim, num_classes)
transform_model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# Print model summary
transform_model.summary()

# Flatten `all_labels` to ensure it's 1D
all_labels = np.array(all_labels).flatten()

# Calculate class weights
classes = np.unique(all_labels)
class_weights = compute_class_weight('balanced', classes=classes, y=all_labels)
class_weights_dict = dict(enumerate(class_weights))

print("Class weights:", class_weights_dict)

In [None]:
# Define the checkpoint to save the best model weights based on validation accuracy
checkpoint_callback = ModelCheckpoint(
    filepath='transformer_19.h5',   
    monitor='val_accuracy',             
    save_best_only=True,                
    mode='max',                         
    verbose=1                           
)

In [None]:
# Train the model with class weights
history = transform_model.fit(
    train_X,
    train_y,
    validation_data=(test_X, test_y),
    batch_size=64,
    epochs=50,
    class_weight=class_weights_dict,
    callbacks=[checkpoint_callback]
)

# Evaluate the model
loss, accuracy = transform_model.evaluate(test_X, test_y)
print(f'Test Loss: {loss:.4f}, Test Accuracy: {accuracy:.4f}')

In [None]:
transform_model.load_weights('transformer_19.h5')
transform_model.save('transformer_19_30_frames.h5')

In [None]:
y_pred = np.argmax(transform_model.predict(test_X), axis=1)
accuracy = accuracy_score(test_y, y_pred)
print(accuracy)

# Create confusion matrix
cm = confusion_matrix(test_y, y_pred)
display_labels = np.unique(test_y)
# Assuming y_test contains the true labels and final_predictions are the ensemble predictions

# Display the confusion matrix
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=display_labels)
disp.plot(cmap=plt.cm.Blues)
plt.title("Confusion Matrix")
plt.show()

#### Hyperparamater Optimization

In [None]:
def model_builder(hp):
    # Automatically determine the number of unique classes
    num_classes = len(np.unique(train_y))  # Make sure train_y is defined

    # Hyperparameters to tune
    d_model = hp.Int('d_model', min_value=64, max_value=256, step=64)
    num_heads = hp.Choice('num_heads', values=[4, 8, 12])
    ff_dim = hp.Int('ff_dim', min_value=128, max_value=512, step=128)
    num_layers = hp.Int('num_layers', min_value=1, max_value=4, step=1)
    dropout_rate = hp.Float('dropout_rate', min_value=0.1, max_value=0.5, step=0.1)
    learning_rate = hp.Choice('learning_rate', values=[1e-4, 5e-5, 1e-5])

    # Build the Transformer model
    inputs = tf.keras.Input(shape=(60, 99))  # Adjust based on your input shape
    x = tf.keras.layers.Dense(d_model)(inputs)

    position_embedding = tf.keras.layers.Embedding(input_dim=60, output_dim=d_model)
    positions = tf.range(start=0, limit=60, delta=1)
    pos_encoding = position_embedding(positions)
    pos_encoding = tf.keras.layers.Dense(d_model)(pos_encoding)
    x = x + pos_encoding

    for _ in range(num_layers):
        x_norm = tf.keras.layers.LayerNormalization(epsilon=1e-6)(x)
        attn_output = tf.keras.layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=d_model, dropout=dropout_rate)(x_norm, x_norm)
        attn_output = tf.keras.layers.Dropout(dropout_rate)(attn_output)
        x = x + attn_output

        ff_norm = tf.keras.layers.LayerNormalization(epsilon=1e-6)(x)
        ff_output = tf.keras.Sequential([
            tf.keras.layers.Dense(ff_dim, activation='relu'),
            tf.keras.layers.Dropout(dropout_rate),
            tf.keras.layers.Dense(d_model)
        ])(ff_norm)
        x = x + ff_output

    x = tf.keras.layers.GlobalAveragePooling1D()(x)
    x = tf.keras.layers.Dropout(dropout_rate)(x)

    # Output layer with correct number of classes
    outputs = tf.keras.layers.Dense(num_classes, activation='softmax')(x)

    # Compile the model
    model = tf.keras.Model(inputs, outputs)
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])
    return model

In [None]:
tuner = kt.Hyperband(
    model_builder,
    objective='val_accuracy',
    max_epochs=20,
    factor=3,  
    directory='my_tuning_dir',
    project_name='transformer_tuning'
)

In [None]:
tuner.search(train_X, tf.keras.utils.to_categorical(train_y),
             validation_data=(test_X, tf.keras.utils.to_categorical(test_y)),
             epochs=50, batch_size=32)

# Get the best hyperparameters
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]
print(f"Best d_model: {best_hps.get('d_model')}")
print(f"Best num_heads: {best_hps.get('num_heads')}")
print(f"Best ff_dim: {best_hps.get('ff_dim')}")
print(f"Best learning_rate: {best_hps.get('learning_rate')}")

In [None]:
def build_final_model(best_hps):
    num_classes = len(np.unique(train_y))
    # Extract hyperparameters from the best_hps object
    d_model = best_hps.get('d_model')
    num_heads = best_hps.get('num_heads')
    ff_dim = best_hps.get('ff_dim')
    num_layers = best_hps.get('num_layers')  
    dropout_rate = best_hps.get('dropout_rate')
    learning_rate = best_hps.get('learning_rate')

    # Define the Transformer model using the best hyperparameters
    inputs = tf.keras.Input(shape=(60, 99))  
    x = tf.keras.layers.Dense(d_model)(inputs)

    # Positional encoding
    position_embedding = tf.keras.layers.Embedding(input_dim=60, output_dim=d_model)
    positions = tf.range(start=0, limit=60, delta=1)
    pos_encoding = position_embedding(positions)
    pos_encoding = tf.keras.layers.Dense(d_model)(pos_encoding)
    x = x + pos_encoding

    # Transformer encoder layers
    for _ in range(num_layers):
        x_norm = tf.keras.layers.LayerNormalization(epsilon=1e-6)(x)
        attn_output = tf.keras.layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=d_model, dropout=dropout_rate)(x_norm, x_norm)
        attn_output = tf.keras.layers.Dropout(dropout_rate)(attn_output)
        x = x + attn_output  # Residual connection

        ff_norm = tf.keras.layers.LayerNormalization(epsilon=1e-6)(x)
        ff_output = tf.keras.Sequential([
            tf.keras.layers.Dense(ff_dim, activation='relu'),
            tf.keras.layers.Dropout(dropout_rate),
            tf.keras.layers.Dense(d_model),
        ])(ff_norm)
        x = x + ff_output  

    # Global average pooling
    x = tf.keras.layers.GlobalAveragePooling1D()(x)

    # Final dropout and output layer
    x = tf.keras.layers.Dropout(dropout_rate)(x)
    outputs = tf.keras.layers.Dense(num_classes, activation='softmax')(x)

    # Compile the model with the best learning rate
    model = tf.keras.Model(inputs, outputs)
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])
    return model

In [None]:
# Build the model with the best hyperparameters
final_model = build_final_model(best_hps)

# Print the model summary to verify the architecture
final_model.summary()

# Train the model
history = final_model.fit(
    train_X, tf.keras.utils.to_categorical(train_y),
    validation_data=(test_X, tf.keras.utils.to_categorical(test_y)),
    epochs=200, batch_size=32,  
    callbacks=[
        tf.keras.callbacks.ModelCheckpoint('best_final_model.h5', save_best_only=True)
    ]
)


In [None]:
y_pred = np.argmax(final_model.predict(test_X), axis=1)
# Create confusion matrix
cm = confusion_matrix(test_y, y_pred)
display_labels = np.unique(test_y)

# Display the confusion matrix
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=display_labels)
disp.plot(cmap=plt.cm.Blues)
plt.title("Confusion Matrix")
plt.show()

## Test Model Real Time

In [None]:
transform_model = tf.keras.models.load_model('transformer_19.h5')

#### Mediapipe features live

In [None]:
# Initialize Mediapipe Pose
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(static_image_mode=False, min_detection_confidence=0.5, min_tracking_confidence=0.5)
mp_drawing = mp.solutions.drawing_utils


# Parameters
NUM_LANDMARKS = 33  
FEATURES_PER_LANDMARK = 3  
FRAME_FEATURES =  147 
WINDOW_SIZE = 120 
chosen_model = transform_model
# Video source
video_path = r'\pose_detection\Random Workout clips.mp4'
cap = cv2.VideoCapture(video_path)

# Initialize a buffer for 60 frames
buffer = []

# Create a named window and set its size only once
cv2.namedWindow("Live Prediction", cv2.WINDOW_NORMAL)
cv2.resizeWindow("Live Prediction", 300, 300)  # Set the window to the desired size

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    # Process the frame with Mediapipe
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    results = pose.process(frame_rgb)

    if results.pose_landmarks:
        mp_drawing.draw_landmarks(
                frame, results.pose_landmarks, mp_pose.POSE_CONNECTIONS
            )
        # Extract landmarks and flatten to (99,)
        landmarks = np.array([[lm.x, lm.y, lm.z] for lm in results.pose_landmarks.landmark]).flatten()

        if landmarks.shape[0] == FRAME_FEATURES:
            # Add the frame to the buffer
            buffer.append(landmarks)

            # If buffer exceeds 60 frames, remove the oldest frame
            if len(buffer) > WINDOW_SIZE:
                buffer.pop(0)

            # Make a prediction when buffer has exactly 60 frames
            if len(buffer) == WINDOW_SIZE:
                input_data = np.expand_dims(np.array(buffer), axis=0)

                # Uncomment after loading your model
                prediction = chosen_model.predict(input_data)
                print(prediction)
                predicted_label = label_encoder.inverse_transform(np.argmax(prediction, axis=1))

                # Simulate a prediction label for now
                # predicted_label = "Sample Exercise"
                
                # Display the prediction on the frame
                cv2.putText(frame, f'Exercise: {predicted_label}', (10, 30), 
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)

    # Show the frame in the correctly sized window
    cv2.imshow("Live Prediction", frame)

    # Exit on 'q'
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# Cleanup
cap.release()
cv2.destroyAllWindows()
pose.close()

#### Many Features live

In [None]:
# Initialize Mediapipe Pose
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(static_image_mode=False, min_detection_confidence=0.5, min_tracking_confidence=0.5)
mp_drawing = mp.solutions.drawing_utils

xyz_feat = [f'{axis}_{i}' for i in range(33) for axis in ['x', 'y', 'z']]

# Parameters
NUM_FEATURES =  147 
WINDOW_SIZE = 60  # Adjust to your model input size
chosen_model = transform_model
# Video source
video_path = r"pose_detection\doron_random_filter.mp4"
cap = cv2.VideoCapture(video_path)

# Initialize a buffer for 60 frames
buffer = []
predicted_label = None

# Create a named window and set its size only once
cv2.namedWindow("Live Prediction", cv2.WINDOW_NORMAL)
cv2.resizeWindow("Live Prediction", 300, 300)  # Set the window to the desired size
frame_feature_sequences = []
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break    

    # Process the frame with Mediapipe
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    results = pose.process(frame_rgb)

    if results.pose_landmarks:
        mp_drawing.draw_landmarks(
                frame, results.pose_landmarks, mp_pose.POSE_CONNECTIONS
            )
        landmarks = np.array([[lm.x, lm.y, lm.z] for lm in results.pose_landmarks.landmark]).flatten()
        coordinates = {}                      
        for idx, landmark in enumerate(results.pose_landmarks.landmark):
            # print(f"{mp_pose.PoseLandmark(idx).name}: (x: {landmark.x}, y: {landmark.y}, z: {landmark.z})")
            coordinates[mp_pose.PoseLandmark(idx).name] = [landmark.x, landmark.y, landmark.z]       

        body_vecs = get_body_vecs(coordinates)
        # Get the angles
        body_angles = calculate_body_angles(body_vecs)
        # Get the heights
        body_heights = calculate_body_heights(coordinates)

        frame_feature_sequence = list(landmarks) + list(body_angles.values()) + list(body_heights.values())

        frame_feature_sequences.append(frame_feature_sequence)      
        # print(len(frame_feature_sequences))
        if len(frame_feature_sequences) == WINDOW_SIZE:
                frame_feature_array = np.array(frame_feature_sequences)
                frame_feature_array = np.expand_dims(frame_feature_array, axis=0)
                # Uncomment after loading your model
                prediction = chosen_model.predict(frame_feature_array)
            
                predicted_label = label_encoder.inverse_transform(np.argmax(prediction, axis=1))
                print(predicted_label)
                # Display the prediction on the frame
                cv2.putText(frame, f'Exercise: {predicted_label}', (10, 30), 
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)
                
                frame_feature_sequences = []
        
        else:
            if predicted_label:
                                  # Display the prediction on the frame
                cv2.putText(frame, f'Exercise: {predicted_label}', (10, 30), 
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)
            else:
                                 # Display the prediction on the frame
                cv2.putText(frame, 'detecting excercise', (10, 30), 
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)

    # Show the frame in the correctly sized window
    cv2.imshow("Live Prediction", frame)

    # Exit on 'q'
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# Cleanup
cap.release()
cv2.destroyAllWindows()
pose.close()        

#### Many features process

In [None]:
# Initialize Mediapipe Pose
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(static_image_mode=False, min_detection_confidence=0.5, min_tracking_confidence=0.5)
mp_drawing = mp.solutions.drawing_utils

xyz_feat = [f'{axis}_{i}' for i in range(33) for axis in ['x', 'y', 'z']]

# Parameters
NUM_FEATURES = 147 
WINDOW_SIZE = 60  # Adjust to your model input size
chosen_model = transform_model
# Video source
video_path = r"\pose_detection\doron_random_filter.mp4"
cap = cv2.VideoCapture(video_path)

# Get the total number of frames for progress calculation
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

# Initialize a buffer for 60 frames
buffer = []
predicted_label = None

frame_feature_sequences = []

# Video writer setup
output_path = r'pose_detection\process_doron_random_filter.mp4'
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
original_fps = cap.get(cv2.CAP_PROP_FPS)
output_fps = original_fps * 2  # Save at double speed

# Initialize VideoWriter
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, output_fps, (frame_width, frame_height))

# Track the current frame count
current_frame = 0

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break    

    # Process the frame with Mediapipe
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    results = pose.process(frame_rgb)

    if results.pose_landmarks:
        mp_drawing.draw_landmarks(
                frame, results.pose_landmarks, mp_pose.POSE_CONNECTIONS
            )
        landmarks = np.array([[lm.x, lm.y, lm.z] for lm in results.pose_landmarks.landmark]).flatten()
        coordinates = {}                      
        for idx, landmark in enumerate(results.pose_landmarks.landmark):
            coordinates[mp_pose.PoseLandmark(idx).name] = [landmark.x, landmark.y, landmark.z]       

        body_vecs = get_body_vecs(coordinates)
        # Get the angles
        body_angles = calculate_body_angles(body_vecs)
        # Get the heights
        body_heights = calculate_body_heights(coordinates)

        frame_feature_sequence = list(landmarks) + list(body_angles.values()) + list(body_heights.values())
        frame_feature_sequences.append(frame_feature_sequence)      

        if len(frame_feature_sequences) == WINDOW_SIZE:
                frame_feature_array = np.array(frame_feature_sequences)
                frame_feature_array = np.expand_dims(frame_feature_array, axis=0)
                # Uncomment after loading your model
                prediction = chosen_model.predict(frame_feature_array)
            
                predicted_label = label_encoder.inverse_transform(np.argmax(prediction, axis=1))
                
                frame_feature_sequences = []
        
        # Overlay the prediction on the frame
        if predicted_label:
            cv2.putText(frame, f'Exercise: {predicted_label}', (10, 30), 
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)
        else:
            cv2.putText(frame, 'Detecting exercise...', (10, 30), 
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)

    # Write the frame to the output video
    out.write(frame)

    # Update progress
    current_frame += 1
    progress = (current_frame / total_frames) * 100
    print(f"Processing: {progress:.2f}% complete", end='\r')

# Cleanup
cap.release()
pose.close()     
out.release()

print("\nProcessing complete.")


In [None]:
import cv2
import numpy as np
import mediapipe as mp

# Initialize Mediapipe Pose
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(static_image_mode=False, min_detection_confidence=0.5, min_tracking_confidence=0.5)
mp_drawing = mp.solutions.drawing_utils

xyz_feat = [f'{axis}_{i}' for i in range(33) for axis in ['x', 'y', 'z']]

# Parameters
NUM_FEATURES = 147 
WINDOW_SIZE = 60  # Adjust to your model input size
chosen_model = transform_model
video_path = r"\pose_detection\doron_random_filter.mp4"
cap = cv2.VideoCapture(video_path)

# Limit the number of frames processed to 1000
frame_limit = 1000

# Initialize a buffer for 60 frames
buffer = []
predicted_label = None
frame_feature_sequences = []

# Video writer setup
output_path = r'\pose_detection\process_doron_random_filter_1000.mp4'
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
original_fps = cap.get(cv2.CAP_PROP_FPS)
output_fps = original_fps * 2  # Save at double speed

# Initialize VideoWriter
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, output_fps, (frame_width, frame_height))

# Track the current frame count
current_frame = 0

while cap.isOpened() and current_frame < frame_limit:
    ret, frame = cap.read()
    if not ret:
        break    

    # Process the frame with Mediapipe
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    results = pose.process(frame_rgb)

    if results.pose_landmarks:
        mp_drawing.draw_landmarks(
                frame, results.pose_landmarks, mp_pose.POSE_CONNECTIONS
            )
        landmarks = np.array([[lm.x, lm.y, lm.z] for lm in results.pose_landmarks.landmark]).flatten()
        coordinates = {}                      
        for idx, landmark in enumerate(results.pose_landmarks.landmark):
            coordinates[mp_pose.PoseLandmark(idx).name] = [landmark.x, landmark.y, landmark.z]       

        body_vecs = get_body_vecs(coordinates)
        body_angles = calculate_body_angles(body_vecs)
        body_heights = calculate_body_heights(coordinates)

        frame_feature_sequence = list(landmarks) + list(body_angles.values()) + list(body_heights.values())
        frame_feature_sequences.append(frame_feature_sequence)      

        if len(frame_feature_sequences) == WINDOW_SIZE:
                frame_feature_array = np.array(frame_feature_sequences)
                frame_feature_array = np.expand_dims(frame_feature_array, axis=0)
                # Uncomment after loading your model
                prediction = chosen_model.predict(frame_feature_array)
            
                predicted_label = label_encoder.inverse_transform(np.argmax(prediction, axis=1))
                
                frame_feature_sequences = []
        
        # Overlay the prediction on the frame
        if predicted_label:
            cv2.putText(frame, f'Exercise: {predicted_label}', (10, 30), 
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)
        else:
            cv2.putText(frame, 'Detecting exercise...', (10, 30), 
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)

    # Write the frame to the output video
    out.write(frame)

    # Update progress
    current_frame += 1
    progress = (current_frame / frame_limit) * 100
    print(f"Processing: {progress:.2f}% complete", end='\r')

# Cleanup
cap.release()
pose.close()     
out.release()

print("\nProcessing complete.")
