In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.neural_network import MLPClassifier
from catboost import CatBoostClassifier
from joblib import dump
import xgboost as xgb
import os as OS
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV
from scipy.stats import randint, uniform
import shap
import json

Dataset

In [None]:
def dataset_import(file_path, mode, feature_name:list, os_tag:str, dfs_tag:str):
    dataset = pd.read_excel(file_path)
    included_col = feature_name[:]
    if os_tag != "":
        included_col.append(os_tag)
    if dfs_tag != "":
        included_col.append(dfs_tag)
    dataset.dropna(inplace=True, subset=included_col)
    dataset.head()

    assert mode == "all" or "hr_pos" or "hr_neg"
    if mode == "hr_pos":
        dataset = dataset[dataset["HR"]==1]
    elif mode == "hr_neg":
        dataset = dataset[dataset["HR"]==0]
    elif mode == "all":
        pass

    data = dataset[feature_name].values.tolist()
    if os_tag != "":
        os = dataset[os_tag].values.tolist()
    else:
        os = []
    
    if dfs_tag != "":
        dfs = dataset[dfs_tag].values.tolist()
    else:
        dfs = []
    return data, os, dfs

Models

In [None]:
def random_forest(data, target, feature_name, output_dir="", mode="", is_save=False, is_plot=False):
    x_train, x_test, y_train, y_test = train_test_split(data, target, test_size=0.2, stratify=target, random_state=240225)

    param_dist = {
        'n_estimators': randint(50, 200),
        'max_depth': [None] + list(range(3, 20)),
        'min_samples_split': randint(2, 11),
        'min_samples_leaf': randint(1, 11),
        'bootstrap': [True, False],
        'criterion': ['gini', 'entropy']
    }
    rf_model = RandomForestClassifier()
    random_search = RandomizedSearchCV(estimator=rf_model, param_distributions=param_dist, n_iter=100, cv=5, n_jobs=50, scoring="roc_auc")
    random_search.fit(x_train, y_train)

    with open(f"./best_param/randomForest_{mode}.json", "w") as file:
        file.write(json.dumps(random_search.best_params_))

    print(f"5-Fold Evaluation roc_auc_score: {random_search.best_score_}")

    # rf_class = RandomForestClassifier(**random_search.best_params_)
    # rf_class.fit(x_train, y_train)
    rf_class = random_search.best_estimator_
    y_pred = rf_class.predict_proba(x_test)[:,1]

    print(f"Test roc_auc_score: {roc_auc_score(y_test, y_pred)}")

    if is_plot:
        # feature_name = ["Her-2 status", "Age", "Histologic type", "Intravascular cancer thrombus", "Endocrine therapy", "Radiation therapy", "Her-2 target therapy", "Ki67", "Stage"]
        explainer = shap.TreeExplainer(rf_class)
        data_pd = pd.DataFrame(data, columns=feature_name)
        shap_values = explainer.shap_values(data_pd)
        print("Random Forest Shap Plot")
        shap.summary_plot(shap_values[1], data_pd)
        # for class_index in range(len(shap_values)):
        #     shap.summary_plot(shap_values[class_index], data_pd)

    if is_save:
        dump(rf_class, f"{output_dir}/randomForest_{mode}.joblib")

In [None]:
def xgboost(data, target, feature_name,  output_dir="", mode="", is_save=False, is_plot=False):
    x_train, x_test, y_train, y_test = train_test_split(data, target, test_size=0.2, stratify=target,random_state=240225)

    param_dist = {
        'n_estimators': randint(50, 200),
        'max_depth': randint(3, 20),
        'learning_rate': uniform(0.01, 0.3),
        'subsample': uniform(0.5, 0.5),
        'colsample_bytree': uniform(0.5, 0.5)
    }

    xgb_model = xgb.XGBClassifier()
    random_search = RandomizedSearchCV(estimator=xgb_model, param_distributions=param_dist, n_iter=100, cv=5, n_jobs=50, scoring="roc_auc")
    random_search.fit(x_train, y_train)

    with open(f"./best_param/xgBoost_{mode}.json", "w") as file:
        file.write(json.dumps(random_search.best_params_))

    print(f"5-Fold Evaluation roc_auc_score: {random_search.best_score_}")

    # bst = xgb.XGBClassifier(**random_search.best_params_)
    # bst.fit(x_train, y_train)
    bst = random_search.best_estimator_
    y_pred = bst.predict_proba(x_test)[:,1]

    print(f"Test roc_auc_score: {roc_auc_score(y_test, y_pred)}")

    if is_plot:
        # feature_name = ["Her-2 status", "Age", "Histologic type", "Intravascular cancer thrombus", "Endocrine therapy", "Radiation therapy", "Her-2 target therapy", "Ki67", "Stage"]
        explainer = shap.TreeExplainer(bst)
        data_pd = pd.DataFrame(data, columns=feature_name)
        shap_values = explainer(data_pd)
        print("XGBoost Shap Plot")
        shap.summary_plot(shap_values, data_pd)

    if is_save:
        bst.save_model(f"{output_dir}/xgBoost_{mode}.json")

In [None]:
def catboost(data, target, feature_name,  output_dir="", mode="", is_save=False, is_plot=False):
    x_train, x_test, y_train, y_test = train_test_split(data, target, test_size=0.2, stratify=target,random_state=240225)
    
    param_dist = {
        'iterations': randint(50, 200),
        'depth': randint(3, 10),
        'learning_rate': uniform(0.01, 0.1),
        'l2_leaf_reg': uniform(1, 10),
        'border_count': randint(32, 255)
    }

    catboost_model = CatBoostClassifier()
    random_search = RandomizedSearchCV(estimator=catboost_model, param_distributions=param_dist, n_iter=100, cv=5, n_jobs=50, scoring="roc_auc")
    random_search.fit(x_train, y_train)

    with open(f"./best_param/catBoost_{mode}.json", "w") as file:
        file.write(json.dumps(random_search.best_params_))

    print(f"5-Fold Evaluation roc_auc_score: {random_search.best_score_}")

    best_catboost_model = random_search.best_estimator_
    y_pred = best_catboost_model.predict_proba(x_test)[:,1]

    print(f"Test roc_auc_score: {roc_auc_score(y_test, y_pred)}")

    if is_plot:
        # feature_name = ["Her-2 status", "Age", "Histologic type", "Intravascular cancer thrombus", "Endocrine therapy", "Radiation therapy", "Her-2 target therapy", "Ki67", "Stage"]
        explainer = shap.TreeExplainer(best_catboost_model)
        data_pd = pd.DataFrame(data, columns=feature_name)
        shap_values = explainer(data_pd)
        print("CatBoost Shap Plot")
        shap.summary_plot(shap_values, data_pd)

    if is_save:
        best_catboost_model.save_model(f"{output_dir}/catBoost_{mode}.json")

In [None]:
def mlp(data, target, feature_name,  output_dir="", mode="", is_save=False, is_plot=False):
    x_train, x_test, y_train, y_test = train_test_split(data, target, test_size=0.2, stratify=target,random_state=240225)

    param_dist = {
        'hidden_layer_sizes': [(20,40,40,20), (40, 40), (20, 40, 20), (20, 30, 30, 20)],
        'activation': ['logistic', 'tanh', 'relu'],
        'solver': ['sgd', 'adam'],
        'alpha': uniform(0.0001, 0.1),
        'learning_rate_init': uniform(0.001, 0.01)
    }

    mlp_model = MLPClassifier(max_iter=1000)
    random_search = RandomizedSearchCV(estimator=mlp_model, param_distributions=param_dist, n_iter=100, cv=5, n_jobs=50, scoring="roc_auc")
    random_search.fit(x_train, y_train)

    with open(f"./best_param/mlp_{mode}.json", "w") as file:
        file.write(json.dumps(random_search.best_params_))

    print(f"5-Fold Evaluation roc_auc_score: {random_search.best_score_}")

    best_mlp_model = random_search.best_estimator_
    y_pred = best_mlp_model.predict_proba(x_test)[:,1]

    print(f"Test roc_auc_score: {roc_auc_score(y_test, y_pred)}")

    if is_plot:
        # feature_name = ["Her-2 status", "Age", "Histologic type", "Intravascular cancer thrombus", "Endocrine therapy", "Radiation therapy", "Her-2 target therapy", "Ki67", "Stage"]
        data_pd = pd.DataFrame(data, columns=feature_name)
        
        explainer = shap.KernelExplainer(best_mlp_model.predict_proba, np.array(x_train))
        shap_values = explainer.shap_values(np.array(x_test), nsamples=100)
        print("MLP Shap Plot")
        shap.summary_plot(shap_values[1], np.array(data_pd), feature_names=feature_name)

    if is_save:
        dump(best_mlp_model, f"{output_dir}/mlp_{mode}.joblib")

In [None]:
from sklearn.model_selection import cross_val_score
from bayes_opt import BayesianOptimization

def mlp_bayes(data, target, feature_name,  output_dir="", mode="", is_save=False, is_plot=False):
    x_train, x_test, y_train, y_test = train_test_split(data, target, test_size=0.2, stratify=target,random_state=240225)

    # 定义一个目标函数，用于贝叶斯优化
    def objective_function(layer_0, layer_1, layer_2, lr, alpha):
        model = MLPClassifier(hidden_layer_sizes=(int(layer_0), int(layer_1), int(layer_2)), learning_rate_init=lr, alpha=alpha, random_state=42, max_iter=1000)
        scores = cross_val_score(model, x_train, y_train, cv=5, scoring='roc_auc')
        return scores.mean()

    param_bounds = {
        'layer_0': (12, 30),
        'layer_1': (20, 40),
        'layer_2': (20, 40),
        'lr': (0.001, 0.1),
        'alpha': (0.01, 1),
    }

    optimizer = BayesianOptimization(
        f=objective_function,
        pbounds=param_bounds,
        random_state=42
    )

    optimizer.maximize(
        init_points=10,
        n_iter=40
    )

    best_params = optimizer.max['params']

    with open(f"./best_param/mlp_{mode}.json", "w") as file:
        file.write(json.dumps(best_params))

    print(f"5-Fold Evaluation roc_auc_score: {objective_function(best_params['layer_0'], best_params['layer_1'], best_params['layer_2'], best_params['lr'], best_params['alpha'])}")

    best_mlp_model = MLPClassifier(hidden_layer_sizes=(int(best_params['layer_0']), int(best_params['layer_1']), int(best_params['layer_2'])), learning_rate_init=best_params['lr'], alpha=best_params['alpha'], random_state=42, max_iter=1000)
    best_mlp_model.fit(x_train, y_train)
    y_pred = best_mlp_model.predict_proba(x_test)[:,1]

    print(f"Test roc_auc_score: {roc_auc_score(y_test, y_pred)}")

    if is_plot:
        # feature_name = ["Her-2 status", "Age", "Histologic type", "Intravascular cancer thrombus", "Endocrine therapy", "Radiation therapy", "Her-2 target therapy", "Ki67", "Stage"]
        data_pd = pd.DataFrame(data, columns=feature_name)
        
        explainer = shap.KernelExplainer(best_mlp_model.predict_proba, np.array(x_train))
        shap_values = explainer.shap_values(np.array(x_test), nsamples=100)
        print("MLP Shap Plot")
        shap.summary_plot(shap_values[1], np.array(data_pd), feature_names=feature_name)

    if is_save:
        dump(best_mlp_model, f"{output_dir}/mlp_{mode}.joblib")

Main

In [None]:
feature_name = ["Her-2 status", "Age", "Histologic type", "Intravascular cancer thrombus", "Endocrine therapy", "Radiation therapy", "Her-2 target therapy", "Ki67", "Stage"]
mode = "all"
output_dir = "./model_dict_os5"
file_path = "./dataset.xlsx"
os_tag = "OS (5 years)"
dfs_tag = ""
# dfs_tag = "DFS (10 years)"

if not OS.path.exists(output_dir):
    OS.makedirs(output_dir)

data, os, dfs = dataset_import(
    file_path=file_path,
    mode=mode,
    feature_name=feature_name,
    os_tag=os_tag,
    dfs_tag=dfs_tag
)

print(f"Dataset import suceess. Mode is {mode}")

In [None]:
mlp_bayes(data=data, target=os, feature_name=feature_name, mode=mode, output_dir=output_dir, is_plot=True, is_save=True)

In [None]:
random_forest(data=data, target=os, feature_name=feature_name, mode=mode, output_dir=output_dir, is_plot=True, is_save=True)

In [None]:
xgboost(data=data, target=os, feature_name=feature_name, mode=mode, output_dir=output_dir, is_plot=True, is_save=True)

In [None]:
catboost(data=data, target=os, feature_name=feature_name, mode=mode, output_dir=output_dir, is_plot=True, is_save=True)

In [None]:
mlp(data=data, target=os, feature_name=feature_name, mode=mode, output_dir=output_dir, is_plot=False, is_save=False)