Access task inputs in state handler #3921

Closed
mianos opened this issue Jan 5, 2021 · 5 comments

mianos commented Jan 5, 2021

Description

I would like to access task inputs in a state handler. (Edit: changed "parameters" to "inputs".)

Expected Behavior

Maybe I am not doing this correctly but, as per the title, with state_handlers=[ignore_timeout_handler] set in my example below, I would expect this handler:

def ignore_timeout_handler(task, old_state, new_state):
    print(f"{prefect.context.parameters} old {old_state} new {new_state}")

When exercised with:

result = slow_task.map(item=nrange,
                       sleep_time=nrange)

to print the item number and sleep time, but the printed context is empty:

[2021-01-05 02:40:27] INFO - prefect.FlowRunner | Beginning Flow run for 'Slow flow'
[2021-01-05 02:40:27] INFO - prefect.TaskRunner | Task 'produce_range': Starting task run...
[2021-01-05 02:40:27] INFO - prefect.TaskRunner | Task 'produce_range': finished task run for task with final state: 'Success'
[2021-01-05 02:40:27] INFO - prefect.TaskRunner | Task 'SlowTask': Starting task run...
{} old <Pending> new <Mapped: "Ready to proceed with mapping.">
[2021-01-05 02:40:27] INFO - prefect.TaskRunner | Task 'SlowTask': finished task run for task with final state: 'Mapped'
[2021-01-05 02:40:27] INFO - prefect.TaskRunner | Task 'SlowTask[0]': Starting task run...
{} old <Pending> new <Running: "Starting task run.">
{} old <Running: "Starting task run."> new <Success: "Task run succeeded.">

Reproduction

from time import sleep

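from prefect.engine import state
from prefect import task, Task, Flow
import prefect

# Since Prefect 0.14 the executors live in prefect.executors;
# prefect.engine.executors is the older, deprecated location.
from prefect.executors import LocalDaskExecutor, LocalExecutor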


def ignore_timeout_handler(task, old_state, new_state):
    print(f"{prefect.context.parameters} old {old_state} new {new_state}")
    if new_state.is_failed() and isinstance(new_state, state.TimedOut):
        return_state = state.Success(result={"state": "forced ok"})
    else:
        return_state = new_state
    return return_state


@task
def produce_range():
    return range(5, 10)


class SlowTask(Task):
    def run(self, item, sleep_time=9, **kwopts):
        sleep(sleep_time)
        # doing stuff with a host called 'item'
        return item


with Flow("Slow flow") as flow:
    slow_task = SlowTask(timeout=6, max_retries=2, retry_delay=2, state_handlers=[ignore_timeout_handler])
    nrange = produce_range()
    result = slow_task.map(item=nrange,
                           sleep_time=nrange)

# executor = LocalDaskExecutor(scheduler="threads", num_workers=10)
executor = LocalExecutor()
for ii in flow.run(executor=executor).result[result].result:
    print(ii)

Environment

{
  "config_overrides": {},
  "env_vars": [],
  "system_information": {
    "platform": "Linux-3.10.0-1062.4.1.el7.x86_64-x86_64-with-centos-7.7.1908-Core",
    "prefect_backend": "server",
    "prefect_version": "0.14.1",
    "python_version": "3.6.8"
  }
}

The same happens under Ubuntu 20.04.

joshmeek commented Jan 5, 2021

Hi @mianos, you are attempting to access Prefect Parameter values in your state handler, but you are not passing any parameters into your flow. I believe you actually want to access your task's inputs in the handler, but task inputs are not among the values that are placed into context. I don't believe task inputs are currently accessible in the state handler, but I think it is something that could be accomplished.

joshmeek changed the title from "state_handlers in '.map'ped tasks don't receive task parameters on prefect.context.parameters" to "Access task inputs in state handler" on Jan 5, 2021

mianos commented Jan 5, 2021

Thanks Josh, yes, this is what I mean: somehow get the task inputs so I can properly handle the timeouts in the handler.

I can probably achieve the same thing by replacing the result for the item, as in the handler above, and then, in the next task, iterating through the original list of items and the results together to build a new list that records the failures with their item index alongside the good ones.
Although that will work in this case, it would be great to get this information in the state_handler so that this sort of thing can be done generically, in keeping with the 'assume failures' philosophy.

mianos commented Jan 5, 2021

PS: in case anyone else cares (I notice this has been asked on Slack a few times), the following works as a workaround in the meantime:

import prefect
from prefect import Flow
from prefect.engine import state

# SlowTask and produce_range are the ones defined in the reproduction above.


class CaughtTimeout:
    """Marker result that replaces the output of a timed-out task run."""


def ignore_timeout_handler(task, old_state, new_state):
    # Turn a TimedOut state into a Success that carries the marker object.
    if new_state.is_failed() and isinstance(new_state, state.TimedOut):
        return state.Success(result=CaughtTimeout())
    return new_state


@prefect.task
def combine_failed(item_list, results):
    # zip relies on the mapped results being in the same order as item_list.
    rvals = []
    for item, outcome in zip(item_list, results):
        status = 'timeout' if isinstance(outcome, CaughtTimeout) else 'OK'
        rvals.append({'item': item, 'status': status})
    return rvals


with Flow("Slow flow") as flow:
    slow_task = SlowTask(timeout=5, state_handlers=[ignore_timeout_handler])
    nrange = produce_range()
    t_result = slow_task.map(item=nrange, sleep_time=nrange)
    result = combine_failed(nrange, t_result)
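
A variant of the combining step that also keeps each item's position and the successful result, as the earlier comment describes, might look like the sketch below. It is plain Python, so it can be tried without a flow run. `CaughtTimeout` mirrors the marker class above; `pair_inputs_with_results` and the `index`/`result` keys are hypothetical names introduced here, and the sketch assumes the results arrive in the same order as the inputs.

```python
from typing import Any, Dict, List, Sequence


class CaughtTimeout:
    """Marker result standing in for a task run that timed out."""


def pair_inputs_with_results(
    items: Sequence[Any], results: Sequence[Any]
) -> List[Dict[str, Any]]:
    """Pair each input with its result, keeping its position in the input list.

    Assumption: results[i] belongs to items[i], as in the zip-based workaround.
    """
    if len(items) != len(results):
        raise ValueError("items and results must have the same length")

    paired = []
    for index, (item, result) in enumerate(zip(items, results)):
        if isinstance(result, CaughtTimeout):
            # A timed-out run has no real result, so record None for it.
            paired.append(
                {"index": index, "item": item, "status": "timeout", "result": None}
            )
        else:
            paired.append(
                {"index": index, "item": item, "status": "OK", "result": result}
            )
    return paired


if __name__ == "__main__":
    # Made-up inputs: the second run is pretended to have timed out.
    items = [5, 6, 7]
    results = [5, CaughtTimeout(), 7]
    for row in pair_inputs_with_results(items, results):
        print(row)
```

Decorating the function with @prefect.task, as combine_failed is above, should let it slot into the same flow, though that has not been tried here.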

github-actions bot commented Dec 6, 2022

This issue is stale because it has been open 30 days with no activity. To keep this issue open remove stale label or comment.

github-actions bot commented

This issue was closed because it has been stale for 14 days with no activity. If this issue is important or you have more to add feel free to re-open it.

github-actions bot closed this as not planned (won't fix, can't repro, duplicate, stale) on Dec 20, 2022