# 溶剂可及性分析工具包 - 详细教程

本教程将逐步讲解溶剂可及性分析工具包的核心概念、代码实现、性能优化和并行化设计。

## 目录

1. [项目简介](#1-项目简介)
2. [环境配置](#2-环境配置)
3. [核心概念](#3-核心概念)
4. [数据模型](#4-数据模型)
5. [算法实现](#5-算法实现)
6. [性能优化](#6-性能优化)
7. [并行化设计](#7-并行化设计)
8. [基准测试](#8-基准测试)
9. [扩展开发](#9-扩展开发)

## 1. 项目简介

蛋白质折叠由疏水效应驱动，促使疏水残基埋藏于蛋白核心，亲水残基暴露于水环境。准确识别溶剂可及残基对预测蛋白-蛋白相互作用、抗原表位等功能区域至关重要。

传统方法基于几何表面积计算（如FreeSASA），本方法直接评估残基与显式水分子的接近程度，提供更物理真实的溶剂可及性度量。

**核心挑战**：高效处理显式溶剂化系统中的大量原子对距离计算（时间复杂度$O(N×M)$）。

**解决方案**：
- **质心法**：计算残基质心到最近水分子的距离
- **原子级方法**：统计残基中可接触水分子（距离<阈值）的原子比例
- **性能优化**：分块计算、KDTree空间索引、向量化操作、并行计算

## 2. 环境配置

运行`setup.sh`安装依赖。导入并验证必要的依赖。

In [1]:
import sys
import numpy as np
import scipy
import Bio
import freesasa

print(f"Python版本: {sys.version}")
print(f"NumPy版本: {np.__version__}")
print(f"SciPy版本: {scipy.__version__}")
print(f"BioPython版本: {Bio.__version__}")
print(
    f"FreeSASA版本: {freesasa.__version__ if hasattr(freesasa, '__version__') else '2.2.1 (通过pip安装)'}"
)

Python版本: 3.14.0 (tags/v3.14.0:ebf955d, Oct  7 2025, 10:15:03) [MSC v.1944 64 bit (AMD64)]
NumPy版本: 2.3.5
SciPy版本: 1.16.3
BioPython版本: 1.86
FreeSASA版本: 2.2.1 (通过pip安装)


## 3. 核心概念

### 3.1 溶剂可及性定义

在蛋白质结构中，每个氨基酸残基的溶剂可及性（Solvent Accessibility）描述该残基与水分子接触的程度：

- **可及残基**：残基表面充分暴露于水环境
- **埋藏残基**：残基表面被其他残基包围，与水隔离

### 3.2 计算方法对比

| 方法 | 原理 | 优点 | 缺点 |
|------|------|------|------|
| **几何法**（FreeSASA） | 计算原子球体的溶剂可及表面积 | 标准方法，结果可比较 | 忽略水分子的显式分布 |
| **质心法**（本工具） | 计算残基质心到最近水分子的距离 | 计算简单，速度快 | 忽略残基形状 |
| **原子级方法**（本工具） | 统计残基中可接触水分子的原子比例 | 考虑原子级细节，更精确 | 计算量较大 |

> 有一个需要注意——我们的方法可能只对某一瞬间的溶剂可及性更精准，要总体更准可能得动态分析，即处理大量的切片xxx_water.pdb并做统计分析。

## 4. 数据模型

项目使用Python `dataclass` 定义核心数据结构：

In [2]:
# 导入项目模块
import sys

sys.path.append(".")

from core.data_models import (
    ResidueInfo,
    WaterInfo,
    AnalysisConfig,
    MethodType,
)

# 创建数据模型实例
residue = ResidueInfo(
    chain="A", resnum=1, resname="ALA", coord=np.array([1.0, 2.0, 3.0])
)

water_coords = np.array([[1.5, 2.5, 3.5], [2.0, 3.0, 4.0]])
waters = WaterInfo(water_coords)

config = AnalysisConfig(
    threshold=3.5, radius=5.0, fraction_threshold=0.20, chunk_size=5000, num_processes=1
)

print(f"残基信息: {residue}")
print(f"水分子数: {len(waters.coords)}")
print(f"配置参数: {config}")

残基信息: ResidueInfo(chain='A', resnum=1, resname='ALA', coord=array([1., 2., 3.]))
水分子数: 2
配置参数: AnalysisConfig(threshold=3.5, margin=2.0, radius=5.0, fraction_threshold=0.2, min_hits=1, small_residue_size=5, chunk_size=5000, num_processes=1, small_residues=('GLY', 'ALA', 'SER', 'THR', 'CYS', 'PRO'), sasa_threshold=10.0)


## 5. 算法实现

### 5.1 距离计算器接口

项目使用策略模式定义距离计算器抽象基类：

In [3]:
from core.distance_calculator import ChunkedDistanceCalculator

# 创建距离计算器实例
calculator = ChunkedDistanceCalculator(chunk_size=5000, num_processes=1)

# 模拟测试数据
test_residues = [
    ResidueInfo("A", 1, "ALA", np.array([1.0, 2.0, 3.0])),
    ResidueInfo("A", 2, "GLY", np.array([4.0, 5.0, 6.0])),
    ResidueInfo("A", 3, "LEU", np.array([7.0, 8.0, 9.0])),
]

test_waters = WaterInfo(
    np.array([[1.5, 2.5, 3.5], [4.5, 5.5, 6.5], [7.5, 8.5, 9.5], [2.0, 3.0, 4.0]])
)

# 计算最小距离
min_distances = calculator.compute_min_distances(test_residues, test_waters)
print(f"最小距离: {min_distances}")

# 统计半径内的水分子数量
counts = calculator.count_waters_within_radius(test_residues, test_waters, radius=3.0)
print(f"半径3Å内的水分子数: {counts}")

最小距离: [0.8660254 0.8660254 0.8660254]
半径3Å内的水分子数: [2 1 1]


### 5.2 方法工厂

使用工厂模式统一创建算法实例：

In [4]:
from algorithms.method_factory import MethodFactory

# 创建质心法实例
centroid_config = AnalysisConfig(num_processes=1)
centroid_method = MethodFactory.create_method(MethodType.CENTROID, centroid_config)

# 创建原子级方法实例
peratom_config = AnalysisConfig(num_processes=1)
peratom_method = MethodFactory.create_method(MethodType.PERATOM, peratom_config)

print(f"质心法类型: {centroid_method.get_method_type()}")
print(f"原子级方法类型: {peratom_method.get_method_type()}")

质心法类型: MethodType.CENTROID
原子级方法类型: MethodType.PERATOM


## 6. 性能优化

### 6.1 分块计算

处理大规模系统时，分块计算可显著降低内存峰值：

In [5]:
# 比较不同分块大小的性能
import time


def benchmark_chunk_sizes(residue_count=1000, water_count=10000):
    """测试不同分块大小的性能"""
    # 生成测试数据
    residues = [
        ResidueInfo("A", i, "ALA", np.random.randn(3) * 10)
        for i in range(residue_count)
    ]
    waters = WaterInfo(np.random.randn(water_count, 3) * 20)

    chunk_sizes = [100, 500, 1000, 5000, 10000]
    results = {}

    for chunk_size in chunk_sizes:
        calculator = ChunkedDistanceCalculator(chunk_size=chunk_size)
        start_time = time.perf_counter()
        distances = calculator.compute_min_distances(residues, waters)
        end_time = time.perf_counter()
        results[chunk_size] = end_time - start_time

    return results


# 运行基准测试
results = benchmark_chunk_sizes(residue_count=500, water_count=5000)
print("不同分块大小的计算时间:")
for chunk_size, elapsed in results.items():
    print(f"  chunk_size={chunk_size}: {elapsed:.4f}s")

不同分块大小的计算时间:
  chunk_size=100: 0.0498s
  chunk_size=500: 0.0520s
  chunk_size=1000: 0.0502s
  chunk_size=5000: 0.0422s
  chunk_size=10000: 0.0416s


### 6.2 KDTree空间索引

使用SciPy的KDTree加速最近邻搜索：

In [6]:
from scipy.spatial import KDTree


# 演示KDTree加速效果
def compare_search_methods(n_points=1000, n_queries=100):
    """比较暴力搜索和KDTree搜索的性能"""
    # 生成随机点
    points = np.random.randn(n_points, 3) * 10
    query_points = np.random.randn(n_queries, 3) * 10

    # 暴力搜索
    start_time = time.perf_counter()
    brute_force_results = []
    for q in query_points:
        distances = np.linalg.norm(points - q, axis=1)
        min_idx = np.argmin(distances)
        brute_force_results.append((min_idx, distances[min_idx]))
    brute_time = time.perf_counter() - start_time

    # KDTree搜索
    start_time = time.perf_counter()
    tree = KDTree(points)
    distances, indices = tree.query(query_points, k=1)
    kdtree_time = time.perf_counter() - start_time

    return brute_time, kdtree_time


brute_time, kdtree_time = compare_search_methods(n_points=1000, n_queries=100)
print(f"暴力搜索时间: {brute_time:.4f}s")
print(f"KDTree搜索时间: {kdtree_time:.4f}s")
print(f"加速比: {brute_time/kdtree_time:.2f}x")

暴力搜索时间: 0.0015s
KDTree搜索时间: 0.0002s
加速比: 6.03x


## 7. 并行化设计

### 7.1 并行化命题验证

**命题**："由于每个氨基酸与其最近的水分子距离计算不依赖于以前的氨基酸的计算过程，理论上在建好树后每个氨基酸都可以各自并行计算"

**验证**：命题成立
- 计算独立性：构建KDTree后，每个残基查询完全独立
- 只读操作：KDTree查询是线程安全的只读操作
- 无状态共享：符合"embarrassingly parallel"问题特征

### 7.2 并行化组件

In [7]:
from core.parallel import ParallelKDTreeQuery


# 演示并行查询
def demo_parallel_query():
    # 创建测试数据
    n_points = 100
    points = np.random.randn(n_points, 3) * 10
    tree = KDTree(points)

    # 查询点
    query_points = np.random.randn(50, 3) * 10
    radius = 2.0

    # 串行查询
    start_time = time.perf_counter()
    serial_results = [tree.query_ball_point(p, radius) for p in query_points]
    serial_time = time.perf_counter() - start_time

    # 并行查询（2线程）
    parallel_query = ParallelKDTreeQuery(tree, num_workers=2)
    start_time = time.perf_counter()
    parallel_results = parallel_query.query_ball_point_parallel(query_points, radius)
    parallel_time = time.perf_counter() - start_time

    # 验证结果一致性
    consistent = all(len(s) == len(p) for s, p in zip(serial_results, parallel_results))

    return serial_time, parallel_time, consistent


serial_time, parallel_time, consistent = demo_parallel_query()
print(f"串行查询时间: {serial_time:.4f}s")
print(f"并行查询时间: {parallel_time:.4f}s")
print(f"加速比: {serial_time/parallel_time:.2f}x")
print(f"结果一致性: {'✓' if consistent else '✗'}")

串行查询时间: 0.0002s
并行查询时间: 0.0012s
加速比: 0.21x
结果一致性: ✓


## 8. 基准测试

### 8.1 完整系统测试

使用项目提供的PDB文件进行端到端测试：

In [8]:
# 加载PDB文件
from io_utils.pdb_loader import PDBLoader


def analyze_pdb_file(pdb_path):
    """分析单个PDB文件"""
    loader = PDBLoader(quiet=True)
    residues, waters, structure = loader.load(pdb_path)

    print(f"文件: {pdb_path}")
    print(f"残基数: {len(residues)}")
    print(f"水分子数: {len(waters.coords) if waters.coords is not None else 0}")

    # 测试两种方法
    config = AnalysisConfig(num_processes=1)

    # 质心法
    centroid_method = MethodFactory.create_method(MethodType.CENTROID, config)
    start_time = time.perf_counter()
    centroid_results = centroid_method.analyze(residues, waters, structure)
    centroid_time = time.perf_counter() - start_time
    centroid_accessible = sum(1 for r in centroid_results if r.accessible)

    # 原子级方法
    peratom_method = MethodFactory.create_method(MethodType.PERATOM, config)
    start_time = time.perf_counter()
    peratom_results = peratom_method.analyze(residues, waters, structure)
    peratom_time = time.perf_counter() - start_time
    peratom_accessible = sum(1 for r in peratom_results if r.accessible)

    print(
        f"质心法: {centroid_time:.3f}s, 可及残基: {centroid_accessible}/{len(centroid_results)}"
    )
    print(
        f"原子级方法: {peratom_time:.3f}s, 可及残基: {peratom_accessible}/{len(peratom_results)}"
    )
    print()

    return {
        "centroid_time": centroid_time,
        "centroid_accessible": centroid_accessible,
        "peratom_time": peratom_time,
        "peratom_accessible": peratom_accessible,
    }


# 测试可用PDB文件
import os

pdb_files = []
if os.path.exists("./pdb"):
    pdb_files = [f for f in os.listdir("./pdb") if f.endswith(".pdb")]
    pdb_files = pdb_files[:3]  # 测试前3个文件

results = {}
for pdb_file in pdb_files:
    try:
        file_path = os.path.join("./pdb", pdb_file)
        results[pdb_file] = analyze_pdb_file(file_path)
    except Exception as e:
        print(f"分析文件 {pdb_file} 时出错: {e}")

文件: ./pdb\1TRZ.pdb
残基数: 102
水分子数: 123
质心法: 0.001s, 可及残基: 12/102
原子级方法: 0.002s, 可及残基: 51/102



文件: ./pdb\5LF3.pdb
残基数: 6223
水分子数: 3674


质心法: 0.429s, 可及残基: 485/6223
原子级方法: 0.540s, 可及残基: 2647/6223

文件: ./pdb\KRAS_water.pdb
残基数: 170
水分子数: 9932


质心法: 0.032s, 可及残基: 62/170
原子级方法: 0.036s, 可及残基: 139/170



### 8.2 并行化性能对比

使用项目提供的基准测试脚本：

In [9]:
# 运行基准测试脚本
!python benchmark_parallel.py --processes 1 2 --runs 2 --methods centroid


基准测试: centroid 方法
测试文件: SUMO1_water.pdb
进程数列表: [1, 2]
运行次数: 2
系统规模: 75 个残基, 5357 个水分子

1. 串行基准 (num_processes=1)
   运行 1: 0.008s, 可及残基: 33/75
   平均时间: 0.008s (±0.000s)

2. 并行测试 (num_processes=1)
     结果一致性: ✓ 全部匹配
   运行 1: 0.009s, 可及残基: 33/75
   运行 2: 0.008s
   平均时间: 0.008s (±0.001s)
   加速比: 1.02x (效率: 101.8%)

2. 并行测试 (num_processes=2)
     结果一致性: ✓ 全部匹配
   运行 1: 0.010s, 可及残基: 33/75
   运行 2: 0.009s
   平均时间: 0.009s (±0.000s)
   加速比: 0.87x (效率: 43.7%)

性能对比汇总

方法: CENTROID
系统: 75残基, 5357水分子
结果一致性: ✓

进程数      平均时间(s)      标准差        加速比        效率(%)     
--------------------------------------------------
1 (串行)   0.008        0.000      1.00       100.0     
1        0.008        0.001      1.02       101.8     
2        0.009        0.000      0.87       43.7      

扩展性分析

CENTROID 方法:
  无加速效果

详细结果已保存到: benchmark_results.txt


## 9. 扩展开发

### 9.1 添加新距离算法

实现`DistanceCalculator`抽象基类：

In [10]:
from core.distance_calculator import DistanceCalculator


class CustomDistanceCalculator(DistanceCalculator):
    """自定义距离计算器示例"""

    def __init__(self, custom_param: float = 1.0):
        self.custom_param = custom_param

    def compute_min_distances(self, residues, waters):
        """自定义最小距离计算"""
        if waters.is_empty():
            return np.full(len(residues), np.inf)

        # 示例：使用加权欧氏距离
        res_coords = np.vstack([r.coord for r in residues])
        water_coords = waters.coords

        # 计算加权距离
        n_res = len(residues)
        min_distances = np.full(n_res, np.inf)

        for i, r_coord in enumerate(res_coords):
            distances = np.linalg.norm(water_coords - r_coord, axis=1)
            weighted_distances = distances * self.custom_param  # 应用自定义权重
            min_distances[i] = np.min(weighted_distances)

        return min_distances

    def count_waters_within_radius(self, residues, waters, radius):
        """自定义半径内水分子统计"""
        if waters.is_empty():
            return np.zeros(len(residues), dtype=int)

        from scipy.spatial import KDTree

        res_coords = np.vstack([r.coord for r in residues])
        water_tree = KDTree(waters.coords)

        counts = np.zeros(len(residues), dtype=int)
        for i, coord in enumerate(res_coords):
            indices = water_tree.query_ball_point(coord, radius)
            counts[i] = len(indices)

        return counts


# 测试自定义计算器
custom_calc = CustomDistanceCalculator(custom_param=1.5)
print(f"自定义计算器创建成功: {custom_calc}")

自定义计算器创建成功: <__main__.CustomDistanceCalculator object at 0x000001EB4B1A34D0>


### 9.2 添加新评估规则

实现`AccessibilityEvaluator`抽象基类：

In [11]:
from core.accessibility_evaluator import AccessibilityEvaluator
from core.data_models import AccessibilityResult, MethodType


class CustomEvaluator(AccessibilityEvaluator):
    """自定义可及性评估器示例"""

    def __init__(self, method: MethodType = MethodType.CENTROID):
        """初始化自定义评估器

        Args:
            method: 评估器对应的方法类型，默认为质心法
        """
        self.method = method

    def evaluate(self, residues, min_distances, water_counts, config):
        """自定义评估逻辑"""
        results = []

        for i, residue in enumerate(residues):
            min_dist = min_distances[i]
            water_count = water_counts[i]

            # 自定义规则：结合距离和水分子数量
            if min_dist < config.threshold and water_count > 2:
                accessible = True
            else:
                accessible = False

            result = AccessibilityResult(
                residue=residue,
                min_distance=min_dist,
                water_count=water_count,
                accessible=accessible,
                method=self.method,  # 使用初始化时设置的方法类型
            )
            results.append(result)

        return results


# 测试自定义评估器
custom_eval = CustomEvaluator(method=MethodType.CENTROID)
print(f"自定义评估器创建成功: {custom_eval}")
print(f"评估器方法类型: {custom_eval.method}")

自定义评估器创建成功: <__main__.CustomEvaluator object at 0x000001EB4B1A3620>
评估器方法类型: MethodType.CENTROID


## 总结

本教程详细讲解了溶剂可及性分析工具包的：

1. **核心概念**：蛋白质溶剂可及性的物理意义和计算方法
2. **数据模型**：使用Python dataclass定义的核心数据结构
3. **算法实现**：质心法和原子级方法的策略模式设计
4. **性能优化**：分块计算、KDTree索引、向量化操作
5. **并行化设计**：基于Python 3.14自由线程特性的并行组件
6. **基准测试**：算法性能对比和并行化效果分析
7. **扩展开发**：如何添加新算法和评估规则

### 下一步

- 尝试分析自己的PDB文件
- 实现自定义距离算法或评估规则
- 优化性能参数（chunk_size, num_processes）
- 探索GPU加速的可能性（本质上就是超级并行，但是需要先分析计算精度）
- 若原始数据精度确定或精度过高，可以实现在读取时就通过乘系数将float计算变为int计算

### 参考资源

- [BioPython文档](https://biopython.org/)
- [FreeSASA文档](https://freesasa.github.io/)
- [SciPy空间算法](https://docs.scipy.org/doc/scipy/reference/spatial.html)