Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds support, tests, and docs for login_method with Qpid transport. #502

Merged
merged 1 commit into from Aug 4, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
49 changes: 36 additions & 13 deletions kombu/tests/transport/test_qpid.py
Expand Up @@ -1693,8 +1693,9 @@ class MockClient(object):
self.client.connect_timeout = 4
self.client.ssl = False
self.client.transport_options = {}
self.client.userid = ''
self.client.password = ''
self.client.userid = None
self.client.password = None
self.client.login_method = None
self.transport = Transport(self.client)
self.mock_conn = Mock()
self.transport.Connection = self.mock_conn
Expand All @@ -1706,11 +1707,12 @@ def tearDown(self):
self.patcher.stop()

def test_transport_establish_conn_new_option_overwrites_default(self):
new_userid_string = 'new-userid'
self.client.userid = new_userid_string
self.client.userid = 'new-userid'
self.client.password = 'new-password'
self.transport.establish_connection()
self.mock_conn.assert_called_once_with(
username=new_userid_string,
username=self.client.userid,
password=self.client.password,
sasl_mechanisms='PLAIN',
host='127.0.0.1',
timeout=4,
Expand Down Expand Up @@ -1752,10 +1754,38 @@ def test_transport_establish_conn_transform_localhost_to_127_0_0_1(self):
transport='tcp',
)

def test_transport_password_no_username_raises_exception(self):
def test_transport_password_no_userid_raises_exception(self):
self.client.password = 'somepass'
self.assertRaises(Exception, self.transport.establish_connection)

def test_transport_userid_no_password_raises_exception(self):
self.client.userid = 'someusername'
self.assertRaises(Exception, self.transport.establish_connection)

def test_transport_overrides_sasl_mech_from_login_method(self):
self.client.login_method = 'EXTERNAL'
self.transport.establish_connection()
self.mock_conn.assert_called_once_with(
sasl_mechanisms='EXTERNAL',
host='127.0.0.1',
timeout=4,
port=5672,
transport='tcp',
)

def test_transport_overrides_sasl_mech_has_username(self):
self.client.userid = 'new-userid'
self.client.login_method = 'EXTERNAL'
self.transport.establish_connection()
self.mock_conn.assert_called_once_with(
username=self.client.userid,
sasl_mechanisms='EXTERNAL',
host='127.0.0.1',
timeout=4,
port=5672,
transport='tcp',
)

def test_transport_establish_conn_set_password(self):
self.client.userid = 'someuser'
self.client.password = 'somepass'
Expand Down Expand Up @@ -1867,9 +1897,6 @@ class TestTransportClassAttributes(Case):
def test_verify_Connection_attribute(self):
self.assertIs(Connection, Transport.Connection)

def test_verify_default_port(self):
self.assertEqual(5672, Transport.default_port)

def test_verify_polling_disabled(self):
self.assertIsNone(Transport.polling_interval)

Expand Down Expand Up @@ -2016,11 +2043,7 @@ def test_default_connection_params(self):
"""Test that the default_connection_params are correct"""
correct_params = {
'hostname': 'localhost',
'password': None,
'port': 5672,
'sasl_mechanisms': 'ANONYMOUS',
'userid': None,
'virtual_host': '',
}
my_transport = Transport(self.mock_client)
result_params = my_transport.default_connection_params
Expand Down
115 changes: 87 additions & 28 deletions kombu/transport/qpid.py
Expand Up @@ -27,6 +27,55 @@
This transport should be used with caution due to a known
potential deadlock. See `Issue 2199`_ for more details.

Authentication
==============

This transport supports SASL authentication with the Qpid broker. Normally,
SASL mechanisms are negotiated from a client list and a server list of
possible mechanisms, but in practice, different SASL client libraries give
different behaviors. These different behaviors cause the expected SASL
mechanism to not be selected in many cases. As such, this transport restricts
the mechanism types based on Kombu's configuration according to the following
table.

+------------------------------------+--------------------+
| **Broker String** | **SASL Mechanism** |
+------------------------------------+--------------------+
| qpid://hostname/ | ANONYMOUS |
+------------------------------------+--------------------+
| qpid://username:password@hostname/ | PLAIN |
+------------------------------------+--------------------+
| see instructions below | EXTERNAL |
+------------------------------------+--------------------+

The user can override the above SASL selection behaviors and specify the SASL
string using the :attr:`~kombu.Connection.login_method` argument to the
:class:`~kombu.Connection` object. The string can be a single SASL mechanism
or a space separated list of SASL mechanisms. If you are using Celery with
Kombu, this can be accomplished by setting the *BROKER_LOGIN_METHOD* Celery
option.

.. note::

While using SSL, Qpid users may want to override the SASL mechanism to
use *EXTERNAL*. In that case, Qpid requires a username to be presented
that matches the *CN* of the SSL client certificate. Ensure that the
broker string contains the corresponding username. For example, if the
client certificate has *CN=asdf* and the client connects to *example.com*
on port 5671, the broker string should be:

**qpid://asdf@example.com:5671/**

Transport Options
=================

The :attr:`~kombu.Connection.transport_options` argument to the
:class:`~kombu.Connection` object are passed directly to the
:class:`qpid.messaging.endpoints.Connection` as keyword arguments. These
options override and replace any other default or specified values. If using
Celery with Kombu, this can be accomplished by setting the
*BROKER_TRANSPORT_OPTIONS* Celery option.

"""
from __future__ import absolute_import

Expand Down Expand Up @@ -77,8 +126,6 @@
logger = get_logger(__name__)


DEFAULT_PORT = 5672

OBJECT_ALREADY_EXISTS_STRING = 'object already exists'

VERSION = (1, 0, 0)
Expand Down Expand Up @@ -655,13 +702,14 @@ def exchange_declare(self, exchange='', type='direct', durable=False,
functionality.

:keyword type: The exchange type. Valid values include 'direct',
'topic', and 'fanout'.
'topic', and 'fanout'.
:type type: str
:keyword exchange: The name of the exchange to be created. If no
exchange is specified, then a blank string will be used as the name.
exchange is specified, then a blank string will be used as the
name.
:type exchange: str
:keyword durable: True if the exchange should be durable, or False
otherwise.
otherwise.
:type durable: bool
"""
options = {'durable': durable}
Expand Down Expand Up @@ -1211,8 +1259,10 @@ def __init__(self, **connection_options):
establish = qpid.messaging.Connection.establish

# There are several inconsistent behaviors in the sasl libraries
# used on different systems. This implementation uses only
# advertises one type to the server either ANONYMOUS or PLAIN.
# used on different systems. Although qpid.messaging allows
# multiple space separated sasl mechanisms, this implementation
# only advertises one type to the server. These are either
# ANONYMOUS, PLAIN, or an overridden value specified by the user.

sasl_mech = connection_options['sasl_mechanisms']

Expand Down Expand Up @@ -1246,7 +1296,6 @@ def __init__(self, **connection_options):
raise AuthenticationFailure(sys.exc_info()[1])
raise


def get_qpid_connection(self):
"""Return the existing connection (singleton).

Expand Down Expand Up @@ -1393,9 +1442,6 @@ class Transport(base.Transport):
# Reference to the class that should be used as the Connection object
Connection = Connection

# The default port
default_port = DEFAULT_PORT

# This Transport does not specify a polling interval.
polling_interval = None

Expand Down Expand Up @@ -1602,22 +1648,39 @@ def establish_connection(self):
conninfo.transport_options['ssl_skip_hostname_check'] = True
else:
conninfo.qpid_transport = 'tcp'
opts = dict({

credentials = {}
if conninfo.login_method is None:
if conninfo.userid is not None and conninfo.password is not None:
sasl_mech = 'PLAIN'
credentials['username'] = conninfo.userid
credentials['password'] = conninfo.password
elif conninfo.userid is None and conninfo.password is not None:
raise Exception(
'Password configured but no username. SASL PLAIN '
'requires a username when using a password.')
elif conninfo.userid is not None and conninfo.password is None:
raise Exception(
'Username configured but no password. SASL PLAIN '
'requires a password when using a username.')
else:
sasl_mech = 'ANONYMOUS'
else:
sasl_mech = conninfo.login_method
if conninfo.userid is not None:
credentials['username'] = conninfo.userid

opts = {
'host': conninfo.hostname,
'port': conninfo.port,
'sasl_mechanisms': conninfo.sasl_mechanisms,
'sasl_mechanisms': sasl_mech,
'timeout': conninfo.connect_timeout,
'transport': conninfo.qpid_transport
}, **conninfo.transport_options or {})
if conninfo.userid is not None:
opts['username'] = conninfo.userid
opts['sasl_mechanisms'] = 'PLAIN'
elif conninfo.password is not None:
raise Exception(
'Password configured but no username. A username is '
'required when using a password.')
if conninfo.password is not None:
opts['password'] = conninfo.password
}

opts.update(credentials)
opts.update(conninfo.transport_options)

conn = self.Connection(**opts)
conn.client = self.client
self.session = conn.get_qpid_connection().session()
Expand Down Expand Up @@ -1699,11 +1762,7 @@ def default_connection_params(self):
"""
return {
'hostname': 'localhost',
'password': None,
'port': self.default_port,
'sasl_mechanisms': 'ANONYMOUS',
'userid': None,
'virtual_host': ''
'port': 5672,
}

def __del__(self):
Expand Down