From 440388a7e3e7d4eccd59f17f69b4e614e8c6d56c Mon Sep 17 00:00:00 2001 From: Greg Hayes Date: Sun, 3 Apr 2022 15:02:28 -0500 Subject: [PATCH 1/6] cleaned file, added comment --- distributed/worker_client.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/distributed/worker_client.py b/distributed/worker_client.py index 5a775b38191..cf5ace6f720 100644 --- a/distributed/worker_client.py +++ b/distributed/worker_client.py @@ -1,5 +1,6 @@ import warnings from contextlib import contextmanager +import threading import dask @@ -50,7 +51,12 @@ def worker_client(timeout=None, separate_thread=True): worker = get_worker() client = get_client(timeout=timeout) - if separate_thread: + + # When passing the client an async function, it runs on the event loop + # in the main thread instead of a background thread. This causes secede() to fail, + is_main_thread = threading.current_thread() is threading.main_thread() + if not is_main_thread and separate_thread: + duration = time() - thread_state.start_time secede() # have this thread secede from the thread pool worker.loop.add_callback( @@ -63,7 +69,7 @@ def worker_client(timeout=None, separate_thread=True): yield client - if separate_thread: + if not is_main_thread and separate_thread: rejoin() From a9263a24ba14c3e216a8fa2d640684e798a5ebdc Mon Sep 17 00:00:00 2001 From: Greg Hayes Date: Sun, 3 Apr 2022 15:50:51 -0500 Subject: [PATCH 2/6] Add tests --- distributed/tests/test_worker_client.py | 35 +++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/distributed/tests/test_worker_client.py b/distributed/tests/test_worker_client.py index 468bd90d463..24ea66cb882 100644 --- a/distributed/tests/test_worker_client.py +++ b/distributed/tests/test_worker_client.py @@ -339,3 +339,38 @@ def long_running(): assert len(res) == 2 assert res[a.address] > 25 assert res[b.address] > 25 + + +def test_sync_func_on_main_thread(client): + """A synchronous client running a worker that calls an async task which submits its own task to + the scheduler should not fail""" + # https://github.com/dask/distributed/issues/5513 + + async def inc(n): + return n+1 + + async def f(): + with worker_client(separate_thread=True) as c: + m = c.submit(inc, 1) + return m + + res = client.submit(f) + assert res.exception() is None + + +@gen_cluster(client=True) +async def test_async_func_on_main_thread(c, s, a, b): + """An asynchronous client running a worker that calls an async task which submits its own task to + the scheduler should not fail""" + # https://github.com/dask/distributed/issues/5513 + + async def inc(n): + return n+1 + + async def f(): + with worker_client(separate_thread=True) as client: + m = await client.submit(inc, 1) + return m + + res = await c.submit(f) + assert res == 2 \ No newline at end of file From 2d94afd9e6774f78f7c30d1e2e86d0f88d539b46 Mon Sep 17 00:00:00 2001 From: Greg Hayes Date: Sun, 3 Apr 2022 15:52:20 -0500 Subject: [PATCH 3/6] linting --- distributed/tests/test_worker_client.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/distributed/tests/test_worker_client.py b/distributed/tests/test_worker_client.py index 24ea66cb882..7b4f9d8249b 100644 --- a/distributed/tests/test_worker_client.py +++ b/distributed/tests/test_worker_client.py @@ -347,7 +347,7 @@ def test_sync_func_on_main_thread(client): # https://github.com/dask/distributed/issues/5513 async def inc(n): - return n+1 + return n + 1 async def f(): with worker_client(separate_thread=True) as c: @@ -365,7 +365,7 @@ async def test_async_func_on_main_thread(c, s, a, b): # https://github.com/dask/distributed/issues/5513 async def inc(n): - return n+1 + return n + 1 async def f(): with worker_client(separate_thread=True) as client: @@ -373,4 +373,4 @@ async def f(): return m res = await c.submit(f) - assert res == 2 \ No newline at end of file + assert res == 2 From 523659e528ddfcd909cd925a3acb318d7074c2f5 Mon Sep 17 00:00:00 2001 From: Greg Hayes Date: Sun, 3 Apr 2022 20:30:59 -0500 Subject: [PATCH 4/6] linting --- distributed/worker_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/worker_client.py b/distributed/worker_client.py index cf5ace6f720..d76d03b796b 100644 --- a/distributed/worker_client.py +++ b/distributed/worker_client.py @@ -1,6 +1,6 @@ +import threading import warnings from contextlib import contextmanager -import threading import dask From 7fdcaca9651041dc985a1e0b785b363794ea2b8e Mon Sep 17 00:00:00 2001 From: Greg Hayes Date: Sun, 3 Apr 2022 20:53:58 -0500 Subject: [PATCH 5/6] Added parameterized tests to worker_client --- distributed/tests/test_worker_client.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/distributed/tests/test_worker_client.py b/distributed/tests/test_worker_client.py index 7b4f9d8249b..f71a95b4f63 100644 --- a/distributed/tests/test_worker_client.py +++ b/distributed/tests/test_worker_client.py @@ -180,9 +180,10 @@ def f(i): @gen_cluster(client=True) -async def test_client_executor(c, s, a, b): +@pytest.mark.parametrize('separate_thread', [True, False]) +async def test_client_executor(c, s, a, b, separate_thread): def mysum(): - with worker_client() as c: + with worker_client(separate_thread=separate_thread) as c: with c.get_executor() as e: return sum(e.map(double, range(30))) @@ -341,7 +342,8 @@ def long_running(): assert res[b.address] > 25 -def test_sync_func_on_main_thread(client): +@pytest.mark.parametrize('separate_thread', [True, False]) +def test_sync_func_on_main_thread(client, separate_thread): """A synchronous client running a worker that calls an async task which submits its own task to the scheduler should not fail""" # https://github.com/dask/distributed/issues/5513 @@ -350,7 +352,7 @@ async def inc(n): return n + 1 async def f(): - with worker_client(separate_thread=True) as c: + with worker_client(separate_thread=separate_thread) as c: m = c.submit(inc, 1) return m @@ -359,7 +361,8 @@ async def f(): @gen_cluster(client=True) -async def test_async_func_on_main_thread(c, s, a, b): +@pytest.mark.parametrize('separate_thread', [True, False]) +async def test_async_func_on_main_thread(c, s, a, b, separate_thread): """An asynchronous client running a worker that calls an async task which submits its own task to the scheduler should not fail""" # https://github.com/dask/distributed/issues/5513 @@ -368,7 +371,7 @@ async def inc(n): return n + 1 async def f(): - with worker_client(separate_thread=True) as client: + with worker_client(separate_thread=separate_thread) as client: m = await client.submit(inc, 1) return m From 49fe650bc0764bf3a780331db310640e7d52586c Mon Sep 17 00:00:00 2001 From: Greg Hayes Date: Sun, 3 Apr 2022 21:10:07 -0500 Subject: [PATCH 6/6] Linting --- distributed/tests/test_worker_client.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/distributed/tests/test_worker_client.py b/distributed/tests/test_worker_client.py index f71a95b4f63..4855a82ca64 100644 --- a/distributed/tests/test_worker_client.py +++ b/distributed/tests/test_worker_client.py @@ -180,7 +180,7 @@ def f(i): @gen_cluster(client=True) -@pytest.mark.parametrize('separate_thread', [True, False]) +@pytest.mark.parametrize("separate_thread", [True, False]) async def test_client_executor(c, s, a, b, separate_thread): def mysum(): with worker_client(separate_thread=separate_thread) as c: @@ -342,7 +342,7 @@ def long_running(): assert res[b.address] > 25 -@pytest.mark.parametrize('separate_thread', [True, False]) +@pytest.mark.parametrize("separate_thread", [True, False]) def test_sync_func_on_main_thread(client, separate_thread): """A synchronous client running a worker that calls an async task which submits its own task to the scheduler should not fail""" @@ -361,7 +361,7 @@ async def f(): @gen_cluster(client=True) -@pytest.mark.parametrize('separate_thread', [True, False]) +@pytest.mark.parametrize("separate_thread", [True, False]) async def test_async_func_on_main_thread(c, s, a, b, separate_thread): """An asynchronous client running a worker that calls an async task which submits its own task to the scheduler should not fail"""