# Code Parallelization

QSPRpred can also run operations on data in parallel. It tries to take the headache out of parallelization by
providing a simple interface for this. In this tutorial, we will show how to use
these features.

## Example Data Set

We will borrow the multitask data set from the [associated tutorial](../modelling/multi_task_modelling.ipynb) since it contains a larger number of molecules:

In [1]:
import pandas as pd

from qsprpred.data import MoleculeTable

# load the data
df = pd.read_csv('../../tutorial_data/AR_LIGANDS.tsv', sep='\t')
df = df.pivot(index="SMILES", columns="accession", values="pchembl_value_Mean")
df.columns.name = None
df.reset_index(inplace=True)
mt = MoleculeTable.fromDF(name="ParallelizationExample", df=df)
len(mt)

6797

## Setting `nJobs` and `chunkSize`

The default QSPRpred data structures (`MoleculeTable`, `QSPRTable` and `PandasChemStore`) all
support parallelization of code by chunking the data into smaller
pieces and running the code on each chunk in parallel. This is done by setting
the `nJobs` and `chunkSize` properties of these objects. 
The `nJobs` property specifies the number of parallel jobs to run. The `chunkSize`
property specifies the number of molecules to process in each job. Note
that in general it is not required that each `Parallelizable` object in QSPRpred
is also `ChunkIterable`, but for all the aforementioned classes this is true 
(see the [API documentation](https://cddleiden.github.io/QSPRpred/docs/api/modules.html) for more information
on the two interfaces).

The `chunkSize` property is automatically calculated based on the number of jobs, but
in some cases it may be useful to set it manually. For example, if the code
being run in parallel is very fast, it may be useful to increase the chunk size
to reduce the overhead of parallelization. On the other hand, if the code being
run in parallel is very slow, it may be useful to decrease the chunk size to
reduce the amount of time spent waiting for the slowest job to finish. 

In addition, the
`chunkSize` property also affects the memory usage of the parallelization. If
the code being run in parallel is very memory intensive, it may be useful to
decrease the chunk size to reduce the memory usage of the parallel processes 
by running on smaller batches of data.
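
To make the roles of the two settings concrete, here is a minimal sketch of chunked parallel execution that uses only the standard library. It is not QSPRpred's implementation: `iter_chunks` and `run_chunked` are hypothetical helpers, and a thread pool stands in for a process pool to keep the example short.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import islice
from typing import Callable, Iterable, Iterator


def iter_chunks(items: Iterable, chunk_size: int) -> Iterator[list]:
    """Lazily yield consecutive chunks of at most `chunk_size` items."""
    iterator = iter(items)
    while chunk := list(islice(iterator, chunk_size)):
        yield chunk


def run_chunked(func: Callable, items: Iterable, n_jobs: int, chunk_size: int) -> list:
    """Apply `func` to every chunk using `n_jobs` workers (hypothetical helper)."""
    with ThreadPoolExecutor(max_workers=n_jobs) as pool:
        # Executor.map returns the results in submission order
        return list(pool.map(func, iter_chunks(items, chunk_size)))


# ten "molecules" processed in chunks of four by two workers
lengths = run_chunked(len, range(10), n_jobs=2, chunk_size=4)
print(lengths)  # [4, 4, 2]
```

Here the number of workers plays the role of `nJobs` and the number of items per task plays the role of `chunkSize`: the first bounds how much work runs at once, the second decides how the work is cut up.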

In this tutorial, we will illustrate a few different scenarios and use the `MoleculeTable` above as an example, but you should be able to execute these workflows just as well for all the other data structures. First, we will run a simple
descriptor calculation, which you have probably seen before in the other tutorials. This time we wrap it in a function so that we can measure the time it takes to run:

In [2]:
from qsprpred.data.descriptors.sets import DescriptorSet
from qsprpred.data.descriptors.fingerprints import MorganFP
from qsprpred.utils.stopwatch import StopWatch


def time_desc_calc(data: MoleculeTable, desc_set: DescriptorSet):
    """A simple function to time descriptor calculation on a data table.
    
    Args:
        data: The data table to calculate descriptors on.
        desc_set: The descriptor set to calculate.
    """
    if data.hasDescriptors([desc_set])[0]:
        print(f"Removing old descriptors: {desc_set}")
        data.dropDescriptorSets([desc_set], full_removal=True)
    print(f"Running and timing descriptor calculation: {desc_set}")
    watch = StopWatch()
    data.addDescriptors([desc_set])
    watch.stop()


time_desc_calc(mt, MorganFP(3, 2048))

Running and timing descriptor calculation: MorganFP
Time it took: 2.3691866509616375


This calculation is done on one CPU by default:

In [3]:
mt.nJobs

1

and the whole data set is supplied as one chunk, since the default behavior is to process the whole data set in one go on one CPU.

We can now try running this calculation in parallel on 4 CPUs:

In [4]:
mt.nJobs = 4

The chunk size will automatically be adjusted to 25% of the data set size so that each portion of the data set is processed on a separate CPU. We can see how this affects the time taken to run the calculation:

In [5]:
time_desc_calc(mt, MorganFP(3, 2048))

Removing old descriptors: MorganFP
Running and timing descriptor calculation: MorganFP
Time it took: 0.8263363623991609


This was faster, but not by a factor of 4. This is because there is overhead associated with parallelization and the calculation of fingerprints is very fast by itself so the overhead affects our runtime more. In such cases, be careful about setting the chunk size manually:

In [6]:
mt.chunkSize = 10
time_desc_calc(mt, MorganFP(3, 2048))

Removing old descriptors: MorganFP
Running and timing descriptor calculation: MorganFP
Time it took: 3.2016703812405467


**This was slower than even the single CPU calculation!**
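
The arithmetic behind these three timings can be checked with a few lines of plain Python. The numbers are copied from the outputs above. `n_chunks` is a hypothetical helper, and the count assumes consecutive chunks of exactly `chunkSize` molecules plus one smaller remainder chunk.

```python
import math

n_molecules = 6797  # len(mt) from the first cell

# wall-clock times (in seconds) reported by the three runs above
t_serial = 2.3691866509616375        # nJobs = 1
t_parallel = 0.8263363623991609      # nJobs = 4, automatic chunk size
t_small_chunks = 3.2016703812405467  # nJobs = 4, chunkSize = 10


def n_chunks(n_items: int, chunk_size: int) -> int:
    """Number of chunks needed to cover `n_items` (the last one may be smaller)."""
    return math.ceil(n_items / chunk_size)


print(n_chunks(n_molecules, 10))            # 680
print(round(t_serial / t_parallel, 2))      # 2.87
print(round(t_serial / t_small_chunks, 2))  # 0.74
```

With a chunk size of 10, the same work is split into 680 small tasks, so any per-chunk cost of scheduling and moving data is paid 680 times. For a calculation as cheap as fingerprints, that cost can easily outweigh the gain from using four CPUs.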

## Custom Operations

Descriptor calculators are already prepared parallelizable actions that you can use with the `addDescriptors` method. However, you can also run custom operations on the data set in parallel. To do this, you need to use the `apply` method. This method takes a function as input and runs it on each chunk of the data set in parallel. The function must take the provided chunk of data and any optional arguments as input. Output can be anything that is of value to us. Here is a simple example of a processing function that takes a chunk of a data table and prints its shape and any arguments passed to it:

In [7]:
def processing_function(chunk, *args, **kwargs):
    """A simple function to process a chunk of a data table. Prints the chunk's shape, the arguments and the first value of each property."""
    print(chunk.shape)
    print(args)
    print(kwargs)
    props = chunk.to_dict(orient="list")
    for prop in props:
        print(prop, " = ", props[prop][0])
    return chunk.shape

Then we can pass this function to the `apply` method of the given data structure:

In [8]:
mt.nJobs = 2  # this also resets the chunk size to roughly 50% of the data set size again
mt.apply(
    processing_function,
    func_args=("A",),
    func_kwargs={"B": None},
    chunk_type="df"
)

<generator object PandasChemStore.apply at 0x7fb0205803d0>

As you can see, no calculation was run yet. We just received a generator object that we can unwind to run the function on each chunk. This is useful because it means that the underlying implementation does not have to hold all the chunks in memory, but can create them as they are being processed, which can save memory. Let's do this again, but this time we will actually collect the results:

In [9]:
results = []
for result in mt.apply(processing_function, func_args=("A",), func_kwargs={"B": None},
                       chunk_type="df"):
    results.append(result)

(3398, 8)
('A',)
{'B': None}
(3398, 8)
('A',)
{'B': None}
P29275  =  nan
P29274  =  6.61
ID_before_change P29275 =    = ParallelizationExample_storage_library_0000 
nanoriginal_smiles 
 = P29274   = Brc1cc(Nc2nc3c(ncnc3N3CCCC3)s2)ccc1 
5.29P30542
 ID_before_change =   =   nanParallelizationExample_storage_library_3398

original_smilesP0DMS8   =  =   5.89COc1cc(-n2c(=O)n(-c3c(OC)cccc3)c3c2nc(NC2CC2)nc3)ccc1

IDP30542   =  =   ParallelizationExample_storage_library_00005.9

SMILESP0DMS8   =  =   Brc1cc(Nc2nc3c(ncnc3N3CCCC3)s2)ccc1nan

ID   = ParallelizationExample_storage_library_3398
(1, 8)SMILES
 ('A',) = 
 {'B': None}COc1cc(-n2c(=O)n(-c3c(OC)cccc3)c3c2nc(NC2CC2)nc3)ccc1

P29275  =  nan
P29274  =  nan
ID_before_change  =  ParallelizationExample_storage_library_6796
original_smiles  =  c1nc2c(nc(Nc3ccc(N4CCOCC4)cc3)nc2NC2CCCCCCC2)[nH]1
P30542  =  nan
P0DMS8  =  5.56
ID  =  ParallelizationExample_storage_library_6796
SMILES  =  c1nc2c(nc(Nc3ccc(N4CCOCC4)cc3)nc2NC2CCCCCCC2)[nH]1


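Some of the printed lines above are interleaved because the chunks are processed by separate processes that write to the same output at the same time. The results in this case are just the shapes of the chunks that we returned each time: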

In [10]:
results

[(3398, 8), (3398, 8), (1, 8)]
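
These three shapes are what you get when 6797 rows are cut into consecutive pieces of 3398 rows. A small sketch follows, assuming that the automatic chunk size is the integer division of the data set size by `nJobs`. This rule is inferred from the output above and not taken from the QSPRpred source, and `chunk_lengths` is a hypothetical helper.

```python
def chunk_lengths(n_items: int, chunk_size: int) -> list[int]:
    """Lengths of consecutive chunks covering `n_items` items."""
    full, remainder = divmod(n_items, chunk_size)
    return [chunk_size] * full + ([remainder] if remainder else [])


n_molecules, n_jobs = 6797, 2
auto_chunk_size = n_molecules // n_jobs  # assumed rule, see the note above
print(auto_chunk_size)                              # 3398
print(chunk_lengths(n_molecules, auto_chunk_size))  # [3398, 3398, 1]
```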

Since the number of items in the data set is not divisible by the number of jobs, one of the chunks is just one data entry, but that is OK. It will get processed just like the two larger chunks. You can see that we were also able to request the type of the chunk with the `chunk_type` argument. This is a special feature of `ChemStore` that `MoleculeTable` exposes as well. The default implementation will actually return `StoredMol` instances of the underlying storage:

In [11]:
def processing_function_stored_mol(chunk_of_mols, *args, **kwargs):
    """Just process the first molecule in the chunk and return its properties."""
    return chunk_of_mols[0].props


for result in mt.apply(processing_function_stored_mol):
    print(result)

{'P29275': nan, 'P29274': 6.61, 'ID_before_change': 'ParallelizationExample_storage_library_0000', 'original_smiles': 'Brc1cc(Nc2nc3c(ncnc3N3CCCC3)s2)ccc1', 'P30542': nan, 'P0DMS8': 5.89, 'ID': 'ParallelizationExample_storage_library_0000', 'SMILES': 'Brc1cc(Nc2nc3c(ncnc3N3CCCC3)s2)ccc1'}
{'P29275': nan, 'P29274': nan, 'ID_before_change': 'ParallelizationExample_storage_library_6796', 'original_smiles': 'c1nc2c(nc(Nc3ccc(N4CCOCC4)cc3)nc2NC2CCCCCCC2)[nH]1', 'P30542': nan, 'P0DMS8': 5.56, 'ID': 'ParallelizationExample_storage_library_6796', 'SMILES': 'c1nc2c(nc(Nc3ccc(N4CCOCC4)cc3)nc2NC2CCCCCCC2)[nH]1'}
{'P29275': nan, 'P29274': 5.29, 'ID_before_change': 'ParallelizationExample_storage_library_3398', 'original_smiles': 'COc1cc(-n2c(=O)n(-c3c(OC)cccc3)c3c2nc(NC2CC2)nc3)ccc1', 'P30542': 5.9, 'P0DMS8': nan, 'ID': 'ParallelizationExample_storage_library_3398', 'SMILES': 'COc1cc(-n2c(=O)n(-c3c(OC)cccc3)c3c2nc(NC2CC2)nc3)ccc1'}


But you can also request just SMILES strings or RDKit molecules directly:

In [12]:
def processing_function_smiles(chunk_of_smiles, *args, **kwargs):
    """Just return the SMILES of the first molecule in the chunk."""
    return chunk_of_smiles.iloc[0]


for result in mt.apply(processing_function_smiles, chunk_type="smiles"):
    print(result)

Brc1cc(Nc2nc3c(ncnc3N3CCCC3)s2)ccc1
COc1cc(-n2c(=O)n(-c3c(OC)cccc3)c3c2nc(NC2CC2)nc3)ccc1
c1nc2c(nc(Nc3ccc(N4CCOCC4)cc3)nc2NC2CCCCCCC2)[nH]1


In [13]:
from rdkit import Chem


def processing_function_rdkit(chunk_of_rdkit, *args, **kwargs):
    """Just process the first molecule in the chunk and return its InChI."""
    return Chem.MolToInchi(chunk_of_rdkit[0])


for result in mt.apply(processing_function_rdkit, chunk_type="rdkit"):
    print(result)

InChI=1S/C15H14BrN5S/c16-10-4-3-5-11(8-10)19-15-20-12-13(21-6-1-2-7-21)17-9-18-14(12)22-15/h3-5,8-9H,1-2,6-7H2,(H,19,20)
InChI=1S/C22H21N5O3/c1-29-16-7-5-6-15(12-16)26-20-18(13-23-21(25-20)24-14-10-11-14)27(22(26)28)17-8-3-4-9-19(17)30-2/h3-9,12-14H,10-11H2,1-2H3,(H,23,24,25)
InChI=1S/C23H31N7O/c1-2-4-6-17(7-5-3-1)26-22-20-21(25-16-24-20)28-23(29-22)27-18-8-10-19(11-9-18)30-12-14-31-15-13-30/h8-11,16-17H,1-7,12-15H2,(H3,24,25,26,27,28,29)


**WARNING:** The `apply` method does not guarantee that the results will be returned in the same order as the chunks appear in the data set. This is because the chunks are processed in parallel and the results arrive in the order in which the parallel processes finish. This is usually not a problem since you will have access to the IDs one way or another, but it is something to keep in mind. Molecule processors in the next section formalize this a bit and give you the means to write more complex processing functions.
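
One common way to cope with unordered results is to return an identifier together with each result and reorder afterwards. The sketch below uses only the standard library. `process_chunk` and the toy chunks are hypothetical, and threads with artificial delays are used merely to provoke out-of-order completion.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

# toy chunks: (ID of the first molecule, simulated processing time in seconds)
chunks = [("mol_0000", 0.05), ("mol_3398", 0.01), ("mol_6796", 0.0)]


def process_chunk(chunk):
    """Return the chunk ID alongside the result so the order can be restored."""
    chunk_id, delay = chunk
    time.sleep(delay)
    return chunk_id, len(chunk_id)


with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(process_chunk, chunk) for chunk in chunks]
    # as_completed yields futures as they finish, not as they were submitted
    unordered = [future.result() for future in as_completed(futures)]

ordered = sorted(unordered)  # sort by ID to get a reproducible order
print(ordered)
```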

### Molecule Processors

One step above the simple `apply` method is the `processMols` method. This method takes a `MolProcessor` object as input. This object must implement a `__call__` method, which has the same signature as before, but we can signal to the caller what properties we want to have set on the instances. For example, in this case we will require that the data set has activity values associated with the `P29274` accession number:

In [14]:
from qsprpred.data.storage.interfaces.stored_mol import StoredMol
from qsprpred.data.processing.mol_processor import MolProcessor
from typing import Any, Generator, Callable


class MyProcessor(MolProcessor):
    def __call__(
            self,
            mols: list[StoredMol],
            *args,
            **kwargs
    ) -> Any:
        """Just return a tuple of some data extracted for the first molecule in the chunk."""
        return mols[0], mols[0].id, mols[0].props["original_smiles"]

    @property
    def supportsParallel(self) -> bool:
        """Indicates whether the processor supports parallel execution, since
        some processors may provide their own way of parallelization, which
        can interfere with the default parallelization of the data structure.
        """
        return True

    @property
    def requiredProps(self) -> list[str]:
        """Return a list of required properties that the processor needs to run."""
        return ["P29274"]  # try setting this to a nonexistent property to see an error


results = []
for result in mt.processMols(MyProcessor()):
    results.append(result)
results

[(<qsprpred.data.storage.tabular.stored_mol.TabularMol at 0x7fb0204f5790>,
  'ParallelizationExample_storage_library_0000',
  'Brc1cc(Nc2nc3c(ncnc3N3CCCC3)s2)ccc1'),
 (<qsprpred.data.storage.tabular.stored_mol.TabularMol at 0x7fb0cc040f80>,
  'ParallelizationExample_storage_library_6796',
  'c1nc2c(nc(Nc3ccc(N4CCOCC4)cc3)nc2NC2CCCCCCC2)[nH]1'),
 (<qsprpred.data.storage.tabular.stored_mol.TabularMol at 0x7fb031f5c890>,
  'ParallelizationExample_storage_library_3398',
  'COc1cc(-n2c(=O)n(-c3c(OC)cccc3)c3c2nc(NC2CC2)nc3)ccc1')]

This can be useful when we want to reuse our processors on different data sets and want to force some checks before we run things.
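
The kind of check that `requiredProps` enables can be illustrated without QSPRpred. The sketch below is an assumption about the general pattern, not the library's actual validation code: `check_required_props` and `FirstSmiles` are hypothetical names, and molecules are represented as plain dictionaries.

```python
class FirstSmiles:
    """Toy processor that declares which properties it needs."""

    required_props = ["SMILES", "P29274"]

    def __call__(self, mols: list[dict]) -> str:
        return mols[0]["SMILES"]


def check_required_props(processor, available_props: set[str]) -> None:
    """Fail early if the data lacks a property the processor depends on."""
    missing = [p for p in processor.required_props if p not in available_props]
    if missing:
        raise ValueError(f"Missing required properties: {missing}")


mols = [{"SMILES": "CCO", "P29274": 6.61}]
processor = FirstSmiles()
check_required_props(processor, set(mols[0]))  # passes silently
print(processor(mols))  # CCO

try:
    check_required_props(processor, {"SMILES"})
except ValueError as error:
    print(error)  # Missing required properties: ['P29274']
```

Failing before any chunk is dispatched is the point of such a declaration: the error appears immediately instead of somewhere inside a worker halfway through the data set.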

## Changing the Parallelization Backend

Parallelization of data set operations is handled by the `parallelGenerator` of the `MoleculeTable` object. By default, the `MultiprocessingJITGenerator` is used. This generator uses the `multiprocessing` module to run the operations in parallel on the chunks provided as a generator by the `MoleculeTable`. However, you can also use any other implementation of the `ParallelGenerator` interface and replace the default generator with it. For example, you can use the `DaskJITGenerator` to run the operations above just the same, but this time with Dask as the parallel backend:

In [15]:
from qsprpred.extra.utils.parallel import DaskJITGenerator

mt.parallelGenerator = DaskJITGenerator(mt.nJobs)

In [16]:
results = []
for result in mt.processMols(MyProcessor()):
    results.append(result)
results

[(<qsprpred.data.storage.tabular.stored_mol.TabularMol at 0x7fb0cbf62f60>,
  'ParallelizationExample_storage_library_0000',
  'Brc1cc(Nc2nc3c(ncnc3N3CCCC3)s2)ccc1'),
 (<qsprpred.data.storage.tabular.stored_mol.TabularMol at 0x7fb0c071b260>,
  'ParallelizationExample_storage_library_6796',
  'c1nc2c(nc(Nc3ccc(N4CCOCC4)cc3)nc2NC2CCCCCCC2)[nH]1'),
 (<qsprpred.data.storage.tabular.stored_mol.TabularMol at 0x7fb02058b260>,
  'ParallelizationExample_storage_library_3398',
  'COc1cc(-n2c(=O)n(-c3c(OC)cccc3)c3c2nc(NC2CC2)nc3)ccc1')]

By default, the `DaskJITGenerator` will use an automatically initialized local Dask cluster to run the operations in parallel. However, you can also customize the client so that it uses SSH to connect to a remote Dask cluster. You could for example deploy an SSH Dask cluster and then use it in your client by overriding the `getPool` method. You can uncomment the following cells to try a simple example on your local machine (see the `DaskJITGenerator` and `SSHCluster` documentation for more information):

In [17]:
# !pip install asyncssh bokeh==2.4.3 # dependencies for the dashboard and SSHCluster

In [18]:
# from dask.distributed import SSHCluster
# 
# cluster = SSHCluster(
#     ["localhost", "localhost"], # first argument is the scheduler, the rest are workers (just one in this case)
#     connect_options={"known_hosts": None},
#     worker_options={"nthreads": 1, "n_workers": mt.nJobs},
#     scheduler_options={"port": 0, "dashboard_address": ":8797"}, # dashboard will be available at http://localhost:8797/status
# )
# cluster

In [19]:
# from dask.distributed import Client
# 
# 
# class CustomDaskJITGenerator(DaskJITGenerator):
#     """A custom Dask JIT generator that uses a custom Dask client."""
# 
#     def getPool(self):
#         """Get a Dask client that connects to our custom cluster set up above."""
#         return Client(cluster)
# 
# 
# mt.parallelGenerator = CustomDaskJITGenerator(mt.nJobs)
# 
# results = []
# for result in mt.processMols(MyProcessor()):
#     results.append(result)
# results

In [20]:
# undeploy the cluster
# cluster.close()

**Note:** The JIT abbreviation stands for "Just In Time" and reflects the fact that all classes deriving from the `JITParallelGenerator` interface evaluate the input generator item by item according to the number of available workers. This is useful to keep the memory usage low for potentially large data sets when running multiprocessing. However, you could implement any kind of parallelization strategy by deriving from the `ParallelGenerator` interface directly: 

In [21]:
from qsprpred.utils.parallel import ParallelGenerator


class JoblibGenerator(ParallelGenerator):
    """Example of a custom parallel generator with progress bar and using the joblib library."""

    def __init__(self, nJobs: int):
        self.nJobs = nJobs

    def make(self, generator: Generator, process_func: Callable, *args,
             **kwargs) -> Generator:
        """Run the process function on the generator in parallel."""
        from tqdm import tqdm
        from joblib import Parallel, delayed

        # materialize the generator so that tqdm knows the total number of items
        generated_list = list(generator)

        # run the process function in parallel
        for result in Parallel(n_jobs=self.nJobs)(
                delayed(process_func)(item, *args, **kwargs) for item in
                tqdm(generated_list)
        ):
            yield result


mt.parallelGenerator = JoblibGenerator(mt.nJobs)
mt.chunkSize = 10  # reduce the chunk size to showcase the progress bar a bit better :)
results = []
for result in mt.processMols(MyProcessor()):
    results.append(result)
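
Note that `JoblibGenerator` calls `list(generator)`, so all chunks exist in memory before any work starts. For contrast, here is a rough standard-library sketch of a just-in-time strategy that keeps at most `n_workers` items in flight. It only illustrates the idea described in the note above and is not how `MultiprocessingJITGenerator` is actually implemented. `jit_map` is a hypothetical name, and threads replace processes for brevity.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from itertools import islice


def jit_map(func, items, n_workers: int):
    """Yield func(item) while pulling from `items` only as workers free up."""
    iterator = iter(items)
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        # start with one item per worker
        pending = {pool.submit(func, item) for item in islice(iterator, n_workers)}
        while pending:
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            for future in done:
                yield future.result()
                # replace every finished task with the next item, if any
                for item in islice(iterator, 1):
                    pending.add(pool.submit(func, item))


pulled = []  # records which items were taken from the source so far


def source():
    for i in range(6):
        pulled.append(i)
        yield i


squares = jit_map(lambda x: x * x, source(), n_workers=2)
first = next(squares)
print(len(pulled))  # 2: only one item per worker has been requested so far
all_squares = sorted([first, *squares])
print(all_squares)  # [0, 1, 4, 9, 16, 25]
```

The trade-off is visible in the two designs: materializing the input gives a progress bar with a known total, while pulling items on demand keeps memory usage bounded by the number of workers.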

In [22]:
results

[(<qsprpred.data.storage.tabular.stored_mol.TabularMol at 0x7fb0204f4050>,
  'ParallelizationExample_storage_library_0000',
  'Brc1cc(Nc2nc3c(ncnc3N3CCCC3)s2)ccc1'),
 (<qsprpred.data.storage.tabular.stored_mol.TabularMol at 0x7fb0bd1ed4f0>,
  'ParallelizationExample_storage_library_0010',
  'C#CCN1CCC(NC(=O)Nc2nc3nn(C)cc3c3nc(-c4ccco4)nn23)CC1'),
 (<qsprpred.data.storage.tabular.stored_mol.TabularMol at 0x7fb0cd4cc650>,
  'ParallelizationExample_storage_library_0020',
  'C#CCn1c(=O)c2c(nc(Cc3cccs3)[nH]2)n(Cc2ccco2)c1=O'),
 (<qsprpred.data.storage.tabular.stored_mol.TabularMol at 0x7fb0cc0ac890>,
  'ParallelizationExample_storage_library_0030',
  'C#CCn1c(=O)c2c(nc3N(Cc4ccc(OC)cc4)CCCn32)n(C)c1=O'),
 (<qsprpred.data.storage.tabular.stored_mol.TabularMol at 0x7fb02e697b00>,
  'ParallelizationExample_storage_library_0040',
  'C#CCn1c(=O)c2c(nc3cc(OC)ccn32)n(Cc2ccccc2)c1=O'),
 (<qsprpred.data.storage.tabular.stored_mol.TabularMol at 0x7fb0204f5670>,
  'ParallelizationExample_storage_librar