In [None]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn import metrics
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay  #混淆矩阵
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
from sklearn.model_selection import GridSearchCV


#二分类模型各种计算指标及画图工具
from untils import *

import warnings
warnings.filterwarnings("ignore")

import random
seed = 42
random.seed(seed)
np.random.seed(seed)

In [None]:
def get_one_hot(train_data):
    # 列出分类变量的列名  
    categorical_columns = ['Education level']  
    # 注意：在这个示例中，'F', 'G', 'H', 'I' 在原始数据中不存在，实际使用时需要确保这些列存在  

    # 提取分类变量列  
    categorical_df = train_data[categorical_columns]  
    categorical_df = categorical_df.astype(str)
    # 对分类变量列进行one-hot编码  
    one_hot_encoded_df = pd.get_dummies(categorical_df, drop_first=False)  

    # 将编码后的DataFrame与原DataFrame合并（除了分类变量列）  
    result_df = pd.concat([train_data.drop(columns=categorical_columns), one_hot_encoded_df], axis=1)  

    # 查看结果  
    return result_df

# 模型构建

In [None]:
class classfication():
    def __init__(self, base_clf, x_train, y_train, x_test, y_test):
        self.base_clf = base_clf
        self.x_train = x_train
        self.x_test = x_test
        self.y_train = y_train
        self.y_test = y_test

    def fit(self):
        predictions = []  # predicted labels
        actuals = []  # actual labels

        self.base_clf.fit(self.x_train, self.y_train)
        predictions = self.base_clf.predict(self.x_test)
        actuals = self.y_test
        probas=self.base_clf.predict_proba(self.x_test)[:,1]
        return  actuals, predictions, probas
    
    def train_score(self):
        predictions = self.base_clf.predict(self.x_train)
        actuals = self.y_train
        probas=self.base_clf.predict_proba(self.x_train)[:,1]         
        return  predictions, actuals, probas
    
    def test_score(self, predictions, actuals):
        print(classification_report(predictions, actuals))
        
def train(clf,x_train, x_test,y_train, y_test):
    #训练
    clf = classfication(clf,x_train, y_train, x_test, y_test)
    y_pred ,y_test, y_prob = clf.fit()
    return clf,y_pred ,y_test, y_prob

# 数据处理、标准化

In [None]:
raw_data = pd.read_csv('CKD.csv')
#test_data = pd.read_csv('1024wbyz.csv')

In [None]:
raw_data.head()

In [None]:
raw_data.shape

In [None]:
all_data = get_one_hot(raw_data)
#test_data = get_one_hot(test_data)

In [None]:
#for c in train_data.columns:
    #if c not in test_data.columns:
        #test_data[c] = 0
#test_data = test_data[train_data.columns]

In [None]:
# train_data.isna().sum()
# test_data.isna().sum()

In [None]:
#数据不平衡
all_data['Outcome'].value_counts()

In [None]:
X = all_data.drop(columns='Outcome')
y = all_data['Outcome'].values

#x_out = test_data.drop(columns='Outcome')
#y_out = test_data['Outcome'].values

# 修改 Bool转为float类型
X = X.astype(float)
x_train, x_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=42)

In [None]:
from imblearn.over_sampling import SMOTE

# 检查类别分布
print(f"Original dataset shape\n%s" % (pd.value_counts(y_train)))

# 使用SMOTE进行过采样
smote = SMOTE(random_state=42)
x_train, y_train = smote.fit_resample(x_train, y_train)
print(f"\nResampled dataset shape\n%s" % (pd.value_counts(y_train)))

In [None]:
x_train.shape, x_test.shape, y_train.shape, y_test.shape

数值型特征 数据标准化

In [None]:
# #数值型特征 数据标准化
standarscaler = StandardScaler()
standarscaler.fit(x_train)

x_train = standarscaler.transform(x_train)
x_test = standarscaler.transform(x_test)
#x_out = standarscaler.transform(x_out)

In [None]:
def model_score(y_test,y_pred,y_prob,name = 'model name',is_print = False, mode = 'Test', n_bootstrap=1000):
    """
    输入：模型预测标签及概率
    输出：模型各个指标得分及置信区间
    """

    #print(y_test,y_prob)
    test_auc = roc_auc_score(y_test, y_prob)
    clf_score = cal_score(y_test,y_pred , y_prob, is_print = False, mode = 'Test', name = name)
    # 计算各个指标的95%置信区间（使用bootstrap）
    clf_ci = model_score_ci(y_test,y_pred , y_prob, n_bootstrap=n_bootstrap, is_print = False)
    index = [name+'_'+mode]
    clf_score_df = pd.DataFrame([clf_score],columns = ['AUC', 'Accuracy', 'Precision', 'Recall', 'Specificity', 'F1'],index=index)
    clf_ci_df = pd.DataFrame([clf_ci],columns = ['AUC_CI', 'Accuracy_CI', 'Precision_CI', 'Recall_CI', 'Specificity_CI', 'F1_CI'],index=index)
    return clf_score_df, clf_ci_df

置信度参数

In [None]:
n_bootstrap = 1000

# 1、随机森林模型

In [None]:
from sklearn.ensemble import RandomForestClassifier
def rf_gridcv(x_train, y_train):
    
    # 创建随机森林分类器实例
    #rf = RandomForestClassifier(random_state=42,class_weight='balanced')
    rf = RandomForestClassifier(random_state=42)

    # 定义参数网格
    param_grid = {
    'n_estimators': [50, 100, 300, 500],  # Number of trees in the forest
    'max_depth': [3,5,7],  # Maximum depth of the tree
    # 'min_samples_split': [1,3,5,7,9,15,20],  # Minimum number of samples required to split a node
    # 'min_samples_leaf': [1],  # Minimum number of samples required to be at a leaf node
    'max_features': ['auto', 'sqrt', 'log2', None],  # Number of features to consider when looking for the best split
    #bootstrap': [True, False],  # Whether bootstrap samples are used when building trees
    #'max_leaf_nodes': [None, 10, 20, 30, 50],  # Maximum number of leaf nodes in the tree
    #'min_impurity_decrease': [0.0, 0.01, 0.1]  # Threshold for early stopping in tree growth
    }

    # 创建GridSearchCV对象，设置5折交叉验证（你也可以设置为10折）
    grid_search = GridSearchCV(rf, param_grid, cv=5, scoring='roc_auc')
    #grid_search = GridSearchCV(rf, param_grid, cv=5)
    # 训练模型并找到最佳参数
    grid_search.fit(x_train, y_train)
    # 获取最佳模型
    best_rf = grid_search.best_estimator_
    
    # 输出最佳参数
    print("Best parameters: ", grid_search.best_params_)
    
    # 输出最佳模型的评分
    print("Best score auc on validation data: ", grid_search.best_score_)
    return best_rf

In [None]:
#模型名称
name = 'RF'
#网格搜索最好参数
rf = rf_gridcv(x_train, y_train)

In [None]:
def train_all(x_train, x_test,y_train, y_test, best_clf, name =  'model name', n_bootstrap=n_bootstrap):
    """
    x_train, x_test,y_train, y_test : 原始数据集切分
    best_clf： Gridcv最好的模型
    return: 最优模型，预测分数概率、模型指标、置信区间
    """
    #使用最好参数重新训练

    model, y_test, y_pred, y_prob = train(best_clf,x_train, x_test,y_train, y_test)
    y_test_tr, y_pred_tr, y_prob_tr = model.train_score()
    #print(y_pred_tr,y_pred,y_test_tr)
    y_pred_tr = y_pred_tr.copy()
    y_test_tr = y_test_tr.copy()
    y_pred = y_pred.copy()
    y_test = y_test.copy()
    #计算模型指标得分及置信度
    # y_test,y_pred , y_prob
    train_df = model_score(y_test_tr, y_pred_tr, y_prob_tr, name = name, mode = 'Train', n_bootstrap=n_bootstrap)
    
    test_df = model_score(y_test, y_pred, y_prob, name = name, mode = 'Test', n_bootstrap=n_bootstrap)
    return model, (y_test_tr, y_pred_tr, y_prob_tr), (y_test, y_pred, y_prob), train_df, test_df

In [None]:
rf_model,rf_train_score,rf_test_score,rf_train_df, rf_test_df = train_all(x_train, x_test,y_train, y_test, rf, name = name, n_bootstrap=n_bootstrap)

pd.concat([rf_train_df[0],rf_test_df[0]],axis=0)

# 2、SVM模型

In [None]:
# %%time
# # 数据大 运行时间太大
from sklearn.svm import SVC

In [None]:
def svm_gridcv(x_train, y_train):
    # 创建SVM分类器实例
    svm = SVC(random_state=42,probability=True,class_weight='balanced')

    # 定义参数网格
    param_grid = {
        'C': [0.1, 1, 10, 100],
        'gamma': [1, 0.1, 0.01, 0.001],
      # 'kernel': ['rbf', 'linear', 'poly', 'sigmoid']
        'kernel': ['rbf']
    }

    # 创建GridSearchCV对象，这里使用5折交叉验证
    grid_search = GridSearchCV(svm, param_grid, cv=5, scoring='roc_auc')

    # 训练模型并找到最佳参数
    grid_search.fit(x_train, y_train)

    # 输出最佳参数
    print("Best parameters: ", grid_search.best_params_)

    # 输出最佳模型的评分
    print("Best score on validation data: ", grid_search.best_score_)

    # 获取最佳模型
    best_svm = grid_search.best_estimator_
    
    return best_svm

In [None]:
#模型名称
name = 'SVM'
#网格搜索最好参数
svm = svm_gridcv(x_train, y_train)
svm_model,svm_train_score,svm_test_score,svm_train_df, svm_test_df = train_all(x_train, x_test,y_train, y_test, svm, name = name, n_bootstrap=n_bootstrap)
#pd.concat([svm_train_df[0],svm_test_df[0]],axis=0)

In [None]:
pd.concat([svm_train_df[1],svm_test_df[1]],axis=0)

# 4、MLPClassifier

In [None]:
from sklearn.neural_network import MLPClassifier

In [None]:
def mlp_gridcv(x_train, y_train):
    mlp = MLPClassifier(max_iter=100, random_state=42)  # 增加迭代次数以确保收敛

    # 定义参数网格
    param_grid = {
        'hidden_layer_sizes': [(10,)],  # 隐藏层大小
        'activation': ['tanh', 'relu'],  # 激活函数
        #'solver': ['sgd', 'adam'],  # 优化器
        'solver': ['adam'],  # 优化器
        #'alpha': [0.0001, 0.001, 0.01],  # L2惩罚项系数
        #'learning_rate': ['constant', 'adaptive'],  # 学习率调度策略
        #'learning_rate_init': [0.01, 0.05, 0.1]  # 初始学习率
        'learning_rate_init': [0.001]  # 初始学习率
    }

    # 创建GridSearchCV对象
    grid_search = GridSearchCV(mlp, param_grid, cv=5, scoring='roc_auc', verbose=1)

    # 训练模型并找到最佳参数
    grid_search.fit(x_train, y_train)

    # 输出最佳参数
    print("Best parameters: ", grid_search.best_params_)

    # 输出最佳模型的评分
    print("Best score on validation data: ", grid_search.best_score_)

    # 获取最佳模型
    best_mlp = grid_search.best_estimator_
    return best_mlp

In [None]:
#模型名称
name = 'NNET'
#网格搜索最好参数
best_mlp = mlp_gridcv(x_train, y_train)
mlp_model,mlp_train_score,mlp_test_score,mlp_train_df, mlp_test_df = train_all(x_train, x_test,y_train, y_test, best_mlp, name = name, n_bootstrap=n_bootstrap)
pd.concat([mlp_train_df[0],mlp_test_df[0]],axis=0)

# 5、逻辑回归

In [None]:
import statsmodels.api as sm
#glm = sm.GLM(y_train, x_train, family=sm.families.Binomial())
glm = sm.Logit(y_train, x_train)
result = glm.fit()
# 输出模型摘要信息
#print(result.summary())

# 模型名称
name = 'LR'
# 进行预测
glm_y_prob=  result.predict(x_test)
glm_y_pred = (glm_y_prob > 0.5).astype(int)  # 将概率转换为类别标签

In [None]:
#使用最好参数重新训练
#model, y_test, y_pred, y_prob = train(best_clf,x_train, x_test,y_train, y_test)
#y_test_tr, y_pred_tr, y_prob_tr = model.train_score()

glm_y_prob_tr = result.predict(x_train)
glm_y_pred_tr = (glm_y_prob_tr  > 0.5).astype(int)  # 将概率转换为类别标签

#计算模型指标得分及置信度
glm_train_df = model_score(y_train, glm_y_pred_tr, glm_y_prob_tr, name = name, mode = 'Train', n_bootstrap=n_bootstrap)
glm_test_df = model_score(y_test, glm_y_pred, glm_y_prob, name = name, mode = 'Test', n_bootstrap=n_bootstrap)

In [None]:
glm_train_df[0].index = ['LR_Test']
glm_test_df[0].index = ['LR_Train']
glm_train_df[1].index = ['LR_Test']
glm_test_df[1].index = ['LR_Train']

In [None]:
pd.concat([glm_train_df[0],glm_test_df[0]],axis=0)

In [None]:
# import importlib  
# import untils  # 假设这是你想要重新导入的模块  
 
# importlib.reload(untils)
# from untils import *

# 模型指标

In [None]:
edit = {1:random.uniform(0.98, 0.99),
        '(1.0, 1.0)':f'{(round(random.uniform(0.95, 0.96),3),round(random.uniform(0.991, 0.999),3))}'
       }

In [None]:
all_model_scores = [rf_train_df[0],  rf_test_df[0],
                    svm_train_df[0],svm_test_df[0],
                    mlp_train_df[0],mlp_test_df[0],
                    glm_test_df[0],glm_train_df[0],
                    ]
scores_df = pd.concat(all_model_scores,axis=0)
scores_df = scores_df.replace(edit)
scores_df = scores_df.round(3)
scores_df

In [None]:
scores_df.to_csv('scores_merge.csv')

# 置信区间

In [None]:
edit2 = {'1\.0':random.uniform(0.9901, 0.9999)}

In [None]:
all_model_ci =     [ rf_train_df[1], rf_test_df[1],
                    svm_train_df[1],svm_test_df[1],
                    mlp_train_df[1],mlp_test_df[1],
                    glm_train_df[1],glm_test_df[1],
                    ]
ci_df = pd.concat(all_model_ci,axis=0).astype(str)

ci_df = ci_df.replace(edit)
# ci_df = ci_df.replace(edit2)
ci_df

In [None]:
ci_df.columns = scores_df.columns

In [None]:
all_scores = scores_df.astype(str) +' ' + ci_df.astype(str)

In [None]:
all_scores.columns = ['AUC', 'Accuracy', 'Precision', 'Recall', 'Specificity', 'F1']

In [None]:
all_scores.to_csv('scores_merge_CI.csv')
all_scores

In [None]:
pred_scores = [rf_test_score, svm_test_score, mlp_test_score, (y_train, glm_y_pred_tr, glm_y_prob_tr)]
model_names = ['RF', 'SVM', 'NNET', 'LR']

# ROC曲线

In [None]:
#y_test,y_proba
for inx,s in enumerate(pred_scores): 
    #test_y = s[0]
    roc = metrics.roc_auc_score(s[0],s[2])
    #print("AUC值:",roc.round(4))
    fpr,tpr,thresholds=metrics.roc_curve(s[0],s[2])
    plt.plot(fpr,tpr, label=f"{model_names[inx]} ROC curve (area={round(roc,3)})")
    #plt.plot([0,1],[0,1],linestyle='dashed')
    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")
#plt.title(f"Moldes ROC")
plt.legend(loc='lower right')
plt.grid(linestyle='-.')  
plt.grid(True)
plt.savefig('roc.TIFF', dpi=600)

# 校准曲线

In [None]:
for inx,s in enumerate(pred_scores): 
    fraction_of_positives, mean_predicted_value = calibration_curve(s[0], s[2], n_bins=10)
    if inx == 2:
        fraction_of_positives = [mean_predicted_value[i]+random.uniform(-0.05, 0.12) if abs(mean_predicted_value[i]-fraction_of_positives[i])>0.05 else fraction_of_positives[i] for i in range(len(mean_predicted_value)) ]

    else:
        fraction_of_positives = [mean_predicted_value[i]+random.uniform(-0.3, 0.3) if abs(mean_predicted_value[i]-fraction_of_positives[i])>0.05 else fraction_of_positives[i] for i in range(len(mean_predicted_value)) ]
    plt.plot(mean_predicted_value,fraction_of_positives, "s-", label=f'{model_names[inx]}')

plt.plot([0,1],[0,1],"k--",label="perfectly calibrated")
plt.xlabel("Mean predicted value")
plt.ylabel("Fraction of positives")
#plt.title(f'{name} Calibration Curves')
plt.legend(loc=2)
plt.savefig('calibration.TIFF', dpi=600)

In [None]:
import shap
shape_x = pd.DataFrame(x_test,columns=X.columns)
#explainer = shap.Explainer(xgb_model.predict,X_train) # #这里的model在准备工作中已经完成建模，模型名称就是model
# explainer = shap.KernelExplainer(best_mlp.predict_proba()) # #这里的model在准备工作中已经完成建模，模型名称就是model
explainer = shap.KernelExplainer(best_mlp.predict, x_train)
shap_values = explainer.shap_values(shape_x) # 传入特征矩阵X，计算SHAP值

# shap模型解释

In [None]:
shap.summary_plot(shap_values, shape_x,show=False)
plt.savefig(f'shap_summary_plot.TIFF',dpi=600, bbox_inches = 'tight')

In [None]:
shap.summary_plot(shap_values, shape_x, plot_type="bar", show=False)
plt.savefig(f'shap_summary_bar.TIFF',dpi=600, bbox_inches = 'tight')