Delete pending pods first when manually scaling down (#69)
* Use minikube v0.25.2

v0.26.0 requires systemd, which is not installed on the Ubuntu 14.04 image used by CircleCI
with "machine: true".

* Limit the number of pods on the kubernetes cluster

* Delete pending pods first when manually scaling down

* Thread safe add_callback + offload call to scale_down
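
For reference, a minimal sketch of the pending-pod selection this commit adds to KubeCluster.scale (the helper name split_pending_and_running is hypothetical; the real code inlines this logic and then retires the remaining running workers via scheduler.retire_workers before deleting their pods):

from urllib.parse import urlparse

def split_pending_and_running(pods, scheduler):
    # Workers that have joined the scheduler are addressed as
    # "tcp://<pod ip>:<port>"; a pod whose IP is not among them has not
    # started its worker yet, so it can be cancelled before touching
    # running workers that may hold intermediate results.
    running_ips = set(urlparse(addr).hostname for addr in scheduler.workers)
    pending = [p for p in pods if p.status.pod_ip not in running_ips]
    running = [p for p in pods if p.status.pod_ip in running_ips]
    return pending, running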
ogrisel authored and mrocklin committed Apr 13, 2018
1 parent d6b3fc6 commit e9bedd8
Showing 3 changed files with 192 additions and 46 deletions.
13 changes: 9 additions & 4 deletions .circleci/config.yml
@@ -4,24 +4,29 @@ jobs:
build:
machine: true
environment:
MINIKUBE_WANTUPDATENOTIFICATION: false
MINIKUBE_WANTREPORTERRORPROMPT: false
CHANGE_MINIKUBE_NONE_USER: true
environment:
PYTHON: "3.6"
ENV_NAME: "dask-kubernetes-test"

steps:
- checkout
- run:
command: |
curl -Lo kubectl https://storage.googleapis.com/kubernetes-release/release/v1.8.4/bin/linux/amd64/kubectl && chmod +x kubectl && sudo mv kubectl /usr/local/bin/
# Use minikube v0.25.2 as v0.26.0 requires systemd to manage the daemons which
# is not available in the Ubuntu 14.04 image used by Circle CI with the
# "machine: true" option.
- run:
command: |
curl -Lo minikube https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64 && chmod +x minikube && sudo mv minikube /usr/local/bin/
curl -Lo minikube https://github.com/kubernetes/minikube/releases/download/v0.25.2/minikube-linux-amd64 && chmod +x minikube && sudo mv minikube /usr/local/bin/
- run:
command: |
sudo minikube start --vm-driver=none
sudo -E minikube start --vm-driver=none --extra-config=kubelet.MaxPods=20
- run:
command: |
sudo minikube update-context
sudo -E minikube update-context
- run:
command: |
JSONPATH='{range .items[*]}{@.metadata.name}:{range @.status.conditions[*]}{@.type}={@.status};{end}{end}'; until sudo kubectl get nodes -o jsonpath="$JSONPATH" 2>&1 | grep -q "Ready=True"; do sleep 1; done
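The JSONPATH one-liner above keeps polling kubectl until the minikube node reports Ready=True. A rough equivalent using the Python kubernetes client (the function name and timeout are illustrative, not part of this commit) would be:

import time
import kubernetes

def wait_for_nodes_ready(timeout=120):
    # Poll the API server until every node has a Ready=True condition,
    # mirroring the kubectl/JSONPATH loop in the CircleCI config above.
    kubernetes.config.load_kube_config()
    core_api = kubernetes.client.CoreV1Api()
    deadline = time.time() + timeout
    while time.time() < deadline:
        nodes = core_api.list_node().items
        if nodes and all(any(c.type == 'Ready' and c.status == 'True'
                             for c in node.status.conditions)
                         for node in nodes):
            return
        time.sleep(1)
    raise RuntimeError('kubernetes node(s) never became Ready')
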
129 changes: 87 additions & 42 deletions dask_kubernetes/core.py
@@ -12,8 +12,10 @@
except ImportError:
yaml = False

from tornado import gen
from distributed.deploy import LocalCluster, Cluster
from distributed.config import config
from distributed.comm.utils import offload
import kubernetes

from .objects import make_pod_from_dict, clean_pod_template
@@ -312,13 +314,62 @@ def scale(self, n):
KubeCluster.scale_up
KubeCluster.scale_down
"""
pods = self.pods()
pods = self._cleanup_succeeded_pods(self.pods())
if n >= len(pods):
return self.scale_up(n, pods=pods)
else:
to_close = select_workers_to_close(self.scheduler, len(pods) - n)
n_to_delete = len(pods) - n
# Before trying to close running workers, check if we can cancel
# pending pods (in case the kubernetes cluster was too full to
# provision those pods in the first place).
running_workers = list(self.scheduler.workers.keys())
running_ips = set(urlparse(worker).hostname
for worker in running_workers)
pending_pods = [p for p in pods
if p.status.pod_ip not in running_ips]
if pending_pods:
pending_to_delete = pending_pods[:n_to_delete]
logger.debug("Deleting pending pods: %s", pending_to_delete)
self._delete_pods(pending_to_delete)
n_to_delete = n_to_delete - len(pending_to_delete)
if n_to_delete <= 0:
return

to_close = select_workers_to_close(self.scheduler, n_to_delete)
logger.debug("Closing workers: %s", to_close)
return self.scale_down(to_close)
if len(to_close) < len(self.scheduler.workers):
# Close workers cleanly to migrate any temporary results to
# remaining workers.
@gen.coroutine
def f(to_close):
yield self.scheduler.retire_workers(
workers=to_close, remove=True, close_workers=True)
yield offload(self.scale_down, to_close)

self.scheduler.loop.add_callback(f, to_close)
return

# Terminate all pods without waiting for clean worker shutdown
self.scale_down(to_close)

def _delete_pods(self, to_delete):
for pod in to_delete:
try:
self.core_api.delete_namespaced_pod(
pod.metadata.name,
self.namespace,
kubernetes.client.V1DeleteOptions()
)
logger.info('Deleted pod: %s', pod.metadata.name)
except kubernetes.client.rest.ApiException as e:
# If a pod has already been removed, just ignore the error
if e.status != 404:
raise

def _cleanup_succeeded_pods(self, pods):
terminated_pods = [p for p in pods if p.status.phase == 'Succeeded']
self._delete_pods(terminated_pods)
return [p for p in pods if p.status.phase != 'Succeeded']

def scale_up(self, n, pods=None, **kwargs):
"""
@@ -328,14 +379,15 @@ def scale_up(self, n, pods=None, **kwargs):
--------
>>> cluster.scale_up(20) # ask for twenty workers
"""
pods = pods or self.pods()

pods = pods or self._cleanup_succeeded_pods(self.pods())
to_create = n - len(pods)
new_pods = []
for i in range(3):
try:
out = [
self.core_api.create_namespaced_pod(self.namespace, self.pod_template)
for _ in range(n - len(pods))
]
for _ in range(to_create):
new_pods.append(self.core_api.create_namespaced_pod(
self.namespace, self.pod_template))
to_create -= 1
break
except kubernetes.client.rest.ApiException as e:
if e.status == 500 and 'ServerTimeout' in e.body:
@@ -348,45 +400,37 @@
else:
raise last_exception

return out
return new_pods
# fixme: wait for this to be ready before returning!

def scale_down(self, workers):
"""
When the worker process exits, Kubernetes leaves the pods in a completed
state. Kill them when we are asked to.
def scale_down(self, workers, pods=None):
""" Remove the pods for the requested list of workers
When scale_down is called by the _adapt async loop, the workers are
assumed to have been cleanly closed first and in-memory data has been
migrated to the remaining workers.
Note that when the worker process exits, Kubernetes leaves the pods in
a 'Succeeded' state that we collect here.
If some workers have not been closed, we just delete the pods with
matching ip addresses.
Parameters
----------
workers: List[str]
List of addresses of workers to close
workers: List[str] List of addresses of workers to close
"""
# Get the existing worker pods
pods = self.pods()
pods = pods or self._cleanup_succeeded_pods(self.pods())

# Work out pods that we are going to delete
# Work out the list of pods that we are going to delete
# Each worker to delete is given in the form "tcp://<worker ip>:<port>"
# Convert this to a set of IPs
ips = set(urlparse(worker).hostname for worker in workers)
to_delete = [
p for p in pods
# Every time we run, purge any completed pods as well as the specified ones
if p.status.phase == 'Succeeded' or p.status.pod_ip in ips
]
to_delete = [p for p in pods if p.status.pod_ip in ips]
if not to_delete:
return
for pod in to_delete:
try:
self.core_api.delete_namespaced_pod(
pod.metadata.name,
self.namespace,
kubernetes.client.V1DeleteOptions()
)
logger.info('Deleted pod: %s', pod.metadata.name)
except kubernetes.client.rest.ApiException as e:
# If a pod has already been removed, just ignore the error
if e.status != 404:
raise
self._delete_pods(to_delete)

def __enter__(self):
return self
@@ -440,15 +484,16 @@ def _namespace_default():
return 'default'


def select_workers_to_close(s, n):
""" Select n workers to close from scheduler s """
assert n <= len(s.workers)
def select_workers_to_close(scheduler, n_to_close):
""" Select n workers to close from scheduler """
workers = list(scheduler.workers.values())
assert n_to_close <= len(workers)
key = lambda ws: ws.info['memory']
to_close = set(sorted(s.idle, key=key)[:n])
to_close = set(sorted(scheduler.idle, key=key)[:n_to_close])

if len(to_close) < n:
rest = sorted(s.workers.values(), key=key, reverse=True)
while len(to_close) < n:
if len(to_close) < n_to_close:
rest = sorted(workers, key=key, reverse=True)
while len(to_close) < n_to_close:
to_close.add(rest.pop())

return [ws.address for ws in to_close]
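
To illustrate the ordering select_workers_to_close produces, here is a small self-contained example with fake scheduler and worker objects (FakeWorkerState and the addresses are made up; it assumes the new select_workers_to_close shown above is in scope). Idle workers holding the least data are closed first, while busy workers holding the most data are kept until last:

from types import SimpleNamespace

class FakeWorkerState(object):
    # Stand-in for distributed's WorkerState, carrying only the fields
    # that select_workers_to_close actually reads.
    def __init__(self, address, memory):
        self.address = address
        self.info = {'memory': memory}

a = FakeWorkerState('tcp://10.0.0.1:1', 5e6)  # busy, holds a large result
b = FakeWorkerState('tcp://10.0.0.2:1', 0)    # idle, nothing in memory
c = FakeWorkerState('tcp://10.0.0.3:1', 1e6)  # busy, holds a small result

scheduler = SimpleNamespace(workers={w.address: w for w in (a, b, c)},
                            idle=[b])

# The idle worker and the busy worker with the smallest result are chosen;
# the worker with the large result is preserved (the order within the
# returned list is not significant, hence the set comparison).
assert set(select_workers_to_close(scheduler, 2)) == {b.address, c.address}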
96 changes: 96 additions & 0 deletions dask_kubernetes/tests/test_core.py
@@ -11,6 +11,7 @@
from distributed.utils_test import loop # noqa: F401
from distributed.utils import tmpfile
import kubernetes
from random import random


try:
@@ -313,6 +314,101 @@ def test_scale_up_down(cluster, client):
assert set(cluster.scheduler.workers) == {b}


def test_scale_up_down_fast(cluster, client):
cluster.scale(1)

start = time()
while len(cluster.scheduler.workers) != 1:
sleep(0.1)
assert time() < start + 10

worker = next(iter(cluster.scheduler.workers.values()))

# Put some data on this worker
future = client.submit(lambda: b'\x00' * int(1e6))
wait(future)
assert worker in cluster.scheduler.tasks[future.key].who_has

# Rescale the cluster many times without waiting: this should put some
# pressure on kubernetes but this should never fail nor delete our worker
# with the temporary result.
for i in range(10):
cluster.scale(4)
sleep(random() / 2)
cluster.scale(1)
sleep(random() / 2)

start = time()
while len(cluster.scheduler.workers) != 1:
sleep(0.1)
assert time() < start + 10

# The original task result is still stored on the original worker: this pod
# has never been deleted when rescaling the cluster and the result can
# still be fetched back.
assert worker in cluster.scheduler.tasks[future.key].who_has
assert len(future.result()) == int(1e6)


def test_scale_down_pending(cluster, client):
# Try to scale the cluster to use more pods than available
nodes = cluster.core_api.list_node().items
max_pods = sum(int(node.status.allocatable['pods']) for node in nodes)
if max_pods > 50:
# It's probably not reasonable to run this test against a large
# kubernetes cluster.
pytest.skip("Require a small test kubernetes cluster (maxpod <= 50)")
extra_pods = 5
requested_pods = max_pods + extra_pods
cluster.scale(requested_pods)

start = time()
while len(cluster.scheduler.workers) < 2:
sleep(0.1)
# Wait a bit because the kubernetes cluster can take time to provision
# the requested pods as we requested a large number of pods.
assert time() < start + 60

pending_pods = [p for p in cluster.pods() if p.status.phase == 'Pending']
assert len(pending_pods) >= extra_pods

running_workers = list(cluster.scheduler.workers.keys())
assert len(running_workers) >= 2

# Put some data on those workers to make them important to keep as long
# as possible.
def load_data(i):
return b'\x00' * (i * int(1e6))

futures = [client.submit(load_data, i, workers=w)
for i, w in enumerate(running_workers)]
wait(futures)

# Reduce the cluster size down to the actually useful nodes: pending pods
# and running pods without results should be shutdown and removed first:
cluster.scale(len(running_workers))

start = time()
pod_statuses = [p.status.phase for p in cluster.pods()]
while len(pod_statuses) != len(running_workers):
if time() - start > 60:
raise AssertionError("Expected %d running pods but got %r"
% (len(running_workers), pod_statuses))
sleep(0.1)
pod_statuses = [p.status.phase for p in cluster.pods()]

assert pod_statuses == ['Running'] * len(running_workers)
assert list(cluster.scheduler.workers.keys()) == running_workers

# Terminate everything
cluster.scale(0)

start = time()
while len(cluster.scheduler.workers) > 0:
sleep(0.1)
assert time() < start + 60


def test_automatic_startup(image_name, loop, ns):
test_yaml = {
"kind": "Pod",
