In [None]:
import numpy as np
import pandas as pd
import warnings
warnings.filterwarnings("ignore")

from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor, StackingRegressor
from sklearn.linear_model import ElasticNet, Ridge
from sklearn.svm import SVR
from sklearn.model_selection import GridSearchCV, KFold
from sklearn.metrics import mean_squared_error

# ---------- 参数（可调） ----------
EXCEL_PATH = "./回归预测.xlsx"  
TRAIN_SHEET = 0
TEST_SHEET = 1

# 索引说明（0-based）
NUM_FEATURES = list(range(0, 30))   
DRUG_COL = 30                       
TARGET_COL = 31                     

# CV / 调参设置
CV_FOLDS = 5
RANDOM_STATE = 42

# 为防止除以0，计算相对误差时的极小值
EPS = 1e-8
# -----------------------------------

def load_data(path):
    df_train = pd.read_excel(path, sheet_name=TRAIN_SHEET, header=None)
    df_test  = pd.read_excel(path, sheet_name=TEST_SHEET,  header=None)
    return df_train, df_test

def prepare_xy(df):
    X = df.iloc[:, :31].copy()   
    y = df.iloc[:, TARGET_COL].astype(float).values
    return X, y

def build_pipeline(random_state=RANDOM_STATE):
    # 预处理：数值标准化 + 药物列 OneHot
    numeric_idx = NUM_FEATURES
    cat_idx = [DRUG_COL]

    numeric_transformer = Pipeline([
        ("scaler", StandardScaler())
    ])

    categorical_transformer = Pipeline([
        ("onehot", OneHotEncoder(handle_unknown="ignore", sparse_output=False))
    ])

    preprocessor = ColumnTransformer(transformers=[
        ("num", numeric_transformer, numeric_idx),
        ("drug", categorical_transformer, cat_idx)
    ])

    # 基学习器
    rf = RandomForestRegressor(n_jobs=-1, random_state=random_state)
    gb = GradientBoostingRegressor(random_state=random_state)
    svr = SVR()
    en = ElasticNet(random_state=random_state, max_iter=10000)

    # stacking 元学习器
    stacking = StackingRegressor(
        estimators=[('rf', rf), ('gb', gb), ('svr', svr), ('en', en)],
        final_estimator=Ridge(),
        n_jobs=-1,
        passthrough=False  
    )

    # 完整 pipeline
    pipeline = Pipeline([
        ("preproc", preprocessor),
        ("stack", stacking)
    ])

    return pipeline

def get_param_grid():
    # 对每个基学习器挑选若干重要超参（网格规模适中）
    param_grid = {
        # RandomForest
        "stack__rf__n_estimators": [100, 300],
        "stack__rf__max_depth": [None, 10, 25],
        # GradientBoosting
        "stack__gb__n_estimators": [100, 300],
        "stack__gb__learning_rate": [0.05, 0.1],
        "stack__gb__max_depth": [3, 6],
        # SVR (注意：对大数据集 SVR 慢)
        "stack__svr__C": [0.1, 1.0],
        "stack__svr__epsilon": [0.01, 0.1],
        # ElasticNet
        "stack__en__alpha": [0.001, 0.01, 0.1],
        "stack__en__l1_ratio": [0.2, 0.5, 0.8]
    }
    return param_grid

def fit_with_cv(pipeline, param_grid, X_train, y_train):
    # 使用 GridSearchCV 在训练集上CV调参
    cv = KFold(n_splits=CV_FOLDS, shuffle=True, random_state=RANDOM_STATE)
    grid = GridSearchCV(
        estimator=pipeline,
        param_grid=param_grid,
        scoring="neg_mean_squared_error",
        cv=cv,
        n_jobs=-1,
        verbose=0,
        refit=True
    )
    grid.fit(X_train.values, y_train)
    return grid

def evaluate(y_true, y_pred):
    # 总体 SSE
    sse = np.sum((y_pred - y_true) ** 2)
    denom = np.sum(y_true ** 2) + EPS
    rel_sse = sse / denom

    # 每样本相对平方误差（除以 y_true^2，避免除0）
    per_sample_rel_sqerr = ((y_pred - y_true) ** 2) / (y_true ** 2 + EPS)
    per_sample_sqerr = (y_pred - y_true) ** 2

    stats = {
        "SSE": float(sse),
        "RelSSE": float(rel_sse),
        "per_sample_rel_sqerr_mean": float(np.mean(per_sample_rel_sqerr)),
        "per_sample_rel_sqerr_var": float(np.var(per_sample_rel_sqerr, ddof=1)),
        "per_sample_sqerr_mean": float(np.mean(per_sample_sqerr)),
        "per_sample_sqerr_var": float(np.var(per_sample_sqerr, ddof=1))
    }
    return stats, per_sample_rel_sqerr, per_sample_sqerr

def main():
    print("读取数据...")
    df_train, df_test = load_data(EXCEL_PATH)

    X_train, y_train = prepare_xy(df_train)
    X_test,  y_test  = prepare_xy(df_test)

    print(f"训练样本: {X_train.shape}, 测试样本: {X_test.shape}")

    pipeline = build_pipeline()
    param_grid = get_param_grid()

    print("开始 GridSearchCV 调参（训练集上使用 %d 折 CV）..." % CV_FOLDS)
    grid = fit_with_cv(pipeline, param_grid, X_train, y_train)
    print("最佳参数：")
    print(grid.best_params_)
    print("最佳 CV（neg MSE）：", grid.best_score_)

    print("在测试集上做预测并计算误差...")
    best_model = grid.best_estimator_
    y_pred = best_model.predict(X_test.values)

    stats, per_sample_rel_sqerr, per_sample_sqerr = evaluate(y_test, y_pred)

    print("\n====== 测试集总体与样本误差统计 ======")
    print(f"测试集 SSE: {stats['SSE']:.6f}")
    print(f"测试集 相对 SSE (RelSSE = SSE / sum(y_true^2)): {stats['RelSSE']:.6f}")
    print(f"每样本 相对平方误差 mean: {stats['per_sample_rel_sqerr_mean']:.6f}, var: {stats['per_sample_rel_sqerr_var']:.6f}")
    print(f"每样本 平方误差 mean: {stats['per_sample_sqerr_mean']:.6f}, var: {stats['per_sample_sqerr_var']:.6f}")

    # 可导出每样本误差表
    result_df = pd.DataFrame({
        "y_true": y_test,
        "y_pred": y_pred,
        "sq_error": per_sample_sqerr,
        "rel_sq_error": per_sample_rel_sqerr
    })
    out_csv = "test_sample_errors.csv"
    result_df.to_csv(out_csv, index=False)
    print(f"\n已把每个测试样本的真实值、预测值和误差写入: {out_csv}")

if __name__ == "__main__":
    main()


读取数据...
训练样本: (550, 31), 测试样本: (137, 31)
开始 GridSearchCV 调参（训练集上使用 5 折 CV）...
最佳参数：
{'stack__en__alpha': 0.001, 'stack__en__l1_ratio': 0.8, 'stack__gb__learning_rate': 0.05, 'stack__gb__max_depth': 3, 'stack__gb__n_estimators': 100, 'stack__rf__max_depth': 25, 'stack__rf__n_estimators': 300, 'stack__svr__C': 1.0, 'stack__svr__epsilon': 0.01}
最佳 CV（neg MSE）： -296.5382066856176
在测试集上做预测并计算误差...

测试集 SSE: 38747.915773
测试集 相对 SSE (RelSSE = SSE / sum(y_true^2)): 0.077278
每样本 相对平方误差 mean: 0.087586, var: 0.018604
每样本 平方误差 mean: 282.831502, var: 287900.018751

已把每个测试样本的真实值、预测值和误差写入: test_sample_errors.csv
