In [1]:
import sys
import termcolor
import numpy as np
import pandas as pd
from IPython.display import display, clear_output


class FrozenLake:
    """Grid world made of frozen, blocked, hole and goal tiles.

    Positions are ``(row, column)`` pairs; a *state* is the row-major flat
    index of a position (``row * n_columns + column``). Reaching a GOAL tile
    yields reward 1, a HOLE tile yields reward -1, every other tile yields 0.
    GOAL and HOLE tiles are terminal.

    Parameters
    ----------
    grid : 2-D array-like of tile codes (FROZEN, BLOCKED, HOLE, GOAL).
    starting_pos : ``(row, column)`` used by ``reset()`` when no position or
        state is given.
    error_proba : float in [0, 1].
        NOTE: despite its name, ``_perturbate_action`` uses this value as the
        probability that the requested action is executed *unchanged*; the
        remaining probability is split evenly between the two neighbouring
        entries of ``ACTIONS`` (cyclically). With the default of 0 the
        requested action is therefore never executed as requested.
    """

    # Tile codes stored in ``grid``; they also index TILE_NAMES / TILE_COLORS.
    FROZEN, BLOCKED, HOLE, GOAL = range(4)
    TILE_NAMES = ["F", " ", "H", "G"]
    TILE_COLORS = ["grey", "white", "red", "green"]
    # Action codes; they also index ACTION_MOVES / ACTION_NAMES.
    LEFT, TOP, RIGHT, BOTTOM = range(4)
    ACTIONS = [LEFT, TOP, RIGHT, BOTTOM]
    # (row, column) offset added to the current position for each action.
    ACTION_MOVES = np.array([0, -1]), np.array([-1, 0]), np.array([0, 1]), np.array([1, 0])
    ACTION_NAMES = ["LEFT", "TOP", "RIGHT", "BOTTOM"]

    def __init__(self, grid, starting_pos, error_proba=0):
        self.grid = np.array(grid)
        self.states = list(range(self.grid.size))
        self.actions = self.ACTIONS
        self.starting_pos = np.array(starting_pos)
        self.error_proba = error_proba
        assert 0 <= self.error_proba <= 1
        self.reset()

    def state(self, pos=None):
        """Return the flat state index of ``pos`` (default: current position)."""
        y, x = pos if pos is not None else self.pos
        return y * len(self.grid[0]) + x

    def state_to_pos(self, state):
        """Return the ``(row, column)`` array corresponding to a flat state index."""
        n_cols = len(self.grid[0])
        return np.array([state // n_cols, state % n_cols])

    def reset(self, pos=None, state=None):
        """Place the agent and return the resulting state index.

        ``state`` takes precedence over ``pos``; when neither is given the
        agent goes back to ``starting_pos``. The target must be a valid
        (in-bounds, non-blocked) tile, otherwise an AssertionError is raised.
        """
        if state is not None:
            pos = self.state_to_pos(state)
        elif pos is None:
            pos = self.starting_pos
        assert self.is_pos_valid(pos)
        # Store a fresh array: this keeps ``self.pos`` an ndarray even when a
        # list/tuple was passed, and prevents in-place edits of ``self.pos``
        # from silently modifying ``self.starting_pos``.
        self.pos = np.array(pos)
        return self.state()

    def tile(self, pos=None):
        """Return the tile code at ``pos`` (default: current position)."""
        if pos is None:
            pos = self.pos
        y, x = pos
        return self.grid[y, x]

    def is_state_valid(self, state):
        """Tell whether the flat state index maps to a valid position."""
        return self.is_pos_valid(self.state_to_pos(state))

    def is_pos_valid(self, pos):
        """Tell whether ``pos`` lies inside the grid and is not a BLOCKED tile."""
        y, x = pos
        # Negative indices would wrap around in numpy, so reject them explicitly.
        if x < 0 or y < 0:
            return False
        try:
            tile = self.tile(pos=pos)
        except IndexError:
            # Past the bottom/right edge of the grid.
            return False
        return tile != self.BLOCKED

    def is_terminate_pos(self, pos):
        """Tell whether the tile at ``pos`` ends the episode (GOAL or HOLE)."""
        return self.tile(pos=pos) in (self.GOAL, self.HOLE)

    def is_terminate_state(self, state):
        """Tell whether the flat state index is a terminal tile."""
        return self.is_terminate_pos(self.state_to_pos(state))

    def _reward(self, pos=None):
        """Return 1 on GOAL, -1 on HOLE and 0 elsewhere (default: current position)."""
        tile = self.tile(pos=pos)
        if tile == self.GOAL:
            return 1
        elif tile == self.HOLE:
            return -1
        return 0

    def _perturbate_action(self, action):
        """Return the action that is actually executed for a requested ``action``.

        With probability ``error_proba`` the requested action is returned
        unchanged (see the class docstring about the parameter's name); the
        remaining probability mass is split evenly between the previous and
        the next entry of ``ACTIONS`` (wrapping around at both ends).
        """
        p = np.random.rand()
        if p < self.error_proba:
            return action
        if p < self.error_proba + (1 - self.error_proba) / 2:
            # action - 1 == -1 for LEFT, which wraps to the last action.
            return self.ACTIONS[action - 1]
        return self.ACTIONS[(action + 1) % len(self.ACTIONS)]

    def step(self, action):
        """Apply ``action`` (code or name) and return ``(state, reward, done)``.

        The action may be perturbed by ``_perturbate_action``. If the
        resulting move leaves the grid or hits a BLOCKED tile, the agent
        stays where it is.
        """
        if isinstance(action, str):
            action = self.ACTIONS[self.ACTION_NAMES.index(action)]
        pos = self.pos + self.ACTION_MOVES[self._perturbate_action(action)]
        if self.is_pos_valid(pos):
            self.pos = pos
        return self.state(), self._reward(), self.is_terminate_pos(self.pos)

    def plot(self):
        """Print the grid to the terminal, marking the agent with a blue ``X``."""
        print("")
        y, x = self.pos
        for i, _ in enumerate(self.grid):
            for j, _ in enumerate(self.grid[0]):
                if i == y and j == x:
                    termcolor.cprint("X", "blue", end="")
                else:
                    tile = self.tile([i, j])
                    termcolor.cprint(self.TILE_NAMES[tile], self.TILE_COLORS[tile], end="")
            print("")
        print("")

In [7]:
# Build the 4x4 lake row by row; the agent starts at row 3, column 0 (bottom-left).
# error_proba=0.8: FrozenLake._perturbate_action executes the requested action
# unchanged with probability 0.8 and a neighbouring action otherwise.
env = FrozenLake(
    grid=[
        [FrozenLake.FROZEN, FrozenLake.FROZEN, FrozenLake.FROZEN, FrozenLake.GOAL],
        [FrozenLake.FROZEN, FrozenLake.BLOCKED, FrozenLake.FROZEN, FrozenLake.HOLE],
        [FrozenLake.FROZEN, FrozenLake.BLOCKED, FrozenLake.BLOCKED, FrozenLake.FROZEN],
        [FrozenLake.FROZEN, FrozenLake.FROZEN, FrozenLake.FROZEN, FrozenLake.FROZEN],
    ],
    starting_pos=[3, 0],
    error_proba=0.8,
)
env.plot()
# Take a single (possibly perturbed) step to the right from the start tile.
state, reward, done = env.step(FrozenLake.RIGHT)


[30mF[0m[30mF[0m[30mF[0m[32mG[0m
[30mF[0m[37m [0m[30mF[0m[31mH[0m
[30mF[0m[37m [0m[37m [0m[30mF[0m
[34mX[0m[30mF[0m[30mF[0m[30mF[0m



In [8]:
def train_tabular(Q, env, episodes=2000, max_steps=200, discount_factor=0.9, lr=0.618, cb_episode=None):
    """Fit the action-value table ``Q`` in place by sweeping every state/action pair.

    Each "episode" is one full sweep: for every valid, non-terminal state and
    every action, the environment is reset to that state, one step is taken,
    and the corresponding entry is moved towards
    ``reward + discount_factor * max_a Q[next_state, a]`` with step size ``lr``.

    Parameters
    ----------
    Q : pandas.DataFrame with one row per state and one column per action,
        addressed positionally (row ``state``, column ``action``). Modified in place.
    env : environment exposing ``states``, ``actions``, ``is_state_valid``,
        ``is_terminate_state``, ``reset(state=...)`` and ``step(action)``.
    episodes : number of sweeps.
    max_steps : currently unused; kept so existing callers keep working.
    discount_factor : weight of the bootstrapped next-state value.
    lr : learning rate of the update.
    cb_episode : currently unused; kept so existing callers keep working.

    Returns
    -------
    None
    """
    for episode in range(episodes):
        for state in env.states:
            # Blocked and terminal tiles have no action values to learn.
            if not env.is_state_valid(state) or env.is_terminate_state(state):
                continue
            for action in env.actions:
                env.reset(state=state)
                next_state, reward, _ = env.step(action)
                # Use positional indexing for the lookup as well as the write:
                # state indices are row positions, not index labels, so a
                # label lookup breaks when Q has a non-default index.
                target = reward + discount_factor * Q.iloc[next_state].max()
                Q.iloc[state, action] = (1 - lr) * Q.iloc[state, action] + lr * target
        if not episode % 200:
            print('Episode:', episode, '/', episodes)
    
def init_Q_zeros(env):
    """Return an all-zero action-value table for ``env``.

    The result is a DataFrame with one row per grid cell (``env.grid.size``)
    and one column per action, labelled with the environment's own
    ``ACTION_NAMES`` so the table always matches the environment it was
    built for rather than a hard-coded class.
    """
    n_states = env.grid.size
    n_actions = len(env.ACTIONS)
    return pd.DataFrame(np.zeros((n_states, n_actions)), columns=env.ACTION_NAMES)

def plot_policy(env, Q):
    """Return a 2-D array with the greedy action label for every grid cell.

    For each valid, non-terminal position the label is the column of ``Q``
    holding the largest value in that state's row; every other cell gets an
    empty string.
    """
    policy_rows = []
    for row in range(env.grid.shape[0]):
        labels = []
        for col in range(env.grid.shape[1]):
            cell = (row, col)
            if env.is_pos_valid(cell) and not env.is_terminate_pos(cell):
                labels.append(Q.iloc[env.state(pos=cell)].idxmax())
            else:
                labels.append('')
        policy_rows.append(labels)
    return np.array(policy_rows)

In [9]:
# Start from an all-zero action-value table and fit it in place with the default settings.
Q = init_Q_zeros(env=env)
train_tabular(Q=Q, env=env)
# Show the learned table as the cell output.
Q

Episode: 0 / 2000
Episode: 200 / 2000
Episode: 400 / 2000
Episode: 600 / 2000
Episode: 800 / 2000
Episode: 1000 / 2000
Episode: 1200 / 2000
Episode: 1400 / 2000
Episode: 1600 / 2000
Episode: 1800 / 2000


Unnamed: 0,LEFT,TOP,RIGHT,BOTTOM
0,0.645323,0.647484,0.731115,0.59993
1,0.655599,0.753016,0.88029,0.77876
2,0.77782,0.890933,0.998679,0.642683
3,0.0,0.0,0.0,0.0
4,0.593492,0.653152,0.642869,0.552852
5,0.0,0.0,0.0,0.0
6,0.817224,0.664035,-0.54264,0.690885
7,0.0,0.0,0.0,0.0
8,0.478448,0.589981,0.533566,0.448868
9,0.0,0.0,0.0,0.0


In [11]:
import pandas as pd

# Wrap the per-tile greedy action labels in a DataFrame so they render as a grid.
df = pd.DataFrame(data=plot_policy(env=env, Q=Q))
df

Unnamed: 0,0,1,2,3
0,RIGHT,RIGHT,RIGHT,
1,TOP,,LEFT,
2,TOP,,,BOTTOM
3,TOP,LEFT,LEFT,LEFT
