In [54]:
! jupyter nbconvert --to python Grid_World_Environment.ipynb

[NbConvertApp] Converting notebook Grid_World_Environment.ipynb to python
[NbConvertApp] Writing 3086 bytes to Grid_World_Environment.py


In [55]:
import numpy as np
from grid_world import standard_grid, ACTION_SPACE

In [56]:
SMALL_ENOUGH = 1e-3
GAMMA = 0.9

In [57]:
def print_values(V, g):
  for i in range(g.rows):
    print("---------------------------")
    for j in range(g.cols):
      v = V.get((i,j), 0)
      if v >= 0:
        print(" %.2f|" % v, end="")
      else:
        print("%.2f|" % v, end="") # -ve sign takes up an extra space
    print("")


def print_policy(P, g):
  for i in range(g.rows):
    print("---------------------------")
    for j in range(g.cols):
      a = P.get((i,j), ' ')
      print("  %s  |" % a, end="")
    print("")


In [58]:
def get_transition_probs_and_rewards(grid):
  ### define transition probabilities and grid ###
  # the key is (s, a, s'), the value is the probability
  # that is, transition_probs[(s, a, s')] = p(s' | s, a)
  # any key NOT present will considered to be impossible (i.e. probability 0)
  transition_probs = {}

  # to reduce the dimensionality of the dictionary, we'll use deterministic
  # rewards, r(s, a, s')
  # note: you could make it simpler by using r(s') since the reward doesn't
  # actually depend on (s, a)
  rewards = {}

  for i in range(grid.rows):
    for j in range(grid.cols):
      s = (i, j)
      if not grid.is_terminal(s):
        for a in ACTION_SPACE:
          s2 = grid.get_next_state(s, a)
          transition_probs[(s, a, s2)] = 1
          if s2 in grid.rewards:
            rewards[(s, a, s2)] = grid.rewards[s2]

  return transition_probs, rewards

In [59]:
def evaluate_deterministic_policy(grid, policy, initV=None):
  # initialize V(s) = 0
  if initV is None:
    V = {}
    for s in grid.all_states():
      V[s] = 0
  else:
    # it's faster to use the existing V(s) since the value won't change
    # that much from one policy to the next
    V = initV

  # repeat until convergence
  it = 0
  while True:
    biggest_change = 0
    for s in grid.all_states():
      if not grid.is_terminal(s):
        old_v = V[s]
        new_v = 0 # we will accumulate the answer
        for a in ACTION_SPACE:
          for s2 in grid.all_states():

            # action probability is deterministic
            action_prob = 1 if policy.get(s) == a else 0
            
            # reward is a function of (s, a, s'), 0 if not specified
            r = rewards.get((s, a, s2), 0)
            new_v += action_prob * transition_probs.get((s, a, s2), 0) * (r + GAMMA * V[s2])

        # after done getting the new value, update the value table
        V[s] = new_v
        biggest_change = max(biggest_change, np.abs(old_v - V[s]))
    it += 1

    if biggest_change < SMALL_ENOUGH:
      break
  return V

In [60]:
grid = standard_grid()
transition_probs, rewards = get_transition_probs_and_rewards(grid)

# print rewards
print("rewards:")
print_values(grid.rewards, grid)

# state -> action
# we'll randomly choose an action and update as we learn
policy = {}
for s in grid.actions.keys():
  policy[s] = np.random.choice(ACTION_SPACE)

# initial policy
print("initial policy:")
print_policy(policy, grid)

rewards:
---------------------------
 0.00| 0.00| 0.00| 1.00|
---------------------------
 0.00| 0.00| 0.00|-1.00|
---------------------------
 0.00| 0.00| 0.00| 0.00|
initial policy:
---------------------------
  R  |  D  |  R  |     |
---------------------------
  R  |     |  L  |     |
---------------------------
  R  |  U  |  D  |  L  |


In [61]:
# repeat until convergence - will break out when policy does not change
V = None
it = 0
while True:

    # policy evaluation step - we already know how to do this!
    V = evaluate_deterministic_policy(grid, policy, initV=V)

    # policy improvement step
    is_policy_converged = True
    for s in grid.actions.keys():
        old_a = policy[s]
        new_a = None
        best_value = float('-inf')

        # loop through all possible actions to find the best current action
        for a in ACTION_SPACE:
            v = 0
            for s2 in grid.all_states():
                # reward is a function of (s, a, s'), 0 if not specified
                r = rewards.get((s, a, s2), 0)
                v += transition_probs.get((s, a, s2), 0) * (r + GAMMA * V[s2])

            if v > best_value:
                best_value = v
                new_a = a

        # new_a now represents the best action in this state
        policy[s] = new_a
        if new_a != old_a:
            is_policy_converged = False
    if is_policy_converged:
        break
    
    # print the policy and values in each iteration
    print("Iteration:" + str(it))
    print("values:")
    print_values(V, grid)
    print("policy:")
    print_policy(policy, grid)
    print("************************************")
    it += 1
    




Iteration:0
values:
---------------------------
 0.00| 0.00| 1.00| 0.00|
---------------------------
 0.00| 0.00| 0.00| 0.00|
---------------------------
 0.00| 0.00| 0.00| 0.00|
policy:
---------------------------
  U  |  R  |  R  |     |
---------------------------
  U  |     |  U  |     |
---------------------------
  U  |  U  |  U  |  D  |
************************************
Iteration:1
values:
---------------------------
 0.00| 0.90| 1.00| 0.00|
---------------------------
 0.00| 0.00| 0.90| 0.00|
---------------------------
 0.00| 0.00| 0.81| 0.00|
policy:
---------------------------
  R  |  R  |  R  |     |
---------------------------
  U  |     |  U  |     |
---------------------------
  U  |  R  |  U  |  L  |
************************************
