In [1]:
import os
# Move the working directory to the parent of the directory the kernel
# started in (presumably the project root, so that the local package import
# in the next cell resolves -- TODO confirm).
# The starting directory is remembered on first execution so that re-running
# this cell in the same kernel does not climb one more level each time.
if "_NOTEBOOK_START_DIR" not in globals():
    _NOTEBOOK_START_DIR = os.getcwd()
os.chdir(os.path.join(_NOTEBOOK_START_DIR, os.pardir))

In [2]:
from deep_reinforcement_learning.agent.ppo_agent import create_ppo_for_hvac

In [3]:
import gymnasium as gym
import numpy as np
import torch
import types
from tianshou.data import Batch

In [4]:
import types
import gymnasium as gym
import numpy as np
import torch

# Example candidate values for each controllable setting.
set_temp_list = [18, 20, 22, 24, 26]
set_mode_list = ["cool", "dry", "heat", "auto"]
set_wind_list = ["low", "mid", "high"]
n_devices = 5

# Action space: the (temperature, mode, wind) choice counts repeated once per
# device, giving a single MultiDiscrete vector of length 3 * n_devices.
choices_per_device = [len(set_temp_list), len(set_mode_list), len(set_wind_list)]
action_space = gym.spaces.MultiDiscrete(np.array(choices_per_device * n_devices))

# Observation space: set the shape to match the real environment
# (a 64-dimensional vector is used here as an example).
obs_shape = (64,)
observation_space = gym.spaces.Box(
    low=-np.inf,
    high=np.inf,
    shape=obs_shape,
    dtype=np.float32,
)

# Dummy environment: a bare namespace carrying only the two space attributes.
single_env = types.SimpleNamespace(
    observation_space=observation_space, action_space=action_space
)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# All arguments for the policy factory, kept in the same order as they would
# be written in a direct keyword call.
ppo_kwargs = dict(
    single_env=single_env,  # only this argument stands in for a real env
    device=device,
    lr=3e-4,
    set_temp_list=set_temp_list,
    set_mode_list=set_mode_list,
    set_wind_list=set_wind_list,
    n_devices=n_devices,
    deterministic_eval=True,
    discount_factor=1.0,
    gae_lambda=1.0,
    eps_clip=0.18,
    value_clip=True,
    vf_coef=0.5,
    ent_coef=0.02,
    max_grad_norm=0.5,
    advantage_normalization=True,
    reward_normalization=True,
)
policy = create_ppo_for_hvac(**ppo_kwargs)


In [5]:
import numpy as np
import torch
from tianshou.data import Batch

H = n_devices * 3  # (temp, mode, wind) x number of devices

def to_index_array(act, H):
    """Normalize an action container to a flat ``np.int64`` array of at most ``H`` entries.

    Args:
        act: A ``torch.Tensor``, a NumPy array (numeric or object dtype), a
            scalar, or an arbitrarily nested list/tuple of those.
        H: Number of leading entries to keep; the result is sliced with ``[:H]``.

    Returns:
        A 1-D ``np.ndarray`` of dtype ``np.int64`` holding the first ``H``
        values of ``act`` in depth-first (row-major) order.

    Raises:
        TypeError, ValueError: If a leaf value cannot be converted with
            ``int()``. Leaves are never skipped silently, because dropping one
            would shift every following index.
    """
    # Fast path: a plain tensor is flattened directly.
    if isinstance(act, torch.Tensor):
        return act.detach().cpu().long().numpy().reshape(-1)[:H]

    flat = []

    def _collect(x):
        # Containers are checked BEFORE the generic ``.item`` test: ndarrays
        # and tensors also expose ``.item``, but it only works for size-1 data.
        if isinstance(x, torch.Tensor):
            flat.extend(x.detach().cpu().long().reshape(-1).tolist())
        elif isinstance(x, np.ndarray):
            # tolist() unwraps NumPy scalars; object arrays may still hold
            # nested containers, so every element is visited recursively.
            for leaf in x.ravel().tolist():
                _collect(leaf)
        elif isinstance(x, (list, tuple)):
            for element in x:
                _collect(element)
        elif hasattr(x, "item"):
            # NumPy scalar types and similar single-value wrappers.
            flat.append(int(x.item()))
        else:
            flat.append(int(x))

    _collect(act)
    return np.asarray(flat[:H], dtype=np.int64)

def _show_actions(label, act, B):
    """Print the type, shape and values of one policy action output under ``label``."""
    print(f"{label} type:", type(act), "shape:", getattr(act, "shape", None))
    if B == 1:
        # Single sample: flatten to at most H indices and print them all.
        idx = to_index_array(act, H)
        print(f"{label} normalized shape:", idx.shape, "values:", idx.tolist())
    else:
        # Several samples: print the array shape and the first row only.
        if isinstance(act, torch.Tensor):
            arr = act.detach().cpu().long().numpy()
        else:
            arr = np.asarray(act)
        print(f"{label} array shape:", arr.shape)
        print(f"{label} first row (len=", H, "):", arr[0].tolist()[:H])


def sample_and_show(policy, B=1):
    """Feed random observations to ``policy`` and print the resulting actions.

    The policy is queried twice on the same batch: once after ``policy.train()``
    and once after ``policy.eval()`` with ``deterministic=True``. Both results
    are printed in the same format, for any ``B``.

    Args:
        policy: Object providing ``train()``, ``eval()`` and
            ``forward(batch, ...)`` whose result has an ``act`` attribute.
        B: Number of random observations in the batch.

    Returns:
        None. Results are printed. The policy is left in eval mode.
    """
    # B random dummy observations; the module-level ``obs_shape`` fixes the rest
    # of the shape.
    obs_batch = np.random.randn(B, *obs_shape).astype(np.float32)
    batch = Batch(obs=obs_batch, info=Batch())  # an empty info is sufficient

    # Training-mode pass (intended to sample stochastically).
    # NOTE(review): whether train() alone makes forward() sample depends on the
    # policy implementation, which is not visible here -- confirm. The recorded
    # run printed identical stochastic and greedy actions.
    policy.train()
    with torch.no_grad():  # display only; no autograd graph is needed
        out = policy.forward(batch)
    _show_actions("[stochastic]", out.act, B)

    # Evaluation-mode pass with greedy action selection requested explicitly.
    policy.eval()
    with torch.no_grad():
        out_g = policy.forward(batch, deterministic=True)
    _show_actions("[greedy    ]", out_g.act, B)
# Run
sample_and_show(policy, B=1)


[stochastic] type: <class 'torch.Tensor'> shape: torch.Size([1, 15])
[stochastic] normalized shape: (15,) values: [2, 0, 0, 1, 0, 0, 4, 3, 0, 1, 0, 1, 1, 2, 0]
[greedy    ] type: <class 'torch.Tensor'> shape: torch.Size([1, 15])
[greedy    ] normalized shape: (15,) values: [2, 0, 0, 1, 0, 0, 4, 3, 0, 1, 0, 1, 1, 2, 0]
