
Improve the dask dashboard display of nested task execution #4346

@pierreglaser

Description

The Task Stream pane of dask's dashboard (on the status page) seems to map each horizontal row of the pane to a unique worker thread id.

When client.submit is called from within a submitted task (the "nested task" situation), additional worker threads must be created through secede calls, which in turn creates additional horizontal rows.
However,

  • the statuses of the outer and inner tasks are not independent: an outer task can only complete after its inner tasks have completed.
  • as far as I can tell, the only purpose of secede is to make sure that a passive worker thread, waiting on other tasks, does not occupy a slot in the thread pool. Therefore, even though an additional worker thread is created at each secede call, the number of active threads remains unchanged throughout the program's execution.
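
The starvation problem that secede avoids can be reproduced with the standard library alone. The sketch below is an analogy using `concurrent.futures.ThreadPoolExecutor`, not dask itself: with a single-thread pool, an outer task that blocks on an inner task it submitted keeps the only thread busy, so the inner task cannot start. A timeout is used so the sketch terminates instead of deadlocking.

```python
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout


def inner():
    return 1


def outer(pool):
    # Submit a nested task to the same pool, then block waiting for it.
    fut = pool.submit(inner)
    try:
        return fut.result(timeout=0.5)
    except FutureTimeout:
        # The pool's only thread is busy running `outer`, so `inner`
        # is still queued and never got a chance to start.
        return "starved"


with ThreadPoolExecutor(max_workers=1) as pool:
    result = pool.submit(outer, pool).result()

print(result)  # starved
```

In dask, calling secede() inside the outer task before waiting hands its slot back to the worker, which is what lets the inner tasks run.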

In my opinion, neither of these points is made visually clear by the dask dashboard. If n worker threads initially exist in the cluster and m secede calls are made, n+m horizontal rows (suggesting n+m active threads) will be displayed, and their arrangement does not reflect the hierarchical structure of the program (nested submit calls).
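
The n+m count can be illustrated with a toy model. `ToyPool` below is hypothetical and is not distributed's implementation; it assumes that every secede call removes the calling thread from the pool and starts a replacement thread with a fresh id, and that thread ids are never reused. Counting every id ever seen (one Task Stream row each) against the threads currently in the pool shows the gap.

```python
from itertools import count


class ToyPool:
    """Hypothetical pool model: each seceded thread gets a fresh replacement."""

    def __init__(self, n_threads):
        self._next_id = count()
        # Threads currently holding a slot in the pool (actively computing).
        self.active = {next(self._next_id) for _ in range(n_threads)}
        # Every thread id that has ever existed, i.e. one dashboard row each.
        self.seen = set(self.active)

    def secede(self, thread_id):
        """Remove `thread_id` from the pool and start a replacement thread."""
        self.active.remove(thread_id)
        replacement = next(self._next_id)
        self.active.add(replacement)
        self.seen.add(replacement)
        return replacement


n, m = 4, 3
pool = ToyPool(n)
for _ in range(m):
    # Any active thread may secede; pick the smallest id for determinism.
    pool.secede(min(pool.active))

print(len(pool.active), len(pool.seen))  # 4 7
```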

See screenshot: [image: nested_submit_dask_dashboard]

It was obtained with the following reproducer:

```python
import logging
import time

import numpy as np

from distributed import Client, LocalCluster, get_client, secede

# Defined at module level so outer_function can always see them.
NUM_OUTER_TASKS = 10
NUM_INNER_TASKS = 10


def sum_and_sleep(array):
    # Inner task: a short, fixed-duration computation.
    time.sleep(0.5)
    return np.sum(array)


def outer_function(array, i):
    # Outer task: submits NUM_INNER_TASKS inner tasks from within a task.
    print(f"running outer task {i}")
    client = get_client()

    futures = client.map(
        sum_and_sleep, [array[i + j :] for j in range(NUM_INNER_TASKS)]
    )
    # Leave the worker's thread pool while waiting, freeing a slot for the
    # inner tasks; the worker starts a new thread to fill it.
    secede()

    results = [f.result() for f in futures]
    return sum(results)


if __name__ == "__main__":
    my_arrays = [np.ones(100000) for _ in range(NUM_OUTER_TASKS)]

    cluster = LocalCluster(
        n_workers=4, threads_per_worker=1, silence_logs=logging.WARNING
    )
    client = Client(cluster)

    results = client.map(outer_function, my_arrays, range(len(my_arrays)))
    print(client.gather(results))
```

Although only 4 worker threads are ever active at the same time, many more rows are displayed (13 in this case).

Real-life use case

Nested scatter calls typically happen in joblib, in the context of nested Parallel calls.

Next steps proposal

Would you be open to improving this situation? I'm thinking of a config entry that toggles between mapping horizontal rows to thread ids and mapping them to (process id, hostname) pairs, although there are surely better display options. I can make a PR; I'm mostly waiting for some kind of prior consent from the core developers.
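
A minimal sketch of the proposed toggle, assuming task records are plain dicts with `worker` and `thread` keys (loosely modelled on what the task stream collects; the record contents and the `assign_rows` helper are hypothetical, not dashboard code). Rows are assigned in order of first appearance, either per (worker, thread) pair or per worker process.

```python
def assign_rows(records, by="thread"):
    """Hypothetical helper: map each task record to a Task Stream row index.

    by="thread": one row per (worker, thread) pair.
    by="worker": one row per worker process.
    """
    if by not in ("thread", "worker"):
        raise ValueError(f"unknown grouping {by!r}")
    rows = {}
    assignment = []
    for rec in records:
        key = (rec["worker"], rec["thread"]) if by == "thread" else rec["worker"]
        # A new key gets the next free row index; known keys reuse theirs.
        assignment.append(rows.setdefault(key, len(rows)))
    return assignment, len(rows)


# Hypothetical records: each worker's outer task seceded, so its inner
# tasks ran on a replacement thread with a new id.
records = [
    {"key": "outer-0", "worker": "tcp://127.0.0.1:4001", "thread": 1},
    {"key": "inner-0", "worker": "tcp://127.0.0.1:4001", "thread": 5},
    {"key": "outer-1", "worker": "tcp://127.0.0.1:4002", "thread": 2},
    {"key": "inner-1", "worker": "tcp://127.0.0.1:4002", "thread": 6},
    {"key": "inner-2", "worker": "tcp://127.0.0.1:4001", "thread": 5},
]
print(assign_rows(records, by="thread"))  # ([0, 1, 2, 3, 1], 4)
print(assign_rows(records, by="worker"))  # ([0, 0, 1, 1, 0], 2)
```

In the per-worker mode, a waiting outer task and its inner tasks share a row, so a real implementation would still need to decide how to draw overlapping bars (for example nested or stacked); the sketch only covers row assignment.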

cc'ing @ogrisel.
