In [1]:
from Quoridor_agent import *
from Quoridor_environment import *
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import numpy as np
from collections import deque
import copy
from tqdm import tqdm
import random

import matplotlib.pyplot as plt

import numpy as np
import time, datetime
import matplotlib.pyplot as plt


from pathlib import Path

In [2]:
# Magnitude of the terminal reward: agent_act() hands +WIN_REWARD to the player
# whose move ends the game, and the training loops assign -WIN_REWARD to the loser.
WIN_REWARD = 100  # The reward gained by the winner of a game

In [3]:
# Every run stores its checkpoints and metric logs in its own timestamped folder.
save_dir = Path("checkpoints") / datetime.datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
# exist_ok=True keeps this cell re-runnable: without it, executing the cell twice
# within the same second raises FileExistsError.
save_dir.mkdir(parents=True, exist_ok=True)

use_cuda = torch.cuda.is_available()
print(f"Using CUDA: {use_cuda}")
print()

Using CUDA: False



In [4]:
def create_env(render, n=5, n_fences=3):
    '''
    Returns a new AI Gym environment for the Quoridor game

        Parameters:
            render (string): "human" to display the game
                             "rgb_array" otherwise
            n (int): the size of the board
            n_fences (int): forwarded to QuoridorWorld as `n_fences`
                            (presumably the fences available to each player -
                            TODO confirm against the environment). Defaults to 3,
                            the value that used to be hard-coded here.

        Returns:
            env: the AI Gym environment
            state_dim (int): the length of the state vector, 2*n*n + 2
                             (two blocks of n*n entries plus two extra entries)
            action_dim (int): the size of the action space, 4 + 2*n*n
    '''
    env = QuoridorWorld(render_mode=render, grid_size=n, n_fences=n_fences)
    board_cells = n * n
    state_dim = 2 * board_cells + 2
    action_dim = 4 + 2 * board_cells
    return env, state_dim, action_dim

In [5]:
def observation2state(obs, state_dim, n):
    '''
    Encodes an environment observation as a (1, state_dim) numpy array

        Parameters:
            obs: the observation returned by the environment; it is indexed with
                 the keys "player_1", "player_2", "fences_player_1" and
                 "fences_player_2"
            state_dim (int): the length of the encoded state vector
            n (int): the size of the board

        Returns:
            a (1, state_dim) array of zeros where:
              - entry ij2k(player 1 position) is set to 1
              - entry n*n + ij2k(player 2 position) is set to 1
              - the last two entries hold the two fence values of the observation
    '''
    cells = n * n
    encoded = np.zeros((1, state_dim))

    # Mark each pawn: player 1 is offset by 0, player 2 by n*n.
    # ij2k presumably flattens an (i, j) coordinate into one index - TODO confirm.
    for offset, key in ((0, "player_1"), (cells, "player_2")):
        i, j = obs[key][0], obs[key][1]
        encoded[0, offset + ij2k(i, j, n)] = 1

    # The two trailing entries carry the fence values of both players.
    encoded[0, -2] = obs["fences_player_1"]
    encoded[0, -1] = obs["fences_player_2"]
    return encoded

In [6]:
def dummy_agent_V0(env):
    """
    Fully-random dummy agent playing as player 2.

    It draws actions uniformly at random and submits them to the environment
    until one is accepted (the flag returned by env.step is falsy) or no
    candidate is left; an action is never submitted twice.

        Parameters:
            env: the environment

        Returns:
            done: the `done` value returned by the last env.step call
    """
    # Actions 0-3 are always candidates; action 4+k is added for every entry k
    # of env.fences equal to 0, provided player 2 still has fences left.
    candidates = list(range(4))
    if env.r_fences[1] > 0:
        candidates.extend(4 + slot for slot, value in enumerate(env.fences) if value == 0)

    rejected = True
    while rejected and candidates:
        pick = random.choices(candidates)[0]
        _obs, _reward, done, rejected, _info = env.step([2, pick])
        candidates.remove(pick)
    return done

def dummy_agent_V1(env):
    """
    Stochastic dummy agent playing as player 2.

    Same retry loop as a fully random player, but the draw is weighted:
    actions 0-3 carry the weights 4, 8, 50 and 8 (so action 2 is strongly
    favoured) and every fence action carries the weight 1.

        Parameters:
            env: the environment

        Returns:
            done: the `done` value returned by the last env.step call
    """
    candidates = [0, 1, 2, 3]
    odds = [4, 8, 50, 8]

    # Fence actions (4+k for every entry k of env.fences equal to 0) are only
    # offered while player 2 still has fences left.
    if env.r_fences[1] > 0:
        free_slots = [4 + slot for slot, value in enumerate(env.fences) if value == 0]
        candidates += free_slots
        odds += [1] * len(free_slots)

    rejected = True
    while rejected and candidates:
        pick = random.choices(candidates, weights=odds)[0]
        _obs, _reward, done, rejected, _info = env.step([2, pick])
        # Drop the submitted action together with its weight so it cannot be drawn again.
        position = candidates.index(pick)
        del candidates[position]
        del odds[position]
    return done

def agent_act(agent, env, player, state, state_dim, n):
    """
    Lets the agent play one turn, retrying until the environment accepts an action

        Parameters:
            agent: the playing agent
            env: the environment
            player (int): 1 if first (= comes from left) player
                          2 otherwise
            state: current state of the environment
            state_dim (int): the length of the encoded state vector
            n (int): the size of the board

        Returns:
            observation: next state of the environment (after the action of the
                         agent), encoded with observation2state
            reward (int): the reward gained by the agent (WIN_REWARD if it won)
            done (bool): True if the agent won
            flag (bool): True if the last chosen action was still illegal, i.e.
                         every candidate action was tried and rejected
            action (int): the last chosen action
    """
    # Actions 0-3 are always offered; action 4+k is offered for every entry k of
    # env.fences equal to 0 while this player still has fences left.
    candidates = [0, 1, 2, 3]
    if env.r_fences[player - 1] > 0:
        candidates += [4 + slot for slot, value in enumerate(env.fences) if value == 0]

    rejected = True
    while rejected and candidates:
        # The agent picks among the actions that have not been rejected yet ...
        chosen = agent.act(state=state, actions=candidates)

        # ... and the environment says whether the pick was accepted.
        raw_obs, reward, done, rejected, _info = env.step([player, chosen])
        candidates.remove(chosen)

    # A winning move always pays the fixed win reward.
    reward = WIN_REWARD if done else reward
    return (observation2state(raw_obs, state_dim, n), reward, done, rejected, chosen)

In [7]:
def save_model(agent, name):
    """
    Writes a checkpoint of the agent into the 'name' file.

    The checkpoint is a dict holding the state dicts of the online network,
    the target network and the optimizer.
    """
    networks = agent.net
    checkpoint = {
        "online_state_dict": networks.online.state_dict(),
        "target_state_dict": networks.target.state_dict(),
        "optimizer_state_dict": agent.optimizer.state_dict(),
    }
    torch.save(checkpoint, name)

def load_model(agent, name):
    """
    Returns the agent with weights coming from the 'name' file

    The file must contain the keys 'online_state_dict', 'target_state_dict'
    and 'optimizer_state_dict'. The agent is updated in place and returned.

    SECURITY: torch.load unpickles the file - only load checkpoints you trust.
    """
    # Deserialize onto the CPU so that a checkpoint written on a CUDA machine can
    # also be restored on a CPU-only one. load_state_dict then copies the values
    # into the agent's existing parameters, which stay on their own device.
    cp = torch.load(name, map_location="cpu")
    agent.net.online.load_state_dict(cp['online_state_dict'])
    agent.net.target.load_state_dict(cp['target_state_dict'])
    agent.optimizer.load_state_dict(cp['optimizer_state_dict'])
    return agent

In [8]:
def agentVSdummy(env, agent, dummy, epochs, learn = True, save = None):
    """
    Plays Quoridor games between an agent (first = left player) and a dummy bot

    Each turn the agent moves first; the dummy only replies if the agent's move
    did not end the game. When the dummy wins, the agent's reward for that turn
    is replaced by -WIN_REWARD and the transition is stored as terminal.

    Parameters:
        env: the quoridor environment
        agent: the playing agent
        dummy: the dummy agent (V0 or V1); called as dummy(env), must return
               True when its move ended the game
        epochs (int): number of played games
        learn (bool): if True the agent will be trained
        save (string): the name of the file to save the best agent. If None no save will be made.
                       The agent is saved after every game whose total reward
                       beats the best total reward seen so far.

    Returns:
        agent: the agent
        vic_rate: the victory rate of the agent (0.0 when epochs is 0)
    """

    cnt_victory = 0
    n = env.grid_size
    state_dim = 2*n*n + 2
    logger = MetricLogger(save_dir)
    best_reward = float('-inf')

    for episode in range(epochs):
        s, info = env.reset()
        state = observation2state(s, state_dim=state_dim, n=n)
        tot_rew = 0

        while True:

            next_state, reward, done, flag, action = agent_act(agent, env, 1, state, state_dim=state_dim, n=n)

            # Reset every turn: the dummy does not play after a winning move, so
            # without this the name would be unbound (agent wins on the very
            # first turn) or carry a stale value from an earlier turn.
            done_dummy = False
            if done:
                cnt_victory += 1
            else:
                done_dummy = dummy(env)

            # Losing costs the full win reward, whether or not we are training,
            # so that tot_rew means the same thing in both modes.
            if done_dummy:
                reward = -WIN_REWARD

            if learn:
                agent.cache(state=state,
                            next_state=next_state,
                            action=action,
                            reward=reward,
                            # The game is over for the agent when either side wins.
                            done=done or done_dummy)
                q, loss = agent.learn()
                logger.log_step(reward=reward, loss=loss, q=q)

            # NOTE(review): next_state is the board right after the agent's move.
            # dummy() only returns its done flag, so the dummy's reply is not
            # reflected in the state the agent acts on next turn - confirm whether
            # the environment can provide a fresh observation here.
            state = next_state
            tot_rew += reward

            if done or done_dummy or flag:
                break

        if learn:
            logger.log_episode()
            if episode % 10 == 0:
                logger.record(episode=episode, epsilon=agent.exploration_rate, step=agent.curr_step)

        # Keep the checkpoint of the best-scoring game so far.
        if save is not None and tot_rew > best_reward:
            best_reward = tot_rew
            save_model(agent, save)

    victory_rate = cnt_victory/epochs if epochs else 0.0
    print("Victory rate = ", victory_rate)
    env.close()
    return agent, victory_rate


In [9]:
def _cache_and_learn(agent, logger, state, next_state, action, reward, done):
    """Stores one transition in the agent's memory, runs one learning step and logs it."""
    agent.cache(state=state,
                next_state=next_state,
                action=action,
                reward=reward,
                done=done)
    q, loss = agent.learn()
    logger.log_step(reward=reward, loss=loss, q=q)


def agentVSagent(env, agents, epochs, learn = (True, True)):
    """
    Plays Quoridor games between two agents; agents[0] is the first (= left) player

    Each turn agents[0] moves first and agents[1] only replies if that move did
    not end the game. A player that loses gets -WIN_REWARD on a terminal transition:
      - agents[0] on the move it played just before agents[1]'s winning reply;
      - agents[1] on the last move it played before agents[0]'s winning move
        (nothing is stored if it never got to move in that game).

    Parameters:
        env: the quoridor environment
        agents: the two playing agents
        epochs (int): number of played games
        learn (pair of bool): learn[i] is True if agents[i] must be trained

    Returns:
        agents: the agents
        victory_rate: victory rate of the first = left player (0.0 when epochs is 0)
    """

    cnt_victory = 0
    n = env.grid_size
    state_dim = 2*n*n + 2
    # NOTE(review): both loggers are built on the same save_dir - confirm that
    # MetricLogger does not make them overwrite each other's files.
    logger0 = MetricLogger(save_dir)
    logger1 = MetricLogger(save_dir)

    for e in range(epochs):

        s, info = env.reset()
        state = observation2state(s, state_dim=state_dim, n=n)
        # (state, action) of agents[1]'s most recent move in this game.
        opp_last = None

        while True:

            next_state, reward, done, flag, action = agent_act(agents[0],
                                                            env,
                                                            1,
                                                            state,
                                                            state_dim=state_dim,
                                                            n=n)

            # Reset every turn: agents[1] does not play after a winning move, so
            # these must not be unbound (win on the first turn) nor carry the
            # values of a previous turn.
            opp_done = False
            opp_moved = False
            if done:
                cnt_victory += 1
            else:
                # NOTE(review): opp_flag is never checked, unlike flag - confirm.
                final_state, opp_reward, opp_done, opp_flag, opp_action = agent_act(agents[1],
                                                                                env,
                                                                                2,
                                                                                next_state,
                                                                                state_dim=state_dim,
                                                                                n=n)
                opp_moved = True

            if opp_done:
                reward = -WIN_REWARD

            if learn[0]:
                # The game is over for agents[0] when either side wins.
                _cache_and_learn(agents[0], logger0,
                                 state, next_state, action, reward, done or opp_done)

            if learn[1]:
                if opp_moved:
                    _cache_and_learn(agents[1], logger1,
                                     next_state, final_state, opp_action, opp_reward, opp_done)
                elif opp_last is not None:
                    # agents[0] just won: charge the defeat to the last move
                    # agents[1] actually played, ending on the final board.
                    _cache_and_learn(agents[1], logger1,
                                     opp_last[0], next_state, opp_last[1], -WIN_REWARD, True)

            if done or opp_done or flag:
                break

            # Past this point agents[1] has necessarily moved this turn.
            opp_last = (next_state, opp_action)
            state = final_state

        if learn[0]:
            logger0.log_episode()
        if learn[1]:
            logger1.log_episode()

        if e % 10 == 0:
            if learn[0]:
                logger0.record(episode=e, epsilon=agents[0].exploration_rate, step=agents[0].curr_step)
            if learn[1]:
                logger1.record(episode=e, epsilon=agents[1].exploration_rate, step=agents[1].curr_step)

    env.close()
    victory_rate = cnt_victory/epochs if epochs else 0.0
    print("Victory rate = ", victory_rate)
    return agents, victory_rate

In [11]:
# Run a simple test to see how it works:

test_env, state_dim, action_dim = create_env("human")   # Create the environment
test_agent = DQN2(state_dim, action_dim, save_dir)      # Create the agent

# agentVSdummy returns the pair (agent, victory_rate): unpack it so that
# trained_agent really is the agent and not a tuple.
trained_agent, victory_rate = agentVSdummy(test_env,
                                           test_agent,
                                           dummy_agent_V1,
                                           epochs=10,
                                           learn=True)

Episode 0 - Step 13 - Epsilon 0.9948124617142915 - Mean Reward -60.0 - Mean Length 10.0 - Mean Loss 0.0 - Mean Q Value 0.0 - Time Delta 5.669 - Time 2023-02-25T00:59:44
Victory rate =  0.0


(<agent_V1.DQN2 at 0x2817ca0cbb0>, 0.0)

<Figure size 640x480 with 0 Axes>