In [1]:
from collections import deque
import time
import random
import numpy as np

In [2]:
from deepgroebner.wrapped import CLeadMonomialsEnv as LeadMonomialsEnv

In [3]:
def epsilon_greedy(epsilon=0.09):
    """Build a tree policy that explores with probability ``epsilon``.

    The returned callable takes a node and returns one of ``node.children``:
    a uniformly random child when a fresh ``random.random()`` draw falls
    below ``epsilon``, otherwise the child whose ``value`` entry for the
    player to move (``node.env.turn``) is largest.
    """
    def policy(node):
        explore = random.random() < epsilon
        if explore:
            return random.choice(node.children)

        def value_for_mover(child):
            return child.value[node.env.turn]

        # max() keeps the first child among equally valued ones.
        return max(node.children, key=value_for_mover)
    return policy


def ucb(c=np.sqrt(2)):
    """Build an upper-confidence-bound tree policy.

    The returned callable takes a node and returns the child with the
    highest score.  A child that has never been visited scores infinity;
    any other child scores its value for the player to move
    (``node.env.turn``) plus ``c * sqrt(log(node.visits) / child.visits)``.
    """
    def policy(node):
        def score(child):
            if child.visits == 0:
                # Unvisited children always win the comparison.
                return np.inf
            mean_return = child.value[node.env.turn]
            return mean_return + c * np.sqrt(np.log(node.visits)/child.visits)

        return max(node.children, key=score)
    return policy


class TreeNode:
    """One node of the Monte Carlo search tree.

    Parameters
    ----------
    parent : TreeNode or None
        The node this one hangs off, or None for a root.
    action : object
        The action stored with this node.
    reward : object
        The reward stored with this node.
    env : environment
        The environment stored with this node; its ``players`` attribute
        sets the length of the ``value`` array.

    """

    def __init__(self, parent, action, reward, env):
        # Position in the tree; children start out empty.
        self.parent, self.children = parent, []
        # Data carried by the node.
        self.action, self.reward, self.env = action, reward, env
        # Statistics: visit count and one value entry per player.
        self.visits, self.value = 0, np.zeros(env.players)


class MCTSAgent:
    """A Monte Carlo tree search agent.

    Parameters
    ----------
    tree_policy : function, optional
        A function which maps node to child node. When omitted, the policy
        returned by ``ucb()`` is used.
    timeout : float, optional
        The amount of time in seconds to perform rollouts before choosing an action.

    """

    def __init__(self, tree_policy=None, timeout=1.0):
        # Resolve the default here rather than calling ucb() in the signature,
        # where it would be evaluated once when the class is defined.
        self.tree_policy = ucb() if tree_policy is None else tree_policy
        self.timeout = timeout
        self.root = None

    def act(self, env):
        """Return a chosen action for the env.

        Rollouts are run from the root until ``timeout`` seconds have passed
        (at least one rollout is always performed), then the action of the
        most visited child of the root is returned.

        Parameters
        ----------
        env : environment
            The current environment.

        Raises
        ------
        ValueError
            If the environment is already done or offers no actions, so the
            root has no children to choose from.

        """
        self.root = self.find_root(env)
        if self.root.env.done:
            # Fail fast instead of spending the whole timeout on a finished env.
            raise ValueError("cannot choose an action: the environment is already done")
        # A monotonic clock keeps the deadline immune to wall-clock adjustments.
        deadline = time.monotonic() + self.timeout
        while True:
            leaf = self.expand(self.root)
            value = self.simulate(leaf)
            self.backup(leaf, value)
            if time.monotonic() >= deadline:
                break
        if not self.root.children:
            raise ValueError("cannot choose an action: the environment offers no actions")
        return max(self.root.children, key=lambda node: node.visits).action

    def expand(self, node):
        """Return an unvisited or terminal leaf node following the tree policy.

        Before returning, this function performs all possible actions from the
        leaf node and adds new nodes for them to the tree as children of the
        leaf node.
        """
        while node.visits and node.children:
            node = self.tree_policy(node)
        # Only expand a node that has no children yet, so a node can never
        # end up with duplicate children.
        if not node.env.done and not node.children:
            for action in node.env.actions:
                env = node.env.copy()
                _, reward, _, _ = env.step(action)
                node.children.append(TreeNode(node, action, reward, env))
        return node

    def simulate(self, node):
        """Return one total reward from node following uniform random policy.

        The rollout runs on a copy of ``node.env``, so the node's own
        environment is left untouched.
        """
        env = node.env.copy()
        total_rewards = np.zeros(env.players)
        while not env.done:
            action = random.choice(env.actions)
            _, rewards, _, _ = env.step(action)
            total_rewards += rewards
        return total_rewards

    def backup(self, node, value):
        """Backup the return from a rollout from node.

        Walking from ``node`` up to the root, each node's reward is added to
        the running return, its visit count is incremented and its value is
        updated as a running mean.  The ``value`` argument is not modified.
        """
        while node is not None:
            # Out-of-place addition: never mutate the caller's array.
            value = value + node.reward
            node.visits += 1
            node.value = (node.visits - 1)/node.visits * node.value + value/node.visits
            node = node.parent

    def find_root(self, env):
        """Return node corresponding to env in current tree using BFS.

        The search covers the descendants of the current root and compares
        each node's environment to ``env`` with ``==``; environments that do
        not define equality compare by identity.  A reused node is detached
        from its parent so that later backups stop at the new root and the
        discarded part of the tree can be garbage collected.  If no node
        matches, a fresh root is created.
        """
        if self.root is not None:
            queue = deque(self.root.children)
            while queue:
                node = queue.popleft()
                if node.env == env:
                    node.parent = None
                    return node
                queue.extend(node.children)
        return TreeNode(None, None, np.zeros(env.players), env)


class MCTSWrapper:
    """A wrapper for LeadMonomialsEnv environments to interact with the MCTSAgent.

    The wrapper exposes the attributes the agent reads: ``players`` and
    ``turn`` (fixed at 1 and 0), ``done``, and ``actions``, which is kept as
    the list of indices into the current ``state``.

    Parameters
    ----------
    env : environment
        The environment to wrap.  It must provide ``reset()``, ``step(action)``
        and ``copy()``.

    """

    def __init__(self, env):
        self.env = env
        self.players = 1
        self.turn = 0
        # state/done stay None until reset() is called.
        self.state = None
        self.done = None
        self.actions = []

    def reset(self):
        """Reset the wrapped environment and return the initial state."""
        self.state = self.env.reset()
        self.done = False
        self.actions = list(range(len(self.state)))
        return self.state

    def step(self, action):
        """Step the wrapped environment and return its ``(state, reward, done, info)``."""
        self.state, reward, self.done, info = self.env.step(action)
        self.actions = list(range(len(self.state)))
        return self.state, reward, self.done, info

    def copy(self):
        """Return an independent copy of the wrapper and its environment.

        Works before ``reset()`` as well, in which case the copy's state is
        None just like the original's.
        """
        clone = MCTSWrapper(self.env.copy())
        # state is None until reset(); only copy it once it exists.
        clone.state = None if self.state is None else self.state.copy()
        clone.done = self.done
        clone.actions = list(self.actions)
        return clone

## Small Example

In [4]:
# Wrap the 'cyclic-4' environment for the MCTS agent and start an episode.
env = MCTSWrapper(LeadMonomialsEnv('cyclic-4'))
env.reset()
# Query the underlying environment's value() with gamma=1.0.
# NOTE(review): presumably a baseline return from the current state using the
# environment's default strategy -- confirm against the deepgroebner docs.
env.env.value(gamma=1.0)

-38.0

In [5]:
# Search from the current state for 10 seconds and display the chosen action.
agent = MCTSAgent(timeout=10)
agent.act(env)

2

In [6]:
# Running mean of the rollout returns backed up to the search root by act().
agent.root.value

array([-36.00814578])

## Random Searches

In [7]:
def best_random_performance(env, seed, samples):
    """Return the largest return seen over ``samples`` random-agent runs.

    Every run re-seeds ``env`` with the same ``seed`` and resets it before
    asking for ``env.value('random', gamma=1.0)``.  With ``samples`` equal to
    zero the result is negative infinity.
    """
    def candidate_returns():
        # Start from -inf so an empty run list still yields a result.
        yield -np.inf
        for _ in range(samples):
            env.seed(seed)
            env.reset()
            yield env.value('random', gamma=1.0)

    return max(candidate_returns())

In [8]:
%%time

# Best-of-1000 random-agent return for each of 10,000 sampled seeds on the
# '3-20-10-weighted' environment.
# NOTE(review): np.random.randint(1000, size=10_000) draws the seeds from only
# 1000 possible values, so seeds repeat across the 10,000 draws -- confirm
# this is intended.
env = LeadMonomialsEnv('3-20-10-weighted')
weighted_returns = [best_random_performance(env, seed, 1000)
                    for seed in np.random.randint(1000, size=10_000)]

CPU times: user 40min 25s, sys: 18.5 ms, total: 40min 25s
Wall time: 40min 24s


In [9]:
# Mean and standard deviation of the best random returns collected above.
np.mean(weighted_returns), np.std(weighted_returns)

(-56.9911, 17.585136359721524)

In [10]:
%%time

# Best-of-1000 random-agent return for each of 10,000 sampled seeds on the
# '3-20-10-uniform' environment.
# NOTE(review): the seeds come from only 1000 possible values, so they repeat
# across the 10,000 draws -- confirm this is intended.
env = LeadMonomialsEnv('3-20-10-uniform')
uniform_returns = [best_random_performance(env, seed, 1000)
                   for seed in np.random.randint(1000, size=10_000)]

CPU times: user 1h 38min 29s, sys: 260 ms, total: 1h 38min 30s
Wall time: 1h 38min 29s


In [11]:
# Mean and standard deviation of the best random returns collected above.
np.mean(uniform_returns), np.std(uniform_returns)

(-104.4735, 32.71985785039415)

In [12]:
%%time

# Best-of-1000 random-agent return for each of 10,000 sampled seeds on the
# '3-20-10-maximum' environment.
# NOTE(review): the seeds come from only 1000 possible values, so they repeat
# across the 10,000 draws -- confirm this is intended.
env = LeadMonomialsEnv('3-20-10-maximum')
maximum_returns = [best_random_performance(env, seed, 1000)
                   for seed in np.random.randint(1000, size=10_000)]

CPU times: user 2h 45min 38s, sys: 488 ms, total: 2h 45min 39s
Wall time: 2h 45min 38s


In [13]:
# Mean and standard deviation of the best random returns collected above.
np.mean(maximum_returns), np.std(maximum_returns)

(-334.6879, 88.4650049092295)

## MCTS Episodes

In [14]:
def run_episode(agent, env):
    """Play one full episode with ``agent`` on ``env`` and return the summed reward.

    The environment is reset first; then, until ``env.done`` is set, the agent
    picks an action via ``agent.act(env)`` and the environment is stepped
    with it.
    """
    env.reset()
    episode_return = 0.0
    while True:
        if env.done:
            return episode_return
        chosen = agent.act(env)
        _state, step_reward, _done, _info = env.step(chosen)
        episode_return += step_reward

In [15]:
# Agent that searches for one second per move, on the wrapped
# '3-20-10-uniform' environment.
agent = MCTSAgent(timeout=1)
env = MCTSWrapper(LeadMonomialsEnv('3-20-10-uniform'))

In [16]:
# Play one full episode with the MCTS agent and display its total reward.
run_episode(agent, env)

-62.0