In [51]:
import gym
import numpy as np

In [52]:
# Create the FrozenLake environment that every later cell reads through the global `env`.
env = gym.make("FrozenLake-v1")  # alternative call: add `, is_slippery=False` (presumably turns the random slipping off — confirm in the gym docs)

  "Initializing wrapper in old step API which returns one bool instead of two. It is recommended to set `new_step_api=True` to use new step API. This will be the default behaviour in future."
  "Initializing environment in old step API which returns one bool instead of two. It is recommended to set `new_step_api=True` to use new step API. This will be the default behaviour in future."


We will be using OpenAI Gym. In Gym, every environment has an action space and a state (observation) space, accessible via `env.action_space` and `env.observation_space`, respectively.

The underlying source code for the environment is available at: https://github.com/openai/gym/blob/master/gym/envs/toy_text/frozen_lake.py

In [53]:
# Read the number of actions and of states from each space's `.n` attribute.
action_size = env.action_space.n
state_size = env.observation_space.n
print(f"Action size: {action_size}")
print(f"State size: {state_size}")

Action size: 4
State size: 16


In [54]:
# The action space is represented by a Discrete object
print(env.action_space)

Discrete(4)


In [55]:
def get_random_trajectory(render=False, environment=None):
    """Roll out one episode using uniformly sampled actions.

    Args:
        render: If True, call ``render()`` on the environment after every
            step, including the step that ends the episode.
        environment: Environment to roll out in. It must provide ``reset()``,
            ``step(action)`` returning ``(ob, reward, done, info)``,
            ``action_space.sample()`` and ``render()``. Defaults to the
            notebook-level ``env``.

    Returns:
        A ``(rewards, traj_length)`` tuple: the list of per-step rewards and
        the number of steps taken until ``done`` was reported.
    """
    sim = env if environment is None else environment
    sim.reset()
    rewards = []
    done = False
    while not done:
        action = sim.action_space.sample()
        _, reward, done, _ = sim.step(action)
        rewards.append(reward)
        # Render before the loop condition is re-checked so that the final
        # frame of the episode is displayed as well.
        if render:
            sim.render()
    return rewards, len(rewards)

In [56]:
# Estimate the average episode length under a uniformly random policy.
n_episodes = 10000
traj_lengths = []
for _ in range(n_episodes):
    rewards, traj_length = get_random_trajectory(False)
    traj_lengths.append(traj_length)
# Average once at the end rather than adding `traj_length / n_episodes` on
# every iteration, which accumulates floating-point rounding noise.
mean_traj_length = float(np.mean(traj_lengths))
print(mean_traj_length)

7.624800000000369


To implement value iteration and policy iteration, we need the underlying transition distributions. These are in general not available in Gym environments, but we can access them for FrozenLake.

`env.env.P` is a dictionary containing the underlying transition and reward dynamics for the environment.

In [57]:
# `P` is defined on the base FrozenLake environment. `env.unwrapped` reaches it
# directly, independent of how many wrappers `gym.make` stacked on top.
transition_dict = env.unwrapped.P
print(transition_dict.keys())

dict_keys([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15])


We can view the transition distribution for a single state (the cell below inspects state 14). Each key corresponds to an action, and each value lists that action's possible transitions as tuples of (transition probability, next state, reward, whether the episode terminates).

Actions are 0-indexed and correspond to ["Left", "Down", "Right", "Up"].

In [58]:
# NOTE(review): despite its name, `init_state` holds the dynamics of state 14,
# which per the output below is one move away from the rewarding state 15;
# presumably this is not the state episodes start in — confirm.
init_state = transition_dict[14]
# Action names indexed by action id; print_policy looks names up in this list.
actions = ["Left", "Down", "Right", "Up"]
# Each key k is an action id; its value is a list of (p, ns, r, d) tuples.
for k in init_state:
    print(f"action {k}: transitions (p, ns, r, d):  {init_state[k]}")

action 0: transitions (p, ns, r, d):  [(0.3333333333333333, 10, 0.0, False), (0.3333333333333333, 13, 0.0, False), (0.3333333333333333, 14, 0.0, False)]
action 1: transitions (p, ns, r, d):  [(0.3333333333333333, 13, 0.0, False), (0.3333333333333333, 14, 0.0, False), (0.3333333333333333, 15, 1.0, True)]
action 2: transitions (p, ns, r, d):  [(0.3333333333333333, 14, 0.0, False), (0.3333333333333333, 15, 1.0, True), (0.3333333333333333, 10, 0.0, False)]
action 3: transitions (p, ns, r, d):  [(0.3333333333333333, 15, 1.0, True), (0.3333333333333333, 10, 0.0, False), (0.3333333333333333, 13, 0.0, False)]


In [59]:
# helper function to print policy
def print_policy(policy, shape=(4, 4), action_names=None):
    """Print a policy as a grid of action names, one grid row per line.

    Each printed line starts with a single space and separates the action
    names of that row with `` | ``.

    Args:
        policy: Flat sequence of action ids, one per state.
        shape: ``(rows, cols)`` layout of the grid; the default matches the
            4x4 FrozenLake map.
        action_names: Sequence mapping an action id to its display name.
            Defaults to the notebook-level ``actions`` list.
    """
    names = actions if action_names is None else action_names
    grid = np.asarray(policy).reshape(shape)
    for row in grid:
        print(" " + " | ".join(names[int(action)] for action in row))


In [60]:
# Draw one action id in [0, 4) uniformly at random for each of the 16 states.
policy = np.random.randint(4, size=16)
print("Random policy: ")
print_policy(policy)

Random policy: 
 Up | Up | Down | Down
 Right | Up | Up | Right
 Left | Down | Right | Up
 Down | Right | Right | Left


We have provided some possible functions and their signatures, though you are certainly free to modify as you see fit.

In [61]:
def value_iteration(values, gamma, iterations=100, termination=1e-4,
                    transitions=None):
    """Run value iteration and return the greedy policy with its values.

    States are swept in index order and every backup immediately sees the
    values already updated earlier in the same sweep (asynchronous updates).

    Args:
        values: Initial state values, one entry per state. The input is
            copied and left unmodified.
        gamma: Discount factor.
        iterations: Maximum number of sweeps over the state space.
        termination: Stop once the largest value change in a sweep is below
            this threshold.
        transitions: Model as ``{state: {action: [(prob, next_state, reward,
            done), ...]}}`` with states and actions numbered from 0.
            Defaults to the notebook-level ``transition_dict``.

    Returns:
        ``(policy, values)``: an int array holding the greedy action of each
        state and a float array holding the corresponding state values.
    """
    model = transition_dict if transitions is None else transitions
    n_states = len(model)
    # Work on a float copy so the caller's array is never modified.
    values = np.array(values, dtype=float)
    policy = np.zeros((n_states,), dtype=int)
    for _ in range(iterations):
        max_update = 0.0
        for state in range(n_states):
            previous = values[state]
            best_action = 0
            best_return = -np.inf
            for action in range(len(model[state])):
                # A transition flagged `done` ends the episode, so nothing is
                # bootstrapped from the value of its successor state.
                action_return = sum(
                    prob * (reward + gamma * (0.0 if done else values[nxt]))
                    for prob, nxt, reward, done in model[state][action]
                )
                # Strict comparison: ties keep the lowest action id.
                if action_return > best_return:
                    best_action = action
                    best_return = action_return
            policy[state] = best_action
            values[state] = best_return
            max_update = max(max_update, abs(best_return - previous))
        # Terminate if the values did not change much during this sweep.
        if max_update < termination:
            break
    return policy, values

# estimate values
def policy_evaluation(policy, init_values, gamma, termination=1e-4,
                      transitions=None):
    """Iteratively estimate the state values of a fixed policy.

    States are swept in index order and every backup immediately sees the
    values already updated earlier in the same sweep. Sweeps repeat until the
    largest change in a sweep drops below ``termination``; there is no cap on
    the number of sweeps.

    Args:
        policy: Action id chosen in each state.
        init_values: Starting state values; copied, never modified.
        gamma: Discount factor.
        termination: Convergence threshold on the largest per-sweep change.
        transitions: Model as ``{state: {action: [(prob, next_state, reward,
            done), ...]}}`` with states numbered from 0. Defaults to the
            notebook-level ``transition_dict``.

    Returns:
        Float array with the estimated value of every state under ``policy``.
    """
    model = transition_dict if transitions is None else transitions
    n_states = len(model)
    values = np.array(init_values, dtype=float)
    while True:
        max_update = 0.0
        for state in range(n_states):
            previous = values[state]
            # A transition flagged `done` ends the episode, so nothing is
            # bootstrapped from the value of its successor state.
            updated = sum(
                prob * (reward + gamma * (0.0 if done else values[nxt]))
                for prob, nxt, reward, done in model[state][policy[state]]
            )
            values[state] = updated
            max_update = max(max_update, abs(updated - previous))
        # Terminate if the values did not change much during this sweep.
        if max_update < termination:
            break
    return values

# update actions
def policy_improvement(values, gamma, transitions=None):
    """Return the policy that is greedy with respect to ``values``.

    Args:
        values: Current state-value estimates, one entry per state.
        gamma: Discount factor.
        transitions: Model as ``{state: {action: [(prob, next_state, reward,
            done), ...]}}`` with states and actions numbered from 0.
            Defaults to the notebook-level ``transition_dict``.

    Returns:
        Int array with the best action id for every state; ties go to the
        lowest action id.
    """
    model = transition_dict if transitions is None else transitions
    n_states = len(model)
    policy = np.zeros((n_states,), dtype=int)
    for state in range(n_states):
        best_action = 0
        best_return = -np.inf
        for action in range(len(model[state])):
            # A transition flagged `done` ends the episode, so nothing is
            # bootstrapped from the value of its successor state.
            action_return = sum(
                prob * (reward + gamma * (0.0 if done else values[nxt]))
                for prob, nxt, reward, done in model[state][action]
            )
            if action_return > best_return:
                best_action = action
                best_return = action_return
        policy[state] = best_action
    return policy


In [62]:
# Run value iteration from an all-zero value table with discount factor 0.9.
policy, value = value_iteration(np.zeros((16,)), 0.9, iterations=100, termination=1e-4)

In [63]:
# Show the policy returned by value iteration as a grid of action names.
print_policy(policy)

 Left | Up | Left | Up
 Left | Left | Left | Left
 Up | Down | Left | Left
 Left | Right | Down | Left


In [64]:
# Lay the 16 state values from value iteration out as a 4x4 array.
print(value.reshape((4,4)))

[[0.06848032 0.06111567 0.07422254 0.05560469]
 [0.09153995 0.         0.11212558 0.        ]
 [0.14522151 0.24737863 0.29954442 0.        ]
 [0.         0.37986011 0.63898452 0.        ]]


In [65]:
def policy_iteration(gamma=0.9):
    """Alternate policy evaluation and greedy improvement until stable.

    Starts from a uniformly random policy over 4 actions for 16 states and an
    all-zero value table. Each round evaluates the current policy (warm
    started from the previous round's values) and then improves it; the loop
    ends as soon as improvement leaves the policy unchanged.

    Args:
        gamma: Discount factor passed to evaluation and improvement.

    Returns:
        ``(policy, value)``: the stable policy and the values it was derived
        from.
    """
    current_policy = np.random.randint(4, size=16)
    current_value = np.zeros((16,))
    while True:
        value = policy_evaluation(current_policy, current_value, gamma)
        policy = policy_improvement(value, gamma)
        # No state changed its action: the policy is stable.
        if np.array_equal(policy, current_policy):
            return policy, value
        current_policy, current_value = policy, value


In [66]:
# Run policy iteration with its default discount factor (gamma=0.9).
policy, value = policy_iteration()

In [67]:
# Show the policy returned by policy iteration as a grid of action names.
print_policy(policy)

 Left | Up | Left | Up
 Left | Left | Left | Left
 Up | Down | Left | Left
 Left | Right | Down | Left


In [68]:
# Lay the 16 state values from policy iteration out as a 4x4 array.
print(value.reshape((4,4)))

[[0.06876448 0.0612918  0.07432063 0.05562446]
 [0.09175113 0.         0.11216899 0.        ]
 [0.14535878 0.24744876 0.29958535 0.        ]
 [0.         0.37990298 0.63900425 0.        ]]


In [75]:
def q_learning(n_episodes=1000,
               alpha=0.5,
               epsilon=0.1,
               epsilon_decay=True,
               gamma=0.9,
               render=False,
               environment=None):
    """Learn a tabular Q-function with epsilon-greedy Q-learning.

    Args:
        n_episodes: Number of training episodes.
        alpha: Learning rate of the temporal-difference update.
        epsilon: Exploration probability (the initial one when decaying).
        epsilon_decay: If True, the exploration probability is annealed
            linearly from ``epsilon`` in the first episode towards 0 at the
            end of training; if False it stays at ``epsilon``.
        gamma: Discount factor.
        render: If True, call ``render()`` on the environment after each step.
        environment: Environment to train in. It must provide
            ``observation_space.n``, ``action_space.n``, ``reset()`` returning
            the start state, ``step(action)`` returning ``(ob, reward, done,
            info)`` and ``render()``. Defaults to the notebook-level ``env``.

    Returns:
        Array of shape ``(n_states, n_actions)`` holding the learned Q-values.
    """
    sim = env if environment is None else environment
    n_states = sim.observation_space.n
    n_actions = sim.action_space.n
    q = np.zeros((n_states, n_actions))

    for episode in range(n_episodes):
        if epsilon_decay:
            explore_prob = epsilon * (1 - episode / n_episodes)
        else:
            explore_prob = epsilon
        state = sim.reset()
        done = False
        while not done:
            if np.random.uniform() <= explore_prob:
                # Explore: sample from the environment's own action count.
                action = np.random.randint(n_actions)
            else:
                # Exploit: break ties between maximal Q-values at random so
                # the all-zero initial table does not always pick action 0.
                greedy = np.flatnonzero(q[state] == q[state].max())
                action = greedy[np.random.randint(greedy.size)]
            next_state, reward, done, _ = sim.step(action)
            td_target = reward + gamma * q[next_state].max()
            q[state, action] += alpha * (td_target - q[state, action])
            state = next_state
            if render:
                sim.render()

    return q

In [76]:
# Train with the default hyperparameters, then print the greedy policy: the
# highest-valued action (argmax over axis 1) of each state's row in the Q-table.
print_policy(np.argmax(q_learning(), axis=1))

 Left | Up | Left | Up
 Left | Left | Left | Left
 Down | Down | Down | Left
 Left | Down | Right | Left
