operator.KubeCluster support? #12982

Open
john-jam opened this issue Oct 12, 2022 · 15 comments

Comments

@john-jam
Contributor

john-jam commented Oct 12, 2022

Hi!
When I try to use the new KubeCluster implementation (from the operator module and not the one from the classic module) as follows:

@flow(task_runner=DaskTaskRunner(
    cluster_class="dask_kubernetes.operator.kubecluster.kubecluster.KubeCluster"
))
def test_flow():
    ...

I get the following error:

03:16:41.206 | INFO    | prefect.engine - Created flow run 'mauve-tuna' for flow 'test-flow'
03:16:41.207 | INFO    | prefect.task_runner.dask - Creating a new Dask cluster with `dask_kubernetes.operator.kubecluster.kubecluster.KubeCluster`
03:17:24.025 | ERROR   | Flow run 'mauve-tuna' - Crash detected! Execution was interrupted by an unexpected exception.
Traceback (most recent call last):
  File "/home/john/projects/venv/lib/python3.8/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/home/john/projects/venv/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/home/john/.pyenv/versions/3.8.13/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/home/john/.pyenv/versions/3.8.13/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/home/john/projects/venv/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/home/john/projects/venv/lib/python3.8/site-packages/prefect/client/orion.py", line 82, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/john/projects/venv/lib/python3.8/site-packages/prefect/engine.py", line 232, in create_then_begin_flow_run
    state = await begin_flow_run(
  File "/home/john/projects/venv/lib/python3.8/site-packages/prefect/engine.py", line 360, in begin_flow_run
    flow_run_context.task_runner = await stack.enter_async_context(
  File "/home/john/.pyenv/versions/3.8.13/lib/python3.8/contextlib.py", line 568, in enter_async_context
    result = await _cm_type.__aenter__(cm)
  File "/home/john/.pyenv/versions/3.8.13/lib/python3.8/contextlib.py", line 171, in __aenter__
    return await self.gen.__anext__()
  File "/home/john/projects/venv/lib/python3.8/site-packages/prefect/task_runners.py", line 166, in start
    self._started = False
  File "/home/john/.pyenv/versions/3.8.13/lib/python3.8/contextlib.py", line 679, in __aexit__
    raise exc_details[1]
  File "/home/john/.pyenv/versions/3.8.13/lib/python3.8/contextlib.py", line 662, in __aexit__
    cb_suppress = await cb(*exc_details)
  File "/home/john/projects/venv/lib/python3.8/site-packages/distributed/deploy/cluster.py", line 537, in __aexit__
    await f
  File "/home/john/projects/venv/lib/python3.8/site-packages/dask_kubernetes/operator/kubecluster/kubecluster.py", line 540, in _close
    await custom_objects_api.delete_namespaced_custom_object(
  File "/home/john/projects/venv/lib/python3.8/site-packages/kubernetes_asyncio/client/api_client.py", line 185, in __call_api
    response_data = await self.request(
  File "/home/john/projects/venv/lib/python3.8/site-packages/kubernetes_asyncio/client/rest.py", line 220, in DELETE
    return (await self.request("DELETE", url,
  File "/home/john/projects/venv/lib/python3.8/site-packages/kubernetes_asyncio/client/rest.py", line 177, in request
    r = await self.pool_manager.request(**args)
  File "/home/john/projects/venv/lib/python3.8/site-packages/aiohttp/client.py", line 508, in _request
    req = self._request_class(
  File "/home/john/projects/venv/lib/python3.8/site-packages/aiohttp/client_reqrep.py", line 305, in __init__
    self.update_host(url)
  File "/home/john/projects/venv/lib/python3.8/site-packages/aiohttp/client_reqrep.py", line 364, in update_host
    raise InvalidURL(url)
aiohttp.client_exceptions.InvalidURL: /apis/kubernetes.dask.org/v1/namespaces/prefect/daskclusters/dask-john-5d3b8ba4-4
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/home/john/projects/venv/lib/python3.8/site-packages/dask_kubernetes/operator/kubecluster/kubecluster.py", line 848, in reap_clusters
    loop = asyncio.get_event_loop()
  File "/home/john/.pyenv/versions/3.8.13/lib/python3.8/asyncio/events.py", line 639, in get_event_loop
    raise RuntimeError('There is no current event loop in thread %r.'
RuntimeError: There is no current event loop in thread 'MainThread'.

Process finished with exit code 1

It seems the DaskTaskRunner tries to close the cluster before starting it.
Is this new class supported by prefect yet? If so, do you have a working example?

@ahuang11
Contributor

It seems to error on:
aiohttp.client_exceptions.InvalidURL: /apis/kubernetes.dask.org/v1/namespaces/prefect/daskclusters/dask-john-5d3b8ba4-4

Can you double check if this URL is valid?

@john-jam
Contributor Author

john-jam commented Oct 13, 2022

Hi @ahuang11 ! Thanks for your response.
This URL is indeed invalid because it corresponds to the "future" Dask cluster that has not been created yet. The id here, dask-john-5d3b8ba4-4, is different for each flow run and no corresponding Dask cluster is ever created.

I did some tests and found out that this line triggers the close, because this error is raised:

Traceback (most recent call last):
  File "/home/john/projects/venv/lib/python3.8/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/home/john/projects/venv/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/home/john/.pyenv/versions/3.8.13/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/home/john/.pyenv/versions/3.8.13/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/home/john/projects/venv/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/home/john/projects/venv/lib/python3.8/site-packages/prefect/client/orion.py", line 82, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/john/projects/venv/lib/python3.8/site-packages/prefect/engine.py", line 232, in create_then_begin_flow_run
    state = await begin_flow_run(
  File "/home/john/projects/venv/lib/python3.8/site-packages/prefect/engine.py", line 360, in begin_flow_run
    flow_run_context.task_runner = await stack.enter_async_context(
  File "/home/john/.pyenv/versions/3.8.13/lib/python3.8/contextlib.py", line 568, in enter_async_context
    result = await _cm_type.__aenter__(cm)
  File "/home/john/.pyenv/versions/3.8.13/lib/python3.8/contextlib.py", line 171, in __aenter__
    return await self.gen.__anext__()
  File "/home/john/projects/venv/lib/python3.8/site-packages/prefect/task_runners.py", line 161, in start
    await self._start(exit_stack)
  File "/home/john/projects/venv/lib/python3.8/site-packages/prefect_dask/task_runners.py", line 308, in _start
    raise e
  File "/home/john/projects/venv/lib/python3.8/site-packages/prefect_dask/task_runners.py", line 301, in _start
    self._client = await exit_stack.enter_async_context(
  File "/home/john/.pyenv/versions/3.8.13/lib/python3.8/contextlib.py", line 568, in enter_async_context
    result = await _cm_type.__aenter__(cm)
  File "/home/john/projects/venv/lib/python3.8/site-packages/distributed/client.py", line 1398, in __aenter__
    await self
  File "/home/john/projects/venv/lib/python3.8/site-packages/distributed/client.py", line 1213, in _start
    await self._ensure_connected(timeout=timeout)
  File "/home/john/projects/venv/lib/python3.8/site-packages/distributed/client.py", line 1276, in _ensure_connected
    comm = await connect(
  File "/home/john/projects/venv/lib/python3.8/site-packages/distributed/comm/core.py", line 291, in connect
    comm = await asyncio.wait_for(
  File "/home/john/.pyenv/versions/3.8.13/lib/python3.8/asyncio/tasks.py", line 494, in wait_for
    return fut.result()
  File "/home/john/projects/venv/lib/python3.8/site-packages/distributed/comm/tcp.py", line 487, in connect
    ip, port = parse_host_port(address)
  File "/home/john/projects/venv/lib/python3.8/site-packages/distributed/comm/addressing.py", line 95, in parse_host_port
    port = _default()
  File "/home/john/projects/venv/lib/python3.8/site-packages/distributed/comm/addressing.py", line 73, in _default
    raise ValueError(f"missing port number in address {address!r}")
ValueError: missing port number in address '<Not Connected>'

My guess is that the cluster address passed to the client is '<Not Connected>' because the _start method of operator.KubeCluster is somehow not called when this line runs in the DaskTaskRunner.

With classic.KubeCluster, the _start method is called before creating the client.
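To make the failure mode concrete, here is a rough sketch (simplified, not the actual prefect_dask implementation) of the path the runner takes; the cluster object is handed straight to the Client, so if it never left Status.created its scheduler address is still the "<Not Connected>" placeholder that parse_host_port chokes on:

from distributed import Client

async def simplified_start(cluster_class, **cluster_kwargs):
    # roughly what exit_stack.enter_async_context(cluster_class(...)) does
    cluster = cluster_class(asynchronous=True, **cluster_kwargs)
    await cluster.__aenter__()
    # classic.KubeCluster (a SpecCluster) is running at this point;
    # operator.KubeCluster apparently stays in Status.created, so
    # cluster.scheduler_address is still "<Not Connected>" and the Client
    # fails with "missing port number in address"
    client = await Client(cluster, asynchronous=True)
    return client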

@ahuang11
Contributor

ahuang11 commented Oct 13, 2022

I think _start should be called in

self._cluster = await exit_stack.enter_async_context(
    self.cluster_class(asynchronous=True, **self.cluster_kwargs)
)

which is equivalent to

async with self.cluster_class(asynchronous=True, **self.cluster_kwargs) as self._cluster:
    ...

and in your case

async with KubeCluster(asynchronous=True, **self.cluster_kwargs) as self._cluster:
...

Can you try whether running this works:

async with KubeCluster(asynchronous=True, **self.cluster_kwargs) as cluster:
    print(cluster._started)   # or the equivalent attribute to check whether a cluster started

@john-jam
Contributor Author

When I try this with self.cluster_class as operator.KubeCluster:

async with self.cluster_class(asynchronous=True, **self.cluster_kwargs) as cluster:
    print(cluster.status)
    print(cluster.scheduler_address)

It outputs:

Status.created
<Not Connected>

and the _start method is not called. It is called when using the classic.KubeCluster.

Maybe it's unrelated, but classic.KubeCluster inherits from distributed.deploy.SpecCluster (the call to _start seems to be done in here), while operator.KubeCluster inherits from distributed.deploy.Cluster.
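A quick way to confirm that difference from a Python shell (import paths are from the 2022.x dask_kubernetes layout and may differ in other versions):

from distributed.deploy import Cluster, SpecCluster
from dask_kubernetes.classic import KubeCluster as ClassicKubeCluster
from dask_kubernetes.operator import KubeCluster as OperatorKubeCluster

print(issubclass(ClassicKubeCluster, SpecCluster))   # True: gets _start handling via SpecCluster
print(issubclass(OperatorKubeCluster, SpecCluster))  # False: it only derives from Cluster
print(issubclass(OperatorKubeCluster, Cluster))      # True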

@ahuang11
Contributor

Yes, it seems like it doesn't get started; only gets "created"
https://docs.dask.org/en/stable/_modules/distributed/deploy/cluster.html

    async def __aenter__(self):
        await self
        return self

Maybe you can call "dask_kubernetes.operator.kubecluster.kubecluster.KubeCluster" outside of Prefect, e.g.

cluster = dask_kubernetes.operator.kubecluster.kubecluster.KubeCluster()
cluster.start()  # or however the cluster gets started manually

@flow(task_runner=DaskTaskRunner(address=cluster.scheduler_address))
...

@ahuang11
Contributor

ahuang11 commented Oct 14, 2022

If that worked for you, maybe we can add a warning about manually starting the cluster here, under an if self._cluster.status != "started" check (probably not the right string to check against)

@john-jam
Contributor Author

We can add a warning, or call the _start method directly if the status is not running; what do you think?

I made it work with the following updates after this line:

from distributed.core import Status
import asyncio
...
            self._connect_to = self._cluster = await exit_stack.enter_async_context(
                self.cluster_class(asynchronous=True, **self.cluster_kwargs)
            )
            
            # If used with the operator implementation of KubeCluster, the cluster is not automatically started
            if self._cluster.status is not Status.running:
                await self._cluster._start()

            if self.adapt_kwargs:
                # Depending on the cluster type (Cluster or SpecCluster), adapt returns a future or not
                adapt_response = self._cluster.adapt(**self.adapt_kwargs)
                if asyncio.isfuture(adapt_response):
                    await adapt_response
...

Let me know if you want me to create a PR!

@ahuang11
Contributor

ahuang11 commented Oct 15, 2022

I think this could work. My only concern is that once this cluster is started, it won't be closed (unlike with the async exit context), so maybe we should add a warning that we started it and that it's up to the user to close it. And I would appreciate a PR!

@john-jam
Contributor Author

john-jam commented Oct 15, 2022

Actually I think they handle the cluster cleanup with this method.

The only problem is that, when used with prefect, we get the following error:

Traceback (most recent call last):
  File "/home/john/projects/venv/lib/python3.8/site-packages/dask_kubernetes/operator/kubecluster/kubecluster.py", line 845, in reap_clusters
    loop = asyncio.get_event_loop()
  File "/home/john/.pyenv/versions/3.8.13/lib/python3.8/asyncio/events.py", line 639, in get_event_loop
    raise RuntimeError('There is no current event loop in thread %r.'
RuntimeError: There is no current event loop in thread 'MainThread'.

It seems this line can't get the current event loop when used with prefect. If I replace it with loop = asyncio.new_event_loop() it works. If you have a workaround, I am all ears!
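For context, here is a minimal sketch of the pattern involved (simplified, not the actual dask_kubernetes code; the cluster registry is hypothetical), showing both the failure and the new_event_loop workaround:

import asyncio
import atexit

_clusters = set()  # hypothetical registry of clusters to reap on exit

@atexit.register
def reap_clusters():
    async def _reap():
        for cluster in list(_clusters):
            await cluster.close()

    # asyncio.get_event_loop() raises "There is no current event loop in thread
    # 'MainThread'" here once the caller's loop has already been closed;
    # creating a fresh loop is the workaround mentioned above.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(_reap())
    finally:
        loop.close()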

On a side note, their dask-kubernetes-operator pod already cleans up the clusters properly, so this is maybe not necessary.

@ahuang11
Contributor

Can you provide an example of how reap_clusters is called? Is it called automatically or do we have to invoke it manually?

@zanieb
Contributor

zanieb commented Oct 17, 2022

With the error

RuntimeError: There is no current event loop in thread 'MainThread'.

Where in Prefect's code is this code being called? We run several event loops and there is often one present.

@john-jam
Contributor Author

Can you provide an example of how reap_clusters is called? Is it called automatically or do we have to invoke it manually?
Where in Prefect's code is this code being called? We run several event loops and there is often one present.

I think this reap_clusters method is not called directly by prefect or prefect_dask, but instead automatically on exit with the help of the @atexit.register decorator here. When debugging, the asyncio loop gets closed before reaching this code. Maybe the dask_kubernetes package should handle a closed loop?

I created a draft PR with the _start and adapt calls handled for the operator.KubeCluster implementation. I still need to add tests and maybe warnings as suggested. With the example in the PR, the flow is executed properly on an ephemeral cluster with 1 worker and the cluster is cleaned up by the dask-kubernetes-operator pod, but I obviously still hit the There is no current event loop in thread 'MainThread' error.

@ahuang11
Contributor

ahuang11 commented Oct 18, 2022

That's awesome, TIL about the atexit decorator!

Thanks for making the PR too. Maybe we can wrap that logic you have and replace self.cluster_class with a custom context manager that handles starting and closing, e.g.

from contextlib import asynccontextmanager
from inspect import iscoroutine

@asynccontextmanager
async def _cluster_start(self):
    cluster = self.cluster_class(asynchronous=True, **self.cluster_kwargs)
    try:
        # If used with the operator implementation of KubeCluster,
        # the cluster is not automatically started
        if cluster.status.value != "running":
            await cluster._start()
        if self.adapt_kwargs:
            # Depending on the cluster type (Cluster or SpecCluster),
            # adapt should or shouldn't be awaited
            adapt_response = cluster.adapt(**self.adapt_kwargs)
            if iscoroutine(adapt_response):
                await adapt_response
        yield cluster
    finally:
        await cluster.close()

...
            self._connect_to = self._cluster = await exit_stack.enter_async_context(
                self._cluster_start()
            )

@john-jam
Contributor Author

Sure, I'll add it and add some tests.

@zanieb
Contributor

zanieb commented Oct 18, 2022

Regarding the atexit issue, I've got an old open pull request in distributed to fix it: dask/distributed#5471
