Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Listener removes wrong guy #645

Merged
merged 2 commits into from Sep 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
35 changes: 23 additions & 12 deletions eventlet/hubs/hub.py
Expand Up @@ -225,14 +225,18 @@ def remove(self, listener):

fileno = listener.fileno
evtype = listener.evtype
self.listeners[evtype].pop(fileno, None)
# migrate a secondary listener to be the primary listener
if fileno in self.secondaries[evtype]:
sec = self.secondaries[evtype].get(fileno, None)
if not sec:
return
self.listeners[evtype][fileno] = sec.pop(0)
if not sec:
if listener is self.listeners[evtype][fileno]:
del self.listeners[evtype][fileno]
# migrate a secondary listener to be the primary listener
if fileno in self.secondaries[evtype]:
sec = self.secondaries[evtype][fileno]
if sec:
self.listeners[evtype][fileno] = sec.pop(0)
if not sec:
del self.secondaries[evtype][fileno]
else:
self.secondaries[evtype][fileno].remove(listener)
if not self.secondaries[evtype][fileno]:
del self.secondaries[evtype][fileno]

def mark_as_reopened(self, fileno):
Expand All @@ -247,16 +251,23 @@ def mark_as_reopened(self, fileno):
def remove_descriptor(self, fileno):
""" Completely remove all listeners for this fileno. For internal use
only."""
# gather any listeners we have
listeners = []
listeners.append(self.listeners[READ].pop(fileno, noop))
listeners.append(self.listeners[WRITE].pop(fileno, noop))
listeners.extend(self.secondaries[READ].pop(fileno, ()))
listeners.extend(self.secondaries[WRITE].pop(fileno, ()))
listeners.append(self.listeners[READ].get(fileno, noop))
listeners.append(self.listeners[WRITE].get(fileno, noop))
listeners.extend(self.secondaries[READ].get(fileno, ()))
listeners.extend(self.secondaries[WRITE].get(fileno, ()))
for listener in listeners:
try:
# listener.cb may want to remove(listener)
listener.cb(fileno)
except Exception:
self.squelch_generic_exception(sys.exc_info())
# NOW this fileno is now dead to all
self.listeners[READ].pop(fileno, None)
self.listeners[WRITE].pop(fileno, None)
self.secondaries[READ].pop(fileno, None)
self.secondaries[WRITE].pop(fileno, None)

def close_one(self):
""" Triggered from the main run loop. If a listener's underlying FD was
Expand Down
47 changes: 46 additions & 1 deletion tests/hub_test.py
@@ -1,11 +1,14 @@
from __future__ import with_statement
import errno
import fcntl
import os
import sys
import time

import tests
from tests import skip_with_pyevent, skip_if_no_itimer, skip_unless
import eventlet
from eventlet import hubs
from eventlet import debug, hubs
from eventlet.support import greenlets
import six

Expand Down Expand Up @@ -83,6 +86,48 @@ def test_cancel_proportion(self):
eventlet.sleep()


class TestMultipleListenersCleanup(tests.LimitedTestCase):
def setUp(self):
super(TestMultipleListenersCleanup, self).setUp()
debug.hub_prevent_multiple_readers(False)
debug.hub_exceptions(False)

def tearDown(self):
super(TestMultipleListenersCleanup, self).tearDown()
debug.hub_prevent_multiple_readers(True)
debug.hub_exceptions(True)

def test_cleanup(self):
r, w = os.pipe()
self.addCleanup(os.close, r)
self.addCleanup(os.close, w)

fcntl.fcntl(r, fcntl.F_SETFL,
fcntl.fcntl(r, fcntl.F_GETFL) | os.O_NONBLOCK)

def readfd(fd):
while True:
try:
return os.read(fd, 1)
except OSError as e:
if e.errno != errno.EAGAIN:
raise
hubs.trampoline(fd, read=True)

first_listener = eventlet.spawn(readfd, r)
eventlet.sleep()

second_listener = eventlet.spawn(readfd, r)
eventlet.sleep()

hubs.get_hub().schedule_call_global(0, second_listener.throw,
eventlet.Timeout(None))
eventlet.sleep()

os.write(w, b'.')
self.assertEqual(first_listener.wait(), b'.')


class TestScheduleCall(tests.LimitedTestCase):

def test_local(self):
Expand Down