Skip to content

Commit

Permalink
Fix second simultaneous read (parallel paramiko issue)
Browse files Browse the repository at this point in the history
#94

Because of the way paramiko utilises a client thread to manage its
communication, it's not been compatible with eventlet when run in
parallel.

It's not the only place these problems would arise.

This stemmed from the reuse of a fileno by the underlying OS.
Because listeners are registered against this descriptor, it would
be possible for old listeners to receive events destined for newer
descriptors; occasionally code would attempt to utilise the new
descriptor from a different greenlet, giving rise to the 'second
simultaneous read' problem.

Whenever a Python object is created to wrap one of these filenos,
we now signal the hub in order that it can correctly obsolete
extant listeners against that fileno. This is a fairly tricky
operation, due to the way that listeners' threads are interleaved
with the hub's operation - there are a number of small fixes here
to defend against one listener from effectively obsoleting another
when an event is pending against it.
  • Loading branch information
Jan Grant authored and temoto committed Aug 11, 2014
1 parent 2f49f86 commit da87716
Show file tree
Hide file tree
Showing 11 changed files with 362 additions and 62 deletions.
43 changes: 43 additions & 0 deletions eventlet/green/builtin.py
@@ -0,0 +1,43 @@
"""
In order to detect a filehandle that's been closed, our only clue may be
the operating system returning the same filehandle in response to some
other operation.
The builtins 'file' and 'open' are patched to collaborate with the
notify_opened protocol.
"""

builtins_orig = __builtins__

from eventlet import hubs
from eventlet.hubs import hub
from eventlet.patcher import slurp_properties
import sys

__all__ = dir(builtins_orig)
__patched__ = ['file', 'open']

slurp_properties(builtins_orig, globals(),
ignore=__patched__, srckeys=dir(builtins_orig))

hubs.get_hub()

__original_file = file
class file(__original_file):
def __init__(self, *args, **kwargs):
super(file, self).__init__(*args, **kwargs)
hubs.notify_opened(self.fileno())

__original_open = open
__opening = False
def open(*args):
global __opening
result = __original_open(*args)
if not __opening:
# This is incredibly ugly. 'open' is used under the hood by
# the import process. So, ensure we don't wind up in an
# infinite loop.
__opening = True
hubs.notify_opened(result.fileno())
__opening = False
return result
17 changes: 14 additions & 3 deletions eventlet/green/os.py
Expand Up @@ -9,7 +9,7 @@
from eventlet.patcher import slurp_properties

__all__ = os_orig.__all__
__patched__ = ['fdopen', 'read', 'write', 'wait', 'waitpid']
__patched__ = ['fdopen', 'read', 'write', 'wait', 'waitpid', 'open']

slurp_properties(os_orig, globals(),
ignore=__patched__, srckeys=dir(os_orig))
Expand Down Expand Up @@ -40,7 +40,10 @@ def read(fd, n):
if get_errno(e) == errno.EPIPE:
return ''
raise
hubs.trampoline(fd, read=True)
try:
hubs.trampoline(fd, read=True)
except hubs.IOClosed:
return ''

__original_write__ = os_orig.write
def write(fd, st):
Expand Down Expand Up @@ -81,4 +84,12 @@ def waitpid(pid, options):
return rpid, status
greenthread.sleep(0.01)

# TODO: open
__original_open__ = os_orig.open
def open(file, flags, mode=0777):
""" Wrap os.open
This behaves identically, but collaborates with
the hub's notify_opened protocol.
"""
fd = __original_open__(file, flags, mode)
hubs.notify_opened(fd)
return fd
16 changes: 11 additions & 5 deletions eventlet/green/ssl.py
Expand Up @@ -8,7 +8,7 @@
time = __import__('time')

from eventlet.support import get_errno
from eventlet.hubs import trampoline
from eventlet.hubs import trampoline, IOClosed
from eventlet.greenio import set_nonblocking, GreenSocket, SOCKET_CLOSED, CONNECT_ERR, CONNECT_SUCCESS
orig_socket = __import__('socket')
socket = orig_socket.socket
Expand Down Expand Up @@ -98,8 +98,11 @@ def write(self, data):
def read(self, len=1024):
"""Read up to LEN bytes and return them.
Return zero-length string on EOF."""
return self._call_trampolining(
super(GreenSSLSocket, self).read, len)
try:
return self._call_trampolining(
super(GreenSSLSocket, self).read, len)
except IOClosed:
return ''

def send (self, data, flags=0):
if self._sslobj:
Expand Down Expand Up @@ -164,8 +167,11 @@ def recv(self, buflen=1024, flags=0):
if self.act_non_blocking:
raise
if get_errno(e) == errno.EWOULDBLOCK:
trampoline(self, read=True,
timeout=self.gettimeout(), timeout_exc=timeout_exc('timed out'))
try:
trampoline(self, read=True,
timeout=self.gettimeout(), timeout_exc=timeout_exc('timed out'))
except IOClosed:
return ''
if get_errno(e) in SOCKET_CLOSED:
return ''
raise
Expand Down
129 changes: 110 additions & 19 deletions eventlet/greenio.py
Expand Up @@ -7,7 +7,7 @@
import warnings

from eventlet.support import get_errno, six
from eventlet.hubs import trampoline
from eventlet.hubs import trampoline, notify_close, notify_opened, IOClosed

__all__ = ['GreenSocket', 'GreenPipe', 'shutdown_safe']

Expand Down Expand Up @@ -122,6 +122,8 @@ def __init__(self, family_or_realsock=socket.AF_INET, *args, **kwargs):
should_set_nonblocking = kwargs.pop('set_nonblocking', True)
if isinstance(family_or_realsock, six.integer_types):
fd = _original_socket(family_or_realsock, *args, **kwargs)
# Notify the hub that this is a newly-opened socket.
notify_opened(fd.fileno())
else:
fd = family_or_realsock

Expand Down Expand Up @@ -151,6 +153,7 @@ def __init__(self, family_or_realsock=socket.AF_INET, *args, **kwargs):
self.listen = fd.listen
self.setsockopt = fd.setsockopt
self.shutdown = fd.shutdown
self._closed = False

@property
def _sock(self):
Expand All @@ -166,6 +169,25 @@ def __getattr__(self, name):
setattr(self, name, attr)
return attr

def _trampoline(self, fd, read=False, write=False, timeout=None, timeout_exc=None):
""" We need to trampoline via the event hub.
We catch any signal back from the hub indicating that the operation we
were waiting on was associated with a filehandle that's since been
invalidated.
"""
if self._closed:
# If we did any logging, alerting to a second trampoline attempt on a closed
# socket here would be useful.
raise IOClosed()
try:
return trampoline(fd, read=True, timeout=self.gettimeout(),
timeout_exc=socket.timeout("timed out"),
mark_as_closed=self._mark_as_closed)
except IOClosed:
# This socket's been obsoleted. De-fang it.
self._mark_as_closed()
raise

def accept(self):
if self.act_non_blocking:
return self.fd.accept()
Expand All @@ -176,16 +198,37 @@ def accept(self):
client, addr = res
set_nonblocking(client)
return type(self)(client), addr
trampoline(fd, read=True, timeout=self.gettimeout(),
self._trampoline(fd, read=True, timeout=self.gettimeout(),
timeout_exc=socket.timeout("timed out"))

def _defanged_close(self):
# Already closed the once
pass


def _mark_as_closed(self):
""" Mark this socket as being closed """
self.close = self._defanged_close
self._closed = True

def close(self):
notify_close(self.fd)
self._mark_as_closed() # Don't do this twice.
return self.fd.close()

def __del__(self):
self.close()

def connect(self, address):
if self.act_non_blocking:
return self.fd.connect(address)
fd = self.fd
if self.gettimeout() is None:
while not socket_connect(fd, address):
trampoline(fd, write=True)
try:
self._trampoline(fd, write=True)
except IOClosed:
raise socket.error(errno.EBADFD)
socket_checkerr(fd)
else:
end = time.time() + self.gettimeout()
Expand All @@ -194,8 +237,12 @@ def connect(self, address):
return
if time.time() >= end:
raise socket.timeout("timed out")
trampoline(fd, write=True, timeout=end - time.time(),
try:
self._trampoline(fd, write=True, timeout=end - time.time(),
timeout_exc=socket.timeout("timed out"))
except IOClosed:
# ... we need some workable errno here.
raise socket.error(errno.EBADFD)
socket_checkerr(fd)

def connect_ex(self, address):
Expand All @@ -205,10 +252,12 @@ def connect_ex(self, address):
if self.gettimeout() is None:
while not socket_connect(fd, address):
try:
trampoline(fd, write=True)
self._trampoline(fd, write=True)
socket_checkerr(fd)
except socket.error as ex:
return get_errno(ex)
except IOClosed:
return errno.EBADFD
else:
end = time.time() + self.gettimeout()
while True:
Expand All @@ -217,11 +266,13 @@ def connect_ex(self, address):
return 0
if time.time() >= end:
raise socket.timeout(errno.EAGAIN)
trampoline(fd, write=True, timeout=end - time.time(),
self._trampoline(fd, write=True, timeout=end - time.time(),
timeout_exc=socket.timeout(errno.EAGAIN))
socket_checkerr(fd)
except socket.error as ex:
return get_errno(ex)
except IOClosed:
return errno.EBADFD

def dup(self, *args, **kw):
sock = self.fd.dup(*args, **kw)
Expand Down Expand Up @@ -255,27 +306,31 @@ def recv(self, buflen, flags=0):
return ''
else:
raise
trampoline(
fd,
read=True,
timeout=self.gettimeout(),
timeout_exc=socket.timeout("timed out"))
try:
self._trampoline(
fd,
read=True,
timeout=self.gettimeout(),
timeout_exc=socket.timeout("timed out"))
except IOClosed as e:
# Perhaps we should return '' instead?
raise EOFError()

def recvfrom(self, *args):
if not self.act_non_blocking:
trampoline(self.fd, read=True, timeout=self.gettimeout(),
self._trampoline(self.fd, read=True, timeout=self.gettimeout(),
timeout_exc=socket.timeout("timed out"))
return self.fd.recvfrom(*args)

def recvfrom_into(self, *args):
if not self.act_non_blocking:
trampoline(self.fd, read=True, timeout=self.gettimeout(),
self._trampoline(self.fd, read=True, timeout=self.gettimeout(),
timeout_exc=socket.timeout("timed out"))
return self.fd.recvfrom_into(*args)

def recv_into(self, *args):
if not self.act_non_blocking:
trampoline(self.fd, read=True, timeout=self.gettimeout(),
self._trampoline(self.fd, read=True, timeout=self.gettimeout(),
timeout_exc=socket.timeout("timed out"))
return self.fd.recv_into(*args)

Expand All @@ -297,8 +352,11 @@ def send(self, data, flags=0):
if total_sent == len_data:
break

trampoline(self.fd, write=True, timeout=self.gettimeout(),
timeout_exc=socket.timeout("timed out"))
try:
self._trampoline(self.fd, write=True, timeout=self.gettimeout(),
timeout_exc=socket.timeout("timed out"))
except IOClosed:
raise socket.error(errno.ECONNRESET, 'Connection closed by another thread')

return total_sent

Expand All @@ -309,7 +367,7 @@ def sendall(self, data, flags=0):
tail += self.send(data[tail:], flags)

def sendto(self, *args):
trampoline(self.fd, write=True)
self._trampoline(self.fd, write=True)
return self.fd.sendto(*args)

def setblocking(self, flag):
Expand Down Expand Up @@ -357,6 +415,30 @@ class _SocketDuckForFd(object):

def __init__(self, fileno):
self._fileno = fileno
notify_opened(fileno)
self._closed = False

def _trampoline(self, fd, read=False, write=False, timeout=None, timeout_exc=None):
if self._closed:
# Don't trampoline if we're already closed.
raise IOClosed()
try:
return trampoline(fd, read=True, timeout=self.gettimeout(),
timeout_exc=socket.timeout("timed out"),
mark_as_closed=self.mark_as_closed)
except IOClosed:
# Our fileno has been obsoleted. Defang ourselves to
# prevent spurious closes.
self._mark_as_closed()
raise

def _defanged_close(self):
# Don't let anything close the wrong filehandle.
pass

def _mark_as_closed(self):
self.close = self._close = self._defanged_close
self._closed = True

@property
def _sock(self):
Expand All @@ -373,7 +455,7 @@ def recv(self, buflen):
except OSError as e:
if get_errno(e) not in SOCKET_BLOCKING:
raise IOError(*e.args)
trampoline(self, read=True)
self._trampoline(self, read=True)

def recv_into(self, buf, nbytes=0, flags=0):
if nbytes == 0:
Expand Down Expand Up @@ -402,7 +484,7 @@ def sendall(self, data):
raise IOError(*e.args)
total_sent = 0
while total_sent < len_data:
trampoline(self, write=True)
self._trampoline(self, write=True)
try:
total_sent += os_write(fileno, data[total_sent:])
except OSError as e:
Expand All @@ -413,6 +495,8 @@ def __del__(self):
self._close()

def _close(self):
notify_close(self._fileno)
self._mark_as_closed()
try:
os.close(self._fileno)
except:
Expand Down Expand Up @@ -484,7 +568,14 @@ def __repr__(self):
self.mode,
(id(self) < 0) and (sys.maxint + id(self)) or id(self))

def _defanged_close(self):
pass

def _mark_as_closed(self):
self.close = self._defanged_close

def close(self):
self._mark_as_closed()
super(GreenPipe, self).close()
for method in [
'fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
Expand Down

0 comments on commit da87716

Please sign in to comment.