|
|
@@ -3,13 +3,15 @@ |
|
|
import pytest
|
|
|
import socket
|
|
|
|
|
|
import warnings
|
|
|
from case import ContextMock, Mock, call
|
|
|
|
|
|
from amqp import Connection
|
|
|
from amqp import spec
|
|
|
from amqp.connection import SSLError
|
|
|
from amqp.exceptions import ConnectionError, NotFound, ResourceError
|
|
|
from amqp.five import items
|
|
|
from amqp.sasl import SASL, AMQPLAIN, PLAIN
|
|
|
from amqp.transport import TCPTransport
|
|
|
|
|
|
|
|
|
@@ -22,16 +24,35 @@ def setup_conn(self): |
|
|
self.conn = Connection(
|
|
|
frame_handler=self.frame_handler,
|
|
|
frame_writer=self.frame_writer,
|
|
|
authentication=AMQPLAIN('foo', 'bar'),
|
|
|
)
|
|
|
self.conn.Channel = Mock(name='Channel')
|
|
|
self.conn.Transport = Mock(name='Transport')
|
|
|
self.conn.transport = self.conn.Transport.return_value
|
|
|
self.conn.send_method = Mock(name='send_method')
|
|
|
self.conn.frame_writer = Mock(name='frame_writer')
|
|
|
|
|
|
def test_login_response(self):
|
|
|
self.conn = Connection(login_response='foo')
|
|
|
assert self.conn.login_response == 'foo'
|
|
|
def test_sasl_authentication(self):
|
|
|
authentication = SASL()
|
|
|
self.conn = Connection(authentication=authentication)
|
|
|
assert self.conn.authentication == (authentication,)
|
|
|
|
|
|
def test_sasl_authentication_iterable(self):
|
|
|
authentication = SASL()
|
|
|
self.conn = Connection(authentication=(authentication,))
|
|
|
assert self.conn.authentication == (authentication,)
|
|
|
|
|
|
def test_amqplain(self):
|
|
|
self.conn = Connection(userid='foo', password='bar')
|
|
|
assert isinstance(self.conn.authentication[1], AMQPLAIN)
|
|
|
assert self.conn.authentication[1].username == 'foo'
|
|
|
assert self.conn.authentication[1].password == 'bar'
|
|
|
|
|
|
def test_plain(self):
|
|
|
self.conn = Connection(userid='foo', password='bar')
|
|
|
assert isinstance(self.conn.authentication[2], PLAIN)
|
|
|
assert self.conn.authentication[2].username == 'foo'
|
|
|
assert self.conn.authentication[2].password == 'bar'
|
|
|
|
|
|
def test_enter_exit(self):
|
|
|
self.conn.connect = Mock(name='connect')
|
|
|
@@ -68,47 +89,80 @@ def test_connect__already_connected(self): |
|
|
callback.assert_called_with()
|
|
|
|
|
|
def test_on_start(self):
|
|
|
self.conn._on_start(3, 4, {'foo': 'bar'}, 'x y z', 'en_US en_GB')
|
|
|
self.conn._on_start(3, 4, {'foo': 'bar'}, b'x y z AMQPLAIN PLAIN',
|
|
|
'en_US en_GB')
|
|
|
assert self.conn.version_major == 3
|
|
|
assert self.conn.version_minor == 4
|
|
|
assert self.conn.server_properties == {'foo': 'bar'}
|
|
|
assert self.conn.mechanisms == ['x', 'y', 'z']
|
|
|
assert self.conn.mechanisms == [b'x', b'y', b'z',
|
|
|
b'AMQPLAIN', b'PLAIN']
|
|
|
assert self.conn.locales == ['en_US', 'en_GB']
|
|
|
self.conn.send_method.assert_called_with(
|
|
|
spec.Connection.StartOk, 'FsSs', (
|
|
|
self.conn.client_properties, self.conn.login_method,
|
|
|
self.conn.login_response, self.conn.locale,
|
|
|
self.conn.client_properties, b'AMQPLAIN',
|
|
|
self.conn.authentication[0].start(self.conn), self.conn.locale,
|
|
|
),
|
|
|
)
|
|
|
|
|
|
def test_missing_credentials(self):
|
|
|
with pytest.raises(ValueError):
|
|
|
self.conn = Connection(userid=None, password=None)
|
|
|
with pytest.raises(ValueError):
|
|
|
self.conn = Connection(password=None)
|
|
|
|
|
|
def test_mechanism_mismatch(self):
|
|
|
with pytest.raises(ConnectionError):
|
|
|
self.conn._on_start(3, 4, {'foo': 'bar'}, b'x y z',
|
|
|
'en_US en_GB')
|
|
|
|
|
|
def test_login_method_response(self):
|
|
|
# An old way of doing things.:
|
|
|
login_method, login_response = b'foo', b'bar'
|
|
|
with warnings.catch_warnings(record=True) as w:
|
|
|
warnings.simplefilter("always")
|
|
|
self.conn = Connection(login_method=login_method,
|
|
|
login_response=login_response)
|
|
|
self.conn.send_method = Mock(name='send_method')
|
|
|
self.conn._on_start(3, 4, {'foo': 'bar'}, login_method,
|
|
|
'en_US en_GB')
|
|
|
assert len(w) == 1
|
|
|
assert issubclass(w[0].category, DeprecationWarning)
|
|
|
|
|
|
self.conn.send_method.assert_called_with(
|
|
|
spec.Connection.StartOk, 'FsSs', (
|
|
|
self.conn.client_properties, login_method,
|
|
|
login_response, self.conn.locale,
|
|
|
),
|
|
|
)
|
|
|
|
|
|
def test_on_start__consumer_cancel_notify(self):
|
|
|
self.conn._on_start(
|
|
|
3, 4, {'capabilities': {'consumer_cancel_notify': 1}},
|
|
|
'', '',
|
|
|
b'AMQPLAIN', '',
|
|
|
)
|
|
|
cap = self.conn.client_properties['capabilities']
|
|
|
assert cap['consumer_cancel_notify']
|
|
|
|
|
|
def test_on_start__connection_blocked(self):
|
|
|
self.conn._on_start(
|
|
|
3, 4, {'capabilities': {'connection.blocked': 1}},
|
|
|
'', '',
|
|
|
b'AMQPLAIN', '',
|
|
|
)
|
|
|
cap = self.conn.client_properties['capabilities']
|
|
|
assert cap['connection.blocked']
|
|
|
|
|
|
def test_on_start__authentication_failure_close(self):
|
|
|
self.conn._on_start(
|
|
|
3, 4, {'capabilities': {'authentication_failure_close': 1}},
|
|
|
'', '',
|
|
|
b'AMQPLAIN', '',
|
|
|
)
|
|
|
cap = self.conn.client_properties['capabilities']
|
|
|
assert cap['authentication_failure_close']
|
|
|
|
|
|
def test_on_start__authentication_failure_close__disabled(self):
|
|
|
self.conn._on_start(
|
|
|
3, 4, {'capabilities': {}},
|
|
|
'', '',
|
|
|
b'AMQPLAIN', '',
|
|
|
)
|
|
|
assert 'capabilities' not in self.conn.client_properties
|
|
|
|
|
|
|