diff --git a/dask_kubernetes/classic/kubecluster.py b/dask_kubernetes/classic/kubecluster.py index f720787fc..5adca3406 100644 --- a/dask_kubernetes/classic/kubecluster.py +++ b/dask_kubernetes/classic/kubecluster.py @@ -12,7 +12,7 @@ import dask.distributed import distributed.security from distributed.deploy import SpecCluster, ProcessInterface -from distributed.utils import Log, Logs +from distributed.utils import format_dashboard_link, Log, Logs import kubernetes_asyncio as kubernetes from kubernetes_asyncio.client.rest import ApiException @@ -30,7 +30,10 @@ namespace_default, escape, ) -from ..common.networking import get_external_address_for_scheduler_service +from ..common.networking import ( + get_external_address_for_scheduler_service, + port_forward_dashboard, +) logger = logging.getLogger(__name__) @@ -495,6 +498,11 @@ def __init__( self.kwargs = kwargs super().__init__(**self.kwargs) + @property + def dashboard_link(self): + host = self.scheduler_address.split("://")[1].split("/")[0].split(":")[0] + return format_dashboard_link(host, self.forwarded_dashboard_port) + def _get_pod_template(self, pod_template, pod_type): if not pod_template and dask.config.get( "kubernetes.{}-template".format(pod_type), None @@ -626,6 +634,10 @@ async def _start(self): await super()._start() + self.forwarded_dashboard_port = await port_forward_dashboard( + self.name, self.namespace + ) + @classmethod def from_dict(cls, pod_spec, **kwargs): """Create cluster with worker pod spec defined by Python dictionary diff --git a/dask_kubernetes/common/networking.py b/dask_kubernetes/common/networking.py index 48b1def82..95ea501f8 100644 --- a/dask_kubernetes/common/networking.py +++ b/dask_kubernetes/common/networking.py @@ -5,7 +5,6 @@ import time from weakref import finalize -from dask.distributed import Client import kubernetes_asyncio as kubernetes from .utils import check_dependency @@ -89,16 +88,22 @@ async def port_forward_service(service_name, namespace, remote_port, local_port= async def is_comm_open(ip, port, retries=10): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) while retries > 0: - try: - async with Client(f"tcp://{ip}:{port}", asynchronous=True, timeout=2): - return True - except Exception: - time.sleep(0.5) + result = sock.connect_ex((ip, port)) + if result == 0: + return True + else: + time.sleep(2) retries -= 1 return False +async def port_forward_dashboard(service_name, namespace): + port = await port_forward_service(service_name, namespace, 8787) + return port + + async def get_scheduler_address(service_name, namespace): async with kubernetes.client.api_client.ApiClient() as api_client: api = kubernetes.client.CoreV1Api(api_client) diff --git a/dask_kubernetes/experimental/kubecluster.py b/dask_kubernetes/experimental/kubecluster.py index e1aee1420..34c430712 100644 --- a/dask_kubernetes/experimental/kubecluster.py +++ b/dask_kubernetes/experimental/kubecluster.py @@ -12,8 +12,14 @@ from distributed.core import Status, rpc from distributed.deploy import Cluster - -from distributed.utils import Log, Logs, LoopRunner, TimeoutError +from distributed.utils import ( + Log, + Logs, + LoopRunner, + TimeoutError, + LoopRunner, + format_dashboard_link, +) from dask_kubernetes.common.auth import ClusterAuth from dask_kubernetes.operator import ( @@ -23,6 +29,7 @@ from dask_kubernetes.common.networking import ( get_scheduler_address, + port_forward_dashboard, wait_for_scheduler, ) @@ -154,6 +161,11 @@ def __init__( def cluster_name(self): return f"{self.name}-cluster" + @property + def dashboard_link(self): + host = self.scheduler_address.split("://")[1].split("/")[0].split(":")[0] + return format_dashboard_link(host, self.forwarded_dashboard_port) + async def _start(self): await ClusterAuth.load_first(self.auth) cluster_exists = (await self._get_cluster()) is not None @@ -203,6 +215,9 @@ async def _create_cluster(self): await wait_for_scheduler(cluster_name, self.namespace) await wait_for_service(core_api, f"{cluster_name}-service", self.namespace) self.scheduler_comm = rpc(await self._get_scheduler_address()) + self.forwarded_dashboard_port = await port_forward_dashboard( + f"{self.name}-cluster-service", self.namespace + ) async def _connect_cluster(self): if self.shutdown_on_close is None: @@ -222,6 +237,9 @@ async def _connect_cluster(self): await wait_for_scheduler(self.cluster_name, self.namespace) await wait_for_service(core_api, service_name, self.namespace) self.scheduler_comm = rpc(await self._get_scheduler_address()) + self.forwarded_dashboard_port = await port_forward_dashboard( + f"{self.name}-cluster-service", self.namespace + ) async def _get_cluster(self): async with kubernetes.client.api_client.ApiClient() as api_client: