### SARSA and Q-learning

In [None]:
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

#### Make an environment

In [None]:
# Make a maze

fig = plt.figure(figsize=(5, 5))
ax = plt.gca()

# draw red walls
plt.plot([1, 1], [0, 1], color='red', linewidth=2)
plt.plot([1, 2], [2, 2], color='red', linewidth=2)
plt.plot([2, 2], [2, 1], color='red', linewidth=2)
plt.plot([2, 3], [1, 1], color='red', linewidth=2)

# denote states as 'S0'~'S8'
plt.text(0.5, 2.5, 'S0', size=14, ha='center')
plt.text(1.5, 2.5, 'S1', size=14, ha='center')
plt.text(2.5, 2.5, 'S2', size=14, ha='center')
plt.text(0.5, 1.5, 'S3', size=14, ha='center')
plt.text(1.5, 1.5, 'S4', size=14, ha='center')
plt.text(2.5, 1.5, 'S5', size=14, ha='center')
plt.text(0.5, 0.5, 'S6', size=14, ha='center')
plt.text(1.5, 0.5, 'S7', size=14, ha='center')
plt.text(2.5, 0.5, 'S8', size=14, ha='center')
plt.text(0.5, 2.3, 'START', ha='center')
plt.text(2.5, 0.3, 'GOAL', ha='center')

# set the range and remove ticks
ax.set_xlim(0, 3)
ax.set_ylim(0, 3)
plt.tick_params(axis='both', which='both', bottom=False, top=False,
                labelbottom=False, right=False, left=False, labelleft=False)

# Draw green circle on 'S0' to indicate the current state
line, = ax.plot([0.5], [2.5], marker="o", color='g', markersize=60)

#### Make a random policy

In [None]:
# Set theta_0 for random policy

# row denotes state 0~7, and column denotes an action direction (order: up,right,down,left)
theta_0 = np.array([[np.nan, 1, 1, np.nan],  # s0
                    [np.nan, 1, np.nan, 1],  # s1
                    [np.nan, np.nan, 1, 1],  # s2
                    [1, 1, 1, np.nan],  # s3
                    [np.nan, np.nan, 1, 1],  # s4
                    [1, np.nan, np.nan, np.nan],  # s5
                    [1, np.nan, np.nan, np.nan],  # s6
                    [1, 1, np.nan, np.nan],  # s7、※Since s8 is the goal(terminal state), we do not need the policy
                    ])

In [None]:
# Make a random policy using theta_0

def simple_convert_into_pi_from_theta(theta):

    [m, n] = theta.shape
    pi = np.zeros((m, n))
    for i in range(0, m):
        pi[i, :] = theta[i, :] / np.nansum(theta[i, :])  # calculate proportion

    pi = np.nan_to_num(pi)  # change nan to 0

    return pi

# calculate a random policy pi_0
pi_0 = simple_convert_into_pi_from_theta(theta_0)
print(pi_0)

#### **(1) SARSA**

In [None]:
# Initialize the action-value function Q

[a, b] = theta_0.shape
Q1 = np.random.rand(a, b) * theta_0 * 0.1 # change the value of impossible action to nan

#### **(2) Q-learning**

In [None]:
# Initialize the action-value function Q

Q2 = np.random.rand(a, b) * theta_0 * 0.1 # change the value of impossible action to nan

#### Implement ε-greedy algorithm

In [None]:
def get_action(s, Q, epsilon, pi_0):
    direction = ["up", "right", "down", "left"]

    # for the exploration, we take a random action with probability epsilon, 
    # and we take an action that maximizes the Q-value with probability 1-epsilon
    if np.random.rand() < epsilon:
        # take a random action
        next_direction = np.random.choice(direction, p=pi_0[s, :])
    else:
        # take an action that maximizes the Q-value
        next_direction = direction[np.nanargmax(Q[s, :])]

    # change an action to an index
    if next_direction == "up":
        action = 0
    elif next_direction == "right":
        action = 1
    elif next_direction == "down":
        action = 2
    elif next_direction == "left":
        action = 3

    return action


def get_s_next(s, a):
    direction = ["up", "right", "down", "left"]
    next_direction = direction[a]

    # decide the next state with an action
    if next_direction == "up":
        s_next = s - 3  # if we take a 'up' action, the state number decreases by 3
    elif next_direction == "right":
        s_next = s + 1  # if we take a 'right' action, the state number increases by 1
    elif next_direction == "down":
        s_next = s + 3  # if we take a 'down' action, the state number increases by 3
    elif next_direction == "left":
        s_next = s - 1  # if we take a 'left' action, the state number decreases by 1

    return s_next

#### SARSA and Q-learning algorithm

In [None]:
# Modify the Q-value with SARSA algorithm

def Sarsa(s, a, r, s_next, a_next, Q, eta, gamma):

    if s_next == 8:  # when we reach the goal
        Q[s, a] = Q[s, a] + eta * (r - Q[s, a])

    else:
        Q[s, a] = Q[s, a] + eta * (r + gamma * Q[s_next, a_next] - Q[s, a])

    return Q


In [None]:
# Modify the Q-value with Q-learning algorithm

def Q_learning(s, a, r, s_next, Q, eta, gamma):

    if s_next == 8:  # when we reach the goal
        Q[s, a] = Q[s, a] + eta * (r - Q[s, a])

    else:
        Q[s, a] = Q[s, a] + eta * (r + gamma * np.nanmax(Q[s_next,: ]) - Q[s, a])

    return Q

In [None]:
# Make a function to escape the maze: it returns (state-action) history and the final Q-value

def goal_maze_ret_s_a_Q(Q, epsilon, eta, gamma, pi, algorithm):
    assert algorithm in {'SARSA', 'Sarsa', 'Q-learning'}, "algorithm should be one of {'SARSA', 'Sarsa', 'Q-learning'}"

    s = 0  # initial state
    a = a_next = get_action(s, Q, epsilon, pi)  # first action
    s_a_history = [[0, np.nan]]  # history of states and actions

    while (1):  
        a = a_next  # decide next action

        s_a_history[-1][1] = a
        # append the current action to history

        s_next = get_s_next(s, a)
        # decide the next state

        s_a_history.append([s_next, np.nan])
        # append the next state to history (since we do not know the next action at this point, leave it as nan)

        # get the reward and decide next action
        if s_next == 8:
            r = 1  
            a_next = np.nan
        else:
            r = 0
            a_next = get_action(s_next, Q, epsilon, pi)
        
        # modify the Q-function
        if algorithm == 'SARSA' or algorithm == 'Sarsa':
            Q = Sarsa(s, a, r, s_next, a_next, Q, eta, gamma)

        if algorithm == 'Q-learning':
            Q = Q_learning(s, a, r, s_next, Q, eta, gamma)

        # decide whether the episode ends
        if s_next == 8:  # terminate if we reach the goal
            break
        else:
            s = s_next

    return [s_a_history, Q]

#### Escape the maze with SARSA algorithm

In [None]:
eta = 0.1  # learning rate
gamma = 0.9  # discount factor
epsilon = 0.5  # initial epsilon value for epsilon-greedy algorithm
v = np.nanmax(Q1, axis=1)  # find maximum Q-value for each state
is_continue = True
episode = 1

V1 = []  # save the state-value for each episode
V1.append(np.nanmax(Q1, axis=1))

while is_continue:  
    print("epsiode: " + str(episode))

    # decrease epsilon gradually
    epsilon = epsilon / 2

    [s_a_history, Q1] = goal_maze_ret_s_a_Q(Q1, epsilon, eta, gamma, pi_0, 'Sarsa')

    # change the state-value function
    new_v = np.nanmax(Q1, axis=1)  
    print(np.sum(np.abs(new_v - v)))  # print how much the state-value function is changed
    v = new_v
    V1.append(v) # append the state-value at the end of current episode

    print("The number of steps to reach the goal is " + str(len(s_a_history) - 1))

    # repeat 100 episodes
    episode = episode + 1
    if episode > 100:
        break


#### Escape the maze with Q-learning algorithm

In [None]:
# Escape the maze

eta = 0.1  # learning rate
gamma = 0.9  # discount factor
epsilon = 0.5  # initial epsilon value for epsilon-greedy algorithm
v = np.nanmax(Q2, axis=1)  # find maximum Q-value for each state
is_continue = True
episode = 1

V2 = []  # save the state-value for each episode
V2.append(np.nanmax(Q2, axis=1))

while is_continue:  
    print("epsiode: " + str(episode))

    # decrease epsilon gradually
    epsilon = epsilon / 2

    [s_a_history, Q2] = goal_maze_ret_s_a_Q(Q2, epsilon, eta, gamma, pi_0, 'Q-learning')

    # change the state-value function
    new_v = np.nanmax(Q2, axis=1)  
    print(np.sum(np.abs(new_v - v)))  # print how much the state-value function is changed
    v = new_v
    V2.append(v) # append the state-value at the end of current episode

    print("The number of steps to reach the goal is " + str(len(s_a_history) - 1))

    # repeat 100 episodes
    episode = episode + 1
    if episode > 100:
        break


In [None]:
# Visualize the change of state-value
# c.f.) URL http://louistiao.me/posts/notebooks/embedding-matplotlib-animations-in-jupyter-notebooks/
from matplotlib import animation
from IPython.display import HTML
import matplotlib.cm as cm  # color map


def init():
    # initialize the color of background
    line.set_data([], [])
    return (line,)


def animate1(i):
    # draw a picture by frame
    # color each state based on the state-value
    line, = ax.plot([0.5], [2.5], marker="s",
                    color=cm.jet(V1[i][0]), markersize=85)  # S0
    line, = ax.plot([1.5], [2.5], marker="s",
                    color=cm.jet(V1[i][1]), markersize=85)  # S1
    line, = ax.plot([2.5], [2.5], marker="s",
                    color=cm.jet(V1[i][2]), markersize=85)  # S2
    line, = ax.plot([0.5], [1.5], marker="s",
                    color=cm.jet(V1[i][3]), markersize=85)  # S3
    line, = ax.plot([1.5], [1.5], marker="s",
                    color=cm.jet(V1[i][4]), markersize=85)  # S4
    line, = ax.plot([2.5], [1.5], marker="s",
                    color=cm.jet(V1[i][5]), markersize=85)  # S5
    line, = ax.plot([0.5], [0.5], marker="s",
                    color=cm.jet(V1[i][6]), markersize=85)  # S6
    line, = ax.plot([1.5], [0.5], marker="s",
                    color=cm.jet(V1[i][7]), markersize=85)  # S7
    line, = ax.plot([2.5], [0.5], marker="s",
                    color=cm.jet(1.0), markersize=85)  # S8
    return (line,)


# make an animation
anim = animation.FuncAnimation(
    fig, animate1, init_func=init, frames=len(V1), interval=200, repeat=False)

HTML(anim.to_jshtml())

In [None]:
def animate2(i):
    # draw a picture by frame
    # color each state based on the state-value
    line, = ax.plot([0.5], [2.5], marker="s",
                    color=cm.jet(V2[i][0]), markersize=85)  # S0
    line, = ax.plot([1.5], [2.5], marker="s",
                    color=cm.jet(V2[i][1]), markersize=85)  # S1
    line, = ax.plot([2.5], [2.5], marker="s",
                    color=cm.jet(V2[i][2]), markersize=85)  # S2
    line, = ax.plot([0.5], [1.5], marker="s",
                    color=cm.jet(V2[i][3]), markersize=85)  # S3
    line, = ax.plot([1.5], [1.5], marker="s",
                    color=cm.jet(V2[i][4]), markersize=85)  # S4
    line, = ax.plot([2.5], [1.5], marker="s",
                    color=cm.jet(V2[i][5]), markersize=85)  # S5
    line, = ax.plot([0.5], [0.5], marker="s",
                    color=cm.jet(V2[i][6]), markersize=85)  # S6
    line, = ax.plot([1.5], [0.5], marker="s",
                    color=cm.jet(V2[i][7]), markersize=85)  # S7
    line, = ax.plot([2.5], [0.5], marker="s",
                    color=cm.jet(1.0), markersize=85)  # S8
    return (line,)


# make an animation
anim = animation.FuncAnimation(
    fig, animate2, init_func=init, frames=len(V2), interval=200, repeat=False)

HTML(anim.to_jshtml())

### **Reference**
Pytorch를 활용한 강화학습/심층강화학습 실전 입문