**Matthew Woo
20236203**

In [None]:
import numpy as np
from enum import IntEnum
from copy import deepcopy
import matplotlib.pyplot as plt
# matplotlib 3.6 renamed its bundled seaborn styles to "seaborn-v0_8-<name>" and
# deprecated the old names (later releases removed them). Prefer the versioned
# name when this matplotlib provides it, otherwise fall back to the legacy name.
for _style in ("notebook", "whitegrid"):
    _versioned = "seaborn-v0_8-" + _style
    plt.style.use(_versioned if _versioned in plt.style.available else "seaborn-" + _style)
import matplotlib.colors as mcolors

  plt.style.use('seaborn-notebook')
  plt.style.use('seaborn-whitegrid')


In [None]:
class Action(IntEnum):
    """The four moves an agent can attempt, numbered clockwise starting from up."""
    up = 0
    right = 1
    down = 2
    left = 3


# Human-readable label for each action (identical to the member name).
action_to_str = {action: action.name for action in Action}

# (row delta, column delta) applied to a cell when the action succeeds.
# Rows grow downwards, so "up" decreases the row index.
action_to_offset = dict(zip(
    Action,
    [
        (-1, 0),  # up
        (0, 1),   # right
        (1, 0),   # down
        (0, -1),  # left
    ],
))

In [None]:
class GridWorld:
    """
    Grid-shaped MDP whose cells are numbered row by row: cell = row * width + column.

    State values are double-buffered: get_value() reads the committed values while
    set_value() writes into a staging copy (self.temp) that only becomes visible
    after set_next_values() is called.
    """

    def __init__(self, height, width, goal, goal_value=5.0, danger=None, danger_value=-5.0, blocked=None, noise=0.0):
        """
        Initialize the GridWorld environment.
        Creates a gridworld like MDP
         - height (int): Number of rows
         - width (int): Number of columns
         - goal (int): Index number of goal cell
         - goal_value (float): Reward given for goal cell
         - danger (list of int): Indices of cells marked as danger (None means no danger cells)
         - danger_value (float): Reward given for danger cell
         - blocked (list of int): Indices of cells marked as blocked (can't enter; None means none)
         - noise (float): probability of resulting state not being what was expected, in [0, 1)
        """
        assert 0 <= noise < 1  # Ensure valid noise value.

        self._width = width
        self._height = height
        self._goal_value = goal_value
        self._danger_value = danger_value
        self._goal_cell = goal
        # Copy into fresh lists so instances never share (or alias) a default list.
        self._danger_cells = list(danger) if danger is not None else []
        self._blocked_cells = list(blocked) if blocked is not None else []
        self._noise = noise  # Noise level in the environment.
        self._grid_values = [0 for _ in range(height * width)]  # Initialize state values.
        self.create_next_values()  # Initialize the next state values.

    def reset(self):
        """
        Reset the state values (committed and staged) to zero.
        """
        self._grid_values = [0 for _ in range(self._height * self._width)]
        self.create_next_values()

    def _inbounds(self, state):
        """
        Check if a state index is within the grid boundaries.
        """
        return 0 <= state < self._width * self._height

    def _inbounds_rc(self, state_r, state_c):
        """
        Check if row and column indices are within the grid boundaries.
        """
        return 0 <= state_r < self._height and 0 <= state_c < self._width

    def _state_to_rc(self, state):
        """
        Convert a state index to row and column indices.
        """
        return state // self._width, state % self._width

    def _state_from_action(self, state, action):
        """
        Gets the state as a result of applying the given action.
        An action that would leave the grid or enter a blocked cell leaves the
        state unchanged.
        """
        if action not in self.get_actions(state):
            return state
        d_row, d_col = action_to_offset[action]
        # One row down/up is `width` cells away in the row-major numbering.
        return state + d_row * self._width + d_col

    def is_terminal(self, state):
        """
        Returns true if a state is terminal (goal, or danger)
        """
        return state == self._goal_cell or state in self._danger_cells

    def get_states(self):
        """
        Gets all non-terminal, non-blocked states in the environment, in index order
        """
        return [
            cell
            for cell in range(self._width * self._height)
            if not self.is_terminal(cell) and cell not in self._blocked_cells
        ]

    def get_actions(self, state):
        """
        Returns a list of valid actions (as ints 0-3) given the current state.
        An action is valid when its target cell is inside the grid and not blocked.
        """
        row, col = self._state_to_rc(state)
        valid = []
        for action in range(4):
            d_row, d_col = action_to_offset[action]
            n_row, n_col = row + d_row, col + d_col
            if not self._inbounds_rc(n_row, n_col):
                continue
            if n_row * self._width + n_col in self._blocked_cells:
                continue
            valid.append(action)
        return valid

    def get_reward(self, state):
        """
        Get the reward for being in the current state:
        goal_value for the goal cell, danger_value for a danger cell, 0 otherwise.
        """
        assert self._inbounds(state)
        # Use the configured rewards rather than fixed constants so that
        # non-default goal_value / danger_value are honoured.
        if state == self._goal_cell:
            return self._goal_value
        if state in self._danger_cells:
            return self._danger_value
        return 0

    def get_transitions(self, state, action):
        """
        Get a list of transitions as a result of attempting the action in the current state
        Each item in the list is a dictionary, containing the probability of reaching that state and the state itself

        There is always one entry per direction (in action order 0-3): the requested
        action gets probability 1 - noise, each other direction gets noise / 3.
        Entries are not merged, so the same state may appear more than once.
        """
        slip_prob = self._noise / 3.0
        return [
            {
                "state": self._state_from_action(state, candidate),
                "prob": 1 - self._noise if candidate == action else slip_prob,
            }
            for candidate in range(4)
        ]

    def get_value(self, state):
        """
        Get the current (committed) value of the state
        """
        assert self._inbounds(state)
        return self._grid_values[state]

    def create_next_values(self):
        """
        Creates a temporary storage for state value updating
        If this is not used, then asynchronous updating may result in unexpected results
        To use properly, run this at the start of each iteration
        """
        self.temp = list(self._grid_values)

    def set_next_values(self):
        """
        Set the state values from the temporary copied values
        To use properly, run this at the end of each iteration
        """
        self._grid_values = list(self.temp)

    def set_value(self, state, value):
        """
        Set the value of the state into the temporary copy
        This value will not update into main storage until self.set_next_values() is called.
        """
        assert self._inbounds(state)
        self.temp[state] = value

    def solve_linear_system(self, discount_factor=1.0):
        """
        Solve the gridworld using a system of linear equations.

        Evaluates the policy that picks each of the four actions with probability
        0.25. For every non-terminal state s the equation is
            V(s) - discount * sum_a 0.25 * V(s'_a) = sum_a 0.25 * R(s'_a)
        where s'_a is the state reached by action a; terminal states contribute
        their reward only (their value is taken as 0).

        :param discount_factor: The discount factor for future rewards.
        :return: self, with the solved values stored as the state values.
        """
        action_prob = 0.25
        states = self.get_states()
        column_of = {state: k for k, state in enumerate(states)}
        size = len(states)

        # A x = B, one row per non-terminal state.
        A = np.zeros((size, size))
        B = np.zeros(size)
        for row, state in enumerate(states):
            A[row, row] += 1.0
            for action in range(4):
                successor = self._state_from_action(state, action)
                B[row] += action_prob * self.get_reward(successor)
                # Terminal successors have no column: only their reward counts.
                if successor in column_of:
                    A[row, column_of[successor]] -= action_prob * discount_factor

        if size:
            # Solve directly instead of forming the inverse of A.
            solution = np.linalg.solve(A, B)
            for state, value in zip(states, solution):
                self._grid_values[state] = float(value)

        # Keep the staging buffer consistent with the freshly solved values.
        self.create_next_values()
        return self

    def __str__(self):
        """
        Pretty print the state values
        """
        rows = []
        for r in range(self._height):
            cells = []
            for c in range(self._width):
                cell = r * self._width + c
                if cell in self._blocked_cells:
                    text = "{:>6}".format("----")
                elif cell == self._goal_cell:
                    text = "{:>6}".format("GOAL")
                elif cell in self._danger_cells:
                    text = "{:>6.2f}".format(self._danger_value)
                else:
                    text = "{:>6.2f}".format(self._grid_values[cell])
                cells.append(text + " ")
            rows.append("".join(cells) + "\n")
        return "".join(rows)

In [None]:
# Build the deterministic 5x5 world: one goal, three danger cells, four walls
simple_gw = GridWorld(
    height=5,
    width=5,
    goal=14,
    danger=[2, 18, 21],
    blocked=[6, 7, 11, 12],
    noise=0.0,
)

# solve_linear_system returns the world itself, so printing the result shows
# the grid of solved state values
values_grid = simple_gw.solve_linear_system(discount_factor=0.95)
print(values_grid)

 -2.69  -3.60  -5.00  -1.64   0.00 
 -2.35   ----   ----   0.00   1.64 
 -2.51   ----   ----   0.00   GOAL 
 -3.19  -3.91  -4.10  -5.00  -0.57 
 -3.82  -5.00  -3.99  -3.45  -1.82 



In [None]:
# Larger 9x9 world solved the same way.
# NOTE(review): cell 7 is listed as both danger and blocked; it is printed as
# blocked ("----") because __str__ checks the blocked list first - confirm intended.
simple_gw = GridWorld(
    height=9,
    width=9,
    goal=25,
    danger=[1, 7, 20, 21, 22, 23, 24, 26],
    blocked=[2, 6, 7, 8, 9, 60],
    noise=0.0,
)
values_grid = simple_gw.solve_linear_system(discount_factor=0.95)
print(values_grid)

 -4.35  -5.00   ----  -3.11  -3.04  -2.95   ----   ----   ---- 
  ----  -3.92  -4.05  -3.83  -3.71  -3.47  -2.70   0.07  -2.35 
 -2.32  -3.27  -5.00  -5.00  -5.00  -5.00  -5.00   GOAL  -5.00 
 -1.85  -2.28  -2.96  -3.15  -3.14  -2.93  -2.24   0.10  -1.93 
 -1.34  -1.52  -1.78  -1.89  -1.88  -1.71  -1.33  -0.69  -1.02 
 -0.94  -1.01  -1.11  -1.16  -1.15  -1.07  -0.95  -0.65  -0.66 
 -0.66  -0.69  -0.73  -0.75  -0.74  -0.71   ----  -0.45  -0.45 
 -0.49  -0.50  -0.51  -0.52  -0.50  -0.45  -0.34  -0.33  -0.33 
 -0.41  -0.41  -0.42  -0.41  -0.40  -0.36  -0.31  -0.29  -0.28 



In [None]:
def value_iteration(gw, discount, tolerance=0.1, verbose=True):
    """
    Run synchronous value iteration on a gridworld until the values settle.

    Every sweep replaces the value of each non-terminal state with the best
    expected return over the four actions (0-3):
        max_a sum_t prob_t * (reward(state_t) + discount * value(state_t))
    using the transitions reported by gw.get_transitions. New values are staged
    with gw.set_value and committed together by gw.set_next_values, so all
    updates in one sweep read the previous sweep's values.

     - gw: gridworld providing get_states, get_transitions, get_reward,
           get_value, set_value and set_next_values
     - discount (float): discount factor applied to successor values
     - tolerance (float): stop once the largest value change in a sweep is below this
     - verbose (bool): print the sweep counter and the grid before every sweep,
           and the total number of sweeps at the end (the original behaviour)
    """
    iterations = 0
    while True:
        if verbose:
            print(iterations)
            print(gw)
        iterations += 1

        largest_change = 0
        for state in gw.get_states():
            # Start from -inf so strongly negative action values are not clipped.
            # (`-1*10^9` is a bitwise XOR in Python and evaluates to -1.)
            best_value = float("-inf")
            for action in range(4):
                action_value = sum(
                    t["prob"] * (gw.get_reward(t["state"]) + discount * gw.get_value(t["state"]))
                    for t in gw.get_transitions(state, action)
                )
                best_value = max(best_value, action_value)

            gw.set_value(state, best_value)

            # get_value still returns the committed (previous sweep) value here.
            largest_change = max(largest_change, abs(gw.get_value(state) - best_value))

        gw.set_next_values()
        if largest_change < tolerance:
            break

    if verbose:
        print(iterations)

In [None]:
# Two copies of the 5x5 world that differ only in transition noise
_layout = dict(height=5, width=5, goal=14, danger=[2, 18, 21], blocked=[6, 7, 11, 12])
simple_gw = GridWorld(noise=0.0, **_layout)
noisy_gw = GridWorld(noise=0.2, **_layout)

# Value-iteration settings for the runs that follow
discount, tolerance = 0.95, 0.1

In [None]:
# Pass the configured tolerance rather than repeating the literal, so changing
# `tolerance` in the setup cell actually affects this run
value_iteration(simple_gw, discount, tolerance)
print(simple_gw)

0
  0.00   0.00  -5.00   0.00   0.00 
  0.00   ----   ----   0.00   0.00 
  0.00   ----   ----   0.00   GOAL 
  0.00   0.00   0.00  -5.00   0.00 
  0.00  -5.00   0.00   0.00   0.00 

1
  0.00   0.00  -5.00   0.00   0.00 
  0.00   ----   ----   0.00   5.00 
  0.00   ----   ----   5.00   GOAL 
  0.00   0.00   0.00  -5.00   5.00 
  0.00  -5.00   0.00   0.00   0.00 

2
  0.00   0.00  -5.00   0.00   4.75 
  0.00   ----   ----   4.75   5.00 
  0.00   ----   ----   5.00   GOAL 
  0.00   0.00   0.00  -5.00   5.00 
  0.00  -5.00   0.00   0.00   4.75 

3
  0.00   0.00  -5.00   4.51   4.75 
  0.00   ----   ----   4.75   5.00 
  0.00   ----   ----   5.00   GOAL 
  0.00   0.00   0.00  -5.00   5.00 
  0.00  -5.00   0.00   4.51   4.75 

4
  0.00   0.00  -5.00   4.51   4.75 
  0.00   ----   ----   4.75   5.00 
  0.00   ----   ----   5.00   GOAL 
  0.00   0.00   0.00  -5.00   5.00 
  0.00  -5.00   4.29   4.51   4.75 

5
  0.00   0.00  -5.00   4.51   4.75 
  0.00   ----   ----   4.75   5.00 
  0.00   --

In [None]:
# Pass the configured tolerance rather than repeating the literal, so changing
# `tolerance` in the setup cell actually affects this run
value_iteration(noisy_gw, discount, tolerance)
print(noisy_gw)

0
  0.00   0.00  -5.00   0.00   0.00 
  0.00   ----   ----   0.00   0.00 
  0.00   ----   ----   0.00   GOAL 
  0.00   0.00   0.00  -5.00   0.00 
  0.00  -5.00   0.00   0.00   0.00 

1
  0.00  -0.33  -5.00  -0.33   0.00 
  0.00   ----   ----   0.00   4.00 
  0.00   ----   ----   3.67   GOAL 
  0.00  -0.33  -0.33  -5.00   3.67 
 -0.33  -5.00  -0.33  -0.33   0.00 

2
 -0.02  -0.38  -5.00  -0.35   3.02 
  0.00   ----   ----   3.25   4.25 
  0.00   ----   ----   3.90   GOAL 
 -0.04  -0.38  -0.63  -5.00   3.90 
 -0.38  -5.00  -0.63  -0.38   2.77 

3
 -0.03  -0.40  -5.00   2.31   3.59 
 -0.00   ----   ----   3.66   4.67 
 -0.00   ----   ----   4.12   GOAL 
 -0.05  -0.43  -0.70  -5.00   4.09 
 -0.41  -5.00  -0.70   1.70   3.29 

4
 -0.03  -0.40  -5.00   2.82   4.15 
 -0.00   ----   ----   4.19   4.76 
 -0.00   ----   ----   4.16   GOAL 
 -0.06  -0.44  -0.75  -5.00   4.13 
 -0.42  -5.00   0.87   2.23   3.63 

5
 -0.03  -0.41  -5.00   3.29   4.32 
 -0.00   ----   ----   4.32   4.83 
 -0.01   --

In [None]:
# Fresh worlds (values start at zero again) for the lower-discount experiment
_layout = dict(height=5, width=5, goal=14, danger=[2, 18, 21], blocked=[6, 7, 11, 12])
simple_gw = GridWorld(noise=0.0, **_layout)
noisy_gw = GridWorld(noise=0.2, **_layout)

# Same tolerance as before, smaller discount factor
discount, tolerance = 0.75, 0.1

In [None]:
# Pass the configured tolerance rather than repeating the literal, so changing
# `tolerance` in the setup cell actually affects this run
value_iteration(simple_gw, discount, tolerance)
print(simple_gw)

0
  0.00   0.00  -5.00   0.00   0.00 
  0.00   ----   ----   0.00   0.00 
  0.00   ----   ----   0.00   GOAL 
  0.00   0.00   0.00  -5.00   0.00 
  0.00  -5.00   0.00   0.00   0.00 

1
  0.00   0.00  -5.00   0.00   0.00 
  0.00   ----   ----   0.00   5.00 
  0.00   ----   ----   5.00   GOAL 
  0.00   0.00   0.00  -5.00   5.00 
  0.00  -5.00   0.00   0.00   0.00 

2
  0.00   0.00  -5.00   0.00   3.75 
  0.00   ----   ----   3.75   5.00 
  0.00   ----   ----   5.00   GOAL 
  0.00   0.00   0.00  -5.00   5.00 
  0.00  -5.00   0.00   0.00   3.75 

3
  0.00   0.00  -5.00   2.81   3.75 
  0.00   ----   ----   3.75   5.00 
  0.00   ----   ----   5.00   GOAL 
  0.00   0.00   0.00  -5.00   5.00 
  0.00  -5.00   0.00   2.81   3.75 

4
  0.00   0.00  -5.00   2.81   3.75 
  0.00   ----   ----   3.75   5.00 
  0.00   ----   ----   5.00   GOAL 
  0.00   0.00   0.00  -5.00   5.00 
  0.00  -5.00   2.11   2.81   3.75 

5
  0.00   0.00  -5.00   2.81   3.75 
  0.00   ----   ----   3.75   5.00 
  0.00   --

In [None]:
# Pass the configured tolerance rather than repeating the literal, so changing
# `tolerance` in the setup cell actually affects this run
value_iteration(noisy_gw, discount, tolerance)
print(noisy_gw)

0
  0.00   0.00  -5.00   0.00   0.00 
  0.00   ----   ----   0.00   0.00 
  0.00   ----   ----   0.00   GOAL 
  0.00   0.00   0.00  -5.00   0.00 
  0.00  -5.00   0.00   0.00   0.00 

1
  0.00  -0.33  -5.00  -0.33   0.00 
  0.00   ----   ----   0.00   4.00 
  0.00   ----   ----   3.67   GOAL 
  0.00  -0.33  -0.33  -5.00   3.67 
 -0.33  -5.00  -0.33  -0.33   0.00 

2
 -0.02  -0.37  -5.00  -0.35   2.38 
  0.00   ----   ----   2.57   4.20 
  0.00   ----   ----   3.85   GOAL 
 -0.03  -0.37  -0.57  -5.00   3.85 
 -0.37  -5.00  -0.57  -0.37   2.18 

3
 -0.02  -0.38  -5.00   1.31   2.74 
 -0.00   ----   ----   2.82   4.46 
 -0.00   ----   ----   3.99   GOAL 
 -0.04  -0.40  -0.61  -5.00   3.97 
 -0.39  -5.00  -0.61   0.93   2.51 

4
 -0.02  -0.38  -5.00   1.56   3.01 
 -0.00   ----   ----   3.08   4.50 
 -0.00   ----   ----   4.01   GOAL 
 -0.04  -0.41  -0.63  -5.00   3.99 
 -0.40  -5.00   0.16   1.19   2.68 

5
 -0.02  -0.38  -5.00   1.74   3.08 
 -0.00   ----   ----   3.13   4.53 
 -0.00   --

In [None]:
def policy_iteration(gw, discount, tolerance=0.1):
    #TO DO