In [None]:
import os
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from factor_analyzer import FactorAnalyzer
from models import Autoencoder
from sklearn.decomposition import NMF
from sklearn.linear_model import Lasso, LinearRegression
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from utils import translate_text
from sklearn.model_selection import KFold
import shap
import torch
from torch.utils.data import DataLoader, Dataset
from torch import nn
from torch.optim.lr_scheduler import ReduceLROnPlateau
import optuna
from sklearn.model_selection import KFold
import torch.nn.functional as F
import io
from PIL import Image
from utils import get_cbcl_details
import random
import netron
import sys
sys.path.append('.')
from model_code import *
import ncv as nested_cv

In [None]:
seed = 8  # any other integer works just as well
# Seed every random number generator the notebook touches: NumPy, Python's
# `random`, and PyTorch (CPU, current CUDA device, and all CUDA devices).
for _set_seed in (
    np.random.seed,
    random.seed,
    torch.manual_seed,
    torch.cuda.manual_seed,
    torch.cuda.manual_seed_all,
):
    _set_seed(seed)
# Ask cuDNN for deterministic kernels and switch its autotuner off.
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True


In [None]:
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")

In [None]:
# The data directory is looked up as ../data relative to the current working
# directory of the kernel.
code_dir = Path(os.getcwd())
data_path = code_dir.parent / "data"
assert os.path.exists(
    data_path
), "Data directory not found. Make sure you're running this code from the root directory of the project."

# pandas opens and closes the file itself, so no explicit file handle is needed.
qns = pd.read_csv(data_path / "cbcl_data_remove_unrelated.csv", encoding="utf-8")

# Every column except the first is used as a model input.
# NOTE(review): the first column is presumably a subject identifier - confirm.
X = qns.iloc[:, 1:].values

# Min-max scaled copy of the full matrix, kept because later code may rely on
# it. It is produced by its own throwaway scaler so that the shared `scaler`
# below is only ever fitted on the training split.
# NOTE(review): X_scaled has seen the validation and test rows; do not use it
# for anything that is evaluated on those splits.
X_scaled = MinMaxScaler().fit_transform(X)

# 80 / 10 / 10 train / validation / test split. `random_state` is pinned to
# the notebook-wide `seed` so the split is the same no matter how many random
# numbers were drawn before this cell ran (e.g. when the cell is re-executed).
X_train_raw, X_temp = train_test_split(X, test_size=0.2, random_state=seed)
X_val_raw, X_test_raw = train_test_split(X_temp, test_size=0.5, random_state=seed)

# Fit the scaler on the training rows only, then apply it to the other splits.
scaler = MinMaxScaler()
X_train = scaler.fit_transform(X_train_raw)
X_val = scaler.transform(X_val_raw)
X_test = scaler.transform(X_test_raw)

In [None]:
from utils import get_cbcl_details

# One get_cbcl_details() result per input column (the first column of `qns`
# is skipped, matching how X is built), collected into a NumPy array so it can
# be indexed with integer arrays later on.
items = np.array([get_cbcl_details(column_name) for column_name in qns.columns[1:]])

In [None]:
# Re-execute the local `utils` module so that edits made to it on disk take
# effect without restarting the kernel. Names previously pulled in with
# `from utils import ...` keep pointing at the old objects until they are
# imported again.
import importlib
import utils
importlib.reload(utils)

In [None]:
from utils import find_column_in_csvs

# Root folder of the local ABCD 5.1 data release. The default is the original
# author's drive layout; set the ABCD_RELEASE_ROOT environment variable to run
# this cell on a machine where the release lives somewhere else.
abcd_release_root = os.environ.get("ABCD_RELEASE_ROOT", r'G:\ABCD\abcd-data-release-5.1')

# Look for the given column name in the CSV files under the release folder.
find_column_in_csvs(
    root_folder=abcd_release_root,
    target_column='kbi_p_conflict_causes___10'
)


In [None]:
# Display the array of get_cbcl_details() results built for the input columns.
items

# Train the model

In [None]:
# === AE ===
# Hyperparameter search for the plain autoencoder on the training split.
# Judging by the labels printed below, the call returns the outer-fold test
# MSEs and the best hyperparameters - confirm against ncv.OptimizeAE.
ae_scores, ae_best = nested_cv.OptimizeAE(X_train)
print("==== AE Result ====")
print("AE outer test MSEs:", ae_scores)
print("AE best hyperparameters:", ae_best)
print()

In [None]:
# === SparseAE ===
# Hyperparameter search for the sparse autoencoder on the training split.
# Judging by the labels printed below, the call returns the outer-fold test
# MSEs and the best hyperparameters - confirm against ncv.OptimizeSparseAE.
sparse_scores, sparse_best = nested_cv.OptimizeSparseAE(X_train)
print("==== SparseAE Result ====")
print("SparseAE outer test MSEs:", sparse_scores)
print("SparseAE best hyperparameters:", sparse_best)
print()

In [None]:
# === VAE ===
# Hyperparameter search for the variational autoencoder on the training split.
# Judging by the labels printed below, the call returns the outer-fold test
# MSEs and the best hyperparameters - confirm against ncv.OptimizeVAE.
vae_scores, vae_best = nested_cv.OptimizeVAE(X_train)
print("==== VAE Result ====")
print("VAE outer test MSEs:", vae_scores)
print("VAE best hyperparameters:", vae_best)
print()

In [None]:
# === BetaVAE ===
# Hyperparameter search for the beta-VAE on the training split.
# Judging by the labels printed below, the call returns the outer-fold test
# MSEs and the best hyperparameters - confirm against ncv.OptimizeBetaVAE.
beta_scores, beta_best = nested_cv.OptimizeBetaVAE(X_train)
print("==== BetaVAE Result ====")
print("BetaVAE outer test MSEs:", beta_scores)
print("BetaVAE best hyperparameters:", beta_best)
print()

In [None]:
# Print the stored BetaVAE search results again without re-running the search.
# Requires `beta_scores` and `beta_best` to already exist in the kernel.
print("==== BetaVAE Result ====")
print("BetaVAE outer test MSEs:", beta_scores)
print("BetaVAE best hyperparameters:", beta_best)
print()

In [None]:
# Reload the local modules after editing them on disk, so the kernel does not
# have to be restarted. `model_code` is reloaded before `ncv`.
import importlib
import model_code
importlib.reload(model_code)
from model_code import *   # import again explicitly so the notebook's names point at the reloaded classes/functions
import ncv
# importlib.reload updates the existing module object in place, so the
# `nested_cv` alias created in the import cell also sees the reloaded code.
importlib.reload(ncv)

In [None]:
# === COAE ===
coae_scores, coae_best = nested_cv.OptimizeCOAE(X_train)
print("==== COAE Result ====")
print("COAE outer test MSEs:", coae_scores)
print("COAE best hyperparameters:", coae_best)
print()

In [None]:
# Print the stored COAE search results again without re-running the search.
# Requires `coae_scores` and `coae_best` to already exist in the kernel.
print("==== COAE Result ====")
print("COAE outer test MSEs:", coae_scores)
print("COAE best hyperparameters:", coae_best)
print()

In [None]:
# === FactorVAE ===
fv_scores, fv_best = nested_cv.OptimizeFactorVAE(X_train)
print("==== FactorVAE Result ====")
print("FactorVAE outer test MSEs:", fv_scores)
print("FactorVAE best hyperparameters:", fv_best)
print()

In [None]:
# Print the stored FactorVAE search results again without re-running the search.
# Requires `fv_scores` and `fv_best` to already exist in the kernel.
print("==== FactorVAE Result ====")
print("FactorVAE outer test MSEs:", fv_scores)
print("FactorVAE best hyperparameters:", fv_best)
print()

## Autoencoder

In [None]:
# Build and train a plain autoencoder with encoding_dim=5 and layer sizes
# 128 / 64 / 32, using the training and validation splits.
autoencoder = Autoencoder(
    X_train, X_val,
    encoding_dim=5,
    layer1_neurons=128,
    layer2_neurons=64,
    layer3_neurons=32,         
)
autoencoder.train(show_plot=True)

# evaluate_on_data returns five values; the names below reflect how this
# notebook uses them (latent codes, reconstruction errors, explained-variance
# ratios, their total, and the reconstruction) - confirm against the class.
# NOTE(review): `latent_factors` and `reconstructed` are assigned again by the
# BetaVAE cell further down, so the interpretability cells use whichever of
# the two cells ran last.
latent_factors, rec_errors, explained_variance_ratios, explained_variance_ratio_total, reconstructed = autoencoder.evaluate_on_data(X_train)
explained_variance_ratio_total

In [None]:
# Example parameters
encoding_dim = 5
layer1_neurons = 127
layer2_neurons = 117
layer3_neurons = 106
# layer1_neurons = 128
# layer2_neurons = 64
# layer3_neurons = 32
# layer4_neurons = 90
# n_clusters = 4  # number of clusters; adjust as needed

# Build a SparseAutoencoder with the parameters above.
# NOTE(review): the original comment here referred to COAETrainer, but the
# code instantiates SparseAutoencoder. It also rebinds `autoencoder`, which the
# previous cell assigned to a plain Autoencoder - confirm this is intended.
autoencoder = SparseAutoencoder(
    X_train=X_train,
    X_val=X_val,
    encoding_dim=encoding_dim,
    layer1_neurons=layer1_neurons,
    layer2_neurons=layer2_neurons,
    layer3_neurons=layer3_neurons,
    # layer4_neurons=layer4_neurons,
    # n_clusters=n_clusters
)

# Start training
autoencoder.train(show_plot=True)
# autoencoder.export_to_onnx(X_train, onnx_path = "../output/sparse_autoencoder.onnx")

## Sparse AE

In [None]:
# Build and train a sparse autoencoder with layer sizes 128 / 64 / 32, a
# sparsity target of 0.05 and beta=1.0.
sparseAE = SparseAutoencoder(
    X_train=X_train,
    X_val=X_val,
    encoding_dim=5,             # required: latent dimension
    layer1_neurons=128,
    layer2_neurons=64,
    layer3_neurons=32,
    sparsity_target=0.05,
    beta=1.0,
)

sparseAE.train(show_plot=True)

# Evaluate on the held-out test split and report the total explained variance.
mu_z, rec_errs, evr_each, evr_total, X_recon = sparseAE.evaluate_on_data(X_test)
print("Explained Variance Ratio (total):", evr_total)


## COAutoencoderModel

In [None]:
# Build and train the COAE model with latent_dim=5, layer sizes 128 / 64 / 32
# and 5 clusters.
coae = COAETrainer(X_train, X_val, 
    latent_dim=5, 
    layer1=128, 
    layer2=64, 
    layer3=32, 
    n_clusters=5)
coae.train(show_plot=True)

# Evaluate on the training split (the SparseAE and VAE cells evaluate on the
# test split first) and report the total explained variance.
z, rec_errs, evr_each, evr_total, X_recon = coae.evaluate_on_data(X_train)

print("Total Explained Variance Ratio (COAE):", evr_total)


## VAE

In [None]:
# Build and train a variational autoencoder with encoding_dim=5 and layer
# sizes 128 / 64 / 32.
vae = VariationalAutoencoder(
    X_train, X_val,
    encoding_dim=5,
    layer1_neurons=128,
    layer2_neurons=64,
    layer3_neurons=32,
    beta_kl=0.1          # beta weight (beta-VAE style); freely tunable
)
vae.train(show_plot=True)

# Evaluate on the held-out test split and display the total explained variance.
mu_z, rec_errs, evr_each, evr_total, X_recon = vae.evaluate_on_data(X_test)
evr_total


In [None]:
# Same evaluation as the previous cell, but on the training split; this
# overwrites mu_z, rec_errs, evr_each, evr_total and X_recon.
mu_z, rec_errs, evr_each, evr_total, X_recon = vae.evaluate_on_data(X_train)
evr_total

## Beta VAE

In [None]:
# Build and train a beta-VAE with encoding_dim=5 and layer sizes 128 / 64 / 32.
# No beta argument is passed here, so the class's own default applies.
bvae = BetaVAE(
    X_train, X_val,
    encoding_dim=5,
    layer1_neurons=128,
    layer2_neurons=64,
    layer3_neurons=32,
)
bvae.train(show_plot=True)

# Evaluate on the training split.
# NOTE(review): this rebinds `latent_factors` and `reconstructed` (first set in
# the Autoencoder cell); the explained-variance and interpretability cells
# below read these names.
latent_factors, rec_errors, explained_variance_ratios, explained_variance_ratio_total, reconstructed = bvae.evaluate_on_data(X_train)

## FactorVAE

## NMF-AE

## NMF-Factor-AE

In [None]:
# Evaluate the FactorVAE+NMF trainer and print its total explained variance.
# NOTE(review): neither `trainer` nor `X_train_np` is assigned anywhere in the
# visible part of this notebook, so on a fresh kernel this cell raises
# NameError unless they are defined elsewhere - confirm where they come from.
latent, errs, ratios, total_ratio, recon = trainer.evaluate_on_data(X_train_np)
print(f"FactorVAE+NMF 总体方差解释率: {total_ratio}")

In [None]:
# Same evaluation on the test split.
# NOTE(review): `trainer` is not assigned anywhere in the visible part of this
# notebook - confirm where it comes from.
latent, errs, ratios, total_ratio, recon = trainer.evaluate_on_data(X_test)
print(f"FactorVAE+NMF 总体方差解释率: {total_ratio}")

In [None]:
# Share of each item's training-set variance that the reconstruction captures:
# 1 - Var(residual) / Var(original), computed column by column.
ve_per_feature = 1 - (X_train - reconstructed.cpu().numpy()).var(axis=0) / X_train.var(axis=0)
print("每道题的解释方差率:", ve_per_feature)

# Positions of the items whose explained-variance ratio is below 0.1, followed
# by the corresponding entries of `items`.
low_variance_items = np.flatnonzero(ve_per_feature < 0.1)
print("解释方差率小于 0.1 的题目索引:", low_variance_items)
print("解释方差率小于 0.1 的题目:", items[low_variance_items])

# Interpretability for all models

In [None]:
# Bring the latent representation into a plain NumPy array.
if isinstance(latent_factors, pd.DataFrame):
    latent_factors = latent_factors.values
elif torch.is_tensor(latent_factors):
    # `reconstructed` from the same evaluate_on_data call is handled as a
    # tensor elsewhere in this notebook, so accept a tensor here as well.
    latent_factors = latent_factors.detach().cpu().numpy()

# original_features = X if isinstance(X, np.ndarray) else X.values
original_features = (
    X_train if isinstance(X_train, np.ndarray) else X_train.values
)

n_original_features = original_features.shape[1]
n_latent_factors = latent_factors.shape[1]

# Standardised copy of the latent factors. A dedicated scaler object is used
# so that the notebook-wide `scaler` (the MinMaxScaler fitted on the training
# data) is not overwritten by this cell.
latent_scaler = StandardScaler()
latent_factors_scaled = latent_scaler.fit_transform(latent_factors)
# NOTE(review): as in the original code, the regression below is fitted on the
# unscaled `latent_factors`; `latent_factors_scaled` is not used. If the
# loadings are meant to be comparable across latent dimensions, fit on
# `latent_factors_scaled` instead - confirm the intent.

# Regress every original feature on the latent factors. A single multi-output
# least-squares fit yields coef_ of shape (n_targets, n_features), i.e. one row
# of coefficients per original feature - the same rows a per-feature loop of
# separate fits would produce.
reg = LinearRegression().fit(latent_factors, original_features)
loadings = reg.coef_
assert loadings.shape == (n_original_features, n_latent_factors)

# Collect the coefficients in a DataFrame: one row per item, one column per
# latent dimension.
loadings_df = pd.DataFrame(
    loadings, columns=[f"Latent_{j+1}" for j in range(n_latent_factors)]
)
# loadings_df.index = [f"Feature_{i+1}" for i in range(n_original_features)]
loadings_df.index = items


In [None]:
# Order the rows by the magnitude of their Latent_1 loading, largest first,
# and display the result.
latent1_rank = loadings_df["Latent_1"].abs().sort_values(ascending=False)
loadings_df = loadings_df.reindex(latent1_rank.index)
loadings_df

In [None]:
import numpy as np

def average_absolute_correlation(matrix):
    """
    Mean absolute Pearson correlation between the columns of ``matrix``.

    Args:
        matrix: array-like of shape (N, d); each row is one sample and each
            column is one factor / loading.

    Returns:
        The average of |r| over all d * (d - 1) / 2 distinct column pairs.
        NaN is returned when there are fewer than two columns, because no
        pair exists to average over. A constant column makes ``np.corrcoef``
        produce NaN entries, which propagate into the result.
    """
    data = np.asarray(matrix)

    # With a single column (or 1-D input) np.corrcoef returns a 0-d array, and
    # indexing its shape used to raise IndexError. There is no column pair in
    # that case, so the average is undefined.
    if data.ndim < 2 or data.shape[1] < 2:
        return np.nan

    # d x d correlation matrix; rowvar=False treats columns as the variables.
    corr_matrix = np.corrcoef(data, rowvar=False)

    # Strict upper triangle (k=1 skips the diagonal): each pair counted once.
    pair_indices = np.triu_indices(corr_matrix.shape[0], k=1)

    return np.mean(np.abs(corr_matrix[pair_indices]))


# ==== Example usage ====
if __name__ == "__main__":
    # Uses the `latent_factors` matrix already present in the notebook
    # (nothing is simulated here) and reports the average absolute
    # correlation between its columns.
    avg_corr_latent = average_absolute_correlation(latent_factors)
    print("Average absolute correlation of latent factors:", avg_corr_latent)
    
    # The same can be done for a loading_factors matrix, if one is available:
    # loading_factors = ...
    # avg_corr_loading = average_absolute_correlation(loading_factors)
    # print("Average absolute correlation of loading factors:", avg_corr_loading)


In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# Plot style
sns.set(style='whitegrid')

# Number of strongest items shown per latent dimension. The code has always
# plotted 8 items while the title and comments said "Top 5"; the count now
# lives in one place and the title is derived from it.
top_k = 8

# One small heatmap per latent dimension.
for col in loadings_df.columns:
    # Items with the largest absolute loading on this latent dimension.
    strongest_items = loadings_df[col].abs().sort_values(ascending=False).head(top_k).index
    strongest_loadings = loadings_df.loc[strongest_items, [col]]

    # Small figure so that several of them fit on screen.
    fig, ax = plt.subplots(figsize=(6, 3))
    sns.heatmap(strongest_loadings, annot=True, cmap='coolwarm', center=0, cbar=True, ax=ax)
    ax.set_title(f"Top {top_k} CBCL Loadings for {col}")
    ax.set_xlabel("Latent Dimension")
    ax.set_ylabel("CBCL Item")
    fig.tight_layout()
    plt.show()

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# One tall heatmap with every item's loading on every latent dimension.
fig, ax = plt.subplots(figsize=(20, 80))
sns.heatmap(loadings_df, annot=True, cmap='coolwarm', center=0, ax=ax)
ax.set_title("CBCL Loadings Heatmap")
ax.set_xlabel("Latent Dimensions")
ax.set_ylabel("CBCL Items")
fig.tight_layout()
plt.show()


In [None]:
%matplotlib inline
# NOTE(review): `model` is never assigned in the visible notebook - both
# candidate assignments below are commented out - so on a fresh kernel the
# `model.to(device)` call raises NameError. Assign the encoder to explain
# before running this cell.
# model = autoencoder.get_model().encoder
# model =model.Encoder()

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
model.eval()
# Background set for the explainer: 100 training rows drawn without replacement.
background = X_train[np.random.choice(X_train.shape[0], 100, replace=False)]

explainer = shap.DeepExplainer(model, torch.tensor(background, dtype=torch.float32).to(device), )

# Compute SHAP values for the first 20 test rows.
shap_values = explainer.shap_values(torch.tensor(X_test[:20], dtype=torch.float32).to(device))

In [None]:
import shap
import matplotlib.pyplot as plt
import io
from PIL import Image

# `shap_values` is indexed below as [sample, feature, latent factor], so the
# number of factors and of explained samples is read from its shape instead of
# being hard-coded (previously 5 factors and X_test[:20]).
n_factors = shap_values.shape[2]
shap_samples = X_test[:shap_values.shape[0]]

# Common x-axis range, so the per-factor plots are directly comparable.
shap_min, shap_max = shap_values.min(), shap_values.max()

# Each SHAP summary plot is rendered to an in-memory PNG and shown afterwards.
images = []

for i in range(n_factors):
    # 1) Create the SHAP summary plot on a brand-new figure
    shap.summary_plot(shap_values[:, :, i], shap_samples, feature_names=items, show=False)
    plt.xlim(shap_min, shap_max)

    # 2) Grab that just-created figure object
    tmp_fig = plt.gcf()

    # 3) Save it to a buffer in PNG format
    buf = io.BytesIO()
    tmp_fig.savefig(buf, format='png', bbox_inches='tight', dpi=300)
    buf.seek(0)

    # 4) Convert buffer -> PIL image and store
    images.append(Image.open(buf))

    # 5) Close that figure so it does not overlap the next iteration
    plt.close(tmp_fig)

# Master figure: one row per factor in a single column, one inch of height
# per row (20 x 5 inches for five factors). squeeze=False keeps `axes` a 2-D
# array even when there is only one factor.
# NOTE(review): dpi=1000 on a 20-inch-wide figure renders a very large image;
# confirm that this resolution is really needed.
fig, axes = plt.subplots(nrows=n_factors, ncols=1, figsize=(20, n_factors), dpi=1000, squeeze=False)

for idx, ax in enumerate(axes.ravel()):
    # 6) Display each PIL image in its own subplot
    ax.imshow(images[idx])
    ax.set_axis_off()
    ax.set_title(f"Factor {idx}")

plt.tight_layout()
plt.show()
