Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

An asyncio hub for eventlet #870

Merged
merged 16 commits into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ jobs:
- { py: "3.10", toxenv: ipv6, ignore-error: false, os: ubuntu-latest }
- { py: "3.11", toxenv: py311-epolls, ignore-error: false, os: ubuntu-latest }
- { py: "3.12", toxenv: py312-epolls, ignore-error: false, os: ubuntu-latest }
- { py: "3.10", toxenv: py310-asyncio, ignore-error: false, os: ubuntu-latest }
- { py: "3.11", toxenv: py311-asyncio, ignore-error: false, os: ubuntu-latest }
- { py: "3.12", toxenv: py312-asyncio, ignore-error: false, os: ubuntu-latest }
- { py: pypy3.9, toxenv: pypy3-epolls, ignore-error: true, os: ubuntu-20.04 }

steps:
Expand Down
1 change: 1 addition & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ Contributors
* Victor Stinner
* Samuel Merritt
* Eric Urban
* Miguel Grinberg
itamarst marked this conversation as resolved.
Show resolved Hide resolved
* Tuomo Kriikkula

Linden Lab Contributors
Expand Down
5 changes: 4 additions & 1 deletion eventlet/debug.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,10 @@ def hub_prevent_multiple_readers(state=True):
But if you really know what you are doing you can change the state
to ``False`` to stop the hub from protecting against this mistake.
"""
from eventlet.hubs import hub
from eventlet.hubs import hub, get_hub
from eventlet.hubs import asyncio
if not state and isinstance(get_hub(), asyncio.Hub):
raise RuntimeError("Multiple readers are not yet supported by asyncio hub")
hub.g_prevent_multiple_readers = state


Expand Down
146 changes: 146 additions & 0 deletions eventlet/hubs/asyncio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
"""
Asyncio-based hub, originally implemented by Miguel Grinberg.
"""

import asyncio
import os
import sys

from eventlet.hubs import hub


def is_available():
"""
Indicate whether this hub is available, since some hubs are
platform-specific.

Python always has asyncio, so this is always ``True``.
"""
return True


class Hub(hub.BaseHub):
"""An Eventlet hub implementation on top of an asyncio event loop."""

def __init__(self):
super().__init__()
self.sleep_event = asyncio.Event()
# The presumption is that eventlet is driving the event loop, so we
# want a new one we control.
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)

def add_timer(self, timer):
"""
Register a ``Timer``.

Typically not called directly by users.
"""
super().add_timer(timer)
self.sleep_event.set()

def _file_cb(self, cb, fileno):
"""
Callback called by ``asyncio`` when a file descriptor has an event.
"""
try:
cb(fileno)
except self.SYSTEM_EXCEPTIONS:
raise

Check warning on line 49 in eventlet/hubs/asyncio.py

View check run for this annotation

Codecov / codecov/patch

eventlet/hubs/asyncio.py#L49

Added line #L49 was not covered by tests
except:
self.squelch_exception(fileno, sys.exc_info())
self.sleep_event.set()

def add(self, evtype, fileno, cb, tb, mark_as_closed):
"""
Add a file descriptor of given event type to the ``Hub``. See the
superclass for details.

Typically not called directly by users.
"""
try:
os.fstat(fileno)
except OSError:
raise ValueError('Invalid file descriptor')
already_listening = self.listeners[evtype].get(fileno) is not None
listener = super().add(evtype, fileno, cb, tb, mark_as_closed)
if not already_listening:
if evtype == hub.READ:
self.loop.add_reader(fileno, self._file_cb, cb, fileno)
else:
self.loop.add_writer(fileno, self._file_cb, cb, fileno)
return listener

def remove(self, listener):
"""
Remove a listener from the ``Hub``. See the superclass for details.

Typically not called directly by users.
"""
super().remove(listener)
evtype = listener.evtype
fileno = listener.fileno
if not self.listeners[evtype].get(fileno):
if evtype == hub.READ:
self.loop.remove_reader(fileno)
else:
self.loop.remove_writer(fileno)

def remove_descriptor(self, fileno):
"""
Remove a file descriptor from the ``asyncio`` loop.

Typically not called directly by users.
"""
have_read = self.listeners[hub.READ].get(fileno)
have_write = self.listeners[hub.WRITE].get(fileno)
super().remove_descriptor(fileno)
if have_read:
self.loop.remove_reader(fileno)
if have_write:
self.loop.remove_writer(fileno)

Check warning on line 101 in eventlet/hubs/asyncio.py

View check run for this annotation

Codecov / codecov/patch

eventlet/hubs/asyncio.py#L101

Added line #L101 was not covered by tests

def run(self, *a, **kw):
"""
Start the ``Hub`` running. See the superclass for details.
"""
async def async_run():
if self.running:
raise RuntimeError("Already running!")
try:
self.running = True
self.stopping = False
while not self.stopping:
while self.closed:
# We ditch all of these first.
self.close_one()
self.prepare_timers()
if self.debug_blocking:
self.block_detect_pre()
self.fire_timers(self.clock())
if self.debug_blocking:
self.block_detect_post()
self.prepare_timers()
wakeup_when = self.sleep_until()
if wakeup_when is None:
sleep_time = self.default_sleep()
else:
sleep_time = wakeup_when - self.clock()
if sleep_time > 0:
try:
await asyncio.wait_for(self.sleep_event.wait(),
sleep_time)
except asyncio.TimeoutError:
pass
self.sleep_event.clear()
else:
await asyncio.sleep(0)
else:
self.timers_canceled = 0
del self.timers[:]
del self.next_timers[:]
finally:
self.running = False
self.stopping = False

self.loop.run_until_complete(async_run())
8 changes: 8 additions & 0 deletions tests/greenio_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import sys
import tempfile

import pytest

import eventlet
from eventlet import event, greenio, debug
from eventlet.hubs import get_hub
Expand Down Expand Up @@ -856,6 +858,12 @@ class TestGreenIoLong(tests.LimitedTestCase):
TEST_TIMEOUT = 10 # the test here might take a while depending on the OS

def test_multiple_readers(self):
from eventlet.hubs.asyncio import Hub
if isinstance(get_hub(), Hub):
itamarst marked this conversation as resolved.
Show resolved Hide resolved
with pytest.raises(RuntimeError):
debug.hub_prevent_multiple_readers(False)
return

debug.hub_prevent_multiple_readers(False)
recvsize = 2 * min_buf_size()
sendsize = 10 * recvsize
Expand Down
5 changes: 5 additions & 0 deletions tests/hub_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@
import sys
import time

import pytest

import tests
from tests import skip_if_no_itimer, skip_unless
import eventlet
from eventlet import debug, hubs
from eventlet.hubs.asyncio import Hub as AsyncioHub
from eventlet.support import greenlets


Expand Down Expand Up @@ -81,6 +84,8 @@ def test_cancel_proportion(self):
eventlet.sleep()


@pytest.mark.skipif(isinstance(hubs.get_hub(), AsyncioHub),
reason="Asyncio hub doesn't yet support multiple readers")
class TestMultipleListenersCleanup(tests.LimitedTestCase):
def setUp(self):
super().setUp()
Expand Down
3 changes: 2 additions & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ envlist =
py38-openssl
py39-dnspython1
pypy3-epolls
py{38,39,310,311,312}-{selects,poll,epolls}
py{38,39,310,311,312}-{selects,poll,epolls,asyncio}
skipsdist = True

[testenv:ipv6]
Expand Down Expand Up @@ -58,6 +58,7 @@ setenv =
selects: EVENTLET_HUB = selects
poll: EVENTLET_HUB = poll
epolls: EVENTLET_HUB = epolls
asyncio: EVENTLET_HUB = asyncio
tox_cover_args = --cov=eventlet
deps =
coverage
Expand Down