# ProteinMCP — Nanobody Design Workflow

Design nanobody CDR regions using BoltzGen with optimized cysteine filtering for single-domain antibodies (VHH).

| Component | Description |
|-----------|-------------|
| **BoltzGen** | Generative model for nanobody CDR loop design with structure prediction |
| **Async Jobs** | GPU-accelerated design with asynchronous job submission and monitoring |
| **Quality Metrics** | pTM, iPTM, pAE, H-bonds, delta SASA, cysteine filtering |

**Prerequisites:** Docker (with GPU support), Claude Code CLI, ProteinMCP installed locally.

**Links:** [GitHub](https://github.com/charlesxu90/ProteinMCP) · [BoltzGen](https://github.com/jwohlwend/boltzgen) · [Boltz2](https://github.com/jwohlwend/boltz)

---

## Job configs

In [None]:
# ── User Configuration ──
TARGET_NAME = "penguinpox"
TARGET_CIF_NAME = "9bkq-assembly2.cif"  # Target CIF file name
TARGET_CHAIN = "B"                       # Chain to design nanobody against
NUM_DESIGNS = 10                          # Number of nanobody designs to generate
BUDGET = 2                                # Size of the final diversity-optimized design set (BoltzGen --budget)

ANTHROPIC_API_KEY = ""  # Get one at https://console.anthropic.com/
CLAUDE_MODEL = "claude-haiku-4-5-20251001"

## Import utility and define functions

In [None]:
import os
import subprocess
import json
import select

# ---------- Streaming command runner ----------
def run_cmd(cmd, cwd=None):
    """Run a shell command and stream stdout/stderr line-by-line in real time."""
    proc = subprocess.Popen(
        cmd, shell=True, cwd=cwd,
        stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
        bufsize=1, text=True,
    )
    for line in proc.stdout:
        print(line, end="", flush=True)
    proc.wait()
    if proc.returncode != 0:
        print(f"\n\u26a0\ufe0f  Command exited with code {proc.returncode}")
    return proc.returncode

# ---------- Claude streaming helper ----------
def _display_claude_line(line):
    """Parse a single stream-json line from Claude CLI and print progress."""
    if not line.strip():
        return
    try:
        data = json.loads(line)
        msg_type = data.get('type', '')
        subtype = data.get('subtype', '')

        if msg_type == 'system':
            if subtype == 'init':
                session_id = data.get('session_id', '')[:8]
                print(f"  \U0001f916 Session started: {session_id}...", flush=True)
            elif subtype != 'transcript':
                print(f"  \u2699\ufe0f  System: {subtype}", flush=True)

        elif msg_type == 'assistant':
            message = data.get('message', {})
            for block in message.get('content', []):
                block_type = block.get('type', '')
                if block_type == 'thinking':
                    text = block.get('thinking', '')[:100]
                    print(f"  \U0001f4ad Thinking: {text}...", flush=True)
                elif block_type == 'text':
                    lines = block.get('text', '').strip().split('\n')
                    for tl in lines[:5]:
                        if tl.strip():
                            print(f"  {tl}", flush=True)
                    if len(lines) > 5:
                        print(f"  ... ({len(lines) - 5} more lines)", flush=True)
                elif block_type == 'tool_use':
                    tool_name = block.get('name', 'unknown')
                    tool_input = block.get('input', {})
                    if tool_name == 'Bash':
                        print(f"  \U0001f527 Bash: {tool_input.get('command', '')[:80]}", flush=True)
                    elif tool_name in ('Read', 'Write', 'Edit'):
                        print(f"  \U0001f4d6 {tool_name}: {tool_input.get('file_path', '')}", flush=True)
                    elif tool_name.startswith('mcp__'):
                        print(f"  \U0001f50c MCP: {tool_name}", flush=True)
                    else:
                        print(f"  \U0001f527 {tool_name}", flush=True)

        elif msg_type == 'user':
            for block in data.get('message', {}).get('content', []):
                if block.get('type') == 'tool_result':
                    if block.get('is_error', False):
                        err = block.get('content', '')
                        err = err[:100] if isinstance(err, str) else str(err)[:100]
                        print(f"  \u274c Error: {err}", flush=True)
                    else:
                        content = block.get('content', '')
                        if isinstance(content, str) and content.strip():
                            first = content.strip().split('\n')[0][:80]
                            if first:
                                print(f"  \u2705 Result: {first}", flush=True)
                        else:
                            print(f"  \u2705 Done", flush=True)

        elif msg_type == 'result':
            if subtype == 'success':
                print(f"  \u2705 Completed successfully", flush=True)
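            elif subtype.startswith('error'):
                # Failure subtypes carry a suffix (e.g. 'error_max_turns'), so match on the prefix
                print(f"  \u274c Error ({subtype}): {data.get('error', 'Unknown')}", flush=True)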

    except json.JSONDecodeError:
        if line.strip():
            print(f"  {line}", flush=True)


def run_claude(prompt, allowed_tools=None, cwd=None):
    """Run Claude CLI with real-time streaming output.

    Args:
        prompt: The prompt text to send to Claude (passed via stdin).
        allowed_tools: Comma-separated tool names, e.g. "Bash,Read,Write".
        cwd: Working directory for the claude process.

    Returns:
        Process return code (0 = success).
    """
    cmd = [
        "claude",
        "--model", CLAUDE_MODEL,
        "-p", "-",
        "--output-format", "stream-json",
        "--verbose",
        "--dangerously-skip-permissions",
    ]
    if allowed_tools:
        cmd += ["--allowedTools", allowed_tools]

    print(f"  \U0001f916 Claude model: {CLAUDE_MODEL}")
    print(f"  \U0001f4cb Tools: {allowed_tools or 'all'}")
    print(f"  " + "-" * 58)

    proc = subprocess.Popen(
        cmd, cwd=cwd,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True, bufsize=1,
    )
    proc.stdin.write(prompt)
    proc.stdin.close()

    while True:
        if proc.poll() is not None:
            # Drain remaining output
            for line in (proc.stdout.read() or '').split('\n'):
                _display_claude_line(line)
            for line in (proc.stderr.read() or '').split('\n'):
                if line.strip():
                    print(f"  \u2699\ufe0f  {line}", flush=True)
            break

        try:
            readable, _, _ = select.select([proc.stdout, proc.stderr], [], [], 0.1)
        except (ValueError, OSError):
            break

        for stream in readable:
            line = stream.readline()
            if line:
                if stream == proc.stdout:
                    _display_claude_line(line.rstrip('\n'))
                else:
                    if line.strip():
                        print(f"  \u2699\ufe0f  {line.rstrip()}", flush=True)

    rc = proc.wait()
    print(f"  " + "-" * 58)
    if rc != 0:
        print(f"  \u26a0\ufe0f  Claude exited with code {rc}")
    return rc

## Setup and verify the environment

In [None]:
import shutil

# ---------- Paths ----------
# Auto-detect REPO_DIR: if running from notebooks/, use its parent; otherwise use the current directory
_nb_dir = os.path.abspath("")
if os.path.basename(_nb_dir) == "notebooks":
    REPO_DIR = os.path.dirname(_nb_dir)
else:
    REPO_DIR = _nb_dir

# ---------- Load API key from .env if not set ----------
if not ANTHROPIC_API_KEY:
    _env_file = os.path.join(REPO_DIR, ".env")
    if os.path.exists(_env_file):
        with open(_env_file) as f:
            for line in f:
                line = line.strip()
                if line.startswith("ANTHROPIC_API_KEY="):
                    ANTHROPIC_API_KEY = line.split("=", 1)[1].strip().strip("\"'")
                    break
        if ANTHROPIC_API_KEY:
            print(f"Loaded ANTHROPIC_API_KEY from {_env_file}")

if not ANTHROPIC_API_KEY:
    raise ValueError("ANTHROPIC_API_KEY is required. Set it above or add to .env in REPO_DIR.")
os.environ["ANTHROPIC_API_KEY"] = ANTHROPIC_API_KEY

# ---------- Data paths ----------
EXAMPLE_DIR = os.path.join(REPO_DIR, "examples", "case3_nanobody_design")
DATA_DIR    = os.path.join(REPO_DIR, "data", TARGET_NAME)
RESULTS_DIR = os.path.join(REPO_DIR, "results", f"{TARGET_NAME}_nanobody")
TARGET_CIF  = os.path.join(DATA_DIR, TARGET_CIF_NAME)
SCAFFOLD_DIR = os.path.join(DATA_DIR, "nanobody_scaffolds")

os.makedirs(DATA_DIR, exist_ok=True)
os.makedirs(RESULTS_DIR, exist_ok=True)
os.makedirs(os.path.join(RESULTS_DIR, "designs"), exist_ok=True)
os.makedirs(os.path.join(RESULTS_DIR, "logs"), exist_ok=True)
os.makedirs(SCAFFOLD_DIR, exist_ok=True)

# Copy example CIF into DATA_DIR
src_cif = os.path.join(EXAMPLE_DIR, TARGET_CIF_NAME)
if os.path.exists(src_cif) and not os.path.exists(TARGET_CIF):
    shutil.copy2(src_cif, TARGET_CIF)
    print(f"Copied {src_cif} -> {TARGET_CIF}")

# Copy example nanobody scaffolds into DATA_DIR
src_scaffold_dir = os.path.join(EXAMPLE_DIR, "nanobody_scaffolds")
if os.path.isdir(src_scaffold_dir):
    for fname in os.listdir(src_scaffold_dir):
        src = os.path.join(src_scaffold_dir, fname)
        dst = os.path.join(SCAFFOLD_DIR, fname)
        if os.path.isfile(src) and not os.path.exists(dst):
            shutil.copy2(src, dst)
    scaffold_files = [f for f in os.listdir(SCAFFOLD_DIR) if f.endswith(".yaml")]
    print(f"{len(scaffold_files)} scaffold YAML files available in {SCAFFOLD_DIR}")

assert os.path.exists(TARGET_CIF), f"Target CIF not found: {TARGET_CIF}"

# List scaffold files for config generation
SCAFFOLD_YAMLS = sorted(
    [os.path.join(SCAFFOLD_DIR, f) for f in os.listdir(SCAFFOLD_DIR) if f.endswith(".yaml")]
)

print(f"\nCLAUDE_MODEL     : {CLAUDE_MODEL}")
print(f"TARGET_NAME      : {TARGET_NAME}")
print(f"TARGET_CIF       : {TARGET_CIF}")
print(f"TARGET_CHAIN     : {TARGET_CHAIN}")
print(f"NUM_DESIGNS      : {NUM_DESIGNS}")
print(f"BUDGET           : {BUDGET}")
print(f"REPO_DIR         : {REPO_DIR}")
print(f"DATA_DIR         : {DATA_DIR}")
print(f"RESULTS_DIR      : {RESULTS_DIR}")
print(f"SCAFFOLD_YAMLS   : {[os.path.basename(f) for f in SCAFFOLD_YAMLS]}")

## Install & Register MCPs

In [None]:
import time
_t0 = time.time()

# Install ProteinMCP if not already present
if run_cmd("which pmcp") != 0:
    # Quote the paths so a REPO_DIR containing spaces survives the shell
    run_cmd(f'pip install -e "{REPO_DIR}"')
    run_cmd(f'pip install -r "{REPO_DIR}/requirements.txt"')
else:
    print("ProteinMCP already installed.")

# Install Claude Code if not already present
if run_cmd("which claude") != 0:
    run_cmd("npm install -g @anthropic-ai/claude-code")
else:
    print("Claude Code already installed.")

# Verify Docker is available (required for boltzgen_mcp)
if run_cmd("docker --version") != 0:
    print("WARNING: Docker not found. boltzgen_mcp requires Docker with GPU support.")
else:
    print("Docker found.")

# Set API key for Claude Code
os.environ["ANTHROPIC_API_KEY"] = ANTHROPIC_API_KEY
print(f"\nProteinMCP & Claude Code ready.")
print(f"ANTHROPIC_API_KEY set (ends with ...{ANTHROPIC_API_KEY[-4:]})")
print(f"Elapsed: {time.time() - _t0:.1f}s")

In [None]:
import time
_t0 = time.time()

# Install and register boltzgen_mcp (Docker-based, may pull large image)
print("Installing boltzgen_mcp (this may take a while for the Docker image pull)...")
run_cmd("pmcp install boltzgen_mcp", cwd=REPO_DIR)

# Verify MCP status
print(f"\n{'='*50}")
run_cmd("pmcp status", cwd=REPO_DIR)
print(f"\nElapsed: {time.time() - _t0:.1f}s")

## Step 1 — Prepare BoltzGen Configuration

In [None]:
import time
_t0 = time.time()

# Build scaffold path list for the prompt
scaffold_list = "\n".join(f"        - {p}" for p in SCAFFOLD_YAMLS)

prompt = f"""\
I want to design nanobodies targeting the protein structure at {TARGET_CIF}, \
chain {TARGET_CHAIN}. Please create a BoltzGen configuration file at \
{RESULTS_DIR}/config.yaml.

The configuration should use this exact YAML structure:

entities:
  - file:
      path: {TARGET_CIF}
      include:
        - chain:
            id: {TARGET_CHAIN}

  - file:
      path:
{scaffold_list}

Write this YAML content to {RESULTS_DIR}/config.yaml.
Please convert relative paths to absolute paths before writing.
"""

run_claude(
    prompt,
    allowed_tools="Bash,Read,Write",
    cwd=REPO_DIR,
)

# Verify config was created
config_path = os.path.join(RESULTS_DIR, "config.yaml")
if os.path.exists(config_path):
    with open(config_path) as f:
        print(f"\nConfig generated: {config_path}")
        print(f.read())
else:
    print(f"\nWARNING: Config file not found at {config_path}")

print(f"Elapsed: {time.time() - _t0:.1f}s")
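
The YAML structure in the prompt above is fixed, so it can also be rendered directly in Python when an LLM round trip is not wanted. The following is a minimal sketch under that assumption; `build_boltzgen_config` is a hypothetical helper, not part of ProteinMCP or BoltzGen, and it only reproduces the layout shown in the prompt.

```python
import os

def build_boltzgen_config(target_cif, target_chain, scaffold_yamls):
    """Render the config text shown in the prompt, with absolute paths.

    Hypothetical helper: the layout is copied from the prompt template,
    not taken from BoltzGen's schema documentation.
    """
    lines = [
        "entities:",
        "  - file:",
        f"      path: {os.path.abspath(target_cif)}",
        "      include:",
        "        - chain:",
        f"            id: {target_chain}",
        "",
        "  - file:",
        "      path:",
    ]
    # One list item per scaffold YAML, indented like scaffold_list in the cell above
    lines += [f"        - {os.path.abspath(p)}" for p in scaffold_yamls]
    return "\n".join(lines) + "\n"
```

In the notebook this could be called as `build_boltzgen_config(TARGET_CIF, TARGET_CHAIN, SCAFFOLD_YAMLS)` and the result written to `config.yaml`; validating it in Step 2 is still worthwhile.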

## Step 2 — Validate Configuration

In [None]:
import time
_t0 = time.time()

prompt = f"""\
Can you validate the BoltzGen configuration at {RESULTS_DIR}/config.yaml \
using the boltzgen_mcp server? Please use verbose mode for detailed output.
Please convert relative paths to absolute paths before calling the MCP server.
"""

run_claude(
    prompt,
    allowed_tools="mcp__boltzgen_mcp__validate_config,Bash,Read",
    cwd=REPO_DIR,
)

print(f"\nElapsed: {time.time() - _t0:.1f}s")

## Step 3 — Submit Nanobody Design Job

In [None]:
import time, re
_t0 = time.time()

prompt = f"""\
Can you submit a nanobody design job using BoltzGen with the configuration \
at {RESULTS_DIR}/config.yaml? Use the nanobody-anything protocol with \
{NUM_DESIGNS} designs and budget {BUDGET}. Save the outputs to \
{RESULTS_DIR}/designs/.
Please convert relative paths to absolute paths before calling the MCP server.
After submitting, print the job_id so I can monitor progress.
"""

# Capture Claude output to extract job_id
proc = subprocess.Popen(
    ["claude", "--model", CLAUDE_MODEL, "-p", "-",
     "--output-format", "stream-json", "--verbose",
     "--dangerously-skip-permissions",
     "--allowedTools", "mcp__boltzgen_mcp__submit_generic_boltzgen,Bash,Read,Write"],
    cwd=REPO_DIR,
    # Merge stderr into stdout: an unread stderr pipe could otherwise fill up and block the process
    stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
    text=True, bufsize=1,
)
proc.stdin.write(prompt)
proc.stdin.close()

full_output = ""
for line in proc.stdout:
    full_output += line
    _display_claude_line(line.rstrip('\n'))
proc.wait()

# Try to extract job_id from output
job_id = None
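# Allow backslashes before quotes: tool results are nested inside stream-json strings, so quotes arrive escaped
for match in re.finditer(r'job_id\\*["\']?\s*[:=]\s*\\*["\']?([a-zA-Z0-9_-]+)', full_output):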
    job_id = match.group(1)
if not job_id:
    # Try UUID pattern
    for match in re.finditer(r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}', full_output):
        job_id = match.group(0)

if job_id:
    print(f"\nCaptured job_id: {job_id}")
else:
    print("\nWARNING: Could not extract job_id from output.")
    print("You can manually set it: job_id = 'your-job-id-here'")

print(f"Elapsed: {time.time() - _t0:.1f}s")
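
The job ID lookup can be factored into a small function that is easy to test on sample strings. This is a sketch, assuming the ID appears either as a `job_id` key/value pair (possibly with backslash-escaped quotes, as happens when JSON is nested inside a stream-json string) or as a bare UUID; `extract_job_id` is a hypothetical name and the actual output format of `boltzgen_mcp` may differ.

```python
import re

# Optional backslashes before the quotes cover JSON that is embedded in a JSON string
_JOB_ID_RE = re.compile(r'job_id\\*["\']?\s*[:=]\s*\\*["\']?([A-Za-z0-9_-]+)')
_UUID_RE = re.compile(r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}')

def extract_job_id(text):
    """Return the last job_id-like value in text, falling back to the last UUID."""
    matches = _JOB_ID_RE.findall(text)
    if matches:
        return matches[-1]
    uuids = _UUID_RE.findall(text)
    return uuids[-1] if uuids else None
```

Taking the last match mirrors the loop in the cell above, which overwrites `job_id` on every hit.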

## Step 4 — Monitor Job Progress

In [None]:
import time
_t0 = time.time()

if not job_id:
    print("No job_id set. Please set it manually: job_id = 'your-job-id'")
else:
    prompt = f"""\
Can you check the status of my BoltzGen nanobody design job with ID {job_id}? \
Also show me the recent log output.
"""

    run_claude(
        prompt,
        allowed_tools="mcp__boltzgen_mcp__get_job_status,mcp__boltzgen_mcp__get_job_log,Bash,Read",
        cwd=REPO_DIR,
    )

print(f"\nElapsed: {time.time() - _t0:.1f}s")
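
The cell above checks the job once. For unattended runs, a polling loop with a timeout is a common pattern; the sketch below is one way to write it. `poll_until_done`, the `check_status` callback, and the status names in `TERMINAL_STATES` are all assumptions for illustration, since the status strings returned by `boltzgen_mcp` are not documented here.

```python
import time

# Assumed terminal status names; adjust to whatever the job server reports
TERMINAL_STATES = {"completed", "failed", "cancelled"}

def poll_until_done(check_status, interval=30.0, timeout=3600.0, sleep=time.sleep):
    """Call check_status() until it returns a terminal state or the timeout elapses.

    check_status is any zero-argument callable returning a status string.
    sleep is injectable so the loop can be tested without waiting.
    """
    waited = 0.0
    while True:
        status = check_status()
        if status in TERMINAL_STATES:
            return status
        if waited >= timeout:
            raise TimeoutError(f"Job still '{status}' after {waited:.0f}s")
        sleep(interval)
        waited += interval
```

Each `check_status` call would wrap whatever status query is available, so a long interval keeps the number of Claude invocations low.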

## Step 5 — Retrieve Results

In [None]:
import time
_t0 = time.time()

if not job_id:
    print("No job_id set. Please set it manually: job_id = 'your-job-id'")
else:
    prompt = f"""\
Can you get the results of my completed BoltzGen nanobody design job with ID {job_id}? \
List all output files in {RESULTS_DIR}/designs/ and print any available \
design quality metrics (pTM, iPTM, pAE, H-bonds).
"""

    run_claude(
        prompt,
        allowed_tools="mcp__boltzgen_mcp__get_job_result,Bash,Read",
        cwd=REPO_DIR,
    )

# List output files
designs_dir = os.path.join(RESULTS_DIR, "designs")
if os.path.isdir(designs_dir):
    pdb_files = [f for f in os.listdir(designs_dir) if f.endswith(".pdb")]
    print(f"\nDesign PDB outputs ({len(pdb_files)} files): {sorted(pdb_files)}")

# Print metrics if available (stop at the first CSV found so it is printed only once)
metrics_csv = None
for name in ["all_designs_metrics.csv", "design_metrics.csv", "metrics.csv"]:
    for search_dir in [
        os.path.join(designs_dir, "final_ranked_designs"),
        designs_dir,
        RESULTS_DIR,
    ]:
        csv_path = os.path.join(search_dir, name)
        if os.path.exists(csv_path):
            metrics_csv = csv_path
            break
    if metrics_csv:
        break

if metrics_csv:
    import pandas as pd
    df = pd.read_csv(metrics_csv)
    print(f"\nMetrics from {os.path.basename(metrics_csv)}:")
    print(df.to_string(index=False))

print(f"\nElapsed: {time.time() - _t0:.1f}s")
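
The metrics lookup tries several file names across several directories. A small helper makes that search order explicit and reusable in later cells. This is a sketch; `find_metrics_csv` is a hypothetical name, and the candidate file names are simply the ones this notebook already probes.

```python
import os

def find_metrics_csv(search_dirs, names):
    """Return the first existing CSV path, or None.

    Names are tried in order, and for each name every directory is tried
    in order, matching the lookup used in this notebook.
    """
    for name in names:
        for directory in search_dirs:
            candidate = os.path.join(directory, name)
            if os.path.isfile(candidate):
                return candidate
    return None
```

Returning a single path (or `None`) lets the caller decide how to load and print the file.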

## Step 6 — Visualize Results

In [None]:
import time
_t0 = time.time()

VIZ_SCRIPT = os.path.join(REPO_DIR, "workflow-skills", "scripts", "nanobody_design_viz.py")

# Install viz deps if needed
run_cmd("pip install -q matplotlib seaborn scipy Pillow pandas")

# Generate separate figures + merged summary
run_cmd(f'python "{VIZ_SCRIPT}" "{RESULTS_DIR}"')

# Display figures inline
from IPython.display import display, Image

# Try merged summary first
figures_dir = os.path.join(RESULTS_DIR, "figures")
summary_png = os.path.join(figures_dir, "nanobody_design_summary.png")
if os.path.exists(summary_png):
    print("\nMerged summary figure:")
    display(Image(filename=summary_png, width=900))

# Also display individual figures
fig_names = [
    "nanobody_design_quality_score.png",
    "nanobody_design_structure_quality.png",
    "nanobody_design_normalized_heatmap.png",
    "nanobody_design_statistics_table.png",
    "nanobody_design_quality_boxplot.png",
    "nanobody_design_interface_metrics.png",
    "nanobody_design_top5_designs.png",
    "nanobody_design_correlation.png",
]
for fname in fig_names:
    fpath = os.path.join(figures_dir, fname)
    if os.path.exists(fpath):
        print(f"\n{fname}:")
        display(Image(filename=fpath, width=500))

print(f"\nElapsed: {time.time() - _t0:.1f}s")

## Step 7 — Download Results

In [None]:
import os

# ---- Summary table ----
metrics_found = False
for name in ["all_designs_metrics.csv", "design_metrics.csv", "metrics.csv"]:
    for search_dir in [
        os.path.join(RESULTS_DIR, "designs", "final_ranked_designs"),
        os.path.join(RESULTS_DIR, "designs"),
        RESULTS_DIR,
    ]:
        csv_path = os.path.join(search_dir, name)
        if os.path.exists(csv_path):
            import pandas as pd
            df = pd.read_csv(csv_path)
            print("=" * 60)
            print("NANOBODY DESIGN RESULTS SUMMARY")
            print("=" * 60)
            print(f"Total designs: {len(df)}")
            if 'pass_filters' in df.columns:
                n_passed = df['pass_filters'].sum()
                print(f"Passed filters: {n_passed} / {len(df)}")
            print(f"\nMetrics ({name}):")
            print(df.to_string(index=False))
            print("=" * 60)
            metrics_found = True
            break
    if metrics_found:
        break

if not metrics_found:
    print("No metrics CSV found. Check RESULTS_DIR for output files.")
    print(f"Contents of {RESULTS_DIR}:")
    for item in sorted(os.listdir(RESULTS_DIR)):
        print(f"  {item}")

# ---- Zip results ----
zip_name = f"{TARGET_NAME}_nanobody_results"
zip_path = os.path.join(REPO_DIR, f"{zip_name}.zip")
run_cmd(f'cd "{os.path.dirname(RESULTS_DIR)}" && zip -r "{zip_path}" "{os.path.basename(RESULTS_DIR)}"')
print(f"\nResults available at: {RESULTS_DIR}")
print(f"Zipped archive: {zip_path}")

---
## Instructions & Troubleshooting

### Workflow Overview

This notebook uses BoltzGen to design nanobody CDR regions through an async job pipeline:

1. **Prepare Config** — Create a YAML configuration with target CIF and nanobody scaffolds
2. **Validate Config** — Verify the configuration is correct before submission
3. **Submit Job** — Launch an async GPU design job (nanobody-anything protocol)
4. **Monitor Progress** — Poll job status and view logs
5. **Retrieve Results** — Download designed nanobody structures and metrics
6. **Visualize** — Generate quality assessment figures
7. **Download** — Package results for sharing

### Input Format

- **Target CIF** — CIF or PDB file with target protein structure
- **Target chain** — Chain ID to design nanobody against (e.g., `"B"`)
- **Nanobody scaffolds** — YAML files defining framework regions (optional, BoltzGen has defaults)
- **Budget** — Number of designs kept in the final diversity-optimized set, so it should not exceed the number of designs generated (typical: 1–5)

### Common Issues

| Problem | Solution |
|---------|---------|
| `boltzgen_mcp` not found | Run `pmcp install boltzgen_mcp` to pull Docker image |
| Job stuck in pending | Check GPU availability and Docker GPU runtime |
| Config validation fails | Verify chain IDs exist in CIF; check file paths are absolute |
| Low quality designs (low pTM/iPTM) | Generate more candidates (raise `NUM_DESIGNS`); try different scaffolds |
| GPU out of memory | Reduce `NUM_DESIGNS`; ensure sufficient GPU memory (~8 GB) |
| Job ID not captured | Manually set `job_id = 'your-id'` from Step 3 output |
| Docker image pull timeout | Re-run `pmcp install boltzgen_mcp` (large image) |
| Scaffold files not found | Scaffolds are optional; BoltzGen has built-in defaults |
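
Several of the issues above come down to a missing executable. A quick preflight check can surface them before any job is submitted. The sketch below uses only `shutil.which`; the tool list is an assumption (`nvidia-smi` presumes an NVIDIA GPU), and `preflight` and `missing_tools` are hypothetical helper names.

```python
import shutil

def preflight(tools=("docker", "claude", "pmcp", "nvidia-smi")):
    """Map each executable name to its resolved path, or None if it is not on PATH."""
    return {tool: shutil.which(tool) for tool in tools}

def missing_tools(report):
    """Return the sorted names of tools that preflight() could not find."""
    return sorted(name for name, path in report.items() if path is None)
```

For example, `missing_tools(preflight())` returns an empty list when everything is on `PATH`. Finding `docker` this way does not prove that the GPU runtime is configured, so a stuck job still needs the Docker-level checks from the table.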

### Quality Thresholds

| Metric | Good | Acceptable | Description |
|--------|------|------------|-------------|
| pTM | ≥0.8 | ≥0.6 | Predicted TM-score (higher is better) |
| iPTM | ≥0.5 | ≥0.3 | Interface pTM score (higher is better) |
| pAE | ≤5 Å | ≤10 Å | Predicted aligned error (lower is better) |
| H-bonds | ≥3 | ≥1 | Interface hydrogen bonds (higher is better) |
| delta SASA | ≥400 | ≥200 | Buried surface area (higher is better) |
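
The thresholds in the table can be applied programmatically to grade each design. The following is a minimal sketch: the metric keys (`ptm`, `iptm`, `pae`, `hbonds`, `delta_sasa`) are hypothetical names chosen for illustration and will likely need mapping to the actual column names in the metrics CSV.

```python
# metric: (good, acceptable, higher_is_better), values taken from the table above
THRESHOLDS = {
    "ptm": (0.8, 0.6, True),
    "iptm": (0.5, 0.3, True),
    "pae": (5.0, 10.0, False),
    "hbonds": (3, 1, True),
    "delta_sasa": (400, 200, True),
}

def grade_metric(name, value):
    """Return 'good', 'acceptable', or 'poor' for one metric value."""
    good, acceptable, higher_is_better = THRESHOLDS[name]
    if higher_is_better:
        if value >= good:
            return "good"
        if value >= acceptable:
            return "acceptable"
    else:
        if value <= good:
            return "good"
        if value <= acceptable:
            return "acceptable"
    return "poor"

def grade_design(metrics):
    """Grade every known metric in a dict; keys without a threshold are skipped."""
    return {name: grade_metric(name, value)
            for name, value in metrics.items() if name in THRESHOLDS}
```

Grading per metric rather than producing one combined score keeps the trade-offs visible, for instance a design with a good pTM but a poor interface.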

### References

- [BoltzGen](https://github.com/jwohlwend/boltzgen) — Generative model for nanobody design
- [Boltz2](https://github.com/jwohlwend/boltz) — Structure prediction
- [Nanobody Resources (SAbDab)](https://opig.stats.ox.ac.uk/webapps/sabdab-sabpred/) — Nanobody databases
- [ProteinMCP](https://github.com/charlesxu90/ProteinMCP) — This project