diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py index 95dce13..6579af2 100644 --- a/eventmq/jobmanager.py +++ b/eventmq/jobmanager.py @@ -222,7 +222,6 @@ def handle_response(self, resp): """ - logger.debug(resp) pid = resp['pid'] callback = getattr(self, resp['callback']) @@ -282,17 +281,35 @@ def on_request(self, msgid, msg): self.request_queue.put(payload) def premature_death(self, reply, msgid): + """ + Worker died before running any jobs + """ return def worker_death(self, reply, msgid): - return + """ + Worker died of natural causes, if we're actively running, + tell the broker a slot opened up + """ + if self.status == STATUS.running: + self.send_ready() def worker_done_with_reply(self, reply, msgid): - reply = serializer(reply) + """ + Worker finished a job and requested the return value + """ + try: + reply = serializer(reply) + except TypeError as e: + reply = {"value": str(e)} + self.send_reply(reply, msgid) self.send_ready() - def worker_done(self, msgid): + def worker_done(self, reply, msgid): + """ + Worker finished a job, notify broker of an additional slot opening + """ self.send_ready() def send_ready(self): diff --git a/eventmq/worker.py b/eventmq/worker.py index 82a110c..e413f1d 100644 --- a/eventmq/worker.py +++ b/eventmq/worker.py @@ -107,13 +107,16 @@ def run(self): import zmq zmq.Context.instance().term() + callback = 'premature_death' + # Main execution loop, only break in cases that we can't recover from # or we reach job count limit while True: try: - payload = self.input_queue.get(block=False, timeout=1000) + payload = self.input_queue.get(timeout=1) if payload == 'DONE': break + except Queue.Empty: if os.getppid() != self.ppid: break @@ -121,6 +124,7 @@ def run(self): except Exception as e: break finally: + # If I'm an orphan, die if os.getppid() != self.ppid: break @@ -140,6 +144,7 @@ def run(self): if worker_thread.isAlive(): worker_thread.stop() + # TODO: this should actually kill the process return_val = 'TimeoutError' try: diff --git a/setup.py b/setup.py index ae0e828..7f20f22 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ setup( name='eventmq', - version='0.3.4', + version='0.3.4.1', description='EventMQ job execution and messaging system based on ZeroMQ', packages=find_packages(), install_requires=['pyzmq==15.4.0',