[Gymnasium](https://github.com/Farama-Foundation/Gymnasium) is the most widely used library for RL environments. It is the maintained successor of the abandoned [OpenAI Gym](https://github.com/openai/gym), it is compatible with a large number of other simulators (e.g., [ALE](https://github.com/Farama-Foundation/Arcade-Learning-Environment) for Atari games and [MuJoCo](https://github.com/google-deepmind/mujoco)), and it is the library we will use in this course. 

The only things you need to know for this course are how to make environments and how to run actions to collect data. Let's look at the code below.

In [1]:
import gymnasium

env = gymnasium.make("Pendulum-v1")
s, info = env.reset(seed=42)
a = env.action_space.sample()
s_next, r, terminated, truncated, info = env.step(a)

`gymnasium.make(env_id)` makes an instance of the environment. In this case, we are making the [`Pendulum-v1` environment](https://gymnasium.farama.org/environments/classic_control/pendulum/), a simple toy problem with continuous states and actions. 
You can pass optional arguments to `make`, depending on the environment. The most important ones, supported by almost all environments, are:
- `render_mode`, to either render the environment to visualize it (`render_mode="human"`) or to learn from pixels (`render_mode="rgb_array"`).
- `max_episode_steps`, to set the maximum number of steps per episode. More on this later.

Before doing anything else, we must always call `env.reset()`. This resets the state of the simulator, making it ready to run a new episode. The `seed` argument is optional, but it ensures reproducibility of your experiment as the environment may have some stochasticity (e.g., the initial state is usually random).  
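Note that `reset(seed=...)` only seeds the environment's own random generator, `env.np_random`. The spaces have separate generators, so if you draw actions with `env.action_space.sample()` and want them to be reproducible, you must also seed the space explicitly, e.g., `env.action_space.seed(seed)`.  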
**Note!** You either want to set the seed at every reset or only at the first reset. In the former case, you must be sure to set it always to a different seed, or you'll keep running the same episode (e.g., the initial state will always be the same). More on this later.
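
To make the three seeding options concrete, here is a minimal sketch that mimics an environment whose initial state is drawn from a seeded generator. It does not use Gymnasium: `fake_reset` is a hypothetical stand-in for `env.reset()`, built on a plain NumPy generator, and the 3-dimensional uniform initial state is an assumption made only for illustration.

```python
import numpy as np

def fake_reset(rng, seed=None):
    """Hypothetical stand-in for env.reset(): re-seed if asked, then draw an initial state."""
    if seed is not None:
        rng = np.random.default_rng(seed)
    return rng, rng.uniform(-1.0, 1.0, size=3)

rng = np.random.default_rng()

# Same seed at every reset: every episode starts from the same state.
rng, s0 = fake_reset(rng, seed=42)
rng, s1 = fake_reset(rng, seed=42)
same_seed_repeats = bool(np.allclose(s0, s1))

# Seed only at the first reset: later episodes continue the same random stream.
rng, s0 = fake_reset(rng, seed=42)
rng, s1 = fake_reset(rng)
first_only_differs = not np.allclose(s0, s1)

# A different seed at every reset: reproducible, and episodes still differ.
rng, s0 = fake_reset(rng, seed=1)
rng, s1 = fake_reset(rng, seed=2)
distinct_seeds_differ = not np.allclose(s0, s1)

print(same_seed_repeats, first_only_differs, distinct_seeds_differ)
```

All three flags should come out `True`: only the first option produces identical episodes, which is exactly the pitfall described in the note.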

Two of the most important attributes of the environment are `env.observation_space` and `env.action_space`. 
- If a space is continuous, it will be an instance of `gymnasium.spaces.Box` and it will have the attributes `.low` and `.high` to denote its lower/upper bounds.
- If a space is discrete, it will be an instance of `gymnasium.spaces.Discrete` and it will have the attribute `.n` to denote the finite number of states/actions in the space.

Spaces also have a `sample()` method that draws a random state/action uniformly from the space. 

For example, `Pendulum-v1` has continuous state and action spaces, while `Acrobot-v1` has discrete actions.

In [13]:
import gymnasium

env = gymnasium.make("Pendulum-v1")
print("Pendulum")
print(env.observation_space.low, env.observation_space.high)
print(env.action_space.low, env.action_space.high)
print(env.action_space.sample())

print()

print("Acrobot")
env = gymnasium.make("Acrobot-v1")
print(env.observation_space.low, env.observation_space.high)
print(env.action_space.n)
print(env.action_space.sample())

Pendulum
[-1. -1. -8.] [1. 1. 8.]
[-2.] [2.]
[-0.24875921]

Acrobot
[ -1.        -1.        -1.        -1.       -12.566371 -28.274334] [ 1.        1.        1.        1.       12.566371 28.274334]
3
1


**Note!** Continuous actions/states are **always** NumPy arrays (`np.ndarray`), i.e., they have shape `(n,)` where `n` is the dimensionality of the state/action.  
In contrast, discrete states/actions are **always** integers, i.e., they have no shape.

Let's continue with the most important environment method: `step(action)`. This performs an agent-environment interaction and changes the internal state of the simulator. In turn, it returns:
- `s_next`, the next state,
- `r`, the reward,
- `terminated`, `True` if the agent has reached a terminal (absorbing) state,
- `truncated`, `True` if the agent has performed `max_episode_steps` in the environment.

`info` is a dictionary (also returned by `reset()`) with any additional information that could be useful to the agent, to log important metrics, or just to debug. Usually it is empty, but **it is always returned**.

**Note!** The state returned by the simulator is actually an **observation** of the state, i.e., it may not have all information to solve the task, or it may have redundant information (e.g., pixel observations instead of just coordinates). This is why, often, you'll find `s` and `s_next` named `obs` and `obs_next`, respectively.

**Note!** The flag `terminated` is extremely important when we use TD methods, because it tells us when we have to stop bootstrapping. The flag `truncated`, instead, is used to know when to reset the environment and to distinguish episodes (e.g., if we use Monte Carlo methods).  
For example, if `terminated == True`, the TD error of the transition is just $r - V(s)$ rather than $r + \gamma V(s_\text{next}) - V(s)$. If only `truncated == True`, instead, we still bootstrap from $V(s_\text{next})$ as usual.
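
The role of the two flags can be sketched by computing the bootstrapped target $r + \gamma V(s_\text{next})$ for a small batch of transitions. This is only an illustration: the function name `td_target`, the value estimates, and `gamma = 0.5` are made up for the example.

```python
import numpy as np

def td_target(r, v_next, terminated, gamma):
    """Bootstrapped target r + gamma * V(s_next), with bootstrapping disabled on terminal steps."""
    r = np.asarray(r, dtype=float)
    v_next = np.asarray(v_next, dtype=float)
    not_terminal = 1.0 - np.asarray(terminated, dtype=float)
    return r + gamma * not_terminal * v_next

r = [1.0, 1.0, 1.0]
v_next = [10.0, 10.0, 10.0]          # made-up value estimates of the next states
terminated = [False, True, False]
truncated = [False, False, True]     # deliberately unused: truncation does not stop bootstrapping

targets = td_target(r, v_next, terminated, gamma=0.5)
print(targets)  # [6. 1. 6.]
```

Only the second transition (terminated) drops the bootstrap term. The third one (truncated) has the same target as the first, although the episode ends there too.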

You now know the basics to simulate environment interactions. Here is a simple loop that runs a random-policy agent for a few episodes and collects data in lists. At the end, we stack the data into a NumPy array.

To ensure reproducibility, we set the episode seed based on a unique hashing function depending on two integers:
- A fixed seed that usually denotes the experiment seed. For example, if we run this code 10 times, we'd set `seed = 1, 2, ..., 10`.
- The episode counter.

Below, we use [Cantor pairing](https://en.wikipedia.org/wiki/Pairing_function), but any unique hashing function is fine.
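
As a quick sanity check of why a pairing function is preferable to something simpler, the sketch below compares Cantor pairing with the naive choice `seed + ep`. It uses an integer-arithmetic variant of the pairing function, and the ranges (10 experiment seeds, 100 episodes each) are arbitrary values chosen for the example.

```python
def cantor_pairing(x, y):
    """Map a pair of non-negative integers to a unique non-negative integer."""
    return (x + y) * (x + y + 1) // 2 + y

experiment_seeds = range(1, 11)  # 10 runs of the experiment
episodes = range(100)            # 100 episodes per run

paired = {cantor_pairing(ep, seed) for seed in experiment_seeds for ep in episodes}
naive = {seed + ep for seed in experiment_seeds for ep in episodes}

print(len(paired))  # 1000: every (episode, seed) pair gets its own seed
print(len(naive))   # 109: many pairs collide, e.g. (ep=1, seed=1) and (ep=0, seed=2)
```

With the naive sum, episode 1 of run 1 would replay episode 0 of run 2, so the runs would not be independent.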

In [18]:
import numpy as np
import gymnasium

def cantor_pairing(x, y):
    # Integer arithmetic keeps the result exact even for large inputs
    return (x + y) * (x + y + 1) // 2 + y
    
env = gymnasium.make("Pendulum-v1")
n_episodes = 10
seed = 1

data = dict()
data["s"] = []
data["a"] = []
data["r"] = []
data["s_next"] = []
data["terminated"] = []
data["truncated"] = []

for ep in range(n_episodes):
    episode_seed = cantor_pairing(ep, seed)
    s, _ = env.reset(seed=episode_seed)
    done = False

    while not done:
        a = env.action_space.sample()
        s_next, r, terminated, truncated, _ = env.step(a)
        done = terminated or truncated

        data["s"].append(s)
        data["a"].append(a)
        data["r"].append(r)
        data["s_next"].append(s_next)
        data["terminated"].append(terminated)
        data["truncated"].append(truncated)

        s = s_next

s = np.vstack(data["s"])
print(env.observation_space.shape, env._max_episode_steps, s.shape)  # 10 episodes of 200 steps each

(3,) 200 (2000, 3)


**Note!** We must update the current state with `s = s_next` to tell the agent that the environment has changed. But we must do it **after** we store `s`!

This loop can be used to collect data based on **episodes**, i.e., it's good for Monte Carlo methods. 
It is inefficient, though, because lists grow dynamically as we append to them. This cannot really be avoided if we do not know the number of steps beforehand (`Pendulum-v1` is an infinite-horizon MDP with a 200-step limit, so every episode has the same length, but many other MDPs are different). 

If we collect data based on **steps**, however, we can pre-allocate memory with NumPy arrays. The idea is to initialize arrays of zeros and fill them in as we collect data, using an index/counter to keep track of how many samples we have collected.

In [19]:
import numpy as np
import gymnasium

def cantor_pairing(x, y):
    # Integer arithmetic keeps the result exact even for large inputs
    return (x + y) * (x + y + 1) // 2 + y
    
env = gymnasium.make("Pendulum-v1")
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
n_steps = 1876
seed = 1

data = dict()
data["s"] = np.zeros((n_steps, state_dim))
data["a"] = np.zeros((n_steps, action_dim))
data["r"] = np.zeros((n_steps,))
data["s_next"] = np.zeros((n_steps, state_dim))
data["terminated"] = np.zeros((n_steps,))
data["truncated"] = np.zeros((n_steps,))

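idx_data = 0
ep = 0  # episode counter, used only to derive the per-episode seed
while idx_data < n_steps:  # start new episodes until the buffer is full
    episode_seed = cantor_pairing(ep, seed)
    s, _ = env.reset(seed=episode_seed)
    ep += 1
    done = False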

    while not done and idx_data < n_steps:
        a = env.action_space.sample()
        s_next, r, terminated, truncated, _ = env.step(a)
        done = terminated or truncated

        data["s"][idx_data] = s
        data["a"][idx_data] = a
        data["r"][idx_data] = r
        data["s_next"][idx_data] = s_next
        data["terminated"][idx_data] = terminated
        data["truncated"][idx_data] = truncated

        idx_data += 1
        s = s_next

    if idx_data == n_steps:
        break

print(env.observation_space.shape, env._max_episode_steps, data["s"].shape)

(3,) 200 (1876, 3)
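
Step-based buffers are flat, but episode boundaries can still be recovered from the `terminated` and `truncated` flags, e.g., to compute Monte Carlo returns. The following is a minimal sketch on a tiny hand-made buffer: the helper name `episode_slices` and the toy rewards are made up for the example, and the last episode is assumed to be cut short by the step budget, as happens above when `n_steps` is not a multiple of the episode length.

```python
import numpy as np

def episode_slices(terminated, truncated):
    """Return (start, end) index pairs, end exclusive, one per episode in a flat buffer."""
    done = np.logical_or(np.asarray(terminated, dtype=bool),
                         np.asarray(truncated, dtype=bool))
    ends = np.flatnonzero(done) + 1
    if len(done) > 0 and (len(ends) == 0 or ends[-1] != len(done)):
        ends = np.append(ends, len(done))  # last episode was cut by the step budget
    starts = np.concatenate(([0], ends[:-1]))
    return list(zip(starts.tolist(), ends.tolist()))

# Toy buffer of 7 steps: the flags are stored as floats, like in the loop above.
rewards = np.array([1.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0])
terminated = np.array([0, 0, 0, 0, 1, 0, 0], dtype=float)
truncated = np.array([0, 0, 1, 0, 0, 0, 0], dtype=float)

slices = episode_slices(terminated, truncated)
episode_returns = [float(rewards[a:b].sum()) for a, b in slices]

print(slices)           # [(0, 3), (3, 5), (5, 7)]
print(episode_returns)  # [3.0, 4.0, 6.0]
```

Keep in mind that the return of the last, incomplete episode is only a partial sum.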


**Note!** If the action space is discrete, actions are plain integers, so we don't need to check their dimensionality and we can just initialize the data as follows. 

In [20]:
import numpy as np
import gymnasium

def cantor_pairing(x, y):
    # Integer arithmetic keeps the result exact even for large inputs
    return (x + y) * (x + y + 1) // 2 + y
    
env = gymnasium.make("Acrobot-v1")
state_dim = env.observation_space.shape[0]
n_steps = 1876
seed = 1

data = dict()
data["s"] = np.zeros((n_steps, state_dim))
data["a"] = np.zeros((n_steps,))  # THIS IS JUST A 1D ARRAY!
data["r"] = np.zeros((n_steps,))
data["s_next"] = np.zeros((n_steps, state_dim))
data["terminated"] = np.zeros((n_steps,))
data["truncated"] = np.zeros((n_steps,))

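idx_data = 0
ep = 0  # episode counter, used only to derive the per-episode seed
while idx_data < n_steps:  # start new episodes until the buffer is full
    episode_seed = cantor_pairing(ep, seed)
    s, _ = env.reset(seed=episode_seed)
    ep += 1
    done = False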

    while not done and idx_data < n_steps:
        a = env.action_space.sample()
        s_next, r, terminated, truncated, _ = env.step(a)
        done = terminated or truncated

        data["s"][idx_data] = s
        data["a"][idx_data] = a
        data["r"][idx_data] = r
        data["s_next"][idx_data] = s_next
        data["terminated"][idx_data] = terminated
        data["truncated"][idx_data] = truncated

        idx_data += 1
        s = s_next

    if idx_data == n_steps:
        break

print(env.observation_space.shape, env._max_episode_steps, data["s"].shape)

(6,) 500 (1876, 6)
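
The continuous and discrete cases can also be handled by one allocation rule, by appending the shape of the space to the number of steps. The sketch below assumes, as in Gymnasium, that a `Box` space reports a shape like `(n,)` and a `Discrete` space reports the empty shape `()`. To stay self-contained it passes these shapes as plain tuples, and the helper name `allocate` is hypothetical.

```python
import numpy as np

def allocate(n_steps, space_shape):
    """Pre-allocate a buffer with one row per step and trailing dims given by space_shape."""
    return np.zeros((n_steps, *space_shape))

box_like_shape = (1,)     # assumed shape of a 1-D continuous action space (as in Pendulum-v1)
discrete_like_shape = ()  # assumed shape of a discrete action space (as in Acrobot-v1)

continuous_actions = allocate(1876, box_like_shape)
discrete_actions = allocate(1876, discrete_like_shape)

print(continuous_actions.shape)  # (1876, 1)
print(discrete_actions.shape)    # (1876,)
```

With a real environment, the call would presumably look like `allocate(n_steps, env.action_space.shape)`, and likewise for `env.observation_space.shape`.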


This covers the basics of Gymnasium and data collection for RL.  
You can have separate functions for data collection, or have it interleaved with the training of the agent. An example of the latter would be the code below.

In [25]:
def generic_training(**kwargs):
    max_steps = 10000  # max training steps
    n_samples = 100  # number of samples to collect before an update
    episode = 0  # use it to keep track of the episode and set the seed
    tot_steps = 0  # to know when to stop training
    idx_data = 0  # to know where to store data
    seed = 1  # seed of the experiment
    log_frequency = 1000  # log every this many steps (illustrative value)

    data = dict()
    data["s"] = np.zeros((n_samples, state_dim))
    data["a"] = np.zeros((n_samples, action_dim))
    data["r"] = np.zeros((n_samples,))
    data["s_next"] = np.zeros((n_samples, state_dim))
    data["terminated"] = np.zeros((n_samples,))
    data["truncated"] = np.zeros((n_samples,))

    while tot_steps < max_steps:
        episode_seed = cantor_pairing(episode, seed)
        s, _ = env.reset(seed=episode_seed)
        done = False
        episode += 1

        while not done and tot_steps < max_steps:
            a = env.action_space.sample()  # have your policy here
            s_next, r, terminated, truncated, _ = env.step(a)
            done = terminated or truncated

            # store data
            data["s"][idx_data] = s
            data["a"][idx_data] = a
            data["r"][idx_data] = r
            data["s_next"][idx_data] = s_next
            data["terminated"][idx_data] = terminated
            data["truncated"][idx_data] = truncated
            idx_data += 1

            if idx_data == n_samples:
                idx_data = 0  # reset index, so next time you will overwrite old data
                pass # do your update

            if tot_steps % log_frequency == 0:
                pass # log whatever you want and/or print info

            s = s_next
            tot_steps += 1
            pass # do whatever else is needed, eg, decay exploration