Skip to content

Commit

Permalink
grid.promise: kill timed out promise process if terminate is not enough
Browse files Browse the repository at this point in the history
  • Loading branch information
takdj committed Apr 19, 2018
1 parent 6380686 commit e794067
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 7 deletions.
27 changes: 22 additions & 5 deletions slapos/grid/promise/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
from multiprocessing import Process, Queue as MQueue
import Queue
from slapos.util import mkdir_p, chownDirectory
from slapos.grid.utils import dropPrivileges
from slapos.grid.utils import dropPrivileges, killProcessTree
from slapos.grid.promise import interface
from slapos.grid.promise.generic import (GenericPromise, PromiseQueueResult,
AnomalyResult, TestResult,
Expand Down Expand Up @@ -314,15 +314,15 @@ def __init__(self, config=None, logger=None, dry_run=False):
if not os.path.exists(self.promise_output_dir):
mkdir_p(self.promise_output_dir)

def _getErrorPromiseResult(self, promise_process, promise_name, message,
execution_time=0):
def _getErrorPromiseResult(self, promise_process, promise_name, promise_path,
message, execution_time=0):
if self.check_anomaly:
result = AnomalyResult(problem=True, message=message)
else:
result = TestResult(problem=True, message=message)
return PromiseQueueResult(
item=result,
path=os.path.join(self.promise_folder, promise_name),
path=promise_path,
name=promise_name,
title=promise_process.getPromiseTitle(),
execution_time=execution_time
Expand Down Expand Up @@ -362,6 +362,14 @@ def _loadPromiseResult(self, promise_title):
))
return result

def _emptyQueue(self):
    """Discard every entry currently held by self.queue_result.

    Entries are fetched without blocking and thrown away; the method
    returns as soon as the queue reports that it is empty.
    """
    result_queue = self.queue_result
    try:
        while True:
            result_queue.get_nowait()
    except Queue.Empty:
        # Nothing left to discard.
        pass

def _launchPromise(self, promise_name, promise_path, argument_dict,
wrap_process=False):
"""
Expand Down Expand Up @@ -394,6 +402,9 @@ def _launchPromise(self, promise_name, promise_path, argument_dict,
self.logger.error(result.item.message)
return True
return False
# Promises are run one at a time, so the queue can safely be emptied here:
# it may still hold a result written by a previously killed process.
self._emptyQueue()
promise_process.start()
except Exception:
# only print traceback to not prevent run other promises
Expand Down Expand Up @@ -444,11 +455,16 @@ def _launchPromise(self, promise_name, promise_path, argument_dict,
execution_time = (current_increment + 1) * sleep_time
else:
promise_process.terminate()
promise_process.join() # wait for process to terminate
promise_process.join(1) # wait for process to terminate
# if the process is still alive after 1 second, we kill it
if promise_process.is_alive():
self.logger.info("Killing process %s..." % promise_name)
killProcessTree(promise_process.pid, self.logger)
message = 'Promise timed out after %s seconds' % self.promise_timeout
queue_item = self._getErrorPromiseResult(
promise_process,
promise_name=promise_name,
promise_path=promise_path,
message=message,
execution_time=execution_time
)
Expand All @@ -457,6 +473,7 @@ def _launchPromise(self, promise_name, promise_path, argument_dict,
queue_item = self._getErrorPromiseResult(
promise_process,
promise_name=promise_name,
promise_path=promise_path,
message="No output returned by the promise",
execution_time=execution_time
)
Expand Down
32 changes: 32 additions & 0 deletions slapos/grid/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import subprocess
import sys
import logging
import psutil
import time

from slapos.grid.exception import BuildoutFailedError, WrongPermissionError

Expand Down Expand Up @@ -361,3 +363,33 @@ def createPrivateDirectory(path):
raise WrongPermissionError('Wrong permissions in %s: '
'is 0%o, should be 0700'
% (path, permission))

def killProcessTree(pid, logger):
    """
    Kill the process identified by `pid` and all of its descendants.

    Processes are suspended first, to prevent them from reacting to
    signals, and are only killed once no new descendant is found.

    pid -- identifier of the root process of the tree to kill.
    logger -- object whose `debug` method receives the psutil errors
              raised while suspending or killing individual processes.

    Returns None. Nothing is done when the root process cannot be
    looked up or suspended.
    """
    try:
        root_process = psutil.Process(pid)
        root_process.suspend()
    except psutil.Error:
        return

    process_list = [root_process]
    running_process_list = root_process.children(recursive=True)
    while running_process_list:
        process_list += running_process_list
        for child in running_process_list:
            try:
                child.suspend()
            except psutil.Error as e:
                logger.debug(str(e))

        # Wait briefly, then look for descendants that are not in
        # process_list yet; the loop ends once none is found.
        time.sleep(0.2)
        running_process_list = set(
            root_process.children(recursive=True)).difference(process_list)

    # Everything collected above gets killed, root process first.
    for process in process_list:
        try:
            process.kill()
        except psutil.Error as e:
            logger.debug("Process kill: %s" % e)
33 changes: 31 additions & 2 deletions slapos/tests/promise.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def tearDown(self):
if sys.path[0] == self.plugin_dir:
del sys.path[0]

def configureLauncher(self, save_method=None, timeout=0.5, master_url="", debug=False,
def configureLauncher(self, save_method=None, timeout=1, master_url="", debug=False,
run_list=[], uid=None, gid=None, enable_anomaly=False, force=False,
logdir=True, dry_run=False):
parameter_dict = {
Expand Down Expand Up @@ -927,13 +927,42 @@ def test_method(result):

self.configureLauncher(save_method=test_method, enable_anomaly=True, timeout=1)
self.generatePromiseScript(promise_name, success=True, content="""import time
time.sleep(5)""")
time.sleep(20)""")

# run promise will timeout
with self.assertRaises(PromiseError):
self.launcher.run()
self.assertTrue(self.called)

def test_runpromise_wrapped_will_timeout(self):
    """A bash promise sleeping longer than the timeout is reported as failed.

    The script written to the legacy promise directory sleeps 20 seconds
    while the launcher is configured with a 1 second timeout, so running
    the launcher is expected to raise PromiseError and to hand a
    "timed out" TestResult to the save method.
    """
    promise_name = "my_bash_promise"
    promise_path = os.path.join(self.legacy_promise_dir, promise_name)
    self.called = False
    with open(promise_path, 'w') as f:
        f.write("""#!/bin/bash
sleep 20
echo "success"
""")
    # 0o744 is accepted by Python 2.6+ and Python 3, unlike the bare 0744.
    os.chmod(promise_path, 0o744)

    def test_method(result):
        # Save method called by the launcher with the promise result.
        self.called = True
        self.assertIsInstance(result, PromiseQueueResult)
        self.assertIsInstance(result.item, TestResult)
        self.assertGreaterEqual(result.execution_time, 1)
        self.assertEqual(result.title, promise_name)
        self.assertEqual(result.name, promise_name)
        self.assertEqual(result.path, promise_path)
        self.assertIn("Promise timed out after", result.item.message)
        self.assertEqual(result.item.hasFailed(), True)
        self.assertIsInstance(result.item.date, datetime)

    self.configureLauncher(save_method=test_method, timeout=1)
    # run promise will timeout
    with self.assertRaises(PromiseError):
        self.launcher.run()
    self.assertTrue(self.called)

class TestSlapOSGenericPromise(TestSlapOSPromiseMixin):

def initialisePromise(self, promise_content="", success=True, timeout=60):
Expand Down

0 comments on commit e794067

Please sign in to comment.