In [1]:
import numpy as np
from tf_agents.specs import BoundedArraySpec
from tf_agents.trajectories.time_step import restart
from tf_agents.trajectories.time_step import termination
from tf_agents.trajectories.time_step import transition
from tf_agents.environments.py_environment import PyEnvironment

  for external in metadata.entry_points().get(self.group, []):


Creating a Custom TF-Agents Environment

In [2]:
class MyEnvironment(PyEnvironment):
    """A 4x4 grid world implemented as a TF-Agents ``PyEnvironment``.

    The agent starts in cell (0, 0). Each action moves it by one cell:
    0 -> row - 1, 1 -> row + 1, 2 -> column - 1, 3 -> column + 1.

    * Reaching cell (3, 3) ends the episode with reward +10.
    * Leaving the grid ends the episode with reward -1.
    * Any other move yields reward 0 and the episode continues.

    The observation is a (4, 4) int32 array holding a single 1 at the agent's
    position, or all zeros once the agent has left the grid.

    Args:
        discount: discount passed to ``transition`` for non-terminal steps.
    """

    # (row delta, column delta) applied to the state for each action id.
    _MOVES = ((-1, 0), (+1, 0), (0, -1), (0, +1))

    def __init__(self, discount=1.0):
        super().__init__()
        self._action_spec = BoundedArraySpec(
            shape=(), dtype=np.int32, name='action', minimum=0, maximum=3)
        self._observation_spec = BoundedArraySpec(
            shape=(4, 4), dtype=np.int32, name='observation', minimum=0, maximum=1)
        self.discount = discount
        # Define the episode state up front so that `_step` never touches
        # undefined attributes, even if it is reached before `_reset`.
        self._state = np.zeros(2, dtype=np.int32)
        self._episode_ended = False

    def action_spec(self):
        """Return the spec of the scalar integer action (0 to 3 inclusive)."""
        return self._action_spec

    def observation_spec(self):
        """Return the spec of the (4, 4) binary grid observation."""
        return self._observation_spec

    def _reset(self):
        """Start a new episode with the agent in cell (0, 0)."""
        self._state = np.zeros(2, dtype=np.int32)
        self._episode_ended = False
        obs = np.zeros((4, 4), dtype=np.int32)
        obs[self._state[0], self._state[1]] = 1
        return restart(obs)

    def _step(self, action):
        """Apply `action` and return the resulting time step.

        If the previous step ended the episode, the action is ignored and a
        new episode is started instead.
        """
        if self._episode_ended:
            # Without this guard the state would keep drifting from the
            # terminal (possibly out-of-grid) position on later steps.
            return self.reset()

        self._state += self._MOVES[action]
        reward = 0
        obs = np.zeros((4, 4), dtype=np.int32)
        done = (self._state.min() < 0 or self._state.max() > 3)
        if not done:
            # Only mark the position while the agent is still on the grid.
            obs[self._state[0], self._state[1]] = 1
        if done or np.all(self._state == np.array([3, 3])):
            # `done` means the agent fell off the grid; otherwise it reached
            # the goal cell (3, 3).
            reward = -1 if done else +10
            self._episode_ended = True
            return termination(obs, reward)
        else:
            return transition(obs, reward, self.discount)

In [3]:
# Create the environment and start an episode. The bare `time_step` expression
# on the last line displays the initial TimeStep as the cell output.
my_env = MyEnvironment()
time_step = my_env.reset()
time_step

TimeStep(
{'discount': array(1., dtype=float32),
 'observation': array([[1, 0, 0, 0],
       [0, 0, 0, 0],
       [0, 0, 0, 0],
       [0, 0, 0, 0]]),
 'reward': array(0., dtype=float32),
 'step_type': array(0)})

In [4]:
# Action 1 applies the (+1, 0) move, i.e. the agent goes one row down from
# the start cell; the resulting TimeStep is displayed as the cell output.
time_step = my_env.step(1)
time_step

TimeStep(
{'discount': array(1., dtype=float32),
 'observation': array([[0, 0, 0, 0],
       [1, 0, 0, 0],
       [0, 0, 0, 0],
       [0, 0, 0, 0]]),
 'reward': array(0., dtype=float32),
 'step_type': array(1)})

Deque vs Rotating List

In [5]:
from collections import deque

# Fixed seed so that the random positions drawn below are reproducible.
np.random.seed(42)

max_size = 1000000
# Build the bounded deque already filled with the integers 0 .. max_size - 1.
mem = deque(range(max_size), maxlen=max_size)
# Look up five randomly chosen positions; the resulting list is the cell output.
[mem[position] for position in np.random.randint(max_size, size=5)]

[121958, 671155, 131932, 365838, 259178]

In [6]:
# Time a single append on the deque, which already holds max_size items.
%timeit mem.append(1)

44.9 ns ± 0.411 ns per loop (mean ± std. dev. of 7 runs, 10000000 loops each)


In [7]:
# Time reading 5 randomly chosen positions from the deque. The measurement
# also includes the np.random.randint call that draws the indices.
%timeit [mem[i] for i in np.random.randint(max_size, size=5)]

172 µs ± 3.1 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)


In [8]:
class ReplayMemory:
    """Fixed-capacity circular buffer of arbitrary Python objects.

    Items are written into a preallocated NumPy object array. Once
    `max_size` items have been stored, each new item overwrites the oldest one.

    Attributes:
        buffer: 1-D object array of length `max_size` holding the items.
        max_size: capacity of the buffer.
        index: position at which the next item will be written.
        size: number of valid items currently stored (at most `max_size`).
    """

    def __init__(self, max_size):
        # Use the builtin `object` as dtype: the `np.object` alias was
        # deprecated in NumPy 1.20 and removed in NumPy 1.24.
        self.buffer = np.empty(max_size, dtype=object)
        self.max_size = max_size
        self.index = 0
        self.size = 0

    def append(self, obj):
        """Store `obj`, overwriting the oldest item once the buffer is full."""
        self.buffer[self.index] = obj
        self.size = min(self.size + 1, self.max_size)
        # Wrap around so that the write position cycles through the buffer.
        self.index = (self.index + 1) % self.max_size

    def sample(self, batch_size):
        """Return `batch_size` stored items drawn uniformly with replacement.

        Indices are drawn from `[0, size)`, so slots that have never been
        written are not sampled. The result is a 1-D object array.
        """
        indices = np.random.randint(self.size, size=batch_size)
        return self.buffer[indices]

In [9]:
# Same seed as the deque cell, so the same 5 random indices are drawn.
np.random.seed(42)

# Rebind `mem` to a ReplayMemory and fill it with 0 .. max_size - 1.
mem = ReplayMemory(max_size=max_size)
for i in range(max_size):
    mem.append(i)
# The sampled batch is displayed as the cell output.
mem.sample(5)

array([121958, 671155, 131932, 365838, 259178], dtype=object)

In [10]:
# Time a single append; `mem` is now the ReplayMemory instance, not the deque.
%timeit mem.append(1)

541 ns ± 1.73 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)


In [11]:
# Time drawing a batch of 5 items; the random index generation happens inside
# sample(), so it is part of the measurement here as well.
%timeit mem.sample(5)

8.51 µs ± 13.6 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
