Base repo of data: https://github.com/mlsedigital/SPL-open-data

Great xyz data dict: https://www.inpredictable.com/2021/01/nba-player-shooting-motions-data-dump.html


## Ultimate Goals:
* Find optimal shooting form/release point
  * Include the feedback system based on optimal shooting form and release point
  * ### Use shooting meter from NBA 2k to show good vs bad releases and body movements
  * ^use the motions that:
    * produce good results (a make or enough kinetic energy for a near make)
    * are within certain boundaries where the player is comfortable AND in right shooting form
    * are within certain ranges (this inch to this inch is the best motion area for this player, etc.)
    * Compared to Popular Shooters to make recommendations to shoot more like someone like Klay Thompson or Buddy Hield (see if the data from the xyz data dictionary has good shooters to make recommendations to shoot like them)
* Exhaustion levels and optimal energy max and min
* Shot outcome prediction


1. Finding Optimal Shooting Form/Release Point

    Detailed Analysis of Body Movements: Include analysis of body kinematics (positions and velocities of limbs) at the time of release. Use joint coordinates to extract features like elbow angle, shoulder rotation, and wrist flexion at the release frame.
    Machine Learning for Form Classification: Train a model to classify "good" vs. "bad" shooting forms using successful shot data (makes or near-makes) and compare them against unsuccessful shots.
    Feedback Mechanism: Implement a feedback system that suggests adjustments based on the comparison of current shooting mechanics to historical optimal ones.

2. Shooting Meter Simulation (NBA 2K-Style)

    Visual Feedback System: Develop a visualization tool that overlays a “meter” on shot video frames, indicating the quality of the release in real-time. This can be based on a scoring function derived from the shooting form features and ball dynamics.
    Comfort Zone Identification: Use clustering algorithms (e.g., K-means) to identify comfortable ranges of motion based on historical shooting data for individual players.

3. Shot Outcome Prediction

    Feature Engineering for Outcome Modeling: Include additional features such as:
        Kinetic Energy Calculation: KE=12mv2KE=21​mv2 to see if the ball had sufficient energy for a make.
        Entry Angle and Trajectory Analysis: Analyze whether the angle at which the ball approaches the hoop aligns with optimal scoring trajectories.
    Model Development: Build a machine learning model (e.g., logistic regression, XGBoost) that predicts shot outcomes based on ball speed, entry angle, release point, and body dynamics.
    Training with Data Augmentation: Use synthetic data generation to include a wide variety of shot scenarios.

4. Exhaustion Levels and Energy Management

    Tracking Player Movements: Use the coordinates of major joints (e.g., knees, hips) to estimate a player's exertion level using metrics like the average vertical displacement over time.
    Velocity and Acceleration Patterns: Monitor changes in the velocity and acceleration of the player's body parts throughout a game to detect fatigue.
    Feature Integration: Create features such as average speed and distance covered leading up to the shot to include in the predictive model.

5. shot simulator: 
    if by these metrics we can simulate the shot to a nearby hoop. We can set the hoop in a set location (or when using spatial, we'd pick the location), and set it up to virtualize the experience with yolo/opencv where when you make those motions to show shooting motion, you can show where the ball might go (even without a ball in hand).

data example:

  {'frame': 239,
   'time': 7966,
   'data': {'ball': [nan, nan, nan],
    'player': {'R_EYE': [27.106, 0.267, 5.361],
     'L_EYE': [26.916, 0.385, 5.344],
     'NOSE': [26.976, 0.366, 5.241],
     'R_EAR': [27.207, -0.074, 5.422],
     'L_EAR': [26.702, 0.18, 5.398],
     'R_SHOULDER': [27.225, -0.603, 4.782],
     'L_SHOULDER': [26.435, 0.176, 4.794],
     'R_ELBOW': [27.342, -0.838, 3.813],
     'L_ELBOW': [26.189, 0.234, 3.858],
     'R_WRIST': [27.755, -0.728, 2.991],
     'L_WRIST': [26.159, 0.492, 3.078],
     'R_HIP': [27.326, -0.338, 3.038],
     'L_HIP': [26.739, 0.148, 3.039],
     'R_KNEE': [27.777, -0.104, 1.833],
     'L_KNEE': [26.884, 0.033, 1.609],
     'R_ANKLE': [27.453, -0.603, 0.539],
     'L_ANKLE': [26.859, -0.166, 0.305],
     'R_1STFINGER': [27.711, -0.642, 2.765],
     'R_5THFINGER': [27.644, -0.832, 2.7],
     'L_1STFINGER': [26.186, 0.402, 2.85],
     'L_5THFINGER': [26.05, 0.382, 2.788],
     'R_1STTOE': [27.896, -0.303, 0.151],
     'R_5THTOE': [27.887, -0.433, 0.175],
     'L_1STTOE': [27.223, 0.149, -0.037],
     'L_5THTOE': [27.066, 0.17, -0.029],
     'R_CALC': [27.344, -0.697, 0.346],
     'L_CALC': [26.812, -0.269, 0.16]}}}]}

The json contains shot metadata and a list of dictionaries, where each dictionary of the list corresponds to a single frame of data. The keys of the dictionary are as follows:

    frame: The frame number, starting from 0.
    time: The time in milliseconds, since the beginning of the trial. Since the frame rate is 30fps, the difference between subsequent frames' times will be 33 or 34 milliseconds (due to rounding).
    landing_x: Landing position x coordinate on hoop plane.
    landing_y: Landing position y coordinate on hoop plane.
    entry_angle: Angle at which ball breaks (enters) hoop plane.
    tracking: A dictionary that contains the following xyz data corresponding to the frame:
        ball: The x, y, and z coordinates of the ball center.
        player: A dictionary containing x, y, and z coordinates for all of the person's keypoints. Each key in this dictionary corresponds to a specific keypoint.

## Participant information: SPL-Open-Data/basketball/freethrow/participant_information.json

{
    "participant_id": "P0001",
    "height_in_meters": 1.91,
    "weight__in_kg": 90.7
}

In [6]:
%%writefile ml/data_load_prepare/load_and_parse.py


import os
import json

def load_and_parse_json(file_path, debug=False):
    if debug:
        print(f"Debug: Loading and parsing file: {file_path}")
    
    try:
        with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
            data = json.load(f)
        
        trial_id = data['trial_id']
        result = 1 if data['result'] == 'made' else 0
        landing_x = data['landing_x']
        landing_y = data['landing_y']
        entry_angle = data['entry_angle']
        release_frame = data.get('release_frame', None)

        if debug:
            print(f"Debug: Trial ID: {trial_id}, Result: {result}, Release Frame: {release_frame}")
        
        return data, trial_id, result, landing_x, landing_y, entry_angle, release_frame
    except Exception as e:
        print(f"Error: Failed to load or parse JSON file: {file_path}. Exception: {e}")
        return None, None, None, None, None, None, None

def load_single_ft_and_parse(file_path, debug=False):
    data = load_and_parse_json(file_path, debug=debug)
    return data

# Test the function
if __name__ == "__main__":
    test_file = "../../SPL-Open-Data/basketball/freethrow/data/P0001/BB_FT_P0001_T0001.json"
    load_single_ft_and_parse(test_file, debug=False)


Overwriting ml/data_load_prepare/load_and_parse.py


In [7]:
%%writefile ml/data_load_prepare/dataframe_creation.py

import pandas as pd
import numpy as np

def create_dataframe(data, trial_id, result, landing_x, landing_y, entry_angle, debug=False):
    frame_data = []
    if debug:
        print("Debug: Initializing dataframe creation with given trial metadata.")

    # Process each frame in the data
    for i, frame in enumerate(data['tracking']):
        frame_time = frame['time']
        ball_pos = frame['data'].get('ball', [None, None, None])
        player_pos = frame['data']['player']

        if debug:
            print(f"Debug: Processing frame {i} with time {frame_time}. Ball position: {ball_pos}")

        # Flattening frame data with player and ball positions
        flat_frame = {
            'trial_id': trial_id,
            'result': result,
            'landing_x': landing_x,
            'landing_y': landing_y,
            'entry_angle': entry_angle,
            'frame_time': frame_time,
            'ball_x': ball_pos[0] if ball_pos[0] is not None else np.nan,
            'ball_y': ball_pos[1] if ball_pos[1] is not None else np.nan,
            'ball_z': ball_pos[2] if ball_pos[2] is not None else np.nan,
        }

        for part, coords in player_pos.items():
            flat_frame[f'{part}_x'] = coords[0]
            flat_frame[f'{part}_y'] = coords[1]
            flat_frame[f'{part}_z'] = coords[2]

        frame_data.append(flat_frame)
    
    df = pd.DataFrame(frame_data)
    if debug:
        print("Debug: DataFrame created from frame data. Dimensions:", df.shape)

    # Set the index to the sequential frame number
    df.index = np.arange(len(df))
    if debug:
        print(f"Debug: DataFrame index set to sequential frame numbers. Index range: {df.index.min()} to {df.index.max()}")

    # Filter out rows where the ball is not being tracked
    original_len = len(df)
    df.dropna(subset=['ball_x', 'ball_y', 'ball_z'], inplace=True)
    filtered_len = len(df)
    df.reset_index(drop=True, inplace=True)
    if debug:
        print(f"Debug: DataFrame filtered to exclude rows where ball is not tracked. Rows before: {original_len}, after: {filtered_len}")

    if debug:
        # Contextual debug output
        print(f"Debug: Created DataFrame for Trial ID: {trial_id}")
        print("Debug: Columns available in DataFrame:", df.columns)
        print("\nDebug: Data types of each column for validation:", df.dtypes)
        print("\nDebug: Sample row from DataFrame to inspect initial data structure:", df.iloc[0] if not df.empty else "DataFrame is empty.")
        print("\nDebug: Count of null values in each column to check data completeness:", df.isna().sum())
        print("\nDebug: Summary statistics for numeric columns, providing insight into data range and variance:", df.describe())

    return df


def main_create_dataframe(data, trial_id, result, landing_x, landing_y, entry_angle, debug=False):
    if debug:
        print("Debug: Calling main_create_dataframe with trial data and parameters.")
    df = create_dataframe(data, trial_id, result, landing_x, landing_y, entry_angle, debug=debug)
    return df


# Test the function
if __name__ == "__main__":
    # from data_loading.load_and_parse import load_single_ft_and_parse
    data, trial_id, result, landing_x, landing_y, entry_angle, _ = load_single_ft_and_parse("../../SPL-Open-Data/basketball/freethrow/data/P0001/BB_FT_P0001_T0001.json", debug=False)
    main_create_dataframe(data, trial_id, result, landing_x, landing_y, entry_angle, debug=False)


Overwriting ml/data_load_prepare/dataframe_creation.py


In [8]:
%%writefile ml/data_load_prepare/integrity_checks.py


def check_data_integrity(df, debug=False):
    null_counts = df.isna().sum()
    
    if debug:
        print("\nDebug: Data integrity check - Missing counts for all columns:")
        print(null_counts)
    
    problematic_columns = null_counts[null_counts > 0]
    
    if not problematic_columns.empty:
        print("\nWarning: The following columns have missing data:")
        for col, count in problematic_columns.items():
            print(f"Column '{col}' has {count} missing values.")
    else:
        print("\nInfo: No columns with missing data detected.")
    
    return problematic_columns

    

# Test the function
if __name__ == "__main__":
    # from dataframe_creation import main_create_dataframe
    # from load_and_parse import load_single_ft_and_parse
    data, trial_id, result, landing_x, landing_y, entry_angle, _ = load_single_ft_and_parse("../../SPL-Open-Data/basketball/freethrow/data/P0001/BB_FT_P0001_T0001.json")
    df = main_create_dataframe(data, trial_id, result, landing_x, landing_y, entry_angle, debug=False)
    check_data_integrity(df, debug=False)


Overwriting ml/data_load_prepare/integrity_checks.py


In [9]:
%%writefile ml/data_load_prepare/velocity_and_speed_calc.py
import numpy as np

def calculate_ball_speed_velocity_direction(df, debug=False):
    # Calculate ball speed using 3D distance and time differences
    df['ball_speed'] = (np.sqrt(
        (df['ball_x'].diff()**2) +
        (df['ball_y'].diff()**2) +
        (df['ball_z'].diff()**2)
    ) + np.sqrt(
        (df['ball_x'].shift(-1).diff()**2) +
        (df['ball_y'].shift(-1).diff()**2) +
        (df['ball_z'].shift(-1).diff()**2)
    )) / (df['frame_time'].diff() + df['frame_time'].shift(-1).diff())

    # Calculate ball velocity components along x, y, z axes
    df['ball_velocity_x'] = (df['ball_x'].diff() / df['frame_time'].diff() +
                       df['ball_x'].shift(-1).diff() / df['frame_time'].shift(-1).diff()) / 2
    df['ball_velocity_y'] = (df['ball_y'].diff() / df['frame_time'].diff() +
                       df['ball_y'].shift(-1).diff() / df['frame_time'].shift(-1).diff()) / 2
    df['ball_velocity_z'] = (df['ball_z'].diff() / df['frame_time'].diff() +
                       df['ball_z'].shift(-1).diff() / df['frame_time'].shift(-1).diff()) / 2
    df['overall_ball_velocity'] = np.sqrt(df['ball_velocity_x']**2 + df['ball_velocity_y']**2 + df['ball_velocity_z']**2)

    # Calculate normalized direction components (unit vectors) along x, y, z
    df['ball_direction_x'] = df['ball_velocity_x'] / df['overall_ball_velocity']
    df['ball_direction_y'] = df['ball_velocity_y'] / df['overall_ball_velocity']
    df['ball_direction_z'] = df['ball_velocity_z'] / df['overall_ball_velocity']

    # Filter out rows with NaN direction values caused by zero velocity (overall_ball_velocity = 0)
    df = df.dropna(subset=['ball_direction_x', 'ball_direction_y', 'ball_direction_z']).reset_index(drop=True)

    # Debugging to verify direction accuracy
    if debug:
        print("Debug: Calculated ball speed, velocity, and direction.")
        print("Debug: NaN counts for key columns after filtering:")
        print(df[['ball_speed', 'ball_velocity_x', 'ball_velocity_y', 'ball_velocity_z', 'overall_ball_velocity', 'ball_direction_x', 'ball_direction_y', 'ball_direction_z']].isna().sum())
        
        # Verify direction correctness by comparing to velocity components
        df['computed_ball_velocity_x'] = df['ball_direction_x'] * df['overall_ball_velocity']
        df['computed_ball_velocity_y'] = df['ball_direction_y'] * df['overall_ball_velocity']
        df['computed_ball_velocity_z'] = df['ball_direction_z'] * df['overall_ball_velocity']
        
        # Check for discrepancies between original and computed velocity components
        discrepancy_x = np.abs(df['ball_velocity_x'] - df['computed_ball_velocity_x']).mean()
        discrepancy_y = np.abs(df['ball_velocity_y'] - df['computed_ball_velocity_y']).mean()
        discrepancy_z = np.abs(df['ball_velocity_z'] - df['computed_ball_velocity_z']).mean()
        
        print(f"Debug: Average discrepancy for ball_velocity_x: {discrepancy_x:.5f}")
        print(f"Debug: Average discrepancy for ball_velocity_y: {discrepancy_y:.5f}")
        print(f"Debug: Average discrepancy for ball_velocity_z: {discrepancy_z:.5f}")
        
        # Output sample data for manual inspection
        print("Sample data around release frame:")
        print(df[['frame_time', 'ball_speed', 'ball_velocity_x', 'computed_ball_velocity_x', 'ball_velocity_y', 'computed_ball_velocity_y', 'ball_velocity_z', 'computed_ball_velocity_z', 'overall_ball_velocity',
                  'ball_direction_x', 'ball_direction_y', 'ball_direction_z']].head(10))

    return df



# Test the function
if __name__ == "__main__":
    # from data_preprocessing.dataframe_creation import main_create_dataframe
    # from data_loading.load_and_parse import load_single_ft_and_parse
    data, trial_id, result, landing_x, landing_y, entry_angle, _ = load_single_ft_and_parse("../../SPL-Open-Data/basketball/freethrow/data/P0001/BB_FT_P0001_T0001.json")
    df = main_create_dataframe(data, trial_id, result, landing_x, landing_y, entry_angle, debug=False)
    df = calculate_ball_speed_velocity_direction(df, debug=False)



Overwriting ml/data_load_prepare/velocity_and_speed_calc.py


In [10]:
%%writefile ml/data_load_prepare/phase_labeling.py

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from IPython.display import HTML

def calculate_distance(x1, y1, z1, x2, y2, z2):
    """
    Calculate the Euclidean distance between two 3D points.

    Parameters:
    - x1, y1, z1: Coordinates of the first point.
    - x2, y2, z2: Coordinates of the second point.

    Returns:
    - The Euclidean distance as a float.
    """
    return np.sqrt((x2 - x1)**2 + (y2 - y1)**2 + (z2 - z1)**2)

def label_ball_in_hands(df, hand_threshold=0.4, debug=False):
    """
    Label when the ball is in the hands by calculating distances between the ball and finger positions.

    Parameters:
    - df: DataFrame containing ball and hand joint positions.
    - hand_threshold: Distance threshold to determine if the ball is in hand.
    - debug: If True, prints debug information.

    Returns:
    - df: DataFrame with 'ball_in_hands' column indicating if the ball is in hand (1) or not (0).
    """
    df['dist_ball_R_1STFINGER'] = calculate_distance(
        df['ball_x'], df['ball_y'], df['ball_z'],
        df['R_1STFINGER_x'], df['R_1STFINGER_y'], df['R_1STFINGER_z']
    )
    df['dist_ball_R_5THFINGER'] = calculate_distance(
        df['ball_x'], df['ball_y'], df['ball_z'],
        df['R_5THFINGER_x'], df['R_5THFINGER_y'], df['R_5THFINGER_z']
    )
    df['dist_ball_L_1STFINGER'] = calculate_distance(
        df['ball_x'], df['ball_y'], df['ball_z'],
        df['L_1STFINGER_x'], df['L_1STFINGER_y'], df['L_1STFINGER_z']
    )
    df['dist_ball_L_5THFINGER'] = calculate_distance(
        df['ball_x'], df['ball_y'], df['ball_z'],
        df['L_5THFINGER_x'], df['L_5THFINGER_y'], df['L_5THFINGER_z']
    )
    # Determine if ball is in hand based on threshold
    df['ball_in_hands'] = ((df['dist_ball_R_1STFINGER'] < hand_threshold) |
                           (df['dist_ball_R_5THFINGER'] < hand_threshold) |
                           (df['dist_ball_L_1STFINGER'] < hand_threshold) |
                           (df['dist_ball_L_5THFINGER'] < hand_threshold)).astype(int)
    
    if debug:
        print("Debug: Ball-in-hand labeling completed.")
        print("Debug: Ball-in-hand table columns =", df.columns)
        print("Debug: Ball-in-hand table additions example =", df[['dist_ball_L_5THFINGER', 'dist_ball_L_1STFINGER']].head(5))
    return df

def label_shooting_motion(df, debug=False):
    """
    Label frames indicating shooting motion based on ball position relative to average shoulder height,
    extending the motion for 5 frames after the ball leaves the hands.

    Parameters:
    - df: DataFrame with ball and shoulder data.
    - debug: If True, prints debug information.

    Returns:
    - df: DataFrame with 'shooting_motion' column (1 for shooting motion, 0 otherwise).
    """
    df = label_ball_in_hands(df, debug=debug)
    df['shooting_motion'] = 0
    df['avg_shoulder_height'] = (df['R_SHOULDER_z'] + df['L_SHOULDER_z']) / 2

    # Identify shooting motion conditionally
    start_motion_condition = (df['ball_in_hands'] == 1) & (df['ball_z'] >= df['avg_shoulder_height'])
    df.loc[start_motion_condition, 'shooting_motion'] = 1

    # Track when motion starts and propagate shooting motion
    df['motion_group'] = start_motion_condition.cumsum()  # Grouping identifier for shooting motion
    shooting_groups = df['motion_group'].unique()
    shooting_groups = shooting_groups[shooting_groups > 0]  # Ignore group 0 (non-shooting frames)

    # Extend shooting motion for 5 frames after the ball leaves the hands
    for group in shooting_groups:
        group_indices = df.index[df['motion_group'] == group].tolist()
        if group_indices:
            last_index = group_indices[-1]
            extension_indices = range(last_index + 1, last_index + 5)  # Extend by 5 frames
            valid_extension_indices = [i for i in extension_indices if i < len(df)]
            df.loc[valid_extension_indices, 'shooting_motion'] = 1

    # Drop the helper column if not needed for debugging
    df.drop(columns='motion_group', inplace=True)

    if debug:
        print("Debug: Shooting motion labeled and extended by 5 frames.")
        print("Debug: Shooting motion table columns =", df.columns)
        print("Debug: Shooting motion example =", df[['frame_time', 'shooting_motion', 'ball_in_hands']].head(15))
        
    return df



def find_release_point(df, debug=False):
    """
    Identify the release point of the shot by finding frames where elbows are above shoulder height 
    and then identifying the frame with the maximum vertical ball velocity during the shooting motion
    while the ball is still in hand.

    Parameters:
    - df: DataFrame containing motion and ball data.
    - debug: If True, prints step-by-step debug information.

    Returns:
    - df: DataFrame with 'release_point_filter' column marking the release point.
    """
    df['release_point_filter'] = 0  # Initialize with 0 for all frames

    # Make a copy of the subset to avoid modifying the original DataFrame directly
    shooting_motion_df = df[(df['shooting_motion'] == 1) & (df['ball_in_hands'] == 1)].copy()
    
    # Calculate the average shoulder height for frames within the shooting motion
    shooting_motion_df['avg_shoulder_height'] = (
        shooting_motion_df['R_SHOULDER_z'] + shooting_motion_df['L_SHOULDER_z']
    ) / 2
    
    if debug:
        print("Debug: Calculating average shoulder height for shooting frames.")
    
    # Filter frames where both elbows are above shoulder height
    elbows_above_shoulder_df = shooting_motion_df[
        (shooting_motion_df['R_ELBOW_z'] >= shooting_motion_df['avg_shoulder_height']) &
        (shooting_motion_df['L_ELBOW_z'] >= shooting_motion_df['avg_shoulder_height'])
    ]
    
    # Ensure we have enough frames after the threshold is met
    if not elbows_above_shoulder_df.empty:
        # Shift by 2 frames after the first occurrence of elbows above shoulder height
        first_above_shoulder_index = elbows_above_shoulder_df.index[0] + 3

        # Filter the subset to include frames starting from the two-frame offset
        filtered_df = shooting_motion_df.loc[first_above_shoulder_index:]
        
        # Identify the frame with maximum vertical ball velocity in the filtered set
        if not filtered_df.empty:
            max_velocity_frame = filtered_df.loc[
                filtered_df['ball_velocity_z'] == filtered_df['ball_velocity_z'].max(), 'frame_time'
            ].values
            
            if max_velocity_frame.size > 0:
                max_velocity_frame_time = max_velocity_frame[0]
                release_frame_index = filtered_df.index[filtered_df['frame_time'] == max_velocity_frame_time].tolist()
                
                if release_frame_index:
                    release_index = release_frame_index[0]
                    # Using .loc on the original df to avoid SettingWithCopyWarning
                    df.loc[release_index, 'release_point_filter'] = 1  # Mark release frame with 1
                    if debug:
                        print(f"Debug: Release frame found at index {release_index}, frame time {max_velocity_frame_time}")
                        print("Debug: Release point column updated for release frame.")
    else:
        if debug:
            print("Debug: No valid frames found for release point determination.")
    
    return df




def main_label_shot_phases(df, debug=False):
    """
    Main function to label phases of a basketball shot: identifies the shooting motion 
    and determines the release point.

    Parameters:
    - df: DataFrame containing ball and joint data.
    - debug: If True, prints debug information at each step.

    Returns:
    - df: DataFrame with labeled shot phases, including 'shooting_motion' and 'release_point_filter'.
    """
    df = label_shooting_motion(df, debug=debug)
    df = find_release_point(df, debug=debug)
    
    if debug:
        print("Debug: DataFrame head with shooting motion and release point labels:")
        print(df[['frame_time', 'shooting_motion', 'ball_in_hands', 'release_point_filter']].head(10))
    return df


# Example of how to use these functions in practice:
if __name__ == "__main__":
    import logging
    # from src.animation_dataframe_addons import animate_trial_from_df
    # from data_loading.load_and_parse import load_single_ft_and_parse
    # from data_preprocessing.dataframe_creation import main_create_dataframe
    # from velocity_and_speed_calc import calculate_ball_speed_velocity_direction
    # from shot_phase_labeling import main_label_shot_phases
    from IPython.display import display
    # from animate.animation import animate_trial_from_df

    # Default connections between joints
    connections = [
        ("R_EYE", "L_EYE"), ("R_EYE", "NOSE"), ("L_EYE", "NOSE"),
        ("R_EYE", "R_EAR"), ("L_EYE", "L_EAR"), ("R_SHOULDER", "L_SHOULDER"),
        ("R_SHOULDER", "R_ELBOW"), ("L_SHOULDER", "L_ELBOW"), ("R_ELBOW", "R_WRIST"),
        ("L_ELBOW", "L_WRIST"), ("R_SHOULDER", "R_HIP"), ("L_SHOULDER", "L_HIP"),
        ("R_HIP", "L_HIP"), ("R_HIP", "R_KNEE"), ("L_HIP", "L_KNEE"),
        ("R_KNEE", "R_ANKLE"), ("L_KNEE", "L_ANKLE"), ("R_WRIST", "R_1STFINGER"),
        ("R_WRIST", "R_5THFINGER"), ("L_WRIST", "L_1STFINGER"), ("L_WRIST", "L_5THFINGER"),
        ("R_ANKLE", "R_1STTOE"), ("R_ANKLE", "R_5THTOE"), ("L_ANKLE", "L_1STTOE"),
        ("L_ANKLE", "L_5THTOE"), ("R_ANKLE", "R_CALC"), ("L_ANKLE", "L_CALC"),
        ("R_1STTOE", "R_5THTOE"), ("L_1STTOE", "L_5THTOE"), ("R_1STTOE", "R_CALC"),
        ("L_1STTOE", "L_CALC"), ("R_5THTOE", "R_CALC"), ("L_5THTOE", "L_CALC"),
        ("R_1STFINGER", "R_5THFINGER"), ("L_1STFINGER", "L_5THFINGER")
    ]

    # Configure logging for the script
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
    logger = logging.getLogger(__name__)

    try:
        # Load and prepare the DataFrame (replace with actual loading code)
        data, trial_id, result, landing_x, landing_y, entry_angle, _ = load_single_ft_and_parse(
            "../../SPL-Open-Data/basketball/freethrow/data/P0001/BB_FT_P0001_T0088.json",
            debug=False
        )
        df = main_create_dataframe(data, trial_id, result, landing_x, landing_y, entry_angle, debug=False)

        # Calculate ball speed, velocity, and direction
        df = calculate_ball_speed_velocity_direction(df, debug=False)

        # Label shot phases, find release frame, and mark the release point filter
        df = main_label_shot_phases(df, debug=False)

        print("Final DataFrame after labeling:", df)
        print("Final DataFrame columns:", df.columns)

        # Display animation if a release frame is marked in the DataFrame
        if 'release_point_filter' in df.columns and df['release_point_filter'].sum() > 0:
            release_frame_index = df.index[df['release_point_filter'] == 1].tolist()[0]

            # Set parameters for visualization
            viewpoint_name = "diagonal_player_centric"  # Choose from COMMON_VIEWPOINTS
            xbuffer = 10.0  # Adjust as needed
            ybuffer = 10.0  # Adjust as needed
            zlim = 15        # Adjust for height

            # Call the first animation function
            animation_html = animate_trial_from_df(
                df=df,
                release_frame=release_frame_index,
                viewpoint_name=viewpoint_name,
                connections=connections,
                zlim=zlim,
                player_color="purple",
                player_lw=2.0,
                ball_color="#ee6730",
                ball_size=20.0,
                highlight_color="red",
                show_court=True,
                court_type="nba",
                units="ft",
                notebook_mode=True,
                debug=True  # Enable detailed logging for troubleshooting
            )

            # Display the animation
            display(animation_html)
        else:
            print("Debug: No valid release frame found.")
    except Exception as e:
        logger.error(f"An error occurred in visualization: {e}")



Overwriting ml/data_load_prepare/phase_labeling.py


In [11]:
%%writefile ml/data_load_prepare/ball_trajectory_and_release_time_stats.py

import numpy as np
import pandas as pd
import math

def calculate_dt(df, time_column='frame_time', debug=False):
    """
    Calculate the time delta ('dt') between frames and handle initial NaN values.
    
    Parameters:
    - df: DataFrame containing the time column.
    - time_column: The column name containing time values in milliseconds.
    - debug: If True, prints detailed debug information and validation checks.
    
    Returns:
    - df with an added 'dt' column, forward-filled and validated.
    """
    # Calculate 'dt' as the difference in time between consecutive frames in seconds
    df['dt'] = df[time_column].diff() / 1000.0  # Convert ms to seconds
    
    # Check for initial NaN or zero values in 'dt' and forward fill them
    if pd.isna(df['dt'].iloc[1]) or df['dt'].iloc[1] <= 0:
        df['dt'] = df['dt'].ffill()

    # Handle the first row NaN specifically by assigning a default value if it persists
    if pd.isna(df.loc[0, 'dt']):
        df.loc[0, 'dt'] = df['dt'].iloc[1:].mean()  # Use mean of subsequent values as an estimate

    # Additional validation checks
    if debug:
        # Ensure there are no null values in 'dt' after filling
        if df['dt'].isnull().any():
            print("Warning: 'dt' still contains NaN values after forward fill.")
        else:
            print("Debug: No NaN values in 'dt' after forward fill.")

        # Ensure 'dt' values are all positive
        if (df['dt'] <= 0).any():
            print("Warning: 'dt' contains non-positive values. Check the time column for consistency.")
        else:
            print("Debug: All 'dt' values are positive.")

        # Output some sample values for verification
        print("Debug: 'dt' calculated with forward fill. Sample values:", df['dt'].head())

        # Check if 'dt' values are reasonably consistent (not fluctuating unexpectedly)
        dt_std = df['dt'].std()
        dt_mean = df['dt'].mean()
        if dt_std > dt_mean * 0.1:
            print(f"Warning: 'dt' values have high variance (std={dt_std:.4f}). This could indicate inconsistencies in time intervals.")

    return df




def calculate_ball_dynamics(df, release_frame_index, debug=False):
    """
    Calculate ball speed, velocity, and direction starting from the release frame.
    """
    # Set pre-release frames to NaN for dynamics
    df.loc[:release_frame_index, ['ball_velocity_x', 'ball_velocity_y', 'ball_velocity_z', 'ball_speed', 'ball_direction_x', 'ball_direction_y', 'ball_direction_z']] = np.nan

    # Calculate 'dt' using the helper function
    df = calculate_dt(df, time_column='frame_time', debug=debug)

    # Calculate differences for dynamics
    df['dx'] = df['ball_x'].diff()
    df['dy'] = df['ball_y'].diff()
    df['dz'] = df['ball_z'].diff()

    # Calculate velocities and speed
    df['ball_velocity_x'] = df['dx'] / df['dt']
    df['ball_velocity_y'] = df['dy'] / df['dt']
    df['ball_velocity_z'] = df['dz'] / df['dt']
    df['ball_speed'] = np.sqrt(df['ball_velocity_x']**2 + df['ball_velocity_y']**2 + df['ball_velocity_z']**2)
    df['ball_direction_x'] = df['ball_velocity_x'] / df['ball_speed']
    df['ball_direction_y'] = df['ball_velocity_y'] / df['ball_speed']
    df['ball_direction_z'] = df['ball_velocity_z'] / df['ball_speed']
    df[['ball_direction_x', 'ball_direction_y', 'ball_direction_z']] = df[['ball_direction_x', 'ball_direction_y', 'ball_direction_z']].fillna(0)

    return df



def extract_release_features(df, release_frame_index, debug=False):
    """
    Extracts key features at the release frame for machine learning and visualization.
    """
    if release_frame_index is None:
        if debug:
            print("Debug: No release frame found.")
        return pd.DataFrame()  # Return empty DataFrame if no release frame

    release_row = df.iloc[release_frame_index]

    release_features = {
        'release_ball_speed': release_row['ball_speed'],
        'release_ball_velocity_x': release_row['ball_velocity_x'],
        'release_ball_velocity_y': release_row['ball_velocity_y'],
        'release_ball_velocity_z': release_row['ball_velocity_z'],
        'release_ball_direction_x': release_row['ball_direction_x'],
        'release_ball_direction_y': release_row['ball_direction_y'],
        'release_ball_direction_z': release_row['ball_direction_z'],
        'release_ball_x': release_row['ball_x'],
        'release_ball_y': release_row['ball_y'],
        'release_ball_z': release_row['ball_z'],
        'release_frame_time': release_row['frame_time']
    }

    # Calculate release angle
    horizontal_velocity = np.hypot(release_features['release_ball_velocity_x'], release_features['release_ball_velocity_y'])
    release_features['release_angle'] = math.degrees(math.atan2(release_features['release_ball_velocity_z'], horizontal_velocity))

    # Calculate time to peak height
    g = -9.81  # Gravity in METERS
    release_features['time_to_peak'] = -release_features['release_ball_velocity_z'] / g

    # Calculate peak height relative to release height
    peak_height = release_features['release_ball_z'] + (release_features['release_ball_velocity_z'] ** 2) / (2 * -g)
    release_features['peak_height_relative'] = peak_height - release_features['release_ball_z']

    return pd.DataFrame([release_features])  # Return as a single-row DataFrame



def project_ball_trajectory(df, release_index, debug=False):
    """
    Project the ball's trajectory based on release dynamics.
    """
    g = -9.81  # Gravity in METERS
    release_row = df.iloc[release_index]

    vx = release_row['ball_velocity_x']
    vy = release_row['ball_velocity_y']
    vz = release_row['ball_velocity_z']
    x0, y0, z0 = release_row['ball_x'], release_row['ball_y'], release_row['ball_z']

    projection_time = np.linspace(0, 2, num=200)
    dt = projection_time[1] - projection_time[0]

    x_proj = x0 + vx * projection_time
    y_proj = y0 + vy * projection_time
    z_proj = z0 + vz * projection_time + 0.5 * g * projection_time**2

    valid_indices = z_proj >= 0  # Keep trajectory points above ground level
    x_proj = x_proj[valid_indices]
    y_proj = y_proj[valid_indices]
    z_proj = z_proj[valid_indices]
    proj_time = projection_time[valid_indices]

    if debug:
        print("Debug: Projected ball trajectory up to impact point.")

    return proj_time, x_proj, y_proj, z_proj


def main_ball_trajectory_analysis(df, release_frame_index, debug=False):
    """
    Main analysis function to calculate dynamics and extract release features.
    """
    if release_frame_index is None:
        if debug:
            print("Debug: No release frame found.")
        return df, None, None, None  # Return placeholders for consistency

    df = calculate_ball_dynamics(df, release_frame_index, debug=debug)

    if debug:
        print("Debug: Release frame index:", release_frame_index)
        print("Debug: Frame data at release:\n", df.loc[release_frame_index])

    # Extract release features for ML
    ml_metrics_df = extract_release_features(df, release_frame_index, debug=debug)
    proj_time, x_proj, y_proj, z_proj = project_ball_trajectory(df, release_frame_index, debug=debug)

    # Projection results for visualization
    projection_df = pd.DataFrame({
        'projection_time': proj_time,
        'projected_x': x_proj,
        'projected_y': y_proj,
        'projected_z': z_proj
    })

    return df, release_frame_index, projection_df, ml_metrics_df


# Example usage
if __name__ == "__main__":
    # Placeholder functions for data loading and creation
    data, trial_id, result, landing_x, landing_y, entry_angle, _ = load_single_ft_and_parse(
        "../../SPL-Open-Data/basketball/freethrow/data/P0001/BB_FT_P0001_T0001.json"
    )
    df = main_create_dataframe(data, trial_id, result, landing_x, landing_y, entry_angle, debug=False)
    df = calculate_ball_speed_velocity_direction(df, debug=False)
    df = main_label_shot_phases(df, debug=False)
    release_frame_index = df.index[df['release_point_filter'] == 1].tolist()[0]

    df, release_index, projection_df, ml_metrics_df = main_ball_trajectory_analysis(df, release_frame_index, debug=True)
    print("Final DataFrame with release features and trajectory data:\n", df)
    print("ML Metrics DataFrame for model input:\n", ml_metrics_df)


Overwriting ml/data_load_prepare/ball_trajectory_and_release_time_stats.py


### How Joint Power (Speed) is Calculated

In the context of this code, **joint power** is calculated as the speed of a joint’s movement from one frame to the next, assuming constant mass. This simplifies the concept of power in biomechanics to mean the **rate of movement** or **speed** at each time point. Below is a detailed breakdown of how this is done:

1. **Coordinate Differences**:
   - Each joint’s position in 3D space is represented by its x, y, and z coordinates (`joint_x`, `joint_y`, `joint_z`) in each frame.
   - The **difference** in position for each coordinate from one frame to the next is calculated using the `.diff()` method on each coordinate column. This method provides the change in position for each coordinate.

2. **Euclidean Distance Calculation**:
   - Using the differences in x, y, and z coordinates (`dx`, `dy`, `dz`), we calculate the **Euclidean distance** moved by the joint between frames.
   - This distance is given by the formula:
     \[
     \text{distance} = \sqrt{(\Delta x)^2 + (\Delta y)^2 + (\Delta z)^2}
     \]
   - This distance represents the **displacement** of the joint in 3D space from one frame to the next.

3. **Time Difference (dt)**:
   - The time difference between frames is stored in the `dt` column, calculated as the difference in `frame_time` divided by 1000 to convert milliseconds to seconds.
   - The `dt` value provides the **time interval** over which the displacement occurred.

4. **Speed Calculation**:
   - The speed, or **rate of displacement** (which we refer to as power in this context), is calculated by dividing the Euclidean distance by the time interval `dt`:
     \[
     \text{speed} = \frac{\text{distance}}{\text{dt}}
     \]
   - This calculation provides the **speed of the joint in each frame**, representing how quickly the joint is moving from one position to the next.

5. **Per-frame Joint Power Column**:
   - For each joint, the calculated speed for each frame is stored in a new column named `{joint}_ongoing_power`, where `{joint}` represents the specific joint (e.g., `L_ANKLE_ongoing_power`).
   - This column contains the joint’s movement speed at each time step, which we can interpret as the ongoing power for that joint throughout the movement sequence.

### Example Calculation for a Single Joint (e.g., L_ANKLE)

Consider the `L_ANKLE` joint as an example. The steps for calculating the ongoing power (speed) for `L_ANKLE` in a specific frame might look like this:

1. **Calculate Differences**:
   - Compute the change in `L_ANKLE_x`, `L_ANKLE_y`, and `L_ANKLE_z` positions from the previous frame.
     \[
     \Delta x = \text{current } L\_ANKLE\_x - \text{previous } L\_ANKLE\_x
     \]
     \[
     \Delta y = \text{current } L\_ANKLE\_y - \text{previous } L\_ANKLE\_y
     \]
     \[
     \Delta z = \text{current } L\_ANKLE\_z - \text{previous } L\_ANKLE\_z
     \]

2. **Euclidean Distance (Displacement)**:
   - Using the differences, calculate the displacement:
     \[
     \text{distance} = \sqrt{(\Delta x)^2 + (\Delta y)^2 + (\Delta z)^2}
     \]

3. **Calculate Speed**:
   - Divide the calculated distance by the time difference `dt` for that frame:
     \[
     \text{L\_ANKLE\_ongoing\_power} = \frac{\text{distance}}{\text{dt}}
     \]

4. **Store in DataFrame**:
   - The value is then stored in the column `L_ANKLE_ongoing_power` for that specific frame.

### Joint Power Metrics Calculation (During Shooting Motion Only)

The code calculates metrics such as **minimum, maximum, mean, and standard deviation** of each joint’s ongoing power, but only for frames during the **shooting motion phase** (`shooting_motion == 1`):

- For each joint, metrics are calculated based on the values in the `{joint}_ongoing_power` column where `shooting_motion == 1`, resulting in metrics that specifically capture the joint’s movement characteristics during the shooting phase.
  
These metrics are then stored in `joint_power_metrics_df`, providing summary statistics for each joint’s movement during the critical shooting phase.


In [12]:
%%writefile ml/data_load_prepare/joint_power_calc.py
import numpy as np
import pandas as pd

def calculate_joint_speed(df, joint):
    """Calculate the speed for a specified joint over the entire DataFrame."""
    speed = np.sqrt(
        (df[f'{joint}_x'].diff() ** 2) +
        (df[f'{joint}_y'].diff() ** 2) +
        (df[f'{joint}_z'].diff() ** 2)
    ) / df['dt']
    return speed

def add_ongoing_power_columns(df, joints):
    """
    Adds ongoing power (speed) columns for each joint in the main DataFrame.
    """
    for joint in joints:
        if {f'{joint}_x', f'{joint}_y', f'{joint}_z'}.issubset(df.columns):
            df[f'{joint}_ongoing_power'] = calculate_joint_speed(df, joint)
        else:
            print(f"Warning: Missing coordinates for joint '{joint}'.")
    return df

def calculate_joint_power_metrics(df, joints, debug=False):
    """
    Calculate overall metrics for each joint's ongoing power during shooting motion.
    """
    joint_power_metrics = {}
    # Filter the DataFrame to include only shooting motion frames
    shooting_motion_df = df[df['shooting_motion'] == 1]
    
    for joint in joints:
        power_column = f'{joint}_ongoing_power'
        if power_column in df.columns:
            joint_power = shooting_motion_df[power_column]
            # Calculate metrics only during shooting motion
            joint_power_metrics.update({
                f'{joint}_min_power': joint_power.min(),
                f'{joint}_max_power': joint_power.max(),
                f'{joint}_avg_power': joint_power.mean(),
                f'{joint}_std_power': joint_power.std(),
            })
            if debug:
                print(f"Debug: Calculated metrics for {joint}: {joint_power_metrics}")
        else:
            print(f"Warning: {power_column} not found in DataFrame.")
    return pd.DataFrame([joint_power_metrics])

def main_calculate_joint_power(df, release_frame_index, debug=False):
    """
    Main function to calculate ongoing joint power and overall metrics.
    """
    joints = [
        'L_ANKLE', 'R_ANKLE', 'L_KNEE', 'R_KNEE', 'L_HIP', 'R_HIP',
        'L_ELBOW', 'R_ELBOW', 'L_WRIST', 'R_WRIST',
        'L_1STFINGER', 'L_5THFINGER', 'R_1STFINGER', 'R_5THFINGER'
    ]
    if debug:
        print("Debug: Starting joint power calculations...")
    
    # Step 1: Add ongoing power columns directly to df
    df = add_ongoing_power_columns(df, joints)
    
    # Step 2: Calculate joint power metrics during shooting motion
    joint_power_metrics_df = calculate_joint_power_metrics(df, joints, debug=debug)
    
    if debug:
        print("Debug: Joint power calculations completed.")
    
    # Return df with ongoing power columns and the metrics DataFrame
    return df, joint_power_metrics_df

if __name__ == "__main__":
    # Load and process data
    print("Debug: Loading and parsing file: ../../SPL-Open-Data/basketball/freethrow/data/P0001/BB_FT_P0001_T0001.json")
    data, trial_id, result, landing_x, landing_y, entry_angle, _ = load_single_ft_and_parse(
        "../../SPL-Open-Data/basketball/freethrow/data/P0001/BB_FT_P0001_T0001.json"
    )
    debug = True
    df = main_create_dataframe(data, trial_id, result, landing_x, landing_y, entry_angle, debug=False)
    
    df = calculate_ball_speed_velocity_direction(df, debug=False)
    df = main_label_shot_phases(df)
    release_frame_index = df.index[df['release_point_filter'] == 1].tolist()[0]
    
    df, release_index, projection_df, ml_metrics_df = main_ball_trajectory_analysis(df, release_frame_index, debug=True)
    print("Final DataFrame with release features and trajectory data:\n", df)
    print("columns = ", df.columns)
    
    joints = [
        'L_ANKLE', 'R_ANKLE', 'L_KNEE', 'R_KNEE', 'L_HIP', 'R_HIP',
        'L_ELBOW', 'R_ELBOW', 'L_WRIST', 'R_WRIST',
        'L_1STFINGER', 'L_5THFINGER', 'R_1STFINGER', 'R_5THFINGER'
    ]

    # Step 1: Calculate joint power and metrics
    df, joint_power_metrics_df = main_calculate_joint_power(df, release_frame_index, debug=debug)
    
    # Now you have:
    # - df: Main DataFrame with added per-frame joint power columns
    # - joint_power_metrics_df: DataFrame with joint power metrics calculated during shooting motion
    
    print("DataFrame with ongoing joint power columns added:\n", df.head())
    print("Joint Power Metrics DataFrame (during shooting motion):\n", joint_power_metrics_df)



Overwriting ml/data_load_prepare/joint_power_calc.py


In [13]:
%%writefile ml/data_load_prepare/joint_angles_details.py
import numpy as np
import pandas as pd

def calculate_joint_angle(joint_a, joint_b, joint_c):
    """
    Calculate the angle between three points (joint_a, joint_b, joint_c).
    """
    vector_ab = joint_b - joint_a
    vector_bc = joint_c - joint_b
    dot_product = np.dot(vector_ab, vector_bc)
    mag_ab = np.linalg.norm(vector_ab)
    mag_bc = np.linalg.norm(vector_bc)

    if mag_ab == 0 or mag_bc == 0:
        return 0.0

    angle = np.arccos(dot_product / (mag_ab * mag_bc))
    return np.degrees(angle)

def calculate_joint_angles_over_motion(df, release_frame_index, side='R', debug=False):
    """
    Calculate joint angles over motion, allowing selection of side ('L' for left, 'R' for right).

    Parameters:
    - df: DataFrame containing motion data.
    - release_frame_index: Index for the release point.
    - side: 'L' for left side, 'R' for right side.
    - debug: If True, prints debug information.

    Returns:
    - df: Updated DataFrame with joint angle columns.
    - joint_angle_metrics_df: DataFrame with max and release angles for the specified joints.
    """
    # Set up joint combinations based on the selected side
    joint_combinations = {
        'elbow': [f'{side}_SHOULDER', f'{side}_ELBOW', f'{side}_WRIST'],
        'wrist': [f'{side}_ELBOW', f'{side}_WRIST', f'{side}_1STFINGER'],
        'knee': [f'{side}_HIP', f'{side}_KNEE', f'{side}_ANKLE']
    }

    # Step 1: Calculate angles for all rows in the DataFrame
    for joint_name, (a, b, c) in joint_combinations.items():
        df[f'{joint_name}_angle'] = df.apply(
            lambda row: calculate_joint_angle(
                row[[f"{a}_x", f"{a}_y", f"{a}_z"]].values,
                row[[f"{b}_x", f"{b}_y", f"{b}_z"]].values,
                row[[f"{c}_x", f"{c}_y", f"{c}_z"]].values
            ), axis=1
        )
        if debug:
            print(f"Debug: Calculated {side} {joint_name} angles across all rows:\n", df[[f'{joint_name}_angle']].head())

    # Step 2: Filter the DataFrame to get only the shooting motion rows
    shooting_motion_df = df[df['shooting_motion'] == 1]

    # Step 3: Initialize dictionary for max and release angles with single-row format
    joint_angle_metrics = {}

    # Step 4: Calculate max and release angles for each joint within the shooting motion
    for joint_name in joint_combinations.keys():
        max_angle = shooting_motion_df[f'{joint_name}_angle'].max()
        release_angle = (
            shooting_motion_df.at[release_frame_index, f'{joint_name}_angle']
            if release_frame_index in shooting_motion_df.index else np.nan
        )

        # Store these as single-row format columns
        joint_angle_metrics[f'{joint_name}_max_angle'] = max_angle
        joint_angle_metrics[f'{joint_name}_release_angle'] = release_angle

        if debug:
            print(f"Debug: {side} {joint_name} max_angle: {max_angle}, release_angle: {release_angle}")

    # Step 5: Convert metrics dictionary to a single-row DataFrame for output
    joint_angle_metrics_df = pd.DataFrame([joint_angle_metrics])

    # Final debug to check both DataFrames
    if debug:
        print("Debug: Main DataFrame with ongoing joint angles:\n", df.head())
        print("Debug: Joint Angle Metrics DataFrame (single row with max and release angles during shooting motion):\n", joint_angle_metrics_df)

    return df, joint_angle_metrics_df


if __name__ == "__main__":
    # Load and process data
    print("Debug: Loading and parsing file: ../../SPL-Open-Data/basketball/freethrow/data/P0001/BB_FT_P0001_T0001.json")
    data, trial_id, result, landing_x, landing_y, entry_angle, _ = load_single_ft_and_parse(
        "../../SPL-Open-Data/basketball/freethrow/data/P0001/BB_FT_P0001_T0001.json"
    )
    debug = True
    df = main_create_dataframe(data, trial_id, result, landing_x, landing_y, entry_angle, debug=False)
    
    
    df = calculate_ball_speed_velocity_direction(df, debug=False)
    df = main_label_shot_phases(df)
    release_frame_index = df.index[df['release_point_filter'] == 1].tolist()[0]
    
    df, release_index, projection_df, ml_metrics_df = main_ball_trajectory_analysis(df, 
                                                                                    release_frame_index,
                                                                                    debug=False)
    print("Final DataFrame with release features and trajectory data:\n", df)
    print("columns = ", df.columns)
    
    joints = [
        'L_ANKLE', 'R_ANKLE', 'L_KNEE', 'R_KNEE', 'L_HIP', 'R_HIP',
        'L_ELBOW', 'R_ELBOW', 'L_WRIST', 'R_WRIST',
        'L_1STFINGER', 'L_5THFINGER', 'R_1STFINGER', 'R_5THFINGER'
    ]

    # Step 1: Calculate joint power and metrics
    df, joint_power_metrics_df = main_calculate_joint_power(df, release_frame_index, debug=False)
    
    # Now you have:
    # - df: Main DataFrame with added per-frame joint power columns
    # - joint_power_metrics_df: DataFrame with joint power metrics calculated during shooting motion
    
    print("DataFrame with ongoing joint power columns added:\n", df.head())
    print("Joint Power Metrics DataFrame (during shooting motion):\n", joint_power_metrics_df)

    # Calculate Joint Angles Across All Rows and Get Metrics for Shooting Motion
    df, joint_angle_metrics_df = calculate_joint_angles_over_motion(df, release_frame_index, side='R', debug=True)
    print("Final DataFrame with ongoing joint angles across all rows:\n", df.head())
    print("Joint Angle Metrics DataFrame (max and release angles during shooting motion):\n", joint_angle_metrics_df)

    # metrics df's for machine learning dataset and base dataset
    # - joint_power_metrics_df: DataFrame with joint power metrics calculated during shooting motion
    # - joint_angle_metrics_df (max and release angles during shooting motion)
    # - shot_details_df = df[['result', 'landing_x', 'landing_y', 'entry_angle']].drop_duplicates()
    # ^ shot_details_df are the base metrics from the dataset and the y variable = result

Overwriting ml/data_load_prepare/joint_angles_details.py


In [14]:
%%writefile ml/data_load_prepare/key_feature_extraction.py
import pandas as pd
import numpy as np
import json


def estimate_basketball_stats(height_m):
    """
    Estimate basketball-specific stats based on height.

    Args:
        height_m (float): Height of the player in meters.
    
    Returns:
        dict: Dictionary containing estimated basketball metrics.
    """
    # Convert height to centimeters for better readability in basketball context
    height_cm = height_m * 100
    
    # Estimate wingspan (average is roughly 1.06x height)
    wingspan_cm = height_cm * 1.06
    
    # Estimate standing reach (average for males is height in cm * 0.92 + 50 cm)
    standing_reach_cm = height_cm * 0.92 + 50
    
    # Estimate hand size (average male hand length is ~0.106x height in cm)
    hand_length_cm = height_cm * 0.106
    
    return {
        "player_estimated_wingspan_cm": round(wingspan_cm, 2),
        "player_estimated_standing_reach_cm": round(standing_reach_cm, 2),
        "player_estimated_hand_length_cm": round(hand_length_cm, 2)
    }

def load_player_info(json_path):
    """
    Load participant information from JSON file and add 'player_' prefix to each metric.
    Additionally, add a 'dominant_hand' key with the value 'R' to indicate right-handedness
    and calculate additional basketball-specific metrics based on height.

    Args:
        json_path (str): Path to the JSON file with player information.
    
    Returns:
        player_info (dict): Dictionary containing player-specific data with 'player_' prefix for each key.
    """
    with open(json_path, 'r') as f:
        player_info = json.load(f)
    
    # Add 'player_' prefix to each key
    player_info = {f"player_{key}": value for key, value in player_info.items()}
    
    # Add dominant hand key
    player_info["player_dominant_hand"] = "R"  # Setting dominant hand to 'R' (Right)
    
    # Estimate basketball-specific stats and add to player_info
    height_m = player_info.get("player_height_in_meters")
    if height_m:
        basketball_stats = estimate_basketball_stats(height_m)
        player_info.update(basketball_stats)
    
    return player_info



def get_column_definitions():
    """
    Define column descriptions for the dataset, including granular free throw data, ML features,
    energy metrics, exhaustion scores, and additional statistics.

    Returns:
        column_definitions (dict): Dictionary where keys are column names and values are descriptions.
    """
    column_definitions = {
        # Basic trial metadata
        'trial_id': "Unique identifier for each trial, formatted as 'Txxxx'.",
        'shot_id': "Sequential shot ID for organizing shots within a trial.",
        'result': "Binary indicator of shot outcome: 1 if made, 0 if missed.",
        
        # Free throw shot landing and entry characteristics
        'landing_x': "X coordinate of ball landing position on the hoop plane, measured in inches with the hoop front as origin.",
        'landing_y': "Y coordinate of ball landing position on the hoop plane, measured in inches with the hoop front as origin.",
        'entry_angle': "Angle at which the ball enters the hoop plane, measured in degrees to indicate entry precision.",

        # Frame and timing information
        'frame_time': "Timestamp in milliseconds for each frame, relative to trial start.",
        'dt': "Time delta between consecutive frames in seconds, used to calculate velocities and accelerations.",
        'by_trial_time': "Time within a specific trial, calculated as time elapsed from the trial start.",
        'continuous_frame_time': "Cumulative time across all trials, accounting for the trial offsets.",

        # Ball position and dynamics
        'ball_x': "X coordinate of the ball's position on the court, representing lateral position in feet, based on the center of the court.",
        'ball_y': "Y coordinate of the ball's position on the court, representing forward/backward position in feet from the center of the court.",
        'ball_z': "Z coordinate of the ball's position, representing height in feet off the court (vertical position).",
        'ball_speed': "Overall speed of the ball, derived from the 3D velocities (Pythagorean theorem of x, y, z velocities).",
        'overall_ball_velocity': "Magnitude of the ball's velocity vector, indicating its total speed at any moment.",

        # Exhaustion and energy metrics
        'joint_energy': "Energy expended by a specific joint in a single frame, calculated as power multiplied by time delta.",
        'joint_energy_by_trial': "Cumulative energy expended by a specific joint within a trial.",
        'joint_energy_by_trial_exhaustion_score': "Normalized exhaustion score for a specific joint within a trial, scaled to the maximum trial energy.",
        'joint_energy_overall_cumulative': "Cumulative energy expended by a specific joint across all trials.",
        'joint_energy_overall_exhaustion_score': "Normalized exhaustion score for a specific joint across all trials, scaled to the maximum overall energy.",

        'total_energy': "Total energy expended by all joints in a single frame, calculated as the sum of joint energies.",
        'by_trial_energy': "Cumulative energy expended during a specific trial, calculated as the sum of total energy across frames.",
        'by_trial_exhaustion_score': "Exhaustion score normalized within each trial, calculated as cumulative energy divided by maximum energy in the trial.",
        'overall_cumulative_energy': "Cumulative energy expended across all trials, calculated as a running sum of total energy across frames.",
        'overall_exhaustion_score': "Exhaustion score normalized across all trials, calculated as cumulative energy divided by the maximum cumulative energy.",

        # Joint-specific energy and exhaustion metrics
        'L_ANKLE_energy_by_trial': "Energy expended by the left ankle during a specific trial, calculated as power multiplied by time delta.",
        'L_ANKLE_energy_by_trial_exhaustion_score': "Exhaustion score for the left ankle energy in a trial, normalized to the maximum trial energy.",
        'L_ANKLE_energy_overall_cumulative': "Cumulative energy expended by the left ankle across all trials.",
        'L_ANKLE_energy_overall_exhaustion_score': "Exhaustion score for the left ankle energy across all trials, normalized to the maximum cumulative energy.",
        # Similarly for R_ANKLE, L_KNEE, R_KNEE, L_HIP, R_HIP, L_ELBOW, R_ELBOW, L_WRIST, R_WRIST

        # Joint angles and release metrics
        'elbow_angle': "Angle at the elbow joint during motion, useful for tracking shooting form.",
        'wrist_angle': "Angle at the wrist joint, essential for understanding release dynamics.",
        'knee_angle': "Angle at the knee joint, analyzed to determine player’s stance and motion.",
        'max_elbow_angle': "Maximum angle achieved by the elbow during a trial, indicating the range of motion.",
        'max_wrist_angle': "Maximum angle achieved by the wrist during a trial, indicating the peak of wrist flexion or extension.",
        'max_knee_angle': "Maximum angle achieved by the knee during a trial, indicating the peak of knee flexion or extension.",
        'release_elbow_angle_filtered_optimal_min': "Minimum filtered optimal elbow angle at the release point during shooting motion.",
        'release_knee_angle_initial_optimal_max': "Maximum initial optimal knee angle during shooting motion at the release point.",
        'release_wrist_angle_initial_optimal_min': "Minimum initial optimal wrist angle during shooting motion at the release point.",

        # Power metrics
        'L_ANKLE_ongoing_power': "Calculated power (velocity) for the left ankle during motion.",
        'L_ELBOW_ongoing_power': "Calculated power (velocity) for the left elbow during motion.",
        'R_WRIST_ongoing_power': "Calculated power (velocity) for the right wrist during motion.",
        'L_1STFINGER_ongoing_power': "Calculated power (velocity) for the left first finger during motion.",
        'R_5THFINGER_ongoing_power': "Calculated power (velocity) for the right fifth finger during motion.",
        # Similarly for other joints

        # Shot classification and metrics
        'release_wrist_shot_classification': "Categorical classification of the shot based on wrist angle at the release point.",
        'max_knee_shot_classification': "Categorical classification of the shot based on the maximum knee angle during the trial.",
        'initial_release_angle': "Initial calculated release angle of the ball at the start of the shooting motion.",
        'optimal_release_angle': "Optimal release angle derived from shooting mechanics and trajectory analysis.",
        'calculated_release_angle': "Calculated release angle based on ball velocities and directions.",

        # Distances
        'dist_ball_R_1STFINGER': "Distance between the ball and the right first finger at each frame.",
        'dist_ball_L_5THFINGER': "Distance between the ball and the left fifth finger at each frame.",

        # Miscellaneous
        'avg_shoulder_height': "Average height of the shoulders during the trial, calculated as the mean of left and right shoulder heights.",
        'shooting_motion': "Binary indicator of whether the player is in a shooting motion (1) or not (0).",
        'release_point_filter': "Boolean filter indicating whether the frame meets release point criteria.",
        'initial_release_angle': "the release angle at the frame of the release_point_filter=1",
        'calculated_release_angle': "the average release angle over the 3 frames after the release_point_filter=1",
        'distance_to_basket': "the distance to the basket from the player, input manually by user. Can be calculated through YOLO",
        'optimal_release_angle': 'optimal_release_angle is calculated from the reference table from the 2005 study on optimal angle givent he release height, ball velocity, and such' ,
    }

    return column_definitions



def main_prepare_ml_data(df, joint_power_metrics_df, joint_angle_metrics_df, ml_metrics_df, player_info, debug=False):
    """
    Main function to prepare a single DataFrame for machine learning.

    Args:
        df: DataFrame containing the original data with shot details.
        joint_power_metrics_df: DataFrame with joint power metrics during the shooting motion.
        joint_angle_metrics_df: DataFrame with max and release angles during shooting motion.
        ml_metrics_df: DataFrame containing metrics specific to release dynamics and peak trajectory.
        player_info (dict): Dictionary containing player information (e.g., height, weight).
    
    Returns:
        key_features_dataframe: the final features of this free throw.
    """
    
    # Step 1: Extract essential shot details, including target variables and unique features
    shot_details_df = df[['result', 'landing_x', 'landing_y', 'entry_angle']].drop_duplicates().reset_index(drop=True)
    
    if debug:
        print("Debug: Shot details DataFrame:\n", shot_details_df)

    # Step 2: Combine all metrics and player info into a single row DataFrame for machine learning
    key_features_dataframe = pd.concat(
        [shot_details_df, joint_power_metrics_df, joint_angle_metrics_df, ml_metrics_df],
        axis=1
    )
    
    # Add player information as additional columns
    for key, value in player_info.items():
        key_features_dataframe[key] = value
    
    if debug:
        print("Debug: Combined key features DataFrame with player information for ML:\n", key_features_dataframe)

    return key_features_dataframe

if __name__ == "__main__":
    # Load and process data
    player_info_path = "../../SPL-Open-Data/basketball/freethrow/participant_information.json"
    player_info = load_player_info(player_info_path)
    
    print("Debug: Loaded player info:\n", player_info)
    
    data_path = "../../SPL-Open-Data/basketball/freethrow/data/P0001/BB_FT_P0001_T0001.json"
    print(f"Debug: Loading and parsing file: {data_path}")
    
    data, trial_id, result, landing_x, landing_y, entry_angle, _ = load_single_ft_and_parse(data_path)
    debug = True
    df = main_create_dataframe(data, trial_id, result, landing_x, landing_y, entry_angle, debug=False)
    
    df = calculate_ball_speed_velocity_direction(df, debug=False)
    df = main_label_shot_phases(df)
    release_frame_index = df.index[df['release_point_filter'] == 1].tolist()[0]
    
    df, release_index, projection_df, ml_metrics_df = main_ball_trajectory_analysis(df, release_frame_index, debug=True)
    print("Final DataFrame with release features and trajectory data:\n", df)
    print("ML Metrics DataFrame for model input:\n", ml_metrics_df)

    joints = [
        'L_ANKLE', 'R_ANKLE', 'L_KNEE', 'R_KNEE', 'L_HIP', 'R_HIP',
        'L_ELBOW', 'R_ELBOW', 'L_WRIST', 'R_WRIST',
        'L_1STFINGER', 'L_5THFINGER', 'R_1STFINGER', 'R_5THFINGER'
    ]

    # Step 1: Calculate joint power and metrics
    df, joint_power_metrics_df = main_calculate_joint_power(df, release_frame_index, debug=debug)
    
    # Now you have:
    # - df: Main DataFrame with added per-frame joint power columns
    # - joint_power_metrics_df: DataFrame with joint power metrics calculated during shooting motion
    
    print("DataFrame with ongoing joint power columns added:\n", df.head())
    print("Joint Power Metrics DataFrame (during shooting motion):\n", joint_power_metrics_df)

    # Calculate Joint Angles Across All Rows and Get Metrics for Shooting Motion
    df, joint_angle_metrics_df = calculate_joint_angles_over_motion(df, release_frame_index, side='R', debug=True)
    print("Final DataFrame with ongoing joint angles across all rows:\n", df.head())
    print("Joint Angle Metrics DataFrame (max and release angles during shooting motion):\n", joint_angle_metrics_df)

    # metrics df's for machine learning dataset and base dataset
    # - joint_power_metrics_df: DataFrame with joint power metrics calculated during shooting motion
    # - joint_angle_metrics_df (max and release angles during shooting motion)
    # - shot_details_df = df[['result', 'landing_x', 'landing_y', 'entry_angle']].drop_duplicates()
    # ^ shot_details_df are the base metrics from the dataset and the y variable = result

    # Prepare the ML dataset
    key_features_dataframe = main_prepare_ml_data(df, joint_power_metrics_df, joint_angle_metrics_df, ml_metrics_df, player_info, debug=True)
    print("Final Key Features DataFrame for ML:\n", key_features_dataframe)
    print("Final Key Features DataFrame Columns for ML:\n", key_features_dataframe.columns)

    #Final Two output tables: df (granular free throw data) and key_features_dataframe for the ML dataset
    
    # Column Definitions
    column_definitions = get_column_definitions()
    print("Column Definitions for the Dataset:\n", column_definitions)

Overwriting ml/data_load_prepare/key_feature_extraction.py


In [21]:
%%writefile ml/data_load_prepare/main_load_and_prepare.py

import os
import pandas as pd
import json
from ml.data_load_prepare.load_and_parse import load_single_ft_and_parse
from ml.data_load_prepare.dataframe_creation import main_create_dataframe
from ml.data_load_prepare.velocity_and_speed_calc import calculate_ball_speed_velocity_direction
from ml.data_load_prepare.phase_labeling import main_label_shot_phases
from ml.data_load_prepare.ball_trajectory_and_release_time_stats import main_ball_trajectory_analysis
from ml.data_load_prepare.joint_power_calc import main_calculate_joint_power
from ml.data_load_prepare.joint_angles_details import  calculate_joint_angles_over_motion
from ml.data_load_prepare.key_feature_extraction import main_prepare_ml_data, load_player_info, get_column_definitions

def process_file(file_path, shot_id, player_info, debug=False):
    # Load and parse JSON data
    data, trial_id, result, landing_x, landing_y, entry_angle, _ = load_single_ft_and_parse(file_path)
    if data is None:
        if debug:
            print(f"Debug: Skipping file {file_path} due to parsing error.")
        return pd.DataFrame(), pd.DataFrame()

    # Step 1: Create initial DataFrame from parsed data
    df = main_create_dataframe(data, trial_id, result, landing_x, landing_y, entry_angle, debug=debug)
    df = calculate_ball_speed_velocity_direction(df, debug=debug)
    df = main_label_shot_phases(df)

    # Step 2: Identify release frame
    release_frame_index = df.index[df['release_point_filter'] == 1].tolist()[0] if df['release_point_filter'].sum() > 0 else None
    if release_frame_index is None:
        if debug:
            print(f"Debug: No release frame found for file {file_path}.")
        return pd.DataFrame(), pd.DataFrame()

    # Step 3: Calculate ball dynamics and release metrics first (for `dt` and velocity/direction data)
    df, _, _, ml_metrics_df = main_ball_trajectory_analysis(df, release_frame_index, debug=debug)

    # Step 4: Calculate joint power and joint angle metrics
    df, joint_power_metrics_df = main_calculate_joint_power(df, release_frame_index, debug=debug)
    df, joint_angle_metrics_df = calculate_joint_angles_over_motion(df, release_frame_index, debug=debug)

    # Step 5: Prepare ML features combining shot details, joint power, angles, release metrics, and player info
    key_features_dataframe = main_prepare_ml_data(df, joint_power_metrics_df, joint_angle_metrics_df, ml_metrics_df, player_info, debug=debug)
    key_features_dataframe['trial_id'] = trial_id
    key_features_dataframe['shot_id'] = shot_id

    return df, key_features_dataframe


def bulk_process_directory(directory_path, player_info_path, debug=False):
    # Load player information
    player_info = load_player_info(player_info_path)
    if debug:
        print("Debug: Loaded player info:\n", player_info)

    all_granular_data = []
    all_features = []
    shot_id = 1
    
    for filename in os.listdir(directory_path):
        if filename.endswith(".json"):
            file_path = os.path.join(directory_path, filename)
            if debug:
                print(f"Debug: Processing file: {file_path}")

            granular_df, trial_features_df = process_file(file_path, shot_id, player_info, debug=debug)
            shot_id += 1
            
            if not granular_df.empty and not trial_features_df.empty:
                all_granular_data.append(granular_df)
                all_features.append(trial_features_df)

    final_granular_df = pd.concat(all_granular_data, ignore_index=True) if all_granular_data else pd.DataFrame()
    final_ml_df = pd.concat(all_features, ignore_index=True) if all_features else pd.DataFrame()
    
    if debug:
        print("Debug: Final granular DataFrame created with shape:", final_granular_df.shape)
        print("Debug: Final ML DataFrame created with shape:", final_ml_df.shape)
    
    return final_granular_df, final_ml_df

if __name__ == "__main__":
    directory_path = "../../../../SPL-Open-Data/basketball/freethrow/data/P0001"
    player_info_path = "../../../../SPL-Open-Data/basketball/freethrow/participant_information.json"

    # Run the bulk processing with player info integration
    final_granular_df, final_ml_df = bulk_process_directory(directory_path, player_info_path, debug=False)
    
    if not final_granular_df.empty and not final_ml_df.empty:
        print("Debug: Granular free throw data for all trials:")
        print(final_granular_df.head())
        print("Debug: ML Features DataFrame for all trials with player info:")
        print(final_ml_df.head())

        # Column Definitions
        column_definitions = get_column_definitions()
        print("Column Definitions for the Dataset:\n", column_definitions)


Overwriting ml/data_load_prepare/main_load_and_prepare.py


# Feature Engineering/ Tool Making


Overall look:

- maximum release point:
  - the maximum kinetic energy is being channeled into the shot from knees, elbows, wrists, jump height (if jump shot or if set shot just the first bit). Check the study to find the most optimal point of kinetic energy
- Exhaustion Levels throughout training
- arc_category: compare to the top vs the bottom max height of the ball to split up arc 


### Once I can exact the release point down past my current filters, too variable so using max for shot meter
current filters: at max ball_velocity_y, elbows above shoulders, add to frames past the elbows above the shoulders
- optimal release point
  - finding the optimal angles and power at release point during made shots to get comfortable range
- optimal shooting motion
  - finding the optimal angles and power at release point during made shots to get comfortable range AT DIFFERENT STAGES throughout shooting motion (maybe shoulder-to-release, release, post release?)

### goal: Optimal release angle shooting 

https://livingstones.thetreeoflife.us/ShootingAngle.pdf

Table 1
Dr. Frank Lin's Optimal Arc Angles for Shooting the Basketball
Shot Shot Distance
Height 9’ 11’ 14’ 17’ 20’ 24’
3’ 6” Shot Angle (Deg.) 62.92 60.29 57.45 55.46 54.00 52.58
Shot Speed (ft/s) 23.79 24.90 26.56 28.18 29.75 31.76
Flight Time (sec.) .83 .89 .98 1.06 1.14 1.24
4’ Shot Angle (Deg.) 61.85 59.31 56.60 54.72 53.35 52.02
Shot Speed (ft/s) 23.25 24.41 26.13 27.79 29.40 31.44
Flight Time (sec.) .82 .88 .97 1.06 1.14 1.24
4’ 6” Shot Angle (Deg.) 60.71 58.28 55.72 53.96 52.69 51.45
Shot Speed (ft/s) 22.71 23.92 25.70 27.41 29.05 31.12
Flight Time (sec.) .81 .87 .97 1.05 1.14 1.24
5” Shot Angle (Deg.) 59.53 57.22 54.83 53.19 52.02 50.88
Shot Speed (ft/s) 22.18 23.44 25.27 27.03 28.70 30.81
Flight Time (sec.) .80 .87 .96 1.05 1.13 1.23
5’ 5” Shot Angle (Deg.) 58.28 56.12 53.91 52.41 51.34 50.31
Shot Speed (ft/s) 21.64 22.95 24.85 26.65 28.35 30.49
Flight Time (sec.) .79 .86 .96 1.05 1.13 1.23
6’ Shot Angle (Deg.) 56.98 54.99 52.97 51.62 50.65 49.73
Shot Speed (ft/s) 21.10 22.47 24.43 26.27 28.01 30.18
Flight Time (sec.) .78 .85 .95 1.04 1.13 1.23
6’ 6” Shot Angle (Deg.) 55.63 53.83 52.02 50.82 49.96 49.15
Shot Speed (ft/s) 20.57 21.99 24.01 25.90 27.66 29.87
Flight Time (sec.) .78 .85 .95 1.04 1.12 1.23
7’ Shot Angle (Deg.) 54.22 52.63 51.05 50.00 49.27 48.56
Shot Speed (ft/s) 20.04 21.52 23.60 25.52 27.33 29.57
Flight Time (sec.) .77 .84 .94 1.04 1.12 1.23
7’ 6” Shot Angle (Deg.) 52.76 51.40 50.06 49.18 48.56 47.97
Shot Speed (ft/s) 19.51 21.05 23.19 25.16 26.99 29.26
Flight Time (sec.) .76 .84 .94 1.03 1.12 1.23
8’ Shot Angle (Deg.) 51.26 50.15 49.07 48.35 47.86 47.38
Shot Speed (ft/s) 18.99 20.59 22.78 24.79 26.66 28.96
Flight Time (sec.) .76 .83 .94 1.03 1.12 1.22
8’ 6” Shot Angle (Deg.) 49.73 48.88 48.06 47.52 47.14 46.79
Shot Speed (ft/s) 18.48 20.13 22.38 24.43 26.33 28.66
Flight Time (sec.) .75 .83 .94 1.03 1.12 1.22
9’ Shot Angle (Deg.) 48.17 47.60 47.04 46.68 46.43 46.19
Shot Speed (ft/s) 17.98 19.68 21.99 24.08 26.00 28.36
Flight Time (sec.) .75 .83 .93 1.03 1.12 1.22
9’ 6” Shot Angle (Deg.) 46.59 46.30 46.02 45.84 45.72 45.60
Shot Speed (ft/s) 17.49 19.24 21.60 23.73 25.68 28.07
Flight Time (sec.) .75 .83 .93 1.03 1.12 1.22
10’ Shot Angle (Deg.) 45.00 45.00 45.00 45.00 45.00 45.00
Shot Speed (ft/s) 17.01 18.81 21.22 23.38 25.36 27.78
Flight Time (sec.) .75 .83 .93 1.03 1.12 1.22

In [22]:
%%writefile ml/feature_engineering/optimal_release_angle_metrics.py

"""
Module: feature_engineering_optimal_release_angle_metrics

Goal: Optimal release angle shooting feature engineering for basketball free throw predictions.

This module processes trial data to compute and validate optimal release angles based on theoretical models.
It includes functions to create reference tables, compute release angles, validate against models, and merge results
into granular and ML datasets.
"""

import numpy as np
import pandas as pd
import logging

# Configure logging
def configure_logging(debug: bool):
    """
    Configure logging based on the debug flag.

    Parameters:
    - debug: Boolean flag to set logging level.

    Returns:
    - logger: Configured logger instance.
    """
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG if debug else logging.INFO)

    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

    # Clear existing handlers to prevent duplicate logs
    if logger.hasHandlers():
        logger.handlers.clear()

    # Stream handler
    stream_handler = logging.StreamHandler()
    stream_handler.setLevel(logging.DEBUG if debug else logging.INFO)
    stream_handler.setFormatter(formatter)
    logger.addHandler(stream_handler)

    return logger

# Initialize logger with default debug=False
logger = configure_logging(debug=False)

def create_optimal_angle_reference_table(debug: bool = False):
    """
    Create the expanded reference table for optimal release angles based on release height and shot distance.

    Parameters:
    - debug: Whether to enable debug output.

    Returns:
    - reference_df: DataFrame where rows represent release heights and columns represent distances.
    """
    global logger
    try:
        data = {
            9: [62.92, 61.85, 60.71, 59.53, 58.28, 56.98, 55.63, 54.22, 52.76, 51.26, 49.73, 48.17, 46.59, 45.00],
            11: [60.29, 59.31, 58.28, 57.22, 56.12, 54.99, 53.83, 52.63, 51.40, 50.15, 48.88, 47.60, 46.30, 45.00],
            14: [57.45, 56.60, 55.72, 54.83, 53.91, 52.97, 52.02, 51.05, 50.06, 49.07, 48.06, 47.04, 46.02, 45.00],
            17: [55.46, 54.72, 53.96, 53.19, 52.41, 51.62, 50.82, 50.00, 49.18, 48.35, 47.52, 46.68, 45.84, 45.00],
            20: [54.00, 53.35, 52.69, 52.02, 51.34, 50.65, 49.96, 49.27, 48.56, 47.86, 47.14, 46.43, 45.72, 45.00],
            24: [52.58, 52.02, 51.45, 50.88, 50.31, 49.73, 49.15, 48.56, 47.97, 47.38, 46.79, 46.19, 45.60, 45.00]
        }
        heights = [3.5, 4.0, 4.5, 5.0, 5.5, 6.0, 6.5, 7.0, 7.5, 8.0, 8.5, 9.0, 9.5, 10.0]  # Heights in feet

        reference_df = pd.DataFrame(data, index=heights)
        reference_df.index.name = "Release Height (ft)"
        reference_df.columns.name = "Distance to Basket (ft)"

        if debug:
            logger.debug("Reference table created successfully.")
            logger.debug(f"Reference Table Shape: {reference_df.shape}")
            logger.debug("New Columns Added:")
            logger.debug(f"Columns: {reference_df.columns.tolist()} | Data Types: {reference_df.dtypes.to_dict()}")

        else:
            logger.info("Reference table created successfully.")

        return reference_df

    except Exception as e:
        logger.error(f"Error in create_optimal_angle_reference_table: {e}")
        return pd.DataFrame()

def get_optimal_angle(reference_df: pd.DataFrame, release_height: float, distance_to_basket: float = 15, debug: bool = False):
    """
    Get the optimal release angle using the reference table with interpolation.

    Parameters:
    - reference_df: DataFrame with optimal release angles.
    - release_height: Release height in feet.
    - distance_to_basket: Distance to basket in feet.
    - debug: Whether to enable debug output.

    Returns:
    - optimal_angle: The interpolated optimal release angle.
    """
    global logger
    try:
        interpolated_angles = reference_df.apply(
            lambda col: np.interp(release_height, reference_df.index, col)
        )
        optimal_angle = np.interp(distance_to_basket, interpolated_angles.index, interpolated_angles.values)

        if debug:
            logger.debug(f"Interpolated Angles Shape: {interpolated_angles.shape}")
            logger.debug("Interpolated Angles Summary:")
            logger.debug(f"Data Types: {interpolated_angles.dtypes}")
            logger.debug(f"Sample Values: {interpolated_angles.head().to_dict()}")

            logger.debug(f"Optimal Release Angle for Distance {distance_to_basket} ft: {optimal_angle:.2f}°")


        return optimal_angle

    except Exception as e:
        logger.error(f"Error in get_optimal_angle: {e}")
        return None

def compute_averaged_velocities(trial_data: pd.DataFrame, release_frame: int, frames_to_average: int = 3, debug: bool = False):
    """
    Compute the averaged velocities over multiple frames post-release.

    Parameters:
    - trial_data: DataFrame containing trial data.
    - release_frame: Index of the release frame.
    - frames_to_average: Number of frames to average post-release.
    - debug: Whether to enable debug output.

    Returns:
    - avg_velocity_x: Averaged velocity in the x-direction.
    - avg_velocity_z: Averaged velocity in the z-direction.
    """
    global logger
    try:
        post_release_frames = trial_data.loc[release_frame:release_frame + frames_to_average]

        avg_velocity_x = post_release_frames['ball_velocity_x'].mean()
        avg_velocity_z = post_release_frames['ball_velocity_z'].mean()

        if debug:
            logger.debug(f"Averaged Velocities Shape: {post_release_frames.shape}")
            logger.debug("Averaged Velocities Summary:")
            logger.debug(f"avg_velocity_x: {avg_velocity_x:.2f} ft/s | avg_velocity_z: {avg_velocity_z:.2f} ft/s")


        return avg_velocity_x, avg_velocity_z

    except Exception as e:
        logger.error(f"Error in compute_averaged_velocities: {e}")
        return None, None

def validate_against_theoretical_models(release_angle: float, optimal_release_angle: float, shot_outcome: int, debug: bool = False):
    """
    Validate calculated release parameters against theoretical models and shot outcomes.

    Parameters:
    - release_angle: Calculated release angle.
    - optimal_release_angle: Optimal release angle from theoretical models.
    - shot_outcome: Whether the shot was made (1) or missed (0).
    - debug: Whether to enable debug output.

    Returns:
    - None
    """
    global logger
    try:
        if debug:
            logger.debug("Validation Against Theoretical Models:")
            logger.debug(f"Release Angle Shape: N/A")
            logger.debug("New Columns Added: N/A")
            logger.debug(f"Actual Release Angle: {release_angle:.2f}°")
            logger.debug(f"Optimal Release Angle: {optimal_release_angle:.2f}°")
            logger.debug(f"Shot Outcome: {'Made' if shot_outcome == 1 else 'Missed'}")

        discrepancy = abs(optimal_release_angle - release_angle)
        if discrepancy > 5:
            logger.warning(f"Significant discrepancy of {discrepancy:.2f}° between actual and optimal release angles.")
            logger.warning("Consider revising calculations or incorporating additional factors (e.g., air resistance).")
        else:
            if debug:
                logger.debug(f"Discrepancy ({discrepancy:.2f}°) within acceptable range.")

    except Exception as e:
        logger.error(f"Error in validate_against_theoretical_models: {e}")

def analyze_release_all_trials(data: pd.DataFrame, reference_df: pd.DataFrame, distance_to_basket: float = 15, debug: bool = False):
    """
    Analyze the release angle and compare it with the optimal angle for all trials.
    Add results (angles and metrics) as new columns for the entire DataFrame.

    Parameters:
    - data: Full DataFrame containing all trial data.
    - reference_df: DataFrame with optimal release angles.
    - distance_to_basket: Distance from the player to the basket (ft).
    - debug: Whether to enable debug output.

    Returns:
    - data: Updated DataFrame with added columns for release analysis.
    """
    global logger
    try:
        # Initialize new columns with NaN
        new_columns = {
            'initial_release_angle': np.nan,
            'calculated_release_angle': np.nan,
            'angle_difference': np.nan,
            'distance_to_basket': distance_to_basket,
            'optimal_release_angle': np.nan
        }
        for col in new_columns:
            data[col] = new_columns[col]

        if debug:
            logger.debug(f"Added new columns: {list(new_columns.keys())}")
            logger.debug(f"DataFrame Shape after adding columns: {data.shape}")
        else:
            logger.info("Initialized new columns for release analysis.")

        # Iterate over each unique trial_id
        unique_trial_ids = data['trial_id'].unique()
        if debug:
            logger.debug(f"Number of unique trial IDs to process: {len(unique_trial_ids)}")

        # Initialize a counter
        processed_trials = 0

        for trial_id in unique_trial_ids:
            trial_data = data[data['trial_id'] == trial_id]

            # Increment the counter
            processed_trials += 1

            # Log detailed debug information for the first trial only
            log_detailed_debug = debug and (processed_trials == 1)

            if log_detailed_debug:
                logger.debug(f"=== Processing Trial ID: {trial_id} ===")

            # Identify the release frame for this trial
            release_frames = trial_data.index[trial_data['release_point_filter'] == 1].tolist()
            release_frame = release_frames[0] if release_frames else None
            if release_frame is None:
                logger.warning(f"No release frame identified for trial {trial_id}. Skipping.")
                continue

            if log_detailed_debug:
                logger.debug(f"Release Frame for Trial {trial_id}: {release_frame}")

            # Ensure required columns are present
            required_columns = ['ball_x', 'ball_y', 'ball_z', 'player_height_ft', 'ball_velocity_x', 'ball_velocity_z']
            missing_columns = [col for col in required_columns if col not in trial_data.columns]

            if missing_columns:
                logger.error(f"Missing Required Columns for trial {trial_id}: {missing_columns}")
                continue

            # Calculate release metrics
            release_height = trial_data.at[release_frame, 'ball_z']
            player_height = trial_data.at[release_frame, 'player_height_ft']

            if log_detailed_debug:
                logger.debug(f"Release Height: {release_height:.2f} ft")
                logger.debug(f"Player Height: {player_height:.2f} ft")

            # Calculate initial release angle
            ball_velocity_x = trial_data.at[release_frame, 'ball_velocity_x']
            ball_velocity_z = trial_data.at[release_frame, 'ball_velocity_z']
            initial_release_angle = np.degrees(np.arctan2(ball_velocity_z, ball_velocity_x))

            if log_detailed_debug:
                logger.debug(f"Initial Release Angle: {initial_release_angle:.2f}°")

            # Compute averaged velocities for calculated release angle
            avg_velocity_x, avg_velocity_z = compute_averaged_velocities(
                trial_data, release_frame, frames_to_average=3, debug=log_detailed_debug
            )
            if avg_velocity_x is None or avg_velocity_z is None:
                logger.error(f"Insufficient data to compute averaged velocities for trial {trial_id}.")
                continue

            calculated_release_angle = np.degrees(np.arctan2(avg_velocity_z, avg_velocity_x))

            if log_detailed_debug:
                logger.debug(f"Calculated Release Angle: {calculated_release_angle:.2f}°")

            # Calculate optimal release angle (only once per trial)
            optimal_release_angle = get_optimal_angle(
                reference_df, release_height, distance_to_basket, debug=log_detailed_debug
            )

            if optimal_release_angle is None:
                logger.error(f"Optimal release angle could not be determined for trial {trial_id}.")
                continue

            if log_detailed_debug:
                logger.debug(f"Optimal Release Angle: {optimal_release_angle:.2f}°")

            # Compute angle difference (direct subtraction)
            angle_difference = calculated_release_angle - optimal_release_angle

            if log_detailed_debug:
                logger.debug(f"Angle Difference: {angle_difference:.2f}°")

            # Populate new columns for all rows with the current trial_id
            data.loc[data['trial_id'] == trial_id, 'initial_release_angle'] = initial_release_angle
            data.loc[data['trial_id'] == trial_id, 'calculated_release_angle'] = calculated_release_angle
            data.loc[data['trial_id'] == trial_id, 'angle_difference'] = angle_difference
            data.loc[data['trial_id'] == trial_id, 'optimal_release_angle'] = optimal_release_angle

            if log_detailed_debug:
                logger.debug(f"Updated columns for Trial ID: {trial_id}")

            # Optional: Validate against theoretical models if shot outcome is available
            if 'shot_outcome' in trial_data.columns:
                shot_outcome = trial_data['shot_outcome'].iloc[0]  # Assuming same outcome per trial
                validate_against_theoretical_models(
                    calculated_release_angle,
                    optimal_release_angle,
                    shot_outcome,
                    debug=log_detailed_debug
                )

        if debug:
            logger.debug(f"Completed analysis for all trials. Final DataFrame Shape: {data.shape}")
        else:
            logger.info("Completed analysis for all trials.")

        return data  # Ensure this return is present outside the try-except block

    except Exception as e:
        logger.error(f"Error in analyze_release_all_trials: {e}")  # Corrected function name
        return data  # Ensure that even in exception, a DataFrame is returned


def check_duplicates(df: pd.DataFrame, df_name: str, debug: bool = False):
    """
    Check for duplicate trial_ids in the DataFrame.

    Parameters:
    - df: The DataFrame to check.
    - df_name: Name of the DataFrame (for logging purposes).
    - debug: Whether to enable debug output.

    Returns:
    - None
    """
    global logger
    try:
        duplicate_ids = df['trial_id'][df['trial_id'].duplicated()].unique()
        df_shape = df.shape

        if len(duplicate_ids) > 0:
            logger.warning(f"Duplicate trial_ids found in {df_name}: {duplicate_ids}")
            if debug:
                logger.debug(f"DataFrame Shape: {df_shape}")
        else:
            if debug:
                logger.debug(f"DataFrame Shape: {df_shape} | No duplicate trial_ids found in {df_name}.")
            else:
                logger.info(f"No duplicate trial_ids found in {df_name}.")

    except Exception as e:
        logger.error(f"Error in check_duplicates: {e}")

def log_trial_ids(data: pd.DataFrame, stage: str, debug: bool = False):
    """
    Log the unique trial IDs and their counts.

    Parameters:
    - data: DataFrame containing trial data.
    - stage: A string indicating the processing stage.
    - debug: Whether to enable debug output.

    Returns:
    - None
    """
    global logger
    try:
        trial_ids = data['trial_id'].unique()
        trial_id_counts = data['trial_id'].value_counts()
        if debug:
            logger.debug(f"=== Trial IDs at {stage} ===")
            logger.debug(f"Unique Trial IDs ({len(trial_ids)}): {trial_ids.tolist()}")
            logger.debug(f"Trial ID Counts: {trial_id_counts.to_dict()}")
        else:
            logger.info(f"Logged trial IDs at {stage}.")

    except Exception as e:
        logger.error(f"Error in log_trial_ids: {e}")


def aggregate_angles(granular_data: pd.DataFrame, debug: bool = False):
    """
    Aggregate the computed angles per trial.

    Parameters:
    - granular_data: DataFrame containing granular data with computed angles.
    - debug: Whether to enable debug output.

    Returns:
    - aggregated_df: DataFrame with one row per trial_id containing the angles and angle_difference.
    """
    global logger
    try:
        aggregated_df = granular_data.groupby('trial_id').agg({
            'initial_release_angle': 'first',
            'calculated_release_angle': 'first',
            'optimal_release_angle': 'first',
            'angle_difference': 'first'
        }).reset_index()

        if debug:
            logger.debug(f"Aggregated angles Shape: {aggregated_df.shape}")
            logger.debug("Aggregated Angles Columns and Data Types:")
            logger.debug(f"{aggregated_df.dtypes.to_dict()}")

        else:
            logger.info("Aggregated angles per trial successfully.")

        return aggregated_df

    except Exception as e:
        logger.error(f"Error in aggregate_angles: {e}")
        return pd.DataFrame()

def merge_final_ml_dataset(final_ml_df: pd.DataFrame, aggregated_angles_df: pd.DataFrame, debug: bool = False):
    """
    Merge the aggregated angles into the final ML dataset by trial_id.

    Parameters:
    - final_ml_df: DataFrame representing the final ML dataset.
    - aggregated_angles_df: DataFrame with aggregated angles per trial_id.
    - output_filename: The name of the output CSV file.
    - debug: Whether to enable debug output.

    Returns:
    - merged_df: The merged DataFrame.
    """
    global logger
    try:
        # Capture the shape before merging
        initial_shape = final_ml_df.shape

        # Merge on 'trial_id'
        merged_df = final_ml_df.merge(
            aggregated_angles_df,
            on='trial_id',
            how='left',
            validate='one_to_one'
        )

        # Capture the shape after merging
        final_shape = merged_df.shape

        if debug:
            logger.debug(f"Initial final_ml_df shape: {initial_shape}")
            logger.debug(f"Final merged_df shape: {final_shape}")

        # Check if any rows were added or removed
        if initial_shape[0] != final_shape[0]:
            logger.error(f"Row count mismatch after merging: Before={initial_shape[0]}, After={final_shape[0]}")
        else:
            if debug:
                logger.debug(f"Merge successful. Row count remains the same: {final_shape[0]} rows.")
            else:
                logger.info("Merge successful. Row count remains unchanged.")

        # Check for any NaN values in the merged angle columns
        angle_columns = ['initial_release_angle', 'calculated_release_angle', 'optimal_release_angle', 'angle_difference']
        missing_angles = merged_df[angle_columns].isna().any(axis=1)
        num_missing = missing_angles.sum()
        if num_missing > 0:
            logger.warning(f"{num_missing} trials in final_ml_df did not receive angle data.")
            if debug:
                logger.debug(f"Trials missing angle data: {merged_df.loc[missing_angles, 'trial_id'].unique()}")
        else:
            if debug:
                logger.debug("All trials in final_ml_df have corresponding angle data.")
            else:
                logger.info("All trials in final_ml_df have corresponding angle data.")

        return merged_df

    except Exception as e:
        logger.error(f"Error merging final_ml_df with aggregated angles: {e}")
        return final_ml_df

def compare_unique_trial_ids(original_df: pd.DataFrame, processed_df: pd.DataFrame, stage: str, debug: bool = False):
    """
    Compare unique trial IDs between two DataFrames.

    Parameters:
    - original_df: Original DataFrame before processing.
    - processed_df: Processed DataFrame after certain operations.
    - stage: A string indicating the comparison stage.
    - debug: Whether to enable debug output.

    Returns:
    - None
    """
    global logger
    try:
        original_ids = set(original_df['trial_id'])
        processed_ids = set(processed_df['trial_id'])
        missing_ids = original_ids - processed_ids
        extra_ids = processed_ids - original_ids

        if missing_ids:
            logger.error(f"Missing Trial IDs after {stage}: {missing_ids}")
        if extra_ids:
            logger.error(f"Extra Trial IDs after {stage}: {extra_ids}")
        if not missing_ids and not extra_ids:
            if debug:
                logger.debug(f"All Trial IDs are consistent after {stage}.")
            else:
                logger.info(f"All Trial IDs are consistent after {stage}.")

    except Exception as e:
        logger.error(f"Error in compare_unique_trial_ids: {e}")

def add_optimized_angles_to_granular(final_granular_df: pd.DataFrame, final_ml_df: pd.DataFrame, reference_df: pd.DataFrame, debug: bool = False):
    """
    Add optimized angles to the granular dataset.

    Parameters:
    - final_granular_df: DataFrame containing granular trial data.
    - final_ml_df: DataFrame containing ML data with player heights.
    - reference_df: DataFrame containing the reference table for optimal angles.
    - debug: Whether to display the outputs for debugging.

    Returns:
    - final_granular_df_with_optimal_release_angle: Updated granular dataset with optimized angles.
    """
    global logger
    try:
        # Merge player height into granular dataset
        merged_df = final_granular_df.merge(
            final_ml_df[['trial_id', 'player_height_in_meters']],
            on='trial_id',
            how='left',
            validate='many_to_one'
        )
        merged_df['player_height_ft'] = merged_df['player_height_in_meters'] * 3.28084

        if debug:
            logger.debug("Merged player_height_in_meters into granular dataset.")
            logger.debug(f"Merged DataFrame Shape: {merged_df.shape}")
        else:
            logger.info("Merged player heights into granular dataset.")

        # Analyze releases and add angles
        analyzed_df = analyze_release_all_trials(
            merged_df,
            reference_df,
            distance_to_basket=15,
            debug=debug
        )

        if debug:
            logger.debug("Final granular dataset with optimized angles created.")
            logger.debug(f"Final Granular DataFrame Shape: {analyzed_df.shape}")
        else:
            logger.info("Final granular dataset with optimized angles created.")

        return analyzed_df

    except Exception as e:
        logger.error(f"Error in add_optimized_angles_to_granular: {e}")
        return final_granular_df

def add_optimized_angles_to_ml(final_ml_df: pd.DataFrame, aggregated_angles_df: pd.DataFrame, debug: bool = False):
    """
    Merge optimized angles into the ML dataset.

    Parameters:
    - final_ml_df: DataFrame representing the ML dataset.
    - aggregated_angles_df: DataFrame with aggregated angles per trial_id.
    - debug: Whether to enable debug output.

    Returns:
    - final_ml_df_with_angles: Updated ML dataset with optimized angles.
    """
    global logger
    try:
        # Merge aggregated angles into ML dataset
        merged_df = merge_final_ml_dataset(final_ml_df, aggregated_angles_df, debug=debug)

        if debug:
            logger.debug("Final ML dataset with optimized angles created.")
            logger.debug(f"Final ML DataFrame Shape: {merged_df.shape}")
        else:
            logger.info("Merged optimized angles into ML dataset.")

        return merged_df

    except Exception as e:
        logger.error(f"Error in add_optimized_angles_to_ml: {e}")
        return final_ml_df

if __name__ == "__main__":
    try:
        # Define the debug flag
        debug = True  # Set to True for detailed logs
        # Reconfigure logging based on the debug flag
        logger = configure_logging(debug=debug)

        # Log trial IDs before processing
        log_trial_ids(final_granular_df, "Initial Load - Granular DF", debug=debug)
        log_trial_ids(final_ml_df, "Initial Load - ML DF", debug=debug)

        # Check for duplicates in final_ml_df instead of final_granular_df
        check_duplicates(final_ml_df, 'final_ml_df', debug=debug)

        # Create reference table
        reference_df = create_optimal_angle_reference_table(debug=debug)

        # Add optimized angles to granular dataset
        final_granular_df_with_optimal_release_angles = add_optimized_angles_to_granular(
            final_granular_df,
            final_ml_df,
            reference_df,
            debug=debug
        )

        # Aggregate angles (now includes angle_difference)
        aggregated_angles_df = aggregate_angles(final_granular_df_with_optimal_release_angles, debug=debug)

        # Add optimized angles to ML dataset
        final_ml_df_with_optimal_release_angles = add_optimized_angles_to_ml(
            final_ml_df,
            aggregated_angles_df,
            debug=debug
        )
        print(final_ml_df_with_optimal_release_angles)
        logger.info("All processing steps completed successfully.")

    except Exception as e:
        logger.critical(f"Critical error in main execution: {e}")


Overwriting ml/feature_engineering/optimal_release_angle_metrics.py


In [None]:
%%writefile ml/feature_engineering/energy_exhaustion_metrics.py

import pandas as pd
import numpy as np


def _print_debug_info(df_before, df_after, new_columns, step_name, debug):
    """
    Helper function to print debug information.

    Parameters:
    - df_before (pd.DataFrame): DataFrame before processing.
    - df_after (pd.DataFrame): DataFrame after processing.
    - new_columns (list): List of new columns added.
    - step_name (str): Name of the processing step.
    - debug (bool): Flag to enable debug printing.
    """
    if debug:
        print(f"Step: {step_name}")
        print(f"DataFrame shape before: {df_before.shape}")
        print(f"DataFrame shape after: {df_after.shape}")
        if new_columns:
            print(f"New columns added: {new_columns}")
            for col in new_columns:
                dtype = df_after[col].dtype
                sample = df_after[col].dropna().unique()[:5]
                print(f" - {col}: dtype={dtype}, sample values={sample}")
        print("-" * 50)
    else:
        print(f"Step '{step_name}' completed.")


def calculate_continuous_frame_time(df, debug=False):
    """
    Calculates continuous frame time across all trials.
    Resets time for each trial and maintains a cumulative time across all trials.
    """
    step_name = "Calculating continuous frame time"
    df_before = df.copy()

    # Ensure the DataFrame is sorted by trial_id and frame_time for calculations
    df = df.sort_values(by=['trial_id', 'frame_time']).reset_index(drop=True)

    # Calculate trial-relative time
    df['by_trial_time'] = df.groupby('trial_id')['frame_time'].transform(lambda x: x - x.min())

    # Add cumulative time across trials
    trial_offsets = df.groupby('trial_id')['by_trial_time'].max().cumsum().shift(fill_value=0)
    df['continuous_frame_time'] = df['by_trial_time'] + df['trial_id'].map(trial_offsets)

    df_after = df.copy()
    new_columns = ['by_trial_time', 'continuous_frame_time']

    # Validation
    if (df['continuous_frame_time'] < 0).any():
        raise ValueError("Continuous frame time contains negative values.")

    _print_debug_info(df_before, df_after, new_columns, step_name, debug)

    return df


def initialize_first_row_power(df, power_columns, debug=False):
    """
    Sets the first row of each trial's power columns to 0 to avoid NaN values in energy calculations.

    Parameters:
    - df (pd.DataFrame): The DataFrame containing the data.
    - power_columns (list): List of power column names to initialize.
    - debug (bool): Flag to enable debug printing.

    Returns:
    - pd.DataFrame: The DataFrame with initialized power columns.
    """
    step_name = "Initializing first row of power columns to 0 for each trial"
    df_before = df.copy()

    # Identify the first row index for each trial
    first_rows = df.groupby('trial_id').head(1).index.tolist()

    # Set the specified power columns to 0 for these first rows
    df.loc[first_rows, power_columns] = 0

    df_after = df.copy()
    new_columns = []  # No new columns added, only modifying existing ones

    if debug:
        added_info = {col: df_after[col].iloc[first_rows].unique().tolist() for col in power_columns}
        print(f"Step: {step_name}")
        print(f"Modified columns: {power_columns}")
        for col, vals in added_info.items():
            print(f" - {col}: sample values={vals}")
        print("-" * 50)
    else:
        print(f"Step '{step_name}' completed.")

    return df


def calculate_energy_metrics(df, power_columns, debug=False):
    """
    Calculates energy metrics for each joint and total energy per frame.
    """
    step_name = "Calculating energy metrics"
    df_before = df.copy()

    new_columns = []
    for power_col in power_columns:
        energy_col = power_col.replace('ongoing_power', 'energy')
        df[energy_col] = df[power_col] * df['dt']
        new_columns.append(energy_col)

    # Calculate total energy
    total_energy_columns = [col.replace('ongoing_power', 'energy') for col in power_columns]
    df['total_energy'] = df[total_energy_columns].sum(axis=1)
    new_columns.append('total_energy')

    df_after = df.copy()

    _print_debug_info(df_before, df_after, new_columns, step_name, debug)

    return df


def calculate_by_trial_energy(df, energy_columns, debug=False):
    """
    Calculates energy metrics (by-trial and overall) and exhaustion scores.
    """
    step_name = "Calculating by-trial energy and exhaustion scores"
    df_before = df.copy()

    new_columns = ['by_trial_energy', 'by_trial_exhaustion_score',
                   'overall_cumulative_energy', 'overall_exhaustion_score']

    # By-trial energy
    df['by_trial_energy'] = df.groupby('trial_id')['total_energy'].cumsum()
    # By-trial exhaustion score
    df['by_trial_exhaustion_score'] = (
        df.groupby('trial_id')['by_trial_energy']
        .transform(lambda x: x / x.max())
    )

    # Overall cumulative energy
    df['overall_cumulative_energy'] = df['total_energy'].cumsum()
    max_overall_cumulative_energy = df['overall_cumulative_energy'].max()
    df['overall_exhaustion_score'] = (
        df['overall_cumulative_energy'] / max_overall_cumulative_energy
    )

    df_after = df.copy()

    _print_debug_info(df_before, df_after, new_columns, step_name, debug)

    return df


def calculate_joint_energy_metrics(df, power_columns, debug=False):
    """
    Calculates per-joint by-trial energy and exhaustion scores.
    """
    step_name = "Calculating per-joint energy metrics"
    df_before = df.copy()

    new_columns = []
    for power_col in power_columns:
        energy_col = power_col.replace('ongoing_power', 'energy')
        # By-trial energy
        by_trial_col = f'{energy_col}_by_trial'
        df[by_trial_col] = df.groupby('trial_id')[energy_col].cumsum()
        new_columns.append(by_trial_col)

        # By-trial exhaustion score
        by_trial_exhaustion_col = f'{energy_col}_by_trial_exhaustion_score'
        df[by_trial_exhaustion_col] = (
            df[by_trial_col] /
            df.groupby('trial_id')[by_trial_col].transform('max')
        )
        new_columns.append(by_trial_exhaustion_col)

        # Overall cumulative energy
        overall_cumulative_col = f'{energy_col}_overall_cumulative'
        df[overall_cumulative_col] = df[energy_col].cumsum()
        new_columns.append(overall_cumulative_col)

        # Overall exhaustion score
        overall_exhaustion_col = f'{energy_col}_overall_exhaustion_score'
        df[overall_exhaustion_col] = (
            df[overall_cumulative_col] /
            df[overall_cumulative_col].max()
        )
        new_columns.append(overall_exhaustion_col)

    df_after = df.copy()

    _print_debug_info(df_before, df_after, new_columns, step_name, debug)

    return df


def validate_metrics(df, power_columns, debug=False):
    """
    Validates the calculated metrics for consistency.
    """
    step_name = "Validating metrics"
    df_before = df.copy()

    # Validation steps
    # Check continuous frame time
    if (df['continuous_frame_time'] < 0).any():
        raise ValueError("Continuous frame time contains negative values.")

    # Validate energy columns
    for power_col in power_columns:
        energy_col = power_col.replace('ongoing_power', 'energy')
        if energy_col not in df.columns:
            raise ValueError(f"Missing energy column: {energy_col}")
        if df[energy_col].isnull().any():
            raise ValueError(f"Energy column {energy_col} contains NaN values.")

    # Validate total energy calculation
    total_energy_columns = [col.replace('ongoing_power', 'energy') for col in power_columns]
    calculated_total_energy = df[total_energy_columns].sum(axis=1)
    if not calculated_total_energy.equals(df['total_energy']):
        raise ValueError("Total energy does not match the sum of individual energy columns.")

    # Validate exhaustion scores
    if not ((df['by_trial_exhaustion_score'] >= 0) & (df['by_trial_exhaustion_score'] <= 1)).all():
        raise ValueError("By-trial exhaustion scores are not normalized between 0 and 1.")
    if not ((df['overall_exhaustion_score'] >= 0) & (df['overall_exhaustion_score'] <= 1)).all():
        raise ValueError("Overall exhaustion scores are not normalized between 0 and 1.")

    # Per-joint metrics validation can be added similarly if needed

    new_columns = []  # No new columns added during validation

    _print_debug_info(df_before, df, new_columns, step_name, debug)

    return True


def main_granular_ongoing_exhaustion_pipeline(df, power_columns, debug=False):
    """
    Main pipeline to calculate and validate all metrics.

    Parameters:
    - df (pd.DataFrame): The input DataFrame.
    - power_columns (list): List of power column names.
    - debug (bool): Flag to enable debug printing.

    Returns:
    - pd.DataFrame: The processed DataFrame with all metrics calculated.
    """
    step_name = "Starting main pipeline"
    if debug:
        print(f"Step: {step_name}")
        print(f"Initial DataFrame shape: {df.shape}")
        print("-" * 50)
    else:
        print(f"Step '{step_name}' completed.")

    # Step 1: Calculate continuous frame time
    df = calculate_continuous_frame_time(df, debug=debug)

    # Step 2: Initialize first row of power columns to 0
    df = initialize_first_row_power(df, power_columns, debug=debug)

    # Step 3: Calculate energy metrics
    df = calculate_energy_metrics(df, power_columns, debug=debug)

    # Step 4: Calculate by-trial energy and exhaustion scores
    energy_columns = [col.replace('ongoing_power', 'energy') for col in power_columns]
    df = calculate_by_trial_energy(df, energy_columns, debug=debug)

    # Step 5: Calculate per-joint metrics
    df = calculate_joint_energy_metrics(df, power_columns, debug=debug)

    # Step 6: Validate metrics
    validate_metrics(df, power_columns, debug=debug)

    if debug:
        print("Step: Main pipeline completed successfully.")
        print(f"Final DataFrame shape: {df.shape}")
        print("-" * 50)
    else:
        print("Step 'Main pipeline' completed successfully.")

    return df


def summarize_joint_energy_by_trial(processed_df, power_columns, debug=False):
    """
    Summarizes joint energy metrics by trial for inclusion in the ML dataset.

    Args:
        processed_df (pd.DataFrame): Granular dataset containing joint energy metrics.
        power_columns (list): List of joint power columns to process.
        debug (bool): If True, prints debugging information.

    Returns:
        pd.DataFrame: Summary of joint energy metrics by trial.
    """
    step_name = "Summarizing joint energy metrics by trial"
    df_before = processed_df.copy()

    if debug:
        print(f"Step: {step_name}")
        print(f"Initial processed_df shape: {processed_df.shape}")
        print("-" * 50)

    # Replace power column names with their corresponding energy columns
    energy_columns = [col.replace('ongoing_power', 'energy') for col in power_columns]

    # Validate energy columns exist in processed_df
    missing_columns = [col for col in energy_columns if col not in processed_df.columns]
    if missing_columns:
        raise ValueError(f"Missing energy columns in processed_df: {missing_columns}")

    # Create summarized statistics for each trial
    summary = processed_df.groupby('trial_id')[energy_columns].agg(
        **{f'{col}_mean': (col, 'mean') for col in energy_columns},
        **{f'{col}_max': (col, 'max') for col in energy_columns},
        **{f'{col}_std': (col, 'std') for col in energy_columns}
    ).reset_index()

    df_after = summary.copy()
    new_columns = list(summary.columns)

    if debug:
        print(f"Summary DataFrame shape: {summary.shape}")
        print(f"Number of trials processed: {summary['trial_id'].nunique()}")
        print(f"Columns in summary DataFrame: {summary.columns.tolist()}")
        print("-" * 50)
    else:
        print(f"Step '{step_name}' completed.")

    return summary


def merge_joint_energy_with_ml_dataset(processed_df, final_ml_df, power_columns, debug=False):
    """
    Merges summarized joint energy metrics from the granular dataset into the ML dataset.

    Args:
        processed_df (pd.DataFrame): Granular dataset containing joint energy metrics.
        final_ml_df (pd.DataFrame): Machine learning dataset.
        power_columns (list): List of joint power columns to process.
        debug (bool): If True, prints debugging information.

    Returns:
        pd.DataFrame: Updated ML dataset with joint energy metrics added.
    """
    step_name = "Merging joint energy metrics into ML dataset"
    df_before = final_ml_df.copy()

    if debug:
        print(f"Step: {step_name}")
        print(f"Shape of processed_df before summarization: {processed_df.shape}")
        print(f"Shape of final_ml_df before merge: {final_ml_df.shape}")
        print("-" * 50)

    # Summarize joint energy by trial
    energy_summary = summarize_joint_energy_by_trial(processed_df, power_columns, debug=debug)

    # Identify overlapping columns between final_ml_df and energy_summary (excluding the merge key 'trial_id')
    overlapping_columns = set(final_ml_df.columns).intersection(set(energy_summary.columns)) - {'trial_id'}

    if overlapping_columns:
        if debug:
            print(f"Overlapping columns detected: {overlapping_columns}")

        # Rename overlapping columns in energy_summary
        energy_summary = energy_summary.rename(columns={col: f"{col}_summary" for col in overlapping_columns})
        if debug:
            print(f"Renamed overlapping columns in energy_summary: {list(energy_summary.columns)}")
            print("-" * 50)

    # Merge the summarized data into the ML dataset with suffixes to handle additional collisions
    try:
        pre_merge_columns = set(final_ml_df.columns)
        final_ml_df = final_ml_df.merge(energy_summary, on='trial_id', how='left', suffixes=('', '_merged'))

        # Check for duplicate columns and resolve them
        duplicated_columns = [col for col in final_ml_df.columns if col.endswith('_merged')]
        if duplicated_columns:
            if debug:
                print(f"Duplicated columns after merge: {duplicated_columns}")
            # Drop duplicated columns or handle them based on preference
            final_ml_df = final_ml_df.drop(columns=duplicated_columns)

        if debug:
            post_merge_columns = set(final_ml_df.columns)
            added_columns = post_merge_columns - pre_merge_columns
            print(f"Shape of final_ml_df after merge: {final_ml_df.shape}")
            print(f"Columns added during merge: {sorted(added_columns)}")
            print("-" * 50)
        else:
            print(f"Step '{step_name}' completed.")

    except Exception as e:
        raise RuntimeError(f"Error during merging: {e}")

    return final_ml_df


def output_dataset(dataset, filename="final_ml_dataset.csv"):
    """
    Outputs the final dataset to a file and prints a summary.

    Args:
        dataset (pd.DataFrame): The DataFrame to output.
        filename (str): The name of the output file (default: 'final_ml_dataset.csv').
    """
    step_name = "Outputting final dataset"
    print(f"[{step_name}]")

    # Save the dataset to a CSV file
    dataset.to_csv(filename, index=False)
    print(f"Dataset saved to {filename}")

    # Print a summary of the dataset
    print("Dataset Summary:")
    print(dataset.info())
    print("First few rows of the dataset:")
    print(dataset.head())
    print("-" * 50)


# Example Usage
if __name__ == "__main__":
    import logging

    # Configure logging if needed
    logging.basicConfig(level=logging.DEBUG)
    logger = logging.getLogger(__name__)

    # Define power columns for joints
    power_columns = [
        'L_ANKLE_ongoing_power', 'R_ANKLE_ongoing_power',  # Ankles
        'L_KNEE_ongoing_power', 'R_KNEE_ongoing_power',    # Knees
        'L_HIP_ongoing_power', 'R_HIP_ongoing_power',      # Hips
        'L_ELBOW_ongoing_power', 'R_ELBOW_ongoing_power',  # Elbows
        'L_WRIST_ongoing_power', 'R_WRIST_ongoing_power',  # Wrists
        'L_1STFINGER_ongoing_power', 'R_1STFINGER_ongoing_power',  # Index fingers
        'L_5THFINGER_ongoing_power', 'R_5THFINGER_ongoing_power',   # Pinky fingers
        'L_1STTOE_ongoing_power', 'R_1STTOE_ongoing_power',  # Index fingers
        'L_5THTOE_ongoing_power', 'R_5THTOE_ongoing_power'   # Pinky fingers
    ]

    # Assuming final_granular_df_with_optimal_release_angles and final_ml_df_with_optimal_release_angles
    # are predefined DataFrames loaded elsewhere in the code.


    # Inspect trial IDs and their counts
    trial_ids = final_granular_df_with_optimal_release_angles['trial_id'].unique()
    trial_id_counts = final_granular_df_with_optimal_release_angles['trial_id'].value_counts()
    logger.debug(f"Unique trial IDs: {trial_ids}")
    logger.debug(f"Trial ID counts: {trial_id_counts.to_dict()}")

    # Check for missing or zero 'dt' values
    logger.debug(f"Missing dt values: {final_granular_df_with_optimal_release_angles['dt'].isnull().sum()}")
    logger.debug(f"Zero dt values: {(final_granular_df_with_optimal_release_angles['dt'] == 0).sum()}")

    # Run the pipeline with debug enabled
    final_granular_df_with_energy = main_granular_ongoing_exhaustion_pipeline(
        final_granular_df_with_optimal_release_angles,
        power_columns,
        debug=False
    )
    
    # Summarize and Merge joint energy metrics into ML dataset
    final_ml_df_with_energy = merge_joint_energy_with_ml_dataset(
        final_granular_df_with_energy,
        final_ml_df_with_optimal_release_angles,
        power_columns,
        debug=False
    )

    # # Output the datasets to files
    output_dataset(final_ml_df_with_energy, filename="../../data/processed/final_ml_dataset.csv")
    output_dataset(final_granular_df_with_energy, filename="../../data/processed/final_granular_dataset.csv")


Overwriting ml/feature_engineering/energy_exhaustion_metrics.py


In [18]:
%%writefile ml/feature_engineering/categorize_categoricals.py

"""
Automated Categorization Module
This script automates the categorization of continuous variables into bins 
with specified labels and applies transformations to multiple columns.

To use:
1. Define a bin configuration dictionary with the desired bins and labels.
2. Pass your DataFrame and configuration to the `transform_features_with_bins` function.

Author: Your Name
"""

import pandas as pd
import numpy as np
import pickle
import logging
import os

def categorize_column(df, column_name, bins, labels, new_column_name=None, debug=False):
    """
    Categorizes a column into bins with specified labels.

    Args:
        df (DataFrame): The dataset to transform.
        column_name (str): Name of the column to bin.
        bins (list): Bin edges for categorization.
        labels (list): Labels corresponding to each bin.
        new_column_name (str): Optional; name of the new column. Defaults to "<column_name>_category".
        debug (bool): If True, prints debugging information.

    Returns:
        Series: The newly categorized column as a pandas Series.
    """
    try:
        if new_column_name is None:
            new_column_name = f"{column_name}_category"

        # Apply binning
        categorized_column = pd.cut(df[column_name], bins=bins, labels=labels)

        if debug:
            print(f"\nBinning applied to '{column_name}' -> New column: '{new_column_name}'")
            print(pd.DataFrame({column_name: df[column_name], new_column_name: categorized_column}).head())

        return categorized_column
    except KeyError:
        print(f"Error: Column '{column_name}' not found in DataFrame.")
        return pd.Series(index=df.index)  # Return an empty series if the column is missing
    except Exception as e:
        print(f"Unexpected error while categorizing '{column_name}': {e}")
        return pd.Series(index=df.index)  # Return an empty series if there's an error

def transform_features_with_bins(df, bin_config, debug=False):
    """
    Applies binning transformations to multiple columns based on the provided configuration.

    Args:
        df (DataFrame): The dataset to transform.
        bin_config (dict): Configuration dictionary where keys are column names and values are
                           dictionaries with 'bins', 'labels', and optionally 'new_column_name'.
        debug (bool): If True, prints debugging information.

    Returns:
        DataFrame: A new DataFrame containing only the categorized columns.
    """
    categorized_df = pd.DataFrame(index=df.index)  # Initialize an empty DataFrame with the same index
    for column, config in bin_config.items():
        bins = config['bins']
        labels = config['labels']
        new_column_name = config.get('new_column_name', f"{column}_category")  # Default new column name
        categorized_df[new_column_name] = categorize_column(df, column, bins, labels, debug=debug)

    return categorized_df

def load_default_bin_config(config_path=None):
    """
    Loads the default bin configuration. If not present, creates and saves a default configuration.

    Args:
        config_path (str): Path to save/load the bin configuration. Defaults to '../../data/model/pipeline/category_bin_config.pkl'.

    Returns:
        dict: The bin configuration dictionary.
    """
    if config_path is None:
        config_path = os.path.join(os.path.dirname(__file__), '../../data/model/pipeline/category_bin_config.pkl')
    
    if os.path.exists(config_path):
        with open(config_path, 'rb') as f:
            bin_config = pickle.load(f)
    else:
        # Define default bin configuration
        bin_config = {
            'player_height_in_meters': {
                'bins': [0, 1.80, 2.00, np.inf],
                'labels': ["Short", "Average", "Tall"]
            },
            'player_weight__in_kg': {
                'bins': [0, 75, 95, np.inf],
                'labels': ["Lightweight", "Average", "Heavy"]
            },
            'player_estimated_wingspan_cm': {
                'bins': [0, 190, 220, np.inf],
                'labels': ["Small", "Medium", "Large"]
            },
            'player_estimated_standing_reach_cm': {
                'bins': [0, 230, 250, np.inf],
                'labels': ["Short", "Average", "Tall"]
            },
            'player_estimated_hand_length_cm': {
                'bins': [0, 20, 25, np.inf],
                'labels': ["Small", "Medium", "Large"]
            }
        }
        # Save the default bin configuration
        os.makedirs(os.path.dirname(config_path), exist_ok=True)
        with open(config_path, 'wb') as f:
            pickle.dump(bin_config, f)
    
    return bin_config

if __name__ == "__main__":
    # Example usage for testing
    debug = True

    # Define the path to save the bin configuration
    config_path = '../../data/model/pipeline/category_bin_config.pkl'

    # Load the category bin configuration
    bin_config = load_default_bin_config(config_path=config_path)

    file_path = "../../data/processed/final_ml_dataset.csv"
    final_ml_df = pd.read_csv(file_path)

    # Transform player features using the configuration
    categorized_columns_df = transform_features_with_bins(final_ml_df, bin_config, debug=debug)

    # Combine the original DataFrame with the categorized columns
    final_ml_df_categoricals = pd.concat([final_ml_df, categorized_columns_df], axis=1)

    # Debugging output
    if debug:
        print("\nFinal DataFrame with Categorized Features:")
        print(final_ml_df_categoricals.columns)


Overwriting ml/feature_engineering/categorize_categoricals.py


In [19]:
%%writefile ml/feature_engineering/ml_dataset_definitions.py
# ML Definition and Final ML dataset merge before Preprocessing pipeline


def get_ml_dataset_column_definitions():
    """
    Define column descriptions for the ML dataset, including unique key features for predictive modeling.

    Returns:
        column_definitions (dict): Dictionary where keys are column names and values are descriptions.
    """
    column_definitions = {
        # Outcome and trial metadata
        'result': "Binary indicator of shot outcome: 1 if made, 0 if missed.",
        'trial_id': "Unique identifier for each trial, formatted as 'Txxxx'.",
        'shot_id': "Sequential shot ID for organizing shots within a trial.",
        # Add new columns
        'initial_ball_release_angle': "The initially recorded ball release angle based on motion capture.",
        'calculated_ball_release_angle': "Ball release angle computed with adjusted trajectory factors.",
        'distance_to_basket': "Euclidean distance from the ball's release point to the basket plane.",
        'optimal_ball_release_angle': "Theoretical optimal release angle based on shot trajectory models.",
        'angle_difference': "Difference between the calculated and optimal ball release angle",
    
        # Landing and entry metrics
        'landing_x': "X coordinate of ball landing position on the hoop plane, measured in inches with the hoop front as origin.",
        'landing_y': "Y coordinate of ball landing position on the hoop plane, measured in inches with the hoop front as origin.",
        'entry_angle': "Angle at which the ball enters the hoop plane, measured in degrees to indicate entry precision.",

        # Joint power metrics (minimum, maximum, average, standard deviation)
        'L_ANKLE_min_power': "Minimum power generated by the left ankle during the motion.",
        'L_ANKLE_max_power': "Maximum power generated by the left ankle during the motion.",
        'L_ANKLE_avg_power': "Average power generated by the left ankle during the motion.",
        'L_ANKLE_std_power': "Standard deviation of the power generated by the left ankle during the motion.",
        'R_ANKLE_min_power': "Minimum power generated by the right ankle during the motion.",
        'R_ANKLE_max_power': "Maximum power generated by the right ankle during the motion.",
        'R_ANKLE_avg_power': "Average power generated by the right ankle during the motion.",
        'R_ANKLE_std_power': "Standard deviation of the power generated by the right ankle during the motion.",
        # Repeat similar definitions for L_KNEE, R_KNEE, L_HIP, R_HIP, L_ELBOW, R_ELBOW, L_WRIST, R_WRIST, L_1STFINGER, L_5THFINGER, R_1STFINGER, R_5THFINGER

        # Joint angles and release angles
        'elbow_max_angle': "Maximum angle achieved by the elbow during the motion.",
        'elbow_release_angle': "Angle of the elbow at the point of ball release.",
        'wrist_max_angle': "Maximum angle achieved by the wrist during the motion.",
        'wrist_release_angle': "Angle of the wrist at the point of ball release.",
        'knee_max_angle': "Maximum angle achieved by the knee during the motion.",
        'knee_release_angle': "Angle of the knee at the point of ball release.",

        # Release ball dynamics; defined by Meters Per Second
        'release_ball_speed': "Speed (m/s) of the ball at the point of release, derived from velocity components.",
        'release_ball_velocity_x': "Velocity of the ball along the x-axis at the point of release.",
        'release_ball_velocity_y': "Velocity of the ball along the y-axis at the point of release.",
        'release_ball_velocity_z': "Velocity of the ball along the z-axis at the point of release.",
        'release_ball_direction_x': "Normalized direction of the ball's velocity along the x-axis at the point of release.",
        'release_ball_direction_y': "Normalized direction of the ball's velocity along the y-axis at the point of release.",
        'release_ball_direction_z': "Normalized direction of the ball's velocity along the z-axis at the point of release.",
        'release_ball_x': "X coordinate of the ball's position at the release point.",
        'release_ball_y': "Y coordinate of the ball's position at the release point.",
        'release_ball_z': "Z coordinate of the ball's position (height) at the release point.",
        'release_frame_time': "Frame timestamp at the ball release point, relative to trial start.",
        'release_angle': "Angle of the ball's trajectory at the point of release.",
        'time_to_peak': "Time taken from ball release to the point where the ball reaches its peak height.",
        'peak_height_relative': "Difference between the release height and the peak ball height, representing the shot arc.",

        # Player characteristics
        'player_participant_id': "Unique identifier for the player, used for grouping and analysis.",
        'player_height_in_meters': "Player's height in meters, derived from participant data.",
        'player_weight__in_kg': "Player's weight in kilograms, derived from participant data.",
        'player_dominant_hand': "Player's dominant hand (e.g., 'Left' or 'Right'), based on shooting preference.",
        'player_estimated_wingspan_cm': "Estimated wingspan of the player in centimeters, derived from biometric data.",
        'player_estimated_standing_reach_cm': "Estimated standing reach of the player in centimeters, based on height and arm length.",
        'player_estimated_hand_length_cm': "Estimated hand length of the player in centimeters, derived from biometric data.",
    
        # Joint energy metrics
        'joint_energy': "Energy expended by a specific joint in a single frame, calculated as power multiplied by time delta.",
        'joint_energy_by_trial': "Cumulative energy expended by a specific joint within a trial.",
        'joint_energy_by_trial_exhaustion_score': "Normalized exhaustion score for a specific joint within a trial, scaled to the maximum trial energy.",
        'joint_energy_overall_cumulative': "Cumulative energy expended by a specific joint across all trials.",
        'joint_energy_overall_exhaustion_score': "Normalized exhaustion score for a specific joint across all trials, scaled to the maximum overall energy.",

        # Summarized Energy metrics
        'mean_energy': "Mean energy expended per trial, averaged across all frames.",
        'max_energy': "Maximum energy expended in a single frame during a trial.",
        'mean_exhaustion': "Mean exhaustion score for a trial, averaged across all frames.",
        'max_exhaustion': "Maximum exhaustion score for a trial, representing peak effort.",


    }

    return column_definitions



if __name__ == "__main__":

    # Output column definitions
    print("[Step: Final ML Dataset Column Definitions]")
    column_definitions = get_ml_dataset_column_definitions()
    for col, desc in column_definitions.items():
        print(f"{col}: {desc}")



Overwriting ml/feature_engineering/ml_dataset_definitions.py


In [9]:
%cd ml/

/workspaces/spl_freethrow_biomechanics_analysis_ml_prediction/notebooks/freethrow_predictions/ml


In [2]:
# %%writefile ml/load_and_feature_engineer_data.py

import os
import logging
import pandas as pd
import numpy as np  # Added numpy import for consistency
from ml.data_load_prepare.key_feature_extraction import load_player_info, get_column_definitions
from ml.data_load_prepare.main_load_and_prepare import bulk_process_directory
from ml.feature_engineering.ml_dataset_definitions import get_ml_dataset_column_definitions
from ml.feature_engineering.energy_exhaustion_metrics import (
    main_granular_ongoing_exhaustion_pipeline,
    merge_joint_energy_with_ml_dataset,
    output_dataset
)
from ml.feature_engineering.optimal_release_angle_metrics import (
    log_trial_ids,
    check_duplicates,
    create_optimal_angle_reference_table,
    add_optimized_angles_to_granular,
    aggregate_angles,
    add_optimized_angles_to_ml
)
from ml.feature_engineering.categorize_categoricals import (
    transform_features_with_bins,
    load_default_bin_config
)

def process_basketball_data(
    directory_path,
    player_info_path,
    output_ml_path,
    output_granular_path,
    power_columns=None,
    bin_config=None,  # New parameter to accept custom bin configurations
    debug=False,
    log_level=logging.INFO,
    new_data=True  # New parameter added
):
    """
    Processes basketball free throw data for feature engineering and ML dataset preparation.
    Can append to existing datasets or overwrite them based on the `new_data` flag.

    Parameters:
    - directory_path (str): Path to the data directory.
    - player_info_path (str): Path to the player information JSON file.
    - output_ml_path (str): Path to save the final ML dataset CSV.
    - output_granular_path (str): Path to save the final granular dataset CSV.
    - power_columns (list, optional): List of power column names. Defaults to predefined list.
    - bin_config (dict, optional): Custom bin configuration. If None, loads default configuration.
    - debug (bool, optional): Flag to enable debug mode. Defaults to False.
    - log_level (int, optional): Logging level. Defaults to logging.INFO.
    - new_data (bool, optional): If True, append to existing datasets. If False, overwrite. Defaults to True.

    Returns:
    - None
    """
    # Configure logging
    logging.basicConfig(
        level=log_level,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.StreamHandler()
        ]
    )
    logger = logging.getLogger(__name__)

    if power_columns is None:
        power_columns = [
            'L_ANKLE_ongoing_power', 'R_ANKLE_ongoing_power',  # Ankles
            'L_KNEE_ongoing_power', 'R_KNEE_ongoing_power',    # Knees
            'L_HIP_ongoing_power', 'R_HIP_ongoing_power',      # Hips
            'L_ELBOW_ongoing_power', 'R_ELBOW_ongoing_power',  # Elbows
            'L_WRIST_ongoing_power', 'R_WRIST_ongoing_power',  # Wrists
            'L_1STFINGER_ongoing_power', 'R_1STFINGER_ongoing_power',  # Index fingers
            'L_5THFINGER_ongoing_power', 'R_5THFINGER_ongoing_power'   # Pinky fingers
        ]

    logger.info("Starting basketball data processing pipeline.")

    try:
        # ---------Bulk Load-----------
        logger.debug("Starting bulk processing of data directory.")
        final_granular_df, final_ml_df = bulk_process_directory(
            directory_path, player_info_path, debug=debug
        )
        logger.info("Bulk processing completed.")
        
        # Output column definitions
        logger.info("[Step: Final Granular Dataset Column Definitions]")
        column_definitions = get_column_definitions()
        for col, desc in column_definitions.items():
            logger.info(f"{col}: {desc}")
        
        # Output ML Dataset Column Definitions
        logger.debug("Retrieving ML dataset column definitions.")
        column_definitions_ml = get_ml_dataset_column_definitions()
        logger.info("ML Dataset Column Definitions:")
        for col, desc in column_definitions_ml.items():
            logger.info(f"{col}: {desc}")

        #------------Feature Engineer----------
        logger.debug("Starting feature engineering.")

        # Log trial IDs before processing
        log_trial_ids(final_granular_df, "Initial Load - Granular DF", debug=debug)
        log_trial_ids(final_ml_df, "Initial Load - ML DF", debug=debug)

        # Check for duplicates in final_ml_df
        check_duplicates(final_ml_df, 'final_ml_df', debug=debug)

        # Create reference table
        reference_df = create_optimal_angle_reference_table(debug=debug)

        # Add optimized angles to granular dataset
        final_granular_df_with_optimal_release_angles = add_optimized_angles_to_granular(
            final_granular_df,
            final_ml_df,
            reference_df,
            debug=debug
        )

        # Aggregate angles (now includes angle_difference)
        aggregated_angles_df = aggregate_angles(final_granular_df_with_optimal_release_angles, debug=debug)

        # Add optimized angles to ML dataset
        final_ml_df_with_optimal_release_angles = add_optimized_angles_to_ml(
            final_ml_df,
            aggregated_angles_df,
            debug=debug
        )
        logger.info("Optimized release angles added to ML dataset.")

        # Run the energy exhaustion pipeline
        logger.debug("Running energy exhaustion pipeline.")
        final_granular_df_with_energy = main_granular_ongoing_exhaustion_pipeline(
            final_granular_df_with_optimal_release_angles,
            power_columns,
            debug=debug
        )
        logger.info("Energy exhaustion metrics computed.")

        # Merge energy metrics into ML dataset
        final_ml_df_with_energy = merge_joint_energy_with_ml_dataset(
            final_granular_df_with_energy,
            final_ml_df_with_optimal_release_angles,
            power_columns,
            debug=debug
        )
        logger.info("Energy metrics merged into ML dataset.")

        #-------------Categoricals Handling-------------
        logger.debug("Starting categoricals handling.")

        # Load bin configuration
        if bin_config is None:
            logger.debug("Loading default bin configuration.")
            bin_config = load_default_bin_config()
        else:
            logger.debug("Using provided bin configuration.")

        # Transform player features using the configuration
        categorized_columns_df = transform_features_with_bins(final_ml_df_with_energy, bin_config, debug=debug)

        # Combine the original ML DataFrame with the categorized columns
        final_ml_df_categoricals = pd.concat([final_ml_df_with_energy, categorized_columns_df], axis=1)

        # Debugging output
        if debug:
            logger.debug("\nFinal DataFrame with Categorized Features:")
            logger.debug(final_ml_df_categoricals.head())
            logger.debug("Categorized Columns:")
            logger.debug(final_ml_df_categoricals.columns)

        # ---------Handle Appending or Overwriting----------
        if new_data:
            logger.info("Appending new data to existing datasets.")

            # Handle ML Dataset
            if os.path.exists(output_ml_path):
                logger.debug(f"Loading existing ML dataset from {output_ml_path}.")
                existing_ml_df = pd.read_csv(output_ml_path)
                combined_ml_df = pd.concat([existing_ml_df, final_ml_df_categoricals], ignore_index=True)
                combined_ml_df.drop_duplicates(inplace=True)
                logger.info("Appended new data to ML dataset and removed duplicates.")
            else:
                logger.warning(f"ML dataset file {output_ml_path} does not exist. Creating a new one.")
                combined_ml_df = final_ml_df_categoricals.copy()

            # Handle Granular Dataset
            if os.path.exists(output_granular_path):
                logger.debug(f"Loading existing granular dataset from {output_granular_path}.")
                existing_granular_df = pd.read_csv(output_granular_path)
                combined_granular_df = pd.concat([existing_granular_df, final_granular_df_with_energy], ignore_index=True)
                combined_granular_df.drop_duplicates(inplace=True)
                logger.info("Appended new data to granular dataset and removed duplicates.")
            else:
                logger.warning(f"Granular dataset file {output_granular_path} does not exist. Creating a new one.")
                combined_granular_df = final_granular_df_with_energy.copy()

            # Output the combined datasets to files
            output_dataset(combined_ml_df, filename=output_ml_path)
            output_dataset(combined_granular_df, filename=output_granular_path)
            logger.info(f"Final datasets appended and saved to {output_ml_path} and {output_granular_path}.")

        else:
            logger.info("Overwriting existing datasets with new data.")

            # Output the datasets to files, overwriting existing ones
            output_dataset(final_ml_df_categoricals, filename=output_ml_path)
            output_dataset(final_granular_df_with_energy, filename=output_granular_path)
            logger.info(f"Final datasets overwritten and saved to {output_ml_path} and {output_granular_path}.")

        logger.info("Basketball data processing pipeline completed successfully.")

    except Exception as e:
        logger.exception("An error occurred during the processing pipeline.")
        raise e

if __name__ == "__main__":
    import logging
    import os
    
    print("current working directory:", os.getcwd())
    # Define paths
    directory_path = "../../data/basketball/freethrow/data/P0001"
    player_info_path = "../../data/basketball/freethrow/participant_information.json"
    output_ml_path = "../../data/processed/final_ml_dataset.csv"
    output_granular_path = "../../data/processed/final_granular_dataset.csv"

    # Optional: Define a new bin configuration if needed
    # new_bin_config = {
    #     'player_height_in_meters': {
    #         'bins': [0, 1.75, 1.95, np.inf],
    #         'labels': ["Short", "Medium", "Tall"]
    #     },
    #     # Add or modify other columns as needed
    # }

    # Call the processing function
    process_basketball_data(
        directory_path=directory_path,
        player_info_path=player_info_path,
        output_ml_path=output_ml_path,
        output_granular_path=output_granular_path,
        debug=True,  # Enable debug mode for detailed logs
        log_level=logging.DEBUG,  # Set logging level to DEBUG
        new_data=False,  # Set to True to append, False to overwrite
        bin_config=None  # Pass `new_bin_config` here if using a custom configuration
    )


2025-02-07 12:26:32,012 - INFO - Starting basketball data processing pipeline.
2025-02-07 12:26:32,013 - DEBUG - Starting bulk processing of data directory.


current working directory: c:\Users\ghadf\vscode_projects\docker_projects\spl_freethrow_biomechanics_analysis_ml_prediction\notebooks\freethrow_predictions
Debug: Loaded player info:
 {'player_participant_id': 'P0001', 'player_height_in_meters': 1.91, 'player_weight__in_kg': 90.7, 'player_dominant_hand': 'R', 'player_estimated_wingspan_cm': 202.46, 'player_estimated_standing_reach_cm': 225.72, 'player_estimated_hand_length_cm': 20.25}
Debug: Processing file: ../../data/basketball/freethrow/data/P0001\BB_FT_P0001_T0001.json
Debug: Calling main_create_dataframe with trial data and parameters.
Debug: Initializing dataframe creation with given trial metadata.
Debug: Processing frame 0 with time 0. Ball position: [nan, nan, nan]
Debug: Processing frame 1 with time 33. Ball position: [nan, nan, nan]
Debug: Processing frame 2 with time 66. Ball position: [nan, nan, nan]
Debug: Processing frame 3 with time 100. Ball position: [nan, nan, nan]
Debug: Processing frame 4 with time 133. Ball positi

2025-02-07 12:27:08,774 - INFO - Bulk processing completed.
2025-02-07 12:27:08,774 - INFO - [Step: Final Granular Dataset Column Definitions]
2025-02-07 12:27:08,775 - INFO - trial_id: Unique identifier for each trial, formatted as 'Txxxx'.
2025-02-07 12:27:08,775 - INFO - shot_id: Sequential shot ID for organizing shots within a trial.
2025-02-07 12:27:08,776 - INFO - result: Binary indicator of shot outcome: 1 if made, 0 if missed.
2025-02-07 12:27:08,776 - INFO - landing_x: X coordinate of ball landing position on the hoop plane, measured in inches with the hoop front as origin.
2025-02-07 12:27:08,776 - INFO - landing_y: Y coordinate of ball landing position on the hoop plane, measured in inches with the hoop front as origin.
2025-02-07 12:27:08,777 - INFO - entry_angle: Angle at which the ball enters the hoop plane, measured in degrees to indicate entry precision.
2025-02-07 12:27:08,778 - INFO - frame_time: Timestamp in milliseconds for each frame, relative to trial start.
2025-

Debug: Final granular DataFrame created with shape: (16047, 130)
Debug: Final ML DataFrame created with shape: (125, 89)


2025-02-07 12:27:09,443 - INFO - Optimized release angles added to ML dataset.
2025-02-07 12:27:09,443 - DEBUG - Running energy exhaustion pipeline.


Step: Starting main pipeline
Initial DataFrame shape: (16047, 137)
--------------------------------------------------
Step: Calculating continuous frame time
DataFrame shape before: (16047, 137)
DataFrame shape after: (16047, 139)
New columns added: ['by_trial_time', 'continuous_frame_time']
 - by_trial_time: dtype=int64, sample values=[  0  33  66 100 133]
 - continuous_frame_time: dtype=int64, sample values=[  0  33  66 100 133]
--------------------------------------------------
Step: Initializing first row of power columns to 0 for each trial
Modified columns: ['L_ANKLE_ongoing_power', 'R_ANKLE_ongoing_power', 'L_KNEE_ongoing_power', 'R_KNEE_ongoing_power', 'L_HIP_ongoing_power', 'R_HIP_ongoing_power', 'L_ELBOW_ongoing_power', 'R_ELBOW_ongoing_power', 'L_WRIST_ongoing_power', 'R_WRIST_ongoing_power', 'L_1STFINGER_ongoing_power', 'R_1STFINGER_ongoing_power', 'L_5THFINGER_ongoing_power', 'R_5THFINGER_ongoing_power']
 - L_ANKLE_ongoing_power: sample values=[0.0]
 - R_ANKLE_ongoing_powe

2025-02-07 12:27:09,695 - INFO - Energy exhaustion metrics computed.
2025-02-07 12:27:09,741 - INFO - Energy metrics merged into ML dataset.
2025-02-07 12:27:09,741 - DEBUG - Starting categoricals handling.
2025-02-07 12:27:09,742 - DEBUG - Loading default bin configuration.
2025-02-07 12:27:09,762 - DEBUG - 
Final DataFrame with Categorized Features:
2025-02-07 12:27:09,762 - DEBUG -    result  landing_x  landing_y  entry_angle  L_ANKLE_min_power  \
0       0      7.150     12.755        40.90           0.088235   
1       1     -2.288     12.661        43.97           0.074227   
2       0      7.397      6.421        41.50           0.149971   
3       0     -5.883      3.493        46.66           0.074227   
4       1     -0.641      8.974        41.91           0.117647   

   L_ANKLE_max_power  L_ANKLE_avg_power  L_ANKLE_std_power  R_ANKLE_min_power  \
0           0.928084           0.432471           0.314813           0.090909   
1           0.937493           0.469817        

 - L_ELBOW_energy_by_trial: dtype=float64, sample values=[0.         0.0130384  0.03446269 0.06248054 0.09338362]
 - L_ELBOW_energy_by_trial_exhaustion_score: dtype=float64, sample values=[0.         0.00259341 0.00685482 0.01242773 0.01857452]
 - L_ELBOW_energy_overall_cumulative: dtype=float64, sample values=[0.         0.0130384  0.03446269 0.06248054 0.09338362]
 - L_ELBOW_energy_overall_exhaustion_score: dtype=float64, sample values=[0.00000000e+00 1.78868400e-05 4.72779173e-05 8.57144311e-05
 1.28109061e-04]
 - R_ELBOW_energy_by_trial: dtype=float64, sample values=[0.         0.037      0.07476242 0.11541952 0.15832639]
 - R_ELBOW_energy_by_trial_exhaustion_score: dtype=float64, sample values=[0.         0.00605294 0.01223061 0.01888183 0.0259011 ]
 - R_ELBOW_energy_overall_cumulative: dtype=float64, sample values=[0.         0.037      0.07476242 0.11541952 0.15832639]
 - R_ELBOW_energy_overall_exhaustion_score: dtype=float64, sample values=[0.00000000e+00 4.43833737e-05 8.96813

2025-02-07 12:27:11,742 - INFO - Final datasets overwritten and saved to ../../data/processed/final_ml_dataset.csv and ../../data/processed/final_granular_dataset.csv.
2025-02-07 12:27:11,742 - INFO - Basketball data processing pipeline completed successfully.


Dataset saved to ../../data/processed/final_granular_dataset.csv
Dataset Summary:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 16047 entries, 0 to 16046
Columns: 214 entries, trial_id to R_5THFINGER_energy_overall_exhaustion_score
dtypes: float64(205), int32(1), int64(7), object(1)
memory usage: 26.1+ MB
None
First few rows of the dataset:
  trial_id  result  landing_x  landing_y  entry_angle  frame_time  ball_x  \
0    T0001       0       7.15     12.755         40.9        2200  39.667   
1    T0001       0       7.15     12.755         40.9        2233  39.604   
2    T0001       0       7.15     12.755         40.9        2266  39.556   
3    T0001       0       7.15     12.755         40.9        2300  39.465   
4    T0001       0       7.15     12.755         40.9        2333  39.440   

   ball_y  ball_z  R_EYE_x  ...  R_1STFINGER_energy_overall_cumulative  \
0  -1.501   4.279   26.946  ...                               0.000000   
1  -1.514   4.348   26.970  ...           