##### Copyright 2019 DeepMind Technologies Limited.

In [None]:
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Environments

<table class="tfo-notebook-buttons" align="left">
  <td>
    <a target="_blank" href="https://colab.research.google.com/github/deepmind/reverb/blob/master/examples/demo.ipynb">
    <img src="https://www.tensorflow.org/images/colab_logo_32px.png" />
    Run in Google Colab</a>
  </td>
  <td>
    <a target="_blank" href="https://github.com/deepmind/reverb/blob/master/examples/demo.ipynb">
    <img src="https://www.tensorflow.org/images/GitHub-Mark-32px.png" />
    View source on GitHub</a>
  </td>
</table>

# Introduction

This colab is a demonstration of how to use Reverb through examples.

# Setup

Installs the stable build of Reverb (dm-reverb) and TensorFlow (tf) to match.

In [None]:
# Installs the packages the examples need: an up-to-date numpy, dm-tree, and
# the stable Reverb build. The `tensorflow` extra of dm-reverb installs a
# TensorFlow version that matches the Reverb release.
!pip install numpy --upgrade
!pip install dm-tree
!pip install dm-reverb[tensorflow]

In [None]:
import reverb
import tensorflow as tf

The code below defines a dummy RL environment for use in the examples below.

In [None]:
# Specs of the dummy environment: observations are 10x10 uint8 tensors and
# actions are float32 vectors with 2 elements.
OBSERVATION_SPEC = tf.TensorSpec(shape=[10, 10], dtype=tf.uint8)
ACTION_SPEC = tf.TensorSpec(shape=[2], dtype=tf.float32)

def agent_step(unused_timestep) -> tf.Tensor:
  """Returns a random action; the timestep argument is ignored.

  Each element is drawn independently and is either 0 or 1. The result has the
  shape and dtype given by `ACTION_SPEC`.
  """
  is_one = tf.random.uniform(ACTION_SPEC.shape) > .5
  return tf.cast(is_one, ACTION_SPEC.dtype)

def environment_step(unused_action) -> tf.Tensor:
  """Returns a random observation; the action argument is ignored.

  Values are sampled uniformly with an exclusive upper bound of 256 and then
  cast to the dtype of `OBSERVATION_SPEC`. The result has the shape given by
  `OBSERVATION_SPEC`.
  """
  raw_observation = tf.random.uniform(OBSERVATION_SPEC.shape, maxval=256)
  return tf.cast(raw_observation, OBSERVATION_SPEC.dtype)

# Creating a Server and Client

In [None]:
# Signature of the items stored in 'my_table'. It is optional, but setting it
# is good practice as it enables data validation and easier dataset
# construction. Every shape is prefixed with a 3 because the trajectories
# written in the examples consist of 3 timesteps.
simple_table_signature = {
    'actions':
        tf.TensorSpec([3, *ACTION_SPEC.shape], ACTION_SPEC.dtype),
    'observations':
        tf.TensorSpec([3, *OBSERVATION_SPEC.shape], OBSERVATION_SPEC.dtype),
}

simple_table = reverb.Table(
    name='my_table',
    sampler=reverb.selectors.Prioritized(priority_exponent=0.8),
    remover=reverb.selectors.Fifo(),
    max_size=int(1e6),
    # The rate limiter is set to a low number for the examples. Read the Rate
    # Limiters section for usage info.
    rate_limiter=reverb.rate_limiters.MinSize(2),
    signature=simple_table_signature,
)

# Starts the reverb server. `port=None` makes the server pick a port
# automatically; it is the default, so the argument could be omitted.
simple_server = reverb.Server(tables=[simple_table], port=None)

# Connects a reverb client to the port the server is listening on.
client = reverb.Client(f'localhost:{simple_server.port}')

For details on customizing the sampler, remover, and rate limiter, see below.

# Example 1: Overlapping Trajectories


## Inserting Overlapping Trajectories

In [None]:
# Appends 4 timesteps to a client writer and, as soon as 3 timesteps are
# available, inserts an item made of the 3 most recent ones into 'my_table'.
# Consecutive items therefore overlap.

with client.trajectory_writer(num_keep_alive_refs=3) as writer:
  timestep = environment_step(None)
  for step in range(4):
    action = agent_step(timestep)
    writer.append({'action': action, 'observation': timestep})
    timestep = environment_step(action)

    # A trajectory of length 3 needs three appended timesteps (steps 0..2).
    if step < 2:
      continue

    # The item references the 3 most recent timesteps added to the writer and
    # is inserted with a priority of 1.5.
    last_actions = writer.history['action'][-3:]
    last_observations = writer.history['observation'][-3:]
    writer.create_item(
        table='my_table',
        priority=1.5,
        trajectory={
            'actions': last_actions,
            'observations': last_observations,
        },
    )

The animation illustrates the state of the server at each step in the
above code block. Although each item is being set to have the same
priority value of 1.5, items do not need to have the same priority values.
In real world scenarios, items would have differing and
dynamically-calculated priority values.



<img src="https://raw.githubusercontent.com/deepmind/reverb/master/docs/animations/diagram1.svg" />

## Sampling Overlapping Trajectories in TensorFlow

In [None]:
# Builds a dataset that samples items from 'my_table'. The structure, shapes
# and dtypes of its elements are taken from the table signature, so each
# element is a trajectory of 3 timesteps.
simple_server_address = f'localhost:{simple_server.port}'
dataset = reverb.TrajectoryDataset.from_table_signature(
    server_address=simple_server_address,
    table='my_table',
    max_in_flight_samples_per_worker=10,
)


In [None]:
# Groups 2 sampled trajectories into every batch. This prepends a batch
# dimension of 2, e.g. observations now have the shape [2, 3, 10, 10].
batched_dataset = dataset.batch(2)

for sample in batched_dataset.take(1):
  info, data = sample.info, sample.data

  # Metadata of the sampled items.
  print(info.key)              # ([2], uint64)
  print(info.probability)      # ([2], float64)

  # The sampled trajectories.
  print(data['observations'])  # ([2, 3, 10, 10], uint8)
  print(data['actions'])       # ([2, 3, 2], float32)

# Example 2: Complete Episodes

Create a new server for this example to keep the elements of the priority table consistent.

In [None]:
EPISODE_LENGTH = 150

# Signature of a complete episode. It is optional, but setting it is good
# practice as it enables data validation and easier dataset construction.
# There is one more observation than there are actions: the extra observation
# is the terminal state, where no action is taken.
complete_episode_signature = {
    'actions':
        tf.TensorSpec([EPISODE_LENGTH, *ACTION_SPEC.shape], ACTION_SPEC.dtype),
    'observations':
        tf.TensorSpec([EPISODE_LENGTH + 1, *OBSERVATION_SPEC.shape],
                      OBSERVATION_SPEC.dtype),
}

complete_episode_table = reverb.Table(
    name='my_table',
    sampler=reverb.selectors.Prioritized(priority_exponent=0.8),
    remover=reverb.selectors.Fifo(),
    max_size=int(1e6),
    # The rate limiter is set to a low number for the examples. Read the Rate
    # Limiters section for usage info.
    rate_limiter=reverb.rate_limiters.MinSize(2),
    signature=complete_episode_signature,
)

complete_episode_server = reverb.Server(tables=[complete_episode_table])

# Points the reverb client at the port of the new server.
client = reverb.Client(f'localhost:{complete_episode_server.port}')

## Inserting Complete Episodes

In [None]:
# Writes whole episodes to a Reverb server, one item per episode.

NUM_EPISODES = 10

# Every episode consists of EPISODE_LENGTH steps plus the final observation,
# so the writer buffer must hold EPISODE_LENGTH + 1 timesteps. The size is
# derived from EPISODE_LENGTH (instead of being hard-coded) so that it stays in
# sync with the table signature if the episode length is changed.
with client.trajectory_writer(
    num_keep_alive_refs=EPISODE_LENGTH + 1) as writer:
  for _ in range(NUM_EPISODES):
    timestep = environment_step(None)

    for _ in range(EPISODE_LENGTH):
      action = agent_step(timestep)
      writer.append({'action': action, 'observation': timestep})

      timestep = environment_step(action)

    # The astute reader will recognize that the final timestep has not been
    # appended to the writer. We'll go ahead and add it WITHOUT an action. The
    # writer will automatically fill in the gap with `None` for the action
    # column.
    writer.append({'observation': timestep})

    # Now that the entire episode has been added to the writer buffer we can
    # create an item with a trajectory that spans the entire episode. Note that
    # the final action must not be included as it is None and the trajectory
    # would be rejected if we tried to include it.
    writer.create_item(
        table='my_table',
        priority=1.5,
        trajectory={
            'actions': writer.history['action'][:-1],
            'observations': writer.history['observation'][:],
        })

    # This call blocks until all the items (in this case only one) have been
    # sent to the server, inserted into respective tables and confirmations
    # received by the writer.
    writer.end_episode(timeout_ms=1000)

    # Ending the episode also clears the history property, which is why we are
    # able to use `[:]` when defining the trajectory above.
    assert len(writer.history['action']) == 0
    assert len(writer.history['observation']) == 0

## Sampling Complete Episodes in TensorFlow

In [None]:
# Every element sampled from the table is an entire episode; the expected
# shapes come from the table signature and so cover the whole episode length.
# The episodes are then batched 128 at a time, which gives items of the format
# (observations, actions) with shapes ([128, 151, 10, 10], [128, 150, 2]).
dataset = reverb.TrajectoryDataset.from_table_signature(
    server_address=f'localhost:{complete_episode_server.port}',
    table='my_table',
    max_in_flight_samples_per_worker=10,
    rate_limiter_timeout_ms=10,
).batch(128)

# Sample has type reverb.ReplaySample.
for sample in dataset.take(1):
  info, data = sample.info, sample.data

  # Metadata of the sampled episodes.
  print(info.key)              # ([128], uint64)
  print(info.probability)      # ([128], float64)

  # The sampled episodes.
  print(data['observations'])  # ([128, 151, 10, 10], uint8)
  print(data['actions'])       # ([128, 150, 2], float32)

# Example 3: Multiple Priority Tables

Create a server that maintains multiple priority tables.

In [None]:
# The two priority tables share the same configuration and differ only in
# their name. The rate limiter is set to a low number for the examples; read
# the Rate Limiters section for usage info.
multitable_server = reverb.Server(
    tables=[
        reverb.Table(
            name=table_name,
            sampler=reverb.selectors.Prioritized(priority_exponent=0.8),
            remover=reverb.selectors.Fifo(),
            max_size=int(1e6),
            rate_limiter=reverb.rate_limiters.MinSize(1))
        for table_name in ('my_table_a', 'my_table_b')
    ])

client = reverb.Client('localhost:{}'.format(multitable_server.port))

## Inserting Sequences of Varying Length into Multiple Priority Tables


In [None]:
# Appends 4 timesteps and inserts trajectories of length 2 into 'my_table_b'
# and trajectories of length 3 into 'my_table_a'. The priority of an item
# decreases with the step at which it is created.
with client.trajectory_writer(num_keep_alive_refs=3) as writer:
  timestep = environment_step(None)

  for step in range(4):
    writer.append({'timestep': timestep})

    action = agent_step(timestep)
    timestep = environment_step(action)

    priority = 4-step

    # Two timesteps are available from step 1 onwards.
    if step >= 1:
      writer.create_item(
          table='my_table_b',
          priority=priority,
          trajectory=writer.history['timestep'][-2:])

    # Three timesteps are available from step 2 onwards.
    if step >= 2:
      writer.create_item(
          table='my_table_a',
          priority=priority,
          trajectory=writer.history['timestep'][-3:])

This diagram shows the state of the server after executing the above cell.


<img src="https://raw.githubusercontent.com/deepmind/reverb/master/docs/animations/diagram2.svg" />

# Example 4: Samplers and Removers



##  Creating a Server with a Prioritized Sampler and a FIFO Remover

In [None]:
# A table that samples items with a prioritized selector and removes them with
# a FIFO selector.
prioritized_fifo_table = reverb.Table(
    name='my_table',
    sampler=reverb.selectors.Prioritized(priority_exponent=0.8),
    remover=reverb.selectors.Fifo(),
    max_size=int(1e6),
    rate_limiter=reverb.rate_limiters.MinSize(100),
)
reverb.Server(tables=[prioritized_fifo_table])

## Creating a Server with a MaxHeap Sampler and a MinHeap Remover

Setting `max_times_sampled=1` causes each item to be removed after it is
sampled once. The end result is a priority table that essentially functions
as a max priority queue.


In [None]:
max_size = 1000
# The rate limiter's minimum size is set to 95% of the table capacity.
min_size_to_sample = int(0.95 * max_size)

# `max_times_sampled=1` removes every item after it has been sampled once, so
# together with the MaxHeap sampler the table acts as a max priority queue.
priority_queue_table = reverb.Table(
    name='my_priority_queue',
    sampler=reverb.selectors.MaxHeap(),
    remover=reverb.selectors.MinHeap(),
    max_size=max_size,
    rate_limiter=reverb.rate_limiters.MinSize(min_size_to_sample),
    max_times_sampled=1,
)
reverb.Server(tables=[priority_queue_table])

## Creating a Server with One Queue and One Circular Buffer

The behavior of canonical data structures such as a
[circular buffer](https://en.wikipedia.org/wiki/Circular_buffer) or a max
[priority queue](https://en.wikipedia.org/wiki/Priority_queue) can
be implemented in Reverb by modifying the `sampler` and `remover`
or by using the `reverb.Table.queue` initializer.

In [None]:
# A queue created through the dedicated `Table.queue` initializer.
queue_table = reverb.Table.queue(name='my_queue', max_size=10000)

# A circular buffer built by hand: FIFO selectors are used for both sampling
# and removal, and every item can be sampled at most once.
circular_buffer_table = reverb.Table(
    name='my_circular_buffer',
    sampler=reverb.selectors.Fifo(),
    remover=reverb.selectors.Fifo(),
    max_size=10000,
    max_times_sampled=1,
    rate_limiter=reverb.rate_limiters.MinSize(1),
)

reverb.Server(tables=[queue_table, circular_buffer_table])

# Example 5: Rate Limiters


## Creating a Server with a SampleToInsertRatio Rate Limiter

In [None]:
# Rate limiter configured with a target of 3 samples per insert, a minimum
# table size of 3 before sampling and an error buffer of 3.
sample_to_insert_limiter = reverb.rate_limiters.SampleToInsertRatio(
    samples_per_insert=3.0,
    min_size_to_sample=3,
    error_buffer=3.0,
)

reverb.Server(
    tables=[
        reverb.Table(
            name='my_table',
            sampler=reverb.selectors.Prioritized(priority_exponent=0.8),
            remover=reverb.selectors.Fifo(),
            max_size=int(1e6),
            rate_limiter=sample_to_insert_limiter),
    ])


This example is intended to be used in a distributed or multi-threaded
environment, where a blocked insertion is unblocked by sample calls made from
an independent thread. If the system is single-threaded, the blocked
insertion call will cause a deadlock.
