# 🧠 SPAI: Sponsor Visibility Analysis
This notebook uses a trained YOLOv8 model to detect sponsor logos in football match frames, calculates their visibility metrics, and uploads the data to a Supabase database.

### Key Features:
- Detect sponsor logos in frames using YOLOv8
- Track logos across frames
- Calculate position- and size-based KPI scores
- Export results to Supabase

> ⚠️ Before running, create a `.env` file that defines `SUPABASE_URL` and `SUPABASE_KEY` with your Supabase credentials. Keep this file private — do not commit or share it publicly.

In [None]:
# Install the packages imported by this notebook (-q keeps pip output quiet).
# The leading "!" is notebook shell syntax, not Python.
!pip install -q supabase ultralytics opencv-python-headless numpy pandas python-dotenv

In [None]:
import os
import cv2
import time
import json
import numpy as np
import pandas as pd
from ultralytics import YOLO
from supabase import create_client
from collections import defaultdict
from dotenv import load_dotenv
from google.colab import drive

In [None]:
# Mount Google Drive at /content/drive; the trained model weights are loaded from a path under this mount point
drive.mount('/content/drive')

In [None]:
# Load Supabase credentials from the .env file into the process environment
load_dotenv()
# os.getenv returns None for a variable that is not set
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_KEY")
# Module-level client shared by the functions that write results and update game status
supabase = create_client(SUPABASE_URL, SUPABASE_KEY)

In [None]:
# Load the YOLOv8 model from the weights file on the mounted Drive.
# `model` is a module-level global that process_video uses for detection and class names.
MODEL_PATH = '/content/drive/MyDrive/YOLOv8_models/barca_t_shirt_detection/weights/best.pt'
model = YOLO(MODEL_PATH)

In [None]:
def calculate_position_score(x_center, y_center, frame_width, frame_height):
    """
    Score how close a point lies to the middle of the frame.

    Each axis offset from the frame center is rescaled to [-1, 1], the two
    offsets are combined into a Euclidean distance, and that distance is
    capped at 1.0. The score then falls linearly from 1.0 (dead center)
    down to 0.4 (any point whose distance reaches the cap, corners included).
    """
    # Signed offset from the frame center on each axis, in [-1, 1]
    horizontal_offset = 2 * (x_center / frame_width - 0.5)
    vertical_offset = 2 * (y_center / frame_height - 0.5)

    # Straight-line distance from the center, never allowed above 1.0
    radial = np.sqrt(horizontal_offset**2 + vertical_offset**2)
    capped = radial if radial < 1.0 else 1.0

    # Linear falloff: 0 -> 1.0, 1 -> 0.4
    return 1.0 - (0.6 * capped)

def determine_position_category(x_center, y_center, frame_width, frame_height):
    """
    Classify a point by the region of the frame it falls in.

    Returns:
        "center" when both relative coordinates lie in [0.3, 0.7],
        "corner" when both lie in the outer 20% band of their axis,
        "edge" for everything else.
    """
    # Position as a fraction of the frame size on each axis
    x_rel = x_center / frame_width
    y_rel = y_center / frame_height

    if 0.3 <= x_rel <= 0.7 and 0.3 <= y_rel <= 0.7:
        return "center"

    # A corner is a point close to a border on both axes at once
    near_side_border = x_rel < 0.2 or x_rel > 0.8
    near_top_or_bottom = y_rel < 0.2 or y_rel > 0.8
    if near_side_border and near_top_or_bottom:
        return "corner"

    return "edge"

def determine_size_category(area_percentage):
    """
    Bucket a bounding box by the share of the frame it covers.

    Returns:
        "small" below 1 percent, "medium" from 1 up to (not including)
        5 percent, and "large" otherwise.
    """
    # Upper bounds are exclusive and checked in ascending order
    thresholds = ((1, "small"), (5, "medium"))
    for upper_bound, label in thresholds:
        if area_percentage < upper_bound:
            return label
    return "large"

def _new_sequence(timestamp, position_score, area_percentage, sponsor_score):
    """Start the running-average record for one continuous appearance of a logo."""
    return {
        "start_time": timestamp,
        "end_time": timestamp,
        "avg_position_score": position_score,
        "avg_area": area_percentage,
        "avg_sponsor_score": sponsor_score,
        "detection_count": 1
    }


def _timeline_entry(game_id, logo_name, seq):
    """Build a logo_timeline row from a finished appearance sequence."""
    return {
        "game_id": game_id,
        "logo_name": logo_name,
        "timestamp": round(seq["start_time"], 2),  # Start time of appearance
        "sponsor_score": round(seq["avg_sponsor_score"], 2)
    }


def _summarize_logo(game_id, logo_name, data, fps, sampling_rate):
    """Aggregate one logo's accumulated detection stats into a logo_metrics row."""
    # Several detections of the same logo in one sampled frame must count
    # once towards visibility time, so work on the distinct frame indices.
    frames = sorted(set(data["frames"]))

    # Each sampled frame stands in for `sampling_rate` frames of video.
    seconds_per_sample = sampling_rate / fps

    # Group frames into sequences, tolerating one missed sample between hits
    sequence_durations = []
    run_length = 0
    previous_frame = None
    for frame in frames:
        if previous_frame is None or frame <= previous_frame + sampling_rate * 2:
            run_length += 1
        else:
            sequence_durations.append(run_length * seconds_per_sample)
            run_length = 1
        previous_frame = frame
    if run_length:
        sequence_durations.append(run_length * seconds_per_sample)

    visibility_time = sum(sequence_durations)
    avg_sequence_duration = visibility_time / len(sequence_durations) if sequence_durations else 0

    appearances = data["appearances"]
    avg_area = data["total_area"] / appearances if appearances > 0 else 0
    avg_position_score = data["total_position_score"] / appearances if appearances > 0 else 0

    def share(count, total):
        return (count / total * 100) if total > 0 else 0

    position_counts = data["position_counts"]
    size_counts = data["size_counts"]
    total_positions = sum(position_counts.values())
    total_sizes = sum(size_counts.values())

    # Sponsorship value (example formula)
    size_weight = min(2.0, 0.5 + avg_area / 10)  # Size weight: 0.5-2.0
    position_weight = avg_position_score  # Position weight: 0.4-1.0
    rate = 100  # Arbitrary base rate per second
    value = visibility_time * rate * size_weight * position_weight

    return {
        "game_id": game_id,
        "logo_name": logo_name,
        "visibility_time": round(visibility_time, 2),
        "appearances": appearances,
        "unique_appearances": len(sequence_durations),
        "avg_sequence_duration": round(avg_sequence_duration, 2),
        "avg_area_percentage": round(avg_area, 2),
        "avg_position_score": round(avg_position_score, 2),
        # Ties resolve to the first key in insertion order
        "dominant_position": max(position_counts, key=position_counts.get),
        "dominant_size": max(size_counts, key=size_counts.get),
        "center_percentage": round(share(position_counts["center"], total_positions), 2),
        "edge_percentage": round(share(position_counts["edge"], total_positions), 2),
        "corner_percentage": round(share(position_counts["corner"], total_positions), 2),
        "small_percentage": round(share(size_counts["small"], total_sizes), 2),
        "medium_percentage": round(share(size_counts["medium"], total_sizes), 2),
        "large_percentage": round(share(size_counts["large"], total_sizes), 2),
        "sponsorship_value": round(value, 2)
    }


def process_video(game_id, video_path, sampling_rate=30):
    """
    Process video to detect sponsor logos and save the results to Supabase.

    Every `sampling_rate`-th frame is run through the module-level YOLO
    `model`. Each detection is recorded individually, added to a per-logo
    heatmap, folded into per-logo aggregate metrics, and grouped into
    continuous appearance sequences for the timeline. The results are then
    handed to save_to_supabase.

    Args:
        game_id: ID of the game in the database
        video_path: Path to the video file
        sampling_rate: Process every Nth frame (default: 30, about 1 frame per second for 30fps videos)

    Returns:
        (detections_df, metrics_df, timeline_df, heatmap_data) on success,
        or None when the video file is missing or cannot be opened.

    Raises:
        ValueError: if sampling_rate is not positive, or the video reports
            a frame rate that is not a positive number.
    """
    if sampling_rate <= 0:
        raise ValueError(f"sampling_rate must be positive, got {sampling_rate}")

    print(f"Processing game ID: {game_id}")
    print(f"Video path: {video_path}")

    # Open the video file
    if not os.path.exists(video_path):
        print(f"Video file not found: {video_path}")
        return

    # Detection results
    all_detections = []         # For logo_detections table
    logo_appearances = {}       # For tracking and calculating metrics
    heatmap_data = {}           # For logo_heatmaps table
    timeline_data = []          # For logo_timeline table

    # Tracking for continuous sequences
    continuous_sequences = {}   # Track continuous appearances for each logo

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            print(f"Error opening video: {video_path}")
            return

        # Get video properties
        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        print(f"Video properties: {frame_width}x{frame_height}, {fps} fps, {total_frames} frames")

        # `not fps > 0` also rejects NaN; every timestamp is divided by fps
        if not fps > 0:
            raise ValueError(f"Video reports an invalid frame rate ({fps}): {video_path}")

        frame_area = frame_width * frame_height
        # A logo unseen for this many seconds (2 sampling intervals) ends its sequence
        max_gap = (sampling_rate * 2) / fps

        # Process frames
        frame_count = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # Process every Nth frame
            if frame_count % sampling_rate == 0:
                # Calculate timestamp in seconds
                timestamp = frame_count / fps

                # Run YOLOv8 detection
                results = model(frame, conf=0.4)

                # Track logos present in this frame
                logos_in_frame = set()

                # Process each detection
                for result in results:
                    for box, score, cls_id in zip(result.boxes.xyxy, result.boxes.conf, result.boxes.cls):
                        x1, y1, x2, y2 = box.cpu().numpy().tolist()
                        confidence = score.item()
                        logo_name = model.names[int(cls_id.item())]

                        logos_in_frame.add(logo_name)

                        # Calculate center of bounding box
                        x_center = (x1 + x2) / 2
                        y_center = (y1 + y2) / 2

                        # Calculate metrics
                        position_score = float(calculate_position_score(x_center, y_center, frame_width, frame_height))
                        position_category = determine_position_category(x_center, y_center, frame_width, frame_height)

                        bbox_area = (x2 - x1) * (y2 - y1)
                        area_percentage = (bbox_area / frame_area) * 100
                        size_category = determine_size_category(area_percentage)

                        sponsor_score = position_score * area_percentage

                        # Create detection record (logo_detections table)
                        all_detections.append({
                            "game_id": game_id,
                            "timestamp": round(timestamp, 2),
                            "logo_name": logo_name,
                            "bbox": [float(x1), float(y1), float(x2), float(y2)],
                            "confidence": float(confidence),
                            "position_score": float(position_score),
                            "area_percentage": float(area_percentage),
                            "sponsor_score": float(sponsor_score),
                            "position_category": position_category,
                            "size_category": size_category
                        })

                        # Add to heatmap data (logo_heatmaps table), using the normalized center
                        heatmap_data.setdefault(logo_name, []).append({
                            "x": float(x_center / frame_width),
                            "y": float(y_center / frame_height),
                            "score": float(sponsor_score)
                        })

                        # Track logo appearances for metrics
                        stats = logo_appearances.setdefault(logo_name, {
                            "appearances": 0,
                            "total_area": 0,
                            "total_position_score": 0,
                            "frames": [],
                            "position_counts": {"center": 0, "edge": 0, "corner": 0},
                            "size_counts": {"small": 0, "medium": 0, "large": 0}
                        })
                        stats["frames"].append(frame_count)
                        stats["total_area"] += area_percentage
                        stats["total_position_score"] += position_score
                        stats["appearances"] += 1
                        stats["position_counts"][position_category] += 1
                        stats["size_counts"][size_category] += 1

                        # Update continuous sequence tracking for timeline
                        seq = continuous_sequences.get(logo_name)
                        if seq is not None and timestamp - seq["end_time"] < max_gap:
                            # Logo was seen in recent frames: extend the sequence and
                            # update its running averages
                            seen = seq["detection_count"]
                            seq["end_time"] = timestamp
                            seq["avg_position_score"] = (seq["avg_position_score"] * seen + position_score) / (seen + 1)
                            seq["avg_area"] = (seq["avg_area"] * seen + area_percentage) / (seen + 1)
                            seq["avg_sponsor_score"] = (seq["avg_sponsor_score"] * seen + sponsor_score) / (seen + 1)
                            seq["detection_count"] = seen + 1
                        else:
                            if seq is not None:
                                # Previous sequence ended, create its timeline entry
                                timeline_data.append(_timeline_entry(game_id, logo_name, seq))
                            # Start a new sequence
                            continuous_sequences[logo_name] = _new_sequence(
                                timestamp, position_score, area_percentage, sponsor_score
                            )

                # Close sequences of logos that have been absent for at least 2 sampling intervals
                for logo_name in list(continuous_sequences):
                    seq = continuous_sequences[logo_name]
                    if logo_name not in logos_in_frame and timestamp - seq["end_time"] >= max_gap:
                        timeline_data.append(_timeline_entry(game_id, logo_name, seq))
                        del continuous_sequences[logo_name]

                if frame_count % 300 == 0:  # Show progress every ~10 seconds
                    if total_frames > 0:
                        print(f"Processed frame {frame_count}/{total_frames} ({frame_count/total_frames*100:.1f}%)")
                    else:
                        # Frame count was reported as 0, so no percentage can be shown
                        print(f"Processed frame {frame_count}")

            frame_count += 1
    finally:
        # Release the capture even when detection or validation raises
        cap.release()

    # Add any remaining sequences to timeline
    for logo_name, seq in continuous_sequences.items():
        timeline_data.append(_timeline_entry(game_id, logo_name, seq))

    # Calculate aggregated metrics
    logo_metrics = [
        _summarize_logo(game_id, logo_name, data, fps, sampling_rate)
        for logo_name, data in logo_appearances.items()
    ]

    # Convert to DataFrames
    detections_df = pd.DataFrame(all_detections)
    metrics_df = pd.DataFrame(logo_metrics)
    timeline_df = pd.DataFrame(timeline_data)

    # Save results to Supabase
    save_to_supabase(game_id, detections_df, metrics_df, timeline_df, heatmap_data)

    print(f"Completed processing game {game_id}")
    print(f"Detected {len(all_detections)} logo instances")
    print(f"Found {len(logo_metrics)} unique logos")
    print(f"Created {len(timeline_data)} timeline entries")

    return detections_df, metrics_df, timeline_df, heatmap_data

def _insert_in_batches(table_name, records, label, batch_size=1000):
    """Insert records into a Supabase table in batches to avoid payload limits."""
    for start in range(0, len(records), batch_size):
        supabase.table(table_name).insert(records[start:start + batch_size]).execute()
        print(f"Saved batch {start // batch_size + 1} of {label}")


def _backup_dataframe(df, path, label):
    """Write a DataFrame to a local CSV file as a fallback when the upload fails."""
    try:
        df.to_csv(path, index=False)
        print(f"Saved {label} to {path}")
    except OSError as e:
        print(f"Error writing backup file {path}: {str(e)}")


def save_to_supabase(game_id, detections_df, metrics_df, timeline_df, heatmap_data):
    """
    Save all detection data to Supabase and update the game's status.

    Each table is written independently. When a table cannot be written, the
    error is printed and that table's data is saved to a local CSV/JSON file
    in the working directory instead. The game is marked "processed" only
    when every table was written; otherwise it is marked "error" with a
    message naming the tables that failed.

    Args:
        game_id: ID of the game the data belongs to
        detections_df: DataFrame of rows for the logo_detections table
        metrics_df: DataFrame of rows for the logo_metrics table
        timeline_df: DataFrame of rows for the logo_timeline table
        heatmap_data: dict mapping logo name to its list of heatmap positions
    """
    failed_tables = []

    # 1. Save detailed detections
    try:
        _insert_in_batches("logo_detections", detections_df.to_dict('records'), "detections")
    except Exception as e:
        print(f"Error saving logo_detections: {str(e)}")
        failed_tables.append("logo_detections")
        _backup_dataframe(detections_df, f"detections_{game_id}.csv", "detections")

    # 2. Save aggregated metrics
    try:
        metrics_records = metrics_df.to_dict('records')
        # Nothing was detected: skip the request rather than send an empty payload
        if metrics_records:
            supabase.table("logo_metrics").insert(metrics_records).execute()
        print("Saved logo metrics")
    except Exception as e:
        print(f"Error saving logo_metrics: {str(e)}")
        failed_tables.append("logo_metrics")
        _backup_dataframe(metrics_df, f"metrics_{game_id}.csv", "metrics")

    # 3. Save timeline data
    try:
        _insert_in_batches("logo_timeline", timeline_df.to_dict('records'), "timeline entries")
    except Exception as e:
        print(f"Error saving logo_timeline: {str(e)}")
        failed_tables.append("logo_timeline")
        _backup_dataframe(timeline_df, f"timeline_{game_id}.csv", "timeline")

    # 4. Save heatmap data (one row per logo)
    try:
        for logo_name, positions in heatmap_data.items():
            heatmap_entry = {
                "game_id": game_id,
                "logo_name": logo_name,
                "positions": positions
            }
            supabase.table("logo_heatmaps").insert(heatmap_entry).execute()
        print("Saved heatmap data")
    except Exception as e:
        print(f"Error saving logo_heatmaps: {str(e)}")
        failed_tables.append("logo_heatmaps")
        try:
            with open(f"heatmap_{game_id}.json", 'w') as f:
                json.dump(heatmap_data, f)
            print(f"Saved heatmap data to heatmap_{game_id}.json")
        except (OSError, TypeError) as backup_error:
            print(f"Error writing backup file heatmap_{game_id}.json: {str(backup_error)}")

    # 5. Update game status; only report success when every table was written
    if failed_tables:
        status = "error"
        # Truncate long error messages
        status_message = f"Error: failed to save {', '.join(failed_tables)}; local backup files written"[:200]
    else:
        status = "processed"
        status_message = "Processing completed successfully"

    try:
        supabase.table("games").update({
            "status": status,
            "status_message": status_message
        }).eq("id", game_id).execute()
        print(f"Updated game {game_id} status to '{status}'")
    except Exception as e:
        print(f"Error updating game status: {str(e)}")


In [None]:
def _mark_game_error(game_id, message):
    """Set a game's status to "error"; a failed update is printed, not raised."""
    try:
        supabase.table("games").update({
            "status": "error",
            "status_message": f"Error: {message[:200]}"  # Truncate long error messages
        }).eq("id", game_id).execute()
    except Exception as e:
        # One failed status update must not stop the remaining games
        print(f"Error updating game status: {str(e)}")


def main():
    """
    Process every pending game.

    Each game is processed independently: an exception while processing one
    game is printed, the game is marked "error", and the loop moves on to
    the next game. A game whose video is missing or cannot be opened
    (process_video returns None) is marked "error" as well.
    """
    # NOTE(review): fetch_pending_games is not defined in the visible part of
    # this notebook - confirm it is defined in another cell before running.
    pending_games = fetch_pending_games()
    print(f"Found {len(pending_games)} pending games to process")

    if not pending_games:
        print("No pending games found")
        return

    for game in pending_games:
        game_id = game["id"]

        try:
            # The video_path lookup is inside the try so a malformed row only fails this game
            result = process_video(game_id, game["video_path"])
        except Exception as e:
            print(f"Error processing game {game_id}: {str(e)}")
            # Update status to "error" in case of failure
            _mark_game_error(game_id, str(e))
            continue

        if result is None:
            # process_video printed the reason and returned without processing anything
            _mark_game_error(game_id, "Video file not found or could not be opened")


In [None]:
# Entry point: process all pending games when this code is executed directly
if __name__ == "__main__":
    main()