Skip to content

Commit

Permalink
Log warning when a Concurrent, Dask, or Ray versions of `PrefectFutur…
Browse files Browse the repository at this point in the history
…e` are garbage collection before resolution (#14148)
  • Loading branch information
desertaxle authored Jun 20, 2024
1 parent 026b787 commit 27bdbb0
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 1 deletion.
15 changes: 15 additions & 0 deletions src/integrations/prefect-dask/prefect_dask/task_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,16 @@ def count_to(highest_number):

from prefect.client.schemas.objects import State, TaskRunInput
from prefect.futures import PrefectFuture, PrefectWrappedFuture
from prefect.logging.loggers import get_logger, get_run_logger
from prefect.task_runners import TaskRunner
from prefect.tasks import Task
from prefect.utilities.asyncutils import run_coro_as_sync
from prefect.utilities.collections import visit_collection
from prefect.utilities.importtools import from_qualified_name, to_qualified_name
from prefect_dask.client import PrefectDaskClient

logger = get_logger(__name__)


class PrefectDaskFuture(PrefectWrappedFuture[distributed.Future]):
"""
Expand Down Expand Up @@ -129,6 +132,18 @@ def result(
_result = run_coro_as_sync(_result)
return _result

def __del__(self):
if self._final_state or self._wrapped_future.done():
return
try:
local_logger = get_run_logger()
except Exception:
local_logger = logger
local_logger.warning(
"A future was garbage collected before it resolved."
" Please call `.wait()` or `.result()` on futures to ensure they resolve.",
)


class DaskTaskRunner(TaskRunner):
"""
Expand Down
33 changes: 33 additions & 0 deletions src/integrations/prefect-dask/tests/test_task_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,39 @@ async def adapt(self, **kwargs):
with task_runner:
assert task_runner._cluster._adapt_called

def test_warns_if_future_garbage_collection_before_resolving(
self, caplog, task_runner
):
@task
def test_task():
return 42

@flow(task_runner=task_runner)
def test_flow():
for _ in range(10):
test_task.submit()

test_flow()

assert "A future was garbage collected before it resolved" in caplog.text

def test_does_not_warn_if_future_resolved_when_garbage_collected(
self, task_runner, caplog
):
@task
def test_task():
return 42

@flow(task_runner=task_runner)
def test_flow():
futures = [test_task.submit() for _ in range(10)]
for future in futures:
future.wait()

test_flow()

assert "A future was garbage collected before it resolved" not in caplog.text

class TestInputArguments:
async def test_dataclasses_can_be_passed_to_task_runners(self, task_runner):
"""
Expand Down
21 changes: 21 additions & 0 deletions src/integrations/prefect-ray/prefect_ray/task_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def count_to(highest_number):
from prefect.client.schemas.objects import TaskRunInput
from prefect.context import serialize_context
from prefect.futures import PrefectFuture, PrefectWrappedFuture
from prefect.logging.loggers import get_logger, get_run_logger
from prefect.states import State, exception_to_crashed_state
from prefect.task_engine import run_task_async, run_task_sync
from prefect.task_runners import TaskRunner
Expand All @@ -90,6 +91,8 @@ def count_to(highest_number):
from prefect.utilities.collections import visit_collection
from prefect_ray.context import RemoteOptionsContext

logger = get_logger(__name__)


class PrefectRayFuture(PrefectWrappedFuture[ray.ObjectRef]):
def wait(self, timeout: Optional[float] = None) -> None:
Expand Down Expand Up @@ -129,6 +132,24 @@ def result(
_result = run_coro_as_sync(_result)
return _result

def __del__(self):
if self._final_state:
return
try:
ray.get(self.wrapped_future, timeout=0)
return
except GetTimeoutError:
pass

try:
local_logger = get_run_logger()
except Exception:
local_logger = logger
local_logger.warning(
"A future was garbage collected before it resolved."
" Please call `.wait()` or `.result()` on futures to ensure they resolve.",
)


class RayTaskRunner(TaskRunner[PrefectRayFuture]):
"""
Expand Down
33 changes: 33 additions & 0 deletions src/integrations/prefect-ray/tests/test_task_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,3 +456,36 @@ def flow_with_dependent_tasks():
e.submit(wait_for=[b_future])

flow_with_dependent_tasks()

def test_warns_if_future_garbage_collection_before_resolving(
self, caplog, task_runner
):
@task
def test_task():
return 42

@flow(task_runner=task_runner)
def test_flow():
for _ in range(10):
test_task.submit()

test_flow()

assert "A future was garbage collected before it resolved" in caplog.text

def test_does_not_warn_if_future_resolved_when_garbage_collected(
self, task_runner, caplog
):
@task
def test_task():
return 42

@flow(task_runner=task_runner)
def test_flow():
futures = [test_task.submit() for _ in range(10)]
for future in futures:
future.wait()

test_flow()

assert "A future was garbage collected before it resolved" not in caplog.text
14 changes: 13 additions & 1 deletion src/prefect/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from prefect.client.orchestration import get_client
from prefect.client.schemas.objects import TaskRun
from prefect.exceptions import ObjectNotFound
from prefect.logging.loggers import get_logger
from prefect.logging.loggers import get_logger, get_run_logger
from prefect.states import Pending, State
from prefect.task_runs import TaskRunWaiter
from prefect.utilities.annotations import quote
Expand Down Expand Up @@ -143,6 +143,18 @@ def result(
_result = run_coro_as_sync(_result)
return _result

def __del__(self):
if self._final_state or self._wrapped_future.done():
return
try:
local_logger = get_run_logger()
except Exception:
local_logger = logger
local_logger.warning(
"A future was garbage collected before it resolved."
" Please call `.wait()` or `.result()` on futures to ensure they resolve.",
)


class PrefectDistributedFuture(PrefectFuture):
"""
Expand Down
12 changes: 12 additions & 0 deletions tests/test_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,18 @@ def test_result_with_final_state_and_raise_on_failure(self):
with pytest.raises(ValueError, match="oops"):
future.result(raise_on_failure=True)

def test_warns_if_not_resolved_when_garbage_collected(self, caplog):
PrefectConcurrentFuture(uuid.uuid4(), Future())

assert "A future was garbage collected before it resolved" in caplog.text

def test_does_not_warn_if_resolved_when_garbage_collected(self, caplog):
wrapped_future = Future()
wrapped_future.set_result(Completed())
PrefectConcurrentFuture(uuid.uuid4(), wrapped_future)

assert "A future was garbage collected before it resolved" not in caplog.text


class TestResolveFuturesToStates:
async def test_resolve_futures_transforms_future(self):
Expand Down

0 comments on commit 27bdbb0

Please sign in to comment.