# Second practical exercise: Grid World and Value iteration

Repo: https://github.com/KRLGroup/RL_2025.git

# A deterministic grid world

Finite grid with some obstacles inside. The agent can move up, left, right and down.

![](imgs/grid_world.png)

In [1]:
%pip install gymnasium

Note: you may need to restart the kernel to use updated packages.


In [2]:
#import
import gymnasium as gym
import numpy as np
from gymnasium import spaces
import random

In [6]:

# custom 2d grid world enviroment
class GridWorld(gym.Env):
    metadata = {'render.modes': ['console']}

    
    # actions available
    UP = 0
    LEFT = 1
    DOWN = 2
    RIGHT = 3


    def __init__(self, width, height):
        super(GridWorld, self).__init__()
        self.ACTION_NAMES = ["UP", "LEFT", "DOWN", "RIGHT"]
        self.num_actions = 4

        self.size = width * height  # size of the grid world
        self.num_states = self.size
        self.width = width
        self.height = height
        self.num_obstacles = int((width+height)/2)
        self.end_state = np.array([random.randrange(height) , random.randrange(width)], dtype=np.uint8) # goal state = bottom right cell
        
        while self.end_state[0] == 0 and self.end_state[1] == 0:
                self.end_state = np.array([random.randrange(height) , random.randrange(width)], dtype=np.uint8)
    
        

        # actions of agents : up, down, left and right
        self.action_space = spaces.Discrete(4)
        # observation : cell indices in the grid
        self.observation_space = spaces.MultiDiscrete([self.height, self.width])

        self.obstacles = np.zeros((height, width))

        for i in range(self.num_obstacles):
            obstacle = random.randrange(height) , random.randrange(width)
            while obstacle in [(0, 0),tuple(self.end_state)]:
                obstacle = random.randrange(height), random.randrange(width)
            self.obstacles[obstacle] = 1

        self.num_steps = 0
        self.max_steps = height*width

        self.current_state = np.zeros((2), np.uint8)#init state = [0,0]

        self.directions = np.array([
            [-1,0], #UP
            [0,-1], #LEFT
            [1,0], #DOWN
            [0,1] #RIGHT
        ])
        
    def step(self, action):
        s_prime = self.transition_function(self.current_state, action)
        reward = self.reward_function(s_prime)
        terminated, truncated = self.termination_condition(s_prime)

        self.current_state = s_prime
        self.num_steps += 1

        return self.current_state, reward, terminated, truncated, None
    
    
    def transition_function(self, s, a): # TODO
        
        # Q1
        # (a) -----------------------------------------
        # s_prime = s + a
        # if (s_prime < 0).any(): return s
        # if s_prime[0] >= self.width: return s    
        # if s_prime[1] >= self.height: return s 
        # if self.obstacles[s_prime[0], s_prime[1]] == 1: return s
        
        # (b) -----------------------------------------
        # s_prime = s + self.directions[a]
        # if (s_prime < 0).any(): return s_prime
        # if s_prime[0] <= self.height: return s  
        # if s_prime[1] <= self.width: return s 
        # if self.obstacles[s_prime[0], s_prime[1]] == 1: return s
        
        # (c) -----------------------------------------
        s_prime = s + self.directions[a]
        if (s_prime < 0).any(): return s
        if s_prime[0] >= self.height: return s
        if s_prime[1] >= self.width: return s
        if self.obstacles[s_prime[0], s_prime[1]] == 1: return s
        
        
        return s_prime

    
    def reward_function(self,s): # TODO
        
        # Q2
        # (a) -----------------------------------------
        # r = 0
        # if (s != self.end_state).all():
        #     r = 1

        # (b) -----------------------------------------
        r = 0
        if (s == self.end_state).all():
            r = 1

        # (c) -----------------------------------------
        # r = 1               
        # if (s == self.end_state).all():
        #     r = 0             

        return r

    def termination_condition(self, s):
        truncated = False
        terminated = False

        # Q3
        # (a)
        truncated = self.num_steps >= self.max_steps
        # (b) 
        # truncated = self.num_steps <= self.max_steps
        # (c) 
        # truncated = self.num_steps > 5
        #-----------------------------------------------------

        # Q4
        # (a) 
        # terminated = (s != self.end_state).any() 
        # (b) 
        # terminated = (s == self.end_state).any()
        # (c) 
        terminated = (s == self.end_state).all()

        return terminated, truncated
    
    def transition_probabilities(self, s, a):
        prob_next_state = np.zeros((self.height, self.width))
        s_prime = self.transition_function(s, a)

        prob_next_state[s_prime[0], s_prime[1]] = 1.0

        return prob_next_state#.flatten()
    
    def reset(self):
        self.current_state = np.zeros((2), np.uint8)
        self.num_steps = 0

        return self.current_state
    
    def reward_probabilities(self):
        rewards = np.zeros((self.num_states))
        i = 0
        for r in range(self.height):
            for c in range(self.width):
                state = np.array([r,c], dtype=np.uint8)
                rewards[i] = self.reward_function(state)
                i+=1

        return rewards
    
    def render(self):
        '''
            render the state
        '''

        row = self.current_state[0]
        col = self.current_state[1]

        for r in range(self.height):
            for c in range(self.width):
                if r == row and c == col:
                    print("| A ", end='')
                elif r == self.end_state[0] and c == self.end_state[1]:
                    print("| G ", end='')
                else:
                    if self.obstacles[r,c] == 1:
                        print('|///', end='')
                    else:
                        print('|___', end='')
            print('|')
        print('\n')

Simulate all the four actions

In [7]:
env = GridWorld(3,5)
env.reset()
env.render()

action_sequence = [0,1,2,3]

for a in action_sequence:
    print(env.ACTION_NAMES[a])
    env.step(a)
    env.render()

| A |___|///|
|___|///|///|
|___|___|___|
|___|///|___|
|___| G |___|


UP
| A |___|///|
|___|///|///|
|___|___|___|
|___|///|___|
|___| G |___|


LEFT
| A |___|///|
|___|///|///|
|___|___|___|
|___|///|___|
|___| G |___|


DOWN
|___|___|///|
| A |///|///|
|___|___|___|
|___|///|___|
|___| G |___|


RIGHT
|___|___|///|
| A |///|///|
|___|___|___|
|___|///|___|
|___| G |___|




Simulate a random episode

In [5]:
done = False
env.reset()
while not done:
    action = env.action_space.sample()
    print(env.ACTION_NAMES[action])
    state, reward, terminated, truncated, _ = env.step(action)
    done = terminated or truncated
    env.render()


UP
| A |___|___|
|___|___|___|
|___|___|///|
|///|___|///|
|___|///| G |


DOWN
|___|___|___|
| A |___|___|
|___|___|///|
|///|___|///|
|___|///| G |


RIGHT
|___|___|___|
|___| A |___|
|___|___|///|
|///|___|///|
|___|///| G |


LEFT
|___|___|___|
| A |___|___|
|___|___|///|
|///|___|///|
|___|///| G |


UP
| A |___|___|
|___|___|___|
|___|___|///|
|///|___|///|
|___|///| G |


UP
| A |___|___|
|___|___|___|
|___|___|///|
|///|___|///|
|___|///| G |


DOWN
|___|___|___|
| A |___|___|
|___|___|///|
|///|___|///|
|___|///| G |


UP
| A |___|___|
|___|___|___|
|___|___|///|
|///|___|///|
|___|///| G |


RIGHT
|___| A |___|
|___|___|___|
|___|___|///|
|///|___|///|
|___|///| G |


RIGHT
|___|___| A |
|___|___|___|
|___|___|///|
|///|___|///|
|___|///| G |


LEFT
|___| A |___|
|___|___|___|
|___|___|///|
|///|___|///|
|___|///| G |


RIGHT
|___|___| A |
|___|___|___|
|___|___|///|
|///|___|///|
|___|///| G |


UP
|___|___| A |
|___|___|___|
|___|___|///|
|///|___|///|
|___|///| G |


UP
|_

## A non deterministic grid world

The agent goes with probability p to the right cell, with probability 1 - p in a different cell

In [8]:
class NonDeterministicGridWorld(GridWorld):
    def __init__(self, width, height, p=0.8):
        super(NonDeterministicGridWorld, self).__init__(width, height)
        self.probability_right_action = p

    def transition_function(self, s, a):
        s_prime = s + self.directions[a, :]

        #with probability 1 - p diagonal movement
        if random.random() <= 1 - self.probability_right_action:
            if random.random() < 0.5:
                s_prime = s_prime + self.directions[(a+1)%self.num_actions, :]
            else:
                s_prime = s_prime + self.directions[(a-1)%self.num_actions, :]


        if s_prime[0] < self.height and s_prime[1] < self.width and (s_prime >= 0).all():
            if self.obstacles[s_prime[0], s_prime[1]] == 0 :
                return s_prime

        return s

    def transition_probabilities(self, s, a):
        cells = []
        probs = []
        prob_next_state = np.zeros((self.height, self.width))
        s_prime_right =  s + self.directions[a, :]
        if s_prime_right[0] < self.height and s_prime_right[1] < self.width and (s_prime_right >= 0).all():
            if self.obstacles[s_prime_right[0], s_prime_right[1]] == 0 :
                prob_next_state[s_prime_right[0], s_prime_right[1]] = self.probability_right_action
                cells.append(s_prime_right)
                probs.append(self.probability_right_action)

        s_prime = s_prime_right + self.directions[(a + 1) % self.num_actions, :]
        if s_prime[0] < self.height and s_prime[1] < self.width and (s_prime >= 0).all():
            if self.obstacles[s_prime[0], s_prime[1]] == 0 :
                prob_next_state[s_prime[0], s_prime[1]] = (1 - self.probability_right_action) / 2
                cells.append(s_prime.copy())
                probs.append((1 - self.probability_right_action) / 2)

        s_prime = s_prime_right + self.directions[(a - 1) % self.num_actions, :]
        if s_prime[0] < self.height and s_prime[1] < self.width and (s_prime >= 0).all():
            if self.obstacles[s_prime[0], s_prime[1]] == 0 :
                prob_next_state[s_prime[0], s_prime[1]] = (1 - self.probability_right_action) / 2
                cells.append(s_prime.copy())
                probs.append((1 - self.probability_right_action) / 2)

        #normalization
        sump = sum(probs)
        #for cell in cells:
        #    prob_next_state[cell[0], cell[1]] /= sump
        prob_next_state[s[0], s[1]] = 1 - sump
        return prob_next_state


Simulate a random episode

In [9]:
env = NonDeterministicGridWorld(3,5)
state = env.reset()
env.render()
#next state if we start from state 0,0 and we do action down
next_state_prob = env.transition_probabilities(state, 2)
print(next_state_prob)
print()
print(env.reward_probabilities())

| A | G |___|
|///|___|___|
|___|___|___|
|///|///|___|
|___|___|___|


[[0.9 0.  0. ]
 [0.  0.1 0. ]
 [0.  0.  0. ]
 [0.  0.  0. ]
 [0.  0.  0. ]]

[0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]


In [8]:
done = False
while not done:
    action = env.action_space.sample()
    print(env.ACTION_NAMES[action])
    state, reward, terminated, truncated, _ = env.step(action)
    done = terminated or truncated
    env.render()

DOWN
|___|___|___|
| A |___|___|
|///|___|___|
|___|___|///|
|///|___| G |


LEFT
|___|___|___|
| A |___|___|
|///|___|___|
|___|___|///|
|///|___| G |


UP
| A |___|___|
|___|___|___|
|///|___|___|
|___|___|///|
|///|___| G |


LEFT
| A |___|___|
|___|___|___|
|///|___|___|
|___|___|///|
|///|___| G |


RIGHT
|___| A |___|
|___|___|___|
|///|___|___|
|___|___|///|
|///|___| G |


LEFT
| A |___|___|
|___|___|___|
|///|___|___|
|___|___|///|
|///|___| G |


DOWN
|___|___|___|
| A |___|___|
|///|___|___|
|___|___|///|
|///|___| G |


UP
| A |___|___|
|___|___|___|
|///|___|___|
|___|___|///|
|///|___| G |


RIGHT
|___| A |___|
|___|___|___|
|///|___|___|
|___|___|///|
|///|___| G |


LEFT
| A |___|___|
|___|___|___|
|///|___|___|
|___|___|///|
|///|___| G |


UP
| A |___|___|
|___|___|___|
|///|___|___|
|___|___|///|
|///|___| G |


UP
| A |___|___|
|___|___|___|
|///|___|___|
|___|___|///|
|///|___| G |


UP
| A |___|___|
|___|___|___|
|///|___|___|
|___|___|///|
|///|___| G |


RIGHT
|

VALUE ITERATION

![](imgs/value_iteration_1.png)

![](imgs/value_iteration_2.png)

![](imgs/value_iteration_3.png)

![](imgs/value_iteration_4.png)

![](imgs/value_iteration_5.png)

![](imgs/value_iteration_6.png)

![](imgs/value_iteration.png)

In [10]:
def value_iteration(env, gamma=0.99, iters=100):
    #initialize values
    values = np.zeros((env.num_states))
    best_actions = np.zeros((env.num_states), dtype=int)
    STATES = np.zeros((env.num_states, 2), dtype=np.uint8)
    REWARDS = env.reward_probabilities()
    print(REWARDS)
    i = 0
    for r in range(env.height):
        for c in range(env.width):
            state = np.array([r, c], dtype=np.uint8)
            STATES[i] = state
            i += 1
    
    for i in range(iters):
        v_old = values.copy()
        for s in range(env.num_states):
            state = STATES[s]

            if (state == env.end_state).all() or i >= env.max_steps or env.obstacles[state[0],state[1]]:
                continue # if we reach the termination condition, we cannot perform any action


            max_va = -np.inf
            best_a = 0
            for a in range(env.num_actions):
                next_state_prob = env.transition_probabilities(state, a).flatten()

                #Q5
                # (a)
                # va = (next_state_prob*(REWARDS + gamma*values)).sum()
                # (b) 
                # va = (REWARDS + gamma*v_old).sum()
                # (c) 
                va = (next_state_prob*(REWARDS + gamma*v_old)).sum()
                
                #Q6
                # (a) 
                if va > max_va:
                    max_va = va
                    best_a = a
                # (b) 
                # if va < max_va:
                    # max_va = a
                    # best_a = va
                # (c) 
                # if va > values:
                    # max_va = va
                    # best_a = a

            values[s] = max_va
            best_actions[s] = best_a

    return values.reshape((env.height, env.width)), best_actions.reshape((env.height, env.width))

estimate values

In [13]:
env = NonDeterministicGridWorld(3,5)
values, best_actions = value_iteration(env, iters=100)

print(values)
print(best_actions)
env.render()

[0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
[[0.9853438  0.99750623 0.        ]
 [0.97770444 0.98757397 0.99750623]
 [0.96793754 0.         0.9853438 ]
 [0.95586771 0.96336863 0.97305769]
 [0.         0.         0.961193  ]]
[[3 3 0]
 [3 0 0]
 [0 0 0]
 [0 3 0]
 [0 0 0]]
| A |___| G |
|___|___|___|
|___|///|___|
|___|___|___|
|///|///|___|




simulate optimal policy


In [14]:
done = False
state = env.reset()
while not done:
    action = best_actions[state[0],state[1]]
    print("Action:",env.ACTION_NAMES[action])
    state, reward, terminated, truncated, _ = env.step(action)
    done = terminated or truncated
    env.render()


Action: RIGHT
| A |___| G |
|___|___|___|
|___|///|___|
|___|___|___|
|///|///|___|


Action: RIGHT
| A |___| G |
|___|___|___|
|___|///|___|
|___|___|___|
|///|///|___|


Action: RIGHT
|___| A | G |
|___|___|___|
|___|///|___|
|___|___|___|
|///|///|___|


Action: RIGHT
|___|___| A |
|___|___|___|
|___|///|___|
|___|___|___|
|///|///|___|


