From 99e917a008c8c22ac5b1796776ce3f061b73056d Mon Sep 17 00:00:00 2001 From: Mike McCarty Date: Mon, 16 May 2022 16:07:14 -0400 Subject: [PATCH 1/5] Port Forward Dashboard in experimental and classic KubeCluster --- dask_kubernetes/classic/kubecluster.py | 9 ++++++++- dask_kubernetes/common/networking.py | 16 +++++++++++----- dask_kubernetes/experimental/kubecluster.py | 7 +++++++ 3 files changed, 26 insertions(+), 6 deletions(-) diff --git a/dask_kubernetes/classic/kubecluster.py b/dask_kubernetes/classic/kubecluster.py index f720787fc..c4c4cd09c 100644 --- a/dask_kubernetes/classic/kubecluster.py +++ b/dask_kubernetes/classic/kubecluster.py @@ -30,7 +30,10 @@ namespace_default, escape, ) -from ..common.networking import get_external_address_for_scheduler_service +from ..common.networking import ( + get_external_address_for_scheduler_service, + port_forward_dashboard, +) logger = logging.getLogger(__name__) @@ -213,6 +216,10 @@ async def start(self, **kwargs): service_name_resolution_retries=self._service_name_retries, ) + await port_forward_dashboard( + self.service.metadata.name, + self.namespace) + self.pdb = await self._create_pdb() async def close(self, **kwargs): diff --git a/dask_kubernetes/common/networking.py b/dask_kubernetes/common/networking.py index 48b1def82..ca5ebe028 100644 --- a/dask_kubernetes/common/networking.py +++ b/dask_kubernetes/common/networking.py @@ -89,16 +89,22 @@ async def port_forward_service(service_name, namespace, remote_port, local_port= async def is_comm_open(ip, port, retries=10): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) while retries > 0: - try: - async with Client(f"tcp://{ip}:{port}", asynchronous=True, timeout=2): - return True - except Exception: - time.sleep(0.5) + result = sock.connect_ex((ip, port)) + if result == 0: + return True + else: + time.sleep(2) retries -= 1 return False +async def port_forward_dashboard(service_name, namespace): + port = await port_forward_service(service_name, namespace, 8787, 8787) + return port + + async def get_scheduler_address(service_name, namespace): async with kubernetes.client.api_client.ApiClient() as api_client: api = kubernetes.client.CoreV1Api(api_client) diff --git a/dask_kubernetes/experimental/kubecluster.py b/dask_kubernetes/experimental/kubecluster.py index c635f99bc..ba452a899 100644 --- a/dask_kubernetes/experimental/kubecluster.py +++ b/dask_kubernetes/experimental/kubecluster.py @@ -15,6 +15,7 @@ from dask_kubernetes.common.networking import ( get_scheduler_address, + port_forward_dashboard, wait_for_scheduler, ) @@ -191,6 +192,9 @@ async def _create_cluster(self): await wait_for_scheduler(cluster_name, self.namespace) await wait_for_service(core_api, f"{cluster_name}-service", self.namespace) self.scheduler_comm = rpc(await self._get_scheduler_address()) + await port_forward_dashboard( + f"{self.name}-cluster-service", + self.namespace) async def _connect_cluster(self): if self.shutdown_on_close is None: @@ -210,6 +214,9 @@ async def _connect_cluster(self): await wait_for_scheduler(self.cluster_name, self.namespace) await wait_for_service(core_api, service_name, self.namespace) self.scheduler_comm = rpc(await self._get_scheduler_address()) + await port_forward_dashboard( + f"{self.name}-cluster-service", + self.namespace) async def _get_cluster(self): async with kubernetes.client.api_client.ApiClient() as api_client: From 3a844d2825a3245c1530e34c6d2f0b9ceed5f905 Mon Sep 17 00:00:00 2001 From: Mike McCarty Date: Tue, 17 May 2022 08:12:07 -0400 Subject: [PATCH 2/5] flake8 and black --- dask_kubernetes/classic/kubecluster.py | 4 +--- dask_kubernetes/common/networking.py | 1 - dask_kubernetes/experimental/kubecluster.py | 8 ++------ 3 files changed, 3 insertions(+), 10 deletions(-) diff --git a/dask_kubernetes/classic/kubecluster.py b/dask_kubernetes/classic/kubecluster.py index c4c4cd09c..af98fc213 100644 --- a/dask_kubernetes/classic/kubecluster.py +++ b/dask_kubernetes/classic/kubecluster.py @@ -216,9 +216,7 @@ async def start(self, **kwargs): service_name_resolution_retries=self._service_name_retries, ) - await port_forward_dashboard( - self.service.metadata.name, - self.namespace) + await port_forward_dashboard(self.service.metadata.name, self.namespace) self.pdb = await self._create_pdb() diff --git a/dask_kubernetes/common/networking.py b/dask_kubernetes/common/networking.py index ca5ebe028..246bbd6d1 100644 --- a/dask_kubernetes/common/networking.py +++ b/dask_kubernetes/common/networking.py @@ -5,7 +5,6 @@ import time from weakref import finalize -from dask.distributed import Client import kubernetes_asyncio as kubernetes from .utils import check_dependency diff --git a/dask_kubernetes/experimental/kubecluster.py b/dask_kubernetes/experimental/kubecluster.py index ba452a899..6fa61015e 100644 --- a/dask_kubernetes/experimental/kubecluster.py +++ b/dask_kubernetes/experimental/kubecluster.py @@ -192,9 +192,7 @@ async def _create_cluster(self): await wait_for_scheduler(cluster_name, self.namespace) await wait_for_service(core_api, f"{cluster_name}-service", self.namespace) self.scheduler_comm = rpc(await self._get_scheduler_address()) - await port_forward_dashboard( - f"{self.name}-cluster-service", - self.namespace) + await port_forward_dashboard(f"{self.name}-cluster-service", self.namespace) async def _connect_cluster(self): if self.shutdown_on_close is None: @@ -214,9 +212,7 @@ async def _connect_cluster(self): await wait_for_scheduler(self.cluster_name, self.namespace) await wait_for_service(core_api, service_name, self.namespace) self.scheduler_comm = rpc(await self._get_scheduler_address()) - await port_forward_dashboard( - f"{self.name}-cluster-service", - self.namespace) + await port_forward_dashboard(f"{self.name}-cluster-service", self.namespace) async def _get_cluster(self): async with kubernetes.client.api_client.ApiClient() as api_client: From 1363592333a2d159607a4a10f2d2284e7ff9046c Mon Sep 17 00:00:00 2001 From: Mike McCarty Date: Tue, 17 May 2022 09:59:04 -0400 Subject: [PATCH 3/5] updated dashboard_link --- dask_kubernetes/classic/kubecluster.py | 5 +++-- dask_kubernetes/common/networking.py | 2 +- dask_kubernetes/experimental/kubecluster.py | 10 ++++++++-- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/dask_kubernetes/classic/kubecluster.py b/dask_kubernetes/classic/kubecluster.py index af98fc213..f33685e9f 100644 --- a/dask_kubernetes/classic/kubecluster.py +++ b/dask_kubernetes/classic/kubecluster.py @@ -216,8 +216,6 @@ async def start(self, **kwargs): service_name_resolution_retries=self._service_name_retries, ) - await port_forward_dashboard(self.service.metadata.name, self.namespace) - self.pdb = await self._create_pdb() async def close(self, **kwargs): @@ -631,6 +629,9 @@ async def _start(self): await super()._start() + port = await port_forward_dashboard(self.name, self.namespace) + self.scheduler_info["services"]["dashboard"] = port + @classmethod def from_dict(cls, pod_spec, **kwargs): """Create cluster with worker pod spec defined by Python dictionary diff --git a/dask_kubernetes/common/networking.py b/dask_kubernetes/common/networking.py index 246bbd6d1..95ea501f8 100644 --- a/dask_kubernetes/common/networking.py +++ b/dask_kubernetes/common/networking.py @@ -100,7 +100,7 @@ async def is_comm_open(ip, port, retries=10): async def port_forward_dashboard(service_name, namespace): - port = await port_forward_service(service_name, namespace, 8787, 8787) + port = await port_forward_service(service_name, namespace, 8787) return port diff --git a/dask_kubernetes/experimental/kubecluster.py b/dask_kubernetes/experimental/kubecluster.py index 6fa61015e..bf16ec0a5 100644 --- a/dask_kubernetes/experimental/kubecluster.py +++ b/dask_kubernetes/experimental/kubecluster.py @@ -162,6 +162,8 @@ async def _start(self): await super()._start() + self.scheduler_info["services"]["dashboard"] = self.dashboard_port + async def _create_cluster(self): if self.shutdown_on_close is None: self.shutdown_on_close = True @@ -192,7 +194,9 @@ async def _create_cluster(self): await wait_for_scheduler(cluster_name, self.namespace) await wait_for_service(core_api, f"{cluster_name}-service", self.namespace) self.scheduler_comm = rpc(await self._get_scheduler_address()) - await port_forward_dashboard(f"{self.name}-cluster-service", self.namespace) + self.dashboard_port = await port_forward_dashboard( + f"{self.name}-cluster-service", self.namespace + ) async def _connect_cluster(self): if self.shutdown_on_close is None: @@ -212,7 +216,9 @@ async def _connect_cluster(self): await wait_for_scheduler(self.cluster_name, self.namespace) await wait_for_service(core_api, service_name, self.namespace) self.scheduler_comm = rpc(await self._get_scheduler_address()) - await port_forward_dashboard(f"{self.name}-cluster-service", self.namespace) + self.dashboard_port = await port_forward_dashboard( + f"{self.name}-cluster-service", self.namespace + ) async def _get_cluster(self): async with kubernetes.client.api_client.ApiClient() as api_client: From 1da6edf7255df94b7e61389a47597358ae0ab9cf Mon Sep 17 00:00:00 2001 From: Mike McCarty Date: Tue, 17 May 2022 10:24:06 -0400 Subject: [PATCH 4/5] overriding dashboard_link property to update port --- dask_kubernetes/classic/kubecluster.py | 12 +++++++++--- dask_kubernetes/experimental/kubecluster.py | 13 ++++++++----- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/dask_kubernetes/classic/kubecluster.py b/dask_kubernetes/classic/kubecluster.py index f33685e9f..5adca3406 100644 --- a/dask_kubernetes/classic/kubecluster.py +++ b/dask_kubernetes/classic/kubecluster.py @@ -12,7 +12,7 @@ import dask.distributed import distributed.security from distributed.deploy import SpecCluster, ProcessInterface -from distributed.utils import Log, Logs +from distributed.utils import format_dashboard_link, Log, Logs import kubernetes_asyncio as kubernetes from kubernetes_asyncio.client.rest import ApiException @@ -498,6 +498,11 @@ def __init__( self.kwargs = kwargs super().__init__(**self.kwargs) + @property + def dashboard_link(self): + host = self.scheduler_address.split("://")[1].split("/")[0].split(":")[0] + return format_dashboard_link(host, self.forwarded_dashboard_port) + def _get_pod_template(self, pod_template, pod_type): if not pod_template and dask.config.get( "kubernetes.{}-template".format(pod_type), None @@ -629,8 +634,9 @@ async def _start(self): await super()._start() - port = await port_forward_dashboard(self.name, self.namespace) - self.scheduler_info["services"]["dashboard"] = port + self.forwarded_dashboard_port = await port_forward_dashboard( + self.name, self.namespace + ) @classmethod def from_dict(cls, pod_spec, **kwargs): diff --git a/dask_kubernetes/experimental/kubecluster.py b/dask_kubernetes/experimental/kubecluster.py index bf16ec0a5..7748893ab 100644 --- a/dask_kubernetes/experimental/kubecluster.py +++ b/dask_kubernetes/experimental/kubecluster.py @@ -5,7 +5,7 @@ from distributed.core import rpc from distributed.deploy import Cluster -from distributed.utils import Log, Logs, LoopRunner +from distributed.utils import format_dashboard_link, Log, Logs, LoopRunner from dask_kubernetes.common.auth import ClusterAuth from dask_kubernetes.operator import ( @@ -143,6 +143,11 @@ def __init__( def cluster_name(self): return f"{self.name}-cluster" + @property + def dashboard_link(self): + host = self.scheduler_address.split("://")[1].split("/")[0].split(":")[0] + return format_dashboard_link(host, self.forwarded_dashboard_port) + async def _start(self): await ClusterAuth.load_first(self.auth) cluster_exists = (await self._get_cluster()) is not None @@ -162,8 +167,6 @@ async def _start(self): await super()._start() - self.scheduler_info["services"]["dashboard"] = self.dashboard_port - async def _create_cluster(self): if self.shutdown_on_close is None: self.shutdown_on_close = True @@ -194,7 +197,7 @@ async def _create_cluster(self): await wait_for_scheduler(cluster_name, self.namespace) await wait_for_service(core_api, f"{cluster_name}-service", self.namespace) self.scheduler_comm = rpc(await self._get_scheduler_address()) - self.dashboard_port = await port_forward_dashboard( + self.forwarded_dashboard_port = await port_forward_dashboard( f"{self.name}-cluster-service", self.namespace ) @@ -216,7 +219,7 @@ async def _connect_cluster(self): await wait_for_scheduler(self.cluster_name, self.namespace) await wait_for_service(core_api, service_name, self.namespace) self.scheduler_comm = rpc(await self._get_scheduler_address()) - self.dashboard_port = await port_forward_dashboard( + self.forwarded_dashboard_port = await port_forward_dashboard( f"{self.name}-cluster-service", self.namespace ) From c85d57a7fc4a7698e3704bd84e23c301a94531d3 Mon Sep 17 00:00:00 2001 From: Mike McCarty Date: Tue, 17 May 2022 11:08:55 -0400 Subject: [PATCH 5/5] black formatting --- dask_kubernetes/experimental/kubecluster.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/dask_kubernetes/experimental/kubecluster.py b/dask_kubernetes/experimental/kubecluster.py index 770a9d3c6..34c430712 100644 --- a/dask_kubernetes/experimental/kubecluster.py +++ b/dask_kubernetes/experimental/kubecluster.py @@ -12,7 +12,14 @@ from distributed.core import Status, rpc from distributed.deploy import Cluster -from distributed.utils import Log, Logs, LoopRunner, TimeoutError, LoopRunner, format_dashboard_link +from distributed.utils import ( + Log, + Logs, + LoopRunner, + TimeoutError, + LoopRunner, + format_dashboard_link, +) from dask_kubernetes.common.auth import ClusterAuth from dask_kubernetes.operator import (