<a href="https://colab.research.google.com/github/MO230101/The-codes-for-hydrogel-study-/blob/main/RFE%EF%BC%8BSymbolic_learn_for_HSQC_ver_3_for_paper.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#個々のデータをCSVで出力（採用ver.2）
import pandas as pd
from sklearn.feature_selection import RFE
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
import numpy as np

def perform_rfe_and_evaluate_and_save(X, y, original_df, file_name_prefix):
    """
    RFEを実行し、指定された特徴量数(9-12)で抽出された特徴量を個別のCSVファイルに保存し、
    そのモデルの予測精度も評価する関数。

    このバージョンでは、RFEを全データで学習し、step=0.5で、n_features_to_select=9〜12を探索する。
    出力CSVに元のファイルのインデックスと'CA1'列を含める。
    """
    # データの欠損値チェックと補完
    if X.isnull().values.any():
        print(f"Warning: NaN values found in {file_name_prefix}. Imputing with the mean.")
        X = X.fillna(X.mean())

    # --- RFEのパラメータ設定 ---
    step = 0.5
    min_features_to_explore = 9
    max_features_to_explore = 12
    actual_max_features = X.shape[1]

    if actual_max_features < min_features_to_explore:
        print(f"Warning: {file_name_prefix} has only {actual_max_features} features, which is less than the requested minimum of {min_features_to_explore}.")
        return

    start_feature_count = max(1, min_features_to_explore)
    end_feature_count = min(actual_max_features, max_features_to_explore)

    if start_feature_count > end_feature_count:
        print(f"Warning: Invalid feature range for {file_name_prefix}. Start: {start_feature_count}, End: {end_feature_count}. Skipping RFE.")
        return

    print(f"--- Exploring features for {file_name_prefix} ({start_feature_count} to {end_feature_count} features) with step={step} ---")

    for n_features_to_select in range(start_feature_count, end_feature_count + 1):
        estimator = GradientBoostingClassifier(random_state=42)
        rfe = RFE(estimator=estimator, n_features_to_select=n_features_to_select, step=step)

        # RFEを全データで学習
        rfe.fit(X, y)

        # 選択された特徴量のデータフレームを作成
        selected_features_df = pd.DataFrame(rfe.transform(X), columns=X.columns.values[rfe.support_], index=X.index)

        # --- ここから評価ロジック ---
        # RFEで選択された特徴量でモデルを再学習
        X_selected = rfe.transform(X)
        model = GradientBoostingClassifier(random_state=42)
        model.fit(X_selected, y)

        # 予測と評価（全データで評価するため過剰適合の可能性あり）
        y_pred = model.predict(X_selected)
        y_prob = model.predict_proba(X_selected)[:, 1]

        accuracy = accuracy_score(y, y_pred)
        precision = precision_score(y, y_pred, zero_division=0)
        recall = recall_score(y, y_pred, zero_division=0)
        f1 = f1_score(y, y_pred, zero_division=0)
        auc = roc_auc_score(y, y_prob)

        print(f"  N_Features: {n_features_to_select:2d} | F1-score: {f1:.4f} | AUC: {auc:.4f}")

        # --- 元のCSVの最初の2列（インデックスとCA1列）を抽出して結合 ---
        # `original_df`にはインデックスが設定されていることを前提とする
        # 'CA1'列をコピー
        initial_columns = original_df[['CA1']].copy()

        # 抽出された特徴量と最初の2列を結合
        # `selected_features_df`と`initial_columns`は同じインデックスを持つため、そのまま結合可能
        output_df = pd.concat([initial_columns, selected_features_df], axis=1)

        # 出力ファイル名を生成
        output_file_name = f"{file_name_prefix.replace('.csv', '')}_RFE_N{n_features_to_select}_step{int(step*10)}.csv"

        # CSVファイルとして保存
        output_df.to_csv(output_file_name) # デフォルトでインデックスが保存される
        print(f"    Saved features for N={n_features_to_select} to {output_file_name}")


def main():
    # ファイルパスの定義
    file1_path = '****.csv'
    file2_path = '****.csv'

    # ファイル1の読み込みとRFE実行
    print(f"Processing {file1_path}...")
    try:
        df1 = pd.read_csv(file1_path, index_col=0)
        X1 = df1.drop(['CA1'], axis=1)
        y1 = df1['CA1']

        perform_rfe_and_evaluate_and_save(X1, y1, df1, file1_path)
    except FileNotFoundError:
        print(f"Error: {file1_path} not found. Please ensure the file is in the correct directory.")

    # ファイル2の読み込みとRFE実行
    print(f"\nProcessing {file2_path}...")
    try:
        df2 = pd.read_csv(file2_path, index_col=0)
        X2 = df2.drop(['CA1'], axis=1)
        y2 = df2['CA1']

        perform_rfe_and_evaluate_and_save(X2, y2, df2, file2_path)
    except FileNotFoundError:
        print(f"Error: {file2_path} not found. Please ensure the file is in the correct directory.")

if __name__ == "__main__":
    main()

In [None]:
#しきい値設定、上位3位を表示
# The code for HSQC
import pandas as pd
import numpy as np
!pip install gplearn
from gplearn.genetic import SymbolicRegressor
from sklearn.model_selection import train_test_split, ParameterSampler
from sklearn.metrics import accuracy_score, recall_score, precision_score, f1_score, roc_auc_score, r2_score
from sklearn.feature_selection import SelectKBest, f_regression
import random
from gplearn.functions import make_function

# ランダムシード固定
random_seed = 42
random.seed(random_seed)
np.random.seed(random_seed)

# データ読み込み
data = pd.read_csv('****.csv')
if 'Unnamed: 0' in data.columns:
    X = data.drop(['CA1', 'Unnamed: 0'], axis=1)
else:
    X = data.drop('CA1', axis=1)
y = data['CA1']

# データの欠損値チェックと補完
if X.isnull().values.any():
    print("Warning: NaN values found in the dataset. Imputing with the mean.")
    X = X.fillna(X.mean())

# データ分割
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, random_state=random_seed, stratify=y
)

X_train_scaled_df = X_train.copy()
X_test_scaled_df = X_test.copy()

# カスタム関数定義 (NaN 対策を含む)
def protected_square(x):
    return np.where(np.abs(x) < 1e10, x**2, 1e10)

def multiply2(x1, x2):
    return x1 * x2

def safe_divide(x1, x2):
    return np.where(np.abs(x2) < 1e-6, 1.0, x1 / x2)

def multiply3(x1, x2, x3):
    return x1 * x2 * x3

def protected_exp(x):
    return np.where(x < 100, np.exp(x), 1e10)

def protected_log(x):
    return np.where(x > 0, np.log(x), -1e10)

square_function = make_function(function=protected_square, arity=1, name='square')
multiply2_function = make_function(function=multiply2, arity=2, name='mul2')
safe_divide_function = make_function(function=safe_divide, arity=2, name='div')
multiply3_function = make_function(function=multiply3, arity=3, name='mul3')
exp_function = make_function(function=protected_exp, arity=1, name='exp')
log_function = make_function(function=protected_log, arity=1, name='log')

# 改善版：関数セットと複数の候補値を含むパラメータグリッド
function_set = ['add', 'sub', 'mul', 'abs', log_function, 'inv', 'neg',
                square_function,
                multiply2_function, safe_divide_function,
                multiply3_function,
                exp_function]

param_grid = {
    'n_features_to_select': [10, 20],  # 特徴量数を複数試す
    'function_set': [function_set],
    'population_size': [1000, 2000, 5000],  # 複数の候補値
    'generations': [100, 300, 500],  # 複数の候補値
    'tournament_size': [5, 10],  # 複数の候補値
    'stopping_criteria': [0.05],
    'p_crossover': [0.95],
    'p_subtree_mutation': [0.01],
    'p_hoist_mutation': [0.01],
    'p_point_mutation': [0.01],
    'max_samples': [0.9, 1.0],
    'parsimony_coefficient': [0.01],
}

# パラメータサンプリング
n_iter = 5 # 試行回数を増やして、より多くの組み合わせを試す
param_sampler = ParameterSampler(param_grid, n_iter=n_iter, random_state=random_seed)

all_results = []
# 0.1から0.9まで、0.1刻みの閾値で検証する
thresholds = np.arange(0.1, 1.0, 0.1)

for i, params in enumerate(param_sampler):
    print(f"\n--- Symbolic Regression Trial {i+1}/{n_iter} ---")
    print(f"Parameters: {params}")

    # 特徴量選択
    selector = SelectKBest(score_func=f_regression, k=params['n_features_to_select'])
    X_train_sel = selector.fit_transform(X_train_scaled_df, y_train)
    X_test_sel = selector.transform(X_test_scaled_df)
    selected_features = X_train.columns[selector.get_support(indices=True)].tolist()

    # モデル構築
    est = SymbolicRegressor(
        random_state=random_seed,
        function_set=params['function_set'],
        metric='mse',
        population_size=params['population_size'],
        generations=params['generations'],
        tournament_size=params['tournament_size'],
        stopping_criteria=params['stopping_criteria'],
        p_crossover=params['p_crossover'],
        p_subtree_mutation=params['p_subtree_mutation'],
        p_hoist_mutation=params['p_hoist_mutation'],
        p_point_mutation=params['p_point_mutation'],
        max_samples=params['max_samples'],
        parsimony_coefficient=params['parsimony_coefficient'],
        n_jobs=-1,
        feature_names=selected_features
    )

    est.fit(X_train_sel, y_train)
    y_pred = est.predict(X_test_sel)

    # 閾値の検証
    max_f1_for_current_model = -1.0
    best_threshold_for_current_model = None
    best_metrics_at_threshold = {}

    for threshold in thresholds:
        y_pred_binary = (y_pred > threshold).astype(int)
        f1 = f1_score(y_test, y_pred_binary, zero_division=0)

        if f1 > max_f1_for_current_model:
            max_f1_for_current_model = f1
            best_threshold_for_current_model = threshold
            best_metrics_at_threshold = {
                'accuracy': accuracy_score(y_test, y_pred_binary),
                'precision': precision_score(y_test, y_pred_binary, zero_division=0),
                'recall': recall_score(y_test, y_pred_binary, zero_division=0),
                'f1_score': f1
            }

    auc = roc_auc_score(y_test, y_pred)

    # モデルごとの結果を格納
    all_results.append({
        'rank_metric': max_f1_for_current_model + auc, # F1とAUCの合計でランキング
        'f1_score': max_f1_for_current_model,
        'auc': auc,
        'equation': str(est._program),
        'features': selected_features,
        'best_threshold': best_threshold_for_current_model,
        'metrics_at_best_threshold': best_metrics_at_threshold,
        'hyperparameters': {
            'n_features_to_select': params['n_features_to_select'],
            'population_size': params['population_size'],
            'generations': params['generations'],
            'tournament_size': params['tournament_size'],
            'p_crossover': params['p_crossover'],
            'p_subtree_mutation': params['p_subtree_mutation'],
            'p_hoist_mutation': params['p_hoist_mutation'],
            'p_point_mutation': params['p_point_mutation']
        }
    })

# 結果をランキング指標でソート
all_results.sort(key=lambda x: x['rank_metric'], reverse=True)

# トップ3のモデルを表示
print("\n" + "="*80)
print("--- Symbolic Regression: Top 3 Equations ---")
print("="*80)

for rank, result in enumerate(all_results[:3], 1):
    print(f"\n--- Rank {rank} Equation ---")
    print(f"  Equation: {result['equation']}")
    print(f"  Features: {result['features']}")
    print(f"  AUC: {result['auc']:.4f}")
    print(f"  Best F1-score: {result['f1_score']:.4f} (at threshold {result['best_threshold']:.1f})")
    print(f"  Other Metrics (at best threshold):")
    print(f"    Accuracy: {result['metrics_at_best_threshold']['accuracy']:.4f}")
    print(f"    Precision: {result['metrics_at_best_threshold']['precision']:.4f}")
    print(f"    Recall: {result['metrics_at_best_threshold']['recall']:.4f}")
    print(f"  Hyperparameters:")
    for key, value in result['hyperparameters'].items():
        print(f"    {key}: {value}")