In [22]:
import gym_utils as utils
import random
from IPython.display import clear_output
from time import sleep
import numpy as np
from random import randint

In [30]:
def run_game(env, policy, display=True, should_return=True):
    """Play one episode of ``env`` by following ``policy``.

    Args:
        env: Environment wrapper exposing ``reset()``, ``step(action)``,
            ``render()`` and the current state as ``env.env.s``.
        policy: Mapping from a state to the action taken in that state.
        display: When true, the cell output is cleared, the environment is
            rendered and execution pauses for half a second before every
            step and once more after the episode has ended.
        should_return: When true the recorded episode is returned; otherwise
            the function returns ``None``.

    Returns:
        A list with one ``[state, action, reward]`` list per step, or
        ``None`` when ``should_return`` is false.
    """

    def show_frame():
        # Redraw the environment in place and give the viewer time to see it.
        clear_output(True)
        env.render()
        sleep(0.5)

    env.reset()
    trajectory = []
    done = False

    while not done:
        current_state = env.env.s
        if display:
            show_frame()

        chosen_action = policy[current_state]
        # Only the reward and the termination flag of the step are recorded.
        _, reward, done, _ = env.step(chosen_action)
        trajectory.append([current_state, chosen_action, reward])

    if display:
        show_frame()

    return trajectory if should_return else None

In [31]:
def argmax_Q(Q, s):
    """Return an action with the highest value in state ``s``.

    Ties between equally valued actions are broken uniformly at random.

    Args:
        Q: Mapping ``state -> {action: value}``.
        s: State whose action values are inspected.

    Returns:
        The key (action) of one maximal entry of ``Q[s]``. For tables whose
        actions are ``0..n-1`` in insertion order this equals the positional
        index of that entry.

    Raises:
        IndexError: If ``Q[s]`` contains no actions.
    """
    action_values = Q[s]
    # Compute the maximum once instead of once per compared element.
    # ``default=None`` keeps the empty case flowing into random.choice below,
    # which raises IndexError for an empty candidate list.
    best_value = max(action_values.values(), default=None)
    # Collect the action keys themselves (not their positions) so the result
    # can always be used to index Q[s].
    best_actions = [a for a, v in action_values.items() if v == best_value]
    return random.choice(best_actions)

def greedy_policy(Q):
    """Derive the policy that acts greedily with respect to ``Q``.

    Args:
        Q: Mapping ``state -> {action: value}``.

    Returns:
        A dict mapping every state of ``Q`` to the result of
        ``argmax_Q(Q, state)``.
    """
    return {state: argmax_Q(Q, state) for state in Q.keys()}

def field_list(env):
    """Flatten the map of ``env`` into a row-major list of field labels.

    Every cell of ``env.env.desc`` is converted with ``str`` and the
    second-to-last character of that text is kept. For a bytes cell such as
    ``b'S'`` (whose ``str`` is ``"b'S'"``) this yields ``'S'``.

    Args:
        env: Environment wrapper whose ``env.desc`` is an iterable of rows,
            each an iterable of cells (presumably one-byte values; verify
            against the environment in use).

    Returns:
        A flat list with one single-character string per cell.
    """
    return [str(cell)[-2] for row in env.env.desc for cell in row]

def create_state_action_dictionary(env, policy):
    """Create a zero-initialised action-value table.

    Args:
        env: Environment exposing ``action_space.n``, the number of actions.
        policy: Mapping whose keys are the states to create entries for; the
            values are not used.

    Returns:
        A dict ``{state: {action: 0.0}}`` with an entry for every key of
        ``policy`` and every action in ``range(env.action_space.n)``. Each
        state owns a separate inner dict.
    """
    n_actions = env.action_space.n
    # Every state starts at 0.0 regardless of its field type. The previous
    # implementation looked up the field type and branched on it, but both
    # branches built the same row, so that lookup has been dropped.
    return {
        state: {action: 0.0 for action in range(n_actions)}
        for state in policy.keys()
    }

def test_policy(policy, env):
    wins = 0
    r = 1000
    for i in range(r):
        w = run_game(env, policy, display=False)[-1][-1]
        if w == 1:
            wins += 1
    return wins / r

### SARSA (on-policy TD control)

- On-policy
- Learns an action-value function rather than a state-value function

In [32]:
def sarsa(env, episodes=100, step_size=0.01, exploration_rate=0.01):
    """Learn an action-value table with the SARSA update.

    Actions are always chosen greedily from the current table (ties broken at
    random by ``argmax_Q``); no epsilon-style exploration is performed.

    Args:
        env: Environment wrapper exposing ``reset()``, ``step(action)`` and
            the current state as ``env.env.s``.
        episodes: Number of training episodes.
        step_size: Learning rate applied to the TD error.
        exploration_rate: Despite its name, this value multiplies the
            next state-action value in the update, i.e. it acts as the
            discount factor.

    Returns:
        A tuple ``(policy, Q)``: the greedy policy derived from the learned
        table and the table itself.
    """
    # The random policy is only used for its keys (the set of states).
    policy = utils.create_random_policy(env)
    Q = create_state_action_dictionary(env, policy)

    for _ in range(episodes):
        env.reset()
        S = env.env.s
        # Look up the greedy action for this one state only, instead of
        # rebuilding the greedy policy for every state on each step.
        A = argmax_Q(Q, S)
        finished = False
        while not finished:
            S_prime, reward, finished, _ = env.step(A)
            A_prime = argmax_Q(Q, S_prime)
            td_target = reward + exploration_rate * Q[S_prime][A_prime]
            Q[S][A] = Q[S][A] + step_size * (td_target - Q[S][A])
            S, A = S_prime, A_prime

    return greedy_policy(Q), Q

4 x 4

In [40]:
# Train SARSA on the non-slippery map with big=False (the 4 x 4 case per the
# heading above), then report the fraction of evaluation episodes won.
# NOTE: sarsa() multiplies the next-state value by `exploration_rate`, so the
# value passed here acts as the discount factor.
env = utils.create_environment(slippery=False, big=False)
policy, Q = sarsa(env, episodes=10000, step_size=0.1, exploration_rate=0.1)
test_policy(policy, env)

1.0

In [41]:
# Replay one episode with rendering; should_return=False makes run_game
# return None so only the rendered frames show up as cell output.
run_game(env=env, policy=policy, display=True, should_return=False)

  (Right)
SFFF
FHFH
FFFH
HFF[41mG[0m


8 x 8

In [42]:
# Train SARSA on the non-slippery map with big=True (the 8 x 8 case per the
# heading above), then report the fraction of evaluation episodes won.
# NOTE: sarsa() multiplies the next-state value by `exploration_rate`, so the
# value passed here acts as the discount factor.
env = utils.create_environment(slippery=False, big=True)
policy, Q = sarsa(env, episodes=3000, step_size=0.2, exploration_rate=0.2)
test_policy(policy, env)

1.0

In [43]:
# Replay one episode with rendering; should_return=False makes run_game
# return None so only the rendered frames show up as cell output.
run_game(env=env, policy=policy, display=True, should_return=False)

  (Down)
SFFFFFFF
FFFFFFFF
FFFHFFFF
FFFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFF[41mG[0m


### Q-Learning

In [51]:
def Q_learning(env, episodes=100, step_size=0.01, exploration_rate=0.01):
    """Learn an action-value table with the Q-learning update.

    Actions are always chosen greedily from the current table (ties broken at
    random by ``argmax_Q``); no epsilon-style exploration is performed.

    Args:
        env: Environment wrapper exposing ``reset()``, ``step(action)`` and
            the current state as ``env.env.s``.
        episodes: Number of training episodes.
        step_size: Learning rate applied to the TD error.
        exploration_rate: Despite its name, this value multiplies the
            maximal next-state value in the update, i.e. it acts as the
            discount factor.

    Returns:
        A tuple ``(policy, Q)``: the greedy policy derived from the learned
        table and the table itself.
    """
    # The random policy is only used for its keys (the set of states).
    policy = utils.create_random_policy(env)
    Q = create_state_action_dictionary(env, policy)

    for _ in range(episodes):
        env.reset()
        S = env.env.s
        finished = False
        while not finished:
            # Look up the greedy action for this one state only, instead of
            # rebuilding the greedy policy for every state on each step.
            A = argmax_Q(Q, S)
            S_prime, reward, finished, _ = env.step(A)
            td_target = reward + exploration_rate * max(Q[S_prime].values())
            Q[S][A] = Q[S][A] + step_size * (td_target - Q[S][A])
            S = S_prime

    return greedy_policy(Q), Q

4 x 4

In [55]:
# Train Q-learning on the non-slippery map with big=False (the 4 x 4 case per
# the heading above), then report the fraction of evaluation episodes won.
# NOTE: Q_learning() multiplies the next-state value by `exploration_rate`,
# so the value passed here acts as the discount factor.
env = utils.create_environment(slippery=False, big=False)
policy, Q = Q_learning(env, episodes=1000, step_size=0.1, exploration_rate=0.2)
test_policy(policy, env)

1.0

In [56]:
# Replay one episode with rendering; should_return=False makes run_game
# return None so only the rendered frames show up as cell output.
run_game(env=env, policy=policy, display=True, should_return=False)

  (Right)
SFFF
FHFH
FFFH
HFF[41mG[0m


8 x 8

In [57]:
# Train Q-learning on the non-slippery map with big=True (the 8 x 8 case per
# the heading above), then report the fraction of evaluation episodes won.
# NOTE: Q_learning() multiplies the next-state value by `exploration_rate`,
# so the value passed here acts as the discount factor.
env = utils.create_environment(slippery=False, big=True)
policy, Q = Q_learning(env, episodes=1000, step_size=0.1, exploration_rate=0.2)
test_policy(policy, env)

1.0

In [58]:
# Replay one episode with rendering; should_return=False makes run_game
# return None so only the rendered frames show up as cell output.
run_game(env=env, policy=policy, display=True, should_return=False)

  (Down)
SFFFFFFF
FFFFFFFF
FFFHFFFF
FFFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFF[41mG[0m


### Double Q-Learning

In [59]:
def double_Q_learning(env, episodes=100, step_size=0.01, exploration_rate=0.01):
    """Learn two action-value tables with the Double Q-learning update.

    The behaviour policy is greedy with respect to ``Q_1 + Q_2`` (ties broken
    at random by ``argmax_Q``); no epsilon-style exploration is performed.
    On every step one of the two tables is picked with probability 0.5 and
    updated: the greedy next action is *selected* with the table being
    updated and *evaluated* with the other table.

    Args:
        env: Environment wrapper exposing ``reset()``, ``step(action)`` and
            the current state as ``env.env.s``.
        episodes: Number of training episodes.
        step_size: Learning rate applied to the TD error.
        exploration_rate: Despite its name, this value multiplies the
            bootstrapped next-state value in the update, i.e. it acts as the
            discount factor.

    Returns:
        A tuple ``(policy, Q)`` where ``Q`` is the element-wise sum of the
        two learned tables and ``policy`` is greedy with respect to it.
    """
    # The random policy is only used for its keys (the set of states).
    policy = utils.create_random_policy(env)
    Q_1 = create_state_action_dictionary(env, policy)
    Q_2 = create_state_action_dictionary(env, policy)

    for _ in range(episodes):
        env.reset()
        S = env.env.s
        finished = False
        while not finished:
            # Only the current state's row is needed to pick the action, so
            # sum just that row instead of rebuilding the whole table.
            summed = {S: {a: q1 + Q_2[S][a] for a, q1 in Q_1[S].items()}}
            A = argmax_Q(summed, S)
            S_prime, reward, finished, _ = env.step(A)

            if np.random.uniform() < 0.5:
                learner, evaluator = Q_1, Q_2
            else:
                learner, evaluator = Q_2, Q_1

            # Decouple selection from evaluation: choose the best next action
            # with the table being updated, score it with the other table.
            best_next = argmax_Q(learner, S_prime)
            td_target = reward + exploration_rate * evaluator[S_prime][best_next]
            learner[S][A] = learner[S][A] + step_size * (td_target - learner[S][A])

            S = S_prime

    Q = {s: {a: q1 + Q_2[s][a] for a, q1 in row.items()} for s, row in Q_1.items()}
    return greedy_policy(Q), Q

4 x 4

In [60]:
# Train Double Q-learning on the non-slippery map with big=False (the 4 x 4
# case per the heading above), then report the fraction of episodes won.
# NOTE: double_Q_learning() multiplies the next-state value by
# `exploration_rate`, so the value passed here acts as the discount factor.
env = utils.create_environment(slippery=False, big=False)
policy, Q = double_Q_learning(env, episodes=200, step_size=0.5, exploration_rate=0.2)
test_policy(policy, env)

1.0

In [61]:
# Replay one episode with rendering; should_return=False makes run_game
# return None so only the rendered frames show up as cell output.
run_game(env=env, policy=policy, display=True, should_return=False)

  (Right)
SFFF
FHFH
FFFH
HFF[41mG[0m


8 x 8

In [62]:
# Train Double Q-learning on the non-slippery map with big=True (the 8 x 8
# case per the heading above), then report the fraction of episodes won.
# NOTE: double_Q_learning() multiplies the next-state value by
# `exploration_rate`, so the value passed here acts as the discount factor.
env = utils.create_environment(slippery=False, big=True)
policy, Q = double_Q_learning(env, episodes=2000, step_size=0.3, exploration_rate=0.2)
test_policy(policy, env)

1.0

In [63]:
# Replay one episode with rendering; should_return=False makes run_game
# return None so only the rendered frames show up as cell output.
run_game(env=env, policy=policy, display=True, should_return=False)

  (Down)
SFFFFFFF
FFFFFFFF
FFFHFFFF
FFFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFF[41mG[0m
