
Asynchronously executing custom graphs #3400

Open
MichaelSchreier opened this issue Jan 22, 2020 · 4 comments

@MichaelSchreier

I'm trying to run multiple custom graphs via client.get(...) simultaneously but I'm having issues getting this to work.

The native Futures interface does not seem to support custom graphs, only executing individual functions.
I've also tried various permutations of the examples in the Asynchronous Operation section of the documentation (in both the asyncio and tornado flavors), but to no avail.

The following code, for instance, either raises an exception or cancels the tasks, depending on where you apply the asynchronous=True argument:

import asyncio
import time

from dask.distributed import LocalCluster, Client


def mywait(seconds: int):
    print(f"I'm waiting for {seconds} seconds")
    time.sleep(seconds)
    return seconds


def passthrough(arg):
    return arg


def run_asyncio():
    cluster = LocalCluster(dashboard_address=22222, processes=True)
    # client = Client(cluster)

    graph_1 = {"wait 10": (mywait, 10), "end": (passthrough, "wait 10")}
    graph_2 = {"wait 20": (mywait, 20), "end": (passthrough, "wait 20")}

    async def _run_multiple_graphs(graphs):
        client = await Client(cluster, asynchronous=True)
        result = await asyncio.wait([client.get(graph, "end") for graph in graphs])
        return result

    output = asyncio.get_event_loop()\
        .run_until_complete(_run_multiple_graphs([graph_1, graph_2]))
    print(output)


if __name__ == "__main__":
    run_asyncio()

Since I start computation of all graphs at the same time, a viable workaround is to merge graph_1 and graph_2 into a single graph definition, but I'm wondering whether that is the only solution.
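For reference, that merge can be automated instead of done by hand. The `merge_graphs` helper below is a hypothetical sketch (not part of dask): it prefixes every key with a graph index so keys from different graphs can never collide, and rewrites string references inside flat task tuples accordingly (nested task structures are not handled).

```python
def merge_graphs(graphs, final_keys):
    """Hypothetical helper: merge several dask graphs into one,
    prefixing each key with a graph index to avoid collisions."""
    merged, targets = {}, []
    for i, (graph, final_key) in enumerate(zip(graphs, final_keys)):
        rename = {key: f"g{i}-{key}" for key in graph}
        for key, task in graph.items():
            if isinstance(task, tuple):
                # rewrite string arguments that reference keys of the same graph
                task = tuple(
                    rename.get(arg, arg) if isinstance(arg, str) else arg
                    for arg in task
                )
            merged[rename[key]] = task
        targets.append(rename[final_key])
    return merged, targets


def mywait(seconds):
    return seconds


def passthrough(arg):
    return arg


graph_1 = {"wait": (mywait, 10), "end": (passthrough, "wait")}
graph_2 = {"wait": (mywait, 20), "end": (passthrough, "wait")}

merged, targets = merge_graphs([graph_1, graph_2], ["end", "end"])
print(targets)  # ['g0-end', 'g1-end']
# results = client.get(merged, targets)  # one call computes both graphs
```

A single `client.get(merged, targets)` then starts both branches at once, and the scheduler runs them concurrently.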

@mrocklin
Member

mrocklin commented Jan 22, 2020 via email

@MichaelSchreier
Author

I did try various placements of the asynchronous=True keyword (cluster, client, function call, combinations) but never got it working correctly.

What did work was simply performing the calls to client.get(...) from independent threads (via ThreadPoolExecutor):

import time
from dask.distributed import LocalCluster, Client
from concurrent.futures import ThreadPoolExecutor, as_completed


def mywait(seconds: int):
    print(f"I'm waiting for {seconds} seconds")
    time.sleep(seconds)
    return seconds


def passthrough(arg):
    return arg


def run_threaded():
    cluster = LocalCluster(dashboard_address=22222, processes=True)
    client = Client(cluster)

    graph_1 = {"wait 10": (mywait, 10), "end1": (passthrough, "wait 10")}
    graph_2 = {"wait 20": (mywait, 20), "end2": (passthrough, "wait 20")}

    with ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(client.get, graph_1, 'end1'),
            executor.submit(client.get, graph_2, 'end2')
        ]

        for future in as_completed(futures):
            try:
                print(future.result())
            except Exception as exc:
                # surface failures instead of silently swallowing them
                print(f"graph raised: {exc}")

    client.close()
    cluster.close()


if __name__ == "__main__":
    run_threaded()

One thing to watch out for, however, is that the graphs must use distinct keys; otherwise the graphs are merged on the identical keys. As one can imagine, this leads to all sorts of weird behavior.
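The collision can be illustrated with plain dictionaries, which is roughly what happens when the scheduler sees two graphs sharing a key (toy tasks, for illustration only):

```python
def mywait(seconds):
    return seconds


def passthrough(arg):
    return arg


# both graphs use the same keys "wait" and "end"
graph_1 = {"wait": (mywait, 10), "end": (passthrough, "wait")}
graph_2 = {"wait": (mywait, 20), "end": (passthrough, "wait")}

# merging on identical keys silently drops graph_1's tasks
merged = {**graph_1, **graph_2}
print(len(merged))     # 2, not 4
print(merged["wait"])  # graph_2's task won
```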

I'd still be interested to hear how to make this work using async calls.

@jrbourbeau
Member

There's also a sync= keyword for Client.get that, when set to False, will return a Future rather than the concrete result. Perhaps that may be of use to you here @MichaelSchreier

@jakirkham
Member

Admittedly this is an old thread, but I'm going to add this anyway in case others come by.

There is Client.get_executor, which returns a concurrent.futures.Executor instance that uses the distributed Client to submit work on the backend. This may fit the second use case.
