In [None]:

# 1) 필수 라이브러리 설치/임포트

import matplotlib.pyplot as plt
import numpy as np
import optuna
import seaborn as sns
import torch
from torch.optim import AdamW
from torch.utils.data import DataLoader, random_split

from dataset.traffic_dataset import TrafficDataset
from dataset.dataset_config import edge_index, edge_attr
#from models.baselines import STGCN  # model under tuning
#from models.FreTSformer import FreTSformer
from models.STLinear import STLinear
from models.STLinear_deriven import STLinear_SPE
from utils.Trainer import Trainer    # project Trainer class


# 2) collate_fn definition (same as before)
def collate_fn(batch_list):
    """Stack the ``x`` and ``y`` tensors of a list of samples into two batches.

    Args:
        batch_list: Sequence of samples, each exposing tensor attributes
            ``x`` and ``y``.

    Returns:
        A tuple ``(xs, ys)`` where each tensor gains a new leading batch
        dimension (presumably ``[B, T, E, C]`` and ``[B, n_pred, E, D]`` --
        confirm against TrafficDataset).
    """
    inputs = []
    targets = []
    for sample in batch_list:
        inputs.append(sample.x)
    for sample in batch_list:
        targets.append(sample.y)
    stacked_inputs = torch.stack(inputs, dim=0)
    stacked_targets = torch.stack(targets, dim=0)
    return stacked_inputs, stacked_targets


In [None]:

# 3) Data preparation
SEED = 42  # same value the Optuna TPESampler is seeded with later in this notebook
# Seed torch's global RNG so DataLoader shuffling and parameter initialisation
# start from a fixed state on every Restart & Run All.
torch.manual_seed(SEED)

# NOTE(review): allow_pickle=True unpickles arbitrary Python objects; only load
# this file from a trusted source.
dataset_np = np.load('dataset/traffic_dataset_13_smoothen.npy', allow_pickle=True)
dataset = TrafficDataset(dataset_np, window=12, randomize=False)

# 80/20 train/validation split. The dedicated seeded generator keeps the split
# identical across runs, so validation losses of different studies are comparable.
# NOTE(review): if samples are overlapping sliding windows, a random split can
# leak information between train and validation -- consider a chronological split.
train_size = int(len(dataset) * 0.8)
val_size   = len(dataset) - train_size
train_ds, val_ds = random_split(
    dataset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(SEED),
)

train_loader = DataLoader(train_ds, batch_size=512, shuffle=True, collate_fn=collate_fn)
val_loader   = DataLoader(val_ds,   batch_size=512, shuffle=False, collate_fn=collate_fn)

# Pull one batch to read the tensor dimensions used to size the models below.
x0, y0 = next(iter(train_loader))
B, T, E, C_in = x0.shape
_, n_pred, _, C_out = y0.shape

device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f"Dataset shapes: x0={x0.shape}, y0={y0.shape}, device={device}")


In [None]:

# 4) Optuna objective
def objective(trial):
    """Train one STLinear_SPE candidate and return its validation score.

    Samples optimiser and architecture hyperparameters from ``trial``, trains
    the model on the notebook-level ``train_loader`` / ``val_loader`` and
    returns the value Optuna should minimise.

    Args:
        trial: Optuna trial used to sample the hyperparameters.

    Returns:
        The result of ``trainer.get_best_valid_loss()`` after fitting
        (presumably the lowest validation L1 loss seen -- confirm in
        utils.Trainer).
    """
    # --- Hyperparameter suggestions ---
    # Optimiser / regularisation. suggest_float replaces the deprecated
    # suggest_loguniform / suggest_uniform; parameter names and ranges are unchanged.
    lr = trial.suggest_float("lr", 1e-5, 1e-3, log=True)
    weight_decay = trial.suggest_float("weight_decay", 1e-6, 1e-3, log=True)
    dropout = trial.suggest_float("dropout", 0.1, 0.3)
    # Architecture. Only values that are actually passed to the model are
    # sampled, so the sampler does not spend trials on a dimension with no effect.
    kernel_size = trial.suggest_categorical("kernel_size", [17, 33, 65])  # odd sizes only
    num_layers = trial.suggest_int("num_layers", 2, 4)
    num_heads = trial.suggest_categorical("num_heads", [2, 4, 8])

    # --- Model ---
    # NOTE(review): the retraining code later in this notebook additionally passes
    # node_feature_dim / pred_node_dim / n_pred; confirm both call sites are meant
    # to build the same architecture.
    model = STLinear_SPE(
        num_nodes=E,
        kernel_size=kernel_size,
        num_heads=num_heads,
        num_layers=num_layers,
        dropout=dropout,
    ).to(device)  # same placement as the retraining code

    optimizer = AdamW(model.parameters(), lr=lr, weight_decay=weight_decay)
    criterion = torch.nn.L1Loss()

    # --- Short training run: at most 30 epochs, early stopping patience 4 ---
    trainer = Trainer(
        model=model,
        train_loader=train_loader,
        valid_loader=val_loader,
        optimizer=optimizer,
        criterion=criterion,
        epochs=30,
        device=device,
        print_interval=0,    # keep trial output quiet
        plot_interval=0,     # no plots during tuning
        early_stopping_patience=4
    )
    trainer.fit()

    # Value reported to Optuna for minimisation.
    valid_loss = trainer.get_best_valid_loss()
    return valid_loss


In [None]:

# 5) Create the Optuna study and run the optimisation
study = optuna.create_study(
    sampler=optuna.samplers.TPESampler(seed=42),  # fixed seed for the TPE sampler
    direction="minimize",
)
study.optimize(objective, n_trials=30)


# 6) Report the best trial
print("Best validation loss:", study.best_value)
print("Best hyperparameters:")
for param_name, param_value in study.best_params.items():
    print(f"  {param_name}: {param_value}")


# 7) Retrain with the best hyperparameters and visualise the results (example)
best_params = study.best_params
# NOTE(review): objective() earlier in this notebook builds STLinear_SPE without
# node_feature_dim / pred_node_dim / n_pred; confirm both call sites are meant to
# construct the same architecture, otherwise the tuned values may not transfer.
best_model = STLinear_SPE(
    num_nodes=E,
    node_feature_dim=C_in,
    pred_node_dim=C_out,
    n_pred=n_pred,
    kernel_size=best_params['kernel_size'],
    num_heads=best_params['num_heads'],
    num_layers=best_params['num_layers'],
    dropout=best_params['dropout']
).to(device)

best_opt = AdamW(best_model.parameters(), lr=best_params['lr'], weight_decay=best_params['weight_decay'])
# Longer run than the tuning trials (60 epochs) with progress output enabled.
trainer = Trainer(
    model=best_model,
    train_loader=train_loader,
    valid_loader=val_loader,
    optimizer=best_opt,
    criterion=torch.nn.L1Loss(),
    epochs=60,
    device=device,
    print_interval=1,
    plot_interval=2
)
trainer.fit()
hist = trainer.get_history()

# Loss curves on a dedicated figure so they never land on a figure the Trainer
# may have left active while plotting during fit().
fig, ax = plt.subplots()
ax.plot(hist['train_loss'], label='Train Loss')
ax.plot(hist['valid_loss'], label='Val Loss')
ax.set_ylabel('Loss')
ax.legend()
plt.show()

# Take a single sample from one training batch for the attention visualisation.
x_batch, y_batch = next(iter(train_loader))
x_input = x_batch[0].unsqueeze(0).to(device)  # keep a batch dimension of size 1

best_model.eval()

with torch.no_grad():
    output, attention_maps = best_model(x_input, return_attn=True)

# Example: first entry of attention_maps, indexed [0, 0, 0]
# (presumably batch 0, time step 0, head 0 of the first layer, giving an
# (E, E) map -- confirm against the STLinear_SPE implementation).
attn = attention_maps[0][0, 0, 0].cpu().numpy()
sns.heatmap(attn)
plt.title("Layer 1, Head 0 Attention Map at t=0")
plt.show()