Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log warning when a Concurrent, Dask, or Ray versions of PrefectFuture are garbage collection before resolution #14148

Merged
merged 2 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/integrations/prefect-dask/prefect_dask/task_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,16 @@ def count_to(highest_number):

from prefect.client.schemas.objects import State, TaskRunInput
from prefect.futures import PrefectFuture, PrefectWrappedFuture
from prefect.logging.loggers import get_logger, get_run_logger
from prefect.task_runners import TaskRunner
from prefect.tasks import Task
from prefect.utilities.asyncutils import run_coro_as_sync
from prefect.utilities.collections import visit_collection
from prefect.utilities.importtools import from_qualified_name, to_qualified_name
from prefect_dask.client import PrefectDaskClient

logger = get_logger(__name__)


class PrefectDaskFuture(PrefectWrappedFuture[distributed.Future]):
"""
Expand Down Expand Up @@ -129,6 +132,18 @@ def result(
_result = run_coro_as_sync(_result)
return _result

def __del__(self):
if self._final_state or self._wrapped_future.done():
return
try:
local_logger = get_run_logger()
except Exception:
local_logger = logger
local_logger.warning(
"A future was garbage collected before it resolved."
" Please call `.wait()` or `.result()` on futures to ensure they resolve.",
)
Comment on lines +135 to +145
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason not to move this to the base PrefectWrappedFuture (or even PrefectFuture)? It seems like even if it's just being pedantic, it could help people when they are switching between different task runners?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way that we determine if a wrapped future is resolved is different for each future type. Also, PrefectDistributedFuture is a little tricky because sometimes it's ok if you don't wait on it (e.g. if deferred is True). I don't want to make a __del__ method a required part of the interface until we can reliably check the resolution of all future types. I expect to revisit this again once TaskRunWaiter has some additional capabilities.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good!



class DaskTaskRunner(TaskRunner):
"""
Expand Down
33 changes: 33 additions & 0 deletions src/integrations/prefect-dask/tests/test_task_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,39 @@ async def adapt(self, **kwargs):
with task_runner:
assert task_runner._cluster._adapt_called

def test_warns_if_future_garbage_collection_before_resolving(
self, caplog, task_runner
):
@task
def test_task():
return 42

@flow(task_runner=task_runner)
def test_flow():
for _ in range(10):
test_task.submit()

test_flow()

assert "A future was garbage collected before it resolved" in caplog.text

def test_does_not_warn_if_future_resolved_when_garbage_collected(
self, task_runner, caplog
):
@task
def test_task():
return 42

@flow(task_runner=task_runner)
def test_flow():
futures = [test_task.submit() for _ in range(10)]
for future in futures:
future.wait()

test_flow()

assert "A future was garbage collected before it resolved" not in caplog.text

class TestInputArguments:
async def test_dataclasses_can_be_passed_to_task_runners(self, task_runner):
"""
Expand Down
21 changes: 21 additions & 0 deletions src/integrations/prefect-ray/prefect_ray/task_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def count_to(highest_number):
from prefect.client.schemas.objects import TaskRunInput
from prefect.context import serialize_context
from prefect.futures import PrefectFuture, PrefectWrappedFuture
from prefect.logging.loggers import get_logger, get_run_logger
from prefect.states import State, exception_to_crashed_state
from prefect.task_engine import run_task_async, run_task_sync
from prefect.task_runners import TaskRunner
Expand All @@ -90,6 +91,8 @@ def count_to(highest_number):
from prefect.utilities.collections import visit_collection
from prefect_ray.context import RemoteOptionsContext

logger = get_logger(__name__)


class PrefectRayFuture(PrefectWrappedFuture[ray.ObjectRef]):
def wait(self, timeout: Optional[float] = None) -> None:
Expand Down Expand Up @@ -129,6 +132,24 @@ def result(
_result = run_coro_as_sync(_result)
return _result

def __del__(self):
if self._final_state:
return
try:
ray.get(self.wrapped_future, timeout=0)
return
except GetTimeoutError:
pass

try:
local_logger = get_run_logger()
except Exception:
local_logger = logger
local_logger.warning(
"A future was garbage collected before it resolved."
" Please call `.wait()` or `.result()` on futures to ensure they resolve.",
)


class RayTaskRunner(TaskRunner[PrefectRayFuture]):
"""
Expand Down
33 changes: 33 additions & 0 deletions src/integrations/prefect-ray/tests/test_task_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,3 +456,36 @@ def flow_with_dependent_tasks():
e.submit(wait_for=[b_future])

flow_with_dependent_tasks()

def test_warns_if_future_garbage_collection_before_resolving(
self, caplog, task_runner
):
@task
def test_task():
return 42

@flow(task_runner=task_runner)
def test_flow():
for _ in range(10):
test_task.submit()

test_flow()

assert "A future was garbage collected before it resolved" in caplog.text

def test_does_not_warn_if_future_resolved_when_garbage_collected(
self, task_runner, caplog
):
@task
def test_task():
return 42

@flow(task_runner=task_runner)
def test_flow():
futures = [test_task.submit() for _ in range(10)]
for future in futures:
future.wait()

test_flow()

assert "A future was garbage collected before it resolved" not in caplog.text
14 changes: 13 additions & 1 deletion src/prefect/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from prefect.client.orchestration import get_client
from prefect.client.schemas.objects import TaskRun
from prefect.exceptions import ObjectNotFound
from prefect.logging.loggers import get_logger
from prefect.logging.loggers import get_logger, get_run_logger
from prefect.states import Pending, State
from prefect.task_runs import TaskRunWaiter
from prefect.utilities.annotations import quote
Expand Down Expand Up @@ -143,6 +143,18 @@ def result(
_result = run_coro_as_sync(_result)
return _result

def __del__(self):
if self._final_state or self._wrapped_future.done():
return
try:
local_logger = get_run_logger()
except Exception:
local_logger = logger
local_logger.warning(
"A future was garbage collected before it resolved."
" Please call `.wait()` or `.result()` on futures to ensure they resolve.",
)


class PrefectDistributedFuture(PrefectFuture):
"""
Expand Down
12 changes: 12 additions & 0 deletions tests/test_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,18 @@ def test_result_with_final_state_and_raise_on_failure(self):
with pytest.raises(ValueError, match="oops"):
future.result(raise_on_failure=True)

def test_warns_if_not_resolved_when_garbage_collected(self, caplog):
PrefectConcurrentFuture(uuid.uuid4(), Future())

assert "A future was garbage collected before it resolved" in caplog.text

def test_does_not_warn_if_resolved_when_garbage_collected(self, caplog):
wrapped_future = Future()
wrapped_future.set_result(Completed())
PrefectConcurrentFuture(uuid.uuid4(), wrapped_future)

assert "A future was garbage collected before it resolved" not in caplog.text


class TestResolveFuturesToStates:
async def test_resolve_futures_transforms_future(self):
Expand Down
Loading