From c9fb155d89ab1793be16a78c9f641e1e47f76aa6 Mon Sep 17 00:00:00 2001 From: "barryp@macbook.home" Date: Thu, 15 Jan 2009 11:16:09 -0600 Subject: [PATCH] A little pylint cleanup, add a unittest for sending large messages (with bodies of about 1.6K, 16K, and 160K bytes). --- amqplib/client_0_8/transport.py | 54 ++++++++++++++++++++++++++++---- tests/client_0_8/test_channel.py | 19 +++++++++++ 2 files changed, 67 insertions(+), 6 deletions(-) diff --git a/amqplib/client_0_8/transport.py b/amqplib/client_0_8/transport.py index 114455c..7e0a9e4 100644 --- a/amqplib/client_0_8/transport.py +++ b/amqplib/client_0_8/transport.py @@ -56,6 +56,31 @@ def __del__(self): self.close() + def _read(self, n): + """ + Read exactly n bytes from the peer + + """ + raise NotImplementedError('Must be overriden in subclass') + + + def _setup_transport(self): + """ + Do any additional initialization of the class (used + by the subclasses). + + """ + pass + + + def _write(self, s): + """ + Completely write a string to the peer. + + """ + raise NotImplementedError('Must be overriden in subclass') + + def close(self): if self.sock is not None: self.sock.close() @@ -87,7 +112,16 @@ def write_frame(self, frame_type, channel, payload): class SSLTransport(_AbstractTransport): + """ + Transport that works over SSL + + """ def _setup_transport(self): + """ + Wrap the socket in an sslobj, and use that + directly for _read() and _write(). + + """ self.sslobj = socket.ssl(self.sock) self._read = self.sslobj.read @@ -95,13 +129,18 @@ def _setup_transport(self): class TCPTransport(_AbstractTransport): + """ + Transport that deals directly with TCP socket. + + """ def _setup_transport(self): - self._read_buffer = '' + """ + Setup to _write() directly to the socket, and + do our own buffered reads. - # - # Make the _write() function a direct call to socket.sendall - # + """ self._write = self.sock.sendall + self._read_buffer = '' def _read(self, n): @@ -110,6 +149,9 @@ def _read(self, n): """ while len(self._read_buffer) < n: - self._read_buffer += self.sock.recv(4096) - result, self._read_buffer = self._read_buffer[:n], self._read_buffer[n:] + self._read_buffer += self.sock.recv(65536) + + result = self._read_buffer[:n] + self._read_buffer = self._read_buffer[n:] + return result diff --git a/tests/client_0_8/test_channel.py b/tests/client_0_8/test_channel.py index 79463dd..5c1401f 100755 --- a/tests/client_0_8/test_channel.py +++ b/tests/client_0_8/test_channel.py @@ -169,6 +169,25 @@ def test_exception(self): """ self.assertRaises(AMQPChannelException, self.ch.queue_delete, 'bogus_queue_that_does_not_exist') + def test_large(self): + """ + Test sending some extra large messages. + + """ + self.ch.access_request('/data', active=True, write=True, read=True) + + qname, _, _ = self.ch.queue_declare() + + for multiplier in [100, 1000, 10000]: + msg = Message('unittest message' * multiplier, + content_type='text/plain', + application_headers={'foo': 7, 'bar': 'baz'}) + + self.ch.basic_publish(msg, routing_key=qname) + + msg2 = self.ch.basic_get(no_ack=True) + self.assertEqual(msg, msg2) + def test_publish(self): tkt = self.ch.access_request('/data', active=True, write=True)