In [7]:
import gym
import numpy as np
import operator
from IPython.display import clear_output
from time import sleep
from gym.spaces.tuple_space import Tuple
from gym.envs.registration import register
from gym import wrappers
import random
import itertools
import tqdm

## Create Environment 

In [2]:
# Register deterministic (is_slippery=False) FrozenLake variants under custom
# ids so they can be created with gym.make() like any other environment.
# NOTE(review): re-running this cell in a live kernel may fail if the installed
# gym version refuses to register an id twice -- confirm.
register(
    id='FrozenLakeNotSlippery-v0',
    entry_point='gym.envs.toy_text:FrozenLakeEnv',
    max_episode_steps=200,
    kwargs=dict(map_name='4x4', is_slippery=False),
)

register(
    id='FrozenLakeNotSlippery8x8-v0',
    entry_point='gym.envs.toy_text:FrozenLakeEnv',
    max_episode_steps=200,
    kwargs=dict(map_name='8x8', is_slippery=False),
)


# Environment ids keyed by map size. The slippery ids are not registered in
# this notebook; presumably they ship with gym itself.
fl_slippery = dict(
    small='FrozenLake-v0',
    big='FrozenLake8x8-v0',
)

fl_not_slippery = dict(
    small='FrozenLakeNotSlippery-v0',
    big='FrozenLakeNotSlippery8x8-v0',
)


def create_environment(slippery=False, big=False):
    """Create a FrozenLake environment and reset it once.

    Parameters
    ----------
    slippery : bool
        Look the id up in ``fl_slippery`` when true, otherwise in
        ``fl_not_slippery``.
    big : bool
        Use the ``'big'`` entry of the chosen table when true, otherwise the
        ``'small'`` one.

    Returns
    -------
    The object returned by ``gym.make`` after ``reset()`` has been called on it.
    """
    id_table = fl_slippery if slippery else fl_not_slippery
    size_key = 'big' if big else 'small'

    env = gym.make(id_table[size_key])
    env.reset()
    return env

def create_random_policy(env):
    """Build a uniform policy over every state of ``env``.

    Parameters
    ----------
    env
        Object exposing ``observation_space.n`` (number of states) and
        ``action_space.n`` (number of actions).

    Returns
    -------
    dict
        ``{state: {action: 1 / action_space.n}}`` for states
        ``0 .. observation_space.n - 1`` and actions
        ``0 .. action_space.n - 1``. Each state gets its own inner dict.
    """
    return {
        state: {
            action: 1 / env.action_space.n
            for action in range(0, env.action_space.n)
        }
        for state in range(0, env.observation_space.n)
    }


def create_state_action_dictionary(env, policy):
    """Create a zero-initialised action-value table.

    Parameters
    ----------
    env
        Object exposing ``action_space.n`` (number of actions).
    policy : dict
        Only its keys are used; each key becomes a state in the table.

    Returns
    -------
    dict
        ``{state: {action: 0.0}}`` for every key of ``policy`` and actions
        ``0 .. action_space.n - 1``.
    """
    return {
        state: dict.fromkeys(range(0, env.action_space.n), 0.0)
        for state in policy
    }

def run_game(env, policy, display=True):
    """Play one episode, sampling each action from ``policy``.

    Parameters
    ----------
    env
        Environment using the classic gym interface seen in this notebook:
        ``reset()``, ``render()`` and ``step(action)`` returning
        ``(state, reward, done, info)``. The current state is read from
        ``env.env.s`` (presumably the raw environment under one wrapper).
    policy : dict
        ``{state: {action: weight}}``. Weights do not have to sum to 1; an
        action is drawn with probability proportional to its weight.
    display : bool
        When true, render the board before every step and once more at the
        end, clearing the previous cell output each time.

    Returns
    -------
    list
        One ``[state, action, reward]`` list per step, in play order.

    Raises
    ------
    ValueError
        If the current state has no action with a positive weight. Previously
        this surfaced as ``UnboundLocalError`` on the first step, or silently
        repeated the previous step's action on later steps.
    """
    env.reset()
    episode = []
    finished = False

    while not finished:
        s = env.env.s

        if display:
            clear_output(True)
            env.render()
            sleep(0.1)

        # Roulette-wheel selection: draw a threshold in [0, total weight] and
        # walk the cumulative weights until the threshold is passed.
        threshold = random.uniform(0, sum(policy[s].values()))
        cumulative = 0
        action = None
        for candidate, weight in policy[s].items():
            if weight <= 0:
                # Zero-weight actions can never be selected.
                continue
            # Remember the latest eligible action so a threshold that lands
            # exactly on the upper bound still yields a valid choice.
            action = candidate
            cumulative += weight
            if threshold < cumulative:
                break

        if action is None:
            raise ValueError(
                'policy has no action with positive weight for state %r' % (s,)
            )

        # The state returned by step() is not used; the next iteration reads
        # it from env.env.s again.
        _, reward, finished, _ = env.step(action)
        episode.append([s, action, reward])

    if display:
        clear_output(True)
        env.render()
        sleep(0.05)

    return episode

def test_policy(policy, env):
    wins = 0
    r = 100
    for i in range(r):
        w = run_game(env, policy, display=False)[-1][-1]
        if w == 1:
            wins += 1
    return wins / r


### Build Monte Carlo Policy

In [3]:
def monte_carlo(env, episodes=100, policy=None, epsilon=0.01):
    """On-policy first-visit Monte Carlo control with an epsilon-soft policy.

    Parameters
    ----------
    env
        Environment passed to ``run_game`` (and to the policy / Q-table
        constructors).
    episodes : int
        Number of episodes to sample.
    policy : dict, optional
        ``{state: {action: probability}}``. A uniform random policy is created
        when this is ``None`` or empty. A supplied policy is updated in place.
    epsilon : float
        Exploration mass spread evenly over all actions of a state.

    Returns
    -------
    dict
        The improved policy. For every visited state the greedy action has
        probability ``1 - epsilon + epsilon / n_actions`` and every other
        action ``epsilon / n_actions``, so each row sums to 1.
    """
    if not policy:
        policy = create_random_policy(env)

    Q = create_state_action_dictionary(env, policy)
    returns = {}  # (state, action) -> list of first-visit returns

    for _ in range(episodes):
        episode = run_game(env=env, policy=policy, display=False)

        # Step index at which each (state, action) pair first occurs, so the
        # first-visit test below is a dictionary lookup.
        first_visit = {}
        for step, (s, a, _reward) in enumerate(episode):
            first_visit.setdefault((s, a), step)

        G = 0  # undiscounted return, accumulated from the end of the episode
        for step in reversed(range(len(episode))):
            s_t, a_t, r_t = episode[step]
            state_action = (s_t, a_t)
            G += r_t

            # Only the earliest occurrence of a pair contributes a return.
            if first_visit[state_action] != step:
                continue

            history = returns.setdefault(state_action, [])
            history.append(G)
            Q[s_t][a_t] = sum(history) / len(history)

            # Greedy action, ties broken uniformly at random.
            best_value = max(Q[s_t].values())
            A_star = random.choice(
                [a for a, q in Q[s_t].items() if q == best_value]
            )

            # Epsilon-soft improvement. The share is epsilon divided by the
            # number of actions; dividing by the running sum of the row (as
            # before) drifts while the row is being rewritten.
            explore = epsilon / len(policy[s_t])
            for a in policy[s_t]:
                if a == A_star:
                    policy[s_t][a] = 1 - epsilon + explore
                else:
                    policy[s_t][a] = explore

    return policy

#### Not Slippery

In [4]:
# Train on the small map with slippery=False, then measure the win rate.
# The training function defined in this notebook is `monte_carlo`; the name
# previously called here was never defined.
env = create_environment(slippery=False, big=False)
policy = monte_carlo(env, episodes=200)
test_policy(policy, env)

run_game(env, policy)

  (Right)
SFFF
FHFH
FFFH
HFF[41mG[0m


[[0, 1, 0.0],
 [4, 1, 0.0],
 [8, 2, 0.0],
 [9, 2, 0.0],
 [10, 1, 0.0],
 [14, 2, 1.0]]

#### Slippery

In [6]:
# Train on the small map with slippery=True, then measure the win rate.
# The training function defined in this notebook is `monte_carlo`; the name
# previously called here was never defined.
env = create_environment(slippery=True, big=False)
policy = monte_carlo(env, episodes=500)
test_policy(policy, env)

run_game(env, policy)

  (Down)
SFFF
FHFH
FFFH
HFF[41mG[0m


[[0, 1, 0.0],
 [1, 3, 0.0],
 [1, 3, 0.0],
 [2, 3, 0.0],
 [1, 3, 0.0],
 [0, 1, 0.0],
 [4, 0, 0.0],
 [4, 0, 0.0],
 [4, 0, 0.0],
 [8, 3, 0.0],
 [8, 3, 0.0],
 [4, 0, 0.0],
 [4, 0, 0.0],
 [4, 0, 0.0],
 [8, 3, 0.0],
 [9, 1, 0.0],
 [13, 2, 0.0],
 [13, 2, 0.0],
 [9, 1, 0.0],
 [13, 2, 0.0],
 [13, 2, 0.0],
 [13, 2, 0.0],
 [13, 2, 0.0],
 [14, 1, 1.0]]