# 自作のTD3ノートブック

In [1]:
import copy
from dataclasses import dataclass, asdict, is_dataclass, field
import sys
import logging

import numpy as np

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import gymnasium as gym

from myActivator import tanhAndScale
from myReplayBuffer import ReplayBuffer

In [2]:
# Route INFO-and-above log records to stdout so they show up in the cell
# output, each prefixed with an HH:MM:SS timestamp and the level name.
logging.basicConfig(
    stream=sys.stdout,
    level=logging.INFO,
    datefmt="%H:%M:%S",
    format="%(asctime)s [%(levelname)s] %(message)s",
)

In [3]:
# Create the Pendulum-v1 environment and dump every attribute of both its
# spec and the unwrapped environment, one log line per attribute.
env = gym.make("Pendulum-v1")
for attr_owner in (env.spec, env.unwrapped):
    for key, value in vars(attr_owner).items():
        logging.info('%s: %s', key, value)

13:21:00 [INFO] id: Pendulum-v1
13:21:00 [INFO] entry_point: gymnasium.envs.classic_control.pendulum:PendulumEnv
13:21:00 [INFO] reward_threshold: None
13:21:00 [INFO] nondeterministic: False
13:21:00 [INFO] max_episode_steps: 200
13:21:00 [INFO] order_enforce: True
13:21:00 [INFO] disable_env_checker: False
13:21:00 [INFO] kwargs: {}
13:21:00 [INFO] additional_wrappers: ()
13:21:00 [INFO] vector_entry_point: None
13:21:00 [INFO] namespace: None
13:21:00 [INFO] name: Pendulum
13:21:00 [INFO] version: 1
13:21:00 [INFO] max_speed: 8
13:21:00 [INFO] max_torque: 2.0
13:21:00 [INFO] dt: 0.05
13:21:00 [INFO] g: 10.0
13:21:00 [INFO] m: 1.0
13:21:00 [INFO] l: 1.0
13:21:00 [INFO] render_mode: None
13:21:00 [INFO] screen_dim: 500
13:21:00 [INFO] screen: None
13:21:00 [INFO] clock: None
13:21:00 [INFO] isopen: True
13:21:00 [INFO] action_space: Box(-2.0, 2.0, (1,), float32)
13:21:00 [INFO] observation_space: Box([-1. -1. -8.], [1. 1. 8.], (3,), float32)
13:21:00 [INFO] spec: EnvSpec(id='Pendulum-

In [4]:
# Prefer the GPU when CUDA is available; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [5]:
@dataclass
class Config:
    """Hyperparameters consumed by TD3Agent.

    The defaults target Pendulum-v1 (3-dimensional observation,
    1-dimensional torque action bounded to [-2, 2]).
    """

    # --- Network architecture ---------------------------------------------
    # Hidden-layer widths of the critics (Q) and the actor (P).
    Q_net_sizes: list[int] = field(default_factory=lambda: [64, 64])
    P_net_sizes: list[int] = field(default_factory=lambda: [64, 64])
    # Critic input is the observation concatenated with the action (3 + 1).
    Q_net_in: int = 4
    # Actor input is the observation alone (3).
    P_net_in: int = 3
    # The critic emits one Q-value; the actor emits one action component.
    Q_net_out: int = 1
    P_net_out: int = 1

    # --- Environment constraints ------------------------------------------
    # Upper / lower action limits, matching the Pendulum-v1 action space.
    u_ulim: float = 2.0
    u_llim: float = -2.0

    # --- Learning parameters ----------------------------------------------
    # Adam learning rates for critic 1, critic 2 and the actor. 1e-3 is on
    # the high side; lower to 3e-4 if training turns out to be unstable.
    Q_lr1: float = 0.001
    Q_lr2: float = 0.001
    P_lr: float = 0.001

    # Discount factor applied to the bootstrapped target value.
    gamma: float = 0.99

    # Standard deviation of the Gaussian noise added to actions.
    sig: float = 0.1

    # Polyak coefficient used when soft-updating the target networks.
    tau: float = 0.005

In [6]:
class TD3Agent:
    def __init__(self,Config,device=None):
        if Config:
            self.Config = Config
        else:
            raise ValueError("No Config!!")
        
        # ---- device 決定（CUDAがあればCUDA、指示が無ければ）----
        if device is None:
            self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        else:
            self.device = torch.device(device)

        # ---- action bounds（deviceを揃える為 Tensor 化）----
        self.u_low = torch.as_tensor(Config.u_llim, dtype=torch.float32, device=self.device)
        self.u_high = torch.as_tensor(Config.u_ulim, dtype=torch.float32, device=self.device)

        self.Q_net1 = self.build_net(
            Config.Q_net_in,
            Config.Q_net_sizes,
            Config.Q_net_out,
        ).to(self.device)
        self.Q_net1.train()

        self.Q_net2 = self.build_net(
            Config.Q_net_in,
            Config.Q_net_sizes,
            Config.Q_net_out,
        ).to(self.device)
        self.Q_net2.train()

        self.P_net = self.build_net(
            Config.P_net_in,
            Config.P_net_sizes,
            Config.P_net_out,
            tanhAndScale(a_high=self.u_high,a_low=self.u_low),
        ).to(self.device)
        self.P_net.train()

        # ---- Target nets ----
        self.Q_target_net1 = copy.deepcopy(self.Q_net1).to(self.device)
        self.Q_target_net2 = copy.deepcopy(self.Q_net2).to(self.device)
        self.P_target_net = copy.deepcopy(self.P_net).to(self.device)
        self.Q_target_net1.eval()
        self.Q_target_net2.eval()
        self.P_target_net.eval()

        self.Q_optim1 = optim.Adam(self.Q_net1.parameters(), lr=Config.Q_lr1)
        self.Q_optim2 = optim.Adam(self.Q_net2.parameters(), lr=Config.Q_lr2)
        self.P_optim = optim.Adam(self.P_net.parameters(), lr=Config.P_lr)

        # Actorの学習頻度を設定するための変数（これを特定の数で割った余りが0の時だけactor更新にする）
        self.actor_update_counter = 0
        self.policy_update_freq = 2  # TD3の論文では2に設定されていることが多い

        # TDターゲットの計算の際に使用するノイズのクリッピング範囲
        self.target_policy_noise_clip = 0.5  # TD3論文では0.5に設定されていることが多い

    def to(self, device):
        """エージェントの内部のネットと必要Tensorを全部指定 device に移す"""
        self.device = torch.device(device)
        self.Q_net1.to(self.device)
        self.Q_net2.to(self.device)
        self.P_net.to(self.device)
        self.Q_target_net1.to(self.device)
        self.Q_target_net2.to(self.device)
        self.P_target_net.to(self.device)
        self.u_low = self.u_low.to(self.device)
        self.u_high = self.u_high.to(self.device)
        return self

    def build_net(self, input_size, hidden_sizes, output_size=1, output_activator=None):
        layers = []
        for input_size, output_size in zip([input_size]+hidden_sizes, hidden_sizes+[output_size]):
            layers.append(nn.Linear(input_size, output_size))
            layers.append(nn.ReLU())
        layers = layers[:-1]  # 最後のReLUだけ取り除く
        if output_activator:
            layers.append(output_activator)
        net = nn.Sequential(*layers)
        return net
    
    @torch.no_grad()
    def step(self, observation):
        """ノイズ無し（評価用）。環境に渡す行動を返す。"""
        obs_t = torch.as_tensor(observation, dtype=torch.float32, device=self.device)
        if obs_t.dim() == 1:
            obs_t = obs_t.unsqueeze(0)
        
        action = self.P_net(obs_t)
        action = torch.clamp(action, self.u_low, self.u_high)
        return action.squeeze(0).cpu().numpy()
    
    @torch.no_grad()
    def step_with_noise(self, observation):
        # observationをTensorにし、deviceに送る
        obs = torch.as_tensor(observation, dtype=torch.float32, device=self.device)
        if obs.dim() == 1:
            obs = obs.unsqueeze(0)
        
        # 決定論的行動決定
        action = self.P_net(obs)  # shape: (1, act_dim)

        # ε ~ N(0, σ^2 I) を生成して加算（探索ノイズを加える）
        eps = float(self.Config.sig) * torch.randn_like(action)
        action = action + eps

        # 出力製薬 [u_low, u_high] 以内に収める
        action = torch.clamp(action, self.u_low, self.u_high)

        # 環境に渡すならバッチ次元を落として渡す
        return action.squeeze(0).cpu().numpy()
    
    def save_all(self, path: str, extra: dict | None = None):
        """
        Actor/Critic + target nets をまとめて保存（最終モデル用）。
        """
        cfg = asdict(self.Config) if is_dataclass(self.Config) else self.Config

        ckpt = {
            "config": cfg,
            "P_net": self.P_net.state_dict(),
            "Q_net1": self.Q_net1.state_dict(),
            "Q_net2": self.Q_net2.state_dict(),
            "P_target_net": self.P_target_net.state_dict(),
            "Q_target_net1": self.Q_target_net1.state_dict(),
            "Q_target_net2": self.Q_target_net2.state_dict(),
        }
        if extra is not None:
            ckpt["extra"] = extra

        torch.save(ckpt, path)


    def load_all(self, path: str, map_location=None):
        """
        save_all() で保存したチェックポイントをロード。

        PyTorch 2.6 以降:
        torch.load() のデフォルトが weights_only=True になったため、
        config/extra を含むチェックポイントはそのままだと UnpicklingError になり得る。
        その回避として「信頼できるチェックポイントに限り」 weights_only=False を明示する。

        ※ map_location は "cpu" や device を指定可。
        """
        # PyTorch 2.6+ では weights_only 引数がある
        try:
            ckpt = torch.load(path, map_location=map_location, weights_only=False)
        except TypeError:
            # 古い PyTorch（weights_only 引数が無い）向け
            ckpt = torch.load(path, map_location=map_location)

        self.P_net.load_state_dict(ckpt["P_net"])
        self.Q_net1.load_state_dict(ckpt["Q_net1"])
        self.Q_net2.load_state_dict(ckpt["Q_net2"])
        self.P_target_net.load_state_dict(ckpt["P_target_net"])
        self.Q_target_net1.load_state_dict(ckpt["Q_target_net1"])
        self.Q_target_net2.load_state_dict(ckpt["Q_target_net2"])

        return ckpt.get("extra", None)
    
    def mode2eval(self):
        self.P_net.eval()
        self.Q_net1.eval()
        self.Q_net2.eval()

    def mode2train(self):
        self.P_net.train()
        self.Q_net1.train()
        self.Q_net2.train()

    @torch.no_grad()
    def soft_update(self, target_net, online_net, tau):
        """
        Polyak averaging:
          θ' ← (1-τ) θ' + τ θ
        """
        for target_param, online_param in zip(target_net.parameters(), online_net.parameters()):
            target_param.mul_(1.0-tau).add_(tau*online_param)

    def update_net(self, states, actions, rewards, states_next, dones=None):
        """
        1回の更新（Critic→Actor→Target soft update）
        戻り値： (q_loss, p_loss) のスカラー
        """
        # ---- minibatch を device 上 Tensor に統一 ----
        states = torch.as_tensor(states, dtype=torch.float32, device=self.device)
        actions = torch.as_tensor(actions, dtype=torch.float32, device=self.device)
        rewards = torch.as_tensor(rewards, dtype=torch.float32, device=self.device)
        states_next = torch.as_tensor(states_next, dtype=torch.float32, device=self.device)

        if rewards.dim() == 1:
            rewards = rewards.unsqueeze(1)
        
        if dones is None:
            dones = torch.zeros((states.shape[0],1), dtype=torch.float32, device=self.device)
        else:
            dones = torch.as_tensor(dones, dtype=torch.float32, device=self.device)
            if dones.dim() == 1:
                dones = dones.unsqueeze(1)
        
        with torch.no_grad():
            actions_next_for_target = self.P_target_net(states_next)
            actions_next_for_target += torch.clamp(
                torch.randn_like(actions_next_for_target) * self.Config.sig,
                -self.target_policy_noise_clip,
                self.target_policy_noise_clip,
            )
            Q_target1 = self.Q_target_net1(torch.cat([states_next,actions_next_for_target],dim=1))
            Q_target2 = self.Q_target_net2(torch.cat([states_next,actions_next_for_target],dim=1))
            y_targets = rewards + self.Config.gamma*(1-dones)*torch.min(Q_target1,Q_target2)

        Q_values1 = self.Q_net1(torch.cat([states,actions],dim=1))
        Q_values2 = self.Q_net2(torch.cat([states,actions],dim=1))
        Q_loss1 = F.mse_loss(y_targets,Q_values1)
        Q_loss2 = F.mse_loss(y_targets,Q_values2)
        self.Q_optim1.zero_grad()
        self.Q_optim2.zero_grad()
        Q_loss1.backward()
        Q_loss2.backward()
        self.Q_optim1.step()
        self.Q_optim2.step()

        # ---- Actor update ----
        # Actor 更新は一定頻度でのみ行う
        if self.actor_update_counter % self.policy_update_freq == 0:
            # Actor 更新では Q_net を通すが、Q_net自体は更新しないので凍結
            for p in self.Q_net1.parameters():
                p.requires_grad_(False)

            actions_for_Ploss = self.P_net(states)
            P_loss = -self.Q_net1(torch.cat([states,actions_for_Ploss],dim=1)).mean()
            self.P_optim.zero_grad()
            P_loss.backward()
            self.P_optim.step()

            for p in self.Q_net1.parameters():
                p.requires_grad_(True)

            # ---- Target net update ----
            self.soft_update(
                target_net=self.Q_target_net1,
                online_net=self.Q_net1,
                tau=self.Config.tau,
            )
            self.soft_update(
                target_net=self.Q_target_net2,
                online_net=self.Q_net2,
                tau=self.Config.tau,
            )
            self.soft_update(
                target_net=self.P_target_net,
                online_net=self.P_net,
                tau=self.Config.tau
            )
        else:
            P_loss = torch.tensor(0.0)  # Actor更新しなかった場合は0を返すようにしておく

        self.actor_update_counter += 1

        return float(Q_loss1.item()), float(Q_loss2.item()), float(P_loss.item())

In [7]:
def train(
    env,
    agent,
    buffer,
    total_step=40000,
    warmup_steps=1000,
    batch_num=512,
):
    """Interact with ``env`` for ``total_step`` steps while training ``agent``.

    While the buffer holds fewer than ``warmup_steps`` transitions, actions
    are sampled at random and no update is run. After that, actions come
    from ``agent.step_with_noise`` and one ``agent.update_net`` call is made
    per environment step on a minibatch of ``batch_num`` transitions.

    Returns:
        ``(Q_loss1_history, Q_loss2_history, P_loss_history, reward_history)``.
        The loss histories are recorded on steps where ``t % 100 == 0`` and
        an update ran; ``reward_history`` has one entry per finished episode.
    """
    print("cuda available:", torch.cuda.is_available())
    print("agent device:", agent.device)
    print("P_net device:", next(agent.P_net.parameters()).device)
    print("Q_net1 device:", next(agent.Q_net1.parameters()).device)
    print("Q_net2 device:", next(agent.Q_net2.parameters()).device)

    # Histories handed back to the caller.
    q1_losses, q2_losses, p_losses, episode_returns = [], [], [], []

    episode_idx = 1
    running_return = 0

    # Initial observation.
    obs, info = env.reset()

    for t in range(total_step):
        # Random actions until the buffer is warmed up, then the noisy policy.
        if len(buffer) >= warmup_steps:
            action = agent.step_with_noise(obs)
        else:
            action = env.action_space.sample()

        # Advance the environment by one step.
        obs_next, reward, terminated, truncated, info = env.step(action)
        running_return += reward

        # Only true termination is stored as "done"; a truncated transition
        # is stored with done = 0.0, so its target keeps bootstrapping.
        buffer.add(obs, action, reward, obs_next, float(terminated))

        # The episode itself ends on either termination or truncation.
        episode_over = terminated or truncated
        if episode_over:
            logging.info('train episode %d: reward = %.2f',
                         episode_idx, running_return)
            episode_idx += 1
            obs, info = env.reset()
            episode_returns.append(running_return)
            running_return = 0
        else:
            obs = obs_next

        # Learn only once the buffer holds enough transitions.
        if len(buffer) >= warmup_steps:
            batch = buffer.sample(batch_num)
            q1_loss, q2_loss, p_loss = agent.update_net(
                batch["obs"],
                batch["act"],
                batch["rew"],
                batch["obs_next"],
                batch["done"],
            )

            # Thin out the loss log to every 100th environment step.
            if t % 100 == 0:
                q1_losses.append(q1_loss)
                q2_losses.append(q2_loss)
                p_losses.append(p_loss)

    return q1_losses, q2_losses, p_losses, episode_returns

In [8]:
# Create the agent and its replay buffer on the selected device.
agent = TD3Agent(Config=Config(), device=device)
TD3ReplayBuffer = ReplayBuffer(obs_dim=3, act_dim=1, size=100000, device=device)

# Run settings: total environment steps, number of buffered transitions
# required before learning starts, and the minibatch size per update.
total_step, warmup_steps, batch_num = 350000, 50000, 2048

# Train and keep the loss / episode-reward histories.
Qh1, Qh2, Ph, rh = train(
    env=env,
    agent=agent,
    buffer=TD3ReplayBuffer,
    total_step=total_step,
    warmup_steps=warmup_steps,
    batch_num=batch_num,
)

cuda available: True
agent device: cuda
P_net device: cuda:0
Q_net1 device: cuda:0
Q_net2 device: cuda:0
13:21:02 [INFO] train episode 1: reward = -901.91
13:21:02 [INFO] train episode 2: reward = -1296.12
13:21:02 [INFO] train episode 3: reward = -1172.58
13:21:02 [INFO] train episode 4: reward = -972.56
13:21:02 [INFO] train episode 5: reward = -1164.39
13:21:02 [INFO] train episode 6: reward = -915.60
13:21:02 [INFO] train episode 7: reward = -1245.62
13:21:02 [INFO] train episode 8: reward = -1170.60
13:21:02 [INFO] train episode 9: reward = -1707.38
13:21:02 [INFO] train episode 10: reward = -1068.00
13:21:02 [INFO] train episode 11: reward = -1066.14
13:21:02 [INFO] train episode 12: reward = -1603.65
13:21:02 [INFO] train episode 13: reward = -1269.07
13:21:02 [INFO] train episode 14: reward = -1437.97
13:21:02 [INFO] train episode 15: reward = -964.31
13:21:02 [INFO] train episode 16: reward = -1666.95
13:21:02 [INFO] train episode 17: reward = -1210.33
13:21:02 [INFO] train ep

Consider using tensor.detach() first. (Triggered internally at /pytorch/aten/src/ATen/native/Scalar.cpp:22.)
  return float(Q_loss1.item()), float(Q_loss2.item()), float(P_loss.item())


13:21:10 [INFO] train episode 251: reward = -1517.27
13:21:12 [INFO] train episode 252: reward = -1458.95
13:21:14 [INFO] train episode 253: reward = -1568.14
13:21:17 [INFO] train episode 254: reward = -1563.91
13:21:19 [INFO] train episode 255: reward = -1497.67
13:21:21 [INFO] train episode 256: reward = -1781.85
13:21:23 [INFO] train episode 257: reward = -1609.88
13:21:23 [INFO] train episode 258: reward = -1595.95
13:21:26 [INFO] train episode 259: reward = -1386.77
13:21:28 [INFO] train episode 260: reward = -1542.47
13:21:30 [INFO] train episode 261: reward = -1298.50
13:21:32 [INFO] train episode 262: reward = -1372.44
13:21:34 [INFO] train episode 263: reward = -1393.17
13:21:36 [INFO] train episode 264: reward = -1145.64
13:21:39 [INFO] train episode 265: reward = -1088.36
13:21:41 [INFO] train episode 266: reward = -1516.83
13:21:43 [INFO] train episode 267: reward = -1060.28
13:21:46 [INFO] train episode 268: reward = -1120.55
13:21:48 [INFO] train episode 269: reward = -9

In [9]:
# Close the training environment now that the training run has returned.
env.close()

In [10]:
from pathlib import Path
from datetime import datetime

def make_unique_path(path: str | Path) -> Path:
    """
    path が既に存在する場合、末尾に _1, _2, ... を付けて未使用のパスを返す。
    例: ddpg_final_20251221_235959.pth -> ddpg_final_20251221_235959_1.pth -> ...
    """
    p = Path(path)

    # 存在しないならそのまま使う
    if not p.exists():
        return p

    parent = p.parent
    stem = p.stem      # 拡張子抜きファイル名
    suffix = p.suffix  # ".pth"

    i = 1
    while True:
        cand = parent / f"{stem}_{i}{suffix}"
        if not cand.exists():
            return cand
        i += 1


# Put the online networks into eval mode for inference (saving would also
# work in train mode).
agent.mode2eval()

# Make sure the output directory exists before building the file name.
models_dir = Path("./models")
models_dir.mkdir(parents=True, exist_ok=True)

# Timestamped file name; fall back to a numbered variant if it is taken.
stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
base_path = models_dir / f"td3_final_{stamp}.pth"
save_path = make_unique_path(base_path)

# Store the run length and the per-episode rewards next to the weights.
checkpoint_extra = {
    "total_step": int(total_step),
    "reward_history": rh,
}
agent.save_all(save_path.as_posix(), extra=checkpoint_extra)

print(f"saved to {save_path}")

saved to models/td3_final_20260131_141342.pth
