In [1]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim

from pettingzoo.classic import texas_holdem_v4

env = texas_holdem_v4.env(render_mode="ansi")

In [2]:
class Pi(nn.Module):
    def __init__(self, input_size, output_size, hidden_size=10):
        super(Pi, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, output_size)
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        x = self.softmax(x)
        return x

input_size = 1
output_size = 4

model = Pi(input_size, output_size)

In [3]:
class BasePlayer:
    def __init__(self, name):
        self.name = name
        self.wins = 0
        self.acum = 0

        self.current_reward = []

        self.n = 0
        self.avg = 0
    
    def reward(self, r):
        self.current_reward.append(r)
    
    def learn(self):
        self.acum += sum(self.current_reward)
        self.wins += int(sum(self.current_reward)>0)
        self.n += 1
        self.avg += (sum(self.current_reward) - self.avg) / self.n
        self.current_reward=[]

    def act(self, s, mask, best=False):
        if mask[0]: # Call
            return 0 
        if mask[3]: # Check
            return 3 
        if mask[2]: # Fold
            return 2 

class Player(BasePlayer):
    def __init__(self, name, pi=None, gamma=.9, learning_rate=1e-5):
        # self.pi = Pi(72+4, 4) # mask as input
        self.pi = (pi or Pi(72, 4))
        self.optimizer = optim.SGD(self.pi.parameters(), lr=learning_rate)

        self.learning_rate = learning_rate
        self.gamma = gamma

        self.current_mask = None
        self.current_data = []
        super().__init__(name)
    
    def learn(self):
        if len(self.current_data) > 0 and sum(self.current_reward) > 0:
            returns = torch.tensor([self.gamma**t * sum([self.gamma**(k-t) * r for k, r in enumerate(self.current_reward[t:])]) for t, _ in enumerate(self.current_reward)])
            
            policy_loss = [-log_prob * R for log_prob, R in zip(self.current_data, returns[:-1])]
            policy_loss = torch.cat(policy_loss).sum()
            self.optimizer.zero_grad()
            policy_loss.backward()
            self.optimizer.step()
        
        self.current_data=[]
        super().learn()

    def act(self, s, mask, best=False):
        state = torch.from_numpy(s).float().unsqueeze(0)
        self.current_mask = torch.from_numpy(mask).float().unsqueeze(0)
        
        # probs = self.pi(torch.cat((state, m), axis=1)) # mask as inputs
        masked = self.pi(state)*self.current_mask
        probs = masked/torch.sum(masked)
        
        if best:
            return torch.argmax(probs)
        else:
            m = torch.distributions.Categorical(probs)
            action = m.sample()
            self.current_data.append(m.log_prob(action))
            return action.item()
        

names = ['player_0', 'player_1'] 
agents = {name: Player(name) for name in names}
{name: agent.acum for name, agent in agents.items()}

{'player_0': 0, 'player_1': 0}

In [7]:
def generate_episode(players):
    s = env.reset(seed=42)
    winner = None
    for agent in env.agent_iter():
        player = players[agent]
        observation, r, termination, truncation, info = env.last()
        s = observation['observation']
        mask = observation["action_mask"]
        
        player.reward(r)
        if termination or truncation:
            a = None
        else:
            a = player.act(s, mask)
        env.step(a)

        if r>0:
            winner = player
    return winner
    
    


def reinforce(players, max_iter=1000):
    for i in range(max_iter):
        generate_episode(players)
        
        for player in players.values():
            player.learn()




# Treino

In [5]:
num_episodes = int(1e4)
learning_rate = 1e-5
gamma = 0.999

# agents = {name: Player(name, gamma, learning_rate) for name in ['player_0', 'player_1'] }
shared_pi = Pi(72, 4)
agents = {
    'player_0': Player('player_0', shared_pi, gamma, learning_rate),
    # 'player_1': Player('player_1', gamma, learning_rate)  
    'player_1': Player('player_1', shared_pi, gamma, learning_rate)  
}
# reinforce(agents, num_episodes)

In [6]:
num_episodes = int(1e5)
reinforce(agents, num_episodes)
{name: agent.wins for name, agent in agents.items()}

{'player_0': 4048, 'player_1': 5952}

# Trainado vs Basico

In [20]:
agents = {
    'player_0': Player('player_0', shared_pi, gamma, learning_rate),
    'player_1': BasePlayer('player_1')  
}

reinforce(agents, 1)
{name: agent.wins for name, agent in agents.items()}

{'player_0': 0, 'player_1': 1}