In [2]:
import gymnasium as gym
import numpy as np
from gymnasium import spaces
import random

class GridWorld(gym.Env):
    """Custom Environment that follows gym interface."""

    metadata = {"render_modes": ["console"], "render_fps": 30}

    LEFT = 0
    RIGHT = 1
    UP = 2
    DOWN = 3

    def __init__(self, grid_size, render_mode="console"):
        super().__init__()

        self.render_mode = render_mode

        self.grid_size = grid_size
        
        self.agent_pos_x = random.randint(0, self.grid_size)
        self.agent_pos_y = random.randint(0, self.grid_size)

        self.action_space = spaces.Discrete(4)
        self.observation_space = spaces.Box(
            low=0, high=self.grid_size, shape=(2,), dtype=np.float32
        )


    def step(self, action):
        
        match action:
            case GridWorld.LEFT:
                self.agent_pos_x -= 1
            case GridWorld.RIGHT:
                self.agent_pos_x += 1
            case GridWorld.UP:
                self.agent_pos_y += 1
            case GridWorld.DOWN:
                self.agent_pos_y -= 1
            
        self.agent_pos_x = np.clip(0, self.agent_pos_x, self.grid_size)
        self.agent_pos_y = np.clip(0, self.agent_pos_y, self.grid_size)

        observation = np.array([self.agent_pos_x, self.agent_pos_y]).astype(np.float32)
        info = {}

        terminated = bool(self.agent_pos_x == 0 and self.agent_pos_y == 0)
        truncated = False

        reward = 1 if terminated else 0

        return observation, reward, terminated, truncated, info


    def reset(self, seed=None, options=None):
        
        super().reset(seed=seed, options=options)

        random.seed = seed
        self.agent_pos_x = random.randint(0, self.grid_size)
        self.agent_pos_y = random.randint(0, self.grid_size)

        observation = np.array([self.agent_pos_x, self.agent_pos_y]).astype(np.float32)
        info = {}

        return observation, info


    def render(self):
        for i in range(self.grid_size):
            for j in range(self.grid_size):
                if self.agent_pos_x == i and self.agent_pos_y == j:
                    print("x", end="")
                else:
                    print(".", end="")
            print("")
    

    def close(self):
        pass

In [3]:
from stable_baselines3.common.env_checker import check_env

In [4]:
# Environment instance used for the checker and the manual walk-through below.
env = GridWorld(grid_size=5)

In [5]:
# Run Stable-Baselines3's environment checker against the custom env.
# NOTE(review): warn=True presumably also enables SB3's advisory warnings — confirm in the SB3 docs.
check_env(env, warn=True)

In [6]:
# Begin a fresh episode and draw the starting grid.
obs, _ = env.reset()
env.render()

# Show both spaces, then one randomly sampled action.
for shown in (env.action_space, env.observation_space):
    print(shown)
print(env.action_space.sample())

.....
.....
.....
.....
.....
Discrete(4)
Box(0.0, 5.0, (2,), float32)
0


In [7]:
# Hand-written policy: move LEFT until x reaches 0, then DOWN until the
# environment reports the episode as finished.
obs, _ = env.reset()
step = 1
done = False

while not done:
    action = GridWorld.LEFT if obs[0] > 0 else GridWorld.DOWN

    print(f"Step {step}: {action}")

    obs, reward, terminated, truncated, _ = env.step(action)
    env.render()

    # Either flag ends the episode.
    done = terminated or truncated
    if done:
        print(f"Finished at step {step} with reward {reward}")

    step += 1

Step 1: 0
.....
...x.
.....
.....
.....
Step 2: 0
...x.
.....
.....
.....
.....
Step 3: 3
..x..
.....
.....
.....
.....
Step 4: 3
.x...
.....
.....
.....
.....
Step 5: 3
x....
.....
.....
.....
.....
Finished at step 5 with reward 1


In [8]:
from stable_baselines3 import A2C
from stable_baselines3.common.env_util import make_vec_env

In [9]:
# Vectorized wrapper around a single GridWorld; env_kwargs are forwarded to
# the GridWorld constructor.
vec_env = make_vec_env(
    GridWorld,
    n_envs=1,
    env_kwargs={"grid_size": 10, "render_mode": "console"},
)
# Separate, unwrapped instance built with the same settings.
env = GridWorld(grid_size=10, render_mode="console")

In [10]:
# Build an A2C agent with an MLP policy, then train it for 1000 timesteps.
model = A2C("MlpPolicy", env, verbose=1)
model = model.learn(total_timesteps=1000)

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
------------------------------------
| rollout/              |          |
|    ep_len_mean        | 52.1     |
|    ep_rew_mean        | 1        |
| time/                 |          |
|    fps                | 2034     |
|    iterations         | 100      |
|    time_elapsed       | 0        |
|    total_timesteps    | 500      |
| train/                |          |
|    entropy_loss       | -1.19    |
|    explained_variance | -43.4    |
|    learning_rate      | 0.0007   |
|    n_updates          | 99       |
|    policy_loss        | -0.119   |
|    value_loss         | 0.0135   |
------------------------------------
------------------------------------
| rollout/              |          |
|    ep_len_mean        | 25.4     |
|    ep_rew_mean        | 1        |
| time/                 |          |
|    fps                | 2339     |
|    iterations         | 200      |
|    time_elapsed 

In [21]:
from IPython.display import clear_output
from time import sleep

# Roll out the trained policy on the vectorized env, redrawing the grid
# in place on every step.

# Safety cap: a deterministic policy that keeps walking into a wall would
# otherwise never terminate, because the env itself never truncates.
MAX_STEPS = 200

obs = vec_env.reset()
done = False
step = 1

while not done and step <= MAX_STEPS:
    clear_output(wait=True)
    vec_env.render()
    sleep(.1)

    # predict() returns (action, state); only the action goes to the env.
    action, _ = model.predict(obs, deterministic=True)
    obs, reward, dones, _ = vec_env.step(action)
    # vec_env wraps exactly one environment, so read its flag explicitly
    # instead of relying on the truthiness of a 1-element array.
    done = bool(dones[0])

    # NOTE(review): SB3 vec envs appear to auto-reset when an episode ends,
    # so on the final step `obs` is presumably the first observation of the
    # next episode rather than the terminal cell — confirm in the SB3 docs.
    print(f"Step: {step}, Obs: {obs}, Action: {action}, Reward: {reward}")

    if done:
        print(f"Finished at step {step} with reward {reward}")

    step += 1

if not done:
    print(f"Stopped after {MAX_STEPS} steps without reaching the goal")


.x........
..........
..........
..........
..........
..........
..........
..........
..........
..........
Step: 7, Obs: [[9. 6.]], Action: (array([3]), None), Reward: [1.]
Finished at step 7 with reward [1.]
