In [1]:
import uuid
import random
import gym
import numpy as np
from math import sqrt, log

class Node:
    def __init__(self, state, action, action_space, reward, terminal):
        self.identifier = str(uuid.uuid1())
        self.parent_identifier = None
        self.children_identifiers = []
        self.untried_actions = list(range(action_space))
        self.state = state
        self.total_simulation_reward = 0
        self.num_visits = 0
        self.performance = 0
        self.action = action
        self.reward = reward
        self.terminal = terminal

    def __str__(self):
        return "{}: (action={}, visits={}, reward={:d}, ratio={:0.4f})".format(
                                                  self.state,
                                                  self.action,
                                                  self.num_visits,
                                                  int(self.total_simulation_reward),
                                                  self.performance)

    def untried_action(self):
        action = random.choice(self.untried_actions)
        self.untried_actions.remove(action)
        return action

def vertical_lines(last_node_flags):
    vertical_lines = []
    vertical_line = '\u2502'
    for last_node_flag in last_node_flags[0:-1]:
        if last_node_flag == False:
            vertical_lines.append(vertical_line + ' ' * 3)
        else:
            vertical_lines.append(' ' * 4)
    return ''.join(vertical_lines)

def horizontal_line(last_node_flags):
    horizontal_line = '\u251c\u2500\u2500 '
    horizontal_line_end = '\u2514\u2500\u2500 '
    if last_node_flags[-1]:
        return horizontal_line_end
    else:
        return horizontal_line

class Tree:
    def __init__(self):
        self.nodes = {}
        self.root = None

    def is_expandable(self, node):
        if node.terminal:
            return False
        if len(node.untried_actions) > 0:
            return True
        return False

    def iter(self, identifier, depth, last_node_flags):
        if identifier is None:
            node = self.root
        else:
            node = self.nodes[identifier]

        if depth == 0:
            yield "", node
        else:
            yield vertical_lines(last_node_flags) + horizontal_line(last_node_flags), node

        children = [self.nodes[identifier] for identifier in node.children_identifiers]
        last_index = len(children) - 1

        depth += 1
        for index, child in enumerate(children):
            last_node_flags.append(index == last_index)
            for edge, node in self.iter(child.identifier, depth, last_node_flags):
                yield edge, node
            last_node_flags.pop()

    def add_node(self, node, parent=None):
        self.nodes.update({node.identifier: node})

        if parent is None:
            self.root = node
            self.nodes[node.identifier].parent = None
        else:
            self.nodes[parent.identifier].children_identifiers.append(node.identifier)
            self.nodes[node.identifier].parent_identifier = parent.identifier

    def children(self, node):
        children = []
        for identifier in self.nodes[node.identifier].children_identifiers:
            children.append(self.nodes[identifier])
        return children

    def parent(self, node):
        parent_identifier = self.nodes[node.identifier].parent_identifier
        if parent_identifier is None:
            return None
        else:
            return self.nodes[parent_identifier]

    def show(self):
        lines = ""
        for edge, node in self.iter(identifier=None, depth=0, last_node_flags=[]):
            lines += "{}{}\n".format(edge, node)
        print(lines)

    def render_policy(self):
        node = self.tree.root
        path = []
        print("Rendering final policy...\n")

        while node and not node.terminal:
            print(node)
            path.append(node.state)
            node = max(self.tree.children(node), key=lambda n: n.num_visits)
        
        if node:
            print(node)
            path.append(node.state)
        
        print("\nFinal policy path (states):", path)





In [7]:
import numpy as np

def compute_v_table_from_q_table(q_table_file):

    # Carica la Q-table dal file
    q_table = np.load(q_table_file)
    q_table = q_table[0]
    
    # Compute V-table by taking the max of each row (state) from Q-table
    v_table = np.max(q_table, axis=1)
    
    # Replace any zero values in the V-table with -1
    v_table[v_table == 0] = -1
    v_table[15]=1
    # Save the updated V-table
    np.save("V_slippery.npy", v_table)
    return v_table


In [8]:

import gym
import numpy as np
import random
import json  # Import json to print JSON formatted data

import gym
import numpy as np
import random
import json  # Import json to print JSON formatted data

class MonteCarloTreeSearch:
    def __init__(self, env, V, initial_state=None, max_steps=2, slippery_prob=0.3):
        self.env = env
        self.V = V  # Value matrix
        self.action_space = self.env.action_space.n
        self.max_steps = max_steps  # Maximum steps to simulate forward
        self.initial_state = initial_state
        self.max_expansions = 8
        self.slippery_prob = slippery_prob  # Probability of slipping
        self.reset_tree()
        self.slips = []  # Track slips with detailed information

    def reset_tree(self):
        """Reset the tree with the given state as the root."""
        if self.initial_state is None:
            state = self.env.reset()[0]
        else:
            self.env.reset()  # Reset the environment to the start.
            self.env.unwrapped.s = self.initial_state  # Manually set the state to initial_state.
            state = self.initial_state
        
        root_node = Node(state=state, action=None, action_space=self.action_space, reward=0, terminal=False)
        self.tree = Tree()
        self.tree.add_node(root_node)
        print("Root node:", root_node)

    def expand(self, node, action):
        """Expand a given node by taking an action."""
        previous_state = self.env.unwrapped.s
        
        state, reward, done, _, _ = self.env.step(action)
        new_node = Node(state=state, action=action, action_space=self.action_space, reward=self.V[state], terminal=done)
        self.tree.add_node(new_node, node)

        self.env.unwrapped.s = previous_state

        return new_node
    
    def simulate(self, node):
        """Simulate from the node with custom slippery behavior, returning the value from the V table."""
        current_state = node.state
        env_state = self.env.unwrapped.s 
        action = node.action

        if (current_state != env_state):
            self.env.unwrapped.s = current_state

        if not (node.terminal) and (random.random() < self.slippery_prob):
            # Custom slippery effect: take an additional move in the same direction if not done
            second_state, _, done, _, _ = self.env.step(action)
            # Record slip event with detailed information
            self.slips.append({
                'from_state': current_state,
                'action': action,
                'slipped_to_state': second_state
            })
            # Set the environment state to the next state to continue the simulation
            self.env.unwrapped.s = env_state   
            return self.V[second_state]
        
        self.env.unwrapped.s = env_state 
        return self.V[current_state]

    def backpropagate(self, node, reward):
        """Backpropagate the simulation results up the tree."""
        while node:
            node.num_visits += 1
            node.total_simulation_reward += reward
            node.performance = node.total_simulation_reward / node.num_visits
            node.reward = (node.reward + 0.99 * reward) / node.num_visits  # Discount reward for parent
            node = self.tree.parent(node)

    def build_depth_n_tree(self):
        """Build a depth-n tree using selection, expansion, simulation, and backpropagation."""
        root_node = self.tree.root

        for _ in range(self.max_expansions):
            # 1. Selection: Use select_node to choose the most promising node
            selected_node = root_node
            while not self.tree.is_expandable(selected_node) and not selected_node.terminal:
                selected_node = self.select_node(selected_node)  # Epsilon-greedy selection
            
            # 2. Expansion: If the node is not terminal, expand it
            if not selected_node.terminal and self.tree.is_expandable(selected_node):
                expanded_node = self.expand(selected_node, selected_node.untried_action())
            else:
                expanded_node = selected_node  # No expansion if terminal or not expandable

            # 3. Simulation: Simulate from the expanded node
            simulation_reward = self.simulate(expanded_node)

            # 4. Backpropagation: Backpropagate the result of the simulation
            self.backpropagate(expanded_node, simulation_reward)

        # Reset environment state to the root's state after building the tree
        self.env.unwrapped.s = root_node.state

        # Visualize the tree after building
        self.tree.show()

    def select_node(self, node):
        """Select the most promising node to expand using epsilon-greedy."""
        if self.tree.is_expandable(node):
            return self.expand(node, node.untried_action())
        else:
            # Epsilon-greedy selection
            if random.random() < 0.3:
                # Explore: choose a random child
                node = random.choice(self.tree.children(node))
            else:
                # Exploit: choose the best child based on performance
                node = max(self.tree.children(node), key=lambda n: n.performance, default=None)
            return node

    def forward(self):
        """Perform a single iteration of MCTS with depth-n simulation as default policy."""
        self.build_depth_n_tree()
        root_node = self.tree.root
        best_child = max(self.tree.children(root_node), key=lambda n: n.performance, default=None)
        if best_child:
            return best_child.action, best_child.state
        else:
            return None, None

    def run(self):
        """Run a single iteration, now reflecting the deeper default policy."""
        self.forward()
        # Save the slip events to a JSON file
        with open('slips.json', 'w+') as f:
            json.dump(self.slips, f, indent=4)

    def choose_best_action(self):
        """After running the MCTS, choose the best action based on the highest value of the final state."""
        children = self.tree.children(self.tree.root)
        
        # Check if any child has the state equal to 15
        for child in children:
            if child.state == 15:
                return child.action, child.state
        
        # If no child state is 15, choose the one with the highest performance
        best_child = max(children, key=lambda n: n.performance, default=None)
        
        if best_child:
            return best_child.action, best_child.state
        else:
            return None, None


In [14]:

def main():
    env = gym.make('FrozenLake-v1', is_slippery=False)
    env2 = gym.make('FrozenLake-v1', is_slippery=False)

    # q_table_file = 'Q.npy'
    # v_table = compute_v_table_from_q_table(q_table_file)
    v_table=np.load("V_slippery.npy")
    print(v_table)
    
    target_state = 15
    
    stateMCTS = env.reset()[0]
    print(f"Initial state: {stateMCTS}")
    env2.reset()[0]
    
    path = [stateMCTS]
    terminal_state_reached = False
    max_iterations_without_convergence = 1000
    iteration_count = 0

    print("Starting MCTS...")
    while not terminal_state_reached and iteration_count < max_iterations_without_convergence:
        monteCarloTreeSearch = MonteCarloTreeSearch(env=env, V=v_table, initial_state=stateMCTS, max_steps=2, slippery_prob=0.8)  # Adjust max_steps as needed
        monteCarloTreeSearch.run()
        print(f"\nBuilt tree for the cell\n")
        monteCarloTreeSearch.tree.show()
        
        action, next_state = monteCarloTreeSearch.choose_best_action()

        if action is None:
            print("No valid actions found, stopping.")
            break

        print(f"Chosen action: {action}, leads to state: {next_state}")
        path.append(next_state)

        env2_state, reward, done, _, _ = env2.step(action)
        print(f"New state after action {action}: {env2_state}, reward: {reward}, done: {done}")

        if env2_state == target_state:
            print("Target state reached.")
            terminal_state_reached = True
        elif done:
            print("Fell into the lake, retrying.")
            stateMCTS = env.reset()[0] 
            env2.reset()  
            path = [stateMCTS]
            iteration_count = 0  
        else:
            stateMCTS = env2_state
            iteration_count += 1

    print("\nFinal path (states):", path)
    if terminal_state_reached:
        path = []

   

if __name__ == "__main__":
    main()


[ 0.95099005  0.96059601  0.97029794  0.96059601  0.96059601 -1.
  0.9801     -1.          0.970299    0.9801      0.99       -1.
 -1.          0.99        1.          1.        ]
Initial state: 0
Starting MCTS...
Root node: 0: (action=None, visits=0, reward=0, ratio=0.0000)
0: (action=None, visits=8, reward=7, ratio=0.9606)
├── 0: (action=3, visits=1, reward=0, ratio=0.9510)
├── 0: (action=0, visits=1, reward=0, ratio=0.9510)
├── 1: (action=2, visits=2, reward=1, ratio=0.9606)
│   └── 0: (action=0, visits=1, reward=0, ratio=0.9510)
└── 4: (action=1, visits=4, reward=3, ratio=0.9655)
    ├── 0: (action=0, visits=1, reward=0, ratio=0.9510)
    ├── 4: (action=1, visits=1, reward=0, ratio=0.9703)
    └── 1: (action=2, visits=1, reward=0, ratio=0.9703)


Built tree for the cell

0: (action=None, visits=8, reward=7, ratio=0.9606)
├── 0: (action=3, visits=1, reward=0, ratio=0.9510)
├── 0: (action=0, visits=1, reward=0, ratio=0.9510)
├── 1: (action=2, visits=2, reward=1, ratio=0.9606)
│   └──

In [5]:
# import gym
# import numpy as np
# import random

# class MonteCarloTreeSearch:
#     def __init__(self, env, V, initial_state=None, max_steps=2, slippery_prob=0.3):
#         self.env = env
#         self.V = V  # Value matrix
#         self.action_space = self.env.action_space.n
#         self.max_steps = max_steps  # Maximum steps to simulate forward
#         self.initial_state = initial_state
#         self.max_expansions = 8
#         self.slippery_prob = slippery_prob  # Probability of slipping
#         self.reset_tree()

#     def reset_tree(self):
#         """Reset the tree with the given state as the root."""
#         if self.initial_state is None:
#             state = self.env.reset()[0]
#         else:
#             self.env.reset()  # Reset the environment to the start.
#             self.env.unwrapped.s = self.initial_state  # Manually set the state to initial_state.
#             state = self.initial_state
        
#         root_node = Node(state=state, action=None, action_space=self.action_space, reward=0, terminal=False)
#         self.tree = Tree()
#         self.tree.add_node(root_node)
#         print("Root node:", root_node)

#     def expand(self, node, action):
#         """Expand a given node by taking an action."""
#         previous_state = self.env.unwrapped.s
        
#         state, reward, done, _, _ = self.env.step(action)
#         new_node = Node(state=state, action=action, action_space=self.action_space, reward=self.V[state], terminal=done)
#         self.tree.add_node(new_node, node)

#         self.env.unwrapped.s = previous_state

#         return new_node
    
#     def simulate(self, node):
#             """Simulate from the node with custom slippery behavior, returning the value from the V table."""
#             current_state = node.state
#             env_state = self.env.unwrapped.s 
#             if (current_state != env_state):
#                 self.env.unwrapped.s = current_state
#             action = node.action
            
#             if not (node.terminal) & (random.random() < self.slippery_prob):
#                 # Custom slippery effect: take an additional move in the same direction if not done
#                 second_state, _, done, _, _ = self.env.step(action)
#                 # Set the environment state to the next state to continue the simulation
#                 self.env.unwrapped.s = env_state   
#                 return self.V[second_state]
          
#             self.env.unwrapped.s = env_state 
#             return self.V[current_state]


#     def backpropagate(self, node, reward):
#         """Backpropagate the simulation results up the tree."""
#         while node:
#             node.num_visits += 1
#             node.total_simulation_reward += reward
#             node.performance = node.total_simulation_reward / node.num_visits
#             node.reward = (node.reward + 0.99 * reward)/ node.num_visits # Discount reward for parent
#             node = self.tree.parent(node)

#     def build_depth_n_tree(self):
#         """Build a depth-n tree using selection, expansion, simulation, and backpropagation."""
#         root_node = self.tree.root

#         for _ in range(self.max_expansions):
#             # 1. Selection: Use select_node to choose the most promising node
#             selected_node = root_node
#             while not self.tree.is_expandable(selected_node) and not selected_node.terminal:
#                 selected_node = self.select_node(selected_node)  # Epsilon-greedy selection
            
#             # 2. Expansion: If the node is not terminal, expand it
#             if not selected_node.terminal and self.tree.is_expandable(selected_node):
#                 expanded_node = self.expand(selected_node, selected_node.untried_action())
#             else:
#                 expanded_node = selected_node  # No expansion if terminal or not expandable

#             # 3. Simulation: Simulate from the expanded node
#             simulation_reward = self.simulate(expanded_node)

#             # 4. Backpropagation: Backpropagate the result of the simulation
#             self.backpropagate(expanded_node, simulation_reward)

#         # Reset environment state to the root's state after building the tree
#         self.env.unwrapped.s = root_node.state

#         # Visualize the tree after building
#         self.tree.show()


#     def select_node(self, node):
#         """Select the most promising node to expand using epsilon-greedy."""
#         if self.tree.is_expandable(node):
#             return self.expand(node, node.untried_action())
#         else:
#             # Epsilon-greedy selection
#             if random.random() < 0.3:
#                 # Explore: choose a random child
#                 node = random.choice(self.tree.children(node))
#             else:
#                 # Exploit: choose the best child based on performance
#                 node = max(self.tree.children(node), key=lambda n: n.performance, default=None)
#             return node

#     def forward(self):
#         """Perform a single iteration of MCTS with depth-n simulation as default policy."""
#         self.build_depth_n_tree()
#         root_node = self.tree.root
#         best_child = max(self.tree.children(root_node), key=lambda n: n.performance, default=None)
#         if best_child:
#             return best_child.action, best_child.state
#         else:
#             return None, None

#     def run(self):
#         """Run a single iteration, now reflecting the deeper default policy."""
#         self.forward()

#     def choose_best_action(self):
#         """After running the MCTS, choose the best action based on the highest value of the final state."""
#         children = self.tree.children(self.tree.root)
        
#         # Check if any child has the state equal to 15
#         for child in children:
#             if child.state == 15:
#                 return child.action, child.state
        
#         # If no child state is 15, choose the one with the highest performance
#         best_child = max(children, key=lambda n: n.performance, default=None)
        
#         if best_child:
#             return best_child.action, best_child.state
#         else:
#             return None, None

