In [1]:
import numpy as np
from enum import Enum
from typing import Tuple

In [2]:
class Action(Enum):
    """The four compass moves available in the grid world.

    Member values are consecutive integers starting at zero, so a member's
    ``value`` can be used directly as an index along an action axis.
    """

    # Unpacked in declaration order: NORTH=0, SOUTH=1, EAST=2, WEST=3.
    NORTH, SOUTH, EAST, WEST = range(4)

In [11]:
class Environment():
    def __init__(self, height: int, width: int):
        self.height = height
        self.width = width
        self.rewards = -1 * np.ones((height, width))
        self.policy = 0.25 * np.ones((height, width, len(Action)))
        self.value = np.zeros((height, width))

        # Zero reward on terminal.
        self.rewards[0, 0] = 0
        self.rewards[-1, -1] = 0

    def print_value(self):
        print(f"{self.value}\n")

    def print_policy(self):
        for i in range(self.height):
            str = "|"
            for j in range(self.width):
                str += "N" if self.policy[i, j, Action.NORTH.value] != 0 else " "
                str += "S" if self.policy[i, j, Action.SOUTH.value] != 0 else " "
                str += "E" if self.policy[i, j, Action.EAST.value] != 0 else " "
                str += "W" if self.policy[i, j, Action.WEST.value] != 0 else " "
                
                if (j == self.width - 1):
                    str += "|"
                    print(str)
                else:
                    str += "|"
        print()

    def __next_state(self, i: int, j: int, action: Action) -> Tuple[int, int]:
        # Move to next state based on action. Constrain if off border.
        if action == Action.NORTH:
            next_h = max(0, i - 1)
            next_j = j
        elif action == Action.SOUTH:
            next_h = min(self.height - 1, i + 1)
            next_j = j
        elif action == Action.EAST:
            next_h = i
            next_j = min(self.width - 1, j + 1)
        else:
            next_h = i
            next_j = max(0, j - 1)

        return next_h, next_j


    def __bellman_lookahead(self, values: np.ndarray, i: int, j: int, gamma: int) -> float:
        # Terminal state has 0 reward.
        if (i == 0 and j == 0) or (i == self.height -1 and j == self.width - 1):
            return 0

        total = 0

        # One step lookahead.
        for action in Action:
            next_i, next_j = self.__next_state(i, j, action)
            total += self.policy[i, j, action.value] * (self.rewards[next_i, next_j] + gamma * values[next_i, next_j])

        return total

    def __policy_evaluation(self, gamma: float, theta: float) -> int:
        counter = 0
        # Keep updating values until they converge based on policy.
        while True:
            delta = 0
            old_values = self.value.copy()
            for i in range(self.height):
                for j in range(self.width):
                    value = self.value[i, j]
                    self.value[i, j] = self.__bellman_lookahead(old_values, i, j, gamma)
                    delta = max(delta, abs(value - self.value[i, j]))

            counter += 1
            if delta < theta:
                break
        
        return counter        

    def __policy_improvement(self, gamma: float) -> bool:
        # Find best actions for each state.
        policy_stable = True
        for i in range(self.height):
            for j in range(self.width):
                policy = self.policy[i, j, :].copy()

                max_value = None
                best_actions = []

                for action in Action:
                    next_i, next_j = self.__next_state(i, j, action)
                    value = self.rewards[next_i, next_j] + gamma * self.value[next_i, next_j]

                    if max_value is None:
                        max_value = value
                        best_actions.append(action)
                    elif value > max_value:
                        max_value = value
                        best_actions = [action]
                    elif value == max_value:
                        best_actions.append(action)

                # Divide probability over tied best actions.
                new_prob = 1 / len(best_actions)
                for action in Action:
                    self.policy[i, j, action.value] = new_prob if action in best_actions else 0

                if not (self.policy[i, j, :] == policy).all():
                    policy_stable = False

        return policy_stable

    def policy_iteration(self, gamma: float = 1, theta: float = 0.1):
        print("Starting Policy Iteration")
        counter = 0
        while True:
            counter += 1
            eval_count = self.__policy_evaluation(gamma, theta)
            print(f"Policy evaluation for improvement iteration {counter} took {eval_count} iterations to converge.")
            if self.__policy_improvement(gamma):
                break
        
        print(f"{counter} policy improvement iterations for policy convergence.")
        print("Value Function:")
        self.print_value()
        print("Policy:")
        self.print_policy()

    def __optimal_bellman(self, values: np.ndarray, i: int, j: int, gamma: float) -> float:
        # Terminal state has 0 reward.
        if (i == 0 and j == 0) or (i == self.height -1 and j == self.width - 1):
            return 0

        max_value = None

        # One step lookahead.
        for action in Action:
            next_i, next_j = self.__next_state(i, j, action)
            value = self.policy[i, j, action.value] * (self.rewards[next_i, next_j] + gamma * values[next_i, next_j])
            if max_value == None or value > max_value:
                max_value = value

        return max_value

    def value_iteration(self, gamma: float = 1, theta: float = 0.000001):
        print("Starting Value Iteration")
        counter = 0
        # Keep updating values until they converge.
        while True:
            delta = 0
            old_values = self.value.copy()
            for i in range(self.height):
                for j in range(self.width):
                    value = self.value[i, j]
                    self.value[i, j] = self.__optimal_bellman(old_values, i, j, gamma)
                    delta = max(delta, abs(value - self.value[i, j]))

            counter += 1
            if delta < theta:
                break

        self.__policy_improvement(gamma)
        
        print(f"{counter} iterations for value iteration to converge.")
        print("Value Function:")
        self.print_value()
        print("Policy:")
        self.print_policy()
                    

In [12]:
# Solve a small and a larger grid, each with both algorithms. A fresh
# Environment is built per run so the two solvers never share state.
for grid_height, grid_width in ((4, 4), (7, 17)):
    policy_env = Environment(grid_height, grid_width)
    policy_env.policy_iteration()

    value_env = Environment(grid_height, grid_width)
    value_env.value_iteration()

Starting Policy Iteration
Policy evaluation for improvement iteration 1 took 46 iterations to converge.
Policy evaluation for improvement iteration 2 took 4 iterations to converge.
Policy evaluation for improvement iteration 3 took 1 iterations to converge.
3 policy improvement iterations for policy convergence.
Value Function:
[[ 0.  0. -1. -2.]
 [ 0. -1. -2. -1.]
 [-1. -2. -1.  0.]
 [-2. -1.  0.  0.]]

Policy:
|N  W|   W|   W| S W|
|N   |N  W|NSEW| S  |
|N   |NSEW| SE | S  |
|N E |  E |  E | SE |

Starting Value Iteration
3 iterations for value iteration to converge.
Value Function:
[[ 0.      0.     -0.25   -0.3125]
 [ 0.     -0.25   -0.3125 -0.25  ]
 [-0.25   -0.3125 -0.25    0.    ]
 [-0.3125 -0.25    0.      0.    ]]

Policy:
|N  W|   W|   W| S W|
|N   |N  W|NSEW| S  |
|N   |NSEW| SE | S  |
|N E |  E |  E | SE |

Starting Policy Iteration
Policy evaluation for improvement iteration 1 took 685 iterations to converge.
Policy evaluation for improvement iteration 2 took 14 iterations