In [13]:
import cv2
import mediapipe as mp
import numpy as np
from ultralytics import YOLO
import time
from collections import deque
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime
import seaborn as sns
import json
import threading
from pathlib import Path
import logging
import csv

class PoseAnalytics:
    """Analytics component for tracking and analyzing pose detection metrics"""
    def __init__(self, save_dir='analytics'):
        self.save_dir = Path(save_dir)
        self.save_dir.mkdir(exist_ok=True)
        
        # Initialize analytics storage
        self.session_data = {
            'timestamps': [],
            'postures': [],
            'model_used': [],
            'confidence_scores': [],
            'processing_times': [],
            'fps_values': []
        }
        
        # Setup logging
        logging.basicConfig(
            filename=self.save_dir / 'pose_detection.log',
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        
        # Initialize real-time metrics
        self.current_metrics = {
            'session_start': datetime.now(),
            'poses_detected': 0,
            'model_switches': 0,
            'average_fps': 0.0
        }
    
    def update_metrics(self, posture, model, confidence, process_time, fps):
        """Update analytics with new frame data"""
        timestamp = datetime.now()
        
        # Update session data
        self.session_data['timestamps'].append(timestamp)
        self.session_data['postures'].append(posture)
        self.session_data['model_used'].append(model)
        self.session_data['confidence_scores'].append(confidence)
        self.session_data['processing_times'].append(process_time)
        self.session_data['fps_values'].append(fps)
        
        # Update real-time metrics
        self.current_metrics['poses_detected'] += 1
        self.current_metrics['average_fps'] = np.mean(self.session_data['fps_values'])
        
        # Log significant events
        if confidence < 0.5:
            logging.warning(f"Low confidence detection: {confidence:.2f}")
    
    def _create_posture_distribution_plot(self, df):
        """Create and save posture distribution plot"""
        plt.figure(figsize=(10, 6))
        posture_counts = df['postures'].value_counts()
        sns.barplot(x=posture_counts.values, y=posture_counts.index)
        plt.title('Posture Distribution')
        plt.xlabel('Count')
        plt.tight_layout()
        plt.savefig(self.save_dir / 'posture_distribution.png')
        plt.close()

    def _create_performance_plot(self, df):
        """Create and save performance metrics plot"""
        plt.figure(figsize=(12, 6))
        plt.plot(df['timestamps'], df['fps_values'], label='FPS')
        plt.plot(df['timestamps'], df['processing_times'], label='Processing Time')
        plt.title('Performance Metrics Over Time')
        plt.xlabel('Time')
        plt.ylabel('Value')
        plt.legend()
        plt.tight_layout()
        plt.savefig(self.save_dir / 'performance_metrics.png')
        plt.close()

    def _create_confidence_plot(self, df):
        """Create and save confidence scores plot"""
        plt.figure(figsize=(10, 6))
        plt.plot(df['timestamps'], df['confidence_scores'])
        plt.title('Detection Confidence Over Time')
        plt.xlabel('Time')
        plt.ylabel('Confidence Score')
        plt.tight_layout()
        plt.savefig(self.save_dir / 'confidence_scores.png')
        plt.close()

    def generate_analytics_report(self):
        """Generate comprehensive analytics report"""
        if not self.session_data['timestamps']:
            logging.warning("No data collected for analytics report")
            return

        # Convert data to DataFrame
        df = pd.DataFrame(self.session_data)
        
        # Generate visualizations
        self._create_posture_distribution_plot(df)
        self._create_performance_plot(df)
        self._create_confidence_plot(df)
        
        # Generate summary statistics
        summary = self._generate_summary_stats(df)
        
        # Save report
        self._save_analytics_report(df, summary)
    
    def _generate_summary_stats(self, df):
        """Generate summary statistics from the session data"""
        summary = {
            'total_frames': len(df),
            'average_fps': df['fps_values'].mean(),
            'average_confidence': df['confidence_scores'].mean(),
            'most_common_posture': df['postures'].mode().iloc[0] if not df['postures'].empty else 'None',
            'session_duration': str(datetime.now() - self.current_metrics['session_start'])
        }
        return summary

    def _save_analytics_report(self, df, summary):
        """Save analytics report to file"""
        report_path = self.save_dir / f'analytics_report_{datetime.now().strftime("%Y%m%d_%H%M%S")}.txt'
        with open(report_path, 'w') as f:
            f.write("=== Pose Detection Analytics Report ===\n\n")
            for key, value in summary.items():
                f.write(f"{key}: {value}\n")

class PoseDetector:
    def __init__(self):
        # MediaPipe and YOLO initialization
        self.mp_pose = mp.solutions.pose
        self.pose = self.mp_pose.Pose(
            min_detection_confidence=0.7,
            min_tracking_confidence=0.7,
            model_complexity=2
        )
        self.mp_drawing = mp.solutions.drawing_utils
        self.mp_drawing_styles = mp.solutions.drawing_styles
        self.yolo_model = YOLO('yolov8n-pose.pt')
        
        # Initialize analytics
        self.analytics = PoseAnalytics()
        
        # Performance monitoring
        self.fps_history = deque(maxlen=30)
        self.detection_history = deque(maxlen=30)
        self.joint_consistency = deque(maxlen=30)
        
        # Model switching parameters
        self.current_model = 'mediapipe'
        self.last_switch_time = time.time()
        self.switch_cooldown = 10
        self.switch_pending = False
        
        # Initialize video recording
        self.recording = False
        self.video_writer = None
        self.output_dir = Path('output')
        self.output_dir.mkdir(exist_ok=True)
        
        # State tracking
        self.current_posture = "No pose detected"
        self.current_confidence = 0.0
        self.current_landmarks = None
        
        # Motion tracking
        self.pose_history = deque(maxlen=5)
        self.gesture_patterns = self._load_gesture_patterns()
        
        # Threshold values
        self.movement_threshold = 10
        self.shoulder_movement_threshold = 5
        self.squat_threshold = 0.25
        self.standing_hip_knee_ratio = 0.9
        self.sitting_hip_knee_ratio = 0.4
        self.arm_raised_threshold = 0.2
        
        self.current_frame_shape = None
        
    def process_frame(self, frame):
            """Process a single frame for pose detection with improved error handling"""
            try:
                start_time = time.time()
                if frame is None or frame.size == 0:
                    logging.error("Invalid frame received")
                    return frame

                self.current_frame_shape = frame.shape
                rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

                # Process with current model
                landmarks = None
                confidence = 0.0

                try:
                    if self.current_model == 'mediapipe':
                        results = self.pose.process(rgb_frame)
                        if results.pose_landmarks:
                            landmarks = results.pose_landmarks
                            confidence = np.mean([lm.visibility for lm in landmarks.landmark])
                    else:  # YOLO
                        results = self.yolo_model(rgb_frame)[0]
                        if len(results.keypoints) > 0:
                            # Safely extract keypoints
                            try:
                                keypoints = results.keypoints[0].data.cpu().numpy()
                                if len(keypoints) > 0:
                                    landmarks = keypoints
                                    confidence = float(results.boxes[0].conf) if len(results.boxes) > 0 else 0.0
                            except (IndexError, AttributeError) as e:
                                logging.warning(f"Error processing YOLO keypoints: {str(e)}")
                                landmarks = None
                except Exception as e:
                    logging.error(f"Error during pose detection: {str(e)}")
                    return frame

                # Calculate metrics
                process_time = time.time() - start_time
                fps = 1 / process_time if process_time > 0 else 0
                self.fps_history.append(fps)

                if landmarks is not None:
                    try:
                        # Detect posture with safe conversion
                        posture = self.detect_posture(landmarks, self.current_model)
                        self.current_posture = posture
                        self.current_confidence = confidence
                        self.current_landmarks = landmarks

                        # Calculate bounding box with error handling
                        try:
                            bbox = self.calculate_bounding_box(landmarks, self.current_model)
                            self.draw_action_box(frame, bbox, posture)
                        except Exception as e:
                            logging.warning(f"Error drawing bounding box: {str(e)}")

                        # Draw landmarks based on model type
                        if self.current_model == 'mediapipe':
                            self.mp_drawing.draw_landmarks(
                                frame,
                                landmarks,
                                self.mp_pose.POSE_CONNECTIONS,
                                landmark_drawing_spec=self.mp_drawing_styles.get_default_pose_landmarks_style()
                            )
                        else:
                            self.draw_yolo_skeleton(frame, landmarks)

                        # Update analytics
                        self.analytics.update_metrics(
                            posture=posture,
                            model=self.current_model,
                            confidence=confidence,
                            process_time=process_time,
                            fps=fps
                        )

                        # Check for model switching
                        self._check_model_switch(confidence)

                    except Exception as e:
                        logging.error(f"Error processing landmarks: {str(e)}")

                # Draw performance metrics
                self._draw_performance_metrics(frame, fps, confidence)

                # Handle recording if active
                if self.recording and self.video_writer is not None:
                    try:
                        self.video_writer.write(frame)
                    except Exception as e:
                        logging.error(f"Error writing video frame: {str(e)}")

                return frame

            except Exception as e:
                logging.error(f"Error in process_frame: {str(e)}")
                return frame

    def draw_yolo_skeleton(self, frame, keypoints):
        """Draw YOLO skeleton on frame with improved error handling"""
        if not isinstance(keypoints, np.ndarray):
            try:
                keypoints = keypoints.cpu().numpy()
            except AttributeError:
                logging.error("Invalid keypoints format")
                return

        connections = [
            (0, 1), (1, 2), (2, 3), (3, 4),  # Right arm
            (5, 6), (6, 7), (7, 8),          # Left arm
            (9, 10), (11, 12),               # Spine
            (12, 13), (13, 14), (14, 15),    # Right leg
            (16, 17), (17, 18), (18, 19)     # Left leg
        ]

        # Ensure keypoints are within frame boundaries
        h, w = frame.shape[:2]
        
        for start, end in connections:
            try:
                if start >= len(keypoints) or end >= len(keypoints):
                    continue
                    
                x1, y1, conf1 = keypoints[start]
                x2, y2, conf2 = keypoints[end]

                # Clip coordinates to frame boundaries
                x1, y1 = np.clip(int(x1), 0, w-1), np.clip(int(y1), 0, h-1)
                x2, y2 = np.clip(int(x2), 0, w-1), np.clip(int(y2), 0, h-1)

                if conf1 > 0.5 and conf2 > 0.5:  # Only draw if confidence is sufficient
                    cv2.line(frame, (x1, y1), (x2, y2), (0, 255, 255), 2)

            except (IndexError, ValueError) as e:
                continue

        # Draw keypoints
        for kp in keypoints:
            try:
                x, y, conf = kp
                if conf > 0.5:
                    x, y = np.clip(int(x), 0, w-1), np.clip(int(y), 0, h-1)
                    cv2.circle(frame, (x, y), 5, (0, 255, 0), -1)
            except (IndexError, ValueError):
                continue


    def _check_model_switch(self, confidence):
        """Check if model switch is needed based on performance"""
        current_time = time.time()
        
        # Only consider switching if cooldown period has passed
        if current_time - self.last_switch_time < self.switch_cooldown:
            return
            
        # Switch models if confidence is consistently low
        if confidence < 0.5:
            self.detection_history.append(False)
        else:
            self.detection_history.append(True)
            
        # Calculate detection rate
        detection_rate = sum(self.detection_history) / len(self.detection_history)
        
        if detection_rate < 0.7 and not self.switch_pending:
            self.switch_pending = True
            self.current_model = 'yolo' if self.current_model == 'mediapipe' else 'mediapipe'
            self.last_switch_time = current_time
            logging.info(f"Switching to {self.current_model} model")
            self.switch_pending = False

    def _check_gestures(self):
        """Check for gesture patterns in pose history"""
        if len(self.pose_history) < 3:
            return
            
        # Convert history to list for pattern matching
        history_list = list(self.pose_history)
        
        # Check each gesture pattern
        for gesture, pattern in self.gesture_patterns.items():
            sequence = pattern['sequence']
            if len(history_list) >= len(sequence):
                # Check last n poses against pattern
                last_n_poses = history_list[-len(sequence):]
                if all(a == b for a, b in zip(last_n_poses, sequence)):
                    logging.info(f"Gesture detected: {gesture}")

    def _draw_performance_metrics(self, frame, fps, confidence):
        """Draw performance metrics on frame"""
        # Calculate average FPS
        avg_fps = np.mean(self.fps_history) if self.fps_history else 0
        
        # Draw metrics
        metrics_text = [
            f"FPS: {avg_fps:.1f}",
            f"Model: {self.current_model}",
            f"Confidence: {confidence:.2f}",
            f"Posture: {self.current_posture}"
        ]
        
        y_position = 30
        for text in metrics_text:
            cv2.putText(frame, text, (10, y_position),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
            y_position += 25

    def toggle_recording(self):
        """Toggle video recording"""
        if not self.recording:
            # Start recording
            filename = self.output_dir / f'pose_recording_{datetime.now().strftime("%Y%m%d_%H%M%S")}.avi'
            fourcc = cv2.VideoWriter_fourcc(*'XVID')
            self.video_writer = cv2.VideoWriter(str(filename), fourcc, 20.0, 
                                              (self.current_frame_shape[1], self.current_frame_shape[0]))
            self.recording = True
            logging.info("Recording started")
        else:
            # Stop recording
            if self.video_writer is not None:
                self.video_writer.release()
                self.video_writer = None
            self.recording = False
            logging.info("Recording stopped")

    def save_frame(self):
        """Save current frame as image"""
        if self.current_frame_shape is not None:
            filename = self.output_dir / f'pose_frame_{datetime.now().strftime("%Y%m%d_%H%M%S")}.jpg'
            cv2.imwrite(str(filename), self.current_frame)
            logging.info(f"Frame saved: {filename}")

    def _load_gesture_patterns(self):
        """Load predefined gesture patterns"""
        return {
            'wave': {'sequence': ['standing with raised arms', 'standing'], 'threshold': 0.8},
            'squat': {'sequence': ['standing', 'squatting', 'standing'], 'threshold': 0.9}
        }

    def detect_posture(self, landmarks, model_type):
        """Detect posture from landmarks"""
        if model_type == 'mediapipe':
            return self._detect_posture_mediapipe(landmarks)
        else:
            return self._detect_posture_yolo(landmarks)

    def _detect_posture_mediapipe(self, landmarks):
        """Detect posture using MediaPipe landmarks"""
        # Get relevant landmark points
        nose = landmarks.landmark[self.mp_pose.PoseLandmark.NOSE]
        left_shoulder = landmarks.landmark[self.mp_pose.PoseLandmark.LEFT_SHOULDER]
        right_shoulder = landmarks.landmark[self.mp_pose.PoseLandmark.RIGHT_SHOULDER]
        left_hip = landmarks.landmark[self.mp_pose.PoseLandmark.LEFT_HIP]
        right_hip = landmarks.landmark[self.mp_pose.PoseLandmark.RIGHT_HIP]
        left_knee = landmarks.landmark[self.mp_pose.PoseLandmark.LEFT_KNEE]
        right_knee = landmarks.landmark[self.mp_pose.PoseLandmark.RIGHT_KNEE]
        
        # Calculate angles and ratios
        hip_knee_ratio = self._calculate_hip_knee_ratio(left_hip, right_hip, left_knee, right_knee)
        shoulder_hip_angle = self._calculate_angle(
            (left_shoulder.x, left_shoulder.y),
            (left_hip.x, left_hip.y),
            (left_knee.x, left_knee.y)
        )
        
        # Detect posture based on calculations
        if hip_knee_ratio > self.standing_hip_knee_ratio:
            if self._are_arms_raised(landmarks):
                return "standing with raised arms"
            return "standing"
        elif hip_knee_ratio < self.sitting_hip_knee_ratio:
            return "sitting"
        elif shoulder_hip_angle < 150:  # degrees
            return "squatting"
        else:
            return "undefined posture"

    def _detect_posture_yolo(self, landmarks):
        """Detect posture using YOLO landmarks"""
        # Convert YOLO format to similar structure as MediaPipe for reuse
        converted_landmarks = self._convert_yolo_to_mediapipe_format(landmarks)
        return self._detect_posture_mediapipe(converted_landmarks)

    def _convert_yolo_to_mediapipe_format(self, yolo_landmarks):
        """Convert YOLO landmark format to MediaPipe-like format"""
        # Create a simple class to mimic MediaPipe landmark structure
        class LandmarkContainer:
            def __init__(self, landmarks):
                self.landmark = landmarks

        # Convert YOLO keypoints to normalized coordinates
        converted = []
        # YOLO keypoints come as a tensor, need to convert to numpy array first
        keypoints_array = yolo_landmarks.cpu().numpy() if hasattr(yolo_landmarks, 'cpu') else yolo_landmarks
    
        if isinstance(keypoints_array, np.ndarray):
            # If it's a batch of keypoints, take the first person
            if len(keypoints_array.shape) == 3:
                keypoints_array = keypoints_array[0]
            
            for point in keypoints_array:
                if len(point) >= 2:  # Ensure we have at least x,y coordinates
                    landmark = type('Landmark', (), {
                        'x': float(point[0]) / float(self.current_frame_shape[1]),
                        'y': float(point[1]) / float(self.current_frame_shape[0]),
                        'visibility': float(point[2]) if len(point) > 2 else 1.0
                    })
                    converted.append(landmark)
        
        return LandmarkContainer(converted)

    def _calculate_hip_knee_ratio(self, left_hip, right_hip, left_knee, right_knee):
        """Calculate the ratio between hip and knee positions"""
        hip_y = (left_hip.y + right_hip.y) / 2
        knee_y = (left_knee.y + right_knee.y) / 2
        return abs(knee_y - hip_y)

    def _calculate_angle(self, p1, p2, p3):
        """Calculate angle between three points"""
        v1 = np.array([p1[0] - p2[0], p1[1] - p2[1]])
        v2 = np.array([p3[0] - p2[0], p3[1] - p2[1]])
        
        cos_angle = np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))
        angle = np.arccos(np.clip(cos_angle, -1.0, 1.0))
        return np.degrees(angle)

    def _are_arms_raised(self, landmarks):
        """Check if arms are raised above shoulders"""
        left_wrist = landmarks.landmark[self.mp_pose.PoseLandmark.LEFT_WRIST]
        right_wrist = landmarks.landmark[self.mp_pose.PoseLandmark.RIGHT_WRIST]
        left_shoulder = landmarks.landmark[self.mp_pose.PoseLandmark.LEFT_SHOULDER]
        right_shoulder = landmarks.landmark[self.mp_pose.PoseLandmark.RIGHT_SHOULDER]
        
        return (left_wrist.y < left_shoulder.y - self.arm_raised_threshold or 
                right_wrist.y < right_shoulder.y - self.arm_raised_threshold)

    def calculate_bounding_box(self, landmarks, model_type):
        """Calculate bounding box for detected pose"""
        if model_type == 'mediapipe':
            points = [(lm.x, lm.y) for lm in landmarks.landmark]
        else:  # YOLO
            points = [(x/self.current_frame_shape[1], y/self.current_frame_shape[0]) 
                     for x, y, _ in landmarks]
        
        x_coords = [p[0] for p in points]
        y_coords = [p[1] for p in points]
        
        # Calculate normalized coordinates
        x_min, x_max = min(x_coords), max(x_coords)
        y_min, y_max = min(y_coords), max(y_coords)
        
        # Convert to pixel coordinates
        return {
            'x1': int(x_min * self.current_frame_shape[1]),
            'y1': int(y_min * self.current_frame_shape[0]),
            'x2': int(x_max * self.current_frame_shape[1]),
            'y2': int(y_max * self.current_frame_shape[0])
        }

    def draw_action_box(self, frame, bbox, action):
        """Draw bounding box and action label on frame"""
        # Draw bounding box
        cv2.rectangle(frame, 
                     (bbox['x1'], bbox['y1']), 
                     (bbox['x2'], bbox['y2']), 
                     (0, 255, 0), 2)
        
        # Draw action label
        label_size = cv2.getTextSize(action, cv2.FONT_HERSHEY_SIMPLEX, 0.7, 2)[0]
        cv2.rectangle(frame,
                     (bbox['x1'], bbox['y1'] - label_size[1] - 10),
                     (bbox['x1'] + label_size[0], bbox['y1']),
                     (0, 255, 0), -1)
        cv2.putText(frame, action,
                   (bbox['x1'], bbox['y1'] - 5),
                   cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 2)

def main():
    """Enhanced main function with analytics and error handling"""
    # Setup logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )
    
    try:
        # Initialize detector
        detector = PoseDetector()
        logging.info("Pose detector initialized successfully")
        
        # Initialize video capture
        cap = cv2.VideoCapture(0)
        if not cap.isOpened():
            raise RuntimeError("Could not open webcam")
        
        logging.info("Video capture started")
        
        # Main processing loop
        while True:
            success, frame = cap.read()
            if not success:
                logging.warning("Failed to capture frame")
                continue
            
            try:
                # Process frame
                processed_frame = detector.process_frame(frame)
                if processed_frame is None:
                    continue
                
                # Display frame
                cv2.imshow('Pose Detection', processed_frame)
                
                # Handle keyboard input
                key = cv2.waitKey(1) & 0xFF
                if key == ord('q'):
                    logging.info("User requested exit")
                    break
                elif key == ord('r'):
                    detector.toggle_recording()
                elif key == ord('s'):
                    detector.save_frame()
                
            except Exception as e:
                logging.error(f"Error processing frame: {str(e)}")
                continue
                
    except Exception as e:
        logging.error(f"Fatal error: {str(e)}")
        
    finally:
        # Cleanup
        if cap is not None:
            cap.release()
        cv2.destroyAllWindows()
        
        # Generate final analytics
        detector.analytics.generate_analytics_report()
        logging.info("Analytics report generated")

if __name__ == "__main__":
    main()

2024-11-16 23:37:47,425 - INFO - Pose detector initialized successfully
2024-11-16 23:37:50,235 - INFO - Video capture started
2024-11-16 23:37:52,722 - INFO - User requested exit
2024-11-16 23:37:53,315 - INFO - Analytics report generated


In [None]:
import cv2
import mediapipe as mp
import numpy as np
from collections import deque
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime
import seaborn as sns
import json
import threading
from pathlib import Path
import logging
import csv
import time

class PoseAnalytics:
    """Analytics component for tracking and analyzing pose detection metrics"""
    def __init__(self, save_dir='analytics'):
        self.save_dir = Path(save_dir)
        self.save_dir.mkdir(exist_ok=True)
        
        # Initialize analytics storage
        self.session_data = {
            'timestamps': [],
            'postures': [],
            'confidence_scores': [],
            'processing_times': [],
            'fps_values': []
        }
        
        # Setup logging
        logging.basicConfig(
            filename=self.save_dir / 'pose_detection.log',
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        
        # Initialize real-time metrics
        self.current_metrics = {
            'session_start': datetime.now(),
            'poses_detected': 0,
            'average_fps': 0.0
        }
    
    def update_metrics(self, posture, confidence, process_time, fps):
        """Update analytics with new frame data"""
        timestamp = datetime.now()
        
        # Update session data
        self.session_data['timestamps'].append(timestamp)
        self.session_data['postures'].append(posture)
        self.session_data['confidence_scores'].append(confidence)
        self.session_data['processing_times'].append(process_time)
        self.session_data['fps_values'].append(fps)
        
        # Update real-time metrics
        self.current_metrics['poses_detected'] += 1
        self.current_metrics['average_fps'] = np.mean(self.session_data['fps_values'])
        
        # Log significant events
        if confidence < 0.5:
            logging.warning(f"Low confidence detection: {confidence:.2f}")
    
    def _create_posture_distribution_plot(self, df):
        """Create and save posture distribution plot"""
        plt.figure(figsize=(10, 6))
        posture_counts = df['postures'].value_counts()
        sns.barplot(x=posture_counts.values, y=posture_counts.index)
        plt.title('Posture Distribution')
        plt.xlabel('Count')
        plt.tight_layout()
        plt.savefig(self.save_dir / 'posture_distribution.png')
        plt.close()

    def _create_performance_plot(self, df):
        """Create and save performance metrics plot"""
        plt.figure(figsize=(12, 6))
        plt.plot(df['timestamps'], df['fps_values'], label='FPS')
        plt.plot(df['timestamps'], df['processing_times'], label='Processing Time')
        plt.title('Performance Metrics Over Time')
        plt.xlabel('Time')
        plt.ylabel('Value')
        plt.legend()
        plt.tight_layout()
        plt.savefig(self.save_dir / 'performance_metrics.png')
        plt.close()

    def _create_confidence_plot(self, df):
        """Create and save confidence scores plot"""
        plt.figure(figsize=(10, 6))
        plt.plot(df['timestamps'], df['confidence_scores'])
        plt.title('Detection Confidence Over Time')
        plt.xlabel('Time')
        plt.ylabel('Confidence Score')
        plt.tight_layout()
        plt.savefig(self.save_dir / 'confidence_scores.png')
        plt.close()

    def generate_analytics_report(self):
        """Generate comprehensive analytics report"""
        if not self.session_data['timestamps']:
            logging.warning("No data collected for analytics report")
            return

        # Convert data to DataFrame
        df = pd.DataFrame(self.session_data)
        
        # Generate visualizations
        self._create_posture_distribution_plot(df)
        self._create_performance_plot(df)
        self._create_confidence_plot(df)
        
        # Generate summary statistics
        summary = self._generate_summary_stats(df)
        
        # Save report
        self._save_analytics_report(df, summary)
    
    def _generate_summary_stats(self, df):
        """Generate summary statistics from the session data"""
        summary = {
            'total_frames': len(df),
            'average_fps': df['fps_values'].mean(),
            'average_confidence': df['confidence_scores'].mean(),
            'most_common_posture': df['postures'].mode().iloc[0] if not df['postures'].empty else 'None',
            'session_duration': str(datetime.now() - self.current_metrics['session_start'])
        }
        return summary

    def _save_analytics_report(self, df, summary):
        """Save analytics report to file"""
        report_path = self.save_dir / f'analytics_report_{datetime.now().strftime("%Y%m%d_%H%M%S")}.txt'
        with open(report_path, 'w') as f:
            f.write("=== Pose Detection Analytics Report ===\n\n")
            for key, value in summary.items():
                f.write(f"{key}: {value}\n")

class PoseDetector:
    def __init__(self):
        # MediaPipe initialization
        self.mp_pose = mp.solutions.pose
        self.pose = self.mp_pose.Pose(
            min_detection_confidence=0.7,
            min_tracking_confidence=0.7,
            model_complexity=2
        )
        self.mp_drawing = mp.solutions.drawing_utils
        self.mp_drawing_styles = mp.solutions.drawing_styles
        
        # Initialize analytics
        self.analytics = PoseAnalytics()
        
        # Performance monitoring
        self.fps_history = deque(maxlen=30)
        self.detection_history = deque(maxlen=30)
        self.joint_consistency = deque(maxlen=30)
        
        # Initialize video recording
        self.recording = False
        self.video_writer = None
        self.output_dir = Path('output')
        self.output_dir.mkdir(exist_ok=True)
        
        # State tracking
        self.current_posture = "No pose detected"
        self.current_confidence = 0.0
        self.current_landmarks = None
        
        # Motion tracking
        self.pose_history = deque(maxlen=5)
        self.gesture_patterns = self._load_gesture_patterns()
        
        # Threshold values
        self.movement_threshold = 10
        self.shoulder_movement_threshold = 5
        self.squat_threshold = 0.25
        self.standing_hip_knee_ratio = 0.9
        self.sitting_hip_knee_ratio = 0.4
        self.arm_raised_threshold = 0.2
        
        self.current_frame_shape = None
        
    def process_frame(self, frame):
        """Process a single frame for pose detection with improved error handling"""
        try:
            start_time = time.time()
            if frame is None or frame.size == 0:
                logging.error("Invalid frame received")
                return frame

            self.current_frame_shape = frame.shape
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            # Process with MediaPipe
            results = self.pose.process(rgb_frame)
            landmarks = None
            confidence = 0.0

            if results.pose_landmarks:
                landmarks = results.pose_landmarks
                confidence = np.mean([lm.visibility for lm in landmarks.landmark])

                try:
                    # Detect posture
                    posture = self.detect_posture(landmarks)
                    self.current_posture = posture
                    self.current_confidence = confidence
                    self.current_landmarks = landmarks

                    # Calculate and draw bounding box
                    bbox = self.calculate_bounding_box(landmarks, model_type="mediapipe")
                    self.draw_action_box(frame, bbox, posture)

                    # Draw landmarks
                    self.mp_drawing.draw_landmarks(
                        frame,
                        landmarks,
                        self.mp_pose.POSE_CONNECTIONS,
                        landmark_drawing_spec=self.mp_drawing_styles.get_default_pose_landmarks_style()
                    )

                    # Calculate metrics
                    process_time = time.time() - start_time
                    fps = 1 / process_time if process_time > 0 else 0
                    self.fps_history.append(fps)

                    # Update analytics
                    self.analytics.update_metrics(
                        posture=posture,
                        confidence=confidence,
                        process_time=process_time,
                        fps=fps
                    )
                except Exception as e:
                    logging.error(f"Error processing landmarks: {str(e)}")


            # Draw performance metrics
            self._draw_performance_metrics(frame, fps if 'fps' in locals() else 0, confidence)

            # Handle recording if active
            if self.recording and self.video_writer is not None:
                try:
                    self.video_writer.write(frame)
                except Exception as e:
                    logging.error(f"Error writing video frame: {str(e)}")

            return frame

        except Exception as e:
            logging.error(f"Error in process_frame: {str(e)}")
            return frame

    def _draw_performance_metrics(self, frame, fps, confidence):
        """Draw performance metrics on frame"""
        # Calculate average FPS
        avg_fps = np.mean(self.fps_history) if self.fps_history else 0
        
        # Draw metrics
        metrics_text = [
            f"FPS: {avg_fps:.1f}",
            f"Confidence: {confidence:.2f}",
            f"Posture: {self.current_posture}"
        ]
        
        y_position = 30
        for text in metrics_text:
            cv2.putText(frame, text, (10, y_position),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
            y_position += 25

    def toggle_recording(self):
        """Toggle video recording"""
        if not self.recording:
            # Start recording
            filename = self.output_dir / f'pose_recording_{datetime.now().strftime("%Y%m%d_%H%M%S")}.avi'
            fourcc = cv2.VideoWriter_fourcc(*'XVID')
            self.video_writer = cv2.VideoWriter(str(filename), fourcc, 20.0, 
                                              (self.current_frame_shape[1], self.current_frame_shape[0]))
            self.recording = True
            logging.info("Recording started")
        else:
            # Stop recording
            if self.video_writer is not None:
                self.video_writer.release()
                self.video_writer = None
            self.recording = False
            logging.info("Recording stopped")

    def save_frame(self):
        """Save current frame as image"""
        if self.current_frame_shape is not None:
            filename = self.output_dir / f'pose_frame_{datetime.now().strftime("%Y%m%d_%H%M%S")}.jpg'
            cv2.imwrite(str(filename), self.current_frame)
            logging.info(f"Frame saved: {filename}")

    def _load_gesture_patterns(self):
        """Load predefined gesture patterns"""
        return {
            'wave': {'sequence': ['standing with raised arms', 'standing'], 'threshold': 0.8},
            'squat': {'sequence': ['standing', 'squatting', 'standing'], 'threshold': 0.9}
        }

    def detect_posture(self, landmarks):
        """Enhanced posture detection with more accurate standing/sitting differentiation"""
        # Get relevant landmark points
        left_shoulder = landmarks.landmark[self.mp_pose.PoseLandmark.LEFT_SHOULDER]
        right_shoulder = landmarks.landmark[self.mp_pose.PoseLandmark.RIGHT_SHOULDER]
        left_hip = landmarks.landmark[self.mp_pose.PoseLandmark.LEFT_HIP]
        right_hip = landmarks.landmark[self.mp_pose.PoseLandmark.RIGHT_HIP]
        left_knee = landmarks.landmark[self.mp_pose.PoseLandmark.LEFT_KNEE]
        right_knee = landmarks.landmark[self.mp_pose.PoseLandmark.RIGHT_KNEE]
        left_ankle = landmarks.landmark[self.mp_pose.PoseLandmark.LEFT_ANKLE]
        right_ankle = landmarks.landmark[self.mp_pose.PoseLandmark.RIGHT_ANKLE]

        # Calculate vertical alignments and angles
        hip_y = (left_hip.y + right_hip.y) / 2
        knee_y = (left_knee.y + right_knee.y) / 2
        ankle_y = (left_ankle.y + right_ankle.y) / 2
        shoulder_y = (left_shoulder.y + right_shoulder.y) / 2

        # Calculate angles
        left_knee_angle = self._calculate_angle(
            (left_hip.x, left_hip.y),
            (left_knee.x, left_knee.y),
            (left_ankle.x, left_ankle.y)
        )
        right_knee_angle = self._calculate_angle(
            (right_hip.x, right_hip.y),
            (right_knee.x, right_knee.y),
            (right_ankle.x, right_ankle.y)
        )

        # Calculate vertical alignment scores
        spine_alignment = abs(shoulder_y - hip_y)  # Smaller means more upright
        leg_alignment = abs(hip_y - ankle_y)       # Larger means more likely standing

        # Enhanced standing detection
        is_standing = (
            knee_y > hip_y and                     # Knees below hips
            ankle_y > knee_y and                   # Ankles below knees
            spine_alignment < 0.2 and              # Relatively straight spine
            leg_alignment > 0.3 and                # Legs extended
            left_knee_angle > 160 and              # Straight legs
            right_knee_angle > 160                 # Straight legs
        )

        # Enhanced sitting detection
        is_sitting = (
            abs(knee_y - hip_y) < 0.15 and        # Knees close to hip level
            ankle_y > knee_y and                   # Ankles below knees
            (left_knee_angle < 140 or              # Bent knees
             right_knee_angle < 140)
        )

        # Enhanced squatting detection
        is_squatting = (
            hip_y > shoulder_y and                 # Hips below shoulders
            knee_y > hip_y and                     # Knees below hips
            ankle_y > knee_y and                   # Ankles below knees
            left_knee_angle < 120 and              # Bent knees
            right_knee_angle < 120                 # Bent knees
        )

        # Check for raised arms
        arms_raised = self._are_arms_raised(landmarks)

        # Determine posture with confidence levels
        if is_standing:
            return "standing with raised arms" if arms_raised else "standing"
        elif is_squatting:
            return "squatting"
        elif is_sitting:
            return "sitting"
        else:
            # Calculate closest match based on conditions
            standing_score = (
                (1 if knee_y > hip_y else 0) +
                (1 if ankle_y > knee_y else 0) +
                (1 if spine_alignment < 0.2 else 0) +
                (1 if leg_alignment > 0.3 else 0) +
                (1 if left_knee_angle > 160 else 0) +
                (1 if right_knee_angle > 160 else 0)
            ) / 6.0

            if standing_score > 0.5:
                return "standing"  # Partial standing detection

            return "undefined posture"

    def _are_arms_raised(self, landmarks):
        """Improved detection of raised arms"""
        left_wrist = landmarks.landmark[self.mp_pose.PoseLandmark.LEFT_WRIST]
        right_wrist = landmarks.landmark[self.mp_pose.PoseLandmark.RIGHT_WRIST]
        left_shoulder = landmarks.landmark[self.mp_pose.PoseLandmark.LEFT_SHOULDER]
        right_shoulder = landmarks.landmark[self.mp_pose.PoseLandmark.RIGHT_SHOULDER]
        left_elbow = landmarks.landmark[self.mp_pose.PoseLandmark.LEFT_ELBOW]
        right_elbow = landmarks.landmark[self.mp_pose.PoseLandmark.RIGHT_ELBOW]

        # Check both elbow and wrist positions relative to shoulders
        left_arm_raised = (left_wrist.y < left_shoulder.y - 0.1 or 
                          left_elbow.y < left_shoulder.y - 0.1)
        right_arm_raised = (right_wrist.y < right_shoulder.y - 0.1 or 
                           right_elbow.y < right_shoulder.y - 0.1)

        return left_arm_raised or right_arm_raised

    def _calculate_hip_knee_ratio(self, left_hip, right_hip, left_knee, right_knee):
        """Calculate the ratio between hip and knee positions"""
        hip_y = (left_hip.y + right_hip.y) / 2
        knee_y = (left_knee.y + right_knee.y) / 2
        return abs(knee_y - hip_y)

    def _calculate_angle(self, p1, p2, p3):
        """Calculate angle between three points"""
        v1 = np.array([p1[0] - p2[0], p1[1] - p2[1]])
        v2 = np.array([p3[0] - p2[0], p3[1] - p2[1]])

        cos_angle = np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))
        angle = np.arccos(np.clip(cos_angle, -1.0, 1.0))
        return np.degrees(angle)



    def calculate_bounding_box(self, landmarks, model_type):
        """Calculate bounding box for detected pose"""
        if model_type == 'mediapipe':
            points = [(lm.x, lm.y) for lm in landmarks.landmark]
        else:  # YOLO
            points = [(x/self.current_frame_shape[1], y/self.current_frame_shape[0]) 
                     for x, y, _ in landmarks]

        x_coords = [p[0] for p in points]
        y_coords = [p[1] for p in points]

        # Calculate normalized coordinates
        x_min, x_max = min(x_coords), max(x_coords)
        y_min, y_max = min(y_coords), max(y_coords)

        # Convert to pixel coordinates
        return {
            'x1': int(x_min * self.current_frame_shape[1]),
            'y1': int(y_min * self.current_frame_shape[0]),
            'x2': int(x_max * self.current_frame_shape[1]),
            'y2': int(y_max * self.current_frame_shape[0])
        }

    def draw_action_box(self, frame, bbox, action):
        """Draw bounding box and action label on frame"""
        # Draw bounding box
        cv2.rectangle(frame, 
                     (bbox['x1'], bbox['y1']), 
                     (bbox['x2'], bbox['y2']), 
                     (0, 255, 0), 2)

        # Draw action label
        label_size = cv2.getTextSize(action, cv2.FONT_HERSHEY_SIMPLEX, 0.7, 2)[0]
        cv2.rectangle(frame,
                     (bbox['x1'], bbox['y1'] - label_size[1] - 10),
                     (bbox['x1'] + label_size[0], bbox['y1']),
                     (0, 255, 0), -1)
        cv2.putText(frame, action,
                   (bbox['x1'], bbox['y1'] - 5),
                   cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 2)

def main():
    """Enhanced main function with analytics and error handling"""
    # Setup logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )
    
    try:
        # Initialize detector
        detector = PoseDetector()
        logging.info("Pose detector initialized successfully")
        
        # Initialize video capture
        cap = cv2.VideoCapture(0)
        if not cap.isOpened():
            raise RuntimeError("Could not open webcam")
        
        logging.info("Video capture started")
        
        # Main processing loop
        while True:
            success, frame = cap.read()
            if not success:
                logging.warning("Failed to capture frame")
                continue
            
            try:
                # Process frame
                processed_frame = detector.process_frame(frame)
                if processed_frame is None:
                    continue
                
                # Display frame
                cv2.imshow('Pose Detection', processed_frame)
                
                # Handle keyboard input
                key = cv2.waitKey(1) & 0xFF
                if key == ord('q'):
                    logging.info("User requested exit")
                    break
                elif key == ord('r'):
                    detector.toggle_recording()
                elif key == ord('s'):
                    detector.save_frame()
                
            except Exception as e:
                logging.error(f"Error processing frame: {str(e)}")
                continue
                
    except Exception as e:
        logging.error(f"Fatal error: {str(e)}")
        
    finally:
        # Cleanup
        if cap is not None:
            cap.release()
        cv2.destroyAllWindows()
        
        # Generate final analytics
        detector.analytics.generate_analytics_report()
        logging.info("Analytics report generated")

if __name__ == "__main__":
    main()

2024-11-17 00:06:27,153 - INFO - Pose detector initialized successfully
2024-11-17 00:06:29,822 - INFO - Video capture started




In [2]:
import cv2
import mediapipe as mp
import numpy as np
from collections import deque
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime
import seaborn as sns
import json
import threading
from pathlib import Path
import logging
import csv
import time

class PoseAnalytics:
    """Analytics component for tracking and analyzing pose detection metrics"""
    def __init__(self, save_dir='analytics'):
        self.save_dir = Path(save_dir)
        self.save_dir.mkdir(exist_ok=True)
        
        # Initialize analytics storage
        self.session_data = {
            'timestamps': [],
            'postures': [],
            'confidence_scores': [],
            'processing_times': [],
            'fps_values': []
        }
        
        # Setup logging
        logging.basicConfig(
            filename=self.save_dir / 'pose_detection.log',
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        
        # Initialize real-time metrics
        self.current_metrics = {
            'session_start': datetime.now(),
            'poses_detected': 0,
            'average_fps': 0.0
        }
    
    def update_metrics(self, posture, confidence, process_time, fps):
        """Update analytics with new frame data"""
        timestamp = datetime.now()
        
        # Update session data
        self.session_data['timestamps'].append(timestamp)
        self.session_data['postures'].append(posture)
        self.session_data['confidence_scores'].append(confidence)
        self.session_data['processing_times'].append(process_time)
        self.session_data['fps_values'].append(fps)
        
        # Update real-time metrics
        self.current_metrics['poses_detected'] += 1
        self.current_metrics['average_fps'] = np.mean(self.session_data['fps_values'])
        
        # Log significant events
        if confidence < 0.5:
            logging.warning(f"Low confidence detection: {confidence:.2f}")
    
    def _create_posture_distribution_plot(self, df):
        """Create and save posture distribution plot"""
        plt.figure(figsize=(10, 6))
        posture_counts = df['postures'].value_counts()
        sns.barplot(x=posture_counts.values, y=posture_counts.index)
        plt.title('Posture Distribution')
        plt.xlabel('Count')
        plt.tight_layout()
        plt.savefig(self.save_dir / 'posture_distribution.png')
        plt.close()

    def _create_performance_plot(self, df):
        """Create and save performance metrics plot"""
        plt.figure(figsize=(12, 6))
        plt.plot(df['timestamps'], df['fps_values'], label='FPS')
        plt.plot(df['timestamps'], df['processing_times'], label='Processing Time')
        plt.title('Performance Metrics Over Time')
        plt.xlabel('Time')
        plt.ylabel('Value')
        plt.legend()
        plt.tight_layout()
        plt.savefig(self.save_dir / 'performance_metrics.png')
        plt.close()

    def _create_confidence_plot(self, df):
        """Create and save confidence scores plot"""
        plt.figure(figsize=(10, 6))
        plt.plot(df['timestamps'], df['confidence_scores'])
        plt.title('Detection Confidence Over Time')
        plt.xlabel('Time')
        plt.ylabel('Confidence Score')
        plt.tight_layout()
        plt.savefig(self.save_dir / 'confidence_scores.png')
        plt.close()

    def generate_analytics_report(self):
        """Generate comprehensive analytics report"""
        if not self.session_data['timestamps']:
            logging.warning("No data collected for analytics report")
            return

        # Convert data to DataFrame
        df = pd.DataFrame(self.session_data)
        
        # Generate visualizations
        self._create_posture_distribution_plot(df)
        self._create_performance_plot(df)
        self._create_confidence_plot(df)
        
        # Generate summary statistics
        summary = self._generate_summary_stats(df)
        
        # Save report
        self._save_analytics_report(df, summary)
    
    def _generate_summary_stats(self, df):
        """Generate summary statistics from the session data"""
        summary = {
            'total_frames': len(df),
            'average_fps': df['fps_values'].mean(),
            'average_confidence': df['confidence_scores'].mean(),
            'most_common_posture': df['postures'].mode().iloc[0] if not df['postures'].empty else 'None',
            'session_duration': str(datetime.now() - self.current_metrics['session_start'])
        }
        return summary

    def _save_analytics_report(self, df, summary):
        """Save analytics report to file"""
        report_path = self.save_dir / f'analytics_report_{datetime.now().strftime("%Y%m%d_%H%M%S")}.txt'
        with open(report_path, 'w') as f:
            f.write("=== Pose Detection Analytics Report ===\n\n")
            for key, value in summary.items():
                f.write(f"{key}: {value}\n")

class PoseDetector:
    def __init__(self):
        # MediaPipe initialization
        self.mp_pose = mp.solutions.pose
        self.pose = self.mp_pose.Pose(
            min_detection_confidence=0.7,
            min_tracking_confidence=0.7,
            model_complexity=2
        )
        self.mp_drawing = mp.solutions.drawing_utils
        self.mp_drawing_styles = mp.solutions.drawing_styles
        
        # Initialize analytics
        self.analytics = PoseAnalytics()
        
        # Performance monitoring
        self.fps_history = deque(maxlen=30)
        self.detection_history = deque(maxlen=30)
        self.joint_consistency = deque(maxlen=30)
        
        # Movement tracking
        self.previous_landmarks = None
        self.movement_history = deque(maxlen=10)
        
        # Confidence tracking
        self.confidence_history = deque(maxlen=10)
        self.low_confidence_count = 0
        self.max_low_confidence_attempts = 3
        
        # Threshold values
        self.movement_threshold = 0.02  # Normalized movement threshold
        self.head_movement_threshold = 0.015
        self.shoulder_movement_threshold = 0.018
        self.squat_threshold = 0.25
        self.standing_hip_knee_ratio = 0.9
        self.sitting_hip_knee_ratio = 0.4
        
        # Confidence thresholds
        self.low_confidence_threshold = 0.5
        self.very_low_confidence_threshold = 0.3
        
        # Movement smoothing
        self.shoulder_positions = deque(maxlen=5)
        self.head_positions = deque(maxlen=5)
        
        # Posture change tracking
        self.last_posture_change_time = 0
        self.posture_change_interval = 0.5  # 1 seconds between posture changes
        self.current_posture = None
        
        
        # Initialize other attributes
        self.current_frame_shape = None
        self.recording = False
        self.video_writer = None
        self.output_dir = Path('output')
        self.output_dir.mkdir(exist_ok=True)
        
    def process_frame(self, frame):
        """Process a single frame for pose detection with improved error handling and confidence tracking"""
        try:
            start_time = time.time()
            if frame is None or frame.size == 0:
                logging.error("Invalid frame received")
                return frame

            self.current_frame_shape = frame.shape
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            # Process with MediaPipe
            results = self.pose.process(rgb_frame)
            landmarks = None
            confidence = 0.0

            if results.pose_landmarks:
                landmarks = results.pose_landmarks
                confidence = np.mean([lm.visibility for lm in landmarks.landmark])
                
                # Update confidence history
                self.confidence_history.append(confidence)
                
                # Check confidence levels
                confidence_status = self._check_confidence(confidence)
                
                if confidence_status == "low":
                    # Add low confidence warning to frame
                    cv2.putText(frame, "LOW CONFIDENCE: Move back", 
                                (10, self.current_frame_shape[0] - 50), 
                                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 3)
                
                if confidence_status == "very_low":
                    # Add very low confidence warning to frame
                    cv2.putText(frame, "VERY LOW CONFIDENCE: Please reposition!", 
                                (10, self.current_frame_shape[0] - 50), 
                                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 3)
                    
                    # Optional: Add blinking effect
                    if int(time.time() * 2) % 2 == 0:
                        frame = cv2.addWeighted(frame, 0.5, np.zeros_like(frame), 0.5, 0)

                try:
                    # Detect posture
                    posture = self.detect_posture(landmarks)
                    self.current_posture = posture
                    self.current_confidence = confidence
                    self.current_landmarks = landmarks

                    # Calculate and draw bounding box
                    bbox = self.calculate_bounding_box(landmarks, model_type="mediapipe")
                    self.draw_action_box(frame, bbox, posture)

                    # Draw landmarks
                    self.mp_drawing.draw_landmarks(
                        frame,
                        landmarks,
                        self.mp_pose.POSE_CONNECTIONS,
                        landmark_drawing_spec=self.mp_drawing_styles.get_default_pose_landmarks_style()
                    )

                    # Calculate metrics
                    process_time = time.time() - start_time
                    fps = 1 / process_time if process_time > 0 else 0
                    self.fps_history.append(fps)

                    # Update analytics
                    self.analytics.update_metrics(
                        posture=posture,
                        confidence=confidence,
                        process_time=process_time,
                        fps=fps
                    )
                except Exception as e:
                    logging.error(f"Error processing landmarks: {str(e)}")

            # Draw performance metrics
            self._draw_performance_metrics(frame, fps if 'fps' in locals() else 0, confidence)

            # Handle recording if active
            if self.recording and self.video_writer is not None:
                try:
                    self.video_writer.write(frame)
                except Exception as e:
                    logging.error(f"Error writing video frame: {str(e)}")

            return frame

        except Exception as e:
            logging.error(f"Error in process_frame: {str(e)}")
            return frame

    def _draw_performance_metrics(self, frame, fps, confidence):
        """Draw performance metrics on frame"""
        # Calculate average FPS
        avg_fps = np.mean(self.fps_history) if self.fps_history else 0
        
        # Draw metrics
        metrics_text = [
            f"FPS: {avg_fps:.1f}",
            f"Confidence: {confidence:.2f}",
            f"Posture: {self.current_posture}"
        ]
        
        y_position = 30
        for text in metrics_text:
            cv2.putText(frame, text, (10, y_position),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
            y_position += 25

    def toggle_recording(self):
        """Toggle video recording"""
        if not self.recording:
            # Start recording
            filename = self.output_dir / f'pose_recording_{datetime.now().strftime("%Y%m%d_%H%M%S")}.avi'
            fourcc = cv2.VideoWriter_fourcc(*'XVID')
            self.video_writer = cv2.VideoWriter(str(filename), fourcc, 20.0, 
                                              (self.current_frame_shape[1], self.current_frame_shape[0]))
            self.recording = True
            logging.info("Recording started")
        else:
            # Stop recording
            if self.video_writer is not None:
                self.video_writer.release()
                self.video_writer = None
            self.recording = False
            logging.info("Recording stopped")

    def save_frame(self):
        """Save current frame as image"""
        if self.current_frame_shape is not None:
            filename = self.output_dir / f'pose_frame_{datetime.now().strftime("%Y%m%d_%H%M%S")}.jpg'
            cv2.imwrite(str(filename), self.current_frame)
            logging.info(f"Frame saved: {filename}")

    def _load_gesture_patterns(self):
        """Load predefined gesture patterns"""
        return {
            'wave': {'sequence': ['standing with raised arms', 'standing'], 'threshold': 0.8},
            'squat': {'sequence': ['standing', 'squatting', 'standing'], 'threshold': 0.9}
        }

    def _detect_head_movement(self, landmarks):
        """
        Detect head movement patterns
        Returns: string indicating head movement type
        """
        nose = landmarks.landmark[self.mp_pose.PoseLandmark.NOSE]
        left_ear = landmarks.landmark[self.mp_pose.PoseLandmark.LEFT_EAR]
        right_ear = landmarks.landmark[self.mp_pose.PoseLandmark.RIGHT_EAR]
        
        # Store current head position
        head_pos = (nose.x, nose.y)
        self.head_positions.append(head_pos)
        
        if len(self.head_positions) < 2:
            return ""
            
        # Calculate movement
        prev_pos = self.head_positions[-2]
        dx = head_pos[0] - prev_pos[0]
        dy = head_pos[1] - prev_pos[1]
        
        # Calculate ear position relative to nose for head rotation
        left_ear_dist = abs(left_ear.x - nose.x)
        right_ear_dist = abs(right_ear.x - nose.x)
        ear_ratio = left_ear_dist / right_ear_dist if right_ear_dist > 0 else 1.0
        
        # Detect movements
        if abs(dx) > self.head_movement_threshold:
            if ear_ratio > 1.2:
                return "turning head left"
            elif ear_ratio < 0.8:
                return "turning head right"
            else:
                return "moving head horizontally"
        elif abs(dy) > self.head_movement_threshold:
            return "nodding head"
            
        return ""

    def _detect_shoulder_movement(self, landmarks):
        """
        More precise shoulder movement detection
        Returns: string indicating shoulder movement type
        """
        left_shoulder = landmarks.landmark[self.mp_pose.PoseLandmark.LEFT_SHOULDER]
        right_shoulder = landmarks.landmark[self.mp_pose.PoseLandmark.RIGHT_SHOULDER]
        left_elbow = landmarks.landmark[self.mp_pose.PoseLandmark.LEFT_ELBOW]
        right_elbow = landmarks.landmark[self.mp_pose.PoseLandmark.RIGHT_ELBOW]
        
        # Store current shoulder positions
        shoulder_pos = (
            (left_shoulder.x, left_shoulder.y),
            (right_shoulder.x, right_shoulder.y)
        )
        self.shoulder_positions.append(shoulder_pos)
        
        if len(self.shoulder_positions) < 2:
            return ""
            
        # Calculate movement
        prev_pos = self.shoulder_positions[-2]
        left_dx = shoulder_pos[0][0] - prev_pos[0][0]
        left_dy = shoulder_pos[0][1] - prev_pos[0][1]
        right_dx = shoulder_pos[1][0] - prev_pos[1][0]
        right_dy = shoulder_pos[1][1] - prev_pos[1][1]
        
        # Calculate elbow positions for more context
        left_elbow_pos = (left_elbow.x, left_elbow.y)
        right_elbow_pos = (right_elbow.x, right_elbow.y)
        
        # Enhanced shoulder movement detection
        # Check vertical and horizontal movements with context
        vertical_diff_left = abs(left_dy)
        vertical_diff_right = abs(right_dy)
        horizontal_diff_left = abs(left_dx)
        horizontal_diff_right = abs(right_dx)
        
        # Shoulder shrug detection
        if (vertical_diff_left > self.shoulder_movement_threshold and 
            vertical_diff_right > self.shoulder_movement_threshold and 
            abs(left_dy - right_dy) < 0.02):  # Almost simultaneous
            return "shrugging shoulders"
        
        # Asymmetric shoulder movement
        if vertical_diff_left > self.shoulder_movement_threshold * 1.5 and \
           vertical_diff_right < self.shoulder_movement_threshold:
            return "raising left shoulder"
        
        if vertical_diff_right > self.shoulder_movement_threshold * 1.5 and \
           vertical_diff_left < self.shoulder_movement_threshold:
            return "raising right shoulder"
        
        # Rotational movement
        if horizontal_diff_left > self.shoulder_movement_threshold and \
           horizontal_diff_right > self.shoulder_movement_threshold:
            return "rotating shoulders"
        
        return ""

    def detect_posture(self, landmarks):
        """Enhanced posture detection with improved sitting and squatting logic"""
        current_time = time.time()
        
        # Only update posture every 5 seconds
        if (current_time - self.last_posture_change_time < self.posture_change_interval 
            and self.current_posture is not None):
            return self.current_posture

        # Get base posture
        base_posture = self._detect_base_posture(landmarks)
        
        # Detect head and shoulder movements
        head_movement = self._detect_head_movement(landmarks)
        shoulder_movement = self._detect_shoulder_movement(landmarks)
        
        # Determine sitting type if sitting
        if 'sitting' in base_posture:
            base_posture = f"sitting on floor"
        
        # Combine movements with base posture
        movements = []
        if head_movement:
            movements.append(head_movement)
        if shoulder_movement:
            movements.append(shoulder_movement)
            
        if movements:
            final_posture = f"{base_posture} while {' and '.join(movements)}"
        else:
            final_posture = base_posture
        
        # Update tracking
        self.current_posture = final_posture
        self.last_posture_change_time = current_time
        
        return final_posture

    def _detect_base_posture(self, landmarks):
        """Detect base posture (standing, sitting, squatting)"""
        # Get relevant landmark points
        left_shoulder = landmarks.landmark[self.mp_pose.PoseLandmark.LEFT_SHOULDER]
        right_shoulder = landmarks.landmark[self.mp_pose.PoseLandmark.RIGHT_SHOULDER]
        left_hip = landmarks.landmark[self.mp_pose.PoseLandmark.LEFT_HIP]
        right_hip = landmarks.landmark[self.mp_pose.PoseLandmark.RIGHT_HIP]
        left_knee = landmarks.landmark[self.mp_pose.PoseLandmark.LEFT_KNEE]
        right_knee = landmarks.landmark[self.mp_pose.PoseLandmark.RIGHT_KNEE]
        left_ankle = landmarks.landmark[self.mp_pose.PoseLandmark.LEFT_ANKLE]
        right_ankle = landmarks.landmark[self.mp_pose.PoseLandmark.RIGHT_ANKLE]
        
        # Calculate vertical positions and angles
        hip_y = (left_hip.y + right_hip.y) / 2
        knee_y = (left_knee.y + right_knee.y) / 2
        ankle_y = (left_ankle.y + right_ankle.y) / 2
        shoulder_y = (left_shoulder.y + right_shoulder.y) / 2
        
        # Calculate joint angles
        left_knee_angle = self._calculate_angle(
            (left_hip.x, left_hip.y),
            (left_knee.x, left_knee.y),
            (left_ankle.x, left_ankle.y)
        )
        right_knee_angle = self._calculate_angle(
            (right_hip.x, right_hip.y),
            (right_knee.x, right_knee.y),
            (right_ankle.x, right_ankle.y)
        )
        
        # Calculate hip angle
        hip_angle = self._calculate_angle(
            (shoulder_y, 0),
            (hip_y, 0),
            (knee_y, 0)
        )
        
        # Enhanced posture detection with confidence scores
        standing_score = self._calculate_standing_score(
            hip_y, knee_y, ankle_y, shoulder_y,
            left_knee_angle, right_knee_angle
        )
        sitting_score = self._calculate_sitting_score(
            hip_y, knee_y, ankle_y,
            left_knee_angle, right_knee_angle
        )
        squatting_score = self.calculate_squat_score(
            hip_y, knee_y, ankle_y, shoulder_y,
            left_knee_angle, right_knee_angle
        )

        # Adjust threshold for posture determination
        scores = {
            "standing": standing_score,
            "sitting": sitting_score,
            "squatting": squatting_score
        }

        # More nuanced posture selection
        base_posture = max(scores.items(), key=lambda x: x[1])[0]


        # Additional checks to prevent false positives
        if base_posture == "standing":
            # Check if it might actually be a squat
            if squatting_score > 0.00000001 or scores["squatting"] > scores["standing"]:
                base_posture = "squatting"
        
        if base_posture == "sitting":
            # Check if it might actually be a squat
            if squatting_score > 0.00000001 or scores["squatting"] > scores["sitting"]:
                base_posture = "squatting"
        
            # Check for raised arms
        if self._are_arms_raised(landmarks):
            base_posture += " with raised arms"

        return base_posture

    def _calculate_standing_score(self, hip_y, knee_y, ankle_y, shoulder_y,
                                 left_knee_angle, right_knee_angle):

        score = 0.0

        # 1. Vertical Alignment Check (40% of score)
        if shoulder_y > hip_y > knee_y > ankle_y:
            score += 0.4

        # 2. Knee Angle Check (30% of score)
        knee_angle_score = 0.0
        if 160 <= left_knee_angle <= 180 and 160 <= right_knee_angle <= 180:
            # Both knees in fully extended position
            knee_angle_score = 0.3
        elif 160 <= left_knee_angle <= 180 or 160 <= right_knee_angle <= 180:
            # At least one knee in fully extended position
            knee_angle_score = 0.15
        score += knee_angle_score

        # 3. Spine Alignment Check (30% of score)
        spine_alignment_score = 0.0
        if abs(shoulder_y - hip_y) < 0.1:
            # Shoulder and hip vertically aligned
            spine_alignment_score = 0.3
        elif abs(shoulder_y - hip_y) < 0.2:
            # Moderate spine alignment
            spine_alignment_score = 0.15
        score += spine_alignment_score

        return score

    def _calculate_sitting_score(self, hip_y, knee_y, ankle_y,
                               left_knee_angle, right_knee_angle):
        """Calculate confidence score for sitting posture"""
        score = 0.0
        
        # 1. Hip-Knee Alignment Check (40% of score)
        alignment_score = 0.0

        # Instead of comparing to floor height, check relative positioning
        if abs(knee_y - hip_y) < 0.1:  # Knees close to hip level
            alignment_score = 0.4
        score += alignment_score

        # 2. Knee Angle Check (30% of score)
        knee_angle_score = 0.0
        if 90 <= left_knee_angle <= 120 and 90 <= right_knee_angle <= 120:
            # Both knees in proper flexion range for floor sitting
            knee_angle_score = 0.3
        elif 90 <= left_knee_angle <= 120 or 90 <= right_knee_angle <= 120:
            # At least one knee in proper flexion range
            knee_angle_score = 0.15
        score += knee_angle_score

        # 3. Ankle Position Check (30% of score)
        ankle_score = 0.0
        if abs(ankle_y - knee_y) < 0.1:  # Ankles close to knee level
            ankle_score = 0.3
        score += ankle_score

        return min(score, 1.0)  # Ensure score doesn't exceed 1.0

    def calculate_squat_score(self, hip_y, knee_y, ankle_y, shoulder_y,
                               left_knee_angle, right_knee_angle):
        # Basic score calculation focusing on key body alignments
        score = 0.0

        # 1. Vertical Alignment Check (40% of score)
        if shoulder_y > hip_y > knee_y > ankle_y:
            score += 0.4

        # 2. Squat Depth Check (30% of score)
        depth = hip_y - ankle_y
        if depth > 0.5:  # Deep squat
            score += 0.3
        elif depth > 0.3:  # Moderate squat
            score += 0.2
        elif depth > 0.2:  # Partial squat
            score += 0.1

        # 3. Knee Angle Check (30% of score)
        knee_angle_score = 0.0
        if 50 < left_knee_angle < 110 and 50 < right_knee_angle < 110:
            knee_angle_score = 0.3
        elif 50 < left_knee_angle < 110 or 50 < right_knee_angle < 110:
            knee_angle_score = 0.15
        score += knee_angle_score

        return score
    
    def _are_arms_raised(self, landmarks):
        
        left_wrist = landmarks.landmark[self.mp_pose.PoseLandmark.LEFT_WRIST]
        right_wrist = landmarks.landmark[self.mp_pose.PoseLandmark.RIGHT_WRIST]
        left_shoulder = landmarks.landmark[self.mp_pose.PoseLandmark.LEFT_SHOULDER]
        right_shoulder = landmarks.landmark[self.mp_pose.PoseLandmark.RIGHT_SHOULDER]
        left_elbow = landmarks.landmark[self.mp_pose.PoseLandmark.LEFT_ELBOW]
        right_elbow = landmarks.landmark[self.mp_pose.PoseLandmark.RIGHT_ELBOW]

        # Check both elbow and wrist positions relative to shoulders
        left_arm_raised = (left_wrist.y < left_shoulder.y - 0.1 or 
                          left_elbow.y < left_shoulder.y - 0.1)
        right_arm_raised = (right_wrist.y < right_shoulder.y - 0.1 or 
                           right_elbow.y < right_shoulder.y - 0.1)

        return left_arm_raised or right_arm_raised

    def _calculate_hip_knee_ratio(self, left_hip, right_hip, left_knee, right_knee):
        """Calculate the ratio between hip and knee positions"""
        hip_y = (left_hip.y + right_hip.y) / 2
        knee_y = (left_knee.y + right_knee.y) / 2
        return abs(knee_y - hip_y)

    def _calculate_angle(self, p1, p2, p3):
        """Calculate angle between three points"""
        v1 = np.array([p1[0] - p2[0], p1[1] - p2[1]])
        v2 = np.array([p3[0] - p2[0], p3[1] - p2[1]])

        cos_angle = np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))
        angle = np.arccos(np.clip(cos_angle, -1.0, 1.0))
        return np.degrees(angle)



    def calculate_bounding_box(self, landmarks, model_type):

        if model_type == 'mediapipe':
            points = [(lm.x, lm.y) for lm in landmarks.landmark]
        else:  # YOLO
            points = [(x/self.current_frame_shape[1], y/self.current_frame_shape[0]) 
                     for x, y, _ in landmarks]

        x_coords = [p[0] for p in points]
        y_coords = [p[1] for p in points]

        # Calculate normalized coordinates
        x_min, x_max = min(x_coords), max(x_coords)
        y_min, y_max = min(y_coords), max(y_coords)

        # Convert to pixel coordinates
        return {
            'x1': int(x_min * self.current_frame_shape[1]),
            'y1': int(y_min * self.current_frame_shape[0]),
            'x2': int(x_max * self.current_frame_shape[1]),
            'y2': int(y_max * self.current_frame_shape[0])
        }

    def draw_action_box(self, frame, bbox, action):
        """Draw bounding box and action label on frame"""
        # Draw bounding box
        cv2.rectangle(frame, 
                     (bbox['x1'], bbox['y1']), 
                     (bbox['x2'], bbox['y2']), 
                     (0, 255, 0), 2)

        # Draw action label
        label_size = cv2.getTextSize(action, cv2.FONT_HERSHEY_SIMPLEX, 0.7, 2)[0]
        cv2.rectangle(frame,
                     (bbox['x1'], bbox['y1'] - label_size[1] - 10),
                     (bbox['x1'] + label_size[0], bbox['y1']),
                     (0, 255, 0), -1)
        cv2.putText(frame, action,
                   (bbox['x1'], bbox['y1'] - 5),
                   cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 0), 2)
    
    def _check_confidence(self, confidence):

        if confidence > self.low_confidence_threshold:
            # Reset low confidence counter
            self.low_confidence_count = 0
            return "normal"
        
        # Increment low confidence counter
        self.low_confidence_count += 1
        
        if confidence <= self.very_low_confidence_threshold:
            # Critical low confidence
            return "very_low"
        
        # Standard low confidence
        return "low"

def main():
    """Enhanced main function with analytics and error handling"""
    # Setup logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )
    
    try:
        # Initialize detector
        detector = PoseDetector()
        logging.info("Pose detector initialized successfully")
        
        # Initialize video capture
        cap = cv2.VideoCapture(0)
        if not cap.isOpened():
            raise RuntimeError("Could not open webcam")
        
        logging.info("Video capture started")
        
        # Main processing loop
        while True:
            success, frame = cap.read()
            if not success:
                logging.warning("Failed to capture frame")
                continue
            
            try:
                # Process frame
                processed_frame = detector.process_frame(frame)
                if processed_frame is None:
                    continue
                
                # Display frame
                cv2.namedWindow("Pose Detection", cv2.WINDOW_NORMAL)
                cv2.setWindowProperty("Pose Detection", cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
                cv2.imshow('Pose Detection', processed_frame)
                
                # Handle keyboard input
                key = cv2.waitKey(1) & 0xFF
                if key == ord('q'):
                    logging.info("User requested exit")
                    break
                elif key == ord('r'):
                    detector.toggle_recording()
                elif key == ord('s'):
                    detector.save_frame()
                
            except Exception as e:
                logging.error(f"Error processing frame: {str(e)}")
                continue
                
    except Exception as e:
        logging.error(f"Fatal error: {str(e)}")
        
    finally:
        # Cleanup
        if cap is not None:
            cap.release()
        cv2.destroyAllWindows()
        
        # Generate final analytics
        detector.analytics.generate_analytics_report()
        logging.info("Analytics report generated")

if __name__ == "__main__":
    main()
    

2024-12-14 13:34:56,386 - INFO - Pose detector initialized successfully
2024-12-14 13:34:59,432 - INFO - Video capture started
2024-12-14 13:35:09,713 - INFO - User requested exit
2024-12-14 13:35:10,444 - INFO - Analytics report generated


In [None]:
from PIL import Image as PILImage
from IPython.display import clear_output, display, Image
import io
import socket
import numpy as np
import time
import tkinter as tk
from tkinter import ttk
from PIL import ImageTk
import threading
import sys

SERVER_IP = "127.0.0.1"
SERVER_PORT = 9999

class NAOCameraApp:
    def __init__(self, root):
        self.root = root
        self.root.title("NAO Robot Camera Feed")
        
        # Make it fullscreen
        self.root.attributes('-fullscreen', True)
        
        # Configure root to expand
        self.root.grid_rowconfigure(0, weight=1)
        self.root.grid_columnconfigure(0, weight=1)
        
        # Create main frame
        self.main_frame = ttk.Frame(root)
        self.main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
        
        # Configure main_frame to expand
        self.main_frame.grid_rowconfigure(0, weight=1)
        self.main_frame.grid_columnconfigure(0, weight=1)
        
        # Create label for image display
        self.image_label = ttk.Label(self.main_frame)
        self.image_label.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
        
        # Create status label
        self.status_label = ttk.Label(self.main_frame, text="Disconnected")
        self.status_label.grid(row=1, column=0, pady=5)
        
        # Connection status
        self.running = False
        self.client_socket = None
        self._quit_event = threading.Event()
        
        # Bind quit event to both 'q' and 'Q'
        self.root.bind('q', self.quit_application)
        self.root.bind('Q', self.quit_application)
        self.root.bind('<Escape>', self.quit_application)
        
        # Set socket timeout to make quit more responsive
        self.socket_timeout = 0.1
        
        # Start connection thread
        self.connection_thread = threading.Thread(target=self.display_live_feed)
        self.connection_thread.daemon = True
        self.connection_thread.start()

    def quit_application(self, event=None):
        if self._quit_event.is_set():  # Prevent multiple quit attempts
            return
        self._quit_event.set()
        self.running = False
        
        def force_quit():
            if self.client_socket:
                try:
                    self.client_socket.shutdown(socket.SHUT_RDWR)
                    self.client_socket.close()
                except:
                    pass
            self.root.quit()
            self.root.destroy()
        
        # Schedule force quit after a short delay
        self.root.after(100, force_quit)

    def update_image(self, frame):
        if self._quit_event.is_set():
            return
        try:
            pil_image = PILImage.fromarray(frame)
            
            screen_width = self.root.winfo_width()
            screen_height = self.root.winfo_height()
            
            img_width, img_height = pil_image.size
            aspect_ratio = img_width / img_height
            
            if screen_width / screen_height > aspect_ratio:
                new_height = screen_height
                new_width = int(screen_height * aspect_ratio)
            else:
                new_width = screen_width
                new_height = int(screen_width / aspect_ratio)
            
            pil_image = pil_image.resize((new_width, new_height), PILImage.Resampling.LANCZOS)
            photo = ImageTk.PhotoImage(image=pil_image)
            
            self.image_label.configure(image=photo)
            self.image_label.image = photo
        except Exception as e:
            if not self._quit_event.is_set():
                print(f"Error updating image: {e}")

    def display_live_feed(self):
        self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.client_socket.settimeout(self.socket_timeout)  # Set timeout for socket operations
        self.running = True
        
        try:
            self.client_socket.connect((SERVER_IP, SERVER_PORT))
            self.status_label.configure(text="Connected to NAO robot stream server")
            
            while self.running and not self._quit_event.is_set():
                try:
                    # Receive width and height
                    width_data = self.client_socket.recv(4)
                    if not width_data:
                        break
                        
                    width = int.from_bytes(width_data, byteorder="big")
                    height = int.from_bytes(self.client_socket.recv(4), byteorder="big")
                    
                    frame_size = width * height * 3
                    raw_data = bytearray()
                    remaining = frame_size
                    
                    while remaining > 0 and not self._quit_event.is_set():
                        try:
                            chunk = self.client_socket.recv(min(remaining, 4096))
                            if not chunk:
                                raise ConnectionError("Connection lost while receiving frame data")
                            raw_data.extend(chunk)
                            remaining -= len(chunk)
                        except socket.timeout:
                            if self._quit_event.is_set():
                                return
                            continue
                    
                    if self._quit_event.is_set():
                        return
                        
                    frame = np.frombuffer(raw_data, dtype=np.uint8).reshape((height, width, 3))
                    self.root.after(0, self.update_image, frame)
                    
                except socket.timeout:
                    continue
                except ConnectionError as e:
                    if not self._quit_event.is_set():
                        self.status_label.configure(text=f"Connection error: {e}")
                    break
                except Exception as e:
                    if not self._quit_event.is_set():
                        self.status_label.configure(text=f"Error processing frame: {e}")
                    continue
                    
        except ConnectionRefusedError:
            if not self._quit_event.is_set():
                self.status_label.configure(text="Could not connect to server. Make sure the server is running.")
        except Exception as e:
            if not self._quit_event.is_set():
                self.status_label.configure(text=f"Unexpected error: {e}")
        finally:
            if self.client_socket:
                try:
                    self.client_socket.close()
                except:
                    pass
            if not self._quit_event.is_set():
                self.status_label.configure(text="Connection closed")

if __name__ == "__main__":
    root = tk.Tk()
    app = NAOCameraApp(root)
    root.mainloop()

In [None]:
from PIL import Image as PILImage
from IPython.display import clear_output, display, Image
import io
import socket
import numpy as np
import time
import tkinter as tk
from tkinter import ttk
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
import seaborn as sns
from PIL import ImageTk
import threading
import sys
import cv2
import mediapipe as mp
import logging
from datetime import datetime
from pathlib import Path
from collections import deque
import queue
import pandas as pd

class PoseAnalyticsWindow:
    def __init__(self, analytics):
        self.window = tk.Toplevel()
        self.window.title("Pose Analytics")
        self.analytics = analytics
        
        # Make the analytics window smaller than main window
        self.window.geometry("1000x800")
        
        # Configure window to expand
        self.window.grid_rowconfigure(0, weight=1)
        self.window.grid_columnconfigure(0, weight=1)
        
        # Create notebook for different analytics tabs
        self.notebook = ttk.Notebook(self.window)
        self.notebook.grid(row=0, column=0, sticky='nsew', padx=5, pady=5)
        
        # Create tabs
        self.realtime_tab = ttk.Frame(self.notebook)
        self.history_tab = ttk.Frame(self.notebook)
        self.stats_tab = ttk.Frame(self.notebook)
        
        self.notebook.add(self.realtime_tab, text='Real-time Analysis')
        self.notebook.add(self.history_tab, text='History')
        self.notebook.add(self.stats_tab, text='Statistics')
        
        # Initialize components
        self.setup_realtime_tab()
        self.setup_history_tab()
        self.setup_stats_tab()
        
        # Update interval
        self.update_interval = 100  # milliseconds
        self.window.after(self.update_interval, self.update_displays)

    def setup_realtime_tab(self):
        # Real-time metrics frame
        self.metrics_frame = ttk.LabelFrame(self.realtime_tab, text="Current Metrics")
        self.metrics_frame.pack(fill=tk.X, padx=5, pady=5)
        
        # FPS and confidence display
        self.fps_label = ttk.Label(self.metrics_frame, text="FPS: 0")
        self.fps_label.pack(side=tk.LEFT, padx=10)
        
        self.confidence_label = ttk.Label(self.metrics_frame, text="Confidence: 0%")
        self.confidence_label.pack(side=tk.LEFT, padx=10)
        
        self.posture_label = ttk.Label(self.metrics_frame, text="Current Posture: None")
        self.posture_label.pack(side=tk.LEFT, padx=10)
        
        # Create matplotlib figure for real-time confidence plot
        self.rt_figure, (self.conf_ax, self.fps_ax) = plt.subplots(2, 1, figsize=(10, 6))
        self.rt_canvas = FigureCanvasTkAgg(self.rt_figure, master=self.realtime_tab)
        self.rt_canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True, padx=5, pady=5)

    def setup_history_tab(self):
        # Controls frame
        self.history_controls = ttk.Frame(self.history_tab)
        self.history_controls.pack(fill=tk.X, padx=5, pady=5)
        
        ttk.Label(self.history_controls, text="Time Range:").pack(side=tk.LEFT, padx=5)
        self.time_range = ttk.Combobox(self.history_controls, 
                                     values=['Last 1 minute', 'Last 5 minutes', 'Last 15 minutes'])
        self.time_range.pack(side=tk.LEFT, padx=5)
        self.time_range.set('Last 5 minutes')
        self.time_range.bind('<<ComboboxSelected>>', self.update_history_plots)
        
        ttk.Button(self.history_controls, text="Generate Report", 
                  command=self.analytics.generate_analytics_report).pack(side=tk.RIGHT, padx=5)
        
        # Create matplotlib figure for historical plots
        self.hist_figure, (self.posture_ax, self.perf_ax) = plt.subplots(2, 1, figsize=(10, 8))
        self.hist_canvas = FigureCanvasTkAgg(self.hist_figure, master=self.history_tab)
        self.hist_canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True, padx=5, pady=5)

    def setup_stats_tab(self):
        self.stats_frame = ttk.Frame(self.stats_tab)
        self.stats_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
        
        # Session statistics
        self.session_frame = ttk.LabelFrame(self.stats_frame, text="Session Statistics")
        self.session_frame.pack(fill=tk.X, padx=5, pady=5)
        
        self.stats_labels = {
            'session_duration': ttk.Label(self.session_frame, text="Session Duration: 0:00"),
            'total_frames': ttk.Label(self.session_frame, text="Total Frames: 0"),
            'avg_fps': ttk.Label(self.session_frame, text="Average FPS: 0.0"),
            'avg_confidence': ttk.Label(self.session_frame, text="Average Confidence: 0%"),
        }
        
        for label in self.stats_labels.values():
            label.pack(anchor='w', padx=5, pady=2)

    def update_displays(self):
        """Update all displays with current analytics data"""
        if not self.analytics.session_data['timestamps']:
            self.window.after(self.update_interval, self.update_displays)
            return
            
        # Update real-time metrics
        self.update_realtime_metrics()
        
        # Update plots if current tab is visible
        current_tab = self.notebook.select()
        if current_tab == str(self.realtime_tab):
            self.update_realtime_plots()
        elif current_tab == str(self.history_tab):
            self.update_history_plots()
        elif current_tab == str(self.stats_tab):
            self.update_statistics()
        
        self.window.after(self.update_interval, self.update_displays)

    def update_realtime_metrics(self):
        """Update real-time metric labels"""
        if self.analytics.session_data['fps_values']:
            current_fps = self.analytics.session_data['fps_values'][-1]
            self.fps_label.config(text=f"FPS: {current_fps:.1f}")
        
        if self.analytics.session_data['confidence_scores']:
            current_confidence = self.analytics.session_data['confidence_scores'][-1]
            self.confidence_label.config(text=f"Confidence: {current_confidence:.1%}")
        
        if self.analytics.session_data['postures']:
            current_posture = self.analytics.session_data['postures'][-1]
            self.posture_label.config(text=f"Current Posture: {current_posture}")

    def update_realtime_plots(self):
        """Update real-time plots"""
        # Clear previous plots
        self.conf_ax.clear()
        self.fps_ax.clear()
        
        # Get last 100 data points
        timestamps = self.analytics.session_data['timestamps'][-100:]
        confidence_scores = self.analytics.session_data['confidence_scores'][-100:]
        fps_values = self.analytics.session_data['fps_values'][-100:]
        
        # Plot confidence scores
        self.conf_ax.plot(timestamps, confidence_scores)
        self.conf_ax.set_title('Real-time Confidence Scores')
        self.conf_ax.set_ylim(0, 1)
        
        # Plot FPS
        self.fps_ax.plot(timestamps, fps_values)
        self.fps_ax.set_title('Real-time FPS')
        
        # Adjust layout and redraw
        self.rt_figure.tight_layout()
        self.rt_canvas.draw()

    def update_history_plots(self, event=None):
        """Update historical plots"""
        # Clear previous plots
        self.posture_ax.clear()
        self.perf_ax.clear()
        
        # Convert time range to minutes
        time_range_str = self.time_range.get()
        minutes = int(time_range_str.split()[1])
        
        # Create DataFrame from session data
        df = pd.DataFrame(self.analytics.session_data)
        
        # Filter data by time range
        cutoff_time = datetime.now() - pd.Timedelta(minutes=minutes)
        df = df[df['timestamps'] > cutoff_time]
        
        # Plot posture distribution
        posture_counts = df['postures'].value_counts()
        sns.barplot(x=posture_counts.values, y=posture_counts.index, ax=self.posture_ax)
        self.posture_ax.set_title('Posture Distribution')
        
        # Plot performance metrics
        self.perf_ax.plot(df['timestamps'], df['fps_values'], label='FPS')
        self.perf_ax.plot(df['timestamps'], df['processing_times'], label='Processing Time')
        self.perf_ax.set_title('Performance Metrics')
        self.perf_ax.legend()
        
        # Adjust layout and redraw
        self.hist_figure.tight_layout()
        self.hist_canvas.draw()

    def update_statistics(self):
        """Update statistics display"""
        # Calculate session duration
        duration = datetime.now() - self.analytics.current_metrics['session_start']
        
        # Update statistics labels
        self.stats_labels['session_duration'].config(
            text=f"Session Duration: {str(duration).split('.')[0]}")
        
        self.stats_labels['total_frames'].config(
            text=f"Total Frames: {self.analytics.current_metrics['poses_detected']}")
        
        self.stats_labels['avg_fps'].config(
            text=f"Average FPS: {self.analytics.current_metrics['average_fps']:.1f}")
        
        if self.analytics.session_data['confidence_scores']:
            avg_confidence = np.mean(self.analytics.session_data['confidence_scores'])
            self.stats_labels['avg_confidence'].config(
                text=f"Average Confidence: {avg_confidence:.1%}")

class NAOPoseDetectionApp:
    def __init__(self, root):
        # ... (previous initialization code remains the same) ...
        
        # Initialize analytics
        self.analytics = PoseAnalytics(save_dir='analytics')
        
        # Create analytics window
        self.analytics_window = PoseAnalyticsWindow(self.analytics)
        
        # Add button to show/hide analytics window
        self.analytics_button = ttk.Button(self.status_frame, 
                                         text="Toggle Analytics", 
                                         command=self.toggle_analytics)
        self.analytics_button.pack(side=tk.LEFT, padx=5)

    def process_frames(self):
        while not self._quit_event.is_set():
            try:
                frame = self.frame_queue.get(timeout=0.1)
                start_time = time.time()
                self.last_frames_time.append(start_time)
                
                # Process frame with pose detection
                processed_frame, pose_data = self.pose_detector.process_frame(frame)
                
                # Calculate processing time and FPS
                process_time = time.time() - start_time
                fps = len(self.last_frames_time) / (self.last_frames_time[-1] - self.last_frames_time[0])
                
                # Update analytics
                self.analytics.update_metrics(
                    posture=pose_data.get('posture', 'Unknown'),
                    confidence=pose_data.get('confidence', 0.0),
                    process_time=process_time,
                    fps=fps
                )
                
                # ... (rest of the processing code remains the same) ...
                
            except queue.Empty:
                continue
            except Exception as e:
                if not self._quit_event.is_set():
                    logging.error(f"Error processing frame: {e}")

    def toggle_analytics(self):
        """Show or hide the analytics window"""
        if self.analytics_window.window.winfo_viewable():
            self.analytics_window.window.withdraw()
        else:
            self.analytics_window.window.deiconify()

    def cleanup(self):
        if self.client_socket:
            try:
                self.client_socket.close()
            except:
                pass
        if not self._quit_event.is_set():
            self.status_label.configure(text="Connection closed")
            # Generate final analytics report
            self.analytics.generate_analytics_report()