
Retrying tasks wait to retry in the TaskRunner, blocking other tasks from running #3516

Closed
newskooler opened this issue Oct 16, 2020 · 14 comments
Labels: enhancement (An improvement of an existing feature), status:stale (This may not be relevant anymore)

Comments

@newskooler

newskooler commented Oct 16, 2020

Current behavior

Let's say we have a task like this (pseudo code below):

import requests
from datetime import timedelta

from prefect import task, Flow
from prefect.engine.executors import LocalDaskExecutor
from prefect.environments import LocalEnvironment


@task(max_retries=3, retry_delay=timedelta(minutes=5))
def get_url_page(url_page: str):
    # raise_for_status() raises on HTTP errors, which triggers the task's retry logic
    response = requests.get(url_page)
    response.raise_for_status()
    return response

If we execute this Flow like so:

num_workers = 4  # illustrative thread count

with Flow('Example') as flow:
    all_url_pages = ['link_to_page1', 'link_to_page2', 'link_to_page3']  # ... and so on
    url_page_results = get_url_page.map(all_url_pages)

flow.environment = LocalEnvironment(
    labels=[],
    executor=LocalDaskExecutor(scheduler="threads", num_workers=num_workers),
)

Then, if one of the requests fails and that mapped child waits 5 minutes to be retried, none of the other URLs mapped to this task are processed in the meantime, even though the worker is sitting idle.

Proposed behavior

Ideally, when a single child of the mapped task fails, the worker should move on to the next child while the failed one waits out its retry delay.

@newskooler newskooler added the enhancement (An improvement of an existing feature) label Oct 16, 2020
@zanieb
Contributor

zanieb commented Oct 16, 2020

This makes a lot of sense! I'm not sure off the top of my head why it's behaving that way but I'll try to dig into it soon.

@zanieb zanieb self-assigned this Oct 16, 2020
@Sinha-Ujjawal

I am unable to reproduce this. Do you have working code?

@newskooler
Author

> I am unable to reproduce this. Do you have working code?

I can send a minimum viable example later today.

@zanieb
Contributor

zanieb commented Oct 19, 2020

> I am unable to reproduce this. Do you have working code?

> I can send a minimum viable example later today.

I also failed to reproduce this with the given code (added import requests; from datetime import timedelta; num_workers=1)

@jcrist

jcrist commented Oct 19, 2020

This has to do with how retrying is currently implemented in prefect. If a task retries and the retry time is < 10 min (hardcoded), the retry wait will be handled in the TaskRunner, which will take a slot in the running executor (e.g. taking a thread in a LocalDaskExecutor). Fixing this would require some larger changes to handle retries at the FlowRunner level, so other work could be done while the task is waiting to retry. I agree that this would be useful to do, but it's not a quick thing to fix.
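To illustrate the effect with a toy model (plain concurrent.futures, not Prefect's actual TaskRunner): if the retry delay is slept out inside the function running on the pool, that thread stays occupied and any work queued behind it cannot start.

import time
from concurrent.futures import ThreadPoolExecutor

def run_with_in_runner_retry(item):
    # Toy stand-in for a task runner that serves the retry delay in place:
    # while it sleeps, it keeps holding its thread-pool slot.
    try:
        raise RuntimeError(f"{item} failed")  # simulate the failed request
    except RuntimeError:
        time.sleep(300)  # the 5-minute retry delay is waited out on the pool thread
    return item

with ThreadPoolExecutor(max_workers=1) as pool:
    # With a single worker thread, "page2" cannot start until "page1"'s
    # retry delay has elapsed, which is the behavior reported above.
    results = list(pool.map(run_with_in_runner_retry, ["page1", "page2"]))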

@jcrist jcrist changed the title Suggestion for worker to not stay idle when performing job on a single mapped task on a DaskExecutor. Retrying tasks wait to retry in the TaskRunner, blocking other tasks from running Oct 19, 2020
@cicdw
Member

cicdw commented Oct 19, 2020

Some context: if we don't wait within the Task Runner, then the retry delay isn't really being respected (as the entire graph will need to be visited before returning to the retrying task, which generally takes time).

When running against a Prefect backend, if we don't retry within the runner, then that will result in a new Agent job submission at the retry time which gets really inefficient as the number of retries grows.

@jcrist

jcrist commented Oct 19, 2020

> Some context: if we don't wait within the Task Runner, then the retry delay isn't really being respected (as the entire graph will need to be visited before returning to the retrying task, which generally takes time).

Right. The retry delay is more of a minimum bound, though (we can't guarantee that the task will retry exactly after that delay; that's up to the OS/runtime and how threads are scheduled). Handling it at the FlowRunner level would be possible with some changes to the graph traversal, but quick retries would definitely be slowed down as there'd be a lot more overhead. The most efficient way to handle these would be something new that runs in every Prefect worker process and handles worker-local scheduling, but right now that's all abstracted away in the Executor backends, so we don't really have that option (and adding it would be nontrivial).


We could do a quick fix for DaskExecutor-based execution alone (not LocalDaskExecutor) by having a retrying task secede from the thread pool while waiting, unblocking other threads (see the sketch after this list). I'm hesitant to do this though, as:

  • It could result in an explosion of threads if many tasks are retrying (we could add a mechanism to limit this if needed)
  • It's an executor-specific behavior; other executors wouldn't support this
  • It'd require either changing how executors work slightly to catch retrying states, or putting DaskExecutor-specific behavior inside the TaskRunner; neither option sounds particularly pleasant.
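For reference, a minimal sketch of what that quick fix could look like, assuming the function below runs inside a task on a dask.distributed worker; the helper name and retry loop are hypothetical and not Prefect's actual TaskRunner:

import time
from datetime import timedelta

from dask.distributed import secede, rejoin

def run_with_seceding_retry(task_fn, retry_delay: timedelta, max_retries: int = 3):
    # Hypothetical helper: wait out the retry delay *outside* the worker's
    # thread pool so another task can use the slot in the meantime.
    for attempt in range(max_retries + 1):
        try:
            return task_fn()
        except Exception:
            if attempt == max_retries:
                raise
            secede()                                 # hand the thread-pool slot back to the worker
            time.sleep(retry_delay.total_seconds())  # serve the retry delay off the pool
            rejoin()                                 # block until a slot frees up, then retry

As noted above, this only helps DaskExecutor-based runs; LocalDaskExecutor and LocalExecutor have no equivalent of secede/rejoin.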

@cicdw
Member

cicdw commented Oct 19, 2020

I don't see any benefit in changing the current behavior, especially if all solutions involve increasing complexity.

@jcrist

jcrist commented Oct 19, 2020

I definitely don't see a benefit in the more complicated solution. The DaskExecutor-only fix would rely on dask-specific behavior, but would probably be a fairly small diff in the Prefect codebase. I still don't think it's worth it at this point; I was just trying to get my thoughts out on the issue.

@newskooler
Author

Do you still need code to reproduce?

I still think there is a lot of added value here. Sometimes when requesting many URLs (say hundreds or thousands or more), if one fails (e.g. because the endpoint is not yet available and will be a few minutes or hours later), it makes sense to keep requesting the rest of the URLs in the meantime; or at least to have the option to specify such behaviour (that would actually be ideal), keeping the current behaviour as the default.

@zanieb
Contributor

zanieb commented Oct 21, 2020

@snenkov It may be useful if you provided the code for the future, but per the discussion above we're not going to be able to address this right now--the implications are too complex. I'll leave this issue open and hopefully we can address it down the road.

@zanieb zanieb removed their assignment Oct 30, 2020
@Zaubeerer

We also run flows with mapped tasks that frequently comprise thousands of task runs, so this behaviour creates significant delays and costs on our side as well.

I don't have a deep understanding of Prefect yet, but here is a possible conceptual solution from my perspective: couldn't an easy fix be to simply queue a failed task run back to the end of its map? That way, retries would be managed within the map (rather than at the flow level?) and would still be executed relatively soon, without delaying the other tasks in the map... (see the sketch below)
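A rough sketch of that idea (purely conceptual, not Prefect internals; the function name and retry bookkeeping are made up for illustration):

from collections import deque

def map_with_requeued_retries(fn, items, max_retries=3):
    # Failed items go to the back of the queue instead of blocking a worker
    # slot, so they are retried only after the remaining items have been tried.
    queue = deque((item, 0) for item in items)
    results = {}
    while queue:
        item, attempts = queue.popleft()
        try:
            results[item] = fn(item)
        except Exception:
            if attempts < max_retries:
                queue.append((item, attempts + 1))  # retry later, after the others
            else:
                results[item] = None                # give up after max_retries
    return results

Note that this sketch ignores retry_delay entirely, which is exactly the trade-off discussed above: requeueing only guarantees that other work runs first, not that a specific delay is honored.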

@github-actions
Contributor

This issue is stale because it has been open 30 days with no activity. To keep this issue open remove stale label or comment.

@github-actions github-actions bot added the status:stale (This may not be relevant anymore) label Nov 22, 2022
@github-actions
Contributor

github-actions bot commented Dec 6, 2022

This issue was closed because it has been stale for 14 days with no activity. If this issue is important or you have more to add feel free to re-open it.

@github-actions github-actions bot closed this as not planned (won't fix, can't repro, duplicate, stale) Dec 6, 2022