Skip to content

Commit

Permalink
version bump to 0.3.4
Browse files Browse the repository at this point in the history
  • Loading branch information
sideshowdave7 committed Apr 3, 2017
1 parent 860a110 commit ab8434c
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 73 deletions.
20 changes: 12 additions & 8 deletions bin/send_msg
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,19 @@ if __name__ == "__main__":
'callable': 'pretend_job',
'class_args': ('blurp',),
'class_kwargs': {'kwarg1': True},
'args': (2, ),
'args': (10, ),
'kwargs': {}
}]

send_request(s, msg, guarantee=True, reply_requested=True, timeout=1)
# print zmq.POLLOUT
# events = dict(poller.poll(500))
# print events
# if events[s.zsocket] == zmq.POLLIN:
# msg = s.recv_multipart()
msgid = send_request(s, msg, guarantee=True, reply_requested=True, timeout=200)
print 'Sent message, use msgid={} to track responses'.format(msgid)
events = dict(poller.poll(500))
if events[s.zsocket] == zmq.POLLIN:
msg = s.recv_multipart()
print msg

# print msg
# Wait for job reply
events = dict(poller.poll(50000))
if events[s.zsocket] == zmq.POLLIN:
msg = s.recv_multipart()
print msg
2 changes: 1 addition & 1 deletion eventmq/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
__author__ = 'EventMQ Contributors'
__version__ = '0.3.3'
__version__ = '0.3.4'

PROTOCOL_VERSION = 'eMQP/1.0'

Expand Down
16 changes: 10 additions & 6 deletions eventmq/jobmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@
================================
Ensures things about jobs and spawns the actual tasks
"""
from json import loads as deserializer
from json import dumps as serializer, loads as deserializer

import logging
from multiprocessing import Queue as mp_queue
import os
import signal
import sys
import os
import time

import zmq
Expand Down Expand Up @@ -257,10 +258,12 @@ def worker_death(self, reply, msgid):
return

def worker_death_with_reply(self, reply, msgid):
reply = serializer(reply)
self.send_reply(reply, msgid)
self.send_ready()

def worker_done_with_reply(self, reply, msgid):
reply = serializer(reply)
self.send_reply(reply, msgid)
self.send_ready()

Expand Down Expand Up @@ -343,11 +346,12 @@ def sighup_handler(self, signum, frame):
self.received_disconnect = True

def sigterm_handler(self, signum, frame):
logger.info('Shutting down..')
sendmsg(self.outgoing, KBYE)
if not self.received_disconnect:
logger.info('Shutting down..')
sendmsg(self.outgoing, KBYE)

self.awaiting_startup_ack = False
self.received_disconnect = True
self.awaiting_startup_ack = False
self.received_disconnect = True

def jobmanager_main(self, broker_addr=None):
"""
Expand Down
2 changes: 2 additions & 0 deletions eventmq/tests/test_jobmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def test__start_event_loop(self, send_ready_mock, maybe_send_hb_mock,
sender_mock.return_value)

jm.received_disconnect = True
jm.should_reset = True
jm._start_event_loop()

def test_on_request(self):
Expand Down Expand Up @@ -170,6 +171,7 @@ def start_jm(jm, addr):

def pretend_job(t):
time.sleep(t)
return "I slept for {} seconds".format(t)


def test_setup():
Expand Down
3 changes: 2 additions & 1 deletion eventmq/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# You should have received a copy of the GNU Lesser General Public License
# along with eventmq. If not, see <http://www.gnu.org/licenses/>.

import logging
from multiprocessing import Pool
import time

Expand All @@ -38,7 +39,7 @@ def test_run_with_timeout():
'args': [2]
}

msgid = worker._run(payload)
msgid = worker._run(payload, logging.getLogger())

assert msgid

Expand Down
132 changes: 76 additions & 56 deletions eventmq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,24 @@
Defines different short-lived workers that execute jobs
"""
from importlib import import_module

import logging

from multiprocessing import Process

import os
import sys

from threading import Event, Thread
import time

from . import conf


if sys.version[0] == '2':
import Queue
else:
import queue as Queue

from . import conf

logger = logging.getLogger(__name__)


class StoppableThread(Thread):
"""Thread class with a stop() method. The thread itself has to check
Expand All @@ -42,6 +44,7 @@ class StoppableThread(Thread):
def __init__(self, target, name=None, args=()):
super(StoppableThread, self).__init__(name=name, target=target,
args=args)
self._return = None
self._stop = Event()

def stop(self):
Expand All @@ -50,6 +53,15 @@ def stop(self):
def stopped(self):
return self._stop.isSet()

def run(self):
if self._Thread__target is not None:
self._return = self._Thread__target(*self._Thread__args,
**self._Thread__kwargs)

def join(self, timeout=None):
Thread.join(self, timeout=timeout)
return {'value': self._return}


class MultiprocessWorker(Process):
"""
Expand All @@ -63,7 +75,13 @@ def __init__(self, input_queue, output_queue, run_setup=True):
self.job_count = 0
self.run_setup = run_setup
self.ppid = os.getppid()
self.last_job_rcvd = None

@property
def logger(self):
if not hasattr(self, '_logger'):
self._logger = logging.getLogger(__name__ + '.' + str(os.getpid()))

return self._logger

def run(self):
"""
Expand All @@ -75,19 +93,22 @@ def run(self):
self.run_setup = False
if any(conf.SETUP_CALLABLE) and any(conf.SETUP_PATH):
try:
run_setup(conf.SETUP_PATH, conf.SETUP_CALLABLE)
self.logger.debug("Running setup ({}.{}) for worker id {}"
.format(
conf.SETUP_PATH,
conf.SETUP_CALLABLE,
os.getpid()))
run_setup(conf.SETUP_PATH, conf.SETUP_CALLABLE,
self.logger)
except Exception as e:
logger.warning('Unable to complete setup task ({}.{}): {}'
.format(conf.SETUP_PATH,
conf.SETUP_CALLABLE, str(e)))
self.logger.warning('Unable to do setup task ({}.{}): {}'
.format(conf.SETUP_PATH,
conf.SETUP_CALLABLE, str(e)))

import zmq
zmq.Context.instance().term()

resp = {'msgid': None,
'return': 'None',
'pid': os.getpid(),
'callback': 'premature_death'}
death_callback = 'worker_death'

while True:
try:
Expand All @@ -102,53 +123,61 @@ def run(self):
if os.getppid() == 1:
break

logger.debug("Worker got request")
try:
self.running_job = True
return_val = 'None'
self.logger.debug("Job started")
self.job_count += 1
timeout = payload.get("timeout", None)
msgid = payload.get('msgid', '')

resp = {'msgid': msgid,
'return': 'None',
'pid': os.getpid(),
'callback': payload['callback']}

if timeout:
worker_thread = StoppableThread(target=_run,
args=(payload['params'], ))
args=(payload['params'],
self.logger))
worker_thread.start()
worker_thread.join(timeout)
return_val = worker_thread.join(timeout)

if worker_thread.isAlive():
worker_thread.stop()
resp['return'] = 'TimeoutError'
else:
resp['return'] = 'DONE'
self.output_queue.put({
{'msgid': msgid,
'return': 'TimeoutError',
'pid': os.getpid(),
'callback': payload['callback']}
})
break

else:
resp['reutrn'] = _run(payload['params'])
return_val = _run(payload['params'])

except Exception as e:
resp['return'] = str(e)
return_val = str(e)

if self.job_count >= conf.MAX_JOB_COUNT or resp['return'] == 'TimeoutError':
resp['callback'] = 'worker_death_with_reply' \
if 'reply' in resp['callback'] else \
'worker_death'
if self.job_count >= conf.MAX_JOB_COUNT \
or return_val == 'TimeoutError':
death_callback = 'worker_death_with_reply' \
if 'reply' in payload['callback'] else \
'worker_death'
break

else:
self.output_queue.put(resp)

resp['return'] = 'DEATH'

if not resp['callback']:
resp['callback'] = 'worker_death'

self.output_queue.put_nowait(resp)
logger.debug("Worker death, PID: {}".format(os.getpid()))


def _run(payload):
self.output_queue.put(
{'msgid': msgid,
'return': return_val,
'pid': os.getpid(),
'callback': payload['callback']}
)

self.output_queue.put(
{'msgid': None,
'return': 'DEATH',
'pid': os.getpid(),
'callback': death_callback}
)
self.logger.debug("Worker death")


def _run(payload, logger):
"""
Takes care of actually executing the code given a message payload
"""
Expand Down Expand Up @@ -192,21 +221,16 @@ def _run(payload):
kwargs = {}

try:
callable_(*args, **kwargs)
return_val = callable_(*args, **kwargs)
except Exception as e:
logger.exception(e)
return str(e)

# Signal that we're done with this job
return 'DONE'
return return_val


def run_setup(setup_path, setup_callable):
logger.debug("Running setup ({}.{}) for worker id {}".format(
setup_path,
setup_callable,
os.getpid()))

if ":" in setup_path:
_pkgsplit = setup_path.split(':')
s_setup_package = _pkgsplit[0]
Expand All @@ -218,8 +242,4 @@ def run_setup(setup_path, setup_callable):

setup_callable_ = getattr(setup_package, setup_callable)

try:
setup_callable_()
except Exception as e:
logger.exception(e)
return str(e)
setup_callable_()
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

setup(
name='eventmq',
version='0.3.3',
version='0.3.4',
description='EventMQ job execution and messaging system based on ZeroMQ',
packages=find_packages(),
install_requires=['pyzmq==15.4.0',
Expand Down

0 comments on commit ab8434c

Please sign in to comment.