# ProT-Diff 模型 - 面向过程编程

理解ProT-Diff模型的训练过程，用函数先代替复杂的类结构。

## 模型概述
ProT-Diff是一个用于生成抗菌肽(AMP)的扩散模型：
1. 使用ProtT5编码器将肽序列转换为嵌入向量
2. 在嵌入空间训练扩散过程
3. 从训练好的扩散模型采样新的嵌入
4. 使用ProtT5解码器将嵌入转换回氨基酸序列


## 第1步：环境设置和依赖导入


In [2]:
# 环境设置说明：
# 如果遇到numpy兼容性问题，请运行以下命令修复：
# conda install numpy=1.24.3 scipy=1.10.1 scikit-learn=1.3.0 -c conda-forge --yes
# 安装必要的包（如果还没安装）：
# conda install pytorch torchvision torchaudio pytorch-cuda=11.8 -c pytorch -c nvidia
# conda install transformers accelerate einops pandas scikit-learn tqdm matplotlib -c conda-forge

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import pandas as pd
import math
import matplotlib.pyplot as plt
from tqdm.auto import tqdm
from transformers import T5Tokenizer, T5EncoderModel, T5ForConditionalGeneration
from transformers.modeling_outputs import BaseModelOutput
from einops import rearrange

# 检查版本兼容性
print(f"NumPy版本: {np.__version__}")
print(f"PyTorch版本: {torch.__version__}")
try:
    import transformers
    print(f"Transformers版本: {transformers.__version__}")
except:
    print("Transformers版本: 导入失败")

# 设置设备
device = "cuda" if torch.cuda.is_available() else "mps"
print(f"使用设备: {device}")

# 设置随机种子
torch.manual_seed(42)
np.random.seed(42)


NumPy版本: 1.24.3
PyTorch版本: 2.6.0
Transformers版本: 4.55.1
使用设备: mps


## 第2步：准备训练数据

我们支持多种数据源：
- **演示数据**：8条示例序列，用于快速学习
- **AMP数据集**：真实的抗菌肽序列数据，用于finetuning
- **Non-AMP数据集**：非抗菌肽序列

### 数据要求：
- 序列长度：5-100个氨基酸
- 只包含20种标准氨基酸：ACDEFGHIKLMNPQRSTVWY



In [3]:
# 示例抗菌肽序列（长度5-48，只包含20种氨基酸）
example_sequences = [
    "GIGKFLKKAKKFGKAFVKILKK",  # 22个氨基酸
    "KKLFKKILKYL",             # 11个氨基酸
    "GLFDIVKKVVGAL",           # 13个氨基酸
    "RWKIFKKIERVGQHTRDAT",     # 19个氨基酸
    "KWKLFKKIPKFLHLAKKF",      # 18个氨基酸
    "FLPIIAKLLSGLL",           # 13个氨基酸
    "KLAKLAKKLAKLAK",          # 14个氨基酸
    "GIGAVLKVLTTGLPALIS"       # 18个氨基酸
]

def load_sequences(use_example=True, dataset_path=None):
    """
    选择示例数据或加载自定义数据集。
    dataset_path 需要是一个包含肽序列的文件（csv）。
    """
    if use_example:
        return example_sequences
    else:
        if dataset_path is None:
            raise ValueError("请提供 dataset_path 参数来加载自定义数据集")
        if dataset_path.endswith(".csv"):
            try:
                df = pd.read_csv(dataset_path)
                # 如果有'sequence'列，使用它；否则使用第一列
                if 'sequence' in df.columns:
                    sequences = df['sequence'].tolist()
                else:
                    sequences = df.iloc[:, 0].tolist()
                # 过滤掉空值和非字符串值
                sequences = [str(seq) for seq in sequences if pd.notna(seq) and str(seq).strip() != '']
                return sequences
            except Exception as e:
                print(f"读取CSV文件时出错: {e}")
                print("尝试使用更简单的方法读取...")
                # 备用方法：直接按行读取
                with open(dataset_path, 'r', encoding='utf-8') as f:
                    lines = f.readlines()
                    # 跳过标题行，提取第一列
                    sequences = []
                    for i, line in enumerate(lines):
                        if i == 0:  # 跳过标题
                            continue
                        parts = line.strip().split(',')
                        if len(parts) > 0 and parts[0]:
                            sequences.append(parts[0])
                    return sequences
        else:
            raise ValueError("目前仅支持 csv 格式文件")

# ===== 使用示例数据 =====
#sequences = load_sequences(use_example=True)

# ===== 或者使用你自己的数据集 =====
sequences = load_sequences(use_example=False, dataset_path="data/Non-AMP/final_non_amps.csv")

print("展示 5 条序列：")
sample_seqs = sequences[:5]
for i, seq in enumerate(sample_seqs, 1):
    print(f"{i}. {seq} (长度: {len(seq)})")

# 统计信息
lengths = [len(seq) for seq in sequences]
total_count = len(sequences)
max_len = max(lengths)
min_len = min(lengths)
median_len = np.median(lengths)
mean_len = np.mean(lengths)

print(f"\n总共 {total_count} 条序列")
print(f"最大长度: {max_len}")
print(f"最小长度: {min_len}")
print(f"中位数长度: {median_len:.2f}")
print(f"平均长度: {mean_len:.2f}")


读取CSV文件时出错: Cannot convert numpy.ndarray to numpy.ndarray
尝试使用更简单的方法读取...
展示 5 条序列：
1. MLSCWKGYI (长度: 9)
2. GSPPVFDRVVVNPQLYENRLNQTRLGT (长度: 27)
3. MIIWLPPLVAAVTTYLLCEYLYYYGRDEH (长度: 29)
4. MEIEDRIDLSERGHDTLEQKR (长度: 21)
5. PVWIMAHMVNAVAQIDEFVNL (长度: 21)

总共 108776 条序列
最大长度: 100
最小长度: 4
中位数长度: 35.00
平均长度: 31.79


## 第3步：加载ProtT5模型

ProtT5是一个预训练的蛋白质语言模型，我们需要加载编码器和解码器：
- **编码器**：将氨基酸序列转换为嵌入向量
- **解码器**：将嵌入向量转换回氨基酸序列


In [19]:
# 安装 sentencepiece（Tokenizer 必需）
# 如果你已装过，可注释掉
!pip install -U sentencepiece tqdm

import os, re, math
from tqdm.auto import tqdm

from typing import List
from torch.utils.data import Dataset, DataLoader
model_name = "Rostlab/prot_t5_xl_half_uniref50-enc"


# ====== 配置 ======
MODEL_NAME = "Rostlab/prot_t5_xl_half_uniref50-enc"  # 半精度 encoder-only
TOKENIZER_NAME = "Rostlab/prot_t5_xl_uniref50"       # Tokenizer 用全量版
MAX_LEN = 48
BATCH_SIZE = 16
DEVICE = device  # 使用之前定义的device

# ====== 实用函数 ======
AA_20 = set("ACDEFGHIKLMNPQRSTVWY")
def clean_seq(seq: str) -> str:
    s = re.sub(r"[^A-Z]", "", seq.upper())
    s = "".join([c for c in s if c in AA_20])
    return s

def space_separate(seq: str) -> str:
    return " ".join(list(seq))

# ====== 加载 Tokenizer 和 编码器 ======
print("正在加载 ProtT5 编码器/Tokenizer ...")
tokenizer = T5Tokenizer.from_pretrained(TOKENIZER_NAME, do_lower_case=False)

encoder_model = T5EncoderModel.from_pretrained(MODEL_NAME)
if DEVICE == "cuda":
    encoder_model = encoder_model.half().to(DEVICE)
else:
    encoder_model = encoder_model.float().to(DEVICE)

encoder_model.eval()
print("✓ ProtT5 编码器加载成功")
print(f"✓ 参数量: {sum(p.numel() for p in encoder_model.parameters()):,}")
print(f"✓ 嵌入维度 d_model: {encoder_model.config.d_model}")
print(f"✓ 使用设备: {DEVICE}")


正在加载 ProtT5 编码器/Tokenizer ...
✓ ProtT5 编码器加载成功
✓ 参数量: 1,208,141,824
✓ 嵌入维度 d_model: 1024
✓ 使用设备: mps


## 第4步：序列编码 - 将氨基酸序列转换为嵌入向量

这一步我们要实现：
1. 单个序列编码函数：将氨基酸序列编码为(L,1024)的嵌入
2. 零填充到固定长度：填充到(48,1024)的固定形状
3. 批量编码：处理所有训练序列


In [20]:
# ====== 单序列编码（用于快速测试） ======
@torch.inference_mode()
def encode_sequence(sequence: str,
                    tokenizer: T5Tokenizer,
                    encoder_model: T5EncoderModel,
                    device: str,
                    max_length: int = MAX_LEN) -> torch.Tensor:
    seq = clean_seq(sequence)
    assert 1 <= len(seq) <= max_length, f"序列长度必须 1~{max_length}，当前 {len(seq)}"
    spaced = space_separate(seq)
    inputs = tokenizer(
        spaced,
        return_tensors="pt",
        padding=True,
        truncation=True,
        max_length=max_length + 2,  # 预留 special tokens
        add_special_tokens=True
    )
    inputs = {k: v.to(device) for k, v in inputs.items()}
    outputs = encoder_model(**inputs).last_hidden_state.squeeze(0)  # (seq_len_w_special, d_model)
    # 去掉首尾 special tokens
    outputs = outputs[1:-1]  # (seq_len, d_model)
    # pad/截断为 (max_length, d_model)
    seq_len, d_model = outputs.shape
    if seq_len < max_length:
        pad = torch.zeros(max_length - seq_len, d_model, device=outputs.device, dtype=outputs.dtype)
        out = torch.cat([outputs, pad], dim=0)
    else:
        out = outputs[:max_length]
    return out  # (max_length, d_model)

# ====== 批量编码（推荐） ======
class SeqDataset(Dataset):
    def __init__(self, sequences: List[str]):
        self.seqs = [clean_seq(s) for s in sequences]
        # 过滤过短/过长
        self.seqs = [s for s in self.seqs if 1 <= len(s) <= MAX_LEN]
    def __len__(self):
        return len(self.seqs)
    def __getitem__(self, idx):
        return self.seqs[idx]

def collate_fn(batch: List[str]):
    # 把 batch 的序列拼成 spaced 文本，交给 tokenizer 做 padding
    spaced = [space_separate(s) for s in batch]
    enc = tokenizer(
        spaced,
        return_tensors="pt",
        padding=True,
        truncation=True,
        max_length=MAX_LEN + 2,
        add_special_tokens=True
    )
    return enc, batch  # 返回原序列以便调试

@torch.inference_mode()
def encode_batch(sequences: List[str],
                 encoder_model: T5EncoderModel,
                 batch_size: int = BATCH_SIZE,
                 device: str = DEVICE,
                 max_length: int = MAX_LEN) -> torch.Tensor:
    ds = SeqDataset(sequences)
    dl = DataLoader(ds, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)
    embs = []
    for inputs, raw in tqdm(dl, desc="批量编码", leave=False):
        inputs = {k: v.to(device) for k, v in inputs.items()}
        out = encoder_model(**inputs).last_hidden_state  # (B, Lw, d_model)
        # 去掉每条的首尾 special，再 pad 到固定长度
        B, Lw, D = out.shape
        # 这里简化处理：统一去掉首尾，之后按真实长度和 MAX_LEN 修整
        trimmed = out[:, 1:-1, :]  # (B, L, D)
        L = trimmed.size(1)
        if L < max_length:
            pad = torch.zeros(B, max_length - L, D, device=trimmed.device, dtype=trimmed.dtype)
            fixed = torch.cat([trimmed, pad], dim=1)
        else:
            fixed = trimmed[:, :max_length, :]
        # 移到 CPU 节省显存
        embs.append(fixed.float().cpu())
    embs = torch.cat(embs, dim=0)  # (N, max_length, d_model)
    return embs

# ====== 快速自测 ======
if 'example_sequences' not in globals():
    example_sequences = ["GIGKFLKKAKKFGKAFVKILKK", "LLKKLLKKLLKKLL"]
if 'sequences' not in globals():
    sequences = example_sequences * 10

print("\n[单条编码自测]")
emb1 = encode_sequence(example_sequences[0], tokenizer, encoder_model, DEVICE, MAX_LEN)
print("shape:", tuple(emb1.shape), "dtype:", emb1.dtype, "range:", float(emb1.min()), float(emb1.max()))



[单条编码自测]
shape: (48, 1024) dtype: torch.float32 range: -0.8889049887657166 0.8496513962745667


In [21]:
# 批量编码所有训练序列
print("正在编码所有训练序列...")

# 使用高效批量编码
train_embeddings = encode_batch(sequences, encoder_model, BATCH_SIZE, DEVICE, MAX_LEN)

print(f"✓ 所有序列编码完成!")
print(f"✓ 训练数据形状: {train_embeddings.shape}")
print(f"✓ 数据类型: {train_embeddings.dtype}")
print(f"✓ 数据范围: [{train_embeddings.min():.3f}, {train_embeddings.max():.3f}]")

# 计算有效序列长度统计
valid_lengths = (train_embeddings.abs().sum(dim=2) > 1e-6).sum(dim=1).tolist()
print(f"✓ 有效序列长度样例: {valid_lengths[:10]}")
print(f"✓ 平均长度: {np.mean(valid_lengths):.1f}")


正在编码所有训练序列...


批量编码:   0%|          | 0/6231 [00:00<?, ?it/s]

KeyboardInterrupt: 

## 第5步：理解扩散过程基础

扩散模型的核心思想：
1. **前向过程**：逐步向干净数据添加噪声，直到变成纯噪声
2. **反向过程**：训练神经网络学习从噪声中恢复数据

我们需要理解：
- 噪声调度（sqrt schedule）
- 前向扩散公式
- 如何可视化噪声添加过程


## 第6步：构建UNet去噪网络

我们需要构建一个1D UNet来学习去噪过程：

**网络组件**：
- 时间嵌入（Time Embedding）：让模型知道当前处于哪个扩散步骤
- 残差块（ResBlock）：基本的卷积构建块
- 自注意力（Self-Attention）：在瓶颈层增强特征表达
- U型结构：编码器-瓶颈-解码器，带跳跃连接

**模型架构**：Trans-UNet风格的1D版本


## 第7步：训练扩散模型

训练过程的核心：
1. **随机采样时间步**：从0到2000中随机选择
2. **添加噪声**：根据时间步向原始数据添加相应强度的噪声
3. **模型预测**：UNet预测原始数据x0（而不是噪声）
4. **计算损失**：MSE损失，比较预测的x0和真实x0
5. **反向传播**：更新模型参数

**训练配置**：
- 扩散步数：2000步
- 预测目标：x0（原始数据）
- 噪声调度：sqrt schedule


## 第8步：DDPM采样生成新序列

从训练好的扩散模型采样生成新的嵌入向量：

**采样过程**：
1. **从纯噪声开始**：随机初始化(batch_size, 48, 1024)的噪声张量
2. **逐步去噪**：从时间步2000到0，逐步去除噪声
3. **下采样**：将2000步压缩到200步以加速采样
4. **噪声选择**：可以选择高斯噪声或均匀噪声

**DDPM算法**：使用标准的DDPM逆向采样公式


## 第9步：序列解码 - 将嵌入转换回氨基酸序列

使用ProtT5解码器将生成的嵌入向量转换回氨基酸序列：

**解码过程**：
1. **去除零填充**：识别并移除主要为零的行（padding部分）
2. **包装为编码器输出**：将嵌入包装成ProtT5期望的格式
3. **调用解码器**：使用T5的generate方法生成token序列
4. **后处理**：清理生成的文本，只保留20种标准氨基酸

**注意事项**：
- 解码可能不完美，需要过滤无效序列
- 生成的序列长度可能与原始序列不同


## 第10步：结果分析和可视化

分析生成的序列质量：

**质量评估**：
1. **基本统计**：序列长度分布、氨基酸组成
2. **与训练集对比**：比较生成序列和原始训练序列的特征
3. **有效性检查**：过滤掉无效或过短的序列
4. **多样性分析**：检查生成序列的多样性

**可视化**：
- 训练损失曲线
- 生成序列长度分布
- 氨基酸组成热图
- 训练集vs生成集对比


## 总结和下一步

### 🎯 我们完成的工作：

1. **数据准备**：准备了抗菌肽序列作为训练数据
2. **序列编码**：使用ProtT5编码器将氨基酸序列转换为1024维嵌入向量
3. **扩散建模**：理解了扩散过程的前向和反向过程
4. **网络架构**：构建了Trans-UNet风格的1D去噪网络
5. **模型训练**：训练扩散模型学习从噪声中恢复序列嵌入
6. **序列生成**：使用DDPM采样生成新的序列嵌入
7. **序列解码**：将嵌入转换回氨基酸序列
8. **结果评估**：分析生成序列的质量和多样性

### 🚀 改进方向：

1. **更多数据**：使用更大的AMP数据集训练
2. **模型优化**：调整网络架构和超参数
3. **条件生成**：根据特定性质（如长度、活性）生成序列
4. **质量筛选**：添加判别器或规则筛选高质量序列
5. **预训练微调**：实现论文中的预训练+微调策略

### 📚 进一步学习：

- 扩散模型的数学原理
- 蛋白质序列的生物学特性
- 抗菌肽的结构-功能关系
- 更高级的生成模型技术
