In [1]:
from Bio.PDB import MMCIFParser
from Bio.PDB.PDBExceptions import PDBConstructionWarning
import warnings
import os

In [1]:
from Bio.PDB.MMCIF2Dict import MMCIF2Dict

def extract_link_info_from_mmcif(cif_file):
    """
    使用 MMCIF2Dict 从 mmCIF 文件中提取 LINK (struct_conn) 信息。

    Args:
        cif_file (str): mmCIF 文件的路径。

    Returns:
        list: 包含连接信息的字典列表。
    """
    mmcif_dict = MMCIF2Dict(cif_file)
    
    link_info = []
    
    # 检查文件中是否存在 _struct_conn 信息
    if '_struct_conn.id' not in mmcif_dict:
        print("文件中未找到 LINK (struct_conn) 信息。")
        return link_info

    num_connections = len(mmcif_dict['_struct_conn.id'])
    
    for i in range(num_connections):
        conn_info = {
            'id': mmcif_dict['_struct_conn.id'][i],
            'type': mmcif_dict.get('_struct_conn.conn_type_id', ['n/a'])[i],
            'partner1': {
                'chain': mmcif_dict.get('_struct_conn.ptnr1_label_asym_id', ['n/a'])[i],
                'residue_name': mmcif_dict.get('_struct_conn.ptnr1_label_comp_id', ['n/a'])[i],
                'residue_seq': mmcif_dict.get('_struct_conn.ptnr1_label_seq_id', ['n/a'])[i],
                'atom': mmcif_dict.get('_struct_conn.ptnr1_label_atom_id', ['n/a'])[i]
            },
            'partner2': {
                'chain': mmcif_dict.get('_struct_conn.ptnr2_label_asym_id', ['n/a'])[i],
                'residue_name': mmcif_dict.get('_struct_conn.ptnr2_label_comp_id', ['n/a'])[i],
                'residue_seq': mmcif_dict.get('_struct_conn.ptnr2_label_seq_id', ['n/a'])[i],
                'atom': mmcif_dict.get('_struct_conn.ptnr2_label_atom_id', ['n/a'])[i]
            }
        }
        link_info.append(conn_info)
        
    return link_info

# --- 示例使用 ---
# 假设你有一个名为 "1a2b.cif" 的文件
# 你可以从 RCSB PDB 数据库下载任何包含非标准连接的结构文件进行测试，例如 4HHB
# from Bio.PDB import PDBList
# pdbl = PDBList()
# pdbl.retrieve_pdb_file('4HHB', pdir='.', file_format='mmCif')

try:
    cif_file_path = '3a55.cif' # 将文件名替换为你的文件
    links = extract_link_info_from_mmcif(cif_file_path)

    for link in links:
        print(f"ID: {link['id']}, 类型: {link['type']}")
        p1 = link['partner1']
        p2 = link['partner2']
        print(f"  伙伴1: 链 {p1['chain']}, 残基 {p1['residue_name']} {p1['residue_seq']}, 原子 {p1['atom']}")
        print(f"  伙伴2: 链 {p2['chain']}, 残基 {p2['residue_name']} {p2['residue_seq']}, 原子 {p2['atom']}")
        print("-" * 20)

except FileNotFoundError:
    print("请确保 CIF 文件存在于正确的路径。")

In [None]:
import os
import argparse
import csv
import re
import math
from multiprocessing import Pool, cpu_count
from Bio.PDB.MMCIF2Dict import MMCIF2Dict
from tqdm import tqdm
from datetime import datetime

# --- 全局常量定义 ---
STANDARD_AMINO_ACIDS = {
    'ALA', 'ARG', 'ASN', 'ASP', 'CYS', 'GLU', 'GLN', 'GLY', 'HIS', 'ILE',
    'LEU', 'LYS', 'MET', 'PHE', 'PRO', 'SER', 'THR', 'TRW', 'TYR', 'VAL'
}
MAIN_CHAIN_ATOMS = {'N', 'CA', 'C', 'O', 'OXT'}
BOND_LENGTH_THRESHOLD = 5.0

def build_coordinate_lookup(mmcif_dict):
    """
    预扫描mmcif字典，构建一个原子坐标的速查字典。
    键: (链ID, 残基号, 插入码, 原子名) -> 值: (x, y, z)
    """
    lookup = {}
    keys = [
        '_atom_site.auth_asym_id', '_atom_site.auth_seq_id', 
        '_atom_site.pdbx_PDB_ins_code', '_atom_site.label_atom_id',
        '_atom_site.Cartn_x', '_atom_site.Cartn_y', '_atom_site.Cartn_z'
    ]
    
    # 检查所有必需的键是否存在
    if not all(key in mmcif_dict for key in keys):
        return None

    chain_ids, res_nums, ins_codes, atom_names, xs, ys, zs = [mmcif_dict.get(k, []) for k in keys]
    
    for i in range(len(chain_ids)):
        try:
            # 使用更完整的键，包含插入码(ins_code)
            key = (chain_ids[i], res_nums[i], ins_codes[i], atom_names[i])
            coords = (float(xs[i]), float(ys[i]), float(zs[i]))
            lookup[key] = coords
        except (ValueError, IndexError):
            continue
    return lookup

def calculate_distance(coords1, coords2):
    """计算两个三维坐标点之间的欧氏距离。"""
    return math.sqrt(sum([(a - b) ** 2 for a, b in zip(coords1, coords2)]))

def analyze_cif_with_length(file_path):
    """最终版分析逻辑：计算键长并根据阈值过滤。"""
    file_name = os.path.basename(file_path)
    found_bonds = []

    try:
        mmcif_dict = MMCIF2Dict(file_path)
        coord_lookup = build_coordinate_lookup(mmcif_dict)
        if coord_lookup is None:
            return file_name, []
            
        conn_ids = mmcif_dict.get('_struct_conn.id', [])
        if not conn_ids:
            return file_name, []

        conn_data = {k: mmcif_dict.get(k, []) for k in [
            '_struct_conn.conn_type_id',
            '_struct_conn.ptnr1_auth_asym_id', '_struct_conn.ptnr1_auth_comp_id', '_struct_conn.ptnr1_auth_seq_id', '_struct_conn.pdbx_ptnr1_PDB_ins_code', '_struct_conn.ptnr1_label_atom_id',
            '_struct_conn.ptnr2_auth_asym_id', '_struct_conn.ptnr2_auth_comp_id', '_struct_conn.ptnr2_auth_seq_id', '_struct_conn.pdbx_ptnr2_PDB_ins_code', '_struct_conn.ptnr2_label_atom_id'
        ]}

        for i in range(len(conn_ids)):
            key1 = (conn_data['_struct_conn.ptnr1_auth_asym_id'][i], conn_data['_struct_conn.ptnr1_auth_seq_id'][i], conn_data['_struct_conn.pdbx_ptnr1_PDB_ins_code'][i], conn_data['_struct_conn.ptnr1_label_atom_id'][i])
            key2 = (conn_data['_struct_conn.ptnr2_auth_asym_id'][i], conn_data['_struct_conn.ptnr2_auth_seq_id'][i], conn_data['_struct_conn.pdbx_ptnr2_PDB_ins_code'][i], conn_data['_struct_conn.ptnr2_label_atom_id'][i])
            
            coords1 = coord_lookup.get(key1)
            coords2 = coord_lookup.get(key2)

            if not coords1 or not coords2:
                continue

            distance = calculate_distance(coords1, coords2)
            if distance > BOND_LENGTH_THRESHOLD:
                continue

            bond_dict = {
                "p1_chain": key1[0], "p1_res_name": conn_data['_struct_conn.ptnr1_auth_comp_id'][i], "p1_res_num": key1[1], "p1_atom_name": key1[3],
                "p2_chain": key2[0], "p2_res_name": conn_data['_struct_conn.ptnr2_auth_comp_id'][i], "p2_res_num": key2[1], "p2_atom_name": key2[3],
                "distance": round(distance, 2)
            }
            
            conn_type = conn_data['_struct_conn.conn_type_id'][i]
            bond_type = None

            if conn_type == 'disulf':
                bond_type = 'disulfide'
                if bond_dict["p1_res_name"] not in STANDARD_AMINO_ACIDS or bond_dict["p2_res_name"] not in STANDARD_AMINO_ACIDS:
                    continue
            elif conn_type == 'covale':
                if bond_dict["p1_res_name"] not in STANDARD_AMINO_ACIDS or bond_dict["p2_res_name"] not in STANDARD_AMINO_ACIDS:
                    continue
                is_p1_main = bond_dict["p1_atom_name"] in MAIN_CHAIN_ATOMS
                is_p2_main = bond_dict["p2_atom_name"] in MAIN_CHAIN_ATOMS

                # 只要不是纯主链连接，就视为异肽键
                if not (is_p1_main and is_p2_main):
                    bond_type = 'isopeptide'
            
            if bond_type:
                bond_dict['bond_type'] = bond_type
                found_bonds.append(bond_dict)
    
    except Exception:
        return file_name, []
    
    return file_name, found_bonds


def run_analysis_and_generate_outputs(directory, output_basename):
    """主函数：执行带键长分析的完整流程。"""
    cif_files = [os.path.join(directory, f) for f in os.listdir(directory) if f.endswith('.cif')]
    if not cif_files:
        print(f"目录 '{directory}' 中无 .cif 文件。")
        return
    
    report_txt_path = output_basename + "_report.txt"
    data_csv_path = output_basename + "_data.csv"

    num_processes = cpu_count()
    print(f"将使用 {num_processes} 个核心进行最终分析 (键长阈值: {BOND_LENGTH_THRESHOLD} Å)...")

    all_results = []
    with Pool(processes=num_processes) as pool:
        with tqdm(total=len(cif_files), desc="分析文件(含键长)") as pbar:
            for file_name, bonds in pool.imap_unordered(analyze_cif_with_length, cif_files):
                if bonds:
                    all_results.append((file_name, bonds))
                pbar.update(1)

    # --- 1. 写入机器可读的 .csv 文件 (新增 distance 列) ---
    csv_fieldnames = [
        'file_name', 'bond_type', 'distance',
        'p1_chain', 'p1_res_name', 'p1_res_num', 'p1_atom_name',
        'p2_chain', 'p2_res_name', 'p2_res_num', 'p2_atom_name'
    ]
    try:
        with open(data_csv_path, 'w', newline='', encoding='utf-8') as f_csv:
            writer = csv.DictWriter(f_csv, fieldnames=csv_fieldnames)
            writer.writeheader()
            for file_name, bonds in sorted(all_results):
                for bond in bonds:
                    row = bond.copy()
                    row['file_name'] = file_name
                    writer.writerow(row)
        print(f"\n[成功] 机器可读数据已写入: {data_csv_path}")
    except IOError as e:
        print(f"\n[错误] 无法写入CSV文件: {e}")

    # --- 2. 生成并写入人类可读的 .txt 报告 (完整版) ---
    report_lines = []
    total_files = len(cif_files)
    
    stats = {'isopeptide': set(), 'disulfide': set()}
    bond_counts = {'isopeptide': 0, 'disulfide': 0}

    for file_name, bonds in all_results:
        for bond in bonds:
            b_type = bond['bond_type']
            if b_type in stats:
                stats[b_type].add(file_name)
                bond_counts[b_type] += 1
            
    report_lines.append("="*60)
    report_lines.append(f"--- 分析报告 (键长阈值: <={BOND_LENGTH_THRESHOLD} Å) ---")
    report_lines.append(f"分析时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    report_lines.append(f"分析目录: {os.path.abspath(directory)}")
    report_lines.append(f"总计分析文件数: {total_files}")
    report_lines.append("="*60)

    report_lines.append("\n--- 分析摘要 ---")
    report_lines.append(f"异肽键: {bond_counts['isopeptide']} 个 (分布于 {len(stats['isopeptide'])} 个蛋白质)")
    report_lines.append(f"二硫键: {bond_counts['disulfide']} 个 (分布于 {len(stats['disulfide'])} 个蛋白质)")
    
    report_lines.append("\n--- 详细信息 ---")
    if not all_results:
        report_lines.append("未在任何文件中检测到符合条件的化学键。")
    else:
        for file_name, bonds in sorted(all_results):
            report_lines.append(f"\n[文件: {file_name}]")
            bonds_by_type = {'isopeptide': [], 'disulfide': []}
            for bond in bonds:
                if bond['bond_type'] in bonds_by_type:
                    bonds_by_type[bond['bond_type']].append(bond)

            for b_type, b_list in bonds_by_type.items():
                if b_list:
                    type_map = {'isopeptide': '异肽键', 'disulfide': '二硫键'}
                    report_lines.append(f"  - {type_map[b_type]} ({len(b_list)} 个):")
                    for i, bond in enumerate(b_list):
                        p1 = f"{bond['p1_chain']}:{bond['p1_res_name']}{bond['p1_res_num']}:{bond['p1_atom_name']}"
                        p2 = f"{bond['p2_chain']}:{bond['p2_res_name']}{bond['p2_res_num']}:{bond['p2_atom_name']}"
                        report_lines.append(f"    {i+1}. {p1} <--> {p2} (键长: {bond['distance']:.2f} Å)")
    
    report_lines.append("\n" + "="*60)

    try:
        with open(report_txt_path, 'w', encoding='utf-8') as f_txt:
            f_txt.write("\n".join(report_lines))
        print(f"[成功] 人类可读报告已写入: {report_txt_path}")
    except IOError as e:
        print(f"[错误] 无法写入报告文件: {e}")
        
    # --- 3. 在控制台打印报告预览 ---
    print("\n--- 控制台报告预览 ---")
    print("\n".join(report_lines))
    print("\n分析流程全部完成！")

if __name__ == '__main__':
    dir = '/home/fit/lulei/WORK/xjt/Protein_design/CyclicPeptide/Dataset/ALL_MMCIF/CIF_ALL_DATASET'
    log_file = '/home/fit/lulei/WORK/xjt/Protein_design/CyclicPeptide/Dataset/ALL_MMCIF/link'

    run_analysis_and_generate_outputs(dir, log_file)




将使用 104 个核心进行最终分析 (键长阈值: 5.0 Å)...


分析文件(含键长):   0%|          | 92/195048 [00:16<9:34:56,  5.65it/s] 



KeyboardInterrupt: 

In [10]:
import csv
from collections import defaultdict
STANDARD_AMINO_ACIDS = {
    'ALA', 'ARG', 'ASN', 'ASP', 'CYS', 'GLU', 'GLN', 'GLY', 'HIS', 'ILE',
    'LEU', 'LYS', 'MET', 'PHE', 'PRO', 'SER', 'THR', 'TRW', 'TYR', 'VAL'
}
result_file = '/home/fit/lulei/WORK/xjt/Protein_design/RFdiffusion/mytest/config/inference/link.csv'
def analyze_atom_pairings(file_path):
    """
    读取CSV文件，分析每种氨基酸对(residue pair)中，出现次数最多的原子对(atom pair)及其平均距离。
    为消除冗余，每个文件内同类型的连接只统计一次。
    """

    item_set = set()

    with open(cleaned_chain_dir, 'r', encoding='utf-8') as f:
        # 逐行读取文件内容
        for line in f:
            # 1. line.strip() - 去除行首和行尾的空白字符（如换行符 \n）
            # 2. .split('\t') - 使用制表符 (\t) 分割字符串，得到一个包含各列元素的列表
            columns = line.strip().split('\t')
            
            # 遍历分割后的所有元素
            for item in columns:
                # 如果元素非空，则将其添加到集合中
                if item:
                    item_set.add(item)

    # 结构: { ('res1', 'res2'): {('atom1', 'atom2'): {'count': N, 'total_distance': X}} }
    pair_data = defaultdict(lambda: defaultdict(lambda: {'count': 0, 'total_distance': 0.0}))
    
    # 用于跟踪每个文件内已处理的连接，避免重复计数
    # 结构: { 'file_name': {('res1', 'res2', 'atom1', 'atom2')} }
    processed_links_in_file = defaultdict(set)

    try:
        with open(file_path, 'r', newline='', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            for row in reader:
                file_name = row['file_name']
                num1, num2 = row['p1_res_num'], row['p2_res_num']
                res1, res2 = row['p1_res_name'], row['p2_res_name']
                atom1, atom2 = row['p1_atom_name'], row['p2_atom_name']
                distance = float(row['distance'])
                pdb_id = row['file_name'].split('.')[0]
                chain1, chain2 = row['p1_chain'], row['p2_chain']
                # if f"{pdb_id}_{chain1}" not in item_set and f"{pdb_id}_{chain2}" not in item_set:
                #     continue
                if (res1,num1,atom1) == (res2,num2,atom2):
                    continue
                if res1 not in STANDARD_AMINO_ACIDS or res2 not in STANDARD_AMINO_ACIDS:
                    continue
                
                # 创建规范的氨基酸对和原子对（排序以保证唯一性）
                res_pair_sorted = tuple(sorted((res1, res2)))
                if res1 == res_pair_sorted[0] and res2 == res_pair_sorted[1]:
                    atom_pair_sorted = (atom1, atom2)
                    num_pair_sorted = (num1, num2)
                else:
                    atom_pair_sorted = (atom2, atom1)
                    num_pair_sorted = (num2, num1)
                
                # 创建一个唯一的连接标识符
                link_signature = (num_pair_sorted[0],num_pair_sorted[1],res_pair_sorted[0], res_pair_sorted[1], atom_pair_sorted[0], atom_pair_sorted[1])

                # 如果此连接类型已在该文件中处理过，则跳过
                if link_signature in processed_links_in_file[file_name]:
                    continue
                
                # 标记此连接类型为已处理
                processed_links_in_file[file_name].add(link_signature)

                pair_data[res_pair_sorted][atom_pair_sorted]['count'] += 1
                pair_data[res_pair_sorted][atom_pair_sorted]['total_distance'] += distance

    except FileNotFoundError:
        print(f"错误: 文件未找到 '{file_path}'")
        return
    except KeyError as e:
        print(f"错误: CSV文件中缺少必需的列: {e}")
        return
    except ValueError:
        print(f"错误: CSV文件中的 'distance' 列包含非数值。")
        return

    print("--- 每种氨基酸对的最常见原子配对方式 (已去重) ---")
    

    # 写入 result_file 结果文件
    try:
        with open(result_file, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            writer.writerow(['res1', 'res2', 'atom1', 'atom2', 'avg_distance','count'])
            
            sorted_res_pairs = sorted(pair_data.items())
            
            for res_pair, atom_pair_details in sorted_res_pairs:
                first_res_pair =True
                if not atom_pair_details:
                    continue

                # 根据 'count' 键查找最常见的原子对
                #most_common_atom_pair = max(atom_pair_details, key=lambda k: atom_pair_details[k]['count'])
                for key,val in atom_pair_details.items():
                    #data = atom_pair_details[most_common_atom_pair]
                    data = val
                    count = data['count']
                    total_distance = data['total_distance']
                    
                    # 过滤掉出现次数过少的结果
                    if count <= 3:
                        continue
                    if not first_res_pair and 'C' not in key and 'N' not in key:
                        continue

                    avg_distance = total_distance / count
                    
                    # 写入文件
                    writer.writerow([
                        res_pair[0], 
                        res_pair[1], 
                        key[0], 
                        key[1], 
                        f"{avg_distance:.2f}",
                        count,
                    ])
                    first_res_pair = False
                    # 控制台输出
                    total_occurrences = sum(d['count'] for d in atom_pair_details.values())
                    print(f"\n氨基酸对: {res_pair[0]} - {res_pair[1]} (总出现次数: {total_occurrences})")
                    print(f"  -> 最常见的原子配对: {res_pair[0]}[{key[0]}] - {res_pair[1]}[{key[1]}] (出现 {count} 次, 平均距离: {avg_distance:.2f} Å)")
        
        print(f"\n[成功] 分析结果已写入: {result_file}")

    except IOError as e:
        print(f"\n[错误] 无法写入结果文件: {e}")
    
    return pair_data
# 调用函数
file_path = '/home/fit/lulei/WORK/xjt/Protein_design/CyclicPeptide/Dataset/ALL_MMCIF/link3_data.csv'
cleaned_chain_dir = '/home/fit/lulei/WORK/xjt/Protein_design/CyclicPeptide/Dataset/ALL_MMCIF/tmp/cluster.tsv'
pair_res = analyze_atom_pairings(file_path)


--- 每种氨基酸对的最常见原子配对方式 (已去重) ---

氨基酸对: ALA - SER (总出现次数: 15)
  -> 最常见的原子配对: ALA[C] - SER[OG] (出现 15 次, 平均距离: 1.41 Å)

氨基酸对: ARG - ASP (总出现次数: 13)
  -> 最常见的原子配对: ARG[N] - ASP[CG] (出现 4 次, 平均距离: 1.34 Å)

氨基酸对: ARG - GLU (总出现次数: 24)
  -> 最常见的原子配对: ARG[NH2] - GLU[OE1] (出现 6 次, 平均距离: 1.32 Å)

氨基酸对: ASN - LYS (总出现次数: 131)
  -> 最常见的原子配对: ASN[CG] - LYS[NZ] (出现 124 次, 平均距离: 1.37 Å)

氨基酸对: ASN - SER (总出现次数: 16)
  -> 最常见的原子配对: ASN[CG] - SER[OG] (出现 16 次, 平均距离: 1.44 Å)

氨基酸对: ASP - CYS (总出现次数: 22)
  -> 最常见的原子配对: ASP[CB] - CYS[SG] (出现 8 次, 平均距离: 1.79 Å)

氨基酸对: ASP - CYS (总出现次数: 22)
  -> 最常见的原子配对: ASP[C] - CYS[SG] (出现 5 次, 平均距离: 1.89 Å)

氨基酸对: ASP - GLY (总出现次数: 12)
  -> 最常见的原子配对: ASP[CG] - GLY[N] (出现 12 次, 平均距离: 1.33 Å)

氨基酸对: ASP - LYS (总出现次数: 27)
  -> 最常见的原子配对: ASP[CG] - LYS[NZ] (出现 17 次, 平均距离: 1.31 Å)

氨基酸对: ASP - LYS (总出现次数: 27)
  -> 最常见的原子配对: ASP[C] - LYS[NZ] (出现 4 次, 平均距离: 1.44 Å)

氨基酸对: ASP - THR (总出现次数: 11)
  -> 最常见的原子配对: ASP[CG] - THR[OG1] (出现 10 次, 平均距离: 1.39 Å)

氨基酸对: CYS - CYS (总出现次数: 131

In [29]:
pair_res[('PHE','PHE')]

defaultdict(int,
            {('CG', 'CD1'): 4,
             ('CG', 'CG'): 4,
             ('CD1', 'CG'): 4,
             ('CD1', 'CE2'): 4,
             ('CD2', 'CZ'): 4,
             ('CD2', 'CD2'): 4,
             ('CD2', 'CE2'): 4,
             ('CE1', 'CD2'): 2,
             ('CE1', 'CE2'): 4,
             ('CE2', 'CD1'): 4,
             ('CE2', 'CE1'): 4,
             ('CE2', 'CD2'): 4,
             ('CZ', 'CD2'): 4,
             ('CD2', 'CE1'): 2})

In [16]:
from Bio.PDB import MMCIFParser, PDBParser, PDBIO, MMCIFIO, Select
cif_id = "6QSW.cif"
cif_path = os.path.join("/home/fit/lulei/WORK/xjt/Protein_design/CyclicPeptide/Dataset/ALL_MMCIF/CIF_ALL_DATASET", cif_id )
seq_records = []
# 加载 CIF 文件（自动处理 .gz 压缩）
try:
    parser = MMCIFParser(QUIET=True)
    if cif_path.endswith(".gz"):
        with gzip.open(cif_path, 'rt') as f:
            structure = parser.get_structure(cif_id, f)
    else:
        structure = parser.get_structure(cif_id, cif_path)
except Exception as e:
    print(f"[未知错误] 加载失败: {str(e)} 在{cif_path}")

# 取第一个模型（针对多模型 CIF 文件）
if len(structure) == 0:
    pass
model = structure[0]

# 遍历处理每个链
for chain in model:
    print(chain.id)

AAA
BBB
CCC
