Pause does not work with map for flow.run() #5504

Closed
marvin-robot opened this issue Mar 2, 2022 · 0 comments


Opened from the Prefect Public Slack Community

brettnaul: q about mapping and PAUSE signals: the mapping docs say
> Even though the user didn't create them explicitly, the children tasks of a mapped task are first-class Prefect tasks. They can do anything a "normal" task can do, including succeed, fail, retry, pause, or skip.
this seems kinda true, but when you try to raise PAUSE inside a mapped task it seems to just loop infinitely, whereas for a normal task you see the prompt "<Task: pause> is currently Paused; enter 'y' to resume:". anyone have a strong opinion on whether this is a bug or just not a supported usage?

from prefect import Flow, task
from prefect.engine.signals import PAUSE

@task
def pause(i):
    import prefect
    if not prefect.context.get('resume'):
        raise PAUSE("pausing")
    return i

with Flow("f") as f:
    # pause.map(i=[1])  # infinite loop
    pause(i=1)          # works normally

f.run()

brettnaul: mapped version output

[2022-03-02 08:53:41-0500] INFO - prefect.FlowRunner | Beginning Flow run for 'f'
INFO:prefect.FlowRunner:Beginning Flow run for 'f'
[2022-03-02 08:53:41-0500] INFO - prefect.TaskRunner | Task 'pause': Starting task run...
INFO:prefect.TaskRunner:Task 'pause': Starting task run...
[2022-03-02 08:53:41-0500] INFO - prefect.TaskRunner | Task 'pause': Finished task run for task with final state: 'Mapped'
INFO:prefect.TaskRunner:Task 'pause': Finished task run for task with final state: 'Mapped'
[2022-03-02 08:53:41-0500] INFO - prefect.TaskRunner | Task 'pause[0]': Starting task run...
INFO:prefect.TaskRunner:Task 'pause[0]': Starting task run...
[2022-03-02 08:53:41-0500] INFO - prefect.TaskRunner | PAUSE signal raised: PAUSE('pausing')
INFO:prefect.TaskRunner:PAUSE signal raised: PAUSE('pausing')
[2022-03-02 08:53:41-0500] INFO - prefect.TaskRunner | Task 'pause[0]': Finished task run for task with final state: 'Paused'
INFO:prefect.TaskRunner:Task 'pause[0]': Finished task run for task with final state: 'Paused'
[2022-03-02 08:53:41-0500] INFO - prefect.FlowRunner | Flow run RUNNING: terminal tasks are incomplete.
INFO:prefect.FlowRunner:Flow run RUNNING: terminal tasks are incomplete.
[2022-03-02 08:53:41-0500] INFO - prefect.FlowRunner | Beginning Flow run for 'f'
INFO:prefect.FlowRunner:Beginning Flow run for 'f'
[2022-03-02 08:53:41-0500] INFO - prefect.TaskRunner | Task 'pause': Starting task run...
INFO:prefect.TaskRunner:Task 'pause': Starting task run...
[2022-03-02 08:53:41-0500] INFO - prefect.TaskRunner | Task 'pause': Finished task run for task with final state: 'Mapped'

...etc etc
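
The loop is visible in the output above as a repeated "Beginning Flow run for 'f'" line with no resume prompt in between. A minimal sketch for spotting that pattern in captured log text follows. It uses only the standard library; the helper names `count_flow_starts` and `looks_like_restart_loop` and the `threshold` parameter are hypothetical, not part of Prefect, and the regex assumes the bracketed-timestamp log format shown above.

```python
import re

# Matches only the bracketed-timestamp form of the FlowRunner start line;
# the duplicate "INFO:prefect.FlowRunner:..." lines are deliberately ignored
# so each flow start is counted once.
START_RE = re.compile(
    r"^\[[^\]]+\] INFO - prefect\.FlowRunner \| Beginning Flow run for '(?P<flow>[^']+)'$"
)

def count_flow_starts(log_text):
    """Return {flow_name: number of 'Beginning Flow run' lines} for the given log text."""
    counts = {}
    for line in log_text.splitlines():
        match = START_RE.match(line.strip())
        if match:
            name = match.group("flow")
            counts[name] = counts.get(name, 0) + 1
    return counts

def looks_like_restart_loop(log_text, threshold=2):
    """Heuristic (hypothetical): a flow that starts `threshold` or more times is suspicious."""
    return any(n >= threshold for n in count_flow_starts(log_text).values())

# Excerpt copied from the mapped-version output above.
SAMPLE_LOG = """
[2022-03-02 08:53:41-0500] INFO - prefect.FlowRunner | Beginning Flow run for 'f'
INFO:prefect.FlowRunner:Beginning Flow run for 'f'
[2022-03-02 08:53:41-0500] INFO - prefect.FlowRunner | Flow run RUNNING: terminal tasks are incomplete.
[2022-03-02 08:53:41-0500] INFO - prefect.FlowRunner | Beginning Flow run for 'f'
INFO:prefect.FlowRunner:Beginning Flow run for 'f'
"""

if __name__ == "__main__":
    print(count_flow_starts(SAMPLE_LOG))
    print(looks_like_restart_loop(SAMPLE_LOG))
```

This only detects the symptom in a log capture; it says nothing about why the mapped child re-runs.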

kevin701: I think a lot of things behave differently under flow.run() compared to agent-deployed runs. For example, only one Flow run can be fired at a time even if the schedule has two runs.

For this one specifically, I would expect the Cloud runs to work even if flow.run() does not. But yeah, I can see how this can be painful because it means you can’t test locally whatsoever. I’d open an issue.

kevin701: <@ULVA73B9P> open “Pause does not work with map for flow.run()”

Original thread can be found here.
