https://colab.research.google.com/github/google-deepmind/mujoco/blob/main/python/tutorial.ipynb#scrollTo=3jHYTV-bwfrS

blob:https://github.com/b2fdb28b-7584-4d19-b79f-111062fc964f


In [None]:
import os
!apt update && apt install -y \
    libosmesa6-dev \
    libgl1-mesa-glx \
    libglfw3 \
    patchelf \
    python3-dev

# install mujoco-python bindings and other libraries
!pip install mujoco mujoco-py gym matplotlib numpy robot_descriptions


!wget -q https://mujoco.org/download/mujoco210-linux-x86_64.tar.gz
!tar -xvf mujoco210-linux-x86_64.tar.gz -C /root/.mujoco/
!rm mujoco210-linux-x86_64.tar.gz


os.environ['LD_LIBRARY_PATH'] = "/root/.mujoco/mujoco210/bin:/usr/lib/nvidia"
os.environ['MUJOCO_GL'] = "egl"
os.environ['PATH'] += ":/root/.mujoco/mujoco210/bin"

Get:1 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Hit:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Hit:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Get:6 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:7 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [127 kB]
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:11 https://r2u.stat.illinois.edu/ubuntu jammy/main all Packages [8,527 kB]
Get:12 https://r2u.stat.illinois.edu/ubuntu jammy/main amd64 Packages [2,626 kB]
Fetched 11.5 MB in 6s (1,789 kB/s)
Reading package lists... Done
Building dependency tree...

In [None]:
import numpy as np
import mujoco
from robot_descriptions.loaders.mujoco import load_robot_description
from scipy.spatial.transform import Rotation as R
import imageio
import matplotlib.pyplot as plt
from PIL import Image, ImageDraw  # Import PIL for drawing
from scipy.signal import find_peaks


def get_jacobian_mujoco(model, data):
    """Get the geometric Jacobian from MuJoCo for position only."""
    jacp = np.zeros((3, model.nv))
    ee_body_id = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_BODY, "hand")
    mujoco.mj_jac(model, data, jacp, None, data.xpos[ee_body_id], ee_body_id)
    return jacp[:, :7], ee_body_id


def get_current_state(data, ee_body_id):
    """Get current end-effector position."""
    ee_pos = data.xpos[ee_body_id]
    return ee_pos


def generate_circular_trajectory(radius, omega, z_fixed, max_reach, time_steps):
    """Generates a circular trajectory in task space."""
    t = np.linspace(0, 2 * np.pi, time_steps)
    x = max_reach + radius * np.cos(omega * t)
    y = radius * np.sin(omega * t)
    z = np.full_like(t, z_fixed)
    return np.column_stack((x, y, z))


def compute_control_effort(delta_q_history):
    """Compute the cumulative control effort."""
    effort = np.sum([np.linalg.norm(delta_q)**2 for delta_q in delta_q_history])
    return effort


def apply_wind_force(data, ee_body_id, wind_strength=0.5):
    """Apply a simulated wind disturbance to the end-effector."""
    wind = wind_strength * np.random.normal(-1, 1, size=3)
    data.xfrc_applied[ee_body_id, :3] += wind


def plot_error_signal(time_history, error_history, peaks_positive, peaks_negative, oscillation_threshold, Kp, axis):
    """Utility function to plot the signed error signal and detected peaks."""
    plt.figure(figsize=(12, 6))
    plt.plot(time_history, error_history, label='Signed Error')
    plt.plot(np.array(time_history)[peaks_positive], np.array(error_history)[peaks_positive], "x", label='Positive Peaks')
    plt.plot(np.array(time_history)[peaks_negative], np.array(error_history)[peaks_negative], "o", label='Negative Peaks')
    plt.axhline(y=oscillation_threshold, color='r', linestyle='--', label='Positive Threshold')
    plt.axhline(y=-oscillation_threshold, color='r', linestyle='--', label='Negative Threshold')
    plt.title(f'Signed Error Signal for Kp = {Kp} on {axis.upper()} Axis')
    plt.xlabel('Time (s)')
    plt.ylabel('Signed Error')
    plt.legend()
    plt.grid(True)
    plt.show()


class PIDController:
    def __init__(self, Kp=0.0, Ki=0.0, Kd=0.0, integral_limit=1.0):
        """
        Initializes the PID Controller for a single axis.

        Args:
            Kp (float): Proportional gain.
            Ki (float): Integral gain.
            Kd (float): Derivative gain.
            integral_limit (float): Limit for the integral term to prevent windup.
        """
        self.Kp = Kp
        self.Ki = Ki
        self.Kd = Kd
        self.integral_limit = integral_limit
        self.error_sum = 0.0  # For more stable integral term
        self.prev_error = None
        self.derivative_filter = 0.1  # Low-pass filter coefficient

    def compute(self, error, error_dot, dt):
        """
        Computes the PID control effort.

        Args:
            error (float): Current error.
            error_dot (float): Derivative of error.
            dt (float): Time step.

        Returns:
            float: Control effort.
        """
        # Proportional term
        p_term = self.Kp * error

        # Improved integral term with decay
        self.error_sum = 0.95 * self.error_sum + error * dt  # Decay factor
        self.error_sum = np.clip(self.error_sum, -self.integral_limit, self.integral_limit)
        i_term = self.Ki * self.error_sum

        # Filtered derivative term
        if self.prev_error is None:
            self.prev_error = error
        filtered_error_dot = (self.derivative_filter * error_dot +
                            (1 - self.derivative_filter) * (error - self.prev_error) / dt)
        d_term = self.Kd * filtered_error_dot
        self.prev_error = error

        # Total control effort
        u = p_term + i_term + d_term

        return u


class AdaptivePIDController(PIDController):
    def __init__(self, initial_Ku, initial_Pu, adaptation_rate=0.1, integral_limit=1.0):
        # Adjust initial gains to be more conservative
        Kp_init = 0.3 * initial_Ku  # Reduced from 0.6
        Ki_init = 0.6 * initial_Ku / initial_Pu if initial_Pu != 0 else 0  # Reduced from 1.2
        Kd_init = 0.075 * initial_Ku * initial_Pu  # Keep the same

        super().__init__(Kp=Kp_init, Ki=Ki_init, Kd=Kd_init, integral_limit=integral_limit)

        # Reduce adaptation rate and add axis-specific bounds
        self.gamma = adaptation_rate * 0.1  # Reduced adaptation rate
        self.error_integral = 0.0

        # Initialize parameter estimates
        self.Kp_hat = Kp_init
        self.Ki_hat = Ki_init
        self.Kd_hat = Kd_init

        # Store baseline values for reference
        self.Ku = initial_Ku
        self.Pu = initial_Pu

    def adapt_gains(self, error, error_dot):
        """
        Update controller gains using Lyapunov-based adaptation laws
        """
        # Compute Lyapunov function components
        V = 0.5 * (error**2 + self.error_integral**2 + error_dot**2)

        # Compute adaptation laws based on Lyapunov stability
        dKp = -self.gamma * error * V
        dKi = -self.gamma * self.error_integral * V
        dKd = -self.gamma * error_dot * V

        # Update gains with stability bounds
        self.Kp = np.clip(self.Kp_hat + dKp, 0, 2 * self.Ku)
        self.Ki = np.clip(self.Ki_hat + dKi, 0, 4 * self.Ku / self.Pu if self.Pu != 0 else 0)
        self.Kd = np.clip(self.Kd_hat + dKd, 0, 0.25 * self.Ku * self.Pu)

        # Update estimates for next iteration
        self.Kp_hat = self.Kp
        self.Ki_hat = self.Ki
        self.Kd_hat = self.Kd

        # Update error integral for Ki adaptation
        self.error_integral = 0.95 * self.error_integral + error  # With forgetting factor

    def compute(self, error, error_dot, dt):
        """
        Compute control signal and adapt gains
        """
        # Adapt gains using Lyapunov-based method
        self.adapt_gains(error, error_dot)

        # Use parent class to compute control signal with adapted gains
        return super().compute(error, error_dot, dt)

def find_ku_pu_per_axis(model, initial_qpos, target_pos, ee_body_id, joint_indices, max_steps=2500, dt=0.01,
                        Kp_min=0, Kp_max=75, initial_Kp_step=0.1,
                        oscillation_threshold_multiplier=10, required_oscillations=13,
                        min_peak_distance=50, plot=True, discount_time=2.0,
                        max_divergences=100):
    """
    Determines Ku and Pu for each position axis individually (3 position axes).
    If an axis diverges max_divergences times, it skips to the next axis and sets Kp to zero for that axis.

    Returns:
        Ku: array of ultimate gains per axis.
        Pu: array of ultimate periods per axis.
    """
    Ku = np.zeros(3)  # 3 position axes
    Pu = np.zeros(3)  # 3 position axes

    axes = ['x', 'y', 'z']

    for axis in range(3):
        print(f"\nDetermining Ku and Pu for {axes[axis].upper()} axis ({axis + 1}/3)")
        Kp = Kp_min
        axis_Ku = None
        axis_Pu = None
        divergence_count = 0  # Initialize divergence counter for the current axis

        while Kp <= Kp_max:
            print(f"  Testing Kp = {Kp:.2f} for {axes[axis].upper()} axis")
            # Initialize simulation data
            data = mujoco.MjData(model)
            data.qpos[joint_indices] = initial_qpos.copy()
            data.qvel[joint_indices] = 0
            mujoco.mj_forward(model, data)

            # Initialize PID controller with current Kp, Ki=0, Kd=0 for this axis
            pid = PIDController(Kp=0.0, Ki=0.0, Kd=0.0)
            pid.Kp = 0.0
            pid.Ki = 0.0
            pid.Kd = 0.0
            pid.Kp = Kp  # Apply Kp only to the current axis

            error_history = []
            time_history = []
            prev_error = 0.0

            # Run simulation for a fixed duration to observe behavior
            for step in range(max_steps):
                # Compute control effort
                ee_pos = get_current_state(data, ee_body_id)

                # Construct the error for the current axis
                error = target_pos[axis] - ee_pos[axis]

                # Compute derivative of error
                if step == 0:
                    error_dot = 0.0
                else:
                    error_dot = (error - prev_error) / dt
                prev_error = error

                # Compute control effort
                u = pid.compute(error, error_dot, dt)

                # Get Jacobian
                jacp, _ = get_jacobian_mujoco(model, data)
                jacobian = jacp  # (3,7)

                # Extract the Jacobian row corresponding to the current axis
                jacobian_axis = jacobian[axis, :]  # (7,)

                # Map to joint velocities using damped least squares for the current axis
                try:
                    lambda_reg = 0.01  # Adjust as needed
                    J_pinv = jacobian_axis.T / (jacobian_axis @ jacobian_axis.T + lambda_reg)
                    delta_q = J_pinv * u
                except np.linalg.LinAlgError:
                    print("    Singular matrix encountered while computing pseudoinverse.")
                    break

                # Define maximum joint velocities
                max_velocity = 4.0  # radians per second, adjust as appropriate

                # Before setting qvel
                delta_q = np.clip(delta_q, -max_velocity, max_velocity)
                data.qvel[joint_indices] = delta_q

                # Update joint positions and velocities
                data.qpos[joint_indices] += data.qvel[joint_indices] * dt
                # Enforce joint limits
                data.qpos[joint_indices] = np.clip(data.qpos[joint_indices],
                                                   model.jnt_range[joint_indices, 0],
                                                   model.jnt_range[joint_indices, 1])
                mujoco.mj_forward(model, data)

                # Record signed error for the current axis only
                error_history.append(error)
                time_history.append(step * dt)

                # Early termination if system diverges
                if np.abs(error) > 5 * np.abs(target_pos[axis]):
                    print(f"    Kp = {Kp} caused system divergence on {axes[axis].upper()} axis.")
                    Kp += initial_Kp_step * 5
                    divergence_count += 1
                    break  # Exit the simulation loop for the current Kp

            # Check if divergence threshold is reached
            if divergence_count >= max_divergences:
                print(f"  Exceeded maximum divergences for {axes[axis].upper()} axis. Setting Kp to 0.")
                Ku[axis] = 0.0
                Pu[axis] = 0.1
                break  # Move to the next axis

            # Discount the initial transient response
            valid_indices = [i for i, t in enumerate(time_history) if t > discount_time]
            if not valid_indices:
                print(f"    Not enough data after discount_time for {axes[axis].upper()} axis.")
                Kp += 5 * initial_Kp_step
                continue  # Move to the next Kp value

            filtered_time_history = np.array(time_history)[valid_indices]
            filtered_error_array = np.array(error_history)[valid_indices]

            # Analyze filtered error_history for peaks
            initial_segment_length = max(int(0.1 * len(filtered_error_array)), 1)
            initial_segment = filtered_error_array[:initial_segment_length]
            initial_error_std = np.std(initial_segment) if len(initial_segment) > 0 else 1e-3
            oscillation_threshold = oscillation_threshold_multiplier * initial_error_std

            # Detect peaks (both positive and negative)
            peaks_positive, _ = find_peaks(filtered_error_array, distance=min_peak_distance)
            peaks_negative, _ = find_peaks(-filtered_error_array, distance=min_peak_distance)
            total_peaks = len(peaks_positive) + len(peaks_negative)
            print(f"    Detected {total_peaks} peaks with threshold ±{oscillation_threshold:.4f} (Std: {initial_error_std:.4f}).")

            # Optional plotting
            if plot:
                plot_error_signal(filtered_time_history, filtered_error_array, peaks_positive, peaks_negative, oscillation_threshold, Kp, axes[axis])

            # Check for sufficient oscillations to determine Ku and Pu
            if total_peaks >= required_oscillations:
                # Calculate Pu from consecutive peaks
                peak_times = np.sort(np.concatenate((filtered_time_history[peaks_positive], filtered_time_history[peaks_negative])))
                if len(peak_times) >= 2:
                    periods = np.diff(peak_times)
                    Pu_axis = np.mean(periods[-required_oscillations:])  # Average the last few periods
                else:
                    Pu_axis = None
                axis_Ku = Kp
                axis_Pu = Pu_axis
                print(f"    Confirmed Ku = {axis_Ku:.2f}, Pu = {Pu_axis:.2f} for {axes[axis].upper()} axis.")
                break  # Move to the next axis after successful determination

            # Update Kp and step size
            Kp += initial_Kp_step

        if axis_Ku is not None and axis_Pu is not None:
            Ku[axis] = axis_Ku
            Pu[axis] = Pu_axis
        else:
            if divergence_count < max_divergences:
                print(f"    Failed to determine Ku and Pu for {axes[axis].upper()} axis. Setting Kp to 0.")
                Ku[axis] = 0.0
                Pu[axis] = 0.0
            # Else, already set Ku and Pu to 0 above due to max divergences

    return Ku, Pu

def run_simulation(Ku, Pu, max_iterations=50000, tolerance=1e-3, tune_params=False):
    """Run the simulation with different PID tuning methods and save videos."""
    err_hist = {}
    delta_q_history = {}
    effort = {}
    gain_histories = {}
    actual_positions = {}  # To store actual positions per method

    tuning_methods = ['ziegler-nichols', 'chao', 'adaptive']
    axes = ['x', 'y', 'z']

    # Initialize all tracking dictionaries
    for method in tuning_methods:
        err_hist[method] = {axis: [] for axis in axes}
        delta_q_history[method] = {axis: [] for axis in axes}
        actual_positions[method] = {axis: [] for axis in axes}
        effort[method] = {axis: 0.0 for axis in axes}
        if method == 'adaptive':
            gain_histories[method] = {
                'Kp': {axis: [] for axis in axes},
                'Ki': {axis: [] for axis in axes},
                'Kd': {axis: [] for axis in axes}
            }

    # Load the model once
    model = load_robot_description("panda_mj_description")
    ee_body_id = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_BODY, "hand")
    initial_qpos = np.array([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])
    joint_start = model.jnt_dofadr[0]
    num_joints = 7
    joint_indices = np.arange(joint_start, joint_start + num_joints)

    for method in tuning_methods:
        print(f"\nRunning simulation with {method.replace('-', ' ').capitalize()} tuning...")
        # Initialize simulation data
        data = mujoco.MjData(model)
        data.qpos[joint_indices] = initial_qpos.copy()
        data.qvel[joint_indices] = 0
        mujoco.mj_forward(model, data)

        # Initialize prev_error for each axis
        prev_error = {axis: 0.0 for axis in axes}

        # Initialize PID controllers for each axis
        pid_controllers = {}
        if method == 'adaptive':
            for i, axis in enumerate(axes):
                pid_controllers[axis] = AdaptivePIDController(
                    initial_Ku=Ku[i],
                    initial_Pu=Pu[i],
                    adaptation_rate=0.1
                )
        else:
            for i, axis in enumerate(axes):
                pid_controllers[axis] = PIDController(
                    Kp=0.6 * Ku[i] if method == 'ziegler-nichols' else 0.2 * Ku[i],
                    Ki=1.2 * Ku[i] / Pu[i] if method == 'ziegler-nichols' else 0.3636 * Ku[i] / Pu[i],
                    Kd=0.075 * Ku[i] * Pu[i] if method == 'ziegler-nichols' else 0.066 * Ku[i] * Pu[i]
                )

        # Simulation parameters
        lambda_reg = 0.01
        dt = model.opt.timestep  # Use MuJoCo's simulation timestep

        # Generate trajectory
        trajectory = generate_circular_trajectory(
            radius=0.2,  # Reduced radius
            omega=2.5,   # Slower motion
            z_fixed=0.2,
            max_reach=0.3,  # Reduced reach
            time_steps=max_iterations
        )

        # Begin simulation
        for step in range(max_iterations):
            # Get Jacobian
            jacp, _ = get_jacobian_mujoco(model, data)
            jacobian = jacp  # (3,7)
            mujoco.mj_forward(model, data)
            apply_wind_force(data, ee_body_id)  # Uncomment if wind disturbance is desired

            # Get current trajectory point
            target_cartesian_pos = trajectory[step % len(trajectory)]

            # Get current state and compute error for each axis
            ee_pos = get_current_state(data, ee_body_id)
            error_pos = target_cartesian_pos - ee_pos

            # Create combined control vector
            u_combined = np.zeros(3)

            # Compute control effort for each axis
            for i, axis in enumerate(axes):
                axis_name = axes[i]
                error = error_pos[i]
                error_dot = (error - prev_error[axis_name]) / dt
                prev_error[axis_name] = error

                # Adapt gains if using adaptive controller
                if method == 'adaptive':
                    pid_controllers[axis_name].adapt_gains(error, error_dot)

                # Compute control effort for the current axis
                u_combined[i] = pid_controllers[axis_name].compute(error, error_dot, dt)

            # Compute joint velocities using damped least squares for the full Jacobian
            lambda_reg = 0.01  # Consider increasing this for better stability
            J_pinv = jacobian.T @ np.linalg.inv(jacobian @ jacobian.T + lambda_reg * np.eye(3))
            delta_q = J_pinv @ u_combined

            # Increase damping in unstable regions
            if np.linalg.cond(jacobian @ jacobian.T) > 1e3:
                lambda_reg *= 10

            # Consider reducing max velocity for better stability
            max_velocity = 2.0  # Reduced from 4.0
            delta_q = np.clip(delta_q, -max_velocity, max_velocity)
            data.qvel[joint_indices] = delta_q

            # Update joint positions
            data.qpos[joint_indices] += data.qvel[joint_indices] * dt
            data.qpos[joint_indices] = np.clip(data.qpos[joint_indices],
                                             model.jnt_range[joint_indices, 0],
                                             model.jnt_range[joint_indices, 1])
            mujoco.mj_forward(model, data)

            # Record delta_q for each axis
            for i, axis in enumerate(axes):
                err_hist[method][axis].append(np.abs(error_pos[i]))
                actual_positions[method][axis].append(ee_pos[i])
                delta_q_history[method][axis].append(delta_q.copy())  # Make sure to record delta_q

            # Add break condition if all axes have converged
            if all(err_hist[method][axis][-1] < tolerance for axis in axes):
                print(f"Converged for method {method} at step {step}")
                break

        # Compute cumulative control effort per axis
        effort[method] = {axis: compute_control_effort(delta_q_history[method][axis]) for axis in axes}

        # Save the gain histories for the adaptive controller
        if method == 'adaptive':
            gain_histories[method] = {
                'Kp': {axis: pid_controllers[axis].Kp for axis in axes},
                'Ki': {axis: pid_controllers[axis].Ki for axis in axes},
                'Kd': {axis: pid_controllers[axis].Kd for axis in axes}
            }

        # After the simulation loop, save the figures
        for method in tuning_methods:
            # Position tracking plots
            fig_method, axs_method = plt.subplots(3, 1, figsize=(15, 18))
            fig_method.suptitle(f'End Effector Position Tracking - {method.replace("-", " ").capitalize()} Control', fontsize=16)

            for i, axis in enumerate(axes):
                ax = axs_method[i]
                actual = np.array(actual_positions[method][axis])
                desired = trajectory[:len(actual), i]

                ax.plot(actual, label='Actual', color='blue')
                ax.plot(desired, linestyle='--', label='Desired', color='orange')
                ax.set_title(f'End Effector {axis.upper()} Position')
                ax.set_xlabel('Timestep')
                ax.set_ylabel(f'{axis.upper()} Position (m)')
                ax.legend()
                ax.grid(True)

            plt.tight_layout(rect=[0, 0.03, 1, 0.95])
            plt.savefig(f'position_tracking_{method}.png')
            plt.close()

            # Controller effort plots
            fig_effort, axs_effort = plt.subplots(3, 1, figsize=(15, 18))
            fig_effort.suptitle(f'Control Effort - {method.replace("-", " ").capitalize()} Control', fontsize=16)

            for i, axis in enumerate(axes):
                ax = axs_effort[i]
                effort_data = np.array([np.linalg.norm(dq) for dq in delta_q_history[method][axis]])
                ax.plot(effort_data, label=f'{axis.upper()} Axis Effort')
                ax.set_title(f'Control Effort for {axis.upper()} Axis')
                ax.set_xlabel('Timestep')
                ax.set_ylabel('Control Effort')
                ax.legend()
                ax.grid(True)

            plt.tight_layout(rect=[0, 0.03, 1, 0.95])
            plt.savefig(f'control_effort_{method}.png')
            plt.close()

    return err_hist, delta_q_history, effort, gain_histories, actual_positions, trajectory


# Example Ku and Pu values for each axis (x, y, z)
Ku = np.array([2.7, 2.3, 2.4])
Pu = np.array([0.08307692, 0.03615385, 0.03461538])

Ku = np.array([20.2, 24.7,  2.5])
Pu = np.array([0.32692308, 0.33076923, 0.86230769])

# Run the simulation
err_hist, delta_q_history, effort, gain_histories, actual_positions, trajectory = run_simulation(
    Ku=Ku,
    Pu=Pu,
    max_iterations=15000,
    tolerance=1e-3,
    tune_params=False
)

