Skip to content

Commit

Permalink
Prevent deadlocks with default ThreadPoolTaskRunner settings (#14201)
Browse files Browse the repository at this point in the history
  • Loading branch information
desertaxle committed Jun 20, 2024
1 parent 940197c commit ed41da9
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 1 deletion.
3 changes: 2 additions & 1 deletion src/prefect/task_runners.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import abc
import asyncio
import sys
import uuid
from concurrent.futures import ThreadPoolExecutor
from contextvars import copy_context
Expand Down Expand Up @@ -205,7 +206,7 @@ class ThreadPoolTaskRunner(TaskRunner[PrefectConcurrentFuture]):
def __init__(self, max_workers: Optional[int] = None):
super().__init__()
self._executor: Optional[ThreadPoolExecutor] = None
self._max_workers = max_workers
self._max_workers = sys.maxsize if max_workers is None else max_workers

def duplicate(self) -> "ThreadPoolTaskRunner":
return type(self)(max_workers=self._max_workers)
Expand Down
26 changes: 26 additions & 0 deletions tests/test_task_runners.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import time
import uuid
from concurrent.futures import Future
from typing import Any, Iterable, Optional
Expand All @@ -9,6 +10,7 @@
from prefect._internal.concurrency.api import create_call, from_async
from prefect.context import TagsContext, tags
from prefect.filesystems import LocalFileSystem
from prefect.flows import flow
from prefect.futures import PrefectFuture, PrefectWrappedFuture
from prefect.results import _default_storages
from prefect.settings import (
Expand Down Expand Up @@ -200,6 +202,30 @@ def test_map_with_future_resolved_to_list(self):
results = [future.result() for future in futures]
assert results == [(1, 1), (2, 2), (3, 3)]

def test_handles_recursively_submitted_tasks(self):
"""
Regression test for https://github.com/PrefectHQ/prefect/issues/14194.
This test ensures that the ThreadPoolTaskRunner doesn't place an upper limit on the
number of submitted tasks active at once. The highest default max workers on a
ThreadPoolExecutor is 32, so this test submits 33 tasks recursively, which will
deadlock without the ThreadPoolTaskRunner setting the max_workers to sys.maxsize.
"""

@task
def recursive_task(n):
if n == 0:
return n
time.sleep(0.1)
future = recursive_task.submit(n - 1)
return future.result()

@flow
def test_flow():
return recursive_task.submit(33)

assert test_flow().result() == 0


class TestPrefectTaskRunner:
@pytest.fixture(autouse=True)
Expand Down

0 comments on commit ed41da9

Please sign in to comment.