In [None]:

import os
from typing import List, Tuple

import numpy as np
import pandas as pd
import joblib

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.linear_model import ElasticNetCV
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score, accuracy_score


In [2]:
# 数据路径与输出路径
CSV_PATH = "wave3ndata.csv"   # 改成你的数据路径
OUTDIR = "models_enet"
os.makedirs(OUTDIR, exist_ok=True)

# 固定目标列
TARGET_COL = "adlab_c"

# 随机种子
RANDOM_STATE = 42


In [3]:
# 工具函数
def detect_bin_con_cols(X: pd.DataFrame) -> Tuple[List[str], List[str]]:
    """
    根据唯一值判断哪些是0/1二值列，哪些是连续列。
    这么做是为了用不同的缺失填充策略。
    """
    bin_cols, con_cols = [], []
    for c in X.columns:
        vals = pd.unique(X[c].dropna())
        if len(vals) and set(np.unique(vals)).issubset({0, 1}):
            bin_cols.append(c)
        else:
            con_cols.append(c)
    return bin_cols, con_cols


def apply_impute_inplace(X: pd.DataFrame,
                         bin_cols: List[str],
                         con_cols: List[str]) -> dict:
    """
    按列填充缺失值，并把用到的填充值返回，方便推理时复用。
    - 二值列：众数
    - 连续列：中位数
    """
    stats = {"binary_modes": {}, "continuous_medians": {}}

    for c in bin_cols:
        if X[c].isna().any():
            mode_val = X[c].mode().iloc[0]
            X[c] = X[c].fillna(mode_val)
            stats["binary_modes"][c] = float(mode_val)

    for c in con_cols:
        if X[c].isna().any():
            med = X[c].median()
            X[c] = X[c].fillna(med)
            stats["continuous_medians"][c] = float(med)

    return stats


def eval_report(y_true: np.ndarray, y_pred: np.ndarray) -> dict:
    """
    和HGB保持一模一样的指标，方便对比：
    - MAE
    - RMSE
    - R2
    - Acc_rounded（四舍五入后准确率）
    - Within1（|误差|<=1 的比例）
    """
    mae = mean_absolute_error(y_true, y_pred)
    rmse = mean_squared_error(y_true, y_pred, squared=False)
    r2 = r2_score(y_true, y_pred)
    y_r = np.clip(np.rint(y_pred), 0, 6).astype(int)
    acc = accuracy_score(y_true, y_r)
    within1 = float(np.mean(np.abs(y_true - y_r) <= 1))

    return dict(MAE=mae, RMSE=rmse, R2=r2, Acc_rounded=acc, Within1=within1)


In [4]:
# 读取数据+清洗
df = pd.read_csv(CSV_PATH)

# 检查目标列
assert TARGET_COL in df.columns, f"数据中找不到目标列 {TARGET_COL}"

# 丢掉目标缺失的行
df = df.dropna(subset=[TARGET_COL]).copy()

# 把目标裁剪到 0~6 并取整，保持ADL指标的语义
df[TARGET_COL] = df[TARGET_COL].astype(float).round().clip(0, 6).astype(int)

# 只取数值特征
feature_cols = [c for c in df.columns if c != TARGET_COL and pd.api.types.is_numeric_dtype(df[c])]
X = df[feature_cols].copy()
y = df[TARGET_COL].values

print("数据形状：", X.shape, y.shape)


数据形状： (6506, 63) (6506,)


In [5]:
# 区分二值列和连续列
bin_cols, con_cols = detect_bin_con_cols(X)

# 填充缺失，并记录用过的填充值（推理要用）
impute_stats = apply_impute_inplace(X, bin_cols, con_cols)

impute_stats


{'binary_modes': {}, 'continuous_medians': {}}

In [6]:
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=RANDOM_STATE,
    stratify=y   # 按0~6分层，保持比例
)

X_train.shape, X_test.shape


((5204, 63), (1302, 63))

In [7]:
#训练ElasticNet模型
from sklearn.linear_model import ElasticNetCV

# 标准化所有列：线性模型很需要这个
pre = ColumnTransformer(
    transformers=[("num", StandardScaler(), X_train.columns.tolist())],
    remainder="drop"
)

# 自动调参的 ElasticNet
enet = ElasticNetCV(
    l1_ratio=[0.1, 0.5, 0.9],           # L1/L2混合
    alphas=np.logspace(-2, 1, 15),      # 惩罚强度搜索
    cv=3,
    random_state=RANDOM_STATE,
    max_iter=5000
)

pipe = Pipeline([
    ("prep", pre),
    ("model", enet)
])

pipe.fit(X_train, y_train)


In [8]:
#评估
y_pred = pipe.predict(X_test)
metrics_enet = eval_report(y_test, y_pred)
metrics_enet




{'MAE': 0.5974260018121857,
 'RMSE': 0.9225989639137494,
 'R2': 0.5493302162248033,
 'Acc_rounded': 0.6105990783410138,
 'Within1': 0.8832565284178188}

In [9]:
#保存模型
artifact = {
    "model_type": "ElasticNet",
    "pipeline": pipe,               # 里面包含了标准化+模型
    "target_col": TARGET_COL,
    "feature_cols": X_train.columns.tolist(),
    "bin_cols": bin_cols,
    "con_cols": con_cols,
    "impute_stats": impute_stats,
}

joblib.dump(artifact, os.path.join(OUTDIR, "adl_adlab_c_elasticnet.joblib"))
pd.DataFrame([metrics_enet]).to_csv(os.path.join(OUTDIR, "results_metrics_elasticnet.csv"), index=False)

print("已保存到：", OUTDIR)


已保存到： models_enet


In [10]:
#推理
# 加载刚才保存的模型
artifact_loaded = joblib.load(os.path.join(OUTDIR, "adl_adlab_c_elasticnet.joblib"))
pipe_loaded = artifact_loaded["pipeline"]
feature_cols_loaded = artifact_loaded["feature_cols"]
impute_stats_loaded = artifact_loaded["impute_stats"]

# 用原始 df 做一下演示预测
X_new = df[feature_cols_loaded].copy()

# 先按训练时的统计填充
for c, v in impute_stats_loaded.get("binary_modes", {}).items():
    if c in X_new.columns:
        X_new[c] = X_new[c].fillna(v)
for c, v in impute_stats_loaded.get("continuous_medians", {}).items():
    if c in X_new.columns:
        X_new[c] = X_new[c].fillna(v)

y_new_pred = pipe_loaded.predict(X_new)
y_new_pred_rounded = np.clip(np.rint(y_new_pred), 0, 6).astype(int)

pd.DataFrame({
    "adl_true": df[TARGET_COL].values[:10],
    "adl_pred_enet": y_new_pred[:10],
    "adl_pred_enet_rounded": y_new_pred_rounded[:10]
})


Unnamed: 0,adl_true,adl_pred_enet,adl_pred_enet_rounded
0,1,0.398889,0
1,0,0.338269,0
2,0,0.234304,0
3,0,0.653125,1
4,0,1.81022,2
5,0,0.031907,0
6,2,0.177818,0
7,6,2.390642,2
8,0,0.642851,1
9,0,0.372566,0
