# 一、数据处理

本小节主要对原始数据进行预处理，包括数据清洗、数据格式化、特征抽取等，从而获得用于训练的输入数据

In [2]:
import time
import os
import pandas as pd
import math
from featured_data_generated import cal_pep_des_multiprocess

import warnings
warnings.simplefilter("ignore", (UserWarning, FutureWarning))

In [3]:
import time

def timer(func):
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        print(f"{func.__name__} 执行时间: {end_time - start_time:.4f} 秒")
        return result
    return wrapper

In [4]:
class GenerateSample():
    """用于生成蛋白质或肽的分类与回归样本。

    该类从给定的正样本和负样本数据集中筛选、处理数据，并生成用于机器学习的分类和回归数据集。

    Attributes:
        gramapa_path (str): 包含正样本数据的文件路径。
        negative_file_path (str): 包含负样本数据的文件路径。
        generate_example_path (str): 生成样本数据的输出路径。
        mode (str): 数据生成模式，'all' 表示包含所有样本，其他值仅生成正样本。
    """

    def __init__(self, gramapa_path, negative_file_path, generate_example_path, mode):
        """初始化 GenerateSample 类。

        Args:
            grampa_path (str): 正样本数据文件路径。
            negative_file_path (str): 负样本数据文件路径。
            generate_example_path (str): 输出数据存储路径。
            mode (str): 数据生成模式，'all' 表示包含所有样本，其他值表示仅处理正样本。
        """
        self.gramapa_path = gramapa_path
        self.negative_file_path = negative_file_path
        self.generate_example_path = generate_example_path
        self.mode = mode

    @timer
    def __call__(self, *args, **kwargs):
        """调用该类实例，执行样本生成流程。

        处理步骤包括：
        1. 读取并筛选正样本数据
        2. 生成所有肽序列的描述符
        3. 读取负样本数据并进行处理
        4. 合并正负样本数据
        5. 生成分类和回归样本数据，并保存到文件
        """
        
        positive_sample = self.generate_positive_peptides(self.gramapa_path)
        negtive_sample = self.generate_negative_data(self.negative_file_path)
        all_sample = self.concat_datasets(positive_sample, negtive_sample)
        
        print("generating xgboost sample.....")
        num = len(all_sample)
        start = time.time()
        sequence = all_sample["sequence"]
        peptides = sequence.values.copy().tolist()
        result = all_sample["MIC"]
        type = all_sample["type"]
        xgb_sample_df = cal_pep_des_multiprocess.cal_pep_parallel(peptides, sequence, result, type)
        self.split_sample(xgb_sample_df, "xgb")
        end = time.time()
        print("generate xgboost feature data cost time:", (end - start) / num)

        print("generating lstm sample.....")
        if self.mode == "all":
            self.split_sample(all_sample, "lstm")
        else:
            self.split_sample(positive_sample, "lstm")
        print("lstm sample is ok")

    def filter_data_with_str(self, col_name, str, data):
        """筛选包含特定字符串的行。

        Args:
            col_name (str): 要筛选的列名。
            str_value (str): 要匹配的字符串。
            data (pd.DataFrame): 输入数据。

        Returns:
            pd.DataFrame: 过滤后的数据集。
        """
        bool_filter = data[col_name].str.contains(str)
        filter_data = data[bool_filter]
        return filter_data

    def positive_data_filter(self, data):
        """筛选符合标准的正样本数据。

        过滤规则：
        1. 仅保留 C 端酰胺化（C-terminal amidation）的序列
        2. 仅保留序列长度在 5 到 50 之间的样本

        Args:
            data (pd.DataFrame): 输入正样本数据。

        Returns:
            pd.DataFrame: 过滤后的正样本数据。
        """
        data = data[data["has_cterminal_amidation"] == True]
        data = data[data["sequence"].apply(lambda x: 5 < len(x) <= 50)]
        return data

    def generate_positive_peptides(self, positive_file_path):
        """计算所有肽序列的 MIC 值，并标记为正样本。
        
        该方法的核心流程如下：
        1. 获取 `sequence` 列中所有唯一的肽序列。
        2. 对于每个唯一肽序列：
            - 计算其 MIC 值的 10 次方之和（用于后续计算几何平均值）。
            - 计算该肽序列在数据集中出现的次数（用于取平均）。
            - 计算其平均 MIC 值，并标记该肽序列为正样本（type = 1）。
        3. 组织数据并返回包含以下三列的 DataFrame：
            - `sequence`: 肽序列
            - `MIC`: 计算得到的平均 MIC 值
            - `type`: 样本类型（1 代表正样本）

        Args:
            data (pd.DataFrame): 经过筛选的正样本数据。

        Returns:
            pd.DataFrame: 包含肽序列、平均 MIC 值和样本类型的 DataFrame。
        """

        data = pd.read_csv(positive_file_path, encoding="utf8")
        data = self.filter_data_with_str("bacterium", "aureus", data)
        data = self.positive_data_filter(data)
        
        data_all = [[], [], []]
        for i in data["sequence"].unique():
            data_all[0].append(i)
            log_num = 0
            count = 0
            for i in data[data["sequence"] == i]["value"]:
                log_num += math.pow(10, i)  # 计算 10^MIC 值的总和
                count += 1
            avg_MIC = float(log_num / count)
            data_all[1].append(avg_MIC)  # 计算几何平均 MIC 值
            data_all[2].append(1)  # 该肽序列属于正样本，标记为 1

        data_all = list(map(list, zip(*data_all)))
        data = pd.DataFrame(data=data_all, columns=["sequence", "MIC", "type"])
        return data

    def generate_negative_data(self, negative_file_path):
        """读取并处理负样本数据。

        过滤规则：
        1. 移除包含 B、X、Z、O、U 的序列
        2. 仅保留长度小于 50 的序列

        Args:
            negative_file_path (str): 负样本数据文件路径。

        Returns:
            pd.DataFrame: 处理后的负样本数据。
        """
        data_negative = pd.read_csv(negative_file_path, encoding="utf8")
        data_negative = data_negative[~data_negative["Sequence"].str.contains("B|X|Z|O|U")]
        data_negative = data_negative[data_negative["Sequence"].apply(lambda x: 5 < len(x) <= 50)]
        data_negative.reset_index(drop=True, inplace=True)
        data = pd.DataFrame(columns=["sequence", "MIC", "type"])
        for i in range(data_negative.shape[0]):
            # data = data.append({"sequence": data_negative["Sequence"][i], "MIC": 8196, "type": 0}, ignore_index=True)
            data = pd.concat([data, pd.DataFrame([{"sequence": data_negative["Sequence"][i], "MIC": 8196, "type": 0}])], ignore_index=True)
        data = data.drop_duplicates()
        return data
    
    def data2csv(self, data, file_name):
        """将 DataFrame 保存为 CSV 文件。

        Args:
            data (pd.DataFrame): 需要保存的数据。
            file_name (str): 输出 CSV 文件路径。
        """
        data.to_csv(file_name, encoding="utf8", index=False)
    
    def concat_datasets(self, positive_file, negative_file):
        """合并正样本和负样本数据，并随机打乱顺序。

        Args:
            positive_file (pd.DataFrame): 处理后的正样本数据。
            negative_file (pd.DataFrame): 处理后的负样本数据。

        Returns:
            pd.DataFrame: 合并后的数据集。
        """
        data_concat = pd.concat([positive_file, negative_file], ignore_index=True, axis=0)  # 默认纵向合并0 横向合并1
        data_concat = data_concat.sample(frac=1, random_state=None)
        data_concat.reset_index(drop=True, inplace=True)
        return data_concat

    def split_sample(self, sample, tag, train_ratio=0.8):
        """将数据集拆分为训练集和测试集，并保存为 CSV 文件。

        训练集占 80%，测试集占 20%。

        Args:
            sample (pd.DataFrame): 需要拆分的数据集。
        """
        num = len(sample)
        train_sample = sample[:int(train_ratio * num)]
        test_sample = sample[int(train_ratio * num):]
        self.data2csv(train_sample, os.path.join(self.generate_example_path, f"{tag}_train_sample.csv"))
        self.data2csv(test_sample, os.path.join(self.generate_example_path, f"{tag}_test_sample.csv"))

In [5]:
import os

def ensure_folder_exists(folder_path):
    """
    确保文件夹存在。如果文件夹不存在，则创建它。

    Args:
        folder_path (str): 要创建的文件夹路径。

    Returns:
        None
    """
    if not os.path.exists(folder_path):
        os.makedirs(folder_path)  # 创建文件夹
        print(f"已创建新文件夹: {folder_path}")
    else:
        print(f"文件夹已存在: {folder_path}")

ensure_folder_exists("data")

已创建新文件夹: data


In [6]:
gramapa_path = "downloads/199116/grampa.csv"
negative_file_path = "downloads/199116/origin_negative.csv"
generate_example_path = "data"
mode = "all"
generate_sample = GenerateSample(gramapa_path, negative_file_path, generate_example_path, mode)
generate_sample()

# 二、模型训练

## 2.1 XGBoost 分类器

generating xgboost sample.....
开始计算，共有 7769 条序列，使用 4 个进程
已处理 1000/7769 条序列 | 进程 ID: 241 | 进程统计: {241: 269, 243: 234, 242: 247, 244: 250}
已处理 2000/7769 条序列 | 进程 ID: 244 | 进程统计: {241: 515, 243: 487, 242: 491, 244: 507}
已处理 3000/7769 条序列 | 进程 ID: 242 | 进程统计: {241: 771, 243: 725, 242: 746, 244: 758}
已处理 4000/7769 条序列 | 进程 ID: 242 | 进程统计: {241: 1034, 243: 973, 242: 1000, 244: 993}
已处理 5000/7769 条序列 | 进程 ID: 241 | 进程统计: {241: 1283, 243: 1218, 242: 1256, 244: 1243}
已处理 6000/7769 条序列 | 进程 ID: 244 | 进程统计: {241: 1517, 243: 1478, 242: 1498, 244: 1507}
已处理 7000/7769 条序列 | 进程 ID: 243 | 进程统计: {241: 1761, 243: 1743, 242: 1745, 244: 1751}
所有序列处理完成！共 7769 条
进程工作统计: {241: 1944, 243: 1923, 242: 1947, 244: 1955}
generate xgboost feature data cost time: 0.05625973181737438
generating lstm sample.....
lstm sample is ok
__call__ 执行时间: 440.5697 秒


## 2.1 XGBoost 分类器

In [7]:
# Pip 21.3+ is required
! pip --version
! pip install --upgrade pip

pip 23.0.1 from /usr/local/lib/python3.10/site-packages/pip (python 3.10)
Looking in indexes: https://mirrors.aliyun.com/pypi/simple
Collecting pip
  Downloading https://mirrors.aliyun.com/pypi/packages/c9/bc/b7db44f5f39f9d0494071bddae6880eb645970366d0a200022a1a93d57f5/pip-25.0.1-py3-none-any.whl (1.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m1.3 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hInstalling collected packages: pip
  Attempting uninstall: pip
    Found existing installation: pip 23.0.1
    Uninstalling pip-23.0.1:
      Successfully uninstalled pip-23.0.1
Successfully installed pip-25.0.1
[0m

In [8]:
! pip install xgboost==3.0.0

Looking in indexes: https://mirrors.aliyun.com/pypi/simple
Collecting xgboost==3.0.0
  Downloading https://mirrors.aliyun.com/pypi/packages/63/f1/653afe1a1b7e1d03f26fd4bd30f3eebcfac2d8982e1a85b6be3355dcae25/xgboost-3.0.0-py3-none-manylinux_2_28_x86_64.whl (253.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m253.9/253.9 MB[0m [31m1.1 MB/s[0m eta [36m0:00:00[0m00:01[0m00:06[0m
    torch (>=1.9.*)
           ~~~~~~^[0m[33m
[0mInstalling collected packages: xgboost
Successfully installed xgboost-3.0.0
[0m

In [9]:
from utils import get_train_xgb_classifier_data
import xgboost
from xgboost.callback import EarlyStopping
from sklearn import metrics
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix, classification_report
from collections import Counter
import joblib
import numpy as np

In [10]:
@timer
def train_xgboost_classifier(train_data_path, test_data_path):
    # 加载数据
    x_train, x_test, y_train, y_test = get_train_xgb_classifier_data(train_data_path, test_data_path)
    
    # 查看类别分布
    train_counter = Counter(y_train)
    test_counter = Counter(y_test)
    
    # 计算类别比例
    train_total = sum(train_counter.values())
    test_total = sum(test_counter.values())

    train_ratio = {k: v / train_total for k, v in train_counter.items()}
    test_ratio = {k: v / test_total for k, v in test_counter.items()}

    # 计算 scale_pos_weight
    scale_pos_weight = train_counter[0] / max(1, train_counter[1])

    # 打印信息
    print("==== 类别分布 ====")
    print(f"训练集类别分布: {train_counter}")
    print(f"测试集类别分布: {test_counter}\n")

    print("==== 类别比例 ====")
    print(f"训练集类别比例: {train_ratio}")
    print(f"测试集类别比例: {test_ratio}\n")

    print("==== 类别不平衡分析 ====")
    if train_ratio[1] < 0.2:
        print(f"训练集类别 1 仅占 {train_ratio[1]:.2%}，数据存在类别不平衡问题！")
        print(f"建议设置 XGBoost 的 `scale_pos_weight={scale_pos_weight:.2f}`\n")
    else:
        print("训练集类别相对平衡。\n")

    print("==== 数据拆分合理性检查 ====")
    if abs(train_ratio[1] - test_ratio[1]) < 0.02:  # 差异小于 2%
        print("训练集和测试集类别比例接近，拆分合理。\n")
    else:
        print(f"测试集类别 1 占比 {test_ratio[1]:.2%}，与训练集 {train_ratio[1]:.2%} 差异较大，可能需要重新拆分！\n")
    
    
    # 参数网格
    param_grid = {
        'learning_rate': [0.1, 0.3, 0.5, 0.7],
        'max_depth': [4, 6, 8, 10],
        'n_estimators': [300, 600, 900, 1200]
    }
    
    xgb_model = xgboost.XGBClassifier(
        use_label_encoder=False,
        objective="binary:logistic",
        tree_method="hist",
        device="cuda",
        scale_pos_weight=scale_pos_weight,
        eval_metric='auc',
        verbosity=0
    )
    
    grid_search = GridSearchCV(
        estimator=xgb_model,
        param_grid=param_grid,
        scoring='f1',
        cv=3,
        verbose=2,
        n_jobs=-1
    )
    
    # 不传递额外参数给fit方法
    grid_search.fit(x_train, y_train)
    
    # 获取最佳参数
    best_parameters = grid_search.best_params_
    print("最佳参数:", best_parameters)
    print("最佳F1分数:", grid_search.best_score_)
    
    
    # 使用早停 - 注意eval_set是合法的fit参数
    # 创建验证集（用于早停）
    x_train_split, x_valid, y_train_split, y_valid = train_test_split(
        x_train, y_train, test_size=0.2, stratify=y_train, random_state=42
    )
    early_stop = EarlyStopping(rounds=50,
                               metric_name='auc',
                               data_name='validation_0')
    
    # 使用最佳参数训练最终模型
    final_model = xgboost.XGBClassifier(
        max_depth=best_parameters["max_depth"],
        n_estimators=best_parameters["n_estimators"],
        learning_rate=best_parameters["learning_rate"],
        use_label_encoder=False,
        objective="binary:logistic",
        tree_method="hist",
        device="cuda",
        scale_pos_weight=scale_pos_weight,
        eval_metric='auc',
        callbacks=[early_stop],
        verbosity=0
    )
    
    # 在 fit 方法中使用回调
    final_model.fit(
        x_train_split, y_train_split,
        eval_set=[(x_valid, y_valid)],
        verbose=True
    )
    
    # 保存模型
    joblib.dump(final_model, "xgb_classifier_model.pkl")
    
    # 评估最终模型
    y_pred = final_model.predict(x_test)
    
    print("混淆矩阵:")
    print(confusion_matrix(y_test, y_pred))
    print("\n分类报告:")
    print(classification_report(y_test, y_pred))
    print(f"准确率: {accuracy_score(y_test, y_pred):.4f}")
    print(f"F1分数: {f1_score(y_test, y_pred):.4f}")

In [11]:
train_xgboost_classifier(
    "data/xgb_train_sample.csv",
    "data/xgb_test_sample.csv"
)

==== 类别分布 ====
训练集类别分布: Counter({0: 4687, 1: 1528})
测试集类别分布: Counter({0: 1170, 1: 384})

==== 类别比例 ====
训练集类别比例: {0: 0.7541432019308125, 1: 0.24585679806918745}
测试集类别比例: {0: 0.752895752895753, 1: 0.2471042471042471}

==== 类别不平衡分析 ====
训练集类别相对平衡。

==== 数据拆分合理性检查 ====
训练集和测试集类别比例接近，拆分合理。

Fitting 3 folds for each of 64 candidates, totalling 192 fits
最佳参数: {'learning_rate': 0.1, 'max_depth': 4, 'n_estimators': 900}
最佳F1分数: 0.9393589585990361
[0]	validation_0-auc:0.93101
[1]	validation_0-auc:0.93594
[2]	validation_0-auc:0.95650
[3]	validation_0-auc:0.96077
[4]	validation_0-auc:0.96073
[5]	validation_0-auc:0.96317
[6]	validation_0-auc:0.96437
[7]	validation_0-auc:0.96894
[8]	validation_0-auc:0.96914
[9]	validation_0-auc:0.97505
[10]	validation_0-auc:0.97561
[11]	validation_0-auc:0.97583
[12]	validation_0-auc:0.97722
[13]	validation_0-auc:0.97815
[14]	validation_0-auc:0.97900
[15]	validation_0-auc:0.97992
[16]	validation_0-auc:0.97993
[17]	validation_0-auc:0.98095
[18]	validation_0-auc:0.981

## 2.2 XGBoost 排序器

In [12]:
from utils import get_train_xgb_rank_featured_data
from functools import partial

@timer
def train_xgboost_rank(train_data_path, test_data_path):
    """训练一个 XGBoost 排序模型（Learning to Rank），用于 MIC 值预测，并基于排名计算 Top-k 准确率。

    Args:
        train_data_path (str): 训练数据集的文件路径。
        test_data_path (str): 测试数据集的文件路径。

    Returns:
        None
    """

    x_train, x_test, y_train, y_test, test_data = get_train_xgb_rank_featured_data(train_data_path, test_data_path)

    xgb_model = xgboost.XGBRegressor(
        max_depth=3,
        n_estimators=200,
        learning_rate=0.2,
        use_label_encoder=False,
        objective="rank:pairwise",
        tree_method="hist",
        device="cuda",
        eval_metric="auc",
        verbosity=0
    )
    
    xgb_model.fit(x_train, y_train)
    
    # 评估最终模型
    y_pred = xgb_model.predict(x_test)
    
    df = pd.DataFrame(test_data["sequence"].copy(), columns=["sequence"])
    df["MIC"] = y_pred
    df.sort_values("MIC", inplace=True)
    for k in [50, 100, 500]:
        true_sequecne_topk = set(test_data.iloc[:k, ]["sequence"])
        pred_sequence_topk = set(df.iloc[:k, ]["sequence"])
        
        score = len(true_sequecne_topk & pred_sequence_topk) / k
        print(f"xgb_rank top{k} hit rate:", score)
    
    # 保存模型
    joblib.dump(xgb_model, "xgb_rank_model.pkl")

In [13]:
train_xgboost_rank(
    "data/xgb_train_sample.csv",
    "data/xgb_test_sample.csv"
)

xgb_rank top50 hit rate: 0.16
xgb_rank top100 hit rate: 0.23
xgb_rank top500 hit rate: 0.692
train_xgboost_rank 执行时间: 3.5369 秒
[CV] END ...learning_rate=0.1, max_depth=4, n_estimators=300; total time=   4.5s
[CV] END ...learning_rate=0.1, max_depth=4, n_estimators=600; total time=  14.2s
[CV] END ...learning_rate=0.1, max_depth=4, n_estimators=600; total time=  10.5s
[CV] END ...learning_rate=0.1, max_depth=4, n_estimators=900; total time=  17.1s
[CV] END ..learning_rate=0.1, max_depth=4, n_estimators=1200; total time=  24.5s
[CV] END ..learning_rate=0.1, max_depth=4, n_estimators=1200; total time=  19.8s
[CV] END ...learning_rate=0.1, max_depth=6, n_estimators=300; total time=  10.8s
[CV] END ...learning_rate=0.1, max_depth=6, n_estimators=600; total time=  15.3s
[CV] END ...learning_rate=0.1, max_depth=6, n_estimators=900; total time=  23.6s
[CV] END ...learning_rate=0.1, max_depth=6, n_estimators=900; total time=  19.9s
[CV] END ..learning_rate=0.1, max_depth=6, n_estimators=1200; t

## 2.3 LSTM 回归器

In [14]:
from torch import nn
import torch
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence


class LstmNet(nn.Module):
    def __init__(self, embedding_dim, hidden_num, num_layer, bidirectional, dropout, Letter_dict):
        """LSTM 网络模型，用于处理序列数据。

        Args:
            embedding_dim (int): 词嵌入的维度。
            hidden_num (int): LSTM 隐藏层单元数。
            num_layer (int): LSTM 层数。
            bidirectional (bool): 是否使用双向 LSTM。
            dropout (float): Dropout 比例。
            Letter_dict (dict): 字符到索引的映射字典。
        """
        super(LstmNet, self).__init__()
        # 定义 LSTM 层
        self.lstm = torch.nn.LSTM(
            embedding_dim,                  # 词嵌入维度
            hidden_num,                     # 隐藏层单元数
            num_layer,                      # LSTM 层数
            bidirectional=bidirectional,    # 是否为双向 LSTM
            batch_first=True,               # 设定 batch 维度优先
            dropout=dropout)                # Dropout 率
        # 定义线性层（全连接层）用于输出
        self.linear = nn.Sequential(
            nn.Linear(hidden_num * (2 if bidirectional == True else 1), 64),    # 连接隐藏层输出到 64 维
            nn.ReLU(inplace=True),                                              # ReLU 激活函数
            nn.Linear(64, 1))                                                   # 最终输出层
        # 定义词嵌入层
        self.embedding = torch.nn.Embedding(
            num_embeddings=len(Letter_dict) + 1,    # 词汇表大小
            embedding_dim=embedding_dim,            # 词嵌入维度
            padding_idx=0)                          # 设置填充索引

    def forward(self, x, length):
        """前向传播逻辑

        Args:
            x (Tensor): 输入的序列数据 (batch_size, seq_len)。
            length (Tensor): 每个序列的真实长度 (batch_size)。

        Returns:
            Tensor: 模型输出，形状为 (batch_size, 1)。
        """
        # 通过嵌入层，将索引转换为词向量
        x = self.embedding(x.long())
        # 使用 pack_padded_sequence 压缩填充序列，提高 LSTM 计算效率
        x = pack_padded_sequence(input=x, lengths=length, batch_first=True, enforce_sorted=False)
        # 通过 LSTM 层
        output, (h_s, h_c) = self.lstm(x)
        # 将压缩后的序列解压回原始序列形式
        output = pad_packed_sequence(output, batch_first=True)[0]
        # 计算所有时间步的均值，并传入全连接层进行预测
        out = self.linear(output.mean(dim=1))
        return out

In [18]:
from utils import df2list, get_dict, get_reverse_dict, evaluate_customized
from dataset import PeptideDataset
from torch.utils.data import DataLoader
from tqdm import tqdm
import numpy as np
import pandas as pd
from sklearn.metrics import mean_squared_error, r2_score


class LstmTrain():
    def __init__(self, train_file, test_file):
        self.net = LstmNet(embedding_dim, hidden_num, num_layer, bidirectional, dropout, get_dict())
        self.net.to(device)
        # 读取训练数据，数据共有3列，分别是[sequence, MIC, type]
        train_data = pd.read_csv(train_file, encoding="utf-8").reset_index(drop=True)
        train_data = df2list(train_data, "sequence", "MIC", "type", ngram_num, log_num=10)  # 将训练数据格式化
        # 构建数据集，一个数据单元由4部分构成，第一是序列的索引表示，定长50，不足补零；第二是log10MIC；第三是type值，0or1；第四是序列长度
        train_dataset = PeptideDataset(train_data)
        self.train_dataset = train_dataset
        self.train_dataloader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=False)

        # 数据加载，决定输入网络的数据形式，每次输入batch_size个数据
        # 一个batch_size的数据内容为：[tensor([[seq], [seq], ...]),
        #                           tensor([[log10MIC], [log10MIC], ...]),
        #                           tensor([type, type, ...]),
        #                           tensor([len, len, ...])]
        # 一个batch_size的数据维度为：[torch.Size([batch_size, 50]),
        #                           torch.Size([batch_size, 1]),
        #                           torch.Size([batch_size]),
        #                           torch.Size([batch_size])]

        test_data = pd.read_csv(test_file, encoding="utf-8").reset_index(drop=True)
        test_data = df2list(test_data, "sequence", "MIC", "type", ngram_num, log_num=10)
        test_dataset = PeptideDataset(test_data)
        self.test_dataloader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

        self.opt = torch.optim.AdamW(self.net.parameters(), lr=learning_rate)  # 优化器
        self.loss_fn = nn.MSELoss()  # 损失函数

    def __call__(self):
        # 初始化最佳模型指标
        loss_best_all = 10000  # 记录所有数据的最佳 MSE
        loss_best_pos = 10000  # 记录阳性数据的最佳 MSE
        r2_best_all = -5  # 记录所有数据的最佳 R²
        r2_best_pos = -5  # 记录阳性数据的最佳 R²
        top20mse_best = 99  # 记录 Top 20 MSE 最优值
        top60mse_best = 99  # 记录 Top 60 MSE 最优值

        # 训练与测试结果存储
        test_loss_all = []
        test_loss_pos = []
        test_r2_all = []
        test_r2_pos = []
        top_20_mse = []
        top_60_mse = []

        best_model_path_all = "all_lstm_regress_model.pth"  # 记录所有数据的最佳模型路径
        best_model_path_pos = "pos_lstm_regress_model.pth"  # 记录阳性数据的最佳模型路径

        for epoch in range(epoch_num):
            # 每 de_epo 轮，降低学习率
            if epoch > 0 and epoch % de_epo == 0:
                for param_group in self.opt.param_groups:
                    param_group['lr'] *= de_r
            
            loss_all = 0   # 记录当前轮次的总损失
            count = 0  # 记录 batch 数量
            self.net.train()
            
            # 训练循环
            for idx, batch in enumerate(tqdm(self.train_dataloader)):
                text = batch[0].to(device)  # 序列输入 torch.Size([batch_size, 50])
                label = batch[1].to(device)  # 真实 MIC 标签 torch.Size([batch_size, 1])
                length = batch[3]  # 序列长度 torch.Size([batch_size])
                
                out = self.net(text, length)  # 模型向前传播
                loss = self.loss_fn(out, label)  # 计算损失

                self.opt.zero_grad()
                loss.backward(retain_graph=True)  # 反向传播
                self.opt.step()  # 更新参数

                loss_all += loss
                count += 1

                if count % 100 == 0:
                    print("\r Epoch:%d,Loss:%f" % (epoch, loss_all / count))

            # 进入评估模式
            self.net.eval()
            predict, label_o, sequence = [], [], []
            
            # 测试循环
            for idx, batch in enumerate(self.test_dataloader):
                text = torch.LongTensor(batch[0])
                label = batch[1]
                length = batch[3]
                text = text.to(device)
                label = label.to(device)
                out = self.net(text, length)

                predict.extend(out.cpu().detach().numpy().reshape(1, -1)[0])
                label_o.extend(label.cpu().detach().numpy())
                for i in text.cpu().numpy():
                    temp = []
                    for j in i[i > 0]:
                        temp.append(get_reverse_dict()[j])
                    sequence.append("".join(temp))
            df = pd.DataFrame({"sequence": sequence, "label": label_o, "predict": predict})

            # 仅计算阳性数据
            pred, label = zip(*[(predict[i], label_o[i]) for i in range(len(label_o)) if label_o[i] <= np.log10(8196) - 0.01])
            
            # 计算 MSE 和 R² 评分
            mse_result_all = mean_squared_error(label_o, predict)
            mse_result_pos = mean_squared_error(label, pred)
            r2_result_all = r2_score(label_o, predict)
            r2_result_pos = r2_score(label, pred)

            # 记录测试结果
            test_loss_all.append(mse_result_all)
            test_loss_pos.append(mse_result_pos)
            test_r2_all.append(r2_result_all)
            test_r2_pos.append(r2_result_pos)
            
            # 计算 Top 20 和 Top 60 MSE
            top20_mse, top60_mse, _, _ = evaluate_customized(self.net, label, pred)
            top_20_mse.append(top20_mse)
            top_60_mse.append(top60_mse)

            # 保存最佳模型
            if mse_result_all <= loss_best_all:
                loss_best_all = mse_result_all
                torch.save(self.net.state_dict(), best_model_path_all)
            if mse_result_pos <= loss_best_pos:
                loss_best_pos = mse_result_pos
                torch.save(self.net.state_dict(), best_model_path_pos)
            if r2_result_all >= r2_best_all:
                r2_best_all = r2_result_all
            if r2_result_pos >= r2_best_pos:
                r2_best_pos = r2_result_pos
            if top20_mse <= top20mse_best:
                top20mse_best = top20_mse
            if top60_mse <= top60mse_best:
                top60mse_best = top60_mse
            
            # 打印当前 epoch 结果
            print("Top 20 MSE: ", top20_mse)
            print("Top 60 MSE: ", top60_mse)
            print("MSE_loss_all = %f" % (mse_result_all))
            print("R2_score_all = %f" % (r2_result_all))
            print("MSE_loss_pos = %f" % (mse_result_pos))
            print("R2_score_pos = %f" % (r2_result_pos))
            print(
                "\r Epoch: %d Best MSE Error all %f ; Best MSE Error pos: %f ; Best R2 Error all: %f ; Best R2 Error pos: %f ; Best Top 20: %f ;Best Top 60: %f" \
                % (epoch, loss_best_all, loss_best_pos, r2_best_all, r2_best_pos, top20mse_best, top60mse_best))

In [19]:
embedding_dim = 50
hidden_num = 128
num_layer = 2
bidirectional = False
dropout = 0.7
ngram_num = 1
log_num = 10
batch_size = 16
learning_rate = 2 * 1e-3
epoch_num = 100
de_epo = 16
de_r = 0.5
seed = 42
mode = "lstm"

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"模型训练将在{device}进行")
lstmtrain = LstmTrain(
    "data/lstm_train_sample.csv",
    "data/lstm_test_sample.csv"
)

模型训练将在cuda:0进行


In [ ]:
import sys

# 备份原始 sys.stdout
original_stdout = sys.stdout

try:
    sys.stdout = open('train_log.txt', 'w')  # 重定向输出到文件
    lstmtrain()  # 运行训练
finally:
    sys.stdout.close()  # 关闭文件
    sys.stdout = original_stdout  # 恢复默认输出

100%|██████████| 389/389 [00:05<00:00, 74.68it/s] 
100%|██████████| 389/389 [00:04<00:00, 96.16it/s] 
100%|██████████| 389/389 [00:02<00:00, 138.90it/s]
100%|██████████| 389/389 [00:04<00:00, 96.45it/s] 
100%|██████████| 389/389 [00:04<00:00, 96.61it/s] 
100%|██████████| 389/389 [00:04<00:00, 96.45it/s] 
100%|██████████| 389/389 [00:02<00:00, 146.56it/s]
100%|██████████| 389/389 [00:03<00:00, 97.84it/s] 
100%|██████████| 389/389 [00:04<00:00, 96.03it/s]
100%|██████████| 389/389 [00:04<00:00, 95.31it/s] 
100%|██████████| 389/389 [00:02<00:00, 149.12it/s]
100%|██████████| 389/389 [00:03<00:00, 98.64it/s] 
100%|██████████| 389/389 [00:03<00:00, 97.45it/s] 
100%|██████████| 389/389 [00:04<00:00, 95.37it/s]
100%|██████████| 389/389 [00:02<00:00, 145.93it/s]
100%|██████████| 389/389 [00:04<00:00, 94.39it/s]
100%|██████████| 389/389 [00:04<00:00, 95.93it/s]
100%|██████████| 389/389 [00:03<00:00, 98.72it/s] 
100%|██████████| 389/389 [00:02<00:00, 140.13it/s]
 68%|██████▊   | 265/389 [00:02<00:

# 三、活性肽预测

## 3.1 生成用于预测的数据

In [ ]:
# 亲水性为1 疏水性为0
hydrophily = ["S", "T", "C", "Y", "N", "Q", "D", "E", "K", "R", "H"]  # 亲水性
hydrophobe = ["G", "A", "V", "L", "I", "M", "F", "W", "P"]  # 疏水性
# 氨基酸的电荷映射（e_charge）：正电荷为1，负电荷为-1，中性为0
e_charge = {"S": 0, "T": 0, "C": 0, "Y": 0, "N": 0, "Q": 0, "D": -1, "E": -1, "K": 1, "R": 1 \
    , "H": 1, "G": 0, "A": 0, "V": 0, "L": 0, "I": 0, "M": 0, "F": 0, "W": 0, "P": 0}
# 规则列表，每个字符串包含7位，代表7肽的亲水性/疏水性模式（0为疏水，1为亲水）
rules_list = ["0000011", "0000111",
              "0001111", "0011111",
              "1100000", "1110000",
              "1111000", "1111100",
              "1100100", "0010011",
              "1001001", "1001000",
              "1000100", "0100100",
              "0010010", "0010001",
              "0001001"]
# 存储符合每种规则的7肽序列的列表
result_list = [[] for x in rules_list]

@timer
def get_sequence(sequence_now, charge_now, rule, rule_count):
    """递归生成符合规则的7肽序列，并确保至少有一个正电荷。

    :param sequence_now: 当前已生成的肽序列
    :param charge_now: 当前序列的电荷总和
    :param rule: 7位的规则字符串，指示亲水性/疏水性模式
    :param rule_count: 当前递归到的氨基酸位置（0-6）
    """
    if rule_count == 7:  # 递归终止条件：已构建7个氨基酸
        if charge_now > 0:  # 只有带正电荷的序列才被保留
            result_list[rules_list.index(rule)].append(sequence_now)

    else:
        type = int(rule[rule_count])  # 读取当前位规则，0为疏水，1为亲水
        if type == 1:   # 选择亲水性氨基酸
            for pep in hydrophily:
                get_sequence(sequence_now + pep, charge_now + e_charge[pep], rule, rule_count + 1)
        else:  # 选择疏水性氨基酸
            for pep in hydrophobe:
                get_sequence(sequence_now + pep, charge_now + e_charge[pep], rule, rule_count + 1)

# 遍历所有规则，生成符合规则的7肽序列
for rule in rules_list:
    get_sequence("", 0, rule, 0)
    break
    
# 为7肽序列提取特征
file_count = 0
for peptides in result_list:
    cal_pep_des_multiprocess.cal_pep_parallel(peptides=peptides, sequence=pd.Series(peptides, name="sequence"),
                                              results=pd.Series(dtype=object), types=pd.Series(dtype=object),
                                              output_path="7_peptide_rule_%d.txt" % (file_count))
    print(f"已生成 7_peptide_rule_{file_count}.txt")
    file_count += 1
    break

In [ ]:
from dataset import PredictDataset

class Predict():
    def __init__(self,xgb_classifier_path, xgb_rank_path, rank_top_k, lstm_param_path, predict_data_file, result_save_path):
        self.xgb_classifier_path = xgb_classifier_path
        self.xgb_rank_path = xgb_rank_path
        self.rank_top_k = rank_top_k
        self.lstm_param_path = lstm_param_path
        self.predict_data_file = predict_data_file
        self.result_save_path = result_save_path
        if not os.path.exists(self.result_save_path):
            os.mkdir(self.result_save_path)
        
        
    def __call__(self, *args, **kwargs):
        # 1. XGBoost 分类模型预测
        pos_feature_data = self.xgb_classifier_predict()  # 利用 xgb 分类器预测获取阳性结果
        xgboost_classify = pos_feature_data["sequence"].values.tolist()
        with open(os.path.join(self.result_save_path, "xgb_classifier_pos_seq.txt"), "w") as f:
            for i in range(len(xgboost_classify)):
                f.write(xgboost_classify[i])
                f.write("\n")
            f.close()
        
        # 2. XGBoost 排序模型预测
        xgb_rank_model = joblib.load(self.xgb_rank_path)  # 获取 xgb 排序模型
        xgb_rank_result = xgb_rank_model.predict(pos_feature_data.iloc[:, 1:-2].values)  # 利用 xgb 排序模型预测 MIC 值
        dataframe = pd.DataFrame([])
        dataframe["sequence"] = pos_feature_data["sequence"].copy()
        dataframe["MIC"] = xgb_rank_result
        dataframe.sort_values("MIC", inplace=True)
        dataframe.reset_index(drop=True, inplace=True)
        self.sequence = dataframe["sequence"].values[0: int(self.rank_top_k)]
        dataframe.iloc[:int(self.rank_top_k), :].to_csv(os.path.join(self.result_save_path, f"top_{str(self.rank_top_k)}.csv"))
        
        # 3. LSTM 回归模型预测
        self.lstm_preidct()

    def xgb_classifier_predict(self):
        xgb_cls_model = joblib.load(self.xgb_classifier_path)
        predict_data = pd.read_csv(self.predict_data_file, chunksize=50000, encoding="utf-8", low_memory=False, index_col=0)
        df = pd.DataFrame([])
        for chunk in predict_data:
            y = xgb_cls_model.predict(chunk.iloc[:, 0:-2].values)
            mask = [bool(x) for x in y]  # 预测值y转为bool值，0为False，1为True
            data = chunk[mask]  # 筛选出阳性结果
            df = pd.concat([df, data])  # 合并所有阳性结果
            print(df.describe())  # 统计每列的平均值、标准差等数据
        df.reset_index(drop=True, inplace=True)  # 重置索引
        df.to_csv(os.path.join(self.result_save_path, "xgb_classifier_pos_data.csv"), index=False)
        return df 


    def lstm_preidct(self):
        net = LstmNet(embedding_dim, hidden_num, num_layer, bidirectional, dropout, get_dict())
        device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        net.to(device)
        net.eval()
        # 构建数据集，dataset中每个单元由2个元素的元组构成，第一个是对序列的one-hot编码，第二个是序列的长度。
        dataset = PredictDataset(self.sequence)
        dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=False)
        sequence = []
        predict = []
        for i, (text, length) in enumerate(dataloader):
            text = text.to(device)
            out = net(text, length).reshape(-1)
            predict.extend(out.cpu().detach().numpy())
            for i in text:
                temp = ''.join([get_reverse_dict()[n] for n in i.tolist() if n > 0])
                sequence.append(temp)
        df = pd.DataFrame({"sequence": sequence, "predict": predict})
        df.sort_values("predict", inplace=True)
        df.to_csv(os.path.join(self.lstm_result_save_path, "lstm_regression_result.csv"), index=False)

In [ ]:
predictor = Predict(xgb_classifier_path = "xgb_classifier_model.pkl",
                    xgb_rank_path = "xgb_rank_model.pkl",
                    rank_top_k = 500,
                    lstm_param_path = "pos_lstm_regress_model.pth",
                    predict_data_file = "7_peptide_rule_0.csv",
                    result_save_path = "results")