In [18]:
# Cell 1: Setup
# 中文注释：基础库与路径设置；字体、随机种子、输出目录
import os
import warnings
warnings.filterwarnings("ignore")

import numpy as np
import pandas as pd

# 可视化
import matplotlib
import matplotlib.pyplot as plt
matplotlib.rcParams['font.family'] = 'Times New Roman'  # 全局 Times
matplotlib.rcParams['axes.unicode_minus'] = False

# 建模与评估
from sklearn.model_selection import train_test_split, GridSearchCV, KFold
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error

from sklearn.linear_model import LinearRegression
from sklearn.svm import SVR
from sklearn.neighbors import KNeighborsRegressor
from sklearn.ensemble import ExtraTreesRegressor, RandomForestRegressor
from sklearn.neural_network import MLPRegressor

# XGBoost（如未安装，请先：pip install xgboost shap openpyxl）
from xgboost import XGBRegressor

# 缺失值插补
from sklearn.impute import KNNImputer

# 持久化
import joblib

# 路径
OUT_DIR = "outputs"
DES_DIR = os.path.join(OUT_DIR, "des")
FIG_DIR = os.path.join(OUT_DIR, "figs")
MODEL_DIR = os.path.join(OUT_DIR, "models")
LOG_DIR = os.path.join(OUT_DIR, "logs")

for d in [OUT_DIR, DES_DIR, FIG_DIR, MODEL_DIR, LOG_DIR]:
    os.makedirs(d, exist_ok=True)

SEED = 20251008
np.random.seed(SEED)


# 第一部分：数据读取和展示

In [19]:
# Cell 2: Load data
# 中文注释：载入 data.xlsx；自动识别目标变量与可能的字符串分类列
DATA_PATH = "data.xlsx"  

df_raw = pd.read_excel(DATA_PATH)

print("Shape:", df_raw.shape)
print("Columns (head):", list(df_raw.columns)[:20])

TARGET = "ai_dep_dai_total"
assert TARGET in df_raw.columns, f"Target '{TARGET}' not found!"

# 可能的字符串分类列（若存在就统计与画图）
CATEGORICAL_STR_COLS = [
    "school_type","gender", "grade_level", "major_cat",
    "hukou_type", "teacher_ai_guidance", 
]


Shape: (860, 50)
Columns (head): ['school_type', 'school_type_encode', 'gender', 'gender_encode', 'grade_level', 'grade_level_encode', 'major_cat', 'major_cat_encode', 'hukou_type', 'hukou_type_encode', 'age', 'parent_edu_f', 'parent_edu_m', 'family_income_quintile', 'ai_use_days_per_week', 'ai_daily_minutes', 'ai_tool_diversity', 'prompt_length_avg', 'iter_per_task', 'edit_ratio_self']


In [20]:
df_raw

Unnamed: 0,school_type,school_type_encode,gender,gender_encode,grade_level,grade_level_encode,major_cat,major_cat_encode,hukou_type,hukou_type_encode,...,peer_ai_norms,course_difficulty,ai_paid_sub,dai_item_1,dai_item_2,dai_item_3,dai_item_4,dai_item_5,dai_item_6,ai_dep_dai_total
0,专科,0,女,1,大四,4,文学类,10,农村,0,...,1,4,0,1,0,0,1,0,0,2
1,普通本科,1,男,0,大一,1,计算机类,0,农村,0,...,3,2,0,4,5,3,1,4,1,18
2,普通本科,1,女,1,大一,1,化学类,5,农村,0,...,1,1,0,3,3,4,1,1,5,17
3,普通本科,1,女,1,大一,1,化学类,5,农村,0,...,5,3,1,4,4,1,4,4,2,19
4,普通本科,1,女,1,大二,2,思想政治教育类,12,农村,0,...,3,3,0,5,1,3,0,1,0,10
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
855,专科,0,女,1,大四,4,自动化类,2,农村,0,...,5,4,0,2,5,1,2,0,1,11
856,双一流,2,男,0,大三,3,教育学类,15,农村,0,...,1,2,1,3,0,5,3,1,2,14
857,普通本科,1,女,1,大一,1,新闻传播学类,11,城镇,1,...,3,3,0,3,0,2,3,1,2,11
858,双一流,2,女,1,大二,2,计算机类,0,城镇,1,...,5,4,1,0,4,1,4,4,3,16


# 第二部分： 数据探索分析和描述

In [21]:

num_cols = [c for c in df_raw.columns if pd.api.types.is_numeric_dtype(df_raw[c])]
num_cols = [c for c in num_cols if c.lower() not in ["id", "valid_flag"]]  # 排除ID等
desc = pd.DataFrame({
    "mean": df_raw[num_cols].mean(),
    "std": df_raw[num_cols].std(),
    "min": df_raw[num_cols].min(),
    "median": df_raw[num_cols].median(),
    "max": df_raw[num_cols].max()
})
desc = desc.loc[:, ["mean", "std", "min", "median", "max"]]
desc_path = os.path.join(DES_DIR, "continuous_descriptive.xlsx")
desc.to_excel(desc_path, index=True)
print("Saved:", desc_path)


Saved: outputs\des\continuous_descriptive.xlsx


In [22]:

TRANSLATION_MAP = {
    "school_type": {
        "专科": "Vocational/Junior College",
        "普通本科": "Regular Undergraduate",
        "双一流": "Double First-Class"
    },
    "gender": {"男": "Male", "女": "Female"},
    "grade_level": {"大一": "Freshman", "大二": "Sophomore", "大三": "Junior", "大四": "Senior"},
    "major_cat": {
        "计算机类":"Computer Science",
        "电子信息类":"Electronics & Information",
        "自动化类":"Automation",
        "数学统计学类":"Mathematics & Statistics",
        "物理学类":"Physics",
        "化学类":"Chemistry",
        "生物科学类":"Biological Sciences",
        "土木类":"Civil Engineering",
        "管理类":"Management",
        "经济金融类":"Economics & Finance",
        "文学类":"Literature",
        "新闻传播学类":"Journalism & Communication",
        "思想政治教育类":"Ideological & Political Education",
        "体育学类":"Physical Education",
        "医学类":"Medicine",
        "教育学类":"Education",
        "心理学类":"Psychology",
        "法学类":"Law",
        "历史学类":"History",
        "艺术美术设计类":"Art & Design"
    },
    "hukou_type": {"城镇":"Urban", "农村":"Rural"},
    "teacher_ai_guidance": {
        "严格限制":"Strictly Restricted",
        "允许但需注明":"Allowed with Attribution",
        "鼓励规范使用":"Encouraged with Proper Use"
    },
    "peer_ai_norms": {"低":"Low", "中":"Medium", "高":"High"}
}

# 需要翻译并画饼图的列（仅当这些原始字符串列存在时才处理）
CATEGORICAL_STR_COLS_FOR_PIE = [
    "school_type","gender", "grade_level", "major_cat",
    "hukou_type", "teacher_ai_guidance", "peer_ai_norms"
]

def translate_series_to_en(series: pd.Series, colname: str) -> pd.Series:
    """将分类变量的中文取值翻译成英文；未知值原样保留；缺失标记为 Missing。"""
    mapping = TRANSLATION_MAP.get(colname, {})
    return series.fillna("Missing").map(lambda x: mapping.get(x, x))

def save_freq_table_and_pie_translated(series: pd.Series, name: str):
    # 先翻译
    s_en = translate_series_to_en(series.astype(str), name)
    # 频次与占比（英文标签）
    vc = s_en.value_counts(dropna=False)
    freq_df = pd.DataFrame({"count": vc, "ratio": vc / vc.sum()})
    fpath = os.path.join(DES_DIR, f"freq_{name}.xlsx")
    freq_df.to_excel(fpath)
    print("Saved:", fpath)

    # 饼图（全英文+Times）
    plt.figure(figsize=(6, 6), dpi=1200)
    labels = [str(x) for x in vc.index]
    wedges, texts, autotexts = plt.pie(
        x=vc.values,
        labels=labels,
        autopct=lambda p: f"{p:.1f}%",
        startangle=90,
        wedgeprops=dict(width=0.9, edgecolor='white')
    )
    plt.title(f"Distribution of {name}", fontsize=14)
    plt.legend(wedges, labels, title="Categories", loc="center left", bbox_to_anchor=(1.0, 0.5))
    fig_path = os.path.join(FIG_DIR, f"pie_{name}.png")
    plt.savefig(fig_path, bbox_inches="tight")
    plt.close()
    print("Saved:", fig_path)

# 执行：仅对存在的列进行输出
for col in CATEGORICAL_STR_COLS_FOR_PIE:
    if col in df_raw.columns:
        save_freq_table_and_pie_translated(df_raw[col], col)
    else:
        print(f"[Skip] '{col}' not found (string category).")



Saved: outputs\des\freq_school_type.xlsx
Saved: outputs\figs\pie_school_type.png
Saved: outputs\des\freq_gender.xlsx
Saved: outputs\figs\pie_gender.png
Saved: outputs\des\freq_grade_level.xlsx
Saved: outputs\figs\pie_grade_level.png
Saved: outputs\des\freq_major_cat.xlsx
Saved: outputs\figs\pie_major_cat.png
Saved: outputs\des\freq_hukou_type.xlsx
Saved: outputs\figs\pie_hukou_type.png
Saved: outputs\des\freq_teacher_ai_guidance.xlsx
Saved: outputs\figs\pie_teacher_ai_guidance.png
Saved: outputs\des\freq_peer_ai_norms.xlsx
Saved: outputs\figs\pie_peer_ai_norms.png


In [27]:
import matplotlib.pyplot as plt
from matplotlib import cm
import numpy as np
import os

# ------------ 调色：高对比、绚丽且唯一 ------------
def get_vivid_colors(n: int, for_major: bool = False):
    """
    for_major=True：返回 20 个互不重复、相邻高对比的颜色（tab20 交错取色：偶数索引 + 奇数索引）
    否则：n<=10 用 tab10；n>10 用 hsv 等分色环（强对比）
    """
    if for_major:
        # tab20 有 20 个色块（成对相近），为了提升相邻对比：先取偶数，再取奇数索引
        base = list(cm.get_cmap("tab20").colors)  # 20 tuples
        order = list(range(0, 20, 2)) + list(range(1, 20, 2))
        colors = [base[i] for i in order]
        return colors[:n]
    else:
        if n <= 10:
            return list(cm.get_cmap("tab10").colors)[:n]
        # 11+ 类别的备选（一般不会出现在本单元）：hsv 等分
        hsv = cm.get_cmap("hsv")
        return [hsv(i / n) for i in range(n)]

def plot_pie_no_labels(series_en: pd.Series, title: str, fig_path: str, legend_fontsize: int = 12):
    """绘制饼图：扇区仅百分比，无类别文字；类别在图例中；DPI=1200。"""
    vc = series_en.value_counts(dropna=False)
    labels = [str(x) for x in vc.index]
    n = len(labels)

    # 颜色
    if title == "major_cat":
        colors = get_vivid_colors(n, for_major=True)  # 20 个互不重复颜色
    else:
        colors = get_vivid_colors(n, for_major=False)

    # 绘图
    plt.figure(figsize=(7, 6), dpi=1200)
    wedges, _texts, autotexts = plt.pie(
        x=vc.values,
        labels=None,                      # 不在扇区边写类别
        colors=colors,
        autopct=lambda p: f"{p:.1f}%",    # 仅显示百分比
        pctdistance=0.7,
        startangle=90,
        wedgeprops=dict(width=0.9, edgecolor='white', linewidth=0.8),
        textprops={"fontsize": 6}   # ← 比例数字
    )


    # 图例放类别，字体更大
    plt.legend(
        wedges, labels,
        title="Categories",
        loc="center left", bbox_to_anchor=(1.02, 0.5),
        fontsize=legend_fontsize, title_fontsize=legend_fontsize
    )
    plt.tight_layout()
    plt.savefig(fig_path, bbox_inches="tight")
    plt.close()
    print("Saved:", fig_path)

TARGET_PIE_COLS = ["gender", "grade_level", "major_cat", "school_type", "hukou_type"]
# ------------ 执行：对指定列输出新的饼图 ------------
for col in TARGET_PIE_COLS:
    if col not in df_raw.columns:
        print(f"[Skip] '{col}' not found.")
        continue
    s_en = translate_series_to_en(df_raw[col].astype(str), col)
    out_path = os.path.join(FIG_DIR, f"pie_vivid_{col}.png")
    plot_pie_no_labels(s_en, col, out_path, legend_fontsize=16)

Saved: outputs\figs\pie_vivid_gender.png
Saved: outputs\figs\pie_vivid_grade_level.png
Saved: outputs\figs\pie_vivid_major_cat.png
Saved: outputs\figs\pie_vivid_school_type.png
Saved: outputs\figs\pie_vivid_hukou_type.png


In [28]:


total_cells = df_raw.shape[0] * df_raw.shape[1]
missing_cells = int(df_raw.isna().sum().sum())
overall_missing_ratio = missing_cells / total_cells if total_cells > 0 else 0.0

miss_overall_df = pd.DataFrame({
    "total_rows": [df_raw.shape[0]],
    "total_cols": [df_raw.shape[1]],
    "total_cells": [total_cells],
    "missing_cells": [missing_cells],
    "overall_missing_ratio": [overall_missing_ratio]
})

miss_path = os.path.join(DES_DIR, "missing_report_overall.xlsx")
miss_overall_df.to_excel(miss_path, index=False)
print("Saved:", miss_path)
miss_overall_df



Saved: outputs\des\missing_report_overall.xlsx


Unnamed: 0,total_rows,total_cols,total_cells,missing_cells,overall_missing_ratio
0,860,50,43000,0,0.0


In [29]:
df_raw

Unnamed: 0,school_type,school_type_encode,gender,gender_encode,grade_level,grade_level_encode,major_cat,major_cat_encode,hukou_type,hukou_type_encode,...,peer_ai_norms,course_difficulty,ai_paid_sub,dai_item_1,dai_item_2,dai_item_3,dai_item_4,dai_item_5,dai_item_6,ai_dep_dai_total
0,专科,0,女,1,大四,4,文学类,10,农村,0,...,1,4,0,1,0,0,1,0,0,2
1,普通本科,1,男,0,大一,1,计算机类,0,农村,0,...,3,2,0,4,5,3,1,4,1,18
2,普通本科,1,女,1,大一,1,化学类,5,农村,0,...,1,1,0,3,3,4,1,1,5,17
3,普通本科,1,女,1,大一,1,化学类,5,农村,0,...,5,3,1,4,4,1,4,4,2,19
4,普通本科,1,女,1,大二,2,思想政治教育类,12,农村,0,...,3,3,0,5,1,3,0,1,0,10
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
855,专科,0,女,1,大四,4,自动化类,2,农村,0,...,5,4,0,2,5,1,2,0,1,11
856,双一流,2,男,0,大三,3,教育学类,15,农村,0,...,1,2,1,3,0,5,3,1,2,14
857,普通本科,1,女,1,大一,1,新闻传播学类,11,城镇,1,...,3,3,0,3,0,2,3,1,2,11
858,双一流,2,女,1,大二,2,计算机类,0,城镇,1,...,5,4,1,0,4,1,4,4,3,16


# 第三部分：机器学习模型建模与评估模块

In [30]:
df_use  = df_raw.drop(['school_type','gender','grade_level','major_cat',"hukou_type","teacher_ai_guidance"],axis=1)

In [31]:
df_use 

Unnamed: 0,school_type_encode,gender_encode,grade_level_encode,major_cat_encode,hukou_type_encode,age,parent_edu_f,parent_edu_m,family_income_quintile,ai_use_days_per_week,...,peer_ai_norms,course_difficulty,ai_paid_sub,dai_item_1,dai_item_2,dai_item_3,dai_item_4,dai_item_5,dai_item_6,ai_dep_dai_total
0,0,1,4,10,0,21,11,11,4,4,...,1,4,0,1,0,0,1,0,0,2
1,1,0,1,0,0,21,13,9,1,3,...,3,2,0,4,5,3,1,4,1,18
2,1,1,1,5,0,20,10,10,3,5,...,1,1,0,3,3,4,1,1,5,17
3,1,1,1,5,0,20,12,9,5,6,...,5,3,1,4,4,1,4,4,2,19
4,1,1,2,12,0,22,12,13,3,7,...,3,3,0,5,1,3,0,1,0,10
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
855,0,1,4,2,0,19,7,10,5,7,...,5,4,0,2,5,1,2,0,1,11
856,2,0,3,15,0,19,11,11,3,7,...,1,2,1,3,0,5,3,1,2,14
857,1,1,1,11,1,20,7,11,3,4,...,3,3,0,3,0,2,3,1,2,11
858,2,1,2,0,1,20,16,13,1,3,...,5,4,1,0,4,1,4,4,3,16


In [38]:


X = df_use.drop(columns=[TARGET], axis=1).copy()
X = X.drop(columns=X.columns[-6:], axis=1)
y = df_use[TARGET].copy()

# 80/20 划分
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.20, random_state=SEED
)
print(X_train.shape, X_test.shape, y_train.shape, y_test.shape)


(688, 37) (172, 37) (688,) (172,)


In [39]:
X_train

Unnamed: 0,school_type_encode,gender_encode,grade_level_encode,major_cat_encode,hukou_type_encode,age,parent_edu_f,parent_edu_m,family_income_quintile,ai_use_days_per_week,...,ease_of_use,trust_in_ai,ai_lit_knowledge,ai_lit_skill,ai_lit_ethics,ai_literacy_index,teacher_ai_guidance_encode,peer_ai_norms,course_difficulty,ai_paid_sub
442,1,0,1,9,1,21,15,13,3,6,...,4,4,51.6,74.2,63.0,62.9,1,3,4,1
193,1,0,3,19,1,21,16,15,3,3,...,4,4,73.6,69.2,66.6,70.4,2,5,5,0
397,2,1,2,0,0,20,8,13,2,5,...,4,2,82.0,90.6,84.1,85.9,0,5,3,0
348,1,0,3,2,0,23,11,13,2,4,...,2,4,69.5,79.8,59.8,71.7,0,3,3,0
509,1,1,2,18,1,21,12,14,4,5,...,4,4,62.1,67.2,53.4,62.4,1,3,3,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
99,1,0,1,10,1,19,11,10,5,0,...,4,3,78.8,77.7,47.1,72.0,0,5,4,0
777,1,1,1,16,1,20,11,15,2,4,...,4,4,62.4,47.7,61.2,56.3,1,1,5,1
221,2,0,4,17,1,18,17,11,2,6,...,4,3,81.0,85.8,67.9,80.3,0,1,3,0
638,2,1,2,12,1,19,13,13,3,4,...,3,4,68.5,60.9,63.6,64.5,0,5,4,0


In [40]:
# Cell 8: Helper functions for metrics, AIC/BIC proxy, and robustness
# 中文注释：统一评估；AIC/BIC为非概率模型的近似（基于RSS与“参数规模”近似k）

def count_params_proxy(model, X_fit=None):
    """近似参数规模k：不同模型采用可用的代理量"""
    try:
        # 线性
        if hasattr(model, "coef_"):
            k = int(np.count_nonzero(model.coef_) + (1 if getattr(model, "fit_intercept", True) else 0))
            return max(k, 1)
        # SVR: 支持向量数量
        if hasattr(model, "support_"):
            return max(len(model.support_), 1)
        # KNN: 取特征数作为自由度近似
        if model.__class__.__name__.lower().startswith("kneighbors"):
            return max(getattr(model, "n_features_in_", X_fit.shape[1] if X_fit is not None else 1), 1)
        # 树模型（RF/ET）：节点总数近似
        if hasattr(model, "estimators_"):
            k = 0
            for est in model.estimators_:
                if hasattr(est, "tree_"):
                    k += est.tree_.node_count
            return max(k, 1)
        # XGB: 近似 n_estimators * max_depth
        if model.__class__.__name__ == "XGBRegressor":
            return max(int(model.n_estimators * model.max_depth), 1)
        # MLP：权重矩阵参数量
        if hasattr(model, "coefs_"):
            k = sum(w.size for w in model.coefs_) + sum(b.size for b in model.intercepts_)
            return max(int(k), 1)
    except Exception:
        pass
    # 回退：特征数
    return max(getattr(model, "n_features_in_", X_fit.shape[1] if X_fit is not None else 1), 1)

def regression_report(y_true, y_pred, model=None, X_fit=None, model_name=""):
    n = len(y_true)
    rss = np.sum((y_true - y_pred) ** 2)
    rmse = np.sqrt(mean_squared_error(y_true, y_pred))
    mae  = mean_absolute_error(y_true, y_pred)
    r2   = r2_score(y_true, y_pred)
    k = count_params_proxy(model, X_fit)
    # 高斯误差AIC/BIC近似： n*ln(RSS/n) + 2k / +k ln(n)
    aic = n * np.log(rss / n + 1e-12) + 2 * k
    bic = n * np.log(rss / n + 1e-12) + k * np.log(n)
    return {
        "model": model_name, "R2": r2, "RMSE": rmse, "MAE": mae,
        "AIC": aic, "BIC": bic, "k_params_proxy": k, "n_test": n
    }


In [41]:
# Cell 9: Grids and training function
# 中文注释：定义模型与网格；每个模型做GridSearchCV；重复多次更换random_state进行鲁棒性测试
from collections import defaultdict

def get_models_and_grids():
    models = {}

    # OLS(LinearRegression)：仅标准化做在模型前
    models["OLS"] = (
        Pipeline([
            ("scaler", StandardScaler(with_mean=True, with_std=True)),
            ("lr", LinearRegression())
        ]),
        {"lr__fit_intercept": [True, False]}
    )

    models["SVR"] = (
        Pipeline([
            ("scaler", StandardScaler()),
            ("svr", SVR())
        ]),
        {
            "svr__kernel": ["rbf"],
            "svr__C": [1, 5, 10, 20, 50],
            "svr__gamma": ["scale", 0.1, 0.01, 0.001],
            "svr__epsilon": [0.1, 0.2, 0.3]
        }
    )

    models["KNN"] = (
        Pipeline([
            ("scaler", StandardScaler()),
            ("knn", KNeighborsRegressor())
        ]),
        {
            "knn__n_neighbors": [3, 5, 7, 9, 15],
            "knn__weights": ["uniform", "distance"],
            "knn__p": [1, 2]
        }
    )

    models["ExtraTrees"] = (
        ExtraTreesRegressor(random_state=SEED, n_jobs=-1),
        {
            "n_estimators": [200, 400, 800],
            "max_depth": [None, 10, 20, 30],
            "min_samples_split": [2, 5, 10],
            "min_samples_leaf": [1, 2, 4]
        }
    )

    models["RandomForest"] = (
        RandomForestRegressor(random_state=SEED, n_jobs=-1),
        {
            "n_estimators": [300, 500, 800, 1000],
            "max_depth": [None, 10, 20, 30],
            "min_samples_split": [2, 5, 10],
            "min_samples_leaf": [1, 2, 4],
            "max_features": ["auto", "sqrt", 0.5]
        }
    )

    models["XGB"] = (
        XGBRegressor(
            objective="reg:squarederror", random_state=SEED, n_jobs=-1, tree_method="hist"
        ),
        {
            "n_estimators": [400, 800, 1200],
            "max_depth": [3, 5, 7, 9],
            "learning_rate": [0.03, 0.05, 0.1],
            "subsample": [0.7, 0.9, 1.0],
            "colsample_bytree": [0.6, 0.8, 1.0],
            "reg_lambda": [1.0, 3.0, 5.0]
        }
    )

    models["ANN"] = (
        Pipeline([
            ("scaler", StandardScaler()),
            ("mlp", MLPRegressor(max_iter=600, random_state=SEED))
        ]),
        {
            "mlp__hidden_layer_sizes": [(64,), (128,), (64, 32), (128, 64)],
            "mlp__alpha": [1e-5, 1e-4, 1e-3],
            "mlp__learning_rate_init": [1e-3, 5e-4, 1e-4],
            "mlp__activation": ["relu", "tanh"]
        }
    )
    return models

def train_and_evaluate_all(X_train, y_train, X_test, y_test, n_repeats=5):
    models = get_models_and_grids()
    all_results = []
    best_models = {}
    search_logs = {}

    cv = KFold(n_splits=5, shuffle=True, random_state=SEED)

    for name, (estimator, grid) in models.items():
        print(f"\n=== {name} ===")
        # 多次重复以测试鲁棒性（变更 cv 随机种子）
        repeat_scores = []
        best_est_global = None
        best_score_global = -np.inf
        best_params_global = None

        for rep in range(n_repeats):
            cv_rep = KFold(n_splits=5, shuffle=True, random_state=SEED + rep)
            gs = GridSearchCV(
                estimator, grid, scoring="r2", cv=cv_rep, n_jobs=-1, verbose=1
            )
            gs.fit(X_train, y_train)
            # 记录
            repeat_scores.append(gs.best_score_)
            if gs.best_score_ > best_score_global:
                best_score_global = gs.best_score_
                best_est_global = gs.best_estimator_
                best_params_global = gs.best_params_

        # 测试集评估
        y_pred = best_est_global.predict(X_test)
        rep = regression_report(y_test, y_pred, model=best_est_global, X_fit=X_train, model_name=name)
        rep.update({
            "cv_best_mean_r2": float(np.mean(repeat_scores)),
            "cv_best_std_r2": float(np.std(repeat_scores)),
            "best_params": str(best_params_global)
        })
        all_results.append(rep)

        # 保存最优模型
        model_path = os.path.join(MODEL_DIR, f"best_{name}.pkl")
        joblib.dump(best_est_global, model_path)
        best_models[name] = model_path

        # 保存网格搜索日志（最后一次gs）
        log_path = os.path.join(LOG_DIR, f"gridcv_{name}.xlsx")
        pd.DataFrame(gs.cv_results_).to_excel(log_path, index=False)
        search_logs[name] = log_path

        print(f"[{name}] test R2={rep['R2']:.4f}  RMSE={rep['RMSE']:.4f}  Saved model→ {model_path}")

    results_df = pd.DataFrame(all_results).sort_values("R2", ascending=False)
    res_path = os.path.join(LOG_DIR, "model_eval_summary.xlsx")
    results_df.to_excel(res_path, index=False)
    print("Saved summary:", res_path)
    return results_df, best_models, search_logs

results_df, best_models, search_logs = train_and_evaluate_all(X_train, y_train, X_test, y_test, n_repeats=5)
results_df



=== OLS ===
Fitting 5 folds for each of 2 candidates, totalling 10 fits
Fitting 5 folds for each of 2 candidates, totalling 10 fits
Fitting 5 folds for each of 2 candidates, totalling 10 fits
Fitting 5 folds for each of 2 candidates, totalling 10 fits
Fitting 5 folds for each of 2 candidates, totalling 10 fits
[OLS] test R2=0.6244  RMSE=4.0810  Saved model→ outputs\models\best_OLS.pkl

=== SVR ===
Fitting 5 folds for each of 60 candidates, totalling 300 fits
Fitting 5 folds for each of 60 candidates, totalling 300 fits
Fitting 5 folds for each of 60 candidates, totalling 300 fits
Fitting 5 folds for each of 60 candidates, totalling 300 fits
Fitting 5 folds for each of 60 candidates, totalling 300 fits
[SVR] test R2=0.6377  RMSE=4.0084  Saved model→ outputs\models\best_SVR.pkl

=== KNN ===
Fitting 5 folds for each of 20 candidates, totalling 100 fits
Fitting 5 folds for each of 20 candidates, totalling 100 fits
Fitting 5 folds for each of 20 candidates, totalling 100 fits
Fitting 5 fol

Unnamed: 0,model,R2,RMSE,MAE,AIC,BIC,k_params_proxy,n_test,cv_best_mean_r2,cv_best_std_r2,best_params
1,SVR,0.637672,4.008439,3.217699,551.610208,668.067504,37,172,0.549614,0.010386,"{'svr__C': 10, 'svr__epsilon': 0.3, 'svr__gamm..."
6,ANN,0.636903,4.012689,3.151889,551.974764,668.432059,37,172,0.528048,0.012277,"{'mlp__activation': 'tanh', 'mlp__alpha': 0.00..."
0,OLS,0.624444,4.080957,3.345741,557.778002,674.235298,37,172,0.550725,0.008616,{'lr__fit_intercept': True}
5,XGB,0.618638,4.112376,3.439971,2886.416384,6663.409756,1200,172,0.51278,0.011686,"{'colsample_bytree': 0.6, 'learning_rate': 0.0..."
3,ExtraTrees,0.577882,4.326544,3.573891,320851.880563,824998.660892,160174,172,0.448068,0.006635,"{'max_depth': 10, 'min_samples_leaf': 2, 'min_..."
4,RandomForest,0.546408,4.484946,3.692945,180772.249892,464449.632098,90128,172,0.425941,0.005789,"{'max_depth': 20, 'max_features': 0.5, 'min_sa..."
2,KNN,0.40985,5.115705,4.077325,635.516403,751.973698,37,172,0.314632,0.008117,"{'knn__n_neighbors': 15, 'knn__p': 2, 'knn__we..."


# 机器学习性能比较可视化

In [50]:
# # Cell 10: Bar plots of metrics across models
# # 中文注释：绘制每个模型的R2、RMSE、MAE、AIC、BIC条形对比图；英文、Times、DPI=1200
# plt_metrics = ["R2", "RMSE", "MAE", "AIC", "BIC"]
# models_order = results_df["model"].tolist()

# for m in plt_metrics:
#     plt.figure(figsize=(8, 5), dpi=1200)
#     vals = results_df.set_index("model").loc[models_order, m]
#     ax = vals.plot(kind="bar", color="#1f77b4", edgecolor="black")
#     plt.title(f"Model Comparison: {m}", fontsize=14)
#     plt.ylabel(m, fontsize=12)
#     plt.xlabel("Model", fontsize=12)
#     plt.grid(axis="y", linestyle="--", alpha=0.5)
#     plt.xticks(rotation=45, ha="right")
#     plt.tight_layout()
#     fig_path = os.path.join(FIG_DIR, f"compare_{m}.png")
#     plt.savefig(fig_path, bbox_inches="tight")
#     plt.close()
#     print("Saved:", fig_path)


plt_metrics = ["R2", "RMSE", "MAE", "AIC", "BIC"]

# 新增：不参与可视化的模型
exclude_models = {"SVR", "ANN", "OLS"}

# 过滤顺序，保留原始顺序但去掉指定模型
models_order = [m for m in results_df["model"].tolist() if m not in exclude_models]

for m in plt_metrics:
    # 若过滤后没有可画的模型，跳过该指标
    if len(models_order) == 0:
        print(f"[Skip] No models left to plot for metric: {m}")
        continue

    plt.figure(figsize=(8, 5), dpi=1200)
    vals = results_df.set_index("model").loc[models_order, m]
    ax = vals.plot(kind="bar", color="#1f77b4", edgecolor="black")
    plt.title(f"Model Comparison: {m}", fontsize=14)
    plt.ylabel(m, fontsize=12)
    plt.xlabel("Model", fontsize=12)
    plt.grid(axis="y", linestyle="--", alpha=0.5)
    plt.xticks(rotation=45, ha="right")
    plt.tight_layout()
    fig_path = os.path.join(FIG_DIR, f"compare_{m}.png")
    plt.savefig(fig_path, bbox_inches="tight")
    plt.close()
    print("Saved:", fig_path)


Saved: outputs\figs\compare_R2.png
Saved: outputs\figs\compare_RMSE.png
Saved: outputs\figs\compare_MAE.png
Saved: outputs\figs\compare_AIC.png
Saved: outputs\figs\compare_BIC.png


In [51]:
# # Cell 11: Prediction vs. Actual scatter for each best model (with CI and line)
# # 中文注释：仿照你给的范例，但全英文标签并设置Times与高DPI
# from sklearn.linear_model import LinearRegression

# def plot_pred_vs_actual(model, model_name, X_test, y_test, fig_path):
#     y_pred = model.predict(X_test)
#     y_test = np.asarray(y_test)

#     mse = mean_squared_error(y_test, y_pred)
#     mae = mean_absolute_error(y_test, y_pred)
#     rmse = np.sqrt(mse)
#     r2  = r2_score(y_test, y_pred)

#     plt.figure(figsize=(8, 7), dpi=1200)
#     scatter = plt.scatter(y_test, y_pred, c=y_test, cmap='viridis', alpha=0.7, edgecolors='k')
#     plt.colorbar(scatter, label='Observed')

#     # 对角线
#     mn, mx = min(y_test.min(), y_pred.min()), max(y_test.max(), y_pred.max())
#     plt.plot([mn, mx], [mn, mx], 'purple', linestyle='--', label='Predicted = Observed')

#     # 线性拟合
#     reg = LinearRegression().fit(y_test.reshape(-1, 1), y_pred)
#     y_lin_pred = reg.predict(y_test.reshape(-1, 1))
#     plt.plot(y_test, y_lin_pred, color='red', linestyle='-', label='Fitted line')

#     # 95% CI
#     se = np.sqrt(np.sum((y_pred - y_lin_pred) ** 2) / (len(y_test) - 2))
#     t_val = 1.96
#     ci = t_val * se * np.sqrt(1/len(y_test) + (y_test - np.mean(y_test))**2 / np.sum((y_test - np.mean(y_test))**2))
#     order = np.argsort(y_test)
#     plt.fill_between(y_test[order], y_lin_pred[order] - ci[order], y_lin_pred[order] + ci[order],
#                      color='pink', alpha=0.4, label='95% Confidence Interval')

#     # 文字框（英文）
#     txt = f"$R^2$={r2:.3f}\nRMSE={rmse:.3f}\nMAE={mae:.3f}"
#     plt.text(0.02, 0.98, txt, transform=plt.gca().transAxes, va='top', ha='left',
#              fontsize=12, bbox=dict(facecolor='white', edgecolor='gray', alpha=0.8))

#     plt.title(f"{model_name}: Predicted vs Observed", fontsize=15)
#     plt.xlabel("Observed", fontsize=12)
#     plt.ylabel("Predicted", fontsize=12)
#     plt.legend()
#     plt.grid(True, linestyle='--', alpha=0.5)

#     plt.tight_layout()
#     plt.savefig(fig_path, bbox_inches="tight")
#     plt.close()

# # 对所有最佳模型出图
# for name, path in best_models.items():
#     est = joblib.load(path)
#     fig_path = os.path.join(FIG_DIR, f"scatter_{name}.png")
#     plot_pred_vs_actual(est, name, X_test, y_test, fig_path)
#     print("Saved:", fig_path)



# Cell 11: Prediction vs. Actual scatter for each best model (with CI and line)
# 中文注释：在注释框中增加 AIC、BIC、k_params_proxy；英文、Times、DPI=1200
from sklearn.linear_model import LinearRegression
from matplotlib import rcParams
rcParams["font.family"] = "Times New Roman"

def plot_pred_vs_actual(model, model_name, X_test, y_test, fig_path, k_params_proxy=None):
    y_pred = model.predict(X_test)
    y_test = np.asarray(y_test)

    # 基础指标
    mse = mean_squared_error(y_test, y_pred)
    mae = mean_absolute_error(y_test, y_pred)
    rmse = np.sqrt(mse)
    r2  = r2_score(y_test, y_pred)

    # ===== 新增：AIC / BIC / k_params_proxy =====
    # 以高斯残差为假设：loglik = -n/2 * [log(2πσ^2) + 1]，σ^2=RSS/n
    n   = len(y_test)
    rss = np.sum((y_test - y_pred) ** 2)
    sigma2 = rss / n if n > 0 else 1.0
    loglik = -0.5 * n * (np.log(2 * np.pi * sigma2) + 1.0)

    # k 参数的“代理”：若未传入，则用模型的 n_features_in_（或 X_test 列数）作为近似
    if k_params_proxy is None:
        k_params_proxy = int(getattr(model, "n_features_in_", X_test.shape[1]))

    AIC = 2 * k_params_proxy - 2 * loglik
    BIC = np.log(n) * k_params_proxy - 2 * loglik
    # ============================================

    plt.figure(figsize=(8, 7), dpi=1200)
    scatter = plt.scatter(y_test, y_pred, c=y_test, cmap='viridis', alpha=0.7, edgecolors='k')
    plt.colorbar(scatter, label='Observed')

    # 对角线
    mn, mx = min(y_test.min(), y_pred.min()), max(y_test.max(), y_pred.max())
    plt.plot([mn, mx], [mn, mx], 'purple', linestyle='--', label='Predicted = Observed')

    # 线性拟合
    reg = LinearRegression().fit(y_test.reshape(-1, 1), y_pred)
    y_lin_pred = reg.predict(y_test.reshape(-1, 1))
    plt.plot(y_test, y_lin_pred, color='red', linestyle='-', label='Fitted line')

    # 95% CI
    se = np.sqrt(np.sum((y_pred - y_lin_pred) ** 2) / (len(y_test) - 2))
    t_val = 1.96
    ci = t_val * se * np.sqrt(1/len(y_test) + (y_test - np.mean(y_test))**2 / np.sum((y_test - np.mean(y_test))**2))
    order = np.argsort(y_test)
    plt.fill_between(y_test[order], y_lin_pred[order] - ci[order], y_lin_pred[order] + ci[order],
                     color='pink', alpha=0.4, label='95% Confidence Interval')

    # 文字框（英文）—— 增加 AIC / BIC / k
    txt = (
        f"$R^2$={r2:.3f}\n"
        f"RMSE={rmse:.3f}\n"
        f"MAE={mae:.3f}\n"
        f"AIC={AIC:.1f}\n"
        f"BIC={BIC:.1f}\n"
        f"k_params={k_params_proxy}"
    )
    plt.text(0.02, 0.98, txt, transform=plt.gca().transAxes, va='top', ha='left',
             fontsize=12, bbox=dict(facecolor='white', edgecolor='gray', alpha=0.85))

    plt.title(f"{model_name}: Predicted vs Observed", fontsize=15)
    plt.xlabel("Observed", fontsize=12)
    plt.ylabel("Predicted", fontsize=12)
    plt.legend()
    plt.grid(True, linestyle='--', alpha=0.5)

    plt.tight_layout()
    plt.savefig(fig_path, bbox_inches="tight")
    plt.close()

# 若你在 results_df 里已经算过 k 的代理（列名假设为 k_params_proxy），可以传入使用更精确的值：
# 例如：
# k_map = dict(zip(results_df["model"], results_df["k_params_proxy"]))

# 对所有最佳模型出图（有 k_map 就传，没有就不传）
# 例：k_map = dict(zip(results_df["model"], results_df["k_params_proxy"]))  # 如无该列可注释掉
k_map = {m: int(k) for m, k in zip(results_df["model"], results_df.get("k_params_proxy", [None]*len(results_df)))}

for name, path in best_models.items():
    est = joblib.load(path)
    fig_path = os.path.join(FIG_DIR, f"scatter_{name}.png")
    plot_pred_vs_actual(est, name, X_test, y_test, fig_path, k_params_proxy=k_map.get(name))
    print("Saved:", fig_path)


Saved: outputs\figs\scatter_OLS.png
Saved: outputs\figs\scatter_SVR.png
Saved: outputs\figs\scatter_KNN.png
Saved: outputs\figs\scatter_ExtraTrees.png
Saved: outputs\figs\scatter_RandomForest.png
Saved: outputs\figs\scatter_XGB.png
Saved: outputs\figs\scatter_ANN.png


# 基于SHAP的可解释分析

In [None]:

import os
import numpy as np
import pandas as pd
import joblib
import shap
import matplotlib.pyplot as plt
from matplotlib import rcParams

# 统一英文字体与高分辨率
rcParams["font.family"] = "Times New Roman"
rcParams["figure.dpi"] = 1200


FIG_DIR = "./outputs/shap"; DES_DIR = "./outputs/shap"; os.makedirs(FIG_DIR, exist_ok=True); os.makedirs(DES_DIR, exist_ok=True)

# 从你之前保存的 best_models 中抓取 XGB 模型
# 例如 best_models = {"XGB": "models/best_XGB.pkl", ...}
xgb_key_candidates = [k for k in best_models.keys() if "xgb" in k.lower() or "xgboost" in k.lower() or "xgbregressor" in k.lower()]
assert len(xgb_key_candidates) >= 1, "best_models 里未找到 XGB 模型键名，请检查。"
XGB_NAME = xgb_key_candidates[0]
XGB_PATH = best_models[XGB_NAME]
xgb_est = joblib.load(XGB_PATH)

# 保证特征名可用
if hasattr(X_test, "columns"):
    FEATURE_COLS = list(X_test.columns)
else:
    FEATURE_COLS = [f"feat_{i}" for i in range(X_test.shape[1])]

# 强制 y_test 为 ndarray，便于后续索引
y_test_arr = np.asarray(y_test)

print(f"[OK] Loaded XGB model: {XGB_NAME} @ {XGB_PATH}")
print("X_test shape:", X_test.shape, "y_test shape:", y_test_arr.shape)



[OK] Loaded XGB model: XGB @ outputs\models\best_XGB.pkl
X_test shape: (172, 37) y_test shape: (172,)


In [61]:

# TreeExplainer（XGB 原生支持）
explainer = shap.TreeExplainer(xgb_est)
# 新版 shap 推荐 explainer(X) 得到 Explanation；旧版返回 ndarray。两种都兼容：
shap_expl = explainer(X_test)

# 统一拿 values / base values / data
if isinstance(shap_expl, shap.Explanation):
    shap_values = shap_expl.values            # (n, p)
    base_values = shap_expl.base_values       # (n,)
    data_used   = shap_expl.data              # (n, p)
else:
    shap_values = shap_expl                   # ndarray
    base_values = np.repeat(getattr(explainer, "expected_value", 0.0), X_test.shape[0])
    data_used = X_test

# === 导出全局特征重要性（|SHAP| 均值）到 Excel ===
abs_mean_importance = np.mean(np.abs(shap_values), axis=0)
imp_df = pd.DataFrame({
    "feature": FEATURE_COLS,
    "mean_abs_shap": abs_mean_importance
}).sort_values("mean_abs_shap", ascending=False).reset_index(drop=True)

imp_path = os.path.join(DES_DIR, "shap_global_importance_xgb.xlsx")
imp_df.to_excel(imp_path, index=False)
print("Saved:", imp_path)

# === 全局图 1：summary beeswarm ===
plt.figure(figsize=(8, 6))
shap.summary_plot(shap_values, data_used, feature_names=FEATURE_COLS, show=False, plot_size=None, max_display=30)
plt.title("SHAP Summary (Beeswarm) - XGB", fontsize=14)
plt.tight_layout()
fig_path = os.path.join(FIG_DIR, "shap_summary_beeswarm_xgb.png")
plt.savefig(fig_path, bbox_inches="tight", dpi=1200)
plt.close()
print("Saved:", fig_path)

# === 全局图 2：summary bar（平均 |SHAP|）===
plt.figure(figsize=(8, 6))
shap.summary_plot(shap_values, data_used, feature_names=FEATURE_COLS, plot_type="bar", show=False, plot_size=None, max_display=30)
plt.title("SHAP Summary (Bar) - XGB", fontsize=14)
plt.tight_layout()
fig_path = os.path.join(FIG_DIR, "shap_summary_bar_xgb.png")
plt.savefig(fig_path, bbox_inches="tight", dpi=1200)
plt.close()
print("Saved:", fig_path)


Saved: ./outputs/shap\shap_global_importance_xgb.xlsx
Saved: ./outputs/shap\shap_summary_beeswarm_xgb.png
Saved: ./outputs/shap\shap_summary_bar_xgb.png


In [62]:

# 1) 选择前若干重要特征，画带交互的 dependence plot
topK = 8  # 你可改为 10 或其他
top_feats = imp_df["feature"].head(topK).tolist()

for feat in top_feats:
    # interaction_index="auto" 让 shap 自动选择与当前特征交互最强的另一个特征来着色
    plt.figure(figsize=(7, 5))
    shap.dependence_plot(feat, shap_values, data_used, feature_names=FEATURE_COLS,
                         interaction_index="auto", show=False)
    plt.title(f"SHAP Dependence with Interaction — {feat}", fontsize=13)
    plt.tight_layout()
    outp = os.path.join(FIG_DIR, f"shap_dependence_interaction_{feat}.png")
    plt.savefig(outp, bbox_inches="tight", dpi=1200)
    plt.close()
    print("Saved:", outp)

# 2) 交互值矩阵（仅树模型支持 shap_interaction_values）
try:
    inter_vals = explainer.shap_interaction_values(X_test)   # (n, p, p)
    # 平均绝对交互强度
    mean_abs_inter = np.mean(np.abs(inter_vals), axis=0)     # (p, p)
    inter_df = pd.DataFrame(mean_abs_inter, index=FEATURE_COLS, columns=FEATURE_COLS)
    inter_path = os.path.join(DES_DIR, "shap_interaction_matrix_xgb.xlsx")
    inter_df.to_excel(inter_path)
    print("Saved:", inter_path)

    # 热力图（用 matplotlib 绘制）
    import matplotlib.pyplot as plt
    plt.figure(figsize=(10, 8), dpi=1200)
    plt.imshow(mean_abs_inter, cmap="viridis")
    plt.title("Mean |SHAP Interaction| Heatmap - XGB", fontsize=14)
    plt.colorbar(label="Mean |Interaction|")
    plt.xticks(ticks=np.arange(len(FEATURE_COLS)), labels=FEATURE_COLS, rotation=90)
    plt.yticks(ticks=np.arange(len(FEATURE_COLS)), labels=FEATURE_COLS)
    plt.tight_layout()
    heat_path = os.path.join(FIG_DIR, "shap_interaction_heatmap_xgb.png")
    plt.savefig(heat_path, bbox_inches="tight")
    plt.close()
    print("Saved:", heat_path)
except Exception as e:
    print("[Warn] shap_interaction_values not available:", e)


Saved: ./outputs/shap\shap_dependence_interaction_peer_ai_norms.png
Saved: ./outputs/shap\shap_dependence_interaction_ai_daily_minutes.png
Saved: ./outputs/shap\shap_dependence_interaction_academic_stress.png
Saved: ./outputs/shap\shap_dependence_interaction_trust_in_ai.png
Saved: ./outputs/shap\shap_dependence_interaction_grade_level_encode.png
Saved: ./outputs/shap\shap_dependence_interaction_ucla_loneliness.png
Saved: ./outputs/shap\shap_dependence_interaction_acad_self_efficacy.png
Saved: ./outputs/shap\shap_dependence_interaction_school_type_encode.png
Saved: ./outputs/shap\shap_interaction_matrix_xgb.xlsx
Saved: ./outputs/shap\shap_interaction_heatmap_xgb.png


<Figure size 8400x6000 with 0 Axes>

<Figure size 8400x6000 with 0 Axes>

<Figure size 8400x6000 with 0 Axes>

<Figure size 8400x6000 with 0 Axes>

<Figure size 8400x6000 with 0 Axes>

<Figure size 8400x6000 with 0 Axes>

<Figure size 8400x6000 with 0 Axes>

<Figure size 8400x6000 with 0 Axes>

In [63]:
# === Cell 4: Local explanations for min-Y and max-Y samples (robust to pandas/numpy) ===

# 判断是否都是 pandas 对象（有 index / iloc）
use_pandas = hasattr(y_test, "index") and hasattr(X_test, "iloc")

if use_pandas:
    # pandas 分支：idxmin/idxmax 返回 “标签索引”
    idx_min_label = y_test.idxmin()
    idx_max_label = y_test.idxmax()
    # 取对应一行（注意 loc 要用双中括号以保持二维）
    x_min = X_test.loc[[idx_min_label]]
    x_max = X_test.loc[[idx_max_label]]
    # 真实 y 值用 label 取，避免把标签当位置
    y_min_val = float(y_test.loc[idx_min_label])
    y_max_val = float(y_test.loc[idx_max_label])
    # 仅用于打印展示
    print(f"Min-Y sample index(label): {idx_min_label}, value: {y_min_val}")
    print(f"Max-Y sample index(label): {idx_max_label}, value: {y_max_val}")

    # 计算这两点的 SHAP
    exp_min = explainer(x_min)
    exp_max = explainer(x_max)

else:
    # numpy 分支：用位置索引
    idx_min_pos = int(np.argmin(y_test_arr))
    idx_max_pos = int(np.argmax(y_test_arr))
    x_min = X_test[idx_min_pos:idx_min_pos+1]
    x_max = X_test[idx_max_pos:idx_max_pos+1]
    y_min_val = float(y_test_arr[idx_min_pos])
    y_max_val = float(y_test_arr[idx_max_pos])
    print(f"Min-Y sample position: {idx_min_pos}, value: {y_min_val}")
    print(f"Max-Y sample position: {idx_max_pos}, value: {y_max_val}")

    exp_min = explainer(x_min)
    exp_max = explainer(x_max)

# —— Waterfall（局部解释）
plt.figure(figsize=(8, 6), dpi=1200)
shap.plots.waterfall(exp_min[0], max_display=20, show=False)
plt.title(f"Local Waterfall — Min-Y (y={y_min_val:.0f})", fontsize=13)
plt.tight_layout()
wf_min_path = os.path.join(FIG_DIR, "shap_waterfall_minY_xgb.png")
plt.savefig(wf_min_path, bbox_inches="tight")
plt.close()
print("Saved:", wf_min_path)

plt.figure(figsize=(8, 6), dpi=1200)
shap.plots.waterfall(exp_max[0], max_display=20, show=False)
plt.title(f"Local Waterfall — Max-Y (y={y_max_val:.0f})", fontsize=13)
plt.tight_layout()
wf_max_path = os.path.join(FIG_DIR, "shap_waterfall_maxY_xgb.png")
plt.savefig(wf_max_path, bbox_inches="tight")
plt.close()
print("Saved:", wf_max_path)

# —— Force plot（可选）
try:
    plt.figure(figsize=(10, 1.6), dpi=1200)
    shap.force_plot(exp_min.base_values[0], exp_min.values[0], exp_min.data[0],
                    feature_names=FEATURE_COLS, matplotlib=True, show=False)
    plt.title("Local Force — Min-Y", fontsize=12)
    fp_min_path = os.path.join(FIG_DIR, "shap_force_minY_xgb.png")
    plt.savefig(fp_min_path, bbox_inches="tight")
    plt.close()
    print("Saved:", fp_min_path)

    plt.figure(figsize=(10, 1.6), dpi=1200)
    shap.force_plot(exp_max.base_values[0], exp_max.values[0], exp_max.data[0],
                    feature_names=FEATURE_COLS, matplotlib=True, show=False)
    plt.title("Local Force — Max-Y", fontsize=12)
    fp_max_path = os.path.join(FIG_DIR, "shap_force_maxY_xgb.png")
    plt.savefig(fp_max_path, bbox_inches="tight")
    plt.close()
    print("Saved:", fp_max_path)
except Exception as e:
    print("[Warn] Force plot PNG export failed (ok to ignore):", e)


Min-Y sample index(label): 522, value: 1.0
Max-Y sample index(label): 457, value: 30.0
Saved: ./outputs/shap\shap_waterfall_minY_xgb.png
Saved: ./outputs/shap\shap_waterfall_maxY_xgb.png
Saved: ./outputs/shap\shap_force_minY_xgb.png
Saved: ./outputs/shap\shap_force_maxY_xgb.png


<Figure size 12000x1920 with 0 Axes>

<Figure size 12000x1920 with 0 Axes>

# 基于SEM模型的假设检验

In [4]:
# =========================
# SEM：诊断 → 建模 → 导出（按 H1→H2→H3）
# =========================
import os, warnings
warnings.filterwarnings("ignore")

import numpy as np
import pandas as pd
from pathlib import Path

# semopy 兼容导入
from semopy import Model
try:
    from semopy.model_statistics import calc_stats
except Exception:
    try:
        from semopy.statistics import calc_stats
    except Exception:
        from semopy import calc_stats
try:
    from semopy.inspector import inspect as sem_inspect
except Exception:
    from semopy.inspect import inspect as sem_inspect

# 诊断：KMO/Bartlett
from factor_analyzer.factor_analyzer import calculate_kmo, calculate_bartlett_sphericity

# 路径
BASE = Path(".")
DATA_PATH = Path("./data.xlsx")
OUT_DIRS = {
    "figs": "./outputs/sem/figs",
    "tables": "./outputs/sem/tables",
    "logs": "./outputs/sem/logs",
    "models": "./outputs/sem/models",
}
for p in OUT_DIRS.values():
    os.makedirs(p, exist_ok=True)

RANDOM_STATE = 20251010
np.random.seed(RANDOM_STATE)

# ========= 读取数据与派生列 =========
df = pd.read_excel(DATA_PATH)

# 专业组别（用于异质性，不进主结构式）
HIGH = {"计算机类","数学统计学类","医学类","管理类","经济金融类","新闻传播学类","思想政治教育类"}
MID  = {"物理学类","电子信息类","自动化类","化学类","生物科学类","土木类"}
LOW  = {"文学类","教育学类","心理学类","法学类","历史学类","艺术美术设计类","体育学类"}

def map_major_group(s):
    if s in HIGH: return "high"
    if s in MID:  return "mid"
    return "low"

if "major_cat" in df.columns:
    df["major_group"] = df["major_cat"].apply(map_major_group)
else:
    df["major_group"] = "mid"  # 若仅有编码，可后续再映射

# —— 主模型所需列（与你的数据列一致）
y_items    = [f"dai_item_{i}" for i in range(1, 7)]   # DAI 6题
usage_inds = ["ai_daily_minutes", "ai_use_days_per_week", "iter_per_task", "prompt_length_avg"]
strain_inds= ["academic_stress", "ucla_loneliness"]
lit_inds   = ["ai_lit_knowledge", "ai_lit_skill", "ai_lit_ethics"]

# 同伴列：优先使用 encode 版，否则回退
peer_col = "peer_ai_norms_encode" if "peer_ai_norms_encode" in df.columns else \
           ("peer_ai_norms" if "peer_ai_norms" in df.columns else None)
assert peer_col is not None, "未找到同伴规范列（peer_ai_norms_encode 或 peer_ai_norms）"

exogenous = {
    "Peer":   peer_col,
    "Trust":  "trust_in_ai",
    "Grade":  "grade_level_encode",
    "Income": "family_income_quintile",
}

# 丢弃关键缺失
cols_needed = y_items + usage_inds + strain_inds + lit_inds + list(exogenous.values())
dff = df.dropna(subset=cols_needed).copy()

# 居中处理（便于交互与稳定性）
def center(s): return s - s.mean()
for c in usage_inds + lit_inds:
    dff[f"c_{c}"] = center(dff[c])

# ========= 工具函数 =========
def cronbach_alpha(df_):
    df_ = df_.dropna()
    k = df_.shape[1]
    if k <= 1: return np.nan
    var_sum = df_.var(axis=0, ddof=1).sum()
    total_var = df_.sum(axis=1).var(ddof=1)
    return (k/(k-1)) * (1 - var_sum/total_var) if total_var > 0 else np.nan

def composite_z(df_, cols):
    z = (df_[cols] - df_[cols].mean())/df_[cols].std(ddof=0)
    return z.mean(axis=1)

def _normalize_fit_stats(stats_obj, n_obs: int) -> pd.DataFrame:
    if isinstance(stats_obj, pd.DataFrame):
        df0 = stats_obj.copy()
        if df0.shape[0] == 1:
            row = df0.iloc[0]
            def g(keys):
                for k in keys:
                    if k in row:
                        try: return float(row[k])
                        except Exception: pass
                return None
            out = {"n_obs": n_obs,
                   "df": g(["DoF","dof","df"]), "logl": g(["LogLik","logl","loglik","LogLik."]),
                   "AIC": g(["AIC","aic"]), "BIC": g(["BIC","bic"]),
                   "CFI": g(["CFI","cfi"]), "TLI": g(["TLI","tli","NNFI"]),
                   "RMSEA": g(["RMSEA","rmsea"]), "SRMR": g(["SRMR","srmr"])}
            return pd.DataFrame([out])
        idx_lower = {str(i).lower(): i for i in df0.index}
        def g2(key):
            for cand in [key.lower(), key.upper(), key.capitalize()]:
                if cand in idx_lower:
                    s = df0.loc[idx_lower[cand]]
                    try: return float(getattr(s, "values", [s])[0])
                    except Exception:
                        try: return float(s)
                        except Exception: return None
            return None
        out = {"n_obs": n_obs, "df": g2("dof"), "logl": g2("loglik"),
               "AIC": g2("aic"), "BIC": g2("bic"), "CFI": g2("cfi"),
               "TLI": g2("tli"), "RMSEA": g2("rmsea"), "SRMR": g2("srmr")}
        return pd.DataFrame([out])
    out = {"n_obs": n_obs,
           "df": getattr(stats_obj,"dof",None), "logl": getattr(stats_obj,"logl",None),
           "AIC": getattr(stats_obj,"aic",None), "BIC": getattr(stats_obj,"bic",None),
           "CFI": getattr(stats_obj,"cfi",None), "TLI": getattr(stats_obj,"tli",None),
           "RMSEA": getattr(stats_obj,"rmsea",None), "SRMR": getattr(stats_obj,"srmr",None)}
    return pd.DataFrame([out])

def get_coef(params_df: pd.DataFrame, lhs: str, op: str, rhs: str) -> float:
    if params_df is None or len(params_df) == 0: return np.nan
    df0 = params_df.copy(); df0.columns = [str(c).lower() for c in df0.columns]
    def pick(cands): 
        for c in cands:
            if c in df0.columns: return c
        return None
    lcol = pick(["lhs","lval","lvar","left"])
    rcol = pick(["rhs","rval","rvar","right"])
    ocol = pick(["op","operator"])
    ecol = pick(["std.estimate","estimate","est","value","beta","coef"])  # 优先取标准化
    if not all([lcol,rcol,ocol,ecol]): return np.nan
    sub = df0[(df0[lcol]==lhs) & (df0[ocol]==op) & (df0[rcol]==rhs)]
    if sub.empty: return np.nan
    try: return float(sub.iloc[0][ecol])
    except Exception: return np.nan

# ========= 1) 测量诊断：KMO / Bartlett / α =========
blocks = {
    "DAI": y_items,
    "AI_Usage": usage_inds,
    "Strain": strain_inds,
    "AI_Literacy": lit_inds
}
diag_rows = []
for name, cols in blocks.items():
    sub = dff[cols].dropna()
    kmo_per_var, kmo_total = calculate_kmo(sub)     # <- 改这里
    chi2, p_bartlett = calculate_bartlett_sphericity(sub)
    alpha = cronbach_alpha(sub)

    diag_rows.append({
        "block": name,
        "items": ",".join(cols),
        "n_used": len(sub),
        "KMO_overall": float(kmo_total),            # <- 用总体 KMO
        "KMO_mean_per_var": float(np.nanmean(kmo_per_var)),  # 可选
        "Bartlett_chi2": float(chi2),
        "Bartlett_p": float(p_bartlett),
        "Cronbach_alpha": float(alpha)
    })




diag_df = pd.DataFrame(diag_rows)
diag_path = "./outputs/sem/tables/measurement_diagnostics_kmo_bartlett_alpha.xlsx"
diag_df.to_excel(diag_path, index=False)
print("Saved:", diag_path)

# ========= 2) 构造 H3 的观测交互：usage_x_lit =========
dff["_usage_idx_z"] = composite_z(dff, usage_inds)
dff["_lit_idx_z"]   = composite_z(dff, lit_inds)
dff["usage_x_lit"]  = dff["_usage_idx_z"] * dff["_lit_idx_z"]

# ========= 3) SEM 模型（无 UxL；按 H1→H2→H3）=========
model_desc = f"""
# Measurement（CFA）
DAI =~ {' + '.join(y_items)}
AI_Usage =~ {' + '.join(usage_inds)}
Strain =~ {' + '.join(strain_inds)}
AI_Literacy =~ {' + '.join(lit_inds)}

# Structural（H1：中介；保留直达）
AI_Usage ~ {exogenous['Peer']} + {exogenous['Trust']}
DAI ~ AI_Usage + {exogenous['Peer']} + {exogenous['Trust']}

# H2：压力直接效应
DAI ~ Strain

# H3：素养缓冲（观测交互）
DAI ~ usage_x_lit

# Controls
DAI ~ {exogenous['Grade']} + {exogenous['Income']}
"""
print(model_desc)

# ========= 4) 拟合主模型 & 导出 =========
mod = Model(model_desc); _ = mod.fit(dff)
stats_obj = calc_stats(mod)
fit_df = _normalize_fit_stats(stats_obj, n_obs=len(dff))
fit_path = "./outputs/sem/tables/sem_fit_main.xlsx"
fit_df.to_excel(fit_path, index=False); print("Saved:", fit_path)

params = sem_inspect(mod)
params_path = "./outputs/sem/tables/sem_params_main.xlsx"
params.to_excel(params_path, index=False); print("Saved:", params_path)

# 结构路径与载荷导出（带显著性）
def _pick_col(df, candidates, required=True):
    for c in candidates:
        if c in df.columns: return c
    if required: raise KeyError(f"None of columns found: {candidates}")
    return None

dfp = params.copy()
lhs_col = _pick_col(dfp, ["lval","lhs"])
rhs_col = _pick_col(dfp, ["rval","rhs"])
op_col  = _pick_col(dfp, ["op"])
est_col = _pick_col(dfp, ["Std.Estimate","Estimate","Est","est"])
se_col  = _pick_col(dfp, ["SE","Std.Err","StdErr","SE (Robust)","SE (robust)"], required=False)
z_col   = _pick_col(dfp, ["z-value","z","Z"], required=False)
p_col   = _pick_col(dfp, ["p-value","P-value","pvalue","P(>|z|)","p"], required=False)

ps = dfp[dfp[op_col]=="~"].copy()
ps["_est"] = pd.to_numeric(ps[est_col], errors="coerce")
ps["_se"]  = pd.to_numeric(ps[se_col], errors="coerce") if se_col in ps.columns else np.nan
ps["_z"]   = pd.to_numeric(ps[z_col], errors="coerce") if z_col in ps.columns else np.nan
ps["_p"]   = pd.to_numeric(ps[p_col], errors="coerce") if p_col in ps.columns else np.nan

def _stars(p): 
    if pd.isna(p): return ""
    return "***" if p<0.001 else ("**" if p<0.01 else ("*" if p<0.05 else ""))

param_struct = ps[[lhs_col,op_col,rhs_col,"_est","_se","_z","_p"]].rename(columns={
    lhs_col:"lhs", op_col:"op", rhs_col:"rhs", "_est":"beta", "_se":"se", "_z":"z", "_p":"p"
})
param_struct["sig"] = param_struct["p"].apply(_stars)
struct_path = "./outputs/sem/tables/sem_paths_main_structural.xlsx"
param_struct.to_excel(struct_path, index=False); print("Saved:", struct_path)

pl = dfp[dfp[op_col] =="=~"].copy()
pl["_est"] = pd.to_numeric(pl[est_col], errors="coerce")
loadings = pl[[lhs_col,op_col,rhs_col,"_est"]].rename(columns={
    lhs_col:"latent", op_col:"op", rhs_col:"indicator", "_est":"loading"
})
load_path = "./outputs/sem/tables/sem_loadings.xlsx"
loadings.to_excel(load_path, index=False); print("Saved:", load_path)

# ========= 5) 中介与调节 Bootstrap CI =========
from tqdm.auto import trange
idx_all = np.arange(len(dff))
def _coef_from(m, lhs, op, rhs):
    p = sem_inspect(m); return get_coef(p, lhs, op, rhs)

b_peer_usage  = _coef_from(mod, "AI_Usage", "~", exogenous["Peer"])
b_trust_usage = _coef_from(mod, "AI_Usage", "~", exogenous["Trust"])
b_usage_dai   = _coef_from(mod, "DAI", "~", "AI_Usage")
b_strain_dai  = _coef_from(mod, "DAI", "~", "Strain")
b_int_dai     = _coef_from(mod, "DAI", "~", "usage_x_lit")

ind_peer  = b_peer_usage * b_usage_dai
ind_trust = b_trust_usage * b_usage_dai

B = 1000; boot_records = []
for _ in trange(B, leave=False):
    boot_idx = np.random.choice(idx_all, size=len(idx_all), replace=True)
    d_boot = dff.iloc[boot_idx].reset_index(drop=True)
    try:
        m_boot = Model(model_desc); m_boot.fit(d_boot)
        bu  = _coef_from(m_boot, "DAI", "~", "AI_Usage")
        bpu = _coef_from(m_boot, "AI_Usage", "~", exogenous["Peer"])
        btu = _coef_from(m_boot, "AI_Usage", "~", exogenous["Trust"])
        bsd = _coef_from(m_boot, "DAI", "~", "Strain")
        bid = _coef_from(m_boot, "DAI", "~", "usage_x_lit")
        boot_records.append({
            "ind_peer": bpu*bu, "ind_trust": btu*bu,
            "b_strain_to_dai": bsd, "b_interaction_to_dai": bid
        })
    except Exception:
        continue

boot_df = pd.DataFrame(boot_records)
def ci(s): 
    return pd.Series({"mean": s.mean(), "2.5%": s.quantile(0.025), "97.5%": s.quantile(0.975)})
ci_df = boot_df.apply(ci).T
ci_df_path = "./outputs/sem/tables/sem_boot_mediation_and_mod.xlsx"
ci_df.to_excel(ci_df_path); print("Saved:", ci_df_path)

# ========= 6) 分组比较：学校层级 & 专业强度 =========
key_paths = [
    ("AI_Usage","~", exogenous["Peer"]),
    ("AI_Usage","~", exogenous["Trust"]),
    ("DAI","~","AI_Usage"),
    ("DAI","~","Strain"),
    ("DAI","~","usage_x_lit"),
    ("DAI","~",exogenous["Grade"]),
    ("DAI","~",exogenous["Income"]),
]

def export_group_fit(group_col, fname):
    rows = []
    for g, dg in dff.groupby(group_col):
        if len(dg) < 20:
            print(f"[Warn] group='{g}' 样本较小（n={len(dg)}）"); 
        mg = Model(model_desc); mg.fit(dg)
        pg = sem_inspect(mg); stats_g = calc_stats(mg)
        fit_row = _normalize_fit_stats(stats_g, n_obs=len(dg)).iloc[0].to_dict()
        for (lhs,op,rhs) in key_paths:
            rows.append({
                "group": f"{group_col}={g}",
                "lhs": lhs, "op": op, "rhs": rhs,
                "coef": get_coef(pg, lhs, op, rhs),
                "CFI": fit_row.get("CFI"), "TLI": fit_row.get("TLI"),
                "RMSEA": fit_row.get("RMSEA"), "SRMR": fit_row.get("SRMR"),
                "AIC": fit_row.get("AIC"), "BIC": fit_row.get("BIC"),
                "n": int(fit_row.get("n_obs", len(dg)))
            })
    out = pd.DataFrame(rows); out.to_excel(fname, index=False); print("Saved:", fname)

assert "school_type_encode" in dff.columns, "缺少 school_type_encode 列"
export_group_fit("school_type_encode", "./outputs/sem/tables/sem_groups_school_type.xlsx")

if "major_group" in dff.columns:
    export_group_fit("major_group", "./outputs/sem/tables/sem_groups_major.xlsx")

# 结构图（需要本机安装 graphviz）
from semopy import semplot
fig_file = "./outputs/sem/figs/sem_model_graph.png"
semplot(mod, fig_file); print("Saved:", fig_file)


Saved: ./outputs/sem/tables/measurement_diagnostics_kmo_bartlett_alpha.xlsx

# Measurement（CFA）
DAI =~ dai_item_1 + dai_item_2 + dai_item_3 + dai_item_4 + dai_item_5 + dai_item_6
AI_Usage =~ ai_daily_minutes + ai_use_days_per_week + iter_per_task + prompt_length_avg
Strain =~ academic_stress + ucla_loneliness
AI_Literacy =~ ai_lit_knowledge + ai_lit_skill + ai_lit_ethics

# Structural（H1：中介；保留直达）
AI_Usage ~ peer_ai_norms + trust_in_ai
DAI ~ AI_Usage + peer_ai_norms + trust_in_ai

# H2：压力直接效应
DAI ~ Strain

# H3：素养缓冲（观测交互）
DAI ~ usage_x_lit

# Controls
DAI ~ grade_level_encode + family_income_quintile

Saved: ./outputs/sem/tables/sem_fit_main.xlsx
Saved: ./outputs/sem/tables/sem_params_main.xlsx
Saved: ./outputs/sem/tables/sem_paths_main_structural.xlsx
Saved: ./outputs/sem/tables/sem_loadings.xlsx


                                                   

Saved: ./outputs/sem/tables/sem_boot_mediation_and_mod.xlsx
Saved: ./outputs/sem/tables/sem_groups_school_type.xlsx
Saved: ./outputs/sem/tables/sem_groups_major.xlsx
Saved: ./outputs/sem/figs/sem_model_graph.png


In [2]:
# 中文注释：导入依赖并创建输出目录
import os, warnings
warnings.filterwarnings("ignore")

import numpy as np
import pandas as pd
from pathlib import Path

from semopy import Model, Optimizer, calc_stats, semplot
from semopy.inspector import inspect as sem_inspect

# 路径
BASE = Path(".")
DATA_PATH = Path("./data.xlsx")  # 你的上传路径
OUT_DIRS = {
    "figs": "./outputs/sem/figs",
    "tables":  "./outputs/sem/tables",
    "logs":  "./outputs/sem/logs",
    "models":  "./outputs/sem/models",
}


RANDOM_STATE = 20251010
np.random.seed(RANDOM_STATE)


In [None]:
# 中文注释：读取数据；本数据是你手动清洗过的 data.xlsx（分类变量已编码）
df = pd.read_excel(DATA_PATH)

# —— 构造“专业组别”major_group（高/中/低强度）用于异质性（不进入主模型）
HIGH = {"计算机类","数学统计学类","医学类","管理类","经济金融类","新闻传播学类","思想政治教育类"}
MID  = {"物理学类","电子信息类","自动化类","化学类","生物科学类","土木类"}
LOW  = {"文学类","教育学类","心理学类","法学类","历史学类","艺术美术设计类","体育学类"}

def map_major_group(s):
    if s in HIGH: return "high"
    if s in MID:  return "mid"
    return "low"

# 若你的 df 中只保留了编码列，也可以用旧列名恢复；这里尽量兼容两种情况：
if "major_cat" in df.columns:
    df["major_group"] = df["major_cat"].apply(map_major_group)
else:
    # 若只有编码，先设置一个空的（可按编码再映射，你若需要我可以提供编码→中文→组别表）
    df["major_group"] = "mid"

# —— 主模型需要的观测变量列名（与你生成的列保持一致）
y_items = [f"dai_item_{i}" for i in range(1, 7)]
usage_inds = ["ai_daily_minutes", "ai_use_days_per_week", "iter_per_task", "prompt_length_avg"]
strain_inds = ["academic_stress", "ucla_loneliness"]
lit_inds = ["ai_lit_knowledge", "ai_lit_skill", "ai_lit_ethics"]

exogenous = {
    "Peer": "peer_ai_norms",
    "Trust": "trust_in_ai",
    "Grade": "grade_level_encode",
    "Income": "family_income_quintile",
}

# 丢弃含有关键缺失的样本（SEM 对缺失较敏感；也可以切换成 FIML，但 semopy 目前更稳的是完整样本）
cols_needed = y_items + usage_inds + strain_inds + lit_inds + list(exogenous.values())
dff = df.dropna(subset=cols_needed).copy()


def center(s): return s - s.mean()
for c in usage_inds + lit_inds:
    dff[f"c_{c}"] = center(dff[c])

# —— 产品指示（为避免过重，取 3 组合；都用“中心化后”的指示）
#    UxL 指示： (minutes×knowledge), (days×skill), (iter×ethics)
prod_specs = [
    ("c_ai_daily_minutes", "c_ai_lit_knowledge"),
    ("c_ai_use_days_per_week", "c_ai_lit_skill"),
    ("c_iter_per_task", "c_ai_lit_ethics"),
]
prod_cols = []
for a, b in prod_specs:
    name = f"prod_{a[2:]}_x_{b[2:]}"  # 去掉前缀c_
    dff[name] = dff[a] * dff[b]
    prod_cols.append(name)

print("Data ready. N =", len(dff))


Data ready. N = 860


In [6]:
# 中文注释：semopy 的模型描述字符串
# 说明：
# - =~ 表示测量关系（潜变量由哪些指示测量）
# - ~  表示结构路径
# - 我们使用 UxL 作为潜变量，其指示为前面构造的 3 个产品指标
# - 同时放入 Grade、Income 控制 DAI；保留 Peer、Trust → DAI 直接路径做“部分中介”检验（需要可删）

model_desc = f"""
# Measurement
DAI =~ {' + '.join(y_items)}
AI_Usage =~ {' + '.join(usage_inds)}
Strain =~ {' + '.join(strain_inds)}
AI_Literacy =~ {' + '.join(lit_inds)}
UxL =~ {' + '.join(prod_cols)}

# Structural
AI_Usage ~ {exogenous['Peer']} + {exogenous['Trust']}
DAI ~ AI_Usage + Strain + UxL + {exogenous['Peer']} + {exogenous['Trust']} + {exogenous['Grade']} + {exogenous['Income']}
"""

print(model_desc)



# Measurement
DAI =~ dai_item_1 + dai_item_2 + dai_item_3 + dai_item_4 + dai_item_5 + dai_item_6
AI_Usage =~ ai_daily_minutes + ai_use_days_per_week + iter_per_task + prompt_length_avg
Strain =~ academic_stress + ucla_loneliness
AI_Literacy =~ ai_lit_knowledge + ai_lit_skill + ai_lit_ethics
UxL =~ prod_ai_daily_minutes_x_ai_lit_knowledge + prod_ai_use_days_per_week_x_ai_lit_skill + prod_iter_per_task_x_ai_lit_ethics

# Structural
AI_Usage ~ peer_ai_norms + trust_in_ai
DAI ~ AI_Usage + Strain + UxL + peer_ai_norms + trust_in_ai + grade_level_encode + family_income_quintile



In [13]:
# === Fixed Cell (multi-version safe): Fit SEM main model & export fit indices ===
import pandas as pd

# --- robust imports for different semopy versions ---
from semopy import Model
# calc_stats path differs across versions
try:
    from semopy.model_statistics import calc_stats
except Exception:
    try:
        from semopy.statistics import calc_stats
    except Exception:
        # some builds expose calc_stats at top-level
        from semopy import calc_stats

# inspector path also differs
try:
    from semopy.inspector import inspect as sem_inspect
except Exception:
    # older versions
    from semopy.inspect import inspect as sem_inspect

# --- fit model (fit() already does load_dataset + optimize) ---
mod = Model(model_desc)
_ = mod.fit(dff)

# --- normalize fit stats across semopy versions ---
def _normalize_fit_stats(stats_obj, n_obs: int) -> pd.DataFrame:
    """
    兼容两类返回：
    1) pandas.DataFrame（有时是一行多列；有时是行=指标名）
    2) 带属性对象（dof, logl, aic, bic, cfi, tli, rmsea, srmr）
    输出：单行 DataFrame。
    """
    if isinstance(stats_obj, pd.DataFrame):
        df0 = stats_obj.copy()

        # 情况A：一行多列
        if df0.shape[0] == 1:
            row = df0.iloc[0]
            def g(keys):
                for k in keys:
                    if k in row:
                        try:
                            return float(row[k])
                        except Exception:
                            pass
                return None
            out = {
                "n_obs": n_obs,
                "df":   g(["DoF","dof","df"]),
                "logl": g(["LogLik","logl","LogLik.","loglik"]),
                "AIC":  g(["AIC","aic"]),
                "BIC":  g(["BIC","bic"]),
                "CFI":  g(["CFI","cfi"]),
                "TLI":  g(["TLI","tli","NNFI"]),
                "RMSEA":g(["RMSEA","rmsea"]),
                "SRMR": g(["SRMR","srmr"]),
            }
            return pd.DataFrame([out])

        # 情况B：行=指标名
        idx_lower = {str(i).lower(): i for i in df0.index}
        def g2(key):
            for cand in [key.lower(), key.upper(), key.capitalize()]:
                if cand in idx_lower:
                    s = df0.loc[idx_lower[cand]]
                    try:
                        return float(getattr(s, "values", [s])[0])
                    except Exception:
                        try:
                            return float(s)
                        except Exception:
                            return None
            return None

        out = {
            "n_obs": n_obs,
            "df":   g2("dof"),
            "logl": g2("loglik"),
            "AIC":  g2("aic"),
            "BIC":  g2("bic"),
            "CFI":  g2("cfi"),
            "TLI":  g2("tli"),
            "RMSEA":g2("rmsea"),
            "SRMR": g2("srmr"),
        }
        return pd.DataFrame([out])

    # 情况C：对象有属性
    out = {
        "n_obs": n_obs,
        "df":   getattr(stats_obj, "dof", None),
        "logl": getattr(stats_obj, "logl", None),
        "AIC":  getattr(stats_obj, "aic", None),
        "BIC":  getattr(stats_obj, "bic", None),
        "CFI":  getattr(stats_obj, "cfi", None),
        "TLI":  getattr(stats_obj, "tli", None),
        "RMSEA":getattr(stats_obj, "rmsea", None),
        "SRMR": getattr(stats_obj, "srmr", None),
    }
    return pd.DataFrame([out])

# --- compute & export fit indices ---
stats_obj = calc_stats(mod)   # 不要传 dff
fit_df = _normalize_fit_stats(stats_obj, n_obs=len(dff))
fit_path = "./outputs/sem/sem_fit_main.xlsx"
fit_df.to_excel(fit_path, index=False)
print("Saved:", fit_path)

# --- export parameter table ---
params = sem_inspect(mod)
params_path = "./outputs/sem/sem_params_main.xlsx"
params.to_excel(params_path, index=False)
print("Saved:", params_path)

fit_df.head(), params.head()


Saved: ./outputs/sem/sem_fit_main.xlsx
Saved: ./outputs/sem/sem_params_main.xlsx


(   n_obs     df      logl        AIC         BIC       CFI       TLI  \
 0    860  205.0  1.081913  93.836175  322.168929  0.790602  0.759958   
 
       RMSEA  SRMR  
 0  0.064184  None  ,
        lval op           rval   Estimate       Std. Err    z-value   p-value
 0  AI_Usage  ~  peer_ai_norms   0.459233       1.664402   0.275915  0.782614
 1  AI_Usage  ~    trust_in_ai   4.327321       3.229778    1.33982  0.180304
 2       DAI  ~       AI_Usage   0.008565       0.000724  11.826264       0.0
 3       DAI  ~         Strain   1.645551       0.459953   3.577654  0.000347
 4       DAI  ~            UxL -17.780567  119655.168718  -0.000149  0.999881)

In [None]:
# 中文注释：计算间接效应（基于参数表） + bootstrap CI
# 间接：Peer→Usage→DAI / Trust→Usage→DAI
# 取系数：Usage~Peer 的回归系数、DAI~AI_Usage 的回归系数，做乘积。
# bootstrap：重抽样 N 次，保存乘积分布。

from tqdm.auto import trange

def get_coef(df_params, lhs, op, rhs):
    # 在 semopy 参数表中筛选某个路径的系数（估计值）
    m = df_params
    row = m[(m["lval"]==lhs) & (m["op"]==op) & (m["rval"]==rhs)]
    return float(row["Estimate"].values[0])

# 当前样本的点估计
b_peer_usage = get_coef(params, "AI_Usage", "~", exogenous["Peer"])
b_trust_usage = get_coef(params, "AI_Usage", "~", exogenous["Trust"])
b_usage_dai  = get_coef(params, "DAI", "~", "AI_Usage")
b_uxl_dai    = get_coef(params, "DAI", "~", "UxL")

ind_peer = b_peer_usage * b_usage_dai
ind_trust = b_trust_usage * b_usage_dai

# Bootstrap
B = 1000
boot_records = []
idx_all = np.arange(len(dff))
for _ in trange(B, leave=False):
    boot_idx = np.random.choice(idx_all, size=len(idx_all), replace=True)
    d_boot = dff.iloc[boot_idx].reset_index(drop=True)
    m_boot = Model(model_desc)
    try:
        Optimizer(m_boot).optimize(d_boot)
        p_boot = sem_inspect(m_boot)

        bu = get_coef(p_boot, "DAI", "~", "AI_Usage")
        bpu = get_coef(p_boot, "AI_Usage", "~", exogenous["Peer"])
        btu = get_coef(p_boot, "AI_Usage", "~", exogenous["Trust"])
        buxl = get_coef(p_boot, "DAI", "~", "UxL")

        boot_records.append({
            "ind_peer": bpu*bu,
            "ind_trust": btu*bu,
            "b_UxL_to_DAI": buxl
        })
    except Exception:
        # 个别重抽失败就跳过
        continue

boot_df = pd.DataFrame(boot_records)
def ci(s): 
    return pd.Series({"mean": s.mean(), "2.5%": s.quantile(0.025), "97.5%": s.quantile(0.975)})
ci_df = boot_df.apply(ci).T
ci_df_path = "./outputs/sem/sem_boot_mediation_and_mod.xlsx"
ci_df.to_excel(ci_df_path)
print("Saved:", ci_df_path)

ci_df


                                                    

Saved: ./outputs/sem/sem_boot_mediation_and_mod.xlsx




In [16]:
# 中文注释：输出 SEM 结构图（.png）
fig_file =  "./outputs/sem/sem_model_graph.png"
semplot(mod, fig_file)   # 需要本机安装 graphviz
print("Saved:", fig_file)


Saved: ./outputs/sem/sem_model_graph.png


## 异质性分析：按照学校类型

In [19]:
# === Fixed Cell: Groupwise SEM fits by school_type_encode with robust stats export ===
import numpy as np
import pandas as pd
from semopy import Model

# 复用你前面那格里已定义的工具：
# - sem_inspect  : 已做多版本兼容的 inspect()
# - calc_stats   : 已做多版本兼容的 calc_stats()
# - _normalize_fit_stats(stats_obj, n_obs): 将 fit 指标归一为单行 DataFrame

group_col = "school_type_encode"
assert group_col in dff.columns, "缺少 school_type_encode 列"

# --- 兼容：从 exogenous 字典里取命名；若不存在则直接用传入值 ---
def _rhs_name(name: str) -> str:
    try:
        return exogenous.get(name, name)
    except NameError:
        return name

# 关键路径（左右变量名称与主模型一致）
key_paths = [
    ("AI_Usage", "~", _rhs_name("Peer")),
    ("AI_Usage", "~", _rhs_name("Trust")),
    ("DAI", "~", "AI_Usage"),
    ("DAI", "~", "Strain"),
    ("DAI", "~", "UxL"),
    ("DAI", "~", _rhs_name("Grade")),
    ("DAI", "~", _rhs_name("Income")),
]

# --- 兼容不同 semopy 版本的参数表取系数 ---
def get_coef(params_df: pd.DataFrame, lhs: str, op: str, rhs: str) -> float:
    if params_df is None or len(params_df) == 0:
        return np.nan
    df = params_df.copy()
    # 统一列名到小写，便于匹配
    df.columns = [str(c).lower() for c in df.columns]

    # 候选列名映射
    lhs_cols = ["lhs", "lval", "lvar", "left"]
    rhs_cols = ["rhs", "rval", "rvar", "right"]
    op_cols  = ["op", "operator"]
    est_cols = ["est", "estimate", "value", "beta", "coef"]

    def _first_col(cands):
        for c in cands:
            if c in df.columns:
                return c
        return None

    lcol = _first_col(lhs_cols)
    rcol = _first_col(rhs_cols)
    ocol = _first_col(op_cols)
    ecol = _first_col(est_cols)

    if lcol is None or rcol is None or ocol is None or ecol is None:
        return np.nan

    sub = df[(df[lcol] == lhs) & (df[ocol] == op) & (df[rcol] == rhs)]
    if sub.empty:
        return np.nan
    try:
        return float(sub.iloc[0][ecol])
    except Exception:
        return np.nan

# --- 分组拟合并汇总 ---
rows = []
for g, dg in dff.groupby(group_col):
    # 拟合（fit 内部已 load_dataset + optimize）
    mg = Model(model_desc)
    _ = mg.fit(dg)

    # 参数与拟合度
    pg = sem_inspect(mg)            # 参数表（含路径/载荷/显著性）
    stats_obj = calc_stats(mg)      # 不要传 dg
    fit_row = _normalize_fit_stats(stats_obj, n_obs=len(dg)).iloc[0].to_dict()

    # 记录每个关键路径的系数 + 分组拟合指标
    for (lhs, op, rhs) in key_paths:
        rows.append({
            "group": f"school_{g}",
            "lhs": lhs, "op": op, "rhs": rhs,
            "coef": get_coef(pg, lhs, op, rhs),
            "CFI": fit_row.get("CFI"),
            "TLI": fit_row.get("TLI"),
            "RMSEA": fit_row.get("RMSEA"),
            "SRMR": fit_row.get("SRMR"),
            "AIC": fit_row.get("AIC"),
            "BIC": fit_row.get("BIC"),
            "n": int(fit_row.get("n_obs", len(dg)))
        })

school_cmp = pd.DataFrame(rows)
school_cmp_path =  "./outputs/sem/sem_groups_school_type.xlsx"
school_cmp.to_excel(school_cmp_path, index=False)
print("Saved:", school_cmp_path)

school_cmp.head()



Saved: ./outputs/sem/sem_groups_school_type.xlsx


Unnamed: 0,group,lhs,op,rhs,coef,CFI,TLI,RMSEA,SRMR,AIC,BIC,n
0,school_0,AI_Usage,~,peer_ai_norms,-0.223838,0.665335,0.616359,0.094993,,89.679015,235.76342,155
1,school_0,AI_Usage,~,trust_in_ai,0.227389,0.665335,0.616359,0.094993,,89.679015,235.76342,155
2,school_0,DAI,~,AI_Usage,0.010162,0.665335,0.616359,0.094993,,89.679015,235.76342,155
3,school_0,DAI,~,Strain,-882.099793,0.665335,0.616359,0.094993,,89.679015,235.76342,155
4,school_0,DAI,~,UxL,2.269699,0.665335,0.616359,0.094993,,89.679015,235.76342,155


## 多组异质性：按 Major Group（高/中/低）

In [20]:
# === Fixed Cell: Groupwise SEM fits by major_group (high/mid/low) ===
import numpy as np
import pandas as pd
from semopy import Model

assert "major_group" in dff.columns, "缺少 major_group 列"

# 允许的组次序（如数据不存在也会跳过，且打印提示）
ordered_groups = ["high", "mid", "low"]

rows = []
for g in ordered_groups:
    dg = dff[dff["major_group"] == g]
    if len(dg) == 0:
        print(f"[Skip] major_group='{g}' 无样本。")
        continue
    if len(dg) < 50:
        print(f"[Warn] major_group='{g}' 样本量较小（n={len(dg)}），结果可能不稳定。")

    # 拟合（fit 内部已 load_dataset + optimize）
    mg = Model(model_desc)
    _ = mg.fit(dg)

    # 参数与拟合度（多版本兼容）
    pg = sem_inspect(mg)           # 参数表（含路径/载荷/显著性）
    stats_obj = calc_stats(mg)     # 新版不需要传 dg
    fit_row = _normalize_fit_stats(stats_obj, n_obs=len(dg)).iloc[0].to_dict()

    # 汇总关键路径
    for (lhs, op, rhs) in key_paths:
        rows.append({
            "group": f"major_{g}",
            "lhs": lhs, "op": op, "rhs": rhs,
            "coef": get_coef(pg, lhs, op, rhs),
            "CFI": fit_row.get("CFI"),
            "TLI": fit_row.get("TLI"),
            "RMSEA": fit_row.get("RMSEA"),
            "SRMR": fit_row.get("SRMR"),
            "AIC": fit_row.get("AIC"),
            "BIC": fit_row.get("BIC"),
            "n": int(fit_row.get("n_obs", len(dg)))
        })

major_cmp = pd.DataFrame(rows)
major_cmp_path =  "./outputs/sem/sem_groups_major.xlsx"
major_cmp.to_excel(major_cmp_path, index=False)
print("Saved:", major_cmp_path)

major_cmp.head()


Saved: ./outputs/sem/sem_groups_major.xlsx


Unnamed: 0,group,lhs,op,rhs,coef,CFI,TLI,RMSEA,SRMR,AIC,BIC,n
0,major_high,AI_Usage,~,peer_ai_norms,6.622748,0.814551,0.787412,0.062749,,93.068276,272.423747,310
1,major_high,AI_Usage,~,trust_in_ai,10.290709,0.814551,0.787412,0.062749,,93.068276,272.423747,310
2,major_high,DAI,~,AI_Usage,0.008254,0.814551,0.787412,0.062749,,93.068276,272.423747,310
3,major_high,DAI,~,Strain,-241.380596,0.814551,0.787412,0.062749,,93.068276,272.423747,310
4,major_high,DAI,~,UxL,-10.208824,0.814551,0.787412,0.062749,,93.068276,272.423747,310


## 结果整理

In [26]:
# === Fixed Cell: Export structural paths & loadings (robust across semopy versions) ===
import numpy as np
import pandas as pd

def _pick_col(df, candidates, required=True):
    """从多个候选名里找第一列；required=False 时可返回 None。"""
    for c in candidates:
        if c in df.columns:
            return c
    if required:
        raise KeyError(f"None of columns found: {candidates}")
    return None

# 1) 结构路径 (op == "~")
dfp = params.copy()

# 列名兼容：常见有 lval/rval/op/Estimate/SE/z-value/p-value 或 Est/Std.Err/P(>|z|)
lhs_col = _pick_col(dfp, ["lval", "lhs"])
rhs_col = _pick_col(dfp, ["rval", "rhs"])
op_col  = _pick_col(dfp, ["op"])

est_col = _pick_col(dfp, ["Estimate", "Est", "est", "Std.Estimate"], required=True)
se_col  = _pick_col(dfp, ["SE", "Std.Err", "StdErr", "SE (Robust)", "SE (robust)"], required=False)
z_col   = _pick_col(dfp, ["z-value", "z", "Z"], required=False)
p_col   = _pick_col(dfp, ["p-value", "P-value", "pvalue", "P(>|z|)", "p"], required=False)

# 仅取结构路径
ps = dfp[dfp[op_col] == "~"].copy()

# 转数值（字符串也能转成数值；失败给 NaN）
ps["_est"] = pd.to_numeric(ps[est_col], errors="coerce")
if se_col is not None:
    ps["_se"]  = pd.to_numeric(ps[se_col], errors="coerce")
else:
    ps["_se"]  = np.nan
if z_col is not None:
    ps["_z"]   = pd.to_numeric(ps[z_col], errors="coerce")
else:
    ps["_z"]   = np.nan
if p_col is not None:
    ps["_p"]   = pd.to_numeric(ps[p_col], errors="coerce")
else:
    ps["_p"]   = np.nan

# 若 SE 缺失且 z 有值，用 SE = Estimate / z 回推（z=0 或缺失则仍为 NaN）
mask_se_missing = ps["_se"].isna() & ps["_z"].notna() & (ps["_z"] != 0)
ps.loc[mask_se_missing, "_se"] = ps.loc[mask_se_missing, "_est"] / ps.loc[mask_se_missing, "_z"]

# 显著性星号
def _stars(p):
    if pd.isna(p): return ""
    return "***" if p < 0.001 else ("**" if p < 0.01 else ("*" if p < 0.05 else ""))

param_struct = ps[[lhs_col, op_col, rhs_col, "_est", "_se", "_z", "_p"]].rename(columns={
    lhs_col: "lhs", op_col: "op", rhs_col: "rhs",
    "_est": "beta", "_se": "se", "_z": "z", "_p": "p"
})

param_struct["sig"] = param_struct["p"].apply(_stars)

# 导出
struct_path = "./outputs/sem/sem_paths_main_structural.xlsx"
param_struct.to_excel(struct_path, index=False)
print("Saved:", struct_path)

# 2) 测量载荷 (op == "=~")
pl = dfp[dfp[op_col] == "=~"].copy()

# 同样做数值化；不同版本有标准化载荷列(如 Std.Estimate)
pl["_est"] = pd.to_numeric(pl[est_col], errors="coerce")

loadings = pl[[lhs_col, op_col, rhs_col, "_est"]].rename(columns={
    lhs_col: "latent", op_col: "op", rhs_col: "indicator", "_est": "loading"
})

load_path =  "./outputs/sem/sem_loadings.xlsx"
loadings.to_excel(load_path, index=False)
print("Saved:", load_path)



Saved: ./outputs/sem/sem_paths_main_structural.xlsx
Saved: ./outputs/sem/sem_loadings.xlsx
