Skip to content

Commit

Permalink
Assert the signature/certificate fields are unicode
Browse files Browse the repository at this point in the history
This checks the certificate and signature fields in messages to ensure
they're text. If they're not, an error is logged and we do our best to
decode the message as UTF-8. This decoding is a short-term measure until
the users of the APIs have been hunted down and fixed.

This is a cherry-pick from
#456

Signed-off-by: Jeremy Cline <jeremy@jcline.org>
  • Loading branch information
ralphbean authored and jeremycline committed Aug 9, 2017
1 parent 8b2028f commit a8a485d
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 10 deletions.
16 changes: 12 additions & 4 deletions fedmsg/crypto/x509.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import fedmsg.encoding # noqa: E402


log = logging.getLogger(__name__)
_log = logging.getLogger(__name__)

if six.PY3:
long = int
Expand Down Expand Up @@ -106,15 +106,23 @@ def _m2crypto_validate(message, ssldir=None, **config):
raise ValueError("You must set the ssldir keyword argument.")

def fail(reason):
log.warn("Failed validation. %s" % reason)
_log.warn("Failed validation. %s" % reason)
return False

# Some sanity checking
for field in ['signature', 'certificate']:
if field not in message:
return fail("No %r field found." % field)
if not isinstance(message[field], str):
return fail("msg[%r] is not a string" % field)
if not isinstance(message[field], six.text_type):
_log.error('msg[%r] is not a unicode string' % field)
try:
# Make an effort to decode it, it's very likely utf-8 since that's what
# is hardcoded throughout fedmsg. Worst case scenario is it'll cause a
# validation error when there shouldn't be one.
message[field] = message[field].decode('utf-8')
except UnicodeError as e:
_log.error("Unable to decode the message '%s' field: %s", field, str(e))
return False

# Peal off the auth datums
signature = message['signature'].decode('base64')
Expand Down
24 changes: 18 additions & 6 deletions fedmsg/crypto/x509_ng.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
_cryptography = True
except ImportError: # pragma: no cover
_cryptography = False
import six

from . import utils
import fedmsg.crypto
Expand Down Expand Up @@ -143,12 +144,23 @@ def validate(message, ssldir=None, **config):
Returns:
bool: True of the message passes validation, False otherwise.
"""
try:
signature = message['signature'].decode('base64')
certificate = message['certificate'].decode('base64')
except KeyError:
return False

for field in ['signature', 'certificate']:
if field not in message:
_log.warn('No %s field found.', field)
return False
if not isinstance(message[field], six.text_type):
_log.error('msg[%r] is not a unicode string' % field)
try:
# Make an effort to decode it, it's very likely utf-8 since that's what
# is hardcoded throughout fedmsg. Worst case scenario is it'll cause a
# validation error when there shouldn't be one.
message[field] = message[field].decode('utf-8')
except UnicodeError as e:
_log.error("Unable to decode the message '%s' field: %s", field, str(e))
return False

signature = base64.b64decode(message['signature'])
certificate = base64.b64decode(message['certificate'])
message = fedmsg.crypto.strip_credentials(message)

crl_file = None
Expand Down
74 changes: 74 additions & 0 deletions fedmsg/tests/crypto/test_x509.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# -*- coding: utf-8 -*-
#
# This file is part of fedmsg.
# Copyright (C) 2012 - 2017 Red Hat, Inc.
#
Expand All @@ -19,6 +21,9 @@
# Authors: Jeremy Cline <jcline@redhat.com>
import os

import mock
import six

_m2crypto, _cryptography = False, False
try:
import M2Crypto # noqa: F401
Expand Down Expand Up @@ -82,6 +87,18 @@ def test_sign(self):
self.assertTrue('signature' in signed)
self.assertTrue('certificate' in signed)

@expectedFailure
def test_signature_text(self):
"""Assert signature fields are unicode text."""
signed = self.sign({'my': 'message'}, **self.config)
self.assertTrue(isinstance(signed['signature'], six.text_type))

@expectedFailure
def test_certificate_text(self):
"""Assert signing a message inserts the certificate and a signature."""
signed = self.sign({'my': 'message'}, **self.config)
self.assertTrue(isinstance(signed['certificate'], six.text_type))

def test_sign_and_verify(self):
"""Assert signed messages are verified."""
signed = self.sign({'my': 'message'}, **self.config)
Expand Down Expand Up @@ -156,6 +173,35 @@ def test_valid_policy(self):
signed = self.sign({'topic': 'mytopic', 'message': 'so secure'}, **self.config)
self.assertTrue(self.validate(signed, **self.config))

def test_bytes_undecodable(self):
"""Assert un-decodable signatures/certificates fails validation."""
signed = self.sign({'topic': 'mytopic', 'message': 'so secure'}, **self.config)

# This assertion will fail when the sign API changes to return text types, at
# which point this test should encode the signature to bytes.
self.assertTrue(isinstance(signed['signature'], six.binary_type))
self.assertTrue(isinstance(signed['certificate'], six.binary_type))

# This is a non-ascii character that encodes to a bytestring in latin-1
# that won't decode in UTF-8
signed['signature'] = u'Ö'.encode('latin-1')
signed['certificate'] = u'Ö'.encode('latin-1')

self.assertFalse(self.validate(signed, **self.config))

def test_text_validate(self):
"""Assert unicode-type signatures/certificates work."""
signed = self.sign({'topic': 'mytopic', 'message': 'so secure'}, **self.config)

# This assertion will fail when the sign API changes to return text types, at
# which point this test should drop the decode step.
self.assertTrue(isinstance(signed['signature'], six.binary_type))
self.assertTrue(isinstance(signed['certificate'], six.binary_type))
signed['signature'] = signed['signature'].decode('utf-8')
signed['certificate'] = signed['certificate'].decode('utf-8')

self.assertTrue(self.validate(signed, **self.config))


@skipIf(not _cryptography, "cryptography/pyOpenSSL are missing.")
class X509CryptographyTests(X509BaseTests):
Expand All @@ -173,6 +219,20 @@ def test_old_crl(self):

self.assertFalse(self.validate(signed, **self.config))

@mock.patch('fedmsg.crypto.x509_ng._log')
def test_bytes_logs_error(self, mock_log):
"""Assert calling validate with byte signature/certificate logs an error."""
signed = self.sign({'topic': 'mytopic', 'message': 'so secure'}, **self.config)

# This assertion will fail when the sign API changes to return text types, at
# which point this test should encode the signature to bytes.
self.assertTrue(isinstance(signed['signature'], six.binary_type))
self.assertTrue(isinstance(signed['certificate'], six.binary_type))

self.assertTrue(self.validate(signed, **self.config))
mock_log.error.assert_any_call("msg['signature'] is not a unicode string")
mock_log.error.assert_any_call("msg['certificate'] is not a unicode string")


@skipIf(not _cryptography, "M2Crypto/m2ext are missing.")
class X509M2CryptoTests(X509BaseTests):
Expand All @@ -191,6 +251,20 @@ def test_old_crl(self):

self.assertFalse(self.validate(signed, **self.config))

@mock.patch('fedmsg.crypto.x509._log')
def test_bytes_logs_error(self, mock_log):
"""Assert calling validate with byte signature/certificate logs an error."""
signed = self.sign({'topic': 'mytopic', 'message': 'so secure'}, **self.config)

# This assertion will fail when the sign API changes to return text types, at
# which point this test should encode the signature to bytes.
self.assertTrue(isinstance(signed['signature'], six.binary_type))
self.assertTrue(isinstance(signed['certificate'], six.binary_type))

self.assertTrue(self.validate(signed, **self.config))
mock_log.error.assert_any_call("msg['signature'] is not a unicode string")
mock_log.error.assert_any_call("msg['certificate'] is not a unicode string")


@skipIf(not (_cryptography and _m2crypto), 'M2Crypto and cryptography required.')
class M2CryptoWithCryptoTests(X509BaseTests):
Expand Down

0 comments on commit a8a485d

Please sign in to comment.