In [13]:
import json
import numpy as np
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, LSTM, Dense, Attention, LayerNormalization, Embedding
from tensorflow.keras.optimizers import Adam
import tensorflow as tf

# Read the recorded pose dataset: a mapping of exercise name -> list of frame dicts,
# where each frame carries a "landmarks" mapping of joint name -> x/y/z/visibility.
with open('pose_data.json', 'r') as f:
    data = json.load(f)

# Assign each exercise an integer class id, following the key order of the JSON file
exercise_labels = dict(zip(data.keys(), range(len(data))))

# Windowing configuration
sequence_length = 30
first_exercise_frames = next(iter(data.values()))
num_joints = len(first_exercise_frames[0]["landmarks"])  # joints per frame, taken from the first frame

# Cut every exercise recording into non-overlapping windows of `sequence_length` frames
X, y = [], []
for exercise, frames in data.items():
    for start in range(0, len(frames), sequence_length):
        # Flatten each frame to [x, y, z, visibility, x, y, z, visibility, ...]
        sequence = [
            [
                frame_data["landmarks"][joint][field]
                for joint in frame_data["landmarks"]
                for field in ("x", "y", "z", "visibility")
            ]
            for frame_data in frames[start:start + sequence_length]
        ]
        # The last window of a recording may be short; only full windows are kept
        if len(sequence) == sequence_length:
            X.append(sequence)
            y.append(exercise_labels[exercise])

# Stack into arrays: X holds the windows, y the matching class ids
X = np.array(X)
y = np.array(y)

# Hold out 20% of the windows for testing (fixed seed for a repeatable split)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [None]:
from tensorflow.keras.layers import MultiHeadAttention, Bidirectional, LSTM, Input, LayerNormalization, Dense, Dropout, Add
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam

# Per-sample input shape: (sequence_length, num_joints * 4)
timesteps, features = X_train.shape[1], X_train.shape[2]
input_shape = (timesteps, features)


def residual_self_attention(tensor, num_heads):
    """Apply multi-head self-attention to `tensor`, add it back (residual), then layer-normalize."""
    attention_output = MultiHeadAttention(num_heads=num_heads, key_dim=64)(tensor, tensor)
    summed = Add()([tensor, attention_output])
    return LayerNormalization()(summed)


input_layer = Input(shape=input_shape)

# Stage 1: wide BiLSTM over the whole window, normalized, followed by 8-head self-attention
x = Bidirectional(LSTM(256, return_sequences=True))(input_layer)
x = LayerNormalization()(x)
x = residual_self_attention(x, num_heads=8)

# Stage 2: narrower BiLSTM with dropout, followed by 4-head self-attention
x = Bidirectional(LSTM(128, return_sequences=True))(x)
x = Dropout(0.3)(x)
x = residual_self_attention(x, num_heads=4)

# Stage 3: final BiLSTM returns only its last output, collapsing the time axis
x = Bidirectional(LSTM(64))(x)
x = Dropout(0.3)(x)

# Classification head: one ReLU layer, dropout, then a softmax over the exercise classes
x = Dense(64, activation='relu')(x)
x = Dropout(0.3)(x)
output_layer = Dense(len(exercise_labels), activation='softmax')(x)

model = Model(inputs=input_layer, outputs=output_layer)

# Labels are integer class ids, hence the sparse categorical cross-entropy loss
optimizer = Adam(learning_rate=1e-4)
model.compile(
    optimizer=optimizer,
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)

# Print the layer-by-layer architecture
model.summary()

# Fit on the training windows; the held-out split is used for per-epoch validation metrics
history = model.fit(
    X_train,
    y_train,
    epochs=100,
    batch_size=32,
    validation_data=(X_test, y_test),
)

Epoch 1/50
[1m93/93[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 40ms/step - accuracy: 0.3201 - loss: 2.3242 - val_accuracy: 0.4341 - val_loss: 1.8090
Epoch 2/50
[1m93/93[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 35ms/step - accuracy: 0.5034 - loss: 1.5356 - val_accuracy: 0.5995 - val_loss: 1.3929
Epoch 3/50
[1m93/93[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 57ms/step - accuracy: 0.5368 - loss: 1.4757 - val_accuracy: 0.5269 - val_loss: 1.4415
Epoch 4/50
[1m93/93[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 80ms/step - accuracy: 0.5633 - loss: 1.3657 - val_accuracy: 0.4583 - val_loss: 1.7379
Epoch 5/50
[1m93/93[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 99ms/step - accuracy: 0.4983 - loss: 1.5601 - val_accuracy: 0.4610 - val_loss: 1.7134
Epoch 6/50
[1m93/93[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 116ms/step - accuracy: 0.4985 - loss: 1.5373 - val_accuracy: 0.5444 - val_loss: 1.5809
Epoch 7/50
[1m93/93[0m [32m━

In [43]:
# Score the trained model on the held-out split. The model was compiled with a
# single 'accuracy' metric, so evaluate() yields the loss followed by the accuracy.
evaluation = model.evaluate(X_test, y_test)
test_loss, test_accuracy = evaluation
print(f"Test Loss: {test_loss}")
print(f"Test Accuracy: {test_accuracy}")

[1m24/24[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 36ms/step - accuracy: 0.5865 - loss: 1.2938
Test Loss: 1.291059136390686
Test Accuracy: 0.5887096524238586


In [2]:
import cv2
import mediapipe as mp
import numpy as np

# Initialize MediaPipe Pose Estimator.
# `pose` is a module-level object: extract_pose_landmarks_from_video() below reads it
# as a global, so this must run before that function is called.
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5)

# Function to process each frame and extract pose landmarks
def extract_pose_landmarks_from_video(video_path):
    """Run the module-level MediaPipe `pose` estimator over every frame of a video.

    Parameters
    ----------
    video_path : str
        Path passed to ``cv2.VideoCapture``.

    Returns
    -------
    list[list[dict]]
        One entry per frame in which a pose was detected. Each entry is a list of
        ``{'x', 'y', 'z', 'visibility'}`` dicts, in the order MediaPipe reports the
        landmarks. Frames without a detection are skipped, so the list can be shorter
        than the video. If the video cannot be opened the list is empty.
    """
    cap = cv2.VideoCapture(video_path)
    frames = []

    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break  # end of stream or read failure

            # Convert frame to RGB (MediaPipe uses RGB, OpenCV decodes to BGR)
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            # Process frame with MediaPipe Pose
            results = pose.process(frame_rgb)

            if results.pose_landmarks:
                # Extract landmarks for each joint
                frames.append([
                    {
                        'x': landmark.x,
                        'y': landmark.y,
                        'z': landmark.z,
                        'visibility': landmark.visibility,
                    }
                    for landmark in results.pose_landmarks.landmark
                ])
    finally:
        # Always free the capture handle, even if decoding or pose estimation raises
        cap.release()

    return frames

# Extract frames and pose landmarks from the video.
# `frames` ends up with one landmark list per frame where a pose was detected;
# it is consumed by prepare_sequence_data() in the next cell.
video_path = 'example_2.mp4'
frames = extract_pose_landmarks_from_video(video_path)

In [3]:
# Inference-time constants. `sequence_length` is (re)defined here and must match the
# window size used when the training data was built (30 in the data-preparation cell).
# 'exercise_labels' is NOT defined here; it is expected to exist from the training cell.
sequence_length = 30  # Number of consecutive frames per model input window
num_joints = 33  # Update this based on the number of joints in MediaPipe (or your dataset). NOTE(review): not read by the code in this cell.

def prepare_sequence_data(frames, sequence_length):
    """Group per-frame landmarks into fixed-length, non-overlapping windows.

    Parameters
    ----------
    frames : list
        One entry per frame; each entry is an iterable of joints exposing the keys
        "x", "y", "z" and "visibility".
    sequence_length : int
        Number of frames per window.

    Returns
    -------
    numpy.ndarray
        ``np.array`` of the windows. Each window is a list of `sequence_length`
        flattened frames ``[x, y, z, visibility, x, y, z, visibility, ...]``.
        Trailing frames that do not fill a whole window are dropped. With fewer
        than `sequence_length` frames the result is an empty array.
    """
    fields = ("x", "y", "z", "visibility")
    window_starts = range(0, len(frames) - sequence_length + 1, sequence_length)
    windows = [
        [
            [joint[field] for joint in frame_data for field in fields]
            for frame_data in frames[start:start + sequence_length]
        ]
        for start in window_starts
    ]
    return np.array(windows)

# Prepare data for prediction: window the extracted `frames` into model inputs.
# If the video produced fewer than `sequence_length` detected frames, X_video is empty.
X_video = prepare_sequence_data(frames, sequence_length)

In [4]:
# Map predicted labels back to exercise names.
# NOTE(review): `model` and `exercise_labels` are not defined in this cell; both must
# already exist in the kernel (training cell, or load_model plus the label mapping).
exercise_names = {idx: exercise for exercise, idx in exercise_labels.items()}

# A video with fewer than `sequence_length` detected frames yields an empty X_video;
# predicting on it (or indexing the first prediction) would fail, so guard for it.
if X_video.shape[0] > 0:
    # Predict exercise using the trained model: one probability row per window
    predictions = model.predict(X_video)

    # Convert predictions to exercise labels (most probable class per window)
    predicted_labels = np.argmax(predictions, axis=1)

    # Only the first window's prediction is reported (assuming one sequence is enough)
    predicted_exercise_name = exercise_names[predicted_labels[0]]

    print(f"Predicted exercise: {predicted_exercise_name}")
else:
    print("No video data available for prediction.")

NameError: name 'model' is not defined

In [None]:
# Save the trained model to 'exercise_model.h5' so that later cells can restore it
# with load_model() instead of retraining. Requires `model` to exist in the kernel.
model.save('exercise_model.h5')



In [6]:
from tensorflow.keras.models import load_model

# Load the saved model from the file written by model.save('exercise_model.h5').
# This rebinds the global `model` used by the prediction cells.
model = load_model('exercise_model.h5')



In [14]:
import cv2
import mediapipe as mp
import numpy as np
from tensorflow.keras.models import load_model

# Load the saved model (file written earlier by model.save)
model = load_model('exercise_model.h5')

# Initialize MediaPipe Pose Estimator.
# `pose` is read as a global by extract_pose_landmarks_from_video() defined below.
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5)

# Function to extract pose landmarks from a video
def extract_pose_landmarks_from_video(video_path):
    """Collect MediaPipe pose landmarks for each frame of the video at `video_path`.

    Uses the module-level `pose` estimator. Frames in which no pose is detected are
    skipped, so the returned list may be shorter than the video. A video that cannot
    be opened produces an empty list.

    Args:
        video_path: Path passed to ``cv2.VideoCapture``.

    Returns:
        A list with one item per detected frame; each item is a list of dicts with
        the keys 'x', 'y', 'z' and 'visibility', in MediaPipe's landmark order.
    """
    cap = cv2.VideoCapture(video_path)
    frames = []

    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break  # no more frames (or the read failed)

            # Convert frame to RGB (MediaPipe uses RGB)
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            # Process frame with MediaPipe Pose
            results = pose.process(frame_rgb)
            if not results.pose_landmarks:
                continue  # nothing detected in this frame

            # Extract landmarks for each joint
            frames.append([
                {
                    'x': landmark.x,
                    'y': landmark.y,
                    'z': landmark.z,
                    'visibility': landmark.visibility,
                }
                for landmark in results.pose_landmarks.landmark
            ])
    finally:
        # Release the capture even when an exception interrupts the loop
        cap.release()

    return frames

# Prepare data for the model (same as during training)
def prepare_sequence_data(frames, sequence_length):
    """Split `frames` into back-to-back windows of `sequence_length` flattened frames.

    Each frame (an iterable of joints with "x", "y", "z", "visibility" keys) becomes
    one flat list of 4 values per joint. Leftover frames that cannot fill a complete
    window are discarded. The windows are returned as a numpy array, which is empty
    when there are not enough frames for a single window.
    """
    def flatten_frame(frame_data):
        # [x, y, z, visibility] for every joint, concatenated in joint order
        values = []
        for joint in frame_data:
            values += [joint["x"], joint["y"], joint["z"], joint["visibility"]]
        return values

    last_start = len(frames) - sequence_length
    sequences = []
    for start in range(0, last_start + 1, sequence_length):
        window = frames[start:start + sequence_length]
        sequences.append([flatten_frame(frame_data) for frame_data in window])
    return np.array(sequences)

# Map predicted labels back to exercise names.
# NOTE(review): `exercise_labels` is not defined in this cell; it has to be present
# in the kernel already (it is built in the data-preparation cell).
exercise_names = {idx: exercise for exercise, idx in exercise_labels.items()}

# Video path
video_path = 'example_4.mp4'

# Extract pose data from the video
frames = extract_pose_landmarks_from_video(video_path)

# Prepare data for prediction
sequence_length = 30  # Same as used during training
X_video = prepare_sequence_data(frames, sequence_length)

# A video with fewer than `sequence_length` detected frames produces no windows;
# predicting on that (or taking predicted_labels[0]) would raise, so guard for it.
if X_video.shape[0] > 0:
    # Make predictions: one probability row per window
    predictions = model.predict(X_video)

    # Get the predicted labels (exercise categories)
    predicted_labels = np.argmax(predictions, axis=1)

    # Report the exercise for the first window only
    predicted_exercise_name = exercise_names[predicted_labels[0]]

    print(f"Predicted exercise: {predicted_exercise_name}")
else:
    print("No video data available for prediction.")



[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 553ms/step
Predicted exercise: plank


In [None]:
import cv2
import mediapipe as mp
import numpy as np
import json
from tensorflow.keras.models import load_model
from tqdm import tqdm  # Import tqdm for progress bars

# Load the saved model (file written earlier by model.save)
model = load_model('exercise_model.h5')

# Initialize MediaPipe Pose Estimator.
# `pose` is read as a global by extract_pose_landmarks_from_video() defined below.
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5)

# Function to extract pose landmarks from a video
def extract_pose_landmarks_from_video(video_path):
    """Extract MediaPipe pose landmarks from every frame of a video, with a progress bar.

    Uses the module-level `pose` estimator. Frames without a detected pose are
    skipped (the progress bar still advances for them). A video that cannot be
    opened produces an empty list.

    Args:
        video_path: Path passed to ``cv2.VideoCapture``.

    Returns:
        A list with one item per detected frame; each item is a list of dicts with
        the keys 'x', 'y', 'z' and 'visibility', in MediaPipe's landmark order.
    """
    cap = cv2.VideoCapture(video_path)
    frames = []

    try:
        # The reported frame count can be missing (0 or negative); in that case
        # let tqdm run without a total rather than showing a bogus one.
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        progress_total = total_frames if total_frames > 0 else None

        with tqdm(total=progress_total, desc="Processing frames", ncols=100) as pbar:
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break  # end of stream or read failure

                # Convert frame to RGB (MediaPipe uses RGB)
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

                # Process frame with MediaPipe Pose
                results = pose.process(frame_rgb)

                if results.pose_landmarks:
                    # Extract landmarks for each joint
                    frames.append([
                        {
                            'x': landmark.x,
                            'y': landmark.y,
                            'z': landmark.z,
                            'visibility': landmark.visibility,
                        }
                        for landmark in results.pose_landmarks.landmark
                    ])

                # Update progress bar (once per decoded frame, detected or not)
                pbar.update(1)
    finally:
        # Always free the capture handle, even if decoding or pose estimation raises
        cap.release()

    return frames

# Prepare data for the model (same as during training)
def prepare_sequence_data(frames, sequence_length):
    """Build the model input array from per-frame landmark lists.

    `frames` is cut into consecutive, non-overlapping chunks of `sequence_length`
    frames; an incomplete final chunk is ignored. Inside each chunk every frame is
    flattened to ``[x, y, z, visibility]`` repeated for each joint, in joint order.

    Returns ``np.array`` of the chunks (empty when no full chunk exists).
    """
    chunk_count_range = range(0, len(frames) - sequence_length + 1, sequence_length)

    chunks = []
    for offset in chunk_count_range:
        chunk = []
        for frame_data in frames[offset:offset + sequence_length]:
            flat = [
                value
                for joint in frame_data
                for value in (joint["x"], joint["y"], joint["z"], joint["visibility"])
            ]
            chunk.append(flat)
        chunks.append(chunk)

    return np.array(chunks)

# Load landmarks from JSON file
def load_json_landmarks(json_path):
    """Load a pose JSON file and flatten each frame's landmarks to a list of numbers.

    The file is expected to map exercise name -> list of frames, where each frame is
    a dict with a "landmarks" entry. "landmarks" may be either a mapping of joint
    name -> {"x", "y", "z", "visibility"} (the layout the training cell reads) or a
    plain list of such joint dicts.

    Args:
        json_path: Path of the JSON file to read.

    Returns:
        dict mapping exercise name -> list of flattened frames, each
        ``[x, y, z, visibility, x, y, z, visibility, ...]`` in joint order. Frames
        that are not dicts with a "landmarks" key are reported on stdout and skipped.
    """
    with open(json_path, 'r') as f:
        data = json.load(f)

    # The whole file is deliberately not printed: it floods the output for any
    # realistically sized dataset.

    processed_data = {}
    for exercise_name, frames in data.items():
        exercise_frames = []
        for frame in frames:
            if not (isinstance(frame, dict) and "landmarks" in frame):
                print(f"Invalid frame structure: {frame}")
                continue

            landmarks = frame["landmarks"]
            # Iterating a dict yields its keys (joint names), so take the values
            # when landmarks is a mapping; a list already yields the joint dicts.
            joints = landmarks.values() if isinstance(landmarks, dict) else landmarks

            landmarks_flat = []
            for joint in joints:
                landmarks_flat.extend([joint["x"], joint["y"], joint["z"], joint["visibility"]])
            exercise_frames.append(landmarks_flat)
        processed_data[exercise_name] = exercise_frames
    return processed_data

# Calculate Mean Squared Error between two sets of landmarks
def calculate_mse(video_landmarks, json_landmarks):
    """Return the mean squared difference between two equally shaped landmark arrays.

    Both arguments are converted with ``np.array``. If their shapes differ, both
    shapes are printed and a ``ValueError`` is raised.
    """
    observed = np.array(video_landmarks)
    reference = np.array(json_landmarks)

    # Shapes must agree exactly; no broadcasting is allowed here
    if observed.shape == reference.shape:
        difference = observed - reference
        return np.mean(difference * difference)

    print(f"Video landmarks shape: {observed.shape}")
    print(f"JSON landmarks shape: {reference.shape}")
    raise ValueError("Landmark shapes do not match!")

# Video path
video_path = 'example_1.mp4'

# Load JSON landmarks (exercise name -> list of flattened reference frames)
json_path = 'pose_data.json'  # Path to your JSON file
json_data = load_json_landmarks(json_path)

# Extract pose data from the video
frames = extract_pose_landmarks_from_video(video_path)

# Prepare data for prediction
sequence_length = 30  # Same as used during training
X_video = prepare_sequence_data(frames, sequence_length)

# Make predictions
if X_video.shape[0] > 0:
    predictions = model.predict(X_video)

    # Get the predicted labels (exercise categories)
    predicted_labels = np.argmax(predictions, axis=1)

    # Class ids were assigned by enumerating the keys of this same JSON file, so the
    # key at position `label` is the exercise name. Only the first window is used.
    exercise_name = list(json_data.keys())[predicted_labels[0]]  # Map to exercise name
    print(f"Predicted exercise: {exercise_name}")

    # Extract corresponding JSON landmarks for the predicted exercise (one flat list per frame)
    json_landmarks_sequence = json_data[exercise_name]

    # X_video is (num_windows, sequence_length, features) while the JSON reference is
    # per frame. Undo the windowing so a single video frame is compared with a single
    # reference frame; comparing a whole window with one frame can never match in shape.
    video_frame_landmarks = X_video.reshape(-1, X_video.shape[-1])

    # Ensure we have the same number of frames in both video and JSON
    num_frames = min(len(video_frame_landmarks), len(json_landmarks_sequence))

    # NOTE(review): this assumes the JSON joints are stored in the same order as
    # MediaPipe reports them in the video — confirm, otherwise the MSE is meaningless.
    total_mse = 0
    compared_frames = 0  # frames whose shapes matched and contributed to the total
    with tqdm(total=num_frames, desc="Calculating MSE", ncols=100) as pbar:
        for i in range(num_frames):
            video_landmarks = video_frame_landmarks[i]  # flattened landmarks of video frame i
            json_landmarks_flat = json_landmarks_sequence[i]  # flattened landmarks of reference frame i

            # Calculate MSE for the current frame
            try:
                loss = calculate_mse(video_landmarks, json_landmarks_flat)
                total_mse += loss
                compared_frames += 1
                print(f"Mean Squared Error for frame {i}: {loss}")
            except ValueError as e:
                # Shape mismatch for this frame: report it and keep going
                print(e)

            # Update progress bar
            pbar.update(1)

    # Average only over frames that were actually compared
    average_mse = total_mse / compared_frames if compared_frames > 0 else 0
    print(f"Average Mean Squared Error over {compared_frames} frames: {average_mse}")
else:
    print("No video data available for prediction.")



In [None]:
import json
import numpy as np

# Function to load and process JSON landmarks
def load_json_landmarks(json_path):
    """Read a pose JSON file and flatten every frame's landmarks into one list.

    The file maps exercise name -> list of frames. A valid frame is a dict with a
    "landmarks" mapping of joint name -> {"x", "y", "z", "visibility"}.

    Args:
        json_path: Path of the JSON file to read.

    Returns:
        dict mapping exercise name -> list of flattened frames, each
        ``[x, y, z, visibility, ...]`` in the mapping's joint order. Frames that are
        not dicts containing "landmarks" are printed as invalid and left out.
    """
    with open(json_path, 'r') as f:
        data = json.load(f)

    fields = ("x", "y", "z", "visibility")
    processed_data = {}
    for exercise_name, frames in data.items():
        exercise_frames = []
        for frame in frames:
            is_valid = isinstance(frame, dict) and "landmarks" in frame
            if not is_valid:
                print(f"Invalid frame structure: {frame}")
                continue

            # Joint names are not needed, only each joint's four values
            exercise_frames.append([
                joint_data[field]
                for joint_data in frame["landmarks"].values()
                for field in fields
            ])

        processed_data[exercise_name] = exercise_frames

    return processed_data

# Example usage: flatten the landmarks stored in the dataset file
json_path = 'pose_data.json'  # Path to your JSON file
json_data = load_json_landmarks(json_path)

# Show the processed structure (exercise name -> list of flattened frames) for debugging
pretty_json = json.dumps(json_data, indent=4)
print(pretty_json)