In [19]:
import os
import wget
import pathlib
# Remember the notebook's own directory so the working directory can be
# restored after the project imports below.
base_path = pathlib.Path(".").absolute()
print(base_path)

from rdkit import Chem

# The `src` package is imported with the parent directory as the working
# directory. The try/finally guarantees the kernel is moved back to
# `base_path` even when one of the imports raises; otherwise every later
# relative path in the notebook would silently resolve one level too high.
os.chdir("..")
try:
    from src.abstract import *
    from src.interfaces.rdkit import *
    from src.interfaces.durrantlab import *
    from src.interfaces.openmm import *
    from src.interfaces.gnina import *
    from src.interfaces.openmm import *
    from src.interfaces.generic import *
    from src.interfaces.cuby4 import *
    from src.interfaces.cuby4c import *
    from src.config import *
    from src.utils import baselines, pdbtools
finally:
    os.chdir(base_path)

# Keyword arguments common to both MOPAC configurations below; the two
# configurations differ only in their `solvent` setting.
_SQM_SCORE_MOPAC_SHARED = dict(
    method="pm6",
    mopac_exe="auto",
    mozyme=True,
    corrections="d3h4x",
)

# Configuration with solvent="cosmo2" (presumably an implicit-solvent model,
# judging by the SOLV name — confirm against the Cuby4 interface).
SQM_SCORE_MOPAC_SOLV_CONFIG = Cuby4MOPACFullConfig(
    **_SQM_SCORE_MOPAC_SHARED, solvent="cosmo2"
)

# Same settings with solvent="none" (presumably gas phase, judging by the
# GAS name — confirm against the Cuby4 interface).
SQM_SCORE_MOPAC_GAS_CONFIG = Cuby4MOPACFullConfig(
    **_SQM_SCORE_MOPAC_SHARED, solvent="none"
)

class SQMBaselineScorePipe(Pipeline):
    """Sub-pipeline that computes one baseline energy per input ligand.

    Each input ligand is passed through tautomer, protomer and ring
    enumeration, optimized with OpenMM and then with Cuby4 (using
    ``SQM_SCORE_MOPAC_GAS_CONFIG``), and finally scored with Cuby4 (using
    ``SQM_SCORE_MOPAC_SOLV_CONFIG``). The energies of all forms derived from
    the same input ligand are combined with ``baselines.average_energies``.

    Args:
        return_input: When True, ``run`` returns the original ligands and
            targets (with a ``baseline_energy`` property set on each ligand)
            instead of the enumerated ligands.
    """

    name = "SQM Ligand Baseline Scoring"

    def __init__(self, return_input: bool = False):
        # The base pipeline is always told not to return its input; the
        # caller's preference is kept in `rtin` and handled in `run`.
        super().__init__(return_input=False, out_dir=OUT_DIR)
        self.rtin = return_input
        interface1 = Cuby4Interface(intconfig=SQM_SCORE_MOPAC_SOLV_CONFIG, cuby4_path="auto", 
                                    work_dir=TMP_DIR, debug=DEBUG_MODE)
        interface2 = Cuby4Interface(intconfig=SQM_SCORE_MOPAC_GAS_CONFIG, cuby4_path="auto", 
                                    work_dir=TMP_DIR, debug=DEBUG_MODE)
        self.blocks = [ # TODO: Other enumerations...
            RDKitTautEmumerator(debug=DEBUG_MODE, max_tautomers=100),
            DimorphiteProtoEnumerator(debug=DEBUG_MODE, min_ph=4.4, max_ph=10.4),
            RDKitRingEnumerator(debug=DEBUG_MODE, minimize=True, num_confs=50, max_per_ring=2, dist_threshold=0.5),
            OpenMMLigandOptimizer(debug=DEBUG_MODE, forcefields=["amber14-all.xml"], work_dir=TMP_DIR),
            Cuby4LigandOptimizer(debug=DEBUG_MODE, interface=interface2, n_threads=N_AVAILABLE_THREADS, work_dir=TMP_DIR),
            Cuby4LigandEnergyScorer(debug=DEBUG_MODE, interface=interface1, n_threads=N_AVAILABLE_THREADS, work_dir=TMP_DIR),
        ]

    def run(self, ligands: List[Chem.Mol], targets: List[str], extra_info: dict = ...) -> Tuple[List[Chem.Mol], List[str]]:
        """Run the enumeration/scoring blocks and aggregate one score per input ligand.

        Args:
            ligands: Input molecules. Each one is tagged in place with an
                integer ``SpecialIdx`` property holding its position in this list.
            targets: Target entries, forwarded unchanged to the base pipeline.
            extra_info: Optional mapping forwarded to the base pipeline. When a
                mapping is supplied, the list of scores is stored in it under
                ``"baseline_energy"``.

        Returns:
            ``(orig_ligs, orig_targs)`` when the pipe was built with
            ``return_input=True``; otherwise the ligands and targets produced
            by the base pipeline run.

        Raises:
            AssertionError: If any input ligand has no scored form in the output.
        """
        n_primary_ligs = len(ligands)
        orig_ligs = ligands
        orig_targs = targets

        # Tag every input so the forms derived from it can be traced back.
        for idx, ligand in enumerate(ligands):
            ligand.SetIntProp("SpecialIdx", idx)

        ligands, targets = super().run(ligands, targets, extra_info)

        # Collect corrected energies per originating ligand. Grouping by index
        # (rather than by runs of consecutive equal indices) keeps the result
        # correct even if the blocks return the forms in a different order.
        energies_by_parent = {}
        for ligand in ligands:
            parent_idx = ligand.GetIntProp("SpecialIdx")
            fcharge = Chem.GetFormalCharge(ligand)
            energy = ligand.GetDoubleProp(Cuby4LigandEnergyScorer.score_name)
            htrans = baselines.calc_hydrogen_transfer_energy(fcharge)
            energies_by_parent.setdefault(parent_idx, []).append(energy + htrans)

        missing = [idx for idx in range(n_primary_ligs) if idx not in energies_by_parent]
        assert not missing, f"no scored forms were produced for input ligand(s) {missing}"

        final_scores = [
            baselines.average_energies(energies_by_parent[idx])
            for idx in range(n_primary_ligs)
        ]
        assert n_primary_ligs == len(final_scores)

        # The default for `extra_info` is the Ellipsis placeholder, which
        # cannot be written to; only record the scores when a real mapping
        # was supplied by the caller.
        if extra_info is not ... and extra_info is not None:
            extra_info["baseline_energy"] = final_scores

        if self.rtin:
            for i, lig in enumerate(orig_ligs):
                lig.SetDoubleProp("baseline_energy", final_scores[i])
            return orig_ligs, orig_targs
        else:
            return ligands, targets

/home/arazthexd/projects/002_sqm/test


In [3]:
# Fetch the 4BCK structure once; the download is skipped when the file is
# already present next to the notebook.
pdb_path = os.path.join(base_path, "4bck.pdb")
if not os.path.exists(pdb_path):
    # Use HTTPS so the downloaded structure cannot be altered in transit.
    wget.download("https://files.rcsb.org/download/4BCK.pdb", out=pdb_path)

In [20]:
class TestPipe(Pipeline):
    """Scratch pipeline whose only active block is ``SQMBaselineScorePipe``.

    A Cuby4 interface built from ``SQM_SCORE_MOPAC_SOLV_CONFIG`` is kept on
    ``self.interface``; it is only referenced by the blocks that are currently
    commented out.
    """

    name = "Test Pipeline"

    def __init__(self):
        super().__init__()
        cuby4 = Cuby4Interface(
            intconfig=SQM_SCORE_MOPAC_SOLV_CONFIG,
            cuby4_path="auto",
            work_dir=TMP_DIR,
            debug=DEBUG_MODE,
        )
        self.interface = cuby4

        # Active block: baseline scoring, configured to hand back its input.
        baseline_pipe = SQMBaselineScorePipe(return_input=True)
        self.blocks = [
            # Disabled blocks, kept for reference:
            # PDBFixerProteinPrepper(debug=DEBUG_MODE, ph=7.4, chains="all", keep_water=False, 
            #                        add_missing_residues=False, out_dir=TMP_DIR, save_prefix="prepared"),
            # PocketIsolator(debug=DEBUG_MODE, radius=10.0, work_dir=TMP_DIR),
            # Cuby4ComplexInteractScorer(debug=DEBUG_MODE, interface=self.interface, n_threads = N_AVAILABLE_THREADS, work_dir=TMP_DIR),
            baseline_pipe,
            # Cuby4LigandEnergyScorer(score_name="hahahatest", debug=DEBUG_MODE, interface=self.interface, n_threads=N_AVAILABLE_THREADS, work_dir=TMP_DIR),
            # Cuby4LigandHTransScorer(debug=DEBUG_MODE, interface=self.interface, n_threads=N_AVAILABLE_THREADS, work_dir=TMP_DIR)
        ]

In [21]:
# Resolve the inputs relative to the notebook directory (`base_path`) instead
# of a hard-coded home directory, so the notebook runs on other machines.
prot = str(base_path / "4bck_prot.pdb")
lig_path = base_path / "4bck_lig.mol2"
lig = Chem.MolFromMol2File(str(lig_path), removeHs=False)
# RDKit signals a parse failure by returning None; fail here with a clear
# message rather than later, inside the pipeline.
if lig is None:
    raise ValueError(f"RDKit could not parse ligand file: {lig_path}")

In [22]:
# Build the test pipeline and run it on the single ligand / single target
# loaded above; the returned value is kept in `output` for inspection.
# NOTE(review): the recorded output of this cell ends with Dimorphite
# rejecting the kernel's `--f=...` argument, which suggests it parses the
# process command line — confirm and fix in the Dimorphite wrapper.
pipeline = TestPipe()
output = pipeline.run([lig], [prot])


>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>              STARTED: Test Pipeline               >>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>

>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>       STARTED: SQM Ligand Baseline Scoring        >>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
target len 1

[Running Pipe Block: RDKIT Tautomer Enumerator]


100%|██████████| 1/1 [00:00<00:00,  1.78it/s]


target len 1

[Running Pipe Block: DimorphiteDL Protomer Enumerator]


  if line is not "":
  if line is not "":
  if line is not "":
  if line is not "":
  if line is not "":
  if line is not "":
  if line is not "":



usage: ipykernel_launcher.py [-h] [--min_ph MIN] [--max_ph MAX]
                             [--pka_precision PRE] [--smiles SMI]
                             [--smiles_file FILE] [--output_file FILE]
                             [--max_variants MXV] [--label_states] [--silent]
                             [--test]

Dimorphite 1.2.4: Creates models of appropriately protonated small moleucles.
Apache 2.0 License. Copyright 2020 Jacob D. Durrant.

options:
  -h, --help           show this help message and exit
  --min_ph MIN         minimum pH to consider (default: 6.4)
  --max_ph MAX         maximum pH to consider (default: 8.4)
  --pka_precision PRE  pKa precision factor (number of standard devations,
                       default: 1.0)
  --smiles SMI         SMILES string to protonate
  --smiles_file FILE   file that contains SMILES strings to protonate
  --output_file FILE   output file to write protonated SMILES (optional)
  --max_variants MXV   limit number of variants per input 

Exception: ERROR: unrecognized arguments: --f=/home/arazthexd/.local/share/jupyter/runtime/kernel-v2-623rxqxR4pnsuRw.json



In [15]:
# Display the value returned by the pipeline run.
# NOTE(review): this cell's execution count (15) is lower than that of the
# cell that currently assigns `output` (22), so the recorded result below
# presumably comes from an earlier run — re-run to refresh.
output

([<rdkit.Chem.rdchem.Mol at 0x7f2ce15b8190>],
 ['./tmp/isolated_pocket_1EH36.pdb'])

In [23]:
from paradag import DAG
from collections import OrderedDict

In [37]:
class Vtx:
    """Toy vertex that carries a single payload value."""

    def __init__(self, v):
        # The payload lives under the attribute name `__value__`, which is
        # what `CustomExecutor.param` reads back.
        setattr(self, "__value__", v)

# Two standalone vertices with distinct payloads.
vtx = Vtx(999)
vtx2 = Vtx(20)

dag = DAG()

# Register both vertices; no edges are added in this cell.
dag.add_vertex(vtx, vtx2)

In [39]:
from paradag import dag_run
from paradag import SequentialProcessor

class CustomExecutor:
    """Executor object handed to ``dag_run``.

    ``param`` extracts the payload from a vertex and ``execute`` prints the
    value it is given.
    """

    def param(self, vertex):
        """Return the payload stored on ``vertex`` under ``__value__``."""
        payload = getattr(vertex, "__value__")
        return payload

    def execute(self, param):
        """Print ``param`` prefixed with ``Executing:``; returns nothing."""
        print('Executing:', param)

# Run the DAG with a sequential processor and print what `dag_run` returns
# (in the recorded output below, a list of the two Vtx objects).
print(dag_run(dag, processor=SequentialProcessor(), executor=CustomExecutor()))

Executing: 20
Executing: 999
[<__main__.Vtx object at 0x7f2ce0cc7010>, <__main__.Vtx object at 0x7f2ce0cc7af0>]


In [47]:
# Imported here so this cell does not depend on names leaked from other cells.
from abc import abstractmethod
from typing import Dict, List


class _Connection():
    """Directed link from one block's output key to another block's input key.

    All arguments are optional so that the argument-less construction that an
    earlier placeholder version allowed keeps working.
    """

    def __init__(self, source_block: "PipelineBlock" = None, source_key: str = None,
                 target_block: "PipelineBlock" = None, target_key: str = None) -> None:
        self.source_block = source_block
        self.source_key = source_key
        self.target_block = target_block
        self.target_key = target_key


class PipelineBlock():
    """First draft of a pipeline node with named inputs and outputs.

    A block tracks which upstream connection feeds each of its input keys
    (``_in_connections``) and which downstream connections read each of its
    output keys (``_out_connections``).
    """

    def __init__(self, debug: bool = False) -> None:
        self.debug = debug
        # NOTE(review): nothing in this class sets `_executed` to True.
        self._executed = False
        self._output = dict()
        self._in_connections: Dict[str, _Connection] = dict()
        self._out_connections: Dict[str, List[_Connection]] = dict()

    def reset(self, removeOutput: bool = True, removeConnections: bool = True) -> None:
        """Clear the stored output and/or the registered connections."""
        if removeOutput:
            # `executed` is a read-only property, so the backing field must be
            # written directly; assigning to the property raises AttributeError.
            self._executed = False
            self._output = dict()
        if removeConnections:
            self._in_connections: Dict[str, _Connection] = dict()
            self._out_connections: Dict[str, List[_Connection]] = dict()

    def _add_connection(self, connection):
        """Record ``connection`` as outgoing when this block is its source.

        Objects that do not name this block as their source are ignored.
        """
        if getattr(connection, "source_block", None) is self:
            self._out_connections.setdefault(connection.source_key, []).append(connection)

    def link_input(self, input_key: str, prev_block: "PipelineBlock", prev_out_key: str):
        """Feed ``input_key`` of this block from ``prev_out_key`` of ``prev_block``.

        Raises:
            AssertionError: If ``input_key`` is not an input of this block, is
                already linked, or ``prev_out_key`` is not an output of
                ``prev_block``.
        """
        assert input_key in self.required_input_keys + self.optional_input_keys
        assert input_key not in self._in_connections.keys()
        assert prev_out_key in prev_block.output_keys
        connection = _Connection(prev_block, prev_out_key, self, input_key)
        self._in_connections[input_key] = connection
        prev_block._add_connection(connection)

    # @abstractmethod
    def execute(self, input_dict: dict):
        """Default behavior: store ``input_dict`` as the output and return it.

        The ``output`` setter requires the keys of ``input_dict`` to match
        ``output_keys`` exactly.
        """
        self.output = input_dict
        return self.output

    def check(self, inputs: dict):
        """Assert that all required inputs are present.

        Unexpected keys only produce a printed warning, and only when
        ``debug`` is enabled.
        """
        assert all(k in inputs.keys() for k in self.required_input_keys)
        for k in inputs.keys():
            if k not in self.required_input_keys + self.optional_input_keys:
                if self.debug: print(f"WARNING: {k} not in input keys of pipeline block: {self.name}")

    @property
    def output(self) -> dict:
        return self._output

    @output.setter
    def output(self, d: dict):
        assert set(d.keys()) == set(self.output_keys)
        self._output = d

    @property
    def executed(self) -> bool:
        return self._executed

    @property
    def output_keys(self) -> List[str]:
        return ["ligands", "targets"]

    @property
    def required_input_keys(self) -> List[str]:
        return ["ligands", "targets"]

    @property
    def optional_input_keys(self) -> List[str]:
        return []

    @property
    @abstractmethod
    def name(self) -> str:
        # This class does not use ABCMeta, so the abstractmethod marker does
        # not prevent instantiation here.
        return "pipeblock"

    def is_same(self, p):
        """Return the result of comparing this block with ``p`` using ``==``."""
        return self == p

In [48]:
# Scratch instance used by the identity check in a later cell.
a = PipelineBlock()

In [49]:
# Second, independent scratch instance.
b = PipelineBlock()

In [51]:
# A block compared with itself (recorded result below: True).
a.is_same(a)

True

In [54]:
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import List, Tuple, Any, Dict

import os
from itertools import repeat

def zip_complex(ligands, targets):
    """Pair every ligand with a target.

    Args:
        ligands: Sequence of ligands.
        targets: Either a single-element sequence (that one target is paired
            with every ligand) or a sequence with one target per ligand.

    Returns:
        A ``zip`` iterator of ``(ligand, target)`` pairs.

    Raises:
        ValueError: If ``targets`` has neither exactly one element nor the
            same length as ``ligands``.
    """
    if len(targets) == 1:
        # One shared target for all ligands.
        return zip(ligands, repeat(targets[0]))
    if len(targets) == len(ligands):
        return zip(ligands, targets)
    raise ValueError(
        f"cannot pair {len(ligands)} ligand(s) with {len(targets)} target(s): "
        "expected exactly one target or one target per ligand"
    )

class _Connection():
    """Directed link from an output key of one block to an input key of another.

    Creating a connection registers it on both blocks and then checks that the
    graph reachable from the source block contains no cycle. If registration
    or the cycle check fails, whatever was registered is removed again so the
    blocks are left as they were.

    Raises:
        AssertionError: If a key is invalid, the target key is already
            connected, or the connection would create a cycle.
    """

    def __init__(self, source_block: PipelineBlock, source_key: str, 
                 target_block: PipelineBlock, target_key: str) -> None:
        if source_block is target_block:
            raise AssertionError("a block cannot be connected to itself")

        # The attributes must exist before registration, because
        # `_add_connection` reads them to decide which side the block is on.
        self.source_block = source_block
        self.source_key = source_key
        self.target_block = target_block
        self.target_key = target_key

        source_block._add_connection(self)
        try:
            target_block._add_connection(self)
            source_block._cyclic_check_pass([])
        except AssertionError:
            # Undo the partial registration before re-raising.
            source_block._remove_connection(self)
            target_block._remove_connection(self)
            raise

class PipelineBlock(ABC):
    """Abstract pipeline node with named inputs and outputs.

    Subclasses must implement ``execute`` and ``name``. Incoming connections
    are stored per input key (one each); outgoing connections are stored per
    output key (any number each).
    """

    def __init__(self, debug: bool = False) -> None:
        self.debug = debug
        self._executed = False
        self._latest_input = None
        self._output = dict()
        self._in_connections: Dict[str, _Connection] = dict()
        self._out_connections: Dict[str, List[_Connection]] = dict()
    
    def reset(self, removeOutput: bool = True, removeConnections: bool = True) -> None:
        """Clear execution state and/or the registered connections."""
        if removeOutput:
            self._executed = False
            self._latest_input = None
            self._output = dict()
        if removeConnections:
            self._in_connections: Dict[str, _Connection] = dict()
            self._out_connections: Dict[str, List[_Connection]] = dict()
    
    def _add_connection(self, connection: _Connection) -> None:
        """Register ``connection`` on whichever side(s) this block occupies."""
        if connection.source_block is self:
            assert connection.source_key in self.output_keys, \
                f"unknown output key: {connection.source_key!r}"
            self._out_connections.setdefault(connection.source_key, []).append(connection)
        if connection.target_block is self:
            assert connection.target_key in self.required_input_keys + self.optional_input_keys, \
                f"unknown input key: {connection.target_key!r}"
            assert connection.target_key not in self._in_connections.keys(), \
                f"input key already connected: {connection.target_key!r}"
            self._in_connections[connection.target_key] = connection

    def _remove_connection(self, connection: _Connection) -> None:
        """Remove ``connection`` from this block if it is registered here.

        Other connections stored under the same keys are left untouched.
        """
        if connection.source_block is self:
            outgoing = self._out_connections.get(connection.source_key, [])
            if connection in outgoing:
                outgoing.remove(connection)
            if not outgoing:
                self._out_connections.pop(connection.source_key, None)
        if connection.target_block is self:
            if self._in_connections.get(connection.target_key) is connection:
                del self._in_connections[connection.target_key]
    
    def _next_blocks(self) -> List[PipelineBlock]:
        """Return the distinct blocks fed by this block, in registration order."""
        next_blocks = []
        for connections in self._out_connections.values():
            for connection in connections:
                if connection.target_block not in next_blocks:
                    next_blocks.append(connection.target_block)
        return next_blocks
        
    def _cyclic_check_pass(self, prev_blocks: List[PipelineBlock]) -> None:
        """Raise AssertionError if this block is reachable from itself.

        ``prev_blocks`` holds the blocks on the path walked so far. Each branch
        gets its own copy of the path, so a block reached through two different
        routes (a diamond) is not mistaken for a cycle.
        """
        # Raised explicitly rather than with `assert`, so the check (and the
        # recursion's termination on cyclic graphs) survives `python -O`.
        if self in prev_blocks:
            raise AssertionError("cyclic connection detected between pipeline blocks")
        path = prev_blocks + [self]
        for block in self._next_blocks():
            block._cyclic_check_pass(path)

    @abstractmethod
    def execute(self, input_dict: dict):
        """Default body: store ``input_dict`` as the output and return it."""
        self.output = input_dict
        return self.output

    @property
    def output(self) -> dict:
        return self._output

    @output.setter
    def output(self, d: dict):
        assert set(d.keys()) == set(self.output_keys)
        self._output = d

    @property
    def executed(self) -> bool:
        return self._executed

    @property
    def output_keys(self) -> List[str]:
        return ["ligands", "targets"]
    
    @property
    def required_input_keys(self) -> List[str]:
        return ["ligands", "targets"]
    
    @property
    def optional_input_keys(self) -> List[str]:
        return []
        
    @property
    @abstractmethod
    def name(self) -> str:
        return "pipeblock"

class Pipeline(ABC):
    """Abstract container of named pipeline blocks rooted at a base directory.

    Subclasses must provide ``name``.

    Args:
        base_dir: Directory associated with the pipeline. It is created,
            including missing parent directories, when it does not exist.
    """

    def __init__(self, base_dir: str = "."):
        # Local import so the class does not depend on another cell having
        # imported pathlib.
        import pathlib

        if not os.path.exists(base_dir):
            # makedirs handles nested paths, and exist_ok tolerates the
            # directory appearing between the check and the creation.
            os.makedirs(base_dir, exist_ok=True)
        
        self._initiated = False
        self.base_dir = pathlib.Path(base_dir)
        self._blocks: Dict[str, PipelineBlock] = dict()
        self._connections: Dict[str, _Connection] = dict()
    
    def add_block(self, block: PipelineBlock, block_name: str | None = None) -> None:
        """Register ``block`` under ``block_name``.

        When no name is given, the first unused name of the form ``block<i>``
        (counting from 1) is chosen. The block is tagged with its parent
        pipeline and the name it is stored under.

        NOTE(review): an explicit ``block_name`` that is already in use
        replaces the existing entry without warning — confirm this is intended.
        """
        if block_name is None:
            i = 1
            while f"block{i}" in self._blocks:
                i += 1
            block_name = f"block{i}"
        block._parent = self
        block._name_in_parent = block_name # TODO
        self._blocks[block_name] = block
    
    def add_blocks(self, blocks: List[PipelineBlock] | Dict[str, PipelineBlock]) -> None:
        """Register several blocks at once.

        Args:
            blocks: A list (or tuple) of blocks, which get automatic names, or
                a dict mapping names to blocks.

        Raises:
            TypeError: If ``blocks`` is neither a list/tuple nor a dict.
        """
        if isinstance(blocks, dict):
            for name, block in blocks.items():
                self.add_block(block, name)
        elif isinstance(blocks, (list, tuple)):
            for block in blocks:
                self.add_block(block)
        else:
            # Previously such input was ignored silently, hiding caller mistakes.
            raise TypeError(
                f"blocks must be a list, tuple or dict, not {type(blocks).__name__}"
            )

    def initiate(self):
        """Mark the pipeline as initiated."""
        self._initiated = True

    @property
    def initiated(self):
        return self._initiated

    @property
    @abstractmethod
    def name(self) -> str:
        return "pipeline"

In [56]:
# NOTE(review): as defined in this notebook, Pipeline declares an abstract
# `name` property and its __init__ sets `_initiated` to False, so the `True`
# recorded below presumably comes from an earlier definition — re-run to confirm.
Pipeline()._initiated

True