In [5]:
import os
import tempfile
import pandas as pd
import numpy as np
from Bio.PDB import MMCIFParser, PDBIO
import pyrosetta
from pyrosetta import init, pose_from_pdb
import datetime
import multiprocessing as mp

# 1) PyRosetta를 한 번만 초기화
if not hasattr(pyrosetta, "_already_init"):
    init("-mute all")
    pyrosetta._already_init = True
def CDRH3_Bfactors(pose):    

    pose_i1 = pyrosetta.rosetta.protocols.antibody.AntibodyInfo(pose)
    cdr_h3_i_first = pyrosetta.rosetta.protocols.antibody.AntibodyInfo.get_CDR_start(pose_i1,pyrosetta.rosetta.protocols.antibody.CDRNameEnum.h3,pose)

    cdr_h3_i_last = pyrosetta.rosetta.protocols.antibody.AntibodyInfo.get_CDR_end(pose_i1,pyrosetta.rosetta.protocols.antibody.CDRNameEnum.h3,pose)
    h3_plddt_array = [pose.pdb_info().temperature(i,1) for i in range (cdr_h3_i_first+1, cdr_h3_i_last+1)]  
    return h3_plddt_array

def CDRL3_Bfactors(pose):    

    pose_i1 = pyrosetta.rosetta.protocols.antibody.AntibodyInfo(pose)
    cdr_l3_i_first = pyrosetta.rosetta.protocols.antibody.AntibodyInfo.get_CDR_start(pose_i1,pyrosetta.rosetta.protocols.antibody.CDRNameEnum.l3,pose)

    cdr_l3_i_last = pyrosetta.rosetta.protocols.antibody.AntibodyInfo.get_CDR_end(pose_i1,pyrosetta.rosetta.protocols.antibody.CDRNameEnum.l3,pose)
    l3_plddt_array = [pose.pdb_info().temperature(i,1) for i in range (cdr_l3_i_first+1, cdr_l3_i_last+1)]  
    return l3_plddt_array
    
def convert_cif_to_pdb(cif_path: str, pdb_out: str):
    """mmCIF 파일을 읽어서 PDB 형식으로 저장"""
    parser = MMCIFParser(QUIET=True)
    structure = parser.get_structure("X", cif_path)
    io = PDBIO()
    io.set_structure(structure)
    io.save(pdb_out)

def compute_h3_l3(path: str):
    """
    입력 경로(path)가 .cif이면 임시 PDB로 변환한 뒤,
    CDR-H3/L3 평균 pLDDT를 계산해 반환.
    """
    # .cif 파일 여부 확인
    is_cif = path.lower().endswith(".cif")
    if is_cif:
        tmp = tempfile.NamedTemporaryFile(suffix=".pdb", delete=False)
        tmp.close()
        convert_cif_to_pdb(path, tmp.name)
        pdb_path = tmp.name
    else:
        pdb_path = path

    # PyRosetta로 구조 읽기
    pose = pose_from_pdb(pdb_path)

    # CDR-H3/L3 B-factor(=pLDDT) 리스트 추출
    h3 = CDRH3_Bfactors(pose)
    l3 = CDRL3_Bfactors(pose)

    # 임시 PDB 파일 삭제
    if is_cif:
        os.remove(pdb_path)

    # 평균값 반환 (값이 없으면 NaN)
    return (float(np.mean(h3)) if h3 else np.nan,
            float(np.mean(l3)) if l3 else np.nan)

def worker(args):
    idx, row = args
    # 'model_path' 컬럼이 있으면 그 경로를, 없으면 'pdb' 컬럼 경로를 사용
    path = row.model_path if "model_path" in row.index and pd.notnull(row.model_path) else row.pdb
    h3_mean, l3_mean = compute_h3_l3(path)
    return idx, h3_mean, l3_mean

if __name__ == "__main__":
    df_main = pd.read_csv("final_data_with_new_rosetta_metrics_20250521.csv")
    df = df_main.iloc[:10].copy()  # .copy() 로 뷰 경고 방지
    inputs = list(df.iterrows())

    # 1) 사용할 CPU 코어 수 지정
    num_cpus = 4  # 원하시는 개수로 변경
    with mp.Pool(processes=num_cpus) as pool:
        results = pool.map(worker, inputs)

    # 2) 계산 결과 기록
    for idx, h3, l3 in results:
        df.at[idx, "plddt_H3"] = h3
        df.at[idx, "plddt_L3"] = l3

    # 3) 타임스탬프 생성 및 파일명에 반영
    ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    out_fname = f"final_with_H3_L3_plddt_{ts}.csv"
    df.to_csv(out_fname, index=False)
    print(f"완료: {out_fname} 파일로 저장되었습니다.")

완료: final_with_H3_L3_plddt_20250522_030920.csv 파일로 저장되었습니다.
완료: final_with_H3_L3_plddt_20250522_030920.csv 파일로 저장되었습니다.
