# Baseball pitcher WAR calculation using Statcast data

by Lau Sze Yui (13/2/2019)

Here is an example on how to calculate Wins Over Replacement (WAR) using Statcast data.

[Baseball Reference](https://www.baseball-reference.com/about/war_explained.shtml) and [Fangraphs](https://library.fangraphs.com/misc/war/) both provide a detail disucssion and calculation steps for WAR. It's recommanded to read both sites before getting back to this calculation.

To retreive Statcast data from [Baseballsavant](https://baseballsavant.mlb.com/), python package [pybaseball](https://github.com/jldbc/pybaseball) is used. It can be installed via

```
pip install pybaseball
```

Sample script of downloading Statcast database to your computer and stored in SQL format is as follow:

In [None]:
import pybaseball

data_April = pybaseball.statcast(start_dt='2018-03-29', end_dt='2018-04-30')
data_May = pybaseball.statcast(start_dt='2018-05-01', end_dt='2018-06-01')
data_June = pybaseball.statcast(start_dt='2018-06-02', end_dt='2018-06-30')
data_July = pybaseball.statcast(start_dt='2018-07-01', end_dt='2018-08-01')
data_August = pybaseball.statcast(start_dt='2018-08-02', end_dt='2018-08-30')
data_Septemeber = pybaseball.statcast(start_dt='2018-08-31', end_dt='2018-10-01')

The following calculation assumes that Statcast_data is put in a table named 'statcast_year', e.g. data corresponding to year 2018 is
```
statcast_2018
```


In [None]:
import psycopg2 #postgreSQL
#import sqlite3 #SQLite
#import pymysql #MySQL
import sqlalchemy 


conn = psycopg2.connect("dbname='dbname' user='username' host='localhost' password='password'") #postgreSQL
#conn = sqlite3.connect('dbname.db') #SQLite
#conn  =  pymysql.connect (host='127.0.0.1',user='username',passwd = "password" ,db = 'dbname') #MySQL

cur = conn.cursor()

engine = sqlalchemy.create_engine('dialect://username:dialect@localhost/dbname', echo=False)

In [None]:
data_April.to_sql('statcast_2018', con=engine, if_exists='repalce')
data_May.to_sql('statcast_2018', con=engine, if_exists='append')
data_June.to_sql('statcast_2018', con=engine, if_exists='append')
data_July.to_sql('statcast_2018', con=engine, if_exists='append')
data_August.to_sql('statcast_2018', con=engine, if_exists='append')
data_Septemeber.to_sql('statcast_2018', con=engine, if_exists='append')

In [None]:
import pandas as pd
import numpy as np

The first things we do here is to calculate the number of innings pitched by each pither. We now collect events that contribute to 1,2 or 3 outs and sum together.

One inning is defined by three outs made so it would be a weighted sum of number of events. Also we would like to calculate number of games and number of starts in those games as well.

In [None]:
query = '''
SELECT 
pitcher,COUNT(DISTINCT(game_pk)) AS game_count



FROM statcast_2018

GROUP  BY pitcher,game_pk
'''
statcast_game_count = pd.read_sql(query,engine)

In [None]:
query = '''
SELECT 
pitcher,COUNT(DISTINCT(game_pk)) AS game_start



FROM statcast_2018
WHERE inning = 1 and outs_when_up = 0 AND pitch_number = 1 AND at_bat_number= 1
GROUP  BY pitcher,game_pk
'''
statcast_game_start = pd.read_sql(query,engine)

In [None]:
query = '''
SELECT 
pitcher,COUNT(pitcher) AS count_one



FROM statcast_2018
WHERE pitcher IS NOT NULL AND events NOT LIKE '%%double_play%%' AND events NOT LIKE '%%triple_play%%'
AND (events LIKE '%%out%%' OR events LIKE '%%caught%%' OR events LIKE '%%sac%%' OR events LIKE '%%choice%%')
GROUP  BY pitcher,outs_when_up,inning,game_pk
'''
statcast_all_inning_one = pd.read_sql(query,engine)

In [None]:
query = '''
SELECT 
pitcher,COUNT(pitcher) AS count_two



FROM statcast_2018
WHERE pitcher IS NOT NULL AND events  LIKE '%%double_play%%' AND events NOT LIKE '%%triple_play%%'
GROUP  BY pitcher,outs_when_up,inning,game_pk
'''
statcast_all_inning_two = pd.read_sql(query,engine)

In [None]:
query = '''
SELECT 
pitcher,COUNT(pitcher) AS count_three



FROM statcast_2018
WHERE pitcher IS NOT NULL AND events LIKE '%%triple_play%%'
GROUP  BY pitcher,outs_when_up,inning,game_pk
'''
statcast_all_inning_three = pd.read_sql(query,engine)

In [None]:
pitcher_one_out = statcast_all_inning_one.groupby(['pitcher']).count()['count_one']
pitcher_two_out = statcast_all_inning_two.groupby(['pitcher']).count()['count_two']
pitcher_three_out = statcast_all_inning_three.groupby(['pitcher']).count()['count_three']
pitcher_game_count = statcast_game_count.groupby(['pitcher']).count()['game_count']
pitcher_game_start = statcast_game_start.groupby(['pitcher']).count()['game_start']

pitcher_inning_df = pd.concat([pitcher_one_out,pitcher_two_out,pitcher_three_out,pitcher_game_count,pitcher_game_start],axis=1).fillna(0)
pitcher_inning_df['IP'] = (pitcher_inning_df['count_one'] + 2*pitcher_inning_df['count_two'] + 3*pitcher_inning_df['count_three'])/3

In [None]:
pitcher_inning_df = pitcher_inning_df[['game_count','game_start','IP']]
pitcher_inning_df.head()

The next thing we do is to collect expected weighted On Base Average(xwoba) data from Statcast database combining data with walks and strikeout. 

The standard wOBA formula is as follow:

$$wOBA = \frac{0.7 * BB + 0.9 * 1B + 1.25 * 2B + 1.6 * 3B + 2 * HR} {PA}$$

What Statcast does is that the system track the exit velocity and exit angle of eached batted ball and assign a expected wOBA value on it.

[Breakdown of estimated wOBA on Baseballsavant](https://baseballsavant.mlb.com/statcast_hit_probability)

In [None]:
query = '''
SELECT pitcher,estimated_woba_using_speedangle,woba_value,woba_denom,at_bat_number,game_pk,home_team,away_team,
CASE
    WHEN inning_topbot = 'Top' THEN home_team
    ELSE away_team
END AS pitch_team,
CASE WHEN estimated_woba_using_speedangle IS NULL THEN woba_value
     ELSE estimated_woba_using_speedangle
     END AS xwoba
     
 FROM statcast_2018
WHERE  woba_denom = 1 AND (events != 'intentional_walk' AND events != 'catcher_interf')




'''
statcast_pitcher_pitch_woba = pd.read_sql(query,engine)

Batting performance in baseball are dependent on the stadium the team play due to altitude, dimensions or other factors.

It is commonly referred as 'park factor'. The formula is simply:

$$Park Factor = \frac{xwOBA\, at\, home\, + opponent\, xwOBA\, at\, home}
{xwOBA\, at\, away\, + opponent\, xwOBA\, at\, away} * 100$$

To eliminate sample size effect, three years worth of data is used.

In [None]:
#Select one year data only

# query = '''

# SELECT home_team,away_team,
# CASE WHEN estimated_woba_using_speedangle IS NULL THEN woba_value
#      ELSE estimated_woba_using_speedangle
#      END AS xwoba

#  FROM statcast_2018
# WHERE  woba_denom = 1 AND (events != 'intentional_walk' AND events != 'catcher_interf')

# '''


# statcast_park_factor = pd.read_sql(query,engine)


In [None]:
query = '''
SELECT home_team,away_team,
CASE WHEN estimated_woba_using_speedangle IS NULL THEN woba_value
     ELSE estimated_woba_using_speedangle
     END AS xwoba

 FROM statcast_2016 
WHERE  woba_denom = 1 AND (events != 'intentional_walk' AND events != 'catcher_interf')

UNION ALL

SELECT home_team,away_team,
CASE WHEN estimated_woba_using_speedangle IS NULL THEN woba_value
     ELSE estimated_woba_using_speedangle
     END AS xwoba

 FROM statcast_2017 
WHERE  woba_denom = 1 AND (events != 'intentional_walk' AND events != 'catcher_interf')

UNION ALL

SELECT home_team,away_team,
CASE WHEN estimated_woba_using_speedangle IS NULL THEN woba_value
     ELSE estimated_woba_using_speedangle
     END AS xwoba

 FROM statcast_2018
WHERE  woba_denom = 1 AND (events != 'intentional_walk' AND events != 'catcher_interf')

'''


statcast_park_factor = pd.read_sql(query,engine)



In [None]:
home_xwoba = statcast_park_factor['xwoba'].groupby(statcast_pitcher_pitch_woba['home_team']).mean()
away_xwoba = statcast_park_factor['xwoba'].groupby(statcast_pitcher_pitch_woba['away_team']).mean()
park_factor = home_xwoba/away_xwoba*100

Also, American League in MLB adopted Designated Hitter (DH) rule which pitcher can be replaced by player when at bat, and thus AL as a better batting stat overall. 

We would like to consider the effect as well by adjusting to league average, or weighted average by Plate Appearance if the player has switch league during the season.

In [None]:
AL_team = ['BAL','BOS','CLE','CWS','DET','HOU','KC','LAA','MIN','NYY','OAK','SEA','TB','TEX','TOR']
NL_team = ['ARI','ATL','CHC','CIN','COL','LAD','MIA','MIL','NYM','PHI','PIT','SD','SF','STL','WSH']

AL_average = np.average(statcast_pitcher_pitch_woba[statcast_pitcher_pitch_woba['pitch_team'].isin(AL_team)]['xwoba'])
NL_average = np.average(statcast_pitcher_pitch_woba[statcast_pitcher_pitch_woba['pitch_team'].isin(NL_team)]['xwoba'])

AL_average_df = pd.DataFrame({'pitch_team':AL_team,'league_avg': AL_average, 'AL_team':1})
NL_average_df = pd.DataFrame({'pitch_team':NL_team,'league_avg': NL_average, 'AL_team':0})

statcast_pitcher_pitch_woba = statcast_pitcher_pitch_woba.merge(pd.DataFrame({'park_factor':  park_factor}),on='home_team')
statcast_pitcher_pitch_woba = statcast_pitcher_pitch_woba.merge(pd.concat([AL_average_df,NL_average_df]),on='pitch_team')

In [None]:
statcast_pitcher_pitch_woba

Then we will group xwOBA value of each pitcheres and calculate Runs Above Average (RAA) using the following formula:

$$ RAA = \frac{((wOBA*PF/100–lgAvg)}{wOBA Scale} * PA$$

[The calculation of wOBA scale is described in detail by Fangraphs here](https://library.fangraphs.com/principles/linear-weights/) To simplified the calculation, we assume wOBA scale = 1.2 here. 

In [None]:
statcast_pitcher_xwoba = statcast_pitcher_pitch_woba.groupby(['pitcher'])['xwoba'].mean()
pitcher_park_factor = statcast_pitcher_pitch_woba.groupby(['pitcher'])['park_factor'].mean()
pitcher_league_avg = statcast_pitcher_pitch_woba.groupby(['pitcher'])['league_avg'].mean()
pitcher_PA_count = statcast_pitcher_pitch_woba.groupby(['pitcher'])['league_avg'].count()
pitcher_AL_prop = statcast_pitcher_pitch_woba.groupby(['pitcher'])['AL_team'].mean()

In [None]:
pitcher_raa = -(statcast_pitcher_xwoba * pitcher_park_factor/100 - pitcher_league_avg )/1.2 * pitcher_PA_count

In [None]:
np.max(pitcher_raa)

In [None]:
pitcher_run_out = pd.DataFrame({'RAA':pitcher_raa,'PF':pitcher_park_factor,'league_avg':pitcher_league_avg,'AL_prop':pitcher_AL_prop})
pitcher_run_df =  pitcher_inning_df[['IP','game_count', 'game_start']].merge(pitcher_run_out,on='pitcher')
pitcher_run_df['RAA_per_game'] = pitcher_run_df['RAA'] / pitcher_run_df['game_count']

To convert runs to wins, since pitcher quality will directly affact the scoring enviornment, each pitchers are assigned with a specific runs to wins ratio. More details on [Baseball-reference.](https://www.baseball-reference.com/about/war_explained_runs_to_wins.shtml)

The calculation steps are as follow:

1) Calculate league average runs per out: 

In [None]:
query = '''
SELECT MAX(post_bat_score) AS run, 
CASE
    WHEN inning_topbot = 'Top' THEN home_team
    ELSE away_team
END AS pitch_team
 FROM statcast_2018
 WHERE post_bat_score IS NOT NULL
GROUP BY game_pk,pitch_team




'''
statcast_season_run = pd.read_sql(query,engine)

In [None]:
AL_run_per_out = statcast_season_run[statcast_season_run.pitch_team.isin(AL_team)]['run'].sum()/162/len(AL_team)/26.8
NL_run_per_out = statcast_season_run[statcast_season_run.pitch_team.isin(NL_team)]['run'].sum()/162/len(NL_team)/26.8

2) Calculate pitcher specific Pythagorean Win component by:

$$x = (53.6 * league\, average\, runs\, per\, out - pitcherRAA)^.285$$

In [None]:
pitcher_pythcom = (53.6 * (pitcher_run_df['AL_prop'] *AL_run_per_out + (1 - pitcher_run_df['AL_prop']) * NL_run_per_out)
                   - pitcher_run_df['RAA_per_game'])** 0.285

3) Calculate pitcher W-L%:

$$Win\% = \frac{4.14^x}{4.14^x + (4.14-pitcherRAAperGame)^x}$$

In [None]:
pithcher_pywin = 4.14**pitcher_pythcom / (4.14**pitcher_pythcom+ (4.14-pitcher_run_df['RAA_per_game'])**pitcher_pythcom)

And Win Above Average (WAA) per game = $Win\% - 0.5$

In [None]:
pitcher_run_df['WAA_per_game'] = (pithcher_pywin - 0.5)

In [None]:
max(pithcher_pywin )

We have finally arrived to calculate replacement level of pitcher. Replacement level is perhaps the most difficult concept for the whole calculation. Fangraphs has two great article on its concept:

[The Beginner’s Guide to Replacement Level](https://library.fangraphs.com/the-beginners-guide-to-replacement-level/)
[The Recent Examples of a Replacement Level Player](https://blogs.fangraphs.com/the-recent-examples-of-a-replacement-level-player/)

In modern baseball, reliever will throw in much fewer innings than starter and generally has a better performance. To account for this effect, the replacement level is set as :

$$Replacement\, level =  (0.03*\frac{1-Game\,Start}{Game\, played}+0.12*\frac{Game\, start}{Game\, played})*IP/9$$

In [None]:
pitcher_run_df['replacement_level'] = (0.03* (1- pitcher_run_df['game_start']/pitcher_run_df['game_count']) + 0.12* (pitcher_run_df['game_start']/pitcher_run_df['game_count'])) * pitcher_run_df['IP']/9

(unadjusted) WAR is simply $WAA\, per\, game * Game\, played +  replacement\, level$

In [None]:
pitcher_run_df['unadj_WAR'] = (pitcher_run_df['WAA_per_game'] * pitcher_run_df['game_count'] + pitcher_run_df['replacement_level'] )

We would like to reward pitcher who pitch in high leverage situation too, i.e. the game score is close, late inning or more players are on the base, since the pitcher would affect the chance of winning more. [Baseball referece has a long discussion on how leverage is calculated](https://www.baseball-reference.com/about/wpa.shtml).

Again to simplified the calculation the leverage index table is copied from [The book: playing the percentages in baseball](http://www.insidethebook.com/li.shtml).

To adjust for leverage, the formula is $$unadj_WAR * (1+ (gmLI/2)),$$

where gmLI is the leverage when the pitcher enter the game.

In [None]:
leverage_array = np.array(pd.read_csv('baseball_leverage.csv'))

In [None]:
query = '''
SELECT pitcher,at_bat_number,outs_when_up,inning,game_pk,home_team,on_1b,on_2b,on_3b,bat_score,fld_score,inning_topbot FROM statcast_2018
WHERE  (pitch_number,at_bat_number,outs_when_up,inning,game_pk) IN ( SELECT min(pitch_number),min(at_bat_number),min(outs_when_up),min(inning),game_pk FROM statcast_2018 GROUP BY pitcher,game_pk)

AND inning_topbot = 'Top'



'''
statcast_game_leverage_top = pd.read_sql(query,engine)

query = '''
SELECT pitcher,at_bat_number,outs_when_up,inning,game_pk,home_team,on_1b,on_2b,on_3b,bat_score,fld_score,inning_topbot FROM statcast_2018
WHERE  (pitch_number,at_bat_number,outs_when_up,inning,game_pk) IN ( SELECT min(pitch_number),min(at_bat_number),min(outs_when_up),min(inning),game_pk FROM statcast_2018 GROUP BY pitcher,game_pk)

AND inning_topbot = 'Bot'



'''
statcast_game_leverage_bot = pd.read_sql(query,engine)

statcast_game_leverage = pd.concat([statcast_game_leverage_top,statcast_game_leverage_bot]).drop_duplicates().reset_index(drop=True)

In [None]:
gm_leverage = []

for i,inning in enumerate(statcast_game_leverage['inning']):
    score_diff = statcast_game_leverage['bat_score'][i] - statcast_game_leverage['fld_score'][i] 
    
    if(pd.isna(statcast_game_leverage['on_1b'][i]) and pd.isna(statcast_game_leverage['on_2b'][i]) and pd.isna(statcast_game_leverage['on_3b'][i])):
        runner_offset = 0
    elif(pd.isna(statcast_game_leverage['on_2b'][i]) and pd.isna(statcast_game_leverage['on_3b'][i])):
        runner_offset = 1
    elif(pd.isna(statcast_game_leverage['on_1b'][i]) and pd.isna(statcast_game_leverage['on_3b'][i])) :
        runner_offset = 2
    elif(pd.isna(statcast_game_leverage['on_1b'][i]) and pd.isna(statcast_game_leverage['on_2b'][i])) :
        runner_offset = 3
    elif(pd.isna(statcast_game_leverage['on_3b'][i])):
        runner_offset = 4
    elif(pd.isna(statcast_game_leverage['on_2b'][i])):
        runner_offset = 5    
    elif(pd.isna(statcast_game_leverage['on_1b'][i])):
        runner_offset = 6
    else:
        runner_offset = 7
    if(statcast_game_leverage['inning_topbot'][i] == 'Top'):
        top_bot = 0
    else:
        top_bot = 1

    gm_leverage.append(leverage_array[int(48 * (min(statcast_game_leverage['inning'][i],9)-1)+  
                                  24 * top_bot + 
                                  8* statcast_game_leverage['outs_when_up'][i]+
                                 runner_offset)][int(min(max(score_diff,-4),4))+5])
    

In [None]:
statcast_game_leverage['gmLI'] = gm_leverage
pitcher_run_df = pitcher_run_df.merge(pd.DataFrame(statcast_game_leverage.groupby(['pitcher']).mean()[['gmLI']]),on='pitcher')

Both Fangraphs and baseball-reference had set the replacement level to .294, which means a team consisit of only replacement level players is going to win 29.4% of time or about 48 games.

The total number of win above replacement for 162 games and 30 teams league is $(0.5-0.294)*30*162 = 1001$ games. Note that due to playoff tiebreaker or other factor teams may not exactly play 162 games in a season.

Since pitcher and batter perform independently, it is better to award wins separately to two groups. Here 43% of wins are award to pitchers, same as Fangraphs calculation

In [None]:
total_war_leverage = np.sum(pitcher_run_df['unadj_WAR'] * (1+pitcher_run_df['gmLI'])/2)
total_war_leverage

Now we add the difference between unadjusted WAR and total WAR to pitcher per inning pitched.

In [None]:
query = '''
SELECT 
COUNT(DISTINCT(game_pk)) 



FROM statcast_2018
game_pk
'''
total_game = pd.read_sql(query,engine)['count'][0]

In [None]:
pitcher_run_df['pitch_WAR'] = (total_game*2 * (0.5-0.294) * 0.43 - total_war_leverage )/ \
                                np.sum(pitcher_run_df['IP']) * \
                                pitcher_run_df['IP'] + pitcher_run_df['unadj_WAR'] * (1+pitcher_run_df['gmLI'])/2

Pitcher is also at bat in NL park so we need to consider their contribution at bat too. Here  the replacement level of pitcher batting is same as league average xwoba for pitcher as well.

In [None]:
query = '''
SELECT batter AS pitcher,woba_denom,at_bat_number,game_pk,away_team,
CASE WHEN estimated_woba_using_speedangle IS NULL THEN woba_value
     ELSE estimated_woba_using_speedangle
     END AS xwoba
 FROM statcast_2018
WHERE  woba_denom = 1




'''
statcast_pitcher_bat_woba = pd.read_sql(query,engine)

In [None]:
pitcher_xwoba_bat = statcast_pitcher_bat_woba['xwoba'].groupby(statcast_pitcher_bat_woba['pitcher']).mean()
pitcher_pa_count = statcast_pitcher_bat_woba['xwoba'].groupby(statcast_pitcher_bat_woba['pitcher']).count()

Run per wins formula for batter used here is developed by Tom Tango: 
    
$$9*(MLB Runs Scored / MLB Innings Pitched)*1.5 + 3$$

In [None]:
batter_run_per_win = 9*np.sum(statcast_season_run['run'])/np.sum(pitcher_inning_df['IP'])*1.5+3

In [None]:
pitcher_bat_win = (pitcher_xwoba_bat - np.mean(statcast_pitcher_bat_woba['xwoba']))/1.2*pitcher_pa_count / batter_run_per_win
pitcher_run_df = pitcher_run_df.merge(pd.DataFrame({'bat_win':pitcher_bat_win}),on='pitcher')

By combining bat wins we can have total WAR of pitcher

In [None]:
pitcher_run_df['total_WAR'] = (pitcher_run_df['pitch_WAR']  + pitcher_run_df['bat_win']).round(2)

To convert player ID from MLBAM to pitcher, [Baseball Databank](https://github.com/chadwickbureau/baseballdatabank) data is used.

In [None]:
# url="https://raw.githubusercontent.com/chadwickbureau/register/master/data/people.csv"
# player_table=pd.read_csv(url)

player_table=pd.read_csv('people.csv')

In [None]:
pitcher_run_df = pitcher_run_df.merge(player_table[['key_mlbam','name_first','name_last']],left_on='pitcher',right_on='key_mlbam')
    

In [None]:
pitcher_run_df['player_full_name'] = pitcher_run_df['name_first'] + ' ' +  pitcher_run_df['name_last']

In [None]:
pitcher_run_df.head()

In [None]:
pitcher_run_df[['player_full_name', 'total_WAR']].sort_values(by=['total_WAR'],ascending=False).reset_index(drop=True)