# Creating a lazy-loading reaction.

In [121]:
import rdkit
from dgym.molecule import Molecule
from typing import Optional, List, Union, Any
import itertools
from rdkit.Chem.rdChemReactions import ChemicalReaction
from rdkit import Chem
import random
import dgym as dg

In [122]:
# Load the reaction library from JSON. `smarts_col` and `classes_col` name
# the fields passed to the loader ('reaction_string', 'functional_groups').
reactions = dg.ReactionCollection.from_json(
    path='../../dgym-data/All_Rxns_rxn_library.json',
    smarts_col='reaction_string',
    classes_col='functional_groups',
)

# Load the molecule collection from the annotated SDF file, passing the three
# reactant field names via `reactant_names`.
deck = dg.MoleculeCollection.load(
    '../../dgym-data/DSi-Poised_Library_annotated.sdf',
    reactant_names=['reagsmi1', 'reagsmi2', 'reagsmi3'],
)

In [306]:
import inspect

class LazyReaction:
    """
    A reaction template whose products are produced lazily.

    Wraps an RDKit ``ChemicalReaction``. ``run`` is a generator that yields
    product ``Molecule`` objects one at a time, and it accepts generators as
    reagents, so the lazy output of one reaction can be fed into the next.
    """

    def __init__(
        self,
        template: Union[str, "ChemicalReaction"],
        metadata: Optional[dict] = None,
        id: Optional[str] = None
    ) -> None:
        """
        Parameters
        ----------
        template : str or rdkit.Chem.rdChemReactions.ChemicalReaction
            An rdkit reaction template, or a reaction SMARTS string that is
            converted to one.
        metadata : dict, optional
            Stored unchanged on ``self.metadata``.
        id : str, optional
            Stored unchanged on ``self.id``.
        """
        if isinstance(template, str):
            # Import the function explicitly instead of reaching through
            # `rdkit.Chem.AllChem`: that attribute only exists if some other
            # module has already imported the AllChem submodule.
            from rdkit.Chem.rdChemReactions import ReactionFromSmarts
            template = ReactionFromSmarts(template)

        self.id = id
        self.template = template
        self.products = list(template.GetProducts())
        self.agents = list(template.GetAgents())
        self.reactants = list(template.GetReactants())
        self.metadata = metadata

    def run(self, reagents, sanitize=True):
        """
        Lazily yield the products of reacting ``reagents``.

        Parameters
        ----------
        reagents : sequence
            One entry per reactant slot. Each entry is either a single
            reagent or a generator of reagents.
        sanitize : bool
            Forwarded to ``run_single_step``.

        Yields
        ------
        Molecule
            Products, one at a time.
        """
        # If any of the reagents are generators
        if any(inspect.isgenerator(r) for r in reagents):

            # Convert ordinary reagents to infinite generators so they can be
            # zipped against the generator reagents.
            sequences = [
                x if inspect.isgenerator(x) else itertools.repeat(x)
                for x in reagents
            ]

            # Run reactants lazily; zip stops when the first generator
            # reagent is exhausted.
            for combination in zip(*sequences):
                yield from self.run_single_step(combination, sanitize)
        else:
            yield from self.run_single_step(reagents, sanitize)

    def run_single_step(self, reagents, sanitize=True):
        """
        Run the template once on one concrete combination of reagents.

        Parameters
        ----------
        reagents : iterable
            ``Molecule`` objects (their ``.mol`` is used) or objects passed
            to RDKit as-is.
        sanitize : bool
            Forwarded to ``parse_output``.

        Returns
        -------
        generator of Molecule
        """
        reagents = list(reagents)
        mols = [r.mol if isinstance(r, Molecule) else r for r in reagents]
        output = self.template.RunReactants(mols)
        # Hand the original reagent objects (not the unwrapped mols) to the
        # products, so a reagent that is itself a Molecule keeps its own
        # `reaction` / `reactants` attributes.
        return self.parse_output(output, reagents, sanitize=sanitize)

    def parse_output(self, output, reactants, sanitize=True):
        """
        Turn raw ``RunReactants`` output into a stream of ``Molecule`` objects.

        Parameters
        ----------
        output : iterable
            Nested product tuples as returned by ``RunReactants``.
        reactants : list
            Recorded on every yielded ``Molecule`` as its ``reactants``.
        sanitize : bool
            If True, round-trip each product through ``self.sanitize`` and
            drop products that fail; if False, yield products unchanged.

        Yields
        ------
        Molecule
        """
        for product in self.flatten_and_randomize(output):
            if sanitize:
                product = self.sanitize(product)
                # MolFromSmiles returns None when the SMILES cannot be parsed.
                if product is None:
                    continue
            yield Molecule(product, reaction=self, reactants=reactants)

    @staticmethod
    def flatten_and_randomize(nested_tuples, randomize=True):
        """
        Flatten one level of tuples and yield the items.

        Parameters
        ----------
        nested_tuples : iterable
            Items that are tuples are expanded; anything else is kept as-is.
        randomize : bool
            If True, the flattened items are shuffled before being yielded;
            if False, the original order is kept.

        Yields
        ------
        object
            Each flattened item.
        """
        flattened_items = []
        for item in nested_tuples:
            if isinstance(item, tuple):
                flattened_items.extend(item)
            else:
                flattened_items.append(item)

        if randomize:
            random.shuffle(flattened_items)
        yield from flattened_items

    def sanitize(self, mol):
        """
        Round-trip ``mol`` through SMILES.

        Returns the result of ``Chem.MolFromSmiles`` on the SMILES written
        for ``mol`` (``None`` if that SMILES cannot be parsed).
        """
        smiles = Chem.MolToSmiles(mol)
        product = Chem.MolFromSmiles(smiles)
        return product

    def _repr_png_(self):
        """Return the PNG representation of the underlying template."""
        return self.template._repr_png_()

In [307]:
# A two-step molecule written out explicitly: the second reactant is itself
# a Molecule carrying its own reaction and reactants.
multi_step_molecule = Molecule(
    'O=C(c1ccnnc1)N(c1ccc2ncccc2c1)c1cnn(-c2c(Cl)cc(C(F)(C(F)(F)F)C(F)(F)F)cc2Cl)c1',
    reaction=reactions['18_Halide_and_Amine'][0],
    reactants=[
        Molecule('FC(F)(F)C(F)(c1cc(Cl)c(-n2cc(I)cn2)c(Cl)c1)C(F)(F)F'),
        Molecule(
            'O=C(Nc1ccc2ncccc2c1)c1ccnnc1',
            reaction=reactions['13_Carboxylate_and_Amine'][0],
            reactants=[
                Molecule('O=C(O)c1ccnnc1'),
                Molecule('Nc1ccc2ncccc2c1'),
            ],
        ),
    ],
)

In [308]:
# Single-step molecule: an amide with its reaction and both reactants.
molecule = Molecule(
    'O=C(Nc1ccc2ncccc2c1)c1ccnnc1',
    reaction=reactions['13_Carboxylate_and_Amine'][0],
    reactants=[
        Molecule('O=C(O)c1ccnnc1'),
        Molecule('Nc1ccc2ncccc2c1'),
    ],
)

# Wrap the same reaction's template in a LazyReaction.
r = LazyReaction(reactions['13_Carboxylate_and_Amine'][0].template)

# `run` is a generator function, so `product` is a generator here:
# no reaction has been executed yet.
product = r.run(molecule.reactants)

In [309]:
%%time
# Same two-step molecule as before, but the second reactant is the lazy
# `product` generator from the previous cell rather than a concrete Molecule.
multi_step_molecule = Molecule(
    'O=C(c1ccnnc1)N(c1ccc2ncccc2c1)c1cnn(-c2c(Cl)cc(C(F)(C(F)(F)F)C(F)(F)F)cc2Cl)c1',
    reaction=reactions['18_Halide_and_Amine'][0],
    reactants=[
        Molecule('FC(F)(F)C(F)(c1cc(Cl)c(-n2cc(I)cn2)c(Cl)c1)C(F)(F)F'),
        product,
    ],
)

# Run the second step lazily; wrapping the generator in list() is what
# actually drives both reactions.
r = LazyReaction(reactions['18_Halide_and_Amine'][0].template)
output = r.run(multi_step_molecule.reactants)
result = list(output)

CPU times: user 9.13 ms, sys: 0 ns, total: 9.13 ms
Wall time: 9.66 ms


In [326]:
# Display the reactants recorded on the third product in `result`.
result[2].reactants

[<dgym.molecule.Molecule at 0x7f749af98150>,
 <dgym.molecule.Molecule at 0x7f749b502f50>]