Skip to content

Commit

Permalink
Small changes form kr8s kubecluster migration (#786)
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobtomlinson committed Jul 28, 2023
1 parent 1592f89 commit 6b632ab
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 5 deletions.
4 changes: 3 additions & 1 deletion dask_kubernetes/common/networking.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

from dask_kubernetes.common.utils import check_dependency
from dask_kubernetes.aiopykube.objects import Pod
from dask_kubernetes.aiopykube import HTTPClient, KubeConfig
from dask_kubernetes.exceptions import CrashLoopBackOffError


Expand Down Expand Up @@ -193,8 +194,9 @@ async def get_scheduler_address(
return address


async def wait_for_scheduler(api, cluster_name, namespace, timeout=None):
async def wait_for_scheduler(cluster_name, namespace, timeout=None):
pod_start_time = None
api = HTTPClient(KubeConfig.from_env())
while True:
async with kubernetes.client.api_client.ApiClient() as api_client:
k8s_api = kubernetes.client.CoreV1Api(api_client)
Expand Down
8 changes: 8 additions & 0 deletions dask_kubernetes/operator/_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,14 @@ async def scheduler_service(self) -> Service:
assert len(services) == 1
return services[0]

async def ready(self) -> bool:
await self._refresh()
return (
"status" in self.raw
and "phase" in self.status
and self.status.phase == "Running"
)


class DaskWorkerGroup(APIObject):
version = "kubernetes.dask.org/v1"
Expand Down
3 changes: 1 addition & 2 deletions dask_kubernetes/operator/kubecluster/kubecluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,6 @@ async def _create_cluster(self):
try:
self._log("Waiting for scheduler pod")
await wait_for_scheduler(
self.k8s_api,
self.name,
self.namespace,
timeout=self._resource_timeout,
Expand Down Expand Up @@ -434,7 +433,7 @@ async def _connect_cluster(self):
service_name = f"{cluster_spec['metadata']['name']}-scheduler"
self._log("Waiting for scheduler pod")
await wait_for_scheduler(
self.k8s_api, self.name, self.namespace, timeout=self._resource_timeout
self.name, self.namespace, timeout=self._resource_timeout
)
self._log("Waiting for scheduler service")
await wait_for_service(core_api, service_name, self.namespace)
Expand Down
2 changes: 1 addition & 1 deletion requirements-test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ flake8>=3.7
black>=18.9b0
dask-ctl>=2021.3.0
pytest>=7.1
git+https://codeberg.org/hjacobs/pytest-kind.git
pytest-kind
pytest-timeout
pytest-rerunfailures
git+https://github.com/elemental-lf/k8s-crd-resolver@v0.14.0
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ kubernetes-asyncio>=12.0.1
kopf>=1.35.3
pykube-ng>=22.9.0
rich>=12.5.1
kr8s==0.8.6
kr8s==0.8.7

0 comments on commit 6b632ab

Please sign in to comment.