# SAR Delta Explorer

Explore structure–activity relationships (SAR) for a target:
- Pick a target
- Adjust similarity + fold-change thresholds
- View series grid, common core (MCS), R-group decomposition (best-effort), and activity cliffs

If the API is unreachable from this container, set **API URL** (e.g. `http://host.docker.internal:8000`) and click **Refresh**.


In [None]:
from __future__ import annotations

import os
import textwrap
from typing import Any, Dict, List, Optional

import httpx
import pandas as pd

import ipywidgets as widgets
from IPython.display import display, HTML

from rdkit import Chem
from rdkit.Chem import Draw
from rdkit.Chem import AllChem
from rdkit import RDLogger
RDLogger.DisableLog('rdApp.*')

import logging
logging.getLogger("amprenta_rag.chemistry.rgroup").setLevel(logging.WARNING)

# Optional 3D viewer
try:
    import py3Dmol  # type: ignore
except ImportError:  # graceful fallback if not installed
    py3Dmol = None

# Chemistry helpers (best-effort)
from amprenta_rag.chemistry.rgroup import find_common_core, decompose_rgroups


# Base URL taken from the "API URL" text box; render_dashboard assigns it and
# _candidate_api_base_urls tries it before any other candidate. None = no override.
API_OVERRIDE_URL: Optional[str] = None
# Compound records (one dict per row) kept from the most recent render_dashboard run
# that found compounds.
compounds_state: List[Dict[str, Any]] = []


def _candidate_api_base_urls() -> List[str]:
    """Return the ordered, de-duplicated API base URLs to probe.

    Priority: the notebook override, then the ``AMPRENTA_API_URL`` /
    ``API_BASE_URL`` environment variable, then well-known Docker host and
    compose service addresses. Trailing slashes are stripped from the
    configured values; empty entries are dropped.
    """
    configured = (
        API_OVERRIDE_URL,
        os.getenv("AMPRENTA_API_URL") or os.getenv("API_BASE_URL"),
    )
    candidates = [value.rstrip("/") for value in configured if value]

    candidates += [
        "http://host.docker.internal:8000",
        "http://localhost:8000",
        # best-effort compose service names
        "http://api:8000",
        "http://amprenta-api:8000",
        "http://backend:8000",
    ]

    # dict keys keep first-seen order, which gives an order-preserving de-dupe.
    return [url for url in dict.fromkeys(candidates) if url]


def api_get(path: str, params: Optional[dict] = None, timeout_s: float = 10.0) -> Any:
    """GET *path* from the first candidate base URL that answers successfully.

    Args:
        path: API path, with or without a leading slash.
        params: Optional query parameters forwarded to ``httpx.get``.
        timeout_s: Per-request timeout in seconds (applies to each candidate).

    Returns:
        The decoded JSON body of the first successful response.

    Raises:
        RuntimeError: If every candidate fails. The last underlying error is
            attached as ``__cause__`` and included in the message.
    """
    # Resolve the candidates once so the error message reports exactly what was tried.
    candidates = _candidate_api_base_urls()
    suffix = path if path.startswith("/") else f"/{path}"

    last_err: Optional[Exception] = None
    for base in candidates:
        try:
            response = httpx.get(f"{base}{suffix}", params=params, timeout=timeout_s)
            response.raise_for_status()
            return response.json()
        except Exception as exc:
            # Deliberately broad: connection errors, HTTP error statuses and
            # non-JSON bodies all mean "try the next candidate".
            last_err = exc

    raise RuntimeError(
        f"Failed API GET {path}. Tried: {candidates}. Last error: {last_err!r}"
    ) from last_err


def _mol_from_smiles_best_effort(smiles: str):
    """RDKit SMILES parser that tolerates 'can't kekulize' cases."""
    if not smiles:
        return None

    m = Chem.MolFromSmiles(smiles)
    if m is not None:
        return m

    # fallback for aromatic/kekulization issues
    try:
        m = Chem.MolFromSmiles(smiles, sanitize=False)
        if m is None:
            return None
        Chem.SanitizeMol(m, sanitizeOps=Chem.SanitizeFlags.SANITIZE_ALL ^ Chem.SanitizeFlags.SANITIZE_KEKULIZE)
        return m
    except Exception:
        return None


def canonicalize_smiles_best_effort(smiles: str) -> Optional[str]:
    """Return RDKit's isomeric SMILES for *smiles*.

    Returns None when the input cannot be parsed, and the input string
    unchanged when parsing succeeds but writing the SMILES fails.
    """
    mol = _mol_from_smiles_best_effort(smiles)
    if mol:
        try:
            canonical = Chem.MolToSmiles(mol, isomericSmiles=True)
        except Exception:
            canonical = smiles
        return canonical
    return None


def _build_conformer(smiles: str):
    """Embed one 3D conformer (ETKDG) with explicit hydrogens and try an MMFF clean-up.

    Returns the RDKit Mol carrying the conformer, or None if parsing,
    hydrogen addition or embedding fails. A failed MMFF optimization is
    tolerated: the unoptimized embedded geometry is returned.
    """
    parsed = _mol_from_smiles_best_effort(smiles)
    if parsed is None:
        return None

    try:
        with_h = Chem.AddHs(parsed)
        embed_status = AllChem.EmbedMolecule(with_h, AllChem.ETKDG())
        if embed_status != 0:
            # Non-zero status: no conformer was produced.
            return None
        try:
            AllChem.MMFFOptimizeMolecule(with_h)
        except Exception:
            # Optimization is optional; keep the raw embedding.
            pass
    except Exception:
        return None
    return with_h


def _make_3d_widget(smiles: str, title: str = ""):
    """Return an ipywidget showing a 3D stick model of *smiles*.

    The result is always a real ipywidget, so it can be passed to
    ``display`` or placed inside ``widgets.HBox`` / ``widgets.VBox``:

    * py3Dmol missing, conformer generation failed, or viewer setup raised:
      a ``widgets.HTML`` carrying an italic explanation;
    * otherwise: a ``widgets.Output`` holding the py3Dmol view, wrapped in a
      ``widgets.VBox`` under a bold heading when *title* is non-empty.

    Args:
        smiles: Structure to render.
        title: Optional heading; it is HTML-escaped before display.
    """
    if py3Dmol is None:
        return widgets.HTML(value="<i>py3Dmol not installed in this image.</i>")

    mol = _build_conformer(smiles)
    if mol is None:
        return widgets.HTML(value="<i>Could not generate 3D conformer.</i>")

    try:
        view = py3Dmol.view(width=380, height=320)
        view.addModel(Chem.MolToMolBlock(mol), "sdf")
        view.setStyle({"stick": {}})
        view.setBackgroundColor("0xeeeeee")
        view.zoomTo()
    except Exception:
        return widgets.HTML(value="<i>3D rendering failed.</i>")

    # view.show() publishes display data and returns None, so its return value
    # cannot be used as a container child. Capture what it publishes in an
    # Output widget instead.
    panel = widgets.Output()
    with panel:
        view.show()

    if title:
        from html import escape

        heading = widgets.HTML(value=f"<b>{escape(str(title))}</b>")
        return widgets.VBox([heading, panel])
    return panel


def _make_overlay_widget(smiles1: str, smiles2: str, label1: str = "Mol A", label2: str = "Mol B"):
    """Build one labelled 3D panel per molecule and lay the two out in a horizontal box."""
    pairs = ((smiles1, label1), (smiles2, label2))
    panels = [_make_3d_widget(smi, label) for smi, label in pairs]
    return widgets.HBox(panels)


def mol_grid(smiles_list: List[str], legends: Optional[List[str]] = None, mols_per_row: int = 4, size: int = 220):
    """Display a 2D grid image of the parseable molecules in *smiles_list*.

    Unparseable entries are skipped. Each drawn molecule keeps the legend at
    its original position in *legends* (empty string when none is supplied
    for that position). Shows a short notice instead when nothing can be drawn.
    """
    drawable = []
    for position, smi in enumerate(smiles_list):
        mol = _mol_from_smiles_best_effort(smi)
        if mol is None:
            continue
        has_caption = bool(legends) and position < len(legends)
        drawable.append((mol, legends[position] if has_caption else ""))

    if not drawable:
        display(HTML("<i>No valid molecules to display.</i>"))
        return

    grid_image = Draw.MolsToGridImage(
        [mol for mol, _ in drawable],
        molsPerRow=mols_per_row,
        subImgSize=(size, size),
        legends=[caption for _, caption in drawable],
    )
    display(grid_image)


In [None]:
# Demo fallback data (so the dashboard works even if SAR API routes are not present)
# Sentinel target name: get_compounds_for_target / get_cliffs_for_target answer it
# from the local list below instead of calling the API.
DEMO_TARGET = "DEMO"
# Four records that share the ring SMILES "c1ccc2[nH]c3ccccc3n2c1" and differ only in
# the leading substituent (F, Cl, CO, C) and in their ic50 value; units are given as nM.
DEMO_COMPOUNDS: List[Dict[str, Any]] = [
    {"compound_id": "DEMO-001", "smiles": "Fc1ccc2[nH]c3ccccc3n2c1", "ic50": 50.0, "units": "nM"},
    {"compound_id": "DEMO-002", "smiles": "Clc1ccc2[nH]c3ccccc3n2c1", "ic50": 1000.0, "units": "nM"},
    {"compound_id": "DEMO-003", "smiles": "COc1ccc2[nH]c3ccccc3n2c1", "ic50": 120.0, "units": "nM"},
    {"compound_id": "DEMO-004", "smiles": "Cc1ccc2[nH]c3ccccc3n2c1", "ic50": 90.0, "units": "nM"},
]


def load_targets() -> List[Dict[str, Any]]:
    """Fetch the SAR target list, largest compound count first.

    Entries without a ``target`` value are dropped. Any failure, a non-list
    payload, or an empty result falls back to a single demo-target entry.
    """
    try:
        payload = api_get("/api/v1/sar/targets")
        if isinstance(payload, list):
            named = sorted(
                (entry for entry in payload if entry.get("target")),
                key=lambda entry: entry.get("compound_count") or 0,
                reverse=True,
            )
            if named:
                return named
    except Exception:
        # Unreachable API or unexpected payload shape: use demo mode below.
        pass

    # fallback to demo mode
    demo_entry = {"target": DEMO_TARGET, "compound_count": len(DEMO_COMPOUNDS)}
    return [demo_entry]


def make_target_options(targets: List[Dict[str, Any]]):
    """Turn target records into ``(label, value)`` pairs for a Dropdown.

    The label reads ``"<target> (<compound_count> compounds)"``, with the
    count defaulting to 0 when the key is absent. ``None`` or an empty list
    yields an empty list.
    """
    options = []
    for entry in targets or []:
        name = entry["target"]
        count = entry.get("compound_count", 0)
        options.append((f"{name} ({count} compounds)", name))
    return options


def get_compounds_for_target(target: str) -> List[Dict[str, Any]]:
    """Return the compound records for *target*.

    The demo target is served from the in-notebook ``DEMO_COMPOUNDS`` list;
    every other target is fetched from ``/api/v1/sar/targets/<target>/compounds``.

    Raises:
        RuntimeError: Propagated from ``api_get`` when no API candidate answers.
    """
    if target == DEMO_TARGET:
        return DEMO_COMPOUNDS

    from urllib.parse import quote

    # Percent-encode the name so characters such as "/", "?" or "#" stay inside
    # this one path segment instead of altering the request URL.
    segment = quote(str(target), safe="")
    return api_get(f"/api/v1/sar/targets/{segment}/compounds")


def get_cliffs_for_target(target: str, similarity_threshold: float, fold_change: float) -> List[Dict[str, Any]]:
    """Return activity-cliff pairs for *target*.

    Non-demo targets are delegated to ``/api/v1/sar/targets/<target>/cliffs``.
    For the demo target the cliffs are computed locally: every pair of demo
    compounds whose Morgan-fingerprint (radius 2, 2048 bits) Tanimoto
    similarity is at least *similarity_threshold* and whose IC50 ratio
    (larger / smaller) is at least *fold_change* is reported.

    Args:
        target: Target name, or the demo sentinel.
        similarity_threshold: Minimum Tanimoto similarity for a pair.
        fold_change: Minimum IC50 fold difference for a pair.

    Returns:
        A list of dicts with keys ``compound_1``, ``smiles_1``, ``activity_1``,
        ``compound_2``, ``smiles_2``, ``activity_2``, ``similarity``,
        ``fold_change`` and ``assay_id`` (demo path), or whatever the API
        returns (non-demo path).

    Raises:
        RuntimeError: Propagated from ``api_get`` on the non-demo path.
    """
    if target != DEMO_TARGET:
        from urllib.parse import quote

        # Keep the target inside a single path segment ("/", "?", "#" are escaped).
        segment = quote(str(target), safe="")
        return api_get(
            f"/api/v1/sar/targets/{segment}/cliffs",
            params={"similarity_threshold": similarity_threshold, "fold_change": fold_change},
        )

    # local cliffs for demo
    from itertools import combinations

    from rdkit import DataStructs

    fingerprinted = []
    for record in DEMO_COMPOUNDS:
        if not (record.get("smiles") and record.get("ic50")):
            continue
        # Use the same tolerant parser as the structure grid, so a compound
        # that can be drawn is not silently excluded from cliff detection.
        mol = _mol_from_smiles_best_effort(record["smiles"])
        if mol is None:
            continue
        try:
            bits = AllChem.GetMorganFingerprintAsBitVect(mol, 2, nBits=2048)
        except Exception:
            # Best-effort: a molecule that cannot be fingerprinted is skipped.
            continue
        fingerprinted.append((record, bits))

    cliffs = []
    for (c1, fp1), (c2, fp2) in combinations(fingerprinted, 2):
        sim = float(DataStructs.TanimotoSimilarity(fp1, fp2))
        if sim < similarity_threshold:
            continue
        v1 = float(c1["ic50"])
        v2 = float(c2["ic50"])
        if v1 <= 0 or v2 <= 0:
            # A ratio is undefined for non-positive activities.
            continue
        fc = max(v1 / v2, v2 / v1)
        if fc < fold_change:
            continue
        cliffs.append({
            "compound_1": c1["compound_id"],
            "smiles_1": c1["smiles"],
            "activity_1": v1,
            "compound_2": c2["compound_id"],
            "smiles_2": c2["smiles"],
            "activity_2": v2,
            "similarity": round(sim, 3),
            "fold_change": round(fc, 2),
            "assay_id": None,
        })

    return cliffs


In [None]:
# Widgets
# No observers are attached to the input widgets below; their values are read
# by render_dashboard when it runs.

# Free-text API base URL, pre-filled from the environment or the Docker host default.
api_url_txt = widgets.Text(
    value=os.getenv("AMPRENTA_API_URL") or os.getenv("API_BASE_URL") or "http://host.docker.internal:8000",
    description="API URL:",
    layout=widgets.Layout(width="620px"),
)

# Initial target list, fetched once at cell execution (demo entry if the API is unreachable).
_targets = load_targets()

target_dd = widgets.Dropdown(
    options=make_target_options(_targets),
    description="Target:",
    layout=widgets.Layout(width="620px"),
)

# Similarity threshold passed to get_cliffs_for_target: 0.0-1.0 in 0.05 steps, default 0.6.
similarity = widgets.FloatSlider(
    value=0.6,
    min=0.0,
    max=1.0,
    step=0.05,
    description="Similarity:",
    readout_format=".2f",
    continuous_update=False,
    layout=widgets.Layout(width="620px"),
)

# Fold-change threshold on a log10 scale: min/max are exponents, so the range is 1-100.
fold_change = widgets.FloatLogSlider(
    value=10.0,
    base=10,
    min=0,   # 10^0 = 1
    max=2,   # 10^2 = 100
    step=0.1,
    description="Fold Δ:",
    continuous_update=False,
    layout=widgets.Layout(width="620px"),
)

refresh_btn = widgets.Button(description="Refresh", button_style="primary")

# `status` carries the one-line progress/error message; `out` receives the rendered sections.
status = widgets.HTML(value="")
out = widgets.Output()

# 3D viewer controls
# Option values are SMILES strings; the empty string is the "nothing selected" placeholder.
mol_dd = widgets.Dropdown(
    options=[("Select molecule", "")],
    description="3D molecule:",
    layout=widgets.Layout(width="620px"),
)
view_btn = widgets.Button(description="Show 3D", button_style="info")
viewer_out = widgets.Output()
viewer_controls = widgets.HBox([mol_dd, view_btn])

# Top-level layout: inputs, refresh button, status line, main output, then the 3D viewer area.
ui = widgets.VBox([
    api_url_txt,
    target_dd,
    similarity,
    fold_change,
    refresh_btn,
    status,
    out,
    widgets.HTML("<h4>3D conformer viewer</h4>"),
    viewer_controls,
    viewer_out,
])


In [None]:
def _section_series_overview(head: pd.DataFrame) -> None:
    """Draw the 2D structure grid for *head*, captioned with compound id and IC50."""
    display(HTML("<h3>Series overview</h3>"))
    count = len(head)
    # Both columns are optional in the API payload; fall back to blank captions.
    ids = head["compound_id"].tolist() if "compound_id" in head.columns else [""] * count
    ic50s = head["ic50"].tolist() if "ic50" in head.columns else [None] * count
    legends = [f"{cid}\nIC50={ic50}" for cid, ic50 in zip(ids, ic50s)]
    mol_grid(head["smiles"].tolist(), legends=legends, mols_per_row=4)


def _section_common_core(smiles_list: List[str]) -> Optional[str]:
    """Show the common-core SMARTS (text and depiction); return it, or None if absent."""
    from html import escape

    display(HTML("<h3>Common core (MCS)</h3>"))
    core_smarts = find_common_core(smiles_list)
    if not core_smarts:
        display(HTML("<i>No common core found (or RDKit unavailable).</i>"))
        return None

    # SMARTS may contain "&" and similar characters, so escape before embedding in HTML.
    display(HTML(f"<pre style='white-space:pre-wrap'>{escape(str(core_smarts))}</pre>"))
    try:
        core_mol = Chem.MolFromSmarts(core_smarts)
        if core_mol:
            display(Draw.MolToImage(core_mol, size=(320, 240)))
    except Exception:
        # The picture is optional; the SMARTS text above is already shown.
        display(HTML("<i>Core depiction unavailable.</i>"))
    return core_smarts


def _section_rgroups(head: pd.DataFrame, canonical: List[Optional[str]],
                     smiles_list: List[str], core_smarts: Optional[str]) -> None:
    """Show the R-group table for *smiles_list*, annotated with id and IC50 where possible."""
    display(HTML("<h3>R-group grid (best-effort)</h3>"))
    if not core_smarts:
        display(HTML("<i>Need a core SMARTS to decompose.</i>"))
        return

    rdf = pd.DataFrame(decompose_rgroups(smiles_list, core_smarts))
    if rdf.empty:
        display(HTML("<i>Decomposition returned no rows.</i>"))
        return
    if "smiles" not in rdf.columns:
        # Nothing to join on; show the raw decomposition.
        display(rdf.head(30))
        return

    # Join on the canonical SMILES that were actually passed to the decomposition,
    # not on the raw input strings.
    # NOTE(review): assumes decompose_rgroups echoes its input SMILES in "smiles" - confirm.
    meta = head.assign(smiles=canonical)
    meta = meta[meta["smiles"].notna()]
    meta = meta[[col for col in ("smiles", "compound_id", "ic50") if col in meta.columns]]

    merged = rdf.merge(meta, on="smiles", how="left")
    if "ic50" in merged.columns:
        merged = merged.sort_values(by="ic50", ascending=True, na_position="last")
    display(merged.head(30))


def _section_cliffs(tgt: str, sim: float, fc: float) -> None:
    """Show the activity-cliff table plus 2D/3D views of the six steepest pairs."""
    from html import escape

    display(HTML("<h3>Activity cliffs</h3>"))
    cdf = pd.DataFrame(get_cliffs_for_target(tgt, similarity_threshold=sim, fold_change=fc) or [])
    if cdf.empty:
        display(HTML("<i>No cliffs found at current thresholds. Try lowering similarity or fold-change.</i>"))
        display(HTML("<p style='margin-top:0.5rem'>Hint: similarity 0.30–0.50 often yields results in small demo sets.</p>"))
        return

    ranked = cdf.sort_values(by=["fold_change", "similarity"], ascending=False)
    display(ranked.head(25))

    for _, row in ranked.head(6).iterrows():
        first = escape(str(row.get("compound_1")))
        second = escape(str(row.get("compound_2")))
        display(HTML(
            f"<b>{first}</b> vs <b>{second}</b> | sim={row.get('similarity')} | fold={row.get('fold_change')}"
        ))
        # Missing SMILES show up as None/NaN in the frame; normalise them to "".
        smi_1 = row.get("smiles_1")
        smi_2 = row.get("smiles_2")
        smi_1 = smi_1 if isinstance(smi_1, str) else ""
        smi_2 = smi_2 if isinstance(smi_2, str) else ""
        mol_grid([smi_1, smi_2], legends=["1", "2"], mols_per_row=2, size=260)
        display(_make_overlay_widget(smi_1, smi_2, "Compound 1", "Compound 2"))


def render_dashboard(*_):
    """Re-read the controls and redraw every dashboard section into ``out``.

    Steps: apply the API URL override, reload the target list (keeping the
    user's selection when it is still offered), fetch and filter compounds,
    refill the 3D-molecule dropdown, then render the series grid, common
    core, R-group table and activity cliffs.

    Accepts and ignores any positional arguments so it can be used directly
    as a widget callback. Exceptions from the API or chemistry helpers
    propagate to the caller.
    """
    global API_OVERRIDE_URL, compounds_state
    from html import escape

    API_OVERRIDE_URL = (api_url_txt.value or "").strip() or None

    tgt = target_dd.value
    sim = float(similarity.value)
    fc = float(fold_change.value)

    # Clear 3D viewer when refreshing target
    viewer_out.clear_output()

    with out:
        out.clear_output()

        # Reload target list each refresh (so the dropdown becomes real once API is reachable)
        options = make_target_options(load_targets())
        target_dd.options = options
        # Assigning options moves the selection to the first entry. Put the
        # user's choice back if it is still valid, otherwise adopt the new one.
        if tgt in [value for _, value in options]:
            target_dd.value = tgt
        else:
            tgt = target_dd.value

        if not tgt:
            status.value = "<b>No target</b> available."
            mol_dd.options = [("Select molecule", "")]
            return

        status.value = f"<b>Loading</b> target={escape(str(tgt))} similarity≥{sim:.2f} fold_change≥{fc:.2f} …"

        # Fetch compounds and keep only rows with a structure (and an IC50 when that column exists).
        df = pd.DataFrame(get_compounds_for_target(tgt) or [])
        problem = None
        if df.empty:
            problem = "<b>No compounds</b> returned for this target."
        elif "smiles" not in df.columns:
            problem = "<b>No structures</b>: compound records have no SMILES field."
        else:
            df = df[df["smiles"].notna()]
            if "ic50" in df.columns:
                df = df[df["ic50"].notna()]
            df = df.reset_index(drop=True)
            if df.empty:
                problem = "<b>No usable compounds</b>: every record is missing SMILES or IC50."

        if problem:
            status.value = problem
            mol_dd.options = [("Select molecule", "")]
            compounds_state = []
            return

        compounds_state = df.to_dict("records")

        # Update 3D dropdown options (value = SMILES)
        mol_opts = []
        for rec in compounds_state:
            cid = rec.get("compound_id") or ""
            smi = rec.get("smiles") or ""
            if not smi:
                continue
            ic50 = rec.get("ic50")
            label = f"{cid} (IC50={ic50})" if ic50 is not None else cid
            mol_opts.append((label, smi))
        mol_dd.options = mol_opts or [("No molecules", "")]

        status.value = f"<b>Loaded</b> {len(df)} compounds."

        # All structure-based sections work on the first 20 compounds.
        head = df.head(20)
        canonical = [canonicalize_smiles_best_effort(s) for s in head["smiles"].tolist()]
        smiles_list = [s for s in canonical if s]

        _section_series_overview(head)
        core_smarts = _section_common_core(smiles_list)
        _section_rgroups(head, canonical, smiles_list, core_smarts)
        _section_cliffs(tgt, sim, fc)


def _on_view_clicked(_):
    """Button callback: replace the 3D viewer area with the molecule picked in ``mol_dd``."""
    selected_smiles = mol_dd.value
    viewer_out.clear_output()
    # The placeholder option has an empty value: leave the area blank.
    if selected_smiles:
        with viewer_out:
            display(_make_3d_widget(selected_smiles))


def _on_refresh(_):
    """Button callback: run ``render_dashboard`` and report any failure in the UI.

    On error the main output area is cleared, the status line shows the
    exception repr, and the exception message is shown in a preformatted
    block. Both are HTML-escaped so text such as ``<class ...>`` or ``&`` in
    an error message is displayed literally instead of being parsed as markup.
    """
    from html import escape

    try:
        render_dashboard()
    except Exception as e:
        with out:
            out.clear_output()
            status.value = f"<b style='color:#b00020'>Error:</b> {escape(repr(e))}"
            display(HTML("<pre style='white-space:pre-wrap'>" + escape(textwrap.dedent(str(e))) + "</pre>"))


# Wire each button to its handler.
view_btn.on_click(_on_view_clicked)
refresh_btn.on_click(_on_refresh)

# Show the control panel once; the handlers draw into the `out` / `viewer_out`
# areas it contains.
display(ui)

# initial render
_on_refresh(None)



## Notes

- This dashboard will use the SAR API if available at `/api/v1/sar/*`.
- If the SAR API routes are not available, it falls back to an in-notebook **DEMO** dataset.
