<a href="https://colab.research.google.com/github/akimotolab/Policy_Optimization_Tutorial/blob/main/04_deterministic_policy_gradient.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## 準備

まず，必要なパッケージのインストールとインポート，および仮想displayを設定します．

In [None]:
# 必要なパッケージのインストール
!apt update
!pip install swig
!apt install xvfb
!pip install pyvirtualdisplay
!pip install gymnasium[box2d]

In [None]:
from pyvirtualdisplay import Display
import torch

# 仮想ディスプレイの設定
_display = Display(visible=False, size=(1400, 900))
_ = _display.start()

In [None]:
import random
import numpy as np
from scipy.special import softmax
import matplotlib
import matplotlib.pyplot as plt
from matplotlib import animation
import seaborn as sns
import gymnasium as gym
from IPython import display

続いて，第１回の資料で定義した`rollout`などの基本的な関数をここでも定義しておきます．

# 決定的方策勾配法

今回は「Twin-Delayed Deep Deterministic Policy Gradient (TD3) アルゴリズム」について見ていきます．
TD3は2018年に発表され，以降，非常に広く利用されているActor-Critic法の一種です．

参考文献：Fujimoto et al. Addressing Function Approximation Error in Actor-Critic Methods, ICML 2018.

## TD3のコード

以下に，TD3の著者らによって公開されているTD3のコードを最新のgymnasiumのインターフェースに合わせるためにわずかに修正したコードを示します．まずは実行してみましょう．

In [None]:
import copy
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

経験再生バッファー：過去に訪問にした状態，とった行動，遷移した状態，得られた報酬，終了判定，を保存しておくバッファーです．

In [None]:
class ReplayBuffer:

    def __init__(self, state_dim, action_dim, max_size):
        self.max_size = max_size
        self.state = np.zeros((max_size, state_dim))
        self.action = np.zeros((max_size, action_dim))
        self.next_state = np.zeros((max_size, state_dim))
        self.reward = np.zeros((max_size, 1))
        self.done = np.zeros((max_size, 1))
        self.ptr = 0
        self.size = 0

    def add(self, state, action, next_state, reward, terminated):
        self.state[self.ptr] = state
        self.action[self.ptr] = action
        self.next_state[self.ptr] = next_state
        self.reward[self.ptr] = reward
        self.done[self.ptr] = float(terminated)
        self.ptr = (self.ptr + 1) % self.max_size
        self.size = min(self.size + 1, self.max_size)

    def get(self, indeces):
        return (
            self.state[indeces],
            self.action[indeces],
            self.next_state[indeces],
            self.reward[indeces],
            self.done[indeces]
        )

    def clear(self):
        self.state[:, :] = 0.0
        self.action[:, :] = 0.0
        self.next_state[:, :] = 0.0
        self.reward[:] = 0.0
        self.done[:] = 0.0
        self.ptr = 0
        self.size = 0

    def sample(self, batch_size):
        ind = np.random.choice(self.size, batch_size, replace=False)
        return self.get(ind)


アクター：方策そのものです．

In [None]:
class Actor(nn.Module):
    def __init__(self, state_dim, hidden1_dim, hidden2_dim, min_action, max_action):
        super(Actor, self).__init__()
        self.nlayer = 2 if hidden2_dim == 0 else 3
        self.l1 = nn.Linear(state_dim, hidden1_dim)
        if self.nlayer == 2:
            self.l3 = nn.Linear(hidden1_dim, len(min_action))
        else:
            self.l2 = nn.Linear(hidden1_dim, hidden2_dim)
            self.l3 = nn.Linear(hidden2_dim, len(min_action))
        self.center_action = torch.FloatTensor((max_action + min_action) / 2)
        self.radius_action = torch.FloatTensor((max_action - min_action) / 2)

    def forward(self, state):
        a = F.relu(self.l1(state))
        if self.nlayer == 3:
            a = F.relu(self.l2(a))
        a = torch.tanh(self.l3(a))
        return self.center_action + self.radius_action * a


クリティック：状態行動の価値を推定する関数です．

In [None]:
class Critic(nn.Module):
    def __init__(self, state_dim, action_dim, hidden1_dim, hidden2_dim):
        super(Critic, self).__init__()
        self.nlayer = 2 if hidden2_dim == 0 else 3
        self.l1 = nn.Linear(state_dim + action_dim, hidden1_dim)
        if self.nlayer == 2:
            self.l3 = nn.Linear(hidden1_dim, 1)
        else:
            self.l2 = nn.Linear(hidden1_dim, hidden2_dim)
            self.l3 = nn.Linear(hidden2_dim, 1)

    def forward(self, state, action):
        sa = torch.cat([state, action], 1)
        q = F.relu(self.l1(sa))
        if self.nlayer == 3:
            q = F.relu(self.l2(q))
        q = self.l3(q)
        return q

TD3：
- save_models : 内部のパラメータを保存し，あとから呼び出して評価できるようにする関数
- load_models : save_modelsで保存したパラメータを読み込む関数
- select_action : 状態をうけとって行動を返す関数．
- select_exploratory_action : 探索のための行動を返す関数．
- train : １ステップの学習を実行する関数

In [None]:
class TD3:
    # https://github.com/sfujim/TD3
    def __init__(self, env, actor, critic1, critic2, state_dim, min_action, max_action, discount, expl_noise, lr, startup_time, tau, policy_noise, noise_clip, policy_freq, buffer_size, batch_size):

        self.env = env

        self.actor = actor
        self.actor_target = copy.deepcopy(self.actor)
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=lr)
        self.critic1 = critic1
        self.critic1_target = copy.deepcopy(self.critic1)
        self.critic1_optimizer = torch.optim.Adam(self.critic1.parameters(), lr=lr)
        self.critic2 = critic2
        self.critic2_target = copy.deepcopy(self.critic2)
        self.critic2_optimizer = torch.optim.Adam(self.critic2.parameters(), lr=lr)

        self.max_action = max_action
        self.min_action = min_action
        self.discount = discount
        self.expl_noise = expl_noise
        self.startup_time = startup_time
        self.tau = tau
        self.policy_noise = policy_noise
        self.noise_clip = noise_clip
        self.policy_freq = policy_freq

        self.batch_size = batch_size
        self.buffer = ReplayBuffer(state_dim, len(self.min_action), buffer_size)

        self.total_itr = 0

    def save_models(self, path):
        torch.save(self.actor.state_dict(), path + '_actor')
        torch.save(self.critic1.state_dict(), path + '_critic1')
        torch.save(self.critic2.state_dict(), path + '_critic2')
        torch.save(self.actor_target.state_dict(), path + '_actor_target')
        torch.save(self.critic1_target.state_dict(), path + '_critic1_target')
        torch.save(self.critic2_target.state_dict(), path + '_critic2_target')

    def load_models(self, path):
        self.actor.load_state_dict(torch.load(path + '_actor'))
        self.critic1.load_state_dict(torch.load(path + '_critic1'))
        self.critic2.load_state_dict(torch.load(path + '_critic2'))
        self.actor_target.load_state_dict(torch.load(path + '_actor_target'))
        self.critic1_target.load_state_dict(torch.load(path + '_critic1_target'))
        self.critic2_target.load_state_dict(torch.load(path + '_critic2_target'))

    def select_action(self, state):
        state = torch.FloatTensor(state.reshape(1, -1))
        return self.actor(state).detach().numpy().flatten()

    def select_exploratory_action(self, state):
        if self.total_itr < self.startup_time:
            action = self.min_action + (self.max_action - self.min_action) * np.random.rand(len(self.min_action))
        else:
            action = self.select_action(np.array(state))
            action += np.random.randn(len(self.min_action)) * (self.expl_noise / 2.0) * (self.max_action - self.min_action)
            action = action.clip(self.min_action, self.max_action)
        return action

    def train(self, state, action, next_state, reward, terminated):
        self.total_itr += 1
        self.buffer.add(state, action, next_state, reward, terminated)
        # Sample replay buffer
        if self.buffer.size < self.batch_size:
            return
        state, action, next_state, reward, terminated = self.buffer.sample(self.batch_size)
        state = torch.FloatTensor(state)
        action = torch.FloatTensor(action)
        next_state = torch.FloatTensor(next_state)
        reward = torch.FloatTensor(reward)
        done = torch.FloatTensor(terminated)

        # train critic
        with torch.no_grad():
            noise = (torch.randn_like(action) * self.policy_noise).clamp(-self.noise_clip, self.noise_clip)
            next_action = (self.actor_target(next_state) - self.actor_target.center_action) / self.actor_target.radius_action
            next_action = (next_action + noise).clamp(-1.0, 1.0) * self.actor_target.radius_action + self.actor_target.center_action
            target_Q1 = reward + self.discount * self.critic1_target(next_state, next_action) * (1 - done)
            target_Q2 = reward + self.discount * self.critic2_target(next_state, next_action) * (1 - done)
            target_Q = torch.min(target_Q1, target_Q2)

        # critic 1
        current_Q1 = self.critic1(state, action)
        critic1_loss = F.mse_loss(current_Q1, target_Q)
        self.critic1_optimizer.zero_grad()
        critic1_loss.backward()
        self.critic1_optimizer.step()

        # critic 2
        current_Q2 = self.critic2(state, action)
        critic2_loss = F.mse_loss(current_Q2, target_Q)
        self.critic2_optimizer.zero_grad()
        critic2_loss.backward()
        self.critic2_optimizer.step()

        if self.total_itr % self.policy_freq == 0:
            # actor
            actor_loss = -self.critic1(state, self.actor(state)).mean()
            self.actor_optimizer.zero_grad()
            actor_loss.backward()
            self.actor_optimizer.step()

            # target network
            for param, target_param in zip(self.critic1.parameters(), self.critic1_target.parameters()):
                target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
            for param, target_param in zip(self.critic2.parameters(), self.critic2_target.parameters()):
                target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
            for param, target_param in zip(self.actor.parameters(), self.actor_target.parameters()):
                target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)

実行スクリプト

In [None]:
from pathlib import Path

ENV_NAME = 'Pendulum-v1'
NEVALS = 10
MAX_STEPS = int(1e5)
LOG_INTERVAL = int(5e3)

seed = 1234

env = gym.make(ENV_NAME)
random.seed(seed)
envseed = random.randint(0, 1000)
actseed = random.randint(0, 1000)
observation, info = env.reset(seed=envseed)
env.action_space.seed(actseed)
torch.manual_seed(seed)
np.random.seed(seed)

directory = Path("td3_pendulum_seed{}".format(seed))
directory.mkdir(parents=True, exist_ok=True)

dim_action = env.action_space.shape[0]
min_action = env.action_space.low * np.ones(dim_action)
max_action = env.action_space.high * np.ones(dim_action)
dim_state = env.observation_space.shape[0]
min_state = env.observation_space.low * np.ones(dim_state)
max_state = env.observation_space.high * np.ones(dim_state)
hidden1_dim = 256
hidden2_dim = 0
discount = 0.99
expl_noise = 0.1
lr = 3e-4
startup_time = int(1e4)
tau = 0.005
policy_noise = 0.2
noise_clip = 0.5
policy_freq = 2
buffer_size = MAX_STEPS
batch_size = 256

actor = Actor(dim_state, hidden1_dim, hidden2_dim, min_action, max_action)
critic1 = Critic(dim_state, dim_action, hidden1_dim, hidden2_dim)
critic2 = Critic(dim_state, dim_action, hidden1_dim, hidden2_dim)
agent = TD3(env, actor, critic1, critic2, dim_state, min_action, max_action, discount, expl_noise, lr, startup_time, tau, policy_noise, noise_clip, policy_freq, buffer_size, batch_size)

state, info = env.reset()
cum_reward_train = 0.0
for t in range(1, MAX_STEPS+1):
    # 探索のための行動選択
    action = agent.select_exploratory_action(state)
    # 状態遷移
    next_state, reward, terminated, truncated, info = env.step(action)
    # Actor および Critic の学習
    agent.train(state, action, next_state, reward, terminated)
    #
    state = next_state
    cum_reward_train += reward
    if terminated or truncated:
        state, info = env.reset()
        print(t, cum_reward_train)
        cum_reward_train = 0
    if t % LOG_INTERVAL == 0:
        agent.save_models(directory.name + '/step{}'.format(t))
env.close()

最終的に得られた方策について，累積報酬（の-1倍）の経験分布関数を確認してみましょう．

In [None]:
seed = 1234

env = gym.make(ENV_NAME)
random.seed(seed)
envseed = random.randint(0, 1000)
actseed = random.randint(0, 1000)
observation, info = env.reset(seed=envseed)
env.action_space.seed(actseed)
torch.manual_seed(seed)
np.random.seed(seed)

return_array = np.zeros(50)
for i in range(len(return_array)):
    cum_reward = 0.0
    state, info = env.reset()
    for t in range(1, MAX_STEPS+1):
        action = agent.select_action(state)
        next_state, reward, terminated, truncated, info = env.step(action)
        state = next_state
        cum_reward += reward
        if terminated or truncated:
            return_array[i] = cum_reward
            break
env.close()

fig, ax = plt.subplots()
sns.ecdfplot(data=-return_array, ax=ax)
plt.grid()

挙動を確認してみましょう．実行するたびに初期値が変わりますので，何度か実行してみましょう．

In [None]:
history = []
img = []

d = Display()
d.start()


# 可視化用エピソードの実行
observation, info = env.reset()
img.append(env.render())
terminated = False
truncated = False
while not (terminated or truncated):
    action = agent.select_action(observation)
    next_observation, reward, terminated, truncated, info = env.step(action)
    history.append([observation, action, next_observation, reward, terminated, truncated, info])
    observation = next_observation
    display.clear_output(wait=True)
    img.append(env.render())
env.close()

# 可視化
dpi = 72
interval = 50
plt.figure(figsize=(img[0].shape[1]/dpi, img[0].shape[0]/dpi), dpi=dpi)
patch = plt.imshow(img[0])
plt.axis=('off')
animate = lambda i: patch.set_data(img[i])
ani = animation.FuncAnimation(plt.gcf(), animate, frames=len(img), interval=interval)
display.display(display.HTML(ani.to_jshtml()))
plt.close()


## アルゴリズムの解説

得られている方策の振る舞いが，`3_actor_critic.ipynb`で学習した方策よりも圧倒的に良いふるまいになっていることが確認できると思います．また，学習時間も大幅に短縮されていることがわかります．その理由について，オンライン更新型のActor-Critic法との違いに着目し，解説していきます．



### 1. 決定的方策

TD3ではその名の通り，「決定的方策」を学習しています．
前述のActor-Critic法では，学習過程において様々な行動を選択し，経験できるように，確率的方策（方策が状態で条件付けされた行動選択確率を表現）を用いていました．他方，TD3では，決定的方策（方策が状態から行動への写像を表現）を採用しています．学習対象となる方策は決定的ですが，学習過程では様々な行動を選択することが必要になります．そこで，TD3では，学習対象となる方策と，探索のための方策（行動方策と呼びます）を分けています．

プログラム中では，`select_action`が学習対象となる方策による行動選択を表しており，`select_exploratory_action`が行動方策による行動選択を表しています．行動方策は，学習対象となる方策の出力に対して，ノイズをのせることで表現しています．ただし，学習初期の段階では，学習対象となる方策が正しく学習されていないため，取りうる行動からランダムに行動を選択するようにしています．

方策が決定的であっても，方策勾配定理と類似の定理が成立し，方策勾配を近似することができます．ここでは本筋から逸れるため，説明は割愛します．



### 2. 経験再生バッファー

学習効率に最も影響する変更点は，経験再生バッファーを利用している点になります．

TD3では前述のオンライン更新型Actor-Critic法と同様，環境とインタラクションするたびに学習（Actor-Criticのパラメータ更新）を実施します．ただし，その際に用いるデータに差があります．
- 前述のオンライン更新型Actor-Critic法：各ステップでの状態遷移のデータ（その時の状態，選択した行動，次状態，報酬）を用いて更新します．すなわち，毎回の学習に使うデータは最新の状態遷移のデータ一つ
- TD3：各ステップでの状態遷移データを経験再生バッファーに保存しておき，毎回の学習時にはそこからランダムに選択されたバッチサイズ（`batch_size`）を用いて学習します．すなわち，毎回の学習には必ずしも最新のデータを含まない`batch_size`個（上の実行では256個）のデータを用いて更新します．

方策勾配の推定に用いるデータ数が多くなるため，推定精度が大幅に向上し，学習率を大きくとることが可能になるため，大幅な効率改善が実現されます．

各学習に多くのデータを用いるので，より多くの計算時間が必要になると思われるかもしれませんが，GPU上で並列処理できるため，`batch_size`をある程度まで大きくとっても計算時間はあまり増加しないことに注意してください．

なお，経験再生バッファーを使用しない（前述のオンライン更新型Actor-Critic法と同様のデータ活用方法に変更）場合には，実行スクリプトにおいて以下のようにパラメータを設定します．
```
buffer_size = 1  # 最新のデータだけを保存
batch_size = 1  # 一度に使うデータは1つだけ
```

### 3. Target Actor & Target Critic

Criticの学習では，TD誤差が小さくなるように学習が進みます．
つまり，Criticが表現している方策の行動価値の推定値 $Q^\omega(s_t, a_t)$ をそのターゲットである $\Delta = r_{t+1} + \gamma Q^\omega(s_{t+1}, \pi_\theta(s_{t+1}))$ に近づけるようにCritic を更新します（`3_actor_critic.ipynb`参照．ここでは状態価値でなく，行動価値を推定していますが，ロジックはほとんど同じです．）
なお，$\omega$や$\theta$はCriticやActorのパラメータを表現しています．
しかし，ターゲットである$\Delta$自体も現在のCritic $Q$やActor $\pi$に依存しています．これが原因で学習が不安定になることが知られています．

これを解消するために，target actor $\pi_{\bar{\theta}}$ と target critic $Q^{\bar{\omega}}$ を用意し，$\Delta$の計算の際に$\pi_\theta$および$Q^\omega$に代わりこれらを用いる方法が提案されています．Target actor と target criticのパラメータは，通常のactorとcriticよりも緩やかに更新されるよう，各ステップにおいて
$$
\bar{\theta} \leftarrow (1-\tau) \bar{\theta} + \tau \theta, \\
\bar{\omega} \leftarrow (1-\tau) \bar{\omega} + \tau \omega$$
（$\tau \ll 1$）のように更新します．なお，これらのパラメータは$\bar{\theta} = \theta$，$\bar{\omega} = \omega$と初期化されます．

なお，$\tau = 1$ (`tau = 1`) とすれば，Target Actor や Target Critic を用いない方法となります．

### 4. Target Policy Smoothing Regularization

TD3では深層ネットワークを用いてcriticとactorを表現します．これにより，高い表現能力を持つcriticやactorをモデル化することができます．
しかし，深層ネットワークをcriticに用いることから，過学習によって望ましくないピークがcriticに現れることがあります．方策はcriticによって近似される行動価値を最大にするような方策を学習するため，望ましくないピークを出力してしまうような方策が獲得される恐れがあります．

これを解消するために，「近い行動の価値は近いはず」という仮定のもと，行動価値関数がなめらかになるよう，$\Delta$計算の際に target actor の出力$\pi_{\bar{\theta}}$にノイズ $\epsilon \sim clip(N(0, \sigma_{target}), -c, c)$ を加えるといった工夫をしています．ただし，行動が取りうる範囲をはみ出した場合には，境界上に射影しています．

なお，上の実行スクリプトにおいて`policy_noise = 0`とすると，この工夫を行わないことになります．

### 5. Delayed Policy Update


Actorの学習はcriticの精度に依存します．Criticは現在actorが表現している方策の行動価値を推定しているため，actorの学習よりも早く進むことが望ましいといえます．

そこで，actorの更新を`policy_freq`ステップに一度だけ行うようにして，criticの学習が先行するように工夫しています．

なお，`policy_freq = 1`とすると，この工夫を使わないことになります．


### 6. Clipped Double Q-Learning

Q学習において，推定された行動価値$Q^\omega(s, \pi(s))$が真の行動価値$Q^\pi(s, \pi(s))$よりも大きく見積もられる問題が，方策勾配法に限らず行動価値を推定する方法において知られています．

行動価値の過大評価を防ぎ，価値の推定精度を改善するために，criticを二つ（$Q^{\omega_1}$，$Q^{\omega_2}$）用意し，これらをそれぞれ学習させ，二つのcriticの更新におけるターゲット$\Delta$の計算の際に，$Q^{\omega_1}(s, a)$と$Q^{\omega_2}(s, a)$のうち小さい値を採用するといった工夫が取られています．

なお，`train`の内部において，
```
target_Q = torch.min(target_Q1, target_Q2)
```
を
```
target_Q = target_Q1
```
と変更すれば，この工夫を行わない方法となります．ただし，内部的には２つ目のcritic関数を保持・学習しています．

# 自習課題

TD3で導入されている上述の6つの工夫について，それぞれの効果を検証しましょう．
複数の工夫の相乗効果を考慮すると全部で$2^6=64$通りの方法を検証することになり，時間がかかります．そのような場合，各コンポーネントだけを取り除いた方法（6つ）と全ての構成要素を加えた方法（1つ）を比較することで，いずれの要素も重要であることを示すアブレーションテスト（ablation test, ablation study）がしばしば用いられます．アブレーションテストを実施し，各コンポーネントの効果を考察してみましょう．

効果は一つの環境（上であれば`Pendulum-v1`）で全て現れるとは限りません．いくつかの環境でテストしてみましょう．TD3は連続状態，連続行動を想定していますので，そのような環境をいくつか選択し，実験結果を比較しましょう．