Skip to content

Commit

Permalink
DRAFT - Remove disconnected slaves from slave list
Browse files Browse the repository at this point in the history
This is just a rough draft to demonstrate what I'm thinking around
how to remove slaves from the slave list when they're disconnected.
This is all untested and should not be merged. :P
  • Loading branch information
josephharrington committed Dec 8, 2016
1 parent 11dc3cd commit 079e82c
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 24 deletions.
4 changes: 2 additions & 2 deletions app/master/build_scheduler.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from queue import Empty
from threading import Lock

from app.master.slave import SlaveMarkedForShutdownError
from app.master.slave import SlaveMarkedForShutdownError, DeadSlaveError
from app.util import analytics
from app.util.log import get_logger

Expand Down Expand Up @@ -109,7 +109,7 @@ def execute_next_subjob_or_free_executor(self, slave):
try:
slave.start_subjob(subjob)
subjob.mark_in_progress(slave)
except SlaveMarkedForShutdownError:
except (SlaveMarkedForShutdownError, DeadSlaveError):
self._build._unstarted_subjobs.put(subjob) # todo: This changes subjob execution order. (Issue #226)
# An executor is currently allocated for this subjob in begin_subjob_executions_on_slave.
# Since the slave has been marked for shutdown (or is dead), we need to free the executor.
Expand Down
89 changes: 68 additions & 21 deletions app/master/cluster_master.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from collections import OrderedDict
import os
from threading import Lock

from app.common.cluster_service import ClusterService
from app.master.build import Build
Expand All @@ -25,7 +26,7 @@ class ClusterMaster(ClusterService):
def __init__(self):
self._logger = get_logger(__name__)
self._master_results_path = Configuration['results_directory']
self._all_slaves_by_url = {}
self._slave_registry = SlaveRegistry()
self._all_builds_by_id = OrderedDict()
self._scheduler_pool = BuildSchedulerPool()
self._build_request_handler = BuildRequestHandler(self._scheduler_pool)
Expand Down Expand Up @@ -78,10 +79,7 @@ def all_slaves_by_id(self):
Retrieve all connected slaves
:rtype: dict [int, Slave]
"""
slaves_by_slave_id = {}
for slave in self._all_slaves_by_url.values():
slaves_by_slave_id[slave.id] = slave
return slaves_by_slave_id
return self._slave_registry.all_slaves_by_id()

def get_slave(self, slave_id=None, slave_url=None):
"""
Expand All @@ -95,18 +93,10 @@ def get_slave(self, slave_id=None, slave_url=None):
:return: The instance of the slave
:rtype: Slave
"""
if (slave_id is None) == (slave_url is None):
raise ValueError('Only one of slave_id or slave_url should be specified to get_slave().')

if slave_id is not None:
for slave in self._all_slaves_by_url.values():
if slave.id == slave_id:
return slave
else:
if slave_url in self._all_slaves_by_url:
return self._all_slaves_by_url[slave_url]

raise ItemNotFoundError('Requested slave ({}) does not exist.'.format(slave_id))
slave = self._slave_registry.get_slave(slave_id, slave_url)
if not slave:
raise ItemNotFoundError('Requested slave ({}) does not exist.'.format(slave_id or slave_url))
return slave

def connect_slave(self, slave_url, num_executors, slave_session_id=None):
"""
Expand All @@ -121,8 +111,8 @@ def connect_slave(self, slave_url, num_executors, slave_session_id=None):
# todo: Validate arg types for this and other methods called via API.
# If a slave had previously been connected, and is now being reconnected, the cleanest way to resolve this
# bookkeeping is for the master to forget about the previous slave instance and start with a fresh instance.
if slave_url in self._all_slaves_by_url:
old_slave = self._all_slaves_by_url.get(slave_url)
old_slave = self._slave_registry.get_slave(slave_url=slave_url)
if old_slave:
self._logger.warning('Slave requested to connect to master, even though previously connected as {}. ' +
'Removing existing slave instance from the master\'s bookkeeping.', old_slave)

Expand All @@ -141,7 +131,7 @@ def connect_slave(self, slave_url, num_executors, slave_session_id=None):
old_slave)

slave = Slave(slave_url, num_executors, slave_session_id)
self._all_slaves_by_url[slave_url] = slave
self._slave_registry.add_slave(slave)
self._slave_allocator.add_idle_slave(slave)
self._logger.info('Slave on {} connected to master with {} executors. (id: {})',
slave_url, num_executors, slave.id)
Expand Down Expand Up @@ -196,6 +186,7 @@ def _disconnect_slave(self, slave):
# Mark slave dead and remove it from the slave registry. We do not remove it from idle_slaves;
# that will happen during slave allocation.
slave.mark_dead()
self._slave_registry.remove_slave(slave)
# todo: Fail/resend any currently executing subjobs still executing on this slave.
self._logger.info('Slave on {} was disconnected. (id: {})', slave.url, slave.id)

Expand Down Expand Up @@ -274,7 +265,7 @@ def handle_result_reported_from_slave(self, slave_url, build_id, subjob_id, payl
"""
self._logger.info('Results received from {} for subjob. (Build {}, Subjob {})', slave_url, build_id, subjob_id)
build = self._all_builds_by_id[int(build_id)]
slave = self._all_slaves_by_url[slave_url]
slave = self._slave_registry.get_slave(slave_url=slave_url)
# If the build has been canceled, don't work on the next subjob.
if not build.is_finished: # WIP(joey): This check should be internal to the Build object.
try:
Expand Down Expand Up @@ -314,3 +305,59 @@ def get_path_for_build_results_archive(self, build_id):
raise ItemNotReadyError('Build artifact file is not yet ready. Try again later.')

return archive_file


class SlaveRegistry:
    """
    Bookkeeping for the slave instances currently known to the master, keyed by slave url.

    Every read and write of the underlying dict happens while holding a single lock, so slaves can be
    added, removed and looked up from different threads without the dict changing underneath an iteration.
    """

    def __init__(self):
        self._all_slaves_by_url = {}
        self._slave_list_lock = Lock()

    def add_slave(self, slave):
        """
        Register a slave under its url, replacing any instance previously registered for that url.

        :type slave: Slave
        """
        with self._slave_list_lock:
            self._all_slaves_by_url[slave.url] = slave

    def remove_slave(self, slave):
        """
        Remove a slave from the registry. This is a no-op if the slave is not registered, or if a
        different slave instance has since been registered under the same url.

        :type slave: Slave
        """
        with self._slave_list_lock:
            # Use .get() so that removing an unknown (or already removed) slave does not raise KeyError.
            # The comparison keeps a stale instance from evicting a newer one registered at the same url.
            if self._all_slaves_by_url.get(slave.url) == slave:
                del self._all_slaves_by_url[slave.url]

    def all_slaves_by_id(self):
        """
        Retrieve all connected slaves

        :return: A new dict (a snapshot) mapping slave id to slave instance
        :rtype: dict [int, Slave]
        """
        # Build the snapshot under the lock so a concurrent add/remove cannot resize the dict mid-iteration.
        with self._slave_list_lock:
            return {slave.id: slave for slave in self._all_slaves_by_url.values()}

    def get_slave(self, slave_id=None, slave_url=None):
        """
        Get the instance of given slave by either the slave's id or url. Only one of slave_id or slave_url should be
        specified.

        :param slave_id: The id of the slave to return
        :type slave_id: int
        :param slave_url: The url of the slave to return
        :type slave_url: str
        :return: The instance of the slave, or None if no matching slave is registered
        :rtype: Slave | None
        :raises ValueError: If both or neither of slave_id and slave_url are specified
        """
        if (slave_id is None) == (slave_url is None):
            raise ValueError('Only one of slave_id or slave_url should be specified to get_slave().')

        with self._slave_list_lock:
            if slave_url is not None:
                return self._all_slaves_by_url.get(slave_url)

            # Slaves are keyed by url, so a lookup by id has to scan the registered instances.
            for slave in self._all_slaves_by_url.values():
                if slave.id == slave_id:
                    return slave

        return None
2 changes: 1 addition & 1 deletion app/slave/cluster_slave.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ def kill(self):
"""
Exits without error.
"""
sys.exit(0)
sys.exit(0) # Should trigger graceful shutdown which will send DISCONNECTED status to master.


class SlaveState(str, Enum):
Expand Down

0 comments on commit 079e82c

Please sign in to comment.