In [3]:
import pandas as pd
import numpy as np
from rdkit import Chem
from rdkit.Chem import AllChem
from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier  # 决策树
from sklearn.neighbors import KNeighborsClassifier  # KNN
from xgboost import XGBClassifier  # XGBoost
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import (
    roc_auc_score,
    accuracy_score,
    precision_score,
    recall_score,
    precision_recall_curve,
    auc,
    f1_score,
    matthews_corrcoef,
    confusion_matrix
)
from sklearn.cluster import KMeans
from sklearn.metrics import pairwise_distances_argmin_min
import matplotlib.pyplot as plt


In [4]:
import pandas as pd
import numpy as np
from rdkit import Chem
from rdkit.Chem import AllChem
from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier  # 决策树
from sklearn.neighbors import KNeighborsClassifier  # KNN
from xgboost import XGBClassifier  # XGBoost
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import (
    roc_auc_score,
    accuracy_score,
    precision_score,
    recall_score,
    precision_recall_curve,
    auc,
    f1_score,
    matthews_corrcoef,
    confusion_matrix
)
from sklearn.cluster import KMeans
from sklearn.metrics import pairwise_distances_argmin_min
import matplotlib.pyplot as plt

# ============== 1. 读取数据 ==============
# 请根据实际文件路径进行修改
train_data_1 = pd.read_csv(r'C:\Users\DuYih\Desktop\github\DL Microbiology Antibiotics\SyntheMol-main\data\Data\1_training_data\library_1_binarized.csv')
train_data_2 = pd.read_csv(r'C:\Users\DuYih\Desktop\github\DL Microbiology Antibiotics\SyntheMol-main\data\Data\1_training_data\library_2_binarized.csv')
test_data = pd.read_csv(r'C:\Users\DuYih\Desktop\github\DL Microbiology Antibiotics\SyntheMol-main\data\Data\1_training_data\library_3_binarized.csv')

# 合并作为训练集
train_data = pd.concat([train_data_1, train_data_2], ignore_index=True)

# ============== 2. 如果需要，从SMILES计算Morgan指纹 ==============
# 若 bianrized.csv 中实际上并无 'smiles' 列（或已是数值特征），可注释此函数与相关调用。
def get_fingerprint(smiles, n_bits=2048):
    mol = Chem.MolFromSmiles(smiles)
    if mol:
        fp = AllChem.GetMorganFingerprintAsBitVect(mol, 2, nBits=n_bits)
        return np.array(fp)
    else:
        return np.zeros(n_bits)

# 如果 CSV 中仍包含“smiles”列，需要计算分子指纹；否则请将此段注释/删除
if 'smiles' in train_data.columns:
    train_data['fingerprint'] = train_data['smiles'].apply(get_fingerprint)
if 'smiles' in test_data.columns:
    test_data['fingerprint'] = test_data['smiles'].apply(get_fingerprint)

# ============== 3. 分离正负样本 ==============
# 假设各文件都带有 'antibiotic_activity' 列 (0或1)
train_positive = train_data[train_data['antibiotic_activity'] == 1]
train_negative = train_data[train_data['antibiotic_activity'] == 0]

test_positive = test_data[test_data['antibiotic_activity'] == 1]
test_negative = test_data[test_data['antibiotic_activity'] == 0]

# ============== 4. 负样本聚类筛选 ==============
# ---- 训练集 ----
num_clusters_train = 2 * len(train_positive)
num_clusters_train = min(num_clusters_train, len(train_negative))  # 防止聚类数>负样本总数
num_clusters_train = max(num_clusters_train, 1)  # 至少保证1个簇

train_negative_fps = np.vstack(train_negative['fingerprint'].values)  # 将fingerprint列拼接成矩阵
kmeans_train = KMeans(n_clusters=num_clusters_train, random_state=42)
kmeans_train.fit(train_negative_fps)
closest_train, _ = pairwise_distances_argmin_min(kmeans_train.cluster_centers_, train_negative_fps)
selected_negative_train = train_negative.iloc[closest_train]

# ---- 测试集 ----
num_clusters_test = 2 * len(test_positive)
num_clusters_test = min(num_clusters_test, len(test_negative))
num_clusters_test = max(num_clusters_test, 1)

test_negative_fps = np.vstack(test_negative['fingerprint'].values)
kmeans_test = KMeans(n_clusters=num_clusters_test, random_state=42)
kmeans_test.fit(test_negative_fps)
closest_test, _ = pairwise_distances_argmin_min(kmeans_test.cluster_centers_, test_negative_fps)
selected_negative_test = test_negative.iloc[closest_test]

# ============== 5. 最终训练集与测试集 ==============
final_train_data = pd.concat([train_positive, selected_negative_train], ignore_index=True)
final_test_data = pd.concat([test_positive, selected_negative_test], ignore_index=True)

# 提取特征和标签
X_train = np.vstack(final_train_data['fingerprint'].values)
y_train = final_train_data['antibiotic_activity'].values

X_test = np.vstack(final_test_data['fingerprint'].values)
y_test = final_test_data['antibiotic_activity'].values

# ============== 6. 特征缩放 ==============
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# ============== 7. 定义模型与超参数 ==============
models_params = {
    'Logistic Regression': (
        LogisticRegression(max_iter=1000),
        {'C': [0.01, 0.1, 1, 10, 100]}
    ),
    'SVM': (
        SVC(probability=True),
        {'C': [0.1, 1, 10], 'kernel': ['linear', 'rbf']}
    ),
    'Random Forest': (
        RandomForestClassifier(),
        {'n_estimators': [10, 50, 100], 'max_depth': [None, 10, 20]}
    ),
    'Decision Tree': (
        DecisionTreeClassifier(),
        {'max_depth': [None, 5, 10], 'min_samples_split': [2, 5, 10]}
    ),
    'KNN': (
        KNeighborsClassifier(),
        {'n_neighbors': [3, 5, 7], 'weights': ['uniform', 'distance']}
    ),
    'XGBoost': (
        XGBClassifier(use_label_encoder=False, eval_metric='logloss'),
        {'n_estimators': [50, 100], 'max_depth': [3, 6], 'learning_rate': [0.01, 0.1]}
    )
}

# 用于保存各模型训练结果
results = []

# ============== 8. 模型训练与评估 ==============
for model_name, (model, params) in models_params.items():
    print(f"\n>>> Training model: {model_name}")
    clf = GridSearchCV(model, params, scoring='roc_auc', cv=5)
    clf.fit(X_train_scaled, y_train)
    
    best_model = clf.best_estimator_
    # 预测
    y_pred = best_model.predict(X_test_scaled)
    
    # 若支持 predict_proba，则拿到正类(=1)的预测概率；否则使用 decision_function 做归一化
    if hasattr(best_model, "predict_proba"):
        y_pred_proba = best_model.predict_proba(X_test_scaled)[:, 1]
    else:
        y_decision = best_model.decision_function(X_test_scaled)
        # 将决策函数输出归一化到 [0,1]
        y_pred_proba = (
            (y_decision - y_decision.min()) / (y_decision.max() - y_decision.min())
            if y_decision.max() != y_decision.min()
            else np.zeros_like(y_decision)
        )
    
    # 计算性能指标
    roc_auc = roc_auc_score(y_test, y_pred_proba)
    precision, recall, _ = precision_recall_curve(y_test, y_pred_proba)
    prc_auc = auc(recall, precision)
    accuracy = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred)
    mcc = matthews_corrcoef(y_test, y_pred)
    recall_val = recall_score(y_test, y_pred)
    precision_val = precision_score(y_test, y_pred)

    # 混淆矩阵
    tn, fp, fn, tp = confusion_matrix(y_test, y_pred).ravel()
    fp_rate = fp / (fp + tn) if (fp + tn) > 0 else 0

    print(f"Best Params: {clf.best_params_}")
    print(f"ROC-AUC: {roc_auc:.4f}")
    print(f"PRC-AUC: {prc_auc:.4f}")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"F1 Score: {f1:.4f}")
    print(f"MCC: {mcc:.4f}")
    print(f"Recall: {recall_val:.4f}")
    print(f"Precision: {precision_val:.4f}")
    print(f"False Positives: {fp}")
    print(f"False Positive Rate: {fp_rate:.4f}")

    results.append({
        'Model': model_name,
        'Best Params': clf.best_params_,
        'ROC-AUC': roc_auc,
        'PRC-AUC': prc_auc,
        'Accuracy': accuracy,
        'F1 Score': f1,
        'MCC': mcc,
        'Recall': recall_val,
        'Precision': precision_val,
        'False Positives': fp,
        'False Positive Rate': fp_rate
    })

# ============== 9. 结果保存并展示 ==============
results_df = pd.DataFrame(results)
results_df.to_csv('model_evaluation_results12312.csv', index=False)
print("\n所有模型的评估结果：")
print(results_df)
print("\n评估结果已保存到 model_evaluation_results.csv。")

# ============== 代码总结 ==============
"""
1) 将 library_1_binarized.csv 和 library_2_binarized.csv 拼接为训练集，library_3_binarized.csv 为测试集；
2) 根据 'antibiotic_activity' 分为正负样本，并对负样本进行KMeans聚类，从而选取代表性的负样本；
3) 计算Morgan指纹(若binarized.csv中无需再计算指纹则可跳过这一步)；
4) 训练集和测试集分别拼合正负样本后，进行标准化处理；
5) 用多种模型(GridSearchCV)进行超参数搜索；
6) 计算 ROC-AUC、PRC-AUC、Accuracy、F1、MCC、Recall、Precision、混淆矩阵等指标，并输出结果到CSV。
"""



>>> Training model: Logistic Regression
Best Params: {'C': 0.01}
ROC-AUC: 0.5864
PRC-AUC: 0.4440
Accuracy: 0.6667
F1 Score: 0.2533
MCC: 0.1263
Recall: 0.1696
Precision: 0.5000
False Positives: 19
False Positive Rate: 0.0848

>>> Training model: SVM
Best Params: {'C': 1, 'kernel': 'rbf'}
ROC-AUC: 0.5453
PRC-AUC: 0.4645
Accuracy: 0.6875
F1 Score: 0.1176
MCC: 0.2063
Recall: 0.0625
Precision: 1.0000
False Positives: 0
False Positive Rate: 0.0000

>>> Training model: Random Forest
Best Params: {'max_depth': None, 'n_estimators': 100}
ROC-AUC: 0.5521
PRC-AUC: 0.4401
Accuracy: 0.6905
F1 Score: 0.1746
MCC: 0.2001
Recall: 0.0982
Precision: 0.7857
False Positives: 3
False Positive Rate: 0.0134

>>> Training model: Decision Tree
Best Params: {'max_depth': None, 'min_samples_split': 10}
ROC-AUC: 0.4468
PRC-AUC: 0.4032
Accuracy: 0.4524
F1 Score: 0.3235
MCC: -0.1180
Recall: 0.3929
Precision: 0.2750
False Positives: 116
False Positive Rate: 0.5179

>>> Training model: KNN
Best Params: {'n_neighbors'

"\n1) 将 library_1_binarized.csv 和 library_2_binarized.csv 拼接为训练集，library_3_binarized.csv 为测试集；\n2) 根据 'antibiotic_activity' 分为正负样本，并对负样本进行KMeans聚类，从而选取代表性的负样本；\n3) 计算Morgan指纹(若binarized.csv中无需再计算指纹则可跳过这一步)；\n4) 训练集和测试集分别拼合正负样本后，进行标准化处理；\n5) 用多种模型(GridSearchCV)进行超参数搜索；\n6) 计算 ROC-AUC、PRC-AUC、Accuracy、F1、MCC、Recall、Precision、混淆矩阵等指标，并输出结果到CSV。\n"

In [5]:
import pandas as pd
import numpy as np
from rdkit import Chem
from rdkit.Chem import AllChem
from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier  # 决策树
from sklearn.neighbors import KNeighborsClassifier  # KNN
from xgboost import XGBClassifier  # XGBoost
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import (
    roc_auc_score,
    accuracy_score,
    precision_score,
    recall_score,
    precision_recall_curve,
    auc,
    f1_score,
    matthews_corrcoef,
    confusion_matrix
)
from sklearn.cluster import KMeans
from sklearn.metrics import pairwise_distances_argmin_min
import matplotlib.pyplot as plt

# ============== 1. 读取数据 ==============
# 请根据实际路径修改
train_data_1 = pd.read_csv(r'C:\Users\DuYih\Desktop\github\DL Microbiology Antibiotics\SyntheMol-main\data\Data\1_training_data\library_1_binarized.csv')
train_data_2 = pd.read_csv(r'C:\Users\DuYih\Desktop\github\DL Microbiology Antibiotics\SyntheMol-main\data\Data\1_training_data\library_2_binarized.csv')
test_data = pd.read_csv(r'C:\Users\DuYih\Desktop\github\DL Microbiology Antibiotics\SyntheMol-main\data\Data\1_training_data\library_3_binarized.csv')

# 合并library_1与library_2作为训练集
train_data = pd.concat([train_data_1, train_data_2], ignore_index=True)

# ============== 2. 从SMILES计算Morgan指纹（如已是数值特征可去掉此步） ==============
def get_fingerprint(smiles, n_bits=2048):
    mol = Chem.MolFromSmiles(smiles)
    if mol:
        fp = AllChem.GetMorganFingerprintAsBitVect(mol, 2, nBits=n_bits)
        return np.array(fp)
    else:
        return np.zeros(n_bits)

# 如果CSV含有'smiles'列，则计算指纹
if 'smiles' in train_data.columns:
    train_data['fingerprint'] = train_data['smiles'].apply(get_fingerprint)
if 'smiles' in test_data.columns:
    test_data['fingerprint'] = test_data['smiles'].apply(get_fingerprint)

# ============== 3. 分离正负样本（训练集） ==============
train_positive = train_data[train_data['antibiotic_activity'] == 1]
train_negative = train_data[train_data['antibiotic_activity'] == 0]

# ============== 4. 对训练负样本做KMeans聚类并选取代表 ==============
num_clusters_train = 2 * len(train_positive)
num_clusters_train = min(num_clusters_train, len(train_negative))  # 防止聚类数超过负样本量
num_clusters_train = max(num_clusters_train, 1)

train_negative_fps = np.vstack(train_negative['fingerprint'].values)
kmeans_train = KMeans(n_clusters=num_clusters_train, random_state=42)
kmeans_train.fit(train_negative_fps)

# 找到各簇中心点附近的负样本（代表样本）
closest_train, _ = pairwise_distances_argmin_min(kmeans_train.cluster_centers_, train_negative_fps)
selected_negative_train = train_negative.iloc[closest_train]

# ============== 4.1 计算各簇“半径”（训练集的负样本到其簇中心的最大距离） ==============
# 用于后续判定 “测试集化合物是否在训练集聚类范围内”
train_cluster_labels = kmeans_train.predict(train_negative_fps)
radii = []
for c_id in range(num_clusters_train):
    center = kmeans_train.cluster_centers_[c_id]
    # 该簇下所有训练负样本
    cluster_points = train_negative_fps[train_cluster_labels == c_id]
    dists = np.linalg.norm(cluster_points - center, axis=1)
    max_dist = dists.max()
    radii.append(max_dist)
radii = np.array(radii)

# ============== 5. 构建最终训练集：正样本 + 代表负样本 ==============
final_train_data = pd.concat([train_positive, selected_negative_train], ignore_index=True)

# ============== 6. 处理测试集 ==============
# 先判断library_3中的化合物是否在“训练集聚类范围”内
# 如果在范围内 -> 用于后续验证； 否则 -> 视为不做验证的那部分

# 先取出 test_data 中全部指纹
test_data_fps = np.vstack(test_data['fingerprint'].values)

# 定义函数：判断某个指纹是否在训练负样本的聚类范围内
def is_in_training_cluster_range(fp):
    # 先用 kmeans_train 预测属于哪一个簇
    assigned_cluster = kmeans_train.predict(fp.reshape(1, -1))[0]
    # 计算该簇中心 与 该指纹 的欧几里得距离
    center = kmeans_train.cluster_centers_[assigned_cluster]
    dist = np.linalg.norm(fp - center)
    # 如果该距离 <= 该簇在训练集中对应的“最大距离”，则认为在范围内
    if dist <= radii[assigned_cluster]:
        return True
    else:
        return False

in_range_mask = []
for i in range(len(test_data_fps)):
    fp = test_data_fps[i]
    in_range_mask.append(is_in_training_cluster_range(fp))

test_data['in_training_cluster_range'] = in_range_mask

# 拆分：在范围内 vs 不在范围内
test_data_in_range = test_data[test_data['in_training_cluster_range'] == True]
test_data_out_range = test_data[test_data['in_training_cluster_range'] == False]

print(f"Library_3: 在训练集聚类范围内的化合物数 = {len(test_data_in_range)}")
print(f"Library_3: 不在范围内的化合物数 = {len(test_data_out_range)}")

# ============== 7. 对“范围内”的测试集再做负样本KMeans选取 ==============
test_positive_in_range = test_data_in_range[test_data_in_range['antibiotic_activity'] == 1]
test_negative_in_range = test_data_in_range[test_data_in_range['antibiotic_activity'] == 0]

num_clusters_test_in_range = 2 * len(test_positive_in_range)
num_clusters_test_in_range = min(num_clusters_test_in_range, len(test_negative_in_range))
num_clusters_test_in_range = max(num_clusters_test_in_range, 1)

test_negative_in_range_fps = np.vstack(test_negative_in_range['fingerprint'].values)
kmeans_test_in_range = KMeans(n_clusters=num_clusters_test_in_range, random_state=42)
kmeans_test_in_range.fit(test_negative_in_range_fps)

closest_test_in_range, _ = pairwise_distances_argmin_min(kmeans_test_in_range.cluster_centers_, test_negative_in_range_fps)
selected_negative_test_in_range = test_negative_in_range.iloc[closest_test_in_range]

# 最终用于模型验证的 测试集 = 所有“范围内”正样本 + 聚类筛选的负样本
final_test_data = pd.concat([test_positive_in_range, selected_negative_test_in_range], ignore_index=True)
print(f"最终用于验证的测试样本数 = {len(final_test_data)}")

# ============== 8. 准备训练/测试特征与标签 ==============
X_train = np.vstack(final_train_data['fingerprint'].values)
y_train = final_train_data['antibiotic_activity'].values

X_test = np.vstack(final_test_data['fingerprint'].values)
y_test = final_test_data['antibiotic_activity'].values

# ============== 9. 特征缩放 ==============
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# ============== 10. 定义模型与超参数网格 ==============
models_params = {
    'Logistic Regression': (
        LogisticRegression(max_iter=1000),
        {'C': [0.01, 0.1, 1, 10, 100]}
    ),
    'SVM': (
        SVC(probability=True),
        {'C': [0.1, 1, 10], 'kernel': ['linear', 'rbf']}
    ),
    'Random Forest': (
        RandomForestClassifier(),
        {'n_estimators': [10, 50, 100], 'max_depth': [None, 10, 20]}
    ),
    'Decision Tree': (
        DecisionTreeClassifier(),
        {'max_depth': [None, 5, 10], 'min_samples_split': [2, 5, 10]}
    ),
    'KNN': (
        KNeighborsClassifier(),
        {'n_neighbors': [3, 5, 7], 'weights': ['uniform', 'distance']}
    ),
    'XGBoost': (
        XGBClassifier(use_label_encoder=False, eval_metric='logloss'),
        {'n_estimators': [50, 100], 'max_depth': [3, 6], 'learning_rate': [0.01, 0.1]}
    )
}

# 用于保存各模型结果
results = []

# ============== 11. 训练与评估 ==============
for model_name, (model, params) in models_params.items():
    print(f"\n>>> Training model: {model_name}")
    clf = GridSearchCV(model, params, scoring='roc_auc', cv=5)
    clf.fit(X_train_scaled, y_train)
    best_model = clf.best_estimator_

    # 预测
    y_pred = best_model.predict(X_test_scaled)
    
    # 预测概率
    if hasattr(best_model, "predict_proba"):
        y_pred_proba = best_model.predict_proba(X_test_scaled)[:, 1]
    else:
        # 对不支持 predict_proba 的模型，用 decision_function 做归一化
        y_decision = best_model.decision_function(X_test_scaled)
        y_pred_proba = (
            (y_decision - y_decision.min()) / (y_decision.max() - y_decision.min())
            if y_decision.max() != y_decision.min() else np.zeros_like(y_decision)
        )
    
    # 计算各项指标
    roc_auc = roc_auc_score(y_test, y_pred_proba)
    precision_arr, recall_arr, _ = precision_recall_curve(y_test, y_pred_proba)
    prc_auc = auc(recall_arr, precision_arr)
    accuracy = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred)
    mcc = matthews_corrcoef(y_test, y_pred)
    recall_val = recall_score(y_test, y_pred)
    precision_val = precision_score(y_test, y_pred)

    # 混淆矩阵
    tn, fp, fn, tp = confusion_matrix(y_test, y_pred).ravel()
    fp_rate = fp / (fp + tn) if (fp + tn) > 0 else 0

    print(f"Best Params: {clf.best_params_}")
    print(f"ROC-AUC: {roc_auc:.4f}")
    print(f"PRC-AUC: {prc_auc:.4f}")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"F1 Score: {f1:.4f}")
    print(f"MCC: {mcc:.4f}")
    print(f"Recall: {recall_val:.4f}")
    print(f"Precision: {precision_val:.4f}")
    print(f"False Positives: {fp}")
    print(f"False Positive Rate: {fp_rate:.4f}")

    results.append({
        'Model': model_name,
        'Best Params': clf.best_params_,
        'ROC-AUC': roc_auc,
        'PRC-AUC': prc_auc,
        'Accuracy': accuracy,
        'F1 Score': f1,
        'MCC': mcc,
        'Recall': recall_val,
        'Precision': precision_val,
        'False Positives': fp,
        'False Positive Rate': fp_rate
    })

# ============== 12. 输出结果 ==============
results_df = pd.DataFrame(results)
results_df.to_csv('model_evaluation_results_part_library3.csv', index=False)

# 把“在范围内”和“不在范围内”的化合物也输出，便于查看
test_data_in_range.to_csv('library_3_in_range.csv', index=False)
test_data_out_range.to_csv('library_3_out_range.csv', index=False)

print("\n所有模型的评估结果：")
print(results_df)
print("\n评估结果已保存到 model_evaluation_results.csv。")
print(f"不在范围内的化合物已输出到 library_3_out_range.csv，共 {len(test_data_out_range)} 条。")

# ============== 代码总结 ==============
"""
1) 训练集 (library_1 + library_2)：
   - 分出正负样本，对负样本做 KMeans 并选取簇中心附近样本，形成最终训练集；
   - 同时记录训练集负样本各簇在 KMeans 中的“半径”。
2) 测试集 (library_3)：
   - 利用训练负样本聚类的中心和半径来判断 library_3 的每个化合物是否在“训练集聚类范围”内；
     - 若距离其所分配的簇中心 <= 该簇在训练集中的最大距离则“在范围内”，否则“超出范围”；
   - 仅对“在范围内”的化合物做后续测试：其中的负样本再做 KMeans 筛选，正样本直接保留；
   - 合并得到最终测试集后，与训练好的模型进行预测和评估；
3) 计算多种指标并输出到 model_evaluation_results.csv；
4) 同时将“在范围内”与“不在范围内”的化合物分别输出为 CSV，方便对比。
"""


Library_3: 在训练集聚类范围内的化合物数 = 3848
Library_3: 不在范围内的化合物数 = 1528
最终用于验证的测试样本数 = 249

>>> Training model: Logistic Regression
Best Params: {'C': 0.01}
ROC-AUC: 0.5826
PRC-AUC: 0.4469
Accuracy: 0.6747
F1 Score: 0.2832
MCC: 0.1570
Recall: 0.1928
Precision: 0.5333
False Positives: 14
False Positive Rate: 0.0843

>>> Training model: SVM
Best Params: {'C': 1, 'kernel': 'rbf'}
ROC-AUC: 0.5813
PRC-AUC: 0.4982
Accuracy: 0.6908
F1 Score: 0.1348
MCC: 0.2222
Recall: 0.0723
Precision: 1.0000
False Positives: 0
False Positive Rate: 0.0000

>>> Training model: Random Forest
Best Params: {'max_depth': None, 'n_estimators': 100}
ROC-AUC: 0.5689
PRC-AUC: 0.4538
Accuracy: 0.6908
F1 Score: 0.1720
MCC: 0.2025
Recall: 0.0964
Precision: 0.8000
False Positives: 2
False Positive Rate: 0.0120

>>> Training model: Decision Tree
Best Params: {'max_depth': None, 'min_samples_split': 5}
ROC-AUC: 0.5534
PRC-AUC: 0.4963
Accuracy: 0.5863
F1 Score: 0.3905
MCC: 0.0776
Recall: 0.3976
Precision: 0.3837
False Positives: 53
Fa

'\n1) 训练集 (library_1 + library_2)：\n   - 分出正负样本，对负样本做 KMeans 并选取簇中心附近样本，形成最终训练集；\n   - 同时记录训练集负样本各簇在 KMeans 中的“半径”。\n2) 测试集 (library_3)：\n   - 利用训练负样本聚类的中心和半径来判断 library_3 的每个化合物是否在“训练集聚类范围”内；\n     - 若距离其所分配的簇中心 <= 该簇在训练集中的最大距离则“在范围内”，否则“超出范围”；\n   - 仅对“在范围内”的化合物做后续测试：其中的负样本再做 KMeans 筛选，正样本直接保留；\n   - 合并得到最终测试集后，与训练好的模型进行预测和评估；\n3) 计算多种指标并输出到 model_evaluation_results.csv；\n4) 同时将“在范围内”与“不在范围内”的化合物分别输出为 CSV，方便对比。\n'

In [9]:
import pandas as pd
import numpy as np
from rdkit import Chem
from rdkit.Chem import AllChem
from sklearn.model_selection import KFold, GridSearchCV, train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier  # 添加决策树
from sklearn.neighbors import KNeighborsClassifier  # 添加KNN
from xgboost import XGBClassifier  # 添加XGBoost
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import (
    roc_auc_score,
    accuracy_score,
    precision_score,
    recall_score,
    precision_recall_curve,
    auc,
    f1_score,
    matthews_corrcoef,
    confusion_matrix  # 添加混淆矩阵
)
from sklearn.cluster import KMeans
from sklearn.metrics import pairwise_distances_argmin_min
import matplotlib.pyplot as plt

# 加载数据
data = pd.read_csv('C:/Users/DuYih/Desktop/github/DL Microbiology Antibiotics/SyntheMol-main/data/Data/1_training_data/raw_data.csv')

# 计算分子指纹
def get_fingerprint(smiles, n_bits=2048):
    mol = Chem.MolFromSmiles(smiles)
    if mol:
        fp = AllChem.GetMorganFingerprintAsBitVect(mol, 2, nBits=n_bits)
        return np.array(fp)
    else:
        return np.zeros(n_bits)

# 应用分子指纹计算
data['fingerprint'] = data['smiles'].apply(lambda x: get_fingerprint(x))

In [11]:
data['fingerprint']

0        [0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...
1        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...
2        [0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...
3        [0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...
4        [0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...
                               ...                        
13519    [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...
13520    [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...
13521    [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...
13522    [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...
13523    [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...
Name: fingerprint, Length: 13524, dtype: object

In [12]:
import pandas as pd
import numpy as np
from rdkit import Chem
from rdkit.Chem import AllChem
from sklearn.model_selection import KFold, GridSearchCV, train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier  # 添加决策树
from sklearn.neighbors import KNeighborsClassifier  # 添加KNN
from xgboost import XGBClassifier  # 添加XGBoost
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import (
    roc_auc_score,
    accuracy_score,
    precision_score,
    recall_score,
    precision_recall_curve,
    auc,
    f1_score,
    matthews_corrcoef,
    confusion_matrix  # 添加混淆矩阵
)
from sklearn.cluster import KMeans
from sklearn.metrics import pairwise_distances_argmin_min
import matplotlib.pyplot as plt

# 加载数据
data = pd.read_csv('C:/Users/DuYih/Desktop/github/DL Microbiology Antibiotics/SyntheMol-main/data/Data/1_training_data/raw_data.csv')

# 计算分子指纹
def get_fingerprint(smiles, n_bits=2048):
    mol = Chem.MolFromSmiles(smiles)
    if mol:
        fp = AllChem.GetMorganFingerprintAsBitVect(mol, 2, nBits=n_bits)
        return np.array(fp)
    else:
        return np.zeros(n_bits)

# 应用分子指纹计算
data['fingerprint'] = data['smiles'].apply(lambda x: get_fingerprint(x))

# 分离正负样本
positive_samples = data[data['antibiotic_activity'] == 1]
negative_samples = data[data['antibiotic_activity'] == 0]

# 设置5折交叉验证
kf = KFold(n_splits=5, shuffle=True, random_state=42)

# 模型和超参数设置，添加决策树、KNN和XGBoost
models_params = {
    'Logistic Regression': (LogisticRegression(max_iter=1000), {'C': [0.01, 0.1, 1, 10, 100]}),
    'SVM': (SVC(probability=True), {'C': [0.1, 1, 10], 'kernel': ['linear', 'rbf']}), 
    'Random Forest': (RandomForestClassifier(), {'n_estimators': [10, 50, 100], 'max_depth': [None, 10, 20]}),
    'Decision Tree': (DecisionTreeClassifier(), {'max_depth': [None, 5, 10], 'min_samples_split': [2, 5, 10]}),
    'KNN': (KNeighborsClassifier(), {'n_neighbors': [3, 5, 7], 'weights': ['uniform', 'distance']}),
    'XGBoost': (XGBClassifier(use_label_encoder=False, eval_metric='logloss'), {'n_estimators': [50, 100], 'max_depth': [3, 6], 'learning_rate': [0.01, 0.1]})
}

# 存储每折的结果，包括新增的指标
results = {model: {'ROC-AUC': [], 'PRC-AUC': [], 'Accuracy': [], 'F1 Score': [], 'MCC': [], 'Recall': [], 'Precision': [], 
                   'False Positives': [], 'False Positive Rate': []} for model in models_params}

# 存储所有折的详细结果
all_fold_results = []

# 进行5折交叉验证
for fold, (train_index, test_index) in enumerate(kf.split(positive_samples)):
    print(f"Processing fold {fold + 1}")
    positive_train = positive_samples.iloc[train_index]
    positive_test = positive_samples.iloc[test_index]
    
    # 分离负样本
    negative_train, negative_test = train_test_split(negative_samples, test_size=0.2, random_state=fold)
    
    # 对训练集负样本进行聚类选择
    num_clusters_train = 2 * len(positive_train)
    num_clusters_train = min(num_clusters_train, len(negative_train))  # 防止聚类数超过样本数
    num_clusters_train = max(num_clusters_train, 1)  # 至少一个簇
    kmeans_train = KMeans(n_clusters=num_clusters_train, random_state=42)
    negative_train_fingerprints = np.vstack(negative_train['fingerprint'].values)
    kmeans_train.fit(negative_train_fingerprints)
    closest_train, _ = pairwise_distances_argmin_min(kmeans_train.cluster_centers_, negative_train_fingerprints)
    selected_negative_train = negative_train.iloc[closest_train[:num_clusters_train]]
    
    # 对测试集负样本进行聚类选择
    num_clusters_test = 2 * len(positive_test)
    num_clusters_test = min(num_clusters_test, len(negative_test))  # 防止聚类数超过样本数
    num_clusters_test = max(num_clusters_test, 1)  # 至少一个簇
    kmeans_test = KMeans(n_clusters=num_clusters_test, random_state=42)
    negative_test_fingerprints = np.vstack(negative_test['fingerprint'].values)
    kmeans_test.fit(negative_test_fingerprints)
    closest_test, _ = pairwise_distances_argmin_min(kmeans_test.cluster_centers_, negative_test_fingerprints)
    selected_negative_test = negative_test.iloc[closest_test[:num_clusters_test]]
    
    # 合并训练和测试数据集
    train_data = pd.concat([positive_train, selected_negative_train])
    test_data = pd.concat([positive_test, selected_negative_test])
    
    # 准备训练和测试数据
    X_train = np.vstack(train_data['fingerprint'].values)
    y_train = train_data['antibiotic_activity'].values
    X_test = np.vstack(test_data['fingerprint'].values)
    y_test = test_data['antibiotic_activity'].values
    
    # 特征缩放
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)
    
    # 对每个模型进行训练和评估
    for model_name, (model, params) in models_params.items():
        clf = GridSearchCV(model, params, scoring='roc_auc', cv=5)
        clf.fit(X_train_scaled, y_train)
        best_model = clf.best_estimator_
        y_pred = best_model.predict(X_test_scaled)
        
        # 处理预测概率
        if hasattr(best_model, "predict_proba"):
            y_pred_proba = best_model.predict_proba(X_test_scaled)[:, 1]
        else:
            # 对于不支持 predict_proba 的模型，如某些 SVM
            y_pred_proba = best_model.decision_function(X_test_scaled)
            # 将决策函数输出归一化到 [0,1]
            y_pred_proba = (y_pred_proba - y_pred_proba.min()) / (y_pred_proba.max() - y_pred_proba.min()) if y_pred_proba.max() != y_pred_proba.min() else np.zeros_like(y_pred_proba)
        
        # 计算性能指标
        roc_auc = roc_auc_score(y_test, y_pred_proba)
        precision, recall, _ = precision_recall_curve(y_test, y_pred_proba)
        prc_auc = auc(recall, precision)
        accuracy = accuracy_score(y_test, y_pred)
        f1 = f1_score(y_test, y_pred)
        mcc = matthews_corrcoef(y_test, y_pred)
        recall_value = recall_score(y_test, y_pred)
        precision_score_value = precision_score(y_test, y_pred)
        # 计算混淆矩阵
        tn, fp, fn, tp = confusion_matrix(y_test, y_pred).ravel()
        fp_rate = fp / (fp + tn) if (fp + tn) > 0 else 0

        # 存储结果
        results[model_name]['ROC-AUC'].append(roc_auc)
        results[model_name]['PRC-AUC'].append(prc_auc)
        results[model_name]['Accuracy'].append(accuracy)
        results[model_name]['F1 Score'].append(f1)
        results[model_name]['MCC'].append(mcc)
        results[model_name]['Precision'].append(precision_score_value)
        results[model_name]['Recall'].append(recall_value)
        results[model_name]['False Positives'].append(fp)
        results[model_name]['False Positive Rate'].append(fp_rate)
        
        # 存储每折每个模型的详细结果
        fold_result = {
            'Fold': fold + 1,
            'Model': model_name,
            'ROC-AUC': roc_auc,
            'PRC-AUC': prc_auc,
            'Accuracy': accuracy,
            'F1 Score': f1,
            'MCC': mcc,
            'Recall': recall_value,
            'Precision': precision_score_value,
            'False Positives': fp,
            'False Positive Rate': fp_rate
        }
        all_fold_results.append(fold_result)

# 将所有折的结果保存为DataFrame
fold_results_df = pd.DataFrame(all_fold_results)
# 保存到CSV文件
fold_results_df.to_csv('model_evaluation_per_fold_tem.csv', index=False)




Processing fold 1
Processing fold 2
Processing fold 3
Processing fold 4
Processing fold 5


In [6]:
# -*- coding: utf-8 -*-
"""
GCN( GINE ) + Graph AutoEncoder 欠采样管线
------------------------------------------------
流程：
1) RDKit 将 SMILES -> 分子图 (节点+边的丰富特征)
2) 训练 Graph Autoencoder (重构节点特征的 MSE) 得到图级嵌入
3) 基于嵌入，按“与正类差异最大(最不相似)”的原则，从负类中选出与正类 1:1 的样本
4) 用选出来的 1:1 数据做 Stratified 5-Fold 训练 GINE-Classifier
5) 评估并输出：ROC-AUC, PRC-AUC, Accuracy, F1, MCC, Recall, Precision, False Positives, False Positive Rate
6) 将每折与总体结果保存到 CSV；保存最佳折模型权重

依赖：
- rdkit
- torch, torch_geometric (>=2.2)
- scikit-learn, pandas, numpy

数据要求：CSV 至少包含两列：'smiles', 'antibiotic_activity'(0/1)
"""

import os
import sys
import math
import json
import random
import argparse
from typing import List, Tuple

import numpy as np
import pandas as pd
from tqdm import tqdm

import torch
import torch.nn.functional as F
from torch import nn
from torch.nn import Linear

# PyG
from torch_geometric.data import Data
try:
    from torch_geometric.loader import DataLoader
except Exception:
    from torch_geometric.data import DataLoader
from torch_geometric.nn import global_mean_pool, global_add_pool, GINEConv, BatchNorm

from rdkit import Chem
from rdkit.Chem import AllChem

from sklearn.model_selection import StratifiedKFold, train_test_split
from sklearn.metrics import (roc_auc_score, average_precision_score, f1_score,
                             accuracy_score, matthews_corrcoef, precision_score,
                             recall_score, confusion_matrix)

# -----------------------------
# 配置区（可以改）
# -----------------------------
SEED = 42
DEFAULT_CSV = 'E:/SyntheMol-main/data/Data/1_training_data/raw_data.csv'
RESULT_CSV = 'cv_results.csv'
FOLD_DETAIL_CSV = 'cv_per_fold.csv'
BEST_MODEL_PATH = 'best_gine_model.pth'
EMBED_CSV = 'graph_embeddings.csv'

MAX_WORKERS = min(os.cpu_count() or 0, 30)   # 兼容你的“≤30 CPU”的要求
BATCH_SIZE_AE = 64
BATCH_SIZE_CLS = 64
EPOCHS_AE = 60
EPOCHS_CLS = 120
PATIENCE = 15
LR_AE = 1e-3
LR_CLS = 2e-3
WEIGHT_DECAY = 1e-4
DROPOUT = 0.2
HIDDEN = 128
NUM_GINE = 3                   # 编码层数
VAL_SPLIT = 0.1                # 训练折里再切出一点做早停
DIST_METRIC = 'cosine'         # 欠采样相异度度量：'cosine' 或 'euclidean'
NEG_POS_RATIO = 1.0            # 负:正=1:1

# 常见元素表（原子序号），其余归入 "other"
COMMON_Z = [1,5,6,7,8,9,14,15,16,17,19,11,12,20,26,29,30,35,53]  # H,B,C,N,O,F,Si,P,S,Cl,K,Na,Mg,Ca,Fe,Cu,Zn,Br,I

# -----------------------------
# 工具函数
# -----------------------------
def set_seed(seed=SEED):
    random.seed(seed); np.random.seed(seed); torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


def one_hot(val, choices):
    v = [0] * len(choices)
    if val in choices:
        v[choices.index(val)] = 1
    return v


def atom_features(atom: Chem.rdchem.Atom) -> List[float]:
    """
    常见、有效的节点特征（比单一原子序号显著更强）：
    - 原子序号 one-hot（COMMON_Z + other）
    - 度(degree 0-5) one-hot
    - 杂化态 one-hot
    - 形式电荷 [-2..2] one-hot
    - 是否芳香、是否在环
    - 全替代氢原子数(0..4) one-hot
    - 手性（R/S/None） one-hot
    """
    z = atom.GetAtomicNum()
    z_onehot = one_hot(z if z in COMMON_Z else -1, COMMON_Z + [-1])

    degree = atom.GetTotalDegree()
    degree_onehot = one_hot(min(degree, 5), list(range(6)))

    hyb = atom.GetHybridization()
    hyb_choices = [
        Chem.rdchem.HybridizationType.SP, Chem.rdchem.HybridizationType.SP2,
        Chem.rdchem.HybridizationType.SP3, Chem.rdchem.HybridizationType.SP3D,
        Chem.rdchem.HybridizationType.SP3D2
    ]
    hyb_onehot = one_hot(hyb if hyb in hyb_choices else None, hyb_choices+[None])

    charge = int(atom.GetFormalCharge())
    charge = max(-2, min(2, charge))
    charge_onehot = one_hot(charge, [-2,-1,0,1,2])

    num_h = min(atom.GetTotalNumHs(), 4)
    num_h_onehot = one_hot(num_h, [0,1,2,3,4])

    aromatic = [int(atom.GetIsAromatic())]
    ring = [int(atom.IsInRing())]

    chiral_tag = atom.GetChiralTag()
    chiral_choices = [
        Chem.rdchem.ChiralType.CHI_UNSPECIFIED,
        Chem.rdchem.ChiralType.CHI_TETRAHEDRAL_CW,
        Chem.rdchem.ChiralType.CHI_TETRAHEDRAL_CCW
    ]
    chiral_onehot = one_hot(chiral_tag if chiral_tag in chiral_choices else Chem.rdchem.ChiralType.CHI_UNSPECIFIED,
                            chiral_choices)

    return z_onehot + degree_onehot + hyb_onehot + charge_onehot + num_h_onehot + aromatic + ring + chiral_onehot


def bond_features(bond: Chem.rdchem.Bond) -> List[float]:
    """
    边特征：
    - 键类型 one-hot：SINGLE/DOUBLE/TRIPLE/AROMATIC
    - 共轭(conjugated)、在环(in ring)
    - 立体化 one-hot（简化）
    """
    bt = bond.GetBondType()
    bt_choices = [Chem.BondType.SINGLE, Chem.BondType.DOUBLE,
                  Chem.BondType.TRIPLE, Chem.BondType.AROMATIC]
    bt_onehot = one_hot(bt if bt in bt_choices else None, bt_choices+[None])

    conj = [int(bond.GetIsConjugated())]
    ring = [int(bond.IsInRing())]

    stereo = bond.GetStereo()
    stereo_choices = [Chem.rdchem.BondStereo.STEREONONE,
                      Chem.rdchem.BondStereo.STEREOZ,
                      Chem.rdchem.BondStereo.STEREOE]
    stereo_onehot = one_hot(stereo if stereo in stereo_choices else Chem.rdchem.BondStereo.STEREONONE,
                            stereo_choices)

    return bt_onehot + conj + ring + stereo_onehot


def smiles_to_graph(smiles: str):
    mol = Chem.MolFromSmiles(smiles)
    if mol is None: 
        return None
    Chem.Kekulize(mol, clearAromaticFlags=False)
    AllChem.Compute2DCoords(mol)

    x = []
    for atom in mol.GetAtoms():
        x.append(atom_features(atom))
    x = torch.tensor(x, dtype=torch.float)

    edge_index = []
    edge_attr = []
    for b in mol.GetBonds():
        i, j = b.GetBeginAtomIdx(), b.GetEndAtomIdx()
        f = bond_features(b)
        edge_index.append([i, j]); edge_attr.append(f)
        edge_index.append([j, i]); edge_attr.append(f)

    if len(edge_index) == 0:
        edge_index = torch.empty((2,0), dtype=torch.long)
        edge_attr = torch.empty((0, len(bond_features(Chem.MolFromSmiles('CC').GetBonds()[0]))), dtype=torch.float)
    else:
        edge_index = torch.tensor(edge_index, dtype=torch.long).t().contiguous()
        edge_attr = torch.tensor(edge_attr, dtype=torch.float)

    return Data(x=x, edge_index=edge_index, edge_attr=edge_attr)


# -----------------------------
# 模型
# -----------------------------
def mlp(in_dim, out_dim):
    return nn.Sequential(
        nn.Linear(in_dim, out_dim),
        nn.ReLU(),
        nn.Linear(out_dim, out_dim)
    )


class GINEEncoder(nn.Module):
    def __init__(self, in_dim, edge_dim, hidden=HIDDEN, num_layers=NUM_GINE, dropout=DROPOUT):
        super().__init__()
        self.num_layers = num_layers
        self.convs = nn.ModuleList()
        self.bns = nn.ModuleList()
        self.dropout = dropout

        # 第一层
        self.convs.append(GINEConv(mlp(in_dim, hidden), edge_dim=edge_dim))
        self.bns.append(BatchNorm(hidden))
        # 其余层
        for _ in range(num_layers-1):
            self.convs.append(GINEConv(mlp(hidden, hidden), edge_dim=edge_dim))
            self.bns.append(BatchNorm(hidden))

    def forward(self, x, edge_index, edge_attr):
        h = x
        for conv, bn in zip(self.convs, self.bns):
            h_res = h
            h = conv(h, edge_index, edge_attr)
            h = bn(h)
            h = F.relu(h)
            h = F.dropout(h, p=self.dropout, training=self.training)
            # 轻微残差（维度一致时）
            if h_res.shape == h.shape:
                h = h + 0.1 * h_res
        return h  # node embeddings


class GraphAE(nn.Module):
    """ 简单 Graph Autoencoder：编码 node -> H，解码重构 node 特征 """
    def __init__(self, in_dim, edge_dim, hidden=HIDDEN, num_layers=NUM_GINE, dropout=DROPOUT):
        super().__init__()
        self.encoder = GINEEncoder(in_dim, edge_dim, hidden, num_layers, dropout)
        self.decoder = nn.Sequential(
            nn.Linear(hidden, hidden),
            nn.ReLU(),
            nn.Linear(hidden, in_dim)
        )

    def forward(self, x, edge_index, edge_attr, batch):
        h = self.encoder(x, edge_index, edge_attr)         # [N, hidden]
        x_hat = self.decoder(h)                             # [N, in_dim]
        g = global_mean_pool(h, batch)                      # graph embedding
        return x_hat, g

    def encode_nodes(self, x, edge_index, edge_attr):
        return self.encoder(x, edge_index, edge_attr)


class GINEClassifier(nn.Module):
    """ 图级分类器，可从 AE.encoder 初始化参数 """
    def __init__(self, in_dim, edge_dim, hidden=HIDDEN, num_layers=NUM_GINE, dropout=DROPOUT):
        super().__init__()
        self.encoder = GINEEncoder(in_dim, edge_dim, hidden, num_layers, dropout)
        self.head = nn.Sequential(
            nn.Linear(hidden, hidden),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden, 1)
        )

    def forward(self, x, edge_index, edge_attr, batch):
        h = self.encoder(x, edge_index, edge_attr)         # [N, hidden]
        g = global_mean_pool(h, batch)                     # [B, hidden]
        logit = self.head(g).view(-1)
        return logit

    def load_from_ae(self, ae: GraphAE):
        self.encoder.load_state_dict(ae.encoder.state_dict(), strict=False)


# -----------------------------
# 训练 / 评估函数
# -----------------------------
def train_ae(ae, loader, device, epochs=EPOCHS_AE, lr=LR_AE, wd=WEIGHT_DECAY, patience=PATIENCE):
    ae = ae.to(device)
    opt = torch.optim.Adam(ae.parameters(), lr=lr, weight_decay=wd)

    # 兼容不同 torch 版本：有的没有 verbose 参数
    try:
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
            opt, mode='min', factor=0.5, patience=5, verbose=False
        )
    except TypeError:
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
            opt, mode='min', factor=0.5, patience=5
        )

    best_loss, bad = float('inf'), 0

    for ep in range(1, epochs+1):
        ae.train()
        total = 0.0
        for data in loader:
            data = data.to(device)
            opt.zero_grad(set_to_none=True)
            x_hat, g = ae(data.x, data.edge_index, data.edge_attr, data.batch)
            loss = F.mse_loss(x_hat, data.x)               # 节点特征重构损失
            loss.backward()
            torch.nn.utils.clip_grad_norm_(ae.parameters(), 2.0)
            opt.step()
            total += loss.item()

        mean_loss = total / max(len(loader), 1)
        # 按验证指标(这里用训练均值替代)调整学习率
        scheduler.step(mean_loss)

        # 兼容无 verbose 场景的简单日志
        lr_now = opt.param_groups[0]["lr"]
        print(f'[AE] Epoch {ep:03d} | recon MSE {mean_loss:.5f} | lr {lr_now:.2e}')

        if mean_loss < best_loss - 1e-5:
            best_loss, bad = mean_loss, 0
            torch.save(ae.state_dict(), 'best_graph_ae.pth')
        else:
            bad += 1
            if bad >= patience and ep >= 20:
                print('[AE] Early stop.')
                break

    ae.load_state_dict(torch.load('best_graph_ae.pth', map_location=device))
    ae.eval()
    return ae



@torch.no_grad()
def get_graph_embeddings(ae: GraphAE, loader, device) -> np.ndarray:
    ae.eval()
    embs = []
    for data in loader:
        data = data.to(device)
        h = ae.encode_nodes(data.x, data.edge_index, data.edge_attr)
        g = global_mean_pool(h, data.batch)                # [B, hidden]
        embs.append(g.cpu().numpy())
    return np.concatenate(embs, axis=0)


def select_negatives_farthest(embs: np.ndarray, labels: np.ndarray,
                              metric: str = DIST_METRIC, ratio: float = NEG_POS_RATIO) -> np.ndarray:
    """
    embs: [N, d] 图嵌入；labels: [N] in {0,1}
    返回一个布尔索引，表示被选中的样本（全部正类 + 选出来的负类）
    规则：以“与正类中心 的相似度最低(或距离最大)”来选负类；负样本数≈正样本数*ratio
    """
    pos_idx = np.where(labels == 1)[0]
    neg_idx = np.where(labels == 0)[0]
    if len(pos_idx) == 0 or len(neg_idx) == 0:
        return np.ones_like(labels, dtype=bool)

    pos_centroid = embs[pos_idx].mean(axis=0, keepdims=True)  # [1,d]
    neg_embs = embs[neg_idx]                                  # [K,d]

    if metric == 'cosine':
        # 相似度越小越“相异”
        # cos = <a,b> / (||a||*||b||)
        a = neg_embs / (np.linalg.norm(neg_embs, axis=1, keepdims=True) + 1e-9)
        b = pos_centroid / (np.linalg.norm(pos_centroid, axis=1, keepdims=True) + 1e-9)
        sim = (a @ b.T).reshape(-1)                           # [K]
        order = np.argsort(sim)                               # 从小到大
    elif metric == 'euclidean':
        dist = np.linalg.norm(neg_embs - pos_centroid, axis=1)
        order = np.argsort(-dist)                             # 从大到小
    else:
        raise ValueError("metric must be 'cosine' or 'euclidean'")

    k = int(round(len(pos_idx) * ratio))
    chosen_neg = neg_idx[order[:k]]
    mask = np.zeros_like(labels, dtype=bool)
    mask[pos_idx] = True
    mask[chosen_neg] = True
    return mask


def train_one_epoch_cls(model, loader, optimizer, criterion, device):
    model.train()
    total = 0.0
    for data in loader:
        data = data.to(device)
        optimizer.zero_grad(set_to_none=True)
        logits = model(data.x, data.edge_index, data.edge_attr, data.batch)
        y = data.y.view(-1).to(device)
        loss = criterion(logits, y)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 2.0)
        optimizer.step()
        total += loss.item()
    return total / max(len(loader),1)


@torch.no_grad()
def infer(model, loader, device):
    model.eval()
    ys, ps = [], []
    for data in loader:
        data = data.to(device)
        logits = model(data.x, data.edge_index, data.edge_attr, data.batch)
        prob = torch.sigmoid(logits)
        ys.append(data.y.view(-1).cpu().numpy())
        ps.append(prob.cpu().numpy())
    y_true = np.concatenate(ys) if ys else np.array([])
    y_pred = np.concatenate(ps) if ps else np.array([])
    return y_true, y_pred


def calc_metrics(y_true, y_prob, threshold=0.5):
    if y_true.size == 0:
        return {k: float('nan') for k in
                ['ROC-AUC','PRC-AUC','Accuracy','F1','MCC','Recall','Precision','False Positives','False Positive Rate']}
    y_hat = (y_prob >= threshold).astype(int)
    tn, fp, fn, tp = confusion_matrix(y_true, y_hat, labels=[0,1]).ravel()
    fpr = fp / max((fp + tn), 1)
    out = {
        'ROC-AUC': roc_auc_score(y_true, y_prob),
        'PRC-AUC': average_precision_score(y_true, y_prob),
        'Accuracy': accuracy_score(y_true, y_hat),
        'F1': f1_score(y_true, y_hat, zero_division=0),
        'MCC': matthews_corrcoef(y_true, y_hat),
        'Recall': recall_score(y_true, y_hat, zero_division=0),
        'Precision': precision_score(y_true, y_hat, zero_division=0),
        'False Positives': int(fp),
        'False Positive Rate': fpr,
    }
    return out


# -----------------------------
# 主流程
# -----------------------------
def main(args):
    set_seed(SEED)
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f'Using device: {device}')

    # 1) 读数据
    df = pd.read_csv(args.csv)
    assert 'smiles' in df.columns and 'antibiotic_activity' in df.columns, \
        "CSV 必须包含 'smiles' 与 'antibiotic_activity' 两列"
    smiles = df['smiles'].astype(str).tolist()
    labels = df['antibiotic_activity'].astype(int).to_numpy()

    # 2) 构图
    data_list: List[Data] = []
    drop_idx = []
    for i, smi in enumerate(tqdm(smiles, desc='SMILES->Graph')):
        g = smiles_to_graph(smi)
        if g is None or g.x.numel() == 0:
            drop_idx.append(i); continue
        g.y = torch.tensor([labels[i]], dtype=torch.float32)
        data_list.append(g)
    if drop_idx:
        print(f'Warning: {len(drop_idx)} SMILES 转换失败，已跳过。')
        labels = np.delete(labels, drop_idx, axis=0)

    # 维度信息
    in_dim = data_list[0].x.size(1)
    edge_dim = data_list[0].edge_attr.size(1) if data_list[0].edge_attr is not None else 0
    print(f'Node feat dim = {in_dim} | Edge feat dim = {edge_dim} | N graphs = {len(data_list)}')

    # 3) 训练 Graph AE（全部样本，不分标签）
    ae_loader = DataLoader(data_list, batch_size=BATCH_SIZE_AE, shuffle=True, num_workers=0)
    ae = GraphAE(in_dim, edge_dim, hidden=HIDDEN, num_layers=NUM_GINE, dropout=DROPOUT)
    ae = train_ae(ae, ae_loader, device)

    # 4) 提取图嵌入，并做“差异最大”的负类选择
    eval_loader = DataLoader(data_list, batch_size=BATCH_SIZE_AE, shuffle=False, num_workers=0)
    embs = get_graph_embeddings(ae, eval_loader, device)         # [N, hidden]
    pd.DataFrame(embs).to_csv(EMBED_CSV, index=False)

    mask = select_negatives_farthest(embs, labels, metric=DIST_METRIC, ratio=NEG_POS_RATIO)
    data_balanced = [d for d, m in zip(data_list, mask) if m]
    labels_balanced = labels[mask]
    print(f'After undersampling by dissimilarity: pos={labels_balanced.sum()} '
          f'neg={(1-labels_balanced).sum()} total={len(data_balanced)}')

    # 5) 5-fold 训练分类器（encoder 从 AE 迁移初始化）
    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=SEED)
    fold_metrics = []
    all_true, all_prob = [], []
    best_key, best_score = None, -1.0

    for fold, (tr_idx, te_idx) in enumerate(skf.split(np.arange(len(data_balanced)), labels_balanced), 1):
        train_subset = [data_balanced[i] for i in tr_idx]
        test_subset  = [data_balanced[i] for i in te_idx]
        y_train = labels_balanced[tr_idx]
        y_test  = labels_balanced[te_idx]

        # 再从训练里切出 VAL 做早停
        tr_part, val_part, _, _ = train_test_split(
            train_subset, y_train, test_size=VAL_SPLIT, stratify=y_train, random_state=SEED)

        train_loader = DataLoader(tr_part, batch_size=BATCH_SIZE_CLS, shuffle=True,  num_workers=0)
        val_loader   = DataLoader(val_part, batch_size=BATCH_SIZE_CLS, shuffle=False, num_workers=0)
        test_loader  = DataLoader(test_subset, batch_size=BATCH_SIZE_CLS, shuffle=False, num_workers=0)

        model = GINEClassifier(in_dim, edge_dim, hidden=HIDDEN, num_layers=NUM_GINE, dropout=DROPOUT).to(device)
        model.load_from_ae(ae)  # 迁移初始化

        # 正负不均衡依然可能存在（抽样后一般接近1:1），保险起见计算 pos_weight
        pos_count = float((y_train==1).sum()); neg_count = float((y_train==0).sum())
        pos_weight = torch.tensor([(neg_count / max(pos_count, 1.0))], device=device, dtype=torch.float32)
        criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)

        optimizer = torch.optim.Adam(model.parameters(), lr=LR_CLS, weight_decay=WEIGHT_DECAY)
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=5)

        best_val, bad = -1.0, 0
        best_state = None

        for ep in range(1, EPOCHS_CLS+1):
            loss = train_one_epoch_cls(model, train_loader, optimizer, criterion, device)
            yv, pv = infer(model, val_loader, device)
            # 用 PRC-AUC 作为早停指标（在极不均衡时更鲁棒）
            val_ap = average_precision_score(yv, pv) if yv.size > 0 else 0.0
            scheduler.step(val_ap)
            print(f'[Fold {fold}] Ep {ep:03d} | train loss {loss:.4f} | val AP {val_ap:.4f} | lr {optimizer.param_groups[0]["lr"]:.2e}')

            if val_ap > best_val + 1e-5:
                best_val, bad = val_ap, 0
                best_state = model.state_dict()
            else:
                bad += 1
                if bad >= PATIENCE and ep >= 30:
                    print(f'[Fold {fold}] Early stop.')
                    break

        # 加载最佳参数并测试
        if best_state is not None:
            model.load_state_dict(best_state)
        yt, pt = infer(model, test_loader, device)
        all_true.append(yt); all_prob.append(pt)
        m = calc_metrics(yt, pt, threshold=0.5)
        m_row = {'Fold': fold, **m}
        fold_metrics.append(m_row)
        print(f'[Fold {fold}] Test metrics: {m}')

        # 记录一个“最佳折”模型（以 ROC-AUC 决定）
        if m['ROC-AUC'] > best_score:
            best_score = m['ROC-AUC']; best_key = fold
            torch.save(model.state_dict(), BEST_MODEL_PATH)

    # 6) 汇总并保存结果
    fold_df = pd.DataFrame(fold_metrics)
    fold_df.to_csv(FOLD_DETAIL_CSV, index=False)

    all_true = np.concatenate(all_true); all_prob = np.concatenate(all_prob)
    overall = calc_metrics(all_true, all_prob, threshold=0.5)
    overall['Model'] = 'GINE-Classifier (AE init)'
    print('\n========== Overall (5-fold aggregated) ==========')
    for k,v in overall.items():
        print(f'{k}: {v:.6f}' if isinstance(v, float) else f'{k}: {v}')
    print(f'Best fold model saved to: {BEST_MODEL_PATH}')

    # 也保存一份总表（便于你和其它模型对比拼表）
    out_row = {
        'Model': 'GINE(AE init)+Dissimilar undersampling',
        **overall
    }
    pd.DataFrame([out_row]).to_csv(RESULT_CSV, index=False)
    print(f'\nPer-fold metrics -> {FOLD_DETAIL_CSV}')
    print(f'Overall metrics -> {RESULT_CSV}')


if __name__ == '__main__':
    import argparse, sys
    parser = argparse.ArgumentParser()
    parser.add_argument('--csv', type=str, default=DEFAULT_CSV,
                        help="包含 'smiles' 与 'antibiotic_activity' 的CSV路径")
    # 在 Notebook/IDE 等环境里会有多余的参数，使用 parse_known_args 忽略它们
    args, _ = parser.parse_known_args()
    # 也可以在这里打印一下以确认：
    # print('Parsed args:', args)
    main(args)



Using device: cpu


SMILES->Graph: 100%|██████████| 13524/13524 [00:10<00:00, 1249.98it/s]


Node feat dim = 47 | Edge feat dim = 10 | N graphs = 13524
[AE] Epoch 001 | recon MSE 0.02076 | lr 1.00e-03
[AE] Epoch 002 | recon MSE 0.00463 | lr 1.00e-03
[AE] Epoch 003 | recon MSE 0.00259 | lr 1.00e-03
[AE] Epoch 004 | recon MSE 0.00179 | lr 1.00e-03
[AE] Epoch 005 | recon MSE 0.00137 | lr 1.00e-03
[AE] Epoch 006 | recon MSE 0.00114 | lr 1.00e-03
[AE] Epoch 007 | recon MSE 0.00101 | lr 1.00e-03
[AE] Epoch 008 | recon MSE 0.00093 | lr 1.00e-03
[AE] Epoch 009 | recon MSE 0.00088 | lr 1.00e-03
[AE] Epoch 010 | recon MSE 0.00094 | lr 1.00e-03
[AE] Epoch 011 | recon MSE 0.00084 | lr 1.00e-03
[AE] Epoch 012 | recon MSE 0.00081 | lr 1.00e-03
[AE] Epoch 013 | recon MSE 0.00085 | lr 1.00e-03
[AE] Epoch 014 | recon MSE 0.00078 | lr 1.00e-03
[AE] Epoch 015 | recon MSE 0.00076 | lr 1.00e-03
[AE] Epoch 016 | recon MSE 0.00076 | lr 1.00e-03
[AE] Epoch 017 | recon MSE 0.00076 | lr 1.00e-03
[AE] Epoch 018 | recon MSE 0.00075 | lr 1.00e-03
[AE] Epoch 019 | recon MSE 0.00074 | lr 1.00e-03
[AE] Epoch

英文版选nearest

In [1]:
# -*- coding: utf-8 -*-
"""
GINE + Graph Autoencoder (GAE) undersampling pipeline for molecular activity prediction.

Pipeline
--------
1) RDKit: convert SMILES -> molecular graphs with rich node/edge features
2) Train a Graph Autoencoder (reconstruct node features) to obtain graph embeddings
3) Undersample negatives by similarity to the positive centroid in embedding space:
   - 'nearest'  : pick the most similar negatives (smallest difference)  [DEFAULT]
   - 'farthest' : pick the most dissimilar negatives (largest difference)
   Supports cosine (default) or euclidean distance; negative:positive ~ 1:1 (configurable)
4) Train a GINE-based graph classifier with Stratified 5-Fold cross-validation
5) Report and save metrics per fold and overall:
   ROC-AUC, PRC-AUC, Accuracy, F1, MCC, Recall, Precision, False Positives, False Positive Rate
6) Save best-fold model weights and optional embeddings CSV

Requirements
------------
- rdkit
- torch, torch_geometric (>= 2.2 recommended)
- scikit-learn, pandas, numpy, tqdm

Input
-----
A CSV with at least two columns:
- 'smiles' : SMILES string
- 'antibiotic_activity' : binary label {0,1}

Usage
-----
Command line:
    python train_gcn_gae_pipeline.py --csv /path/to/data.csv \
        --pick nearest --metric cosine --ratio 1.0

Jupyter / IDE:
    This script uses `parse_known_args()` to ignore extra kernel args.
    Alternatively, call:
        from types import SimpleNamespace
        main(SimpleNamespace(csv='/path/to/data.csv', pick='nearest', metric='cosine', ratio=1.0))
"""

import os
import random
from typing import List

import numpy as np
import pandas as pd
from tqdm import tqdm

import torch
import torch.nn.functional as F
from torch import nn

# PyG
from torch_geometric.data import Data
try:
    from torch_geometric.loader import DataLoader
except Exception:  # backward compatibility with very old PyG versions
    from torch_geometric.data import DataLoader
from torch_geometric.nn import global_mean_pool, GINEConv, BatchNorm

# RDKit
from rdkit import Chem
from rdkit.Chem import AllChem

# sklearn
from sklearn.model_selection import StratifiedKFold, train_test_split
from sklearn.metrics import (
    roc_auc_score,
    average_precision_score,
    f1_score,
    accuracy_score,
    matthews_corrcoef,
    precision_score,
    recall_score,
    confusion_matrix,
)

# -----------------------------
# Configuration (edit as needed)
# -----------------------------
SEED = 42
DEFAULT_CSV = "E:/SyntheMol-main/data/Data/1_training_data/raw_data.csv"
RESULT_CSV = "cv_results.csv"
FOLD_DETAIL_CSV = "cv_per_fold.csv"
BEST_MODEL_PATH = "best_gine_model.pth"
EMBED_CSV = "graph_embeddings.csv"

MAX_WORKERS = min(os.cpu_count() or 0, 30)  # keep <= 30 CPUs if you parallelize elsewhere
BATCH_SIZE_AE = 64
BATCH_SIZE_CLS = 64
EPOCHS_AE = 60
EPOCHS_CLS = 120
PATIENCE = 15
LR_AE = 1e-3
LR_CLS = 2e-3
WEIGHT_DECAY = 1e-4
DROPOUT = 0.2
HIDDEN = 128
NUM_GINE = 3                  # number of encoder layers
VAL_SPLIT = 0.10              # split from the training fold for early stopping
DIST_METRIC = "cosine"        # 'cosine' or 'euclidean' (default used if not overriden by CLI)
NEG_POS_RATIO = 1.0           # negative:positive = 1:1 by default
DEFAULT_PICK = "nearest"      # 'nearest' (most similar negatives) or 'farthest' (most dissimilar)

# Common atomic numbers; everything else goes to "other"
COMMON_Z = [1, 5, 6, 7, 8, 9, 14, 15, 16, 17, 19, 11, 12, 20, 26, 29, 30, 35, 53]
# H,B,C,N,O,F,Si,P,S,Cl,K,Na,Mg,Ca,Fe,Cu,Zn,Br,I

# -----------------------------
# Utilities
# -----------------------------
def set_seed(seed: int = SEED) -> None:
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


def one_hot(val, choices):
    vec = [0] * len(choices)
    if val in choices:
        vec[choices.index(val)] = 1
    return vec


def atom_features(atom: Chem.rdchem.Atom) -> List[float]:
    """
    Node features:
      - atomic number: one-hot (COMMON_Z + 'other')
      - degree: one-hot [0..5]
      - hybridization: one-hot {sp, sp2, sp3, sp3d, sp3d2, other}
      - formal charge: one-hot [-2..2]
      - total hydrogens: one-hot [0..4]
      - aromatic (bool)
      - in ring (bool)
      - chirality tag: one-hot {unspecified, CW, CCW}
    """
    z = atom.GetAtomicNum()
    z_onehot = one_hot(z if z in COMMON_Z else -1, COMMON_Z + [-1])

    degree = atom.GetTotalDegree()
    degree_onehot = one_hot(min(degree, 5), list(range(6)))

    hyb = atom.GetHybridization()
    hyb_choices = [
        Chem.rdchem.HybridizationType.SP,
        Chem.rdchem.HybridizationType.SP2,
        Chem.rdchem.HybridizationType.SP3,
        Chem.rdchem.HybridizationType.SP3D,
        Chem.rdchem.HybridizationType.SP3D2,
    ]
    hyb_onehot = one_hot(hyb if hyb in hyb_choices else None, hyb_choices + [None])

    charge = int(atom.GetFormalCharge())
    charge = max(-2, min(2, charge))
    charge_onehot = one_hot(charge, [-2, -1, 0, 1, 2])

    num_h = min(atom.GetTotalNumHs(), 4)
    num_h_onehot = one_hot(num_h, [0, 1, 2, 3, 4])

    aromatic = [int(atom.GetIsAromatic())]
    ring = [int(atom.IsInRing())]

    chiral_tag = atom.GetChiralTag()
    chiral_choices = [
        Chem.rdchem.ChiralType.CHI_UNSPECIFIED,
        Chem.rdchem.ChiralType.CHI_TETRAHEDRAL_CW,
        Chem.rdchem.ChiralType.CHI_TETRAHEDRAL_CCW,
    ]
    chiral_onehot = one_hot(
        chiral_tag if chiral_tag in chiral_choices else Chem.rdchem.ChiralType.CHI_UNSPECIFIED,
        chiral_choices,
    )

    return (
        z_onehot
        + degree_onehot
        + hyb_onehot
        + charge_onehot
        + num_h_onehot
        + aromatic
        + ring
        + chiral_onehot
    )


def bond_features(bond: Chem.rdchem.Bond) -> List[float]:
    """
    Edge features:
      - bond type: one-hot {single,double,triple,aromatic,other}
      - conjugated (bool)
      - in ring (bool)
      - stereo: one-hot {none, Z, E}
    """
    bt = bond.GetBondType()
    bt_choices = [
        Chem.BondType.SINGLE,
        Chem.BondType.DOUBLE,
        Chem.BondType.TRIPLE,
        Chem.BondType.AROMATIC,
    ]
    bt_onehot = one_hot(bt if bt in bt_choices else None, bt_choices + [None])

    conj = [int(bond.GetIsConjugated())]
    ring = [int(bond.IsInRing())]

    stereo = bond.GetStereo()
    stereo_choices = [
        Chem.rdchem.BondStereo.STEREONONE,
        Chem.rdchem.BondStereo.STEREOZ,
        Chem.rdchem.BondStereo.STEREOE,
    ]
    stereo_onehot = one_hot(
        stereo if stereo in stereo_choices else Chem.rdchem.BondStereo.STEREONONE, stereo_choices
    )

    return bt_onehot + conj + ring + stereo_onehot


def smiles_to_graph(smiles: str):
    """Build a PyG `Data` object from a SMILES string."""
    mol = Chem.MolFromSmiles(smiles)
    if mol is None:
        return None
    # Keep aromatic flags; 2D coords are sufficient for this pipeline
    Chem.Kekulize(mol, clearAromaticFlags=False)
    AllChem.Compute2DCoords(mol)

    # Node features
    x = [atom_features(a) for a in mol.GetAtoms()]
    x = torch.tensor(x, dtype=torch.float)

    # Edges + edge features (bidirectional)
    edge_index = []
    edge_attr = []
    for b in mol.GetBonds():
        i, j = b.GetBeginAtomIdx(), b.GetEndAtomIdx()
        f = bond_features(b)
        edge_index.append([i, j]); edge_attr.append(f)
        edge_index.append([j, i]); edge_attr.append(f)

    if len(edge_index) == 0:
        # Rare edge-less molecule
        edge_index = torch.empty((2, 0), dtype=torch.long)
        # Build an empty edge_attr with correct feature width by probing a dummy bond
        dummy = Chem.MolFromSmiles("CC").GetBonds()[0]
        feat_w = len(bond_features(dummy))
        edge_attr = torch.empty((0, feat_w), dtype=torch.float)
    else:
        edge_index = torch.tensor(edge_index, dtype=torch.long).t().contiguous()
        edge_attr = torch.tensor(edge_attr, dtype=torch.float)

    return Data(x=x, edge_index=edge_index, edge_attr=edge_attr)

# -----------------------------
# Models
# -----------------------------
def mlp(in_dim: int, out_dim: int) -> nn.Sequential:
    return nn.Sequential(nn.Linear(in_dim, out_dim), nn.ReLU(), nn.Linear(out_dim, out_dim))


class GINEEncoder(nn.Module):
    """GINE encoder with BatchNorm, dropout and a light residual connection."""

    def __init__(self, in_dim: int, edge_dim: int, hidden: int = HIDDEN, num_layers: int = NUM_GINE, dropout: float = DROPOUT):
        super().__init__()
        self.num_layers = num_layers
        self.convs = nn.ModuleList()
        self.bns = nn.ModuleList()
        self.dropout = dropout

        # First layer
        self.convs.append(GINEConv(mlp(in_dim, hidden), edge_dim=edge_dim))
        self.bns.append(BatchNorm(hidden))

        # Subsequent layers
        for _ in range(num_layers - 1):
            self.convs.append(GINEConv(mlp(hidden, hidden), edge_dim=edge_dim))
            self.bns.append(BatchNorm(hidden))

    def forward(self, x, edge_index, edge_attr):
        h = x
        for conv, bn in zip(self.convs, self.bns):
            h_res = h
            h = conv(h, edge_index, edge_attr)
            h = bn(h)
            h = F.relu(h)
            h = F.dropout(h, p=self.dropout, training=self.training)
            # Tiny residual when shape matches
            if h_res.shape == h.shape:
                h = h + 0.1 * h_res
        return h  # node embeddings


class GraphAE(nn.Module):
    """Graph Autoencoder: node encoder -> reconstruct node features; returns graph embedding."""

    def __init__(self, in_dim: int, edge_dim: int, hidden: int = HIDDEN, num_layers: int = NUM_GINE, dropout: float = DROPOUT):
        super().__init__()
        self.encoder = GINEEncoder(in_dim, edge_dim, hidden, num_layers, dropout)
        self.decoder = nn.Sequential(nn.Linear(hidden, hidden), nn.ReLU(), nn.Linear(hidden, in_dim))

    def forward(self, x, edge_index, edge_attr, batch):
        h = self.encoder(x, edge_index, edge_attr)  # [N, hidden]
        x_hat = self.decoder(h)                     # [N, in_dim]
        g = global_mean_pool(h, batch)              # [B, hidden] graph embedding
        return x_hat, g

    def encode_nodes(self, x, edge_index, edge_attr):
        return self.encoder(x, edge_index, edge_attr)


class GINEClassifier(nn.Module):
    """Graph-level classifier; encoder can be initialized from a trained AE."""

    def __init__(self, in_dim: int, edge_dim: int, hidden: int = HIDDEN, num_layers: int = NUM_GINE, dropout: float = DROPOUT):
        super().__init__()
        self.encoder = GINEEncoder(in_dim, edge_dim, hidden, num_layers, dropout)
        self.head = nn.Sequential(
            nn.Linear(hidden, hidden),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden, 1),
        )

    def forward(self, x, edge_index, edge_attr, batch):
        h = self.encoder(x, edge_index, edge_attr)
        g = global_mean_pool(h, batch)
        logit = self.head(g).view(-1)
        return logit

    def load_from_ae(self, ae: GraphAE):
        self.encoder.load_state_dict(ae.encoder.state_dict(), strict=False)

# -----------------------------
# Training & evaluation helpers
# -----------------------------
def train_ae(ae: GraphAE, loader: DataLoader, device, epochs: int = EPOCHS_AE, lr: float = LR_AE,
             wd: float = WEIGHT_DECAY, patience: int = PATIENCE) -> GraphAE:
    """Train GraphAE with node feature reconstruction loss (MSE)."""
    ae = ae.to(device)
    opt = torch.optim.Adam(ae.parameters(), lr=lr, weight_decay=wd)

    # ReduceLROnPlateau: some torch versions don't support 'verbose'
    try:
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(opt, mode="min", factor=0.5, patience=5, verbose=False)
    except TypeError:
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(opt, mode="min", factor=0.5, patience=5)

    best_loss, bad = float("inf"), 0

    for ep in range(1, epochs + 1):
        ae.train()
        total = 0.0
        for data in loader:
            data = data.to(device)
            opt.zero_grad(set_to_none=True)
            x_hat, _ = ae(data.x, data.edge_index, data.edge_attr, data.batch)
            loss = F.mse_loss(x_hat, data.x)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(ae.parameters(), 2.0)
            opt.step()
            total += loss.item()

        mean_loss = total / max(len(loader), 1)
        scheduler.step(mean_loss)
        lr_now = opt.param_groups[0]["lr"]
        print(f"[AE] Epoch {ep:03d} | recon MSE {mean_loss:.5f} | lr {lr_now:.2e}")

        if mean_loss < best_loss - 1e-5:
            best_loss, bad = mean_loss, 0
            torch.save(ae.state_dict(), "best_graph_ae.pth")
        else:
            bad += 1
            if bad >= patience and ep >= 20:
                print("[AE] Early stop.")
                break

    ae.load_state_dict(torch.load("best_graph_ae.pth", map_location=device))
    ae.eval()
    return ae


@torch.no_grad()
def get_graph_embeddings(ae: GraphAE, loader: DataLoader, device) -> np.ndarray:
    """Return graph embeddings [N_graphs, hidden] from a trained AE encoder."""
    ae.eval()
    embs = []
    for data in loader:
        data = data.to(device)
        h = ae.encode_nodes(data.x, data.edge_index, data.edge_attr)
        g = global_mean_pool(h, data.batch)  # [B, hidden]
        embs.append(g.cpu().numpy())
    return np.concatenate(embs, axis=0)


def select_negatives_by_similarity(
    embs: np.ndarray,
    labels: np.ndarray,
    metric: str = DIST_METRIC,     # 'cosine' or 'euclidean'
    ratio: float = NEG_POS_RATIO,  # negatives : positives
    pick: str = DEFAULT_PICK       # 'nearest' (most similar) or 'farthest' (most dissimilar)
) -> np.ndarray:
    """
    Select negatives relative to the positive centroid in the embedding space.

    Behavior
    --------
    - cosine:    higher similarity -> more similar; lower -> more dissimilar
    - euclidean: smaller distance  -> more similar; larger -> more dissimilar

    Returns
    -------
    mask : (N,) bool array indicating retained samples (all positives + selected negatives)
    """
    assert pick in ("nearest", "farthest"), "pick must be 'nearest' or 'farthest'"

    pos_idx = np.where(labels == 1)[0]
    neg_idx = np.where(labels == 0)[0]
    if len(pos_idx) == 0 or len(neg_idx) == 0:
        return np.ones_like(labels, dtype=bool)

    pos_centroid = embs[pos_idx].mean(axis=0, keepdims=True)  # [1, d]
    neg_embs = embs[neg_idx]                                  # [K, d]

    if metric == "cosine":
        a = neg_embs / (np.linalg.norm(neg_embs, axis=1, keepdims=True) + 1e-9)
        b = pos_centroid / (np.linalg.norm(pos_centroid, axis=1, keepdims=True) + 1e-9)
        sim = (a @ b.T).reshape(-1)  # larger = more similar
        if pick == "nearest":
            order = np.argsort(-sim)   # descending  -> most similar first
        else:
            order = np.argsort(sim)    # ascending   -> most dissimilar first
    elif metric == "euclidean":
        dist = np.linalg.norm(neg_embs - pos_centroid, axis=1)  # smaller = more similar
        if pick == "nearest":
            order = np.argsort(dist)   # ascending  -> most similar first
        else:
            order = np.argsort(-dist)  # descending -> most dissimilar first
    else:
        raise ValueError("metric must be 'cosine' or 'euclidean'")

    k = int(round(len(pos_idx) * ratio))
    chosen_neg = neg_idx[order[:k]]

    mask = np.zeros_like(labels, dtype=bool)
    mask[pos_idx] = True
    mask[chosen_neg] = True
    return mask


def train_one_epoch_cls(model: nn.Module, loader: DataLoader, optimizer, criterion, device) -> float:
    model.train()
    total = 0.0
    for data in loader:
        data = data.to(device)
        optimizer.zero_grad(set_to_none=True)
        logits = model(data.x, data.edge_index, data.edge_attr, data.batch)
        y = data.y.view(-1).to(device)
        loss = criterion(logits, y)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 2.0)
        optimizer.step()
        total += loss.item()
    return total / max(len(loader), 1)


@torch.no_grad()
def infer(model: nn.Module, loader: DataLoader, device):
    model.eval()
    ys, ps = [], []
    for data in loader:
        data = data.to(device)
        logits = model(data.x, data.edge_index, data.edge_attr, data.batch)
        prob = torch.sigmoid(logits)
        ys.append(data.y.view(-1).cpu().numpy())
        ps.append(prob.cpu().numpy())
    y_true = np.concatenate(ys) if ys else np.array([])
    y_pred = np.concatenate(ps) if ps else np.array([])
    return y_true, y_pred


def calc_metrics(y_true: np.ndarray, y_prob: np.ndarray, threshold: float = 0.5):
    """Return all required metrics, including FP and FPR."""
    if y_true.size == 0:
        keys = [
            "ROC-AUC", "PRC-AUC", "Accuracy", "F1", "MCC",
            "Recall", "Precision", "False Positives", "False Positive Rate",
        ]
        return {k: float("nan") for k in keys}

    y_hat = (y_prob >= threshold).astype(int)
    tn, fp, fn, tp = confusion_matrix(y_true, y_hat, labels=[0, 1]).ravel()
    fpr = fp / max((fp + tn), 1)

    return {
        "ROC-AUC": roc_auc_score(y_true, y_prob),
        "PRC-AUC": average_precision_score(y_true, y_prob),
        "Accuracy": accuracy_score(y_true, y_hat),
        "F1": f1_score(y_true, y_hat, zero_division=0),
        "MCC": matthews_corrcoef(y_true, y_hat),
        "Recall": recall_score(y_true, y_hat, zero_division=0),
        "Precision": precision_score(y_true, y_hat, zero_division=0),
        "False Positives": int(fp),
        "False Positive Rate": fpr,
    }

# -----------------------------
# Main
# -----------------------------
def main(args) -> None:
    set_seed(SEED)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    # 1) Load data
    df = pd.read_csv(args.csv)
    assert "smiles" in df.columns and "antibiotic_activity" in df.columns, \
        "CSV must contain columns 'smiles' and 'antibiotic_activity'."
    smiles = df["smiles"].astype(str).tolist()
    labels = df["antibiotic_activity"].astype(int).to_numpy()

    # 2) Build graphs
    data_list: List[Data] = []
    drop_idx = []
    for i, smi in enumerate(tqdm(smiles, desc="SMILES->Graph")):
        g = smiles_to_graph(smi)
        if g is None or g.x.numel() == 0:
            drop_idx.append(i)
            continue
        g.y = torch.tensor([labels[i]], dtype=torch.float32)
        data_list.append(g)

    if len(data_list) == 0:
        raise RuntimeError("No valid molecules after SMILES->graph conversion.")

    if drop_idx:
        print(f"Warning: {len(drop_idx)} SMILES failed to convert and were skipped.")
        labels = np.delete(labels, drop_idx, axis=0)

    in_dim = data_list[0].x.size(1)
    edge_dim = data_list[0].edge_attr.size(1) if data_list[0].edge_attr is not None else 0
    print(f"Node feat dim = {in_dim} | Edge feat dim = {edge_dim} | N graphs = {len(data_list)}")

    # 3) Train Graph AE on ALL samples
    ae_loader = DataLoader(data_list, batch_size=BATCH_SIZE_AE, shuffle=True, num_workers=0)
    ae = GraphAE(in_dim, edge_dim, hidden=HIDDEN, num_layers=NUM_GINE, dropout=DROPOUT)
    ae = train_ae(ae, ae_loader, device)

    # 4) Get embeddings and perform similarity-based undersampling
    eval_loader = DataLoader(data_list, batch_size=BATCH_SIZE_AE, shuffle=False, num_workers=0)
    embs = get_graph_embeddings(ae, eval_loader, device)  # [N, hidden]
    pd.DataFrame(embs).to_csv(EMBED_CSV, index=False)

    pick_mode = getattr(args, "pick", DEFAULT_PICK)
    dist_metric = getattr(args, "metric", DIST_METRIC)
    ratio = float(getattr(args, "ratio", NEG_POS_RATIO))

    mask = select_negatives_by_similarity(
        embs, labels, metric=dist_metric, ratio=ratio, pick=pick_mode
    )
    data_balanced = [d for d, m in zip(data_list, mask) if m]
    labels_balanced = labels[mask]
    print(
        f"After undersampling by '{pick_mode}' ({dist_metric}) similarity: "
        f"pos={labels_balanced.sum()} neg={(1 - labels_balanced).sum()} total={len(data_balanced)}"
    )

    # 5) 5-fold CV training
    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=SEED)
    fold_metrics = []
    all_true, all_prob = [], []
    best_score = -1.0

    for fold, (tr_idx, te_idx) in enumerate(skf.split(np.arange(len(data_balanced)), labels_balanced), 1):
        train_subset = [data_balanced[i] for i in tr_idx]
        test_subset = [data_balanced[i] for i in te_idx]
        y_train = labels_balanced[tr_idx]
        y_test = labels_balanced[te_idx]

        # validation split from training fold for early stopping
        tr_part, val_part, _, _ = train_test_split(
            train_subset, y_train, test_size=VAL_SPLIT, stratify=y_train, random_state=SEED
        )

        train_loader = DataLoader(tr_part, batch_size=BATCH_SIZE_CLS, shuffle=True, num_workers=0)
        val_loader = DataLoader(val_part, batch_size=BATCH_SIZE_CLS, shuffle=False, num_workers=0)
        test_loader = DataLoader(test_subset, batch_size=BATCH_SIZE_CLS, shuffle=False, num_workers=0)

        model = GINEClassifier(in_dim, edge_dim, hidden=HIDDEN, num_layers=NUM_GINE, dropout=DROPOUT).to(device)
        model.load_from_ae(ae)

        # class imbalance guard (should be close to 1:1 after undersampling)
        pos_count = float((y_train == 1).sum())
        neg_count = float((y_train == 0).sum())
        pos_weight = torch.tensor([(neg_count / max(pos_count, 1.0))], device=device, dtype=torch.float32)
        criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)

        optimizer = torch.optim.Adam(model.parameters(), lr=LR_CLS, weight_decay=WEIGHT_DECAY)
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode="max", factor=0.5, patience=5)

        best_val, bad = -1.0, 0
        best_state = None

        for ep in range(1, EPOCHS_CLS + 1):
            loss = train_one_epoch_cls(model, train_loader, optimizer, criterion, device)
            yv, pv = infer(model, val_loader, device)
            val_ap = average_precision_score(yv, pv) if yv.size > 0 else 0.0  # PRC-AUC for early stopping
            scheduler.step(val_ap)
            print(
                f"[Fold {fold}] Ep {ep:03d} | train loss {loss:.4f} | val AP {val_ap:.4f} "
                f"| lr {optimizer.param_groups[0]['lr']:.2e}"
            )

            if val_ap > best_val + 1e-5:
                best_val, bad = val_ap, 0
                best_state = model.state_dict()
            else:
                bad += 1
                if bad >= PATIENCE and ep >= 30:
                    print(f"[Fold {fold}] Early stop.")
                    break

        # test with best validation state
        if best_state is not None:
            model.load_state_dict(best_state)
        yt, pt = infer(model, test_loader, device)
        all_true.append(yt)
        all_prob.append(pt)
        m = calc_metrics(yt, pt, threshold=0.5)
        fold_metrics.append({"Fold": fold, **m})
        print(f"[Fold {fold}] Test metrics: {m}")

        # keep best model by ROC-AUC
        if m["ROC-AUC"] > best_score:
            best_score = m["ROC-AUC"]
            torch.save(model.state_dict(), BEST_MODEL_PATH)

    # 6) Save per-fold and overall results
    pd.DataFrame(fold_metrics).to_csv(FOLD_DETAIL_CSV, index=False)

    all_true = np.concatenate(all_true)
    all_prob = np.concatenate(all_prob)
    overall_metrics = calc_metrics(all_true, all_prob, threshold=0.5)

    print("\n========== Overall (5-fold aggregated) ==========")
    for k, v in overall_metrics.items():
        print(f"{k}: {v:.6f}" if isinstance(v, float) else f"{k}: {v}")
    print(f"Best fold model saved to: {BEST_MODEL_PATH}")

    # Single-row summary for easy comparison across models
    out_row = {
        "Model": f"GINE(AE init) + {pick_mode} undersampling ({dist_metric}), ratio={ratio}",
        **overall_metrics,
    }
    pd.DataFrame([out_row]).to_csv(RESULT_CSV, index=False)
    print(f"\nPer-fold metrics -> {FOLD_DETAIL_CSV}")
    print(f"Overall metrics -> {RESULT_CSV}")


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--csv",
        type=str,
        default=DEFAULT_CSV,
        help="Path to CSV with columns 'smiles' and 'antibiotic_activity'.",
    )
    parser.add_argument(
        "--pick",
        type=str,
        choices=["nearest", "farthest"],
        default=DEFAULT_PICK,
        help="Negative selection mode relative to positive centroid (default: nearest).",
    )
    parser.add_argument(
        "--metric",
        type=str,
        choices=["cosine", "euclidean"],
        default=DIST_METRIC,
        help="Similarity metric in embedding space (default: cosine).",
    )
    parser.add_argument(
        "--ratio",
        type=float,
        default=NEG_POS_RATIO,
        help="Negative:positive ratio for undersampling (default: 1.0).",
    )
    # In notebook/IDE environments extra args like --f=... may be injected.
    args, _ = parser.parse_known_args()
    main(args)


Using device: cpu


SMILES->Graph: 100%|██████████| 13524/13524 [00:11<00:00, 1203.32it/s]


Node feat dim = 47 | Edge feat dim = 10 | N graphs = 13524
[AE] Epoch 001 | recon MSE 0.02076 | lr 1.00e-03
[AE] Epoch 002 | recon MSE 0.00463 | lr 1.00e-03
[AE] Epoch 003 | recon MSE 0.00259 | lr 1.00e-03
[AE] Epoch 004 | recon MSE 0.00179 | lr 1.00e-03
[AE] Epoch 005 | recon MSE 0.00137 | lr 1.00e-03
[AE] Epoch 006 | recon MSE 0.00114 | lr 1.00e-03
[AE] Epoch 007 | recon MSE 0.00101 | lr 1.00e-03
[AE] Epoch 008 | recon MSE 0.00093 | lr 1.00e-03
[AE] Epoch 009 | recon MSE 0.00088 | lr 1.00e-03
[AE] Epoch 010 | recon MSE 0.00094 | lr 1.00e-03
[AE] Epoch 011 | recon MSE 0.00084 | lr 1.00e-03
[AE] Epoch 012 | recon MSE 0.00081 | lr 1.00e-03
[AE] Epoch 013 | recon MSE 0.00085 | lr 1.00e-03
[AE] Epoch 014 | recon MSE 0.00078 | lr 1.00e-03
[AE] Epoch 015 | recon MSE 0.00076 | lr 1.00e-03
[AE] Epoch 016 | recon MSE 0.00076 | lr 1.00e-03
[AE] Epoch 017 | recon MSE 0.00076 | lr 1.00e-03
[AE] Epoch 018 | recon MSE 0.00075 | lr 1.00e-03
[AE] Epoch 019 | recon MSE 0.00074 | lr 1.00e-03
[AE] Epoch

In [7]:
import pandas as pd
import numpy as np
from rdkit import Chem
from rdkit.Chem import AllChem
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier  # 决策树
from sklearn.neighbors import KNeighborsClassifier  # KNN
from xgboost import XGBClassifier  # XGBoost
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import (
    roc_auc_score,
    accuracy_score,
    precision_score,
    recall_score,
    precision_recall_curve,
    auc,
    f1_score,
    matthews_corrcoef,
    confusion_matrix
)
from sklearn.model_selection import GridSearchCV
from sklearn.cluster import KMeans
from sklearn.metrics import pairwise_distances_argmin_min
import matplotlib.pyplot as plt

# ============== 1. 读取训练集 (library_1 + library_2) 和 测试集 (library_3) ==============
train_data_1 = pd.read_csv(
    r'C:\Users\DuYih\Desktop\github\DL Microbiology Antibiotics\SyntheMol-main\data\Data\1_training_data\library_1_binarized.csv'
)
train_data_2 = pd.read_csv(
    r'C:\Users\DuYih\Desktop\github\DL Microbiology Antibiotics\SyntheMol-main\data\Data\1_training_data\library_2_binarized.csv'
)
train_data = pd.concat([train_data_1, train_data_2], ignore_index=True)

test_data = pd.read_csv(
    r'C:\Users\DuYih\Desktop\github\DL Microbiology Antibiotics\SyntheMol-main\data\Data\1_training_data\library_3_binarized.csv'
)

# ============== 2. 如果需要，从 SMILES 计算 Morgan 指纹 ==============
def get_fingerprint(smiles, n_bits=2048):
    mol = Chem.MolFromSmiles(smiles)
    if mol:
        fp = AllChem.GetMorganFingerprintAsBitVect(mol, 2, nBits=n_bits)
        return np.array(fp)
    else:
        return np.zeros(n_bits)

# 如果 CSV 中有 'smiles' 列，就计算指纹(0/1)。否则可删除以下赋值。
if 'smiles' in train_data.columns:
    train_data['fingerprint'] = train_data['smiles'].apply(get_fingerprint)
if 'smiles' in test_data.columns:
    test_data['fingerprint'] = test_data['smiles'].apply(get_fingerprint)

# ============== 3. 将训练集分为正负样本 ==============
train_positive = train_data[train_data['antibiotic_activity'] == 1]
train_negative = train_data[train_data['antibiotic_activity'] == 0]

print(f"[训练集] 原始正样本数={len(train_positive)}, 负样本数={len(train_negative)}")

# ============== 4. 对训练负样本做KMeans，欠采样 ==============
#    簇数 = 2 × (训练正样本数)，选取各簇中心附近的负样本
num_clusters_train_neg = 2 * len(train_positive)
num_clusters_train_neg = min(num_clusters_train_neg, len(train_negative))  # 不超过负样本数
num_clusters_train_neg = max(num_clusters_train_neg, 1)  # 至少1

train_neg_fps = np.vstack(train_negative['fingerprint'].values)
kmeans_train_neg = KMeans(n_clusters=num_clusters_train_neg, random_state=42)
kmeans_train_neg.fit(train_neg_fps)

closest_train_neg, _ = pairwise_distances_argmin_min(kmeans_train_neg.cluster_centers_, train_neg_fps)
selected_train_negative = train_negative.iloc[closest_train_neg]

final_train_data = pd.concat([train_positive, selected_train_negative], ignore_index=True)
print(f"[训练集] 欠采样后负样本数={len(selected_train_negative)}, 总训练样本数={len(final_train_data)}")

# ============== 5. 构建用于“最近邻”检索的训练集特征矩阵 ==============
#    我们要对测试正样本做最近邻查询，看其最近的训练样本是正还是负
train_fp_matrix = np.vstack(final_train_data['fingerprint'].values)
train_labels = final_train_data['antibiotic_activity'].values

# ============== 6. 测试集分为正负样本 ==============
test_positive = test_data[test_data['antibiotic_activity'] == 1]
test_negative = test_data[test_data['antibiotic_activity'] == 0]

print(f"[测试集] 原始正样本={len(test_positive)}, 负样本={len(test_negative)}")

# ============== 7. 筛选测试正样本：保持最近邻训练样本为正的测试样本 ==============
def get_nearest_train_label(fp, X_train, y_train):
    # 计算 fp 到 X_train 所有向量的欧氏距离，并找最小距离的index
    dists = np.linalg.norm(X_train - fp, axis=1)
    idx_min = np.argmin(dists)
    return y_train[idx_min]

selected_test_positive_list = []
test_pos_fp = np.vstack(test_positive['fingerprint'].values) if len(test_positive)>0 else np.empty((0,))

for i in range(len(test_positive)):
    fp = test_pos_fp[i]
    label_of_nearest = get_nearest_train_label(fp, train_fp_matrix, train_labels)
    if label_of_nearest == 1:
        # 最近邻在训练集中是正样本 => 保留
        selected_test_positive_list.append(test_positive.iloc[i])
        
selected_test_positive = pd.DataFrame(selected_test_positive_list)
print(f"[测试集] 筛选后(最近邻=正)的正样本数 = {len(selected_test_positive)}")

# ============== 8. 筛选测试负样本：欠采样, 簇数= 2 × (测试正样本数) ==============
final_test_pos_count = len(selected_test_positive)
num_clusters_test_neg = 2 * final_test_pos_count
num_clusters_test_neg = min(num_clusters_test_neg, len(test_negative))  # 不超过负样本数
num_clusters_test_neg = max(num_clusters_test_neg, 1) if len(test_negative)>0 else 0

selected_test_negative = pd.DataFrame(columns=test_negative.columns)

if num_clusters_test_neg > 0:
    test_neg_fps = np.vstack(test_negative['fingerprint'].values)
    kmeans_test_neg = KMeans(n_clusters=num_clusters_test_neg, random_state=42)
    kmeans_test_neg.fit(test_neg_fps)
    
    closest_test_neg, _ = pairwise_distances_argmin_min(kmeans_test_neg.cluster_centers_, test_neg_fps)
    selected_test_negative = test_negative.iloc[closest_test_neg]

print(f"[测试集] 筛选后负样本数 = {len(selected_test_negative)} (簇数={num_clusters_test_neg})")

# ============== 9. 最终测试集 = 上述保留的正样本 + 筛选后的负样本 ==============
final_test_data = pd.concat([selected_test_positive, selected_test_negative], ignore_index=True)
print(f"[测试集] 最终用于验证的样本数 = {len(final_test_data)}")

# ============== 10. 准备特征与标签，并做标准化 ==============
X_train = np.vstack(final_train_data['fingerprint'].values)
y_train = final_train_data['antibiotic_activity'].values

if len(final_test_data) > 0:
    X_test = np.vstack(final_test_data['fingerprint'].values)
    y_test = final_test_data['antibiotic_activity'].values
else:
    X_test = np.empty((0, X_train.shape[1]))
    y_test = np.array([])

scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test) if len(final_test_data) > 0 else X_test

# ============== 11. 定义模型与超参数网格 + 训练 & 评估 ==============
models_params = {
    'LogisticRegression': (
        LogisticRegression(max_iter=1000),
        {'C': [0.01, 0.1, 1, 10, 100]}
    ),
    'SVM': (
        SVC(probability=True),
        {'C': [0.1, 1, 10], 'kernel': ['linear', 'rbf']}
    ),
    'RandomForest': (
        RandomForestClassifier(),
        {'n_estimators': [10, 50, 100], 'max_depth': [None, 10, 20]}
    ),
    'DecisionTree': (
        DecisionTreeClassifier(),
        {'max_depth': [None, 5, 10], 'min_samples_split': [2, 5, 10]}
    ),
    'KNN': (
        KNeighborsClassifier(),
        {'n_neighbors': [3, 5, 7], 'weights': ['uniform', 'distance']}
    ),
    'XGBoost': (
        XGBClassifier(use_label_encoder=False, eval_metric='logloss'),
        {'n_estimators': [50, 100], 'max_depth': [3, 6], 'learning_rate': [0.01, 0.1]}
    )
}

results = []

if len(final_test_data) == 0:
    print("\n[警告] 最终测试集为空，无法评估模型！")
else:
    for model_name, (model, param_grid) in models_params.items():
        print(f"\n>>> Training model: {model_name}")
        clf = GridSearchCV(model, param_grid, scoring='roc_auc', cv=5)
        clf.fit(X_train_scaled, y_train)
        
        best_model = clf.best_estimator_
        y_pred = best_model.predict(X_test_scaled)
        
        # 预测概率
        if hasattr(best_model, "predict_proba"):
            y_pred_proba = best_model.predict_proba(X_test_scaled)[:, 1]
        else:
            decision_vals = best_model.decision_function(X_test_scaled)
            # 归一化到[0,1]
            y_pred_proba = (
                (decision_vals - decision_vals.min()) / (decision_vals.max() - decision_vals.min())
                if decision_vals.max() != decision_vals.min()
                else np.zeros_like(decision_vals)
            )
        
        # 计算指标
        roc_auc = roc_auc_score(y_test, y_pred_proba) if len(y_test)>0 else None
        prec, rec, _ = precision_recall_curve(y_test, y_pred_proba) if len(y_test)>0 else (None, None, None)
        prc_auc = auc(rec, prec) if prec is not None else None
        accuracy = accuracy_score(y_test, y_pred) if len(y_test)>0 else None
        f1 = f1_score(y_test, y_pred) if len(y_test)>0 else None
        mcc = matthews_corrcoef(y_test, y_pred) if len(y_test)>0 else None
        recall_val = recall_score(y_test, y_pred) if len(y_test)>0 else None
        precision_val = precision_score(y_test, y_pred) if len(y_test)>0 else None
        
        if len(y_test) > 0:
            tn, fp, fn, tp = confusion_matrix(y_test, y_pred).ravel()
            fp_rate = fp / (fp + tn) if (fp + tn) else 0
        else:
            tn = fp = fn = tp = 0
            fp_rate = None
        
        print(f"Best Params: {clf.best_params_}")
        print(f"ROC-AUC: {roc_auc}")
        print(f"PRC-AUC: {prc_auc}")
        print(f"Accuracy: {accuracy}")
        print(f"F1 Score: {f1}")
        print(f"MCC: {mcc}")
        print(f"Recall: {recall_val}")
        print(f"Precision: {precision_val}")
        print(f"False Positives: {fp}, False Positive Rate: {fp_rate}")

        results.append({
            'Model': model_name,
            'Best Params': clf.best_params_,
            'ROC-AUC': roc_auc,
            'PRC-AUC': prc_auc,
            'Accuracy': accuracy,
            'F1 Score': f1,
            'MCC': mcc,
            'Recall': recall_val,
            'Precision': precision_val,
            'False Positives': fp,
            'False Positive Rate': fp_rate
        })

results_df = pd.DataFrame(results)
results_df.to_csv('model_evaluation_results_final128.csv', index=False)
print("\n所有模型训练和评估完成。结果已保存到 model_evaluation_results_final128.csv。")




[训练集] 原始正样本数=422, 负样本数=8595
[训练集] 欠采样后负样本数=844, 总训练样本数=1266
[测试集] 原始正样本=112, 负样本=5264
[测试集] 筛选后(最近邻=正)的正样本数 = 16
[测试集] 筛选后负样本数 = 32 (簇数=32)
[测试集] 最终用于验证的样本数 = 48

>>> Training model: LogisticRegression
Best Params: {'C': 0.01}
ROC-AUC: 0.826171875
PRC-AUC: 0.7974591953277466
Accuracy: 0.8333333333333334
F1 Score: 0.6923076923076923
MCC: 0.61665481259351
Recall: 0.5625
Precision: 0.9
False Positives: 1, False Positive Rate: 0.03125

>>> Training model: SVM
Best Params: {'C': 1, 'kernel': 'rbf'}
ROC-AUC: 0.857421875
PRC-AUC: 0.8678075485024499
Accuracy: 0.8125
F1 Score: 0.6086956521739131
MCC: 0.5843487097907776
Recall: 0.4375
Precision: 1.0
False Positives: 0, False Positive Rate: 0.0

>>> Training model: RandomForest
Best Params: {'max_depth': None, 'n_estimators': 100}
ROC-AUC: 0.8212890625
PRC-AUC: 0.8127170396701646
Accuracy: 0.8541666666666666
F1 Score: 0.7407407407407407
MCC: 0.6659496553711425
Recall: 0.625
Precision: 0.9090909090909091
False Positives: 1, False Positive Rate: 0.

In [6]:
num_clusters_test_neg

224

$$\tau = \alpha\times\frac{a}{b}$$