In [1]:
import cv2
import numpy as np
import mediapipe as mp
import math
import argparse
from tqdm import tqdm

2025-03-29 16:39:32.856020: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1743284372.881739  366134 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1743284372.888714  366134 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
W0000 00:00:1743284372.912149  366134 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.
W0000 00:00:1743284372.912192  366134 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.
W0000 00:00:1743284372.912197  366134 computation_placer.cc:177] computation placer alr

In [2]:
class PoseAnalyzer:
    def __init__(self):
        # inicia mediapipe pose
        self.mp_pose = mp.solutions.pose
        self.mp_drawing = mp.solutions.drawing_utils
        self.mp_drawing_styles = mp.solutions_drawing_styles
        self.pose = self.mp_pose.Pose(
            static_image_mode=False,
            model_complexity=2,
            enable_segmentation=False,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5
        )

        # Define colores
        self.colors = {
            "red":(0,0,255),
            "green":(0,255,0),
            "blue":(255,0,0),
            "yellow":(0,255,255),
            "purple":(255,0,255),
            "white":(255,255,255)
        }

        # accumulator: almacenar posiciones previas para tracking
        self.prev_positions = {}
        self.stride_distances = []
        self.step_heights = []   

In [3]:
 def calculate_angle(self, a, b, c):
        """ calcular angulo en grados entre tres puntos """
        if not all(p.visibility > 0.5 for p in  [a,b,c]):
            return None

        a_coords = np.array([a.x, a.y])
        b_coords = np.array([b.x, b.y])
        c_coords = np.array([c.x, c.y])

        ba = a_coords - b_coords
        bc = c_coords - b_coords

        cosine_angle = np.dot(ba, bc) / (np.linalg.norm(ba) * np,linalg.norm(bc))
        angle = np.arccos(np.clip(cosine_angle, -1.0, 1.0))

        return np.degrees(angle)


In [4]:

 def calculate_distance(self, a, b):
        """ Distancia entre dos puntos """
        if not all(p.visibility > 0.5 for p in [a,b]):
            return None

        return math.sqrt((b.x - a.x)**2 + (b.y - a.y)**2)

In [5]:
def check_horizontality(self, a, b):
        """Calculate horizontality (angle from horizontal in degrees)"""
        if not all(p.visibility > 0.5 for p in [a, b]):
            return None
            
        dx = b.x - a.x
        dy = b.y - a.y
        angle = math.degrees(math.atan2(dy, dx))
        return angle

In [6]:
def calculate_vertical_lift(self, ankle, knee, hip, prev_ankle_y=None):
        """Calculate how much a leg is lifted"""
        if not all(p.visibility > 0.5 for p in [ankle, knee, hip]):
            return None, None
            
        # Current height relative to hip
        current_lift = hip.y - ankle.y
        
        # Change in height from previous frame
        lift_change = None
        if prev_ankle_y is not None:
            lift_change = prev_ankle_y - ankle.y
            
        return current_lift, lift_change

In [7]:
def calculate_stride(self, left_ankle, right_ankle, frame_height):
        """Calculate stride distance between steps"""
        if not all(p.visibility > 0.5 for p in [left_ankle, right_ankle]):
            return None
            
        # Convert to pixel coordinates
        left_pos = (int(left_ankle.x * frame_height), int(left_ankle.y * frame_height))
        right_pos = (int(right_ankle.x * frame_height), int(right_ankle.y * frame_height))
        
        # Store current positions
        current_positions = {
            "left_ankle": left_pos,
            "right_ankle": right_pos
        }
        
        # Calculate stride if we have previous positions
        stride = None
        if "left_ankle" in self.prev_positions and "right_ankle" in self.prev_positions:
            # Check which foot moved (the one with larger displacement)
            left_disp = np.linalg.norm(np.array(left_pos) - np.array(self.prev_positions["left_ankle"]))
            right_disp = np.linalg.norm(np.array(right_pos) - np.array(self.prev_positions["right_ankle"]))
            
            if max(left_disp, right_disp) > 5:  # Threshold to detect actual step
                stride = max(left_disp, right_disp)
                if stride > 5:  # Reasonable stride detected
                    self.stride_distances.append(stride)
        
        # Update previous positions
        self.prev_positions = current_positions
        return stride

In [8]:
def process_video(self, input_video, output_video=None):
        if output_video is None:
            # Create output filename by adding "_analyzed" before extension
            name_parts = input_video.rsplit('.', 1)
            output_video = f"{name_parts[0]}_analyzed.{name_parts[1]}"
        
        # Open the video file
        cap = cv2.VideoCapture(input_video)
        if not cap.isOpened():
            print(f"Error: Could not open video {input_video}")
            return
        
        # Get video properties
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        
        # Create video writer
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_video, fourcc, fps, (frame_width, frame_height))
        
        # Process frames
        prev_ankle_y = {"left": None, "right": None}
        
        for _ in tqdm(range(total_frames), desc="Processing frames"):
            ret, frame = cap.read()
            if not ret:
                break
            
            # Convert to RGB for MediaPipe
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            
            # Process the frame with MediaPipe Pose
            results = self.pose.process(rgb_frame)
            
            if results.pose_landmarks:
                # Draw the pose landmarks
                self.mp_drawing.draw_landmarks(
                    frame,
                    results.pose_landmarks,
                    self.mp_pose.POSE_CONNECTIONS,
                    landmark_drawing_spec=self.mp_drawing_styles.get_default_pose_landmarks_style()
                )
                
                landmarks = results.pose_landmarks.landmark
                
                # Extract key points
                left_shoulder = landmarks[self.mp_pose.PoseLandmark.LEFT_SHOULDER.value]
                right_shoulder = landmarks[self.mp_pose.PoseLandmark.RIGHT_SHOULDER.value]
                left_hip = landmarks[self.mp_pose.PoseLandmark.LEFT_HIP.value]
                right_hip = landmarks[self.mp_pose.PoseLandmark.RIGHT_HIP.value]
                left_knee = landmarks[self.mp_pose.PoseLandmark.LEFT_KNEE.value]
                right_knee = landmarks[self.mp_pose.PoseLandmark.RIGHT_KNEE.value]
                left_ankle = landmarks[self.mp_pose.PoseLandmark.LEFT_ANKLE.value]
                right_ankle = landmarks[self.mp_pose.PoseLandmark.RIGHT_ANKLE.value]
                
                # 1. Calculate shoulder horizontality
                shoulder_angle = self.check_horizontality(left_shoulder, right_shoulder)
                if shoulder_angle is not None:
                    # Draw shoulder line
                    cv2.line(
                        frame, 
                        (int(left_shoulder.x * frame_width), int(left_shoulder.y * frame_height)),
                        (int(right_shoulder.x * frame_width), int(right_shoulder.y * frame_height)),
                        self.colors["blue"], 
                        2
                    )
                    # Add shoulder horizontality info
                    shoulder_text = f"Shoulder angle: {shoulder_angle:.1f}°"
                    cv2.putText(
                        frame, 
                        shoulder_text,
                        (10, 30), 
                        cv2.FONT_HERSHEY_SIMPLEX, 
                        0.7, 
                        self.colors["white"], 
                        2
                    )
                
                # 2. Calculate hip horizontality
                hip_angle = self.check_horizontality(left_hip, right_hip)
                if hip_angle is not None:
                    # Draw hip line
                    cv2.line(
                        frame, 
                        (int(left_hip.x * frame_width), int(left_hip.y * frame_height)),
                        (int(right_hip.x * frame_width), int(right_hip.y * frame_height)),
                        self.colors["green"], 
                        2
                    )
                    # Add hip horizontality info
                    hip_text = f"Hip angle: {hip_angle:.1f}°"
                    cv2.putText(
                        frame, 
                        hip_text,
                        (10, 60), 
                        cv2.FONT_HERSHEY_SIMPLEX, 
                        0.7, 
                        self.colors["white"], 
                        2
                    )
                
                # 3. Calculate leg lift for left leg
                left_lift, left_lift_change = self.calculate_vertical_lift(
                    left_ankle, left_knee, left_hip, prev_ankle_y["left"]
                )
                if left_lift is not None:
                    # Update previous ankle position
                    prev_ankle_y["left"] = left_ankle.y
                    # Normalize lift to frame height
                    normalized_lift = left_lift * frame_height
                    if normalized_lift > 0:
                        self.step_heights.append(normalized_lift)
                    # Add left leg lift info
                    left_lift_text = f"Left leg lift: {normalized_lift:.1f}px"
                    cv2.putText(
                        frame,
                        left_lift_text,
                        (10, 90),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        0.7,
                        self.colors["yellow"],
                        2
                    )
                
                # 4. Calculate leg lift for right leg
                right_lift, right_lift_change = self.calculate_vertical_lift(
                    right_ankle, right_knee, right_hip, prev_ankle_y["right"]
                )
                if right_lift is not None:
                    # Update previous ankle position
                    prev_ankle_y["right"] = right_ankle.y
                    # Normalize lift to frame height
                    normalized_lift = right_lift * frame_height
                    if normalized_lift > 0:
                        self.step_heights.append(normalized_lift)
                    # Add right leg lift info
                    right_lift_text = f"Right leg lift: {normalized_lift:.1f}px"
                    cv2.putText(
                        frame,
                        right_lift_text,
                        (10, 120),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        0.7,
                        self.colors["yellow"],
                        2
                    )
                
                # 5. Calculate stride
                stride = self.calculate_stride(left_ankle, right_ankle, frame_height)
                if stride is not None and stride > 5:  # Reasonable stride
                    # Add stride info
                    stride_text = f"Stride: {stride:.1f}px"
                    cv2.putText(
                        frame,
                        stride_text,
                        (10, 150),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        0.7,
                        self.colors["purple"],
                        2
                    )
                
                # 6. Display average values
                if len(self.stride_distances) > 0:
                    avg_stride = sum(self.stride_distances) / len(self.stride_distances)
                    avg_stride_text = f"Avg stride: {avg_stride:.1f}px"
                    cv2.putText(
                        frame,
                        avg_stride_text,
                        (frame_width - 250, 30),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        0.7,
                        self.colors["white"],
                        2
                    )
                
                if len(self.step_heights) > 0:
                    avg_height = sum(self.step_heights) / len(self.step_heights)
                    avg_height_text = f"Avg leg lift: {avg_height:.1f}px"
                    cv2.putText(
                        frame,
                        avg_height_text,
                        (frame_width - 250, 60),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        0.7,
                        self.colors["white"],
                        2
                    )
            
            # Write the frame to output video
            out.write(frame)
        
        # Release resources
        cap.release()
        out.release()
        print(f"Analysis complete. Output saved to {output_video}")
        
        # Return summary stats
        summary = {
            "avg_stride": sum(self.stride_distances) / len(self.stride_distances) if self.stride_distances else 0,
            "max_stride": max(self.stride_distances) if self.stride_distances else 0,
            "avg_leg_lift": sum(self.step_heights) / len(self.step_heights) if self.step_heights else 0,
            "max_leg_lift": max(self.step_heights) if self.step_heights else 0
        }
        return summary

In [10]:
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Analyze pose in a video')
    parser.add_argument('input_video', type=str, help='Path to input video file')
    parser.add_argument('--output', type=str, help='Path to output video file (optional)')
    
    args = parser.parse_args()
    
    analyzer = PoseAnalyzer()
    summary = analyzer.process_video(args.input_video, args.output)
    
    print("\nAnalysis Summary:")
    print(f"Average Stride Distance: {summary['avg_stride']:.2f} pixels")
    print(f"Maximum Stride Distance: {summary['max_stride']:.2f} pixels")
    print(f"Average Leg Lift: {summary['avg_leg_lift']:.2f} pixels")
    print(f"Maximum Leg Lift: {summary['max_leg_lift']:.2f} pixels")

usage: ipykernel_launcher.py [-h] [--output OUTPUT] input_video
ipykernel_launcher.py: error: unrecognized arguments: -f


SystemExit: 2