In [None]:
import os
import numpy as np
import pandas as pd
import torch
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.metrics import confusion_matrix
from fttransformer_mask import FTTransformer

# ============ Settings ============
# Input CSV and the fraction of rows held out as the test split.
DATA_PATH = "cardio_train.csv"
TEST_RATIO = 0.2

# Column groups fed to the model.
categorical_features = ["gender", "cholesterol", "gluc", "smoke", "alco", "active"]
continuous_features = ["age", "height", "weight", "ap_hi", "ap_lo"]
# Categorical columns that are masked in the base inference and can be
# revealed one at a time.
additional_features = ["cholesterol", "gluc", "smoke", "alco", "active"]

# Base mask: two unmasked leading slots, then one masked slot per additional
# feature, then one unmasked slot per continuous feature.
# NOTE(review): the meaning of the two leading slots depends on the token
# layout of FTTransformer (defined in fttransformer_mask) - confirm there.
MASK_BASE = (
    [False, False]
    + [True] * len(additional_features)
    + [False] * len(continuous_features)
)

DEVICE = torch.device("cpu")  # change to "cuda" to run on a GPU

# Checkpoint path for each fold (fold number -> path).
fold_to_model_path = {
    fold: f"./save_mask_models/best_model_epoch300_fold{fold}_ValAcc{val_acc}.pth"
    for fold, val_acc in enumerate(
        ("0.732", "0.739", "0.731", "0.737", "0.735"), start=1
    )
}
# ============ Main processing ============
def _predict_proba(model, x_cat, x_cont, mask):
    """Return sigmoid(model(x_cat, x_cont, mask)) as a Python float, with gradients disabled."""
    with torch.no_grad():
        return torch.sigmoid(model(x_cat, x_cont, mask)).item()


def _select_best_feature(model, x_cat, x_cont, base_conf, feature_masks,
                         feature_columns, feature_value_probs):
    """Return the additional feature with the largest expected confidence shift.

    For each candidate feature, every value observed in the training split is
    substituted into a copy of ``x_cat``, the model is run with that feature's
    mask, and ``|conf - base_conf|`` is averaged using the value's training
    frequency as weight.

    Args:
        model: The loaded model, already in eval mode.
        x_cat: Categorical inputs of one sample, shape (1, n_categorical).
        x_cont: Continuous inputs of one sample, shape (1, n_continuous).
        base_conf: Confidence obtained with the base mask.
        feature_masks: Mapping feature name -> mask tensor with that feature unmasked.
        feature_columns: Mapping feature name -> column index inside ``x_cat``.
        feature_value_probs: Mapping feature name -> {encoded value: probability}.

    Returns:
        The name of the best feature, or ``None`` if no candidate produced a
        comparable gain (no candidates, or every gain was NaN).
    """
    best_gain = -1.0
    best_feat = None
    for feat, mask in feature_masks.items():
        col = feature_columns[feat]
        expected_gain = 0.0
        for val, prob in feature_value_probs[feat].items():
            x_cat_alt = x_cat.clone()
            x_cat_alt[0, col] = int(val)
            conf_alt = _predict_proba(model, x_cat_alt, x_cont, mask)
            expected_gain += abs(conf_alt - base_conf) * prob
        # Strict comparison keeps the first feature on ties.
        if expected_gain > best_gain:
            best_gain = expected_gain
            best_feat = feat
    return best_feat


def main():
    """Evaluate every fold's checkpoint on the held-out test split.

    Reads ``DATA_PATH``, makes a stratified train/test split, label-encodes the
    categorical columns and standardizes the continuous ones (both fitted on
    the training part only). For each available checkpoint it then compares:

    * the base prediction, made with ``MASK_BASE``; and
    * the "improved" prediction, made after unmasking the single additional
      feature whose expected confidence shift is largest.

    Per-fold accuracies and confusion matrices are printed, followed by
    confusion matrices pooled over all evaluated folds.
    """
    # Load and preprocess the data.
    data = pd.read_csv(DATA_PATH, sep=';')
    X = data.drop(columns=["cardio"])
    y = data["cardio"]

    X_trainval, X_test, _y_trainval, y_test = train_test_split(
        X, y, test_size=TEST_RATIO, random_state=42, stratify=y
    )
    # Work on explicit copies so the column assignments below never write
    # into (or warn about) a view of the original frame.
    X_trainval = X_trainval.copy()
    X_test = X_test.copy()

    for col in categorical_features:
        le = LabelEncoder()
        X_trainval[col] = le.fit_transform(X_trainval[col])
        X_test[col] = le.transform(X_test[col])

    scaler = StandardScaler()
    X_trainval[continuous_features] = scaler.fit_transform(X_trainval[continuous_features])
    X_test[continuous_features] = scaler.transform(X_test[continuous_features])

    # Empirical distribution of each additional feature's encoded values.
    feature_value_probs = {
        feat: X_trainval[feat].value_counts(normalize=True).sort_index().to_dict()
        for feat in additional_features
    }

    # ---- Work that is identical for every fold and every sample ----
    n_categories = [X[cat].nunique() for cat in categorical_features]
    y_true = y_test.tolist()
    n_test = len(y_true)

    # Convert the whole test split once; building a tensor per row from a
    # list of ndarrays is very slow and triggers a PyTorch warning.
    x_cat_all = torch.tensor(
        X_test[categorical_features].to_numpy(dtype=np.int64), dtype=torch.long
    ).to(DEVICE)
    x_cont_all = torch.tensor(
        X_test[continuous_features].to_numpy(dtype=np.float32), dtype=torch.float32
    ).to(DEVICE)

    base_mask = torch.tensor([MASK_BASE], dtype=torch.bool).to(DEVICE)
    # One mask per additional feature, with only that feature unmasked.
    # NOTE(review): assumes slot 2 + k of MASK_BASE corresponds to
    # additional_features[k] - confirm against FTTransformer's token layout.
    feature_masks = {}
    for offset, feat in enumerate(additional_features):
        mask = MASK_BASE.copy()
        mask[2 + offset] = False
        feature_masks[feat] = torch.tensor([mask], dtype=torch.bool).to(DEVICE)
    feature_columns = {
        feat: categorical_features.index(feat) for feat in additional_features
    }

    # Accumulators over all folds.
    all_y = []
    all_base_preds = []
    all_improved_preds = []

    for fold, model_path in fold_to_model_path.items():
        print(f"=== Fold{fold} モデル読み込み: {model_path} ===")
        if not os.path.exists(model_path):
            print(f"❌ モデルが存在しません: {model_path}")
            continue

        # Load the model.
        model = FTTransformer(
            categories=n_categories,
            num_continuous=len(continuous_features),
            dim=64, depth=6, heads=8, ff_dropout=0.2, attn_dropout=0.2
        ).to(DEVICE)
        # NOTE: torch.load unpickles the file; only load trusted checkpoints
        # (consider weights_only=True if the installed torch supports it).
        model.load_state_dict(torch.load(model_path, map_location=DEVICE))
        model.eval()

        base_preds = []
        improved_preds = []

        # Iterating the (N, 1, d) views yields one (1, d) batch per sample.
        for x_cat, x_cont in zip(x_cat_all.unsqueeze(1), x_cont_all.unsqueeze(1)):
            # Base inference.
            base_conf = _predict_proba(model, x_cat, x_cont, base_mask)
            base_preds.append(int(base_conf > 0.5))

            # Expected-contribution search.
            best_feat = _select_best_feature(
                model, x_cat, x_cont, base_conf,
                feature_masks, feature_columns, feature_value_probs,
            )

            # Final inference with the chosen feature revealed; keep the base
            # confidence if no feature could be selected.
            if best_feat is None:
                final_conf = base_conf
            else:
                final_conf = _predict_proba(model, x_cat, x_cont, feature_masks[best_feat])
            improved_preds.append(int(final_conf > 0.5))

        base_correct = sum(pred == label for pred, label in zip(base_preds, y_true))
        improved_correct = sum(pred == label for pred, label in zip(improved_preds, y_true))

        # Per-fold confusion matrices.
        base_cm = confusion_matrix(y_true, base_preds)
        imp_cm = confusion_matrix(y_true, improved_preds)
        print(f"Fold{fold} - base accuracy: {base_correct/n_test:.4f}, "
            f"improved accuracy: {improved_correct/n_test:.4f}")
        print("Base CM:\n", base_cm)
        print("Improved CM:\n", imp_cm)

        # Add to the overall totals.
        all_y.extend(y_true)
        all_base_preds.extend(base_preds)
        all_improved_preds.extend(improved_preds)

    if not all_y:
        # Every checkpoint was missing; there is nothing to aggregate.
        print("No model could be evaluated; skipping overall confusion matrices.")
        return

    # Confusion matrices pooled over all folds.
    overall_base_cm = confusion_matrix(all_y, all_base_preds)
    overall_imp_cm = confusion_matrix(all_y, all_improved_preds)
    print("Overall Base Confusion Matrix:")
    print(overall_base_cm)
    print("Overall Improved Confusion Matrix:")
    print(overall_imp_cm)

# Run the evaluation only when this file is executed directly, not when imported.
if __name__ == "__main__":
    main()



=== Fold1 モデル読み込み: ./save_mask_models/best_model_epoch300_fold1_ValAcc0.732.pth ===


  x_cat = torch.tensor([X_test.iloc[i][categorical_features].values], dtype=torch.long).to(DEVICE)
