In [1]:
import os
from sqlalchemy import create_engine
import pybaseball as pyb
import pandas as pd
from dotenv import load_dotenv


In [2]:

# Load environment variables from the .env file (DB_USER, DB_PASS, DB_HOST, DB_NAME are read below)
load_dotenv()

# Build the PostgreSQL connection string.
# The user name and password are percent-encoded so that characters with a
# special meaning inside a URL ('@', ':', '/', '%', ...) cannot corrupt it.
from urllib.parse import quote

DB_URL = (
    f"postgresql://{quote(os.environ['DB_USER'], safe='')}:{quote(os.environ['DB_PASS'], safe='')}"
    f"@{os.environ['DB_HOST']}:5432/{os.environ['DB_NAME']}"
)

# Create the engine object for connecting.
# NOTE: create_engine() is lazy -- no connection is opened until the engine is
# first used, so the message below does not prove the credentials are valid.
engine = create_engine(DB_URL)

print("Database connection established.")

Database connection established.


In [None]:
# This is a sample function; your final ETL will be more complex.
def extract_batting_data(year=2025):
    """Pull one season of batting stats and normalise the id/team column names.

    Args:
        year: Season passed to ``pyb.batting_stats``.

    Returns:
        The DataFrame returned by ``pyb.batting_stats`` with the player-id
        column renamed to ``player_id`` and the team column renamed to
        ``team_id`` when a known source column is present. All other columns
        are returned unchanged.
    """
    df = pyb.batting_stats(year)

    # Each target column may appear under more than one source label:
    # the legacy 'ID' / 'Tm' labels are tried first, then the 'IDfg' / 'Team'
    # labels used by pybaseball's FanGraphs tables. Only the first label found
    # is renamed, so a frame can never end up with two columns of the same name.
    aliases = {'player_id': ('ID', 'IDfg'), 'team_id': ('Tm', 'Team')}
    column_map = {}
    for target, candidates in aliases.items():
        source = next((name for name in candidates if name in df.columns), None)
        if source is not None:
            column_map[source] = target

    return df.rename(columns=column_map)

# Run the sample extractor for the 2025 season and bind the resulting DataFrame to current_year_batting.
current_year_batting = extract_batting_data(2025)

In [26]:
def extract_historical_batting(start_year=2004, end_year=2025):
    """Pull batting stats for a range of seasons and combine them.

    Each season is requested separately through ``pyb.batting_stats``. A season
    whose request or processing raises is reported on stdout and skipped, so a
    single failure does not abort the whole pull.

    Args:
        start_year: First season to pull (inclusive).
        end_year: Last season to pull (inclusive).

    Returns:
        One DataFrame holding every successfully pulled season, with a
        ``Season`` column set to the requested year and a few columns renamed
        (``player_name``, ``player_id``, ``games_played``, ``home_runs``,
        ``strikeouts``). An empty DataFrame when no season could be pulled.
    """
    all_data_frames = []

    # Inclusive range of seasons to request
    years = range(start_year, end_year + 1)

    # Adjust this mapping to match the target PostgreSQL schema.
    base_rename = {'Name': 'player_name', 'G': 'games_played',
                   'HR': 'home_runs', 'SO': 'strikeouts'}
    # The player id may be labelled 'PlayerId' or, in pybaseball's FanGraphs
    # tables, 'IDfg'; the first label present becomes 'player_id'.
    player_id_labels = ('PlayerId', 'IDfg')

    for year in years:
        print(f"-> Pulling FanGraphs data for {year}...")
        try:
            # 1. Extraction: pull data for a single year
            df = pyb.batting_stats(year)

            # Tag every row with the season it was requested for
            df['Season'] = year

            # 2. Transformation: rename columns to the schema names
            column_map = dict(base_rename)
            id_label = next((name for name in player_id_labels if name in df.columns), None)
            if id_label is not None:
                column_map[id_label] = 'player_id'
            df = df.rename(columns=column_map)

            # 3. Append: keep this season for the final concat
            all_data_frames.append(df)

        except Exception as e:
            # Best effort: report the failed season and continue with the rest
            print(f"ERROR: Failed to pull data for {year}: {e}")

    # 4. Concatenate: merge all seasons into one DataFrame
    if all_data_frames:
        historical_df = pd.concat(all_data_frames, ignore_index=True)
        # Report the seasons actually combined, not the number requested
        print(f"✅ Successfully combined data from {len(all_data_frames)} seasons into {len(historical_df)} total rows.")
        return historical_df

    return pd.DataFrame() # Return empty if no data was pulled

# Run the historical pull only when this code executes as the main module.
if __name__ == '__main__':
    # Called without arguments, so the function's default start_year/end_year apply.
    full_batting_df = extract_historical_batting()
    
    # ... (Then proceed to your advanced metrics calculation and load_data function) ...

-> Pulling FanGraphs data for 2004...
-> Pulling FanGraphs data for 2005...
-> Pulling FanGraphs data for 2006...
-> Pulling FanGraphs data for 2007...
-> Pulling FanGraphs data for 2008...
-> Pulling FanGraphs data for 2009...
-> Pulling FanGraphs data for 2010...
-> Pulling FanGraphs data for 2011...
-> Pulling FanGraphs data for 2012...
-> Pulling FanGraphs data for 2013...
-> Pulling FanGraphs data for 2014...
-> Pulling FanGraphs data for 2015...
-> Pulling FanGraphs data for 2016...
-> Pulling FanGraphs data for 2017...
-> Pulling FanGraphs data for 2018...
-> Pulling FanGraphs data for 2019...
-> Pulling FanGraphs data for 2020...
-> Pulling FanGraphs data for 2021...
-> Pulling FanGraphs data for 2022...
-> Pulling FanGraphs data for 2023...
-> Pulling FanGraphs data for 2024...
-> Pulling FanGraphs data for 2025...
✅ Successfully combined data from 22 seasons into 3184 total rows.


In [3]:
import pybaseball as pyb
import pandas as pd
from datetime import date, timedelta

def extract_statcast_data(start_date, end_date):
    """Fetch Statcast rows for a date range through ``pyb.statcast``.

    Args:
        start_date: Value forwarded as ``start_dt``.
        end_date: Value forwarded as ``end_dt``.

    Returns:
        The DataFrame returned by ``pyb.statcast`` when it contains rows;
        otherwise a new empty DataFrame (a warning is printed in that case).
    """
    print(f"-> Pulling Statcast data from {start_date} to {end_date}...")

    pitches = pyb.statcast(start_dt=start_date, end_dt=end_date)

    # Happy path first: hand back whatever the library returned.
    has_rows = pitches is not None and not pitches.empty
    if has_rows:
        return pitches

    # Nothing usable came back (None or zero rows).
    print("Warning: No Statcast data returned for this date range.")
    return pd.DataFrame()


# Example: request Statcast rows for 2025-10-28 through 2025-10-30.
test_start_date = '2025-10-28'
test_end_date = '2025-10-30' 

# extract_statcast_data returns an empty DataFrame when nothing comes back,
# so len() below works in every case.
daily_data = extract_statcast_data(test_start_date, test_end_date)
print(f"Successfully extracted {len(daily_data)} individual pitches/events.")

-> Pulling Statcast data from 2025-10-28 to 2025-10-30...
This is a large query, it may take a moment to complete


  data_copy[column] = data_copy[column].apply(pd.to_datetime, errors='ignore', format=date_format)
  data_copy[column] = data_copy[column].apply(pd.to_datetime, errors='ignore', format=date_format)
100%|██████████| 3/3 [00:02<00:00,  1.48it/s]

Successfully extracted 581 individual pitches/events.



  final_data = pd.concat(dataframe_list, axis=0).convert_dtypes(convert_string=False)


In [None]:
import pybaseball as pyb
import pandas as pd
from datetime import date, timedelta
from sqlalchemy.engine import Engine
import pybaseball.cache

# Enable pybaseball caching to speed up repeated queries
pybaseball.cache.enable()

# This cell does not create the database engine; it relies on the `engine`
# object built from DB_URL in an earlier cell. Equivalent statement, for reference:
# engine = create_engine(DB_URL) 

def update_statcast_data(engine: Engine, days_to_keep: int = 400, if_exists: str = 'append'):
    """
    Pull the most recent ``days_to_keep`` days of Statcast data and load them
    into the ``statcast_pitches`` table.

    The window always runs from ``today - days_to_keep`` to yesterday. With the
    default ``if_exists='append'`` every call appends that whole window again,
    so calling it repeatedly duplicates rows; pass ``if_exists='replace'`` to
    rebuild the table so that it holds only the current window.

    Args:
        engine: Connection target handed to ``DataFrame.to_sql``.
        days_to_keep: Size of the look-back window in days.
        if_exists: Forwarded to ``DataFrame.to_sql`` ('append', 'replace' or 'fail').

    Returns:
        The DataFrame that was written, or ``None`` when no data was retrieved
        or when extraction/loading failed (the error is printed, not raised).
    """
    today = date.today()

    # Window: from `days_to_keep` days ago up to yesterday.
    start_date = today - timedelta(days=days_to_keep)
    end_date = today - timedelta(days=1)

    start_dt_str = start_date.strftime('%Y-%m-%d')
    end_dt_str = end_date.strftime('%Y-%m-%d')

    print(f"Starting Statcast ETL: Pulling data from {start_dt_str} to {end_dt_str}")

    try:
        df = pyb.statcast(start_dt=start_dt_str, end_dt=end_dt_str)

        # Treat a missing result like an empty one instead of failing on `.empty`.
        if df is None or df.empty:
            print("No new Statcast data retrieved. Exiting.")
            return None

        # 1. Cleaning/Selection
        # Rename only the columns whose table name differs from the source name.
        # Statcast publishes spin rate as 'release_spin_rate'.
        df_clean = df.rename(columns={
            'batter': 'batter_id',
            'pitcher': 'pitcher_id',
            'release_spin_rate': 'spin_rate',
        })

        # Only keep the columns that exist in the SQL schema; columns missing
        # from the source are skipped rather than raising.
        schema_columns = {
            'game_date', 'game_pk', 'inning', 'batter_id', 'pitcher_id', 'stand',
            'p_throws', 'events', 'description', 'launch_speed', 'launch_angle',
            'bb_type', 'pitch_type', 'release_speed', 'spin_rate',
        }
        columns_to_keep = [col for col in df_clean.columns if col in schema_columns]
        final_df = df_clean[columns_to_keep]

        # 2. Loading
        final_df.to_sql('statcast_pitches', engine, if_exists=if_exists, index=False, chunksize=5000)

        return final_df
    except Exception as e:
        # Best effort: report the failure and fall through to return None.
        print(f"❌ Statcast ETL Failed: {e}")
    return None

# Example run. `engine` must already exist (it is created from DB_URL in an earlier cell).
# The result is the DataFrame that was loaded, or None when nothing was retrieved or the ETL failed.
statcast_data_df = update_statcast_data(engine)

Starting Statcast ETL: Pulling data from 2024-11-03 to 2025-12-07
This is a large query, it may take a moment to complete
Skipping offseason dates
Skipping offseason dates


100%|██████████| 259/259 [00:11<00:00, 21.78it/s]
  final_data = pd.concat(dataframe_list, axis=0).convert_dtypes(convert_string=False)


In [6]:
import pybaseball as pyb
import pandas as pd
from datetime import date, timedelta
from sqlalchemy.engine import Engine
from sqlalchemy import text 
import pybaseball.cache # Ensure caching is imported

# Enable pybaseball's cache. An earlier cell makes the same call; it is repeated
# here so this cell does not depend on that one having been run.
pybaseball.cache.enable()

def update_statcast_data(engine: Engine, initial_load_days: int = 400, fallback_days: int = 5):
    """
    Incrementally load Statcast data into the ``statcast_pitches`` table.

    The newest ``game_date`` already stored is looked up first; data is then
    pulled from the day AFTER that date up to yesterday and appended.

    Args:
        engine: SQLAlchemy engine used both for the lookup query and the load.
        initial_load_days: Look-back window used when the table has no rows.
        fallback_days: Look-back window used when the lookup query fails
            (for example because the table does not exist yet). Rows appended
            from this window may overlap rows that are already stored.

    Returns:
        None. Progress and errors are reported on stdout; failures during
        extraction or loading are caught and printed, not raised.
    """

    today = date.today()

    # --- STEP 1: FIND LAST DATE IN DB ---
    try:
        # Query the database to find the latest game_date currently stored
        with engine.connect() as connection:
            result = connection.execute(
                text("SELECT MAX(game_date) FROM statcast_pitches;")
            ).scalar()

        # If the table is empty, start from the initial-load window
        if result is None:
            print(f"Database is empty. Starting full initial load ({initial_load_days} days)...")
            last_date = today - timedelta(days=initial_load_days)
        else:
            # Depending on the column type the driver may return a datetime,
            # a date or a string; pd.Timestamp accepts all three.
            last_date = pd.Timestamp(result).date()
            print(f"Latest game_date found in DB: {last_date.strftime('%Y-%m-%d')}")

    except Exception as e:
        print(f"❌ ERROR querying database for last date: {e}. Defaulting to last {fallback_days} days.")
        last_date = today - timedelta(days=fallback_days)

    # --- STEP 2: DEFINE NEW EXTRACTION RANGE ---
    start_date = last_date + timedelta(days=1)
    end_date = today - timedelta(days=1) # Pull up to yesterday, as today's games aren't finished

    start_dt_str = start_date.strftime('%Y-%m-%d')
    end_dt_str = end_date.strftime('%Y-%m-%d')

    # Strictly greater: when start_date == end_date exactly one day (yesterday)
    # is still missing and must be pulled.
    if start_date > end_date:
        print(f"Data is up to date as of {end_dt_str}. No new extraction needed.")
        return

    print(f"Starting DAILY Statcast ETL: Pulling data from {start_dt_str} to {end_dt_str}")

    # --- STEP 3: EXTRACTION ---
    try:
        df = pyb.statcast(start_dt=start_dt_str, end_dt=end_dt_str)

        if df is None or df.empty:
            print("No new Statcast data retrieved for this date range. Exiting.")
            return

        # --- STEP 4: CLEANING AND RENAMING ---

        # Rename the columns whose table name differs from the source name.
        # Statcast publishes spin rate as 'release_spin_rate'.
        df_clean = df.rename(columns={
            'batter': 'batter_id',
            'pitcher': 'pitcher_id',
            'release_spin_rate': 'spin_rate',
        })

        # Only keep the columns that exist in our SQL schema
        columns_to_keep = [
            'game_date', 'game_pk', 'inning', 'batter_id', 'pitcher_id', 'stand', 'p_throws',
            'events', 'description', 'launch_speed', 'launch_angle', 'bb_type', 'pitch_type',
            'release_speed', 'spin_rate'
        ]

        # Strict selection: a column missing from the source raises KeyError,
        # which is reported by the handler below.
        final_df = df_clean[columns_to_keep].copy()

        # Normalise the date column before loading
        final_df['game_date'] = pd.to_datetime(final_df['game_date'])

        # --- STEP 5: LOADING ---
        print(f"Loading {len(final_df)} new rows into 'statcast_pitches'...")

        final_df.to_sql(
            'statcast_pitches',
            engine,
            if_exists='append', # Append new data to the existing table
            index=False,
            chunksize=5000
        )

        print(f"✅ Successfully appended {len(final_df)} new rows of Statcast data.")

    except Exception as e:
        print(f"❌ Statcast ETL Failed during extraction or loading: {e}")
        



In [7]:
# Run the incremental Statcast load using the `engine` created earlier in the notebook.
update_statcast_data(engine)

Latest game_date found in DB: 2025-11-01
Starting DAILY Statcast ETL: Pulling data from 2025-11-02 to 2025-12-07
This is a large query, it may take a moment to complete
Skipping offseason dates


100%|██████████| 14/14 [00:02<00:00,  6.00it/s]

No new Statcast data retrieved for this date range. Exiting.



