In [None]:
! pip install statsbombpy

In [2]:
from statsbombpy import sb
import pandas as pd
import numpy as np
import statsmodels.api as sm
import statsmodels.formula.api as smf
from collections import Counter
import pickle
from datetime import datetime
from datetime import date
import warnings
warnings.filterwarnings('ignore')

In [3]:
competitions = sb.competitions()
competitions.loc[lambda x: x['country_name'] == 'England']

matches = sb.matches(competition_id=2, season_id=27)

warnings.filterwarnings('ignore')
events = pd.concat([sb.events(match_id=x) for x in matches['match_id']])

match_date = matches[['match_id','match_date']]
events = events.merge(match_date,on='match_id',how='left')
events['new_date'] = events['match_date'].apply(lambda x: datetime.strptime(x, '%Y-%m-%d').date())
events['league half'] = events['new_date'] > date(2015,12,31)

In [4]:
unique_match_counts = events.groupby('player')['match_id'].nunique().reset_index()
unique_match_counts = unique_match_counts.rename(columns={"match_id":"num_match"})
unique_match_counts = unique_match_counts[unique_match_counts['num_match'] > 30]

In [5]:
def sigmoid(x):
    return 1 / (1 + np.exp(-x))

In [6]:
def calculate_shot_credit(row):
    # Calculate distance to the closest goal
    distance_to_goal = min(
        np.sqrt((row['loc_x'] - 0)**2 + (row['loc_y'] - 40)**2),
        np.sqrt((row['loc_x'] - 120)**2 + (row['loc_y'] - 40)**2)
    )

    xG = row.get('shot_statsbomb_xg')
    if distance_to_goal > 25:
        if row['shot_outcome'] == 'Goal':
            return (1-xG) * 1.55
        else:
            return -xG * 0.25
    else:
        if row['shot_outcome'] == 'Goal':
            return (1-xG) * 1.25
        else:
            return -xG * 0.5

In [7]:
def calculate_credit_multiplier(row):
    if row['type'] != 'Pass':
        return 1, 0  # No distribution needed for non-pass actions
    if row['under_pressure'] and row['under_pressure_next']:
        return (0.5, 0.5)  # Both passer and recipient under pressure
    if row['under_pressure']:
        return (0.75, 0.25)  # Only passer under pressure
    if row['under_pressure_next']:
        return (0.25, 0.75)  # Only recipient under pressure
    return (0.5, 0.5)  # Default distribution if none are under pressure

In [8]:
def calculate_shot_credit_vectorized(df):
    # Calculate distance to the closest goal
    distance_to_goal_1 = np.sqrt((df['start_loc_x'] - 0)**2 + (df['start_loc_y'] - 40)**2)
    distance_to_goal_2 = np.sqrt((df['start_loc_x'] - 120)**2 + (df['start_loc_y'] - 40)**2)
    distance_to_goal = np.minimum(distance_to_goal_1, distance_to_goal_2)

    # Calculate shot credit
    xG = df.get('shot_statsbomb_xg') - df.get('diff_goal_conceded')
    shot_credit = np.where(distance_to_goal > 25,
                           np.where(df['shot_outcome'] == 'Goal', (df['shot_statsbomb_xg'] - df['diff_goal_conceded']) * 1.15, (df['shot_statsbomb_xg'] - df['diff_goal_conceded'])*0.85),
                           np.where(df['shot_outcome'] == 'Goal', (df['shot_statsbomb_xg'] - df['diff_goal_conceded']), df['shot_statsbomb_xg'] - df['diff_goal_conceded']))
    return shot_credit

In [9]:
def VAEP(event):
  events_spadl = (
      events
          # By default, events are not sorted by index, and this is a requirement for us
          .sort_values(['match_id', 'index'])
          .assign(
  #            start_time=lambda x: [60 * int(y[3:5]) + float(y[6:]) for y in x['timestamp']],
  #            end_time=lambda x: x['start_time'] + np.where(x['duration'].isna(), 0, x['duration']),
              start_loc_x=lambda x: x['location'].apply(lambda y: y[0] if isinstance(y, list) else np.nan),
              start_loc_y=lambda x: x['location'].apply(lambda y: y[1] if isinstance(y, list) else np.nan),
              end_location=lambda x: np.select(
                  condlist=[x['type'] == 'Pass', x['type'] == 'Carry', x['type'] == 'Shot', x['type'] == 'Goalkeeper'],
                  choicelist=[x['pass_end_location'], x['carry_end_location'], x['shot_end_location'], x['goalkeeper_end_location']],
                  default=x['location'],
              ),
              end_loc_x=lambda x: x['end_location'].apply(lambda y: y[0] if isinstance(y, list) else np.nan),
              end_loc_y=lambda x: x['end_location'].apply(lambda y: y[1] if isinstance(y, list) else np.nan),
  #            body_part=lambda x: np.select(
  #                condlist=[x['type'] == 'Pass', x['type'] == 'Shot', x['type'] == 'Clearance', x['type'] == 'Goalkeeper'],
  #                choicelist=[x['pass_body_part'], x['shot_body_part'], x['clearance_body_part'], x['goalkeeper_body_part']],
  #                default=None,
  #            ),
              # Manually define the success/failure result for the most frequent events.
              # If we had more time, we'd want to handle ALL events.
              result=lambda x: np.select(
                  condlist=[
                      x['type'] == 'Pass',
                      x['type'] == 'Carry',
                      x['type'] == 'Ball Recovery',
                      x['type'] == 'Duel',
                      x['type'] == 'Clearance',
                      x['type'] == 'Dribble',
                      x['type'] == 'Goalkeeper',
                      x['type'] == 'Dispossessed',
                      x['type'] == 'Miscontrol',
                      x['type'] == 'Shot',
                      x['type'] == 'Foul Committed',
                      x['type'] == 'Foul Won',
                      x['type'] == 'Dribbled Past',
                      x['type'] == 'Interception',
                  ],
                  choicelist=[
                      # Pass
                      np.select(
                          condlist=[x['pass_outcome'].isna(), x['pass_outcome'] == 'Incomplete', x['pass_outcome'] == 'Pass Offside'],
                          choicelist=['success', 'failure', 'offside'],
                          default=None
                      ),
                      'success', # Carry
                      'success', # Ball Recovery
                      # Duel
                      np.select(
                          condlist=[x['duel_type'] == 'Aerial Lost', x['duel_outcome'].isin(['Lost In Play', 'Lost Out']), x['duel_outcome'].isin(['Won', 'Success in Play'])],
                          choicelist=['failure', 'failure', 'success'],
                          default=None
                      ),
                      'success', # Clearance
                      np.where(x['dribble_outcome'] == 'Complete', 'success', 'failure'), # Dribble
                      # Goalkeeper
                      np.select(
                          condlist=[x['goalkeeper_type'].isin(['Shot Faced', 'Shot Saved', 'Collected']), x['goalkeeper_outcome'] == 'Claim'],
                          choicelist=['success', 'success'],
                          default=None
                      ),
                      'failure', # Dispossessed
                      'failure', # Miscontrol
                      np.where(x['shot_outcome'] == 'Goal', 'success', 'failure'),    # Shot
                      'failure', # Foul Committed
                      'success', # Foul Won
                      'success', # Dribbled Past
                      np.where(x['interception_outcome'] == 'Won', 'success', 'failure'), # Interception
                  ],
                  default=None
              ),
              # Create a new string that encodes both type and result
              type_result=lambda x: x['type'] + '_' + x['result'],
          )
          # Throw out any events for which the result is NA
          .loc[lambda x: ~x['result'].isna()]
  )
  # Compute outcomes and features for the regression model ----

  k = 10

  data = (
      events_spadl
          # Get the SPADL features for the two events that preceded the event in question
          .assign(
              under_pressure_next=lambda x: x['under_pressure'].shift(-1),
              start_loc_x_1=lambda x: x['start_loc_x'].shift(1),
              start_loc_x_2=lambda x: x['start_loc_x'].shift(2),
              start_loc_y_1=lambda x: x['start_loc_y'].shift(1),
              start_loc_y_2=lambda x: x['start_loc_y'].shift(2),
              end_loc_x_1=lambda x: x['end_loc_x'].shift(1),
              end_loc_x_2=lambda x: x['end_loc_x'].shift(2),
              end_loc_y_1=lambda x: x['end_loc_y'].shift(1),
              end_loc_y_2=lambda x: x['end_loc_y'].shift(2)
          )
          # Compute the outcome variables goals scored and goals conceded
          .assign(
              # Need to re-compute index after we've filtered down to SPADL events
              index_spadl=lambda x: range(x.shape[0]),
              # Set up some helper variables so that we can identify when the next goal happens.
              # First, grab the index, match ID and team ID for each goal (if no goal, this is NaN)
              goal_index_spadl=lambda x: np.where(x['shot_outcome'] == 'Goal', x['index_spadl'], np.nan),
              goal_match_id=lambda x: np.where(x['shot_outcome'] == 'Goal', x['match_id'], np.nan),
              goal_team_id=lambda x: np.where(x['shot_outcome'] == 'Goal', x['team_id'], np.nan),
              # Second, fill new columns upward to find the index, etc., of the next goal after each event
              next_goal_index_spadl=lambda x: x['goal_index_spadl'].fillna(method='bfill'),
              next_goal_match_id=lambda x: x['goal_match_id'].fillna(method='bfill'),
              next_goal_team_id=lambda x: x['goal_team_id'].fillna(method='bfill'),
              is_goal_scored=lambda x: list(
                  map(    # Convert to int instead of logical so that probabilities below reflect success
                      int,
                      (x['next_goal_match_id'] == x['match_id']) &
                          (x['next_goal_team_id'] == x['team_id']) &
                          (x['next_goal_index_spadl'] <= x['index_spadl'] + 10) &
                          (x['period'] == x['period'])
                  )
              ),
              is_goal_conceded=lambda x: list(
                  map(    # Convert to int instead of logical so that probabilities below reflect success
                      int,
                      (x['next_goal_match_id'] == x['match_id']) &
                          ~(x['next_goal_team_id'] == x['team_id']) &
                          (x['next_goal_index_spadl'] <= x['index_spadl'] + 10) &
                          (x['period'] == x['period'])
                  )
              ),
          )
          # Calculate predictive features for outcome models (far fewer than we saw in the VAEP paper)
          .assign(
              end_distance_opp_goal=lambda x: np.sqrt((x['end_loc_x']-120)**2 + (x['end_loc_y'] - 40)**2),
              end_dist=lambda x: np.sqrt((x['end_loc_x']-120)**2 + (x['end_loc_y'] - 40)**2),
              end_distance_own_goal=lambda x: np.sqrt((x['end_loc_x'])**2 + (x['end_loc_y'] - 40)**2),
              start_angle = lambda x: np.arctan((40 - x['start_loc_y'])/(120 - x['start_loc_x'])),
              end_angle = lambda x: np.arctan((40 - x['end_loc_y'])/(120 - x['end_loc_x'])),
              downfield_movement=lambda x: x['end_loc_x'] - x['start_loc_x_2'],
              forward = lambda x: ((x['type'] == "Pass") & (x['end_loc_x'] > x['start_loc_x'])),
          )
  )

  model_goal_scored = (
      smf.glm(
          formula="is_goal_scored ~ type_result + end_distance_opp_goal + downfield_movement + start_angle + end_angle",
          # Exclude goals from the outcome model because they result in a goal 100% of the time
          data=data.loc[lambda x: x['type_result'] != "Shot_success"],
          family=sm.families.Binomial()
      )
          .fit()
  )

  model_goal_conceded = (
      smf.glm(
          formula="is_goal_conceded ~ type_result + end_distance_own_goal + downfield_movement",
          data=data.loc[lambda x: x['type_result'] != "Shot_success"],
          family=sm.families.Binomial()
      )
          .fit()
  )
  duplicate_indices = data[data.index.duplicated(keep=False)]
  data = data.loc[~data.index.duplicated(keep='first')]
  data = data.reset_index(drop=True)

  # Extract predictions from models ----

  # We have to predict non-goal events separately from goal events because goal events
  # were excluded from the training step.
  pred_no_goal = (
      data
          .loc[lambda x: x['type_result'] != 'Shot_success']
          .assign(
              pred_goal_scored=lambda x: model_goal_scored.predict(exog=x),
              pred_goal_conceded=lambda x: model_goal_conceded.predict(exog=x),
          )
  )

  pred_goal = (
      data
          .loc[lambda x: x['type_result'] == 'Shot_success']
          .assign(
              pred_goal_scored=1,     # a goal event ALWAYS leads to a goal scoring
              # We'll assume that a goal event leads to a goal conceded in the next k events at the
              # same rate as non-goal shots (this is probably not a great assumption).
              pred_goal_conceded=pred_no_goal.loc[lambda x: x['type'] == 'Shot']['pred_goal_conceded'].mean(),
          )
  )

  # Put goal event predictions and non-goal event predictions together and calculate the change
  # in goal scored/conceded probabilities which each event.

  pred = (
      pd.concat([pred_goal, pred_no_goal])
          .sort_values(['match_id', 'index'])
          .assign(
              diff_goal_scored=lambda x: np.select(
                  condlist=[x['pass_type'] == 'Kick Off', x['team_id'] == x['team_id'].shift(1), ~(x['team_id'] == x['team_id'].shift(1))],
                  choicelist=[0, x['pred_goal_scored'] - x['pred_goal_scored'].shift(1), x['pred_goal_scored'] - x['pred_goal_conceded']],
                  default=None
              ),
              # For goal conceded probability, we follow similar logic to the above.
              diff_goal_conceded=lambda x: np.select(
                  condlist=[x['pass_type'] == 'Kick Off', x['team_id'] == x['team_id'].shift(1), ~(x['team_id'] == x['team_id'].shift(1))],
                  choicelist=[0, x['pred_goal_conceded'] - x['pred_goal_conceded'].shift(1), x['pred_goal_conceded'] - x['pred_goal_scored']],
                  default=None
              ),
              passer_multiplier=lambda df: df.apply(lambda x: calculate_credit_multiplier(x)[0], axis=1),
              receiver_multiplier=lambda df: df.apply(lambda x: calculate_credit_multiplier(x)[1], axis=1),
              passer_credit=lambda df: df['diff_goal_scored'] * df['passer_multiplier'],
              receiver_credit=lambda df: df['diff_goal_scored'] * df['receiver_multiplier'],
              adjusted_shot_credit=lambda df: df.apply(calculate_shot_credit_vectorized, axis=1),
              vaep=lambda x: x['adjusted_shot_credit'] + x['passer_credit'] + x['receiver_credit'],
              sec_vaep = lambda x: np.select(
                condlist = [x['type'] == 'Pass'],
                choicelist = [(x['diff_goal_scored'] - x['diff_goal_conceded']) - x['vaep']],
                default = 0
              ),

        )
  )

  # Aggregate VAEP by player ----

  # First aggregate by player AND action type in case we want to look at the action-specific value
  # for each player
  player_type_summary = (
      pred
          .groupby(['player_id', 'player', 'type', 'league half','team'])
          .agg(
              count=('player_id', 'count'),
              vaep=('vaep', 'sum')
          )
          .reset_index()
  )

  vaep_team_type_summary = (
      pred
          .groupby(['team'])
          .agg(
              vaep=('vaep', 'sum')
          )
          .reset_index()
  )

  # Now aggregate by player across action types
  player_summary = (
      player_type_summary
          .groupby(['player_id', 'player','team','league half'])
          .agg(
              count=('count', 'sum'),
              vaep=('vaep', 'sum')
          )
          .reset_index()
  )

  # Look at the top overall players according to VAEP
  player_summary.sort_values('vaep', ascending=False)

  sec_player_type_summary = (
    pred
        .groupby(['pass_recipient','league half'])
        .agg(
            sec_count=('pass_recipient', 'count'),
            sec_vaep=('sec_vaep', 'sum')
        )
        .reset_index()
  )

  # Look at the top overall players according to VAEP
  sec_player_type_summary.sort_values('sec_vaep', ascending=False)
  sec_player_type_summary = sec_player_type_summary.rename(columns={"pass_recipient":"player"})

  a = player_summary.merge(sec_player_type_summary,on=["player",'league half'],how='left')
  a['total_vaep'] = np.select(
      condlist=[
          ~(pd.isna(a['vaep']) | pd.isna(a['sec_vaep'])),
          pd.isna(a['vaep']),
          pd.isna(a['sec_vaep'])
      ],
      choicelist=[
          a['vaep'] + a['sec_vaep'],
          a['sec_vaep'],
          a['vaep']
      ],
      default=None
  )
  vaep_first_half = a[a['league half'] == False]
  vaep_second_half = a[a['league half'] == True]
  vaep_reval = vaep_second_half.merge(vaep_first_half,on=['player','player_id','team'], how = 'outer')[['player_id','player','team','vaep_x','vaep_y']]
  vaep_reval['vaep_x'] = vaep_reval['vaep_x'].fillna(0)
  vaep_reval['vaep_y'] = vaep_reval['vaep_y'].fillna(0)
  vaep_reval['vaep'] = vaep_reval['vaep_x'] + vaep_reval['vaep_y']
  vaep_reval = vaep_reval.sort_values(by='vaep',ascending=False).reset_index()
  return vaep_first_half, vaep_second_half, vaep_reval

In [10]:
vaep_first_half, vaep_second_half, player_values_VAEP = VAEP(events)

In [11]:
# Calculate the number of unique games each player played
player_games = events.groupby('player_id')['match_id'].nunique().reset_index()
player_games.columns = ['player_id', 'games_played']
# Assuming each game has 90 minutes
player_games['minutes'] = player_games['games_played'] * 90
player_team = events[['player_id', 'team']].drop_duplicates().reset_index(drop=True)

vaep_df = player_values_VAEP.reset_index(drop=True)
vaep_df.columns = ['index', 'player_id', 'player', 'team', 'vaep_x', 'vaep_y', 'vaep']

merged_with_minutes = pd.merge(vaep_df, player_games, on=['player_id'])

# Calculate VAEP and MRKV per 90 minutes
merged_with_minutes['vaep_per_90'] = merged_with_minutes['vaep'] / merged_with_minutes['games_played']

# Order the dataframe by vaep_per_90 in descending order
output_file = merged_with_minutes[['player_id', 'player', 'team', 'minutes', 'vaep_per_90']].sort_values(by='vaep_per_90', ascending=False)

In [18]:
player_team = events[['player_id', 'team']].drop_duplicates().reset_index(drop=True)

vaep_df = player_values_VAEP.reset_index(drop=True)
vaep_df.columns = ['index', 'player_id', 'player', 'team', 'vaep_x', 'vaep_y', 'vaep']

# Group by team and sum the vaep and mrkv values
vaep_df = vaep_df.groupby('team').agg({'vaep': 'sum'}).reset_index()

goal_df = events[events['shot_outcome'] == 'Goal']

# Group by team and count the number of goals
team_goals = goal_df.groupby('team').size().reset_index(name='#goals')

loss_df = events[events['goalkeeper_type'] == 'Goal Conceded']

# Group by team and count the number of goals
team_loss = loss_df.groupby('team').size().reset_index(name='#goals_conceded')

df_c = pd.merge(vaep_df, team_loss, on=['team'], how='left')
df_c = pd.merge(df_c, team_goals, on=['team'], how='left')
df_c['goal_diff'] = df_c['#goals'] -  df_c['#goals_conceded']

vaep_series = df_c['vaep'].dropna().astype(float)
goal_diff_series = df_c['goal_diff'].dropna()

correlation_vaep_goal_diff = np.corrcoef(vaep_series, goal_diff_series)[0, 1]
print(f"Correlation between VAEP and Goal Differential: {correlation_vaep_goal_diff}")

Correlation between VAEP and Goal Differential: 0.9233773981674361


In [19]:
player_30_vaep = player_values_VAEP.merge(unique_match_counts,on='player',how='right').sort_values(by='vaep',ascending=False).reset_index()
vaep_player_corr = player_30_vaep['vaep_x'].corr(player_30_vaep['vaep_y'])
print(f"Correlation of VAEP between 1st half and 2nd half: {vaep_player_corr}")

Correlation of VAEP between 1st half and 2nd half: 0.6623636327221156
