Skip to content

Commit

Permalink
fedmsg.core modifications for STOMP/AMQP hand-off.
Browse files Browse the repository at this point in the history
  • Loading branch information
ralphbean committed Oct 9, 2016
1 parent f9d3bd7 commit 8e5e9d1
Showing 1 changed file with 35 additions and 7 deletions.
42 changes: 35 additions & 7 deletions fedmsg/core.py
Expand Up @@ -91,8 +91,9 @@ def __init__(self, **config):
raise KeyError("Could not find endpoint for fedmsg-relay."
" Try installing fedmsg-relay.")

# Actually set up our publisher
# Actually set up our publisher, but only if we're configured for zmq.
if (
config.get('zmq_enabled', True) and
not config.get("mute", False) and
config.get("name", None) and
config.get("endpoints", None) and
Expand Down Expand Up @@ -151,11 +152,15 @@ def __init__(self, **config):
elif config.get('mute', False):
# Our caller doesn't intend to send any messages. Pass silently.
pass
else:
elif config.get('zmq_enabled', True):
# Something is wrong.
warnings.warn(
"fedmsg is not configured to send any messages "
"fedmsg is not configured to send any zmq messages "
"for name %r" % config.get("name", None))
else:
# We're not configured to send zmq messages, but zmq_enabled is
# False, so no need to warn the user.
pass

# Cleanup. See https://bit.ly/SaGeOr for discussion.
weakref.ref(threading.current_thread(), self.destroy)
Expand Down Expand Up @@ -308,10 +313,28 @@ def publish(self, topic=None, msg=None, modname=None,
if pre_fire_hook:
pre_fire_hook(msg)

self.publisher.send_multipart(
[topic, fedmsg.encoding.dumps(msg).encode('utf-8')],
flags=zmq.NOBLOCK,
)
# We handle zeromq publishing ourselves. But, if that is disabled,
# defer to the moksha' hub's twisted reactor to send messages (if
# available).
if self.c.get('zmq_enabled', True):
self.publisher.send_multipart(
[topic, fedmsg.encoding.dumps(msg).encode('utf-8')],
flags=zmq.NOBLOCK,
)
else:
# Perhaps we're using STOMP or AMQP? Let moksha handle it.
import moksha.hub
# First, a quick sanity check.
if not moksha.hub._hub:
raise AttributeError("Unable to publish non-zeromq msg"
"without moksha-hub initialization.")
# Let moksha.hub do our work.
moksha.hub._hub.send_message(
topic=topic,
message=fedmsg.encoding.dumps(msg).encode('utf-8'),
jsonify=False,
)


def tail_messages(self, topic="", passive=False, **kw):
""" Tail messages on the bus.
Expand All @@ -320,6 +343,11 @@ def tail_messages(self, topic="", passive=False, **kw):
``(name, endpoint, topic, message)``
"""

if not self.c.get('zmq_enabled', True):
raise ValueError("fedmsg.tail_messages() is only available for "
"zeromq. Use the hub-consumer approach for "
"STOMP or AMQP support.")

poller, subs = self._create_poller(topic=topic, passive=False, **kw)
try:
for msg in self._poll(poller, subs):
Expand Down

0 comments on commit 8e5e9d1

Please sign in to comment.