# Ensemble Toolkit (EnTK)

*"A Python framework for developing and executing applications comprised of multiple sets of tasks, also known as ensembles."*

## Ensemble Toolkit: Overview

### What is an Ensemble?
* A set of computational tasks that need to be executed in order to achieve a desired result.
* The number and type of applications that can be formulated as ensembles is vast and spans many scientific domains.

### Challenges of Ensemble Execution on HPC Machines
* Writing scientific problems in ways that lend themselves to distributed and coordinated solutions.
* Sizing, acquiring, and managing resources for the execution.
* Managing the execution of the ensemble.

Running ensemble applications on HPC machines has traditionally been difficult, because users must deal with the batch scheduler and with distributing tasks across resources themselves.

## Ensemble Toolkit: Overview

### Existing Solutions:
* Customized scripts: Fragile, nonportable, high user effort in task and resource management.
* End-to-end workflows: Rigid, lengthy development/modification time.

### Purpose of EnTK
* Addresses the challenges of scale, diversity, and reliability associated with scientific applications.
* Provides high-level abstractions for building ensemble-based applications.
* Acts as middleware between the application and the computing infrastructure.
* Takes responsibility for task, data, and resource management.

## Ensemble Toolkit: Overview

### EnTK Requirements
* Support heterogeneous computing infrastructures.
* Abstract the complexity of execution and resource management.
* Deliver performance that is independent of the type of computing infrastructure.

EnTK must also be fault-tolerant at scale, where both the probability and the cost of failure increase.

## Ensemble Toolkit: Application Model

Applications are constructed by combining the following user-facing components:

* **Task:** An abstraction of a computational task that contains information regarding an executable, its software environment and its data dependencies.
* **Stage:** A set of tasks without mutual dependence that can be executed concurrently.
* **Pipeline:** A list of stages that must be executed sequentially.

An application can consist of a set of pipelines, where each pipeline is a list of stages, and each stage is a set of tasks.
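The ordering rules of this model can be illustrated without EnTK itself. The sketch below is a toy, pure-Python 3 model; `ToyTask`, `ToyStage`, `ToyPipeline`, and the `run_*` helpers are hypothetical names, not EnTK classes. Tasks in a stage run concurrently in a thread pool, stages in a pipeline run strictly one after another, and separate pipelines progress concurrently. EnTK itself hands execution to RADICAL-Pilot on real resources; this only mimics the dependency semantics.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from typing import Callable, List, Tuple


# Hypothetical stand-ins for the Task / Stage / Pipeline concepts (not EnTK's classes).
@dataclass
class ToyTask:
    name: str
    run: Callable[[], str]  # stands in for an executable plus its arguments


@dataclass
class ToyStage:
    tasks: List[ToyTask] = field(default_factory=list)


@dataclass
class ToyPipeline:
    stages: List[ToyStage] = field(default_factory=list)


def run_stage(stage: ToyStage) -> List[Tuple[str, str]]:
    # Tasks in a stage have no mutual dependencies, so they may run concurrently.
    with ThreadPoolExecutor() as pool:
        outputs = list(pool.map(lambda t: t.run(), stage.tasks))
    return [(t.name, out) for t, out in zip(stage.tasks, outputs)]


def run_pipeline(pipeline: ToyPipeline) -> List[List[Tuple[str, str]]]:
    # Stages run in order: stage i+1 starts only after every task of stage i has finished.
    return [run_stage(stage) for stage in pipeline.stages]


def run_application(pipelines: List[ToyPipeline]):
    # Independent pipelines may progress concurrently with one another.
    with ThreadPoolExecutor() as pool:
        return list(pool.map(run_pipeline, pipelines))


# Usage: two independent tasks, then a task that must wait for both.
demo = ToyPipeline([
    ToyStage([ToyTask("a", lambda: "A"), ToyTask("b", lambda: "B")]),
    ToyStage([ToyTask("c", lambda: "C")]),
])
print(run_application([demo]))  # [[[('a', 'A'), ('b', 'B')], [('c', 'C')]]]
```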

## Ensemble Toolkit: Application Model

<img src="https://raw.githubusercontent.com/radical-cybertools/radical.entk/master/docs/figures/pst-model.jpg" alt="pst-model" width="500"/>

## Ensemble Toolkit: Installation

EnTK dependencies:

* Python 2.7
* RabbitMQ    (provides infrastructure for communication between internal components)
* MongoDB     (used to store and retrieve operational data during execution of an application)

EnTK needs to be installed only on the local machine, not on the remote computing infrastructure.
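Before running an application, it can help to confirm that both services are configured on the local machine. The sketch below assumes the environment-variable names `RMQ_HOSTNAME` and `RMQ_PORT` (a naming convention, not something these slides require) with RabbitMQ's default AMQP port 5672 as a fallback, and reads the MongoDB URL from `RADICAL_PILOT_DBURL`, the variable RADICAL-Pilot uses to locate its database. The helper names are hypothetical.

```python
import os
import socket


def rabbitmq_settings(env=None):
    """Return (hostname, port) for RabbitMQ.

    RMQ_HOSTNAME / RMQ_PORT are assumed variable names; the defaults are
    the local host and RabbitMQ's standard AMQP port.
    """
    env = os.environ if env is None else env
    return env.get("RMQ_HOSTNAME", "localhost"), int(env.get("RMQ_PORT", "5672"))


def mongodb_url(env=None):
    """Return the MongoDB URL from RADICAL_PILOT_DBURL, or None if unset."""
    env = os.environ if env is None else env
    return env.get("RADICAL_PILOT_DBURL")


def is_reachable(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, unreachable, or timed out
        return False


if __name__ == "__main__":
    host, port = rabbitmq_settings()
    print("RabbitMQ reachable at %s:%d: %s" % (host, port, is_reachable(host, port)))
    print("MongoDB URL configured: %s" % (mongodb_url() is not None))
```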

## Ensemble Toolkit: "Hello World!"
* How to construct an ensemble application.
* Single pipeline, stage, and task.
* Running on local machine.

**Importing components from the EnTK module**
```python
from radical.entk import Pipeline, Stage, Task, AppManager
```

**Creating the Workflow**
```python
# Create a Pipeline object
p = Pipeline()

# Create a Stage object
s = Stage()

# Create a Task object
t = Task()
t.name = 'my-first-task'        # Assign a name to the task
t.executable = ['/bin/echo']    # Assign executable to the task
t.arguments = ['Hello World!']  # Assign arguments for the task executable

# Add Task to the Stage
s.add_tasks(t)

# Add Stage to the Pipeline
p.add_stages(s)
```

**Creating the Application Manager**
```python
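# RabbitMQ host and port used by EnTK's internal components
# (assumed local defaults; adjust to your RabbitMQ installation)
hostname = 'localhost'
port = 5672

# Create Application Manager
appman = AppManager(hostname=hostname, port=port)

# Create a dictionary describing the three mandatory keys:
# resource, walltime (in minutes), and cpus
# resource is 'local.localhost' to execute locally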
res_dict = {

    'resource': 'local.localhost',
    'walltime': 10,
    'cpus': 1
}

# Assign resource request description to the Application Manager
appman.resource_desc = res_dict

# Assign the workflow as a set or list of Pipelines to the Application Manager
# Note: The list order is not guaranteed to be preserved
appman.workflow = set([p])

# Run the Application Manager
appman.run()
```
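Because a misspelled or missing key in the resource dictionary only surfaces once the application starts, a small pre-flight check can catch it earlier. The sketch below is a hypothetical helper, not EnTK's own validation: it only checks the three keys used on this slide, with `walltime` in minutes and `cpus` as a positive count.

```python
# Keys used in the resource dictionary on this slide and their expected types.
REQUIRED_KEYS = {"resource": str, "walltime": int, "cpus": int}


def check_resource_desc(desc):
    """Return a list of problems with a resource description (empty if none).

    Hypothetical pre-flight check; EnTK performs its own validation.
    """
    problems = []
    for key, expected in REQUIRED_KEYS.items():
        if key not in desc:
            problems.append("missing key: %s" % key)
        elif not isinstance(desc[key], expected):
            problems.append("%s should be %s" % (key, expected.__name__))
    # Walltime (minutes) and CPU count must be positive to form a valid request.
    for key in ("walltime", "cpus"):
        if isinstance(desc.get(key), int) and desc[key] <= 0:
            problems.append("%s must be positive" % key)
    return problems


res_dict = {'resource': 'local.localhost', 'walltime': 10, 'cpus': 1}
print(check_resource_desc(res_dict))  # []
```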

## Ensemble Toolkit: Changing Target Machine

EnTK lets us submit tasks to a remote machine from our local machine. This requires passwordless SSH access to the remote machine, and the remote machine must be supported by RADICAL-Pilot and EnTK.

<img src="https://raw.githubusercontent.com/radical-cybertools/radical.entk/master/docs/figures/hosts_and_ports.png" alt="hosts_and_ports" width="500"/>

Once passwordless access has been set up, switching from one target machine to another simply requires that we modify the resource dictionary in our code:

**Localhost**
```python
res_dict = {

    'resource': 'local.localhost',
    'walltime': 10,
    'cpus': 1
}
```

**XSEDE Comet Cluster**
```python
res_dict = {

    'resource': 'xsede.comet',
    'walltime': 10,
    'cpus': 1,
    'schema': 'ssh'
}
```
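Since only the resource dictionary changes between targets, it can be built in one place and the rest of the application left untouched. The helper below is a hypothetical sketch whose keys mirror the two dictionaries on this slide; `schema` is added only when an access method such as `'ssh'` is given for a remote target.

```python
def make_resource_desc(resource, walltime=10, cpus=1, schema=None):
    """Build a resource description; only target-specific keys vary.

    Hypothetical helper mirroring the dictionaries above (walltime in minutes).
    """
    desc = {"resource": resource, "walltime": walltime, "cpus": cpus}
    if schema is not None:
        # Access method for a remote target, e.g. 'ssh' from a local machine.
        desc["schema"] = schema
    return desc


local = make_resource_desc("local.localhost")
comet = make_resource_desc("xsede.comet", schema="ssh")
print(local)
print(comet)
```

The workflow (Pipelines, Stages, Tasks) stays the same; only `appman.resource_desc` is assigned a different dictionary.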

## Ensemble Toolkit: SPECFEM Application

Global Inversion Project:
* Demands a large amount of computational resources.
* Complex workflow with several stages and different requirements.
* Needs to run at extreme scale.

Scaling up requires a more automated approach to managing and executing the workflow, which is what EnTK provides.

## Ensemble Toolkit: SPECFEM Application

<img src="images/specfem_pst.png" alt="specfem_pst" width="500"/>

**Creating the Workflow (Mesher)**
```python
p = Pipeline()

# First stage to perform one meshfem task
meshfem_stage = Stage()

t1 = Task()

t1.pre_exec = [  # Modules to be loaded
               'module load ...',
               'module load ...',
               # Untar the input data
               'tar xf meshfem_data.tar',
               # Preprocessing
               'mkdir DATABASES_MPI',
               'mkdir OUTPUT_FILES',
               'cp DATA/Par_file OUTPUT_FILES/',
               'cp DATA/CMTSOLUTION OUTPUT_FILES/',
               'cp DATA/STATIONS OUTPUT_FILES/',
               ]
t1.executable = ['./bin/xmeshfem3D']
t1.cpu_reqs = {'processes': 384, 'process_type': 'MPI',
               'threads_per_process': 1, 'thread_type': 'OpenMP'}

meshfem_stage.add_tasks(t1)

p.add_stages(meshfem_stage)
```

**Creating the Workflow (Solver)**
```python
# Second stage to perform multiple specfem tasks
specfem_stage = Stage()

events = [ 'C201303130312A', 'C200904210526A', 'C200809291519A',
           'C201404240310A', 'C201309251642A', 'C201004112208A', ]

for event in events:

    t = Task()
    t.pre_exec = [  # Modules to be loaded
                    'module load ...',
                    'module load ...',
                    # Untar the specfem input data
                    'tar xf specfem_data_event_%s.tar'%event,
                    # Link to common DATABASES_MPI containing mesh files
                    'ln -s /scratch/DATABASES_MPI DATABASES_MPI'
                    ]
    t.executable = ['./bin/xspecfem3D']
    t.cpu_reqs = {'processes': 0, 'process_type': 'MPI',
                  'threads_per_process': 0, 'thread_type': 'OpenMP'}
    t.gpu_reqs = {'processes': 384, 'process_type': 'MPI',
                  'threads_per_process': 1, 'thread_type': 'OpenMP'}
    # Stage this event's input tarball into the task's working directory
    t.copy_input_data = ['/scratch/specfem_data_event_%s.tar' % event]
    specfem_stage.add_tasks(t)

p.add_stages(specfem_stage)
```
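In the solver stage, the only task fields that differ between events are derived from the event ID. The sketch below isolates that derivation so the naming scheme can be checked before anything is submitted; `event_task_specs`, its `scratch` parameter, and the dictionary keys are hypothetical, chosen to mirror the strings used in the loop above.

```python
def event_task_specs(events, scratch="/scratch"):
    """Return one dict per event holding the per-event strings of a solver task.

    Hypothetical helper: paths follow the naming scheme of the solver stage.
    """
    specs = []
    for event in events:
        tarball = "specfem_data_event_%s.tar" % event
        specs.append({
            "event": event,
            "untar": "tar xf %s" % tarball,                       # goes into pre_exec
            "copy_input_data": ["%s/%s" % (scratch, tarball)],  # staged input
        })
    return specs


for spec in event_task_specs(['C201303130312A', 'C200904210526A']):
    print(spec["event"], spec["copy_input_data"][0])
```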

**Creating the Application Manager**
```python
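# Size the request from the number of solver tasks
num_events = len(events)

res_dict = {
    'resource': 'ornl.titan_aprun',
    'walltime': 8 * num_events,  # minutes
    'cpus': 384,
    'gpus': 384,
    'project': 'GEO111',
    'schema': 'local'
}

# Use the debug queue for up to 16 events, the batch queue otherwise
if num_events <= 16:
    res_dict['queue'] = 'debug'
else:
    res_dict['queue'] = 'batch'

# RabbitMQ host and port (assumed values; adjust to your RabbitMQ installation)
hostname = 'localhost'
port = 5672

# Create Application Manager
appman = AppManager(hostname=hostname, port=port, resubmit_failed=False)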

# Assign resource request description to the Application Manager
appman.resource_desc = res_dict

# Assign the workflow as a set or list of Pipelines to the Application Manager
appman.workflow = set([p])

# Run the Application Manager
appman.run()
```

## Ensemble Toolkit: SPECFEM Application

<img src="images/specfem_tasks.png" alt="specfem_tasks" width="800"/>

On Titan, forward simulations are best executed with $2^4 = 16$ concurrent tasks.
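When there are more events than the 16 that run well concurrently, one option is to split the event list into consecutive groups of at most 16, for example one Stage per group so the groups run one after another. The slides do not state that the project schedules events this way; the hypothetical `batch_events` helper below only illustrates the arithmetic: $N$ events need $\lceil N/16 \rceil$ groups, so 40 events give groups of 16, 16, and 8.

```python
def batch_events(events, concurrency=16):
    """Split events into consecutive groups of at most `concurrency` items.

    Hypothetical helper; 16 is the concurrency reported for Titan above.
    """
    return [events[i:i + concurrency] for i in range(0, len(events), concurrency)]


events = ['E%02d' % i for i in range(40)]  # placeholder event IDs
print([len(group) for group in batch_events(events)])  # [16, 16, 8]
```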

## Ensemble Toolkit: Going Forward

* In general:
  * Port to Python 3
  * Release of milestone 1.0 version
* More specifically:
  * Scaling tests on Summit
  * Implement the full global inversion pipeline in EnTK