# Assignment 1: Implementing and Analyzing a Basic RL Algorithm

 **Components of the Markov Decision Process (MDP):**
 1. *States (S):* Each cell in the grid represents a state. The agent can be in any cell except for the blocked cells. Let's denote the states by their grid coordinates (i, j).
 1. *Actions (A):* The agent can move in four directions:
  * North (N)
  * South (S)
  * East (E)
  * West (W)
 1. *Transition Probability (P):* The agent moves in the chosen direction with a probability of 1−noise. With probability noise, it may move in a different direction. For simplicity, let's assume the noise is evenly distributed among the other three directions.
 1. *Reward Function (R):* The reward structure is as follows:
  * Reward of 0 for moving into any non-terminal and non-blocked cell.
  * Reward of -5 for moving into a fire cell (red cell).
  * Reward of +5 for moving into the goal cell (G).
 1. *Discount Factor (𝛾):* The discount factors to be considered are 0.95 and 0.75.
 1. *Termination:* The episode ends when the agent reaches the goal cell (G) or a fire cell (red cell).

In [1]:
import numpy as np
from enum import IntEnum
from copy import deepcopy
import matplotlib.pyplot as plt
plt.style.use('seaborn-notebook')
plt.style.use('seaborn-whitegrid')
import matplotlib.colors as mcolors
from tabulate import tabulate

  plt.style.use('seaborn-notebook')
  plt.style.use('seaborn-whitegrid')


### a. Linear Solver Method

In [2]:
class Action(IntEnum):
    """The class Action represents the set of possible actions in gridworld."""
    up = 0
    right = 1
    down = 2
    left = 3

# String representation for each action saved numerically (easier for human interpertation)
action_to_str = {
    Action.up: "up",
    Action.right: "right",
    Action.down: "down",
    Action.left: "left",
}

# How locations should be updated given any of the 4 actions
action_to_offset = {
    Action.up: (-1, 0),
    Action.right: (0, 1),
    Action.down: (1, 0),
    Action.left: (0, -1),
}

# For printing the found policy
action_symbols = {
    Action.up: "^",
    Action.right: ">",
    Action.down: "v",
    Action.left: "<"
}

In [5]:
class GridWorld:

    def __init__(self, height, width, goal, goal_value=5.0, danger=[], danger_value=-5.0, blocked=[], noise=0.0):
        """
        Initialize the GridWorld environment.
        Creates a gridworld like MDP
         - height (int): Number of rows
         - width (int): Number of columns
         - goal (int): Index number of goal cell
         - goal_value (float): Reward given for goal cell
         - danger (list of int): Indices of cells marked as danger
         - danger_value (float): Reward given for danger cell
         - blocked (list of int): Indices of cells marked as blocked (can't enter)
         - noise (float): probability of resulting state not being what was expected
        """
        self._width = width
        self._height = height
        self._grid_values = [0 for _ in range(height * width)]
        self._goal_value = goal_value
        self._danger_value = danger_value
        self._goal_cell = goal
        self._danger_cells = danger
        self._blocked_cells = blocked
        self._noise = noise  # Noise level in the environment.
        assert noise >= 0 and noise < 1  # Ensure valid noise value.
        self.create_next_values()  # Initialize the next state values.

    def reset(self):
        """
        Reset the state values to their initial state.
        """
        self._grid_values = [0 for _ in range(self._height * self._width)]
        self.create_next_values()

    def _inbounds(self, state):
        """
        Check if a state index is within the grid boundaries.
        """
        return state >= 0 and state < self._width * self._height

    def _inbounds_rc(self, state_r, state_c):
        """
        Check if row and column indices are within the grid boundaries.
        """
        return state_r >= 0 and state_r < self._height and state_c >= 0 and state_c < self._width

    def _state_to_rc(self, state):
        """
        Convert a state index to row and column indices.
        """
        return state // self._width, state % self._width

    def _state_from_action(self, state, action):
        """
        Gets the state as a result of applying the given action
        """
        # Get current state indecies
        row, col = self._state_to_rc(state)
        # Find the new indecies based on this action
        offset_row, offset_col = action_to_offset[action]
        next_row, next_col = row + offset_row, col + offset_col
        # Translate indecies to a state
        next_state = next_row * self._width + next_col
        # Assure the calculated state is acceptable aka. within boundaries and not an obstacle
        if self._inbounds_rc(next_row, next_col):
            if next_state not in self._blocked_cells:
                return next_state
        return state

    def is_terminal(self, state):
        """
        Returns true if a state is terminal (goal, or danger)
        """
        return state == self._goal_cell or state in self._danger_cells

    def get_states(self):
        """
        Gets all non-terminal states in the environment
        """
        return [s for s in range(self._height * self._width) if s not in self._blocked_cells]

    def get_actions(self, state):
        """
        Returns a list of valid actions given the current state
        """
        return [Action.up, Action.right, Action.down, Action.left]

    def get_reward(self, state):
        """
        Get the reward for being in the current state
        """
        assert self._inbounds(state)
        # Reward is non-zero for danger or goal
        if state in self._danger_cells:
            return self._danger_value
        if state == self._goal_cell:
            return self._goal_value
        return 0

    def get_transitions(self, state, action):
        """
        Get a list of transitions as a result of attempting the action in the current state
        Each item in the list is a dictionary, containing the probability of reaching that state and the state itself
        """
        # When the state is terminal then only one probable action is available
        if self.is_terminal(state):
            return [{'prob': 1.0, 'state': state}]
        transitions = []
        primary_state = self._state_from_action(state, action)
        # Probability that the agent wants to move to state s' and it actually moved there
        transitions.append({'prob': 1 - self._noise, 'state': primary_state})
        if self._noise > 0:
            # from the actions list find valid states that are not s'
            other_actions = [a for a in self.get_actions(state) if a != action]
            noise_prob = self._noise / len(other_actions)
            for other_action in other_actions:
                secondary_state = self._state_from_action(state, other_action)
                # Given the noise: probability that the agent didn't move to the direction in the decision
                transitions.append({'prob': noise_prob, 'state': secondary_state})
        return transitions

    def get_value(self, state):
        """
        Get the current value of the state
        """
        assert self._inbounds(state)
        return self._grid_values[state]

    def create_next_values(self):
        """
        Creates a temporary storage for state value updating
        If this is not used, then asynchronous updating may result in unexpected results
        To use properly, run this at the start of each iteration
        """
        self._next_values = self._grid_values.copy()

    def set_next_values(self):
        """
        Set the state values from the temporary copied values
        To use properly, run this at the end of each iteration
        """
        self._grid_values = self._next_values.copy()

    def set_value(self, state, value):
        """
        Set the value of the state into the temporary copy
        This value will not update into main storage until self.set_next_values() is called.
        """
        assert self._inbounds(state)
        self._next_values[state] = value

    def _state_to_index(self, states):
        """Initiate a dictionary maps a state to its index"""
        return {s: idx for idx, s in enumerate(states)}

    def solve_linear_system(self, discount_factor=1.0):
        """
        Solve the gridworld using a system of linear equations.
        :param discount_factor: The discount factor for future rewards.
        """
        # To solve the linear system of equations
        #   find the values of metrices A and B

        # Initialize metrices
        num_states = self._width * self._height
        A = np.zeros((num_states, num_states))
        b = np.zeros(num_states)

        # Assign values to A and B
        for state in range(num_states):
            # When the state is at an obstacle or terminal don't calculate an s(v) it's always the same as reward
            if state in self._blocked_cells or self.is_terminal(state):
                A[state, state] = 1.0
                b[state] = self.get_reward(state)
            else:
              # If it's not terminal nor obstacle, calculate its values
              A[state, state] = 1.0
              for action in self.get_actions(state):
                  # v(s) = P(s'|s,a)(r+)
                  n_actions = len(self.get_actions(state))
                  next_state = self._state_from_action(state, action)
                  reward = self.get_reward(next_state)
                  A[state, next_state] -= discount_factor / n_actions
                  b[state] += reward / n_actions

        # Solve the equation
        V = np.round(np.linalg.solve(A, b), 2)

        for state in range(num_states):
            self.set_value(state, V[state])

        return V.reshape((self._height, self._width))


    def __str__(self):
        """
        Pretty print the state values
        """
        out_str = ""
        for r in range(self._height):
            for c in range(self._width):
                cell = r * self._width + c
                if cell in self._blocked_cells:
                    out_str += "{:>6}".format("----")
                elif cell == self._goal_cell:
                    out_str += "{:>6}".format("GOAL")
                elif cell in self._danger_cells:
                    out_str += "{:>6.2f}".format(self._danger_value)
                else:
                    out_str += "{:>6.2f}".format(self._grid_values[cell])
                out_str += " "
            out_str += "\n"
        return out_str

In [53]:
# Initialize your GridWorld
simple_gw = GridWorld(height=5, width=5, goal=14, danger=[2, 18, 21], blocked=[6, 7, 11, 12], noise=0.0)

print("Initial Grid:")
print(simple_gw)

# Solve the linear system
print("Solving with linear solver:")
values_grid = simple_gw.solve_linear_system()
print(values_grid)


Initial Grid:
  0.00   0.00  -5.00   0.00   0.00 
  0.00   ----   ----   0.00   0.00 
  0.00   ----   ----   0.00   GOAL 
  0.00   0.00   0.00  -5.00   0.00 
  0.00  -5.00   0.00   0.00   0.00 

Solving with linear solver:
[[-9.97 -9.99 -5.   -3.33  0.  ]
 [-9.96  0.    0.    0.    3.33]
 [-9.94  0.    0.    0.    5.  ]
 [-9.93 -9.88 -9.71 -5.   -1.6 ]
 [-9.96 -5.   -9.24 -8.02 -4.81]]


In [49]:
print("Initial Grid:")
print(simple_gw)
print("Solving with linear solver: (discount_factor=0.95)")
values_grid = simple_gw.solve_linear_system(discount_factor=0.95)
print(values_grid)


Initial Grid:
  0.00   0.00  -5.00   0.00   0.00 
  0.00   ----   ----   0.00   0.00 
  0.00   ----   ----   0.00   GOAL 
  0.00   0.00   0.00  -5.00   0.00 
  0.00  -5.00   0.00   0.00   0.00 

Solving with linear solver: (discount_factor=0.95)
[[-5.25 -7.02 -5.   -3.2  -0.  ]
 [-4.59  0.    0.   -0.    3.2 ]
 [-4.89  0.    0.   -0.    5.  ]
 [-6.22 -7.62 -8.   -5.   -1.1 ]
 [-7.46 -5.   -7.78 -6.72 -3.54]]


In [56]:
print("Initial Grid:")
print(simple_gw)
print("Solving with linear solver: (discount_factor=0.75)")
values_grid = simple_gw.solve_linear_system(discount_factor=0.75)
print(values_grid)


Initial Grid:
  0.00   0.00  -5.00   0.00   0.00 
  0.00   ----   ----   0.00   0.00 
  0.00   ----   ----   0.00   GOAL 
  0.00   0.00   0.00  -5.00   0.00 
  0.00  -5.00   0.00   0.00   0.00 

Solving with linear solver: (discount_factor=0.75)
[[-1.37 -3.91 -5.   -2.69  0.  ]
 [-0.66  0.    0.    0.    2.69]
 [-0.84  0.    0.    0.    5.  ]
 [-2.14 -4.29 -4.78 -5.   -0.3 ]
 [-4.14 -5.   -4.74 -4.09 -1.32]]




---



### b. Dynamic Programming Solver

In [46]:
def value_iteration(gw, discount, tolerance=0.1):
    """Solve the gridworld using a dynamic programming method (value iteration algorithm)"""
    gw.reset()
    loops = 0
    while True:
        loops += 1
        delta = 0
        gw.create_next_values()
        for state in gw.get_states():
            # If the current state is a terminal with a predefined value, skip it.
            if gw.is_terminal(state):
                continue
            action_values = []
            # Calculate the value function at each action
            #  Consider the current action as the prime action
            for action in gw.get_actions(state):
                action_value = 0
                # Fidn the probabilty of all other actions given the current prime action
                for transition in gw.get_transitions(state, action):
                    next_state = transition['state']
                    prob = transition['prob']
                    reward = gw.get_reward(next_state)
                    # Calculate the current value
                    action_value += prob * (reward + discount * gw.get_value(next_state))
                action_values.append(action_value)
            # Concider the choice of the best action (max value)
            best_value = max(action_values)
            delta = max(delta, abs(best_value - gw.get_value(state)))
            gw.set_value(state, best_value)
        gw.set_next_values()
        # Stop when the values difference is less than tolerance
        if delta < tolerance:
            break
    return loops

def print_policy(gw):
    """Use the values found by the algorithm to find the policy and print the grid with actions choosen"""
    policy_grid = [["\t" for _ in range(gw._width)] for _ in range(gw._height)]
    states = [s for s in range(gw._height * gw._width)]
    for state in states:
        if state in gw._blocked_cells:
            policy_grid[gw._state_to_rc(state)[0]][gw._state_to_rc(state)[1]] = "X"
        elif state == gw._goal_cell:
            policy_grid[gw._state_to_rc(state)[0]][gw._state_to_rc(state)[1]] = "G"
        elif state in gw._danger_cells:
            policy_grid[gw._state_to_rc(state)[0]][gw._state_to_rc(state)[1]] = "D"
        else:
            best_action = None
            best_value = float('-inf')
            for action in gw.get_actions(state):
                action_value = 0
                for transition in gw.get_transitions(state, action):
                    next_state = transition['state']
                    prob = transition['prob']
                    reward = gw.get_reward(next_state)
                    action_value += prob * (reward + discount * gw.get_value(next_state))
                if action_value > best_value:
                    best_value = action_value
                    best_action = action
            policy_grid[gw._state_to_rc(state)[0]][gw._state_to_rc(state)[1]] = action_symbols[best_action]

    for row in policy_grid:
        print(" ".join(row))

In [36]:
# Initialize your GridWorld
simple_gw = GridWorld(height=5, width=5, goal=14, danger=[2, 18, 21], blocked=[6, 7, 11, 12], noise=0.0)
noisy_gw = GridWorld(height=5, width=5, goal=14, danger=[2, 18, 21], blocked=[6, 7, 11, 12], noise=0.2)
discount = 0.95
tolerance = 0.1
results = []  # trial, noise, discount, tolerance, iterations

In [37]:
# Reset and solve using value iteration
simple_gw.reset()
print("Solving with value iteration (discount_factor=1, noise=0.0):")
iterations = value_iteration(simple_gw, discount=1, tolerance=0.1)
print(simple_gw)
print(f"Found in {iterations} trials.\n")
print("**----------------------**")
print_policy(simple_gw)

# Save to a results table
results.append([1, 0.0, 1, tolerance, iterations])

Solving with value iteration (discount_factor=1, noise=0.0):
  5.00   5.00  -5.00   5.00   5.00 
  5.00   ----   ----   5.00   5.00 
  5.00   ----   ----   5.00   GOAL 
  5.00   5.00   5.00  -5.00   5.00 
  5.00  -5.00   5.00   5.00   5.00 

Found in 12 trials.

**----------------------**
^ ^ D ^ ^
^ X X ^ v
^ X X > G
^ ^ ^ D ^
^ D ^ > ^


In [38]:
# Reset and solve using value iteration
simple_gw.reset()
print("Solving with value iteration (discount_factor=0.95, noise=0.0):")
iterations = value_iteration(simple_gw, discount, tolerance=0.1)
print(simple_gw)
print(f"Found in {iterations} trials.\n")
print("**----------------------**")
print_policy(simple_gw)

# Save to a results table
results.append([2, 0.0, discount, tolerance, iterations])

Solving with value iteration (discount_factor=0.95, noise=0.0):
  3.15   2.99  -5.00   4.51   4.75 
  3.32   ----   ----   4.75   5.00 
  3.49   ----   ----   5.00   GOAL 
  3.68   3.87   4.07  -5.00   5.00 
  3.49  -5.00   4.29   4.51   4.75 

Found in 12 trials.

**----------------------**
v < D > v
v X X > v
v X X > G
> > v D ^
^ D > > ^


In [39]:
# Reset and solve using value iteration
simple_gw.reset()
print("Solving with value iteration (discount_factor=0.75, noise=0.0):")
iterations = value_iteration(simple_gw, discount=0.75, tolerance=0.1)
print(simple_gw)
print(f"Found in {iterations} trials.\n")
print("**----------------------**")
print_policy(simple_gw)

results.append([3, 0.0, 0.75, tolerance, iterations])

Solving with value iteration (discount_factor=0.75, noise=0.0):
  0.38   0.28  -5.00   2.81   3.75 
  0.50   ----   ----   3.75   5.00 
  0.67   ----   ----   5.00   GOAL 
  0.89   1.19   1.58  -5.00   5.00 
  0.67  -5.00   2.11   2.81   3.75 

Found in 12 trials.

**----------------------**
v < D > v
v X X > v
v X X > G
> > v D ^
^ D > > ^


For a noisy grid world

In [40]:
print("Solving with value iteration (discount_factor=1, noise=0.2):")
iterations = value_iteration(noisy_gw, discount=1, tolerance=0.1)
print(noisy_gw)
print(f"Found in {iterations} trials.\n")
print("**----------------------**")
print_policy(noisy_gw)

results.append([4, 0.2, 1, tolerance, iterations])

Solving with value iteration (discount_factor=1, noise=0.2):
  1.34   0.78  -5.00   4.21   4.93 
  1.43   ----   ----   4.88   4.99 
  1.46   ----   ----   4.28   GOAL 
  1.48   1.52   2.07  -5.00   4.23 
  0.97  -5.00   2.71   3.40   4.16 

Found in 18 trials.

**----------------------**
v < D > v
v X X > v
v X X > G
> > v D ^
^ D > > ^


In [41]:
print("Solving with value iteration (discount_factor=0.95, noise=0.2):")
iterations = value_iteration(noisy_gw, discount, tolerance=0.1)
print(noisy_gw)
print(f"Found in {iterations} trials.\n")
print("**----------------------**")
print_policy(noisy_gw)
# optimum policy found

results.append([5, 0.2, 0.95, tolerance, iterations])

Solving with value iteration (discount_factor=0.95, noise=0.2):
  0.42  -0.10  -5.00   3.60   4.51 
  0.56   ----   ----   4.49   4.88 
  0.65   ----   ----   4.22   GOAL 
  0.72   0.83   1.40  -5.00   4.17 
  0.23  -5.00   2.09   2.90   3.84 

Found in 15 trials.

**----------------------**
v < D > v
v X X > v
v X X > G
> > v D ^
^ D > > ^


In [42]:
# For a noisy grid world  --trial 2
print("Solving with value iteration (discount_factor=0.75, noise=0.2):")
iterations = value_iteration(noisy_gw, discount=0.75, tolerance=0.1)
print(noisy_gw)
print(f"Found in {iterations} trials.\n")
print("**----------------------**")
print_policy(noisy_gw)
# Bad Policy

results.append([6, 0.2, 0.75, tolerance, iterations])

Solving with value iteration (discount_factor=0.75, noise=0.2):
 -0.02  -0.39  -5.00   1.81   3.12 
 -0.00   ----   ----   3.17   4.54 
 -0.00   ----   ----   4.03   GOAL 
 -0.04  -0.39  -0.08  -5.00   4.00 
 -0.40  -5.00   0.51   1.40   2.74 

Found in 7 trials.

**----------------------**
v < D v v
> X X > v
^ X X > G
^ < v D ^
^ D > > ^


In [43]:
# For a noisy grid world  --trial 3 smaller tolerance
print("Solving with value iteration (discount_factor=0.75, noise=0.2):")
iterations = value_iteration(noisy_gw, discount=0.75, tolerance=0.001)
print(noisy_gw)
print(f"Found in {iterations} trials.\n")
print("**----------------------**")
print_policy(noisy_gw)

results.append([7, 0.2, 0.75, 0.001, iterations])

Solving with value iteration (discount_factor=0.75, noise=0.2):
 -0.02  -0.39  -5.00   1.82   3.13 
 -0.00   ----   ----   3.18   4.54 
 -0.01   ----   ----   4.03   GOAL 
 -0.04  -0.37  -0.03  -5.00   4.00 
 -0.40  -5.00   0.54   1.41   2.75 

Found in 13 trials.

**----------------------**
v < D v v
> X X > v
^ X X > G
^ > v D ^
^ D > > ^


In [44]:
headers = ["#Trial", "Noise", "Discount", "Tolerance", "Iterations"]

# Generate and print the table
print(tabulate(results, headers=headers, tablefmt="grid"))

+----------+---------+------------+-------------+--------------+
|   #Trial |   Noise |   Discount |   Tolerance |   Iterations |
|        1 |     0   |       1    |       0.1   |           12 |
+----------+---------+------------+-------------+--------------+
|        2 |     0   |       0.95 |       0.1   |           12 |
+----------+---------+------------+-------------+--------------+
|        3 |     0   |       0.75 |       0.1   |           12 |
+----------+---------+------------+-------------+--------------+
|        4 |     0.2 |       1    |       0.1   |           18 |
+----------+---------+------------+-------------+--------------+
|        5 |     0.2 |       0.95 |       0.1   |           15 |
+----------+---------+------------+-------------+--------------+
|        6 |     0.2 |       0.75 |       0.1   |            7 |
+----------+---------+------------+-------------+--------------+
|        7 |     0.2 |       0.75 |       0.001 |           13 |
+----------+---------+---

In [45]:
# compare using: different discount factors, noise values, number of iterations, tolerance values