In [1]:
import polars as pl
import pandas as pd
import numpy as np
import math as math
import copy

In [2]:

# Thresholds used by looking_to_block_or_blocking_df_fn to grade a blocker/defender pair.
# Cone angles are full widths: is_in_angle halves them around the player's orientation.
CONE_ANGLE_FAR = 50 # degrees; narrow vision cone, checked out to MAX_DISTANCE
CONE_ANGLE_CLOSE = 120 # degrees; wide vision cone, checked out to BLOCKING_RADIUS
MAX_DISTANCE = 3 # yards; reach of the narrow cone and the default range of is_in_distance
BLOCKING_RADIUS = 1.25 # yards; reach of the wide cone and the cutoff used by is_blocking

def angle_in_range(angle, left_boundary, right_boundary):
    """Tell whether `angle` lies on the arc that runs from `left_boundary` up to `right_boundary`.

    All values are in degrees. The arc is measured in the direction of increasing
    angle and may wrap through 0/360; both end points count as inside.
    """
    arc_width = (right_boundary - left_boundary) % 360
    offset_from_left = (angle - left_boundary) % 360
    return offset_from_left <= arc_width

# row = [o, dir, adjustedX, adjustedY, oDefender, dirDefender, adjustedXDefender, adjustedYDefender]
def looking_to_block_or_blocking_df_fn(row) -> int:
    """Row-wise grader for one blocker/defender pair.

    The first four entries of `row` describe the blocker and the remaining
    entries describe the defender (layout in the comment above).

    Returns:
        0 -- the defender is in neither of the blocker's vision cones.
        1 -- the defender is in the far cone (CONE_ANGLE_FAR within MAX_DISTANCE)
             or in the close cone (CONE_ANGLE_CLOSE within BLOCKING_RADIUS).
        2 -- as 1, and the two players are within BLOCKING_RADIUS of each other.
    """
    blocker, defender = row[0:4], row[4:]

    # The close cone is only evaluated when the far cone misses.
    defender_in_cone = (
        is_in_vision_cone(blocker, defender, CONE_ANGLE_FAR, MAX_DISTANCE)
        or is_in_vision_cone(blocker, defender, CONE_ANGLE_CLOSE, BLOCKING_RADIUS)
    )
    within_blocking_radius = is_blocking(blocker, defender)

    if not defender_in_cone:
        return 0
    return 2 if within_blocking_radius else 1
    

def looking_to_block_or_blocking(player1: tuple, player2: tuple) -> int:
    """Grade how engaged `player1` (the blocker) is with `player2` (the defender).

    Tuple-based counterpart of `looking_to_block_or_blocking_df_fn`; each player
    is a sequence laid out as (o, dir, adjustedX, adjustedY).

    Returns:
        0 -- player2 is in neither of player1's vision cones.
        1 -- player2 is in the far cone (CONE_ANGLE_FAR within MAX_DISTANCE) or
             in the close cone (CONE_ANGLE_CLOSE within BLOCKING_RADIUS).
        2 -- as 1, and the players are within BLOCKING_RADIUS of each other.
    """
    # is_in_vision_cone has no defaults for cone_angle/distance, so both cones
    # are spelled out here exactly as the DataFrame variant does.
    in_cone = (
        is_in_vision_cone(player1, player2, CONE_ANGLE_FAR, MAX_DISTANCE)
        or is_in_vision_cone(player1, player2, CONE_ANGLE_CLOSE, BLOCKING_RADIUS)
    )
    if not in_cone:
        return 0

    # Return the computed status (previously a hard-coded 0 was returned).
    return 2 if is_blocking(player1, player2) else 1

def is_in_vision_cone(player1: tuple, player2: tuple, cone_angle, distance) -> bool:
    """Return True when `player2` is inside `player1`'s vision cone.

    The cone is `cone_angle` degrees wide (see `is_in_angle`) and reaches out to
    `distance` (see `is_in_distance`). The distance check is skipped when the
    angle check already fails.
    """
    if not is_in_angle(player1, player2, cone_angle):
        return False
    return is_in_distance(player1, player2, distance)

def is_in_angle(player1: tuple, player2: tuple, cone_angle) -> bool:
    """Return True when the bearing from `player1` to `player2` falls inside a cone
    `cone_angle` degrees wide, centred on `player1`'s orientation.

    Each player is indexed as [0] orientation in degrees, [2] x and [3] y.
    The orientation may be given as a string, in which case it is parsed as a float.
    The bearing comes from `math.atan2`, normalised to [0, 360).
    """
    delta_y = player2[3] - player1[3]
    delta_x = player2[2] - player1[2]

    facing = player1[0]
    if type(facing) == str:
        facing = float(facing)

    # Bearing of player2 as seen from player1, shifted into [0, 360).
    bearing = (math.degrees(math.atan2(delta_y, delta_x)) + 360) % 360

    # Cone edges: half the cone width on either side of the facing direction.
    half_width = cone_angle / 2
    cone_start = (360 + facing - half_width) % 360
    cone_end = (facing + half_width) % 360

    return bool(angle_in_range(bearing, cone_start, cone_end))

def is_in_distance(player1: tuple, player2: tuple, dist=MAX_DISTANCE) -> bool:
    """Return True when the two players are at most `dist` apart (inclusive).

    The separation is computed by `calculate_distance`; `dist` defaults to MAX_DISTANCE.
    """
    separation = calculate_distance(player1, player2)
    return bool(separation <= dist)

def is_blocking(player1: tuple, player2: tuple) -> bool:
    """Return True when the two players are within BLOCKING_RADIUS of each other (inclusive).

    Only the separation is considered here; orientation is not checked.
    """
    separation = calculate_distance(player1, player2)
    return bool(separation <= BLOCKING_RADIUS)


def calculate_distance(player1: tuple, player2: tuple) -> float:
    """Euclidean distance between two players.

    Each player is a sequence whose index 2 holds the x coordinate and index 3
    the y coordinate; the other entries are ignored.
    """
    gap_y = abs(player1[3] - player2[3])
    gap_x = abs(player1[2] - player2[2])
    squared_sum = gap_x**2 + gap_y**2

    return math.sqrt(squared_sum)

In [3]:
# Thresholds used by the helpers of type_of_block_df_fn.
DENT_CONE_ANGLE = 30 # degrees; full width of the cone is_dent checks from player2's orientation
DENT_DISTANCE = 1.25 # reach of the is_dent cone; compared against calculate_distance, so same units as the player coordinates
SIDE_CONE_ANGLES = 180 # degrees; full width of the cones used by is_left_vision_cone / is_right_vision_cone

# Labels returned by type_of_block_df_fn:
#   "none"   => None
#   "left"   => Box -> Left
#   "headup" => Dent
#   "right"  => Spill -> Right
def type_of_block_df_fn(row) -> str:
    """Row-wise classifier for where the blocker sits relative to the defender.

    The first four entries of `row` describe the blocker (o, dir, x, y); the
    remaining entries describe the defender. The checks run in a fixed order
    and the first one that matches decides the label:
    `is_dent` -> "headup", `is_left_vision_cone` -> "left",
    `is_right_vision_cone` -> "right", otherwise "none".
    """
    blocker = list(row[0:4])
    defender = list(row[4:])

    ordered_checks = (
        (is_dent, "headup"),
        (is_left_vision_cone, "left"),
        (is_right_vision_cone, "right"),
    )
    for check, label in ordered_checks:
        # Every check gets its own copy of the defender so none of them can
        # alter the data seen by the next one.
        if check(blocker, copy.copy(defender)):
            return label

    return "none"

def is_left_vision_cone(player1: tuple, player2: tuple) -> bool:
    """Return True when `player1` lies in `player2`'s left-side cone.

    The cone is SIDE_CONE_ANGLES degrees wide and centred on `player2`'s
    orientation (index 0) plus 90 degrees; the test itself is done by `is_in_angle`.

    `player2` is not modified: the rotated orientation is written to a private
    copy, so tuples (as the annotation promises) and shared lists are both safe.
    """
    rotated_player2 = list(player2)
    rotated_player2[0] = (rotated_player2[0] + 90) % 360
    return is_in_angle(rotated_player2, player1, SIDE_CONE_ANGLES)

# Both players can see each other
# Only need to check player2 vision cone since we calculate blockType based on player1 earlier
def is_dent(player1: tuple, player2: tuple) -> bool:
    """Return True when `player1` is inside `player2`'s dent cone.

    The cone is DENT_CONE_ANGLE degrees wide and reaches DENT_DISTANCE. Only
    `player2`'s view is tested here; `player1`'s view of `player2` is not re-checked.
    """
    observer, target = player2, player1
    return is_in_vision_cone(observer, target, DENT_CONE_ANGLE, DENT_DISTANCE)

def is_right_vision_cone(player1: tuple, player2: tuple) -> bool:
    """Return True when `player1` lies in `player2`'s right-side cone.

    The cone is SIDE_CONE_ANGLES degrees wide and centred on `player2`'s
    orientation (index 0) minus 90 degrees; the test itself is done by `is_in_angle`.

    `player2` is not modified: the rotated orientation is written to a private
    copy, so tuples (as the annotation promises) and shared lists are both safe.
    """
    rotated_player2 = list(player2)
    rotated_player2[0] = (rotated_player2[0] - 90) % 360
    return is_in_angle(rotated_player2, player1, SIDE_CONE_ANGLES)
    

In [4]:
# For each tracking week 1-9: load the raw CSVs, rotate positions and angles into a
# common frame, pair every offensive player (excluding the QB and the ball carrier) with
# each opposing player closer than 5 yards for framesSinceSnap 5..30, label every pair
# with blockType / blockLocation and write the result to coneAnalysisWeek{week}.parquet.
for week in range(1,10):

    # read all data
    # (players and plays are rebound further down, so they are re-read for every week)
    players = pl.read_csv('../nfl-big-data-bowl-2024/players.csv')
    plays = pl.read_csv('../nfl-big-data-bowl-2024/plays.csv',infer_schema_length=100000)
    games = pl.read_csv('../nfl-big-data-bowl-2024/games.csv',infer_schema_length=10000)
    tracking = pl.read_csv(f'../nfl-big-data-bowl-2024/tracking_week_{week}.csv',infer_schema_length=10000)


    # normalize data
    # nflId is cast to str so it can be joined/compared with the tracking and plays ids below
    players = players.with_columns([pl.col('nflId').cast(str)])
    plays = plays.join(games,on='gameId')
    plays = plays.with_columns([
        (pl.col('gameId').cast(str) + '-'
         + pl.col('playId').cast(str)).alias('uniquePlayId')
    ])

    tracking = tracking.with_columns(
        (pl.col('gameId').cast(str) + '-'
         + pl.col('playId').cast(str)).alias('uniquePlayId'),
        (pl.col('gameId').cast(str) + '-'
         + pl.col('playId').cast(str) + '-'
         + pl.col('nflId').cast(str)).alias('uniquePlayerId'),
    )

    # normalize position
    # raw x/y are swapped (and mirrored depending on playDirection) into adjustedX/adjustedY
    tracking=tracking.with_columns([
        pl.when(pl.col('playDirection')=='right').then(53.3-pl.col('y')).otherwise(pl.col('y')).alias('adjustedX'),
        pl.when(pl.col('playDirection')=='right').then(pl.col('x')).otherwise(120-pl.col('x')).alias('adjustedY')
    ])

    # Find the snap frame of every play: mark it on the snap rows, spread it over the
    # whole play with a window max, then drop plays that never recorded a snap event (-1).
    tracking=tracking.with_columns([
        pl.when(pl.col('event').is_in(['ball_snap','autoevent_ballsnap'])).then(pl.col('frameId')).otherwise(-1)
        .alias('startingFrameId'),
    ])
    tracking=tracking.with_columns([
        pl.col('startingFrameId').max().over(pl.col('uniquePlayId')),
    ])
    tracking=tracking.with_columns([
        (pl.col('frameId') - pl.col('startingFrameId')).alias('framesSinceSnap'),
    ])
    tracking = tracking.filter(pl.col('startingFrameId')!=-1)

    # normalize orientation 'o' and direction 'dir'
    # convert 'NA' to 0
    replacement_values = {'NA': '0'}
    tracking = tracking.with_columns(
        pl.col('o').apply(lambda x: replacement_values.get(x, x)),
        pl.col('dir').apply(lambda x: replacement_values.get(x, x)),
    )

    # NOTE(review): adjustedDir is not rotated the same way as adjustedO, and the labeling
    # functions never read it (index 1 of each player) -- confirm whether it needs fixing.
    tracking=tracking.with_columns([
        pl.when(pl.col('playDirection')=='right')
        .then(pl.col('dir').cast(pl.Float64))
        .otherwise(180-pl.col('dir').cast(pl.Float64))
        .alias('adjustedDir'),

        pl.when(pl.col('playDirection')=='right')
        .then(pl.col('o').cast(pl.Float64))
        .otherwise((180+pl.col('o').cast(pl.Float64))%360)
        .alias('firstAdjustedO'),
    ])

    # Equivalent to (180 - firstAdjustedO) mod 360; presumably this converts the orientation
    # to the angle convention math.atan2 yields in is_in_angle -- TODO confirm against the
    # tracking data dictionary.
    tracking=tracking.with_columns([
        pl.when(pl.col('firstAdjustedO') <= 180)
        .then(180-pl.col('firstAdjustedO'))
        .otherwise(540-pl.col('firstAdjustedO')).alias('adjustedO')
    ])


    # keep only the frames shortly after the snap
    labeled = tracking.filter(pl.col('framesSinceSnap').is_between(5,30))

    # From here on `players` holds tracking rows enriched with roster data, not the raw roster.
    players = labeled.join(players,on='nflId',how='left')
    players = players.filter(pl.col('club')!='football')
    players = players.join(games.select(['gameId','homeTeamAbbr','visitorTeamAbbr']),on='gameId')
    players = players.with_columns([
        pl.when(pl.col('club')==pl.col('homeTeamAbbr'))
        .then(pl.col('visitorTeamAbbr'))
        .otherwise(pl.col('homeTeamAbbr'))
        .alias('opponentClub')
    ])


    # Self-join: every player row is paired with each opposing player in the same frame.
    # Columns coming from the opposing player carry the 'Defender' suffix.
    players = players.join(
        players,
        left_on=['gameId','playId','frameId','club'],
        right_on=['gameId','playId','frameId','opponentClub'],
        suffix='Defender'
    )

    players = players.join(plays.select('uniquePlayId','possessionTeam',pl.col('ballCarrierId').cast(str))
                           ,on='uniquePlayId')
    players = players.filter(pl.col('club')==pl.col('possessionTeam')) # filter to only offensive players
    players = players.filter(pl.col('nflId')!=pl.col('ballCarrierId')) # remove the ball carrier
    players = players.filter(pl.col('position')!='QB') # remove the QB


    #filter to only players within 5 yards of each other in order to cut useless rows
    players = players.with_columns([
        ((pl.col('adjustedX')-pl.col('adjustedXDefender')).pow(2) +
        (pl.col('adjustedY')-pl.col('adjustedYDefender')).pow(2)).sqrt()
        .alias('distance')
    ]).filter(pl.col('distance')<5)


    # Columns handed to the row-wise labeling functions, in the order those functions index
    # them: blocker (o, dir, x, y) followed by defender (o, dir, x, y).
    pair_columns = [
        'adjustedO', 'adjustedDir', 'adjustedX', 'adjustedY', 'adjustedODefender',
        'adjustedDirDefender', 'adjustedXDefender', 'adjustedYDefender'
    ]

    # blockType: 0 / 1 / 2 as returned by looking_to_block_or_blocking_df_fn.
    # This row-wise pass used to be executed twice with identical inputs, the second run
    # overwriting the same column; a single pass yields the same result.
    blocking_df = players.select(pair_columns).apply(looking_to_block_or_blocking_df_fn)

    blocking_df = blocking_df.to_series()
    players = players.with_columns(blockType=blocking_df)


    # blockLocation: "headup" / "left" / "right" / "none" from type_of_block_df_fn.
    # blockType is passed along as a ninth column but is not read by that function.
    type_of_block_df = players.select(pair_columns + ['blockType']).apply(type_of_block_df_fn)

    players = players.with_columns(blockLocation=type_of_block_df.to_series())
    players = players.select([
        'gameId',
        'playId',
        'nflId',
        'nflIdDefender',
        'framesSinceSnap',
        'distance',
        'adjustedXDefender',
        'adjustedYDefender',
        'blockType',
        'blockLocation',
    ])


    players.write_parquet(f'coneAnalysisWeek{week}.parquet')