In [None]:
import pybullet as p
import pybullet_data
import time

# Initialize PyBullet
physicsClient = p.connect(p.GUI)
p.setAdditionalSearchPath(pybullet_data.getDataPath())
p.setGravity(0, 0, -9.81)

# Load ground plane
planeId = p.loadURDF("plane.urdf")

# Load exoskeleton
startPos = [0, 0, 1.215]  # Start position above ground
startOrientation = p.getQuaternionFromEuler([0, 0, 0])
robotId = p.loadURDF("urdf/Exoskeleton.urdf", startPos, startOrientation)

# Get joint information
numJoints = p.getNumJoints(robotId)
jointInfoList = []
for i in range(numJoints):
    jointInfo = p.getJointInfo(robotId, i)
    jointInfoList.append(jointInfo)
    print(f"Joint {i}: {jointInfo[1].decode('utf-8')}")

# Set initial joint positions for standing
jointPositions = {
    'right_hip_yaw_joint': -0.15,
    'right_hip_roll_joint': 0.0,
    'right_hip_pitch_joint': -0.025,
    'right_knee_joint': 0.225,
    'right_ankle_roll_joint': 0.0,
    'right_ankle_pitch_joint': -0.2,
    'left_hip_yaw_joint': 0.15,
    'left_hip_roll_joint': 0.0,
    'left_hip_pitch_joint': -0.025,
    'left_knee_joint': 0.225,
    'left_ankle_roll_joint': 0.0,
    'left_ankle_pitch_joint': -0.2
}

# Apply initial joint positions
for joint_name, position in jointPositions.items():
    for i in range(numJoints):
        if jointInfoList[i][1].decode('utf-8') == joint_name:
            p.resetJointState(robotId, i, position)
            p.setJointMotorControl2(
                robotId, i, p.POSITION_CONTROL,
                targetPosition=position,
                force=200
            )
            break

# Configure debug visualizer
p.resetDebugVisualizerCamera(
    cameraDistance=2,
    cameraYaw=0,
    cameraPitch=-30,
    cameraTargetPosition=[0, 0, 1]
)

# Simulation loop
try:
    while True:
        p.stepSimulation()
        time.sleep(1./240.)
except KeyboardInterrupt:
    p.disconnect()

In [1]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import pybullet as p
import pybullet_data
import time

class ExoskeletonEnv(gym.Env):
    metadata = {'render.modes': ['human']}

    def __init__(self, render: bool = False, task: str = "stand"):
        super(ExoskeletonEnv, self).__init__()
        self.render_mode = render
        self.task = task

        # Connect to PyBullet in GUI or DIRECT mode
        if self.render_mode:
            self.physicsClient = p.connect(p.GUI)
        else:
            self.physicsClient = p.connect(p.DIRECT)
        p.setAdditionalSearchPath(pybullet_data.getDataPath())
        p.setGravity(0, 0, -9.81)
        
        # Load the ground plane and exoskeleton URDF
        self.planeId = p.loadURDF("plane.urdf")
        self.startPos = [0, 0, 1.215]
        self.startOrientation = p.getQuaternionFromEuler([0, 0, 0])
        self.robotId = p.loadURDF("urdf/Exoskeleton.urdf", self.startPos, self.startOrientation)
        
        # Define the controlled joints
        self.joint_names = [
            'right_hip_yaw_joint', 'right_hip_roll_joint', 'right_hip_pitch_joint', 
            'right_knee_joint', 'right_ankle_roll_joint', 'right_ankle_pitch_joint',
            'left_hip_yaw_joint', 'left_hip_roll_joint', 'left_hip_pitch_joint', 
            'left_knee_joint', 'left_ankle_roll_joint', 'left_ankle_pitch_joint'
        ]
        self.joint_indices = {}
        self.num_joints = p.getNumJoints(self.robotId)
        for i in range(self.num_joints):
            info = p.getJointInfo(self.robotId, i)
            name = info[1].decode('utf-8')
            if name in self.joint_names:
                self.joint_indices[name] = i

        # Default (standing) joint positions as offsets
        self.default_joint_positions = {
            'right_hip_yaw_joint': -0.15, 
            'right_hip_roll_joint': 0.0, 
            'right_hip_pitch_joint': -0.025,
            'right_knee_joint': 0.225, 
            'right_ankle_roll_joint': 0.0, 
            'right_ankle_pitch_joint': -0.2,
            'left_hip_yaw_joint': 0.15, 
            'left_hip_roll_joint': 0.0, 
            'left_hip_pitch_joint': -0.025,
            'left_knee_joint': 0.225, 
            'left_ankle_roll_joint': 0.0, 
            'left_ankle_pitch_joint': -0.2
        }
        
        # Action space: small deviations (offsets) for each joint.
        self.action_space = spaces.Box(
            low=-1.0, high=1.0, shape=(len(self.joint_names),), dtype=np.float32
        )
        # Observation space: joint positions, velocities, base position (3) and base orientation (3 Euler angles).
        # Total observation dimension: 12 joints * 2 + 6 + 6 = 36.
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=(36,), dtype=np.float32
        )
        
        # Initialize smoothed action
        self.prev_action = np.zeros(len(self.joint_names), dtype=np.float32)
        
        self.reset()

    def step(self, action):
        smoothing_factor = 0.05  # Adjust for smoother transitions
        self.prev_action = (1 - smoothing_factor) * self.prev_action + smoothing_factor * action
        
        # Compute target positions for each joint.
        joint_indices = [self.joint_indices[name] for name in self.joint_names]
        target_positions = []

        for idx, name in enumerate(self.joint_names):
            joint_index = self.joint_indices[name]
            
            # Get the physical joint limits from the URDF
            joint_info = p.getJointInfo(self.robotId, joint_index)
            joint_lower_limit = joint_info[8]  # Lower limit in radians
            joint_upper_limit = joint_info[9]  # Upper limit in radians

            # Map the action (in [-1, 1]) to the joint's physical range
            scaled_position = joint_lower_limit + (self.prev_action[idx] + 1) / 2 * (joint_upper_limit - joint_lower_limit)
            
            # Append the target position for the joint
            target_positions.append(scaled_position)
        
        # Apply control to all joints at once using setJointMotorControlArray.
        p.setJointMotorControlArray(
            self.robotId,
            jointIndices=joint_indices,
            controlMode=p.POSITION_CONTROL,
            targetPositions=target_positions,
            forces=[200] * len(joint_indices),
        )
        
        p.stepSimulation()
        time.sleep(1./240.)
        self.elapsed_time += 1 / 240  # Assuming 240Hz simulation
        
        # Gather joint positions and velocities.
        joint_positions = []
        joint_velocities = []
        for joint_name in self.joint_names:
            joint_index = self.joint_indices[joint_name]
            joint_state = p.getJointState(self.robotId, joint_index)
            joint_positions.append(joint_state[0])
            joint_velocities.append(joint_state[1])

        # Get base position, orientation and velocities.
        base_pos, base_orient_quat = p.getBasePositionAndOrientation(self.robotId)
        base_orient_euler = p.getEulerFromQuaternion(base_orient_quat)
        base_linear_vel, base_angular_vel = p.getBaseVelocity(self.robotId)
        
        # Concatenate joint states with base state information.
        obs = np.array(
            joint_positions + 
            joint_velocities + 
            list(base_pos) + 
            list(base_orient_euler) +
            list(base_linear_vel) +
            list(base_angular_vel), 
            dtype=np.float32)
        
        # Use the new reward function.
        reward, reward_components = self._calculate_reward()

        # End episode if base falls below a threshold.
        done = (
            bool(base_pos[2] < 0.5) or 
            bool(abs(base_orient_euler[0]) > 1) or 
            bool(abs(base_orient_euler[1]) > 1) or
            self.elapsed_time > 20  # 20s timer
            )
        truncated = False
        
        return obs, reward, done, truncated, {"reward_components": reward_components}

    def reset(self, seed=None, options=None):
        p.resetSimulation()
        p.setGravity(0, 0, -9.81)
        p.setAdditionalSearchPath(pybullet_data.getDataPath())
        p.loadURDF("plane.urdf")
        self.robotId = p.loadURDF("urdf/Exoskeleton.urdf", self.startPos, self.startOrientation)
        
        self.elapsed_time = 0  # Timer set to 0

        # Reset smoothed action
        self.prev_action = np.zeros(len(self.joint_names), dtype=np.float32)  
        
        # Update joint indices.
        self.joint_indices = {}
        self.num_joints = p.getNumJoints(self.robotId)
        for i in range(self.num_joints):
            info = p.getJointInfo(self.robotId, i)
            name = info[1].decode('utf-8')
            if name in self.joint_names:
                self.joint_indices[name] = i
        
        # Reset joints to default positions.
        for joint_name in self.joint_names:
            joint_index = self.joint_indices[joint_name]
            p.resetJointState(self.robotId, joint_index, self.default_joint_positions[joint_name])
        
        if self.task == "stand":
            # Apply a random push in the xy plane to the robot's base.
            self.random_push = np.random.uniform(low=-3, high=3, size=2) #+-1.5
            p.resetBaseVelocity(self.robotId, linearVelocity=[self.random_push[0], self.random_push[1], 0])
        
        joint_positions = []
        joint_velocities = []
        for joint_name in self.joint_names:
            joint_index = self.joint_indices[joint_name]
            joint_state = p.getJointState(self.robotId, joint_index)
            joint_positions.append(joint_state[0])
            joint_velocities.append(joint_state[1])
        
        base_pos, base_orient_quat = p.getBasePositionAndOrientation(self.robotId)
        base_orient_euler = p.getEulerFromQuaternion(base_orient_quat)
        base_linear_vel, base_angular_vel = p.getBaseVelocity(self.robotId)
        
        # Concatenate joint states with base state information.
        obs = np.array(
            joint_positions + 
            joint_velocities + 
            list(base_pos) + 
            list(base_orient_euler) +
            list(base_linear_vel) +
            list(base_angular_vel), 
            dtype=np.float32)
        
        return obs, {}
    
    def _calculate_reward(self):
        positions = []
        velocities = []
        torques = []
        
        for joint_name in self.joint_names:
            joint_index = self.joint_indices[joint_name]
            joint_state = p.getJointState(self.robotId, joint_index)
            
            # Store position difference from default
            current_pos = joint_state[0]
            target_pos = self.default_joint_positions[joint_name]
            positions.append(current_pos - target_pos)
            
            # Store velocity and torque
            velocities.append(joint_state[1])
            torques.append(joint_state[3])
        
        base_pos, base_orient_quat = p.getBasePositionAndOrientation(self.robotId)
        base_orient_euler = p.getEulerFromQuaternion(base_orient_quat)

        time_reward = 1.0
        height_reward = np.exp(-50 * (base_pos[2] - 1.215) ** 2)
        upright_reward = np.exp(-5 * (base_orient_euler[0]** 2 + base_orient_euler[1]** 2 + base_orient_euler[2]**2))

        position_penalty = np.exp(-1 * np.sum(np.square(positions))) - 1.0
        velocity_penalty = np.exp(-0.01 * np.sum(np.square(velocities))) - 1.0
        torque_penalty = np.exp(-1e-4 * np.sum(np.square(torques))) - 1.0

        # Walk-specific rewards
        travel_reward = base_pos[0]
        drift_penalty = -1 * (base_pos[1] ** 2)

        if self.task == "stand":
            reward_components = {
                'time': 1.0 * time_reward,
                'height': 1.0 * height_reward,
                'upright': 1.0 * upright_reward,
                'position_penalty': 0.7 * position_penalty,
                'velocity_penalty': 0.3 * velocity_penalty,
                'torque_penalty': 0.3 * torque_penalty,
            }
        
        if self.task == "walk":
            reward_components = {
                'time': 1.0 * time_reward,
                'height': 1.0 * height_reward,
                'upright': 1.0 * upright_reward,
                'position_penalty': 0.7 * position_penalty,
                'velocity_penalty': 0.3 * velocity_penalty,
                'torque_penalty': 0.3 * torque_penalty,
                'travel_reward': 0.5 * travel_reward,
                'drift_penalty': 1.0 * drift_penalty, 
            }
        
        reward = sum(reward_components.values())

        return reward, reward_components
    
    def render(self, mode='human'):
        pass

    def close(self):
        p.disconnect()

from stable_baselines3.common.callbacks import BaseCallback
from collections import defaultdict

class RewardComponentsCallback(BaseCallback):
    def __init__(self, verbose=0):
        super().__init__(verbose)
        self.episode_components = None

    def _init_callback(self):
        self.episode_components = [defaultdict(float) for _ in range(self.training_env.num_envs)]

    def _on_step(self) -> bool:
        for env_idx in range(self.training_env.num_envs):
            info = self.locals['infos'][env_idx]
            if 'reward_components' in info:
                components = info['reward_components']
                for key, value in components.items():
                    self.episode_components[env_idx][key] += value
            if self.locals['dones'][env_idx]:
                if self.episode_components[env_idx]:
                    for key, value in self.episode_components[env_idx].items():
                        self.logger.record(f"reward_components/{key}", value)
                self.episode_components[env_idx] = defaultdict(float)
        return True

In [None]:
import os
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import SubprocVecEnv
from stable_baselines3.common.callbacks import CheckpointCallback, CallbackList
from stable_baselines3.common.monitor import Monitor

model_name="test"

def linear_schedule(initial_value: float, final_value: float) -> callable:
    def func(progress_remaining: float) -> float:
        return final_value + (initial_value - final_value) * progress_remaining
    return func

initial_lr = 2e-3
final_lr = 5e-5
lr_schedule = linear_schedule(initial_lr, final_lr)

def make_env(render=False, task="stand"):
    def _init():
        env = ExoskeletonEnv(render=render, task=task)
        env = Monitor(env)
        return env
    return _init

for _ in range(1):
    num_envs = 24
    env_fns = [make_env(render=False, task="stand") for _ in range(num_envs)]
    env = SubprocVecEnv(env_fns)

    model = PPO("MlpPolicy", 
                env, 
                verbose=1, 
                device='cpu',
                tensorboard_log="./tensorboard",
                policy_kwargs = dict(
                    net_arch=[256, 256]
                    ),
                batch_size=8192,
                learning_rate=lr_schedule,
                gamma=0.995,
                )

    checkpoint_callback = CheckpointCallback(save_freq=10_000, save_path='./checkpoints/', name_prefix=model_name)
    components_callback = RewardComponentsCallback()

    model.learn(total_timesteps=10_000_000,
                callback=CallbackList([checkpoint_callback, components_callback]),
                tb_log_name=model_name 
                )

    model.save(os.path.join("models", model_name))

    env.close()

Using cpu device
Logging to ./tensorboard\test_1
----------------------------------
| reward_components/  |          |
|    height           | 85.6     |
|    position_penalty | -65.4    |
|    time             | 103      |
|    torque_penalty   | -30.9    |
|    upright          | 24.3     |
|    velocity_penalty | -11.2    |
| rollout/            |          |
|    ep_len_mean      | 138      |
|    ep_rew_mean      | 129      |
| time/               |          |
|    fps              | 3191     |
|    iterations       | 1        |
|    time_elapsed     | 15       |
|    total_timesteps  | 49152    |
----------------------------------
-----------------------------------------
| reward_components/      |             |
|    height               | 38.2        |
|    position_penalty     | -71.5       |
|    time                 | 112         |
|    torque_penalty       | -33.6       |
|    upright              | 68.2        |
|    velocity_penalty     | -11.9       |
| rollout/          



In [None]:
from stable_baselines3 import PPO
import time

env = ExoskeletonEnv(render=True, task="stand")

model = PPO.load("test", device='cpu')

num_episodes = 30
for episode in range(num_episodes):
    obs, _ = env.reset()
    done = False
    total_reward = 0
    print(f"Starting episode {episode+1}...")
    while not done:
        # Use deterministic policy for testing.
        action, _ = model.predict(obs, deterministic=False)
        obs, reward, done, truncated, info = env.step(action)
        total_reward += reward
        time.sleep(1./240.)
    print(f"Episode {episode+1} finished with total reward: {total_reward}")

env.close()