Immediately abort node creation on first error.
Previously, node startup would continue past the first error and only fail later.

Also, ensure that thread pool objects are always closed when we stop using
them (which is important when ElastiCluster is used as an API).
riccardomurri committed Jan 6, 2021
1 parent 892c9e3 commit 104f3c4
Showing 1 changed file with 54 additions and 50 deletions.
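
Background for the change below: wrapping a `multiprocessing.Pool` in
`contextlib.closing(...)` guarantees that `pool.close()` is called even if an
exception escapes the block, so worker processes are not leaked. A minimal,
self-contained sketch of the idiom (the `double` worker is illustrative only,
not part of this commit):

    from contextlib import closing
    from multiprocessing import Pool

    def double(x):
        # stand-in for real per-node work such as `Cluster._start_node`
        return 2 * x

    if __name__ == '__main__':
        # On leaving the block, `closing()` calls `pool.close()`, so the
        # pool stops accepting new work and its processes wind down cleanly.
        with closing(Pool(4)) as pool:
            results = pool.map(double, range(10))
        print(results)  # [0, 2, 4, ..., 18]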
elasticluster/cluster.py (54 additions & 50 deletions)
@@ -26,6 +26,7 @@
 from builtins import range
 from builtins import object
 from collections import defaultdict
+from contextlib import closing
 from copy import copy
 from functools import reduce
 import itertools
@@ -522,44 +523,44 @@ def _start_nodes_parallel(self, nodes, max_thread_pool_size):
         Return set of nodes that were actually started.
         """
         # Create one thread for each node to start
-        thread_pool_size = min(len(nodes), max_thread_pool_size)
-        thread_pool = Pool(processes=thread_pool_size)
-        log.debug("Note: starting %d nodes concurrently.", thread_pool_size)
-
-        # pressing Ctrl+C flips this flag, which in turn stops the main loop
-        # down below
-        keep_running = True
-
-        def sigint_handler(signal, frame):
-            """
-            Makes sure the cluster is saved, before the sigint results in
-            exiting during node startup.
-            """
-            log.error(
-                "Interrupted: will save cluster state and exit"
-                " after all nodes have started.")
-            keep_running = False
-
-        # intercept Ctrl+C
-        with sighandler(signal.SIGINT, sigint_handler):
-            result = thread_pool.map_async(self._start_node, nodes)
-            while not result.ready():
-                result.wait(1)
-                # check if Ctrl+C was pressed
-                if not keep_running:
-                    log.error("Aborting upon user interruption ...")
-                    # FIXME: `.close()` will keep the pool running until all
-                    # nodes have been started; should we use `.terminate()`
-                    # instead to interrupt node creation as soon as possible?
-                    thread_pool.close()
-                    thread_pool.join()
-                    self.repository.save_or_update(self)
-                    # FIXME: should raise an exception instead!
-                    sys.exit(1)
-
-        # keep only nodes that were successfully started
-        return set(node for node, ok
-                   in zip(nodes, result.get()) if ok)
+        thread_pool_size = self._get_thread_pool_size(max_thread_pool_size)
+        # FIXME: starting Py3.3, `Pool()` objects support the context manager
+        # protocol, so we can remove the `closing(...)` wrapper
+        with closing(Pool(thread_pool_size)) as thread_pool:
+            log.debug("Note: starting %d nodes concurrently.", thread_pool_size)
+
+            # pressing Ctrl+C flips this flag, which in turn stops the main loop
+            # down below
+            keep_running = True
+
+            def sigint_handler(signal, frame):
+                """
+                Makes sure the cluster is saved, before the sigint results in
+                exiting during node startup.
+                """
+                log.error(
+                    "Interrupted: will save cluster state and exit"
+                    " after all nodes have started.")
+                keep_running = False
+
+            # intercept Ctrl+C
+            with sighandler(signal.SIGINT, sigint_handler):
+                result = thread_pool.map_async(self._start_node, nodes)
+                while not result.ready():
+                    result.wait(1)
+                    # check if Ctrl+C was pressed
+                    if not keep_running:
+                        log.error("Aborting upon user interruption ...")
+                        # interrupt node creation as soon as possible
+                        thread_pool.terminate()
+                        thread_pool.join()
+                        self.repository.save_or_update(self)
+                        # FIXME: should raise an exception instead!
+                        sys.exit(1)
+
+            # keep only nodes that were successfully started
+            return set(node for node, ok
+                       in zip(nodes, result.get()) if ok)
 
     @staticmethod
     def _start_node(node):
@@ -835,7 +836,7 @@ def _stop_all_nodes(self, wait=False):
                 node.name, node.instance_id, err, err.__class__)
         return failed
 
-    def _make_thread_pool(self, max_thread_pool_size):
+    def _get_thread_pool_size(self, max_thread_pool_size=0):
         try:
             if max_thread_pool_size == 0:
                 max_thread_pool_size = 4 * get_num_processors()
@@ -844,8 +845,7 @@ def _make_thread_pool(self, max_thread_pool_size):
                 "Cannot determine number of processors!"
                 " will start nodes sequentially...")
             max_thread_pool_size = 1
-        thread_pool_size = min(len(self.get_all_nodes()), max_thread_pool_size)
-        return Pool(processes=thread_pool_size)
+        return min(len(self.get_all_nodes()), max_thread_pool_size)
 
     def _pause_all_nodes(self, max_thread_pool_size=0):
         """Pause all cluster nodes - ensure that we store data so that in
@@ -872,20 +872,21 @@ def _pause_specific_node(node):
                 return None
 
         nodes = self.get_all_nodes()
-        thread_pool = self._make_thread_pool(max_thread_pool_size)
-        for node, state in zip(nodes, thread_pool.map(_pause_specific_node, nodes)):
-            if state is None:
-                failed += 1
-            else:
-                self.paused_nodes[node.name] = state
+        # FIXME: starting Py3.3, `Pool()` objects support the context manager
+        # protocol, so we can remove the `closing(...)` wrapper
+        with closing(Pool(self._get_thread_pool_size(max_thread_pool_size))) as thread_pool:
+            for node, state in zip(nodes, thread_pool.map(_pause_specific_node, nodes)):
+                if state is None:
+                    failed += 1
+                else:
+                    self.paused_nodes[node.name] = state
 
         return failed
 
     def _resume_all_nodes(self, max_thread_pool_size=0):
         if not self.paused_nodes:
             log.warning("Didn't find any paused nodes - not resuming anything.")
             return
-        thread_pool = self._make_thread_pool(max_thread_pool_size)
 
         def _resume_single_node(node_name):
             node_state = self.paused_nodes[node_name]
@@ -898,8 +899,11 @@ def _resume_single_node(node_name):
                 log.error("Could not resume node `%s` - %s.", node_name, err)
                 return None
 
-        for node_name in thread_pool.map(_resume_single_node, self.paused_nodes):
-            del self.paused_nodes[node_name]
+        # FIXME: starting Py3.3, `Pool()` objects support the context manager
+        # protocol, so we can remove the `closing(...)` wrapper
+        with closing(Pool(self._get_thread_pool_size(max_thread_pool_size))) as thread_pool:
+            for node_name in thread_pool.map(_resume_single_node, self.paused_nodes):
+                del self.paused_nodes[node_name]
         return len(self.paused_nodes)
 
     def get_ssh_to_node(self, ssh_to=None):
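
As the FIXME comments above note, `multiprocessing.Pool` supports the context
manager protocol from Python 3.3 onwards, so the `closing(...)` wrapper could
eventually be dropped. One caveat: `Pool.__exit__()` calls `terminate()`,
whereas `closing(...)` calls `close()`, so the two forms are not exact drop-in
replacements when pending tasks should be allowed to finish. A sketch of the
post-3.3 form (the `work` function is illustrative only):

    from multiprocessing import Pool

    def work(x):
        return x * x

    if __name__ == '__main__':
        # Py3.3+: Pool is usable directly in a `with` block, but its
        # __exit__() calls terminate(), not close(); here that is safe
        # because map() blocks until all results are in.
        with Pool(4) as pool:
            print(pool.map(work, range(10)))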