Task Recursion with DaskTaskRunner in k8s #12973

Open
lewijw opened this issue Jan 19, 2023 · 0 comments
lewijw commented Jan 19, 2023

I am trying to create a flow that works similarly to this Fibonacci example for Dask:

https://distributed.dask.org/en/stable/task-launch.html

Note that it recursively calls the fib function.
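
For reference, the pattern on that page launches new tasks from inside a running task with dask.distributed's worker_client. Roughly (my paraphrase of the docs, not a verbatim copy):

from dask.distributed import worker_client

def fib(n):
    if n < 2:
        return n
    # From inside a running task, borrow a client connected to the
    # scheduler and submit the recursive calls as new tasks.
    with worker_client() as client:
        a = client.submit(fib, n - 1)
        b = client.submit(fib, n - 2)
        a, b = client.gather([a, b])  # wait for both futures
    return a + b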

I translated this into the following Prefect flow, which works in our Prefect k8s deployment. Note that it uses a mix of prefect.flow and dask.delayed:

import time
from dask import delayed, compute
from prefect import flow, task, get_run_logger

@delayed
def fib(n):
    if n < 2:
        return n
    # We can use dask.delayed and dask.compute to launch
    # computation from within tasks
    a = fib(n - 1)  # these calls are delayed
    b = fib(n - 2)
    a, b = compute(a, b)  # execute both in parallel
    return a + b

@flow
def run():
    logger = get_run_logger()

    start = time.time()
    result = fib(10).compute()
    logger.info(f"#{result}")
    end = time.time()
    logger.info(f"Time consumed in working: #{end - start}")

if __name__ == "__main__":
    run()

It seems to work, but in this case Dask appears to be running inside the KubernetesJob pod, presumably because dask.delayed's .compute() runs on a local scheduler in the flow-run process when no distributed client is configured. This is nice, but I am trying to get each task to run in its own Dask worker pod so the work parallelizes across pods. To do this, I understand I need to use the DaskTaskRunner. The following code fails:

import time
from prefect import flow, task, get_run_logger
from prefect_dask import DaskTaskRunner

@task
def fib(n):
    logger = get_run_logger()
    logger.info(f"processing #{n}")
    if n < 2:
        return n
    # We can use dask.delayed and dask.compute to launch
    # computation from within tasks
    a = fib.submit(n - 1)
    b = fib.submit(n - 2)
    return a + b

@flow(task_runner=DaskTaskRunner(address="tcp://mycluster-scheduler.dask-operator:8786"))
def run():
    logger = get_run_logger()

    start = time.time()
    result = fib.submit(10)
    logger.info(f"#{result}")
    end = time.time()
    logger.info(f"Time consumed in working: #{end - start}")

if __name__ == "__main__":
    run()
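
(As an aside, result here is a PrefectFuture rather than a value, which is why the log below prints #PrefectFuture(...). Resolving it in the flow would look something like the line below, though that alone does not explain the abort.)

result = fib.submit(10).result()  # untested sketch: block until the task run finishes and return its value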

The console shows:

INFO Downloading flow code from storage at '/opt/prefect/flows'

INFO #PrefectFuture('fib-bd5fe420-0')

INFO Time consumed in working: #0.0022249221801757812

INFO Created task run 'fib-bd5fe420-0' for task 'fib'

INFO Submitted task run 'fib-bd5fe420-0' for execution.

WARNING Task run '5aca1cd9-c2a9-46d7-a345-784f1acccb4b' received abort during orchestration: This run cannot transition to the RUNNING state from the RUNNING state. Task run is in RUNNING state. (fib-bd5fe420-0)

ERROR Finished in state Failed('1/1 states are not final.')

In the Dask worker log this line was printed:

WARNING | Task run 'fib-bd5fe420-0' - Task run '5aca1cd9-c2a9-46d7-a345-784f1acccb4b' received abort during orchestration: This run cannot transition to the RUNNING state from the RUNNING state. Task run is in RUNNING state.
@desertaxle transferred this issue from PrefectHQ/prefect-dask on Apr 26, 2024