Skip to content

Commit

Permalink
Deserialize message bodies when they're unicode
Browse files Browse the repository at this point in the history
The validate API should deserialize message bodies of ZMQMessages if
they're six.text_type. Additionally, we'll (for backwards compatibility)
attempt to decode bytes to UTF-8 and log an error since this is not a
safe API and is deprecated.

This is a cherry-pick of #464

Signed-off-by: Jeremy Cline <jeremy@jcline.org>
  • Loading branch information
jeremycline committed Aug 9, 2017
1 parent a8a485d commit 8290f0b
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 2 deletions.
24 changes: 22 additions & 2 deletions fedmsg/consumers/__init__.py
Expand Up @@ -28,6 +28,7 @@
import time

import moksha.hub.api.consumer
import six

import fedmsg.crypto
import fedmsg.encoding
Expand Down Expand Up @@ -214,11 +215,30 @@ def _make_query(page=1):
break

def validate(self, message):
""" This needs to raise an exception, caught by moksha. """
"""
Validate the message before the consumer processes it.
This needs to raise an exception, caught by moksha.
Args:
message (dict): The message as a dictionary. This must, at a minimum,
contain the 'topic' key with a unicode string value and 'body' key
with a dictionary value.
Raises:
RuntimeWarning: If the message is not valid.
"""
if hasattr(message, '__json__'):
message = message.__json__()
if isinstance(message['body'], str):
if isinstance(message['body'], six.text_type):
message['body'] = json.loads(message['body'])
elif isinstance(message['body'], six.binary_type):
# Try to decode the message body as UTF-8 since it's very likely
# that that was the encoding used. This API should eventually only
# accept unicode strings inside messages. If a UnicodeDecodeError
# happens, let that bubble up.
self.log.error('Message body is not unicode; this is deprecated')
message['body'] = json.loads(message['body'].decode('utf-8'))

# Massage STOMP messages into a more compatible format.
if 'topic' not in message['body']:
Expand Down
104 changes: 104 additions & 0 deletions fedmsg/tests/consumers/test_consumers.py
@@ -0,0 +1,104 @@
# -*- coding: utf-8 -*-
#
# This file is part of fedmsg.
# Copyright (C) 2017 Red Hat, Inc.
#
# fedmsg is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# fedmsg is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with fedmsg; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
#
# Authors: Jeremy Cline <jcline@redhat.com>
"""Tests for the :mod:`fedmsg.consumers` module."""

import os
import unittest

from moksha.hub.zeromq.zeromq import ZMQMessage
import mock

from fedmsg import crypto
from fedmsg.consumers import FedmsgConsumer
from fedmsg.tests.crypto.test_x509 import SSLDIR


class DummyConsumer(FedmsgConsumer):
"""Set attributes necessary to instantiate a consumer."""
config_key = 'dummy'
validate_signatures = True


class FedmsgConsumerValidateTests(unittest.TestCase):
"""Tests for the :meth:`FedmsgConsumer.validate` method."""

def setUp(self):
self.config = {
'dummy': True,
'ssldir': SSLDIR,
'certname': 'shell-app01.phx2.fedoraproject.org',
'ca_cert_cache': os.path.join(SSLDIR, 'ca.crt'),
'ca_cert_cache_expiry': 1497618475, # Stop fedmsg overwriting my CA, See Issue 420

'crl_location': "http://threebean.org/fedmsg-tests/crl.pem",
'crl_cache': os.path.join(SSLDIR, 'crl.pem'),
'crl_cache_expiry': 1497618475,
'crypto_validate_backends': ['x509'],
}
self.hub = mock.Mock(config=self.config)
self.consumer = DummyConsumer(self.hub)

def test_topic_mismatch(self):
"""Assert a RuntimeWarning is raised for topic mismatches."""
message = {'topic': 't1', 'body': {'topic': 't2'}}

self.assertRaises(RuntimeWarning, self.consumer.validate, message)

def test_valid_signature(self):
"""Assert the API accepts and validates dictionary messages."""
message = {'topic': 't1', 'body': crypto.sign({'topic': 't1'}, **self.config)}

self.consumer.validate(message)

def test_invalid_signature(self):
"""Assert a RuntimeWarning is raised for topic mismatches."""
message = {'topic': 't1', 'body': crypto.sign({'topic': 't1'}, **self.config)}
message['body']['signature'] = 'thisisnotmysignature'

self.assertRaises(RuntimeWarning, self.consumer.validate, message)

def test_no_topic_in_body(self):
"""Assert the API accepts dictionary messages."""
self.consumer.validate_signatures = False
message = {'body': {'some': 'stuff'}}

self.consumer.validate(message)
self.assertEqual({'body': {'topic': None, 'msg': {'some': 'stuff'}}}, message)

@mock.patch('fedmsg.consumers.fedmsg.crypto.validate')
def test_zmqmessage_text_body(self, mock_crypto_validate):
self.consumer.validate_signatures = True
self.consumer.hub.config = {}
message = ZMQMessage(u't1', u'{"some": "stuff"}')

self.consumer.validate(message)
mock_crypto_validate.assert_called_once_with({'topic': u't1', 'msg': {'some': 'stuff'}})

@mock.patch('fedmsg.consumers.fedmsg.crypto.validate')
def test_zmqmessage_binary_body(self, mock_crypto_validate):
self.consumer.log = mock.Mock()
self.consumer.validate_signatures = True
self.consumer.hub.config = {}
message = ZMQMessage(u't1', b'{"some": "stuff"}')

self.consumer.validate(message)
mock_crypto_validate.assert_called_once_with({'topic': u't1', 'msg': {'some': 'stuff'}})
self.consumer.log.error.assert_any_call('Message body is not unicode; this is deprecated')

0 comments on commit 8290f0b

Please sign in to comment.