<a href="https://colab.research.google.com/github/Jbaruz/gymnasium/blob/main/TFAgents_lunar_landler_V1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# TF Agents Lunar Lander Project

This project uses TF-Agents to train a Deep Q-Network (DQN) agent to land the spacecraft in the Lunar Lander environment, which is provided by Gymnasium (the successor to OpenAI Gym).

## Install Necessary Packages

First, we need to install the necessary packages:

- `swig`: A tool for creating scripting language interfaces to C/C++ code.
- `gymnasium[box2d]`: Gymnasium library with Box2D environments, including the Lunar Lander environment.
- `tf-agents[reverb]`: TensorFlow Agents library for reinforcement learning.
- `pyvirtualdisplay`: A Python wrapper for Xvfb, a display server.
- `xvfb`: X virtual framebuffer, a display server that enables headless display.

Run the following code to install the packages:


In [4]:
!apt-get install -y swig
!pip install gymnasium[box2d]
!pip install tf-agents[reverb]
!pip install pyvirtualdisplay
!apt-get install -y xvfb


Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
swig is already the newest version (4.0.2-1ubuntu1).
0 upgraded, 0 newly installed, 0 to remove and 45 not upgraded.
Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
xvfb is already the newest version (2:21.1.4-2ubuntu1.7~22.04.10).
0 upgraded, 0 newly installed, 0 to remove and 45 not upgraded.


## Setup Virtual Display

To render the Lunar Lander environment without a physical display, we use `pyvirtualdisplay`. This is particularly useful when running the code in a headless environment like Google Colab.

Run the following code to set up the virtual display:


In [9]:
import pyvirtualdisplay
display = pyvirtualdisplay.Display(visible=0, size=(1400, 900))
display.start()


<pyvirtualdisplay.display.Display at 0x7b36c40ee680>

## Setup and Render the Lunar Lander Environment

Next, we'll set up the Lunar Lander environment using Gymnasium and render it to ensure everything is working correctly. We'll use `pyvirtualdisplay` to handle the rendering in Google Colab.

Run the following code to set up and render the Lunar Lander environment:


In [10]:
import gymnasium as gym
env = gym.make("LunarLander-v2", render_mode="human")
observation, info = env.reset()

for _ in range(1000):
    action = env.action_space.sample()  # random action; a trained policy would use the observation instead
    observation, reward, terminated, truncated, info = env.step(action)

    if terminated or truncated:
        observation, info = env.reset()

env.close()

## Import Necessary Libraries

Next, we import the general-purpose libraries used in the rest of the project. These include libraries for rendering, image processing, display management, and reinforcement learning.

Run the following code to import the required libraries:


In [11]:
# Import necessary libraries
import base64
import imageio
import IPython
import matplotlib.pyplot as plt
import numpy as np
import PIL.Image
import pyvirtualdisplay
import tensorflow as tf
import reverb


## Set Up Virtual Display and Import TF-Agents Libraries

In this section, we import the TF-Agents modules needed to build and train our DQN agent. The cell also starts a virtual display for rendering Gym environments in a headless environment such as Google Colab, repeating the setup from the earlier section.

Run the following code to set up the virtual display and import TF-Agents libraries:


In [12]:
from tf_agents.agents.dqn.dqn_agent import DqnAgent
from tf_agents.networks import q_network
from tf_agents.environments import suite_gym
from tf_agents.environments import tf_py_environment
from tf_agents.utils import common
from tf_agents.replay_buffers import tf_uniform_replay_buffer
from tf_agents.drivers import dynamic_step_driver
from tf_agents.metrics import tf_metrics
from tf_agents.policies import random_tf_policy
from tf_agents.networks.q_network import QNetwork

# Set up virtual display for rendering OpenAI gym environments
display = pyvirtualdisplay.Display(visible=0, size=(1400, 900)).start()

## Verify the LunarLander Environment

Next, we verify the LunarLander environment to ensure it is set up correctly. We'll create an instance of the LunarLander environment, seed the action and observation spaces for reproducibility, and run a simple loop to render the environment.

Run the following code to verify and render the LunarLander environment:


In [13]:
import gymnasium as gym

# Verify the LunarLander environment
env = gym.make("LunarLander-v2", render_mode="human")
env.action_space.seed(123)
env.observation_space.seed(123)
observation, info = env.reset()

done = False
while not done:
    action = env.action_space.sample()  # random action; a trained policy would use the observation instead
    observation, reward, terminated, truncated, info = env.step(action)
    env.render()

    if terminated or truncated:
        done = True
        observation, info = env.reset()

env.close()
print("LunarLander-v2 environment is available.")


LunarLander-v2 environment is available.


## Initialize the DQN Agent

In this section, we initialize the Deep Q-Network (DQN) agent for the LunarLander environment. This process involves the following steps:

1. **Set up the TF-Agents environment**:
   We load the `LunarLander-v2` environment with `suite_gym.load` and wrap it in `tf_py_environment.TFPyEnvironment` so that TF-Agents can interact with it using tensors.

2. **Define the Q-Network architecture**:
   We create a Q-Network with two fully connected layers of 512 and 256 units respectively. This network will be used by the DQN agent to estimate the Q-values for each action.

3. **Configure the optimizer and training step counter**:
   We use the Adam optimizer with a learning rate of `0.001` and set up a variable to keep track of the training steps.

4. **Set up the epsilon-greedy policy**:
   We use an epsilon-greedy policy in which epsilon decays linearly from `0.07` to `0.01` over `25000 // 4` (6,250) training steps and then stays at `0.01`. This policy balances exploration and exploitation during training.

5. **Initialize the DQN agent**:
   We configure the DQN agent with the environment specifications, Q-Network, optimizer, and other parameters, then initialize the agent.
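
The epsilon schedule in step 4 can be sketched in plain Python. This is a minimal illustration, not the TF-Agents implementation: `linear_epsilon` and `epsilon_greedy_action` are hypothetical helper names, and the formula assumes `PolynomialDecay` is used with its default power of 1 (a straight line) and without cycling.

```python
import random

def linear_epsilon(step, initial=0.07, end=0.01, decay_steps=25000 // 4):
    """Decay linearly from `initial` to `end`, then stay at `end`."""
    step = min(step, decay_steps)              # clamp once the decay is finished
    fraction_left = 1.0 - step / decay_steps   # 1.0 at step 0, 0.0 at decay_steps
    return (initial - end) * fraction_left + end

def epsilon_greedy_action(q_values, epsilon, rng=random):
    """With probability `epsilon` explore, otherwise pick the highest Q-value."""
    if rng.random() < epsilon:
        return rng.randrange(len(q_values))    # explore: uniform random action
    return max(range(len(q_values)), key=lambda a: q_values[a])  # exploit

# Epsilon at the start, halfway through, at the end, and after the decay
for step in (0, 3125, 6250, 10000):
    print(step, round(linear_epsilon(step), 4))
```

In the notebook, the schedule is evaluated at `train_step_counter`, so epsilon shrinks with the number of training updates rather than with the number of environment steps.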

You can modify the following parameters to change the behavior and performance of the agent:
- `fc_layer_params`: Adjust the size and number of layers in the Q-Network.
- `learning_rate`: Change the learning rate of the Adam optimizer.
- `initial_learning_rate` and `end_learning_rate`: Despite their names, these `PolynomialDecay` arguments set the starting and ending epsilon values for the epsilon-greedy policy here.
- `gamma`: Change the discount factor for future rewards.
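
To see what `gamma` controls, the sketch below computes a one-step Q-learning target and the squared TD error that the loss is based on. It is a simplified illustration under stated assumptions: `td_target` and `squared_td_error` are hypothetical names, and details such as the target network are left out.

```python
def td_target(reward, next_q_values, gamma=0.95, terminal=False):
    """One-step Q-learning target: r + gamma * max over a' of Q(s', a')."""
    if terminal:
        return reward                      # no future value after the episode ends
    return reward + gamma * max(next_q_values)

def squared_td_error(q_value, target):
    """Element-wise squared error between the target and the current estimate."""
    return (target - q_value) ** 2

target = td_target(reward=1.0, next_q_values=[0.0, 2.0], gamma=0.95)
print(target, squared_td_error(q_value=2.5, target=target))

# A reward received n steps in the future is weighted by gamma ** n
for gamma in (0.95, 0.99):
    print(gamma, round(gamma ** 20, 3))
```

With `gamma=0.95`, a reward 20 steps away is weighted by about 0.36, so a larger `gamma` makes the agent care more about the eventual landing than about immediate shaping rewards.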

Run the following code to initialize your DQN agent:


In [14]:
# Load LunarLander-v2 and wrap it as a TF-Agents environment
tf_env = tf_py_environment.TFPyEnvironment(suite_gym.load('LunarLander-v2'))

fc_layer_params = [512, 256]

q_net = QNetwork(
    tf_env.observation_spec(),
    tf_env.action_spec(),
    fc_layer_params=fc_layer_params,
    activation_fn=tf.keras.activations.relu,  # activation for the hidden layers
)

optimizer = tf.compat.v1.train.AdamOptimizer(learning_rate=1e-3)
train_step_counter = tf.Variable(0)
epsilon_fn = tf.keras.optimizers.schedules.PolynomialDecay(
    initial_learning_rate=0.07,
    decay_steps=25000 // 4,  # update_period = 4
    end_learning_rate=0.01
)

agent = DqnAgent(
    tf_env.time_step_spec(),
    tf_env.action_spec(),
    q_network=q_net,
    optimizer=optimizer,
    td_errors_loss_fn=common.element_wise_squared_loss,
    gamma=0.95,
    train_step_counter=train_step_counter,
    epsilon_greedy=lambda: epsilon_fn(train_step_counter),
)

agent.initialize()

## Set Up Replay Buffer, Dataset, and Driver

In this section, we set up the replay buffer, dataset, and driver for training the DQN agent. These components are crucial for efficiently storing and sampling experiences during training.

1. **Replay Buffer**:
   The replay buffer stores experiences (state, action, reward, next state) collected by the agent. We use `tf_uniform_replay_buffer.TFUniformReplayBuffer` to create a replay buffer with a maximum length of `100000` and a batch size equal to the environment's batch size.

2. **Dataset**:
   The dataset provides a way to sample batches of experiences from the replay buffer for training. We create a dataset from the replay buffer using the `as_dataset` method with the following parameters:
   - `num_parallel_calls`: The number of parallel calls for sampling.
   - `sample_batch_size`: The number of samples in each batch.
   - `num_steps`: The number of consecutive time steps in each sample. DQN needs `2` adjacent steps to form a (state, next state) transition.
   The dataset prefetches 3 batches to improve training efficiency.

3. **Driver**:
   The driver is responsible for collecting experiences from the environment. We use `dynamic_step_driver.DynamicStepDriver` to create a driver that interacts with the environment using the agent's collect policy and stores the collected experiences in the replay buffer. The number of steps to collect per call is set to `1` (this can be adjusted as needed).

You can modify the following parameters to change the behavior and performance of the experience collection and sampling process:
- `max_length`: Change the maximum size of the replay buffer.
- `sample_batch_size`: Adjust the size of the batches sampled from the replay buffer.
- `num_steps`: Change the number of time steps in each sample.
- `num_steps` in `DynamicStepDriver`: Adjust the number of steps collected per call.
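
The roles of `max_length`, `sample_batch_size`, and `num_steps` can be illustrated with a small stand-in written in plain Python. `TinyReplayBuffer` is a hypothetical class for illustration only, not the TF-Agents API, and unlike a real buffer it does not treat episode boundaries specially.

```python
import random
from collections import deque

class TinyReplayBuffer:
    """Hypothetical uniform replay buffer holding at most `max_length` steps."""

    def __init__(self, max_length):
        # A deque with maxlen discards the oldest step once it is full
        self._steps = deque(maxlen=max_length)

    def add(self, step):
        self._steps.append(step)

    def sample(self, sample_batch_size, num_steps=2, rng=random):
        """Return `sample_batch_size` windows of `num_steps` adjacent steps."""
        last_start = len(self._steps) - num_steps
        if last_start < 0:
            raise ValueError("not enough steps stored yet")
        batch = []
        for _ in range(sample_batch_size):
            start = rng.randint(0, last_start)   # uniform over valid window starts
            batch.append([self._steps[start + i] for i in range(num_steps)])
        return batch

buffer = TinyReplayBuffer(max_length=5)
for t in range(8):                 # add steps 0..7; only steps 3..7 are kept
    buffer.add({"t": t})

batch = buffer.sample(sample_batch_size=4, num_steps=2)
print(batch)
```

Each sampled window holds two adjacent steps, which is what a one-step DQN update needs: the first step supplies the state and action, the second supplies the next state.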

Run the following code to set up the replay buffer, dataset, and driver:


In [35]:
# Replay Buffer
replay_buffer = tf_uniform_replay_buffer.TFUniformReplayBuffer(
    data_spec=agent.collect_data_spec,
    batch_size=tf_env.batch_size,
    max_length=100000
)

# Dataset
dataset = replay_buffer.as_dataset(
    num_parallel_calls=3,
    sample_batch_size=64,
    num_steps=2
).prefetch(3)

# Driver
collect_driver = dynamic_step_driver.DynamicStepDriver(
    tf_env,
    agent.collect_policy,
    observers=[replay_buffer.add_batch],
    num_steps=1  # Adjust as needed
)

## Collect Initial Data

Before training the DQN agent, we need to collect some initial data to fill the replay buffer. This data collection process involves the following steps:

1. **Set up a Random Policy**:
   We create a random policy using `random_tf_policy.RandomTFPolicy`, which will be used to collect the initial experiences. The random policy selects actions randomly, providing diverse experiences to start training.

2. **Define the Collect Step Function**:
   The `collect_step` function collects a single step of experience from the environment using the provided policy and stores it in the replay buffer. This function:
   - Gets the current time step from the environment.
   - Uses the policy to select an action.
   - Steps the environment with the selected action to get the next time step.
   - Converts the experience into a trajectory.
   - Adds the trajectory to the replay buffer.

3. **Collect Initial Data**:
   We loop over a specified number of steps (in this case, 1000) to collect initial data using the random policy and store it in the replay buffer. This data provides a starting point for the agent to begin learning.

You can modify the following parameters to change the initial data collection process:
- `range(1000)`: Adjust the number of initial steps collected to fill the replay buffer.

Run the following code to collect the initial data:


In [36]:
from tf_agents.trajectories import trajectory

# Collect initial data
random_policy = random_tf_policy.RandomTFPolicy(tf_env.time_step_spec(), tf_env.action_spec())

def collect_step(environment, policy, buffer):
    time_step = environment.current_time_step()
    action_step = policy.action(time_step)
    next_time_step = environment.step(action_step.action)
    traj = trajectory.from_transition(time_step, action_step, next_time_step)

    # Add trajectory to the replay buffer
    buffer.add_batch(traj)

# Collect initial data to fill the replay buffer
for _ in range(1000):
    collect_step(tf_env, random_policy, replay_buffer)


## Create a Video of the Trained Policy

Once the agent is trained, it helps to visualize how well it performs. In this section, we define helper functions for recording a video, train the agent, and then record the trained policy interacting with the environment.

1. **Function to Create Policy Evaluation Video**:
   The `create_policy_eval_video` function runs the policy for a specified number of episodes, collects frames, and saves them as a video file. This function:
   - Resets the environment at the beginning of each episode.
   - Uses the policy to select actions and steps through the environment.
   - Collects frames rendered by the environment and writes them to a video file.
   
2. **Function to Convert Frames to RGB Arrays**:
   The `convert_to_rgb_array` function ensures that the rendered frames are in the correct format for video creation. It handles different possible formats of the frames and converts them to RGB arrays.
   
3. **Function to Embed Video in Notebook**:
   The `embed_mp4` function embeds the created video in the notebook for easy viewing.

4. **Collect Initial Data and Train the Agent**:
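   We collect another 1,000 steps with the random policy, repeating the previous section so that the replay buffer receives additional experiences. Then, we define a training loop that runs for a specified number of iterations (2,000 here). Each iteration collects one environment step with the driver and performs one training update on a sampled batch.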

5. **Create and Display the Video**:
   Finally, we use the `create_policy_eval_video` function to create a video of the trained policy and display it in the notebook.

You can modify the following parameters to change the behavior of the video creation and training process:
- `num_episodes`: Change the number of episodes to run for creating the evaluation video.
- `fps`: Adjust the frames per second for the video.
- `num_iterations`: Change the number of training iterations.
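
A video shows behavior qualitatively. A numeric check such as the average episode return can complement it. The sketch below is one possible way to compute it, assuming only the `reset`, `step`, `action`, `is_last`, and `reward` interface already used in this notebook. `compute_avg_return` and the `Stub*` classes are hypothetical names introduced so the example runs on its own.

```python
from collections import namedtuple

def compute_avg_return(environment, policy, num_episodes=5):
    """Average undiscounted episode return of `policy` over `num_episodes`."""
    total_return = 0.0
    for _ in range(num_episodes):
        time_step = environment.reset()
        episode_return = 0.0
        while not time_step.is_last():
            action_step = policy.action(time_step)
            time_step = environment.step(action_step.action)
            episode_return += time_step.reward
        total_return += episode_return
    return total_return / num_episodes

# --- Hypothetical stand-ins so the sketch is runnable without TF-Agents ---
ActionStep = namedtuple("ActionStep", ["action"])

class StubTimeStep:
    def __init__(self, reward, last):
        self.reward = reward
        self._last = last

    def is_last(self):
        return self._last

class StubEnv:
    """Episode of fixed length that pays a reward of 1.0 on every step."""
    def __init__(self, episode_length=3):
        self._length = episode_length
        self._t = 0

    def reset(self):
        self._t = 0
        return StubTimeStep(reward=0.0, last=False)

    def step(self, action):
        self._t += 1
        return StubTimeStep(reward=1.0, last=self._t >= self._length)

class StubPolicy:
    def action(self, time_step):
        return ActionStep(action=0)   # always choose action 0

avg = compute_avg_return(StubEnv(), StubPolicy(), num_episodes=2)
print(avg)
```

With the batched `tf_env` used in this notebook, rewards are tensors rather than Python floats, so the result would also be a tensor and may need converting before printing.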

Run the following code to train the agent and create a video of the trained policy:


In [38]:
import imageio
import base64
import IPython
import numpy as np
from PIL import Image
import tensorflow as tf

def create_policy_eval_video(policy, environment, filename, num_episodes=5, fps=30):
    filename = filename + ".mp4"
    with imageio.get_writer(filename, fps=fps, codec='libx264') as video:
        for _ in range(num_episodes):
            time_step = environment.reset()
            video.append_data(convert_to_rgb_array(environment.render()))
            while not time_step.is_last():
                action_step = policy.action(time_step)
                time_step = environment.step(action_step.action)
                video.append_data(convert_to_rgb_array(environment.render()))
    return embed_mp4(filename)

def convert_to_rgb_array(rendered_frame):
    if isinstance(rendered_frame, tf.Tensor):
        rendered_frame = rendered_frame.numpy()  # Convert tensor to numpy array

    # Ensure rendered_frame is a numpy array
    rendered_frame = np.asarray(rendered_frame)

    # Handle multi-dimensional arrays
    if rendered_frame.ndim == 4 and rendered_frame.shape[0] == 1:
        rendered_frame = np.squeeze(rendered_frame, axis=0)
    if rendered_frame.ndim == 3 and rendered_frame.shape[2] == 1:
        rendered_frame = np.repeat(rendered_frame, 3, axis=2)
    if rendered_frame.ndim == 2:
        # Convert single channel to RGB
        rendered_frame = np.stack((rendered_frame,)*3, axis=-1)
    if rendered_frame.shape[2] == 4:
        rendered_frame = np.array(Image.fromarray(rendered_frame, 'RGBA').convert('RGB'))

    return rendered_frame

def embed_mp4(filename):
    # Read the video file and close it before encoding its bytes as base64
    with open(filename, 'rb') as f:
        video = f.read()
    b64 = base64.b64encode(video)
    tag = '''<video width="640" height="480" controls>
    <source src="data:video/mp4;base64,{0}" type="video/mp4"></video>'''.format(b64.decode())
    return IPython.display.HTML(tag)

# Collect initial data
random_policy = random_tf_policy.RandomTFPolicy(tf_env.time_step_spec(), tf_env.action_spec())

def collect_step(environment, policy, buffer):
    time_step = environment.current_time_step()
    action_step = policy.action(time_step)
    next_time_step = environment.step(action_step.action)
    traj = trajectory.from_transition(time_step, action_step, next_time_step)

    # Add trajectory to the replay buffer
    buffer.add_batch(traj)

# Collect initial data to fill the replay buffer
for _ in range(1000):
    collect_step(tf_env, random_policy, replay_buffer)

# Training Loop
def train_agent(num_iterations):
    iterator = iter(dataset)
    for _ in range(num_iterations):
        # Collect a few steps and save to the replay buffer.
        collect_driver.run()

        # Sample a batch of data from the buffer and update the agent's network.
        trajectories, _ = next(iterator)
        train_loss = agent.train(experience=trajectories)

        # Add additional logging if necessary

# Execute training
train_agent(2000)

# Create a video of the trained policy
create_policy_eval_video(agent.policy, tf_env, "trained-agent")


