# RADICAL-Cybertools Tutorial

RADICAL-Cybertools support the execution of ensemble applications at scale on high-performance computing (HPC) platforms. Ensemble applications use diverse algorithms to coordinate the execution of up to $10^6$ tasks on all the processors (CPUs/GPUs) of an HPC machine. This type of application is common in domains such as biophysical systems, climate science, seismology, and polar science. RADICAL-Cybertools address the challenges of scale, diversity, and reliability.

Adaptive ensembles are a particularly interesting type of ensemble application, in which adaptivity is used to determine the behavior of the application at runtime. For example, many biomolecular sampling algorithms are formulated as adaptive: replica exchange, expanded ensemble, etc. Introducing adaptivity has improved simulation efficiency by up to a factor of three, but implementing adaptive ensemble applications is challenging due to the complexity of the required algorithms.

## RADICAL-EnTK (Ensemble Toolkit)

RADICAL-Cybertools offer [RADICAL-EnTK (Ensemble Toolkit)](https://radicalentk.readthedocs.io/en/stable/index.html), a workflow engine specifically designed to support the execution of (adaptive) ensemble applications at scale on HPC platforms. EnTK allows users to separate adaptive logic from simulation/analysis code, while abstracting away the issues of resource management and runtime execution coordination.

EnTK exposes a simple application programming interface (API), implemented in Python, with two (Pythonic) collections of objects and three classes:
* Set: contains objects that have no relative order with each other
* Sequence/List: contains objects that have a linear order, i.e., object 'i' depends on object 'i-1'
* Task: description of the kernel to be executed
* Stage: set of Tasks, i.e., all tasks of a stage may execute concurrently
* Pipeline: sequence of Stages, i.e., Stage 2 may only commence after Stage 1 completes

Thus, in EnTK an ensemble application is described as a set of pipelines, in which each pipeline has a sequence/list of stages, and each stage has a set of tasks. The following figure shows an example of an ensemble application in which tasks are represented by arrows:

<img src="./images/entk-pst-model.png" width="350"/>
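The containment relations described above can be sketched in plain Python. The classes below are illustrative stand-ins, not the EnTK classes, and the helper `build_pipeline` and the stage sizes are hypothetical choices made only for this sketch.

```python
from dataclasses import dataclass, field

# Plain-Python stand-ins for illustration only; these are NOT the EnTK classes.

@dataclass(frozen=True)
class Task:
    """Description of one executable unit (hashable, so it can live in a set)."""
    name: str

@dataclass(eq=False)
class Stage:
    """Unordered set of tasks: no task depends on another task of its stage."""
    tasks: set = field(default_factory=set)

@dataclass(eq=False)
class Pipeline:
    """Ordered list of stages: stage i may start only after stage i-1 is done."""
    stages: list = field(default_factory=list)

def build_pipeline(prefix, tasks_per_stage):
    """Hypothetical helper: the i-th stage holds tasks_per_stage[i] tasks."""
    pipeline = Pipeline()
    for s, n_tasks in enumerate(tasks_per_stage):
        tasks = {Task(f'{prefix}.stage{s}.task{t}') for t in range(n_tasks)}
        pipeline.stages.append(Stage(tasks))
    return pipeline

# An ensemble is a set of pipelines, with no ordering among them.
ensemble = {build_pipeline('pipe0', [1, 3, 1]),
            build_pipeline('pipe1', [1, 3, 1])}

n_tasks = sum(len(stage.tasks) for p in ensemble for stage in p.stages)
print(len(ensemble), 'pipelines,', n_tasks, 'tasks in total')
```

Using a `set` for tasks and pipelines and a `list` for stages makes the ordering guarantees visible in the data structure itself: only the stages of a pipeline carry an order.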

## Preparing the Execution Environment

Depending on the execution environment, you may want to use the Spack package or the container provided by Exaworks SDK, or load the module provided by the administrators of the HPC platform on which you are executing this tutorial.

In [1]:
!radical-stack


  python               : /opt/conda/bin/python3
  pythonpath           : 
  version              : 3.9.13
  virtualenv           : base

  radical.entk         : 1.41.0
  radical.gtod         : 1.41.0
  radical.pilot        : 1.41.0
  radical.saga         : 1.41.0
  radical.utils        : 1.41.0



## Example: Ensemble of Simulation Pipelines

The following example application shows the execution of a simple ensemble of simulations. Each ensemble member is in itself a pipeline of **three** different stages:

1. **"seed"**: generate a random seed as input data
2. **"simulate"**: evolve a model based on that input data via a set of tasks
3. **"analyse"**: derive a common metric across the model results

Similar patterns are frequently found in molecular dynamics simulation workflows. For the purpose of this tutorial, the stages are:

- random seed: create a random number
- evolve model: N tasks, each computing the n'th power of the input
- common metric: sum over all 'model' outputs

The final results are then staged back and printed on STDOUT.
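To make the arithmetic of the three stages concrete, here is a minimal local sketch without EnTK. The function names are hypothetical, and the seed range (one byte, 0..255) and the default of 10 tasks are assumptions chosen to mirror the cells that follow.

```python
import random

def seed_stage(rng):
    """Stage 1: draw one random number (assumed here to be one byte, 0..255)."""
    return rng.randrange(256)

def simulate_stage(seed, n_tasks=10):
    """Stage 2: task n computes seed ** n, for n = 0 .. n_tasks - 1."""
    return [seed ** n for n in range(n_tasks)]

def analyse_stage(powers):
    """Stage 3: the common metric is the sum over all model outputs."""
    return sum(powers)

def run_member(rng):
    """Run the three stages of one ensemble member in order."""
    seed = seed_stage(rng)
    return seed, analyse_stage(simulate_stage(seed))

seed, metric = run_member(random.Random())
print('seed %d -> metric %d' % (seed, metric))
```

For a seed of 2 and 10 tasks, the metric is 2^0 + 2^1 + ... + 2^9 = 1023.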

The **two** pipelines execute concurrently and, as per EnTK API definitions, the stages inside each pipeline execute sequentially. Importantly, when a stage contains **multiple** tasks, all those tasks can execute concurrently, assuming that enough resources are available. Given a set of resources, EnTK always executes the ensemble application with the highest possible degree of concurrency but, when not enough resources are available, the tasks of a stage may be executed sequentially. All this is transparent to the user, who is left free to focus on the ensemble algorithm without having to deal with parallelism and resource management.
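These execution semantics can be imitated with the standard library. The sketch below is not how EnTK is implemented; it only assumes that a thread pool with `cpus` workers is a fair stand-in for a limited set of resources, and `run_pipeline`, `run_ensemble`, and `make_stages` are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor

def run_pipeline(name, stages, workers, log):
    """Run the stages of one pipeline in order; tasks of a stage run concurrently."""
    for index, tasks in enumerate(stages):
        futures = [workers.submit(task) for task in tasks]
        results = [f.result() for f in futures]  # barrier: wait for the whole stage
        log.append((name, index, results))

def run_ensemble(ensemble, cpus):
    """Run all pipelines concurrently on a shared pool of `cpus` workers."""
    log = []
    with ThreadPoolExecutor(max_workers=cpus) as workers, \
         ThreadPoolExecutor(max_workers=len(ensemble)) as drivers:
        pending = [drivers.submit(run_pipeline, name, stages, workers, log)
                   for name, stages in ensemble.items()]
        for p in pending:
            p.result()                           # re-raise task errors, if any
    return log

def make_stages(base):
    """Three stages: one task, then four concurrent tasks, then one task."""
    return [[lambda: base],
            [lambda n=n: base ** n for n in range(4)],
            [lambda: 'done']]

ensemble = {'pipe.000': make_stages(2), 'pipe.001': make_stages(3)}
log = run_ensemble(ensemble, cpus=2)
for entry in log:
    print(entry)
```

With `cpus=2`, at most two tasks run at any time, so the four tasks of the middle stage are partly serialized. The stage order within each pipeline is preserved regardless of the pool size, while entries of the two pipelines may interleave in the log.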

First, we import the EnTK Python module into our application so that we can use its API.

In [2]:
import radical.entk as re

The following functions generate a single simulation pipeline, i.e., a new ensemble member. The pipeline consists of three stages, as described above.

In [3]:
def get_stage_1(sandbox):

    # first stage: create 1 task to generate a random seed number
    s1 = re.Stage()

    t1 = re.Task()
    t1.executable = '/bin/sh'
    t1.arguments  = ['-c', 'od -An -N1 -i /dev/random']
    t1.stdout     = 'random.txt'
    t1.sandbox    = sandbox

    s1.add_tasks(t1)
    return s1

In [4]:
def get_stage_2(sandbox):
    
    # second stage: create 10 tasks to compute the n'th power of a random seed
    s2 = re.Stage()

    n_simulations = 10
    for i in range(n_simulations):
        t2 = re.Task()
        t2.executable = '/bin/sh'
        t2.arguments  = ['-c', "echo '$(cat random.txt) ^ %d' | bc" % i]
        t2.stdout     = 'power.%03d.txt' % i
        t2.sandbox    = sandbox
        s2.add_tasks(t2)
    
    return s2

In [5]:
def get_stage_3(sandbox):
    
    # third stage: compute sum over all powers
    s3 = re.Stage()

    t3 = re.Task()
    t3.executable = '/bin/sh'
    t3.arguments  = ['-c', 'cat power.*.txt | paste -sd+ | bc']
    t3.stdout     = 'sum.txt'
    t3.sandbox    = sandbox

    # download the result while renaming to get unique files per pipeline
    t3.download_output_data = ['sum.txt > %s.sum.txt' % sandbox]
    
    s3.add_tasks(t3)
    return s3

In [6]:
def generate_pipeline(uid):

    # all tasks in this pipeline share the same sandbox
    sandbox = uid

    # assemble three stages into a pipeline and return it
    p = re.Pipeline()
    p.add_stages([get_stage_1(sandbox), 
                  get_stage_2(sandbox), 
                  get_stage_3(sandbox)])

    return p

In [7]:
%env RADICAL_LOG_LVL=OFF
%env RADICAL_REPORT_ANIME=FALSE

env: RADICAL_LOG_LVL=OFF
env: RADICAL_REPORT_ANIME=FALSE
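The `%env` lines above are notebook magics. In a plain Python script, the same two variables can be set through `os.environ`, as in the sketch below. Setting them before importing `radical.entk` is an assumption made here to be safe, not a documented requirement.

```python
import os

# Script equivalent of the two `%env` notebook magics above.
os.environ['RADICAL_LOG_LVL'] = 'OFF'
os.environ['RADICAL_REPORT_ANIME'] = 'FALSE'

# Assumption: import the toolkit only after the variables are set.
# import radical.entk as re

for name in ('RADICAL_LOG_LVL', 'RADICAL_REPORT_ANIME'):
    print('env: %s=%s' % (name, os.environ[name]))
```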


Now we write the ensemble application. We create an EnTK application manager, which executes our ensemble.

In [8]:
appman = re.AppManager()

EnTK session: re.session.d0d9a60a-6ec3-11ee-bacd-0242ac110003
Creating AppManager
Setting up ZMQ queues                                                         ok
AppManager initialized                                                        ok

We assign a resource request description to the application manager using three mandatory keys: target resource, walltime, and number of CPUs:

In [9]:
appman.resource_desc = {
    'resource': 'local.localhost_test',
    'walltime': 10,
    'cpus'    : 2
}

Validating and assigning resource manager                                     ok

We create an ensemble of **n** simulation pipelines:

In [10]:
n_pipelines = 2

ensemble = set()
for cnt in range(n_pipelines):
    ensemble.add(generate_pipeline(uid='pipe.%03d' % cnt))
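The `uid` passed to `generate_pipeline` is reused as the sandbox name and as the prefix of the downloaded result file. The sketch below only replays the string formatting used in the cells above; the helper `pipeline_names` is hypothetical.

```python
def pipeline_names(n_pipelines):
    """Hypothetical helper: (uid, staging directive, local result file) per pipeline."""
    names = []
    for cnt in range(n_pipelines):
        uid = 'pipe.%03d' % cnt                 # also used as the sandbox name
        directive = 'sum.txt > %s.sum.txt' % uid  # same format as in get_stage_3
        result_file = '%s.sum.txt' % uid        # file read back after the run
        names.append((uid, directive, result_file))
    return names

for uid, directive, result_file in pipeline_names(2):
    print(uid, '|', directive, '|', result_file)
```

Because every pipeline gets a distinct `uid`, the result files of different ensemble members do not overwrite each other once downloaded.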

We assign the workflow to the application manager, then run the ensemble and wait for completion:

In [11]:
appman.workflow = ensemble
appman.run()

Setting up ZMQ queues                                                        n/a
new session: [re.session.d0d9a60a-6ec3-11ee-bacd-0242ac110003]                 \
zmq proxy  : [tcp://172.17.0.3:10001]                                         ok
create pilot manager                                                          ok
submit 1 pilot(s)
        pilot.0000   local.localhost_test      2 cores       0 gpus           ok
Update: All components created
pipeline.0001 state: SCHEDULING
Update: pipeline.0001.stage.0003 state: SCHEDULING
Update: pipeline.0001.stage.0003.task.000012 state: SCHEDULING
Update: pipeline.0000 state: SCHEDULING
Update: pipeline.0000.stage.0000 state: SCHEDULING

Finally, we check the results that were staged back:

In [12]:
for cnt in range(n_pipelines):
    # read the per-pipeline result file and close it right away
    with open('pipe.%03d.sum.txt' % cnt) as fin:
        data = fin.read()
    print('%3d -- %25d' % (cnt, int(data)))

  0 --         76099344201990076
  1 --          2834413842138373
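The printed sums can be cross-checked locally. The sketch below assumes that the seed is a single byte (0..255), as suggested by the `-N1` option in stage 1, and that the ten tasks of stage 2 compute exact integer powers 0 to 9 of that seed. Both function names are hypothetical.

```python
def metric(seed, n_tasks=10):
    """Sum of seed ** n for n = 0 .. n_tasks - 1, as computed by stages 2 and 3."""
    return sum(seed ** n for n in range(n_tasks))

def find_seed(total, n_tasks=10):
    """Return the one-byte seed whose metric equals `total`, or None."""
    for seed in range(256):
        if metric(seed, n_tasks) == total:
            return seed
    return None

# The two sums printed above.
for cnt, total in enumerate([76099344201990076, 2834413842138373]):
    print('%3d -- seed %s' % (cnt, find_seed(total)))
# prints:
#   0 -- seed 75
#   1 -- seed 52
```

Under these assumptions, both results are consistent with the pipeline definition: each sum matches exactly one possible seed.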
