In [0]:
import datetime
import os
import gym
import numpy
import torch

from .abstract_game import AbstractGame

#MuZero Config

In [0]:
class MuZeroConfig:
    """
    Container for every MuZero hyperparameter: game description, self-play,
    network architecture, training, replay buffer and pacing settings.

    All values are plain attributes assigned in ``__init__``.

    NOTE(review): the game-description values (3x3x3 observation, 9 actions,
    9 moves) look like Tic-Tac-Toe settings rather than TSP ones -- confirm
    them before training on a TSP instance.
    """

    def __init__(self):
        self.seed = 0  # Seed for numpy, torch and the game

        ### Game
        # TODO: decide what the observation of the TSP game looks like
        self.observation_shape = (3, 3, 3)  # Dimensions of the game observation, must be 3D (channel, height, width). For a 1D array, please reshape it to (1, 1, length of array)
        # Number of possible actions
        self.action_space = [i for i in range(9)]  # Fixed list of all possible actions. You should only edit the length
        # Fixed to a single player
        self.players = [i for i in range(1)]  # List of players. You should only edit the length
        self.stacked_observations = 0  # Number of previous observation and previous actions to add to the current observation

        ### Self-Play
        self.num_actors = 1  # Number of simultaneous threads self-playing to feed the replay buffer
        self.max_moves = 9  # Maximum number of moves if game is not finished before
        # To be changed later
        self.num_simulations = 25  # Number of future moves self-simulated
        self.discount = 1  # Chronological discount of the reward
        self.temperature_threshold = 6  # Number of moves before dropping temperature to 0 (ie playing according to the max)

        # Root prior exploration noise
        self.root_dirichlet_alpha = 0.1
        self.root_exploration_fraction = 0.25

        # UCB formula
        self.pb_c_base = 19652
        self.pb_c_init = 1.25

        ### Network
        self.network = "resnet"  # "resnet" / "fullyconnected"
        self.support_size = 10  # Value and reward are scaled (with almost sqrt) and encoded on a vector with a range of -support_size to support_size

        # Residual Network
        self.downsample = False  # Downsample observations before representation network (See paper appendix Network Architecture)
        self.blocks = 1  # Number of blocks in the ResNet
        self.channels = 16  # Number of channels in the ResNet
        self.reduced_channels = 16  # Number of channels before heads of dynamic and prediction networks
        self.resnet_fc_reward_layers = [8]  # Define the hidden layers in the reward head of the dynamic network
        self.resnet_fc_value_layers = [8]  # Define the hidden layers in the value head of the prediction network
        self.resnet_fc_policy_layers = [8]  # Define the hidden layers in the policy head of the prediction network

        # Fully Connected Network
        self.encoding_size = 32
        self.fc_reward_layers = [16]  # Define the hidden layers in the reward network
        self.fc_value_layers = []  # Define the hidden layers in the value network
        self.fc_policy_layers = []  # Define the hidden layers in the policy network
        self.fc_representation_layers = []  # Define the hidden layers in the representation network
        self.fc_dynamics_layers = [16]  # Define the hidden layers in the dynamics network

        ### Training
        # Notebooks and interactive sessions do not define __file__; fall back to the
        # working directory and a generic experiment name instead of raising NameError.
        try:
            source_file = __file__
        except NameError:
            source_file = os.path.join(os.getcwd(), "notebook.py")
        # Path to store the model weights and TensorBoard logs:
        # <source dir>/../results/<source file name without extension>/<timestamp>
        self.results_path = os.path.join(
            os.path.dirname(source_file),
            "../results",
            os.path.splitext(os.path.basename(source_file))[0],
            datetime.datetime.now().strftime("%Y-%m-%d--%H-%M-%S"),
        )
        self.training_steps = 100000  # Total number of training steps (ie weights update according to a batch)
        self.batch_size = 64  # Number of parts of games to train on at each training step
        self.checkpoint_interval = 10  # Number of training steps before using the model for self-playing
        self.value_loss_weight = 0.25  # Scale the value loss to avoid overfitting of the value function, paper recommends 0.25 (See paper appendix Reanalyze)
        self.training_device = "cuda" if torch.cuda.is_available() else "cpu"  # Train on GPU if available

        self.optimizer = "Adam"  # "Adam" or "SGD". Paper uses SGD
        self.weight_decay = 1e-4  # L2 weights regularization
        self.momentum = 0.9  # Used only if optimizer is SGD

        # Exponential learning rate schedule
        self.lr_init = 0.01  # Initial learning rate
        self.lr_decay_rate = 1  # Set it to 1 to use a constant learning rate
        self.lr_decay_steps = 10000

        ### Replay Buffer
        self.window_size = 3000  # Number of self-play games to keep in the replay buffer
        self.num_unroll_steps = 20  # Number of game moves to keep for every batch element
        self.td_steps = 20  # Number of steps in the future to take into account for calculating the target value
        self.use_last_model_value = False  # Use the last model to provide a fresher, stable n-step value (See paper appendix Reanalyze)

        # Prioritized Replay (See paper appendix Training)
        self.PER = True  # Select in priority the elements in the replay buffer which are unexpected for the network
        self.use_max_priority = True  # Use the n-step TD error as initial priority. Better for large replay buffer
        self.PER_alpha = 0.5  # How much prioritization is used, 0 corresponding to the uniform case, paper suggests 1
        self.PER_beta = 1.0

        ### Adjust the self play / training ratio to avoid over/underfitting
        self.self_play_delay = 0  # Number of seconds to wait after each played game
        self.training_delay = 0  # Number of seconds to wait after each training step
        self.ratio = None  # Desired self played games per training step ratio. Equivalent to a synchronous version, training can take much longer. Set it to None to disable it


In [0]:

def visit_softmax_temperature_fn(self, trained_steps):
    """
    Temperature applied to the visit-count distribution when an action is selected.

    Lower values make the most visited action more likely to be picked, so a
    schedule that decreases with ``trained_steps`` makes play greedier as
    training progresses. This implementation ignores ``trained_steps`` and
    keeps the temperature constant.

    NOTE(review): defined at module level although it takes ``self`` --
    presumably meant to be a method of the configuration class; confirm.

    Returns:
        Positive float.
    """
    constant_temperature = 1
    return constant_temperature


In [1]:
from itertools import permutations 

class MuZeroTSP():
    """
    Travelling-salesman game: a random graph of ``num_node`` points that must be
    visited once each, starting from node 0.

    A state is a ``(num_node, 2)`` array. Column 0 holds 1 for every visited node,
    column 1 holds a single 1 marking the current node.

    The class offers two views of the same rules:
      * stateless helpers (``getInitState``, ``getNextState``, ``getValidMoves``,
        ``getGameEnded``, ``getActionSize``, ``stringRepresentation``) that take
        the state as an argument;
      * a stateful episode API (``reset``, ``step``, ``legal_actions``, ...) that
        tracks the current episode in ``self.state``.

    The stateful API used to delegate to ``self.env = MuZeroTSP()``, which raised
    TypeError on construction (missing ``args``); it is now implemented on top of
    the stateless helpers.
    """

    def __init__(self, args):
        """
        Args:
            args: object exposing ``num_node``, the number of nodes in the graph.
        """
        self.num_node = args.num_node  # number of nodes in the graph
        self.graph = numpy.random.rand(self.num_node, 2)  # row i holds the xy coordinates of node i
        self.state = self.getInitState()  # state of the episode driven by step()/reset()

    def getInitState(self):
        """
        Returns:
            first_state: a representation of the graph
            left column representing visited nodes
            right column will always have a single 1 and the rest are 0's. index with the 1 in the right column is current node
        """
        # Always start with the first node as both visited and current
        first_state = numpy.zeros([self.num_node, 2])
        first_state[0, 0] = 1
        first_state[0, 1] = 1
        return first_state

    def getNextState(self, state, action):
        """
        Input:
            state: current state (left untouched, a copy is modified)
            action: index of the node to move to
        Returns:
            next_s: state after applying action
            reward: 1 minus the distance travelled; when the move visits the last
                    node, 1 minus the distance back to node 0 is added as well
        """
        next_s = state.copy()
        # zero out current node
        next_s[:, 1] = 0
        # 1 in left column for visited, 1 in right column for current node
        next_s[action, :] = 1
        prev_a = numpy.where(state[:, 1] == 1)[0][0]
        # get xy coordinates for prev_node and current_node from the graph
        prev_node = self.graph[prev_a]
        current_node = self.graph[action]
        reward = 1 - numpy.linalg.norm(current_node - prev_node)
        if self.num_node == numpy.sum(next_s[:, 0]):  # end of game: close the tour
            reward += 1 - numpy.linalg.norm(current_node - self.graph[0])

        return next_s, reward

    def getValidMoves(self, state):
        """
        Input:
            state: current state
        Returns:
            array of valid moves, 1 for valid (unvisited node), 0 for invalid
        """
        return 1 - state[:, 0]

    def getGameEnded(self, state):
        """
        Input:
            state: current state
        Returns:
            r: 0 if game has not ended. 1 if it has (every node visited)
        """
        end = 0
        if self.num_node == numpy.sum(state[:, 0]):
            end = 1
        return end

    def getActionSize(self):
        """
        Returns:
            Number of actions, one per node. Called by the MCTS class in this file.
        """
        return self.num_node

    def stringRepresentation(self, state):
        """
        Input:
            state: a state array
        Returns:
            Hashable byte string of the state, usable as a dictionary key
            (the MCTS class in this file keys its tables with it).
        """
        return state.tobytes()

    def step(self, action):
        """
        Apply action to the game.

        Args:
            action : action of the action_space to take.

        Returns:
            The new observation (a copy of the state matrix), the reward scaled
            by 20 (the scaling the original wrapper applied) and a boolean
            telling whether the game has ended.
        """
        self.state, reward = self.getNextState(self.state, action)
        done = self.getGameEnded(self.state) == 1
        return self.state.copy(), reward * 20, done

    def to_play(self):
        """
        Return the current player.

        Returns:
            Always 0: the game has a single player.
        """
        return 0

    def legal_actions(self):
        """
        Legal actions for the current episode state.

        Returns:
            A list of integers, subset of the action space: the indices of the
            nodes that have not been visited yet.
        """
        return [int(node) for node in numpy.flatnonzero(self.getValidMoves(self.state))]

    def reset(self):
        """
        Reset the game for a new game. The graph is kept, only the episode restarts.

        Returns:
            Initial observation of the game.
        """
        self.state = self.getInitState()
        return self.state.copy()

    def close(self):
        """
        Properly close the game.
        """
        pass

    def render(self):
        """
        Display the game observation, then wait for the user to press enter.
        """
        print(self.state)
        input("Press enter to take a step ")

    def encode_board(self):
        """
        Returns:
            Flattened copy of the current state matrix.
            NOTE(review): the original delegated this to an environment that never
            existed; flattening the state is an assumption -- confirm the encoding.
        """
        return self.state.flatten()

    def human_to_action(self):
        """
        For multiplayer games, ask the user for a legal action
        and return the corresponding action number.

        Returns:
            An integer from the action space.
        """
        choice = input(
            "Enter the column to play for the player {}: ".format(self.to_play())
        )
        while choice not in [str(action) for action in self.legal_actions()]:
            choice = input("Enter another column : ")
        return int(choice)

    def action_to_string(self, action_number):
        """
        Convert an action number to a string representing the action.

        Args:
            action_number: an integer from the action space.

        Returns:
            String representing the action.
        """
        return "Play column {}".format(action_number)

    def optimal(self):
        """
        Brute-force the shortest closed tour that starts and ends at node 0.

        Returns:
            (length, path): the tour length and the order in which nodes
            1..num_node-1 are visited. A graph with at most one node has a
            zero-length tour and an empty path.
        """
        if self.num_node <= 1:
            # Nothing to visit besides the start node
            return 0.0, ()

        graph = self.graph
        best_length = float("inf")
        best_path = ()
        # Iterate lazily: materialising all (n-1)! permutations is not needed
        for perm in permutations(range(1, self.num_node)):
            tour = (0,) + perm + (0,)
            length = sum(
                numpy.linalg.norm(graph[nxt] - graph[cur])
                for cur, nxt in zip(tour, tour[1:])
            )
            if length < best_length:
                best_length = length
                best_path = perm

        return best_length, best_path

ERROR! Session/line number was not unique in database. History logging moved to new session 59


#TicTacToe.py class

In [0]:
class TicTacToe:
    """
    Two-player Tic-Tac-Toe environment on a 3x3 integer board.

    Cells hold 1 (first player), -1 (second player) or 0 (empty).
    ``self.player`` is the mark of the player about to move.
    """

    def __init__(self):
        self.board = numpy.zeros((3, 3)).astype(int)
        self.player = 1

    def to_play(self):
        """Return the index of the player to move: 0 for mark 1, 1 for mark -1."""
        return 0 if self.player == 1 else 1

    def reset(self):
        """Clear the board, give the move to the first player and return the observation."""
        self.board = numpy.zeros((3, 3)).astype(int)
        self.player = 1
        return self.get_observation()

    def step(self, action):
        """
        Place the current player's mark on cell ``action`` (0..8, row-major).

        Returns:
            (observation, reward, done). The reward is 1 when this move completes
            a line for the mover and 0 otherwise, including draws. It is derived
            from the winner check rather than from "moves remain", so a win on
            the move that fills the board is rewarded too.
        """
        row = action // 3
        col = action % 3
        self.board[row, col] = self.player

        won = self.have_winner()
        done = won or len(self.legal_actions()) == 0
        reward = 1 if won else 0

        # Hand the move over only after the checks, which look at the mover's mark
        self.player *= -1

        return self.get_observation(), reward, done

    def get_observation(self):
        """
        Returns:
            Array of three 3x3 planes: cells of player 1, cells of player -1,
            and a plane filled with the mark of the player to move.
        """
        board_player1 = numpy.where(self.board == 1, 1.0, 0.0)
        board_player2 = numpy.where(self.board == -1, 1.0, 0.0)
        board_to_play = numpy.full((3, 3), self.player).astype(float)
        return numpy.array([board_player1, board_player2, board_to_play])

    def legal_actions(self):
        """Return the list of empty cells as action numbers (0..8, row-major)."""
        legal = []
        for i in range(9):
            row = i // 3
            col = i % 3
            if self.board[row, col] == 0:
                legal.append(i)
        return legal

    def have_winner(self):
        """Return True when the current player (``self.player``) owns a full row, column or diagonal."""
        # Horizontal and vertical checks
        for i in range(3):
            if (self.board[i, :] == self.player).all():
                return True
            if (self.board[:, i] == self.player).all():
                return True

        # Diagonal checks
        if (
            self.board[0, 0] == self.player
            and self.board[1, 1] == self.player
            and self.board[2, 2] == self.player
        ):
            return True
        if (
            self.board[2, 0] == self.player
            and self.board[1, 1] == self.player
            and self.board[0, 2] == self.player
        ):
            return True

        return False

    def is_finished(self):
        """Return True when the current player has won or no legal action remains (draw)."""
        if self.have_winner():
            return True

        # No legal actions means a draw
        if len(self.legal_actions()) == 0:
            return True

        return False

    def render(self):
        """Print the board with the rows in reverse order."""
        print(self.board[::-1])

In [0]:
def render(self):
    """Print ``self.board`` with its rows in reverse order."""
    flipped_rows = self.board[::-1]
    print(flipped_rows)

#MuZero.py class

In [0]:
import copy
import importlib
import os
import time

import numpy
import ray
import torch
from torch.utils.tensorboard import SummaryWriter

import models
import replay_buffer
import self_play
import shared_storage
import trainer


class MuZero:
    """
    Main class to manage MuZero.

    Args:
        game_name (str): Name of the game module, it should match the name of a .py file
        in the "./games" directory.

    Example:
        >>> muzero = MuZero("cartpole")
        >>> muzero.train()
        >>> muzero.test()
    """

    def __init__(self, game_name):
        """
        Import ``games.<game_name>``, build its config, seed numpy and torch, and
        create the initial network weights.

        Raises:
            Whatever the import or the config construction raised, after printing a hint.
        """
        self.game_name = game_name

        # Load the game and the config from the module with the game name
        try:
            game_module = importlib.import_module("games." + self.game_name)
            self.config = game_module.MuZeroConfig()
            self.Game = game_module.Game
        except Exception:
            print(
                '{} is not a supported game name, try "cartpole" or refer to the documentation for adding a new game.'.format(
                    self.game_name
                )
            )
            # Bare raise keeps the original traceback intact
            raise

        # Fix random generator seed
        numpy.random.seed(self.config.seed)
        torch.manual_seed(self.config.seed)

        # Weights used to initialize components
        self.muzero_weights = models.MuZeroNetwork(self.config).get_weights()

    def train(self):
        """
        Start the ray workers (trainer, shared storage, replay buffer, self-play and
        test actors), log their progress to TensorBoard until ``config.training_steps``
        is reached or the user interrupts, then store the latest weights in
        ``self.muzero_weights``.

        Ray is shut down and the TensorBoard writer closed even when an error occurs,
        so a later ``train()`` / ``test()`` call can initialise ray again.
        """
        ray.init()
        writer = None
        try:
            os.makedirs(self.config.results_path, exist_ok=True)
            writer = SummaryWriter(self.config.results_path)

            # Initialize workers
            training_worker = trainer.Trainer.options(
                num_gpus=1 if "cuda" in self.config.training_device else 0
            ).remote(copy.deepcopy(self.muzero_weights), self.config)
            shared_storage_worker = shared_storage.SharedStorage.remote(
                copy.deepcopy(self.muzero_weights), self.game_name, self.config,
            )
            replay_buffer_worker = replay_buffer.ReplayBuffer.remote(self.config)
            self_play_workers = [
                self_play.SelfPlay.remote(
                    copy.deepcopy(self.muzero_weights),
                    self.Game(self.config.seed + seed),
                    self.config,
                )
                for seed in range(self.config.num_actors)
            ]
            test_worker = self_play.SelfPlay.remote(
                copy.deepcopy(self.muzero_weights),
                self.Game(self.config.seed + self.config.num_actors),
                self.config,
            )

            # Launch workers
            for self_play_worker in self_play_workers:
                self_play_worker.continuous_self_play.remote(
                    shared_storage_worker, replay_buffer_worker
                )
            test_worker.continuous_self_play.remote(shared_storage_worker, None, True)
            training_worker.continuous_update_weights.remote(
                replay_buffer_worker, shared_storage_worker
            )

            print(
                "\nTraining...\nRun tensorboard --logdir ./results and go to http://localhost:6006/ to see in real time the training performance.\n"
            )
            # Save hyperparameters to TensorBoard
            hp_table = [
                "| {} | {} |".format(key, value)
                for key, value in self.config.__dict__.items()
            ]
            writer.add_text(
                "Hyperparameters",
                "| Parameter | Value |\n|-------|-------|\n" + "\n".join(hp_table),
            )
            # Loop for monitoring in real time the workers
            counter = 0
            infos = ray.get(shared_storage_worker.get_infos.remote())
            try:
                while infos["training_step"] < self.config.training_steps:
                    # Get and save real time performance
                    infos = ray.get(shared_storage_worker.get_infos.remote())
                    # One remote call per iteration so every metric uses the same count
                    played_games = ray.get(
                        replay_buffer_worker.get_self_play_count.remote()
                    )
                    writer.add_scalar(
                        "1.Total reward/1.Total reward", infos["total_reward"], counter,
                    )
                    writer.add_scalar(
                        "1.Total reward/2.Episode length", infos["episode_length"], counter,
                    )
                    writer.add_scalar(
                        "1.Total reward/3.Player 0 MuZero reward",
                        infos["player_0_reward"],
                        counter,
                    )
                    writer.add_scalar(
                        "1.Total reward/4.Player 1 Random reward",
                        infos["player_1_reward"],
                        counter,
                    )
                    writer.add_scalar(
                        "2.Workers/1.Self played games", played_games, counter,
                    )
                    writer.add_scalar(
                        "2.Workers/2.Training steps", infos["training_step"], counter
                    )
                    writer.add_scalar(
                        "2.Workers/3.Self played games per training step ratio",
                        played_games / max(1, infos["training_step"]),
                        counter,
                    )
                    writer.add_scalar("2.Workers/4.Learning rate", infos["lr"], counter)
                    writer.add_scalar(
                        "3.Loss/1.Total weighted loss", infos["total_loss"], counter
                    )
                    writer.add_scalar("3.Loss/Value loss", infos["value_loss"], counter)
                    writer.add_scalar("3.Loss/Reward loss", infos["reward_loss"], counter)
                    writer.add_scalar("3.Loss/Policy loss", infos["policy_loss"], counter)
                    print(
                        "Last test reward: {0:.2f}. Training step: {1}/{2}. Played games: {3}. Loss: {4:.2f}".format(
                            infos["total_reward"],
                            infos["training_step"],
                            self.config.training_steps,
                            played_games,
                            infos["total_loss"],
                        ),
                        end="\r",
                    )
                    counter += 1
                    time.sleep(0.5)
            except KeyboardInterrupt:
                # Deliberate: Ctrl-C stops the monitoring loop but the weights trained
                # so far are still collected below. Re-raise here to abort instead.
                pass
            self.muzero_weights = ray.get(shared_storage_worker.get_weights.remote())
        finally:
            # End running actors and release the log file, whatever happened above
            try:
                if writer is not None:
                    writer.close()
            finally:
                ray.shutdown()

    def test(self, render, opponent, muzero_player):
        """
        Test the model in a dedicated thread.

        Args:
            render: Boolean to display or not the environment.

            opponent: "self" for self-play, "human" for playing against MuZero and "random"
            for a random agent.

            muzero_player: Integer with the player number of MuZero in case of multiplayer
            games, None let MuZero play all players turn by turn.

        Returns:
            Sum of the rewards of the played game.
        """
        print("\nTesting...")
        ray.init()
        try:
            self_play_workers = self_play.SelfPlay.remote(
                copy.deepcopy(self.muzero_weights),
                self.Game(self.config.seed + self.config.num_actors),
                self.config,
            )
            history = ray.get(
                self_play_workers.play_game.remote(0, 0, render, opponent, muzero_player)
            )
        finally:
            # Always release ray so the next train()/test() call can init it again
            ray.shutdown()
        return sum(history.reward_history)

    def load_model(self, path=None):
        """
        Load network weights into ``self.muzero_weights``.

        Args:
            path: File to load; defaults to ``model.weights`` inside ``config.results_path``.

        A missing file is reported on stdout and leaves the current weights untouched.
        """
        if not path:
            path = os.path.join(self.config.results_path, "model.weights")
        try:
            self.muzero_weights = torch.load(path)
            print("\nUsing weights from {}".format(path))
        except FileNotFoundError:
            print("\nThere is no model saved in {}.".format(path))


if __name__ == "__main__":
    # Interactive entry point: pick a game, then loop over the action menu.
    print("\nWelcome to MuZero! Here's a list of games:")
    # Every module in ./games except the abstract base is offered as a game
    games = []
    for filename in sorted(os.listdir("./games")):
        if filename.endswith(".py") and filename != "abstract_game.py":
            games.append(filename[:-3])
    for number, listed_game in enumerate(games):
        print("{}. {}".format(number, listed_game))
    choice = input("Enter a number to choose the game: ")
    valid_inputs = [str(number) for number in range(len(games))]
    while choice not in valid_inputs:
        choice = input("Invalid input, enter a number listed above: ")

    # Initialize MuZero
    choice = int(choice)
    muzero = MuZero(games[choice])

    while True:
        # Configure running options
        options = [
            "Train",
            "Load pretrained model",
            "Render some self play games",
            "Play against MuZero",
            "Exit",
        ]
        print()
        for number, label in enumerate(options):
            print("{}. {}".format(number, label))

        choice = input("Enter a number to choose an action: ")
        valid_inputs = [str(number) for number in range(len(options))]
        while choice not in valid_inputs:
            choice = input("Invalid input, enter a number listed above: ")
        choice = int(choice)

        # "Exit" is the only entry that leaves the loop
        if choice == 4:
            break

        if choice == 0:
            muzero.train()
        elif choice == 1:
            path = input("Enter a path to the model.weights: ")
            while not os.path.isfile(path):
                path = input("Invalid path. Try again: ")
            muzero.load_model(path)
        elif choice == 2:
            muzero.test(render=True, opponent="self", muzero_player=None)
        else:
            # Only choice 3 is left: play against MuZero
            muzero.test(render=True, opponent="human", muzero_player=0)
        print("\nDone")

    ## Successive training, create a new config file for each experiment
    # experiments = ["cartpole", "tictactoe"]
    # for experiment in experiments:
    #     print("\nStarting experiment {}".format(experiment))
    #     try:
    #         muzero = MuZero(experiment)
    #         muzero.train()
    #     except:
    #         print("Skipping {}, an error has occurred.".format(experiment))

#MCTS

In [0]:
import math
import numpy as np
EPS = 1e-8

class MCTS():
    """
    Monte-Carlo tree search over a single-player game.

    The game object must provide ``stringRepresentation``, ``getActionSize``,
    ``getGameEnded``, ``getValidMoves``, ``getNextState`` (returning the next
    state and a reward) and a ``graph`` attribute (passed to the network).
    Statistics are kept in dictionaries keyed by the state's string
    representation ``s`` or by ``(s, a)`` pairs.
    """

    def __init__(self, game, nnet, args):
        """
        Args:
            game: game rules, see the class docstring.
            nnet: object with ``predict(state, graph) -> (policy, value)``, or None
                  to use a uniform policy and a zero value.
            args: object exposing ``numMCTSSims`` and ``cpuct``.
        """
        self.game = game
        self.nnet = nnet
        self.args = args
        self.Qsa = {}       # stores Q values for s,a (as defined in the paper)
        self.Nsa = {}       # stores #times edge s,a was visited
        self.Ns = {}        # stores #times board s was visited
        self.Ps = {}        # stores initial policy (returned by neural net)

        self.Es = {}        # stores game.getGameEnded ended for board s
        self.Vs = {}        # stores game.getValidMoves for board s

        # Progress trace filled by search(): plot holds getActionSize() - v each
        # time v beats the last recorded entry, num_sim the simulation index.
        self.plot = [1000]
        self.num_sim = [0]

    def getActionProb(self, graphState, temp=1):
        """
        This function performs numMCTSSims simulations of MCTS starting from
        graphState.

        When the simulations did not visit any edge of the root (for instance
        with a single simulation, which only expands the root), the masked prior
        stored for the root is used in place of the visit counts; if no prior is
        stored either, every action gets the same weight. This avoids a division
        by zero.

        Returns:
            probs: a policy vector where the probability of the ith action is
                   proportional to Nsa[(s,a)]**(1./temp); a one-hot vector on the
                   most visited action when temp is 0.
        """
        for i in range(self.args.numMCTSSims):
            self.search(graphState, i)

        s = self.game.stringRepresentation(graphState)
        counts = [self.Nsa.get((s, a), 0) for a in range(self.game.getActionSize())]

        if sum(counts) == 0:
            # No search information: fall back to the prior, else to uniform weights
            prior = self.Ps.get(s)
            counts = [1] * len(counts) if prior is None else list(prior)

        if temp == 0:
            bestA = int(np.argmax(counts))
            probs = [0] * len(counts)
            probs[bestA] = 1
            return probs

        counts = [x ** (1. / temp) for x in counts]
        counts_sum = float(sum(counts))
        probs = [x / counts_sum for x in counts]
        return probs

    def search(self, graphState, num_sim):
        """
        This function performs one iteration of MCTS. It is recursively called
        till a leaf node is found. The action chosen at each node is the valid
        one with the maximum upper confidence bound.

        Once a leaf node is found, the neural network (or a uniform policy with
        value 0 when no network is given) provides an initial policy P and a
        value v for the state. Going back up the search path, the reward of each
        move is added to the value returned from below. A terminal state
        contributes 0. The values of Ns, Nsa, Qsa are updated on the way.

        Values are not negated between levels: the game is single-player.

        Args:
            graphState: state to search from.
            num_sim: index of the running simulation, only recorded in self.num_sim.

        Returns:
            v: value of graphState as seen by this simulation.
        """

        s = self.game.stringRepresentation(graphState)

        if s not in self.Es:
            self.Es[s] = self.game.getGameEnded(graphState)
        if self.Es[s] != 0:
            # terminal node
            return 0

        if s not in self.Ps:
            # leaf node
            if self.nnet is not None:
                self.Ps[s], v = self.nnet.predict(graphState, self.game.graph)
            else:
                self.Ps[s] = np.ones(self.game.getActionSize())  # random policy
                v = 0
            valids = self.game.getValidMoves(graphState)
            self.Ps[s] = self.Ps[s] * valids      # masking invalid moves
            sum_Ps_s = np.sum(self.Ps[s])
            if sum_Ps_s > 0:
                self.Ps[s] /= sum_Ps_s    # renormalize
            else:
                # if all valid moves were masked make all valid moves equally probable

                # NB! All valid moves may be masked if either your NNet architecture is insufficient or you've get overfitting or something else.
                # If you have got dozens or hundreds of these messages you should pay attention to your NNet and/or training process.
                print("All valid moves were masked, do workaround.")
                self.Ps[s] = self.Ps[s] + valids
                self.Ps[s] /= np.sum(self.Ps[s])

            self.Vs[s] = valids
            self.Ns[s] = 0
            return v

        valids = self.Vs[s]
        cur_best = -float('inf')
        best_act = -1

        # pick the action with the highest upper confidence bound
        for a in range(self.game.getActionSize()):
            if valids[a]:
                if (s, a) in self.Qsa:
                    u = self.Qsa[(s, a)] + self.args.cpuct * self.Ps[s][a] * math.sqrt(self.Ns[s]) / (1 + self.Nsa[(s, a)])
                else:
                    # Unvisited edge: Q is taken as 0; EPS keeps the bonus non-zero when Ns[s] is 0
                    u = self.args.cpuct * self.Ps[s][a] * math.sqrt(self.Ns[s] + EPS)

                if u > cur_best:
                    cur_best = u
                    best_act = a

        a = best_act
        next_s, reward = self.game.getNextState(graphState, a)

        # Value of this edge: reward of the move plus the value found below it
        v = self.search(next_s, num_sim) + reward

        if (s, a) in self.Qsa:
            # Running mean of the values seen through this edge
            self.Qsa[(s, a)] = (self.Nsa[(s, a)] * self.Qsa[(s, a)] + v) / (self.Nsa[(s, a)] + 1)
            self.Nsa[(s, a)] += 1

        else:
            self.Qsa[(s, a)] = v
            self.Nsa[(s, a)] = 1

        # Record a new best: an entry is appended whenever v exceeds
        # getActionSize() - plot[-1], i.e. getActionSize() - v drops below plot[-1]
        if v > self.game.getActionSize() - self.plot[-1]:
            self.plot.append(self.game.getActionSize() - v)
            self.num_sim.append(num_sim)

        self.Ns[s] += 1
        return v