# [環境](https://www.tensorflow.org/agents/tutorials/2_environments_tutorial)

In [1]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import abc
import tensorflow as tf
import numpy as np

from tf_agents.environments import py_environment
from tf_agents.environments import tf_environment
from tf_agents.environments import tf_py_environment
from tf_agents.environments import utils
from tf_agents.specs import array_spec
from tf_agents.environments import wrappers
from tf_agents.environments import suite_gym
from tf_agents.trajectories import time_step as ts

## Python環境

`step(action)`は与えられた環境で行動を適用し次のステップに関する以下の情報を返す。

1. `observation`：これは、エージェントが次のステップで行動を選択するために観察できる環境状態の一部です。
2. `reward`：エージェントは、複数のステップにわたってこれらの報酬の合計を最大化することを学習します。
3. `step_type`：環境との相互作用は通常、シーケンス/エピソードの一部です (チェスのゲームで複数の動きがあるように)。step_type は、`FIRST`、`MID`または`LAST`のいずれかで、このタイムステップがシーケンスの最初、中間、または最後のステップかどうかを示します。
4. `discount`：これは、現在のタイムステップでの報酬に対する次のタイムステップでの報酬の重み付けを表す浮動小数です。

これらは、名前付きタプル`TimeStep(step_type, reward, discount, observation)`にグループ化されます

In [2]:
environment = suite_gym.load('CartPole-v0')
print('action_spec:', environment.action_spec())
print('time_step_spec.observation:', environment.time_step_spec().observation)
print('time_step_spec.step_type:', environment.time_step_spec().step_type)
print('time_step_spec.discount:', environment.time_step_spec().discount)
print('time_step_spec.reward:', environment.time_step_spec().reward)

action_spec: BoundedArraySpec(shape=(), dtype=dtype('int64'), name='action', minimum=0, maximum=1)
time_step_spec.observation: BoundedArraySpec(shape=(4,), dtype=dtype('float32'), name='observation', minimum=[-4.8000002e+00 -3.4028235e+38 -4.1887903e-01 -3.4028235e+38], maximum=[4.8000002e+00 3.4028235e+38 4.1887903e-01 3.4028235e+38])
time_step_spec.step_type: ArraySpec(shape=(), dtype=dtype('int32'), name='step_type')
time_step_spec.discount: BoundedArraySpec(shape=(), dtype=dtype('float32'), name='discount', minimum=0.0, maximum=1.0)
time_step_spec.reward: ArraySpec(shape=(), dtype=dtype('float32'), name='reward')


In [3]:
action = np.array(1, dtype=np.int32)
time_step = environment.reset()
print(time_step)
while not time_step.is_last():
    time_step = environment.step(action)
    print(time_step)

TimeStep(step_type=array(0, dtype=int32), reward=array(0., dtype=float32), discount=array(1., dtype=float32), observation=array([ 0.03754664, -0.01755501,  0.03016233,  0.03129518], dtype=float32))
TimeStep(step_type=array(1, dtype=int32), reward=array(1., dtype=float32), discount=array(1., dtype=float32), observation=array([ 0.03719554,  0.1771217 ,  0.03078823, -0.2517207 ], dtype=float32))
TimeStep(step_type=array(1, dtype=int32), reward=array(1., dtype=float32), discount=array(1., dtype=float32), observation=array([ 0.04073798,  0.3717908 ,  0.02575382, -0.53453565], dtype=float32))
TimeStep(step_type=array(1, dtype=int32), reward=array(1., dtype=float32), discount=array(1., dtype=float32), observation=array([ 0.04817379,  0.5665413 ,  0.01506311, -0.8189937 ], dtype=float32))
TimeStep(step_type=array(1, dtype=int32), reward=array(1., dtype=float32), discount=array(1., dtype=float32), observation=array([ 0.05950462,  0.76145387, -0.00131677, -1.106901  ], dtype=float32))
TimeStep(s

### カスタム環境の作成

例：簡単なブラックジャック

1. ゲームは、1～10 の数値が付けられた無限のカード一式を使用してプレイします。
2. 毎回、エージェントは2つの行動 (新しいランダムカードを取得する、またはその時点のラウンドを停止する) を実行できます。
3. 目標はラウンド終了時にカードの合計を 21 にできるだけ近づけることです。

この時の環境：

1. `action`：2つの行動があります（行動0：新しいカードを取得、行動1：その時点のラウンドを終了）。
2. `observation`：その時点のラウンドのカードの合計。
3. `reward`：目標は、21 にできるだけ近づけることなので、ラウンド終了時に次の報酬を使用します。`sum_of_cards - 21 if sum_of_cards <= 21 else -21`

In [4]:
class CardGameEnv(py_environment.PyEnvironment):

    def __init__(self):
        self._action_spec = array_spec.BoundedArraySpec(
            shape=(), dtype=np.int32, minimum=0, maximum=1, name='action')
        self._observation_spec = array_spec.BoundedArraySpec(
            shape=(1,), dtype=np.int32, minimum=0, name='observation')
        self._state = 0
        self._episode_ended = False

    def action_spec(self):
        return self._action_spec

    def observation_spec(self):
        return self._observation_spec

    def _reset(self):
        self._state = 0
        self._episode_ended = False
        return ts.restart(np.array([self._state], dtype=np.int32))

    def _step(self, action):

        if self._episode_ended:
            # The last action ended the episode. Ignore the current action and start a new episode.
            return self.reset()

        # Make sure episodes don't go on forever.
        if action == 1:
            self._episode_ended = True
        elif action == 0:
            new_card = np.random.randint(1, 11)
            self._state += new_card
        else:
            raise ValueError('`action` should be 0 or 1.')

        if self._episode_ended or self._state >= 21:
            reward = self._state - 21 if self._state <= 21 else -21
            return ts.termination(np.array([self._state], dtype=np.int32), reward)
        else:
            return ts.transition(
                np.array([self._state], dtype=np.int32), reward=0.0, discount=1.0)

環境の検証。生成された`observation`と`time_steps`が仕様で定義されている正しい形状とタイプに従っていることを確認する。

以下ではランダムなポリシーをを5つのエピソードで実行している。問題があればエラーが返ってくる。

In [6]:
environment = CardGameEnv()
utils.validate_py_environment(environment, episodes=5)

固定ポリシーを適用する。ここでは3枚のカードを要求して終了する。

In [7]:
get_new_card_action = np.array(0, dtype=np.int32)
end_round_action = np.array(1, dtype=np.int32)

environment = CardGameEnv()
time_step = environment.reset()
print(time_step)
cumulative_reward = time_step.reward

for _ in range(3):
    time_step = environment.step(get_new_card_action)
    print(time_step)
    cumulative_reward += time_step.reward

time_step = environment.step(end_round_action)
print(time_step)
cumulative_reward += time_step.reward
print('Final Reward = ', cumulative_reward)

TimeStep(step_type=array(0, dtype=int32), reward=array(0., dtype=float32), discount=array(1., dtype=float32), observation=array([0], dtype=int32))
TimeStep(step_type=array(1, dtype=int32), reward=array(0., dtype=float32), discount=array(1., dtype=float32), observation=array([6], dtype=int32))
TimeStep(step_type=array(1, dtype=int32), reward=array(0., dtype=float32), discount=array(1., dtype=float32), observation=array([12], dtype=int32))
TimeStep(step_type=array(1, dtype=int32), reward=array(0., dtype=float32), discount=array(1., dtype=float32), observation=array([16], dtype=int32))
TimeStep(step_type=array(2, dtype=int32), reward=array(-5., dtype=float32), discount=array(0., dtype=float32), observation=array([16], dtype=int32))
Final Reward =  -5.0


一般的なラッパーは`environments/wrappers.py`にある。これらはPython環境`py_environment.PyEnvironment`を受け取り、変更された環境`py_environment.PyEnvironment`を返す。例：

1. `ActionDiscretizeWrapper`：連続空間で定義された行動を離散化された行動に変換します。
2. `RunStats`：実行したステップ数、完了したエピソード数など、環境の実行統計をキャプチャします。
3. `TimeLimit`：一定のステップ数の後にエピソードを終了します。

## TensorFlow環境

`environments/tf_environment.TFEnvironment`で定義されている。以下の点でPython環境とは異なる。

- 配列の代わりにテンソルオブジェクトを生成する
- 仕様と比較したときに生成されたテンソルにバッチディメンションを追加します

Python環境をTF環境に変換すると、TensorFlowで操作を並列化できます。たとえば、環境からデータを収集して`replay_buffer`に追加する`collect_experience_op`、および、`replay_buffer`から読み取り、エージェントをトレーニングする`train_op`を定義し、TensorFlowで自然に並列実行することができます。

### カスタム環境の作成

直接作るのは難しいので、Python環境をTensorFlow環境に`TFPyEnvironment`ラッパーを使って変換する。

In [8]:
env = suite_gym.load('CartPole-v0')
tf_env = tf_py_environment.TFPyEnvironment(env)

print(isinstance(tf_env, tf_environment.TFEnvironment))
print("TimeStep Specs:", tf_env.time_step_spec())
print("Action Specs:", tf_env.action_spec())

True
TimeStep Specs: TimeStep(step_type=TensorSpec(shape=(), dtype=tf.int32, name='step_type'), reward=TensorSpec(shape=(), dtype=tf.float32, name='reward'), discount=BoundedTensorSpec(shape=(), dtype=tf.float32, name='discount', minimum=array(0., dtype=float32), maximum=array(1., dtype=float32)), observation=BoundedTensorSpec(shape=(4,), dtype=tf.float32, name='observation', minimum=array([-4.8000002e+00, -3.4028235e+38, -4.1887903e-01, -3.4028235e+38],
      dtype=float32), maximum=array([4.8000002e+00, 3.4028235e+38, 4.1887903e-01, 3.4028235e+38],
      dtype=float32)))
Action Specs: BoundedTensorSpec(shape=(), dtype=tf.int64, name='action', minimum=array(0), maximum=array(1))


In [9]:
env = suite_gym.load('CartPole-v0')

tf_env = tf_py_environment.TFPyEnvironment(env)
# reset() creates the initial time_step after resetting the environment.
time_step = tf_env.reset()
num_steps = 3
transitions = []
reward = 0
for i in range(num_steps):
    action = tf.constant([i % 2])
    # applies the action and returns the new TimeStep.
    next_time_step = tf_env.step(action)
    transitions.append([time_step, action, next_time_step])
    reward += next_time_step.reward
    time_step = next_time_step

np_transitions = tf.nest.map_structure(lambda x: x.numpy(), transitions)
print('\n'.join(map(str, np_transitions)))
print('Total reward:', reward.numpy())

[TimeStep(step_type=array([0], dtype=int32), reward=array([0.], dtype=float32), discount=array([1.], dtype=float32), observation=array([[ 0.02313676, -0.03736085, -0.023516  ,  0.02066958]],
      dtype=float32)), array([0], dtype=int32), TimeStep(step_type=array([1], dtype=int32), reward=array([1.], dtype=float32), discount=array([1.], dtype=float32), observation=array([[ 0.02238954, -0.2321378 , -0.02310261,  0.30584118]],
      dtype=float32))]
[TimeStep(step_type=array([1], dtype=int32), reward=array([1.], dtype=float32), discount=array([1.], dtype=float32), observation=array([[ 0.02238954, -0.2321378 , -0.02310261,  0.30584118]],
      dtype=float32)), array([1], dtype=int32), TimeStep(step_type=array([1], dtype=int32), reward=array([1.], dtype=float32), discount=array([1.], dtype=float32), observation=array([[ 0.01774678, -0.03669438, -0.01698579,  0.0059627 ]],
      dtype=float32))]
[TimeStep(step_type=array([1], dtype=int32), reward=array([1.], dtype=float32), discount=array([

In [10]:
env = suite_gym.load('CartPole-v0')
tf_env = tf_py_environment.TFPyEnvironment(env)

time_step = tf_env.reset()
rewards = []
steps = []
num_episodes = 5

for _ in range(num_episodes):
    episode_reward = 0
    episode_steps = 0
    while not time_step.is_last():
        action = tf.random.uniform([1], 0, 2, dtype=tf.int32)
        time_step = tf_env.step(action)
        episode_steps += 1
        episode_reward += time_step.reward.numpy()
    rewards.append(episode_reward)
    steps.append(episode_steps)
    time_step = tf_env.reset()

num_steps = np.sum(steps)
avg_length = np.mean(steps)
avg_reward = np.mean(rewards)

print('num_episodes:', num_episodes, 'num_steps:', num_steps)
print('avg_length', avg_length, 'avg_reward:', avg_reward)

num_episodes: 5 num_steps: 136
avg_length 27.2 avg_reward: 27.2
