
Commit

Finish better worker PR
sideshowdave7 committed Apr 3, 2017
1 parent ab8434c commit 5cd9a1d
Showing 5 changed files with 128 additions and 101 deletions.
27 changes: 14 additions & 13 deletions bin/send_msg
@@ -19,22 +19,23 @@ if __name__ == "__main__":

msg = ['run', {
'path': 'eventmq.tests.test_jobmanager',
'callable': 'pretend_job',
'callable': 'work_job',
'class_args': ('blurp',),
'class_kwargs': {'kwarg1': True},
'args': (10, ),
'args': (50, ),
'kwargs': {}
}]

msgid = send_request(s, msg, guarantee=True, reply_requested=True, timeout=200)
print 'Sent message, use msgid={} to track responses'.format(msgid)
events = dict(poller.poll(500))
if events[s.zsocket] == zmq.POLLIN:
msg = s.recv_multipart()
print msg
msgid = send_request(s, msg, guarantee=True, reply_requested=True, timeout=10)
msgid = send_request(s, msg, guarantee=True, reply_requested=True)
# print 'Sent message, use msgid={} to track responses'.format(msgid)
# events = dict(poller.poll(500))
# if events[s.zsocket] == zmq.POLLIN:
# msg = s.recv_multipart()
# print msg

# Wait for job reply
events = dict(poller.poll(50000))
if events[s.zsocket] == zmq.POLLIN:
msg = s.recv_multipart()
print msg
# # Wait for job reply
# events = dict(poller.poll(50000))
# if events[s.zsocket] == zmq.POLLIN:
# msg = s.recv_multipart()
# print msg
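Should the reply wait be needed again, the commented-out block above can be wrapped in a small helper. A minimal sketch, assuming the same s sender and poller objects created earlier in bin/send_msg (wait_for_reply is a hypothetical name, not part of this commit):

def wait_for_reply(s, poller, timeout_ms=50000):
    # Block for up to timeout_ms waiting for a reply frame on the
    # sender's zsocket, mirroring the commented-out block above.
    # Assumes zmq is already imported, as in the script.
    events = dict(poller.poll(timeout_ms))
    if events.get(s.zsocket) == zmq.POLLIN:
        return s.recv_multipart()
    return None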
4 changes: 4 additions & 0 deletions eventmq/conf.py
@@ -92,4 +92,8 @@
SETUP_PATH = ''
SETUP_CALLABLE = ''

# Time to wait after receiving SIGTERM before forcefully killing the
# workers in the jobmanager
KILL_GRACE_PERIOD = 300

# }}}
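The new setting is consumed by the jobmanager's shutdown path (see eventmq/jobmanager.py below); reduced to its essence, the check is (a simplified sketch, not the exact loop):

def grace_period_expired(disconnect_time, now):
    # True once more than KILL_GRACE_PERIOD seconds (monotonic clock) have
    # passed since the SIGTERM/disconnect, at which point any remaining
    # workers get SIGKILLed.
    return now > disconnect_time + conf.KILL_GRACE_PERIOD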
47 changes: 36 additions & 11 deletions eventmq/jobmanager.py
@@ -113,7 +113,8 @@ def workers(self):
if not hasattr(self, '_workers'):
self._workers = {}
for i in range(0, conf.CONCURRENT_JOBS):
w = Worker(self.request_queue, self.finished_queue)
w = Worker(self.request_queue, self.finished_queue,
os.getpid())
w.start()
self._workers[w.pid] = w

@@ -155,15 +156,18 @@ def _start_event_loop(self):
not self.should_reset:
if len(self._workers) > 0:
time.sleep(0.1)
elif len(self._workers) == 0:
else:
sys.exit(0)
elif monotonic() - self.disconnect_time > 500:

if monotonic() > self.disconnect_time + \
conf.KILL_GRACE_PERIOD:
logger.debug("Killing unresponsive workers")
for pid in self._workers.keys():
self.kill_worker(pid, signal.SIGKILL)
sys.ext(0)
sys.exit(0)
else:
try:
events = self.poller.poll()
events = self.poller.poll(10)
except zmq.ZMQError:
logger.debug('Disconnecting due to ZMQError while '
'polling')
@@ -191,6 +195,32 @@ def _start_event_loop(self):
logger.exception("Unhandled exception in main jobmanager loop")

def handle_response(self, resp):
"""
Handles a response from a worker process to the jobmanager
Args:
resp (dict): Must contain a key 'callback' with the desired callback
function as a string, i.e. 'worker_done' which is then called
Sample Input
resp = {
'callback': 'worker_done', (str)
'msgid': 'some_uuid', (str)
'return': 'return value', (dict)
'pid': 'pid_of_worker_process' (int)
}
return_value must be a dictionary that can be json serialized and
formatted like:
{
"value": 'return value of job goes here'
}
if the 'return' value of resp is 'DEATH', the worker died so we clean
that up as well
"""

logger.debug(resp)
pid = resp['pid']
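The rest of handle_response is collapsed in this view; going by the docstring and the callback methods further down, the dispatch presumably looks roughly like this sketch (not the literal implementation):

# Hedged sketch of the dispatch the docstring describes; the actual
# body of handle_response is collapsed in this diff view.
callback = getattr(self, resp['callback'])   # e.g. self.worker_done_with_reply
callback(resp['return'], resp['msgid'])
# A 'DEATH' return additionally removes self._workers[pid] so that
# check_worker_health() can respawn a replacement.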
@@ -257,11 +287,6 @@ def premature_death(self, reply, msgid):
def worker_death(self, reply, msgid):
return

def worker_death_with_reply(self, reply, msgid):
reply = serializer(reply)
self.send_reply(reply, msgid)
self.send_ready()

def worker_done_with_reply(self, reply, msgid):
reply = serializer(reply)
self.send_reply(reply, msgid)
@@ -316,7 +341,7 @@ def check_worker_health(self):
.format(conf.CONCURRENT_JOBS - len(self.workers)))

for i in range(0, conf.CONCURRENT_JOBS - len(self.workers)):
w = Worker(self.request_queue, self.finished_queue)
w = Worker(self.request_queue, self.finished_queue, os.getpid())
w.start()
self._workers[w.pid] = w

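A note on the constructor change above: Worker.__init__ runs in the jobmanager process, so the old self.ppid = os.getppid() captured the jobmanager's parent rather than the jobmanager itself; passing os.getpid() explicitly gives each worker the PID it actually needs to watch (presumably the motivation for the change). A minimal sketch of the orphan check this enables inside the worker loop, mirroring eventmq/worker.py below:

# Inside the worker's run() loop: exit if our parent is no longer the
# jobmanager that spawned us (i.e. the jobmanager has died).
if os.getppid() != self.ppid:
    break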
11 changes: 11 additions & 0 deletions eventmq/tests/test_jobmanager.py
@@ -174,6 +174,17 @@ def pretend_job(t):
return "I slept for {} seconds".format(t)


def work_job(t):
import time

begin_time = time.time()

while time.time() < begin_time + t:
a = 1+1

return a

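For context, work_job is the CPU busy-loop that bin/send_msg now targets via its 'path'/'callable' fields; a quick local smoke test (not part of this commit) would be:

# Runs the busy loop directly, outside the jobmanager/worker machinery.
from eventmq.tests.test_jobmanager import work_job
print(work_job(2))   # spins for roughly 2 seconds, then returns 2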

def test_setup():
import time
assert time
140 changes: 63 additions & 77 deletions eventmq/worker.py
@@ -30,7 +30,6 @@

from . import conf


if sys.version[0] == '2':
import Queue
else:
@@ -58,23 +57,19 @@ def run(self):
self._return = self._Thread__target(*self._Thread__args,
**self._Thread__kwargs)

def join(self, timeout=None):
Thread.join(self, timeout=timeout)
return {'value': self._return}


class MultiprocessWorker(Process):
"""
Defines a worker that runs the job in a separate multiprocessing task
"""

def __init__(self, input_queue, output_queue, run_setup=True):
def __init__(self, input_queue, output_queue, ppid, run_setup=True):
super(MultiprocessWorker, self).__init__()
self.input_queue = input_queue
self.output_queue = output_queue
self.job_count = 0
self.run_setup = run_setup
self.ppid = os.getppid()
self.ppid = ppid

@property
def logger(self):
@@ -98,8 +93,7 @@ def run(self):
conf.SETUP_PATH,
conf.SETUP_CALLABLE,
os.getpid()))
run_setup(conf.SETUP_PATH, conf.SETUP_CALLABLE,
self.logger)
run_setup(conf.SETUP_PATH, conf.SETUP_CALLABLE)
except Exception as e:
self.logger.warning('Unable to do setup task ({}.{}): {}'
.format(conf.SETUP_PATH,
Expand All @@ -112,62 +106,54 @@ def run(self):

while True:
try:
payload = self.input_queue.get_nowait()
payload = self.input_queue.get(block=False, timeout=1000)
if payload == 'DONE':
break
except Queue.Empty:
if os.getppid() != self.ppid:
break
continue
except Exception as e:
break
break
finally:
if os.getppid() == 1:
if os.getppid() != self.ppid:
break

try:
return_val = 'None'
self.logger.debug("Job started")
self.job_count += 1
timeout = payload.get("timeout", None)
msgid = payload.get('msgid', '')
callback = payload.get('callback', '')

worker_thread = StoppableThread(target=_run,
args=(payload['params'],
self.logger))
worker_thread.start()
worker_thread.join(timeout)
return_val = {"value": worker_thread._return}

if timeout:
worker_thread = StoppableThread(target=_run,
args=(payload['params'],
self.logger))
worker_thread.start()
return_val = worker_thread.join(timeout)

if worker_thread.isAlive():
worker_thread.stop()
self.output_queue.put({
{'msgid': msgid,
'return': 'TimeoutError',
'pid': os.getpid(),
'callback': payload['callback']}
})
break

else:
return_val = _run(payload['params'])
if worker_thread.isAlive():
worker_thread.stop()
return_val = 'TimeoutError'

try:
self.output_queue.put_nowait(
{'msgid': msgid,
'return': return_val,
'pid': os.getpid(),
'callback': callback}
)
except Exception:
break

except Exception as e:
return_val = str(e)

if self.job_count >= conf.MAX_JOB_COUNT \
or return_val == 'TimeoutError':
death_callback = 'worker_death_with_reply' \
if 'reply' in payload['callback'] else \
'worker_death'
if self.job_count >= conf.MAX_JOB_COUNT:
death_callback = 'worker_death'
break

else:
self.output_queue.put(
{'msgid': msgid,
'return': return_val,
'pid': os.getpid(),
'callback': payload['callback']}
)

self.output_queue.put(
{'msgid': None,
'return': 'DEATH',
@@ -181,46 +167,46 @@ def _run(payload, logger):
"""
Takes care of actually executing the code given a message payload
"""
if ":" in payload["path"]:
_pkgsplit = payload["path"].split(':')
s_package = _pkgsplit[0]
s_cls = _pkgsplit[1]
else:
s_package = payload["path"]
s_cls = None
try:
if ":" in payload["path"]:
_pkgsplit = payload["path"].split(':')
s_package = _pkgsplit[0]
s_cls = _pkgsplit[1]
else:
s_package = payload["path"]
s_cls = None

s_callable = payload["callable"]
s_callable = payload["callable"]

package = import_module(s_package)
if s_cls:
cls = getattr(package, s_cls)
package = import_module(s_package)
if s_cls:
cls = getattr(package, s_cls)

if "class_args" in payload:
class_args = payload["class_args"]
else:
class_args = ()
if "class_args" in payload:
class_args = payload["class_args"]
else:
class_args = ()

if "class_kwargs" in payload:
class_kwargs = payload["class_kwargs"]
else:
class_kwargs = {}
if "class_kwargs" in payload:
class_kwargs = payload["class_kwargs"]
else:
class_kwargs = {}

obj = cls(*class_args, **class_kwargs)
callable_ = getattr(obj, s_callable)
else:
callable_ = getattr(package, s_callable)
obj = cls(*class_args, **class_kwargs)
callable_ = getattr(obj, s_callable)
else:
callable_ = getattr(package, s_callable)

if "args" in payload:
args = payload["args"]
else:
args = ()
if "args" in payload:
args = payload["args"]
else:
args = ()

if "kwargs" in payload:
kwargs = payload["kwargs"]
else:
kwargs = {}
if "kwargs" in payload:
kwargs = payload["kwargs"]
else:
kwargs = {}

try:
return_val = callable_(*args, **kwargs)
except Exception as e:
logger.exception(e)
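The reworked run() above now always executes the job on a StoppableThread and turns an overrunning thread into a 'TimeoutError' result; stripped of the queue handling, the pattern is roughly (a sketch assembled from the new lines above, not a drop-in snippet):

# Simplified view of the always-threaded timeout path in run().
worker_thread = StoppableThread(target=_run,
                                args=(payload['params'], self.logger))
worker_thread.start()
worker_thread.join(timeout)                   # timeout may be None
return_val = {"value": worker_thread._return}
if worker_thread.isAlive():                   # job exceeded its timeout
    worker_thread.stop()
    return_val = 'TimeoutError'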
