
Configuring different task runner for dev and prod deployments #5560

Closed
marvin-robot opened this issue Mar 16, 2022 · 10 comments
@marvin-robot
Member

Opened from the Prefect Public Slack Community

davzucky: How can I set up the task runner at deployment time in Orion? I want to be able to switch between local dev and prod. I can only see how to do that for flow runners at the deployment level.

anna: It's not possible to do that directly at the moment, since the task runner must be specified on the flow decorator rather than on the DeploymentSpec. And the more I think about it, the more that actually makes sense.

The problem you are describing is not "How can I override the task runner used by a flow on a DeploymentSpec" - that is one possible solution. The actual problem is: "How can I use a different task runner for development and production deployments?" And to solve that problem, the intended approach is to have two different deployments: one for dev and one for prod. The entire dev vs. prod story is not yet fully established - we are working on adding e.g. GitHub storage. A likely good solution to your issue (not possible yet) is to have two different branches on the same repo:
• the "dev" branch may have flow code with, say, DaskTaskRunner,
• the "main" or "prod" branch may have flow code with a different task runner, e.g. ConcurrentTaskRunner,
and then two different deployments - one for dev and one for prod - each referencing the flow code on the corresponding branch. This would give a clear separation of code and environments and would make building CI/CD pipelines much easier.

Having said that, it could be worth exploring some sort of override on the DeploymentSpec (exactly as you mentioned) - I will open an issue to put this up for discussion. The only problem I currently see is that it goes a bit against runtime discoverability in Orion. The way deployments work, you should be able to create a single deployment for your_flow.py and run your deployed flow with, say, DaskTaskRunner. You can then modify the flow code in your_flow.py (referenced as flow_location on the DeploymentSpec), change the task runner to e.g. ConcurrentTaskRunner, and create a flow run of this deployed flow with the new task runner without recreating the deployment, because Orion allows runtime discoverability and doesn't force you to preregister any DAG metadata.

Frankly, even in Prefect 1.0, Prefect doesn't store the executor (effectively the same as a task runner in 2.0) in the backend, because it may contain private information such as your Dask/Ray cluster address. Instead, Prefect retrieves this information at runtime from storage - another reason I lean towards two versions of this flow on dev and prod branches as the solution to this problem.

For now, you could certainly introduce a hack: set a custom parameter value (configurable on your DeploymentSpec) that determines which subflow to call:

from prefect import flow
from prefect.task_runners import DaskTaskRunner, SequentialTaskRunner


@flow(task_runner=DaskTaskRunner())
def dask_flow():
    pass  # your flow logic here - it may be some extra function


@flow(task_runner=SequentialTaskRunner())
def sequential_flow():
    pass  # your flow logic here - it may be some extra function


@flow
def parent_flow(environment: str = "dev"):
    if environment == "dev":
        dask_flow()
    else:
        sequential_flow()
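The dispatch in parent_flow is plain branching on a parameter value; as a standalone sketch (no Prefect required, the returned names are just placeholders for the subflows above):

```python
def choose_subflow(environment: str) -> str:
    # Mirrors parent_flow above: "dev" selects the Dask-backed flow,
    # anything else falls through to the sequential one.
    return "dask_flow" if environment == "dev" else "sequential_flow"


print(choose_subflow("dev"))   # dask_flow
print(choose_subflow("prod"))  # sequential_flow
```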

<@ULVA73B9P> open "Orion: as a user, how can I use a different task runner for development and production deployments?"

Original thread can be found here.

@zanieb zanieb changed the title Orion: as a user, how can I use a different task runner for development and production deployments? Orion: Configuring different task runner for dev and prod deployments Mar 16, 2022
@zanieb
Contributor

zanieb commented Mar 16, 2022

An example workaround

import os

from prefect import flow, task
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import UniversalFlowRunner
from prefect.task_runners import ConcurrentTaskRunner, DaskTaskRunner


@flow(
    task_runner=DaskTaskRunner()
    if os.environ.get("MY_ENV") == "prod"
    else ConcurrentTaskRunner(),
)
def my_flow():
    ...

# Edit: Note flow runners are now infrastructure blocks
prod = DeploymentSpec(
    flow=my_flow, flow_runner=UniversalFlowRunner(env={"MY_ENV": "prod"})
)

dev = DeploymentSpec(
    flow=my_flow, flow_runner=UniversalFlowRunner(env={"MY_ENV": "dev"})
)

We are likely to support setting the task runner from the deployment directly in the future.
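For reference, the conditional in the decorator above can be factored into a small helper that is easy to unit test without a Prefect installation; the helper name is hypothetical:

```python
def runner_name_for(env: dict) -> str:
    # Hypothetical helper mirroring the decorator's conditional:
    # DaskTaskRunner in prod, ConcurrentTaskRunner everywhere else.
    return "DaskTaskRunner" if env.get("MY_ENV") == "prod" else "ConcurrentTaskRunner"


print(runner_name_for({"MY_ENV": "prod"}))  # DaskTaskRunner
print(runner_name_for({}))                  # ConcurrentTaskRunner
```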

@zanieb zanieb changed the title Orion: Configuring different task runner for dev and prod deployments Configuring different task runner for dev and prod deployments Apr 1, 2022
@zanieb zanieb added v2 labels Apr 1, 2022
@mkarbo
Contributor

mkarbo commented Apr 26, 2022

Hi, I have the same issue and ended up going with a similar workaround, though it quickly became verbose because my dask and dask_kubernetes configurations don't fit into 1-3 lines. Is there any ETA on this being supported?

@anna-geller
Contributor

No ETA so far; following this issue is the best way to stay up to date on progress here.

@john-jam
Contributor

john-jam commented Oct 11, 2022

Hi 👋
I tried to replicate this workaround, but I think things have changed in prefect v2 since then (e.g. there is no more DeploymentSpec with a flow_runner arg), and I am curious how we can now achieve the same thing (configure a Task Runner at deployment time)?

@bunchesofdonald
Contributor

bunchesofdonald commented Oct 11, 2022

Hi @john-jam, the pattern @madkinsz showed above should still be valid, changing out the task_runner based on an env var:

@flow(
    task_runner=DaskTaskRunner()
    if os.environ.get("MY_ENV") == "prod"
    else ConcurrentTaskRunner(),
)
def my_flow():
    ...

Then you can set env vars when initializing the Deployment class:

from prefect.deployments import Deployment

from <some_module> import my_flow

deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="my-flow",
    infra_overrides={"env": {"MY_ENV": "prod"}},
    work_queue_name="test",
)

if __name__ == "__main__":
    deployment.apply()

Hopefully that helps.

@john-jam
Contributor

john-jam commented Oct 11, 2022

Thank you for your response @bunchesofdonald !
I ended up using the env option in KubernetesJob infrastructure object directly.

Btw, if someone else is interested: the task_runner now has to be defined in the @flow decorator, not passed as an argument to the flow itself, like this:

@flow(
    task_runner=DaskTaskRunner()
    if os.environ.get("MY_ENV") == "prod"
    else ConcurrentTaskRunner(),
)
def my_flow():
    ...

If not, a serialization error occurs.

@zanieb
Contributor

zanieb commented Oct 11, 2022

Thanks! I've updated my example. I make that mistake often when writing flows that don't actually do anything :)

@cicdw
Member

cicdw commented Sep 3, 2024

Seems like this pattern satisfies the original request, so closing as complete

@cicdw cicdw closed this as completed Sep 3, 2024
@tekumara
Contributor

tekumara commented Sep 3, 2024

It does work, but it runs at import time, which is a bit awkward. If we could avoid import-time logic, that would be cleaner, e.g. a hook that runs after import but before the flow starts.
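One way to sketch that idea in plain Python (a hypothetical pattern, not a Prefect API): wrap the selection in a closure so the environment variable is read when the entrypoint is called rather than when the module is imported:

```python
import os


def deferred_runner(factory_by_env: dict):
    # The env var is read inside pick(), i.e. at call time,
    # so importing this module triggers no selection logic.
    def pick():
        env = os.environ.get("MY_ENV", "dev")
        return factory_by_env.get(env, factory_by_env["dev"])()
    return pick


pick = deferred_runner({
    "dev": lambda: "ConcurrentTaskRunner",
    "prod": lambda: "DaskTaskRunner",
})

os.environ["MY_ENV"] = "prod"
print(pick())  # DaskTaskRunner
```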

@cicdw
Member

cicdw commented Sep 3, 2024

@tekumara would you mind opening a new issue with that as an enhancement request? That's a pretty well scoped ask, I think we can figure something out.
