### Counterfactual Regret Minimization (CFR) and its application to Kuhn Poker


Source consulted: https://modelai.gettysburg.edu/2013/cfr/cfr.pdf


Kuhn Poker is a simple 3-card poker game created by Harold W. Kuhn. Two players each bet 1 chip blind into the pot before the deal. Three cards (usually K, Q, and J) are shuffled, and in the original game one card is dealt to each player and held as private information. We implement the game as described in the paper above, with an additional change inspired by the [ReBeL](https://arxiv.org/abs/2007.13544) paper: players do not hold any private information. Instead, a referee observes the players' cards and makes decisions based on the strategies the players announce at the beginning of the game. The players then update their beliefs based on how the other player plays and simultaneously update their strategies. Further information can be found in the paper.

Below are the possible actions and payoffs in Kuhn Poker:

![Payoffs in Kuhn Poker](payoff.PNG)
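
Because the payoffs are shown only as an image, it can help to write them out as a small function. The following is a minimal sketch, assuming the standard Kuhn Poker rules (1-chip ante, 1-chip bet) and the `'B'`/`'C'` action encoding used in this notebook. The names `RANK` and `player_one_payoff` are illustrative and not part of the notebook.

```python
# Card ranks, lowest to highest (illustrative helper, not from the notebook).
RANK = {'J': 0, 'Q': 1, 'K': 2}


def player_one_payoff(history: str, p1_card: str, p2_card: str) -> int:
    """Chips won (+) or lost (-) by player 1 at a terminal history.

    'B' means bet/call and 'C' means check/fold.
    """
    if history == 'BC':    # player 1 bets, player 2 folds
        return 1
    if history == 'CBC':   # player 1 checks, player 2 bets, player 1 folds
        return -1
    # Showdown: 'CC' is played for the ante only, 'BB' and 'CBB' for ante plus bet.
    stake = 2 if 'B' in history else 1
    return stake if RANK[p1_card] > RANK[p2_card] else -stake


# Print the payoff of every terminal history for two example deals.
for history in ['CC', 'BC', 'BB', 'CBC', 'CBB']:
    print(history,
          player_one_payoff(history, 'K', 'Q'),
          player_one_payoff(history, 'J', 'Q'))
```

The game is zero-sum, so player 2's payoff is the negative of the value returned here.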

In [1]:
## source: https://ai.plainenglish.io/building-a-poker-ai-part-6-beating-kuhn-poker-with-cfr-using-python-1b4172a6ab2d

from typing import List, Dict
import random
import numpy as np
import sys

Actions = ['B', 'C']  # bet/call vs check/fold

class InformationSet():
    def __init__(self):
        self.cumulative_regrets = np.zeros(shape=len(Actions))
        self.strategy_sum = np.zeros(shape=len(Actions))
        self.num_actions = len(Actions)

    def normalize(self, strategy: np.array) -> np.array:
        """Normalize a strategy. If there are no positive regrets,
        use a uniform random strategy"""
        if sum(strategy) > 0:
            strategy /= sum(strategy)
        else:
            strategy = np.array([1.0 / self.num_actions] * self.num_actions)
        return strategy

    def get_strategy(self, reach_probability: float) -> np.array:
        """Return regret-matching strategy"""
        strategy = np.maximum(0, self.cumulative_regrets)
        strategy = self.normalize(strategy)

        self.strategy_sum += reach_probability * strategy
        return strategy

    def get_average_strategy(self) -> np.array:
        return self.normalize(self.strategy_sum.copy())


class KuhnPoker():
    @staticmethod
    def is_terminal(history: str) -> bool:
        return history in ['BC', 'BB', 'CC', 'CBB', 'CBC']  # in cases where player 1 

    @staticmethod
    def get_payoff(history: str, cards: List[str]) -> int:
        """get payoff for 'active' player in terminal history"""
        if history in ['BC', 'CBC']:
            return +1
        else:  # CC or BB or CBB
            payoff = 2 if 'B' in history else 1
            active_player = len(history) % 2
            player_card = cards[active_player]
            opponent_card = cards[(active_player + 1) % 2]
            if player_card == 'K' or opponent_card == 'J':
                return payoff
            else:
                return -payoff


class KuhnCFRTrainer_NoRebel():
    def __init__(self):
        self.infoset_map: Dict[str, InformationSet] = {}
        self.current_average_strategy = np.array([0.5, 0.5]) # keep track of strategies, initialize with 50% bet, 50% pass

    def get_information_set(self, card_and_history: str) -> InformationSet:
        """add if needed and return"""
        if card_and_history not in self.infoset_map:
            self.infoset_map[card_and_history] = InformationSet()
        return self.infoset_map[card_and_history]

    def cfr(self, cards: List[str], history: str, reach_probabilities: np.array, active_player: int):
        if KuhnPoker.is_terminal(history):
            return KuhnPoker.get_payoff(history, cards)

        my_card = cards[active_player]
        info_set = self.get_information_set(my_card + history)

        strategy = info_set.get_strategy(reach_probabilities[active_player])
        
#         ####### CFR-AVG modification as per Rebel #############
#         strategy = (self.current_average_strategy + strategy)/2   # current strategy is not the last strategy, instead its 
#                                                                   # the current average strategy.
#         self.current_average_strategy = strategy
#         ########################################################
        opponent = (active_player + 1) % 2
        counterfactual_values = np.zeros(len(Actions))

        for ix, action in enumerate(Actions):
            action_probability = strategy[ix]

            # compute new reach probabilities after this action
            new_reach_probabilities = reach_probabilities.copy()
            new_reach_probabilities[active_player] *= action_probability

            # recursively call cfr method, next player to act is the opponent
            counterfactual_values[ix] = -self.cfr(cards, history + action, new_reach_probabilities, opponent)

        # Value of the current game state is just counterfactual values weighted by action probabilities
        node_value = counterfactual_values.dot(strategy)
        for ix, action in enumerate(Actions):
            
            info_set.cumulative_regrets[ix] += reach_probabilities[opponent] * (counterfactual_values[ix] - node_value)

        return node_value

    def train(self, num_iterations: int) -> int:
        util = 0
        kuhn_cards = ['J', 'Q', 'K']
        for _ in range(num_iterations):
            cards = random.sample(kuhn_cards, 2)
            history = ''
            reach_probabilities = np.ones(2)
            util += self.cfr(cards, history, reach_probabilities, 0)
        return util
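
The `get_strategy` method above relies on regret matching: each action is played in proportion to its positive cumulative regret, with a uniform fallback when no regret is positive. A small standalone sketch of that rule follows; the function name `regret_matching` and the example regret values are illustrative assumptions.

```python
import numpy as np


def regret_matching(cumulative_regrets: np.ndarray) -> np.ndarray:
    """Turn cumulative regrets into a strategy (probabilities over actions)."""
    positive = np.maximum(0.0, cumulative_regrets)  # negative regrets are ignored
    total = positive.sum()
    if total > 0:
        return positive / total
    # No positive regret yet: fall back to a uniform strategy.
    return np.full(len(positive), 1.0 / len(positive))


# Regret 2 for bet and 6 for pass: pass is played three times as often as bet.
print(regret_matching(np.array([2.0, 6.0])))

# Only the first action has positive regret: all weight goes to it.
print(regret_matching(np.array([3.0, -1.0])))

# No positive regret: uniform strategy.
print(regret_matching(np.array([-1.0, -2.0])))
```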

## THINK IN TERMS OF REBEL

1. Players only have some belief over their current card.

2. Players share the same belief, since it is assumed that the players actually "know the best policy".

$$
\begin{aligned}
\text{belief} &= \big(P_1(J, K, Q), P_2(J, K, Q)\big)\;\;\;\text{given a probability distribution}
\end{aligned}
$$

3. The infostate (here, infoset) then becomes a string of belief + action.


4. History is just a string of cards (we omit legal actions because a player can always pick either pass or bet).
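
The shared belief and the belief-plus-history infostate from the list above could be represented roughly as follows. This is only a sketch under assumptions: the card order `J, Q, K` follows the `kuhn_cards` list used in the code cells, beliefs are rounded so they can serve as dictionary keys, and `uniform_belief` and `infostate_key` are hypothetical helper names.

```python
import numpy as np

CARDS = ['J', 'Q', 'K']


def uniform_belief() -> np.ndarray:
    """Belief of shape (2, 3): row p is the distribution over player p's card."""
    return np.full((2, len(CARDS)), 1.0 / len(CARDS))


def infostate_key(belief: np.ndarray, history: str, decimals: int = 2) -> str:
    """Build a string key from a (rounded) belief and the action history."""
    rounded = np.round(belief, decimals)
    belief_part = '|'.join(
        ','.join(f'{p:.{decimals}f}' for p in row) for row in rounded
    )
    return belief_part + ':' + history


belief = uniform_belief()
print(infostate_key(belief, 'CB'))
```

Rounding is a design shortcut: without it, nearly every belief would produce a distinct key and the table of infostates would never be revisited.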



### How to condition belief? Based on history?

- If history is not empty: check the last action. If it was a bet, the opponent likely had a better card, so update the belief.
- If history is empty: use the current belief (initial random??).
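
The idea that a bet signals a better card can be made precise with Bayes' rule: the updated belief over a player's card is proportional to the prior belief times the probability that the policy takes the observed action with that card. The sketch below illustrates this in the spirit of ReBeL's belief states. The function name `update_belief` and the bet probabilities are made-up assumptions for illustration, not values learned in this notebook.

```python
import numpy as np

CARDS = ['J', 'Q', 'K']


def update_belief(prior: np.ndarray, action_prob_given_card: np.ndarray) -> np.ndarray:
    """Bayes update: posterior(card) is proportional to prior(card) * P(action | card)."""
    unnormalized = np.asarray(prior, dtype=float) * np.asarray(action_prob_given_card, dtype=float)
    total = unnormalized.sum()
    if total == 0:
        # The observed action has zero probability under the policy: keep the prior.
        return np.asarray(prior, dtype=float).copy()
    return unnormalized / total


prior = np.ones(len(CARDS)) / len(CARDS)   # uniform belief over J, Q, K
bet_prob = np.array([0.2, 0.0, 0.6])       # hypothetical P(bet | J), P(bet | Q), P(bet | K)

posterior = update_belief(prior, bet_prob)
print(dict(zip(CARDS, posterior)))         # after a bet, the King becomes much more likely
```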
                

-------------------------------

**Try Implementing these below**

_______________________________


#### TO DOs:

- Proper belief update
- How to model signaling action

In [2]:
kuhn_cards = ['J', 'Q', 'K']
random.sample(kuhn_cards, 2)

probs = np.ones((2,3))/3

tostring = lambda x: kuhn_cards[x[0]] + kuhn_cards[x[1]]

cards = np.argmax(probs, axis = 1)
cards = tostring(cards)


## update belief



In [3]:
## source: https://ai.plainenglish.io/building-a-poker-ai-part-6-beating-kuhn-poker-with-cfr-using-python-1b4172a6ab2d

from typing import List, Dict
import random
import numpy as np
import sys

Actions = ['B', 'C']  # bet/call vs check/fold

class InformationSet():
    def __init__(self):
        self.cumulative_regrets = np.zeros(shape=len(Actions))
        self.strategy_sum = np.zeros(shape=len(Actions))
        self.num_actions = len(Actions)

    def normalize(self, strategy: np.array) -> np.array:
        """Normalize a strategy. If there are no positive regrets,
        use a uniform random strategy"""
        if sum(strategy) > 0:
            strategy /= sum(strategy)
        else:
            strategy = np.array([1.0 / self.num_actions] * self.num_actions)
        return strategy

    def get_strategy(self, reach_probability: float) -> np.array:
        """Return regret-matching strategy"""
        strategy = np.maximum(0, self.cumulative_regrets)
        strategy = self.normalize(strategy)

        self.strategy_sum += reach_probability * strategy
        return strategy

    def get_average_strategy(self) -> np.array:
        return self.normalize(self.strategy_sum.copy())


class KuhnPoker():
    @staticmethod
    def is_terminal(history: str) -> bool:
        return history in ['BC', 'BB', 'CC', 'CBB', 'CBC']  # in cases where player 1 

    @staticmethod
    def get_payoff(history: str, cards: List[str]) -> int:
        """get payoff for 'active' player in terminal history"""
        if history in ['BC', 'CBC']:
            return +1
        else:  # CC or BB or CBB
            payoff = 2 if 'B' in history else 1
            active_player = len(history) % 2
            player_card = cards[active_player]
            opponent_card = cards[(active_player + 1) % 2]
            if player_card == 'K' or opponent_card == 'J':
                return payoff
            else:
                return -payoff


class KuhnCFRTrainer():
    def __init__(self):
        self.infoset_map: Dict[str, InformationSet] = {}
        self.current_average_strategy = np.array([0.5, 0.5]) # keep track of strategies, initialize with 50% bet, 50% pass

    def get_information_set(self, card_and_history: str) -> InformationSet:
        """add if needed and return"""
        if card_and_history not in self.infoset_map:
            self.infoset_map[card_and_history] = InformationSet()
        return self.infoset_map[card_and_history]

    def cfr(self, cards: List[str], history: str, reach_probabilities: np.array, active_player: int):
        if KuhnPoker.is_terminal(history):
            return KuhnPoker.get_payoff(history, cards)
        
        opponent = (active_player + 1) % 2
        
        
        
        ### BELIEF UPDATE --- SIMPLE IMPLEMENTATION FOR CONCEPT DEMONSTRATION
        
        if history:
            # The opponent made the last action. Treat a bet as a sign of a strong
            # card and a pass as a sign of a weak one (hand-picked weights, J/Q/K order).
            # Note: `kuhn_cards` is the module-level list defined in an earlier cell.
            last_action = history[-1]
            dist = [0.1, 0.3, 0.6] if last_action == 'B' else [0.6, 0.3, 0.1]
            cards[opponent] = np.random.choice(kuhn_cards, p=dist)
            cards[active_player] = np.random.choice([c for c in kuhn_cards if c != cards[opponent]])

        my_card = cards[active_player]
        info_set = self.get_information_set(my_card + history)

        strategy = info_set.get_strategy(reach_probabilities[active_player])
        
        ####### CFR-AVG modification as per Rebel #############
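        # The strategy played is not the latest regret-matching strategy but an
        # equal-weight blend with the running strategy (an exponential moving average).
        # Note: current_average_strategy is shared by all information sets.
        strategy = (self.current_average_strategy + strategy) / 2
        self.current_average_strategy = strategy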
        ########################################################
        #opponent = (active_player + 1) % 2
        counterfactual_values = np.zeros(len(Actions))

        for ix, action in enumerate(Actions):
            action_probability = strategy[ix]

            # compute new reach probabilities after this action
            new_reach_probabilities = reach_probabilities.copy()
            new_reach_probabilities[active_player] *= action_probability

            # recursively call cfr method, next player to act is the opponent
            counterfactual_values[ix] = -self.cfr(cards, history + action, new_reach_probabilities, opponent)

        # Value of the current game state is just counterfactual values weighted by action probabilities
        node_value = counterfactual_values.dot(strategy)
        for ix, action in enumerate(Actions):
            
            info_set.cumulative_regrets[ix] += reach_probabilities[opponent] * (counterfactual_values[ix] - node_value)

        return node_value

    
    
    def train(self, num_iterations: int) -> int:
        util = 0
        kuhn_cards = ['J', 'Q', 'K']
        actual = []
        beliefs = []
        
        for _ in range(num_iterations):
            actual_cards = random.sample(kuhn_cards, 2)
            actual.append(actual_cards) # keep track of cards
            
#             probs = np.random.rand(2,3)  # initial belief -- not quite correct - need to come up with something robust
#             belief = np.argmax(probs, axis = 1)
#             belief = tostring(belief)
#             beliefs.append(belief)
            
            i_dist = np.ones(len(kuhn_cards))/len(kuhn_cards)
            
            
            # Sample an initial belief; the two cards cannot be the same
            while True:
                belief = np.random.choice(kuhn_cards, size=2, p=i_dist)
                if belief[0] != belief[1]:
                    break
            
            history = ''
            reach_probabilities = np.ones(2)
            util += self.cfr(belief, history, reach_probabilities, 0)  # cfr w/belief
        
        return util

In [4]:
num_iterations = 1000
cfr_trainer_rebel = KuhnCFRTrainer()
cfr_trainer = KuhnCFRTrainer_NoRebel()
util_Rebel = cfr_trainer_rebel.train(num_iterations)
util_NoRebel = cfr_trainer.train(num_iterations)


print('------------CFR w/o ReBeL Implementation---------------------------------')
print(f"\nRunning Kuhn Poker chance sampling CFR for {num_iterations} iterations")
print(f"\nExpected average game value (for player 1): {(-1./18):.3f}")
print(f"Computed average game value               : {(util_NoRebel / num_iterations):.3f}\n")

print("We expect the bet frequency for a Jack to be between 0 and 1/3")
print("The bet frequency of a King should be three times the one for a Jack\n")

print(f"History  Bet  Pass")
for name, info_set in sorted(cfr_trainer.infoset_map.items(), key=lambda s: len(s[0])):
    print(f"{name:3}:    {info_set.get_average_strategy()}")
    
print("\n\n\n")
    
print('------------CFR w/ ReBeL Implementation---------------------------------')
print(f"\nComputed average game value               : {(util_Rebel / num_iterations):.3f}\n")

print("We expect the bet frequency for a Jack to be between 0 and 1/3")
print("The bet frequency of a King should be three times the one for a Jack\n")

print(f"History  Bet  Pass")
for name, info_set in sorted(cfr_trainer_rebel.infoset_map.items(), key=lambda s: len(s[0])):
    print(f"{name:3}:    {info_set.get_average_strategy()}")

------------CFR w/o ReBeL Implementation---------------------------------

Running Kuhn Poker chance sampling CFR for 1000 iterations

Expected average game value (for player 1): -0.056
Computed average game value               : -0.005

We expect the bet frequency for a Jack to be between 0 and 1/3
The bet frequency of a King should be three times the one for a Jack

History  Bet  Pass
J  :    [0.13443299 0.86556701]
Q  :    [0.08701768 0.91298232]
K  :    [0.83701956 0.16298044]
QB :    [0.33675289 0.66324711]
QC :    [0.00498534 0.99501466]
KB :    [0.99842767 0.00157233]
KC :    [0.99842767 0.00157233]
JB :    [0.00146628 0.99853372]
JC :    [0.3264208 0.6735792]
JCB:    [9.40807858e-04 9.99059192e-01]
QCB:    [0.59947144 0.40052856]
KCB:    [0.99569122 0.00430878]




------------CFR w/ ReBeL Implementation---------------------------------

Computed average game value               : 0.361

We expect the bet frequency for a Jack to be between 0 and 1/3
The bet frequency of a King 

In [None]:
# Test 

# generate initial belief
# update belief based on the history
# pick action from the strategy learned from the CFR


#Play cfr w/o rebel vs cfr w/ rebel

In [14]:
import random
from random import shuffle

import numpy as np

class Game:
    """ Contains the code for the game """
    def __init__(self):
        self.actions = ['B', 'C']
        self.cards = ['J', 'Q', 'K']
        self.history_map = {}
        self.player_one_moves = []
        self.player_two_moves = []
        self.initialize_new_game()
        self.players=[]
    
    def is_further_gameplay_allowed(self):
        historyString = ''.join(self.history)
        #print(historyString)
        return historyString in ['', 'C', 'B', 'CB']
    
    def initialize_new_game(self):
        self.history = []
        self.players = []

    def register_players(self, playerOne, playerTwo):
        self.players.append(playerOne)
        self.players.append(playerTwo)
  
    def draw_cards(self):
        d_cards = random.sample(self.cards, len(self.players))
    
        for count, value in enumerate(self.players):
            self.players[count].set_card(d_cards[count])

    def get_history_map(self):
        return self.history_map

    def distribute_payoffs(self):
        showdown_cases_p2 = ['BB', 'CBB']
        showdown_cases_p1 = ['CC']
    
        player_one_card = self.players[0].get_card()
        player_two_card = self.players[1].get_card()

        if (''.join(self.history) in showdown_cases_p2):
            if (player_one_card == 'K'):
                self.players[0].add_payoff(+2)
                self.players[1].add_payoff(-2)
            elif (player_one_card == 'Q'):
                if (player_two_card == 'K'):
                    self.players[0].add_payoff(-2)
                    self.players[1].add_payoff(+2)
                else:
                    self.players[0].add_payoff(+2)
                    self.players[1].add_payoff(-2)
                    
            else:
                self.players[0].add_payoff(-2)
                self.players[1].add_payoff(+2)
        elif(''.join(self.history) in showdown_cases_p1):
            if (player_one_card == 'K'):
                self.players[0].add_payoff(+1)
                self.players[1].add_payoff(-1)
            elif (player_one_card == 'Q'):
                if (player_two_card == 'K'):
                    self.players[0].add_payoff(-1)
                    self.players[1].add_payoff(+1)
                else:
                    self.players[0].add_payoff(+1)
                    self.players[1].add_payoff(-1)
            else:
                self.players[0].add_payoff(-1)
                self.players[1].add_payoff(+1)
        elif(''.join(self.history) in ['BC']):
            self.players[0].add_payoff(+1)
            self.players[1].add_payoff(-1)
        else:
            self.players[0].add_payoff(-1)
            self.players[1].add_payoff(+1)
      

    def start_game(self, playerOne, playerTwo, times = 1, shufflePlayersOnEveryRun = False):
        
        count = 0
        # Shuffle player position
        shuffle(self.players)

        # Run n-times
        while (count < times):
            print("---------Game-" + str(count+1) + "---------")
            # Re initialize game  -- clear history and players
            self.initialize_new_game()
            
            #register players
            self.register_players(playerOne, playerTwo) 
            
            # shuffle if required
            if (shufflePlayersOnEveryRun):
                shuffle(self.players)
            
            # All players draw cards
            self.draw_cards()

            # Get to the main game
            self.main_loop()

            self.distribute_payoffs()
            

            history = {}
            history['actions'] = self.history
            history['player1'] = self.players[0].name
            history['player2'] = self.players[1].name
            history['results'] = self.result()
            history['chip(s) won'] = np.max([player.getTotalPayoff() for player in self.players])
            self.history_map['Run #' + str(count+1)] = history
            
            print(self.result())
            count+=1


    def result(self):
        """Return a string naming the winner of the finished hand."""
        history_string = ''.join(self.history)
        if history_string == 'CBC':    # player 1 folded to player 2's bet
            return self.players[1].name + " Won"
        elif history_string == 'BC':   # player 2 folded to player 1's bet
            return self.players[0].name + " Won"
        else:  # showdown: the higher card wins
            ranks = self.cards  # ['J', 'Q', 'K'], ordered from lowest to highest
            if ranks.index(self.players[0].get_card()) > ranks.index(self.players[1].get_card()):
                return self.players[0].name + " Won"
            return self.players[1].name + " Won"

        
    def main_loop(self):
        while self.is_further_gameplay_allowed():
            #print(self.history)
            for player in range(len(self.players)):
                #print(''.join(self.history))
                if not(''.join(self.history) in ['BB', 'CBC', 'CBB']):
                    print("Player-" + str(player+1) + " is: " + self.players[player].name + " and has a: " + self.players[player].get_card())
                    action = self.players[player].play(self.history)
                    print(self.players[player].name + ' chooses: ' + action)
                    self.history.append(action)



class Player:
    def __init__(self, name, trainer, game):
        self.name = name
        self.trainer = trainer
        self.payoffs = []
        self.actions = game.actions


    def add_payoff(self, payoff):
        self.payoffs.append(payoff)

    def set_card(self, card):
        self.card = card

    def get_card(self):
        return self.card

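    def card_is_higher_than(self, card):
        """Return True if this player's card outranks `card`."""
        if card == 'K':
            return False  # nothing outranks a King

        if card == 'Q':
            return self.card == 'K'

        return self.card in ['K', 'Q']  # `card` is a Jack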

    def getTotalPayoff(self):
        return np.sum(self.payoffs)

    def play(self, history):
        action = np.random.choice(self.actions, p = self.trainer
                                .infoset_map[str(self.card + ''.join(history))].get_average_strategy())
        return action

In [38]:
# PLAY THE GAME


# playerOne = Player('Op', cfr_trainer, game)
# playerTwo = Player('rebel', cfr_trainer_rebel, game)

game = Game()
playerOne = Player('Op', cfr_trainer, game)
playerTwo = Player('rebel', cfr_trainer_rebel, game)
#game.register_players(playerOne, playerTwo)

game.start_game(playerOne, playerTwo, times = 100, shufflePlayersOnEveryRun = True)

#print(game.get_history_map()['Run #1'])

#print(playerOne.name + "won: " +str(playerOne.getTotalPayoff()))
#print(playerTwo.name+ "won: " + str(playerTwo.getTotalPayoff()))

---------Game-1---------
Player-1 is: Op and has a: Q
Op chooses: C
Player-2 is: rebel and has a: J
rebel chooses: B
Player-1 is: Op and has a: Q
Op chooses: C
rebel Won
---------Game-2---------
Player-1 is: Op and has a: K
Op chooses: B
Player-2 is: rebel and has a: J
rebel chooses: C
Op Won
---------Game-3---------
Player-1 is: rebel and has a: Q
rebel chooses: B
Player-2 is: Op and has a: J
Op chooses: C
rebel Won
---------Game-4---------
Player-1 is: rebel and has a: Q
rebel chooses: B
Player-2 is: Op and has a: J
Op chooses: C
rebel Won
---------Game-5---------
Player-1 is: rebel and has a: K
rebel chooses: B
Player-2 is: Op and has a: Q
Op chooses: C
rebel Won
---------Game-6---------
Player-1 is: Op and has a: Q
Op chooses: C
Player-2 is: rebel and has a: K
rebel chooses: B
Player-1 is: Op and has a: Q
Op chooses: B
rebel Won
---------Game-7---------
Player-1 is: rebel and has a: J
rebel chooses: B
Player-2 is: Op and has a: K
Op chooses: B
Op Won
---------Game-8---------
Player

In [39]:
rebel_won = [i for i in range(len(game.get_history_map())) if game.get_history_map()['Run #'+str(i+1)]['results'] == 'rebel Won']
print("Rebel Won "+ str(len(rebel_won)) + " times.")

one_rebel = [i for i in range(len(game.get_history_map())) if game.get_history_map()['Run #' + str(i+1)]['player1'] == 'rebel']
print("Rebel was player-1 " + str(len(one_rebel)) + " times.")
print("Player-1 is at significant disadvantage in Kuhn Poker.")
print("Rebel won total of %d chips" %playerTwo.getTotalPayoff())

Rebel Won 54 times.
Rebel was player-1 51 times.
Player-1 is at significant disadvantage in Kuhn Poker.
Rebel won total of -10 chips
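
With only 100 hands, a win count and a chip total are hard to interpret on their own, because per-hand payoffs range from -2 to +2. One hedged way to judge the result is to report the mean payoff per hand together with its standard error. The sketch below uses made-up payoffs; in this notebook the input would presumably be `playerTwo.payoffs`, and `mean_and_standard_error` is a hypothetical helper name.

```python
import numpy as np


def mean_and_standard_error(payoffs):
    """Return the mean payoff per hand and the standard error of that mean."""
    payoffs = np.asarray(payoffs, dtype=float)
    mean = payoffs.mean()
    # Sample standard deviation (ddof=1) divided by sqrt(number of hands).
    sem = payoffs.std(ddof=1) / np.sqrt(len(payoffs))
    return mean, sem


example_payoffs = [1, -1, 2, -2]   # made-up data for illustration only
mean, sem = mean_and_standard_error(example_payoffs)
print(f"mean payoff per hand: {mean:.3f} +/- {sem:.3f}")
```

If the mean is within roughly two standard errors of zero, the sample does not clearly separate the two players.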


In [45]:
playerTwo.payoffs[7]

-2