<a href="https://colab.research.google.com/github/eduardofae/RL/blob/main/AT-02/02%20-%20MDP.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Gymnasium - criando um MDP

Gymnasium é a biblioteca que substituiu a pioneira 'gym' para definição de ambientes (MDPs) de aprendizado por reforço. No restante do notebook, manteremos o termo "interface gym", embora "interface gymnasium" seja mais preciso.

Com contribuições do "Stable Baselines3 Tutorial - Creating a custom Gym environment" - https://colab.research.google.com/github/araffin/rl-tutorial-jnrr19/blob/sb3/5_custom_gym_env.ipynb

## Introducão

Este notebook é uma introdução à interface Gym para ambientes de aprendizado por reforço. A biblioteca Gymnasium fornece uma interface que permite a modelagem do ambiente de uma forma padronizada para avaliação e teste de algoritmos de aprendizado por reforço.




##  Formato do ambiente

Um MDP é composto por estados, ações, transição e recompensa. Para modelar um MDP no gymnasium, voce deve definir duas propriedades e os métodos da interface.

### Propriedades

As propriedades são:
- `observation_space` contém que tipo de espaço gym (gym space: `Discrete`, `Box`, ...) e a forma da observação (e.g. matriz 4x3).
- `action_space` também é um objeto tipo gym space, definindo o tipo de ação que pode ser feita.

O melhor jeito de aprender sobre gym spaces é olhando o [código](https://github.com/Farama-Foundation/Gymnasium/tree/main/gymnasium/spaces). De qualquer forma, os principais tipos são::
- `gym.spaces.Box`: Um espaço (possivelmente sem limites) em $R^n$. Especificamente, Box representa o produto cartesiano de n intervalos fechados. Cada intervalo pode ser do tipo [a, b], (-oo, b], [a, oo), or (-oo, oo). Por exemplo, um vetor 1D ou uma imagem podem ser descritas como Box.
```python
# Exemplo de imagem como entrada, cada canal tem o menor valor em 0 e o maior em 255
observation_space = spaces.Box(low=0, high=255, shape=(HEIGHT, WIDTH, N_CHANNELS), dtype=np.uint8)
```

- `gym.spaces.Discrete`: um conjunto discreto $\{ 0, 1, \dots, n-1 \}$, geralmente útil para definir ações ou espaços discretos como gridworld.
  Exemplo: se você tem duas ações (esquerda e direita), você pode representar seu espaço de ações com `Discrete(2)`, e ao implementar os métodos do ambiente você faz a ação 0 representar "esquerda" e a 1 representar "direita".


### Métodos

Devem ser implementados 3 métodos obrigatórios e um opcional:

* `reset(seed)` para (re)iniciar o ambiente. Deve retornar a observação/estado inicial para o agente e um dict com informação adicional (pode ser vazio). Recebe uma semente aleatória para usar caso haja aleatoriedade. É chamado sempre que um novo episódio for começar.
* `step(action)` recebe a ação a ser realizada no ambiente. O método deve realizar a ação e retornar uma tupla: `observation, reward, terminated, truncated, info` contendo, respectivamente, a observação (estado atingido), a recompensa recebida, se o episodio terminou por atingir um estado terminal (terminated) ou se foi interrompido (truncado, e.g. limite de tentativas atingido) e um dict com informações adicionais (pode ser vazio).
* `close()`: se precisar "limpar" alguma coisa ao fechar o ambiente
* (opcional) `render(method)`: gera uma visualização do ambiente. A função recebe o método de renderização (string) e deve gerar a visualização apropriada.



## Criando um novo ambiente

Abaixo há uma implementação de um ambiente extremamente simples. É um grid unidimensional onde o agente começa na posição mais à direita e "vence" se chegar na posição mais à esquerda.

In [None]:
import numpy as np
import gymnasium as gym
import gymnasium.spaces as spaces


class GoLeftEnv(gym.Env):
  """
  Custom Environment that follows gym interface.
  This is a simple env where the agent must learn to go always left.
  """
  # Because of google colab, we cannot implement the GUI ('human' render mode)
  metadata = {'render.modes': ['console']}
  # Define constants for clearer code
  LEFT = 0
  RIGHT = 1

  def __init__(self, grid_size=10):
    super(GoLeftEnv, self).__init__()

    # Size of the 1D-grid
    self.grid_size = grid_size

    # Initialize the agent at the right of the grid
    self.agent_pos = grid_size - 1

    # Define action and observation space
    # They must be gym.spaces objects
    # Example when using discrete actions, we have two: left and right
    n_actions = 2
    self.action_space = spaces.Discrete(n_actions)

    # The observation will be the coordinate of the agent
    # this can be described both by Discrete and Box space, here we use Discrete
    # the agent can be in any position from 0 to grid_size-1
    self.observation_space = spaces.Discrete(self.grid_size)

  def reset(self, seed=None):
    """
    Important: se observation_space é Box, a observação deve ser um array numpy
    :return: Tuple[np.array, dict] (obs e info; info é sempre vazio)
    """
    # Initialize the agent at the right of the grid
    self.agent_pos = self.grid_size - 1
    obs = self.agent_pos #np.array([self.agent_pos]) #.astype(np.uint8)
    info = {}
    return obs, info


  def step(self, action):
    """
    Implementa a dinâmica do MDP. No nosso caso, basta ajustar a posição do agente
    """
    if action != self.LEFT and action != self.RIGHT:    #checks for validity
      raise ValueError(f"Invalid action={action}")

    # position decreases if we move left, and increases if we move right
    increment = -1 if action == self.LEFT else 1
    self.agent_pos += increment

    # Account for the boundaries of the world (keeps the agent between 0 and grid_size-1)
    self.agent_pos = np.clip(self.agent_pos, 0, self.grid_size-1)

    # The game "finishes" when the agent gets to the leftmost position
    terminated = bool(self.agent_pos == 0)

    # Null reward everywhere except when reaching the goal (left of the grid)
    reward = 1 if self.agent_pos == 0 else 0

    # Optionally we can pass additional info, we are not using that for now
    info = {}

    # converts the state to a numpy array and returns the experience tuple
    obs = self.agent_pos #np.array([self.agent_pos]) #.astype(np.float32)

    # for now, the episode has no timestep limit, so, it's never truncated
    truncated = False
    return obs, reward, terminated, truncated, info

  def render(self, mode='console'):
    """
    'Desenha' o ambiente para visualização
    """
    if mode != 'console':
      raise NotImplementedError()
    # agent is represented as a cross, rest as a dot
    print("." * self.agent_pos, end="")
    print("x", end="")
    print("." * (self.grid_size-1 - self.agent_pos))


  def close(self):
    pass


### Validar o ambiente

Stable Baselines3 fornece um [helper](https://stable-baselines3.readthedocs.io/en/master/common/env_checker.html) pra verificar se o ambiente segue a interface Gym. Também checa se o ambiente é compativel com os ambientes do Stable-Baselines (e dá warning se precisar).

In [None]:
from gymnasium.utils.env_checker import check_env
env = GoLeftEnv()

try:
    check_env(env)
    print("Environment passes all checks!")
except Exception as e:
    print(f"Environment has issues: {e}")

Environment has issues: Expects the random number generator to have been generated given a seed was passed to reset. Most likely the environment reset function does not call `super().reset(seed=seed)`.


### Testando o ambiente com um agente aleatorio

In [None]:
env = GoLeftEnv(grid_size=10)

obs, _ = env.reset(seed=None)
print('Obs space: ', env.observation_space)
print('Action space:', env.action_space)
print(f"Initial state: obs={obs}. Render:".ljust(58), end=' ')
env.render()

for step in range(50):
  print(f"Step {step + 1}:", end=' ')
  action = env.action_space.sample()
  obs, reward, term, trunc, info = env.step(action)
  print(f'obs={obs}, rwd={reward}, trunc={trunc}, term={term}. Render:'.ljust(50), end=' ')
  env.render()
  if term:
    print("Goal reached!", "reward=", reward)
    break

Obs space:  Discrete(10)
Action space: Discrete(2)
Initial state: obs=9. Render:                              .........x
Step 1: obs=8, rwd=0, trunc=False, term=False. Render:     ........x.
Step 2: obs=7, rwd=0, trunc=False, term=False. Render:     .......x..
Step 3: obs=6, rwd=0, trunc=False, term=False. Render:     ......x...
Step 4: obs=7, rwd=0, trunc=False, term=False. Render:     .......x..
Step 5: obs=6, rwd=0, trunc=False, term=False. Render:     ......x...
Step 6: obs=5, rwd=0, trunc=False, term=False. Render:     .....x....
Step 7: obs=6, rwd=0, trunc=False, term=False. Render:     ......x...
Step 8: obs=7, rwd=0, trunc=False, term=False. Render:     .......x..
Step 9: obs=6, rwd=0, trunc=False, term=False. Render:     ......x...
Step 10: obs=7, rwd=0, trunc=False, term=False. Render:     .......x..
Step 11: obs=8, rwd=0, trunc=False, term=False. Render:     ........x.
Step 12: obs=7, rwd=0, trunc=False, term=False. Render:     .......x..
Step 13: obs=8, rwd=0, trunc=False, 

## Tarefa 1 - GoLeftEnv

Implemente e execute um agente com a política ótima no GoLeftEnv

In [None]:
## seu codigo aqui, pode se basear no agente aleatorio
env = GoLeftEnv(grid_size=10)

obs, _ = env.reset(seed=None)
print('Obs space: ', env.observation_space)
print('Action space:', env.action_space)
print(f"Initial state: obs={obs}. Render:".ljust(54), end=' ')
env.render()

step, term = 1, False
while not term:
  action = env.LEFT
  obs, reward, term, trunc, info = env.step(action)
  print(f'step={step}, obs={obs}, rwd={reward}, trunc={trunc}, term={term}. Render:'.ljust(50), end=' ')
  env.render()
  if term:
    print("Goal reached!", "reward=", reward)
  step+=1

Obs space:  Discrete(10)
Action space: Discrete(2)
Initial state: obs=9. Render:                          .........x
step=1, obs=8, rwd=0, trunc=False, term=False. Render: ........x.
step=2, obs=7, rwd=0, trunc=False, term=False. Render: .......x..
step=3, obs=6, rwd=0, trunc=False, term=False. Render: ......x...
step=4, obs=5, rwd=0, trunc=False, term=False. Render: .....x....
step=5, obs=4, rwd=0, trunc=False, term=False. Render: ....x.....
step=6, obs=3, rwd=0, trunc=False, term=False. Render: ...x......
step=7, obs=2, rwd=0, trunc=False, term=False. Render: ..x.......
step=8, obs=1, rwd=0, trunc=False, term=False. Render: .x........
step=9, obs=0, rwd=1, trunc=False, term=True. Render: x.........
Goal reached! reward= 1


### Exemplo GoLeftEnv

Para um exemplo de como instanciar um agente em um ambiente Gym, veja:  https://colab.research.google.com/drive/1ihwBXcMuVhrxk_xZjmcyAN_AOoTmRq9C#scrollTo=EIxbc7IIXL_Q

## Tarefa 2 - Grid 4x3

Implemente o ambiente do grid 4x3, preenchendo as células abaixo. Você deve permitir ao usuário especificar a recompensa de cada passo (padrão = -0.04), a probabilidade de 'escorregar' (padrão = 0.2) e o numero máximo de passos antes de encerrar o episódio. O espaço de ações deve ser discreto, com 4 ações, e o de estados também será discreto, com 12 estados (mesmo o estado 'parede' pode ser considerado nesta contagem). A convenção para numeração dos estados é (G=goal, #=parede, P=pit/buraco,S=start):
```
y=2    +----+----+----+----+
       |  8 |  9 | 10 | 11G|
       +----+----+----+----+
y=1    |  4 |  5#|  6 |  7P|
       +----+----+----+----+
y=0    |  0S|  1 |  2 |  3 |
       +----+----+----+----+
        x=0   x=1   x=2   x=3
```

Note que há métodos para converter a numeração de estados para as coordenadas x,y

In [None]:
class GridWorld4x3(gym.Env):
    """
    GridWorld 4x3 environment compatible with gymnasium.
    Actions: 0=UP, 1=RIGHT, 2=DOWN, 3=LEFT
    """
    metadata = {"render_modes": ["human"]}

    UP = 0
    RIGHT = 1
    DOWN = 2
    LEFT = 3


    def __init__(
        self,
        reward_step: float = -0.04,
        slip: float = 0.2,
        max_steps: int = 1000,
        seed: int = None,
        render_mode = "human"
    ):
        self.reward_step = reward_step
        self.slip = slip
        self.max_steps = max_steps
        self.seed = seed

        # Grid Description
        self.grid_size = (4,3)
        self.wall  = [5]
        self.goal  = [11]
        self.pit   = [7]
        self.start = 0

        # Rewards
        self.goal_reward = 1
        self.pit_reward = -1

        # Agent Position and steps made
        self.agent_state = self.start
        self.steps = 0

        # Definition of action space
        n_actions = 4
        self.action_space = spaces.Discrete(n_actions)

        # Definition of observation space
        self.observation_space = spaces.Discrete(self.grid_size[0]*self.grid_size[1])

        # Update Render Mode
        self.metadata = {"render_modes": [render_mode]}

    # ======================
    # Métodos de conversão
    # ======================
    def pos_to_state(self, pos: tuple[int,int]) -> int:
        """Converte (x,y) → estado"""
        x, y = pos
        return y * self.grid_size[0] + x

    def state_to_pos(self, s: int) -> tuple[int,int]:
        """Converte estado → (x,y)"""
        return (s % self.grid_size[0], s // self.grid_size[0])

    # ======================
    # Gym API
    # ======================
    def reset(self, *, seed=None, options=None):
        super().reset(seed=seed)

        # Reset agent position and step count
        self.agent_state = self.start
        self.steps = 0

        # Returns
        obs = self.agent_state
        info = {}
        return obs, info

    def is_vertical(self, action: int):
        return action == self.UP or action == self.DOWN

    def handle_slip(self, action: int):
        direction = 1 if np.random.rand() < 0.5 else -1
        action = action + direction
        return action%4 if action != -1 else 3

    def get_reward(self):
        if self.agent_state in self.goal:
            return self.goal_reward
        if self.agent_state in self.pit:
            return self.pit_reward
        return self.reward_step

    def step(self, action: int):
        # Get cur position
        x, y = self.state_to_pos(self.agent_state)

        # Handle slip
        if np.random.rand() < 0.2:
            action = self.handle_slip(action)

        # Update position
        if self.is_vertical(action):
            new_y = np.clip(y+(1-action), 0, self.grid_size[1]-1)
            if self.pos_to_state((x,new_y)) not in self.wall:
                y = new_y
        else:
            new_x = np.clip(x+(2-action), 0, self.grid_size[0]-1)
            if self.pos_to_state((new_x,y)) not in self.wall:
                x = new_x
        self.agent_state = self.pos_to_state((x, y))

        # Checks if goal state reached
        terminated = self.agent_state in self.goal

        # Calculates the reward
        reward = self.get_reward()

        # Not using
        info = {}

        # Returns the agent state
        obs = self.agent_state

        # Ends the episode when max steps reached
        truncated = self.steps >= self.max_steps
        self.steps += 1
        return obs, reward, terminated, truncated, info


    # ======================
    # Render
    # ======================
    def render(self, mode=None):
        div = ' ' * 7 + '+----' * self.grid_size[0] + '+'
        for i in range(self.grid_size[1]):
            y = self.grid_size[1]-i-1
            print(div)
            print(f'y={y:<5}', end='')
            for x in range(self.grid_size[0]):
                state = self.pos_to_state((x,y))
                state_class = 'G' if state in self.goal else '#' if state in self.wall else 'P' if state in self.pit else 'S' if state == self.start else ''
                state_print = (str(state) if self.agent_state != state else 'X') + state_class
                print(f'|{state_print:^4}', end='')
            print('|')
        print(div)
        print('        ', end='')
        for x in range(self.grid_size[0]):
            print(f'x={x:<2} ', end='')

    def close(self):
        pass # nao precisa implementar


In [None]:
from gymnasium.utils.env_checker import check_env

# Criar uma instância do ambiente
env = GridWorld4x3()

# This will catch many common issues
try:
    check_env(env)
    print("Environment passes all checks!")
except Exception as e:
    print(f"Environment has issues: {e}")

Environment passes all checks!


## Política simples no grid

A politica abaixo tenta se mover para a direita até atingir uma parede, e então vai pra cima.

In [None]:
# Create an instance of the environment
env = GridWorld4x3(render_mode="human")

# Define a simple policy (e.g., move right until hitting a wall, then move up)
# This is a list of actions: 1=RIGHT, 0=UP
simple_policy_actions = [1, 1, 1, 0, 1, 1, 1, 0] # Example sequence

obs, info = env.reset()
env.render()
done = False
total_reward = 0.0
step_count = 0

print("\nRunning episode with simple policy:")

for action in simple_policy_actions:
    if done:
        break
    print(f"\nTaking action: {action}")
    obs, reward, terminated, truncated, info = env.step(action)
    total_reward += reward
    done = terminated or truncated
    env.render()
    step_count += 1

print("\nEpisode finished.")
print(f"Total reward: {total_reward}")
print(f"Steps taken: {step_count}")

env.close()

       +----+----+----+----+
y=2    | 8  | 9  | 10 |11G |
       +----+----+----+----+
y=1    | 4  | 5# | 6  | 7P |
       +----+----+----+----+
y=0    | XS | 1  | 2  | 3  |
       +----+----+----+----+
        x=0  x=1  x=2  x=3  
Running episode with simple policy:

Taking action: 1
       +----+----+----+----+
y=2    | 8  | 9  | 10 |11G |
       +----+----+----+----+
y=1    | 4  | 5# | 6  | 7P |
       +----+----+----+----+
y=0    | 0S | X  | 2  | 3  |
       +----+----+----+----+
        x=0  x=1  x=2  x=3  
Taking action: 1
       +----+----+----+----+
y=2    | 8  | 9  | 10 |11G |
       +----+----+----+----+
y=1    | 4  | 5# | 6  | 7P |
       +----+----+----+----+
y=0    | 0S | 1  | X  | 3  |
       +----+----+----+----+
        x=0  x=1  x=2  x=3  
Taking action: 1
       +----+----+----+----+
y=2    | 8  | 9  | 10 |11G |
       +----+----+----+----+
y=1    | 4  | 5# | 6  | 7P |
       +----+----+----+----+
y=0    | 0S | 1  | 2  | X  |
       +----+----+----+----+
        x=0  

## Tarefa 3: política melhorada

Implemente uma política que você considere ótima, que mapeie qualquer estado possível para a melhor ação correspondente. A política não precisa ser realmente a ótima, mas você deve descrevê-la textualmente para vermos a consistência com o que está implementado

In [None]:
# Create an instance of the environment
env = GridWorld4x3(render_mode="human")

# Política: Sempre anda pra cima até estar na linha do topo, então vai pra direita
# Caso a parede ou o pit estejam acima, vai para a esquerda
def my_police(state: int):
    if state == 1 or state == 3:
        return env.LEFT
    if state < 8:
        return env.UP
    return env.RIGHT


obs, info = env.reset()
env.render()
done = False
total_reward = 0.0
step_count = 0

print("\nRunning episode with simple policy:")

while not done:
    action = my_police(obs)
    print(f"\nTaking action: {action}")
    obs, reward, terminated, truncated, info = env.step(action)
    total_reward += reward
    done = terminated or truncated
    env.render()
    step_count += 1

print("\nEpisode finished.")
print(f"Total reward: {total_reward}")
print(f"Steps taken: {step_count}")

env.close()

       +----+----+----+----+
y=2    | 8  | 9  | 10 |11G |
       +----+----+----+----+
y=1    | 4  | 5# | 6  | 7P |
       +----+----+----+----+
y=0    | XS | 1  | 2  | 3  |
       +----+----+----+----+
        x=0  x=1  x=2  x=3  
Running episode with simple policy:

Taking action: 0
       +----+----+----+----+
y=2    | 8  | 9  | 10 |11G |
       +----+----+----+----+
y=1    | X  | 5# | 6  | 7P |
       +----+----+----+----+
y=0    | 0S | 1  | 2  | 3  |
       +----+----+----+----+
        x=0  x=1  x=2  x=3  
Taking action: 0
       +----+----+----+----+
y=2    | X  | 9  | 10 |11G |
       +----+----+----+----+
y=1    | 4  | 5# | 6  | 7P |
       +----+----+----+----+
y=0    | 0S | 1  | 2  | 3  |
       +----+----+----+----+
        x=0  x=1  x=2  x=3  
Taking action: 1
       +----+----+----+----+
y=2    | 8  | X  | 10 |11G |
       +----+----+----+----+
y=1    | 4  | 5# | 6  | 7P |
       +----+----+----+----+
y=0    | 0S | 1  | 2  | 3  |
       +----+----+----+----+
        x=0  