Skip to content

Commit

Permalink
"Add reply handling in router"
Browse files Browse the repository at this point in the history
  • Loading branch information
sideshowdave7 committed Apr 3, 2017
1 parent ab8434c commit 9195e05
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 103 deletions.
27 changes: 14 additions & 13 deletions bin/send_msg
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,23 @@ if __name__ == "__main__":

msg = ['run', {
'path': 'eventmq.tests.test_jobmanager',
'callable': 'pretend_job',
'callable': 'work_job',
'class_args': ('blurp',),
'class_kwargs': {'kwarg1': True},
'args': (10, ),
'args': (50, ),
'kwargs': {}
}]

msgid = send_request(s, msg, guarantee=True, reply_requested=True, timeout=200)
print 'Sent message, use msgid={} to track responses'.format(msgid)
events = dict(poller.poll(500))
if events[s.zsocket] == zmq.POLLIN:
msg = s.recv_multipart()
print msg
msgid = send_request(s, msg, guarantee=True, reply_requested=True, timeout=10)
msgid = send_request(s, msg, guarantee=True, reply_requested=True)
# print 'Sent message, use msgid={} to track responses'.format(msgid)
# events = dict(poller.poll(500))
# if events[s.zsocket] == zmq.POLLIN:
# msg = s.recv_multipart()
# print msg

# Wait for job reply
events = dict(poller.poll(50000))
if events[s.zsocket] == zmq.POLLIN:
msg = s.recv_multipart()
print msg
# # Wait for job reply
# events = dict(poller.poll(50000))
# if events[s.zsocket] == zmq.POLLIN:
# msg = s.recv_multipart()
# print msg
4 changes: 4 additions & 0 deletions eventmq/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,8 @@
SETUP_PATH = ''
SETUP_CALLABLE = ''

# Time to wait after receiving SIGTERM to kill the workers in the jobmanager
# forecfully
KILL_GRACE_PERIOD = 300

# }}}
14 changes: 6 additions & 8 deletions eventmq/jobmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,13 +157,16 @@ def _start_event_loop(self):
time.sleep(0.1)
elif len(self._workers) == 0:
sys.exit(0)
elif monotonic() - self.disconnect_time > 500:

if monotonic() > self.disconnect_time + \
conf.KILL_GRACE_PERIOD:
logger.debug("Killing unresponsive workers")
for pid in self._workers.keys():
self.kill_worker(pid, signal.SIGKILL)
sys.ext(0)
sys.exit(0)
else:
try:
events = self.poller.poll()
events = self.poller.poll(10)
except zmq.ZMQError:
logger.debug('Disconnecting due to ZMQError while'
'polling')
Expand Down Expand Up @@ -257,11 +260,6 @@ def premature_death(self, reply, msgid):
def worker_death(self, reply, msgid):
return

def worker_death_with_reply(self, reply, msgid):
reply = serializer(reply)
self.send_reply(reply, msgid)
self.send_ready()

def worker_done_with_reply(self, reply, msgid):
reply = serializer(reply)
self.send_reply(reply, msgid)
Expand Down
11 changes: 11 additions & 0 deletions eventmq/tests/test_jobmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,17 @@ def pretend_job(t):
return "I slept for {} seconds".format(t)


def work_job(t):
import time

begin_time = time.time()

while time.time() < begin_time + t:
a = 1+1

return a


def test_setup():
import time
assert time
141 changes: 59 additions & 82 deletions eventmq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,6 @@
from . import conf


if sys.version[0] == '2':
import Queue
else:
import queue as Queue


class StoppableThread(Thread):
"""Thread class with a stop() method. The thread itself has to check
regularly for the stopped() condition."""
Expand All @@ -58,10 +52,6 @@ def run(self):
self._return = self._Thread__target(*self._Thread__args,
**self._Thread__kwargs)

def join(self, timeout=None):
Thread.join(self, timeout=timeout)
return {'value': self._return}


class MultiprocessWorker(Process):
"""
Expand Down Expand Up @@ -98,8 +88,7 @@ def run(self):
conf.SETUP_PATH,
conf.SETUP_CALLABLE,
os.getpid()))
run_setup(conf.SETUP_PATH, conf.SETUP_CALLABLE,
self.logger)
run_setup(conf.SETUP_PATH, conf.SETUP_CALLABLE)
except Exception as e:
self.logger.warning('Unable to do setup task ({}.{}): {}'
.format(conf.SETUP_PATH,
Expand All @@ -112,11 +101,9 @@ def run(self):

while True:
try:
payload = self.input_queue.get_nowait()
payload = self.input_queue.get()
if payload == 'DONE':
break
except Queue.Empty:
continue
except Exception as e:
break
finally:
Expand All @@ -125,50 +112,40 @@ def run(self):

try:
return_val = 'None'
self.logger.debug("Job started")
self.job_count += 1
timeout = payload.get("timeout", None)
msgid = payload.get('msgid', '')
callback = payload.get('callback', '')

if timeout:
worker_thread = StoppableThread(target=_run,
args=(payload['params'],
self.logger))
worker_thread.start()
return_val = worker_thread.join(timeout)

if worker_thread.isAlive():
worker_thread.stop()
self.output_queue.put({
{'msgid': msgid,
'return': 'TimeoutError',
'pid': os.getpid(),
'callback': payload['callback']}
})
break

else:
return_val = _run(payload['params'])

except Exception as e:
return_val = str(e)
worker_thread = StoppableThread(target=_run,
args=(payload['params'],
self.logger))
worker_thread.start()
worker_thread.join(timeout)
return_val = {"value": worker_thread._return}

if self.job_count >= conf.MAX_JOB_COUNT \
or return_val == 'TimeoutError':
death_callback = 'worker_death_with_reply' \
if 'reply' in payload['callback'] else \
'worker_death'
break
if worker_thread.isAlive():
worker_thread.stop()
return_val = 'TimeoutError'

else:
self.output_queue.put(
self.output_queue.put_nowait(
{'msgid': msgid,
'return': return_val,
'pid': os.getpid(),
'callback': payload['callback']}
'return': return_val,
'pid': os.getpid(),
'callback': callback}
)

self.output_queue.put(
if self.job_count >= conf.MAX_JOB_COUNT:
death_callback = 'worker_death'
break

except Exception as e:
return_val = str(e)
finally:
if os.getppid() == 1:
break

self.output_queue.put_nowait(
{'msgid': None,
'return': 'DEATH',
'pid': os.getpid(),
Expand All @@ -181,46 +158,46 @@ def _run(payload, logger):
"""
Takes care of actually executing the code given a message payload
"""
if ":" in payload["path"]:
_pkgsplit = payload["path"].split(':')
s_package = _pkgsplit[0]
s_cls = _pkgsplit[1]
else:
s_package = payload["path"]
s_cls = None
try:
if ":" in payload["path"]:
_pkgsplit = payload["path"].split(':')
s_package = _pkgsplit[0]
s_cls = _pkgsplit[1]
else:
s_package = payload["path"]
s_cls = None

s_callable = payload["callable"]
s_callable = payload["callable"]

package = import_module(s_package)
if s_cls:
cls = getattr(package, s_cls)
package = import_module(s_package)
if s_cls:
cls = getattr(package, s_cls)

if "class_args" in payload:
class_args = payload["class_args"]
else:
class_args = ()
if "class_args" in payload:
class_args = payload["class_args"]
else:
class_args = ()

if "class_kwargs" in payload:
class_kwargs = payload["class_kwargs"]
else:
class_kwargs = {}
if "class_kwargs" in payload:
class_kwargs = payload["class_kwargs"]
else:
class_kwargs = {}

obj = cls(*class_args, **class_kwargs)
callable_ = getattr(obj, s_callable)
else:
callable_ = getattr(package, s_callable)
obj = cls(*class_args, **class_kwargs)
callable_ = getattr(obj, s_callable)
else:
callable_ = getattr(package, s_callable)

if "args" in payload:
args = payload["args"]
else:
args = ()
if "args" in payload:
args = payload["args"]
else:
args = ()

if "kwargs" in payload:
kwargs = payload["kwargs"]
else:
kwargs = {}
if "kwargs" in payload:
kwargs = payload["kwargs"]
else:
kwargs = {}

try:
return_val = callable_(*args, **kwargs)
except Exception as e:
logger.exception(e)
Expand Down

0 comments on commit 9195e05

Please sign in to comment.