Skip to content

Commit

Permalink
Remove slave from registry when shut down
Browse files Browse the repository at this point in the history
Add slave registry to store connected slaves at master side.
Add remove functionality to the existing /shutdown endpoint.
  • Loading branch information
Shriganesh Shintre committed Apr 18, 2018
1 parent 4ff9b19 commit 25217f5
Show file tree
Hide file tree
Showing 7 changed files with 246 additions and 114 deletions.
60 changes: 14 additions & 46 deletions app/master/cluster_master.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from app.master.build_request_handler import BuildRequestHandler
from app.master.build_scheduler_pool import BuildSchedulerPool
from app.master.build_store import BuildStore
from app.master.slave import Slave
from app.master.slave import Slave, SlaveRegistry
from app.master.slave_allocator import SlaveAllocator
from app.slave.cluster_slave import SlaveState
from app.slave.cluster_slave import ClusterSlave
Expand All @@ -33,7 +33,7 @@ class ClusterMaster(ClusterService):
def __init__(self):
self._logger = get_logger(__name__)
self._master_results_path = Configuration['results_directory']
self._all_slaves_by_url = {}
self._slave_registry = SlaveRegistry.singleton()
self._scheduler_pool = BuildSchedulerPool()
self._build_request_handler = BuildRequestHandler(self._scheduler_pool)
self._build_request_handler.start()
Expand Down Expand Up @@ -62,7 +62,7 @@ def __init__(self):
self._unresponsive_slaves_cleanup_interval = Configuration['unresponsive_slaves_cleanup_interval']
self._hb_scheduler = sched.scheduler()

SlavesCollector.register_slaves_metrics_collector(lambda: self.all_slaves_by_id().values())
SlavesCollector.register_slaves_metrics_collector(lambda: self._slave_registry.get_all_slaves_by_id().values())

def start_heartbeat_tracker_thread(self):
self._logger.info('Heartbeat tracker will run every {} seconds'.format(
Expand All @@ -74,7 +74,7 @@ def _start_heartbeat_tracker(self):
self._hb_scheduler.run()

def _disconnect_non_heartbeating_slaves(self):
slaves_to_disconnect = [slave for slave in self._all_slaves_by_url.values()
slaves_to_disconnect = [slave for slave in self._slave_registry.get_all_slaves_by_url().values()
if slave.is_alive() and not self._is_slave_responsive(slave)]

for slave in slaves_to_disconnect:
Expand Down Expand Up @@ -102,7 +102,8 @@ def api_representation(self):
Gets a dict representing this resource which can be returned in an API response.
:rtype: dict [str, mixed]
"""
slaves_representation = [slave.api_representation() for slave in self.all_slaves_by_id().values()]
slaves_representation = [slave.api_representation() for slave in
self._slave_registry.get_all_slaves_by_id().values()]
return {
'status': self._get_status(),
'slaves': slaves_representation,
Expand All @@ -125,41 +126,6 @@ def active_builds(self):
"""
return [build for build in self.get_builds() if not build.is_finished]

def all_slaves_by_id(self):
"""
Retrieve all connected slaves
:rtype: dict [int, Slave]
"""
slaves_by_slave_id = {}
for slave in self._all_slaves_by_url.values():
slaves_by_slave_id[slave.id] = slave
return slaves_by_slave_id

def get_slave(self, slave_id=None, slave_url=None):
"""
Get the instance of given slave by either the slave's id or url. Only one of slave_id or slave_url should be
specified.
:param slave_id: The id of the slave to return
:type slave_id: int
:param slave_url: The url of the slave to return
:type slave_url: str
:return: The instance of the slave
:rtype: Slave
"""
if (slave_id is None) == (slave_url is None):
raise ValueError('Only one of slave_id or slave_url should be specified to get_slave().')

if slave_id is not None:
for slave in self._all_slaves_by_url.values():
if slave.id == slave_id:
return slave
else:
if slave_url in self._all_slaves_by_url:
return self._all_slaves_by_url[slave_url]

raise ItemNotFoundError('Requested slave ({}) does not exist.'.format(slave_id))

def connect_slave(self, slave_url, num_executors, slave_session_id=None):
"""
Connect a slave to this master.
Expand All @@ -173,11 +139,13 @@ def connect_slave(self, slave_url, num_executors, slave_session_id=None):
# todo: Validate arg types for this and other methods called via API.
# If a slave had previously been connected, and is now being reconnected, the cleanest way to resolve this
# bookkeeping is for the master to forget about the previous slave instance and start with a fresh instance.
if slave_url in self._all_slaves_by_url:
old_slave = self._all_slaves_by_url.get(slave_url)
try:
old_slave = self._slave_registry.get_slave(slave_url=slave_url)
except ItemNotFoundError:
pass
else:
self._logger.warning('Slave requested to connect to master, even though previously connected as {}. ' +
'Removing existing slave instance from the master\'s bookkeeping.', old_slave)

# If a slave has requested to reconnect, we have to assume that whatever build the dead slave was
# working on no longer has valid results.
if old_slave.current_build_id is not None:
Expand All @@ -193,7 +161,7 @@ def connect_slave(self, slave_url, num_executors, slave_session_id=None):
old_slave)

slave = Slave(slave_url, num_executors, slave_session_id)
self._all_slaves_by_url[slave_url] = slave
self._slave_registry.add_slave(slave)
self._slave_allocator.add_idle_slave(slave)
self._logger.info('Slave on {} connected to master with {} executors. (id: {})',
slave_url, num_executors, slave.id)
Expand Down Expand Up @@ -229,7 +197,7 @@ def set_shutdown_mode_on_slaves(self, slave_ids):
:type slave_ids: list[int]
"""
# Find all the slaves first so if an invalid slave_id is specified, we 404 before shutting any of them down.
slaves = [self.get_slave(slave_id) for slave_id in slave_ids]
slaves = [self._slave_registry.get_slave(slave_id=slave_id) for slave_id in slave_ids]
for slave in slaves:
self.handle_slave_state_update(slave, SlaveState.SHUTDOWN)

Expand Down Expand Up @@ -333,7 +301,7 @@ def handle_result_reported_from_slave(self, slave_url, build_id, subjob_id, payl
"""
self._logger.info('Results received from {} for subjob. (Build {}, Subjob {})', slave_url, build_id, subjob_id)
build = BuildStore.get(int(build_id))
slave = self._all_slaves_by_url[slave_url]
slave = self._slave_registry.get_slave(slave_url=slave_url)
try:
build.complete_subjob(subjob_id, payload)
finally:
Expand Down
78 changes: 77 additions & 1 deletion app/master/slave.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
from datetime import datetime
from threading import Lock
from typing import Union
import requests

from app.master.build import Build
from app.master.subjob import Subjob
from app.util import analytics, log
from app.util.counter import Counter
from app.util.exceptions import ItemNotFoundError
from app.util.network import Network, RequestFailedError
from app.util.secret import Secret
from app.util.session_id import SessionId
from app.util.singleton import Singleton
from app.util.url_builder import UrlBuilder


Expand Down Expand Up @@ -62,6 +66,7 @@ def mark_as_idle(self):

if self._is_in_shutdown_mode:
self.kill()
self._remove_slave_from_registry()
raise SlaveMarkedForShutdownError

def setup(self, build: Build, executor_start_index: int) -> bool:
Expand Down Expand Up @@ -188,11 +193,13 @@ def set_is_alive(self, value):
def set_shutdown_mode(self):
"""
Mark this slave as being in shutdown mode. Slaves in shutdown mode will not get new subjobs and will be
killed when they finish teardown, or killed immediately if they are not processing a build.
killed and removed from slave registry when they finish teardown, or
killed and removed from slave registry immediately if they are not processing a build.
"""
self._is_in_shutdown_mode = True
if self.current_build_id is None:
self.kill()
self._remove_slave_from_registry()

def is_shutdown(self):
"""
Expand Down Expand Up @@ -244,6 +251,75 @@ def update_last_heartbeat_time(self):
def get_last_heartbeat_time(self) -> datetime:
return self._last_heartbeat_time

def _remove_slave_from_registry(self):
"""
Remove shutdown-ed slave from SlaveRegistry.
"""
self._logger.info('Removing slave (url={}; id={}) from Slave Registry.'.format(self.url, self.id))
SlaveRegistry.singleton().remove_slave(slave_url=self.url)


class SlaveRegistry(Singleton):
"""
SlaveRegistry class is a singleton class which stores and maintains list of connected slaves.
"""

def __init__(self):
super().__init__()
self.all_slaves_by_url = {}
self.all_slaves_by_id = {}
self._slave_dict_lock = Lock()

def add_slave(self, slave: Slave):
"""
Add slave in SlaveRegistry.
"""
with self._slave_dict_lock:
self.all_slaves_by_url[slave.url] = slave
self.all_slaves_by_id[slave.id] = slave

def remove_slave(self, slave: Slave=None, slave_url: str=None):
"""
Remove slave from the both url and id dictionary.
"""
if (slave is None) == (slave_url is None):
raise ValueError('Only one of slave or slave_url should be specified to remove_slave().')

with self._slave_dict_lock:
try:
if slave is None:
slave = self.get_slave(slave_url=slave_url)
del self.all_slaves_by_url[slave.url]
del self.all_slaves_by_id[slave.id]
except (ItemNotFoundError, KeyError):
# Ignore if slave does not exists in SlaveRegistry
pass

def get_slave(self, slave_id: int=None, slave_url: str=None) -> Union[Slave, ItemNotFoundError]:
"""
Look for a slave in the registry and if not found raise "ItemNotFoundError" exception.
"""
if (slave_id is None) == (slave_url is None):
raise ValueError('Only one of slave_id or slave_url should be specified to get_slave().')

if slave_id is not None:
if slave_id in self.all_slaves_by_id:
return self.all_slaves_by_id[slave_id]
else:
if slave_url in self.all_slaves_by_url:
return self.all_slaves_by_url[slave_url]
if slave_id is not None:
error_msg = 'Requested slave (slave_id={}) does not exist.'.format(slave_id)
else:
error_msg = 'Requested slave (slave_url={}) does not exist.'.format(slave_url)
raise ItemNotFoundError(error_msg)

def get_all_slaves_by_id(self):
return self.all_slaves_by_id

def get_all_slaves_by_url(self):
return self.all_slaves_by_url


class SlaveError(Exception):
"""A generic slave error occurred."""
Expand Down
16 changes: 10 additions & 6 deletions app/web_framework/cluster_master_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import tornado.web
import prometheus_client

from app.master.slave import SlaveRegistry
from app.util import analytics
from app.util import log
from app.util.conf.configuration import Configuration
Expand Down Expand Up @@ -176,7 +177,7 @@ def get(self, build_id, subjob_id):
class _SubjobResultHandler(_ClusterMasterBaseAPIHandler):
def post(self, build_id, subjob_id):
slave_url = self.decoded_body.get('slave')
slave = self._cluster_master.get_slave(slave_url=slave_url)
slave = SlaveRegistry.singleton().get_slave(slave_url=slave_url)
file_payload = self.request.files.get('file')
if not file_payload:
raise RuntimeError('Result file not provided')
Expand Down Expand Up @@ -369,15 +370,16 @@ def post(self):
self._write_status(response, status_code=201)

def get(self):

response = {
'slaves': [slave.api_representation() for slave in self._cluster_master.all_slaves_by_id().values()]
'slaves': [slave.api_representation() for slave in SlaveRegistry.singleton().get_all_slaves_by_id().values()]
}
self.write(response)


class _SlaveHandler(_ClusterMasterBaseAPIHandler):
def get(self, slave_id):
slave = self._cluster_master.get_slave(int(slave_id))
slave = SlaveRegistry.singleton().get_slave(slave_id=int(slave_id))
response = {
'slave': slave.api_representation()
}
Expand All @@ -386,7 +388,7 @@ def get(self, slave_id):
@authenticated
def put(self, slave_id):
new_slave_state = self.decoded_body.get('slave', {}).get('state')
slave = self._cluster_master.get_slave(int(slave_id))
slave = SlaveRegistry.singleton().get_slave(slave_id=int(slave_id))
self._cluster_master.handle_slave_state_update(slave, new_slave_state)
self._cluster_master.update_slave_last_heartbeat_time(slave)

Expand Down Expand Up @@ -417,8 +419,10 @@ class _SlavesShutdownHandler(_ClusterMasterBaseAPIHandler):
@authenticated
def post(self):
shutdown_all = self.decoded_body.get('shutdown_all')
slaves_to_shutdown = self._cluster_master.all_slaves_by_id().keys() if shutdown_all else\
[int(slave_id) for slave_id in self.decoded_body.get('slaves')]
if shutdown_all:
slaves_to_shutdown = SlaveRegistry.singleton().get_all_slaves_by_id().keys()
else:
slaves_to_shutdown = [int(slave_id) for slave_id in self.decoded_body.get('slaves')]

self._cluster_master.set_shutdown_mode_on_slaves(slaves_to_shutdown)

Expand Down
17 changes: 6 additions & 11 deletions test/functional/master/test_shutdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,20 @@

class TestShutdown(BaseFunctionalTestCase):

def test_shutdown_all_slaves_should_kill_all_slaves(self):
def test_shutdown_all_slaves_should_kill_and_remove_all_slaves(self):
master = self.cluster.start_master()
self.cluster.start_slaves(2)

master.graceful_shutdown_all_slaves()

slaves_response = master.get_slaves()
slaves = slaves_response['slaves']
living_slaves = [slave for slave in slaves if slave['is_alive']]
dead_slaves = [slave for slave in slaves if not slave['is_alive']]

self.assertEqual(0, len(living_slaves))
self.assertEqual(2, len(dead_slaves))
self.assertEqual(0, len(slaves))

self.cluster.block_until_n_slaves_dead(2, 10)

def test_shutdown_one_slave_should_leave_one_slave_alive(self):
def test_shutdown_one_slave_should_leave_one_slave_alive_and_remove_shutdowned_slave(self):
master = self.cluster.start_master()
self.cluster.start_slaves(2)

Expand All @@ -33,14 +30,13 @@ def test_shutdown_one_slave_should_leave_one_slave_alive(self):
slaves_response = master.get_slaves()
slaves = slaves_response['slaves']
living_slaves = [slave for slave in slaves if slave['is_alive']]
dead_slaves = [slave for slave in slaves if not slave['is_alive']]

self.assertEqual(1, len(living_slaves))
self.assertEqual(1, len(dead_slaves))
self.assertEqual(1, len(slaves))

self.cluster.block_until_n_slaves_dead(1, 10)

def test_shutdown_all_slaves_while_build_is_running_should_finish_build_then_kill_slaves(self):
def test_shutdown_all_slaves_while_build_is_running_should_finish_build_then_kill_and_remove_slaves(self):
master = self.cluster.start_master()
self.cluster.start_slaves(2)

Expand All @@ -64,7 +60,6 @@ def test_shutdown_all_slaves_while_build_is_running_should_finish_build_then_kil
slaves_response = master.get_slaves()
slaves = slaves_response['slaves']
living_slaves = [slave for slave in slaves if slave['is_alive']]
dead_slaves = [slave for slave in slaves if not slave['is_alive']]

self.assertEqual(1, len(living_slaves))
self.assertEqual(1, len(dead_slaves))
self.assertEqual(1, len(slaves))
2 changes: 2 additions & 0 deletions test/functional/test_cluster_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from app.common.build_artifact import BuildArtifact
from app.master.build import BuildStatus
from app.master.slave import SlaveRegistry
from test.framework.functional.base_functional_test_case import BaseFunctionalTestCase
from test.framework.functional.fs_item import Directory, File
from test.functional.job_configs import BASIC_FAILING_JOB, BASIC_JOB, FAILING_SETUP_JOB, JOB_WITH_SETUP_AND_TEARDOWN
Expand Down Expand Up @@ -72,6 +73,7 @@ def test_git_type_demo_project_config(self):
build_id=build_id, expected_build_artifact_contents=expected_artifact_contents)

def test_slave_reconnection_does_not_take_down_master(self):
SlaveRegistry.reset_singleton()
test_config = JOB_WITH_SETUP_AND_TEARDOWN
job_config = yaml.safe_load(test_config.config[os.name])['JobWithSetupAndTeardown']
master = self.cluster.start_master()
Expand Down

0 comments on commit 25217f5

Please sign in to comment.