# Parallelization

TokSearch supports multiple methods of processing data after a ```Pipeline``` has been defined. ```compute_serial``` processes each shot in the ```Pipeline``` sequentially. This is fine for small problems. But, if either the number of shots or the amount of data per shot increases, then TokSearch supports multiple methods of parallelizing the computations. At present, three methods support parallel execution of ```Pipeline``` objects:

- ```compute_multiprocessing```: This method uses [JobLib](https://joblib.readthedocs.io/en/stable/) to execute the ```Pipeline``` using node-local multiprocessing.
- ```compute_ray```: Here, the [Ray](https://ray.readthedocs.io/en/latest/) framework is used to execute either node-local parallelization or fully distributed across multiple worker nodes.
- ```compute_spark```: Like ```compute_ray```, here [Apache Spark](https://spark.apache.org/) allows both node-local and fully distributed computation.

Spark and Ray offer similar capabilities and performance when used with TokSearch. In particular, both can be used in fully distributed fashion and parallelize the ```Pipeline``` processing across arbitrarily many compute nodes as long as a cluster is set up properly. Additionally, they each can be used to parallelize within a single node, taking advantage of multi-core systems. For small jobs, though, ```compute_multiprocessing``` method is often the fastest and lightest weight.

## Partitioning

```Pipeline``` parallelization, regardless of backend, works by partioning the list of shots into roughly equal length chunks and then assinging each chunk to a worker process. Once all chunks have been processed the resulting records are returned to the calling program and reassembled into a flattened list of ```Records```.

## Examples

The following examples show usage of the ```compute*``` methods.

A few prelimimaries first:

We'll import some things that we'll need. Note that we're specifying that the environment variables ```MKL_NUM_THREADS```, ```NUMEXPR_NUM_THREADS```, and ```OMP_NUM_THREADS``` should all equal "1". This prevents numpy from using multithreading, which tends to have a negative affect on execution time when running under Spark and Ray. Normally we'd want to set these using a modulefile or in a default environment, but we'll specify them here just to be sure. Note that these variables must be set before numpy is imported.

We'll also create a utility class for timing execution later on.

In [4]:
import time
import os
os.environ["MKL_NUM_THREADS"] = "1"
os.environ["NUMEXPR_NUM_THREADS"] = "1"
os.environ["OMP_NUM_THREADS"] = "1"

import numpy as np

# Context manager that we can use to time execution
class Timer(object):
    def __init__(self):
        self.start = None
        
    def __enter__(self):
        self.start = time.time()
        
    def __exit__(self, *args):
        elapsed = time.time() - self.start
        print('Ran in {0:.2f} s'.format(elapsed))

Now we can build our pipeline. The particulars of the pipeline aren't critical. It just retrieves a few signals and performs a simple calculation, then returns just the result of the calculation.

In [5]:
from toksearch import Pipeline, MdsSignal
    
def create_pipeline(shots):
    ipmhd_signal = MdsSignal(r'\ipmhd', 'efit01')
    ip_signal = MdsSignal(r'\ipmeas', 'efit01')


    pipeline = Pipeline(shots)
    pipeline.fetch('ipmhd', ipmhd_signal)
    pipeline.fetch('ip', ip_signal)
    @pipeline.map
    def calc_max_ipmhd(rec):
        rec['max_ipmhd'] = np.max(np.abs(rec['ipmhd']['data']))
        
    pipeline.keep(['max_ipmhd'])
    return pipeline   
    

Now we specify a non-trivial number of shots and create a list of shot numbers.

In [6]:
num_shots = 2000
shots = list(range(165920, 165920+num_shots))

Finally, we run the different methods.

### Serial

In [7]:
print('*'*80)
print('RUNNING WITH compute_serial')
pipeline = create_pipeline(shots)
with Timer():
    serial_result = pipeline.compute_serial()

********************************************************************************
RUNNING WITH compute_serial
Ran in 38.58 s


### Multiprocessing

In [9]:
print('*'*80)
print('RUNNING WITH compute_multiprocessing')
pipeline = create_pipeline(shots)
with Timer():
    multiproc_result = pipeline.compute_multiprocessing()

********************************************************************************
RUNNING WITH compute_multiprocessing
Ran in 5.80 s


### Ray

In [11]:
import ray

# Create a dummy pipeline to initialize ray so we can benchmark without the startup overhead
dummy_res = Pipeline([1,2]).compute_ray()
list(dummy_res)

print('*'*80)
print('RUNNING WITH compute_ray')
pipeline = create_pipeline(shots)
with Timer():
    ray_result = pipeline.compute_ray(numparts=8)
    list(ray_result)
    ray_result.cleanup() # This shuts down the ray cluster

Initializing ray with _temp_dir = /mnt/beegfs/users/sammuli/tmp/tmpj4075hyz
********************************************************************************
RUNNING WITH compute_ray
Ran in 9.89 s


### Spark

In [13]:
print('*'*80)
print('RUNNING WITH compute_spark')

pipeline = create_pipeline(shots)

# Spark results are generated lazily, so calling compute_spark
# just initializes spark. We won't count that in our timing.
spark_result = pipeline.compute_spark(numparts=8)

with Timer():
    list(spark_result) # Converting to a list materializes the result
spark_result.cleanup()

********************************************************************************
RUNNING WITH compute_spark
MASTER: None
[('spark.master', 'None'), ('spark.app.submitTime', '1712249973365'), ('spark.submit.pyFiles', ''), ('spark.submit.deployMode', 'client'), ('spark.ui.showConsoleProgress', 'true'), ('spark.app.name', 'pyspark-shell')]


                                                                                

Ran in 9.04 s


## Summary

Summarizing the results:

| **Backend**     | **Execution Time (s)** | **Speedup** |
|-----------------|--------------------|-------------|
| Serial          | 38.58              | -           |
| Multiprocessing | 5.80               | 6.65x       |
| Ray             | 9.89               | 3.90x       |
| Spark           | 9.04               | 4.27x       |

These results were obtained using 8 cores and 8 partitions (specified in the ```numparts``` keyword argument in the ```compute_spark``` and ```compute_ray``` methods) on a saga compute node.

This demonstrates that the multiprocessing backed will often beat Ray or Spark due to the various parallelization overhead issues. But, for much bigger jobs, both Spark and Ray are capable of fully distributed computing that utilizes multiple nodes in a cluster.

Running a TokSearch script that utilizes these capabilities is covered [here](/distributed_computing/)