In [1]:
import pandas as pd
import pybaseball as pyb

$\textbf{Epidemiology: Ball Tracking Data Accumulation}$

- Done for __injured__ pitchers only (non-injured cohort still needs to be selected)
- Data pulled from January 1 of the injury year (so spring training is included if present) through the date of injury
    - Some gaps will exist between the last pitch & the injury date if the IL date doesn't exactly match the date of the last pitch

In [2]:
# read the injured-pitcher cohort and parse injury_date strings (YYYY-MM-DD) into datetimes
cohort = (
    pd.read_csv('cohort_data/d3_final_cohort.csv')
    .assign(injury_date=lambda frame: pd.to_datetime(frame['injury_date'], format='%Y-%m-%d'))
)

$\textit{Query Ball Tracking Data}$

In [4]:
# accumulators filled by the query loop below
ball_tracking_data = []     # one dataframe per cohort row whose query returned pitches
no_data_players = []        # cohort rows whose query came back empty or raised

# walk the cohort one (player, injury date) row at a time
# list() is kept from the original for "flexible restart" (presumably so the rows can be sliced to resume)
for i, row in list(cohort.iterrows()):

    # query window: January 1 of the injury year through the injury date (both as strings)
    pitcher_id = row['mlbamid']
    window_end = row['injury_date'].strftime('%Y-%m-%d')
    window_start = f"{row['injury_date'].year}-01-01"

    try:
        pitches = pyb.statcast_pitcher(window_start, window_end, player_id=pitcher_id)

        # an empty result is recorded for reference; anything else is kept
        if pitches.empty:
            no_data_players.append(row)
        else:
            ball_tracking_data.append(pitches)

    except Exception as e:
        # a failed query is reported and recorded as "no data" so the loop keeps going
        print(f"Error retrieving data for {row['mlbamid']}: {e}")
        no_data_players.append(row)

    # progress line, printed after each player's query attempt
    print(f"Finished {i+1} of {len(cohort)} players.")

Gathering Player Data
Finished 1 of 1380 players.
Gathering Player Data
Finished 2 of 1380 players.
Gathering Player Data
Finished 3 of 1380 players.
Gathering Player Data
Finished 4 of 1380 players.
Gathering Player Data
Finished 5 of 1380 players.
Gathering Player Data
Finished 6 of 1380 players.
Gathering Player Data
Finished 7 of 1380 players.
Gathering Player Data
Finished 8 of 1380 players.
Gathering Player Data
Finished 9 of 1380 players.
Gathering Player Data
Finished 10 of 1380 players.
Gathering Player Data
Finished 11 of 1380 players.
Gathering Player Data
Finished 12 of 1380 players.
Gathering Player Data
Finished 13 of 1380 players.
Gathering Player Data
Finished 14 of 1380 players.
Gathering Player Data
Finished 15 of 1380 players.
Gathering Player Data
Finished 16 of 1380 players.
Gathering Player Data
Finished 17 of 1380 players.
Gathering Player Data
Finished 18 of 1380 players.
Gathering Player Data
Finished 19 of 1380 players.
Gathering Player Data
Finished 20 of 138

In [5]:
# stack every per-player dataframe into a single table with a fresh 0..n-1 index
full_ball_tracking = pd.concat(ball_tracking_data, axis=0, ignore_index=True)

# write the raw pull to CSV; the filename suffix records the date of the last update
raw_output_path = 'ball_tracking_data/raw_0805.csv'
full_ball_tracking.to_csv(raw_output_path, index=False)

$\textit{Player Metadata}$

In [10]:
import requests

In [24]:
def get_player_metadata(
        mlbam_id: int,
        url: str = "https://statsapi.mlb.com/api/v1/people",
        timeout: float = 10.0
) -> dict:
    """ 
    Fetch player metadata from MLB API. 
    
    Args:
        mlbam_id (int): MLBAM ID of the player.
        url (str): Base URL for the MLB API.
        timeout (float): Seconds to wait for the API before giving up.
    Returns:
        dict: Player metadata with keys 'mlbam_id', 'full_name', 'height'
            (meters) and 'mass'. 'height' / 'mass' are None when the record
            has no height / weight. Empty dict if the response holds no
            'people' entry.
    Raises:
        requests.exceptions.RequestException: If the request fails or times out.
        ValueError: If the response body is not valid JSON.

    """
    # conversion factor applied to the API's weight value
    # (assumes the API reports weight in pounds, giving mass in kilograms -- TODO confirm)
    lb_to_kg = 0.453592

    # always pass a timeout: without one, requests can wait on a stalled connection indefinitely
    response = requests.get(f"{url}/{mlbam_id}", timeout=timeout)
    data = response.json()

    # no 'people' entry -> nothing to report for this ID
    if 'people' not in data or len(data['people']) == 0:
        return {}

    player_data = data['people'][0]

    # height / weight are read defensively so one incomplete record cannot abort a long batch run
    height = player_data.get('height')
    weight = player_data.get('weight')

    return {
        'mlbam_id': player_data['id'],
        'full_name': player_data['fullName'],
        'height': convert_height_to_meters(height) if height else None,
        'mass': weight * lb_to_kg if weight is not None else None,
    }
    
def convert_height_to_meters(height_str: str) -> float:
    """ 
    Convert height from string format to meters.
    
    Args:
        height_str (str): Height as feet and inches, e.g. 6' 4" -- the inches
            part and the closing double quote may be omitted (e.g. 6').
    Returns:
        float: Height in meters, rounded to 3 decimal places.
    Raises:
        ValueError: If the feet marker (') is missing or a part is not an integer.
    """
    # split on the first feet marker only; anything after it is the inches part
    feet_part, marker, inches_part = height_str.partition("'")
    if not marker:
        raise ValueError(f"Unrecognized height format: {height_str!r}")

    inches_part = inches_part.replace('"', '').strip()
    feet = int(feet_part.strip())
    inches = int(inches_part) if inches_part else 0     # no inches given -> whole feet only

    # 12 inches per foot, 0.0254 meters per inch
    return round((feet * 12 + inches) * 0.0254, 3)


In [None]:
# initialize lists to store player metadata and the IDs whose request failed
player_metadata_list = []
failed_metadata_ids = []

# count pitches per pitcher once, instead of re-filtering the full table for every pitcher
pitch_counts = full_ball_tracking['pitcher'].value_counts()

# loop through all players (in order of first appearance) and get metadata
for pitcher_id in full_ball_tracking['pitcher'].unique():
    # total number of pitches recorded for this pitcher (0 if the ID has no countable rows)
    total_pitches = int(pitch_counts.get(pitcher_id, 0))

    # report and skip a failed request (same handling as the ball tracking query loop)
    # so one bad response does not discard the metadata already collected
    try:
        player_metadata = get_player_metadata(pitcher_id)
    except Exception as e:
        print(f"Error retrieving metadata for {pitcher_id}: {e}")
        failed_metadata_ids.append(pitcher_id)
        continue

    # save metadata if available
    if player_metadata:
        player_metadata['pitches_prior_to_injury'] = total_pitches
        player_metadata_list.append(player_metadata)
    else:
        print(f"No metadata found for pitcher ID: {pitcher_id}")

# create dataframe
player_metadata_df = pd.DataFrame(player_metadata_list)

$\textbf{Upload to S3}$

In [28]:
from connections import AWS

In [29]:
# initialize AWS connection
# AWS is the project-local helper imported from `connections`; its internals are not shown here.
# NOTE(review): the logged output mentions a port check and an RDS endpoint, so connect()
# presumably opens a tunnelled database connection -- confirm against connections.AWS.
aws_connection = AWS()
aws_connection.connect()

[AWS]: Port 5433 is free.
[AWS]: Connected to RDS endpoint.


In [30]:
# run uploads: the concatenated ball tracking table, then the per-pitcher metadata table
# (the second argument is the destination path within the bucket, per the logged s3:// URI)
# NOTE(review): the saved output shows a confirmation for statcast_data.csv only, and the cell
# that builds player_metadata_df has no execution count -- confirm the metadata upload ran.
aws_connection.upload_to_s3(
    full_ball_tracking,
    'epidemiology/cohorts/injured/statcast_data.csv'
)
aws_connection.upload_to_s3(
    player_metadata_df,
    'epidemiology/cohorts/injured/statcast_metadata.csv'
)


[AWS]: Uploaded object to s3://pitch-ml/epidemiology/cohorts/injured/statcast_data.csv


In [32]:
# close connection
# (per the logged output, this closes the database connection and stops the SSH tunnel)
aws_connection.close()

[AWS]: Database connection closed.
[AWS]: SSH tunnel stopped.
