In [29]:
import numpy as np
from enum import IntEnum
from copy import deepcopy
import matplotlib.pyplot as plt
plt.style.use('seaborn-notebook')
plt.style.use('seaborn-whitegrid')
import matplotlib.colors as mcolors

  plt.style.use('seaborn-notebook')
  plt.style.use('seaborn-whitegrid')


In [30]:
class Action(IntEnum):
    up = 0
    right = 1
    down = 2
    left = 3

action_to_str = {
    Action.up : "up",
    Action.right : "right",
    Action.down : "down",
    Action.left : "left",
}

action_to_offset = {
    Action.up : (-1, 0),
    Action.right : (0, 1),
    Action.down : (1, 0),
    Action.left : (0, -1),
}

In [36]:
class GridWorld:

    def __init__(self, height, width, goal, goal_value=5.0, danger=[], danger_value=-5.0, blocked=[], noise=0.0):
        """
        Initialize the GridWorld environment.
        Creates a gridworld like MDP
         - height (int): Number of rows
         - width (int): Number of columns
         - goal (int): Index number of goal cell
         - goal_value (float): Reward given for goal cell
         - danger (list of int): Indices of cells marked as danger
         - danger_value (float): Reward given for danger cell
         - blocked (list of int): Indices of cells marked as blocked (can't enter)
         - noise (float): probability of resulting state not being what was expected
        """

        self._width = width
        self._height = height
        self._grid_values = [0 for _ in range(height * width)] # Initialize state values.
        self._goal_value = goal_value
        self._danger_value = danger_value
        self._goal_cell = goal
        self._danger_cells = danger
        self._blocked_cells = blocked
        self._noise = noise # Noise level in the environment.
        assert noise >= 0 and noise < 1 # Ensure valid noise value.
        self.create_next_values() # Initialize the next state values.


    def reset(self):
        """
        Reset the state values to their initial state.
        """
        self._grid_values = [0 for _ in range(self._height * self._width)]
        self.create_next_values()


    def _inbounds(self, state):
        """
        Check if a state index is within the grid boundaries.
        """
        return state >= 0 and state < self._width * self._height

    def _inbounds_rc(self, state_r, state_c):
        """
        Check if row and column indices are within the grid boundaries.
        """
        return state_r >= 0 and state_r < self._height and state_c >= 0 and state_c < self._width

    def _state_to_rc(self, state):
        """
        Convert a state index to row and column indices.
        """
        return state // self._width, state % self._width

    def _state_from_action(self, state, action):
        """
        Gets the state as a result of applying the given action
        """
        #TO DO:
        #get the row and column from the state
        state_r, state_c = self._state_to_rc(state)

        #get the offset from action to apply to the row and column of the state
        offset_r, offset_c = action_to_offset[action]

        #get the new row and column after applying the offsets 
        new_r, new_c = state_r + offset_r, state_c + offset_c

        # if the new row and column are in bounds
        if self._inbounds_rc(new_r, new_c) and state not in self._blocked_cells:
            #get the new state from the new row and new column
            new_state = new_r * self._width + new_c
            return new_state
        else:
            return state
        

    def is_terminal(self, state):
        """
        Returns true if a state is terminal (goal, or danger)
        """
        #To Do:
        if state == self._goal_cell or state in self._danger_cells:
            return True
        else:
            return False


    def get_states(self):
        """
        Gets all non-terminal states in the environment
        """
        #TO DO:
        #create a an array to store the non-terminal states
        non_terminal_state = []
        #loop through all states in the grid
        for state in range(len(self._grid_values)):
            #if the state is not terminal then add to non_terminal_state array
            if self.is_terminal(state) == False and state not in self._blocked_cells:
                non_terminal_state.append(state)
        #return non_terminal_state array
        return non_terminal_state
    

    def get_actions(self, state):
        """
        Returns a list of valid actions given the current state
        """
        #TO DO:
        #create an array to store valid actions
        if self.is_terminal(state):
            return []
        else:
            return list(Action)
            
    def get_reward(self, state):
        """
        Get the reward for being in the current state
        """
        assert self._inbounds(state)
        # Reward is non-zero for danger or goal
        #TO DO:
        #if the state is in a goal cell (non-zero reward) return the reward value
        if state == self._goal_cell:
            return self._goal_value
        #if the state is in a danger cell (non-zero reward) return the reward value
        elif state in self._danger_cells:
            return self._danger_value
        # else return 0 as there is no reward
        else:
            return 0 
        
    def get_transitions(self, state, action):
        """
        Get a list of transitions as a result of attempting the action in the current state
        Each item in the list is a dictionary, containing the probability of reaching that state and the state itself
        """
        transactions = []
        #this is the state that will be used to calculate the probability of reaching the new state
        new_state = self._state_from_action(state, action)

        # if there is no noise then the new state is deterministic 
        if self._noise == 0:
            transactions.append({new_state: 1.0})
        else:
            # we need to calculate the probability of reaching the new state
            noise_prob = self._noise / (len(list(Action)) - 1)
            # add the new state with the probaility of reaching it
            transactions.append({new_state: 1-self._noise})

            # loop through all the other actions and calculate the probability of reaching the new state
            for noise_action in list(Action):                
                # if the noise action is not the same as the action then calculate the probability of reaching the new state
                if noise_action != action:
                    # get the new state from the noise action
                    noisy_state = self._state_from_action(state, noise_action)
                    # add the new state with the probaility of reaching it
                    transactions.append({noisy_state: noise_prob})
        return transactions

    def get_value(self, state):
        """
        Get the current value of the state
        """
        assert self._inbounds(state)
        return self._grid_values[state]

    def create_next_values(self):
        """
        Creates a temporary storage for state value updating
        If this is not used, then asynchronous updating may result in unexpected results
        To use properly, run this at the start of each iteration
        """
        #TO DO:
        #create a copy of the grid values
        self._next_values = deepcopy(self._grid_values)



    def set_next_values(self):
        """
        Set the state values from the temporary copied values
        To use properly, run this at the end of each iteration
        """
        # TO DO:
        #set the grid values to the next values
        self._grid_values = self._next_values
        

    def set_value(self, state, value):
        """
        Set the value of the state into the temporary copy
        This value will not update into main storage until self.set_next_values() is called.
        """
        assert self._inbounds(state)
        #TO DO:
        #set the value of the state to the next values
        self._next_values[state] = value


    def solve_linear_system(self, discount_factor=1.0):
        """
        Solve the gridworld using a system of linear equations.
        :param discount_factor: The discount factor for future rewards.
        """
        #To Do:
        #grab every state not just non-teminal states
        # non_terminal_states = self.get_states()
        create_next_values = self.create_next_values()
        states = range(self._height * self._width)
        num_states = len(states)

        A = np.zeros((num_states, num_states))
        b = np.zeros(num_states)

        for state in states:
            reward = self.get_reward(state)
            if self.is_terminal(state):
                A[state, state] = 1
                b[state] = self.get_reward(state)
            else:
                valid_actions = self.get_actions(state)

                for action in valid_actions:
                    transactions = self.get_transitions(state, action)

                    for transaction in transactions:
                        new_state, prob = list(transaction.items())[0]
                        total_prob = prob * 1/(len(list(Action)))

                        A[state, new_state] -= total_prob * discount_factor
                        b[state] += discount_factor * total_prob * self.get_reward(new_state)
            A[state,state] += 1

        values = np.linalg.solve(A, b)
        for i, state in enumerate(states):
            self.set_value(state, values[i])
        
        self.set_next_values()

        return self



    def __str__(self):
        """
        Pretty print the state values
        """
        out_str = ""
        for r in range(self._height):
            for c in range(self._width):
                cell = r * self._width + c
                if cell in self._blocked_cells:
                    out_str += "{:>6}".format("----")
                elif cell == self._goal_cell:
                    out_str += "{:>6}".format("GOAL")
                elif cell in self._danger_cells:
                    out_str += "{:>6.2f}".format(self._danger_value)
                else:
                    out_str += "{:>6.2f}".format(self._grid_values[cell])
                out_str += " "
            out_str += "\n"
        return out_str


In [38]:
# Initialize your GridWorld
simple_gw = GridWorld(height=5, width=5, goal=14, danger=[2, 18, 21], blocked=[6, 7, 11, 12], noise=0.0)
# Solve the linear system
values_grid = simple_gw.solve_linear_system(discount_factor=0.75)
print(values_grid)


 -0.63  -1.88  -5.00  -1.73   0.00 
 -0.22   ----   ----   0.00   1.73 
 -0.31   ----   ----   0.00   GOAL 
 -1.15  -2.06  -2.33  -5.00  -0.19 
 -2.59  -5.00  -2.86  -2.58  -0.83 



In [33]:
def value_iteration(gw, discount, tolerance=0.1):
    #TO DO


SyntaxError: unexpected EOF while parsing (2476411251.py, line 2)

In [None]:
# Initialize your GridWorld
simple_gw = GridWorld(height=5, width=5, goal=14, danger=[2, 18, 21], blocked=[6, 7, 11, 12], noise=0.0)
noisy_gw = GridWorld(height=5, width=5, goal=14, danger=[2, 18, 21], blocked=[6, 7, 11, 12], noise=0.2)
discount = 0.95
tolerance = 0.1

In [None]:
value_iteration(simple_gw, discount, 0.1)
print(simple_gw)

In [None]:
value_iteration(noisy_gw, discount, 0.1)
print(noisy_gw)

In [None]:
def policy_iteration(gw, discount, tolerance=0.1):
    #TO DO