# Prokka → ESM3 → DALI Workflow

这个工作流程将：
1. 接受 FNA（核酸序列）文件作为输入
2. 使用 Prokka 进行基因注释和蛋白质预测
3. 将 Prokka 输出的蛋白质序列逐条输入 ESM3 进行结构预测
4. 生成符合 DALI 输入标准的 PDB 文件
5. 打包所有结果供下载

## 系统要求
- Google Colab (推荐使用 GPU 运行时)
- 约 10-20 GB 磁盘空间
- 运行时间取决于序列数量和长度

## 1. 环境检测与设置

In [None]:
import sys
import os
from pathlib import Path

# 检测运行环境
IN_COLAB = 'google.colab' in sys.modules

if IN_COLAB:
    print("✓ 运行在 Google Colab")
    from google.colab import files, drive
    
    # 检查 GPU
    import torch
    if torch.cuda.is_available():
        print(f"✓ GPU 可用: {torch.cuda.get_device_name(0)}")
        print(f"  显存: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.1f} GB")
    else:
        print("⚠ 未检测到 GPU，建议在运行时设置中启用 GPU")
else:
    print("✓ 运行在本地环境")

# 设置工作目录
WORK_DIR = Path("/content/prokka_esm3_workflow") if IN_COLAB else Path("./prokka_esm3_output")
WORK_DIR.mkdir(exist_ok=True, parents=True)
os.chdir(WORK_DIR)

print(f"\n工作目录: {WORK_DIR}")

## 2. 安装依赖

使用 mamba 加速安装过程

In [None]:
# 安装 Mambaforge
import os
import subprocess
import sys

mamba_path = os.path.expanduser("~/mambaforge/bin/mamba")

if not os.path.exists(mamba_path):
    print("正在下载 Mambaforge...")
    
    # 下载文件
    installer = "Mambaforge-Linux-x86_64.sh"
    url = "https://github.com/conda-forge/miniforge/releases/latest/download/" + installer
    
    result = subprocess.run(
        ["wget", "-q", "-O", installer, url],
        capture_output=True
    )
    
    if result.returncode != 0:
        print("下载失败，尝试使用 curl...")
        result = subprocess.run(
            ["curl", "-L", "-o", installer, url],
            capture_output=True
        )
    
    if os.path.exists(installer):
        print("正在安装 Mambaforge...")
        subprocess.run(
            ["bash", installer, "-b", "-p", os.path.expanduser("~/mambaforge")],
            check=True
        )
        os.remove(installer)
        print("✓ Mambaforge 安装完成")
    else:
        raise RuntimeError("无法下载 Mambaforge 安装程序")
else:
    print("✓ Mamba 已安装")

# 设置 PATH
mamba_bin = os.path.expanduser("~/mambaforge/bin")
if mamba_bin not in os.environ.get('PATH', ''):
    os.environ['PATH'] = f"{mamba_bin}:{os.environ.get('PATH', '')}"

# 验证安装
result = subprocess.run(
    [mamba_path, "--version"],
    capture_output=True,
    text=True
)
if result.returncode == 0:
    print(f"Mamba 版本: {result.stdout.strip()}")
else:
    print("警告: Mamba 验证失败")

In [None]:
# 安装 Prokka
import os
import subprocess

mamba_path = os.path.expanduser("~/mambaforge/bin/mamba")
prokka_path = os.path.expanduser("~/mambaforge/bin/prokka")

if not os.path.exists(prokka_path):
    print("正在安装 Prokka（这可能需要几分钟）...")
    
    # 使用 subprocess 安装
    result = subprocess.run(
        [
            mamba_path, "install", "-y",
            "-c", "conda-forge",
            "-c", "bioconda",
            "-c", "defaults",
            "prokka"
        ],
        capture_output=True,
        text=True
    )
    
    if result.returncode == 0:
        print("✓ Prokka 安装完成")
        
        # 设置数据库
        print("正在设置 Prokka 数据库...")
        subprocess.run(
            [prokka_path, "--setupdb"],
            capture_output=True
        )
    else:
        print(f"安装失败: {result.stderr}")
        raise RuntimeError("Prokka 安装失败")
else:
    print("✓ Prokka 已安装")

# 验证安装
result = subprocess.run(
    [prokka_path, "--version"],
    capture_output=True,
    text=True
)
if result.returncode == 0:
    print(f"Prokka 版本: {result.stdout.strip()}")
else:
    print("警告: Prokka 验证失败")

In [None]:
# 安装 Python 依赖
!pip install -q esm biopython tqdm torch

## 3. 导入必要的库

In [None]:
import subprocess
import shutil
from datetime import datetime
from typing import List, Optional
import zipfile

import torch
from Bio import SeqIO
from tqdm.auto import tqdm

print("✓ 所有库导入成功")

## 4. 定义工作流函数

In [None]:
class ProkkaESM3Pipeline:
    """
    Prokka -> ESM3 -> DALI 工作流管道
    """
    
    def __init__(self, work_dir: Path):
        self.work_dir = Path(work_dir)
        self.prokka_dir = self.work_dir / "prokka_output"
        self.pdb_dir = self.work_dir / "esm3_structures"
        self.dali_dir = self.work_dir / "dali_ready"
        
        # 创建目录
        for d in [self.prokka_dir, self.pdb_dir, self.dali_dir]:
            d.mkdir(exist_ok=True, parents=True)
        
        self.model = None
        self.device = None
    
    def run_prokka(self, 
                   fna_file: Path, 
                   prefix: str = "sample",
                   kingdom: str = "Bacteria",
                   cpus: int = 2,
                   **kwargs) -> Path:
        """
        运行 Prokka 进行基因注释
        
        Args:
            fna_file: 输入的 FNA 文件路径
            prefix: 输出文件前缀
            kingdom: 生物界（Bacteria, Archaea, Viruses）
            cpus: 使用的 CPU 核心数
            **kwargs: 其他 Prokka 参数
        
        Returns:
            Prokka 输出目录路径
        """
        print(f"\n{'='*60}")
        print("步骤 1: 运行 Prokka 进行基因注释")
        print(f"{'='*60}")
        
        output_dir = self.prokka_dir / prefix
        
        # 构建 Prokka 命令
        cmd = [
            os.path.expanduser("~/mambaforge/bin/prokka"),
            "--outdir", str(output_dir),
            "--prefix", prefix,
            "--kingdom", kingdom,
            "--cpus", str(cpus),
            "--force",  # 覆盖已存在的输出
        ]
        
        # 添加额外参数
        for key, value in kwargs.items():
            cmd.extend([f"--{key}", str(value)])
        
        cmd.append(str(fna_file))
        
        print(f"运行命令: {' '.join(cmd)}")
        print("\n正在运行 Prokka（这可能需要几分钟）...")
        
        try:
            result = subprocess.run(cmd, check=True, capture_output=True, text=True)
            print("\n✓ Prokka 运行成功！")
            
            # 显示统计信息
            stats_file = output_dir / f"{prefix}.txt"
            if stats_file.exists():
                print("\n注释统计:")
                print(stats_file.read_text())
            
            return output_dir
            
        except subprocess.CalledProcessError as e:
            print(f"\n✗ Prokka 运行失败: {e}")
            print(f"错误输出: {e.stderr}")
            raise
    
    def load_esm3_model(self, model_name: str = 'esm3-sm-open-v1'):
        """
        加载 ESM3 模型
        
        Args:
            model_name: ESM3 模型名称
        """
        print(f"\n{'='*60}")
        print("步骤 2: 加载 ESM3 模型")
        print(f"{'='*60}")
        
        # 自动检测设备
        if torch.cuda.is_available():
            self.device = 'cuda'
            print(f"使用 GPU: {torch.cuda.get_device_name(0)}")
        else:
            self.device = 'cpu'
            print("使用 CPU（建议启用 GPU 以加速）")
        
        from esm.models.esm3 import ESM3
        
        print(f"\n正在加载模型: {model_name}...")
        self.model = ESM3.from_pretrained(model_name).to(self.device)
        self.model.eval()
        
        print("✓ ESM3 模型加载成功！")
    
    def predict_structures(self,
                          prokka_dir: Path,
                          prefix: str,
                          num_steps: int = 8,
                          max_length: int = 400,
                          min_length: int = 30) -> List[Path]:
        """
        使用 ESM3 预测蛋白质结构
        
        Args:
            prokka_dir: Prokka 输出目录
            prefix: Prokka 输出前缀
            num_steps: ESM3 生成步数
            max_length: 最大序列长度（过长的序列会被跳过）
            min_length: 最小序列长度
        
        Returns:
            生成的 PDB 文件路径列表
        """
        print(f"\n{'='*60}")
        print("步骤 3: 使用 ESM3 预测蛋白质结构")
        print(f"{'='*60}")
        
        if self.model is None:
            self.load_esm3_model()
        
        from esm.sdk.api import ESMProtein, GenerationConfig
        
        # 读取 Prokka 输出的蛋白质序列
        faa_file = prokka_dir / f"{prefix}.faa"
        
        if not faa_file.exists():
            raise FileNotFoundError(f"找不到 Prokka 蛋白质文件: {faa_file}")
        
        # 解析序列
        sequences = list(SeqIO.parse(faa_file, "fasta"))
        print(f"\n从 Prokka 读取到 {len(sequences)} 条蛋白质序列")
        
        # 过滤序列
        filtered_seqs = [
            seq for seq in sequences 
            if min_length <= len(seq.seq) <= max_length
        ]
        
        skipped = len(sequences) - len(filtered_seqs)
        if skipped > 0:
            print(f"跳过 {skipped} 条序列（长度不在 {min_length}-{max_length} 范围内）")
        
        print(f"将预测 {len(filtered_seqs)} 条序列的结构\n")
        
        pdb_files = []
        success_count = 0
        error_count = 0
        
        # 逐条预测
        for rec in tqdm(filtered_seqs, desc="预测结构"):
            try:
                seq = str(rec.seq)
                # 清理 ID 以用作文件名
                name = rec.id.replace('|', '_').replace('/', '_').replace('\\', '_')[:100]
                pdb_file = self.pdb_dir / f"{name}.pdb"
                
                # 跳过已存在的文件
                if pdb_file.exists():
                    pdb_files.append(pdb_file)
                    success_count += 1
                    continue
                
                # 使用 ESM3 生成结构
                protein = ESMProtein(sequence=seq)
                protein = self.model.generate(
                    protein, 
                    GenerationConfig(track='structure', num_steps=num_steps)
                )
                
                # 保存为 PDB
                protein.to_pdb(str(pdb_file))
                pdb_files.append(pdb_file)
                success_count += 1
                
            except Exception as e:
                print(f"\n预测失败 {rec.id}: {e}")
                error_count += 1
                continue
        
        print(f"\n✓ 结构预测完成！")
        print(f"  成功: {success_count}")
        print(f"  失败: {error_count}")
        
        return pdb_files
    
    def prepare_for_dali(self, pdb_files: List[Path]) -> Path:
        """
        准备符合 DALI 输入标准的文件
        
        Args:
            pdb_files: PDB 文件路径列表
        
        Returns:
            DALI 输出目录路径
        """
        print(f"\n{'='*60}")
        print("步骤 4: 准备 DALI 输入文件")
        print(f"{'='*60}")
        
        # 复制 PDB 文件到 DALI 目录
        for pdb_file in tqdm(pdb_files, desc="复制文件"):
            dest = self.dali_dir / pdb_file.name
            if not dest.exists():
                shutil.copy2(pdb_file, dest)
        
        # 创建文件列表
        file_list = self.dali_dir / "pdb_list.txt"
        with open(file_list, 'w') as f:
            for pdb_file in pdb_files:
                f.write(f"{pdb_file.name}\n")
        
        # 创建 README
        readme = self.dali_dir / "README.txt"
        with open(readme, 'w') as f:
            f.write("DALI 输入文件\n")
            f.write("="*60 + "\n\n")
            f.write(f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write(f"PDB 文件数量: {len(pdb_files)}\n\n")
            f.write("使用说明:\n")
            f.write("1. 这些 PDB 文件可直接用于 DALI 结构比对\n")
            f.write("2. 访问 DALI 服务器: http://ekhidna2.biocenter.helsinki.fi/dali/\n")
            f.write("3. 上传单个或多个 PDB 文件进行比对分析\n")
            f.write("4. pdb_list.txt 包含所有 PDB 文件的列表\n")
        
        print(f"\n✓ DALI 文件准备完成！")
        print(f"  位置: {self.dali_dir}")
        print(f"  文件数: {len(pdb_files)}")
        
        return self.dali_dir
    
    def create_download_package(self, prefix: str) -> Path:
        """
        创建可下载的压缩包
        
        Args:
            prefix: 输出文件前缀
        
        Returns:
            压缩包路径
        """
        print(f"\n{'='*60}")
        print("步骤 5: 创建下载包")
        print(f"{'='*60}")
        
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        zip_file = self.work_dir / f"{prefix}_results_{timestamp}.zip"
        
        with zipfile.ZipFile(zip_file, 'w', zipfile.ZIP_DEFLATED) as zf:
            # 添加 Prokka 结果
            print("\n打包 Prokka 结果...")
            for file in self.prokka_dir.rglob('*'):
                if file.is_file():
                    arcname = file.relative_to(self.work_dir)
                    zf.write(file, arcname)
            
            # 添加 ESM3 结构
            print("打包 ESM3 结构...")
            for file in self.pdb_dir.rglob('*.pdb'):
                arcname = file.relative_to(self.work_dir)
                zf.write(file, arcname)
            
            # 添加 DALI 文件
            print("打包 DALI 文件...")
            for file in self.dali_dir.rglob('*'):
                if file.is_file():
                    arcname = file.relative_to(self.work_dir)
                    zf.write(file, arcname)
        
        size_mb = zip_file.stat().st_size / (1024 * 1024)
        print(f"\n✓ 压缩包创建完成！")
        print(f"  文件: {zip_file.name}")
        print(f"  大小: {size_mb:.2f} MB")
        
        return zip_file
    
    def run_full_pipeline(self,
                         fna_file: Path,
                         prefix: str = "sample",
                         kingdom: str = "Bacteria",
                         num_steps: int = 8,
                         max_seq_length: int = 400,
                         **prokka_kwargs) -> Path:
        """
        运行完整工作流
        
        Args:
            fna_file: 输入的 FNA 文件
            prefix: 输出文件前缀
            kingdom: 生物界
            num_steps: ESM3 生成步数
            max_seq_length: 最大序列长度
            **prokka_kwargs: Prokka 额外参数
        
        Returns:
            下载包路径
        """
        start_time = datetime.now()
        print(f"\n{'='*60}")
        print("Prokka → ESM3 → DALI 工作流")
        print(f"{'='*60}")
        print(f"开始时间: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"输入文件: {fna_file}")
        print(f"输出前缀: {prefix}")
        
        # 步骤 1: Prokka 注释
        prokka_dir = self.run_prokka(fna_file, prefix, kingdom, **prokka_kwargs)
        
        # 步骤 2-3: ESM3 结构预测
        pdb_files = self.predict_structures(
            prokka_dir, 
            prefix, 
            num_steps=num_steps,
            max_length=max_seq_length
        )
        
        # 步骤 4: 准备 DALI 文件
        self.prepare_for_dali(pdb_files)
        
        # 步骤 5: 创建下载包
        zip_file = self.create_download_package(prefix)
        
        end_time = datetime.now()
        duration = end_time - start_time
        
        print(f"\n{'='*60}")
        print("✓ 工作流完成！")
        print(f"{'='*60}")
        print(f"总耗时: {duration}")
        print(f"\n结果文件: {zip_file}")
        
        return zip_file

print("✓ 工作流类定义完成")

## 5. 上传输入文件

上传你的 FNA（核酸序列）文件

In [None]:
if IN_COLAB:
    print("请上传 FNA 文件...")
    uploaded = files.upload()
    
    # 获取上传的文件
    fna_files = [f for f in uploaded.keys() if f.endswith(('.fna', '.fa', '.fasta'))]
    
    if not fna_files:
        raise ValueError("未找到 FNA 文件，请确保上传的是 .fna/.fa/.fasta 格式的文件")
    
    input_fna = Path(fna_files[0])
    print(f"\n✓ 上传成功: {input_fna}")
    print(f"  文件大小: {input_fna.stat().st_size / 1024:.2f} KB")
else:
    # 本地环境：手动指定文件路径
    input_fna = Path("example.fna")  # 修改为你的文件路径
    
    if not input_fna.exists():
        print(f"错误: 文件不存在 {input_fna}")
        print("请修改上面的代码，指定正确的 FNA 文件路径")
    else:
        print(f"✓ 输入文件: {input_fna}")

## 6. 配置工作流参数

In [None]:
# 基本参数
OUTPUT_PREFIX = "my_genome"  # 输出文件前缀
KINGDOM = "Bacteria"  # 生物界: Bacteria, Archaea, 或 Viruses

# Prokka 参数
GENUS = None  # 例如: "Escherichia" (可选)
SPECIES = None  # 例如: "coli" (可选)
STRAIN = None  # 例如: "K12" (可选)

# ESM3 参数
NUM_STEPS = 8  # 生成步数 (8-16，越大越慢但质量可能更好)
MAX_SEQ_LENGTH = 400  # 最大序列长度（超过此长度的序列会被跳过）
MIN_SEQ_LENGTH = 30  # 最小序列长度

# CPU 核心数
CPUS = 2

print("配置参数:")
print(f"  输出前缀: {OUTPUT_PREFIX}")
print(f"  生物界: {KINGDOM}")
print(f"  ESM3 步数: {NUM_STEPS}")
print(f"  序列长度范围: {MIN_SEQ_LENGTH}-{MAX_SEQ_LENGTH}")

## 7. 运行工作流

这将执行以下步骤：
1. Prokka 基因注释
2. ESM3 结构预测
3. DALI 文件准备
4. 创建下载包

In [None]:
# 创建管道实例
pipeline = ProkkaESM3Pipeline(WORK_DIR)

# 准备 Prokka 参数
prokka_params = {
    'cpus': CPUS,
}

if GENUS:
    prokka_params['genus'] = GENUS
if SPECIES:
    prokka_params['species'] = SPECIES
if STRAIN:
    prokka_params['strain'] = STRAIN

# 运行完整工作流
try:
    result_zip = pipeline.run_full_pipeline(
        fna_file=input_fna,
        prefix=OUTPUT_PREFIX,
        kingdom=KINGDOM,
        num_steps=NUM_STEPS,
        max_seq_length=MAX_SEQ_LENGTH,
        **prokka_params
    )
    
    print(f"\n{'='*60}")
    print("工作流执行成功！")
    print(f"{'='*60}")
    print(f"\n结果已打包到: {result_zip}")
    
except Exception as e:
    print(f"\n{'='*60}")
    print("工作流执行失败")
    print(f"{'='*60}")
    print(f"错误: {e}")
    import traceback
    traceback.print_exc()

## 8. 下载结果

下载包含所有结果的压缩文件

In [None]:
if IN_COLAB and 'result_zip' in locals():
    print("正在准备下载...")
    files.download(str(result_zip))
    print("\n✓ 下载已开始！")
    print("\n压缩包内容:")
    print("  - prokka_output/: Prokka 注释结果")
    print("  - esm3_structures/: ESM3 预测的 PDB 结构")
    print("  - dali_ready/: 符合 DALI 标准的文件")
else:
    print(f"结果文件位置: {result_zip}")
    print("\n如果是本地环境，请直接访问上述路径获取结果")

## 9. 查看结果摘要

In [None]:
# 统计结果
print("\n" + "="*60)
print("结果摘要")
print("="*60)

# Prokka 结果
prokka_faa = pipeline.prokka_dir / OUTPUT_PREFIX / f"{OUTPUT_PREFIX}.faa"
if prokka_faa.exists():
    prokka_proteins = list(SeqIO.parse(prokka_faa, "fasta"))
    print(f"\nProkka 注释结果:")
    print(f"  蛋白质数量: {len(prokka_proteins)}")

# ESM3 结果
pdb_files = list(pipeline.pdb_dir.glob("*.pdb"))
print(f"\nESM3 结构预测:")
print(f"  PDB 文件数: {len(pdb_files)}")

# DALI 文件
dali_files = list(pipeline.dali_dir.glob("*.pdb"))
print(f"\nDALI 输入文件:")
print(f"  准备就绪的文件: {len(dali_files)}")

print(f"\n" + "="*60)
print("\n下一步:")
print("1. 下载并解压结果文件")
print("2. 查看 prokka_output/ 中的注释结果")
print("3. 在 dali_ready/ 中找到可用于 DALI 的 PDB 文件")
print("4. 访问 DALI 服务器进行结构比对:")
print("   http://ekhidna2.biocenter.helsinki.fi/dali/")

## 10. 可选：单独查看某个 PDB 结构

In [None]:
# 列出所有生成的 PDB 文件
pdb_files = sorted(pipeline.pdb_dir.glob("*.pdb"))

if pdb_files:
    print(f"共 {len(pdb_files)} 个 PDB 文件:\n")
    for i, pdb in enumerate(pdb_files[:10], 1):  # 只显示前10个
        print(f"{i}. {pdb.name}")
    
    if len(pdb_files) > 10:
        print(f"... 还有 {len(pdb_files) - 10} 个文件")
    
    # 如果想查看某个文件内容，可以运行：
    # print("\n第一个 PDB 文件内容（前20行）:")
    # with open(pdb_files[0]) as f:
    #     for i, line in enumerate(f):
    #         if i >= 20:
    #             break
    #         print(line.rstrip())
else:
    print("未找到 PDB 文件")

## 附录：故障排查

### 常见问题

1. **Prokka 安装失败**
   - 确保正确安装了 mamba
   - 尝试重新运行安装单元格

2. **ESM3 显存不足**
   - 减小 `MAX_SEQ_LENGTH` 参数
   - 使用更小的模型或 CPU 模式

3. **Prokka 运行时间过长**
   - 正常情况，取决于输入文件大小
   - 可以增加 `CPUS` 参数

4. **找不到蛋白质序列**
   - 检查输入的 FNA 文件格式是否正确
   - 确认 Prokka 成功运行

### 性能优化

- 使用 GPU 运行时可大幅加速 ESM3
- 调整 `NUM_STEPS` 平衡速度和质量
- 对于大量序列，考虑批量处理