In [1]:
import random
import numpy as np
from pettingzoo.utils.env import AECEnv
from copy import deepcopy
import numpy as np
import ultimate_tictactoe_v1

In [2]:
# Game env wrapper for MCTS search
class State:
    """Adapter that exposes an AEC environment as a hashable search state.

    Every query is answered from ``env.last()`` / ``env.observe()`` of the
    wrapped environment. ``nextState`` never mutates the wrapped environment;
    it steps a deep copy and wraps that copy instead.
    """

    def __init__(self, env : AECEnv):
        self.env = env

    def _last(self):
        """Return ``env.last()`` unpacked as (observation, reward, done, info)."""
        observation, reward, done, info = self.env.last()
        return observation, reward, done, info

    def gameEnded(self):
        """Return the ``done`` flag reported for the agent whose turn it is."""
        return self._last()[2]

    def gameReward(self):
        """Return the reward reported for the agent whose turn it is."""
        return self._last()[1]

    def getActionMask(self):
        """Return the ``"action_mask"`` entry of the current observation."""
        return self._last()[0]["action_mask"]

    def getValidActions(self):
        """Return the indices at which the action mask is non-zero."""
        mask = self.getActionMask()
        return np.flatnonzero(mask)

    def nextState(self, action):
        """Apply ``action`` to a deep copy of the environment.

        Returns
        -------
        (State, bool) : the successor state and whether the agent to move
        differs from the agent to move in this state.
        """
        successor_env = deepcopy(self.env)
        successor_env.step(action)
        turn_passed = self.env.agent_selection != successor_env.agent_selection
        return State(successor_env), turn_passed

    def getObservation(self):
        """Return the ``"observation"`` entry as seen by the agent to move."""
        agent = self.currentAgent()
        return self.env.observe(agent)["observation"]

    def currentAgent(self):
        """Return the name of the agent whose turn it is."""
        return self.env.agent_selection

    def show(self, wait=False):
        """Render the environment; optionally block until the user hits enter."""
        self.env.render()
        if not wait:
            return
        input("press any key to continue")

    def __eq__(self, x):
        if isinstance(x, State):
            # Same agent to move and element-wise identical observations.
            agents_agree = self.env.agent_selection == x.env.agent_selection
            boards_agree = (self.getObservation() == x.getObservation()).all()
            return agents_agree and boards_agree
        return False

    def toStr(self):
        """Return a string form of the observation collapsed along its last axis."""
        planes = self.getObservation()
        # Collapse the last axis: sum over it, scaled by (index of its maximum + 1).
        occupied = np.sum(planes, axis = 2)
        owner = np.argmax(planes, axis = 2) + 1
        return str(occupied * owner)

    def __hash__(self):
        # Hash only the collapsed observation; __eq__ additionally compares the agent.
        return hash(self.toStr())

In [3]:
# Build the Ultimate Tic-Tac-Toe environment, reset it to the starting
# position and render the (still empty) board as a sanity check.
# NOTE: `env` is a notebook-level global that `competition` below relies on.
env = ultimate_tictactoe_v1.env()
env.reset()
env.render()

     |     |     ||     |     |     ||     |     |     
  -  |  -  |  -  ||  -  |  -  |  -  ||  -  |  -  |  -  
_____|_____|_____||_____|_____|_____||_____|_____|_____
     |     |     ||     |     |     ||     |     |     
  -  |  -  |  -  ||  -  |  -  |  -  ||  -  |  -  |  -  
_____|_____|_____||_____|_____|_____||_____|_____|_____
     |     |     ||     |     |     ||     |     |     
  -  |  -  |  -  ||  -  |  -  |  -  ||  -  |  -  |  -  
=====|=====|=====||=====|=====|=====||=====|=====|=====
     |     |     ||     |     |     ||     |     |     
  -  |  -  |  -  ||  -  |  -  |  -  ||  -  |  -  |  -  
_____|_____|_____||_____|_____|_____||_____|_____|_____
     |     |     ||     |     |     ||     |     |     
  -  |  -  |  -  ||  -  |  -  |  -  ||  -  |  -  |  -  
_____|_____|_____||_____|_____|_____||_____|_____|_____
     |     |     ||     |     |     ||     |     |     
  -  |  -  |  -  ||  -  |  -  |  -  ||  -  |  -  |  -  
=====|=====|=====||=====|=====|=====||=====|====

In [4]:
# Wrap the environment in the search-facing State adapter and inspect the
# shape of the observation returned for the agent to move.
state = State(env)
state.getObservation().shape

(9, 9, 2)

In [5]:
def competition(p1, p2, num_games):
    """Play games between two policies on the notebook-level ``env`` and print outcome rates.

    Parameters
    ----------
    p1 : callable ``(observation, agent) -> action`` used when ``"player_1"`` is to move
    p2 : callable ``(observation, agent) -> action`` used for every other agent
    num_games : number of games to play; must be positive

    Raises
    ------
    ValueError : if ``num_games`` is not positive (the rates would otherwise
        be computed by dividing by zero).

    Notes
    -----
    Relies on the global ``env`` defined earlier in the notebook. A game is
    credited to player 1 when ``env.rewards["player_1"]`` is positive and to
    player 2 when it is negative; everything else counts as a draw.
    """
    if num_games <= 0:
        raise ValueError(f"num_games must be positive, got {num_games}")

    p1_win = 0
    p2_win = 0
    games = 0
    while games < num_games:
        env.reset()
        # The reward table is inspected on every step of the agent loop, so
        # guard against crediting the same game more than once should the
        # final rewards stay visible for more than one iteration.
        result_recorded = False
        for agent in env.agent_iter():
            if not result_recorded and "player_1" in env.rewards:
                p1_reward = env.rewards["player_1"]
                if p1_reward > 0:
                    p1_win += 1
                    result_recorded = True
                elif p1_reward < 0:
                    p2_win += 1
                    result_recorded = True
            observation, reward, done, info = env.last()
            if done:
                # A finished agent may only be stepped with None.
                action = None
            elif agent == "player_1":
                action = p1(observation, agent)
            else:
                action = p2(observation, agent)
            env.step(action)
            # env.render()  # uncomment to visualize a single game
        games += 1
    draw = games - (p1_win + p2_win)
    print(f"p1_win: {p1_win / num_games}")
    print(f"p2_win: {p2_win / num_games}")
    print(f"draws: {draw / num_games}")

In [6]:
def p_random(observation, agent):
    """Pick one legal action at random.

    Parameters
    ----------
    observation : mapping whose ``'action_mask'`` entry is non-zero at legal actions
    agent : name of the acting agent (unused)

    Returns
    -------
    One index drawn with ``random.choice`` from the non-zero mask positions.
    """
    legal_moves = np.flatnonzero(observation['action_mask'])
    return random.choice(legal_moves)

In [7]:
class TrainingExample:
    """One training sample: a state, its policy target and its reward.

    Attributes
    ----------
    state : the state the sample was recorded in
    pi : policy target for that state
    reward : reward assigned to the sample (may be filled in later)
    """

    def __init__(self, state, pi, reward):
        self.state, self.pi, self.reward = state, pi, reward

In [17]:
# NOTE(review): this import sits mid-notebook; consider moving it to the
# import cell at the top so a fresh "Run All" surfaces the dependency early.
from neutralnet import NNet, apply_actionmask_to_policy

# Smoke test of the network: 81 is the (hard-coded) action space size. Train
# on 32 identical samples of the start position with a uniform policy target
# and reward 1.
nnet = NNet(81)
examples = [TrainingExample(state, np.full(81, 1.0 / 81), 1) for _ in range(32)]
nnet.train(examples)

Model: "model_2"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_3 (InputLayer)           [(None, 9, 9, 2)]    0           []                               
                                                                                                  
 conv2d_4 (Conv2D)              (None, 7, 7, 36)     684         ['input_3[0][0]']                
                                                                                                  
 conv2d_5 (Conv2D)              (None, 5, 5, 36)     11700       ['conv2d_4[0][0]']               
                                                                                                  
 flatten_2 (Flatten)            (None, 900)          0           ['conv2d_5[0][0]']               
                                                                                            

<neutralnet.NNet at 0x7f9981970e80>

In [10]:
# Query the network on the start position and show the policy shape and the
# value estimate it returns.
policy, value = nnet.predict(state)
print(policy.shape)
print(value)

(81,)
0.0029999004


In [11]:
class MCTSNode:
    """Per-state statistics stored in the search tree."""

    def __init__ (self, p, q):
        """
        Parameters
        ----------
        p : policy in this state
        q : q value of this state
        """
        num_actions = len(p)
        self.p, self.q = p, q
        # n[a] : number of times an action a has been performed from this state
        self.n = np.zeros(num_actions)
        # q_a[a] : running mean q value of the states reached by performing action a
        self.q_a = np.zeros(num_actions)


class MCTS:
    """Monte Carlo tree search guided by a policy/value network.

    Parameters
    ----------
    nnet : object whose ``predict(state)`` returns ``(policy, value)``
    num_mcts_sims : number of simulations run per call to ``search``
    max_depth : maximum number of moves a single simulation may descend
    """

    # Keeps the exploration term strictly positive while a node has no visits,
    # so the very first selection follows the prior instead of always taking
    # the lowest-index legal action.
    _EPS = 1e-8

    def __init__(self, nnet, num_mcts_sims, max_depth = 10):
        self.nnet = nnet
        # Maps a state to its MCTSNode statistics.
        self.nodes = {}
        # Exploration constant of the upper confidence bound.
        self.c_puct = 1.0
        self.num_mcts_sims = num_mcts_sims
        self.max_depth = max_depth

    def search(self, s):
        """Run ``num_mcts_sims`` simulations starting from state ``s``."""
        for _ in range(self.num_mcts_sims):
            self._search(s, self.max_depth)

    def _search(self, s, max_depth):
        """Run one simulation from ``s`` and return its value.

        The returned value is expressed from the point of view of the agent
        to move in ``s``; it is negated on the way up whenever the move
        handed the turn to another agent.
        """
        if s.gameEnded():
            return s.gameReward()

        if s not in self.nodes:
            # Leaf: expand it with the network's estimate and stop here.
            p, v = self.nnet.predict(s)
            self.nodes[s] = MCTSNode(p, v)
            return v

        node = self.nodes[s]

        if max_depth == 0:
            # max depth reached, returning a heuristic value of this state
            return node.q

        # upper confidence bound
        total_visits = np.sum(node.n)
        exploration = self.c_puct * node.p * np.sqrt(total_visits + self._EPS) / (1 + node.n)
        ucb = node.q_a + exploration
        ucb[s.getActionMask() == 0] = -np.inf
        # choose best action based on ucb
        a = np.argmax(ucb)

        sp, player_changed = s.nextState(a)
        v = self._search(sp, max_depth - 1)
        if player_changed:
            v = -v

        # Incremental mean of the values observed after taking action a.
        node.q_a[a] = (node.n[a] * node.q_a[a] + v) / (node.n[a] + 1)
        node.n[a] += 1
        return v

    # improved policy
    def pi(self, s : "State"):
        """Return the improved policy for ``s`` (visit counts normalised to 1).

        If no action has been visited yet, fall back to the stored prior
        restricted to legal actions and renormalised, so that sampling from
        the result cannot select a masked-out action. If the prior puts no
        mass on any legal action the fallback is uniform over legal actions.

        Raises
        ------
        KeyError : if ``s`` has never been expanded by ``search``.
        """
        node = self.nodes[s]
        n_sum = np.sum(node.n)
        if n_sum > 0:
            return node.n / n_sum

        legal = np.asarray(s.getActionMask()) != 0
        num_legal = np.count_nonzero(legal)
        if num_legal == 0:
            # Nothing to restrict to; hand back the raw prior unchanged.
            return node.p

        masked_prior = np.where(legal, node.p, 0.0)
        prior_mass = np.sum(masked_prior)
        if prior_mass > 0:
            return masked_prior / prior_mass
        return legal / num_legal

In [12]:
# Smoke test of the tree search: run 2 simulations from the start position
# and check the shape of the resulting improved policy.
mcts = MCTS(nnet, 2)
mcts.search(state)
mcts.pi(state).shape

(81,)

In [18]:
class RandomPlayer:
    """Baseline opponent exposing the same ``predict`` interface as the network."""

    def predict(self, state : "State"):
        """Return a random policy over the legal actions and a value of 0.

        One uniform random weight is drawn per action (the vector length
        follows the action mask instead of a hard-coded 81). The weights are
        then passed through ``apply_actionmask_to_policy`` together with the
        mask.

        Returns
        -------
        (policy, value) : the masked policy and the constant value ``0``.
        """
        mask = state.getActionMask()
        # `size=` is required here: the first positional argument of
        # np.random.uniform is `low`, which would yield a single scalar.
        p = np.random.uniform(size=len(mask))
        return apply_actionmask_to_policy(p, mask), 0

In [13]:
def pit(new_nnet : "NNet", nnet : "NNet", games_played = 40):
    """Play two predictors against each other and score the new one.

    Parameters
    ----------
    new_nnet : candidate; any object with ``predict(state) -> (policy, value)``
    nnet : reference opponent with the same ``predict`` interface
    games_played : number of games; seats are swapped before every game

    Returns
    -------
    Fraction of the decisive (non-tied) games won by ``new_nnet``. When there
    are no decisive games (every game tied, or ``games_played`` is 0) the
    result is ``0.0`` instead of a division by zero, i.e. the candidate gets
    no credit without evidence.
    """
    new_nnet_tag = "player_1"
    nnet_tag = "player_2"
    wins = 0
    ties = 0

    for g in range(games_played):
        env = ultimate_tictactoe_v1.env()
        env.reset()
        s = State(env)
        # swap players before each round
        new_nnet_tag, nnet_tag = nnet_tag, new_nnet_tag
        agents = {new_nnet_tag : new_nnet, nnet_tag : nnet}

        while not s.gameEnded():
            agent = agents[s.currentAgent()]
            p, _ = agent.predict(s)
            # Sample the move from the predicted policy.
            action = np.random.choice(len(p), p=p)
            s.env.step(action)

        # The final reward belongs to the agent selected at the end of the game.
        final_reward = s.gameReward()
        if final_reward == 0:
            ties += 1
            continue

        reward_is_for_new = s.currentAgent() == new_nnet_tag
        new_net_won = (final_reward == 1 and reward_is_for_new) or \
                      (final_reward == -1 and not reward_is_for_new)
        if new_net_won:
            wins += 1

    decisive_games = games_played - ties
    if decisive_games <= 0:
        return 0.0
    return wins / decisive_games

# training
def policyIterSP(env : AECEnv, num_iters = 10, num_eps = 10,  num_mcts_sims=25, frac_win_thresh = 0.55):
    """Policy iteration through self-play.

    Each iteration gathers ``num_eps`` self-play episodes, trains a candidate
    network on all examples held so far, and pits the candidate against the
    current network. The candidate replaces the current network only if its
    win fraction exceeds ``frac_win_thresh``. Half of the examples are then
    dropped at random before the next iteration.

    Parameters
    ----------
    env : environment used for self-play
    num_iters : number of train/evaluate iterations
    num_eps : self-play episodes per iteration
    num_mcts_sims : MCTS simulations per move during self-play
    frac_win_thresh : win fraction the candidate must exceed to be accepted

    Returns
    -------
    The network that is current after the last iteration.
    """
    # hard coded action space size
    nnet = NNet(81)
    # Baseline: the untrained network against a random player.
    baseline_frac = pit(nnet, RandomPlayer())
    print("frac_wins against a random player", baseline_frac)

    examples = []
    for _iteration in range(num_iters):
        for _episode in range(num_eps):
            # collect examples from this game
            examples.extend(executeSelfPlayEpisode(env, nnet, num_mcts_sims))
            print("episode done")

        candidate = nnet.train(examples)
        # compare the candidate with the previous net
        candidate_frac = pit(candidate, nnet)
        print("frac_win", candidate_frac)
        if candidate_frac > frac_win_thresh:
            print("new net is better!")
            nnet = candidate
            # compare the accepted net with a random player
            random_frac = pit(nnet, RandomPlayer())
            print("frac_wins against a random player", random_frac)

        # discard half of the examples
        keep = len(examples) // 2
        examples = random.sample(examples, keep)
    return nnet

def executeSelfPlayEpisode(env : AECEnv, nnet, num_mcts_sims = 3):
    """Play one self-play game and return its training examples.

    The environment is reset, then for every move the tree search is run from
    the current state, the improved policy is recorded together with a copy
    of the state, and the move is sampled from that policy. Rewards are
    filled in once the game has ended.

    Parameters
    ----------
    env : environment to play in (reset at the start)
    nnet : network guiding the tree search
    num_mcts_sims : simulations per move

    Returns
    -------
    List of TrainingExample, one per move played, with rewards assigned.
    """
    env.reset()
    s = State(env)
    # s.show(wait = False)
    mcts = MCTS(nnet, num_mcts_sims)
    history = []

    while True:
        mcts.search(s)
        improved_policy = mcts.pi(s)
        # rewards can not be determined yet
        history.append(TrainingExample(deepcopy(s), improved_policy, None))
        # sample action from improved policy
        move = np.random.choice(len(improved_policy), p=improved_policy)
        s, _ = s.nextState(move)
        # s.show(wait = False)
        if not s.gameEnded():
            continue
        history = assignRewards(history, s.gameReward(), s.currentAgent())
        return history

def assignRewards(examples, reward, player_w_reward):
    """Fill in the reward of every example, in place.

    An example gets ``reward`` when the agent to move in its state is
    ``player_w_reward`` and ``-reward`` otherwise.

    Returns
    -------
    The same ``examples`` list that was passed in.
    """
    for example in examples:
        if example.state.currentAgent() == player_w_reward:
            example.reward = reward
        else:
            example.reward = -reward
    return examples

In [19]:
# End-to-end smoke run of the training loop with minimal settings:
# 1 iteration, 1 self-play episode, 3 MCTS simulations per move.
# NOTE(review): the recorded output of this cell ends in a broadcasting
# ValueError (shapes (256,) vs (81,)); the traceback is not shown, so the
# failing call still has to be located.
env = ultimate_tictactoe_v1.env()
nnet = policyIterSP(env, num_iters=1, num_eps=1, num_mcts_sims=3)

Model: "model_3"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_4 (InputLayer)           [(None, 9, 9, 2)]    0           []                               
                                                                                                  
 conv2d_6 (Conv2D)              (None, 7, 7, 36)     684         ['input_4[0][0]']                
                                                                                                  
 conv2d_7 (Conv2D)              (None, 5, 5, 36)     11700       ['conv2d_6[0][0]']               
                                                                                                  
 flatten_3 (Flatten)            (None, 900)          0           ['conv2d_7[0][0]']               
                                                                                            

ValueError: operands could not be broadcast together with shapes (256,) (81,) 