Skip to content

Commit

Permalink
Fixes a bunch of worker logic, add docs
Browse files Browse the repository at this point in the history
  • Loading branch information
sideshowdave7 committed Apr 6, 2017
1 parent 492d16a commit 08d3125
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 6 deletions.
25 changes: 21 additions & 4 deletions eventmq/jobmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,6 @@ def handle_response(self, resp):
"""

logger.debug(resp)
pid = resp['pid']

callback = getattr(self, resp['callback'])
Expand Down Expand Up @@ -282,17 +281,35 @@ def on_request(self, msgid, msg):
self.request_queue.put(payload)

def premature_death(self, reply, msgid):
"""
Worker died before running any jobs
"""
return

def worker_death(self, reply, msgid):
return
"""
Worker died of natural causes, if we're actively running,
tell the broker a slot opened up
"""
if self.status == STATUS.running:
self.send_ready()

def worker_done_with_reply(self, reply, msgid):
reply = serializer(reply)
"""
Worker finished a job and requested the return value
"""
try:
reply = serializer(reply)
except TypeError as e:
reply = {"value": str(e)}

self.send_reply(reply, msgid)
self.send_ready()

def worker_done(self, msgid):
def worker_done(self, reply, msgid):
"""
Worker finished a job, notify broker of an additional slot opening
"""
self.send_ready()

def send_ready(self):
Expand Down
7 changes: 6 additions & 1 deletion eventmq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,20 +107,24 @@ def run(self):
import zmq
zmq.Context.instance().term()

callback = 'premature_death'

# Main execution loop, only break in cases that we can't recover from
# or we reach job count limit
while True:
try:
payload = self.input_queue.get(block=False, timeout=1000)
payload = self.input_queue.get(timeout=1)
if payload == 'DONE':
break

except Queue.Empty:
if os.getppid() != self.ppid:
break
continue
except Exception as e:
break
finally:
# If I'm an orphan, die
if os.getppid() != self.ppid:
break

Expand All @@ -140,6 +144,7 @@ def run(self):

if worker_thread.isAlive():
worker_thread.stop()
# TODO: this should actually kill the process
return_val = 'TimeoutError'

try:
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

setup(
name='eventmq',
version='0.3.4',
version='0.3.4.1',
description='EventMQ job execution and messaging system based on ZeroMQ',
packages=find_packages(),
install_requires=['pyzmq==15.4.0',
Expand Down

0 comments on commit 08d3125

Please sign in to comment.