<a href="https://colab.research.google.com/github/cyrilgabriele/RL/blob/main/Lab09/Lab9_Off_Policy_MC_V2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Monte Carlo with Q-Function (Action-Value Fucntion Q(s,a)) as State Value Function**


In [69]:
from collections import defaultdict, namedtuple
from enum import Enum
from typing import Tuple, List
import random
from IPython.display import clear_output
import copy
import time
import math
import numpy as np

In [70]:
Point = namedtuple('Point', ['x', 'y'])
class Direction(Enum):
  NORTH = "⬆"
  EAST = "⮕"
  SOUTH = "⬇"
  WEST = "⬅"

  @classmethod
  def values(self):
    return [v for v in self]


In [71]:
# this is our environment => like self.env = gym.make('CartPole-v0') from Lab02
class SimpleGridWorld(object):

  def __init__(self, width: int = 5, height: int = 5, debug: bool = False):
    print("This is our environment")
    self.width = width
    self.height = height
    self.debug = debug
    self.action_space = [d for d in Direction]
    self.reset()

  def reset(self):
    self.cur_pos = Point(x=0, y=(self.height - 1))
    self.goal = Point(x=(self.width - 1), y=0)
    # If debug, print state
    if self.debug:
      print(self)
    return self.cur_pos, 0, False

  def step(self, action: Direction):
    # Depending on the action, mutate the environment state
    if action == Direction.NORTH:
      self.cur_pos = Point(self.cur_pos.x, self.cur_pos.y + 1)
    elif action == Direction.EAST:
      self.cur_pos = Point(self.cur_pos.x + 1, self.cur_pos.y)
    elif action == Direction.SOUTH:
      self.cur_pos = Point(self.cur_pos.x, self.cur_pos.y - 1)
    elif action == Direction.WEST:
      self.cur_pos = Point(self.cur_pos.x - 1, self.cur_pos.y)
    # Check if out of bounds
    if self.cur_pos.x >= self.width:
      self.cur_pos = Point(self.width - 1, self.cur_pos.y)
    if self.cur_pos.y >= self.height:
      self.cur_pos = Point(self.cur_pos.x, self.height - 1)
    if self.cur_pos.x < 0:
      self.cur_pos = Point(0, self.cur_pos.y)
    if self.cur_pos.y < 0:
      self.cur_pos = Point(self.cur_pos.x, 0)

    # If at goal, terminate
    is_terminal = self.cur_pos == self.goal

    # Constant -1 reward to promote speed-to-goal

    reward = -1

    # If debug, print state
    if self.debug:
      print(self)

    return self.cur_pos, reward, is_terminal

  def peek(self, action: Direction):
  # get next position without mutating the environment
    if action == Direction.NORTH:
      new_pos = Point(self.cur_pos.x, self.cur_pos.y + 1)
    elif action == Direction.EAST:
       new_pos = Point(self.cur_pos.x + 1, self.cur_pos.y)
    elif action == Direction.SOUTH:
       new_pos = Point(self.cur_pos.x, self.cur_pos.y - 1)
    elif action == Direction.WEST:
      new_pos = Point(self.cur_pos.x - 1, self.cur_pos.y)
    # Check if out of bounds
    if new_pos.x >= self.width:
      new_pos = Point(self.width - 1, self.cur_pos.y)
    if new_pos.y >= self.height:
      new_pos = Point(self.cur_pos.x, self.height - 1)
    if new_pos.x < 0:
      new_pos = Point(0, self.cur_pos.y)
    if new_pos.y < 0:
      new_pos = Point(self.cur_pos.x, 0)
    return new_pos

  def get_valid_actions(self):
    # get only the actions that change the current position
    valid_actions = []
    for action in self.action_space:
      if self.peek(action) != self.cur_pos:
        valid_actions.append(action)
    return valid_actions

  def __repr__(self):
    res = ""
    for y in reversed(range(self.height)):
      for x in range(self.width):
        if self.goal.x == x and self.goal.y == y:
          if self.cur_pos.x == x and self.cur_pos.y == y:
            res += "@"
          else:
            res += "o"
          continue
        if self.cur_pos.x == x and self.cur_pos.y == y:
          res += "x"
        else:
          res += "_"
      res += "\n"
    return res

In [72]:
class MonteCarloGeneration(object):
  def __init__(self, env: object, max_steps: int = 1000, debug: bool = False, decay=50, min_epsilon=0.1):
    self.env = env
    self.max_steps = max_steps
    self.debug = debug
    # self.steps = np.zeros(self.num_episodes)

    # Create a Q-table dictionary and initialize it random
    self.Q_table = defaultdict(lambda: defaultdict(float))
    for x in range(self.env.width):
      for y in range(self.env.height):
        state = Point(x, y)
        for action in self.env.action_space:
            self.Q_table[state][action] = np.random.random()

  def policy_b():
    action = random.choice(self.env.action_space) # Random action => soft policy
    return action

  def run(self, n_run) -> List:
    buffer = []
    n_steps = 0 # Keep track of the number of steps so I can bail out if it takes too long
    state, _, _ = self.env.reset() # Reset environment back to start
    terminal = False
    while not terminal: # Run until terminal state

        action = self.policy_b

        next_state, reward, terminal = self.env.step(action) # Take action in environment
        buffer.append((state, action, reward)) # Store the result
        state = next_state # Ready for the next step
        n_steps += 1
        if n_steps >= self.max_steps:
          if self.debug:
            print("Terminated early due to large number of steps")
          terminal = True # Bail out if we've been working for too long
    return buffer

In [73]:
# This class is our agent
class MonteCarloExperiment(object):
  def __init__(self, generator: MonteCarloGeneration, num_episodes=1000, min_lr=0.1, min_epsilon=0.1, discount=1.0, decay=10):
    self.generator = generator
    self.num_episodes = num_episodes
    self.min_lr = min_lr
    self.min_epsilon = min_epsilon
    # Discount factor gamma
    self.discount = discount
    self.decay = decay
    self.env = env
    self.steps = np.zeros(self.num_episodes)

# Create a Q-table dictionary and initialize it random
    self.Q_table = defaultdict(lambda: defaultdict(float))
    for x in range(self.env.width):
      for y in range(self.env.height):
        state = Point(x, y)
        for action in self.env.action_space:
            self.Q_table[state][action] = np.random.rand()

# Create a C-table dictionary and initialize it with all 0.0
    self.C_table = defaultdict(lambda: defaultdict(float))
    for x in range(self.env.width):
      for y in range(self.env.height):
        state = Point(x, y)
        for action in self.env.action_space:
            self.C_table[state][action] = 0.0


  '''
  def choose_action(self, state):
    if (np.random.random() < self.epsilon):
        return random.choice(self.env.action_space)
    else:
        return max(self.env.action_space, key=lambda a: self.Q_table[state][a])
  '''

  def update_q(self, state, action, W, G):
    """
    Updates Q-table using the rule as described by Sutton and Barto in
    Reinforcement Learning.
    """

    self.Q_table[state][action] += (W/self.C_table[state][action]) * (G - self.Q_table[state][action])

  def update_c(self, state, action, W):
    """
    Updates C using the rule as described by Sutton and Barto Section 5.7.
    """

    self.C_table[state][action] += W


  def get_epsilon(self, t):
    """Gets value for epsilon. It declines as we advance in episodes."""
    # Ensures that there's almost at least a min_epsilon chance of randomly exploring
    return max(self.min_epsilon, min(1., 1. - math.log10((t + 1) / self.decay)))

  def get_learning_rate(self, t):
    """Gets value for learning rate. It declines as we advance in episodes."""
    # Learning rate also declines as we add more episodes
    return max(self.min_lr, min(1., 1. - math.log10((t + 1) / self.decay)))

  def run_episode(self, n_run) -> None:
    G = 0 # Return G
    W = 1
    trajectory = self.generator.run(n_run) # Generate a trajectory
    #print("this is trjectory: ", trajectory)
    for i, t in enumerate(reversed(trajectory)): # Starting from the terminal state
      state, action, reward = t
      G += self.discount * G + reward

      # TODO fix the states here => maybe need to be returned from run()
      # Update C
      self.update_c(state, action, W)
      # Update Q
      self.update_q(state, action, W, G)

      policy_pi = max(self.Q_table[state].values())
      if action is not policy_pi:
        break
      else:
        W += (1/random.choice(self.env.action_space))

In [76]:
def next_best_value_2d(agent):
  res = ""
  for y in reversed(range(agent.env.height)):
    for x in range(agent.env.width):
      state = Point(x, y)
      if agent.env.goal.x == x and agent.env.goal.y == y:
        res += "@"
      else:
        # Find the action that has the highest value
        best_action = max(agent.env.action_space, key=lambda a: agent.Q_table[state][a])
        q_value = agent.Q_table[state][best_action]
        res += f'{best_action.value} ({q_value:.2f})'
      res += " | "
    res += "\n"
  return res

In [77]:
env = SimpleGridWorld() # Instantiate the environment
generator = MonteCarloGeneration(env=env) # Instantiate the trajectory generator
agent = MonteCarloExperiment(generator=generator)
for i in range(1000):
  clear_output(wait=True)
  agent.run_episode(i)
  print(f"Iteration: {i}")
  print(next_best_value_2d(agent))
  # time.sleep(5)

Iteration: 999
⬇ (0.30) | ⮕ (0.89) | ⬆ (0.91) | ⮕ (0.89) | ⮕ (0.89) | 
⬇ (0.36) | ⮕ (0.97) | ⮕ (0.43) | ⮕ (0.62) | ⬆ (0.27) | 
⮕ (0.85) | ⬆ (0.60) | ⬆ (0.92) | ⬆ (0.96) | ⬅ (0.67) | 
⬇ (0.69) | ⮕ (0.94) | ⮕ (0.62) | ⬅ (0.91) | ⬇ (0.86) | 
⬅ (0.44) | ⬅ (0.65) | ⬇ (0.94) | ⬅ (0.91) | @ | 

