In [94]:
import os
import math
import random
import tensorflow as tf
import matplotlib.pyplot as plt
import numpy as np
import collections
from tensorflow import keras


from tensorflow.keras import layers, losses
from tensorflow.keras.layers import Dense, Flatten, Reshape, LeakyReLU
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.models import load_model
from tensorflow.keras.optimizers import Adam, RMSprop
from datetime import datetime
import keras
import keras.callbacks
from keras.callbacks import TensorBoard

In [95]:
from pypokerengine.players import BasePokerPlayer
from pypokerengine.api.emulator import Emulator
from pypokerengine.utils.game_state_utils import restore_game_state
import handcomparator 


In [96]:
# Program settings
global DEBUG
DEBUG = False
global WATCH_GAME
WATCH_GAME = True

# Reinforcement learning active
global train_model
train_model = True

# Load saved model or create new
global save_model_to_file
global load_saved_model
global num_saves
global model_pathname
save_model_to_file = False
load_saved_model = False
num_saves = 0
model_pathname = 'pathname'

# Model info
global model_input_size
model_input_size = 21

In [97]:
# Deep RL constrants
gamma = 0.2 

# Step length taken to update the estimation of Q(S, A)
alpha = 1

# Greedy policy
# Probability of choosing any action at random (vs. action with highest Q value)
epsilon = 0.1

In [98]:
def create_model(input_shape):
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(128, activation='relu', input_shape=input_shape),
        tf.keras.layers.Dropout(0.5),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dense(32, activation='relu'),
        tf.keras.layers.Dense(3, activation='softmax')  # Assuming simple output (fold, call, or raise)
    ])
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    return model

In [99]:
# Helper methods
def flatten(x):
        if isinstance(x, collections.Iterable):
            return [a for i in x for a in flatten(i)]
        else:
            return [x]
        

In [100]:
class PokerBot(BasePokerPlayer):
    
    # TODO: Read some saved state of the model to allow reinforcement learning over time
    def __init__(self):
        # Initialize model
        if load_saved_model and os.path.isfile(model_pathname):
            load_model()
        else:
            self.model = create_model((model_input_size,))  # Input shape, adjust based on features
        # Save game states and actions in input output matrices for Q learning
        self.x = [] # States
        self.Y = [] # Actions
        self.Q = [] # Q Scores (chip rewards)
    
    # TODO: Look at Emulator implementation to make the model work with reinforcement learning
    # Make an action based on the model output
    def declare_action(self, valid_actions, hole_card, round_state):
        # Prepare feature vector based on the game state
        feature_vector = self._extract_features(hole_card, round_state)
        if DEBUG:
            print("input size: " + str(len(feature_vector)))
            print("input shape: " + str(feature_vector.shape))
        
        
        # Use the model to predict the action
        action_probs = self.model.predict(feature_vector).flatten()

        # If raising invalid move restrict valid moves to fold and call
        if valid_actions[2]["amount"]["max"] == -1:
            action_probs = action_probs[:2]

        if random.random() < epsilon:
            outcome = random.randint(0, len(action_probs)-1)
        else:
            outcome = np.argmax(action_probs)
        
        if DEBUG:
            print("action_probs length: " + str(len(action_probs)))
            print(action_probs)
            print("action argmax: " + str(outcome))
        
        action_info = valid_actions[outcome]
        action = action_info['action']
        if outcome == 2:
            # Scale raise to the confidence of the model
            amount = action_info['amount']['min'] + math.floor((action_info['amount']['max'] - action_info['amount']['min']) * action_probs[outcome])
            if DEBUG:
                print(valid_actions)
                print(str((action_info['amount']['max'] - action_info['amount']['min']) * action_probs[outcome]))
        else:
            amount = action_info['amount']
        
        # Update Q learning input, output (observation, action)
        # np.append(self.x, feature_vector, axis=1)
        # np.append(self.Y, [action, amount], axis=1)
        self.x.append(feature_vector)
        self.Y.append([action, amount])
        
        return action, amount
    
    # Setup Emulator object by registering game information
    def receive_game_start_message(self, game_info):
        return
        
        # Emulator skeleton code
        player_num = game_info['player_num']
        max_round = game_info['rule']['max_round']
        small_blind_amount = game_info['rule']['small_blind_amount']
        ante_amount = game_info['rule']['ante']
        blind_structure = game_info['rule']['blind_structure']
        
        self.emulator = Emulator()
        self.emulator.set_game_rule(player_num, max_round, small_blind_amount, ante_amount)
        self.emulator.set_blind_structure(blind_structure)
        
        # Register algorithm of each player which used in the simulation.
        for player_info in game_info['seats']['players']:
            self.emulator.register_player(player_info['uuid'], PokerBot())

    # Not neccesarily useful
    def receive_round_start_message(self, round_count, hole_card, seats):
        # Reset Round info for Q learning
        # NOTE: Only for debugging purposes. Optimal practices may vary
        self.x = []
        self.Y = []
        self.Q = []
        pass

    # Not neccesarily useful
    def receive_street_start_message(self, street, round_state):
        pass

    # Can incorporate player observation in model updated with each move
    def receive_game_update_message(self, new_action, round_state):
        pass
    
    # Update model with each round result
    def receive_round_result_message(self, winners, hand_info, round_state):
        # Calculate net chip gain from round
        if winners[0]['uuid'] == self.uuid:
            # Player won the round
            print("Player ", winners[0]['uuid'], " won the round")
            gain = 1
        else:
            gain = -1
        stake = 0                    
        streets = ['preflop', 'flop', 'turn', 'river']
        for street in streets:
            if street in round_state['action_histories']:
                for action in round_state['action_histories'][street]:
                    # print(action)
                    # if action['action'] == 'ANTE' or 'SMALLBLIND' or 'BIGBLIND' or 'FOLD':
                    #     # Do nothing for now
                    #     print(action['action'])
                    #     pass
                    # else:
                    #     stake += action['paid']
                    #     print("Q stake", stake)
                    #     self.Q.append(stake * gain)
                    if action['uuid'] == self.uuid:
                        if action['action'] == 'CALL' or action['action'] == 'RAISE':
                            stake += action['paid']
                            self.Q.append(stake * gain)
                        elif action['action'] == 'FOLD':
                            self.Q.append(0)
        
        if WATCH_GAME:
            print("Round actions: player " + self.uuid)
            print("Y: ", self.Y)
            print("Q: ", self.Q)
        
        # Update model
        if train_model:
            # Update model with round results
            pass
        if save_model_to_file:
            # Save model to file
            save_model()
        pass
    
    # Additional methods
    
    # Produce a feature vector of length 17
    def _extract_features(self, hole_card, round_state):
        
        #simulate hand against 10000 flops extracting hand strength estimate
        
        hand_strength = self._hand_strength_sim(hole_card, round_state['community_card'])

        # 8 Standard features
        standard_features = [
            round_state['round_count'],
            round_state['pot']['main']['amount'],
            sum([side_pot['amount'] for side_pot in round_state['pot']['side']]),
            round_state['dealer_btn'],
            round_state['small_blind_pos'],
            round_state['big_blind_pos'],
            round_state['small_blind_amount'],
            self._street_to_feature(round_state['street'])
        ]

        # 8 Action history features (2 {# raises, # calls} for each betting stage: preflop, flop, turn, river)
        action_history_features = self._aggregate_action_histories(round_state['action_histories'])

        # Combine all features into a single fixed-size feature vector of length 34
        # Flatten the list of lists
        features = flatten([hand_strength] + standard_features + action_history_features)
        features = np.array(features)
        features = features.reshape(1, -1)
        return features
    
    def _hand_strength_sim(self, cards, board):
        deck = Deck()
        deck.shuffle()
        my_hand = []
        board = []
        for c in cards:
            print(c)
            card = Card(c[0], c[-1])
            my_hand.append(card)
        for b in board:
            card = Card(b[0], b[-1])
            my_hand.append(card)
        deck.remove(my_hand+board)
        board_size = len(board)
        score = 0
        for i in range(1000):
            draw = random.sample(deck.get_allcards, 7-board_size)
            opp_hand = board + draw
            my_hand = my_hand + draw[2:]
            if handcomparator.find_winner(my_hand, opp_hand):
                score += 1
        return score/10000
    
    def _street_to_feature(self, street):
        # Convert street to a numerical feature
        streets = {'preflop': 1, 'flop': 2, 'turn': 3, 'river': 4, 'showdown': 5}
        return streets.get(street, 0)

    def _aggregate_action_histories(self, action_histories):
        '''
        # Aggregate action histories into a fixed-length vector
        # Example: Count the number of raises, calls, etc.
        raise_count = sum(1 for action in action_histories.get('preflop', []) if action['action'] == 'raise')
        call_count = sum(1 for action in action_histories.get('preflop', []) if action['action'] == 'call')
        # Add more aggregated features as needed
        # Ensure the length of this vector is fixed
        return [raise_count, call_count]
        '''
        
        # Initialize counts
        raise_count = [0, 0, 0, 0]  # Preflop, Flop, Turn, River
        call_count = [0, 0, 0, 0]
        fold_count = [0, 0, 0, 0]

        # Define rounds
        rounds = ['preflop', 'flop', 'turn', 'river']

        # Count actions in each round
        for i, round in enumerate(rounds):
            for action in action_histories.get(round, []):
                if action['action'] == 'raise':
                    raise_count[i] += 1
                elif action['action'] == 'call':
                    call_count[i] += 1
                elif action['action'] == 'fold':
                    fold_count[i] += 1

        # Flatten and return
        return raise_count + call_count + fold_count
    
    
    def save_model(self):
        self.model.save(model_pathname + '_V' + num_saves)
    
    def load_model(self):
        self.model = load_model(model_pathname)

In [101]:
from pypokerengine.api.game import setup_config, start_poker

# Declare game setup paramers
config = setup_config(max_round=10, initial_stack=100, small_blind_amount=5)
config.register_player(name="p1", algorithm=PokerBot())
config.register_player(name="p2", algorithm=PokerBot())
config.register_player(name="p3", algorithm=PokerBot())
game_result = start_poker(config, verbose=1)

Started the round 1
Street "preflop" started. (community card = [])
SJ


KeyError: 'J'