# Imports

In [39]:
import os
import json

import gandula

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import xgboost as xgb
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score, roc_auc_score
from sklearn.model_selection import GridSearchCV

# Utils

In [40]:
def euclidean_distance(p1, p2):
    return np.sqrt((p1[0] - p2[0]) ** 2 + (p1[1] - p2[1]) ** 2)

def get_shoot_angle(x, y):
    _x = 52.5 - x
    return np.absolute(np.arctan((7.32 * _x) / (_x ** 2 + y ** 2 - (7.32 / 2)**2)))

def get_angle_from_player(p_ball, p_player):
    p_x = p_player[0]
    p_y = p_player[1]

    x = p_x - p_ball[0]
    y = p_y - p_ball[1]
    
    return np.absolute(np.arctan((1 * x) / (x ** 2 + y ** 2 - (1 / 2)**2)))

def get_triangle_area(pA, pB, pC):
    return np.absolute((pA[0]*(pB[1]-pC[1]) + pB[0]*(pC[1]-pA[1])+ pC[0]*(pA[1]-pB[1]))/2.0)

def isInside(t_p1, t_p2, t_p3, point):
    triangle_area = np.floor(get_triangle_area(t_p1, t_p2, t_p3))

    area_1 = get_triangle_area(point, t_p2, t_p3)
    area_2 = get_triangle_area(t_p1, point, t_p3)
    area_3 = get_triangle_area(t_p1,t_p2, point)

    area_sum = np.floor(area_1 + area_2 + area_3)

    if(area_sum == triangle_area):
        return True
    else:
        return False

def change_location(p):
    x = p[0]
    y = p[1]

    if(x < 0):
        x = x * -1
        y = y * -1
    
    return (x, y)

    

def get_players_location(frame):
    away_players = frame.away_players_with_kalman
    home_players = frame.home_players_with_kalman

    away_locations = []
    home_locations = []

    for player in away_players:
        x = player.x
        y = player.y

        away_locations.append((x, y))

    for player in home_players:
        x = player.x
        y = player.y

        home_locations.append((x, y))

    return home_locations, away_locations

def get_ball_location(frame):
    ball = frame.ball_with_kalman

    if(ball == None):
        return None

    ball_x = ball.x
    ball_y = ball.y

    return (ball_x, ball_y)

def get_closest_player_from_all(home_locations, away_locations, target):
    
    is_home = True
    closest_distance = np.inf
    closest_index = -1

    for i, location in enumerate(home_locations):
        distance = euclidean_distance(location, target)
        if(distance < closest_distance):
            closest_distance = distance
            closest_index = i

    for i, location in enumerate(away_locations):
        distance = euclidean_distance(location, target)
        if(distance < closest_distance):
            closest_distance = distance
            closest_index = i
            is_home = False

    return closest_index, is_home


def get_closest_player(players_location, target):
    
    closest_index = -1
    closest_distance = np.inf

    for i, location in enumerate(players_location):
        distance = euclidean_distance(location, target)
        if(distance < closest_distance):
            closest_distance = distance
            closest_index = i

    return closest_index

# Pre-processing

In [41]:
shot_frames = gandula.loader.read_pickle(
    'data/enhaced_frames_shots.pkl'
)

In [42]:
result = set()
# iterate over all frames to discover what event features are useful
for frame in shot_frames:
    event_features = list(frame['event'].shootingEvent.model_dump(
        exclude_none=True
    ).keys())
    result.update(event_features)

result

{'blockerPlayer',
 'clearerPlayer',
 'createsSpace',
 'deflectorBodyType',
 'deflectorPlayer',
 'keeperTouchType',
 'pressurePlayer',
 'pressureType',
 'saveHeightType',
 'saveReboundType',
 'saverPlayer',
 'shooterPlayer',
 'shotBodyType',
 'shotInitialHeightType',
 'shotNatureType',
 'shotOutcomeType',
 'shotPointX',
 'shotPointY'}

In [43]:
def extract_features(raw_frames):
    """
    Extract features from raw data and returns the pandas DataFrame
    """
    data = []

    for entry in raw_frames:
        
        frame = entry['frame']
        
        # general info
        ball_location = get_ball_location(frame) # ball_location is a tuple (x,y)
        
        # if tracking doesnt provide ball info, get from event data
        if(ball_location == None):
            x_ball = entry['event'].shootingEvent.shotPointX
            y_ball = entry['event'].shootingEvent.shotPointY
            ball_location = (x_ball, y_ball)
            
        home_locations, away_locations = get_players_location(frame)
        
        # change locations to normalize all attacks to onde side of the field
        ball_location = change_location(ball_location)
        home_locations = list(map(change_location, home_locations))
        away_locations = list(map(change_location, away_locations))

        # who is attacking?
        closest_index, home_attacking = get_closest_player_from_all(home_locations, away_locations, ball_location)

        #dummy inits
        goalkeeper_location = (52.5, 0)
        num_blocking_players = 0
        blocking_angle = 0
        
        # true inits
        shoot_angle = get_shoot_angle(ball_location[0], ball_location[1])

        if(home_attacking == True):
            # get goalkeeper location
            goalkeeper_idx = get_closest_player(away_locations, (52.5, 0))
            goalkeeper_location = away_locations[goalkeeper_idx]

            # shoot_angle
            for away_pos in away_locations:
                # get num of blocking players
                if(isInside(ball_location, (52.5, -3.66), (52.5, 3.66), away_pos)):
                    num_blocking_players += 1
                    # get blocking angle
                    blocking_angle += get_angle_from_player(ball_location, away_pos)

        else:
            # get goalkeeper location
            goalkeeper_idx = get_closest_player(home_locations, (52.5, 0))
            goalkeeper_location = home_locations[goalkeeper_idx]

            # shoot_angle
            for home_pos in home_locations:
                # get num of blocking players
                if(isInside(ball_location, (52.5, -3.66), (52.5, 3.66), home_pos)):
                    num_blocking_players += 1
                    # get blocking angle
                    blocking_angle += get_angle_from_player(ball_location, home_pos)
        
        goalkeeper_distance = euclidean_distance(ball_location, goalkeeper_location)
        goalkeeper_angle = get_angle_from_player(ball_location, goalkeeper_location)
        shoot_distance = euclidean_distance(ball_location, (52.5, 0))
        

        # set event features
        event_id            = entry['frame'].event_id
        frame_id            = entry['frame'].frame_id
        outcome             = entry['event'].shootingEvent.shotOutcomeType
        body_part           = entry['event'].shootingEvent.shotBodyType
        save_height         = entry['event'].shootingEvent.saveHeightType
        shot_initial_height = entry['event'].shootingEvent.shotInitialHeightType
        shooter_id          = entry['event'].shootingEvent.shooterPlayer.id
        pressure_type       = entry['event'].shootingEvent.pressureType
        
        row_entry = {
            'event_id' : event_id,
            'frame_id' : frame_id,
            'goalkeeper_distance' : goalkeeper_distance,
            'goalkeeper_angle' : goalkeeper_angle,
            'blocking_angle' : blocking_angle,
            'num_blocking_players' : num_blocking_players,
            'shot_angle' : shoot_angle,
            'shot_distance' : shoot_distance,
            'outcome' : outcome,
            'body_part' : body_part,
            'shot_height' : save_height,
            'shot_initial_height' : shot_initial_height,
            'pressure_type' : pressure_type,
            'shooter_id' : shooter_id,
        }

        data.append(row_entry)

    df = pd.DataFrame(data)
    df = df.drop_duplicates()  

    return df

In [44]:
df = extract_features(shot_frames)
df.head()

Unnamed: 0,event_id,frame_id,goalkeeper_distance,goalkeeper_angle,blocking_angle,num_blocking_players,shot_angle,shot_distance,outcome,body_part,shot_height,shot_initial_height,pressure_type,shooter_id
0,6629878,9282,7.378759,0.118128,0.118128,1,0.496175,13.094506,ShotOutcomeType.ON_TARGET,BodyType.LEFT_FOOT,ShotHeightType.BOTTOM_THIRD,ShotHeightType.BOTTOM_THIRD,,107
1,6631191,20398,15.938412,0.03886,0.121658,2,0.228383,19.829049,ShotOutcomeType.OFF_TARGET,BodyType.RIGHT_FOOT,,ShotHeightType.SHORT,PressureType.PRESSURED,8342
2,6630642,22485,11.912144,0.082103,0.082103,1,0.548898,12.990412,ShotOutcomeType.GOAL,BodyType.RIGHT_FOOT,,ShotHeightType.BOTTOM_THIRD,,8342
3,6629751,36063,34.823735,0.026726,0.102536,3,0.171969,39.209325,ShotOutcomeType.OFF_TARGET,BodyType.LEFT_FOOT,,ShotHeightType.OVER,,8026
4,6630005,41923,29.599079,0.03333,0.03333,1,0.205871,34.401262,ShotOutcomeType.OFF_TARGET,BodyType.RIGHT_FOOT,,ShotHeightType.BOTTOM_THIRD,,8342


In [45]:
# fill na pressure type values
df.loc[df['pressure_type'].isna(), 'pressure_type'] = df.apply(
    lambda row: 'P' if row['num_blocking_players'] >= 4 else 'L',
    axis=1
)

# set the maximum number of blocking players to 7
df['num_blocking_players'] = df['num_blocking_players'].clip(upper=7)

In [46]:
# creating the target and shoot_height columns
df['goal'] = df['outcome'] == 'G'
df['shot_height'] = df['shot_height'].fillna(df['shot_initial_height'])

# drop useless columns
df.drop(columns=['shot_initial_height', 'outcome'], inplace=True)

In [47]:
# encode the categorical features
label_encoder = LabelEncoder()

df['shot_height'] = label_encoder.fit_transform(df['shot_height'])
df['shot_height'].astype('category')

df['pressure_type'] = label_encoder.fit_transform(df['pressure_type'])
df['pressure_type'].astype('category')

df.head()

# 

Unnamed: 0,event_id,frame_id,goalkeeper_distance,goalkeeper_angle,blocking_angle,num_blocking_players,shot_angle,shot_distance,body_part,shot_height,pressure_type,shooter_id,goal
0,6629878,9282,7.378759,0.118128,0.118128,1,0.496175,13.094506,BodyType.LEFT_FOOT,0,1,107,False
1,6631191,20398,15.938412,0.03886,0.121658,2,0.228383,19.829049,BodyType.RIGHT_FOOT,2,2,8342,False
2,6630642,22485,11.912144,0.082103,0.082103,1,0.548898,12.990412,BodyType.RIGHT_FOOT,0,1,8342,True
3,6629751,36063,34.823735,0.026726,0.102536,3,0.171969,39.209325,BodyType.LEFT_FOOT,6,1,8026,False
4,6630005,41923,29.599079,0.03333,0.03333,1,0.205871,34.401262,BodyType.RIGHT_FOOT,0,1,8342,False


In [48]:
# create header column
df['header'] = df['body_part'].isin(
    [
        gandula.providers.pff.schema.event.BodyType.RIGHT_FOOT,
        gandula.providers.pff.schema.event.BodyType.LEFT_FOOT,
        gandula.providers.pff.schema.event.BodyType.RIGHT_KNEE,
        gandula.providers.pff.schema.event.BodyType.LEFT_KNEE,
        gandula.providers.pff.schema.event.BodyType.RIGHT_BACK_HEEL,
        gandula.providers.pff.schema.event.BodyType.LEFT_BACK_HEEL,
        gandula.providers.pff.schema.event.BodyType.RIGHT_THIGH,
        gandula.providers.pff.schema.event.BodyType.LEFT_THIGH,
    ]
)
df['header']

0        True
1        True
2        True
3        True
4        True
        ...  
3994    False
3995    False
3996    False
3997    False
3998    False
Name: header, Length: 1518, dtype: bool

In [10]:
# maintain only shots made with feet
shots_df = df[
    df['body_part'].isin(
        [
            gandula.providers.pff.schema.event.BodyType.RIGHT_FOOT,
            gandula.providers.pff.schema.event.BodyType.LEFT_FOOT,
            gandula.providers.pff.schema.event.BodyType.RIGHT_KNEE,
            gandula.providers.pff.schema.event.BodyType.LEFT_KNEE,
            gandula.providers.pff.schema.event.BodyType.RIGHT_BACK_HEEL,
            gandula.providers.pff.schema.event.BodyType.LEFT_BACK_HEEL,
            gandula.providers.pff.schema.event.BodyType.RIGHT_THIGH,
            gandula.providers.pff.schema.event.BodyType.LEFT_THIGH,
        ]
    )
]

# Model

In [74]:
# separate the features matrix and target vector
non_features = ['event_id', 'frame_id', 'body_part', 'shooter_id', 'goal']

X = df.drop(columns=non_features)
y = df['goal']

# train test split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=42)

In [75]:
xgb_model = xgb.XGBClassifier(
    objective='binary:logistic',
    eval_metric='logloss',
    use_label_encoder=False,
)

param_grid = {
    'max_depth': [2, 3, 4, 5],
    'reg_alpha': [0.1, 0.25 ,0.5, 0.75, 1],  # Regularização L1
    'reg_lambda': [1, 2, 5, 7, 10],     # Regularização L2
}

# grid search to find best hiperparameter set
grid_search = GridSearchCV(
    estimator=xgb_model,
    param_grid=param_grid,
    cv=5,  
    scoring='roc_auc',  
    verbose=0,   
    n_jobs=-1,
)

grid_search.fit(X_train, y_train)

# show best hiperparameters
print("Melhores hiperparâmetros:", grid_search.best_params_)

# get best model
best_model = grid_search.best_estimator_

# eval in test set
y_pred_proba = best_model.predict_proba(X_test)[:, 1]
roc_auc = roc_auc_score(y_test, y_pred_proba)
print(f"Melhor ROC-AUC no conjunto de teste: {roc_auc:.4f}")

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encoder" } are not used.

Parameters: { "use_label_encode

Melhores hiperparâmetros: {'max_depth': 2, 'reg_alpha': 1, 'reg_lambda': 5}
Melhor ROC-AUC no conjunto de teste: 0.8418


In [76]:
# test early stop rounds
# Agora você pode aplicar early stopping no modelo com os melhores hiperparâmetros
best_model_with_early_stopping = xgb.XGBClassifier(
    objective='binary:logistic',
    eval_metric='logloss',
    use_label_encoder=False,
    max_depth=grid_search.best_params_['max_depth'],
    reg_alpha=grid_search.best_params_['reg_alpha'],
    reg_lambda=grid_search.best_params_['reg_lambda'],
    early_stopping_rounds=10
)

# Treinar o modelo com early stopping, agora com o conjunto de validação
best_model_with_early_stopping.fit(
    X_train, y_train,
    eval_set=[(X_test, y_test)],  # Conjunto de validação para early stopping
    verbose=False
)

# Avaliar o modelo no conjunto de teste
y_pred_proba = best_model_with_early_stopping.predict_proba(X_test)[:, 1]
roc_auc = roc_auc_score(y_test, y_pred_proba)
print(f"Melhor ROC-AUC no conjunto de teste: {roc_auc:.4f}")


Melhor ROC-AUC no conjunto de teste: 0.8483


Parameters: { "use_label_encoder" } are not used.



# Submission File

In [77]:
# select the best model
model = best_model_with_early_stopping

# get submission file
X = df.drop(columns = non_features)
df['xG'] = model.predict_proba(X)[:, 1]

submission = df[['event_id', 'frame_id', 'xG']].copy()
submission['shot_id'] = (
    submission['event_id'].astype(str) + '_' + submission['frame_id'].astype(str)
)
submission = submission.drop(columns=['event_id', 'frame_id'])
submission = submission.rename(columns={'xG_dist': 'xG'})
submission = submission[['shot_id', 'xG']]

test = pd.read_csv('data/test.csv')
submission = pd.merge(
    submission,
    test[["shot_id"]],
    on="shot_id"
)

submission.to_csv('submission.csv', index=False)