In [1]:
import gym
import numpy as np
from gym.envs.toy_text.frozen_lake import generate_random_map

### Define environment and Constant

In [3]:
GAMMA = 1
# customize rewards for training
ACTION_REWARD = 0.0
SUCCESS_REWARD = 1.0
HOLD_REWARD = -0.5
MAX_STEPS = 100  # max step limitation of game

def make_env(size=4, random=False, is_slippery=True):
    """help function to generate environment"""
    if random:
        desc = generate_random_map(size=size)
    else:
        desc = None
    map_name = f'{size}x{size}'
    env = gym.make("FrozenLake-v1", desc=desc, map_name=map_name, is_slippery=is_slippery)
    return env

SIZE = 8
ENV = make_env(size=SIZE, is_slippery=True, random=False)
MAP = ENV.desc

### Define Help functions

In [4]:
def state_to_idx(s):
    """ state -> row, column """
    return s // SIZE, s % SIZE

def idx_to_state(row, column):
    """ row, column -> state """
    return row * SIZE + column

def is_end(s):
    """verify state is end or not: in a hole or in a gain"""
    idx = state_to_idx(s)
    return MAP[idx] == b"H" or MAP[idx] == b"G"

def s_rewards(s):
    """Customize State Reword"""
    idx = state_to_idx(s)
    if MAP[idx] == b"H":
        return HOLD_REWARD
    elif MAP[idx] == b"G":
        return SUCCESS_REWARD
    return 0.0

In [5]:
def get_next_states(s, a):
    """Get next states and its percentage according to the current state and actions."""
    if is_end(s):
        return []

    row, column = state_to_idx(s)
    if a == 0:  # left
        next_idx_list = [
            ((row, column - 1), 1 / 3),  # left
            ((row - 1, column), 1 / 3),  # top
            ((row + 1, column), 1 / 3),  # down
        ]
    elif a == 1:  # down
        next_idx_list = [
            ((row, column - 1), 1 / 3),  # left
            ((row, column + 1), 1 / 3),  # right
            ((row + 1, column), 1 / 3),  # down
        ]
    elif a == 2:  # right
        next_idx_list = [
            ((row, column + 1), 1 / 3),  # right
            ((row - 1, column), 1 / 3),  # top
            ((row + 1, column), 1 / 3),  # down
        ]
    elif a == 3:  # top
        next_idx_list = [
            ((row, column - 1), 1 / 3),  # left
            ((row, column + 1), 1 / 3),  # right
            ((row - 1, column), 1 / 3),  # top
        ]
    else:
        raise ValueError()
    next_states = []
    stay_p = 0  # stay in the same block
    for (row, column), p in next_idx_list:
        if row < 0 or row >= SIZE or column < 0 or column >= SIZE:
            stay_p += p
            continue
        next_states.append((idx_to_state(row, column), p))
    next_states.append((s, stay_p))
    return next_states

In [6]:
def get_available_actions(s):
    """Get available actions for the current state"""
    if is_end(s):
        return []
    else:
        return [
            (0, 1 / 4),
            (1, 1 / 4),
            (2, 1 / 4),
            (3, 1 / 4),
        ]

### Train functions

In [8]:
def train_V():
    """Train V table
    
    Important: In our model the number of states is not SIZE * SIZE, because the game has limited steps (<=100), so it's (SIZE * SIZE * (MAX_STEPS + 1)), 
    
    """
    v_current = np.zeros((MAX_STEPS + 1, SIZE * SIZE))  # v_current[i, j] means the V in grad j when it remains i steps.

    # initialize the v when there's 0 step remain
    for s in range(SIZE * SIZE):
        v_current[0, s] = s_rewards(s)

    iter = 0
    while True:
        iter += 1
        v_next = np.zeros_like(v_current)
        v_next[0] = v_current[0]  # inherent the value of 0 step remain

        for step_limit in range(1, MAX_STEPS+1):
            for s in range(SIZE * SIZE):
                # update value when step_limit steps remain in grid s
                v_list = [v_current[step_limit-1, s]]  # inherit from step_limit-1 steps remain in grid s
                for a, _ in get_available_actions(s):
                    g_a = ACTION_REWARD  # evaluate gain after take `a` step.
                    for next_s, next_p in get_next_states(s, a):
                        g_a += GAMMA * v_current[step_limit-1, next_s] * next_p
                    v_list.append(g_a)
                v_next[step_limit, s] = max(v_list)
        if np.allclose(v_current, v_next):
            # print(f"Trained: {iter}")
            break
        v_current = v_next.copy()
    return v_current

In [9]:
def train_Q(v):
    """Train Q table"""
    q = np.zeros((MAX_STEPS + 1, SIZE * SIZE, 4))
    for step_limit in range(1, MAX_STEPS+1):
        for s in range(SIZE * SIZE):
            for a, _ in get_available_actions(s):
                q_sum = ACTION_REWARD
                for next_s, next_p in get_next_states(s, a):
                    q_sum += GAMMA * v[step_limit-1, next_s] * next_p
                q[step_limit, s, a] = q_sum
    return q

In [10]:
def get_action(step_limit, s, q):
    """Get action for state s according to Q table, when step_limit steps remain"""
    a = np.argmax(q[min(step_limit, MAX_STEPS), s])
    return a


def print_V(v):
    """Print V Table"""
    for step_limit in range(MAX_STEPS + 1):
        print(f"Step Limits: {step_limit}")
        for row in range(SIZE):
            idx_list = []
            for column in range(SIZE):                    
                idx_list.append(idx_to_state(row, column))
            print(v[step_limit, idx_list])


def print_Q(q):
    """Print Q Table"""
    for step_limit in range(MAX_STEPS + 1):
        if step_limit < 10 or step_limit == MAX_STEPS:
            print(f"Step Limits: {step_limit}")
        for row in range(SIZE):
            a_list = []
            for column in range(SIZE):
                s = idx_to_state(row, column)
                a_list.append(np.argmax(q[step_limit, s]))
            if step_limit < 10 or step_limit == MAX_STEPS:
                print(a_list)

In [11]:
## train V table
V = train_V()
print_V(V)

Step Limits: 0
[0. 0. 0. 0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 0. 0.]
[ 0.   0.   0.  -0.5  0.   0.   0.   0. ]
[ 0.   0.   0.   0.   0.  -0.5  0.   0. ]
[ 0.   0.   0.  -0.5  0.   0.   0.   0. ]
[ 0.  -0.5 -0.5  0.   0.   0.  -0.5  0. ]
[ 0.  -0.5  0.   0.  -0.5  0.  -0.5  0. ]
[ 0.   0.   0.  -0.5  0.   0.   0.   1. ]
Step Limits: 1
[0. 0. 0. 0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 0. 0.]
[ 0.   0.   0.  -0.5  0.   0.   0.   0. ]
[ 0.   0.   0.   0.   0.  -0.5  0.   0. ]
[ 0.   0.   0.  -0.5  0.   0.   0.   0. ]
[ 0.  -0.5 -0.5  0.   0.   0.  -0.5  0. ]
[ 0.         -0.5         0.          0.         -0.5         0.
 -0.5         0.33333333]
[ 0.          0.          0.         -0.5         0.          0.
  0.33333333  1.        ]
Step Limits: 2
[0. 0. 0. 0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 0. 0.]
[ 0.   0.   0.  -0.5  0.   0.   0.   0. ]
[ 0.   0.   0.   0.   0.  -0.5  0.   0. ]
[ 0.   0.   0.  -0.5  0.   0.   0.   0. ]
[ 0.         -0.5        -0.5         0.          0.          0.
 -0.5 

In [12]:
# Train Q table
Q = train_Q(V)
print_Q(Q)

Step Limits: 0
[0, 0, 0, 0, 0, 0, 0, 0]
[0, 0, 0, 0, 0, 0, 0, 0]
[0, 0, 0, 0, 0, 0, 0, 0]
[0, 0, 0, 0, 0, 0, 0, 0]
[0, 0, 0, 0, 0, 0, 0, 0]
[0, 0, 0, 0, 0, 0, 0, 0]
[0, 0, 0, 0, 0, 0, 0, 0]
[0, 0, 0, 0, 0, 0, 0, 0]
Step Limits: 1
[0, 0, 0, 0, 0, 0, 0, 0]
[0, 0, 0, 3, 0, 0, 0, 0]
[0, 0, 0, 0, 2, 3, 0, 0]
[0, 0, 0, 1, 0, 0, 2, 0]
[0, 3, 0, 0, 2, 1, 3, 0]
[0, 0, 0, 1, 3, 0, 0, 2]
[0, 0, 1, 0, 0, 0, 0, 2]
[0, 1, 0, 0, 1, 0, 1, 0]
Step Limits: 2
[0, 0, 0, 0, 0, 0, 0, 0]
[0, 0, 0, 3, 0, 0, 0, 0]
[0, 0, 0, 0, 2, 3, 0, 0]
[0, 0, 0, 1, 0, 0, 2, 0]
[0, 3, 0, 0, 2, 1, 3, 0]
[0, 0, 0, 1, 3, 0, 0, 2]
[0, 0, 1, 0, 0, 0, 0, 2]
[0, 1, 0, 0, 1, 1, 1, 0]
Step Limits: 3
[0, 0, 0, 0, 0, 0, 0, 0]
[0, 0, 0, 3, 0, 0, 0, 0]
[0, 0, 0, 0, 2, 3, 0, 0]
[0, 0, 0, 1, 0, 0, 2, 0]
[0, 3, 0, 0, 2, 1, 3, 0]
[0, 0, 0, 1, 3, 0, 0, 2]
[0, 0, 1, 0, 0, 0, 0, 2]
[0, 1, 0, 0, 1, 1, 1, 0]
Step Limits: 4
[0, 0, 0, 0, 0, 0, 0, 0]
[0, 0, 0, 3, 0, 0, 0, 0]
[0, 0, 0, 0, 2, 3, 0, 0]
[0, 0, 0, 1, 0, 0, 2, 0]
[0, 3, 0, 0, 2, 1, 3, 1]


## Evaluate

In [13]:
total_gain = 0
count = 10000
for iter in range(count):
    state = ENV.reset()
    # ENV.render()
    done = False
    gain = 0
    remain_steps = 100
    while not done:
        action = get_action(remain_steps, state, Q)
        state, reward, done, info = ENV.step(action)
        idx = state_to_idx(state)
        # print(f"-> ({idx[0]}, {idx[1]})")
        gain += reward
        # ENV.render()
        remain_steps -= 1
    # if not gain:
        # ENV.render()
        # print(steps)
        # break
    total_gain += gain
    if iter % 100 == 0:
        print(f"{iter}: {total_gain/(iter+1)}")
print(total_gain/count)

0: 1.0
100: 0.6336633663366337
200: 0.6616915422885572
300: 0.6611295681063123
400: 0.655860349127182
500: 0.654690618762475
600: 0.6522462562396006
700: 0.6676176890156919
800: 0.66541822721598
900: 0.6692563817980022
1000: 0.6683316683316683
1100: 0.670299727520436
1200: 0.668609492089925
1300: 0.6671790930053805
1400: 0.668807994289793
1500: 0.6662225183211192
1600: 0.6689569019362899
1700: 0.6690182245737801
1800: 0.6629650194336479
1900: 0.6596528143082588
2000: 0.6536731634182908
2100: 0.6544502617801047
2200: 0.651976374375284
2300: 0.6492829204693612
2400: 0.6518117451062058
2500: 0.6541383446621352
2600: 0.6528258362168397
2700: 0.6508700481303221
2800: 0.6515530167797215
2900: 0.6483971044467425
3000: 0.6434521826057981
3100: 0.6449532408900355
3200: 0.6426116838487973
3300: 0.6431384428960921
3400: 0.6421640693913555
3500: 0.6443873179091688
3600: 0.6425992779783394
3700: 0.643339637935693
3800: 0.6427255985267035
3900: 0.6403486285567803
4000: 0.6383404148962759
4100: 0.638