<a href="https://colab.research.google.com/github/HSaurabh0919/tresta/blob/main/Reinforcement_Learning/stabe_baselines_customenv_01.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install stable-baselines3[extra]

Collecting stable-baselines3[extra]
  Downloading stable_baselines3-1.5.0-py3-none-any.whl (177 kB)
[?25l[K     |█▉                              | 10 kB 12.7 MB/s eta 0:00:01[K     |███▊                            | 20 kB 19.3 MB/s eta 0:00:01[K     |█████▌                          | 30 kB 20.3 MB/s eta 0:00:01[K     |███████▍                        | 40 kB 16.5 MB/s eta 0:00:01[K     |█████████▏                      | 51 kB 6.5 MB/s eta 0:00:01[K     |███████████                     | 61 kB 7.6 MB/s eta 0:00:01[K     |█████████████                   | 71 kB 8.6 MB/s eta 0:00:01[K     |██████████████▊                 | 81 kB 8.3 MB/s eta 0:00:01[K     |████████████████▋               | 92 kB 9.2 MB/s eta 0:00:01[K     |██████████████████▍             | 102 kB 9.9 MB/s eta 0:00:01[K     |████████████████████▎           | 112 kB 9.9 MB/s eta 0:00:01[K     |██████████████████████▏         | 122 kB 9.9 MB/s eta 0:00:01[K     |████████████████████████        | 133

In [2]:
import numpy as np
import gym 
from gym import spaces

In [13]:
#Gym environment skeleton
class GoleftEnv(gym.Env):
  metadata = {'render.modes':['console']}
  LEFT = 0
  RIGHT = 1

  def __init__(self,grid_size = 10):
    super(GoleftEnv,self).__init__()

    #Size of 1d grid
    self.grid_size = grid_size 
    self.agent_pos = grid_size - 1 
    n_actions = 2
    self.action_space = spaces.Discrete(n_actions)
    self.observation_space = spaces.Box(low = 0,high=self.grid_size,shape=(1,),dtype=np.float32)

  def reset(self):
    #Initialize agent at right of the grid
    self.agent_pos = self.grid_size - 1
    return np.array([self.agent_pos]).astype(np.float32)

  def step(self,action):
    if (action==self.LEFT):
      self.agent_pos -=1
    elif (action==self.RIGHT):
      self.agent_pos +=1
    else:
      raise ValueError("Received invalid action={} which is not part of the action space".format(action))
    #Account for boundaries of the grid
    self.agent_pos = np.clip(self.agent_pos,0,self.grid_size)

    done = bool(self.agent_pos==0)

    reward = 1 if self.agent_pos==0 else 0 

    info = {}

    return np.array([self.agent_pos]).astype(np.float32), reward, done, info

  def render(self,mode='console'):
    if mode!='console':
      raise NotImplementedError()
    print("."*self.agent_pos, end="")
    print("x",end="")
    print("." * (self.grid_size - self.agent_pos))
  
  def close(self):
    pass  


In [4]:
from stable_baselines3.common.env_checker import check_env

In [14]:
env = GoleftEnv()
# If the environment don't follow the interface, an error will be thrown
check_env(env, warn=True)

In [15]:
#Test the environment
env = GoleftEnv(grid_size=10)

obs = env.reset()
env.render()

print(env.observation_space)
print(env.action_space)
print(env.action_space.sample())

GO_LEFT = 0
# Hardcoded best agent: always go left!
n_steps = 20
for step in range(n_steps):
  print("Step {}".format(step + 1))
  obs, reward, done, info = env.step(GO_LEFT)
  print('obs=', obs, 'reward=', reward, 'done=', done)
  env.render()
  if done:
    print("Goal reached!", "reward=", reward)
    break

.........x.
Box([0.], [10.], (1,), float32)
Discrete(2)
1
Step 1
obs= [8.] reward= 0 done= False
........x..
Step 2
obs= [7.] reward= 0 done= False
.......x...
Step 3
obs= [6.] reward= 0 done= False
......x....
Step 4
obs= [5.] reward= 0 done= False
.....x.....
Step 5
obs= [4.] reward= 0 done= False
....x......
Step 6
obs= [3.] reward= 0 done= False
...x.......
Step 7
obs= [2.] reward= 0 done= False
..x........
Step 8
obs= [1.] reward= 0 done= False
.x.........
Step 9
obs= [0.] reward= 1 done= True
x..........
Goal reached! reward= 1


In [17]:
#Try baselines
from stable_baselines3 import PPO, A2C # DQN coming soon
from stable_baselines3.common.env_util import make_vec_env

# Instantiate the env
env = GoleftEnv(grid_size=10)
# wrap it
env = make_vec_env(lambda: env, n_envs=1)

In [18]:
# Train the agent
model = A2C('MlpPolicy', env, verbose=1).learn(5000)

Using cpu device
------------------------------------
| rollout/              |          |
|    ep_len_mean        | 10       |
|    ep_rew_mean        | 1        |
| time/                 |          |
|    fps                | 765      |
|    iterations         | 100      |
|    time_elapsed       | 0        |
|    total_timesteps    | 500      |
| train/                |          |
|    entropy_loss       | -0.0811  |
|    explained_variance | 0.544    |
|    learning_rate      | 0.0007   |
|    n_updates          | 99       |
|    policy_loss        | 0.00248  |
|    value_loss         | 0.00792  |
------------------------------------
------------------------------------
| rollout/              |          |
|    ep_len_mean        | 9.48     |
|    ep_rew_mean        | 1        |
| time/                 |          |
|    fps                | 769      |
|    iterations         | 200      |
|    time_elapsed       | 1        |
|    total_timesteps    | 1000     |
| train/             

In [19]:
# Test the trained agent
obs = env.reset()
n_steps = 20
for step in range(n_steps):
  action, _ = model.predict(obs, deterministic=True)
  print("Step {}".format(step + 1))
  print("Action: ", action)
  obs, reward, done, info = env.step(action)
  print('obs=', obs, 'reward=', reward, 'done=', done)
  env.render(mode='console')
  if done:
    # Note that the VecEnv resets automatically
    # when a done signal is encountered
    print("Goal reached!", "reward=", reward)
    break

Step 1
Action:  [0]
obs= [[8.]] reward= [0.] done= [False]
........x..
Step 2
Action:  [0]
obs= [[7.]] reward= [0.] done= [False]
.......x...
Step 3
Action:  [0]
obs= [[6.]] reward= [0.] done= [False]
......x....
Step 4
Action:  [0]
obs= [[5.]] reward= [0.] done= [False]
.....x.....
Step 5
Action:  [0]
obs= [[4.]] reward= [0.] done= [False]
....x......
Step 6
Action:  [0]
obs= [[3.]] reward= [0.] done= [False]
...x.......
Step 7
Action:  [0]
obs= [[2.]] reward= [0.] done= [False]
..x........
Step 8
Action:  [0]
obs= [[1.]] reward= [0.] done= [False]
.x.........
Step 9
Action:  [0]
obs= [[9.]] reward= [1.] done= [ True]
.........x.
Goal reached! reward= [1.]
