In [1]:
! git clone https://github.com/goin2crazy/mujoco_robotic_arm_playground

Cloning into 'mujoco_robotic_arm_playground'...
remote: Enumerating objects: 22, done.[K
remote: Counting objects: 100% (22/22), done.[K
remote: Compressing objects: 100% (15/15), done.[K
remote: Total 22 (delta 6), reused 21 (delta 5), pack-reused 0 (from 0)[K
Receiving objects: 100% (22/22), 28.18 KiB | 5.63 MiB/s, done.
Resolving deltas: 100% (6/6), done.


In [2]:
!pip install mujoco

# Set up GPU rendering.
from google.colab import files
import distutils.util
import os
import subprocess
# if subprocess.run('nvidia-smi').returncode:
#   raise RuntimeError(
#       'Cannot communicate with GPU. '
#       'Make sure you are using a GPU Colab runtime. '
#       'Go to the Runtime menu and select Choose runtime type.')

# Add an ICD config so that glvnd can pick up the Nvidia EGL driver.
# This is usually installed as part of an Nvidia driver package, but the Colab
# kernel doesn't install its driver via APT, and as a result the ICD is missing.
# (https://github.com/NVIDIA/libglvnd/blob/master/src/EGL/icd_enumeration.md)
NVIDIA_ICD_CONFIG_PATH = '/usr/share/glvnd/egl_vendor.d/10_nvidia.json'
if not os.path.exists(NVIDIA_ICD_CONFIG_PATH):
  with open(NVIDIA_ICD_CONFIG_PATH, 'w') as f:
    f.write("""{
    "file_format_version" : "1.0.0",
    "ICD" : {
        "library_path" : "libEGL_nvidia.so.0"
    }
}
""")

# Configure MuJoCo to use the EGL rendering backend (requires GPU)
print('Setting environment variable to use GPU rendering:')
%env MUJOCO_GL=egl

# Check if installation was succesful.
try:
  print('Checking that the installation succeeded:')
  import mujoco
  mujoco.MjModel.from_xml_string('<mujoco/>')
except Exception as e:
  raise e from RuntimeError(
      'Something went wrong during installation. Check the shell output above '
      'for more information.\n'
      'If using a hosted Colab runtime, make sure you enable GPU acceleration '
      'by going to the Runtime menu and selecting "Choose runtime type".')

print('Installation successful.')

# Other imports and helper functions
import time
import itertools
import numpy as np

# Graphics and plotting.
print('Installing mediapy:')
!command -v ffmpeg >/dev/null || (apt update && apt install -y ffmpeg)
!pip install -q mediapy
import mediapy as media
import matplotlib.pyplot as plt

# More legible printing from numpy.
np.set_printoptions(precision=3, suppress=True, linewidth=100)

from IPython.display import clear_output
clear_output()


In [3]:
%cd mujoco_robotic_arm_playground

import mujoco
import numpy as np
import cv2
import os
import time  # Import the time module
import logging  # Import the logging module

from utils import *
from observation import get_observation
from states import *


/content/mujoco_robotic_arm_playground


In [4]:
from google.colab.patches import cv2_imshow

duration = 3.8  # (seconds)
framerate = 60  # (Hz)

def visualize_mujoco(model, data):
    """
    Visualizes the MuJoCo model using OpenCV.
    """


    # Simulate and display video.
    frames = []
    mujoco.mj_resetData(model, data)  # Reset state and time.

    # Initialize the renderer.
    with mujoco.Renderer(model) as renderer:
        start_time = time.time()
        egg_start_pos = data.xpos[get_body_id(model, "egg")][:2].copy()  # stores the initial xy position of the egg.
        if np.any(np.isnan(egg_start_pos)):
            egg_start_pos = np.array([0, 0])

        egg_dist_to_target = 99999
        # Main simulation loop
        while data.time < duration:
            # Simulate the model.
            try:
                mujoco.mj_step(model, data)
            except Exception as e:
                logging.error(f"Error in mujoco.mj_step: {e}")
                break  # Exit the loop on error

            if len(frames) < data.time * framerate:

              # Update and render the scene.
              mujoco.mj_forward(model, data)
              renderer.update_scene(data)
              img = renderer.render()  # Get the rendered image
              frames.append(img)

              # Convert the image to a format OpenCV can use (BGR)
              # img_bgr = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)

              # Display the image using OpenCV
              # cv2_imshow(img_bgr)
              # cv2.waitKey(1)  # 1 millisecond delay for real-time update

              # Get and print the observation vector.
              observation = get_observation(model, data)
              rewards, egg_dist_to_target = reward_function(model, data, prev_dist=egg_dist_to_target)

              logging.info(f"Observation: {observation}")

              # Print contact information.  Consider using logging here as well.
              logging.info(f"Egg at the start: {egg_at_the_start(model, data)}")
              logging.info(f"Egg on the floor: {egg_on_the_floor(model, data)}")
              logging.info(f"Egg at the holding: {egg_at_the_holding(model, data)}")
              logging.info(f"Egg in target: {egg_in_target(model, data)}")

              # Check for session end.
              done, addictional_reward = check_session_end(model, data, start_time, egg_start_pos)
              rewards += addictional_reward

              logging.info(f"Current rewards: {rewards}")

              if done:
                  break
    media.show_video(frames, fps=framerate)


In [5]:
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
try:
    model, data = load_model_and_data("egg_test.xml")
    visualize_mujoco(model, data)
except Exception as e:
    logging.critical(f"An error occurred: {e}")  # Log the error and exit

0
This browser does not support the video tag.


In [6]:
! pip install -q stable-baselines3 gymnasium

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m184.5/184.5 kB[0m [31m5.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m363.4/363.4 MB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.8/13.8 MB[0m [31m42.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m24.6/24.6 MB[0m [31m40.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m883.7/883.7 kB[0m [31m41.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m664.8/664.8 MB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m211.5/211.5 MB[0m [31m6.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m56.3/56.3 MB[0m [31m12.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [7]:
import gymnasium as gym
from stable_baselines3.common.env_checker import check_env

In [8]:
import numpy as np

def roughness_penalty(actions, max_penalty=1.0, min_penalty=-1.0):
    """
    Calculate a penalty based on how 'rough' the robot's actions are.

    Parameters:
    - actions: np.array or torch.Tensor of actions (e.g. torques)
    - max_penalty: float, maximum penalty when action is extremely large
    - min_penalty: float, least penalty when action is small or gentle

    Returns:
    - penalty: float, negative reward for roughness
    """
    # Convert to numpy if it's a torch tensor
    if not isinstance(actions, np.ndarray):
        actions = actions.detach().cpu().numpy()

    # Measure "roughness" by L2 norm (magnitude) of action vector
    roughness = np.linalg.norm(actions)

    # Normalize the roughness to 0...1 scale (you can tune this)
    normalized = np.clip(roughness / 5.0, 0.0, 1.0)  # assuming 5.0 is "very rough"

    # Linearly map to penalty between min and max
    penalty = - (min_penalty + (max_penalty - min_penalty) * normalized)
    return penalty


In [14]:
from gymnasium import spaces

class MujocoRobotArmEnv(gym.Env):
    """
    A Gymnasium environment for controlling a MuJoCo robot arm.
    """

    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 60}

    def __init__(self, model_path="your_model.xml"):  # Add model_path
        """
        Initializes the environment.
        Args:
            model_path (str): Path to the MuJoCo XML model file.
        """
        super().__init__()

        # Load the MuJoCo model.
        try:
            self.model = mujoco.MjModel.from_xml_path(model_path)
        except Exception as e:
            raise ValueError(f"Error loading MuJoCo model from {model_path}: {e}")

        self.data = mujoco.MjData(self.model)

        # Define action and observation spaces.  CRITICAL.
        # Example:  Action space is the joint torque limits.
        # low = self.model.actuator_ctrlrange[:, 0]
        # high = self.model.actuator_ctrlrange[:, 1]
        # self.action_space = spaces.Box(low=low, high=high, dtype=np.float32)

        # Example: Observation space is joint positions and velocities
        # num_joints = self.model.njnt
        # obs_dim = num_joints * 2  # For positions and velocities
        # self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(obs_dim,), dtype=np.float32)
        self.action_space = spaces.Box(low=-1, high=1, shape=self.data.ctrl.shape, dtype=np.float32) # Placeholder
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=get_observation(self.model, self.data).shape, dtype=np.float64) # Placeholder

        self.renderer = None  # Initialize renderer lazily

        self._time = 0  # Track the current time in the simulation.
        self.egg_start_pos = None # store the initial position of the egg

    def step(self, action):
        """
        Simulates one step of the environment.

        Args:
            action (np.ndarray): The action to take.

        Returns:
            tuple: (observation, reward, terminated, truncated, info)
        """
        # Apply the action (e.g., set joint torques).
        assert action.shape == self.action_space.shape # Important: Check shape

        try:
            self.data.ctrl[:] = action  # Apply action (replace with your control logic)
            mujoco.mj_step(self.model, self.data)
        except Exception as e:
            logging.error(f"Error in mj_step: {e}")
            # Handle the error appropriately (e.g., set terminated/truncated, return a default observation)
            observation = np.zeros(self.observation_space.shape)  # Or some other safe value
            return observation, 0, True, False, {"error": str(e)}

        self._time = self.data.time # update the time

        # Get the observation.
        observation = get_observation(self.model, self.data)

        # Calculate the reward.
        reward, _ = reward_function(self.model, self.data, prev_dist=9999) #TODO prev_dist

        # Penalty for motions roughness
        roughtness_penalty = roughness_penalty(action)
        reward += roughtness_penalty

        # Check for the end of the session.
        terminated, additional_reward = check_session_end(self.model, self.data, time.time(), self.egg_start_pos) #TODO start time

        reward += additional_reward

        info = {
            "egg_at_start": egg_at_the_start(self.model, self.data),
            "egg_on_floor": egg_on_the_floor(self.model, self.data),
            "egg_at_holding": egg_at_the_holding(self.model, self.data),
            "egg_in_target": egg_in_target(self.model, self.data),
            "time": self._time,
        }

        # Important:  Return a valid tuple, even if there's an error.
        return observation, reward, terminated, False, info  # truncated is always false

    def reset(self, *, seed=None, options=None):
        """
        Resets the environment to its initial state.

        Args:
            seed (int, optional): Random seed.
            options (dict, optional): Additional reset options.

        Returns:
            tuple: (observation, info)
        """
        super().reset(seed=seed, options=options)  # Handle seed

        # Reset MuJoCo state.
        mujoco.mj_resetData(self.model, self.data)
        mujoco.mj_forward(self.model, self.data) # Forward simulation to ensure initial state is correct

        self._time = 0  # Reset time
        self.egg_start_pos = self.data.xpos[get_body_id(self.model, "egg")][:2].copy()  # stores the initial xy position of the egg.
        if np.any(np.isnan(self.egg_start_pos)):
            self.egg_start_pos = np.array([0, 0])

        # Get the initial observation.
        observation = get_observation(self.model, self.data)

        info = {}  # Add any relevant info here

        return observation, info

    def render(self, mode="human"):
        """
        Renders the environment.

        Args:
            mode (str): The rendering mode ("human" or "rgb_array").

        Returns:
            np.ndarray or None: The rendered image if mode is "rgb_array", None otherwise.
        """
        if mode not in self.metadata["render_modes"]:
            raise ValueError(f"Invalid render mode: {mode}")

        if self.renderer is None:
            self.renderer = mujoco.Renderer(self.model)

        # Update the scene.
        mujoco.mj_forward(self.model, self.data) #  Make sure data is consistent before rendering
        self.renderer.update_scene(self.data)

        if mode == "human":
            self.renderer.render()  # Render to the default GLFW window
            return None
        elif mode == "rgb_array":
            img = self.renderer.render()
            return img  # Return the raw image data
        else:
            return None

    def close(self):
        """
        Closes the environment and releases resources.
        """
        if self.renderer:
            self.renderer.close()
        self.renderer = None
        # No need to close self.model or self.data, they don't have close() methods in mujoco

    def __del__(self):
        # Ensure resources are cleaned up.  Important for preventing memory leaks.
        self.close()


roboenv_1 = MujocoRobotArmEnv("egg_final.xml")
check_env(roboenv_1)

In [15]:
import torch
from stable_baselines3 import DDPG
from stable_baselines3.common.noise import OrnsteinUhlenbeckActionNoise  # Better for physical systems
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.callbacks import EvalCallback, CheckpointCallback
from stable_baselines3.common.vec_env import DummyVecEnv
import numpy as np


# Environment setup
roboenv_1 = make_vec_env(lambda: MujocoRobotArmEnv("egg_final.xml"), n_envs=4)  # Parallel environments

In [16]:

import torch as th
import torch.nn as nn
from gymnasium import spaces

from stable_baselines3 import PPO
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor


class CustomFeatureExtractor(BaseFeaturesExtractor):
    """
    :param observation_space: (gym.Space)
    :param features_dim: (int) Number of features extracted.
        This corresponds to the number of unit for the last layer.
    """

    def __init__(self, observation_space: spaces.Box, features_dim: int = 256, hidden_dim=512):
        super().__init__(observation_space, features_dim)
        # We assume CxHxW images (channels first)
        # Re-ordering will be done by pre-preprocessing or wrapper
        n_input_channels = observation_space.shape[0]
        self.mlp = nn.Sequential(
            nn.Linear(n_input_channels, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, features_dim),
            nn.ReLU(),
        )

        # Compute shape by doing one forward pass
        with th.no_grad():
            n_flatten = self.mlp(
                th.as_tensor(observation_space.sample()[None]).float()
            ).shape[1]

        self.linear = nn.Sequential(nn.Linear(n_flatten, features_dim), nn.ReLU())

    def forward(self, observations: th.Tensor) -> th.Tensor:
        return self.linear(self.mlp(observations))

In [17]:
# Optimized noise parameters
n_actions = roboenv_1.action_space.shape[-1]
noise_sigma = 0.2 * np.ones(n_actions)
action_noise = OrnsteinUhlenbeckActionNoise(
    mean=np.zeros(n_actions),
    sigma=noise_sigma,
    theta=0.15,  # Higher theta = faster noise decay
    dt=1e-2
)


policy_kwargs = dict(
    features_extractor_class=CustomFeatureExtractor,
)

# Create model
model = DDPG("MlpPolicy",
             roboenv_1,
             action_noise=action_noise,
             policy_kwargs=policy_kwargs,
             verbose=1,
             tensorboard_log="./ddpg_logs/")

Using cuda device


In [None]:

# Add input normalization
from stable_baselines3.common.preprocessing import get_flattened_obs_dim
from stable_baselines3.common.vec_env import VecNormalize

roboenv_1 = VecNormalize(roboenv_1, norm_obs=True, norm_reward=True)

# Callbacks
eval_callback = EvalCallback(
    roboenv_1,
    best_model_save_path="./best_model/",
    log_path="./logs/",
    eval_freq=10_000,
    deterministic=True,
    render=False
)

checkpoint_callback = CheckpointCallback(
    save_freq=50_000,
    save_path="./checkpoints/",
    name_prefix="ddpg_model"
)

# Train with progressive noise decay
model.learn(
    total_timesteps=1_000_000,
    callback=[eval_callback, checkpoint_callback],
    log_interval=4,
    progress_bar=True
)

# Save final model
model.save("ddpg_robotic_arm")

Logging to ./ddpg_logs/DDPG_1


Output()



---------------------------------
| rollout/           |          |
|    ep_len_mean     | 420      |
|    ep_rew_mean     | -597     |
| time/              |          |
|    episodes        | 4        |
|    fps             | 165      |
|    time_elapsed    | 11       |
|    total_timesteps | 1908     |
| train/             |          |
|    actor_loss      | 1.71     |
|    critic_loss     | 0.0696   |
|    learning_rate   | 0.001    |
|    n_updates       | 451      |
---------------------------------
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 421      |
|    ep_rew_mean     | -617     |
| time/              |          |
|    episodes        | 8        |
|    fps             | 172      |
|    time_elapsed    | 20       |
|    total_timesteps | 3596     |
| train/             |          |
|    actor_loss      | 4.74     |
|    critic_loss     | 14.7     |
|    learning_rate   | 0.001    |
|    n_updates       | 873      |
--------------

---------------------------------
| eval/              |          |
|    mean_ep_length  | 433      |
|    mean_reward     | -683     |
| time/              |          |
|    total_timesteps | 40000    |
| train/             |          |
|    actor_loss      | 52.6     |
|    critic_loss     | 2.25     |
|    learning_rate   | 0.001    |
|    n_updates       | 9974     |
---------------------------------


---------------------------------
| rollout/           |          |
|    ep_len_mean     | 430      |
|    ep_rew_mean     | -624     |
| time/              |          |
|    episodes        | 76       |
|    fps             | 165      |
|    time_elapsed    | 250      |
|    total_timesteps | 41568    |
| train/             |          |
|    actor_loss      | 56.4     |
|    critic_loss     | 1.05     |
|    learning_rate   | 0.001    |
|    n_updates       | 10366    |
---------------------------------
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 431      |
|    ep_rew_mean     | -624     |
| time/              |          |
|    episodes        | 80       |
|    fps             | 166      |
|    time_elapsed    | 260      |
|    total_timesteps | 43300    |
| train/             |          |
|    actor_loss      | 56.9     |
|    critic_loss     | 0.406    |
|    learning_rate   | 0.001    |
|    n_updates       | 10799    |
--------------

---------------------------------
| eval/              |          |
|    mean_ep_length  | 433      |
|    mean_reward     | -683     |
| time/              |          |
|    total_timesteps | 80000    |
| train/             |          |
|    actor_loss      | 63.7     |
|    critic_loss     | 0.26     |
|    learning_rate   | 0.001    |
|    n_updates       | 19974    |
---------------------------------
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 432      |
|    ep_rew_mean     | -627     |
| time/              |          |
|    episodes        | 104      |
|    fps             | 153      |
|    time_elapsed    | 542      |
|    total_timesteps | 83324    |
| train/             |          |
|    actor_loss      | 59.5     |
|    critic_loss     | 0.217    |
|    learning_rate   | 0.001    |
|    n_updates       | 20805    |
---------------------------------
---------------------------------
| rollout/           |          |
|    ep_len_me