kurze Projektbeschreibung


In [1]:
import cv2
from ultralytics import YOLO
from PIL import Image
import numpy as np
import os
import csv
import pandas as pd


In [2]:
# Load YOLO pose model
model_pose = YOLO('yolo11n-pose.pt')


# Define training exercise and corresponding input and output folders

input_folder = "videos/single_pushup_videos/"
output_folder = "keyjoints/pushups/"

#input_folder = "videos/single_squat_videos/"
#output_folder = "keyjoints/squats/"

#input_folder = "videos/single_pullup_videos/"
#output_folder = "keyjoints/pullups/"

os.makedirs(output_folder, exist_ok=True)


Aus dem input Ordner werden single videos geladen und alle keypoints pro video in einer csv gespeichert. Für jeden Ordner einmal durchführen, dauert jeweils ca. 15-20 min.

In [3]:
# List all video files in the input folder
video_files = [f for f in os.listdir(input_folder) if f.lower().endswith(('.mp4', '.mov'))]

for video_file in video_files:
    # Define corresponding CSV path for the video
    output_csv_path = os.path.join(output_folder, os.path.splitext(video_file)[0] + ".csv")
    
    # Skip if CSV already exists
    if os.path.exists(output_csv_path):
        print(f"⏭️ Skipping {video_file} (CSV already exists)")
        continue

    video_path = os.path.join(input_folder, video_file)
    cap = cv2.VideoCapture(video_path)
    frame_idx = 0
    fps = cap.get(cv2.CAP_PROP_FPS)

    output_rows = []

    # Create column names: 17 keypoints × (x, y, confidence)
    xy_headers = [f"kp_{i}_{coord}" for i in range(17) for coord in ("x", "y")]
    conf_headers = [f"kp_{i}_conf" for i in range(17)]
    columns = ["frame", "time_sec"] + xy_headers + conf_headers

    # Process each frame in the video
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        timestamp = frame_idx / fps
        results = model_pose.predict(source=frame, save=False, conf=0.25, verbose=False)

        for result in results:
            flattened_xy = [np.nan] * (17 * 2)
            confidences = [np.nan] * 17

            try:
                keypoints = result.keypoints.xy[0].cpu().numpy()
                confidences = result.keypoints.conf[0].cpu().numpy()

                flattened_xy = []
                for i in range(17):
                    # Skip low-confidence points
                    if i >= len(confidences) or confidences[i] < 0.2:
                        x, y = np.nan, np.nan
                    else:
                        x, y = keypoints[i]
                    flattened_xy.extend([x, y])
            except:
                pass  # use pre-filled NaNs if prediction failed

            output_rows.append([frame_idx, timestamp] + flattened_xy + confidences.tolist())

        frame_idx += 1

    cap.release()

    # Create DataFrame and replace zeros with NaNs
    df_out = pd.DataFrame(output_rows, columns=columns)
    keypoint_cols = [col for col in df_out.columns if col.startswith("kp_")]
    df_out[keypoint_cols] = df_out[keypoint_cols].replace(0.0, np.nan)

    # Ensure output directory exists and save CSV
    os.makedirs(output_folder, exist_ok=True)
    df_out.to_csv(output_csv_path, index=False, na_rep="NaN")
    print(f"✅ Saved: {output_csv_path}")


⏭️ Skipping single_pushup_fabi1_1.mov (CSV already exists)
⏭️ Skipping mirrored_single_pushup_random4.mov (CSV already exists)
⏭️ Skipping mirrored_single_pushup_random5.mov (CSV already exists)
⏭️ Skipping single_pushup_fabi1_2.mov (CSV already exists)
⏭️ Skipping mirrored_single_pushup_fabi1_8.mov (CSV already exists)
⏭️ Skipping mirrored_single_pushup_random7.mov (CSV already exists)
⏭️ Skipping mirrored_single_pushup_random6.mov (CSV already exists)
⏭️ Skipping mirrored_single_pushup_fabi1_9.mov (CSV already exists)
⏭️ Skipping single_pushup_fabi1_3.mov (CSV already exists)
⏭️ Skipping single_pushup_fabi1_7.mov (CSV already exists)
⏭️ Skipping single_pushup_random8.mov (CSV already exists)
⏭️ Skipping mirrored_single_pushup_random2.mov (CSV already exists)
⏭️ Skipping mirrored_single_pushup_random3.mov (CSV already exists)
⏭️ Skipping single_pushup_random9.mov (CSV already exists)
⏭️ Skipping single_pushup_fabi1_6.mov (CSV already exists)
⏭️ Skipping single_pushup_fabi1_4.mov (CSV 

Sampling (alt und neu --> testen, was besser oder gleich??)

Iterate through all csv files from the single exercise videos and extract sampled csv files with only six frames describing an exercise.

In [4]:
# Robust sampling of 6 valid frames from a CSV containing keypoints
def sample_csv(input_csv_path, output_csv_path, num_samples=6, min_valid_keypoints=30):
    if os.path.exists(output_csv_path):
        print(f"⏭️ Skipping {output_csv_path} (already exists)")
        return

    df = pd.read_csv(input_csv_path)
    total_frames = len(df)

    # Skip if not enough frames in the CSV
    if total_frames < num_samples + 2:
        print(f"⚠️ Not enough frames in {input_csv_path} ({total_frames} < {num_samples + 2}) — skipped.")
        return

    # Exclude first and last frames to avoid unstable keypoint data
    df = df.iloc[1:-1].reset_index(drop=True)

    # Check for frame validity: count non-NaN keypoint values (excluding 'frame' and 'time_sec')
    keypoint_cols = df.columns[2:]  # assumes first two columns are frame index and timestamp
    df['valid_kp_count'] = df[keypoint_cols].notna().sum(axis=1)

    # Filter out frames with too many missing values
    df_valid = df[df['valid_kp_count'] >= min_valid_keypoints].drop(columns='valid_kp_count')

    if len(df_valid) < num_samples:
        print(f"⚠️ Not enough valid frames in {input_csv_path} ({len(df_valid)} valid) — skipped.")
        return

    # Evenly sample from valid frames
    sampled_idxs = np.linspace(0, len(df_valid) - 1, num_samples, dtype=int)
    df_sampled = df_valid.iloc[sampled_idxs].reset_index(drop=True)

    df_sampled.to_csv(output_csv_path, index=False)
    print(f"✅ Saved: {output_csv_path}")

# Input and output folders based on your project structure
base_input_dir = "keyjoints"
base_output_dir = "keyjoints_sampled"
exercise_types = ["pushups", "squats", "pullups"]

# Loop through each exercise type
for exercise in exercise_types:
    input_folder = os.path.join(base_input_dir, exercise)
    output_folder = os.path.join(base_output_dir, f"{exercise}_sampled")
    os.makedirs(output_folder, exist_ok=True)

    csv_files = [f for f in os.listdir(input_folder) if f.endswith(".csv")]

    for csv_file in csv_files:
        input_csv = os.path.join(input_folder, csv_file)
        output_csv = os.path.join(output_folder, csv_file)
        sample_csv(input_csv, output_csv)


⏭️ Skipping keyjoints_sampled/pushups_sampled/single_pushup_fabi2_2.csv (already exists)
⏭️ Skipping keyjoints_sampled/pushups_sampled/mirrored_single_pushup_fabi2_8.csv (already exists)
⏭️ Skipping keyjoints_sampled/pushups_sampled/mirrored_single_pushup_fabi2_9.csv (already exists)
⏭️ Skipping keyjoints_sampled/pushups_sampled/single_pushup_fabi2_3.csv (already exists)
⏭️ Skipping keyjoints_sampled/pushups_sampled/single_pushup_fabi2_1.csv (already exists)
⏭️ Skipping keyjoints_sampled/pushups_sampled/single_pushup_fabi2_4.csv (already exists)
⏭️ Skipping keyjoints_sampled/pushups_sampled/single_pushup_fabi2_5.csv (already exists)
⏭️ Skipping keyjoints_sampled/pushups_sampled/single_pushup_fabi2_7.csv (already exists)
⏭️ Skipping keyjoints_sampled/pushups_sampled/single_pushup_fabi2_6.csv (already exists)
⏭️ Skipping keyjoints_sampled/pushups_sampled/mirrored_single_pushup_random6.csv (already exists)
⏭️ Skipping keyjoints_sampled/pushups_sampled/single_pushup_fabi1_3.csv (already ex

Aus den samples wird ein feature dataset kreiert, das alle keyjoints x und y und berechnete joint features für jede Übung labelt.

In [5]:
# feature functions

def angle_between_points(p1, p2, p3):
    """Compute the angle at p2 between points p1 and p3."""
    a = np.array([p1[0] - p2[0], p1[1] - p2[1]])
    b = np.array([p3[0] - p2[0], p3[1] - p2[1]])
    if np.any(np.isnan(a)) or np.any(np.isnan(b)) or np.linalg.norm(a) == 0 or np.linalg.norm(b) == 0:
        return np.nan
    cos_angle = np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
    return np.degrees(np.arccos(np.clip(cos_angle, -1.0, 1.0)))

def euclidean_distance(p1, p2):
    if np.any(np.isnan(p1)) or np.any(np.isnan(p2)):
        return np.nan
    return np.linalg.norm(np.array(p1) - np.array(p2))

def compute_joint_features(df):
    features = []

    for _, row in df.iterrows():
        kp = lambda i: (row[f"kp_{i}_x"], row[f"kp_{i}_y"])

        feat = {
            "feat_elbow_angle_L": angle_between_points(kp(5), kp(7), kp(9)),   # Shoulder–Elbow–Wrist
            "feat_elbow_angle_R": angle_between_points(kp(6), kp(8), kp(10)),
            "feat_knee_angle_L": angle_between_points(kp(11), kp(13), kp(15)), # Hip–Knee–Ankle
            "feat_knee_angle_R": angle_between_points(kp(12), kp(14), kp(16)),
            "feat_hip_angle_L": angle_between_points(kp(5), kp(11), kp(13)),   # Shoulder–Hip–Knee
            "feat_hip_angle_R": angle_between_points(kp(6), kp(12), kp(14)),
            "feat_shoulder_width": euclidean_distance(kp(5), kp(6)),
            "feat_hip_to_wrist_L": euclidean_distance(kp(11), kp(9)),
            "feat_hip_to_wrist_R": euclidean_distance(kp(12), kp(10)),
            "feat_knee_to_ankle_L": euclidean_distance(kp(13), kp(15)),
            "feat_knee_to_ankle_R": euclidean_distance(kp(14), kp(16)),
            "feat_hip_y": row["kp_11_y"],  # raw y of left hip as proxy for height
        }
        features.append(feat)

    return pd.DataFrame(features)

def drop_mostly_empty_columns(df, threshold=0.8):
    """Drop columns with more than `threshold` proportion of NaNs."""
    nan_ratio = df.isna().mean()
    drop_cols = nan_ratio[nan_ratio > threshold].index.tolist()
    print(f"🧹 Dropping {len(drop_cols)} mostly empty columns.")
    return df.drop(columns=drop_cols)


In [9]:
from sklearn.impute import SimpleImputer

# Input base folder
base_input_dir = "keyjoints_sampled"
exercise_labels = {
    "pushups_sampled": 0,
    "squats_sampled": 1,
    "pullups_sampled": 2
}

X = []
y = []
column_names = None

# Loop over each label type (exercise class)
for folder_name, label in exercise_labels.items():
    folder_path = os.path.join(base_input_dir, folder_name)
    if not os.path.exists(folder_path):
        print(f"❌ Folder not found: {folder_path}")
        continue

    for file in os.listdir(folder_path):
        if not file.endswith(".csv"):
            continue
        file_path = os.path.join(folder_path, file)
        df = pd.read_csv(file_path)

        # Extract keypoint columns
        keypoint_cols = [col for col in df.columns if col.startswith("kp_")]
        kp_data = df[keypoint_cols]

        # Compute engineered features
        df_features = compute_joint_features(df)

        # Combine keypoints and features
        combined_data = pd.concat([kp_data, df_features], axis=1)
        
        # Save column names once
        # Save column names once
        if column_names is None:
            feature_cols = combined_data.columns.tolist()
            num_frames = combined_data.shape[0]
            column_names = [f"{col}_f{frame}" for frame in range(num_frames) for col in feature_cols]

        # Flatten and impute
        flat = combined_data.to_numpy().flatten().reshape(1, -1)
        imputer = SimpleImputer(strategy="mean")
        flat_imputed = imputer.fit_transform(flat)

        X.append(flat_imputed.flatten())
        y.append(label)

print(f"✅ Total samples processed: {len(X)}")

# Save dataset and inspect missingness
X_df = pd.DataFrame(X, columns=column_names)
X_df["label"] = y

# Drop mostly empty columns
def drop_mostly_empty_columns(df, threshold=0.8):
    na_ratio = df.isna().mean()
    drop_cols = na_ratio[na_ratio > threshold].index
    print(f"🧹 Dropping {len(drop_cols)} mostly empty columns.")
    return df.drop(columns=drop_cols)

X_df = drop_mostly_empty_columns(X_df, threshold=0.8)

# Show NaN stats
print("\n🔎 Top 10 columns with highest NaN ratio:")
print(X_df.isna().mean().sort_values(ascending=False).head(10))

# Save used column names
columns_used = X_df.drop(columns=["label"]).columns.tolist()
with open("used_feature_columns.txt", "w") as f:
    for col in columns_used:
        f.write(f"{col}\n")
print("✅ Saved column names to used_feature_columns.txt")

# Save full dataset
X_df.to_csv("feature_dataset.csv", index=False)
print("✅ Saved feature matrix to: feature_dataset.csv")




✅ Total samples processed: 264
🧹 Dropping 12 mostly empty columns.

🔎 Top 10 columns with highest NaN ratio:
kp_16_conf_f5    0.776515
kp_15_conf_f5    0.776515
kp_14_conf_f5    0.719697
kp_13_conf_f5    0.719697
kp_12_conf_f5    0.685606
kp_11_conf_f5    0.685606
kp_10_conf_f5    0.647727
kp_9_conf_f5     0.647727
kp_8_conf_f5     0.617424
kp_7_conf_f5     0.617424
dtype: float64
✅ Saved column names to used_feature_columns.txt
✅ Saved feature matrix to: feature_dataset.csv


Train exercise recognition model with sklearn using previous generated feature dataset.

In [10]:
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, confusion_matrix
import joblib

# Load your dataset
df = pd.read_csv("feature_dataset.csv") 

# Split into features and labels
X = df.drop(columns=["label"])
y = df["label"]

# Train/test split (z. B. 80% Training, 20% Test)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train Random Forest
clf = RandomForestClassifier(n_estimators=100, random_state=42)
clf.fit(X_train, y_train)

# Evaluate
y_pred = clf.predict(X_test)
print("✅ Classification report:\n", classification_report(y_test, y_pred))
print("🧩 Confusion matrix:\n", confusion_matrix(y_test, y_pred))

# Optional: Save model
joblib.dump(clf, "exercise_classifier.pkl")
print("💾 Model saved as: exercise_classifier.pkl")


✅ Classification report:
               precision    recall  f1-score   support

           0       1.00      1.00      1.00        17
           1       1.00      1.00      1.00        17
           2       1.00      1.00      1.00        19

    accuracy                           1.00        53
   macro avg       1.00      1.00      1.00        53
weighted avg       1.00      1.00      1.00        53

🧩 Confusion matrix:
 [[17  0  0]
 [ 0 17  0]
 [ 0  0 19]]
💾 Model saved as: exercise_classifier.pkl


In [11]:
# zusätzliche Validierung: Cross-Validation

from sklearn.model_selection import cross_val_score

# Cross-validation auf dem gesamten Datensatz
cv_scores = cross_val_score(clf, X, y, cv=5)  # z. B. 5-Fold

print("📊 Cross-validation scores:", cv_scores)
print("📈 Mean accuracy:", round(cv_scores.mean(), 4))
print("📉 Standard deviation:", round(cv_scores.std(), 4))


📊 Cross-validation scores: [          1     0.96226           1           1     0.94231]
📈 Mean accuracy: 0.9809
📉 Standard deviation: 0.0242
