Skip to content

Commit

Permalink
Fixes async read of large message payloads. Fix for celery/celery#922
Browse files Browse the repository at this point in the history
  • Loading branch information
ask committed Aug 29, 2012
1 parent cf06eba commit e195575
Showing 1 changed file with 18 additions and 7 deletions.
25 changes: 18 additions & 7 deletions amqp/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
from __future__ import absolute_import

import errno
import re
import socket

Expand Down Expand Up @@ -103,7 +104,7 @@ def __del__(self):
finally:
self.sock = None

def _read(self, n):
def _read(self, n, initial=False):
"""Read exactly n bytes from the peer"""
raise NotImplementedError('Must be overriden in subclass')

Expand Down Expand Up @@ -132,7 +133,7 @@ def close(self):

def read_frame(self):
"""Read an AMQP frame."""
frame_type, channel, size = unpack('>BHI', self._read(7))
frame_type, channel, size = unpack('>BHI', self._read(7, True))
payload = self._read(size)
ch = ord(self._read(1))
if ch == 206: # '\xce'
Expand Down Expand Up @@ -176,15 +177,20 @@ def _shutdown_transport(self):
self.sock = self.sslobj.unwrap()
self.sslobj = None

def _read(self, n):
def _read(self, n, initial=False):
"""It seems that SSL Objects read() method may not supply as much
as you're asking for, at least with extremely large messages.
somewhere > 16K - found this in the test_channel.py test_large
unittest."""
result = self.sslobj.read(n)
result = ''

while len(result) < n:
s = self.sslobj.read(n - len(result))
try:
s = self.sslobj.read(n - len(result))
except socket.error, exc:
if not initial and exc.errno in (errno.EAGAIN, errno.EINTR):
continue
raise
if not s:
raise IOError('Socket closed')
result += s
Expand All @@ -209,10 +215,15 @@ def _setup_transport(self):
self._write = self.sock.sendall
self._read_buffer = bytes()

def _read(self, n):
def _read(self, n, initial=False):
"""Read exactly n bytes from the socket"""
while len(self._read_buffer) < n:
s = self.sock.recv(65536)
try:
s = self.sock.recv(65536)
except socket.error, exc:
if not initial and exc.errno in (errno.EAGAIN, errno.EINTR):
continue
raise
if not s:
raise IOError('Socket closed')
self._read_buffer += s
Expand Down

0 comments on commit e195575

Please sign in to comment.