Skip to content

Commit

Permalink
Migrate KubeCluster close to kr8s (#791)
Browse files Browse the repository at this point in the history
* Use kr8s to delete a DaskCluster on close

* Fix import
  • Loading branch information
jacobtomlinson committed Jul 31, 2023
1 parent 1c81504 commit e730a89
Showing 1 changed file with 16 additions and 24 deletions.
40 changes: 16 additions & 24 deletions dask_kubernetes/operator/kubecluster/kubecluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,8 @@
TimeoutError,
format_dashboard_link,
)
from kubernetes_asyncio.client.exceptions import ApiException

from dask_kubernetes.common.auth import ClusterAuth

from dask_kubernetes.common.networking import (
get_scheduler_address,
wait_for_scheduler,
Expand All @@ -45,12 +43,16 @@
from dask_kubernetes.common.utils import get_current_namespace
from dask_kubernetes.aiopykube import HTTPClient, KubeConfig
from dask_kubernetes.aiopykube.dask import (
DaskCluster,
DaskCluster as AIODaskCluster,
DaskWorkerGroup as AIODaskWorkerGroup,
)
from dask_kubernetes.aiopykube.objects import Pod, Service
from dask_kubernetes.exceptions import CrashLoopBackOffError, SchedulerStartupError
from dask_kubernetes.operator._objects import DaskWorkerGroup, DaskAutoscaler
from dask_kubernetes.operator._objects import (
DaskCluster,
DaskWorkerGroup,
DaskAutoscaler,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -485,7 +487,7 @@ async def _wait_for_controller(self):
"""Wait for the operator to set the status.phase."""
start = time.time()
while start + self._resource_timeout > time.time():
cluster = await DaskCluster.objects(
cluster = await AIODaskCluster.objects(
self.k8s_api, namespace=self.namespace
).get_by_name(self.name)
if (
Expand All @@ -503,7 +505,7 @@ async def _watch_component_status(self):
while True:
# Get DaskCluster status
with suppress(pykube.exceptions.ObjectDoesNotExist):
cluster = await DaskCluster.objects(
cluster = await AIODaskCluster.objects(
self.k8s_api, namespace=self.namespace
).get_by_name(self.name)
self._startup_component_status["cluster"] = cluster.obj["status"][
Expand Down Expand Up @@ -743,25 +745,15 @@ def close(self, timeout=3600):
async def _close(self, timeout=3600):
await super()._close()
if self.shutdown_on_close:
async with kubernetes.client.api_client.ApiClient() as api_client:
custom_objects_api = kubernetes.client.CustomObjectsApi(api_client)
try:
await custom_objects_api.delete_namespaced_custom_object(
group="kubernetes.dask.org",
version="v1",
plural="daskclusters",
namespace=self.namespace,
name=self.name,
)
except ApiException as e:
if e.reason == "Not Found":
logger.warning(
"Failed to delete DaskCluster, looks like it has already been deleted."
)
else:
raise
cluster = await DaskCluster(self.name, namespace=self.namespace)
try:
await cluster.delete()
except kr8s.NotFoundError:
logger.warning(
"Failed to delete DaskCluster, looks like it has already been deleted."
)
start = time.time()
while (await self._get_cluster()) is not None:
while await cluster.exists():
if time.time() > start + timeout:
raise TimeoutError(
f"Timed out deleting cluster resource {self.name}"
Expand Down

0 comments on commit e730a89

Please sign in to comment.