In [16]:
import os
import numpy as np
from myosuite.utils import gym
from stable_baselines3 import PPO
from stable_baselines3.common.policies import ActorCriticPolicy
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.callbacks import EvalCallback
import torch
import skvideo.io

# Custom reward wrapper to modify the cost function
class CustomRewardWrapper(gym.Wrapper):
    def __init__(self, env, distance_weight=1.0, effort_weight=0.1, stability_weight=0.05):
        super().__init__(env)
        self.distance_weight = distance_weight  # Reward forward movement
        self.effort_weight = effort_weight      # Penalize control effort
        self.stability_weight = stability_weight  # Penalize instability

    def step(self, action):
        # Handle 5-tuple return from MyoSuite's gym API
        obs, reward, terminated, truncated, info = self.env.step(action)
        
        # Unwrap to base environment for sim access
        base_env = self.env
        while hasattr(base_env, 'env'):
            base_env = base_env.env

        # Custom reward: forward distance - effort - instability
        forward_distance = base_env.sim.data.body('pelvis').xpos[0]  # X-position of pelvis
        effort = np.sum(np.square(action))  # Penalize large actions
        stability_penalty = np.sum(np.abs(base_env.sim.data.qvel[1:]))  # Exclude forward velocity
        custom_reward = (
            self.distance_weight * forward_distance
            - self.effort_weight * effort
            - self.stability_weight * stability_penalty
        )
        # Return 5-tuple to match modern SB3/Gymnasium expectation
        return obs, custom_reward, terminated, truncated, info

# Define custom neural network architecture
class CustomPolicy(ActorCriticPolicy):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.net_arch = dict(pi=[256, 128, 64], vf=[256, 128, 64])  # Policy and value network layers
        self.activation_fn = torch.nn.Tanh  # Activation function (Tanh, ReLU, etc.)

# Training function with customizable parameters
def train_ppo(env_id="myoLegWalk-v0", total_timesteps=1_000_000):
    print("Initializing environment...")
    base_env = gym.make(env_id, reset_type='random')
    env = CustomRewardWrapper(base_env, distance_weight=1.0, effort_weight=0.1, stability_weight=0.05)
    print("Environment initialized.")

    # Optional: Vectorize for faster training
    # env = make_vec_env(lambda: CustomRewardWrapper(gym.make(env_id)), n_envs=4)

    # PPO hyperparameters
    ppo_params = {
        "learning_rate": 3e-4,       # Learning rate
        "n_steps": 2048,             # Steps per update
        "batch_size": 64,            # Minibatch size
        "n_epochs": 10,              # Epochs per update
        "gamma": 0.99,               # Discount factor
        "gae_lambda": 0.95,          # GAE lambda
        "clip_range": 0.2,           # Clipping parameter
        "ent_coef": 0.01,            # Entropy coefficient
        "vf_coef": 0.5,              # Value function coefficient
        "max_grad_norm": 0.5         # Gradient clipping
    }

    # Device setup
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    # Initialize PPO with custom policy and parameters
    model = PPO(
        policy=CustomPolicy,
        env=env,
        verbose=1,
        tensorboard_log="./ppo_myo_tensorboard/",
        device=device,
        **ppo_params
    )
    print("PPO model initialized.")

    # Callback for evaluation during training
    eval_env = gym.make(env_id)
    eval_callback = EvalCallback(eval_env, eval_freq=10000, n_eval_episodes=5, verbose=1)
    print("EvalCallback initialized.")

    # Train the model
    print("Starting training...")
    model.learn(total_timesteps=total_timesteps, callback=eval_callback)
    print("Training completed.")

    # Save the model
    model.save("ppo_custom_leg_model")
    print("Model saved.")
    return model

# Testing function for walking behavior
def test_model(model, env_id="myoLegWalk-v0"):
    print("Initializing test environment...")
    env = gym.make(env_id, reset_type='random')
    env.reset()
    frames = []

    print("Testing walking behavior")
    for ep in range(5):  # Run 5 episodes
        print(f"Episode {ep + 1} of 5")
        obs = env.reset()
        for _ in range(200):  # 200 steps per episode
            frame = env.render(mode='rgb_array', width=400, height=400)
            frames.append(frame[::-1, :, :])  # Flip vertically if needed
            o = env.get_obs()
            a = model.predict(o)[0]
            # Handle 5-tuple return in testing too
            next_o, r, terminated, truncated, info = env.step(a)
            done = terminated or truncated
            if done:
                break
    env.close()

    # Save video
    os.makedirs('videos', exist_ok=True)
    skvideo.io.vwrite('videos/leg_walk.mp4', np.asarray(frames), outputdict={"-pix_fmt": "yuv420p"})
    print("Video saved as 'videos/leg_walk.mp4'")

# Main execution
if __name__ == "__main__":
    # Train the model
    trained_model = train_ppo(env_id="myoLegWalk-v0", total_timesteps=1_000_000)
    
    # Test the trained model
    test_model(trained_model, env_id="myoLegWalk-v0")

Initializing environment...
Environment initialized.
Using device: cuda
Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
PPO model initialized.
EvalCallback initialized.
Starting training...
Logging to ./ppo_myo_tensorboard/PPO_11
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 36.2     |
|    ep_rew_mean     | -225     |
| time/              |          |
|    fps             | 235      |
|    iterations      | 1        |
|    time_elapsed    | 8        |
|    total_timesteps | 2048     |
---------------------------------
-----------------------------------------
| rollout/                |             |
|    ep_len_mean          | 36.1        |
|    ep_rew_mean          | -224        |
| time/                   |             |
|    fps                  | 229         |
|    iterations           | 2           |
|    time_elapsed         | 17          |
|    total_timesteps      | 4096        |
| tra



Eval num_timesteps=10000, episode_reward=141.55 +/- 0.00
Episode length: 25.00 +/- 0.00
----------------------------------------
| eval/                   |            |
|    mean_ep_length       | 25         |
|    mean_reward          | 142        |
| time/                   |            |
|    total_timesteps      | 10000      |
| train/                  |            |
|    approx_kl            | 0.03444995 |
|    clip_fraction        | 0.399      |
|    clip_range           | 0.2        |
|    entropy_loss         | -114       |
|    explained_variance   | -9.41e-05  |
|    learning_rate        | 0.0003     |
|    loss                 | 1.39e+03   |
|    n_updates            | 40         |
|    policy_gradient_loss | -0.102     |
|    std                  | 1.01       |
|    value_loss           | 2.94e+03   |
----------------------------------------
New best mean reward!
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 36.2     |
|    ep_r

TypeError: render() got an unexpected keyword argument 'mode'