In [16]:
# bdb2025.py
"""Common functions for loading and preparing Big Data Bowl 2025 data for modeling."""
from typing import Optional
from pathlib import Path
import pandas as pd
import numpy as np

def load_csv_data(
    paths: list[Path]
) -> list[pd.DataFrame]:
    """Read every CSV file in ``paths`` into its own DataFrame.

    Args:
        paths: Locations of the CSV files to read.

    Returns:
        One DataFrame per entry of ``paths``, in the same order.
    """
    # TODO: look into reading from template, i.e. tracking_week_*.csv
    frames: list[pd.DataFrame] = []
    for csv_path in paths:
        frames.append(pd.read_csv(csv_path))
    return frames


def load_tracking_data(
    weeks: Optional[int | list[int]]=None,
    db: Optional[Path]=None,
    folder: Optional[Path]=None,
) -> pd.DataFrame:
   
    MIN_WEEK = 1
    MAX_WEEK = 9

    if not weeks:
        # defualt to all data
        weeks = range(MIN_WEEK, MAX_WEEK)
    
    if isinstance(weeks, int):
        weeks = [weeks]
    weeks = set(weeks)
    
    for week in weeks:
        assert isinstance(week, int) and week >= MIN_WEEK and week <= MAX_WEEK, \
        f"provide integer weeks between {MIN_WEEK} and {MAX_WEEK}"

    # TODO: Load from database
    if db and db.exists():
        return

    # load data from a folderectory
    if folder and folder.exists():
        paths = [Path(folder, f'tracking_week_{n}.csv') for n in weeks]

        dfs = load_csv_data(paths)
        return pd.concat(dfs)
    
    raise FileNotFoundError



def load_player_data(
    db: Optional[Path]=None,
    folder: Optional[Path]=None,
) -> pd.DataFrame:
    """Load the player table (``players.csv``).

    Args:
        db: Path to a database holding the player data. Loading from a
            database is not implemented yet.
        folder: Directory containing ``players.csv``.

    Returns:
        The contents of ``players.csv`` as a DataFrame.

    Raises:
        NotImplementedError: If ``db`` points at an existing path.
        FileNotFoundError: If no existing ``folder`` was supplied.
    """
    # TODO: Load from database
    if db and db.exists():
        # fail loudly instead of handing the caller None in place of a DataFrame
        raise NotImplementedError("loading player data from a database is not implemented yet")

    if folder and folder.exists():
        return pd.read_csv(Path(folder, 'players.csv'))

    # same failure mode as load_tracking_data rather than an implicit None
    raise FileNotFoundError(f"player data folder not found: {folder}")
    

def load_play_data(
        db: Optional[Path]=None,
        folder: Optional[Path]=None,
) -> pd.DataFrame:
    """Load the play table (``plays.csv``).

    Args:
        db: Path to a database holding the play data. Loading from a
            database is not implemented yet.
        folder: Directory containing ``plays.csv``.

    Returns:
        The contents of ``plays.csv`` as a DataFrame.

    Raises:
        NotImplementedError: If ``db`` points at an existing path.
        FileNotFoundError: If no existing ``folder`` was supplied.
    """
    # TODO: Load from database
    if db and db.exists():
        # fail loudly instead of handing the caller None in place of a DataFrame
        raise NotImplementedError("loading play data from a database is not implemented yet")

    if folder and folder.exists():
        # read inside the branch: the path only exists when the folder does
        return pd.read_csv(Path(folder, 'plays.csv'))

    raise FileNotFoundError(f"play data folder not found: {folder}")


def clean_player_data(player_df: pd.DataFrame) -> pd.DataFrame:
    """Add numeric height and z-scored height/weight columns to the player table.

    ``height`` is parsed as ``"<feet>-<inches>"`` strings. The new columns
    (``height_inches``, ``height_z``, ``weight_z``) are written onto
    ``player_df`` itself, and that same frame is returned.
    """
    # split "feet-inches" once and convert each half to a number
    height_parts = player_df['height'].str.split('-', expand=True)
    player_df['height_inches'] = (
        12 * pd.to_numeric(height_parts[0]) + pd.to_numeric(height_parts[1])
    )

    # z-score each measurement against all players in the table
    for source, target in (('height_inches', 'height_z'), ('weight', 'weight_z')):
        values = player_df[source]
        player_df[target] = (values - values.mean()) / values.std()

    return player_df


def clean_tracking_data(tracking_df: pd.DataFrame) -> pd.DataFrame:
    """Express tracking rows relative to the snap and keep only live player frames.

    The steps are:

    1. Attach the football's ``ball_snap`` frame and location to every row of
       the same play (columns ``frameId_ball``, ``x_ball``, ``y_ball``).
    2. Convert ``o`` and ``dir`` from degrees to radians as ``90 - angle``.
       NOTE(review): presumably this maps a clockwise-from-+y heading onto the
       counter-clockwise-from-+x convention; confirm against the data dictionary.
    3. Shift ``x``/``y`` so the ball at the snap is the origin.
    4. Rotate plays whose ``playDirection`` is ``'left'`` by half a turn:
       ``x`` and ``y`` are negated and pi is added to both angles.
    5. Add unit orientation components ``ox``/``oy`` and velocity components
       ``vx``/``vy``.
    6. Drop football rows, frames before the snap, and every frame from a stop
       event (incomplete pass, sack, tackle) onwards.

    The input frame is not modified. Rows are assumed to be in ascending frame
    order within each player, because stop events are forward-filled in row order.

    Args:
        tracking_df: Raw tracking rows with at least ``gameId``, ``playId``,
            ``nflId``, ``frameId``, ``club``, ``event``, ``playDirection``,
            ``x``, ``y``, ``s``, ``o`` and ``dir``.

    Returns:
        A new frame with the retained rows, the original columns, and the
        helper columns created above (including ``stop_point``).
    """
    # ball location and frame at the snap, used as each play's origin and start time
    at_snap = (tracking_df['event'] == 'ball_snap') & (tracking_df['club'] == 'football')
    ball = tracking_df.loc[at_snap, ['gameId', 'playId', 'frameId', 'x', 'y']]

    # merge() builds a new frame, so nothing below writes into the caller's data
    tracking_df = tracking_df.merge(
        ball,  # ball info for starting time
        how='left',
        on=['gameId', 'playId'],
        suffixes=('', '_ball')
    )

    # angles to radians
    for angle in ('o', 'dir'):
        tracking_df[angle] = ((90 - tracking_df[angle]) % 360) * np.pi / 180

    # standardize locations to the ball snap location
    tracking_df['x'] = tracking_df['x'] - tracking_df['x_ball']
    tracking_df['y'] = tracking_df['y'] - tracking_df['y_ball']

    # normalize play directions to the right: adding pi to an angle is a half
    # turn, which negates BOTH coordinates, so y has to flip together with x or
    # positions and velocity components would disagree on left-moving plays
    is_left = tracking_df['playDirection'] == 'left'
    direction_sign = np.where(is_left, -1, 1)
    half_turn = np.where(is_left, np.pi, 0.0)

    tracking_df['x'] = tracking_df['x'] * direction_sign
    tracking_df['y'] = tracking_df['y'] * direction_sign
    for angle in ('o', 'dir'):
        tracking_df[angle] = (tracking_df[angle] + half_turn) % (2 * np.pi)

    # x and y components of orientation and velocity
    tracking_df['ox'] = np.cos(tracking_df['o'])
    tracking_df['oy'] = np.sin(tracking_df['o'])

    tracking_df['vx'] = np.cos(tracking_df['dir']) * tracking_df['s']
    tracking_df['vy'] = np.sin(tracking_df['dir']) * tracking_df['s']

    tracking_df['stop_point'] = pd.NA
    stop_events = ['pass_outcome_incomplete', 'qb_sack', 'tackle']
    tracking_df.loc[tracking_df['event'].isin(stop_events), 'stop_point'] = True
    # fill stopping points forward to filter out moments after a defined route stop
    tracking_df['stop_point'] = (
        tracking_df.groupby(['gameId', 'playId', 'nflId'])['stop_point'].ffill()
    )

    # filter
    keep = (
        tracking_df['stop_point'].isna()  # before any stop event
        & (tracking_df['frameId'] >= tracking_df['frameId_ball'])  # after the starting point
        & (tracking_df['club'] != 'football')
    )

    return tracking_df.loc[keep].copy()


def mirror_tracking_plays(tracking_df: pd.DataFrame) -> pd.DataFrame:
    """Double the data set by appending a copy of every row reflected in y.

    The copy has ``y``, ``vy`` and ``oy`` negated and ``mirrored`` set to
    ``True``. The input frame itself receives a ``mirrored`` column set to
    ``False``. The result stacks the input rows first and the reflected rows
    second, keeping the original row labels.
    """
    flipped = tracking_df.copy()

    # a reflection across the x axis negates every y component
    for column in ('y', 'vy', 'oy'):
        flipped[column] = -1 * flipped[column]

    flipped['mirrored'] = True
    tracking_df['mirrored'] = False

    return pd.concat([tracking_df, flipped])


def prepare_static_data(
    tracking_df: pd.DataFrame,
    play_df: pd.DataFrame,
    player_df: pd.DataFrame,
) -> pd.DataFrame:
    """Build one row of time-independent features per player, play and mirror flag.

    The first non-null tracking values of each (game, play, player, mirrored)
    group are joined to the play table and the player table. An ``offense``
    flag marks players whose ``club`` equals the play's ``possessionTeam``.
    Only the identifier and feature columns are kept, and the remaining
    string-valued columns (``position``) are one-hot encoded.
    """
    kept_columns = [
        'gameId', 'playId', 'frameId', 'nflId', 'mirrored',
        'height_z', 'weight_z', 'position', 'offense',
    ]

    # collapse the tracking frames to a single row per player appearance
    per_player = (
        tracking_df
        .groupby(['gameId', 'playId', 'nflId', 'mirrored'], as_index=False)
        .first()
    )

    # join
    with_plays = per_player.merge(play_df, how='left', on=('gameId', 'playId'))
    # player info for positions
    joined_df = with_plays.merge(player_df, how='left', on='nflId')

    joined_df['offense'] = joined_df['club'] == joined_df['possessionTeam']

    # filter
    static_df = joined_df[kept_columns].copy()

    return pd.get_dummies(static_df)

In [None]:
# datasets.py
"""Functions for preparing datasets for modeling"""
import pandas as pd
import numpy as np
from scipy.interpolate import CubicSpline
from torch.utils.data import Dataset


def interpolate_movement(tracking_df: pd.DataFrame) -> np.ndarray:
    """Resample every player's movement in every play onto a fixed time grid.

    Each player's frames are rescaled to ``t`` in ``[0, 1]``, a cubic spline is
    fitted through ``x, y, ox, oy, vx, vy, a``, and the spline is sampled at 40
    evenly spaced times ``0, 1/40, ..., 39/40``.

    Args:
        tracking_df: Cleaned tracking rows containing ``gameId``, ``playId``,
            ``mirrored``, ``nflId``, ``frameId`` and the seven interpolated
            columns.

    Returns:
        A ``float32`` array of shape ``(plays, players, 40, 7)``. Plays are
        ordered by ascending ``(gameId, playId, mirrored)`` and players by
        ascending ``nflId``, matching the order ``DataFrame.groupby`` gives
        when per-player features are grouped on the same keys. Plays with
        fewer players than the largest play are padded with zeros. A player
        with a single distinct frame is held constant at that observation.
        An empty input yields an array with zero plays.
    """
    num_obs = 40
    interpolated_columns = ['x', 'y', 'ox', 'oy', 'vx', 'vy', 'a']

    if tracking_df.empty:
        return np.zeros((0, 0, num_obs, len(interpolated_columns)), dtype=np.float32)

    # groupby sorts its keys, so plays come out in ascending key order
    plays = tracking_df.groupby(['gameId', 'playId', 'mirrored'])
    num_plays = plays.ngroups
    routes_per_play = int(
        tracking_df.groupby(['gameId', 'playId'])['nflId'].nunique(dropna=True).max()
    )

    # array for interpolated movement
    movement_arr = np.zeros(
        (num_plays, routes_per_play, num_obs, len(interpolated_columns)),
        dtype=np.float32,
    )

    # take n observations; the grid is the same for every player
    observation_times = np.arange(num_obs) / num_obs

    for i, (_, play) in enumerate(plays):

        # grouping on nflId skips rows without an id and fixes the player
        # order to ascending nflId instead of row order
        for ii, (_, player) in enumerate(play.groupby('nflId')):

            # the spline needs strictly increasing sample times
            player = player.sort_values('frameId')
            frames = player['frameId'].to_numpy()
            player_data = player[interpolated_columns].to_numpy()

            frame_range = frames[-1] - frames[0]
            if frame_range == 0:
                # one distinct frame: nothing to interpolate, hold it constant
                movement_arr[i, ii, ...] = player_data[0]
                continue

            t = (frames - frames[0]) / frame_range

            # smooth the info about the player over time
            spline = CubicSpline(t, player_data, axis=0, extrapolate=False)

            # populate an array with smoothed values
            movement_arr[i, ii, ...] = spline(observation_times)

    return movement_arr


def perpare_data_arrays(static_df: pd.DataFrame, movement_arr: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    """Combine interpolated movement and static player features into model arrays.

    Args:
        static_df: One row per player per play with ``gameId``, ``playId``,
            ``mirrored``, ``nflId`` and numeric/boolean feature columns. Every
            play must contain the same number of players.
        movement_arr: Array of shape ``(plays, players, observations, features)``
            whose play and player axes follow the same order as ``static_df``
            grouped by ``(gameId, playId, mirrored)``.

    Returns:
        ``(input_arr, target_arr)``. ``input_arr`` has shape
        ``(plays, players, observations * features + static features)``: the
        flattened movement followed by every ``static_df`` column except the
        play keys and ``nflId``. ``target_arr`` has shape
        ``(plays, players, observations * 2)`` and holds the first two movement
        features of every observation.

    Raises:
        AssertionError: If plays differ in their number of players.
        ValueError: If ``movement_arr`` does not have one entry per play and
            player of ``static_df``.
    """
    play_keys = ['gameId', 'playId', 'mirrored']
    play_index = static_df.set_index(play_keys)

    num_plays = len(play_index.index.unique())
    routes_per_play = int(static_df.groupby(play_keys)['nflId'].nunique(dropna=True).iloc[0])

    assert (static_df.groupby(play_keys)['nflId'].nunique(dropna=False) == routes_per_play).all(), \
    "Different values across player dimension"

    # reshape(..., -1) accepts any array with a compatible element count, so a
    # mismatched movement array would be scrambled silently without this check
    if tuple(movement_arr.shape[:2]) != (num_plays, routes_per_play):
        raise ValueError(
            f"movement_arr has shape {movement_arr.shape}, expected "
            f"({num_plays}, {routes_per_play}, ...) to match static_df"
        )

    num_columns = len(play_index.columns) - 1 # subtract the nflId column
    player_info = np.zeros( (num_plays, routes_per_play, num_columns) )

    for i, (_, group) in enumerate(play_index.groupby(play_keys)):
        player_info[i, ...] = group.drop(columns=['nflId']).to_numpy(dtype=np.float64)

    # np.concatenate rather than np.concat: the latter only exists in NumPy >= 2.0
    input_arr = np.concatenate(
        [
            movement_arr.reshape((num_plays, routes_per_play, -1)), # combine final axes
            player_info, # concat fixed info
        ],
        axis=-1
    )

    target_arr = movement_arr[..., :2].reshape((num_plays, routes_per_play, -1))

    return input_arr, target_arr


def mask_input(input_arr: np.ndarray, n: int, mask_pct: float = .15) -> np.ndarray:
    """Return a copy of ``input_arr`` with a random subset of vectors masked.

    Every index combination over all but the last axis addresses one vector.
    A fraction ``mask_pct`` of those vectors (rounded down) is chosen without
    replacement using NumPy's global random state, and the first ``n`` entries
    of each chosen vector are overwritten with zeros (the "mask token").

    Args:
        input_arr: Array whose last axis holds the feature vectors.
        n: Number of leading features to overwrite in each masked vector.
        mask_pct: Fraction of vectors to mask.

    Returns:
        A masked copy with the same shape as ``input_arr``; the input is left
        untouched.
    """
    # "mask token"
    mask = np.zeros(n)

    n_player_vectors = int(np.prod(input_arr.shape[:-1]))

    # determine vectors to mask: random choice over the product of the first axes
    mask_idx = np.random.choice(
        n_player_vectors,
        size=int(n_player_vectors * mask_pct),
        replace=False
    )

    masked_players_arr = input_arr.reshape((n_player_vectors, input_arr.shape[-1])).copy()
    masked_players_arr[mask_idx, :n] = mask

    # reshape returns a new view; it has to be returned to restore the input shape
    return masked_players_arr.reshape(input_arr.shape)


class NflDataset(Dataset):
    """Map-style dataset pairing each input array with its target array."""

    def __init__(self, input_arr: np.ndarray, target_arr: np.ndarray):
        """Store the arrays after checking they hold the same number of observations."""
        n_inputs = input_arr.shape[0]
        n_targets = target_arr.shape[0]

        # inputs and targets are paired by position along the first axis
        assert n_inputs == n_targets, "# Observations mismatch between input and target"

        self.input_arr = input_arr
        self.target_arr = target_arr

    def __getitem__(self, idx) -> tuple[np.ndarray, np.ndarray]:
        """Return ``(input, target)`` at ``idx``; only the input is cast to float32."""
        features = np.float32(self.input_arr[idx])
        target = self.target_arr[idx]

        return features, target

    def __len__(self) -> int:
        """Number of observations, i.e. the length of the first input axis."""
        return self.input_arr.shape[0]
    


In [58]:
# model.py
"""Model Architecture"""
import torch
import numpy as np

class NflBERT(torch.nn.Module):
    """BERT-style encoder over the players of a play.

    BERT uses nearly the same transformer encoder as "Attention Is All You
    Need", so the stock torch ``TransformerEncoder`` is used as the backbone.
    Per-player feature vectors are batch-normalised, embedded, passed through
    the encoder, and projected to ``output_dim`` values per player.
    """

    def __init__(
        self,
        feature_dim: int,
        output_dim: int,
        hidden_size: int=512,
        num_layers: int=8,
        num_heads: int=8,
        ffn_size: int=2048,
        ffn_act: str="gelu",
        dropout: float=.3
    ):
        """Build the normalisation, embedding, encoder and decoder stages."""
        super().__init__()
        nn = torch.nn
        decoder_width = hidden_size // 4

        # normalises each input feature over the batch and player axes
        self.norm_layer = nn.BatchNorm1d(feature_dim)

        embed_stages = [
            nn.Linear(feature_dim, hidden_size),
            nn.GELU(),
            nn.LayerNorm(hidden_size),
            nn.Dropout(dropout),
        ]
        self.embed = nn.Sequential(*embed_stages)

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=hidden_size,
            nhead=num_heads,
            dim_feedforward=ffn_size,
            dropout=dropout,
            activation=ffn_act,
            batch_first=True,
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

        decoder_stages = [
            nn.Linear(hidden_size, decoder_width),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.LayerNorm(decoder_width),
            nn.Linear(decoder_width, output_dim),
        ]
        self.decoder = nn.Sequential(*decoder_stages)

    def forward(self, x: torch.Tensor):
        """Map ``[batch, players, features]`` to ``[batch, players, output_dim]``."""
        # unpacking doubles as a rank check: anything but a 3-D tensor fails here
        _batch, _players, _features = x.size()

        # BatchNorm1d expects the feature axis second, so swap axes around it
        normed = self.norm_layer(x.transpose(1, 2)).transpose(1, 2)  # [B,P,F] -> [B,P,F]

        embedded = self.embed(normed)  # [B,P,F] -> [B,P,M: model_dim]

        encoded = self.transformer(embedded)  # [B,P,M] -> [B,P,M]

        # project every player's encoding to the output dimension
        return self.decoder(encoded)  # [B,P,M] -> [B,P,O]



In [70]:
# load data

# NOTE: machine-specific absolute path to the folder holding players.csv and plays.csv
bdb_folder = Path("/Users/henrykraessig/code/bdb2025/data/")

# tracking is read from a local file rather than through load_tracking_data()
# (presumably a reduced sample; confirm)
tracking = pd.read_csv("minimal_tracking.csv")
players = clean_player_data(load_player_data(folder=bdb_folder))
plays = load_play_data(folder=bdb_folder)

# snap-relative coordinates first, then append the y-reflected copy of every play
tracking = mirror_tracking_plays(clean_tracking_data(tracking))

static = prepare_static_data(tracking, plays, players)

In [71]:
# resample movement, then assemble model inputs/targets and wrap them in a dataset
# NOTE: `input` shadows the Python builtin of the same name for the rest of the notebook
movement = interpolate_movement(tracking_df=tracking)
input, target = perpare_data_arrays(static_df=static, movement_arr=movement)
dataset = NflDataset(input_arr=input, target_arr=target)

In [72]:
# shuffled mini-batches of 32 plays
loader = torch.utils.data.DataLoader(dataset, batch_size=32, shuffle=True)

In [73]:
# feature and output widths are taken from the last axis of the prepared arrays
model = NflBERT(feature_dim=input.shape[-1], output_dim=target.shape[-1])

In [None]:
# prepare the loss function
# create the optimizer
# split the data
# create plotting utils
# analyze the play normalization
# fix masking so the loss is computed only over masked samples & limit the max masked samples per play

In [None]:



# Smoke test: run every batch through the model and print the input/target
# batch shapes followed by the shape of the model output.
# NOTE(review): model.eval() is not called in the visible cells, so this
# presumably runs with dropout and batch-norm in training mode; confirm.
for inp, tar in loader:


    print(f"{inp.shape} -> {tar.shape}")
    print(model(inp).shape)

    

torch.Size([32, 22, 303]) -> torch.Size([32, 22, 80])
torch.Size([32, 22, 80])
torch.Size([32, 22, 303]) -> torch.Size([32, 22, 80])
torch.Size([32, 22, 80])
torch.Size([32, 22, 303]) -> torch.Size([32, 22, 80])
torch.Size([32, 22, 80])
torch.Size([32, 22, 303]) -> torch.Size([32, 22, 80])
torch.Size([32, 22, 80])
torch.Size([32, 22, 303]) -> torch.Size([32, 22, 80])
torch.Size([32, 22, 80])
torch.Size([32, 22, 303]) -> torch.Size([32, 22, 80])
torch.Size([32, 22, 80])
torch.Size([2, 22, 303]) -> torch.Size([2, 22, 80])
torch.Size([2, 22, 80])
