This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Support the Dask operator KubeCluster #41

Conversation

john-jam
Contributor

This PR adds support for Dask's new operator.KubeCluster class. According to the Dask documentation, this is now the preferred way to handle ephemeral clusters, since classic.KubeCluster won't be supported in upcoming releases.

The new operator.KubeCluster class no longer inherits from distributed.deploy.SpecCluster, so the start method is not called directly during instantiation.

Closes PrefectHQ/prefect#12982

Example

With those changes, the following example works:

from prefect import task, flow, get_run_logger
from prefect_dask.task_runners import DaskTaskRunner


@task
def dask_task():
    logger = get_run_logger()
    logger.info("Hello from Dask worker!")


@flow(task_runner=DaskTaskRunner(
    cluster_class="dask_kubernetes.operator.kubecluster.kubecluster.KubeCluster",
    cluster_kwargs={
        "image": "my-docker-image-with-dask-and-prefect",
    },
    adapt_kwargs={"minimum": 1, "maximum": 1},
))
def dask_flow():
    dask_task.submit()


if __name__ == '__main__':
    dask_flow()

Checklist

  • This pull request references any related issue by including "Closes #<ISSUE_NUMBER>"
    • If no issue exists and your change is not a small fix, please create an issue first.
  • This pull request includes tests or only affects documentation.
  • Summarized PR's changes in CHANGELOG.md

Comment on lines +305 to +309
# Depending on the cluster type (Cluster or SpecCluster),
# adapt should or shouldn't be awaited
adapt_response = self._cluster.adapt(**self.adapt_kwargs)
if iscoroutine(adapt_response):
    await adapt_response
Contributor

This will block the event loop if it's not a coroutine. You'll need to use inspect.iscoroutinefunction and then run it in a worker thread if it is not (or check whether it's a Cluster / SpecCluster). This seems like an implementation issue in Dask though?

Contributor Author

@john-jam john-jam Oct 19, 2022

I tried using inspect.iscoroutinefunction on the adapt method itself and asyncio.isfuture on the return value, but adapt is not declared async. Instead, it returns a future for the private _adapt method, produced by the sync method. I was only able to make it work with inspect.iscoroutine.

Also, I'm not sure I understand why we should run it in a worker thread if it's not async. I was trying to keep the same call as the original code here for when it is used with classic.KubeCluster or other cloudprovider.*Instance.

Sorry for my lack of experience with asyncio loops 🙃

Contributor

If you're running on an event loop and have some code that performs IO it must either be async and awaited or sync and run in a worker thread. The event loop is designed to switch quickly between a bunch of different tasks. When you run sync code in the event loop, it cannot switch to other tasks. This "blocks" the loop and causes performance problems.
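
To illustrate the blocking behavior described above, here is a minimal sketch using only the standard library. The names blocking_io, heartbeat, and count_heartbeats are hypothetical and are not part of prefect-dask or Dask; asyncio.to_thread (Python 3.9+) stands in for whatever worker-thread helper the project would actually use.

```python
import asyncio
import contextlib
import time


def blocking_io(seconds: float) -> str:
    # Hypothetical stand-in for synchronous IO, such as a sync adapt() call.
    time.sleep(seconds)
    return "done"


async def heartbeat(ticks: list) -> None:
    # Appends a tick every 20 ms, but only while the event loop is free to run it.
    while True:
        await asyncio.sleep(0.02)
        ticks.append(1)


async def count_heartbeats(use_thread: bool) -> int:
    """Return how many heartbeats ran while blocking_io was in progress."""
    ticks: list = []
    beat = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # give the heartbeat task a chance to start

    if use_thread:
        # Sync code pushed to a worker thread: the loop keeps switching tasks.
        await asyncio.to_thread(blocking_io, 0.3)
    else:
        # Sync code called directly on the loop: nothing else can run meanwhile.
        blocking_io(0.3)

    seen = len(ticks)
    beat.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await beat
    return seen


if __name__ == "__main__":
    print("heartbeats while blocking the loop:", asyncio.run(count_heartbeats(False)))
    print("heartbeats with a worker thread:", asyncio.run(count_heartbeats(True)))
```

In the direct call the heartbeat task never gets a turn until the sleep finishes, which is the "blocked" loop; with the worker thread it keeps ticking.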

It looks like the sync utility checks if asynchronous is set and returns a coroutine (they call it a "future", but it should be a coroutine from looking at the code).
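
A rough sketch of why inspect.iscoroutinefunction misses this case while inspect.iscoroutine catches it. ToyCluster and call_adapt are hypothetical and only mimic the pattern discussed here (a plain def adapt that delegates to a sync helper); this is not Dask's actual implementation.

```python
import asyncio
import inspect


class ToyCluster:
    """Hypothetical stand-in for a cluster class; not Dask's code."""

    def __init__(self, asynchronous: bool):
        self.asynchronous = asynchronous

    async def _adapt(self, minimum: int, maximum: int) -> str:
        return f"adapting between {minimum} and {maximum}"

    def sync(self, func, *args, **kwargs):
        if self.asynchronous:
            # Asynchronous mode: hand the coroutine back for the caller to await.
            return func(*args, **kwargs)
        # Synchronous mode: run to completion and return the plain result.
        return asyncio.run(func(*args, **kwargs))

    def adapt(self, **kwargs):
        # Declared with plain `def`, so it is not a coroutine function.
        return self.sync(self._adapt, **kwargs)


async def call_adapt(cluster: ToyCluster, **kwargs) -> str:
    # Inspect the return value rather than the method itself.
    response = cluster.adapt(**kwargs)
    if inspect.iscoroutine(response):
        response = await response
    return response


if __name__ == "__main__":
    print(inspect.iscoroutinefunction(ToyCluster.adapt))
    print(asyncio.run(call_adapt(ToyCluster(asynchronous=True), minimum=1, maximum=1)))
    print(ToyCluster(asynchronous=False).adapt(minimum=1, maximum=1))
```

Note that the synchronous branch of this toy sync helper cannot be used from inside a running event loop, which is a separate concern from the blocking issue raised in this thread.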

Comment on lines +299 to +302
# If used with the operator implementation of KubeCluster,
# the cluster is not automatically started
if self._cluster.status.value != "running":
    await self._cluster._start()
Contributor

I'm not excited about reaching into a private method here. Is this required to get to an async method?

Contributor Author

You're right, that does not seem like the intended way of starting a cluster, but I didn't find anything relevant in their documentation about using it with the asynchronous arg. If we create the instance without it, this _start method is called. I asked on their forum to better understand how to use the new implementation asynchronously.

Contributor

@ahuang11 ahuang11 Oct 19, 2022

Just some more context: in their __init__:

        if not self.asynchronous:
            self._loop_runner.start()
            self.sync(self._start)

Contributor Author

Yes, but Prefect forces the asynchronous arg to True. Should we allow passing False when used with operator.KubeCluster?

Contributor

It's weird that they don't call _start() in the __aenter__ method when we enter the context above. This seems like an oversight. It feels like we should report this upstream and we should get some clarity on their intent here instead of introducing workarounds.
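
For reference, a minimal sketch of the behavior one might expect here: the constructor starts the cluster for synchronous callers, and __aenter__ starts it for asynchronous ones. ToyAsyncCluster and its methods are hypothetical and only illustrate the pattern; they are not taken from dask-kubernetes.

```python
import asyncio


class ToyAsyncCluster:
    """Hypothetical cluster illustrating the expected pattern; not dask-kubernetes."""

    def __init__(self, asynchronous: bool = False):
        self.asynchronous = asynchronous
        self.status = "created"
        if not self.asynchronous:
            # Synchronous callers get a started cluster straight from the constructor.
            asyncio.run(self._start())

    async def _start(self) -> None:
        self.status = "running"

    async def _close(self) -> None:
        self.status = "closed"

    async def __aenter__(self):
        # Asynchronous callers rely on the context manager to start the cluster,
        # so no caller has to reach into the private _start method.
        if self.status != "running":
            await self._start()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self._close()


async def main() -> str:
    async with ToyAsyncCluster(asynchronous=True) as cluster:
        return cluster.status


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With this shape, a caller that always passes asynchronous=True would not need the status check and the _start() call from the diff above.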

Contributor Author

Sure! Let's wait and see if they reply to the thread I created on their Discourse.

Contributor

If you don't hear anything back, it might be faster to submit a GitHub issue.

Contributor Author

You're right. I'll try to figure out how they should handle the asynchronous start.

@john-jam
Contributor Author

@madkinsz @ahuang11 Hello 👋
The _start method is now called when the asynchronous option is set to True on dask-kubernetes. Should we close this PR and the related issue?

@ahuang11
Contributor

Yes that's fine with me!

@ahuang11 ahuang11 closed this Jan 17, 2023
@john-jam john-jam deleted the feature/dask-operator-kubecluster-support branch June 1, 2023 01:33

Successfully merging this pull request may close these issues.

operator.KubeCuster support?
3 participants