Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log warning when a Concurrent, Dask, or Ray versions of PrefectFuture are garbage collection before resolution #14148

Merged
merged 2 commits into from
Jun 20, 2024

Conversation

desertaxle
Copy link
Member

@desertaxle desertaxle commented Jun 19, 2024

Adds a __del__ method to the following PrefectFuture implementations:

  • PrefectConcurrentFuture
  • PrefectDaskFuture
  • PrefectRayFuture

These futures will log a warning if they are garbage collection before resolving. The user will be directed to call either .wait() or .result() on the future for subsequent runs.

Notably, this allows us to log a warning when a PrefectRayFuture is not resolved, which we were not doing previously.

Closes #14018

Example

A flow that existing without waiting for a future like this:

from prefect import flow, task
from prefect_ray import RayTaskRunner


@task
def say_hello(name):
    return f"Hello {name}!"


@flow(task_runner=RayTaskRunner)
def my_flow():
    say_hello.submit("world")


if __name__ == "__main__":
    my_flow()

will output the following logs:

10:51:49.951 | INFO    | prefect.engine - Created flow run 'viridian-sheep' for flow 'my-flow'
10:51:49.952 | INFO    | prefect.engine - View at http://127.0.0.1:4200/runs/flow-run/2f67cb2c-9a8f-4e6c-a02e-130f0aa6b779
10:51:49.975 | INFO    | prefect.task_runner.ray - Creating a local Ray instance
2024-06-19 10:51:51,645	INFO worker.py:1744 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
10:51:52.114 | INFO    | prefect.task_runner.ray - Using Ray cluster with 1 nodes.
10:51:52.115 | INFO    | prefect.task_runner.ray - The Ray UI is available at 127.0.0.1:8265
10:51:52.482 | WARNING | Flow run 'viridian-sheep' - A future was garbage collected before it resolved. Please call `.wait()` or `.result()` on futures to ensure they resolve.
10:51:54.041 | INFO    | Flow run 'viridian-sheep' - Finished in state Completed()

This is the newly added log:

10:51:52.482 | WARNING | Flow run 'viridian-sheep' - A future was garbage collected before it resolved. Please call `.wait()` or `.result()` on futures to ensure they resolve.

Checklist

  • This pull request includes a label categorizing the change e.g. maintenance, fix, feature, enhancement, docs.
  • This pull request references any related issue by including "closes <link to issue>"
    • If no issue exists and your change is not a small fix, please create an issue first.
  • If this pull request adds new functionality, it includes unit tests that cover the changes
  • If this pull request removes docs files, it includes redirect settings in mint.json.
  • If this pull request adds functions or classes, it includes helpful docstrings.

@desertaxle desertaxle changed the title Warn on unwaited futures Log warning when a Concurrent, Dask, or Ray versions of PrefectFuture are garbage collection before resolution Jun 19, 2024
@desertaxle desertaxle marked this pull request as ready for review June 19, 2024 16:10
@desertaxle desertaxle requested review from zzstoatzz and a team as code owners June 19, 2024 16:10
@desertaxle desertaxle added the enhancement An improvement of an existing feature label Jun 19, 2024
@desertaxle desertaxle enabled auto-merge (squash) June 19, 2024 16:21
Copy link
Collaborator

@chrisguidry chrisguidry left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dig it! I think we should maybe do it everywhere?

Comment on lines +135 to +145
def __del__(self):
if self._final_state or self._wrapped_future.done():
return
try:
local_logger = get_run_logger()
except Exception:
local_logger = logger
local_logger.warning(
"A future was garbage collected before it resolved."
" Please call `.wait()` or `.result()` on futures to ensure they resolve.",
)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason not to move this to the base PrefectWrappedFuture (or even PrefectFuture)? It seems like even if it's just being pedantic, it could help people when they are switching between different task runners?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way that we determine if a wrapped future is resolved is different for each future type. Also, PrefectDistributedFuture is a little tricky because sometimes it's ok if you don't wait on it (e.g. if deferred is True). I don't want to make a __del__ method a required part of the interface until we can reliably check the resolution of all future types. I expect to revisit this again once TaskRunWaiter has some additional capabilities.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good!

@desertaxle desertaxle merged commit 27bdbb0 into main Jun 20, 2024
42 checks passed
@desertaxle desertaxle deleted the warn-on-unwaited-futures branch June 20, 2024 14:01
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement An improvement of an existing feature
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add a warning for PrefectFutures that haven't been waited
2 participants