<a href="https://colab.research.google.com/github/hamagami/ad2025/blob/main/TransformerAE.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# 1) ライブラリ
import numpy as np, torch, torch.nn as nn, torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
import matplotlib.pyplot as plt

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
np.random.seed(42); torch.manual_seed(42)
print("Device:", device)


In [None]:
# 2) 合成データ
#   正常: ch1 = 正弦波+雑音, ch2 = ch1の「長遅延コピー」+微小ノイズ（長期依存）
#   異常: ch2 を独立ノイズにしたり、遅延を大きく狂わせる（長期依存の破壊）
def make_pair_series(n=10000, delay=100, anomaly=False):
    t = np.arange(n)
    ch1 = 0.8*np.sin(2*np.pi*t/60) + 0.2*np.sin(2*np.pi*t/15) + 0.1*np.random.randn(n)
    ch2 = np.zeros_like(ch1)
    if not anomaly:
        ch2[delay:] = ch1[:-delay] + 0.05*np.random.randn(n-delay)
        ch2[:delay] = 0.05*np.random.randn(delay)
    else:
        ch2[:] = 0.6*np.random.randn(n)              # 関係を破壊
        # 一部区間だけ遅延をズラす（より難しい異常）
        for s in np.random.choice(np.arange(500, n-500), size=5, replace=False):
            d = delay + np.random.randint(-40, 40)
            L = np.random.randint(150, 250)
            e = min(n, s+L)
            ch2[s:e] = 0
            segL = min(e-s, n-d)
            if segL>0:
                ch2[s:s+segL] = ch1[d:d+segL] + 0.05*np.random.randn(segL)
    X = np.stack([ch1, ch2], axis=-1)   # shape: (n, 2)
    return X

N_TRAIN, N_VAL, N_TEST = 12000, 6000, 12000
DELAY = 100

train_ts = make_pair_series(N_TRAIN, delay=DELAY, anomaly=False)  # 正常のみ
val_ts   = make_pair_series(N_VAL,   delay=DELAY, anomaly=False)  # 閾値用 正常
test_ts  = make_pair_series(N_TEST,  delay=DELAY, anomaly=True)   # 異常あり


In [None]:
# 3) スライディングウィンドウ化
def to_windows(X, seq_len=256, stride=1):
    L = len(X)
    out = []
    for i in range(0, L-seq_len+1, stride):
        out.append(X[i:i+seq_len])
    return np.array(out)  # (N, L, C)

SEQ_LEN, STRIDE = 256, 1
X_train = to_windows(train_ts, SEQ_LEN, STRIDE)
X_val   = to_windows(val_ts,   SEQ_LEN, STRIDE)
X_test  = to_windows(test_ts,  SEQ_LEN, STRIDE)

# テストの窓ラベル（ざっくり）: ch2 と ch1のdelay整合が崩れているかの粗い指標
# 相関を使って擬似ラベル（教材用の簡易ラベリング）
def window_anom_label(w, delay=DELAY, tol=0.2):
    x1 = w[:,0]; x2 = w[:,1]
    if len(x1) <= delay+5: return 1
    r = np.corrcoef(x1[:-delay], x2[delay:])[0,1]
    return int(r < 0.6)  # 相関が低ければ異常

y_test = np.array([window_anom_label(w, delay=DELAY) for w in X_test])

X_train.shape, X_val.shape, X_test.shape, y_test.mean()


In [None]:
# 4) DataLoader
def tensor_ds(X):
    X = torch.tensor(X, dtype=torch.float32)
    return TensorDataset(X, X)
train_loader = DataLoader(tensor_ds(X_train), batch_size=64, shuffle=True)
val_loader   = DataLoader(tensor_ds(X_val),   batch_size=64, shuffle=False)
test_loader  = DataLoader(tensor_ds(X_test),  batch_size=64, shuffle=False)


In [None]:
# 5) LSTM-AE（ベースライン）：2変量時系列
class LSTMAE(nn.Module):
    def __init__(self, in_dim=2, hid=128, lat=64, layers=1):
        super().__init__()
        self.enc = nn.LSTM(input_size=in_dim, hidden_size=hid, num_layers=layers, batch_first=True)
        self.h2z = nn.Linear(hid, lat)
        self.z2h = nn.Linear(lat, hid)
        self.dec = nn.LSTM(input_size=in_dim, hidden_size=hid, num_layers=layers, batch_first=True)
        self.out = nn.Linear(hid, in_dim)
    def forward(self, x):
        _, (h, c) = self.enc(x)
        z = torch.relu(self.h2z(h[-1]))
        h0 = torch.relu(self.z2h(z)).unsqueeze(0)
        c0 = torch.zeros_like(h0)
        y, _ = self.dec(x, (h0, c0))  # Teacher forcing: 入力xで条件づけ
        return self.out(y)

lstm = LSTMAE().to(device)


In [None]:
# 6) Transformer-AE：自己注意で長距離依存を直接参照（エンコーダ型AE）
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        pos = torch.arange(0, max_len).unsqueeze(1).float()
        div = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        pe[:,0::2] = torch.sin(pos*div)
        pe[:,1::2] = torch.cos(pos*div)
        self.register_buffer('pe', pe.unsqueeze(0))  # (1, L, d_model)
    def forward(self, x):
        L = x.size(1)
        return x + self.pe[:,:L,:]

class TransformerAE(nn.Module):
    def __init__(self, in_dim=2, d_model=64, nhead=4, num_layers=3, dim_ff=128, dropout=0.1):
        super().__init__()
        self.in_proj  = nn.Linear(in_dim, d_model)
        self.pos      = PositionalEncoding(d_model)
        enc_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead,
                                               dim_feedforward=dim_ff, dropout=dropout, batch_first=True)
        self.encoder  = nn.TransformerEncoder(enc_layer, num_layers=num_layers)
        # デコーダ相当は「エンコーダ出力をそのまま再構成」にする（同長）
        self.out_proj = nn.Linear(d_model, in_dim)
    def forward(self, x):
        h = self.in_proj(x)
        h = self.pos(h)
        z = self.encoder(h)
        y = self.out_proj(z)
        return y

trf = TransformerAE().to(device)


In [None]:
# 6) Transformer-AE：自己注意で長距離依存を直接参照（エンコーダ型AE）
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        pos = torch.arange(0, max_len).unsqueeze(1).float()
        div = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        pe[:,0::2] = torch.sin(pos*div)
        pe[:,1::2] = torch.cos(pos*div)
        self.register_buffer('pe', pe.unsqueeze(0))  # (1, L, d_model)
    def forward(self, x):
        L = x.size(1)
        return x + self.pe[:,:L,:]

class TransformerAE(nn.Module):
    def __init__(self, in_dim=2, d_model=64, nhead=4, num_layers=3, dim_ff=128, dropout=0.1):
        super().__init__()
        self.in_proj  = nn.Linear(in_dim, d_model)
        self.pos      = PositionalEncoding(d_model)
        enc_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead,
                                               dim_feedforward=dim_ff, dropout=dropout, batch_first=True)
        self.encoder  = nn.TransformerEncoder(enc_layer, num_layers=num_layers)
        # デコーダ相当は「エンコーダ出力をそのまま再構成」にする（同長）
        self.out_proj = nn.Linear(d_model, in_dim)
    def forward(self, x):
        h = self.in_proj(x)
        h = self.pos(h)
        z = self.encoder(h)
        y = self.out_proj(z)
        return y

trf = TransformerAE().to(device)


In [None]:
# 7) 学習ユーティリティ
def train_ae(model, train_loader, val_loader, epochs=15, lr=1e-3):
    opt = optim.Adam(model.parameters(), lr=lr)
    crit = nn.MSELoss()
    best = np.inf
    for ep in range(epochs):
        model.train(); tr=0
        for xb,_ in train_loader:
            xb = xb.to(device)
            opt.zero_grad()
            recon = model(xb)
            loss = crit(recon, xb)
            loss.backward(); opt.step()
            tr += loss.item()
        model.eval(); va=0
        with torch.no_grad():
            for xb,_ in val_loader:
                xb = xb.to(device)
                va += crit(model(xb), xb).item()
        print(f"epoch {ep+1}/{epochs}  train={tr/len(train_loader):.5f}  val={va/len(val_loader):.5f}")
        best = min(best, va/len(val_loader))
    return best

print("=== Train LSTM-AE ===")
train_ae(lstm, train_loader, val_loader, epochs=15, lr=1e-3)
print("=== Train Transformer-AE ===")
train_ae(trf,  train_loader, val_loader, epochs=15, lr=1e-3)


In [None]:
# 8) 窓スコア（MSE）と閾値（検証95%点）
@torch.no_grad()
def window_scores(model, loader):
    model.eval(); scores=[]
    for xb,_ in loader:
        xb = xb.to(device)
        xh = model(xb)
        mse = torch.mean((xb - xh)**2, dim=(1,2))
        scores.append(mse.cpu().numpy())
    return np.concatenate(scores)

lstm_val = window_scores(lstm, val_loader)
trf_val  = window_scores(trf,  val_loader)
lstm_tst = window_scores(lstm, test_loader)
trf_tst  = window_scores(trf,  test_loader)

thr_lstm = np.percentile(lstm_val, 95)
thr_trf  = np.percentile(trf_val,  95)

pred_lstm = (lstm_tst > thr_lstm).astype(int)
pred_trf  = (trf_tst  > thr_trf ).astype(int)


In [None]:
# 9) 評価（擬似ラベル y_test を使用）
from sklearn.metrics import roc_auc_score, average_precision_score

def evaluate(name, val_scores, tst_scores, y):
    thr = np.percentile(val_scores, 95)
    y_pred = (tst_scores > thr).astype(int)
    auc = roc_auc_score(y, tst_scores)
    ap  = average_precision_score(y, tst_scores)
    tpr = np.mean(tst_scores[y==1] > thr)    # 検出率
    fpr = np.mean(tst_scores[y==0] > thr)    # 誤警報率
    print(f"{name}: ROC-AUC={auc:.3f}  PR-AUC={ap:.3f}  Thr={thr:.4g}  TPR={tpr:.3f}  FPR={fpr:.3f}")

evaluate("LSTM-AE", lstm_val, lstm_tst, y_test)
evaluate("Transformer-AE", trf_val, trf_tst, y_test)


In [None]:
# 10) 可視化（スコア分布と例）
plt.figure(figsize=(10,4))
plt.subplot(1,2,1)
plt.hist(lstm_tst[y_test==0], bins=50, alpha=0.6, label="Normal")
plt.hist(lstm_tst[y_test==1], bins=50, alpha=0.6, label="Anomaly")
plt.axvline(thr_lstm, color='r', linestyle='--'); plt.title("LSTM-AE scores"); plt.legend()

plt.subplot(1,2,2)
plt.hist(trf_tst[y_test==0], bins=50, alpha=0.6, label="Normal")
plt.hist(trf_tst[y_test==1], bins=50, alpha=0.6, label="Anomaly")
plt.axvline(thr_trf, color='r', linestyle='--'); plt.title("Transformer-AE scores"); plt.legend()
plt.tight_layout(); plt.show()

# 異常窓の復元例
@torch.no_grad()
def show_example(model, X, idx, title):
    xb = torch.tensor(X[idx:idx+1], dtype=torch.float32).to(device)
    xh = model(xb).cpu().numpy()[0]  # (L,2)
    x  = X[idx]                      # (L,2)
    fig,ax = plt.subplots(2,1,figsize=(10,4), sharex=True)
    ax[0].plot(x[:,0], label='ch1'); ax[0].plot(xh[:,0], label='recon'); ax[0].set_title(title+" ch1"); ax[0].legend()
    ax[1].plot(x[:,1], label='ch2'); ax[1].plot(xh[:,1], label='recon'); ax[1].set_title(title+" ch2"); ax[1].legend()
    plt.show()

# 代表的に異常と判定された窓のインデックスを一個ピック
anom_idx_trf = np.where(pred_trf==1)[0]
anom_idx_lstm= np.where(pred_lstm==1)[0]
if len(anom_idx_trf)>0: show_example(trf,  X_test, anom_idx_trf[0],  "Transformer-AE anomaly example")
if len(anom_idx_lstm)>0: show_example(lstm, X_test, anom_idx_lstm[0], "LSTM-AE anomaly example")
