Reference:

https://mpatacchiola.github.io/blog/2017/01/15/dissecting-reinforcement-learning-2.html

In [6]:
import numpy as np
from basic_environments.gridworld import GridWorld

## GridWorld Environment initialize

In [7]:
env = GridWorld(3, 4)

state_matrix = np.zeros((3, 4))
state_matrix[0, 3] = 1 # terminal states
state_matrix[1, 3] = 1 # terminal states
state_matrix[1, 1] = -1 # obstacle

reward_matrix = np.full((3, 4), -0.04)
reward_matrix[0, 3] = 1
reward_matrix[1, 3] = -1

# For each one of the four actions there is a probability
# This matrix defines the transition rules for all the 4 possible actions.
# The first row corresponds to the probabilities of executing each one of
# the 4 actions when the policy orders to the robot to go UP. In this case
# the transition model says that with a probability of 0.8 the robot will
# go UP, with a probaiblity of 0.1 RIGHT, 0.0 DOWN and 0.10 LEFT.
transition_matrix = np.array([[0.8, 0.1, 0.0, 0.1],
                              [0.1, 0.8, 0.1, 0.0],
                              [0.0, 0.1, 0.8, 0.1],
                              [0.1, 0.0, 0.1, 0.8]])
# Define the policy matrix
# 0=UP, 1=RIGHT, 2=DOWN, 3=LEFT, NaN=Obstacle, -1=NoAction
# This is the optimal policy for world with reward=-0.04
policy_matrix = np.array([[1,      1,  1,  -1],
                          [0, np.NaN,  0,  -1],
                          [0,      3,  3,   3]])
# Set the matrices 
env.setStateMatrix(state_matrix)
env.setRewardMatrix(reward_matrix)
env.setTransitionMatrix(transition_matrix)

In [14]:
#Reset the environment
observation = env.reset()
#Display the world printing on terminal
env.render()

 -  -  -  * 
 -  #  -  * 
 ○  -  -  - 



In [15]:
for _ in range(6):
    action = policy_matrix[observation[0], observation[1]]
    observation, reward, done = env.step(action)
    print("")
    print("ACTION: " + str(action))
    print("REWARD: " + str(reward))
    print("DONE: " + str(done))
    env.render()
    if done: break


ACTION: 0.0
REWARD: -0.04
DONE: False
 -  -  -  * 
 ○  #  -  * 
 -  -  -  - 


ACTION: 0.0
REWARD: -0.04
DONE: False
 ○  -  -  * 
 -  #  -  * 
 -  -  -  - 


ACTION: 1.0
REWARD: -0.04
DONE: False
 -  ○  -  * 
 -  #  -  * 
 -  -  -  - 


ACTION: 1.0
REWARD: -0.04
DONE: False
 -  -  ○  * 
 -  #  -  * 
 -  -  -  - 


ACTION: 1.0
REWARD: 1.0
DONE: True
 -  -  -  ○ 
 -  #  -  * 
 -  -  -  - 



### discount reward

In [19]:
def calculate_return(state_list, gamma):
    """
    params:
        state_list: a list containing a tuple (position, reward)
        gamma: discount factor
    return:
        a scalar value of total discounted return
    """
    discount_factor = 1
    total_return = 0
    for visit in state_list:
        immediate_reward = visit[1]
        total_return += discount_factor * immediate_reward
        discount_factor *= gamma
        
    return total_return

### MC with known policy

In [20]:
value_matrix = np.zeros((3, 4))

# init with 1.0e-10 to avoid division by zero
running_mean_matrix = np.full((3,4), 1.0e-10) 

gamma = 0.999
total_epoch = 50000
print_epoch = 10000

for epoch in range(total_epoch):
    # starting a new episode
    episode_list = list()
    
    # reset
    observation = env.reset(exploring_starts = False)
    for _ in range(1000):
        # Take the action from the action matrix
        action = policy_matrix[observation[0], observation[1]]
        # Move one step in the environment and get obs and reward
        observation, reward, done = env.step(action)
        # Append the visit in the episode list
        episode_list.append((observation, reward))
        if done: break
            
    # The episode is finished
    counter = 0
    # Checkup to identify if it is the first visit to a state
    checkup_matrix = np.zeros((3,4))
    # This cycle is the implementation of First-Visit MC.
    # For each state stored in the episode list it checks if it
    # is the first visit and then estimates the return.
    for visit in episode_list:
        observation = visit[0]
        row = observation[0]
        col = observation[1]
        reward = visit[1]
        if(checkup_matrix[row, col] == 0): # first visit the state [row, col]
            return_value = calculate_return(episode_list[counter:], gamma)
            running_mean_matrix[row, col] += 1
            value_matrix[row, col] += return_value
            checkup_matrix[row, col] = 1
        counter += 1
    if(epoch % print_epoch == 0):
        print("Value matrix after " + str(epoch+1) + " iterations:") 
        print(value_matrix / running_mean_matrix)

#Time to check the value matrix obtained
print("Value matrix after " + str(total_epoch) + " iterations:")
print(value_matrix / running_mean_matrix)

Value matrix after 1 iterations:
[[ 0.87712296  0.918041    0.959       1.        ]
 [ 0.83624584  0.          0.          0.        ]
 [ 0.          0.          0.          0.        ]]
Value matrix after 10001 iterations:
[[ 0.805301    0.86290038  0.91428591  1.        ]
 [ 0.75407064  0.          0.64573221 -1.        ]
 [ 0.69643872  0.6520033   0.          0.        ]]
Value matrix after 20001 iterations:
[[ 0.80787739  0.86524621  0.91642166  1.        ]
 [ 0.75674586  0.          0.65504244 -1.        ]
 [ 0.70355814  0.65563777  0.          0.        ]]
Value matrix after 30001 iterations:
[[ 0.80858277  0.86600025  0.91719579  1.        ]
 [ 0.75755278  0.          0.65622958 -1.        ]
 [ 0.70421566  0.65589214  0.          0.        ]]
Value matrix after 40001 iterations:
[[ 0.80810452  0.86559092  0.91681455  1.        ]
 [ 0.75707877  0.          0.65656877 -1.        ]
 [ 0.70245172  0.65710168  0.          0.        ]]
Value matrix after 50000 iterations:
[[ 0.8089148

The utility estimations for the states (4,1) and (3,1) are zero. This can be considered one of the limitations and at the same time one of the advantage of MC methods. 

The policy we are using, the transition probabilities, and the fact that **the robot always start from the same position (bottom-left corner) are responsible of the wrong estimation for those states**. 

Starting from the state (1,1) the robot will never reach those states and it cannot estimate the corresponding utilities. As I told you this is a problem because we cannot estimate those values but at the same time it is an advantage. In a very big grid world we can estimate the utilities only for the states we are interested in, saving time and resources and focusing only on a particular subspace of the world.

In [21]:
value_matrix = np.zeros((3, 4))

# init with 1.0e-10 to avoid division by zero
running_mean_matrix = np.full((3,4), 1.0e-10) 

gamma = 0.999
total_epoch = 50000
print_epoch = 10000

for epoch in range(total_epoch):
    # starting a new episode
    episode_list = list()
    
    # reset
    observation = env.reset(exploring_starts = True)
    for _ in range(1000):
        # Take the action from the action matrix
        action = policy_matrix[observation[0], observation[1]]
        # Move one step in the environment and get obs and reward
        observation, reward, done = env.step(action)
        # Append the visit in the episode list
        episode_list.append((observation, reward))
        if done: break
            
    # The episode is finished
    counter = 0
    # Checkup to identify if it is the first visit to a state
    checkup_matrix = np.zeros((3,4))
    # This cycle is the implementation of First-Visit MC.
    # For each state stored in the episode list it checks if it
    # is the first visit and then estimates the return.
    for visit in episode_list:
        observation = visit[0]
        row = observation[0]
        col = observation[1]
        reward = visit[1]
        if(checkup_matrix[row, col] == 0): # first visit the state [row, col]
            return_value = calculate_return(episode_list[counter:], gamma)
            running_mean_matrix[row, col] += 1
            value_matrix[row, col] += return_value
            checkup_matrix[row, col] = 1
        counter += 1
    if(epoch % print_epoch == 0):
        print("Value matrix after " + str(epoch+1) + " iterations:") 
        print(value_matrix / running_mean_matrix)

#Time to check the value matrix obtained
print("Value matrix after " + str(total_epoch) + " iterations:")
print(value_matrix / running_mean_matrix)

Value matrix after 1 iterations:
[[ 0.        0.918041  0.959     1.      ]
 [ 0.        0.        0.        0.      ]
 [ 0.        0.        0.        0.      ]]
Value matrix after 10001 iterations:
[[ 0.80895536  0.86652174  0.91826584  1.        ]
 [ 0.75498824  0.          0.66168892 -1.        ]
 [ 0.6999732   0.64864294  0.61078822  0.32014546]]
Value matrix after 20001 iterations:
[[ 0.80795984  0.86578339  0.91728605  1.        ]
 [ 0.7552574   0.          0.66454441 -1.        ]
 [ 0.69752128  0.64614438  0.61276607  0.36230217]]
Value matrix after 30001 iterations:
[[ 0.80918353  0.86557823  0.9167413   1.        ]
 [ 0.75719981  0.          0.66114796 -1.        ]
 [ 0.69778092  0.64462955  0.60876963  0.38806528]]
Value matrix after 40001 iterations:
[[ 0.80810371  0.86504114  0.9164067   1.        ]
 [ 0.75639743  0.          0.65687153 -1.        ]
 [ 0.69748257  0.64470798  0.60605839  0.38750324]]
Value matrix after 50000 iterations:
[[ 0.80797203  0.8651374   0.9165257

To solve the issue that not every state has value, we could explore the state randomly. 

To enable the exploring starts in our code the only thing to do is to set the parameter **exploring_strarts** in the **reset() function to True**

### MC without known policy -- Monte Carlo control

In [22]:
def calculate_q_return(state_list, gamma):
    """
    params:
        state_list: list of tuple (observation, action, reward)
    
    return q value of (observation, action)
    """
    
    total_q_return = 0.0
    discount_factor = 1.0
    for visit in state_list:
        imm_reward = visit[2]
        total_q_return += discount_factor * imm_reward
        discount_factor *= gamma
        
    return total_q_return

In [23]:
# could imporve the performance
def improve_policy(episode_list, policy_matrix, state_action_matrix):
    """
    Imporve the policy by current new q values
    return:
        updated improved policy
    """
    shape = policy_matrix.shape
    for visit in episode_list:
        observation = visit[0]
        col = observation[1] + (observation[0] * shape[1])
        if policy_matrix[observation[0], observation[1]] >= 0:
            policy_matrix[observation[0], observation[1]] = np.argmax(state_action_matrix[:, col])
    return policy_matrix

In [30]:
def print_policy(policy):
    """
    param:
        policy: action: 0=Up, 1=right, 2=Down, 3=left, -1=terminal, -2=obstacle. Shape: (state_num, )
    return: null
    """
    
    def action_translate(index):
        if index == -1: return " * "
        elif index == 0: return " ^ "
        elif index == 1: return " > "
        elif index == 2: return " v "
        elif index == 3: return " < "
        elif index == -2: return " # "
        else: return " - "
        
    v_action_translate = np.vectorize(action_translate)
    symbols = v_action_translate(policy)
    print(symbols)

In [50]:
shape = (3, 4)

env = GridWorld(shape[0], shape[1])

#Define the state matrix
state_matrix = np.zeros(shape)
state_matrix[0, 3] = 1
state_matrix[1, 3] = 1
state_matrix[1, 1] = -1

#Define the reward matrix
reward_matrix = np.full(shape, -0.04)
reward_matrix[0, 3] = 1
reward_matrix[1, 3] = -1

transition_matrix = np.array([[0.8, 0.1, 0.0, 0.1],
                              [0.1, 0.8, 0.1, 0.0],
                              [0.0, 0.1, 0.8, 0.1],
                              [0.1, 0.0, 0.1, 0.8]])

# Random policy matrix
policy_matrix = np.random.randint(low=0, high=4, 
                                  size=shape)
policy_matrix[1,1] = -2 # -2 for the obstacle at (1,1)
policy_matrix[0,3] = policy_matrix[1,3] = -1 #No action (terminal states)

#Set the matrices in the world
env.setStateMatrix(state_matrix)
env.setRewardMatrix(reward_matrix)
env.setTransitionMatrix(transition_matrix)

# State-action matrix (init to zeros or to random values)
state_action_matrix = np.zeros((4, shape[0] * shape[1])) # Q

# init with 1.0e-10 to avoid division by zero
running_mean_matrix = np.full(state_action_matrix.shape, 1.0e-10) 

total_epoch = 100000
print_epoch = 200000
gamma = 0.999
epsilon = 0.1

for epoch in range(total_epoch):
    episode_list = list()
    observation = env.reset(exploring_starts = True)
    
    for _ in range(1000):
        # Take the action from policy matrix
        action = policy_matrix[observation[0], observation[1]]
        if np.random.rand() < epsilon: # 10% random choose
            action = np.random.randint(0, 4)
            
        new_observation, reward, done = env.step(action)
        episode_list.append((observation, action, reward))
        observation = new_observation
        if done: break
            
    # The episode is finished.
    counter = 0
    # Checkup to identify if it is the first visit to a state-action
    checkup_matrix = np.zeros_like(state_action_matrix)
    # This cycle is the implementation of First-Visit MC.
    # For each state-action stored in the episode list it checks if 
    # it is the first visit and then estimates the return. 
    # This is the Evaluation step of the GPI.
    for visit in episode_list:
        observation = visit[0]
        action = visit[1]
        col = observation[1] + (observation[0]*shape[1])
        row = action
        if(checkup_matrix[row, col] == 0):
            return_value = calculate_q_return(episode_list[counter:], gamma)
            running_mean_matrix[row, col] += 1
            state_action_matrix[row, col] += return_value
            checkup_matrix[row, col] = 1
        counter += 1
    # Policy Update (Improvement)
    policy_matrix = improve_policy(episode_list, 
                                   policy_matrix, 
                                   state_action_matrix/running_mean_matrix)
    # Printing
    if(epoch + 1 % print_epoch == 0):
        print("")
        print("State-Action matrix after " + str(epoch+1) + " iterations:") 
        print(state_action_matrix / running_mean_matrix)
        print("Policy matrix after " + str(epoch+1) + " iterations:") 
        print(policy_matrix)
        print_policy(policy_matrix)
    
# Time to check the utility matrix obtained
print("Utility matrix after " + str(total_epoch) + " iterations:")
print(state_action_matrix / running_mean_matrix)
print(policy_matrix)
print_policy(policy_matrix)


State-Action matrix after 1 iterations:
[[ 0.        0.        0.        0.        0.        0.        0.        0.
   0.        0.        0.        0.      ]
 [ 0.        0.        0.        0.        0.        0.        0.        0.
   0.        0.        0.       -1.077961]
 [ 0.        0.        0.        0.        0.        0.        0.        0.
   0.        0.        0.        0.      ]
 [ 0.        0.        0.        0.        0.        0.        0.        0.
   0.        0.        0.        0.      ]]
Policy matrix after 1 iterations:
[[ 1  1  1 -1]
 [ 0 -2  3 -1]
 [ 2  3  3  0]]
[[' > ' ' > ' ' > ' ' * ']
 [' ^ ' ' # ' ' < ' ' * ']
 [' v ' ' < ' ' < ' ' ^ ']]


KeyboardInterrupt: 

In [49]:
state_action_value = state_action_matrix / running_mean_matrix
optimal_value = np.argmax(state_action_value, axis = 0)
optimal_value = state_action_value[optimal_value, range(len(optimal_value))]
print(optimal_value.reshape(3, 4))

[[ 0.82247475  0.88623304  0.9494786   0.        ]
 [ 0.76526165  0.          0.68211216  0.        ]
 [ 0.70197114  0.64712202  0.59587756  0.34773295]]
