# 2. Hands on Reinforcement Learning

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import gym
import numpy as np
import time
import math

In [3]:
env = gym.make('FrozenLake-v0')

### Utility functions

In [4]:
def getReward(env):
    n_states, n_actions = env.observation_space.n, env.action_space.n
    
    R = np.zeros((n_states, n_actions))
    for s in range(n_states):
        for a, moves in env.env.P[s].items():
            for possible_move in moves:
                prob, _, r, _ = possible_move
                R[s, a] += r * prob
    
    return R

def getProb(env):
    n_states, n_actions = env.observation_space.n, env.action_space.n
    
    P = np.zeros((n_states, n_actions, n_states))
    for s in range(n_states):
        for a in range(n_actions):
            for moves in env.env.P[s][a]:
                prob, next_s, _, _ = moves
                P[s, a, next_s] += prob
    
    return P

def print_value(V, width=4, height=4):
    return np.around(np.resize(V, (width, height)), 4)

# let's plot the policy matrix (as in Part 1). according to
# https://github.com/openai/gym/blob/master/gym/envs/toy_text/frozen_lake.py
# LEFT = 0   DOWN = 1   RIGHT = 2  UP = 3
def print_policy(V, width=4, height=4):
    table = {0: "←", 1: "↓", 2: "→", 3: "↑"}
    policy = np.resize(V, (width, height))
    
    # transform using the dictionary
    return np.vectorize(table.get)(policy)

### Policy Iteration

In [5]:
# to evaluate the policy, as there is no max in the equation we can just solve
# the linear system
def policy_evaluation(pi, P, R, gamma, n_states):
    p = np.zeros((n_states, n_states))
    r = np.zeros((n_states, 1))
    
    for s in range(n_states):
        r[s] = R[s, pi[s]]
        p[s, :] = P[s, pi[s], :]
    
    # we take [:, 0] to return a vector because otherwise we have
    # a matrix of size (# states, 1)
    return np.linalg.inv((np.eye(n_states) - gamma * p)).dot(r)[:, 0]

def policy_iteration(env, epsilon, gamma, max_iter=10000):
    n_states, n_actions = env.observation_space.n, env.action_space.n
    
    # initialize arbitrary value function
    V = np.zeros(n_states)
    
    # initialize arbitrary policy
    pi = np.ones(n_states, dtype=int)
    
    R = getReward(env)
    P = getProb(env)
    
    i = 0
    
    while True and i < max_iter:
        V_prev = V.copy()
        
        # evaluate the policy
        V = policy_evaluation(pi, P, R, gamma, n_states)
        
        # policy improvement
        for s in range(n_states):
            pi[s] = np.argmax(R[s,:] + gamma * P[s, :, :].dot(V)) 
        
        if np.linalg.norm(V_prev - V) < epsilon:
            print("Policy iteration converged after ", i+1, "epochs")
            break
        
        i += 1
    
    return V, pi

In [6]:
V, pi = policy_iteration(env, 1e-8, 0.8)

Policy iteration converged after  4 epochs


In [7]:
print_value(V)
print_policy(pi, width=4, height=4)

array([['↓', '↑', '→', '↑'],
       ['←', '←', '←', '←'],
       ['↑', '↓', '←', '←'],
       ['←', '→', '↓', '←']], dtype='<U1')

### Value iteration

In [8]:
def valueIteration(env, epsilon, gamma, max_iter=10000):
    n_states, n_actions = env.observation_space.n, env.action_space.n
    
    # initialize utilities to 0
    V = np.zeros(n_states)
    
    R = getReward(env)
    P = getProb(env)
    
    i = 0
    while True and i < max_iter:
        i += 1
        prev_V = V.copy()
        for s in range(n_states):
            V[s] = max(R[s,:] + gamma * P[s, :, :].dot(V))

        if np.linalg.norm(prev_V - V) <= epsilon:
            print("Value iteration converged after ", i+1, "epochs")
            break
    
    return V

In [9]:
V = valueIteration(env, 1e-8, 0.8)

# display value function:
print_value(V)

Value iteration converged after  46 epochs


array([[0.0154, 0.0156, 0.0274, 0.0157],
       [0.0269, 0.    , 0.0598, 0.    ],
       [0.0584, 0.1338, 0.1967, 0.    ],
       [0.    , 0.2465, 0.5442, 0.    ]])

give us the expected reward we would get if we actually DO that action

In [10]:
# transform value function into a policy
def value_to_policy(env, gamma, V):
    n_states, n_actions = env.observation_space.n, env.action_space.n
    
    policy = np.zeros(n_states, dtype=int)
    for state in range(n_states):
        best_action = 0
        best_reward = -float("inf")
        for action in range(n_actions):
            moves = env.env.P[state][action] # [(prob, next_state, reward, terminate), ...]
            avg_reward = sum([prob * reward + gamma * V[next_state] for (prob, next_state, reward, _) in moves])
            
            if avg_reward > best_reward:
                best_reward = avg_reward
                best_action = action
        
        policy[state] = best_action
    
    return policy

In [11]:
pol = value_to_policy(env, 0.8, V)
pol

array([1, 3, 2, 3, 0, 0, 0, 0, 3, 1, 0, 0, 0, 2, 1, 0])

In [12]:
print_policy(pol, width=4, height=4)

array([['↓', '↑', '→', '↑'],
       ['←', '←', '←', '←'],
       ['↑', '↓', '←', '←'],
       ['←', '→', '↓', '←']], dtype='<U1')