Remove bad nodes from cluster during the test execution (#325)
* work in progress - hacking around removing nodes from the cluster

* temporarily revert to older paramiko

* work in progress - hacking around removing nodes from the cluster

* fixed most of this crap

* fixed the rest of the issues

* added a couple of tests; more needed

* fixed more tests, fixed a bug when cluster becomes empty

* fixed the rest of the tests

* added another test, plus comments; simplified the check for whether cluster size changed

* removed unused var

* style

* merge fixes

* remove debug output

* refactor and more tests

* pr comment

* moved kwarg after positional args in json and vagrant

* updated vagrant to ubuntu20 and fixed network discovery to account for other possible interface names

* revert requirements.txt change

* use exception instead of success variable

* another test case + style

* just create ssh client instead of sending ping

* pr comments

* removed a separate class for the remove_node result and return a tuple instead; Occam's razor

* added type annotation

* unused import
stan-is-hate committed Jun 29, 2022
1 parent 2feedb8 commit dc44ae9
Showing 21 changed files with 796 additions and 158 deletions.
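In summary: NodeContainer.remove_spec now health-checks nodes during allocation and returns a (good_nodes, bad_nodes) tuple instead of a single list, raising InsufficientHealthyNodesError when too few healthy nodes remain; JsonCluster tracks the unhealthy accounts it encounters in a _bad_accounts container. A minimal sketch of the new calling convention, assuming an already-populated NodeContainer `container` and a ClusterSpec `spec` (both names are hypothetical):

    from ducktape.cluster.node_container import InsufficientHealthyNodesError

    try:
        good_nodes, bad_nodes = container.remove_spec(spec)
    except InsufficientHealthyNodesError as e:
        # the healthy nodes were already returned to the container;
        # the unhealthy ones travel on the exception for the caller to track
        print(f"allocation failed, {len(e.bad_nodes)} bad node(s): {e}")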
1 change: 1 addition & 0 deletions ducktape/cluster/cluster_spec.py
@@ -47,6 +47,7 @@ class ClusterSpec(object):
     """
     The specification for a ducktape cluster.
     """
+    nodes: NodeContainer = None
 
     @staticmethod
     def empty():
12 changes: 9 additions & 3 deletions ducktape/cluster/finite_subcluster.py
@@ -28,9 +28,15 @@ def __init__(self, nodes):
         self._in_use_nodes = NodeContainer()
 
     def do_alloc(self, cluster_spec):
-        allocated = self._available_nodes.remove_spec(cluster_spec)
-        self._in_use_nodes.add_nodes(allocated)
-        return allocated
+        # There cannot be any bad nodes here: FiniteSubcluster operates on ClusterNode
+        # objects, which NodeContainer.remove_spec does not health-check.
+        # There can still be an error, though, if a test tries to alloc more nodes than
+        # are available. In a previous ducktape version that exception was raised by
+        # remove_spec; in this one, for consistency, we let the cluster itself deal
+        # with allocation errors.
+        good_nodes, bad_nodes = self._available_nodes.remove_spec(cluster_spec)
+        self._in_use_nodes.add_nodes(good_nodes)
+        return good_nodes
 
     def free_single(self, node):
         self._in_use_nodes.remove_node(node)
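For context: a FiniteSubcluster wraps ClusterNode objects that were already allocated from a parent cluster, so the health check never fires here and bad_nodes is always empty. A short sketch of how this path is exercised, assuming `parent_nodes` is a list of previously allocated ClusterNode objects:

    from ducktape.cluster.cluster_spec import ClusterSpec
    from ducktape.cluster.finite_subcluster import FiniteSubcluster

    subcluster = FiniteSubcluster(parent_nodes)
    nodes = subcluster.alloc(ClusterSpec.simple_linux(2))  # only good_nodes come back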
45 changes: 29 additions & 16 deletions ducktape/cluster/json.py
@@ -14,8 +14,9 @@
 
 from __future__ import absolute_import
 
+
 from ducktape.cluster.cluster_spec import ClusterSpec, WINDOWS
-from ducktape.cluster.node_container import NodeContainer
+from ducktape.cluster.node_container import NodeContainer, InsufficientHealthyNodesError
 from ducktape.command_line.defaults import ConsoleDefaults
 from .cluster import Cluster, ClusterNode
 from ducktape.cluster.linux_remoteaccount import LinuxRemoteAccount
@@ -27,11 +28,20 @@
 import traceback
 
 
+def make_remote_account(ssh_config, *args, **kwargs):
+    """Factory function for creating the correct RemoteAccount implementation."""
+
+    if ssh_config.host and WINDOWS in ssh_config.host:
+        return WindowsRemoteAccount(ssh_config, *args, **kwargs)
+    else:
+        return LinuxRemoteAccount(ssh_config, *args, **kwargs)
+
+
 class JsonCluster(Cluster):
     """An implementation of Cluster that uses static settings specified in a cluster file or json-serializeable dict
     """
 
-    def __init__(self, cluster_json=None, *args, **kwargs):
+    def __init__(self, cluster_json=None, *args, make_remote_account_func=make_remote_account, **kwargs):
         """Initialize JsonCluster
 
         JsonCluster can be initialized from:
@@ -75,8 +85,9 @@ def __init__(self, cluster_json=None, *args, **kwargs):
         """
         super(JsonCluster, self).__init__()
-        self._available_accounts = NodeContainer()
-        self._in_use_nodes = NodeContainer()
+        self._available_accounts: NodeContainer = NodeContainer()
+        self._bad_accounts: NodeContainer = NodeContainer()
+        self._in_use_nodes: NodeContainer = NodeContainer()
         if cluster_json is None:
             # This is a directly instantiation of JsonCluster rather than from a subclass (e.g. VagrantCluster)
             cluster_file = kwargs.get("cluster_file")
@@ -91,8 +102,8 @@ def __init__(self, cluster_json=None, *args, **kwargs):
 
             ssh_config = RemoteAccountSSHConfig(**ninfo.get("ssh_config", {}))
             remote_account = \
-                JsonCluster.make_remote_account(ssh_config, ninfo.get("externally_routable_ip"),
-                                                ssh_exception_checks=kwargs.get("ssh_exception_checks"))
+                make_remote_account_func(ssh_config, ninfo.get("externally_routable_ip"),
+                                         ssh_exception_checks=kwargs.get("ssh_exception_checks"))
             if remote_account.externally_routable_ip is None:
                 remote_account.externally_routable_ip = self._externally_routable_ip(remote_account)
             self._available_accounts.add_node(remote_account)
@@ -101,22 +112,24 @@ def __init__(self, cluster_json=None, *args, **kwargs):
             raise ValueError(msg)
         self._id_supplier = 0
 
-    @staticmethod
-    def make_remote_account(ssh_config, *args, **kwargs):
-        """Factory function for creating the correct RemoteAccount implementation."""
+    def do_alloc(self, cluster_spec):
+        try:
+            good_nodes, bad_nodes = self._available_accounts.remove_spec(cluster_spec)
+        except InsufficientHealthyNodesError as e:
+            self._bad_accounts.add_nodes(e.bad_nodes)
+            raise e
 
-        if ssh_config.host and WINDOWS in ssh_config.host:
-            return WindowsRemoteAccount(ssh_config, *args, **kwargs)
-        else:
-            return LinuxRemoteAccount(ssh_config, *args, **kwargs)
+        # even when nothing is raised, we can still encounter bad nodes, so track them
+        if bad_nodes:
+            self._bad_accounts.add_nodes(bad_nodes)
 
-    def do_alloc(self, cluster_spec):
-        allocated_accounts = self._available_accounts.remove_spec(cluster_spec)
+        # gather all the good accounts and convert them into ClusterNode objects
         allocated_nodes = []
-        for account in allocated_accounts:
+        for account in good_nodes:
            allocated_nodes.append(ClusterNode(account, slot_id=self._id_supplier))
            self._id_supplier += 1
         self._in_use_nodes.add_nodes(allocated_nodes)
 
         return allocated_nodes
 
     def free_single(self, node):
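Turning the old static method into the module-level make_remote_account factory, and threading it through as the make_remote_account_func keyword, creates a seam for tests to substitute remote-account doubles. A sketch of how a unit test might use it (FakeRemoteAccount and the inline cluster JSON are illustrative, not part of ducktape):

    from ducktape.cluster.json import JsonCluster

    def fake_factory(ssh_config, *args, **kwargs):
        return FakeRemoteAccount(ssh_config, *args, **kwargs)  # hypothetical test double

    cluster = JsonCluster({"nodes": [{"ssh_config": {"host": "fake1"}}]},
                          make_remote_account_func=fake_factory)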
8 changes: 5 additions & 3 deletions ducktape/cluster/localhost.py
@@ -38,9 +38,11 @@ def __init__(self, *args, **kwargs):
         self._in_use_nodes = NodeContainer()
 
     def do_alloc(self, cluster_spec):
-        allocated = self._available_nodes.remove_spec(cluster_spec)
-        self._in_use_nodes.add_nodes(allocated)
-        return allocated
+        # There shouldn't be any bad nodes in a localhost cluster,
+        # since the ClusterNode object does not implement the `available()` method.
+        good_nodes, bad_nodes = self._available_nodes.remove_spec(cluster_spec)
+        self._in_use_nodes.add_nodes(good_nodes)
+        return good_nodes
 
     def free_single(self, node):
         self._in_use_nodes.remove_node(node)
68 changes: 56 additions & 12 deletions ducktape/cluster/node_container.py
@@ -11,9 +11,12 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from typing import Dict, List, Tuple
 
 from six import iteritems
 
+from ducktape.cluster.remoteaccount import RemoteAccount
+
 
 class NodeNotPresentError(Exception):
     pass
@@ -23,8 +26,17 @@ class InsufficientResourcesError(Exception):
     pass
 
 
+class InsufficientHealthyNodesError(InsufficientResourcesError):
+
+    def __init__(self, bad_nodes: List, *args):
+        self.bad_nodes = bad_nodes
+        super().__init__(*args)
+
+
 class NodeContainer(object):
-    def __init__(self, nodes=None):
+    os_to_nodes: Dict = None
+
+    def __init__(self, nodes: List = None):
         """
         Create a NodeContainer with the given nodes.
@@ -103,25 +115,57 @@ def remove_nodes(self, nodes):
         for node in nodes:
             self.remove_node(node)
 
-    def remove_spec(self, cluster_spec):
+    def remove_spec(self, cluster_spec) -> Tuple[List, List]:
         """
         Remove nodes matching a ClusterSpec from this NodeContainer.
 
         :param cluster_spec: The cluster spec. This will not be modified.
-        :returns: A list of the nodes that were removed.
-        :throws InsufficientResourcesError: If there are not enough nodes in the NodeContainer.
-            Nothing will be removed unless enough are available.
+        :returns: A list of good nodes and a list of bad nodes.
+        :raises: InsufficientResourcesError when there aren't enough total nodes;
+            InsufficientHealthyNodesError when there aren't enough healthy nodes.
         """
-        msg = self.attempt_remove_spec(cluster_spec)
-        if len(msg) > 0:
-            raise InsufficientResourcesError("Not enough nodes available to allocate. " + msg)
-        removed = []
+        err = self.attempt_remove_spec(cluster_spec)
+        if err:
+            # there weren't enough nodes to even attempt allocation, raise immediately
+            raise InsufficientResourcesError(err)
+
+        good_nodes = []
+        bad_nodes = []
+        msg = ""
+        # we have enough nodes for each OS; try allocating, health-checking the nodes that support it
         for os, node_specs in iteritems(cluster_spec.nodes.os_to_nodes):
             num_nodes = len(node_specs)
+            good_per_os = []
             avail_nodes = self.os_to_nodes.get(os, [])
-            for i in range(0, num_nodes):
-                removed.append(avail_nodes.pop(0))
-        return removed
+            # loop over the available nodes until we have enough good ones for this OS
+            while avail_nodes and (len(good_per_os) < num_nodes):
+                node = avail_nodes.pop(0)
+                if isinstance(node, RemoteAccount):
+                    if node.available():
+                        good_per_os.append(node)
+                    else:
+                        bad_nodes.append(node)
+                else:
+                    good_per_os.append(node)
+
+            good_nodes.extend(good_per_os)
+            # if we don't have enough good nodes to allocate for this OS,
+            # record the failure
+            if len(good_per_os) < num_nodes:
+                msg += f"{os} nodes requested: {num_nodes}. Healthy {os} nodes available: {len(good_per_os)}"
+
+        # we didn't have enough healthy nodes for at least one of the OSes.
+        # there's no point in keeping the nodes we did allocate, since there aren't enough of them,
+        # so return the good ones back to this container
+        # and raise the exception carrying the bad ones
+        if msg:
+            for node in good_nodes:
+                self.add_node(node)
+            raise InsufficientHealthyNodesError(bad_nodes, msg)
+
+        return good_nodes, bad_nodes
 
     def can_remove_spec(self, cluster_spec):
         """
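The heart of the change is the per-OS loop above: only RemoteAccount instances expose the available() health check, so anything else (ClusterNode objects in particular) is assumed good. A condensed restatement of that loop, as a standalone function rather than the actual ducktape source:

    from ducktape.cluster.remoteaccount import RemoteAccount

    def pick_healthy(avail_nodes, num_nodes, bad_nodes):
        # pop nodes until we have enough healthy ones or the pool runs dry
        good = []
        while avail_nodes and len(good) < num_nodes:
            node = avail_nodes.pop(0)
            if isinstance(node, RemoteAccount) and not node.available():
                bad_nodes.append(node)  # set the bad node aside and keep scanning
            else:
                good.append(node)
        return good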
10 changes: 10 additions & 0 deletions ducktape/cluster/remoteaccount.py
@@ -272,6 +272,16 @@ def _can_ping_url(self, url, headers):
         except Exception:
             return False
 
+    def available(self):
+        try:
+            # ssh_client is a property; merely touching it attempts to open an SSH connection
+            self.ssh_client
+        except Exception:
+            return False
+        else:
+            return True
+        finally:
+            self.close()
+
     @check_ssh
     def ssh(self, cmd, allow_fail=False):
         """Run the given command on the remote host, and block until the command has finished running.
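Per the commit history above ("just create ssh client instead of sending ping"), availability is probed by opening a real SSH connection rather than pinging a URL, and each call opens and then closes a connection, so it is not free. Illustrative use, mirroring the health check in NodeContainer.remove_spec (`some_account` is a stand-in for any RemoteAccount):

    if not some_account.available():
        print("node is unreachable, marking it as bad")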
13 changes: 7 additions & 6 deletions ducktape/cluster/vagrant.py
@@ -14,7 +14,7 @@
 
 from __future__ import absolute_import
 
-from .json import JsonCluster
+from .json import JsonCluster, make_remote_account
 import json
 import os
 from .remoteaccount import RemoteAccountSSHConfig
@@ -33,7 +33,7 @@ class VagrantCluster(JsonCluster):
     - Otherwise, retrieve cluster info via "vagrant ssh-config" from vagrant
     """
 
-    def __init__(self, *args, **kwargs):
+    def __init__(self, *args, make_remote_account_func=make_remote_account, **kwargs):
         is_read_from_file = False
         self.ssh_exception_checks = kwargs.get("ssh_exception_checks")
         cluster_file = kwargs.get("cluster_file")
@@ -47,10 +47,11 @@ def __init__(self, *args, **kwargs):
 
         if not is_read_from_file:
             cluster_json = {
-                "nodes": self._get_nodes_from_vagrant()
+                "nodes": self._get_nodes_from_vagrant(make_remote_account_func)
             }
 
-        super(VagrantCluster, self).__init__(cluster_json, *args, **kwargs)
+        super(VagrantCluster, self).__init__(
+            cluster_json, *args, make_remote_account_func=make_remote_account_func, **kwargs)
 
         # If cluster file is specified but the cluster info is not read from it, write the cluster info into the file
         if not is_read_from_file and cluster_file is not None:
@@ -69,7 +70,7 @@ def __init__(self, *args, **kwargs):
         for node_account in self._available_accounts:
             node_account.close()
 
-    def _get_nodes_from_vagrant(self):
+    def _get_nodes_from_vagrant(self, make_remote_account_func):
         ssh_config_info, error = self._vagrant_ssh_config()
 
         nodes = []
@@ -81,7 +82,7 @@ def _get_nodes_from_vagrant(self):
 
         account = None
         try:
-            account = JsonCluster.make_remote_account(ssh_config, ssh_exception_checks=self.ssh_exception_checks)
+            account = make_remote_account_func(ssh_config, ssh_exception_checks=self.ssh_exception_checks)
             externally_routable_ip = account.fetch_externally_routable_ip()
         finally:
             if account:
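VagrantCluster accepts the same injection point and forwards it both to its own "vagrant ssh-config" discovery and to the JsonCluster base class, so the two always use the same factory. A sketch, reusing the hypothetical fake_factory from the JsonCluster example above:

    from ducktape.cluster.vagrant import VagrantCluster

    cluster = VagrantCluster(cluster_file="cluster.json",
                             make_remote_account_func=fake_factory)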
