# Simulation Environment Setup - PyBullet Installation & Simulation Script
Include code to install and import PyBullet, load the humanoid URDF on a plane, and run a basic simulation loop to verify dynamics and model behavior.

In [None]:
# Install PyBullet
%pip install pybullet

In [None]:
!pip install pybullet

In [None]:
# Import PyBullet
import pybullet as p
import pybullet_data
import time

# Connect to PyBullet in GUI mode
p.connect(p.GUI)

# Set the search path for URDF files
p.setAdditionalSearchPath(pybullet_data.getDataPath())

# Load the plane URDF
plane_id = p.loadURDF("plane.urdf")

# Load the humanoid URDF
humanoid_id = p.loadURDF("humanoid.urdf", [0, 0, 1])  # Position the humanoid above the plane

# Set gravity for the simulation
p.setGravity(0, 0, -9.8)

# Run a basic simulation loop
for _ in range(1000):  # Run for 1000 simulation steps
    p.stepSimulation()  # Step the simulation
    time.sleep(1.0 / 240.0)  # Sleep to match real-time simulation speed

# Disconnect from PyBullet
p.disconnect()

# Custom Gym Environment Development - Environment Creation & Integration
Develop a Gym environment by subclassing gym.Env and implementing reset(), step(action), and optionally render(). Integrate the PyBullet simulation to define observation and action spaces along with a reward function.

In [None]:
# Import necessary libraries
import gym
from gym import spaces
import numpy as np

class HumanoidBipedalEnv(gym.Env):
    def __init__(self):
        super(HumanoidBipedalEnv, self).__init__()
        
        # Connect to PyBullet in DIRECT mode for headless simulation
        self.client = p.connect(p.DIRECT)
        p.setAdditionalSearchPath(pybullet_data.getDataPath())
        
        # Load the plane and humanoid URDFs
        self.plane_id = p.loadURDF("plane.urdf")
        self.humanoid_id = p.loadURDF("humanoid.urdf", [0, 0, 1])
        
        # Set gravity
        p.setGravity(0, 0, -9.8)
        
        # Define observation space (e.g., joint angles, velocities)
        self.num_joints = p.getNumJoints(self.humanoid_id)
        obs_high = np.array([np.pi] * self.num_joints + [np.inf] * self.num_joints, dtype=np.float32)
        self.observation_space = spaces.Box(-obs_high, obs_high, dtype=np.float32)
        
        # Define action space (e.g., joint torques)
        action_high = np.array([1.0] * self.num_joints, dtype=np.float32)
        self.action_space = spaces.Box(-action_high, action_high, dtype=np.float32)
        
    def reset(self):
        # Reset the simulation
        p.resetSimulation(self.client)
        p.setAdditionalSearchPath(pybullet_data.getDataPath())
        self.plane_id = p.loadURDF("plane.urdf")
        self.humanoid_id = p.loadURDF("humanoid.urdf", [0, 0, 1])
        p.setGravity(0, 0, -9.8)
        
        # Return the initial observation
        return self._get_observation()
    
    def step(self, action):
        # Apply actions to the humanoid joints
        for joint_index in range(self.num_joints):
            p.setJointMotorControl2(
                bodyUniqueId=self.humanoid_id,
                jointIndex=joint_index,
                controlMode=p.TORQUE_CONTROL,
                force=action[joint_index]
            )
        
        # Step the simulation
        p.stepSimulation()
        
        # Calculate reward
        reward = self._calculate_reward()
        
        # Check if the episode is done
        done = self._check_done()
        
        # Get the next observation
        observation = self._get_observation()
        
        return observation, reward, done, {}
    
    def render(self, mode="human"):
        # Optional: Implement rendering logic if needed
        pass
    
    def _get_observation(self):
        # Get joint states (angles and velocities)
        joint_states = p.getJointStates(self.humanoid_id, range(self.num_joints))
        joint_angles = np.array([state[0] for state in joint_states], dtype=np.float32)
        joint_velocities = np.array([state[1] for state in joint_states], dtype=np.float32)
        return np.concatenate([joint_angles, joint_velocities])
    
    def _calculate_reward(self):
        # Example reward function: forward movement and stability
        base_position, _ = p.getBasePositionAndOrientation(self.humanoid_id)
        forward_reward = base_position[0]  # Reward for moving forward
        stability_penalty = 0.0  # Add penalties for instability if needed
        return forward_reward - stability_penalty
    
    def _check_done(self):
        # Example termination condition: humanoid falls
        base_position, _ = p.getBasePositionAndOrientation(self.humanoid_id)
        if base_position[2] < 0.5:  # If the humanoid falls below a threshold
            return True
        return False
    
    def close(self):
        # Disconnect from PyBullet
        p.disconnect(self.client)

# Instantiate the environment to test its creation
env = HumanoidBipedalEnv()
obs = env.reset()
print("Initial Observation:", obs)

# Training the RL Agent - Training Script with Stable-Baselines3
Implement training code using a suitable RL algorithm (e.g., PPO or SAC) from Stable-Baselines3. Include model instantiation, training loops, checkpoint saving, and logging of performance metrics.

In [None]:
# Install Stable-Baselines3
!pip install stable-baselines3[extra]

In [None]:
# Import necessary libraries
from stable_baselines3 import PPO
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.callbacks import CheckpointCallback
import os

# Create a vectorized environment for training
env = make_vec_env(lambda: HumanoidBipedalEnv(), n_envs=4)

# Define the directory to save model checkpoints
checkpoint_dir = "./checkpoints/"
os.makedirs(checkpoint_dir, exist_ok=True)

# Define a callback to save model checkpoints
checkpoint_callback = CheckpointCallback(save_freq=10000, save_path=checkpoint_dir, name_prefix="rl_model")

# Instantiate the PPO model
model = PPO("MlpPolicy", env, verbose=1, tensorboard_log="./ppo_humanoid_tensorboard/")

# Train the model
model.learn(total_timesteps=100000, callback=checkpoint_callback)

# Save the final model
model.save("ppo_humanoid_final")

# Load the trained model for evaluation (optional)
trained_model = PPO.load("ppo_humanoid_final")

# Evaluate the trained model
obs = env.reset()
for _ in range(1000):  # Run for 1000 steps
    action, _states = trained_model.predict(obs)
    obs, rewards, dones, info = env.step(action)
    env.render()

# Evaluation and Visualization - Performance Evaluation & Plotting
Write evaluation loops to test the trained model in simulation. Use matplotlib to visualize reward progression, state observations, and performance metrics.

In [None]:
# Import necessary libraries for evaluation and visualization
import matplotlib.pyplot as plt

# Evaluate the trained model
def evaluate_model(env, model, num_episodes=10):
    rewards = []
    for episode in range(num_episodes):
        obs = env.reset()
        episode_reward = 0
        done = False
        while not done:
            action, _states = model.predict(obs)
            obs, reward, done, info = env.step(action)
            episode_reward += reward
        rewards.append(episode_reward)
    return rewards

# Run evaluation
num_episodes = 10
evaluation_rewards = evaluate_model(env, trained_model, num_episodes=num_episodes)

# Plot reward progression
plt.figure(figsize=(10, 6))
plt.plot(range(1, num_episodes + 1), evaluation_rewards, marker='o', label="Episode Reward")
plt.title("Evaluation Rewards Over Episodes")
plt.xlabel("Episode")
plt.ylabel("Total Reward")
plt.grid()
plt.legend()
plt.show()

# Visualize state observations (example: joint angles over time for one episode)
obs = env.reset()
joint_angles_over_time = []
for _ in range(1000):  # Run for 1000 steps
    action, _states = trained_model.predict(obs)
    obs, rewards, dones, info = env.step(action)
    joint_angles = obs[:env.num_joints]  # Extract joint angles from observation
    joint_angles_over_time.append(joint_angles)
    if dones:
        break

# Convert joint angles to a NumPy array for plotting
joint_angles_over_time = np.array(joint_angles_over_time)

# Plot joint angles over time
plt.figure(figsize=(12, 8))
for joint_idx in range(env.num_joints):
    plt.plot(joint_angles_over_time[:, joint_idx], label=f"Joint {joint_idx}")
plt.title("Joint Angles Over Time")
plt.xlabel("Time Step")
plt.ylabel("Joint Angle (radians)")
plt.grid()
plt.legend()
plt.show()

# Advanced Integration (Optional) - ROS Integration Skeleton Code
Provide skeleton code to facilitate the integration of the simulation model with ROS, including basic setup for handling hardware communication and the simulation-to-real transfer process.

In [None]:
# Install ROS-related Python libraries
!pip install rospy rospkg

# Import necessary libraries for ROS integration
import rospy
from std_msgs.msg import Float32MultiArray
from sensor_msgs.msg import JointState

class ROSIntegration:
    def __init__(self):
        # Initialize the ROS node
        rospy.init_node('humanoid_bipedal_ros', anonymous=True)
        
        # Publisher for sending joint commands
        self.joint_command_pub = rospy.Publisher('/humanoid/joint_commands', Float32MultiArray, queue_size=10)
        
        # Subscriber for receiving joint states
        self.joint_state_sub = rospy.Subscriber('/humanoid/joint_states', JointState, self.joint_state_callback)
        
        # Placeholder for joint states
        self.joint_states = None

    def joint_state_callback(self, msg):
        # Callback to update joint states
        self.joint_states = msg

    def send_joint_commands(self, commands):
        # Publish joint commands to the robot
        command_msg = Float32MultiArray()
        command_msg.data = commands
        self.joint_command_pub.publish(command_msg)

    def get_joint_states(self):
        # Return the latest joint states
        return self.joint_states

# Example usage of the ROSIntegration class
if __name__ == "__main__":
    ros_integration = ROSIntegration()
    
    # Example loop to send commands and read joint states
    rate = rospy.Rate(10)  # 10 Hz
    while not rospy.is_shutdown():
        # Example: Send zero torques to all joints
        num_joints = 10  # Replace with the actual number of joints
        commands = [0.0] * num_joints
        ros_integration.send_joint_commands(commands)
        
        # Retrieve and print joint states
        joint_states = ros_integration.get_joint_states()
        if joint_states:
            rospy.loginfo(f"Joint States: {joint_states}")
        
        rate.sleep()