In [9]:
import numpy as np
import tensorflow as tf
import reverb
from tf_agents.environments.py_environment import PyEnvironment
from tf_agents.environments.tf_py_environment import TFPyEnvironment
from tf_agents.environments.utils import validate_py_environment
from tf_agents.specs.array_spec import BoundedArraySpec
from tf_agents.trajectories import time_step as ts
from tf_agents.agents.ddpg import actor_network, critic_network
from tf_agents.agents.td3 import td3_agent
from tf_agents.policies import random_py_policy
from custom_dynamic_episode_driver import DynamicEpisodeDriver # This is a custom  script, derived from
                                                               # https://github.com/tensorflow/agents/blob/master/tf_agents/drivers/dynamic_episode_driver.py
                                                               # Added to this "investigation" repository. Please take a look.

In [10]:
"""
This is the script which attempts to redo 
https://github.com/tensorflow/agents/blob/master/tf_agents/agents/td3/examples/v2/train_eval.py
while using dm's reverb framework to manage the experience buffer.
"""

"\nThis is the script which attempts to redo \nhttps://github.com/tensorflow/agents/blob/master/tf_agents/agents/td3/examples/v2/train_eval.py\nwhile using dm's reverb framework to manage the experience buffer.\n"

In [11]:
"""
Firstly, I specify a dummy TF_Agents PyEnvironment
"""

class SampleEnvironment(PyEnvironment):
    def __init__(self):
        self._observation_spec = self._init_observation_spec()
        self._action_spec = self._init_action_spec()
        self.current_step_index = 0
        self.max_no_steps = 5
    
    def observation_spec(self):
        return self._observation_spec

    def action_spec(self):
        return self._action_spec
    
    def _init_observation_spec(self):
        observation_spec = BoundedArraySpec(shape=(6,), dtype=np.int32, minimum=0, maximum=10)
        return observation_spec
    
    def _init_action_spec(self):
        action_spec = BoundedArraySpec((3,), np.float32, minimum=-1.0, maximum=1.0)
        return action_spec
    
    def _step(self, action):
        observation = np.random.randint(low=0, high=10, size=(6,), dtype=np.int32)
        reward = 1
        if self.current_step_index < self.max_no_steps:
            print('Transition step no.{}: {}'.format(self.current_step_index, observation.shape))
            print('-----')
            self.current_step_index +=1
            return ts.transition(observation,reward=reward, discount=1.0)
        else:
            print('Termination step no.{}: {}'.format(self.current_step_index, observation.shape))
            print('-----')
            self.current_step_index = 0
            return ts.termination(observation, reward=reward)
        
    def _reset(self):
        observation = np.random.randint(low=0, high=10, size=(6,), dtype=np.int32)
        print('Reset step: {}'.format(observation.shape))
        print('-----')
        return ts.restart(observation)

In [12]:
"""
Let's create PyEnvironment, derive a TFPyEnvironment from it and finally validate that the dummy environment is valid.
"""
py_env = SampleEnvironment()
tf_env = TFPyEnvironment(py_env)
validate_py_environment(py_env, episodes=2)

Reset step: (6,)
-----
Transition step no.0: (6,)
-----
Transition step no.1: (6,)
-----
Transition step no.2: (6,)
-----
Transition step no.3: (6,)
-----
Transition step no.4: (6,)
-----
Termination step no.5: (6,)
-----
Reset step: (6,)
-----
Transition step no.0: (6,)
-----
Transition step no.1: (6,)
-----
Transition step no.2: (6,)
-----
Transition step no.3: (6,)
-----
Transition step no.4: (6,)
-----
Termination step no.5: (6,)
-----
Reset step: (6,)
-----


In [13]:
"""
Creating a server, sampling client (to pass experience to the table) and training client (to retrieve experience from
the table and feed it into the tf_agent.train() method.)
"""
dm_server= reverb.Server(
    tables=[
        reverb.Table(
            name='my_table',
            sampler=reverb.selectors.Prioritized(priority_exponent=0.8),
            remover=reverb.selectors.Fifo(),
            max_size=int(1e6),
            rate_limiter=reverb.rate_limiters.MinSize(1)),
    ],
    # Sets the port to None to make the server pick one automatically.
    port=None)

sampling_client = reverb.Client('localhost:{}'.format(dm_server.port))
training_client = reverb.TFClient('localhost:{}'.format(dm_server.port))

In [14]:
"""
Defining tf.agent as specified in the original train_eval.py code
"""

actor_fc_layers=(400, 300)
critic_obs_fc_layers=(400,)
critic_action_fc_layers=None
critic_joint_fc_layers=(300,)
exploration_noise_std=0.1
target_update_tau=0.05
target_update_period=5
actor_update_period=2
gradient_clipping=None
dqda_clipping=None
td_errors_loss_fn=tf.compat.v1.losses.huber_loss
gamma=0.995
reward_scale_factor=1.0
gradient_clipping=None
actor_learning_rate=1e-4
critic_learning_rate=1e-3

global_step = tf.compat.v1.train.get_or_create_global_step()

actor_net = actor_network.ActorNetwork(
    tf_env.time_step_spec().observation,
    tf_env.action_spec(),
    fc_layer_params=actor_fc_layers)

critic_net_input_specs = (tf_env.time_step_spec().observation,
                          tf_env.action_spec())

critic_net = critic_network.CriticNetwork(
    critic_net_input_specs,
    observation_fc_layer_params=critic_obs_fc_layers,
    action_fc_layer_params=critic_action_fc_layers,
    joint_fc_layer_params=critic_joint_fc_layers)


tf_agent = td3_agent.Td3Agent(
    tf_env.time_step_spec(),
    tf_env.action_spec(),
    actor_network=actor_net,
    critic_network=critic_net,
    actor_optimizer=tf.compat.v1.train.AdamOptimizer(learning_rate=actor_learning_rate),
    critic_optimizer=tf.compat.v1.train.AdamOptimizer(learning_rate=critic_learning_rate),
    exploration_noise_std=exploration_noise_std,
    target_update_tau=target_update_tau,
    target_update_period=target_update_period,
    actor_update_period=actor_update_period,
    dqda_clipping=dqda_clipping,
    td_errors_loss_fn=td_errors_loss_fn,
    gamma=gamma,
    reward_scale_factor=reward_scale_factor,
    gradient_clipping=gradient_clipping,
    train_step_counter=global_step)

tf_agent.initialize()


In [15]:
random_policy = random_py_policy.RandomPyPolicy(time_step_spec=py_env.time_step_spec(),
                                                action_spec=py_env.action_spec())

collect_driver = DynamicEpisodeDriver(tf_env, 
                                      random_policy,
                                      observers=[],
                                      sampling_client = sampling_client, # Note, that my custom driver uses sampling client
                                      num_episodes=1)
collect_driver.run()

Reset step: (6,)
-----
Transition step no.0: (6,)
-----
Trajectory: Trajectory(step_type=<tf.Tensor: shape=(1,), dtype=int32, numpy=array([0], dtype=int32)>, observation=<tf.Tensor: shape=(1, 6), dtype=int32, numpy=array([[0, 5, 3, 1, 6, 2]], dtype=int32)>, action=array([[-0.08907502, -0.43950248,  0.12805901]], dtype=float32), policy_info=(), next_step_type=<tf.Tensor: shape=(1,), dtype=int32, numpy=array([1], dtype=int32)>, reward=<tf.Tensor: shape=(1,), dtype=float32, numpy=array([1.], dtype=float32)>, discount=<tf.Tensor: shape=(1,), dtype=float32, numpy=array([1.], dtype=float32)>)
Transition step no.1: (6,)
-----
Trajectory: Trajectory(step_type=<tf.Tensor: shape=(1,), dtype=int32, numpy=array([1], dtype=int32)>, observation=<tf.Tensor: shape=(1, 6), dtype=int32, numpy=array([[6, 3, 9, 8, 2, 5]], dtype=int32)>, action=array([[-0.9718742 ,  0.77809495,  0.02244174]], dtype=float32), policy_info=(), next_step_type=<tf.Tensor: shape=(1,), dtype=int32, numpy=array([1], dtype=int32)>,

(TimeStep(step_type=<tf.Tensor: shape=(1,), dtype=int32, numpy=array([1], dtype=int32)>, reward=<tf.Tensor: shape=(1,), dtype=float32, numpy=array([1.], dtype=float32)>, discount=<tf.Tensor: shape=(1,), dtype=float32, numpy=array([1.], dtype=float32)>, observation=<tf.Tensor: shape=(1, 6), dtype=int32, numpy=array([[0, 9, 7, 9, 9, 4]], dtype=int32)>),
 ())

In [16]:
sequence_length = 1
data_spec = tf_agent.collect_data_spec # Set in the constructor of the ReverbRB

# Before calling client.dataset 
get_dtype = lambda x: tf.as_dtype(x.dtype)
get_shape = lambda x: (sequence_length,) + x.shape
shapes = tf.nest.map_structure(get_shape, data_spec)
dtypes = tf.nest.map_structure(get_dtype, data_spec)
print(shapes)
print('----')
print(dtypes)

dataset = training_client.dataset(
        table='my_table',
        dtypes=dtypes,
        shapes=dtypes)

print(dataset)
#dataset = dataset.batch(128)

for sample in dataset.take(1):
    print(sample.info.key)        
    print(sample.info.probability)
    print(sample.data)

Trajectory(step_type=TensorShape([1]), observation=TensorShape([1, 6]), action=TensorShape([1, 3]), policy_info=(), next_step_type=TensorShape([1]), reward=TensorShape([1]), discount=TensorShape([1]))
----
Trajectory(step_type=tf.int32, observation=tf.int32, action=tf.float32, policy_info=(), next_step_type=tf.int32, reward=tf.float32, discount=tf.float32)


TypeError: Error converting shapes to a TensorShape: Dimension value must be integer or None or have an __index__ method, got value 'tf.int32' with type '<class 'tensorflow.python.framework.dtypes.DType'>'.