Skip to content

Commit

Permalink
DRAFT - Remove disconnected slaves from slave list
Browse files Browse the repository at this point in the history
This is just a rough draft to demonstrate what I'm thinking around
how to remove slaves from the slave list when they're disconnected.
This is all untested and should not be merged. :P
  • Loading branch information
josephharrington committed Dec 8, 2016
1 parent 11dc3cd commit c83a96f
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 26 deletions.
4 changes: 2 additions & 2 deletions app/master/build_scheduler.py
@@ -1,7 +1,7 @@
from queue import Empty
from threading import Lock

from app.master.slave import SlaveMarkedForShutdownError
from app.master.slave import SlaveMarkedForShutdownError, DeadSlaveError
from app.util import analytics
from app.util.log import get_logger

Expand Down Expand Up @@ -109,7 +109,7 @@ def execute_next_subjob_or_free_executor(self, slave):
try:
slave.start_subjob(subjob)
subjob.mark_in_progress(slave)
except SlaveMarkedForShutdownError:
except (SlaveMarkedForShutdownError, DeadSlaveError):
self._build._unstarted_subjobs.put(subjob) # todo: This changes subjob execution order. (Issue #226)
# An executor is currently allocated for this subjob in begin_subjob_executions_on_slave.
# Since the slave has been marked for shutdown (or is dead), we need to free the executor.
Expand Down
101 changes: 78 additions & 23 deletions app/master/cluster_master.py
@@ -1,5 +1,6 @@
from collections import OrderedDict
import os
from threading import Lock

from app.common.cluster_service import ClusterService
from app.master.build import Build
Expand All @@ -25,7 +26,7 @@ class ClusterMaster(ClusterService):
def __init__(self):
self._logger = get_logger(__name__)
self._master_results_path = Configuration['results_directory']
self._all_slaves_by_url = {}
self._slave_registry = SlaveRegistry()
self._all_builds_by_id = OrderedDict()
self._scheduler_pool = BuildSchedulerPool()
self._build_request_handler = BuildRequestHandler(self._scheduler_pool)
Expand Down Expand Up @@ -78,10 +79,7 @@ def all_slaves_by_id(self):
Retrieve all connected slaves
:rtype: dict [int, Slave]
"""
slaves_by_slave_id = {}
for slave in self._all_slaves_by_url.values():
slaves_by_slave_id[slave.id] = slave
return slaves_by_slave_id
return self._slave_registry.all_slaves_by_id()

def get_slave(self, slave_id=None, slave_url=None):
"""
Expand All @@ -95,18 +93,10 @@ def get_slave(self, slave_id=None, slave_url=None):
:return: The instance of the slave
:rtype: Slave
"""
if (slave_id is None) == (slave_url is None):
raise ValueError('Only one of slave_id or slave_url should be specified to get_slave().')

if slave_id is not None:
for slave in self._all_slaves_by_url.values():
if slave.id == slave_id:
return slave
else:
if slave_url in self._all_slaves_by_url:
return self._all_slaves_by_url[slave_url]

raise ItemNotFoundError('Requested slave ({}) does not exist.'.format(slave_id))
slave = self._slave_registry.get_slave(slave_id, slave_url)
if not slave:
raise ItemNotFoundError('Requested slave ({}) does not exist.'.format(slave_id or slave_url))
return slave

def connect_slave(self, slave_url, num_executors, slave_session_id=None):
"""
Expand All @@ -121,8 +111,8 @@ def connect_slave(self, slave_url, num_executors, slave_session_id=None):
# todo: Validate arg types for this and other methods called via API.
# If a slave had previously been connected, and is now being reconnected, the cleanest way to resolve this
# bookkeeping is for the master to forget about the previous slave instance and start with a fresh instance.
if slave_url in self._all_slaves_by_url:
old_slave = self._all_slaves_by_url.get(slave_url)
old_slave = self._slave_registry.get_slave(slave_url=slave_url)
if old_slave:
self._logger.warning('Slave requested to connect to master, even though previously connected as {}. ' +
'Removing existing slave instance from the master\'s bookkeeping.', old_slave)

Expand All @@ -141,7 +131,7 @@ def connect_slave(self, slave_url, num_executors, slave_session_id=None):
old_slave)

slave = Slave(slave_url, num_executors, slave_session_id)
self._all_slaves_by_url[slave_url] = slave
self._slave_registry.add_slave(slave)
self._slave_allocator.add_idle_slave(slave)
self._logger.info('Slave on {} connected to master with {} executors. (id: {})',
slave_url, num_executors, slave.id)
Expand Down Expand Up @@ -193,9 +183,11 @@ def _disconnect_slave(self, slave):
:type slave: Slave
"""
# Mark slave dead. We do not remove it from the list of all slaves. We also do not remove it from idle_slaves;
# that will happen during slave allocation.
# Mark slave dead. We remove it from the slave registry, but we do not remove it from the SlaveAllocator's
# idle slaves queue because it is difficult to safely remove specific items from a queue. Instead, the slave
# allocator discards dead slaves as it encounters them.
slave.mark_dead()
self._slave_registry.remove_slave(slave)
# todo: Fail/resend any currently executing subjobs still executing on this slave.
self._logger.info('Slave on {} was disconnected. (id: {})', slave.url, slave.id)

Expand Down Expand Up @@ -274,7 +266,7 @@ def handle_result_reported_from_slave(self, slave_url, build_id, subjob_id, payl
"""
self._logger.info('Results received from {} for subjob. (Build {}, Subjob {})', slave_url, build_id, subjob_id)
build = self._all_builds_by_id[int(build_id)]
slave = self._all_slaves_by_url[slave_url]
slave = self._slave_registry.get_slave(slave_url=slave_url)
# If the build has been canceled, don't work on the next subjob.
if not build.is_finished: # WIP(joey): This check should be internal to the Build object.
try:
Expand Down Expand Up @@ -314,3 +306,66 @@ def get_path_for_build_results_archive(self, build_id):
raise ItemNotReadyError('Build artifact file is not yet ready. Try again later.')

return archive_file


class SlaveRegistry:
    """
    Registry of slaves keyed by slave url. Every read and write of the underlying dict happens while holding
    an internal lock so that lookups cannot observe the dict while another thread is adding or removing a slave.
    """

    def __init__(self):
        self._all_slaves_by_url = {}
        self._slave_list_lock = Lock()

    def add_slave(self, slave):
        """
        Register a slave under its url, replacing any slave already registered under the same url.
        :type slave: Slave
        """
        with self._slave_list_lock:
            self._all_slaves_by_url[slave.url] = slave

    def remove_slave(self, slave):
        """
        Remove the given slave instance from the registry. This is a no-op if that exact instance is not the
        one currently registered under its url.
        :type slave: Slave
        """
        # We lock and check identity here to protect against the case when the following events happen nearly
        # simultaneously:
        #   (A) A new slave is connected with the same url as an existing one so we teardown the existing slave
        #       and register the new slave.
        #   (B) A slave is disconnected and removed from the registry.
        # This guards against a small race condition between those two events updating the slave registry.
        with self._slave_list_lock:
            # Compare by identity only; a truthiness check here would wrongly skip falsy slave objects.
            if self._all_slaves_by_url.get(slave.url) is slave:
                del self._all_slaves_by_url[slave.url]

    def all_slaves_by_id(self):
        """
        Retrieve all registered slaves
        :rtype: dict [int, Slave]
        """
        # NOTE(review): caching this mapping was suggested in code review; it is still rebuilt on every call.
        return {slave.id: slave for slave in self._registered_slaves()}

    def get_slave(self, slave_id=None, slave_url=None):
        """
        Get the instance of given slave by either the slave's id or url. Only one of slave_id or slave_url should be
        specified.
        :param slave_id: The id of the slave to return
        :type slave_id: int
        :param slave_url: The url of the slave to return
        :type slave_url: str
        :return: The instance of the slave, or None if no matching slave is registered
        :rtype: Slave | None
        :raises ValueError: if both or neither of slave_id and slave_url are specified
        """
        if (slave_id is None) == (slave_url is None):
            raise ValueError('Only one of slave_id or slave_url should be specified to get_slave().')

        if slave_id is not None:
            # NOTE(review): code review suggested using all_slaves_by_id() for this lookup.
            for slave in self._registered_slaves():
                if slave.id == slave_id:
                    return slave
            return None

        with self._slave_list_lock:
            return self._all_slaves_by_url.get(slave_url)

    def _registered_slaves(self):
        """
        Take a snapshot of the registered slaves while holding the lock, so callers can iterate without racing
        against add_slave/remove_slave mutating the dict.
        :rtype: list [Slave]
        """
        with self._slave_list_lock:
            return list(self._all_slaves_by_url.values())
2 changes: 1 addition & 1 deletion app/slave/cluster_slave.py
Expand Up @@ -329,7 +329,7 @@ def kill(self):
"""
Exits without error.
"""
sys.exit(0)
sys.exit(0) # Should trigger graceful shutdown which will send DISCONNECTED status to master.


class SlaveState(str, Enum):
Expand Down

1 comment on commit c83a96f

@shriganeshs-zz
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is great. After our discussion I really liked the idea of SlaveRegistry. We need to make the slave registry available in multiple places (ClusterMaster and Slave), so do we need to make it a singleton?

Please sign in to comment.