From 921209755da62081539ed947e1b605573c4f4024 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Thu, 12 May 2022 16:34:02 +0100 Subject: [PATCH 1/4] Release 2022.5.0 From 76096419385bc626778bec213e5191e7895948aa Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Fri, 13 May 2022 17:13:04 +0100 Subject: [PATCH 2/4] Add finalizer to shutdown experimental KubeCluster --- dask_kubernetes/experimental/kubecluster.py | 24 +++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/dask_kubernetes/experimental/kubecluster.py b/dask_kubernetes/experimental/kubecluster.py index c635f99bc..697691339 100644 --- a/dask_kubernetes/experimental/kubecluster.py +++ b/dask_kubernetes/experimental/kubecluster.py @@ -1,11 +1,18 @@ +from __future__ import annotations + import asyncio +import atexit +from contextlib import suppress from enum import Enum +from typing import ClassVar +import weakref + import kubernetes_asyncio as kubernetes -from distributed.core import rpc +from distributed.core import Status, rpc from distributed.deploy import Cluster -from distributed.utils import Log, Logs, LoopRunner +from distributed.utils import Log, Logs, LoopRunner, TimeoutError from dask_kubernetes.common.auth import ClusterAuth from dask_kubernetes.operator import ( @@ -103,6 +110,8 @@ class KubeCluster(Cluster): KubeCluster.from_name """ + _instances: ClassVar[weakref.WeakSet[KubeCluster]] = weakref.WeakSet() + def __init__( self, name, @@ -133,6 +142,8 @@ def __init__( self._loop_runner = LoopRunner(loop=loop, asynchronous=asynchronous) self.loop = self._loop_runner.loop + self._instances.add(self) + super().__init__(asynchronous=asynchronous, **kwargs) if not self.asynchronous: self._loop_runner.start() @@ -537,3 +548,12 @@ def from_name(cls, name, **kwargs): >>> cluster = KubeCluster.from_name(name="simple-cluster") """ return cls(name=name, create_mode=CreateMode.CONNECT_ONLY, **kwargs) + + +@atexit.register +def close_clusters(): + for cluster in list(KubeCluster._instances): + if cluster.shutdown_on_close: + with suppress(TimeoutError): + if cluster.status != Status.closed: + cluster.close(timeout=10) From 192c2f7861e668e3ba569ba8313e7e91b9594f59 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Fri, 13 May 2022 17:36:52 +0100 Subject: [PATCH 3/4] Add finalizer to reap clusters when Python exits --- dask_kubernetes/experimental/kubecluster.py | 26 ++++++++++++++------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/dask_kubernetes/experimental/kubecluster.py b/dask_kubernetes/experimental/kubecluster.py index 697691339..28595da50 100644 --- a/dask_kubernetes/experimental/kubecluster.py +++ b/dask_kubernetes/experimental/kubecluster.py @@ -4,6 +4,7 @@ import atexit from contextlib import suppress from enum import Enum +import time from typing import ClassVar import weakref @@ -374,11 +375,11 @@ async def _delete_worker_group(self, name): name=f"{self.name}-cluster-{name}", ) - def close(self): + def close(self, timeout=3600): """Delete the dask cluster""" - return self.sync(self._close) + return self.sync(self._close, timeout=timeout) - async def _close(self): + async def _close(self, timeout=None): await super()._close() if self.shutdown_on_close: async with kubernetes.client.api_client.ApiClient() as api_client: @@ -390,7 +391,12 @@ async def _close(self): namespace=self.namespace, name=self.cluster_name, ) + start = time.time() while (await self._get_cluster()) is not None: + if time.time() > start + timeout: + raise TimeoutError( + f"Timed out deleting cluster resource {self.cluster_name}" + ) await asyncio.sleep(1) def scale(self, n, worker_group="default"): @@ -551,9 +557,13 @@ def from_name(cls, name, **kwargs): @atexit.register -def close_clusters(): - for cluster in list(KubeCluster._instances): - if cluster.shutdown_on_close: - with suppress(TimeoutError): - if cluster.status != Status.closed: +def reap_clusters(): + async def _reap_clusters(): + for cluster in list(KubeCluster._instances): + if cluster.shutdown_on_close and cluster.status != Status.closed: + await ClusterAuth.load_first(cluster.auth) + with suppress(TimeoutError): cluster.close(timeout=10) + + loop = asyncio.get_event_loop() + loop.run_until_complete(_reap_clusters()) From da5b89ab53e45886b0c0bc640c1f87684cbf2546 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Fri, 13 May 2022 17:40:14 +0100 Subject: [PATCH 4/4] Handle async clusters --- dask_kubernetes/experimental/kubecluster.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dask_kubernetes/experimental/kubecluster.py b/dask_kubernetes/experimental/kubecluster.py index 28595da50..e1aee1420 100644 --- a/dask_kubernetes/experimental/kubecluster.py +++ b/dask_kubernetes/experimental/kubecluster.py @@ -563,7 +563,10 @@ async def _reap_clusters(): if cluster.shutdown_on_close and cluster.status != Status.closed: await ClusterAuth.load_first(cluster.auth) with suppress(TimeoutError): - cluster.close(timeout=10) + if cluster.asynchronous: + await cluster.close(timeout=10) + else: + cluster.close(timeout=10) loop = asyncio.get_event_loop() loop.run_until_complete(_reap_clusters())