This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Investigate compatibility with Dask changes to run tasks outside their event loop #55

Closed
zanieb opened this issue Dec 1, 2022 · 2 comments
@zanieb
Contributor

zanieb commented Dec 1, 2022

Dask is updating its worker to run submitted async functions outside of its core event loop. This makes a ton of sense, but it is breaking Prefect, so we are blocking the merge of the change. We should determine what behavior is changing, then either coordinate with the Dask maintainers to update their PR or update Prefect to support the change.

See the change at dask/distributed#7339
See failures at https://github.com/PrefectHQ/prefect-dask/actions/runs/3593749911/jobs/6051108850 in #50
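
For anyone unfamiliar with why this matters, here is a minimal sketch of the general behavior, not of Dask's actual implementation. It assumes the new behavior amounts to running the coroutine in a worker thread with its own event loop; `runs_on` and `main` are hypothetical names used only for illustration. Anything that is bound to the original loop (locks, clients, futures) would no longer be on the loop the coroutine sees.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


async def runs_on(loop):
    # True if this coroutine is executing on the given event loop.
    return asyncio.get_running_loop() is loop


async def main():
    caller_loop = asyncio.get_running_loop()

    # Case 1: awaited directly, so the coroutine shares the caller's loop.
    on_caller_loop = await runs_on(caller_loop)

    # Case 2 (assumed shape of the new behavior): the coroutine is run in a
    # worker thread, where asyncio.run() starts a separate event loop.
    with ThreadPoolExecutor(max_workers=1) as pool:
        on_worker_loop = await caller_loop.run_in_executor(
            pool, lambda: asyncio.run(runs_on(caller_loop))
        )

    return on_caller_loop, on_worker_loop


on_caller_loop, on_worker_loop = asyncio.run(main())
```

In the first case the coroutine reports that it is on the caller's loop; in the second it does not, which is the kind of difference code that assumes a shared loop would trip over.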

@ahuang11 ahuang11 self-assigned this Dec 1, 2022
@ahuang11
Contributor

ahuang11 commented Dec 1, 2022

I am encountering this error when I run pytest tests -k "test_failing_flow_run[dask_task_runner_with_thread_pool]":

../../../mambaforge/envs/prefect-dask/lib/python3.9/site-packages/alembic/util/langhelpers.py:68: KeyError
---------------------------------------------------------------------------------------------------------------------------- Captured log call ----------------------------------------------------------------------------------------------------------------------------
INFO     prefect.engine:engine.py:227 Created flow run 'rebel-lyrebird' for flow 'test-flow'
INFO     prefect.task_runner.dask:task_runners.py:290 Creating a new Dask cluster with `distributed.deploy.local.LocalCluster`
INFO     prefect.task_runner.dask:task_runners.py:307 The Dask dashboard is available at http://192.168.50.68:8787/status
INFO     prefect.flow_runs:engine.py:1080 Created task run 'task_b-8e3be95f-0' for task 'task_b'
INFO     prefect.flow_runs:engine.py:1116 Submitted task run 'task_b-8e3be95f-0' for execution.
INFO     prefect.flow_runs:engine.py:1080 Created task run 'task_a-4179b103-0' for task 'task_a'
INFO     prefect.flow_runs:engine.py:1116 Submitted task run 'task_a-4179b103-0' for execution.
INFO     prefect.flow_runs:engine.py:1080 Created task run 'task_c-167cbbb3-0' for task 'task_c'
INFO     prefect.flow_runs:engine.py:1116 Submitted task run 'task_c-167cbbb3-0' for execution.
INFO     prefect.flow_runs:engine.py:1080 Created task run 'task_c-167cbbb3-1' for task 'task_c'
INFO     prefect.flow_runs:engine.py:1116 Submitted task run 'task_c-167cbbb3-1' for execution.
ERROR    prefect.task_runs:engine.py:1357 Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/andrew/Applications/python/prefect/src/prefect/engine.py", line 1340, in orchestrate_task_run
    result = await run_sync(task.fn, *args, **kwargs)
  File "/Users/andrew/Applications/python/prefect/src/prefect/utilities/asyncutils.py", line 69, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/Users/andrew/mambaforge/envs/prefect-dask/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/Users/andrew/mambaforge/envs/prefect-dask/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/Users/andrew/mambaforge/envs/prefect-dask/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/Users/andrew/Applications/python/prefect/src/prefect/testing/standard_test_suites/task_runners.py", line 80, in task_b
    raise ValueError("This task fails and passes data downstream!")
ValueError: This task fails and passes data downstream!
ERROR    prefect.task_runs:engine.py:1398 Finished in state Failed('Task run encountered an exception: ValueError: This task fails and passes data downstream!\n')
INFO     prefect.task_runs:engine.py:1426 Crash detected! Execution was interrupted by an unexpected exception: KeyError: 'script'

ERROR    prefect.flow_runs:engine.py:384 Finished in state Failed('2/4 states failed.')
========================================================================================================================= short test summary info =========================================================================================================================
FAILED tests/test_task_runners.py::TestDaskTaskRunner::test_failing_flow_run[dask_task_runner_with_thread_pool] - KeyError: 'script'

alembic has something to do with the database, so I removed ~/.prefect/orion.db, but I still got the same error. Then I realized conftest uses prefect_test_harness, so I commented it out:

# @pytest.fixture(scope="session", autouse=True)
# def prefect_db():
#     """
#     Sets up test harness for temporary DB during test runs.
#     """
#     with prefect_test_harness():
#         yield

With the fixture commented out, that test passes:

========================================================================================================================= test session starts ===========================================================================================================================
platform darwin -- Python 3.9.13, pytest-7.1.3, pluggy-1.0.0 -- /Users/andrew/mambaforge/envs/prefect-dask/bin/python3.9
cachedir: .pytest_cache
rootdir: /Users/andrew/Applications/python/prefect-dask, configfile: setup.cfg
plugins: anyio-3.6.1, flaky-3.7.0, asyncio-0.19.0, timeout-2.1.0, respx-0.20.0
asyncio: mode=auto
collected 117 items / 116 deselected / 1 selected

tests/test_task_runners.py::TestDaskTaskRunner::test_failing_flow_run[dask_task_runner_with_thread_pool] <- ../prefect/src/prefect/testing/standard_test_suites/task_runners.py 12:10:14.804 | INFO    | prefect.engine - Created flow run 'sage-waxbill' for flow 'test-flow'
12:10:14.807 | INFO    | prefect.task_runner.dask - Creating a new Dask cluster with `distributed.deploy.local.LocalCluster`
12:10:15.205 | INFO    | prefect.task_runner.dask - The Dask dashboard is available at http://192.168.50.68:8787/status
12:10:16.323 | INFO    | Flow run 'sage-waxbill' - Created task run 'task_b-8e3be95f-0' for task 'task_b'
12:10:16.693 | INFO    | Flow run 'sage-waxbill' - Submitted task run 'task_b-8e3be95f-0' for execution.
12:10:16.697 | INFO    | Flow run 'sage-waxbill' - Created task run 'task_a-4179b103-0' for task 'task_a'
12:10:16.699 | INFO    | Flow run 'sage-waxbill' - Submitted task run 'task_a-4179b103-0' for execution.
12:10:16.867 | INFO    | Flow run 'sage-waxbill' - Created task run 'task_c-167cbbb3-0' for task 'task_c'
12:10:16.871 | INFO    | Flow run 'sage-waxbill' - Submitted task run 'task_c-167cbbb3-0' for execution.
12:10:17.002 | INFO    | Flow run 'sage-waxbill' - Created task run 'task_c-167cbbb3-1' for task 'task_c'
12:10:17.007 | INFO    | Flow run 'sage-waxbill' - Submitted task run 'task_c-167cbbb3-1' for execution.
12:10:17.099 | ERROR   | Task run 'task_a-4179b103-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/andrew/Applications/python/prefect/src/prefect/engine.py", line 1340, in orchestrate_task_run
    result = await run_sync(task.fn, *args, **kwargs)
  File "/Users/andrew/Applications/python/prefect/src/prefect/utilities/asyncutils.py", line 69, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/Users/andrew/mambaforge/envs/prefect-dask/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/Users/andrew/mambaforge/envs/prefect-dask/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/Users/andrew/mambaforge/envs/prefect-dask/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/Users/andrew/Applications/python/prefect/src/prefect/testing/standard_test_suites/task_runners.py", line 76, in task_a
    raise RuntimeError("This task fails!")
RuntimeError: This task fails!
12:10:17.128 | ERROR   | Task run 'task_b-8e3be95f-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/andrew/Applications/python/prefect/src/prefect/engine.py", line 1340, in orchestrate_task_run
    result = await run_sync(task.fn, *args, **kwargs)
  File "/Users/andrew/Applications/python/prefect/src/prefect/utilities/asyncutils.py", line 69, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/Users/andrew/mambaforge/envs/prefect-dask/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/Users/andrew/mambaforge/envs/prefect-dask/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/Users/andrew/mambaforge/envs/prefect-dask/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/Users/andrew/Applications/python/prefect/src/prefect/testing/standard_test_suites/task_runners.py", line 80, in task_b
    raise ValueError("This task fails and passes data downstream!")
ValueError: This task fails and passes data downstream!
12:10:17.232 | ERROR   | Task run 'task_a-4179b103-0' - Finished in state Failed('Task run encountered an exception: RuntimeError: This task fails!\n')
12:10:17.256 | ERROR   | Task run 'task_b-8e3be95f-0' - Finished in state Failed('Task run encountered an exception: ValueError: This task fails and passes data downstream!\n')
12:10:18.768 | ERROR   | Flow run 'sage-waxbill' - Finished in state Failed('2/4 states failed.')
PASSED

==================================================================================================================== 1 passed, 116 deselected in 4.76s ==================================================================================================================

@desertaxle
Member

Fixed via PrefectHQ/prefect#7789
