De-dupe constant tasks #1527

Closed
bnaul opened this issue Sep 18, 2019 · 2 comments · Fixed by #1579

Comments

bnaul commented Sep 18, 2019

Probably not a high priority (and maybe not feasible for some reason I haven't thought of) but I find this behavior a bit irritating:

from prefect import task, Flow
from prefect.engine.executors import DaskExecutor

@task
def add(x, y):
    return x + y

with Flow('flow') as flow:
    for i in range(10):
        add(i, 0)

flow.run(executor=DaskExecutor('localhost:8786'))

Result: the constant 0 task is repeated every time, as is the corresponding log:

[2019-09-18 04:37:19,486] INFO - prefect.TaskRunner | Task '0': Starting task run...
[2019-09-18 04:37:19,486] INFO - prefect.TaskRunner | Task '0': Starting task run...
[2019-09-18 04:37:19,489] INFO - prefect.TaskRunner | Task '0': Starting task run...
[2019-09-18 04:37:19,492] INFO - prefect.TaskRunner | Task '0': finished task run for task with final state: 'Success'
[2019-09-18 04:37:19,493] INFO - prefect.TaskRunner | Task '0': finished task run for task with final state: 'Success'
[2019-09-18 04:37:19,497] INFO - prefect.TaskRunner | Task '0': finished task run for task with final state: 'Success'
...

Is there any kind of simple de-duping scheme that might work within a flow, maybe just for Python literals...?

Edit: possibly kind of a dupe of #891 which was already declined...? 🤷‍♂
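
One way the de-duping asked about above could work, as a minimal sketch that is independent of Prefect's internals: keep a per-flow registry keyed on the literal's type and value, and hand back the same constant node whenever an equal literal reappears. `ConstantNode` and `ConstantRegistry` are hypothetical names used only for illustration; they are not Prefect classes.

```python
from dataclasses import dataclass
from typing import Any, Dict, Hashable, Tuple


@dataclass(frozen=True)
class ConstantNode:
    """Hypothetical stand-in for a task that just returns a fixed value."""
    value: Any


class ConstantRegistry:
    """Hands out one shared node per distinct hashable literal."""

    def __init__(self) -> None:
        self._nodes: Dict[Tuple[type, Hashable], ConstantNode] = {}

    def get(self, value: Any) -> ConstantNode:
        try:
            # Key on (type, value) so that 0, 0.0 and False stay distinct.
            key = (type(value), value)
            hash(key)
        except TypeError:
            # Unhashable values (lists, dicts, ...) are not de-duplicated.
            return ConstantNode(value)
        return self._nodes.setdefault(key, ConstantNode(value))

    def __len__(self) -> int:
        return len(self._nodes)


registry = ConstantRegistry()
zeros = [registry.get(0) for _ in range(10)]
```

Here all ten calls to `registry.get(0)` return the same object, so a flow built this way would carry a single constant node instead of ten. Unhashable values fall through and get a fresh node each time.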

cicdw (Member) commented Sep 18, 2019

This is a really good callout. The original reason for this behavior is that we needed a place to track the values being used as Task inputs, and reusing Prefect's Task machinery was the most natural way to do that. In addition, if we saw an input that is not a Prefect Task (a list, for example) and simply stored its value without further introspection, we might lose dependency relationships such as:

with Flow("example") as flow:
    my_task(inputs=[task_a, task_b])

However, I fully agree that this is not ideal and something we should improve upon. In the meantime, one way of at least de-duping some constants is to wrap them in a Prefect Constant task:

from prefect import task, Flow
from prefect.tasks.core.constants import Constant
from prefect.engine.executors import DaskExecutor

@task
def add(x, y):
    return x + y

with Flow('flow') as flow:
    zero = Constant(0)
    for i in range(10):
        add(i, zero)

flow.run(executor=DaskExecutor('localhost:8786'))

I'll revisit the logic for this and see how we might improve it.
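
The dependency concern above can be illustrated with a small sketch that does not use Prefect itself: if a list input is stored as an opaque value, the tasks inside it are never seen, whereas walking the collection recovers them as upstream dependencies. `Node` and `find_upstream` are hypothetical names for this illustration and do not reflect how Prefect implements it.

```python
from typing import Any, List


class Node:
    """Hypothetical stand-in for a task object."""

    def __init__(self, name: str) -> None:
        self.name = name


def find_upstream(value: Any) -> List[Node]:
    """Recursively collect Node objects nested inside lists, tuples, sets and dicts."""
    if isinstance(value, Node):
        return [value]
    if isinstance(value, (list, tuple, set)):
        return [n for item in value for n in find_upstream(item)]
    if isinstance(value, dict):
        return [n for item in value.values() for n in find_upstream(item)]
    return []  # plain constants contribute no dependencies


task_a, task_b = Node("task_a"), Node("task_b")
upstream = find_upstream([task_a, task_b, 0])
names = [n.name for n in upstream]
```

The literal `0` in the list contributes nothing, while `task_a` and `task_b` are both picked up, which is the relationship that would be lost if the list were stored as a single constant.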

bnaul (Author) commented Sep 18, 2019

Makes sense, thanks! One more note: the same behavior would be nice for parameters. Currently:

with Flow('flow') as flow:
    for i in range(10):
        add(i, Parameter('x'))

flow.run(parameters={'x': 1}, executor=DaskExecutor('localhost:8786'))


ValueError: A task with the slug "x" already exists in this flow.
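
The error presumably arises because each `Parameter('x')` call creates a new object with the same slug. A toy model of that check shows why creating the object once and reusing it avoids the collision. `ToyParam` and `ToyFlow` are hypothetical classes that only mimic the uniqueness rule implied by the error message.

```python
class ToyParam:
    """Hypothetical stand-in for a named parameter; the slug is its name."""

    def __init__(self, name: str) -> None:
        self.slug = name


class ToyFlow:
    """Hypothetical flow that rejects two different objects sharing a slug."""

    def __init__(self) -> None:
        self.tasks = {}

    def add(self, task: ToyParam) -> ToyParam:
        existing = self.tasks.get(task.slug)
        if existing is not None and existing is not task:
            raise ValueError(
                f'A task with the slug "{task.slug}" already exists in this flow.'
            )
        self.tasks[task.slug] = task
        return task


# A new object per iteration collides on the second pass.
flow = ToyFlow()
try:
    for i in range(10):
        flow.add(ToyParam("x"))
    collided = False
except ValueError:
    collided = True

# Creating the parameter once and reusing it does not.
flow = ToyFlow()
x = ToyParam("x")
for i in range(10):
    flow.add(x)
```

In the real flow this would correspond to binding the parameter to a variable before the loop and passing that variable to `add`; that is an assumption about Prefect's behavior that the thread itself does not confirm.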

cicdw mentioned this issue Oct 1, 2019