In [None]:
import math
import os
from abc import ABC, abstractmethod
from datetime import timedelta
from time import time

import gymnasium as gym
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn.functional as F
from IPython.display import display, HTML
from matplotlib import animation
from torch import nn
from torch.distributions import Normal
from torch.utils.data import Dataset

In [None]:
# プレイの様子を動画で見てみるための関数
def display_video(frames):
    """Show a sequence of image frames as an inline JavaScript/HTML animation.

    Args:
        frames: indexable, non-empty sequence of images accepted by
            ``plt.imshow`` (``frames[0]`` is drawn first).
    """
    fig = plt.figure(figsize=(8, 8), dpi=50)
    image = plt.imshow(frames[0])
    plt.axis('off')

    def animate(frame_index):
        # Swap the pixel data of the single image artist instead of redrawing.
        image.set_data(frames[frame_index])

    movie = animation.FuncAnimation(fig, animate, frames=len(frames), interval=50)
    display(HTML(movie.to_jshtml()))
    # Close the figure so the static image is not rendered a second time.
    plt.close()

# CarRacing環境の確認

In [None]:
env = gym.make('CarRacing-v3')  # create the raw environment

obs, info = env.reset(seed=0)
frames = []
total_reward = 0
print(obs.shape)  # (96, 96, 3)

# Drive with uniformly random actions for 1000 steps, recording every observation.
for _ in range(1000):
    frames.append(obs)
    action = env.action_space.sample()  # sample uniformly from the action space
    next_obs, reward, terminated, truncated, info = env.step(action)
    done = terminated or truncated
    total_reward += reward
    obs = next_obs
    if done:
        # Continue from the first observation of the new episode; without this
        # the stale terminal frame of the finished episode would be recorded.
        obs, info = env.reset()

# Release the environment's resources; later cells build their own environments.
env.close()

print('Reward: ', total_reward)
display_video(frames)

# 学習に用いるクラスを定義

In [None]:
# 学習を効率的に進めるために環境をラップするクラスを定義する
# rgbの３チャネルをグレースケールに変換し、4フレーム分をスタックして状態とする
# 1ステップの間に6回行動を繰り返し、その合計報酬を1ステップの報酬とする
# 直近100ステップの平均報酬が-0.1以下になったらエピソードを終了する
# また、コースから大幅に外れた場合もエピソードを終了する

class WrappedEnv():
    """Wrapper around ``CarRacing-v3`` that makes learning more efficient.

    * RGB frames are converted to grayscale and the last 4 frames are stacked
      to form the state, shape ``(4, H, W)``.
    * Each ``step`` repeats the action up to 6 times and returns the summed reward.
    * The episode ends when the underlying env terminates/truncates, or when the
      mean of the last 100 per-frame rewards drops to -0.1 or below.
    * Frames that are mostly grass (off track) get an extra -0.05 penalty.

    ``reset`` passes the same ``seed`` to the underlying env every time.
    """

    def __init__(self, seed=0):
        self.env = gym.make('CarRacing-v3')
        self.seed = seed

    def reset(self, visualize=False):
        """Start a new episode and return the initial stacked state.

        Args:
            visualize: when True, every raw RGB frame seen by ``step`` is
                appended to ``self.frames`` (used for video playback).

        Returns:
            ``np.ndarray`` holding 4 copies of the first grayscale frame.
        """
        self.visualize = visualize
        self.frames = []
        self.counter = 0
        self.av_r = self.reward_memory()

        img_rgb, info = self.env.reset(seed=self.seed)
        img_gray = self.rgb2gray(img_rgb)
        self.stack = [img_gray, img_gray, img_gray, img_gray]
        return np.array(self.stack)

    def step(self, action):
        """Repeat ``action`` for up to 6 env steps.

        Returns:
            ``(state, total_reward, done, info)`` where ``state`` is the updated
            4-frame stack and ``total_reward`` is the sum of the (penalised)
            per-frame rewards.
        """
        total_reward = 0
        for _ in range(6):
            img_rgb, reward, terminated, truncated, info = self.env.step(action)
            done = terminated or truncated
            if self.visualize:
                self.frames.append(img_rgb)
            # Off-track area is green, so a large mean GREEN component (channel 1
            # of the RGB frame) means the car has left the track.  The previous
            # code read channel 0 (red), contradicting its own comment.
            if np.mean(img_rgb[:, :, 1]) > 185.0:
                reward -= 0.05
            total_reward += reward
            # Give up on the episode once the recent average reward is too low.
            if self.av_r(reward) <= -0.1:
                done = True
            if done:
                break
        img_gray = self.rgb2gray(img_rgb)
        # Slide the frame window: drop the oldest frame, append the newest.
        self.stack.pop(0)
        self.stack.append(img_gray)
        return np.array(self.stack), total_reward, done, info

    @staticmethod
    def rgb2gray(rgb, norm=True):
        """Convert an RGB image to grayscale with weights (0.299, 0.587, 0.114).

        With ``norm=True`` the result is rescaled as ``gray / 128 - 1``, i.e.
        roughly [-1, 1) for 8-bit input (not [0, 1]).
        """
        gray = np.dot(rgb[..., :], [0.299, 0.587, 0.114])
        if norm:
            gray = gray / 128. - 1.
        return gray

    @staticmethod
    def reward_memory():
        """Return a closure tracking the mean of the last 100 rewards.

        The history starts as all zeros, so early calls average the recorded
        rewards together with the remaining zero slots.
        """
        count = 0
        length = 100
        history = np.zeros(length)

        def memory(reward):
            nonlocal count
            history[count] = reward
            count = (count + 1) % length  # ring buffer index
            return np.mean(history)

        return memory

In [None]:
# 第三回講義資料のものに、early stoppingとbest modelを保存する機能を追加したもの
class Trainer:
    """Runs the collect / update / evaluate loop for an ``Algorithm``.

    Adds early stopping and best-model checkpointing on top of a plain
    training loop.

    Args:
        env: environment exposing ``reset(visualize=False)`` and
            ``step(action) -> (state, reward, done, info)``.
        algo: object exposing ``step``, ``is_update``, ``update``, ``exploit``,
            ``save_best_model`` and ``load_best_model``.
        seed: stored for reference only; the trainer itself draws no random numbers.
        num_steps: total number of data-collection steps.
        eval_interval: number of steps between evaluations.
        num_eval_episodes: episodes averaged per evaluation.
        is_early_stop: stop when the evaluation return has not improved for
            ``early_stop_patience`` consecutive evaluations.
        early_stop_patience: number of non-improving evaluations tolerated.
    """

    def __init__(self, env, algo, seed=0, num_steps=10**6, eval_interval=10**4, num_eval_episodes=3,
                 is_early_stop=True, early_stop_patience=10):

        self.env = env
        self.algo = algo
        self.seed = seed
        self.best_return = -float('inf')
        self.eval_times_from_best = 0
        self.is_early_stop = is_early_stop
        self.early_stop_patience = early_stop_patience

        # Mean evaluation returns, keyed by the step at which they were measured.
        self.returns = {'step': [], 'return': []}

        self.num_steps = num_steps
        self.eval_interval = eval_interval
        self.num_eval_episodes = num_eval_episodes

        # Set by train(); None means training has not started yet.
        self.start_time = None

    def train(self):
        """Alternate data collection, learning and evaluation for ``num_steps`` steps."""

        self.start_time = time()
        # Number of steps taken in the current episode.
        t = 0

        state = self.env.reset()

        for steps in range(1, self.num_steps + 1):
            # The algorithm acts in the env, stores the transition and returns
            # the next state together with the updated episode step counter.
            state, t = self.algo.step(self.env, state, t, steps)

            if self.algo.is_update(steps):
                self.algo.update()

            if steps % self.eval_interval == 0:
                if self.evaluate(steps):
                    break
                # evaluate() played whole episodes on this same env, so the
                # pre-evaluation `state` is stale and the env is finished.
                # Start a fresh training episode instead of stepping a done env.
                state = self.env.reset()
                t = 0

    def evaluate(self, steps):
        """Run ``num_eval_episodes`` greedy episodes and record the mean return.

        Saves the model whenever the mean return improves.

        Returns:
            True when early stopping should trigger, otherwise False.
        """

        returns = []
        for _ in range(self.num_eval_episodes):
            state = self.env.reset()
            done = False
            episode_return = 0.0

            while not done:
                action = self.algo.exploit(state)
                state, reward, done, info = self.env.step(action)
                episode_return += reward

            returns.append(episode_return)

        mean_return = np.mean(returns)
        self.returns['step'].append(steps)
        self.returns['return'].append(mean_return)

        print(f'Num steps: {steps:<6}   '
              f'Return: {mean_return:<5.1f}   '
              f'Time: {self.time}')

        if mean_return > self.best_return:
            self.best_return = mean_return
            self.eval_times_from_best = 0
            self.algo.save_best_model()
            print(f'Best model saved with return: {self.best_return:.1f}')
        else:
            self.eval_times_from_best += 1

        if self.is_early_stop and self.eval_times_from_best >= self.early_stop_patience:
            print(f"Early stopping as no improvement in the last {self.early_stop_patience} evaluations.")
            return True

        return False

    def visualize(self):
        """Play one greedy episode and show it as a video."""
        state = self.env.reset(visualize=True)
        total_reward = 0
        done = False

        # `not done` rather than `done is False`, so any truthy flag ends the loop.
        while not done:
            action = self.algo.exploit(state)
            state, reward, done, info = self.env.step(action)
            total_reward += reward

        print('Reward: ', total_reward)
        display_video(self.env.frames)

    def plot(self):
        """Plot the recorded mean evaluation returns against training steps."""
        plt.figure(figsize=(8, 6))
        plt.plot(self.returns['step'], self.returns['return'])
        plt.xlabel('Steps', fontsize=24)
        plt.ylabel('Return', fontsize=24)
        plt.tick_params(labelsize=18)
        plt.title(f'{self.env.env.unwrapped.spec.id}', fontsize=24)
        plt.tight_layout()

    def load_best_model(self):
        """Load the best saved model and reset the evaluation bookkeeping."""
        self.algo.load_best_model()
        self.best_return = -float('inf')
        self.eval_times_from_best = 0
        self.returns = {'step': [], 'return': []}

    @property
    def time(self):
        """Elapsed time since training started, as ``H:MM:SS`` (zero before ``train``)."""
        if self.start_time is None:
            return str(timedelta(seconds=0))
        return str(timedelta(seconds=int(time() - self.start_time)))

In [None]:
class Algorithm(ABC):
    """Abstract base class for the learning algorithms driven by ``Trainer``.

    Subclasses must provide ``self.device`` and ``self.actor``; the actor is
    called directly for greedy actions and must expose ``sample(states)``
    returning ``(actions, log_pis)``.
    """

    def _to_batch(self, state):
        """Convert one state to a float tensor on ``self.device`` with a leading batch dim."""
        return torch.tensor(state, dtype=torch.float, device=self.device).unsqueeze_(0)

    def explore(self, state):
        """Return a stochastic action and its log-probability density log pi(a|s)."""
        state = self._to_batch(state)
        with torch.no_grad():
            action, log_pi = self.actor.sample(state)
        return action.cpu().numpy()[0], log_pi.item()

    def exploit(self, state):
        """Return the deterministic action for ``state``."""
        state = self._to_batch(state)
        with torch.no_grad():
            action = self.actor(state)
        return action.cpu().numpy()[0]

    @abstractmethod
    def is_update(self, steps):
        """Return whether the algorithm should learn at total step count ``steps``."""
        pass

    @abstractmethod
    def step(self, env, state, t, steps):
        """Advance the environment by one step.

        Receives the environment, the current state, the step count ``t`` of the
        current episode and the total step count ``steps``; stores the
        transition as needed and returns the updated ``(state, t)``.
        """
        pass

    @abstractmethod
    def update(self):
        """Perform one round of learning."""
        pass

    @abstractmethod
    def save_best_model(self):
        """Save the best model parameters."""
        pass

    @abstractmethod
    def load_best_model(self):
        """Load the best model parameters."""
        pass

# PPOの実装

## 必要な関数の定義

In [None]:
def calculate_log_pi_tanh(log_stds, noises, actions):
    """Log-density of tanh-squashed Gaussian actions.

    Args:
        log_stds: log standard deviations; the size of the last dim is the action dim.
        noises: standardised pre-squash noise ``(u - mean) / std``.
        actions: squashed actions ``tanh(u)``.

    Returns:
        Log-densities summed over the last dim (kept as a size-1 dim).
    """
    action_dim = log_stds.size(-1)

    # Gaussian log-density written in terms of the standardised noise; computed
    # by hand instead of going through torch.distributions.Normal.
    per_dim = -0.5 * noises.pow(2) - log_stds
    gaussian_log_probs = per_dim.sum(dim=-1, keepdim=True) - 0.5 * math.log(2 * math.pi) * action_dim

    # Change of variables for a = tanh(u): subtract log(1 - a^2); 1e-6 keeps the log finite.
    squash_correction = torch.log(1 - actions.pow(2) + 1e-6).sum(dim=-1, keepdim=True)

    return gaussian_log_probs - squash_correction

def calculate_log_pi_sigmoid(log_stds, noises, actions):
    """Log-density of sigmoid-squashed Gaussian actions.

    Args:
        log_stds: log standard deviations; the size of the last dim is the action dim.
        noises: standardised pre-squash noise ``(u - mean) / std``.
        actions: squashed actions ``sigmoid(u)``.

    Returns:
        Log-densities summed over the last dim (kept as a size-1 dim).
    """
    action_dim = log_stds.size(-1)

    # Gaussian log-density in terms of the standardised noise.
    per_dim = -0.5 * noises.pow(2) - log_stds
    gaussian_log_probs = per_dim.sum(dim=-1, keepdim=True) - 0.5 * math.log(2 * math.pi) * action_dim

    # Jacobian of the sigmoid: d sigmoid(u) / du = a * (1 - a); 1e-6 keeps the log finite.
    squash_correction = torch.log(actions * (1 - actions) + 1e-6).sum(dim=-1, keepdim=True)

    return gaussian_log_probs - squash_correction

def reparameterize(means, log_stds):
    """Sample squashed actions with the reparameterization trick.

    Column 0 (steering) is squashed with tanh, columns 1-2 (gas, brake) with
    sigmoid.

    Args:
        means: Gaussian means, shape ``(B, 3)``.
        log_stds: log standard deviations, sliceable along dim 1 like ``means``.

    Returns:
        ``(actions, log_pis)`` with ``actions`` of shape ``(B, 3)`` and
        ``log_pis`` the summed log-density of the sampled actions.
    """
    # u = mean + eps * std with eps ~ N(0, I).
    eps = torch.randn_like(means)
    pre_squash = means + eps * log_stds.exp()

    steer = torch.tanh(pre_squash[:, 0:1])       # (B, 1), batch dim preserved
    pedals = torch.sigmoid(pre_squash[:, 1:3])   # (B, 2)

    # The two action groups use different squashing corrections.
    steer_log_pis = calculate_log_pi_tanh(log_stds[:, 0:1], eps[:, 0:1], steer)
    pedal_log_pis = calculate_log_pi_sigmoid(log_stds[:, 1:3], eps[:, 1:3], pedals)

    return torch.cat([steer, pedals], dim=1), steer_log_pis + pedal_log_pis

def atanh(x):
    """Inverse of tanh, with a 1e-6 offset inside each log so x = +/-1 stays finite."""
    log_upper = torch.log(1 + x + 1e-6)
    log_lower = torch.log(1 - x + 1e-6)
    return 0.5 * (log_upper - log_lower)

def logit(x):
    """Inverse of sigmoid; the input is clamped to [1e-6, 1 - 1e-6] first."""
    margin = 1e-6
    clipped = x.clamp(min=margin, max=1 - margin)
    odds = clipped / (1 - clipped)
    return torch.log(odds)

def evaluate_log_pi(means, log_stds, actions):
    """Log-density of given ``actions`` under the squashed Gaussian policy.

    Column 0 is treated as tanh-squashed, columns 1-2 as sigmoid-squashed.

    Args:
        means: Gaussian means, shape ``(B, 3)``.
        log_stds: log standard deviations, sliceable along dim 1 like ``means``.
        actions: squashed actions whose density is evaluated, shape ``(B, 3)``.
    """
    stds = log_stds.exp()
    steer, pedals = actions[:, 0:1], actions[:, 1:3]

    # Undo the squashing to recover the standardised noise of each action;
    # 1e-8 guards against division by a vanishing std.
    steer_noise = (atanh(steer) - means[:, 0:1]) / (stds[:, 0:1] + 1e-8)
    pedal_noise = (logit(pedals) - means[:, 1:3]) / (stds[:, 1:3] + 1e-8)

    steer_log_pis = calculate_log_pi_tanh(log_stds[:, 0:1], steer_noise, steer)
    pedal_log_pis = calculate_log_pi_sigmoid(log_stds[:, 1:3], pedal_noise, pedals)
    return steer_log_pis + pedal_log_pis

## nnの実装

In [None]:
import torch.nn as nn

class CarRacingEncoder(nn.Module):
    """Convolutional encoder mapping a 4-frame stack to a 512-dim feature vector.

    The linear layer expects 64 * 8 * 8 inputs, which corresponds to
    ``(B, 4, 96, 96)`` input frames.
    """

    def __init__(self):
        super().__init__()
        # (in_channels, out_channels, kernel_size, stride); resulting feature
        # maps for 96x96 input: (32, 23, 23) -> (64, 10, 10) -> (64, 8, 8).
        conv_specs = ((4, 32, 8, 4), (32, 64, 4, 2), (64, 64, 3, 1))
        conv_layers = []
        for in_ch, out_ch, kernel, stride in conv_specs:
            conv_layers.append(nn.Conv2d(in_ch, out_ch, kernel_size=kernel, stride=stride))
            conv_layers.append(nn.ReLU())
        self.conv = nn.Sequential(*conv_layers)

        # Flatten 64 * 8 * 8 = 4096 features and project them to 512.
        self.fc = nn.Sequential(
            nn.Flatten(),
            nn.Linear(64 * 8 * 8, 512),
            nn.ReLU()
        )

    def forward(self, x):
        """Return features of shape ``(batch, 512)``."""
        return self.fc(self.conv(x))

In [None]:
class PPOActor(nn.Module):
    """Gaussian policy with a state-independent, learnable log-std.

    The head emits three pre-squash means; column 0 is squashed with tanh and
    columns 1-2 with sigmoid.
    """

    def __init__(self, encoder):
        super().__init__()
        self.encoder = encoder
        self.head = nn.Sequential(
            nn.Linear(512, 64),
            nn.Tanh(),
            nn.Linear(64, 3)
        )
        # One log-std per action dimension, shared across all states.
        self.log_stds = nn.Parameter(torch.zeros(1, 3))

    def _means(self, states):
        """Pre-squash action means for ``states``, shape ``(B, 3)``."""
        return self.head(self.encoder(states))

    def forward(self, states):
        """Deterministic action: the squashed mean, shape ``(B, 3)``."""
        raw = self._means(states)
        steer = torch.tanh(raw[:, 0:1])        # (B, 1), batch dim preserved
        pedals = torch.sigmoid(raw[:, 1:3])    # (B, 2)
        return torch.cat([steer, pedals], dim=1)

    def sample(self, states):
        """Stochastic action and its log-density."""
        return reparameterize(self._means(states), self.log_stds)

    def evaluate_log_pi(self, states, actions):
        """Log-density of ``actions`` under the current policy."""
        return evaluate_log_pi(self._means(states), self.log_stds, actions)

In [None]:
class PPOCritic(nn.Module):
    """State-value network: encoder features followed by a small MLP head."""

    def __init__(self, encoder):
        super().__init__()
        self.encoder = encoder
        self.net = nn.Sequential(
            nn.Linear(512, 64),
            nn.Tanh(),
            nn.Linear(64, 1),
        )

    def forward(self, states):
        """Return state values of shape ``(B, 1)``."""
        features = self.encoder(states)
        return self.net(features)

## GAE

In [None]:
def calculate_advantage(values, rewards, dones, next_values, gamma=0.995, lambd=0.997):
    """Compute value targets and standardised advantages with GAE.

    All tensor arguments are indexed by time along dim 0.

    Returns:
        ``(targets, advantages)``: ``targets`` are the lambda-returns
        (unnormalised advantage + value); ``advantages`` are standardised to
        zero mean and unit standard deviation.
    """
    # One-step TD errors; bootstrapping is switched off on terminal steps.
    td_errors = rewards + gamma * next_values * (1 - dones) - values

    horizon = rewards.size(0)
    gae = torch.empty_like(rewards)

    # The last step has no successor inside the rollout.
    gae[-1] = td_errors[-1]

    # Backward recursion; (1 - done) stops accumulation across episode ends.
    discount = gamma * lambd
    for t in range(horizon - 2, -1, -1):
        gae[t] = td_errors[t] + discount * (1 - dones[t]) * gae[t + 1]

    # Targets must use the raw advantages, so compute them before standardising.
    targets = gae + values

    standardised = (gae - gae.mean()) / (gae.std() + 1e-8)

    return targets, standardised

## 学習アルゴリズム

In [None]:
class RolloutBuffer:
    """Fixed-size ring buffer holding one rollout of transitions as tensors.

    Args:
        buffer_size: number of transitions stored.
        state_shape: shape of a single state.
        action_shape: shape of a single action.
        device: device on which all storage tensors are allocated.
    """

    def __init__(self, buffer_size, state_shape, action_shape, device=torch.device('cuda')):
        def allocate(*item_shape):
            return torch.empty((buffer_size, *item_shape), dtype=torch.float, device=device)

        self.states = allocate(*state_shape)
        self.actions = allocate(*action_shape)
        self.rewards = allocate(1)
        self.dones = allocate(1)
        self.log_pis = allocate(1)
        self.next_states = allocate(*state_shape)

        # Index at which the next transition is written.
        self._p = 0
        # Capacity of the buffer.
        self.buffer_size = buffer_size

    def _write_array(self, storage, array):
        """Copy a numpy array into the current slot of ``storage`` as float32."""
        target_device = self.states.device
        storage[self._p].copy_(torch.from_numpy(array).float().to(target_device))

    def append(self, state, action, reward, done, log_pi, next_state):
        """Store one transition; ``state``, ``action`` and ``next_state`` are numpy arrays."""
        self._write_array(self.states, state)
        self._write_array(self.actions, action)
        self.rewards[self._p] = float(reward)
        self.dones[self._p] = float(done)
        self.log_pis[self._p] = float(log_pi)
        self._write_array(self.next_states, next_state)
        # Wrap around once the buffer is full.
        self._p = (self._p + 1) % self.buffer_size

    def get(self):
        """Return all stored tensors; the write index must be back at 0."""
        assert self._p == 0, 'Buffer needs to be full before training.'
        return self.states, self.actions, self.rewards, self.dones, self.log_pis, self.next_states

In [None]:
# num_updatesを15としているが、10などもっと小さくした方が、clippingがききやすく安定するはず。

class PPO(Algorithm):
    """Proximal Policy Optimization with a clipped surrogate objective.

    Collects ``rollout_length`` transitions, then runs ``num_updates`` epochs of
    shuffled minibatch updates on the critic and the actor. Actor and critic
    each get their own encoder instance.
    """

    def __init__(self, state_shape, action_shape, device=torch.device('cuda'), seed=0,
                 batch_size=256, gamma=0.995, lr_actor=3e-4, lr_critic=3e-4,
                 rollout_length=2048, num_updates=15, clip_eps=0.2, lambd=0.97,
                 coef_ent=0.0, max_grad_norm=0.5, actor_class=PPOActor, critic_class=PPOCritic,
                 encoder_class=CarRacingEncoder, best_model_path='ppo_best_model.pth'):
        super().__init__()

        # Seed every random number generator used during training.
        for seed_fn in (np.random.seed, torch.manual_seed, torch.cuda.manual_seed):
            seed_fn(seed)

        # Hyper-parameters and bookkeeping.
        self.learning_steps = 0
        self.device = device
        self.batch_size = batch_size
        self.gamma = gamma
        self.rollout_length = rollout_length
        self.num_updates = num_updates
        self.clip_eps = clip_eps
        self.lambd = lambd
        self.coef_ent = coef_ent
        self.max_grad_norm = max_grad_norm
        self.best_model_path = best_model_path

        # Storage for exactly one rollout.
        self.buffer = RolloutBuffer(
            buffer_size=rollout_length,
            state_shape=state_shape,
            action_shape=action_shape,
            device=device
        )

        # Both encoders are created before the actor and critic heads so the
        # parameter initialisation order stays fixed for a given seed.
        actor_encoder = encoder_class().to(device)
        critic_encoder = encoder_class().to(device)
        self.actor = actor_class(actor_encoder).to(device)
        self.critic = critic_class(critic_encoder).to(device)

        self.optim_actor = torch.optim.Adam(self.actor.parameters(), lr=lr_actor)
        self.optim_critic = torch.optim.Adam(self.critic.parameters(), lr=lr_critic)

    def is_update(self, steps):
        """Learn whenever one full rollout has been collected."""
        return steps % self.rollout_length == 0

    def step(self, env, state, t, steps):
        """Act once with the stochastic policy and store the transition.

        Returns:
            ``(next_state, t)``; after a finished episode the environment is
            reset and ``t`` restarts at 0.
        """
        t += 1

        action, log_pi = self.explore(state)
        next_state, reward, done, info = env.step(action)

        self.buffer.append(state, action, reward, done, log_pi, next_state)

        if not done:
            return next_state, t

        # Episode finished: start a new one.
        return env.reset(), 0

    def update(self):
        """Run ``num_updates`` epochs of minibatch updates on the stored rollout."""
        self.learning_steps += 1

        states, actions, rewards, dones, log_pis, next_states = self.buffer.get()

        # Value estimates are fixed targets here, so no gradients are needed.
        with torch.no_grad():
            values = self.critic(states)
            next_values = self.critic(next_states)
        targets, advantages = calculate_advantage(values, rewards, dones, next_values, self.gamma, self.lambd)

        for _ in range(self.num_updates):
            # Fresh random ordering of the rollout for every epoch.
            order = np.arange(self.rollout_length)
            np.random.shuffle(order)

            for begin in range(0, self.rollout_length, self.batch_size):
                batch = order[begin:begin + self.batch_size]
                self.update_critic(states[batch], targets[batch])
                self.update_actor(states[batch], actions[batch], log_pis[batch], advantages[batch])

    def _apply_gradients(self, loss, optimizer, network):
        """Backpropagate ``loss``, clip the gradient norm of ``network`` and step ``optimizer``."""
        optimizer.zero_grad()
        loss.backward()
        # Gradient-norm clipping is a heuristic that stabilises learning.
        nn.utils.clip_grad_norm_(network.parameters(), self.max_grad_norm)
        optimizer.step()

    def update_critic(self, states, targets):
        """One gradient step on the mean squared error between values and targets."""
        value_error = self.critic(states) - targets
        loss_critic = value_error.pow(2).mean()
        self._apply_gradients(loss_critic, self.optim_critic, self.critic)

    def update_actor(self, states, actions, log_pis_old, advantages):
        """One gradient step on the clipped PPO surrogate with an entropy bonus."""
        log_pis = self.actor.evaluate_log_pi(states, actions)
        # Sample-based entropy estimate: the negative mean log-density.
        mean_entropy = -log_pis.mean()

        # Probability ratio between the current policy and the data-collecting policy.
        ratios = torch.exp(log_pis - log_pis_old)
        clipped_ratios = torch.clamp(ratios, 1.0 - self.clip_eps, 1.0 + self.clip_eps)

        # The max of the negated surrogates is the pessimistic (clipped) objective.
        unclipped_loss = -ratios * advantages
        clipped_loss = -clipped_ratios * advantages
        loss_actor = torch.max(unclipped_loss, clipped_loss).mean() - self.coef_ent * mean_entropy

        self._apply_gradients(loss_actor, self.optim_actor, self.actor)

    def save_best_model(self):
        """Write network and optimizer state to ``best_model_path``."""
        checkpoint = {
            'actor_state_dict': self.actor.state_dict(),
            'critic_state_dict': self.critic.state_dict(),
            'optim_actor_state_dict': self.optim_actor.state_dict(),
            'optim_critic_state_dict': self.optim_critic.state_dict(),
        }
        torch.save(checkpoint, self.best_model_path)

    def load_best_model(self):
        """Restore network and optimizer state from ``best_model_path``."""
        checkpoint = torch.load(self.best_model_path, map_location=self.device)
        restore_plan = (
            (self.actor, 'actor_state_dict'),
            (self.critic, 'critic_state_dict'),
            (self.optim_actor, 'optim_actor_state_dict'),
            (self.optim_critic, 'optim_critic_state_dict'),
        )
        for target, key in restore_plan:
            target.load_state_dict(checkpoint[key])

## 学習の実行

In [None]:
# Experiment configuration: baseline PPO with the default actor (PPOActor).
# NOTE(review): ENV_ID is not referenced by any visible cell; WrappedEnv
# hard-codes the same environment id.
ENV_ID = 'CarRacing-v3'
SEED = 0
# Number of transitions collected between PPO updates.
ROLLOUT_LENGTH = 2048
# Total training steps (500 rollouts) and evaluation period (every 5 rollouts).
NUM_STEPS = 500 * ROLLOUT_LENGTH
EVAL_INTERVAL = 5 * ROLLOUT_LENGTH

env = WrappedEnv()
# Reset once to obtain an example state; only its shape is used below.
state_example = env.reset()

algo = PPO(
    state_shape=state_example.shape,
    action_shape=env.env.action_space.shape,
    seed=SEED,
    device=torch.device("cpu"),
    rollout_length=ROLLOUT_LENGTH,
)

# Early stopping is disabled, so training runs for the full NUM_STEPS.
trainer = Trainer(
    env=env,
    algo=algo,
    seed=SEED,
    num_steps=NUM_STEPS,
    eval_interval=EVAL_INTERVAL,
    is_early_stop=False
)

In [None]:
# When training on CPU, set the number of threads PyTorch uses.
# The original run used a Ryzen 9 7950X (16 cores / 32 threads), hence the
# upper bound of 32; on smaller machines the count is capped at the number of
# logical CPUs so the thread pool is not oversubscribed.
NUM_THREADS = min(32, os.cpu_count() or 1)
torch.set_num_threads(NUM_THREADS)

In [None]:
# Train, plot the mean evaluation returns, then replay one episode with the
# policy as it stands at the end of training.
trainer.train()
trainer.plot()
trainer.visualize()

In [None]:
# Restore the checkpoint saved at the best evaluation return and replay one
# episode with it. Note that load_best_model() also clears the recorded returns.
trainer.load_best_model()
trainer.visualize()

# actorの構成を変更し、性能を比較する

## ガウス分布からベータ分布に変更

In [None]:
from torch.distributions import Beta

class PPOActorBeta(nn.Module):
    """Policy that models every action dimension with a Beta distribution.

    The head emits 6 positive values (Softplus); adding 1 gives the
    concentrations ``alpha`` (columns 0-2) and ``beta`` (columns 3-5), so both
    are always greater than 1. Samples live in [0, 1]; column 0 is mapped
    affinely to [-1, 1] while columns 1-2 stay in [0, 1].

    The returned log-densities are those of the unit-interval sample; the
    constant Jacobian of the affine map is omitted in both ``sample`` and
    ``evaluate_log_pi``.
    """

    # Margin keeping unit-interval actions strictly inside (0, 1) when their
    # log-density is evaluated.
    _EPS = 1e-6

    def __init__(self, encoder):
        super().__init__()
        self.encoder = encoder
        self.head = nn.Sequential(
            nn.Linear(512, 64),
            nn.Tanh(),
            nn.Linear(64, 6),
            nn.Softplus()
        )

    def _concentrations(self, states):
        """Return the Beta concentrations ``(alpha, beta)``, each of shape ``(B, 3)``."""
        params = self.head(self.encoder(states))
        return params[:, 0:3] + 1.0, params[:, 3:6] + 1.0

    @staticmethod
    def _affine(reference):
        """Scale and shift mapping unit-interval actions to env ranges, on ``reference``'s device."""
        scale = torch.tensor([2., 1., 1.], device=reference.device)
        shift = torch.tensor([-1., 0., 0.], device=reference.device)
        return scale, shift

    def forward(self, states):
        """Deterministic action: the Beta mean mapped to the env action range."""
        alpha, beta = self._concentrations(states)
        unit_mean = alpha / (alpha + beta)  # expectation of the Beta distribution
        scale, shift = self._affine(unit_mean)
        return unit_mean * scale + shift

    def sample(self, states):
        """Stochastic action (env range) and the log-density of its unit-interval sample."""
        alpha, beta = self._concentrations(states)
        dist = Beta(alpha, beta)
        unit_action = dist.sample()
        log_pis = dist.log_prob(unit_action).sum(dim=-1, keepdim=True)
        scale, shift = self._affine(unit_action)
        return unit_action * scale + shift, log_pis

    def evaluate_log_pi(self, states, actions):
        """Log-density of env-range ``actions`` under the current policy."""
        alpha, beta = self._concentrations(states)
        scale, shift = self._affine(actions)
        unit_actions = (actions - shift) / scale
        # The affine round trip can land exactly on 0 or 1 in float32, where
        # the Beta log-density is -inf for concentrations > 1; clamp so the
        # log-density, and hence the PPO ratio, stays finite.
        unit_actions = unit_actions.clamp(self._EPS, 1.0 - self._EPS)
        return Beta(alpha, beta).log_prob(unit_actions).sum(dim=-1, keepdim=True)

In [None]:
# Experiment configuration: PPO with the all-Beta actor (PPOActorBeta).
# NOTE(review): ENV_ID is not referenced by any visible cell; WrappedEnv
# hard-codes the same environment id.
ENV_ID = 'CarRacing-v3'
SEED = 0
# Number of transitions collected between PPO updates.
ROLLOUT_LENGTH = 2048
# Total training steps (500 rollouts) and evaluation period (every 5 rollouts).
NUM_STEPS = 500 * ROLLOUT_LENGTH
EVAL_INTERVAL = 5 * ROLLOUT_LENGTH

env = WrappedEnv()
# Reset once to obtain an example state; only its shape is used below.
state_example = env.reset()

# A separate checkpoint path keeps this run from overwriting other experiments.
algo = PPO(
    state_shape=state_example.shape,
    action_shape=env.env.action_space.shape,
    seed=SEED,
    device=torch.device("cpu"),
    rollout_length=ROLLOUT_LENGTH,
    actor_class=PPOActorBeta,
    best_model_path='ppo_beta_best_model.pth'
)

# Early stopping is disabled, so training runs for the full NUM_STEPS.
trainer = Trainer(
    env=env,
    algo=algo,
    seed=SEED,
    num_steps=NUM_STEPS,
    eval_interval=EVAL_INTERVAL,
    is_early_stop=False
)

In [None]:
# Train the Beta-policy agent, plot its mean evaluation returns, then replay
# one episode with the policy as it stands at the end of training.
trainer.train()
trainer.plot()
trainer.visualize()

In [None]:
# Restore the best Beta-policy checkpoint and replay one episode with it.
trainer.load_best_model()
trainer.visualize()

## ガウス分布の標準偏差もnnの出力に変更

In [None]:
class PPOActorGaussian2(nn.Module):
    """Gaussian policy whose log-std is also produced by the network.

    The head emits 6 values: columns 0-2 are the pre-squash means and columns
    3-5 parameterise the log-stds via ``-softplus``, which keeps every log-std
    at or below 0.
    """

    def __init__(self, encoder):
        super().__init__()
        self.encoder = encoder
        self.head = nn.Sequential(
            nn.Linear(512, 64),
            nn.Tanh(),
            nn.Linear(64, 6)
        )

    def _head_output(self, states):
        """Raw 6-dim head output for ``states``."""
        return self.head(self.encoder(states))

    def _gaussian_params(self, states):
        """Return ``(means, log_stds)``, each of shape ``(B, 3)``."""
        raw = self._head_output(states)
        # -softplus restricts the log-std range to non-positive values.
        return raw[:, 0:3], -F.softplus(raw[:, 3:6])

    def forward(self, states):
        """Deterministic action: tanh on column 0, sigmoid on columns 1-2 of the means."""
        raw = self._head_output(states)
        steer = torch.tanh(raw[:, 0:1])
        pedals = torch.sigmoid(raw[:, 1:3])
        return torch.cat([steer, pedals], dim=1)

    def sample(self, states):
        """Stochastic action and its log-density."""
        means, log_stds = self._gaussian_params(states)
        return reparameterize(means, log_stds)

    def evaluate_log_pi(self, states, old_actions):
        """Log-density of ``old_actions`` under the current policy."""
        means, log_stds = self._gaussian_params(states)
        return evaluate_log_pi(means, log_stds, old_actions)
    

In [None]:
# Experiment configuration: PPO with the Gaussian actor whose standard
# deviation is a network output (PPOActorGaussian2).
# NOTE(review): ENV_ID is not referenced by any visible cell; WrappedEnv
# hard-codes the same environment id.
ENV_ID = 'CarRacing-v3'
SEED = 0
# Number of transitions collected between PPO updates.
ROLLOUT_LENGTH = 2048
# Total training steps (500 rollouts) and evaluation period (every 5 rollouts).
NUM_STEPS = 500 * ROLLOUT_LENGTH
EVAL_INTERVAL = 5 * ROLLOUT_LENGTH

env = WrappedEnv()
# Reset once to obtain an example state; only its shape is used below.
state_example = env.reset()

# A separate checkpoint path keeps this run from overwriting other experiments.
algo = PPO(
    state_shape=state_example.shape,
    action_shape=env.env.action_space.shape,
    seed=SEED,
    device=torch.device("cpu"),
    rollout_length=ROLLOUT_LENGTH,
    actor_class=PPOActorGaussian2,
    best_model_path='ppo_gaussian2_best_model.pth'
)

# Early stopping is disabled, so training runs for the full NUM_STEPS.
trainer = Trainer(
    env=env,
    algo=algo,
    seed=SEED,
    num_steps=NUM_STEPS,
    eval_interval=EVAL_INTERVAL,
    is_early_stop=False
)

In [None]:
# Train the state-dependent-std Gaussian agent, plot its mean evaluation
# returns, then replay one episode with the policy at the end of training.
trainer.train()
trainer.plot()
trainer.visualize()

In [None]:
# Restore the best checkpoint of this run and replay one episode with it.
trainer.load_best_model()
trainer.visualize()

## ハンドル操作をガウス分布（標準偏差はnnと独立した学習可能パラメータ）、アクセルとブレーキをベータ分布に変更

In [None]:
def reparameterize_for_tanh(means, log_stds):
    """Sample tanh-squashed Gaussian actions with the reparameterization trick.

    Returns:
        ``(actions, log_pis)`` where ``actions = tanh(means + eps * std)`` with
        ``eps ~ N(0, I)`` and ``log_pis`` is the log-density of those actions.
    """
    eps = torch.randn_like(means)
    pre_squash = means + eps * log_stds.exp()
    squashed = torch.tanh(pre_squash)
    return squashed, calculate_log_pi_tanh(log_stds, eps, squashed)

In [None]:
class PPOActorGausPlusBeta(nn.Module):
    """Mixed policy: tanh-Gaussian steering, Beta-distributed gas and brake.

    The head emits 5 values: column 0 is the steering mean, columns 1-2 and 3-4
    become the Beta concentrations ``alpha`` and ``beta`` via ``softplus + 1``.
    The steering log-std is a learnable parameter independent of the state.
    """

    def __init__(self, encoder):
        super().__init__()
        self.encoder = encoder
        self.head = nn.Sequential(
            nn.Linear(512, 64),
            nn.Tanh(),
            nn.Linear(64, 5)
        )
        self.log_stds = nn.Parameter(torch.zeros(1, 1))

    def _policy_params(self, states):
        """Return ``(mean, alpha, beta)`` with shapes ``(B, 1)``, ``(B, 2)``, ``(B, 2)``."""
        raw = self.head(self.encoder(states))
        steer_mean = raw[:, 0:1]
        alpha = F.softplus(raw[:, 1:3]) + 1.0
        beta = F.softplus(raw[:, 3:5]) + 1.0
        return steer_mean, alpha, beta

    def forward(self, states):
        """Deterministic action: tanh of the steering mean plus the Beta means."""
        steer_mean, alpha, beta = self._policy_params(states)
        pedal_mean = alpha / (alpha + beta)  # expectation of the Beta distribution
        return torch.cat([torch.tanh(steer_mean), pedal_mean], dim=1)

    def sample(self, states):
        """Stochastic action and its summed log-density."""
        steer_mean, alpha, beta = self._policy_params(states)

        # The Beta sample is drawn before the Gaussian noise.
        pedal_dist = Beta(alpha, beta)
        pedals = pedal_dist.sample()
        pedal_log_pis = pedal_dist.log_prob(pedals).sum(dim=-1, keepdim=True)

        steer, steer_log_pis = reparameterize_for_tanh(steer_mean, self.log_stds)

        return torch.cat([steer, pedals], dim=1), pedal_log_pis + steer_log_pis

    def evaluate_log_pi(self, states, old_actions):
        """Log-density of ``old_actions`` under the current policy."""
        steer_mean, alpha, beta = self._policy_params(states)
        old_steer, old_pedals = old_actions[:, 0:1], old_actions[:, 1:3]

        pedal_log_pis = Beta(alpha, beta).log_prob(old_pedals).sum(dim=-1, keepdim=True)

        # Recover the standardised noise that would have produced the old steering action.
        stds = self.log_stds.exp()
        noises = (atanh(old_steer) - steer_mean) / (stds + 1e-8)
        steer_log_pis = calculate_log_pi_tanh(self.log_stds, noises, old_steer)

        return pedal_log_pis + steer_log_pis

In [None]:
# Experiment configuration: PPO with Gaussian steering (learnable,
# state-independent std) and Beta gas/brake (PPOActorGausPlusBeta).
# NOTE(review): ENV_ID is not referenced by any visible cell; WrappedEnv
# hard-codes the same environment id.
ENV_ID = 'CarRacing-v3'
SEED = 0
# Number of transitions collected between PPO updates.
ROLLOUT_LENGTH = 2048
# Total training steps (500 rollouts) and evaluation period (every 5 rollouts).
NUM_STEPS = 500 * ROLLOUT_LENGTH
EVAL_INTERVAL = 5 * ROLLOUT_LENGTH

env = WrappedEnv()
# Reset once to obtain an example state; only its shape is used below.
state_example = env.reset()

# A separate checkpoint path keeps this run from overwriting other experiments.
algo = PPO(
    state_shape=state_example.shape,
    action_shape=env.env.action_space.shape,
    seed=SEED,
    device=torch.device("cpu"),
    rollout_length=ROLLOUT_LENGTH,
    actor_class=PPOActorGausPlusBeta,
    best_model_path='ppo_gaus_plus_beta_best_model.pth'
)

# Early stopping is disabled, so training runs for the full NUM_STEPS.
trainer = Trainer(
    env=env,
    algo=algo,
    seed=SEED,
    num_steps=NUM_STEPS,
    eval_interval=EVAL_INTERVAL,
    is_early_stop=False
)

In [None]:
# Train the Gaussian-plus-Beta agent, plot its mean evaluation returns, then
# replay one episode with the policy at the end of training.
trainer.train()
trainer.plot()
trainer.visualize()

In [None]:
# Restore the best checkpoint of this run and replay one episode with it.
trainer.load_best_model()
trainer.visualize()

## ハンドル操作をガウス分布（標準偏差もnnの出力を利用）、アクセルとブレーキをベータ分布に変更

In [None]:
class PPOActorGausPlusBeta2(nn.Module):
    """Mixed policy with a state-dependent steering std.

    The head emits 6 values: column 0 is the steering mean, columns 1-2 and 3-4
    become the Beta concentrations ``alpha`` and ``beta`` via ``softplus + 1``,
    and column 5 gives the steering log-std via ``-softplus`` (always <= 0).
    """

    def __init__(self, encoder):
        super().__init__()
        self.encoder = encoder
        self.head = nn.Sequential(
            nn.Linear(512, 64),
            nn.Tanh(),
            nn.Linear(64, 6)
        )

    def _head_output(self, states):
        """Raw 6-dim head output for ``states``."""
        return self.head(self.encoder(states))

    @staticmethod
    def _split(raw):
        """Split the head output into ``(mean, alpha, beta, log_stds)``."""
        steer_mean = raw[:, 0:1]
        alpha = F.softplus(raw[:, 1:3]) + 1.0
        beta = F.softplus(raw[:, 3:5]) + 1.0
        # -softplus restricts the log-std range to non-positive values.
        log_stds = -F.softplus(raw[:, 5:6])
        return steer_mean, alpha, beta, log_stds

    def forward(self, states):
        """Deterministic action: tanh of the steering mean plus the Beta means."""
        raw = self._head_output(states)
        steer_mean = raw[:, 0:1]
        alpha = F.softplus(raw[:, 1:3]) + 1.0
        beta = F.softplus(raw[:, 3:5]) + 1.0
        pedal_mean = alpha / (alpha + beta)  # expectation of the Beta distribution
        return torch.cat([torch.tanh(steer_mean), pedal_mean], dim=1)

    def sample(self, states):
        """Stochastic action and its summed log-density."""
        steer_mean, alpha, beta, log_stds = self._split(self._head_output(states))

        # The Beta sample is drawn before the Gaussian noise.
        pedal_dist = Beta(alpha, beta)
        pedals = pedal_dist.sample()
        pedal_log_pis = pedal_dist.log_prob(pedals).sum(dim=-1, keepdim=True)

        steer, steer_log_pis = reparameterize_for_tanh(steer_mean, log_stds)

        return torch.cat([steer, pedals], dim=1), pedal_log_pis + steer_log_pis

    def evaluate_log_pi(self, states, old_actions):
        """Log-density of ``old_actions`` under the current policy."""
        steer_mean, alpha, beta, log_stds = self._split(self._head_output(states))
        old_steer, old_pedals = old_actions[:, 0:1], old_actions[:, 1:3]

        pedal_log_pis = Beta(alpha, beta).log_prob(old_pedals).sum(dim=-1, keepdim=True)

        # Recover the standardised noise that would have produced the old steering action.
        stds = log_stds.exp()
        noises = (atanh(old_steer) - steer_mean) / (stds + 1e-8)
        steer_log_pis = calculate_log_pi_tanh(log_stds, noises, old_steer)

        return pedal_log_pis + steer_log_pis

In [None]:
# Experiment configuration: PPO with Gaussian steering (std from the network)
# and Beta gas/brake (PPOActorGausPlusBeta2).
# NOTE(review): ENV_ID is not referenced by any visible cell; WrappedEnv
# hard-codes the same environment id.
ENV_ID = 'CarRacing-v3'
SEED = 0
# Number of transitions collected between PPO updates.
ROLLOUT_LENGTH = 2048
# Total training steps (500 rollouts) and evaluation period (every 5 rollouts).
NUM_STEPS = 500 * ROLLOUT_LENGTH
EVAL_INTERVAL = 5 * ROLLOUT_LENGTH

env = WrappedEnv()
# Reset once to obtain an example state; only its shape is used below.
state_example = env.reset()

# A separate checkpoint path keeps this run from overwriting other experiments.
algo = PPO(
    state_shape=state_example.shape,
    action_shape=env.env.action_space.shape,
    seed=SEED,
    device=torch.device("cpu"),
    rollout_length=ROLLOUT_LENGTH,
    actor_class=PPOActorGausPlusBeta2,
    best_model_path='ppo_gaus_plus_beta2_best_model.pth'
)

# Early stopping is disabled, so training runs for the full NUM_STEPS.
trainer = Trainer(
    env=env,
    algo=algo,
    seed=SEED,
    num_steps=NUM_STEPS,
    eval_interval=EVAL_INTERVAL,
    is_early_stop=False
)

In [None]:
# Train the second Gaussian-plus-Beta agent, plot its mean evaluation returns,
# then replay one episode with the policy at the end of training.
trainer.train()
trainer.plot()
trainer.visualize()

In [None]:
# Restore the best checkpoint of this run and replay one episode with it.
trainer.load_best_model()
trainer.visualize()

# 参考文献
pytorch_car_caring:
https://github.com/xtma/pytorch_car_caring