In [1]:
class WindyGrid:
    def __init__(self, rows, cols, start):
        self.rows = rows
        self.cols = cols
        self.i = start[0]
        self.j = start[1]

    def set(self, rewards, actions, probs):
        # rewards should be a dict of: (i, j): r (row, col): reward
        # actions should be a dict of: (i, j): A (row, col): list of possible actions
        self.rewards = rewards
        self.actions = actions
        self.probs = probs

    def set_state(self, s):
        self.i = s[0]
        self.j = s[1]

    def current_state(self):
        return (self.i, self.j)

    def is_terminal(self, s):
        return s not in self.actions

    def move(self, action):
        s = (self.i, self.j)
        a = action

        next_state_probs = self.probs[(s, a)]
        next_states = list(next_state_probs.keys())
        next_probs = list(next_state_probs.values())
        next_state_idx = np.random.choice(len(next_states), p=next_probs)
        s2 = next_states[next_state_idx]

        # update the current state
        self.i, self.j = s2

        # return a reward (if any)
        return self.rewards.get(s2, 0)

    def game_over(self):
        # returns true if game is over, else false
        # true if we are in a state where no actions are possible
        return (self.i, self.j) not in self.actions

    def all_states(self):
        # possibly buggy but simple way to get all states
        # either a position that has possible next actions
        # or a position that yields a reward
        return set(self.actions.keys()) | set(self.rewards.keys())

In [2]:
def next_state_by_action(current_state, action):
    if action == 'U':
        return current_state[0]-1, current_state[1]
    if action == 'L':
        return current_state[0], current_state[1]-1
    if action == 'D':
        return current_state[0]+1, current_state[1]
    if action == 'R':
        return current_state[0], current_state[1]+1


def update_probs(probs, state, action, windy_states):
    i,j = state
    if ((i,j), action) in windy_states:
        next_actions = windy_states[((i,j), action)]
        next_states = [next_state_by_action((i,j), action) for action in next_actions]
        num_next_states = len(next_states)
        next_state_prob = 1/len(next_states)
        probs[((i,j), action)] = dict()
        for ns in next_states:
            probs[((i,j), action)][ns] = next_state_prob
    else:
        probs[((i,j), action)] = {next_state_by_action((i,j), action): 1}
    return probs


def rewards_actions_probs(g, terminal_states_rewards, walls, step_cost, windy_states):
    height, width = g.rows, g.cols
    rewards = dict()
    actions = dict()
    probs = dict()

    for i in range(height):
        for j in range(width):
            state_actions = []
            if (i,j) in walls:
                continue
            if (i,j) in terminal_states_rewards:
                rewards[(i,j)] = terminal_states_rewards[(i,j)]
            else:
                rewards[(i,j)] = step_cost
                if i > 0 and (i-1,j) not in walls:
                    state_actions.append('U')
                    probs = update_probs(probs, (i,j), 'U', windy_states)
                else:
                    probs[((i,j), 'U')] = {(i,j):1}
                if j > 0 and (i,j-1) not in walls:
                    state_actions.append('L')
                    probs = update_probs(probs, (i,j), 'L', windy_states)
                else:
                    probs[((i,j), 'L')] = {(i,j):1}
                if i < height-1 and (i+1,j) not in walls:
                    state_actions.append('D')
                    probs = update_probs(probs, (i,j), 'D', windy_states)
                else:
                    probs[((i,j), 'D')] = {(i,j):1}
                if j < width-1 and (i,j+1) not in walls:
                    state_actions.append('R')
                    probs = update_probs(probs, (i,j), 'R', windy_states)
                else:
                    probs[((i,j), 'R')] = {(i,j):1}
                actions[(i,j)] = state_actions
    return rewards, actions, probs




In [44]:
def windy_grid_penalized(step_cost=-0.1):
    
    # p(s' | s, a) represented as:
    # KEY: (s, a) --> VALUE: {s': p(s' | s, a)}
    height = 4
    width = 5

    start = (3,0)

    g = WindyGrid(height, width, start)

    terminal_states_rewards = {
        (0,4): 1,
        (1,4): -1
    }
    walls = [(1,1), (3,2), (2,1)]

    windy_states = {
        ((1,3), 'U'): ['U', 'R']
    }

    rewards, actions, probs = rewards_actions_probs(g, terminal_states_rewards, walls, step_cost, windy_states)
    g.set(rewards, actions, probs)
    return g

In [45]:
import numpy as np

SMALL_ENOUGH = 1e-3
GAMMA = 0.9

ACTION_SPACE = ['U', 'R', 'D', 'L']

def print_values(V, g):
    for i in range(g.rows):
        print("-"*10*g.cols)
        for j in range(g.cols):
            v = V.get((i,j), 0)
            if v >= 0:
                print("   %.2f  |" % v, end="")
            else:
                print("  %.2f  |" % v, end="") # -ve sign takes up an extra space
        print("")


def print_policy(P, g):
    for i in range(g.rows):
        print("-"*10*g.cols)
        for j in range(g.cols):
            a = P.get((i,j), ' ')
            print("    %s    |" % a, end="")
        print("")


# copied from iterative_policy_evaluation
def get_transition_probs_and_rewards(grid):
	### define transition probabilities and grid ###
	# the key is (s, a, s'), the value is the probability
	# that is, transition_probs[(s, a, s')] = p(s' | s, a)
	# any key NOT present will considered to be impossible (i.e. probability 0)
	transition_probs = {}

	# to reduce the dimensionality of the dictionary, we'll use deterministic
	# rewards, r(s, a, s')
	# note: you could make it simpler by using r(s') since the reward doesn't
	# actually depend on (s, a)
	rewards = {}

	for (s, a), v in grid.probs.items():
		for s2, p in v.items():
			transition_probs[(s, a, s2)] = p
			rewards[(s, a, s2)] = grid.rewards.get(s2, 0)

	return transition_probs, rewards


def evaluate_deterministic_policy(grid, policy, initV=None):
    # initialize V(s) = 0
    if initV is None:
        V = {}
        for s in grid.all_states():
            V[s] = 0
    else:
        # it's faster to use the existing V(s) since the value won't change
        # that much from one policy to the next
        V = initV

    # repeat until convergence
    it = 0
    while True:
        biggest_change = 0
        for s in grid.all_states():
            if not grid.is_terminal(s):
                old_v = V[s]
                new_v = 0 # we will accumulate the answer
                for a in ACTION_SPACE:
                    for s2 in grid.all_states():

                        # action probability is deterministic
                        action_prob = 1 if policy.get(s) == a else 0
                        
                        # reward is a function of (s, a, s'), 0 if not specified
                        r = rewards.get((s, a, s2), 0)
                        new_v += action_prob * transition_probs.get((s, a, s2), 0) * (r + GAMMA * V[s2])

                # after done getting the new value, update the value table
                V[s] = new_v
                biggest_change = max(biggest_change, np.abs(old_v - V[s]))
        it += 1

        if biggest_change < SMALL_ENOUGH:
            break
    return V


if __name__ == '__main__':

    grid = windy_grid_penalized(-0.1)
    # grid = windy_grid()
    transition_probs, rewards = get_transition_probs_and_rewards(grid)

    # print rewards
    print("rewards:")
    print_values(grid.rewards, grid)

    # state -> action
    # we'll randomly choose an action and update as we learn
    policy = {}
    for s in grid.actions.keys():
        policy[s] = np.random.choice(ACTION_SPACE)

    # initial policy
    print("initial policy:")
    print_policy(policy, grid)

    # repeat until convergence - will break out when policy does not change
    V = None
    while True:

        # policy evaluation step - we already know how to do this!
        V = evaluate_deterministic_policy(grid, policy, initV=V)

        # policy improvement step
        is_policy_converged = True
        for s in grid.actions.keys():
            old_a = policy[s]
            new_a = None
            best_value = float('-inf')

            # loop through all possible actions to find the best current action
            for a in ACTION_SPACE:
                v = 0
                for s2 in grid.all_states():
                    # reward is a function of (s, a, s'), 0 if not specified
                    r = rewards.get((s, a, s2), 0)
                    v += transition_probs.get((s, a, s2), 0) * (r + GAMMA * V[s2])

                if v > best_value:
                    best_value = v
                    new_a = a

            # new_a now represents the best action in this state
            policy[s] = new_a
            if new_a != old_a:
                is_policy_converged = False

        if is_policy_converged:
            break

    # once we're done, print the final policy and values
    print("values:")
    print_values(V, grid)
    print("policy:")
    print_policy(policy, grid)

rewards:
--------------------------------------------------
  -0.10  |  -0.10  |  -0.10  |  -0.10  |   1.00  |
--------------------------------------------------
  -0.10  |   0.00  |  -0.10  |  -0.10  |  -1.00  |
--------------------------------------------------
  -0.10  |   0.00  |  -0.10  |  -0.10  |  -0.10  |
--------------------------------------------------
  -0.10  |  -0.10  |   0.00  |  -0.10  |  -0.10  |
initial policy:
--------------------------------------------------
    L    |    U    |    D    |    R    |         |
--------------------------------------------------
    R    |         |    R    |    L    |         |
--------------------------------------------------
    D    |         |    D    |    L    |    U    |
--------------------------------------------------
    U    |    D    |         |    L    |    U    |
values:
--------------------------------------------------
   0.46  |   0.62  |   0.80  |   1.00  |   0.00  |
-------------------------------------------------

In [13]:
rewards

{(0, 0): -0.1,
 (0, 1): -0.1,
 (0, 2): -0.1,
 (0, 3): 1,
 (1, 0): -0.1,
 (1, 2): -0.1,
 (1, 3): -1,
 (2, 0): -0.1,
 (2, 1): -0.1,
 (2, 2): -0.1,
 (2, 3): -0.1}

In [15]:
len(probs)

36