# Counterfactual Regret Minimization with Kuhn Poker

Nel Kuhn Poker si sfidano due giocatori con un mazzo composto da tre carte, in ordine di importanza: K (la più alta), Q e J. 

Ciascun giocatore riceve una carta e posiziona nel piatto una puntata iniziale obbligatoria di 1 chip, detta "ante". 

Successivamente, si svolge un unico turno di puntate durante il quale ogni giocatore ha la possibilità di fare "bet" (aggiungendo un ulteriore chip al piatto) o "check" (non aggiungendo ulteriori chip al piatto).

In [1]:
import numpy as np

N_ACTIONS = 2 # 0 = check, 1 = bet
N_CARDS = 3 # 0 = jack, 1 = queen, 2 = king
  

Rappresentiamo gli Information Set con la classe InformationSet. 


Alla fine di ogni iterazione, il metodo self.next_strategy() aggiorna self.strategy_sum, self.strategy e reimposta self.reach_pr. 

La prossima strategia viene generata nel metodo calc_strategy(). Intuitivamente, calc_strategy() sceglie una strategia proporzionale agli elementi positivi di self.regret_sum. Qualsiasi azione con un regret negativo viene ignorata nella prossima strategia. Se la somma di tutti gli elementi di self.regret_sum è non positiva, allora la prossima strategia consiste nel scegliere casualmente qualsiasi azione in modo uniforme.

In [2]:
class InformationSet():
    """
    Classe che rappresenta un information set.
    """
    def __init__(self, key):
        # key = history + carta del giocatore
        self.key = key
        # array contentente la somma dei counterfactual regrets per ogni azione in tutte le visite dell'information set. 
        # Check = index 0, Bet = index 1
        self.regret_sum = np.zeros(N_ACTIONS) 
        # array contentente la somma della strategie di ogni visita moltiplicata per la reach probability del player corrente
        self.strategy_sum = np.zeros(N_ACTIONS) 
        self.strategy = np.repeat(1/N_ACTIONS, N_ACTIONS) # array contentente la strategia corrente (inizializzata a uniforme)
        self.reach_pr = 0 # reach probability del player corrente
        self.reach_pr_sum = 0

    def next_strategy(self):
        """
        Aggiorna la strategia corrente e la somma delle strategie.
        """
        # aggiorna la somma delle strategie
        self.strategy_sum += self.reach_pr * self.strategy
        # calcola la nuova strategia con regret matching
        self.strategy = self.calc_strategy()
        # resetta la reach probability corrente e aggiorna la somma delle reach probability
        self.reach_pr_sum += self.reach_pr
        self.reach_pr = 0 

    def calc_strategy(self):
        """
        Calcola la strategia corrente a partire dai counterfactual regrets.
        Sceglie una strategia proporzionale ai agli elementi positivi del regret_sum facendo il Regret Matching.
        """
        
        strategy = np.where(self.regret_sum > 0, self.regret_sum, 0) # prende solo i regret positivi

        total = sum(strategy) # somma dei regret positivi
        if total > 0: 
            strategy = strategy / total # normalizza i regret positivi
        else:
            n = N_ACTIONS
            strategy = np.repeat(1/n, n) # se non ci sono regret positivi, sceglie una strategia uniforme

        return strategy

    def get_average_strategy(self):
        """
        Calcola la strategia media su tutte le iterazioni. Questa è la strategia di Nash.
        """
        strategy = self.strategy_sum / self.reach_pr_sum
        # Rimuove le strategie trascuraibili
        strategy = np.where(strategy < 0.001, 0, strategy)
        # Rinormalizza
        total = sum(strategy)
        strategy /= total

        return strategy


    def __str__(self):
        strategies = ['{:03.2f}'.format(x)
                      for x in self.get_average_strategy()]
        return '{} {}'.format(self.key.ljust(6), strategies)

In [3]:
def card_str(card):
    """
    Ritorna una string representation delle carte.
    """
    if card == 0:
        return "J"
    elif card == 1:
        return "Q"
    return "K"


def get_info_set(i_map, card, history):
    """
    Ritorna l'information set associato alla carta e alla history.
    Se non esiste, allora lo crea e lo aggiunge al dizionario.
    """
    # La chiave è composta da una stringa che rappresenta la carta e la history
    key = card_str(card) + " " + history
    info_set = None
    
    # Se la chiave non è presente nel dizionario, allora creo un nuovo information set
    if key not in i_map:
        # Creo un'istanza della classe InformationSet
        info_set = InformationSet(key)
        # Aggiungo il nuovo information set al dizionario
        i_map[key] = info_set
        return info_set
    
    # Altrimenti ritorno l'information set associato alla chiave
    return i_map[key]

La funzione cfr() esegue una visita in profondità ricorsiva lungo l'albero del gioco attraversandolo interamente e restituisce l'expected value di quella iterazione. 

A seconda della tipologia di nodo (history) data in ingresso svolge attività differenti. 

In [4]:

def cfr(i_map, history="", card_1=-1, card_2=-1, pr_1=1, pr_2=1, pr_c=1):
    """
    Counterfactual regret minimization algorithm.

    Parameteri
    ----------
    i_map: dizionario degli information set
    history : [{'r', 'c', 'b'}], stringa che rappresenta il path preso nel game tree
        'r': random chance action
        'c': check action
        'b': bet action
    card_1 : carta del player 1
    card_2 : carta del player 2
    pr_1 : Probabilità che il player 1 arrivi a history
    pr_2 : Probabilità che il player 2 arrivi a history
    pr_c: Contributo di probabilità del chance node per raggiungere history

    """
    
    # Se l'history è un chance node, allora ritorna il valore atteso 
    if is_chance_node(history):
        return chance_util(i_map)
    
    # Se l'history è un terminal node, allora ritorna la terminal utility di questa combinazione di carte
    if is_terminal(history):
        return terminal_util(history, card_1, card_2)

    
    # --------------Se arrivo qui, allora l'history è un decision node----------------
    
    # Calcolo il numero di azioni che sono state prese fino ad ora
    n = len(history)    
    # Se n è pari, allora è il turno del player 1, altrimenti è il turno del player 2
    is_player_1 = n % 2 == 0
    # Deduco l'infromation set della history del player corrente
    info_set = get_info_set(i_map, card_1 if is_player_1 else card_2, history)
    # Deduco la strategia corrente del player corrente,calcolata all'iterazione precedente
    strategy = info_set.strategy
    # Aggiungo la reach probability del player corrente all'information set
    if is_player_1:
        info_set.reach_pr += pr_1 # 
    else:
        info_set.reach_pr += pr_2

    # array delle Counterfactual utility per azione
    action_utils = np.zeros(N_ACTIONS)

    # Chiamo ricorsivamente cfr per ogni azione possibile  
    for i, action in enumerate(["c", "b"]):
        # genero il prossimo nodo dell'history
        next_history = history + action
        # ogni player sceglie l'action a con probabilità strategy[i] che deve essere moltipliata per la reach probability del player corrente
        if is_player_1:
            # moltiplico per -1 perchè cfr ritorna l'utility per il prossimo turno
            action_utils[i] = -1 * cfr(i_map= i_map, history= next_history,
                                       card_1= card_1, card_2= card_2,
                                       pr_1= pr_1 * strategy[i], pr_2= pr_2, pr_c= pr_c) 
        else:
            action_utils[i] = -1 * cfr(i_map = i_map, history= next_history,
                                       card_1= card_1,card_2= card_2,
                                       pr_1= pr_1,pr_2= pr_2 * strategy[i],pr_c= pr_c)

    # Calcolo gli expected utility su tutte le azioni
    expected_value = sum(action_utils * strategy) 
    # calcolo i counterfactual regrets
    regrets = action_utils - expected_value 
    # Aggiorno i counterfactual regrets dell'information set
    if is_player_1:
        info_set.regret_sum += pr_2 * pr_c * regrets
    else:
        info_set.regret_sum += pr_1 * pr_c * regrets
    
    return expected_value

La funzione is_chance_node() determina se ci troviamo in un nodo di chance controllando se la history è vuota.

Se restituisce true, allora chance_util() enumera tutte le 6 possibili combinazioni di mani generate dal chance node. Per ogni possibilità, richiamiamo in modo ricorsivo cfr() considerando che nessun giocatore ha preso alcuna azione, quindi le loro reach probability sono entrambe 1. Ogni combinazione di mani ha una probabilità uniforme. LA funzione ritorna il valore atteso su tutte le combinazioni.

In [5]:
def is_chance_node(history):
    return history == "" # return True if history == "" else False

# se sono in un chance node 
def chance_util(i_map):
    expected_value = 0
    n_possibilities = 6
    # 6 possibili combinazioni di carte
    for i in range(N_CARDS):
        for j in range(N_CARDS):
            if i != j:
                # ottengo la somma dell'ev di ogni combinazione
                expected_value += cfr(i_map= i_map,
                                      history= "rr",
                                      card_1= i,
                                      card_2= j,
                                      pr_1= 1, # probabilità che il giocatore 1 arrivi a quel nodo è 1 perchè ancora non ha giocato
                                      pr_2= 1, # probabilità che il giocatore 2 arrivi a quel nodo è 1 perchè ancora non ha giocato
                                      pr_c= 1/n_possibilities) # probabilità che il chance node arrivi a quel nodo è 1/6 perchè è uan distribuzione uniforme
                
    return expected_value/n_possibilities # ritorno la media degli ev

La funzione is_terminal() verifica se la history è presente nell'insieme delle history possibili dei terminal nodes. Se True, terminal_util() restituisce l'utility del player corrente. 

Poiché i giocatori si alternano, il player corrente è 1 se il numero di azioni nella history è pari, altrimenti è il 2. Per calcolare l'utility di una terminal history, ci sono tre casi:

- Se l'ultimo giocatore ha fatto "check"(fold), il player corrente vince 1 chip.
- Se entrambi i giocatori fanno "check" durante il round di puntate, si arriva allo showdown e il giocatore con la carta più alta vince 1 chip.
- I giocatori arrivano a showdown con un piatto di 4 chip. Il giocatore con la carta più alta vince 2 chip.

In [6]:
def is_terminal(history):
    """
    Ritorna True se l`history` è una terminal history.
    """    
    # 5 possibili terminal history
    possibilities = {"rrcc": True, # showdown con check e check
                     "rrbb": True, # showdown con bet e bet(call)                  
                     "rrcbb": True, # showdown con check, bet e bet(call)
                     "rrbc": True, # non showdown con bet e check(fold)
                     "rrcbc": True, # non showdown con check, bet e check(fold)
                     }
    return history in possibilities


def terminal_util(history, card_1, card_2):
    """
    Ritorna l'utility di una terminal history per il player corrente. 
    Siccome i players is alternano ad ogni turno, se il numero di azioni è pari allora
    il player corrente è il player 1 altrimenti è il player 2 
    """
    n = len(history) # numero di azioni fatte
    # se il numero di azioni è pari allora è il player 1 altrimenti è il player 2
    card_player = card_1 if n % 2 == 0 else card_2 
    card_opponent = card_2 if n % 2 == 0 else card_1

    # No showdown
    if history == "rrcbc" or history == "rrbc":
        # Opponent ha foldato, il player corrente vince 1
        return 1
    # Showdown senza bets
    elif history == "rrcc":
        # Chi ha la carta più alta vince 1
        return 1 if card_player > card_opponent else -1

    # Showdown con bet e call
    if history == "rrcbb" or history == "rrbb":
        # Chi ha la carta più alta vince 2
        return 2 if card_player > card_opponent else -2

In [7]:
def display_results(ev, i_map):
    print('player 1 expected value: {}'.format(ev))
    print('player 2 expected value: {}'.format(-1 * ev))

    print()
    print('player 1 strategies:')
    print(f"History  Check    Bet")
    sorted_items = sorted(i_map.items(), key=lambda x: x[0])
    for _, v in filter(lambda x: len(x[0]) % 2 == 0, sorted_items):
        print(v)
    print()
    print('player 2 strategies:')
    print(f"History  Check    Bet")
    for _, v in filter(lambda x: len(x[0]) % 2 == 1, sorted_items):
        print(v)
        

Ogni iterazione di cfr() restituisce il valore atteso del gioco per quella specifica iterazione. Siamo in grado di approssimare il vero valore del gioco facendo la media su tutte le iterazioni. Il vero valore del gioco rappresenta l'importo atteso che un giocatore vincerà seguendo l' average strategy.

In [8]:

i_map = {}   # dizionario degli information set
n_iterations = 10000 # numero di iterazioni
expected_game_value = 0

for _ in range(n_iterations): # run CFR algorithm n_iterations times
    # update game value
    expected_game_value += cfr(i_map) # update game value
    # update strategies per ogni information set
    for _, infoset in i_map.items():
        infoset.next_strategy() 

expected_game_value /= n_iterations

display_results(expected_game_value, i_map) 

player 1 expected value: -0.05666979368427769
player 2 expected value: 0.05666979368427769

player 1 strategies:
History  Check    Bet
J rr   ['0.79', '0.21']
J rrcb ['1.00', '0.00']
K rr   ['0.39', '0.61']
K rrcb ['0.00', '1.00']
Q rr   ['1.00', '0.00']
Q rrcb ['0.45', '0.55']

player 2 strategies:
History  Check    Bet
J rrb  ['1.00', '0.00']
J rrc  ['0.67', '0.33']
K rrb  ['0.00', '1.00']
K rrc  ['0.00', '1.00']
Q rrb  ['0.66', '0.34']
Q rrc  ['1.00', '0.00']


Il game value del Kuhn poker è di 1/18= 0.0555. Gia dopo 10000 iterazioni si nota la convergenza al game value. Al crescere del numero di iterazioni l'average total counterfactual regret converge verso 0 mentre l' avarage strategy converge al Nash equilibium.
