Skip to content

Commit

Permalink
Merge pull request #507 from fedora-infra/return-boolean
Browse files Browse the repository at this point in the history
Return any return value from ._consume(..).
  • Loading branch information
jeremycline committed Mar 19, 2018
2 parents 6326189 + 7c9af8e commit 928f18a
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 2 deletions.
30 changes: 28 additions & 2 deletions fedmsg/consumers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,32 @@ def validate(self, message):
raise RuntimeWarning("Failed to authn message.")

def _consume(self, message):
""" Called when a message is consumed.
This private method handles some administrative setup and teardown
before calling the public interface `consume` typically implemented
by a subclass.
When `moksha.blocking_mode` is set to `False` in the config, this
method always returns `None`. The argued message is stored in an
internal queue where the consumer's worker threads should eventually
pick it up.
When `moksha.blocking_mode` is set to `True` in the config, this
method should return True or False, indicating whether the message
was handled or not. Specifically, in the event that the inner
`consume` method raises an exception of any kind, this method
should return `False` indicating that the message was not
successfully handled.
Args:
message (dict): The message as a dictionary.
Returns:
bool: Should be interpreted as whether or not the message was
handled by the consumer, or `None` if `moksha.blocking_mode` is
set to False.
"""

try:
self.validate(message)
Expand All @@ -290,11 +316,11 @@ def _consume(self, message):

try:
self.validate(m)
super(FedmsgConsumer, self)._consume(m)
return super(FedmsgConsumer, self)._consume(m)
except RuntimeWarning as e:
self.log.warn("Received invalid message {}".format(e))
else:
super(FedmsgConsumer, self)._consume(message)
return super(FedmsgConsumer, self)._consume(message)

def pre_consume(self, message):
self.save_status(dict(
Expand Down
33 changes: 33 additions & 0 deletions fedmsg/tests/consumers/test_consumers.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,36 @@ def test_zmqmessage_binary_body(self, mock_crypto_validate, mock_warn):
self.consumer.validate(message)
mock_crypto_validate.assert_called_once_with({'topic': u't1', 'msg': {'some': 'stuff'}})
mock_warn.assert_any_call('Message body is not unicode', DeprecationWarning)


class FedmsgConsumerHandleTests(unittest.TestCase):
"""Tests for the :meth:`FedmsgConsumer._consume` method."""

def setUp(self):
self.config = {
'dummy': True,
}
self.hub = mock.Mock(config=self.config)
self.consumer = DummyConsumer(self.hub)

def test_return_value_positive(self):
self.consumer.validate_signatures = False
self.consumer.blocking_mode = True
self.consumer.hub.config = {}
message = ZMQMessage(u't1', u'{"some": "stuff"}')

# Consumer does nothing, no exception, returns True : handled.
with mock.patch.object(self.consumer, 'consume'):
handled = self.consumer._consume(message)
assert handled is True

def test_return_value_negative(self):
self.consumer.validate_signatures = False
self.consumer.blocking_mode = True
self.consumer.hub.config = {}
message = ZMQMessage(u't1', u'{"some": "stuff"}')

# Consumer is unimplemented, and so raises and exception,
# returning False : unhandled.
handled = self.consumer._consume(message)
assert handled is False

0 comments on commit 928f18a

Please sign in to comment.