In [11]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# =========================
# 0. 读数据 + 划分 + 标准化
# =========================
df = pd.read_excel("../../data/ENB2012_data.xlsx")

# 前 8 个特征，Y1: Heating Load
X_raw = df.iloc[:, :8].values.astype(np.float32)
y_raw = df.iloc[:, 8].values.astype(np.float32).reshape(-1, 1)

# 60% 训练，20% 校准，20% 测试
X_tr_raw, X_tmp_raw, y_tr_raw, y_tmp_raw = train_test_split(
    X_raw, y_raw, test_size=0.4, random_state=0
)
X_cal_raw, X_test_raw, y_cal_raw, y_test_raw = train_test_split(
    X_tmp_raw, y_tmp_raw, test_size=0.5, random_state=0
)

scaler_x = StandardScaler().fit(X_tr_raw)
scaler_y = StandardScaler().fit(y_tr_raw)

X_tr_np   = scaler_x.transform(X_tr_raw)
X_cal_np  = scaler_x.transform(X_cal_raw)
X_test_np = scaler_x.transform(X_test_raw)

y_tr_np   = scaler_y.transform(y_tr_raw)
y_cal_np  = scaler_y.transform(y_cal_raw)
y_test    = scaler_y.transform(y_test_raw)  # 注意：y_test 保持 numpy，用于 inverse_transform

# 转成 tensor（全部在 CPU 上）
X_tr     = torch.tensor(X_tr_np,   dtype=torch.float32)
y_tr     = torch.tensor(y_tr_np,   dtype=torch.float32)
X_cal    = torch.tensor(X_cal_np,  dtype=torch.float32)
y_cal    = torch.tensor(y_cal_np,  dtype=torch.float32)
X_test_t = torch.tensor(X_test_np, dtype=torch.float32)

# =========================
# 2. 定义一个简单 MLP
# =========================
class MLP(nn.Module):
    def __init__(self, in_dim=8, hidden=64):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(in_dim, hidden),
            nn.ReLU(),
            nn.Linear(hidden, hidden),
            nn.ReLU(),
            nn.Linear(hidden, 1),
        )
    def forward(self, x):
        return self.net(x)

def train_mlp(X, y, epochs=200, batch_size=64, lr=1e-3):
    ds = TensorDataset(X, y)
    dl = DataLoader(ds, batch_size=batch_size, shuffle=True)
    model = MLP(in_dim=X.shape[1])
    opt = torch.optim.Adam(model.parameters(), lr=lr)
    loss_fn = nn.MSELoss()
    model.train()
    for _ in range(epochs):
        for xb, yb in dl:
            opt.zero_grad()
            pred = model(xb)
            loss = loss_fn(pred, yb)
            loss.backward()
            opt.step()
    return model

# =========================
# 3. Split Conformal (一次训练)
# =========================
def split_conformal(alpha=0.1):
    model = train_mlp(X_tr, y_tr)

    model.eval()
    with torch.no_grad():
        mu_cal = model(X_cal)
    resid = (y_cal - mu_cal).abs().cpu().numpy().ravel()

    # 用经验分位数估计 q_hat
    q_hat = np.quantile(resid, 1 - alpha)

    with torch.no_grad():
        mu_test = model(X_test_t)
    mu_test_np = mu_test.cpu().numpy().ravel()
    lower = mu_test_np - q_hat
    upper = mu_test_np + q_hat

    # 还原回原尺度
    y_test_np = scaler_y.inverse_transform(y_test).ravel()
    mu_test_raw = scaler_y.inverse_transform(mu_test_np.reshape(-1, 1)).ravel()
    lower_raw   = scaler_y.inverse_transform(lower.reshape(-1, 1)).ravel()
    upper_raw   = scaler_y.inverse_transform(upper.reshape(-1, 1)).ravel()

    coverage = np.mean((y_test_np >= lower_raw) & (y_test_np <= upper_raw))
    avg_len  = np.mean(upper_raw - lower_raw)

    return dict(
        coverage=coverage,
        avg_len=avg_len,
        q_hat=q_hat,
    )

# =========================
# 4. Quantile Regression (MLP)
# =========================
def quantile_loss(pred, y, tau):
    err = y - pred
    return torch.mean(torch.max(tau * err, (tau - 1) * err))

def train_qr(X, y, tau=0.1, epochs=200, batch_size=64, lr=1e-3):
    ds = TensorDataset(X, y)
    dl = DataLoader(ds, batch_size=batch_size, shuffle=True)
    model = MLP(in_dim=X.shape[1])
    opt = torch.optim.Adam(model.parameters(), lr=lr)
    model.train()
    for _ in range(epochs):
        for xb, yb in dl:
            opt.zero_grad()
            pred = model(xb)
            loss = quantile_loss(pred, yb, tau)
            loss.backward()
            opt.step()
    return model

def quantile_regression(alpha=0.1):
    tau_low  = alpha / 2.0        # 如 alpha=0.1 -> 0.05
    tau_high = 1.0 - alpha / 2.0  # -> 0.95

    # 分别训练下分位数和上分位数模型
    model_low  = train_qr(X_tr, y_tr, tau=tau_low)
    model_high = train_qr(X_tr, y_tr, tau=tau_high)

    model_low.eval()
    model_high.eval()
    with torch.no_grad():
        ql = model_low(X_test_t).cpu().numpy().ravel()
        qu = model_high(X_test_t).cpu().numpy().ravel()

    # 还原尺度
    y_test_np = scaler_y.inverse_transform(y_test).ravel()
    ql_raw = scaler_y.inverse_transform(ql.reshape(-1, 1)).ravel()
    qu_raw = scaler_y.inverse_transform(qu.reshape(-1, 1)).ravel()

    coverage = np.mean((y_test_np >= ql_raw) & (y_test_np <= qu_raw))
    avg_len  = np.mean(qu_raw - ql_raw)

    return dict(
        coverage=coverage,
        avg_len=avg_len,
    )

# =========================
# 5. 运行对比
# =========================
if __name__ == "__main__":
    alpha = 0.1  # 90% 区间

    res_cp = split_conformal(alpha=alpha)
    res_qr = quantile_regression(alpha=alpha)

    print("=== Split Conformal (MLP) ===")
    for k, v in res_cp.items():
        print(f"{k}: {v}")

    print("\n=== Quantile Regression (MLP) ===")
    for k, v in res_qr.items():
        print(f"{k}: {v}")


=== Split Conformal (MLP) ===
coverage: 0.8961038961038961
avg_len: 2.3536224365234375
q_hat: 0.11636930853128445

=== Quantile Regression (MLP) ===
coverage: 0.7987012987012987
avg_len: 3.593290328979492
