## *Analyzing StatsBomb Shot Freeze Frames and Constructing an xG Model*
- Retrieve shots and their attributes.
- Explore the information contained in shot freeze frames and how it can be used.
- Extract new features from the freeze frames.
- Develop a predictive model for expected goals (xG).
- Evaluate the performance and accuracy of the resulting model.

In [1]:
import os
import json
import math 
import numpy as np
import pandas as pd
import geopandas as gpd 

from mplsoccer import Pitch
import matplotlib.pyplot as plt 

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode
from shapely.geometry import MultiPoint, Polygon, Point 

import warnings
warnings.filterwarnings("ignore")

## *Data Pre-processing*

In [None]:
def process_json_file(file_path, shots_list):
    """Collect every shot event that carries a freeze frame from one events file.

    Parameters
    ----------
    file_path : str
        Path to a JSON file whose top level is a list of event dictionaries.
    shots_list : list
        Accumulator, mutated in place. One ``(id, freeze_frame, location)``
        tuple is appended for each event that has a truthy ``'shot'`` entry
        containing a ``'freeze_frame'`` key.

    Returns
    -------
    None
    """
    # JSON text is UTF-8 by specification; decoding explicitly avoids depending
    # on the platform's default encoding (player names are often non-ASCII).
    with open(file_path, 'r', encoding='utf-8') as file:
        data = json.load(file)

    # The file handle is no longer needed once the payload is parsed.
    for item in data:
        shot = item.get('shot')
        if shot and 'freeze_frame' in shot:
            shots_list.append((item.get('id'), shot['freeze_frame'], item.get('location')))


In [None]:
def process_all_json_files_in_directory(directory):
    """Gather freeze-frame shots from every ``.json`` file in a directory.

    Parameters
    ----------
    directory : str
        Folder to scan (not recursive). Entries whose name does not end in
        ``.json`` are ignored.

    Returns
    -------
    list
        ``(id, freeze_frame, location)`` tuples as appended by
        ``process_json_file``, one per qualifying shot event.
    """
    shots_list = []

    # os.listdir returns entries in arbitrary order; sorting keeps the row
    # order of the resulting table reproducible between runs and machines.
    for filename in sorted(os.listdir(directory)):
        if filename.endswith('.json'):
            file_path = os.path.join(directory, filename)
            process_json_file(file_path, shots_list)

    return shots_list

In [None]:
# Load every freeze-frame shot found under the events folder into one table
json_directory = 'data/events'
shots = process_all_json_files_in_directory(json_directory)
shots_df = pd.DataFrame(shots, columns=['id', 'freeze_frame', 'location'])
# Split each location pair into separate 'X' and 'Y' columns
shots_df[['X','Y']] = shots_df['location'].apply(lambda x: pd.Series(x, index=['X', 'Y']))
# Writing the Parquet file fails if its parent folder is missing, so create it first
os.makedirs('data/shots', exist_ok=True)
shots_df.to_parquet('data/shots/statsbomb-shots.parquet')

In [None]:
# Example: display the freeze frame stored in the row at position 5
shot_freeze_frames = shots_df['freeze_frame']
shot_freeze_frames.iloc[5]

In [None]:
# Notebook shell escape that installs pyspark.
# NOTE(review): pyspark is already imported in the first cell, so on a fresh
# environment this has to run before that import — consider moving it up.
! pip install pyspark 

In [2]:
spark = SparkSession.builder \
    .appName("Shot Analysis") \
    .getOrCreate()

# Read the Parquet file into a PySpark DataFrame
# NOTE(review): this rebinds ``shots_df`` to a Spark DataFrame, while later
# cells call list(shots_df.id) and merge(shots_df) as if it were pandas — confirm
# which object those cells are meant to see.
shots_df = spark.read.parquet('data/shots/statsbomb-shots.parquet')

# Explode the freeze frames: one row per player present when the shot was taken
df_shot_frame = (
    shots_df.select("id", explode("freeze_frame").alias("frame"))
            .select("id", 
                    col("frame.location")[0].alias("x"), 
                    col("frame.location")[1].alias("y"), 
                    col("frame.position.name").alias("position"), 
                    col("frame.teammate").alias("teammate"))
)

statsbomb_pitch = Pitch()

# For every exploded player row, fetch the id and location of the shot it
# belongs to. The shot coordinates are aliased so they cannot clash with the
# lower-case player columns (Spark column resolution is case-insensitive by
# default). Collecting id and coordinates together keeps them paired.
shot_rows = (
    shots_df.select("id",
                    col("X").alias("shot_x"),
                    col("Y").alias("shot_y"),
                    explode("freeze_frame").alias("frame"))
            .select("id", "shot_x", "shot_y")
            .collect()
)
num_rows = len(shot_rows)

# Triangle per row: vertex 0 is the shot location (previously left at the
# origin), vertices 1 and 2 are the two posts of the right-hand goal.
vertices = np.zeros((num_rows, 3, 2))
vertices[:, 0, :] = np.array(
    [(row["shot_x"], row["shot_y"]) for row in shot_rows], dtype=float
).reshape(-1, 2)
vertices[:, 1:, :] = statsbomb_pitch.goal_right

# A GeoSeries cannot hold an extra "id" column, so build a GeoDataFrame that
# carries the shot id next to each triangle.
vertices_gdf = gpd.GeoDataFrame(
    {"id": [row["id"] for row in shot_rows]},
    geometry=[Polygon(vert) for vert in vertices],
)

23/10/08 02:07:40 WARN Utils: Your hostname, dahbi-Latitude-7280 resolves to a loopback address: 127.0.1.1; using 192.168.0.198 instead (on interface wlp2s0)
23/10/08 02:07:40 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/08 02:07:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

In [3]:
# Pull the exploded freeze-frame rows out of Spark into a pandas DataFrame
df_shot_frame_pd = df_shot_frame.toPandas()

# Turn it into a GeoDataFrame whose geometry is one point per player,
# built from the 'x' and 'y' columns
df_shot_frame_gdf = gpd.GeoDataFrame(
    df_shot_frame_pd,
    geometry=gpd.points_from_xy(df_shot_frame_pd["x"], df_shot_frame_pd["y"]),
)

                                                                                

In [4]:
# One polygon per row of the (n, 3, 2) vertices array
flat_vertices = list(map(Polygon, vertices))
geo_series = gpd.GeoSeries(flat_vertices)

# Wrap the polygons in a GeoDataFrame, using them as its geometry column
vertices_gdf = gpd.GeoDataFrame(geometry=geo_series)

In [None]:
# Spatial inner join: keep every (player point, triangle) pair that intersects.
# `predicate` is the current name of the keyword formerly called `op`, which
# recent geopandas releases no longer accept.
player_positions = gpd.sjoin(df_shot_frame_gdf, vertices_gdf, how="inner", predicate="intersects")

In [None]:
# NOTE(review): this cell indexes ``shots_df`` like a pandas DataFrame with
# 'id', 'freeze_frame', 'X' and 'Y' columns — confirm it has not been rebound
# to the Spark DataFrame by an earlier cell.
events_id = list(shots_df['id'])
shot_frames = list(shots_df['freeze_frame'])

# Flatten the freeze frames: one row per player present at the moment of the shot
rows = [
    (event_id, player['location'][0], player['location'][1],
     player['position']['name'], player['teammate'])
    for event_id, frame in zip(events_id, shot_frames)
    for player in frame
]
df_shot_frame = pd.DataFrame(data=rows, columns=["id", "x", "y", "position", "teammate"])

statsbomb_pitch = Pitch()

# One triangle PER SHOT: the shot location plus the two posts of the right-hand
# goal. Previously a triangle was built from every player's own location and
# merged on 'id', which paired each player with every triangle of the shot
# (including its own, which it always touches), so no player was ever filtered.
vertices = np.zeros((len(shots_df), 3, 2))
vertices[:, 0, :] = shots_df[['X', 'Y']].values
vertices[:, 1:, :] = statsbomb_pitch.goal_right
vertices = gpd.GeoDataFrame({
    'id': shots_df['id'].values,
    'shot_polygon': gpd.GeoSeries([Polygon(vert) for vert in vertices]),
})

player_positions = gpd.GeoSeries.from_xy(df_shot_frame['x'], df_shot_frame['y'])
player_positions = gpd.GeoDataFrame({'id': df_shot_frame['id'], 'position': player_positions,
                                    'Tactical position':df_shot_frame["position"], 'teammate':df_shot_frame["teammate"],
                                     'X_':df_shot_frame["x"], 'Y_':df_shot_frame["y"]
                                    })

# Many players to one shot triangle
player_positions = gpd.GeoDataFrame(player_positions.merge(vertices, on='id'))

# Detect whether each player stands inside (or on the edge of) the shot triangle
player_positions['does player interfer with the goal angle?'] = player_positions['position'].intersects(player_positions['shot_polygon'])
# Drop the players who do not interfere with the shot angle
player_positions= player_positions[player_positions['does player interfer with the goal angle?']]
player_positions

In [None]:
# Goal posts at (120, 44) and (120, 36); LP is the x coordinate of the goal line
xb, yb = (120, 44)
xc, yc = (120, 36)
LP = 120

def calculate_shot_angle(x, y):
    """Return the angle, in degrees, under which the goal mouth is seen from (x, y).

    The angle at (x, y) between the directions to the two posts is obtained
    with the law of cosines from the squared distances to each post and the
    squared goal width.

    Parameters
    ----------
    x, y : float
        Observer position in the same coordinate system as the post constants.

    Returns
    -------
    float
        Angle in degrees within [0, 180]. Returns 0.0 when (x, y) coincides
        with a post, where the angle is undefined (this used to raise
        ZeroDivisionError).
    """
    dx_sq = (x - LP) ** 2
    dist_sq_b = dx_sq + (y - yb) ** 2
    dist_sq_c = dx_sq + (y - yc) ** 2

    denominator = 2 * math.sqrt(dist_sq_b * dist_sq_c)
    if denominator == 0:
        return 0.0

    cos_angle = (dist_sq_b + dist_sq_c - (yb - yc) ** 2) / denominator
    # Rounding can push the ratio marginally outside acos' domain; clamp it.
    # Written as comparisons so that NaN input still propagates as NaN.
    if cos_angle > 1.0:
        cos_angle = 1.0
    elif cos_angle < -1.0:
        cos_angle = -1.0
    return math.degrees(math.acos(cos_angle))

def calculate_angle(xy, xi_yi_Li):
    """Return the angle, in degrees, that a player's width subtends from a point.

    The player is modelled as a segment of length ``Li`` centred on
    ``(xi, yi)`` and running along the y axis; the result is the angle at
    ``(x, y)`` between the directions to the segment's two ends.

    Parameters
    ----------
    xy : tuple
        ``(x, y)`` of the observer.
    xi_yi_Li : tuple
        ``(xi, yi, Li)``: segment centre and segment length.

    Returns
    -------
    float
        Angle in degrees within [0, 180]. Returns 0.0 when the observer sits
        exactly on one end of the segment, where the angle is undefined (this
        used to raise ZeroDivisionError).
    """
    x, y = xy
    xi, yi, Li = xi_yi_Li
    half_width = Li / 2

    dx_sq = (x - xi) ** 2
    dist_sq_upper = dx_sq + (y - yi - half_width) ** 2
    dist_sq_lower = dx_sq + (y - yi + half_width) ** 2

    denominator = math.sqrt(dist_sq_upper * dist_sq_lower)
    if denominator == 0:
        return 0.0

    cos_angle = (dx_sq + (y - yi) ** 2 - half_width ** 2) / denominator
    # Clamp rounding overshoot instead of catching acos' ValueError: a ratio
    # just below -1 means 180 degrees, not the 0.0 the old fallback returned.
    # Written as comparisons so that NaN input still propagates as NaN.
    if cos_angle > 1.0:
        cos_angle = 1.0
    elif cos_angle < -1.0:
        cos_angle = -1.0
    return math.degrees(math.acos(cos_angle))

def determine_L(row):
    """Pick the width assigned to a player row.

    ``row`` is read by key: a truthy ``'teammate'`` gives 0.4 without looking
    at anything else; otherwise a ``'Tactical position'`` equal to
    ``'Goalkeeper'`` gives 1.4 and any other position gives 0.5.
    """
    if not row['teammate']:
        is_keeper = row['Tactical position'] == 'Goalkeeper'
        return 1.4 if is_keeper else 0.5
    return 0.4

In [None]:
# Create a column 'L' holding each player's width, chosen by determine_L from
# the player's tactical position and whether he is a teammate of the shooter
player_positions['L'] = player_positions.apply(determine_L, axis=1)

# Attach the columns of shots_df to every player row (matched on 'id') so that
# as many potential features as possible are available
player_positions = gpd.GeoDataFrame(player_positions.merge(shots_df, on='id'))

In [None]:
def calculate_angle_for_row(row):
    """Row adapter for ``calculate_angle``.

    Passes ``(X, Y)`` as the observer point and ``(X_, Y_, L)`` as the
    player's position and width, all read from ``row`` by key, and returns
    whatever ``calculate_angle`` returns.
    """
    observer_xy = row['X'], row['Y']
    player_xyl = row['X_'], row['Y_'], row['L']
    return calculate_angle(observer_xy, player_xyl)

# Calculate the angle of each player, row by row, and store it in 'angle'
player_positions['angle'] = player_positions.apply(calculate_angle_for_row, axis=1)

In [None]:
# Remove the intermediate per-player columns (and the raw 'location' list);
# the drop happens in place on player_positions
columns_to_drop= ['position', 'Tactical position', 'teammate', 'shot_polygon', 
                  'does player interfer with the goal angle?', 'L', 'location' , 'X_' ,'Y_']
player_positions.drop(columns= columns_to_drop, inplace= True)

In [None]:
# Collapse the player rows to one row per shot id: the per-shot columns are
# taken from the first row of each group, the player angles are summed
grouped_df = (
    player_positions
    .groupby('id')
    .agg(
        freeze_frame=('freeze_frame', 'first'),
        X=('X', 'first'),
        Y=('Y', 'first'),
        angle=('angle', 'sum'),
    )
    .reset_index()
)