# GridWorld Reinforcement Learning

Reinforcement Learning implementation of GridWorld in [Reinforcement Learning: An Introduction, p72](https://web.stanford.edu/class/psych209/Readings/SuttonBartoIPRLBook2ndEd.pdf) and [Markov Decision Process and Exact Solution Methods](https://people.eecs.berkeley.edu/~pabbeel/cs287-fa19/slides/Lec2-mdps-exact-methods.pdf). The following descibes the environment and the agent:

- The agent (robot) lives in a grid
- Walls block the agent’s path
- The agent’s actions do not always go as planned:
    - 80% of the time, the action North takes the agent North (if there is no wall there)
    - 10% of the time, North takes the agent West; 10% East
    - If there is a wall in the direction the agent would have been taken, the agent stays put
    
The first section solves the model-based version of the problem. The second section solves the model-free version of the problem.

## Model-Based Reinforcement Learning

Create a `GridWorldMB` class for the model-based problem.

In [1]:
class GridWorldMB():
  # actions are the four primary directions
  actions = [[0,1], [0,-1], [-1,0], [1,0]]

  def __init__(self, size, gamma = 0.9, reward = 0, noise = 0.2):
    self.size = size          # dimension of the GridWorld
    self.blocked = []         # blocked states
    self.terminal = []        # terminal states
    self.gamma = gamma        # decay parameter
    self.r = reward           # reward per step
    self.noise = noise        # noise of action
    self.N = self.size[0] * self.size[1]
    self.Value = [0 for _ in range(self.N)]
    self.Policy = ["" for _ in range(self.N)]

  # Functions to convert from state to coord and vice versa
  def to_state(self, coord):
    return self.size[1] * coord[0] + coord[1]
  def to_coord(self, state):
    return [state//self.size[1], state%self.size[1]]

  # Functions to add blocked and terminal coordinates
  def add_blocked(self, coord):
    self.blocked.append(self.to_state(coord))
  def add_terminal(self, coord, val):
    self.terminal.append(self.to_state(coord))
    self.Value[self.to_state(coord)] = val

  # Move function
  def move(self, coord, action):
    coord_new = coord.copy()

    # Move (no restrictions)
    coord_new[0] += action[0]
    coord_new[1] += action[1]

    # Check for walls
    coord_new[0] = min(max(0,coord_new[0]),self.size[0]-1)
    coord_new[1] = min(max(0,coord_new[1]),self.size[1]-1)

    # Check for blocked coords
    return coord if self.to_state(coord_new) in self.blocked else coord_new

  # Value Iteration
  def computeValue(self, max_iter = 1000):

    for _ in range(max_iter):
      newValue = self.Value.copy()   # Create new copy
      maxValue = 0                   # Variable for max across all states

      # Check all states
      for s in range(self.N):
        if s in self.terminal + self.blocked:   # Skip terminal and blocked states
          continue

        # Current coordinate
        coord_curr = self.to_coord(s)
        max_val = -1e9

        # Run through all actions
        for a in self.actions:
          val = 0

          # Check all possible next states
          for a_next in self.actions:
            
            # (1 - noise) if correct next state
            if a_next == a:
              prob = 1 - self.noise

            # noise / 2 if adjacent next state
            elif a_next != [-a[0], -a[1]]:
              prob = self.noise / 2
            
            # 0 if opposite next state
            else:
              prob = 0

            s_next = self.to_state(self.move(coord_curr, a_next))
            
            # Get sum 
            val += prob * (self.r + self.gamma * self.Value[s_next])

          # Get max across all actions
          max_val = max(max_val, val)

        # Update max value across all states
        maxValue = max(maxValue, abs(max_val - newValue[s]))

        # Update current state
        newValue[s] = max_val

      # Replace variable
      self.Value = newValue

      # Break if threshold is reached
      if maxValue < 1e-20:
        break

    return self.Value

  # Find Optimal Policy
  def computePolicy(self):
    for s in range(self.N):
      if s in self.terminal + self.blocked:   # Skip terminal and blocked states
        continue
      
      max_val = -1e9

      for a in self.actions:
        if max_val < self.Value[self.to_state(self.move(self.to_coord(s), a))]:
          max_val = self.Value[self.to_state(self.move(self.to_coord(s), a))]
          self.Policy[s] = a

    return self.Policy

  # Convert action to arrows
  def convert(self, action):
    if action == [1,0]:
      return "→"
    if action == [-1,0]:
      return "←"
    if action == [0,1]:
      return "↑"
    if action == [0,-1]:
      return "↓"
    return "." 
  
  def result(self):
    self.Value = self.computeValue()
    self.Policy = self.computePolicy()
    
    print("Value")
    for y in range(self.size[1]-1,-1,-1):
      out = ""
      for x in range(self.size[0]):
        out += str(round(self.Value[self.to_state([x,y])],2)) + "\t"
      print(out)

    print("\nPolicy")
    for y in range(self.size[1]-1,-1,-1):
      out = ""
      for x in range(self.size[0]):
        out += self.convert(self.Policy[self.to_state([x,y])]) + "\t"
      print(out)

## Model-Free Reinforcement Learning

### Create Environment

Create a `GridWorldMF` class for the model-free problem.

In [2]:
from gym import Env
from gym.spaces import Discrete, Box
import numpy as np
import random
import cv2

In [3]:
class GridWorldMF(Env):
  # render parameters
  scale = 100
  robot = cv2.imread("assets/Robot.jpg") / 255
  robot = cv2.resize(robot, (scale, scale))

  def __init__(self, size, start = [0,0], noise = 0.2, reward = 0):
    self.observation_space = Box(low = 0, high = max(size), shape=(1, 2))
    self.action_space = Discrete(4)   # four primary directions
    self.state = start                # current state
    self.start = start                # starting state
    self.size = size                  # dimension of GridWorld
    self.noise = noise                # noise of action
    self.reward = reward              # reward per step
    self.canvas = np.ones((self.size[1] * self.scale, self.size[0] * self.scale, 3)) * 1 # canvas size
    self.blocked = []
    self.terminal = {}
    
  # Functions to convert from coord to state
  def to_state(self, coord):
    return self.size[1] * coord[0] + coord[1]

  # Functions to add blocked and terminal coordinates
  def add_blocked(self, coord):
    y = self.size[1] - coord[1]
    x = coord[0]
    # render canvas black
    self.canvas[y*self.scale-self.scale : y*self.scale, x*self.scale : x*self.scale+self.scale] = [0,0,0]
    self.blocked.append(self.to_state(coord))

  def add_terminal(self, coord, val):
    y = self.size[1] - coord[1]
    x = coord[0]
    adj = 1 - abs(val) / 10
    # render canvas green/red
    if val > 0:
      self.canvas[y*self.scale-self.scale : y*self.scale, x*self.scale : x*self.scale+self.scale] = [adj,1,adj]
    if val < 0:
      self.canvas[y*self.scale-self.scale : y*self.scale, x*self.scale : x*self.scale+self.scale] = [adj,adj,1]
    self.terminal[self.to_state(coord)] = val
      
  def step(self, action):
    cs = self.state.copy()
    
    # noise
    p = random.random()
    if p < self.noise / 2:
      action += 1
    elif p < self.noise:
      action -= 1

    if action%4 == 0: # up
      cs = [cs[0], cs[1] + 1]
    elif action%4 == 1: # right
      cs = [cs[0] + 1, cs[1]]
    elif action%4 == 2: # down
      cs = [cs[0], cs[1] - 1]
    elif action%4 == 3: # left
      cs = [cs[0] - 1, cs[1]]
    
    # Check walls and blocked
    if not (self.to_state(cs) in self.blocked):
      self.state = [min(self.size[0]-1,max(cs[0],0)), min(self.size[1]-1,max(cs[1],0))]

    # Calculate reward
    if self.to_state(self.state) in self.terminal:
      reward = self.terminal[self.to_state(self.state)]
    else:
      reward = self.reward
    
    # Check if is done
    if self.to_state(self.state) in self.terminal: 
      done = True
    else:
      done = False

    # Set placeholder for info
    info = {}
    
    # Return step information
    return self.state, reward, done, info
  
  # Add grids to render
  def add_grids(self, canvas):
    for x in range(0, self.size[0]):
      for y in range(0, self.size[1]):
        canvas[y*self.scale, 0:self.size[0]*self.scale] = [0,0,0]
        canvas[0:self.size[1]*self.scale, x*self.scale] = [0,0,0]
    return canvas
  
  # Overlay robot
  def overlay_robot(self):
    y = self.size[1] - self.state[1]
    x = self.state[0]
    new_canvas = self.canvas.copy()
    for nx in range(x*self.scale, x*self.scale+self.scale):
      for ny in range(y*self.scale-self.scale, y*self.scale):
        if np.sum(self.robot[ny - y*self.scale, nx - x*self.scale]) > 2.9:
          new_canvas[ny, nx] = self.canvas[ny, nx]
        else:
          new_canvas[ny, nx] = self.robot[ny - y*self.scale, nx - x*self.scale]
    return new_canvas

  def render(self, mode = "human"):
    new_canvas = self.overlay_robot()
    new_canvas = self.add_grids(new_canvas)
    if mode == "human":
      cv2.imshow("GridWorld", new_canvas)
      cv2.waitKey(500) 
    else:
      return new_canvas
  
  def reset(self):
    # Reset position
    self.state = self.start
    return self.state
  
  def close(self):
    # Close all render window
    cv2.destroyAllWindows()

### Create Reinforcement Learning Model

In [4]:
import numpy as np
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten
from tensorflow.keras.optimizers import Adam

In [5]:
def build_model(states, actions):
    model = Sequential()
    model.add(Flatten(input_shape=states))
    model.add(Dense(100, activation='relu'))
    model.add(Dense(80, activation='relu'))
    model.add(Dense(60, activation='relu'))
    model.add(Dense(40, activation='relu'))
    model.add(Dense(20, activation='relu'))
    model.add(Dense(10, activation='relu'))
    model.add(Dense(actions, activation='linear'))
    return model

### Build Agent

In [6]:
from rl.agents import DQNAgent, SARSAAgent
from rl.policy import BoltzmannQPolicy, EpsGreedyQPolicy
from rl.memory import SequentialMemory

In [7]:
def build_agent(model, actions):
    policy = BoltzmannQPolicy()
    memory = SequentialMemory(limit=50000, window_length=1)
    agent = DQNAgent(model=model, memory=memory, policy=policy,
                  nb_actions=actions, nb_steps_warmup=100, target_model_update=1e-2)
    # policy = EpsGreedyQPolicy()
    # agent = SARSAAgent(model=model, policy=policy, nb_actions=actions, nb_steps_warmup=10)
    return agent

### Train Agent

In [8]:
def train_agent(env):
    states = env.observation_space.shape
    actions = env.action_space.n
    model = build_model(states, actions)
    agent = build_agent(model, actions)
    agent.compile(Adam(learning_rate=1e-2), metrics=['mae'])
    agent.fit(env, nb_steps=10000, visualize=False, verbose=1)
    return agent

## Example

The example below is based on the following environment.

<img src="assets/Cliff.png" width="300">

### Model-Based Case

In [9]:
g = GridWorldMB([5,5])
g.add_blocked([1,2])
g.add_blocked([1,3])
g.add_blocked([3,2])
g.add_terminal([0,0], -10)
g.add_terminal([1,0], -10)
g.add_terminal([2,0], -10)
g.add_terminal([3,0], -10)
g.add_terminal([4,0], -10)
g.add_terminal([2,2], 1)
g.add_terminal([4,2], 10)

g.result()

Value
4.48	5.17	5.88	6.68	7.51	
3.93	0	6.03	7.51	8.65	
3.45	0	1	0	10	
2.93	2.0	3.31	5.72	8.48	
-10	-10	-10	-10	-10	

Policy
→	→	→	↓	↓	
↑	.	→	→	↓	
↑	.	.	.	.	
↑	→	→	→	↑	
.	.	.	.	.	


### Model-Free Case

In [10]:
env = GridWorldMF([5,5], start=[0,1], reward=-1)
env.add_blocked([1,2])
env.add_blocked([1,3])
env.add_blocked([3,2])
env.add_terminal([0,0], -10)
env.add_terminal([1,0], -10)
env.add_terminal([2,0], -10)
env.add_terminal([3,0], -10)
env.add_terminal([4,0], -10)
env.add_terminal([2,2], 1)
env.add_terminal([4,2], 10)

In [11]:
agent = train_agent(env)

Training for 10000 steps ...
Interval 1 (0 steps performed)


  updates=self.state_updates,


done, took 93.757 seconds


In [12]:
scores = agent.test(env, nb_episodes=10, visualize=True)
env.close()

Testing for 10 episodes ...
Episode 1: reward: -1.000, steps: 12
Episode 2: reward: 2.000, steps: 9
Episode 3: reward: -11.000, steps: 2
Episode 4: reward: -3.000, steps: 14
Episode 5: reward: 0.000, steps: 11
Episode 6: reward: 1.000, steps: 10
Episode 7: reward: 1.000, steps: 10
Episode 8: reward: -18.000, steps: 29
Episode 9: reward: 0.000, steps: 11
Episode 10: reward: 1.000, steps: 10


## Combined GridWorld Class

In [None]:
import gym
from gym import Env
from gym.spaces import Discrete, Box
import numpy as np
import random
import cv2

In [None]:
class GridWorld(Env):
  # Action space
  actions = [0, 1, 2, 3]  # up, right, down, left
  convert = {0: "↑", 1: "→", 2: "↓", 3: "←"}

  # Agent rendering
  scale = 100
  agent = cv2.resize(cv2.imread("assets/Robot.jpg")/255, (scale, scale))
  
  def __init__(self, size, start = [0,0], noise = 0.2, reward = 0, gamma = 0.9):
    # Open AI Gym parameters
    self.action_space = Discrete(4)
    self.observation_space = Box(low = 0, high = max(size), shape=(1, 2))
    self.length = 1000
    
    # Environment and State parameters
    self.blocked = []
    self.terminal = {}
    self.size = size
    self.N = self.size[0] * self.size[1]
    self.state = start
    self.start = start

    # Other parameters
    self.noise = noise
    self.reward = reward
    self.gamma = gamma

    # MDP initialization
    self.value = [0 for _ in range(self.N)]
    self.policy = [-1 for _ in range(self.N)]
    
    # Canvas initialization
    self.canvas = np.ones((self.size[1] * self.scale, self.size[0] * self.scale, 3)) * 1
    self.bumped = -1
    self.oops = ""

  def to_state(self, coord):
    return self.size[1] * coord[0] + coord[1]
  def to_coord(self, state):
    return [state//self.size[1], state%self.size[1]]  

  def add_blocked(self, coord):
    self.blocked.append(self.to_state(coord))

    x = coord[0]; y = self.size[1] - coord[1]
    self.canvas[y*self.scale-self.scale : y*self.scale, x*self.scale : x*self.scale+self.scale] = 0
  
  def add_terminal(self, coord, val):
    self.terminal[self.to_state(coord)] = val
    self.value[self.to_state(coord)] = val

    x = coord[0]; y = self.size[1] - coord[1]
    adj = 1 - abs(val) / 10
    self.canvas[y*self.scale-self.scale : y*self.scale, x*self.scale : x*self.scale+self.scale] = [adj,1,adj] if val > 0 else [adj,adj,1]
      
  def step(self, action, noise = None, state = None):
    curr_state = self.state if state is None else state
    curr_noise = self.noise if noise is None else noise
    self.length -= 0 if state is None else 1

    # Add noise
    p = np.random.uniform()
    if p < curr_noise / 2:
      action = (action + 1 + 4) % 4
      self.oops = "Oops..."
    elif p < curr_noise:
      action = (action - 1 + 4) % 4
      self.oops = "Oops..."
    else:
      self.oops = ""

    # Move to next state
    if action == 0: # up
      curr_state = [curr_state[0], curr_state[1] + 1]
    elif action == 1: # right
      curr_state = [curr_state[0] + 1, curr_state[1]]
    elif action == 2: # down
      curr_state = [curr_state[0], curr_state[1] - 1]
    elif action == 3: # left
      curr_state = [curr_state[0] - 1, curr_state[1]]
    
    # Check walls and blocked
    if self.to_state(curr_state) not in self.blocked:
      walled_state = [min(self.size[0]-1,max(curr_state[0],0)), 
                    min(self.size[1]-1,max(curr_state[1],0))]
      if curr_state != walled_state:
        self.bumped = action
        curr_state = walled_state
      else:
        self.bumped = -1
    else:
      curr_state = self.state if state is None else state
      self.bumped = action

    # Calculate reward
    reward = self.terminal[self.to_state(curr_state)] if self.to_state(curr_state) in self.terminal else self.reward
    
    # Check if is done
    done = self.to_state(curr_state) in self.terminal or self.length == 0

    # Set placeholder for info
    info = {}
    
    # Update self.state
    if state is None:
      self.state = curr_state

    # Return step information
    return curr_state, reward, done, info
  
  # Value Iteration
  def ValueIteration(self, max_iter = 1000, tolerance = 1e-5, show_iter = False):

    for it in range(max_iter):
      new_value = self.value.copy()   # Create new copy
      delta = 0                       # Max across all states

      # Check all states
      for state in range(self.N):
        if state in list(self.terminal.keys()) + self.blocked:   # Skip terminal and blocked states
          continue

        # Current coordinate
        curr_coord = self.to_coord(state)
        max_value = -1e9

        # Run through all actions
        for action in self.actions:
          curr_value = 0

          # Check all possible next states
          for action_next in self.actions:
            
            # (1 - noise) if correct next state
            if action_next == action:
              prob = 1 - self.noise

            # noise / 2 if adjacent next state
            elif action_next != (action + 2) % 4:
              prob = self.noise / 2
            
            # 0 if opposite next state
            else:
              prob = 0

            state_next = self.to_state(self.step(action_next, noise=0, state=curr_coord)[0])
            
            # Get sum 
            curr_value += prob * (self.reward + self.gamma * self.value[state_next])

          # Get max across all actions
          max_value = max(max_value, curr_value)

        # Update max value across all states
        delta = max(delta, abs(max_value - new_value[state]))

        # Update current state
        new_value[state] = max_value

      # Replace variable
      self.value = new_value

      # Break if threshold is reached
      if delta < tolerance:
        print("Final")
        self.OptimalPolicy()
        self.show_current(policy=True)
        break

      # Display iteration
      if show_iter:
        print(f"Iteration: {it+1}")
        self.OptimalPolicy()
        self.show_current()

  # Find Optimal Policy
  def OptimalPolicy(self):
    for state in range(self.N):
      if state in list(self.terminal.keys()) + self.blocked:   # Skip terminal and blocked states
        continue
      
      max_val = -1e9
      for action in self.actions:
        if max_val < self.value[self.to_state(self.step(action, noise=0, state=self.to_coord(state))[0])]:
          max_val = self.value[self.to_state(self.step(action, noise=0, state=self.to_coord(state))[0])]
          self.policy[state] = action

  # Convert action to arrows
  def to_arrow(self, action):
    if action in self.convert:
      return self.convert[action]
    return "." 
  
  def show_current(self, value=True, policy=False):    
    if value:
      print("Value Function")
      for y in range(self.size[1]-1,-1,-1):
        out = ""
        for x in range(self.size[0]):
          out += str(round(self.value[self.to_state([x,y])],2)) + "\t"
        print(out)

    if policy:
      print("\nCurrent Policy")
      for y in range(self.size[1]-1,-1,-1):
        out = ""
        for x in range(self.size[0]):
          out += self.to_arrow(self.policy[self.to_state([x,y])]) + "\t"
        print(out)
    print('- '*20 + '\n')

  # Add grids to render
  def add_grids(self, canvas):
    for x in range(0, self.size[0]):
      for y in range(0, self.size[1]):
        canvas[y*self.scale, 0:self.size[0]*self.scale] = 0
        canvas[0:self.size[1]*self.scale, x*self.scale] = 0
    return canvas
  
  # Overlay agent
  def overlay_agent(self):
    x = self.state[0]; y = self.size[1] - self.state[1]
    new_canvas = self.canvas.copy()
    for nx in range(x*self.scale, x*self.scale+self.scale):
      for ny in range(y*self.scale-self.scale, y*self.scale):
        # if np.array_equal(self.agent[ny - y*self.scale, nx - x*self.scale], [1,1,1]):
        if np.sum(self.agent[ny - y*self.scale, nx - x*self.scale]) > 2.9:
          new_canvas[ny, nx] = self.canvas[ny, nx]
        else:
          new_canvas[ny, nx] = self.agent[ny - y*self.scale, nx - x*self.scale]  
    return new_canvas
    
  # Other errors
  def overlay_errors(self, canvas):
    x = self.state[0]; y = self.size[1] - self.state[1]

    # Add bumped area
    x_range = range(0); y_range = range(0)
    if self.bumped == 0:
      x_range = range(x*self.scale, x*self.scale+self.scale)
      y_range = range(y*self.scale-self.scale, int((y-0.95)*self.scale))
    if self.bumped == 1:
      x_range = range(int((x+0.95)*self.scale), x*self.scale+self.scale)
      y_range = range(y*self.scale-self.scale, y*self.scale)
    if self.bumped == 2:
      x_range = range(x*self.scale, x*self.scale+self.scale)
      y_range = range(int((y+0.95)*self.scale)-self.scale, y*self.scale)
    if self.bumped == 3:
      x_range = range(x*self.scale, int((x-0.95)*self.scale+self.scale))
      y_range = range(y*self.scale-self.scale, y*self.scale)

    for nx in x_range:
      for ny in y_range: 
        canvas[ny, nx] = [226/255,43/255,138/255]
    
    # Add noise 
    pos = (0, int(self.scale*0.2))
    cv2.putText(canvas, self.oops, pos, cv2.FONT_HERSHEY_DUPLEX, fontScale=0.5, color=0)

    return canvas
    
  def render(self, mode = "human", wait=800):
    new_canvas = self.overlay_agent()
    new_canvas = self.add_grids(new_canvas)
    new_canvas = self.overlay_errors(new_canvas)
    if mode == "human":
      cv2.imshow("GridWorld", new_canvas)
      cv2.waitKey(wait) 
    else:
      return new_canvas
  
  def reset(self):
    self.state = self.start
    self.bumped = -1
    self.oops = ""
    return self.state
  
  def close(self):
    cv2.destroyAllWindows()