From 8c90319ef65045ddd100bc957af3f30c718b5e16 Mon Sep 17 00:00:00 2001 From: "Andre B. Oliveira" Date: Tue, 15 Oct 2013 18:37:04 -0300 Subject: [PATCH 1/2] Add traceback_format instead of Exception object. - Improves the way broker shows the exceptions. --- pypelinin/broker.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/pypelinin/broker.py b/pypelinin/broker.py index 3e1b373..04cd925 100755 --- a/pypelinin/broker.py +++ b/pypelinin/broker.py @@ -8,7 +8,7 @@ from signal import SIGKILL from . import Client from .monitoring import (get_host_info, get_outgoing_ip, get_process_info) - +from traceback import format_exc as traceback_format def worker_wrapper(pipe, workers_module_name): #TODO: should receive the document or database's configuration? @@ -44,7 +44,8 @@ def worker_wrapper(pipe, workers_module_name): try: result = workers[worker_name].process(data) except Exception as e: - result = {'_error': True, '_exception': e} + result = {'_error': True, + '_traceback': traceback_format(e)} #TODO: handle this on broker finally: pipe.send(result) @@ -216,8 +217,8 @@ def save_monitoring_information(self): except Exception as e: #TODO: what to do? self.logger.error('Could not save monitoring information into ' - 'store with parameters: {}. Exception: {}'\ - .format(data, e)) + 'store with parameters: {}. Traceback: {}'\ + .format(data, traceback_format(e))) return self.last_time_saved_monitoring_information = time() self.logger.info('Saved monitoring information') @@ -250,8 +251,8 @@ def start_job(self, job_description): except Exception as e: #TODO: what to do? self.logger.error('Could not retrieve data from store ' - 'with parameters: {}. Exception: {}'\ - .format(info, e)) + 'with parameters: {}. Traceback: {}'\ + .format(info, traceback_format(e))) return job_info = {'worker': worker, 'worker_input': worker_input, @@ -328,8 +329,9 @@ def check_if_some_job_finished_and_do_what_you_need_to(self): #TODO: what to do? self.logger.error('Could not save data into store ' 'with parameters: ' - '{}. Exception: {}'\ - .format(job_information, e)) + '{}. Traceback: {}'\ + .format(job_information, + traceback_format(e))) return except ValueError: self.request({'command': 'job failed', From 7baed259d5ec4ffd25f8337134db0b4b15e46f33 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81lvaro=20Justen=20aka=20Turicas?= Date: Wed, 16 Oct 2013 13:29:31 -0300 Subject: [PATCH 2/2] Enhances broker log messages and levels --- pypelinin/broker.py | 67 +++++++++++++++++++++++++++++++-------------- 1 file changed, 47 insertions(+), 20 deletions(-) diff --git a/pypelinin/broker.py b/pypelinin/broker.py index 04cd925..c616af7 100755 --- a/pypelinin/broker.py +++ b/pypelinin/broker.py @@ -4,11 +4,13 @@ from importlib import import_module from multiprocessing import Process, Pipe, cpu_count from os import kill, getpid -from time import sleep, time from signal import SIGKILL +from time import sleep, time +from traceback import format_exc as traceback_format + from . import Client from .monitoring import (get_host_info, get_outgoing_ip, get_process_info) -from traceback import format_exc as traceback_format + def worker_wrapper(pipe, workers_module_name): #TODO: should receive the document or database's configuration? @@ -44,9 +46,7 @@ def worker_wrapper(pipe, workers_module_name): try: result = workers[worker_name].process(data) except Exception as e: - result = {'_error': True, - '_traceback': traceback_format(e)} - #TODO: handle this on broker + result = {'_traceback': traceback_format(e)} finally: pipe.send(result) except KeyboardInterrupt: @@ -217,12 +217,16 @@ def save_monitoring_information(self): except Exception as e: #TODO: what to do? self.logger.error('Could not save monitoring information into ' - 'store with parameters: {}. Traceback: {}'\ - .format(data, traceback_format(e))) + 'store with parameters: {}. Traceback: ' + .format(data)) + self.logger.error('-' * 40) + for line in traceback_format(e).split('\n'): + self.logger.error(line) + self.logger.error('-' * 40) return self.last_time_saved_monitoring_information = time() self.logger.info('Saved monitoring information') - self.logger.debug(' Information: {}'.format(data)) + self.logger.debug(' Information saved: {}'.format(data)) def start(self): try: @@ -251,8 +255,12 @@ def start_job(self, job_description): except Exception as e: #TODO: what to do? self.logger.error('Could not retrieve data from store ' - 'with parameters: {}. Traceback: {}'\ - .format(info, traceback_format(e))) + 'with parameters: {}. Traceback:'\ + .format(info)) + self.logger.error('-' * 40) + for line in traceback_format(e).split('\n'): + self.logger.error(line) + self.logger.error('-' * 40) return job_info = {'worker': worker, 'worker_input': worker_input, @@ -260,11 +268,13 @@ def start_job(self, job_description): 'job id': job_description['job id'], 'worker_requires': worker_requires,} self.worker_pool.start_job(job_info) - self.logger.debug('Started job: worker="{}", data="{}"'\ - .format(worker, job_description['data'])) + self.logger.info('Started job: id={}, worker={}' + .format(job_info['job id'], job_info['worker'])) + self.logger.debug(' Job data: {}'.format(job_info['data'])) def get_a_job(self): - self.logger.debug('Available workers: {}'.format(self.worker_pool.available())) + self.logger.info('Available workers: {}' + .format(self.worker_pool.available())) for i in range(self.worker_pool.available()): self.request({'command': 'get job'}) message = self.get_reply() @@ -273,12 +283,14 @@ def get_a_job(self): break # Don't have a job, stop asking elif 'worker' in message and 'data' in message: if message['worker'] not in self.available_workers: - self.logger.info('Ignoring job (inexistent worker): {}'.format(message)) + self.logger.warning('Ignoring job (inexistent worker): {}' + .format(message)) #TODO: send a 'rejecting job' request to router else: self.start_job(message) else: - self.logger.info('Ignoring malformed job: {}'.format(message)) + self.logger.warning('Ignoring malformed job: {}' + .format(message)) #TODO: send a 'rejecting job' request to router def router_has_job(self): @@ -310,8 +322,10 @@ def check_if_some_job_finished_and_do_what_you_need_to(self): result = worker.get_result() end_time = time() self.logger.info('Job finished: id={}, worker={}, ' - 'data={}, start time={}, result={}'.format(job_id, - worker_name, job_data, start_time, result)) + 'start time={}'.format(job_id, + worker_name, start_time)) + self.logger.debug('job id={}, data={}, result={}'.format(job_id, + job_data, result)) job_information = { 'worker': worker_name, @@ -319,6 +333,16 @@ def check_if_some_job_finished_and_do_what_you_need_to(self): 'worker_result': result, 'data': job_data, } + + if '_traceback' in result: + self.logger.error('Exception raised on worker execution: ' + 'worker={}, job id={}' + .format(worker_name, job_id)) + self.logger.error('-' * 40) + for line in result['_traceback'].split('\n'): + self.logger.error(line) + self.logger.error('-' * 40) + try: #TODO: what if I want to the caller to receive job information # as a "return" from a function call? Should use a store? @@ -329,9 +353,12 @@ def check_if_some_job_finished_and_do_what_you_need_to(self): #TODO: what to do? self.logger.error('Could not save data into store ' 'with parameters: ' - '{}. Traceback: {}'\ - .format(job_information, - traceback_format(e))) + '{}. Traceback:'\ + .format(job_information)) + self.logger.error('-' * 40) + for line in traceback_format(e).split('\n'): + self.logger.error(line) + self.logger.error('-' * 40) return except ValueError: self.request({'command': 'job failed',