
PRE sample with episode for tfclient #69

Open
cmarlin opened this issue Sep 11, 2021 · 2 comments

Comments

@cmarlin

cmarlin commented Sep 11, 2021

Hello,
I can't find how to store episode data with a per-position priority using the tfclient. Could you provide such an example? It would be valuable for PER (prioritized experience replay) in a MuZero algorithm.
Thanks a lot

@fastturtle
Collaborator

Have you seen the documentation for TFClient.insert()? This should be analogous to the example of Client.insert(). Let us know if this is not clear so we can update it.

Additionally, are you using the TFClient instead of the Client and TrajectoryWriter for performance reasons? It is often simpler to use the TrajectoryWriter where possible.

@cmarlin
Author

cmarlin commented Oct 20, 2021

Yes, I'm trying to use TFClient for performance, as TrajectoryWriter is really slow.
For the same reason I use batched environments and a batched agent policy, which makes things more complex.
Here is pseudo code for my listener:

```python
class ReverbObserver():
  def __init__(self, reverb_tfclient, table_names, collect_data_spec, batch_size: int):
    self._reverb_tfclient = reverb_tfclient
    self._table_names = table_names
    self._batch_size = batch_size
    flat_collect_data_spec = tf.nest.flatten(collect_data_spec)
    self._writer = [tf.Variable(tf.zeros([batch_size] + s.shape, s.dtype))
                    for s in flat_collect_data_spec]
    self._writer_position = tf.Variable(tf.zeros([batch_size], dtype=tf.int32))

  def __call__(self, traj: tf_agents.trajectories.trajectory.Trajectory):
    flat_traj = tf.nest.flatten(traj)
    # Append the step to the internal per-environment buffers.
    writer_indices2D = tf.stack([tf.range(self._batch_size), self._writer_position], -1)
    for writer_elt, traj_elt in zip(self._writer, flat_traj):
      writer_elt.assign(tf.tensor_scatter_nd_update(writer_elt, writer_indices2D, traj_elt))
    self._writer_position.assign(self._writer_position + 1)
    # Flush finished episodes: one insert per step of the episode.
    traj_indexes = tf.where(traj.is_last())
    for traj_index in traj_indexes:
      common_data = [episode[traj_index] for episode in self._writer]
      for step in tf.range(self._writer_position[traj_index]):
        self._reverb_tfclient.insert(
            data=common_data + [step],
            tables=self._table_names,
            priorities=tf.constant([1.0], tf.float64),
        )
    # Reset write positions for the environments that just finished.
    self._writer_position.assign(tf.where(traj.is_last(), 0, self._writer_position))
```

Obviously, I still have issues at the moment (the sampling probability is per sample, not per episode, ...).
But I would like to know whether "common_data" is shared internally by the client/Reverb, since it holds the data for the whole episode. For PER I need one priority per step.
So it would be helpful to add an efficient code sample for high throughput.
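For reference, the buffering pattern above can be sketched framework-free: one buffer slot per batched environment, flushed as one prioritized item per step when an episode ends. `FakeClient` and `EpisodeBuffer` are hypothetical stand-ins (not Reverb APIs), just to show the per-step-priority shape; the real client would deduplicate the shared episode payload server-side.

```python
from dataclasses import dataclass, field

@dataclass
class FakeClient:
    """Stand-in for a replay client: records every insert call."""
    items: list = field(default_factory=list)

    def insert(self, data, priority):
        self.items.append((data, priority))

class EpisodeBuffer:
    """Buffers steps per batch slot; flushes one item per step on episode end."""
    def __init__(self, client, batch_size):
        self._client = client
        self._episodes = [[] for _ in range(batch_size)]

    def observe(self, steps, priorities, is_last):
        # steps, priorities, is_last: one entry per batch slot.
        for i, (step, prio, last) in enumerate(zip(steps, priorities, is_last)):
            self._episodes[i].append((step, prio))
            if last:
                episode = [s for s, _ in self._episodes[i]]
                # One insert per step: the whole episode is the payload,
                # the step index and its own priority identify the sample.
                for step_idx, (_, p) in enumerate(self._episodes[i]):
                    self._client.insert(data=(episode, step_idx), priority=p)
                self._episodes[i] = []

client = FakeClient()
buf = EpisodeBuffer(client, batch_size=2)
buf.observe(steps=['a0', 'b0'], priorities=[0.5, 0.9], is_last=[False, False])
buf.observe(steps=['a1', 'b1'], priorities=[0.7, 0.1], is_last=[True, False])
# Slot 0's episode ['a0', 'a1'] is flushed as two items with per-step priorities.
```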

Thanks
