diff --git a/pyrunner/core/config.py b/pyrunner/core/config.py index 0d3c57a..a53558a 100755 --- a/pyrunner/core/config.py +++ b/pyrunner/core/config.py @@ -19,7 +19,6 @@ from subprocess import Popen, PIPE from collections import deque - class Config: """ Captures framework-level configurations. diff --git a/pyrunner/core/engine.py b/pyrunner/core/engine.py index 6dd12a8..fbd6dc7 100755 --- a/pyrunner/core/engine.py +++ b/pyrunner/core/engine.py @@ -17,6 +17,7 @@ import pyrunner.core.constants as constants from pyrunner.core.config import Config from pyrunner.core.context import Context +from pyrunner.core.signal import SignalHandler, SIG_ABORT, SIG_PAUSE from multiprocessing import Manager import os, sys, glob @@ -45,9 +46,10 @@ def __init__(self): def initiate(self, **kwargs): """Begins the execution loop.""" + signal_handler = SignalHandler(self.config) sys.path.append(self.config['worker_dir']) self.start_time = time.time() - wait_interval = 1.0/self.config['tickrate'] if self.config['tickrate'] > 0 else 0 + wait_interval = 1.0/self.config['tickrate'] if self.config['tickrate'] >= 1 else 0 last_save = 0 ab_code = 0 @@ -56,6 +58,13 @@ def initiate(self, **kwargs): # Execution loop try: while self.register.running_nodes or self.register.pending_nodes: + sig_set = signal_handler.consume() + + # Check for abort signals + if SIG_ABORT in sig_set: + print('ABORT signal received! 
Terminating all running Workers.') + self._abort_all_workers() + return -1 # Poll running nodes for completion/failure for node in self.register.running_nodes.copy(): @@ -106,9 +115,8 @@ def initiate(self, **kwargs): except KeyboardInterrupt: print('\nKeyboard Interrupt Received') print('\nCancelling Execution') - for node in self.register.running_nodes: - node.terminate() - return + self._abort_all_workers() + return -1 if not kwargs.get('silent'): self._print_final_state(ab_code) @@ -118,6 +126,13 @@ def initiate(self, **kwargs): return len(self.register.failed_nodes) + def _abort_all_workers(self): + for node in self.register.running_nodes: + node.terminate() + #self.register.running_nodes.remove(node) + #self.register.aborted_nodes.add(node) + #self.register.set_children_defaulted(node) + def _print_current_state(self): elapsed = time.time() - self.start_time diff --git a/pyrunner/core/node.py b/pyrunner/core/node.py index 65c6946..9287667 100755 --- a/pyrunner/core/node.py +++ b/pyrunner/core/node.py @@ -86,10 +86,16 @@ def execute(self): try: worker_class = getattr(importlib.import_module(self.module), self.worker) + + # Check if provided worker actually extends the Worker class. if issubclass(worker_class, Worker): worker = worker_class(self.context, self._retcode, self.logfile, self.argv) + # If it does not extend the Worker class, initialize a reverse-Worker in which the + # worker extends the provided class. else: worker = self.generate_worker()(self.context, self._retcode, self.logfile, self.argv) + + # Launch the "run" method of the provided Worker under a new process. 
self._thread = multiprocessing.Process(target=worker.protected_run, daemon=False) self._thread.start() except Exception as e: @@ -125,11 +131,15 @@ def poll(self, wait=False): logger.restart_message(self._attempts) self._retcode.value = -1 - return self.retcode if not running or wait else None + return self.retcode if (not running or wait) else None def terminate(self): if self._thread.is_alive(): self._thread.terminate() + logger = lg.FileLogger(self.logfile) + logger.open(False) + logger._system_("Keyboard Interrupt (SIGINT) received. Terminating all Worker and exiting.") + logger.close() return diff --git a/pyrunner/core/pyrunner.py b/pyrunner/core/pyrunner.py index 6b3b839..f78835a 100755 --- a/pyrunner/core/pyrunner.py +++ b/pyrunner/core/pyrunner.py @@ -29,6 +29,7 @@ from pyrunner.core.engine import ExecutionEngine from pyrunner.core.config import Config from pyrunner.core.register import NodeRegister +from pyrunner.core.signal import SignalHandler, SIG_ABORT, SIG_PAUSE, SIG_PULSE from pyrunner.version import __version__ from datetime import datetime as datetime @@ -41,23 +42,25 @@ def __init__(self, **kwargs): self._environ = os.environ.copy() self.config = Config() self.notification = notification.EmailNotification() + self.signal_handler = SignalHandler(self.config) self.serde_obj = serde.ListSerDe() self.register = NodeRegister() self.engine = ExecutionEngine() self._init_params = { - 'config_file' : kwargs.get('config_file'), - 'proc_file' : kwargs.get('proc_file'), - 'restart' : kwargs.get('restart', False), - 'cvar_list' : [], - 'exec_proc_name' : None, - 'exec_only_list' : [], - 'exec_disable_list' : [], - 'exec_from_id' : None, - 'exec_to_id' : None + 'config_file' : kwargs.get('config_file'), + 'proc_file' : kwargs.get('proc_file'), + 'restart' : kwargs.get('restart', False), + 'cvar_list' : [], + 'exec_proc_name' : None, + 'exec_only_list' : [], + 'exec_disable_list' : [], + 'exec_from_id' : None, + 'exec_to_id' : None } + # Lifecycle hooks 
self._on_create_func = None self._on_start_func = None self._on_restart_func = None @@ -66,11 +69,23 @@ def __init__(self, **kwargs): self._on_destroy_func = None self.parse_args() + + if self.dup_proc_is_running(): + raise OSError('Another process for "{}" is already running!'.format(self.config['app_name'])) def reset_env(self): os.environ.clear() os.environ.update(self._environ) + def dup_proc_is_running(self): + self.signal_handler.emit(SIG_PULSE) + time.sleep(1.1) + if SIG_PULSE not in self.signal_handler.peek(): + print(self.signal_handler.peek()) + return True + else: + return False + def load_proc_file(self, proc_file, restart=False): if not proc_file or not os.path.isfile(proc_file): return False @@ -121,7 +136,7 @@ def plugin_notification(self, obj): if not isinstance(obj, notification.Notification): raise Exception('Notification plugin must implement the Notification interface') self.notification = obj - # App lifecycle hooks + # App lifecycle hooks/decorators def on_create(self, func): self._on_create_func = func def on_start(self, func): @@ -192,15 +207,15 @@ def run(self): emit_notification = True - # # App lifecycle - SUCCESS + # App lifecycle - SUCCESS if retcode == 0: if self._on_success_func: self._on_success_func() if not self.config['email_on_success']: print('Skipping Email Notification: Property "email_on_success" is set to FALSE.') emit_notification = False - # # App lifecycle - FAIL - else: + # App lifecycle - FAIL (<0 is for ABORT or other interrupt) + elif retcode > 0: if self._on_fail_func: self._on_fail_func() if not self.config['email_on_fail']: @@ -265,7 +280,12 @@ def zip_log_files(self, exit_status): try: - suffix = 'FAILURE' if exit_status else 'SUCCESS' + if exit_status == -1: + suffix = 'ABORT' + elif exit_status > 0: + suffix = 'FAILURE' + else: + suffix = 'SUCCESS' zip_file = "{}/{}_{}_{}.zip".format(self.config['log_dir'], self.config['app_name'], constants.EXECUTION_TIMESTAMP, suffix) print('Zipping Up Log Files to: 
{}'.format(zip_file)) @@ -356,11 +376,13 @@ def exec_from(self, id) : return self.register.exec_from(id) def exec_disable(self, id_list) : return self.register.exec_disable(id_list) def parse_args(self): + abort = False + opt_list = 'c:l:n:e:x:N:D:A:t:drhiv' longopt_list = [ - 'setup', 'help', 'nozip', 'interactive', + 'setup', 'help', 'nozip', 'interactive', 'abort', 'restart', 'version', 'dryrun', 'debug', - 'preserve-context', 'dump-logs', 'disable-exclusive-jobs', + 'preserve-context', 'dump-logs', 'allow-duplicate-jobs', 'email=', 'email-on-fail=', 'email-on-success=', 'ef=', 'es=', 'env=', 'cvar=', 'context=', 'to=', 'from=', 'descendants=', 'ancestors=', @@ -417,10 +439,12 @@ def parse_args(self): self.config['tickrate'] = int(arg) elif opt in ['--preserve-context']: self.preserve_context = True - elif opt in ['--disable-exclusive-jobs']: - self.disable_exclusive_jobs = True + elif opt in ['--allow-duplicate-jobs']: + self._init_params['allow_duplicate_jobs'] = True elif opt in ['--exec-proc-name']: self._init_params['exec_proc_name'] = arg + elif opt == '--abort': + abort = True elif opt in ['--serde']: if arg.lower() == 'json': self.plugin_serde(serde.JsonSerDe()) @@ -441,6 +465,11 @@ def parse_args(self): raise RuntimeError('Config file (app_profile) has not been provided') self.config.source_config_file(self._init_params['config_file']) + if abort: + print('Submitting ABORT signal to running job for: {}'.format(self.config['app_name'])) + self.signal_handler.emit(SIG_ABORT) + sys.exit(0) + # Check if restart is possible (ctllog/ctx files exist) if self._init_params['restart'] and not self.is_restartable(): self._init_params['restart'] = False diff --git a/pyrunner/core/signal.py b/pyrunner/core/signal.py new file mode 100644 index 0000000..f26725c --- /dev/null +++ b/pyrunner/core/signal.py @@ -0,0 +1,44 @@ +# Copyright 2019 Comcast Cable Communications Management, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not 
use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 + +import os + +SIG_ABORT = 'sig.abort' +SIG_PAUSE = 'sig.pause' +SIG_PULSE = 'sig.pulse' + +_valid_signals = (SIG_ABORT, SIG_PAUSE, SIG_PULSE) + +class SignalHandler: + + def __init__(self, config): + self.config = config + + def sig_file(self, sig): + return '{}/.{}.{}'.format(self.config['temp_dir'], self.config['app_name'], sig) + + def emit(self, sig): + if sig not in _valid_signals: raise ValueError('Unknown signal type: {}'.format(sig)) + open(self.sig_file(sig), 'a').close() + + def consume(self): + sig_set = self.peek() + for sig in sig_set: + os.remove(self.sig_file(sig)) + return sig_set + + def peek(self): + return set([ s for s in _valid_signals if os.path.exists(self.sig_file(s)) ]) \ No newline at end of file diff --git a/pyrunner/logger/abstract.py b/pyrunner/logger/abstract.py index 9fee55b..caa616b 100755 --- a/pyrunner/logger/abstract.py +++ b/pyrunner/logger/abstract.py @@ -59,6 +59,13 @@ def error(self, text): """ self._emit_('ERROR', text) + def _system_(self, text): + """ + Write a generic SYSTEM level log message. + This is reserved for internal control messages. 
+ """ + self._emit_('SYSTEM', text) + @abstractmethod def restart_message(self, restart_count): """ diff --git a/pyrunner/version.py b/pyrunner/version.py index a10ee5c..5ee6158 100755 --- a/pyrunner/version.py +++ b/pyrunner/version.py @@ -1 +1 @@ -__version__ = '4.2.5' +__version__ = '4.3.0' diff --git a/pyrunner/worker/abstract.py b/pyrunner/worker/abstract.py index fa5207d..f892682 100755 --- a/pyrunner/worker/abstract.py +++ b/pyrunner/worker/abstract.py @@ -57,8 +57,9 @@ def protected_run(self): methods, if defined. """ - sys.stdout = open(self.logfile, 'a') - sys.stderr = open(self.logfile, 'a') + if self.logfile: + sys.stdout = open(self.logfile, 'a') + sys.stderr = open(self.logfile, 'a') # RUN try: @@ -83,7 +84,7 @@ def protected_run(self): else: # ON FAIL try: - self.retcode = self.on_fail() or self.retcode + self.on_fail() or self.retcode except NotImplementedError: pass except Exception as e: diff --git a/setup.py b/setup.py index f84386b..ced4c0e 100755 --- a/setup.py +++ b/setup.py @@ -30,6 +30,6 @@ license = 'Apache 2.0', long_description = 'Python utility providing text-based workflow manager.', entry_points = { - 'console_scripts': ['pyrunner=pyrunner.cli:main', 'pyrunner-repo=pyrunner.cli:repo'] + 'console_scripts': ['pyrunner=pyrunner.cli:main'] } ) \ No newline at end of file