# 利用Evo2模型作为基模，添加全连接层与分类头，用于训练增强子活性预测模型

以下为代码

## 导入库

导入了训练增强子活性预测模型所需的主要库，包括PyTorch、pandas、tqdm等。  
尝试导入Evo2模型作为基模。如果未找到Evo2库，会提示错误并退出。这样为后续的数据处理、模型构建和训练做好了准备。


In [1]:
import os
import torch
from torch.utils.data import Dataset, DataLoader, random_split
import pandas as pd
from torch.optim import AdamW
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm 

# Import Evo2
try:
    from evo2 import Evo2
except ImportError:
    print("未找到Evo2库，请确保已正确安装。")
    exit()

KeyboardInterrupt: 

## 定义类：数据集Evo2RegressionDataset

创建了一个名为`Evo2RegressionDataset`的自定义Dataset类，用于处理增强子活性预测的数据。该类实现以下功能：

1. **初始化**：加载包含DNA序列和表达活性的CSV文件，并设置Evo2分词器
2. **数据验证**：确保CSV文件包含'sequence'和'expression'必要列
3. **数据获取**：将DNA序列转换为模型可用的token序列，对序列进行填充或截断，确保所有输入长度一致，最后结合expression数据项根据索引 idx 返回模型训练所需的单条样本，格式为{ 'input_ids': sequence数据项分词后的序列, 'target': expression数据项对应的浮点张量 }

这个类为后续模型训练提供了标准化的数据接口，使DNA序列能够被Evo2模型处理。

In [None]:
class Evo2RegressionDataset(Dataset):
    """
    为回归模型加载包含增强子DNA序列和活性的数据集。
    使用Evo2模型提供的分词器。
    """
    def __init__(self, csv_file, tokenizer, max_length=512):
        """
        参数:
            csv_file (str): 包含'sequence'和'expression'列的csv文件路径。
            tokenizer (object): 来自Evo2的分词器实例。
            max_length (int): 用于填充/截断的最大序列长度。
        """
        try:
            self.data = pd.read_csv(csv_file)
            # 确保CSV文件包含必要的列
            if 'sequence' not in self.data.columns or 'expression' not in self.data.columns:
                raise ValueError("CSV文件必须包含'sequence'和'expression'列。")
        except FileNotFoundError:
            print(f"错误：在{csv_file}路径未找到CSV文件")
            raise
        except Exception as e:
            print(f"读取CSV文件时出错：{e}")
            raise

        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.data)

    # 根据索引 idx 返回模型训练所需的单条样本，格式为{ 'input_ids': sequence数据项分词后的序列, 'target': expression数据项对应的浮点张量 }
    def __getitem__(self, idx):
        if idx >= len(self.data):
            raise IndexError("索引超出范围")

        # 获取指定索引idx的对应的sequence数据项，存储在seq中
        seq = self.data.iloc[idx]['sequence'] 
        # 确保sequence列元素是字符串类型
        if not isinstance(seq, str):
            seq = str(seq)

        # 获取对应的expression数据项，存储在target中
        target = float(self.data.iloc[idx]['expression'])

        # 使用Evo2的分词器对序列进行分词
        # 注意：Evo2使用charleveltokenizer，不需要像HuggingFace分词器那样的显式填充/截断参数
        # 它通常基于字符到整数的映射进行分词。我们需要手动进行填充/截断。
        token_ids = self.tokenizer.tokenize(seq)

        # 手动进行填充/截断
        if len(token_ids) > self.max_length:
            token_ids = token_ids[:self.max_length]
        else:
            padding_length = self.max_length - len(token_ids)
            #charleveltokenizer 的 pad token id = 1
            pad_token_id = getattr(self.tokenizer, 'pad_token_id', 1)
            token_ids.extend([pad_token_id] * padding_length)

        input_ids = torch.tensor(token_ids, dtype=torch.long)

        return {
            'input_ids': input_ids,
            'target': torch.tensor(target, dtype=torch.float)
        }


## 定义类：基于evo2+全连接+回归头的活性预测模型Evo2RegressionModel

创建了一个名为`Evo2ForRegression`的模型类，继承自PyTorch的nn.Module，用于增强子活性预测任务。

1. **基础架构**：利用预训练的Evo2模型作为特征提取器，并冻结其参数以保留已学习的DNA表示能力
2. **组件构成**：
    - 加载并冻结Evo2基础模型
    - 添加全连接层(FC)将Evo2输出映射到中间表示
    - 使用ReLU激活函数和Dropout层防止过拟合
    - 最后通过回归头预测增强子活性值

3. **前向传播流程**：
    - 从Evo2的第28层提取DNA序列嵌入
    - 对序列维度进行平均池化，获取整个序列的全局表示
    - 通过全连接层、激活函数和Dropout层处理
    - 最终输出单一的回归预测值


In [None]:
class Evo2ForRegression(nn.Module):
    """
    基于evo2+全连接+回归头的活性预测模型。
    """
    def __init__(self, model_name="evo2_7b", local_path='/root/autodl-tmp/evo2/models/evo2-7b/evo2_7b.pt', dropout_rate=0.1, intermediate_size=512):
        super().__init__()
        print(f"加载 Evo2 模型: {model_name}...")
        try:
            self.evo2_wrapper = Evo2(model_name=model_name, local_path=local_path)
        except Exception as e:
            print(f"加载 Evo2 模型 '{model_name}' 时出错: {e}")
            print("请确认模型名称是否正确，以及相关依赖是否已安装。")
            raise

        # 冻结Evo2基础模型参数
        print("正在冻结Evo2基础模型参数...")
        for param in self.evo2_wrapper.model.parameters():
            param.requires_grad = False

        # 从模型配置中获取隐藏层大小
        hidden_size = self.evo2_wrapper.model.config.hidden_size

        # 新的全连接层
        self.fc1 = nn.Linear(hidden_size, intermediate_size)
        self.activation = nn.ReLU()
        self.dropout = nn.Dropout(dropout_rate)
        # 回归头：从intermediate_size到1的线性层（预测活性/expression这一标量值）
        self.regressor = nn.Linear(intermediate_size, 1)

        # 将新层转换为bfloat16以匹配预期的输入数据类型
        self.fc1.to(torch.bfloat16)
        self.regressor.to(torch.bfloat16)
        print(f"Evo2ForRegression初始化完成，隐藏层大小={hidden_size}, 全连接与回归头中间层大小={intermediate_size}。")

    def forward(self, input_ids):
        """
        前向传播（通过Evo2基础模型，全连接层和回归头）。

        参数：
            input_ids (torch.Tensor): 标记ID张量 (batch_size, sequence_length)。

        返回：
            torch.Tensor: 回归预测值 (batch_size,1)。如batch_size=8,返回形如[34.5, 25.125, 26.125, 73.0, 0.88671875, 44.75, 28.75, -27.125]是一次处理8个样本的预测结果。
        """
       
        # 从指定的层提取DNA增强子序列的embeddings。
        layer_to_embed = "blocks.28.mlp.l3"

         # 让输入参数通过Evo2基础模型
        _, embeddings = self.evo2_wrapper(input_ids, return_embeddings=True, layer_names=[layer_to_embed]) #self.evo2_wrapper是一个evo2实例，它接受左边的参数进行前向传播，返回两个参数：第一个是模型的输出，第二个是指定层（这里为layer_to_embed层）的嵌入（embeddings）。
        last_layer_output = embeddings[layer_to_embed] #last_layer_output是一个三维pytorch张量，形状为[batch_size, sequence_length, hidden_size]。
        #本程序中batch_size=和sequence_length在main函数指定, hidden_size=4096。所以last_layer_output是一个1 x sequence_length x 4096的三维数组/矩阵，数组的[0][a-1][i-1]元素代表输入序列中第i个碱基对应的4096维嵌入向量中第i维的嵌入数值。


        # 在sequence_length维度上池化隐藏状态
        # 使用平均池化：在序列长度上平均嵌入（平均池化的意思：一共sequence_length个位置，每个位置都有一个4096维嵌入向量，将所以这些位置的sequence_length个4096维向量做平均，得到最终的一个4096维向量，这个向量包含了整个序列的全局信息）
        # 形状：(batch_size, hidden_size=4096)
        pooled_output = last_layer_output.mean(dim=1)

        # 通过新的全连接层和激活函数
        x = self.fc1(pooled_output)
        x = self.activation(x)

        # 应用dropout和回归头
        x = self.dropout(x)
        prediction = self.regressor(x)

        # x形如[[34.5], [25.125], [26.125], [73.0], [0.88671875], [44.75], [28.75], [-27.125]]（batch_size=8），是一次处理8个样本的预测结果。

        # 压缩输出，去掉多余维度以获得形状(batch_size,1)
        #prediction形如[34.5, 25.125, 26.125, 73.0, 0.88671875, 44.75, 28.75, -27.125]是一次处理8个样本的预测结果。
        
        return prediction.squeeze(1)


## 定义训练与验证函数

1. **`train_epoch` 函数**：  
    - 用于训练模型一个周期。
    - 将模型设置为训练模式 (`model.train()`)，并通过 `tqdm` 显示训练进度。
    - 对每个批次数据进行前向传播、计算损失、反向传播和优化。
    - 累加所有批次的损失并除以数据集数据个数以计算整个数据集的平均损失。

2. **`evaluate` 函数**：  
    - 用于在验证集上评估模型性能。
    - 将模型设置为评估模式 (`model.eval()`)，并禁用梯度计算 (`torch.no_grad()`)，以提高评估效率。
    - 对每个批次数据进行前向传播和损失计算。
    - 累加所有批次的损失并除以验证集数据个数以计算整个验证集的平均损失。

这两个函数为模型的训练和验证过程提供了标准化的接口，确保训练过程的可控性和验证过程的准确性。

In [None]:
def train_epoch(model, data_loader, optimizer, criterion, device):
    """用于训练模型一个周期的函数"""
    model.train()  # 将模型设置为训练模式
    total_loss = 0
    # 用tqdm包装data_loader以显示进度条（可选）
    progress_bar = tqdm(data_loader, desc="训练中", leave=False)
    for batch in progress_bar:
        optimizer.zero_grad()
        input_ids = batch['input_ids'].to(device)
        #input_ids: 是一个张量 (Tensor)，代表了批处理 (batch) 中所有 DNA 序列经过分词和填充/截断后的结果。它的形状是 (batch_size, max_length)。在 main 函数配置中，batch_size 默认为 8，max_length 默认为 512。因此，input_ids 的具体形状是 (8, 512)。

        targets = batch['target'].to(device, dtype=torch.bfloat16)
        #targets: 是一个张量，代表了批处理中每个 DNA 序列对应的增强子活性值 (expression)。它的形状是 (batch_size)。在配置中，batch_size 为 8，所以 targets 的具体形状是 (8)。

        # 前向传播
        outputs = model(input_ids)

        # 计算损失
        loss = criterion(outputs, targets)

        # 反向传播和优化
        loss.backward()
        optimizer.step()

        total_loss += loss.item() * input_ids.size(0)

        #loss 通常是一个 PyTorch 张量（Tensor），它代表了模型在当前这一个批次（batch）数据上的平均损失。例如，如果你的批次大小是 32，那么 loss 就代表了这 32 个样本的平均损失值。它是一个只包含一个元素的零维张量，例如 tensor(0.1234)。
        #接下来是 .item() 方法。这是一个非常重要的方法。直接对 PyTorch 张量进行累加会保留其计算图（computation graph），这会在每个循环中不断消耗内存，最终可能导致内存溢出。.item() 方法的作用是从一个只包含单个值的张量中提取出其对应的标准 Python 数字（通常是浮点数），并且不保留任何计算图信息。这样，我们就可以安全地用它来进行累加计算。
        #input_ids.size(0) 用来获取当前批次的大小。数据通常以 (batch_size, ...) 的形式组织。因此，.size(0) 或 .shape[0] 会返回第一个维度的大小，也就是批次中的样本数量。
        #loss.item() * input_ids.size(0)。这个操作的目的是将平均损失转换回这个批次的总损失。因为 loss 是批次的平均损失，所以将它乘以批次中的样本数，就得到了这个批次所有样本的损失之和。这对于精确计算整个数据集的总损失至关重要，特别是当最后一个批次的大小可能小于常规批次大小时。
        #total_loss += ... 这部分就是将当前批次计算出的总损失累加到 total_loss 变量中。在遍历完所有数据批次后，total_loss 变量将包含整个数据集的总损失。


        # 更新进度条描述（可选）
        progress_bar.set_postfix(loss=loss.item())

    avg_loss = total_loss / len(data_loader.dataset)
    return avg_loss

def evaluate(model, data_loader, criterion, device):
    """在验证集上评估模型。"""
    model.eval()  # 将模型设置为评估模式
    total_loss = 0
    with torch.no_grad():  # 禁用梯度计算
        # 用tqdm包装data_loader以显示进度条（可选）
        progress_bar = tqdm(data_loader, desc="评估中", leave=False)
        for batch in progress_bar:
            input_ids = batch['input_ids'].to(device)
            targets = batch['target'].to(device, dtype=torch.bfloat16)

            # 前向传播
            outputs = model(input_ids)

            # 计算损失
            loss = criterion(outputs, targets)

            total_loss += loss.item() * input_ids.size(0)
            # 更新进度条描述（可选）
            progress_bar.set_postfix(loss=loss.item())

    avg_loss = total_loss / len(data_loader.dataset)
    return avg_loss


## 主程序，配置参数，处理训练和评估过程

主程序定义了完整的训练流程，包括参数配置、数据加载、模型初始化和训练循环：

**参数配置**：
- 设置模型名称、数据文件路径、输出目录等基本参数
- 配置训练超参数：批大小(8)、训练轮数(100)、学习率(1e-5)等
- 设置模型架构参数：序列长度(512)、dropout率(0.1)、隐藏层大小(512)

**环境准备**：
- 自动检测并设置GPU/CPU设备
- 创建输出目录用于保存训练结果

**模型与数据初始化**：
- 加载Evo2回归模型，包含预训练基础模型和自定义回归头
- 创建数据集实例，按8:2比例分割训练集和验证集
- 构建DataLoader用于批处理训练数据

**训练配置**：
- 使用AdamW优化器，仅优化回归头参数（冻结Evo2基础模型）
- 采用MSE损失函数进行回归任务
- 实现早停机制：监控验证损失，自动保存性能最佳的模型

**继续训练功能**：
- 支持从已保存的模型检查点继续训练
- 程序启动时询问用户是否继续训练已有模型
- 如果选择继续训练，会加载已保存的模型权重并评估初始验证损失
- 自动处理检查点文件不存在的情况，提供友好的错误提示

**训练循环**：
- 每轮训练包含完整的训练和验证过程
- 实时显示训练进度和损失变化
- 当验证损失改善时自动保存模型权重

In [None]:
def main():
    # --- 配置参数 ---
    model_name = "evo2_7b" # Evo2模型名称
    csv_file = "data.csv" # 包含DNA序列和活性数据的CSV文件路径
    output_dir = "evo2_regression_output" # 保存模型的目录
    output_model_file = os.path.join(output_dir, f"{model_name}_regression.pt")

    max_length = 512   # tokenizer的最大序列长度
    batch_size = 8     # 批处理大小，根据显存情况调整
    epochs = 100       # 训练轮数
    learning_rate = 1e-5 # 回归头的学习率
    train_split = 0.8  # 训练数据占数据集的比例
    dropout_rate = 0.1 # 回归头的dropout率
    intermediate_hidden_size = 512 # 全连接隐藏层大小

    os.makedirs(output_dir, exist_ok=True)    # 如果输出目录不存在则创建

    # --- 设备设置 ---
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"使用设备: {device}")
    if device == torch.device("cpu"):
        print("警告：正在使用CPU。训练可能会很慢。")

    # --- 加载模型和分词器 ---
    try:
        model = Evo2ForRegression(model_name=model_name, dropout_rate=dropout_rate, intermediate_size=intermediate_hidden_size)
        tokenizer = model.evo2_wrapper.tokenizer 
    except Exception as e:
        print(f"初始化模型失败: {e}")
        return # 如果模型加载失败则退出
    
    # 询问是否继续训练
    print("是否继续训练已保存的模型？(y/n)")
    continue_training = input().strip().lower() == 'y'

    # 如果设置了继续训练，则加载已保存的模型权重
    if continue_training and os.path.exists(output_model_file):
        try:
            print(f"从检查点加载模型: {output_model_file}")
            # 加载状态字典，并确保它被映射到正确的设备
            model.load_state_dict(torch.load(output_model_file, map_location=device))
            print("模型权重加载成功，将继续训练。")
        except Exception as e:
            print(f"加载模型检查点失败: {e}。将从头开始训练。")
    elif continue_training:
        print(f"警告: 未找到模型检查点 {output_model_file}。将从头开始训练。")

    model.to(device)

    # --- 构建数据集 ---
    try:
        dataset = Evo2RegressionDataset(csv_file, tokenizer, max_length)
    except (FileNotFoundError, ValueError) as e:
        print(f"加载数据集失败: {e}")
        return

    train_size = int(train_split * len(dataset))
    val_size = len(dataset) - train_size
    train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, num_workers=0)

    print(f"数据集已加载: {len(dataset)} 个样本")
    print(f"训练集大小: {len(train_dataset)}")
    print(f"验证集大小: {len(val_dataset)}")

    # --- 设置优化器和损失函数 ---
    optimizer = AdamW(list(model.fc1.parameters()) + list(model.regressor.parameters()), lr=learning_rate)
    criterion = nn.MSELoss()

    # --- 开始训练 ---
    best_val_loss = float('inf')

    # 如果继续训练，可以先评估一次当前模型的性能
    if continue_training and os.path.exists(output_model_file):
        initial_val_loss = evaluate(model, val_loader, criterion, device)
        print(f"加载模型的初始验证损失: {initial_val_loss:.4f}")
        best_val_loss = initial_val_loss

    print("开始训练...")
    for epoch in range(epochs):
        train_loss = train_epoch(model, train_loader, optimizer, criterion, device)
        val_loss = evaluate(model, val_loader, criterion, device)

        print(f"轮次 {epoch+1}/{epochs}: 训练损失 = {train_loss:.4f}, 验证损失 = {val_loss:.4f}")

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), output_model_file)
            print(f"验证损失已改善。模型已保存至 {output_model_file}")

    print("训练完成。")
    print(f"最佳验证损失: {best_val_loss:.4f}")
    print(f"最佳模型已保存至: {output_model_file}")

if __name__ == "__main__":
    main()

不含继续训练功能的原始main函数

In [None]:
def main():
    # --- 配置参数 ---
    model_name = "evo2_7b" # Evo2模型名称
    csv_file = "data.csv" # 包含DNA序列和活性数据的CSV文件路径
    output_dir = "evo2_regression_output" # 保存模型的目录
    output_model_file = os.path.join(output_dir, f"{model_name}_regression.pt")

    max_length = 512   # tokenizer的最大序列长度
    batch_size = 8     # 批处理大小，根据显存情况调整
    epochs = 100       # 训练轮数
    learning_rate = 1e-5 # 回归头的学习率
    train_split = 0.8  # 训练数据占数据集的比例
    dropout_rate = 0.1 # 回归头的dropout率
    intermediate_hidden_size = 512 # 全连接隐藏层大小

    os.makedirs(output_dir, exist_ok=True)    # 如果输出目录不存在则创建

    # --- 设备设置 ---
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"使用设备: {device}")
    if device == torch.device("cpu"):
        print("警告：正在使用CPU。训练可能会很慢。")

    # --- 加载模型和分词器 ---
    # Evo2ForRegression类负责加载基础模型和分词器
    try:
        model = Evo2ForRegression(model_name=model_name, dropout_rate=dropout_rate, intermediate_size=intermediate_hidden_size)
        tokenizer = model.evo2_wrapper.tokenizer 
    except Exception as e:
        print(f"初始化模型失败: {e}")
        return # 如果模型加载失败则退出

    model.to(device)

    # --- 构建数据集 ---
    try:
        dataset = Evo2RegressionDataset(csv_file, tokenizer, max_length)
    except (FileNotFoundError, ValueError) as e:
        print(f"加载数据集失败: {e}")
        print("请确保csv_file路径正确且文件有效。")
        return # 如果数据集加载失败则退出

    train_size = int(train_split * len(dataset))
    val_size = len(dataset) - train_size
    try:
        train_dataset, val_dataset = random_split(dataset, [train_size, val_size])
    except ValueError as e:
        print(f"分割数据集时出错 (训练集大小={train_size}, 验证集大小={val_size}, 总数={len(dataset)}): {e}")
        print("确保您的数据集有足够的样本进行分割。")
        return

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0) # num_workers=0以简化，可根据需要调整
    val_loader = DataLoader(val_dataset, batch_size=batch_size, num_workers=0)

    print(f"数据集已加载: {len(dataset)} 个样本")
    print(f"训练集大小: {len(train_dataset)}")
    print(f"验证集大小: {len(val_dataset)}")

    # --- 设置优化器和损失函数 ---
    # 只优化回归头的参数（fc1和regressor）
    optimizer = AdamW(list(model.fc1.parameters()) + list(model.regressor.parameters()), lr=learning_rate)
    criterion = nn.MSELoss() # 用于回归的均方误差损失，换MAE-torch.nn.L1Loss()?

    # --- 开始训练 ---
    best_val_loss = float('inf') # 初始化为无穷大

    print("开始训练...")
    for epoch in range(epochs):
        # 训练一个轮次
        train_loss = train_epoch(model, train_loader, optimizer, criterion, device)

        # 在验证集上评估
        val_loss = evaluate(model, val_loader, criterion, device)

        print(f"轮次 {epoch+1}/{epochs}: 训练损失 = {train_loss:.4f}, 验证损失 = {val_loss:.4f}")

        # 如果验证损失改善则保存模型
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            # 保存整个模型的状态字典
            # 可以改成只保存回归头
            torch.save(model.state_dict(), output_model_file)
            print(f"验证损失已改善。模型已保存至 {output_model_file}")

    print("训练完成。")
    print(f"最佳验证损失: {best_val_loss:.4f}")
    print(f"最佳模型已保存至: {output_model_file}")

if __name__ == "__main__":
    main()
