## Import packages

In [1]:
from dataclasses import dataclass, field
import math
from typing import Callable, Iterable

from lightning import pytorch as pl
import numpy as np
import pandas as pd
from rdkit import Chem
import torch

from chemprop import data, models
from chemprop.models import MPNN

## Define helper function to make model predictions from SMILES

In [2]:
def make_prediction(
    models: list[MPNN],
    trainer: pl.Trainer,
    smiles: list[str],
) -> np.ndarray:
    """Makes predictions on a list of SMILES.

    Parameters
    ----------
    models : list
        A list of models to make predictions with.
    smiles : list
        A list of SMILES to make predictions on.

    Returns
    -------
    list[list[float]]
        A list of lists containing the predicted values.
    """

    test_data = [data.MoleculeDatapoint.from_smi(smi) for smi in smiles]
    test_dset = data.MoleculeDataset(test_data)
    test_loader = data.build_dataloader(
        test_dset, batch_size=1, num_workers=0, shuffle=False
    )

    with torch.inference_mode():
        sum_preds = []
        for model in models:
            predss = trainer.predict(model, test_loader)
            preds = torch.cat(predss, 0)
            preds = preds.cpu().numpy()
            sum_preds.append(preds)

        # Ensemble predictions
        sum_preds = sum(sum_preds)
        avg_preds = sum_preds / len(models)

    return avg_preds

## Classes/functions relevant to Monte Carlo Tree Search

Mostly similar to the scripts from Chemprop v1 [interpret.py](https://github.com/chemprop/chemprop/blob/master/chemprop/interpret.py) with additional documentation

In [3]:
@dataclass
class MCTSNode:
    """Represents a node in a Monte Carlo Tree Search.

    Parameters
    ----------
    smiles : str
        The SMILES for the substructure at this node.
    atoms : list
        A list of atom indices in the substructure at this node.
    W : float
        The total action value, which indicates how likely the deletion will lead to a good rationale.
    N : int
        The visit count, which indicates how many times this node has been visited. It is used to balance exploration and exploitation.
    P : float
        The predicted property score of the new subgraphs' after the deletion, shown as R in the original paper.
    """

    smiles: str
    atoms: Iterable[int]
    W: float = 0
    N: int = 0
    P: float = 0
    children: list[...] = field(default_factory=list)

    def __post_init__(self):
        self.atoms = set(self.atoms)

    def Q(self) -> float:
        """
        Returns
        -------
        float
            The mean action value of the node.
        """
        return self.W / self.N if self.N > 0 else 0

    def U(self, n: int, c_puct: float = 10.0) -> float:
        """
        Parameters
        ----------
        n : int
            The sum of the visit count of this node's siblings.
        c_puct : float
            A constant that controls the level of exploration.
        
        Returns
        -------
        float
            The exploration value of the node.
        """
        return c_puct * self.P * math.sqrt(n) / (1 + self.N)

In [4]:
def find_clusters(mol: Chem.Mol) -> tuple[list[tuple[int, ...]], list[list[int]]]:
    """Finds clusters within the molecule. Jin et al. from [1]_ only allows deletion of one peripheral non-aromatic bond or one peripheral ring from each state,
    so the clusters here are defined as non-ring bonds and the smallest set of smallest rings.

    Parameters
    ----------
    mol : RDKit molecule
        The molecule to find clusters in.

    Returns
    -------
    tuple
        A tuple containing:
        - list of tuples: Each tuple contains atoms in a cluster.
        - list of int: Each atom's cluster index.
    
    References
    ----------
    .. [1] Jin, Wengong, Regina Barzilay, and Tommi Jaakkola. "Multi-objective molecule generation using interpretable substructures." International conference on machine learning. PMLR, 2020. https://arxiv.org/abs/2002.03244
    """

    n_atoms = mol.GetNumAtoms()
    if n_atoms == 1:  # special case
        return [(0,)], [[0]]

    clusters = []
    for bond in mol.GetBonds():
        a1 = bond.GetBeginAtom().GetIdx()
        a2 = bond.GetEndAtom().GetIdx()
        if not bond.IsInRing():
            clusters.append((a1, a2))

    ssr = [tuple(x) for x in Chem.GetSymmSSSR(mol)]
    clusters.extend(ssr)

    atom_cls = [[] for _ in range(n_atoms)]
    for i in range(len(clusters)):
        for atom in clusters[i]:
            atom_cls[atom].append(i)

    return clusters, atom_cls

In [5]:
def extract_subgraph_from_mol(mol: Chem.Mol, selected_atoms: set[int]) -> tuple[Chem.Mol, list[int]]:
    """Extracts a subgraph from an RDKit molecule given a set of atom indices.

    Parameters
    ----------
    mol : RDKit molecule
        The molecule from which to extract a subgraph.
    selected_atoms : list of int
        The indices of atoms which form the subgraph to be extracted.

    Returns
    -------
    tuple
        A tuple containing:
        - RDKit molecule: The subgraph.
        - list of int: Root atom indices from the selected indices.
    """

    selected_atoms = set(selected_atoms)
    roots = []
    for idx in selected_atoms:
        atom = mol.GetAtomWithIdx(idx)
        bad_neis = [y for y in atom.GetNeighbors() if y.GetIdx() not in selected_atoms]
        if len(bad_neis) > 0:
            roots.append(idx)

    new_mol = Chem.RWMol(mol)

    for atom_idx in roots:
        atom = new_mol.GetAtomWithIdx(atom_idx)
        atom.SetAtomMapNum(1)
        aroma_bonds = [
            bond for bond in atom.GetBonds() if bond.GetBondType() == Chem.rdchem.BondType.AROMATIC
        ]
        aroma_bonds = [
            bond
            for bond in aroma_bonds
            if bond.GetBeginAtom().GetIdx() in selected_atoms
            and bond.GetEndAtom().GetIdx() in selected_atoms
        ]
        if len(aroma_bonds) == 0:
            atom.SetIsAromatic(False)

    remove_atoms = [
        atom.GetIdx() for atom in new_mol.GetAtoms() if atom.GetIdx() not in selected_atoms
    ]
    remove_atoms = sorted(remove_atoms, reverse=True)
    for atom in remove_atoms:
        new_mol.RemoveAtom(atom)

    return new_mol.GetMol(), roots

In [6]:
def extract_subgraph(smiles: str, selected_atoms: set[int]) -> tuple[str, list[int]]:
    """Extracts a subgraph from a SMILES given a set of atom indices.

    Parameters
    ----------
    smiles : str
        The SMILES string from which to extract a subgraph.
    selected_atoms : list of int
        The indices of atoms which form the subgraph to be extracted.

    Returns
    -------
    tuple
        A tuple containing:
        - str: SMILES representing the subgraph.
        - list of int: Root atom indices from the selected indices.
    """
    # try with kekulization
    mol = Chem.MolFromSmiles(smiles)
    Chem.Kekulize(mol)
    subgraph, roots = extract_subgraph_from_mol(mol, selected_atoms)
    try:
        subgraph = Chem.MolToSmiles(subgraph, kekuleSmiles=True)
        subgraph = Chem.MolFromSmiles(subgraph)
    except Exception:
        subgraph = None

    mol = Chem.MolFromSmiles(smiles)  # de-kekulize
    if subgraph is not None and mol.HasSubstructMatch(subgraph):
        return Chem.MolToSmiles(subgraph), roots

    # If fails, try without kekulization
    subgraph, roots = extract_subgraph_from_mol(mol, selected_atoms)
    subgraph = Chem.MolToSmiles(subgraph)
    subgraph = Chem.MolFromSmiles(subgraph)

    if subgraph is not None:
        return Chem.MolToSmiles(subgraph), roots
    else:
        return None, None

In [7]:
def mcts_rollout(
    node: MCTSNode,
    state_map: dict[str, MCTSNode],
    orig_smiles: str,
    clusters: list[set[int]],
    atom_cls: list[set[int]],
    nei_cls: list[set[int]],
    scoring_function: Callable[[list[str]], list[float]],
    min_atoms: int = 15,
    c_puct: float = 10.0,
) -> float:
    """A Monte Carlo Tree Search rollout from a given MCTSNode.

    Parameters
    ----------
    node : MCTSNode
        The MCTSNode from which to begin the rollout.
    state_map : dict
        A mapping from SMILES to MCTSNode.
    orig_smiles : str
        The original SMILES of the molecule.
    clusters : list
        Clusters of atoms.
    atom_cls : list
        Atom indices in the clusters.
    nei_cls : list
        Neighboring cluster indices.
    scoring_function : function
        A function for scoring subgraph SMILES using a Chemprop model.
    min_atoms : int
        The minimum number of atoms in a subgraph.
    c_puct : float
        The constant controlling the level of exploration.

    Returns
    -------
    float
        The score of this MCTS rollout.
    """
    # Return if the number of atoms is less than the minimum
    cur_atoms = node.atoms
    if len(cur_atoms) <= min_atoms:
        return node.P

    # Expand if this node has never been visited
    if len(node.children) == 0:
        # Cluster indices whose all atoms are present in current subgraph
        cur_cls = set([i for i, x in enumerate(clusters) if x <= cur_atoms])

        for i in cur_cls:
            # Leaf atoms are atoms that are only involved in one cluster.
            leaf_atoms = [a for a in clusters[i] if len(atom_cls[a] & cur_cls) == 1]

            # This checks
            # 1. If there is only one neighbor cluster in the current subgraph (so that we don't produce unconnected graphs), or
            # 2. If the cluster has only two atoms and the current subgraph has only one leaf atom.
            # If either of the conditions is met, remove the leaf atoms in the current cluster.
            if len(nei_cls[i] & cur_cls) == 1 or len(clusters[i]) == 2 and len(leaf_atoms) == 1:
                new_atoms = cur_atoms - set(leaf_atoms)
                new_smiles, _ = extract_subgraph(orig_smiles, new_atoms)
                if new_smiles in state_map:
                    new_node = state_map[new_smiles]  # merge identical states
                else:
                    new_node = MCTSNode(new_smiles, new_atoms)
                if new_smiles:
                    node.children.append(new_node)

        state_map[node.smiles] = node
        if len(node.children) == 0:
            return node.P  # cannot find leaves

        scores = scoring_function([x.smiles for x in node.children])
        for child, score in zip(node.children, scores):
            child.P = score

    sum_count = sum(c.N for c in node.children)
    selected_node = max(node.children, key=lambda x: x.Q() + x.U(sum_count, c_puct=c_puct))
    v = mcts_rollout(
        selected_node,
        state_map,
        orig_smiles,
        clusters,
        atom_cls,
        nei_cls,
        scoring_function,
        min_atoms=min_atoms,
        c_puct=c_puct,
    )
    selected_node.W += v
    selected_node.N += 1

    return v

In [8]:
def mcts(
    smiles: str,
    scoring_function: Callable[[list[str]], list[float]],
    n_rollout: int,
    max_atoms: int,
    prop_delta: float,
    min_atoms: int = 15,
    c_puct: int = 10,
) -> list[MCTSNode]:
    """Runs the Monte Carlo Tree Search algorithm.

    Parameters
    ----------
    smiles : str
        The SMILES of the molecule to perform the search on.
    scoring_function : function
        A function for scoring subgraph SMILES using a Chemprop model.
    n_rollout : int
        The number of MCTS rollouts to perform.
    max_atoms : int
        The maximum number of atoms allowed in an extracted rationale.
    prop_delta : float
        The minimum required property value for a satisfactory rationale.
    min_atoms : int
        The minimum number of atoms in a subgraph.
    c_puct : float
        The constant controlling the level of exploration.

    Returns
    -------
    list
        A list of rationales each represented by a MCTSNode.
    """

    mol = Chem.MolFromSmiles(smiles)

    clusters, atom_cls = find_clusters(mol)
    nei_cls = [0] * len(clusters)
    for i, cls in enumerate(clusters):
        nei_cls[i] = [nei for atom in cls for nei in atom_cls[atom]]
        nei_cls[i] = set(nei_cls[i]) - {i}
        clusters[i] = set(list(cls))
    for a in range(len(atom_cls)):
        atom_cls[a] = set(atom_cls[a])

    root = MCTSNode(smiles, set(range(mol.GetNumAtoms())))
    state_map = {smiles: root}
    for _ in range(n_rollout):
        mcts_rollout(
            root,
            state_map,
            smiles,
            clusters,
            atom_cls,
            nei_cls,
            scoring_function,
            min_atoms=min_atoms,
            c_puct=c_puct,
        )

    rationales = [
        node
        for _, node in state_map.items()
        if len(node.atoms) <= max_atoms and node.P >= prop_delta
    ]

    return rationales

## Load model

In [9]:
model_path = "outputs/strategy_3/model_0/best.pt"

In [10]:
mpnn = models.MPNN.load_from_file(model_path)
mpnn

MPNN(
  (message_passing): BondMessagePassing(
    (W_i): Linear(in_features=86, out_features=2048, bias=False)
    (W_h): Linear(in_features=2048, out_features=2048, bias=False)
    (W_o): Linear(in_features=2120, out_features=2048, bias=True)
    (dropout): Dropout(p=0.0, inplace=False)
    (tau): ReLU()
    (V_d_transform): Identity()
    (graph_transform): Identity()
  )
  (agg): MeanAggregation()
  (bn): Identity()
  (predictor): BinaryClassificationFFN(
    (ffn): MLP(
      (0): Sequential(
        (0): Linear(in_features=2048, out_features=1800, bias=True)
      )
      (1): Sequential(
        (0): ReLU()
        (1): Dropout(p=0.0, inplace=False)
        (2): Linear(in_features=1800, out_features=1800, bias=True)
      )
      (2): Sequential(
        (0): ReLU()
        (1): Dropout(p=0.0, inplace=False)
        (2): Linear(in_features=1800, out_features=1, bias=True)
      )
    )
    (criterion): BCELoss(task_weights=[[1.0]])
    (output_transform): Identity()
  )
  (X_d_t

## Load data to run interpretation for

In [11]:
test_path = "testing.csv"
smiles_column = "smiles"

In [12]:
df_test = pd.read_csv(test_path)
df_test

Unnamed: 0,smiles,is_toxic
0,C#Cc1cccc(N=c2[nH]cnc3cc(OCCOC)c(OCCOC)cc23)c1,1
1,CC1OC1P(=O)(O)O,1
2,CC(C)CC(N(C)C)C1(c2ccc(Cl)cc2)CCC1,1
3,COCCCOc1cc(CC(CC(N)C(O)CC(C(=O)NCC(C)(C)C(N)=O...,1
4,CNC1CCC(c2ccc(Cl)c(Cl)c2)c2ccccc21,1
...,...,...
218,ClC1C(Cl)C(Cl)C(Cl)C(Cl)C1Cl,0
219,Oc1c(C2=CCC(c3ccc(Cl)cc3)CC2)c(O)c2ccccc2c1O,0
220,C=Cc1ccc(S(=O)(=O)O)cc1,0
221,OCC(O)c1oc(O)c(O)c1O,0


## Set up trainer

In [13]:
trainer = pl.Trainer(logger=None, enable_progress_bar=False, devices=1)

💡 Tip: For seamless cloud uploads and versioning, try installing [litmodels](https://pypi.org/project/litmodels/) to enable LitModelCheckpoint, which syncs automatically with the Lightning model registry.
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs


# Running interpretation

In [14]:
# MCTS options
rollout = 10  # number of MCTS rollouts to perform. If mol.GetNumAtoms() > 50, consider setting n_rollout = 1 to avoid long computation time

c_puct = 10.0  # constant that controls the level of exploration

max_atoms = 20  # maximum number of atoms allowed in an extracted rationale

min_atoms = 8  # minimum number of atoms in an extracted rationale

prop_delta = 0.5  # Minimum score to count as positive.
# In this algorithm, if the predicted property from the substructure if larger than prop_delta, the substructure is considered satisfactory.
# This value depends on the property you want to interpret.

num_rationales_to_keep = 1  # number of rationales to keep for each molecule

In [15]:
# Define the scoring function. "Score" for a substructure is the predicted property value of the substructure.

models = [mpnn]

property_for_interpretation = "is_toxic"

property_id = (
    df_test.columns.get_loc(property_for_interpretation) - 1
)  # property index in the dataset; -1 for the SMILES column


def scoring_function(smiles: list[str]) -> list[float]:
    return make_prediction(
        models=models,
        trainer=trainer,
        smiles=smiles,
    )[:, property_id]

In [16]:
# only use the first 5 SMILES for demonstration purposes
all_smiles = df_test[smiles_column].tolist()
all_smiles

['C#Cc1cccc(N=c2[nH]cnc3cc(OCCOC)c(OCCOC)cc23)c1',
 'CC1OC1P(=O)(O)O',
 'CC(C)CC(N(C)C)C1(c2ccc(Cl)cc2)CCC1',
 'COCCCOc1cc(CC(CC(N)C(O)CC(C(=O)NCC(C)(C)C(N)=O)C(C)C)C(C)C)ccc1OC',
 'CNC1CCC(c2ccc(Cl)c(Cl)c2)c2ccccc21',
 'NCC1(CC(=O)O)CCCCC1',
 'Cc1ccc(-n2[nH]c(C)c(N=Nc3cccc(-c4cccc(C(=O)O)c4)c3O)c2=O)cc1C',
 'CC(=O)Oc1cccc2c1C(=O)c1c(OC(C)=O)cc(C(=O)O)cc1C2=O',
 'NS(=O)(=O)Cc1noc2ccccc12',
 'CC(C)(O)c1ccccc1CCC(SCC1(CC(=O)O)CC1)c1cccc(C=Cc2ccc3ccc(Cl)cc3n2)c1',
 'Cc1oncc1C(=O)Nc1ccc(C(F)(F)F)cc1',
 'COc1cccc2c1C1CN(CCCCn3c(=O)[nH]c4c(sc5ncc(-c6ccccc6)nc54)c3=O)CC1CO2',
 'CN1C(=O)CCS(=O)(=O)C1c1ccc(Cl)cc1',
 'CCc1[nH]c(=N)[nH]c(=N)c1-c1ccc(Cl)cc1',
 'CN(C)CCCC1(c2ccc(F)cc2)OCc2cc(C#N)ccc21',
 'Clc1ccccc1C(c1ccccc1)(c1ccccc1)n1ccnc1',
 'NC(O)=NO',
 'CNC(CC(C)C)C(=O)NC1C(=O)NC(CC(N)=O)C(=O)NC2C(=O)Nc3c(O)[nH]c(c(O)[nH]c(=C(O)O)c4cc(O)cc(O)c4c4cc3ccc4O)C(O)c3ccc(c(Cl)c3)Oc3cc2cc(c3OC2OC(CO)C(O)C(O)C2OC2CC(C)(N)C(O)C(C)O2)Oc2ccc(cc2Cl)C1O',
 'CN(CCCCCCCCCCN(C)C(=O)Oc1ccccc1[N+](C)(C)C)C(=O)

In [17]:
results_df = {"smiles": [], property_for_interpretation: []}

for i in range(num_rationales_to_keep):
    results_df[f"rationale_{i}"] = []
    results_df[f"rationale_{i}_score"] = []
    results_df[f"rationale_{i}_atoms"] = []

for smiles in all_smiles:
    score = scoring_function([smiles])[0]
    if score > prop_delta:
        rationales = mcts(
            smiles=smiles,
            scoring_function=scoring_function,
            n_rollout=rollout,
            max_atoms=max_atoms,
            prop_delta=prop_delta,
            min_atoms=min_atoms,
            c_puct=c_puct,
        )
    else:
        rationales = []

    results_df["smiles"].append(smiles)
    results_df[property_for_interpretation].append(score)

    if len(rationales) == 0:
        for i in range(num_rationales_to_keep):
            results_df[f"rationale_{i}"].append(None)
            results_df[f"rationale_{i}_score"].append(None)
            results_df[f"rationale_{i}_atoms"].append(None)
    else:
        min_size = min(len(x.atoms) for x in rationales)
        min_rationales = [x for x in rationales if len(x.atoms) == min_size]
        rats = sorted(min_rationales, key=lambda x: x.P, reverse=True)

        for i in range(num_rationales_to_keep):
            if i < len(rats):
                results_df[f"rationale_{i}"].append(rats[i].smiles)
                results_df[f"rationale_{i}_score"].append(rats[i].P)
                results_df[f"rationale_{i}_atoms"].append(list(rats[i].atoms))
            else:
                results_df[f"rationale_{i}"].append(None)
                results_df[f"rationale_{i}_score"].append(None)
                results_df[f"rationale_{i}_atoms"].append(None)

LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
/home/jackson/miniconda3/envs/chemprop_v2_live/lib/python3.11/site-packages/lightning/pytorch/trainer/connectors/data_connector.py:425: The 'predict_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=15` in the `DataLoader` to improve performance.
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
LOCAL_RANK: 0 - CUDA_VISIBLE_D

In [18]:
results_df = pd.DataFrame(results_df)
results_df["is_toxic_proba"] = results_df["is_toxic"]
results_df["is_toxic"] = results_df["is_toxic"].gt(0.5).astype(int)

In [19]:
results_df

Unnamed: 0,smiles,is_toxic,rationale_0,rationale_0_score,rationale_0_atoms,is_toxic_proba
0,C#Cc1cccc(N=c2[nH]cnc3cc(OCCOC)c(OCCOC)cc23)c1,1,c1cc(N=[CH2:1])cc([CH3:1])c1,0.550236,"[1, 2, 3, 4, 5, 6, 7, 8, 28]",0.588413
1,CC1OC1P(=O)(O)O,1,,,,0.563770
2,CC(C)CC(N(C)C)C1(c2ccc(Cl)cc2)CCC1,1,Clc1ccc([CH:1]2CCC2)cc1,0.544160,"[8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18]",0.556251
3,COCCCOc1cc(CC(CC(N)C(O)CC(C(=O)NCC(C)(C)C(N)=O...,1,c1cc([OH:1])[cH:1]cc1C[CH3:1],0.601331,"[34, 35, 36, 37, 6, 7, 8, 9, 10]",0.558771
4,CNC1CCC(c2ccc(Cl)c(Cl)c2)c2ccccc21,1,Clc1ccc([CH3:1])cc1Cl,0.623248,"[5, 6, 7, 8, 9, 10, 11, 12, 13]",0.739185
...,...,...,...,...,...,...
218,ClC1C(Cl)C(Cl)C(Cl)C(Cl)C1Cl,1,ClC1[CH2:1]C(Cl)[CH2:1]C(Cl)[CH2:1]1,0.710011,"[1, 2, 3, 4, 6, 7, 8, 10, 11]",0.777693
219,Oc1c(C2=CCC(c3ccc(Cl)cc3)CC2)c(O)c2ccccc2c1O,1,c1ccc2c(c1)[cH:1][cH:1][cH:1][cH:1]2,0.668836,"[1, 2, 16, 18, 19, 20, 21, 22, 23, 24]",0.667687
220,C=Cc1ccc(S(=O)(=O)O)cc1,0,,,,0.348093
221,OCC(O)c1oc(O)c(O)c1O,0,,,,0.477462


In [20]:
visual_df = pd.merge(results_df, df_test, on="smiles", suffixes=('_pred', '_true'))
visual_df

Unnamed: 0,smiles,is_toxic_pred,rationale_0,rationale_0_score,rationale_0_atoms,is_toxic_proba,is_toxic_true
0,C#Cc1cccc(N=c2[nH]cnc3cc(OCCOC)c(OCCOC)cc23)c1,1,c1cc(N=[CH2:1])cc([CH3:1])c1,0.550236,"[1, 2, 3, 4, 5, 6, 7, 8, 28]",0.588413,1
1,CC1OC1P(=O)(O)O,1,,,,0.563770,1
2,CC(C)CC(N(C)C)C1(c2ccc(Cl)cc2)CCC1,1,Clc1ccc([CH:1]2CCC2)cc1,0.544160,"[8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18]",0.556251,1
3,COCCCOc1cc(CC(CC(N)C(O)CC(C(=O)NCC(C)(C)C(N)=O...,1,c1cc([OH:1])[cH:1]cc1C[CH3:1],0.601331,"[34, 35, 36, 37, 6, 7, 8, 9, 10]",0.558771,1
4,CNC1CCC(c2ccc(Cl)c(Cl)c2)c2ccccc21,1,Clc1ccc([CH3:1])cc1Cl,0.623248,"[5, 6, 7, 8, 9, 10, 11, 12, 13]",0.739185,1
...,...,...,...,...,...,...,...
218,ClC1C(Cl)C(Cl)C(Cl)C(Cl)C1Cl,1,ClC1[CH2:1]C(Cl)[CH2:1]C(Cl)[CH2:1]1,0.710011,"[1, 2, 3, 4, 6, 7, 8, 10, 11]",0.777693,0
219,Oc1c(C2=CCC(c3ccc(Cl)cc3)CC2)c(O)c2ccccc2c1O,1,c1ccc2c(c1)[cH:1][cH:1][cH:1][cH:1]2,0.668836,"[1, 2, 16, 18, 19, 20, 21, 22, 23, 24]",0.667687,0
220,C=Cc1ccc(S(=O)(=O)O)cc1,0,,,,0.348093,0
221,OCC(O)c1oc(O)c(O)c1O,0,,,,0.477462,0


In [21]:
# if no rationale could be devised, drop the row
visual_df = visual_df[visual_df["rationale_0"].map(lambda elt: elt is not None)]
# drop rows where the predictions were incorrect
visual_df = visual_df[(visual_df["is_toxic_pred"] - visual_df["is_toxic_true"]).eq(0)]

In [22]:
import pandas as pd
from rdkit import Chem
from rdkit.Chem import Draw
from rdkit.Chem.Draw import rdMolDraw2D
from IPython.display import display, HTML
from io import BytesIO
import base64
from PIL import Image

def highlight_substructure(smiles: str, selected_atoms: list[int], score: float):
    """
    Generates an RDKit image of a molecule with a specified substructure highlighted
    based on a list of atom indices. Both atoms and bonds of the substructure will be highlighted.

    Args:
        smiles (str): The SMILES string of the molecule.
        selected_atoms (list[int]): A list of atom indices in the molecule to highlight.
        score (float): The score associated with the substructure.
                       If score < 0.5, highlights in green.
                       If score >= 0.5, highlights in red.

    Returns:
        PIL.Image.Image: A PIL Image object of the highlighted molecule,
                         or None if SMILES is invalid or no atoms are selected.
    """
    mol = Chem.MolFromSmiles(smiles)
    if mol is None:
        print(f"Warning: Could not parse SMILES: {smiles}")
        return None

    if not selected_atoms: # No atoms to highlight
        print(f"No selected atoms for SMILES: {smiles}")
        # Optionally, you can return an unhighlighted image or None
        # For now, returning None as per previous error handling
        return None

    highlight_atoms = list(selected_atoms) # Ensure it's a list for RDKit
    highlight_bonds = []

    # Identify bonds where both atoms are in the selected_atoms list
    for bond in mol.GetBonds():
        begin_atom_idx = bond.GetBeginAtomIdx()
        end_atom_idx = bond.GetEndAtomIdx()

        if begin_atom_idx in highlight_atoms and end_atom_idx in highlight_atoms:
            highlight_bonds.append(bond.GetIdx())

    # Determine highlight color based on score
    highlight_color = (0.0, 1.0, 0.0) if score < 0.5 else (1.0, 0.0, 0.0) # Green or Red

    # Create a drawer object
    drawer = rdMolDraw2D.MolDraw2DCairo(300, 200)

    # Draw the molecule with highlights for both atoms and bonds
    drawer.DrawMolecule(
        mol,
        highlightAtoms=highlight_atoms,
        highlightAtomColors={atom_idx: highlight_color for atom_idx in highlight_atoms},
        highlightBonds=highlight_bonds,
        highlightBondColors={bond_idx: highlight_color for bond_idx in highlight_bonds}
    )
    drawer.FinishDrawing()

    # Convert the drawing to a PIL Image
    img_bytes = drawer.GetDrawingText()
    img = Image.open(BytesIO(img_bytes))
    return img

# Iterate through the DataFrame and display visualizations
# Using visual_df as per your notebook's last cell output
for index, row in visual_df.iterrows():
    smiles = row['smiles']
    # Get the rationale_0_atoms directly
    rationale_atoms = row['rationale_0_atoms']
    score = row['rationale_0_score']
    
    # Only try to highlight if rationale_atoms is not None and not empty
    if rationale_atoms is not None and len(rationale_atoms) > 0:
        img = highlight_substructure(smiles, rationale_atoms, score)
    else:
        img = None # No valid substructure to highlight

    if img:
        # Convert PIL Image to base64 for embedding in HTML
        buffered = BytesIO()
        img.save(buffered, format="PNG")
        img_str = base64.b64encode(buffered.getvalue()).decode()

        # Display using IPython.display.HTML for better formatting in notebooks
        display(HTML(f"""
            <div style="display: flex; align-items: center; margin-bottom: 20px; border: 1px solid #eee; padding: 10px; border-radius: 8px; background-color: #f9f9f9;">
                <img src="data:image/png;base64,{img_str}" style="margin-right: 15px; border-radius: 4px;"/>
                <div style="font-family: 'Inter', sans-serif;">
                    <p style="margin: 0; font-size: 14px;"><strong>SMILES:</strong> {smiles}</p>
                    <p style="margin: 5px 0 0; font-size: 14px;"><strong>Rationale Atoms:</strong> {rationale_atoms}</p>
                    <p style="margin: 5px 0 0; font-size: 14px;"><strong>Score:</strong> {score:.2f}
                        <span style="color: {'green' if score < 0.5 else 'red'}; font-weight: bold;">
                            ({('Green' if score < 0.5 else 'Red')} Highlight)
                        </span>
                    </p>
                </div>
            </div>
        """))
    else:
        # Display an error message if no image could be generated or no rationale atoms were found
        display(HTML(f"""
            <div style="margin-bottom: 20px; border: 1px solid #fdd; padding: 10px; border-radius: 8px; background-color: #ffe;">
                <p style="margin: 0; font-size: 14px; color: #d00;"><strong>Could not visualize:</strong> SMILES: {smiles}, Rationale Atoms: {rationale_atoms}. This might be due to invalid SMILES, or no rationale found/selected.</p>
            </div>
        """))
