In [None]:
import numpy as np
import copy

In [None]:
class GridWorld:
    """Deterministic grid-world environment with a constant step reward of -1.

    States are ``(row, column)`` tuples.  The world is either an open
    ``height`` x ``width`` rectangle, or it is described by ``maze``: a
    sequence of equally long strings in which ``"O"`` marks an ordinary
    cell, ``"T"`` a terminal cell and ``"X"`` a wall.

    Args:
        height: Number of rows (ignored when ``maze`` is given).
        width: Number of columns (ignored when ``maze`` is given).
        gamma: Discount factor exposed through the ``gamma`` property.
        terminals: Iterable of terminal states (ignored when ``maze`` is
            given, where the ``"T"`` cells are used instead).
        maze: Optional maze layout as described above.
    """

    def __init__(self, height, width, gamma=1, terminals=((0, 0),), maze=None):
        self.__actions = ("U", "D", "L", "R")
        self.__reward = -1
        self.__gamma = gamma
        self.__maze = maze

        if maze is None:
            self.__width = width
            self.__height = height
            # Copy the argument: the environment must not share its terminal
            # list with the caller or with other instances.
            self.__terminals = list(terminals)
            self.__states = [(h, w) for h in range(height) for w in range(width)]
        else:
            self.__width = len(maze[0])
            self.__height = len(maze)

            self.__terminals = []
            self.__states = []
            for h in range(self.__height):
                for w in range(self.__width):
                    cell = maze[h][w]
                    if cell == "O":
                        self.__states.append((h, w))
                    elif cell == "T":
                        # Terminal cells are states as well.
                        self.__states.append((h, w))
                        self.__terminals.append((h, w))

    def try_action(self, state, action):
        """Return the outcome of taking ``action`` in ``state``.

        Returns:
            A tuple ``(next_states, transition_probs, rewards)`` of three
            parallel lists.  All three are empty for a terminal state.
            Otherwise each holds exactly one entry: moves that would leave
            the grid are clamped to the border, and a move into a wall
            (``"X"``) leaves the state unchanged.  An action other than
            "U", "D", "L" or "R" does not move the agent.
        """
        if state in self.__terminals:
            return [], [], []

        pos_y, pos_x = state
        if action == "U":
            pos_y = max(0, pos_y - 1)
        elif action == "D":
            pos_y = min(self.height - 1, pos_y + 1)
        elif action == "L":
            pos_x = max(0, pos_x - 1)
        elif action == "R":
            pos_x = min(self.width - 1, pos_x + 1)

        # Bumping into a wall costs a step but does not move the agent.
        if self.__maze is not None and self.__maze[pos_y][pos_x] == "X":
            return [state], [1], [self.__reward]

        return [(pos_y, pos_x)], [1], [self.__reward]

    @property
    def height(self):
        """Number of rows in the grid."""
        return self.__height

    @property
    def width(self):
        """Number of columns in the grid."""
        return self.__width

    @property
    def gamma(self):
        """Discount factor."""
        return self.__gamma

    @property
    def actions(self):
        """Tuple of available actions: ``("U", "D", "L", "R")``."""
        return self.__actions

    @property
    def states(self):
        """List of all states (terminal states included) in row-major order."""
        return self.__states

    @property
    def terminals(self):
        """List of terminal states."""
        return self.__terminals

    
class Agent:
    """Base class holding an environment, a state-value table and a threshold.

    Args:
        env: Environment object; this class reads its ``states``, ``height``
            and ``width`` attributes.
        theta: Threshold stored as ``_theta`` for use by subclasses.
    """

    def __init__(self, env, theta):
        self._env = env
        self._theta = theta
        # Every state starts with a value of zero.
        self._state_values = dict.fromkeys(env.states, 0)

    def print_state_values(self):
        """Print the value table row by row; non-state cells show ``-INF``."""
        print("state values")
        env = self._env
        for row in range(env.height):
            cells = [
                f"{self._state_values[(row, col)]:+1.1f}"
                if (row, col) in env.states
                else "-INF"
                for col in range(env.width)
            ]
            print(cells)

    def print_policy(self):
        """Print the current policy; must be provided by subclasses."""
        raise NotImplementedError

            
class PolicyIteration(Agent):
    """Policy iteration on a grid world.

    The policy maps each state to the collection of actions it may take;
    the actions of a state are chosen with equal probability.  Terminal
    states have an empty action list.

    Args:
        env: Environment providing ``states``, ``terminals``, ``actions``,
            ``gamma``, ``height``, ``width`` and ``try_action``.
        theta: Policy evaluation stops once the largest value change of a
            sweep drops below this threshold.
    """

    def __init__(self, env, theta=2):
        super().__init__(env, theta)

        self.set_random_policy()

    def set_random_policy(self):
        """Reset the policy so every non-terminal state allows all actions."""
        self._policy = {s: self._env.actions for s in self._env.states}

        self.set_terminals(self._env.terminals)

        self.cal_action_prob()

    def set_terminals(self, terminals):
        """Give every state in ``terminals`` an empty action list."""
        for t in terminals:
            self._policy[t] = []

    def cal_action_prob(self):
        """Recompute the uniform action probabilities of the current policy."""
        self._action_prob = {}
        for s in self._env.states:
            allowed = self._policy[s]
            # An empty action list (terminal state) yields an empty mapping.
            self._action_prob[s] = {a: 1 / len(allowed) for a in allowed}

    def _action_value(self, state, action):
        """Return the expected one-step return of ``action`` in ``state``.

        Sums over every (next state, probability, reward) outcome reported
        by the environment, using the current state values.
        """
        next_states, transition_probs, rewards = self._env.try_action(state, action)
        return sum(
            trans_prob * (reward + self._env.gamma * self._state_values[next_state])
            for next_state, trans_prob, reward in zip(next_states, transition_probs, rewards)
        )

    def policy_evaluation(self, max_iter=10):
        """Update the state values under the current policy.

        Performs synchronous sweeps: every update within a sweep reads the
        values of the previous sweep.  Stops when the largest change of a
        sweep is below ``theta`` or after ``max_iter`` sweeps, so that a
        too-small ``theta`` cannot keep the loop running.

        Args:
            max_iter: Maximum number of sweeps.
        """
        for _ in range(max_iter):
            delta = 0
            new_values = dict(self._state_values)
            for s in self._env.states:
                # Weight each action by its own probability under the policy.
                new_values[s] = sum(
                    self._action_prob[s][a] * self._action_value(s, a)
                    for a in self._policy[s]
                )
                delta = max(delta, abs(new_values[s] - self._state_values[s]))

            self._state_values = new_values

            if delta < self._theta:
                break

    def policy_improvement(self):
        """Make the policy greedy with respect to the current state values.

        For every state, keeps the actions that achieve the maximal action
        value and refreshes the action probabilities afterwards.

        Returns:
            True when no state's action set changed, False otherwise.
        """
        policy_stable = True
        for s in self._env.states:
            # NOTE(review): only actions already in the current policy are
            # candidates, so an action dropped earlier is never re-added.
            # Textbook policy improvement maximises over all actions;
            # confirm this restriction is intended.
            action_values = {
                a: self._action_value(s, a) if a in self._policy[s] else float("-inf")
                for a in self._env.actions
            }

            old_policy = self._policy[s]
            max_action_value = max(action_values.values())
            # States without any candidate action (terminals) are left as is.
            if max_action_value != float("-inf"):
                # Keep every action that achieves the maximal action value.
                self._policy[s] = tuple(
                    a for a in self._env.actions if action_values[a] == max_action_value
                )
                if old_policy != self._policy[s]:
                    policy_stable = False

        self.cal_action_prob()

        return policy_stable

    def perform(self, iter_num, random_policy=False):
        """Alternate evaluation and improvement, printing each step.

        Args:
            iter_num: Maximum number of evaluation/improvement rounds.
            random_policy: When true, the policy is reset to the uniform
                random policy after each improvement step.
        """
        print("initial")
        self.print_policy()
        self.print_state_values()

        for i in range(iter_num):
            print("\nstep: ", i+1)
            self.policy_evaluation()

            self.print_state_values()

            policy_stable = self.policy_improvement()
            self.print_policy()

            if random_policy:
                self.set_random_policy()

            if policy_stable:
                break

    def print_policy(self):
        """Print the policy row by row.

        Each state shows four slots in the order of ``env.actions``; a slot
        holds the action letter when the policy allows it and ``_``
        otherwise.  Cells that are not states are printed as ``XXXX``.
        """
        print("policy")
        for h in range(self._env.height):
            ss = []
            for w in range(self._env.width):
                if (h, w) in self._policy:
                    allowed = self._policy[(h, w)]
                    action = "".join(x if x in allowed else "_" for x in self._env.actions)
                else:
                    action = "XXXX"

                ss.append(action)
            print(ss)

In [None]:
# Demo: policy iteration on an open 4x4 grid whose terminal states are the
# top-left and bottom-right corners.
h = w = 4

# Sample maze layout ("O" = ordinary cell, "T" = terminal cell, "X" = wall).
# It is only used by the commented-out maze variant below.
maze = [
    "OOOOXXX",
    "OXOXOTX",
    "OXOXOOX",
    "OOOOOOO",
]

env = GridWorld(h, w, terminals=((0, 0), (h - 1, w - 1)))
# Alternative environments:
#   single terminal state in the middle of the grid
#env = GridWorld(h, w, terminals=((h>>1, w>>1), ))
#   maze world (size and terminals are then taken from the maze itself)
#env = GridWorld(h, w, terminals=((0, 0), (h-1, w-1)), maze=maze)

policy_iteration = PolicyIteration(env)
# Run at most 30 evaluation/improvement rounds.
policy_iteration.perform(30, random_policy=False)

In [None]:
class ValueIteration(Agent):
    """Value iteration on a grid world.

    The policy maps each state to a single action letter; terminal states
    are marked with ``"_"``.

    Args:
        env: Environment providing ``states``, ``terminals``, ``actions``,
            ``gamma``, ``height``, ``width`` and ``try_action``.
        theta: Iteration stops once the largest value change of a sweep
            drops below this threshold.
    """

    def __init__(self, env, theta=0.1):
        super().__init__(env, theta)

        self.init_policy()

    def init_policy(self):
        """Initialise the policy: first action everywhere, ``_`` on terminals."""
        # The first action of the environment serves as the default.
        self._policy = {s: self._env.actions[0] for s in self._env.states}

        for t in self._env.terminals:
            self._policy[t] = "_"

    def optimize(self, max_iter=30):
        """Run value iteration, printing the state values after each sweep.

        Performs synchronous sweeps: every update within a sweep reads the
        values of the previous sweep.  Stops when the largest change of a
        sweep is below ``theta`` or after ``max_iter`` sweeps, so that a
        too-small ``theta`` cannot keep the loop running.

        Args:
            max_iter: Maximum number of sweeps.
        """
        for i in range(max_iter):
            delta = 0
            new_values = dict(self._state_values)
            for s in self._env.states:
                if s in self._env.terminals:
                    continue

                # One expected return per action, summed over all outcomes
                # the environment reports for that action.
                action_values = {}
                for a in self._env.actions:
                    next_states, transition_probs, rewards = self._env.try_action(s, a)
                    action_values[a] = sum(
                        trans_prob * (reward + self._env.gamma * self._state_values[next_state])
                        for next_state, trans_prob, reward in zip(next_states, transition_probs, rewards)
                    )

                # On ties max() keeps the first action in env.actions order.
                best_action = max(action_values, key=action_values.get)
                new_values[s] = action_values[best_action]
                self._policy[s] = best_action
                delta = max(delta, abs(new_values[s] - self._state_values[s]))

            print("\nstep: ", i+1)
            self._state_values = new_values

            self.print_state_values()

            if delta < self._theta:
                break

    def perform(self):
        """Print the initial values and policy, optimise, print the result."""
        print("initial")
        self.print_state_values()
        self.print_policy()

        self.optimize()

        self.print_policy()

    def print_policy(self):
        """Print the policy row by row; cells that are not states show ``X``."""
        print("policy")
        for h in range(self._env.height):
            # Cells without a policy entry (e.g. maze walls) get a placeholder
            # instead of raising KeyError.
            print([self._policy.get((h, w), "X") for w in range(self._env.width)])

In [None]:
# Demo: value iteration on an open 5x5 grid whose terminal states are the
# top-left and bottom-right corners.
h = w = 5
env = GridWorld(h, w, terminals=((0, 0), (h - 1, w - 1)))
# Alternative: single terminal state in the middle of the grid
#env = GridWorld(h, w, terminals=((h>>1, w>>1), ))

# NOTE: the variable keeps the name `policy_iteration` although it holds a
# ValueIteration agent.
policy_iteration = ValueIteration(env)
policy_iteration.perform()