Allocating, de-allocating, and then re-allocating slaves should work.
This PR should fix #313

Currently, when slaves are added, removed, and added again to a build,
the build fails to complete successfully.
TJ Lee committed Jun 23, 2016
1 parent e46fc84 commit 8623b08
Showing 9 changed files with 118 additions and 45 deletions.
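For orientation before the per-file diffs, here is a minimal, self-contained sketch of the responsibility shift this commit makes (toy class and names, not ClusterRunner's real ones): the queue of builds waiting for slaves moves out of BuildRequestHandler and into BuildSchedulerPool, so a scheduler that loses its last slave can put its own build back in line.

from queue import Queue

class ToySchedulerPool:
    """Toy stand-in for BuildSchedulerPool as of this commit (a sketch, not the real class)."""

    def __init__(self):
        self._builds_waiting_for_slaves = Queue()

    def add_build_waiting_for_slaves(self, build):
        # Called by the request handler once a build is prepared, and (the fix)
        # by a scheduler whose last slave was torn down mid-build.
        self._builds_waiting_for_slaves.put(build)

    def next_prepared_build_scheduler(self):
        # Blocks until some build needs slaves; the real method returns a
        # BuildScheduler, this toy just returns the queued build itself.
        return self._builds_waiting_for_slaves.get()

pool = ToySchedulerPool()
pool.add_build_waiting_for_slaves('build-1')
assert pool.next_prepared_build_scheduler() == 'build-1'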
20 changes: 3 additions & 17 deletions app/master/build_request_handler.py
@@ -23,15 +23,14 @@ class BuildRequestHandler(object):
     entity) to pull Builds from.

     All of the input of builds come through self.handle_build_request() calls, and all of the output
-    of builds go through self.next_prepared_build_scheduler() calls.
+    of builds go through self._scheduler_pool.next_prepared_build_scheduler() calls.
     """
     def __init__(self, scheduler_pool):
         """
-        :type scheduler_pool: BuildSchedulerPool
+        :type scheduler_pool: app.master.build_scheduler_pool.BuildSchedulerPool
         """
         self._logger = get_logger(__name__)
         self._scheduler_pool = scheduler_pool
-        self._builds_waiting_for_slaves = Queue()
         self._request_queue = Queue()
         self._request_queue_worker_thread = SafeThread(
             target=self._build_preparation_loop, name='RequestHandlerLoop', daemon=True)
@@ -55,19 +54,6 @@ def handle_build_request(self, build):
         analytics.record_event(analytics.BUILD_REQUEST_QUEUED, build_id=build.build_id(),
                                log_msg='Queued request for build {build_id}.')

-    def next_prepared_build_scheduler(self):
-        """
-        Get the scheduler for the next build that has successfully completed build preparation.
-
-        This is a blocking call--if there are no more builds that have completed build preparation and this
-        method gets invoked, the execution will hang until the next build has completed build preparation.
-
-        :rtype: BuildScheduler
-        """
-        build = self._builds_waiting_for_slaves.get()
-        build_scheduler = self._scheduler_pool.get(build)
-        return build_scheduler
-
     def _build_preparation_loop(self):
         """
         Grabs a build off the request_queue (populated by self.handle_build_request()), prepares it,
@@ -111,7 +97,7 @@ def _prepare_build_async(self, build, project_lock):
             # If there is work to be done, this build must queue to be allocated slaves.
             else:
                 self._logger.info('Build {} is waiting for slaves.', build.build_id())
-                self._builds_waiting_for_slaves.put(build)
+                self._scheduler_pool.add_build_waiting_for_slaves(build)

         except Exception as ex:  # pylint: disable=broad-except
             build.mark_failed(str(ex))  # WIP(joey): Build should do this internally.
24 changes: 16 additions & 8 deletions app/master/build_scheduler.py
@@ -20,14 +20,16 @@ class BuildScheduler(object):
     itself shouldn't know anything about its scheduler or slaves or where subjobs
     are executing. All that goes here.

-    Ususally this class is instantiated and managed by a BuildSchedulerPool.
+    This class is instantiated and managed by a BuildSchedulerPool.
     """
-    def __init__(self, build):
+    def __init__(self, build, scheduler_pool):
         """
-        :type build: Build
+        :type build: app.master.build.Build
+        :type scheduler_pool: app.master.build_scheduler_pool.BuildSchedulerPool
         """
         self._logger = get_logger(__name__)
         self._build = build
+        self._scheduler_pool = scheduler_pool

         job_config = build.project_type.job_config()
         self._max_executors = job_config.max_executors
@@ -63,10 +65,6 @@ def allocate_slave(self, slave):
         :type slave: Slave
         """
-        if not self._slaves_allocated:
-            # If this is the first slave to be allocated, update the build state.
-            self._build.mark_started()
-
         self._slaves_allocated.append(slave)
         slave.setup(self._build, executor_start_index=self._num_executors_allocated)
         self._num_executors_allocated += min(slave.num_executors, self._max_executors_per_slave)
@@ -101,18 +99,23 @@ def execute_next_subjob_or_free_executor(self, slave):
             # this method, finds the subjob queue empty, and is torn down. If that was the last 'living' slave, the
             # build would be stuck.
             with self._subjob_assignment_lock:
+                is_first_subjob = False
+                if self._build._unstarted_subjobs.qsize() == len(self._build.all_subjobs()):
+                    is_first_subjob = True
                 subjob = self._build._unstarted_subjobs.get(block=False)
                 self._logger.debug('Sending subjob {} (build {}) to slave {}.',
                                    subjob.subjob_id(), subjob.build_id(), slave.url)
                 try:
                     slave.start_subjob(subjob)
                     subjob.mark_in_progress(slave)

                 except SlaveMarkedForShutdownError:
                     self._build._unstarted_subjobs.put(subjob)  # todo: This changes subjob execution order. (Issue #226)
                     # An executor is currently allocated for this subjob in begin_subjob_executions_on_slave.
                     # Since the slave has been marked for shutdown, we need to free the executor.
                     self._free_slave_executor(slave)
+                else:
+                    if is_first_subjob:
+                        self._build.mark_started()

         except Empty:
             self._free_slave_executor(slave)
@@ -126,3 +129,8 @@ def _free_slave_executor(self, slave):
             pass  # We have already deallocated this slave, no need to teardown
         else:
             slave.teardown()
+            # If all slaves are removed from a build that isn't done but has already started, we must make
+            # sure that, when slave resources become available again, this build gets slaves allocated to it.
+            # https://github.com/box/ClusterRunner/issues/313
+            if len(self._slaves_allocated) == 0 and self.needs_more_slaves():
+                self._scheduler_pool.add_build_waiting_for_slaves(self._build)
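Two behavior changes land in this file: mark_started() now fires when the first subjob is handed out rather than when the first slave is allocated, and a scheduler that tears down its last slave while work remains re-queues its build on the pool. A hedged, self-contained illustration of that second path (toy types, not the real Build/Slave/pool classes):

from queue import Queue

class ToyScheduler:
    """Simplified sketch of the re-queue-on-last-teardown fix (issue #313)."""

    def __init__(self, build, pool, remaining_subjobs):
        self._build = build
        self._pool = pool                       # stands in for BuildSchedulerPool
        self._slaves_allocated = []
        self._remaining_subjobs = remaining_subjobs

    def needs_more_slaves(self):
        return self._remaining_subjobs > 0

    def free_slave_executor(self, slave):
        self._slaves_allocated.remove(slave)
        # The fix: an unfinished build left with zero slaves goes back on the
        # pool's waiting queue so the allocator will pick it up again.
        if len(self._slaves_allocated) == 0 and self.needs_more_slaves():
            self._pool.put(self._build)

pool = Queue()
scheduler = ToyScheduler('build-42', pool, remaining_subjobs=3)
scheduler._slaves_allocated.append('slave-1')
scheduler.free_slave_executor('slave-1')
assert pool.get_nowait() == 'build-42'  # the build is waiting for slaves again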
22 changes: 21 additions & 1 deletion app/master/build_scheduler_pool.py
@@ -1,3 +1,4 @@
+from queue import Queue
 from threading import Lock

 from app.master.build_scheduler import BuildScheduler
@@ -12,6 +13,7 @@ class exists to make it easier to create and manage scheduler instances.
     def __init__(self):
         self._schedulers_by_build_id = {}
         self._scheduler_creation_lock = Lock()
+        self._builds_waiting_for_slaves = Queue()

     def get(self, build):
         """
@@ -22,7 +24,25 @@ def get(self, build):
             scheduler = self._schedulers_by_build_id.get(build.build_id())
             if scheduler is None:
                 # WIP(joey): clean up old schedulers (search through list and remove any with finished builds)
-                scheduler = BuildScheduler(build)
+                scheduler = BuildScheduler(build, self)
                 self._schedulers_by_build_id[build.build_id()] = scheduler

         return scheduler
+
+    def next_prepared_build_scheduler(self):
+        """
+        Get the scheduler for the next build that has successfully completed build preparation.
+
+        This is a blocking call--if there are no more builds that have completed build preparation and this
+        method gets invoked, the execution will hang until the next build has completed build preparation.
+
+        :rtype: BuildScheduler
+        """
+        build = self._builds_waiting_for_slaves.get()
+        return self.get(build)
+
+    def add_build_waiting_for_slaves(self, build):
+        """
+        :type build: app.master.build.Build
+        """
+        self._builds_waiting_for_slaves.put(build)
2 changes: 1 addition & 1 deletion app/master/cluster_master.py
@@ -30,7 +30,7 @@ def __init__(self):
         self._scheduler_pool = BuildSchedulerPool()
         self._build_request_handler = BuildRequestHandler(self._scheduler_pool)
         self._build_request_handler.start()
-        self._slave_allocator = SlaveAllocator(self._build_request_handler)
+        self._slave_allocator = SlaveAllocator(self._scheduler_pool)
         self._slave_allocator.start()

         # Asynchronously delete (but immediately rename) all old builds when master starts.
8 changes: 4 additions & 4 deletions app/master/slave_allocator.py
@@ -10,12 +10,12 @@ class SlaveAllocator(object):
     The SlaveAllocator class is responsible for allocating slaves to prepared builds.
     """

-    def __init__(self, build_request_handler):
+    def __init__(self, scheduler_pool):
         """
-        :type build_request_handler: BuildRequestHandler
+        :type scheduler_pool: app.master.build_scheduler_pool.BuildSchedulerPool
         """
         self._logger = get_logger(__name__)
-        self._build_request_handler = build_request_handler
+        self._scheduler_pool = scheduler_pool
         self._idle_slaves = OrderedSetQueue()
         self._allocation_thread = SafeThread(
             target=self._slave_allocation_loop, name='SlaveAllocationLoop', daemon=True)
@@ -36,7 +36,7 @@ def _slave_allocation_loop(self):
         """
         while True:
             # This is a blocking call that will block until there is a prepared build.
-            build_scheduler = self._build_request_handler.next_prepared_build_scheduler()
+            build_scheduler = self._scheduler_pool.next_prepared_build_scheduler()

             while build_scheduler.needs_more_slaves():
                 claimed_slave = self._idle_slaves.get()
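The allocator's loop itself is unchanged apart from its collaborator; a compact, self-contained sketch of the pattern it follows (toy data structures standing in for the real SlaveAllocator and schedulers):

from queue import Queue

# Toy version of the allocation loop: pull the next scheduler that needs
# slaves, then feed it idle slaves until it is satisfied. Both get() calls
# block, which is what keeps the loop quiet when there is nothing to do.
idle_slaves = Queue()
prepared_schedulers = Queue()

def allocation_step():
    scheduler = prepared_schedulers.get()       # blocks until a build needs slaves
    while scheduler['slaves_needed'] > 0:
        slave = idle_slaves.get()               # blocks until a slave is idle
        scheduler['slaves_needed'] -= 1         # stands in for allocate_slave(slave)

idle_slaves.put('slave-1')
prepared_schedulers.put({'slaves_needed': 1})
allocation_step()                               # returns once slave-1 is claimed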
31 changes: 31 additions & 0 deletions test/functional/job_configs.py
@@ -151,3 +151,34 @@ def __init__(self, config, expected_to_fail, expected_num_subjobs, expected_num_
         File('subjob_file_3.txt', contents='setup.\nsubjob 3.\nteardown.\n'),
     ],
 )
+
+# This is a very basic job where each atom sleeps for one second before creating a simple text file.
+JOB_WITH_SLEEPS = FunctionalTestJobConfig(
+    config={
+        'posix': """
+            BasicSleepingJob:
+                commands:
+                    - sleep 1
+                atomizers:
+                    - TOKEN: seq 0 4 | xargs -I {} echo "This is atom {}"
+            """,
+        'nt': """
+            BasicSleepingJob:
+                commands:
+                    - timeout 1 > NUL
+                atomizers:
+                    - TOKEN: FOR /l %n in (0,1,4) DO @echo This is atom %n
+            """,
+    },
+    expected_to_fail=False,
+    expected_num_subjobs=5,
+    expected_num_atoms=5,
+    expected_artifact_contents=[
+        Directory('artifact_0_0', DEFAULT_ATOM_FILES + [File('result.txt', contents='This is atom 0\n')]),
+        Directory('artifact_1_0', DEFAULT_ATOM_FILES + [File('result.txt', contents='This is atom 1\n')]),
+        Directory('artifact_2_0', DEFAULT_ATOM_FILES + [File('result.txt', contents='This is atom 2\n')]),
+        Directory('artifact_3_0', DEFAULT_ATOM_FILES + [File('result.txt', contents='This is atom 3\n')]),
+        Directory('artifact_4_0', DEFAULT_ATOM_FILES + [File('result.txt', contents='This is atom 4\n')]),
+    ],
+)
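For reference, the posix atomizer above shells out to seq and xargs; a quick hedged Python equivalent of what it expands to, matching the expected_num_subjobs and expected_num_atoms values:

# `seq 0 4 | xargs -I {} echo "This is atom {}"` emits one TOKEN line per atom:
atoms = ['This is atom {}'.format(n) for n in range(5)]
assert len(atoms) == 5  # one atom per line here, hence 5 atoms and 5 subjobs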
@@ -0,0 +1,30 @@
+import os
+import tempfile
+import yaml
+
+from test.framework.functional.base_functional_test_case import BaseFunctionalTestCase
+from test.functional.job_configs import JOB_WITH_SLEEPS
+
+
+class TestDeallocationAndAllocationOfSlavesMidBuild(BaseFunctionalTestCase):
+    def test_build_completes_after_allocating_deallocating_and_reallocating_slaves_to_build(self):
+        master = self.cluster.start_master()
+        # Only one slave, with one executor. This means that the slave should be able to
+        # theoretically finish the build in 5 seconds, as this job definition has 5 atoms,
+        # with each sleeping for 1 second.
+        self.cluster.start_slaves(1, num_executors_per_slave=1, start_port=43001)
+        project_dir = tempfile.TemporaryDirectory()
+        build_resp = master.post_new_build({
+            'type': 'directory',
+            'config': yaml.safe_load(JOB_WITH_SLEEPS.config[os.name])['BasicSleepingJob'],
+            'project_directory': project_dir.name,
+        })
+        build_id = build_resp['build_id']
+        master.block_until_build_started(build_id, timeout=10)
+        master.graceful_shutdown_slaves_by_id([1])
+        self.cluster.block_until_n_slaves_dead(num_slaves=1, timeout=10)
+        self.cluster.kill_slaves(kill_gracefully=False)
+        self.assert_build_status_contains_expected_data(build_id, {'status': 'BUILDING'})
+        self.cluster.start_slaves(1, num_executors_per_slave=1, start_port=43001)
+        master.block_until_build_finished(build_id, timeout=10)
+        self.assert_build_has_successful_status(build_id)
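The block_until_* helpers used above belong to the test framework; as a rough sketch (an assumption, not the framework's actual implementation), they can be pictured as a poll loop like this:

import time

def block_until(predicate, timeout, poll_interval=0.5):
    """Poll `predicate` until it returns True or `timeout` seconds elapse.
    A hypothetical stand-in for helpers like block_until_build_finished."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        if predicate():
            return
        time.sleep(poll_interval)
    raise AssertionError('Timed out waiting for condition.')

# Usage sketch: block_until(lambda: get_build_status() == 'FINISHED', timeout=10)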
17 changes: 8 additions & 9 deletions test/unit/master/test_build.py
@@ -110,12 +110,12 @@ def test_build_status_returns_prepared_after_build_preparation(self):
         self.assertEqual(build._status(), BuildStatus.PREPARED,
                          'Build status should be PREPARED after build has been prepared.')

-    def test_build_status_returns_building_after_setup_has_started(self):
+    def test_build_status_returns_building_after_first_subjob_has_been_executed(self):
         mock_slave = self._create_mock_slave()
         build = self._create_test_build(BuildStatus.PREPARED)
         scheduler = self.scheduler_pool.get(build)

-        scheduler.allocate_slave(mock_slave)
+        scheduler.execute_next_subjob_or_free_executor(mock_slave)

         self.assertEqual(build._status(), BuildStatus.BUILDING,
                          'Build status should be BUILDING after setup has started on slaves.')
@@ -359,23 +359,22 @@ def test_preparing_build_creates_empty_results_directory(self):

         self.mock_util.fs.create_dir.assert_called_once_with(build._build_results_dir())

-    def test_allocating_slave_to_build_sets_building_timestamp_only_on_first_slave_allocation(self):
+    def test_execute_next_subjob_or_free_executor_sets_building_timestamp_only_on_first_execution(self):
         mock_slave1 = self._create_mock_slave()
         mock_slave2 = self._create_mock_slave()
         build = self._create_test_build(BuildStatus.PREPARED)
         scheduler = self.scheduler_pool.get(build)
+        scheduler.allocate_slave(slave=mock_slave1)

         self.assertIsNone(self._get_build_state_timestamp(build, BuildState.BUILDING),
                           '"building" timestamp should not be set until slave allocated.')

-        scheduler.allocate_slave(slave=mock_slave1)
+        scheduler.execute_next_subjob_or_free_executor(mock_slave1)
         building_timestamp1 = self._get_build_state_timestamp(build, BuildState.BUILDING)
-        scheduler.allocate_slave(slave=mock_slave2)
+        scheduler.execute_next_subjob_or_free_executor(mock_slave1)
         building_timestamp2 = self._get_build_state_timestamp(build, BuildState.BUILDING)

-        self.assertIsNotNone(building_timestamp1, '"building" timestamp should be set after first slave allocated.')
+        self.assertIsNotNone(building_timestamp1, '"building" timestamp should be set after first subjob is started.')
         self.assertEqual(building_timestamp1, building_timestamp2,
-                         '"building" timestamp should not change upon further slave allocation.')
+                         '"building" timestamp should not change upon further subjob execution.')

     def test_finishing_build_sets_finished_timestamp(self):
         build = self._create_test_build(BuildStatus.BUILDING)
9 changes: 4 additions & 5 deletions test/unit/master/test_slave_allocator.py
@@ -1,7 +1,7 @@
 from unittest.mock import Mock

 from app.master.build import Build
-from app.master.build_request_handler import BuildRequestHandler
+from app.master.build_scheduler_pool import BuildSchedulerPool
 from app.master.slave import Slave
 from app.master.slave_allocator import SlaveAllocator
 from test.framework.base_unit_test_case import BaseUnitTestCase
@@ -29,7 +29,7 @@ def test_slave_allocation_loop_should_allocate_a_slave(self):
                           allocate_slave=Mock(side_effect=AbortLoopForTesting))
         mock_slave = Mock(spec=Slave, url='', is_alive=Mock(return_value=True), is_shutdown=Mock(return_value=False))
         slave_allocator = self._create_slave_allocator()
-        slave_allocator._build_request_handler.next_prepared_build_scheduler = Mock(return_value=mock_build)
+        slave_allocator._scheduler_pool.next_prepared_build_scheduler = Mock(return_value=mock_build)
         slave_allocator._idle_slaves.get = Mock(return_value=mock_slave)

         self.assertRaises(AbortLoopForTesting, slave_allocator._slave_allocation_loop)
@@ -38,7 +38,7 @@ def test_slave_allocation_loop_should_return_idle_slave_to_queue_if_not_needed(self):
         mock_build = Mock(spec=Build, needs_more_slaves=Mock(side_effect=[True, False]))
         mock_slave = Mock(spec=Slave, url='', is_alive=Mock(return_value=True), is_shutdown=Mock(return_value=False))
         slave_allocator = self._create_slave_allocator()
-        slave_allocator._build_request_handler.next_prepared_build_scheduler = Mock(return_value=mock_build)
+        slave_allocator._scheduler_pool.next_prepared_build_scheduler = Mock(return_value=mock_build)
         slave_allocator._idle_slaves.get = Mock(return_value=mock_slave)
         slave_allocator.add_idle_slave = Mock(side_effect=AbortLoopForTesting)

@@ -71,8 +71,7 @@ def _create_slave_allocator(self, **kwargs):
         :param kwargs: Any constructor parameters for the slave; if none are specified, test defaults will be used.
         :rtype: SlaveAllocator
         """
-        kwargs.setdefault('build_request_handler', Mock(spec_set=BuildRequestHandler))
-        return SlaveAllocator(**kwargs)
+        return SlaveAllocator(Mock(spec_set=BuildSchedulerPool))

 class AbortLoopForTesting(Exception):
     """
