In [1]:
import math
import os
import time

# Suppress TensorFlow warnings.
# TF_CPP_MIN_LOG_LEVEL is read by TensorFlow's native logging when the library
# is loaded, so it has to be in the environment BEFORE `import tensorflow`
# (and before any other package that might import TensorFlow itself).
os.environ['TF_CPP_MIN_LOG_LEVEL'] = "3"

import cv2
import mediapipe as mp
import numpy as np
import tensorflow as tf

# Python-side TensorFlow logger: only report errors.
tf.get_logger().setLevel("ERROR")

# Initialize MediaPipe
mp_pose = mp.solutions.pose
mp_drawing = mp.solutions.drawing_utils

In [2]:
class ExerciseRecognizer:
    """Classify exercises from MediaPipe pose sequences and count repetitions.

    Each frame is run through a MediaPipe pose model, the landmarks are
    flattened into a feature vector, and the last ``sequence_length`` vectors
    are fed to a Keras model. When the prediction is confident enough, joint
    angles computed from the landmarks drive simple per-exercise state
    machines that count curls, squats, push-ups and plank holds.
    """

    # A plank hold must last longer than this many seconds to be counted.
    MIN_PLANK_HOLD_SECONDS = 2
    # Frame rate used when the video container does not report a usable one.
    DEFAULT_FPS = 30.0

    # Timestamp (seconds) of the frame currently being processed when reading
    # from a video file; None means "use the wall clock" (live webcam).
    _frame_time = None
    # Class-level default so instances built without __init__ still work.
    rep_confidence_threshold = 0.7

    def __init__(self, model_path, actions, sequence_length=30, confidence_threshold=0.5,
                 rep_confidence_threshold=0.7):
        """
        Initialize the Exercise Recognition system

        Args:
            model_path: Path to the trained .h5 model file
            actions: List of exercise action names (must match training order)
            sequence_length: Number of frames to analyze (must match training)
            confidence_threshold: Minimum confidence to classify an action
            rep_confidence_threshold: Minimum confidence required before
                repetitions are counted for the recognised action
        """
        self.model_path = model_path
        self.actions = np.array(actions)
        self.sequence_length = sequence_length
        self.threshold = confidence_threshold
        self.rep_confidence_threshold = rep_confidence_threshold

        # Load the trained model
        print(f"Loading model from: {model_path}")
        self.model = tf.keras.models.load_model(model_path)
        print("Model loaded successfully!")

        # Initialize tracking variables
        self.sequence = []
        self.predictions = []
        self.current_action = ''
        self.current_confidence = 0.0

        # Initialize rep counters
        self.reset_counters()

        # Colors for visualization; extra random colours are added when there
        # are more actions than predefined colours. Components are converted
        # to plain Python ints because some OpenCV builds reject NumPy integer
        # scalars as a colour.
        self.colors = [(245,117,16), (117,245,16), (16,117,245), (200,100,200), (100,200,100)]
        while len(self.colors) < len(self.actions):
            self.colors.append((int(np.random.randint(0,255)),
                                int(np.random.randint(0,255)),
                                int(np.random.randint(0,255))))

    def reset_counters(self):
        """Reset all exercise counters, stages and plank timing."""
        self.curl_counter = 0
        self.squat_counter = 0
        self.plank_counter = 0
        self.pushup_counter = 0

        self.curl_stage = None
        self.squat_stage = None
        self.plank_stage = None
        self.pushup_stage = None

        self.plank_start_time = 0
        self.last_plank_end = 0

    def _reset_stream_state(self):
        """Forget buffered frames and the last prediction before a new stream."""
        self.sequence = []
        self.current_action = ''
        self.current_confidence = 0.0

    def _now(self):
        """Return the current time in seconds on the active clock.

        For video files this is the timestamp of the frame being processed, so
        hold durations reflect video time and not processing speed; otherwise
        it is the wall clock.
        """
        if self._frame_time is None:
            return time.time()
        return self._frame_time

    def mediapipe_detection(self, image, model):
        """Detect pose landmarks using MediaPipe.

        Args:
            image: BGR frame.
            model: Object exposing ``process(rgb_image)``.

        Returns:
            Tuple ``(bgr_image, results)`` where ``results`` is whatever
            ``model.process`` returned.
        """
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        # Mark read-only while the model processes the frame.
        image.flags.writeable = False
        results = model.process(image)
        image.flags.writeable = True
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        return image, results

    def draw_landmarks(self, image, results):
        """Draw pose landmarks and connections on the image (in place)."""
        if results.pose_landmarks:
            mp_drawing.draw_landmarks(
                image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS,
                mp_drawing.DrawingSpec(color=(245,117,66), thickness=2, circle_radius=2),
                mp_drawing.DrawingSpec(color=(245,66,230), thickness=2, circle_radius=2)
            )

    def extract_keypoints(self, results):
        """Flatten pose landmarks into one feature vector.

        Returns:
            1-D array of ``x, y, z, visibility`` per landmark, or a zero
            vector of length ``33*4`` when no pose was detected.
        """
        if results.pose_landmarks:
            pose = np.array([[res.x, res.y, res.z, res.visibility]
                           for res in results.pose_landmarks.landmark]).flatten()
        else:
            pose = np.zeros(33*4)
        return pose

    def calculate_angle(self, a, b, c):
        """Calculate the angle at ``b`` formed by points ``a``-``b``-``c``.

        Only the first two components of each point are used.

        Returns:
            Angle in degrees, folded into the range ``[0, 180]``.
        """
        a = np.array(a)
        b = np.array(b)
        c = np.array(c)

        radians = np.arctan2(c[1]-b[1], c[0]-b[0]) - np.arctan2(a[1]-b[1], a[0]-b[0])
        angle = np.abs(radians*180.0/np.pi)

        # Report the interior angle rather than the reflex one.
        if angle > 180.0:
            angle = 360 - angle

        return angle

    def get_coordinates(self, landmarks, side, joint):
        """Get ``[x, y]`` of a joint, e.g. ``side='left'``, ``joint='elbow'``."""
        coord = getattr(mp_pose.PoseLandmark, f"{side.upper()}_{joint.upper()}")
        x_coord = landmarks[coord.value].x
        y_coord = landmarks[coord.value].y
        return [x_coord, y_coord]

    def _limb_angle(self, landmarks, side, first, middle, last):
        """Return ``(angle at middle joint, middle joint [x, y])`` for one limb."""
        p_first = self.get_coordinates(landmarks, side, first)
        p_middle = self.get_coordinates(landmarks, side, middle)
        p_last = self.get_coordinates(landmarks, side, last)
        return self.calculate_angle(p_first, p_middle, p_last), p_middle

    def viz_joint_angle(self, image, angle, joint):
        """Write the integer angle onto the image at the joint position.

        ``joint`` is scaled by the image width and height to get pixel
        coordinates.
        """
        height, width = image.shape[:2]
        cv2.putText(image, str(int(angle)),
                   tuple(np.multiply(joint, [width, height]).astype(int)),
                   cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2, cv2.LINE_AA)

    def count_reps(self, image, current_action, landmarks):
        """
        Count repetitions based on the recognized action and pose landmarks.

        Nothing is counted without landmarks or while the current confidence
        is below ``rep_confidence_threshold``. Errors are reported and
        swallowed so that one bad frame does not stop processing.
        """
        if not landmarks:
            return

        try:
            # Only count reps if we have high confidence in the action
            if self.current_confidence < self.rep_confidence_threshold:
                return

            # A different exercise was confidently recognised: close any plank
            # hold still open so the time spent on the other exercise is not
            # added to it.
            if current_action and current_action != 'plank':
                self._end_plank_hold(self._now())

            handlers = {
                'curl': self.count_curl_reps,
                'squat': self.count_squat_reps,
                'plank': self.count_plank_reps,
                'pushup': self.count_pushup_reps,
            }
            handler = handlers.get(str(current_action))
            if handler is not None:
                handler(image, landmarks)

        except Exception as e:
            print(f"Error in rep counting: {e}")

    def count_curl_reps(self, image, landmarks):
        """Count bicep curl repetitions using the more flexed arm."""
        try:
            l_angle, l_elbow = self._limb_angle(landmarks, 'left', 'shoulder', 'elbow', 'wrist')
            r_angle, r_elbow = self._limb_angle(landmarks, 'right', 'shoulder', 'elbow', 'wrist')

            # Use the arm with more movement (smaller angle indicates more curl)
            if l_angle < r_angle:
                angle, elbow = l_angle, l_elbow
            else:
                angle, elbow = r_angle, r_elbow

            # A rep is counted when the arm extends again after being curled.
            if angle < 50:  # Curled up position
                self.curl_stage = "up"
            elif angle > 120 and self.curl_stage == 'up':  # Extended position
                self.curl_stage = "down"
                self.curl_counter += 1

            # Visualize
            self.viz_joint_angle(image, angle, elbow)

        except Exception as e:
            print(f"Error in curl counting: {e}")

    def count_squat_reps(self, image, landmarks):
        """Count squat repetitions from the average of both knee angles."""
        try:
            l_angle, l_knee = self._limb_angle(landmarks, 'left', 'hip', 'knee', 'ankle')
            r_angle, r_knee = self._limb_angle(landmarks, 'right', 'hip', 'knee', 'ankle')
            avg_angle = (l_angle + r_angle) / 2

            # A rep is counted when standing back up after a squat.
            if avg_angle < 100:  # Squatting down
                self.squat_stage = "down"
            elif avg_angle > 150 and self.squat_stage == 'down':  # Standing up
                self.squat_stage = "up"
                self.squat_counter += 1

            # Visualize
            self.viz_joint_angle(image, l_angle, l_knee)
            self.viz_joint_angle(image, r_angle, r_knee)

        except Exception as e:
            print(f"Error in squat counting: {e}")

    def _update_plank_state(self, in_position, now):
        """Advance the plank state machine for one frame.

        Args:
            in_position: True when the body currently satisfies the plank
                form checks.
            now: Timestamp of this frame in seconds.
        """
        if in_position:
            if self.plank_stage != "holding":
                self.plank_stage = "holding"
                self.plank_start_time = now
        elif self.plank_stage == "holding":
            self._end_plank_hold(now)

    def _end_plank_hold(self, now):
        """Close an open plank hold at time ``now``; count it if long enough.

        Does nothing when no hold is in progress.
        """
        if self.plank_stage != "holding":
            return
        hold_duration = now - self.plank_start_time
        if hold_duration > self.MIN_PLANK_HOLD_SECONDS:
            self.plank_counter += 1
        self.plank_stage = "not_holding"
        self.last_plank_end = now

    def count_plank_reps(self, image, landmarks):
        """Count plank holds and display the running hold time."""
        try:
            # Get coordinates for plank analysis (left side only)
            l_shoulder = self.get_coordinates(landmarks, 'left', 'shoulder')
            l_hip = self.get_coordinates(landmarks, 'left', 'hip')
            l_knee = self.get_coordinates(landmarks, 'left', 'knee')
            l_ankle = self.get_coordinates(landmarks, 'left', 'ankle')

            # Calculate body alignment
            shoulder_hip_angle = self.calculate_angle(l_shoulder, l_hip, l_knee)
            hip_knee_angle = self.calculate_angle(l_hip, l_knee, l_ankle)

            # Check if body is straight (good plank form)
            body_straight = shoulder_hip_angle > 150 and hip_knee_angle > 150

            # Check if in horizontal position (shoulder and hip at similar height)
            horizontal = abs(l_shoulder[1] - l_hip[1]) < 0.15

            current_time = self._now()
            self._update_plank_state(body_straight and horizontal, current_time)

            # Display timer
            if self.plank_stage == "holding":
                hold_time = current_time - self.plank_start_time
                cv2.putText(image, f'Hold: {hold_time:.1f}s', (10, 400),
                           cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)

            # Visualize angles
            self.viz_joint_angle(image, shoulder_hip_angle, l_hip)

        except Exception as e:
            print(f"Error in plank counting: {e}")

    def count_pushup_reps(self, image, landmarks):
        """Count push-up repetitions from the left elbow angle."""
        try:
            angle, l_elbow = self._limb_angle(landmarks, 'left', 'shoulder', 'elbow', 'wrist')

            # A rep is counted when the arm straightens after the down position.
            if angle < 70:  # Down position
                self.pushup_stage = "down"
            elif angle > 140 and self.pushup_stage == 'down':  # Up position
                self.pushup_stage = "up"
                self.pushup_counter += 1

            # Visualize
            self.viz_joint_angle(image, angle, l_elbow)

        except Exception as e:
            print(f"Error in pushup counting: {e}")

    def prob_viz(self, res, input_frame):
        """Draw one labelled probability bar per action on a copy of the frame.

        Returns:
            The annotated copy; ``input_frame`` is not modified.
        """
        output_frame = input_frame.copy()
        for num, prob in enumerate(res):
            if num < len(self.actions):
                cv2.rectangle(output_frame, (0, 60+num*40), (int(prob*100), 90+num*40),
                            self.colors[num], -1)
                cv2.putText(output_frame, str(self.actions[num]), (0, 85+num*40),
                           cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2, cv2.LINE_AA)
        return output_frame

    def _draw_status_overlay(self, frame):
        """Draw the rep counters and the current action onto ``frame`` in place."""
        height = frame.shape[0]
        counters = (
            ('Curl', self.curl_counter, (10, 30)),
            ('Squat', self.squat_counter, (200, 30)),
            ('Plank', self.plank_counter, (420, 30)),
            # Placed above the action line so it stays visible on narrow frames.
            ('Pushup', self.pushup_counter, (10, height-50)),
        )
        for label, count, origin in counters:
            cv2.putText(frame, f'{label}: {count}', origin,
                       cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

        action_text = f'Action: {self.current_action} ({self.current_confidence:.2f})'
        cv2.putText(frame, action_text, (10, height-20),
                   cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2, cv2.LINE_AA)

    def process_frame(self, frame, pose_model):
        """Process a single frame and return the annotated frame.

        Updates the frame buffer, and once it holds ``sequence_length``
        frames also updates ``current_action``, ``current_confidence`` and
        the rep counters.
        """
        # Make detection
        image, results = self.mediapipe_detection(frame, pose_model)

        # Draw landmarks
        self.draw_landmarks(image, results)

        # Keep only the most recent `sequence_length` feature vectors
        keypoints = self.extract_keypoints(results)
        self.sequence.append(keypoints)
        self.sequence = self.sequence[-self.sequence_length:]

        if len(self.sequence) == self.sequence_length:
            # Make prediction
            res = self.model.predict(np.expand_dims(self.sequence, axis=0), verbose=0)[0]
            predicted_index = int(np.argmax(res))
            self.current_confidence = float(np.max(res))

            # Update current action
            if predicted_index < len(self.actions) and self.current_confidence > self.threshold:
                self.current_action = str(self.actions[predicted_index])
            else:
                self.current_action = ''

            # Visualize probabilities
            if len(res) >= len(self.actions):
                image = self.prob_viz(res[:len(self.actions)], image)

            # Count reps
            if results.pose_landmarks:
                self.count_reps(image, self.current_action, results.pose_landmarks.landmark)

        return image

    def process_video(self, input_video_path, output_video_path=None, display=True):
        """Process a video file.

        Args:
            input_video_path: Video to read.
            output_video_path: Optional path for the annotated video.
            display: Show each processed frame in a window ('q' stops early).
                Set to False for faster or headless processing.
        """
        print(f"Processing video: {input_video_path}")

        # Open video
        cap = cv2.VideoCapture(input_video_path)
        if not cap.isOpened():
            print(f"Error: Could not open video {input_video_path}")
            cap.release()
            return

        out = None
        output_written = False
        try:
            # Get video properties. The frame rate is kept as a float:
            # truncating e.g. 29.97 to 29 would change the playback speed of
            # the written video.
            fps = cap.get(cv2.CAP_PROP_FPS)
            if not math.isfinite(fps) or fps <= 0:
                fps = self.DEFAULT_FPS
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

            print(f"Video properties: {width}x{height}, {fps:.2f} FPS, {total_frames} frames")

            # Setup output video
            if output_video_path:
                fourcc = cv2.VideoWriter_fourcc(*'mp4v')
                out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
                if out.isOpened():
                    output_written = True
                else:
                    print(f"Error: Could not open output video {output_video_path}")
                    out.release()
                    out = None

            # Start from a clean state: counters and buffered frames
            self.reset_counters()
            self._reset_stream_state()

            frame_count = 0

            # Process video
            with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
                while cap.isOpened():
                    ret, frame = cap.read()
                    if not ret:
                        break

                    # Timestamp of this frame in video time (first frame = 0 s)
                    self._frame_time = frame_count / fps
                    frame_count += 1
                    print(f"Processing frame {frame_count}/{total_frames}", end='\r')

                    # Process frame
                    processed_frame = self.process_frame(frame, pose)

                    # Add counter and action display
                    self._draw_status_overlay(processed_frame)

                    # Write to output video
                    if out is not None:
                        out.write(processed_frame)

                    # Display (optional - disable for faster processing)
                    if display:
                        cv2.imshow('Exercise Recognition', processed_frame)
                        if cv2.waitKey(1) & 0xFF == ord('q'):
                            break

            # A plank still being held when the video ends counts as well.
            self._end_plank_hold(self._now())
        finally:
            # Cleanup, also when processing fails part-way
            self._frame_time = None
            cap.release()
            if out is not None:
                out.release()
            if display:
                cv2.destroyAllWindows()

        # Print results
        print(f"\nProcessing complete!")
        print(f"Final counts:")
        print(f"  Curls: {self.curl_counter}")
        print(f"  Squats: {self.squat_counter}")
        print(f"  Planks: {self.plank_counter}")
        print(f"  Pushups: {self.pushup_counter}")

        if output_written:
            print(f"Output video saved to: {output_video_path}")

    def process_webcam(self):
        """Process webcam feed in real-time ('q' quits, 'r' resets counters)."""
        print("Starting webcam processing. Press 'q' to quit, 'r' to reset counters.")

        cap = cv2.VideoCapture(0)
        if not cap.isOpened():
            print("Error: Could not open webcam")
            cap.release()
            return

        try:
            self.reset_counters()
            self._reset_stream_state()
            # Live feed: time holds with the wall clock.
            self._frame_time = None

            with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
                while cap.isOpened():
                    ret, frame = cap.read()
                    if not ret:
                        break

                    # Process frame
                    processed_frame = self.process_frame(frame, pose)

                    # Add counter and action display
                    self._draw_status_overlay(processed_frame)

                    # Display
                    cv2.imshow('Exercise Recognition - Webcam', processed_frame)

                    key = cv2.waitKey(1) & 0xFF
                    if key == ord('q'):
                        break
                    elif key == ord('r'):
                        self.reset_counters()
                        print("Counters reset!")

            # A plank still being held when the feed stops counts as well.
            self._end_plank_hold(self._now())
        finally:
            cap.release()
            cv2.destroyAllWindows()


In [13]:
def main(model_path=None, actions=None, input_video_path=None, output_video_path=None):
    """Main function to run the exercise recognition.

    Every argument is optional; when omitted, the built-in configuration
    value below is used, so calling ``main()`` behaves as before.

    Args:
        model_path: Path to the trained model file.
        actions: Action labels, in the order used for training.
        input_video_path: Video processed when mode 1 is chosen.
        output_video_path: Where the annotated video is written in mode 1.

    Returns:
        None. Returns early (after printing an error) when the model file
        does not exist.
    """
    # CONFIGURATION - Update these paths and settings, or pass arguments instead
    MODEL_PATH = model_path or r"C:\Users\pheno\Downloads\work\main_model\LSTM_Attention_128HUs_5ex_2nd.h5"  # Path to your trained model
    # Must match your training data order.
    # NOTE(review): ExerciseRecognizer.count_reps only has counters for
    # 'curl', 'squat', 'plank' and 'pushup'; with this label list 'lunge' and
    # 'situp' are recognised but never counted, and the push-up counter can
    # never advance. Confirm this is intended.
    ACTIONS = actions or ['curl', 'lunge', 'plank', 'situp', 'squat']

    # Video processing
    INPUT_VIDEO_PATH = input_video_path or r"C:\Users\pheno\Downloads\work\6632572755623995432.mp4"
    OUTPUT_VIDEO_PATH = output_video_path or "output_exercise_recognition.mp4"

    # Check if model exists (before the comparatively slow model load)
    if not os.path.exists(MODEL_PATH):
        print(f"Error: Model file not found: {MODEL_PATH}")
        print("Please update MODEL_PATH with the correct path to your trained model.")
        return

    # Initialize recognizer
    recognizer = ExerciseRecognizer(
        model_path=MODEL_PATH,
        actions=ACTIONS,
        sequence_length=30,
        confidence_threshold=0.5
    )

    # Choose processing mode
    print("Choose processing mode:")
    print("1. Process video file")
    print("2. Process webcam (real-time)")

    choice = input("Enter choice (1 or 2): ").strip()

    if choice == "1":
        if os.path.exists(INPUT_VIDEO_PATH):
            recognizer.process_video(INPUT_VIDEO_PATH, OUTPUT_VIDEO_PATH)
        else:
            print(f"Error: Video file not found: {INPUT_VIDEO_PATH}")
    elif choice == "2":
        recognizer.process_webcam()
    else:
        print("Invalid choice. Please run again and select 1 or 2.")


In [14]:
# Entry point: run main() when this module is executed as the main program
# (the condition also holds when the cell is run inside a notebook kernel).
if __name__ == "__main__":
    main()



Loading model from: C:\Users\pheno\Downloads\work\main_model\LSTM_Attention_128HUs_5ex_2nd.h5
Model loaded successfully!
Choose processing mode:
1. Process video file
2. Process webcam (real-time)
Processing video: C:\Users\pheno\Downloads\work\6632572755623995432.mp4
Video properties: 1280x720, 29 FPS, 1348 frames
Processing frame 1016/1348
Processing complete!
Final counts:
  Curls: 0
  Squats: 0
  Planks: 0
Output video saved to: output_exercise_recognition.mp4
