# Robotic Arm Control via Deep Reinforcement Learning


This notebook will guide you through:

1.  **Setting up the Environment:** Installing necessary libraries.
2.  **Choosing a Simulator:** Using PyBullet via the `panda-gym` environment, which integrates with Gymnasium (formerly OpenAI Gym).
3.  **Selecting an RL Algorithm:** Using Soft Actor-Critic (SAC) from Stable Baselines3 (SB3), an off-policy actor-critic algorithm well suited to continuous control tasks such as robotics.
4.  **Training the Agent:** Running the training loop.
5.  **Evaluating the Agent:** Visualizing the trained agent's performance.

---

**Google Colab Notebook: Robotic Arm Control with Reinforcement Learning**

---

**1. Introduction**

This notebook demonstrates how to train a Reinforcement Learning (RL) agent to control a simulated robotic arm. We will use the `panda-gym` environment, which simulates a Franka Emika Panda arm using the PyBullet physics engine. The goal will be a basic reaching task, where the arm must move its end-effector to a target position. We will use the Stable Baselines3 library to implement the Soft Actor-Critic (SAC) algorithm.

**Key Components:**

* **Environment:** `panda-gym` (using PyBullet & Gymnasium API)
* **RL Library:** Stable Baselines3 (SB3)
* **Algorithm:** Soft Actor-Critic (SAC)
* **Task:** `PandaReach-v3` (or similar)

---

**2. Setup and Installations**

First, we need to install the required libraries. This includes Gymnasium (the environment API), `panda-gym` (the specific robot environment), PyBullet (the physics simulator), Stable Baselines3 (the RL algorithms), and utilities for rendering in Colab.

In [None]:
# @title Install Dependencies
# Stable Baselines3 for RL algorithms
# Quote the requirement so the shell does not treat ">=" as an output redirection
!pip install "stable-baselines3[extra]>=2.0.0a5" --quiet

# Gymnasium for the environment API
!pip install gymnasium --quiet

# PyBullet for physics simulation
!pip install pybullet --quiet

# Panda-Gym for the specific robotic arm environment
# Note: Might require specific versions depending on compatibility. Check the panda-gym repo if issues arise.
!pip install panda-gym --quiet

# For rendering environments in Colab
!pip install pyglet==1.5.27 --quiet # Specific version often needed for compatibility in Colab
!pip install pyvirtualdisplay --quiet
!apt-get update --quiet
!apt-get install -y xvfb python3-opengl ffmpeg --quiet

print("✅ Dependencies installed.")

# Set up a virtual display for rendering
from pyvirtualdisplay import Display
virtual_display = Display(visible=0, size=(1400, 900))
virtual_display.start()
print("✅ Virtual display started.")

---

**3. Import Libraries**

Now, let's import the necessary Python libraries.

In [None]:
# @title Import Libraries
import gymnasium as gym
import panda_gym
import numpy as np
import stable_baselines3 as sb3
from stable_baselines3 import SAC
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common.results_plotter import load_results, ts2xy
from stable_baselines3.common.monitor import Monitor
import matplotlib.pyplot as plt
import os
import base64
import io
from IPython.display import HTML, display

print(f"Gymnasium version: {gym.__version__}")
print(f"Stable Baselines3 version: {sb3.__version__}")
print("✅ Libraries imported.")

# Helper function to display recorded videos
def show_video(video_path):
    """Helper function to display mp4 video in Colab"""
    with open(video_path, 'rb') as f:
        mp4 = f.read()
    data_url = "data:video/mp4;base64," + base64.b64encode(mp4).decode()
    return HTML("""
    <video width=400 controls>
          <source src="%s" type="video/mp4">
    </video>
    """ % data_url)

---

**4. Environment Setup**

We'll instantiate the robotic arm environment. We'll use `PandaReach-v3`, where the objective is to move the arm's end-effector to a randomly placed target sphere. We wrap it with `Monitor` to log training statistics like rewards.

* **Observation Space:** A dictionary with three entries: `observation` (the robot state, such as the end-effector position and velocity), `achieved_goal` (the current end-effector position), and `desired_goal` (the target position).
* **Action Space:** Continuous values representing either a desired end-effector displacement or a change in joint positions. `panda-gym` uses end-effector control by default.
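As a rough illustration of the goal-conditioned layout, the sketch below builds a dictionary observation and computes a sparse reach reward from it. The key names match the goal-based dictionary that `panda-gym` returns; the 0.05 m threshold, the helper name `sparse_reach_reward`, and all numeric values are assumptions chosen for illustration, not values read from the environment.

```python
import math

# Assumed success threshold in metres (illustrative; check the environment's own setting).
DISTANCE_THRESHOLD = 0.05


def sparse_reach_reward(achieved_goal, desired_goal, threshold=DISTANCE_THRESHOLD):
    """Return 0.0 when the end-effector is within `threshold` of the target, else -1.0."""
    distance = math.dist(achieved_goal, desired_goal)
    return 0.0 if distance < threshold else -1.0


# Hypothetical observation in the goal-conditioned dictionary layout.
obs = {
    "observation": [0.04, 0.00, 0.20, 0.0, 0.0, 0.0],  # e.g. end-effector position and velocity
    "achieved_goal": [0.04, 0.00, 0.20],               # where the end-effector currently is
    "desired_goal": [0.10, -0.05, 0.15],               # where the target sphere is
}

reward_far = sparse_reach_reward(obs["achieved_goal"], obs["desired_goal"])
reward_near = sparse_reach_reward([0.10, -0.04, 0.15], obs["desired_goal"])
print(reward_far, reward_near)
```

With a sparse reward like this, the agent only receives a non-negative signal once it is already close to the target, which is one reason goal-based tasks can need many timesteps.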

In [None]:
# @title Create Environment
# Create log dir
log_dir = "/tmp/gym/"
os.makedirs(log_dir, exist_ok=True)

# Environment ID
env_id = 'PandaReach-v3' # Try 'PandaPickAndPlace-v3', 'PandaPush-v3', 'PandaSlide-v3' for more complex tasks later

# Create and wrap the environment
# Use render_mode='rgb_array' for capturing frames
env = gym.make(env_id, render_mode='rgb_array')
env = Monitor(env, log_dir) # Wrap with Monitor to log stats

print(f"✅ Environment '{env_id}' created.")
print(f"Observation Space: {env.observation_space}")
print(f"Action Space: {env.action_space}")
print(f"Action space sample: {env.action_space.sample()}") # Example random action

# Optional: Test the environment with random actions
# obs, _ = env.reset()
# frame = env.render()
# plt.imshow(frame)
# plt.axis('off')
# plt.show()
# action = env.action_space.sample()
# obs, reward, terminated, truncated, info = env.step(action)
# print(f"Step result: Obs keys: {list(obs.keys())}, Reward: {reward}, Terminated: {terminated}, Truncated: {truncated}")  # obs is a dict, so it has no .shape
# env.close() # Close the env if just testing

---

**5. Define RL Agent (SAC)**

We choose the Soft Actor-Critic (SAC) algorithm from Stable Baselines3. SAC is well-suited for continuous control problems like this one. It uses an entropy maximization framework which encourages exploration and generally leads to robust policies.
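To make the entropy term concrete, here is a minimal sketch of the entropy-regularized target that SAC's critics are trained towards, following the formulation in the SAC papers. The function name `soft_td_target` and all numbers are illustrative assumptions; SB3 computes the same kind of quantity internally on batches of tensors.

```python
def soft_td_target(reward, done, gamma, alpha, next_q1, next_q2, next_log_prob):
    """Entropy-regularized TD target for the critics.

    target = r + gamma * (1 - done) * (min(Q1', Q2') - alpha * log pi(a'|s'))
    """
    # Taking the minimum of two critics reduces overestimation; the
    # -alpha * log_prob term rewards the policy for staying stochastic.
    soft_value = min(next_q1, next_q2) - alpha * next_log_prob
    return reward + gamma * (1.0 - float(done)) * soft_value


# Illustrative numbers only (not taken from a real training run).
target = soft_td_target(
    reward=-1.0, done=False, gamma=0.98, alpha=0.2,
    next_q1=-5.0, next_q2=-4.5, next_log_prob=-1.5,
)
terminal_target = soft_td_target(
    reward=0.0, done=True, gamma=0.98, alpha=0.2,
    next_q1=-5.0, next_q2=-4.5, next_log_prob=-1.5,
)
print(round(target, 4), terminal_target)
```

A larger `alpha` gives more weight to the entropy bonus and therefore to exploration; SB3's SAC can tune this coefficient automatically.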

Because the observation is a dictionary, we use the `MultiInputPolicy`, which processes each entry of the observation and represents the policy and value functions with Multi-Layer Perceptrons (MLPs).
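Conceptually, a policy that accepts dictionary observations such as `MultiInputPolicy` has to turn the dictionary into one feature vector before the MLP layers. The sketch below shows that idea in plain Python; the helper name `flatten_dict_observation` and the sorted key order are assumptions for illustration, and SB3's own feature extractor operates on tensors rather than lists.

```python
def flatten_dict_observation(obs):
    """Concatenate every entry of a dict observation into one flat list, in sorted key order."""
    flat = []
    for key in sorted(obs):
        flat.extend(float(value) for value in obs[key])
    return flat


# Hypothetical goal-conditioned observation for a reach task.
obs = {
    "observation": [0.04, 0.00, 0.20, 0.0, 0.0, 0.0],
    "achieved_goal": [0.04, 0.00, 0.20],
    "desired_goal": [0.10, -0.05, 0.15],
}

features = flatten_dict_observation(obs)
print(len(features), features)
```

The resulting vector (12 values in this example) is what the MLP layers would receive as input.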

In [None]:
# @title Define the SAC Agent

# Use VecEnv for potential parallelization (even with n_envs=1)
# vec_env = make_vec_env(lambda: Monitor(gym.make(env_id, render_mode='rgb_array'), log_dir), n_envs=1)

# Define the SAC agent
# - 'MultiInputPolicy': MLP-based policy that accepts dictionary observations
# - env: The environment instance (or VecEnv)
# - verbose=1: Print training progress
# - learning_rate: How quickly the agent updates its policy (can be tuned)
# - buffer_size: Size of the replay buffer (stores past experiences)
# - batch_size: Number of samples used for each gradient update
# - gamma: Discount factor for future rewards
# - tau: Soft update coefficient for target networks
# - learning_starts: How many steps to collect before starting training
# - tensorboard_log: Directory for TensorBoard logs
model = SAC('MultiInputPolicy',
            env,
            verbose=1,
            learning_rate=1e-4,  # Often needs tuning, default is 3e-4
            buffer_size=100_000, # Smaller buffer for faster iteration initially
            batch_size=256,
            gamma=0.98,        # Discount factor often lower in goal-based envs
            tau=0.01,
            learning_starts=1000, # Start learning after collecting some experience
            tensorboard_log="./sac_panda_tensorboard/",
            seed=42) # for reproducibility

print("✅ SAC Agent defined.")
print(f"Policy Architecture: {model.policy}")

# Optional: You can load a pre-trained model here if you have one
# model_path = "sac_panda_reach_100k.zip"
# if os.path.exists(model_path):
#    print(f"Loading pre-trained model from {model_path}")
#    model = SAC.load(model_path, env=env)
# else:
#    print("No pre-trained model found, starting training from scratch.")

---

**6. Train the Agent**

Now we train the agent using the `model.learn()` method. We specify the total number of timesteps for training. During training, SB3 will print logs showing the progress (episode reward, episode length, etc.).
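The episode reward shown in those logs is a running average over recent episodes rather than the return of a single episode. A minimal sketch of that bookkeeping follows, assuming a fixed-size window (100 episodes is the window this notebook's callback uses; the example shrinks it to 3 so the effect is visible). The class name `RollingEpisodeStats` and the returns are made up for illustration.

```python
from collections import deque


class RollingEpisodeStats:
    """Keep the most recent episode returns and report their mean."""

    def __init__(self, window=100):
        # A deque with maxlen silently drops the oldest entry once it is full.
        self.returns = deque(maxlen=window)

    def add_episode(self, episode_return):
        self.returns.append(episode_return)

    def mean_return(self):
        if not self.returns:
            return None  # No finished episode yet
        return sum(self.returns) / len(self.returns)


stats = RollingEpisodeStats(window=3)
for episode_return in [-50.0, -40.0, -10.0, -4.0]:
    stats.add_episode(episode_return)

# Only the last three returns (-40, -10, -4) contribute to the mean.
print(stats.mean_return())
```

Because old episodes fall out of the window, the logged mean reacts to recent improvements while still smoothing out episode-to-episode noise.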

In [None]:
# @title Train the Agent
# Define training parameters
TOTAL_TIMESTEPS = 50000 # Start with a moderate number (e.g., 50k-100k). Increase for better performance.
MODEL_SAVE_PATH = f"sac_{env_id.split('-')[0].lower()}_{TOTAL_TIMESTEPS//1000}k"

print(f"🚀 Starting training for {TOTAL_TIMESTEPS} timesteps...")
print(f"TensorBoard logs will be saved in: {model.tensorboard_log}")
print(f"Model will be saved to: {MODEL_SAVE_PATH}.zip")

# Add a callback for plotting reward progress (optional but helpful)
class PlottingCallback(BaseCallback):
    def __init__(self, log_dir, verbose=0):
        super().__init__(verbose)
        self.log_dir = log_dir
        self.save_path = os.path.join(log_dir, 'best_model')
        self.best_mean_reward = -np.inf

    def _on_step(self) -> bool:
        # Log scalar value (here we log the number of steps)
        # More complex logging (like mean reward) is handled by Monitor wrapper + TensorBoard
        # self.logger.record('custom/steps', self.num_timesteps)

        # Check if Monitor has enough data to calculate mean reward
        if self.num_timesteps % 1000 == 0: # Check every 1000 steps
            x, y = ts2xy(load_results(self.log_dir), 'timesteps')
            if len(x) > 0:
                # Mean reward over last 100 episodes
                mean_reward = np.mean(y[-100:])
                if self.verbose > 0:
                    print(f"Num timesteps: {self.num_timesteps}, Mean reward (last 100): {mean_reward:.2f}")
                # Simple best model saving based on mean reward
                if mean_reward > self.best_mean_reward:
                    self.best_mean_reward = mean_reward
                    # Example saving logic (optional)
                    # print(f"New best mean reward: {self.best_mean_reward:.2f} - Saving model")
                    # self.model.save(self.save_path)
            else:
                if self.verbose > 0:
                    print(f"Num timesteps: {self.num_timesteps}, Not enough data for mean reward yet.")

        return True # Continue training

# Create the callback
plot_callback = PlottingCallback(log_dir=log_dir, verbose=1)

# Train the model
# Note: Training robotics tasks can take time! Start with fewer timesteps.
model.learn(total_timesteps=TOTAL_TIMESTEPS,
            log_interval=10, # Log stats every 10 episodes
            tb_log_name=f"SAC_{env_id}",
            callback=plot_callback,
            reset_num_timesteps=False) # False keeps the timestep counter across learn() calls (useful when continuing training); use True to restart it

print(f"✅ Training finished after {TOTAL_TIMESTEPS} timesteps.")

# Save the final model
model.save(MODEL_SAVE_PATH)
print(f"✅ Model saved to {MODEL_SAVE_PATH}.zip")

# You can launch TensorBoard in Colab (might need a specific cell)
# %load_ext tensorboard
# %tensorboard --logdir ./sac_panda_tensorboard/

---

**7. Evaluate and Visualize the Trained Agent**

After training, let's see how well the agent performs. We'll run the agent in the environment for a few episodes and record a video of its behavior.
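Before looking at the real environment, it can help to see the evaluation loop in isolation. The sketch below runs a hand-written policy on a toy one-dimensional reach task and reports the mean return and success rate. `ToyReachEnv`, `proportional_policy`, and `evaluate` are hypothetical stand-ins written for this example; they only mimic the Gymnasium `reset`/`step` signatures and the `is_success` info key, and are not part of `panda-gym` or SB3.

```python
import random


class ToyReachEnv:
    """Hypothetical 1-D reach task that follows the Gymnasium reset/step signatures."""

    def __init__(self, max_steps=50, threshold=0.05, seed=0):
        self.max_steps = max_steps
        self.threshold = threshold
        self.rng = random.Random(seed)

    def reset(self):
        self.position = 0.0
        self.target = self.rng.uniform(-1.0, 1.0)
        self.steps = 0
        return (self.position, self.target), {}

    def step(self, action):
        self.position += max(-0.1, min(0.1, action))  # Clip the displacement per step
        self.steps += 1
        success = abs(self.target - self.position) < self.threshold
        reward = 0.0 if success else -1.0             # Sparse reward
        terminated = success
        truncated = self.steps >= self.max_steps and not terminated
        return (self.position, self.target), reward, terminated, truncated, {"is_success": success}


def proportional_policy(obs):
    """Stand-in for model.predict: move straight towards the target."""
    position, target = obs
    return target - position


def evaluate(env, policy, n_episodes=20):
    """Run full episodes and return (mean episode return, success rate)."""
    returns, successes = [], 0
    for _ in range(n_episodes):
        obs, _ = env.reset()
        episode_return, done = 0.0, False
        while not done:
            obs, reward, terminated, truncated, info = env.step(policy(obs))
            episode_return += reward
            done = terminated or truncated
        returns.append(episode_return)
        successes += int(info["is_success"])
    return sum(returns) / len(returns), successes / n_episodes


mean_return, success_rate = evaluate(ToyReachEnv(), proportional_policy)
print(f"Mean return: {mean_return:.2f}, success rate: {success_rate:.0%}")
```

Counting whole episodes, as here, gives a cleaner metric than a fixed number of steps, because every episode contributes one complete return.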

In [None]:
# @title Evaluate the Trained Agent and Record Video

# --- Load the trained model (if needed) ---
# If you didn't just train, load the model:
# model_path = f"{MODEL_SAVE_PATH}.zip"
# if os.path.exists(model_path):
#     print(f"Loading model from {model_path}")
#     model = SAC.load(model_path, env=env) # Make sure env is defined
# else:
#     print("Model file not found. Cannot evaluate.")
#     # exit() # Or handle appropriately

# --- Evaluation and Recording ---
print("Evaluating the trained agent...")

# Create a separate evaluation environment if needed (or reuse 'env')
eval_env = gym.make(env_id, render_mode='rgb_array')

# Record a video
video_folder = 'logs/videos/'
video_length = 500 # Number of steps to record

# Wrap the environment for video recording
eval_env = gym.wrappers.RecordVideo(eval_env, video_folder=video_folder,
                                    episode_trigger=lambda e: True, # Record every episode
                                    name_prefix=f"{MODEL_SAVE_PATH}-eval")

obs, _ = eval_env.reset()
cumulative_reward = 0
frames = [] # Store frames if needed for other viz

print("Starting video recording...")
for _ in range(video_length):
    action, _states = model.predict(obs, deterministic=True) # Use deterministic actions for evaluation
    obs, reward, terminated, truncated, info = eval_env.step(action)
    cumulative_reward += reward

    # frame = eval_env.render() # Handled by RecordVideo wrapper now
    # frames.append(frame)

    if terminated or truncated:
        print(f"Episode finished. Cumulative Reward: {cumulative_reward}")
        obs, _ = eval_env.reset()
        cumulative_reward = 0

eval_env.close() # Important to save the video file properly
print(f"✅ Evaluation finished. Video saved in {video_folder}")

# --- Display the Recorded Video ---
# Find the latest video file
video_files = sorted((os.path.join(video_folder, f) for f in os.listdir(video_folder) if f.endswith('.mp4')),
                     key=os.path.getmtime) # Sort by modification time so the last entry is the newest
if video_files:
    latest_video = video_files[-1]
    print(f"Displaying video: {latest_video}")
    display(show_video(latest_video))
else:
    print("Could not find a recorded video file.")

# --- Plot Training Curve ---
def plot_results(log_folder, title='Learning Curve'):
    """
    plot the results
    :param log_folder: the save location of the results to plot
    :param title: the title of the task to plot
    """
    x, y = ts2xy(load_results(log_folder), 'timesteps')
    y = moving_average(y, window=50) # Smooth the curve
    # Truncate x
    x = x[len(x) - len(y):]

    fig = plt.figure(title)
    plt.plot(x, y)
    plt.xlabel('Number of Timesteps')
    plt.ylabel('Rewards')
    plt.title(title + " Smoothed")
    plt.grid(True)
    plt.show()

def moving_average(values, window):
    """
    Smooth values by doing a moving average
    :param values: (numpy array)
    :param window: (int)
    :return: (numpy array)
    """
    weights = np.repeat(1.0, window) / window
    return np.convolve(values, weights, 'valid')

print("\nPlotting training results...")
plot_results(log_dir)

---

**8. Further Steps and Ideas**

* **Tune Hyperparameters:** Experiment with `learning_rate`, `buffer_size`, `batch_size`, `gamma`, and the network architecture (e.g., `policy_kwargs=dict(net_arch=[256, 256])` when constructing SAC).
* **Try Different Tasks:** Use other `panda-gym` environments like `PandaPickAndPlace-v3`, `PandaPush-v3`, or `PandaSlide-v3`. These are significantly harder and require more training time and potentially different reward structures or hyperparameters.
* **Use Different Algorithms:** Try other SB3 algorithms like TD3 (Twin Delayed DDPG) or PPO (Proximal Policy Optimization).

In [None]:
# Example: Using TD3
# The observation is a dictionary, so TD3 also needs 'MultiInputPolicy'
# from stable_baselines3 import TD3
# model = TD3('MultiInputPolicy', env, verbose=1, tensorboard_log="./td3_panda_tensorboard/")
# model.learn(total_timesteps=TOTAL_TIMESTEPS)

* **Improve Reward Shaping:** For complex tasks, the default reward might not be sufficient. Designing a better reward function (reward shaping) can significantly speed up learning but requires careful consideration to avoid unintended behaviors.
* **Domain Randomization:** If aiming for transfer to a real robot, train with variations in physics parameters (mass, friction, etc.) to make the policy more robust.
* **Custom Environments:** Create your own tasks or use different robot models by building custom Gymnasium environments using PyBullet or other simulators like MuJoCo.
* **Explore Advanced Techniques:** Hierarchical RL, Imitation Learning (learning from demonstrations), Offline RL (learning from pre-collected datasets).
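The domain randomization idea from the list above can be sketched as sampling a fresh set of physics parameters at the start of every episode. The parameter names, nominal values, and ranges below are hypothetical placeholders; in a PyBullet-based setup, the sampled values would then have to be written into the simulation (for example through PyBullet's dynamics settings), which this sketch does not do.

```python
import random

# Hypothetical nominal parameters and relative spreads; real values depend on the robot.
NOMINAL = {"link_mass": 1.0, "lateral_friction": 0.8, "joint_damping": 0.1}
RELATIVE_RANGE = {"link_mass": 0.2, "lateral_friction": 0.3, "joint_damping": 0.5}


def sample_physics(rng):
    """Draw each parameter uniformly within +/- its relative spread around the nominal value."""
    params = {}
    for name, nominal in NOMINAL.items():
        spread = RELATIVE_RANGE[name]
        params[name] = nominal * rng.uniform(1.0 - spread, 1.0 + spread)
    return params


rng = random.Random(0)  # Seeded so the sampled episodes are reproducible
episode_params = [sample_physics(rng) for _ in range(5)]
for index, params in enumerate(episode_params):
    print(index, {name: round(value, 3) for name, value in params.items()})
```

Training across such variations pushes the policy to rely on behavior that works over the whole range instead of overfitting to one exact simulator configuration.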

---

**9. Resources**

* **Stable Baselines3 Documentation:** [https://stable-baselines3.readthedocs.io/](https://stable-baselines3.readthedocs.io/)
* **Gymnasium Documentation:** [https://gymnasium.farama.org/](https://gymnasium.farama.org/)
* **Panda-Gym Repository:** [https://github.com/qgallouedec/panda-gym](https://github.com/qgallouedec/panda-gym)
* **PyBullet:** [https://pybullet.org/](https://pybullet.org/)
* **Soft Actor-Critic Paper:** [https://arxiv.org/abs/1801.01290](https://arxiv.org/abs/1801.01290) (Original SAC) and [https://arxiv.org/abs/1812.05905](https://arxiv.org/abs/1812.05905) (SAC with automatic temperature tuning)

---

This Colab notebook provides a starting point. Remember that training RL agents for robotics can be computationally intensive and require significant tuning to achieve good performance, especially on complex tasks. Good luck!
