Skip to content

Commit

Permalink
Fixed bug (#18) warning message when setting heartbeat using uri
Browse files Browse the repository at this point in the history
  • Loading branch information
eandersson committed Jun 9, 2016
1 parent 49ffb01 commit 287e708
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 14 deletions.
48 changes: 37 additions & 11 deletions amqpstorm/tests/uri_connection_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,16 @@

from amqpstorm import UriConnection

from amqpstorm.tests.utility import MockLoggingHandler

logging.basicConfig(level=logging.DEBUG)


class UriConnectionTests(unittest.TestCase):
def setUp(self):
self.logging_handler = MockLoggingHandler()
logging.root.addHandler(self.logging_handler)

def test_uri_default(self):
connection = \
UriConnection('amqp://guest:guest@localhost:5672/%2F', True)
Expand All @@ -24,53 +30,61 @@ def test_uri_default(self):
self.assertEqual(connection.parameters['heartbeat'], 60)
self.assertEqual(connection.parameters['timeout'], 30)
self.assertFalse(connection.parameters['ssl'])
self.assertFalse(self.logging_handler.messages['warning'])

def test_uri_set_hostname(self):
connection = \
UriConnection('amqp://guest:guest@my-server:5672/%2F?'
UriConnection('amqps://guest:guest@my-server:5672/%2F?'
'heartbeat=1337', True)
self.assertIsInstance(connection.parameters['hostname'], str)
self.assertEqual(connection.parameters['hostname'], 'my-server')
self.assertFalse(self.logging_handler.messages['warning'])

def test_uri_set_username(self):
connection = \
UriConnection('amqp://username:guest@localhost:5672/%2F?'
UriConnection('amqps://username:guest@localhost:5672/%2F?'
'heartbeat=1337', True)
self.assertIsInstance(connection.parameters['username'], str)
self.assertEqual(connection.parameters['username'], 'username')
self.assertFalse(self.logging_handler.messages['warning'])

def test_uri_set_password(self):
connection = \
UriConnection('amqp://guest:password@localhost:5672/%2F?'
UriConnection('amqps://guest:password@localhost:5672/%2F?'
'heartbeat=1337', True)
self.assertIsInstance(connection.parameters['password'], str)
self.assertEqual(connection.parameters['password'], 'password')
self.assertFalse(self.logging_handler.messages['warning'])

def test_uri_set_port(self):
connection = \
UriConnection('amqp://guest:guest@localhost:1337/%2F', True)
UriConnection('amqps://guest:guest@localhost:1337/%2F', True)
self.assertIsInstance(connection.parameters['port'], int)
self.assertEqual(connection.parameters['port'], 1337)
self.assertFalse(self.logging_handler.messages['warning'])

def test_uri_set_heartbeat(self):
connection = \
UriConnection('amqp://guest:guest@localhost:5672/%2F?'
UriConnection('amqps://guest:guest@localhost:5672/%2F?'
'heartbeat=1337', True)
self.assertIsInstance(connection.parameters['heartbeat'], int)
self.assertEqual(connection.parameters['heartbeat'], 1337)
self.assertFalse(self.logging_handler.messages['warning'])

def test_uri_set_timeout(self):
connection = \
UriConnection('amqp://guest:guest@localhost:5672/%2F?'
UriConnection('amqps://guest:guest@localhost:5672/%2F?'
'timeout=1337', True)
self.assertIsInstance(connection.parameters['timeout'], int)
self.assertEqual(connection.parameters['timeout'], 1337)
self.assertFalse(self.logging_handler.messages['warning'])

def test_uri_set_virtual_host(self):
connection = \
UriConnection('amqp://guest:guest@localhost:5672/travis', True)
UriConnection('amqps://guest:guest@localhost:5672/travis', True)
self.assertIsInstance(connection.parameters['virtual_host'], str)
self.assertEqual(connection.parameters['virtual_host'], 'travis')
self.assertFalse(self.logging_handler.messages['warning'])

def test_uri_set_ssl(self):
connection = UriConnection('amqps://guest:guest@localhost:5671/%2F?'
Expand All @@ -90,34 +104,43 @@ def test_uri_set_ssl(self):
'file.crt')
self.assertEqual(connection.parameters['ssl_options']['ca_certs'],
'test')
self.assertFalse(self.logging_handler.messages['warning'])

def test_uri_get_ssl_version(self):
connection = \
UriConnection('amqp://guest:guest@localhost:5672/%2F', True)
self.assertEqual(ssl.PROTOCOL_TLSv1,
connection._get_ssl_version('protocol_tlsv1'))
self.assertFalse(self.logging_handler.messages['warning'])

def test_uri_get_invalid_ssl_version(self):
connection = \
UriConnection('amqp://guest:guest@localhost:5672/%2F', True)
self.assertEqual(connection._get_ssl_version('protocol_test'),
ssl.PROTOCOL_TLSv1)
self.assertIn("ssl_options: ssl_version 'protocol_test' not found "
"falling back to PROTOCOL_TLSv1.",
self.logging_handler.messages['warning'][0])

def test_uri_get_ssl_validation(self):
connection = \
UriConnection('amqp://guest:guest@localhost:5672/%2F', True)
UriConnection('amqps://guest:guest@localhost:5672/%2F', True)
self.assertEqual(ssl.CERT_REQUIRED,
connection._get_ssl_validation('cert_required'))
self.assertFalse(self.logging_handler.messages['warning'])

def test_uri_get_invalid_ssl_validation(self):
connection = \
UriConnection('amqp://guest:guest@localhost:5672/%2F', True)
UriConnection('amqps://guest:guest@localhost:5672/%2F', True)
self.assertEqual(ssl.CERT_NONE,
connection._get_ssl_validation('cert_test'))
self.assertIn("ssl_options: cert_reqs 'cert_test' not found "
"falling back to CERT_NONE.",
self.logging_handler.messages['warning'][0])

def test_uri_get_ssl_options(self):
connection = \
UriConnection('amqp://guest:guest@localhost:5672/%2F', True)
UriConnection('amqps://guest:guest@localhost:5672/%2F', True)
ssl_kwargs = {
'cert_reqs': ['cert_required'],
'ssl_version': ['protocol_tlsv1'],
Expand All @@ -129,15 +152,18 @@ def test_uri_get_ssl_options(self):
self.assertEqual(ssl_options['ssl_version'], ssl.PROTOCOL_TLSv1)
self.assertEqual(ssl_options['keyfile'], 'file.key')
self.assertEqual(ssl_options['certfile'], 'file.crt')
self.assertFalse(self.logging_handler.messages['warning'])

def test_uri_invalid_ssl_options(self):
connection = \
UriConnection('amqp://guest:guest@localhost:5672/%2F', True)
UriConnection('amqps://guest:guest@localhost:5672/%2F', True)
ssl_kwargs = {
'unit_test': ['not_required'],
}
ssl_options = connection._parse_ssl_options(ssl_kwargs)
self.assertFalse(ssl_options)
self.assertIn("invalid option: unit_test",
self.logging_handler.messages['warning'][0])


class UriConnectionExceptionTests(unittest.TestCase):
Expand Down
6 changes: 3 additions & 3 deletions amqpstorm/uri_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ def _parse_uri_options(self, parsed_uri, use_ssl, lazy):
options = {
'ssl': use_ssl,
'virtual_host': urlparse.unquote(parsed_uri.path[1:]) or '/',
'heartbeat': int(kwargs.get('heartbeat', [60])[0]),
'timeout': int(kwargs.get('timeout', [30])[0]),
'heartbeat': int(kwargs.pop('heartbeat', [60])[0]),
'timeout': int(kwargs.pop('timeout', [30])[0]),
'lazy': lazy
}
if compatibility.SSL_SUPPORTED and use_ssl:
Expand All @@ -67,7 +67,7 @@ def _parse_ssl_options(self, ssl_kwargs):
ssl_options = {}
for key in ssl_kwargs:
if key not in compatibility.SSL_OPTIONS:
LOGGER.warning('invalid ssl option: %s', key)
LOGGER.warning('invalid option: %s', key)
continue
if 'ssl_version' in key:
value = self._get_ssl_version(ssl_kwargs[key][0])
Expand Down

0 comments on commit 287e708

Please sign in to comment.