In [1]:
import numpy as np
import gym

In [2]:
def action_value_function(state, action, V, transition_matrix, gamma):
    """(Helper method)
    One-step lookahead: expected value of taking `action` in `state`.

    Every possible outcome of the (state, action) pair contributes its
    immediate reward plus the discounted value of the state it leads to,
    weighted by the probability of that outcome.

    Args:
        state: state the agent is currently in.
        action: action to evaluate from that state.
        V: state values, indexable by next state, used to score the successors.
        transition_matrix: MDP dynamics; transition_matrix[state][action] is an
            iterable of 4-tuples (probability, next state, reward, done).
        gamma: discount factor applied to the successor's value.

    Returns:
        The q value (action value) of the given state/action pair. It is 0 when
        the pair has no listed outcomes.
    """
    outcomes = transition_matrix[state][action]

    expected_return = 0
    # The "done" flag of each outcome is not needed for the backup itself
    for probability, successor, immediate_reward, _done in outcomes:
        backed_up_value = immediate_reward + gamma * V[successor]
        expected_return += probability * backed_up_value

    return expected_return

In [3]:
def value_iteration(env, gamma, stop_callback):
    """Value iteration: compute state values of a fully known MDP by sweeping.

    Starting from an all-zero value vector (zero time steps left, so nothing
    can be earned any more), each sweep computes the values for "k steps left"
    from the values for "k-1 steps left": the new value of a state is the best
    action value obtainable from it, judged against the previous sweep's
    values. This is a tabulated Dynamic Programming scheme, the sub-problems
    (values with fewer steps left) are reused by every state of the next sweep.

    This is a planning method, not Reinforcement Learning: the states, actions,
    rewards and transition probabilities must all be known up front, which is
    also its main limitation together with the cost on very large problems.

    Args:
        env: The Markov Decision Process. It must expose `nS` (number of
            states), `nA` (number of actions) and `P` (transition table, as
            consumed by `action_value_function`).
        gamma: The discount factor.
        stop_callback: Called after every sweep as
            stop_callback(k, V_prev, V) with the 1-based sweep index, the
            values before the sweep and the values after it. Return True to
            stop, False to keep sweeping. This lets the caller stop after a
            fixed number of sweeps or once the change between sweeps is small
            enough to be considered converged.

    Returns:
        A numpy array with the state values at the moment the sweeps stopped.
    """
    # Hard ceiling on the sweep counter so the loop cannot run forever
    sweep_limit = int(1E9)

    # Zero steps left: the game is over, every state is worth 0
    V = np.zeros(env.nS)

    k = 1
    while k < sweep_limit:
        # V is overwritten in place below, so freeze the previous sweep's
        # values first; every backup of this sweep must read from them
        V_prev = V.copy()

        for state in range(env.nS):
            # Value of a state = best action value available from it
            V[state] = max(
                action_value_function(state, action, V_prev, env.P, gamma)
                for action in range(env.nA)
            )

        # Let the caller decide whether we are done
        if stop_callback(k, V_prev, V):
            break
        k += 1

    return V

In [4]:
def policy_extraction(env, V, gamma):
    """Derive a greedy policy from a vector of state values.

    For every state, each action is scored with a one-step lookahead
    (its expected reward plus discounted value of where it may land) and
    the highest-scoring action is kept. Scoring by expectation, rather than
    by "which neighbor state looks best", matters in stochastic environments:
    an action can fail and drop the agent in a very bad state, and only the
    expectation accounts for that risk.

    Args:
        env: MDP exposing `nS`, `nA` and the transition table `P`
        V: Vector of state values
        gamma: discount factor

    Returns:
        Integer numpy array of length env.nS; entry s is the action chosen
        for state s (the first one in case of ties, as np.argmax does).
    """
    state_count = env.nS
    policy = np.zeros(state_count, dtype=int)

    for state in range(state_count):
        action_scores = []
        for action in range(env.nA):
            score = action_value_function(state, action, V, env.P, gamma)
            action_scores.append(score)
        policy[state] = np.argmax(action_scores)

    return policy
    

In [14]:
def run_episode(env, policy, render=True):
    """Play one episode in the environment, always obeying the given policy

        Args:
            env: environment to interact with; it must provide reset(),
                step(action) returning a 4-tuple (state, reward, done, info)
                and, when render is True, render()
            policy: lookup from state to action (indexed as policy[state])
            render: whether to call env.render() before each step to
                display the environment

        Returns:
            The sum of the rewards collected until the episode finished
    """
    state = env.reset()
    episode_return = 0

    # At least one step is always taken; the loop ends as soon as the
    # environment reports that the episode is over
    while True:
        if render:
            env.render()

        chosen_action = policy[state]
        state, reward, finished, _info = env.step(chosen_action)
        episode_return += reward

        if finished:
            break

    return episode_return

In [26]:
# Discount factor
gamma = 1


def stop_evaluator(k, V_prev, V):
    """Stop criterion for value_iteration.

    Returns True (stop) once the summed absolute change of the state values
    between two consecutive iterations is at most 1E-9. The iteration index
    k is accepted because value_iteration passes it, but it is not used.
    """
    return np.sum(np.fabs(V_prev - V)) <= 1E-9


# MDP model of the environment
# NOTE(review): .unwrapped is presumably needed so that nS, nA and P are
# reachable on the returned object -- confirm against the installed gym version
env = gym.make('FrozenLake-v0').unwrapped

# Everything that uses the environment lives inside try/finally so that
# env.close() still runs if planning or the episode raises
try:
    # Get the optimal state values via value_iteration
    V = value_iteration(env, gamma, stop_evaluator)

    # Get the optimal policy via policy_extraction with the optimal values
    policy = policy_extraction(env, V, gamma)

    # Use the optimal policy to run an episode and print also the reward
    total_reward = run_episode(env, policy)
    print(f'Reward: {total_reward}')
finally:
    env.close()


[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
SFFF
[41mF[0mHFH
FFFH
HFFG
  (Left)
SFFF
[41mF[0mHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
[41mS[0mFFF
FHFH
FFFH
HFFG
  (Left)
SFFF
[41mF[0mHFH
FFFH
HFFG
  (Left)
SFFF
FHFH
[41mF[0mFFH
HFFG
  (Up)
SFFF
FHFH
[41mF[0mFFH
HFFG
  (Up)
SFFF
FHFH
[41mF[0mFFH
HFFG
  (Up)
SFFF
[41mF[0mHFH
FFFH
HFFG
  (Left)
SFFF
FHFH
[41mF[0mFFH
HFFG
  (Up)
SFFF
FHFH
[41mF[0mFFH
HFFG
  (Up)
SFFF
[41mF[0mHFH
FFFH
HFFG
  (Left)
SFFF
FHFH
[41mF[0mFFH
HFFG
  (Up)
SFFF
FHFH
F[41mF[0mFH
HFFG
  (Down)
SFFF
FHFH
FFFH
H[41mF[0mFG
  (Right)
SFFF
FHFH
FFFH
H[41mF[0mFG
  (Right)
SFFF
FHFH
F[41mF[0mFH
HFFG
  (Down)
SFFF
FHFH
[41mF[0mFFH
HFFG
  (Up)
SFFF
FHFH
[41mF[0mFFH
HFFG
  (Up)
SFFF
[41mF[0mHFH
FFFH
HFFG
  (Left)
SFFF
[41mF[0mHFH
FFFH
HFFG
  (Left)
SFFF
FHFH
[41mF[0mFFH
HFFG
  (Up)
SFFF
FHFH
[41mF[0mFFH
HFF