Skip to content

Commit

Permalink
Finish better worker PR
Browse files Browse the repository at this point in the history
  • Loading branch information
sideshowdave7 committed Apr 3, 2017
1 parent ab8434c commit 36e3bdd
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 104 deletions.
27 changes: 14 additions & 13 deletions bin/send_msg
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,23 @@ if __name__ == "__main__":

msg = ['run', {
'path': 'eventmq.tests.test_jobmanager',
'callable': 'pretend_job',
'callable': 'work_job',
'class_args': ('blurp',),
'class_kwargs': {'kwarg1': True},
'args': (10, ),
'args': (50, ),
'kwargs': {}
}]

msgid = send_request(s, msg, guarantee=True, reply_requested=True, timeout=200)
print 'Sent message, use msgid={} to track responses'.format(msgid)
events = dict(poller.poll(500))
if events[s.zsocket] == zmq.POLLIN:
msg = s.recv_multipart()
print msg
msgid = send_request(s, msg, guarantee=True, reply_requested=True, timeout=10)
msgid = send_request(s, msg, guarantee=True, reply_requested=True)
# print 'Sent message, use msgid={} to track responses'.format(msgid)
# events = dict(poller.poll(500))
# if events[s.zsocket] == zmq.POLLIN:
# msg = s.recv_multipart()
# print msg

# Wait for job reply
events = dict(poller.poll(50000))
if events[s.zsocket] == zmq.POLLIN:
msg = s.recv_multipart()
print msg
# # Wait for job reply
# events = dict(poller.poll(50000))
# if events[s.zsocket] == zmq.POLLIN:
# msg = s.recv_multipart()
# print msg
4 changes: 4 additions & 0 deletions eventmq/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,8 @@
SETUP_PATH = ''
SETUP_CALLABLE = ''

# Time to wait after receiving SIGTERM to kill the workers in the jobmanager
# forcefully
KILL_GRACE_PERIOD = 300

# }}}
47 changes: 36 additions & 11 deletions eventmq/jobmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ def workers(self):
if not hasattr(self, '_workers'):
self._workers = {}
for i in range(0, conf.CONCURRENT_JOBS):
w = Worker(self.request_queue, self.finished_queue)
w = Worker(self.request_queue, self.finished_queue,
os.getpid())
w.start()
self._workers[w.pid] = w

Expand Down Expand Up @@ -155,15 +156,18 @@ def _start_event_loop(self):
not self.should_reset:
if len(self._workers) > 0:
time.sleep(0.1)
elif len(self._workers) == 0:
else:
sys.exit(0)
elif monotonic() - self.disconnect_time > 500:

if monotonic() > self.disconnect_time + \
conf.KILL_GRACE_PERIOD:
logger.debug("Killing unresponsive workers")
for pid in self._workers.keys():
self.kill_worker(pid, signal.SIGKILL)
sys.ext(0)
sys.exit(0)
else:
try:
events = self.poller.poll()
events = self.poller.poll(10)
except zmq.ZMQError:
logger.debug('Disconnecting due to ZMQError while'
'polling')
Expand Down Expand Up @@ -191,6 +195,32 @@ def _start_event_loop(self):
logger.exception("Unhandled exception in main jobmanager loop")

def handle_response(self, resp):
"""
Handles a response from a worker process to the jobmanager
Args:
resp (dict): Must contain a key 'callback' with the desired callback
function as a string, i.e. 'worker_done' which is then called
Sample Input
resp = {
'callback': 'worker_done', (str)
'msgid': 'some_uuid', (str)
'return': 'return value', (dict)
'pid': 'pid_of_worker_process' (int)
}
return_value must be a dictionary that can be json serialized and
formatted like:
{
"value": 'return value of job goes here'
}
if the 'return' value of resp is 'DEATH', the worker died so we clean
that up as well
"""

logger.debug(resp)
pid = resp['pid']
Expand Down Expand Up @@ -257,11 +287,6 @@ def premature_death(self, reply, msgid):
def worker_death(self, reply, msgid):
return

def worker_death_with_reply(self, reply, msgid):
reply = serializer(reply)
self.send_reply(reply, msgid)
self.send_ready()

def worker_done_with_reply(self, reply, msgid):
reply = serializer(reply)
self.send_reply(reply, msgid)
Expand Down Expand Up @@ -316,7 +341,7 @@ def check_worker_health(self):
.format(conf.CONCURRENT_JOBS - len(self.workers)))

for i in range(0, conf.CONCURRENT_JOBS - len(self.workers)):
w = Worker(self.request_queue, self.finished_queue)
w = Worker(self.request_queue, self.finished_queue, os.getpid())
w.start()
self._workers[w.pid] = w

Expand Down
11 changes: 11 additions & 0 deletions eventmq/tests/test_jobmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,17 @@ def pretend_job(t):
return "I slept for {} seconds".format(t)


def work_job(t):
    """Busy-loop (CPU-bound) for roughly ``t`` seconds and return 2.

    Unlike a sleeping job, this keeps the interpreter spinning so the
    process cannot be interrupted at a blocking call.

    Args:
        t: Number of seconds to spin for. Zero or negative values return
            immediately.

    Returns:
        int: Always ``2`` (the result of the throwaway ``1 + 1`` work).
    """
    import time

    begin_time = time.time()

    # Initialise up front so the function still returns a value when the
    # loop body never runs (t <= 0); previously that path raised
    # UnboundLocalError on ``return``.
    result = 2

    while time.time() < begin_time + t:
        # Throwaway arithmetic purely to keep the CPU busy.
        result = 1 + 1

    return result


def test_setup():
import time
assert time
Loading

0 comments on commit 36e3bdd

Please sign in to comment.