Skip to content
Merged

v5 #11

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# python-batch-runner
[![Documentation Status](https://readthedocs.org/projects/python-batch-runner/badge/?version=latest)](https://python-batch-runner.readthedocs.io/en/latest/?badge=latest)

For more complete documentation, please see: https://python-batch-runner.readthedocs.io/

python-batch-runner is a microframework to assist with building small to medium scale batch applications without needing to build the scaffolding from scratch.
Expand Down
2 changes: 1 addition & 1 deletion pyrunner/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def setup():
app_profile.write('export APP_EXEC_TIMESTAMP=$(date +"%Y%m%d_%H%M%S")\n\n')
app_profile.write('export APP_LOG_DIR="${APP_ROOT_LOG_DIR}/${DATE}"\n\n')
app_profile.write('if [ ! -e ${APP_LOG_DIR} ]; then mkdir -p ${APP_LOG_DIR}; fi\n')
app_profile.write('if [ ! -e ${APP_TEMP_DIR} ]; then mkdir ${APP_TEMP_DIR}; fi\n')
app_profile.write('if [ ! -e ${APP_TEMP_DIR} ]; then mkdir -p ${APP_TEMP_DIR}; fi\n')

print('Creating Blank Process List File: {}/config/{}.lst'.format(app_root, app_name))
with open('{}/config/{}.lst'.format(app_root, app_name), 'w') as lst_file:
Expand Down
4 changes: 0 additions & 4 deletions pyrunner/core/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,7 @@

EXECUTION_TIMESTAMP = datetime.now().strftime("%Y%m%d_%H%M%S")

MODE_SHELL = 'SHELL'
MODE_PYTHON = 'PYTHON'

HEADER_SHELL = '#{}\n#ID|PARENT_IDS|MAX_ATTEMPTS|RETRY_WAIT_TIME|PROCESS_NAME|SHELL_COMMAND|LOGFILE'.format(MODE_SHELL)
HEADER_PYTHON = '#{}\n#ID|PARENT_IDS|MAX_ATTEMPTS|RETRY_WAIT_TIME|PROCESS_NAME|MODULE_NAME|WORKER_NAME|ARGUMENTS|LOGFILE'.format(MODE_PYTHON)

ROOT_NODE_NAME = 'PyRunnerRootNode'
Expand All @@ -40,7 +37,6 @@

import os, sys
from pyrunner import PyRunner
from pathlib import Path

# Determine absolute path of this file's parent directory at runtime
abs_dir_path = os.path.dirname(os.path.realpath(__file__))
Expand Down
61 changes: 32 additions & 29 deletions pyrunner/core/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ def initiate(self, **kwargs):
self.start_time = time.time()
wait_interval = 1.0/self.config['tickrate'] if self.config['tickrate'] >= 1 else 0
last_save = 0
ab_code = 0

if not self.register: raise RuntimeError('NodeRegister has not been initialized!')

Expand Down Expand Up @@ -120,7 +119,7 @@ def initiate(self, **kwargs):
if p.id >= 0 and p not in self.register.completed_nodes.union(self.register.norun_nodes):
runnable = False
break
if runnable:
if runnable and node.is_runnable():
self.register.pending_nodes.remove(node)
node.context = self.context
node.execute()
Expand All @@ -147,7 +146,6 @@ def initiate(self, **kwargs):
print('\nKeyboard Interrupt Received')
print('\nCancelling Execution')
self._abort_all_workers()
self.save_state_func(False, True)
return -1

# App lifecycle - SUCCESS
Expand All @@ -164,7 +162,7 @@ def initiate(self, **kwargs):
self._on_destroy_func()

if not kwargs.get('silent'):
self._print_final_state(ab_code)
self._print_final_state()

if not self.config['test_mode'] and self.save_state_func:
self.save_state_func()
Expand All @@ -173,10 +171,12 @@ def initiate(self, **kwargs):

def _abort_all_workers(self):
for node in self.register.running_nodes.copy():
node.terminate()
node.terminate('Keyboard Interrupt (SIGINT) received. Terminating Worker and exiting.')
self.register.running_nodes.remove(node)
self.register.aborted_nodes.add(node)
self.register.set_children_defaulted(node)
self.save_state_func(False, True)
self._print_final_state(True)

def _print_current_state(self):
elapsed = time.time() - self.start_time
Expand Down Expand Up @@ -205,39 +205,42 @@ def _print_current_state(self):

return

def _print_final_state(self, ab_code=0):
def _print_final_state(self, aborted=False):
print('\nCompleted in {:0.2f} seconds\n'.format(time.time() - self.start_time))

if ab_code > 0:
if aborted:
print('Final Status: ABORTED\n')
print('Aborted Processes:\n')

for n in self.register.aborted_nodes:
self._print_node_info(n, self.config['dump_logs'])

elif len(self.register.failed_nodes) + len(self.register.defaulted_nodes):
print('Final Status: FAILURE\n')
print('Failed Processes:\n')

for n in self.register.failed_nodes:
print('ID: {}'.format(n.id))
print('Name: {}'.format(n.name))
print('Module: {}'.format(n.module))
print('Worker: {}'.format(n.worker))
print('Arguments: {}'.format(n.arguments))
print('Log File: {}\n'.format(n.logfile))

if self.config['dump_logs']:
print('DUMPING FAILURE LOGS\n')

for n in self.register.failed_nodes:
print('############################################################################')
print('# ID: {}'.format(n.id))
print('# Name: {}'.format(n.name))
print('# Module: {}'.format(n.module))
print('# Worker: {}'.format(n.worker))
print('# Arguments: {}'.format(n.arguments))
print('# Log File: {}'.format(n.logfile))
with open(n.logfile, 'r') as f:
for line in f:
print(line, end='')
self._print_node_info(n, self.config['dump_logs'])

else:
print('Final Status: SUCCESS\n')

return
return

def _print_node_info(self, n, dump_logs=False):
if dump_logs:
print('############################################################################')

print('# ID: {}'.format(n.id))
print('# Name: {}'.format(n.name))
print('# Module: {}'.format(n.module))
print('# Worker: {}'.format(n.worker))
print('# Arguments: {}'.format(n.arguments))
print('# Log File: {}'.format(n.logfile))

if dump_logs:
with open(n.logfile, 'r') as f:
for line in f:
print(line, end='')

print('')
Loading