In [None]:
import numpy as np
import math
import random

def ucb(v, n, t, c_param=1):
    '''
    t visits to state node, n visits to an action under the state node, total value v
    '''
    return np.inf if (t == 0 or n == 0) else ((v/n) + c_param * math.sqrt(2*math.log(t)/n))

class MCTSNode:
    def __init__(self, state, parent=None):
        self.state = state      # State (s)
        self.parent = parent    # Parent node
        self.children = {}      # Dictionary to hold child nodes (action -> child node)
        self.visits = 0         # Number of visits to this node
        self.value = 0.0        # Accumulated value for this node
        self.depth = 0 if parent is None else parent.depth + 1  # Node depth

    def is_fully_expanded(self):
        return len(self.children) == len(Actions)  # All actions have been tried


    def best_child(self, c_param=1.4):
        #if self.visits == 0: # To fix the UCB calculation if self has had no visits
            #choices_weights = [
                #(child.value / child.visits)
                #for child in self.children.values()
            #]
        #else:
            #choices_weights = [
                #(child.value / child.visits) + c_param * math.sqrt((2 * math.log(self.visits) / child.visits))
                #for child in self.children.values()
            #]

        choices_weights = [
                ucb(child.value, child.visits, self.visits, c_param) for child in self.children.values()
            ]
        return list(self.children.values())[np.argmax(choices_weights)]

    def __str__(self):
        return 'd=%d n=%d v=%f' % (self.depth, self.visits, self.value)

    def __repr__(self):
        return 'd=%d n=%d v=%f' % (self.depth, self.visits, self.value)

def r(s):  # Immediate reward function
    return float(s == 2)

# State and action space
States = np.array([0, 1, 2], dtype=int)
Actions = np.array([0, 1], dtype=int)

# Transition probability matrix P[a, s, s_next]
P = np.array([
    [0.8, 0.2, 0.0],
    [0.1, 0.8, 0.1],
    [0.0, 0.2, 0.8],
    [0.2, 0.8, 0],
    [0.0, 0.2, 0.8],
    [0.0, 0.8, 0.2],
]).reshape(2, 3, 3)

gamma = 0.9  # Discount factor for future rewards
max_depth = 30  # Maximum tree depth

def simulate(state, depth):
    """Simulate a random rollout from the current state."""
    total_reward = 0.0
    for d in range(depth):
        action = random.choice(Actions)  # Choose a random action
        reward = r(state)
        total_reward += gamma**d * reward 

        next_state = np.random.choice(States, p=P[action, state])
        state = next_state
        if d == max_depth:  # Maximum depth
            break
    return total_reward

def expand(node):
    """Expand the node by trying an untried action and creating a new child."""
    tried_actions = node.children.keys()
    untried_actions = [a for a in Actions if a not in tried_actions]
    action = random.choice(untried_actions)
    
    # Sample the next state based on transition probabilities
    next_state = np.random.choice(States, p=P[action, node.state])
    
    # Apply immediate reward for the expanded node
    reward = r(node.state)  # Immediate reward, no discounting here
    
    # Create a new child node
    child_node = MCTSNode(state=next_state, parent=node)
    node.children[action] = child_node
    return child_node, reward

def backpropagate_trajectory(trajectory, rollout_reward):
    """Backpropagate the reward through the trajectory with correct discounting."""
    cumulative_reward = rollout_reward  # Start with the rollout reward
    for node, reward in reversed(trajectory):
        # Add the immediate reward to the cumulative reward and propagate
        cumulative_reward = reward + gamma * cumulative_reward
        node.visits += 1
        node.value += cumulative_reward

def best_action(root, iterations=1000):
    """Perform MCTS and return the best action."""
    for i in range(iterations):
        node = root
        #print(i, root)
        trajectory = []  # To store the nodes along the current path

        # Selection: Traverse the tree to find a node that is not fully expanded
        while node.is_fully_expanded() and node.children and node.depth < max_depth:
            child = node.best_child()
            reward = r(node.state)  # Immediate reward for the current node
            trajectory.append((node, reward))  # Add the node and reward to the trajectory
            node = child

        # Expansion: Expand the node if it's not fully expanded
        if not node.is_fully_expanded() and node.depth < max_depth:
            child, expanded_reward = expand(node)
            reward = expanded_reward  # Immediate reward for the expanded node
            trajectory.append((node, reward))  # Add the expanded node and reward to the trajectory
            node = child

        # Simulation: Perform a random rollout from the newly expanded node
        rollout_reward = simulate(node.state, max_depth - node.depth)
        #print('before update', trajectory, rollout_reward)
        
        # Backpropagation: Backpropagate the reward through the trajectory
        backpropagate_trajectory(trajectory, rollout_reward)
        #print('after update', trajectory)

    # Choose the action leading to the best child
    best_action = max(root.children, key=lambda action: root.children[action].value / root.children[action].visits)
    return best_action

policy = [1, 1, 0] # optimal policy
counts = np.zeros(3)
# Run MCTS for each initial state and print the best action
for initial_state in range(3):
    for _ in range(30):
        # Initial state
        root = MCTSNode(state=initial_state)

        # Run MCTS
        best_act = best_action(root, iterations=10000)
        counts[initial_state] += (best_act == policy[initial_state])
        #print(f"Best action from state {initial_state} is: {best_act}")

print(counts/30)


[18. 23. 22.]


In [4]:
print(counts/30)

[0.6        0.76666667 0.73333333]
