Remove PreventRedundantTransitions rule from core task orchestration policy #8802

Merged
merged 6 commits into from
Mar 16, 2023

@rpeden rpeden commented Mar 15, 2023

Some users run Dask workers on ephemeral infrastructure such as EC2 spot instances, as reported in #8602. This infrastructure can disappear without giving Prefect the opportunity to handle the shutdown gracefully. When that happens, Dask itself retries the task.

The abrupt shutdown, combined with Dask retrying the task before Prefect can gracefully terminate it, leaves the task run in a RUNNING state. The retry then fails with the error: This run cannot transition to the RUNNING state from the RUNNING state. Task run is in RUNNING state.

The issue also appears to happen when running Ray on spot instances: #13013

After discussing the issue, we decided Prefect should not try to orchestrate things happening outside its control, so this PR removes the PreventRedundantTransitions rule from CoreTaskPolicy.

Closes #8602
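
To make the failure mode concrete, here is a minimal, self-contained sketch of the idea. It is a toy model, not Prefect's actual implementation: `TaskRun`, `propose_state`, `simulate_crash_and_retry`, and the rule function are hypothetical names, and the rule is simplified to "reject a proposed state that matches the current one".

```python
from dataclasses import dataclass
from typing import Callable, List, Optional

# Hypothetical, simplified model of an orchestration policy.
# None of these names are Prefect's real classes or API.
Rule = Callable[[str, str], Optional[str]]  # returns an abort reason, or None


def prevent_redundant_transitions(current: str, proposed: str) -> Optional[str]:
    """Toy rule: reject a transition into the state the run is already in."""
    if current == proposed:
        return (
            f"This run cannot transition to the {proposed} state "
            f"from the {current} state."
        )
    return None


@dataclass
class TaskRun:
    state: str = "PENDING"


def propose_state(run: TaskRun, proposed: str, policy: List[Rule]) -> str:
    """Apply every rule in the policy; persist the state only if none object."""
    for rule in policy:
        reason = rule(run.state, proposed)
        if reason is not None:
            return f"ABORT: {reason}"
    run.state = proposed
    return "ACCEPT"


def simulate_crash_and_retry(policy: List[Rule]) -> List[str]:
    """First attempt starts, the worker dies silently, then the task is retried."""
    run = TaskRun()
    results = [propose_state(run, "RUNNING", policy)]
    # The worker disappears here, so no terminal state is ever reported
    # and the run is still RUNNING when the retry proposes RUNNING again.
    for proposed in ("RUNNING", "COMPLETED"):
        outcome = propose_state(run, proposed, policy)
        results.append(outcome)
        if outcome.startswith("ABORT"):
            break  # the retried task gives up once orchestration aborts it
    return results
```

With the rule in the policy, `simulate_crash_and_retry([prevent_redundant_transitions])` ends in an abort on the second RUNNING proposal. With an empty policy, the retry is accepted and the run reaches COMPLETED.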

This PR also solves a related issue, #8597, where API timeouts occasionally cause a task run to try to set the RUNNING state again after it was already saved server-side.

Closes #8597
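
The timeout case can be sketched the same way. The snippet below is an illustration under stated assumptions, not Prefect's client or server code: `FakeServer` and `set_state_with_retry` are hypothetical, and the server is assumed to persist the state before the response is lost.

```python
class FakeServer:
    """Hypothetical stand-in for an orchestration API (not Prefect's server)."""

    def __init__(self, reject_redundant: bool):
        self.state = "PENDING"
        self.reject_redundant = reject_redundant
        self._drop_next_response = True  # simulate one lost response

    def set_state(self, proposed: str) -> str:
        if self.reject_redundant and proposed == self.state:
            return "ABORT"
        self.state = proposed  # the write is persisted server-side...
        if self._drop_next_response:
            self._drop_next_response = False
            # ...but the client never receives the response.
            raise TimeoutError("response lost")
        return "ACCEPT"


def set_state_with_retry(server: FakeServer, proposed: str, attempts: int = 3) -> str:
    """Client-side retry: re-send the same proposal after a timeout."""
    for _ in range(attempts):
        try:
            return server.set_state(proposed)
        except TimeoutError:
            continue
    raise TimeoutError("gave up after repeated timeouts")
```

In this toy, `set_state_with_retry(FakeServer(reject_redundant=True), "RUNNING")` returns `"ABORT"` because the first request was saved before it timed out, while the same call with `reject_redundant=False` returns `"ACCEPT"`.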

Example

To reproduce the problem before applying the changes in this PR, run a task in a Dask worker, then either kill the worker manually or call os.abort() inside the worker. Dask should restart the worker and begin re-running the task.

Below is a simple test in which the task crashes its Dask worker at random (on about 70% of attempts) but eventually succeeds once Dask has retried it:

import os
import random

from prefect import flow, task
from prefect_dask import DaskTaskRunner


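@task()
def sometimes_crashes():
    # About 70% of attempts abort the worker process (SIGABRT),
    # simulating infrastructure that disappears mid-task.
    num = random.randint(1, 10)
    if num > 3:
        os.abort()


@flow(
    task_runner=DaskTaskRunner(
        # Raise Dask's allowed_failures so the task can survive
        # several worker deaths before Dask gives up on it.
        cluster_kwargs={"processes": True, "scheduler_kwargs": {"allowed_failures": 10}}
    )
)
def task_crash_test():
    # Submit the task to Dask and block until it has a result.
    res = sometimes_crashes.submit()
    res.result()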



if __name__ == "__main__":
    task_crash_test()

Before applying this PR, when the worker crashes and Dask tries to re-run the task, Prefect aborts the state transition and the flow run fails:

10:22:50.689 | INFO    | Flow run 'jolly-chicken' - Created task run 'sometimes_crashes-0' for task 'sometimes_crashes'
10:22:51.084 | INFO    | Flow run 'jolly-chicken' - Submitted task run 'sometimes_crashes-0' for execution.
10:22:51.378 | INFO    | distributed.nanny - Worker process 94265 was killed by signal 6
10:22:51.381 | WARNING | distributed.nanny - Restarting worker
10:22:51.463 | WARNING | Task run 'sometimes_crashes-0' - Task run 'a05d0e42-64ea-4b5b-819a-d06a25a32ab3' received abort during orchestration: This run cannot transition to the RUNNING state from the RUNNING state. Task run is in RUNNING state.
10:22:52.841 | ERROR   | Flow run 'jolly-chicken' - Finished in state Failed()
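
The "killed by signal 6" line is what one would expect from the os.abort() call in the test, since os.abort() raises SIGABRT in the calling process. The following standalone check is a sketch that assumes a POSIX system (where SIGABRT is signal 6 on Linux and macOS). The helper name `exit_code_of_abort` is made up for illustration.

```python
import signal
import subprocess
import sys


def exit_code_of_abort() -> int:
    """Run os.abort() in a child interpreter and return its exit code."""
    proc = subprocess.run(
        [sys.executable, "-c", "import os; os.abort()"],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    # On POSIX, a negative return code -N means the child was killed by signal N.
    return proc.returncode


if __name__ == "__main__":
    code = exit_code_of_abort()
    print(f"child exit code: {code}, SIGABRT: {int(signal.SIGABRT)}")
```

No Python cleanup handlers run in the aborted process, which is why this is a reasonable stand-in for a worker that vanishes without warning.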

After applying the PR's changes, Prefect accepts the repeated transition to RUNNING, so Dask can re-run the task. When the task completes, the flow finishes successfully:

10:24:45.489 | INFO    | Flow run 'encouraging-chicken' - Created task run 'sometimes_crashes-0' for task 'sometimes_crashes'
10:24:45.824 | INFO    | Flow run 'encouraging-chicken' - Submitted task run 'sometimes_crashes-0' for execution.
10:24:45.965 | INFO    | distributed.nanny - Worker process 94592 was killed by signal 6
10:24:45.968 | WARNING | distributed.nanny - Restarting worker
10:24:46.048 | INFO    | Task run 'sometimes_crashes-0' - Finished in state Completed()
10:24:47.434 | INFO    | Flow run 'encouraging-chicken' - Finished in state Completed('All states completed.')

Checklist

  • This pull request references any related issue by including "closes <link to issue>"
    • If no issue exists and your change is not a small fix, please create an issue first.
  • This pull request includes tests or only affects documentation.
  • This pull request includes a label categorizing the change e.g. fix, feature, enhancement

netlify bot commented Mar 15, 2023

Deploy Preview for prefect-docs ready!

Name Link
🔨 Latest commit e48159e
🔍 Latest deploy log https://app.netlify.com/sites/prefect-docs/deploys/6412b25d24560e000801dbe8
😎 Deploy Preview https://deploy-preview-8802--prefect-docs.netlify.app

@rpeden rpeden added the fix A fix for a bug in an existing feature label Mar 15, 2023
@rpeden rpeden changed the title Remove redundant task transition rule Remove PreventRedundantTransitions rule from core task orchestration policy Mar 15, 2023
@rpeden rpeden added the DONT MERGE This PR shouldn't be merged (yet) label Mar 15, 2023
@rpeden rpeden removed the DONT MERGE This PR shouldn't be merged (yet) label Mar 16, 2023
@rpeden rpeden marked this pull request as ready for review March 16, 2023 06:55
@rpeden rpeden requested a review from a team as a code owner March 16, 2023 06:55
@rpeden rpeden requested a review from jakekaplan March 16, 2023 13:53
Labels
fix A fix for a bug in an existing feature great writeup This is a wonderful example of our standards
3 participants