diff --git a/booltest/booltest_json.py b/booltest/booltest_json.py index 3b4d3577..b156445a 100644 --- a/booltest/booltest_json.py +++ b/booltest/booltest_json.py @@ -145,20 +145,21 @@ def eacirc_generator(self, tmpdir, generator_path, config_js): new_generator_path = os.path.join(tmpdir, 'generator') shutil.copy(generator_path, new_generator_path) + cfg_file = os.path.join(tmpdir, 'generator.json') config_str = json.dumps(config_js, indent=2) - with open(os.path.join(tmpdir, 'generator.json'), 'w') as fh: + with open(cfg_file, 'w') as fh: fh.write(config_str) - # Generate some data here - - p = subprocess.Popen(new_generator_path, shell=True, cwd=tmpdir) + tmp_out = os.path.join(tmpdir, 'stdout') + cmd = "%s -c=%s > %s" % (new_generator_path, cfg_file, tmp_out) + p = subprocess.Popen(cmd, shell=True, cwd=tmpdir) p.communicate() if p.returncode != 0: logger.error('Could not generate data, code: %s' % p.returncode) return None # Generated file: - data_files = [f for f in os.listdir(tmpdir) if os.path.isfile(os.path.join(tmpdir, f)) + data_files = [f for f in (os.listdir(tmpdir) + ['stdout']) if os.path.isfile(os.path.join(tmpdir, f)) and f.endswith('bin')] if len(data_files) != 1: logger.error('Error in generating data to process. Files found: %s' % data_files) diff --git a/booltest/booltest_main.py b/booltest/booltest_main.py index e4c122ad..dfca5385 100644 --- a/booltest/booltest_main.py +++ b/booltest/booltest_main.py @@ -1076,12 +1076,16 @@ def adjust_tvsize(self, tvsize, size, coffset=0): if tvsize < 0: raise ValueError('Negative TV size: %s' % tvsize) - coef = 8 if not self.do_halving else 4 - if (tvsize * coef) % self.blocklen != 0: - rem = (tvsize * coef) % self.blocklen + # multiplier when not byte-aligned to make it byte-aligned on tvsize. 
+ align_blocklen = self.blocklen if not self.do_halving else 2 * self.blocklen + align_mutl = common.comp_byte_align_multiplier(align_blocklen, 8 if not self.do_halving else 16) + aligned_blocklen = align_blocklen * align_mutl + + if (tvsize * 8) % aligned_blocklen != 0: logger.warning('Input data size not aligned to the block size. ' - 'Input bytes: %d, block bits: %d, rem: %d' % (tvsize, self.blocklen, rem)) - tvsize -= rem // coef + 'Input bytes: %d, block bits: %d, align_mutl: %d, align_block: %s, halving: %s' + % (tvsize, self.blocklen, align_mutl, aligned_blocklen, self.do_halving)) + tvsize = (((tvsize * 8) // aligned_blocklen) * aligned_blocklen) // 8 logger.info('Updating TV to %d' % tvsize) return int(tvsize) @@ -1225,7 +1229,6 @@ def work(self): jscres['offset'] = coffset tvsize = self.adjust_tvsize(tvsize, size, coffset) - logger.warning("FUCK") self.hwanalysis = self.setup_hwanalysis(self.deg, self.top_comb, self.top_k, self.all_deg, zscore_thresh) if self.hwanalysis.ref_db_path: logger.info('Using reference data file %s' % self.hwanalysis.ref_db_path) @@ -1324,8 +1327,8 @@ def analyze_iobj(self, iobj, coffset=0, tvsize=None, jscres=None): break if (len(data) * 8 % self.hwanalysis.blocklen) != 0: - logger.info('Not aligned block read, terminating. Data left: %s bits, block size: %s bits' - % (len(data) * 8, self.hwanalysis.blocklen)) + logger.info('Not aligned block read, terminating. Data left: %s bits, block size: %s bits, tv %s' + % (len(data) * 8, self.hwanalysis.blocklen, tvsize)) break with self.timer_data_bins: @@ -1488,7 +1491,7 @@ def argparser(self): help='Probability the given combination is going to be chosen. 
' 'Enables stochastic test, useful for large degrees.') - parser.add_argument('--default-params', dest='default_params', action='store_const', const=True, default=False, + parser.add_argument('--default-params', dest='default_params', type=int, default=1, help='Default parameter settings for testing, used in the paper') parser.add_argument('files', nargs=argparse.ZERO_OR_MORE, default=[], diff --git a/booltest/common.py b/booltest/common.py index 06be30df..a83bafc7 100644 --- a/booltest/common.py +++ b/booltest/common.py @@ -638,6 +638,21 @@ def generate_seed(iteration=0): return binascii.hexlify(seed) +def gcd(x, y): + while y: + x, y = y, x % y + return x + + +def comp_byte_align_multiplier(bit_size, bit_align=8): + """ + Computes multiplier X such that bit_size * X % 8 == 0 + X is in [1, 8], 8 is for sure, we can make it smaller + X = 8 / gcd(bit_size, 8) + """ + return bit_align / gcd(bit_size, bit_align) + + # Re-exports, compatibility diff --git a/booltest/job_server.py b/booltest/job_server.py index 7f489523..e04ab860 100644 --- a/booltest/job_server.py +++ b/booltest/job_server.py @@ -1,7 +1,6 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -#import zmq import argparse import coloredlogs import logging @@ -58,14 +57,14 @@ def __init__(self): self.is_running = True self.job_ctr = 0 self.job_src_files = [] - self.job_entries = {} # type: Dict[str,JobEntry] + self.job_entries = {} # type: Dict[str, JobEntry] self.job_queue = [] # type: List[str] self.preloaded_jobs = [] # type: List[JobEntry] self.failed_jobs = [] # Mapping worker_id -> job_id - self.worker_map = {} # type: Dict[str,str] - self.workers = {} # type: Dict[str,WorkerEntry] + self.worker_map = {} # type: Dict[str, Optional[str]] + self.workers = {} # type: Dict[str, WorkerEntry] self.db_lock = asyncio.Lock() self.db_lock_t = threading.Lock() self.input_lock_t = threading.Lock() @@ -73,7 +72,7 @@ def __init__(self): self.thread_loader = None self.key = None - def job_get(self, worker_id=None): + 
def job_get(self, worker_id=None) -> Optional[JobEntry]: self.check_job_queue() # TODO: has to be done in async way... if len(self.job_queue) == 0: return None @@ -103,7 +102,7 @@ def on_job_fail(self, jb: JobEntry, timeout=False): def on_job_success(self, jb: JobEntry): jb.unit = None - def on_job_alloc(self, jb, worker_id): + def on_job_alloc(self, jb: Optional[JobEntry], worker_id) -> Optional[JobEntry]: if not jb: return # If worker had another job, finish it now. It failed probably @@ -116,7 +115,7 @@ def on_job_alloc(self, jb, worker_id): self.worker_map[worker_id] = jb.uuid return jb - def on_job_finished(self, uid, worker_id, jmsg): + def on_job_finished(self, uid, worker_id, jmsg: Optional[Dict]): self.worker_map[worker_id] = None jb = self.job_entries[uid] if jmsg and 'ret_code' in jmsg: @@ -140,7 +139,7 @@ def on_worker_ping(self, worker_id): self.workers[worker_id] = WorkerEntry(worker_id) self.workers[worker_id].last_ping = time.time() - def check_auth(self, msg): + def check_auth(self, msg: Dict): if not self.key: return True @@ -239,7 +238,7 @@ def run_loader(self): except Exception as e: logger.warning("Exception in loader: %s" % (e,), exc_info=e) - def buid_resp_job(self, jb): + def buid_resp_job(self, jb: Optional[JobEntry]) -> Dict: if jb is None: return {'res': None} return {'res': jb.unit} @@ -258,7 +257,7 @@ async def on_ws_msg(self, websocket, path): resp_js = json.dumps(resp) await websocket.send(resp_js) - async def on_msg(self, message): + async def on_msg(self, message) -> Dict[str, Any]: try: jmsg = json.loads(message) if 'action' not in jmsg: diff --git a/booltest/runner.py b/booltest/runner.py index 0f3fc66e..7735a435 100644 --- a/booltest/runner.py +++ b/booltest/runner.py @@ -9,6 +9,7 @@ import time import sys import os +import subprocess from shlex import quote import shellescape @@ -20,10 +21,10 @@ def try_fnc(fnc): try: - fnc() + return fnc() except: pass - + class SargeLogFilter(logging.Filter): """Filters out debugging logs 
generated by sarge - output capture. It is way too verbose for debug""" @@ -49,7 +50,7 @@ def filter(self, record): return 1 except Exception as e: - logger.error("Exception in log filtering: %s" % (e,)) + logger.error("Exception in log filtering: %s" % (e,), exc_info=e) return 1 @@ -100,6 +101,22 @@ def escape_shell(inp): quote(inp) +def win_ctrlc(tm=0): + try: + import ctypes + except: + logger.warning("Inspect/ctypes import failed") + return + + try: + ctypes.windll.kernel32.GenerateConsoleCtrlEvent(0, 0) + time.sleep(tm) + except KeyboardInterrupt: + pass + except: + pass + + class AsyncRunner: def __init__(self, cmd, args=None, stdout=None, stderr=None, cwd=None, shell=True, env=None): self.cmd = cmd @@ -115,10 +132,19 @@ def __init__(self, cmd, args=None, stdout=None, stderr=None, cwd=None, shell=Tru self.cwd = cwd self.shell = shell self.env = env + + self.create_new_group = None self.preexec_setgrp = False + self.preexec_setsid = False + self.win_create_process_group = False self.using_stdout_cap = True self.using_stderr_cap = True + self.do_drain_streams = True + self.do_not_block_runner_thread_on_termination = False + self.force_runner_thread_termination = False + self.try_terminate_children_for_shell = False + self.ret_code = None self.out_acc = [] self.err_acc = [] @@ -130,15 +156,24 @@ def __init__(self, cmd, args=None, stdout=None, stderr=None, cwd=None, shell=Tru self.was_running = False self.terminating = False self.thread = None + self.p = None + self.terminate_timeout = 0.5 + self.signal_timeout = 0.5 + self.terminate_ctrlc_timeout = 0.5 + self.is_win = sys.platform.startswith('win') def run(self): try: self.run_internal() except Exception as e: self.is_running = False - logger.error("Unexpected exception in runner: %s" % (e,)) + logger.error("Unexpected exception in runner: %s" % (e,), exc_info=e) finally: self.was_running = True + logger.debug("Runner thread finished") + + if self.force_runner_thread_termination: + raise SystemError("Terminate 
runner") def __del__(self): self.deinit() @@ -149,6 +184,9 @@ def deinit(self): if not self.proc: return + if self.do_not_block_runner_thread_on_termination or self.force_runner_thread_termination: + return + if self.using_stdout_cap: try_fnc(lambda: self.proc.stdout.close()) @@ -170,6 +208,10 @@ def run_internal(self): def preexec_function(): os.setpgrp() + def preexec_setsid(): + logger.debug("setsid called") + os.setsid() + cmd = self.cmd if self.shell: args_str = ( @@ -195,8 +237,20 @@ def preexec_function(): logger.debug("Starting command %s in %s" % (cmd, self.cwd)) run_args = {} + if self.create_new_group: + if self.is_win: + self.win_create_process_group = True + else: + self.preexec_setsid = True + if self.preexec_setgrp: run_args['preexec_fn'] = preexec_function + if self.preexec_setsid: + run_args['preexec_fn'] = preexec_setsid + + # https://stackoverflow.com/questions/44124338/trying-to-implement-signal-ctrl-c-event-in-python3-6 + if self.win_create_process_group: + run_args['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP p = run( cmd, @@ -210,6 +264,7 @@ def preexec_function(): **run_args ) + self.p = p self.time_start = time.time() self.proc = p self.ret_code = 1 @@ -262,7 +317,7 @@ def add_output(buffers, is_err=False, finish=False): while len(p.commands) == 0: time.sleep(0.15) - logger.debug("Program started, progs: %s" % len(p.commands)) + logger.debug("Program started, progs: %s, pid: %s" % (len(p.commands), self.get_pid())) if p.commands[0] is None: self.is_running = False self.was_running = True @@ -288,10 +343,7 @@ def add_output(buffers, is_err=False, finish=False): p.commands[0].poll() if self.terminating and p.commands[0].returncode is None: - logger.debug("Terminating by sigint %s" % p.commands[0]) - sarge_sigint(p.commands[0], signal.SIGTERM) - sarge_sigint(p.commands[0], signal.SIGINT) - logger.debug("Sigint sent") + self.send_term_signals() logger.debug("Process closed") # If there is data, consume it right away. 
@@ -299,15 +351,32 @@ def add_output(buffers, is_err=False, finish=False): continue time.sleep(0.15) - logger.debug("Runner while ended") - p.wait() + try_fnc(lambda: p.commands[0].poll()) self.ret_code = p.commands[0].returncode if p.commands[0] else -1 - if self.using_stdout_cap: + logger.debug("Runner while-loop ended, retcode: %s" % (p.commands[0].returncode,)) + if self.do_not_block_runner_thread_on_termination: + logger.debug("Not blocking runner thread on termination. Finishing, some output may be lost") + self.was_running = True + self.is_running = False + return + + if self.force_runner_thread_termination: + self.was_running = True + self.is_running = False + return + + logger.debug("Waiting for process to complete") + p.wait() + + self.ret_code = p.commands[0].returncode if p.commands[0] else -1 + if self.do_drain_streams and self.using_stdout_cap: + logger.debug("Draining stdout stream") try_fnc(lambda: p.stdout.close()) add_output(self.drain_stream(p.stdout, True), finish=True) - if self.using_stderr_cap: + if self.do_drain_streams and self.using_stderr_cap: + logger.debug("Draining stderr stream") try_fnc(lambda: p.stderr.close()) add_output(self.drain_stream(p.stderr, True), is_err=True, finish=True) @@ -330,39 +399,252 @@ def add_output(buffers, is_err=False, finish=False): self.was_running = True self.time_elapsed = time.time() - self.time_start try_fnc(lambda: self.feeder.close()) - try_fnc(lambda: self.proc.close()) + + if not self.do_not_block_runner_thread_on_termination: + try_fnc(lambda: self.proc.close()) if self.on_finished: self.on_finished(self) + def test_is_running(self): + try_fnc(lambda: self.p.commands[0].poll()) + return self.is_running and self.p.commands[0] and self.p.commands[0].returncode is None + + def sleep_if_running(self, tm): + stime = time.time() + while time.time() - stime < tm: + if not self.test_is_running(): + return False + time.sleep(0.2) + return True + + def send_term_signals(self): + p = self.p + pid = 
self.get_pid() + logger.debug("Terminating by sigint %s, PID: %s" % (p.commands[0], pid)) + + test_is_running = self.test_is_running + sleep_if_running = self.sleep_if_running + + if not test_is_running(): return + + # PGid works only on POSIX + if self.preexec_setsid and pid is not None: + pgid = os.getpgid(pid) + logger.debug("Terminating process group %s for process %s" % (pgid, pid)) + logger.debug("Sending pg SIGINT") + try_fnc(lambda: os.killpg(pgid, signal.SIGINT)) + sleep_if_running(self.terminate_ctrlc_timeout) + + if not test_is_running(): return + logger.debug("Sending pg SIGTERM") + try_fnc(lambda: os.killpg(pgid, signal.SIGTERM)) + sleep_if_running(self.terminate_timeout) + if not test_is_running(): return + + logger.debug("Sending pg SIGKILL") + try_fnc(lambda: os.killpg(pgid, signal.SIGKILL)) + sleep_if_running(self.signal_timeout) + if not test_is_running(): return + + if self.is_win: + cmd = "tasklist /fi \"pid eq %s\"" % pid + logger.debug("Retrieving process info on the process %s" % (pid,)) + subprocess.run(cmd, shell=True) + time.sleep(self.signal_timeout) + + if self.is_win: + if self.shell and self.try_terminate_children_for_shell: + logger.debug("Experimental: sending CTRL+C to children. 
" + "May cause interruption of all processes running in the console") + self._win_terminate_children(pid) + + # Windows - process has to be process leader, otherwise this sends signal to everyone + # Thus do this only if win && is process group leader + if self.win_create_process_group: + logger.debug("Trying to invoke CTRL+C (win) in process group") + try_fnc(lambda: os.kill(pid, signal.CTRL_C_EVENT)) + try_fnc(lambda: p.commands[0].process.send_signal(signal.CTRL_C_EVENT)) + sleep_if_running(self.terminate_ctrlc_timeout) + + cmd = "Taskkill /PID %s /F /T" % pid + logger.debug("Closing process with taskkill: %s" % (cmd,)) + subprocess.run(cmd, shell=True) + time.sleep(self.terminate_timeout) + + logger.debug("Sending win SIGTERM") + try_fnc(lambda: os.kill(pid, signal.SIGTERM)) + sleep_if_running(self.terminate_timeout) + + logger.debug("Sending win SIGKILL") + try_fnc(lambda: os.kill(pid, signal.SIGKILL)) + sleep_if_running(self.terminate_timeout) + + try_fnc(lambda: p.commands[0].terminate()) + time.sleep(self.signal_timeout) + try_fnc(lambda: p.commands[0].kill()) + time.sleep(self.signal_timeout) + return + + # Posix process termination + logger.debug("Sending SIGINT") + try_fnc(lambda: sarge_sigint(p.commands[0], signal.SIGINT)) + sleep_if_running(self.terminate_ctrlc_timeout) + + logger.debug("Sending SIGTERM") + try_fnc(lambda: p.commands[0].terminate()) + sleep_if_running(self.terminate_timeout) + if not test_is_running(): return + + logger.debug("Sending SIGHUP") + try_fnc(lambda: sarge_sigint(p.commands[0], signal.SIGHUP)) + sleep_if_running(self.signal_timeout) + if not test_is_running(): return + + logger.debug("Sending SIGTERM") + try_fnc(lambda: sarge_sigint(p.commands[0], signal.SIGTERM)) + sleep_if_running(self.signal_timeout) + if not test_is_running(): return + + logger.debug("Sending SIGKILL") + try_fnc(lambda: p.commands[0].kill()) + try_fnc(lambda: sarge_sigint(p.commands[0], signal.SIGKILL)) + + def _win_get_children(self, pid): + cmd = "wmic 
process where (ParentProcessId=%s) get ProcessId" % pid + logger.debug("Obtaining child processes for %s: %s" % (pid, cmd,)) + r = subprocess.run(cmd, shell=True, check=True, text=True, stderr=subprocess.STDOUT, stdout=subprocess.PIPE) + lines = [x.strip() for x in r.stdout.splitlines()[1:] if x.strip()] + return [z for z in [try_fnc(lambda: int(y)) for y in lines] if z] + + def _win_terminate_children(self, pid): + """ + Tries to send CTRL+C signal to the child process - useful when command is executed with shell=True. + On Windows, CTRL+C signal is not transmitted to the child process from cmd.exe (apparently). + + This solution does not work with `create_new_group`, from some reason, sending CTRL+C event does not work + to new sessions - or at least we did not observe CTRL+C event in our java process. + + On the other hand - calling CTRL+C to children process in this method also causes interrupt event + in the main python code (caller of the shutdown()). From this reason all sleeps has to be guarded with + KeyboardInterrupt checking. This indicates we cannot just send CTRL+C to a child process selectively + but that it is broadcasted to the whole session. -> kills all other running tasks by sending them CTRL+C + + Useful explanation: + - send_signal(CTRL_C_EVENT) does not work because CTRL_C_EVENT is only for os.kill. [REF1] + - os.kill(CTRL_C_EVENT) sends the signal to all processes running in the current cmd window [REF2] + - Popen(..., creationflags=CREATE_NEW_PROCESS_GROUP) does not work because CTRL_C_EVENT is ignored for + process groups [REF2]. This is a bug in the python documentation [REF3]. + + [REF1]: http://docs.python.org/library/signal.html#signal.CTRL_C_EVENT + [REF2]: http://msdn.microsoft.com/en-us/library/windows/desktop/ms683155%28v=vs.85%29.aspx + [REF3]: http://docs.python.org/library/subprocess.html#subprocess.Popen.send_signal + + Proposed workaround: + - Let your program run in a different cmd window with the Windows shell command start. 
+ - Add a CTRL-C request wrapper between your control application and the application which should get the + CTRL-C signal. The wrapper will run in the same cmd window as the application which should get the + CTRL-C signal. + - The wrapper will shut down itself and the program which should get the CTRL-C signal by sending all + processes in the cmd window the CTRL_C_EVENT. + - The control program should be able to request the wrapper to send the CTRL-C signal. This might be + implemented through IPC means, e.g. sockets. + + Overall, it is quite painful to implement graceful shutdown of child processes by sending the CTRL+C signal on + Windows. We explored several combinations of settings, none of which enabled sending a targeted CTRL+C signal. + We resorted to calling taskkill with killing all child processes (/T). Without this we were not able to + terminate the child process (when shell=True is used), wrappers were hanging on sarge process closing, + processes stayed after python finished, console was uninterruptible and so on. 
+ src: + - https://stackoverflow.com/questions/7085604/sending-c-to-python-subprocess-objects-on-windows + - https://stackoverflow.com/questions/44124338/trying-to-implement-signal-ctrl-c-event-in-python3-6/44128151 + """ + try: + children = self._win_get_children(pid) + logger.debug("Children processes of %s: %s" % (pid, children)) + if len(children) == 0: + return + + for cpid in children: + logger.debug("Trying to invoke CTRL+C (win) for %s (parent %s)" % (cpid, pid)) + try: + try_fnc(lambda: os.kill(cpid, signal.CTRL_C_EVENT)) + time.sleep(0.1) + except KeyboardInterrupt: + logger.debug("Keyboard interrupt _win_terminate_children") + + self.sleep_if_running(self.terminate_ctrlc_timeout) + for cpid in children: + logger.debug("Sending win SIGTERM for %s (parent %s)" % (cpid, pid)) + try_fnc(lambda: os.kill(cpid, signal.SIGTERM)) + + self.sleep_if_running(self.terminate_timeout) + + except Exception as e: + logger.debug("Child process termination failed: %s" % (e,)) + def on_change(self): pass - def shutdown(self): + def get_pid(self): + try: + return self.p.commands[0].process.pid + except: + return None + + def wait(self, timeout=None, require_ok=False): + tstart = time.time() + while self.is_running: + if timeout is not None and time.time() - tstart > timeout: + raise Exception("Timeout") + try: + time.sleep(0.1) + except KeyboardInterrupt: + logger.debug("Keyboard interrupt wait()") + + if require_ok and self.ret_code != 0: + raise Exception("Return code is not zero: %s" % self.ret_code) + + def shutdown(self, timeout=None): if not self.is_running: return - self.terminating = True - time.sleep(1) + try: + self.terminating = True + time.sleep(1) + except KeyboardInterrupt: + logger.debug("Shutdown keyboard interrupt") # Terminating with sigint logger.debug("Waiting for program to terminate...") + tstart = time.time() while self.is_running: - time.sleep(0.1) + if timeout is not None and time.time() - tstart > timeout: + raise Exception("Timeout") + try: + 
time.sleep(0.1) + except KeyboardInterrupt: + logger.debug("Shutdown Keyboard interrupt loop") + logger.debug("Program terminated") self.deinit() - def start(self, wait_running=True): + def start(self, wait_running=True, timeout=None): install_sarge_filter() self.thread = threading.Thread(target=self.run, args=()) self.thread.setDaemon(False) - self.thread.start() + self.terminating = False + self.is_running = False + self.thread.start() + if not wait_running: self.is_running = True return - self.is_running = False + tstart = time.time() while not self.is_running and not self.was_running: + if timeout is not None and time.time() - tstart > timeout: + raise Exception("Timeout") time.sleep(0.1) return self diff --git a/setup.py b/setup.py index a59c0d1c..25b5a5df 100644 --- a/setup.py +++ b/setup.py @@ -3,12 +3,12 @@ from setuptools import setup from setuptools import find_packages -version = '0.6.7' +version = '0.7.5' # Please update tox.ini when modifying dependency version requirements install_requires = [ 'pycryptodome', # pycrypto alternative, working also on Win - 'bitarray_ph4>=1.2.3', + 'bitarray_ph4>=1.6.4', 'scipy', 'ufx', 'repoze.lru', @@ -33,6 +33,10 @@ 'numpy', ] +network = [ + 'websockets', +] + dev_extras = [ 'nose', 'pep8', @@ -86,6 +90,7 @@ 'dev': dev_extras, 'docs': docs_extras, 'impl': impl_extras, + 'net': network, }, entry_points={ 'console_scripts': [