In [None]:
import tensorflow as tf
import numpy as np

from tf_agents.environments import py_environment
from tf_agents.environments import utils
from tf_agents.specs import array_spec
from tf_agents.trajectories import time_step

# 1. Define an environment class as an inheritance from PyEnvironment
Instances from this class represent first-order delay systems.

In [None]:
class MyEnv(py_environment.PyEnvironment):
    '''
    
    Y(s) = K/(1+T*s) * U(s)
    
    T * dy(t)/dt = - y(t) + K * u(t), t > 0, 
    y(0) = y_init.
    
    y(t+1) = (1-1/T) * y(t) + K / T * u(t), t = 1,2, ...
    y(0) = y_init.
    
    x(t+1) = (1-1/T) * x(t) + K / T * u(t), t = 1,2, ...
    x(0) = x_init, 
    y(t) = x(t).
    
    '''

    def __init__(self, nStepSimulation = 100, T = 10, K = 1.0, discount = 0.9):
        self._action_spec = array_spec.BoundedArraySpec(
            shape=(), dtype=np.float32, minimum=-1, maximum=1, name='action')

        self._observation_spec = array_spec.BoundedArraySpec(
            shape=(), dtype=np.float32, minimum=-1, maximum=1, name='observation')

        self._state = self.getInitialState()
        self._episode_ended = False
        self.time = 0
        self.nStepSimulation = nStepSimulation
        self.T = T
        self.K = K
        self.discount = discount
    
    def getInitialState(self):
        return 0.
    
    def getObservation(self):
        return np.array(self._state, np.float32) # (,)
    
    def getReward(self):
        sv = 1.0
        err = sv - self.getObservation()
        return np.abs(err)
    
    def action_spec(self):
        return self._action_spec
    
    def observation_spec(self):
        return self._observation_spec

    def _reset(self):
        self.time = 0
        self._state = self.getInitialState()
        self._episode_ended = False
        
        return time_step.restart(self.getObservation())

    def _step(self, action):
        
        if self._episode_ended:
            # The last action ended the episode. Ignore the current action and start
            # a new episode.
            return self.reset()
        
        if self.time < self.nStepSimulation:
            
            self._state = (1-1/self.T) * self._state + self.K/self.T * action
            
            self.time += 1
            return time_step.transition(self.getObservation(), reward = self.getReward(), discount = self.discount)
        else:
            self._episode_ended = True
            return time_step.termination(self.getObservation(), reward = self.getReward())

In [None]:
def aSimpleUnitTest():
    env = MyEnv()
    assert isinstance(env, py_environment.PyEnvironment)
    utils.validate_py_environment(env, episodes=5)

def anotherSimpleUnitTest():
    env = MyEnv()
    assert isinstance(env, py_environment.PyEnvironment)

    u = np.array(np.random.randn(), np.float32) # (,)
    
    time_step = env.reset()    
    rewardAvg = time_step.reward    
    while not time_step.is_last():
        time_step = env.step(u)
        rewardAvg = (1-1/10) * rewardAvg + 1/10 * time_step.reward

In [None]:
aSimpleUnitTest()
anotherSimpleUnitTest()