diff --git a/bin/send_msg b/bin/send_msg
index 652010c..47de45f 100755
--- a/bin/send_msg
+++ b/bin/send_msg
@@ -19,22 +19,23 @@ if __name__ == "__main__":
     msg = ['run', {
         'path': 'eventmq.tests.test_jobmanager',
-        'callable': 'pretend_job',
+        'callable': 'work_job',
         'class_args': ('blurp',),
         'class_kwargs': {'kwarg1': True},
-        'args': (10, ),
+        'args': (50, ),
         'kwargs': {}
     }]
 
-    msgid = send_request(s, msg, guarantee=True, reply_requested=True, timeout=200)
-    print 'Sent message, use msgid={} to track responses'.format(msgid)
-    events = dict(poller.poll(500))
-    if events[s.zsocket] == zmq.POLLIN:
-        msg = s.recv_multipart()
-        print msg
+    msgid = send_request(s, msg, guarantee=True, reply_requested=True, timeout=10)
+    msgid = send_request(s, msg, guarantee=True, reply_requested=True)
+    # print 'Sent message, use msgid={} to track responses'.format(msgid)
+    # events = dict(poller.poll(500))
+    # if events[s.zsocket] == zmq.POLLIN:
+    #     msg = s.recv_multipart()
+    #     print msg
 
-    # Wait for job reply
-    events = dict(poller.poll(50000))
-    if events[s.zsocket] == zmq.POLLIN:
-        msg = s.recv_multipart()
-        print msg
+    # # Wait for job reply
+    # events = dict(poller.poll(50000))
+    # if events[s.zsocket] == zmq.POLLIN:
+    #     msg = s.recv_multipart()
+    #     print msg
diff --git a/eventmq/conf.py b/eventmq/conf.py
index 4ad17f7..5710933 100644
--- a/eventmq/conf.py
+++ b/eventmq/conf.py
@@ -92,4 +92,8 @@
 SETUP_PATH = ''
 
 SETUP_CALLABLE = ''
 
+# Time to wait after receiving SIGTERM before the jobmanager's workers are
+# killed forcefully
+KILL_GRACE_PERIOD = 300
+
 # }}}
diff --git a/eventmq/jobmanager.py b/eventmq/jobmanager.py
index 94e8ee1..1c43736 100644
--- a/eventmq/jobmanager.py
+++ b/eventmq/jobmanager.py
@@ -113,7 +113,8 @@ def workers(self):
         if not hasattr(self, '_workers'):
             self._workers = {}
             for i in range(0, conf.CONCURRENT_JOBS):
-                w = Worker(self.request_queue, self.finished_queue)
+                w = Worker(self.request_queue, self.finished_queue,
+                           os.getpid())
                 w.start()
                 self._workers[w.pid] = w
@@ -155,15 +156,18 @@ def _start_event_loop(self):
                         not self.should_reset:
                     if len(self._workers) > 0:
                         time.sleep(0.1)
-                    elif len(self._workers) == 0:
+                    else:
                         sys.exit(0)
-                    elif monotonic() - self.disconnect_time > 500:
+
+                    if monotonic() > self.disconnect_time + \
+                            conf.KILL_GRACE_PERIOD:
+                        logger.debug("Killing unresponsive workers")
                         for pid in self._workers.keys():
                             self.kill_worker(pid, signal.SIGKILL)
-                        sys.ext(0)
+                        sys.exit(0)
             else:
                 try:
-                    events = self.poller.poll()
+                    events = self.poller.poll(10)
                 except zmq.ZMQError:
                     logger.debug('Disconnecting due to ZMQError while'
                                  'polling')
@@ -191,6 +195,32 @@ def _start_event_loop(self):
                 logger.exception("Unhandled exception in main jobmanager loop")
 
     def handle_response(self, resp):
+        """
+        Handles a response from a worker process to the jobmanager.
+
+        Args:
+            resp (dict): Must contain a key 'callback' naming the callback
+                method as a string, e.g. 'worker_done', which is then called.
+
+        Sample input:
+
+            resp = {
+                'callback': 'worker_done',        # (str)
+                'msgid': 'some_uuid',             # (str)
+                'return': 'return value',         # (dict)
+                'pid': 'pid_of_worker_process'    # (int)
+            }
+
+        The 'return' value must be a dictionary that can be JSON serialized,
+        formatted like:
+
+            {
+                "value": 'return value of job goes here'
+            }
+
+        If the 'return' value of resp is 'DEATH', the worker died, so that
+        worker is cleaned up as well.
+        """
         logger.debug(resp)
 
         pid = resp['pid']
@@ -257,11 +287,6 @@ def premature_death(self, reply, msgid):
         return
 
     def worker_death(self, reply, msgid):
         return
 
-    def worker_death_with_reply(self, reply, msgid):
-        reply = serializer(reply)
-        self.send_reply(reply, msgid)
-        self.send_ready()
-
     def worker_done_with_reply(self, reply, msgid):
         reply = serializer(reply)
         self.send_reply(reply, msgid)
@@ -316,7 +341,7 @@ def check_worker_health(self):
                          .format(conf.CONCURRENT_JOBS - len(self.workers)))
 
         for i in range(0, conf.CONCURRENT_JOBS - len(self.workers)):
-            w = Worker(self.request_queue, self.finished_queue)
+            w = Worker(self.request_queue, self.finished_queue, os.getpid())
             w.start()
             self._workers[w.pid] = w
diff --git a/eventmq/tests/test_jobmanager.py b/eventmq/tests/test_jobmanager.py
index cefe771..927e5f6 100644
--- a/eventmq/tests/test_jobmanager.py
+++ b/eventmq/tests/test_jobmanager.py
@@ -174,6 +174,17 @@ def pretend_job(t):
     return "I slept for {} seconds".format(t)
 
 
+def work_job(t):
+    import time
+
+    begin_time = time.time()
+
+    while time.time() < begin_time + t:
+        a = 1 + 1
+
+    return a
+
+
 def test_setup():
     import time
     assert time
diff --git a/eventmq/worker.py b/eventmq/worker.py
index 44db71d..4984c06 100644
--- a/eventmq/worker.py
+++ b/eventmq/worker.py
@@ -30,7 +30,6 @@
 from . import conf
 
-
 if sys.version[0] == '2':
     import Queue
 else:
@@ -58,23 +57,19 @@ def run(self):
         self._return = self._Thread__target(*self._Thread__args,
                                             **self._Thread__kwargs)
 
-    def join(self, timeout=None):
-        Thread.join(self, timeout=timeout)
-        return {'value': self._return}
-
 
 class MultiprocessWorker(Process):
     """
     Defines a worker that spans the job in a multiprocessing task
     """
-    def __init__(self, input_queue, output_queue, run_setup=True):
+    def __init__(self, input_queue, output_queue, ppid, run_setup=True):
         super(MultiprocessWorker, self).__init__()
         self.input_queue = input_queue
         self.output_queue = output_queue
         self.job_count = 0
         self.run_setup = run_setup
-        self.ppid = os.getppid()
+        self.ppid = ppid
 
     @property
     def logger(self):
@@ -98,8 +93,7 @@ def run(self):
                              conf.SETUP_PATH,
                              conf.SETUP_CALLABLE,
                              os.getpid()))
-                run_setup(conf.SETUP_PATH, conf.SETUP_CALLABLE,
-                          self.logger)
+                run_setup(conf.SETUP_PATH, conf.SETUP_CALLABLE)
             except Exception as e:
                 self.logger.warning('Unable to do setup task ({}.{}): {}'
                                     .format(conf.SETUP_PATH,
@@ -112,62 +106,54 @@ def run(self):
         while True:
             try:
-                payload = self.input_queue.get_nowait()
+                payload = self.input_queue.get(block=False, timeout=1000)
 
                 if payload == 'DONE':
                     break
             except Queue.Empty:
+                if os.getppid() != self.ppid:
+                    break
                 continue
            except Exception as e:
-                break
+                break
             finally:
-                if os.getppid() == 1:
+                if os.getppid() != self.ppid:
                     break
 
             try:
                 return_val = 'None'
-
                 self.logger.debug("Job started")
                 self.job_count += 1
 
                 timeout = payload.get("timeout", None)
                 msgid = payload.get('msgid', '')
+                callback = payload.get('callback', '')
+
+                worker_thread = StoppableThread(target=_run,
+                                                args=(payload['params'],
+                                                      self.logger))
+                worker_thread.start()
+                worker_thread.join(timeout)
+                return_val = {"value": worker_thread._return}
 
-                if timeout:
-                    worker_thread = StoppableThread(target=_run,
-                                                    args=(payload['params'],
-                                                          self.logger))
-                    worker_thread.start()
-                    return_val = worker_thread.join(timeout)
-
-                    if worker_thread.isAlive():
-                        worker_thread.stop()
-                        self.output_queue.put({
-                            {'msgid': msgid,
-                             'return': 'TimeoutError',
-                             'pid': os.getpid(),
-                             'callback': payload['callback']}
-                        })
-                        break
-
-                else:
-                    return_val = _run(payload['params'])
+                if worker_thread.isAlive():
+                    worker_thread.stop()
+                    return_val = 'TimeoutError'
+
+                try:
+                    self.output_queue.put_nowait(
+                        {'msgid': msgid,
+                         'return': return_val,
+                         'pid': os.getpid(),
+                         'callback': callback}
+                    )
+                except Exception:
+                    break
 
             except Exception as e:
                 return_val = str(e)
 
-            if self.job_count >= conf.MAX_JOB_COUNT \
-                    or return_val == 'TimeoutError':
-                death_callback = 'worker_death_with_reply' \
-                    if 'reply' in payload['callback'] else \
-                    'worker_death'
+            if self.job_count >= conf.MAX_JOB_COUNT:
+                death_callback = 'worker_death'
                 break
-            else:
-                self.output_queue.put(
-                    {'msgid': msgid,
-                     'return': return_val,
-                     'pid': os.getpid(),
-                     'callback': payload['callback']}
-                )
-
         self.output_queue.put(
             {'msgid': None,
              'return': 'DEATH',
@@ -181,46 +167,46 @@ def _run(payload, logger):
     """
     Takes care of actually executing the code given a message payload
     """
-    if ":" in payload["path"]:
-        _pkgsplit = payload["path"].split(':')
-        s_package = _pkgsplit[0]
-        s_cls = _pkgsplit[1]
-    else:
-        s_package = payload["path"]
-        s_cls = None
+    try:
+        if ":" in payload["path"]:
+            _pkgsplit = payload["path"].split(':')
+            s_package = _pkgsplit[0]
+            s_cls = _pkgsplit[1]
+        else:
+            s_package = payload["path"]
+            s_cls = None
 
-    s_callable = payload["callable"]
+        s_callable = payload["callable"]
 
-    package = import_module(s_package)
-    if s_cls:
-        cls = getattr(package, s_cls)
+        package = import_module(s_package)
+        if s_cls:
+            cls = getattr(package, s_cls)
 
-        if "class_args" in payload:
-            class_args = payload["class_args"]
-        else:
-            class_args = ()
+            if "class_args" in payload:
+                class_args = payload["class_args"]
+            else:
+                class_args = ()
 
-        if "class_kwargs" in payload:
-            class_kwargs = payload["class_kwargs"]
-        else:
-            class_kwargs = {}
+            if "class_kwargs" in payload:
+                class_kwargs = payload["class_kwargs"]
+            else:
+                class_kwargs = {}
 
-        obj = cls(*class_args, **class_kwargs)
-        callable_ = getattr(obj, s_callable)
-    else:
-        callable_ = getattr(package, s_callable)
+            obj = cls(*class_args, **class_kwargs)
+            callable_ = getattr(obj, s_callable)
+        else:
+            callable_ = getattr(package, s_callable)
 
-    if "args" in payload:
-        args = payload["args"]
-    else:
-        args = ()
+        if "args" in payload:
+            args = payload["args"]
+        else:
+            args = ()
 
-    if "kwargs" in payload:
-        kwargs = payload["kwargs"]
-    else:
-        kwargs = {}
+        if "kwargs" in payload:
+            kwargs = payload["kwargs"]
+        else:
+            kwargs = {}
 
-    try:
         return_val = callable_(*args, **kwargs)
     except Exception as e:
        logger.exception(e)
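
Note on the callback contract documented in the new handle_response() docstring: the snippet below is a minimal, illustrative sketch of how a response dict of that shape could be dispatched on its 'callback' key. The CALLBACKS table and the bare worker_done/worker_death functions are simplified stand-ins for the JobManager methods, not the real eventmq implementation.

    # Simplified stand-ins; not the real eventmq JobManager methods.
    import os

    def worker_done(reply, msgid):
        print('worker_done: msgid=%s reply=%r' % (msgid, reply))

    def worker_death(reply, msgid):
        print('worker_death: reaping worker pid %r' % reply)

    CALLBACKS = {'worker_done': worker_done, 'worker_death': worker_death}

    def handle_response(resp):
        # 'callback' names the handler; 'return' carries either the job's
        # serialized result or the sentinel 'DEATH'.
        handler = CALLBACKS[resp['callback']]
        if resp['return'] == 'DEATH':
            # The worker announced its own death; reap it instead of replying.
            handler(resp['pid'], resp['msgid'])
        else:
            handler(resp['return'], resp['msgid'])

    handle_response({'callback': 'worker_done',
                     'msgid': 'some_uuid',
                     'return': {'value': 'job result'},
                     'pid': os.getpid()})
    handle_response({'callback': 'worker_death',
                     'msgid': None,
                     'return': 'DEATH',
                     'pid': os.getpid()})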
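
The KILL_GRACE_PERIOD setting added to conf.py drives the forced-shutdown path in _start_event_loop(): after a disconnect the jobmanager keeps sleeping while workers drain, and once the grace period has elapsed it SIGKILLs whatever is left. Below is a minimal sketch of that timing logic, assuming a plain dict of pid -> multiprocessing.Process in place of JobManager._workers and time.time() in place of the project's monotonic() helper; wind_down is a hypothetical name, not an eventmq function.

    import os
    import signal
    import time
    from multiprocessing import Process

    KILL_GRACE_PERIOD = 300  # seconds, mirroring the new conf value

    def wind_down(workers, disconnect_time, grace=KILL_GRACE_PERIOD):
        # workers: dict of pid -> Process, standing in for JobManager._workers
        while workers:
            if time.time() > disconnect_time + grace:
                # Grace period expired: remaining workers are presumed hung.
                for pid in list(workers):
                    os.kill(pid, signal.SIGKILL)
                return
            # Forget workers that exited on their own in the meantime.
            for pid, proc in list(workers.items()):
                if not proc.is_alive():
                    workers.pop(pid)
            time.sleep(0.1)

    if __name__ == '__main__':
        workers = {}
        for _ in range(2):
            p = Process(target=time.sleep, args=(1,))
            p.start()
            workers[p.pid] = p
        wind_down(workers, disconnect_time=time.time(), grace=5)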
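
worker.run() now pushes every job through the StoppableThread/join(timeout) path instead of only doing so when a timeout was supplied; join(None) simply blocks until the job finishes, and a thread that is still alive afterwards is reported as 'TimeoutError'. The sketch below shows that pattern with a plain daemon thread; run_with_timeout is a hypothetical helper, and unlike the real StoppableThread it only abandons an overrunning job rather than attempting to stop it.

    import threading
    import time

    def run_with_timeout(func, args=(), timeout=None):
        result = {}

        def target():
            result['value'] = func(*args)

        t = threading.Thread(target=target)
        t.daemon = True      # an overrunning job won't keep the process alive
        t.start()
        t.join(timeout)      # timeout=None waits for the job to finish
        if t.is_alive():
            return 'TimeoutError'
        return {'value': result.get('value')}

    print(run_with_timeout(time.sleep, args=(2,), timeout=0.5))  # 'TimeoutError'
    print(run_with_timeout(lambda: 6 * 7))                       # {'value': 42}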