In [1]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from IPython.display import HTML

> Making Environments

In [2]:
# Environment: a 4x4 grid world. The agent starts at (0, 0) and has to reach the goal at (3, 3).
# On every step it moves to one of the 4 neighbouring cells.
# Entering the trap at (2, 1) gives a reward of -100, entering the goal gives +100;
# both cells are terminal. Bumping into a wall costs -5.

goal_pos = (3, 3)  # (row, col) of the goal
trap_pos = (2, 1)  # (row, col) of the trap

# Per-state value estimates, one table per algorithm.
value_PI = np.zeros((4, 4))  # used by policy iteration
value_VI = np.zeros((4, 4))  # used by value iteration

# Rewards[r, c] is the reward received on arriving at cell (r, c).
Rewards = np.zeros((4, 4))
Rewards[goal_pos], Rewards[trap_pos] = 100, -100

# Action names in the order they are tried; 'Stay' is meant for terminal cells only.
directions = ['Left', 'Up', 'Right', 'Down', 'Stay']
state = np.array([0, 0])

# (row, col) displacement applied by each action name.
action = {
    'Up': np.array([-1, 0]),
    'Down': np.array([1, 0]),
    'Right': np.array([0, 1]),
    'Left': np.array([0, -1]),
    'Stay': np.array([0, 0]),
}

# Hand-written initial policy for policy iteration:
# policy_PI[r][c] is the action taken in cell (r, c).
policy_PI = [
    ['Right', 'Up', 'Left', 'Down'],
    ['Right', 'Up', 'Left', 'Down'],
    ['Right', 'Stay', 'Left', 'Down'],
    ['Right', 'Up', 'Left', 'Stay'],
]

# Discount factor, shared by policy iteration and value iteration.
gamma = 0.9

# Policy iteration schedule: number of improvement rounds and evaluation sweeps per round.
num_improve = 10
num_eval_per_improve = 30

# Value iteration schedule: number of sweeps.
num_value_iteration = 1000

def update_state(state, direction):
    """Apply one action on the 4x4 grid and report whether a wall was hit.

    Args:
        state: current (row, col) position.
        direction: key into the module-level ``action`` table.

    Returns:
        ``(next_state, collided)``. In the goal or trap cell the state is
        returned unchanged with ``collided`` False. A move that would leave
        the grid returns the unchanged state with ``collided`` True.
    """
    # Terminal cells absorb every action.
    terminal = np.array_equal(state, goal_pos) or np.array_equal(state, trap_pos)
    if terminal:
        return state, False

    candidate = state + action[direction]
    row, col = candidate[0], candidate[1]
    inside = (0 <= row < 4) and (0 <= col < 4)
    # Off-grid moves leave the agent where it was and flag the collision.
    return (candidate, False) if inside else (state, True)


Function Visualizing Policy

In [3]:
def plot_arrows(policy):
    """Show a policy as a 4x4 grid of panels with one arrow per cell.

    ``policy[i][j]`` is the action name for cell (i, j). 'Up', 'Down', 'Left'
    and 'Right' are drawn as arrows; any other entry (e.g. 'Stay') leaves the
    panel empty.
    """
    # (dx, dy) of the arrow drawn for each move.
    arrow_delta = {
        'Up': (0, 0.5),
        'Down': (0, -0.5),
        'Left': (-0.5, 0),
        'Right': (0.5, 0),
    }

    fig, ax = plt.subplots(4, 4)
    for i, axes_row in enumerate(ax):
        for j, panel in enumerate(axes_row):
            delta = arrow_delta.get(policy[i][j])
            if delta is not None:
                dx, dy = delta
                panel.arrow(0, 0, dx, dy, head_width=0.2, head_length=0.2, fc='k', ec='k')

            # Same fixed frame for every panel, without tick marks.
            panel.set_xlim([-1, 1])
            panel.set_ylim([-1, 1])
            panel.set_xticks([])
            panel.set_yticks([])

    plt.show()


> Policy Evaluation

In [4]:
def policy_eval(policy, value):
    """Run one synchronous sweep of policy evaluation.

    Args:
        policy: 4x4 nested list; ``policy[i][j]`` is the action name for cell (i, j).
        value: 4x4 array with the current value estimates (not modified).

    Returns:
        A new 4x4 array. Each non-terminal cell holds the reward of the cell
        reached by following ``policy`` plus ``gamma`` times that cell's
        current value, minus 5 if the move hit a wall. Terminal cells are 0.
    """
    updated = np.zeros((4, 4))
    for cell in np.ndindex(4, 4):
        if cell in (trap_pos, goal_pos):
            continue  # terminal cells keep the value 0 they were initialised with
        i, j = cell
        nxt, bumped = update_state(np.array([i, j]), policy[i][j])
        landing = tuple(nxt)
        backup = Rewards[landing] + gamma * value[landing]
        if bumped:
            backup -= 5  # wall-collision penalty
        updated[cell] = backup
    return updated


> Policy Improvement

In [5]:
def policy_improve(value):
    """Return the policy that acts greedily with respect to ``value``.

    For every non-terminal cell the four moves are tried in the order given by
    ``directions``; the one with the largest one-step lookahead
    ``Rewards[s'] + gamma * value[s']`` is selected (first one wins on ties).
    Moves that would hit a wall are skipped rather than scored.

    'Stay' is reserved for the terminal cells (goal and trap) and is never
    offered to a non-terminal cell. Without that restriction a cell whose own
    discounted value beats all of its neighbours would choose to stand still
    forever, which is not a move the environment allows.

    Args:
        value: 4x4 array of state values.

    Returns:
        4x4 nested list of action names.
    """
    new_policy = [[None] * 4 for _ in range(4)]
    for i in range(4):
        for j in range(4):
            if (i, j) == trap_pos or (i, j) == goal_pos:
                new_policy[i][j] = 'Stay'  # terminate
                continue

            max_value = float('-inf')
            max_action = None
            for direction in directions:
                if direction == 'Stay':
                    continue  # only terminal cells may stay in place
                state_prime, collision = update_state(np.array([i, j]), direction)
                if collision:
                    continue  # moves into a wall are not considered
                value_prime = Rewards[tuple(state_prime)] + gamma * value[tuple(state_prime)]
                if value_prime > max_value:
                    max_value = value_prime
                    max_action = direction
            new_policy[i][j] = max_action
    return new_policy


Policy Iteration

In [None]:
# Policy iteration: alternate a fixed number of evaluation sweeps with one greedy improvement.
for improve_round in range(num_improve):
    # Approximate evaluation of the current policy.
    for eval_sweep in range(num_eval_per_improve):
        value_PI = new_value_PI = policy_eval(policy_PI, value_PI)
    # Act greedily with respect to the freshly evaluated values.
    policy_PI = policy_improve(value_PI)

# Report the sweep budget, the final policy (as text and as arrows) and the value table.
print('total iterations:', num_improve*num_eval_per_improve)
print(policy_PI)
plot_arrows(policy_PI)
print(value_PI)

Value Iteration

In [7]:
def update_value(value):
    """Run one synchronous sweep of value iteration.

    For every non-terminal cell the four moves are tried in the order given by
    ``directions``; the cell's new value is the largest one-step lookahead
    ``Rewards[s'] + gamma * value[s']`` and the move that achieves it (first
    one on ties) is recorded. Moves that would hit a wall are skipped rather
    than scored.

    'Stay' is reserved for the terminal cells (goal and trap) and is never
    offered to a non-terminal cell; otherwise a cell could back up its own
    discounted value instead of the value of a real move.

    Args:
        value: 4x4 array with the current value estimates (not modified).

    Returns:
        ``(new_value, opt_policy)``: the updated 4x4 value array (terminal
        cells are 0) and the 4x4 nested list of action names chosen for it.
    """
    new_value = np.zeros((4, 4))
    opt_policy = [[None] * 4 for _ in range(4)]
    for i in range(4):
        for j in range(4):
            if (i, j) == trap_pos or (i, j) == goal_pos:
                opt_policy[i][j] = 'Stay'  # terminate
                new_value[i, j] = 0
                continue

            max_value = float('-inf')
            max_action = None
            for direction in directions:
                if direction == 'Stay':
                    continue  # only terminal cells may stay in place
                state_prime, collision = update_state(np.array([i, j]), direction)
                if collision:
                    continue  # moves into a wall are not considered
                value_prime = Rewards[tuple(state_prime)] + gamma * value[tuple(state_prime)]
                if value_prime > max_value:
                    max_value = value_prime
                    max_action = direction
            opt_policy[i][j] = max_action
            new_value[i, j] = max_value

    return new_value, opt_policy

In [None]:
# Value iteration: repeatedly apply the Bellman optimality backup for a fixed number of sweeps.
# Each sweep also returns the greedy policy that produced the new values.
for sweep in range(num_value_iteration):
    new_value_VI, policy_VI = update_value(value_VI)
    value_VI = new_value_VI

# Report the sweep budget, the final policy (as text and as arrows) and the value table.
print('total iterations:', num_value_iteration)
print(policy_VI)
plot_arrows(policy_VI)
print(value_VI)

Visualize your result

In [9]:
# Roll out the chosen policy from the start cell and render the walk as an animation.
robot_pos = np.array((0, 0))  # starting position of the robot

policy = policy_VI  # choose from policy_PI, policy_VI
model = update_state  # transition function: (state, direction) -> (next_state, collided)

# Draw on an explicit Figure/Axes pair instead of pyplot's implicit "current"
# figure, so the frames cannot land on (and plt.close cannot close) another figure.
fig, ax = plt.subplots(figsize=(5, 5))

def animate(i):
    """Draw frame ``i`` of the rollout, then advance the robot by one step.

    Frame 0 resets the robot to (0, 0) so the rollout restarts whenever the
    first frame is drawn. The position is kept in the module-level
    ``robot_pos`` between calls.
    """
    global robot_pos
    if i == 0:
        robot_pos = np.array((0, 0))  # starting position of the robot

    ax.cla()
    ax.set_xlim(-0.5, 3.5)
    ax.set_ylim(3.5, -0.5)  # upper limit first: row 0 is drawn at the top
    ax.set_xticks([-0.5, 0.5, 1.5, 2.5, 3.5])
    ax.set_yticks([-0.5, 0.5, 1.5, 2.5, 3.5])
    ax.set_aspect('equal', adjustable='box')

    # Plot coordinates are (x, y) = (column, row).
    ax.plot(robot_pos[1], robot_pos[0], 'ro', markersize=20)
    ax.plot(goal_pos[1], goal_pos[0], 'g*', markersize=40)
    ax.plot(trap_pos[1], trap_pos[0], 'bs', markersize=30)
    ax.grid(True, linewidth=0.5, color='gray')

    # Move along the policy; the collision flag is not needed for drawing.
    robot_pos, _ = model(robot_pos, policy[robot_pos[0]][robot_pos[1]])

# Creating the animation
anim = FuncAnimation(fig, animate, frames=range(7), interval=100)
ani = HTML(anim.to_jshtml())
plt.close(fig)  # keep the static figure from being shown next to the animation
ani