In [1]:
import numpy as np
from gym.envs.mujoco import HopperEnv

In [2]:
import gym
from env.custom_hopper import *
from stable_baselines3 import SAC
from stable_baselines3.common.evaluation import evaluate_policy

In [3]:
class RandomizedHopperEnv(HopperEnv):
    """Hopper variant that can re-sample its physical parameters on every reset.

    The model's body masses and geom frictions are copied once at construction.
    Each randomized reset overwrites the live model arrays with those copies
    scaled element-wise by multipliers drawn uniformly from [0.8, 1.2).

    Args:
        randomize_params: When true, ``reset`` re-samples masses and frictions
            before delegating to the parent reset.
    """

    def __init__(self, randomize_params=True):
        super().__init__()
        self.randomize_params = randomize_params
        # Pristine copies: every randomization scales these, never the live
        # (already perturbed) model arrays.
        self.original_masses = self.model.body_mass.copy()
        self.original_frictions = self.model.geom_friction.copy()

    def reset(self):
        """Optionally re-sample the physical parameters, then reset as usual."""
        if self.randomize_params:
            self._randomize_parameters()
        return super().reset()

    def _randomize_parameters(self):
        """Overwrite masses, then frictions, with freshly scaled baseline values."""
        # Masses are handled first so the RNG is consumed in a fixed order.
        targets = (
            (self.model.body_mass, self.original_masses),
            (self.model.geom_friction, self.original_frictions),
        )
        for live_values, baseline in targets:
            scale = np.random.uniform(0.8, 1.2, size=live_values.shape)
            live_values[:] = baseline * scale

In [4]:
class AutoDR:
    """Automatic domain randomization (AutoDR) controller.

    Keeps one multiplier range for masses and one for frictions, widens both
    whenever the reported performance exceeds ``performance_threshold``, and
    writes randomly scaled parameters into the environment's model.

    All scaling is relative to a snapshot of ``env.model.body_mass`` and
    ``env.model.geom_friction`` taken at construction time, so repeated
    randomization never compounds.

    Args:
        env: Environment exposing ``model.body_mass`` and
            ``model.geom_friction`` as writable arrays.
        performance_threshold: Score above which the ranges are widened.
        adaptation_rate: Amount each bound moves per widening step.

    Raises:
        AttributeError: If ``env.model`` lacks ``body_mass`` or
            ``geom_friction`` at construction time.
    """

    # Indices into ``model.body_mass`` that ``randomize_env`` rescales.
    # NOTE(review): presumably the thigh, leg and foot bodies of the Hopper;
    # confirm against the model definition.
    RANDOMIZED_BODY_IDS = (2, 3, 4)

    def __init__(self, env, performance_threshold, adaptation_rate=0.1):
        # ``init_env`` is kept for backward compatibility; it is the SAME object
        # as ``env``, so it cannot serve as a record of the nominal parameters.
        self.init_env = env
        self.env = env
        self.performance_threshold = performance_threshold
        self.adaptation_rate = adaptation_rate
        self.randomization_ranges = {
            'mass': (0.8, 1.2),
            'friction': (0.8, 1.2)
        }
        # Independent copies of the nominal parameters. Randomization always
        # scales these copies rather than the live (already perturbed) arrays.
        self._base_masses = np.array(env.model.body_mass)
        self._base_frictions = np.array(env.model.geom_friction)

    def update_ranges(self, performance):
        """Widen the ranges and re-randomize the masses if performance is high enough.

        Below (or exactly at) the threshold nothing changes: the ranges and the
        current model parameters are deliberately left as they are.

        Args:
            performance: Score to compare against ``performance_threshold``.
        """
        if performance > self.performance_threshold:
            # Increase randomization range
            self._adjust_ranges(self.adaptation_rate)
            # The environment is only re-randomized when the ranges have changed.
            print("updated ranges: ", self.randomization_ranges)

            print("Mass were:    ", self.env.model.body_mass[2:])
            self.randomize_env()

            print("Mass are now: ", self.env.model.body_mass[2:])

    def _adjust_ranges(self, adjustment):
        """Move every lower bound down and every upper bound up by ``adjustment``.

        Lower bounds are clamped to [0, 1] and upper bounds to [1, 2], so a
        range always contains the nominal multiplier 1.
        """
        for key in self.randomization_ranges:
            lower, upper = self.randomization_ranges[key]
            new_lower = min(1, max(0, lower - adjustment))
            new_upper = max(1, min(2, upper + adjustment))
            self.randomization_ranges[key] = (new_lower, new_upper)

    def randomize_env(self):
        """Rescale the masses at ``RANDOMIZED_BODY_IDS`` from their baseline values.

        Each body gets its own multiplier, drawn uniformly from the current
        'mass' range and applied to the construction-time snapshot. The
        baseline used to be read from ``init_env``, which aliases ``env``, so
        every call rescaled the previous random result and the masses drifted
        outside the configured range.
        """
        low, high = self.randomization_ranges['mass']
        for body_id in self.RANDOMIZED_BODY_IDS:
            multiplier = np.random.uniform(low, high)
            self.env.model.body_mass[body_id] = self._base_masses[body_id] * multiplier

    def get_randomized_env(self):
        """Enable randomization on the environment and return it.

        Sets ``randomize_params`` and replaces the environment's
        ``_randomize_parameters`` attribute with ``_custom_randomize`` so that
        the current AutoDR ranges are used.

        NOTE(review): this only has an effect if the environment's ``reset``
        looks up ``_randomize_parameters`` on this same object (as
        ``RandomizedHopperEnv`` does); verify for wrapped environments.
        """
        self.env.randomize_params = True
        self.env._randomize_parameters = self._custom_randomize
        return self.env

    def _custom_randomize(self):
        """Re-sample all masses and frictions using the current AutoDR ranges.

        Uses the environment's own ``original_masses`` / ``original_frictions``
        when it provides them, and otherwise falls back to the snapshot taken
        at construction, so environments without those attributes also work.
        """
        model = self.env.model
        mass_range = self.randomization_ranges['mass']
        friction_range = self.randomization_ranges['friction']

        base_masses = getattr(self.env, 'original_masses', self._base_masses)
        base_frictions = getattr(self.env, 'original_frictions', self._base_frictions)

        # Masses are sampled before frictions to keep RNG consumption order stable.
        mass_multipliers = np.random.uniform(*mass_range, size=model.body_mass.shape)
        model.body_mass[:] = base_masses * mass_multipliers

        friction_multipliers = np.random.uniform(*friction_range, size=model.geom_friction.shape)
        model.geom_friction[:] = base_frictions * friction_multipliers

In [5]:
# Create the environment by its registered id.
# NOTE(review): the id is presumably registered by `env.custom_hopper` (star-imported above) — confirm.
env = gym.make('CustomHopper-source-v0')
# Initialize AutoDR
# The ranges are widened only when an evaluation's mean reward exceeds this threshold.
auto_dr = AutoDR(env, performance_threshold = 250)  # Adjust the threshold as needed

In [8]:
# Display the observation space of the environment created above.
env.observation_space

Box([-inf -inf -inf -inf -inf -inf -inf -inf -inf -inf -inf], [inf inf inf inf inf inf inf inf inf inf inf], (11,), float64)

In [64]:
# Build a SAC agent with an MLP policy on the environment returned by AutoDR
# (get_randomized_env() flags the env for randomization and returns the same env object).
model = SAC("MlpPolicy", auto_dr.get_randomized_env(), verbose=1)

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


In [65]:
# Alternate short SAC training bursts with policy evaluation, and pass each
# evaluation score to AutoDR so it can widen its randomization ranges.
total_timesteps = 100000
eval_interval = 100
timesteps = 0

while timesteps < total_timesteps:
    # Train for a bit
    # reset_num_timesteps=False is meant to continue the same run across calls
    # (the logged total_timesteps keeps growing between iterations).
    model.learn(total_timesteps=eval_interval, reset_num_timesteps=False)
    timesteps += eval_interval

    
    # Evaluate the model
    # NOTE(review): evaluation runs on the same env the model trains on, so it
    # steps/resets the training env between learn() calls and presumably leaves
    # the next stored transition inconsistent — consider a separate evaluation
    # env; confirm against the stable-baselines3 documentation.
    mean_reward, _ = evaluate_policy(model, model.get_env(), n_eval_episodes=10)
    print(f"Timesteps: {timesteps}, Mean reward: {mean_reward:.2f}")

    # Update AutoDR ranges
    # Ranges widen (and the masses are re-sampled) only when mean_reward beats the threshold.
    auto_dr.update_ranges(mean_reward)


Timesteps: 100, Mean reward: 21.46
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 26.8     |
|    ep_rew_mean     | 19.8     |
| time/              |          |
|    episodes        | 4        |
|    fps             | 72       |
|    time_elapsed    | 0        |
|    total_timesteps | 119      |
| train/             |          |
|    actor_loss      | -3.24    |
|    critic_loss     | 2.33     |
|    ent_coef        | 0.995    |
|    ent_coef_loss   | -0.0258  |
|    learning_rate   | 0.0003   |
|    n_updates       | 18       |
---------------------------------
Timesteps: 200, Mean reward: 39.96
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 24.6     |
|    ep_rew_mean     | 19.8     |
| time/              |          |
|    episodes        | 8        |
|    fps             | 64       |
|    time_elapsed    | 0        |
|    total_timesteps | 218      |
| train/             |          |
|    actor_l