Add exception filters to retries similar to retry_on in v1 #5611

Closed
marvin-robot opened this issue Mar 29, 2022 · 2 comments
Labels: enhancement (An improvement of an existing feature), status:duplicate (This issue already exists), v1-parity (A v1 feature under consideration for implementation in v2)

Comments

@marvin-robot
Member

marvin-robot commented Mar 29, 2022

TL;DR

Add a capability similar to v1's retry_on so that retries occur only for specific exception types

Needed for v1 parity:

- retry_on (Union[Exception, Iterable[Type[Exception]]], optional): Exception types that will
allow retry behavior to occur. If not set, all exceptions will allow retries. If set,
retries will only occur if the exception is a subtype of the exception types provided.
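
As a rough illustration of the semantics described above, here is a minimal sketch in plain Python. It is not Prefect's implementation: the helper name `run_with_retries` and the `max_retries` and `retry_delay_seconds` parameters are hypothetical, and only the meaning of `retry_on` is taken from the v1 description.

```python
import time
from typing import Callable, Iterable, Optional, Type, Union

ExceptionTypes = Union[Type[Exception], Iterable[Type[Exception]]]


def run_with_retries(
    fn: Callable[[], object],
    max_retries: int,
    retry_on: Optional[ExceptionTypes] = None,
    retry_delay_seconds: float = 0.0,
) -> object:
    """Call fn, retrying only on exceptions that match retry_on (hypothetical helper)."""
    if retry_on is None:
        allowed = (Exception,)  # no filter set: every exception allows a retry
    elif isinstance(retry_on, type):
        allowed = (retry_on,)  # a single exception class
    else:
        allowed = tuple(retry_on)  # an iterable of exception classes

    retries_used = 0
    while True:
        try:
            return fn()
        except allowed:
            # Matching exception: retry until the budget is spent, then re-raise.
            if retries_used >= max_retries:
                raise
            retries_used += 1
            time.sleep(retry_delay_seconds)
        # Exceptions outside `allowed` propagate immediately, with no retry.
```

With `retry_on=ConnectionError`, a `ConnectionError` is retried up to `max_retries` times, while a `ValueError` fails on the first attempt.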

Opened from the Prefect Public Slack Community

mike399: Hi again, we have a requirement to handle two classes of errors: retryable errors and non-retryable errors. I think I saw some examples of how this might be achieved somewhere but can't spot them for Prefect 2. In the non-retryable scenario we'd want the task state to go immediately to Failed (bypassing any retry settings configured on the task decorator)

anna: In Prefect 2.0 you can do:

For retriable errors:

@task(retries=2, retry_delay_seconds=60)

For non-retriable errors (no retries):

@task

mike399: But if I have a single task that can throw either type of error? Is there a context object that can be manipulated, I was thinking something similar to this?

import requests
from prefect import flow, task

@task(retries=3, retry_delay_seconds=5)
def call_api(url):
    try:
        response = requests.get(url)
        print(response.status_code)
        return response.json()
    except NonRetryableError:  # hypothetical exception class, not defined here
        # Pseudocode: an imagined API for skipping the remaining retries
        TaskRunContext.set_state(TaskRun.FAILED)

@flow
def task_retry_bypass(url):
    fact_json = call_api(url)

anna: Generally speaking, there is no need to use try-except in a Prefect flow. Part of the point of using Prefect is that you don't need to worry about all the different types of exceptions that might need to be caught.

The retries setting is a maximum number of retries:

  1. If the task doesn't need a retry and succeeds after its first execution, it's done.
  2. If it succeeds after the first retry (even though you configured 3 of them), the same - the task ends successfully after this first retry.
  3. If it still fails after 3 retries, then it will be marked as Failed.
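
The three cases above can be sketched with a toy model in plain Python. This is only an illustration of "retries as a maximum", not how Prefect schedules runs; the function name `simulate_task` and its parameters are made up for this example.

```python
from typing import Tuple


def simulate_task(failures_before_success: int, retries: int) -> Tuple[str, int]:
    """Return (final state name, number of executions) for a toy task.

    The toy task fails `failures_before_success` times and then succeeds.
    """
    max_runs = 1 + retries  # the first execution plus up to `retries` retries
    for run_number in range(1, max_runs + 1):
        if run_number > failures_before_success:
            # Stop as soon as one execution succeeds; unused retries are ignored.
            return ("Completed", run_number)
    # Every allowed execution failed.
    return ("Failed", max_runs)
```

For example, `simulate_task(1, retries=3)` stops after the first retry, while `simulate_task(4, retries=3)` ends as Failed after four executions.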

anna: But if you want, e.g., to do something when the task throws a specific exception, you could do it within a @flow by acting on task.wait().result(), which would be an exception in that case. LMK if you need an example

anna:

from prefect import task, flow, get_run_logger
from prefect.orion.schemas.states import Failed
import requests


@task(retries=3, retry_delay_seconds=5)
def call_api(url):
    response = requests.get(url)
    status_code = response.status_code
    logger = get_run_logger()
    logger.info(status_code)
    if status_code != 200:
        return Failed(message="Stopping the task run immediately!")


@flow
def task_retry_bypass(url):
    call_api(url)


if __name__ == "__main__":
    task_retry_bypass("https://example.com/api")  # placeholder URL; the flow requires a url argument

anna: This way you explicitly decide when the task run should be considered Failed, based on the API response. Raising any exception will also automatically cause the task run to be considered Failed, without having to manually set that using:

TaskRunContext.set_state(TaskRun.FAILED)

mike399: Is overriding the task run state an option? It would allow us to benefit from the task-level retry configuration but also skip it when needed. A common scenario would be distinguishing between transient system failures (e.g. a service being unavailable) and permanent errors arising from unexpected application state (e.g. once an object has been deleted, there's no point retrying any attempt to modify it)
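
One way to express the transient/permanent split described in this message is with two exception classes and a small classifier. The sketch below is plain Python under stated assumptions: the class names, the helper `raise_for_status_class`, and the choice of which HTTP status codes count as transient are illustrative guesses, not something from Prefect or from the thread.

```python
class RetryableError(Exception):
    """Transient failure, e.g. a service being temporarily unavailable."""


class NonRetryableError(Exception):
    """Permanent failure, e.g. the target object has already been deleted."""


# Assumption: these status codes are treated as transient for this example.
TRANSIENT_STATUS_CODES = frozenset({408, 429, 500, 502, 503, 504})


def raise_for_status_class(status_code: int) -> None:
    """Raise the matching error class for a non-2xx HTTP status code."""
    if 200 <= status_code < 300:
        return  # success: nothing to raise
    if status_code in TRANSIENT_STATUS_CODES:
        raise RetryableError(f"transient failure (HTTP {status_code})")
    raise NonRetryableError(f"permanent failure (HTTP {status_code})")
```

A retry mechanism with an exception filter could then retry on `RetryableError` only and let `NonRetryableError` fail the task on the first attempt.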

anna: Good question. You can return any State in your task to influence that: https://orion-docs.prefect.io/concepts/states/

anna: Basically, instead of:

TaskRunContext.set_state(TaskRun.FAILED)

you do:

return prefect.orion.schemas.states.Failed(message="your_msg")

mike399: Thanks Anna, will give that a try

anna: You could even do it on a flow level - having normal retries on a task level, but if the task result returns a specific value, you raise an exception on a flow level and end the flow run immediately - just mentioning this because it's also a valid option:

import requests
from prefect import task, flow, get_run_logger


@task(retries=3, retry_delay_seconds=5)
def call_api(url):
    response = requests.get(url)
    status_code = response.status_code
    logger = get_run_logger()
    logger.info(status_code)
    return response.json()


@flow
def task_retry_bypass(url):
    # Inspect the task's result in the flow and fail the flow run right away
    res = call_api(url).wait().result()
    if res["sth"] == "sth":
        raise ValueError("Non retriable error!")


if __name__ == "__main__":
    task_retry_bypass("https://example.com/api")  # placeholder URL

mike399: This one worked (no retries)

from prefect import flow, task
import prefect

@task(retries=3,retry_delay_seconds=5)
def call_api(url):
    return prefect.orion.schemas.states.StateType.FAILED

@flow
def task_retry_bypass(url):
    fact_json = call_api(url)

task_retry_bypass("foo")

We'd like to make this as simple as possible (to reduce learning curve and help with developer experience), so I think we can extend the above and use an inner decorated function to hide all the details

anna: Sure, you can customize it as you wish, it's pretty much just Python so the sky is the limit! 🙂

mike399: I hit a problem with this... if the return is

return prefect.orion.schemas.states.StateType.FAILED

Then the retry is skipped, BUT the flow run ends up in a Completed state. Tracing through, this occurs because StateType.FAILED is not a State (so Prefect defaults to Completed for "python object returns").

And if we return

return prefect.orion.schemas.states.Failed()

Then propose_state in client.py appears to call set_task_run_state, but the proposed state is rejected and we go into a retry loop
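
The first observation (an enum member being treated as an ordinary return value) can be illustrated with a toy model. The classes below are local stand-ins that only borrow the names `State` and `StateType`; they are not Prefect's classes, and `state_from_return_value` is a hypothetical function written for this sketch. It does not model the second observation, where the proposed state is rejected.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any


class StateType(Enum):
    """Toy enum of state kinds (stand-in, not Prefect's)."""
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"


@dataclass
class State:
    """Toy state object (stand-in, not Prefect's)."""
    type: StateType
    data: Any = None


def state_from_return_value(value: Any) -> State:
    """Turn a task's return value into a final state (hypothetical)."""
    if isinstance(value, State):
        return value  # an explicit state is used as-is
    # Anything else is plain data, so the run is treated as completed.
    return State(type=StateType.COMPLETED, data=value)
```

In this model, returning `StateType.FAILED` produces a Completed state whose data happens to be the enum member, while returning `State(StateType.FAILED)` produces a Failed state.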

mike399: This is getting a little too involved. Most of our workflows tend to be idempotent system operations, so we may be able to handle this in a different way (checking the result payload in the flow as you indicated). It would be nice if Prefect had additional exception filters it could apply for retries (similar to the tenacity library)

anna: Thanks for the suggestion, I'll pass it on to the product team

anna: <@ULVA73B9P> open “Orion: feature request to add Exception filters applicable for retries”

Original thread can be found here.

@zanieb
Contributor

zanieb commented Mar 29, 2022

See also #5139

@zanieb added the enhancement (An improvement of an existing feature) label Mar 29, 2022
@zanieb changed the title from "Orion: feature request to add Exception filters applicable for retries" to "Orion: Add exception filters to retries" Mar 29, 2022
@zanieb changed the title from "Orion: Add exception filters to retries" to "Add exception filters to retries" Apr 1, 2022
@anna-geller added the v1-parity (A v1 feature under consideration for implementation in v2) label Dec 7, 2022
@anna-geller changed the title from "Add exception filters to retries" to "Add exception filters to retries similar to retry_on in v1" Dec 7, 2022
@zanieb
Contributor

zanieb commented Feb 22, 2023

Let's track this in #7576 since this one has a big slack thread in it.

@zanieb closed this as not planned Feb 22, 2023
@zanieb added the status:duplicate (This issue already exists) label and removed the status:roadmap label Feb 22, 2023