Skip to content

Commit

Permalink
An asyncio hub for eventlet (#870)
Browse files Browse the repository at this point in the history
An asyncio hub based on a prototype by Miguel Grinberg: https://gist.github.com/miguelgrinberg/829a20792d7283ae27b1f6a390c378b9

---------

Co-authored-by: Itamar Turner-Trauring <itamar@pythonspeed.com>
  • Loading branch information
itamarst and pythonspeed committed Jan 11, 2024
1 parent 268c4d7 commit df81d5d
Show file tree
Hide file tree
Showing 7 changed files with 169 additions and 2 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ jobs:
- { py: "3.10", toxenv: ipv6, ignore-error: false, os: ubuntu-latest }
- { py: "3.11", toxenv: py311-epolls, ignore-error: false, os: ubuntu-latest }
- { py: "3.12", toxenv: py312-epolls, ignore-error: false, os: ubuntu-latest }
- { py: "3.10", toxenv: py310-asyncio, ignore-error: false, os: ubuntu-latest }
- { py: "3.11", toxenv: py311-asyncio, ignore-error: false, os: ubuntu-latest }
- { py: "3.12", toxenv: py312-asyncio, ignore-error: false, os: ubuntu-latest }
- { py: pypy3.9, toxenv: pypy3-epolls, ignore-error: true, os: ubuntu-20.04 }

steps:
Expand Down
1 change: 1 addition & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ Contributors
* Victor Stinner
* Samuel Merritt
* Eric Urban
* Miguel Grinberg
* Tuomo Kriikkula

Linden Lab Contributors
Expand Down
5 changes: 4 additions & 1 deletion eventlet/debug.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,10 @@ def hub_prevent_multiple_readers(state=True):
But if you really know what you are doing you can change the state
to ``False`` to stop the hub from protecting against this mistake.
"""
from eventlet.hubs import hub
from eventlet.hubs import hub, get_hub
from eventlet.hubs import asyncio
if not state and isinstance(get_hub(), asyncio.Hub):
raise RuntimeError("Multiple readers are not yet supported by asyncio hub")
hub.g_prevent_multiple_readers = state


Expand Down
146 changes: 146 additions & 0 deletions eventlet/hubs/asyncio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
"""
Asyncio-based hub, originally implemented by Miguel Grinberg.
"""

import asyncio
import os
import sys

from eventlet.hubs import hub


def is_available():
"""
Indicate whether this hub is available, since some hubs are
platform-specific.
Python always has asyncio, so this is always ``True``.
"""
return True


class Hub(hub.BaseHub):
"""An Eventlet hub implementation on top of an asyncio event loop."""

def __init__(self):
super().__init__()
self.sleep_event = asyncio.Event()
# The presumption is that eventlet is driving the event loop, so we
# want a new one we control.
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)

def add_timer(self, timer):
"""
Register a ``Timer``.
Typically not called directly by users.
"""
super().add_timer(timer)
self.sleep_event.set()

def _file_cb(self, cb, fileno):
"""
Callback called by ``asyncio`` when a file descriptor has an event.
"""
try:
cb(fileno)
except self.SYSTEM_EXCEPTIONS:
raise
except:
self.squelch_exception(fileno, sys.exc_info())
self.sleep_event.set()

def add(self, evtype, fileno, cb, tb, mark_as_closed):
"""
Add a file descriptor of given event type to the ``Hub``. See the
superclass for details.
Typically not called directly by users.
"""
try:
os.fstat(fileno)
except OSError:
raise ValueError('Invalid file descriptor')
already_listening = self.listeners[evtype].get(fileno) is not None
listener = super().add(evtype, fileno, cb, tb, mark_as_closed)
if not already_listening:
if evtype == hub.READ:
self.loop.add_reader(fileno, self._file_cb, cb, fileno)
else:
self.loop.add_writer(fileno, self._file_cb, cb, fileno)
return listener

def remove(self, listener):
"""
Remove a listener from the ``Hub``. See the superclass for details.
Typically not called directly by users.
"""
super().remove(listener)
evtype = listener.evtype
fileno = listener.fileno
if not self.listeners[evtype].get(fileno):
if evtype == hub.READ:
self.loop.remove_reader(fileno)
else:
self.loop.remove_writer(fileno)

def remove_descriptor(self, fileno):
"""
Remove a file descriptor from the ``asyncio`` loop.
Typically not called directly by users.
"""
have_read = self.listeners[hub.READ].get(fileno)
have_write = self.listeners[hub.WRITE].get(fileno)
super().remove_descriptor(fileno)
if have_read:
self.loop.remove_reader(fileno)
if have_write:
self.loop.remove_writer(fileno)

def run(self, *a, **kw):
"""
Start the ``Hub`` running. See the superclass for details.
"""
async def async_run():
if self.running:
raise RuntimeError("Already running!")
try:
self.running = True
self.stopping = False
while not self.stopping:
while self.closed:
# We ditch all of these first.
self.close_one()
self.prepare_timers()
if self.debug_blocking:
self.block_detect_pre()
self.fire_timers(self.clock())
if self.debug_blocking:
self.block_detect_post()
self.prepare_timers()
wakeup_when = self.sleep_until()
if wakeup_when is None:
sleep_time = self.default_sleep()
else:
sleep_time = wakeup_when - self.clock()
if sleep_time > 0:
try:
await asyncio.wait_for(self.sleep_event.wait(),
sleep_time)
except asyncio.TimeoutError:
pass
self.sleep_event.clear()
else:
await asyncio.sleep(0)
else:
self.timers_canceled = 0
del self.timers[:]
del self.next_timers[:]
finally:
self.running = False
self.stopping = False

self.loop.run_until_complete(async_run())
8 changes: 8 additions & 0 deletions tests/greenio_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import sys
import tempfile

import pytest

import eventlet
from eventlet import event, greenio, debug
from eventlet.hubs import get_hub
Expand Down Expand Up @@ -856,6 +858,12 @@ class TestGreenIoLong(tests.LimitedTestCase):
TEST_TIMEOUT = 10 # the test here might take a while depending on the OS

def test_multiple_readers(self):
from eventlet.hubs.asyncio import Hub
if isinstance(get_hub(), Hub):
with pytest.raises(RuntimeError):
debug.hub_prevent_multiple_readers(False)
return

debug.hub_prevent_multiple_readers(False)
recvsize = 2 * min_buf_size()
sendsize = 10 * recvsize
Expand Down
5 changes: 5 additions & 0 deletions tests/hub_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@
import sys
import time

import pytest

import tests
from tests import skip_if_no_itimer, skip_unless
import eventlet
from eventlet import debug, hubs
from eventlet.hubs.asyncio import Hub as AsyncioHub
from eventlet.support import greenlets


Expand Down Expand Up @@ -81,6 +84,8 @@ def test_cancel_proportion(self):
eventlet.sleep()


@pytest.mark.skipif(isinstance(hubs.get_hub(), AsyncioHub),
reason="Asyncio hub doesn't yet support multiple readers")
class TestMultipleListenersCleanup(tests.LimitedTestCase):
def setUp(self):
super().setUp()
Expand Down
3 changes: 2 additions & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ envlist =
py38-openssl
py39-dnspython1
pypy3-epolls
py{38,39,310,311,312}-{selects,poll,epolls}
py{38,39,310,311,312}-{selects,poll,epolls,asyncio}
skipsdist = True

[testenv:ipv6]
Expand Down Expand Up @@ -58,6 +58,7 @@ setenv =
selects: EVENTLET_HUB = selects
poll: EVENTLET_HUB = poll
epolls: EVENTLET_HUB = epolls
asyncio: EVENTLET_HUB = asyncio
tox_cover_args = --cov=eventlet
deps =
coverage
Expand Down

0 comments on commit df81d5d

Please sign in to comment.