Reinitialize GAIL dataloader after saving (#847)
* Reinitialize GAIL dataloader after saving

* Add test for GAIL + Callback

* Custom pickling

* Remove unused imports

* Remove blank lines

* Reformat

* Excluded attributes as class constants

* Use temp paths

* Tmp path for the callback

* Fix for python 3.5

* Fix typo
araffin committed May 14, 2020
1 parent 6347da3 commit c4c31cb
Showing 39 changed files with 107 additions and 80 deletions.
1 change: 1 addition & 0 deletions docs/misc/changelog.rst
@@ -27,6 +27,7 @@ Bug Fixes:
- Fix reward threshold in ``test_identity.py``
- Partially fix tensorboard indexing for PPO2 (@enderdead)
- Fixed potential bug in ``DummyVecEnv`` where ``copy()`` was used instead of ``deepcopy()``
- Fixed a bug in ``GAIL`` where the dataloader was not available after saving, causing an error when using ``CheckpointCallback``

Deprecations:
^^^^^^^^^^^^^
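For context, the scenario behind this changelog entry looks roughly like the sketch below: training GAIL while a CheckpointCallback saves the model periodically. This is an illustrative snippet, not part of the commit; the expert file, environment and save path are assumed (the stable-baselines docs use a Pendulum-v0 dataset produced by generate_expert_traj). Before this fix, a checkpoint save left the expert dataset without its dataloader, so the next discriminator update crashed.

from stable_baselines import GAIL
from stable_baselines.gail import ExpertDataset
from stable_baselines.common.callbacks import CheckpointCallback

# Assumed local expert data (e.g. produced by generate_expert_traj)
dataset = ExpertDataset(expert_path='expert_pendulum.npz', traj_limitation=10, verbose=1)
model = GAIL('MlpPolicy', 'Pendulum-v0', dataset, verbose=1)

# Saving mid-training is what used to break the dataloader
checkpoint_callback = CheckpointCallback(save_freq=500, save_path='/tmp/gail_checkpoints/')
model.learn(total_timesteps=2000, callback=checkpoint_callback)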
1 change: 0 additions & 1 deletion stable_baselines/a2c/a2c.py
@@ -282,7 +282,6 @@ def learn(self, total_timesteps, callback=None, log_interval=100, tb_log_name="A
masks.reshape((self.n_envs, self.n_steps)),
writer, self.num_timesteps)


if self.verbose >= 1 and (update % log_interval == 0 or update == 1):
explained_var = explained_variance(values, rewards)
logger.record_tabular("nupdates", update)
4 changes: 1 addition & 3 deletions stable_baselines/acer/acer_simple.py
@@ -233,7 +233,6 @@ def __init__(self, policy, env, gamma=0.99, n_steps=20, num_procs=None, q_coef=0
_init_setup_model=_init_setup_model, policy_kwargs=policy_kwargs,
seed=seed, n_cpu_tf_sess=n_cpu_tf_sess)


if _init_setup_model:
self.setup_model()

@@ -345,7 +344,7 @@ def custom_getter(getter, name, *args, **kwargs):
rho_i = tf.reshape(f_i, [-1, 1]) / (self.mu_ph + eps)
rho_i_ = tf.reshape(f_i_, [-1, 1]) / (self.mu_ph + eps)

qret = q_retrace(self.reward_ph, self.done_ph, q_i, value, tf.pow(rho_i, 1/self.n_act),
qret = q_retrace(self.reward_ph, self.done_ph, q_i, value, tf.pow(rho_i, 1 / self.n_act),
self.n_envs, self.n_steps, self.gamma)
else:
# strip off last step
@@ -606,7 +605,6 @@ def learn(self, total_timesteps, callback=None, log_interval=100, tb_log_name="A
names_ops, values_ops = self._train_step(obs, actions, rewards, dones, mus, self.initial_state, masks,
self.num_timesteps, writer)


if self.verbose >= 1 and (int(steps / self.n_batch) % log_interval == 0):
logger.record_tabular("total_timesteps", self.num_timesteps)
logger.record_tabular("fps", int(steps / (time.time() - t_start)))
2 changes: 0 additions & 2 deletions stable_baselines/acktr/acktr.py
@@ -1,7 +1,6 @@
import time
import warnings

import numpy as np
import tensorflow as tf
from gym.spaces import Box, Discrete

@@ -364,7 +363,6 @@ def learn(self, total_timesteps, callback=None, log_interval=100, tb_log_name="A
masks.reshape((self.n_envs, self.n_steps)),
writer, self.num_timesteps)


if self.verbose >= 1 and (update % log_interval == 0 or update == 1):
explained_var = explained_variance(values, returns)
logger.record_tabular("nupdates", update)
1 change: 1 addition & 0 deletions stable_baselines/bench/monitor.py
@@ -24,6 +24,7 @@ class Monitor(gym.Wrapper):
"""
EXT = "monitor.csv"
file_handler = None

def __init__(self,
env: gym.Env,
filename: Optional[str],
2 changes: 1 addition & 1 deletion stable_baselines/common/atari_wrappers.py
@@ -119,7 +119,7 @@ def __init__(self, env, skip=4):
"""
gym.Wrapper.__init__(self, env)
# most recent raw observations (for max pooling across time steps)
self._obs_buffer = np.zeros((2,)+env.observation_space.shape, dtype=env.observation_space.dtype)
self._obs_buffer = np.zeros((2,) + env.observation_space.shape, dtype=env.observation_space.dtype)
self._skip = skip

def step(self, action):
3 changes: 2 additions & 1 deletion stable_baselines/common/base_class.py
@@ -878,7 +878,7 @@ def action_probability(self, observation, state=None, mask=None, actions=None, l
std = np.exp(logstd)

n_elts = np.prod(mean.shape[1:]) # first dimension is batch size
log_normalizer = n_elts/2 * np.log(2 * np.pi) + 1/2 * np.sum(logstd, axis=1)
log_normalizer = n_elts / 2 * np.log(2 * np.pi) + 0.5 * np.sum(logstd, axis=1)

# Diagonal Gaussian action probability, for every action
logprob = -np.sum(np.square(actions - mean) / (2 * std), axis=1) - log_normalizer
@@ -1033,6 +1033,7 @@ def load(cls, load_path, env=None, custom_objects=None, **kwargs):

return model


class _UnvecWrapper(VecEnvWrapper):
def __init__(self, venv):
"""
2 changes: 1 addition & 1 deletion stable_baselines/common/dataset.py
@@ -46,7 +46,7 @@ def next_batch(self, batch_size):

data_map = dict()
for key in self.data_map:
data_map[key] = self.data_map[key][cur_id:cur_id+cur_batch_size]
data_map[key] = self.data_map[key][cur_id:cur_id + cur_batch_size]
return data_map

def iterate_once(self, batch_size):
2 changes: 1 addition & 1 deletion stable_baselines/common/mpi_moments.py
@@ -46,7 +46,7 @@ def mpi_moments(arr, axis=0, comm=None, keepdims=False):
assert count1 == count
std = np.sqrt(meansqdiff)
if not keepdims:
newshape = mean.shape[:axis] + mean.shape[axis+1:]
newshape = mean.shape[:axis] + mean.shape[axis + 1:]
mean = mean.reshape(newshape)
std = std.reshape(newshape)
return mean, std, count
2 changes: 1 addition & 1 deletion stable_baselines/common/runners.py
@@ -90,7 +90,7 @@ def traj_segment_generator(policy, env, horizon, reward_giver=None, gail=False,

cur_ep_ret = 0 # return in current episode
current_it_len = 0 # len of current iteration
current_ep_len = 0 # len of current episode
current_ep_len = 0 # len of current episode
cur_ep_true_ret = 0
ep_true_rets = []
ep_rets = [] # returns of completed episodes in this segment
2 changes: 1 addition & 1 deletion stable_baselines/common/schedules.py
@@ -251,4 +251,4 @@ def value_steps(self, steps):
:param steps: (int) The current number of iterations
:return: (float) the value for the current number of iterations
"""
return self.initial_value * self.schedule(steps / self.nvalues)
return self.initial_value * self.schedule(steps / self.nvalues)
2 changes: 1 addition & 1 deletion stable_baselines/common/tf_layers.py
@@ -222,4 +222,4 @@ def conv_to_fc(input_tensor):
"""
n_hidden = np.prod([v.value for v in input_tensor.get_shape()[1:]])
input_tensor = tf.reshape(input_tensor, [-1, n_hidden])
return input_tensor
return input_tensor
4 changes: 2 additions & 2 deletions stable_baselines/common/tf_util.py
@@ -301,7 +301,7 @@ def __init__(self, inputs, outputs, updates, givens):
:param givens: (dict) the values known for the output
"""
for inpt in inputs:
if not hasattr(inpt, 'make_feed_dict') and not (isinstance(inpt, tf.Tensor)and len(inpt.op.inputs) == 0):
if not hasattr(inpt, 'make_feed_dict') and not (isinstance(inpt, tf.Tensor) and len(inpt.op.inputs) == 0):
assert False, "inputs should all be placeholders, constants, or have a make_feed_dict method"
self.inputs = inputs
updates = updates or []
@@ -502,7 +502,7 @@ def total_episode_reward_logger(rew_acc, rewards, masks, writer, steps):
summary = tf.Summary(value=[tf.Summary.Value(tag="episode_reward", simple_value=rew_acc[env_idx])])
writer.add_summary(summary, steps + dones_idx[0, 0])
for k in range(1, len(dones_idx[:, 0])):
rew_acc[env_idx] = sum(rewards[env_idx, dones_idx[k-1, 0]:dones_idx[k, 0]])
rew_acc[env_idx] = sum(rewards[env_idx, dones_idx[k - 1, 0]:dones_idx[k, 0]])
summary = tf.Summary(value=[tf.Summary.Value(tag="episode_reward", simple_value=rew_acc[env_idx])])
writer.add_summary(summary, steps + dones_idx[k, 0])
rew_acc[env_idx] = sum(rewards[env_idx, dones_idx[-1, 0]:])
1 change: 0 additions & 1 deletion stable_baselines/common/tile_images.py
@@ -25,4 +25,3 @@ def tile_images(img_nhwc):
# img_Hh_Ww_c
out_image = out_image.reshape(new_height * height, new_width * width, n_channels)
return out_image

1 change: 0 additions & 1 deletion stable_baselines/ddpg/ddpg.py
@@ -829,7 +829,6 @@ def learn(self, total_timesteps, callback=None, log_interval=100, tb_log_name="D
episode_rewards_history = deque(maxlen=100)
episode_successes = []


with self.sess.as_default(), self.graph.as_default():
# Prepare everything.
self._reset()
1 change: 0 additions & 1 deletion stable_baselines/ddpg/policies.py
@@ -1,5 +1,4 @@
import tensorflow as tf
import numpy as np
from gym.spaces import Box

from stable_baselines.common.policies import BasePolicy, nature_cnn, register_policy
32 changes: 28 additions & 4 deletions stable_baselines/gail/dataset/dataset.py
@@ -30,6 +30,8 @@ class ExpertDataset(object):
:param sequential_preprocessing: (bool) Do not use subprocess to preprocess
the data (slower but use less memory for the CI)
"""
# Excluded attribute when pickling the object
EXCLUDED_KEYS = {'dataloader', 'train_loader', 'val_loader'}

def __init__(self, expert_path=None, traj_data=None, train_fraction=0.7, batch_size=64,
traj_limitation=-1, randomize=True, verbose=1, sequential_preprocessing=False):
@@ -118,13 +120,35 @@ def init_dataloader(self, batch_size):
sequential=self.sequential_preprocessing)

def __del__(self):
del self.dataloader, self.train_loader, self.val_loader
# Exit processes if needed
for key in self.EXCLUDED_KEYS:
if self.__dict__.get(key) is not None:
del self.__dict__[key]

def prepare_pickling(self):
def __getstate__(self):
"""
Exit processes in order to pickle the dataset.
Gets state for pickling.
Excludes processes that are not pickleable
"""
# Remove processes in order to pickle the dataset.
return {key: val for key, val in self.__dict__.items() if key not in self.EXCLUDED_KEYS}

def __setstate__(self, state):
"""
self.dataloader, self.train_loader, self.val_loader = None, None, None
Restores pickled state.
init_dataloader() must be called
after unpickling before using it with GAIL.
:param state: (dict)
"""
self.__dict__.update(state)
for excluded_key in self.EXCLUDED_KEYS:
assert excluded_key not in state
self.dataloader = None
self.train_loader = None
self.val_loader = None

def log_info(self):
"""
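The __getstate__/__setstate__ pair added above is a standard pattern for objects that hold non-picklable worker processes: drop the process-backed attributes when pickling and recreate them after loading. A self-contained sketch of the same pattern (stand-in class and loader objects, not the ExpertDataset itself):

import pickle


class DatasetWithWorkers:
    # Attributes holding worker processes are dropped when pickling
    # and must be recreated after loading (cf. init_dataloader above)
    EXCLUDED_KEYS = {'dataloader', 'train_loader', 'val_loader'}

    def __init__(self, data):
        self.data = data
        self.dataloader = object()  # stand-ins for process-backed loaders
        self.train_loader = object()
        self.val_loader = object()

    def __getstate__(self):
        # Keep everything except the non-picklable loader attributes
        return {key: val for key, val in self.__dict__.items() if key not in self.EXCLUDED_KEYS}

    def __setstate__(self, state):
        self.__dict__.update(state)
        # Loaders come back as None and must be rebuilt before use
        for key in self.EXCLUDED_KEYS:
            setattr(self, key, None)


restored = pickle.loads(pickle.dumps(DatasetWithWorkers([1, 2, 3])))
assert restored.data == [1, 2, 3] and restored.dataloader is None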
1 change: 0 additions & 1 deletion stable_baselines/her/replay_buffer.py
@@ -95,7 +95,6 @@ def can_sample(self, n_samples):
def __len__(self):
return len(self.replay_buffer)


def _sample_achieved_goal(self, episode_transitions, transition_idx):
"""
Sample an achieved goal according to the sampling strategy.
2 changes: 0 additions & 2 deletions stable_baselines/her/utils.py
@@ -42,7 +42,6 @@ def __init__(self, env):
else:
assert len(goal_space_shape) == 1, "Only 1D observation spaces are supported yet"


if isinstance(self.spaces[0], spaces.MultiBinary):
total_dim = self.obs_dim + 2 * self.goal_dim
self.observation_space = spaces.MultiBinary(total_dim)
@@ -59,7 +58,6 @@ def __init__(self, env):
else:
raise NotImplementedError("{} space is not supported".format(type(self.spaces[0])))


def convert_dict_to_obs(self, obs_dict):
"""
:param obs_dict: (dict<np.ndarray>)
1 change: 0 additions & 1 deletion stable_baselines/ppo1/pposgd_simple.py
@@ -19,7 +19,6 @@
from stable_baselines.trpo_mpi.utils import add_vtarg_and_adv



class PPO1(ActorCriticRLModel):
"""
Proximal Policy Optimization algorithm (MPI version).
2 changes: 1 addition & 1 deletion stable_baselines/ppo2/ppo2.py
@@ -12,6 +12,7 @@
from stable_baselines.common.tf_util import total_episode_reward_logger
from stable_baselines.common.math_util import safe_mean


class PPO2(ActorCriticRLModel):
"""
Proximal Policy Optimization algorithm (GPU version).
@@ -172,7 +173,6 @@ def setup_model(self):
tf.clip_by_value(train_model.value_flat - self.old_vpred_ph,
- self.clip_range_vf_ph, self.clip_range_vf_ph)


vf_losses1 = tf.square(vpred - self.rewards_ph)
vf_losses2 = tf.square(vpred_clipped - self.rewards_ph)
self.vf_loss = .5 * tf.reduce_mean(tf.maximum(vf_losses1, vf_losses2))
1 change: 1 addition & 0 deletions stable_baselines/ppo2/run_atari.py
@@ -31,6 +31,7 @@ def train(env_id, num_timesteps, seed, policy,
# Free memory
del model


def main():
"""
Runs the test
4 changes: 1 addition & 3 deletions stable_baselines/sac/sac.py
@@ -1,4 +1,3 @@
import sys
import time
import warnings

@@ -242,7 +241,6 @@ def setup_model(self):
# policy_loss = (policy_kl_loss + policy_regularization_loss)
policy_loss = policy_kl_loss


# Target for value fn regression
# We update the vf towards the min of two Q-functions in order to
# reduce overestimation bias from function approximation error.
@@ -530,7 +528,7 @@ def predict(self, observation, state=None, mask=None, deterministic=True):
observation = observation.reshape((-1,) + self.observation_space.shape)
actions = self.policy_tf.step(observation, deterministic=deterministic)
actions = actions.reshape((-1,) + self.action_space.shape) # reshape to the correct action shape
actions = unscale_action(self.action_space, actions) # scale the output for the prediction
actions = unscale_action(self.action_space, actions) # scale the output for the prediction

if not vectorized_env:
actions = actions[0]
1 change: 0 additions & 1 deletion stable_baselines/td3/policies.py
@@ -1,5 +1,4 @@
import tensorflow as tf
import numpy as np
from gym.spaces import Box

from stable_baselines.common.policies import BasePolicy, nature_cnn, register_policy
2 changes: 0 additions & 2 deletions stable_baselines/td3/td3.py
@@ -1,4 +1,3 @@
import sys
import time
import warnings

@@ -391,7 +390,6 @@ def learn(self, total_timesteps, callback=None,

callback.on_rollout_start()


episode_rewards[-1] += reward_
if done:
if self.action_noise is not None:
14 changes: 6 additions & 8 deletions stable_baselines/trpo_mpi/trpo_mpi.py
@@ -269,6 +269,11 @@ def allmean(arr):
tf_util.function([observation, old_policy.obs_ph, action, atarg, ret],
[self.summary, tf_util.flatgrad(optimgain, var_list)] + losses)

def _initialize_dataloader(self):
"""Initialize dataloader."""
batchsize = self.timesteps_per_batch // self.d_step
self.expert_dataset.init_dataloader(batchsize)

def learn(self, total_timesteps, callback=None, log_interval=100, tb_log_name="TRPO",
reset_num_timesteps=True):

@@ -297,9 +302,7 @@ def learn(self, total_timesteps, callback=None, log_interval=100, tb_log_name="T
if self.using_gail:
true_reward_buffer = deque(maxlen=40)

# Initialize dataloader
batchsize = self.timesteps_per_batch // self.d_step
self.expert_dataset.init_dataloader(batchsize)
self._initialize_dataloader()

# Stats not used for now
# TODO: replace with normal tb logging
@@ -338,7 +341,6 @@ def fisher_vector_product(vec):
observation, action = seg["observations"], seg["actions"]
atarg, tdlamret = seg["adv"], seg["tdlamret"]


vpredbefore = seg["vpred"] # predicted value function before update
atarg = (atarg - atarg.mean()) / (atarg.std() + 1e-8) # standardized advantage function estimate

@@ -430,7 +432,6 @@ def fisher_vector_product(vec):
grad = self.allmean(self.compute_vflossandgrad(mbob, mbob, mbret, sess=self.sess))
self.vfadam.update(grad, self.vf_stepsize)


# Stop training early (triggered by the callback)
if not seg.get('continue_training', True): # pytype: disable=attribute-error
break
@@ -504,9 +505,6 @@ def fisher_vector_product(vec):
return self

def save(self, save_path, cloudpickle=False):
if self.using_gail and self.expert_dataset is not None:
# Exit processes to pickle the dataset
self.expert_dataset.prepare_pickling()
data = {
"gamma": self.gamma,
"timesteps_per_batch": self.timesteps_per_batch,
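Together with the dataset.py changes, save() no longer has to tear down the dataset's processes (the prepare_pickling() call is removed), since __getstate__ now excludes the loaders from the pickle, and the factored-out _initialize_dataloader() is what learn() uses to build them. A sketch of resuming training from a saved model; the checkpoint path is assumed, and the commented init_dataloader() line is only shown for illustration because learn() performs the equivalent step itself:

import gym
from stable_baselines import GAIL

# Assumed checkpoint path (e.g. written by the CheckpointCallback sketch above)
model = GAIL.load('/tmp/gail_checkpoints/rl_model_500_steps', env=gym.make('Pendulum-v0'))

# The unpickled ExpertDataset has dataloader/train_loader/val_loader set to None;
# learn() rebuilds them via _initialize_dataloader(), roughly:
#     model.expert_dataset.init_dataloader(model.timesteps_per_batch // model.d_step)
model.learn(total_timesteps=1000)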
1 change: 0 additions & 1 deletion stable_baselines/trpo_mpi/utils.py
@@ -1,4 +1,3 @@
import gym
import numpy as np


1 change: 1 addition & 0 deletions tests/test_0deterministic.py
@@ -6,6 +6,7 @@
N_STEPS_TRAINING = 5000
SEED = 0


# Weird stuff: TD3 would fail if another algorithm is tested before
# with n_cpu_tf_sess > 1
@pytest.mark.parametrize("algo", [A2C, ACKTR, ACER, DDPG, DQN, PPO1, PPO2, SAC, TRPO, TD3])
16 changes: 8 additions & 8 deletions tests/test_a2c_conv.py
@@ -27,14 +27,14 @@ def test_conv_kernel():
with tf.Graph().as_default():
_, scaled_images = observation_input(ob_space, n_batch, scale=scale)
activ = tf.nn.relu
layer_1 = activ(conv(scaled_images, 'c1', n_filters=32, filter_size=filter_size_1, stride=4
, init_scale=np.sqrt(2), **kwargs))
layer_2 = activ(conv(layer_1, 'c2', n_filters=32, filter_size=filter_size_2, stride=4
, init_scale=np.sqrt(2), **kwargs))
assert layer_1.shape == target_shape_1 \
, "The shape of layer based on the squared kernel matrix is not correct. " \
layer_1 = activ(conv(scaled_images, 'c1', n_filters=32, filter_size=filter_size_1,
stride=4, init_scale=np.sqrt(2), **kwargs))
layer_2 = activ(conv(layer_1, 'c2', n_filters=32, filter_size=filter_size_2,
stride=4, init_scale=np.sqrt(2), **kwargs))
assert layer_1.shape == target_shape_1, \
"The shape of layer based on the squared kernel matrix is not correct. " \
"The current shape is {} and the desired shape is {}".format(layer_1.shape, target_shape_1)
assert layer_2.shape == target_shape_2 \
, "The shape of layer based on the non-squared kernel matrix is not correct. " \
assert layer_2.shape == target_shape_2, \
"The shape of layer based on the non-squared kernel matrix is not correct. " \
"The current shape is {} and the desired shape is {}".format(layer_2.shape, target_shape_2)
env.close()
