In [1]:
from player import Player
from dealer import Dealer
from game import TrucoGame
from actions import game_actions
from environment import TrucoEnvironment
from actions import game_actions, game_actions_list

In [2]:
num_players = 2

players = [Player(id) for id in range(num_players)]

p0 = players[0]
p1 = players[1]

env = TrucoEnvironment(players)

In [3]:
import torch
from torch import nn
from collections import deque
import itertools
import numpy as np
import random

In [4]:
GAMMA=0.99
BATCH_SIZE=32
BUFFER_SIZE=50000
MIN_REPLAY_SIZE=1000
EPSILON_START=1.0
EPSILON_END=0.02
EPSILON_DECAY=10000
TARGET_UPDATE_FREQ=1000
LEARNING_RATE = 5e-4

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [5]:
class DQNetwork(nn.Module):
    
    def __init__(self, state_space_dim, action_space_dim):
        super().__init__()
        
        self.net = nn.Sequential(
            nn.Linear(state_space_dim, 256),
            nn.Tanh(),
            nn.Linear(256, 128),
            nn.Tanh(),
            nn.Linear(128, 64),
            nn.Tanh(),
            nn.Linear(64, action_space_dim)
        )
        
    def forward(self, x):
        return self.net(x)
    
    def act(self, game_state_t):
        q_values = self(game_state_t.unsqueeze(0))
        
        return q_values.detach().squeeze()

In [6]:
replay_buffer = deque(maxlen=BUFFER_SIZE)
reward_buffer = deque([0.0], maxlen=100)

episode_reward = 0.0

In [7]:
def initialize_agent():
    # Initialize the NNs
    online_net = DQNetwork(
        state_space_dim=env.state_space_dim, 
        action_space_dim=env.action_space_dim
    ).to(device)

    target_net = DQNetwork(
        state_space_dim=env.state_space_dim, 
        action_space_dim=env.action_space_dim
    ).to(device)

    # Initialize both with the same weight
    target_net.load_state_dict(online_net.state_dict())
    
    # Initialize optimizer with online_net 
    optimizer = torch.optim.Adam(online_net.parameters(), lr=LEARNING_RATE)
    
    return online_net, target_net, optimizer

agents = [(p, initialize_agent()) for p in players]

In [8]:
# reset env
starting_player, legal_actions, game_state_t = env.reset()

# Initialize the ReplayBuffer
while len(replay_buffer) < MIN_REPLAY_SIZE:
    # pick legal action given uniform distribution
    action = np.random.choice(legal_actions, 1, [1/len(legal_actions) for i in legal_actions])
    action = game_actions_list.index(action)
    
    # Take action, observer outcome
    rew, done, next_player, next_legal_actions, new_game_state_t = env.step(starting_player, action)
    
    #Save transition for training later
    transition = (game_state_t, action, rew, done, new_game_state_t)
    replay_buffer.append(transition)
    
    starting_player = next_player
    legal_actions = next_legal_actions
    game_state_t = new_game_state_t
    
    if done:
        starting_player, legal_actions, game_state_t = env.reset()

len(replay_buffer)

1000

In [9]:
# reset env
starting_player, legal_actions, game_state_t = env.reset()

In [10]:


# Main training loop
for step in itertools.count():
    agent = [agent for player, agent in agents if player == starting_player]
    online_net, target_net, optimizer = agent[0][0], agent[0][1], agent[0][2]
    # Linearly decrease epsilon 
    epsilon = np.interp(step, [0, EPSILON_DECAY], [EPSILON_START, EPSILON_END])
    
    use_random = random.random() <= epsilon
    
    action = None
    if use_random:
        # pick legal action given uniform distribution
        action = np.random.choice(legal_actions, 1, [1/len(legal_actions) for i in legal_actions])
        action = game_actions_list.index(action)
    else: 
        # Compute Q-Values
        q_values = online_net.act(game_state_t)
        # Get index of best action
        action = np.argmax(q_values, axis=0)
        
    # Take action, observer outcome
    rew, done, next_player, next_legal_actions, new_game_state_t = env.step(starting_player, action)
    
    #Save transition for training later
    transition = (game_state_t, action, rew, done, new_game_state_t)
    replay_buffer.append(transition)
    
    starting_player = next_player
    legal_actions = next_legal_actions
    game_state_t = new_game_state_t
    
    episode_reward += rew
    
    if done:
        starting_player, legal_actions, game_state_t = env.reset()
        
        reward_buffer.append(episode_reward)
        episode_reward = 0
    
    # Start Gradient Step
    transitions = random.sample(replay_buffer, BATCH_SIZE)
    
    obs_t = np.vstack([t[0] for t in transitions])
    actions_t = np.asarray([t[1] for t in transitions], dtype=np.int64)
    rews_t = np.asarray([t[2] for t in transitions], dtype=np.float32)
    dones_t = np.asarray([t[3] for t in transitions], dtype=np.float32)
    new_obs_t = np.vstack([t[4] for t in transitions])
    
 
    obs_t = torch.as_tensor(obs_t).to(device)
    actions_t = torch.as_tensor(actions_t).unsqueeze(-1).to(device)
    rews_t = torch.as_tensor(rews_t).to(device)
    new_obs_t = torch.as_tensor(new_obs_t).to(device)
    dones_t = torch.as_tensor(dones_t).to(device)
    
    # Compute Targets
    target_q_values = target_net(new_obs_t)
    max_target_q_values = target_q_values.max(dim=1, keepdim=True)[0]
    
    targets = rews_t + GAMMA * (1 - dones_t) * max_target_q_values
    
    # Compute Loss
    q_values = online_net(obs_t)
    
    action_q_values = torch.gather(input=q_values, dim=1, index=actions_t)
    loss = nn.functional.smooth_l1_loss(action_q_values, targets)
    
    # Gradient Descent
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    
    # Update Target Network
    if step % TARGET_UPDATE_FREQ == 0:
        target_net.load_state_dict(online_net.state_dict())
        
    #Logging
    if step % 1000 == 0:
        print(f"Step: {step} --- Avg reward: {np.mean(reward_buffer)}", end='\r')
    

  loss = nn.functional.smooth_l1_loss(action_q_values, targets)


Step: 1000 --- Avg reward: -1.56

KeyboardInterrupt: 