In [None]:
from AMPpred_MFA.lib.Data import *
from AMPpred_MFA.lib.Visualization import colorful_print, current_time, draw_roc
from AMPpred_MFA.lib.Encoding import AAC
from AMPpred_MFA.lib.Visualization import *
from sklearn.ensemble import (
    RandomForestClassifier,
    AdaBoostClassifier,
    StackingClassifier,
)
from sklearn.tree import DecisionTreeClassifier
from xgboost import XGBClassifier
from sklearn.linear_model import LogisticRegression
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt
import joblib
import matplotlib as mpl

mpl.rcParams.update({"font.size": 26})  # 设置全局字体大小
CPU_NUM_CORES = joblib.cpu_count(only_physical_cores=True)
print(f"Number of physical cores: {CPU_NUM_CORES}")

In [None]:
DT = "Decision Tree"
RF = "Random Forest"
ADABOOST = "AdaBoost"
XGBOOST = "XGBoost"
STACKING = "Stacking"

params = {
    DT: {
        "criterion": "gini",
        "max_depth": 6,
        "max_features": None,
        "min_samples_leaf": 4,
        "min_samples_split": 10,
    },
    RF: {
        "criterion": "entropy",
        "max_depth": None,
        "max_features": "sqrt",
        "min_samples_leaf": 1,
        "min_samples_split": 2,
        "n_estimators": 300,
    },
    ADABOOST: {"algorithm": "SAMME.R", "learning_rate": 0.1, "n_estimators": 300},
    XGBOOST: {
        "colsample_bytree": 0.6,
        "gamma": 0,
        "learning_rate": 0.1,
        "max_depth": 10,
        "min_child_weight": 1,
        "n_estimators": 300,
        "subsample": 1,
    },
}

models = {
    DT: DecisionTreeClassifier(),
    RF: RandomForestClassifier(),
    ADABOOST: AdaBoostClassifier(),
    XGBOOST: XGBClassifier(),
    STACKING: StackingClassifier(
        estimators=[
            (RF, RandomForestClassifier(**params[RF])),
            (ADABOOST, AdaBoostClassifier(**params[ADABOOST])),
            (XGBOOST, XGBClassifier(**params[XGBOOST])),
        ],
        final_estimator=LogisticRegression(),
    ),
}


In [None]:
# 训练并返回模型
def ensembles_training(models: dict, file_path_train, feature_function):
    X_train, y_train = build_dataset_from_format(file_path_train, feature_function)
    for name in models:
        print("{} is training...".format(name))
        models[name].fit(X_train, y_train)
    return models



# 测试模型并返回评估指标
def ensembles_testing(models: dict, file_path_test, feature_function, roc_flag=False):
    test_metrics = {}
    test_roc = {}
    X_test, y_test = build_dataset_from_format(file_path_test, feature_function)
    for name in models:
        print("{} is testing...".format(name))
        cr = {}
        model = models[name]
        y_pred = model.predict(X_test)
        y_pred_proba = model.predict_proba(X_test)[:, 1]
        cm = metrics.confusion_matrix(y_test, y_pred)
        auc = metrics.roc_auc_score(y_test, y_pred_proba)
        TN, FP, FN, TP = cm[0, 0], cm[0, 1], cm[1, 0], cm[1, 1]
        cr["AUC"] = auc
        cr["Acc"] = (TN + TP) / (TN + FP + FN + TP)
        cr["MCC"] = ((TP * TN) - (FP * FN)) / (
            np.sqrt((TP + FN) * (TP + FP) * (TN + FP) * (TN + FN))
        )
        cr["Sn"] = TP / (TP + FN)
        cr["Sp"] = TN / (TN + FP)
        cr["CM"] = str(cm.tolist())
        test_metrics[name] = cr

        fpr, tpr, _ = metrics.roc_curve(y_test, y_pred_proba)
        test_roc[name] = (auc, fpr, tpr)
    if roc_flag:
        return test_metrics, test_roc
    return test_metrics



# 将模型训练并测试一次，返回测试集上的性能评估指标
def ensembles_training_testing(
    models: dict, file_path_train, file_path_test, feature_function
):
    models = ensembles_training(models, file_path_train, feature_function)
    test_metrics = ensembles_testing(models, file_path_test, feature_function)


    return test_metrics


# 绘制不同数据集上的roc，key为方法名，value为3元组:(auc,[tpr],[fpr])
def draw_rocs(rocs, title="ROC curve"):
    colors = plt.get_cmap("tab20").colors
    colors_dict = {
        method: colors[i % len(colors)] for i, method in enumerate(rocs.keys())
    }
    roc_fig = plt.figure(figsize=(12, 12))
    fig = plt.subplot(1, 1, 1)
    fig.set_xlim([0.0, 1.0])
    fig.set_ylim([0.0, 1.0])
    fig.set_title(title, fontsize=36)
    fig.set_xlabel("False Positive Rate")
    fig.set_ylabel("True Positive Rate")
    fig.plot([0, 1], [0, 1], linestyle="--", lw=2, color="r", label="Random", alpha=0.8)

    roc_auc = [(method, rocs[method][0]) for method in rocs.keys()]
    roc_auc.sort(key=lambda x: x[1])
    roc_auc = dict(roc_auc)

    for method in roc_auc:
        fpr, tpr = rocs[method][1], rocs[method][2]
        fig.plot(
            fpr,
            tpr,
            lw=2,
            color=colors_dict[method],
            label="{} (AUC = {:.3f})".format(method, roc_auc[method]),
        )

    fig.legend(loc="lower right")
    return roc_fig

### 5次独立重复实验

In [None]:
REPEAT_TIMES = 5
path_train_base = "./dataset/train/{}_trial/train.fasta"
path_test_base = "./dataset/test/our_testset/{}_trial/test.fasta"
path_saving_base = "./result/ensembles_in_{}_trials".format(REPEAT_TIMES)
os.makedirs(path_saving_base, exist_ok=True)

metrics_total = {name: [] for name in models}
for i in range(REPEAT_TIMES):
    print("{}\nThe {} times training and testing...".format("=" * 50, i + 1))
    file_path_train = path_train_base.format(i + 1)
    file_path_test = path_test_base.format(i + 1)
    metrics_current = ensembles_training_testing(
        models, file_path_train, file_path_test, AAC
    )
    for name in metrics_current:
        metrics_total[name].append(metrics_current[name])

for name in models:
    save_path = os.path.join(path_saving_base, name + ".csv")
    df_metrics = pd.DataFrame(metrics_total[name])
    df_mean = []
    for col in df_metrics:
        if col == "CM":
            df_mean.append(
                np.array([eval(i) for i in df_metrics[col].tolist()])
                .mean(axis=0)
                .astype(int)
                .tolist()
            )
        else:
            df_mean.append(
                str(round(df_metrics[col].mean(), 3))
                + "±"
                + str(round(df_metrics[col].std(), 3))
            )
    df_metrics.insert(
        0,
        "Trial Number",
        ["{} th".format(i + 1) for i in range(len(df_metrics))],
        allow_duplicates=False,
    )
    df_metrics.loc[len(df_metrics)] = ["Mean"] + df_mean
    df_metrics.to_csv(save_path, index=False, encoding="ansi")
    print(
        "{}'s training and testing result has been saved in: {}".format(name, save_path)
    )

### 10折交叉验证

In [None]:
FOLD_TIMES = 10
path_train_base = "./dataset/10_fold_validation/{}_fold/train.fasta"
path_test_base = "./dataset/10_fold_validation/{}_fold/test.fasta"
path_saving_base = "./result/ensembles_in_{}_fold_cross_validation".format(
    FOLD_TIMES)
os.makedirs(path_saving_base, exist_ok=True)

metrics_total = {name: [] for name in models}
for i in range(FOLD_TIMES):
    print(
        "{}\nThe {}-fold cross validation training and testing...".format(
            "=" * 50, i + 1
        )
    )
    file_path_train = path_train_base.format(i + 1)
    file_path_test = path_test_base.format(i + 1)
    metrics_current = ensembles_training_testing(
        models, file_path_train, file_path_test, AAC
    )
    for name in metrics_current:
        metrics_total[name].append(metrics_current[name])

for name in models:
    save_path = os.path.join(path_saving_base, name + ".csv")
    df_metrics = pd.DataFrame(metrics_total[name])
    df_mean = []
    for col in df_metrics:
        if col == "CM":
            df_mean.append(
                np.array([eval(i) for i in df_metrics[col].tolist()])
                .mean(axis=0)
                .astype(int)
                .tolist()
            )
        else:
            df_mean.append(
                str(round(df_metrics[col].mean(), 3))
                + "±"
                + str(round(df_metrics[col].std(), 3))
            )
    df_metrics.insert(
        0,
        "Fold Number",
        ["{}".format(i + 1) for i in range(len(df_metrics))],
        allow_duplicates=False,
    )
    df_metrics.loc[len(df_metrics)] = ["Mean"] + df_mean
    df_metrics.to_csv(save_path, index=False, encoding="ansi")
    print(
        "{}'s training and testing result has been saved in: {}".format(
            name, save_path)
    )

### 在Xu等人的数据集上的测试

In [None]:
path_train = "./dataset/train/1_trial/train.fasta"
path_test_base = "./dataset/test/other_testset/"
path_saving_base = "./result/ensembles_in_Xu_testset"
path_saving_roc = os.path.join(path_saving_base, "roc.png")

# 在原始数据集上训练模型
models = ensembles_training(models, path_train, AAC)

# 在xu等人的数据集上测试模型
testset_names = []
metrics_total = {name: [] for name in models}
roc_total = {}
for testset_name in os.listdir(path_test_base):
    print("\n{}{}{}".format("=" * 20, testset_name, "=" * 20))
    file_path_test = os.path.join(path_test_base, testset_name, "test.fasta")
    metrics_current, roc_current = ensembles_testing(
        models, file_path_test, AAC, roc_flag=True
    )
    for name in metrics_current:
        metrics_total[name].append(metrics_current[name])
    roc_total[testset_name] = roc_current
    testset_names.append(testset_name)

# 保存模型的性能评估指标
for name in models:
    save_dir = os.path.join(path_saving_base, name)
    os.makedirs(save_dir, exist_ok=True)
    save_path = os.path.join(save_dir, name + ".csv")
    df_metrics = pd.DataFrame(metrics_total[name])
    df_metrics.insert(
        0,
        "Test set",
        testset_names,
        allow_duplicates=False,
    )

    df_metrics.to_csv(save_path, index=False, encoding="ansi")

    print("{}'s testing result has been saved in: {}".format(name, save_path))

# 绘制ROC曲线
fig_roc, ax = plt.subplots(2, 4, figsize=(20, 10))
ax = ax.flatten()
for i, testset_name in enumerate(roc_total):
    fig_current = draw_rocs(roc_total[testset_name], testset_name)
    fig_current.subplots_adjust(left=0.1, right=0.95, top=0.95, bottom=0.1)
    fig_current.canvas.draw()
    im_current = Image.frombytes(
        "RGB", fig_current.canvas.get_width_height(), fig_current.canvas.tostring_rgb()
    )
    ax[i].imshow(im_current)
    ax[i].set_axis_off()
    plt.close(fig_current)
fig_roc.tight_layout()
plt.subplots_adjust(wspace=0, hspace=0)
path_saving_roc = os.path.join(path_saving_base, "roc.png")
fig_roc.savefig(path_saving_roc, dpi=300, bbox_inches="tight")
print("ROC figure has been saved in: {}".format(path_saving_roc))