In [2]:
import numpy as np

In [3]:
maze_str = '''
XXXXXXX
X     X
X X   X
X X1X3X
XS    X
XTTTTTX
XXXXXXX
'''
# One list of single characters per text row, so that cells can be
# overwritten later on (strings are immutable, lists are not)
maze = [list(row) for row in maze_str.strip().split('\n')]
maze

[['X', 'X', 'X', 'X', 'X', 'X', 'X'],
 ['X', ' ', ' ', ' ', ' ', ' ', 'X'],
 ['X', ' ', 'X', ' ', ' ', ' ', 'X'],
 ['X', ' ', 'X', '1', 'X', '3', 'X'],
 ['X', 'S', ' ', ' ', ' ', ' ', 'X'],
 ['X', 'T', 'T', 'T', 'T', 'T', 'X'],
 ['X', 'X', 'X', 'X', 'X', 'X', 'X']]

In [4]:
def find_start_end_position():
    '''
    Scan the global ``maze`` grid for the start cell and the goal cells.

    Returns a pair ``(start, ends)``:
      * ``start`` is the ``(row, col)`` of the cell holding 'S' (the last one
        met in row-major order if there are several, ``None`` if there is none)
      * ``ends`` lists the ``(row, col)`` of every cell holding '1', '2' or
        '3', in row-major order
    '''
    goal_marks = ['1', '2', '3']
    start_pos = None
    goal_cells = []
    for r, row in enumerate(maze):
        for c, cell in enumerate(row):
            if cell == 'S':
                start_pos = (r, c)
            if cell in goal_marks:
                goal_cells.append((r, c))
    return start_pos, goal_cells

# START is the (row, col) of the 'S' cell, ENDS the list of goal cells ('1', '2', '3')
START, ENDS = find_start_end_position()

In [5]:
# Grid dimensions: number of rows, and number of columns of the first row
ROW = len(maze)
COL = len(maze[0])

### Define some functions

Many of these functions are not strictly necessary, but they are defined to mirror the full Value Iteration algorithm.

In [6]:
# Cache of the available actions for each state, filled lazily by get_actions:
# keys are (row, col) states, values are lists of action names
state_actions = {}

In [20]:
def all_states(maze):
    '''
    List every non-wall cell of ``maze`` as a ``(row, col)`` tuple.

    A cell is a wall when it holds 'X'. Cells are reported in row-major order.
    '''
    return [
        (r, c)
        for r, row in enumerate(maze)
        for c, cell in enumerate(row)
        if cell != 'X'
    ]

def get_actions(state):
    '''
    Return the list of moves available from ``state``.

    ``state`` is a ``(row, col)`` position in the global ``maze``. Goal cells
    ('1', '2', '3') are terminal and yield an empty list. Otherwise a move
    ('UP', 'DOWN', 'LEFT', 'RIGHT', tested in that order) is kept when the
    neighbouring cell is not an 'X' wall and is not on the outermost ring of
    the grid. Non-empty results are memoised in the global ``state_actions``.
    '''
    # Goal cells are terminal: nothing can be done from there
    if maze[state[0]][state[1]] in ['1', '2', '3']:
        return []

    # Only a non-empty cached list counts as a cache hit
    cached = state_actions.get(state)
    if cached:
        return cached

    i, j = state
    # (action name, neighbour is off the outer ring?, neighbour row, neighbour col)
    probes = (
        ('UP', i > 1, i - 1, j),
        ('DOWN', i < ROW - 2, i + 1, j),
        ('LEFT', j > 1, i, j - 1),
        ('RIGHT', j < COL - 2, i, j + 1),
    )
    # The grid is only read when the bounds test passed
    moves = [
        name
        for name, inside, r, c in probes
        if inside and maze[r][c] != 'X'
    ]
    state_actions[state] = moves
    return moves

def take_action(state, action):
    '''
    Apply a move to a grid position and return the resulting position.

    ``state`` is a ``(row, col)`` pair and ``action`` one of 'UP', 'DOWN',
    'LEFT', 'RIGHT'. No wall or bounds check is performed here. Any other
    action yields ``None``.
    '''
    row, col = state
    moves = (
        ('UP', lambda: (row - 1, col)),
        ('DOWN', lambda: (row + 1, col)),
        ('LEFT', lambda: (row, col - 1)),
        ('RIGHT', lambda: (row, col + 1)),
    )
    for name, step in moves:
        if action == name:
            return step()
    return None
    

def best_action(state):
    '''
    Pick the greedy move from ``state`` according to the global ``value_table``.

    Every action returned by ``get_actions(state)`` is applied with
    ``take_action`` and the successor with the highest table value wins
    (the first one on ties).

    Returns a pair ``(next_state, action)``.
    Raises ``ValueError`` when no action is available from ``state``
    (e.g. a terminal cell), instead of failing on an empty list.
    '''
    actions = get_actions(state)
    if not actions:
        raise ValueError(f'No action available from state {state}')

    successors = [take_action(state, action) for action in actions]

    best_idx = 0
    # float('-inf') rather than np.NINF, which no longer exists in NumPy 2.x
    best_value = float('-inf')
    for idx, successor in enumerate(successors):
        if value_table[successor] > best_value:
            best_value = value_table[successor]
            best_idx = idx
    return successors[best_idx], actions[best_idx]
        

def reward(state):
    '''
    Reward collected when landing on ``state``, a ``(row, col)`` cell of the
    global ``maze``.

    Goal cells '3', '2' and '1' pay 10, 5 and 1, a trap 'T' costs 10, and any
    other cell (blank ' ', start 'S', '0', ...) costs the step penalty of 1.
    '''
    i, j = state
    rewards = {
        '3': 10,
        '2': 5,
        '1': 1,
        'T': -10,
        '0': -1
    }
    # Cells without a dedicated entry (' ', 'S', ...) used to raise KeyError;
    # they are ordinary walkable cells and get the same step penalty as '0'
    return rewards.get(maze[i][j], rewards['0'])

def discounted_reward(state, action, actions, noise, discount):
    '''
    Expected discounted return (Q-value) of choosing ``action`` in ``state``.

    sum of [P(s'|s, a) * (R(s,a,s') + gamma*V(s'))] for all s'

    Parameters:
      state    -- current (row, col) position
      action   -- the action the agent intends to take
      actions  -- all actions available from ``state`` (``action`` included)
      noise    -- probability that another available action is executed
                  instead; it is split evenly among those other actions
      discount -- gamma, the weight of the successor value

    R(s,a,s') is ``reward(s')`` and V(s') is read from the global
    ``value_table``. When ``action`` is the only available action there is
    nowhere for the noise to go, so the move is treated as deterministic and
    the probabilities still sum to 1.
    '''

    next_state = take_action(state, action)
    other_states = [take_action(state, a) for a in actions if a != action]

    def backup(successor):
        # Immediate reward of the landing cell plus its discounted value
        return reward(successor) + discount * value_table[successor]

    # Single available action: the intended move happens with probability 1
    # (previously the noise share was silently dropped)
    if len(other_states) == 0:
        return backup(next_state)

    dr = (1-noise) * backup(next_state)

    prob = noise / len(other_states)
    for other_state in other_states:
        dr += prob * backup(other_state)

    return dr

### Value Iteration Part

In [21]:
# Transition noise: probability that the agent does NOT perform the action it
# chose; that probability is shared evenly by the other available actions.
# The higher it is, the riskier it gets to walk next to a trap.
NOISE = 0.5

# Discount factor (gamma): weight given to the value of the next state,
# i.e. how much the agent cares about future rewards
DISCOUNT = 0.9

In [22]:
# Every non-wall cell of the maze is a state
states = all_states(maze)

In [23]:
# Every state starts with a value of 0 ...
value_table = dict.fromkeys(states, 0)
# ... except the goal cells, which start at their own reward
value_table.update((end, reward(end)) for end in ENDS)

In [24]:
states = all_states(maze)
epochs = 50
# Fixed number of sweeps; values are updated in place, so a state updated
# earlier in a sweep is already visible to the states updated after it
for epoch in range(epochs):
    for state in states:
        actions = get_actions(state)

        # No action available (goal cell): the value is the cell's own reward
        if not actions:
            value_table[state] = reward(state)
            continue

        # Bellman backup: keep the best expected return over the actions
        value_table[state] = np.max([
            discounted_reward(state, action, actions, NOISE, DISCOUNT)
            for action in actions
        ])

In [25]:
value_table

{(1, 1): -8.314345661888677,
 (1, 2): -5.068527807652714,
 (1, 3): -0.7268273105269234,
 (1, 4): 2.240674943093799,
 (1, 5): 5.076230385232314,
 (2, 1): -11.18557405573019,
 (2, 3): 1.8012786644521075,
 (2, 4): 4.977366222537935,
 (2, 5): 11.262059236748307,
 (3, 1): -14.320263546085572,
 (3, 3): 1,
 (3, 5): 10,
 (4, 1): -18.415011740770005,
 (4, 2): -15.323702939900052,
 (4, 3): -6.90175954097179,
 (4, 4): -6.916272320204137,
 (4, 5): 2.729007343346593,
 (5, 1): -23.435822307887513,
 (5, 2): -21.442371222702363,
 (5, 3): -16.771755048870858,
 (5, 4): -14.850798814695281,
 (5, 5): -10.95480616210691}

### Test solve

In [26]:
# Rebuild the grid from maze_str so the path can be drawn on a clean copy
maze = [[c for c in line] for line in maze_str.strip().split('\n')]
maze

[['X', 'X', 'X', 'X', 'X', 'X', 'X'],
 ['X', ' ', ' ', ' ', ' ', ' ', 'X'],
 ['X', ' ', 'X', ' ', ' ', ' ', 'X'],
 ['X', ' ', 'X', '1', 'X', '3', 'X'],
 ['X', 'S', ' ', ' ', ' ', ' ', 'X'],
 ['X', 'T', 'T', 'T', 'T', 'T', 'X'],
 ['X', 'X', 'X', 'X', 'X', 'X', 'X']]

In [27]:
# Follow the greedy policy from the start cell until a goal cell is reached,
# marking every visited cell with '*'
current_pos = START
visited = set()
while current_pos not in ENDS:
    # best_action is deterministic for a fixed value table, so coming back to
    # a cell means the walk would loop forever: fail loudly instead of hanging
    if current_pos in visited:
        raise RuntimeError(f'Greedy path loops back to {current_pos}')
    visited.add(current_pos)
    maze[current_pos[0]][current_pos[1]] = '*'
    current_pos, action = best_action(current_pos)
print(f'Stop at {maze[current_pos[0]][current_pos[1]]}')

Stop at 3


In [28]:
maze

[['X', 'X', 'X', 'X', 'X', 'X', 'X'],
 ['X', '*', '*', '*', '*', '*', 'X'],
 ['X', '*', 'X', ' ', ' ', '*', 'X'],
 ['X', '*', 'X', '1', 'X', '3', 'X'],
 ['X', '*', ' ', ' ', ' ', ' ', 'X'],
 ['X', 'T', 'T', 'T', 'T', 'T', 'X'],
 ['X', 'X', 'X', 'X', 'X', 'X', 'X']]

In [None]:
value_table[(4,4)]