# 学習済みモデルの読み込み・出力（エンコーダ・デコーダ型LSTM）

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim

import numpy as np
from math import sqrt
import pandas as pd
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence

# モデルの定義

In [2]:
class StateBridge(nn.Module):
    """
    Encoderの(h, c)をDecoder初期状態へ写像するブリッジ。
    - 層数/隠れ次元が異なってもOK
    - bridge_mode:
        - "zero_pad":  層合わせ=0埋め or 切り落とし、隠れ次元は線形射影
        - "repeat_top":層合わせ=最上層の繰り返し/切り落とし、隠れ次元は線形射影
        - "linear_stack": [B, L_enc, H_enc] -> 線形で [B, L_dec, H_dec] へ（層方向も学習で混合）

    - enc_layers: エンコーダの層の深さ
    - dec_layers: デコーダの層の深さ
    - enc_hidden: エンコーダのノード数
    - dec_hidden: デコーダのノード数
    """
    def __init__(self, enc_layers, dec_layers, enc_hidden, dec_hidden, mode="zero_pad"):
        super().__init__()
        self.enc_layers = enc_layers
        self.dec_layers = dec_layers
        self.enc_hidden = enc_hidden
        self.dec_hidden = dec_hidden
        self.mode = mode

        # 隠れ次元の変換（h/c共用）
        if mode in ("zero_pad", "repeat_top"):
            self.proj = nn.Linear(enc_hidden, dec_hidden, bias=True)
        elif mode == "linear_stack":
            # 層方向もまとめて線形変換
            self.proj_h = nn.Linear(enc_layers * enc_hidden, dec_layers * dec_hidden, bias=True)
            self.proj_c = nn.Linear(enc_layers * enc_hidden, dec_layers * dec_hidden, bias=True)
        else:
            raise ValueError(f"Unknown bridge mode: {mode}")

    def _match_layers(self, x, how="zero_pad"):
        """
        x: [L_enc, B, H_enc] を層数だけ合わせる（隠れ次元は未変換）
        return: [L_dec, B, H_enc]

        B: バッチサイズ
        """
        L_enc, B, H = x.shape
        L_dec = self.dec_layers

        if L_dec == L_enc:
            return x

        if L_dec < L_enc:
            # 上位層を優先して切り落とす（直観的には最上層が一番抽象的）
            return x[:L_dec, :, :]

        # L_dec > L_enc の場合
        pad_count = L_dec - L_enc
        if how == "repeat_top":
            top = x[-1:, :, :].expand(pad_count, B, H)  # 最上層を複製
            return torch.cat([x, top], dim=0)
        else:  # zero_pad
            pad = x.new_zeros(pad_count, B, H)
            return torch.cat([x, pad], dim=0)

    def forward(self, h_enc, c_enc):
        """
        h_enc, c_enc: [L_enc, B, H_enc]
        返り値: (h0_dec, c0_dec) それぞれ [L_dec, B, H_dec]
        """
        if self.mode in ("zero_pad", "repeat_top"):
            # 層合わせ（まだ enc_hidden 次元のまま）
            h = self._match_layers(h_enc, "repeat_top" if self.mode=="repeat_top" else "zero_pad")
            c = self._match_layers(c_enc, "repeat_top" if self.mode=="repeat_top" else "zero_pad")
            # 次元射影
            L, B, H = h.shape
            h = self.proj(h)  # broadcasting: [L,B,H_enc]->[L,B,H_dec]
            c = self.proj(c)
            return h, c

        else:  # linear_stack
            # [L_enc,B,H_enc] -> [B, L_enc*H_enc]
            L_enc, B, H_enc = h_enc.shape
            flat_h = h_enc.transpose(0,1).reshape(B, L_enc*H_enc)
            flat_c = c_enc.transpose(0,1).reshape(B, L_enc*H_enc)
            # 線形写像
            out_h = self.proj_h(flat_h)  # [B, L_dec*H_dec]
            out_c = self.proj_c(flat_c)
            # [L_dec,B,H_dec] に戻す
            L_dec, H_dec = self.dec_layers, self.dec_hidden
            h = out_h.view(B, L_dec, H_dec).transpose(0,1).contiguous()
            c = out_c.view(B, L_dec, H_dec).transpose(0,1).contiguous()
            return h, c

In [3]:
class Seq2SeqLSTM(nn.Module):
    def __init__(
        self,
        in_enc: int, # エンコーダ入力変数の数
        in_dec: int, # デコーダ入力変数の数
        out_dim: int, # 出力変数の数
        enc_hidden: int = 128,
        dec_hidden: int = 128,
        enc_layers: int = 2,
        dec_layers: int = 3,
        bridge_mode: str = "zero_pad",  # "zero_pad" | "repeat_top" | "linear_stack"
        dropout: float = 0.0, # LSTMの層間ドロップアウト率
        bidirectional_enc: bool = False,  # エンコーダのみ双方向にするかどうか（必要ならEncoderを双方向にも）
        head_activation="relu", # 活性化関数
    ):
        super().__init__()
        self.bidirectional_enc = bidirectional_enc
        enc_dir = 2 if bidirectional_enc else 1
        enc_hidden_eff = enc_hidden * enc_dir  # 双方向なら出力次元が倍

        self.enc = nn.LSTM(
            input_size=in_enc,
            hidden_size=enc_hidden,
            num_layers=enc_layers,
            batch_first=True,
            dropout=dropout if enc_layers > 1 else 0.0,
            bidirectional=bidirectional_enc
        )

        self.dec = nn.LSTM(
            input_size=in_dec,
            hidden_size=dec_hidden,
            num_layers=dec_layers,
            batch_first=True,
            dropout=dropout if dec_layers > 1 else 0.0
        )

        # Encoderが双方向のときは、(h_fwd, h_bwd) を結合した次元 enc_hidden_eff を
        # Decoder hidden 次元へ写像する必要がある
        self.bridge = StateBridge(
            enc_layers=enc_layers * enc_dir,
            dec_layers=dec_layers,
            enc_hidden=enc_hidden,
            dec_hidden=dec_hidden,
            mode=bridge_mode
        ) if (enc_layers != dec_layers or enc_dir != 1 or enc_hidden != dec_hidden or bridge_mode=="linear_stack") else None

        self.head = nn.Linear(dec_hidden, out_dim) # 全結合層

        self.act = {
            "identity": nn.Identity(),
            "relu": nn.ReLU(),
            "tanh": nn.Tanh(),
            "sigmoid": nn.Sigmoid(),
        }[head_activation]

    def _extract_final_states(self, out, hc):
        """
        LSTMの出力から (h_T, c_T) を取り出して成形。
        双方向Encoderの場合は各層ごとに [fwd, bwd] を層方向に並べる。
        """
        h, c = hc  # [num_layers * num_directions, B, H]
        return h, c

    def forward(self, enc_x, dec_x):
        """
        enc_x: [B, Te, in_enc]
        dec_x: [B, Td, in_dec]
        return: yhat [B, Td, out_dim]
        """
        _, (h_T, c_T) = self.enc(enc_x)  # h_T,c_T: [L_enc * dir, B, H_enc]

        if self.bridge is not None:
            h0_dec, c0_dec = self.bridge(h_T, c_T)  # [L_dec, B, H_dec]
        else:
            # 形・次元が完全一致ならそのまま
            h0_dec, c0_dec = h_T, c_T

        dec_out, _ = self.dec(dec_x, (h0_dec, c0_dec))  # [B, Td, H_dec]
        yhat = self.act(self.head(dec_out))             # [B, Td, out_dim]
        return yhat



# モデルの読み込み

In [4]:

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 保存したパスを指定
save_path = r"C:\Users\ryoya\MasterThesis\MT_Furuie\results\Miwa_LSTM\Tiral_3\LSTM_trial_3_3"

# チェックポイントの読み込み
ckpt = torch.load(save_path, map_location="cpu")

# モデルの再構築
cfg = ckpt["cfg"]
model = Seq2SeqLSTM(
    in_enc=cfg["in_enc"], in_dec=cfg["in_dec"], out_dim=cfg["out_dim"],
    enc_hidden=cfg["enc_hidden"], dec_hidden=cfg["dec_hidden"],
    enc_layers=cfg["enc_layers"], dec_layers=cfg["dec_layers"],
    bridge_mode=cfg["bridge_mode"], dropout=cfg["dropout"],
    bidirectional_enc=cfg["bidirectional_enc"],
    head_activation=cfg["head_activation"]
).to(device)

# パラメータをロード
model.load_state_dict(ckpt["model_state"])
model.eval() # 評価モードに切り替え

# 標準化パラメータの取り出し
scaler = ckpt["scaler"]
mean = scaler["mean"]                # pandas.Series
std  = scaler["std"]                 # pandas.Series
data_columns = scaler["data_columns"]  # 列名リスト
y_pos = scaler["y_pos"]

print("----平均----")
print(mean)
print("----標準偏差----")
print(std)
print("----y_pos----")
print(y_pos)



----平均----
CumRain_24h     16.075096
Qin(m3/s)       48.531916
Tur(ppm)       386.243583
CumRain_24h     16.075096
Qin(m3/s)       48.531916
Tur(ppm)       386.243583
dtype: float64
----標準偏差----
CumRain_24h      26.018061
Qin(m3/s)        55.028893
Tur(ppm)       1036.826873
CumRain_24h      26.018061
Qin(m3/s)        55.028893
Tur(ppm)       1036.826873
dtype: float64
----y_pos----
[5]


  ckpt = torch.load(save_path, map_location="cpu")


# モデル出力と実測値を出力・保存（試しに）

In [5]:
# データファイルの読み込み・列番号・タイムステップ長の指定
excel_path = r"C:\Users\ryoya\MasterThesis\MT_Furuie\data\Miwa_LSTM_Data\Trial_3\Miwa_hourlyAve_for_LSTM_trial3.xlsx"

# 列番号（0始まり）で指定
enc_cols   = [5, 3, 4]      # エンコーダ入力の列番号
dec_cols   = [5, 3]  # デコーダ入力の列番号
y_cols      = [4]          # 出力（目的変数）の列番号

Te = 72
Td = 240

df = pd.read_excel(excel_path, header=0)

# 必要列だけ抽出（順番固定）
use_cols = enc_cols + dec_cols + y_cols
data = df.iloc[:, use_cols].copy()

data_norm = (data - mean) / std # 全期間のデータを標準化


In [12]:
# モデルに入力する変数を作成する (enc_X: [Te, Fe], dec_X: [Td, Fd], y: [Td, Fo])

Fe, Fd, Fo = cfg["in_enc"], cfg["in_dec"], cfg["out_dim"]

# 出力したい部分の開始行番号を指定（0始まり） 38664 ,38760,(13032: 2020/06/27), (48048: 2024/06/25)
s = 48048

enc_window = data_norm.iloc[s : s + Te]
dec_window = data_norm.iloc[s + Te : s + Te + Td]
# テンソル化
enc_X = torch.tensor(enc_window.iloc[:, 0:Fe].to_numpy(dtype=np.float32))
dec_X = torch.tensor(dec_window.iloc[:, Fe:Fe+Fd].to_numpy(dtype=np.float32))
y     = torch.tensor(dec_window.iloc[:, Fe+Fd:Fe+Fd+Fo].to_numpy(dtype=np.float32))

# モデルに入力できる形へ変形
enc_in = enc_X.unsqueeze(0).to(device)
dec_in = dec_X.unsqueeze(0).to(device)

# デコーダに対応する時刻、実測のYを抽出
dec_T = df.iloc[s+Te:s+Te+Td, 0:2]
obs_Y = df.iloc[s+Te:s+Te+Td, y_cols]


In [13]:
# モデルから予測
pred_Y_norm = model(enc_in, dec_in)
pred_Y_norm = pred_Y_norm.squeeze(0)

# 逆標準化
ymean = mean.iloc[y_pos]
ystd = std.iloc[y_pos]
ymean_t = torch.tensor(ymean.values, dtype=torch.float32, device=pred_Y_norm.device).view(1, -1)
ystd_t  = torch.tensor(ystd.values,  dtype=torch.float32, device=pred_Y_norm.device).view(1, -1)

pred_Y = pred_Y_norm * ystd_t + ymean_t


In [14]:
# 予測結果などを結合し、エクセルで出力

# 1. pandas Series → DataFrame
dec_T_df = pd.DataFrame(dec_T).reset_index(drop=True)
obs_Y_df  = pd.DataFrame(obs_Y).reset_index(drop=True)

# 2. Tensor → NumPy → DataFrame
pred_Y_df = pd.DataFrame(pred_Y.detach().cpu().numpy())
dec_X_df  = pd.DataFrame(dec_X.detach().cpu().numpy())

# 3. 横方向に結合
out_df = pd.concat([dec_T_df, obs_Y_df, pred_Y_df, dec_X_df], axis=1)


excel_path = r"C:\Users\ryoya\MasterThesis\MT_Furuie\results\Miwa_LSTM\Tiral_3\output_3_3_3.xlsx"
out_df.to_excel(excel_path, index=False)

print("Excelに保存しました:", excel_path)


Excelに保存しました: C:\Users\ryoya\MasterThesis\MT_Furuie\results\Miwa_LSTM\Tiral_3\output_3_3_3.xlsx
