In [1]:
import cv2
import mediapipe as mp
import numpy as np
import json
import os
import time
import matplotlib.pyplot as plt
from collections import deque, defaultdict
import logging
from datetime import datetime
import configparser

# Initialize MediaPipe Pose globally
mp_pose = mp.solutions.pose
PoseLandmark = mp_pose.PoseLandmark




# ========================
# Configuration Manager
# ========================

In [2]:
class ConfigManager:
    def __init__(self, config_path="config.ini"):
        self.config = configparser.ConfigParser()
        self.config.read(config_path)
        
        # Set default values if sections/keys are missing
        if not self.config.has_section("PATHES"):
            self.config["PATHES"] = {
                "input_video": "input_video.mp4",
                "output_video": "output/annotated_video.mp4",
                "output_json": "output/evaluation.json",
                "output_plot": "output/temporal_metrics.png",
                "output_report": "output/report.html"
            }
            
        if not self.config.has_section("THRESHOLDS"):
            self.config["THRESHOLDS"] = {
                "min_detection_confidence": "0.6",
                "min_tracking_confidence": "0.5",
                "visibility_threshold": "0.5",
                "max_missed_frames": "5",
                "fps_target": "10",
                "contact_velocity_threshold": "0.5",
                "phase_velocity_threshold": "0.1",
                "elbow_angle_min": "110",
                "elbow_angle_max": "160",
                "spine_lean_max": "25",
                "head_knee_max": "0.15",
                "foot_angle_min": "5",
                "foot_angle_max": "30"
            }
            
        if not self.config.has_section("REFERENCE"):
            self.config["REFERENCE"] = {
                "elbow_angle_ideal": "135",
                "spine_lean_ideal": "10",
                "head_knee_ideal": "0.05",
                "foot_angle_ideal": "15"
            }
        
        # Save updated config
        with open(config_path, 'w') as configfile:
            self.config.write(configfile)
            
    def get(self, section, key, fallback=None):
        return self.config.get(section, key, fallback=fallback)
    
    def getfloat(self, section, key, fallback=None):
        return self.config.getfloat(section, key, fallback=fallback)
    
    def getint(self, section, key, fallback=None):
        return self.config.getint(section, key, fallback=fallback)

# ========================
# Pose Estimation Module
# ========================

In [3]:
class PoseEstimator:
    def __init__(self, config):
        self.config = config
        self.pose = mp_pose.Pose(
            static_image_mode=False,
            model_complexity=2,
            min_detection_confidence=config.getfloat("THRESHOLDS", "min_detection_confidence"),
            min_tracking_confidence=config.getfloat("THRESHOLDS", "min_tracking_confidence")
        )
        self.mp_drawing = mp.solutions.drawing_utils
        self.mp_drawing_styles = mp.solutions.drawing_styles
        
    def estimate(self, frame):
        return self.pose.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    
    def draw_landmarks(self, frame, landmarks):
        self.mp_drawing.draw_landmarks(
            frame,
            landmarks,
            mp_pose.POSE_CONNECTIONS,
            landmark_drawing_spec=self.mp_drawing_styles.get_default_pose_landmarks_style()
        )
    
    def close(self):
        self.pose.close()

# ========================
# Batsman Tracking Module
# ========================

In [4]:
class BatsmanTracker:
    def __init__(self, frame_shape, config):
        self.prev_landmarks = None
        self.missed_frames = 0
        self.frame_shape = frame_shape
        self.config = config
        
    def track_batsman(self, results):
        if not results.pose_landmarks:
            self.missed_frames += 1
            return self.prev_landmarks if self.missed_frames < self.config.getint("THRESHOLDS", "max_missed_frames") else None
        
        # For single-person detection, always use detected person
        landmarks = results.pose_landmarks
        
        # Check if we need to re-identify batsman
        if self.prev_landmarks is None:
            self.prev_landmarks = landmarks
            self.missed_frames = 0
            return landmarks
        
        self.prev_landmarks = landmarks
        self.missed_frames = 0
        return landmarks

# ========================
# Bat Tracking Module
# ========================

In [5]:
class BatTracker:
    def __init__(self, config):
        self.config = config
        self.bat_line_history = deque(maxlen=10)
        
    def detect_bat(self, frame, hand_pos):
        """Simplified bat detection using color-based approach near hand position"""
        if hand_pos is None:
            return None
            
        # Create ROI around hand position
        x, y = hand_pos
        roi_size = 100
        x1, y1 = max(0, x - roi_size//2), max(0, y - roi_size//2)
        x2, y2 = min(frame.shape[1], x + roi_size//2), min(frame.shape[0], y + roi_size//2)
        
        if x1 >= x2 or y1 >= y2:
            return None
            
        roi = frame[y1:y2, x1:x2]
        
        # Convert to HSV for color detection
        hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV)
        
        # Define range for wood color (bat)
        lower_wood = np.array([10, 50, 50])
        upper_wood = np.array([30, 255, 255])
        
        # Threshold the HSV image
        mask = cv2.inRange(hsv, lower_wood, upper_wood)
        
        # Find contours - Fixed for OpenCV version compatibility
        try:
            contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        except ValueError:
            # Handle older OpenCV versions
            _, contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        
        if not contours:
            return None
            
        # Find largest contour
        largest_contour = max(contours, key=cv2.contourArea)
        
        # Check if contour is large enough
        if cv2.contourArea(largest_contour) < 50:
            return None
        
        # Fit line to contour
        try:
            rows, cols = roi.shape[:2]
            [vx, vy, x, y] = cv2.fitLine(largest_contour, cv2.DIST_L2, 0, 0.01, 0.01)
            
            # Fix numpy deprecation warnings
            vx, vy, x, y = float(vx), float(vy), float(x), float(y)
            
            # Calculate bat endpoints (in ROI coordinates)
            if vx != 0:
                lefty = int((-x*vy/vx) + y)
                righty = int(((cols-x)*vy/vx)+y)
            else:
                lefty = int(y)
                righty = int(y)
            
            # Convert to full image coordinates
            start_point = (0 + x1, lefty + y1)
            end_point = (cols-1 + x1, righty + y1)
            
            # Store for smoothing
            self.bat_line_history.append((start_point, end_point))
            
            # Return average of last few detections
            if self.bat_line_history:
                start_points = [p[0] for p in self.bat_line_history]
                end_points = [p[1] for p in self.bat_line_history]
                avg_start = np.mean(start_points, axis=0).astype(int)
                avg_end = np.mean(end_points, axis=0).astype(int)
                return (int(avg_start[0]), int(avg_start[1]), int(avg_end[0]), int(avg_end[1]))
        except Exception as e:
            logging.warning(f"Bat line fitting error: {str(e)}")
            return None
        
        return None

# ========================
# Phase Segmentation Module
# ========================

In [6]:
class PhaseSegmenter:
    PHASES = ["Stance", "Stride", "Downswing", "Impact", "Follow-through", "Recovery"]
    
    def __init__(self, config):
        self.config = config
        self.current_phase = "Stance"
        self.phase_history = []
        self.prev_wrist_pos = None
        self.contact_detected = False
        self.velocity_history = deque(maxlen=5)
        
    def detect_contact(self, wrist_velocity):
        """Detect contact moment based on velocity spike"""
        if len(self.velocity_history) < 5:
            self.velocity_history.append(wrist_velocity)
            return False
            
        # Detect peak followed by sharp drop
        if (wrist_velocity > self.config.getfloat("THRESHOLDS", "contact_velocity_threshold") and
            len(self.velocity_history) >= 2 and
            self.velocity_history[-2] > wrist_velocity and
            abs(self.velocity_history[-2] - wrist_velocity) > 0.3):
            self.contact_detected = True
            return True
            
        self.velocity_history.append(wrist_velocity)
        return False
        
    def update_phase(self, landmarks, frame_shape):
        """Update current phase based on joint movements"""
        if landmarks is None:
            return self.current_phase
            
        # Get wrist position (right wrist for right-handed batsman)
        try:
            wrist = landmarks.landmark[PoseLandmark.RIGHT_WRIST]
            wrist_pos = (wrist.x * frame_shape[1], wrist.y * frame_shape[0])
            
            # Calculate wrist velocity
            if self.prev_wrist_pos:
                velocity = np.sqrt((wrist_pos[0] - self.prev_wrist_pos[0])**2 + 
                                  (wrist_pos[1] - self.prev_wrist_pos[1])**2)
                
                # Detect contact moment
                if self.current_phase == "Downswing" and not self.contact_detected:
                    if self.detect_contact(velocity):
                        self.current_phase = "Impact"
                        self.phase_history.append(("Impact", time.time()))
                        self.prev_wrist_pos = wrist_pos
                        return self.current_phase
                
                # Phase transitions
                if self.current_phase == "Stance" and velocity > self.config.getfloat("THRESHOLDS", "phase_velocity_threshold"):
                    self.current_phase = "Stride"
                elif self.current_phase == "Stride" and wrist_pos[1] > self.prev_wrist_pos[1]:  # Downward movement
                    self.current_phase = "Downswing"
                elif self.current_phase == "Impact" and velocity > self.config.getfloat("THRESHOLDS", "phase_velocity_threshold"):
                    self.current_phase = "Follow-through"
                elif self.current_phase == "Follow-through" and velocity < self.config.getfloat("THRESHOLDS", "phase_velocity_threshold")/3:  # More relaxed threshold
                    self.current_phase = "Recovery"
                
            self.prev_wrist_pos = wrist_pos
        except Exception as e:
            # Graceful degradation
            logging.error(f"Phase update error: {str(e)}")
            
        # Record phase change
        if not self.phase_history or self.phase_history[-1][0] != self.current_phase:
            self.phase_history.append((self.current_phase, time.time()))
            
        return self.current_phase

# ========================
# Biomechanics Analysis Module
# ========================

In [7]:
class BiomechanicsAnalyzer:
    def __init__(self, frame_shape, config):
        self.frame_shape = frame_shape
        self.config = config
        self.metrics_history = defaultdict(lambda: deque(maxlen=100))
        self.temporal_metrics = defaultdict(list)
        self.velocity_history = deque(maxlen=5)
        self.prev_wrist_pos = None
        
    def get_landmark_coords(self, landmarks, landmark_idx):
        """Get landmark coordinates with visibility check"""
        try:
            lmk = landmarks.landmark[landmark_idx]
            if lmk.visibility < self.config.getfloat("THRESHOLDS", "visibility_threshold"):
                return None
            return int(lmk.x * self.frame_shape[1]), int(lmk.y * self.frame_shape[0])
        except (IndexError, AttributeError):
            return None
    
    def calculate_angle(self, a, b, c):
        """Calculate angle between three points in degrees"""
        if None in (a, b, c):
            return None
            
        try:
            a, b, c = np.array(a), np.array(b), np.array(c)
            ba, bc = a - b, c - b
            
            # Prevent division by zero
            norm_ba = np.linalg.norm(ba)
            norm_bc = np.linalg.norm(bc)
            
            if norm_ba == 0 or norm_bc == 0:
                return None
                
            cosine_angle = np.dot(ba, bc) / (norm_ba * norm_bc)
            angle = np.degrees(np.arccos(np.clip(cosine_angle, -1, 1)))
            return angle
        except Exception as e:
            logging.warning(f"Angle calculation error: {str(e)}")
            return None
    
    def calculate_spine_lean(self, hip_mid, shoulder_mid):
        """Calculate spine lean angle relative to vertical axis"""
        if None in (hip_mid, shoulder_mid):
            return None
            
        try:
            dx = shoulder_mid[0] - hip_mid[0]
            dy = shoulder_mid[1] - hip_mid[1]
            
            if dy == 0:
                return 90.0 if dx != 0 else 0.0
                
            angle = np.degrees(np.arctan2(dx, abs(dy)))
            return abs(angle)
        except Exception as e:
            logging.warning(f"Spine lean calculation error: {str(e)}")
            return None
    
    def calculate_foot_direction(self, heel, toe):
        """Calculate foot direction angle relative to horizontal"""
        if None in (heel, toe):
            return None
            
        try:
            dx = toe[0] - heel[0]
            dy = toe[1] - heel[1]
            
            if dx == 0:
                return 90.0 if dy != 0 else 0.0
                
            angle = np.degrees(np.arctan2(dy, dx))
            return abs(angle)
        except Exception as e:
            logging.warning(f"Foot direction calculation error: {str(e)}")
            return None
    
    def analyze_frame(self, landmarks, current_phase=None):
        """Calculate biomechanical metrics for a frame"""
        metrics = {}
        
        try:
            # Front elbow angle (right-handed batsman)
            left_shoulder = self.get_landmark_coords(landmarks, PoseLandmark.LEFT_SHOULDER)
            left_elbow = self.get_landmark_coords(landmarks, PoseLandmark.LEFT_ELBOW)
            left_wrist = self.get_landmark_coords(landmarks, PoseLandmark.LEFT_WRIST)
            metrics["elbow_angle"] = self.calculate_angle(left_shoulder, left_elbow, left_wrist)
            
            # Spine lean
            left_hip = self.get_landmark_coords(landmarks, PoseLandmark.LEFT_HIP)
            right_hip = self.get_landmark_coords(landmarks, PoseLandmark.RIGHT_HIP)
            hip_mid = None
            if left_hip and right_hip:
                hip_mid = ((left_hip[0] + right_hip[0])//2, (left_hip[1] + right_hip[1])//2)
            
            left_shoulder = self.get_landmark_coords(landmarks, PoseLandmark.LEFT_SHOULDER)
            right_shoulder = self.get_landmark_coords(landmarks, PoseLandmark.RIGHT_SHOULDER)
            shoulder_mid = None
            if left_shoulder and right_shoulder:
                shoulder_mid = ((left_shoulder[0] + right_shoulder[0])//2, 
                               (left_shoulder[1] + right_shoulder[1])//2)
            
            metrics["spine_lean"] = self.calculate_spine_lean(hip_mid, shoulder_mid) if hip_mid and shoulder_mid else None
            
            # Head-over-knee alignment (only during swing phases)
            if current_phase and current_phase in ["Downswing", "Impact", "Follow-through"]:
                head = self.get_landmark_coords(landmarks, PoseLandmark.NOSE)
                knee = self.get_landmark_coords(landmarks, PoseLandmark.LEFT_KNEE)
                if head and knee:
                    metrics["head_knee_distance"] = abs(head[0] - knee[0]) / self.frame_shape[1]  # Normalized
            else:
                metrics["head_knee_distance"] = None
            
            # Front foot direction
            heel = self.get_landmark_coords(landmarks, PoseLandmark.LEFT_HEEL)
            toe = self.get_landmark_coords(landmarks, PoseLandmark.LEFT_FOOT_INDEX)
            metrics["foot_direction"] = self.calculate_foot_direction(heel, toe)
            
            # Calculate wrist velocity for phase segmentation
            wrist = landmarks.landmark[PoseLandmark.RIGHT_WRIST]
            wrist_pos = (wrist.x * self.frame_shape[1], wrist.y * self.frame_shape[0])
            
            if self.prev_wrist_pos:
                velocity = np.sqrt((wrist_pos[0] - self.prev_wrist_pos[0])**2 + 
                                  (wrist_pos[1] - self.prev_wrist_pos[1])**2)
                metrics["wrist_velocity"] = velocity
            else:
                metrics["wrist_velocity"] = 0
                
            self.prev_wrist_pos = wrist_pos
            
        except Exception as e:
            metrics["wrist_velocity"] = 0
            logging.warning(f"Frame analysis error: {str(e)}")
        
        # Store metrics
        for key, value in metrics.items():
            if value is not None:
                self.metrics_history[key].append(value)
                self.temporal_metrics[key].append(value)
                
        return metrics

# ========================
# Visualization Module
# ========================

In [8]:
class VisualizationEngine:
    def __init__(self, frame_shape, config):
        self.frame_shape = frame_shape
        self.config = config
        self.font = cv2.FONT_HERSHEY_SIMPLEX
        self.colors = {
            "good": (0, 255, 0),
            "bad": (0, 0, 255),
            "neutral": (255, 255, 255),
            "phase": (0, 165, 255)
        }
        self.current_fps = 0
    
    def draw_metrics(self, frame, metrics, current_phase):
        """Display real-time metrics on frame with new layout"""
        height, width = frame.shape[:2]
        
        # FPS at top right (smaller)
        fps_text = f"FPS: {self.current_fps:.1f}"
        cv2.putText(frame, fps_text, (width - 150, 30), 
                   self.font, 0.6, self.colors["neutral"], 1)
        
        # Metrics at bottom left
        y_offset = height - 40
        
        # Phase display
        cv2.putText(frame, f"Phase: {current_phase}", (10, y_offset), 
                   self.font, 0.6, self.colors["phase"], 1)
        y_offset -= 30
        
        # Elbow angle
        if metrics.get("elbow_angle") is not None:
            # Compare to ideal reference
            ideal = self.config.getfloat("REFERENCE", "elbow_angle_ideal", fallback=135)
            diff = abs(metrics['elbow_angle'] - ideal)
            color = self.colors["good"] if diff < 15 else self.colors["bad"]
            
            text = f"Elbow: {metrics['elbow_angle']:.1f}°"
            cv2.putText(frame, text, (10, y_offset), self.font, 0.5, color, 1)
            y_offset -= 25
            
            # Only show feedback during relevant phases
            if current_phase in ["Downswing", "Impact", "Follow-through"]:
                if (self.config.getfloat("THRESHOLDS", "elbow_angle_min") <= 
                    metrics['elbow_angle'] <= 
                    self.config.getfloat("THRESHOLDS", "elbow_angle_max")):
                    cv2.putText(frame, "✓ Good elbow", (10, y_offset), 
                               self.font, 0.4, self.colors["good"], 1)
                else:
                    cv2.putText(frame, "✗ Improve elbow", (10, y_offset), 
                               self.font, 0.4, self.colors["bad"], 1)
                y_offset -= 25
        
        # Head position (only during swing phases)
        if current_phase in ["Downswing", "Impact", "Follow-through"] and metrics.get("head_knee_distance") is not None:
            ideal = self.config.getfloat("REFERENCE", "head_knee_ideal", fallback=0.05)
            diff = abs(metrics['head_knee_distance'] - ideal)
            color = self.colors["good"] if diff < 0.05 else self.colors["bad"]
            
            text = f"Head-Knee: {metrics['head_knee_distance']:.3f}"
            cv2.putText(frame, text, (10, y_offset), self.font, 0.5, color, 1)
            y_offset -= 25
            
            if metrics['head_knee_distance'] < self.config.getfloat("THRESHOLDS", "head_knee_max"):
                cv2.putText(frame, "✓ Good head position", (10, y_offset), 
                           self.font, 0.4, self.colors["good"], 1)
            else:
                cv2.putText(frame, "✗ Head not aligned", (10, y_offset), 
                           self.font, 0.4, self.colors["bad"], 1)
            y_offset -= 25
            
        # Spine lean
        if metrics.get("spine_lean") is not None:
            ideal = self.config.getfloat("REFERENCE", "spine_lean_ideal", fallback=10)
            diff = abs(metrics['spine_lean'] - ideal)
            color = self.colors["good"] if diff < 5 else self.colors["bad"]
            
            text = f"Spine: {metrics['spine_lean']:.1f}°"
            cv2.putText(frame, text, (10, y_offset), self.font, 0.5, color, 1)
            y_offset -= 25
            
        # Foot direction
        if metrics.get("foot_direction") is not None:
            ideal = self.config.getfloat("REFERENCE", "foot_angle_ideal", fallback=15)
            diff = abs(metrics['foot_direction'] - ideal)
            color = self.colors["good"] if diff < 10 else self.colors["bad"]
            
            text = f"Foot: {metrics['foot_direction']:.1f}°"
            cv2.putText(frame, text, (10, y_offset), self.font, 0.5, color, 1)
    
    def draw_bat(self, frame, bat_line):
        """Draw detected bat line"""
        if bat_line is not None:
            cv2.line(frame, (bat_line[0], bat_line[1]), (bat_line[2], bat_line[3]), (0, 255, 255), 3)

# ========================
# Evaluation Module
# ========================

In [9]:
class ShotEvaluator:
    def __init__(self, config, temporal_metrics):
        self.config = config
        self.temporal_metrics = temporal_metrics
        
    def calculate_score(self, values, ideal_range, reverse=False):
        """Calculate a score (1-10) based on metric values and ideal range"""
        if not values:
            return 5  # Default average score
        
        try:
            avg_value = np.mean(values)
            
            # Get ideal value from config
            if isinstance(ideal_range, dict):
                key = list(ideal_range.keys())[0]
                min_val, max_val = ideal_range[key]
                ideal_value = self.config.getfloat("REFERENCE", f"{key}_ideal", fallback=(min_val+max_val)/2)
            else:
                min_val, max_val = ideal_range
                ideal_value = (min_val + max_val) / 2
            
            # Calculate deviation from ideal reference
            deviation = abs(avg_value - ideal_value)
            max_deviation = max(abs(min_val - ideal_value), abs(max_val - ideal_value))
            
            if max_deviation == 0:
                return 10
            
            # Adjust score based on deviation
            base_score = 10 * (1 - min(deviation / max_deviation, 1))
            
            # Apply reverse scoring if needed
            return max(1, base_score if not reverse else 10 - base_score)
        except Exception as e:
            logging.warning(f"Score calculation error: {str(e)}")
            return 5
    
    def calculate_smoothness(self, values):
        """Calculate temporal smoothness of a metric"""
        if len(values) < 2:
            return 5  # Default score
        
        try:
            deltas = np.abs(np.diff(values))
            avg_delta = np.mean(deltas)
            return max(1, 10 - min(avg_delta * 10, 9))
        except Exception as e:
            logging.warning(f"Smoothness calculation error: {str(e)}")
            return 5
    
    def determine_skill_grade(self, total_score):
        """Map total score to skill grade"""
        if total_score >= 8.5:
            return "Advanced"
        elif total_score >= 6.5:
            return "Intermediate"
        else:
            return "Beginner"
    
    def generate_evaluation(self, metrics_history, phase_history):
        """Generate final shot evaluation report"""
        if not any(metrics_history.values()):
            return {"error": "No pose data available for evaluation"}
        
        try:
            # Calculate category scores
            evaluation = {
                "Footwork": {
                    "score": self.calculate_score(
                        list(metrics_history.get("foot_direction", [])),
                        (self.config.getfloat("THRESHOLDS", "foot_angle_min"),
                         self.config.getfloat("THRESHOLDS", "foot_angle_max"))
                    ),
                    "smoothness": self.calculate_smoothness(list(metrics_history.get("foot_direction", []))),
                    "feedback": "Maintain stable base with front foot pointing toward cover region"
                },
                "Head Position": {
                    "score": self.calculate_score(
                        list(metrics_history.get("head_knee_distance", [])),
                        (0, self.config.getfloat("THRESHOLDS", "head_knee_max")),
                        reverse=True
                    ),
                    "smoothness": self.calculate_smoothness(list(metrics_history.get("head_knee_distance", []))),
                    "feedback": "Keep head aligned with front knee throughout the shot"
                },
                "Swing Control": {
                    "score": self.calculate_score(
                        list(metrics_history.get("elbow_angle", [])),
                        (self.config.getfloat("THRESHOLDS", "elbow_angle_min"),
                         self.config.getfloat("THRESHOLDS", "elbow_angle_max"))
                    ),
                    "smoothness": self.calculate_smoothness(list(metrics_history.get("elbow_angle", []))),
                    "feedback": "Maintain high front elbow for better bat control"
                },
                "Balance": {
                    "score": self.calculate_score(
                        list(metrics_history.get("spine_lean", [])),
                        (0, self.config.getfloat("THRESHOLDS", "spine_lean_max")),
                        reverse=True
                    ),
                    "smoothness": self.calculate_smoothness(list(metrics_history.get("spine_lean", []))),
                    "feedback": "Maintain upright posture with controlled spine lean"
                },
                "Follow-through": {
                    "score": 7.5,  # Placeholder for bat tracking implementation
                    "smoothness": 6.5,
                    "feedback": "Complete swing motion with weight transfer to front foot"
                },
                "Phase Timing": {
                    "phases": phase_history,
                    "feedback": "Analyze phase durations for optimal timing"
                }
            }
            
            # Calculate total score and skill grade
            category_scores = [cat["score"] for cat in evaluation.values() 
                             if isinstance(cat, dict) and "score" in cat]
            evaluation["Total Score"] = np.mean(category_scores) if category_scores else 0
            evaluation["Skill Grade"] = self.determine_skill_grade(evaluation["Total Score"])
            
            return evaluation
        except Exception as e:
            logging.error(f"Evaluation generation error: {str(e)}")
            return {"error": f"Evaluation failed: {str(e)}"}
    
    def generate_temporal_plot(self, output_path):
        """Generate temporal metrics plot"""
        if not any(self.temporal_metrics.values()):
            return
            
        try:
            plt.figure(figsize=(12, 8))
            
            # Plot elbow angle
            if self.temporal_metrics.get("elbow_angle"):
                plt.subplot(2, 2, 1)
                plt.plot(self.temporal_metrics["elbow_angle"], label='Elbow Angle')
                plt.axhline(y=self.config.getfloat("REFERENCE", "elbow_angle_ideal", fallback=135), 
                           color='r', linestyle='--', label='Ideal')
                plt.title('Elbow Angle Over Time')
                plt.ylabel('Angle (degrees)')
                plt.legend()
            
            # Plot spine lean
            if self.temporal_metrics.get("spine_lean"):
                plt.subplot(2, 2, 2)
                plt.plot(self.temporal_metrics["spine_lean"], label='Spine Lean')
                plt.axhline(y=self.config.getfloat("REFERENCE", "spine_lean_ideal", fallback=10), 
                           color='r', linestyle='--', label='Ideal')
                plt.title('Spine Lean Over Time')
                plt.ylabel('Angle (degrees)')
                plt.legend()
            
            # Plot head-knee distance
            if self.temporal_metrics.get("head_knee_distance"):
                plt.subplot(2, 2, 3)
                plt.plot(self.temporal_metrics["head_knee_distance"], label='Head-Knee Distance')
                plt.axhline(y=self.config.getfloat("REFERENCE", "head_knee_ideal", fallback=0.05), 
                           color='r', linestyle='--', label='Ideal')
                plt.title('Head-Knee Alignment Over Time')
                plt.ylabel('Normalized Distance')
                plt.xlabel('Frame Number')
                plt.legend()
            
            # Plot foot direction
            if self.temporal_metrics.get("foot_direction"):
                plt.subplot(2, 2, 4)
                plt.plot(self.temporal_metrics["foot_direction"], label='Foot Direction')
                plt.axhline(y=self.config.getfloat("REFERENCE", "foot_angle_ideal", fallback=15), 
                           color='r', linestyle='--', label='Ideal')
                plt.title('Foot Direction Over Time')
                plt.ylabel('Angle (degrees)')
                plt.xlabel('Frame Number')
                plt.legend()
            
            plt.tight_layout()
            plt.savefig(output_path)
            plt.close()
        except Exception as e:
            logging.error(f"Plot generation error: {str(e)}")

# ========================
# Report Generator
# ========================

In [10]:
class ReportGenerator:
    @staticmethod
    def generate_html(evaluation, output_path, plot_path):
        """Generate HTML report with evaluation results"""
        try:
            html = f"""
            <!DOCTYPE html>
            <html>
            <head>
                <title>Cricket Shot Analysis Report</title>
                <style>
                    body {{ font-family: Arial, sans-serif; margin: 20px; }}
                    table {{ border-collapse: collapse; width: 100%; }}
                    th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
                    th {{ background-color: #f2f2f2; }}
                    .good {{ background-color: #d4edda; }}
                    .average {{ background-color: #fff3cd; }}
                    .poor {{ background-color: #f8d7da; }}
                    .section {{ margin-bottom: 30px; }}
                </style>
            </head>
            <body>
                <h1>Cricket Shot Analysis Report</h1>
                <p>Generated on {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}</p>
                
                <div class="section">
                    <h2>Overall Evaluation</h2>
                    <p><strong>Total Score:</strong> {evaluation.get('Total Score', 0):.1f}/10</p>
                    <p><strong>Skill Grade:</strong> {evaluation.get('Skill Grade', 'N/A')}</p>
                </div>
                
                <div class="section">
                    <h2>Technical Breakdown</h2>
                    <table>
                        <tr>
                            <th>Category</th>
                            <th>Score</th>
                            <th>Smoothness</th>
                            <th>Feedback</th>
                        </tr>
            """
            
            # Add category rows
            for category, data in evaluation.items():
                if category in ["Total Score", "Skill Grade", "Phase Timing"]:
                    continue
                    
                if not isinstance(data, dict) or "score" not in data:
                    continue
                    
                score_class = "good" if data["score"] >= 8 else "average" if data["score"] >= 6 else "poor"
                smoothness_class = "good" if data["smoothness"] >= 8 else "average" if data["smoothness"] >= 6 else "poor"
                
                html += f"""
                        <tr>
                            <td>{category}</td>
                            <td class="{score_class}">{data['score']:.1f}/10</td>
                            <td class="{smoothness_class}">{data['smoothness']:.1f}/10</td>
                            <td>{data['feedback']}</td>
                        </tr>
                """
            
            html += """
                    </table>
                </div>
            """
            
            # Add phase timing if available
            if "Phase Timing" in evaluation and evaluation["Phase Timing"].get("phases"):
                html += """
                <div class="section">
                    <h2>Phase Timing</h2>
                    <table>
                        <tr>
                            <th>Phase</th>
                            <th>Start Time</th>
                            <th>Duration (s)</th>
                        </tr>
                """
                
                phases = evaluation["Phase Timing"]["phases"]
                for i in range(len(phases)):
                    phase, start_time = phases[i]
                    end_time = phases[i+1][1] if i < len(phases)-1 else start_time + 1
                    duration = end_time - start_time
                    
                    html += f"""
                        <tr>
                            <td>{phase}</td>
                            <td>{start_time:.2f}</td>
                            <td>{duration:.2f}</td>
                        </tr>
                    """
                
                html += """
                    </table>
                </div>
                """
            
            # Add temporal plot
            if os.path.exists(plot_path):
                html += f"""
                <div class="section">
                    <h2>Temporal Metrics</h2>
                    <img src="{os.path.basename(plot_path)}" alt="Temporal Metrics" width="100%">
                </div>
                """
            
            html += """
            </body>
            </html>
            """
            
            with open(output_path, 'w') as f:
                f.write(html)
            
            # Copy plot to same directory as report for relative reference
            if os.path.exists(plot_path):
                import shutil
                plot_filename = os.path.basename(plot_path)
                report_dir = os.path.dirname(output_path)
                shutil.copy2(plot_path, os.path.join(report_dir, plot_filename))
        except Exception as e:
            logging.error(f"HTML report generation error: {str(e)}")

# ========================
# Main Processing Pipeline
# ========================

In [11]:
class CricketShotAnalyzer:
    def __init__(self, config_path="config.ini"):
        # Initialize configuration
        self.config = ConfigManager(config_path)
        
        # Set up logging
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler("analysis.log"),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger()
        
        # Initialize processing modules
        self.pose_estimator = None
        self.video_info = {}
        self.frame_count = 0
        self.prev_time = 0
        self.fps_history = deque(maxlen=100)
        
    def analyze_video(self, input_path=None, output_video_path=None, output_json_path=None):
        """Main video processing pipeline"""
        cap = None
        out = None
        
        try:
            # Get paths from config if not provided
            if input_path is None:
                input_path = self.config.get("PATHES", "input_video")
            if output_video_path is None:
                output_video_path = self.config.get("PATHES", "output_video")
            if output_json_path is None:
                output_json_path = self.config.get("PATHES", "output_json")
                
            # Create output directory
            os.makedirs(os.path.dirname(output_video_path), exist_ok=True)
            
            # Open video
            cap = cv2.VideoCapture(input_path)
            if not cap.isOpened():
                raise IOError(f"Cannot open video file: {input_path}")
            
            # Get video properties
            self.video_info = {
                "width": int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
                "height": int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)),
                "fps": cap.get(cv2.CAP_PROP_FPS),
                "frame_count": int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            }
            self.logger.info(f"Processing video: {input_path}")
            self.logger.info(f"Resolution: {self.video_info['width']}x{self.video_info['height']}")
            self.logger.info(f"FPS: {self.video_info['fps']:.2f}, Frames: {self.video_info['frame_count']}")
            
            # Initialize modules
            self.pose_estimator = PoseEstimator(self.config)
            batsman_tracker = BatsmanTracker((self.video_info["width"], self.video_info["height"]), self.config)
            biomechanics_analyzer = BiomechanicsAnalyzer((self.video_info["width"], self.video_info["height"]), self.config)
            visualizer = VisualizationEngine((self.video_info["width"], self.video_info["height"]), self.config)
            bat_tracker = BatTracker(self.config)
            phase_segmenter = PhaseSegmenter(self.config)
            
            # Initialize video writer
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(
                output_video_path,
                fourcc,
                self.video_info['fps'],
                (self.video_info['width'], self.video_info['height'])
            )
            
            # Process frames
            self.prev_time = time.time()
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                    
                # Calculate FPS
                current_time = time.time()
                fps = 1 / (current_time - self.prev_time) if self.prev_time > 0 else 0
                self.prev_time = current_time
                self.fps_history.append(fps)
                avg_fps = sum(self.fps_history)/len(self.fps_history) if self.fps_history else 0
                visualizer.current_fps = avg_fps
                
                # Process frame
                processed_frame = frame.copy()
                results = self.pose_estimator.estimate(frame)
                landmarks = batsman_tracker.track_batsman(results)
                current_phase = "Stance"  # Default
                
                if landmarks:
                    # Draw pose landmarks
                    self.pose_estimator.draw_landmarks(processed_frame, landmarks)
                    
                    # Get current phase first
                    current_phase = phase_segmenter.update_phase(landmarks, (self.video_info["width"], self.video_info["height"]))
                    
                    # Analyze biomechanics
                    metrics = biomechanics_analyzer.analyze_frame(landmarks, current_phase)
                    
                    # Detect bat
                    hand_pos = None
                    try:
                        # Get right wrist position for bat tracking
                        hand_lmk = landmarks.landmark[PoseLandmark.RIGHT_WRIST]
                        if hand_lmk.visibility > self.config.getfloat("THRESHOLDS", "visibility_threshold"):
                            hand_pos = (int(hand_lmk.x * self.video_info["width"]), 
                                       int(hand_lmk.y * self.video_info["height"]))
                    except Exception as e:
                        self.logger.warning(f"Hand position error: {str(e)}")
                    
                    bat_line = bat_tracker.detect_bat(frame, hand_pos)
                    visualizer.draw_bat(processed_frame, bat_line)
                    
                    # Visualize metrics and phase
                    visualizer.draw_metrics(processed_frame, metrics, current_phase)
                
                # Write frame
                out.write(processed_frame)
                
                # Optional: Display frame (comment out for headless operation)
                cv2.imshow('Cricket Shot Analysis', processed_frame)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
                    
                self.frame_count += 1
                
                # Log progress
                if self.frame_count % 50 == 0:
                    self.logger.info(f"Processed {self.frame_count}/{self.video_info['frame_count']} frames, Avg FPS: {avg_fps:.1f}")
            
            # Generate temporal plot
            plot_path = self.config.get("PATHES", "output_plot")
            evaluator = ShotEvaluator(self.config, biomechanics_analyzer.temporal_metrics)
            evaluator.generate_temporal_plot(plot_path)
            
            # Generate final evaluation
            evaluation = evaluator.generate_evaluation(
                biomechanics_analyzer.metrics_history,
                phase_segmenter.phase_history
            )
            
            # Save evaluation
            with open(output_json_path, 'w') as f:
                json.dump(evaluation, f, indent=2)
            
            # Generate HTML report
            report_path = self.config.get("PATHES", "output_report")
            ReportGenerator.generate_html(evaluation, report_path, plot_path)
            
            # Log performance
            self.logger.info(f"Processing complete! Avg FPS: {avg_fps:.1f}")
            self.logger.info(f"Output saved to: {output_video_path}")
            self.logger.info(f"Evaluation saved to: {output_json_path}")
            self.logger.info(f"Report generated: {report_path}")
            
            return evaluation
            
        except Exception as e:
            self.logger.error(f"Error processing video: {str(e)}")
            return {"error": str(e)}
        finally:
            # Clean up resources
            if cap is not None:
                cap.release()
            if out is not None:
                out.release()
            cv2.destroyAllWindows()
            if self.pose_estimator:
                self.pose_estimator.close()

# ===========================
# Main Execution
# ===========================

In [12]:
if __name__ == "__main__":
    # Create output directory
    os.makedirs("output", exist_ok=True)
    
    # Create default config file if missing
    if not os.path.exists("config.ini"):
        with open("config.ini", "w") as f:
            f.write("")
    
    # Initialize and run analyzer
    analyzer = CricketShotAnalyzer("config.ini")
    evaluation = analyzer.analyze_video()
    
    if "error" not in evaluation:
        print("Processing complete!")
        print(f"Skill Grade: {evaluation.get('Skill Grade', 'N/A')}")
        print(f"Total Score: {evaluation.get('Total Score', 0):.1f}/10")
    else:
        print(f"Error: {evaluation['error']}")

2025-08-14 23:26:17,526 - INFO - Processing video: test.mp4
2025-08-14 23:26:17,527 - INFO - Resolution: 360x640
2025-08-14 23:26:17,528 - INFO - FPS: 30.00, Frames: 136
2025-08-14 23:26:23,500 - INFO - Processed 50/136 frames, Avg FPS: 9.9
2025-08-14 23:26:28,885 - INFO - Processed 100/136 frames, Avg FPS: 9.7
  vx, vy, x, y = float(vx), float(vy), float(x), float(y)
2025-08-14 23:26:33,022 - ERROR - HTML report generation error: 'output/temporal_metrics.png' and 'output\\temporal_metrics.png' are the same file
2025-08-14 23:26:33,023 - INFO - Processing complete! Avg FPS: 9.5
2025-08-14 23:26:33,023 - INFO - Output saved to: output/annotated_video.mp4
2025-08-14 23:26:33,024 - INFO - Evaluation saved to: output/evaluation.json
2025-08-14 23:26:33,024 - INFO - Report generated: output/report.html


Processing complete!
Skill Grade: Beginner
Total Score: 4.1/10
