# **Demo of the "External Task" Functionality**


## **Motivation**

### In classical (post hoc) workflows, the data source is usually a storage system; in HPC, this is typically a parallel file system. Routing all data through storage is increasingly impractical because of the I/O bottleneck and the huge amount of data generated in some fields.

![Post hoc workflow](Figures/posthoc.png)


### In situ workflows bypass disk access by analyzing the data as close as possible to when and where it is generated (on the HPC platform itself).

![In situ workflow](Figures/insitu.png)

### The goal: bring together the HPC and Big Data communities by using Dask Distributed for in situ data analytics.


## **Challenges**

### * It means coupling two different programming models: message passing (MPI) and distributed task-based paradigms (Dask).
### * The two models rely on different core concepts (data, communications, time, ...).
### * A deep understanding of scheduling, and of Dask's operation in general, was required.
### * This [thesis](https://www.researchgate.net/publication/371595603_Distributed_Task_based_In_Situ_Data_Analytics_for_High_Performance_Simulations) presents the challenges in depth and proposes bridging models to couple MPI simulations with Dask analytics.


## **How Could This Be Done?**

### One available way is to send raw data to the Dask workers as so-called pure data tasks, without using the `Client.scatter` method.
### **Issue**: tasks on that data can only be submitted once the data is available on the client side, because data arriving this way (as with `scatter`) does not trigger the scheduler's transition process.


### Using `Client.scatter` to send simulation data to Dask for analysis is possible. However, a lot of metadata must be sent to the scheduler to achieve that (e.g., the `Future`s, plus semantic information about each chunk, such as its position in the spatiotemporal decomposition of the array).
### **Issue**: with thousands of processes in an HPC simulation and a client associated with each MPI process, the traffic to the scheduler grows enormous and slows it down. (The [DEISA](https://ieeexplore.ieee.org/abstract/document/9680456) paper discusses this solution.)


![DEISA](Figures/deisa.png)
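To make the scaling argument concrete, here is a back-of-the-envelope cost model in Python. It is a hypothetical sketch: it assumes one client per MPI rank, one chunk scattered per rank and per timestep, and a fixed number of scheduler messages per chunk (the scatter plus its metadata). The parameter values are illustrative, not measurements.

```python
def scheduler_load(n_ranks, n_timesteps, msgs_per_chunk):
    """Hypothetical cost model of the scatter-based approach.

    Assumes one Dask client per MPI rank, each rank scattering one chunk per
    timestep and sending `msgs_per_chunk` messages (the scatter plus chunk
    metadata) to the scheduler. Returns (client connections, total messages).
    """
    connections = n_ranks
    messages = n_ranks * n_timesteps * msgs_per_chunk
    return connections, messages


# Illustrative numbers only (not measurements):
# 1000 ranks, 100 timesteps, 2 messages per chunk.
conns, msgs = scheduler_load(1000, 100, 2)
print(conns, msgs)  # 1000 200000
```

Whatever the exact per-chunk cost, the message count grows linearly with both the number of ranks and the number of timesteps, and every one of those messages goes through the single scheduler.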

## **Native Support for External Tasks in Dask**

### We define an `External Task` as a work unit that runs in an environment external to Dask. It is nevertheless known to the scheduler before its data becomes available, and it can be used in a task graph like any (internal) Dask task.

### To support this new type of task without adding extra overhead to its management, and to be able to submit tasks on external tasks before they become available, we have introduced several changes to Dask Distributed.



### An `External Task` is created in the `deisa` (Dask-Enabled In Situ Analytics) state. It can transition to the `memory` state once its data becomes available in Dask's distributed memory; this transition triggers the transition algorithm, which unblocks all dependent tasks.
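The state logic described above can be illustrated with a small, self-contained Python toy model. It is a hypothetical sketch, not the fork's scheduler code: `ToyScheduler`, `Task`, and the `waiting`/`processing` states of dependent tasks are assumptions loosely modeled on Dask's scheduler states; only the transition from `deisa` to `memory` comes from the description above.

```python
from dataclasses import dataclass, field

# Hypothetical toy model, NOT the fork's scheduler code: it only illustrates
# the deisa -> memory transition and how it unblocks dependent tasks.
ALLOWED = {
    ("deisa", "memory"),        # external data has arrived in Dask's memory
    ("waiting", "processing"),  # all dependencies of a task are in memory
    ("processing", "memory"),   # a dependent task has finished
}


@dataclass
class Task:
    key: str
    state: str
    dependencies: set = field(default_factory=set)
    dependents: set = field(default_factory=set)


class ToyScheduler:
    def __init__(self):
        self.tasks = {}

    def add_external(self, key):
        # The external task is known before any data exists.
        self.tasks[key] = Task(key, "deisa")

    def add_task(self, key, deps):
        task = Task(key, "waiting", set(deps))
        self.tasks[key] = task
        for d in deps:
            self.tasks[d].dependents.add(key)
        self._maybe_unblock(key)

    def _maybe_unblock(self, key):
        # A waiting task becomes runnable once all its dependencies are in memory.
        task = self.tasks[key]
        if task.state == "waiting" and all(
            self.tasks[d].state == "memory" for d in task.dependencies
        ):
            self.transition(key, "processing")

    def transition(self, key, finish):
        task = self.tasks[key]
        if (task.state, finish) not in ALLOWED:
            raise ValueError(f"{key}: {task.state} -> {finish} is not allowed")
        task.state = finish
        if finish == "memory":
            # Data in memory: check every dependent that may now be runnable.
            for dep_key in sorted(task.dependents):
                self._maybe_unblock(dep_key)


s = ToyScheduler()
s.add_external("deisa_key")          # created in the "deisa" state
s.add_task("sum", ["deisa_key"])     # submitted ahead of time
print(s.tasks["sum"].state)          # waiting
s.transition("deisa_key", "memory")  # the simulation's data arrives
print(s.tasks["sum"].state)          # processing
```

A task submitted before its external dependency has data simply stays `waiting`; the single `deisa` to `memory` transition is what unblocks it, so the client does not have to wait for the data before submitting.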

### The following figure shows a typical use case of `External` tasks in in situ workflows:

![Typical use case of external tasks](Figures/deisa2.png)


### The following activity diagram shows how `deisa` tasks operate:

![Activity diagram of deisa tasks](Figures/DA_deisa.png)


## **Requirements**

### The [Dask Distributed fork](https://github.com/GueroudjiAmal/distributed) by [@GueroudjiAmal](https://github.com/GueroudjiAmal) on GitHub


In [1]:
from distributed import Client, Future
from dask import delayed 
import dask.array as da
import numpy as np

# Create the local cluster and `client1`

In [2]:
client1 = Client()

distributed.diskutils - INFO - Found stale lock file and directory '/home/agueroudji/Downloads/dask-worker-space/worker-a7p65_xe', purging
distributed.diskutils - INFO - Found stale lock file and directory '/home/agueroudji/Downloads/dask-worker-space/worker-_ecc9qwu', purging


In [3]:
client1

| Property | Value |
|---|---|
| Connection method | Cluster object |
| Cluster type | `distributed.LocalCluster` |
| Dashboard | http://127.0.0.1:8787/status |
| Status | running |
| Using processes | True |
| Scheduler comm | tcp://127.0.0.1:34227 |
| Started | Just now |
| Workers | 4 |
| Total threads | 12 |
| Total memory | 31.00 GiB |

| Worker comm | Threads | Memory | Dashboard | Nanny | Local directory |
|---|---|---|---|---|---|
| tcp://127.0.0.1:42947 | 3 | 7.75 GiB | http://127.0.0.1:36525/status | tcp://127.0.0.1:36139 | /home/agueroudji/Downloads/dask-worker-space/worker-7ive8o4j |
| tcp://127.0.0.1:44305 | 3 | 7.75 GiB | http://127.0.0.1:43247/status | tcp://127.0.0.1:38343 | /home/agueroudji/Downloads/dask-worker-space/worker-j316s2uv |
| tcp://127.0.0.1:36621 | 3 | 7.75 GiB | http://127.0.0.1:36075/status | tcp://127.0.0.1:39305 | /home/agueroudji/Downloads/dask-worker-space/worker-bov6n6am |
| tcp://127.0.0.1:34359 | 3 | 7.75 GiB | http://127.0.0.1:32789/status | tcp://127.0.0.1:45521 | /home/agueroudji/Downloads/dask-worker-space/worker-hld1cq1e |


# Create a Future with a given name `old`
### `old` is not backed by any data. We will try to create a Dask array from it; the Dask array will use the future's name (its key) to refer to it.



In [4]:
old = Future("old", inform=True)

In [5]:
old


# Use the future in analytics

### We define a sum over that array by calling its `sum()` method (`dask.array.Array.sum`).



In [7]:
old_dask_array = da.from_delayed(delayed(old), shape=(10,10), dtype=float)
old_dask_array

| | Array | Chunk |
|---|---|---|
| Bytes | 800 B | 800 B |
| Shape | (10, 10) | (10, 10) |
| Count | 2 Tasks | 1 Chunks |
| Type | float64 | numpy.ndarray |


In [8]:
old_sum = old_dask_array.sum()
old_sum

| | Array | Chunk |
|---|---|---|
| Bytes | 8 B | 8.0 B |
| Shape | () | () |
| Count | 4 Tasks | 1 Chunks |
| Type | float64 | numpy.ndarray |


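# ERROR!!
### Dask sends the entry tasks to the workers and starts computing the `sum`, but `old` is not backed by any data: the chunk that reaches `sum` is not a (10, 10) array (the traceback shows `None` arguments), so reducing over axes `(0, 1)` fails with an `AxisError`.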



In [9]:
old_sum_res = old_sum.compute()

Function:  execute_task
args:      ((Compose(functools.partial(<function sum at 0x7fc8b30d0a60>, dtype=dtype('float64'), axis=(0, 1), keepdims=False), functools.partial(<function _concatenate2 at 0x7fc893e9fc70>, axes=(0, 1))), (subgraph_callable-4d95e9ee-f3ec-445a-8073-d4142911380c, None, None)))
kwargs:    {}
Exception: 'AxisError(0, 0, None)'



AxisError: axis 0 is out of bounds for array of dimension 0
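A minimal NumPy-only reproduction of this failure, assuming (as the `None` arguments in the traceback suggest) that the chunk reaching the reduction is `None` rather than a (10, 10) array. The call mirrors the `sum(..., dtype=float64, axis=(0, 1), keepdims=False)` shown in the traceback.

```python
import numpy as np

# Assumption: the chunk handed to the reduction is None, not a (10, 10) array.
chunk = None
message = ""
try:
    np.sum(chunk, dtype=np.float64, axis=(0, 1), keepdims=False)
except ValueError as err:  # numpy's AxisError subclasses ValueError
    message = str(err)
print(message)  # axis 0 is out of bounds for array of dimension 0

# With a real (10, 10) chunk, the same reduction works.
print(np.sum(np.ones((10, 10)), dtype=np.float64, axis=(0, 1)))  # 100.0
```

`None` is converted to a 0-dimensional array, which has no axis 0 or 1; this is why the error reports "dimension 0" rather than a missing-data error.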

# Now let's test the `External` task feature, thanks to `deisa`, following almost the same steps.
### * Create a `Future` with a specific name, here `deisa_key`.
### * Activate the `deisa` mode by setting the `deisa` parameter to `True`.
### * Set `inform=True` to tell the scheduler that this client wants to be kept informed about this future.





In [10]:
deisa_future = Future("deisa_key", deisa=True, inform=True)
deisa_future

# Here we are!! Our first external `Future`

### The `deisa` future corresponds to data that we will receive later from an external source, at a time we do not know in advance.
### We know enough about that data to describe it: here, a (10, 10) array of floats.

### Let's create the corresponding Dask array, referring to the future by its key.

In [11]:
External_dask_array = da.from_delayed(delayed(deisa_future), shape=(10,10), dtype=float)
External_dask_array

| | Array | Chunk |
|---|---|---|
| Bytes | 800 B | 800 B |
| Shape | (10, 10) | (10, 10) |
| Count | 2 Tasks | 1 Chunks |
| Type | float64 | numpy.ndarray |


# Ahead-of-time task submission

### The advantage of `deisa` tasks is that we can submit tasks on them ahead of time: those dependent tasks are not scheduled until the external data arrives :)
### The `deisa` tasks themselves are pure data tasks: their data is sent to the workers only once it has been produced by the external source, which can be a running simulation.
### Once the data is in Dask's memory, the scheduler is informed and triggers the transition algorithm.
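The ahead-of-time mechanism can be mimicked with plain `asyncio`: a placeholder future stands in for the `deisa` task, and the analytics coroutine is started before the data exists. This is a hypothetical analogy built on `asyncio.Future`, not how the fork implements it; `analytics` and the generated chunk are illustrative.

```python
import asyncio


async def main():
    loop = asyncio.get_running_loop()
    # Hypothetical stand-in for the `deisa` future: known before its data exists.
    external = loop.create_future()

    async def analytics():
        # Submitted ahead of time; suspends until the external data arrives.
        chunk = await external
        return sum(sum(row) for row in chunk)

    task = asyncio.create_task(analytics())  # "submit" the sum now
    await asyncio.sleep(0.01)                # let it start and block on `external`
    waited = not task.done()                 # True: no data yet, nothing computed

    # The "simulation" later publishes a (10, 10) chunk of floats; setting the
    # result plays the role of the deisa -> memory transition.
    external.set_result([[float(10 * i + j) for j in range(10)] for i in range(10)])
    return waited, await task


waited, result = asyncio.run(main())
print(waited, result)  # True 4950.0
```

The consumer never polls: completing the placeholder is the only event needed to resume it, which is the role the transition algorithm plays for `deisa` tasks in the scheduler.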
### Let's do that!!!
### We will reuse the same task graph; `dask.array.sum` could be replaced by any other operation.

In [12]:
External_sum = External_dask_array.sum()
External_sum

| | Array | Chunk |
|---|---|---|
| Bytes | 8 B | 8.0 B |
| Shape | () | () |
| Count | 4 Tasks | 1 Chunks |
| Type | float64 | numpy.ndarray |


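### Don't worry, nothing has failed: we simply have not submitted anything to the scheduler yet. Calling `compute()` submits the graph, and the `sum` runs once the data for `deisa_key` is in Dask's memory.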

In [15]:
%%time
res = External_sum.compute()
res


49.68244568807924

# Yeah!! It's computed

In [19]:
client1.shutdown()