In [None]:
%pip install treys
%pip install pokerenv

In [None]:
from pokerkit import Automation, NoLimitTexasHoldem, State, calculate_hand_strength, parse_range, Card, Deck, StandardHighHand
from typing import List
from tqdm import tqdm
from enum import Enum
import numpy as np
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim

#### Agent Definition

In [3]:
class Action(Enum):
    """The three moves a player can choose between when it is their turn."""

    FOLD = 0  # give up the hand
    CALL = 1  # check or call
    BET = 2   # bet or raise to a chosen amount

In [None]:
class Agent:
    """Base poker player: holds a name, a chip stack and per-game history lists.

    Subclasses are expected to override ``get_action``.

    NOTE: a later cell of this notebook defines a second, unrelated class that is
    also called ``Agent``; once that cell has run, the name refers to that class.
    """

    def __init__(self, name: str = None, starting_stack: int = 0):
        """Store the player's name and give it ``starting_stack`` chips."""
        self.name = name
        self.starting_stack = starting_stack
        self.stack = starting_stack
        self.actions, self.observations, self.rewards = [], [], []

    def reset(self):
        """Clear the history lists, restore the starting stack and return ``self``."""
        self.actions, self.observations, self.rewards = [], [], []
        self.stack = self.starting_stack
        return self

    def get_action(self, state: State):
        """Choose an action for ``state``; the base implementation returns None."""
        return None

    def get_player_state(self, state: State):
        """Summarise ``state`` for the player whose turn it is.

        Returns:
            ``(win_percent, stack, pot, min_bet, max_bet)`` where ``win_percent`` is
            the result of ``_calculate_strength`` and the remaining values are read
            directly from ``state``.
        """
        return (
            self._calculate_strength(state),
            state.stacks[state.actor_index],
            state.total_pot_amount,
            state.min_completion_betting_or_raising_to_amount,
            state.max_completion_betting_or_raising_to_amount,
        )

    def _get_valid_actions(self, state: State):
        """Yield, in FOLD, CALL, BET order, each action that ``state`` currently allows."""
        checks = (
            (state.can_fold, Action.FOLD),
            (state.can_check_or_call, Action.CALL),
            (state.can_complete_bet_or_raise_to, Action.BET),
        )
        for is_allowed, action in checks:
            if is_allowed():
                yield action

    def _calculate_strength(self, state: State, samples: int = 500):
        """Estimate the acting player's hand strength with ``calculate_hand_strength``.

        The hole cards and the first card of every board slot are rendered as
        rank+suit text and re-parsed into the range / card arguments the estimator
        takes. ``samples`` is forwarded as ``sample_count``.
        """
        hole_text = ''.join(
            str(card.rank + card.suit) for card in state.hole_cards[state.actor_index]
        )
        board_text = ''.join(
            str(slot[0].rank + slot[0].suit) for slot in state.board_cards
        )
        return calculate_hand_strength(
            state.player_count,
            parse_range(hole_text),
            Card.parse(board_text),
            2,  # hole-card count passed to the estimator
            5,  # board-card count passed to the estimator
            Deck.STANDARD,
            (StandardHighHand,),
            sample_count=samples,
        )

In [5]:
class ExampleRandomAgent(Agent):
    """Baseline player that picks uniformly at random among the legal actions."""

    def __init__(self, name: str = None, stack: int = 1000):
        """Create the player with ``stack`` chips as its starting stack."""
        super().__init__(name, stack)

    def get_action(self, state: State) -> dict:
        """Pick a random legal action and, for a bet, a random legal amount.

        The previous version unpacked four values from ``get_player_state``, which
        returns five, so every call raised ``ValueError``. None of those values were
        used, so the call (and the Monte Carlo strength estimate it triggers) is
        dropped.

        Args:
            state: the poker state; the acting player is assumed to be this agent.

        Returns:
            ``{'table_action': Action, 'bet_amount': int}``; ``bet_amount`` is 0
            unless the action is ``Action.BET``. The dict is also appended to
            ``self.actions``.
        """
        valid_actions = list(self._get_valid_actions(state))
        # Draw an index rather than the element so the Action member is returned
        # untouched (no round trip through a NumPy array).
        chosen_action = valid_actions[np.random.randint(len(valid_actions))]

        bet_size = 0
        if chosen_action is Action.BET:
            # The bounds are only needed when actually betting.
            valid_bet_low = state.min_completion_betting_or_raising_to_amount
            valid_bet_high = state.max_completion_betting_or_raising_to_amount
            bet_size = int(round(np.random.uniform(valid_bet_low, valid_bet_high)))

        table_action = {
            'table_action': chosen_action,
            'bet_amount': bet_size
        }
        self.actions.append(table_action)
        return table_action

In [None]:
#@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
class PolicyNetwork(nn.Module):
    """One-hidden-layer policy network with two output heads.

    * ``action_output`` -> probabilities over the three table actions.
    * ``bet_output``    -> a single bet ratio squashed into (0, 1).

    Args:
        input_size: number of features in the state vector.
        hidden_size: number of units in the hidden layer.
    """

    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.hidden_layer = nn.Linear(input_size, hidden_size)
        # Start the hidden layer close to zero.
        nn.init.normal_(self.hidden_layer.weight, mean=0, std=0.01)
        nn.init.normal_(self.hidden_layer.bias, mean=0, std=0.01)
        self.action_output = nn.Linear(hidden_size, 3)  # one logit per table action
        self.bet_output = nn.Linear(hidden_size, 1)     # logit of the bet ratio

    def forward(self, s):
        """Run the forward pass.

        Args:
            s: state features, shape ``(input_size,)`` or ``(batch, input_size)``.

        Returns:
            ``(action_probs, bet_ratio)``: ``action_probs`` has a trailing dimension
            of 3 that sums to 1; ``bet_ratio`` has a trailing dimension of 1 with
            values in [0, 1].
        """
        hidden = torch.relu(self.hidden_layer(s))

        # Softmax over the action logits (previously applied to the hidden
        # activations) along the last axis, so batched input is normalised per row.
        action_probs = torch.softmax(self.action_output(hidden), dim=-1)

        # torch.sigmoid is the overflow-safe form of exp(b) / (1 + exp(b)).
        bet_ratio = torch.sigmoid(self.bet_output(hidden))
        return action_probs, bet_ratio

In [None]:
#@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
class Agent():
    """Policy-gradient agent wrapping a ``PolicyNetwork`` and an Adam optimizer.

    NOTE: this class reuses the name ``Agent`` of the base player class defined in
    an earlier cell; running this cell rebinds that name.

    ``config`` keys read by this class: ``'hidden_layer_size'``, ``'learning_rate'``,
    ``'action_var'`` (half-width of the bet-ratio sampling interval) and
    ``'causal_return'``.

    A state ``s`` is a sequence of 5 numbers whose last two entries are the minimum
    and maximum bet sizes (presumably the tuple produced by ``get_player_state`` --
    TODO confirm against the caller).
    """

    def __init__(self, config):
        self.config = config
        self.mu_s2 = PolicyNetwork(5, self.config['hidden_layer_size'])    #init the policy model
        self.optimizer = optim.Adam(self.mu_s2.parameters(), lr=self.config['learning_rate'])    #init the optimizer

    def _policy_outputs(self, s):
        """Run the network on one state; return flat (action_probs, bet_ratio) tensors."""
        # A 1-D float32 input keeps the outputs 1-D, so they can be indexed per action.
        features = torch.as_tensor(s, dtype=torch.float32).reshape(-1)
        action_probs, bet_ratio = self.mu_s2(features)
        return action_probs.reshape(-1), bet_ratio.reshape(-1)

    def _bet_ratio_distribution(self, bet_ratio):
        """Uniform distribution of half-width ``action_var`` around ``bet_ratio``, clipped to [0, 1]."""
        half_width = self.config['action_var']
        low = torch.clamp(bet_ratio - half_width, min=0.0)
        high = torch.clamp(bet_ratio + half_width, max=1.0)
        return torch.distributions.Uniform(low, high)

    def action_probs(self, a, s):
        '''Compute the log-probability of an action; used for the gradient update.
            Takes:
                a -- the dict returned by pi_action_generator (keys 'table_action' and,
                     for a bet, 'bet_ratio'); a bare Action is accepted for FOLD / CALL
                s -- the state the action was taken in
            Returns:
                torch tensor of shape (1,)
        '''
        # The recorded action is a dict; comparing the dict itself with an Action
        # member is always False, so unwrap it first.
        table_action = a['table_action'] if isinstance(a, dict) else a
        action_probs, bet_ratio = self._policy_outputs(s)

        if table_action == Action.FOLD:
            return torch.log(action_probs[0]).reshape(1)
        if table_action == Action.CALL:
            return torch.log(action_probs[1]).reshape(1)

        # Anything else is treated as a bet, which needs the sampled ratio.
        if not isinstance(a, dict):
            raise TypeError("a bet action must be a dict containing 'bet_ratio'")
        sampled_ratio = torch.as_tensor(a['bet_ratio'], dtype=torch.float32)
        # NOTE(review): while neither bound is clipped, high - low is the constant
        # 2 * action_var, so this term carries no gradient -- confirm this is intended.
        bet_log_p = self._bet_ratio_distribution(bet_ratio).log_prob(sampled_ratio)
        return (torch.log(action_probs[2]) + bet_log_p).reshape(1)

    def pi_action_generator(self, s):
        '''Sample an action from the current policy; used to generate episode data.
            Takes:
                s -- the state; s[-2] and s[-1] are the min and max bet sizes
            Returns:
                dict with keys
                    'table_action' -- the sampled Action
                    'bet_amount'   -- chips to bet (0 unless the action is BET); this is
                                      the key TexasHoldemEnvironment.step reads
                    'bet_size'     -- same value, kept for existing callers
                    'bet_ratio'    -- float; the sampled ratio for BET, otherwise the
                                      network's mean ratio (unused for FOLD / CALL)
        '''
        bet_size = 0    #init the bet size
        with torch.no_grad():    # sampling needs no autograd graph
            action_probs, mean_ratio = self._policy_outputs(s)
            fold_p, call_p = float(action_probs[0]), float(action_probs[1])
            ratio = float(mean_ratio)

            action_chance = np.random.uniform()    #generate a random number to decide what to do
            if action_chance < fold_p:
                a = Action.FOLD
            elif action_chance < fold_p + call_p:
                a = Action.CALL
            else:
                a = Action.BET
                min_bet_size, max_bet_size = s[-2], s[-1]    #pull out the min and max bet sizes
                ratio = float(self._bet_ratio_distribution(mean_ratio).sample())
                bet_size = (ratio * (max_bet_size - min_bet_size)) + min_bet_size    #compute the bet size

        return {
            'table_action': a,
            'bet_amount': bet_size,
            'bet_size': bet_size,
            'bet_ratio': ratio
        }

    def objective(self, log_probs, episode_return, b):
        '''Compute the policy-gradient objective for one episode.
            Takes:
                log_probs -- tensor with one log-probability per step
                episode_return -- scalar total return, or tensor with one causal return per step
                b -- float, the baseline
            Returns:
                scalar tensor -sum_t log_prob_t * (return_t - b)
        '''
        # Flatten both operands so step t is paired with return t; a (T, 1) tensor
        # times a (T,) tensor would silently broadcast to a T x T product.
        log_probs = log_probs.reshape(-1)
        advantage = torch.as_tensor(episode_return, dtype=log_probs.dtype) - b
        if advantage.dim() > 0:
            advantage = advantage.reshape(-1)
        return -torch.sum(log_probs * advantage)

    def update_pi(self, batch):
        '''Take one gradient step on the policy network.
            Takes:
                batch -- a list of dicts, one per episode, with keys 'states', 'actions',
                         'baseline' and 'causal_return' or 'total_return'; every
                         episode in the list is used
        '''
        objective = []    #init the objectives per episode
        for episode in batch:    #loop over episodes
            # 'states' holds one more entry than 'actions' (the final state), so drop it.
            log_probs = torch.cat([
                self.action_probs(a, s)
                for s, a in zip(episode['states'][:-1], episode['actions'])
            ])
            if self.config['causal_return']:    #if we use causal returns...
                episode_return = episode['causal_return']
            else:    #if not...
                episode_return = episode['total_return']    #use the total discounted reward
            objective.append(self.objective(log_probs, episode_return, episode['baseline']))

        objective = torch.mean(torch.stack(objective))    #average over episodes

        #run the backward pass to compute gradients
        self.optimizer.zero_grad()    #zero gradients from the previous step
        objective.backward()    #compute gradients
        self.optimizer.step()    #update policy network parameters


#### Environment

In [6]:
class TexasHoldemEnvironment():
    """Thin wrapper around a pokerkit no-limit Texas hold'em game.

    ``config`` keys read: ``'blinds'``, ``'min_bet'``, ``'starting_stack'`` and
    ``'player_count'``.

    Attributes:
        game: the ``NoLimitTexasHoldem`` instance built by ``reset_game``.
        poker_round: the state of the round in progress, or None before the first round.
        stacks: the stacks copied from ``poker_round`` when the last round finished.
    """

    def __init__(self, config):
        self.config = config
        self.reset_game()

    def reset_game(self):
        """Rebuild the game from ``config`` and reset ``stacks`` and ``poker_round``."""
        self.game = NoLimitTexasHoldem(
            automations= (
                Automation.ANTE_POSTING,
                Automation.BET_COLLECTION,
                Automation.BLIND_OR_STRADDLE_POSTING,
                Automation.CARD_BURNING,
                Automation.HOLE_DEALING,
                Automation.BOARD_DEALING,
                Automation.HOLE_CARDS_SHOWING_OR_MUCKING,
                Automation.HAND_KILLING,
                Automation.CHIPS_PUSHING,
                Automation.CHIPS_PULLING
            ),
            ante_trimming_status=True,  # Uniform antes?
            raw_antes=0,  # Antes
            raw_blinds_or_straddles=self.config['blinds'],  # Blinds
            min_bet=self.config['min_bet'],  # Minimum bet
        )
        self.stacks = np.array([self.config['starting_stack'] for _ in range(self.config['player_count'])])  # Observed Agent stack will always be at position 0
        self.poker_round = None

    def reset_round(self, agents: "List[Agent]"):
        """Start a new round seated with ``agents``, in order, using their current stacks."""
        agent_stacks = [agent.stack for agent in agents]
        self.poker_round = self.game(raw_starting_stacks=agent_stacks, player_count=len(agent_stacks))

    def step(self, action):
        """Apply one player action to the round in progress.

        Args:
            action: dict with ``'table_action'`` (an ``Action``) and, for a bet, the
                amount under ``'bet_amount'`` (``'bet_size'`` is accepted as a fallback).

        Returns:
            ``{'state': <round state>, 'reward': 0, 'done': bool}``.
            NOTE(review): the reward is always 0 here; callers must derive any
            learning signal themselves.

        Raises:
            ValueError: if ``'table_action'`` is not a known ``Action``.
        """
        player_action = action['table_action']

        # Update the environment state with the player action
        if player_action == Action.FOLD:
            self.poker_round.fold()
        elif player_action == Action.CALL:
            self.poker_round.check_or_call()
        elif player_action == Action.BET:
            # Only a bet needs an amount; accept either key used in this notebook.
            bet_amount = action['bet_amount'] if 'bet_amount' in action else action['bet_size']
            self.poker_round.complete_bet_or_raise_to(bet_amount)
        else:
            # Ignoring an unknown action would leave the round unchanged and the
            # caller looping on the same actor forever.
            raise ValueError(f'Unknown table action: {player_action!r}')

        done = not self.poker_round.status  # Check if the round is done

        # Update the environment stacks if the round is done
        if done:
            self.stacks = self.poker_round.stacks

        return {'state': self.poker_round, 'reward': 0, 'done': done}

#### Running the Environment

In [None]:
#run training loop
# NOTE(review): `agent` (the policy-gradient agent) and `proc` (an environment exposing
# .state, .step(action) and .reset()) are not defined in any cell visible in this
# notebook; they must exist before this cell is run.
performance = {'stopping_state':[],'terminal_reward':[]}
gamma = agent.config['gamma']    #discount factor, read from the same config as B, N and baseline
for batch_index in tqdm(range(agent.config['B'])):    #loop over batches
    baseline = 0    #init the baseline
    batch = []    #init the batch
    for _ in range(agent.config['N']):    #loop over episodes
        done = False    #set stopping condition
        states = [proc.state]    #init the state history
        actions = []    #init the action history
        rewards = []    #init the reward history
        while not done:    #while the stopping condition is not satisfied...
            a = agent.pi_action_generator(proc.state)    #sample an action according to the policy
            update = proc.step(a)    #evolve the environment
            done = update['done']    #update stopping condition

            #update the data from this episode
            states.append(update['state'])    #record the new state
            actions.append(a)    #record the new action
            rewards.append(update['reward'])    #record the rewards

        states = list(np.array(states).astype(np.float32))    #convert states to correct datatype for torch operations
        discounted_rewards = np.asarray(rewards) * gamma ** np.arange(len(rewards))    #discount the reward history
        causal_return = np.cumsum(discounted_rewards[::-1])[::-1]    #compute the causal return (discounted reward-to-go)
        causal_return = torch.tensor(list(causal_return))    #turn into a torch tensor
        if agent.config['baseline']:    #if we'd like the agent to use baselining...
            baseline += sum(discounted_rewards)    #update the baseline with info from this episode
        batch.append({'states':states
                      ,'actions':actions
                      ,'rewards':rewards
                      ,'total_return':sum(discounted_rewards)
                      ,'causal_return':causal_return})    #add data from this episode to the batch
        performance['stopping_state'].append(proc.state)
        performance['terminal_reward'].append(update['reward'])
        proc.reset()    #reset the environment for the next go

    for episode in batch:    #once the batch is made loop over episodes
        episode['baseline'] = baseline/agent.config['N']    #add the mean discounted return as the baseline
    agent.update_pi(batch)    #run the gradient update


In [90]:
env_config = {
    'player_count': 4,
    'blinds': (2, 4),
    'min_bet': 4,
    'starting_stack': 100
}
# Define the players in the game with Agent objects
players: List[Agent] = [ExampleRandomAgent(f'Player {seat}', env_config['starting_stack']) for seat in range(env_config['player_count'])]
# Randomly select a player to be the dealer
player_offset = np.random.randint(0, len(players))
# Create the environment with the configuration
env = TexasHoldemEnvironment(env_config)
# Number of games (episodes) to play
num_games = 1000
# Upper bound on the number of rounds in a single game
max_rounds = 100

for episode in tqdm(range(num_games)):
    # Reset all players and the environment; each round is started inside the loop below
    active_players = [player.reset() for player in players]
    env.reset_game()
    num_rounds = 0   # Number of rounds played in the game

    # Play rounds until there is only one player left or max_rounds rounds have been played
    while len(active_players) > 1 and num_rounds < max_rounds:
        done = False
        # Calculate amount to offset players to get a new dealer
        player_offset = (player_offset + 1) % len(active_players)
        # Rotate players based on offset
        active_players = active_players[player_offset:] + active_players[:player_offset]
        # Reset the environment for a round of poker
        env.reset_round(active_players)

        # Play the round until the round is over
        while not done:
            # Get the current acting player and their action
            current_player = active_players[env.poker_round.actor_index]
            action = current_player.get_action(env.poker_round)
            # Step the environment with the player's action
            current_state = env.step(action)
            done = current_state['done']

        # Update stacks for each player (env.stacks is in the same seat order as active_players).
        # The loop variable is deliberately not called `agent`, so this cell does not
        # overwrite a notebook-level `agent` such as the one the training loop uses.
        for player, stack in zip(active_players, env.stacks):
            player.stack = stack

        # Keep only the players whose stack is still larger than the minimum bet
        active_players = [player for player in players if player.stack > env_config['min_bet']]
        num_rounds += 1

100%|██████████| 1000/1000 [00:29<00:00, 33.87it/s]
