In [3]:
%pip install opencv-python
%pip install mediapipe 

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [None]:
import cv2 
import mediapipe as mp 
import numpy as np 
import matplotlib.pyplot as plt 
import time
import platform 
from collections import deque

#Mediapip setup
mp_pose = mp.solutions.pose
mp_drawing = mp.solutions.drawing_utils
mp_drawing_styles = mp.solutions.drawing_styles

class MediaPipeKineticEnergyAnalyzer:
      def __init__(self, window_size=10):
            self.window_size = window_size
            self.mp_pose = mp.solutions.pose
            
            # Get landmark names and indices dynamically from MediaPipe
            self.landmark_names = [landmark.name for landmark in self.mp_pose.PoseLandmark]
            self.num_landmarks = len(self.landmark_names)
            
            # Create dynamic landmark subsets based on MediaPipe's pose model
            self.landmark_subsets = self._create_dynamic_subsets()
            
            # Initialize tracking arrays
            self.landmark_positions = [deque(maxlen=window_size) for _ in range(self.num_landmarks)]
            self.current_gesture_energies = {subset: 0.0 for subset in self.landmark_subsets.keys()}
            self.gesture_active = False
            self.gesture_energies = []
            self.velocities = [ [] for _ in range(self.num_landmarks)]
            self.kinetic_energy = {subset: 0.0 for subset in self.landmark_subsets.keys()}
            self.last_timestamps = [None for _ in range(self.num_landmarks)]
            
            # Assuming each landmark has equal mass (can be modified if needed)
            self.mass = 1.0
            
            print(f"Initialized analyzer with {self.num_landmarks} landmarks")
            print(f"Available body parts: {list(self.landmark_subsets.keys())}")
            
      def _create_dynamic_subsets(self):
            self.landmark_indices = {name:idx for idx,name in enumerate(self.landmark_names)}
            subsets = {}
            subsets['whole_body'] = list(range(self.num_landmarks))
     
            #Upper body landmarks       
            upper_body_landmarks = []
            for name in self.landmark_names:
                  if any(part in name.lower() for part in ['shoulder','neck', 'chest', 'elbow', 'wrist', 'thumb', 'index', 'middle', 'ring', 'pinky']):
                        upper_body_landmarks.append(self.landmark_indices[name])
            if upper_body_landmarks:
                  subsets['upper_body'] = upper_body_landmarks
            #Lower body landmarks
            lower_body_landmarks = []
            for name in self.landmark_names:
                  if any(part in name.lower() for part in ['hip', 'knee', 'ankle', 'heel', 'foot']):
                        lower_body_landmarks.append(self.landmark_indices[name])
            if lower_body_landmarks:
                  subsets['lower_body'] = lower_body_landmarks
            #right arm landmarks
            right_arm_landmarks = []
            for name in self.landmark_names:
                  if  'right' in name.lower() and any(part in name.lower() for part in ['shoulder', 'elbow', 'wrist', 'thumb', 'index', 'middle', 'ring', 'pinky']):    
                        right_arm_landmarks.append(self.landmark_indices[name])
            if right_arm_landmarks:
                  subsets['right_arm'] = right_arm_landmarks
            #left arm landmarks
            left_arm_landmarks = []
            for name in self.landmark_names:
                  if  'left' in name.lower() and any(part in name.lower() for part in ['shoulder', 'elbow', 'wrist', 'thumb', 'index', 'middle', 'ring', 'pinky']):    
                        left_arm_landmarks.append(self.landmark_indices[name])
            if left_arm_landmarks:
                  subsets['left_arm'] = left_arm_landmarks
            #right leg landmarks
            right_leg_landmarks = []      
            for name in self.landmark_names:
                  if  'right' in name.lower() and any(part in name.lower() for part in ['hip', 'knee', 'ankle', 'heel', 'foot']):    
                        right_leg_landmarks.append(self.landmark_indices[name])
            if right_leg_landmarks:
                  subsets['right_leg'] = right_leg_landmarks
            #left leg landmarks
            left_leg_landmarks = []
            for name in self.landmark_names:
                  if  'left' in name.lower() and any(part in name.lower() for part in ['hip', 'knee', 'ankle', 'heel', 'foot']):    
                        left_leg_landmarks.append(self.landmark_indices[name])
            if left_leg_landmarks:
                  subsets['left_leg'] = left_leg_landmarks
            
            return subsets
      def get_landmark_info(self):
            info ={
                  'total_landmarks': self.num_landmarks,
                  'landmark_names': self.landmark_names,
                  'subsets': {name: len(indices) for name, indices in self.landmark_subsets.items()},
            }  
            return info 
      def start_gesture(self):
            
            """Mark the beginning of a gesture"""
            self.gesture_active = True
            self.current_gesture_energies = {subset: 0.0 for subset in self.landmark_subsets.keys()}
   
      def update_positions(self, landmarks, timestamp):
            """
            Update landmark positions and calculate velocities
            """
            for i, landmark in enumerate(landmarks.landmark):
                  if i >= self.num_landmarks:
                        break  # Safety check
                  
                  position= np.array([landmark.x, landmark.y, landmark.z])
                  
                  # If this is a valid position (not NaN) and landmark is visible
                  if not np.isnan(position).any() and landmark.visibility > 0.5:
                        # Add position to history
                        self.landmark_position[i].append(position)
                        
                        # Calculate velocity if we have previous position
                        if self.last_timestamps[i] is not None and len(self.landmark_positions[i]) >= 2:
                              time_delta = timestamp - self.last_timestamps[i]
                              if time_delta > 0:
                                    prev_position = self.landmark_positions [i][-2][0]
                                    velocity = (position - prev_position) / time_delta
                                    self.velocities[i].append(velocity)
                        
                        # Update timestamp
                        self.last_timestamps[i] = timestamp
            
      def compute_kinetic_energy(self): 
            """
            Calculate kinetic energy for each subset of landmarks
            """
            # Reset kinetic energies
            for subset in self.landmark_subsets.keys():
                  self.kinetic_energy[subset] = 0.0
            
            # Compute kinetic energy for each landmark
            landmark_ke = [0.0]  * self.num_landmarks
            
            for i in range(self.num_landmarks):
                  if len(self.velocities[i])> 0:
                        # Use the most recent velocity
                        v = self.velocities [i][-1]     
                        # KE = 0.5 * m * |v|^2
                        ke = 0.5 * self.mass * np.sum(v**2)
                        landmark_ke[i] = ke   
                        
            # Accumulate kinetic energy for each subset
            for subset_name, indices in self.landmark_subsets.items():
                  self.kinetic_energy[subset_name] = sum(landmark_ke[i] for i in indices if i < self.num_landmarks)
                  
            return self.kinetic_energy
      
      def update_gesture_energy(self):
            """Update the energy for the current gesture"""
            if self.gesture_active:
                  for subset in self.landmark_subsets.keys():
                        self.current_gesture_energies[subset] += self.kinetic_energy[subset]
      
      def analyze_distribution(self, energies = None):
            """Analyze the distribution of kinetic energy among body parts"""
            if energies is None:
                  energies = self.kinetic_energy
                  
            # Skip whole_body as it's the sum of all others
            relevant_subsets = {name for name in self.landmark_subsets.keys() if name != 'whole_body'}
            
            # If whole_body energy is very small, return early
            if energies['whole_body'] < 1e-6:
                  return {subset: 0 for subset in relevant_subsets}, "No significant movement detected"
            
            # Calculate percentages
            total_energy = energies.get('whole_body', 1e-6) # Avoid division by zero
            if percentages:
                  percentages = {subset: (energies.get(subset, 0)/ total_energy) * 100
                           for subset in relevant_subsets}
            
                  # Determine if any part is moving significantly more
                  treshold = 30  # If a part has more than 30% of total energy
                  max_subset= max(percentages.items(), key=lambda x: x[1])
                  if max_subset[1] > treshold:
                        message = f"The {max_subset[0].replace('_', ' ')} is moving significantly"
                  else:
                        message = "Movement is distributed across multiple body parts"
            else:
                  message = "No significant movement detected"
            return percentages, message
      
def analyze_webcam_dynamic(duration=30, show_video=True, detection_threshold=0.5, gesture_detection_threshold=0.1):
    """
    Analyze body movement from webcam using dynamic MediaPipe landmark detection
    """
    print("Starting dynamic MediaPipe analysis...")
    
    # Initialize camera
    is_macos = platform.system()== 'Darwin'
    if is_macos:
          print("Running on macOS - using macOS-specific camera handling")
    max_attempts = 3
    for attempt in range(max_attempts):
            try:
                  print(f"Camera initialization attempt {attempt+1}/{max_attempts}")
                  cap = cv2.VideoCapture(0)
                  time.sleep(1)
                  
                  if not cap.isOpened():
                        print("Failed to open camera - retrying...")
                        cap.release()
                        time.sleep(1)
                        continue
                  
                  ret, test_frame = cap.read()
                  if not ret:
                        print("Camera opened but failed to read test frame - retrying...")
                        cap.release()
                        time.sleep(1)
                        continue
                  
                  print(f"Camera initialized successfully! Resolution: {test_frame.shape[1]}x{test_frame.shape[0]}")
                  break    
            except Exception as e:
                        
                  print(f"Error during camera initialization: {e}")
                  if 'cap' in locals() and cap is not None:
                        cap.release()
                        time.sleep(1)
    else:
            print("ERROR: Could not initialize camera after multiple attempts.")
            return None, {}, "Camera initialization failed"
    # Setup analyzer and MediaPipe Pose
    analyzer= MediaPipeKineticEnergyAnalyzer()
    # Print landmark information
    info = analyzer.get_landmark_info()
    print(f"Detected {info['total_landmarks']} landmarks")
    print(f"Creaed {len(info['subsets'])} body part subsets")
    with mp_pose.Pose(
          min_detection_confidence=detection_threshold,
          min_tracking_confidence=detection_threshold,
          model_complexity=1) as pose:
          
          start_time = time.time()   # Start time for duration tracking
          frame_count = 0  # Frame counter for periodic updates
          in_gesture = False # Track if we are currently in a gesture
          
          print(f"Starting webcam analysis for {duration} seconds...")
          print("Press ESC to stop early")
          
          analyzer.start_gesture()  # Start gesture tracking
          while True:
            current_time = time.time()
            elapsed_time = current_time - start_time
            
            if elapsed_time > duration:
                break
                
            success, image = cap.read()
            if not success:
                print("Failed to read frame - skipping")
                continue
            #Process the frame
            image.flags.writeable = False
            image= cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            results = pose.process(image)
            
            if results.pose_landmarks:
                  timestamp = current_time
                  analyzer.update_positions(results.pose_landmarks, timestamp)
                  energy = analyzer.compute_kinetic_energy()
                  analyzer.update_gesture_energy()
                  
                  #Gesture detection 
                  whole_body_energy = energy.get('whole_body', 0)
                  if whole_body_energy > gesture_detection_threshold and not in_gesture:
                        in_gesture = True 
                        print("Gesture started!")
                        
                  elif whole_body_energy < gesture_detection_threshold and in_gesture:
                        in_gesture = False
                        print("Gesture ended!")
                  
                  # Display updates every 10 frames
                  if frame_count % 10 == 0:
                        percentages, message = analyzer.analyze_distribution(energy)
                        
                  