In [1]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from pycaret.classification import *
import os
import joblib
import shutil
import warnings
import matplotlib.pyplot as plt
import shap
import pandas as pd
from rdkit import Chem
from rdkit.Chem import MACCSkeys

warnings.filterwarnings('ignore', category=UserWarning, module='lightgbm')

# 세팅

## Data preprocessing

In [3]:
raw = pd.read_csv('../data/preprocessed/filtered_FTO_training_total_ignore3D_False.csv')
raw = raw[[col for col in raw.columns if not col.startswith('X')]]
raw['source'].value_counts()

def get_maccs(smiles):
    mol = Chem.MolFromSmiles(smiles)
    if mol is None:
        return [None] * 167
    return list(MACCSkeys.GenMACCSKeys(mol))

maccs_df = pd.DataFrame(
    raw['canonical_SMILES'].apply(get_maccs).tolist(),
    columns=[f'X{i}' for i in range(167)]
)

maccs_md = pd.concat([raw, maccs_df], axis=1)

active_df = maccs_md[maccs_md['source'].isin(['active', 'assay_inactive'])]
decoy_df = maccs_md[maccs_md['source'] == 'decoy']

n_active = len(active_df)
decoy_5x = decoy_df.sample(n=n_active * 5, random_state=42)
decoy_10x = decoy_df.sample(n=n_active * 10, random_state=42)

maccs_5x = pd.concat([active_df, decoy_5x]).reset_index(drop=True)
maccs_10x = pd.concat([active_df, decoy_10x]).reset_index(drop=True)

## 사용할 descriptor

In [4]:
selected_descriptor = pd.read_csv('../data/descriptor_selection.csv')

file_md_list = {}
for column in selected_descriptor.columns:
    filename = column
    selected_columns = selected_descriptor[column].iloc[0:].dropna().tolist()
    if filename and selected_columns:
        file_md_list[filename] = selected_columns

## 저장 함수

In [5]:
def safe_move_file(src, dst):
    if os.path.exists(src):
        try:
            os.makedirs(os.path.dirname(dst), exist_ok=True)
            shutil.move(src, dst)
            return True
        except Exception as e:
            print(f"파일 이동 실패: {src} -> {dst}, 에러: {e}")
            return False
    return False

def safe_save_csv(df, path):
    try:
        os.makedirs(os.path.dirname(path), exist_ok=True)
        df.to_csv(path, index=False)
        return True
    except Exception as e:
        print(f"CSV 저장 실패: {path}, 에러: {e}")
        return False

In [6]:
default_filenames = {
    'auc': 'AUC.png',
    'confusion_matrix': 'Confusion Matrix.png',
    'learning': 'Learning Curve.png',
    'feature': 'Feature Importance.png',
    'error': 'Prediction Error.png',
    'calibration': 'Calibration Curve.png'
}

target_models = ['lr', 'et', 'gbc', 'lightgbm', 'svm', 'rf', 'ada']

In [7]:
data_dir = os.path.join("..", "data", "preprocessed")
result_dir = os.path.join("..", "result")

# 모델 학습 및 평가

In [None]:
all_blended_results = {}

for ratio in ['5x', '10x']:
    file_name = f'descriptors_filtered_FTO_training_{ratio}_ignore3D_False.csv'
    base_path = f'FTO_MACCS/{ratio}_w3D'

    if ratio == '5x':
        df = maccs_5x
    else:
        df = maccs_10x
    print(f"{'='*50}{base_path} start{'='*50}")

    full_result_path = os.path.join(result_dir, base_path)
    models_dir      = os.path.join(full_result_path, "models")
    plots_dir       = os.path.join(full_result_path, "plots")
    blend_models_dir = os.path.join(full_result_path, "blend_models")
    blend_plots_dir  = os.path.join(full_result_path, "blend_plots")
    shap_dir        = os.path.join(full_result_path, "SHAP")

    for dir_path in [full_result_path, models_dir, plots_dir, blend_models_dir, blend_plots_dir, shap_dir]:
        os.makedirs(dir_path, exist_ok=True)

    md_cols = file_md_list[file_name]
    fp_cols = [f'X{i+1}' for i in range(166)]
    filtered_df = df[['potency'] + fp_cols + md_cols]

    X = filtered_df.drop('potency', axis=1)
    Y = filtered_df['potency']

    x_train, x_test, y_train, y_test = train_test_split(X, Y, test_size=0.1, random_state=42, stratify=Y)
    df_train = pd.concat([x_train, y_train], axis=1)

    setup(
        data=df_train,
        target='potency',
        session_id=42,
        train_size=0.9,
        fold=10,
        normalize=True,
        fix_imbalance=True,
        remove_outliers=True,
        n_jobs=1,
        verbose=False,
    )

    # 전처리 파이프라인 (SHAP용)
    pipeline = get_config('pipeline')
    X_train_transformed = pipeline.transform(x_train)
    X_test_transformed  = pipeline.transform(x_test)

    if hasattr(X_test_transformed, 'columns'):
        feature_names = X_test_transformed.columns.tolist()
        X_train_df = X_train_transformed
        X_test_df  = X_test_transformed
    else:
        feature_names = [f'f{i}' for i in range(X_test_transformed.shape[1])]
        X_train_df = pd.DataFrame(X_train_transformed, columns=feature_names)
        X_test_df  = pd.DataFrame(X_test_transformed,  columns=feature_names)

    # ── 개별 모델 학습 ──────────────────────────────────────
    summary_data = []
    trained_models = {}
    individual_results = {}

    for model_id in target_models:
        print(f"Creating & tuning {model_id}...")
        if model_id == 'lightgbm':
            lgb_params = {
                'boosting_type': 'gbdt', 'num_leaves': 31, 'learning_rate': 0.1,
                'feature_fraction': 0.8, 'bagging_fraction': 0.8, 'bagging_freq': 5,
                'min_child_samples': 20, 'random_state': 42, 'n_estimators': 100, 'verbosity': -1
            }
            model = create_model(model_id, verbose=False, **lgb_params)
        else:
            model = create_model(model_id, verbose=False)

        tuned_model = tune_model(
            model, optimize='F1', n_iter=50, fold=5, choose_better=True, verbose=False
        )
        results = pull()
        model_name = tuned_model.__class__.__name__

        trained_models[model_id] = tuned_model
        individual_results[model_id] = results

        joblib.dump(tuned_model, os.path.join(models_dir, f"{ratio}_{model_name}_model.pkl"))
        safe_save_csv(results, os.path.join(full_result_path, f"{ratio}_{model_name}_evaluation.csv"))
        safe_save_csv(results, os.path.join(full_result_path, f"{ratio}_{model_id}_individual_results.csv"))
        print(f"{model_id} ({model_name}) 저장 완료 - F1: {results['F1'].mean():.4f}")

        for plot_type, default_name in default_filenames.items():
            try:
                plot_model(tuned_model, plot=plot_type, save=True, verbose=False)
                if os.path.exists(default_name):
                    shutil.move(default_name, os.path.join(plots_dir, f"{ratio}_{model_name}_{plot_type}.png"))
            except Exception as e:
                with open(os.path.join(plots_dir, f"{ratio}_{model_name}_{plot_type}_error.txt"), 'w') as f:
                    f.write(f"Plot Type: {plot_type}\nModel: {model_name}\nError: {str(e)}\n")

        numeric_cols = results.select_dtypes(include=[np.number]).columns
        avg_row = results[numeric_cols].mean().to_dict()
        std_row = results[numeric_cols].std().to_dict()
        avg_row.update({'Model': model_name, 'Type': 'Mean'})
        std_row.update({'Model': model_name, 'Type': 'Std'})
        summary_data.extend([avg_row, std_row])

    if summary_data:
        combined_summary = pd.DataFrame(summary_data)
        safe_save_csv(combined_summary, os.path.join(full_result_path, f"{ratio}_summary_evaluation.csv"))

    # ── F1 기준 상위 4개 모델 선정 ─────────────────────────
    model_f1_scores = {mid: individual_results[mid]['F1'].mean() for mid in individual_results}
    top4_model_ids  = sorted(model_f1_scores, key=model_f1_scores.get, reverse=True)[:4]
    print(f"\n[{ratio}] 블렌딩 선정 모델 (F1 기준 상위 4개):")
    for mid in top4_model_ids:
        print(f"  {mid}: {model_f1_scores[mid]:.4f}")

    fresh_models = [trained_models[mid] for mid in top4_model_ids]
    model_names  = [trained_models[mid].__class__.__name__ for mid in top4_model_ids]

    # ── 블렌딩 ─────────────────────────────────────────────
    print(f"Blending {ratio}...")
    blended_model = blend_models(
        estimator_list=fresh_models, verbose=False, fold=3, method='soft'
    )
    blend_results = pull()

    joblib.dump(blended_model, os.path.join(blend_models_dir, f"{ratio}_blended_model2.pkl"))
    safe_save_csv(blend_results, os.path.join(full_result_path, f"{ratio}_blend_evaluation.csv"))

    test_data = pd.concat([x_test, y_test], axis=1)
    try:
        final_predictions = predict_model(blended_model, data=test_data, verbose=False)
        final_metrics = pull()
        safe_save_csv(final_metrics, os.path.join(full_result_path, f"{ratio}_blend_final_metrics.csv"))
        print("최종 예측 완료")
    except Exception as e:
        print(f"Final prediction failed: {e}")
        final_metrics = None

    for plot_type in ['auc', 'confusion_matrix', 'learning', 'feature']:
        try:
            plot_model(blended_model, plot=plot_type, save=True, verbose=False)
            default_name = default_filenames.get(plot_type, f'{plot_type}.png')
            if os.path.exists(default_name):
                safe_move_file(default_name, os.path.join(blend_plots_dir, f"{ratio}_blend_{plot_type}.png"))
        except Exception as e:
            with open(os.path.join(blend_plots_dir, f"{ratio}_blend_{plot_type}_error.txt"), 'w') as f:
                f.write(f"Plot Type: {plot_type}\nModel: Blended\nError: {str(e)}\n")

    # ── SHAP (AUC 기준 Best 모델) ───────────────────────────
    mean_df  = combined_summary[combined_summary['Type'] == 'Mean']
    best_row = mean_df.loc[mean_df['AUC'].idxmax()]
    best_model_name = best_row['Model']
    best_model_id   = [mid for mid, m in trained_models.items()
                       if m.__class__.__name__ == best_model_name][0]
    best_model = trained_models[best_model_id]
    print(f"\nSHAP - Best 모델: {best_model_name} (AUC: {best_row['AUC']:.4f})")

    if 'LogisticRegression' in best_model_name:
        explainer = shap.LinearExplainer(
            best_model, X_train_df, feature_perturbation="correlation_dependent"
        )
        shap_values_class1 = explainer.shap_values(X_test_df)
        print("LinearExplainer 사용")
    else:
        try:
            explainer = shap.TreeExplainer(best_model)
            shap_values = explainer.shap_values(X_test_df)
            shap_values_class1 = shap_values[1] if isinstance(shap_values, list) else shap_values
            print("TreeExplainer 사용")
        except Exception as e:
            print(f"TreeExplainer 실패: {e} -> KernelExplainer 사용")
            X_background = shap.sample(X_train_df, 50)
            explainer = shap.KernelExplainer(best_model.predict_proba, X_background)
            shap_values = explainer.shap_values(X_test_df, nsamples=100)
            if isinstance(shap_values, list):
                shap_values_class1 = shap_values[1]
            elif isinstance(shap_values, np.ndarray) and shap_values.ndim == 3:
                shap_values_class1 = shap_values[:, :, 1]
            else:
                shap_values_class1 = shap_values
            print("KernelExplainer 사용")

    plt.figure(figsize=(12, 8))
    shap.summary_plot(shap_values_class1, X_test_df, feature_names=feature_names, show=False, max_display=20)
    plt.tight_layout()
    plt.savefig(os.path.join(shap_dir, f'shap_{best_model_name}_summary.png'), dpi=300, bbox_inches='tight')
    plt.close()

    plt.figure(figsize=(10, 8))
    shap.summary_plot(shap_values_class1, X_test_df, feature_names=feature_names,
                      plot_type='bar', show=False, max_display=20)
    plt.tight_layout()
    plt.savefig(os.path.join(shap_dir, f'shap_{best_model_name}_bar.png'), dpi=300, bbox_inches='tight')
    plt.close()

    mean_abs_shap = np.abs(shap_values_class1).mean(axis=0)
    if mean_abs_shap.ndim > 1:
        mean_abs_shap = mean_abs_shap.mean(axis=-1)
    mean_abs_shap = mean_abs_shap.flatten()
    pd.DataFrame({
        'feature': feature_names,
        'mean_abs_shap': mean_abs_shap
    }).sort_values('mean_abs_shap', ascending=False).to_csv(
        os.path.join(shap_dir, f'shap_{best_model_name}_importance.csv'), index=False
    )
    print(f"SHAP 저장 완료 -> {shap_dir}")

    all_blended_results[ratio] = {
        'blend_results': blend_results,
        'final_metrics': final_metrics,
        'model_names': model_names,
        'top4_model_ids': top4_model_ids,
        'model_f1_scores': model_f1_scores,
        'individual_results': individual_results,
        'best_shap_model': best_model_name
    }

    print(f"{'='*50}{base_path} completed{'='*50}")

# ── 최종 요약 출력 ──────────────────────────────────────────
print("\n전체 결과 요약:")
for ratio, res in all_blended_results.items():
    print(f"\n[{ratio}]")
    print(f"  블렌딩 모델: {', '.join(res['top4_model_ids'])}")
    if 'F1' in res['blend_results'].columns:
        print(f"  블렌드 평균 F1: {res['blend_results']['F1'].mean():.4f}")
    print(f"  SHAP Best 모델: {res['best_shap_model']}")
    print("  개별 모델 F1:")
    for mid in res['top4_model_ids']:
        print(f"    {mid}: {res['model_f1_scores'][mid]:.4f}")

Creating & tuning lr...
lr (LogisticRegression) 저장 완료 - F1: 0.6514
Creating & tuning et...
et (ExtraTreesClassifier) 저장 완료 - F1: 0.6982
Creating & tuning gbc...
gbc (GradientBoostingClassifier) 저장 완료 - F1: 0.7171
Creating & tuning lightgbm...
lightgbm (LGBMClassifier) 저장 완료 - F1: 0.7144
Creating & tuning svm...
svm (SGDClassifier) 저장 완료 - F1: 0.6426
Creating & tuning rf...
rf (RandomForestClassifier) 저장 완료 - F1: 0.6844
Creating & tuning ada...
ada (AdaBoostClassifier) 저장 완료 - F1: 0.6515

[5x] 블렌딩 선정 모델 (F1 기준 상위 4개):
  gbc: 0.7171
  lightgbm: 0.7144
  et: 0.6982
  rf: 0.6844
Blending 5x...
최종 예측 완료

SHAP - Best 모델: ExtraTreesClassifier (AUC: 0.8387)
TreeExplainer 사용
SHAP 저장 완료 -> ..\result\FTO_Final/5x_w3D\SHAP
Creating & tuning lr...
lr (LogisticRegression) 저장 완료 - F1: 0.6141
Creating & tuning et...
et (ExtraTreesClassifier) 저장 완료 - F1: 0.6806
Creating & tuning gbc...
gbc (GradientBoostingClassifier) 저장 완료 - F1: 0.7018
Creating & tuning lightgbm...
lightgbm (LGBMClassifier) 저장 완료 - F1

<Figure size 1200x800 with 0 Axes>

<Figure size 1000x800 with 0 Axes>