Skip to content

Commit

Permalink
Update JobController to change interface for kill handling (just set …
Browse files Browse the repository at this point in the history
…wait_time)

I think the standard way to do things *ought* to be to send a SIGTERM,
wait politely for termination, then send a SIGKILL if the process does
not wrap up before some timeout.  We can do this just by setting a timeout
argument (and use 0 to indicate "SIGKILL at once" and None to indicate
"never SIGKILL").
  • Loading branch information
dbindel committed Sep 7, 2018
1 parent a005805 commit 569bec5
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 103 deletions.
2 changes: 1 addition & 1 deletion libensemble/balsam_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from libensemble.register import Register
from libensemble.resources import Resources
from libensemble.controller import \
Job, JobController, JobControllerException, jassert, STATES, SIGNALS
Job, JobController, JobControllerException, jassert, STATES

logger = logging.getLogger(__name__ + '(' + Resources.get_my_name() + ')')
#For debug messages in this module - uncomment (see libE.py to change root logging level)
Expand Down
64 changes: 5 additions & 59 deletions libensemble/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,6 @@
USER_KILLED
FAILED""".split()

SIGNALS = """
SIGTERM
SIGKILL""".split()


#I may want to use a top-level abstract/base class for maximum re-use
# - else inherited controller will be reimplementing common code

class JobControllerException(Exception): pass

Expand Down Expand Up @@ -240,8 +233,6 @@ def __init__(self, registry=None, auto_resources=True,
'-npernode {ranks_per_node}'],
}
self.mpi_command = mpi_commands[Resources.get_MPI_variant()]
self.kill_signal = 'SIGTERM'
self.wait_and_kill = True
self.wait_time = 60
self.list_of_jobs = []
self.workerID = None
Expand Down Expand Up @@ -452,12 +443,10 @@ def kill(self, job):
job: obj: Job
The job object.to be polled.
The signal used is determined by the job_controller attribute
<kill_signal> will be send to the job, followed by a wait for
the process to terminate. If the <wait_and_kill> attribute is
True, then a SIGKILL will be sent if the job has not finished
after <wait_time> seconds. The kill can be configured using the
set_kill_mode function.
Sends SIGTERM, waits for a period of <wait_time> for graceful
termination, then sends a hard kill with SIGKILL. If <wait_time>
is 0, we go immediately to SIGKILL; if <wait_time> is None, we
never do a SIGKILL.
"""

jassert(isinstance(job, Job), "Invalid job has been provided")
Expand All @@ -474,55 +463,12 @@ def kill(self, job):
"check jobs been launched".format(job.name))

logger.debug("Killing job {}".format(job.name))

jassert(self.kill_signal in ['SIGTERM', 'SIGKILL'],
"Unknown kill signal")

timeout = 0 # Default is to just kill and wait
if self.kill_signal == 'SIGTERM': # For a graceful kill
timeout = None # Terminate and just wait
if self.wait_and_kill: # Or if we want to wait and kill...
timeout = self.wait_time # Set a timeout

launcher.cancel(job.process, timeout)

launcher.cancel(job.process, self.wait_time)
job.state = 'USER_KILLED'
job.finished = True
job.calc_job_timing()


def set_kill_mode(self, signal=None, wait_and_kill=None, wait_time=None):
"""Configures the kill mode for the job_controller
Parameters
----------
signal: String, optional
The signal type to be sent to kill job: 'SIGTERM' or 'SIGKILL'
wait_and_kill: boolean, optional
If True, a SIGKILL will be sent after <wait_time> seconds if
the process has not terminated.
wait_time: int, optional
The number of seconds to wait for the job to finish before
sending a SIGKILL when wait_and_kill is set. (Default is 60).
"""
if signal is not None:
jassert(signal in SIGNALS,
"Unknown signal {} supplied to set_kill_mode".
format(signal))
self.kill_signal = signal

if wait_and_kill is not None:
self.wait_and_kill = wait_and_kill

if wait_time is not None:
self.wait_time = wait_time
if not wait_and_kill:
logger.warning('wait_time does nothing if wait_and_kill=False')


def get_job(self, jobid):
""" Returns the job object for the supplied job ID """
if self.list_of_jobs:
Expand Down
45 changes: 2 additions & 43 deletions libensemble/tests/unit_tests/test_jobcontroller.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ def test_doublekill():
args_for_sim = 'sleep 2.0'
job = jobctl.launch(calc_type='sim', num_procs=cores, app_args=args_for_sim)
jobctl.poll(job)
jobctl.set_kill_mode(wait_and_kill=True, wait_time=5)
jobctl.wait_time = 5

jobctl.kill(job)
assert job.finished, "job.finished should be True. Returned " + str(job.finished)
Expand Down Expand Up @@ -377,7 +377,7 @@ def test_launch_and_kill():
cores = NCORES
args_for_sim = 'sleep 2.0'
job_list = []
jobctl.set_kill_mode(wait_and_kill=True, timeout=1)
jobctl.wait_time = 1
for jobid in range(5):
job = jobctl.launch(calc_type='sim', num_procs=cores, app_args=args_for_sim)
jobctl.kill(job)
Expand Down Expand Up @@ -512,46 +512,6 @@ def test_poll_job_with_no_launch():
assert 0


def test_set_kill_mode():
print("\nTest: {}\n".format(sys._getframe().f_code.co_name))
setup_job_controller()
jobctl = JobController.controller
cores = NCORES

signal_b4 = jobctl.kill_signal
wait_and_kill_b4 = jobctl.wait_and_kill
wait_time_b4 = jobctl.wait_time

# Change nothing.
jobctl.set_kill_mode()
assert jobctl.kill_signal == signal_b4
assert jobctl.wait_and_kill == wait_and_kill_b4
assert jobctl.wait_time == wait_time_b4

# While these options are set - wait_time will not be used. Result is warning.
jobctl.set_kill_mode(signal='SIGKILL', wait_and_kill=False, wait_time=10)
assert jobctl.kill_signal == 'SIGKILL'
assert not jobctl.wait_and_kill
assert jobctl.wait_time == 10

# Now correct
jobctl.set_kill_mode(signal='SIGTERM', wait_and_kill=True, wait_time=20)
assert jobctl.kill_signal == 'SIGTERM'
assert jobctl.wait_and_kill
assert jobctl.wait_time == 20

#Todo:
#Testing wait_and_kill is harder - need to create a process that does not respond to sigterm in time.

# Try set to unknown signal
try:
jobctl.set_kill_mode(signal='SIGDIE')
except:
assert 1
else:
assert 0


def test_job_failure():
print("\nTest: {}\n".format(sys._getframe().f_code.co_name))
setup_job_controller()
Expand Down Expand Up @@ -582,7 +542,6 @@ def test_job_failure():
test_launch_no_app()
test_kill_job_with_no_launch()
test_poll_job_with_no_launch()
test_set_kill_mode()
test_job_failure()
#teardown_module(__file__)

0 comments on commit 569bec5

Please sign in to comment.