# Gymnasium - criando um MDP

Baseado no "Stable Baselines3 Tutorial - Creating a custom Gym environment" - https://colab.research.google.com/github/araffin/rl-tutorial-jnrr19/blob/sb3/5_custom_gym_env.ipynb

## Introducão

Este notebook é uma introdução à interface Gym para ambientes de aprendizado por reforço. A biblioteca Gymnasium fornece uma interface que permite a modelagem do ambiente de uma forma padronizada para avaliação e teste de algoritmos de aprendizado por reforço.

Neste notebook, veremos como usar um algoritmo já pronto em um ambiente novo.

## Instalação do Stable Baselines3 (incluindo gymnasium) usando Pip

A célula abaixo instala no ambiente de execução do Colab a biblioteca Stable Baselines3 de algoritmos de RL. Uma das dependências é a gymnasium, que fornece a interface padronizada de ambientes.



In [None]:
!pip install "stable-baselines3>=2.0.0a4"

Collecting stable-baselines3>=2.0.0a4
  Downloading stable_baselines3-2.4.0a11-py3-none-any.whl.metadata (4.5 kB)
Collecting gymnasium<1.1.0,>=0.29.1 (from stable-baselines3>=2.0.0a4)
  Downloading gymnasium-1.0.0-py3-none-any.whl.metadata (9.5 kB)
Collecting farama-notifications>=0.0.1 (from gymnasium<1.1.0,>=0.29.1->stable-baselines3>=2.0.0a4)
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl.metadata (558 bytes)
Downloading stable_baselines3-2.4.0a11-py3-none-any.whl (183 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m183.9/183.9 kB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading gymnasium-1.0.0-py3-none-any.whl (958 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m958.1/958.1 kB[0m [31m21.8 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, gymnasium, stable-baselines3
Successfully installed farama-notifications-0.0.4 gymn

##  Formato do ambiente

Um MDP é composto por estados, ações, transição e recompensa. Para modelar um MDP no gymnasium, voce deve definir duas propriedades e os métodos da interface.

### Propriedades

As propriedades são:
- `observation_space` contém que tipo de espaço gym (gym space: `Discrete`, `Box`, ...) e a forma da observação (e.g. matriz 4x3).
- `action_space` também é um objeto tipo gym space, definindo o tipo de ação que pode ser feita.

O melhor jeito de aprender sobre gym spaces é olhando o [código](https://github.com/Farama-Foundation/Gymnasium/tree/main/gymnasium/spaces). De qualquer forma, os principais tipos são::
- `gym.spaces.Box`: Um espaço (possivelmente sem limites) em $R^n$. Especificamente, Box representa o produto cartesiano de n intervalos fechados. Cada intervalo pode ser do tipo [a, b], (-oo, b], [a, oo), or (-oo, oo). Por exemplo, um vetor 1D ou uma imagem podem ser descritas como Box.

- `gym.spaces.Discrete`: um conjunto discreto $\{ 0, 1, \dots, n-1 \}$, geralmente útil para definir ações.
  Exemplo: se você tem duas ações (esquerda e direita), você pode representar seu espaço de ações com `Discrete(2)`, e ao implementar os métodos do ambiente você faz a ação 0 representar "esquerda" e a 1 representar "direita".


### Métodos

Devem ser implementados 3 métodos obrigatórios e um opcional:

* `reset(seed)` para (re)iniciar o ambiente. Deve retornar a observação/estado inicial para o agente e um dict com informação adicional (pode ser vazio). Recebe uma semente aleatória para usar caso haja aleatoriedade. É chamado sempre que um novo episódio for começar.
* `step(action)` recebe a ação a ser realizada no ambiente. O método deve realizar a ação e retornar uma tupla: `observation, reward, terminated, truncated, info` contendo, respectivamente, a observação (estado atingido), a recompensa recebida, se o episodio terminou por atingir um estado terminal (terminated) ou se foi interrompido (truncado, e.g. limite de tentativas atingido) e um dict com informações adicionais (pode ser vazio).
* `close()`: se precisar "limpar" alguma coisa ao fechar o ambiente
* (opcional) `render(method)`: gera uma visualização do ambiente. A função recebe o método de renderização (string) e deve gerar a visualização apropriada.



## Ambiente já existente

Vamos usar um ambiente dentre os vários que implementam a interface gym. No caso, vamos usar o CartPole. Documentação [aqui](https://gymnasium.farama.org/environments/classic_control/cart_pole/).

In [None]:
from sklearn.preprocessing import KBinsDiscretizer
import numpy as np
import time, math, random
from typing import Tuple
import matplotlib.pyplot as plt
from matplotlib import animation
from matplotlib.animation import FuncAnimation
from IPython.display import HTML
%matplotlib inline

import gymnasium as gym

In [None]:
env = gym.make("CartPole-v1")

# Box(4,) means that it is a Vector with 4 components
print("Observation space:", env.observation_space)
# Discrete(2) means that there is two discrete actions
print("Action space:", env.action_space)

# The reset method is called at the beginning of an episode
obs = env.reset()
# Sample a random action
action = env.action_space.sample()
print("Sampled action:", action)
obs, reward, truncated, terminated, info = env.step(action)
# Note the obs is a numpy array
# info is an empty dict for now but can contain any debugging info
# reward is a scalar
print(obs, reward, truncated, terminated, info)

Observation space: Box([-4.8               -inf -0.41887903        -inf], [4.8               inf 0.41887903        inf], (4,), float32)
Action space: Discrete(2)
Sampled action: 0
[-0.03704422 -0.1750163   0.00064026  0.2645507 ] 1.0 False False {}


### Executando um agente no ambiente

Uma vez que o ambiente segue a interface gym, é bem facil plugar qualquer algoritmo do stable-baselines.

Utilizaremos aqui o **Deep Q-learning**, utilizando uma rede **neural multilayer perceptron** para aproximar funções.

In [None]:
from stable_baselines3 import PPO, A2C, DQN
from stable_baselines3.common.env_util import make_vec_env

# Train the agent
#model = PPO('MlpPolicy', env, verbose=1).learn(5000)
# Crie e treine o agente PPO
model = DQN("MlpPolicy", env, verbose=0)
model.learn(total_timesteps=5000)

<stable_baselines3.dqn.dqn.DQN at 0x7a33a05f8be0>

### Avaliação do agente treinado
A celula abaixo verifica a execução do agente

In [None]:
from stable_baselines3.common.evaluation import evaluate_policy
import numpy as np
# Avalie o agente treinado
mean_reward, std_reward = evaluate_policy(model, env, n_eval_episodes=100)

print(f"mean_reward:{mean_reward:.2f} +/- {std_reward:.2f}")

# Feche o ambiente após a renderização
env.close()



mean_reward:9.48 +/- 0.93


## Criando um novo ambiente

Abaixo há uma implementação de um ambiente extremamente simples. É um grid unidimensional onde o agente começa na posição mais à direita e "vence" se chegar na posição mais à esquerda.

In [None]:
import numpy as np
import gymnasium as gym
import gymnasium.spaces as spaces


class GoLeftEnv(gym.Env):
  """
  Custom Environment that follows gym interface.
  This is a simple env where the agent must learn to go always left.
  """
  # Usando renderização console, em vez de GUI ('human' render mode)
  metadata = {'render.modes': ['console']}
  # Define constants for clearer code
  LEFT = 0
  RIGHT = 1

  def __init__(self, grid_size=10):
    super(GoLeftEnv, self).__init__()

    # Size of the 1D-grid
    self.grid_size = grid_size

    # Initialize the agent at the right of the grid
    self.agent_pos = grid_size - 1

    # Define action and observation space
    # They must be gym.spaces objects
    # Example when using discrete actions, we have two: left and right
    n_actions = 2
    self.action_space = spaces.Discrete(n_actions)

    # The observation will be the coordinate of the agent
    # this can be described both by Discrete and Box space, here we use Discrete
    # the agent can be in any position from 0 to grid_size-1
    self.observation_space = spaces.Discrete(self.grid_size)

  def reset(self, seed=None):
    """
    Important: se observation_space é Box, a observação deve ser um array numpy
    :return: Tuple[np.array, dict] (obs e info; info é sempre vazio)
    """
    # Initialize the agent at the right of the grid
    self.agent_pos = self.grid_size - 1
    obs = self.agent_pos #np.array([self.agent_pos]) #.astype(np.uint8)
    info = {}
    return obs, info


  def step(self, action):
    """
    Implementa a dinâmica do MDP. No nosso caso, basta ajustar a posição do agente
    """
    if action != self.LEFT and action != self.RIGHT:    #checks for validity
      raise ValueError(f"Invalid action={action}")

    # position decreases if we move left, and increases if we move right
    increment = -1 if action == self.LEFT else 1
    self.agent_pos += increment

    # Account for the boundaries of the world (keeps the agent between 0 and grid_size-1)
    self.agent_pos = np.clip(self.agent_pos, 0, self.grid_size-1)

    # The game "finishes" when the agent gets to the leftmost position
    terminated = bool(self.agent_pos == 0)

    # zero reward everywhere except when reaching the goal (left of the grid)
    reward = 1 if self.agent_pos == 0 else 0

    # Optionally we can pass additional info, we are not using that for now
    info = {}

    # converts the state to a numpy array and returns the experience tuple
    obs = self.agent_pos

    # for now, the episode has no timestep limit, so, it's never truncated
    truncated = False
    return obs, reward, terminated, truncated, info

  def render(self, mode='console'):
    """
    'Desenha' o ambiente para visualização
    """
    if mode != 'console':
      raise NotImplementedError()
    # agent is represented as a cross, rest as a dot
    print("." * self.agent_pos, end="")
    print("x", end="")
    print("." * (self.grid_size-1 - self.agent_pos))


  def close(self):
    pass


### Validar o ambiente

Stable Baselines3 fornece um [helper](https://stable-baselines3.readthedocs.io/en/master/common/env_checker.html) pra verificar se o ambiente segue a interface Gym. Também checa se o ambiente é compativel com os ambientes do Stable-Baselines (e dá warning se precisar).

In [None]:
from stable_baselines3.common.env_checker import check_env
env = GoLeftEnv()

# If the environment don't follow the interface, an error will be thrown
check_env(env, warn=True)

### Testando o ambiente com um **agente aleatorio**

In [None]:
env = GoLeftEnv(grid_size=10)

obs, _ = env.reset(seed=None)
print('Obs space: ', env.observation_space)
print('Action space:', env.action_space)
print(f"Initial state: obs={obs}. Render:".ljust(58), end=' ')
env.render()

for step in range(50):
  print(f"Step {step + 1}:", end=' ')
  action = env.action_space.sample()#seleciona uma ação aleatória
  obs, reward, term, trunc, info = env.step(action)
  print(f'obs={obs}, rwd={reward}, trunc={trunc}, term={term}. Render:'.ljust(50), end=' ')
  env.render()
  if term:
    print("Goal reached!", "reward=", reward)
    break

Obs space:  Discrete(10)
Action space: Discrete(2)
Initial state: obs=9. Render:                              .........x
Step 1: obs=9, rwd=0, trunc=False, term=False. Render:     .........x
Step 2: obs=8, rwd=0, trunc=False, term=False. Render:     ........x.
Step 3: obs=9, rwd=0, trunc=False, term=False. Render:     .........x
Step 4: obs=8, rwd=0, trunc=False, term=False. Render:     ........x.
Step 5: obs=7, rwd=0, trunc=False, term=False. Render:     .......x..
Step 6: obs=8, rwd=0, trunc=False, term=False. Render:     ........x.
Step 7: obs=9, rwd=0, trunc=False, term=False. Render:     .........x
Step 8: obs=8, rwd=0, trunc=False, term=False. Render:     ........x.
Step 9: obs=9, rwd=0, trunc=False, term=False. Render:     .........x
Step 10: obs=8, rwd=0, trunc=False, term=False. Render:     ........x.
Step 11: obs=7, rwd=0, trunc=False, term=False. Render:     .......x..
Step 12: obs=8, rwd=0, trunc=False, term=False. Render:     ........x.
Step 13: obs=9, rwd=0, trunc=False, 

### Tente com algum algoritmo do Stable-Baselines

Uma vez que o ambiente segue a interface gym, é bem facil plugar qualquer algoritmo do stable-baselines

In [None]:
from stable_baselines3 import PPO, A2C, DQN
from stable_baselines3.common.env_util import make_vec_env

# Instantiate the env
env = GoLeftEnv(grid_size=10)

In [None]:
# Train the agent
obs, _ = env.reset()
model = DQN('MlpPolicy', env, verbose=0).learn(300)

## Teste o agente treinado

In [None]:
# Test the trained agent
obs, _ = env.reset()
n_steps = 20
for step in range(n_steps):
  action, _ = model.predict(obs, deterministic=True)
  obs, reward, term, trunc, info = env.step(action)
  print(f"Step {step + 1}:", end=' ')
  env.render(mode='console')
  if term:
    print("Goal reached!", "reward=", reward)
    break

Step 1: ........x.
Step 2: .......x..
Step 3: ......x...
Step 4: .....x....
Step 5: ....x.....
Step 6: ...x......
Step 7: ..x.......
Step 8: .x........
Step 9: x.........
Goal reached! reward= 1
