# Extract and Transform the Injury and Concussion Tracking Data

This data is much more voluminous than the qualitative data, so choosing compact datatypes and preserving them through every step is paramount to keeping its size down. As before, this notebook holds the set of functions for all of the transformation processing of the tracking data, along with the list of .py files used to actually process and save the data. 

I think that I can process all of the data as a dataframe for most of the processing. There were issues in collecting the dataframe from a lazyframe following the impulse calculations, so I am forgoing that attempt and going back to what was already working. 

Since the summary table uses aggregates, I may be able to aggregate within a lazyframe, but it may also be inexpensive enough to just do this as a dataframe, since that worked before. Since I will be performing the aggregation after the other calculations are performed, there is no reason not to do this in chunks. 

The functions that need to be performed are as follows: 

- Data Shrinker
The optimized data should be saved as a parquet file for the whole dataset. Once collected, this dataset can be broken into chunks, making sure that no PlayKey is split across two chunks. This WILL be critical, since some of the calculations compare each row with the previous row of the same play, starting from time zero. 

- Angle Corrector
- Velocity Calculator
- Body Builder
- Impulse Calculator

After the last four functions (everything following the Data Shrinker) have been applied, each chunk should be saved as a parquet file. 

In order to create the Quantitative Set, I will need to concatenate all of these files into one. However, since I can scan them as lazyframes, the aggregate functions can be parallelized when the query is executed at collect(). 


In [1]:
import polars as pl
import numpy as np
import math
import os

# Raw player tracking CSV (pipeline input)
injury_tracking_path = "F:/Data/nfl-playing-surface-analytics/PlayerTrackData.csv"
# Dtype-optimized copy of the full tracking data, written by optimize_and_save_lazyframe
optimized_path = "F:/Data/Processing_data/OptimizedTrackData.parquet"
# Folder that receives one group_<n>.parquet file per PlayKey batch
output_dir = "F:/Data/Processing_data/injury_output"

In [2]:
def calculate_angle_difference(angle1, angle2):
    """
    Return the signed difference ``angle2 - angle1`` in degrees, wrapped to
    the principal range (-180, 180].

    The raw difference is converted to radians and passed through
    ``arctan2(sin, cos)``, so angles on either side of the 0/360 seam compare
    correctly (e.g. 350 -> 10 gives +20, not -340). Works element-wise on
    anything NumPy ufuncs accept.
    """
    import numpy as np # type: ignore

    delta_rad = np.radians(angle2 - angle1)
    wrapped_rad = np.arctan2(np.sin(delta_rad), np.cos(delta_rad))
    return np.degrees(wrapped_rad)

def angle_corrector(df):
    """
    Wrap the ``dir`` and ``o`` angle columns and add their absolute difference.

    Parameters
    ----------
    df : polars DataFrame or LazyFrame
        Must contain numeric ``dir`` and ``o`` columns in degrees.

    Returns
    -------
    The frame with ``dir`` and ``o`` rewritten as ``(a + 180) % 360 - 180``
    and a new ``Angle_Diff`` column: the absolute wrapped difference between
    them (degrees, rounded to 2 decimals). Returns ``None`` if building the
    expressions raised; the error is printed.
    """
    import polars as pl # type: ignore

    try:
        wrapped = df.with_columns([
            # For inputs >= -180 this maps each angle into [-180, 180),
            # reducing fringe errors at the 0/360 seam.
            ((pl.col("dir") + 180) % 360 - 180).alias("dir"),
            ((pl.col("o") + 180) % 360 - 180).alias("o"),
        ])
        return wrapped.with_columns(
            # The sin/cos based difference is itself wrap-safe; abs() makes it unsigned.
            calculate_angle_difference(pl.col("dir"), pl.col("o"))
            .abs()
            .round(2)
            .alias("Angle_Diff")
        )

    except Exception as e:
        # Report the function that actually failed (the message previously
        # named calculate_angle_difference).
        print(f"An error occurred during angle_corrector: {e}")
        return None

In [3]:
def velocity_calculator(df, dt=0.1):
    """
    Derive per-step kinematics by comparing each row with the previous row
    of the same play.

    Adds displacement, speed, direction of motion, x/y velocity components
    and the angular velocities of the ``dir`` and ``o`` headings.

    Parameters
    ----------
    df : polars DataFrame or LazyFrame
        Needs ``PlayKey``, ``x``, ``y``, ``dir`` and ``o`` (degrees) columns.
        Rows are assumed to be in time order within each play, because the
        previous row is taken with ``shift(1).over("PlayKey")``.
    dt : float, default 0.1
        Seconds between consecutive rows. 0.1 is the value this code has
        always used (presumably the tracking sample interval; confirm).

    Returns
    -------
    The frame with ``Displacement``, ``Speed``, ``Direction``, ``vx``, ``vy``,
    ``omega_dir``, ``omega_o`` and ``omega_diff`` added. The difference-based
    columns are null on the first row of each play. Returns ``None`` if
    building the query raised; the error is printed.
    """
    import numpy as np # type: ignore
    import polars as pl # type: ignore

    two_pi = 2 * np.pi

    def _wrapped_change(current, previous):
        # Fold an angle change (radians) into [-pi, pi). Without this, a
        # heading that crosses the +/-180 degree seam looks like a ~2*pi spin
        # within a single step and produces a huge spurious angular velocity.
        # An explicit floor is used so the result does not depend on how '%'
        # treats negative floats.
        delta = current - previous
        return delta - two_pi * ((delta + np.pi) / two_pi).floor()

    try:
        return df.with_columns([
            # Convert 'o' and 'dir' to radians
            (pl.col("o") * np.pi / 180).alias("o_rad"),
            (pl.col("dir") * np.pi / 180).alias("dir_rad"),
        ]).with_columns([
            # Previous-row values within the same play
            pl.col("x").shift(1).over("PlayKey").alias("prev_x"),
            pl.col("y").shift(1).over("PlayKey").alias("prev_y"),
            pl.col("dir_rad").shift(1).over("PlayKey").alias("prev_dir"),
            pl.col("o_rad").shift(1).over("PlayKey").alias("prev_o"),
        ]).with_columns([
            # Component displacements
            (pl.col("x") - pl.col("prev_x")).alias("dx"),
            (pl.col("y") - pl.col("prev_y")).alias("dy"),
        ]).with_columns([
            # Straight-line displacement over one step
            ((pl.col("dx")**2 + pl.col("dy")**2)**0.5).alias("Displacement"),
        ]).with_columns([
            (pl.col("Displacement") / dt).alias("Speed"),
            # arctan2(dx, dy): angle of motion measured from the +y axis toward +x
            (np.degrees(np.arctan2(pl.col("dx"), pl.col("dy")))).alias("Direction"),
            (pl.col("dx") / dt).alias("vx"),
            (pl.col("dy") / dt).alias("vy"),
            # Angular velocities from the seam-safe heading change
            (_wrapped_change(pl.col("dir_rad"), pl.col("prev_dir")) / dt).alias("omega_dir"),
            (_wrapped_change(pl.col("o_rad"), pl.col("prev_o")) / dt).alias("omega_o"),
        ]).with_columns([
            ((pl.col("omega_dir") - pl.col("omega_o")).abs()).alias("omega_diff"),
        ]).drop([
            "prev_x", "prev_y", "prev_dir", "prev_o", "dx", "dy", "o_rad", "dir_rad"
        ])

    except Exception as e:
        print(f"An error occurred during velocity_calculator: {e}")
        return None

In [4]:
def body_builder(df, playlist_path="F:/Data/nfl-playing-surface-analytics/PlayList.csv"):
    """
    Attach per-position average body measurements to every tracking row.

    Each play's ``Position`` is looked up from the PlayList CSV by ``PlayKey``
    and joined to average height, weight and chest radius for that position.
    Mass is used for momentum and impulse rather than raw velocities, and
    chest radius for the moment of inertia (body modelled as a rotating
    cylinder).

    Parameters
    ----------
    df : polars LazyFrame or DataFrame
        Must contain a ``PlayKey`` column.
    playlist_path : str
        PlayList CSV providing ``PlayKey`` -> ``Position``. Defaults to the
        path that was previously hard-coded here.

    Returns
    -------
    Same kind of frame as ``df``, with ``Position``, ``Height_m``,
    ``Weight_kg`` and ``Chest_rad_m`` added. Rows whose PlayKey has no
    PlayList match, or a null Position, are dropped. Positions missing from
    the table below keep null body columns. Returns ``None`` if building the
    query raised; the error is printed.
    """
    import polars as pl # type: ignore

    try:
        positions = ["QB", "RB", "FB", "WR", "TE", "T", "G", "C", "DE", "DT", "NT", "LB", "OLB", "MLB", "CB", "S", "K", "P", "SS", "ILB", "FS", "LS", "DB"]
        # Position names, same order: Quarterback, Running Back, Fullback, Wide Receiver,
        # Tight End, Tackle, Guard, Center, Defensive End, Defensive Tackle, Nose Tackle,
        # Linebacker, Outside Linebacker, Middle Linebacker, Cornerback, Safety, Kicker,
        # Punter, Strong Safety, Inside Linebacker, Free Safety, Long Snapper, Defensive Back
        body_data = pl.LazyFrame({
            "Position": positions,
            "Height_m": [1.91, 1.79, 1.85, 1.88, 1.96, 1.97, 1.90, 1.87, 1.97, 1.92, 1.88, 1.90, 1.90, 1.87, 1.82, 1.84, 1.83, 1.88, 1.84, 1.90, 1.84, 1.88, 1.82],
            "Weight_kg": [102.1, 95.3, 111.1, 90.7, 114.6, 140.6, 141.8, 136.1, 120.2, 141.8, 152.0, 110.0, 108.9, 113.4, 87.4, 95.9, 92.08, 97.52, 95.9, 110.0, 95.9, 108.86, 87.4],
            # A single chest radius is used for every position
            "Chest_rad_m": [0.191] * len(positions),
        })

        position = (
            pl.scan_csv(playlist_path)
            .select(["PlayKey", "Position"])
            .join(body_data, on="Position", how="left")
        )

        # Polars only joins LazyFrame with LazyFrame and DataFrame with
        # DataFrame, so materialise the small lookup when df is eager.
        if isinstance(df, pl.DataFrame):
            position = position.collect()

        joined = df.join(position, on="PlayKey", how="left")
        return joined.filter(pl.col("Position").is_not_null())

    except Exception as e:
        print(f"An error occurred during body_builder: {e}")
        return None

In [5]:
def impulse_calculator(df, dt=0.1):
    """
    Add momentum, impulse and torque columns using player mass.

    Expects the velocity columns from ``velocity_calculator`` (``vx``, ``vy``,
    ``omega_dir``, ``omega_diff``), the body columns from ``body_builder``
    (``Weight_kg``, ``Chest_rad_m``), and ``PlayKey`` for per-play shifts.

    Parameters
    ----------
    df : polars DataFrame or LazyFrame
    dt : float, default 0.1
        Seconds between consecutive rows, used for torque = dL/dt. Defaults to
        the 0.1 this code has always used.

    Returns
    -------
    The frame with ``px``, ``py``, ``moment``, ``moment_upper``,
    ``p_magnitude``, ``L_dir``, ``L_diff``, ``Jx``, ``Jy``, ``J_magnitude``,
    ``torque`` and ``torque_internal`` added. Returns ``None`` if building the
    query raised; the error is printed.
    """
    import polars as pl # type: ignore

    try:
        return df.with_columns([
            # Linear momentum p = m * v for each instant
            (pl.col('vx') * pl.col('Weight_kg')).alias('px'),
            (pl.col('vy') * pl.col('Weight_kg')).alias('py'),

            # Moment of inertia of the body turning upright: (1/12) m r^2
            # NOTE(review): a solid cylinder spinning about its vertical axis
            # has I = (1/2) m r^2. The 1/12 factor is kept unchanged here;
            # confirm the intended model.
            (1/12 * pl.col('Weight_kg') * (pl.col('Chest_rad_m')**2)).alias('moment'),

            # Same model for the upper body (taken as 70% of mass) turning at the waist
            (1/12 * (pl.col('Weight_kg')*0.7) * (pl.col('Chest_rad_m')**2)).alias('moment_upper'),
        ]).with_columns([
            # Magnitude of linear momentum
            ((pl.col("px")**2 + pl.col("py")**2)**0.5).alias("p_magnitude"),

            # Angular momentum L = I * omega for the direction of motion
            (pl.col('omega_dir')*pl.col('moment')).alias('L_dir'),

            # Angular momentum of the upper body relative to the lower body
            (pl.col('omega_diff')*pl.col('moment_upper')).alias('L_diff'),
        ]).with_columns([
            # Previous-row momenta within the same play
            pl.col("px").shift(1).over("PlayKey").alias("prev_px"),
            pl.col("py").shift(1).over("PlayKey").alias("prev_py"),
            pl.col("L_dir").shift(1).over("PlayKey").alias("prev_L_dir"),
            pl.col("L_diff").shift(1).over("PlayKey").alias("prev_L_diff"),
        ]).with_columns([
            # Impulse J is the change in linear momentum between rows
            (pl.col("px") - pl.col("prev_px")).alias("Jx"),
            (pl.col("py") - pl.col("prev_py")).alias("Jy"),
        ]).with_columns([
            # Magnitude of the impulse
            ((pl.col("Jx")**2 + pl.col("Jy")**2)**0.5).alias("J_magnitude"),

            # Torque as the change in angular momentum over the time step
            ((pl.col("L_dir") - pl.col("prev_L_dir")) / dt).alias("torque"),
            ((pl.col("L_diff") - pl.col("prev_L_diff")) / dt).alias("torque_internal"),
        ]).drop([
            "prev_L_dir", "prev_px", "prev_py", "prev_L_diff"
        ])

    except Exception as e:
        # Include the actual exception so failures can be diagnosed.
        print(f"An error occurred during impulse_calculator: {e}")
        return None

In [7]:
def create_initial_lazyframe(injury_tracking_path):
    """
    Lazily scan the raw tracking CSV and drop the ``event``, ``s`` and ``dis``
    columns.

    Ragged lines are truncated, unparseable values are ignored, and the schema
    is inferred from the first 10,000 rows. Nothing is read until the returned
    LazyFrame is collected.
    """
    scan_options = dict(
        truncate_ragged_lines=True,
        infer_schema_length=10000,
        ignore_errors=True,
    )
    raw_scan = pl.scan_csv(injury_tracking_path, **scan_options)
    unused_columns = ['event', 's', 'dis']
    return raw_scan.drop(unused_columns)


In [11]:
def data_shrinker(df, verbose=True):
    """
    Optimize memory usage of a Polars DataFrame for both categorical and
    numeric data.

    Signed integer columns are downcast to the smallest of Int8/Int16/Int32
    that holds their observed min/max (otherwise Int64). Float columns become
    Float32 when their range fits (otherwise Float64). String columns other
    than ``PlayKey`` become Categorical when under 50% of their values are
    unique.

    Parameters
    ----------
    df : pl.DataFrame
        Eager frame; per-column min/max and ``estimated_size`` need
        materialised data.
    verbose : bool
        Print memory usage before and after.

    Returns
    -------
    tuple of (pl.DataFrame, schema)
        The optimized frame and its schema.
    """
    import polars as pl # type: ignore
    import numpy as np # type: ignore

    start_mem = df.estimated_size("mb")
    if verbose:
        print(f'Memory usage of dataframe is {start_mem:.2f} MB')

    int_candidates = [(np.int8, pl.Int8), (np.int16, pl.Int16), (np.int32, pl.Int32)]
    float32_info = np.finfo(np.float32)
    n_rows = len(df)
    casts = []

    for col in df.columns:
        series = df[col]
        col_type = series.dtype

        if col_type in [pl.Int8, pl.Int16, pl.Int32, pl.Int64, pl.Float32, pl.Float64]:
            # Compute the range once. An empty or all-null column has no
            # range, so it falls back to the widest type.
            c_min, c_max = series.min(), series.max()
            has_range = c_min is not None and c_max is not None

            if col_type.is_integer():
                target = pl.Int64
                if has_range:
                    for np_type, pl_type in int_candidates:
                        info = np.iinfo(np_type)
                        if c_min >= info.min and c_max <= info.max:
                            target = pl_type
                            break
            else:
                # NaN or infinite bounds fail these comparisons and keep Float64.
                fits_float32 = (
                    has_range
                    and c_min >= float32_info.min
                    and c_max <= float32_info.max
                )
                target = pl.Float32 if fits_float32 else pl.Float64
            casts.append(pl.col(col).cast(target))

        elif col_type == pl.Utf8:
            # PlayKey is the join/grouping key, so it always stays a string.
            # n_rows > 0 guards the ratio against an empty frame.
            if col != "PlayKey" and n_rows > 0 and series.n_unique() / n_rows < 0.5:
                casts.append(pl.col(col).cast(pl.Categorical))

    # Apply every cast in one pass instead of rebuilding the frame per column.
    if casts:
        df = df.with_columns(casts)

    end_mem = df.estimated_size("mb")
    optimized_schema = df.schema

    if verbose:
        print(f'Memory usage after optimization is: {end_mem:.2f} MB')
        if start_mem > 0:
            print(f'Decreased by {100 * (start_mem - end_mem) / start_mem:.1f}%')

    return df, optimized_schema

In [22]:
def optimize_and_save_lazyframe(injury_tracking_path, optimized_path):
    """
    Read the raw tracking CSV, shrink its datatypes, and save the result as
    parquet.

    Nothing is returned; later steps are expected to read the saved file at
    ``optimized_path``.

    Parameters
    ----------
    injury_tracking_path : str
        Raw tracking CSV.
    optimized_path : str
        Destination parquet file. Its parent directory is created if missing.
    """
    import os

    df = create_initial_lazyframe(injury_tracking_path).collect(streaming=True)

    # data_shrinker already returns the frame in its optimized schema. The
    # former per-column re-cast loop was a no-op that copied the frame once
    # per column.
    optimized_df, _ = data_shrinker(df)
    del df  # drop the reference to the un-optimized frame before writing

    parent_dir = os.path.dirname(optimized_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    # Write the DataFrame to a Parquet file
    optimized_df.write_parquet(optimized_path)

    print(f"Saved optimized data to {optimized_path}")


In [23]:
# Read the raw CSV, downcast dtypes and write the optimized parquet to optimized_path.
optimize_and_save_lazyframe(injury_tracking_path, optimized_path)

Memory usage of dataframe is 3674.06 MB
Memory usage after optimization is: 2217.48 MB
Decreased by 39.6%
Saved optimized data to F:/Data/Processing_data/OptimizedTrackData.parquet


At this point, the optimized df has been written as a parquet file. I reopened the saved file and confirmed that the reduced datatypes are preserved on save. 

In [21]:
# Reload the saved parquet to check that the reduced dtypes survived the round trip.
df = pl.read_parquet(optimized_path)

In [None]:
# NOTE: Unused LazyFrame variant of data_shrinker, kept for reference only.
#       It never downcasts floats to Float32 and runs separate collect() calls per column.
# def lazy_data_shrinker(lf, verbose=True):
#     """
#     Optimize memory usage of a Polars LazyFrame for both categorical and numeric data.
#     """
#     import polars as pl
#     import numpy as np

#     if verbose:
#         print("Starting lazy data shrinking process...")

#     # Collect schema to avoid repeated schema resolution
#     schema = lf.collect_schema()

#     for col, col_type in schema.items():
#         if col_type in [pl.Int8, pl.Int16, pl.Int32, pl.Int64, pl.Float32, pl.Float64]:
#             try:
#                 # For numeric columns, we'll use statistics to determine the appropriate type
#                 stats = lf.select([
#                     pl.col(col).min().alias("min"),
#                     pl.col(col).max().alias("max"),
#                     pl.col(col).null_count().alias("null_count")
#                 ]).collect()
                
#                 c_min, c_max, null_count = stats[0, "min"], stats[0, "max"], stats[0, "null_count"]

#                 if col_type.is_integer():
#                     if c_min >= np.iinfo(np.int8).min and c_max <= np.iinfo(np.int8).max:
#                         lf = lf.with_columns(pl.col(col).cast(pl.Int8))
#                     elif c_min >= np.iinfo(np.int16).min and c_max <= np.iinfo(np.int16).max:
#                         lf = lf.with_columns(pl.col(col).cast(pl.Int16))
#                     elif c_min >= np.iinfo(np.int32).min and c_max <= np.iinfo(np.int32).max:
#                         lf = lf.with_columns(pl.col(col).cast(pl.Int32))
#                     else:
#                         lf = lf.with_columns(pl.col(col).cast(pl.Int64))
#                 elif col_type.is_float():
#                     lf = lf.with_columns(pl.col(col).cast(pl.Float64))
#             except Exception as e:
#                 print(f"Error processing column {col}: {str(e)}")
#                 print(f"Keeping original data type for column {col}")

#         elif col_type == pl.Utf8:
#             if col != "PlayKey":
#                 try:
#                     # For string columns, we'll check the cardinality to decide if it should be categorical
#                     unique_count = lf.select(pl.col(col).n_unique()).collect()[0, 0]
#                     total_count = lf.select(pl.len()).collect()[0, 0]
#                     if unique_count / total_count < 0.5:  # If less than 50% unique values
#                         lf = lf.with_columns(pl.col(col).cast(pl.Categorical))
#                 except Exception as e:
#                     print(f"Error processing column {col}: {str(e)}")
#                     print(f"Keeping original data type for column {col}")

#     if verbose:
#         print("Lazy data shrinking process completed.")

#     optimized_schema = lf.schema

#     return lf, optimized_schema


In [None]:
# Run the optimization to save the dataframe
# NOTE(review): this repeats the optimize_and_save_lazyframe call from the earlier
# cell and rewrites the same parquet file; only one of the two needs to run.
optimize_and_save_lazyframe(injury_tracking_path=injury_tracking_path, optimized_path=optimized_path)

In [None]:
def process_and_save_playkey_group(lazy_df, playkeys, output_dir, group_number):
    """
    Run the transform pipeline on one batch of plays and save it.

    Keeps the rows of ``lazy_df`` whose PlayKey is in ``playkeys``, then
    applies angle_corrector, velocity_calculator, body_builder and
    impulse_calculator in that order. The result is collected and written to
    ``<output_dir>/group_<group_number>.parquet``; ``output_dir`` is created
    if missing.
    """
    pipeline_steps = (
        angle_corrector,
        velocity_calculator,
        body_builder,
        impulse_calculator,
    )

    batch = lazy_df.filter(pl.col("PlayKey").is_in(playkeys))
    for step in pipeline_steps:
        batch = batch.pipe(step)

    os.makedirs(output_dir, exist_ok=True)

    destination = os.path.join(output_dir, f"group_{group_number}.parquet")
    batch.collect().write_parquet(destination)
    print(f"Saved data for PlayKey group: {group_number}")


In [None]:
def process_file(optimized_path, output_dir, group_size=20000):
    """
    Split the optimized tracking parquet into PlayKey batches and process each.

    Every batch holds up to ``group_size`` whole plays, so no PlayKey is ever
    split across batches (the per-play shift calculations depend on this).
    Batches are numbered from 1 and written by
    ``process_and_save_playkey_group``.

    Parameters
    ----------
    optimized_path : str
        Parquet file produced by ``optimize_and_save_lazyframe``.
    output_dir : str
        Folder for the ``group_<n>.parquet`` outputs.
    group_size : int
        Maximum number of PlayKeys per batch.

    Raises
    ------
    ValueError
        If ``group_size`` is less than 1.
    """
    if group_size < 1:
        raise ValueError(f"group_size must be >= 1, got {group_size}")

    # Scan lazily so each batch only reads the rows it needs.
    lazy_df = pl.scan_parquet(optimized_path)

    # unique() does not guarantee output order. Sorting makes batch membership,
    # and therefore each group_<n> file's contents, reproducible across runs.
    unique_playkeys = (
        lazy_df.select(pl.col("PlayKey").unique().sort())
        .collect()["PlayKey"]
        .to_list()
    )

    batch_starts = range(0, len(unique_playkeys), group_size)
    for group_number, start in enumerate(batch_starts, start=1):
        playkey_group = unique_playkeys[start:start + group_size]
        process_and_save_playkey_group(lazy_df, playkey_group, output_dir, group_number)

    print("Processing complete.")


In [None]:
def transform_injury_data(optimized_path, output_dir):
    """
    Run the full transform for the surface-injury tracking data.

    Splits the optimized parquet at ``optimized_path`` into PlayKey batches,
    applies the angle/velocity/body/impulse pipeline to each, and writes one
    ``group_<n>.parquet`` per batch into ``output_dir``. Returns None.
    """
    # No __name__ check here: a guard inside the function made it silently do
    # nothing when called from a module that imported it. The script entry
    # point is guarded at module level instead.
    process_file(optimized_path, output_dir)

In [None]:
if __name__ == "__main__":
    # Script entry point: run the full transform using the paths defined at the top.
    transform_injury_data(optimized_path, output_dir)

In [None]:
# Inspect the first batch written by process_file. Build the path from the same
# output_dir the pipeline writes to, rather than a separately hard-coded folder.
df = pl.read_parquet(os.path.join(output_dir, "group_1.parquet"))

In [None]:
# Preview the first five rows of the loaded batch
df.head(5)

In [None]:
# data_shrinker returns (frame, schema); unpack so df_optimized is the frame itself.
df_optimized, _ = data_shrinker(df)

In [None]:
# Preview the first five rows after dtype optimization
df_optimized.head(5)

For the Summary table, I don't need all of these parameters. I will only need the following: 

- PlayKey
- Displacement
- x
- y
- Angle_Diff
- Speed
- J_magnitude
- torque
- torque_internal
