In [4]:
import pandas as pd
import numpy as np
import ast
import torch
import random
from dataclasses import dataclass
from typing import List, Dict, Optional
import logging
from enum import IntEnum

In [5]:
# Configure logging for clarity
# Root logging setup: timestamped INFO-level records, then a named logger
# shared by every cell in this notebook.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger("soccer_analytics")

In [6]:
# === Tag Definitions as IntEnum for better readability ===
class EventTag(IntEnum):
    """Numeric event-tag identifiers matched against each event's parsed ``tags`` list.

    Members compare equal to plain ints, so ``EventTag.GOAL in [101, 1801]``
    is True.

    Aliasing caveat: ``IntEnum`` turns a repeated value into an *alias* of the
    first member that used it. Consequently ``ACCURATE_PASS`` is the same
    object as ``PENALTY`` (both 1801) and ``INTERCEPTION`` is the same object
    as ``HIGH_PRESS`` (both 1401); a membership test for one is a membership
    test for the other.

    NOTE(review): in the Wyscout tag table 1801 means "accurate" and 1401
    means "interception"; there does not appear to be a dedicated penalty or
    high-press tag, so ``PENALTY`` / ``HIGH_PRESS`` checks will fire on every
    accurate event / interception. Several other IDs (MISS, SAVE, POST, BAR,
    TACKLE, DANGEROUS_ATTACK, PASS_INTO_FINAL_THIRD) also look inconsistent
    with that table -- confirm against the dataset's tags2name mapping.
    """
    GOAL = 101
    MISS = 102
    SAVE = 121
    PENALTY = 1801
    POST = 2001
    BAR = 2002
    ACCURATE_PASS = 1801  # alias of PENALTY (same value)
    THROUGH_PASS = 901
    KEY_PASS = 302
    HIGH_PRESS = 1401
    INTERCEPTION = 1401  # alias of HIGH_PRESS (same value)
    COUNTER_ATTACK = 1901
    # Fixed: the two card IDs were swapped. Wyscout uses 1701 for a red card
    # and 1702 for a yellow card.
    RED_CARD = 1701
    YELLOW_CARD = 1702
    TACKLE = 401
    DANGEROUS_ATTACK = 2101
    PASS_INTO_FINAL_THIRD = 1105

In [7]:
# === Configuration Dataclass ===
@dataclass
class AnalyticsConfig:
    """Thresholds and sampling rates that steer the analytics engine.

    ``event_probabilities`` maps an insight category to the chance (0..1)
    that a qualifying event is reported. When it is left as ``None`` the
    built-in table below is installed after initialisation; a caller-supplied
    mapping is stored untouched (it is not merged with the defaults).
    """
    long_range_threshold: float = 25.0
    exceptional_threshold: float = 35.0
    aggregation_window: int = 15  # seconds
    match_limit: int = 10
    event_probabilities: Dict[str, float] = None

    def __post_init__(self):
        # A caller-provided table wins; nothing to do in that case.
        if self.event_probabilities is not None:
            return

        # Default probabilities for event filtering based on professional insights
        self.event_probabilities = dict(
            penalty=1.0,              # Always show penalties
            exceptional_goal=1.0,     # Always show exceptional goals
            high_xg_miss=0.8,         # Important insights on high expected goal misses
            tactical_pattern=0.7,     # Tactical sequences reflect strategic play
            key_pass=0.6,             # Key passes for creative play analysis
            high_press_success=0.5,   # Successful pressing events
            defensive_breakdown=0.5,  # Critical defensive breakdowns
            counter_attack=0.5,       # Fast break opportunities
            set_piece=0.7,            # Set piece opportunities
            third_transition=0.4,     # Final third entries highlighting attacking patterns
            goal_sequence=0.8,        # Goal build-ups
            card=0.7,                 # Disciplinary events
        )

In [8]:
# === Data Models ===
@dataclass
class Player:
    """A squad member: identifier, display name, role code and team id."""
    id: int
    name: str
    role: str
    team_id: int

    @property
    def is_goalkeeper(self) -> bool:
        """Whether the role code equals "GK" once upper-cased."""
        role_code = self.role.upper()
        return role_code == "GK"

@dataclass
class Team:
    """A team record holding only its identifier and display name."""
    id: int  # identifier used as the lookup key for this team
    name: str  # human-readable team name

@dataclass
class Position:
    """A point on the pitch expressed on a 0-100 scale in both axes."""
    x: float  # field length (0-100 scale)
    y: float  # field width (0-100 scale)

    def distance_to_goal(self, attacking_direction: int = 1) -> float:
        """
        Straight-line distance from this point to the centre of the target goal.

        :param attacking_direction: 1 puts the target goal at x=100; any other
            value puts it at x=0.
        :return: Euclidean distance in the same 0-100 units as the coordinates.
        """
        # The goal centre sits at y=50 on either end line.
        target_x, target_y = (100, 50) if attacking_direction == 1 else (0, 50)
        delta_x = target_x - self.x
        delta_y = target_y - self.y
        return np.sqrt(delta_x ** 2 + delta_y ** 2)

In [9]:
@dataclass
class MatchEvent:
    """One match event plus convenience predicates over its tag list.

    ``tags`` holds integer tag ids; the ``is_*`` / ``hit_woodwork`` properties
    are simple membership tests of ``EventTag`` members against that list.
    """
    id: int
    match_id: int
    team_id: int
    player_id: int
    event_name: str
    minute: int
    second: int
    timestamp: float
    position: Position
    tags: List[int]
    attacking_direction: int = 1  # 1 for left-to-right, -1 for right-to-left

    @property
    def time_str(self) -> str:
        """Timestamp rendered as "[M:SS]" (minutes are not zero-padded)."""
        whole_minutes = int(self.timestamp // 60)
        leftover_seconds = int(self.timestamp % 60)
        return f"[{whole_minutes}:{leftover_seconds:02d}]"

    @property
    def is_goal(self) -> bool:
        """True when the GOAL tag is present."""
        return EventTag.GOAL in self.tags

    @property
    def is_miss(self) -> bool:
        """True when the MISS tag is present."""
        return EventTag.MISS in self.tags

    @property
    def is_save(self) -> bool:
        """True when the SAVE tag is present."""
        return EventTag.SAVE in self.tags

    @property
    def is_penalty(self) -> bool:
        """True when the PENALTY tag is present."""
        return EventTag.PENALTY in self.tags

    @property
    def hit_woodwork(self) -> bool:
        """True when either the POST or the BAR tag is present."""
        return any(marker in self.tags for marker in (EventTag.POST, EventTag.BAR))

    @property
    def distance_to_goal(self) -> float:
        """Distance from the event position to the attacked goal.

        Falls back to 100.0 (treated as "far away") when no position is set.
        """
        if self.position:
            return self.position.distance_to_goal(self.attacking_direction)
        return 100.0  # If position unknown, assume far away

In [10]:
# === Soccer Analytics Engine ===
class SoccerAnalyticsEngine:
    def __init__(self, config: AnalyticsConfig):
        """Store the configuration, pick a torch device and create empty state.

        :param config: thresholds, aggregation window and sampling rates used
            by the analysis methods.
        """
        self.config = config

        # Prefer CUDA when torch reports it as available, otherwise use the CPU.
        backend = "cuda" if torch.cuda.is_available() else "cpu"
        self.device = torch.device(backend)
        logger.info(f"Running on device: {self.device}")

        # Lookup tables and raw frames; filled in by load_data().
        self.players: Dict[int, Player] = {}
        self.teams: Dict[int, Team] = {}
        self.events: pd.DataFrame = None
        self.matches: pd.DataFrame = None

        # Rolling per-match state. Starting one full window in the past lets
        # the very first event of a match produce an insight.
        self.last_printed_time = -config.aggregation_window
        self.previous_events: List[MatchEvent] = []
        self.event_sequence_buffer: List[MatchEvent] = []
        self.pass_sequences: Dict[int, List[MatchEvent]] = {}  # key: team id

    def load_data(self, data_path: str) -> None:
        """Load the four CSV files under ``data_path`` and populate engine state.

        Reads ``events_World_Cup.csv``, ``players.csv``, ``teams.csv`` and
        ``matches_World_Cup.csv``; fills ``self.players`` / ``self.teams``
        (keyed by ``wyId``), stores the matches frame as-is, and converts
        every usable event row into a ``MatchEvent`` whose fields become the
        columns of ``self.events``.

        Event rows without an origin position or without a timestamp are
        dropped. ``self.events`` always has the ``MatchEvent`` columns, even
        when no event survives, so later column lookups do not fail.

        :param data_path: directory containing the CSV files.
        :raises FileNotFoundError: propagated from pandas if a file is missing.
        """
        logger.info("Loading datasets...")

        # Load datasets
        events_df = pd.read_csv(f'{data_path}/events_World_Cup.csv')
        players_df = pd.read_csv(f'{data_path}/players.csv')
        teams_df = pd.read_csv(f'{data_path}/teams.csv')
        matches_df = pd.read_csv(f'{data_path}/matches_World_Cup.csv')

        # Process players data
        for _, row in players_df.iterrows():
            role_code = ""
            if not pd.isna(row['role']):
                try:
                    role_info = ast.literal_eval(row['role'])
                except (ValueError, SyntaxError, TypeError):
                    role_info = None
                # Only a dict carries a role code; a missing/None/non-string
                # code2 degrades to "" instead of raising AttributeError.
                if isinstance(role_info, dict):
                    role_code = str(role_info.get('code2') or "").upper()
            self.players[row['wyId']] = Player(
                id=row['wyId'],
                name=row['shortName'] if not pd.isna(row['shortName']) else f"Player {row['wyId']}",
                role=role_code,
                team_id=row['currentTeamId'] if not pd.isna(row['currentTeamId']) else 0
            )

        # Process teams data
        for _, row in teams_df.iterrows():
            self.teams[row['wyId']] = Team(
                id=row['wyId'],
                name=row['name'] if not pd.isna(row['name']) else f"Team {row['wyId']}"
            )

        # Store match data
        self.matches = matches_df

        # Parse event tags and clean events
        events_df['tags_clean'] = events_df['tags'].apply(self._parse_tags)

        # Determine attacking direction per match and team: the first team
        # seen in a match is assigned left-to-right (1), the second
        # right-to-left (-1). One groupby pass replaces re-filtering the whole
        # frame once per match.
        # NOTE(review): this assignment is a heuristic based purely on order
        # of appearance -- confirm it matches the dataset's coordinate
        # convention (coordinates may already be normalised per team).
        match_directions = {}
        teams_per_match = events_df.groupby('matchId', sort=False)['teamId'].unique()
        for match_id, teams_in_match in teams_per_match.items():
            if len(teams_in_match) >= 2:
                match_directions[match_id] = {
                    teams_in_match[0]: 1,
                    teams_in_match[1]: -1
                }

        # Process and convert events to MatchEvent objects.
        has_event_id = 'id' in events_df.columns
        processed_events = []
        for _, row in events_df.iterrows():
            if pd.isna(row['pos_orig_x']) or pd.isna(row['pos_orig_y']):
                continue
            # int() cannot convert NaN, so an event without a timestamp is
            # unusable and is skipped rather than aborting the whole load.
            if pd.isna(row['eventSec']):
                continue

            match_id = row['matchId']
            team_id = row['teamId']
            attacking_direction = match_directions.get(match_id, {}).get(team_id, 1)

            event = MatchEvent(
                id=row['id'] if has_event_id else 0,
                match_id=match_id,
                team_id=team_id,
                player_id=row['playerId'],
                event_name=row['eventName'],
                minute=int(row['eventSec'] // 60),
                second=int(row['eventSec'] % 60),
                timestamp=row['eventSec'],
                position=Position(x=row['pos_orig_x'], y=row['pos_orig_y']),
                tags=row['tags_clean'],
                attacking_direction=attacking_direction
            )
            processed_events.append(event)

        # Convert list of events to DataFrame for efficient filtering. Naming
        # the columns explicitly keeps the schema intact for an empty list.
        event_columns = list(MatchEvent.__dataclass_fields__)
        self.events = pd.DataFrame([vars(e) for e in processed_events], columns=event_columns)
        logger.info(f"Processed {len(self.events)} events from {len(self.matches)} matches.")

    def _parse_tags(self, tag_str: str) -> List[int]:
        """Convert tags from string representation to a list of integers."""
        if pd.isna(tag_str):
            return []
        try:
            return [tag['id'] for tag in ast.literal_eval(tag_str)]
        except (ValueError, SyntaxError):
            return []

    def get_player_name(self, player_id: int) -> str:
        """Retrieve player name using player ID."""
        player = self.players.get(player_id)
        return player.name if player else f"Player {player_id}"

    def get_team_name(self, team_id: int) -> str:
        """Retrieve team name using team ID."""
        team = self.teams.get(team_id)
        return team.name if team else f"Team {team_id}"

    def _should_analyze_event(self, event: MatchEvent, category: str) -> bool:
        """Decide whether to analyze an event based on its category probability."""
        probability = self.config.event_probabilities.get(category, 0.1)
        return random.random() < probability

    def calculate_expected_goals(self, event: MatchEvent) -> float:
        """Compute a simple expected goals (xG) value based on shot distance and context."""
        if event.event_name != 'Shot':
            return 0.0

        distance = event.distance_to_goal
        # Base xG model: inversely proportional to distance
        xg = max(0.01, min(0.9, 0.7 * (1 - distance/100)))

        # Adjust xG based on shot type
        if event.is_penalty:
            xg = 0.76  # Typical penalty conversion rate
        elif event.hit_woodwork:
            xg *= 1.2
        elif distance > self.config.exceptional_threshold:
            xg *= 0.5  # Long-range shots have lower chance

        return xg

    def detect_tactical_pattern(self, recent_events: List[MatchEvent]) -> Optional[str]:
        """Detect significant tactical patterns from a sequence of recent events."""
        if len(recent_events) < 3:
            return None

        team_id = recent_events[-1].team_id
        team_events = [e for e in recent_events if e.team_id == team_id]
        if len(team_events) < 3:
            return None

                # Only consider events after the opening period (e.g. after 15 seconds)
        if team_events[-1].timestamp > 15:
            time_interval = team_events[-1].timestamp - team_events[-3].timestamp
            if time_interval < 8 and any(e.event_name == 'Pass' for e in team_events):
                if abs(team_events[-1].position.x - team_events[-3].position.x) > 30:
                    return "counter_attack"

        # Possession-based buildup if multiple passes occur consecutively
        if len([e for e in team_events if e.event_name == 'Pass']) >= 5:
            return "possession_buildup"

        # Check for high press events
        if any(EventTag.HIGH_PRESS in e.tags for e in team_events):
            return "high_press"

        return None

    def analyze_match(self, match_id: int) -> List[str]:
        """Analyze a single match and generate plain-English insights.

        Events of ``match_id`` are replayed in ``timestamp`` order. After an
        insight is emitted, events inside the configured aggregation window
        are only buffered. For every other event one event-specific insight
        (shot, pass, card or free kick) is attempted first; a tactical-pattern
        insight is used only when the event itself produced none, so sampled
        patterns can no longer overwrite goals, penalties or key passes.

        :param match_id: value matched against the ``match_id`` column.
        :return: insight strings in chronological order (possibly empty).
        """
        logger.info(f"Analyzing match {match_id}")
        # NOTE(review): ordering uses the raw timestamp only; if timestamps
        # restart for each match period the periods interleave -- confirm.
        match_events = self.events[self.events['match_id'] == match_id].sort_values('timestamp')

        # Reset state for the match
        self.last_printed_time = -self.config.aggregation_window
        self.previous_events = []
        self.event_sequence_buffer = []
        self.pass_sequences = {}

        insights = []
        # Iterate over each event
        for _, row in match_events.iterrows():
            # The stored cell is normally a Position; a plain mapping is
            # converted back into one.
            pos = row['position'] if isinstance(row['position'], Position) else Position(**row['position'])
            event = MatchEvent(
                id=row['id'],
                match_id=row['match_id'],
                team_id=row['team_id'],
                player_id=row['player_id'],
                event_name=row['event_name'],
                minute=row['minute'],
                second=row['second'],
                timestamp=row['timestamp'],
                position=pos,
                tags=row['tags'],
                attacking_direction=row['attacking_direction']
            )

            # Update event buffer for pattern detection; only last 10 events are kept
            self.event_sequence_buffer.append(event)
            if len(self.event_sequence_buffer) > 10:
                self.event_sequence_buffer.pop(0)

            # Ensure a minimum time gap (aggregation window) between insights
            if (event.timestamp - self.last_printed_time) < self.config.aggregation_window:
                self._remember_event(event)
                continue

            player_name = self.get_player_name(event.player_id)
            team_name = self.get_team_name(event.team_id)

            # Event names are mutually exclusive, so at most one of these
            # branches can produce an insight.
            insight = None
            if event.event_name == 'Shot':
                # Skip goalkeeper events unless it's a penalty
                player = self.players.get(event.player_id)
                if player and player.is_goalkeeper and not event.is_penalty:
                    self._remember_event(event)
                    continue
                insight = self._shot_insight(event, player_name, team_name)
            elif event.event_name == 'Pass':
                insight = self._pass_insight(event, player_name, team_name)
            elif event.event_name == 'Foul':
                insight = self._card_insight(event, player_name, team_name)
            elif event.event_name == 'Free Kick':
                insight = self._set_piece_insight(event, player_name, team_name)

            # Tactical patterns are the fallback: they must not replace a
            # more specific insight built above.
            if insight is None:
                insight = self._pattern_insight(event, team_name)

            if insight:
                insights.append(insight)
                self.last_printed_time = event.timestamp

            self._remember_event(event)

        return insights

    def _remember_event(self, event: MatchEvent) -> None:
        """Append to ``previous_events`` while keeping only the five newest."""
        self.previous_events.append(event)
        if len(self.previous_events) > 5:
            del self.previous_events[:-5]

    def _shot_insight(self, event: MatchEvent, player_name: str, team_name: str) -> Optional[str]:
        """Build the insight for a shot (goal, miss, save or woodwork), if any."""
        xg = self.calculate_expected_goals(event)

        if event.is_goal:
            distance = event.distance_to_goal
            if event.is_penalty:
                return f"{event.time_str} 🎯 Penalty scored by {player_name} ({team_name})."
            if distance >= self.config.exceptional_threshold:
                return (f"{event.time_str} 🚀 EXCEPTIONAL goal by {player_name} ({team_name}) from {int(distance)}m! "
                        f"xG: {xg:.2f} – A low-probability finish.")
            if distance >= self.config.long_range_threshold:
                if self._should_analyze_event(event, 'exceptional_goal'):
                    return f"{event.time_str} 🎯 Long-range goal by {player_name} ({team_name}) from {int(distance)}m. xG: {xg:.2f}"
                return None
            if xg >= 0.3 and self._should_analyze_event(event, 'goal_sequence'):
                return f"{event.time_str} ⚽ Goal by {player_name} ({team_name}) with an xG of {xg:.2f}."
            return None

        if event.is_miss:
            distance = event.distance_to_goal
            if xg >= 0.3 and self._should_analyze_event(event, 'high_xg_miss'):
                return f"{event.time_str} ❌ High-quality chance missed by {player_name} ({team_name})! xG: {xg:.2f}"
            if distance < self.config.long_range_threshold and self._should_analyze_event(event, 'high_xg_miss'):
                return f"{event.time_str} ❓ Shot missed by {player_name} ({team_name}) from {int(distance)}m."
            return None

        if event.is_save:
            if xg >= 0.3 and self._should_analyze_event(event, 'defensive_breakdown'):
                return f"{event.time_str} 🧤 Great shot (xG: {xg:.2f}) by {player_name} ({team_name}) was saved!"
            return None

        if event.hit_woodwork and self._should_analyze_event(event, 'high_xg_miss'):
            loc = "post" if EventTag.POST in event.tags else "crossbar"
            return f"{event.time_str} 🥵 Shot by {player_name} ({team_name}) hit the {loc}! xG: {xg:.2f}"
        return None

    def _pass_insight(self, event: MatchEvent, player_name: str, team_name: str) -> Optional[str]:
        """Record the pass in the team's rolling sequence and build its insight, if any."""
        # Record passes to track sequences per team (last 8 kept)
        team_passes = self.pass_sequences.setdefault(event.team_id, [])
        team_passes.append(event)
        if len(team_passes) > 8:
            team_passes.pop(0)

        if EventTag.KEY_PASS in event.tags and self._should_analyze_event(event, 'key_pass'):
            return f"{event.time_str} 🔑 Key pass by {player_name} ({team_name}) created a scoring chance."
        if EventTag.THROUGH_PASS in event.tags and self._should_analyze_event(event, 'tactical_pattern'):
            return f"{event.time_str} 🎯 Through-ball by {player_name} ({team_name}) split the defense."
        if EventTag.PASS_INTO_FINAL_THIRD in event.tags and self._should_analyze_event(event, 'third_transition'):
            return f"{event.time_str} ⚔️ {team_name} entered the final third with a progressive pass from {player_name}."
        return None

    def _card_insight(self, event: MatchEvent, player_name: str, team_name: str) -> Optional[str]:
        """Build the insight for a foul that carries a red or yellow card tag, if any."""
        if EventTag.RED_CARD in event.tags and self._should_analyze_event(event, 'card'):
            return f"{event.time_str} 🟥 RED CARD issued to {player_name} ({team_name})."
        if EventTag.YELLOW_CARD in event.tags and self._should_analyze_event(event, 'card'):
            return f"{event.time_str} 🟨 Yellow card for {player_name} ({team_name})."
        return None

    def _set_piece_insight(self, event: MatchEvent, player_name: str, team_name: str) -> Optional[str]:
        """Build the insight for a sampled free kick within 30 units of goal, if any."""
        if not self._should_analyze_event(event, 'set_piece'):
            return None
        if event.distance_to_goal <= 30:
            return f"{event.time_str} ⚠️ Dangerous free kick opportunity for {team_name} by {player_name}."
        return None

    def _pattern_insight(self, event: MatchEvent, team_name: str) -> Optional[str]:
        """Build the insight for a sampled tactical pattern in the event buffer, if any."""
        pattern = self.detect_tactical_pattern(self.event_sequence_buffer)
        if not pattern or not self._should_analyze_event(event, 'tactical_pattern'):
            return None
        if pattern == "counter_attack":
            return f"{event.time_str} ⚡ Quick counter-attack by {team_name}!"
        if pattern == "possession_buildup":
            return f"{event.time_str} 🔄 Patient possession build-up by {team_name}."
        if pattern == "high_press":
            return f"{event.time_str} 🔥 High pressing by {team_name} disrupted the opposition."
        return None

    def run_analysis(self, data_path: str = '/kaggle/input/soccer-match-event-dataset/') -> Dict[int, List[str]]:
        """Load the dataset, then analyze matches up to the configured limit.

        Matches are taken in the order their ids first appear in the events
        frame; at most ``config.match_limit`` of them are analyzed.

        :param data_path: directory handed to ``load_data``.
        :return: mapping of match id to that match's list of insight strings.
        """
        self.load_data(data_path)

        results = {}
        for position, match_id in enumerate(self.events['match_id'].unique()):
            # Stop as soon as the configured number of matches is reached.
            if position >= self.config.match_limit:
                break
            results[match_id] = self.analyze_match(match_id)

        return results

In [11]:
# === Main Execution Function ===
def summarize_insights(insights: List[str]) -> Dict[str, int]:
    """Count insight strings per category using the icon that follows the time stamp.

    Every insight has the shape ``"[M:SS] <icon> text"``. Looking only at the
    icon directly after the time stamp keeps the categories disjoint: the 🎯
    through-ball insight is a pass and is not counted as a shot, while every
    shot icon (goal, penalty, long-range, exceptional, miss, save, woodwork)
    is counted.

    :param insights: insight strings for one match.
    :return: dict with keys ``total``, ``shots``, ``tactical``, ``key_passes``.
    """
    shot_icons = ("⚽", "🎯", "🚀", "❌", "❓", "🧤", "🥵")
    tactical_icons = ("🔄", "⚡", "🔥")

    summary = {"total": len(insights), "shots": 0, "tactical": 0, "key_passes": 0}
    for insight in insights:
        # Drop the leading "[M:SS] " so the icon is at the start of the text.
        body = insight.split("] ", 1)[-1]
        if body.startswith("🔑"):
            summary["key_passes"] += 1
        elif body.startswith(tactical_icons):
            summary["tactical"] += 1
        elif body.startswith(shot_icons) and not body.startswith("🎯 Through-ball"):
            summary["shots"] += 1
    return summary


def run_soccer_analytics(data_path: str = '/kaggle/input/soccer-match-event-dataset/', match_limit: int = 10) -> None:
    """Run the whole pipeline and print per-match insights plus a summary.

    :param data_path: directory containing the dataset CSV files.
    :param match_limit: maximum number of matches to analyze.
    """
    # Initialize configuration with professional analytics focus
    config = AnalyticsConfig(
        long_range_threshold=25.0,
        exceptional_threshold=35.0,
        aggregation_window=15,
        match_limit=match_limit
    )
    engine = SoccerAnalyticsEngine(config)
    all_insights = engine.run_analysis(data_path)

    # Display insights and a professional summary ("meow numbers") per match
    for match_id, insights in all_insights.items():
        print(f"\n=== 📊 PROFESSIONAL ANALYSIS: MATCH {match_id} ===")
        if not insights:
            print("No significant tactical or technical events were identified in this match.")
        else:
            for insight in insights:
                print(insight)

        # Summary statistics with "meow numbers"
        summary = summarize_insights(insights)
        total_insights = summary["total"]
        shot_insights = summary["shots"]
        tactical_insights = summary["tactical"]
        key_pass_insights = summary["key_passes"]
        print("\nSummary:")
        print(f"Total Insights: {total_insights}")
        print(f"Shot Events: {shot_insights} | Tactical Patterns: {tactical_insights} | Key Passes: {key_pass_insights}")
        print(f"MEOW Score: {shot_insights + tactical_insights + key_pass_insights} (a quick professional insight indicator)")

In [12]:
# Execute the analytics pipeline when run as a script
# Entry point: print a banner, then run the pipeline with its default
# data path and match limit. Only executes when __name__ is "__main__".
if __name__ == "__main__":
    print("======== Professional Soccer Analytics System ========")
    run_soccer_analytics()


=== 📊 PROFESSIONAL ANALYSIS: MATCH 2057954 ===
[0:16] 🔄 Patient possession build-up by Saudi Arabia.
[0:39] ⚡ Quick counter-attack by Russia!
[1:03] 🔥 High pressing by Russia disrupted the opposition.
[1:33] ⚡ Quick counter-attack by Russia!
[1:51] ⚡ Quick counter-attack by Russia!
[2:08] 🔄 Patient possession build-up by Russia.
[2:32] 🎯 Through-ball by A. Golovin (Russia) split the defense.
[2:53] 🔄 Patient possession build-up by Russia.
[3:17] 🔄 Patient possession build-up by Saudi Arabia.
[3:32] ⚡ Quick counter-attack by Saudi Arabia!
[3:52] 🔄 Patient possession build-up by Saudi Arabia.
[4:19] ⚡ Quick counter-attack by Russia!
[4:56] 🔄 Patient possession build-up by Saudi Arabia.
[5:15] 🔄 Patient possession build-up by Saudi Arabia.
[5:39] 🔥 High pressing by Russia disrupted the opposition.
[5:55] 🔥 High pressing by Saudi Arabia disrupted the opposition.
[6:15] ⚡ Quick counter-attack by Russia!
[6:34] ⚡ Quick counter-attack by Russia!
[6:49] 🔥 High pressing by Saudi Arabia disrupt