In [17]:
# Essential Imports
import cv2
import numpy as np
import os
import time
import torch
import pathlib
import math
import re
import matplotlib.pyplot as plt
from SimpleHRNet import SimpleHRNet
from fastdtw import fastdtw
from scipy.spatial.distance import euclidean
from scipy.signal import find_peaks
import scipy.ndimage

In [18]:
MODEL_PATH = pathlib.Path("C:/Users/michel.marien_icarew/Documents/Privé/Opleiding/Mastervakken/Deep Neural Networks/Opdacht 2/simple-HRNet/weights/pose_hrnet_w48_256x192.pth") # Path to HRNet weights
BASELINE_ROOT_DIR = "C:/Users/michel.marien_icarew/Documents/Privé/Opleiding/Mastervakken/Deep Neural Networks/Opdacht 2/baselines2"
VIDEO_TO_RECOGNIZE_PATH = "C:/Users/michel.marien_icarew/Documents/Privé/Opleiding/Mastervakken/Deep Neural Networks/Opdacht 2/test_videos/test_excercise.mp4" # Video file to analyze

EXPECTED_JOINT_COUNT = 17 
JOINT_NAMES_LIST = ["Nose","Left Eye","Right Eye","Left Ear","Right Ear","Left Shoulder",
                    "Right Shoulder","Left Elbow","Right Elbow","Left Wrist","Right Wrist",
                    "Left Hip","Right Hip","Left Knee","Right Knee","Left Ankle","Right Ankle"]

TARGET_REP_LENGTH = 100      # Fixed number of frames for timenormalized sequences
DTW_DISTANCE_THRESHOLD = 500.0 # Max DTW distance to consider a match

DEFAULT_SMOOTHING_WINDOW = 5#2 #5   # Frames for moving average smoothing
DEFAULT_PEAK_PROMINENCE = 0.05#0.01 #0.05 # Relative prominence for peak detection (0.0 to 1.0)
DEFAULT_PEAK_DISTANCE = 10#5#10     # Minimum frames between detected peaks/reps

TUNING_PROMINENCE_RANGE = [0.02, 0.04, 0.06, 0.08, 0.10] # Example range
TUNING_DISTANCE_RANGE = [5, 8, 10, 12, 15, 20]      # Example range

print("Constants and configuration set (including tuning ranges).")

print("Constants and configuration set.")
print(f"Baseline Directory: {BASELINE_ROOT_DIR}")
print(f"Model location: {MODEL_PATH}")
print(f"Video to Recognize: {VIDEO_TO_RECOGNIZE_PATH}")
print(f"DTW Threshold: {DTW_DISTANCE_THRESHOLD}")

Constants and configuration set (including tuning ranges).
Constants and configuration set.
Baseline Directory: C:/Users/michel.marien_icarew/Documents/Privé/Opleiding/Mastervakken/Deep Neural Networks/Opdacht 2/baselines2
Model location: C:\Users\michel.marien_icarew\Documents\Privé\Opleiding\Mastervakken\Deep Neural Networks\Opdacht 2\simple-HRNet\weights\pose_hrnet_w48_256x192.pth
Video to Recognize: C:/Users/michel.marien_icarew/Documents/Privé/Opleiding/Mastervakken/Deep Neural Networks/Opdacht 2/test_videos/test_excercise.mp4
DTW Threshold: 500.0


In [19]:
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {DEVICE}")

HRNET_MODEL = None
try:
    HRNET_MODEL = SimpleHRNet(c=48, 
                              nof_joints=EXPECTED_JOINT_COUNT,
                              checkpoint_path=MODEL_PATH,
                              device=DEVICE,
                              multiperson=False)
    print("SimpleHRNet model loaded successfully.")
except NameError:
    print("Error: SimpleHRNet class not found. Check import in Cell 1.")
except FileNotFoundError:
    print(f"Error: Model checkpoint file not found at {MODEL_PATH}")
except Exception as e:
    print(f"Error loading SimpleHRNet model: {e}")


Using device: cpu
device: 'cpu'
SimpleHRNet model loaded successfully.


In [22]:
def get_keypoint(frame_kps, kp_index):
    """Safely get keypoint coordinates (x, y) from a frame's keypoint array."""
    if frame_kps is not None and kp_index < frame_kps.shape[0]:
        coords = frame_kps[kp_index, :2] # Take only x, y
        if not np.isnan(coords).any():
            return coords
    return None

def calculate_angle(p1, p2, p3):
    """Calculates the angle (in degrees) at p2 formed by p1p2p3."""
    if p1 is None or p2 is None or p3 is None: return None
    v1, v2 = np.array(p1) - np.array(p2), np.array(p3) - np.array(p2)
    mag1, mag2 = np.linalg.norm(v1), np.linalg.norm(v2)
    if mag1 * mag2 == 0: return None
    cos_angle = np.clip(np.dot(v1, v2) / (mag1 * mag2), 1.0, 1.0)
    return np.degrees(np.arccos(cos_angle))

def handle_nan_values(sequence):
    """Handles NaN values in a pose sequence (frames, coords). Removes frames containing any NaNs."""
    if sequence is None or sequence.ndim != 2 or sequence.shape[0] == 0:
        return np.empty((0, sequence.shape[1] if sequence is not None and sequence.ndim == 2 else 0))
    valid_frames_mask = ~np.isnan(sequence).any(axis=1)
    return sequence[valid_frames_mask]

def time_normalize_sequence(sequence, target_length):
    """Resamples a pose sequence (frames, coords) to a target length using linear interpolation."""
    if sequence is None or sequence.shape[0] < 2:
        num_coords = sequence.shape[1] if sequence is not None and sequence.ndim == 2 else EXPECTED_JOINT_COUNT * 2
        return np.full((target_length, num_coords), np.nan)

    num_frames, num_coords = sequence.shape
    original_indices = np.linspace(0, num_frames - 1, num_frames)
    target_indices = np.linspace(0, num_frames - 1, target_length)
    normalized_sequence = np.zeros((target_length, num_coords))

    for j in range(num_coords):
        valid_mask = ~np.isnan(sequence[:, j])
        if np.sum(valid_mask) < 2:
            normalized_sequence[:, j] = np.nan
        else:
            try:
                normalized_sequence[:, j] = np.interp(target_indices, original_indices[valid_mask], sequence[valid_mask, j])
            except Exception:
                normalized_sequence[:, j] = np.nan

    nan_mask = np.isnan(normalized_sequence)
    if np.any(nan_mask):
        idx = np.where(~nan_mask, np.arange(nan_mask.shape[0])[:, None], 0)
        np.maximum.accumulate(idx, axis=0, out=idx)
        normalized_sequence[nan_mask] = normalized_sequence[idx[nan_mask], np.nonzero(nan_mask)[1]]
        nan_mask = np.isnan(normalized_sequence)
        if np.any(nan_mask):
            idx = np.where(~nan_mask, np.arange(nan_mask.shape[0])[:, None], nan_mask.shape[0] - 1)
            idx = np.minimum.accumulate(idx[::1], axis=0)[::1]
            normalized_sequence[nan_mask] = normalized_sequence[idx[nan_mask], np.nonzero(nan_mask)[1]]

    return np.nan_to_num(normalized_sequence, nan=0.0)

print("Helper functions defined.")

Helper functions defined.


In [34]:
def get_keypoint(frame_kps, kp_index):
    """Safely get keypoint coordinates (x, y) from a frame's keypoint array."""
    if frame_kps is not None and kp_index < frame_kps.shape[0]:
        coords = frame_kps[kp_index, :2]
        if not np.isnan(coords).any():
            return coords
    return None

def calculate_angle(p1, p2, p3):
    """Calculates the angle (in degrees) at p2 formed by p1-p2-p3."""
    if p1 is None or p2 is None or p3 is None: return None
    # Corrected vector calculation
    v1, v2 = np.array(p1) - np.array(p2), np.array(p3) - np.array(p2)
    mag1, mag2 = np.linalg.norm(v1), np.linalg.norm(v2)
    if mag1 * mag2 == 0: return None
    cos_angle = np.clip(np.dot(v1, v2) / (mag1 * mag2), -1.0, 1.0)
    return np.degrees(np.arccos(cos_angle))

def handle_nan_values(sequence):
    """Handles NaN values in a pose sequence (frames, coords). Removes frames containing any NaNs."""
    if sequence is None or sequence.ndim != 2 or sequence.shape[0] == 0:
        return np.empty((0, sequence.shape[1] if sequence is not None and sequence.ndim == 2 else 0))
    valid_frames_mask = ~np.isnan(sequence).any(axis=1)
    return sequence[valid_frames_mask]

def time_normalize_sequence(sequence, target_length):
    """Resamples a pose sequence (frames, coords) to a target length using linear interpolation."""
    if sequence is None or sequence.shape[0] < 2:
        num_coords = sequence.shape[1] if sequence is not None and sequence.ndim == 2 else EXPECTED_JOINT_COUNT * 2
        return np.full((target_length, num_coords), np.nan)

    num_frames, num_coords = sequence.shape
    original_indices = np.linspace(0, num_frames - 1, num_frames)
    target_indices = np.linspace(0, num_frames - 1, target_length)
    normalized_sequence = np.zeros((target_length, num_coords))

    for j in range(num_coords):
        valid_mask = ~np.isnan(sequence[:, j])
        if np.sum(valid_mask) < 2:
            normalized_sequence[:, j] = np.nan
        else:
            try:
                normalized_sequence[:, j] = np.interp(target_indices, original_indices[valid_mask], sequence[valid_mask, j])
            except Exception:
                normalized_sequence[:, j] = np.nan

    nan_mask = np.isnan(normalized_sequence)
    if np.any(nan_mask):
        idx_f = np.where(~nan_mask, np.arange(nan_mask.shape[0])[:, None], 0)
        np.maximum.accumulate(idx_f, axis=0, out=idx_f)
        valid_f_source_idx = idx_f[nan_mask] < normalized_sequence.shape[0]
        nan_rows, nan_cols = np.nonzero(nan_mask)
        valid_nan_rows = nan_rows[valid_f_source_idx]
        valid_nan_cols = nan_cols[valid_f_source_idx]
        valid_source_indices = idx_f[valid_nan_rows, valid_nan_cols] # Use valid indices only
        normalized_sequence[valid_nan_rows, valid_nan_cols] = normalized_sequence[valid_source_indices, valid_nan_cols]

        # Backward fill
        nan_mask = np.isnan(normalized_sequence)
        if np.any(nan_mask):
             idx_b = np.where(~nan_mask, np.arange(nan_mask.shape[0])[:, None], nan_mask.shape[0] - 1)
             idx_b = np.minimum.accumulate(idx_b[::-1], axis=0)[::-1]
             valid_b_source_idx = idx_b[nan_mask] >= 0
             nan_rows, nan_cols = np.nonzero(nan_mask)
             valid_nan_rows = nan_rows[valid_b_source_idx]
             valid_nan_cols = nan_cols[valid_b_source_idx]
             valid_source_indices = idx_b[valid_nan_rows, valid_nan_cols] # Use valid indices only
             normalized_sequence[valid_nan_rows, valid_nan_cols] = normalized_sequence[valid_source_indices, valid_nan_cols]

    return np.nan_to_num(normalized_sequence, nan=0.0)

def process_single_video(video_path, model, expected_joint_count=17):
    """Processes a single video, extracts joints, and returns a NumPy array (frames, num_coords)."""
    if not os.path.exists(video_path):
        print(f"Error: Video file not found at {video_path}")
        return None
    video = cv2.VideoCapture(video_path)
    if not video.isOpened():
        print(f"Error: Could not open video {video_path}.")
        return None

    video_joints, frame_count = [], 0
    expected_coords = expected_joint_count * 2

    while True:
        ret, frame = video.read()
        if not ret: break
        try:
            joints = model.predict(frame)
            if joints is None or joints.shape[0] == 0 or joints.shape[1] != expected_joint_count:
                frame_coords = np.full(expected_coords, np.nan)
            else:
                frame_coords = joints[0, :, :2].flatten()
                if frame_coords.shape[0] != expected_coords:
                    frame_coords = np.full(expected_coords, np.nan)
        except Exception as e:
            frame_coords = np.full(expected_coords, np.nan)
        video_joints.append(frame_coords)
        frame_count += 1
    video.release()
    return np.array(video_joints) if video_joints else None

def normalize_pose_sequence(pose_sequence, joint_names, ref_joint1_name="Left Shoulder", ref_joint2_name="Right Shoulder"):
    """Spatially normalizes a pose sequence based on reference joints."""
    if pose_sequence is None or pose_sequence.shape[0] == 0: return np.array([])
    try:
        ref_joint1_idx, ref_joint2_idx = joint_names.index(ref_joint1_name), joint_names.index(ref_joint2_name)
    except ValueError:
        print(f"Error: Reference joints '{ref_joint1_name}' or '{ref_joint2_name}' not found.")
        return np.full_like(pose_sequence, np.nan)

    normalized_frames = []
    num_coords = pose_sequence.shape[1]
    if num_coords != len(joint_names) * 2:
         print(f"Warning: Mismatch between expected coords ({len(joint_names)*2}) and actual ({num_coords}) in normalize_pose_sequence.")
         if num_coords % 2 != 0: return np.full_like(pose_sequence, np.nan)

    for frame in pose_sequence:
        try:
             frame_joints = frame.reshape(-1, 2)

             if frame_joints.shape[0] != len(joint_names):
                  normalized_frames.append(np.full(num_coords, np.nan))
                  continue

             ref1 = get_keypoint(frame_joints, ref_joint1_idx)
             ref2 = get_keypoint(frame_joints, ref_joint2_idx)

             if ref1 is None or ref2 is None:
                 normalized_frames.append(np.full(num_coords, np.nan))
                 continue

             center = (np.array(ref1) + np.array(ref2)) / 2.0
             scale = np.linalg.norm(np.array(ref1) - np.array(ref2))

             if scale == 0 or np.isclose(scale, 0): 
                 normalized_frames.append(np.full(num_coords, np.nan))
                 continue

             norm_frame_joints = (frame_joints - center) / scale
             normalized_frames.append(norm_frame_joints.flatten())

        except ValueError as ve: 
            normalized_frames.append(np.full(num_coords, np.nan))
            continue
        except Exception as e: 
            normalized_frames.append(np.full(num_coords, np.nan))
            continue

    return np.array(normalized_frames)


def _extract_trajectory_and_find_peaks(pose_sequence, exercise_label, joint_names, smoothing_window, prominence, distance):
    """Internal helper to get trajectory and find peaks with specific parameters."""
    if pose_sequence is None or pose_sequence.shape[0] < max(distance * 2, smoothing_window) : return None, []

    trajectory, invert_for_valley = None, False
    try: 
        if 'squat' in exercise_label.lower() or 'lunge' in exercise_label.lower():
            lhip_idx, rhip_idx = joint_names.index('Left Hip'), joint_names.index('Right Hip')
            lhip_y, rhip_y = pose_sequence[:, 2 * lhip_idx + 1], pose_sequence[:, 2 * rhip_idx + 1]
            valid_mask = ~np.isnan(lhip_y) & ~np.isnan(rhip_y)
            if not np.any(valid_mask): return None, [] # No valid frames
            trajectory = np.nanmean([lhip_y[valid_mask], rhip_y[valid_mask]], axis=0) 
            trajectory = np.nanmean([pose_sequence[:, 2 * lhip_idx + 1], pose_sequence[:, 2 * rhip_idx + 1]], axis=0) 
            invert_for_valley = True
        elif 'bicep_curl' in exercise_label.lower() or 'curl' in exercise_label.lower():
             lshoulder_idx, lelbow_idx, lwrist_idx = joint_names.index('Left Shoulder'), joint_names.index('Left Elbow'), joint_names.index('Left Wrist')
             angles = [calculate_angle(get_keypoint(f.reshape(-1,2), lshoulder_idx), get_keypoint(f.reshape(-1,2), lelbow_idx), get_keypoint(f.reshape(-1,2), lwrist_idx)) for f in pose_sequence]
             trajectory = np.array([a if a is not None else np.nan for a in angles])
             invert_for_valley = True
        elif 'tricep pushdown' in exercise_label.lower():
             lshoulder_idx, lelbow_idx, lwrist_idx = joint_names.index('Left Shoulder'), joint_names.index('Left Elbow'), joint_names.index('Left Wrist')
             angles = [calculate_angle(get_keypoint(f.reshape(-1,2), lshoulder_idx), get_keypoint(f.reshape(-1,2), lelbow_idx), get_keypoint(f.reshape(-1,2), lwrist_idx)) for f in pose_sequence]
             trajectory = np.array([a if a is not None else np.nan for a in angles])
             invert_for_valley = False
        elif 'tricep dips' in exercise_label.lower():
             lshoulder_idx, rshoulder_idx = joint_names.index('Left Shoulder'), joint_names.index('Right Shoulder')
             trajectory = np.nanmean([pose_sequence[:, 2 * lshoulder_idx + 1], pose_sequence[:, 2 * rshoulder_idx + 1]], axis=0)
             invert_for_valley = True
        else:
             print(f"Warning: Trajectory logic not defined for '{exercise_label}' in _extract_trajectory_and_find_peaks.")
             return None, []

        if trajectory is None or trajectory.shape[0] == 0 or np.all(np.isnan(trajectory)): return None, []
        nan_mask = np.isnan(trajectory)
        if np.any(nan_mask):
            indices = np.arange(len(trajectory))
            valid_indices = indices[~nan_mask]
            if len(valid_indices) < 2: return None, [] 
            trajectory[nan_mask] = np.interp(indices[nan_mask], valid_indices, trajectory[valid_indices])
        # Check again after interpolation
        if np.any(np.isnan(trajectory)):
             trajectory = np.nan_to_num(trajectory, nan=np.nanmean(trajectory))

        if invert_for_valley: trajectory = -trajectory

    except (ValueError, IndexError) as e: return None, []
    except NameError as e: return None, []
    except Exception as e: return None, []

    if len(trajectory) < smoothing_window: return trajectory, []
    smoothed_trajectory = np.convolve(trajectory, np.ones(smoothing_window)/smoothing_window, mode='valid')

    if smoothed_trajectory.shape[0] == 0: return trajectory, []
    data_range = np.ptp(smoothed_trajectory)
    required_prominence = data_range * prominence if data_range > 0 else 0.01

    try:
        smoothed_indices_offset = (len(trajectory) - len(smoothed_trajectory)) // 2 
        peak_indices_smoothed, properties = find_peaks(smoothed_trajectory, prominence=required_prominence, distance=distance)
        peak_indices_original = peak_indices_smoothed + smoothed_indices_offset
        # Ensure indices are within bounds
        peak_indices_original = peak_indices_original[(peak_indices_original >= 0) & (peak_indices_original < len(trajectory))]
        return trajectory, peak_indices_original
    except Exception: return trajectory, []


def segment_repetitions(pose_sequence, exercise_label, joint_names, smoothing_window=DEFAULT_SMOOTHING_WINDOW, peak_prominence=DEFAULT_PEAK_PROMINENCE, peak_distance=DEFAULT_PEAK_DISTANCE):
    """Segments a pose sequence into individual repetitions."""
    _, peak_indices = _extract_trajectory_and_find_peaks(pose_sequence, exercise_label, joint_names, smoothing_window, peak_prominence, peak_distance)
    if peak_indices is None or len(peak_indices) < 2: return []
    repetitions = [pose_sequence[peak_indices[i]:peak_indices[i+1], :] for i in range(len(peak_indices) - 1)]
    return [rep for rep in repetitions if rep.shape[0] > 1]


def tune_counting_parameters(exercise_label, labeled_video_data, joint_names, prominence_range, distance_range):
    """Performs grid search to find optimal peak detection parameters."""
    print(f"  Tuning repetition counting parameters for: {exercise_label}...")
    best_params = (DEFAULT_PEAK_PROMINENCE, DEFAULT_PEAK_DISTANCE)
    min_total_error = float('inf')
    if not labeled_video_data: return best_params

    num_combinations = len(prominence_range) * len(distance_range)
    print(f"    Testing {num_combinations} parameter combinations...")
    default_error = 0

    for sequence, true_reps in labeled_video_data:
         _, peak_indices_def = _extract_trajectory_and_find_peaks(sequence, exercise_label, joint_names, DEFAULT_SMOOTHING_WINDOW, DEFAULT_PEAK_PROMINENCE, DEFAULT_PEAK_DISTANCE)
         calc_reps_def = len(peak_indices_def) if peak_indices_def is not None else 0
         default_error += abs(calc_reps_def - true_reps)

    for p_idx, p in enumerate(prominence_range):
        for d_idx, d in enumerate(distance_range):
            current_total_error = 0
            for sequence, true_reps in labeled_video_data:
                _, peak_indices = _extract_trajectory_and_find_peaks(sequence, exercise_label, joint_names, DEFAULT_SMOOTHING_WINDOW, p, d)
                calculated_reps = len(peak_indices) if peak_indices is not None else 0
                current_total_error += abs(calculated_reps - true_reps)
            if current_total_error < min_total_error:
                min_total_error = current_total_error
                best_params = (p, d)
            if min_total_error == 0: break
        if min_total_error == 0: break


    print(f"    Tuning complete. Best Params: P={best_params[0]:.3f}, D={best_params[1]}, Min Error={min_total_error}. (Default Error={default_error})")
    return best_params


def load_baseline_data(baseline_root_dir, hrnet_model, joint_names, target_rep_length, prominence_range, distance_range,
                       threshold_std_multiplier=2.0, min_reps_for_threshold=3):
    """Loads baselines, tunes rep counting params, averages sequences, AND calculates derived thresholds."""
    averaged_baseline_data, tuned_counting_params, exercise_specific_thresholds = {}, {}, {}
    start_time_total = time.time()
    print(f"Loading, Tuning, Averaging Baselines, and Deriving Thresholds from: {baseline_root_dir}")
    if not os.path.isdir(baseline_root_dir): return averaged_baseline_data, tuned_counting_params, exercise_specific_thresholds

    for exercise_label in sorted(os.listdir(baseline_root_dir)):
        exercise_folder_path = os.path.join(baseline_root_dir, exercise_label)
        if not os.path.isdir(exercise_folder_path): continue
        print(f"\nProcessing baseline exercise: {exercise_label}")
        all_normalized_reps_for_exercise, labeled_data_for_tuning = [], []
        video_files = sorted([f for f in os.listdir(exercise_folder_path) if f.lower().endswith(('.mp4', '.avi', '.mov'))])

        for video_file in video_files:
            video_path = os.path.join(exercise_folder_path, video_file)
            true_reps = None
            match = re.search(r'_(\d+)reps', video_file, re.IGNORECASE)
            if match: true_reps = int(match.group(1))

            raw_seq = process_single_video(video_path, hrnet_model, EXPECTED_JOINT_COUNT)
            if raw_seq is None or raw_seq.shape[0] == 0: continue
            spatially_norm_seq = normalize_pose_sequence(raw_seq, joint_names)
            if spatially_norm_seq is None or np.all(np.isnan(spatially_norm_seq)): continue
            cleaned_seq = handle_nan_values(spatially_norm_seq)
            if cleaned_seq.shape[0] < 2: continue
            if true_reps is not None: labeled_data_for_tuning.append((cleaned_seq, true_reps))

            repetitions = segment_repetitions(cleaned_seq, exercise_label, joint_names)
            for rep_segment in repetitions:
                if rep_segment is not None and rep_segment.shape[0] >= 2:
                    time_norm_rep = time_normalize_sequence(rep_segment, target_rep_length)
                    if not np.isnan(time_norm_rep).all(): all_normalized_reps_for_exercise.append(time_norm_rep)

        if labeled_data_for_tuning:
            tuned_p, tuned_d = tune_counting_parameters(exercise_label, labeled_data_for_tuning, joint_names, prominence_range, distance_range)
            tuned_counting_params[exercise_label] = (tuned_p, tuned_d)
        else: tuned_counting_params[exercise_label] = (DEFAULT_PEAK_PROMINENCE, DEFAULT_PEAK_DISTANCE)

        averaged_sequence = None
        if len(all_normalized_reps_for_exercise) > 0:
            reps_stack = np.stack(all_normalized_reps_for_exercise, axis=0)
            averaged_sequence = np.nanmean(reps_stack, axis=0)
            averaged_baseline_data[exercise_label] = np.nan_to_num(averaged_sequence, nan=0.0)
        else: exercise_specific_thresholds[exercise_label] = None; continue

        derived_threshold = None
        if averaged_sequence is not None and len(all_normalized_reps_for_exercise) >= min_reps_for_threshold:
            intra_distances = []
            safe_avg = np.nan_to_num(averaged_sequence, nan=0.0)
            for norm_rep in all_normalized_reps_for_exercise:
                safe_rep = np.nan_to_num(norm_rep, nan=0.0)
                if safe_rep.shape == safe_avg.shape:
                    try: dist, _ = fastdtw(safe_rep, safe_avg, dist=euclidean); intra_distances.append(dist)
                    except Exception: pass 
            if len(intra_distances) >= min_reps_for_threshold:
                 mean_dist, std_dist = np.mean(intra_distances), np.std(intra_distances)
                 derived_threshold = mean_dist + threshold_std_multiplier * std_dist
                 print(f"    Derived Threshold for {exercise_label}: {derived_threshold:.2f} (Mean={mean_dist:.2f}, Std={std_dist:.2f})")
        exercise_specific_thresholds[exercise_label] = derived_threshold

    print(f"\nBaseline processing complete in {time.time() - start_time_total:.2f}s.")
    print(f"Averaged baselines: {list(averaged_baseline_data.keys())}")
    print(f"Tuned count params: {tuned_counting_params}")
    print(f"Derived thresholds: {exercise_specific_thresholds}")
    return averaged_baseline_data, tuned_counting_params, exercise_specific_thresholds


def count_repetitions(pose_sequence, exercise_label, joint_names, prominence=DEFAULT_PEAK_PROMINENCE, distance=DEFAULT_PEAK_DISTANCE, smoothing_window=DEFAULT_SMOOTHING_WINDOW):
    """Counts repetitions using the core helper logic with specified parameters."""
    _, peak_indices = _extract_trajectory_and_find_peaks(pose_sequence, exercise_label, joint_names, smoothing_window, prominence, distance)
    return len(peak_indices) if peak_indices is not None else 0


def recognize_exercise(new_video_path, averaged_baseline_data, hrnet_model, joint_names, distance_threshold, target_rep_length, tuned_counting_params):
    """Recognizes exercise using tuned segmentation and per-rep comparison."""
    print(f"\n--- Recognizing Exercise (Per Repetition / Tuned Seg) for: {os.path.basename(new_video_path)} ---")
    start_time = time.time()
    if tuned_counting_params is None: tuned_counting_params = {}
    default_p, default_d = DEFAULT_PEAK_PROMINENCE, DEFAULT_PEAK_DISTANCE

    new_raw_seq = process_single_video(new_video_path, hrnet_model, EXPECTED_JOINT_COUNT)
    if new_raw_seq is None or new_raw_seq.shape[0] == 0: return [], 0
    new_spatially_norm_seq = normalize_pose_sequence(new_raw_seq, joint_names)
    if new_spatially_norm_seq is None or np.all(np.isnan(new_spatially_norm_seq)): return [], 0
    new_sequence_clean = handle_nan_values(new_spatially_norm_seq)
    if new_sequence_clean.shape[0] < 2: return [], 0
    new_sequence_time_norm = time_normalize_sequence(new_sequence_clean, target_rep_length)
    if np.isnan(new_sequence_time_norm).all(): return [], 0
    new_sequence_time_norm = np.nan_to_num(new_sequence_time_norm, nan=0.0)

    initial_min_distance, initial_best_label = float('inf'), None
    segment_p, segment_d = default_p, default_d
    segment_label_guess = 'unknown'
    if not averaged_baseline_data: print("Error: No baseline data for initial guess."); return [], 0
    for ex_label, avg_base_seq in averaged_baseline_data.items():
        if new_sequence_time_norm.shape != avg_base_seq.shape: continue
        safe_base = np.nan_to_num(avg_base_seq, nan=0.0)
        try: dist, _ = fastdtw(new_sequence_time_norm, safe_base, dist=euclidean)
        except Exception: continue
        if dist < initial_min_distance: initial_min_distance, initial_best_label = dist, ex_label

    if initial_best_label and initial_min_distance < distance_threshold:
        tuned_p, tuned_d = tuned_counting_params.get(initial_best_label, (default_p, default_d))
        segment_p, segment_d = tuned_p, tuned_d
        segment_label_guess = initial_best_label
        print(f"  Initial guess: {initial_best_label}. Using its tuned params for segmentation (P={segment_p:.3f}, D={segment_d}).")
    else:
        print(f"  Initial guess inconclusive. Using default params for segmentation.")
        if initial_best_label: segment_label_guess = initial_best_label

    test_repetitions = segment_repetitions(new_sequence_clean, segment_label_guess, joint_names, peak_prominence=segment_p, peak_distance=segment_d)
    num_detected_reps = len(test_repetitions)
    print(f"  Detected {num_detected_reps} potential repetitions in test video.")
    if num_detected_reps == 0: return [], 0

    repetition_results = []
    print("  Comparing each detected repetition against averaged baselines...")
    for i, rep_segment in enumerate(test_repetitions):
        if rep_segment is None or rep_segment.shape[0] < 2:
             repetition_results.append( (i+1, "Error: Invalid Segment", float('inf')) ); continue
        time_norm_rep_segment = time_normalize_sequence(rep_segment, target_rep_length)
        if np.isnan(time_norm_rep_segment).all():
            repetition_results.append( (i+1, "Error: Time Norm Failed", float('inf')) ); continue
        time_norm_rep_segment = np.nan_to_num(time_norm_rep_segment, nan=0.0)

        min_rep_distance, best_rep_label_for_this_rep = float('inf'), None
        for exercise_label, avg_baseline_seq in averaged_baseline_data.items():
            if time_norm_rep_segment.shape != avg_baseline_seq.shape: continue
            safe_baseline = np.nan_to_num(avg_baseline_seq, nan=0.0)
            try: distance, _ = fastdtw(time_norm_rep_segment, safe_baseline, dist=euclidean)
            except Exception: continue
            if distance < min_rep_distance: min_rep_distance, best_rep_label_for_this_rep = distance, exercise_label

        matched_label_for_rep = "No Match / Inconclusive"
        if best_rep_label_for_this_rep and min_rep_distance < distance_threshold:
            matched_label_for_rep = best_rep_label_for_this_rep
        repetition_results.append( (i+1, matched_label_for_rep, min_rep_distance) )

    end_time = time.time()
    print(f"--- Recognition finished in {end_time - start_time:.2f}s ---")
    return repetition_results, num_detected_reps

Defining Core Processing Functions...
Core processing functions defined (Cell 5 - Final Version).


In [35]:

import os
import numpy as np

print(" Running Configuration PreCheck ")
config_ok = True
error_messages = []

if 'MODEL_PATH' not in locals() or not MODEL_PATH:
    error_messages.append("❌ MODEL_PATH: Not defined.")
    config_ok = False
elif not os.path.exists(MODEL_PATH):
    error_messages.append(f"❌ MODEL_PATH: File not found at '{MODEL_PATH}'.")
    config_ok = False
else:
    print(f"✅ MODEL_PATH: Found ('{os.path.basename(MODEL_PATH)}').")

if 'BASELINE_ROOT_DIR' not in locals() or not BASELINE_ROOT_DIR:
    error_messages.append("❌ BASELINE_ROOT_DIR: Not defined.")
    config_ok = False
elif not os.path.isdir(BASELINE_ROOT_DIR):
    error_messages.append(f"❌ BASELINE_ROOT_DIR: Directory not found at '{BASELINE_ROOT_DIR}'. Please create it or correct the path.")
    config_ok = False
else:
    print(f"✅ BASELINE_ROOT_DIR: Found ('{BASELINE_ROOT_DIR}').")

if 'VIDEO_TO_RECOGNIZE_PATH' not in locals() or not VIDEO_TO_RECOGNIZE_PATH:
    error_messages.append("❌ VIDEO_TO_RECOGNIZE_PATH: Not defined.")
    config_ok = False
elif not os.path.exists(VIDEO_TO_RECOGNIZE_PATH):
    error_messages.append(f"❌ VIDEO_TO_RECOGNIZE_PATH: File not found at '{VIDEO_TO_RECOGNIZE_PATH}'.")
    config_ok = False
else:
    print(f"✅ VIDEO_TO_RECOGNIZE_PATH: Found ('{os.path.basename(VIDEO_TO_RECOGNIZE_PATH)}').")

if 'DTW_DISTANCE_THRESHOLD' not in locals() or not isinstance(DTW_DISTANCE_THRESHOLD, (int, float, np.number)) or np.isnan(DTW_DISTANCE_THRESHOLD):
    error_messages.append("❌ DTW_DISTANCE_THRESHOLD: Not defined or not a valid number.")
    config_ok = False
else:
    print(f"✅ DTW_DISTANCE_THRESHOLD: Set ({DTW_DISTANCE_THRESHOLD}).")

if 'TARGET_REP_LENGTH' not in locals() or not isinstance(TARGET_REP_LENGTH, int) or TARGET_REP_LENGTH <= 0:
    error_messages.append("❌ TARGET_REP_LENGTH: Not defined or not a positive integer.")
    config_ok = False
else:
    print(f"✅ TARGET_REP_LENGTH: Set ({TARGET_REP_LENGTH}).")

if 'EXPECTED_JOINT_COUNT' not in locals() or not isinstance(EXPECTED_JOINT_COUNT, int) or EXPECTED_JOINT_COUNT <= 0:
    error_messages.append("❌ EXPECTED_JOINT_COUNT: Not defined or not a positive integer.")
    config_ok = False
else:
    print(f"✅ EXPECTED_JOINT_COUNT: Set ({EXPECTED_JOINT_COUNT}).")
    if 'JOINT_NAMES_LIST' not in locals() or not isinstance(JOINT_NAMES_LIST, list):
        error_messages.append("❌ JOINT_NAMES_LIST: Not defined or not a list.")
        config_ok = False
    elif len(JOINT_NAMES_LIST) != EXPECTED_JOINT_COUNT:
        error_messages.append(f"❌ JOINT_NAMES_LIST: Length ({len(JOINT_NAMES_LIST)}) does not match EXPECTED_JOINT_COUNT ({EXPECTED_JOINT_COUNT}).")
        config_ok = False
    else:
        print(f"✅ JOINT_NAMES_LIST: Defined with correct length ({len(JOINT_NAMES_LIST)}).")
if 'HRNET_MODEL' not in locals() or HRNET_MODEL is None:
     error_messages.append("❌ HRNET_MODEL: Model was not loaded successfully in Cell 3.")
     config_ok = False
else:
     print("✅ HRNET_MODEL: Loaded.")


 Running Configuration PreCheck 
✅ MODEL_PATH: Found ('pose_hrnet_w48_256x192.pth').
✅ BASELINE_ROOT_DIR: Found ('C:/Users/michel.marien_icarew/Documents/Privé/Opleiding/Mastervakken/Deep Neural Networks/Opdacht 2/baselines2').
✅ VIDEO_TO_RECOGNIZE_PATH: Found ('test_excercise.mp4').
✅ DTW_DISTANCE_THRESHOLD: Set (500.0).
✅ TARGET_REP_LENGTH: Set (100).
✅ EXPECTED_JOINT_COUNT: Set (17).
✅ JOINT_NAMES_LIST: Defined with correct length (17).
✅ HRNET_MODEL: Loaded.


In [36]:
if 'HRNET_MODEL' not in locals() or HRNET_MODEL is None:
    print("HRNet Model not loaded. Please run Cell 3 successfully first.")
elif 'config_ok' not in locals() or not config_ok:
    print("Configuration check in Cell 5.5 failed. Please fix issues before running baseline loading.")

else:
    print("Step 1: Loading Baselines, Tuning Params, Deriving Thresholds") 

    averaged_baselines, tuned_params, derived_thresholds = load_baseline_data(
        BASELINE_ROOT_DIR,
        HRNET_MODEL,
        JOINT_NAMES_LIST,
        TARGET_REP_LENGTH,
        TUNING_PROMINENCE_RANGE,
        TUNING_DISTANCE_RANGE,
        threshold_std_multiplier=2.0,
        min_reps_for_threshold=3
    )

    if averaged_baselines and tuned_params and derived_thresholds:
        print("\n Baseline Data Loaded, Tuned, Thresholds Derived Successfully  👍")
        print(f"Loaded exercises: {list(averaged_baselines.keys())}")
    else:
        print("\n Baseline Data Processing Failed  👎")


Step 1: Loading Baselines, Tuning Params, Deriving Thresholds
Loading, Tuning, Averaging Baselines, and Deriving Thresholds from: C:/Users/michel.marien_icarew/Documents/Privé/Opleiding/Mastervakken/Deep Neural Networks/Opdacht 2/baselines2

Processing baseline exercise: tricep Pushdown
  Tuning repetition counting parameters for: tricep Pushdown...
    Testing 30 parameter combinations...
    Tuning complete. Best Params: P=0.080, D=15, Min Error=2. (Default Error=6)
    Derived Threshold for tricep Pushdown: 6002.86 (Mean=2258.88, Std=1871.99)

Processing baseline exercise: tricep dips
  Tuning repetition counting parameters for: tricep dips...
    Testing 30 parameter combinations...
    Tuning complete. Best Params: P=0.080, D=20, Min Error=10. (Default Error=31)
    Derived Threshold for tricep dips: 2026.38 (Mean=1014.17, Std=506.10)

Baseline processing complete in 1889.20s.
Averaged baselines: ['tricep Pushdown', 'tricep dips']
Tuned count params: {'tricep Pushdown': (0.08, 15)

In [38]:
DTW_DISTANCE_THRESHOLD = 7000

In [41]:
video_to_test = VIDEO_TO_RECOGNIZE_PATH 
proceed_to_recognition = True

if proceed_to_recognition:
    print("==============================================")
    print(f"    Step 2: Recognizing Exercise in Video (Per Rep) ")
    print(f"    Video: {os.path.basename(video_to_test)} ")
    print("==============================================")

    list_of_rep_results, total_reps_detected = recognize_exercise(
        video_to_test,
        averaged_baselines,
        HRNET_MODEL,
        JOINT_NAMES_LIST,
        DTW_DISTANCE_THRESHOLD,
        TARGET_REP_LENGTH,
        tuned_params
    )

    print(f"FINAL RESULT for {os.path.basename(video_to_test)}")

    print(f"Detected {total_reps_detected} Repetitions (using tuned segmentation parameters where possible).")

    if not list_of_rep_results:
        print("No valid repetition results to display (segmentation might have failed)")
    else:
        print("\n  --- Per-Repetition Analysis ---")
        match_counts = {}
        successful_reps = 0
        for rep_index, matched_label, min_distance in list_of_rep_results:
            status = ""
            if matched_label.startswith("Error"):
                status = f"-> {matched_label}"
            elif matched_label == "No Match / Inconclusive":
                status = f"-> No Match (Min Dist: {min_distance:.2f} >= {DTW_DISTANCE_THRESHOLD})"
            else:
                status = f"-> Match: {matched_label} (Dist: {min_distance:.2f})"
                match_counts[matched_label] = match_counts.get(matched_label, 0) + 1
                successful_reps += 1

            print(f"    Rep {rep_index}: {status}")

        if successful_reps > 0:
             if match_counts:
                 majority_label = max(match_counts, key=match_counts.get)
                 majority_count = match_counts[majority_label]
                 print(f"  Overall Predominant Match: {majority_label} ({majority_count}/{successful_reps} successfully matched reps)")
             else:
                  print("No specific exercises were matched successfully.")
             print(f"  Total Reps Successfully Matched (below threshold): {successful_reps} / {total_reps_detected}")
        else:
             print("No repetitions were matched")

else:
    for msg in error_messages:
        print(msg)

    Step 2: Recognizing Exercise in Video (Per Rep) 
    Video: test_excercise.mp4 

--- Recognizing Exercise (Per Repetition / Tuned Seg) for: test_excercise.mp4 ---
  Initial guess: tricep dips. Using its tuned params for segmentation (P=0.080, D=20).
  Detected 0 potential repetitions in test video.

    FINAL RESULT for test_excercise.mp4
  Detected 0 Repetitions (using tuned segmentation parameters where possible).
  No valid repetition results to display (segmentation might have failed or video was problematic).

