In [None]:
from prodock.engine.vina import VinaDock
vd = VinaDock(
    sf_name="vina",
    cpu=4,
    seed=42,
    receptor="Data/testcase/dock/receptor/5N2F.pdbqt",
    center=(32.5, 13.0, 133.75),
    size=(22.5, 23.5, 22.5),
    ligand="Data/testcase/dock/ligand/8HW.pdbqt",
    exhaustiveness=8,
    n_poses=9,
    out_poses="./Data/testcase/dock/out/vina_out.pdbqt",
    log_path="./Data/testcase/docklog/log.txt",
    autorun=True,
    autowrite=True,
)
print("scores:", vd.scores)
print("best:", vd.best_score)


In [None]:
vd = (VinaDock(sf_name="vina", cpu=4, seed=42, verbosity=1)
      .set_receptor("Data/testcase/dock/receptor/5N2F.pdbqt", validate=False)
      .define_box(center=(32.5, 13.0, 133.75), size=(22.5, 23.5, 22.5))
      .set_ligand("Data/testcase/dock/ligand/8HW.pdbqt")
      .dock(exhaustiveness=8, n_poses=9)
      .write_poses("./Data/testcase/dock/out/vina_out.pdbqt")
      .write_log("./Data/testcase/docklog/log.txt"))


In [None]:
from prodock.engine.binary import BinaryDock

bd = BinaryDock(
    binary_name="qvina",              # or "qvina", path to executable, etc.
    cpu=8,
    seed=42,
    receptor="Data/testcase/dock/receptor/5N2F.pdbqt",
    center=(32.5, 13.0, 133.75),
    size=(22.5, 23.5, 22.5),
    ligand="Data/testcase/dock/ligand/8HW.pdbqt",
    exhaustiveness=8,
    n_poses=9,
    out_poses="./Data/testcase/dock/out/smina_out.pdbqt",
    log_path="./Data/testcase/docklog/smina.log",
    autorun=True,     # compute maps/run right away (if inputs present)
    autowrite=True,   # ensure output/log paths are written (binary may also write)
    verbosity=1,      # 0=ERROR,1=INFO,2=DEBUG
)
print("result:", bd.result)   # dict with rc, stdout, stderr, out, log, called


In [None]:
from prodock.engine.binary import BinaryDock

bd = BinaryDock(
    binary_name="qvina",              # or "qvina", path to executable, etc.
    cpu=8,
    seed=42,
    receptor="Data/testcase/dock/receptor/5N2F.pdbqt",
    center=(32.5, 13.0, 133.75),
    size=(22.5, 23.5, 22.5),
    ligand="out/BMS-986115.pdbqt",
    exhaustiveness=8,
    n_poses=9,
    out_poses="./Data/testcase/dock/out/smina_out.pdbqt",
    log_path="./Data/testcase/docklog/smina.log",
    autorun=True,     # compute maps/run right away (if inputs present)
    autowrite=True,   # ensure output/log paths are written (binary may also write)
    verbosity=1,      # 0=ERROR,1=INFO,2=DEBUG
)
print("result:", bd.result)   # dict with rc, stdout, stderr, out, log, called


In [None]:
bd.result

In [None]:
# prodock/engine/multiple.py
from __future__ import annotations

import csv
import logging
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Tuple, Union

try:
    from tqdm import tqdm  # type: ignore
except Exception:  # pragma: no cover
    tqdm = None  # type: ignore

from prodock.engine.vina import VinaDock
from prodock.engine.binary import BinaryDock

logger = logging.getLogger("prodock.engine.multiple")
logger.addHandler(logging.NullHandler())


@dataclass
class DockResult:
    """Container for one ligand docking outcome."""
    ligand_path: Path
    out_path: Optional[Path] = None
    log_path: Optional[Path] = None
    # Unified scores: list of (affinity, rmsd_lb, rmsd_ub)
    scores: List[Tuple[float, float, float]] = field(default_factory=list)
    best_score: Optional[float] = None
    status: str = "pending"  # pending | ok | failed | skipped
    error: Optional[str] = None
    attempts: int = 0


class MultipleDock:
    """
    Batch docking over a directory or explicit list of ligands using either
    :class:`prodock.engine.vina.VinaDock` (Vina API) or
    :class:`prodock.engine.binary.BinaryDock` (smina/qvina).

    Performance optimization
    ------------------------
    - When `cache_per_worker=True` (default), each worker/thread **caches one backend**:
      - **Vina**: the worker builds a single `VinaDock`, calls `set_receptor()` and
        `define_box()` **once**, then **reuses the precomputed maps** for all ligands
        handled by that worker. This eliminates repeated "Computing Vina grid ... done."
        lines and saves significant time.
      - **Binary** (smina/qvina): binaries still compute their own internal grids per
        subprocess run, but per-worker caching avoids repeated executable resolution,
        `--help` probing, and repeated option wiring.

    Usage styles
    ------------
    1) **One-shot** (mirrors your VinaDock compact init + autorun):
       >>> md = MultipleDock(
       ...     receptor="Data/testcase/dock/receptor/5N2F.pdbqt",
       ...     ligand_dir="Data/testcase/dock/ligand",
       ...     backend="vina",
       ...     ligand_format="pdbqt",
       ...     center=(32.5, 13.0, 133.75),
       ...     size=(22.5, 23.5, 22.5),
       ...     exhaustiveness=8,
       ...     n_poses=9,
       ...     cpu=4,
       ...     out_dir="./Data/testcase/dock/out",
       ...     log_dir="./Data/testcase/dock/logs",
       ...     n_workers=4,
       ...     skip_existing=True,
       ...     verbose=1,
       ...     autorun=True,
       ...     autowrite=True,
       ... )
       >>> print("best per ligand:", md.best_per_ligand)  # doctest: +SKIP

    2) **Staged chaining** (fluent API, like your VinaDock chain):
       >>> md = (MultipleDock(
       ...         receptor="Data/testcase/dock/receptor/5N2F.pdbqt",
       ...         ligand_dir="Data/testcase/dock/ligand",
       ...         backend="vina",
       ...         ligand_format="pdbqt")
       ...       .set_box((32.5,13.0,133.75), (22.5,23.5,22.5))
       ...       .set_exhaustiveness(8)
       ...       .set_num_modes(9)
       ...       .set_cpu(4)
       ...       .set_out_dirs("./Data/testcase/dock/out", "./Data/testcase/dock/logs")
       ...       .set_workers(4)
       ...       .set_skip_existing(True)
       ...       .set_verbose(1)
       ...       .run()
       ... )
       >>> print(len(md.ok_results))  # doctest: +SKIP

    Parameters
    ----------
    receptor : str | Path
        Receptor **PDBQT** path (required).
    ligand_dir : str | Path, optional
        Directory with ligands (discovery per `ligand_format` + `filter_pattern`).
    ligands : sequence of paths, optional
        Explicit ligands list (overrides folder discovery).
    backend : str | Callable[[], Any] | VinaDock | BinaryDock, optional
        "vina" | "smina" | "qvina" | "qvina-w" | "binary" | "<custom exe>" |
        factory callable returning a backend instance | prebuilt backend instance.
    ligand_format : {"pdbqt","sdf","mol2","auto","any"}, optional
        Default "pdbqt". Use "sdf"/"mol2" with smina.
    filter_pattern : str, optional
        Glob (e.g., "LIG_*") without extension, default "*".
    center, size : tuple, optional
        Docking box; required if `autobox=False`.
    autobox : bool, optional
        If True, use autobox with `autobox_ref` and `autobox_padding` (BinaryDock/smina).
        VinaDock does **not** support autobox.
    autobox_ref : str | Path, optional
        Reference file for autobox.
    autobox_padding : float, optional
        Padding Å for autobox (default 4.0).
    exhaustiveness : int, optional
        Search exhaustiveness (default 8).
    n_poses : int, optional
        Number of poses (default 9).
    cpu : int, optional
        Worker CPU threads to request (if supported).
    seed : int, optional
        RNG seed (if supported).
    out_dir, log_dir : str | Path, optional
        Output root and logs directory (defaults: "./docked" and "<out_dir>/logs").
    pose_suffix, log_suffix : str, optional
        Filenames suffixes (defaults: "_docked.pdbqt", ".log").
    n_workers : int, optional
        Thread workers (default 1). For Vina, each worker builds one maps cache.
    skip_existing : bool, optional
        Skip ligands whose output already exists (default True).
    max_retries : int, optional
        Retries on failure (default 2).
    retry_backoff : float, optional
        Exponential backoff base (default 1.5).
    timeout : float, optional
        Per-run timeout for BinaryDock subprocesses.
    verbose : int, optional
        0 silent, 1 tqdm, 2+ per-ligand prints.
    cache_per_worker : bool, optional
        Cache a prepared backend per worker (default True).
    autorun, autowrite : bool, optional
        Convenience flags: run on construction and write summary CSV.

    Notes
    -----
    - For Vina + `n_workers=1`, maps are computed **once total**; for `n_workers>1`,
      maps computed **once per worker**.
    """

    def __init__(
        self,
        receptor: Union[str, Path],
        ligand_dir: Optional[Union[str, Path]] = None,
        *,
        ligands: Optional[Sequence[Union[str, Path]]] = None,
        backend: Union[str, Callable[[], Any], VinaDock, BinaryDock] = "vina",
        ligand_format: str = "pdbqt",
        filter_pattern: str = "*",
        center: Optional[Tuple[float, float, float]] = None,
        size: Optional[Tuple[float, float, float]] = None,
        autobox: bool = False,
        autobox_ref: Optional[Union[str, Path]] = None,
        autobox_padding: float = 4.0,
        exhaustiveness: int = 8,
        n_poses: int = 9,
        cpu: Optional[int] = None,
        seed: Optional[int] = None,
        out_dir: Union[str, Path] = "./docked",
        log_dir: Optional[Union[str, Path]] = None,
        pose_suffix: str = "_docked.pdbqt",
        log_suffix: str = ".log",
        n_workers: int = 1,
        skip_existing: bool = True,
        max_retries: int = 2,
        retry_backoff: float = 1.5,
        timeout: Optional[float] = None,
        verbose: int = 1,
        cache_per_worker: bool = True,
        autorun: bool = False,
        autowrite: bool = False,
    ):
        # Receptor
        self.receptor = Path(receptor)
        if not self.receptor.exists():
            raise FileNotFoundError(f"Receptor not found: {self.receptor}")
        if self.receptor.suffix.lower() != ".pdbqt":
            raise ValueError("Receptor must be a PDBQT file.")

        # Ligands source
        self.ligand_dir: Optional[Path] = Path(ligand_dir) if ligand_dir else None
        if self.ligand_dir and not self.ligand_dir.exists():
            raise FileNotFoundError(f"Ligand directory not found: {self.ligand_dir}")
        self._explicit_ligands: Optional[List[Path]] = [Path(p) for p in ligands] if ligands else None

        # Backend spec
        self._backend_spec = backend

        # Discovery
        self._ligand_format = ligand_format.lower().strip()
        self._filter_pattern = filter_pattern

        # Box / autobox
        self._box_center = tuple(map(float, center)) if center is not None else None
        self._box_size = tuple(map(float, size)) if size is not None else None
        self._use_autobox = bool(autobox)
        self._autobox_ref = Path(autobox_ref) if autobox_ref is not None else None
        self._autobox_padding = float(autobox_padding)

        # Params
        self._exhaustiveness = int(exhaustiveness)
        self._num_modes = int(n_poses)
        self._cpu = None if cpu is None else int(cpu)
        self._seed = None if seed is None else int(seed)

        # IO
        self.out_dir = Path(out_dir)
        self.log_dir = Path(log_dir) if log_dir is not None else (self.out_dir / "logs")
        self.pose_suffix = str(pose_suffix)
        self.log_suffix = str(log_suffix)

        # Runtime
        self._n_workers = max(1, int(n_workers))
        self._skip_existing = bool(skip_existing)
        self._max_retries = max(0, int(max_retries))
        self._retry_backoff = float(retry_backoff)
        self._timeout = None if timeout is None else float(timeout)
        self._verbose = max(0, int(verbose))
        self._cache_per_worker = bool(cache_per_worker)

        # State
        self._ligands: List[Path] = []
        self.results: List[DockResult] = []
        self._summary_path: Optional[Path] = None

        # Thread-local cache for per-worker backends
        self._tls = threading.local()

        # Discovery
        if self._explicit_ligands is not None:
            self._ligands = [p for p in self._explicit_ligands if p.exists()]
        else:
            self._refresh_ligands()

        # Autorun
        if autorun:
            self.run(n_workers=self._n_workers)
            if autowrite:
                self.write_summary()

    # -------------------- Discovery -------------------- #
    def _join_glob(self, ext_wo_dot: str) -> str:
        base = self._filter_pattern
        if base.endswith(f".{ext_wo_dot}"):
            return base
        return f"{base}.{ext_wo_dot}"

    def _refresh_ligands(self) -> None:
        if self._explicit_ligands is not None:
            self._ligands = [p for p in self._explicit_ligands if p.exists()]
            return
        if not self.ligand_dir:
            self._ligands = []
            return
        ext_sets: Dict[str, List[Path]] = {
            "pdbqt": sorted(self.ligand_dir.glob(self._join_glob("pdbqt"))),
            "sdf":   sorted(self.ligand_dir.glob(self._join_glob("sdf"))),
            "mol2":  sorted(self.ligand_dir.glob(self._join_glob("mol2"))),
        }
        fmt = self._ligand_format
        if fmt == "pdbqt":
            self._ligands = ext_sets["pdbqt"]
        elif fmt == "sdf":
            self._ligands = ext_sets["sdf"]
        elif fmt == "mol2":
            self._ligands = ext_sets["mol2"]
        elif fmt == "auto":
            self._ligands = ext_sets["pdbqt"] or ext_sets["sdf"] or ext_sets["mol2"]
        elif fmt == "any":
            self._ligands = ext_sets["pdbqt"] + ext_sets["sdf"] + ext_sets["mol2"]
        else:
            raise ValueError("ligand_format must be one of: 'pdbqt', 'sdf', 'mol2', 'auto', 'any'")

    def _validate_ready(self) -> None:
        if not self._use_autobox and (self._box_center is None or self._box_size is None):
            raise RuntimeError("Docking box not defined. Call set_box(...) or enable_autobox(...).")
        if not self._ligands:
            raise RuntimeError("No ligands discovered. Check ligand_dir/ligands and ligand_format/filter_pattern.")
        if isinstance(self._backend_spec, str) and self._backend_spec.lower() in {"vina", "qvina", "qvina-w"}:
            wrong = [p for p in self._ligands if p.suffix.lower() != ".pdbqt"]
            if wrong:
                raise ValueError("Selected backend requires PDBQT ligands. Offending: " + ", ".join(x.name for x in wrong))

    # -------------------- Backend factory & caching -------------------- #
    def _backend_key(self) -> str:
        """A stable key describing the prepared backend configuration."""
        spec = self._backend_spec
        if isinstance(spec, str):
            name = spec.lower()
        elif isinstance(spec, (VinaDock, BinaryDock)):
            name = type(spec).__name__.lower()
        else:
            name = "factory"
        # We include receptor + box/autobox in the signature to ensure correctness
        return "|".join([
            name,
            str(self.receptor.resolve()),
            f"autobox={int(self._use_autobox)}",
            f"ref={str(self._autobox_ref) if self._autobox_ref else ''}",
            f"pad={self._autobox_padding}",
            f"center={self._box_center}",
            f"size={self._box_size}",
            f"cpu={self._cpu}",
            f"seed={self._seed}",
            f"exh={self._exhaustiveness}",
            f"nposes={self._num_modes}",
        ])

    def _get_cached_backend(self) -> Any:
        """Return a cached backend for this worker (thread), if available & compatible."""
        if not self._cache_per_worker:
            return None
        if not hasattr(self._tls, "cached"):
            self._tls.cached = None  # type: ignore[attr-defined]
            self._tls.sig = None     # type: ignore[attr-defined]
        if self._tls.sig == self._backend_key():  # type: ignore[attr-defined]
            return self._tls.cached  # type: ignore[attr-defined]
        return None

    def _set_cached_backend(self, backend: Any) -> None:
        if not self._cache_per_worker:
            return
        self._tls.cached = backend  # type: ignore[attr-defined]
        self._tls.sig = self._backend_key()  # type: ignore[attr-defined]

    def _create_backend_fresh(self) -> Any:
        """Create and fully prepare a fresh backend according to the current settings."""
        spec = self._backend_spec

        # If a callable factory is provided, call it (recommended for parallel)
        if callable(spec) and not isinstance(spec, str):
            backend = spec()
        elif isinstance(spec, (VinaDock, BinaryDock)):
            backend = spec  # reuse (only safe sequentially)
        elif isinstance(spec, str):
            key = spec.lower()
            if key == "vina":
                backend = VinaDock(sf_name="vina", cpu=(self._cpu or 1), seed=self._seed, verbosity=1 if self._verbose == 1 else (2 if self._verbose >= 2 else 0))
            else:
                backend = BinaryDock(binary_name=key)
        else:
            # Unknown type → assume BinaryDock with custom executable name
            backend = BinaryDock(str(spec))

        # Prepare common state
        if isinstance(backend, VinaDock) or (type(backend).__name__ == "VinaDock"):
            backend.set_receptor(str(self.receptor))
            if self._use_autobox:
                # Not supported in Vina Python API — enforce explicit box
                raise RuntimeError("Autobox is not supported by VinaDock; provide center/size.")
            backend.define_box(center=self._box_center, size=self._box_size)
        else:
            # BinaryDock path
            backend.set_receptor(str(self.receptor))
            if self._use_autobox:
                backend.enable_autobox(str(self._autobox_ref), padding=self._autobox_padding)
            else:
                backend.set_box(self._box_center, self._box_size)
            if self._cpu is not None:
                backend.set_cpu(self._cpu)
            if self._seed is not None:
                backend.set_seed(self._seed)
            backend.set_exhaustiveness(self._exhaustiveness)
            backend.set_num_modes(self._num_modes)
            if self._timeout is not None and hasattr(backend, "set_timeout"):
                backend.set_timeout(self._timeout)

        return backend

    def _get_backend(self) -> Any:
        """
        Retrieve a backend for the current worker. If caching is enabled,
        return the cached backend or create+cache a new one with maps already
        computed (Vina) or options set (Binary).
        """
        cached = self._get_cached_backend()
        if cached is not None:
            return cached
        backend = self._create_backend_fresh()
        self._set_cached_backend(backend)
        return backend

    # -------------------- Fluent setters -------------------- #
    def set_backend(self, backend: Union[str, Callable[[], Any], VinaDock, BinaryDock]) -> "MultipleDock":
        self._backend_spec = backend
        # invalidate cache if backend changed
        if hasattr(self._tls, "sig"):
            self._tls.sig = None  # type: ignore[attr-defined]
        return self

    def set_box(self, center: Tuple[float, float, float], size: Tuple[float, float, float]) -> "MultipleDock":
        self._box_center = tuple(float(x) for x in center)
        self._box_size = tuple(float(x) for x in size)
        self._use_autobox = False
        if hasattr(self._tls, "sig"):
            self._tls.sig = None  # invalidate cache
        return self

    def enable_autobox(self, reference_ligand: Union[str, Path], padding: float = 4.0) -> "MultipleDock":
        self._use_autobox = True
        self._autobox_ref = Path(reference_ligand)
        self._autobox_padding = float(padding)
        if hasattr(self._tls, "sig"):
            self._tls.sig = None
        return self

    def set_exhaustiveness(self, ex: int) -> "MultipleDock":
        self._exhaustiveness = int(ex)
        if hasattr(self._tls, "sig"):
            self._tls.sig = None
        return self

    def set_num_modes(self, n: int) -> "MultipleDock":
        self._num_modes = int(n)
        if hasattr(self._tls, "sig"):
            self._tls.sig = None
        return self

    def set_cpu(self, cpu: int) -> "MultipleDock":
        self._cpu = int(cpu)
        if hasattr(self._tls, "sig"):
            self._tls.sig = None
        return self

    def set_seed(self, seed: Optional[int]) -> "MultipleDock":
        self._seed = int(seed) if seed is not None else None
        if hasattr(self._tls, "sig"):
            self._tls.sig = None
        return self

    def set_out_dirs(self, out_dir: Union[str, Path], log_dir: Optional[Union[str, Path]] = None) -> "MultipleDock":
        self.out_dir = Path(out_dir)
        self.out_dir.mkdir(parents=True, exist_ok=True)
        self.log_dir = Path(log_dir) if log_dir else (self.out_dir / "logs")
        self.log_dir.mkdir(parents=True, exist_ok=True)
        return self

    def set_workers(self, n_workers: int) -> "MultipleDock":
        self._n_workers = max(1, int(n_workers))
        return self

    def set_skip_existing(self, skip: bool) -> "MultipleDock":
        self._skip_existing = bool(skip)
        return self

    def set_max_retries(self, max_retries: int, backoff: float = 1.5) -> "MultipleDock":
        self._max_retries = max(0, int(max_retries))
        self._retry_backoff = float(backoff)
        return self

    def set_timeout(self, seconds: Optional[float]) -> "MultipleDock":
        self._timeout = None if seconds is None else float(seconds)
        if hasattr(self._tls, "sig"):
            self._tls.sig = None
        return self

    def set_verbose(self, verbose: int) -> "MultipleDock":
        self._verbose = max(0, int(verbose))
        return self

    def set_filter_pattern(self, glob_pat: str) -> "MultipleDock":
        self._filter_pattern = str(glob_pat)
        self._refresh_ligands()
        return self

    def set_ligand_format(self, ligand_format: str) -> "MultipleDock":
        self._ligand_format = ligand_format.lower().strip()
        self._refresh_ligands()
        return self

    def set_ligand_dir(self, ligand_dir: Union[str, Path]) -> "MultipleDock":
        self.ligand_dir = Path(ligand_dir)
        if not self.ligand_dir.exists():
            raise FileNotFoundError(f"Ligand directory not found: {self.ligand_dir}")
        self._refresh_ligands()
        return self

    def set_ligands(self, ligands: Sequence[Union[str, Path]]) -> "MultipleDock":
        self._explicit_ligands = [Path(p) for p in ligands]
        self._refresh_ligands()
        return self

    # -------------------- Helpers -------------------- #
    def _make_out_paths(self, ligand_path: Path) -> Tuple[Path, Path]:
        name = ligand_path.stem
        return (self.out_dir / f"{name}{self.pose_suffix}", self.log_dir / f"{name}{self.log_suffix}")

    def _should_skip(self, out_path: Path) -> bool:
        return self._skip_existing and out_path.exists()

    # -------------------- Single-ligand execution -------------------- #
    def _dock_single(self, ligand_path: Path) -> DockResult:
        res = DockResult(ligand_path=ligand_path)
        out_path, log_path = self._make_out_paths(ligand_path)
        res.out_path, res.log_path = out_path, log_path

        if self._should_skip(out_path):
            res.status = "skipped"
            return res

        attempt = 0
        while attempt <= self._max_retries:
            attempt += 1
            res.attempts = attempt
            try:
                backend = self._get_backend()

                # VinaDock-like flow
                if isinstance(backend, VinaDock) or (type(backend).__name__ == "VinaDock"):
                    backend.set_ligand(str(ligand_path))
                    backend.dock(exhaustiveness=self._exhaustiveness, n_poses=self._num_modes)
                    if hasattr(backend, "write_poses"):
                        backend.write_poses(str(out_path))
                    if hasattr(backend, "write_log"):
                        backend.write_log(str(log_path))
                    # gather scores
                    sc = getattr(backend, "scores", None) or []
                    res.scores = list(sc)
                    # best may be a tuple from get_best or a property best_score
                    best = getattr(backend, "best_score", None) or (getattr(backend, "get_best", lambda: None)())
                    if best is None:
                        res.best_score = None
                    else:
                        res.best_score = float(best[0]) if isinstance(best, (list, tuple)) else float(best)

                else:
                    # BinaryDock-like flow
                    backend.set_ligand(str(ligand_path))
                    backend.set_out(str(out_path))
                    backend.set_log(str(log_path))
                    backend.run()
                    # parse scores from log (unified)
                    if hasattr(backend, "parse_scores_from_log"):
                        rows = backend.parse_scores_from_log(log_path)
                        res.scores = [(float(r["affinity"]), float(r["rmsd_lb"]), float(r["rmsd_ub"])) for r in rows]
                        res.best_score = res.scores[0][0] if res.scores else None
                    else:
                        sc = getattr(backend, "scores", None) or []
                        res.scores = list(sc)
                        best = getattr(backend, "best_score", None) or (getattr(backend, "get_best", lambda: None)())
                        res.best_score = (float(best[0]) if isinstance(best, (list, tuple)) else float(best)) if best is not None else None

                res.status = "ok"
                res.error = None
                return res

            except Exception as exc:
                res.status = "failed"
                res.error = f"{type(exc).__name__}: {exc}"
                logger.exception("Dock attempt %d failed for %s: %s", attempt, ligand_path, exc)
                if attempt > self._max_retries:
                    return res
                wait = min(30.0 * attempt, (self._retry_backoff ** (attempt - 1)))
                if self._verbose >= 2:
                    print(f"[retry] {ligand_path.name} attempt={attempt}/{self._max_retries} sleep {wait:.1f}s")
                time.sleep(wait)

        return res  # fallback

    # -------------------- Public run -------------------- #
    def run(
        self,
        *,
        n_workers: Optional[int] = None,
        ligands: Optional[Sequence[Union[str, Path]]] = None,
    ) -> "MultipleDock":
        """
        Execute docking for all discovered/explicit ligands.

        Parameters
        ----------
        n_workers : int, optional
            Thread count (defaults to configured value).
        ligands : sequence of paths, optional
            Override the ligands set for this run.

        Returns
        -------
        MultipleDock
            self (inspect :pyattr:`results`, :pyattr:`best_per_ligand`).
        """
        if ligands is not None:
            self.set_ligands(ligands)

        self._refresh_ligands()
        self._validate_ready()

        self.out_dir.mkdir(parents=True, exist_ok=True)
        self.log_dir.mkdir(parents=True, exist_ok=True)

        n_workers = self._n_workers if n_workers is None else max(1, int(n_workers))
        self._n_workers = n_workers  # affects backend cache key (per-worker maps)
        total = len(self._ligands)
        self.results = []

        use_tqdm = (self._verbose >= 1) and (tqdm is not None)

        if n_workers <= 1:
            iterator: Iterable[Path] = self._ligands
            if use_tqdm:
                iterator = tqdm(iterator, desc="Docking", unit="ligand", ncols=80)
            for lig in iterator:
                if self._verbose >= 2:
                    print(f"[dock] {lig.name}")
                res = self._dock_single(lig)
                self.results.append(res)
                if self._verbose >= 2:
                    self._print_one_line(res)
            return self

        # parallel
        futures = []
        with ThreadPoolExecutor(max_workers=n_workers) as pool:
            for lig in self._ligands:
                futures.append(pool.submit(self._dock_single, lig))

            if use_tqdm:
                pbar = tqdm(total=total, desc="Docking", unit="ligand", ncols=80)
                for fut in as_completed(futures):
                    res = self._resolve_future(fut)
                    self.results.append(res)
                    pbar.update(1)
                    if self._verbose >= 2:
                        self._print_one_line(res)
                pbar.close()
            else:
                for fut in as_completed(futures):
                    res = self._resolve_future(fut)
                    self.results.append(res)

        return self

    def _resolve_future(self, fut) -> DockResult:
        try:
            return fut.result()
        except Exception as exc:  # pragma: no cover
            logger.exception("Unhandled exception during docking: %s", exc)
            return DockResult(ligand_path=Path("<unknown>"), status="failed", error=str(exc))

    def _print_one_line(self, r: DockResult) -> None:  # pragma: no cover
        if r.status == "ok":
            print(f"[ok] {r.ligand_path.name} best={r.best_score} out={r.out_path}")
        elif r.status == "skipped":
            print(f"[skipped] {r.ligand_path.name} (exists)")
        else:
            print(f"[fail] {r.ligand_path.name} err={r.error}")

    # -------------------- Outputs -------------------- #
    def write_summary(self, path: Optional[Union[str, Path]] = None) -> Path:
        """
        Write a CSV summary of results. Returns the written path.
        """
        path = Path(path) if path is not None else (self.out_dir / "docking_summary.csv")
        path.parent.mkdir(parents=True, exist_ok=True)
        with path.open("w", newline="", encoding="utf-8") as fh:
            writer = csv.writer(fh)
            writer.writerow(["ligand", "out_path", "log_path", "best_score", "status", "error", "attempts"])
            for r in self.results:
                writer.writerow([
                    str(r.ligand_path),
                    str(r.out_path) if r.out_path else "",
                    str(r.log_path) if r.log_path else "",
                    f"{r.best_score:.3f}" if r.best_score is not None else "",
                    r.status,
                    r.error or "",
                    r.attempts,
                ])
        self._summary_path = path
        logger.info("Wrote docking summary to %s", path)
        return path

    # -------------------- Convenience accessors -------------------- #
    @property
    def ligands(self) -> List[Path]:
        return list(self._ligands)

    @property
    def best_per_ligand(self) -> Dict[str, Optional[float]]:
        return {r.ligand_path.name: r.best_score for r in self.results}

    @property
    def ok_results(self) -> List[DockResult]:
        return [r for r in self.results if r.status == "ok"]

    @property
    def failed_results(self) -> List[DockResult]:
        return [r for r in self.results if r.status == "failed"]

    @property
    def summary_path(self) -> Optional[Path]:
        return self._summary_path

    def help(self) -> None:  # pragma: no cover
        print(self.__doc__ or "MultipleDock: batch docking wrapper.")


In [1]:
from prodock.engine.multiple import MultipleDock

md = MultipleDock(
    receptor="Data/testcase/dock/receptor/5N2F.pdbqt",
    ligand_dir="out",
    backend="qvina",
    ligand_format="pdbqt",
    center=(32.5, 13.0, 133.75),
    size=(22.5, 23.5, 22.5),
    exhaustiveness=8,
    n_poses=9,
    cpu=4,
    out_dir="./Data/testcase/dock/out",
    log_dir="./Data/testcase/dock/logs",
    n_workers=4,
    skip_existing=False,
    verbose=1,
    autorun=True,
    autowrite=True,
    seed=42,
)
print("results:", len(md.results))
print("best per ligand:", md.best_per_ligand)


Docking: 100%|████████████████████████████████| 3/3 [00:23<00:00,  7.86s/ligand]

results: 3
best per ligand: {'BMS-863233.pdbqt': -8.0, 'BMS-183920.pdbqt': -7.8, 'BMS-986115.pdbqt': -6.4}





In [None]:
# One-shot-like: create a runner, configure, then run immediately
runner = MultipleDock(
    receptor="Data/testcase/dock/receptor/5N2F.pdbqt",
    ligand_dir="out",   # folder containing many *.pdbqt
    backend="smina",                            # use Vina backend
    ligand_format="pdbqt",                     # default, required by vina
)

# Configure and run in a compact block
runner.set_box((32.5, 13.0, 133.75), (22.5, 23.5, 22.5)) \
      .set_exhaustiveness(8) \
      .set_num_modes(9) \
      .set_cpu(4) \
      .set_out_dirs("./Data/testcase/dock/out", "./Data/testcase/dock/logs") \
      .set_workers(4) \
      .set_skip_existing(True) \
      .set_verbose(1) \
      .run()

# Inspect results
print("Total ligands processed:", len(runner.results))
print("Successful runs:", len(runner.ok_results))
print("Best per ligand (dict):", runner.best_per_ligand)

# Write CSV summary
runner.write_summary("./Data/testcase/dock/out/docking_summary.csv")

In [None]:
# dock_gui.py
from __future__ import annotations

import json
import html
import time
import threading
import traceback
import zipfile
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union

import ipywidgets as widgets
from IPython.display import display, clear_output, HTML, Javascript

from prodock.engine.multiple import MultipleDock

# Optional ProVis preview; if not importable the GUI degrades gracefully.
try:
    from .provis import ProVis
except Exception:
    try:
        from provis import ProVis
    except Exception:
        ProVis = None  # type: ignore

class DockGUI:
    """
    Interactive Dock GUI for MultipleDock.

    Usage:
        gui = DockGUI().build().display()

    Features:
    - Async chunked runs (Pause/Resume/Stop)
    - Tail newest .log into a log card
    - Preview Top-5 poses with ProVis (if available)
    - Export report, write summary CSV, optional zip bundle
    - Compact layout and Dark/Light theme
    - Keyboard shortcuts: Alt+U (Run), Alt+S (Write CSV), Alt+P (Pause/Resume)
    """

    THEMES: Dict[str, Dict[str, str]] = {
        "light": {
            "bg": "#ffffff",
            "card_bg": "#fbfbfd",
            "card_border": "#e7eef6",
            "mute": "#6b7280",
            "accent": "#2563eb",
            "status_ok": "#16a34a",
            "status_warn": "#f59e0b",
            "status_err": "#dc2626",
            "table_head_bg": "#f3f4f6",
            "table_border": "#e5e7eb",
            "link": "#0ea5e9",
            "header_grad": "linear-gradient(90deg,#eef2ff,#f0f9ff)",
        },
        "dark": {
            "bg": "#0f172a",
            "card_bg": "#0b1222",
            "card_border": "#1f2a44",
            "mute": "#9aa3b2",
            "accent": "#60a5fa",
            "status_ok": "#22c55e",
            "status_warn": "#fbbf24",
            "status_err": "#f87171",
            "table_head_bg": "#111a2f",
            "table_border": "#1f2a44",
            "link": "#67e8f9",
            "header_grad": "linear-gradient(90deg,#0b1222,#0f172a)",
        },
    }

    # ---- construction ---------------------------------------------------------
    def __init__(self, vw: int = 1100, vh: int = 700) -> None:
        # viewer preferred size for ProVis preview
        self._vw = int(vw)
        self._vh = int(vh)

        # Header/theme/layout
        self._theme = widgets.ToggleButtons(options=[("🌞 Light", "light"), ("🌙 Dark", "dark")], value="light")
        self._compact = widgets.Checkbox(value=False, description="Compact layout (1-col)")
        self._title = widgets.HTML("<h2 style='margin:6px 0 0 0;'>Dock Console</h2><div style='color:#6b7280;margin-top:-4px;'>MultipleDock</div>")
        self._btn_help = widgets.Button(description="", icon="question-circle", tooltip="Open help")

        # Sources / ligands
        self._receptor_path = widgets.Text(value="", description="Receptor:", placeholder="path/to/receptor.pdbqt")
        self._ligand_dir = widgets.Text(value="", description="Ligand dir:", placeholder="path/to/ligands")
        self._ligand_list_json = widgets.Textarea(value="", description="Ligands (JSON):", placeholder='["/path/a.pdbqt","/path/b.pdbqt"]', layout=widgets.Layout(height="90px"))
        self._refresh_lig_btn = widgets.Button(description="Refresh ligands", icon="refresh")
        self._preview_ligands_btn = widgets.Button(description="Preview found", icon="list")
        self._lig_format = widgets.Dropdown(options=["pdbqt", "sdf", "mol2", "auto", "any"], value="pdbqt", description="Ligand fmt:")
        self._filter_pat = widgets.Text(value="*", description="Filter glob:")

        # Backend / runtime params
        self._backend = widgets.Dropdown(options=[("Vina (API)", "vina"), ("smina (binary)", "smina"), ("qvina", "qvina"), ("qvina-w", "qvina-w"), ("Custom binary", "binary")], value="vina", description="Backend:")
        self._custom_backend = widgets.Text(value="", description="Custom exe:", placeholder="path/to/exe")
        self._exhaust = widgets.IntSlider(value=8, min=1, max=64, step=1, description="Exhaustiveness:", continuous_update=False)
        self._n_poses = widgets.IntSlider(value=9, min=1, max=50, step=1, description="Num poses:", continuous_update=False)
        self._cpu = widgets.IntText(value=4, description="CPU:")
        self._workers = widgets.IntSlider(value=4, min=1, max=64, step=1, description="Workers:", continuous_update=False)
        self._seed = widgets.IntText(value=0, description="Seed (0 none):")

        # Box / autobox
        self._use_autobox = widgets.Checkbox(value=False, description="Use autobox (Binary only)")
        self._autobox_ref = widgets.Text(value="", description="Autobox ref:")
        self._autobox_pad = widgets.FloatText(value=4.0, description="Autobox pad:")
        self._center_x = widgets.FloatText(value=0.0, description="center_x:")
        self._center_y = widgets.FloatText(value=0.0, description="center_y:")
        self._center_z = widgets.FloatText(value=0.0, description="center_z:")
        self._size_x = widgets.FloatText(value=22.5, description="size_x:")
        self._size_y = widgets.FloatText(value=22.5, description="size_y:")
        self._size_z = widgets.FloatText(value=22.5, description="size_z:")
        self._vina_cfg_text = widgets.Textarea(value="", description="Import cfg:", placeholder="Paste Vina center_x/…/size_z here", layout=widgets.Layout(height="82px"))
        self._vina_import_btn = widgets.Button(description="Import → box", icon="sign-in")

        # IO & runtime
        self._out_dir = widgets.Text(value="./docked", description="Out dir:")
        self._log_dir = widgets.Text(value="", description="Log dir (opt):")
        self._skip_existing = widgets.Checkbox(value=True, description="Skip existing")
        self._max_retries = widgets.IntSlider(value=2, min=0, max=10, step=1, description="Max retries:")
        self._backoff = widgets.FloatText(value=1.5, description="Retry backoff:")
        self._timeout = widgets.FloatText(value=0.0, description="Timeout (s, 0=off):")
        self._verbose = widgets.Dropdown(options=[(0, 0), (1, 1), (2, 2)], value=1, description="Verbosity:")

        # Actions + chunk/pause/stop
        self._run_btn = widgets.Button(description="Run docking", button_style="primary", icon="play", tooltip="update-shortcut")
        self._pause_btn = widgets.ToggleButton(value=False, description="Pause", icon="pause", tooltip="pause-shortcut")
        self._stop_btn = widgets.Button(description="Stop", icon="stop")
        self._write_csv_btn = widgets.Button(description="Write summary CSV", icon="save", tooltip="savecfg-shortcut")
        self._export_report_btn = widgets.Button(description="Export report (HTML)", icon="file")
        self._zip_outputs = widgets.Checkbox(value=False, description="Zip outputs with report")
        self._chunk_size = widgets.IntSlider(value=50, min=1, max=2000, step=1, description="Chunk size:", continuous_update=False)

        # Status / info / progress / results / filter
        self._status = widgets.HTML(self._status_bar("Ready. Provide receptor and ligands.", level="warn"))
        self._info_panel = widgets.HTML(self._info_block("<i>No run yet.</i>"))
        self._pbar = widgets.IntProgress(value=0, min=0, max=100, description="Progress")
        self._eta = widgets.HTML("<div class='muted'>ETA: —</div>")
        self._ticker = widgets.HTML("<div class='muted'>—</div>")
        self._filter_box = widgets.Text(value="", placeholder="Filter results (ligand / status / score)...")
        self._results_html = widgets.HTML(self._results_table_html([]))

        # Log output uses Output (context manager)
        self._log_output = widgets.Output(layout={"border": "1px solid #ddd", "height": "220px", "overflow": "auto"})
        self._tail_logs = widgets.Checkbox(value=True, description="Tail newest .log")

        # ProVis preview area (Output for context manager)
        self._preview_top_btn = widgets.Button(description="Preview Top-5 poses", icon="eye")
        self._viz_screenshot_btn = widgets.Button(description="Screenshot viewer", icon="camera")
        self._viz_out = widgets.Output(layout={"border": "1px solid transparent", "height": "420px", "overflow": "auto"})

        # Internal state
        self._ui: Optional[widgets.Widget] = None
        self._left_col: Optional[widgets.Widget] = None
        self._right_col: Optional[widgets.Widget] = None
        self._root: Optional[widgets.Widget] = None

        self._last_md: Optional[MultipleDock] = None
        self._last_error: Optional[str] = None

        self._agg_lock = threading.Lock()
        self._agg_results: List[Dict[str, Any]] = []
        self._total_ligands = 0
        self._processed_ligands = 0

        self._run_thread: Optional[threading.Thread] = None
        self._monitor_thread: Optional[threading.Thread] = None
        self._log_thread: Optional[threading.Thread] = None
        self._run_start_ts: Optional[float] = None
        self._stop_flag = threading.Event()
        self._pause_flag = threading.Event()
        self._on_finish_callbacks: List[Callable[[ "DockGUI"], None]] = []

        # Wire events
        self._theme.observe(self._on_theme, names="value")
        self._compact.observe(self._on_compact, names="value")
        self._btn_help.on_click(lambda _: self._open_help_tab())

        self._refresh_lig_btn.on_click(self._on_refresh_ligands)
        self._preview_ligands_btn.on_click(self._on_preview_ligands)
        self._vina_import_btn.on_click(self._on_vina_import)

        self._run_btn.on_click(self._on_run)
        self._pause_btn.observe(self._on_pause_toggle, names="value")
        self._stop_btn.on_click(self._on_stop)
        self._write_csv_btn.on_click(self._on_write_csv)
        self._export_report_btn.on_click(self._on_export_report)
        self._filter_box.observe(self._on_filter_change, names="value")
        self._tail_logs.observe(self._on_tail_toggle, names="value")
        self._preview_top_btn.on_click(self._on_preview_top)
        self._viz_screenshot_btn.on_click(self._on_viz_screenshot)

        # tooltips used by injected JS shortcuts
        self._run_btn.tooltip = "update-shortcut"
        self._write_csv_btn.tooltip = "savecfg-shortcut"
        self._pause_btn.tooltip = "pause-shortcut"

    # -------------------------
    # Fluent setters & public API
    # -------------------------
    def set_receptor(self, path: Union[str, Path]) -> "DockGUI":
        """
        Set receptor path.

        :param path: receptor .pdbqt
        :return: self
        """
        self._receptor_path.value = str(Path(str(path)).expanduser())
        return self

    def set_ligand_dir(self, path: Union[str, Path]) -> "DockGUI":
        """
        Set ligand directory for discovery.

        :param path: ligand folder
        :return: self
        """
        self._ligand_dir.value = str(Path(str(path)).expanduser())
        return self

    def set_ligands(self, ligands: Sequence[Union[str, Path]]) -> "DockGUI":
        """
        Provide explicit ligands list via JSON field.

        :param ligands: sequence of ligand paths
        :return: self
        """
        arr = [str(Path(str(p)).expanduser()) for p in ligands]
        self._ligand_list_json.value = json.dumps(arr, indent=2)
        return self

    def set_on_finish(self, callback: Callable[[ "DockGUI"], None]) -> "DockGUI":
        """
        Register a callback invoked after a run completes.

        :param callback: callable receiving this DockGUI
        :return: self
        """
        if callable(callback):
            self._on_finish_callbacks.append(callback)
        return self

    # -------------------------
    # Layout / build / display
    # -------------------------
    def build(self) -> "DockGUI":
        """Compose widgets and return self (idempotent)."""
        if self._ui is None:
            header = widgets.HBox([widgets.HBox([self._title]), widgets.HBox([self._theme, self._compact, self._btn_help])], layout=widgets.Layout(justify_content="space-between", align_items="center", padding="6px 8px"))

            card_sources = self._card("Sources", widgets.VBox([self._receptor_path, widgets.HTML("<b>Ligands</b>"), self._ligand_dir, self._ligand_list_json, widgets.HBox([self._refresh_lig_btn, self._preview_ligands_btn]), widgets.HTML("<b>Discovery filters</b>"), widgets.HBox([self._lig_format, self._filter_pat])]))
            card_backend = self._card("Backend & Params", widgets.VBox([widgets.HBox([self._backend, self._custom_backend, self._verbose]), widgets.HBox([self._exhaust, self._n_poses]), widgets.HBox([self._cpu, self._workers, self._seed])]))
            card_box = self._card("Box / Autobox", widgets.VBox([self._use_autobox, widgets.HBox([self._autobox_ref, self._autobox_pad]), widgets.HTML("<b>Explicit box</b>"), widgets.GridBox(children=[self._center_x, self._center_y, self._center_z, self._size_x, self._size_y, self._size_z], layout=widgets.Layout(grid_template_columns="repeat(3, minmax(120px, 1fr))", grid_gap="8px")), widgets.HBox([self._vina_cfg_text, self._vina_import_btn])]))
            card_io = self._card("IO & Runtime", widgets.VBox([self._out_dir, self._log_dir, widgets.HBox([self._skip_existing, self._max_retries, self._backoff]), widgets.HBox([self._timeout]), widgets.HBox([self._chunk_size])]))
            help_text = widgets.HTML("<div style='line-height:1.6'><b>Workflow</b>: fill receptor & ligands → choose backend → set box/autobox → Run.<br>Shortcuts: Alt+U Run, Alt+S Write CSV, Alt+P Pause/Resume.</div>")
            card_help = self._card("Help", help_text)

            self._tabs = widgets.Tab(children=[widgets.VBox([card_sources]), widgets.VBox([card_backend]), widgets.VBox([card_box]), widgets.VBox([card_io]), widgets.VBox([card_help])])
            for i, name in enumerate(["Sources", "Backend", "Box", "IO", "Help"]):
                self._tabs.set_title(i, name)

            actions = widgets.HBox([self._run_btn, self._pause_btn, self._stop_btn, self._write_csv_btn, self._export_report_btn, self._zip_outputs])
            progress = widgets.HBox([self._pbar, self._eta])
            preview_bar = widgets.HBox([self._preview_top_btn, self._viz_screenshot_btn])

            right_stack = widgets.VBox([actions, progress, self._status, self._info_panel, widgets.HTML("<b>Results</b>"), self._filter_box, self._results_html, widgets.HTML("<b>Pose Preview</b>"), preview_bar, self._viz_out, widgets.HTML("<b>Log tail</b>"), widgets.HBox([self._tail_logs]), self._log_output, widgets.HTML("<b>Status</b>"), self._ticker])

            self._left_col = widgets.VBox([self._tabs], layout=widgets.Layout(width="42%", padding="6px"))
            self._right_col = widgets.VBox([right_stack], layout=widgets.Layout(width="58%", padding="6px"))
            self._root = widgets.HBox([self._left_col, self._right_col])
            self._ui = widgets.VBox([header, self._root], layout=widgets.Layout(padding="6px"))
            self._ui.add_class("provis-root")

            # apply theme, shortcuts and layout
            self._apply_theme(self._theme.value)
            self._inject_shortcuts()
            self._apply_layout(self._compact.value)
        return self

    def display(self) -> "DockGUI":
        """Render and return self."""
        if self._ui is None:
            self.build()
        display(self._ui)
        return self

    # -------------------------
    # Event handlers
    # -------------------------
    def _open_help_tab(self) -> None:
        if hasattr(self, "_tabs"):
            self._tabs.selected_index = 4

    def _on_theme(self, change: Dict[str, Any]) -> None:
        self._apply_theme(change["new"])

    def _on_compact(self, change: Dict[str, Any]) -> None:
        self._apply_layout(bool(change["new"]))

    def _on_refresh_ligands(self, _btn) -> None:
        try:
            md = self._build_md(dry=True)
            self._status.value = self._status_bar(f"Found {len(md.ligands)} ligands.", "ok")
            self._ticker.value = f"<div>Discovered {len(md.ligands)} ligands</div>"
            with self._log_output:
                clear_output(wait=True)
                print(f"Discovered {len(md.ligands)} ligands. (First 50 listed below)")
                for p in md.ligands[:50]:
                    print(" -", p)
                if len(md.ligands) > 50:
                    print(" ... (+%d more)" % (len(md.ligands) - 50))
        except Exception as e:
            self._capture_error("Refresh ligands error", e, echo=True)

    def _on_preview_ligands(self, _btn) -> None:
        self._on_refresh_ligands(_btn)

    def _on_vina_import(self, _btn) -> None:
        try:
            d = self._parse_vina_cfg(self._vina_cfg_text.value)
            self._center_x.value = d["center_x"]
            self._center_y.value = d["center_y"]
            self._center_z.value = d["center_z"]
            self._size_x.value = d["size_x"]
            self._size_y.value = d["size_y"]
            self._size_z.value = d["size_z"]
            self._status.value = self._status_bar("Vina cfg imported to box.", "ok")
        except Exception as e:
            self._capture_error("CFG import error", e, echo=True)

    def _on_run(self, _btn) -> None:
        self._start_async_run_chunked()

    def _on_pause_toggle(self, change: Dict[str, Any]) -> None:
        val = bool(change.get("value", False))
        if val:
            self._pause_flag.set()
            self._pause_btn.description = "Resume"
            self._pause_btn.icon = "play"
            self._ticker.value = "<div>Paused.</div>"
        else:
            self._pause_flag.clear()
            self._pause_btn.description = "Pause"
            self._pause_btn.icon = "pause"
            self._ticker.value = "<div>Resumed.</div>"

    def _on_stop(self, _btn) -> None:
        self._stop_flag.set()
        self._ticker.value = "<div>Stop requested.</div>"

    def _on_write_csv(self, _btn) -> None:
        try:
            if not self._last_md:
                self._status.value = self._status_bar("Nothing to write — no run yet.", "warn")
                return
            p = self._last_md.write_summary(None)
            with self._log_output:
                clear_output(wait=True)
                print("Summary written:", p)
            self._status.value = self._status_bar(f"Summary CSV written → {p}", "ok")
        except Exception as e:
            self._capture_error("Write summary error", e, echo=True)

    def _on_export_report(self, _btn) -> None:
        self._execute_export_report(bool(self._zip_outputs.value))

    def _on_filter_change(self, change: Dict[str, Any]) -> None:
        self._results_html.value = self._results_table_html(self._agg_results, change.get("value", ""))

    def _on_tail_toggle(self, change: Dict[str, Any]) -> None:
        if not change.get("value", True):
            with self._log_output:
                clear_output(wait=True)
                print("Log tail disabled.")

    def _on_preview_top(self, _btn) -> None:
        self._preview_top5_in_provis()

    def _on_viz_screenshot(self, _btn) -> None:
        self._viz_screenshot()

    # -------------------------
    # Run orchestration (chunked, async, pause/resume)
    # -------------------------
    def _start_async_run_chunked(self) -> None:
        if self._run_thread and self._run_thread.is_alive():
            self._status.value = self._status_bar("A run is already in progress.", "warn")
            return
        try:
            base_md = self._build_md(dry=True)
            ligs = list(base_md.ligands)
            self._total_ligands = len(ligs)
            if self._total_ligands == 0:
                self._status.value = self._status_bar("No ligands discovered.", "warn")
                return

            # reset aggregate state
            with self._agg_lock:
                self._agg_results = []
                self._processed_ligands = 0
            self._results_html.value = self._results_table_html([])
            self._pbar.value = 0
            self._pbar.max = max(1, self._total_ligands)
            self._run_start_ts = time.time()
            self._stop_flag.clear()
            self._pause_flag.clear()
            self._last_md = base_md
            self._status.value = self._status_bar("Running docking (chunked, async)…", "warn")
            self._update_info_panel(base_md, running=True)

            # start log tailer if enabled
            if self._tail_logs.value:
                self._start_log_tailer_thread()

            chunk = max(1, int(self._chunk_size.value))

            def worker():
                try:
                    for start in range(0, self._total_ligands, chunk):
                        if self._stop_flag.is_set():
                            break
                        # pause support: block between chunks
                        while self._pause_flag.is_set() and not self._stop_flag.is_set():
                            time.sleep(0.2)
                        batch = ligs[start:start + chunk]
                        md = self._build_md(dry=False)
                        md.set_ligands(batch)
                        md.run(n_workers=None, ligands=None)
                        rows = self._normalize_results(md)
                        with self._agg_lock:
                            self._agg_results.extend(rows)
                            self._processed_ligands = min(self._total_ligands, self._processed_ligands + len(batch))
                except Exception as e:
                    self._capture_error("Run error", e, echo=True)

            def monitor():
                last_seen = -1
                while True:
                    if self._stop_flag.is_set():
                        break
                    with self._agg_lock:
                        done = self._processed_ligands
                        total = self._total_ligands
                        rows = list(self._agg_results)
                    if done != last_seen:
                        last_seen = done
                        self._results_html.value = self._results_table_html(rows, self._filter_box.value)
                        self._pbar.value = done
                        self._ticker.value = f"<div>Processed {done}/{total} ligands</div>"
                        if self._run_start_ts and done > 0:
                            elapsed = time.time() - self._run_start_ts
                            rate = elapsed / max(1, done)
                            remain = int(rate * max(0, total - done))
                            self._eta.value = f"<div class='muted'>ETA: ~{remain}s (elapsed {int(elapsed)}s)</div>"
                    if done >= total:
                        break
                    time.sleep(0.5)

                # finished or stopped
                if self._stop_flag.is_set():
                    self._status.value = self._status_bar("Stopped by user.", "warn")
                    self._pbar.bar_style = "warning"
                else:
                    self._status.value = self._status_bar("Docking finished.", "ok")
                    self._pbar.bar_style = "success"

                self._update_info_panel(self._last_md, running=False)

                # run finish callbacks
                for cb in list(self._on_finish_callbacks):
                    try:
                        cb(self)
                    except Exception as e:
                        self._capture_error("on_finish callback error", e, echo=True)

            self._run_thread = threading.Thread(target=worker, daemon=True)
            self._monitor_thread = threading.Thread(target=monitor, daemon=True)
            self._run_thread.start()
            self._monitor_thread.start()

        except Exception as e:
            self._capture_error("Setup run error", e, echo=True)

    # -------------------------
    # Log tailer
    # -------------------------
    def _start_log_tailer_thread(self) -> None:
        if self._log_thread and self._log_thread.is_alive():
            return

        def tailer():
            last_path: Optional[Path] = None
            while (self._run_thread and self._run_thread.is_alive()) or (self._log_thread and self._log_thread is threading.current_thread()):
                try:
                    log_root = Path(self._log_dir.value).expanduser() if self._log_dir.value.strip() else (Path(self._out_dir.value).expanduser() / "logs")
                    latest = None
                    latest_mtime = -1.0
                    if log_root.exists():
                        for p in log_root.rglob("*.log"):
                            try:
                                mt = p.stat().st_mtime
                                if mt > latest_mtime:
                                    latest, latest_mtime = p, mt
                            except Exception:
                                pass
                    if latest is None:
                        with self._log_output:
                            clear_output(wait=True)
                            print("No log files in", str(log_root))
                        time.sleep(1.0)
                        continue
                    if last_path is None or latest != last_path:
                        last_path = latest
                    data = latest.read_text(encoding="utf-8", errors="replace")
                    tail = data[-8000:] if len(data) > 8000 else data
                    with self._log_output:
                        clear_output(wait=True)
                        print(tail)
                        print("\n--- file:", str(latest))
                    time.sleep(1.0)
                except Exception:
                    time.sleep(1.0)

        self._log_thread = threading.Thread(target=tailer, daemon=True)
        self._log_thread.start()

    # -------------------------
    # Preview Top-5 poses with ProVis (optional)
    # -------------------------
    def _preview_top5_in_provis(self) -> None:
        if ProVis is None:
            self._status.value = self._status_bar("ProVis not available: cannot preview.", "warn")
            return
        rows = [r for r in self.results if r.get("best_score") is not None and r.get("out_path")]
        if not rows:
            self._status.value = self._status_bar("No completed poses to preview.", "warn")
            return
        rows_sorted = sorted(rows, key=lambda r: (r["best_score"] if r["best_score"] is not None else 1e9))[:5]
        with self._viz_out:
            clear_output(wait=True)
            try:
                viz = ProVis(vw=self._vw, vh=self._vh)
                rec = self._receptor_path.value.strip()
                if rec and Path(rec).exists():
                    viz.load_receptor(rec).style_preset("publication", surface=False)
                palette = ["#00ffff", "#ff9f1c", "#2ec4b6", "#e71d36", "#9b5de5"]
                for i, r in enumerate(rows_sorted, 1):
                    p = Path(r["out_path"])
                    if not p.exists():
                        continue
                    data = p.read_text(encoding="utf-8", errors="replace")
                    fmt = p.suffix.lstrip(".").lower() or "pdbqt"
                    viz.load_ligand_from_text(data, name=f"{i}. {Path(r['ligand']).name} (best={r['best_score']})", fmt=fmt)
                    viz.highlight_ligand(style="stick", color=self._css_hex_to_threejs(palette[(i - 1) % len(palette)]), radius=0.28)
                viz.set_background(self._bg_hex_for_provis()).show()
            except Exception as e:
                with self._viz_out:
                    print("Preview error:", e)
                    traceback.print_exc()

    def _viz_screenshot(self) -> None:
        js = """
        (function(){
          try{
            const canvases = Array.from(document.querySelectorAll('canvas')).filter(c=>c.width>0 && c.height>0);
            if(!canvases.length){ alert('No canvas found'); return; }
            let best = canvases[0], bestArea = best.width*best.height;
            for(const c of canvases){ const a = c.width*c.height; if(a>bestArea){ best = c; bestArea=a; } }
            const data = best.toDataURL('image/png');
            const a = document.createElement('a'); a.href = data; a.download = 'dock_preview.png';
            document.body.appendChild(a); a.click(); a.remove();
          }catch(e){ console.error(e); alert('Capture failed: '+e); }
        })();
        """
        display(Javascript(js))
        self._status.value = self._status_bar("Viewer screenshot requested (browser will download).", "ok")

    # -------------------------
    # Export / write CSV / report
    # -------------------------
    def _execute_export_report(self, zip_outputs: bool) -> None:
        try:
            if not self._last_md:
                self._status.value = self._status_bar("Nothing to export — no run yet.", "warn")
                return
            csv_path = self._last_md.write_summary(None)
            rows = self.results
            report_html = self._render_report_html(self._last_md, rows)
            report_path = Path(self._last_md.out_dir) / "report.html"
            report_path.write_text(report_html, encoding="utf-8")
            bundle_msg = ""
            if zip_outputs:
                zip_path = Path(self._last_md.out_dir) / "bundle.zip"
                with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
                    zf.write(csv_path, arcname=csv_path.name)
                    zf.write(report_path, arcname=report_path.name)
                    for folder in [self._last_md.out_dir, self._last_md.log_dir]:
                        for p in Path(folder).rglob("*"):
                            if p.is_file():
                                try:
                                    zf.write(p, arcname=str(p.relative_to(self._last_md.out_dir)))
                                except Exception:
                                    pass
                bundle_msg = f" & bundle → {zip_path}"
            self._status.value = self._status_bar(f"Report → {report_path} & CSV → {csv_path}{bundle_msg}", "ok")
            with self._log_output:
                clear_output(wait=True)
                print("Report written:", report_path)
            self._ticker.value = f"<div>Report: {report_path}</div>"
        except Exception as e:
            self._capture_error("Export report error", e, echo=True)

    # -------------------------
    # Build MultipleDock from GUI state
    # -------------------------
    def _build_md(self, dry: bool) -> MultipleDock:
        receptor = Path(self._receptor_path.value).expanduser()
        lig_dir = Path(self._ligand_dir.value).expanduser() if self._ligand_dir.value.strip() else None
        lig_json = self._ligand_list_json.value.strip()
        ligs = None
        if lig_json:
            arr = json.loads(lig_json)
            if not isinstance(arr, list):
                raise ValueError("Ligands JSON must be a list")
            ligs = [str(Path(str(p)).expanduser()) for p in arr]
        backend_key = self._backend.value
        backend_spec = backend_key
        if backend_key == "binary":
            exe = self._custom_backend.value.strip()
            if not exe:
                raise ValueError("Provide custom binary path/name")
            backend_spec = exe
        use_autobox = bool(self._use_autobox.value)
        autobox_ref = Path(self._autobox_ref.value).expanduser() if self._autobox_ref.value.strip() else None
        if use_autobox and backend_key == "vina":
            raise RuntimeError("Autobox unsupported by Vina API")
        center = (float(self._center_x.value), float(self._center_y.value), float(self._center_z.value))
        size = (float(self._size_x.value), float(self._size_y.value), float(self._size_z.value))
        out_dir = Path(self._out_dir.value).expanduser()
        log_dir = Path(self._log_dir.value).expanduser() if self._log_dir.value.strip() else None
        timeout = float(self._timeout.value) if float(self._timeout.value or 0) > 0 else None
        seed = int(self._seed.value) if int(self._seed.value or 0) > 0 else None

        md = MultipleDock(
            receptor=receptor,
            ligand_dir=lig_dir,
            ligands=ligs,
            backend=backend_spec,
            ligand_format=self._lig_format.value,
            filter_pattern=self._filter_pat.value or "*",
            center=None if use_autobox else center,
            size=None if use_autobox else size,
            autobox=use_autobox,
            autobox_ref=autobox_ref,
            autobox_padding=float(self._autobox_pad.value),
            exhaustiveness=int(self._exhaust.value),
            n_poses=int(self._n_poses.value),
            cpu=int(self._cpu.value) if int(self._cpu.value or 0) > 0 else None,
            seed=seed,
            out_dir=out_dir,
            log_dir=log_dir,
            n_workers=int(self._workers.value),
            skip_existing=bool(self._skip_existing.value),
            max_retries=int(self._max_retries.value),
            retry_backoff=float(self._backoff.value),
            timeout=timeout,
            verbose=int(self._verbose.value),
            cache_per_worker=True,
            autorun=False,
            autowrite=False,
        )
        return md

    # -------------------------
    # Visual helpers (theme, layout, cards, status)
    # -------------------------
    def _apply_theme(self, key: str) -> None:
        t = self.THEMES.get(key, self.THEMES["light"])
        if self._ui:
            self._ui.layout = widgets.Layout(padding="6px", border="1px solid transparent", background_color=t["bg"])
        css = f"""
        <style id="dock-gui-theme">
          .provis-root .provis-card {{
            background: {t['card_bg']} !important;
            border: 1px solid {t['card_border']} !important;
            border-radius: 10px; box-shadow: 0 6px 18px rgba(0,0,0,0.06);
          }}
          .provis-root table.docktbl {{ width:100%; border-collapse:collapse; border:1px solid {t['table_border']}; }}
          .provis-root table.docktbl th {{ background:{t['table_head_bg']}; text-align:left; padding:6px; border-bottom:1px solid {t['table_border']}; }}
          .provis-root table.docktbl td {{ padding:6px; border-bottom:1px solid {t['table_border']}; font-family:ui-monospace; font-size:12px; }}
          .provis-root .muted {{ color: {t['mute']}; }}
          .provis-root a {{ color: {t['link']}; text-decoration:none; }}
        </style>
        """
        display(HTML(css))

    def _apply_layout(self, compact: bool) -> None:
        if not (self._root and self._left_col and self._right_col):
            return
        if compact:
            self._left_col.layout = widgets.Layout(width="100%", padding="6px")
            self._right_col.layout = widgets.Layout(width="100%", padding="6px")
            self._root.children = (widgets.VBox([self._left_col, self._right_col]),)
        else:
            self._left_col.layout = widgets.Layout(width="42%", padding="6px")
            self._right_col.layout = widgets.Layout(width="58%", padding="6px")
            self._root.children = (self._left_col, self._right_col)

    def _card(self, title: str, body: widgets.Widget) -> widgets.VBox:
        head = widgets.HTML(f"<h3>{title}</h3>")
        box = widgets.VBox([head, body], layout=widgets.Layout(padding="10px 12px", margin="8px 0"))
        box.add_class("provis-card")
        return box

    def _status_bar(self, text: str, level: str = "ok") -> str:
        col = {"ok": self.THEMES[self._theme.value]["status_ok"], "warn": self.THEMES[self._theme.value]["status_warn"], "err": self.THEMES[self._theme.value]["status_err"]}.get(level, "#999")
        return f"<div style='padding:8px;border-left:4px solid {col};'>{html.escape(text)}</div>"

    def _info_block(self, inner: str) -> str:
        return f"<div style='padding:10px;border:1px dashed #d1d5db;border-radius:6px;margin-top:8px;'>{inner}</div>"

    def _update_info_panel(self, md: Optional[MultipleDock], *, running: bool) -> None:
        """
        Update the right-side info card summarizing the run.
        """
        if not md:
            self._info_panel.value = self._info_block("<i>No run.</i>")
            return
        bdesc = self._describe_backend_for_md()
        out = html.escape(str(md.out_dir))
        log = html.escape(str(md.log_dir))
        bits = [
            f"<b>Backend:</b> {bdesc}",
            f"<b>Workers:</b> {md._n_workers}",
            f"<b>CPU/thread:</b> {md._cpu or '-'}",
            f"<b>Exhaustiveness:</b> {md._exhaustiveness}",
            f"<b>Num poses:</b> {md._num_modes}",
            f"<b>Out:</b> {out}",
            f"<b>Logs:</b> {log}",
        ]
        if md._use_autobox:
            bits.append(f"<b>Box:</b> autobox ref={html.escape(str(md._autobox_ref))}, pad={md._autobox_padding}")
        else:
            bits.append(f"<b>Box:</b> center={md._box_center}, size={md._box_size}")
        if running:
            bits.append("<i>Running…</i>")
        elif self._run_start_ts:
            dur = int(time.time() - self._run_start_ts)
            bits.append(f"<b>Duration:</b> {dur}s")
        self._info_panel.value = self._info_block("<br>".join(bits))

    # -------------------------
    # Helpers: shortcuts, parsing, normalize, css utils
    # -------------------------
    def _inject_shortcuts(self) -> None:
        js = """
        (function(){
          if (window.__dockgui_shortcuts__) return;
          window.__dockgui_shortcuts__ = true;
          document.addEventListener('keydown', function(e){
            try{
              if (e.altKey && e.key && e.key.toLowerCase() === 'u'){
                const btn = Array.from(document.querySelectorAll('button[title="update-shortcut"]'))[0];
                if (btn){ btn.click(); e.preventDefault(); }
              }
              if (e.altKey && e.key && e.key.toLowerCase() === 's'){
                const btn = Array.from(document.querySelectorAll('button[title="savecfg-shortcut"]'))[0];
                if (btn){ btn.click(); e.preventDefault(); }
              }
              if (e.altKey && e.key && e.key.toLowerCase() === 'p'){
                const btn = Array.from(document.querySelectorAll('button[title="pause-shortcut"]'))[0];
                if (btn){ btn.click(); e.preventDefault(); }
              }
            }catch(err){ console.warn('Shortcut error', err); }
          }, true);
        })();
        """
        display(Javascript(js))

    @staticmethod
    def _parse_vina_cfg(text: str) -> Dict[str, float]:
        vals: Dict[str, float] = {}
        for line in text.splitlines():
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            if "=" in line:
                k, v = line.split("=", 1)
            else:
                parts = line.split()
                if len(parts) < 2:
                    continue
                k, v = parts[0], parts[1]
            k = k.strip()
            try:
                vals[k] = float(v)
            except Exception:
                pass
        for k in ("center_x", "center_y", "center_z", "size_x", "size_y", "size_z"):
            if k not in vals:
                raise ValueError(f"Missing '{k}' in cfg.")
        return vals

    def _normalize_results(self, md: MultipleDock) -> List[Dict[str, Any]]:
        out: List[Dict[str, Any]] = []
        for r in md.results:
            out.append({
                "ligand": str(getattr(r, "ligand_path", "")),
                "out_path": str(getattr(r, "out_path", "") or ""),
                "log_path": str(getattr(r, "log_path", "") or ""),
                "best_score": getattr(r, "best_score", None),
                "status": getattr(r, "status", ""),
                "error": getattr(r, "error", None) or "",
                "attempts": getattr(r, "attempts", 0),
            })
        return out

    def _describe_backend_for_md(self) -> str:
        key = self._backend.value
        if key == "binary":
            exe = self._custom_backend.value.strip()
            return f"binary:{exe}" if exe else "binary:<none>"
        return key

    def _bg_hex_for_provis(self) -> str:
        hexc = self.THEMES[self._theme.value]["bg"].lstrip("#")
        if len(hexc) == 3:
            hexc = "".join(ch * 2 for ch in hexc)
        return "0x" + hexc

    def _css_hex_to_threejs(self, hex_color: str) -> str:
        h = hex_color.strip().lstrip("#")
        if len(h) == 3:
            h = "".join(ch * 2 for ch in h)
        return f"0x{h}"

    def _capture_error(self, msg: str, e: Exception, echo: bool = False) -> None:
        tb = traceback.format_exc()
        self._last_error = f"{msg}: {e}\n{tb}"
        self._status.value = self._status_bar(f"{msg}: {e}", "err")
        with self._log_output:
            clear_output(wait=True)
            print(self._last_error)
        if echo:
            print(self._last_error)

    # -------------------------
    # Export/report utilities
    # -------------------------
    def _render_report_html(self, md: MultipleDock, rows: List[Dict[str, Any]]) -> str:
        now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        title = "Docking Report"
        meta = [f"<b>Generated:</b> {now}", f"<b>Receptor:</b> {html.escape(str(md.receptor))}", f"<b>Backend:</b> {html.escape(self._describe_backend_for_md())}", f"<b>Ligands:</b> {len(md.ligands)}", f"<b>Out:</b> {html.escape(str(md.out_dir))}", f"<b>Logs:</b> {html.escape(str(md.log_dir))}"]
        if md._use_autobox:
            meta.append(f"<b>Autobox:</b> ref={html.escape(str(md._autobox_ref))}, pad={md._autobox_padding}")
        else:
            meta.append(f"<b>Box:</b> center={md._box_center}, size={md._box_size}")
        tbl = self._results_table_html(rows)
        css = "<style>body{font-family:system-ui;padding:18px;}table{width:100%;border-collapse:collapse;border:1px solid #e5e7eb}th{background:#f3f4f6;padding:6px}</style>"
        return f"<!doctype html><html><head><meta charset='utf-8'><title>{title}</title>{css}</head><body><h1>{title}</h1><div>{' • '.join(meta)}</div>{tbl}</body></html>"

    # -------------------------
    # Convenience dunders
    # -------------------------
    def __repr__(self) -> str:
        running = bool(self._run_thread and self._run_thread.is_alive())
        return f"<DockGUI backend={self._backend.value} workers={self._workers.value} running={running}>"

    def __len__(self) -> int:
        with self._agg_lock:
            return len(self._agg_results)

    # -------------------------
    # Properties for external access
    # -------------------------
    @property
    def results(self) -> List[Dict[str, Any]]:
        with self._agg_lock:
            return list(self._agg_results)

    @property
    def summary_path(self) -> Optional[Path]:
        return self._last_md.summary_path if self._last_md else None

    @property
    def last_error(self) -> Optional[str]:
        return self._last_error

In [None]:
from prodock.process.pdbqt_sanitizer import PDBQTSanitizer

s = PDBQTSanitizer("out/BMS-986115.pdbqt")
s.validate()                 # -> list[str] warnings
s.sanitize()                 # -> self (chainable)
# s.write("ligand.sanitized.pdbqt")
s.sanitize_inplace()         # overwrite original file

In [None]:
(32.5, 13.0, 133.75), (22.5, 23.5, 22.5)) \

In [None]:
gui = DockGUI().build().display()

In [None]:
from prodock.io.convert import pdbqt_to_sdf