In [267]:
import numpy as np
import grid_world as gw
import typing

In [268]:
def print_V(V:typing.Tuple, width:int, height:int):
    for i in range(width):
        line = ""
        for j in range(height):
            index = (i, j)
            if index in V:
                val = V[index]
                if val >= 0:
                    line += "| %1.2f" % val
                else:
                    line += "|%1.2f" % val

            else:
                line += '|' + " 0.00"
        print(line+'|')
        print('-------------------------')

In [269]:
def print_policy(grid:gw.Grid, p:typing.Callable[[typing.Tuple, any], typing.List[typing.Tuple[float,any]]], width:int, height:int):
    for i in range(width):
        line = ""
        for j in range(height):
            index = (i, j)
            if index in grid.actions:
                if type(p) == type({}):
                    actions = [(1., p[index])]
                else:
                    actions = p(index, grid)
                highest_prob = 0.
                action_to_take = None

                for a in actions:
                    prob, action = a
                    if prob > highest_prob:
                        highest_prob = prob
                        action_to_take = action

                line += '|  ' + str(action_to_take) + "  "
            else:
                line += '|  ' + "X" + "  "
        print(line+'|')
        print('-------------------------')

In [395]:
class Policy():
        
        ACTIONS = ['L', 'R', 'U', 'D']
        np.random.seed()
        policy = {
            (2, 0): np.random.choice(ACTIONS),
            (1, 0): np.random.choice(ACTIONS),
            (0, 0): np.random.choice(ACTIONS),
            (0, 1): np.random.choice(ACTIONS),
            (0, 2): np.random.choice(ACTIONS),
            (1, 2): np.random.choice(ACTIONS),
            (2, 1): np.random.choice(ACTIONS),
            (2, 2): np.random.choice(ACTIONS),
            (2, 3): np.random.choice(ACTIONS),
            (0, 3):None,
            (1, 1):None,
            (1, 3):None,
        }
        verbose = False
        
        def __init__(self, epsilon=1.0):
            self.epsilon = epsilon
            pass
        
        def set_verbose(self, verbose:bool):
            self.verbose = verbose

        def __call__(self, s:any, env:any) -> typing.List[typing.Tuple[float, any]]:
            if np.random.rand() < self.epsilon:
                # on policy
                action = self.policy[s]
            else:
                # off policy
                action = np.random.choice(self.ACTIONS)
                if self.verbose:
                    print("Due to randomness, I take move {} instead of {}".format(action, self.policy[s]))
                
            return [(1., action)]

In [481]:
def play_gridworld_episode(grid:gw.Grid, policy:typing.Callable):
    in_terminal_state = False
    start_states = [*grid.actions.keys()]
    curr_state = start_states[np.random.choice(len(start_states))]
    
    queue = [(0, curr_state)]
    max_steps = (grid.width * grid.height) ** 5 #otherwise we could run for almost ever if unlucky
    step = 0
    
    grid.set_state(curr_state)
    while not grid.game_over() and step < max_steps:
        step += 1
        action = policy(curr_state, grid)[0][1]
        r = grid.move(action)
        curr_state = grid.current_state()
        queue.append((r, curr_state))
        
    return queue

In [482]:
p = Policy(0.5)
p.set_verbose(True)
trace = play_gridworld_episode(grid=gw.standard_grid(), policy=p)
print(trace[-1])

(1, (0, 3))


In [486]:
def play_game(grid:gw.Grid, policy:typing.Callable, rounds:int=1000):
    V = {k: 0 for k in grid.all_states()}
    
    for i in range(rounds):
        policy.epsilon = 1 - 0.90 ** i # some exploration
        trace = play_gridworld_episode(grid, policy=policy)
        rewards = {k: [] for k, v in V.items()}
        
        # get mean of immidiate rewards for each state visited
        for r, s in reversed(trace):
            rewards[s].append(r)
        
        # rolling mean would avoid that, but meh
        rewards = {k: np.sum(v) / len(v) for k, v in rewards.items()  if len(v) > 0}
        
        # calculate returns     
        returns = []
        G = 0
        gamma = .9
        for r, s in reversed(trace):
            returns.append((G, s))
            G = rewards[s] + gamma * G
        
        # average returns also
        rewards_per_state =  {k: [] for k, v in rewards.items()}
        for r, s in returns:
            rewards_per_state[s].append(r)
        
        # rolling mean would avoid that also, but meh meh
        rewards_per_state = {k: np.sum(v) / len(v) for k, v in rewards_per_state.items()}
        #print(rewards_per_state)

        # update V
        lr = 0.1
        for state, reward in rewards_per_state.items():
            # print(state, V[state], rewards_per_state[state])
            V[state] = V[state] + lr * (r + rewards_per_state[state] - V[state])
        
        print_V(V=V, width=grid.width, height=grid.height)
        
        # update policy
        for s in grid.actions.keys():
            best_val = float("-inf")
            best_action = None
            for action in grid.actions[s]:
                grid.set_state(s)
                r = grid.move(action, force=True)
                state = grid.current_state()
                val = V[state] + r
                if val > best_val:
                    best_val = val
                    best_action = action
                    
            policy.policy[s] = best_action
        
        print_policy(grid=grid, width=grid.width, height=grid.height, p=policy)
    return policy

In [487]:
play_game(grid=gw.standard_grid(windy=True), policy=Policy(), rounds=100)

Due to wind I switched action R to L
Due to wind I switched action D to R
Due to wind I switched action R to U
Due to wind I switched action U to R
| 0.03| 0.03| 0.07| 0.01|
-------------------------
| 0.02| 0.00| 0.08| 0.00|
-------------------------
| 0.02| 0.02| 0.01| 0.01|
-------------------------
|  L  |  D  |  D  |  X  |
-------------------------
|  U  |  X  |  U  |  X  |
-------------------------
|  D  |  L  |  U  |  D  |
-------------------------
Due to wind I switched action L to D
Due to wind I switched action L to D
Due to wind I switched action R to D
Due to wind I switched action U to R
Due to wind I switched action D to L
Due to wind I switched action R to U
Due to wind I switched action R to L
| 0.08| 0.11| 0.17| 0.01|
-------------------------
| 0.05| 0.00| 0.08| 0.00|
-------------------------
| 0.04| 0.03| 0.04| 0.01|
-------------------------
|  D  |  L  |  R  |  X  |
-------------------------
|  D  |  X  |  L  |  X  |
-------------------------
|  L  |  D  |  U  |  

<__main__.Policy at 0x1930a1cbcf8>

In [488]:
play_game(grid=gw.negative_grid(windy=True), policy=Policy(), rounds=100)

Due to wind I switched action D to R
Due to wind I switched action R to U
| 0.06| 0.03| 0.03|-0.01|
-------------------------
| 0.00| 0.00| 0.00| 0.00|
-------------------------
| 0.00| 0.00| 0.00| 0.00|
-------------------------
|  L  |  D  |  L  |  X  |
-------------------------
|  D  |  X  |  R  |  X  |
-------------------------
|  L  |  D  |  L  |  L  |
-------------------------
Due to wind I switched action D to L
Due to wind I switched action L to U
Due to wind I switched action D to L
Due to wind I switched action L to U
|-0.14|-0.17|-0.17|-0.01|
-------------------------
|-0.19| 0.00|-0.20|-0.10|
-------------------------
|-0.19|-0.19|-0.20| 0.00|
-------------------------
|  L  |  U  |  R  |  X  |
-------------------------
|  D  |  X  |  L  |  X  |
-------------------------
|  L  |  L  |  L  |  L  |
-------------------------
|-0.14|-0.17|-0.00| 0.06|
-------------------------
|-0.19| 0.00|-0.20|-0.10|
-------------------------
|-0.19|-0.19|-0.20| 0.00|
------------------------

<__main__.Policy at 0x1937fe9bd68>