# AI4R Workshop 3: Gymnasium and Reinforcement Learning Training

## Activity 1: Setting up the Environment

### Dependencies
- We will be using `gymnasium` framework as an interface to represent general RL Problems. Dependencies for installation: `swig`
- The example for demonstration is the `Lunar Lander` which requires additional dependencies (`Box2D`)
- Deep Learning Training Library `stable_baselines3`

### Next Steps
- [ ] Press `Connect` in the top-right corner of the notebook to connect to a Python Kernel (running on a remote server hosted by Google).
- [ ] To run each cell, place the cursor on the cell and press: `shift` + `enter/return`. An easier option is to just
- Cells should be marked 'square with a spinning circle' while running, which will go away once the cell execution is complete, and cell outputs will be displayed below (if applicable)
- It might take some time for the dependency installation, please be patient and let it continue.

In [None]:
!pip install swig

In [None]:
!pip install gymnasium[box2D]

In [None]:
!pip install stable_baselines3

### Library Imports

Let's import the necessary libraries for our activity

In [None]:
import gymnasium as gym
from stable_baselines3 import PPO, DDPG, SAC, TD3

import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation

from IPython.display import HTML

Supress warnings:

In [None]:
import warnings

warnings.filterwarnings("ignore", category=DeprecationWarning, module="ipykernel.ipkernel")

## Activity 2: Getting Familiar with Gymnasium Environments

> Gymnasium is an API standard for Reinforcement Learning (RL) with an interface that is simple, pythonic, and capable of representing general RL problems.

Key functions:
- `make()`: Create the environment
- `reset()`: Reset the environment to the 'initial state'
- `step()`: Compute the transition kernel, as the environment takes a step with a current action
- `render()`: Render the environment for visualisation

Key details:
- `observation_space` : defines the observation space
- `action_space`: defines the action space

### Let's define our Gymnasium Environment [Lunar Lander](https://gymnasium.farama.org/environments/box2d/lunar_lander/)

the `make()` function creates the `LunarLander-v2` environment (built-in environment) in `continuous` mode. Other arguments set details such as `gravity`, `wind`, `wind_power`, `turbulence`, `render_mode`, etc.  

Other Details:
> If `enable_wind=True` is passed, there will be wind effects applied to the lander. The wind is generated using the function `tanh(sin(2 k (t+C)) + sin(pi k (t+C)))`. k is set to 0.01. C is sampled randomly between -9999 and 9999.

> `wind_power` dictates the maximum magnitude of linear wind applied to the craft. The recommended value for `wind_power` is between 0.0 and 20.0. `turbulence_power` dictates the maximum magnitude of rotational wind applied to the craft. The recommended value for `turbulence_power` is between 0.0 and 2.0.

In [None]:
env = gym.make(
    "LunarLander-v2",
    continuous=True,
    gravity=-10.0,
    enable_wind=False,
    wind_power=15.0,
    turbulence_power=1.5,
    render_mode = "rgb_array"
)

### Observations:
The state is an 8-dimensional vector: the coordinates of the lander in `x` & `y`, its linear velocities in `x` & `y`, its `angle`, its `angular velocity`, and `two booleans` that represent whether each leg is in contact with the ground or not. Let's take a look at the observation space for the Lunar Lander environment:

In [None]:
# Print the observation space
print("Observation Space:", env.observation_space)
print("Shape:", env.observation_space.shape)
print("Sample Observation:", env.observation_space.sample())

### Initial State

Let's **`reset()`** the environment to it's initial state: (`obs` is the observation corresponding to the initial state)

In [None]:
obs, info = env.reset()
print(obs)

### Render the environment at Initial State:

In [None]:
def plot_current_frame(env):
    frame = env.render()    # calls the `render()` function to render the frame
    plt.imshow(frame)
    plt.axis('off')
    plt.show()

plot_current_frame(env)

### Action Space

- If `continuous=True` is passed, continuous actions (corresponding to the throttle of the engines) will be used and the action space will be `Box(-1, +1, (2,), dtype=np.float32)`.
- The first coordinate of an action determines the throttle of the main engine, while the second coordinate specifies the throttle of the lateral boosters.
    - Given an action `np.array([main, lateral])`
        - the main engine will be turned off completely if main < 0
        - the throttle scales affinely from 50% to 100% for `0 <= main <= 1` (in particular, the main engine doesn’t work with less than 50% power).
    - Similarly
        - if `-0.5 < lateral < 0.5`, the lateral boosters will not fire at all.
        - If lateral < -0.5, the left booster will fire, and
        - if lateral > 0.5, the right booster will fire.
    - Again, the throttle scales affinely from 50% to 100% between -1 and -0.5 (and 0.5 and 1, respectively).

In [None]:
# Print the action space
print("Action Space:", env.action_space)
print("Shape:", env.action_space.shape)
print("Low:", env.action_space.low)
print("High:", env.action_space.high)
print("Sample Action:", env.action_space.sample())

### Reward Function / `Step()`:

After every step a reward is granted. The total reward of an episode is the sum of the rewards for all the steps within that episode.

For each step, the reward:

- is increased/decreased the closer/further the lander is to the landing pad.
- is increased/decreased the slower/faster the lander is moving.
- is decreased the more the lander is tilted (angle not horizontal).
- is increased by 10 points for each leg that is in contact with the ground.
- is decreased by 0.03 points each frame a side engine is firing.
- is decreased by 0.3 points each frame the main engine is firing.
- The episode receive an additional reward of -100 or +100 points for crashing or landing safely respectively.

An episode is considered a solution if it scores at least 200 points.

If you are interested you can take a look at how the rewards of the Lunar Lander are calculated inside the [Lunar Lander Class](https://github.com/Farama-Foundation/Gymnasium/blob/main/gymnasium/envs/box2d/lunar_lander.py#L638-L662)

Let's sample a random action from the action space and take a `step()` within the environment:

In [None]:
# Get a random action from the action space:
action = env.action_space.sample()
print("Random action: ", action)

# Take a step with the random action:
obs, reward, done, terminate, info = env.step(action)

print("Observation:", obs)
print("Reward:", reward)
print("Done:", done)
print("Terminate:", terminate)
print("Info:", info)

Let's render the environment after the Random Action:

In [None]:
plot_current_frame(env)

Let's create a function that takes a random step:

In [None]:
def take_random_step(env):
    action = env.action_space.sample()
    print("Action: {}".format(action))
    obs, reward, done, truncate, info = env.step(action)

In [None]:
take_random_step(env)
plot_current_frame(env)

## Activity 3: Training a Vanilla RL Policy

There are [different policies](https://stable-baselines3.readthedocs.io/en/master/guide/algos.html) to choose from.

### RL Algorithms

This table displays the RL algorithms that are implemented in the Stable Baselines3 project, along with some useful characteristics: support for discrete/continuous actions, multiprocessing.

| Name           | Box | Discrete | MultiDiscrete | MultiBinary | Multi Processing |
|----------------|-----|----------|---------------|-------------|------------------|
| ARS            | ✔️   | ✔️        | ❌             | ❌           | ✔️                |
| A2C            | ✔️   | ✔️        | ✔️             | ✔️           | ✔️                |
| DDPG           | ✔️   | ❌        | ❌             | ❌           | ✔️                |
| DQN            | ❌   | ✔️        | ❌             | ❌           | ✔️                |
| HER            | ✔️   | ❌        | ❌             | ❌           | ✔️                |
| PPO            | ✔️   | ✔️        | ✔️             | ✔️           | ✔️                |
| QR-DQN         | ❌   | ✔️        | ❌             | ❌           | ✔️                |
| RecurrentPPO   | ✔️   | ✔️        | ✔️             | ✔️           | ✔️                |
| SAC            | ✔️   | ❌        | ❌             | ❌           | ✔️                |
| TD3            | ✔️   | ❌        | ❌             | ❌           | ✔️                |
| TQC            | ✔️   | ❌        | ❌             | ❌           | ✔️                |
| TRPO           | ✔️   | ✔️        | ✔️             | ✔️           | ✔️                |
| Maskable PPO   | ✔️   | ✔️        | ✔️             | ✔️           | ✔️                |

Since our action space is `continuous` and in `Box` mode, our choice of algorithms can be found from this table above. A2C, DDPG, PPO, SAV, TD3, etc. are some algorithms that we can try.

### Define the Policy

Let's try training a simple policy-based approach to begin with (PPO). First, we need to define the policy:

In [None]:
model = PPO("MlpPolicy", env, verbose = 1, learning_rate = 0.0003, n_steps = 2048)

This line of code initialises a Proximal Policy Optimisation (PPO) reinforcement learning model using the `MlpPolicy` which is using an MLP (Multi-Layer Perceptron), which is a neural network as the policy network architecture.

`verbose = 1` controls the level of verbosity (output information)
`learning_rate` sets the learning rate of the network during training.
`n_steps = 2048` defines the number of steps the agent will take in the environment before updating the policy. This is also known as the number of steps per epoch.

### Training the Policy

Now, let's train the policy for a total of 100,000 timesteps. With an `n_steps = 2048` that would be at least 49 iterations. Observe that the mean episode length and reward increase with training.

In [None]:
model.learn(total_timesteps = 100000)

### Loading/Saving the policy

Saving the trained model

In [None]:
model.save("ppo_lunarlander")

Loading saved model

In [None]:
model = PPO.load("ppo_lunarlander", env = env)

### Evaluating the Policy

Let's evaluate the model and observe the rewards for a single episode:

In [None]:
# Reset the environment to it's initial state
obs, info = env.reset()

# max_steps to run simulation for
max_steps = 1000
rewards = []

for i in range(max_steps):
    action, _ = model.predict(obs, deterministic = True)
    obs, reward, terminate, truncate, info = env.step(action)
    rewards.append(reward)
    if terminate or truncate:
        break

print(f"Total Reward:{sum(rewards):.2f}")
print("Total Number of steps: ", i)
print("Terminated:", terminate)

Let's wrap this code into a function for future use:

In [None]:
def evaluate_episode(env, model, max_steps=1000):
    obs, info = env.reset()
    rewards = []

    for i in range(max_steps):
        action, _ = model.predict(obs, deterministic=True)
        obs, reward, terminate, truncate, info = env.step(action)
        rewards.append(reward)
        if terminate or truncate:
            break

    total_reward = sum(rewards)
    terminated = terminate

    print(f"Total Reward: {total_reward:.2f}")
    print("Total Number of steps:", i)
    print("Terminated:", terminated)

    return total_reward, i, terminated

Let's plot the rewards over timesteps:

In [None]:
# Plotting rewards over timesteps
plt.figure(figsize=(10, 6))
plt.plot(rewards, marker='', linestyle='-', color='b')
plt.title('Rewards Over Timesteps')
plt.xlabel('Timestep')
plt.ylabel('Reward')
plt.grid(True)
plt.show()

### Visualising the Episode

Function to Simulate and Render Episode Frames

In [None]:
# Function to Simulate and Render Episode Frames
def simulate_and_render(env, model, max_steps = 1000):
    # Reset environment to initial state
    obs, info = env.reset()
    frames = []
    terminate = truncate = False
    for _ in range(max_steps):
        action, _ = model.predict(obs, deterministic=True)
        obs, reward, terminate, truncate, info = env.step(action)
        frame = env.render()
        frames.append(frame)
        if terminate or truncate:
            break
    return frames

Function to Animate Frames to Display in Notebook

In [None]:
# Function to Animate Frames to Display in Notebook
def animate_episode_frames(frames, interval=50):
    fig, ax = plt.subplots()
    patch = ax.imshow(frames[0])
    plt.axis('off')
    plt.close()

    def animate(i):
        patch.set_data(frames[i])

    ani = animation.FuncAnimation(fig, animate, frames=len(frames), interval=interval)

    return HTML(ani.to_jshtml())

Putting it together:

In [None]:
frames = simulate_and_render(env, model)
animate_episode_frames(frames)

*The cell above might be responsible for increasing the notebook size. If that becomes an issue, you can just clear the output of the cell*

## Activity 4: Customising the Gym Environment (Observations, Actions, Rewards)

### Custom Observations

Gym supports the use of `ObservationWrapper` to modify the observation space and create Custom Observations.

Observation wrappers are useful if you want to apply some function to the observations that are returned by an environment. If you implement an observation wrapper, you only need to define this transformation by implementing the `gymnasium.ObservationWrapper.observation()` method. Moreover, you should remember to update the observation space, if the transformation changes the shape of observations (e.g. by transforming dictionaries into numpy arrays, as in the following example).

The following values are available to us -
- observation scale factors
- lander mass
- value of g

Let's create a custom observation to demonstrate this. The custom observation we choose to add is the Potential Energy of the Lander.

$Potential Energy = m * g * h$

In [None]:
scale_factors = np.array([10, 6.666, 5, 7.5, 1, 2.5, 1, 1], dtype=np.float32)
lander_mass = 4.82 # kg
g = 10.0 # m/s^2

Defining the custom observation wrapper

In [None]:
class CustomObservationWrapper(gym.ObservationWrapper):
    def __init__(self, env):
        super(CustomObservationWrapper, self).__init__(env)
        # Get the old observation space
        old_space = env.observation_space

        # Define new observation space bounds
        low = np.concatenate([old_space.low, [0]])
        high = np.concatenate([old_space.high, [np.inf]])
        # Define new observation space
        self.observation_space = gym.spaces.Box(
            low=low, high=high, dtype=np.float32
        )

    def observation(self, obs):
        # Extract and scale original observations
        x, y, vx, vy, theta, omega, leg1, leg2 = obs * scale_factors

        # Calculate potential energy
        PE = lander_mass * g * y

        return np.append(obs, PE)

How to wrap the environment with the custom observation wrapper:

In [None]:
# env = CustomObservationWrapper(env)

#### Exercise

Modify the `CustomObservationWrapper` to include two additional observations:

- Kinetic Energy Along x-axis
- Kinetic Energy Along y-axis

*Hints:*
- The velocity along x and y axis is available in the original observations
- $Kinetic Energy = 1/2 * mass * (velocity)^2$

In [None]:
class CustomObservationWrapper(gym.ObservationWrapper):
    def __init__(self, env):
        super(CustomObservationWrapper, self).__init__(env)
        # Get the old observation space
        old_space = env.observation_space

        # Define new observation space bounds
        low = np.concatenate([old_space.low, [0]])          # Modify HERE
        high = np.concatenate([old_space.high, [np.inf]])   # Modify HERE
        # Define new observation space
        self.observation_space = gym.spaces.Box(
            low=low, high=high, dtype=np.float32
        )

    def observation(self, obs):
        # Extract and scale original observations
        x, y, vx, vy, theta, omega, leg1, leg2 = obs * scale_factors

        # Calculate potential energy
        PE = lander_mass * g * y

        # Calculate Kinetic Energies HERE

        return np.append(obs, PE)

In [None]:
# env = CustomObservationWrapper(env)

### Custom Actions

*Do you remember the action space?*

### Action Space

- If `continuous=True` is passed, continuous actions (corresponding to the throttle of the engines) will be used and the action space will be `Box(-1, +1, (2,), dtype=np.float32)`.
- The first coordinate of an action determines the throttle of the main engine, while the second coordinate specifies the throttle of the lateral boosters.
    - Given an action `np.array([main, lateral])`
        - the main engine will be turned off completely if main < 0
        - the throttle scales affinely from 50% to 100% for `0 <= main <= 1` (in particular, the main engine doesn’t work with less than 50% power).
    - Similarly
        - if `-0.5 < lateral < 0.5`, the lateral boosters will not fire at all.
        - If lateral < -0.5, the left booster will fire, and
        - if lateral > 0.5, the right booster will fire.
    - Again, the throttle scales affinely from 50% to 100% between -1 and -0.5 (and 0.5 and 1, respectively).

**Let's Simplify The Action Space!**

- Define 3 Action variables corresponding to the 3 boosters - values will range from 0 to 1
- Modified throttle control:
    - throttle 0 means idle
    - throttle > 0 will start from 50% with throttle 1 will equal 100%

In [None]:
class CustomActionWrapper(gym.ActionWrapper):
    def __init__(self, env):
        super().__init__(env)
        # Define new action space with 3 actions ranging from 0 to 1
        self.action_space = gym.spaces.Box(
            low = 0, high = 1, shape = (3,), dtype = np.float32
        )

    def action(self, action):
        # Extract the three acitons
        main, left, right = action

        # Calculate the main engine throttle
        main_engine = -1 if main == 0 else (main/2 + 0.5)

        # Calculate the lateral engine throttles
        left_engine = (-1)*left/2 + 0.5
        right_engine = right/2 + 0.5

        lateral = left_engine + right_engine
        return np.array([main_engine, lateral])


#### Exercise:

Some issues with this action space:

- What to do if both left/right thrusters have positive throttle? -> Fire the one with higher value
- What if they have the same throttle? -> Fire None
- If only 0 means idle then the throttle might be active most of the time for the rockets, let's set a lower cutoff like 0.25 instead to discourage using of thrusters.

This relationship can be expressed by the following equation: $ y = 2/3 (x-1) + 1$

Let's fix the action wrapper accordingly!

In [None]:
class CustomActionWrapper(gym.ActionWrapper):
    def __init__(self, env):
        super().__init__(env)
        # Define new action space with 3 actions ranging from 0 to 1
        self.action_space = gym.spaces.Box(
            low = 0, high = 1, shape = (3,), dtype = np.float32
        )

    def action(self, action):
        # Extract the three acitons
        main, left, right = action

        # Calculate the main engine throttle
        main_engine = -1 if main == 0 else (main/2 + 0.5)

        # Make changes around HERE
        # Calculate the lateral engine throttles
        left_engine = (-1)*left/2 + 0.5
        right_engine = right/2 + 0.5
        lateral = left_engine + right_engine

        return np.array([main_engine, lateral])

### Custom Rewards

Reward wrappers are used to transform the reward that is returned by an environment. As for the previous wrappers, you need to specify that transformation by implementing the `gymnasium.RewardWrapper.reward()` method. Also, you might want to update the reward range of the wrapper.

The following is an example of how to use the `RewardWrapper` class for custom rewards:

In [None]:
class CustomReward(gym.RewardWrapper):
    def __init__(self, env, min_reward, max_reward):
        super().__init__(env)

    def reward(self, reward):
        # Make modifications to the reward function here
        return reward

Let's try something different now and use the `gym.Wrapper` class to reproduce the existing rewards so that we can modify them:

In [None]:
class CustomRewardWrapper(gym.Wrapper):
    def __init__(self, env):
        super().__init__(env)
        self.prev_shaping = None

    def step(self, action):
        state, reward, terminate, truncate, info = self.env.step(action)  # take a step using the original environment

        x, y, vx, vy, theta, omega, leg1, leg2 = \
            state[0], state[1], state[2], state[3], state[4], state[5], state[6], state[7]

        shaping = (
            -100 * np.sqrt(x*x + y*y)
            - 100 * np.sqrt(vx*vx + vy*vy)
            - 100 * abs(theta)
            + 10 * leg1
            + 10 * leg2
        )

        if self.prev_shaping is not None:
            reward = shaping - self.prev_shaping
        self.prev_shaping = shaping

        # calculate engine powers from action
        m_power = (np.clip(action[0], 0.0, 1.0) + 1.0) * 0.5
        direction = np.sign(action[1])
        s_power = np.clip(np.abs(action[1]), 0.5, 1.0)

        # Subtract power costs from the reward
        reward -= (m_power * 0.30)  # Main engine power penalty
        reward -= (s_power * 0.03)  # Side engine power penalty

        # Check for termination conditions
        if self.unwrapped.game_over or abs(state[0]) >= 1.0:
            reward = -100
        if not self.unwrapped.lander.awake:
            reward = +100

        return state, reward, terminate, truncate, info

*We can also achieve the same using the `gym.RewardWrapper`. Showing `gym.Wrapper` class as an example*

#### Exercise

Make the following modifications to the reward function -

- [ ] Change the Reward shaping: Introduce a Higher Penalty for deviating from the center
- [ ] Add another penalty to avoid having high values of Angular Velocity

Think of at least one other Custom Reward and implement it into the Custom Reward Wrapper.

### Putting Everything Together

Now that we have multiple wrappers to our environment, let's put them together. We can just simply wrap each environment on top of each other like this:

In [None]:
env = gym.make(
    "LunarLander-v2",
    continuous=True,
    gravity=-10.0,
    enable_wind=False,
    wind_power=15.0,
    turbulence_power=1.5,
    render_mode = "rgb_array"
)

env = CustomObservationWrapper(env)
env = CustomActionWrapper(env)
env = CustomRewardWrapper(env)

Let's quickly check our new wrapped environment now to make sure everything is in order:

In [None]:
print("Observation Space: ", env.observation_space)
print("Action Space: ", env.action_space)

# Reset environment
obs, info = env.reset()

# Take a random action and get the reward
random_action = env.action_space.sample()
obs, reward, terminate, truncate, info = env.step(random_action)
print("Reward: ", reward)

### Different Policies

We can easily swap out policies in a plug-and-play manner with the `stable_baselines3` RL Library. Following are a few examples. Feel free to try them out:

In [None]:
#model = DDPG("MlpPolicy", env, verbose=1)
#model = SAC("MlpPolicy", env, verbose=1)
#model = TD3("MlpPolicy", env, verbose=1)
model = PPO("MlpPolicy", env, verbose=1)

Train & Save Model

In [None]:
model.learn(total_timesteps=100000)
model.save("lunarlander_policy_custom")

Evaluate:

In [None]:
evaluate_episode(env, model)

In [None]:
frames = simulate_and_render(env, model)
animate_episode_frames(frames)

#### Exercise

- Swap out the existing policy for a different policy than PPO. Train your environment for `100000` steps again and evaluate/see the results.

Try on your own time (might take a while to run):
- Make modifications to the environment
- For each modification, train for a small number of iterations (100,000) to observe that it doesn't negatively impact the policy behavior
- Train your choice of policy for 1 Million timesteps and observe the results (don't forget to save and download your policy for future use!)

## Activity 5: Interdiscilinery Reflection