Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion pyrunner/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from subprocess import Popen, PIPE
from collections import deque


class Config:
"""
Captures framework-level configurations.
Expand Down
23 changes: 19 additions & 4 deletions pyrunner/core/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import pyrunner.core.constants as constants
from pyrunner.core.config import Config
from pyrunner.core.context import Context
from pyrunner.core.signal import SignalHandler, SIG_ABORT, SIG_PAUSE
from multiprocessing import Manager

import os, sys, glob
Expand Down Expand Up @@ -45,9 +46,10 @@ def __init__(self):
def initiate(self, **kwargs):
"""Begins the execution loop."""

signal_handler = SignalHandler(self.config)
sys.path.append(self.config['worker_dir'])
self.start_time = time.time()
wait_interval = 1.0/self.config['tickrate'] if self.config['tickrate'] > 0 else 0
wait_interval = 1.0/self.config['tickrate'] if self.config['tickrate'] >= 1 else 0
last_save = 0
ab_code = 0

Expand All @@ -56,6 +58,13 @@ def initiate(self, **kwargs):
# Execution loop
try:
while self.register.running_nodes or self.register.pending_nodes:
sig_set = signal_handler.consume()

# Check for abort signals
if SIG_ABORT in sig_set:
print('ABORT signal received! Terminating all running Workers.')
self._abort_all_workers()
return -1

# Poll running nodes for completion/failure
for node in self.register.running_nodes.copy():
Expand Down Expand Up @@ -106,9 +115,8 @@ def initiate(self, **kwargs):
except KeyboardInterrupt:
print('\nKeyboard Interrupt Received')
print('\nCancelling Execution')
for node in self.register.running_nodes:
node.terminate()
return
self._abort_all_workers()
return -1

if not kwargs.get('silent'):
self._print_final_state(ab_code)
Expand All @@ -118,6 +126,13 @@ def initiate(self, **kwargs):

return len(self.register.failed_nodes)

def _abort_all_workers(self):
for node in self.register.running_nodes:
node.terminate()
#self.register.running_nodes.remove(node)
#self.register.aborted_nodes.add(node)
#self.register.set_children_defaulted(node)

def _print_current_state(self):
elapsed = time.time() - self.start_time

Expand Down
12 changes: 11 additions & 1 deletion pyrunner/core/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,16 @@ def execute(self):

try:
worker_class = getattr(importlib.import_module(self.module), self.worker)

# Check if provided worker actually extends the Worker class.
if issubclass(worker_class, Worker):
worker = worker_class(self.context, self._retcode, self.logfile, self.argv)
# If it does not extend the Worker class, initialize a reverse-Worker in which the
# worker extends the provided class.
else:
worker = self.generate_worker()(self.context, self._retcode, self.logfile, self.argv)

# Launch the "run" method of the provided Worker under a new process.
self._thread = multiprocessing.Process(target=worker.protected_run, daemon=False)
self._thread.start()
except Exception as e:
Expand Down Expand Up @@ -125,11 +131,15 @@ def poll(self, wait=False):
logger.restart_message(self._attempts)
self._retcode.value = -1

return self.retcode if not running or wait else None
return self.retcode if (not running or wait) else None

def terminate(self, message=None):
    """
    Forcefully stops this node's Worker process if it is still alive.

    After the process is terminated, a SYSTEM-level entry is written to the
    node's log file so the log records why the Worker stopped. Nothing is
    done (and nothing is logged) when the process is not alive.

    Args:
        message: Optional text for the log entry. When omitted, a generic
            notice is used, because termination may be requested by either
            a keyboard interrupt or an ABORT signal.

    Returns:
        None
    """
    if not self._thread.is_alive():
        return

    self._thread.terminate()

    logger = lg.FileLogger(self.logfile)
    logger.open(False)
    try:
        logger._system_(
            message
            or 'Termination requested (keyboard interrupt or ABORT signal). Terminating Worker.'
        )
    finally:
        # Close the logger even if writing the entry fails.
        logger.close()


Expand Down
65 changes: 47 additions & 18 deletions pyrunner/core/pyrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from pyrunner.core.engine import ExecutionEngine
from pyrunner.core.config import Config
from pyrunner.core.register import NodeRegister
from pyrunner.core.signal import SignalHandler, SIG_ABORT, SIG_PAUSE, SIG_PULSE
from pyrunner.version import __version__

from datetime import datetime as datetime
Expand All @@ -41,23 +42,25 @@ def __init__(self, **kwargs):
self._environ = os.environ.copy()
self.config = Config()
self.notification = notification.EmailNotification()
self.signal_handler = SignalHandler(self.config)

self.serde_obj = serde.ListSerDe()
self.register = NodeRegister()
self.engine = ExecutionEngine()

self._init_params = {
'config_file' : kwargs.get('config_file'),
'proc_file' : kwargs.get('proc_file'),
'restart' : kwargs.get('restart', False),
'cvar_list' : [],
'exec_proc_name' : None,
'exec_only_list' : [],
'exec_disable_list' : [],
'exec_from_id' : None,
'exec_to_id' : None
'config_file' : kwargs.get('config_file'),
'proc_file' : kwargs.get('proc_file'),
'restart' : kwargs.get('restart', False),
'cvar_list' : [],
'exec_proc_name' : None,
'exec_only_list' : [],
'exec_disable_list' : [],
'exec_from_id' : None,
'exec_to_id' : None
}

# Lifecycle hooks
self._on_create_func = None
self._on_start_func = None
self._on_restart_func = None
Expand All @@ -66,11 +69,23 @@ def __init__(self, **kwargs):
self._on_destroy_func = None

self.parse_args()

if self.dup_proc_is_running():
raise OSError('Another process for "{}" is already running!'.format(self.config['app_name']))

def reset_env(self):
    """Restores ``os.environ`` to the copy held in ``self._environ``."""
    live_env = os.environ
    # Drop everything first so variables added since the copy do not linger.
    live_env.clear()
    live_env.update(self._environ)

def dup_proc_is_running(self, wait_secs=1.1):
    """
    Checks whether another process for this application appears to be running.

    A PULSE signal is emitted and then left alone for ``wait_secs`` seconds.
    If the signal is no longer pending afterwards, something else consumed
    it, which is taken to mean another process is already running. If it is
    still pending, it is left in place (it is not consumed here).

    Args:
        wait_secs: Seconds to wait between emitting the PULSE signal and
            checking whether it is still pending.

    Returns:
        True if the PULSE signal disappeared during the wait, else False.
    """
    self.signal_handler.emit(SIG_PULSE)
    time.sleep(wait_secs)
    return SIG_PULSE not in self.signal_handler.peek()

def load_proc_file(self, proc_file, restart=False):
if not proc_file or not os.path.isfile(proc_file):
return False
Expand Down Expand Up @@ -121,7 +136,7 @@ def plugin_notification(self, obj):
if not isinstance(obj, notification.Notification): raise Exception('Notification plugin must implement the Notification interface')
self.notification = obj

# App lifecycle hooks
# App lifecycle hooks/decorators
def on_create(self, func):
    """
    Registers ``func`` as the on-create lifecycle hook.

    Args:
        func: Callable to store as the on-create hook.

    Returns:
        ``func`` unchanged, so this method can also be applied as a
        decorator without rebinding the decorated name to None.
    """
    self._on_create_func = func
    return func
def on_start(self, func):
Expand Down Expand Up @@ -192,15 +207,15 @@ def run(self):

emit_notification = True

# # App lifecycle - SUCCESS
# App lifecycle - SUCCESS
if retcode == 0:
if self._on_success_func:
self._on_success_func()
if not self.config['email_on_success']:
print('Skipping Email Notification: Property "email_on_success" is set to FALSE.')
emit_notification = False
# # App lifecycle - FAIL
else:
# App lifecycle - FAIL (<0 is for ABORT or other interrupt)
elif retcode > 0:
if self._on_fail_func:
self._on_fail_func()
if not self.config['email_on_fail']:
Expand Down Expand Up @@ -265,7 +280,12 @@ def zip_log_files(self, exit_status):

try:

suffix = 'FAILURE' if exit_status else 'SUCCESS'
if exit_status == -1:
suffix = 'ABORT'
elif exit_status > 0:
suffix = 'FAILURE'
else:
suffix = 'SUCCESS'

zip_file = "{}/{}_{}_{}.zip".format(self.config['log_dir'], self.config['app_name'], constants.EXECUTION_TIMESTAMP, suffix)
print('Zipping Up Log Files to: {}'.format(zip_file))
Expand Down Expand Up @@ -356,11 +376,13 @@ def exec_from(self, id) : return self.register.exec_from(id)
def exec_disable(self, id_list) : return self.register.exec_disable(id_list)

def parse_args(self):
abort = False

opt_list = 'c:l:n:e:x:N:D:A:t:drhiv'
longopt_list = [
'setup', 'help', 'nozip', 'interactive',
'setup', 'help', 'nozip', 'interactive', 'abort',
'restart', 'version', 'dryrun', 'debug',
'preserve-context', 'dump-logs', 'disable-exclusive-jobs',
'preserve-context', 'dump-logs', 'allow-duplicate-jobs',
'email=', 'email-on-fail=', 'email-on-success=', 'ef=', 'es=',
'env=', 'cvar=', 'context=',
'to=', 'from=', 'descendants=', 'ancestors=',
Expand Down Expand Up @@ -417,10 +439,12 @@ def parse_args(self):
self.config['tickrate'] = int(arg)
elif opt in ['--preserve-context']:
self.preserve_context = True
elif opt in ['--disable-exclusive-jobs']:
self.disable_exclusive_jobs = True
elif opt in ['--allow-duplicate-jobs']:
self._init_params['allow_duplicate_jobs'] = True
elif opt in ['--exec-proc-name']:
self._init_params['exec_proc_name'] = arg
elif opt == '--abort':
abort = True
elif opt in ['--serde']:
if arg.lower() == 'json':
self.plugin_serde(serde.JsonSerDe())
Expand All @@ -441,6 +465,11 @@ def parse_args(self):
raise RuntimeError('Config file (app_profile) has not been provided')
self.config.source_config_file(self._init_params['config_file'])

if abort:
print('Submitting ABORT signal to running job for: {}'.format(self.config['app_name']))
self.signal_handler.emit(SIG_ABORT)
sys.exit(0)

# Check if restart is possible (ctllog/ctx files exist)
if self._init_params['restart'] and not self.is_restartable():
self._init_params['restart'] = False
Expand Down
44 changes: 44 additions & 0 deletions pyrunner/core/signal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Copyright 2019 Comcast Cable Communications Management, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0

import os

SIG_ABORT = 'sig.abort'
SIG_PAUSE = 'sig.pause'
SIG_PULSE = 'sig.pulse'

# The only signal names accepted by SignalHandler.emit() and reported by peek().
_valid_signals = (SIG_ABORT, SIG_PAUSE, SIG_PULSE)


class SignalHandler:
    """
    File-based signal exchange keyed by application name.

    A signal is pending while its marker file exists under the configured
    ``temp_dir``; emitting creates the file and consuming removes it.
    """

    def __init__(self, config):
        """
        Args:
            config: Mapping that provides the ``temp_dir`` and ``app_name``
                entries used to build marker-file paths.
        """
        self.config = config

    def sig_file(self, sig):
        """Returns the marker-file path used for the signal ``sig``."""
        return '{}/.{}.{}'.format(self.config['temp_dir'], self.config['app_name'], sig)

    def emit(self, sig):
        """
        Marks the signal ``sig`` as pending by creating its marker file.

        Raises:
            ValueError: If ``sig`` is not one of the known signal names.
        """
        if sig not in _valid_signals:
            # Must be raised, not returned: otherwise an unknown signal
            # would silently fall through and create a marker file.
            raise ValueError('Unknown signal type: {}'.format(sig))
        with open(self.sig_file(sig), 'a'):
            pass

    def consume(self):
        """
        Removes all pending signals.

        Returns:
            The set of signal names whose marker files were removed by this
            call. A marker that vanishes between the check and the removal
            is skipped rather than raising.
        """
        consumed = set()
        for sig in self.peek():
            try:
                os.remove(self.sig_file(sig))
            except FileNotFoundError:
                # Marker was removed by someone else after peek(); not ours.
                continue
            consumed.add(sig)
        return consumed

    def peek(self):
        """Returns the set of currently pending signal names without removing them."""
        return {s for s in _valid_signals if os.path.exists(self.sig_file(s))}
7 changes: 7 additions & 0 deletions pyrunner/logger/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ def error(self, text):
"""
self._emit_('ERROR', text)

def _system_(self, text):
"""
Write a generic SYSTEM level log message.
This is reserved for internal control messages.
"""
self._emit_('SYSTEM', text)

@abstractmethod
def restart_message(self, restart_count):
"""
Expand Down
2 changes: 1 addition & 1 deletion pyrunner/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '4.2.5'
__version__ = '4.3.0'
7 changes: 4 additions & 3 deletions pyrunner/worker/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,9 @@ def protected_run(self):
methods, if defined.
"""

sys.stdout = open(self.logfile, 'a')
sys.stderr = open(self.logfile, 'a')
if self.logfile:
sys.stdout = open(self.logfile, 'a')
sys.stderr = open(self.logfile, 'a')

# RUN
try:
Expand All @@ -83,7 +84,7 @@ def protected_run(self):
else:
# ON FAIL
try:
self.retcode = self.on_fail() or self.retcode
self.on_fail() or self.retcode
except NotImplementedError:
pass
except Exception as e:
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,6 @@
license = 'Apache 2.0',
long_description = 'Python utility providing text-based workflow manager.',
entry_points = {
'console_scripts': ['pyrunner=pyrunner.cli:main', 'pyrunner-repo=pyrunner.cli:repo']
'console_scripts': ['pyrunner=pyrunner.cli:main']
}
)