[View in Colaboratory](https://colab.research.google.com/github/aztecman/Move37/blob/master/Gridworld_via_Colin_Skow.ipynb)

In [0]:
# From The School of AI's Move 37 Course https://www.theschool.ai/courses/move-37-course/
# Coding demo by Colin Skow
# Forked from https://github.com/lazyprogrammer/machine_learning_examples/tree/master/rl
# Credit goes to LazyProgrammer

import numpy as np


class Grid:  # Environment
  """Grid-world environment that tracks the agent's current (i, j) cell.

  Positions are (i, j) tuples: i runs along the vertical axis, j along the
  horizontal one. Rewards, legal actions and the obey probability are
  attached afterwards through set().
  """

  # (di, dj) displacement applied by each recognised action
  _OFFSETS = {'U': (-1, 0), 'D': (1, 0), 'R': (0, 1), 'L': (0, -1)}

  def __init__(self, width, height, start):
    # width/height are stored exactly as given; start is an (i, j) pair
    self.width, self.height = width, height
    self.i, self.j = start[0], start[1]

  def set(self, rewards, actions, obey_prob):
    """Attach the reward table, the legal-action table and the obey probability.

    rewards:   dict mapping (i, j) -> reward received on arriving there
    actions:   dict mapping (i, j) -> collection of actions legal in that cell
    obey_prob: probability that the requested action is the one carried out
    """
    self.obey_prob = obey_prob
    self.actions = actions
    self.rewards = rewards

  def non_terminal_states(self):
    """Return the states that have an entry in the action table."""
    return self.actions.keys()

  def set_state(self, s):
    """Move the agent to state s, given as an (i, j) pair."""
    self.i, self.j = s[0], s[1]

  def current_state(self):
    """Return the agent's position as an (i, j) tuple."""
    return (self.i, self.j)

  def is_terminal(self, s):
    """Return True when s has no entry in the action table."""
    return s not in self.actions

  def check_move(self, action):
    """Return ((i, j), reward) for attempting `action` from the current cell.

    The agent itself is not moved. An action that is not listed as legal
    for the current cell leaves the position unchanged. Raises KeyError if
    the current cell has no entry in the action table.
    """
    di, dj = 0, 0
    if action in self.actions[(self.i, self.j)]:
      # a legal but unrecognised action also results in no displacement
      di, dj = self._OFFSETS.get(action, (0, 0))
    target = (self.i + di, self.j + dj)
    # cells without a reward entry yield 0
    return (target, self.rewards.get(target, 0))

  def get_transition_probs(self, action):
    """Return a list of (probability, reward, s') tuples for `action`.

    The requested action gets probability obey_prob. Whatever probability
    is left is split evenly between the two perpendicular actions.
    """
    landing, payoff = self.check_move(action)
    outcomes = [(self.obey_prob, payoff, landing)]
    slip = 1 - self.obey_prob
    if slip > 0.0:
      if action in ('U', 'D'):
        sideways = ('L', 'R')
      elif action in ('L', 'R'):
        sideways = ('U', 'D')
      else:
        sideways = ()
      for alternative in sideways:
        landing, payoff = self.check_move(alternative)
        outcomes.append((slip / 2, payoff, landing))
    return outcomes

  def game_over(self):
    """Return True when the current cell has no entry in the action table."""
    return self.current_state() not in self.actions

  def all_states(self):
    """Return the set of every state that has actions or a reward.

    Simple but possibly incomplete: a cell with neither an action entry
    nor a reward entry is not reported.
    """
    with_actions = set(self.actions.keys())
    with_rewards = set(self.rewards.keys())
    return with_actions | with_rewards


def standard_grid(obey_prob=1.0, step_cost=None):
  """Build the 3-row by 4-column demo grid.

  Layout (x = blocked cell, s = start, number = reward on arrival):

    .  .  .  1
    .  x  . -1
    s  .  .  .

  obey_prob (float): the probability of obeying the command
  step_cost (float): optional penalty (e.g. -0.1) stored as the reward of
    every non-terminal cell, to favour reaching the goal in fewer moves;
    None leaves those cells without a reward entry
  """
  # reward for arriving at each terminal cell
  rewards = {(0, 3): 1, (1, 3): -1}
  # legal actions for every non-terminal cell
  actions = {
    (0, 0): ('D', 'R'),
    (0, 1): ('L', 'R'),
    (0, 2): ('L', 'D', 'R'),
    (1, 0): ('U', 'D'),
    (1, 2): ('U', 'D', 'R'),
    (2, 0): ('U', 'R'),
    (2, 1): ('L', 'R'),
    (2, 2): ('L', 'R', 'U'),
    (2, 3): ('L', 'U'),
  }
  g = Grid(3, 4, (2, 0))
  g.set(rewards, actions, obey_prob)
  if step_cost is not None:
    # every cell that has actions is a non-terminal cell
    g.rewards.update({cell: step_cost for cell in actions})
  return g

In [0]:
def print_values(V, g):
  """Print the values in V as a table, one line per i index.

  V maps (i, j) -> number; missing cells are shown as 0. The outer loop
  runs over range(g.width) and the inner one over range(g.height).
  """
  for row in range(g.width):
    print("---------------------------")
    for col in range(g.height):
      value = V.get((row, col), 0)
      # a minus sign takes up one column, so only non-negatives get padding
      template = " %.2f|" if value >= 0 else "%.2f|"
      print(template % value, end="")
    print()

def print_policy(P, g):
  """Print the policy P as a table, one line per i index.

  P maps (i, j) -> action; cells without an entry are shown as a blank.
  The outer loop runs over range(g.width), the inner over range(g.height).
  """
  for row in range(g.width):
    print("---------------------------")
    for col in range(g.height):
      print("  %s  |" % P.get((row, col), ' '), end="")
    print()

In [3]:
# SMALL_ENOUGH is referred to by the mathematical symbol theta in equations
# calculate_values stops sweeping once the largest change to any V[s] in a
# full sweep is below this threshold
SMALL_ENOUGH = 1e-3
# discount factor: multiplies the expected next-state value in best_action_value
GAMMA = 0.9
# every action tried for each state in best_action_value; because that function
# compares with a strict '>', ties go to the action listed first here
ALL_POSSIBLE_ACTIONS = ('U', 'D', 'L', 'R')

def best_action_value(grid, V, s):
  """Return (action, value) for the highest-valued action (max_a) from state s.

  Side effect: the grid's current state is set to s. Every action in
  ALL_POSSIBLE_ACTIONS is scored as
    sum(prob * reward) + GAMMA * sum(prob * V[s'])
  over the transitions the grid reports for it. Ties go to the action that
  appears first in ALL_POSSIBLE_ACTIONS.
  """
  grid.set_state(s)
  winner, winner_score = None, float('-inf')
  for action in ALL_POSSIBLE_ACTIONS:
    reward_term = 0
    value_term = 0
    for prob, reward, next_state in grid.get_transition_probs(action):
      reward_term += prob * reward
      value_term += prob * V[next_state]
    score = reward_term + GAMMA * value_term
    # strict comparison keeps the earliest action on a tie
    if score > winner_score:
      winner, winner_score = action, score
  return winner, winner_score

def calculate_values(grid):
  """Run value iteration on grid and return the value table V.

  V maps every state from grid.all_states() to its value, starting at 0.
  Each sweep applies, in place and for every non-terminal state,
    V[s] = max[a]{ sum[s',r] { p(s',r|s,a)[r + gamma*V[s']] } }
  Sweeps repeat until the largest change in a sweep (delta in equations)
  drops below SMALL_ENOUGH.
  """
  V = {state: 0 for state in grid.all_states()}
  converged = False
  while not converged:
    delta = 0
    for state in grid.non_terminal_states():
      previous = V[state]
      _, V[state] = best_action_value(grid, V, state)
      delta = max(delta, np.abs(previous - V[state]))
    converged = delta < SMALL_ENOUGH
  return V

def initialize_random_policy(env=None):
  """Return a random policy: a lookup table of state -> action.

  env: grid whose non-terminal states receive an action. When omitted, the
    module-level `grid` is used, so existing calls without arguments keep
    working exactly as before.

  Each non-terminal state is assigned one action drawn with
  np.random.choice from ALL_POSSIBLE_ACTIONS; the table is meant to be
  overwritten as better actions are learned.
  """
  if env is None:
    # fall back to the global grid only when no grid was passed in
    env = grid
  return {
    s: np.random.choice(ALL_POSSIBLE_ACTIONS)
    for s in env.non_terminal_states()
  }

def calculate_greedy_policy(grid, V):
  """Return the greedy policy for value table V on the given grid.

  grid: environment whose non-terminal states get a policy entry
  V:    dict mapping state -> value, e.g. the result of calculate_values

  Returns a dict mapping each non-terminal state of `grid` to the action
  best_action_value reports as best for it. The states come from the grid
  that was passed in, not from a module-level one. As a side effect of
  best_action_value, the grid's current state ends up at the last state
  visited.
  """
  policy = {}
  for s in grid.non_terminal_states():
    # best_action_value positions the grid at s itself before scoring
    best_a, _ = best_action_value(grid, V, s)
    policy[s] = best_a
  return policy


# build the standard grid with deterministic moves (obey_prob=1.0)
# step_cost=None adds no per-step penalty, so non-terminal cells have no reward;
# pass e.g. step_cost=-0.1 to see if a penalty encourages a shorter path to the goal
# NOTE: initialize_random_policy reads this module-level name `grid`
grid = standard_grid(obey_prob=1.0, step_cost=None)

# print rewards
print("rewards:")
print_values(grid.rewards, grid)

# calculate values for each square by value iteration
V = calculate_values(grid)

# calculate the greedy policy based on our values
policy = calculate_greedy_policy(grid, V)

# our goal here is to verify that we get the same answer as with policy iteration
print("values:")
print_values(V, grid)
print("policy:")
print_policy(policy, grid)


rewards:
---------------------------
 0.00| 0.00| 0.00| 1.00|
---------------------------
 0.00| 0.00| 0.00|-1.00|
---------------------------
 0.00| 0.00| 0.00| 0.00|
values:
---------------------------
 0.81| 0.90| 1.00| 0.00|
---------------------------
 0.73| 0.00| 0.90| 0.00|
---------------------------
 0.66| 0.73| 0.81| 0.73|
policy:
---------------------------
  R  |  R  |  R  |     |
---------------------------
  U  |     |  U  |     |
---------------------------
  U  |  R  |  U  |  L  |
