From e819709456bd8c54671298adc0bfb0438d5b642e Mon Sep 17 00:00:00 2001
From: Matthew Murray
Date: Fri, 13 May 2022 13:18:03 -0400
Subject: [PATCH 1/8] Replace RPC with scheduler API

---
 dask_kubernetes/operator/operator.py | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/dask_kubernetes/operator/operator.py b/dask_kubernetes/operator/operator.py
index e1e7e6b8b..7e54fa256 100644
--- a/dask_kubernetes/operator/operator.py
+++ b/dask_kubernetes/operator/operator.py
@@ -1,4 +1,6 @@
 import asyncio
+import aiohttp
+import json
 
 from distributed.core import rpc
 
@@ -212,6 +214,13 @@ async def daskworkergroup_update(spec, name, namespace, logger, **kwargs):
             worker_ids = await scheduler.workers_to_close(
                 n=-workers_needed, attribute="name"
             )
+        async with aiohttp.ClientSession() as session:
+            params = {"n": -workers_needed}
+            async with session.post(
+                "http://localhost:8787/api/v1/workers_to_close", json=params
+            ) as resp:
+                workers_to_close = json.loads(await resp.text())["workers"]
+                logger.info(f"Workers to close API: {workers_to_close}")
         # TODO: Check that we're deleting workers in the right worker group
         logger.info(f"Workers to close: {worker_ids}")
         for wid in worker_ids:

From 41a4862af18cc2f69aab51c7cc08244af87bc3dc Mon Sep 17 00:00:00 2001
From: Matthew Murray
Date: Wed, 18 May 2022 21:19:12 -0700
Subject: [PATCH 2/8] Name worker the same as pod

---
 dask_kubernetes/operator/operator.py | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/dask_kubernetes/operator/operator.py b/dask_kubernetes/operator/operator.py
index 6caec4120..8403a4475 100644
--- a/dask_kubernetes/operator/operator.py
+++ b/dask_kubernetes/operator/operator.py
@@ -201,6 +201,9 @@ async def daskworkergroup_update(spec, name, namespace, logger, **kwargs):
         data = build_worker_pod_spec(
             name, spec["cluster"], uuid4().hex, spec["worker"]["spec"]
         )
+        data["spec"]["containers"][0]["args"].append(
+            f"--name={data['metadata']['name']}"
+        )
         kopf.adopt(data)
         await api.create_namespaced_pod(
             namespace=namespace,
@@ -219,10 +222,10 @@ async def daskworkergroup_update(spec, name, namespace, logger, **kwargs):
         async with aiohttp.ClientSession() as session:
             params = {"n": -workers_needed}
             async with session.post(
-                "http://localhost:8787/api/v1/workers_to_close", json=params
+                "http://localhost:8787/api/v1/retire_workers", json=params
             ) as resp:
-                workers_to_close = json.loads(await resp.text())["workers"]
-                logger.info(f"Workers to close API: {workers_to_close}")
+                retired_workers = json.loads(await resp.text())["workers"]
+                logger.info(f"Retired workers API: {retired_workers}")
         # TODO: Check that we're deleting workers in the right worker group
         logger.info(f"Workers to close: {worker_ids}")
         for wid in worker_ids:
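A note on patches 1 and 2: the scheduler's HTTP API takes a JSON body and replies with JSON, so the exchange can be exercised without the operator. Below is a minimal sketch; the localhost address mirrors the hardcoded default above, and the response shape (a "workers" mapping of worker address to worker info) is inferred from how these patches index into it, not from a published spec.

    import asyncio
    import json

    import aiohttp


    async def retire(n):
        # Ask the scheduler to gracefully retire n workers and return their
        # names. With patch 2 applied, each worker name matches its pod name,
        # so the result can be fed straight to pod deletion.
        async with aiohttp.ClientSession() as session:
            async with session.post(
                "http://localhost:8787/api/v1/retire_workers", json={"n": n}
            ) as resp:
                retired = json.loads(await resp.text())["workers"]
                return [info["name"] for info in retired.values()]


    print(asyncio.run(retire(2)))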
From 373adcac284fb68d03ca81aec87814e4537b708a Mon Sep 17 00:00:00 2001
From: Matthew Murray
Date: Wed, 18 May 2022 21:43:25 -0700
Subject: [PATCH 3/8] Pass the names of the workers from the scheduler API to
 pod deletion function

---
 dask_kubernetes/operator/operator.py | 16 +++++-----------
 1 file changed, 5 insertions(+), 11 deletions(-)

diff --git a/dask_kubernetes/operator/operator.py b/dask_kubernetes/operator/operator.py
index 8403a4475..e4b977376 100644
--- a/dask_kubernetes/operator/operator.py
+++ b/dask_kubernetes/operator/operator.py
@@ -2,17 +2,12 @@
 import aiohttp
 import json
 
-from distributed.core import rpc
-
 import kopf
 import kubernetes_asyncio as kubernetes
 
 from uuid import uuid4
 
 from dask_kubernetes.common.auth import ClusterAuth
-from dask_kubernetes.common.networking import (
-    get_scheduler_address,
-)
 
 
 def build_scheduler_pod_spec(name, spec):
@@ -204,6 +199,7 @@ async def daskworkergroup_update(spec, name, namespace, logger, **kwargs):
         data["spec"]["containers"][0]["args"].append(
             f"--name={data['metadata']['name']}"
         )
+        logger.info(f"Creating worker with config {data}")
         kopf.adopt(data)
         await api.create_namespaced_pod(
             namespace=namespace,
@@ -213,12 +209,6 @@ async def daskworkergroup_update(spec, name, namespace, logger, **kwargs):
         f"Scaled worker group {name} up to {spec['worker']['replicas']} workers."
     )
     if workers_needed < 0:
-        service_name = f"{name.split('-')[0]}-cluster-service"
-        address = await get_scheduler_address(service_name, namespace)
-        async with rpc(address) as scheduler:
-            worker_ids = await scheduler.workers_to_close(
-                n=-workers_needed, attribute="name"
-            )
         async with aiohttp.ClientSession() as session:
             params = {"n": -workers_needed}
             async with session.post(
@@ -226,6 +216,10 @@ async def daskworkergroup_update(spec, name, namespace, logger, **kwargs):
             ) as resp:
                 retired_workers = json.loads(await resp.text())["workers"]
                 logger.info(f"Retired workers API: {retired_workers}")
+                worker_ids = [
+                    retired_workers[worker_address]["name"]
+                    for worker_address in retired_workers.keys()
+                ]
         # TODO: Check that we're deleting workers in the right worker group
         logger.info(f"Workers to close: {worker_ids}")
         for wid in worker_ids:
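With patch 3, scale-down reduces to deleting the pods whose names the scheduler handed back. A rough standalone equivalent of that final step, assuming a kubeconfig-authenticated client; the namespace and pod name here are illustrative:

    import asyncio

    import kubernetes_asyncio as kubernetes


    async def delete_worker_pods(worker_ids, namespace="default"):
        # Each id is both the Dask worker name and the pod name, thanks to
        # the --name argument added in patch 2.
        await kubernetes.config.load_kube_config()
        async with kubernetes.client.api_client.ApiClient() as api_client:
            api = kubernetes.client.CoreV1Api(api_client)
            for wid in worker_ids:
                await api.delete_namespaced_pod(name=wid, namespace=namespace)


    asyncio.run(delete_worker_pods(["demo-cluster-worker-1a2b3c"]))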
From d396d909516164862162d3d06ffb652a7416b1c5 Mon Sep 17 00:00:00 2001
From: Matthew Murray
Date: Thu, 19 May 2022 08:25:45 -0700
Subject: [PATCH 4/8] Remove line adding name to worker

---
 dask_kubernetes/operator/operator.py | 31 +++++++++++++----------------
 1 file changed, 13 insertions(+), 18 deletions(-)

diff --git a/dask_kubernetes/operator/operator.py b/dask_kubernetes/operator/operator.py
index 0180f3d30..070200245 100644
--- a/dask_kubernetes/operator/operator.py
+++ b/dask_kubernetes/operator/operator.py
@@ -1,6 +1,6 @@
 import asyncio
-
-from distributed.core import rpc
+import aiohttp
+import json
 
 import kopf
 import kubernetes_asyncio as kubernetes
@@ -8,9 +8,6 @@
 from uuid import uuid4
 
 from dask_kubernetes.common.auth import ClusterAuth
-from dask_kubernetes.common.networking import (
-    get_scheduler_address,
-)
 
 
 def build_scheduler_pod_spec(name, spec):
@@ -204,9 +201,6 @@ async def daskworkergroup_update(spec, name, namespace, logger, **kwargs):
         data = build_worker_pod_spec(
             name, spec["cluster"], uuid4().hex, spec["worker"]["spec"]
         )
-        data["spec"]["containers"][0]["args"].append(
-            f"--name={data['metadata']['name']}"
-        )
         logger.info(f"Creating worker with config {data}")
         kopf.adopt(data)
         await api.create_namespaced_pod(
@@ -217,16 +211,17 @@ async def daskworkergroup_update(spec, name, namespace, logger, **kwargs):
         f"Scaled worker group {name} up to {spec['worker']['replicas']} workers."
     )
     if workers_needed < 0:
-        service_address = await get_scheduler_address(
-            f"{spec['cluster']}-service", namespace
-        )
-        logger.info(
-            f"Asking scheduler to retire {-workers_needed} on {service_address}"
-        )
-        async with rpc(service_address) as scheduler:
-            worker_ids = await scheduler.workers_to_close(
-                n=-workers_needed, attribute="name"
-            )
+        async with aiohttp.ClientSession() as session:
+            params = {"n": -workers_needed}
+            async with session.post(
+                "http://localhost:8787/api/v1/retire_workers", json=params
+            ) as resp:
+                retired_workers = json.loads(await resp.text())["workers"]
+                logger.info(f"Retired workers API: {retired_workers}")
+                worker_ids = [
+                    retired_workers[worker_address]["name"]
+                    for worker_address in retired_workers.keys()
+                ]
         # TODO: Check that we're deleting workers in the right worker group
         logger.info(f"Workers to close: {worker_ids}")
         for wid in worker_ids:
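Patch 5 below parameterizes the named port used when resolving the scheduler service. Its selection rule can be checked in isolation against a hand-built service object; the service name and port numbers here are illustrative, though 8786/8787 are Dask's usual comm and dashboard ports.

    from kubernetes_asyncio.client import (
        V1ObjectMeta,
        V1Service,
        V1ServicePort,
        V1ServiceSpec,
    )

    service = V1Service(
        metadata=V1ObjectMeta(name="demo-cluster-service"),
        spec=V1ServiceSpec(
            ports=[
                V1ServicePort(name="comm", port=8786),
                V1ServicePort(name="dashboard", port=8787),
            ]
        ),
    )


    def pick_port(service, port_name="comm"):
        # Same selection rule as get_external_address_for_scheduler_service:
        # accept a port named after the service itself or the requested name.
        [port] = [
            p.port
            for p in service.spec.ports
            if p.name == service.metadata.name or p.name == port_name
        ]
        return port


    assert pick_port(service, port_name="dashboard") == 8787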
From 9e141a2124f151e0f64ad5f71c0d39f249288526 Mon Sep 17 00:00:00 2001
From: Matthew Murray
Date: Thu, 19 May 2022 10:25:30 -0700
Subject: [PATCH 5/8] Generalize scheduler API endpoint

---
 dask_kubernetes/common/networking.py | 15 +++++++++++----
 dask_kubernetes/operator/operator.py |  9 +++++++--
 2 files changed, 18 insertions(+), 6 deletions(-)

diff --git a/dask_kubernetes/common/networking.py b/dask_kubernetes/common/networking.py
index 95ea501f8..9008856c4 100644
--- a/dask_kubernetes/common/networking.py
+++ b/dask_kubernetes/common/networking.py
@@ -11,13 +11,17 @@
 
 
 async def get_external_address_for_scheduler_service(
-    core_api, service, port_forward_cluster_ip=None, service_name_resolution_retries=20
+    core_api,
+    service,
+    port_forward_cluster_ip=None,
+    service_name_resolution_retries=20,
+    port_name="comm",
 ):
     """Take a service object and return the scheduler address."""
     [port] = [
         port.port
         for port in service.spec.ports
-        if port.name == service.metadata.name or port.name == "comm"
+        if port.name == service.metadata.name or port.name == port_name
     ]
     if service.spec.type == "LoadBalancer":
         lb = service.status.load_balancer.ingress[0]
@@ -104,13 +108,16 @@ async def port_forward_dashboard(service_name, namespace):
     return port
 
 
-async def get_scheduler_address(service_name, namespace):
+async def get_scheduler_address(service_name, namespace, port_name="comm"):
     async with kubernetes.client.api_client.ApiClient() as api_client:
         api = kubernetes.client.CoreV1Api(api_client)
         service = await api.read_namespaced_service(service_name, namespace)
         port_forward_cluster_ip = None
         address = await get_external_address_for_scheduler_service(
-            api, service, port_forward_cluster_ip=port_forward_cluster_ip
+            api,
+            service,
+            port_forward_cluster_ip=port_forward_cluster_ip,
+            port_name=port_name,
         )
         return address
 
diff --git a/dask_kubernetes/operator/operator.py b/dask_kubernetes/operator/operator.py
index 070200245..b51d26b3c 100644
--- a/dask_kubernetes/operator/operator.py
+++ b/dask_kubernetes/operator/operator.py
@@ -8,6 +8,9 @@
 from uuid import uuid4
 
 from dask_kubernetes.common.auth import ClusterAuth
+from dask_kubernetes.common.networking import (
+    get_scheduler_address,
+)
 
 
 def build_scheduler_pod_spec(name, spec):
@@ -201,7 +204,6 @@ async def daskworkergroup_update(spec, name, namespace, logger, **kwargs):
         data = build_worker_pod_spec(
             name, spec["cluster"], uuid4().hex, spec["worker"]["spec"]
         )
-        logger.info(f"Creating worker with config {data}")
         kopf.adopt(data)
         await api.create_namespaced_pod(
             namespace=namespace,
@@ -211,10 +213,13 @@ async def daskworkergroup_update(spec, name, namespace, logger, **kwargs):
         f"Scaled worker group {name} up to {spec['worker']['replicas']} workers."
     )
     if workers_needed < 0:
+        service_address = await get_scheduler_address(
+            f"{spec['cluster']}-service", namespace, port_name="dashboard"
+        )
         async with aiohttp.ClientSession() as session:
             params = {"n": -workers_needed}
             async with session.post(
-                "http://localhost:8787/api/v1/retire_workers", json=params
+                f"{service_address}/api/v1/retire_workers", json=params
             ) as resp:
                 retired_workers = json.loads(await resp.text())["workers"]
                 logger.info(f"Retired workers API: {retired_workers}")

From 1a72eb6c9bb869ca3d873424075b80ed9893fffd Mon Sep 17 00:00:00 2001
From: Jacob Tomlinson
Date: Fri, 20 May 2022 11:05:15 +0100
Subject: [PATCH 6/8] Fix retrieving JSON response

---
 dask_kubernetes/operator/operator.py | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/dask_kubernetes/operator/operator.py b/dask_kubernetes/operator/operator.py
index 9c4184c34..2a15836a1 100644
--- a/dask_kubernetes/operator/operator.py
+++ b/dask_kubernetes/operator/operator.py
@@ -1,6 +1,5 @@
 import asyncio
 import aiohttp
-import json
 
 import kopf
 import kubernetes_asyncio as kubernetes
@@ -234,8 +233,11 @@ async def daskworkergroup_update(spec, name, namespace, logger, **kwargs):
             async with session.post(
                 f"{service_address}/api/v1/retire_workers", json=params
             ) as resp:
-                retired_workers = json.loads(await resp.text())["workers"]
-                logger.info(f"Retired workers API: {retired_workers}")
+                # This try block can be removed after https://github.com/dask/distributed/pull/6397 is merged
+                try:
+                    retired_workers = await resp.json()
+                except aiohttp.client_exceptions.ContentTypeError:
+                    retired_workers = await resp.json(content_type="text/json")
                 worker_ids = [
                     retired_workers[worker_address]["name"]
                     for worker_address in retired_workers.keys()

From 032c378f739e112c7482cf0f320f0f9987fcd635 Mon Sep 17 00:00:00 2001
From: Jacob Tomlinson
Date: Fri, 20 May 2022 12:16:09 +0100
Subject: [PATCH 7/8] Remove MIME type handling now that it is fixed upstream

---
 dask_kubernetes/operator/operator.py | 6 +-----
 1 file changed, 1 insertion(+), 5 deletions(-)

diff --git a/dask_kubernetes/operator/operator.py b/dask_kubernetes/operator/operator.py
index 2a15836a1..b26c09921 100644
--- a/dask_kubernetes/operator/operator.py
+++ b/dask_kubernetes/operator/operator.py
@@ -233,11 +233,7 @@ async def daskworkergroup_update(spec, name, namespace, logger, **kwargs):
             async with session.post(
                 f"{service_address}/api/v1/retire_workers", json=params
             ) as resp:
-                # This try block can be removed after https://github.com/dask/distributed/pull/6397 is merged
-                try:
-                    retired_workers = await resp.json()
-                except aiohttp.client_exceptions.ContentTypeError:
-                    retired_workers = await resp.json(content_type="text/json")
+                retired_workers = await resp.json()
                 worker_ids = [
                     retired_workers[worker_address]["name"]
                     for worker_address in retired_workers.keys()
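Background on patches 6 and 7: aiohttp's resp.json() validates the response's Content-Type header before decoding and raises ContentTypeError unless it is application/json. Schedulers that predated the upstream header fix replied with text/json, hence the temporary override. The shape of that workaround, in isolation:

    import aiohttp


    async def read_retired_workers(resp):
        # resp.json() expects application/json; older schedulers sent
        # text/json, which the content_type argument accepts explicitly.
        try:
            return await resp.json()
        except aiohttp.ContentTypeError:
            return await resp.json(content_type="text/json")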
From 722cc5313c41c5fb39b3aae503990563c3bc798d Mon Sep 17 00:00:00 2001
From: Jacob Tomlinson
Date: Fri, 27 May 2022 14:40:24 +0100
Subject: [PATCH 8/8] Reinstate RPC as fallback option, and last-in-first-out
 as second fallback

---
 dask_kubernetes/operator/operator.py | 68 ++++++++++++++++++++++------
 1 file changed, 55 insertions(+), 13 deletions(-)

diff --git a/dask_kubernetes/operator/operator.py b/dask_kubernetes/operator/operator.py
index b26c09921..553337389 100644
--- a/dask_kubernetes/operator/operator.py
+++ b/dask_kubernetes/operator/operator.py
@@ -1,11 +1,14 @@
 import asyncio
 import aiohttp
+from contextlib import suppress
 
 import kopf
 import kubernetes_asyncio as kubernetes
 
 from uuid import uuid4
 
+from distributed.core import rpc
+
 from dask_kubernetes.common.auth import ClusterAuth
 from dask_kubernetes.common.networking import (
     get_scheduler_address,
 )
@@ -194,6 +197,52 @@ async def daskworkergroup_create(spec, name, namespace, logger, **kwargs):
     )
 
 
+async def retire_workers(
+    n_workers, scheduler_service_name, worker_group_name, namespace, logger
+):
+    # Try gracefully retiring via the HTTP API
+    dashboard_address = await get_scheduler_address(
+        scheduler_service_name,
+        namespace,
+        port_name="dashboard",
+    )
+    async with aiohttp.ClientSession() as session:
+        url = f"{dashboard_address}/api/v1/retire_workers"
+        params = {"n": n_workers}
+        async with session.post(url, json=params) as resp:
+            if resp.status <= 300:
+                retired_workers = await resp.json()
+                return [retired_workers[w]["name"] for w in retired_workers.keys()]
+
+    # Otherwise try gracefully retiring via the RPC
+    logger.info(
+        f"Scaling {worker_group_name} failed via the HTTP API, falling back to the Dask RPC"
+    )
+    # Dask version mismatches between the operator and scheduler may cause this to fail in any number of unexpected ways
+    with suppress(Exception):
+        comm_address = await get_scheduler_address(
+            scheduler_service_name,
+            namespace,
+        )
+        async with rpc(comm_address) as scheduler_comm:
+            return await scheduler_comm.workers_to_close(
+                n=n_workers,
+                attribute="name",
+            )
+
+    # Finally fall back to last-in-first-out scaling
+    logger.info(
+        f"Scaling {worker_group_name} failed via the Dask RPC, falling back to LIFO scaling"
+    )
+    async with kubernetes.client.api_client.ApiClient() as api_client:
+        api = kubernetes.client.CoreV1Api(api_client)
+        workers = await api.list_namespaced_pod(
+            namespace=namespace,
+            label_selector=f"dask.org/workergroup-name={worker_group_name}",
+        )
+        return [w.metadata.name for w in workers.items[-n_workers:]]
+
+
 @kopf.on.update("daskworkergroup")
 async def daskworkergroup_update(spec, name, namespace, logger, **kwargs):
     async with kubernetes.client.api_client.ApiClient() as api_client:
@@ -225,20 +274,13 @@ async def daskworkergroup_update(spec, name, namespace, logger, **kwargs):
         f"Scaled worker group {name} up to {spec['worker']['replicas']} workers."
     )
     if workers_needed < 0:
-        service_address = await get_scheduler_address(
-            f"{spec['cluster']}-service", namespace, port_name="dashboard"
+        worker_ids = await retire_workers(
+            n_workers=-workers_needed,
+            scheduler_service_name=f"{spec['cluster']}-service",
+            worker_group_name=name,
+            namespace=namespace,
+            logger=logger,
         )
-        async with aiohttp.ClientSession() as session:
-            params = {"n": -workers_needed}
-            async with session.post(
-                f"{service_address}/api/v1/retire_workers", json=params
-            ) as resp:
-                retired_workers = await resp.json()
-                worker_ids = [
-                    retired_workers[worker_address]["name"]
-                    for worker_address in retired_workers.keys()
-                ]
-        # TODO: Check that we're deleting workers in the right worker group
         logger.info(f"Workers to close: {worker_ids}")
         for wid in worker_ids:
             await api.delete_namespaced_pod(
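The fallback chain in patch 8 runs from most to least graceful: the HTTP API and the Dask RPC both let the scheduler drain workers before they go, while the final LIFO branch simply deletes the newest pods. That branch slices the pod listing from the end, assuming listing order tracks creation order, which is itself only an approximation. A tiny illustration with a plain list standing in for the V1PodList returned by list_namespaced_pod (names made up):

    pods = ["worker-a", "worker-b", "worker-c", "worker-d"]
    n_workers = 2

    # The last n_workers entries are the newest pods: the LIFO candidates.
    assert pods[-n_workers:] == ["worker-c", "worker-d"]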