<a href="https://colab.research.google.com/github/Duaa-Raed/Workout-Performance-Analysis/blob/main/Workout_Performance_Analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install kaggle API
!pip install kaggle

# Create a directory to store kaggle.json
!mkdir ~/.kaggle

# Upload kaggle.json from your device
from google.colab import files
files.upload()

In [None]:
# Move kaggle.json to the correct folder and set permissions
!mv kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

In [None]:
# Download data from Kaggle
!kaggle datasets download -d hasyimabdillah/workoutfitness-video

# Unzip the data
!unzip workoutfitness-video.zip -d workout_videos

In [None]:
!pip install --upgrade numpy==1.26.4 mediapipe==0.10.14 --force-reinstall

In [None]:
# Import all necessary libraries
import os
import cv2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import mediapipe as mp
import torch
import torch.nn as nn
import torch.optim as optim
import joblib
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.metrics import confusion_matrix, classification_report
from collections import Counter
from torch.utils.data import DataLoader, TensorDataset
from torch.optim.lr_scheduler import StepLR

In [None]:
# Display the first 10 files in the videos folder

files = os.listdir("workout_videos")
print("Number of videos:", len(files))
print("Names of some files:", files[:5])

In [None]:


# List exercise folders inside the main dataset
folders = os.listdir("workout_videos")
exercise_folder = folders[0]  # for example: "russian twist"
print("Selected exercise folder:", exercise_folder)

# List videos inside that exercise folder
path = os.path.join("workout_videos", exercise_folder)
videos = os.listdir(path)
print("Number of videos in this exercise:", len(videos))
print("First few videos:", videos[:5])

# Select the first video
video_path = os.path.join(path, videos[0])
print("Opening video:", video_path)

# Read the first frame from the video
cap = cv2.VideoCapture(video_path)
ret, frame = cap.read()
cap.release()

if not ret:
    print(" Failed to read the video or video is empty.")
else:
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    plt.imshow(frame)
    plt.title(f"First frame from: {exercise_folder}")
    plt.axis("off")
    plt.show()

In [None]:

# Base folder
base_folder = "workout_videos"

# Select the first exercise folder
exercise_folder = os.path.join(base_folder, os.listdir(base_folder)[0])
print(" Exercise Folder:", exercise_folder)

# Display the first 5 files inside
files_inside = os.listdir(exercise_folder)
print(" Files inside:", files_inside[:5])

# Select the first video inside
video_path = os.path.join(exercise_folder, files_inside[0])
print(" Video Path:", video_path)

# Try to open the video
cap = cv2.VideoCapture(video_path)
ret, frame = cap.read()
cap.release()

if ret:
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    mp_pose = mp.solutions.pose
    pose = mp_pose.Pose(static_image_mode=True, min_detection_confidence=0.5)
    mp_drawing = mp.solutions.drawing_utils

    results = pose.process(frame_rgb)

    annotated_image = frame_rgb.copy()
    mp_drawing.draw_landmarks(
        annotated_image,
        results.pose_landmarks,
        mp_pose.POSE_CONNECTIONS
    )

    plt.imshow(annotated_image)
    plt.title("Pose Estimation Example")
    plt.axis("off")
    plt.show()
else:
    print(" Failed to read the video frame — maybe not a video file.")

In [None]:


# Process the same frame we analyzed before
results = pose.process(frame_rgb)

landmarks = []
if results.pose_landmarks:
    for id, lm in enumerate(results.pose_landmarks.landmark):
        landmarks.append({
            "id": id,             # joint id
            "x": lm.x,            # horizontal coordinate
            "y": lm.y,            # vertical coordinate
            "z": lm.z,            # depth (distance from camera)
            "visibility": lm.visibility  # how visible the joint is
        })

df = pd.DataFrame(landmarks)
print(" Pose landmarks extracted successfully!")
print(df.head(10))


In [None]:


mp_pose = mp.solutions.pose
pose = mp_pose.Pose()

data = []  # list to collect all frames' data

base_dir = "workout_videos"  # folder that contains all exercise folders

# Loop through all exercise folders
for exercise_name in os.listdir(base_dir):
    exercise_path = os.path.join(base_dir, exercise_name)

    # Check if it's a folder (not a file)
    if not os.path.isdir(exercise_path):
        continue

    # Loop through all videos in the exercise folder
    for video_file in os.listdir(exercise_path):
        video_path = os.path.join(exercise_path, video_file)
        cap = cv2.VideoCapture(video_path)

        frame_count = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break  # stop when the video ends

            # Convert colors
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            results = pose.process(frame_rgb)

            if results.pose_landmarks:
                # Extract the (x, y, z) for all landmarks
                landmarks = []
                for lm in results.pose_landmarks.landmark:
                    landmarks.extend([lm.x, lm.y, lm.z])

                # Add exercise label + frame data
                landmarks.append(exercise_name)
                data.append(landmarks)

            frame_count += 1

        cap.release()
        print(f" Processed {video_file} from {exercise_name}")

# Create DataFrame
cols = [f"x{i}" for i in range(33)] + [f"y{i}" for i in range(33)] + [f"z{i}" for i in range(33)] + ["label"]
df = pd.DataFrame(data, columns=cols)

# Save to CSV
df.to_csv("pose_landmarks_dataset.csv", index=False)
print("📁 Dataset saved successfully as 'pose_landmarks_dataset.csv'!")
print(df.shape)
df.head()


In [None]:

# Load the dataset
df = pd.read_csv("pose_landmarks_dataset.csv")
print(" Data loaded successfully!")
print(df.shape)
df.head()

In [None]:
# Separate features and labels
X = df.drop("label", axis=1).values
y = df["label"].values

# Encode labels (e.g., squat → 0, push-up → 1, etc.)
le = LabelEncoder()
y = le.fit_transform(y)

# Scale the features (very important for speeding up training)
scaler = StandardScaler()
X = scaler.fit_transform(X)

# Split into training and testing
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

print(" Data ready for training!")
print("Train shape:", X_train.shape)
print("Test shape:", X_test.shape)

In [None]:
# 🔧 Improved Model
class ExerciseClassifierImproved(nn.Module):
    def __init__(self, input_size, num_classes):
        super(ExerciseClassifierImproved, self).__init__()
        self.fc1 = nn.Linear(input_size, 512)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, 128)
        self.fc4 = nn.Linear(128, num_classes)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.4)

    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.relu(self.fc2(x))
        x = self.dropout(x)
        x = self.relu(self.fc3(x))
        x = self.fc4(x)
        return x

input_size = X_train.shape[1]
num_classes = len(np.unique(y))
model = ExerciseClassifierImproved(input_size, num_classes)

In [None]:

# Ensure data is Tensors
if not isinstance(X_train, torch.Tensor):
    X_train = torch.tensor(X_train, dtype=torch.float32)
    y_train = torch.tensor(y_train, dtype=torch.long)
    X_test = torch.tensor(X_test, dtype=torch.float32)
    y_test = torch.tensor(y_test, dtype=torch.long)

# Create DataLoaders for training and testing
train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32)

In [None]:


criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
scheduler = StepLR(optimizer, step_size=10, gamma=0.5)

num_epochs = 80
loss_list = []

best_loss = float('inf')
patience = 5
trigger = 0

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0

    for batch_X, batch_y in train_loader:
        optimizer.zero_grad()
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    scheduler.step()
    avg_loss = running_loss / len(train_loader)
    loss_list.append(avg_loss)

    # Early stopping
    if avg_loss < best_loss:
        best_loss = avg_loss
        trigger = 0
    else:
        trigger += 1
        if trigger >= patience:
            print(" Early stopping: Model stopped because performance did not improve") # Translated comment
            break

    if (epoch + 1) % 5 == 0:
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}")

In [None]:
# Evaluate on test data
# Immediately after training ends
model.eval()
correct = 0
total = 0

with torch.no_grad():
    for images, labels in test_loader:
        outputs = model(images)
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

accuracy = 100 * correct / total
print(f" Test Accuracy: {accuracy:.2f}%")

In [None]:
# Evaluate on training data
model.eval()
with torch.no_grad():
    outputs_train = model(X_train)
    _, predicted_train = torch.max(outputs_train, 1)
    correct_train = (predicted_train == y_train).sum().item()
    total_train = y_train.size(0)
    train_accuracy = 100 * correct_train / total_train

print(f"🏋️‍♀️ Train Accuracy: {train_accuracy:.2f}%")

In [None]:

# Make predictions on the test data
with torch.no_grad():

    outputs = model(X_test_tensor)
    _, preds = torch.max(outputs, 1)

# Convert numbers to exercise names
y_true = le.inverse_transform(y_test.cpu().numpy())
y_pred = le.inverse_transform(preds.cpu().numpy())

# Confusion Matrix
cm = confusion_matrix(y_true, y_pred, labels=le.classes_)

# Convert to DataFrame for better readability
cm_df = pd.DataFrame(cm, index=le.classes_, columns=le.classes_)

print(" Confusion Matrix:")
print(cm_df)

# Detailed classification report
print("\n Performance Report:")
print(classification_report(y_true, y_pred))

# Table showing only exercises where the model made mistakes
mistakes = []
for true_label, pred_label in zip(y_true, y_pred):
    if true_label != pred_label:
        mistakes.append((true_label, pred_label))

if mistakes:
    mistake_df = pd.DataFrame(mistakes, columns=["True Exercise", "Model Prediction"])
    print("\n Exercises with mistakes:")
    print(mistake_df.value_counts().reset_index(name="Number of Errors"))
else:
    print(" The model did not make any mistakes!")

In [None]:

#  PLOT LEARNING CURVE
plt.plot(loss_list)
plt.title("Learning Curve")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.show()

In [None]:
from google.colab import files

#  Create a folder to save inside Google Colab
save_dir = "/content/trained_model"
os.makedirs(save_dir, exist_ok=True)

#  Save model and component files
torch.save(model.state_dict(), os.path.join(save_dir, "exercise_model.pth"))
joblib.dump(le, os.path.join(save_dir, "label_encoder.pkl"))
joblib.dump(scaler, os.path.join(save_dir, "scaler.pkl"))

print(" Model and preprocessing objects saved successfully in:", save_dir)

#  Download the files to your local machine
files.download(os.path.join(save_dir, "exercise_model.pth"))
files.download(os.path.join(save_dir, "label_encoder.pkl"))
files.download(os.path.join(save_dir, "scaler.pkl"))

In [None]:
from google.colab import files
uploaded = files.upload()

In [None]:
!ls -lh /content/

In [None]:


# The name of the video as it appeared to you
video_path = "document_6046617532410893975.mp4"

cap = cv2.VideoCapture(video_path)

if not cap.isOpened():
    print(" Failed to open the video")
else:
    print(" Video opened successfully")

count = 0
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break
    count += 1

print(f" Number of frames read: {count}")
cap.release()

In [None]:

# Load the tools
le = joblib.load("trained_model/label_encoder.pkl")
scaler = joblib.load("trained_model/scaler.pkl")

# Load the model
input_size = scaler.mean_.shape[0]
num_classes = len(le.classes_)

model = ExerciseClassifierImproved(input_size, num_classes)
model.load_state_dict(torch.load("trained_model/exercise_model.pth", map_location=torch.device('cpu')))
model.eval()

print(" Model loaded successfully!")

# Setup Mediapipe
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5)

# Test the video
video_path = "document_6046617532410893975.mp4"
cap = cv2.VideoCapture(video_path)

if not cap.isOpened():
    print(" Failed to open the video")
else:
    print(" Video opened successfully")

predictions = []

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    results = pose.process(image_rgb)

    if results.pose_landmarks:
        landmarks = []
        for lm in results.pose_landmarks.landmark:
            landmarks.extend([lm.x, lm.y, lm.z])
        landmarks = np.array(landmarks).reshape(1, -1)
        landmarks_scaled = scaler.transform(landmarks)

        with torch.no_grad():
            inputs = torch.tensor(landmarks_scaled, dtype=torch.float32)
            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)
            label = le.inverse_transform([predicted.item()])[0]
            predictions.append(label)

cap.release()

if predictions:
    final_label = Counter(predictions).most_common(1)[0][0]
    print(f" The predicted exercise for the video is: {final_label}")
else:
    print(" No clear body points were detected in the video.")