Skip to content

Commit

Permalink
Merge branch 'fix/exception' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
Álvaro Justen aka Turicas committed Oct 16, 2013
2 parents 4a250ac + 7baed25 commit a51b7c8
Showing 1 changed file with 46 additions and 17 deletions.
63 changes: 46 additions & 17 deletions pypelinin/broker.py
Expand Up @@ -4,8 +4,10 @@
from importlib import import_module
from multiprocessing import Process, Pipe, cpu_count
from os import kill, getpid
from time import sleep, time
from signal import SIGKILL
from time import sleep, time
from traceback import format_exc as traceback_format

from . import Client
from .monitoring import (get_host_info, get_outgoing_ip, get_process_info)

Expand Down Expand Up @@ -44,8 +46,7 @@ def worker_wrapper(pipe, workers_module_name):
try:
result = workers[worker_name].process(data)
except Exception as e:
result = {'_error': True, '_exception': e}
#TODO: handle this on broker
result = {'_traceback': traceback_format(e)}
finally:
pipe.send(result)
except KeyboardInterrupt:
Expand Down Expand Up @@ -216,12 +217,16 @@ def save_monitoring_information(self):
except Exception as e:
#TODO: what to do?
self.logger.error('Could not save monitoring information into '
'store with parameters: {}. Exception: {}'\
.format(data, e))
'store with parameters: {}. Traceback: '
.format(data))
self.logger.error('-' * 40)
for line in traceback_format(e).split('\n'):
self.logger.error(line)
self.logger.error('-' * 40)
return
self.last_time_saved_monitoring_information = time()
self.logger.info('Saved monitoring information')
self.logger.debug(' Information: {}'.format(data))
self.logger.debug(' Information saved: {}'.format(data))

def start(self):
try:
Expand Down Expand Up @@ -250,20 +255,26 @@ def start_job(self, job_description):
except Exception as e:
#TODO: what to do?
self.logger.error('Could not retrieve data from store '
'with parameters: {}. Exception: {}'\
.format(info, e))
'with parameters: {}. Traceback:'\
.format(info))
self.logger.error('-' * 40)
for line in traceback_format(e).split('\n'):
self.logger.error(line)
self.logger.error('-' * 40)
return
job_info = {'worker': worker,
'worker_input': worker_input,
'data': job_description['data'],
'job id': job_description['job id'],
'worker_requires': worker_requires,}
self.worker_pool.start_job(job_info)
self.logger.debug('Started job: worker="{}", data="{}"'\
.format(worker, job_description['data']))
self.logger.info('Started job: id={}, worker={}'
.format(job_info['job id'], job_info['worker']))
self.logger.debug(' Job data: {}'.format(job_info['data']))

def get_a_job(self):
self.logger.debug('Available workers: {}'.format(self.worker_pool.available()))
self.logger.info('Available workers: {}'
.format(self.worker_pool.available()))
for i in range(self.worker_pool.available()):
self.request({'command': 'get job'})
message = self.get_reply()
Expand All @@ -272,12 +283,14 @@ def get_a_job(self):
break # Don't have a job, stop asking
elif 'worker' in message and 'data' in message:
if message['worker'] not in self.available_workers:
self.logger.info('Ignoring job (inexistent worker): {}'.format(message))
self.logger.warning('Ignoring job (inexistent worker): {}'
.format(message))
#TODO: send a 'rejecting job' request to router
else:
self.start_job(message)
else:
self.logger.info('Ignoring malformed job: {}'.format(message))
self.logger.warning('Ignoring malformed job: {}'
.format(message))
#TODO: send a 'rejecting job' request to router

def router_has_job(self):
Expand Down Expand Up @@ -309,15 +322,27 @@ def check_if_some_job_finished_and_do_what_you_need_to(self):
result = worker.get_result()
end_time = time()
self.logger.info('Job finished: id={}, worker={}, '
'data={}, start time={}, result={}'.format(job_id,
worker_name, job_data, start_time, result))
'start time={}'.format(job_id,
worker_name, start_time))
self.logger.debug('job id={}, data={}, result={}'.format(job_id,
job_data, result))

job_information = {
'worker': worker_name,
'worker_requires': worker_requires,
'worker_result': result,
'data': job_data,
}

if '_traceback' in result:
self.logger.error('Exception raised on worker execution: '
'worker={}, job id={}'
.format(worker_name, job_id))
self.logger.error('-' * 40)
for line in result['_traceback'].split('\n'):
self.logger.error(line)
self.logger.error('-' * 40)

try:
#TODO: what if I want to the caller to receive job information
# as a "return" from a function call? Should use a store?
Expand All @@ -328,8 +353,12 @@ def check_if_some_job_finished_and_do_what_you_need_to(self):
#TODO: what to do?
self.logger.error('Could not save data into store '
'with parameters: '
'{}. Exception: {}'\
.format(job_information, e))
'{}. Traceback:'\
.format(job_information))
self.logger.error('-' * 40)
for line in traceback_format(e).split('\n'):
self.logger.error(line)
self.logger.error('-' * 40)
return
except ValueError:
self.request({'command': 'job failed',
Expand Down

0 comments on commit a51b7c8

Please sign in to comment.