<a href="https://colab.research.google.com/github/AndreasKing-Goks/MIR_Reinforcement-Learning/blob/main/TP_01_The_gym_environment_%5Bgymnasium%5D.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Introduction to Gymnasium in Colab

This is a quick lab to learn how to use [`gymnasium`](https://gymnasium.farama.org/), a Python module that wraps many environments under a single API.

We provide a class to record videos, so that environment runs can be rendered easily in Colaboratory and Jupyter Lab.

## Installing gymnasium and the Atari ROMs

In [None]:
!pip install gymnasium
!pip install gymnasium[accept-rom-license]
!pip install gymnasium[toy_text]
!pip install gymnasium[box2d]
!pip install gymnasium[atari]

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting gymnasium
  Downloading gymnasium-0.27.1-py3-none-any.whl (883 kB)
Collecting jax-jumpy>=0.2.0
  Downloading jax_jumpy-0.2.0-py3-none-any.whl (11 kB)
Collecting gymnasium-notices>=0.0.1
  Downloading gymnasium_notices-0.0.1-py3-none-any.whl (2.8 kB)
Installing collected packages: gymnasium-notices, jax-jumpy, gymnasium
Successfully installed gymnasium-0.27.1 gymnasium-notices-0.0.1 jax-jumpy-0.2.0
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting autorom[accept-rom-license]~=0.4.2
  Downloading AutoROM-0.4.2-py3-none-any.whl (16 kB)
Collecting AutoROM.accept-rom-license
  Downloading AutoROM.accept-rom-license-0.5.4.tar.gz (12 kB)
  Installing build dependencies ... done
  Getting r

In [None]:
import gymnasium as gym
from IPython.display import clear_output, HTML, display
import matplotlib.pyplot as plt
%matplotlib notebook

  logger.warn(f"Overriding environment {new_spec.id} already in registry.")


In [None]:
#@title Wrapper for recording an environment into a video

from __future__ import annotations

from copy import deepcopy
from typing import Any, SupportsFloat

from gymnasium.core import ActType, ObsType, RenderFrame, WrapperActType, WrapperObsType
from gymnasium.error import DependencyNotInstalled

class RecordVideo(gym.Wrapper):
    """Adapted from https://github.com/Farama-Foundation/Gymnasium/blob/main/gymnasium/experimental/wrappers/rendering.py#L87
    """

    def __init__(self, env):
        """Initialize a :class:`RecordVideo` instance.
        Args:
            env: The environment that is being wrapped
        """
        super().__init__(env)
        assert env.render_mode in [
            "rgb_array",
            "rgb_array_list",
        ], f"Expected env.render_mode to be one of 'rgb_array' or 'rgb_array_list' but got '{env.render_mode}'"

        if "render_fps" not in env.metadata:
            env.metadata["render_fps"] = 24

        assert (
            "render_fps" in env.metadata
        ), "The base environment must specify 'render_fps' to be used with the RecordVideo wrapper"

        if "human" not in self.metadata["render_modes"]:
            self.metadata = deepcopy(self.env.metadata)
            self.metadata["render_modes"].append("human")

        self.artists = []
        self.figure = None

    @property
    def render_mode(self):
        """Always returns ``'human'``."""
        return "human"

    def step(
        self, action: WrapperActType
    ) -> tuple[WrapperObsType, SupportsFloat, bool, bool, dict]:
        """Perform a step in the base environment and render a frame to the screen."""
        result = super().step(action)
        self._render_frame()
        return result

    def reset(
        self, *, seed: int | None = None, options: dict[str, Any] | None = None
    ) -> tuple[WrapperObsType, dict[str, Any]]:
        """Reset the base environment and render a frame to the screen."""
        result = super().reset(seed=seed, options=options)
        self._render_frame()
        return result

    def video(self):
        """This method renders all frames collected up to now."""
        if self.figure is not None:
            from IPython.display import HTML
            import matplotlib.animation

            animation = matplotlib.animation.ArtistAnimation(self.figure, self.artists,
                                                             interval=1000//self.metadata["render_fps"],
                                                             blit=True,
                                                             repeat=True,
                                                             repeat_delay=2000)
            return HTML(animation.to_html5_video())

        return None

    def _render_frame(self):
        """Fetch the last frame from the base environment and render it to the screen."""
        try:
            import matplotlib.animation
            import numpy as np
        except ImportError:
            raise DependencyNotInstalled(
                "matplotlib is not installed, run `pip install matplotlib`"
            )
        if self.env.render_mode == "rgb_array_list":
            rgb_arrays = self.env.render()
        elif self.env.render_mode == "rgb_array":
            rgb_arrays = [self.env.render()]
        else:
            raise Exception(
                f"Wrapped environment must have mode 'rgb_array' or 'rgb_array_list', actual render mode: {self.env.render_mode}"
            )

        assert isinstance(rgb_arrays, list)

        for rgb_array in rgb_arrays:
            assert isinstance(rgb_array, np.ndarray)

        if self.figure is None:
            self.figure = plt.figure()
            plt.axis('off')

        self.artists.append([plt.imshow(rgb_array) for rgb_array in rgb_arrays])

    def close(self):
        """Close the rendering window."""
        result = self.video()
        super().close()

        return result

## Toy text environments

In [None]:
env = RecordVideo(gym.make("FrozenLake-v1", render_mode="rgb_array"))
env.reset()

for i in range(100):
    action = env.action_space.sample()
    obs, reward, terminated, truncated, info = env.step(action)

    done = terminated or truncated
    if done:
        env.reset()

display(env.video())


## Classic control environments

In [None]:
env = RecordVideo(gym.make("CartPole-v1", render_mode="rgb_array"))
env.reset()

for i in range(100):
    action = env.action_space.sample()
    obs, reward, terminated, truncated, info = env.step(action)

    done = terminated or truncated
    if done:
        env.reset()

display(env.video())


## Atari environments

In [None]:
env = RecordVideo(gym.make("ALE/Breakout-v5", render_mode="rgb_array"))
env.reset()

for i in range(100):
    action = env.action_space.sample()
    obs, reward, terminated, truncated, info = env.step(action)

    done = terminated or truncated
    if done:
        env.reset()

display(env.video())


## 2D physics environments

In [None]:
env = RecordVideo(gym.make("LunarLander-v2", render_mode="rgb_array"))
env.reset()

for i in range(100):
    action = env.action_space.sample()
    obs, reward, terminated, truncated, info = env.step(action)

    done = terminated or truncated
    if done:
        env.reset()

display(env.video())


# The `gym` API

In [None]:
env = gym.make("FrozenLake-v1")

print("Action space: ", env.action_space)
print("Observation space: ", env.observation_space)

Action space:  Discrete(4)
Observation space:  Discrete(16)


### Exercise 1: Explore the attributes of the environment (e.g. slipperiness). What do they do?


In [None]:
env = RecordVideo(gym.make("FrozenLake-v1", render_mode="rgb_array", is_slippery=True))
env.reset()

for i in range(100):
    action = env.action_space.sample()
    obs, reward, terminated, truncated, info = env.step(action)

    done = terminated or truncated
    if done:
        env.reset()

display(env.video())


### Exercise 2: Modify other environment attributes (e.g. map size)

In [None]:
env = RecordVideo(gym.make("FrozenLake-v1", render_mode="rgb_array", map_name="8x8", is_slippery=True))
env.reset()

for i in range(100):
    action = env.action_space.sample()
    obs, reward, terminated, truncated, info = env.step(action)

    done = terminated or truncated
    if done:
        env.reset()

display(env.video())


### Exercise 3: Modify other environment attributes (e.g. map size without slipperiness)

In [None]:
env = RecordVideo(gym.make("FrozenLake-v1", render_mode="rgb_array", map_name="8x8", is_slippery=False))
env.reset()

for i in range(100):
    action = env.action_space.sample()
    obs, reward, terminated, truncated, info = env.step(action)

    done = terminated or truncated
    if done:
        env.reset()

display(env.video())


Now let's access the internal dynamics of the environment.

Reset the environment to the standard 4x4 FrozenLake.

`env.env.P` is a dictionary with the following structure:

```
{state: {action: [(probability, next_state, reward, is_next_state_terminal) for each possible action outcome]}}
```
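
To make this structure concrete, the sketch below unpacks the tuples of one state-action pair and checks that the outcome probabilities of every pair sum to 1. It runs on a hand-written two-state dictionary in the same format; `toy_P` and `check_probabilities` are illustrative names, not part of gymnasium.

```python
import math

# Hypothetical two-state dynamics in the same format as the FrozenLake dictionary:
# {state: {action: [(probability, next_state, reward, is_terminal), ...]}}
toy_P = {
    0: {0: [(1.0, 0, 0.0, False)],
        1: [(0.5, 0, 0.0, False), (0.5, 1, 1.0, True)]},
    1: {0: [(1.0, 1, 0.0, True)],
        1: [(1.0, 1, 0.0, True)]},
}

def check_probabilities(P):
    """Return True if the outcomes of every (state, action) pair sum to 1."""
    return all(
        math.isclose(sum(prob for prob, _, _, _ in outcomes), 1.0)
        for actions in P.values()
        for outcomes in actions.values()
    )

# Unpack each outcome tuple for state 0, action 1.
for prob, next_state, reward, is_terminal in toy_P[0][1]:
    print(f"p={prob}, next_state={next_state}, reward={reward}, terminal={is_terminal}")

print("All distributions sum to 1:", check_probabilities(toy_P))
```

The FrozenLake dictionary can be passed to `check_probabilities` in place of `toy_P`.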

### Exercise 4: Which are the terminal states of this environment?

In [None]:
env = RecordVideo(gym.make("FrozenLake-v1", render_mode="rgb_array", is_slippery=True))
env.reset()

for i in range(100):
    action = env.action_space.sample()
    obs, reward, terminated, truncated, info = env.step(action)

    done = terminated or truncated
    if done:
        env.reset()

display(env.video())


In [None]:
env = RecordVideo(gym.make("FrozenLake-v1", render_mode="rgb_array", is_slippery=True))
internal_dynamics = env.env.P
for state,properties in internal_dynamics.items():
  for action, result in properties.items():
    print(f'State : {state}, Action : {action}, Possible action outcome : {result}')

State : 0, Action : 0, Possible action outcome : [(0.3333333333333333, 0, 0.0, False), (0.3333333333333333, 0, 0.0, False), (0.3333333333333333, 4, 0.0, False)]
State : 0, Action : 1, Possible action outcome : [(0.3333333333333333, 0, 0.0, False), (0.3333333333333333, 4, 0.0, False), (0.3333333333333333, 1, 0.0, False)]
State : 0, Action : 2, Possible action outcome : [(0.3333333333333333, 4, 0.0, False), (0.3333333333333333, 1, 0.0, False), (0.3333333333333333, 0, 0.0, False)]
State : 0, Action : 3, Possible action outcome : [(0.3333333333333333, 1, 0.0, False), (0.3333333333333333, 0, 0.0, False), (0.3333333333333333, 0, 0.0, False)]
State : 1, Action : 0, Possible action outcome : [(0.3333333333333333, 1, 0.0, False), (0.3333333333333333, 0, 0.0, False), (0.3333333333333333, 5, 0.0, True)]
State : 1, Action : 1, Possible action outcome : [(0.3333333333333333, 0, 0.0, False), (0.3333333333333333, 5, 0.0, True), (0.3333333333333333, 2, 0.0, False)]
State : 1, Action : 2, Possible acti

In [None]:
env = RecordVideo(gym.make("FrozenLake-v1", render_mode="rgb_array", is_slippery=False))
internal_dynamics = env.env.P
for state,properties in internal_dynamics.items():
  for action, result in properties.items():
    print(f'State : {state}, Action : {action}, Possible action outcome : {result}')

State : 0, Action : 0, Possible action outcome : [(1.0, 0, 0.0, False)]
State : 0, Action : 1, Possible action outcome : [(1.0, 4, 0.0, False)]
State : 0, Action : 2, Possible action outcome : [(1.0, 1, 0.0, False)]
State : 0, Action : 3, Possible action outcome : [(1.0, 0, 0.0, False)]
State : 1, Action : 0, Possible action outcome : [(1.0, 0, 0.0, False)]
State : 1, Action : 1, Possible action outcome : [(1.0, 5, 0.0, True)]
State : 1, Action : 2, Possible action outcome : [(1.0, 2, 0.0, False)]
State : 1, Action : 3, Possible action outcome : [(1.0, 1, 0.0, False)]
State : 2, Action : 0, Possible action outcome : [(1.0, 1, 0.0, False)]
State : 2, Action : 1, Possible action outcome : [(1.0, 6, 0.0, False)]
State : 2, Action : 2, Possible action outcome : [(1.0, 3, 0.0, False)]
State : 2, Action : 3, Possible action outcome : [(1.0, 2, 0.0, False)]
State : 3, Action : 0, Possible action outcome : [(1.0, 2, 0.0, False)]
State : 3, Action : 1, Possible action outcome : [(1.0, 7, 0.0, T
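
The last element of each printed tuple flags whether the next state is terminal, so the terminal states can be collected programmatically instead of being read off the printout. A minimal sketch on a hypothetical deterministic 2x2 lake with only two actions (`toy_P` and `terminal_states` are illustrative names, not part of gymnasium):

```python
# Hypothetical 2x2 lake in the same format as the dynamics dictionary.
# Cells: 0 = start, 1 = frozen, 2 = hole, 3 = goal.
# Actions (for this toy example only): 0 = right, 1 = down.
toy_P = {
    0: {0: [(1.0, 1, 0.0, False)], 1: [(1.0, 2, 0.0, True)]},
    1: {0: [(1.0, 1, 0.0, False)], 1: [(1.0, 3, 1.0, True)]},
    2: {0: [(1.0, 2, 0.0, True)], 1: [(1.0, 2, 0.0, True)]},
    3: {0: [(1.0, 3, 0.0, True)], 1: [(1.0, 3, 0.0, True)]},
}

def terminal_states(P):
    """Collect every state that is flagged terminal when it is entered."""
    return sorted({
        next_state
        for actions in P.values()
        for outcomes in actions.values()
        for _, next_state, _, is_terminal in outcomes
        if is_terminal
    })

print(terminal_states(toy_P))  # [2, 3]
```

Calling `terminal_states(env.env.P)` on the 4x4 environment should list its holes and its goal in the same way.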

### Exercise 5: Show the reward for each state transition

In [None]:
import numpy as np

# Check the number of states
state = internal_dynamics.items()

# Create container of Reward
Reward = np.zeros((len(state),1))

# Change the last state reward
Reward[(len(state)-1),0] = 1

Reward

array([[0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [1.]])
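
The vector above attaches a reward to a state, whereas the tuples in the dynamics dictionary attach it to a transition. An alternative reading of the exercise is therefore the expected immediate reward of each state under a uniform random policy, $r(s) = \sum_a \pi(a \mid s) \sum_{s'} p(s' \mid s, a)\, r(s, a, s')$. The sketch below is one way to compute it, shown on a hypothetical 2x2 lake (`toy_P` and `expected_rewards` are illustrative names):

```python
# Hypothetical deterministic 2x2 lake: 0 = start, 1 = frozen, 2 = hole, 3 = goal.
# Format: {state: {action: [(probability, next_state, reward, is_terminal), ...]}}
toy_P = {
    0: {0: [(1.0, 1, 0.0, False)], 1: [(1.0, 2, 0.0, True)]},
    1: {0: [(1.0, 1, 0.0, False)], 1: [(1.0, 3, 1.0, True)]},
    2: {0: [(1.0, 2, 0.0, True)], 1: [(1.0, 2, 0.0, True)]},
    3: {0: [(1.0, 3, 0.0, True)], 1: [(1.0, 3, 0.0, True)]},
}

def expected_rewards(P):
    """Expected one-step reward per state when every action is equally likely."""
    rewards = []
    for state in sorted(P):
        actions = P[state]
        policy_prob = 1.0 / len(actions)  # uniform policy over available actions
        rewards.append(sum(
            policy_prob * prob * reward
            for outcomes in actions.values()
            for prob, _, reward, _ in outcomes
        ))
    return rewards

print(expected_rewards(toy_P))  # [0.0, 0.5, 0.0, 0.0]
```

With this convention the reward is earned in the state next to the goal rather than in the goal itself, so the two vectors lead to different value functions.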

### Exercise 6: Build the transition matrix assuming a uniform policy.

In [None]:
# Check the number of states and action
num_of_states = env.observation_space.n
num_of_action = env.action_space.n

# Create Probability Matrix container
P_matrix = np.zeros((num_of_states,num_of_states))

# Probability of taking one action
prob_taking_act = 1 / num_of_action

# Remember dict structure => state[0-15] : action[0-3] : [prob, next_state, reward, terminal_state]*every action outcome
# action outcome = result
# every action outcomes from action = internal_dynamics

# Do the looping
for i in range(num_of_states):
  # Check the state properties at grid i
  state = env.env.P[i] # From the docs
  for action in state:
    internal_dynamics = state[action] # Accessing the dict to get the action outcomes/result
    for result in internal_dynamics:
      prob = result[0]
      next_state = result[1]

      # Update the Probability Matrix
      P_matrix[i, next_state] += prob*prob_taking_act

print(P_matrix)

[[0.5  0.25 0.   0.   0.25 0.   0.   0.   0.   0.   0.   0.   0.   0.
  0.   0.  ]
 [0.25 0.25 0.25 0.   0.   0.25 0.   0.   0.   0.   0.   0.   0.   0.
  0.   0.  ]
 [0.   0.25 0.25 0.25 0.   0.   0.25 0.   0.   0.   0.   0.   0.   0.
  0.   0.  ]
 [0.   0.   0.25 0.5  0.   0.   0.   0.25 0.   0.   0.   0.   0.   0.
  0.   0.  ]
 [0.25 0.   0.   0.   0.25 0.25 0.   0.   0.25 0.   0.   0.   0.   0.
  0.   0.  ]
 [0.   0.   0.   0.   0.   1.   0.   0.   0.   0.   0.   0.   0.   0.
  0.   0.  ]
 [0.   0.   0.25 0.   0.   0.25 0.   0.25 0.   0.   0.25 0.   0.   0.
  0.   0.  ]
 [0.   0.   0.   0.   0.   0.   0.   1.   0.   0.   0.   0.   0.   0.
  0.   0.  ]
 [0.   0.   0.   0.   0.25 0.   0.   0.   0.25 0.25 0.   0.   0.25 0.
  0.   0.  ]
 [0.   0.   0.   0.   0.   0.25 0.   0.   0.25 0.   0.25 0.   0.   0.25
  0.   0.  ]
 [0.   0.   0.   0.   0.   0.   0.25 0.   0.   0.25 0.   0.25 0.   0.
  0.25 0.  ]
 [0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   0.   1.   0.   0.
  0.   0.  ]
 [

### Exercise 7: Solve the direct Bellman equation formulation.

In [None]:
# Check Slide 2 page 26

# Initiate gamma
gamma = 0.9

Value_Function = np.linalg.inv(np.identity(num_of_states) - (gamma * P_matrix)) @ Reward

Value_Function

array([[ 4.02953462e-02],
       [ 3.80021094e-02],
       [ 9.06008086e-02],
       [ 3.70639671e-02],
       [ 6.04976257e-02],
       [-1.08110653e-15],
       [ 2.37003375e-01],
       [ 0.00000000e+00],
       [ 1.68085365e-01],
       [ 5.18463074e-01],
       [ 9.62747525e-01],
       [ 0.00000000e+00],
       [ 4.55649547e-16],
       [ 1.17344744e+00],
       [ 3.52341144e+00],
       [ 1.00000000e+01]])
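
The cell above evaluates $V = (I - \gamma P)^{-1} R$ with an explicit matrix inverse. Solving the linear system $(I - \gamma P)\,V = R$ with `np.linalg.solve` gives the same vector without forming the inverse, which is usually the numerically safer route. A minimal sketch on a hypothetical two-state chain (`P_toy` and `R_toy` are made-up values, not taken from FrozenLake):

```python
import numpy as np

gamma = 0.9

# Hypothetical chain: state 0 moves to state 1 half of the time,
# state 1 is absorbing and yields a reward of 1 at every step.
P_toy = np.array([[0.5, 0.5],
                  [0.0, 1.0]])
R_toy = np.array([0.0, 1.0])

A = np.identity(len(R_toy)) - gamma * P_toy

V_solve = np.linalg.solve(A, R_toy)   # solves A @ V = R directly
V_inv = np.linalg.inv(A) @ R_toy      # explicit inverse, as in the cell above

print("V (solve):", V_solve)
print("Same as inverse:", np.allclose(V_solve, V_inv))

# The solution must satisfy the Bellman equation V = R + gamma * P @ V.
print("Bellman check:", np.allclose(V_solve, R_toy + gamma * P_toy @ V_solve))
```

The absorbing state has value $1 / (1 - \gamma) = 10$, the same mechanism that produces the value 10 for state 15 in the output above.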

### Exercise 8: Move the agent to an arbitrary position

The state is stored in `env.env.s`



### Exercise 9: Print the information provided by each state. How do the probabilities change between the two slippery options?