### ignore (past implementation)

In [None]:
import numpy as np
import pandas as pd
from scipy.signal import savgol_filter
from sklearn.preprocessing import StandardScaler, LabelEncoder
import math
import joblib
from tensorflow.keras.models import load_model

In [2]:
class RealTimeSquatErrorDetector:
    """
    Class for real-time squat error detection
    """
    def __init__(self, model_path, scaler_path, label_encoder_path, window_size=30):
        """
        Initialize the detector
        
        Parameters:
        ----------
        model_path : str
            Path to the saved model
        scaler_path : str
            Path to the saved scaler
        label_encoder_path : str
            Path to the saved label encoder
        window_size : int
            Size of the sliding window
        """
        # Load model and preprocessing objects
        self.model = load_model(model_path)
        self.scaler = joblib.load(scaler_path)
        self.label_encoder = joblib.load(label_encoder_path)
        
        # Initialize buffer
        self.window_size = window_size
        self.buffer = []
        
        # Prepare feature names (needed for preprocessing)
        self.feature_names = [col for col in self.scaler.feature_names_in_ 
                             if not col.endswith('_velocity')]
        
        # Print initialization info
        print(f"Loaded model from {model_path}")
        print(f"Model expects features: {self.feature_names}")
        print(f"Detecting classes: {self.label_encoder.classes_}")
    
    def preprocess_frame(self, keypoints_dict):
        """
        Preprocess a single frame of keypoints
        
        Parameters:
        ----------
        keypoints_dict : dict
            Dictionary of keypoints {joint_name_x: value, joint_name_y: value, ...}
            
        Returns:
        -------
        features : np.array
            Extracted features for this frame
        """
        # Convert keypoints to a dataframe with a single row
        df = pd.DataFrame([keypoints_dict])
        
        # Extract angles (same as in preprocessing pipeline)
        # This is a simplified version - in production you'd reuse your angle extraction code
        # For brevity, I'm assuming the extraction happens here
        
        # Placeholder for extracted features
        extracted_features = extract_angles_from_frame(df)
        
        return extracted_features
    
    def update_buffer(self, features):
        """
        Update the sliding window buffer with new features
        
        Parameters:
        ----------
        features : np.array
            Features for the current frame
        """
        # Add new features to buffer
        self.buffer.append(features)
        
        # Keep buffer at fixed size
        if len(self.buffer) > self.window_size:
            self.buffer.pop(0)
    
    def detect_errors(self, keypoints_dict):
        """
        Detect squat errors from a new frame of keypoints
        
        Parameters:
        ----------
        keypoints_dict : dict
            Dictionary of keypoints {joint_name_x: value, joint_name_y: value, ...}
            
        Returns:
        -------
        prediction : str
            Predicted error class or 'good'
        confidence : float
            Confidence score for the prediction
        """
        # Preprocess frame
        features = self.preprocess_frame(keypoints_dict)
        
        # Update buffer
        self.update_buffer(features)
        
        # Only make prediction if buffer is full
        if len(self.buffer) == self.window_size:
            # Convert buffer to numpy array
            X = np.array(self.buffer)
            
            # Normalize features
            X_normalized = self.scaler.transform(X)
            
            # Reshape for LSTM input [samples, time steps, features]
            X_reshaped = X_normalized.reshape(1, self.window_size, -1)
            
            # Make prediction
            y_proba = self.model.predict(X_reshaped, verbose=0)[0]
            predicted_class = np.argmax(y_proba)
            confidence = y_proba[predicted_class]
            
            # Convert to class name
            prediction = self.label_encoder.classes_[predicted_class]
            
            return prediction, confidence
        else:
            # Not enough frames yet
            return "Collecting frames...", 0.0

def extract_angles_from_frame(keypoints_dict):
    """
    Extract angles from a single frame of keypoints for real-time processing
    
    Parameters:
    ----------
    keypoints_dict : dict
        Dictionary containing keypoint coordinates
        
    Returns:
    -------
    features : np.array
        Array of extracted angle features
    """
    # Convert single keypoint dictionary to a properly structured pandas DataFrame
    df = pd.DataFrame([keypoints_dict])
    
    # Create empty dictionary to store extracted angles
    angles = {}
    
    # Define function to calculate vector between two points
    def calc_vector(point1, point2):
        vec = np.zeros(3)
        vec[0] = df[f'{point2}_x'].values[0] - df[f'{point1}_x'].values[0]
        vec[1] = df[f'{point2}_y'].values[0] - df[f'{point1}_y'].values[0]
        vec[2] = df[f'{point2}_z'].values[0] - df[f'{point1}_z'].values[0]
        return vec
    
    # Define function to normalize vector
    def normalize_vector(vec):
        magnitude = np.sqrt(np.sum(vec**2))
        if magnitude < 1e-10:  # Avoid division by zero
            magnitude = 1e-10
        return vec / magnitude
    
    # Define function to calculate angle between vectors
    def angle_between(vec1, vec2):
        vec1_norm = normalize_vector(vec1)
        vec2_norm = normalize_vector(vec2)
        dot_product = np.sum(vec1_norm * vec2_norm)
        # Clip to avoid domain errors due to floating point precision
        dot_product = np.clip(dot_product, -1.0, 1.0)
        return np.degrees(np.arccos(dot_product))
    
    # Calculate vectors
    # Torso vectors
    shoulder_to_hip_left = calc_vector('LEFT_SHOULDER', 'LEFT_HIP')
    shoulder_to_hip_right = calc_vector('RIGHT_SHOULDER', 'RIGHT_HIP')
    
    # Thigh vectors
    hip_to_knee_left = calc_vector('LEFT_HIP', 'LEFT_KNEE')
    hip_to_knee_right = calc_vector('RIGHT_HIP', 'RIGHT_KNEE')
    
    # Shin vectors
    knee_to_ankle_left = calc_vector('LEFT_KNEE', 'LEFT_ANKLE')
    knee_to_ankle_right = calc_vector('RIGHT_KNEE', 'RIGHT_ANKLE')
    
    # Head vector
    nose_to_shoulder_mid = calc_vector('NOSE', 'LEFT_SHOULDER')
    
    # Calculate angles
    # Knee angles (shin to thigh)
    angles['left_knee_angle'] = angle_between(hip_to_knee_left, knee_to_ankle_left)
    angles['right_knee_angle'] = angle_between(hip_to_knee_right, knee_to_ankle_right)
    
    # Hip angles (torso to thigh)
    angles['left_hip_angle'] = angle_between(shoulder_to_hip_left, hip_to_knee_left)
    angles['right_hip_angle'] = angle_between(shoulder_to_hip_right, hip_to_knee_right)
    
    # Back angles (relative to vertical)
    vertical = np.array([0, 1, 0])  # Y axis is usually vertical in pose estimation
    angles['torso_vertical_angle'] = angle_between(shoulder_to_hip_left, vertical)
    
    # Head angle relative to torso
    angles['head_torso_angle'] = angle_between(nose_to_shoulder_mid, shoulder_to_hip_left)
    
    # Knee distance (for detecting knee valgus/varus)
    hip_width = np.sqrt(
        (df['RIGHT_HIP_x'].values[0] - df['LEFT_HIP_x'].values[0])**2 + 
        (df['RIGHT_HIP_z'].values[0] - df['LEFT_HIP_z'].values[0])**2
    )
    knee_distance = np.sqrt(
        (df['RIGHT_KNEE_x'].values[0] - df['LEFT_KNEE_x'].values[0])**2 + 
        (df['RIGHT_KNEE_z'].values[0] - df['LEFT_KNEE_z'].values[0])**2
    )
    angles['knee_distance_normalized'] = knee_distance / hip_width if hip_width > 0 else 0
    
    # Foot positioning (relevant for toe angle issues)
    ankle_distance = np.sqrt(
        (df['RIGHT_ANKLE_x'].values[0] - df['LEFT_ANKLE_x'].values[0])**2 + 
        (df['RIGHT_ANKLE_z'].values[0] - df['LEFT_ANKLE_z'].values[0])**2
    )
    angles['ankle_distance_normalized'] = ankle_distance / hip_width if hip_width > 0 else 0
    
    # Squat depth - hip height relative to knee height
    angles['left_squat_depth'] = df['LEFT_HIP_y'].values[0] - df['LEFT_KNEE_y'].values[0]
    angles['right_squat_depth'] = df['RIGHT_HIP_y'].values[0] - df['RIGHT_KNEE_y'].values[0]
    
    # Convert angles dictionary to a pandas Series and then to a numpy array
    features = pd.Series(angles).values
    
    return features

In [57]:
import cv2
import mediapipe as mp
import numpy as np
import pandas as pd

In [8]:
class MediaPipePoseFeatureExtractor:
    def __init__(self):
        """
        Initialize MediaPipe pose detection and setup landmark mapping
        """
        # Initialize MediaPipe Pose
        self.mp_pose = mp.solutions.pose
        self.pose = self.mp_pose.Pose(
            static_image_mode=False,
            model_complexity=2,  # Use highest accuracy model
            smooth_landmarks=True,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5
        )
        
        # Define mapping from MediaPipe landmarks to our keypoint format
        self.landmark_mapping = {
            # Format: our_keypoint_name: mediapipe_landmark_index
            'LEFT_SHOULDER': self.mp_pose.PoseLandmark.LEFT_SHOULDER.value,
            'RIGHT_SHOULDER': self.mp_pose.PoseLandmark.RIGHT_SHOULDER.value,
            'LEFT_HIP': self.mp_pose.PoseLandmark.LEFT_HIP.value,
            'RIGHT_HIP': self.mp_pose.PoseLandmark.RIGHT_HIP.value,
            'LEFT_KNEE': self.mp_pose.PoseLandmark.LEFT_KNEE.value,
            'RIGHT_KNEE': self.mp_pose.PoseLandmark.RIGHT_KNEE.value,
            'LEFT_ANKLE': self.mp_pose.PoseLandmark.LEFT_ANKLE.value,
            'RIGHT_ANKLE': self.mp_pose.PoseLandmark.RIGHT_ANKLE.value,
            'NOSE': self.mp_pose.PoseLandmark.NOSE.value,
        }
    
    def process_frame(self, frame):
        """
        Process a video frame with MediaPipe and extract pose landmarks
        
        Parameters:
        ----------
        frame : np.array
            RGB video frame
        
        Returns:
        -------
        keypoints_dict : dict
            Dictionary of keypoint coordinates compatible with our model
        landmarks : list or None
            Raw MediaPipe landmark results for visualization
        """
        # Convert BGR to RGB
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        
        # Process the frame with MediaPipe
        results = self.pose.process(frame_rgb)
        
        # If no pose detected, return None
        if not results.pose_landmarks:
            return None, None
        
        # Extract landmarks and convert to our keypoint format
        keypoints_dict = self._landmarks_to_keypoint_dict(results.pose_landmarks.landmark, frame.shape)
        
        return keypoints_dict, results.pose_landmarks
    
    def _landmarks_to_keypoint_dict(self, landmarks, frame_shape):
        """
        Convert MediaPipe landmarks to our keypoint dictionary format
        
        Parameters:
        ----------
        landmarks : list
            MediaPipe landmark results
        frame_shape : tuple
            Shape of the video frame (height, width, channels)
        
        Returns:
        -------
        keypoints_dict : dict
            Dictionary of keypoint coordinates compatible with our model
        """
        height, width, _ = frame_shape
        keypoints_dict = {}
        
        # Extract coordinates for each keypoint
        for our_name, mp_idx in self.landmark_mapping.items():
            landmark = landmarks[mp_idx]
            
            # Convert normalized coordinates to pixel coordinates
            x = landmark.x * width
            y = landmark.y * height
            z = landmark.z * width  # Z is relative to width for reasonable scale
            
            # Store in our format
            keypoints_dict[f'{our_name}_x'] = x
            keypoints_dict[f'{our_name}_y'] = y
            keypoints_dict[f'{our_name}_z'] = z
        
        return keypoints_dict
    
    def draw_landmarks(self, frame, landmarks):
        """
        Draw the pose landmarks on the frame
        
        Parameters:
        ----------
        frame : np.array
            BGR video frame
        landmarks : mediapipe pose landmarks
            Landmarks to draw
        
        Returns:
        -------
        annotated_frame : np.array
            Frame with landmarks drawn
        """
        if landmarks:
            mp_drawing = mp.solutions.drawing_utils
            mp_drawing_styles = mp.solutions.drawing_styles
            
            # Draw pose landmarks
            mp_drawing.draw_landmarks(
                frame,
                landmarks,
                self.mp_pose.POSE_CONNECTIONS,
                landmark_drawing_spec=mp_drawing_styles.get_default_pose_landmarks_style()
            )
        
        return frame
    
    def release(self):
        """
        Release MediaPipe resources
        """
        self.pose.close()


def real_time_squat_detection(model_path='./best_squat_model.keras', 
                             scaler_path='./preprocessed_data_scaler.joblib',
                             label_encoder_path='./preprocessed_data_label_encoder.joblib',
                             camera_id=0):
    """
    Run real-time squat detection using webcam feed
    
    Parameters:
    ----------
    model_path : str
        Path to trained LSTM model
    scaler_path : str
        Path to fitted StandardScaler
    label_encoder_path : str
        Path to fitted LabelEncoder
    camera_id : int
        Camera ID for OpenCV
    """
    # Import here to avoid issues if user doesn't want to run this function
    #from real_time_error_detector import RealTimeSquatErrorDetector
    
    # Initialize MediaPipe feature extractor
    extractor = MediaPipePoseFeatureExtractor()
    
    # Initialize squat error detector
    detector = RealTimeSquatErrorDetector(
        model_path=model_path,
        scaler_path=scaler_path,
        label_encoder_path=label_encoder_path
    )
    
    # Initialize webcam
    cap = cv2.VideoCapture(camera_id)
    
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            print("Failed to grab frame")
            break
        
        # Process frame with MediaPipe
        keypoints_dict, landmarks = extractor.process_frame(frame)
        
        if keypoints_dict:
            # Get error prediction
            prediction, confidence = detector.detect_errors(keypoints_dict)
            
            # Draw prediction on frame
            text = f"{prediction} ({confidence:.2f})"
            cv2.putText(frame, text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, 
                       (0, 255, 0) if prediction == 'good' else (0, 0, 255), 2)
        else:
            cv2.putText(frame, "No pose detected", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, 
                       (0, 0, 255), 2)
        
        # Draw landmarks on frame
        frame = extractor.draw_landmarks(frame, landmarks)
        
        # Display frame
        cv2.imshow('Squat Form Analysis', frame)
        
        # Exit on 'q' press
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    
    # Clean up
    cap.release()
    cv2.destroyAllWindows()
    extractor.release()


def process_video_file(video_path, output_path=None,
                      model_path='./best_squat_model.keras', 
                      scaler_path='./preprocessed_data_scaler.joblib',
                      label_encoder_path='./preprocessed_data_label_encoder.joblib'):
    """
    Process a video file for squat form analysis
    
    Parameters:
    ----------
    video_path : str
        Path to input video file
    output_path : str or None
        Path to save output video (if None, won't save)
    model_path : str
        Path to trained LSTM model
    scaler_path : str
        Path to fitted StandardScaler
    label_encoder_path : str
        Path to fitted LabelEncoder
    """
    # Import here to avoid issues if user doesn't want to run this function
    #from real_time_error_detector import RealTimeSquatErrorDetector
    
    # Initialize MediaPipe feature extractor
    extractor = MediaPipePoseFeatureExtractor()
    
    # Initialize squat error detector
    detector = RealTimeSquatErrorDetector(
        model_path=model_path,
        scaler_path=scaler_path,
        label_encoder_path=label_encoder_path
    )
    
    # Open video file
    cap = cv2.VideoCapture(video_path)
    
    # Get video properties
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    
    # Initialize video writer if output path provided
    writer = None
    if output_path:
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        writer = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    
    # Process video frame by frame
    frame_count = 0
    predictions = []
    
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        
        # Process frame with MediaPipe
        keypoints_dict, landmarks = extractor.process_frame(frame)
        
        if keypoints_dict:
            # Get error prediction
            prediction, confidence = detector.detect_errors(keypoints_dict)
            predictions.append((frame_count, prediction, confidence))
            
            # Draw prediction on frame
            text = f"{prediction} ({confidence:.2f})"
            cv2.putText(frame, text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, 
                       (0, 255, 0) if prediction == 'good' else (0, 0, 255), 2)
        else:
            cv2.putText(frame, "No pose detected", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, 
                       (0, 0, 255), 2)
        
        # Draw landmarks on frame
        frame = extractor.draw_landmarks(frame, landmarks)
        
        # Write frame if output path provided
        if writer:
            writer.write(frame)
        
        frame_count += 1
        
        # Show progress every 100 frames
        if frame_count % 100 == 0:
            print(f"Processed {frame_count} frames")
    
    # Clean up
    cap.release()
    if writer:
        writer.close()
    extractor.release()
    
    # Create analysis dataframe
    if predictions:
        analysis_df = pd.DataFrame(predictions, columns=['frame', 'prediction', 'confidence'])
        
        # Print summary statistics
        print("\nAnalysis Summary:")
        print(f"Total frames analyzed: {frame_count}")
        print("\nError distribution:")
        error_counts = analysis_df['prediction'].value_counts()
        for error, count in error_counts.items():
            percentage = count / len(analysis_df) * 100
            print(f"{error}: {count} frames ({percentage:.1f}%)")
        
        # Save analysis to CSV if output path provided
        if output_path:
            csv_path = output_path.rsplit('.', 1)[0] + '_analysis.csv'
            analysis_df.to_csv(csv_path, index=False)
            print(f"\nAnalysis saved to {csv_path}")
    
    return predictions

In [9]:
if __name__ == "__main__":
    # Example usage:
    # For webcam-based detection:
    #real_time_squat_detection()
    
    # For video file processing:
    process_video_file("./squat_video.mp4", "analyzed_squat_video.mp4")

https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations


Loaded model from ./best_squat_model.keras
Model expects features: ['left_knee_angle', 'right_knee_angle', 'left_hip_angle', 'right_hip_angle', 'torso_vertical_angle', 'head_torso_angle', 'knee_distance_normalized', 'ankle_distance_normalized', 'left_squat_depth', 'right_squat_depth']
Detecting classes: ['bad_back_round' 'bad_back_warp' 'bad_head' 'bad_inner_thigh'
 'bad_shallow' 'bad_toe' 'good']


ValueError: Must pass 2-d input. shape=(1, 1, 27)

# FINAL

In [58]:
import cv2
import numpy as np
import tensorflow as tf
import mediapipe as mp
import joblib
import pandas as pd
from collections import deque
import time

## Squat Analyzer Class (Loads and runs the model with mediapipe)

In [59]:
class SquatAnalyzer:
    def __init__(self, model_path, scaler_path, label_encoder_path, window_size=30):
        """Initialize the squat analyzer with trained model and preprocessing tools"""
        # Load model and preprocessing tools
        self.model = tf.keras.models.load_model(model_path)
        self.scaler = joblib.load(scaler_path)
        self.label_encoder = joblib.load(label_encoder_path)
        self.window_size = window_size
        
        # Initialize MediaPipe Pose
        self.mp_pose = mp.solutions.pose
        self.pose = self.mp_pose.Pose(
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5,
            model_complexity=1
        )
        self.mp_drawing = mp.solutions.drawing_utils
        self.mp_drawing_styles = mp.solutions.drawing_styles
        
        # Create buffer for storing features
        self.features_buffer = deque(maxlen=window_size)
        
        # Store current prediction and confidence
        self.current_prediction = None
        self.prediction_confidence = 0.0
        self.last_predictions = deque(maxlen=5)  # Store last 5 predictions for smoothing
        
        
        # Feature names for the processed angles
        self.feature_names = [
            'left_knee_angle', 'right_knee_angle', 
            'left_hip_angle', 'right_hip_angle',
            'torso_vertical_angle', 'head_torso_angle',
            'knee_distance_normalized', 'ankle_distance_normalized',
            'left_squat_depth', 'right_squat_depth',
            'left_knee_angle_velocity', 'right_knee_angle_velocity',
            'left_hip_angle_velocity', 'right_hip_angle_velocity',
            'torso_vertical_angle_velocity', 'head_torso_angle_velocity',
            'knee_distance_normalized_velocity', 'ankle_distance_normalized_velocity',
            'left_squat_depth_velocity', 'right_squat_depth_velocity'
        ]
        
        # Keypoint mapping from MediaPipe to our preprocessing format
        self.keypoint_map = {
            'NOSE': self.mp_pose.PoseLandmark.NOSE,
            'LEFT_SHOULDER': self.mp_pose.PoseLandmark.LEFT_SHOULDER,
            'RIGHT_SHOULDER': self.mp_pose.PoseLandmark.RIGHT_SHOULDER,
            'LEFT_HIP': self.mp_pose.PoseLandmark.LEFT_HIP,
            'RIGHT_HIP': self.mp_pose.PoseLandmark.RIGHT_HIP,
            'LEFT_KNEE': self.mp_pose.PoseLandmark.LEFT_KNEE,
            'RIGHT_KNEE': self.mp_pose.PoseLandmark.RIGHT_KNEE,
            'LEFT_ANKLE': self.mp_pose.PoseLandmark.LEFT_ANKLE,
            'RIGHT_ANKLE': self.mp_pose.PoseLandmark.RIGHT_ANKLE
        }
        
        # Error explanations
        self.error_explanations = {
            'bad_back_round': "Your back is rounding. Keep your spine neutral.",
            'bad_back_warp': "Your back is excessively arched. Maintain a neutral spine.",
            'bad_head': "Head position incorrect. Look slightly downward, keeping your neck aligned with your spine.",
            'bad_inner_thigh': "Knees collapsing inward. Keep knees aligned with toes.",
            'bad_shallow': "Squat is too shallow. Try to go deeper with proper form.",
            'bad_toe': "Foot positioning issue. Keep feet shoulder-width apart with toes slightly turned out.",
            'good': "Good form! Keep it up."
        }




        # Add rep counting variables
        self.rep_count = 0
        self.state = 'STANDING'  # Initial state
        self.depth_history = deque(maxlen=10)  # Store recent squat depths for dynamic thresholding
        self.min_depth = None  # Dynamic minimum depth (updated during exercise)
        self.max_depth = None  # Dynamic maximum depth (updated during exercise)
        self.depth_threshold_factor = 0.5  # Percentage of depth range to consider a state change

        # Add error occurrence tracking
        self.error_counts = {
            'bad_back_round': 0,
            'bad_back_warp': 0,
            'bad_head': 0,
            'bad_inner_thigh': 0,
            'bad_shallow': 0,
            'bad_toe': 0,
            'good': 0  # Track good form too
        }



        
        
        print("Squat Analyzer initialized successfully")

    def _landmarks_to_keypoints_dict(self, landmarks):
        """Convert MediaPipe landmarks to format compatible with preprocessing code"""
        keypoints = {}
        for name, landmark_id in self.keypoint_map.items():
            landmark = landmarks.landmark[landmark_id]
            keypoints[f'{name}_x'] = landmark.x
            keypoints[f'{name}_y'] = landmark.y
            keypoints[f'{name}_z'] = landmark.z
        return keypoints

    def _keypoints_dict_to_df(self, keypoints_dict):
        """Convert keypoints dictionary to DataFrame"""
        return pd.DataFrame([keypoints_dict])

    def _calculate_vector(self, df, point1, point2):
        """Calculate vector between two keypoints"""
        vec = np.zeros((len(df), 3))
        vec[:, 0] = df[f'{point2}_x'].values - df[f'{point1}_x'].values
        vec[:, 1] = df[f'{point2}_y'].values - df[f'{point1}_y'].values
        vec[:, 2] = df[f'{point2}_z'].values - df[f'{point1}_z'].values
        return vec

    def _normalize_vector(self, vec):
        """Normalize a vector to unit length"""
        magnitude = np.sqrt(np.sum(vec**2, axis=1))
        magnitude = np.where(magnitude == 0, 1e-10, magnitude)
        vec_normalized = vec / magnitude[:, np.newaxis]
        return vec_normalized

    def _angle_between_vectors(self, vec1, vec2):
        """Calculate the angle between two 3D vectors in degrees"""
        vec1_norm = self._normalize_vector(vec1)
        vec2_norm = self._normalize_vector(vec2)
        dot_product = np.sum(vec1_norm * vec2_norm, axis=1)
        dot_product = np.clip(dot_product, -1.0, 1.0)
        angles = np.degrees(np.arccos(dot_product))
        return angles

    def _extract_anatomical_angles(self, df):
        """Extract biomechanically relevant angles from keypoints"""
        angles_df = pd.DataFrame()
        
        # Calculate vectors
        shoulder_to_hip_left = self._calculate_vector(df, 'LEFT_SHOULDER', 'LEFT_HIP')
        shoulder_to_hip_right = self._calculate_vector(df, 'RIGHT_SHOULDER', 'RIGHT_HIP')
        hip_to_knee_left = self._calculate_vector(df, 'LEFT_HIP', 'LEFT_KNEE')
        hip_to_knee_right = self._calculate_vector(df, 'RIGHT_HIP', 'RIGHT_KNEE')
        knee_to_ankle_left = self._calculate_vector(df, 'LEFT_KNEE', 'LEFT_ANKLE')
        knee_to_ankle_right = self._calculate_vector(df, 'RIGHT_KNEE', 'RIGHT_ANKLE')
        nose_to_shoulder_mid = self._calculate_vector(df, 'NOSE', 'LEFT_SHOULDER')
        
        # Calculate angles
        angles_df['left_knee_angle'] = self._angle_between_vectors(hip_to_knee_left, knee_to_ankle_left)
        angles_df['right_knee_angle'] = self._angle_between_vectors(hip_to_knee_right, knee_to_ankle_right)
        angles_df['left_hip_angle'] = self._angle_between_vectors(shoulder_to_hip_left, hip_to_knee_left)
        angles_df['right_hip_angle'] = self._angle_between_vectors(shoulder_to_hip_right, hip_to_knee_right)
        
        # Back angles (relative to vertical)
        vertical = np.zeros_like(shoulder_to_hip_left)
        vertical[:, 1] = 1  # Y axis is usually vertical in pose estimation
        angles_df['torso_vertical_angle'] = self._angle_between_vectors(shoulder_to_hip_left, vertical)
        
        # Head angle relative to torso
        angles_df['head_torso_angle'] = self._angle_between_vectors(nose_to_shoulder_mid, shoulder_to_hip_left)
        
        # Knee distance (for detecting knee valgus/varus)
        hip_width = np.sqrt(
            (df['RIGHT_HIP_x'] - df['LEFT_HIP_x'])**2 + 
            (df['RIGHT_HIP_z'] - df['LEFT_HIP_z'])**2
        )
        knee_distance = np.sqrt(
            (df['RIGHT_KNEE_x'] - df['LEFT_KNEE_x'])**2 + 
            (df['RIGHT_KNEE_z'] - df['LEFT_KNEE_z'])**2
        )
        angles_df['knee_distance_normalized'] = knee_distance / hip_width
        
        # Foot positioning
        ankle_distance = np.sqrt(
            (df['RIGHT_ANKLE_x'] - df['LEFT_ANKLE_x'])**2 + 
            (df['RIGHT_ANKLE_z'] - df['LEFT_ANKLE_z'])**2
        )
        angles_df['ankle_distance_normalized'] = ankle_distance / hip_width
        
        # Squat depth - hip height relative to knee height
        angles_df['left_squat_depth'] = df['LEFT_HIP_y'] - df['LEFT_KNEE_y']
        angles_df['right_squat_depth'] = df['RIGHT_HIP_y'] - df['RIGHT_KNEE_y']
        
        return angles_df

    def _add_velocity_features(self, current_features, prev_features=None, fps=30):
        """Add velocity features based on frame-to-frame changes"""
        velocity_features = {}
        
        if prev_features is None:
            # No previous frame, set velocities to 0
            for column in current_features.index:
                velocity_features[f'{column}_velocity'] = 0
        else:
            # Calculate frame-to-frame changes
            for column in current_features.index:
                velocity_features[f'{column}_velocity'] = (current_features[column] - prev_features[column]) * fps
                
        # Combine with current features
        all_features = {}
        for column in current_features.index:
            all_features[column] = current_features[column]
        all_features.update(velocity_features)
        
        return pd.Series(all_features)

    def _process_frame(self, frame):
        """Process a single frame and extract features"""
        # Convert to RGB for MediaPipe
        image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        
        # Process the image
        results = self.pose.process(image_rgb)
        
        # If no pose detected, return None
        if not results.pose_landmarks:
            return None, frame
        
        # Draw pose landmarks on the image
        annotated_image = frame.copy()
        self.mp_drawing.draw_landmarks(
            annotated_image,
            results.pose_landmarks,
            self.mp_pose.POSE_CONNECTIONS,
            landmark_drawing_spec=self.mp_drawing_styles.get_default_pose_landmarks_style()
        )
        
        # Convert landmarks to keypoints dictionary
        keypoints_dict = self._landmarks_to_keypoints_dict(results.pose_landmarks)
        
        # Convert to DataFrame
        keypoints_df = self._keypoints_dict_to_df(keypoints_dict)
        
        # Extract angles
        angles_df = self._extract_anatomical_angles(keypoints_df)
        
        # Get the first row as a Series
        feature_series = angles_df.iloc[0]
        
        # Add velocity features if we have previous features
        if len(self.features_buffer) > 0:
            prev_features = self.features_buffer[-1]
            feature_series = self._add_velocity_features(feature_series, prev_features)
        else:
            # No previous features, add zero velocities
            feature_series = self._add_velocity_features(feature_series)
        
        # Add to buffer
        self.features_buffer.append(feature_series)
        
        return feature_series, annotated_image

    def _make_prediction(self):
        """Make a prediction using the current feature buffer"""
        if len(self.features_buffer) < self.window_size:
            return None, 0.0
        
        # Prepare features for model input
        features_array = np.array([feature[self.feature_names] for feature in self.features_buffer])
        
        # Apply scaler transformation (same as in training)
        normalized_features = self.scaler.transform(features_array)
        
        # Reshape for model input [batch_size=1, window_size, num_features]
        model_input = normalized_features.reshape(1, self.window_size, len(self.feature_names))
        
        # Get prediction
        prediction_probs = self.model.predict(model_input, verbose=0)[0]
        predicted_class_idx = np.argmax(prediction_probs)
        confidence = prediction_probs[predicted_class_idx]
        
        # Get class name
        predicted_class = self.label_encoder.classes_[predicted_class_idx]
        
        return predicted_class, confidence

    def _smooth_predictions(self, new_prediction, new_confidence):
        """Smooth predictions to avoid flickering"""
        if new_prediction is None:
            return self.current_prediction, self.prediction_confidence
        
        # Add to recent predictions
        self.last_predictions.append((new_prediction, new_confidence))
        
        # Count occurrences of each prediction
        prediction_counts = {}
        total_confidence = {}
        
        for pred, conf in self.last_predictions:
            if pred not in prediction_counts:
                prediction_counts[pred] = 0
                total_confidence[pred] = 0
            
            prediction_counts[pred] += 1
            total_confidence[pred] += conf
        
        # Get the most common prediction
        if prediction_counts:
            most_common = max(prediction_counts.items(), key=lambda x: x[1])[0]
            avg_confidence = total_confidence[most_common] / prediction_counts[most_common]
            return most_common, avg_confidence
        
        return None, 0.0



    def _update_rep_count(self, current_depth):
        """Update squat state and count reps based on squat depth"""
        # Use average of left and right squat depth for consistency
        avg_depth = (current_depth['left_squat_depth'] + current_depth['right_squat_depth']) / 2
        self.depth_history.append(avg_depth)

        # Update min and max depths dynamically
        if len(self.depth_history) == self.depth_history.maxlen:
            current_min = min(self.depth_history)
            current_max = max(self.depth_history)
            
            if self.min_depth is None or current_min < self.min_depth:
                self.min_depth = current_min
            if self.max_depth is None or current_max > self.max_depth:
                self.max_depth = current_max

            # Calculate dynamic threshold
            if self.min_depth is not None and self.max_depth is not None:
                depth_range = self.max_depth - self.min_depth
                threshold = self.min_depth + (depth_range * self.depth_threshold_factor)

                # State transitions
                if self.state == 'STANDING' and avg_depth < threshold:
                    self.state = 'SQUATTING'
                elif self.state == 'SQUATTING' and avg_depth > threshold:
                    self.state = 'STANDING'
                    self.rep_count += 1  # Count a rep when returning to standing

    def _update_error_counts(self, prediction):
        """Update the count of the current prediction/error"""
        if prediction in self.error_counts:
            self.error_counts[prediction] += 1



    
    def draw_feedback(self, image, prediction, confidence):
        h, w, _ = image.shape
        
        # Background for text (make it larger to fit rep counter)
        cv2.rectangle(image, (0, h-140), (w, h), (0, 0, 0), -1)
        
        # Draw form feedback (existing code)
        if prediction is None:
            cv2.putText(image, "Getting ready...", (10, h-100), 
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
        else:
            # Determine text color based on prediction
            if prediction == 'good':
                text_color = (0, 255, 0)  # Green for good form
            else:
                text_color = (0, 0, 255)  # Red for errors
            
            # Add prediction and confidence
            cv2.putText(image, f"Form: {prediction}", (10, h-100), 
                        cv2.FONT_HERSHEY_SIMPLEX, 1, text_color, 2)
            cv2.putText(image, f"Confidence: {confidence:.2f}", (10, h-70), 
                        cv2.FONT_HERSHEY_SIMPLEX, 1, text_color, 2)
            
            # Add explanation for errors
            if prediction in self.error_explanations:
                explanation = self.error_explanations[prediction]
                cv2.putText(image, explanation, (w//4, 30), 
                            cv2.FONT_HERSHEY_SIMPLEX, 0.8, text_color, 2)





        # Add rep count display
        cv2.putText(image, f"Reps: {self.rep_count}", (10, h-40),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
        
        return image




    def generate_report(self):
        """Generate a report summarizing reps and error occurrences"""
        report = f"Squat Analysis Report - {time.strftime('%Y-%m-%d %H:%M:%S')}\n"
        report += "=" * 50 + "\n"
        report += f"Total Reps Performed: {self.rep_count}\n\n"
        report += "Form Analysis:\n"
        report += "-" * 20 + "\n"
        
        total_frames_with_prediction = sum(self.error_counts.values())
        if total_frames_with_prediction > 0:
            for error, count in self.error_counts.items():
                percentage = (count / total_frames_with_prediction) * 100
                explanation = self.error_explanations.get(error, "No explanation available")
                report += f"{error}: {count} occurrences ({percentage:.1f}%)\n"
                report += f"  - {explanation}\n"
        else:
            report += "No form predictions recorded.\n"
        
        report += "=" * 50
        return report

    def reset_counters(self):
        """Reset rep count and error counts"""
        self.rep_count = 0
        self.state = 'STANDING'
        self.depth_history.clear()
        self.min_depth = None
        self.max_depth = None
        for error in self.error_counts:
            self.error_counts[error] = 0
        print("Counters reset")




        


    def rescale_frame(self, frame, scale_percent=50):
        """
        Rescale the input frame to improve processing speed.
        
        Args:
            frame: Input frame to be rescaled
            scale_percent: Percentage of original size (default: 50%)
            
        Returns:
            Rescaled frame
        """
        width = int(frame.shape[1] * scale_percent / 100)
        height = int(frame.shape[0] * scale_percent / 100)
        dim = (width, height)
        
        # Resize image
        resized = cv2.resize(frame, dim, interpolation=cv2.INTER_AREA)
        return resized

        
    def process_video(self, input_source=0, output_path=None):
        """Process video from webcam or file"""
        # Open video capture
        if isinstance(input_source, int) or input_source.isdigit():
            input_source = int(input_source)
            print(f"Opening webcam {input_source}")
        else:
            print(f"Opening video file: {input_source}")
            
        cap = cv2.VideoCapture(input_source)
        
        # Check if video opened successfully
        if not cap.isOpened():
            print(f"Error: Could not open video source {input_source}")
            return
        
        # Get video properties
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        if fps <= 0:
            fps = 30  # Default FPS if not available
        
        # Create video writer if output path is specified
        if output_path:
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))
        
        # Initialize variables
        frame_count = 0
        processing_times = []
        
        try:
            while cap.isOpened():
                # Measure processing time
                start_time = time.time()

                # Inside the while cap.isOpened() loop:
                success, frame = cap.read()
                if not success:
                    print("End of video")
                    break
                
                # Rescale frame for processing (but keep original for display/output)
                original_frame = frame.copy()
                processing_frame = self.rescale_frame(frame, scale_percent=50)  # Adjust percentage as needed
                
                # Process the smaller frame
                features, annotated_frame = self._process_frame(processing_frame)
                
                # Scale the annotated frame back to original size for display
                if annotated_frame is not None:
                    annotated_frame = cv2.resize(annotated_frame, (frame_width, frame_height), 
                                                interpolation=cv2.INTER_LINEAR)
                
                # If no pose detected
                if features is None:
                    cv2.putText(frame, "No pose detected", (10, 30),
                                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
                    if output_path:
                        out.write(frame)
                    cv2.imshow('Squat Form Analysis', frame)
                    if cv2.waitKey(1) & 0xFF == ord('q'):
                        break
                    continue
                
                # Make prediction if enough frames collected
                if len(self.features_buffer) >= self.window_size:
                    new_prediction, new_confidence = self._make_prediction()
                    self.current_prediction, self.prediction_confidence = self._smooth_predictions(
                        new_prediction, new_confidence)
                    if self.current_prediction is not None:
                        self._update_error_counts(self.current_prediction)

                # Update rep count with current features
                self._update_rep_count(features)
                
                # Draw feedback on frame
                result_frame = self.draw_feedback(annotated_frame, self.current_prediction, self.prediction_confidence)
                
                # Calculate FPS
                processing_time = time.time() - start_time
                processing_times.append(processing_time)
                
                # Only average the last 30 frames for FPS calculation
                if len(processing_times) > 30:
                    processing_times.pop(0)
                
                avg_processing_time = sum(processing_times) / len(processing_times)
                fps_text = f"FPS: {1/avg_processing_time:.1f}"
                cv2.putText(result_frame, fps_text, (frame_width - 120, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
                
                # Write frame to output video if specified
                if output_path:
                    out.write(result_frame)
                
                # Display frame
                cv2.imshow('Squat Form Analysis', result_frame)
                
                # Exit on 'q' press
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
                
                frame_count += 1
                
        except Exception as e:
            print(f"Error during video processing: {e}")
        finally:
            # Release resources
            cap.release()
            if output_path:
                out.release()
            cv2.destroyAllWindows()
            
            print(f"Processed {frame_count} frames")
            if processing_times:
                print(f"Average processing time: {sum(processing_times)/len(processing_times):.4f} seconds per frame")

            # Print report at the end
            print("\n" + self.generate_report())
            with open('report.txt', 'w') as f: f.write(self.generate_report())


## Function to run the class

In [62]:
def run_squat_analyzer_jupyter(model_path='best_squat_model.keras',
                              scaler_path='preprocessed_data_scaler.joblib',
                              label_encoder_path='preprocessed_data_label_encoder.joblib',
                              input_source=0,
                              output_path=None,
                              window_size=30):
    """
    Run the squat analyzer from a Jupyter notebook
    
    Parameters:
    ----------
    model_path : str
        Path to the trained model file
    scaler_path : str
        Path to the saved scaler
    label_encoder_path : str
        Path to the saved label encoder
    input_source : int or str
        Camera index (0 for default webcam) or path to video file
    output_path : str or None
        Path to save output video (None for no saving)
    window_size : int
        Size of the sliding window for analysis
        
    Returns:
    -------
    analyzer : SquatAnalyzer
        The initialized analyzer object for potential reuse
    """
    # Initialize the analyzer
    print("Initializing Squat Analyzer...")
    analyzer = SquatAnalyzer(
        model_path=model_path,
        scaler_path=scaler_path,
        label_encoder_path=label_encoder_path,
        window_size=window_size
    )
    
    # Process video
    print(f"Starting video processing from source: {input_source}")
    analyzer.process_video(input_source=input_source, output_path=output_path)
    
    return analyzer


## Testing 

In [61]:
analyzer = run_squat_analyzer_jupyter(
    model_path='./best_squat_model.keras',
    scaler_path='./preprocessed_data_scaler.joblib',
    label_encoder_path='./preprocessed_data_label_encoder.joblib',
    input_source='./squat_video.mp4',  # Use video
    #input_source=0,  # Use webcam
    output_path='squat_analysis_output.mp4'  # Optional output file
)

Initializing Squat Analyzer...


https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations


Squat Analyzer initialized successfully
Starting video processing from source: ./squat_video.mp4
Opening video file: ./squat_video.mp4




End of video
Processed 300 frames
Average processing time: 0.1171 seconds per frame

Squat Analysis Report - 2025-03-16 07:23:23
Total Reps Performed: 5

Form Analysis:
--------------------
bad_back_round: 0 occurrences (0.0%)
  - Your back is rounding. Keep your spine neutral.
bad_back_warp: 0 occurrences (0.0%)
  - Your back is excessively arched. Maintain a neutral spine.
bad_head: 28 occurrences (10.3%)
  - Head position incorrect. Look slightly downward, keeping your neck aligned with your spine.
bad_inner_thigh: 15 occurrences (5.5%)
  - Knees collapsing inward. Keep knees aligned with toes.
bad_shallow: 0 occurrences (0.0%)
  - Squat is too shallow. Try to go deeper with proper form.
bad_toe: 0 occurrences (0.0%)
  - Foot positioning issue. Keep feet shoulder-width apart with toes slightly turned out.
good: 228 occurrences (84.1%)
  - Good form! Keep it up.
