Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions dask_kubernetes/classic/kubecluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import dask.distributed
import distributed.security
from distributed.deploy import SpecCluster, ProcessInterface
from distributed.utils import Log, Logs
from distributed.utils import format_dashboard_link, Log, Logs
import kubernetes_asyncio as kubernetes
from kubernetes_asyncio.client.rest import ApiException

Expand All @@ -30,7 +30,10 @@
namespace_default,
escape,
)
from ..common.networking import get_external_address_for_scheduler_service
from ..common.networking import (
get_external_address_for_scheduler_service,
port_forward_dashboard,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -495,6 +498,11 @@ def __init__(
self.kwargs = kwargs
super().__init__(**self.kwargs)

@property
def dashboard_link(self):
host = self.scheduler_address.split("://")[1].split("/")[0].split(":")[0]
return format_dashboard_link(host, self.forwarded_dashboard_port)

def _get_pod_template(self, pod_template, pod_type):
if not pod_template and dask.config.get(
"kubernetes.{}-template".format(pod_type), None
Expand Down Expand Up @@ -626,6 +634,10 @@ async def _start(self):

await super()._start()

self.forwarded_dashboard_port = await port_forward_dashboard(
self.name, self.namespace
)

@classmethod
def from_dict(cls, pod_spec, **kwargs):
"""Create cluster with worker pod spec defined by Python dictionary
Expand Down
17 changes: 11 additions & 6 deletions dask_kubernetes/common/networking.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import time
from weakref import finalize

from dask.distributed import Client
import kubernetes_asyncio as kubernetes

from .utils import check_dependency
Expand Down Expand Up @@ -89,16 +88,22 @@ async def port_forward_service(service_name, namespace, remote_port, local_port=


async def is_comm_open(ip, port, retries=10):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
while retries > 0:
try:
async with Client(f"tcp://{ip}:{port}", asynchronous=True, timeout=2):
return True
except Exception:
time.sleep(0.5)
result = sock.connect_ex((ip, port))
if result == 0:
return True
else:
time.sleep(2)
retries -= 1
return False


async def port_forward_dashboard(service_name, namespace):
port = await port_forward_service(service_name, namespace, 8787)
return port


async def get_scheduler_address(service_name, namespace):
async with kubernetes.client.api_client.ApiClient() as api_client:
api = kubernetes.client.CoreV1Api(api_client)
Expand Down
22 changes: 20 additions & 2 deletions dask_kubernetes/experimental/kubecluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,14 @@

from distributed.core import Status, rpc
from distributed.deploy import Cluster

from distributed.utils import Log, Logs, LoopRunner, TimeoutError
from distributed.utils import (
Log,
Logs,
LoopRunner,
TimeoutError,
LoopRunner,
format_dashboard_link,
)

from dask_kubernetes.common.auth import ClusterAuth
from dask_kubernetes.operator import (
Expand All @@ -23,6 +29,7 @@

from dask_kubernetes.common.networking import (
get_scheduler_address,
port_forward_dashboard,
wait_for_scheduler,
)

Expand Down Expand Up @@ -154,6 +161,11 @@ def __init__(
def cluster_name(self):
return f"{self.name}-cluster"

@property
def dashboard_link(self):
host = self.scheduler_address.split("://")[1].split("/")[0].split(":")[0]
return format_dashboard_link(host, self.forwarded_dashboard_port)

async def _start(self):
await ClusterAuth.load_first(self.auth)
cluster_exists = (await self._get_cluster()) is not None
Expand Down Expand Up @@ -203,6 +215,9 @@ async def _create_cluster(self):
await wait_for_scheduler(cluster_name, self.namespace)
await wait_for_service(core_api, f"{cluster_name}-service", self.namespace)
self.scheduler_comm = rpc(await self._get_scheduler_address())
self.forwarded_dashboard_port = await port_forward_dashboard(
f"{self.name}-cluster-service", self.namespace
)

async def _connect_cluster(self):
if self.shutdown_on_close is None:
Expand All @@ -222,6 +237,9 @@ async def _connect_cluster(self):
await wait_for_scheduler(self.cluster_name, self.namespace)
await wait_for_service(core_api, service_name, self.namespace)
self.scheduler_comm = rpc(await self._get_scheduler_address())
self.forwarded_dashboard_port = await port_forward_dashboard(
f"{self.name}-cluster-service", self.namespace
)

async def _get_cluster(self):
async with kubernetes.client.api_client.ApiClient() as api_client:
Expand Down