In [1]:
from typing import List, Dict
from enum import Enum

import math
import random

import numpy as np
import pandas as pd

In [2]:
# Per-player stat lines: keep only the 2022 row for player id 'correca01'.
df_players = pd.read_csv('../../data/mlb/player_stats.csv', index_col=None)
# season is cast to str so it can be compared against the literal '2022' below.
df_players.season = df_players.season.astype(str)
df_players = df_players[np.logical_and(df_players.player == 'correca01', df_players.season == '2022')]
# The rest of the notebook calls strikeouts 'K', so rename the 'SO' column.
df_players = df_players.rename(columns={
    'SO': 'K'
})

# Counting stats pulled from both CSVs; PlayerStats reads a subset of these keys.
columns = ['PA', 'AB', 'SH', 'SF', 'K', 'BB', 'HBP', '1B', '2B', '3B', 'HR', 'R']

# First matching row as a plain dict; [0] raises IndexError if the filter matched nothing.
correa = df_players[columns].to_dict('records')[0]

# Hand-entered stat line. It has no 'PA' key: PlayerStats derives PA from
# BB + HBP + AB + SH + SF when 'PA' is missing.
ichiro = {
    'AB': 704, ## At-bats (not plate appearances)
    'SH': 2, ## Sac Bunts
    'SF': 3, ## Sac Flys
    'K': 63,
    'BB': 49,
    'HBP': 4,
    '1B': 225,
    '2B': 24,
    '3B': 5,
    'HR': 8
}

# Team season totals: keep the 2022 row for team id 17
# (presumably the Twins, given the variable name -- confirm against the CSV).
df_teams = pd.read_csv('../../data/mlb/season_stats.csv', index_col=None)
df_teams.season = df_teams.season.astype(str)
df_teams = df_teams[np.logical_and(df_teams.season == '2022', df_teams.team == 17)]
df_teams = df_teams.rename(columns={
    'SO': 'K'
})

# 'R/G' (runs per game) is carried along only for comparison with the simulation output.
team_columns = columns + ['R/G']
twins = df_teams[team_columns].to_dict('records')[0]

In [3]:
twins

{'PA': 6113,
 'AB': 5476,
 'SH': 10,
 'SF': 46,
 'K': 1353,
 'BB': 518,
 'HBP': 62,
 '1B': 891.0,
 '2B': 269,
 '3B': 18,
 'HR': 178,
 'R': 696,
 'R/G': 4.3}

In [4]:
class EventCodes(Enum):
    """Every plate-appearance outcome the simulator distinguishes.

    The functions below only compare members with each other; they never read
    the numeric values.
    """
    Strikeout = 1
    Walk = 2
    HBP = 3
    Error = 4
    LongSingle = 5
    MediumSingle = 6
    ShortSingle = 7
    ShortDouble = 8
    LongDouble = 9
    Triple = 10
    HR = 11
    GIDP = 12
    NormalGroundBall = 13
    LineDriveInfieldFly = 14
    LongFly = 15
    MediumFly = 16
    ShortFly = 17
    NoAdvanceGroundBall = 18


# Groupings used when dispatching on an event.
Single = [EventCodes.LongSingle, EventCodes.MediumSingle, EventCodes.ShortSingle]
Double = [EventCodes.ShortDouble, EventCodes.LongDouble]
OnBase = Single + [EventCodes.Error]
# Events that add exactly one out (GIDP adds two and is handled separately).
SingleOut = [
    EventCodes.Strikeout,
    EventCodes.ShortFly,
    EventCodes.MediumFly,
    EventCodes.LongFly,
    EventCodes.LineDriveInfieldFly,
    EventCodes.NormalGroundBall,
    EventCodes.NoAdvanceGroundBall,
]

# Double-play transitions: (first, second, third) before -> base list after.
# A fourth element is a runner who crossed home plate.
# NOTE(review): (1, 1, 0) -> [1, 0, 0] leaves a runner on first after the double
# play; kept exactly as the original table had it -- confirm this is intended.
_GIDP_OUTCOMES = {
    (1, 0, 0): [0, 0, 0],
    (1, 1, 0): [1, 0, 0],
    (1, 0, 1): [0, 0, 0, 1],
    (1, 1, 1): [0, 1, 1],
}


def validate_event_against_state(state: dict, event: EventCodes) -> EventCodes:
    """Downgrade an impossible double play to a no-advance ground out.

    A GIDP needs a runner on first and fewer than two outs. When that does not
    hold, ``EventCodes.NoAdvanceGroundBall`` is returned instead. Every other
    event is returned unchanged.

    Args:
        state: dict with at least ``'bases'`` (first, second, third as 0/1)
            and ``'outs'``. It is not modified.
        event: the sampled event.

    Returns:
        The event that should actually be executed.
    """
    # Compare against the class attribute rather than reaching GIDP through
    # the instance (the original wrote `event.GIDP`).
    if event == EventCodes.GIDP:
        runner_on_first = tuple(state['bases']) in _GIDP_OUTCOMES

        if state['outs'] >= 2 or not runner_on_first:
            return EventCodes.NoAdvanceGroundBall

    return event


def execute_event_on_state(state: dict, event: EventCodes) -> dict:
    """Apply one event to the half-inning state and return that state.

    ``state`` is updated IN PLACE and also returned. ``state['bases']`` is
    always replaced with a new list rather than mutated, so earlier shallow
    copies of the state keep their own base lists.

    Runner movement is modelled by building a list longer than three entries:
    index 0-2 are first/second/third after the play, and anything at index 3
    or beyond has scored.

    Args:
        state: dict with ``'bases'`` (three 0/1 entries), ``'runs'`` and ``'outs'``.
        event: the event to execute; callers normally pass it through
            ``validate_event_against_state`` first.

    Returns:
        The same ``state`` object, updated.
    """
    if event in SingleOut:
        state['outs'] += 1
    elif event == EventCodes.GIDP:
        state['outs'] += 2

    # Inning over: nobody advances or scores on the final out. `>=` plus the
    # clamp also covers an unvalidated GIDP hit with two outs, which the
    # original `== 3` check let through with outs == 4 and runners still moving.
    if state['outs'] >= 3:
        state['outs'] = 3
        return state

    bases = list(state['bases'])

    if event in (EventCodes.Error, EventCodes.ShortSingle):
        # Batter reaches first, every runner moves up one base.
        bases = [1] + bases

    elif event in (EventCodes.Walk, EventCodes.HBP):
        # Batter takes first; only forced runners move. Dropping the first
        # empty slot after the batter closes the gap without moving anyone
        # beyond it. With the bases loaded there is no gap, so a run is forced in.
        bases = [1] + bases
        if 0 in bases:
            del bases[bases.index(0)]

    elif event == EventCodes.NormalGroundBall:
        # Runners on second and third move up one base; a runner on first holds.
        # With second and third occupied and first empty, nobody moves.
        # NOTE(review): modelling choice inherited from the original -- confirm.
        if bases != [0, 1, 1]:
            bases = bases[:1] + [0] + bases[1:]

    elif event == EventCodes.GIDP:
        # Unknown base states (no runner on first) leave the bases untouched.
        outcome = _GIDP_OUTCOMES.get(tuple(bases))
        if outcome is not None:
            bases = list(outcome)

    elif event == EventCodes.LongSingle:
        # Every runner advances two bases.
        bases = [1, 0] + bases

    elif event == EventCodes.MediumSingle:
        # Runner on first moves to second; runners on second and third score.
        bases = [1] + bases[:1] + [0, 0] + bases[1:]

    elif event == EventCodes.LongFly:
        # Runners on second and third tag up and advance one base.
        bases = bases[:1] + [0] + bases[1:]

    elif event == EventCodes.MediumFly:
        # Only the runner on third tags up and scores.
        bases = bases[:2] + [0] + bases[2:]

    elif event == EventCodes.ShortDouble:
        # Batter to second, runners advance two bases.
        bases = [0, 1] + bases

    elif event == EventCodes.LongDouble:
        # Batter to second, every runner scores.
        bases = [0, 1, 0] + bases

    elif event == EventCodes.Triple:
        bases = [0, 0, 1] + bases

    elif event == EventCodes.HR:
        bases = [0, 0, 0, 1] + bases

    # Anything pushed past third base has scored.
    state['runs'] += sum(bases[3:])
    state['bases'] = bases[:3]

    return state

# Smoke test: double-play ball with one out and runners on first and third.
# The two outs end the inning, so the displayed state keeps its runners and shows no run.
event = EventCodes.GIDP
state = { 'bases': [1,0,1], 'runs': 0, 'outs': 1}
event = validate_event_against_state(state, event)
execute_event_on_state(state, event)

{'bases': [1, 0, 1], 'runs': 0, 'outs': 3}

In [5]:
# Tabulate the runs scored by each event from each base state, so the
# transition rules can be checked by eye.
# NoAdvanceGroundBall is not listed; it is only reached here when
# validate_event_against_state downgrades a GIDP.
codes_to_test = [
    EventCodes.Strikeout,
    EventCodes.Walk,
    EventCodes.HBP,
    EventCodes.Error,
    EventCodes.LongSingle,
    EventCodes.MediumSingle,
    EventCodes.ShortSingle,
    EventCodes.ShortDouble,
    EventCodes.LongDouble,
    EventCodes.Triple,
    EventCodes.HR,
    EventCodes.GIDP,
    EventCodes.NormalGroundBall,
    EventCodes.LineDriveInfieldFly,
    EventCodes.LongFly,
    EventCodes.MediumFly,
    EventCodes.ShortFly,
]

# All eight occupancy patterns, written as (first, second, third).
runners = [ [0,0,0], [1,0,0], [1,1,0], [1,0,1], [0,1,1], [0,1,0], [0,0,1], [1,1,1]]
# Number of outs every scenario starts with.
out = 0

# Column header (e.g. '101') -> runs scored per event, in codes_to_test order.
assert_outcomes = {}

for bases in runners:

    header = ''.join(map(str, bases))
    ugh = []

    for code_to_test in codes_to_test:

        # Fresh state per event. The `bases` list is shared between iterations,
        # which is safe because execute_event_on_state reassigns state['bases']
        # instead of mutating the list.
        state = {
            'bases': bases,
            'runs': 0,
            'outs': out
        }

        ev = validate_event_against_state(state, code_to_test)
        runs = execute_event_on_state(state, ev)['runs']
        ugh.append(runs)

            
    assert_outcomes.update({ header: ugh })
        
# One column per base state, one row per event; 'c' labels each row with its event.
df_assert = pd.DataFrame(assert_outcomes)
df_assert['c'] = codes_to_test

df_assert

Unnamed: 0,000,100,110,101,011,010,001,111,c
0,0,0,0,0,0,0,0,0,EventCodes.Strikeout
1,0,0,0,0,0,0,0,1,EventCodes.Walk
2,0,0,0,0,0,0,0,1,EventCodes.HBP
3,0,0,0,1,1,0,1,1,EventCodes.Error
4,0,0,1,1,2,1,1,2,EventCodes.LongSingle
5,0,0,1,1,2,1,1,2,EventCodes.MediumSingle
6,0,0,0,1,1,0,1,1,EventCodes.ShortSingle
7,0,0,1,1,2,1,1,2,EventCodes.ShortDouble
8,0,1,2,2,2,1,1,3,EventCodes.LongDouble
9,0,1,2,2,2,1,1,3,EventCodes.Triple


In [6]:
def create_historical_record(current_state, event):
    """Snapshot the state after an event, tagged with the event that produced it.

    The snapshot is a shallow copy of ``current_state`` with two extra keys:
    ``'event'`` (the event object itself) and ``'desc'`` (its ``name``).
    ``current_state`` is left untouched.
    """
    record = current_state.copy()
    record.update(event=event, desc=f'{event.name}')
    return record

In [7]:
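## Baseball-Reference column names vs. the keys used in this notebook

## PA == PA
## AB == AB
## SB == SH  (SB here presumably means sacrifice bunts, i.e. the SH column, not stolen bases; TODO confirm)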

In [8]:
class PlayerStats():
    """Counting stats for one batter (or team) plus the derived outcome rates.

    Required keys: 'SH', 'SF', 'K', 'BB', 'HBP', '1B', '2B', '3B', 'HR' and at
    least one of 'PA' / 'AB'. Whichever of 'PA' / 'AB' is missing is derived
    from the other through PA = BB + HBP + AB + SH + SF.

    The input mapping is copied; the caller's dict is never modified.
    """

    REQUIRED_KEYS = ['SH', 'SF', 'K', 'BB', 'HBP', '1B', '2B', '3B', 'HR']

    def __init__(self, data):
        """Validate ``data`` and compute the derived totals.

        Raises:
            AssertionError: if a required key is missing.
        """
        self.data = data.copy()

        for key in self.REQUIRED_KEYS:
            assert key in self.data, f'missing required stat {key!r}'

        assert 'PA' in self.data or 'AB' in self.data, "need at least one of 'PA' or 'AB'"

        self.data['HITS'] = sum([ self.data[key] for key in ['1B', '2B', '3B', 'HR']])

        # The original accepted PA-only input in its assert but then read 'AB'
        # unconditionally, so such input died with a KeyError. Derive AB instead.
        if 'AB' not in self.data:
            self.data['AB'] = self.data['PA'] - sum([ self.data[key] for key in ['BB', 'HBP', 'SH', 'SF']])

        if 'PA' not in self.data:
            self.data['PA'] = sum([ self.data[key] for key in ['BB', 'HBP', 'AB', 'SH', 'SF']])

        # Reached-on-error is not in the input; it is estimated as 1.8% of PA.
        self.data['E'] = math.floor(.018 * self.data['PA'])
        # Despite the name this is AB + SF + SH: every plate appearance that
        # was not a walk or a hit-by-pitch.
        self.data['AtBats'] = sum([ self.data[key] for key in ['AB', 'SF', 'SH']])
        # Balls put in play that became outs: what is left after hits,
        # estimated errors and strikeouts.
        self.data['Outs'] = self.data['AtBats'] - sum([ self.data[key] for key in ['HITS', 'E', 'K']])

    def likelihoods(self):
        """Return each top-level outcome's share of plate appearances.

        Keys: 'E', 'Outs', 'K', 'BB', 'HBP', '1B', '2B', '3B', 'HR'. The shares
        sum to 1 when PA equals BB + HBP + AB + SH + SF.
        """
        keys = ['E', 'Outs', 'K', 'BB', 'HBP', '1B', '2B', '3B', 'HR']
        plate_appearances = self.data['PA']

        return {key: self.data[key] / plate_appearances for key in keys}


# Kernel-level `player_stats` is read again when `event_variables` is built.
player_stats = PlayerStats(ichiro)
player_stats.likelihoods()

{'E': 0.01706036745406824,
 'Outs': 0.4868766404199475,
 'K': 0.08267716535433071,
 'BB': 0.06430446194225722,
 'HBP': 0.005249343832020997,
 '1B': 0.2952755905511811,
 '2B': 0.031496062992125984,
 '3B': 0.006561679790026247,
 'HR': 0.010498687664041995}

In [9]:
class EventVariable():
    """A single sampleable outcome: a label, its probability and its event code."""

    def __init__(self, key: str, probability: float, event_code: 'EventCodes'):
        self.key = key
        # Stored as a fraction of plate appearances, not a percentage.
        self.probability = probability
        self.event_code = event_code

    def __repr__(self):
        # NOTE(review): the value printed before '%' is the raw fraction
        # (0.017 means 1.7%); the format is kept unchanged so existing
        # notebook output still matches.
        return f'<EventVariables {self.key} - {self.probability}%>'

class EventVariableHierarchy(EventVariable):
    """An outcome that is either a leaf (has ``event_code``) or a group of children.

    Args:
        key: display label.
        probability: probability of this node given its parent.
        event_code: the event for a leaf; ``None`` for a grouping node.
        children: child nodes; defaults to a fresh empty list per instance.
    """

    def __init__(self, key: str, probability: float, event_code: 'EventCodes' = None, children: list = None):
        super().__init__(key, probability, event_code)
        # A `children=[]` default is created once and shared by every node
        # built without children, so appending to one leaf's list would show
        # up on all of them. Build a new list per instance instead.
        self.children = [] if children is None else children

    def __repr__(self):
        return f'<EventVariables {self.key} - {self.probability}%, C={len(self.children)}>'

class EventVariableHierarchyFactory():
    """Assembles the outcome tree for one batter's (or team's) plate appearances."""

    def create(self, likelihoods):
        """Return the top-level outcome nodes in their fixed sampling order.

        ``likelihoods`` must provide 'E', 'Outs', 'K', 'BB', 'HBP', '1B', '2B',
        '3B' and 'HR'. The probabilities on nested nodes are hard-coded splits
        of their parent (each set of siblings sums to 1).
        """
        node = EventVariableHierarchy

        ## Grounders
        ground_outs = node(
            key='Ground Out',
            probability=.538,
            children=[
                node(key='Double Play', probability=.5, event_code=EventCodes.GIDP),
                node(key='Normal Ground Out', probability=.5, event_code=EventCodes.NormalGroundBall),
            ],
        )

        ## Infield
        infield_outs = node(
            key='Infield Fly / Line Drive',
            probability=.153,
            event_code=EventCodes.LineDriveInfieldFly,
        )

        ## Fly
        fly_outs = node(
            key='Fly Out',
            probability=.309,
            children=[
                node(key='Long Fly Out', probability=.2, event_code=EventCodes.LongFly),
                node(key='Medium Fly Out', probability=.5, event_code=EventCodes.MediumFly),
                node(key='Short Fly Out', probability=.3, event_code=EventCodes.ShortFly),
            ],
        )

        single_kinds = [
            node(key='Long 1B', probability=.3, event_code=EventCodes.LongSingle),
            node(key='Medium 1B', probability=.5, event_code=EventCodes.MediumSingle),
            node(key='Short 1B', probability=.2, event_code=EventCodes.ShortSingle),
        ]

        double_kinds = [
            node(key='Short 2B', probability=.8, event_code=EventCodes.ShortDouble),
            node(key='Long 2B', probability=.2, event_code=EventCodes.LongDouble),
        ]

        # The order of this list fixes the order of the flattened events and
        # therefore of the cumulative sampling ranges.
        return [
            node(key='Error', probability=likelihoods['E'], event_code=EventCodes.Error),
            node(
                key='Outs',
                probability=likelihoods['Outs'],
                children=[ground_outs, infield_outs, fly_outs],
            ),
            node(key='K', probability=likelihoods['K'], event_code=EventCodes.Strikeout),
            node(key='BB', probability=likelihoods['BB'], event_code=EventCodes.Walk),
            node(key='HBP', probability=likelihoods['HBP'], event_code=EventCodes.HBP),
            node(key='1Bs', probability=likelihoods['1B'], children=single_kinds),
            node(key='2B', probability=likelihoods['2B'], children=double_kinds),
            node(key='3B', probability=likelihoods['3B'], event_code=EventCodes.Triple),
            node(key='HR', probability=likelihoods['HR'], event_code=EventCodes.HR),
        ]

class EventVariableFactory():
    """Turns outcome likelihoods into a flat list of sampleable leaf events."""

    def __init__(self, hierarchy_factory=None):
        """Store the factory that builds the outcome tree.

        Args:
            hierarchy_factory: object with a ``create(likelihoods)`` method.
                Defaults to a new ``EventVariableHierarchyFactory``.
        """
        # Built here rather than in the signature: a default written as a call
        # is evaluated once, when the class body runs, and then shared by
        # every EventVariableFactory.
        if hierarchy_factory is None:
            hierarchy_factory = EventVariableHierarchyFactory()

        self.hierarchy_factory = hierarchy_factory

    def create(self, likelihoods: Dict[str, float]):
        """Build the outcome tree for ``likelihoods`` and return its flattened leaves."""
        hierarchy = self.hierarchy_factory.create(likelihoods)
        return self.flatten_hierarchy(hierarchy)

    def flatten_hierarchy(self, event_variable_hierarchy: List['EventVariableHierarchy'], parent_probability: float = 1):
        """Collapse a tree of nodes into leaf ``EventVariable`` objects.

        Each leaf's probability is the product of the probabilities on the
        path from the root to that leaf. Nodes whose ``event_code`` is ``None``
        are treated as groups and replaced by their flattened children; the
        depth-first order of the tree is preserved.

        Args:
            event_variable_hierarchy: nodes at the current level.
            parent_probability: product of the probabilities above this level.

        Returns:
            A list of ``EventVariable``.
        """
        events = []
        for node in event_variable_hierarchy:
            absolute_probability = parent_probability * node.probability

            if node.event_code is None:
                events.extend(self.flatten_hierarchy(node.children, absolute_probability))
            else:
                events.append(EventVariable(node.key, absolute_probability, node.event_code))

        return events

# Flattened leaf events for the `player_stats` built earlier; the sampling cells that follow read this kernel-level name.
event_variables = EventVariableFactory().create(player_stats.likelihoods())
event_variables

[<EventVariables Error - 0.01706036745406824%>,
 <EventVariables Double Play - 0.1309698162729659%>,
 <EventVariables Normal Ground Out - 0.1309698162729659%>,
 <EventVariables Infield Fly / Line Drive - 0.07449212598425196%>,
 <EventVariables Long Fly Out - 0.030088976377952756%>,
 <EventVariables Medium Fly Out - 0.07522244094488188%>,
 <EventVariables Short Fly Out - 0.04513346456692913%>,
 <EventVariables K - 0.08267716535433071%>,
 <EventVariables BB - 0.06430446194225722%>,
 <EventVariables HBP - 0.005249343832020997%>,
 <EventVariables Long 1B - 0.08858267716535433%>,
 <EventVariables Medium 1B - 0.14763779527559054%>,
 <EventVariables Short 1B - 0.05905511811023622%>,
 <EventVariables Short 2B - 0.025196850393700787%>,
 <EventVariables Long 2B - 0.006299212598425197%>,
 <EventVariables 3B - 0.006561679790026247%>,
 <EventVariables HR - 0.010498687664041995%>]

In [10]:
def get_prob_ranges(event_variables):
    """Pair each event with the upper bound of its slice of [0, 1].

    The bounds are the running sum of ``probability`` in input order. The last
    bound is forced to exactly 1, so a uniform draw from [0, 1) always lands
    in some slice even when the probabilities do not add up to exactly 1.

    Args:
        event_variables: iterable of objects with a ``probability`` attribute.

    Returns:
        A list of ``(upper_bound, event_variable)`` tuples in input order;
        an empty list for empty input.
    """
    # Materialise once so one-shot iterators are not exhausted before zip().
    event_variables = list(event_variables)
    if not event_variables:
        return []

    upper_bounds = []
    running_total = 0
    for ev in event_variables:
        running_total += ev.probability
        upper_bounds.append(running_total)

    upper_bounds[-1] = 1

    return list(zip(upper_bounds, event_variables))

In [11]:
get_prob_ranges(event_variables)

[(0.01706036745406824, <EventVariables Error - 0.01706036745406824%>),
 (0.14803018372703414, <EventVariables Double Play - 0.1309698162729659%>),
 (0.279, <EventVariables Normal Ground Out - 0.1309698162729659%>),
 (0.353492125984252,
  <EventVariables Infield Fly / Line Drive - 0.07449212598425196%>),
 (0.3835811023622048, <EventVariables Long Fly Out - 0.030088976377952756%>),
 (0.45880354330708667, <EventVariables Medium Fly Out - 0.07522244094488188%>),
 (0.5039370078740157, <EventVariables Short Fly Out - 0.04513346456692913%>),
 (0.5866141732283464, <EventVariables K - 0.08267716535433071%>),
 (0.6509186351706037, <EventVariables BB - 0.06430446194225722%>),
 (0.6561679790026247, <EventVariables HBP - 0.005249343832020997%>),
 (0.744750656167979, <EventVariables Long 1B - 0.08858267716535433%>),
 (0.8923884514435695, <EventVariables Medium 1B - 0.14763779527559054%>),
 (0.9514435695538057, <EventVariables Short 1B - 0.05905511811023622%>),
 (0.9766404199475065, <EventVariables S

In [12]:
## Empirical check of the ranges: sample many uniform draws and compare the
## observed share of each event with its configured probability.
## Nothing is asserted here; the comparison is done by eye on the output.

# One counter per event, in event_variables order.
globs = [0 for _ in list(enumerate(event_variables))]
probs = get_prob_ranges(event_variables)
# NOTE: `iterations` is a kernel-level name that a later cell reassigns and
# that run_simulation reads.
iterations = 100000

for _ in range(iterations):
    rv = random.random()
    # The first range whose upper bound covers the draw wins.
    for i, tup in enumerate(probs):
        p = tup[0]
        if rv <= p:
            globs[i] += 1
            break


# (observed frequency, event) pairs; the event repr shows the configured probability.
list(zip([ x / iterations for x in globs ], event_variables))

[(0.01735, <EventVariables Error - 0.01706036745406824%>),
 (0.13123, <EventVariables Double Play - 0.1309698162729659%>),
 (0.13053, <EventVariables Normal Ground Out - 0.1309698162729659%>),
 (0.07399, <EventVariables Infield Fly / Line Drive - 0.07449212598425196%>),
 (0.03006, <EventVariables Long Fly Out - 0.030088976377952756%>),
 (0.07541, <EventVariables Medium Fly Out - 0.07522244094488188%>),
 (0.04456, <EventVariables Short Fly Out - 0.04513346456692913%>),
 (0.08337, <EventVariables K - 0.08267716535433071%>),
 (0.06377, <EventVariables BB - 0.06430446194225722%>),
 (0.00547, <EventVariables HBP - 0.005249343832020997%>),
 (0.08981, <EventVariables Long 1B - 0.08858267716535433%>),
 (0.14709, <EventVariables Medium 1B - 0.14763779527559054%>),
 (0.05879, <EventVariables Short 1B - 0.05905511811023622%>),
 (0.02501, <EventVariables Short 2B - 0.025196850393700787%>),
 (0.00643, <EventVariables Long 2B - 0.006299212598425197%>),
 (0.0064, <EventVariables 3B - 0.00656167979002

In [13]:
def generate_event(event_variables):
    """Sample one event code according to the events' probabilities.

    A uniform draw is compared against the cumulative upper bounds from
    ``get_prob_ranges``; the first event whose bound is at or above the draw
    is selected and its ``event_code`` returned.

    Raises:
        ValueError: if no bound covers the draw.
    """
    draw = random.random()
    ranges = get_prob_ranges(event_variables)

    no_match = object()
    selected = next(
        (candidate for upper_bound, candidate in ranges if draw <= upper_bound),
        no_match,
    )

    if selected is no_match:
        raise ValueError('No Event Code was generated!')

    return selected.event_code

In [14]:
def play_half_inning(event_variables):
    """Simulate one half-inning in which every batter uses ``event_variables``.

    Starts with empty bases, no runs and no outs, then keeps sampling and
    executing events until three outs are recorded.

    Returns:
        A list with one record per plate appearance (see
        ``create_historical_record``); the last record holds the final run total.
    """
    state = dict(bases=[0, 0, 0], runs=0, outs=0)
    records = []

    while state['outs'] < 3:
        sampled = generate_event(event_variables)
        # An impossible double play is downgraded before it is executed.
        event = validate_event_against_state(state, sampled)
        state = execute_event_on_state(state, event)
        records.append(create_historical_record(state, event))

    return records

play_half_inning(event_variables)

[{'bases': [0, 0, 0],
  'runs': 0,
  'outs': 1,
  'event': <EventCodes.ShortFly: 17>,
  'desc': 'ShortFly'},
 {'bases': [0, 0, 0],
  'runs': 0,
  'outs': 2,
  'event': <EventCodes.NormalGroundBall: 13>,
  'desc': 'NormalGroundBall'},
 {'bases': [1, 0, 0],
  'runs': 0,
  'outs': 2,
  'event': <EventCodes.LongSingle: 5>,
  'desc': 'LongSingle'},
 {'bases': [1, 1, 0],
  'runs': 0,
  'outs': 2,
  'event': <EventCodes.Walk: 2>,
  'desc': 'Walk'},
 {'bases': [1, 1, 0],
  'runs': 1,
  'outs': 2,
  'event': <EventCodes.MediumSingle: 6>,
  'desc': 'MediumSingle'},
 {'bases': [1, 1, 0],
  'runs': 1,
  'outs': 3,
  'event': <EventCodes.Strikeout: 1>,
  'desc': 'Strikeout'}]

In [15]:
# 26.72 / 3 converts a per-game figure into innings.
# NOTE(review): 26.72 is presumably the average number of batting outs per game -- confirm the source.
avg_innings_per_game = (26.72 / 3)
# Default number of simulated half-innings. This kernel-level name is also
# assigned by the range-check cell above, so re-running that cell changes it.
iterations = 50000

def run_simulation(data, n_iterations=None):
    """Estimate the expected runs per half-inning for a stat line.

    Every plate appearance in every simulated half-inning uses the same
    outcome probabilities, derived from ``data``.

    Args:
        data: stat-line dict accepted by ``PlayerStats``.
        n_iterations: number of half-innings to simulate. Defaults to the
            module-level ``iterations`` at call time, which was previously
            the only way to set it.

    Returns:
        Mean runs scored per simulated half-inning.
    """
    if n_iterations is None:
        n_iterations = iterations

    event_variables = EventVariableFactory().create(
        PlayerStats(data).likelihoods()
    )

    # The last record of a half-inning carries its final run total.
    total_runs = sum(
        play_half_inning(event_variables)[-1]['runs']
        for _ in range(n_iterations)
    )

    return total_runs / n_iterations

In [16]:
# Runs per half-inning when every batter hits like the `ichiro` stat line,
# then scaled to a game (x avg_innings_per_game) and to 162 games.
ichiro_runs = run_simulation(ichiro)

print(ichiro_runs)
print(ichiro_runs * avg_innings_per_game, 'runs per game')
print((ichiro_runs * avg_innings_per_game) * 162, 'runs per season')


0.80012
7.126402133333333 runs per game
1154.4771456 runs per season


In [17]:
# Same simulation on the team totals; the per-game figure is printed next to
# the recorded 'R/G' value for comparison.
twins_runs = run_simulation(twins)

print(twins_runs)
print(twins_runs * avg_innings_per_game, 'runs per game', 'vs', twins['R/G'])
print((twins_runs * avg_innings_per_game) * 162, 'runs per season')

0.4799
4.274309333333333 runs per game vs 4.3
692.4381119999999 runs per season


In [18]:
# Runs per half-inning when every batter hits like the `correa` stat line,
# scaled the same way as the cells above.
correa_runs = run_simulation(correa)

print(correa_runs)
print(correa_runs * avg_innings_per_game, 'runs per game')
print((correa_runs * avg_innings_per_game) * 162, 'runs per season')

0.68996
6.145243733333333 runs per game
995.5294848 runs per season


In [19]:
def remove_player_from_team(team, player):
    """Return a copy of ``team`` with the player's totals subtracted.

    Only keys present in ``player`` are adjusted; every other team key is
    copied through unchanged. Neither input is modified. A key that ``player``
    has but ``team`` lacks raises ``KeyError``.
    """
    remainder = team.copy()
    remainder.update({stat: remainder[stat] - amount for stat, amount in player.items()})
    return remainder

In [20]:
# Re-run the simulation on the team totals with the `correa` stat line subtracted.
twins_minus_correa_runs = run_simulation(
    remove_player_from_team(twins, correa)
)

print(twins_minus_correa_runs)
print(twins_minus_correa_runs * avg_innings_per_game, 'runs per game')
print((twins_minus_correa_runs * avg_innings_per_game) * 162, 'runs per season')

0.46984
4.184708266666666 runs per game
677.9227391999999 runs per season
