In [None]:
# Mount Google Drive at /content/drive (Colab-only API) so files stored on
# Drive can be read through the local filesystem.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
from datetime import datetime

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import xgboost as xgb
from imblearn.combine import SMOTEENN, SMOTETomek
from imblearn.over_sampling import SMOTE, RandomOverSampler
from imblearn.under_sampling import RandomUnderSampler, EditedNearestNeighbours, TomekLinks
from sklearn.base import clone
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import confusion_matrix, roc_auc_score
from sklearn.metrics import roc_curve, auc
from sklearn.model_selection import train_test_split
from sklearn.model_selection import StratifiedKFold, KFold
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from xgboost import XGBClassifier

In [None]:
# Input root on the mounted Drive and output directory for plots.
base_dir = '/content/drive/MyDrive/研究所/雅涵實驗室/給學弟妹/嘉基計畫 For 致鈞&芳瑗/input/1213/'
result_dir = '/content/'

# Feature-set folders under base_dir; uncomment an entry to include it in the run.
dataFolders = [
    # "struct",
    # "unigram(triage_cc)",
    # "unigram(PI)",
    # "bigram(PI)",
    # "CUI(PI)",
    # "struct + unigram(triage_cc)",
    "struct + unigram(PI)",
    # "struct + bigram(PI)",
    # "struct + CUI(PI)",
    # "struct + unigram(triage_cc) + unigram(PI)",
    # "struct + unigram(triage_cc) + bigram(PI)",
    # "struct + unigram(triage_cc) + CUI(PI)"
]

# Resampling strategies to evaluate. Names must match the branches of
# apply_sampling(); any other value means "no resampling".
#sampling_methods = ['SMOTETomek', 'SMOTE', 'RUS', 'ROS']
sampling_methods = ['ROS']

# Classifiers to evaluate, keyed by the name reported in the result tables.
classifiers = {
    # 'XGB': XGBClassifier(colsample_bytree=0.7, learning_rate=0.01, max_depth=7, n_estimators=300, subsample=0.7),
    #'SVM': SVC(probability=True),
    # 'RF': RandomForestClassifier(),
    # 'DecisionTree': DecisionTreeClassifier(max_depth=3, min_samples_leaf=1, min_samples_split=2),
    # Hyperparameters are plain keyword arguments; the earlier `{C=1, ...}`
    # form was a SyntaxError, which left `classifiers` undefined.
    'LogisticRegression': LogisticRegression(C=1, penalty='l1', solver='liblinear'),
    #'KNN': KNeighborsClassifier(),
}

SyntaxError: ignored

In [None]:
def compute_metrics(y_true, y_pred):
    """Return ``(sensitivity, specificity)`` for binary labels encoded as 0/1.

    Args:
        y_true: Ground-truth labels (0 = negative, 1 = positive).
        y_pred: Predicted labels, same encoding and length as ``y_true``.

    Returns:
        Tuple ``(sensitivity, specificity)``. A metric is NaN when its
        denominator is zero (no positives, or no negatives, in ``y_true``).
    """
    # Pinning labels keeps the matrix 2x2 even when only one class occurs in
    # both inputs; without it ravel() yields a single value and the 4-way
    # unpack below raises ValueError.
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred, labels=[0, 1]).ravel()

    positives = tp + fn
    negatives = tn + fp
    sensitivity = tp / positives if positives else float('nan')
    specificity = tn / negatives if negatives else float('nan')
    return sensitivity, specificity

def apply_sampling(X_train, y_train, method):
    """Resample the training data with the named class-balancing strategy.

    Args:
        X_train: Training features.
        y_train: Training labels.
        method: One of ``'SMOTE'``, ``'ROS'``, ``'RUS'`` or ``'SMOTETomek'``.
            Any other value returns the inputs unchanged (no resampling).

    Returns:
        Tuple ``(X_resampled, y_resampled)``.
    """
    seed = 42  # fixed so repeated runs resample identically

    if method == 'SMOTE':
        sampler = SMOTE(sampling_strategy=1.0, random_state=seed)
    elif method == 'ROS':
        sampler = RandomOverSampler(sampling_strategy=1.0, random_state=seed)
    elif method == 'RUS':
        sampler = RandomUnderSampler(sampling_strategy=1.0, random_state=seed)
    elif method == 'SMOTETomek':
        # SMOTETomek clones a user-supplied `smote` as-is, so its own
        # random_state never reaches it; seed the inner SMOTE explicitly,
        # otherwise this branch is not reproducible.
        sampler = SMOTETomek(
            sampling_strategy='auto',
            random_state=seed,
            smote=SMOTE(random_state=seed),
            tomek=TomekLinks(),
        )
    else:
        # Unknown method name: pass the data through untouched.
        return X_train, y_train

    X_resampled, y_resampled = sampler.fit_resample(X_train, y_train)
    return X_resampled, y_resampled

def trainClassifier(X_train, y_train, sampling_method, clf):
    """Resample the training data and fit a fresh copy of ``clf`` on it.

    Args:
        X_train: Training features.
        y_train: Training labels.
        sampling_method: Strategy name forwarded to ``apply_sampling``.
        clf: Scikit-learn compatible estimator used as a template. It is
            NOT fitted in place.

    Returns:
        A newly fitted estimator, or ``None`` if resampling or fitting raised
        (the error is printed).
    """
    try:
        X_train_resampled, y_train_resampled = apply_sampling(X_train, y_train, sampling_method)

        # Fit an unfitted clone rather than the caller's instance. Fitting in
        # place made every fold return the very same object, so a model kept
        # from an earlier fold was silently overwritten by later folds.
        fitted_clf = clone(clf)
        fitted_clf.fit(X_train_resampled, y_train_resampled)

        return fitted_clf

    except Exception as e:
        print(f"Error in trainClassifier for method {sampling_method}: {str(e)}")
        return None

def get_results_on_test(clf, X_test, y_test, sampling_method):
    """Score a fitted classifier on an evaluation set.

    Args:
        clf: Fitted estimator exposing ``predict_proba``.
        X_test: Evaluation features.
        y_test: Evaluation labels.
        sampling_method: Accepted for call-site symmetry; not used here.

    Returns:
        ``(auc, sensitivity, specificity, tn, fp, fn, tp, y_test, proba)``
        where ``proba`` is the second ``predict_proba`` column. If anything
        raises, the error is printed and a tuple of nine ``None`` is returned.
    """
    failure = (None,) * 9

    try:
        # Second predict_proba column, thresholded at 0.5 for hard labels.
        positive_scores = clf.predict_proba(X_test)[:, 1]
        hard_labels = (positive_scores > 0.5).astype(int)

        sens, spec = compute_metrics(y_test, hard_labels)
        auc_value = roc_auc_score(y_test, positive_scores)
        tn, fp, fn, tp = confusion_matrix(y_test, hard_labels).ravel()
    except Exception as e:
        print(f"Error in get_results_on_test: {str(e)}")
        return failure

    return auc_value, sens, spec, tn, fp, fn, tp, y_test, positive_scores

def plot_and_save_roc_curve(y_true, y_pred_proba, dataFolder, method, result_dir):
    """Plot the ROC curve, save it as a PNG under ``result_dir`` and show it.

    Args:
        y_true: Ground-truth binary labels.
        y_pred_proba: Predicted scores for the positive class.
        dataFolder: Feature-set name; used in the title and file name.
        method: Sampling-method name; used in the title and file name.
        result_dir: Directory the PNG is written to.
    """
    fpr, tpr, _ = roc_curve(y_true, y_pred_proba)
    roc_auc = auc(fpr, tpr)

    # Explicit figure/axes so this figure can be closed afterwards; the
    # function is called inside nested loops and open figures would pile up.
    fig, ax = plt.subplots(figsize=(10, 7))
    try:
        lw = 2
        ax.plot(fpr, tpr, color='darkorange', lw=lw, label='ROC curve (area = %0.2f)' % roc_auc)
        ax.plot([0, 1], [0, 1], color='navy', lw=lw, linestyle='--')  # chance diagonal
        ax.set_xlim([0.0, 1.0])
        ax.set_ylim([0.0, 1.05])
        ax.set_xlabel('False Positive Rate')
        ax.set_ylabel('True Positive Rate')
        plot_title = f'ROC for {dataFolder}_{method}'
        ax.set_title(plot_title)
        ax.legend(loc="lower right")

        # The title doubles as the file name.
        plot_filename = result_dir + '/' + plot_title + '.png'
        fig.savefig(plot_filename)

        plt.show()
    finally:
        # Release the figure even if saving or showing fails.
        plt.close(fig)

def printResults(train_results_df, test_results_df, dataFolders, sampling_methods):
    """Placeholder for exporting the result tables; currently emits nothing.

    Only a ``YYYYmmddHHMMSS`` timestamp is computed. The CSV export that
    would use it is disabled (kept below for reference).

    Args:
        train_results_df: Cross-validation results table.
        test_results_df: Test-set results table.
        dataFolders: Feature-set names, intended for the output file name.
        sampling_methods: Sampling-method names, intended for the file name.
    """
    timestamp = f"{datetime.today():%Y%m%d%H%M%S}"

    # Disabled CSV export:
    # train_results_df.to_csv(result_dir + '/' + str(dataFolders) + '_' + str(sampling_methods) + '_train_results_' + timestamp + '.csv', index=False, sep=',', encoding='utf-8-sig')
    # test_results_df.to_csv(result_dir + '/' + str(dataFolders) + '_' + str(sampling_methods) + '_test_results_' + timestamp + '.csv', index=False, sep=',', encoding='utf-8-sig')

In [None]:
def main():
    """Cross-validate every (data folder, classifier, sampling method) combination.

    For each combination, a 10-fold stratified CV is run on the training
    split. The fold model with the highest validation AUC is then evaluated
    on the held-out test split and its ROC curve is plotted and saved.

    Reads the module-level ``dataFolders``, ``classifiers``,
    ``sampling_methods``, ``base_dir`` and ``result_dir``.

    Returns:
        ``(train_results_list, test_results_list)``: lists of dict rows. On
        an unexpected error the message is printed and ``([], [])`` is
        returned.
    """
    try:
        train_results_list = []  # per-combination averages of the fold metrics
        test_results_list = []  # held-out test-set results

        for dataFolder in dataFolders:
            X_train = pd.read_csv(base_dir + dataFolder + '/train/X_train_pre.csv')
            X_test = pd.read_csv(base_dir + dataFolder + '/test/X_test_pre.csv')
            # NOTE(review): the y_* files are assumed to hold only the label
            # column; confirm against the data.
            y_train = pd.read_csv(base_dir + dataFolder + '/train/y_train_pre.csv')
            y_test = pd.read_csv(base_dir + dataFolder + '/test/y_test_pre.csv')

            # Remove the pseudo_no column before modelling.
            X_train = X_train.drop('pseudo_no', axis=1)
            X_test = X_test.drop('pseudo_no', axis=1)
            #y_train.drop('Unnamed: 0', axis=1, inplace=True)
            #y_test.drop('Unnamed: 0', axis=1, inplace=True)

            for classifier_name, clf in classifiers.items():
                for sampling_method in sampling_methods:
                    skf = StratifiedKFold(n_splits=10, shuffle=True, random_state=42)
                    fold_results = []
                    best_auc = float('-inf')
                    best_classifier = None
                    best_sampling_method = None

                    for fold, (train_index, valid_index) in enumerate(skf.split(X_train, y_train)):
                        X_train_fold, X_valid_fold = X_train.iloc[train_index], X_train.iloc[valid_index]
                        y_train_fold, y_valid_fold = y_train.iloc[train_index], y_train.iloc[valid_index]

                        # Train on this fold with a fresh, unfitted copy of the
                        # template. Reusing the shared instance would make every
                        # fold return the same object, so the "best" model would
                        # really be whatever the last fold fitted.
                        clf_fold = trainClassifier(X_train_fold, y_train_fold, sampling_method, clone(clf))

                        # Training failed: skip this fold.
                        if clf_fold is None:
                            continue

                        # Evaluate on the validation part of the fold.
                        fold_auc, sensitivity, specificity, *_ = get_results_on_test(clf_fold, X_valid_fold, y_valid_fold, sampling_method)

                        # Evaluation failed (all-None result): skip the fold
                        # instead of crashing on `None > float` below.
                        if fold_auc is None:
                            continue

                        # Record this fold. The 'Train *' keys hold metrics
                        # measured on the validation part of the fold.
                        fold_results.append({
                            'Fold': fold,
                            'Train AUC': fold_auc,
                            'Train Sensitivity': sensitivity,
                            'Train Specificity': specificity,
                        })

                        # Keep the fold model with the best validation AUC.
                        if fold_auc > best_auc:
                            best_auc = fold_auc
                            best_classifier = clf_fold
                            best_sampling_method = sampling_method

                    # Average each metric over the successful folds (NaN if none).
                    metric_names = ['Train AUC', 'Train Sensitivity', 'Train Specificity']
                    avg_results = {
                        metric: (np.mean([fr[metric] for fr in fold_results]) if fold_results else float('nan'))
                        for metric in metric_names
                    }

                    # Append the averaged row to train_results_list.
                    train_results_list.append({
                        'DataFolder': dataFolder,
                        'Classifier': classifier_name,
                        'SamplingMethod': sampling_method,
                        **avg_results
                    })

                    # Explicit None check: estimator truthiness is not reliable
                    # (some estimators define __len__).
                    if best_classifier is not None:
                        # Score the best fold model on the held-out test set.
                        auc_test, sensitivity_test, specificity_test, tn, fp, fn, tp, y_true, y_pred_proba = get_results_on_test(best_classifier, X_test, y_test, best_sampling_method)

                        if auc_test is None:
                            print(f"Skipping test results for {dataFolder} / {classifier_name} / {best_sampling_method}: evaluation failed")
                            continue

                        # NOTE(review): the plot file name omits the classifier
                        # name, so several classifiers on the same data folder
                        # and sampling method overwrite each other's PNG.
                        plot_and_save_roc_curve(y_true, y_pred_proba, dataFolder, best_sampling_method, result_dir)

                        # Append the test-set row to test_results_list.
                        test_results_list.append({
                            'DataFolder': dataFolder,
                            'Classifier': classifier_name,
                            'SamplingMethod': best_sampling_method,
                            'Test AUC': auc_test,
                            'Test Sensitivity': sensitivity_test,
                            'Test Specificity': specificity_test,
                            'TN': tn,
                            'FP': fp,
                            'FN': fn,
                            'TP': tp,
                        })

        # Build DataFrames from the collected rows.
        train_results_df = pd.DataFrame(train_results_list)
        test_results_df = pd.DataFrame(test_results_list)

        # Hand the tables to the (currently no-op) reporting hook.
        printResults(train_results_df, test_results_df, dataFolders, sampling_methods)
        return train_results_list, test_results_list

    except Exception as e:
        print(f"主函數錯誤: {str(e)}")
        return [], []  # return empty lists on failure

In [None]:
# Run the whole pipeline. If main() hits an error it prints the message and
# returns two empty lists.
train_results_list, test_results_list = main()

In [None]:
# Cross-validation summary: one row per (data folder, classifier, sampling
# method), with each metric averaged over the folds.
pd.DataFrame(train_results_list)

In [None]:
# Held-out test-set results (AUC, sensitivity, specificity, confusion counts)
# for the best fold model of each combination.
pd.DataFrame(test_results_list)

In [None]:
# pd.DataFrame(train_results_list).to_excel("/content/drive/MyDrive/研究所/雅涵實驗室/給學弟妹/嘉基計畫 For 致鈞&芳瑗/Result/ML/1214/train_result_1121.xlsx", index=False)
# pd.DataFrame(test_results_list).to_excel("/content/drive/MyDrive/研究所/雅涵實驗室/給學弟妹/嘉基計畫 For 致鈞&芳瑗/Result/ML/1214/test_result_1121.xlsx", index=False)

In [None]:
# Export both result tables as Excel files under /content/.
pd.DataFrame(train_results_list).to_excel("/content/train_result_1121.xlsx", index=False)
pd.DataFrame(test_results_list).to_excel("/content/test_result_1121.xlsx", index=False)

In [None]:
# Final cell: print a completion marker.
print("ALL DONE!")