# Gemini 

In [None]:
!pip install ultralytics supervision numpy opencv-python scikit-learn pandas


In [None]:
!pip install --upgrade ultralytics torch torchvision

In [None]:
!pip install mplsoccer
import sys
import os
import cv2
import pickle
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from collections import deque

# Machine Learning & Computer Vision Libraries
from ultralytics import YOLO
import supervision as sv
from sklearn.cluster import KMeans
import easyocr

# Google Gemini for AI Commentary
import google.generativeai as genai
from kaggle_secrets import UserSecretsClient
import time

# Plotting for Heatmaps
from mplsoccer import Pitch

# --- Video Utilities ---
def read_video(video_path):
    """Reads a video file and returns a list of its frames."""
    cap = cv2.VideoCapture(video_path)
    frames = []
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frames.append(frame)
    cap.release()
    return frames

def save_video(output_video_frames, output_video_path):
    """Saves a list of frames as a video file."""
    if not output_video_frames:
        print("No frames to save.")
        return
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_video_path, fourcc, 24, (output_video_frames[0].shape[1], output_video_frames[0].shape[0]))
    for frame in output_video_frames:
        out.write(frame)
    out.release()

# --- BBox Utilities ---
def get_center_of_bbox(bbox):
    x1, y1, x2, y2 = bbox
    return int((x1 + x2) / 2), int((y1 + y2) / 2)

def get_bbox_width(bbox):
    return int(bbox[2] - bbox[0])

def get_foot_position(bbox):
    x1, y1, x2, y2 = bbox
    return int((x1 + x2) / 2), int(y2)
    
def measure_distance(p1, p2):
    return ((p1[0] - p2[0])**2 + (p1[1] - p2[1])**2)**0.5

def measure_xy_distance(p1, p2):
    return p1[0] - p2[0], p1[1] - p2[1]

In [None]:
class ImprovedCommentaryEngine:
    def __init__(self, clip_duration_seconds=5, fps=24):
        self.clip_length_frames = int(clip_duration_seconds * fps)
        self.frame_buffer = deque(maxlen=self.clip_length_frames)
        self.temp_video_path = "/kaggle/working/temp_commentary_clip.mp4"
        self.latest_commentary = "Match analysis is starting..."
        self.fps = fps
        self.match_context = {
            'possession_changes': [], 'recent_events': [],
            'ball_position_history': [], 'player_movements': []
        }
        
        print("🎙️ Initializing Enhanced Gemini Commentary Engine...")
        try:
            # user_secrets = UserSecretsClient()
            api_key = "abcd"
            genai.configure(api_key=api_key)
            self.model = genai.GenerativeModel('models/gemini-2.5-flash')
            print("✅ Gemini 2.5 Flash model loaded successfully.")
        except Exception as e:
            self.model = None
            print(f"⚠️ Could not initialize Gemini model: {e}")

    def update_with_context(self, frame, tracks_data, frame_num, events_data=None):
        if not self.model: return
        
        game_context = self._extract_game_context(tracks_data, frame_num, events_data)
        
        self.match_context['recent_events'].append(game_context)
        if len(self.match_context['recent_events']) > 10:
            self.match_context['recent_events'].pop(0)
        
        self.frame_buffer.append(frame)
        
        if len(self.frame_buffer) == self.clip_length_frames:
            print("Generating tactical summary...")
            new_comment = self._generate_contextual_commentary(game_context)
            if new_comment:
                self.latest_commentary = new_comment
            self.frame_buffer.clear()

    def _extract_game_context(self, tracks_data, frame_num, events_data):
        context = {
            'frame_num': frame_num,
            'timestamp': f"{int(frame_num / (self.fps * 60))}:{int((frame_num / self.fps) % 60):02d}",
            'players_detected': len(tracks_data['players'][frame_num]),
            'ball_detected': 1 in tracks_data['ball'][frame_num],
            'possession': None, 'ball_speed': 0, 'recent_events': []
        }
        
        for player_id, player_info in tracks_data['players'][frame_num].items():
            if player_info.get('has_ball', False):
                context['possession'] = f"Player {player_id} (Team {player_info.get('team', 'Unknown')})"
                break
        
        if events_data is not None and not events_data.empty:
            recent_events = events_data[
                (events_data['minute'] * 60 + events_data['second']) >= (frame_num / self.fps - 10)
            ].tail(3)
            context['recent_events'] = recent_events.to_dict('records')
        
        return context

    def _generate_contextual_commentary(self, game_context):
        video_file = None
        try:
            height, width, _ = self.frame_buffer[0].shape
            writer = cv2.VideoWriter(self.temp_video_path, cv2.VideoWriter_fourcc(*'mp4v'), self.fps, (width, height))
            for frame in self.frame_buffer:
                writer.write(frame)
            writer.release()

            video_file = genai.upload_file(path=self.temp_video_path)
            while video_file.state.name == "PROCESSING":
                time.sleep(2)
                video_file = genai.get_file(video_file.name)
            
            if video_file.state.name == "FAILED":
                return "Video processing failed."
            
            context_prompt = self._create_detailed_prompt(game_context)
            response = self.model.generate_content([context_prompt, video_file])
            return response.text.strip().replace('\n', ' ')
            
        except Exception as e:
            print(f"Commentary generation error: {e}")
            return self._generate_fallback_commentary(game_context)
        finally:
            if video_file: genai.delete_file(video_file.name)
            if os.path.exists(self.temp_video_path): os.remove(self.temp_video_path)

    def _create_detailed_prompt(self, context):
        prompt = f"""You are a professional football (soccer) tactical analyst.

        CURRENT GAME STATE:
        - Match Time: {context['timestamp']}
        - Ball Possession: {context.get('possession', 'Unclear')}
        - Recent Match Events: {self._format_recent_events(context.get('recent_events', []))}

        INSTRUCTIONS:
        1. Analyze the short video clip of a football match.
        2. Provide a brief, factual, tactical summary of the most significant action.
        3. Describe the sequence of play objectively. Example: "The player in red receives a pass, moves past a defender, and attempts a shot which is blocked."
        4. Do NOT use emotional or exciting commentary language like "incredible!" or "what a save!".
        5. Your entire response must be a single, concise sentence (max 25 words).

        Analyze the clip and provide your tactical summary:"""
        return prompt

    def _format_recent_events(self, events):
        if not events: return "No recent significant events detected."
        
        formatted = []
        for event in events[-3:]:
            if isinstance(event, dict):
                event_type = event.get('type_name', 'Unknown')
                team = event.get('team_name', 'Unknown Team')
                formatted.append(f"- {event_type} by {team}")
        
        return "\n".join(formatted) if formatted else "No recent significant events detected."

    def _generate_fallback_commentary(self, context):
        if context.get('possession'):
            return f"Play continues with {context['possession']} in possession."
        return "The match continues with both teams looking for opportunities."

class RealTimeTicker:
    """
    Generates a simple, real-time text commentary for each frame based on game state.
    """
    def __init__(self, fps=24):
        self.fps = fps
        self.last_player_id = -1
        self.last_team_id = -1
        self.ticker_text = "Match begins!"
        self.text_display_frames = 0

    def _get_ball_carrier(self, player_track):
        for player_id, data in player_track.items():
            if data.get('has_ball', False):
                return player_id, data.get('team')
        return -1, -1

    def update(self, tracks, frame_num):
        if self.text_display_frames > 0:
            self.text_display_frames -= 1
            return self.ticker_text
        
        player_track = tracks['players'][frame_num]
        current_player_id, current_team_id = self._get_ball_carrier(player_track)

        if (current_player_id != -1 and self.last_player_id != -1 and 
            current_player_id != self.last_player_id and current_team_id == self.last_team_id):
            self.ticker_text = f"Pass from Player {self.last_player_id} to Player {current_player_id}."
            self.text_display_frames = self.fps * 2
        
        elif current_player_id != -1 and self.last_team_id != -1 and current_team_id != self.last_team_id:
            self.ticker_text = f"Team {current_team_id} gains possession!"
            self.text_display_frames = self.fps * 2
        
        else:
            if current_player_id != -1:
                self.ticker_text = f"Player {current_player_id} (Team {current_team_id}) on the ball."
            else:
                self.ticker_text = "Ball is loose."

        if current_player_id != -1:
            self.last_player_id = current_player_id
            self.last_team_id = current_team_id
        else:
            self.last_player_id = -1
            
        return self.ticker_text

In [None]:
class JerseyNumberRecognizer:
    def __init__(self):
        self.reader = easyocr.Reader(['en'], gpu=True)
        self.jersey_cache = {}
        print("✅ Jersey OCR module initialized.")

    def recognize_jersey_number(self, player_crop, tracker_id):
        if tracker_id in self.jersey_cache: return self.jersey_cache[tracker_id]
        if player_crop.size == 0: return None
        
        crop_gray = cv2.cvtColor(player_crop, cv2.COLOR_BGR2GRAY)
        results = self.reader.readtext(crop_gray, allowlist='0123456789', detail=1)

        best_result = None
        for (bbox, text, prob) in results:
            if prob > 0.6 and text.isdigit() and len(text) <= 2:
                if best_result is None or prob > best_result[2]:
                    best_result = (bbox, text, prob)
        
        if best_result:
            self.jersey_cache[tracker_id] = best_result[1]
            return best_result[1]
        
        return None

class Tracker:
    def __init__(self, model_name='yolov8x.pt'):
        self.model = YOLO(model_name)
        self.tracker = sv.ByteTrack()
        self.jersey_recognizer = JerseyNumberRecognizer()

    def get_object_tracks(self, frames, read_from_stub=False, stub_path=None):
        if read_from_stub and stub_path and os.path.exists(stub_path):
            with open(stub_path, 'rb') as f: return pickle.load(f)

        tracks = {"players": [], "referees": [], "ball": []}
        
        for frame_num, frame in enumerate(frames):
            if frame_num % 20 == 0: print(f"Processing frame {frame_num}/{len(frames)}")
            results = self.model.predict(frame, conf=0.1)[0]
            detections = sv.Detections.from_ultralytics(results)
            
            # Filter for players (class_id for 'person' is typically 0)
            player_detections = detections[detections.class_id == 0]
            tracked_players = self.tracker.update_with_detections(player_detections)
            
            tracks["players"].append({})
            tracks["referees"].append({})
            
            for detection_data in tracked_players:
                bbox = detection_data[0]
                track_id = detection_data[4]
                
                player_crop = frame[int(bbox[1]):int(bbox[3]), int(bbox[0]):int(bbox[2])]
                jersey_num = self.jersey_recognizer.recognize_jersey_number(player_crop, track_id)
                tracks["players"][frame_num][track_id] = {"bbox": bbox.tolist(), "jersey_number": jersey_num}

            # Filter for ball (class_id for 'sports ball' is typically 32)
            ball_detections = detections[detections.class_id == 32]
            tracks["ball"].append({})
            if len(ball_detections) > 0:
                tracks["ball"][frame_num][1] = {"bbox": ball_detections.xyxy[0].tolist()}
        
        if stub_path:
            with open(stub_path, 'wb') as f: pickle.dump(tracks, f)
        return tracks

    def add_position_to_tracks(self, tracks):
        for type, obj_tracks in tracks.items():
            for frame_num, track in enumerate(obj_tracks):
                for id, info in track.items():
                    bbox = info['bbox']
                    info['position'] = get_foot_position(bbox) if type != 'ball' else get_center_of_bbox(bbox)
    
    def interpolate_ball_positions(self, ball_positions):
        ball_bboxes = [x.get(1, {}).get('bbox', []) for x in ball_positions]
        df = pd.DataFrame(ball_bboxes, columns=['x1', 'y1', 'x2', 'y2']).interpolate().bfill()
        return [{1: {"bbox": x}} for x in df.to_numpy().tolist()]

    def _draw_player_ellipse(self, frame, bbox, color, track_id, jersey_num):
        y2 = int(bbox[3])
        x_center, _ = get_center_of_bbox(bbox)
        width = get_bbox_width(bbox)
        cv2.ellipse(frame, center=(x_center, y2), axes=(int(width), int(0.35 * width)), angle=0.0, startAngle=-45, endAngle=235, color=color, thickness=2, lineType=cv2.LINE_4)
        
        label = f"#{jersey_num}" if jersey_num else str(track_id)
        (w, h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
        rect_w, rect_h = w + 10, h + 10
        x1_rect, y1_rect = x_center - rect_w//2, (y2 - rect_h//2) + 15
        
        cv2.rectangle(frame, (x1_rect, y1_rect), (x1_rect + rect_w, y1_rect + rect_h), color, cv2.FILLED)
        cv2.putText(frame, label, (x1_rect + 5, y1_rect + h + 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 0), 2)
        return frame

    def _draw_triangle(self, frame, bbox, color):
        y, x = int(bbox[1]), int(get_center_of_bbox(bbox)[0])
        points = np.array([[x, y], [x - 10, y - 20], [x + 10, y - 20]])
        cv2.drawContours(frame, [points], 0, color, cv2.FILLED)
        cv2.drawContours(frame, [points], 0, (0, 0, 0), 2)
        return frame

    def _draw_team_ball_control(self, frame, frame_num, team_ball_control):
        overlay = frame.copy()
        cv2.rectangle(overlay, (10, 10), (350, 70), (255, 255, 255), -1)
        cv2.addWeighted(overlay, 0.5, frame, 0.5, 0, frame)
        
        team_1_frames = np.sum(team_ball_control[:frame_num + 1] == 1)
        team_2_frames = np.sum(team_ball_control[:frame_num + 1] == 2)
        total = max(1, team_1_frames + team_2_frames)
        p1 = (team_1_frames / total) * 100
        p2 = (team_2_frames / total) * 100
        
        cv2.putText(frame, f"Team 1 Possession: {p1:.1f}%", (20, 35), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,0,0), 2)
        cv2.putText(frame, f"Team 2 Possession: {p2:.1f}%", (20, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,0,0), 2)
        return frame

    def _draw_commentary_overlay(self, frame, text):
        h, w, _ = frame.shape
        font = cv2.FONT_HERSHEY_SIMPLEX
        thickness = 2
        
        font_scale = 1.0
        (text_w, text_h), _ = cv2.getTextSize(text, font, font_scale, thickness)
        
        target_w = w * 0.9
        if text_w > target_w:
            font_scale = target_w / text_w
            
        (text_w, text_h), _ = cv2.getTextSize(text, font, font_scale, thickness)

        banner_h = text_h + 20
        overlay = frame.copy()
        cv2.rectangle(overlay, (0, h - banner_h), (w, h), (0, 0, 0), -1)
        cv2.addWeighted(overlay, 0.6, frame, 0.4, 0, frame)
        
        text_x = (w - text_w) // 2
        text_y = h - 10
        cv2.putText(frame, text, (text_x, text_y), font, font_scale, (255, 255, 255), thickness)
        
        return frame

class EventDetector:
    def __init__(self):
        self.shot_speed_threshold_mps = 15
        self.frame_rate = 24

    def detect_events(self, tracks):
        player_assigner = PlayerBallAssigner()
        ball_possession_log = []
        for frame_num in range(len(tracks['players'])):
            player_track = tracks['players'][frame_num]
            ball_bbox = tracks['ball'][frame_num].get(1, {}).get('bbox')
            assigned_player_id = player_assigner.assign_ball_to_player(player_track, ball_bbox) if ball_bbox else -1
            ball_possession_log.append(assigned_player_id)

        events = []
        last_player_with_ball, pass_start_info = -1, {}
        for frame_num, current_player_id in enumerate(ball_possession_log):
            ball_pos_transformed = tracks['ball'][frame_num].get(1, {}).get('position_transformed')
            if not ball_pos_transformed: continue

            is_valid_pass = (current_player_id != last_player_with_ball and last_player_with_ball != -1 and current_player_id != -1)
            if is_valid_pass:
                start_player_team = tracks['players'][pass_start_info['frame']][last_player_with_ball].get('team')
                end_player_team = tracks['players'][frame_num].get(current_player_id, {}).get('team')
                if start_player_team == end_player_team and start_player_team is not None:
                    events.append({
                        "type_name": "Pass", "player_name": f"Player_{last_player_with_ball}",
                        "team_name": f"Team {start_player_team}", "x": pass_start_info['position'][0],
                        "y": pass_start_info['position'][1], "end_x": ball_pos_transformed[0],
                        "end_y": ball_pos_transformed[1], "minute": int(frame_num / (self.frame_rate * 60)),
                        "second": int((frame_num / self.frame_rate) % 60)
                    })
            
            if current_player_id != -1:
                pass_start_info = {'frame': frame_num, 'position': ball_pos_transformed}
            last_player_with_ball = current_player_id
            
        return pd.DataFrame(events)

# Other classes (TeamAssigner, PlayerBallAssigner, etc.)
class TeamAssigner:
    def __init__(self):
        self.team_colors, self.player_team_dict, self.kmeans = {}, {}, None
    def get_player_color(self, frame, bbox):
        image = frame[int(bbox[1]):int(bbox[3]), int(bbox[0]):int(bbox[2])]
        if image.size == 0: return np.array([0,0,0])
        top_half = image[0:int(image.shape[0] / 2), :]
        if top_half.size == 0: return np.array([0,0,0])
        kmeans = KMeans(n_clusters=2, init="k-means++", n_init=1, random_state=0).fit(top_half.reshape(-1, 3))
        labels = kmeans.labels_.reshape(top_half.shape[0], top_half.shape[1])
        corner_clusters = [labels[0, 0], labels[0, -1], labels[-1, 0], labels[-1, -1]]
        non_player_cluster = max(set(corner_clusters), key=corner_clusters.count)
        return kmeans.cluster_centers_[1 - non_player_cluster]
    def assign_team_color(self, frame, player_detections):
        if not player_detections: return
        colors = [self.get_player_color(frame, det["bbox"]) for _, det in player_detections.items()]
        self.kmeans = KMeans(n_clusters=2, init="k-means++", n_init=10, random_state=0).fit(colors)
        self.team_colors[1], self.team_colors[2] = self.kmeans.cluster_centers_
    def get_player_team(self, frame, bbox, player_id):
        if player_id in self.player_team_dict: return self.player_team_dict[player_id]
        if self.kmeans is None: return 0
        color = self.get_player_color(frame, bbox)
        team_id = self.kmeans.predict(color.reshape(1, -1))[0] + 1
        self.player_team_dict[player_id] = team_id
        return team_id

class PlayerBallAssigner:
    def __init__(self): self.max_dist = 70
    def assign_ball_to_player(self, players, ball_bbox):
        if not ball_bbox: return -1
        ball_pos, min_dist, assigned_player = get_center_of_bbox(ball_bbox), float('inf'), -1
        for id, player in players.items():
            dist = measure_distance(get_foot_position(player['bbox']), ball_pos)
            if dist < self.max_dist and dist < min_dist: min_dist, assigned_player = dist, id
        return assigned_player

class CameraMovementEstimator:
    def __init__(self, frame):
        self.lk_params = dict(winSize=(15, 15), maxLevel=2, criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))
        self.features = dict(maxCorners=100, qualityLevel=0.3, minDistance=7, blockSize=7)
    def get_camera_movement(self, frames, read_from_stub=False, stub_path=None):
        if read_from_stub and stub_path and os.path.exists(stub_path):
            with open(stub_path, 'rb') as f: return pickle.load(f)
        movements = [[0, 0]] * len(frames)
        old_gray = cv2.cvtColor(frames[0], cv2.COLOR_BGR2GRAY)
        old_features = cv2.goodFeaturesToTrack(old_gray, **self.features)
        for i in range(1, len(frames)):
            new_gray = cv2.cvtColor(frames[i], cv2.COLOR_BGR2GRAY)
            new_features, status, _ = cv2.calcOpticalFlowPyrLK(old_gray, new_gray, old_features, None, **self.lk_params)
            
            good_new = new_features[status==1]
            good_old = old_features[status==1]

            move_x, move_y = 0, 0
            if len(good_new) > 0:
                move_x, move_y = np.mean(good_old - good_new, axis=0).ravel()

            movements[i] = [move_x, move_y]
            old_gray = new_gray.copy()
            old_features = good_new.reshape(-1, 1, 2)
        if stub_path:
            with open(stub_path, 'wb') as f: pickle.dump(movements, f)
        return movements
    def add_adjust_positions_to_tracks(self, tracks, movements):
        for type, obj_tracks in tracks.items():
            for i, track in enumerate(obj_tracks):
                for id, info in track.items():
                    info['position_adjusted'] = (info['position'][0] + movements[i][0], info['position'][1] + movements[i][1])

class ViewTransformer:
    def __init__(self):
        court_w, court_l = 34, 52.5
        self.pixel_verts = np.float32([[110, 1035], [265, 275], [910, 260], [1640, 915]])
        self.target_verts = np.float32([[0, court_w], [0, 0], [court_l, 0], [court_l, court_w]])
        self.transformer = cv2.getPerspectiveTransform(self.pixel_verts, self.target_verts)
    def transform_point(self, point):
        p = (int(point[0]), int(point[1]))
        is_inside = cv2.pointPolygonTest(self.pixel_verts, p, False) >= 0
        if not is_inside: return None
        reshaped = np.array(point).reshape(-1, 1, 2).astype(np.float32)
        transformed = cv2.perspectiveTransform(reshaped, self.transformer)
        return transformed.reshape(-1, 2)
    def add_transformed_position_to_tracks(self, tracks):
        for type, obj_tracks in tracks.items():
            for track in obj_tracks:
                for id, info in track.items():
                    pos = info.get('position_adjusted', info.get('position'))
                    if pos:
                        transformed = self.transform_point(pos)
                        info['position_transformed'] = transformed.squeeze().tolist() if transformed is not None else None

class SpeedAndDistanceEstimator:
    def __init__(self):
        self.frame_window, self.frame_rate = 24, 24
    def add_speed_and_distance_to_tracks(self, tracks):
        total_dist = {}
        for type, obj_tracks in tracks.items():
            if type not in ["players", "referees"]: continue
            for i in range(len(obj_tracks)):
                for id, info in obj_tracks[i].items():
                    if i > 0:
                        prev_info = tracks[type][i-1].get(id)
                        if prev_info and info.get('position_transformed') and prev_info.get('position_transformed'):
                            dist = measure_distance(info['position_transformed'], prev_info['position_transformed'])
                            total_dist[id] = total_dist.get(id, 0) + dist
                            speed_mps = dist * self.frame_rate
                            info['speed'] = speed_mps * 3.6 # km/h
                            info['distance'] = total_dist[id]
    def draw_speed_and_distance(self, frames, tracks):
        output_frames = []
        for i, frame in enumerate(frames):
            for type, obj_tracks in tracks.items():
                if type not in ["players", "referees"]: continue
                for id, info in obj_tracks[i].items():
                    if "speed" in info:
                        x, y = get_foot_position(info['bbox'])
                        cv2.putText(frame, f"{info['speed']:.1f} km/h", (x - 20, y + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,0,255), 2)
            output_frames.append(frame)
        return output_frames

In [None]:
def main():
    # --- SETUP ---
    INPUT_VIDEO_PATH = "/kaggle/input/football-video2/CityUtdR.mp4"
    STUB_PATH = "/kaggle/working/tracks_stub.pkl"
    OUTPUT_VIDEO_PATH = "/kaggle/working/final_analysis_video-gemini.mp4"
    
    frames = read_video(INPUT_VIDEO_PATH)
    if not frames:
        print("Video file not found or could not be read. Check the path.")
        return None

    cap = cv2.VideoCapture(INPUT_VIDEO_PATH)
    fps = cap.get(cv2.CAP_PROP_FPS) or 24
    cap.release()

    # --- INITIALIZE ALL MODULES ---
    tracker = Tracker('yolov8x.pt')
    commentary_engine = ImprovedCommentaryEngine(fps=fps)
    camera_estimator = CameraMovementEstimator(frames[0])
    view_transformer = ViewTransformer()
    speed_estimator = SpeedAndDistanceEstimator()
    team_assigner = TeamAssigner()
    player_assigner = PlayerBallAssigner()
    ticker = RealTimeTicker(fps=fps)

    # --- STAGE 1: TRACKING ---
    print("Stage 1: Performing object detection and tracking...")
    tracks = tracker.get_object_tracks(frames, read_from_stub=False, stub_path=STUB_PATH)
    tracks["ball"] = tracker.interpolate_ball_positions(tracks["ball"])
    tracker.add_position_to_tracks(tracks)
    
    # --- STAGE 2: MOTION & PERSPECTIVE ---
    print("Stage 2: Estimating camera motion and transforming perspective...")
    camera_movement = camera_estimator.get_camera_movement(frames)
    camera_estimator.add_adjust_positions_to_tracks(tracks, camera_movement)
    view_transformer.add_transformed_position_to_tracks(tracks)
    speed_estimator.add_speed_and_distance_to_tracks(tracks)
    
    # --- STAGE 3: TEAM ASSIGNMENT ---
    print("Stage 3: Assigning teams...")
    team_assigner.assign_team_color(frames[0], tracks['players'][0])
    
    for frame_num, frame in enumerate(frames):
        player_track = tracks['players'][frame_num]
        for player_id, track in player_track.items():
            team = team_assigner.get_player_team(frame, track['bbox'], player_id)
            tracks['players'][frame_num][player_id]['team'] = team
            tracks['players'][frame_num][player_id]['team_color'] = team_assigner.team_colors.get(team, (0,0,255))
    
    # --- STAGE 4: GENERATE EVENTS DATA ---
    print("Stage 4: Detecting events for commentary context...")
    event_detector = EventDetector()
    events_df = event_detector.detect_events(tracks)
    print(f"Detected {len(events_df)} events for commentary context")
    
    # --- STAGE 5: BALL POSSESSION & COMMENTARY ---
    print("Stage 5: Tracking ball possession and generating all commentary...")
    team_ball_control = []
    ticker_history = []
    gemini_history = []
    
    for frame_num, frame in enumerate(frames):
        player_track = tracks['players'][frame_num]
        ball_bbox = tracks['ball'][frame_num].get(1, {}).get('bbox')
        
        for player_id in tracks['players'][frame_num]:
            tracks['players'][frame_num][player_id]['has_ball'] = False
            
        assigned_player = player_assigner.assign_ball_to_player(player_track, ball_bbox)
        if assigned_player != -1:
            tracks['players'][frame_num][assigned_player]['has_ball'] = True
            team_ball_control.append(tracks['players'][frame_num][assigned_player]['team'])
        else:
            team_ball_control.append(team_ball_control[-1] if team_ball_control else 0)
        
        ticker_history.append(ticker.update(tracks, frame_num))
        commentary_engine.update_with_context(frame, tracks, frame_num, events_df)
        gemini_history.append(commentary_engine.latest_commentary)
        
        if frame_num % 100 == 0:
            print(f"Commentary progress: {frame_num}/{len(frames)} frames")

    team_ball_control = np.array(team_ball_control)

    # --- STAGE 6: VISUALIZATION & SAVING ---
    print("Stage 6: Combining commentary and saving final video...")
    display_commentary = ticker_history.copy()
    last_gemini_comment = gemini_history[0]
    for i, comment in enumerate(gemini_history):
        if comment != last_gemini_comment:
            start_frame = max(0, i - commentary_engine.clip_length_frames)
            for j in range(start_frame, i):
                if j < len(display_commentary):
                    display_commentary[j] = comment
            last_gemini_comment = comment

    output_frames = []
    for frame_num, frame in enumerate(frames):
        frame_copy = frame.copy()
        current_commentary = display_commentary[frame_num] if frame_num < len(display_commentary) else " "
        
        player_dict = tracks["players"][frame_num]
        ball_dict = tracks.get("ball", [])[frame_num]
        
        for track_id, player in player_dict.items():
            color = player.get("team_color", (0, 0, 255))
            frame_copy = tracker._draw_player_ellipse(frame_copy, player["bbox"], color, track_id, player.get("jersey_number"))
            if player.get('has_ball', False):
                frame_copy = tracker._draw_triangle(frame_copy, player["bbox"], (0, 0, 255))
        
        if 1 in ball_dict:
            frame_copy = tracker._draw_triangle(frame_copy, ball_dict[1]["bbox"], (0, 255, 0))
            
        frame_copy = tracker._draw_team_ball_control(frame_copy, frame_num, team_ball_control)
        frame_copy = tracker._draw_commentary_overlay(frame_copy, current_commentary)
        output_frames.append(frame_copy)
    
    output_frames = speed_estimator.draw_speed_and_distance(output_frames, tracks)
    save_video(output_frames, OUTPUT_VIDEO_PATH)

    # --- FINAL STATISTICS ---
    print("\n" + "="*50)
    print("MATCH ANALYSIS COMPLETE")
    print("="*50)
    print(f"✅ Video saved to: {OUTPUT_VIDEO_PATH}")
    
    # ... (rest of main function)

if __name__ == "__main__":
    main()

# BLIP

In [13]:
# !pip install ultralytics supervision numpy opencv-python scikit-learn pandas
# !pip install --upgrade ultralytics torch torchvision
# !pip install mplsoccer transformers accelerate easyocr
# !pip install Pillow timm datasets diffusers


Collecting fsspec<=2025.3.0,>=2023.1.0 (from fsspec[http]<=2025.3.0,>=2023.1.0->datasets)
  Downloading fsspec-2025.3.0-py3-none-any.whl.metadata (11 kB)
Downloading fsspec-2025.3.0-py3-none-any.whl (193 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m193.6/193.6 kB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: fsspec
  Attempting uninstall: fsspec
    Found existing installation: fsspec 2025.5.1
    Uninstalling fsspec-2025.5.1:
      Successfully uninstalled fsspec-2025.5.1
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
bigframes 2.8.0 requires google-cloud-bigquery-storage<3.0.0,>=2.30.0, which is not installed.
cesium 0.12.4 requires numpy<3.0,>=2.0, but you have numpy 1.26.4 which is incompatible.
torchaudio 2.6.0+cu124 requires torch==2.6.0, but you have torch 2.8.0 which is incom

In [14]:
# !pip install paddlepaddle paddleocr
# !pip install torchvision-nightly --pre


Collecting paddlepaddle
  Downloading paddlepaddle-3.1.1-cp311-cp311-manylinux1_x86_64.whl.metadata (8.8 kB)
Collecting paddleocr
  Downloading paddleocr-3.2.0-py3-none-any.whl.metadata (29 kB)
Collecting opt_einsum==3.3.0 (from paddlepaddle)
  Downloading opt_einsum-3.3.0-py3-none-any.whl.metadata (6.5 kB)
Collecting paddlex<3.3.0,>=3.2.0 (from paddlex[ocr-core]<3.3.0,>=3.2.0->paddleocr)
  Downloading paddlex-3.2.0-py3-none-any.whl.metadata (80 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m80.5/80.5 kB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
Collecting aistudio_sdk>=0.3.5 (from paddlex<3.3.0,>=3.2.0->paddlex[ocr-core]<3.3.0,>=3.2.0->paddleocr)
  Downloading aistudio_sdk-0.3.5-py3-none-any.whl.metadata (1.0 kB)
Collecting modelscope>=1.28.0 (from paddlex<3.3.0,>=3.2.0->paddlex[ocr-core]<3.3.0,>=3.2.0->paddleocr)
  Downloading modelscope-1.29.1-py3-none-any.whl.metadata (40 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m40.4/40.4 kB[0m [31

In [15]:
# import sys
# import os
# import cv2
# import pickle
# import pandas as pd
# import numpy as np
# import matplotlib.pyplot as plt
# from collections import deque
# import time
# import math
# from typing import Dict, List, Tuple, Optional

# # Machine Learning & Computer Vision Libraries
# from ultralytics import YOLO
# import supervision as sv
# from sklearn.cluster import KMeans
# import easyocr
# try:
#     from paddleocr import PaddleOCR
# except ImportError:
#     PaddleOCR = None

# # Advanced Vision Models
# from transformers import AutoModelForCausalLM, AutoTokenizer, AutoProcessor
# from transformers import BlipProcessor, BlipForConditionalGeneration
# from transformers import Blip2Processor, Blip2ForConditionalGeneration
# import torch
# from PIL import Image, ImageEnhance, ImageFilter

# # Plotting for Heatmaps
# from mplsoccer import Pitch

# # --- Enhanced Video Utilities ---
# def read_video(video_path):
#     """Reads a video file and returns a list of its frames."""
#     cap = cv2.VideoCapture(video_path)
#     frames = []
#     total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    
#     while True:
#         ret, frame = cap.read()
#         if not ret:
#             break
#         frames.append(frame)
    
#     cap.release()
#     print(f"✅ Loaded {len(frames)}/{total_frames} frames from video")
#     return frames

# def save_video(output_video_frames, output_video_path, fps=24):
#     """Saves a list of frames as a video file with enhanced quality."""
#     if not output_video_frames:
#         print("No frames to save.")
#         return
    
#     height, width, channels = output_video_frames[0].shape
#     fourcc = cv2.VideoWriter_fourcc(*'H264')  # Better quality codec
#     out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
    
#     for frame in output_video_frames:
#         out.write(frame)
#     out.release()
#     print(f"✅ Video saved to: {output_video_path}")

# # --- Enhanced BBox Utilities ---
# def get_center_of_bbox(bbox):
#     x1, y1, x2, y2 = bbox
#     return int((x1 + x2) / 2), int((y1 + y2) / 2)

# def get_bbox_width(bbox):
#     return int(bbox[2] - bbox[0])

# def get_bbox_height(bbox):
#     return int(bbox[3] - bbox[1])

# def get_bbox_area(bbox):
#     return get_bbox_width(bbox) * get_bbox_height(bbox)

# def get_foot_position(bbox):
#     x1, y1, x2, y2 = bbox
#     return int((x1 + x2) / 2), int(y2)
 
# def measure_distance(p1, p2):
#     return ((p1[0] - p2[0])**2 + (p1[1] - p2[1])**2)**0.5

# def measure_xy_distance(p1, p2):
#     return p1[0] - p2[0], p1[1] - p2[1]

# def calculate_angle(p1, p2):
#     """Calculate angle between two points."""
#     dx = p2[0] - p1[0]
#     dy = p2[1] - p1[1]
#     return math.atan2(dy, dx) * 180 / math.pi

# def enhance_image_for_ocr(image):
#     """Enhance image quality for better OCR results."""
#     if len(image.shape) == 3:
#         gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
#     else:
#         gray = image.copy()
    
#     # Apply multiple enhancement techniques
#     # 1. Contrast enhancement
#     clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
#     enhanced = clahe.apply(gray)
    
#     # 2. Noise reduction
#     denoised = cv2.bilateralFilter(enhanced, 9, 75, 75)
    
#     # 3. Sharpening
#     kernel = np.array([[-1,-1,-1], [-1,9,-1], [-1,-1,-1]])
#     sharpened = cv2.filter2D(denoised, -1, kernel)
    
#     # 4. Morphological operations to clean up
#     kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (2, 2))
#     cleaned = cv2.morphologyEx(sharpened, cv2.MORPH_CLOSE, kernel)
    
#     return cleaned


In [16]:
# class AdvancedJerseyNumberRecognizer:
#     def __init__(self):
#         # Initialize multiple OCR engines for better accuracy
#         self.easyocr_reader = easyocr.Reader(['en'], gpu=True)
        
#         # Initialize PaddleOCR if available
#         self.paddleocr_reader = None
#         if PaddleOCR:
#             try:
#                 self.paddleocr_reader = PaddleOCR(use_angle_cls=True, lang='en', use_gpu=True)
#                 print("✅ PaddleOCR initialized successfully")
#             except:
#                 print("⚠️ PaddleOCR initialization failed, using EasyOCR only")
        
#         self.jersey_cache = {}
#         self.confidence_threshold = 0.5
#         self.number_history = {}  # Track number consistency
#         print("✅ Advanced Jersey OCR module initialized.")

#     def preprocess_jersey_crop(self, player_crop):
#         """Enhanced preprocessing for jersey number detection."""
#         if player_crop.size == 0:
#             return None, None
            
#         # Focus on upper torso area where jersey numbers typically are
#         height, width = player_crop.shape[:2]
        
#         # Multiple crop regions to try
#         regions = [
#             # Center chest area
#             (int(width*0.2), int(height*0.1), int(width*0.8), int(height*0.6)),
#             # Front chest area
#             (int(width*0.3), int(height*0.15), int(width*0.7), int(height*0.5)),
#             # Full upper body
#             (0, 0, width, int(height*0.7))
#         ]
        
#         processed_regions = []
#         for x1, y1, x2, y2 in regions:
#             if x2 > x1 and y2 > y1 and x2 <= width and y2 <= height:
#                 region = player_crop[y1:y2, x1:x2]
#                 if region.size > 0:
#                     enhanced = enhance_image_for_ocr(region)
#                     processed_regions.append(enhanced)
        
#         return processed_regions

#     def recognize_with_easyocr(self, image_regions):
#         """Use EasyOCR to detect jersey numbers."""
#         best_result = None
#         best_confidence = 0
        
#         for region in image_regions:
#             try:
#                 results = self.easyocr_reader.readtext(
#                     region, 
#                     allowlist='0123456789', 
#                     detail=1,
#                     width_ths=0.7,
#                     height_ths=0.7
#                 )
                
#                 for (bbox, text, confidence) in results:
#                     # Validate jersey number format
#                     if (confidence > self.confidence_threshold and 
#                         text.isdigit() and 
#                         1 <= len(text) <= 2 and
#                         1 <= int(text) <= 99):
                        
#                         if confidence > best_confidence:
#                             best_confidence = confidence
#                             best_result = text
                            
#             except Exception as e:
#                 continue
                
#         return best_result, best_confidence

#     def recognize_with_paddleocr(self, image_regions):
#         """Use PaddleOCR to detect jersey numbers."""
#         if not self.paddleocr_reader:
#             return None, 0
            
#         best_result = None
#         best_confidence = 0
        
#         for region in image_regions:
#             try:
#                 results = self.paddleocr_reader.ocr(region, cls=True)
                
#                 if results and results[0]:
#                     for line in results[0]:
#                         if len(line) == 2:
#                             bbox, (text, confidence) = line
                            
#                             # Clean and validate text
#                             text = ''.join(filter(str.isdigit, text))
                            
#                             if (confidence > self.confidence_threshold and 
#                                 text.isdigit() and 
#                                 1 <= len(text) <= 2 and
#                                 1 <= int(text) <= 99):
                                
#                                 if confidence > best_confidence:
#                                     best_confidence = confidence
#                                     best_result = text
                                    
#             except Exception as e:
#                 continue
                
#         return best_result, best_confidence

#     def validate_number_consistency(self, tracker_id, detected_number):
#         """Validate number consistency across frames."""
#         if tracker_id not in self.number_history:
#             self.number_history[tracker_id] = {}
            
#         if detected_number in self.number_history[tracker_id]:
#             self.number_history[tracker_id][detected_number] += 1
#         else:
#             self.number_history[tracker_id][detected_number] = 1
            
#         # Return most frequent number if we have enough samples
#         total_detections = sum(self.number_history[tracker_id].values())
#         if total_detections >= 3:
#             most_common = max(self.number_history[tracker_id], 
#                             key=self.number_history[tracker_id].get)
#             frequency = self.number_history[tracker_id][most_common] / total_detections
            
#             if frequency >= 0.6:  # 60% consistency threshold
#                 return most_common
                
#         return detected_number

#     def recognize_jersey_number(self, player_crop, tracker_id, frame_num=0):
#         """Enhanced jersey number recognition with multiple OCR engines."""
        
#         # Check cache first
#         if tracker_id in self.jersey_cache:
#             cached_result = self.jersey_cache[tracker_id]
#             # Re-validate periodically
#             if frame_num % 60 != 0:  # Re-check every 60 frames
#                 return cached_result
        
#         # Preprocess the crop
#         image_regions = self.preprocess_jersey_crop(player_crop)
#         if not image_regions:
#             return f"P{tracker_id}"
            
#         # Try multiple OCR engines
#         results = []
        
#         # EasyOCR
#         easy_result, easy_conf = self.recognize_with_easyocr(image_regions)
#         if easy_result:
#             results.append((easy_result, easy_conf, "EasyOCR"))
            
#         # PaddleOCR
#         paddle_result, paddle_conf = self.recognize_with_paddleocr(image_regions)
#         if paddle_result:
#             results.append((paddle_result, paddle_conf, "PaddleOCR"))
        
#         # Select best result
#         if results:
#             # Sort by confidence
#             results.sort(key=lambda x: x[1], reverse=True)
#             best_number = results[0][0]
            
#             # Validate consistency
#             validated_number = self.validate_number_consistency(tracker_id, best_number)
            
#             # Cache the result
#             self.jersey_cache[tracker_id] = validated_number
#             return validated_number
        
#         # Return player ID if no number detected
#         return f"P{tracker_id}"

#     def get_detection_stats(self):
#         """Get statistics about jersey number detection."""
#         total_tracked = len(self.number_history)
#         successful_detections = len([k for k, v in self.jersey_cache.items() 
#                                    if not v.startswith('P')])
        
#         return {
#             'total_players': total_tracked,
#             'successful_detections': successful_detections,
#             'detection_rate': successful_detections / max(1, total_tracked) * 100
#         }


In [18]:
# class AdvancedCommentaryEngine:
#     def __init__(self, clip_duration_seconds=3, fps=24):
#         self.clip_length_frames = int(clip_duration_seconds * fps)
#         self.frame_buffer = deque(maxlen=self.clip_length_frames)
#         self.latest_commentary = "Match analysis is starting..."
#         self.fps = fps
#         self.frame_count = 0
        
#         # Enhanced context tracking
#         self.match_context = {
#             'possession_changes': [], 
#             'recent_events': [],
#             'ball_position_history': deque(maxlen=30),
#             'player_movements': {},
#             'formation_changes': [],
#             'speed_events': [],
#             'tactical_events': []
#         }
        
#         print("🎙️ Initializing Advanced Vision Commentary Engine...")
        
#         # Initialize multiple models for different aspects
#         self.models = {}
#         self._initialize_models()
        
#     def _initialize_models(self):
#         """Initialize multiple vision-language models for comprehensive analysis."""
#         try:
#             # Primary captioning model - BLIP2 for detailed scene understanding
#             print("Loading BLIP2 for detailed scene analysis...")
#             self.blip2_processor = Blip2Processor.from_pretrained("Salesforce/blip2-opt-2.7b")
#             self.blip2_model = Blip2ForConditionalGeneration.from_pretrained(
#                 "Salesforce/blip2-opt-2.7b",
#                 torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
#                 device_map="auto" if torch.cuda.is_available() else None
#             )
#             self.models['blip2'] = True
#             print("✅ BLIP2 model loaded successfully")
            
#         except Exception as e:
#             print(f"⚠️ Failed to load BLIP2: {e}")
#             self.models['blip2'] = False
            
#         try:
#             # Secondary model - BLIP for quick analysis
#             print("Loading BLIP for quick analysis...")
#             self.blip_processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
#             self.blip_model = BlipForConditionalGeneration.from_pretrained(
#                 "Salesforce/blip-image-captioning-base",
#                 torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32
#             )
#             if torch.cuda.is_available():
#                 self.blip_model = self.blip_model.cuda()
#             self.models['blip'] = True
#             print("✅ BLIP model loaded successfully")
            
#         except Exception as e:
#             print(f"⚠️ Failed to load BLIP: {e}")
#             self.models['blip'] = False
            
#         self.device = "cuda" if torch.cuda.is_available() else "cpu"

#     def _analyze_tactical_situation(self, tracks_data, frame_num):
#         """Analyze tactical situation from tracking data."""
#         tactical_info = {
#             'formation_analysis': None,
#             'pressure_zones': [],
#             'attacking_patterns': None,
#             'defensive_shape': None
#         }
        
#         try:
#             player_positions = []
#             team_positions = {1: [], 2: []}
            
#             for player_id, player_data in tracks_data['players'][frame_num].items():
#                 if 'position_transformed' in player_data and player_data['position_transformed']:
#                     pos = player_data['position_transformed']
#                     team = player_data.get('team', 0)
                    
#                     if team in [1, 2]:
#                         team_positions[team].append(pos)
                        
#             # Analyze team formations
#             for team_id, positions in team_positions.items():
#                 if len(positions) >= 7:  # Need enough players for formation analysis
#                     # Simple formation detection based on y-coordinates
#                     y_coords = [pos[1] for pos in positions]
#                     y_coords.sort()
                    
#                     # Detect defensive, midfield, and attacking lines
#                     defensive_line = np.mean(y_coords[:3]) if len(y_coords) >= 3 else None
#                     midfield_line = np.mean(y_coords[3:6]) if len(y_coords) >= 6 else None
#                     attacking_line = np.mean(y_coords[6:]) if len(y_coords) > 6 else None
                    
#                     tactical_info[f'team_{team_id}_formation'] = {
#                         'defensive_line': defensive_line,
#                         'midfield_line': midfield_line,
#                         'attacking_line': attacking_line
#                     }
                    
#         except Exception as e:
#             print(f"Tactical analysis error: {e}")
            
#         return tactical_info

#     def _detect_micro_events(self, tracks_data, frame_num):
#         """Detect micro-events from tracking data."""
#         events = []
        
#         try:
#             # Ball speed analysis
#             if frame_num > 0:
#                 current_ball = tracks_data['ball'][frame_num].get(1, {})
#                 prev_ball = tracks_data['ball'][frame_num-1].get(1, {})
                
#                 if (current_ball.get('position_transformed') and 
#                     prev_ball.get('position_transformed')):
                    
#                     ball_speed = measure_distance(
#                         current_ball['position_transformed'],
#                         prev_ball['position_transformed']
#                     ) * self.fps  # Speed in units/second
                    
#                     if ball_speed > 15:  # High speed threshold
#                         events.append(f"Fast ball movement detected (speed: {ball_speed:.1f})")
#                     elif ball_speed < 0.5:  # Very slow/stationary
#                         events.append("Ball nearly stationary")
            
#             # Player clustering analysis
#             player_positions = []
#             for player_id, player_data in tracks_data['players'][frame_num].items():
#                 if 'position_transformed' in player_data and player_data['position_transformed']:
#                     player_positions.append(player_data['position_transformed'])
            
#             if len(player_positions) > 6:
#                 # Detect player clusters
#                 from sklearn.cluster import DBSCAN
#                 clustering = DBSCAN(eps=5, min_samples=3).fit(player_positions)
#                 n_clusters = len(set(clustering.labels_)) - (1 if -1 in clustering.labels_ else 0)
                
#                 if n_clusters >= 3:
#                     events.append(f"Multiple player clusters detected ({n_clusters} groups)")
#                 elif n_clusters == 1:
#                     events.append("Players tightly clustered together")
                    
#         except Exception as e:
#             pass  # Silently handle micro-event detection errors
            
#         return events

#     def update_with_context(self, frame, tracks_data, frame_num, events_data=None):
#         """Enhanced context update with micro-event detection."""
#         if not any(self.models.values()):
#             return
            
#         self.frame_count += 1
        
#         # Extract enhanced game context
#         game_context = self._extract_enhanced_game_context(tracks_data, frame_num, events_data)
        
#         # Detect tactical situation
#         tactical_info = self._analyze_tactical_situation(tracks_data, frame_num)
#         game_context['tactical_analysis'] = tactical_info
        
#         # Detect micro-events
#         micro_events = self._detect_micro_events(tracks_data, frame_num)
#         game_context['micro_events'] = micro_events
        
#         # Update context history
#         self.match_context['recent_events'].append(game_context)
#         if len(self.match_context['recent_events']) > 15:
#             self.match_context['recent_events'].pop(0)
        
#         # Add frame to buffer
#         self.frame_buffer.append(frame)
        
#         # Generate commentary when buffer is full or at regular intervals
#         if (len(self.frame_buffer) == self.clip_length_frames or 
#             self.frame_count % 60 == 0):  # Every 60 frames as backup
            
#             print(f"Generating enhanced commentary... (Frame {frame_num})")
#             new_comment = self._generate_enhanced_commentary(game_context)
#             if new_comment:
#                 self.latest_commentary = new_comment
            
#             # Clear buffer periodically to prevent memory issues
#             if len(self.frame_buffer) >= self.clip_length_frames:
#                 self.frame_buffer.clear()

#     def _extract_enhanced_game_context(self, tracks_data, frame_num, events_data):
#         """Extract comprehensive game context."""
#         context = {
#             'frame_num': frame_num,
#             'timestamp': f"{int(frame_num / (self.fps * 60))}:{int((frame_num / self.fps) % 60):02d}",
#             'players_detected': len(tracks_data['players'][frame_num]),
#             'ball_detected': 1 in tracks_data['ball'][frame_num],
#             'possession': None,
#             'ball_speed': 0,
#             'recent_events': [],
#             'player_speeds': {},
#             'formation_info': {},
#             'pressure_areas': []
#         }
        
#         # Enhanced possession analysis
#         max_speed = 0
#         for player_id, player_info in tracks_data['players'][frame_num].items():
#             if player_info.get('has_ball', False):
#                 context['possession'] = {
#                     'player_id': player_id,
#                     'team': player_info.get('team', 'Unknown'),
#                     'position': player_info.get('position_transformed'),
#                     'jersey_number': player_info.get('jersey_number', f'P{player_id}')
#                 }
#                 break
            
#             # Track player speeds
#             speed = player_info.get('speed', 0)
#             if speed > max_speed:
#                 max_speed = speed
#             context['player_speeds'][player_id] = speed
        
#         context['max_player_speed'] = max_speed
        
#         # Ball position history for movement analysis
#         if 1 in tracks_data['ball'][frame_num]:
#             ball_pos = tracks_data['ball'][frame_num][1].get('position_transformed')
#             if ball_pos:
#                 self.match_context['ball_position_history'].append(ball_pos)
        
#         # Recent events from event detector
#         if events_data is not None and not events_data.empty:
#             recent_events = events_data[
#                 (events_data['minute'] * 60 + events_data['second']) >= (frame_num / self.fps - 15)
#             ].tail(5)
#             context['recent_events'] = recent_events.to_dict('records')
        
#         return context

#     def _create_enhanced_prompt(self, context, micro_events):
#         """Create comprehensive prompt for detailed analysis."""
#         possession_info = "Ball possession unclear"
#         if context.get('possession'):
#             poss = context['possession']
#             possession_info = f"Player {poss['jersey_number']} (Team {poss['team']}) has possession"
        
#         # Compile micro-events
#         micro_event_text = "; ".join(micro_events[:3]) if micro_events else "No micro-events detected"
        
#         # Speed information
#         speed_info = f"Max player speed: {context.get('max_player_speed', 0):.1f} km/h"
        
#         prompt = f"""You are an expert football analyst with deep tactical knowledge. Analyze this football match scene in detail.

# CURRENT SITUATION:
# - Match Time: {context['timestamp']}
# - {possession_info}
# - Players on field: {context['players_detected']}
# - {speed_info}
# - Micro-events: {micro_event_text}
# - Recent match events: {self._format_recent_events(context.get('recent_events', []))}

# ANALYSIS REQUIREMENTS:
# 1. Describe the immediate tactical situation
# 2. Identify any significant player movements, formations, or patterns
# 3. Note any pressing, attacking moves, defensive actions, or transitions
# 4. Mention speed of play, player positioning, and ball movement
# 5. Focus on tactical elements: space utilization, player roles, team shape
# 6. Include jersey numbers when mentioning specific players
# 7. Be concise but comprehensive (max 35 words)

# Provide your tactical analysis:"""
        
#         return prompt

#     def _generate_enhanced_commentary(self, game_context):
#         """Generate detailed commentary using multiple models."""
#         if not self.frame_buffer:
#             return self._generate_fallback_commentary(game_context)
            
#         try:
#             # Use the middle frame for analysis
#             middle_frame_idx = len(self.frame_buffer) // 2
#             frame = self.frame_buffer[middle_frame_idx]
            
#             # Convert to PIL Image
#             frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
#             pil_image = Image.fromarray(frame_rgb)
            
#             # Enhance image quality
#             enhancer = ImageEnhance.Contrast(pil_image)
#             pil_image = enhancer.enhance(1.2)
#             enhancer = ImageEnhance.Sharpness(pil_image)
#             pil_image = enhancer.enhance(1.1)
            
#             # Get micro-events
#             micro_events = game_context.get('micro_events', [])
            
#             # Generate commentary using available models
#             commentary_parts = []
            
#             # Try BLIP2 first for detailed analysis
#             if self.models.get('blip2', False):
#                 try:
#                     prompt = self._create_enhanced_prompt(game_context, micro_events)
                    
#                     inputs = self.blip2_processor(pil_image, prompt, return_tensors="pt")
#                     if torch.cuda.is_available():
#                         inputs = {k: v.cuda() for k, v in inputs.items()}
                    
#                     with torch.no_grad():
#                         outputs = self.blip2_model.generate(
#                             **inputs,
#                             max_new_tokens=50,
#                             do_sample=True,
#                             temperature=0.7,
#                             top_p=0.9,
#                             num_beams=3
#                         )
                    
#                     commentary = self.blip2_processor.decode(outputs[0], skip_special_tokens=True)
#                     # Clean up the output
#                     commentary = commentary.replace(prompt, "").strip()
#                     if commentary:
#                         commentary_parts.append(commentary)
                        
#                 except Exception as e:
#                     print(f"BLIP2 generation error: {e}")
            
#             # Fallback to BLIP
#             if not commentary_parts and self.models.get('blip', False):
#                 try:
#                     inputs = self.blip_processor(pil_image, return_tensors="pt")
#                     if torch.cuda.is_available():
#                         inputs = {k: v.cuda() for k, v in inputs.items()}
                    
#                     with torch.no_grad():
#                         outputs = self.blip_model.generate(
#                             **inputs,
#                             max_new_tokens=30,
#                             do_sample=True,
#                             temperature=0.8
#                         )
                    
#                     caption = self.blip_processor.decode(outputs[0], skip_special_tokens=True)
                    
#                     # Enhance caption with context
#                     if game_context.get('possession'):
#                         poss = game_context['possession']
#                         enhanced_caption = f"{caption}. {poss['jersey_number']} (Team {poss['team']}) has possession."
#                     else:
#                         enhanced_caption = caption
                        
#                     commentary_parts.append(enhanced_caption)
                    
#                 except Exception as e:
#                     print(f"BLIP generation error: {e}")
            
#             # Combine commentary parts
#             if commentary_parts:
#                 final_commentary = " ".join(commentary_parts)
#                 # Add micro-events if significant
#                 if len(micro_events) > 0:
#                     final_commentary += f" {micro_events[0]}"
                    
#                 return final_commentary[:150]  # Limit length
                
#         except Exception as e:
#             print(f"Enhanced commentary generation error: {e}")
            
#         return self._generate_fallback_commentary(game_context)

#     def _format_recent_events(self, events):
#         """Format recent events for prompt."""
#         if not events:
#             return "No recent significant events"
        
#         formatted = []
#         for event in events[-3:]:
#             if isinstance(event, dict):
#                 event_type = event.get('type_name', 'Action')
#                 team = event.get('team_name', 'Team')
#                 formatted.append(f"{event_type} by {team}")
        
#         return "; ".join(formatted) if formatted else "No recent significant events"

#     def _generate_fallback_commentary(self, context):
#         """Generate fallback commentary when models fail."""
#         if context.get('possession'):
#             poss = context['possession']
#             return f"Play continues with {poss['jersey_number']} (Team {poss['team']}) controlling the ball."
        
#         max_speed = context.get('max_player_speed', 0)
#         if max_speed > 20:
#             return f"High-intensity play with players reaching {max_speed:.1f} km/h."
#         elif context.get('players_detected', 0) > 15:
#             return "Dense midfield battle with multiple players involved."
#         else:
#             return "Match continues with tactical positioning and ball movement."

# class RealTimeTicker:
#     """Enhanced real-time ticker with more detailed events."""
#     def __init__(self, fps=24):
#         self.fps = fps
#         self.last_player_id = -1
#         self.last_team_id = -1
#         self.ticker_text = "⚽ Match begins!"
#         self.text_display_frames = 0
#         self.event_history = deque(maxlen=10)
        
#     def _get_ball_carrier(self, player_track):
#         for player_id, data in player_track.items():
#             if data.get('has_ball', False):
#                 jersey_num = data.get('jersey_number', f'P{player_id}')
#                 return player_id, data.get('team'), jersey_num
#         return -1, -1, None

#     def _detect_advanced_events(self, tracks, frame_num):
#         """Detect advanced events like sprints, clusters, etc."""
#         events = []
        
#         # Detect high-speed movements
#         high_speed_players = []
#         for player_id, player_data in tracks['players'][frame_num].items():
#             speed = player_data.get('speed', 0)
#             if speed > 25:  # High speed threshold
#                 jersey = player_data.get('jersey_number', f'P{player_id}')
#                 team = player_data.get('team', '?')
#                 high_speed_players.append(f"{jersey}(T{team})")
        
#         if high_speed_players:
#             events.append(f"🏃 Sprint: {', '.join(high_speed_players[:2])}")
        
#         return events

#     def update(self, tracks, frame_num):
#         """Enhanced ticker update with more event types."""
#         if self.text_display_frames > 0:
#             self.text_display_frames -= 1
#             return self.ticker_text
        
#         player_track = tracks['players'][frame_num]
#         current_player_id, current_team_id, current_jersey = self._get_ball_carrier(player_track)

#         # Pass detection
#         if (current_player_id != -1 and self.last_player_id != -1 and 
#             current_player_id != self.last_player_id and current_team_id == self.last_team_id):
            
#             last_jersey = None
#             if self.last_player_id in player_track:
#                 last_jersey = player_track[self.last_player_id].get('jersey_number', f'P{self.last_player_id}')
            
#             self.ticker_text = f"⚽ Pass: {last_jersey or self.last_player_id} → {current_jersey} (Team {current_team_id})"
#             self.text_display_frames = self.fps * 2
        
#         # Possession change
#         elif current_player_id != -1 and self.last_team_id != -1 and current_team_id != self.last_team_id:
#             self.ticker_text = f"🔄 Team {current_team_id} gains possession! ({current_jersey})"
#             self.text_display_frames = self.fps * 3
        
#         # Advanced events
#         else:
#             advanced_events = self._detect_advanced_events(tracks, frame_num)
#             if advanced_events:
#                 self.ticker_text = advanced_events[0]
#                 self.text_display_frames = self.fps * 2
#             elif current_player_id != -1:
#                 self.ticker_text = f"⚽ {current_jersey} (Team {current_team_id}) on the ball"
#             else:
#                 self.ticker_text = "🔍 Ball is loose"

#         # Update tracking
#         if current_player_id != -1:
#             self.last_player_id = current_player_id
#             self.last_team_id = current_team_id
#         else:
#             self.last_player_id = -1
        
#         return self.ticker_text


In [19]:
# class EnhancedTracker:
#     def __init__(self, model_name='yolov8x.pt'):
#         self.model = YOLO(model_name)
#         self.tracker = sv.ByteTrack()
#         self.jersey_recognizer = AdvancedJerseyNumberRecognizer()
#         self.confidence_threshold = 0.3  # Lowered for better detection
        
#         # Enhanced tracking parameters
#         self.tracker_params = {
#             'track_thresh': 0.25,
#             'track_buffer': 60,
#             'match_thresh': 0.8,
#             'frame_rate': 24
#         }

#     def get_object_tracks(self, frames, read_from_stub=False, stub_path=None):
#         """Enhanced object tracking with better detection."""
#         if read_from_stub and stub_path and os.path.exists(stub_path):
#             with open(stub_path, 'rb') as f:
#                 print("📁 Loading tracks from cache...")
#                 return pickle.load(f)

#         tracks = {"players": [], "referees": [], "ball": []}
        
#         print(f"🎯 Starting enhanced tracking on {len(frames)} frames...")
        
#         for frame_num, frame in enumerate(frames):
#             if frame_num % 50 == 0:
#                 print(f"📊 Processing frame {frame_num}/{len(frames)} "
#                       f"({frame_num/len(frames)*100:.1f}%)")
                
#             # Enhanced YOLO prediction
#             results = self.model.predict(
#                 frame, 
#                 conf=self.confidence_threshold,
#                 iou=0.7,
#                 classes=[0, 32],  # person and sports ball
#                 verbose=False
#             )[0]
            
#             detections = sv.Detections.from_ultralytics(results)
            
#             # Enhanced player detection and tracking
#             player_detections = detections[detections.class_id == 0]
            
#             # Filter out very small detections (likely false positives)
#             if len(player_detections) > 0:
#                 areas = []
#                 for i, bbox in enumerate(player_detections.xyxy):
#                     area = get_bbox_area(bbox)
#                     areas.append(area)
                
#                 # Keep only reasonably sized detections
#                 min_area = np.median(areas) * 0.3 if areas else 0
#                 valid_indices = [i for i, area in enumerate(areas) if area >= min_area]
                
#                 if valid_indices:
#                     player_detections = player_detections[valid_indices]
            
#             tracked_players = self.tracker.update_with_detections(player_detections)
            
#             tracks["players"].append({})
#             tracks["referees"].append({})
            
#             # Process tracked players with enhanced jersey recognition
#             for detection_data in tracked_players:
#                 bbox = detection_data[0]
#                 track_id = detection_data[4]
                
#                 # Extract player crop with padding
#                 x1, y1, x2, y2 = map(int, bbox)
                
#                 # Add padding for better OCR
#                 padding = 5
#                 x1 = max(0, x1 - padding)
#                 y1 = max(0, y1 - padding)
#                 x2 = min(frame.shape[1], x2 + padding)
#                 y2 = min(frame.shape[0], y2 + padding)
                
#                 player_crop = frame[y1:y2, x1:x2]
                
#                 # Enhanced jersey number recognition
#                 jersey_num = self.jersey_recognizer.recognize_jersey_number(
#                     player_crop, track_id, frame_num
#                 )
                
#                 tracks["players"][frame_num][track_id] = {
#                     "bbox": bbox.tolist(),
#                     "jersey_number": jersey_num,
#                     "detection_confidence": detection_data[1] if len(detection_data) > 1 else 0.0
#                 }

#             # Enhanced ball detection
#             ball_detections = detections[detections.class_id == 32]
#             tracks["ball"].append({})
            
#             if len(ball_detections) > 0:
#                 # If multiple ball detections, choose the most confident one
#                 best_idx = np.argmax(ball_detections.confidence)
#                 best_ball_bbox = ball_detections.xyxy[best_idx]
#                 best_ball_conf = ball_detections.confidence[best_idx]
                
#                 tracks["ball"][frame_num][1] = {
#                     "bbox": best_ball_bbox.tolist(),
#                     "confidence": float(best_ball_conf)
#                 }
        
#         # Print detection statistics
#         jersey_stats = self.jersey_recognizer.get_detection_stats()
#         print(f"🏆 Jersey Detection Stats: {jersey_stats['successful_detections']}/{jersey_stats['total_players']} "
#               f"({jersey_stats['detection_rate']:.1f}% success rate)")
        
#         if stub_path:
#             with open(stub_path, 'wb') as f:
#                 pickle.dump(tracks, f)
#             print(f"💾 Tracks saved to cache: {stub_path}")
            
#         return tracks

#     def add_position_to_tracks(self, tracks):
#         """Add position information to tracks."""
#         for type, obj_tracks in tracks.items():
#             for frame_num, track in enumerate(obj_tracks):
#                 for id, info in track.items():
#                     bbox = info['bbox']
#                     if type == 'ball':
#                         info['position'] = get_center_of_bbox(bbox)
#                     else:
#                         info['position'] = get_foot_position(bbox)
    
#     def interpolate_ball_positions(self, ball_positions):
#         """Enhanced ball position interpolation."""
#         ball_bboxes = []
#         confidences = []
        
#         for frame_data in ball_positions:
#             if 1 in frame_data:
#                 ball_bboxes.append(frame_data[1]['bbox'])
#                 confidences.append(frame_data[1].get('confidence', 1.0))
#             else:
#                 ball_bboxes.append([])
#                 confidences.append(0.0)
        
#         df = pd.DataFrame(ball_bboxes, columns=['x1', 'y1', 'x2', 'y2'])
        
#         # More sophisticated interpolation
#         df_interpolated = df.interpolate(method='cubic').bfill().ffill()
        
#         # Rebuild ball positions with interpolated data
#         result = []
#         for i, (_, row) in enumerate(df_interpolated.iterrows()):
#             if not row.isna().any():
#                 result.append({
#                     1: {
#                         "bbox": row.tolist(),
#                         "confidence": confidences[i],
#                         "interpolated": i >= len(ball_positions) or 1 not in ball_positions[i]
#                     }
#                 })
#             else:
#                 result.append({})
                
#         return result

#     def _draw_enhanced_player_ellipse(self, frame, bbox, color, track_id, jersey_num, has_ball=False):
#         """Enhanced player visualization with more information."""
#         y2 = int(bbox[3])
#         x_center, _ = get_center_of_bbox(bbox)
#         width = get_bbox_width(bbox)
        
#         # Draw ellipse
#         ellipse_color = (0, 255, 0) if has_ball else color
#         cv2.ellipse(
#             frame, 
#             center=(x_center, y2), 
#             axes=(int(width), int(0.35 * width)), 
#             angle=0.0, 
#             startAngle=-45, 
#             endAngle=235, 
#             color=ellipse_color, 
#             thickness=3, 
#             lineType=cv2.LINE_AA
#         )
        
#         # Enhanced label
#         if jersey_num and not jersey_num.startswith('P'):
#             label = f"#{jersey_num}"
#             label_color = (255, 255, 255)
#         else:
#             label = f"ID{track_id}"
#             label_color = (200, 200, 200)
        
#         # Add ball possession indicator
#         if has_ball:
#             label = f"⚽{label}"
        
#         (w, h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
#         rect_w, rect_h = w + 12, h + 12
#         x1_rect, y1_rect = x_center - rect_w//2, (y2 - rect_h//2) + 15
        
#         # Enhanced label background
#         cv2.rectangle(frame, (x1_rect, y1_rect), (x1_rect + rect_w, y1_rect + rect_h), 
#                      ellipse_color, cv2.FILLED)
#         cv2.rectangle(frame, (x1_rect, y1_rect), (x1_rect + rect_w, y1_rect + rect_h), 
#                      (0, 0, 0), 2)
        
#         cv2.putText(frame, label, (x1_rect + 6, y1_rect + h + 6), 
#                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, label_color, 2)
        
#         return frame

#     def _draw_enhanced_triangle(self, frame, bbox, color, label="BALL"):
#         """Enhanced ball/object visualization."""
#         y, x = int(bbox[1]), int(get_center_of_bbox(bbox)[0])
        
#         # Larger, more visible triangle
#         points = np.array([[x, y], [x - 15, y - 25], [x + 15, y - 25]])
        
#         # Draw filled triangle
#         cv2.drawContours(frame, [points], 0, color, cv2.FILLED)
#         cv2.drawContours(frame, [points], 0, (0, 0, 0), 2)
        
#         # Add label
#         cv2.putText(frame, label, (x - 20, y - 30), cv2.FONT_HERSHEY_SIMPLEX, 
#                    0.5, (255, 255, 255), 2)
        
#         return frame

#     def _draw_enhanced_team_ball_control(self, frame, frame_num, team_ball_control):
#         """Enhanced possession display with additional stats."""
#         overlay = frame.copy()
        
#         # Larger, more informative panel
#         panel_height = 100
#         cv2.rectangle(overlay, (10, 10), (400, panel_height), (0, 0, 0), -1)
#         cv2.addWeighted(overlay, 0.7, frame, 0.3, 0, frame)
        
#         # Calculate possession percentages
#         team_1_frames = np.sum(team_ball_control[:frame_num + 1] == 1)
#         team_2_frames = np.sum(team_ball_control[:frame_num + 1] == 2)
#         total = max(1, team_1_frames + team_2_frames)
        
#         p1 = (team_1_frames / total) * 100
#         p2 = (team_2_frames / total) * 100
        
#         # Enhanced display
#         cv2.putText(frame, "⚽ POSSESSION STATS", (20, 30), 
#                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
        
#         # Team 1 stats
#         cv2.putText(frame, f"Team 1: {p1:.1f}%", (20, 55), 
#                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (100, 255, 100), 2)
        
#         # Team 2 stats  
#         cv2.putText(frame, f"Team 2: {p2:.1f}%", (20, 80), 
#                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (100, 100, 255), 2)
        
#         # Add possession bar
#         bar_width = 300
#         bar_height = 8
#         bar_x, bar_y = 20, 90
        
#         # Background bar
#         cv2.rectangle(frame, (bar_x, bar_y), (bar_x + bar_width, bar_y + bar_height), 
#                      (50, 50, 50), -1)
        
#         # Team 1 portion
#         team1_width = int(bar_width * p1 / 100)
#         cv2.rectangle(frame, (bar_x, bar_y), (bar_x + team1_width, bar_y + bar_height), 
#                      (100, 255, 100), -1)
        
#         # Team 2 portion
#         team2_width = int(bar_width * p2 / 100)
#         cv2.rectangle(frame, (bar_x + team1_width, bar_y), 
#                      (bar_x + team1_width + team2_width, bar_y + bar_height), 
#                      (100, 100, 255), -1)
        
#         return frame

#     def _draw_enhanced_commentary_overlay(self, frame, text):
#         """Enhanced commentary display with better formatting."""
#         h, w, _ = frame.shape
#         font = cv2.FONT_HERSHEY_SIMPLEX
#         thickness = 2
        
#         # Calculate optimal font size
#         font_scale = 0.8
#         (text_w, text_h), _ = cv2.getTextSize(text, font, font_scale, thickness)
        
#         target_w = w * 0.95
#         while text_w > target_w and font_scale > 0.3:
#             font_scale -= 0.05
#             (text_w, text_h), _ = cv2.getTextSize(text, font, font_scale, thickness)
        
#         # Enhanced banner
#         banner_h = text_h + 30
#         overlay = frame.copy()
        
#         # Gradient background effect
#         for i in range(banner_h):
#             alpha = 0.8 * (1 - i / banner_h)
#             cv2.rectangle(overlay, (0, h - banner_h + i), (w, h - banner_h + i + 1), 
#                          (0, 0, 0), -1)
        
#         cv2.addWeighted(overlay, 0.6, frame, 0.4, 0, frame)
        
#         # Add border
#         cv2.rectangle(frame, (0, h - banner_h), (w, h), (100, 100, 100), 2)
        
#         # Center text
#         text_x = (w - text_w) // 2
#         text_y = h - 15
        
#         # Add text shadow
#         cv2.putText(frame, text, (text_x + 2, text_y + 2), font, font_scale, 
#                    (0, 0, 0), thickness + 1)
        
#         # Add main text
#         cv2.putText(frame, text, (text_x, text_y), font, font_scale, 
#                    (255, 255, 255), thickness)
        
#         return frame

# # Keep all other existing classes (EventDetector, TeamAssigner, etc.) as they were


In [20]:
# # Enhanced Event Detection
# class EnhancedEventDetector:
#     def __init__(self):
#         self.shot_speed_threshold = 20  # Enhanced threshold
#         self.pass_speed_threshold = 8
#         self.frame_rate = 24
#         self.min_pass_distance = 3  # Minimum distance for pass
#         self.max_possession_gap = 10  # Frames
        
#     def detect_events(self, tracks):
#         """Enhanced event detection with more event types."""
#         player_assigner = PlayerBallAssigner()
#         ball_possession_log = []
#         ball_speeds = []
        
#         # Track ball possession and speeds
#         for frame_num in range(len(tracks['players'])):
#             player_track = tracks['players'][frame_num]
#             ball_data = tracks['ball'][frame_num].get(1, {})
#             ball_bbox = ball_data.get('bbox')
            
#             assigned_player_id = player_assigner.assign_ball_to_player(player_track, ball_bbox) if ball_bbox else -1
#             ball_possession_log.append(assigned_player_id)
            
#             # Calculate ball speed
#             if frame_num > 0 and ball_data.get('position_transformed'):
#                 prev_ball = tracks['ball'][frame_num-1].get(1, {})
#                 if prev_ball.get('position_transformed'):
#                     speed = measure_distance(
#                         ball_data['position_transformed'],
#                         prev_ball['position_transformed']
#                     ) * self.frame_rate
#                     ball_speeds.append(speed)
#                 else:
#                     ball_speeds.append(0)
#             else:
#                 ball_speeds.append(0)

#         # Detect various events
#         events = []
#         last_player_with_ball = -1
#         pass_start_info = {}
        
#         for frame_num, current_player_id in enumerate(ball_possession_log):
#             ball_data = tracks['ball'][frame_num].get(1, {})
#             ball_pos = ball_data.get('position_transformed')
#             ball_speed = ball_speeds[frame_num] if frame_num < len(ball_speeds) else 0
            
#             if not ball_pos:
#                 continue
                
#             # Detect passes
#             is_valid_pass = (current_player_id != last_player_with_ball and 
#                            last_player_with_ball != -1 and 
#                            current_player_id != -1)
                           
#             if is_valid_pass and pass_start_info:
#                 start_frame = pass_start_info.get('frame', frame_num - 1)
#                 if start_frame in range(len(tracks['players'])):
#                     start_player_data = tracks['players'][start_frame].get(last_player_with_ball, {})
#                     end_player_data = tracks['players'][frame_num].get(current_player_id, {})
                    
#                     start_team = start_player_data.get('team')
#                     end_team = end_player_data.get('team')
                    
#                     # Same team pass
#                     if start_team == end_team and start_team is not None:
#                         pass_distance = measure_distance(pass_start_info['position'], ball_pos)
                        
#                         # Classify pass type
#                         pass_type = "Pass"
#                         if pass_distance > 20:
#                             pass_type = "Long Pass"
#                         elif ball_speed > 15:
#                             pass_type = "Fast Pass"
                        
#                         start_jersey = start_player_data.get('jersey_number', f'P{last_player_with_ball}')
#                         end_jersey = end_player_data.get('jersey_number', f'P{current_player_id}')
                        
#                         events.append({
#                             "type_name": pass_type,
#                             "player_name": start_jersey,
#                             "team_name": f"Team {start_team}",
#                             "x": pass_start_info['position'][0],
#                             "y": pass_start_info['position'][1],
#                             "end_x": ball_pos[0],
#                             "end_y": ball_pos[1],
#                             "end_player": end_jersey,
#                             "distance": pass_distance,
#                             "speed": ball_speed,
#                             "minute": int(frame_num / (self.frame_rate * 60)),
#                             "second": int((frame_num / self.frame_rate) % 60)
#                         })
            
#             # Detect shots (high-speed ball movement toward goal)
#             if ball_speed > self.shot_speed_threshold:
#                 # Estimate if ball is moving toward goal area
#                 if ball_pos[1] < 10 or ball_pos[1] > 42:  # Near goal areas
#                     player_data = tracks['players'][frame_num].get(current_player_id, {}) if current_player_id != -1 else {}
#                     player_jersey = player_data.get('jersey_number', f'P{current_player_id}' if current_player_id != -1 else 'Unknown')
#                     team = player_data.get('team', 'Unknown')
                    
#                     events.append({
#                         "type_name": "Shot",
#                         "player_name": player_jersey,
#                         "team_name": f"Team {team}",
#                         "x": ball_pos[0],
#                         "y": ball_pos[1],
#                         "speed": ball_speed,
#                         "minute": int(frame_num / (self.frame_rate * 60)),
#                         "second": int((frame_num / self.frame_rate) % 60)
#                     })
            
#             # Detect possession changes (interceptions)
#             if (current_player_id != -1 and last_player_with_ball != -1 and
#                 current_player_id != last_player_with_ball):
                
#                 curr_player = tracks['players'][frame_num].get(current_player_id, {})
#                 prev_player_frame = max(0, frame_num - 5)
#                 prev_player = tracks['players'][prev_player_frame].get(last_player_with_ball, {})
                
#                 curr_team = curr_player.get('team')
#                 prev_team = prev_player.get('team')
                
#                 if curr_team != prev_team and curr_team is not None and prev_team is not None:
#                     curr_jersey = curr_player.get('jersey_number', f'P{current_player_id}')
                    
#                     events.append({
#                         "type_name": "Interception",
#                         "player_name": curr_jersey,
#                         "team_name": f"Team {curr_team}",
#                         "x": ball_pos[0],
#                         "y": ball_pos[1],
#                         "minute": int(frame_num / (self.frame_rate * 60)),
#                         "second": int((frame_num / self.frame_rate) % 60)
#                     })
            
#             # Update tracking info
#             if current_player_id != -1:
#                 pass_start_info = {'frame': frame_num, 'position': ball_pos}
#                 last_player_with_ball = current_player_id
        
#         return pd.DataFrame(events)

# # Keep existing classes with same implementation
# class TeamAssigner:
#     def __init__(self):
#         self.team_colors, self.player_team_dict, self.kmeans = {}, {}, None
#     def get_player_color(self, frame, bbox):
#         image = frame[int(bbox[1]):int(bbox[3]), int(bbox[0]):int(bbox[2])]
#         if image.size == 0: return np.array([0,0,0])
#         top_half = image[0:int(image.shape[0] / 2), :]
#         if top_half.size == 0: return np.array([0,0,0])
#         kmeans = KMeans(n_clusters=2, init="k-means++", n_init=1, random_state=0).fit(top_half.reshape(-1, 3))
#         labels = kmeans.labels_.reshape(top_half.shape[0], top_half.shape[1])
#         corner_clusters = [labels[0, 0], labels[0, -1], labels[-1, 0], labels[-1, -1]]
#         non_player_cluster = max(set(corner_clusters), key=corner_clusters.count)
#         return kmeans.cluster_centers_[1 - non_player_cluster]
#     def assign_team_color(self, frame, player_detections):
#         if not player_detections: return
#         colors = [self.get_player_color(frame, det["bbox"]) for _, det in player_detections.items()]
#         self.kmeans = KMeans(n_clusters=2, init="k-means++", n_init=10, random_state=0).fit(colors)
#         self.team_colors[1], self.team_colors[2] = self.kmeans.cluster_centers_
#     def get_player_team(self, frame, bbox, player_id):
#         if player_id in self.player_team_dict: return self.player_team_dict[player_id]
#         if self.kmeans is None: return 0
#         color = self.get_player_color(frame, bbox)
#         team_id = self.kmeans.predict(color.reshape(1, -1))[0] + 1
#         self.player_team_dict[player_id] = team_id
#         return team_id

# class PlayerBallAssigner:
#     def __init__(self): 
#         self.max_dist = 70
#         self.history = deque(maxlen=10)  # Track assignment history
        
#     def assign_ball_to_player(self, players, ball_bbox):
#         if not ball_bbox: return -1
#         ball_pos = get_center_of_bbox(ball_bbox)
#         min_dist = float('inf')
#         assigned_player = -1
        
#         candidates = []
#         for id, player in players.items():
#             foot_pos = get_foot_position(player['bbox'])
#             dist = measure_distance(foot_pos, ball_pos)
#             if dist < self.max_dist:
#                 candidates.append((id, dist))
        
#         # Sort by distance and choose closest
#         if candidates:
#             candidates.sort(key=lambda x: x[1])
#             assigned_player = candidates[0][0]
            
#         self.history.append(assigned_player)
#         return assigned_player

# class CameraMovementEstimator:
#     def __init__(self, frame):
#         self.lk_params = dict(winSize=(15, 15), maxLevel=2, criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))
#         self.features = dict(maxCorners=100, qualityLevel=0.3, minDistance=7, blockSize=7)
#     def get_camera_movement(self, frames, read_from_stub=False, stub_path=None):
#         if read_from_stub and stub_path and os.path.exists(stub_path):
#             with open(stub_path, 'rb') as f: return pickle.load(f)
#         movements = [[0, 0]] * len(frames)
#         old_gray = cv2.cvtColor(frames[0], cv2.COLOR_BGR2GRAY)
#         old_features = cv2.goodFeaturesToTrack(old_gray, **self.features)
#         for i in range(1, len(frames)):
#             new_gray = cv2.cvtColor(frames[i], cv2.COLOR_BGR2GRAY)
#             new_features, status, _ = cv2.calcOpticalFlowPyrLK(old_gray, new_gray, old_features, None, **self.lk_params)
            
#             good_new = new_features[status==1]
#             good_old = old_features[status==1]

#             move_x, move_y = 0, 0
#             if len(good_new) > 0:
#                 move_x, move_y = np.mean(good_old - good_new, axis=0).ravel()

#             movements[i] = [move_x, move_y]
#             old_gray = new_gray.copy()
#             old_features = good_new.reshape(-1, 1, 2)
#         if stub_path:
#             with open(stub_path, 'wb') as f: pickle.dump(movements, f)
#         return movements
#     def add_adjust_positions_to_tracks(self, tracks, movements):
#         for type, obj_tracks in tracks.items():
#             for i, track in enumerate(obj_tracks):
#                 for id, info in track.items():
#                     info['position_adjusted'] = (info['position'][0] + movements[i][0], info['position'][1] + movements[i][1])

# class ViewTransformer:
#     def __init__(self):
#         court_w, court_l = 34, 52.5
#         self.pixel_verts = np.float32([[110, 1035], [265, 275], [910, 260], [1640, 915]])
#         self.target_verts = np.float32([[0, court_w], [0, 0], [court_l, 0], [court_l, court_w]])
#         self.transformer = cv2.getPerspectiveTransform(self.pixel_verts, self.target_verts)
#     def transform_point(self, point):
#         p = (int(point[0]), int(point[1]))
#         is_inside = cv2.pointPolygonTest(self.pixel_verts, p, False) >= 0
#         if not is_inside: return None
#         reshaped = np.array(point).reshape(-1, 1, 2).astype(np.float32)
#         transformed = cv2.perspectiveTransform(reshaped, self.transformer)
#         return transformed.reshape(-1, 2)
#     def add_transformed_position_to_tracks(self, tracks):
#         for type, obj_tracks in tracks.items():
#             for track in obj_tracks:
#                 for id, info in track.items():
#                     pos = info.get('position_adjusted', info.get('position'))
#                     if pos:
#                         transformed = self.transform_point(pos)
#                         info['position_transformed'] = transformed.squeeze().tolist() if transformed is not None else None

# class EnhancedSpeedAndDistanceEstimator:
#     def __init__(self):
#         self.frame_window = 5  # Smaller window for more responsive speed
#         self.frame_rate = 24
#         self.speed_history = {}  # Track speed history for smoothing
        
#     def add_speed_and_distance_to_tracks(self, tracks):
#         """Enhanced speed calculation with smoothing."""
#         total_dist = {}
        
#         for type, obj_tracks in tracks.items():
#             if type not in ["players", "referees"]: continue
            
#             for i in range(len(obj_tracks)):
#                 for id, info in obj_tracks[i].items():
#                     if i > 0:
#                         prev_info = tracks[type][i-1].get(id)
#                         if prev_info and info.get('position_transformed') and prev_info.get('position_transformed'):
#                             # Calculate distance and speed
#                             dist = measure_distance(info['position_transformed'], prev_info['position_transformed'])
#                             total_dist[id] = total_dist.get(id, 0) + dist
#                             instantaneous_speed = dist * self.frame_rate * 3.6  # km/h
                            
#                             # Smooth speed using history
#                             if id not in self.speed_history:
#                                 self.speed_history[id] = deque(maxlen=self.frame_window)
                            
#                             self.speed_history[id].append(instantaneous_speed)
#                             smoothed_speed = np.mean(list(self.speed_history[id]))
                            
#                             info['speed'] = smoothed_speed
#                             info['distance'] = total_dist[id]
#                             info['instantaneous_speed'] = instantaneous_speed
#                     else:
#                         info['speed'] = 0
#                         info['distance'] = 0
                        
#     def draw_speed_and_distance(self, frames, tracks):
#         """Enhanced speed visualization."""
#         output_frames = []
#         for i, frame in enumerate(frames):
#             for type, obj_tracks in tracks.items():
#                 if type not in ["players", "referees"]: continue
#                 for id, info in obj_tracks[i].items():
#                     if "speed" in info and info['speed'] > 5:  # Only show significant speeds
#                         x, y = get_foot_position(info['bbox'])
                        
#                         # Color code speed
#                         speed = info['speed']
#                         if speed > 25:
#                             color = (0, 0, 255)  # Red for high speed
#                         elif speed > 15:
#                             color = (0, 165, 255)  # Orange for medium speed
#                         else:
#                             color = (0, 255, 255)  # Yellow for low speed
                            
#                         # Enhanced speed display
#                         speed_text = f"{speed:.1f}"
#                         cv2.putText(frame, speed_text, (x - 15, y + 25), 
#                                   cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
#                         cv2.putText(frame, "km/h", (x - 10, y + 40), 
#                                   cv2.FONT_HERSHEY_SIMPLEX, 0.3, color, 1)
                                  
#             output_frames.append(frame)
#         return output_frames


In [22]:
# def enhanced_main():
#     # --- ENHANCED SETUP ---
#     INPUT_VIDEO_PATH = "/kaggle/input/football-video2/CityUtdR.mp4"
#     STUB_PATH = "/kaggle/working/enhanced_tracks_stub.pkl"
#     OUTPUT_VIDEO_PATH = "/kaggle/working/enhanced_football_analysis-blip.mp4"
    
#     print("🚀 Starting Enhanced Football Analysis System")
#     print("=" * 60)
    
#     # Load video
#     frames = read_video(INPUT_VIDEO_PATH)
#     if not frames:
#         print("❌ Video file not found or could not be read. Check the path.")
#         return None

#     cap = cv2.VideoCapture(INPUT_VIDEO_PATH)
#     fps = cap.get(cv2.CAP_PROP_FPS) or 24
#     cap.release()
#     print(f"📺 Video loaded: {len(frames)} frames at {fps} FPS")

#     # --- INITIALIZE ENHANCED MODULES ---
#     print("\n🔧 Initializing Enhanced Components...")
    
#     tracker = EnhancedTracker('yolov8x.pt')
#     commentary_engine = AdvancedCommentaryEngine(fps=fps)
#     camera_estimator = CameraMovementEstimator(frames[0])
#     view_transformer = ViewTransformer()
#     speed_estimator = EnhancedSpeedAndDistanceEstimator()
#     team_assigner = TeamAssigner()
#     player_assigner = PlayerBallAssigner()
#     ticker = RealTimeTicker(fps=fps)
#     event_detector = EnhancedEventDetector()

#     # --- STAGE 1: ENHANCED TRACKING ---
#     print("\n🎯 Stage 1: Enhanced Object Detection and Tracking...")
#     tracks = tracker.get_object_tracks(frames, read_from_stub=False, stub_path=STUB_PATH)
#     tracks["ball"] = tracker.interpolate_ball_positions(tracks["ball"])
#     tracker.add_position_to_tracks(tracks)
    
#     # --- STAGE 2: MOTION & PERSPECTIVE ANALYSIS ---
#     print("\n📐 Stage 2: Camera Motion and Perspective Transformation...")
#     camera_movement = camera_estimator.get_camera_movement(frames)
#     camera_estimator.add_adjust_positions_to_tracks(tracks, camera_movement)
#     view_transformer.add_transformed_position_to_tracks(tracks)
#     speed_estimator.add_speed_and_distance_to_tracks(tracks)
    
#     # --- STAGE 3: ENHANCED TEAM ASSIGNMENT ---
#     print("\n👥 Stage 3: Team Assignment and Player Identification...")
#     team_assigner.assign_team_color(frames[0], tracks['players'][0])
    
#     for frame_num, frame in enumerate(frames):
#         if frame_num % 100 == 0:
#             print(f"   Processing team assignment: {frame_num}/{len(frames)}")
            
#         player_track = tracks['players'][frame_num]
#         for player_id, track in player_track.items():
#             team = team_assigner.get_player_team(frame, track['bbox'], player_id)
#             tracks['players'][frame_num][player_id]['team'] = team
#             tracks['players'][frame_num][player_id]['team_color'] = team_assigner.team_colors.get(team, (0,0,255))
    
#     # --- STAGE 4: ENHANCED EVENT DETECTION ---
#     print("\n⚽ Stage 4: Advanced Event Detection...")
#     events_df = event_detector.detect_events(tracks)
#     print(f"   Detected {len(events_df)} events:")
#     if not events_df.empty:
#         event_summary = events_df['type_name'].value_counts()
#         for event_type, count in event_summary.items():
#             print(f"     - {event_type}: {count}")
    
#     # --- STAGE 5: ENHANCED COMMENTARY GENERATION ---
#     print("\n🎙️ Stage 5: Advanced Ball Possession & AI Commentary...")
#     team_ball_control = []
#     ticker_history = []
#     advanced_commentary_history = []
    
#     commentary_interval = max(1, len(frames) // 20)  # Generate commentary 20 times
    
#     for frame_num, frame in enumerate(frames):
#         if frame_num % 200 == 0:
#             print(f"   Commentary progress: {frame_num}/{len(frames)} frames")
        
#         player_track = tracks['players'][frame_num]
#         ball_bbox = tracks['ball'][frame_num].get(1, {}).get('bbox')
        
#         # Reset ball possession flags
#         for player_id in tracks['players'][frame_num]:
#             tracks['players'][frame_num][player_id]['has_ball'] = False
        
#         # Assign ball possession
#         assigned_player = player_assigner.assign_ball_to_player(player_track, ball_bbox)
#         if assigned_player != -1:
#             tracks['players'][frame_num][assigned_player]['has_ball'] = True
#             team_ball_control.append(tracks['players'][frame_num][assigned_player]['team'])
#         else:
#             team_ball_control.append(team_ball_control[-1] if team_ball_control else 0)
        
#         # Update ticker and commentary
#         ticker_history.append(ticker.update(tracks, frame_num))
#         commentary_engine.update_with_context(frame, tracks, frame_num, events_df)
#         advanced_commentary_history.append(commentary_engine.latest_commentary)

#     team_ball_control = np.array(team_ball_control)

#     # --- STAGE 6: ENHANCED VISUALIZATION ---
#     print("\n🎨 Stage 6: Enhanced Video Generation...")
#     display_commentary = ticker_history.copy()
    
#     # Apply advanced commentary at strategic points
#     last_advanced_comment = advanced_commentary_history[0]
#     for i, comment in enumerate(advanced_commentary_history):
#         if comment != last_advanced_comment and len(comment) > 50:  # Substantial commentary
#             # Apply commentary to a range of frames
#             start_frame = max(0, i - commentary_engine.clip_length_frames)
#             end_frame = min(len(display_commentary), i + commentary_engine.clip_length_frames//2)
            
#             for j in range(start_frame, end_frame):
#                 if j < len(display_commentary):
#                     display_commentary[j] = comment
#             last_advanced_comment = comment

#     # Generate enhanced output frames
#     output_frames = []
#     for frame_num, frame in enumerate(frames):
#         if frame_num % 100 == 0:
#             print(f"   Rendering: {frame_num}/{len(frames)} frames")
            
#         frame_copy = frame.copy()
#         current_commentary = display_commentary[frame_num] if frame_num < len(display_commentary) else ""
        
#         player_dict = tracks["players"][frame_num]
#         ball_dict = tracks.get("ball", [])[frame_num]
        
#         # Draw enhanced players
#         for track_id, player in player_dict.items():
#             color = player.get("team_color", (0, 0, 255))
#             has_ball = player.get('has_ball', False)
#             jersey_num = player.get("jersey_number", f"ID{track_id}")
            
#             frame_copy = tracker._draw_enhanced_player_ellipse(
#                 frame_copy, player["bbox"], color, track_id, jersey_num, has_ball
#             )
        
#         # Draw enhanced ball
#         if 1 in ball_dict:
#             frame_copy = tracker._draw_enhanced_triangle(
#                 frame_copy, ball_dict[1]["bbox"], (0, 255, 0), "⚽"
#             )
        
#         # Draw enhanced UI elements
#         frame_copy = tracker._draw_enhanced_team_ball_control(
#             frame_copy, frame_num, team_ball_control
#         )
#         frame_copy = tracker._draw_enhanced_commentary_overlay(
#             frame_copy, current_commentary
#         )
        
#         output_frames.append(frame_copy)
    
#     # Apply enhanced speed visualization
#     output_frames = speed_estimator.draw_speed_and_distance(output_frames, tracks)
    
#     # Save enhanced video
#     save_video(output_frames, OUTPUT_VIDEO_PATH, fps)

#     # --- ENHANCED FINAL STATISTICS ---
#     print("\n" + "🏆" + "="*58 + "🏆")
#     print("            ENHANCED MATCH ANALYSIS COMPLETE")
#     print("🏆" + "="*58 + "🏆")
    
#     # Jersey detection stats
#     jersey_stats = tracker.jersey_recognizer.get_detection_stats()
#     print(f"👕 Jersey Recognition: {jersey_stats['successful_detections']}/{jersey_stats['total_players']} players ({jersey_stats['detection_rate']:.1f}%)")
    
#     # Event detection stats
#     if not events_df.empty:
#         print(f"⚽ Events Detected: {len(events_df)} total")
#         for event_type, count in events_df['type_name'].value_counts().items():
#             print(f"   - {event_type}: {count}")
    
#     # Possession stats
#     total_frames = len(team_ball_control)
#     team1_possession = np.sum(team_ball_control == 1) / total_frames * 100
#     team2_possession = np.sum(team_ball_control == 2) / total_frames * 100
#     print(f"📊 Final Possession: Team 1: {team1_possession:.1f}%, Team 2: {team2_possession:.1f}%")
    
#     # Commentary stats
#     unique_comments = len(set(advanced_commentary_history))
#     print(f"🎙️ Commentary Generated: {unique_comments} unique insights")
    
#     print(f"✅ Enhanced video saved: {OUTPUT_VIDEO_PATH}")
#     print(f"📁 File size: {os.path.getsize(OUTPUT_VIDEO_PATH) / (1024*1024):.1f} MB")
    
#     return {
#         'tracks': tracks,
#         'events': events_df,
#         'team_ball_control': team_ball_control,
#         'jersey_stats': jersey_stats,
#         'output_path': OUTPUT_VIDEO_PATH
#     }

# # Run the enhanced system
# if __name__ == "__main__":
#     results = enhanced_main()


🚀 Starting Enhanced Football Analysis System
✅ Loaded 268/284 frames from video
📺 Video loaded: 268 frames at 25.0 FPS

🔧 Initializing Enhanced Components...




⚠️ PaddleOCR initialization failed, using EasyOCR only
✅ Advanced Jersey OCR module initialized.
🎙️ Initializing Advanced Vision Commentary Engine...
Loading BLIP2 for detailed scene analysis...


Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

✅ BLIP2 model loaded successfully
Loading BLIP for quick analysis...
✅ BLIP model loaded successfully

🎯 Stage 1: Enhanced Object Detection and Tracking...
🎯 Starting enhanced tracking on 268 frames...
📊 Processing frame 0/268 (0.0%)
📊 Processing frame 50/268 (18.7%)
📊 Processing frame 100/268 (37.3%)
📊 Processing frame 150/268 (56.0%)
📊 Processing frame 200/268 (74.6%)
📊 Processing frame 250/268 (93.3%)
🏆 Jersey Detection Stats: 3/3 (100.0% success rate)
💾 Tracks saved to cache: /kaggle/working/enhanced_tracks_stub.pkl

📐 Stage 2: Camera Motion and Perspective Transformation...

👥 Stage 3: Team Assignment and Player Identification...
   Processing team assignment: 0/268
   Processing team assignment: 100/268
   Processing team assignment: 200/268

⚽ Stage 4: Advanced Event Detection...
   Detected 54 events:
     - Shot: 41
     - Interception: 7
     - Fast Pass: 5
     - Pass: 1

🎙️ Stage 5: Advanced Ball Possession & AI Commentary...
   Commentary progress: 0/268 frames




Generating enhanced commentary... (Frame 59)




Generating enhanced commentary... (Frame 74)




Generating enhanced commentary... (Frame 119)




Generating enhanced commentary... (Frame 149)




Generating enhanced commentary... (Frame 179)




   Commentary progress: 200/268 frames
Generating enhanced commentary... (Frame 224)




Generating enhanced commentary... (Frame 239)

🎨 Stage 6: Enhanced Video Generation...
   Rendering: 0/268 frames
   Rendering: 100/268 frames
   Rendering: 200/268 frames
✅ Video saved to: /kaggle/working/enhanced_football_analysis.mp4

            ENHANCED MATCH ANALYSIS COMPLETE
👕 Jersey Recognition: 3/3 players (100.0%)
⚽ Events Detected: 54 total
   - Shot: 41
   - Interception: 7
   - Fast Pass: 5
   - Pass: 1
📊 Final Possession: Team 1: 34.7%, Team 2: 65.3%
🎙️ Commentary Generated: 8 unique insights
✅ Enhanced video saved: /kaggle/working/enhanced_football_analysis.mp4


OpenCV: FFMPEG: tag 0x34363248/'H264' is not supported with codec id 27 and format 'mp4 / MP4 (MPEG-4 Part 14)'
OpenCV: FFMPEG: fallback to use tag 0x31637661/'avc1'
[ERROR:0@3345.661] global cap_ffmpeg_impl.hpp:3203 open Could not find encoder for codec_id=27, error: Encoder not found
[ERROR:0@3345.661] global cap_ffmpeg_impl.hpp:3281 open VIDEOIO/FFMPEG: Failed to initialize VideoWriter


FileNotFoundError: [Errno 2] No such file or directory: '/kaggle/working/enhanced_football_analysis.mp4'

# CogVLM2-Llama3-Caption

In [1]:
!pip install ultralytics supervision numpy opencv-python scikit-learn pandas
!pip install --upgrade ultralytics torch torchvision
!pip install mplsoccer transformers accelerate


Collecting ultralytics
  Downloading ultralytics-8.3.186-py3-none-any.whl.metadata (37 kB)
Collecting supervision
  Downloading supervision-0.26.1-py3-none-any.whl.metadata (13 kB)
Collecting ultralytics-thop>=2.0.0 (from ultralytics)
  Downloading ultralytics_thop-2.0.16-py3-none-any.whl.metadata (14 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)

In [2]:
!pip install --upgrade torch torchvision transformers accelerate


Collecting transformers
  Downloading transformers-4.55.4-py3-none-any.whl.metadata (41 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m42.0/42.0 kB[0m [31m1.7 MB/s[0m eta [36m0:00:00[0m
Collecting accelerate
  Downloading accelerate-1.10.1-py3-none-any.whl.metadata (19 kB)
Collecting huggingface-hub<1.0,>=0.34.0 (from transformers)
  Downloading huggingface_hub-0.34.4-py3-none-any.whl.metadata (14 kB)
Downloading transformers-4.55.4-py3-none-any.whl (11.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m11.3/11.3 MB[0m [31m81.8 MB/s[0m eta [36m0:00:00[0m:00:01[0m0:01[0m
[?25hDownloading accelerate-1.10.1-py3-none-any.whl (374 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m374.9/374.9 kB[0m [31m26.7 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading huggingface_hub-0.34.4-py3-none-any.whl (561 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m561.5/561.5 kB[0m [31m35.1 MB/s[0m eta [36m0:00:00[0m
[

In [3]:
import sys
import os
import cv2
import pickle
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from collections import deque
import time

# Machine Learning & Computer Vision Libraries
from ultralytics import YOLO
import supervision as sv
from sklearn.cluster import KMeans
import easyocr

# CogVLM2 for AI Commentary (replacing Gemini)
from transformers import AutoModelForCausalLM, AutoTokenizer, AutoProcessor
import torch
from PIL import Image

# Plotting for Heatmaps
from mplsoccer import Pitch

# --- Video Utilities ---
def read_video(video_path):
    """Reads a video file and returns a list of its frames."""
    cap = cv2.VideoCapture(video_path)
    frames = []
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frames.append(frame)
    cap.release()
    return frames

def save_video(output_video_frames, output_video_path):
    """Saves a list of frames as a video file."""
    if not output_video_frames:
        print("No frames to save.")
        return
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_video_path, fourcc, 24, (output_video_frames[0].shape[1], output_video_frames[0].shape[0]))
    for frame in output_video_frames:
        out.write(frame)
    out.release()

# --- BBox Utilities ---
def get_center_of_bbox(bbox):
    x1, y1, x2, y2 = bbox
    return int((x1 + x2) / 2), int((y1 + y2) / 2)

def get_bbox_width(bbox):
    return int(bbox[2] - bbox[0])

def get_foot_position(bbox):
    x1, y1, x2, y2 = bbox
    return int((x1 + x2) / 2), int(y2)
 
def measure_distance(p1, p2):
    return ((p1[0] - p2[0])**2 + (p1[1] - p2[1])**2)**0.5

def measure_xy_distance(p1, p2):
    return p1[0] - p2[0], p1[1] - p2[1]


Creating new Ultralytics Settings v0.0.6 file ✅ 
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.


E0000 00:00:1756202747.553406      36 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1756202747.614416      36 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


In [9]:
!pip install pytorchvideo

Collecting pytorchvideo
  Downloading pytorchvideo-0.1.5.tar.gz (132 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m132.7/132.7 kB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting fvcore (from pytorchvideo)
  Downloading fvcore-0.1.5.post20221221.tar.gz (50 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m50.2/50.2 kB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting av (from pytorchvideo)
  Downloading av-15.0.0-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (4.6 kB)
Collecting parameterized (from pytorchvideo)
  Downloading parameterized-0.9.0-py2.py3-none-any.whl.metadata (18 kB)
Collecting iopath (from pytorchvideo)
  Downloading iopath-0.1.10.tar.gz (42 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m42.2/42.2 kB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... 

In [11]:
import math
from collections import deque
from typing import List, Dict, Any, Optional

import torch
import cv2
from PIL import Image
from transformers import AutoTokenizer, AutoModelForCausalLM

class ImprovedCommentaryEngine:
    def __init__(self, clip_duration_seconds=5, fps=24, keyframes=3, max_words=25):
        self.clip_length_frames = int(clip_duration_seconds * fps)
        self.frame_buffer = deque(maxlen=self.clip_length_frames)
        self.latest_commentary = "Match analysis is starting..."
        self.fps = fps
        self.keyframes = max(1, min(keyframes, 5))  # keep small for latency
        self.max_words = max(6, max_words)

        self.match_context = {
            'possession_changes': [], 'recent_events': [],
            'ball_position_history': [], 'player_movements': []
        }

        print("🎙️ Initializing CogVLM2 Commentary Engine...")
        self.model = None
        self.tokenizer = None
        try:
            self.model_path = "THUDM/cogvlm2-llama3-caption"
            self.device = "cuda" if torch.cuda.is_available() else "cpu"

            self.tokenizer = AutoTokenizer.from_pretrained(
                self.model_path,
                trust_remote_code=True
            )
            self.model = AutoModelForCausalLM.from_pretrained(
                self.model_path,
                torch_dtype=torch.float16 if self.device == "cuda" else torch.float32,
                trust_remote_code=True,
                device_map="auto" if self.device == "cuda" else None
            )
            if self.device == "cpu":
                self.model.to(self.device)

            self._compiled = False
            print(f"✅ CogVLM2-Llama3-Caption model loaded on {self.device}.")
        except Exception as e:
            print(f"⚠️ Could not initialize CogVLM2 model: {e}")

    # ---------- public API ----------
    def update_with_context(self, frame, tracks_data, frame_num, events_data=None):
        if self.model is None:
            return  # graceful: use previous commentary

        game_context = self._extract_game_context(tracks_data, frame_num, events_data)
        self.match_context['recent_events'].append(game_context)
        if len(self.match_context['recent_events']) > 10:
            self.match_context['recent_events'].pop(0)

        self.frame_buffer.append(frame)

        if len(self.frame_buffer) == self.clip_length_frames:
            print("Generating tactical summary...")
            try:
                new_comment = self._generate_contextual_commentary(game_context)
            except Exception as e:
                print(f"Commentary generation error: {e}")
                new_comment = self._generate_fallback_commentary(game_context)

            if new_comment:
                self.latest_commentary = new_comment
            self.frame_buffer.clear()

    # ---------- context extraction ----------
    def _extract_game_context(self, tracks_data, frame_num, events_data):
        players_track = tracks_data.get('players', [])
        ball_track = tracks_data.get('ball', [])

        # guard against missing indices
        players_at_f = players_track[frame_num] if frame_num < len(players_track) else {}
        ball_at_f = ball_track[frame_num] if frame_num < len(ball_track) else {}

        # infer match clock
        total_sec = frame_num / self.fps
        minutes = int(total_sec // 60)
        seconds = int(total_sec % 60)

        # possession
        possession = None
        if isinstance(players_at_f, dict):
            for pid, pinfo in players_at_f.items():
                if pinfo.get('has_ball', False):
                    possession = f"Player {pid} (Team {pinfo.get('team', 'Unknown')})"
                    break

        # ball presence robust check
        ball_detected = False
        if isinstance(ball_at_f, dict):
            # consider presence if key 'visible' or track dict non-empty
            ball_detected = bool(ball_at_f) or bool(ball_at_f.get('visible', False))
        elif isinstance(ball_at_f, (list, tuple, set)):
            ball_detected = len(ball_at_f) > 0 or (1 in ball_at_f)

        # recent structured events in last 12s
        recent_records = []
        if events_data is not None and hasattr(events_data, "empty") and not events_data.empty:
            lower_bound = max(0, total_sec - 12.0)
            ev_secs = events_data['minute'] * 60 + events_data['second']
            recent = events_data[ev_secs >= lower_bound].tail(4)
            recent_records = recent.to_dict('records')

        return {
            'frame_num': frame_num,
            'timestamp': f"{minutes}:{seconds:02d}",
            'players_detected': len(players_at_f) if isinstance(players_at_f, dict) else 0,
            'ball_detected': ball_detected,
            'possession': possession,
            'ball_speed': ball_at_f.get('speed', 0) if isinstance(ball_at_f, dict) else 0,
            'recent_events': recent_records
        }

    # ---------- VLM prompting & decoding ----------
    def _generate_contextual_commentary(self, game_context):
        if self.model is None or self.tokenizer is None or len(self.frame_buffer) == 0:
            return self._generate_fallback_commentary(game_context)

        # choose K keyframes from the buffer
        images = self._sample_keyframes(self.frame_buffer, self.keyframes)
        pil_images = [self._to_pil(img) for img in images]

        prompt = self._create_detailed_prompt(game_context)

        # one-time torch.compile for speed (if PyTorch>=2 and cuda)
        if self.device == "cuda" and not self._compiled:
            try:
                self.model = torch.compile(self.model)  # no-op on older versions
                self._compiled = True
            except Exception:
                pass

        with torch.inference_mode():
            if self.device == "cuda":
                autocast_dtype = torch.float16
            else:
                autocast_dtype = torch.bfloat16 if torch.cuda.is_available() else torch.float32

            # Some CogVLM2 builds expect build_conversation_input_ids() with multiple images
            convo = self.model.build_conversation_input_ids(
                self.tokenizer,
                query=prompt,
                images=pil_images,  # <— multiple frames
                template_version='chat'
            )

            inputs = {
                'input_ids': convo['input_ids'].unsqueeze(0).to(self.device),
                'token_type_ids': convo['token_type_ids'].unsqueeze(0).to(self.device),
                'attention_mask': convo['attention_mask'].unsqueeze(0).to(self.device),
                'images': [[x.to(self.device).to(self.model.dtype) for x in convo['images']]]
            }

            with torch.autocast(device_type=self.device if self.device != "cpu" else "cpu",
                                dtype=autocast_dtype, enabled=(self.device!="cpu")):
                outputs = self.model.generate(
                    **inputs,
                    max_new_tokens=64,
                    do_sample=False,          # deterministic, clearer
                    temperature=0.0,
                    top_p=1.0,
                    repetition_penalty=1.1,
                    pad_token_id=self.tokenizer.eos_token_id,
                    eos_token_id=self.tokenizer.eos_token_id
                )

        raw = self.tokenizer.decode(outputs[0][inputs['input_ids'].shape[1]:],
                                    skip_special_tokens=True)
        return self._postprocess_caption(raw)

    def _create_detailed_prompt(self, context):
        # compact events
        events_str = self._format_recent_events(context.get('recent_events', []))

        # tightly structured, single sentence, no hype words
        return (
            "You are a professional football (soccer) tactical analyst.\n"
            "TASK: Describe the most significant on-ball action shown across these frames.\n"
            "CONSTRAINTS:\n"
            f"- Time: {context['timestamp']}\n"
            f"- Possession: {context.get('possession', 'Unclear')}\n"
            f"- Recent: {events_str}\n"
            "- Style: factual, objective, 1 sentence, ≤25 words, no exclamations.\n"
            "FORMAT: <subject> <action> <outcome/intent>. Examples:\n"
            "- The red winger receives a diagonal pass, drives inside past one defender, and squares toward the penalty spot.\n"
            "- The blue fullback overlaps and delivers a low cross that is intercepted near the near post.\n"
            "Now write your single-sentence summary:"
        )

    def _format_recent_events(self, events):
        if not events:
            return "None"
        formatted = []
        for e in events[-3:]:
            if not isinstance(e, dict):
                continue
            et = e.get('type_name', 'Event')
            tm = e.get('team_name', 'Team')
            formatted.append(f"{et} – {tm}")
        return "; ".join(formatted) if formatted else "None"

    # ---------- helpers ----------
    def _sample_keyframes(self, buffer: deque, k: int) -> List[Any]:
        n = len(buffer)
        if k >= n:
            return list(buffer)
        # evenly spaced indices
        st


In [12]:
class JerseyNumberRecognizer:
    def __init__(self):
        self.reader = easyocr.Reader(['en'], gpu=True)
        self.jersey_cache = {}
        print("✅ Jersey OCR module initialized.")

    def recognize_jersey_number(self, player_crop, tracker_id):
        if tracker_id in self.jersey_cache: return self.jersey_cache[tracker_id]
        if player_crop.size == 0: return None
        
        crop_gray = cv2.cvtColor(player_crop, cv2.COLOR_BGR2GRAY)
        results = self.reader.readtext(crop_gray, allowlist='0123456789', detail=1)

        best_result = None
        for (bbox, text, prob) in results:
            if prob > 0.6 and text.isdigit() and len(text) <= 2:
                if best_result is None or prob > best_result[2]:
                    best_result = (bbox, text, prob)
        
        if best_result:
            self.jersey_cache[tracker_id] = best_result[1]
            return best_result[1]
        
        return None

class Tracker:
    def __init__(self, model_name='yolov8x.pt'):
        self.model = YOLO(model_name)
        self.tracker = sv.ByteTrack()
        self.jersey_recognizer = JerseyNumberRecognizer()

    def get_object_tracks(self, frames, read_from_stub=False, stub_path=None):
        if read_from_stub and stub_path and os.path.exists(stub_path):
            with open(stub_path, 'rb') as f: return pickle.load(f)

        tracks = {"players": [], "referees": [], "ball": []}
        
        for frame_num, frame in enumerate(frames):
            if frame_num % 20 == 0: print(f"Processing frame {frame_num}/{len(frames)}")
            results = self.model.predict(frame, conf=0.1)[0]
            detections = sv.Detections.from_ultralytics(results)
            
            # Filter for players (class_id for 'person' is typically 0)
            player_detections = detections[detections.class_id == 0]
            tracked_players = self.tracker.update_with_detections(player_detections)
            
            tracks["players"].append({})
            tracks["referees"].append({})
            
            for detection_data in tracked_players:
                bbox = detection_data[0]
                track_id = detection_data[4]
                
                player_crop = frame[int(bbox[1]):int(bbox[3]), int(bbox[0]):int(bbox[2])]
                jersey_num = self.jersey_recognizer.recognize_jersey_number(player_crop, track_id)
                tracks["players"][frame_num][track_id] = {"bbox": bbox.tolist(), "jersey_number": jersey_num}

            # Filter for ball (class_id for 'sports ball' is typically 32)
            ball_detections = detections[detections.class_id == 32]
            tracks["ball"].append({})
            if len(ball_detections) > 0:
                tracks["ball"][frame_num][1] = {"bbox": ball_detections.xyxy[0].tolist()}
        
        if stub_path:
            with open(stub_path, 'wb') as f: pickle.dump(tracks, f)
        return tracks

    def add_position_to_tracks(self, tracks):
        for type, obj_tracks in tracks.items():
            for frame_num, track in enumerate(obj_tracks):
                for id, info in track.items():
                    bbox = info['bbox']
                    info['position'] = get_foot_position(bbox) if type != 'ball' else get_center_of_bbox(bbox)
    
    def interpolate_ball_positions(self, ball_positions):
        ball_bboxes = [x.get(1, {}).get('bbox', []) for x in ball_positions]
        df = pd.DataFrame(ball_bboxes, columns=['x1', 'y1', 'x2', 'y2']).interpolate().bfill()
        return [{1: {"bbox": x}} for x in df.to_numpy().tolist()]

    def _draw_player_ellipse(self, frame, bbox, color, track_id, jersey_num):
        y2 = int(bbox[3])
        x_center, _ = get_center_of_bbox(bbox)
        width = get_bbox_width(bbox)
        cv2.ellipse(frame, center=(x_center, y2), axes=(int(width), int(0.35 * width)), angle=0.0, startAngle=-45, endAngle=235, color=color, thickness=2, lineType=cv2.LINE_4)
        
        label = f"#{jersey_num}" if jersey_num else str(track_id)
        (w, h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
        rect_w, rect_h = w + 10, h + 10
        x1_rect, y1_rect = x_center - rect_w//2, (y2 - rect_h//2) + 15
        
        cv2.rectangle(frame, (x1_rect, y1_rect), (x1_rect + rect_w, y1_rect + rect_h), color, cv2.FILLED)
        cv2.putText(frame, label, (x1_rect + 5, y1_rect + h + 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 0), 2)
        return frame

    def _draw_triangle(self, frame, bbox, color):
        y, x = int(bbox[1]), int(get_center_of_bbox(bbox)[0])
        points = np.array([[x, y], [x - 10, y - 20], [x + 10, y - 20]])
        cv2.drawContours(frame, [points], 0, color, cv2.FILLED)
        cv2.drawContours(frame, [points], 0, (0, 0, 0), 2)
        return frame

    def _draw_team_ball_control(self, frame, frame_num, team_ball_control):
        overlay = frame.copy()
        cv2.rectangle(overlay, (10, 10), (350, 70), (255, 255, 255), -1)
        cv2.addWeighted(overlay, 0.5, frame, 0.5, 0, frame)
        
        team_1_frames = np.sum(team_ball_control[:frame_num + 1] == 1)
        team_2_frames = np.sum(team_ball_control[:frame_num + 1] == 2)
        total = max(1, team_1_frames + team_2_frames)
        p1 = (team_1_frames / total) * 100
        p2 = (team_2_frames / total) * 100
        
        cv2.putText(frame, f"Team 1 Possession: {p1:.1f}%", (20, 35), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,0,0), 2)
        cv2.putText(frame, f"Team 2 Possession: {p2:.1f}%", (20, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,0,0), 2)
        return frame

    def _draw_commentary_overlay(self, frame, text):
        h, w, _ = frame.shape
        font = cv2.FONT_HERSHEY_SIMPLEX
        thickness = 2
        
        font_scale = 1.0
        (text_w, text_h), _ = cv2.getTextSize(text, font, font_scale, thickness)
        
        target_w = w * 0.9
        if text_w > target_w:
            font_scale = target_w / text_w
        
        (text_w, text_h), _ = cv2.getTextSize(text, font, font_scale, thickness)

        banner_h = text_h + 20
        overlay = frame.copy()
        cv2.rectangle(overlay, (0, h - banner_h), (w, h), (0, 0, 0), -1)
        cv2.addWeighted(overlay, 0.6, frame, 0.4, 0, frame)
        
        text_x = (w - text_w) // 2
        text_y = h - 10
        cv2.putText(frame, text, (text_x, text_y), font, font_scale, (255, 255, 255), thickness)
        
        return frame

class EventDetector:
    def __init__(self):
        self.shot_speed_threshold_mps = 15
        self.frame_rate = 24

    def detect_events(self, tracks):
        player_assigner = PlayerBallAssigner()
        ball_possession_log = []
        for frame_num in range(len(tracks['players'])):
            player_track = tracks['players'][frame_num]
            ball_bbox = tracks['ball'][frame_num].get(1, {}).get('bbox')
            assigned_player_id = player_assigner.assign_ball_to_player(player_track, ball_bbox) if ball_bbox else -1
            ball_possession_log.append(assigned_player_id)

        events = []
        last_player_with_ball, pass_start_info = -1, {}
        for frame_num, current_player_id in enumerate(ball_possession_log):
            ball_pos_transformed = tracks['ball'][frame_num].get(1, {}).get('position_transformed')
            if not ball_pos_transformed: continue

            is_valid_pass = (current_player_id != last_player_with_ball and last_player_with_ball != -1 and current_player_id != -1)
            if is_valid_pass:
                start_player_team = tracks['players'][pass_start_info['frame']][last_player_with_ball].get('team')
                end_player_team = tracks['players'][frame_num].get(current_player_id, {}).get('team')
                if start_player_team == end_player_team and start_player_team is not None:
                    events.append({
                        "type_name": "Pass", "player_name": f"Player_{last_player_with_ball}",
                        "team_name": f"Team {start_player_team}", "x": pass_start_info['position'][0],
                        "y": pass_start_info['position'][1], "end_x": ball_pos_transformed[0],
                        "end_y": ball_pos_transformed[1], "minute": int(frame_num / (self.frame_rate * 60)),
                        "second": int((frame_num / self.frame_rate) % 60)
                    })
            
            if current_player_id != -1:
                pass_start_info = {'frame': frame_num, 'position': ball_pos_transformed}
                last_player_with_ball = current_player_id
        
        return pd.DataFrame(events)

# Other classes (TeamAssigner, PlayerBallAssigner, etc.)
class TeamAssigner:
    def __init__(self):
        self.team_colors, self.player_team_dict, self.kmeans = {}, {}, None
    def get_player_color(self, frame, bbox):
        image = frame[int(bbox[1]):int(bbox[3]), int(bbox[0]):int(bbox[2])]
        if image.size == 0: return np.array([0,0,0])
        top_half = image[0:int(image.shape[0] / 2), :]
        if top_half.size == 0: return np.array([0,0,0])
        kmeans = KMeans(n_clusters=2, init="k-means++", n_init=1, random_state=0).fit(top_half.reshape(-1, 3))
        labels = kmeans.labels_.reshape(top_half.shape[0], top_half.shape[1])
        corner_clusters = [labels[0, 0], labels[0, -1], labels[-1, 0], labels[-1, -1]]
        non_player_cluster = max(set(corner_clusters), key=corner_clusters.count)
        return kmeans.cluster_centers_[1 - non_player_cluster]
    def assign_team_color(self, frame, player_detections):
        if not player_detections: return
        colors = [self.get_player_color(frame, det["bbox"]) for _, det in player_detections.items()]
        self.kmeans = KMeans(n_clusters=2, init="k-means++", n_init=10, random_state=0).fit(colors)
        self.team_colors[1], self.team_colors[2] = self.kmeans.cluster_centers_
    def get_player_team(self, frame, bbox, player_id):
        if player_id in self.player_team_dict: return self.player_team_dict[player_id]
        if self.kmeans is None: return 0
        color = self.get_player_color(frame, bbox)
        team_id = self.kmeans.predict(color.reshape(1, -1))[0] + 1
        self.player_team_dict[player_id] = team_id
        return team_id

class PlayerBallAssigner:
    def __init__(self): self.max_dist = 70
    def assign_ball_to_player(self, players, ball_bbox):
        if not ball_bbox: return -1
        ball_pos, min_dist, assigned_player = get_center_of_bbox(ball_bbox), float('inf'), -1
        for id, player in players.items():
            dist = measure_distance(get_foot_position(player['bbox']), ball_pos)
            if dist < self.max_dist and dist < min_dist: min_dist, assigned_player = dist, id
        return assigned_player

class CameraMovementEstimator:
    def __init__(self, frame):
        self.lk_params = dict(winSize=(15, 15), maxLevel=2, criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))
        self.features = dict(maxCorners=100, qualityLevel=0.3, minDistance=7, blockSize=7)
    def get_camera_movement(self, frames, read_from_stub=False, stub_path=None):
        if read_from_stub and stub_path and os.path.exists(stub_path):
            with open(stub_path, 'rb') as f: return pickle.load(f)
        movements = [[0, 0]] * len(frames)
        old_gray = cv2.cvtColor(frames[0], cv2.COLOR_BGR2GRAY)
        old_features = cv2.goodFeaturesToTrack(old_gray, **self.features)
        for i in range(1, len(frames)):
            new_gray = cv2.cvtColor(frames[i], cv2.COLOR_BGR2GRAY)
            new_features, status, _ = cv2.calcOpticalFlowPyrLK(old_gray, new_gray, old_features, None, **self.lk_params)
            
            good_new = new_features[status==1]
            good_old = old_features[status==1]

            move_x, move_y = 0, 0
            if len(good_new) > 0:
                move_x, move_y = np.mean(good_old - good_new, axis=0).ravel()

            movements[i] = [move_x, move_y]
            old_gray = new_gray.copy()
            old_features = good_new.reshape(-1, 1, 2)
        if stub_path:
            with open(stub_path, 'wb') as f: pickle.dump(movements, f)
        return movements
    def add_adjust_positions_to_tracks(self, tracks, movements):
        for type, obj_tracks in tracks.items():
            for i, track in enumerate(obj_tracks):
                for id, info in track.items():
                    info['position_adjusted'] = (info['position'][0] + movements[i][0], info['position'][1] + movements[i][1])

class ViewTransformer:
    def __init__(self):
        court_w, court_l = 34, 52.5
        self.pixel_verts = np.float32([[110, 1035], [265, 275], [910, 260], [1640, 915]])
        self.target_verts = np.float32([[0, court_w], [0, 0], [court_l, 0], [court_l, court_w]])
        self.transformer = cv2.getPerspectiveTransform(self.pixel_verts, self.target_verts)
    def transform_point(self, point):
        p = (int(point[0]), int(point[1]))
        is_inside = cv2.pointPolygonTest(self.pixel_verts, p, False) >= 0
        if not is_inside: return None
        reshaped = np.array(point).reshape(-1, 1, 2).astype(np.float32)
        transformed = cv2.perspectiveTransform(reshaped, self.transformer)
        return transformed.reshape(-1, 2)
    def add_transformed_position_to_tracks(self, tracks):
        for type, obj_tracks in tracks.items():
            for track in obj_tracks:
                for id, info in track.items():
                    pos = info.get('position_adjusted', info.get('position'))
                    if pos:
                        transformed = self.transform_point(pos)
                        info['position_transformed'] = transformed.squeeze().tolist() if transformed is not None else None

class SpeedAndDistanceEstimator:
    def __init__(self):
        self.frame_window, self.frame_rate = 24, 24
    def add_speed_and_distance_to_tracks(self, tracks):
        total_dist = {}
        for type, obj_tracks in tracks.items():
            if type not in ["players", "referees"]: continue
            for i in range(len(obj_tracks)):
                for id, info in obj_tracks[i].items():
                    if i > 0:
                        prev_info = tracks[type][i-1].get(id)
                        if prev_info and info.get('position_transformed') and prev_info.get('position_transformed'):
                            dist = measure_distance(info['position_transformed'], prev_info['position_transformed'])
                            total_dist[id] = total_dist.get(id, 0) + dist
                            speed_mps = dist * self.frame_rate
                            info['speed'] = speed_mps * 3.6 # km/h
                            info['distance'] = total_dist[id]
    def draw_speed_and_distance(self, frames, tracks):
        output_frames = []
        for i, frame in enumerate(frames):
            for type, obj_tracks in tracks.items():
                if type not in ["players", "referees"]: continue
                for id, info in obj_tracks[i].items():
                    if "speed" in info:
                        x, y = get_foot_position(info['bbox'])
                        cv2.putText(frame, f"{info['speed']:.1f} km/h", (x - 20, y + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,0,255), 2)
            output_frames.append(frame)
        return output_frames


In [13]:
def main():
    # --- SETUP ---
    INPUT_VIDEO_PATH = "/kaggle/input/football-video2/CityUtdR.mp4"
    STUB_PATH = "/kaggle/working/tracks_stub.pkl"
    OUTPUT_VIDEO_PATH = "/kaggle/working/final_analysis_video-Llama3-v1.mp4"
    
    frames = read_video(INPUT_VIDEO_PATH)
    if not frames:
        print("Video file not found or could not be read. Check the path.")
        return None

    cap = cv2.VideoCapture(INPUT_VIDEO_PATH)
    fps = cap.get(cv2.CAP_PROP_FPS) or 24
    cap.release()

    # --- INITIALIZE ALL MODULES ---
    tracker = Tracker('yolov8x.pt')
    commentary_engine = ImprovedCommentaryEngine(fps=fps)
    camera_estimator = CameraMovementEstimator(frames[0])
    view_transformer = ViewTransformer()
    speed_estimator = SpeedAndDistanceEstimator()
    team_assigner = TeamAssigner()
    player_assigner = PlayerBallAssigner()
    ticker = RealTimeTicker(fps=fps)

    # --- STAGE 1: TRACKING ---
    print("Stage 1: Performing object detection and tracking...")
    tracks = tracker.get_object_tracks(frames, read_from_stub=False, stub_path=STUB_PATH)
    tracks["ball"] = tracker.interpolate_ball_positions(tracks["ball"])
    tracker.add_position_to_tracks(tracks)
    
    # --- STAGE 2: MOTION & PERSPECTIVE ---
    print("Stage 2: Estimating camera motion and transforming perspective...")
    camera_movement = camera_estimator.get_camera_movement(frames)
    camera_estimator.add_adjust_positions_to_tracks(tracks, camera_movement)
    view_transformer.add_transformed_position_to_tracks(tracks)
    speed_estimator.add_speed_and_distance_to_tracks(tracks)
    
    # --- STAGE 3: TEAM ASSIGNMENT ---
    print("Stage 3: Assigning teams...")
    team_assigner.assign_team_color(frames[0], tracks['players'][0])
    
    for frame_num, frame in enumerate(frames):
        player_track = tracks['players'][frame_num]
        for player_id, track in player_track.items():
            team = team_assigner.get_player_team(frame, track['bbox'], player_id)
            tracks['players'][frame_num][player_id]['team'] = team
            tracks['players'][frame_num][player_id]['team_color'] = team_assigner.team_colors.get(team, (0,0,255))
    
    # --- STAGE 4: GENERATE EVENTS DATA ---
    print("Stage 4: Detecting events for commentary context...")
    event_detector = EventDetector()
    events_df = event_detector.detect_events(tracks)
    print(f"Detected {len(events_df)} events for commentary context")
    
    # --- STAGE 5: BALL POSSESSION & COMMENTARY ---
    print("Stage 5: Tracking ball possession and generating all commentary...")
    team_ball_control = []
    ticker_history = []
    cogvlm_history = []
    
    for frame_num, frame in enumerate(frames):
        player_track = tracks['players'][frame_num]
        ball_bbox = tracks['ball'][frame_num].get(1, {}).get('bbox')
        
        for player_id in tracks['players'][frame_num]:
            tracks['players'][frame_num][player_id]['has_ball'] = False
        
        assigned_player = player_assigner.assign_ball_to_player(player_track, ball_bbox)
        if assigned_player != -1:
            tracks['players'][frame_num][assigned_player]['has_ball'] = True
            team_ball_control.append(tracks['players'][frame_num][assigned_player]['team'])
        else:
            team_ball_control.append(team_ball_control[-1] if team_ball_control else 0)
        
        ticker_history.append(ticker.update(tracks, frame_num))
        commentary_engine.update_with_context(frame, tracks, frame_num, events_df)
        cogvlm_history.append(commentary_engine.latest_commentary)
        
        if frame_num % 100 == 0:
            print(f"Commentary progress: {frame_num}/{len(frames)} frames")

    team_ball_control = np.array(team_ball_control)

    # --- STAGE 6: VISUALIZATION & SAVING ---
    print("Stage 6: Combining commentary and saving final video...")
    display_commentary = ticker_history.copy()
    last_cogvlm_comment = cogvlm_history[0]
    for i, comment in enumerate(cogvlm_history):
        if comment != last_cogvlm_comment:
            start_frame = max(0, i - commentary_engine.clip_length_frames)
            for j in range(start_frame, i):
                if j < len(display_commentary):
                    display_commentary[j] = comment
            last_cogvlm_comment = comment

    output_frames = []
    for frame_num, frame in enumerate(frames):
        frame_copy = frame.copy()
        current_commentary = display_commentary[frame_num] if frame_num < len(display_commentary) else " "
        
        player_dict = tracks["players"][frame_num]
        ball_dict = tracks.get("ball", [])[frame_num]
        
        for track_id, player in player_dict.items():
            color = player.get("team_color", (0, 0, 255))
            frame_copy = tracker._draw_player_ellipse(frame_copy, player["bbox"], color, track_id, player.get("jersey_number"))
            if player.get('has_ball', False):
                frame_copy = tracker._draw_triangle(frame_copy, player["bbox"], (0, 0, 255))
        
        if 1 in ball_dict:
            frame_copy = tracker._draw_triangle(frame_copy, ball_dict[1]["bbox"], (0, 255, 0))
        
        frame_copy = tracker._draw_team_ball_control(frame_copy, frame_num, team_ball_control)
        frame_copy = tracker._draw_commentary_overlay(frame_copy, current_commentary)
        output_frames.append(frame_copy)
    
    output_frames = speed_estimator.draw_speed_and_distance(output_frames, tracks)
    save_video(output_frames, OUTPUT_VIDEO_PATH)

    # --- FINAL STATISTICS ---
    print("\n" + "="*50)
    print("MATCH ANALYSIS COMPLETE")
    print("="*50)
    print(f"✅ Video saved to: {OUTPUT_VIDEO_PATH}")

if __name__ == "__main__":
    main()


✅ Jersey OCR module initialized.
🎙️ Initializing CogVLM2 Commentary Engine...


util.py: 0.00B [00:00, ?B/s]

A new version of the following files was downloaded from https://huggingface.co/THUDM/cogvlm2-llama3-caption:
- util.py
. Make sure to double-check they do not contain any added malicious code. To avoid downloading new versions of the code file, you can pin a revision.


visual.py: 0.00B [00:00, ?B/s]

A new version of the following files was downloaded from https://huggingface.co/THUDM/cogvlm2-llama3-caption:
- visual.py
. Make sure to double-check they do not contain any added malicious code. To avoid downloading new versions of the code file, you can pin a revision.
A new version of the following files was downloaded from https://huggingface.co/THUDM/cogvlm2-llama3-caption:
- util.py
- visual.py
. Make sure to double-check they do not contain any added malicious code. To avoid downloading new versions of the code file, you can pin a revision.


⚠️ Could not initialize CogVLM2 model: No module named 'torchvision.transforms.functional_tensor'
Stage 1: Performing object detection and tracking...
Processing frame 0/268

0: 384x640 21 persons, 62.3ms
Speed: 1.9ms preprocess, 62.3ms inference, 1.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 23 persons, 1 airplane, 36.7ms
Speed: 1.7ms preprocess, 36.7ms inference, 1.4ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 23 persons, 1 airplane, 30.0ms
Speed: 1.6ms preprocess, 30.0ms inference, 1.4ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 23 persons, 1 airplane, 29.7ms
Speed: 1.7ms preprocess, 29.7ms inference, 1.3ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 24 persons, 1 airplane, 30.0ms
Speed: 1.5ms preprocess, 30.0ms inference, 1.4ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 20 persons, 1 airplane, 30.7ms
Speed: 1.6ms preprocess, 30.7ms inference, 1.5ms postprocess per image at shape (1, 3, 384, 640

# CogVLM2-Llama3-Caption version 2

In [14]:
# Keep the runtime's torch/torchvision to avoid breakage; install everything else.
!pip -q install --upgrade ultralytics supervision easyocr numpy opencv-python scikit-learn pandas mplsoccer transformers accelerate huggingface_hub


[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m62.1/62.1 kB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m91.2/91.2 kB[0m [31m5.7 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m62.0/62.0 kB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m67.0/67.0 MB[0m [31m25.1 MB/s[0m eta [36m0:00:00[0m:00:01[0m00:01[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m16.8/16.8 MB[0m [31m71.0 MB/s[0m eta [36m0:00:00[0m:00:01[0m00:01[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m9.7/9.7 MB[0m [31m82.8 MB/s[0m eta [36m0:00:00[0m:00:01[0m00:01[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.4/12.4 MB[0m [31m80.7 MB/s[0m eta [36m0:00:00[0m:00:01[0m00:01[0m
[?25h[31mERROR: pip's dependency resolver does not currently take into account all the p

In [15]:
# Shim for older torchvision import expected by CogVLM2 (functional_tensor)
import types, sys
try:
    import torchvision.transforms.functional as F
    ft = types.ModuleType("torchvision.transforms.functional_tensor")
    for name in ["_is_tensor_image","to_pil_image","to_tensor","normalize"]:
        if hasattr(F, name):
            setattr(ft, name, getattr(F, name))
    sys.modules["torchvision.transforms.functional_tensor"] = ft
except Exception:
    pass

# Optional: pin the HF revision to avoid repo code changing mid-run.
HF_MODEL_ID = "THUDM/cogvlm2-llama3-caption"
HF_REV = None  # put a specific commit hash or tag string here to pin; keep None to use latest


In [16]:
import sys
import os
import cv2
import pickle
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from collections import deque
import time

# Machine Learning & Computer Vision Libraries
from ultralytics import YOLO
import supervision as sv
from sklearn.cluster import KMeans
import easyocr

# CogVLM2 for AI Commentary (replacing Gemini)
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch
from PIL import Image

# Plotting for Heatmaps
from mplsoccer import Pitch

# --- Video Utilities ---
def read_video(video_path):
    cap = cv2.VideoCapture(video_path)
    frames = []
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frames.append(frame)
    cap.release()
    return frames

def save_video(output_video_frames, output_video_path):
    if not output_video_frames:
        print("No frames to save.")
        return
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_video_path, fourcc, 24, (output_video_frames[0].shape[1], output_video_frames[0].shape[0]))
    for frame in output_video_frames:
        out.write(frame)
    out.release()

# --- BBox Utilities ---
def get_center_of_bbox(bbox):
    x1, y1, x2, y2 = bbox
    return int((x1 + x2) / 2), int((y1 + y2) / 2)

def get_bbox_width(bbox):
    return int(bbox[2] - bbox[0])

def get_foot_position(bbox):
    x1, y1, x2, y2 = bbox
    return int((x1 + x2) / 2), int(y2)

def measure_distance(p1, p2):
    return ((p1[0] - p2[0])**2 + (p1[1] - p2[1])**2)**0.5

def measure_xy_distance(p1, p2):
    return p1[0] - p2[0], p1[1] - p2[1]


In [17]:
class ImprovedCommentaryEngine:
    def __init__(self, clip_duration_seconds=5, fps=24, keyframes=3, max_words=25):
        self.clip_length_frames = int(clip_duration_seconds * fps)
        self.frame_buffer = deque(maxlen=self.clip_length_frames)
        self.latest_commentary = "Match analysis is starting..."
        self.fps = fps
        self.keyframes = max(1, min(keyframes, 5))
        self.max_words = max(6, max_words)

        self.match_context = {
            'possession_changes': [], 'recent_events': [],
            'ball_position_history': [], 'player_movements': []
        }

        print("🎙️ Initializing CogVLM2 Commentary Engine...")
        self.model, self.tokenizer = None, None
        try:
            model_kwargs = dict(trust_remote_code=True)
            if HF_REV is not None:
                model_kwargs["revision"] = HF_REV

            self.device = "cuda" if torch.cuda.is_available() else "cpu"

            self.tokenizer = AutoTokenizer.from_pretrained(HF_MODEL_ID, **model_kwargs)
            self.model = AutoModelForCausalLM.from_pretrained(
                HF_MODEL_ID,
                torch_dtype=torch.float16 if self.device == "cuda" else torch.float32,
                device_map="auto" if self.device == "cuda" else None,
                **model_kwargs
            )
            if self.device == "cpu":
                self.model.to(self.device)

            self._compiled = False
            print(f"✅ CogVLM2-Llama3-Caption loaded on {self.device}.")
        except Exception as e:
            print(f"⚠️ Could not initialize CogVLM2 model: {e}")

    def update_with_context(self, frame, tracks_data, frame_num, events_data=None):
        if not self.model:
            return
        game_context = self._extract_game_context(tracks_data, frame_num, events_data)
        self.match_context['recent_events'].append(game_context)
        if len(self.match_context['recent_events']) > 10:
            self.match_context['recent_events'].pop(0)

        self.frame_buffer.append(frame)

        if len(self.frame_buffer) == self.clip_length_frames:
            print("Generating tactical summary...")
            try:
                new_comment = self._generate_contextual_commentary(game_context)
            except Exception as e:
                print(f"Commentary generation error: {e}")
                new_comment = self._generate_fallback_commentary(game_context)
            if new_comment:
                self.latest_commentary = new_comment
            self.frame_buffer.clear()

    def _extract_game_context(self, tracks_data, frame_num, events_data):
        players_seq = tracks_data.get('players', [])
        ball_seq = tracks_data.get('ball', [])

        players_at_f = players_seq[frame_num] if frame_num < len(players_seq) else {}
        ball_at_f = ball_seq[frame_num] if frame_num < len(ball_seq) else {}

        total_sec = frame_num / self.fps
        minutes, seconds = int(total_sec // 60), int(total_sec % 60)

        possession = None
        if isinstance(players_at_f, dict):
            for pid, info in players_at_f.items():
                if info.get('has_ball', False):
                    possession = f"Player {pid} (Team {info.get('team', 'Unknown')})"
                    break

        # robust ball presence check
        ball_detected = False
        if isinstance(ball_at_f, dict):
            ball_detected = bool(ball_at_f) or bool(ball_at_f.get('visible', False))
        elif isinstance(ball_at_f, (list, tuple, set)):
            ball_detected = len(ball_at_f) > 0

        recent_records = []
        if events_data is not None and hasattr(events_data, "empty") and not events_data.empty:
            ev_secs = events_data['minute'] * 60 + events_data['second']
            lower = max(0, total_sec - 12.0)
            recent = events_data[ev_secs >= lower].tail(4)
            recent_records = recent.to_dict('records')

        return {
            'frame_num': frame_num,
            'timestamp': f"{minutes}:{seconds:02d}",
            'players_detected': len(players_at_f) if isinstance(players_at_f, dict) else 0,
            'ball_detected': ball_detected,
            'possession': possession,
            'ball_speed': ball_at_f.get('speed', 0) if isinstance(ball_at_f, dict) else 0,
            'recent_events': recent_records
        }

    def _generate_contextual_commentary(self, game_context):
        if self.model is None or self.tokenizer is None or len(self.frame_buffer) == 0:
            return self._generate_fallback_commentary(game_context)

        images = self._sample_keyframes(self.frame_buffer, self.keyframes)
        pil_images = [self._to_pil(img) for img in images]

        prompt = self._create_detailed_prompt(game_context)

        if self.device == "cuda" and not self._compiled:
            try:
                self.model = torch.compile(self.model)
                self._compiled = True
            except Exception:
                pass

        with torch.inference_mode():
            convo = self.model.build_conversation_input_ids(
                self.tokenizer,
                query=prompt,
                images=pil_images,
                template_version='chat'
            )
            inputs = {
                'input_ids': convo['input_ids'].unsqueeze(0).to(self.device),
                'token_type_ids': convo['token_type_ids'].unsqueeze(0).to(self.device),
                'attention_mask': convo['attention_mask'].unsqueeze(0).to(self.device),
                'images': [[x.to(self.device).to(self.model.dtype) for x in convo['images']]]
            }
            outputs = self.model.generate(
                **inputs,
                max_new_tokens=64,
                do_sample=False,
                temperature=0.0,
                top_p=1.0,
                repetition_penalty=1.1,
                pad_token_id=self.tokenizer.eos_token_id,
                eos_token_id=self.tokenizer.eos_token_id
            )

        raw = self.tokenizer.decode(outputs[0][inputs['input_ids'].shape[1]:], skip_special_tokens=True)
        return self._postprocess_caption(raw)

    def _create_detailed_prompt(self, context):
        events = self._format_recent_events(context.get('recent_events', []))
        return (
            "You are a professional football (soccer) tactical analyst.\n"
            "TASK: Describe the most significant on-ball action visible across these frames.\n"
            f"- Time: {context['timestamp']}\n"
            f"- Possession: {context.get('possession', 'Unclear')}\n"
            f"- Recent: {events}\n"
            "STYLE: factual, objective, single sentence, ≤25 words, no exclamations.\n"
            "FORMAT: <subject> <action> <outcome/intent>. Examples:\n"
            "- The red winger receives a diagonal pass, drives inside past one defender, and squares toward the penalty spot.\n"
            "- The blue fullback overlaps and delivers a low cross that is intercepted near the near post.\n"
            "Now write the single sentence:"
        )

    def _format_recent_events(self, events):
        if not events: return "None"
        out = []
        for e in events[-3:]:
            if isinstance(e, dict):
                et = e.get('type_name', 'Event')
                tm = e.get('team_name', 'Team')
                out.append(f"{et} – {tm}")
        return "; ".join(out) if out else "None"

    def _sample_keyframes(self, buffer, k):
        n = len(buffer)
        if k >= n: return list(buffer)
        step = n / float(k + 1)
        idxs = [int((i + 1) * step) - 1 for i in range(k)]
        idxs = [min(max(0, idx), n - 1) for idx in idxs]
        return [buffer[i] for i in idxs]

    def _to_pil(self, frame):
        if frame is None:
            return Image.new("RGB", (224, 224), color=(0, 0, 0))
        if len(frame.shape) == 3 and frame.shape[2] == 3:
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        else:
            frame_rgb = frame
        return Image.fromarray(frame_rgb)

    def _postprocess_caption(self, text: str) -> str:
        s = " ".join(text.strip().split())
        # keep only first sentence
        if "." in s:
            s = s.split(".")[0].strip() + "."
        # enforce word cap
        words = s.split()
        if len(words) > self.max_words:
            s = " ".join(words[:self.max_words]).rstrip(",") + "."
        # remove mild hype words
        ban = {"incredible", "amazing", "unbelievable", "spectacular", "fantastic"}
        s = " ".join([w for w in s.split() if w.lower() not in ban])
        return s

    def _generate_fallback_commentary(self, context):
        if context.get('possession'):
            return f"{context['possession']} carries possession and progresses play."
        return "Possession unclear; play develops centrally with short passing."

class RealTimeTicker:
    """Debounced live ticker to reduce flicker."""
    def __init__(self, fps=24, hold_seconds=2.0):
        self.fps = fps
        self.last_player_id = -1
        self.last_team_id = -1
        self.ticker_text = "Match begins!"
        self.text_display_frames = 0
        self.hold_frames = max(1, int(hold_seconds * fps))

    def _get_ball_carrier(self, player_track):
        if not isinstance(player_track, dict): return -1, -1
        for player_id, data in player_track.items():
            if data.get('has_ball', False):
                return player_id, data.get('team', -1)
        return -1, -1

    def update(self, tracks, frame_num):
        if self.text_display_frames > 0:
            self.text_display_frames -= 1
            return self.ticker_text

        player_track = tracks['players'][frame_num]
        current_player_id, current_team_id = self._get_ball_carrier(player_track)

        if (current_player_id != -1 and self.last_player_id != -1 and
            current_player_id != self.last_player_id and current_team_id == self.last_team_id):
            self.ticker_text = f"Pass from Player {self.last_player_id} to Player {current_player_id}."
            self.text_display_frames = self.hold_frames

        elif current_player_id != -1 and self.last_team_id != -1 and current_team_id != self.last_team_id:
            self.ticker_text = f"Team {current_team_id} gains possession."
            self.text_display_frames = self.hold_frames

        else:
            if current_player_id != -1:
                self.ticker_text = f"Player {current_player_id} (Team {current_team_id}) on the ball."
                self.text_display_frames = int(self.hold_frames * 0.5)
            else:
                self.ticker_text = "Ball is loose."
                self.text_display_frames = int(self.hold_frames * 0.5)

        self.last_player_id = current_player_id if current_player_id != -1 else -1
        if current_player_id != -1:
            self.last_team_id = current_team_id
        return self.ticker_text


In [18]:
class JerseyNumberRecognizer:
    def __init__(self):
        self.reader = easyocr.Reader(['en'], gpu=True)
        self.jersey_cache = {}
        print("✅ Jersey OCR module initialized.")

    def recognize_jersey_number(self, player_crop, tracker_id):
        if tracker_id in self.jersey_cache: return self.jersey_cache[tracker_id]
        if player_crop.size == 0: return None

        crop_gray = cv2.cvtColor(player_crop, cv2.COLOR_BGR2GRAY)
        results = self.reader.readtext(crop_gray, allowlist='0123456789', detail=1)

        best_result = None
        for (bbox, text, prob) in results:
            if prob > 0.6 and text.isdigit() and len(text) <= 2:
                if best_result is None or prob > best_result[2]:
                    best_result = (bbox, text, prob)

        if best_result:
            self.jersey_cache[tracker_id] = best_result[1]
            return best_result[1]
        return None

class Tracker:
    def __init__(self, model_name='yolov8x.pt'):
        self.model = YOLO(model_name)
        self.tracker = sv.ByteTrack()
        self.jersey_recognizer = JerseyNumberRecognizer()

    def get_object_tracks(self, frames, read_from_stub=False, stub_path=None):
        if read_from_stub and stub_path and os.path.exists(stub_path):
            with open(stub_path, 'rb') as f: return pickle.load(f)

        tracks = {"players": [], "referees": [], "ball": []}

        for frame_num, frame in enumerate(frames):
            if frame_num % 20 == 0:
                print(f"Processing frame {frame_num}/{len(frames)}")

            # Restrict to person (0) and sports ball (32) to avoid "airplane/racket"
            results = self.model.predict(
                frame, conf=0.35, iou=0.5, classes=[0, 32], verbose=False
            )[0]

            detections = sv.Detections.from_ultralytics(results)

            # Players
            player_detections = detections[detections.class_id == 0]
            tracked_players = self.tracker.update_with_detections(player_detections)

            tracks["players"].append({})
            tracks["referees"].append({})

            for det in tracked_players:
                bbox = det[0]
                track_id = det[4]
                player_crop = frame[int(bbox[1]):int(bbox[3]), int(bbox[0]):int(bbox[2])]
                jersey_num = self.jersey_recognizer.recognize_jersey_number(player_crop, track_id)
                tracks["players"][frame_num][track_id] = {"bbox": bbox.tolist(), "jersey_number": jersey_num}

            # Ball (keep the most confident one if multiple)
            ball_detections = detections[detections.class_id == 32]
            tracks["ball"].append({})
            if len(ball_detections) > 0:
                # pick highest confidence
                idx = int(np.argmax(ball_detections.confidence))
                tracks["ball"][frame_num][1] = {"bbox": ball_detections.xyxy[idx].tolist()}

        if stub_path:
            with open(stub_path, 'wb') as f: pickle.dump(tracks, f)
        return tracks

    def add_position_to_tracks(self, tracks):
        for typ, obj_tracks in tracks.items():
            for frame_num, track in enumerate(obj_tracks):
                for id, info in track.items():
                    bbox = info['bbox']
                    info['position'] = get_foot_position(bbox) if typ != 'ball' else get_center_of_bbox(bbox)

    def interpolate_ball_positions(self, ball_positions):
        ball_bboxes = [x.get(1, {}).get('bbox', []) for x in ball_positions]
        df = pd.DataFrame(ball_bboxes, columns=['x1','y1','x2','y2']).interpolate().bfill()
        return [{1: {"bbox": x}} for x in df.to_numpy().tolist()]

    def _draw_player_ellipse(self, frame, bbox, color, track_id, jersey_num):
        y2 = int(bbox[3])
        x_center, _ = get_center_of_bbox(bbox)
        width = get_bbox_width(bbox)
        cv2.ellipse(frame, center=(x_center, y2), axes=(int(width), int(0.35 * width)), angle=0.0,
                    startAngle=-45, endAngle=235, color=color, thickness=2, lineType=cv2.LINE_4)
        label = f"#{jersey_num}" if jersey_num else str(track_id)
        (w, h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
        rect_w, rect_h = w + 10, h + 10
        x1_rect, y1_rect = x_center - rect_w//2, (y2 - rect_h//2) + 15
        cv2.rectangle(frame, (x1_rect, y1_rect), (x1_rect + rect_w, y1_rect + rect_h), color, cv2.FILLED)
        cv2.putText(frame, label, (x1_rect + 5, y1_rect + h + 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 0), 2)
        return frame

    def _draw_triangle(self, frame, bbox, color):
        y, x = int(bbox[1]), int(get_center_of_bbox(bbox)[0])
        points = np.array([[x, y], [x - 10, y - 20], [x + 10, y - 20]])
        cv2.drawContours(frame, [points], 0, color, cv2.FILLED)
        cv2.drawContours(frame, [points], 0, (0, 0, 0), 2)
        return frame

    def _draw_team_ball_control(self, frame, frame_num, team_ball_control):
        overlay = frame.copy()
        cv2.rectangle(overlay, (10, 10), (350, 70), (255, 255, 255), -1)
        cv2.addWeighted(overlay, 0.5, frame, 0.5, 0, frame)
        team_1_frames = np.sum(team_ball_control[:frame_num + 1] == 1)
        team_2_frames = np.sum(team_ball_control[:frame_num + 1] == 2)
        total = max(1, team_1_frames + team_2_frames)
        p1 = (team_1_frames / total) * 100
        p2 = (team_2_frames / total) * 100
        cv2.putText(frame, f"Team 1 Possession: {p1:.1f}%", (20, 35), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,0,0), 2)
        cv2.putText(frame, f"Team 2 Possession: {p2:.1f}%", (20, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,0,0), 2)
        return frame

    def _draw_commentary_overlay(self, frame, text):
        h, w, _ = frame.shape
        font = cv2.FONT_HERSHEY_SIMPLEX
        thickness = 2
        font_scale = 1.0
        (text_w, text_h), _ = cv2.getTextSize(text, font, font_scale, thickness)
        target_w = w * 0.9
        if text_w > target_w:
            font_scale = max(0.5, target_w / text_w)
        (text_w, text_h), _ = cv2.getTextSize(text, font, font_scale, thickness)
        banner_h = text_h + 20
        overlay = frame.copy()
        cv2.rectangle(overlay, (0, h - banner_h), (w, h), (0, 0, 0), -1)
        cv2.addWeighted(overlay, 0.6, frame, 0.4, 0, frame)
        text_x = (w - text_w) // 2
        text_y = h - 10
        cv2.putText(frame, text, (text_x, text_y), font, font_scale, (255, 255, 255), thickness)
        return frame

class EventDetector:
    def __init__(self):
        self.shot_speed_threshold_mps = 15
        self.frame_rate = 24

    def detect_events(self, tracks):
        player_assigner = PlayerBallAssigner()
        ball_possession_log = []
        for frame_num in range(len(tracks['players'])):
            player_track = tracks['players'][frame_num]
            ball_bbox = tracks['ball'][frame_num].get(1, {}).get('bbox')
            assigned_player_id = player_assigner.assign_ball_to_player(player_track, ball_bbox) if ball_bbox else -1
            ball_possession_log.append(assigned_player_id)

        events = []
        last_player_with_ball, pass_start_info = -1, {}
        for frame_num, current_player_id in enumerate(ball_possession_log):
            ball_pos_transformed = tracks['ball'][frame_num].get(1, {}).get('position_transformed')
            if not ball_pos_transformed: continue

            is_valid_pass = (current_player_id != last_player_with_ball and last_player_with_ball != -1 and current_player_id != -1)
            if is_valid_pass:
                start_player_team = tracks['players'][pass_start_info['frame']][last_player_with_ball].get('team')
                end_player_team = tracks['players'][frame_num].get(current_player_id, {}).get('team')
                if start_player_team == end_player_team and start_player_team is not None:
                    events.append({
                        "type_name": "Pass", "player_name": f"Player_{last_player_with_ball}",
                        "team_name": f"Team {start_player_team}", "x": pass_start_info['position'][0],
                        "y": pass_start_info['position'][1], "end_x": ball_pos_transformed[0],
                        "end_y": ball_pos_transformed[1], "minute": int(frame_num / (self.frame_rate * 60)),
                        "second": int((frame_num / self.frame_rate) % 60)
                    })

            if current_player_id != -1:
                pass_start_info = {'frame': frame_num, 'position': ball_pos_transformed}
                last_player_with_ball = current_player_id

        return pd.DataFrame(events)

# TeamAssigner, PlayerBallAssigner, CameraMovementEstimator, ViewTransformer, SpeedAndDistanceEstimator (unchanged from your code)
class TeamAssigner:
    def __init__(self):
        self.team_colors, self.player_team_dict, self.kmeans = {}, {}, None
    def get_player_color(self, frame, bbox):
        image = frame[int(bbox[1]):int(bbox[3]), int(bbox[0]):int(bbox[2])]
        if image.size == 0: return np.array([0,0,0])
        top_half = image[0:int(image.shape[0] / 2), :]
        if top_half.size == 0: return np.array([0,0,0])
        kmeans = KMeans(n_clusters=2, init="k-means++", n_init=1, random_state=0).fit(top_half.reshape(-1, 3))
        labels = kmeans.labels_.reshape(top_half.shape[0], top_half.shape[1])
        corner_clusters = [labels[0, 0], labels[0, -1], labels[-1, 0], labels[-1, -1]]
        non_player_cluster = max(set(corner_clusters), key=corner_clusters.count)
        return kmeans.cluster_centers_[1 - non_player_cluster]
    def assign_team_color(self, frame, player_detections):
        if not player_detections: return
        colors = [self.get_player_color(frame, det["bbox"]) for _, det in player_detections.items()]
        self.kmeans = KMeans(n_clusters=2, init="k-means++", n_init=10, random_state=0).fit(colors)
        self.team_colors[1], self.team_colors[2] = self.kmeans.cluster_centers_
    def get_player_team(self, frame, bbox, player_id):
        if player_id in self.player_team_dict: return self.player_team_dict[player_id]
        if self.kmeans is None: return 0
        color = self.get_player_color(frame, bbox)
        team_id = self.kmeans.predict(color.reshape(1, -1))[0] + 1
        self.player_team_dict[player_id] = team_id
        return team_id

class PlayerBallAssigner:
    def __init__(self): self.max_dist = 70
    def assign_ball_to_player(self, players, ball_bbox):
        if not ball_bbox: return -1
        ball_pos, min_dist, assigned_player = get_center_of_bbox(ball_bbox), float('inf'), -1
        for id, player in players.items():
            dist = measure_distance(get_foot_position(player['bbox']), ball_pos)
            if dist < self.max_dist and dist < min_dist: min_dist, assigned_player = dist, id
        return assigned_player

class CameraMovementEstimator:
    def __init__(self, frame):
        self.lk_params = dict(winSize=(15, 15), maxLevel=2, criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))
        self.features = dict(maxCorners=100, qualityLevel=0.3, minDistance=7, blockSize=7)
    def get_camera_movement(self, frames, read_from_stub=False, stub_path=None):
        if read_from_stub and stub_path and os.path.exists(stub_path):
            with open(stub_path, 'rb') as f: return pickle.load(f)
        movements = [[0, 0]] * len(frames)
        old_gray = cv2.cvtColor(frames[0], cv2.COLOR_BGR2GRAY)
        old_features = cv2.goodFeaturesToTrack(old_gray, **self.features)
        for i in range(1, len(frames)):
            new_gray = cv2.cvtColor(frames[i], cv2.COLOR_BGR2GRAY)
            new_features, status, _ = cv2.calcOpticalFlowPyrLK(old_gray, new_gray, old_features, None, **self.lk_params)
            good_new = new_features[status==1]
            good_old = old_features[status==1]
            move_x, move_y = 0, 0
            if len(good_new) > 0:
                move_x, move_y = np.mean(good_old - good_new, axis=0).ravel()
            movements[i] = [move_x, move_y]
            old_gray = new_gray.copy()
            old_features = good_new.reshape(-1, 1, 2)
        if stub_path:
            with open(stub_path, 'wb') as f: pickle.dump(movements, f)
        return movements
    def add_adjust_positions_to_tracks(self, tracks, movements):
        for typ, obj_tracks in tracks.items():
            for i, track in enumerate(obj_tracks):
                for id, info in track.items():
                    info['position_adjusted'] = (info['position'][0] + movements[i][0], info['position'][1] + movements[i][1])

class ViewTransformer:
    def __init__(self):
        court_w, court_l = 34, 52.5
        self.pixel_verts = np.float32([[110, 1035], [265, 275], [910, 260], [1640, 915]])
        self.target_verts = np.float32([[0, court_w], [0, 0], [court_l, 0], [court_l, court_w]])
        self.transformer = cv2.getPerspectiveTransform(self.pixel_verts, self.target_verts)
    def transform_point(self, point):
        p = (int(point[0]), int(point[1]))
        is_inside = cv2.pointPolygonTest(self.pixel_verts, p, False) >= 0
        if not is_inside: return None
        reshaped = np.array(point).reshape(-1, 1, 2).astype(np.float32)
        transformed = cv2.perspectiveTransform(reshaped, self.transformer)
        return transformed.reshape(-1, 2)
    def add_transformed_position_to_tracks(self, tracks):
        for typ, obj_tracks in tracks.items():
            for track in obj_tracks:
                for id, info in track.items():
                    pos = info.get('position_adjusted', info.get('position'))
                    if pos:
                        transformed = self.transform_point(pos)
                        info['position_transformed'] = transformed.squeeze().tolist() if transformed is not None else None

class SpeedAndDistanceEstimator:
    def __init__(self):
        self.frame_window, self.frame_rate = 24, 24
    def add_speed_and_distance_to_tracks(self, tracks):
        total_dist = {}
        for typ, obj_tracks in tracks.items():
            if typ not in ["players", "referees"]: continue
            for i in range(len(obj_tracks)):
                for id, info in obj_tracks[i].items():
                    if i > 0:
                        prev_info = tracks[typ][i-1].get(id)
                        if prev_info and info.get('position_transformed') and prev_info.get('position_transformed'):
                            dist = measure_distance(info['position_transformed'], prev_info['position_transformed'])
                            total_dist[id] = total_dist.get(id, 0) + dist
                            speed_mps = dist * self.frame_rate
                            info['speed'] = speed_mps * 3.6
                            info['distance'] = total_dist[id]
    def draw_speed_and_distance(self, frames, tracks):
        output_frames = []
        for i, frame in enumerate(frames):
            for typ, obj_tracks in tracks.items():
                if typ not in ["players", "referees"]: continue
                for id, info in obj_tracks[i].items():
                    if "speed" in info:
                        x, y = get_foot_position(info['bbox'])
                        cv2.putText(frame, f"{info['speed']:.1f} km/h", (x - 20, y + 20),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,0,255), 2)
            output_frames.append(frame)
        return output_frames


In [19]:
def main():
    # --- SETUP ---
    INPUT_VIDEO_PATH = "/kaggle/input/football-video2/CityUtdR.mp4"
    STUB_PATH = "/kaggle/working/tracks_stub.pkl"
    OUTPUT_VIDEO_PATH = "/kaggle/working/final_analysis_video-Llama-v2.mp4"

    frames = read_video(INPUT_VIDEO_PATH)
    if not frames:
        print("Video file not found or could not be read. Check the path.")
        return None

    cap = cv2.VideoCapture(INPUT_VIDEO_PATH)
    fps = cap.get(cv2.CAP_PROP_FPS) or 24
    cap.release()

    # --- INITIALIZE ALL MODULES ---
    tracker = Tracker('yolov8x.pt')
    commentary_engine = ImprovedCommentaryEngine(fps=fps, keyframes=3, max_words=25)
    camera_estimator = CameraMovementEstimator(frames[0])
    view_transformer = ViewTransformer()
    speed_estimator = SpeedAndDistanceEstimator()
    team_assigner = TeamAssigner()
    player_assigner = PlayerBallAssigner()
    ticker = RealTimeTicker(fps=fps)

    # --- STAGE 1: TRACKING ---
    print("Stage 1: Performing object detection and tracking...")
    tracks = tracker.get_object_tracks(frames, read_from_stub=False, stub_path=STUB_PATH)
    tracks["ball"] = tracker.interpolate_ball_positions(tracks["ball"])
    tracker.add_position_to_tracks(tracks)

    # --- STAGE 2: MOTION & PERSPECTIVE ---
    print("Stage 2: Estimating camera motion and transforming perspective...")
    camera_movement = camera_estimator.get_camera_movement(frames)
    camera_estimator.add_adjust_positions_to_tracks(tracks, camera_movement)
    view_transformer.add_transformed_position_to_tracks(tracks)
    speed_estimator.add_speed_and_distance_to_tracks(tracks)

    # --- STAGE 3: TEAM ASSIGNMENT ---
    print("Stage 3: Assigning teams...")
    team_assigner.assign_team_color(frames[0], tracks['players'][0])
    for frame_num, frame in enumerate(frames):
        player_track = tracks['players'][frame_num]
        for player_id, track in player_track.items():
            team = team_assigner.get_player_team(frame, track['bbox'], player_id)
            tracks['players'][frame_num][player_id]['team'] = team
            tracks['players'][frame_num][player_id]['team_color'] = team_assigner.team_colors.get(team, (0,0,255))

    # --- STAGE 4: GENERATE EVENTS DATA ---
    print("Stage 4: Detecting events for commentary context...")
    event_detector = EventDetector()
    events_df = event_detector.detect_events(tracks)
    print(f"Detected {len(events_df)} events for commentary context")

    # --- STAGE 5: BALL POSSESSION & COMMENTARY ---
    print("Stage 5: Tracking ball possession and generating all commentary...")
    team_ball_control = []
    ticker_history = []
    cogvlm_history = []

    for frame_num, frame in enumerate(frames):
        player_track = tracks['players'][frame_num]
        ball_bbox = tracks['ball'][frame_num].get(1, {}).get('bbox')

        for player_id in player_track:
            tracks['players'][frame_num][player_id]['has_ball'] = False

        assigned_player = player_assigner.assign_ball_to_player(player_track, ball_bbox)
        if assigned_player != -1:
            tracks['players'][frame_num][assigned_player]['has_ball'] = True
            team_ball_control.append(tracks['players'][frame_num][assigned_player]['team'])
        else:
            team_ball_control.append(team_ball_control[-1] if team_ball_control else 0)

        ticker_history.append(ticker.update(tracks, frame_num))
        commentary_engine.update_with_context(frame, tracks, frame_num, events_df)
        cogvlm_history.append(commentary_engine.latest_commentary)

        if frame_num % 100 == 0:
            print(f"Commentary progress: {frame_num}/{len(frames)} frames")

    team_ball_control = np.array(team_ball_control)

    # --- STAGE 6: VISUALIZATION & SAVING ---
    print("Stage 6: Combining commentary and saving final video...")
    display_commentary = ticker_history.copy()
    last_cog = cogvlm_history[0]
    for i, comment in enumerate(cogvlm_history):
        if comment != last_cog:
            start_frame = max(0, i - commentary_engine.clip_length_frames)
            for j in range(start_frame, i):
                if j < len(display_commentary):
                    display_commentary[j] = comment
            last_cog = comment

    output_frames = []
    for frame_num, frame in enumerate(frames):
        frame_copy = frame.copy()
        current_commentary = display_commentary[frame_num] if frame_num < len(display_commentary) else " "

        player_dict = tracks["players"][frame_num]
        ball_dict = tracks.get("ball", [])[frame_num]

        for track_id, player in player_dict.items():
            color = player.get("team_color", (0, 0, 255))
            frame_copy = tracker._draw_player_ellipse(frame_copy, player["bbox"], color, track_id, player.get("jersey_number"))
            if player.get('has_ball', False):
                frame_copy = tracker._draw_triangle(frame_copy, player["bbox"], (0, 0, 255))

        if 1 in ball_dict:
            frame_copy = tracker._draw_triangle(frame_copy, ball_dict[1]["bbox"], (0, 255, 0))

        frame_copy = tracker._draw_team_ball_control(frame_copy, frame_num, team_ball_control)
        frame_copy = tracker._draw_commentary_overlay(frame_copy, current_commentary)
        output_frames.append(frame_copy)

    output_frames = speed_estimator.draw_speed_and_distance(output_frames, tracks)
    save_video(output_frames, OUTPUT_VIDEO_PATH)

    print("\n" + "="*50)
    print("MATCH ANALYSIS COMPLETE")
    print("="*50)
    print(f"✅ Video saved to: {OUTPUT_VIDEO_PATH}")

if __name__ == "__main__":
    main()


✅ Jersey OCR module initialized.
🎙️ Initializing CogVLM2 Commentary Engine...


model.safetensors.index.json: 0.00B [00:00, ?B/s]

Fetching 6 files:   0%|          | 0/6 [00:00<?, ?it/s]

model-00002-of-00006.safetensors:   0%|          | 0.00/5.00G [00:00<?, ?B/s]

model-00003-of-00006.safetensors:   0%|          | 0.00/4.92G [00:00<?, ?B/s]

model-00005-of-00006.safetensors:   0%|          | 0.00/4.12G [00:00<?, ?B/s]

model-00004-of-00006.safetensors:   0%|          | 0.00/4.96G [00:00<?, ?B/s]

model-00006-of-00006.safetensors:   0%|          | 0.00/1.05G [00:00<?, ?B/s]

model-00001-of-00006.safetensors:   0%|          | 0.00/4.98G [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/6 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/212 [00:00<?, ?B/s]

✅ CogVLM2-Llama3-Caption loaded on cuda.
Stage 1: Performing object detection and tracking...
Processing frame 0/268
Processing frame 20/268
Processing frame 40/268
Processing frame 60/268
Processing frame 80/268
Processing frame 100/268
Processing frame 120/268
Processing frame 140/268
Processing frame 160/268
Processing frame 180/268
Processing frame 200/268
Processing frame 220/268
Processing frame 240/268
Processing frame 260/268
Stage 2: Estimating camera motion and transforming perspective...
Stage 3: Assigning teams...
Stage 4: Detecting events for commentary context...
Detected 7 events for commentary context
Stage 5: Tracking ball possession and generating all commentary...
Commentary progress: 0/268 frames
Commentary progress: 100/268 frames
Generating tactical summary...
Commentary generation error: not support multi images by now.
Commentary progress: 200/268 frames
Generating tactical summary...
Commentary generation error: not support multi images by now.
Stage 6: Combini

# Qwen2-VL-2B-Instruct

In [20]:
# keep torch/torchvision versions stable on Kaggle; install/upgrade the rest
!pip -q install --upgrade ultralytics supervision easyocr numpy opencv-python scikit-learn pandas mplsoccer transformers accelerate huggingface_hub

In [24]:
import json, re, math
from typing import List, Dict, Any
import torch

import os
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

# ---- preferred models ----
PREFERRED_VLM_ID = "Qwen/Qwen2-VL-2B-Instruct"   # primary
# PREFERRED_VLM_ID = "Qwen/Qwen2-VL-7B-Instruct"
FALLBACK_VLM_ID  = "THUDM/cogvlm2-llama3-caption"  # fallback if Qwen2-VL fails
HF_REV = None  # optional: pin a hash/tag to freeze weights

# small utilities
def clamp_words(s: str, max_words: int = 25) -> str:
    s = " ".join(s.strip().split())
    if "." in s:
        s = s.split(".")[0] + "."
    words = s.split()
    if len(words) > max_words:
        s = " ".join(words[:max_words]).rstrip(",") + "."
    return s

def safe_json_extract(text: str) -> Dict[str, Any]:
    # extract first {...} block
    m = re.search(r"\{.*\}", text, flags=re.S)
    if not m:
        return {}
    try:
        return json.loads(m.group(0))
    except Exception:
        # try to fix trailing commas / quotes
        x = re.sub(r",\s*}", "}", m.group(0))
        x = re.sub(r",\s*]", "]", x)
        try:
            return json.loads(x)
        except Exception:
            return {}

class TacticalCaptioner:
    """
    Analyst-grade captioner:
      - multi-keyframe conditioning
      - few-shot prompt with soccer ontology
      - outputs (a) structured JSON, (b) one-line overlay sentence
      - robust to model availability (Qwen2-VL -> CogVLM2 fallback)
    """
    def __init__(self, fps=24, clip_seconds=5, keyframes=5, max_words=25):
        self.fps = fps
        self.clip_len = max(1, int(clip_seconds * fps))
        self.keyframes = max(2, min(keyframes, 6))
        self.max_words = max_words
        self._buffer = deque(maxlen=self.clip_len)
        self.latest_overlay = "Match analysis is starting..."
        self.device = "cuda" if torch.cuda.is_available() else "cpu"

        # try Qwen2-VL first
        self.backend = None
        self.processor = None
        self.model_name = None
        print("🎙️ Initializing Tactical Captioner (Qwen2-VL preferred)...")
        try:
            from transformers import AutoProcessor
            from transformers import Qwen2VLForConditionalGeneration
            kw = dict()
            if HF_REV: kw["revision"] = HF_REV
            self.processor = AutoProcessor.from_pretrained(PREFERRED_VLM_ID, **kw)
            self.backend = Qwen2VLForConditionalGeneration.from_pretrained(
                PREFERRED_VLM_ID,
                torch_dtype=torch.bfloat16 if torch.cuda.is_available() else torch.float32,
                device_map="auto" if torch.cuda.is_available() else None,
                **kw
            )
            if self.device == "cpu": self.backend.to(self.device)
            self.model_name = "qwen2-vl"
            print("✅ Using Qwen2-VL-7B-Instruct.")
        except Exception as e_qwen:
            print(f"⚠️ Qwen2-VL unavailable: {e_qwen} — falling back to CogVLM2.")
            # fallback: CogVLM2
            try:
                from transformers import AutoModelForCausalLM, AutoTokenizer
                kw = dict(trust_remote_code=True)
                if HF_REV: kw["revision"] = HF_REV
                self.processor = AutoTokenizer.from_pretrained(FALLBACK_VLM_ID, **kw)
                self.backend = AutoModelForCausalLM.from_pretrained(
                    FALLBACK_VLM_ID,
                    torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
                    device_map="auto" if torch.cuda.is_available() else None,
                    **kw
                )
                self.model_name = "cogvlm2"
                print("✅ Using CogVLM2-Llama3-Caption.")
            except Exception as e_cog:
                print(f"❌ Could not initialize any VLM: {e_cog}")
                self.backend = None
                self.processor = None

        # few-shot soccer examples (concise)
        self.fewshot_examples = [
            {
              "context": {
                "time": "12:34", "possession": "Player 8 (Team 1)",
                "stats": {"passes_last_10s":2,"recoveries_last_10s":1,"duels_last_10s":1},
                "zones": {"ball_third":"middle","ball_channel":"right half-space"}
              },
              "events": [
                {"type":"recovery","team":"Team 1","loc":[36.2,18.1]},
                {"type":"carry","player":"8","team":"Team 1","meters":7.5},
                {"type":"pass","subtype":"through-ball","from":"8","to":"11","team":"Team 1","start":[36.2,18.1],"end":[44.6,12.2]}
              ],
              "caption":"Player 8 recovers centrally, drives forward, and threads a through ball to Player 11 running beyond the back line."
            },
            {
              "context": {
                "time":"27:05","possession":"Player 3 (Team 2)",
                "stats":{"passes_last_10s":3,"recoveries_last_10s":0,"duels_last_10s":2},
                "zones":{"ball_third":"final","ball_channel":"left wing"}
              },
              "events":[
                {"type":"duel","outcome":"won","team":"Team 2","loc":[50.8,5.3]},
                {"type":"cross","from":"3","target_zone":"six-yard","height":"low"}
              ],
              "caption":"Left back wins a wide duel and delivers a low cross toward the six-yard area."
            }
        ]

    # ---- public API ----
    def push_frame(self, frame):
        self._buffer.append(frame)

    def maybe_caption(self, frame_idx:int, tracks:Dict, events_df:pd.DataFrame):
        if self.backend is None or len(self._buffer) < self._buffer.maxlen:
            return  # wait until clip filled
        # build structured context using tracking + events_df
        context = self._make_context(frame_idx, tracks, events_df)
        images = self._sample_keyframes(list(self._buffer), self.keyframes)
        try:
            if self.model_name == "qwen2-vl":
                result = self._caption_qwen(images, context)
            else:
                result = self._caption_cog(images, context)
        except Exception as e:
            print(f"caption error: {e}")
            result = {"overlay": self._fallback_overlay(context), "json": {}}
        self.latest_overlay = result.get("overlay", self.latest_overlay)
        self._buffer.clear()

    # ---- context assembly ----
    def _make_context(self, frame_idx:int, tracks:Dict, events_df:pd.DataFrame) -> Dict[str,Any]:
        sec = frame_idx / max(1,self.fps)
        mm = int(sec//60); ss = int(sec%60)
        players = tracks["players"][frame_idx]
        # possession
        poss = None
        for pid, info in players.items():
            if info.get("has_ball"): poss = f"Player {pid} (Team {info.get('team','?')})"; break
        # last 10s window events
        recent = []
        stats = {"passes_last_10s":0,"recoveries_last_10s":0,"duels_last_10s":0,"carries_last_10s":0}
        if events_df is not None and not events_df.empty:
            t_cut = sec - 10
            ev_secs = events_df["minute"]*60 + events_df["second"]
            recent_df = events_df[ev_secs >= t_cut].tail(6)
            for _, r in recent_df.iterrows():
                recent.append(r.to_dict())
                et = r.get("type_name","").lower()
                if "pass" in et: stats["passes_last_10s"] += 1
                if "recovery" in et: stats["recoveries_last_10s"] += 1
                if "duel" in et: stats["duels_last_10s"] += 1
                if "carry" in et: stats["carries_last_10s"] += 1

        # crude zone labels from transformed coords if ball exists
        ball = tracks["ball"][frame_idx].get(1, {})
        zones = {"ball_third":"unknown","ball_channel":"central"}
        pt = ball.get("position_transformed")
        if isinstance(pt,(list,tuple)) and len(pt)==2:
            x,y = pt # pitch: length x (0..52.5), width y (0..34)
            thirds = ["defensive","middle","final"]
            zones["ball_third"] = thirds[min(2, max(0, int((x/52.5)*3)))]
            # channels: left wing, left half-space, central, right half-space, right wing
            if y<34*0.2: zones["ball_channel"]="left wing"
            elif y<34*0.4: zones["ball_channel"]="left half-space"
            elif y<34*0.6: zones["ball_channel"]="central"
            elif y<34*0.8: zones["ball_channel"]="right half-space"
            else: zones["ball_channel"]="right wing"

        return {"time": f"{mm}:{ss:02d}","possession": poss or "Unclear","stats":stats,"zones":zones,"recent":recent}

    # ---- model-specific captioning ----
    def _caption_qwen(self, images:List[np.ndarray], ctx:Dict[str,Any]) -> Dict[str,Any]:
        from transformers import AutoProcessor
        # few-shot chat
        msgs = [{"role":"system","content":"You are a professional football (soccer) tactical analyst. Output JSON then a single overlay sentence."}]
        for ex in self.fewshot_examples:
            msgs.append({"role":"user","content":[{"type":"text","text":self._ctx_to_text(ex['context'])}]})
            msgs.append({"role":"assistant","content":[{"type":"text","text":json.dumps({
                "phase":"possession","key_events":ex["events"],"summary":ex["caption"]
            }, ensure_ascii=False)}]})
        # now the actual clip with multi-images
        user_content = [{"type":"text","text":self._ctx_to_text(ctx)}]
        for img in images:
            rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            user_content.insert(0, {"type":"image","image":Image.fromarray(rgb)})
        # ask for strict schema + overlay
        schema = {
            "phase": "possession/transition/press/defense",
            "key_events":[
                {"type":"pass/carry/cross/through-ball/recovery/duel/interception/shot",
                 "team":"Team 1/Team 2","from":"<id?>","to":"<id?>","subtype":"", "meters":"<float?>",
                 "outcome":"won/lost/completed/blocked","loc":[0,0],"end":[0,0]}
            ],
            "summary":"<single sentence ≤25 words>"
        }
        instruction = (
            "Return TWO parts:\n"
            "1) STRICT JSON matching this schema keys (omit unknown fields, keep lowercase types):\n"
            + json.dumps(schema, ensure_ascii=False) +
            "\n2) Then a newline and an overlay sentence for broadcast.\n"
        )
        msgs.append({"role":"user","content": user_content + [{"type":"text","text":instruction}]})
        inputs = self.processor.apply_chat_template(msgs, add_generation_prompt=True, tokenize=False)
        proc = self.processor(text=inputs, images=[x["image"] for x in user_content if x.get("type")=="image"], return_tensors="pt")
        for k in proc:
            proc[k] = proc[k].to(self.backend.device)  # type: ignore

        with torch.inference_mode():
            out = self.backend.generate(**proc, max_new_tokens=220, do_sample=False, temperature=0.0, eos_token_id=self.processor.tokenizer.eos_token_id)
        text = self.processor.batch_decode(out, skip_special_tokens=True)[0]
        data = safe_json_extract(text)
        overlay = clamp_words(data.get("summary",""), self.max_words) if isinstance(data, dict) else None
        if not overlay:
            # fallback: grab last line; then clamp
            overlay = clamp_words(text.strip().splitlines()[-1], self.max_words)
        return {"json": data, "overlay": overlay}

    def _caption_cog(self, images:List[np.ndarray], ctx:Dict[str,Any]) -> Dict[str,Any]:
        # Build a concise chat for CogVLM2 (single prompt with interleaved images)
        prompt = (
            "You are a professional football (soccer) tactical analyst.\n"
            f"TIME: {ctx['time']}\nPOSSESSION: {ctx['possession']}\n"
            f"ZONES: third={ctx['zones']['ball_third']}, channel={ctx['zones']['ball_channel']}\n"
            f"STATS(last 10s): {ctx['stats']}\n"
            "Identify passes, recoveries, duels, carries, crosses, through-balls, interceptions. "
            "First output a compact JSON with keys phase, key_events[], summary; then one sentence for overlay (≤25 words)."
        )
        # CogVLM2 API expects tokenizer + images in build_conversation_input_ids
        convo = self.backend.build_conversation_input_ids(self.processor, query=prompt,
                                                         images=[Image.fromarray(cv2.cvtColor(im, cv2.COLOR_BGR2RGB)) for im in images],
                                                         template_version='chat')
        inputs = {
            'input_ids': convo['input_ids'].unsqueeze(0).to(self.backend.device),
            'token_type_ids': convo['token_type_ids'].unsqueeze(0).to(self.backend.device),
            'attention_mask': convo['attention_mask'].unsqueeze(0).to(self.backend.device),
            'images': [[t.to(self.backend.device).to(self.backend.dtype) for t in convo['images']]]
        }
        with torch.inference_mode():
            out = self.backend.generate(**inputs, max_new_tokens=220, do_sample=False, temperature=0.0,
                                        pad_token_id=self.processor.eos_token_id, eos_token_id=self.processor.eos_token_id)
        text = self.processor.decode(out[0][inputs['input_ids'].shape[1]:], skip_special_tokens=True)
        data = safe_json_extract(text)
        overlay = clamp_words(data.get("summary",""), self.max_words) if isinstance(data, dict) else clamp_words(text.splitlines()[-1], self.max_words)
        return {"json": data, "overlay": overlay}

    # helpers
    def _ctx_to_text(self, ctx:Dict[str,Any]) -> str:
        return (
            f"Time={ctx['time']}; Possession={ctx['possession']}; "
            f"Zones(third={ctx['zones']['ball_third']}, channel={ctx['zones']['ball_channel']}); "
            f"RecentStats={ctx['stats']}; RecentEvents(up to 3)={[(e.get('type_name'), e.get('team_name')) for e in ctx.get('recent', [])][-3:]}"
        )

    def _sample_keyframes(self, frames:List[np.ndarray], k:int) -> List[np.ndarray]:
        n = len(frames); 
        if k >= n: return frames
        step = n / float(k+1)
        idxs = [min(n-1, max(0, int((i+1)*step)-1)) for i in range(k)]
        return [frames[i] for i in idxs]

    def _fallback_overlay(self, ctx):
        if ctx.get("possession","Unclear") != "Unclear":
            return f"{ctx['possession']} retains possession and progresses play."
        return "Play develops; possession unclear."


In [25]:
class EventDetector:
    """
    Lightweight heuristics to seed the captioner with grounded facts.
    Relies on transformed pitch coords (meters) when available.
    """
    def __init__(self, frame_rate=24):
        self.frame_rate = frame_rate
        self.max_assign_dist_px = 70  # already used upstream

        # thresholds (meters / seconds)
        self.carry_min_m = 4.0
        self.pass_min_end_dist_m = 5.0
        self.throughball_min_dx_m = 8.0   # forward progress
        self.cross_y_band = (52.5*0.66, 52.5)  # into final third (approx box area in x)
        self.wing_band = 34*0.2

    def _dist(self, a, b):
        return math.hypot(a[0]-b[0], a[1]-b[1])

    def detect_events(self, tracks:Dict[str,Any]) -> pd.DataFrame:
        n = len(tracks['players'])
        # 1) ball possession per frame (you already tag has_ball)
        poss = []
        for t in range(n):
            pid = -1
            for p, info in tracks['players'][t].items():
                if info.get("has_ball"): pid = p; break
            poss.append(pid)

        events = []
        last_pid = -1
        last_pos_trans = None
        last_pid_pos_trans = None
        last_possess_frame = 0

        for t in range(n):
            ball_info = tracks['ball'][t].get(1, {})
            bpos = ball_info.get("position_transformed")
            if not bpos: 
                continue
            pid = poss[t]

            # carries: same player keeps ball and moves enough distance
            if pid != -1 and pid == last_pid:
                if last_pid_pos_trans:
                    moved = self._dist(bpos, last_pid_pos_trans)
                    dt = (t - last_possess_frame) / self.frame_rate
                    if moved >= self.carry_min_m and dt >= 0.5:
                        events.append(dict(
                            type_name="Carry", player_name=f"Player_{pid}",
                            team_name=f"Team {tracks['players'][t][pid].get('team', '?')}",
                            x=last_pid_pos_trans[0], y=last_pid_pos_trans[1],
                            end_x=bpos[0], end_y=bpos[1],
                            minute=int(t/(self.frame_rate*60)), second=int((t/self.frame_rate)%60)
                        ))
                        last_possess_frame = t

            # possession changes -> recovery/duel/interception/pass outcome
            if pid != last_pid:
                # recovery if previous frame had no owner (-1) then now someone does
                if last_pid == -1 and pid != -1:
                    events.append(dict(
                        type_name="Recovery", player_name=f"Player_{pid}",
                        team_name=f"Team {tracks['players'][t][pid].get('team','?')}",
                        x=bpos[0], y=bpos[1], end_x=bpos[0], end_y=bpos[1],
                        minute=int(t/(self.frame_rate*60)), second=int((t/self.frame_rate)%60)
                    ))
                # duel if two opponents within ~1.5m of ball around switch
                elif last_pid != -1 and pid != -1:
                    last_team = tracks['players'][t].get(last_pid, {}).get('team')
                    this_team = tracks['players'][t].get(pid, {}).get('team')
                    if last_team is not None and this_team is not None and last_team != this_team:
                        # crude proximity check
                        last_pos = tracks['players'][t].get(last_pid, {}).get('position_transformed')
                        new_pos = tracks['players'][t].get(pid, {}).get('position_transformed')
                        if last_pos and new_pos and (self._dist(last_pos, bpos) < 1.5 or self._dist(new_pos, bpos) < 1.5):
                            events.append(dict(
                                type_name="Duel (won)", player_name=f"Player_{pid}",
                                team_name=f"Team {this_team}", x=bpos[0], y=bpos[1],
                                end_x=bpos[0], end_y=bpos[1],
                                minute=int(t/(self.frame_rate*60)), second=int((t/self.frame_rate)%60)
                            ))

            # passes: owner changes within same team after flight
            if pid != -1 and last_pid != -1 and pid != last_pid:
                team_a = tracks['players'][t].get(pid, {}).get('team')
                team_b = tracks['players'][t].get(last_pid, {}).get('team')
                if team_a is not None and team_b is not None and team_a == team_b and last_pos_trans:
                    travel = self._dist(last_pos_trans, bpos)
                    if travel >= self.pass_min_end_dist_m:
                        subtype = "through-ball" if (bpos[0]-last_pos_trans[0]) > self.throughball_min_dx_m else "pass"
                        # cross: from wing into final third
                        wing = (last_pos_trans[1] < self.wing_band) or (last_pos_trans[1] > 34 - self.wing_band)
                        into_final = bpos[0] >= self.cross_y_band[0]
                        if wing and into_final:
                            subtype = "cross"
                        events.append(dict(
                            type_name="Pass" if subtype=="pass" else subtype.title(),
                            player_name=f"Player_{last_pid}", team_name=f"Team {team_a}",
                            x=last_pos_trans[0], y=last_pos_trans[1],
                            end_x=bpos[0], end_y=bpos[1],
                            minute=int(t/(self.frame_rate*60)), second=int((t/self.frame_rate)%60)
                        ))

            last_pid_pos_trans = bpos if pid != -1 else last_pid_pos_trans
            last_pos_trans = bpos
            if pid != -1:
                last_pid = pid
                last_possess_frame = t

        return pd.DataFrame(events)


In [26]:
# Cell 6: Main Function (Updated for TacticalCaptioner + richer captions)
def main():
    # --- SETUP ---
    INPUT_VIDEO_PATH = "/kaggle/input/football-video2/CityUtdR.mp4"
    STUB_PATH = "/kaggle/working/tracks_stub.pkl"
    OUTPUT_VIDEO_PATH = "/kaggle/working/final_analysis_video-qwen.mp4"

    frames = read_video(INPUT_VIDEO_PATH)
    if not frames:
        print("Video file not found or could not be read. Check the path.")
        return None

    cap = cv2.VideoCapture(INPUT_VIDEO_PATH)
    fps = cap.get(cv2.CAP_PROP_FPS) or 24
    cap.release()

    # --- INITIALIZE ALL MODULES ---
    tracker = Tracker('yolov8x.pt')
    captioner = TacticalCaptioner(fps=fps, clip_seconds=5, keyframes=5, max_words=25)
    camera_estimator = CameraMovementEstimator(frames[0])
    view_transformer = ViewTransformer()
    speed_estimator = SpeedAndDistanceEstimator()
    team_assigner = TeamAssigner()
    player_assigner = PlayerBallAssigner()
    ticker = RealTimeTicker(fps=fps)

    # --- STAGE 1: TRACKING ---
    print("Stage 1: Performing object detection and tracking...")
    tracks = tracker.get_object_tracks(frames, read_from_stub=False, stub_path=STUB_PATH)
    tracks["ball"] = tracker.interpolate_ball_positions(tracks["ball"])
    tracker.add_position_to_tracks(tracks)

    # --- STAGE 2: MOTION & PERSPECTIVE ---
    print("Stage 2: Estimating camera motion and transforming perspective...")
    camera_movement = camera_estimator.get_camera_movement(frames)
    camera_estimator.add_adjust_positions_to_tracks(tracks, camera_movement)
    view_transformer.add_transformed_position_to_tracks(tracks)
    speed_estimator.add_speed_and_distance_to_tracks(tracks)

    # --- STAGE 3: TEAM ASSIGNMENT ---
    print("Stage 3: Assigning teams...")
    team_assigner.assign_team_color(frames[0], tracks['players'][0])
    for frame_num, frame in enumerate(frames):
        player_track = tracks['players'][frame_num]
        for player_id, track in player_track.items():
            team = team_assigner.get_player_team(frame, track['bbox'], player_id)
            tracks['players'][frame_num][player_id]['team'] = team
            tracks['players'][frame_num][player_id]['team_color'] = team_assigner.team_colors.get(team, (0,0,255))

    # --- STAGE 4: GENERATE EVENTS DATA ---
    print("Stage 4: Detecting events for commentary context...")
    event_detector = EventDetector(frame_rate=fps)
    events_df = event_detector.detect_events(tracks)
    print(f"Detected {len(events_df)} events for commentary context")

    # --- STAGE 5: BALL POSSESSION, TICKER & TACTICAL CAPTIONS ---
    print("Stage 5: Tracking ball possession and generating all commentary...")
    team_ball_control = []
    ticker_history = []
    overlay_history = []      # one-line sentences from TacticalCaptioner
    # (optional) collect machine-readable JSON per clip if you want to save later
    # caption_json_history = []

    for frame_num, frame in enumerate(frames):
        player_track = tracks['players'][frame_num]
        ball_bbox = tracks['ball'][frame_num].get(1, {}).get('bbox')

        # reset has_ball flags
        for pid in player_track:
            tracks['players'][frame_num][pid]['has_ball'] = False

        # assign ball to nearest player
        assigned_player = player_assigner.assign_ball_to_player(player_track, ball_bbox)
        if assigned_player != -1:
            tracks['players'][frame_num][assigned_player]['has_ball'] = True
            team_ball_control.append(tracks['players'][frame_num][assigned_player]['team'])
        else:
            team_ball_control.append(team_ball_control[-1] if team_ball_control else 0)

        # live ticker
        ticker_history.append(ticker.update(tracks, frame_num))

        # push frame to the tactical captioner and let it emit overlay when clip window fills
        captioner.push_frame(frame)
        captioner.maybe_caption(frame_num, tracks, events_df)
        overlay_history.append(captioner.latest_overlay)

        if frame_num % 100 == 0:
            print(f"Commentary progress: {frame_num}/{len(frames)} frames")

    team_ball_control = np.array(team_ball_control)

    # --- STAGE 6: VISUALIZATION & SAVING ---
    print("Stage 6: Combining commentary and saving final video...")
    # start from ticker, then backfill segments with tactical overlays when they update
    display_commentary = ticker_history.copy()
    last_overlay = overlay_history[0] if overlay_history else " "
    for i, overlay in enumerate(overlay_history):
        if overlay != last_overlay:
            start_frame = max(0, i - captioner.clip_len)  # spread overlay over the last clip span
            for j in range(start_frame, i):
                if j < len(display_commentary):
                    display_commentary[j] = overlay
            last_overlay = overlay

    output_frames = []
    for frame_num, frame in enumerate(frames):
        frame_copy = frame.copy()
        current_commentary = display_commentary[frame_num] if frame_num < len(display_commentary) else " "

        player_dict = tracks["players"][frame_num]
        ball_dict = tracks.get("ball", [])[frame_num]

        for track_id, player in player_dict.items():
            color = player.get("team_color", (0, 0, 255))
            frame_copy = tracker._draw_player_ellipse(frame_copy, player["bbox"], color, track_id, player.get("jersey_number"))
            if player.get('has_ball', False):
                frame_copy = tracker._draw_triangle(frame_copy, player["bbox"], (0, 0, 255))

        if 1 in ball_dict:
            frame_copy = tracker._draw_triangle(frame_copy, ball_dict[1]["bbox"], (0, 255, 0))

        frame_copy = tracker._draw_team_ball_control(frame_copy, frame_num, team_ball_control)
        frame_copy = tracker._draw_commentary_overlay(frame_copy, current_commentary)
        output_frames.append(frame_copy)

    output_frames = speed_estimator.draw_speed_and_distance(output_frames, tracks)
    save_video(output_frames, OUTPUT_VIDEO_PATH)

    # --- FINAL STATISTICS ---
    print("\n" + "="*50)
    print("MATCH ANALYSIS COMPLETE")
    print("="*50)
    print(f"✅ Video saved to: {OUTPUT_VIDEO_PATH}")

if __name__ == "__main__":
    main()


✅ Jersey OCR module initialized.
🎙️ Initializing Tactical Captioner (Qwen2-VL preferred)...


preprocessor_config.json:   0%|          | 0.00/347 [00:00<?, ?B/s]

tokenizer_config.json: 0.00B [00:00, ?B/s]

vocab.json: 0.00B [00:00, ?B/s]

merges.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

chat_template.json: 0.00B [00:00, ?B/s]

config.json: 0.00B [00:00, ?B/s]

model.safetensors.index.json: 0.00B [00:00, ?B/s]

Fetching 2 files:   0%|          | 0/2 [00:00<?, ?it/s]

model-00001-of-00002.safetensors:   0%|          | 0.00/3.99G [00:00<?, ?B/s]

model-00002-of-00002.safetensors:   0%|          | 0.00/429M [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/272 [00:00<?, ?B/s]

✅ Using Qwen2-VL-7B-Instruct.
Stage 1: Performing object detection and tracking...
Processing frame 0/268
Processing frame 20/268
Processing frame 40/268
Processing frame 60/268
Processing frame 80/268
Processing frame 100/268
Processing frame 120/268
Processing frame 140/268
Processing frame 160/268
Processing frame 180/268
Processing frame 200/268
Processing frame 220/268
Processing frame 240/268
Processing frame 260/268
Stage 2: Estimating camera motion and transforming perspective...
Stage 3: Assigning teams...
Stage 4: Detecting events for commentary context...
Detected 0 events for commentary context
Stage 5: Tracking ball possession and generating all commentary...
Commentary progress: 0/268 frames
Commentary progress: 100/268 frames


The following generation flags are not valid and may be ignored: ['temperature', 'top_p', 'top_k']. Set `TRANSFORMERS_VERBOSITY=info` for more details.


Commentary progress: 200/268 frames


The following generation flags are not valid and may be ignored: ['temperature', 'top_p', 'top_k']. Set `TRANSFORMERS_VERBOSITY=info` for more details.


Stage 6: Combining commentary and saving final video...

MATCH ANALYSIS COMPLETE
✅ Video saved to: /kaggle/working/final_analysis_video-qwen.mp4
