In [3]:
import numpy as np
from collections import defaultdict

import rospy
from std_msgs.msg import String
from geometry_msgs.msg import Point

In [4]:
class MonteCarloTreeSearchNode():
    def __init__(self, state, parent=None, parent_action=None):
        self.state = state
        self.parent = parent
        self.parent_action = parent_action
        self.children = []
        self._number_of_visits = 0
        self._results = defaultdict(int)
        self._results[1] = 0
        self._results[-1] = 0
        self._untried_actions = None
        self._untried_actions = self.untried_actions()
        return

    def untried_actions(self):

        self._untried_actions = self.state.get_legal_actions()
        return self._untried_actions

    def q(self):
        '''
        Update to reflect robot scenario rather than wins and losses
        '''
        wins = self._results[1]
        loses = self._results[-1]
        return wins - loses

    def n(self):
        return self._number_of_visits

    def expand(self):

        action = self._untried_actions.pop()
        next_state = self.state.move(action)
        child_node = MonteCarloTreeSearchNode(
            next_state, parent=self, parent_action=action)

        self.children.append(child_node)
        return child_node 

    def is_terminal_node(self):
        return self.state.is_game_over()

    def simulation(self):
        current_simulation_state = self.state

        while not current_simulation_state.is_game_over():

            possible_moves = current_simulation_state.get_legal_actions()

            action = self.simulation_policy(possible_moves)
            current_simulation_state = current_simulation_state.move(action)
        return current_simulation_state.game_result()

    def backpropagate(self, result):
        self._number_of_visits += 1.
        self._results[result] += 1.
        if self.parent:
            self.parent.backpropagate(result)

    def is_fully_expanded(self):
        return len(self._untried_actions) == 0

    def best_child(self, c_param=0.1):

        choices_weights = [(c.q() / c.n()) + c_param * np.sqrt((2 * np.log(self.n()) / c.n())) for c in self.children]
        return self.children[np.argmax(choices_weights)]

    def simulation_policy(self, possible_moves):

        return possible_moves[np.random.randint(len(possible_moves))]

    def _tree_policy(self):

        current_node = self
        while not current_node.is_terminal_node():

            if not current_node.is_fully_expanded():
                return current_node.expand()
            else:
                current_node = current_node.best_child()
        return current_node

    def best_action(self):
        simulation_no = 100


        for i in range(simulation_no):

            v = self._tree_policy()
            reward = v.simulation()
            v.backpropagate(reward)

        return self.best_child(c_param=0.)

    def get_legal_actions(self): 
        '''
        Modify according to your game or
        needs. Constructs a list of all
        possible actions from current state.
        Returns a list.
        '''

    def is_game_over(self):
        '''
        Modify according to your game or 
        needs. It is the game over condition
        and depends on your game. Returns
        true or false
        '''

    def game_result(self):
        '''
        Modify according to your game or 
        needs. Returns 1 or 0 or -1 depending
        on your state corresponding to win,
        tie or a loss.
        '''

    def move(self,action):
        '''
        Modify according to your game or 
        needs. Changes the state of your 
        board with a new value. For a normal
        Tic Tac Toe game, it can be a 3 by 3
        array with all the elements of array
        being 0 initially. 0 means the board 
        position is empty. If you place x in
        row 2 column 3, then it would be some 
        thing like board[2][3] = 1, where 1
        represents that x is placed. Returns 
        the new state after making a move.
        '''

def main():
    root = MonteCarloTreeSearchNode(state = initial_state)
    selected_node = root.best_action()
    return 

In [None]:
class Robot():
    def __init__(self, robot_id, start_loc, goal_loc, time_interval, env):
        self.robot_id = robot_id
        self.start_loc = start_loc
        self.goal_loc = goal_loc
        self.loc_list = []
        self.observations_list = []
        self.time_interval = time_interval
        self.env = None
        self.pub = rospy.Publisher('robot_loc_'+robot_id, Point, queue_size=10)
        rospy.init_node('robot_'+robot_id , anonymous=True)
        self.rate = rospy.Rate(1/time_interval)
        
    def register_env(self, env):
        '''
        Associates an Environment object with the robot
        '''
        self.env = env
        
    def get_observations(self):
        '''
        Return a dictionary of N,E,S,W and whether there
        is a wall in that direction from the robot's current 
        location
        '''
        return self.env.get_walls_from_loc(loc_list[-1])
        
    def move(self, direction):
        '''
        Move one step in direction passed as argument
        if fail due to obstructing wall from get_observations()
        stay in same location
        '''
        
    def update(self):
        '''
        Move to next position, update observations, update locations, run MCTS,
        publish to ROS topic
        '''
        
    def listener(self):
        '''
        Implement timer listener at frequency 1/time_interval to call to update() 
        '''

In [None]:
class Environment():
    def __init__(self, width, height, walls, start, goal, render_interval=0.5):
        self.width = width
        self.height = height
        self.walls = walls
        self.start = start
        self.goal = goal
        self.render_interval = render_interval
        self.timestep = 0
        self.complete = False
        self.robot_list = [] #Tuple of robot ID and location
        
    def add_robot(self, robot):
        '''
        Add robot with start location start_loc, goal goal_loc and pass self as env
        Add to self.robot_list
        ''' 
        
    def get_walls_from_loc(self, loc):
        '''
        return a dictionary of N,E,S,W and whether there
        is a wall in that direction from loc 
        '''
        
    def render(self):
        '''
        render
        '''
    
    def update_loc(self, loc_msg):
        '''
        Callback function for robot locations
        Update locations of robots for rendering
        '''
        
    def listener(self):
        '''
        Implement listener code, call to update_loc()
        Implement timer listener at frequency 1/render_interval to call to render()
        '''
        rospy.init_node('Environment', anonymous=True)
        for (robot_id,_) in self.robot_list:
            rospy.Subscriber('robot_loc_'+robot_id, Point, update_loc, robot_id)

In [32]:
import math
import random

import numpy as np
from collections import defaultdict

from scipy import stats

import json

def json_dumps_tuple_keys(mapping):
    string_keys = {json.dumps(k): v for k, v in mapping.items()}
    return json.dumps(string_keys)

def json_loads_tuple_keys(string):
    mapping = json.loads(string)
    return {tuple(json.loads(k)): v for k, v in mapping.items()}

def Agent_Info_from_JSON(json_str):
    json_dict = json.loads(json_str)
    state = Agent_State(json_dict['state']['loc'], json_dict['state']['obs'])
    probs = json_loads_tuple_keys(json_dict['probs'])
    timestamp = json_dict['time']
    robot_id = json_dict['robot_id']
    return Agent_Info(robot_id, state, probs, timestamp)

class Agent_State():
    def __init__(self, location, observations):
        self.loc = location
        self.obs = observations

class Agent_Info():
    def __init__(self, robot_id, state, probs, timestamp):
        self.state = state #Type of Agent_State
        self.probs = json_dumps_tuple_keys(probs) #Dictionary: use JSON - use json_loads_tuple_keys() to turn back to normal tuple
        self.time = timestamp #integer - number of times update called with execute action True
        self.robot_id = robot_id #UUID      
        
    def toJSON(self):
        return json.dumps(self, default=lambda o: o.__dict__, 
            sort_keys=True, indent=4)

    def select_random_plan(self):
        plan, _ = np.random.choice(self.probs.keys, p=self.probs.values).copy()
        return plan

In [33]:
a = Agent_Info(0, Agent_State((4,5),[(1,2),(2,3)]), {(5,7):0.6, (4,8):0.3}, 10)

In [34]:
b = a.toJSON()

In [35]:
json.loads(b)

{'probs': '{"[5, 7]": 0.6, "[4, 8]": 0.3}',
 'robot_id': 0,
 'state': {'loc': [4, 5], 'obs': [[1, 2], [2, 3]]},
 'time': 10}

In [36]:
c = Agent_Info_from_JSON(b)

In [43]:
c.robot_id

0