In [152]:
import numpy as np
from enum import IntEnum
from copy import deepcopy
import matplotlib.pyplot as plt
# plt.style.use('seaborn-notebook')
# plt.style.use('seaborn-whitegrid')
import matplotlib.colors as mcolors

In [153]:
class Action(IntEnum):
    up = 0
    right = 1
    down = 2
    left = 3

action_to_str = {
    Action.up : "up",
    Action.right : "right",
    Action.down : "down",
    Action.left : "left",
}

action_to_offset = {
    Action.up : (-1, 0),
    Action.right : (0, 1),
    Action.down : (1, 0),
    Action.left : (0, -1),
}

In [154]:
class GridWorld:

    def __init__(self, height, width, goal, goal_value=5.0, danger=[], danger_value=-5.0, blocked=[], noise=0.0):
        """
        Initialize the GridWorld environment.
        Creates a gridworld like MDP
         - height (int): Number of rows
         - width (int): Number of columns
         - goal (int): Index number of goal cell
         - goal_value (float): Reward given for goal cell
         - danger (list of int): Indices of cells marked as danger
         - danger_value (float): Reward given for danger cell
         - blocked (list of int): Indices of cells marked as blocked (can't enter)
         - noise (float): probability of resulting state not being what was expected
        """

        self._width = width
        self._height = height
        self._grid_values = [0 for _ in range(height * width)] # Initialize state values.
        self._goal_value = goal_value
        self._danger_value = danger_value
        self._goal_cell = goal
        self._danger_cells = danger
        self._blocked_cells = blocked
        self._noise = noise # Noise level in the environment.
        assert noise >= 0 and noise < 1 # Ensure valid noise value.

    def reset(self):
        """
        Reset the state values to their initial state.
        """
        self._grid_values = [0 for _ in range(self._height * self._width)]

    def get_index_state(self, state):
        """
        Get the index of the state in the grid
        """
        return state[0] * self._width + state[1]

    def _state_reward_from_action(self, state, action):
        """
        Gets the state as a result of applying the given action
        """

        random_noise = np.random.rand()
        actions_list = list(action_to_str.keys())
        action_base = action

        # adding noise to the model, choose random actions with probability < noise
        if random_noise < self._noise:
            action = np.random.choice(actions_list)

        is_action_same = action == action_base
        # get the next state based on the actions table
        next_state_r, next_state_c = (state[0] + action_to_offset[action][0]), (state[1] + action_to_offset[action][1])

        # if the actions leads to out of bounds states then we keep the state we have
        if next_state_r < 0 or next_state_r >= self._height:
            next_state_r = state[0]
        if next_state_c < 0 or next_state_c >= self._width:
            next_state_c = state[1]

        next_state = next_state_r, next_state_c
        # get the index of the next state in the grid to be consistent with the test cases
        next_state_index = self.get_index_state(next_state)

        # if the next state is a blocke cell then next state will still be the current state
        if next_state_index in self._blocked_cells:
            next_state = state
            next_state_index = self.get_index_state(next_state)

        return next_state_index, self.get_reward(next_state_index)

    def is_terminal(self, state):
        """
        Returns true if a state is terminal (goal, or danger)
        """
        return (state == self._goal_cell) or (state in self._danger_cells)

    def get_reward(self, state):
        """
        Get the reward for being in the current state
        """
        if state == self._goal_cell:
            return self._goal_value
        elif state in self._danger_cells:
            return self._danger_value
        else:
            return 0.0

    def execute_actions(self, state, row, col, discount_factor, A, b):
        """
        Execute the action in the current state
        """
        actions_list = list(action_to_str.keys())
        number_actions = len(actions_list)

        for action in actions_list:
            # for each action we get the next state and the reward and calculate A and b for the linear solver
            next_state_index, reward = self._state_reward_from_action((row, col), action)

            # Based on the equations in the lecture, we substitute to solve for the Value FFunction
            # Here 1/number_actions will be 1/4 but I will use the variable for consistency in case
            # I wanted to add diagonal actions in the future

            A[state, next_state_index] -= ((1/number_actions) * discount_factor)
            b[state] += reward/number_actions

        return A, b

    def solve_linear_system(self, discount_factor=1.0):
        """
        Solve the gridworld using a system of linear equations.
        :param discount_factor: The discount factor for future rewards.
        """
        n_states = self._height * self._width
        A = np.zeros((n_states, n_states))
        b = np.zeros(n_states)

        # looping over the state to calculate the
        for row in range(self._height):
            for col in range(self._width):
                state_index = row * self._width + col

                A[state_index, state_index] = 1.0
                if self.is_terminal(state_index) or (state_index in self._blocked_cells):
                    b[state_index] = self.get_reward(state_index)

                else:
                    A, b = self.execute_actions(state_index, row, col, discount_factor, A, b)

        self._grid_values = np.linalg.solve(A, b)
        return self._grid_values

    def __str__(self):
        """
        Pretty print the state values
        """
        out_str = ""
        for r in range(self._height):
            for c in range(self._width):
                cell = r * self._width + c
                if cell in self._blocked_cells:
                    out_str += "{:>6}".format("----")
                elif cell == self._goal_cell:
                    out_str += "{:>6}".format("GOAL")
                elif cell in self._danger_cells:
                    out_str += "{:>6}".format("-5.00")
                else:
                    out_str += "{:>6.2f}".format(self._grid_values[cell])
                out_str += " "
            out_str += "\n"
        print(out_str)
        return out_str

In [155]:
# Initialize your GridWorld
simple_gw = GridWorld(height=5, width=5, goal=14, danger=[2, 18, 21], blocked=[6, 7, 11, 12], noise=0.0)

# # Solve the linear system
values_grid = simple_gw.solve_linear_system(discount_factor=1)

In [156]:
values_grid.reshape((5, 5))

array([[-9.97136722e+00, -9.98568361e+00, -5.00000000e+00,
        -3.33333333e+00,  6.35389177e-16],
       [-9.95705082e+00,  0.00000000e+00,  0.00000000e+00,
         5.12410627e-16,  3.33333333e+00],
       [-9.94273443e+00,  0.00000000e+00,  0.00000000e+00,
         0.00000000e+00,  5.00000000e+00],
       [-9.92841804e+00, -9.87831067e+00, -9.70651396e+00,
        -5.00000000e+00, -1.60343593e+00],
       [-9.96420902e+00, -5.00000000e+00, -9.24123121e+00,
        -8.01717967e+00, -4.81030780e+00]])

In [157]:
simple_gw.__str__()

 -9.97  -9.99  -5.00  -3.33   0.00 
 -9.96   ----   ----   0.00   3.33 
 -9.94   ----   ----   0.00   GOAL 
 -9.93  -9.88  -9.71  -5.00  -1.60 
 -9.96  -5.00  -9.24  -8.02  -4.81 



' -9.97  -9.99  -5.00  -3.33   0.00 \n -9.96   ----   ----   0.00   3.33 \n -9.94   ----   ----   0.00   GOAL \n -9.93  -9.88  -9.71  -5.00  -1.60 \n -9.96  -5.00  -9.24  -8.02  -4.81 \n'

In [158]:
import numpy as np

def value_iteration(grid, discount, tolerance=0.1):
    """
    Value iteration algorithm for gridworld as follows:
    1. Start with arbitrary (zero) initialized value function.
    2. Loop over each state and do 1 step lookahead
    """

    n_states = grid._height * grid._width
    values = np.zeros(n_states)
    actions_list = list(action_to_str.keys())
    count = 0

    # now we loop indefinetly till delta is less than the tolerance that we pre-specify
    # at each iteration (state) we calculate the new value based on the old ones and the immediate reward
    while True:
        # we now make a copy of the value table to calculate the new values given the old values
        # This is done so that we don't alter the old values during calculation
        value_table_tmp = np.copy(values)
        delta = 0

        # Looping over each state where a state is a tuple of row and column in the grid
        for row in range(grid._height):
            for col in range(grid._width):
                state = row, col
                # getting the index of the state in the grid
                state_index = grid.get_index_state(state)

                # if the state is a termina or blocked then we will not calculate the value from them
                # because if it is a terminal state then the game ends and if it is blocked then
                # you are not allowed to visit it so I wouldn't calculate the value from it
                if grid.is_terminal(state_index) or (state_index in grid._blocked_cells):
                    continue
                else:
                    # According to the value iteration algorithm, we need to make a 1 step lookahead
                    # so at each state we check all available actions and choose the one with the largest value
                    curr_max_value = -np.inf
                    for action in actions_list:
                        next_state_index, reward = grid._state_reward_from_action(state, action)
                        next_state_value = reward + discount * values[next_state_index]
                        curr_max_value = max(curr_max_value, next_state_value)
                    value_table_tmp[state_index] = curr_max_value
                    delta = max(delta, np.abs(value_table_tmp[state_index] - values[state_index]))

        values = value_table_tmp

        count += 1
        if delta < tolerance:
            break

    # Now I add the values of the fire cells as they were not included above since they are terminal states
    # They will just be used in calculating the optimal policy.
    values[grid._goal_cell] = grid._goal_value
    for danger in grid._danger_cells:
        values[danger] = grid._danger_value

    print(f"Number of iterations till reaching tolerance = {count}")
    return values


def update_policy(actions_list, grid, state, value_func, discount):
    max_value = float('-inf')
    best_action = None

    # for the current state, loop over all the actions and choose the one
    # that maximizes the value given reward, gamma and the calculate value function
    for action in actions_list:
      next_state_index, reward = grid._state_reward_from_action(state, action)
      next_value = reward + discount * value_func[next_state_index]

      # find the action with maximum value
      if next_value > max_value:
        max_value = next_value
        best_action = action

    return best_action

def get_optimal_policy(grid, value_func, discount):
    policy = np.zeros((grid._height, grid._width))
    actions_list = list(action_to_str.keys())

    # for each state, find the action with maximum value
    for row in range(grid._height):
        for col in range(grid._width):
            state = (row, col)
            state_index = grid.get_index_state(state)
            if grid.is_terminal(state_index) or (state_index in grid._blocked_cells):
                policy[row, col] = np.nan
            else:
                policy[row, col] = update_policy(actions_list, grid, state, value_func, discount)
    return policy

def print_policy(policy, action_to_str):
    # printing the policy in 2d grid
    action_to_str = {k: v for k, v in action_to_str.items()}
    convert = lambda x: action_to_str[x][0].upper()
    action_to_str[''] = "-"
    str_policy = []

    for i in policy:
        cleanedList = [x if not np.isnan(x) else '' for x in i]
        str_policy.append(list(map(convert, cleanedList)))

    print(np.array(str_policy))

In [159]:
# Initialize your GridWorld
simple_gw = GridWorld(height=5, width=5, goal=14, danger=[2, 18, 21], blocked=[6, 7, 11, 12], noise=0.0)
noisy_gw = GridWorld(height=5, width=5, goal=14, danger=[2, 18, 21], blocked=[6, 7, 11, 12], noise=0.2)
discount = 0.95
tolerance = 0.1

In [160]:
simple_gw.__str__()

  0.00   0.00  -5.00   0.00   0.00 
  0.00   ----   ----   0.00   0.00 
  0.00   ----   ----   0.00   GOAL 
  0.00   0.00   0.00  -5.00   0.00 
  0.00  -5.00   0.00   0.00   0.00 



'  0.00   0.00  -5.00   0.00   0.00 \n  0.00   ----   ----   0.00   0.00 \n  0.00   ----   ----   0.00   GOAL \n  0.00   0.00   0.00  -5.00   0.00 \n  0.00  -5.00   0.00   0.00   0.00 \n'

In [161]:
def calculate_policy(discount, tolerance, noise = 0):
  simple_gw = GridWorld(height=5, width=5, goal=14, danger=[2, 18, 21], blocked=[6, 7, 11, 12], noise=noise)

  value_arr = value_iteration(simple_gw, discount, 0.1);
  print(value_arr.round(2).reshape((5, 5)))
  policy = get_optimal_policy(simple_gw, value_arr, discount);
  print_policy(policy, action_to_str)

In [162]:
%%time
calculate_policy(0.95, 0.1)

Number of iterations till reaching tolerance = 12
[[ 3.15  2.99 -5.    4.51  4.75]
 [ 3.32  0.    0.    4.75  5.  ]
 [ 3.49  0.    0.    5.    5.  ]
 [ 3.68  3.87  4.07 -5.    5.  ]
 [ 3.49 -5.    4.29  4.51  4.75]]
[['D' 'L' '-' 'R' 'D']
 ['D' '-' '-' 'R' 'D']
 ['D' '-' '-' 'R' '-']
 ['R' 'R' 'D' '-' 'U']
 ['U' '-' 'R' 'R' 'U']]
CPU times: user 7.73 ms, sys: 921 µs, total: 8.65 ms
Wall time: 15.3 ms


In [163]:
%%time
calculate_policy(0.75, 0.1)

Number of iterations till reaching tolerance = 12
[[ 0.38  0.28 -5.    2.81  3.75]
 [ 0.5   0.    0.    3.75  5.  ]
 [ 0.67  0.    0.    5.    5.  ]
 [ 0.89  1.19  1.58 -5.    5.  ]
 [ 0.67 -5.    2.11  2.81  3.75]]
[['D' 'L' '-' 'R' 'D']
 ['D' '-' '-' 'R' 'D']
 ['D' '-' '-' 'R' '-']
 ['R' 'R' 'D' '-' 'U']
 ['U' '-' 'R' 'R' 'U']]
CPU times: user 8.09 ms, sys: 0 ns, total: 8.09 ms
Wall time: 8.46 ms


In [164]:
calculate_policy(0.75, 0.1)

Number of iterations till reaching tolerance = 12
[[ 0.38  0.28 -5.    2.81  3.75]
 [ 0.5   0.    0.    3.75  5.  ]
 [ 0.67  0.    0.    5.    5.  ]
 [ 0.89  1.19  1.58 -5.    5.  ]
 [ 0.67 -5.    2.11  2.81  3.75]]
[['D' 'L' '-' 'R' 'D']
 ['D' '-' '-' 'R' 'D']
 ['D' '-' '-' 'R' '-']
 ['R' 'R' 'D' '-' 'U']
 ['U' '-' 'R' 'R' 'U']]


In [165]:
%%time

calculate_policy(0.95, 0.1, 0.1)

Number of iterations till reaching tolerance = 626
[[ 3.15  2.99 -5.    4.51  4.75]
 [ 3.32  0.    0.    4.75  5.  ]
 [ 3.49  0.    0.    5.    5.  ]
 [ 3.68  3.87  4.07 -5.    5.  ]
 [ 3.49 -5.    4.29  4.51  4.75]]
[['D' 'L' '-' 'R' 'D']
 ['D' '-' '-' 'R' 'R']
 ['D' '-' '-' 'R' '-']
 ['R' 'R' 'D' '-' 'U']
 ['U' '-' 'R' 'R' 'U']]
CPU times: user 423 ms, sys: 7.4 ms, total: 430 ms
Wall time: 462 ms


In [None]:
%%time

calculate_policy(0.95, 0.1, 0.2)