## Frozen Lake Monte Carlo Example
Monte Carlo evaluation / improvement on the OpenAI Gym FrozenLake environment.

NOTE: possibly broken — the win ratio reported at the bottom of the notebook is low.

In [16]:
import gym
import numpy as np
import operator
from IPython.display import clear_output
from time import sleep
import random
import itertools
import tqdm

# NOTE(review): this assigns the attribute on the `tqdm` module object; tqdm
# appears to read `monitor_interval` from the `tqdm.tqdm` class — confirm
# whether this line has any effect (tqdm is not used in the visible cells).
tqdm.monitor_interval = 0

The Frozen Lake environment is a 4×4 grid in which every tile is one of four types — Start (S), Frozen (F), Hole (H) and Goal (G).

Actions are:
LEFT = 0
DOWN = 1
RIGHT = 2
UP = 3

The episode ends when you reach the goal or fall in a hole. You receive a reward of 1 if you reach the goal, and zero otherwise.

In [17]:
# Create the default FrozenLake environment, report the number of discrete
# actions and states, and draw the starting board.
env = gym.make('FrozenLake-v0')
print("Action Count:", env.action_space.n)
print("Observation Count:", env.observation_space.n)
env.render()

Action Count: 4
Observation Count: 16

SFFF
FHFH
FFFH
HFFG


In [18]:
# Display the environment's transition table, indexed as P[state][action].
# NOTE(review): each entry looks like (probability, next_state, reward, done)
# — confirm against the Gym FrozenLake documentation.
env.env.P

{0: {0: [(0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 4, 0.0, False)],
  1: [(0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 4, 0.0, False),
   (0.3333333333333333, 1, 0.0, False)],
  2: [(0.3333333333333333, 4, 0.0, False),
   (0.3333333333333333, 1, 0.0, False),
   (0.3333333333333333, 0, 0.0, False)],
  3: [(0.3333333333333333, 1, 0.0, False),
   (0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 0, 0.0, False)]},
 1: {0: [(0.3333333333333333, 1, 0.0, False),
   (0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 5, 0.0, True)],
  1: [(0.3333333333333333, 0, 0.0, False),
   (0.3333333333333333, 5, 0.0, True),
   (0.3333333333333333, 2, 0.0, False)],
  2: [(0.3333333333333333, 5, 0.0, True),
   (0.3333333333333333, 2, 0.0, False),
   (0.3333333333333333, 1, 0.0, False)],
  3: [(0.3333333333333333, 2, 0.0, False),
   (0.3333333333333333, 1, 0.0, False),
   (0.3333333333333333, 0, 0.0, False)]},
 2:

## Monte Carlo
Function for Random Policy. Returns equal probabilities for each action for all states.

In [19]:
def create_random_policy(env):
    """Build a uniform random policy for a discrete environment.

    Args:
      env: environment exposing `observation_space.n` (number of states)
        and `action_space.n` (number of actions).
    Returns:
      policy: dict mapping every state index to a dict of
        {action index: probability}, with all actions equally likely.
    """
    n_actions = env.action_space.n
    # Each state gets its own inner dict so that per-state probabilities can
    # later be updated independently of one another.
    return {
        state: {action: 1 / n_actions for action in range(n_actions)}
        for state in range(env.observation_space.n)
    }

Function to play episode

In [20]:
def play_episode(env, policy, display=True):
    """Play one episode of the game, sampling every action from `policy`.

    Args:
      env: gym-style environment whose `reset()` returns the initial state
        and whose `step(action)` returns `(state, reward, done, info)`.
      policy: dict mapping state -> {action: weight}. The weights do not have
        to sum to 1; actions are drawn in proportion to their weight.
      display: when True, render the board before every step and once more
        after the episode ends, pausing one second each time.
    Returns:
      episode: list of timesteps `[state, action, reward]`, where `state` is
        the state the action was taken from.
    """
    # Track the state through the values returned by reset()/step() rather
    # than reaching into the wrapped environment's internal `s` attribute.
    state = env.reset()
    episode = []
    finished = False

    while not finished:
        if display:
            clear_output(True)
            env.render()
            sleep(1)

        # Draw an action in proportion to its weight. random.choices always
        # returns one of the listed actions, so floating-point rounding in the
        # cumulative weights can never leave `action` unset or carry over the
        # action from the previous step.
        action_weights = policy[state]
        action = random.choices(
            list(action_weights.keys()),
            weights=list(action_weights.values()),
        )[0]

        # take the action and record the timestep
        next_state, reward, finished, _ = env.step(action)
        episode.append([state, action, reward])
        state = next_state

    if display:
        clear_output(True)
        env.render()
        sleep(1)
    return episode

Function to test policy and print win percentage

In [21]:
def test_policy(policy, env):
    wins = 0
    r = 100
    for i in range(r):
        w = play_episode(env, policy, display=False)[-1][-1]
        if w == 1:
            wins += 1
    return wins / r

First Visit Monte Carlo Prediction and Control using epsilon soft exploration

In [22]:
def monte_carlo_e_soft(env, episodes=100, policy=None, epsilon=0.01):
    """First-visit Monte Carlo control with an epsilon-soft policy.

    Plays `episodes` episodes with the current policy. After each episode it
    walks the timesteps backwards, accumulating the undiscounted return G,
    and for the first visit of every (state, action) pair it updates the
    action-value estimate and makes the policy epsilon-greedy for that state.

    Args:
      env: environment exposing `action_space.n`; it is passed on to
        play_episode (and to create_random_policy when no policy is given).
      episodes: number of training episodes to play.
      policy: optional starting policy, {state: {action: probability}}.
        It is updated in place. A uniform random policy is created when it
        is omitted or empty.
      epsilon: total probability mass spread evenly over all actions of a
        state; the remaining 1 - epsilon goes to the greedy action.
    Returns:
      policy: the improved policy (the same object as the `policy`
        argument when one was supplied).
    """
    # Create a random policy if none was specified
    if not policy:
        policy = create_random_policy(env)

    # Action-value estimates Q[state][action], initialised to zero
    n_actions = env.action_space.n
    Q = {state: {a: 0.0 for a in range(n_actions)} for state in policy}

    returns = {}  # (state, action) -> list of first-visit returns

    for _ in range(episodes):
        # Each timestep is [state, action, reward]
        episode = play_episode(env=env, policy=policy, display=False)

        # Index of the first occurrence of every (state, action) pair, so the
        # first-visit test below is a dictionary lookup instead of a scan of
        # all earlier timesteps.
        first_visit = {}
        for t, (s_t, a_t, _reward) in enumerate(episode):
            first_visit.setdefault((s_t, a_t), t)

        # Walk the episode backwards: the reward arrives at the end, so the
        # return is propagated from the last timestep towards the first.
        G = 0
        for t in reversed(range(len(episode))):
            s_t, a_t, r_t = episode[t]
            state_action = (s_t, a_t)
            G += r_t  # undiscounted return from timestep t onwards

            if first_visit[state_action] != t:
                continue  # the pair occurred earlier in this episode

            returns.setdefault(state_action, []).append(G)
            visits = returns[state_action]
            Q[s_t][a_t] = sum(visits) / len(visits)  # average return so far

            # Greedy action for s_t, ties broken at random
            best_value = max(Q[s_t].values())
            A_star = random.choice(
                [a for a, q in Q[s_t].items() if q == best_value]
            )

            # Epsilon-soft update: every action keeps epsilon / |A(s)|, and
            # the greedy action additionally receives 1 - epsilon, so the
            # probabilities of the state sum to 1.
            action_probs = policy[s_t]
            exploration_share = epsilon / len(action_probs)
            for a in action_probs:
                if a == A_star:
                    action_probs[a] = 1 - epsilon + exploration_share
                else:
                    action_probs[a] = exploration_share

    return policy

Run this algorithm to solve an 8×8 frozen lake environment and check the reward

In [25]:
# Train on the larger 8x8 map for 5000 episodes, then estimate the win ratio
# of the learned policy.
# Note: `env` is rebound here, replacing the 4x4 environment created earlier.
env = gym.make("FrozenLake8x8-v0")
policy = monte_carlo_e_soft(env, episodes=5000)
print("Win Ratio: ", test_policy(policy, env))

Win Ratio:  0.11
