In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.backends.cudnn as cudnn

import torchvision
import torchvision.transforms as transforms

import os
import argparse

from models.s4.s4 import S4Block as S4  # Can use full version instead of minimal S4D standalone below
from models.s4.s4d import S4D
from tqdm.auto import tqdm
import copy

from typing import Any, List, Tuple
import matplotlib.pyplot as plt
import numpy as np
import gymnasium as gym
import pybullet_envs  # PyBulletの環境をgymに登録する
from torch.utils.tensorboard import SummaryWriter

# Dropout broke in PyTorch 1.11
if tuple(map(int, torch.__version__.split('.')[:2])) == (1, 11):
    print("WARNING: Dropout is bugged in PyTorch 1.11. Results may be worse.")
    dropout_fn = nn.Dropout
if tuple(map(int, torch.__version__.split('.')[:2])) >= (1, 12):
    dropout_fn = nn.Dropout1d
else:
    dropout_fn = nn.Dropout2d

CUDA extension for structured kernels (Cauchy and Vandermonde multiplication) not found. Install by going to extensions/kernels/ and running `python setup.py install`, for improved speed and memory efficiency. Note that the kernel changed for state-spaces 4.0 and must be recompiled.
Falling back on slow Cauchy and Vandermonde kernel. Install at least one of pykeops or the CUDA extension for better speed and memory efficiency.




  from .autonotebook import tqdm as notebook_tqdm


In [None]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
best_acc = 0  # best test accuracy
start_epoch = 0  # start from epoch 0 or last checkpoint epoch

### ハイパラの設定

In [None]:
import json
import datetime

#TODO: このハイパラたちは後で書き換える（学習コード作ってから最後に書いたほうがいい）
lr = 0.001
weight_decay = 0.01
num_workers = 4
batch_size = 8
d_model = 512
d_mlp = 512
prenorm = True
dropout = 0.2
grad_clip = 1000

hyperparameters = {
    "lr": lr,
    "weight_decay": weight_decay,
    "num_workers":  num_workers,
    "batch_size": batch_size,
    "d_model": d_model,
    "d_mlp": d_mlp,
    "prenorm": prenorm,
    "dropout": dropout,
    "grad_clip": grad_clip,
}

# ハイパラの種類が今後増える可能性を踏まえ、ファイル名にversionを記載する(hyparaVxxとなるように)
current_time = datetime.datetime.now()
current_time_str = current_time.strftime("%Y%m%d_%H%M")
with open(f'hyparams/hyparaV1_{current_time_str}.json', 'w') as f:
    json.dump(hyperparameters, f, indent=4)

### 環境のWrapper（カメラに関する）

In [None]:
class GymWrapper_PyBullet(object):
    """
    PyBullet環境のためのラッパー
    """

    metadata = {"render.modes": ["human", "rgb_array"]}
    reward_range = (-np.inf, np.inf)

    # __init__でカメラ位置に関するパラメータ（ cam_dist:カメラ距離，cam_yaw：カメラの水平面での回転，cam_pitch:カメラの縦方向での回転）を受け取り，カメラの位置を調整できるようにします.
    # 　同時に画像の大きさも変更できるようにします
    def __init__(
        self,
        env: gym.Env,
        cam_dist: int = 3,
        cam_yaw: int = 0,
        cam_pitch: int = -30,
        render_width: int = 320,
        render_height: int = 240,
    ) -> None:
        """
        コンストラクタ．

        Parameters
        ----------
        env : gym.Env
            gymで提供されている環境のインスタンス．
        cam_dist : int
            カメラの距離．
        cam_yaw : int
            カメラの水平面での回転．
        cam_pitch : int
            カメラの縦方向での回転．
        render_width : int
            観測画像の幅．
        render_height : int
            観測画像の高さ．
        """
        self._env = env

        self._render_width = render_width
        self._render_height = render_height
        self._set_nested_attr(self._env, cam_dist, "_cam_dist")
        self._set_nested_attr(self._env, cam_yaw, "_cam_yaw")
        self._set_nested_attr(self._env, cam_pitch, "_cam_pitch")
        self._set_nested_attr(self._env, render_width, "_render_width")
        self._set_nested_attr(self._env, render_height, "_render_height")

    def _set_nested_attr(self, env: gym.Env, value: int, attr: str) -> None:
        """
        多重継承の属性に再帰的にアクセスして値を変更する．
        カメラの設定に利用．

        Parameters
        ----------
        value : int
            設定したい値．
        attr : str
            変更したい属性の名前．
        """
        if hasattr(env, attr):
            setattr(env, attr, value)
        else:
            self._set_nested_attr(env.env, value, attr)

    def __getattr(self, name: str) -> Any:
        """
        環境が保持している属性値を取得するメソッド．

        Parameters
        ----------
        name : str
            取得したい属性値の名前．

        Returns
        -------
        _env.name : Any
            環境が保持している属性値．
        """
        return getattr(self._env, name)

    @property
    def observation_space(self) -> gym.spaces.Box:
        """
        観測空間に関する情報を取得するメソッド．

        Returns
        -------
        space : gym.spaces.Box
            観測空間に関する情報（各画素値の最小値，各画素値の最大値，観測データの形状， データの型）．
        """
        width = self._render_width
        height = self._render_height
        return gym.spaces.Box(0, 255, (height, width, 3), dtype=np.uint8)

    @property
    def action_space(self) -> gym.spaces.Box:
        """
        行動空間に関する情報を取得するメソッド．

        Returns
        -------
        space : gym.spaces.Box
            行動空間に関する情報（各行動の最小値，各行動の最大値，行動空間の次元， データの型） ．
        """
        return self._env.action_space

    # 　元の観測（低次元の状態）は今回は捨てて，env.render()で取得した画像を観測とします.
    #  画像，報酬，終了シグナルが得られます.
    def step(self, action: np.ndarray) -> (np.ndarray, float, bool, dict):
        """
        環境に行動を与え次の観測，報酬，終了フラグを取得するメソッド．

        Parameters
        ----------
        action : np.dnarray (action_dim, )
            与える行動．

        Returns
        -------
        obs : np.ndarray (height, width, 3)
            行動を与えたときの次の観測．
        reward : float
            行動を与えたときに得られる報酬．
        done : bool
            エピソードが終了したかどうか表すフラグ．
        info : dict
            その他の環境に関する情報．
        """
        _, reward, done, info = self._env.step(action)
        obs = self._env.render(mode="rgb_array") # 今回状態として画像を扱いたいため
        return obs, reward, done, info

    def reset(self) -> np.ndarray:
        """
        環境をリセットするためのメソッド．

        Returns
        -------
        obs : np.ndarray (height, width, 3)
            環境をリセットしたときの初期の観測．
        """
        self._env.reset()
        obs = self._env.render(mode="rgb_array")
        return obs

    def render(self, mode="human", **kwargs) -> np.ndarray:
        """
        観測をレンダリングするためのメソッド．

        Parameters
        ----------
        mode : str
            レンダリング方法に関するオプション． (default='human')

        Returns
        -------
        obs : np.ndarray (height, width, 3)
            観測をレンダリングした結果．
        """
        return self._env.render(mode, **kwargs)

    def close(self) -> None:
        """
        環境を閉じるためのメソッド．
        """
        self._env.close()
        

#### カメラに関するWrapperのテスト

In [None]:
env = gym.make("HalfCheetahBulletEnv-v0")
# カメラのパラメータを与えてカメラの位置と角度，画像の大きさを調整
env = GymWrapper_PyBullet(
    env, cam_dist=2, cam_pitch=0, render_width=64, render_height=64
)

env.reset()
image = env.render(mode="rgb_array")
plt.imshow(image)
plt.show()
env.close()
del env

### 環境のWrapper（行動の連続入力に関する）

In [None]:
class RepeatAction(gym.Wrapper):
    """
    同じ行動を指定された回数自動的に繰り返すラッパー．観測は最後の行動に対応するものになる
    """

    def __init__(self, env: GymWrapper_PyBullet, skip: int = 4) -> None:
        """
        コンストラクタ．

        Parameters
        ----------
        env : GymWrapper_PyBullet
            環境のインスタンス．今回は先程定義したラッパーでラップした環境を利用する．
        skip : int
            同じ行動を繰り返す回数．
        """
        gym.Wrapper.__init__(self, env)
        self._skip = skip

    def reset(self) -> np.ndarray:
        """
        環境をリセットするためのメソッド．

        Returns
        -------
        obs : np.ndarray (width, height, 3)
            環境をリセットしたときの初期の観測．
        """
        return self.env.reset()

    def step(self, action: np.ndarray) -> (np.ndarray, float, bool, dict):
        """
        環境に行動を与え次の観測，報酬，終了フラグを取得するメソッド．
        与えられた行動をskipの回数だけ繰り返した結果を返す．

        Parameters
        ----------
        action : np.ndarray (action_dim, )
            与える行動．

        Returns
        -------
        obs : np.ndarray (width, height, 3)
            行動をskipの回数だけ繰り返したあとの観測．
        total_reawrd : float
            行動をskipの回数だけ繰り返したときの報酬和．
        done : bool
            エピソードが終了したかどうか表すフラグ．
        info : dict
            その他の環境に関する情報．
        """
        total_reward = 0.0
        for _ in range(self._skip):
            obs, reward, done, info = self.env.step(action)
            total_reward += reward
            if done:
                break
        return obs, total_reward, done, info

#### Wrapperを通した環境を作る関数

In [None]:
def make_env() -> RepeatAction:
    """
    作成たラッパーをまとめて適用して環境を作成する関数．

    Returns
    -------
    env : RepeatAction
        ラッパーを適用した環境．
    """
    env = gym.make("HalfCheetahBulletEnv-v0")  # 環境を読み込む．今回はHalfCheetah
    # Dreamerでは観測は64x64のRGB画像
    env = GymWrapper_PyBullet(
        env, cam_dist=2, cam_pitch=0, render_width=64, render_height=64
    )
    env = RepeatAction(env, skip=2)  # DreamerではActionRepeatは2
    return env

### Replay Buffer
連続した経験をとってくるのでDQNとは少し違う

In [None]:
# 　今回のReplayBuffer
class ReplayBuffer(object):
    """
    RNNを用いて訓練するのに適したリプレイバッファ．
    """

    def __init__(
        self, capacity: int, observation_shape: List[int], action_dim: int
    ) -> None:
        """
        コンストラクタ．

        Parameters
        ----------
        capacity : int
            リプレイバッファにためておくことができる経験の上限．
        observation_shape : List[int]
            環境から与えられる観測の形状．
        action_dim : int
            行動空間の次元数．
        """
        self.capacity = capacity

        self.observations = np.zeros((capacity, *observation_shape), dtype=np.uint8)
        self.actions = np.zeros((capacity, action_dim), dtype=np.float32)
        self.rewards = np.zeros((capacity, 1), dtype=np.float32)
        self.done = np.zeros((capacity, 1), dtype=bool)
        # self.done = np.zeros((capacity, 1), dtype=np.bool)

        self.index = 0
        self.is_filled = False

    def push(
        self, observation: np.ndarray, action: np.ndarray, reward: float, done: bool
    ) -> None:
        """
        リプレイバッファに経験を追加するメソッド．

        Parameters
        ----------
        observation : np.ndarray (64, 64, 3)
            環境から得られた観測．
        action : np.ndarray (action_dim, )
            エージェントがとった（もしくは経験を貯める際のランダムな）行動．
        reward : float
            観測に対して行動をとったときに得られる報酬．
        done : bool
            エピソードが終了するかどうかのフラグ．
        """
        self.observations[self.index] = observation
        self.actions[self.index] = action
        self.rewards[self.index] = reward
        self.done[self.index] = done

        # indexは巡回し，最も古い経験を上書きする
        if self.index == self.capacity - 1:
            self.is_filled = True
        self.index = (self.index + 1) % self.capacity

    def sample(self, batch_size: int, chunk_length: int) -> Tuple[np.ndarray]:
        """
        経験をリプレイバッファからサンプルします．（ほぼ）一様なサンプルです．
        結果として返ってくるのは観測（画像），行動，報酬，終了シグナルについての(batch_size, chunk_length, 各要素の次元)の配列です．
        各バッチは連続した経験になっています．
        注意: chunk_lengthをあまり大きな値にすると問題が発生する場合があります．

        Parameters
        ----------
        batch_size : int
            バッチサイズ．
        chunk_length : int
            バッチあたりの系列長．


        Returns
        -------
        sampled_observations : np.ndarray (batch size, chunk length, 3, 64, 64)
            バッファからサンプリングされた観測．
        sampled_actions : np.ndarray (batch size, chunk length, action dim)
            バッファからサンプリングされた行動．
        sampled_rewards : np.ndarray (batch size, chunk length, 1)
            バッファからサンプリングされた報酬．
        sampled_done : np.ndarray (batch size, chunk length, 1)
            バッファからサンプリングされたエピソードの終了フラグ．
        """
        episode_borders = np.where(self.done)[0]
        sampled_indexes = []
        for _ in range(batch_size):
            cross_border = True
            while cross_border:
                initial_index = np.random.randint(len(self) - chunk_length + 1)
                final_index = initial_index + chunk_length - 1
                cross_border = np.logical_and(
                    initial_index <= episode_borders, episode_borders < final_index
                ).any()  # 論理積
            sampled_indexes += list(range(initial_index, final_index + 1))

        sampled_observations = self.observations[sampled_indexes].reshape(
            batch_size, chunk_length, *self.observations.shape[1:]
        )
        sampled_actions = self.actions[sampled_indexes].reshape(
            batch_size, chunk_length, self.actions.shape[1]
        )
        sampled_rewards = self.rewards[sampled_indexes].reshape(
            batch_size, chunk_length, 1
        )
        sampled_done = self.done[sampled_indexes].reshape(batch_size, chunk_length, 1)
        return sampled_observations, sampled_actions, sampled_rewards, sampled_done

    def __len__(self) -> int:
        """
        バッファに貯められている経験の数を返すメソッド．

        Returns
        -------
        length : int
            バッファに貯められている経験の数．
        """
        return self.capacity if self.is_filled else self.index

#### 観測の前処理を行う関数
ラッパーとして最初から適用してしまわないのは，リプレイバッファにはより容量の小さなnp．uint8の形式で保存しておきたいためです．

In [None]:
def preprocess_obs(obs: np.ndarray) -> np.ndarray:
    """
    画像を正規化する．[0, 255] -> [-0.5, 0.5]．

    Parameters
    ----------
    obs : np.ndarray (64, 64, 3) or (chank length, batch size, 64, 64, 3)
        環境から得られた観測．画素値は[0, 255]．

    Returns
    -------
    normalized_obs : np.ndarray (64, 64, 3) or (chank length, batch size, 64, 64, 3)
        画素値を[-0.5, 0.5]で正規化した観測．
    """
    obs = obs.astype(np.float32)
    normalized_obs = obs / 255.0 - 0.5
    return normalized_obs

#### λ-returnを計算する関数

In [None]:
def lambda_target(
    rewards: torch.Tensor, values: torch.Tensor, gamma: float, lambda_: float
) -> torch.Tensor:
    """
    価値関数の学習のためのλ-returnを計算する関数．

    Parameters
    ----------
    rewards : torch.Tensor (imagination_horizon, batch size * (chank length - 1))
        報酬モデルによる報酬の推定値．
    values : torch.Tensor (imagination_horizon, batch size * (chank length - 1))
        価値関数を近似するValueモデルによる状態価値観数の推定値．
    gamma : float
        割引率．
    lambda_ : float
        λ-returnのパラメータλ．

    V_lambda : torch.Tensor (imagination_horizon, batch size * (chank length - 1))
        各状態に対するλ-returnの値．
    """
    V_lambda = torch.zeros_like(rewards, device=rewards.device)

    H = rewards.shape[0] - 1
    V_n = torch.zeros_like(rewards, device=rewards.device)
    V_n[H] = values[H]
    for n in range(1, H + 1):
        # まずn-step returnを計算します
        # 注意: 系列が途中で終わってしまったら，可能な中で最大のnを用いたn-stepを使います
        V_n[:-n] = (gamma**n) * values[n:]
        for k in range(1, n + 1):
            if k == n:
                V_n[:-n] += (gamma ** (n - 1)) * rewards[k:]
            else:
                V_n[:-n] += (gamma ** (k - 1)) * rewards[k : -n + k]

        # lambda_でn-step returnを重みづけてλ-returnを計算します
        if n == H:
            V_lambda += (lambda_ ** (H - 1)) * V_n
        else:
            V_lambda += (1 - lambda_) * (lambda_ ** (n - 1)) * V_n

    return V_lambda

## ここからはモデルの実装編

### S4Block (p26 Figure21参照)

In [5]:
class S4Block(nn.Module):

    def __init__(
        self,
        d_model=256,
        d_mlp = 512,
        n_layers=2,
        dropout=0.2,
        prenorm=True,
    ):
        super(S4Block, self).__init__()
        
        assert d_model % 2 == 0, "For GLU, d_model must be even!!"
        self.prenorm = prenorm

        # Stack S4 layers as residual blocks
        self.s4_layers = nn.ModuleList()
        self.norms = nn.ModuleList()
        self.dropouts1 = nn.ModuleList()
        self.linears = nn.ModuleList()
        self.glus = nn.ModuleList()
        self.dropouts2= nn.ModuleList()
        for _ in range(n_layers):
            self.norms.append(nn.LayerNorm(d_model))
            self.s4_layers.append(
                S4D(d_model, dropout=dropout, transposed=True, lr=min(0.001, lr))
            )
            self.dropouts1.append(dropout_fn(dropout))
            self.linears.append(nn.Linear(d_model, 2*d_model))
            self.glus.append(nn.GLU())
            self.dropouts2.append(dropout_fn(dropout))

        self.norm_mlp = nn.ModuleList([
            nn.LayerNorm(d_model),
            nn.Linear(d_model, d_mlp),
            nn.GELU(),
            dropout_fn(dropout),
            nn.Linear(d_model, d_mlp),
            dropout_fn(dropout)])

    def forward(self, x):
        """
        Input x is shape (B, L, d_model), L is the length of continuous observations, B is the batch size
        """
        x = x.transpose(-1, -2)  # (B, L, d_model) -> (B, d_model, L)
        for s4, norm, dropout1, linear, glu, dropout2 in \
            zip(self.s4_layers, self.norms, self.dropouts1, self.linears, self.glus, self.dropouts2):
            # Each iteration of this loop will map (B, d_model, L) -> (B, d_model, L)

            z = x
            if self.prenorm:
                # Prenorm
                z = norm(z.transpose(-1, -2)).transpose(-1, -2)

            # Apply S4 block: we ignore the state input and output
            z, _ = s4(z)

            # Dropout on the output of the S4 block
            z = dropout1(z)

            # Mixing informations
            z = linear(z.transpose(-1, -2)).transpose(-1, -2) # (B, L, d_model) -> (B, L, 2*d_model)
            z = glu(z) # (B, L, 2*d_model) -> (B, L, d_model)
            z = dropout2(z)

            # Residual connection
            x = z + x

            if not self.prenorm:
                # Postnorm
                x = norm(x.transpose(-1, -2)).transpose(-1, -2)

        x = x.transpose(-1, -2)  # (B, d_model, L) -> (B, L, d_model)

        #TODO: x_にも操作が反映されてたりしないか確認する. residual connectionのため
        x_ = x
        x = x_ + self.norm_mlp(x)

        return x
    

### HistoryEncoder
HistoryEncoderはPriorに当たる<br>
(次元を表す変数が"*\_dim"だったり"d\_\*"だったりして紛らわしかもしれません)

In [None]:
class HistoryEncoder(nn.Module):

    def __init__(
        self,

        z_dim, # z_dim=1024にする予定. Encoderからの出力zの次元と合わせる必要がある
        action_dim,
        gMLP_dim=512,

        history_dim=256,
        S4_mlp_dim=512,
        S4_n_layers=2,
        dropout=0.2,
        prenorm=True,

        zMLP_dim=512,
    ):
        super(HistoryEncoder, self).__init__()
        self.gMLP = nn.Sequential([
            nn.Linear(z_dim + action_dim, gMLP_dim),
            nn.ReLU(),
            nn.Linear(gMLP_dim, history_dim)
        ])

        self.S4 = S4Block(d_model=history_dim, d_mlp=S4_mlp_dim, n_layers=S4_n_layers, dropout=dropout, prenorm=prenorm)
        
        self.zMLP = nn.Sequential([
            nn.Linear(history_dim, zMLP_dim),
            nn.ReLU(),
            nn.Linear(zMLP_dim, z_dim)
        ])
    
    def forward(self, z, action):
        """
        Parameters
        ----------
        z : torch.Tensor (batch_size, L, z_dim)
            環境から得られた観測画像の潜在表現. この時点ではone-hot vectorである

        action : torch.Tensor (batch_size, L, action_dim)
        
        Returns
        ----------
        h : torch.Tensor (batch_size, L, 1024)
            観測画像を埋め込み、カテゴリカル分布からサンプルしたもの(この時点ではone-hot vector)
            勾配を通してあるのはreward lossからの勾配を計算するため
        
        z : torch.Tensor (batch_size, L, z_dim)
            次の環境の観測画像の潜在表現. この時点ではone-hot vectorである
        
        dist : torch.distribution
            zの分布. ELBOのKL-divergenceを計算するために必要
        """
        g = self.gMLP(torch.cat([z, action], dim=-1))
        h = self.S4(g)
        logit = self.zMLP(h)
        dist = torch.distributions.OneHotCategorical(logits=logit)        
        stoch = dist.sample()
        stoch += dist.probs - dist.probs.detach() # using "straight-through gradients"
        z = torch.flatten(stoch, start_dim=-2, end_dim=-1)

        return h, z, dist
    

### Encoder, Decoder
EncoderはPosteriorに当たる<br>
(Decodeで画像にする必要はあるか？評価する上では画像にする必要はありそうだけど実際にモデルとしては軽いほうがいい)

In [None]:
class Encoder(nn.Module):
    """
    (input_dim, 64, 64)の画像を(1024,)のベクトルに変換する
    """
    
    def __init__(
        self,
        input_dim=3, # grayscaleなら1
        category_size=32,
        class_size=32,
    ):
        super(Encoder, self).__init__()
        self.input_dim = input_dim
        self.cv1 = nn.Conv2d(input_dim, 32, kernel_size=4, stride=2) # (input_dim, 64, 64) -> (32, 31, 31)
        self.cv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2) # (32, 31, 31) -> (64, 14, 14)
        self.cv3 = nn.Conv2d(64, 128, kernel_size=4, stride=2) # (64, 14, 14) -> (128, 6, 6)
        self.cv4 = nn.Conv2d(128, 256, kernel_size=4, stride=2) # (128, 6, 6) -> (256, 2, 2)
        self.category_size = category_size
        self.class_size = class_size

    def forward(self, obs):
        """
        Parameters
        ----------
        obs : torch.Tensor (batch_size, L, input_dim, 64, 64), Lは連続した観測画像の系列長
            環境から得られた観測画像
        
        Returns
        ----------
        z : torch.Tensor (batch_size, L, 1024)
            観測画像を埋め込み、カテゴリカル分布からサンプルしたもの(この時点ではone-hot vector)
            勾配を通してあるのはreward lossからの勾配を計算するため
        
        dist: torch.distribution
            zの分布. ELBOのKL-divergenceを計算するために必要
        """
        hidden = F.silu(self.cv1(obs))
        hidden = F.silu(self.cv2(hidden))
        hidden = F.silu(self.cv3(hidden))
        logit = F.silu(self.cv4(hidden)).reshape(*hidden.shape[:-3], self.category_size, self.class_size) # (batch_size, L, 256, 2, 2) -> (batch_size, L, 32, 32)
        dist = torch.distributions.OneHotCategorical(logits=logit)        
        stoch = dist.sample()
        stoch += dist.probs - dist.probs.detach() # using "straight-through gradients"
        z = torch.flatten(stoch, start_dim=-2, end_dim=-1)
        
        return z, dist



class Decoder(nn.Module):
    """
    (1024,)のベクトルを(input_dim, 64, 64)の画像に変換する
    """
    
    def __init__(
        self,
        output_dim=3, # grayscaleなら1
        z_dim=1024,
        history_dim=1024,
    ):
        super(Decoder, self).__init__()
        self.output_dim = output_dim
        self.fc = nn.Linear(z_dim + history_dim, 1024)
        self.cv1 = nn.ConvTranspose2d(1024, 128, kernel_size=5, stride=2) # (1024, 1, 1) -> (128, 5, 5)
        self.cv2 = nn.ConvTranspose2d(128, 64, kernel_size=5, stride=2) # (128, 5, 5) -> (64, 13, 13)
        self.cv3 = nn.ConvTranspose2d(64, 32, kernel_size=6, stride=2) # (64, 13, 13) -> (32, 30, 30)
        self.cv4 = nn.ConvTranspose2d(32, output_dim, kernel_size=6, stride=2) # (32, 30, 30) -> (input_dim, 64, 64)

    def forward(self, h, z):
        """
        Parameters
        ----------
        h: torch.Tensor (batch_size, L, history_dim)
            これまでの履歴(S4Blockからの出力)

        z : torch.Tensor (batch_size, L, z_dim)
            次の観測の潜在表現
        
        Returns
        ----------
        obs : torch.Tensor (batch_size, L, output_dim, 64, 64)
            次の観測画像
        """
        hidden = self.fc(torch.cat([z, h], dim=-1))
        hidden = hidden.view(*hidden.shape[:-1],1024, 1, 1)
        hidden = F.silu(self.cv1(hidden))
        hidden = F.silu(self.cv2(hidden))
        hidden = F.silu(self.cv3(hidden))
        obs = self.cv4(hidden)
        
        return obs


### RewardModel
報酬モデル. 1層のMLP

In [None]:
class RewardModel(nn.Module):

    def __init__(
        self,
        history_dim,
        z_dim,
        mlp_dim=512,
    ):
        super(RewardModel, self).__init__()
        self.fc1 = nn.Sequential([
            nn.Linear(history_dim + z_dim, mlp_dim),
            nn.ReLU(),
            nn.Linear(mlp_dim, 1)
        ])
    
    def forward(self, h, z):
        """
        Parameters
        ----------
        h: torch.Tensor (batch_size, L, history_dim)
            これまでの履歴

        z : torch.Tensor (batch_size, L, z_dim)
            次の観測の潜在表現
        
        Returns
        ----------
        reward : torch.Tensor (batch_size, L, 1)
            報酬の予測値
        """
        reward = self.fc1(torch.cat([h, z], dim=-1))
        
        return reward
    

### PolicyModel
まだ実装しない（世界モデルのみをテストしてから）

### ValueModel
まだ実装しない（世界モデルのみをテストしてから）

## 学習の実装編

### ハイパーパラメータの設定と学習の準備

ハイパーパラメータを設定し，モデルやリプレイバッファを宣言して学習の準備を整えます．

In [None]:
env = make_env()

# リプレイバッファの宣言
buffer_capacity = 200000  # Colabのメモリの都合上，元の実装より小さめにとっています
replay_buffer = ReplayBuffer(
    capacity=buffer_capacity,
    observation_shape=env.observation_space.shape,
    action_dim=env.action_space.shape[0],
)

# モデルの宣言

# ハイパラ
# encoder用
grayscale = True
category_size = 16
class_size = 16
# decoder用
z_dim = category_size * class_size
history_dim = 256 # history_dimの分だけS4-layerはコピーされるので、あまり大きすぎると計算量が大変かも
# rewardModel用
rewardMLP_dim = 256
# historyEncoder用
gMLP_dim = 512
S4_mlp_dim = 512
zMLP_dim = 512
dropout = 0.2

encoder = Encoder(
    input_dim=(1 if grayscale else 3),
    category_size=category_size,
    class_size=class_size
).to(device)

decoder = Decoder(
    output_dim=(1 if grayscale else 3),
    z_dim=z_dim,
    history_dim=history_dim
).to(device) # z_dimはcategory_size * class_sizeとなる必要がある

rewardModel = RewardModel(
    history_dim=history_dim,
    z_dim=z_dim,
    mlp_dim=rewardMLP_dim
)

historyEncoder = HistoryEncoder(
    z_dim=z_dim,
    action_dim=env.action_space.shape[0],
    gMLP_dim=gMLP_dim,
    history_dim=history_dim,
    S4_mlp_dim=S4_mlp_dim,
    S4_n_layers=2,
    dropout=dropout,
    prenorm=True,
    zMLP_dim=zMLP_dim
)

#=======================ここまで=========================

# オプティマイザの宣言
model_lr = 6e-4  # encoder, rssm, obs_model, reward_modelの学習率
value_lr = 8e-5
action_lr = 8e-5
eps = 1e-4
model_params = (
    list(encoder.parameters())
    + list(rssm.transition.parameters())
    + list(rssm.observation.parameters())
    + list(rssm.reward.parameters())
)
model_optimizer = torch.optim.Adam(model_params, lr=model_lr, eps=eps)
value_optimizer = torch.optim.Adam(value_model.parameters(), lr=value_lr, eps=eps)
action_optimizer = torch.optim.Adam(action_model.parameters(), lr=action_lr, eps=eps)

# その他ハイパーパラメータ
seed_episodes = 5  # 最初にランダム行動で探索するエピソード数
all_episodes = 100  # 学習全体のエピソード数（300ほどで，ある程度収束します）
test_interval = 10  # 何エピソードごとに探索ノイズなしのテストを行うか
model_save_interval = 20  # NNの重みを何エピソードごとに保存するか
collect_interval = 100  # 何回のNNの更新ごとに経験を集めるか（＝1エピソード経験を集めるごとに何回更新するか）

action_noise_var = 0.3  # 探索ノイズの強さ

batch_size = 50
chunk_length = 50  # 1回の更新で用いる系列の長さ
imagination_horizon = 15  # Actor-Criticの更新のために，Dreamerで何ステップ先までの想像上の軌道を生成するか


gamma = 0.9  # 割引率
lambda_ = 0.95  # λ-returnのパラメータ
clip_grad_norm = 100  # gradient clippingの値
free_nats = 3  # KL誤差（RSSMのTransitionModelにおけるpriorとposteriorの間の誤差）がこの値以下の場合，無視する

In [None]:
log_dir = "logs"
writer = SummaryWriter(log_dir)
# %tensorboard --logdir='./logs'

### 学習の実装（世界モデルのみテストする）

### 学習の実装（方策モデルも含めてタスクを解く学習）