# Navigating a 2D grid with an obstacle by reinforcement learning (discrete action space)

## Salient features:
1) Custom Gym environment
2) Training a reinforcement learning agent using Stable-Baselines3 (the code below imports `stable_baselines3`)

## Important links
Stable-Baselines: https://github.com/hill-a/stable-baselines

Documentation: https://stable-baselines.readthedocs.io/en/master/

RL Baselines zoo: https://github.com/araffin/rl-baselines-zoo

In [1]:
import gym
import numpy as np
from gym import spaces
from stable_baselines3 import DQN, A2C
from stable_baselines3.common.env_checker import check_env
from stable_baselines3.common.env_util import make_vec_env

## The gym interface

The gym interface mainly provides three methods:
- `reset()`: called at the beginning of an episode; it returns an observation.
- `step(action)`: called to take an action in the environment; it returns the next observation, the immediate reward, whether the episode is over, and additional information.
- (Optional) `render(mode='human')`: lets you visualize the agent in action. Note that the graphical interface does not work on Google Colab, so we cannot use it directly (we have to rely on `mode='rgb_array'` to retrieve an image of the scene).

Under the hood, it also contains two useful properties:
- `observation_space`, which is one of the gym spaces (`Discrete`, `Box`, ...) and describes the type and shape of the observation
- `action_space`, which is also a gym space object and describes the action space, i.e. the type of action that can be taken
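
As a rough, dependency-free illustration of the interface described above, the sketch below uses a hypothetical `CorridorEnv` class and a hypothetical `run_episode` helper (neither is part of Gym or of this notebook). It does not subclass `gym.Env`; it only mimics the `reset()` / `step(action)` contract on a one-dimensional corridor to show the shape of an episode loop.

```python
class CorridorEnv:
    """Hypothetical stand-in that mimics the gym-style reset/step contract."""

    def __init__(self, length=5):
        self.length = length
        self.pos = 0

    def reset(self):
        # Start every episode at the left end and return the first observation
        self.pos = 0
        return self.pos

    def step(self, action):
        # Action 1 moves right, anything else moves left; stay inside the corridor
        move = 1 if action == 1 else -1
        self.pos = min(max(self.pos + move, 0), self.length - 1)
        done = self.pos == self.length - 1
        reward = 1 if done else -1
        # Same 4-tuple as described above: observation, reward, done, info
        return self.pos, reward, done, {}


def run_episode(env, policy, max_steps=100):
    """Run one episode and return (total reward, number of steps taken)."""
    obs = env.reset()
    total = 0
    steps = 0
    for _ in range(max_steps):
        obs, reward, done, info = env.step(policy(obs))
        total += reward
        steps += 1
        if done:
            break
    return total, steps


def always_right(obs):
    # Trivial policy used only for this illustration
    return 1


total_reward, steps_taken = run_episode(CorridorEnv(length=5), always_right)
print(total_reward, steps_taken)
```

A trained agent plays the role of `policy` here: it maps the current observation to an action, and the loop stops as soon as `step` reports that the episode is over.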

[Documentation on custom env](https://stable-baselines.readthedocs.io/en/master/guide/custom_env.html)

## Gym environment

In [2]:
class ReachEndEnv(gym.Env):
    """
    Custom Environment that follows gym interface.
    This is a simple env where the agent must learn to reach the destination with obstacles in between. 
    """
    # Because of google colab, we cannot implement the GUI ('human' render mode)
    metadata = {'render.modes': ['console']}
    # Define constants for clearer code
    LEFT = 0
    RIGHT = 1
    UP = 2
    DOWN = 3

    def __init__(self, grid_size=4,block_low=1,block_high=2):
        super(ReachEndEnv, self).__init__()

        # Size of the 2D-grid
        self.grid_size = grid_size
        # Initialize the agent at the top left corner
        self.initial_pos = np.zeros((2,),dtype=np.float32) 
        self.agent_pos = np.zeros((2,),dtype=np.float32)
        # Specify the blocked region: cells whose row and column both lie in [block_low, block_high]
        self.block_low = block_low
        self.block_high = block_high
        # Define action and observation space
        # They must be gym.spaces objects
        # We use four discrete actions: left, right, up and down
        n_actions = 4
        self.action_space = spaces.Discrete(n_actions)
        # The observation will be the coordinate of the agent
        # this can be described both by Discrete and Box space
        self.observation_space = spaces.Box(low=0, high=self.grid_size, shape=(2,), dtype=np.float32)
        # Check if goal is reached
        self.endcheck = False

    def reset(self):
        """
        Important: the observation must be a numpy array
        :return: (np.array) the agent position, shape (2,), dtype float32
        """
        # Initialize the agent at the top-left corner of the grid
        self.agent_pos = np.zeros((2,), dtype=np.float32)
        # Return a copy so that later in-place updates in step() do not change the returned observation
        return self.agent_pos.copy()

    def step(self, action):
        if action == self.LEFT:
            self.agent_pos[1] -= 1.0
        elif action == self.RIGHT:
            self.agent_pos[1] += 1.0
        elif action == self.UP:
            self.agent_pos[0] -= 1.0
        elif action == self.DOWN:
            self.agent_pos[0] += 1.0
        else:
            raise ValueError("Received invalid action={} which is not part of the action space".format(action))

        # Account for the boundaries of the grid
        self.agent_pos = np.clip(self.agent_pos, 0, self.grid_size-1)

        # Are we at the goal (bottom-right corner)?
        done = bool(self.agent_pos[0] == self.grid_size-1 and self.agent_pos[1] == self.grid_size-1)

        # Reward: +1 at the goal, -10 inside the blocked region, -1 for every other step
        if done:
            reward = 1
        elif self.block_low <= self.agent_pos[0] <= self.block_high and \
                self.block_low <= self.agent_pos[1] <= self.block_high:
            reward = -10
        else:
            reward = -1

        # Optionally we can pass additional info, we are not using that for now
        info = {}
        self.endcheck = done

        # Return a copy so the next in-place update does not change this observation
        return self.agent_pos.copy(), reward, done, info

    def render(self, mode='console'):
        if mode != 'console':
            raise NotImplementedError()
        # The agent is drawn as 'x', every other cell as '.'
        if self.endcheck:
            # Goal reached: draw the agent in the bottom-right corner
            # (a vectorized env resets automatically on done, so agent_pos may already be back at the start)
            for i in range(self.grid_size-1):
                print('.'*self.grid_size)
            print('.'*(self.grid_size-1)+'x')
        else:
            for i in range(self.grid_size):
                if i == int(self.agent_pos[0]):
                    print('.'*int(self.agent_pos[1]) + 'x' + '.'*(self.grid_size-int(self.agent_pos[1])-1))
                else:
                    print('.'*self.grid_size)

    def close(self):
        pass
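
To make the reward structure of `step` easier to inspect, here is a minimal standalone sketch of the same three branches. The function names `grid_reward` and `path_return` are hypothetical, and the defaults (`grid_size=4`, `block_low=1`, `block_high=2`) are simply the defaults of the `ReachEndEnv` constructor; nothing here depends on Gym.

```python
def grid_reward(row, col, grid_size=4, block_low=1, block_high=2):
    """Sketch of the reward branches in ReachEndEnv.step for one cell."""
    goal = grid_size - 1
    if row == goal and col == goal:
        return 1  # goal cell (bottom-right corner)
    in_block = (block_low <= row <= block_high) and (block_low <= col <= block_high)
    return -10 if in_block else -1  # blocked cell vs. ordinary step


def path_return(path, **kwargs):
    """Undiscounted sum of rewards for visiting the cells of `path` in order."""
    return sum(grid_reward(r, c, **kwargs) for r, c in path)


# Six moves that go around the blocked square: down the left column,
# then right along the bottom row
around = [(1, 0), (2, 0), (3, 0), (3, 1), (3, 2), (3, 3)]
# Six moves that cut through the blocked square
through = [(0, 1), (1, 1), (2, 1), (3, 1), (3, 2), (3, 3)]

print(path_return(around))   # -4
print(path_return(through))  # -22
```

Both paths have the same length, so the difference in return comes entirely from the two blocked cells on the second path. This is the signal that pushes the agent to route around the obstacle.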


### Validate the environment

Stable-Baselines provides a [helper](https://stable-baselines.readthedocs.io/en/master/common/env_checker.html), `check_env` (imported in the first cell from `stable_baselines3.common.env_checker`), to check that your environment follows the Gym interface. It also optionally checks that the environment is compatible with Stable-Baselines and emits warnings if necessary.

In [5]:
# Instantiate the env
env = ReachEndEnv(grid_size=4,block_low=3,block_high=)
# wrap it
env = make_vec_env(lambda: env, n_envs=1)

In [4]:
# Train the agent
model = DQN('MlpPolicy', env, train_freq= 4, tensorboard_log='./log_files/',verbose=1).learn(1000000)

Using cuda device
Logging to ./log_files/DQN_4
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 617      |
|    ep_rew_mean      | -1.4e+03 |
|    exploration_rate | 0.977    |
| time/               |          |
|    episodes         | 4        |
|    fps              | 25257    |
|    time_elapsed     | 0        |
|    total_timesteps  | 2467     |
----------------------------------
-----------------------------------
| rollout/            |           |
|    ep_len_mean      | 742       |
|    ep_rew_mean      | -1.75e+03 |
|    exploration_rate | 0.944     |
| time/               |           |
|    episodes         | 8         |
|    fps              | 13635     |
|    time_elapsed     | 0         |
|    total_timesteps  | 5939      |
-----------------------------------
-----------------------------------
| rollout/            |           |
|    ep_len_mean      | 636       |
|    ep_rew_mean      | -1.54e+03 |
|    exploration_rate | 0.92
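
The `exploration_rate` column in the log above is the epsilon of DQN's epsilon-greedy exploration. The sketch below shows one way to reason about it, assuming the Stable-Baselines3 defaults for DQN (`exploration_fraction=0.1`, `exploration_initial_eps=1.0`, `exploration_final_eps=0.05`), which the training cell does not override, and the 1,000,000 timesteps passed to `learn`. The helper name `linear_epsilon` is hypothetical and is not a library function.

```python
def linear_epsilon(step, total_timesteps=1_000_000, fraction=0.1,
                   initial_eps=1.0, final_eps=0.05):
    """Linearly anneal epsilon over the first `fraction` of training (sketch)."""
    # Fraction of the annealing window that has elapsed, capped at 1
    progress = min(step / (fraction * total_timesteps), 1.0)
    return initial_eps + progress * (final_eps - initial_eps)


# Timesteps taken from the first two log blocks, plus two later points
for step in (2467, 5939, 100_000, 500_000):
    print(step, round(linear_epsilon(step), 3))
```

Under these assumptions the sketch gives about 0.977 at 2467 timesteps and 0.944 at 5939 timesteps, which agrees with the logged values, and epsilon stays at 0.05 after the first 100,000 timesteps.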

In [9]:
# Test the trained agent
obs = env.reset()
n_steps = 10
# Row 0 is the start position; one extra row per step taken
OBS = np.zeros((n_steps + 1, 2))
OBS[0] = obs[0]
for step in range(n_steps):
    action, _ = model.predict(obs, deterministic=True)
    print("Step {}".format(step + 1))
    print("Action: ", action)
    obs, reward, done, info = env.step(action)
    print('obs=', obs, 'reward=', reward, 'done=', done)
    env.render(mode='console')
    # The VecEnv resets automatically when a done signal is encountered,
    # so on the last step obs is already the reset observation; the true
    # final position is stored in info[0]['terminal_observation']
    OBS[step + 1] = info[0]['terminal_observation'] if done[0] else obs[0]
    if done[0]:
        print("Goal reached!", "reward=", reward)
        break
# Keep only the rows that were actually filled
OBS = OBS[:step + 2]

Step 1
Action:  [3]
obs= [[1. 0.]] reward= [-1.] done= [False]
....
x...
....
....
Step 2
Action:  [3]
obs= [[2. 0.]] reward= [-1.] done= [False]
....
....
x...
....
Step 3
Action:  [3]
obs= [[3. 0.]] reward= [-1.] done= [False]
....
....
....
x...
Step 4
Action:  [1]
obs= [[3. 1.]] reward= [-1.] done= [False]
....
....
....
.x..
Step 5
Action:  [1]
obs= [[3. 2.]] reward= [-1.] done= [False]
....
....
....
..x.
Step 6
Action:  [1]
obs= [[0. 0.]] reward= [1.] done= [ True]
....
....
....
...x
Goal reached! reward= [1.]
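
In the last step of the output above, `obs` is `[[0. 0.]]` even though the agent has just reached the goal, because the vectorized env resets automatically when an episode ends. The sketch below imitates that behavior without any dependency: `LineEnv` and `AutoReset` are hypothetical classes written for this illustration, not library code. The only detail borrowed from Stable-Baselines3 is the `terminal_observation` key that its vectorized envs store in `info`.

```python
class LineEnv:
    """Hypothetical toy env: every step moves one cell right until `goal`."""

    def __init__(self, goal=2):
        self.goal = goal
        self.pos = 0

    def reset(self):
        self.pos = 0
        return self.pos

    def step(self, action):
        self.pos += 1
        done = self.pos == self.goal
        return self.pos, (1 if done else -1), done, {}


class AutoReset:
    """Hypothetical wrapper imitating the auto-reset of a vectorized env."""

    def __init__(self, env):
        self.env = env

    def reset(self):
        return self.env.reset()

    def step(self, action):
        obs, reward, done, info = self.env.step(action)
        if done:
            # Keep the true last observation, then return the reset one
            info["terminal_observation"] = obs
            obs = self.env.reset()
        return obs, reward, done, info


wrapped = AutoReset(LineEnv(goal=2))
wrapped.reset()
first = wrapped.step(0)
last = wrapped.step(0)
print(first)  # (1, -1, False, {})
print(last)   # (0, 1, True, {'terminal_observation': 2})
```

The final tuple carries the reward and `done` flag of the goal step, but its observation already belongs to the next episode; the position actually reached is only available through `info`.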


In [6]:
# model.save('2d_grid_obstacle10x10')

In [16]:
model.save('2d_grid_obstacle4x4')

In [None]:
model = DQN.load('2d_grid_obstacle4x4')

In [13]:
from scipy import io
io.savemat('2d_grid_obstacle.mat',{'OBS':OBS})