In [None]:
import numpy as np
import pandas as pd
from IPython.display import display
import seaborn as sns
import matplotlib.pyplot as plt
import ast
from scipy.stats import spearmanr, kendalltau, pearsonr
from mplsoccer import Pitch

# Pipeline Setup

In [None]:
# Function to print out top players over the entire data (on an arbitrary level)
def top_players(df, metric):
    """Print a heading for ``metric`` and return ``df`` sorted by it, best first.

    Parameters:
    - df: DataFrame containing a ``metric`` column
    - metric: Column name to rank by (descending)

    Returns a new DataFrame; ``df`` itself is not modified.
    """
    print(f'Top players by {metric}')
    ranked = df.sort_values(by=metric, ascending=False)
    return ranked

# Function to show top players for each position
def top_players_by_position(df, metric):
    """Return ``df`` grouped by ``position_primary`` with each group sorted by ``metric`` (descending)."""
    def _rank_within_position(position_rows):
        return position_rows.sort_values(by=metric, ascending=False)

    return df.groupby('position_primary').apply(_rank_within_position)

# Function to show top players by event type
def top_players_by_event(df, metric):
    """Return ``df`` grouped by event ``type`` with each group sorted by ``metric`` (descending)."""
    def _rank_within_event(event_rows):
        return event_rows.sort_values(by=metric, ascending=False)

    return df.groupby('type').apply(_rank_within_event)

In [None]:
def bootstrap_metric(game_df, metric, n_bootstrap):
    """
    Estimate how much a player's season total of ``metric`` varies under match resampling.

    For every (season, player) group, the player's matches are resampled with
    replacement ``n_bootstrap`` times (as many draws as the player has matches),
    the metric is summed over each resample, and the variance (``np.var``) of
    those sums is taken. The mean of these per-player-season variances is returned.

    Parameters:
    - game_df: DataFrame with 'season', 'player', 'match_id' and ``metric`` columns
    - metric: Column name of the metric to resample
    - n_bootstrap: Number of bootstrap resamples per player-season

    Returns the mean variance, or NaN when no variance could be computed
    (no groups, or fewer than two resamples).

    Draws come from NumPy's global random state, so call ``np.random.seed`` first
    for reproducible output.
    """
    player_variances = []

    for _, group in game_df.groupby(['season', 'player']):
        player_matches = group['match_id'].unique()
        num_matches = len(player_matches)

        # Total the metric once per match; each resample then only needs to add up
        # the drawn totals instead of re-concatenating the rows of every drawn match.
        match_totals = np.array([
            group.loc[group['match_id'] == match_id, metric].sum()
            for match_id in player_matches
        ])

        resampled_totals = []
        for _ in range(n_bootstrap):
            drawn = np.random.choice(num_matches, size=num_matches, replace=True)
            resampled_totals.append(match_totals[drawn].sum())

        # A variance needs at least two resamples
        if len(resampled_totals) > 1:
            player_variances.append(np.var(resampled_totals))

    if not player_variances:
        # Explicit NaN instead of np.mean([]), which also yields NaN but with a RuntimeWarning
        return float('nan')
    return np.mean(player_variances)

# Compute and print out metric season-to-season stability
def metric_stability(season_df, metric):
    """Print 1 - (mean within-player variance of ``metric``) / (overall variance of ``metric``).

    Parameters:
    - season_df: DataFrame with a 'player' column and a ``metric`` column
    - metric: Column name of the metric to evaluate

    Nothing is returned; the value is only printed.
    """
    mean_within_player_var = season_df.groupby('player')[metric].var().mean()
    overall_var = season_df[metric].var()
    stability = 1 - mean_within_player_var / overall_var
    print(f'Season-to-season Stability: {stability}')

In [None]:
# Compare 2 same length lists of rankings and output the spearman rank correlation and kendall tau distance
def compare_rankings(rankings1, rankings2):
    """Return the Spearman and Kendall tau correlations between two equal-length rankings.

    Raises ValueError when the two inputs differ in length.
    """
    if len(rankings1) == len(rankings2):
        rho, _rho_p = spearmanr(rankings1, rankings2)
        tau, _tau_p = kendalltau(rankings1, rankings2)
        return {"Spearman": rho, "Kendall Tau": tau}

    raise ValueError("Both rankings must have the same length.")

In [None]:
# Grouping together positions
position_groups = {
    'Forwards': [
        'Center Forward',
        'Left Center Forward',
        'Right Center Forward',
    ],

    'Attacking Midfielders': [
        'Center Attacking Midfield',
        'Left Attacking Midfield',
        'Right Attacking Midfield',
        'Left Wing',
        'Right Wing'
    ],

    'Central Midfielders': [
        'Right Midfield',
        'Right Center Midfield',
        'Center Midfield',
        'Left Center Midfield',
        'Left Midfield'
    ],

    'Defensive Midfielders': [
        'Center Defensive Midfield',
        'Left Defensive Midfield',
        'Right Defensive Midfield',
        'Left Wing Back',
        'Right Wing Back'
    ],

    'Backs': [
        'Left Back',
        'Right Back',
        'Center Back',
        'Left Center Back',
        'Right Center Back',
    ],
    'Goalkeeper': ['Goalkeeper']
}

# Create a mapping from individual position to group.
# A comprehension keeps the loop variables out of the notebook's global namespace
# (a plain for-loop would leave a global named `positions`, which a later cell
# reuses for a DataFrame).
position_to_group = {
    position_name: group_name
    for group_name, group_positions in position_groups.items()
    for position_name in group_positions
}

# Color scheme that follows field positioning logic
group_colors_position = {
    'Backs': '#0000FF',             # Blue
    'Defensive Midfielders': '#4169E1',  # Royal Blue
    'Central Midfielders': '#008000',    # Green
    'Attacking Midfielders': '#32CD32',  # Lime Green
    'Forwards': '#FF0000',           # Red
    'Goalkeeper': '#FFA500'         # Orange
}

In [None]:
# Grouping together various event types
event_groups = {
    'Attacking Actions': [
        'Carry',
        'Pass',
        'Shot'
    ],

    'Defensive Actions': [
        'Pressure',
        'Block',
        'Interception',
        'Clearance',
        'Shield'
    ],

    'Other Actions': [
        'Goal Keeper',
        '50/50'
    ],

    'Fouls & Offenses': [
        'Foul Committed',
        'Yellow Card',
        'Second Yellow',
        'Red Card',
        'Offside'
    ]
}

# Create a mapping from individual event to group.
# A comprehension keeps the loop variables out of the notebook's global namespace.
event_to_group = {
    event_name: group_name
    for group_name, group_events in event_groups.items()
    for event_name in group_events
}

# Color scheme
group_colors_event = {
    'Attacking Actions': '#FF0000',      # Red
    'Defensive Actions': '#0000FF',      # Blue
    'Other Actions': '#FFA500',     # Orange
    'Fouls & Offenses': '#8B0000'      # Dark Red
}

In [None]:
def _annotate_counts(ax, data, metric, feature, items):
    """Write an ``n=<count>`` label above each category of ``items`` (in plotting order) on ``ax``."""
    y_max_global = data[metric].max()
    for x_pos, item in enumerate(items):
        item_values = data.loc[data[feature] == item, metric]
        # Put the label above the category's own maximum, the global maximum and
        # 92% of the current axis top, whichever is highest
        text_y = max(item_values.max() * 1.05, y_max_global * 1.02, ax.get_ylim()[1] * 0.92)
        ax.text(x_pos, text_y, f"n={len(item_values):,}",
                ha='center', va='bottom', color='black', fontweight='bold', fontsize=9)


def metric_distributions(df, metric, feature, use_groups=False, show_counts=False, plot_type='violinplot'):
    """
    Creates boxplots or violin plots of metrics by feature with optional grouping.

    Also prints ``1 - var(per-category means) / var(metric)`` computed on the rows
    where both ``metric`` and ``feature`` are present.

    Parameters:
    - df: DataFrame with the data
    - metric: Column name for the metric to plot
    - feature: Column name for the feature (e.g., 'position', 'type')
    - use_groups: Whether to use the predefined position/event groups for ordering
      and coloring. Requires ``feature`` to contain 'position' or to equal 'type';
      categories missing from the group mapping are not drawn.
    - show_counts: Whether to display observation counts (default: False)
    - plot_type: 'violinplot' (default) draws violins; any other value draws boxplots

    Returns the matplotlib Axes holding the plot.
    Raises ValueError if ``use_groups`` is True and ``feature`` has no predefined groups.
    """
    data = df.dropna(subset=[metric, feature])
    means = data.groupby(feature)[metric].mean()
    print(f'f={feature} Stability: {1 - np.var(means) / np.var(data[metric])}')

    plot_fn = sns.violinplot if plot_type.lower() == 'violinplot' else sns.boxplot

    if use_groups:
        from matplotlib.patches import Patch

        # Make a copy to avoid SettingWithCopyWarning
        data = data.copy()

        # Select the mapping, display order and colors that belong to the feature
        if 'position' in feature.lower():
            mapping = position_to_group
            group_column = 'position_group'
            group_order = ['Goalkeeper', 'Backs', 'Defensive Midfielders',
                           'Central Midfielders', 'Attacking Midfielders', 'Forwards']
            groups_dict = position_groups
            colors_dict = group_colors_position
        elif feature.lower() == 'type':
            mapping = event_to_group
            group_column = 'event_group'
            group_order = ['Other Actions', 'Defensive Actions',
                           'Attacking Actions', 'Fouls & Offenses']
            groups_dict = event_groups
            colors_dict = group_colors_event
        else:
            raise ValueError(
                f"use_groups=True requires a feature containing 'position' or equal to 'type', got {feature!r}"
            )

        # Keep only the groups and items that actually occur in the data
        data[group_column] = data[feature].map(mapping)
        present_groups = set(data[group_column].dropna().unique())
        present_items = set(data[feature].unique())
        group_order = [group for group in group_order if group in present_groups]

        items_in_groups = {
            group: [item for item in groups_dict[group] if item in present_items]
            for group in group_order
        }
        # Every item is drawn in its group's color
        colors_by_item = {
            item: colors_dict[group]
            for group in group_order
            for item in items_in_groups[group]
        }
        # Items in display order: group by group
        all_items = [item for group in group_order for item in items_in_groups[group]]

        plt.figure(figsize=(14, 8))
        ax = plot_fn(x=feature, y=metric, data=data,
                     order=all_items, palette=colors_by_item)

        if show_counts:
            _annotate_counts(ax, data, metric, feature, all_items)

        # Add visual group separators (no line before the first group)
        curr_pos = 0
        for i, group in enumerate(group_order):
            if i > 0:
                plt.axvline(x=curr_pos - 0.5, color='gray', linestyle='--', alpha=0.7)
            curr_pos += len(items_in_groups[group])

        # Add a legend for groups
        legend_elements = [Patch(facecolor=colors_dict[group], edgecolor='black', label=group)
                           for group in group_order]
        plt.legend(handles=legend_elements, loc='upper right')

    else:  # No groups case
        # Fix the category order explicitly: seaborn otherwise draws string categories in
        # order of appearance, which would not match the positions of the count labels.
        item_order = sorted(data[feature].unique())

        plt.figure(figsize=(12, 6))
        ax = plot_fn(x=feature, y=metric, data=data, order=item_order)

        if show_counts:
            _annotate_counts(ax, data, metric, feature, item_order)

    plt.xticks(rotation=90)
    plt.title(f'{metric} by {feature}')
    plt.xlabel(feature)
    plt.ylabel(metric)
    plt.tight_layout()
    plt.show()

    return ax

# Prepare Data for Evaluation

In [None]:
# Load the processed events table from disk; the cleaning, lineup and position
# cells below all derive from `df`.
df = pd.read_csv("data/processed_events.csv")

In [None]:
# Clean data: keep only the columns used downstream and force the OBV columns to float
keep_columns = ['season', 'match_id', 'id', 'ts_minute', 'minute', 'second', 'location', 'zone', 'type', 'possession', 'possession_team', 'possession_team_id', 'play_pattern', 'team', 'team_id', 'player', 'player_id', 'position', 'obv_for_net', 'obv_against_net', 'obv_total_net', 'duration', 'under_pressure', 'substitution_replacement', 'substitution_replacement_id', 'tactics', 'shot_outcome', 'shot_statsbomb_xg', 'pass_goal_assist']
obv_columns = ['obv_for_net', 'obv_against_net', 'obv_total_net']
# Select and cast in one chained expression so a fresh frame is produced; assigning
# columns on the result of `df[keep_columns]` can trigger pandas' SettingWithCopyWarning.
df = df[keep_columns].astype({column: float for column in obv_columns})

In [None]:
# Read in the per-action metric values (training split)
train_metrics = pd.read_csv("data/train_metrics.csv")
actions = train_metrics

# # Alternatively, read in train and test:
# test_metrics = pd.read_csv("data/test_metrics.csv") 
# actions = pd.concat([train_metrics, test_metrics]).reset_index(drop=True)

# Attach event context from df; the inner join on 'id' keeps only actions found in both tables
cols_to_use = ['id', 'season', 'zone', 'shot_outcome', 'shot_statsbomb_xg', 'pass_goal_assist']
actions = actions.merge(df.loc[:, cols_to_use], on='id', how='inner')

In [None]:
# Specify the metrics of interest
metrics = ['VAEP', 'GPA', 'WPA', 'WPA2']

# Sum every metric and its `_norm` variant for each unique (season, match, player, team)
game_stats = (
    actions
    .groupby(['season', 'match_id', 'player', 'team'])
    .agg(dict.fromkeys(metrics + [f'{metric}_norm' for metric in metrics], 'sum'))
    .reset_index()
)

In [None]:
# Identify entry times of starters and substitutes
def parse_lineup(row, event_type):
    """Expand the lineup stored in an event row into one record per listed player.

    Parameters:
    - row: Mapping-like event row with 'type', 'tactics', 'match_id', 'team' and 'ts_minute'
    - event_type: Only rows whose 'type' equals this value are parsed

    Returns a list of dicts (match_id, player, player_id, position, team, ts_minute),
    or an empty list when the row is of a different type. 'tactics' is parsed with
    ``ast.literal_eval``, so it is expected to be the string form of a dict.
    """
    if row['type'] != event_type:
        return []

    lineup = ast.literal_eval(row['tactics']).get('lineup', [])
    records = []
    for entry in lineup:
        records.append({
            'match_id': row['match_id'],
            'player': entry['player']['name'],
            'player_id': entry['player']['id'],
            'position': entry['position']['name'],
            'team': row['team'],
            # Timestamp of the lineup event itself
            'ts_minute': row['ts_minute'],
        })
    return records

# Extract all starting lineups. Only "Starting XI" rows carry a lineup to parse, so
# restrict to those rows instead of calling parse_lineup on every event in df.
starting_records = [
    record
    for _, row in df[df['type'] == 'Starting XI'].iterrows()
    for record in parse_lineup(row, 'Starting XI')
]
# The lineup event's timestamp is the starters' entry time
starting_df = pd.DataFrame(starting_records).rename(columns={'ts_minute': 'entry_time'})

# Substitutes enter at the time of the substitution event; the incoming player is
# stored in 'substitution_replacement'
sub_in_df = df[df['type'] == 'Substitution']
sub_in_df = sub_in_df.drop(columns=['player'])
sub_in_df = sub_in_df.rename(columns={'substitution_replacement': 'player', 'ts_minute': 'entry_time'})
entry_times = pd.concat([starting_df.reset_index(drop=True), sub_in_df.reset_index(drop=True)])[['player', 'match_id', 'entry_time']]

# Identify exit times: the 'player' of a substitution event leaves at that event's time
sub_out_df = df[df['type'] == 'Substitution']
sub_out_df = sub_out_df.rename(columns={'ts_minute': 'exit_time'})
match_end_df = df.groupby('match_id')['ts_minute'].max().reset_index()
match_end_df = match_end_df.rename(columns={'ts_minute': 'exit_time'})
# Assign match end times as the default exit time for all players
default_exit_times = entry_times[['match_id', 'player']].merge(match_end_df, on='match_id', how='left')
# Substitution rows come first, so keep='first' prefers a real substitution time over the default
exit_times = pd.concat([sub_out_df, default_exit_times]).drop_duplicates(subset=['match_id', 'player'], keep='first')[['player', 'match_id', 'exit_time']]

# Merge entry and exit times
play_times = pd.merge(entry_times, exit_times, on=['match_id', 'player'], how='inner')
# Minutes played = exit time minus entry time
play_times['minutes_played'] = (play_times['exit_time'] - play_times['entry_time'])

In [None]:
# Find all tactical shifts and extract the players involved. Only "Tactical Shift"
# rows carry a lineup to parse, so restrict to those rows instead of calling
# parse_lineup on every event in df.
shift_records = [
    record
    for _, row in df[df['type'] == 'Tactical Shift'].iterrows()
    for record in parse_lineup(row, 'Tactical Shift')
]
shifts_df = pd.DataFrame(shift_records)

# Combine the shift records with the event rows that carry a position
positions = pd.concat([shifts_df, df], axis=0).dropna(subset=['position'])
positions = positions[positions['position'] != 'Substitute'] # accounting for extra data (bad behavior)

In [None]:
# Identify times of position changes: keep the earliest row for every
# (match, player, position) combination, ordered by time
positions = (
    positions
    .sort_values(by=['match_id', 'player', 'ts_minute'])
    .drop_duplicates(subset=['match_id', 'player', 'position'], keep='first')
    .reset_index(drop=True)
)
# Time at which the player's next retained position row begins (NaN for the last one)
positions['next_ts_minute'] = positions.groupby(['match_id', 'player'])['ts_minute'].shift(-1)
player_exit_times = play_times[['match_id', 'player', 'exit_time']].copy()
positions = positions.merge(
    player_exit_times[['match_id', 'player', 'exit_time']],
    on=['match_id', 'player'], how='left'
)

# Calculate durations: a position lasts until the next position row, or until the
# player's exit time when there is no next row
positions['duration'] = positions['next_ts_minute'].fillna(positions['exit_time']) - positions['ts_minute']

# Collect each player's positions and matching durations per match as parallel lists
positions = (
    positions
    .groupby(['match_id', 'player'])
    .agg({'position': list, 'duration': list})
    .reset_index()
    .rename(columns={'position': 'positions', 'duration': 'durations'})
)

# Longest duration, the position it belongs to, and the number of positions recorded
positions['duration_position_primary'] = positions['durations'].apply(max)
positions['position_primary'] = [
    played[spans.index(longest)]
    for played, spans, longest in zip(
        positions['positions'], positions['durations'], positions['duration_position_primary']
    )
]
positions['num_positions'] = positions['positions'].apply(len)

In [None]:
# Get number of shots, number of goals, total xG and assists for each (match_id, player) pair
player_match_keys = ['match_id', 'player']

shots = actions[actions['type'] == 'Shot']
shots_count = shots.groupby(player_match_keys).size().reset_index(name='shots')
goals = shots[shots['shot_outcome'] == 'Goal'].groupby(player_match_keys).size().reset_index(name='goals')
# Sum up shot_statsbomb_xg for each (match_id, player)
xg = shots.groupby(player_match_keys)['shot_statsbomb_xg'].sum().reset_index(name='total_xg')

# Compute assist stats
passes = actions[actions['type'] == 'Pass']
assists = passes[passes['pass_goal_assist'] == True].groupby(player_match_keys).size().reset_index(name='assists')

shots_count = (
    shots_count
    .merge(goals, on=player_match_keys, how='left')
    .merge(xg, on=player_match_keys, how='left')
    # Outer join: a player can assist without taking a shot in the same match, and a
    # left join onto the shooters table would silently drop those assists.
    .merge(assists, on=player_match_keys, how='outer')
)
count_columns = ['shots', 'goals', 'assists']
shots_count[count_columns] = shots_count[count_columns].fillna(0).astype(int)
shots_count['total_xg'] = shots_count['total_xg'].fillna(0)

In [None]:
# Attach minutes played, position info and shot/assist counts to the per-game stats
game_stats = (
    game_stats
    .merge(play_times, on=['match_id', 'player'], how='left')
    .merge(positions, on=['match_id', 'player'], how='left')
    .merge(shots_count, on=['match_id', 'player'], how='left')
)
# Rows without a match in shots_count get zero counts
game_stats = game_stats.fillna({'shots': 0, 'goals': 0, 'assists': 0, 'total_xg': 0})

In [None]:
# Express every metric (raw and `_norm`) per 90 minutes played
for metric in metrics:
    for source_col in (metric, f'{metric}_norm'):
        game_stats[f'{source_col}_per_90'] = game_stats[source_col] / game_stats['minutes_played'] * 90

In [None]:
# Aggregate season stats dynamically: sum metrics, minutes and counting stats per (season, player)
season_stats = (
    game_stats
    .groupby(['season', 'player'])
    .agg(dict.fromkeys(
        metrics
        + [f'{metric}_norm' for metric in metrics]
        + ['minutes_played', 'shots', 'goals', 'assists', 'total_xg'],
        'sum',
    ))
    .reset_index()
)

# Season-level primary position: the position_primary with the highest summed duration_position_primary
position_time = (
    game_stats
    .groupby(['season', 'player', 'position_primary'])['duration_position_primary']
    .sum()
    .reset_index()
)
primary_position_season = (
    position_time
    .loc[position_time.groupby(['season', 'player'])['duration_position_primary'].idxmax()]
    .rename(columns={'duration_position_primary': 'minutes_position_primary'})
)
season_stats = season_stats.merge(primary_position_season, on=['season', 'player'], how='left')

# Dynamically create per 90-minute statistics for all relevant metrics
for metric in metrics:
    season_stats[f'{metric}_per_90'] = season_stats[metric] / season_stats['minutes_played'] * 90
    season_stats[f'{metric}_norm_per_90'] = season_stats[f'{metric}_norm'] / season_stats['minutes_played'] * 90

    # Add the metric's contribution from each event type (total and per 90)
    for event_type in ['Shot', 'Pass', 'Dribble', 'Interception', 'Block']:
        event_col = f'{metric}_{event_type}'
        obv_by_event = (
            actions[actions['type'] == event_type]
            .groupby(['season', 'player'])
            .agg({metric: 'sum'})
            .reset_index()
            .rename(columns={metric: event_col})
        )

        season_stats = season_stats.merge(obv_by_event, on=['season', 'player'], how='left')
        # Players with no events of this type contribute zero
        season_stats[event_col] = season_stats[event_col].fillna(0)
        season_stats[f'{event_col}_per_90'] = season_stats[event_col] / season_stats['minutes_played'] * 90

In [None]:
# # Save the game and season stats
# game_stats.to_csv('data/train_game_stats.csv', index=False)
# season_stats.to_csv('data/train_season_stats.csv', index=False)

# Perform Evaluation

In [None]:
# Top players
game_threshold = 30      # minimum minutes played for a game row to be considered
season_threshold = 500   # minimum minutes played for a season row to be considered
game = game_stats[game_stats['minutes_played'] >= game_threshold]
season = season_stats[season_stats['minutes_played'] >= season_threshold]
metric_base = 'WPA'
metric = f'{metric_base}_per_90'

print("Top individual performances:") # overall best performances
display(top_players(game, metric)[['season', 'match_id', 'player', 'position_primary', metric]].head(10))
print("Top season players:")
for s in season['season'].unique(): # statistical leaders for each season
    print(f"Season {s}")
    season_rows = season[season['season'] == s]
    display(top_players(season_rows, metric)[['player', 'position_primary', metric]].head(10))

    # Compare the metric's ranking of players with rankings by traditional output stats
    metric_rank = season_rows[metric].rank(ascending=False)
    print(f"Compared to goals: {compare_rankings(metric_rank, season_rows['goals'].rank(ascending=False))}")
    print(f"Compared to goals + assists: {compare_rankings(metric_rank, (season_rows['goals'] + season_rows['assists']).rank(ascending=False))}")
    print(f"Compared to Total xG: {compare_rankings(metric_rank, season_rows['total_xg'].rank(ascending=False))}")

# Compute metric stability and distributions.
# metric_stability takes (season_df, metric); passing `game` as an extra first
# argument raised a TypeError.
metric_stability(season, metric)
metric_distributions(actions, metric_base, 'position', use_groups=True)
metric_distributions(actions, metric_base, 'type', use_groups=True, plot_type='violinplot')
metric_distributions(actions, metric_base, 'goal_diff_start', show_counts=True, plot_type='violinplot')
print(f"Game-Level {metric} by Position")
metric_distributions(game, metric, 'position_primary', use_groups=True)

In [None]:
# Create a heatmap of the metric by zone
def create_zone_heatmap(df, metric_column):
    """
    Draw a pitch heatmap of the median of ``metric_column`` in each zone.

    Parameters:
    - df: DataFrame with a 'zone' column whose values look like 'Zone <row>-<col>'
      (1-based; the grid drawn here has 5 rows and 7 columns) and a ``metric_column`` column
    - metric_column: Column name of the metric to aggregate

    Zones that do not occur in ``df`` are left uncolored and unlabeled rather than
    being shown as 0.00, so they cannot be mistaken for a true zero median.

    Returns (fig, ax).
    """
    # Pitch setup
    field_length = 120
    field_width = 80
    horizontal_zones = 5
    vertical_zones = 7
    horizontal_lines = np.linspace(0, field_width, horizontal_zones + 1)
    vertical_lines = np.linspace(0, field_length, vertical_zones + 1)

    # Aggregate the metric by zone; NaN marks zones without data
    zone_medians = df.groupby('zone')[metric_column].median()
    heatmap_data = np.full((horizontal_zones, vertical_zones), np.nan)
    for zone, median_value in zone_medians.items():
        row_no, col_no = map(int, zone.replace('Zone ', '').split('-'))
        # Convert to 0-based indexing
        heatmap_data[row_no - 1, col_no - 1] = median_value

    # Create a pitch using mplsoccer
    pitch = Pitch(pitch_type='custom', pitch_length=field_length, pitch_width=field_width,
                  line_color='black', line_zorder=2)
    fig, ax = pitch.draw(figsize=(12, 8))
    X, Y = np.meshgrid(vertical_lines, horizontal_lines)
    # Red-Yellow-Blue colormap reversed (high=red, low=blue)
    cmap = plt.cm.RdYlBu_r
    # Create the heatmap
    mesh = ax.pcolormesh(X, Y, heatmap_data, cmap=cmap, alpha=0.5, zorder=1)
    # Add a colorbar
    cbar = plt.colorbar(mesh, ax=ax)
    cbar.set_label(metric_column)

    # Write the metric value at the center of every zone that has data
    for i in range(horizontal_zones):
        for j in range(vertical_zones):
            value = heatmap_data[i, j]
            if np.isnan(value):
                continue
            x_center = (vertical_lines[j] + vertical_lines[j + 1]) / 2
            y_center = (horizontal_lines[i] + horizontal_lines[i + 1]) / 2
            ax.text(x_center, y_center, f"{value:.2f}", ha='center', va='center',
                    fontsize=9, color='black', fontweight='bold', zorder=3)

    # Add row labels (first number in zone) on the left side of the pitch
    for i in range(horizontal_zones):
        y_pos = (horizontal_lines[i] + horizontal_lines[i + 1]) / 2
        ax.text(-3, y_pos, f"{i+1}", ha='right', va='center', fontsize=10)

    # Add column labels (second number in zone) along the pitch edge at y = field_width
    for j in range(vertical_zones):
        x_pos = (vertical_lines[j] + vertical_lines[j + 1]) / 2
        ax.text(x_pos, field_width + 5, f"{j+1}", ha='center', va='bottom', fontsize=10)

    # Title and display
    ax.set_title(f"{metric_column} Heatmap by Zone", fontsize=16)
    # Flip the pitch axes explicitly instead of relying on pyplot's "current axes"
    ax.invert_yaxis()  # Ensure (0,0) is at the top-left

    return fig, ax

# Draw the zone heatmap (per-zone median) of `metric_base` over all actions
fig, ax = create_zone_heatmap(actions, metric_base)
plt.show()

In [None]:
def compare_metrics(df, metric1, metric2, data_type='season'):
    """
    Plot ``metric2`` against ``metric1`` and annotate the plot with their Pearson correlation.

    Parameters:
    - df: DataFrame containing both metric columns
    - metric1: Column plotted on the x-axis
    - metric2: Column plotted on the y-axis
    - data_type: 'season' draws a plain scatter plot; any other value draws a
      10-bin regression plot, intended for larger game/action-level data

    Rows where either metric is missing are excluded from both the correlation and
    the plot, so the annotated coefficient describes the points actually drawn.
    """
    valid = df.dropna(subset=[metric1, metric2])

    # Compute Pearson correlation coefficient
    corr_coef, _ = pearsonr(valid[metric1], valid[metric2])

    plt.figure(figsize=(10, 6))
    # Choose scatter plot type based on data type
    if data_type == 'season':
        sns.scatterplot(data=valid, x=metric1, y=metric2)
    else:
        # Bin scatter plot for game stats and actions
        sns.regplot(data=valid, x=metric1, y=metric2,
                    scatter_kws={'alpha': 0.5},
                    line_kws={'color': 'red'},
                    x_bins=10)

    # Annotate at the (min x, max y) data coordinates, i.e. the top-left of the data range
    plt.text(
        x=valid[metric1].min(),
        y=valid[metric2].max(),
        s=f"Pearson r = {corr_coef:.2f}",
        fontsize=12,
        color="red"
    )
    # Add labels and grid
    plt.title(f"{metric2} vs. {metric1}")
    plt.xlabel(metric1)
    plt.ylabel(metric2)
    plt.grid(True)
    plt.tight_layout()
    plt.show()

# data_type='season' selects the plain scatter plot; here it is applied to the per-game table
compare_metrics(game_stats, 'GPA', 'WPA', data_type='season') # player-match level
# Any other data_type selects the binned regression plot
compare_metrics(actions, 'GPA', 'WPA', data_type='game') # action level

In [None]:
### Examine specific cases: looking at extreme values

# Condense displayed information
detail_cols = ['ts_minute', 'ts_end', 'team', 'possession', 'season', 'player', 'obv_total_net','GPA', 'WPA', 'GPA_WPA_diff', 'position', 'type', 'team_score', 'opponent_score', 'goal_contribution', 'outcome']

# Helper function for showing extreme action sequences
def display_extreme_metrics(metric, actions=actions, condensed=False):
    """Display the 10 highest and 10 lowest rows of ``actions`` ranked by ``metric``.

    Parameters:
    - metric: Column to rank by; it must be one of ``detail_cols``
    - actions: DataFrame containing every column in ``detail_cols``
      (defaults to the global ``actions`` as it was when this cell ran)
    - condensed: If True, show only the metric and a few context columns
    """
    ranked = actions[detail_cols].sort_values(by=metric, ascending=False)
    if condensed:
        summary_cols = [metric, 'type', 'position', 'ts_minute', 'team_score', 'opponent_score', 'outcome']
        ranked = ranked[summary_cols]

    for heading, rows in (('Top', ranked.head(10)), ('Bottom', ranked.tail(10))):
        print(heading)
        display(rows)

In [None]:
### Set up two filters of the actions to avoid letting unreasonable outliers impact analysis

# Helper functions
def winsorize_actions(actions, metrics, lower, upper):
    """Return a copy of ``actions`` with each column in ``metrics`` clipped to its
    [``lower``, ``upper``] quantile range. The input frame is left untouched.
    """
    clipped = actions.copy()
    for column in metrics:
        floor, ceiling = (actions[column].quantile(q) for q in (lower, upper))
        clipped[column] = actions[column].clip(lower=floor, upper=ceiling)
    return clipped
def compute_diff(actions):
    """Add a 'GPA_WPA_diff' column (GPA minus WPA) to ``actions`` in place and return the same frame."""
    actions['GPA_WPA_diff'] = actions['GPA'].sub(actions['WPA'])
    return actions

# Keep only actions that continue a possession: duplicated(keep='first') is False for the
# first row of each (match_id, possession) pair, so each possession's first action is dropped.
filtered_actions = compute_diff(
    actions.loc[actions.duplicated(subset=['match_id', 'possession'], keep='first')].reset_index(drop=True)
)
# Winsorized actions: GPA and WPA are clipped to their 2.5th-97.5th percentile range
# (extreme values are capped, not removed), then the difference is recomputed
new_actions = compute_diff(
    winsorize_actions(
        actions=filtered_actions,
        metrics=['GPA', 'WPA'],
        lower=0.025,
        upper=0.975,
    )
)

In [None]:
# Analyze the extreme values: top/bottom 10 by GPA and by WPA on the unclipped
# possession-filtered actions (condensed view), then by GPA - WPA on the winsorized actions
display_extreme_metrics('GPA', actions=filtered_actions, condensed=True)
display_extreme_metrics('WPA', actions=filtered_actions, condensed=True)
display_extreme_metrics('GPA_WPA_diff', actions=new_actions, condensed=False)

In [None]:
# Helper function for plotting a specific sequence of actions
def plot_soccer_actions(actions_df, metric_column, title):
    """
    Draw a sequence of actions on a pitch as team-colored arrows labeled with a metric.

    Parameters:
    - actions_df: DataFrame with 'team', 'location_x', 'location_y', 'end_location_x',
      'end_location_y' and ``metric_column`` columns; rows are drawn in order
    - metric_column: Column whose value is written next to each arrow
    - title: Figure title

    Coordinates of the second team appearing in ``actions_df`` are rotated 180 degrees
    about the pitch center. Returns (fig, ax).
    """
    # Pitch setup
    PITCH_LENGTH, PITCH_WIDTH = 120, 80
    pitch = Pitch(pitch_type='statsbomb')
    fig, ax = pitch.draw(figsize=(16, 11))
    ax.set_title(title, fontsize=20, color='black', fontweight='bold')

    teams = actions_df['team'].unique()
    # One color per team, cycling through the palette if there are more teams than colors
    default_colors = ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd',
                      '#8c564b', '#e377c2', '#7f7f7f', '#bcbd22', '#17becf']
    team_colors = {}
    for idx, team_name in enumerate(teams):
        team_colors[team_name] = default_colors[idx % len(default_colors)]

    # Positions of metric labels already drawn, used to avoid overlap
    placed_labels = []

    def rotate_coordinates(x, y, team):
        """Rotate a point 180 degrees about the pitch center when ``team`` is the second team."""
        needs_rotation = len(teams) > 1 and team == teams[1]
        if not needs_rotation:
            return x, y
        return PITCH_LENGTH - x, PITCH_WIDTH - y

    def is_overlapping(x, y, existing_labels, min_distance=2):
        """Check if a new label position is closer than ``min_distance`` to an existing label."""
        return any(
            ((x - ex)**2 + (y - ey)**2)**0.5 < min_distance
            for ex, ey in existing_labels
        )

    # Candidate label offsets from the arrow midpoint, tried in this order
    label_offsets = (
        (2, 0),   # right
        (-2, 0),  # left
        (0, 2),   # up
        (0, -2),  # down
    )

    # Plot each action, numbered from 1 in row order
    for seq_no, (_, action) in enumerate(actions_df.iterrows(), 1):
        team = action['team']
        start_x, start_y = rotate_coordinates(action['location_x'], action['location_y'], team)
        end_x, end_y = rotate_coordinates(action['end_location_x'], action['end_location_y'], team)

        metric_value = action[metric_column]
        line_color = team_colors[team]
        pitch.arrows(start_x, start_y, end_x, end_y,
                     width=2, headwidth=10, headlength=10,
                     color=line_color, alpha=0.7, ax=ax)
        # Sequence number at the start of the arrow
        ax.text(start_x, start_y, str(seq_no),
                color='white',
                fontweight='bold',
                fontsize=8,
                bbox=dict(facecolor='black', alpha=0.7, edgecolor='none'),
                ha='center', va='center')

        # Metric label near the arrow midpoint, at the first offset that is free
        mid_x = (start_x + end_x) / 2
        mid_y = (start_y + end_y) / 2
        for dx, dy in label_offsets:
            candidate = (mid_x + dx, mid_y + dy)
            if is_overlapping(candidate[0], candidate[1], placed_labels):
                continue
            ax.text(candidate[0], candidate[1],
                    f'{metric_value:.2f}',
                    color='white',
                    fontweight='bold',
                    fontsize=8,
                    bbox=dict(facecolor=line_color, alpha=0.7, edgecolor='none'),
                    ha='center', va='center')
            placed_labels.append(candidate)
            break
        else:
            # Every candidate position was taken: report it and leave the label out
            print(f"Warning: Could not place label for action {seq_no}")

    # Add a legend for teams, positioned below the plot
    legend_elements = [plt.Line2D([0], [0], color=color, lw=4, label=team)
                       for team, color in team_colors.items()]
    fig.legend(handles=legend_elements, loc='lower center',
               bbox_to_anchor=(0.5, 0),
               ncol=len(teams),
               facecolor='white', edgecolor='black',
               title='Teams')
    # Adjust layout to make room for the legend
    plt.tight_layout()

    return fig, ax

In [None]:
# Define the positional row range of the action sequence of interest.
# iloc slicing is half-open, so rows start_action .. end_action - 1 are plotted
# (with 0 and 1 that is just the first row of filtered_actions).
start_action = 0 
end_action = 1

# Plot the selected actions, labeling each arrow with its GPA value
fig, ax = plot_soccer_actions(filtered_actions.iloc[start_action:end_action], metric_column='GPA', title='Case Study')
plt.show()