Skip to content

Commit

Permalink
A little pylint cleanup, add a unittest for sending large messages
Browse files Browse the repository at this point in the history
(with bodies of about 1.6K, 16K, and 160K bytes).
  • Loading branch information
barryp@macbook.home committed Jan 15, 2009
1 parent 1c91a0d commit c9fb155
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 6 deletions.
54 changes: 48 additions & 6 deletions amqplib/client_0_8/transport.py
Expand Up @@ -56,6 +56,31 @@ def __del__(self):
self.close()


def _read(self, n):
"""
Read exactly n bytes from the peer
"""
raise NotImplementedError('Must be overriden in subclass')


def _setup_transport(self):
"""
Do any additional initialization of the class (used
by the subclasses).
"""
pass


def _write(self, s):
"""
Completely write a string to the peer.
"""
raise NotImplementedError('Must be overriden in subclass')


def close(self):
if self.sock is not None:
self.sock.close()
Expand Down Expand Up @@ -87,21 +112,35 @@ def write_frame(self, frame_type, channel, payload):


class SSLTransport(_AbstractTransport):
"""
Transport that works over SSL
"""
def _setup_transport(self):
"""
Wrap the socket in an sslobj, and use that
directly for _read() and _write().
"""
self.sslobj = socket.ssl(self.sock)

self._read = self.sslobj.read
self._write = self.sslobj.write


class TCPTransport(_AbstractTransport):
"""
Transport that deals directly with TCP socket.
"""
def _setup_transport(self):
self._read_buffer = ''
"""
Setup to _write() directly to the socket, and
do our own buffered reads.
#
# Make the _write() function a direct call to socket.sendall
#
"""
self._write = self.sock.sendall
self._read_buffer = ''


def _read(self, n):
Expand All @@ -110,6 +149,9 @@ def _read(self, n):
"""
while len(self._read_buffer) < n:
self._read_buffer += self.sock.recv(4096)
result, self._read_buffer = self._read_buffer[:n], self._read_buffer[n:]
self._read_buffer += self.sock.recv(65536)

result = self._read_buffer[:n]
self._read_buffer = self._read_buffer[n:]

return result
19 changes: 19 additions & 0 deletions tests/client_0_8/test_channel.py
Expand Up @@ -169,6 +169,25 @@ def test_exception(self):
"""
self.assertRaises(AMQPChannelException, self.ch.queue_delete, 'bogus_queue_that_does_not_exist')

def test_large(self):
"""
Test sending some extra large messages.
"""
self.ch.access_request('/data', active=True, write=True, read=True)

qname, _, _ = self.ch.queue_declare()

for multiplier in [100, 1000, 10000]:
msg = Message('unittest message' * multiplier,
content_type='text/plain',
application_headers={'foo': 7, 'bar': 'baz'})

self.ch.basic_publish(msg, routing_key=qname)

msg2 = self.ch.basic_get(no_ack=True)
self.assertEqual(msg, msg2)


def test_publish(self):
tkt = self.ch.access_request('/data', active=True, write=True)
Expand Down

0 comments on commit c9fb155

Please sign in to comment.