In [8]:
import os
import yaml
import numpy as np
import requests
from scipy.spatial import distance
from bioblocks.io import read_model, write_model
from bioblocks.transform import rename_chains
from anarci import anarci
from Bio import pairwise2
from Bio.pairwise2 import format_alignment
from Bio.PDB import PDBParser

# ========== 1. Load configuration ==========
with open("config.yaml", "r") as config_handle:
    config = yaml.safe_load(config_handle)

# Parameters shared by every step of the notebook.
common_cfg = config["common"]
scheme = common_cfg["scheme"]
similarity_threshold = common_cfg["similarity_threshold"]
ncpu = common_cfg["ncpu"]

# Settings of the chain-renaming step.
rename_cfg = config["rename"]
PDB_NAME, PDB_DIR, CIF_DIR = (
    rename_cfg["PDB_NAME"],
    rename_cfg["PDB_DIR"],
    rename_cfg["CIF_DIR"],
)
PDB_FILE = os.path.join(PDB_DIR, PDB_NAME)
PDB_OUTPUT_DIR = PDB_DIR

# By default the first .cif file (sorted by name, extension matched
# case-insensitively) in CIF_DIR serves as the AF3 reference; None when the
# directory contains no such file.
cif_files = sorted(
    entry for entry in os.listdir(CIF_DIR) if entry.lower().endswith(".cif")
)
if cif_files:
    AF3_example_cif = os.path.join(CIF_DIR, cif_files[0])
else:
    AF3_example_cif = None

In [2]:
def fetch_pdb_biounit(pdb_id, assembly_id=1, output_file=None, timeout=30):
    """Download a biological assembly of a PDB entry from RCSB and save it.

    Args:
        pdb_id: PDB identifier; it is lower-cased before building the URL.
        assembly_id: Assembly number appended after ``.pdb`` in the download URL.
        output_file: Destination path. Defaults to
            ``"{pdb_id}_pdb{assembly_id}.pdb"`` in the current directory.
        timeout: Seconds handed to ``requests.get`` so that a stalled
            connection fails instead of blocking the notebook forever.

    Returns:
        The path the downloaded content was written to.

    Raises:
        ValueError: If the server answers with an HTTP status other than 200.
    """
    pdb_id = pdb_id.lower()
    url = f"https://files.rcsb.org/download/{pdb_id}.pdb{assembly_id}"
    if output_file is None:
        output_file = f"{pdb_id}_pdb{assembly_id}.pdb"
    print(f"⬇️  Downloading biological assembly {assembly_id} for {pdb_id} ...")
    r = requests.get(url, timeout=timeout)
    if r.status_code != 200:
        raise ValueError(f"❌ 无法下载 {pdb_id} assembly {assembly_id} (HTTP {r.status_code})")
    # Create the destination directory on demand so a fresh checkout without
    # the data folder does not fail after the download already succeeded.
    parent_dir = os.path.dirname(output_file)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    with open(output_file, "wb") as f:
        f.write(r.content)
    print(f"✅ Saved -> {output_file}")
    return output_file

In [3]:
def classify_chain(seq, chain_id=None, scheme="kabat", ncpu=1):
    """Classify a sequence as antibody heavy chain, light chain, or neither via ANARCI.

    Args:
        seq: Amino-acid sequence to classify.
        chain_id: Optional name given to the sequence in the ANARCI query;
            ``"seq"`` is used when it is empty or None.
        scheme: Numbering scheme forwarded to ``anarci``.
        ncpu: Number of CPUs forwarded to ``anarci``.

    Returns:
        ``"H"`` for a heavy chain, ``"L"`` for a kappa or lambda light chain,
        and the string ``"None"`` for everything else (no domain found, or a
        domain of another chain type).
    """
    name = chain_id if chain_id else "seq"
    # anarci() returns (numbering, alignment_details, hit_tables); the
    # per-domain dicts carrying "chain_type" live in the second element.
    _numbering, alignment_details, _hit_tables = anarci([(name, seq)],
                                                        scheme=scheme,
                                                        ncpu=ncpu,
                                                        output=False)
    if not alignment_details or alignment_details[0] is None or len(alignment_details[0]) == 0:
        return "None"
    # Only the first detected domain decides the label.
    chain_type = alignment_details[0][0]["chain_type"]
    if chain_type == "H":
        return "H"
    if chain_type in ("K", "L"):
        return "L"
    # Any other chain type is not an antibody chain. Return the same "None"
    # label as the no-hit case rather than echoing chain_id, which is Python
    # None by default and would not match the "None" label used downstream.
    return "None"

In [4]:
def sequence_similarity(seq1, seq2):
    """Compute the identity between two sequences from a global alignment.

    Both inputs are stripped and upper-cased, then aligned with
    ``pairwise2.align.globalxx``. Identity is the number of identical aligned
    columns divided by the length of the longer input sequence.

    Args:
        seq1: First sequence.
        seq2: Second sequence.

    Returns:
        A dict with ``"identity"`` (float) and ``"alignment"`` (the alignment
        rendered by ``format_alignment``). When either sequence is empty after
        stripping, identity is ``0.0`` and the alignment is an empty string.
    """
    seq1, seq2 = seq1.strip().upper(), seq2.strip().upper()
    # Nothing can be aligned against an empty sequence; report zero identity
    # instead of failing on an empty alignment list or a zero denominator.
    if not seq1 or not seq2:
        return {"identity": 0.0, "alignment": ""}
    # Only the first alignment is used, so do not enumerate co-optimal ones.
    # Every optimal globalxx alignment has the same number of matching columns,
    # which keeps the identity value independent of this choice.
    alignments = pairwise2.align.globalxx(seq1, seq2, one_alignment_only=True)
    best_alignment = alignments[0]
    aln_seq1, aln_seq2 = best_alignment[0], best_alignment[1]
    matches = sum(a == b for a, b in zip(aln_seq1, aln_seq2))
    identity = matches / max(len(seq1), len(seq2))
    return {
        "identity": identity,
        "alignment": format_alignment(*best_alignment)
    }


In [5]:
def cluster_complexes(structure, antibody_chains,
                      pair_distance_cutoff=25.0,
                      contact_cutoff=5.0):
    """Group heavy/light chain pairs with the antigen chains they contact.

    Only the first model of ``structure`` (``structure[0]``) is inspected.

    Args:
        structure: Object whose first element exposes ``get_chains()``; each
            chain has an ``id`` and ``get_atoms()`` yielding atoms with a
            ``coord`` attribute.
        antibody_chains: Iterable of ``(chain_id, label)`` pairs where the
            label is ``"H"``, ``"L"`` or ``"None"`` (antigen candidate).
        pair_distance_cutoff: Maximum distance between the centroids of a
            heavy and a light chain for them to be paired, in the units of the
            atom coordinates.
        contact_cutoff: Maximum atom-to-atom distance between the paired
            chains and an antigen chain for the antigen to join the complex.

    Returns:
        A list with one set of chain ids per heavy/light pair: the two paired
        chains plus every contacting antigen chain. Empty when there is no
        heavy, no light, or no antigen chain among the labels. Chain ids that
        are absent from the first model or have no atoms are ignored.
    """
    model = structure[0]

    heavy_ids = [cid for cid, label in antibody_chains if label == "H"]
    light_ids = [cid for cid, label in antibody_chains if label == "L"]
    antigen_ids = [cid for cid, label in antibody_chains if label == "None"]

    if not heavy_ids or not light_ids or not antigen_ids:
        return []

    # Extract coordinates once per chain; they are reused for the centroids
    # and for every heavy/light/antigen distance computation below.
    chain_coords = {}
    for chain in model.get_chains():
        coords = np.array([atom.coord for atom in chain.get_atoms()])
        if coords.size:
            chain_coords[chain.id] = coords
    centroids = {cid: coords.mean(axis=0) for cid, coords in chain_coords.items()}

    complexes = []
    for heavy in heavy_ids:
        for light in light_ids:
            if heavy not in centroids or light not in centroids:
                continue
            centroid_gap = np.linalg.norm(centroids[heavy] - centroids[light])
            if centroid_gap <= pair_distance_cutoff:
                paired_coords = np.vstack([chain_coords[heavy], chain_coords[light]])
                # Labelled chains missing from this model (or without atoms)
                # are skipped rather than raising KeyError.
                contacts = [
                    antigen
                    for antigen in antigen_ids
                    if antigen in chain_coords
                    and np.min(distance.cdist(paired_coords, chain_coords[antigen])) <= contact_cutoff
                ]
                complexes.append(set([heavy, light] + contacts))
    return complexes

In [9]:
# The PDB identifier is the part of the configured file name before the first dot.
pdb_id = PDB_NAME.split(".")[0]
# Download the biological assembly only when it is not already on disk.
if not os.path.exists(PDB_FILE):
    fetch_pdb_biounit(pdb_id, output_file=PDB_FILE)

model = read_model(PDB_FILE)

# AF3_example_cif is None when CIF_DIR contains no .cif file. Stop with an
# explicit message instead of handing None to read_model.
if AF3_example_cif is None:
    raise FileNotFoundError(
        f"No .cif file found in {CIF_DIR!r}; an AF3 reference model is required."
    )
af3_model = read_model(AF3_example_cif)

⬇️  Downloading biological assembly 1 for 6hxw ...
✅ Saved -> /home/yuyang/lb_yaml/data/CD73/6hxw.pdb


In [10]:
# Label every chain of the loaded model with classify_chain, keeping the
# result both as an ordered list of (chain id, label) pairs and as a dict.
antibody_chains = [
    (chain.id, classify_chain(model[chain.id].sequence, scheme=scheme, ncpu=ncpu))
    for chain in model.get_chains()
]
mapping_dict = dict(antibody_chains)

In [11]:
# Antigen-chain handling: every chain not labelled "H" or "L" is a candidate.
antigen_chain_ids = [
    chain_id
    for chain_id, label in mapping_dict.items()
    if label != "H" and label != "L"
]

# Re-read the same file with Bio.PDB to get atom coordinates for clustering.
parser = PDBParser(QUIET=True)
structure = parser.get_structure("model", PDB_FILE)
complexes = cluster_complexes(structure, antibody_chains)

# Chain "A" of the AF3 model supplies the reference antigen sequence; the
# per-chain similarity results are collected in a dict filled later.
target_antigen_seq = af3_model["A"].sequence
similarity_results = dict()

In [12]:
# Show the label assigned to each chain by the classification step.
mapping_dict

{'A': 'None', 'B': 'None', 'C': 'H', 'D': 'L', 'H': 'H', 'L': 'L'}

In [13]:
# Compare every antigen candidate with the AF3 reference antigen sequence.
for antigen_chain_id in antigen_chain_ids:
    # "X" characters are removed from the candidate sequence before aligning.
    potential_seq = model[antigen_chain_id].sequence.replace("X", "")
    result = sequence_similarity(target_antigen_seq, potential_seq)
    if not result["identity"] > similarity_threshold:
        continue
    # Candidates above the threshold are relabelled "A"; their alignment
    # result is kept so the best one can be chosen afterwards.
    similarity_results[antigen_chain_id] = result
    mapping_dict[antigen_chain_id] = "A"

In [14]:
# Show the mapping after chains similar to the reference antigen were relabelled "A".
mapping_dict

{'A': 'A', 'B': 'A', 'C': 'H', 'D': 'L', 'H': 'H', 'L': 'L'}

In [15]:

if similarity_results:
    # The candidate with the highest identity to the reference antigen wins.
    best_chain_id = max(
        similarity_results,
        key=lambda chain_id: similarity_results[chain_id]["identity"],
    )
    mapping_dict[best_chain_id] = "A"

    # Take the first clustered complex that contains the winning chain.
    selected_complex = None
    for candidate_complex in complexes:
        if best_chain_id in candidate_complex:
            selected_complex = candidate_complex
            break

    if selected_complex:
        # Restrict the mapping to the members of that complex.
        # NOTE(review): other antigen chains of the same complex keep their
        # current label ("A" or "None"), and the mapping is left unrestricted
        # when no complex contains the best chain - confirm both are intended.
        restricted_mapping = {}
        for member_id in selected_complex:
            if member_id not in mapping_dict:
                continue
            if member_id == best_chain_id:
                restricted_mapping[member_id] = "A"
            else:
                restricted_mapping[member_id] = mapping_dict.get(member_id)
        mapping_dict = restricted_mapping

In [16]:
# Show the final mapping after it was restricted to the selected complex.
mapping_dict

{'H': 'H', 'A': 'A', 'L': 'L'}

In [17]:
# Apply the chain-id mapping to the loaded model.
# NOTE(review): when no candidate passed the similarity threshold, mapping_dict
# still carries "None" labels for the non-antibody chains - confirm that
# rename_chains handles or rejects such values.
new_model = rename_chains(model=model, rename_mapping=mapping_dict)


In [18]:
# Write the renamed model to PDB_FILE, the same path the model was read from.
# NOTE(review): this replaces the input file, and the download is skipped when
# PDB_FILE already exists, so a re-run starts from the renamed structure.
# PDB_OUTPUT_DIR is defined in the config cell but not used here - confirm
# whether the output was meant to go to a separate location.
write_model(new_model, PDB_FILE)