### TF Agents 사용

https://www.tensorflow.org/agents/tutorials/2_environments_tutorial?hl=ko#introduction

In [None]:
!pip install tf_agents
import tf_agents

from ale_py import ALEInterface

ale = ALEInterface()

In [20]:
import random
# gym 사용
import gym
import numpy as np

In [33]:
from tf_agents.environments import py_environment
from tf_agents.environments import tf_environment
from tf_agents.environments import tf_py_environment
from tf_agents.environments import utils
from tf_agents.specs import array_spec
from tf_agents.environments import wrappers
from tf_agents.environments import suite_gym
from tf_agents.trajectories import time_step as ts

In [31]:
# 블랙잭
class CardGameEnv(py_environment.PyEnvironment):
  # 각 계수 초기화. 0에서 1까지 행동을 정함
  def __init__(self):
    self._action_spec = array_spec.BoundedArraySpec(
        shape=(), dtype=np.int32, minimum=0, maximum=1, name='action')
    self._observation_spec = array_spec.BoundedArraySpec(
        shape=(1,), dtype=np.int32, minimum=0, name='observation')
    self._state = 0
    self._episode_ended = False
  # 행동 값. 0 아니면 1
  def action_spec(self):
    return self._action_spec
  # 현재 라운드에서 카드 합
  def observation_spec(self):
    return self._observation_spec
  # 초기화
  def _reset(self):
    self._state = 0
    self._episode_ended = False
    return ts.restart(np.array([self._state], dtype=np.int32))
  # step = 라운드
  def _step(self, action):

    if self._episode_ended:
      # 액션이 끝나면 에피소드도 끝난 것. 현재 행동은 무시
      # 에피소드 초기화
      return self.reset()

    # 행동 : 0일 경우 새 카드 받기. 1일 경우 현재 라운드 종료
    if action == 1:
      self._episode_ended = True
    elif action == 0:
      new_card = np.random.randint(1, 11)
      self._state += new_card
    else:
      raise ValueError('`action` should be 0 or 1.')
    # 목표 : 21에 가까울수록 높은 리워드
    if self._episode_ended or self._state >= 21:
      reward = self._state - 21 if self._state <= 21 else -21
      return ts.termination(np.array([self._state], dtype=np.int32), reward)
    else:
      return ts.transition(
          np.array([self._state], dtype=np.int32), reward=0.0, discount=1.0)

In [99]:
# 에피소드 5개
environment = CardGameEnv()
utils.validate_py_environment(environment, episodes=100)

In [35]:
# 새 카드 뽑기/ 라운드 종료
get_new_card_action = np.array(0, dtype=np.int32)
end_round_action = np.array(1, dtype = np.int32)

# 한번의 스텝
time_step = environment.reset()
print(time_step)

TimeStep(
{'discount': array(1., dtype=float32),
 'observation': array([0], dtype=int32),
 'reward': array(0., dtype=float32),
 'step_type': array(0, dtype=int32)})


In [38]:
# 스텝 끝나면 리워드
cumulative_reward = time_step.reward
cumulative_reward

array(0., dtype=float32)

In [39]:
# 행동 3회 반복
for i in range(3):
    time_step = environment.step(get_new_card_action)
    print(f"time step:{time_step}")
    cumulative_reward += time_step.reward

time step:TimeStep(
{'discount': array(1., dtype=float32),
 'observation': array([6], dtype=int32),
 'reward': array(0., dtype=float32),
 'step_type': array(1, dtype=int32)})
time step:TimeStep(
{'discount': array(1., dtype=float32),
 'observation': array([13], dtype=int32),
 'reward': array(0., dtype=float32),
 'step_type': array(1, dtype=int32)})
time step:TimeStep(
{'discount': array(1., dtype=float32),
 'observation': array([20], dtype=int32),
 'reward': array(0., dtype=float32),
 'step_type': array(1, dtype=int32)})


In [41]:
time_step = environment.step(end_round_action)
print(time_step)

TimeStep(
{'discount': array(1., dtype=float32),
 'observation': array([0], dtype=int32),
 'reward': array(0., dtype=float32),
 'step_type': array(0, dtype=int32)})


In [42]:
cumulative_reward += time_step.reward
print(f"Final reward :{cumulative_reward}")

Final reward :0.0


In [111]:
# 함수화
def card_game(i):
    get_new_card_action = np.array(0, dtype=np.int32)
    end_round_action = np.array(1, dtype = np.int32)
    time_step = environment.reset()
    cumulative_reward = time_step.reward
    for i in range(1, i+1):
        time_step = environment.step(get_new_card_action)
        print(f"{i} 라운드 점수:{time_step[3]}")
        # print(f"{i} 원본:{time_step}")
        cumulative_reward += time_step.reward
    time_step = environment.step(end_round_action)
    cumulative_reward += time_step.reward
    print(f"최종 점수는? : {cumulative_reward}")
card_game(3)

1 라운드 점수:[6]
2 라운드 점수:[12]
3 라운드 점수:[14]
최종 점수는? : -7.0


In [113]:
card_game(10)

1 라운드 점수:[9]
2 라운드 점수:[10]
3 라운드 점수:[16]
4 라운드 점수:[19]
5 라운드 점수:[24]
6 라운드 점수:[33]
7 라운드 점수:[39]
8 라운드 점수:[42]
9 라운드 점수:[46]
10 라운드 점수:[55]
최종 점수는? : -147.0


#### LunarLander의 경우

In [1]:
!pip install -U gym[box2d]

Collecting gym[box2d]
  Downloading gym-0.26.2.tar.gz (721 kB)
     ------------------------------------- 721.7/721.7 kB 22.9 MB/s eta 0:00:00
  Installing build dependencies: started
  Installing build dependencies: finished with status 'done'
  Getting requirements to build wheel: started
  Getting requirements to build wheel: finished with status 'done'
  Preparing metadata (pyproject.toml): started
  Preparing metadata (pyproject.toml): finished with status 'done'
Collecting gym-notices>=0.0.4
  Downloading gym_notices-0.0.8-py3-none-any.whl (3.0 kB)
Collecting swig==4.*
  Downloading swig-4.1.1-py2.py3-none-win_amd64.whl (2.5 MB)
     ---------------------------------------- 2.5/2.5 MB 12.1 MB/s eta 0:00:00
Collecting box2d-py==2.3.5
  Downloading box2d-py-2.3.5.tar.gz (374 kB)
     ------------------------------------- 374.4/374.4 kB 24.3 MB/s eta 0:00:00
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Collecting pygame==2.1.0

  error: subprocess-exited-with-error
  
  python setup.py bdist_wheel did not run successfully.
  exit code: 1
  
  [16 lines of output]
  Using setuptools (version 65.6.3).
  running bdist_wheel
  running build
  running build_py
  creating build
  creating build\lib.win-amd64-cpython-39
  creating build\lib.win-amd64-cpython-39\Box2D
  copying library\Box2D\Box2D.py -> build\lib.win-amd64-cpython-39\Box2D
  copying library\Box2D\__init__.py -> build\lib.win-amd64-cpython-39\Box2D
  creating build\lib.win-amd64-cpython-39\Box2D\b2
  copying library\Box2D\b2\__init__.py -> build\lib.win-amd64-cpython-39\Box2D\b2
  running build_ext
  building 'Box2D._Box2D' extension
  swigging Box2D\Box2D.i to Box2D\Box2D_wrap.cpp
  swig.exe -python -c++ -IBox2D -small -O -includeall -ignoremissing -w201 -globals b2Globals -outdir library\Box2D -keyword -w511 -D_SWIG_KWARGS -o Box2D\Box2D_wrap.cpp Box2D\Box2D.i
  error: command 'swig.exe' failed: None
  [end of output]
  
  note: This error originate

NameError: name 'gym' is not defined

In [3]:
# 설치
# jupyter notebook으로만 가능
import gym
env = gym.make("LunarLander-v2")

In [11]:
env.observation_space

Box([-1.5       -1.5       -5.        -5.        -3.1415927 -5.
 -0.        -0.       ], [1.5       1.5       5.        5.        3.1415927 5.        1.
 1.       ], (8,), float32)

In [14]:
# 상태 초기화
obs = env.reset()
obs

(array([ 0.00476694,  1.4094868 ,  0.4828205 , -0.06370707, -0.00551687,
        -0.10936606,  0.        ,  0.        ], dtype=float32),
 {})

https://gymnasium.farama.org/environments/box2d/lunar_lander/

![lunar_lander.gif](attachment:lunar_lander.gif)

In [37]:
# policy network
import keras
import tensorflow as tf
import numpy as np
n_inputs = env.observation_space.shape[0]
n_outputs = env.action_space.n

model = keras.models.Sequential([
    keras.layers.Dense(32, activation="relu", input_shape=[n_inputs]),
    keras.layers.Dense(32, activation="relu"),
    keras.layers.Dense(n_outputs, activation="softmax"),
])

In [79]:
# cart_pole에서 사용한 one step 함수 수정(- 초기 하나의 스텝 정의)

def lander_play_one_step(env, obs, model, loss_fn):
    with tf.GradientTape() as tape:
        obs = np.array(obs)
        probas = model(obs[np.newaxis])
        logits = tf.math.log(probas + keras.backend.epsilon())
        action = tf.random.categorical(logits, num_samples=1)
        loss = tf.reduce_mean(loss_fn(action, probas))
    grads = tape.gradient(loss, model.trainable_variables)
    obs, reward, done, info = env.step(action[0, 0].numpy())
    return obs, reward, done, grads

In [92]:
def lander_play_one_step(env, obs, model, loss_fn):
    with tf.GradientTape() as tape:
        obs = tf.convert_to_tensor(obs)
        obs = tf.expand_dims(obs, axis=0)
        probas = model(obs)
        logits = tf.math.log(probas + keras.backend.epsilon())
        action = tf.random.categorical(logits, num_samples=1)
        loss = tf.reduce_mean(loss_fn(action, probas))
    grads = tape.gradient(loss, model.trainable_variables)
    obs, reward, done, info = env.step(action[0, 0].numpy())
    return obs, reward, done, grads

In [93]:
# 마찬가지로 multiple_episode도 로드하여 수정(에피소드를 계속 반복하여 보상과 그레디언트 저장)
def lander_play_multiple_episodes(env, n_episodes, n_max_steps, model, loss_fn):
    all_rewards = []
    all_grads = []
    for episode in range(n_episodes):
        current_rewards = []
        current_grads = []
        obs = env.reset()
        for step in range(n_max_steps):
            obs, reward, done, grads = lander_play_one_step(env, obs, model, loss_fn)
            current_rewards.append(reward)
            current_grads.append(grads)
            if done:
                break
        all_rewards.append(current_rewards)
        all_grads.append(current_grads)
    return all_rewards, all_grads

In [88]:
# 할인된 리워드 저장
def discount_rewards(rewards, discount_rate):
    discounted = np.array(rewards)
    for step in range(len(rewards) - 2, -1, -1):
        discounted[step] += discounted[step + 1] * discount_rate
    return discounted
# 에피소드를 지속하면서 할인된 리워드 normalize
# 그 행동이 좋은 것인지 나쁜 것인지 지
def discount_and_normalize_rewards(all_rewards, discount_rate):
    all_discounted_rewards = [discount_rewards(rewards, discount_rate)
                              for rewards in all_rewards]
    flat_rewards = np.concatenate(all_discounted_rewards)
    reward_mean = flat_rewards.mean()
    reward_std = flat_rewards.std()
    return [(discounted_rewards - reward_mean) / reward_std
            for discounted_rewards in all_discounted_rewards]

In [89]:
# 범주형 크로스엔트로피 - 최적화함수
optimizer = keras.optimizers.Nadam(learning_rate=0.005)
loss_fn = keras.losses.sparse_categorical_crossentropy

In [90]:
# 계수 설정
n_iterations = 100
n_episodes_per_update = 8
n_max_steps = 500
discount_rate = 0.99

In [94]:
# 모델 학습 시작
mean_rewards = []

for iteration in range(n_iterations):
    all_rewards, all_grads = lander_play_multiple_episodes(
        env, n_episodes_per_update, n_max_steps, model, loss_fn)
    mean_reward = sum(map(sum, all_rewards)) / n_episodes_per_update
    print("\rIteration: {}/{}, mean reward: {:.1f}  ".format(
        iteration + 1, n_iterations, mean_reward), end="")
    mean_rewards.append(mean_reward)
    all_final_rewards = discount_and_normalize_rewards(all_rewards,
                                                       discount_rate)
    all_mean_grads = []
    for var_index in range(len(model.trainable_variables)):
        mean_grads = tf.reduce_mean(
            [final_reward * all_grads[episode_index][step][var_index]
             for episode_index, final_rewards in enumerate(all_final_rewards)
                 for step, final_reward in enumerate(final_rewards)], axis=0)
        all_mean_grads.append(mean_grads)
    optimizer.apply_gradients(zip(all_mean_grads, model.trainable_variables))

ValueError: Can't convert non-rectangular Python sequence to Tensor.