### Import packages

In [1]:
from matplotlib.animation import FuncAnimation
import matplotlib.pyplot as plt
from mplsoccer import Pitch
import pandas as pd
import glob
import os

## Notebook for adding more features to frames_df

### Define global variables

In [2]:
DATA_LOCAL_FOLDER = '/mimer/NOBACKUP/groups/two_second_football'
seasons = ['2023']
competitions = ['Allsvenskan']
reload_data = True
seconds_into_the_future = 2
FPS = 25
pitch_length = 105
pitch_width = 68

In [3]:
# Original colors
ajax_red = '#e40615'
ajax_white = '#ffffff'
ajax_yellow = '#f7be2f'
ajax_grey = '#eeeeee'
ajax_black = '#000000'

# Lighter shades
ajax_red_light = '#ff9ea8'
ajax_white_light = '#e9e9e9'
ajax_yellow_light = '#fff8c5'
ajax_grey_light = '#efefef'

### Functions for adding features

In [4]:
# Add the features x_future and y_future (the x and y coordinate of each player n frames into the future)
def add_xy_future(frames_df, n=50):
    # Shift the DataFrame by n frames for each player
    future_df = frames_df.groupby(['period', 'team', 'jersey_number']).shift(-n)

    # Merge the original DataFrame with the shifted DataFrame to get future coordinates
    frames_df[['x_future', 'y_future']] = future_df[['x', 'y']]

In [5]:
# Add the features v_x and v_y (current velocity (m/s) in the x and y axis respectivly). delta_frames determines the time stamp
def add_velocity_xy(frames_df, delta_frames=1):
    # Create a copy of the DataFrame and shift it by delta_frames
    past_df = frames_df.copy()
    past_df['frame'] += delta_frames

    # Merge the original DataFrame with the shifted DataFrame to get future coordinates
    past_coordinates = frames_df.merge(past_df, on=['frame', 'period', 'team', 'jersey_number'], suffixes=('', '_past'), how='outer')

    # Use the past coordinates to calculate the current velocity
    v_x = (frames_df['x'] - past_coordinates['x_past']) * FPS / delta_frames
    v_y = (frames_df['y'] - past_coordinates['y_past']) * FPS / delta_frames
    
    # The player can't surely run faster than Usian Bolt's max speed 
    usain_bolt_max_speed = 13
    frames_df['v_x'] = v_x.clip(lower=-usain_bolt_max_speed, upper=usain_bolt_max_speed)
    frames_df['v_y'] = v_y.clip(lower=-usain_bolt_max_speed, upper=usain_bolt_max_speed)

In [6]:
# Add the features a_x and a_y (current velocity (m/sÂ²) in the x and y axis respectivly). delta_frames determines the time stamp
def add_acceleration_xy(frames_df, delta_frames=1):
    # Create a copy of the DataFrame and shift it by delta_frames twice
    past_df = frames_df.copy()
    past_df['frame'] += delta_frames
    more_past_df = frames_df.copy()
    more_past_df['frame'] += 2 * delta_frames

    # Merge the original DataFrame with the shifted DataFrames to get past and future coordinates
    past_coordinates = frames_df.merge(past_df, on=['frame', 'period', 'team', 'jersey_number'], suffixes=('', '_past'), how='outer')
    more_past_coordinates = frames_df.merge(more_past_df, on=['frame', 'period', 'team', 'jersey_number'], suffixes=('', '_more_past'), how='outer')

    # Use past and future coordinates to calculate current acceleration
    a_x = ((frames_df['x'] - 2 * past_coordinates['x_past'] + more_past_coordinates['x_more_past']) * FPS / (delta_frames ** 2)).fillna(0)
    a_y = ((frames_df['y'] - 2 * past_coordinates['y_past'] + more_past_coordinates['y_more_past']) * FPS / (delta_frames ** 2)).fillna(0)

    # Clip acceleration values to reasonable limits
    max_acceleration = 10  # This is a very high acceleration
    frames_df['a_x'] = a_x.clip(lower=-max_acceleration, upper=max_acceleration)
    frames_df['a_y'] = a_y.clip(lower=-max_acceleration, upper=max_acceleration)

In [7]:
# Flip the coordinates to match the team direction
def flip_xy_based_on_team_direction(frames_df):
    # TODO: What do to with the ball?
    for period in [1, 2]:
        # Flip the x coordinates for the team attacking to the left
        home_team_attacking_to_right = frames_df[(frames_df['period'] == period) & (frames_df['team'] == 'home_team')].iloc[0]['team_direction'] == 'right'
        if home_team_attacking_to_right:
            frames_df.loc[(frames_df['period'] == period) & (frames_df['team'] == 'home_team'), 'x'] = pitch_length - frames_df[(frames_df['period'] == period) & (frames_df['team'] == 'home_team')]['x']
        else:
            frames_df.loc[(frames_df['period'] == period) & (frames_df['team'] == 'home_team'), 'x'] = pitch_length - frames_df[(frames_df['period'] == period) & (frames_df['team'] == 'home_team')]['x']
    return frames_df

In [19]:
# Add a vector indicating if the ball is in motion
def add_ball_in_motion(frames_df):
    # Initialize variables
    ball_in_motion_vec = []
    x_ball = 0
    y_ball = 0
    i = - 1

    # For all objects in each frame
    while (i < len(frames_df)-1):        
        # Update i to be the last row in the next frame
        objects_tracked = frames_df.iloc[i+1]['objects_tracked']
        i += objects_tracked

        # Determine if the ball is motion
        ball_in_motion = False
        # If the ball exists, it will surely be the last row
        if frames_df.iloc[i]['team'] == 'ball':
            # If either x_ball or y_ball has changed since the last recorded positions
            if x_ball != frames_df.iloc[i]['x'] or y_ball != frames_df.iloc[i]['y']:
                # Update varibles
                x_ball = frames_df.iloc[i]['x']
                y_ball = frames_df.iloc[i]['y']
                ball_in_motion = True

        # Store the result in ball_in_motion_vec
        [ball_in_motion_vec.append(ball_in_motion) for _ in range(objects_tracked)]
        # print(f"i: {i},len: {len(ball_in_motion_vec)}")

    # Add the new column based on the vector
    frames_df['ball_in_motion'] = ball_in_motion_vec

# Copy the first 19980 rows of frames_df into small_frames_df
# small_frames_df = frames_df.head(19979).copy()

# Test the function
add_ball_in_motion(frames_df)

### Predictive models

In [10]:
# NAIVE: Always predict that all players will stand still
# The calculations are based on x, y
def predict_two_seconds_naive_static(frames_df):
    frames_df['x_future_pred'] = frames_df['x']
    frames_df['y_future_pred'] = frames_df['y']

# NAIVE: Always predict that all players will continue with the same velocity
# The calculations are based on x, y, v_x, and v_y
def predict_two_seconds_naive_velocity(frames_df):
    frames_df['x_future_pred'] = frames_df['x'] + frames_df['v_x'] * seconds_into_the_future
    frames_df['y_future_pred'] = frames_df['y'] + frames_df['v_y'] * seconds_into_the_future

# Make a prediction with a LSTM neural network model
def predict_two_seconds_LSTM(frames_df):
    return 

### Calculate error loss function

In [11]:
# Add a column for distance wrongly predicted (in metres) for each object. Also return average_pred_error
def total_error_loss(frames_df, include_ball=True, ball_has_to_be_in_motion=False):
    # Create a vector with the Eculidian distance between the true position and the predicted position
    frames_df['pred_error'] = round(((frames_df['x_future_pred'] - frames_df['x_future'])**2 + (frames_df['y_future_pred'] - frames_df['y_future'])**2)**0.5, 2)
    
    # Calculate average pred_error
    if include_ball:
        average_pred_error = frames_df['pred_error'].mean()
    else:
        # Calculate average pred_error for all entries where team != 'ball'
        average_pred_error = frames_df[frames_df['team'] != 'ball']['pred_error'].mean()

    return round(average_pred_error, 2)

# Find a frame with approximatly the same error as the average_pred_error, with an interval
def find_frame_with_average_error(frames_df, average_pred_error, error_margin):
    # For all frames
    frames = frames_df['frame'].unique()
    for frame in frames:
        current_error = frames_df[frames_df['frame'] == frame]['pred_error'].mean()
        # If the current error is within the error_margin,
        if (current_error >= average_pred_error - error_margin) and (current_error <= average_pred_error + error_margin):
            # Return the result
            return frame

    # If no frame was found
    print(f"No frame found within the error margin of {error_margin}")
    return None

## Functions for visualizing the data in frames_df

### Visualize number of tracked objects in each frame

In [12]:
# Visualize number of tracked objects in each frame
def visualize_num_objects_tracked(frames_df):
    return

In [13]:
# Visualize the appaerance of each object (players and ball) throughout a game
def visualize_object_appaerance(frames_df):
    return

### Process frames_df

In [14]:
# Load and process the unprocessed/ frames, and store the results to the processed/ fodler
def process_frames():
    # Load frames_df
    for selected_season in seasons:
        for selected_competition in competitions:
            # Define paths
            DATA_FOLDER_UNPROCESSED = f"{DATA_LOCAL_FOLDER}/data/{selected_season}/{selected_competition}/unprocessed"
            FOLDER_OUT = f"{DATA_LOCAL_FOLDER}/data/{selected_season}/{selected_competition}/processed"
            
            # Create output folder if not exists
            if not os.path.exists(FOLDER_OUT):
                    os.makedirs(FOLDER_OUT)

            # Find all frames parquet files
            match_paths = glob.glob(os.path.join(DATA_FOLDER_UNPROCESSED, "*.parquet"))

            # Extract IDs without the ".parquet" extension
            # TODO: Uncomment this line in production
            # match_ids = [os.path.splitext(os.path.basename(path))[0] for path in match_paths]
            match_ids = ['49e6bfdf-abf3-499d-b60e-cf727c6523c1']

            # For all matches
            for match_id in match_ids:
                # Convert parquet file to a DataFrame
                file_path_match = f"{DATA_FOLDER_UNPROCESSED}/{match_id}.parquet"
                frames_df = pd.read_parquet(file_path_match)

                # Process frames_df
                add_xy_future(frames_df, FPS*seconds_into_the_future)
                add_velocity_xy(frames_df, 1)

                # Convert DataFrame to a parquet file
                frames_df.to_parquet(f"{FOLDER_OUT}/{match_id}.parquet")

## Visualize a game

### Visualize an animation of a game

In [15]:
# TODO: Extract this to a spereate file

# Define variables for plotting
draw_velocities = True
player_size = 250
ball_size = 170
color_home_team = ajax_red
color_away_team = ajax_yellow
color_ball = ajax_grey

# Draw a legend underneth the pitch
def draw_game_snippet_legend(ax, home_team, away_team):
    y_legend = -3
    # Home team
    ax.scatter(pitch_length/2 -28, y_legend, s=player_size, color=color_home_team, edgecolors=ajax_black, linewidth=1.8, zorder=2)
    ax.text(pitch_length/2 - 26, y_legend, home_team, ha='left', va='center', fontsize=14)

    # Ball
    ax.scatter(pitch_length/2 - 2, y_legend, s=ball_size, color=color_ball, edgecolors=ajax_black, linewidth=1.8, zorder=3)
    ax.text(pitch_length/2 - 0, y_legend, 'Ball', ha='left', va='center', fontsize=14)

    # Away team
    ax.scatter(pitch_length/2 + 14, y_legend, s=player_size, color=color_away_team, edgecolors=ajax_black, linewidth=1.8, zorder=2)
    ax.text(pitch_length/2 + 16, y_legend, away_team, ha='left', va='center', fontsize=14)

def visualize_game_snippet(frames_df, start_frame, end_frame):
    # Find DataFrames for each team and the ball
    home_team_df = frames_df[frames_df['team'] == 'home_team'].copy()
    away_team_df = frames_df[frames_df['team'] == 'away_team'].copy()
    ball_df = frames_df[frames_df['team'] == 'ball'].copy()

    # Find the name of the home and away team
    home_team = home_team_df.iloc[0]['team_name']
    away_team = away_team_df.iloc[0]['team_name']

    # Plot pitch
    pitch = Pitch(pitch_type='uefa', goal_type='box', line_color=ajax_black)
    fig, ax = pitch.grid(grid_height=0.96, title_height=0.02, axis=False, endnote_height=0.02, title_space=0, endnote_space=0)

    # Add title
    fig.suptitle(f"{home_team} - {away_team}", fontsize=18)

    # Define function to update scatter plots for a specific frame
    def update_scatter(frame):
        # Clear previous scatter plots
        ax['pitch'].collections.clear()
        ax['pitch'].texts.clear()
        ax['pitch'].patches.clear()

        # TODO: Remove period filter in production
        home_team_frame_df = home_team_df[(home_team_df['frame'] == frame) & (home_team_df['period'] == 1)]
        away_team_frame_df = away_team_df[(away_team_df['frame'] == frame) & (away_team_df['period'] == 1)]
        ball_frame_df = ball_df[(ball_df['frame'] == frame) & (ball_df['period'] == 1)]

        # Plot the time stamp
        time_stamp = f"{home_team_frame_df.iloc[0]['minute']}:{home_team_frame_df.iloc[0]['second']}:{home_team_frame_df.iloc[0]['frame'] % FPS}"
        ax['pitch'].text(0, 70, time_stamp, ha='left', fontsize=18)

        # Scatter the positions of the home players
        ax['pitch'].scatter(home_team_frame_df['x'], home_team_frame_df['y'], s=player_size, color=color_home_team, edgecolors=ajax_black, linewidth=1.8, label=home_team, zorder=2)

        # Scatter the positions of the away players
        ax['pitch'].scatter(away_team_frame_df['x'], away_team_frame_df['y'], s=player_size, color=color_away_team, edgecolors=ajax_black, linewidth=1.8, label=away_team, zorder=2)

        # Draw the ball
        ax['pitch'].scatter(ball_frame_df['x'], ball_frame_df['y'], s=ball_size, color=color_ball, edgecolors=ajax_black, linewidth=1.8, label='Ball', zorder=3)

        # Draw arrows for the velocity of each player
        if draw_velocities:
            # Draw the velocity for home players with valid velocities
            for _, row in home_team_frame_df[~home_team_frame_df[['v_x', 'v_y']].isna().any(axis=1)].iterrows():
                ax['pitch'].arrow(row['x'], row['y'], row['v_x'], row['v_y'], color=ajax_black, head_width=0.5, head_length=0.5, zorder=1)
        
            # Draw the velocity for away players with valid velocities
            for _, row in away_team_frame_df[~away_team_frame_df[['v_x', 'v_y']].isna().any(axis=1)].iterrows():
                ax['pitch'].arrow(row['x'], row['y'], row['v_x'], row['v_y'], color=ajax_black, head_width=0.5, head_length=0.5, zorder=1)
        
        # Add legend
        draw_game_snippet_legend(ax['pitch'], home_team, away_team)

    # Create the animation
    animation = FuncAnimation(fig, update_scatter, frames=range(start_frame, end_frame), repeat=False, interval=40)

    # Specify the GIF file path
    gif_name = f"animations/{home_team}_vs_{away_team}.gif"

    # Save the animation as a GIF file
    animation.save(gif_name, writer='pillow')

# Example usage:
# visualize_game_snippet(frames_df, 0, 1600)

In [16]:
# TODO: Extract this to a spereate file

# Define variables for plotting
draw_velocities = True
player_size = 250
ball_size = 170
predicted_player_size = 0.8 * player_size
predicted_ball_size = 0.8 * ball_size

# Original colors
color_home_team = ajax_red
color_away_team = ajax_yellow
color_ball = ajax_grey

# Lighter shades
color_home_team_light = ajax_red_light
color_away_team_light = ajax_yellow_light
color_ball_light = ajax_grey_light

# Draw a legend underneth the pitch
def draw_prediction_legend(ax, home_team, away_team):
    y_legend = -2
    y_predicted = y_legend - 3

    # Home team
    ax.scatter(pitch_length/2 - 28, y_legend, s=player_size, color=color_home_team, edgecolors=ajax_black, linewidth=1.8, zorder=2)
    ax.text(pitch_length/2 - 26, y_legend, home_team, ha='left', va='center', fontsize=14)

    ax.scatter(pitch_length/2 - 28, y_predicted, s=predicted_player_size, color=color_home_team_light, edgecolors=ajax_black, linewidth=1.8, zorder=2)
    ax.text(pitch_length/2 - 26, y_predicted, 'Predicted', ha='left', va='center', fontsize=14)

    # Ball
    ax.scatter(pitch_length/2 - 2, y_legend, s=ball_size, color=color_ball, edgecolors=ajax_black, linewidth=1.8, zorder=3)
    ax.text(pitch_length/2 - 0, y_legend, 'Ball', ha='left', va='center', fontsize=14)

    ax.scatter(pitch_length/2 - 2, y_predicted, s=predicted_ball_size, color=color_ball_light, edgecolors=ajax_black, linewidth=1.8, zorder=3)
    ax.text(pitch_length/2 - 0, y_predicted, 'Predicted', ha='left', va='center', fontsize=14)

    # Away team
    ax.scatter(pitch_length/2 + 14, y_legend, s=player_size, color=color_away_team, edgecolors=ajax_black, linewidth=1.8, zorder=2)
    ax.text(pitch_length/2 + 16, y_legend, away_team, ha='left', va='center', fontsize=14)

    ax.scatter(pitch_length/2 + 14, y_predicted, s=predicted_player_size, color=color_away_team_light, edgecolors=ajax_black, linewidth=1.8, zorder=2)
    ax.text(pitch_length/2 + 16, y_predicted, 'Predicted', ha='left', va='center', fontsize=14)

def visualize_frame_predictions(frames_df, frame, model_name):
    # Find DataFrames for each team and the ball
    # TODO: Remove period filter in production
    home_team_frame_df = frames_df[(frames_df['team'] == 'home_team') & (frames_df['frame'] == frame) & (frames_df['period'] == 1)].copy()
    away_team_frame_df = frames_df[(frames_df['team'] == 'away_team') & (frames_df['frame'] == frame) & (frames_df['period'] == 1)].copy()
    ball_frame_df = frames_df[(frames_df['team'] == 'ball') & (frames_df['frame'] == frame) & (frames_df['period'] == 1)].copy()

    # Find the name of the home and away team
    home_team = home_team_frame_df.iloc[0]['team_name']
    away_team = away_team_frame_df.iloc[0]['team_name']

    # Plot pitch
    pitch = Pitch(pitch_type='uefa', goal_type='box', line_color=ajax_black)
    fig, ax = pitch.grid(grid_height=0.95, title_height=0.025, axis=False, endnote_height=0.025, title_space=0, endnote_space=0)

    # Adjust the x- and y-axis limits
    ax['pitch'].set_ylim(0, pitch_length)
    ax['pitch'].set_ylim(-6, pitch_width + 3.4)

    # Add title
    fig.suptitle(f"{home_team} - {away_team}", fontsize=18)

    # Plot the time stamp
    time_stamp = f"{home_team_frame_df.iloc[0]['minute']}:{home_team_frame_df.iloc[0]['second']}:{home_team_frame_df.iloc[0]['frame'] % FPS}"
    ax['pitch'].text(0, 70, time_stamp, ha='left', fontsize=18)

    # Scatter the true positions
    ax['pitch'].scatter(home_team_frame_df['x_future'], home_team_frame_df['y_future'], s=player_size, color=color_home_team, edgecolors=ajax_black, linewidth=1.8, zorder=2)
    ax['pitch'].scatter(away_team_frame_df['x_future'], away_team_frame_df['y_future'], s=player_size, color=color_away_team, edgecolors=ajax_black, linewidth=1.8, zorder=2)
    ax['pitch'].scatter(ball_frame_df['x_future'], ball_frame_df['y_future'], s=ball_size, color=color_ball, edgecolors=ajax_black, linewidth=1.8, zorder=3)

    # Scatter the predicted positions with an opacity
    ax['pitch'].scatter(home_team_frame_df['x_future_pred'], home_team_frame_df['y_future_pred'], s=predicted_player_size, color=color_home_team_light, edgecolors=ajax_black, linewidth=1.8, zorder=2)
    ax['pitch'].scatter(away_team_frame_df['x_future_pred'], away_team_frame_df['y_future_pred'], s=predicted_player_size, color=color_away_team_light, edgecolors=ajax_black, linewidth=1.8, zorder=2)
    ax['pitch'].scatter(ball_frame_df['x_future_pred'], ball_frame_df['y_future_pred'], s=predicted_ball_size, color=color_ball_light, edgecolors=ajax_black, linewidth=1.8, zorder=3)

    # Plot lines between true positions and predicted positions with the 'pred_error' over each line
    for index, row in home_team_frame_df.iterrows():
        ax['pitch'].plot([row['x_future'], row['x_future_pred']], [row['y_future'], row['y_future_pred']], color=ajax_black, linewidth=1.5, linestyle='--', alpha=0.7, zorder=1)

    for index, row in away_team_frame_df.iterrows():
        ax['pitch'].plot([row['x_future'], row['x_future_pred']], [row['y_future'], row['y_future_pred']], color=ajax_black, linewidth=1.5, linestyle='--', alpha=0.7, zorder=1)

    for index, row in ball_frame_df.iterrows():
        ax['pitch'].plot([row['x_future'], row['x_future_pred']], [row['y_future'], row['y_future_pred']], color=ajax_black, linewidth=1.5, linestyle='--', alpha=0.7, zorder=1)

    # Draw the average prediction error in the bottom left corner
    average_pred_error = frames_df[frames_df['frame'] == frame]['pred_error'].mean()
    average_pred_error_txt = f"Avg. error: {round(average_pred_error, 2)} m"
    ax['pitch'].text(pitch_length - 15, -3.5, average_pred_error_txt, ha='left', va='center', fontsize=14)

    # Add legend
    draw_prediction_legend(ax['pitch'], home_team, away_team)

    # Save figure
    img_path = f"images/predictions/frame_{frame}_{model_name}_{home_team}_vs_{away_team}.png"
    fig.savefig(img_path)

# Example usage:
# visualize_frame_predictions(frames_df, 100, "naive_static")

## Run an example

In [17]:
# Prepare frames_df
file_path_example_match = f"{DATA_LOCAL_FOLDER}/data/2023/Allsvenskan/processed/49e6bfdf-abf3-499d-b60e-cf727c6523c1.parquet"
frames_df = pd.read_parquet(file_path_example_match)
flip_xy_based_on_team_direction(frames_df)
add_velocity_xy(frames_df, 1)
add_acceleration_xy(frames_df, 1)
add_xy_future(frames_df, FPS*seconds_into_the_future)

# Test model 1
predict_two_seconds_naive_static(frames_df)
error_model_1 = total_error_loss(frames_df, include_ball=False, ball_has_to_be_in_motion=False)
print(f"Model 1 error: {error_model_1}")
frame_with_average_error = find_frame_with_average_error(frames_df, error_model_1, 0.2)
visualize_frame_predictions(frames_df, frame_with_average_error, "naive_static")

# Test model 2
predict_two_seconds_naive_velocity(frames_df)
error_model_2 = total_error_loss(frames_df, include_ball=False, ball_has_to_be_in_motion=False)
print(f"Model 2 error: {error_model_2}")
visualize_frame_predictions(frames_df, frame_with_average_error, "naive_velocity")

Model 1 error: 3.86
Model 2 error: 2.23


In [None]:
# Store frames_df as xslx
frames_df_head = frames_df.head(19979)

# Specify the file path for the Excel file
excel_file_path = f"{DATA_LOCAL_FOLDER}/Brommapojkarna_vs_Sirius.xlsx"

# Write the DataFrame to an Excel file
frames_df_head.to_excel(excel_file_path, index=False)

print(f"DataFrame saved to {excel_file_path}")