In [1]:
# Cell A: Environment and dependencies
import os, sys, json, gzip, shutil, subprocess, textwrap, math, random, itertools
from pathlib import Path

def pip_install(pkgs):
    cmd = [sys.executable, "-m", "pip", "install"] + pkgs
    print("+", " ".join(cmd))
    subprocess.run(cmd, check=True)

# Required packages (pyrosetta is handled by conda env)
pip_install(["biopython", "pandas", "numpy", "scipy", "matplotlib", "tmtools"])

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy import stats

from Bio.SeqUtils import seq1 as _seq1

# Safe 3-letter to 1-letter conversion
def three_to_one(resname):
    try:
        return _seq1(resname)
    except Exception:
        return 'X'

# Simple command runner

def run(cmd, env=None):
    if isinstance(cmd, (list, tuple)):
        printable = " ".join(map(str, cmd))
    else:
        printable = cmd
    print("+", printable)
    proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, env=env)
    if proc.returncode != 0:
        raise RuntimeError(f"Command failed ({proc.returncode})\nSTDOUT:\n{proc.stdout}\nSTDERR:\n{proc.stderr}")
    return proc.stdout, proc.stderr

# Tool availability checks

def check_tool(name_or_path):
    if os.path.isabs(name_or_path) or os.path.sep in name_or_path:
        return os.path.exists(name_or_path)
    return shutil.which(name_or_path) is not None

# CONFIG
CONFIG = {
    "workdir": os.path.abspath("."),
    "outdir": os.path.abspath("outputs"),
    "inputs_dir": os.path.abspath("inputs"),
    "pdb_ids": ["6P8F", "6P8H", "2W9Z"],
    "chains": {
        "6P8F": {"CyclinD1": "A", "CDK4": "B", "p27": "C"},
        "6P8H": {"CyclinD1": "A", "CDK4": "B", "p21": "C"},
        "2W9Z": {"CyclinD1": "A", "CDK4": "B"},
    },
    "contact_cutoff": 4.5,
    "binder_len": 50,
    "n_designs": 20,
    "bindcraft_trim_on_oom": True,
    "trim_radius": 8.0,
    "identity_max": 0.20,
    "similarity_max": 0.35,
    "tm_score_max": 0.35,
    "dock_engine": "contact_proxy",  # "rosetta_cli" | "pyrosetta" | "contact_proxy"
    "decoys_fast": 25,
    "decoys_final": 200,
    "xla_mem_fraction": 0.7,
    "xla_disable_command_buffer": True,
    "xla_preallocate": False,
    "max_fast_dock": 200,
    "max_final_dock": 100,
    "max_L": 5,
    "max_LL": 1,
    "L_target": 0.08,
    "L_penalty_alpha": 10.0,
    "seed_topk": 10,
    "iters": 50,
    "proposals_per_iter": 20,
    "keep_global": 100,
    # BindCraft / Rosetta paths
    "bindcraft_script": os.path.abspath("bindcraft.py"),
    "bindcraft_advanced": os.path.abspath("settings_advanced/smoke_2stage.json"),
    "bindcraft_filters": os.path.abspath("settings_filters/peptide_relaxed_filters.json"),
    "rosetta_bin": os.environ.get("ROSETTA_BIN", ""),
    "rosetta_db": os.environ.get("ROSETTA_DB", ""),
    "needle_exe": os.environ.get("NEEDLE_EXE", "needle"),
    "tmalign_exe": os.environ.get("TMALIGN_EXE", "TM-align"),
    "pymol_exe": os.environ.get("PYMOL_EXE", "pymol"),
}

Path(CONFIG["outdir"]).mkdir(parents=True, exist_ok=True)
Path(CONFIG["inputs_dir"]).mkdir(parents=True, exist_ok=True)
print(json.dumps(CONFIG, indent=2))

# JAX/XLA memory controls for BindCraft
if CONFIG.get("xla_preallocate") is False:
    os.environ.setdefault("XLA_PYTHON_CLIENT_PREALLOCATE", "false")
if CONFIG.get("xla_mem_fraction"):
    os.environ.setdefault("XLA_CLIENT_MEM_FRACTION", str(CONFIG["xla_mem_fraction"]))
if CONFIG.get("xla_disable_command_buffer"):
    flags = os.environ.get("XLA_FLAGS", "")
    if "--xla_gpu_enable_command_buffer" not in flags:
        os.environ["XLA_FLAGS"] = (flags + " --xla_gpu_enable_command_buffer=").strip()



+ /home/xuchengjie/miniconda3/envs/BindCraft/bin/python -m pip install biopython pandas numpy scipy matplotlib tmtools
Looking in indexes: https://mirrors.aliyun.com/pypi/simple




{
  "workdir": "/home/xuchengjie/Program/BindCraft",
  "outdir": "/home/xuchengjie/Program/BindCraft/outputs",
  "inputs_dir": "/home/xuchengjie/Program/BindCraft/inputs",
  "pdb_ids": [
    "6P8F",
    "6P8H",
    "2W9Z"
  ],
  "chains": {
    "6P8F": {
      "CyclinD1": "A",
      "CDK4": "B",
      "p27": "C"
    },
    "6P8H": {
      "CyclinD1": "A",
      "CDK4": "B",
      "p21": "C"
    },
    "2W9Z": {
      "CyclinD1": "A",
      "CDK4": "B"
    }
  },
  "contact_cutoff": 4.5,
  "binder_len": 50,
  "n_designs": 20,
  "bindcraft_trim_on_oom": true,
  "trim_radius": 8.0,
  "identity_max": 0.2,
  "similarity_max": 0.35,
  "tm_score_max": 0.35,
  "dock_engine": "contact_proxy",
  "decoys_fast": 25,
  "decoys_final": 200,
  "xla_mem_fraction": 0.7,
  "xla_disable_command_buffer": true,
  "xla_preallocate": false,
  "max_fast_dock": 200,
  "max_final_dock": 100,
  "max_L": 5,
  "max_LL": 1,
  "L_target": 0.08,
  "L_penalty_alpha": 10.0,
  "seed_topk": 10,
  "iters": 50,
  "proposal

In [2]:
# Cell B: Download / read / clean PDB
from Bio.PDB import PDBParser, PDBIO, Select
import urllib.request

INPUTS = Path(CONFIG["inputs_dir"])
OUTDIR = Path(CONFIG["outdir"])
CLEAN_DIR = OUTDIR / "clean"
CLEAN_DIR.mkdir(parents=True, exist_ok=True)

class ProteinSelect(Select):
    def __init__(self, keep_chains):
        self.keep_chains = set(keep_chains)
    def accept_chain(self, chain):
        return chain.id in self.keep_chains
    def accept_residue(self, residue):
        # Keep only standard amino acids (no water/ligands)
        hetflag = residue.id[0]
        return hetflag == " "


def download_pdb(pdb_id, dest_path):
    url = f"https://files.rcsb.org/download/{pdb_id}.pdb.gz"
    print("Downloading", url)
    gz_path = dest_path.with_suffix('.pdb.gz')
    if not gz_path.exists():
        urllib.request.urlretrieve(url, gz_path)
    # gunzip
    if not dest_path.exists():
        with gzip.open(gz_path, 'rb') as fin, open(dest_path, 'wb') as fout:
            fout.write(fin.read())
    return dest_path


def clean_pdb(pdb_path, out_path, keep_chains):
    parser = PDBParser(QUIET=True)
    structure = parser.get_structure("struct", pdb_path)
    io = PDBIO()
    io.set_structure(structure)
    io.save(str(out_path), select=ProteinSelect(keep_chains))
    return out_path

# Download and clean
pdb_6p8f = download_pdb("6P8F", INPUTS / "6P8F.pdb")
pdb_6p8h = download_pdb("6P8H", INPUTS / "6P8H.pdb")
pdb_2w9z = download_pdb("2W9Z", INPUTS / "2W9Z.pdb")

clean_6p8f = clean_pdb(pdb_6p8f, CLEAN_DIR / "6p8f_ABC.pdb", ["A","B","C"])
clean_6p8h = clean_pdb(pdb_6p8h, CLEAN_DIR / "6p8h_ABC.pdb", ["A","B","C"])
clean_2w9z = clean_pdb(pdb_2w9z, CLEAN_DIR / "2w9z_AB.pdb", ["A","B"])

print("Cleaned:", clean_6p8f, clean_6p8h, clean_2w9z)


Downloading https://files.rcsb.org/download/6P8F.pdb.gz
Downloading https://files.rcsb.org/download/6P8H.pdb.gz
Downloading https://files.rcsb.org/download/2W9Z.pdb.gz
Cleaned: /home/xuchengjie/Program/BindCraft/outputs/clean/6p8f_ABC.pdb /home/xuchengjie/Program/BindCraft/outputs/clean/6p8h_ABC.pdb /home/xuchengjie/Program/BindCraft/outputs/clean/2w9z_AB.pdb


In [3]:
# Cell C: Structure alignment + residue mapping (6P8F/6P8H -> 2W9Z)
from Bio.PDB import PDBParser, Superimposer
from Bio import pairwise2
from Bio.Align import substitution_matrices

ALN_DIR = OUTDIR / "aln"
ALN_DIR.mkdir(parents=True, exist_ok=True)

parser = PDBParser(QUIET=True)


def chain_residues(structure, chain_id):
    chain = structure[0][chain_id]
    residues = [r for r in chain if r.id[0] == " "]
    return residues


def residue_to_str(res):
    resseq = res.id[1]
    icode = res.id[2].strip()
    return f"{resseq}{icode}" if icode else str(resseq)


def get_sequence_and_residues(structure, chain_id):
    residues = chain_residues(structure, chain_id)
    seq = []
    res_ids = []
    for r in residues:
        try:
            aa = three_to_one(r.get_resname())
        except KeyError:
            aa = "X"
        seq.append(aa)
        res_ids.append(residue_to_str(r))
    return "".join(seq), res_ids, residues


def align_and_map(struct_a, chain_a, struct_b, chain_b):
    seq_a, ids_a, res_a = get_sequence_and_residues(struct_a, chain_a)
    seq_b, ids_b, res_b = get_sequence_and_residues(struct_b, chain_b)

    blosum62 = substitution_matrices.load("BLOSUM62")
    aln = pairwise2.align.globalds(seq_a, seq_b, blosum62, -10, -0.5, one_alignment_only=True)[0]
    aln_a, aln_b = aln.seqA, aln.seqB

    # Sequence-based mapping
    mapping = {}
    i = j = 0
    for aa, bb in zip(aln_a, aln_b):
        if aa != "-" and bb != "-":
            mapping[ids_a[i]] = ids_b[j]
            i += 1
            j += 1
        elif aa != "-" and bb == "-":
            i += 1
        elif aa == "-" and bb != "-":
            j += 1

    # Structural alignment (CA) for RMSD info
    ca_a = [r["CA"] for r in res_a if "CA" in r]
    ca_b = [r["CA"] for r in res_b if "CA" in r]
    n = min(len(ca_a), len(ca_b))
    sup = Superimposer()
    sup.set_atoms(ca_a[:n], ca_b[:n])
    rmsd = sup.rms

    return mapping, rmsd


def try_tmtools(struct_a, chain_a, struct_b, chain_b):
    try:
        from tmtools import tm_align
    except Exception:
        return None
    # Extract CA coords and sequences
    res_a = chain_residues(struct_a, chain_a)
    res_b = chain_residues(struct_b, chain_b)
    ca_a = np.array([r["CA"].get_coord() for r in res_a if "CA" in r], dtype=float)
    ca_b = np.array([r["CA"].get_coord() for r in res_b if "CA" in r], dtype=float)
    if len(ca_a) == 0 or len(ca_b) == 0:
        return None
    seq_a, _, _ = get_sequence_and_residues(struct_a, chain_a)
    seq_b, _, _ = get_sequence_and_residues(struct_b, chain_b)
    try:
        out = tm_align(ca_a, ca_b, seq_a, seq_b)
        return out.tm_score
    except Exception:
        return None

# Load structures
s_6p8f = parser.get_structure("6P8F", clean_6p8f)
s_6p8h = parser.get_structure("6P8H", clean_6p8h)
s_2w9z = parser.get_structure("2W9Z", clean_2w9z)

maps = {}
for pdb_id, struct in [("6P8F", s_6p8f), ("6P8H", s_6p8h)]:
    mapping = {}
    rmsd_info = {}
    for ch in ["A", "B"]:
        m, rmsd = align_and_map(struct, ch, s_2w9z, ch)
        mapping[ch] = m
        rmsd_info[ch] = rmsd
    maps[pdb_id] = mapping
    tm_a = try_tmtools(struct, "A", s_2w9z, "A")
    tm_b = try_tmtools(struct, "B", s_2w9z, "B")
    print(pdb_id, "RMSD A/B", rmsd_info, "TM A/B", tm_a, tm_b)

# Save mappings
with open(ALN_DIR / "map_6p8f_to_2w9z.json", "w") as f:
    json.dump(maps["6P8F"], f, indent=2)
with open(ALN_DIR / "map_6p8h_to_2w9z.json", "w") as f:
    json.dump(maps["6P8H"], f, indent=2)

print("Saved mapping JSONs in", ALN_DIR)




6P8F RMSD A/B {'A': 4.035795839976786, 'B': 20.335865594136237} TM A/B None None
6P8H RMSD A/B {'A': 6.26549914952364, 'B': 20.080579668259528} TM A/B None None
Saved mapping JSONs in /home/xuchengjie/Program/BindCraft/outputs/aln


In [4]:
# Cell D: Interaction site identification (p27/p21 separately)
from Bio.PDB import NeighborSearch

HOTSPOT_DIR = OUTDIR / "hotspots"
HOTSPOT_DIR.mkdir(parents=True, exist_ok=True)


def get_contacts(structure, receptor_chains, ligand_chain, cutoff):
    # Collect atoms
    receptor_atoms = []
    ligand_atoms = []
    for ch in receptor_chains:
        receptor_atoms += [a for a in structure[0][ch].get_atoms() if a.get_parent().id[0] == " "]
    ligand_atoms += [a for a in structure[0][ligand_chain].get_atoms() if a.get_parent().id[0] == " "]

    ns = NeighborSearch(receptor_atoms)
    rec_residues = set()
    lig_residues = set()

    for atom in ligand_atoms:
        close = ns.search(atom.coord, cutoff)
        if close:
            lig_residues.add(atom.get_parent())
            for a2 in close:
                rec_residues.add(a2.get_parent())

    def res_to_tag(res):
        ch = res.get_parent().id
        resseq = res.id[1]
        icode = res.id[2].strip()
        tag = f"{ch}{resseq}{icode}" if icode else f"{ch}{resseq}"
        return tag

    rec_tags = sorted({res_to_tag(r) for r in rec_residues})
    lig_tags = sorted({res_to_tag(r) for r in lig_residues})
    return rec_tags, lig_tags

# p27 contacts from 6P8F
rec_hotspots_p27, lig_iface_p27 = get_contacts(s_6p8f, ["A", "B"], "C", CONFIG["contact_cutoff"])

(HOTSPOT_DIR / "rec_hotspots_p27_6p8f.txt").write_text("\n".join(rec_hotspots_p27))
(HOTSPOT_DIR / "lig_iface_p27_6p8f.txt").write_text("\n".join(lig_iface_p27))

# p21 contacts from 6P8H
rec_hotspots_p21, lig_iface_p21 = get_contacts(s_6p8h, ["A", "B"], "C", CONFIG["contact_cutoff"])

(HOTSPOT_DIR / "rec_hotspots_p21_6p8h.txt").write_text("\n".join(rec_hotspots_p21))

print("Saved hotspot lists in", HOTSPOT_DIR)


Saved hotspot lists in /home/xuchengjie/Program/BindCraft/outputs/hotspots


In [5]:
# Cell E: Hotspot mapping to 2W9Z + union

with open(ALN_DIR / "map_6p8f_to_2w9z.json") as f:
    map_6p8f = json.load(f)
with open(ALN_DIR / "map_6p8h_to_2w9z.json") as f:
    map_6p8h = json.load(f)


def map_hotspots(hotspots, mapping):
    out = []
    for tag in hotspots:
        chain = tag[0]
        resid = tag[1:]
        if chain in mapping and resid in mapping[chain]:
            mapped = mapping[chain][resid]
            out.append(f"{chain}{mapped}")
    return sorted(set(out))

p27_mapped = map_hotspots(rec_hotspots_p27, map_6p8f)
p21_mapped = map_hotspots(rec_hotspots_p21, map_6p8h)
union_mapped = sorted(set(p27_mapped) | set(p21_mapped))

(HOTSPOT_DIR / "hotspots_p27_mapped_2w9z.txt").write_text("\n".join(p27_mapped))
(HOTSPOT_DIR / "hotspots_p21_mapped_2w9z.txt").write_text("\n".join(p21_mapped))
(HOTSPOT_DIR / "hotspots_union_2w9z.txt").write_text("\n".join(union_mapped))

print("Mapped hotspots saved.")


Mapped hotspots saved.


In [6]:
# Cell F: Run BindCraft on 2W9Z to generate 50 aa binders
BINDCRAFT_DIR = OUTDIR / "bindcraft"
BINDCRAFT_DIR.mkdir(parents=True, exist_ok=True)


def compress_ranges(nums):
    nums = sorted(set(nums))
    ranges = []
    if not nums:
        return ranges
    start = prev = nums[0]
    for n in nums[1:]:
        if n == prev + 1:
            prev = n
        else:
            ranges.append((start, prev))
            start = prev = n
    ranges.append((start, prev))
    return ranges


def hotspot_list_to_string(hotspot_tags):
    # Convert like A123, A124, B210 to "A123-124,B210"
    by_chain = {}
    for tag in hotspot_tags:
        ch = tag[0]
        resid = tag[1:]
        try:
            num = int(''.join([c for c in resid if c.isdigit()]))
        except Exception:
            continue
        by_chain.setdefault(ch, []).append(num)
    parts = []
    for ch, nums in by_chain.items():
        for start, end in compress_ranges(nums):
            if start == end:
                parts.append(f"{ch}{start}")
            else:
                parts.append(f"{ch}{start}-{end}")
    return ",".join(parts)


def trim_receptor_by_hotspots(pdb_path, hotspots_file, radius):
    from Bio.PDB import PDBParser, NeighborSearch, PDBIO, Select

    parser = PDBParser(QUIET=True)
    struct = parser.get_structure("rec", pdb_path)
    hotspot_tags = set(Path(hotspots_file).read_text().strip().split())

    if not hotspot_tags:
        print("WARNING: No hotspot tags found; using full receptor.")
        return pdb_path

    hotspot_atoms = []
    for ch in struct[0]:
        for res in ch:
            if res.id[0] != " ":
                continue
            tag = f"{ch.id}{res.id[1]}{res.id[2].strip()}" if res.id[2].strip() else f"{ch.id}{res.id[1]}"
            if tag in hotspot_tags:
                hotspot_atoms.extend(list(res.get_atoms()))

    if not hotspot_atoms:
        print("WARNING: No hotspot atoms found; using full receptor.")
        return pdb_path

    all_atoms = [a for a in struct.get_atoms() if a.get_parent().id[0] == " "]
    ns = NeighborSearch(all_atoms)

    keep_res = set()
    for hatom in hotspot_atoms:
        for a in ns.search(hatom.coord, radius):
            keep_res.add(a.get_parent())

    # always keep hotspot residues
    for ch in struct[0]:
        for res in ch:
            if res.id[0] != " ":
                continue
            tag = f"{ch.id}{res.id[1]}{res.id[2].strip()}" if res.id[2].strip() else f"{ch.id}{res.id[1]}"
            if tag in hotspot_tags:
                keep_res.add(res)

    class KeepSelect(Select):
        def accept_residue(self, residue):
            return residue in keep_res

    out_pdb = BINDCRAFT_DIR / f"trimmed_receptor_r{radius}.pdb"
    io = PDBIO()
    io.set_structure(struct)
    io.save(str(out_pdb), select=KeepSelect())
    return out_pdb


def run_bindcraft(target_pdb, hotspots_file, binder_len, n_designs):
    if not check_tool(CONFIG["bindcraft_script"]):
        print("WARNING: BindCraft script not found. Skipping binder generation.")
        (BINDCRAFT_DIR / "designs.fasta").write_text("")
        pd.DataFrame(columns=["design_id", "sequence"]).to_csv(BINDCRAFT_DIR / "designs_metadata.csv", index=False)
        return None

    # Warning for short binder length
    if binder_len < 60:
        print("WARNING: binder_len < 60. BindCraft defaults may be less efficient for 50 aa.")

    design_path = BINDCRAFT_DIR / "run"
    design_path.mkdir(parents=True, exist_ok=True)

    hotspot_tags = (Path(hotspots_file).read_text().strip().split())
    hotspot_string = hotspot_list_to_string(hotspot_tags)

    settings = {
        "design_path": str(design_path),
        "binder_name": "CDK4_CyclinD1",
        "starting_pdb": str(target_pdb),
        "chains": "A,B",
        "target_hotspot_residues": hotspot_string,
        "lengths": [binder_len, binder_len],
        "number_of_final_designs": n_designs,
    }

    settings_path = BINDCRAFT_DIR / "bindcraft_settings.json"
    settings_path.write_text(json.dumps(settings, indent=2))

    cmd = [sys.executable, CONFIG["bindcraft_script"],
           "--settings", str(settings_path),
           "--filters", CONFIG["bindcraft_filters"],
           "--advanced", CONFIG["bindcraft_advanced"]]
    try:
        run(cmd)
        return design_path
    except RuntimeError as e:
        msg = str(e)
        if CONFIG.get("bindcraft_trim_on_oom") and ("RESOURCE_EXHAUSTED" in msg or "out of memory" in msg or "CUDA_ERROR_OUT_OF_MEMORY" in msg):
            print("WARNING: BindCraft OOM. Retrying with trimmed receptor.")
            radii = [CONFIG["trim_radius"], 10.0, 8.0, 6.0]
            for radius in radii:
                trimmed_pdb = trim_receptor_by_hotspots(target_pdb, hotspots_file, radius)
                design_path_trim = BINDCRAFT_DIR / f"run_trimmed_r{radius}"
                design_path_trim.mkdir(parents=True, exist_ok=True)
                settings["design_path"] = str(design_path_trim)
                settings["starting_pdb"] = str(trimmed_pdb)
                settings_path_trim = BINDCRAFT_DIR / f"bindcraft_settings_trimmed_r{radius}.json"
                settings_path_trim.write_text(json.dumps(settings, indent=2))
                cmd = [sys.executable, CONFIG["bindcraft_script"],
                       "--settings", str(settings_path_trim),
                       "--filters", CONFIG["bindcraft_filters"],
                       "--advanced", CONFIG["bindcraft_advanced"]]
                try:
                    run(cmd)
                    return design_path_trim
                except RuntimeError as e2:
                    msg2 = str(e2)
                    if "RESOURCE_EXHAUSTED" in msg2 or "out of memory" in msg2 or "CUDA_ERROR_OUT_OF_MEMORY" in msg2:
                        print(f"OOM at trim radius {radius}. Trying smaller radius...")
                        continue
                    raise
        raise


def collect_bindcraft_outputs(design_path):
    # Try to assemble sequences and metadata
    fasta_path = BINDCRAFT_DIR / "designs.fasta"
    meta_path = BINDCRAFT_DIR / "designs_metadata.csv"

    if design_path is None or not Path(design_path).exists():
        fasta_path.write_text("")
        pd.DataFrame(columns=["design_id", "sequence"]).to_csv(meta_path, index=False)
        return fasta_path, meta_path, {}

    pose_map = {}
    # Search for PDBs in Accepted/Ranked and Accepted
    for sub in ["Accepted/Ranked", "Accepted", "MPNN", "Trajectory", "Trajectory/Relaxed", "Rejected"]:
        p = Path(design_path) / sub
        if p.exists():
            for pdb in p.glob("*.pdb"):
                stem = pdb.stem
                pose_map.setdefault(stem, str(pdb))

    sequences = []
    # Prefer mpnn_design_stats.csv
    csv_path = Path(design_path) / "mpnn_design_stats.csv"
    if csv_path.exists():
        df = pd.read_csv(csv_path)
        if "Sequence" in df.columns and not df.empty:
            for _, row in df.iterrows():
                sequences.append((row.get("Design", f"design_{len(sequences)+1}"), row["Sequence"]))
        df.to_csv(meta_path, index=False)

    if not sequences:
        # Try to derive from PDBs (sequence extraction)
        from Bio.PDB import PDBParser
        parser = PDBParser(QUIET=True)
        for stem, pdb in pose_map.items():
            try:
                struct = parser.get_structure(stem, pdb)
                # Infer binder chain as chain not in receptor A/B (fallback to shortest chain)
                binder_chain = None
                chain_lens = {}
                for ch in struct[0]:
                    clen = sum(1 for r in ch if r.id[0] == " ")
                    chain_lens[ch.id] = clen
                    if ch.id not in ("A", "B"):
                        binder_chain = ch.id
                if binder_chain is None and chain_lens:
                    binder_chain = min(chain_lens, key=chain_lens.get)
                if binder_chain is None:
                    continue
                seq = []
                for r in struct[0][binder_chain]:
                    if r.id[0] != " ":
                        continue
                    try:
                        seq.append(three_to_one(r.get_resname()))
                    except KeyError:
                        seq.append("X")
                sequences.append((stem, "".join(seq)))
            except Exception:
                continue
        pd.DataFrame(sequences, columns=["design_id", "sequence"]).to_csv(meta_path, index=False)

    # If still too few sequences, pull from other run directories (if any)
    if len(sequences) < 5:
        extra = {}
        for alt in sorted(BINDCRAFT_DIR.glob("run*")):
            if alt == Path(design_path):
                continue
            # Search for PDBs in alternate run
            for sub in ["Accepted/Ranked", "Accepted", "MPNN", "Trajectory", "Trajectory/Relaxed", "Rejected"]:
                p = alt / sub
                if p.exists():
                    for pdb in p.glob("*.pdb"):
                        extra.setdefault(pdb.stem, str(pdb))
        if extra:
            # extend pose_map for docking downstream
            for stem, pdb in extra.items():
                pose_map.setdefault(stem, pdb)
            from Bio.PDB import PDBParser
            parser = PDBParser(QUIET=True)
            for stem, pdb in extra.items():
                if any(stem == s[0] for s in sequences):
                    continue
                try:
                    struct = parser.get_structure(stem, pdb)
                    chain_lens = {ch.id: sum(1 for r in ch if r.id[0] == " ") for ch in struct[0]}
                    binder_chain = min(chain_lens, key=chain_lens.get) if chain_lens else None
                    if binder_chain is None:
                        continue
                    seq = []
                    for r in struct[0][binder_chain]:
                        if r.id[0] != " ":
                            continue
                        try:
                            seq.append(three_to_one(r.get_resname()))
                        except KeyError:
                            seq.append("X")
                    sequences.append((stem, "".join(seq)))
                except Exception:
                    continue
            pd.DataFrame(sequences, columns=["design_id", "sequence"]).to_csv(meta_path, index=False)


    # Write FASTA
    with open(fasta_path, "w") as f:
        for name, seq in sequences:
            f.write(f">{name}\n{seq}\n")

    return fasta_path, meta_path, pose_map

# Run BindCraft
bindcraft_design_path = run_bindcraft(clean_2w9z, HOTSPOT_DIR / "hotspots_union_2w9z.txt", CONFIG["binder_len"], CONFIG["n_designs"])

fasta_path, meta_path, pose_map = collect_bindcraft_outputs(bindcraft_design_path)
print("BindCraft outputs:", fasta_path, meta_path)


+ /home/xuchengjie/miniconda3/envs/BindCraft/bin/python /home/xuchengjie/Program/BindCraft/bindcraft.py --settings /home/xuchengjie/Program/BindCraft/outputs/bindcraft/bindcraft_settings.json --filters /home/xuchengjie/Program/BindCraft/settings_filters/peptide_relaxed_filters.json --advanced /home/xuchengjie/Program/BindCraft/settings_advanced/smoke_2stage.json


+ /home/xuchengjie/miniconda3/envs/BindCraft/bin/python /home/xuchengjie/Program/BindCraft/bindcraft.py --settings /home/xuchengjie/Program/BindCraft/outputs/bindcraft/bindcraft_settings_trimmed_r8.0.json --filters /home/xuchengjie/Program/BindCraft/settings_filters/peptide_relaxed_filters.json --advanced /home/xuchengjie/Program/BindCraft/settings_advanced/smoke_2stage.json


BindCraft outputs: /home/xuchengjie/Program/BindCraft/outputs/bindcraft/designs.fasta /home/xuchengjie/Program/BindCraft/outputs/bindcraft/designs_metadata.csv


In [7]:
# Cell G: p27 low-similarity filter (sequence + structural)
from Bio import pairwise2
from Bio.Align import substitution_matrices

FILTER_DIR = OUTDIR / "filter"
FILTER_DIR.mkdir(parents=True, exist_ok=True)

# Build p27 interface sequence from 6P8F chain C
seq_p27 = []
res_order = []
for res in s_6p8f[0]["C"]:
    if res.id[0] != " ":
        continue
    tag = f"C{res.id[1]}{res.id[2].strip()}" if res.id[2].strip() else f"C{res.id[1]}"
    if tag in lig_iface_p27:
        try:
            aa = three_to_one(res.get_resname())
        except Exception:
            aa = "X"
        seq_p27.append(aa)
        res_order.append(tag)

p27_iface_seq = "".join(seq_p27)
(FILTER_DIR / "p27_iface.fasta").write_text(f">p27_iface\n{p27_iface_seq}\n")


def score_alignment(seq1, seq2):
    # Global alignment with BLOSUM62
    blosum62 = substitution_matrices.load("BLOSUM62")
    aln = pairwise2.align.globalds(seq1, seq2, blosum62, -10, -0.5, one_alignment_only=True)[0]
    a, b = aln.seqA, aln.seqB
    matches = 0
    positives = 0
    aligned = 0
    for x, y in zip(a, b):
        if x == "-" or y == "-":
            continue
        aligned += 1
        if x == y:
            matches += 1
            positives += 1
        else:
            key = (x, y) if (x, y) in blosum62 else (y, x)
            if key in blosum62 and blosum62[key] > 0:
                positives += 1
    identity = matches / aligned if aligned else 0.0
    similarity = positives / aligned if aligned else 0.0
    return identity, similarity


def needle_score(seq1, seq2):
    # Use EMBOSS needle if available
    if not check_tool(CONFIG["needle_exe"]):
        return None
    tmp_dir = FILTER_DIR / "needle_tmp"
    tmp_dir.mkdir(exist_ok=True)
    f1 = tmp_dir / "seq1.fasta"
    f2 = tmp_dir / "seq2.fasta"
    out = tmp_dir / "needle.txt"
    f1.write_text(f">s1\n{seq1}\n")
    f2.write_text(f">s2\n{seq2}\n")
    cmd = [CONFIG["needle_exe"], "-asequence", str(f1), "-bsequence", str(f2), "-gapopen", "10", "-gapextend", "0.5", "-outfile", str(out), "-auto"]
    try:
        run(cmd)
    except Exception:
        return None
    # Parse identity/similarity
    identity = similarity = None
    for line in out.read_text().splitlines():
        if line.strip().startswith("# Identity:"):
            pct = line.split("(")[-1].split("%")[-2]
            identity = float(pct) / 100.0
        if line.strip().startswith("# Similarity:"):
            pct = line.split("(")[-1].split("%")[-2]
            similarity = float(pct) / 100.0
    if identity is None or similarity is None:
        return None
    return identity, similarity


def compute_tm_score(pdb1, pdb2):
    # Try tmtools python first
    try:
        from tmtools import tm_align
        from Bio.PDB import PDBParser
        parser = PDBParser(QUIET=True)
        s1 = parser.get_structure("s1", pdb1)
        s2 = parser.get_structure("s2", pdb2)
        # use first chain of each
        c1 = list(s1[0])[0]
        c2 = list(s2[0])[0]
        ca1 = np.array([r["CA"].get_coord() for r in c1 if "CA" in r])
        ca2 = np.array([r["CA"].get_coord() for r in c2 if "CA" in r])
        if len(ca1) == 0 or len(ca2) == 0:
            return None
        seq1 = "".join([three_to_one(r.get_resname()) for r in c1 if r.id[0] == " " and "CA" in r])
        seq2 = "".join([three_to_one(r.get_resname()) for r in c2 if r.id[0] == " " and "CA" in r])
        out = tm_align(ca1, ca2, seq1, seq2)
        return out.tm_score
    except Exception:
        pass
    # Try external TM-align
    if check_tool(CONFIG["tmalign_exe"]):
        cmd = [CONFIG["tmalign_exe"], pdb1, pdb2]
        try:
            stdout, _ = run(cmd)
            for line in stdout.splitlines():
                if line.strip().startswith("TM-score="):
                    try:
                        return float(line.split("=")[1].split()[0])
                    except Exception:
                        continue
        except Exception:
            return None
    return None

# Prepare p27 interface structure for TM-score
p27_iface_pdb = FILTER_DIR / "p27_iface.pdb"
if not p27_iface_pdb.exists():
    from Bio.PDB import PDBIO, Select
    class InterfaceSelect(Select):
        def accept_residue(self, residue):
            if residue.id[0] != " ":
                return False
            tag = f"C{residue.id[1]}{residue.id[2].strip()}" if residue.id[2].strip() else f"C{residue.id[1]}"
            return tag in lig_iface_p27
    io = PDBIO()
    io.set_structure(s_6p8f)
    io.save(str(p27_iface_pdb), select=InterfaceSelect())

# Load binder sequences
from Bio import SeqIO
binders = list(SeqIO.parse(fasta_path, "fasta"))

rows = []
for rec in binders:
    seq = str(rec.seq)
    # needle if available
    ns = needle_score(p27_iface_seq, seq)
    if ns is not None:
        identity, similarity = ns
        source = "needle"
    else:
        identity, similarity = score_alignment(p27_iface_seq, seq)
        source = "biopython"

    tm_score = None
    # Attempt TM-score if pose exists
    if rec.id in pose_map:
        tm_score = compute_tm_score(pose_map[rec.id], str(p27_iface_pdb))

    pass_seq = (identity <= CONFIG["identity_max"]) and (similarity <= CONFIG["similarity_max"])
    pass_tm = (tm_score is None) or (tm_score < CONFIG["tm_score_max"])
    passed = pass_seq and pass_tm

    rows.append({
        "binder_id": rec.id,
        "identity": identity,
        "similarity": similarity,
        "tm_score": tm_score,
        "pass_seq": pass_seq,
        "pass_tm": pass_tm,
        "passed": passed,
        "method": source,
        "sequence": seq,
    })

if not rows:
    sim_df = pd.DataFrame(columns=["binder_id","identity","similarity","tm_score","pass_seq","pass_tm","passed","method","sequence"])
else:
    sim_df = pd.DataFrame(rows)
sim_df.to_csv(FILTER_DIR / "similarity_metrics.csv", index=False)

kept = sim_df[sim_df["passed"]]
with open(FILTER_DIR / "kept.fasta", "w") as f:
    for _, row in kept.iterrows():
        f.write(f">{row['binder_id']}\n{row['sequence']}\n")

print("Kept", len(kept), "of", len(sim_df))


Kept 2 of 49


In [8]:
# Cell H: Docking + binding energy scoring (fast)
DOCK_DIR = OUTDIR / "docking"
DOCK_DIR.mkdir(parents=True, exist_ok=True)

_pyrosetta_inited = False

def ensure_pyrosetta():
    global _pyrosetta_inited
    if _pyrosetta_inited:
        return
    try:
        import pyrosetta
        pyrosetta.init('-mute all')
        _pyrosetta_inited = True
    except Exception as e:
        raise RuntimeError(f"PyRosetta init failed: {e}")


def rosetta_exec(name):
    if CONFIG["rosetta_bin"]:
        candidate = Path(CONFIG["rosetta_bin"]) / name
        if candidate.exists():
            return str(candidate)
    if check_tool(name):
        return name
    return None


def interface_score_pyrosetta(pose, partners):
    from pyrosetta import rosetta
    # If still centroid, use a centroid score proxy
    if not pose.is_fullatom():
        scorefxn = rosetta.core.scoring.ScoreFunctionFactory.create_score_function("score3")
        return scorefxn(pose)
    iam = rosetta.protocols.analysis.InterfaceAnalyzerMover(partners)
    iam.apply(pose)
    return iam.get_interface_dG()


def ensure_fullatom(pose):
    from pyrosetta import rosetta
    if not pose.is_fullatom():
        to_fa = rosetta.protocols.simple_moves.SwitchResidueTypeSetMover("fa_standard")
        to_fa.apply(pose)
    return pose


def infer_binder_chain_id(pose, receptor_chains=("A","B")):
    chain_lengths = {}
    for i in range(1, pose.num_chains()+1):
        ch = pose.pdb_info().chain(pose.chain_begin(i))
        chain_lengths[ch] = pose.chain_end(i) - pose.chain_begin(i) + 1
        if ch not in receptor_chains:
            return ch
    if chain_lengths:
        return min(chain_lengths, key=chain_lengths.get)
    return None


def get_partners(pose, binder_chain):
    rec = []
    for i in range(1, pose.num_chains()+1):
        ch = pose.pdb_info().chain(pose.chain_begin(i))
        if ch != binder_chain:
            rec.append(ch)
    return "".join(rec) + "_" + binder_chain


def contact_score(pdb_path, receptor_chains=("A","B"), cutoff=4.5):
    from Bio.PDB import PDBParser, NeighborSearch
    parser = PDBParser(QUIET=True)
    struct = parser.get_structure("dock", pdb_path)
    # infer binder chain by shortest if needed
    chain_lens = {ch.id: sum(1 for r in ch if r.id[0]==" ") for ch in struct[0]}
    binder_chain = None
    for ch in struct[0]:
        if ch.id not in receptor_chains:
            binder_chain = ch.id
            break
    if binder_chain is None and chain_lens:
        binder_chain = min(chain_lens, key=chain_lens.get)
    if binder_chain is None:
        return None
    rec_atoms = []
    for ch in receptor_chains:
        if ch in struct[0]:
            rec_atoms += [a for a in struct[0][ch].get_atoms() if a.get_parent().id[0] == " "]
    lig_atoms = [a for a in struct[0][binder_chain].get_atoms() if a.get_parent().id[0] == " "]
    if not rec_atoms or not lig_atoms:
        return None
    ns = NeighborSearch(rec_atoms)
    count = 0
    for a in lig_atoms:
        if ns.search(a.coord, cutoff):
            count += 1
    return -float(count)


def dock_pose_pyrosetta(pose, partners, nstruct):
    from pyrosetta import rosetta
    from pyrosetta.rosetta.protocols.docking import DockingProtocol, setup_foldtree
    from pyrosetta.rosetta.utility import vector1_bool

    # Setup foldtree
    try:
        v = vector1_bool()
        v.append(True)
        setup_foldtree(pose, partners, v)
    except Exception:
        pass

    dock = DockingProtocol()
    dock.set_partners(partners)

    best_score = None
    best_pose = None
    for _ in range(nstruct):
        p = pose.clone()
        ensure_fullatom(p)
        if p.is_fullatom():
            try:
                dock.apply(p)
            except Exception:
                # fallback: no docking move
                pass
        sc = interface_score_pyrosetta(p, partners)
        if best_score is None or sc < best_score:
            best_score = sc
            best_pose = p.clone()
    return best_score, best_pose


# Load kept binders
kept_fasta = FILTER_DIR / "kept.fasta"
kept_binders = list(SeqIO.parse(kept_fasta, "fasta"))

rows = []
for rec in kept_binders[:CONFIG["max_fast_dock"]]:
    if rec.id not in pose_map:
        continue
    if CONFIG["dock_engine"] == "contact_proxy":
        score = contact_score(pose_map[rec.id], cutoff=CONFIG["contact_cutoff"])
        best_pose_path = pose_map[rec.id]
    elif CONFIG["dock_engine"] == "rosetta_cli":
        # placeholder for external Rosetta if installed
        score = None
        best_pose_path = pose_map[rec.id]
    else:
        ensure_pyrosetta()
        import pyrosetta
        pose = pyrosetta.pose_from_pdb(pose_map[rec.id])
        ensure_fullatom(pose)
        binder_chain = infer_binder_chain_id(pose)
        if binder_chain is None:
            continue
        partners = get_partners(pose, binder_chain)
        score, best_pose = dock_pose_pyrosetta(pose, partners, CONFIG["decoys_fast"])
        best_pose_path = str(DOCK_DIR / f"{rec.id}_best_fast.pdb")
        if best_pose is not None:
            best_pose.dump_pdb(best_pose_path)
    rows.append({"binder_id": rec.id, "docking_score": score, "best_pose_path": best_pose_path})

if not rows:
    fast_df = pd.DataFrame(columns=["binder_id","docking_score","best_pose_path"])
else:
    fast_df = pd.DataFrame(rows)
fast_df.to_csv(DOCK_DIR / "docking_scores_fast.csv", index=False)
print("Fast docking scores saved.")


Fast docking scores saved.


In [9]:
# Cell I: Docking score vs sequence features correlation
ANALYSIS_DIR = OUTDIR / "analysis"
ANALYSIS_DIR.mkdir(parents=True, exist_ok=True)

# Load sequences and scores
try:
    fast_df = pd.read_csv(DOCK_DIR / "docking_scores_fast.csv")
except pd.errors.EmptyDataError:
    fast_df = pd.DataFrame(columns=["binder_id", "docking_score", "best_pose_path"])

seq_df = sim_df[["binder_id", "sequence"]].drop_duplicates()
merged = fast_df.merge(seq_df, on="binder_id", how="left")

KD = {
    'A':1.8,'C':2.5,'D':-3.5,'E':-3.5,'F':2.8,'G':-0.4,'H':-3.2,'I':4.5,
    'K':-3.9,'L':3.8,'M':1.9,'N':-3.5,'P':-1.6,'Q':-3.5,'R':-4.5,'S':-0.8,
    'T':-0.7,'V':4.2,'W':-0.9,'Y':-1.3
}


def seq_features(seq):
    seq = seq or ""
    n = len(seq) if seq else 1
    count_L = seq.count("L")
    count_LL = sum(1 for i in range(len(seq)-1) if seq[i:i+2] == "LL")
    frac_L = count_L / n
    hyd = sum(KD.get(a, 0.0) for a in seq) / n
    charge = (seq.count('K') + seq.count('R') + 0.1*seq.count('H')) - (seq.count('D') + seq.count('E'))
    frac_G = seq.count('G') / n
    frac_P = seq.count('P') / n
    frac_aromatic = (seq.count('F') + seq.count('W') + seq.count('Y')) / n
    return {
        "frac_L": frac_L,
        "count_L": count_L,
        "count_LL": count_LL,
        "hydropathy": hyd,
        "net_charge": charge,
        "frac_G": frac_G,
        "frac_P": frac_P,
        "frac_aromatic": frac_aromatic,
    }

if fast_df.empty:
    # Create empty outputs + placeholder plots
    pd.DataFrame().to_csv(ANALYSIS_DIR / "pearson_corr.csv")
    pd.DataFrame().to_csv(ANALYSIS_DIR / "spearman_corr.csv")
    empty_feat = pd.DataFrame(columns=["binder_id","docking_score","frac_L","count_L","count_LL","hydropathy","net_charge","frac_G","frac_P","frac_aromatic"])
    empty_feat.to_csv(ANALYSIS_DIR / "seq_score_correlations.csv", index=False)

    for name in ["docking_vs_fracL.png", "feature_correlation_heatmap.png"]:
        plt.figure(figsize=(4,3))
        plt.text(0.5, 0.5, "No data", ha="center", va="center")
        plt.axis("off")
        plt.tight_layout()
        plt.savefig(ANALYSIS_DIR / name, dpi=200)
        plt.close()
    print("No docking scores; correlation analysis skipped.")
else:
    feature_rows = []
    for _, row in merged.iterrows():
        feats = seq_features(row["sequence"])
        feats.update({"binder_id": row["binder_id"], "docking_score": row["docking_score"]})
        feature_rows.append(feats)

    feat_df = pd.DataFrame(feature_rows)

    # Correlations (numeric only)
    num_df = feat_df.select_dtypes(include=[np.number])
    pearson = num_df.corr(method='pearson')
    spearman = num_df.corr(method='spearman')
    pearson.to_csv(ANALYSIS_DIR / "pearson_corr.csv")
    spearman.to_csv(ANALYSIS_DIR / "spearman_corr.csv")

    # Plot docking_score vs frac_L
    plt.figure(figsize=(4,3))
    plt.scatter(feat_df["frac_L"], feat_df["docking_score"], s=20)
    plt.xlabel("frac_L")
    plt.ylabel("docking_score")
    plt.tight_layout()
    plt.savefig(ANALYSIS_DIR / "docking_vs_fracL.png", dpi=200)
    plt.close()

    # Heatmap
    plt.figure(figsize=(6,5))
    plt.imshow(spearman.values, cmap='coolwarm', vmin=-1, vmax=1)
    plt.xticks(range(len(spearman.columns)), spearman.columns, rotation=90, fontsize=8)
    plt.yticks(range(len(spearman.index)), spearman.index, fontsize=8)
    plt.colorbar(label='Spearman')
    plt.tight_layout()
    plt.savefig(ANALYSIS_DIR / "feature_correlation_heatmap.png", dpi=200)
    plt.close()

    feat_df.to_csv(ANALYSIS_DIR / "seq_score_correlations.csv", index=False)
    print("Correlation outputs saved.")


Correlation outputs saved.


In [10]:
# Cell J: Leu constraints + effective_score

try:
    feat_df = pd.read_csv(ANALYSIS_DIR / "seq_score_correlations.csv")
except pd.errors.EmptyDataError:
    feat_df = pd.DataFrame(columns=["binder_id","docking_score","frac_L","count_L","count_LL","hydropathy","net_charge","frac_G","frac_P","frac_aromatic"])

if feat_df.empty:
    seed_top = pd.DataFrame(columns=["binder_id","docking_score","effective_score"])
else:
    # Hard constraints
    valid = feat_df[(feat_df["count_L"] <= CONFIG["max_L"]) & (feat_df["count_LL"] <= CONFIG["max_LL"])].copy()

    # Effective score with soft penalty
    valid["effective_score"] = valid["docking_score"] + CONFIG["L_penalty_alpha"] * np.maximum(0, valid["frac_L"] - CONFIG["L_target"])

    valid = valid.sort_values("effective_score")
    seed_top = valid.head(CONFIG["seed_topk"]).copy()

seed_top.to_csv(ANALYSIS_DIR / "seed_top10.csv", index=False)
print("Selected", len(seed_top), "seed binders")


Selected 2 seed binders


In [11]:
# Cell K: Iterative mutation optimization (Top10 -> Global Top100)
MUT_DIR = OUTDIR / "mutation"
MUT_DIR.mkdir(parents=True, exist_ok=True)

amino_acids = list("ACDEFGHIKLMNPQRSTVWY")

# PyRosetta helpers

def mutate_and_score(base_pdb, new_seq, receptor_chains=("A","B")):
    ensure_pyrosetta()
    import pyrosetta
    from pyrosetta import rosetta

    pose = pyrosetta.pose_from_pdb(base_pdb)
    ensure_fullatom(pose)
    binder_chain = infer_binder_chain_id(pose, receptor_chains)
    if binder_chain is None:
        return None
    partners = get_partners(pose, binder_chain)

    # Identify binder chain indices
    chain_index = None
    for i in range(1, pose.num_chains()+1):
        ch = pose.pdb_info().chain(pose.chain_begin(i))
        if ch == binder_chain:
            chain_index = i
            break
    if chain_index is None:
        return None
    start = pose.chain_begin(chain_index)
    end = pose.chain_end(chain_index)
    if len(new_seq) != (end - start + 1):
        return None

    # Mutate residues
    for i, aa in enumerate(new_seq):
        resi = start + i
        if pose.residue(resi).name1() != aa:
            mut = rosetta.protocols.simple_moves.MutateResidue(resi, aa)
            mut.apply(pose)

    # Repack binder sidechains only
    scorefxn = rosetta.core.scoring.get_score_function()
    tf = rosetta.core.pack.task.TaskFactory()
    tf.push_back(rosetta.core.pack.task.operation.InitializeFromCommandline())
    tf.push_back(rosetta.core.pack.task.operation.RestrictToRepacking())
    binder_sel = rosetta.core.select.residue_selector.ChainSelector(binder_chain)
    not_binder = rosetta.core.select.residue_selector.NotResidueSelector(binder_sel)
    tf.push_back(rosetta.core.pack.task.operation.OperateOnResidueSubset(
        rosetta.core.pack.task.operation.PreventRepackingRLT(), not_binder))

    packer = rosetta.protocols.minimization_packing.PackRotamersMover(scorefxn)
    packer.task_factory(tf)
    packer.apply(pose)

    # Interface score
    dG = interface_score_pyrosetta(pose, partners)
    return dG

# Seed sequences
if CONFIG["dock_engine"] != "pyrosetta" or seed_top.empty:
    pd.DataFrame(columns=["sequence","effective_score","seed_id","base_pose_path"]).to_csv(MUT_DIR / "top100_scores_fast.csv", index=False)
    (MUT_DIR / "top100.fasta").write_text("")
    pd.DataFrame(columns=["seed_id","iter","sequence","docking_score","effective_score","accepted","base_pose_path"]).to_csv(MUT_DIR / "mutation_trace.csv", index=False)
    print("No seed binders; mutation skipped.")
else:
    seed_ids = seed_top["binder_id"].tolist()
    seed_scores = {row["binder_id"]: row["docking_score"] for _, row in seed_top.iterrows()}
    seed_seq_map = {row["binder_id"]: row.get("sequence", "") for _, row in merged.iterrows()}
    seed_pose_map = {row["binder_id"]: row.get("best_pose_path", "") for _, row in fast_df.iterrows()}

    trace_rows = []
    all_candidates = {}

    for seed_id in seed_ids:
        current_seq = seed_seq_map.get(seed_id, "")
        base_pdb = seed_pose_map.get(seed_id)
        if not current_seq or not base_pdb:
            continue

        # Base score from docking
        base_score = seed_scores.get(seed_id, 0.0)

        for it in range(CONFIG["iters"]):
            props = []
            for _ in range(CONFIG["proposals_per_iter"]*2):
                i = random.randrange(len(current_seq))
                aa = random.choice(amino_acids)
                if aa == 'L':
                    continue  # avoid introducing new Leu by default
                new_seq = current_seq[:i] + aa + current_seq[i+1:]
                props.append(new_seq)
                if len(props) >= CONFIG["proposals_per_iter"]:
                    break

            for pseq in props:
                feats = seq_features(pseq)
                if feats["count_L"] > CONFIG["max_L"] or feats["count_LL"] > CONFIG["max_LL"]:
                    continue
                dG = mutate_and_score(base_pdb, pseq)
                if dG is None:
                    continue
                eff = dG + CONFIG["L_penalty_alpha"] * max(0, feats["frac_L"] - CONFIG["L_target"])
                trace_rows.append({
                    "seed_id": seed_id,
                    "iter": it,
                    "sequence": pseq,
                    "docking_score": dG,
                    "effective_score": eff,
                    "accepted": False,
                    "base_pose_path": base_pdb,
                })
                if eff < base_score:
                    current_seq = pseq
                    base_score = eff
                    trace_rows[-1]["accepted"] = True
                    all_candidates[pseq] = (eff, seed_id, base_pdb)

        # keep final sequence
        all_candidates[current_seq] = (base_score, seed_id, base_pdb)

    # Build Top100
    cand_rows = []
    for seq, (eff, seed_id, base_pdb) in all_candidates.items():
        cand_rows.append({"sequence": seq, "effective_score": eff, "seed_id": seed_id, "base_pose_path": base_pdb})

    if cand_rows:
        cand_df = pd.DataFrame(cand_rows).sort_values("effective_score").drop_duplicates("sequence")
    else:
        cand_df = pd.DataFrame(columns=["sequence","effective_score","seed_id","base_pose_path"])

    cand_df.head(CONFIG["keep_global"]).to_csv(MUT_DIR / "top100_scores_fast.csv", index=False)

    with open(MUT_DIR / "top100.fasta", "w") as f:
        for i, row in cand_df.head(CONFIG["keep_global"]).iterrows():
            f.write(f">mut_{i}\n{row['sequence']}\n")

    pd.DataFrame(trace_rows).to_csv(MUT_DIR / "mutation_trace.csv", index=False)
    print("Mutation optimization completed.")


No seed binders; mutation skipped.


In [12]:
# Cell L: Top100 refined docking (high sampling)
FINAL_DIR = OUTDIR / "final"
FINAL_DIR.mkdir(parents=True, exist_ok=True)

try:
    mut_df = pd.read_csv(MUT_DIR / "top100_scores_fast.csv")
except pd.errors.EmptyDataError:
    mut_df = pd.DataFrame(columns=["sequence","effective_score","seed_id","base_pose_path"])

if mut_df.empty:
    # Fallback: use fast docking scores if mutation/docking skipped
    try:
        fast_df = pd.read_csv(DOCK_DIR / "docking_scores_fast.csv")
    except pd.errors.EmptyDataError:
        fast_df = pd.DataFrame(columns=["binder_id","docking_score","best_pose_path"])

    if not fast_df.empty:
        seq_df = sim_df[["binder_id","sequence"]].drop_duplicates()
        final_df = fast_df.merge(seq_df, on="binder_id", how="left")
        final_df = final_df.rename(columns={"docking_score": "final_docking_score"})
        if "effective_score" not in final_df.columns:
            final_df["effective_score"] = np.nan
        final_df.to_csv(FINAL_DIR / "final_scores.csv", index=False)
        print("Final docking fallback: using fast docking scores.")
    else:
        pd.DataFrame(columns=["binder_id","final_docking_score","best_pose_path","sequence","effective_score"]).to_csv(FINAL_DIR / "final_scores.csv", index=False)
        print("No candidates for final docking.")
else:
    rows = []
    if CONFIG["dock_engine"] == "contact_proxy":
        for idx, row in mut_df.head(CONFIG["max_final_dock"]).iterrows():
            base_pdb = row.get("base_pose_path", "")
            if not base_pdb or not os.path.exists(base_pdb):
                continue
            score = contact_score(base_pdb, cutoff=CONFIG["contact_cutoff"])
            rows.append({
                "binder_id": f"mut_{idx}",
                "final_docking_score": score,
                "best_pose_path": base_pdb,
                "sequence": row.get("sequence", ""),
                "effective_score": row.get("effective_score", None),
            })
    else:
        for idx, row in mut_df.head(CONFIG["max_final_dock"]).iterrows():
            seq = row.get("sequence", "")
            base_pdb = row.get("base_pose_path", "")
            if not base_pdb or not os.path.exists(base_pdb):
                continue

            ensure_pyrosetta()
            import pyrosetta
            pose = pyrosetta.pose_from_pdb(base_pdb)
            ensure_fullatom(pose)
            binder_chain = infer_binder_chain_id(pose)
            if binder_chain is None:
                continue
            partners = get_partners(pose, binder_chain)
            score, best_pose = dock_pose_pyrosetta(pose, partners, CONFIG["decoys_final"])
            best_pose_path = str(FINAL_DIR / f"{idx}_best_final.pdb")
            if best_pose is not None:
                best_pose.dump_pdb(best_pose_path)
            rows.append({
                "binder_id": f"mut_{idx}",
                "final_docking_score": score,
                "best_pose_path": best_pose_path,
                "sequence": seq,
                "effective_score": row.get("effective_score", None),
            })

    final_df = pd.DataFrame(rows)
    final_df.to_csv(FINAL_DIR / "final_scores.csv", index=False)
    print("Final docking completed.")


Final docking fallback: using fast docking scores.


In [13]:
# Cell M: Top10 interaction visualization (PPI)
VIZ_DIR = OUTDIR / "viz"
(VIZ_DIR / "top10_interactions").mkdir(parents=True, exist_ok=True)
(VIZ_DIR / "top10_pymol").mkdir(parents=True, exist_ok=True)
(VIZ_DIR / "top10_figures").mkdir(parents=True, exist_ok=True)

try:
    final_df = pd.read_csv(FINAL_DIR / "final_scores.csv")
except pd.errors.EmptyDataError:
    final_df = pd.DataFrame(columns=["binder_id","best_pose_path"])


def compute_contacts_from_pdb(pdb_path, receptor_chains=("A","B"), cutoff=4.5):
    parser = PDBParser(QUIET=True)
    structure = parser.get_structure("cplx", pdb_path)
    # Infer binder chain
    binder_chain = None
    for ch in structure[0]:
        if ch.id not in receptor_chains:
            binder_chain = ch.id
            break
    if binder_chain is None:
        # fallback: shortest chain
        chain_lens = {ch.id: sum(1 for r in ch if r.id[0]==" ") for ch in structure[0]}
        if chain_lens:
            binder_chain = min(chain_lens, key=chain_lens.get)
    if binder_chain is None:
        return [], [], []

    rec_atoms = []
    for ch in receptor_chains:
        if ch in structure[0]:
            rec_atoms += [a for a in structure[0][ch].get_atoms() if a.get_parent().id[0] == " "]
    lig_atoms = [a for a in structure[0][binder_chain].get_atoms() if a.get_parent().id[0] == " "]

    ns = NeighborSearch(rec_atoms)
    contacts = []
    rec_res = set()
    lig_res = set()
    for a in lig_atoms:
        close = ns.search(a.coord, cutoff)
        for b in close:
            ra = b.get_parent()
            la = a.get_parent()
            rec_res.add((ra.get_parent().id, ra.id[1]))
            lig_res.add((la.get_parent().id, la.id[1]))
            contacts.append((
                (ra.get_parent().id, ra.id[1], b.get_name()),
                (la.get_parent().id, la.id[1], a.get_name()),
                float(a - b)
            ))
    # sort by distance
    contacts = sorted(contacts, key=lambda x: x[2])
    return sorted(rec_res), sorted(lig_res), contacts[:20]


def write_pymol_script(pdb_path, out_pml, receptor_chains=("A","B"), contacts=None):
    # Simple PyMOL script for interface visualization
    script = [f"load {pdb_path}", "remove solvent", "hide everything"]
    script.append(f"select receptor, chain {'+'.join(receptor_chains)}")
    script.append(f"select binder, not (chain {'+'.join(receptor_chains)})")
    script.append("show cartoon, receptor")
    script.append("show cartoon, binder")
    script.append("color cyan, receptor")
    script.append("color orange, binder")
    script.append("set cartoon_transparency, 0.2, receptor")
    script.append("set cartoon_transparency, 0.0, binder")

    # Draw distances for top contacts
    if contacts:
        for i, (rinfo, linfo, dist) in enumerate(contacts):
            rch, rres, ratom = rinfo
            lch, lres, latom = linfo
            script.append(f"distance d{i}, (chain {rch} and resi {rres} and name {ratom}), (chain {lch} and resi {lres} and name {latom})")
    script.append(f"png {str(Path(out_pml).with_suffix('.png'))}, dpi=300, ray=1")

    Path(out_pml).write_text("\n".join(script) + "\n")

if final_df.empty:
    print("No final docking results; skipping visualization.")
else:
    # Make top10
    for _, row in final_df.head(10).iterrows():
        pdb_path = row.get("best_pose_path")
        if not pdb_path or not os.path.exists(str(pdb_path)):
            continue
        rec_res, lig_res, contacts = compute_contacts_from_pdb(pdb_path, cutoff=CONFIG["contact_cutoff"])
        out_json = VIZ_DIR / "top10_interactions" / f"{row['binder_id']}.json"
        out_json.write_text(json.dumps({
            "receptor_residues": rec_res,
            "binder_residues": lig_res,
            "top_contacts": contacts,
            "pdb_path": pdb_path
        }, indent=2))

        # PyMOL script
        out_pml = VIZ_DIR / "top10_pymol" / f"{row['binder_id']}.pml"
        write_pymol_script(pdb_path, out_pml, contacts=contacts)

        # Optional PNG via PyMOL
        fig_path = VIZ_DIR / "top10_figures" / f"{row['binder_id']}.png"
        if check_tool(CONFIG["pymol_exe"]):
            cmd = [CONFIG["pymol_exe"], "-cq", str(out_pml)]
            try:
                run(cmd)
            except Exception:
                pass

        # Fallback: simple contact-distance plot
        if not fig_path.exists():
            plt.figure(figsize=(4,3))
            if contacts:
                dists = [c[2] for c in contacts]
                plt.plot(range(1, len(dists)+1), dists, marker='o')
                plt.xlabel("Contact rank")
                plt.ylabel("Distance (A)")
            else:
                plt.text(0.5, 0.5, "No contacts", ha="center", va="center")
                plt.axis("off")
            plt.tight_layout()
            plt.savefig(fig_path, dpi=200)
            plt.close()

    print("Visualization assets prepared.")


+ pymol -cq /home/xuchengjie/Program/BindCraft/outputs/viz/top10_pymol/CDK4_CyclinD1_l50_s961754.pml


+ pymol -cq /home/xuchengjie/Program/BindCraft/outputs/viz/top10_pymol/CDK4_CyclinD1_l50_s511797.pml


Visualization assets prepared.


In [14]:
# Summary cell: Final report
REPORT = OUTDIR / "REPORT.md"

try:
    final_df = pd.read_csv(FINAL_DIR / "final_scores.csv")
except pd.errors.EmptyDataError:
    final_df = pd.DataFrame(columns=["binder_id","final_docking_score","effective_score","sequence"])

# Top10 summary table
summary_cols = ["binder_id", "final_docking_score", "effective_score", "sequence"]
if not final_df.empty:
    print(final_df.head(10)[summary_cols])
else:
    print("No final docking scores available.")

# Write report
report_lines = []
report_lines.append("# CDK4-CyclinD1 Binder Pipeline Report")
report_lines.append("")
report_lines.append("## Parameters")
report_lines.append("```json")
report_lines.append(json.dumps(CONFIG, indent=2))
report_lines.append("```")
report_lines.append("")
report_lines.append("## Outputs")
report_lines.append(f"- Clean PDBs: {CLEAN_DIR}")
report_lines.append(f"- Hotspots: {HOTSPOT_DIR}")
report_lines.append(f"- BindCraft: {BINDCRAFT_DIR}")
report_lines.append(f"- Filter: {FILTER_DIR}")
report_lines.append(f"- Docking: {DOCK_DIR}")
report_lines.append(f"- Analysis: {ANALYSIS_DIR}")
report_lines.append(f"- Mutation: {MUT_DIR}")
report_lines.append(f"- Final: {FINAL_DIR}")
report_lines.append(f"- Viz: {VIZ_DIR}")

notes = []
if "smoke_2stage" in str(CONFIG.get("bindcraft_advanced", "")):
    notes.append("- BindCraft used smoke_2stage settings for resource-limited execution.")
if CONFIG.get("bindcraft_trim_on_oom"):
    notes.append(f"- Target trimmed around hotspots (radius={CONFIG.get('trim_radius')}) on OOM fallback.")
if CONFIG.get("dock_engine") == "contact_proxy":
    notes.append("- Docking used contact-count proxy scoring (PyRosetta disabled due to stability).")
if notes:
    report_lines.append("")
    report_lines.append("## Notes")
    report_lines.extend(notes)

if final_df.empty:
    report_lines.append("")
    report_lines.append("## Results")
    report_lines.append("- No final docking results were produced.")

REPORT.write_text("\n".join(report_lines))

# Print output tree (key files)
key_files = [
    CLEAN_DIR / "6p8f_ABC.pdb",
    CLEAN_DIR / "6p8h_ABC.pdb",
    CLEAN_DIR / "2w9z_AB.pdb",
    HOTSPOT_DIR / "hotspots_union_2w9z.txt",
    BINDCRAFT_DIR / "designs.fasta",
    FILTER_DIR / "similarity_metrics.csv",
    FILTER_DIR / "kept.fasta",
    DOCK_DIR / "docking_scores_fast.csv",
    ANALYSIS_DIR / "seq_score_correlations.csv",
    MUT_DIR / "mutation_trace.csv",
    MUT_DIR / "top100.fasta",
    FINAL_DIR / "final_scores.csv",
    REPORT,
]
print("\nKey outputs:")
for p in key_files:
    print("-", p)


                   binder_id  final_docking_score  effective_score  \
0  CDK4_CyclinD1_l50_s961754               -905.0              NaN   
1  CDK4_CyclinD1_l50_s511797               -834.0              NaN   

                                            sequence  
0  MQRVKEVRRQIRNYRETIDGIGIWMTRDKFMDRVWEIMVEVFTRME...  
1  PKLSDFDRHMDRMRTPAQVRESNAHMSKYAKRHFTPKEVKQWEAPD...  

Key outputs:
- /home/xuchengjie/Program/BindCraft/outputs/clean/6p8f_ABC.pdb
- /home/xuchengjie/Program/BindCraft/outputs/clean/6p8h_ABC.pdb
- /home/xuchengjie/Program/BindCraft/outputs/clean/2w9z_AB.pdb
- /home/xuchengjie/Program/BindCraft/outputs/hotspots/hotspots_union_2w9z.txt
- /home/xuchengjie/Program/BindCraft/outputs/bindcraft/designs.fasta
- /home/xuchengjie/Program/BindCraft/outputs/filter/similarity_metrics.csv
- /home/xuchengjie/Program/BindCraft/outputs/filter/kept.fasta
- /home/xuchengjie/Program/BindCraft/outputs/docking/docking_scores_fast.csv
- /home/xuchengjie/Program/BindCraft/outputs/analysis/seq