# ロボットアーム - Panda Gym

In [None]:
import os
import json
from pathlib import Path
import numpy as np
import gymnasium as gym
import panda_gym
import matplotlib.pyplot as plt
from matplotlib import font_manager
from tqdm import tqdm
from IPython.display import display, Markdown, Image

import torch

from gail_algo import GAILTrainer, build_expert_loader
from collect_expert_trajectories import collect_expert_trajectories

# 日本語フォント設定
import matplotlib
import japanize_matplotlib

* ロボット: Franka Emika Panda マニピュレータの操作タスクをシミュレート。
<br>

* 観測空間:
    - すべてのタスクにグリッパの位置と速度（6値）。
    - 物体を扱うタスクでは位置・姿勢・線形/回転速度が追加（物体1つにつき12値）。
    - グリッパ開閉距離（拘束されていなければ1値）。

* 行動空間:
    - グリッパの平行移動コマンド（x, y, z の3値）。
    - グリッパ開閉コマンド（1値）。
    
* シミュレーション:
    - エージェントの1ステップあたり20タイムステップ（各2ms）。
    - インタラクション周波数は25Hz。
    - ほとんどのタスクは約2秒（50ステップ）のエピソード長。
    
* 報酬関数:
    - 既定の報酬はスパース: 目標到達（5cm 以内）なら0、それ以外は -1。
    - スパース報酬は定義が簡単だが、進捗の手掛かりが少ない。

# PandaReach-v3

* ターゲット位置は30cm × 30cm × 30cmの範囲でランダム生成され、グリッパがそこへ到達するタスク。

In [None]:
env = gym.make(
    "PandaReach-v3",
    render_mode="rgb_array",
    renderer="OpenGL",
    render_target_position=[0, 0.15, 0.25],
    render_distance=0.85,
    render_yaw=135,
    render_pitch=-20,
)

In [None]:
print('\n観測空間:', env.observation_space)
print('\n行動空間: ', env.action_space)

obs, info = env.reset()
print('\n初期状態: ', (obs, info))

# 行動は環境のアクション空間と同じ形状にする（スカラーだとエラーになる）
action_sample = np.zeros(env.action_space.shape, dtype=np.float32)
print('\n環境での1ステップ: ', env.step(action_sample))

print('\n\nレンダリングした環境: ')
env.reset()
plt.axis('off')
plt.imshow(env.render())
plt.show()

In [None]:
obs_shape = env.observation_space['observation'].shape[0] + \
            env.observation_space['achieved_goal'].shape[0] + \
            env.observation_space['desired_goal'].shape[0]

# GAIL用のエキスパート軌跡

1. `../TD3/Models/Expert/` の事前学習TD3エキスパートで軌跡（状態 + 行動）をロールアウト。
2. 生成した軌跡を一度 `./expert_trajectories.pt` に保存し、以降の実行で再利用。
3. このファイルからミニバッチを作り、学習中にGAIL識別器へ供給する。

In [None]:
expert_path = "./expert_trajectories.pt"
if not os.path.exists(expert_path):
    collect_expert_trajectories(env_name="PandaReach-v3", episodes=200, steps_per_episode=300,
                                expert_model_path="../TD3/Models/Expert/", save_path=expert_path,
                                render=False)
else:
    print(f"キャッシュされたエキスパート軌跡を {expert_path} から利用します")

In [None]:
expert_loader = build_expert_loader(expert_path, batch_size=256, device=None, shuffle=True)

# GAIL目的関数（数式）
- **報酬（ポリシー側）**: $r(s,a) = \log D_\phi(s,a)$。識別器が「エキスパートらしい」と判断するほど報酬が高い。
- **識別器の損失**（最小化で表記）: 
$$\mathcal{L}_D = -\mathbb{E}_{(s,a)\sim\text{expert}}[\log D_\phi(s,a)] - \mathbb{E}_{(s,a)\sim\pi_\theta}[\log (1 - D_\phi(s,a))]$$
- **ポリシー（生成側）の目的**: 
$$\max_\theta\; \mathbb{E}_{(s,a)\sim\pi_\theta}[\log D_\phi(s,a)]$$
- **TD3本体**: Actor/Critic はTD3そのまま（2つのCriticで $\min(Q_1, Q_2)$、ターゲット平滑化、遅延Actor更新）。GAILでは上記の $\log D$ を外部報酬として TD3 学習に渡す点のみが異なる。

In [None]:
# GAILエージェント設定（繰り返し実行用に辞書で保持）
gail_agent_kwargs = dict(
    env=env,
    input_dims=obs_shape,
    agent_name='GAIL',
    model_save_path='./Models/Apprentice/',
    exploration_period=300,
    disc_lr=3e-4,
    disc_updates=2,
    gail_reward_scale=1.0,
    expert_loader=expert_loader,
)

print('GAILエージェント設定を初期化しました。次のセルで繰り返し学習できます。')

In [None]:
# y/n で追加学習を繰り返し、各Runの結果を即座に可視化して保存する
run_idx = 0
keep_training = 'y'

results_dir = Path("../Results/GAIL")
results_dir.mkdir(parents=True, exist_ok=True)
manifest_path = results_dir / "run_manifest.json"

# 既存のマニフェストを読み込み
if manifest_path.exists():
    try:
        with open(manifest_path, "r") as f:
            run_manifest = json.load(f)
    except Exception:
        run_manifest = []
else:
    run_manifest = []

while keep_training == 'y':
    run_idx += 1
    # モデル保存先を run ごとに分けて上書きを避ける
    gail_agent_kwargs['model_save_path'] = f"./Models/Apprentice_Run_{run_idx}/"

    agent = GAILTrainer(**gail_agent_kwargs)

    score_history, avg_score_history = agent.gail_train(
        n_episodes=500,
        opt_steps=10,
        print_every=50,
        render_save_path=None,
        plot_save_path=f"../Results/GAIL/GAIL_Performance_Run_{run_idx}.png",
    )

    agent.save_model()
    print(f"Run {run_idx} を保存しました -> {gail_agent_kwargs['model_save_path']}")

    # 学習履歴を保存
    history_path = results_dir / f"GAIL_History_Run_{run_idx}.json"
    with open(history_path, "w") as f:
        json.dump({"score_history": score_history, "avg_score_history": avg_score_history}, f)

    # マニフェストを更新して保存
    run_manifest.append({
        "run_idx": run_idx,
        "history_path": str(history_path),
        "plot_path": f"../Results/GAIL/GAIL_Performance_Run_{run_idx}.png",
        "model_path": gail_agent_kwargs['model_save_path'],
    })
    with open(manifest_path, "w") as f:
        json.dump(run_manifest, f, indent=2)

    # 直近Runのグラフを即座に表示
    fig, ax = plt.subplots(figsize=(8, 4))
    ax.plot(score_history, label='Score')
    ax.plot(avg_score_history, label='Average Score')
    ax.set_title(f'GAIL Run {run_idx} 学習曲線')
    ax.set_xlabel('エピソード')
    ax.set_ylabel('スコア')
    ax.legend()
    ax.grid(True, linestyle='--', alpha=0.5)
    plt.tight_layout()
    display(Markdown(f"### Run {run_idx} の学習結果"))
    display(fig)
    plt.close(fig)

    keep_training = input('別のGAILエージェントを続けて学習しますか? (y/n): ').strip().lower()
    if keep_training != 'y':
        keep_training = 'n'
        print('GAILの学習ループを終了します。')

# GAILポリシーの評価

In [None]:
# 保存済みの各Runの学習曲線をまとめて表示
from glob import glob

results_dir = Path("../Results/GAIL")
manifest_path = results_dir / "run_manifest.json"

if not manifest_path.exists():
    print("まだ保存済みのRunがありません。学習セルを先に実行してください。")
else:
    with open(manifest_path, "r") as f:
        run_manifest = json.load(f)

    if len(run_manifest) == 0:
        print("マニフェストが空です。学習セルを先に実行してください。")
    else:
        for entry in run_manifest:
            run_idx = entry.get("run_idx")
            history_path = entry.get("history_path")
            if not history_path or not Path(history_path).exists():
                print(f"Run {run_idx}: 履歴ファイルが見つかりませんでした ({history_path})")
                continue

            with open(history_path, "r") as f:
                hist = json.load(f)
            score_history = hist.get("score_history", [])
            avg_score_history = hist.get("avg_score_history", [])

            fig, ax = plt.subplots(figsize=(8, 4))
            ax.plot(score_history, label='Score')
            ax.plot(avg_score_history, label='Average Score')
            ax.set_title(f'GAIL Run {run_idx} 学習曲線 (保存済み)')
            ax.set_xlabel('エピソード')
            ax.set_ylabel('スコア')
            ax.legend()
            ax.grid(True, linestyle='--', alpha=0.5)
            plt.tight_layout()
            display(Markdown(f"### Run {run_idx} の学習結果 (保存済み)"))
            display(fig)
            plt.close(fig)

学習済みGAILポリシーを実行し、GIFを保存して得られた報酬を確認します。

In [None]:
gif_base = '../Results/GAIL/GAIL Policy'
gif_path = f"{gif_base}.gif"

gail_reward = agent.test_model(env=env, steps=200, render_save_path=gif_base, fps=5)
print('GAILの報酬: ', gail_reward)

if Path(gif_path).exists():
    display(Markdown('### GAILポリシーの実行結果 (GIF)'))
    display(Image(filename=gif_path))
else:
    print(f"GIF が見つかりませんでした: {gif_path}")