Skip to content

Commit

Permalink
Ensure the streamer works when NSQ is namespaced
Browse files Browse the repository at this point in the history
This is a bit of a nasty hack to ensure that we correctly process
messages which have come from a namespaced NSQ topic (such as in stage,
where all topic names are prefixed with 'stage-').
  • Loading branch information
nickstenning committed Feb 24, 2016
1 parent 4d46f1f commit c2b17be
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 29 deletions.
28 changes: 23 additions & 5 deletions h/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ def __init__(self, namespace, *args, **kwargs):
self.namespace = namespace

def publish(self, topic, data):
if self.namespace is not None:
topic = '{0}-{1}'.format(self.namespace, topic)
topic = resolve_topic(topic, namespace=self.namespace)
if not isinstance(data, str):
data = json.dumps(data)
return self.client.publish(topic, data)
Expand All @@ -27,10 +26,8 @@ def get_reader(settings, topic, channel):
The caller is responsible for adding appropriate `on_message` hooks and
starting the reader.
"""
ns = settings.get('nsq.namespace')
addrs = aslist(settings.get('nsq.reader.addresses', 'localhost:4150'))
if ns is not None:
topic = '{0}-{1}'.format(ns, topic)
topic = resolve_topic(topic, settings=settings)
reader = gnsq.Reader(topic, channel, nsqd_tcp_addresses=addrs)
return reader

Expand All @@ -48,6 +45,27 @@ def get_writer(settings):
return nsqd


def resolve_topic(topic, namespace=None, settings=None):
"""
Return a resolved name for the requested topic.
This uses the passed `namespace` to resolve the topic name, or,
alternatively, a pyramid settings object.
"""
if namespace is not None and settings is not None:
raise ValueError('you must provide only one of namespace or settings')

if settings is not None:
ns = settings.get('nsq.namespace')
else:
ns = namespace

if ns is not None:
return '{0}-{1}'.format(ns, topic)

return topic


def includeme(config):
config.add_request_method(
lambda req, t, c: get_reader(req.registry.settings, t, c),
Expand Down
12 changes: 1 addition & 11 deletions h/streamer/nsq.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@

log = logging.getLogger(__name__)

# NSQ message topics that the WebSocket server
# processes messages from
ANNOTATIONS_TOPIC = 'annotations'
USER_TOPIC = 'user'

# An incoming message from a subscribed NSQ topic
Message = namedtuple('Message', ['topic', 'payload'])
Expand Down Expand Up @@ -67,7 +63,7 @@ def _handler(reader, message):
raise RuntimeError('Queue reader quit unexpectedly!')


def handle_message(message, topic_handlers=None):
def handle_message(message, topic_handlers):
"""
Deserialize and process a message from the reader.
Expand All @@ -78,12 +74,6 @@ def handle_message(message, topic_handlers=None):
object. It is assumed that there is a 1:1 request-reply mapping between
incoming messages and messages to be sent out over the websockets.
"""
if topic_handlers is None:
topic_handlers = {
ANNOTATIONS_TOPIC: handle_annotation_event,
USER_TOPIC: handle_user_event,
}

data = json.loads(message.payload)

try:
Expand Down
21 changes: 16 additions & 5 deletions h/streamer/streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from h import db
from h import stats
from h.queue import resolve_topic
from h.streamer import nsq
from h.streamer import websocket

Expand All @@ -20,6 +21,10 @@
# using .put(...) with a timeout or .put_nowait(...) as appropriate.
WORK_QUEUE = gevent.queue.Queue(maxsize=4096)

# NSQ message topics that the streamer processes messages from
ANNOTATIONS_TOPIC = 'annotations'
USER_TOPIC = 'user'


class UnknownMessageType(Exception):
"""Raised if a message in the work queue if of an unknown type."""
Expand All @@ -38,24 +43,24 @@ def start(event):
# Start greenlets to process messages from NSQ
gevent.spawn(nsq.process_nsq_topic,
settings,
nsq.ANNOTATIONS_TOPIC,
ANNOTATIONS_TOPIC,
WORK_QUEUE),
gevent.spawn(nsq.process_nsq_topic,
settings,
nsq.USER_TOPIC,
USER_TOPIC,
WORK_QUEUE),
# A greenlet to periodically report to statsd
gevent.spawn(report_stats, settings),
# And one to process the queued work
gevent.spawn(process_work_queue, WORK_QUEUE)
gevent.spawn(process_work_queue, settings, WORK_QUEUE)
]

# Start a "greenlet of last resort" to monitor the worker greenlets and
# bail if any unexpected errors occur.
gevent.spawn(supervise, greenlets)


def process_work_queue(queue, session_factory=db.Session):
def process_work_queue(settings, queue, session_factory=db.Session):
"""
Process each message from the queue in turn, handling exceptions.
Expand All @@ -65,6 +70,12 @@ def process_work_queue(queue, session_factory=db.Session):
closed between messages.
"""
session = session_factory()
annotations_topic = resolve_topic(ANNOTATIONS_TOPIC, settings=settings)
user_topic = resolve_topic(USER_TOPIC, settings=settings)
topic_handlers = {
annotations_topic: nsq.handle_annotation_event,
user_topic: nsq.handle_user_event,
}

for msg in queue:
try:
Expand All @@ -76,7 +87,7 @@ def process_work_queue(queue, session_factory=db.Session):
"DEFERRABLE")

if isinstance(msg, nsq.Message):
nsq.handle_message(msg)
nsq.handle_message(msg, topic_handlers=topic_handlers)
elif isinstance(msg, websocket.Message):
websocket.handle_message(msg)
else:
Expand Down
34 changes: 26 additions & 8 deletions h/streamer/test/streamer_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,34 @@ def test_process_work_queue_sends_nsq_messages_to_nsq_handle_message(session):
message = nsq.Message(topic='foo', payload='bar')
queue = [message]

streamer.process_work_queue(queue, session_factory=lambda: session)
streamer.process_work_queue({}, queue, session_factory=lambda: session)

nsq.handle_message.assert_called_once_with(message)
nsq.handle_message.assert_called_once_with(message,
topic_handlers=mock.ANY)


def test_process_work_queue_uses_appropriate_topic_handlers_for_nsq_messages(session):
message = nsq.Message(topic='foo', payload='bar')
queue = [message]

streamer.process_work_queue({'nsq.namespace': 'wibble'},
queue,
session_factory=lambda: session)

topic_handlers = {
'wibble-annotations': nsq.handle_annotation_event,
'wibble-user': nsq.handle_user_event,
}

nsq.handle_message.assert_called_once_with(mock.ANY,
topic_handlers=topic_handlers)


def test_process_work_queue_sends_websocket_messages_to_websocket_handle_message(session):
message = websocket.Message(socket=mock.sentinel.SOCKET, payload='bar')
queue = [message]

streamer.process_work_queue(queue, session_factory=lambda: session)
streamer.process_work_queue({}, queue, session_factory=lambda: session)

websocket.handle_message.assert_called_once_with(message)

Expand All @@ -32,7 +50,7 @@ def test_process_work_queue_commits_after_each_message(session):
message2 = nsq.Message(topic='foo', payload='bar')
queue = [message1, message2]

streamer.process_work_queue(queue, session_factory=lambda: session)
streamer.process_work_queue({}, queue, session_factory=lambda: session)

assert session.commit.call_count == 2

Expand All @@ -43,7 +61,7 @@ def test_process_work_queue_rolls_back_on_handler_exception(session):

nsq.handle_message.side_effect = RuntimeError('explosion')

streamer.process_work_queue(queue, session_factory=lambda: session)
streamer.process_work_queue({}, queue, session_factory=lambda: session)

session.commit.assert_not_called()
session.rollback.assert_called_once_with()
Expand All @@ -53,7 +71,7 @@ def test_process_work_queue_rolls_back_on_unknown_message_type(session):
message = 'something that is not a message'
queue = [message]

streamer.process_work_queue(queue, session_factory=lambda: session)
streamer.process_work_queue({}, queue, session_factory=lambda: session)

session.commit.assert_not_called()
session.rollback.assert_called_once_with()
Expand All @@ -63,7 +81,7 @@ def test_process_work_queue_calls_close_after_commit(session):
message = nsq.Message(topic='foo', payload='bar')
queue = [message]

streamer.process_work_queue(queue, session_factory=lambda: session)
streamer.process_work_queue({}, queue, session_factory=lambda: session)

assert session.method_calls[-2:] == [
call.commit(),
Expand All @@ -77,7 +95,7 @@ def test_process_work_queue_calls_close_after_rollback(session):

nsq.handle_message.side_effect = RuntimeError('explosion')

streamer.process_work_queue(queue, session_factory=lambda: session)
streamer.process_work_queue({}, queue, session_factory=lambda: session)

assert session.method_calls[-2:] == [
call.rollback(),
Expand Down
25 changes: 25 additions & 0 deletions h/test/queue_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ def test_get_writer_namespace(fake_nsqd):
writer.publish('sometopic', 'somedata')
fake_client.publish.assert_called_with('abc123-sometopic', 'somedata')


@patch('gnsq.Nsqd')
def test_writer_serializes_dict(fake_nsqd):
fake_client = fake_nsqd.return_value
Expand All @@ -85,3 +86,27 @@ def test_writer_serializes_dict(fake_nsqd):
'key': 'value',
})
fake_client.publish.assert_called_with('abc-sometopic', '{"key": "value"}')


@pytest.mark.parametrize('topic,namespace,settings_obj,expected', [
# No namespace
('foo', None, None, 'foo'),
('foo', None, {}, 'foo'),
('foo', None, {'nsq.namespace': None}, 'foo'),
# Namespace provided
('foo', 'myns', None, 'myns-foo'),
('foo', None, {'nsq.namespace': 'myns'}, 'myns-foo'),
])
def test_resolve_topic(topic, namespace, settings_obj, expected):
result = queue.resolve_topic(topic,
namespace=namespace,
settings=settings_obj)

assert result == expected


def test_resolve_topic_raises_if_namespace_and_topic_both_given():
with pytest.raises(ValueError):
queue.resolve_topic('foo',
namespace='prefix',
settings={'nsq.namespace': 'prefix'})

0 comments on commit c2b17be

Please sign in to comment.