In [1]:
import os

import numpy as np
import gymnasium as gym

import gym_env

In [2]:
# Construct the environment
env = gym.make("simple-5x5")
env.reset()

  logger.deprecation(


({'agent': array([0, 0]), 'target': array([4, 4])}, {'distance': 8.0})

In [3]:
actions = np.arange(env.action_space.n, dtype=int)
start_loc = env.unwrapped.start_loc
target_loc = env.unwrapped.target_loc
maze = env.unwrapped.maze
size = maze.size

In [4]:
print(f"actions: {actions}, start loc: {start_loc}, target loc: {target_loc}, size: {size}")

actions: [0 1 2 3], start loc: [0 0], target loc: [4 4], size: 25


In [5]:
def row_col_to_index(row, col, len):
    """
    Converts (row,col) to an index in array
    """
    return row*len + col

def index_to_row_col(index, len):
    """
    Converts index back to (row,col)
    """
    return (index // len, index % len)

In [6]:
# Uncomment to test row_col_to_index and index_to_row_col functions
# for row in range(maze.shape[0]):
#     for col in range(maze.shape[1]):
#         index = row_col_to_index(row, col, maze.shape[0])
#         print((row,col), index, index_to_row_col(index, maze.shape[0]))

In [7]:
# Get the transition matrix T N^2 x N^2
T = np.zeros(shape=(size, size))

# loop through the maze
for row in range(maze.shape[0]):
    for col in range(maze.shape[1]):
        # if we hit a barrier
        if maze[row,col] == '1':
            continue
        # at each location, we want to store the location, keep track of which new states we transition into, and how many states we transition into
        loc = np.array((row,col))
        new_states = []
        for action in actions:     # loop through actions
            env.unwrapped.agent_loc = loc                  # set new agent location based on where we are in maze
            obs, reward, term, _, _ = env.step(action)     # take action

            # if dont move because we hit a boundary, do nothing
            if (obs['agent'] == loc).all():
                continue
            new_states.append(obs['agent'])
        
        idx_cur = row_col_to_index(row, col, maze.shape[0])
        for new_state in new_states:
            idx_new = row_col_to_index(new_state[0], new_state[1], maze.shape[0])
            T[idx_cur, idx_new] = 1/len(new_states)

In [8]:
"""
Split our T into T_nn & T_nt
T_nn -> transition probability between non-terminal states 
T_nt = P -> transition probability from non-terminal to terminal states
"""

# Make T_nn by excluding the rows and columns associated with the terminal state (also works if we have multiple)
target_locs = [target_loc]
terminal_indices = [row_col_to_index(loc[0], loc[1], maze.shape[0]) for loc in target_locs]

T_nn = T.copy()

for index in terminal_indices:
    T_nn = np.delete(T_nn, index, axis=0)
    T_nn = np.delete(T_nn, index, axis=1)

# Make T_nt by selecting only the rows corresponding to the terminal states
all_indices = set(range(T.shape[0]-1))
nonterminal_indices = all_indices - set(terminal_indices)

T_nt = np.zeros((len(T)-1, len(terminal_indices)))

for i, index_term in enumerate(terminal_indices):
    for index in nonterminal_indices:
        T_nt[index, i] = T[index, index_term]

In [9]:
print(T.shape)
print(T_nt.shape)
print(T_nn.shape)

(25, 25)
(24, 1)
(24, 24)


In [10]:
"""
Now we can use T_nn to solve for our DR (M)
"""
_lambda = 1     # define lambda
c = np.full(T_nn.shape[0], -1)     # define our cost to be -1

# Make our diagonal matrix
diag_matrix = np.diag(np.exp(c / _lambda))

# Subtract from T_nn to get L
L = diag_matrix - T_nn

# Take the inverse to obtain the DR (M)
M = np.linalg.inv(L)

In [11]:
"""
Now that we have M and P (T_nt), we can solve for exp_v
"""
t = len(target_locs)
r = np.full(t, 2)  # Create the vector r filled with 2

# Calculate the right-hand side (RHS) of the equation
exp_v = M @ T_nt * np.exp(r)

In [12]:
# we need to add back the terminal states
holder = np.zeros(T.shape[0])

for idx in terminal_indices:
    holder[idx] = np.exp(r)
for i, idx in enumerate(nonterminal_indices):
    holder[idx] = exp_v[i]

  holder[idx] = np.exp(r)
  holder[idx] = exp_v[i]


In [17]:
exp_v = np.copy(holder)

In [18]:
# solve for the optimal policy by choosing the next optimal state
optimal_policy = np.zeros(len(exp_v))
print(optimal_policy.shape, exp_v.shape)
env.reset()

for row in range(maze.shape[0]):
    for col in range(maze.shape[1]):
        # if we hit a barrier or terminal
        if maze[row,col] == '1' or maze[row,col] == 'G':
            continue

        loc = np.array((row,col))
        loc_index = row_col_to_index(row, col, maze.shape[0]-1)

        max_action = 0
        for action in actions:     # loop through actions
            env.unwrapped.agent_loc = loc                  # set new agent location based on where we are in maze
            obs, reward, term, _, _ = env.step(action)     # take action

            # if dont move because we hit a boundary, do nothing
            if (obs['agent'] == loc).all():
                continue

            succ_state_index = row_col_to_index(obs['agent'][0], obs['agent'][1], maze.shape[0]-1)     # index of successor state
            if exp_v[succ_state_index] > max_action: 
                optimal_policy[loc_index] = action
                max_action = exp_v[succ_state_index]


(25,) (25,)


In [21]:
print(exp_v.reshape((5,5)))

[[-5.37901201 -1.97882793  3.92307179  4.86526285  5.61540985]
 [-1.97882793  0.          0.         -4.16899111 -0.73367517]
 [ 3.92307179  0.          0.         -8.73264603 -2.25613078]
 [ 4.86526285 -4.16899111 -8.73264603  0.          6.97636881]
 [ 5.61540985 -0.73367517 -2.25613078  6.97636881  7.3890561 ]]


In [139]:
print(maze)

[['S' '0' '0' '0' '0']
 ['0' '1' '1' '0' '0']
 ['0' '1' '1' '0' '0']
 ['0' '0' '0' '1' '0']
 ['0' '0' '0' '0' 'G']]
