From 98fefa2d8550bddbdff8f44004062dc5e72bf56b Mon Sep 17 00:00:00 2001
From: Adam
Date: Wed, 26 Feb 2020 16:10:35 -0800
Subject: [PATCH 1/3] added observation normalization for policy gradient
 mujoco, using running-mean-std model

---
 rlpyt/agents/pg/mujoco.py                     | 14 ++++++
 rlpyt/algos/pg/a2c.py                         |  3 ++
 rlpyt/algos/pg/ppo.py                         |  2 +
 .../configs/mujoco/pg/mujoco_a2c.py           |  2 +-
 .../configs/mujoco/pg/mujoco_ppo.py           |  4 +-
 .../mujoco/pg/launch/launch_mujoco_a2c_cpu.py |  6 +++
 .../mujoco/pg/launch/launch_mujoco_ppo_cpu.py |  6 +++
 .../pg/launch/launch_mujoco_ppo_serial.py     |  8 +++-
 .../mujoco/pg/train/mujoco_ff_a2c_cpu.py      |  8 ++--
 ..._ppo_serial.py => mujoco_ff_ppo_serial.py} |  0
 rlpyt/models/pg/mujoco_ff_model.py            | 20 +++++++++
 rlpyt/models/pg/mujoco_lstm_model.py          | 28 +++++++++---
 rlpyt/models/running_mean_std.py              | 45 +++++++++++++++++++
 13 files changed, 132 insertions(+), 14 deletions(-)
 rename rlpyt/experiments/scripts/mujoco/pg/train/{mujoco_ppo_serial.py => mujoco_ff_ppo_serial.py} (100%)
 create mode 100644 rlpyt/models/running_mean_std.py

diff --git a/rlpyt/agents/pg/mujoco.py b/rlpyt/agents/pg/mujoco.py
index 70dafd22..1af6f584 100644
--- a/rlpyt/agents/pg/mujoco.py
+++ b/rlpyt/agents/pg/mujoco.py
@@ -4,13 +4,16 @@
     RecurrentGaussianPgAgent, AlternatingRecurrentGaussianPgAgent)
 from rlpyt.models.pg.mujoco_ff_model import MujocoFfModel
 from rlpyt.models.pg.mujoco_lstm_model import MujocoLstmModel
+from rlpyt.utils.buffer import buffer_to
 
 
 class MujocoMixin:
     """
     Mixin class defining which environment interface properties
     are given to the model.
+    Now supports observation normalization, including multi-GPU.
     """
+    _ddp = False  # Sets True if data parallel, for normalized obs
 
     def make_env_to_model_kwargs(self, env_spaces):
         """Extract observation_shape and action_size."""
@@ -18,6 +21,17 @@ def make_env_to_model_kwargs(self, env_spaces):
         return dict(observation_shape=env_spaces.observation.shape,
                     action_size=env_spaces.action.shape[0])
 
+    def update_obs_rms(self, observation):
+        observation = buffer_to(observation, device=self.device)
+        if self._ddp:
+            self.model.module.update_obs_rms(observation)
+        else:
+            self.model.update_obs_rms(observation)
+
+    def data_parallel(self, *args, **kwargs):
+        super().data_parallel(*args, **kwargs)
+        self._ddp = True
+
 
 class MujocoFfAgent(MujocoMixin, GaussianPgAgent):
diff --git a/rlpyt/algos/pg/a2c.py b/rlpyt/algos/pg/a2c.py
index 15dac532..6f3d8556 100644
--- a/rlpyt/algos/pg/a2c.py
+++ b/rlpyt/algos/pg/a2c.py
@@ -42,6 +42,9 @@ def optimize_agent(self, itr, samples):
         """
         Train the agent on input samples, by one gradient step.
         """
+        if hasattr(self.agent, "update_obs_rms"):
+            # NOTE: suboptimal--obs sent to device here and in agent(*inputs).
+            self.agent.update_obs_rms(samples.env.observation)
         self.optimizer.zero_grad()
         loss, entropy, perplexity = self.loss(samples)
         loss.backward()
diff --git a/rlpyt/algos/pg/ppo.py b/rlpyt/algos/pg/ppo.py
index ae28082d..d28eed60 100644
--- a/rlpyt/algos/pg/ppo.py
+++ b/rlpyt/algos/pg/ppo.py
@@ -70,6 +70,8 @@ def optimize_agent(self, itr, samples):
             prev_reward=samples.env.prev_reward,
         )
         agent_inputs = buffer_to(agent_inputs, device=self.agent.device)
+        if hasattr(self.agent, "update_obs_rms"):
+            self.agent.update_obs_rms(agent_inputs.observation)
         return_, advantage, valid = self.process_returns(samples)
         loss_inputs = LossInputs(  # So can slice all.
             agent_inputs=agent_inputs,
diff --git a/rlpyt/experiments/configs/mujoco/pg/mujoco_a2c.py b/rlpyt/experiments/configs/mujoco/pg/mujoco_a2c.py
index 9571983f..58578dbe 100644
--- a/rlpyt/experiments/configs/mujoco/pg/mujoco_a2c.py
+++ b/rlpyt/experiments/configs/mujoco/pg/mujoco_a2c.py
@@ -14,7 +14,7 @@
         normalize_advantage=True,
     ),
     env=dict(id="Hopper-v3"),
-    model=dict(),
+    model=dict(normalize_observation=False),
     optim=dict(),
     runner=dict(
         n_steps=1e6,
diff --git a/rlpyt/experiments/configs/mujoco/pg/mujoco_ppo.py b/rlpyt/experiments/configs/mujoco/pg/mujoco_ppo.py
index d5486846..9d668dbc 100644
--- a/rlpyt/experiments/configs/mujoco/pg/mujoco_ppo.py
+++ b/rlpyt/experiments/configs/mujoco/pg/mujoco_ppo.py
@@ -16,10 +16,10 @@
         ratio_clip=0.2,
         normalize_advantage=True,
         linear_lr_schedule=True,
-        bootstrap_timelimit=False,
+        # bootstrap_timelimit=False,
     ),
     env=dict(id="Hopper-v3"),
-    model=dict(),
+    model=dict(normalize_observation=False),
     optim=dict(),
     runner=dict(
         n_steps=1e6,
diff --git a/rlpyt/experiments/scripts/mujoco/pg/launch/launch_mujoco_a2c_cpu.py b/rlpyt/experiments/scripts/mujoco/pg/launch/launch_mujoco_a2c_cpu.py
index 4102ab00..5d10d34d 100644
--- a/rlpyt/experiments/scripts/mujoco/pg/launch/launch_mujoco_a2c_cpu.py
+++ b/rlpyt/experiments/scripts/mujoco/pg/launch/launch_mujoco_a2c_cpu.py
@@ -22,6 +22,12 @@
 keys = [("env", "id")]
 variant_levels.append(VariantLevel(keys, values, dir_names))
 
+norm_obs = [True]
+values = list(zip(norm_obs))
+dir_names = ["TrueNormObs"]
+keys = [("model", "normalize_observation")]
+variant_levels.append(VariantLevel(keys, values, dir_names))
+
 variants, log_dirs = make_variants(*variant_levels)
 
 run_experiments(
diff --git a/rlpyt/experiments/scripts/mujoco/pg/launch/launch_mujoco_ppo_cpu.py b/rlpyt/experiments/scripts/mujoco/pg/launch/launch_mujoco_ppo_cpu.py
index 4f196e75..d6f1962d 100644
--- a/rlpyt/experiments/scripts/mujoco/pg/launch/launch_mujoco_ppo_cpu.py
+++ b/rlpyt/experiments/scripts/mujoco/pg/launch/launch_mujoco_ppo_cpu.py
@@ -22,6 +22,12 @@
 keys = [("env", "id")]
 variant_levels.append(VariantLevel(keys, values, dir_names))
 
+norm_obs = [True]
+values = list(zip(norm_obs))
+dir_names = ["TrueNormObs"]
+keys = [("model", "normalize_observation")]
+variant_levels.append(VariantLevel(keys, values, dir_names))
+
 variants, log_dirs = make_variants(*variant_levels)
 
 run_experiments(
diff --git a/rlpyt/experiments/scripts/mujoco/pg/launch/launch_mujoco_ppo_serial.py b/rlpyt/experiments/scripts/mujoco/pg/launch/launch_mujoco_ppo_serial.py
index 94bb73aa..549c0b3c 100644
--- a/rlpyt/experiments/scripts/mujoco/pg/launch/launch_mujoco_ppo_serial.py
+++ b/rlpyt/experiments/scripts/mujoco/pg/launch/launch_mujoco_ppo_serial.py
@@ -7,7 +7,7 @@
 affinity_code = encode_affinity(
     n_cpu_core=2,
     n_gpu=0,
-    hyperthread_offset=2,
+    hyperthread_offset=4,
     n_socket=1,
     cpu_per_run=2,
 )
@@ -22,6 +22,12 @@
 keys = [("env", "id")]
 variant_levels.append(VariantLevel(keys, values, dir_names))
 
+norm_obs = [True]
+values = list(zip(norm_obs))
+dir_names = ["TrueNormObs"]
+keys = [("model", "normalize_observation")]
+variant_levels.append(VariantLevel(keys, values, dir_names))
+
 variants, log_dirs = make_variants(*variant_levels)
 
 run_experiments(
diff --git a/rlpyt/experiments/scripts/mujoco/pg/train/mujoco_ff_a2c_cpu.py b/rlpyt/experiments/scripts/mujoco/pg/train/mujoco_ff_a2c_cpu.py
index b485f1bd..22f2e2df 100644
--- a/rlpyt/experiments/scripts/mujoco/pg/train/mujoco_ff_a2c_cpu.py
+++ b/rlpyt/experiments/scripts/mujoco/pg/train/mujoco_ff_a2c_cpu.py
@@ -2,8 +2,8 @@
 import sys
 
 from rlpyt.utils.launching.affinity import affinity_from_code
-from rlpyt.samplers.cpu.parallel_sampler import CpuParallelSampler
-from rlpyt.samplers.cpu.collectors import ResetCollector
+from rlpyt.samplers.parallel.cpu.sampler import CpuSampler
+from rlpyt.samplers.parallel.cpu.collectors import CpuResetCollector
 from rlpyt.envs.gym import make as gym_make
 from rlpyt.algos.pg.a2c import A2C
 from rlpyt.agents.pg.mujoco import MujocoFfAgent
@@ -20,10 +20,10 @@ def build_and_train(slot_affinity_code, log_dir, run_ID, config_key):
     variant = load_variant(log_dir)
     config = update_config(config, variant)
 
-    sampler = CpuParallelSampler(
+    sampler = CpuSampler(
         EnvCls=gym_make,
         env_kwargs=config["env"],
-        CollectorCls=ResetCollector,
+        CollectorCls=CpuResetCollector,
         **config["sampler"]
     )
     algo = A2C(optim_kwargs=config["optim"], **config["algo"])
diff --git a/rlpyt/experiments/scripts/mujoco/pg/train/mujoco_ppo_serial.py b/rlpyt/experiments/scripts/mujoco/pg/train/mujoco_ff_ppo_serial.py
similarity index 100%
rename from rlpyt/experiments/scripts/mujoco/pg/train/mujoco_ppo_serial.py
rename to rlpyt/experiments/scripts/mujoco/pg/train/mujoco_ff_ppo_serial.py
diff --git a/rlpyt/models/pg/mujoco_ff_model.py b/rlpyt/models/pg/mujoco_ff_model.py
index 36e2a633..710ef837 100644
--- a/rlpyt/models/pg/mujoco_ff_model.py
+++ b/rlpyt/models/pg/mujoco_ff_model.py
@@ -4,6 +4,7 @@
 
 from rlpyt.utils.tensor import infer_leading_dims, restore_leading_dims
 from rlpyt.models.mlp import MlpModel
+from rlpyt.models.running_mean_std import RunningMeanStdModel
 
 
 class MujocoFfModel(torch.nn.Module):
@@ -21,6 +22,9 @@ def __init__(
             hidden_nonlinearity=torch.nn.Tanh,  # Module form.
             mu_nonlinearity=torch.nn.Tanh,  # Module form.
             init_log_std=0.,
+            normalize_observation=False,
+            norm_obs_clip=10,
+            norm_obs_var_clip=1e-6,
             ):
         """Instantiate neural net modules according to inputs."""
         super().__init__()
@@ -44,6 +48,11 @@ def __init__(
             nonlinearity=hidden_nonlinearity,
         )
         self.log_std = torch.nn.Parameter(init_log_std * torch.ones(action_size))
+        if normalize_observation:
+            self.obs_rms = RunningMeanStdModel(observation_shape)
+            self.norm_obs_clip = norm_obs_clip
+            self.norm_obs_var_clip = norm_obs_var_clip
+        self.normalize_observation = normalize_observation
 
     def forward(self, observation, prev_action, prev_reward):
         """
@@ -56,6 +65,13 @@ def forward(self, observation, prev_action, prev_reward):
         # Infer (presence of) leading dimensions: [T,B], [B], or [].
         lead_dim, T, B, _ = infer_leading_dims(observation, self._obs_ndim)
+        if self.normalize_observation:
+            obs_var = self.obs_rms.var
+            if self.norm_obs_var_clip is not None:
+                obs_var = torch.clamp(obs_var, min=self.norm_obs_var_clip)
+            observation = torch.clamp((observation - self.obs_rms.mean) /
+                obs_var.sqrt(), -self.norm_obs_clip, self.norm_obs_clip)
+
         obs_flat = observation.view(T * B, -1)
         mu = self.mu(obs_flat)
         v = self.v(obs_flat).squeeze(-1)
@@ -65,3 +81,7 @@
         mu, log_std, v = restore_leading_dims((mu, log_std, v), lead_dim, T, B)
 
         return mu, log_std, v
+
+    def update_obs_rms(self, observation):
+        if self.normalize_observation:
+            self.obs_rms.update(observation)
diff --git a/rlpyt/models/pg/mujoco_lstm_model.py b/rlpyt/models/pg/mujoco_lstm_model.py
index 7e280236..31ac1150 100644
--- a/rlpyt/models/pg/mujoco_lstm_model.py
+++ b/rlpyt/models/pg/mujoco_lstm_model.py
@@ -4,6 +4,7 @@
 
 from rlpyt.utils.tensor import infer_leading_dims, restore_leading_dims
 from rlpyt.models.mlp import MlpModel
+from rlpyt.models.running_mean_std import RunningMeanStdModel
 from rlpyt.utils.collections import namedarraytuple
 
 RnnState = namedarraytuple("RnnState", ["h", "c"])
@@ -21,6 +22,9 @@ def __init__(
             hidden_sizes=None,  # None for default (see below).
             lstm_size=256,
             nonlinearity=torch.nn.ReLU,
+            normalize_observation=False,
+            norm_obs_clip=10,
+            norm_obs_var_clip=1e-6,
             ):
         super().__init__()
         self._obs_n_dim = len(observation_shape)
@@ -36,6 +40,13 @@
         mlp_output_size = hidden_sizes[-1] if hidden_sizes else mlp_input_size
         self.lstm = torch.nn.LSTM(mlp_output_size + action_size + 1, lstm_size)
         self.head = torch.nn.Linear(lstm_size, action_size * 2 + 1)
+        if normalize_observation:
+            self.obs_rms = RunningMeanStdModel(observation_shape)
+            self.norm_obs_clip = norm_obs_clip
+            self.norm_obs_var_clip = norm_obs_var_clip
+        self.normalize_observation = normalize_observation
+
+
 
     def forward(self, observation, prev_action, prev_reward, init_rnn_state):
         """
@@ -46,15 +57,16 @@ def forward(self, observation, prev_action, prev_reward, init_rnn_state):
         not given.  Used both in sampler and in algorithm (both via the agent).
         Also returns the next RNN state.
         """
-
-
-
-        """Feedforward layers process as [T*B,H]. Return same leading dims as
-        input, can be [T,B], [B], or []."""
-
         # Infer (presence of) leading dimensions: [T,B], [B], or [].
         lead_dim, T, B, _ = infer_leading_dims(observation, self._obs_n_dim)
+        if self.normalize_observation:
+            obs_var = self.obs_rms.var
+            if self.norm_obs_var_clip is not None:
+                obs_var = torch.clamp(obs_var, min=self.norm_obs_var_clip)
+            observation = torch.clamp((observation - self.obs_rms.mean) /
+                obs_var.sqrt(), -self.norm_obs_clip, self.norm_obs_clip)
+
         mlp_out = self.mlp(observation.view(T * B, -1))
         lstm_input = torch.cat([
             mlp_out.view(T, B, -1),
@@ -74,3 +86,7 @@ def forward(self, observation, prev_action, prev_reward, init_rnn_state):
         next_rnn_state = RnnState(h=hn, c=cn)
 
         return mu, log_std, v, next_rnn_state
+
+    def update_obs_rms(self, observation):
+        if self.normalize_observation:
+            self.obs_rms.update(observation)
diff --git a/rlpyt/models/running_mean_std.py b/rlpyt/models/running_mean_std.py
new file mode 100644
index 00000000..39ea24b4
--- /dev/null
+++ b/rlpyt/models/running_mean_std.py
@@ -0,0 +1,45 @@
+
+import torch
+import torch.distributed as dist
+from rlpyt.utils.tensor import infer_leading_dims
+
+
+class RunningMeanStdModel(torch.nn.Module):
+
+    """Adapted from OpenAI baselines.  Maintains a running estimate of mean
+    and variance of data along each dimension, accessible in the `mean` and
+    `var` attributes.  Supports multi-GPU training by all-reducing statistics
+    across GPUs."""
+
+    def __init__(self, shape):
+        super().__init__()
+        self.register_buffer("mean", torch.zeros(shape))
+        self.register_buffer("var", torch.ones(shape))
+        self.register_buffer("count", torch.zeros(()))
+        self.shape = shape
+
+    def update(self, x):
+        _, T, B, _ = infer_leading_dims(x, len(self.shape))
+        x = x.view(T * B, *self.shape)
+        batch_mean = x.mean(dim=0)
+        batch_var = x.var(dim=0, unbiased=False)
+        batch_count = T * B
+        if dist.is_initialized():  # Assume need all-reduce.
+            mean_var = torch.stack([batch_mean, batch_var])
+            dist.all_reduce(mean_var)
+            world_size = dist.get_world_size()
+            mean_var /= world_size
+            batch_count *= world_size
+            batch_mean, batch_var = mean_var[0], mean_var[1]
+        if self.count == 0:
+            self.mean[:] = batch_mean
+            self.var[:] = batch_var
+        else:
+            delta = batch_mean - self.mean
+            total = self.count + batch_count
+            self.mean[:] = self.mean + delta * batch_count / total
+            m_a = self.var * self.count
+            m_b = batch_var * batch_count
+            M2 = m_a + m_b + delta ** 2 * self.count * batch_count / total
+            self.var[:] = M2 / total
+        self.count += batch_count

From d288f2d0f9ddc96376a2ea8bf1c8bca926f7cf4b Mon Sep 17 00:00:00 2001
From: Adam
Date: Wed, 26 Feb 2020 16:31:03 -0800
Subject: [PATCH 2/3] fixed calculation for n_itr to round up to next log itr

---
 rlpyt/runners/minibatch_rl.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/rlpyt/runners/minibatch_rl.py b/rlpyt/runners/minibatch_rl.py
index 6850b930..1b79e7cc 100644
--- a/rlpyt/runners/minibatch_rl.py
+++ b/rlpyt/runners/minibatch_rl.py
@@ -112,8 +112,9 @@ def get_n_itr(self):
         # Log at least as often as requested (round down itrs):
         log_interval_itrs = max(self.log_interval_steps // self.itr_batch_size, 1)
-        # FIXME: To run at least as many steps as requested, round up log interval?
-        n_itr = math.ceil(self.n_steps / self.log_interval_steps) * log_interval_itrs
+        n_itr = self.n_steps // self.itr_batch_size
+        if n_itr % log_interval_itrs > 0:  # Keep going to next log itr.
+            n_itr += log_interval_itrs - (n_itr % log_interval_itrs)
         self.log_interval_itrs = log_interval_itrs
         self.n_itr = n_itr
         logger.log(f"Running {n_itr} iterations of minibatch RL.")

From 7b0550e6b2fd10f89c84d93a309bcba1d0007221 Mon Sep 17 00:00:00 2001
From: Adam
Date: Wed, 26 Feb 2020 16:58:37 -0800
Subject: [PATCH 3/3] quick fix rewards as numpy object in gym wrapper

---
 rlpyt/envs/gym.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/rlpyt/envs/gym.py b/rlpyt/envs/gym.py
index be2e6031..abd46432 100644
--- a/rlpyt/envs/gym.py
+++ b/rlpyt/envs/gym.py
@@ -72,6 +72,8 @@ def step(self, action):
             else:
                 info["timeout"] = False
         info = info_to_nt(info)
+        if isinstance(r, float):
+            r = np.dtype("float32").type(r)  # Scalar float32.
         return EnvStep(obs, r, d, info)
 
     def reset(self):
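Note on the running-statistics scheme in patch 1/3: RunningMeanStdModel folds each new batch of observations into its running mean and variance with the parallel-variance merge (Chan et al.), and the MuJoCo models then whiten and clamp observations in forward() before the MLP/LSTM layers. Below is a minimal, single-process sketch of that update and normalization in plain PyTorch, for illustration only: the class and variable names are made up here, the multi-GPU all-reduce branch is omitted, and the clip values simply mirror the patch defaults (10 and 1e-6).

import torch


class RunningMeanStdSketch:
    """Illustrative stand-in for RunningMeanStdModel (single process, no all-reduce)."""

    def __init__(self, shape, norm_obs_clip=10, norm_obs_var_clip=1e-6):
        self.mean = torch.zeros(shape)
        self.var = torch.ones(shape)
        self.count = 0.
        self.norm_obs_clip = norm_obs_clip
        self.norm_obs_var_clip = norm_obs_var_clip

    def update(self, x):  # x: [N, *shape]
        batch_mean = x.mean(dim=0)
        batch_var = x.var(dim=0, unbiased=False)
        batch_count = x.shape[0]
        if self.count == 0:
            self.mean, self.var = batch_mean, batch_var
        else:
            # Chan et al. merge of two (count, mean, var) summaries.
            delta = batch_mean - self.mean
            total = self.count + batch_count
            self.mean = self.mean + delta * batch_count / total
            m2 = (self.var * self.count + batch_var * batch_count
                  + delta ** 2 * self.count * batch_count / total)
            self.var = m2 / total
        self.count += batch_count

    def normalize(self, x):
        # Same whiten-and-clamp applied in the models' forward().
        var = torch.clamp(self.var, min=self.norm_obs_var_clip)
        return torch.clamp((x - self.mean) / var.sqrt(),
                           -self.norm_obs_clip, self.norm_obs_clip)


if __name__ == "__main__":
    torch.manual_seed(0)
    obs = torch.randn(1000, 11) * 3. + 2.  # e.g. Hopper-v3 observations are 11-dim
    rms = RunningMeanStdSketch(shape=(11,))
    for batch in obs.split(100):  # Stream in per-iteration batches, as the agent does.
        rms.update(batch)
    # The streamed statistics match the full-batch statistics.
    assert torch.allclose(rms.mean, obs.mean(dim=0), atol=1e-5)
    assert torch.allclose(rms.var, obs.var(dim=0, unbiased=False), atol=1e-4)
    print(rms.normalize(obs[:2]))

In the patch itself this update runs once per optimize_agent() call, via agent.update_obs_rms(), so the normalization statistics keep adapting over the whole run rather than being frozen after a warm-up period.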
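Note on patch 2/3: get_n_itr() now derives the iteration count from itr_batch_size and rounds it up to the next multiple of log_interval_itrs, so the run always ends on a logging iteration. A small standalone sketch of that arithmetic, with made-up example numbers (not values taken from the repo's configs):

# Mirrors the rounding in get_n_itr() of rlpyt/runners/minibatch_rl.py (illustrative only).
def get_n_itr(n_steps, itr_batch_size, log_interval_steps):
    log_interval_itrs = max(log_interval_steps // itr_batch_size, 1)
    n_itr = n_steps // itr_batch_size
    if n_itr % log_interval_itrs > 0:  # Keep going to next log itr.
        n_itr += log_interval_itrs - (n_itr % log_interval_itrs)
    return n_itr, log_interval_itrs


# Evenly divisible case: 1e6 steps at 4,000 samples per itr, logging every 20,000 steps.
assert get_n_itr(1_000_000, 4_000, 20_000) == (250, 5)
# Non-divisible case rounds up: 333 itrs -> 336, the next multiple of 6.
assert get_n_itr(1_000_000, 3_000, 20_000) == (336, 6)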