In [45]:
import time
import numpy as np
from IPython.display import clear_output

R = 5
C = 5
A = 5

N = R * C

epsilon = 0.1
# epsilon = 0.0

action_grid = [
[1, 1, 1, 1, 1],
[1, 1, 1, 1, 1],
[1, 1, 1, 1, 1],
[1, 1, 1, 1, 1],
[1, 1, 1, 1, 1],
]
# action_grid = (np.random.rand(R, C) * A).astype(int)

# Policy Matrix.
# pi[i][j] mean the probability of `at state i choose action j`.
pi = np.zeros((N, A))
for r_ in range(R):
    for c_ in range(C):
        pi[r_ * C + c_] = 1 / A

# Block states.
blocks = np.array([
[-1,  -1,  -1,  -1, -1],
[-1, -10, -10,  -1, -1],
[-1,  -1, -10,  -1, -1],
[-1, -10,   0, -10, -1],
[-1, -10,  -1,  -1, -1],
])
# blocks = np.array([
# [0,   0,   0,   0,  0],
# [0, -10, -10,   0,  0],
# [0,   0, -10,   0,  0],
# [0, -10,   1, -10,  0],
# [0, -10,   0,   0,  0],
# ])

action_to_direction = {}
action_to_direction[0] = np.array([-1, 0])
action_to_direction[1] = np.array([0, 1])
action_to_direction[2] = np.array([1, 0])
action_to_direction[3] = np.array([0, -1])
action_to_direction[4] = np.array([0, 0])

gamma = 0.9

# State Values.
v = np.ones((R * C, 1))

# Episode Length.
# If not decrease epsilon, this E should be large for convergence.
E = 100

# Sarsa step.
sarsa_n = 3

q_table = np.zeros((N, A))

num_episode = 300

alpha = 0.1
r_start = 0
c_start = 0
r_target = 3
c_target = 2

for episode in range(num_episode):
    # 1. Random start.
    state = (r_start, c_start)
    a_start = np.random.choice(range(A), size=1, replace=False, p=pi[r_start * C + c_start])[0]

    # 2. Generate a Episode.
    episode_history = []
    for e in range(E):
        r_, c_ = state
        a = a_start
        if e != 0:
            a = np.random.choice(range(A), size=1, replace=False, p=pi[r_ * C + c_])[0]

        delta = action_to_direction[a]
        r_p = r_ + delta[0]
        c_p = c_ + delta[1]

        if r_p >= R or r_p < 0 or c_p >= C or c_p < 0:
            reward = -10
            next_state = (r_, c_)
        else:
            reward = blocks[r_p][c_p]
            next_state = (r_p, c_p)

        # # 1. Naive Sarsa.
        # if len(episode_history) > 0:
        #     (r_last, c_last), a_last, reward_last = episode_history[-1]
        #     TD_target = reward_last + gamma * q_table[r_ * C + c_][a]
        #     TD_error = q_table[r_last * C + c_last][a_last] - TD_target
        #     q_table[r_last * C + c_last][a_last] -= alpha * TD_error
        #     # Update Policy.
        #     max_a = np.argmax(q_table[r_last * C + c_last])
        #     pi[r_last * C + c_last] = epsilon / A
        #     pi[r_last * C + c_last][max_a] = 1 - epsilon / A * (A - 1)


        # # 2. Expected Sarsa.
        # r_next, c_next = next_state
        # # Update q-value.
        # TD_target = reward
        # for a_ in range(A):
        #     TD_target += gamma * pi[r_next * C + c_next][a_] * q_table[r_next * C + c_next][a_]
        # TD_error = q_table[r_ * C + c_][a] - TD_target
        # q_table[r_ * C + c_][a] -= alpha * TD_error
        # # Update Policy.
        # max_a = np.argmax(q_table[r_ * C + c_])
        # pi[r_ * C + c_] = epsilon / A
        # pi[r_ * C + c_][max_a] = 1 - epsilon / A * (A - 1)

        # 3. n-step Sarsa.
        if len(episode_history) > sarsa_n - 1:
            head_idx = -(sarsa_n)
            (r_head, c_head), a_head, reward_head = episode_history[head_idx]
            TD_target = reward_head
            g = gamma
            for i in range(1, sarsa_n + 1):
                (r_last, c_last), a_last, reward_last = episode_history[head_idx + i]
                if i != sarsa_n:
                    TD_target += g * reward_last
                else:
                    TD_target += g * q_table[r_ * C + c_][a]
                g *= gamma
            TD_error = q_table[r_head * C + c_head][a_head] - TD_target
            q_table[r_head * C + c_head][a_head] -= alpha * TD_error
            # Update Policy.
            max_a = np.argmax(q_table[r_head * C + c_head])
            pi[r_head * C + c_head] = epsilon / A
            pi[r_head * C + c_head][max_a] = 1 - epsilon / A * (A - 1)

        episode_history.append((state, a, reward))
        state = next_state

        if next_state[0] == r_target and next_state[1] == c_target:
            break

    # 3. n-step Sarsa. Coner cases.
    for sarsa_n_corner in range(1, sarsa_n):
        if len(episode_history) <= sarsa_n_corner - 1:
            continue
        head_idx = -(sarsa_n_corner)
        (r_head, c_head), a_head, reward_head = episode_history[head_idx]
        TD_target = reward_head
        g = gamma
        for i in range(1, sarsa_n_corner + 1):
            (r_last, c_last), a_last, reward_last = episode_history[head_idx + i]
            if i != sarsa_n_corner:
                TD_target += g * reward_last
            else:
                TD_target += g * q_table[state[0] * C + state[1]][a]
            g *= gamma
        TD_error = q_table[r_head * C + c_head][a_head] - TD_target
        q_table[r_head * C + c_head][a_head] -= alpha * TD_error
        # Update Policy.
        max_a = np.argmax(q_table[r_head * C + c_head])
        pi[r_head * C + c_head] = epsilon / A
        pi[r_head * C + c_head][max_a] = 1 - epsilon / A * (A - 1)

    # Enhance Exploitation.
    # if epsilon > 0.001:
    #     epsilon -= 0.001

    time.sleep(0.2)
    clear_output(wait=True)

    print('len(episode_history):', len(episode_history))
    action_grid = np.zeros((R, C))
    for r_ in range(R):
        for c_ in range(C):
            a = np.argmax(pi[r_ * C + c_])
            action_grid[r_][c_] = a
    print(f'{episode}th Action Grid:\n', action_grid)

clear_output(wait=True)

action_grid = np.zeros((R, C))
for r_ in range(R):
    for c_ in range(C):
        a = np.argmax(pi[r_ * C + c_])
        action_grid[r_][c_] = a
print(f'Converged Action Grid:\n', action_grid, flush=True)



Converged Action Grid:
 [[1. 1. 1. 2. 2.]
 [2. 2. 2. 2. 3.]
 [2. 1. 2. 1. 2.]
 [1. 1. 0. 3. 2.]
 [4. 1. 0. 3. 3.]]


In [46]:
print(action_grid)

[[1. 1. 1. 2. 2.]
 [2. 2. 2. 2. 3.]
 [2. 1. 2. 1. 2.]
 [1. 1. 0. 3. 2.]
 [4. 1. 0. 3. 3.]]
