In [14]:
import pandas as pd 
import numpy as np 
import matplotlib.pyplot as plt 
import seaborn as sns
import math

import requests
import json

import pybaseball
from pybaseball import statcast
pybaseball.cache.enable()

import os
import glob
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)


import sys
import time

import matplotlib.pyplot as plt

%matplotlib inline
import numpy as np
import pandas as pd
from IPython.display import HTML

sys.path.append("code/.")

import mglearn
from IPython.display import display

# Classifiers and regressors
from sklearn.dummy import DummyClassifier, DummyRegressor

# Preprocessing and pipeline
from sklearn.impute import SimpleImputer

# train test split and cross validation
from sklearn.model_selection import cross_val_score, cross_validate, train_test_split
from sklearn.neighbors import KNeighborsClassifier, KNeighborsRegressor
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import (
    MinMaxScaler,
    OneHotEncoder,
    OrdinalEncoder,
    StandardScaler,
)
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier

pd.set_option("display.max_colwidth", 200)

In [15]:
# the 'full_training_data.csv' is generated in a different file (by Marcus) 
# only read if there is any file missing 
def read_csv_if_missing(regen = False):
    '''
    load the full training table only when the derived metric files need (re)building

    regen: pass True to force the read even if every derived file is already on disk

    returns the DataFrame read from 'data_2014_to_2024/full_training_data.csv',
    or None when all 20 rolling-average files and the per-game file exist
    and regen was not requested
    '''
    expected_outputs = [
        f'data_2014_to_2024/metric_avg_game_{n}.csv' for n in range(1, 21)
    ]
    expected_outputs.append('data_2014_to_2024/metric_each_game.csv')

    needs_source = regen
    if not all(os.path.exists(output) for output in expected_outputs):
        needs_source = True

    # nothing to rebuild, so skip the (large) read entirely
    if needs_source == False:
        return None

    return pd.read_csv('data_2014_to_2024/full_training_data.csv', low_memory=False)

In [16]:
def keep_n_prev_games(df, n_games):
    '''
    restrict df to the n_games most recent previous games

    a column whose last '_'-separated token is a number is treated as per-game data
    and is kept only when that number is <= n_games;
    every other column is kept unconditionally

    the returned frame lists the retained per-game columns first,
    followed by the remaining columns, each group in its original order
    '''
    per_game_kept = []
    always_kept = []

    for name in df.columns:
        tail = name.split('_')[-1]
        if not tail.isdigit():
            always_kept.append(name)
        elif int(tail) <= n_games:
            per_game_kept.append(name)

    return df[per_game_kept + always_kept]


In [17]:
def calc_sum_stats(df):
    '''
    collapse the per-game counting stats of every player slot into one total per stat

    input colname style:  f'{stat}_{pos}_{team}_{nth_game}'
    output colname style: f'{stat}_{pos}_{team}_total'

    for each team (away/home), each batter slot (batter1~9) and the pitcher slot,
    every column starting with f'{stat}_{pos}_{team}_' is summed row-wise
    (DataFrame.sum default, so missing values are skipped) into the total column.
    afterwards all columns ending in a game number are dropped.

    returns a new DataFrame; the input df is not modified
    '''
    stats_by_role = {
        'batter': ['ab','bb','hbp','single','double','triple','hr','sf'],
        'pitcher': ['n-pitches','ip','er','k','bb','h'],
    }

    totals = df.copy()
    source_columns = list(df.columns)

    for team in ('away', 'home'):
        # batter1..batter9 first, then the single pitcher slot (this fixes the output column order)
        slots = [('batter', f'batter{order}') for order in range(1, 10)]
        slots.append(('pitcher', 'pitcher'))

        for role, slot in slots:
            for stat in stats_by_role[role]:
                stem = f"{stat}_{slot}_{team}_"
                per_game = [name for name in source_columns if name.startswith(stem)]
                totals.loc[:, stem + "total"] = df[per_game].sum(axis=1)

    # the per-game inputs (suffix is the game number) are no longer needed
    per_game_columns = [
        name for name in totals.columns
        if name.split('_')[-1].isdigit()
    ]
    return totals.drop(columns=per_game_columns)



In [18]:
# batting 
def calc_ops(ab,bb,hbp,single,double,triple,hr,sf):
    '''
    on-base plus slugging (OBP + SLG) of a single batter

    for a rolling value over the last n games, pass the TOTAL of each stat
    accumulated over those games

    returns np.nan if any argument is None or a float NaN;
    an OBP or SLG whose denominator is 0 contributes 0
    '''
    for value in (ab, bb, hbp, single, double, triple, hr, sf):
        if value is None:
            return np.nan
        if isinstance(value, float) and math.isnan(value):
            return np.nan

    hits = single+double+triple+hr
    plate_appearances = ab + bb + hbp + sf

    on_base = (hits + bb + hbp) / plate_appearances if plate_appearances != 0 else 0
    slugging = (single + 2 * double + 3 * triple + 4 * hr) / ab if ab != 0 else 0

    return on_base + slugging

def calc_ba(ab,bb,hbp,single,double,triple,hr,sf):
    '''
    batting average (hits / at-bats) of a single batter

    takes the same eight stats as the other batting helpers so they can be called
    interchangeably; np.nan is returned if ANY of them is None or a float NaN,
    and 0 is returned when ab is 0
    '''
    stats = (ab, bb, hbp, single, double, triple, hr, sf)
    absent = [s for s in stats if s is None or (isinstance(s, float) and math.isnan(s))]
    if absent:
        return np.nan

    if ab == 0:
        return 0

    hits = single+double+triple+hr
    return hits/ab

def calc_obp(ab,bb,hbp,single,double,triple,hr,sf):
    '''
    on-base percentage of a single batter:
    (hits + bb + hbp) / (ab + bb + hbp + sf)

    returns np.nan if any argument is None or a float NaN,
    and 0 when the denominator is 0
    '''
    def _absent(value):
        return value is None or (isinstance(value, float) and math.isnan(value))

    if any(map(_absent, (ab, bb, hbp, single, double, triple, hr, sf))):
        return np.nan

    hits = single+double+triple+hr
    opportunities = ab + bb + hbp + sf
    if opportunities == 0:
        return 0
    return (hits + bb + hbp) / opportunities

def calc_slg(ab,bb,hbp,single,double,triple,hr,sf):
    '''
    slugging percentage of a single batter:
    (single + 2*double + 3*triple + 4*hr) / ab

    returns np.nan if any argument is None or a float NaN (including the ones
    the formula itself does not use), and 0 when ab is 0
    '''
    for stat in (ab, bb, hbp, single, double, triple, hr, sf):
        is_nan_float = isinstance(stat, float) and math.isnan(stat)
        if stat is None or is_nan_float:
            return np.nan

    if ab != 0:
        total_bases = single + 2 * double + 3 * triple + 4 * hr
        return total_bases / ab
    return 0

# pitching

def calc_era(er,ip):
    '''
    earned run average of a single pitcher: (er / ip) * 9

    for a rolling value, pass the TOTAL earned runs and innings pitched
    accumulated over the games of interest

    returns np.nan if either argument is None or a float NaN;
    an ip of 0 is replaced by 0.33 so the division stays defined
    '''
    def _absent(value):
        return value is None or (isinstance(value, float) and math.isnan(value))

    if _absent(er) or _absent(ip):
        return np.nan

    innings = 0.33 if ip == 0 else ip
    return (er/innings)*9

def calc_k9(k,ip):
    '''
    strikeouts per nine innings of a single pitcher: (k / ip) * 9

    returns np.nan if either argument is None or a float NaN;
    an ip of 0 is replaced by 0.33 so the division stays defined
    '''
    for value in (k, ip):
        if value is None or (isinstance(value, float) and math.isnan(value)):
            return np.nan

    innings = ip
    if innings == 0:
        innings = 0.33
    per_inning = k/innings
    return per_inning*9

# def calc_bb9(bb,ip):
#     inputs = [bb, ip]
#     if any(x is None or (isinstance(x, float) and math.isnan(x)) for x in inputs):
#         return np.nan 
    
#     if ip == 0: 
#         ip = 0.33
#     return (bb/ip)*9

def calc_whip(bb,h,ip):
    '''
    walks plus hits per inning pitched of a single pitcher: (bb + h) / ip

    returns np.nan if any argument is None or a float NaN;
    an ip of 0 is replaced by 0.33 so the division stays defined
    '''
    absent = [
        value for value in (bb, h, ip)
        if value is None or (isinstance(value, float) and math.isnan(value))
    ]
    if len(absent) > 0:
        return np.nan

    baserunners = bb+h
    innings = 0.33 if ip == 0 else ip
    return baserunners/innings





In [19]:
def _row_metric(df, func, columns):
    '''
    evaluate the scalar helper `func` once per row of df

    only the listed columns are read; their values are passed to func
    positionally, in the order given. returns a float array with one entry per row
    '''
    rows = df[columns].itertuples(index=False, name=None)
    return np.fromiter((func(*values) for values in rows), dtype=float, count=len(df))


def make_metrics(df):
    '''
    turn counting stats into rate metrics for every player slot

    expected input colname style: f'{stat}_{pos}_{team}_{game}', where game is the
    numeric game suffix found in df.columns, or 'total' when no column has a
    numeric suffix (i.e. df is the output of calc_sum_stats)

    for each team (away/home) and each game suffix this adds
      - batavg / obp / slg for batter1~9 (via calc_ba / calc_obp / calc_slg)
      - era / k9 / whip for the pitcher   (via calc_era / calc_k9 / calc_whip)

    returns a NEW DataFrame holding the computed metric columns (as floats),
    any input column whose name already contains one of the metric names,
    and the target 'home_outcome' as the last column. df itself is not modified.

    raises KeyError if 'home_outcome' or a required stat column is missing
    '''
    batter_inputs = ['ab', 'bb', 'hbp', 'single', 'double', 'triple', 'hr', 'sf']
    metric_names = ['batavg', 'obp', 'slg', 'era', 'k9', 'whip']

    home_outcome = df['home_outcome']

    # iterate over the suffixes that actually exist instead of assuming 1..n
    game_numbers = sorted({
        int(col.split('_')[-1])
        for col in df.columns
        if col.split('_')[-1].isdigit()
    })
    games = game_numbers if game_numbers else ['total']

    # collect everything first and build the frame once: inserting hundreds of
    # columns one at a time fragments the DataFrame and is much slower
    computed = {}
    for team in ['away', 'home']:
        for game in games:
            for order in range(1, 10):
                player = f'batter{order}_{team}_{game}'
                inputs = [f'{stat}_{player}' for stat in batter_inputs]
                computed[f'batavg_{player}'] = _row_metric(df, calc_ba, inputs)
                computed[f'obp_{player}'] = _row_metric(df, calc_obp, inputs)
                computed[f'slg_{player}'] = _row_metric(df, calc_slg, inputs)

            player = f'pitcher_{team}_{game}'
            computed[f'era_{player}'] = _row_metric(
                df, calc_era, [f'er_{player}', f'ip_{player}'])
            # K/9 needs strikeouts and innings pitched: calc_k9(k, ip)
            computed[f'k9_{player}'] = _row_metric(
                df, calc_k9, [f'k_{player}', f'ip_{player}'])
            computed[f'whip_{player}'] = _row_metric(
                df, calc_whip, [f'bb_{player}', f'h_{player}', f'ip_{player}'])

    metrics = pd.DataFrame(computed, index=df.index)

    # drop all original cols, except those that already look like a metric
    carried_over = [
        col for col in df.columns
        if col not in computed and any(metric in col for metric in metric_names)
    ]
    df_ops_era_cleaned = pd.concat([df[carried_over], metrics], axis=1)

    # add back target
    df_ops_era_cleaned['home_outcome'] = home_outcome # y: if home wins = 1, home lose = 0
    return df_ops_era_cleaned

---
# Making CSV files
## 1. Rolling averages over the last n games

In [20]:
def get_n_games_rolling_avg(df,n_games):
    '''
    rolling metrics over the last n_games previous games

    pipeline: keep_n_prev_games (select the per-game columns up to n_games)
              -> calc_sum_stats (sum them into '_total' columns)
              -> make_metrics   (convert the totals into rate metrics)
    '''
    return make_metrics(calc_sum_stats(keep_n_prev_games(df, n_games)))


In [21]:
# generate dfs and export to csvs
def rolling_avg_metrics_to_csv(df=None):
    '''
    create every missing 'data_2014_to_2024/metric_avg_game_{n}.csv' for n = 1..20

    df: optional source table to compute from. when omitted, a notebook-level
        variable named `df` is used if one exists (the previous behaviour);
        otherwise the table is loaded with read_csv_if_missing(regen=True).
        the source is only resolved once a missing file is found, so nothing
        is loaded when all files are already present.

    returns {n: DataFrame} for the files generated by this call,
    or None (after printing a notice) when no file had to be generated
    '''
    dict_n_games = {}
    for key in range(1,21):
        path = f'data_2014_to_2024/metric_avg_game_{key}.csv'
        if os.path.exists(path):
            continue

        # resolve the source lazily so a fresh kernel does not fail with NameError
        if df is None:
            df = globals().get('df')
        if df is None:
            df = read_csv_if_missing(regen=True)

        this_df = get_n_games_rolling_avg(df, key)
        dict_n_games[key] = this_df
        this_df.to_csv(path)

    if len(dict_n_games)==0:
        print("all required files are in the folder.")
        return None
    return dict_n_games
            


In [22]:
# Generate whichever of the 20 rolling-average CSVs are not on disk yet;
# prints a notice and returns None when none had to be generated.
rolling_avg_metrics_to_csv()

all required files are in the folder.


## 2. Metrics for each previous game (a single file)

In [23]:
# generate just one df and export to csv
# they contain data of every previous game (if there are any)
def each_game_metrics_to_csv(df=None):
    '''
    create 'data_2014_to_2024/metric_each_game.csv' (metrics of every previous game)
    unless it already exists

    df: optional source table to compute from. when omitted, a notebook-level
        variable named `df` is used if one exists (the previous behaviour);
        otherwise the table is loaded with read_csv_if_missing(regen=True).
        the source is only resolved when the file actually has to be built.

    returns the generated DataFrame, or None (after printing a notice)
    when the file is already present
    '''
    path = 'data_2014_to_2024/metric_each_game.csv'
    if os.path.exists(path):
        print('the file is already in the folder.')
        return None

    # resolve the source lazily so a fresh kernel does not fail with NameError
    if df is None:
        df = globals().get('df')
    if df is None:
        df = read_csv_if_missing(regen=True)

    # warning: computing the metrics took 455 min the last time it was run!
    this_df = make_metrics(df)
    this_df.to_csv(path)
    return this_df

In [24]:
# Build the per-game metrics CSV if it is not on disk yet;
# prints a notice and returns None when the file already exists.
each_game_metrics_to_csv()

the file is already in the folder.
