Generating workflow based on external input #5297

Closed
pythi0s opened this issue Jan 3, 2022 · 3 comments
Labels
enhancement (An improvement of an existing feature), status:stale (This may not be relevant anymore)

Comments


pythi0s commented Jan 3, 2022

Current behavior

Currently, we have to define tasks using the @task decorator and then build the workflow from those tasks, as described in the documentation. This workflow is static, i.e., we call the tasks as functions and pass parameters between them to create the dependencies. It works great!

Proposed behavior

If you have N tasks defined, is there any way to generate the workflow from external user input instead of defining it statically? This input may contain the required information, such as the root tasks that do not depend on any other task, the tasks that depend on other tasks, and the input for each task, similar to fission-workflow or dapr-workflows.

Example

Let's say I have multiple tasks defined as follows:

from prefect import task

@task
def task_1():
    ...

@task
def task_2():
    ...

# ... further tasks are defined the same way ...

@task
def task_n():
    ...

Now, let's say I have a POST API that receives a JSON body defining the tasks with their inputs and dependencies, something like this:

{
    "tasks": [
        {
            "name": "task_1",
            "input": [
                {
                    "url": "<some url>"
                }
            ],
            "depends": []
        },
        {
            "name": "task_2",
            "input": [
                {
                    "output_of": "task_1"
                }
            ],
            "depends": [
                "task_1"
            ]
        },
        {
            "name": "task_3",
            "input": [
                {
                    "output_of": "task_1"
                }
            ],
            "depends": [
                "task_1"
            ]
        },
        {
            "name": "task_4",
            "input": [
                {
                    "output_of": "task_1"
                },
                {
                    "output_of": "task_2"
                }
            ],
            "depends": [
                "task_1",
                "task_2"
            ]
        },
        {
            "name": "task_5",
            "input": [
                {
                    "output_of": "task_3"
                },
                {
                    "output_of": "task_4"
                }
            ],
            "depends": [
                "task_3",
                "task_4"
            ]
        }
    ]
}
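
A payload like this can be checked before anything is run. The following is a minimal sketch in plain Python, not a Prefect API: it assumes the payload shape shown above, and the helper name `execution_order` is hypothetical. It uses the standard library's `graphlib` module (Python 3.9+) to reject unknown dependencies and cycles and to produce one valid run order.

```python
import json
from graphlib import CycleError, TopologicalSorter


def execution_order(payload: str) -> list[str]:
    """Return one valid run order for the tasks in a JSON payload.

    Hypothetical helper: assumes the
    {"tasks": [{"name", "input", "depends"}]} shape from the example.
    """
    spec = json.loads(payload)
    # Map each task name to the set of task names it depends on.
    graph = {t["name"]: set(t.get("depends", [])) for t in spec["tasks"]}

    # Every dependency must itself be a declared task.
    for name, deps in graph.items():
        unknown = deps - set(graph)
        if unknown:
            raise ValueError(f"{name} depends on unknown tasks: {sorted(unknown)}")

    try:
        # static_order() yields each task after all of its dependencies.
        return list(TopologicalSorter(graph).static_order())
    except CycleError as exc:
        raise ValueError(f"dependency cycle: {exc.args[1]}") from exc


payload = json.dumps({
    "tasks": [
        {"name": "task_1", "input": [{"url": "<some url>"}], "depends": []},
        {"name": "task_2", "input": [{"output_of": "task_1"}], "depends": ["task_1"]},
        {"name": "task_3", "input": [{"output_of": "task_1"}], "depends": ["task_1"]},
    ]
})
order = execution_order(payload)
print(order)  # task_1 comes first; task_2 and task_3 follow in either order
```

Validating up front means a malformed request can be rejected by the API before any task starts.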

If this input could be passed in, I would like to construct the workflow from it by parsing the task list and calling the functions accordingly, instead of defining the workflow statically in code. Is there any way to do this?
Any input or help is appreciated!

NOTE: This input format is deliberately naive; it can be changed to fit the requirements of the Prefect implementation.
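
One way to approach the question, independent of any Prefect feature, is a name-to-function registry plus a loop that runs each task after its dependencies. This is only a sketch under assumptions: plain Python functions stand in for `@task`-decorated ones, the names `REGISTRY` and `run_workflow` are hypothetical, and each `input` entry is assumed to be either a literal value (such as `url`) or an `output_of` reference, passed positionally in the order listed.

```python
from graphlib import TopologicalSorter


# Plain functions stand in for @task-decorated Prefect tasks (assumption).
def task_1(url):
    return f"fetched {url}"


def task_2(upstream):
    return f"{upstream} -> task_2"


def task_4(first, second):
    return [first, second]


# Hypothetical registry mapping the names used in the payload to callables.
REGISTRY = {fn.__name__: fn for fn in (task_1, task_2, task_4)}


def run_workflow(spec: dict) -> dict:
    """Run every task in `spec` after its dependencies; return outputs by name."""
    tasks = {t["name"]: t for t in spec["tasks"]}
    # A task depends on its declared "depends" plus any "output_of" references.
    graph = {
        name: set(t["depends"])
        | {item["output_of"] for item in t["input"] if "output_of" in item}
        for name, t in tasks.items()
    }
    results = {}
    for name in TopologicalSorter(graph).static_order():
        args = []
        for item in tasks[name]["input"]:
            if "output_of" in item:
                # Upstream results are already available at this point.
                args.append(results[item["output_of"]])
            else:
                # Literal inputs such as {"url": ...} are passed through.
                args.extend(item.values())
        results[name] = REGISTRY[name](*args)
    return results


spec = {
    "tasks": [
        {"name": "task_1", "input": [{"url": "<some url>"}], "depends": []},
        {"name": "task_2", "input": [{"output_of": "task_1"}], "depends": ["task_1"]},
        {
            "name": "task_4",
            "input": [{"output_of": "task_1"}, {"output_of": "task_2"}],
            "depends": ["task_1", "task_2"],
        },
    ]
}
results = run_workflow(spec)
print(results["task_4"])
```

The sketch runs tasks sequentially and does no validation; an unknown task name surfaces as a `KeyError`.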

pythi0s added the enhancement label on Jan 3, 2022
Contributor

anna-geller commented Jan 27, 2022

There are many ways you could approach this depending on what problem or use case this should solve.

What your example API request defines is actually not as dynamic as it seems at first glance. In fact, it represents an incredibly simple and static workflow whose dependencies can be described in nothing more than a YAML/JSON file. Prefect features such as mapping, task looping, parametrization, and the flow-of-flows orchestrator pattern allow for much more dynamism.

Having said that, what you are describing is possible today using Orion. In Orion, there is no need for flow pre-registration, and a DAG structure is not required. This means that you can have a deployment pointing to a Python file that defines your flow, including the dependencies between tasks. You can change the tasks, introduce new ones, and specify new dependencies within this flow file, and Orion will automatically detect the new flow structure with no need for extra API calls or re-registration.

So your use case is already possible in Orion, but instead of defining the new tasks/new flow structure via extra API calls, you simply update the underlying Python file defining your flow.

Contributor

github-actions bot commented Mar 5, 2023

This issue is stale because it has been open 30 days with no activity. To keep this issue open remove stale label or comment.

github-actions bot added the status:stale label on Mar 5, 2023
Contributor

This issue was closed because it has been stale for 14 days with no activity. If this issue is important or you have more to add feel free to re-open it.

github-actions bot closed this as not planned on Mar 19, 2023
2 participants