Skip to content

Commit

Permalink
WIP: support async coroutine functions
Browse files Browse the repository at this point in the history
  • Loading branch information
spaceone committed Dec 31, 2022
1 parent 1adff34 commit 36b02d7
Show file tree
Hide file tree
Showing 131 changed files with 461 additions and 413 deletions.
10 changes: 5 additions & 5 deletions circuits/app/daemon.py
Expand Up @@ -61,14 +61,14 @@ def init(self, pidfile, path="/", stdin=None, stdout=None, stderr=None, channel=

self.stderr = stderr if stderr is not None and isabs(stderr) else "/dev/null"

def deletepid(self):
async def deletepid(self):
remove(self.pidfile)

def writepid(self):
async def writepid(self):
with open(self.pidfile, "w") as fd:
fd.write(str(getpid()))

def daemonize(self):
async def daemonize(self):
try:
pid = fork()
if pid > 0:
Expand Down Expand Up @@ -116,11 +116,11 @@ def daemonize(self):
self.fire(writepid())
self.fire(daemonized(self))

def registered(self, component, manager):
async def registered(self, component, manager):
if component == self and manager.root.running:
self.fire(daemonize())

@handler("started", priority=100.0, channel="*")
def on_started(self, component):
async def on_started(self, component):
if component is not self:
self.fire(daemonize())
2 changes: 1 addition & 1 deletion circuits/app/dropprivileges.py
Expand Up @@ -43,7 +43,7 @@ def drop_privileges(self):
raise SystemExit(-1)

@handler("ready", channel="*")
def on_ready(self, server, bind):
async def on_ready(self, server, bind):
try:
self.drop_privileges()
finally:
Expand Down
9 changes: 5 additions & 4 deletions circuits/core/bridge.py
Expand Up @@ -68,7 +68,7 @@ def _process_packet(self, eid, obj):
self.fire(event, self.channel)

@handler("value_changed", channel="*")
def _on_value_changed(self, value):
async def _on_value_changed(self, value):
try:
eid = self._values[value]
if value.errors:
Expand All @@ -78,7 +78,7 @@ def _on_value_changed(self, value):
pass

@handler("read")
def _on_read(self, data):
async def _on_read(self, data):
self._buffer += data
items = self._buffer.split(_sentinel)

Expand All @@ -101,7 +101,7 @@ def __write(self, eid, data):
self._socket.write(dumps((eid, data)) + _sentinel)

@handler("ipc")
def _on_ipc(self, event, ipc_event, channel=None):
async def _on_ipc(self, event, ipc_event, channel=None):
"""Send event to a child/parentprocess
Event handler to run an event on a child/parent process
Expand All @@ -121,7 +121,7 @@ def _on_ipc(self, event, ipc_event, channel=None):
:Example:
``# hello is your event to execute in the child process
result = yield self.fire(ipc(hello()))
result = await self.fire(ipc(hello()))
print(result.value)``
"""

Expand All @@ -130,6 +130,7 @@ def _on_ipc(self, event, ipc_event, channel=None):

eid = hash(ipc_event)
self.__send(eid, ipc_event)
#await self.wait(Bridge.__waiting_event(eid))
yield self.wait(Bridge.__waiting_event(eid))

@staticmethod
Expand Down
2 changes: 1 addition & 1 deletion circuits/core/components.py
Expand Up @@ -123,7 +123,7 @@ def __init__(self, *args, **kwargs):
self.init(*args, **kwargs)

@handler("prepare_unregister_complete", channel=self)
def _on_prepare_unregister_complete(self, event, e, value):
async def _on_prepare_unregister_complete(self, event, e, value):
self._do_prepare_unregister_complete(event.parent, value)
self.addHandler(_on_prepare_unregister_complete)

Expand Down
8 changes: 4 additions & 4 deletions circuits/core/debugger.py
Expand Up @@ -52,13 +52,13 @@ def __init__(self, errors=True, events=True, file=None, logger=None,
self.IgnoreChannels.extend(kwargs.get("IgnoreChannels", []))

@handler("signal", channel="*")
def _on_signal(self, signo, stack):
async def _on_signal(self, signo, stack):
if signo in [SIGINT, SIGTERM]:
raise SystemExit(0)

@handler("exception", channel="*", priority=100.0)
def _on_exception(self, error_type, value, traceback,
handler=None, fevent=None):
async def _on_exception(self, error_type, value, traceback,
handler=None, fevent=None):

if not self._errors:
return
Expand Down Expand Up @@ -88,7 +88,7 @@ def _on_exception(self, error_type, value, traceback,
pass

@handler(priority=101.0)
def _on_event(self, event, *args, **kwargs):
async def _on_event(self, event, *args, **kwargs):
"""Global Event Handler
Event handler to listen to all events printing
Expand Down
4 changes: 3 additions & 1 deletion circuits/core/handlers.py
@@ -1,6 +1,7 @@
"""
This module define the @handler decorator/function and the HandlesType type.
"""
import inspect
from collections.abc import Callable

from circuits.tools import getargspec
Expand Down Expand Up @@ -38,7 +39,7 @@ def handler(*names, **kwargs):
Normally, the results returned by the handlers for an event are simply
collected in the :class:`circuits.core.events.Event`'s :attr:`value`
attribute. As a special case, a handler may return a
:class:`types.GeneratorType`. This signals to the dispatcher that the
:class:`types.AsyncGeneratorType`. This signals to the dispatcher that the
handler isn't ready to deliver a result yet.
Rather, it has interrupted it's execution with a ``yield None``
statement, thus preserving its current execution state.
Expand All @@ -63,6 +64,7 @@ def handler(*names, **kwargs):
"""

def wrapper(f):
assert f.__name__ == 'init' or inspect.iscoroutinefunction(f) or inspect.isasyncgenfunction(f), f
if names and isinstance(names[0], bool) and not names[0]:
f.handler = False
return f
Expand Down
8 changes: 4 additions & 4 deletions circuits/core/helpers.py
Expand Up @@ -19,7 +19,7 @@ def __init__(self, *args, **kwargs):
self._continue = Event()

@handler("generate_events", priority=-100)
def _on_generate_events(self, event):
async def _on_generate_events(self, event):
"""
Fall back handler for the :class:`~.events.GenerateEvents` event.
Expand Down Expand Up @@ -77,8 +77,8 @@ class FallBackExceptionHandler(BaseComponent):
"""

@handler("exception", channel="*")
def _on_exception(self, error_type, value, traceback,
handler=None, fevent=None):
async def _on_exception(self, error_type, value, traceback,
handler=None, fevent=None):
s = []

if handler is None:
Expand All @@ -105,6 +105,6 @@ class FallBackSignalHandler(BaseComponent):
"""

@handler("signal", channel="*")
def _on_signal(self, signo, stack):
async def _on_signal(self, signo, stack):
if signo in [SIGINT, SIGTERM]:
raise SystemExit(0)
2 changes: 1 addition & 1 deletion circuits/core/loader.py
Expand Up @@ -35,7 +35,7 @@ def __init__(self, auto_register=True, init_args=None, init_kwargs=None, paths=N
sys.path.insert(0, path)

@handler("load")
def load(self, name):
async def load(self, name):
module = safeimport(name)
if module is not None:

Expand Down
70 changes: 57 additions & 13 deletions circuits/core/manager.py
Expand Up @@ -15,7 +15,7 @@
from threading import RLock, Thread, current_thread
from time import time
from traceback import format_exc
from types import GeneratorType
from types import AsyncGeneratorType, CoroutineType, GeneratorType
from uuid import uuid4 as uuid

from ..tools import tryimport
Expand Down Expand Up @@ -73,9 +73,17 @@ def __init__(self, seconds):
def __iter__(self):
return self

def __aiter__(self):
return self

def __repr__(self):
return f"sleep({self.expiry - time()!r})"

def __anext__(self):
if time() >= self.expiry:
raise StopIteration()
return self

def __next__(self):
if time() >= self.expiry:
raise StopIteration()
Expand Down Expand Up @@ -504,7 +512,7 @@ def unregisterTask(self, g):
if g in self.root._tasks:
self.root._tasks.remove(g)

def waitEvent(self, event, *channels, **kwargs): # noqa
async def waitEvent(self, event, *channels, **kwargs): # noqa
# XXX: C901: This has a high McCabe complexity score of 16.
# TODO: Refactor this method.

Expand Down Expand Up @@ -565,9 +573,13 @@ def _on_tick(self):
if state.event is not None:
yield CallValue(state.event.value)

async def waita(self, event, *channels, **kwargs):
for x in self.wait(event, *channels, **kwargs):
yield x

wait = waitEvent

def callEvent(self, event, *channels, **kwargs):
async def callEvent(self, event, *channels, **kwargs):
"""
Fire the given event to the specified channels and suspend
execution until it has been dispatched. This method may only
Expand All @@ -578,7 +590,14 @@ def callEvent(self, event, *channels, **kwargs):
been dispatched (see :func:`circuits.core.handlers.handler`).
"""
value = self.fire(event, *channels)
yield from self.waitEvent(event, *event.channels, **kwargs)
async for r in self.waitEvent(event, *event.channels, **kwargs):
await r # pass ?
return CallValue(value)

async def calla(self, event, *channels, **kwargs):
value = self.fire(event, *channels)
async for r in self.waitEvent(event, *event.channels, **kwargs):
yield r
yield CallValue(value)

call = callEvent
Expand Down Expand Up @@ -676,7 +695,7 @@ def _dispatcher(self, event, channels, remaining): # noqa
self.stop()
except SystemExit as e:
self.stop(e.code)
except BaseException:
except BaseException: # FIXME: cannot happen anymore with coroutines # FIXME: looks like code duplication with what happens in processTask()
value = err = _exc_info()
event.value.errors = True

Expand All @@ -686,7 +705,7 @@ def _dispatcher(self, event, channels, remaining): # noqa
self.fire(exception(*err, handler=event_handler, fevent=event))

if value is not None:
if isinstance(value, GeneratorType):
if isinstance(value, (GeneratorType, CoroutineType, AsyncGeneratorType)):
event.waitingHandlers += 1
event.value.promise = True
self.registerTask((event, value, None))
Expand Down Expand Up @@ -817,20 +836,28 @@ def processTask(self, event, task, parent=None): # noqa

value = None
try:
value = next(task)
if isinstance(task, AsyncGeneratorType):
value = task.asend(None)
elif hasattr(task, 'send'):
value = task.send(None)
else:
value = next(task)
if isinstance(value, CallValue):
# Done here, next() will StopIteration anyway
self.unregisterTask((event, task, parent))
# We are in a callEvent
value = parent.send(value.value)
if isinstance(value, GeneratorType):
if isinstance(value, (GeneratorType, CoroutineType, AsyncGeneratorType)):
# We loose a yield but we gain one,
# we don't need to change
# event.waitingHandlers
# The below code is delegated to handlers
# in the waitEvent generator
# self.registerTask((event, value, parent))
task_state = next(value)
if isinstance(value, AsyncGeneratorType):
task_state = value.asend(None).send(None)
else:
task_state = value.send(None)
task_state.task_event = event
task_state.task = value
task_state.parent = parent
Expand All @@ -839,11 +866,14 @@ def processTask(self, event, task, parent=None): # noqa
if value is not None:
event.value.value = value
self.registerTask((event, parent, None))
elif isinstance(value, GeneratorType):
elif isinstance(value, (GeneratorType, CoroutineType, AsyncGeneratorType)):
event.waitingHandlers += 1
self.unregisterTask((event, task, None))
# First yielded value is always the task state
task_state = next(value)
if isinstance(value, AsyncGeneratorType):
task_state = value.asend(None).send(None)
else:
task_state = value.send(None)
task_state.task_event = event
task_state.task = value
task_state.parent = task
Expand All @@ -856,7 +886,9 @@ def processTask(self, event, task, parent=None): # noqa
if parent:
value = parent.throw(value.extract())
if value is not None:
value_generator = (val for val in (value,))
async def value_generator():
return value # yield?
# value_generator = (val for val in (value,))
self.registerTask((event, value_generator, parent))
else:
raise value.extract()
Expand All @@ -867,7 +899,7 @@ def processTask(self, event, task, parent=None): # noqa
self.unregisterTask((event, task, parent))
elif value is not None:
event.value.value = value
except StopIteration:
except (StopAsyncIteration, StopIteration):
event.waitingHandlers -= 1
self.unregisterTask((event, task, parent))

Expand Down Expand Up @@ -974,6 +1006,18 @@ def run(self, socket=None):
self.tick()
except Exception:
pass
while self._tasks:
event, task, parent = self._tasks.pop()
if isinstance(task, CoroutineType):
try:
task.throw(RuntimeError('circuits stopped'))
except RuntimeError:
pass
elif isinstance(task, AsyncGeneratorType):
try:
task.athrow(RuntimeError('circuits stopped'))
except RuntimeError:
pass

self.root._executing_thread = None
self.__thread = None
Expand Down
2 changes: 1 addition & 1 deletion circuits/core/pollers.py
Expand Up @@ -82,7 +82,7 @@ def accept():
return (res_list[0], clnt_sock)

@handler("generate_events", priority=-9)
def _on_generate_events(self, event):
async def _on_generate_events(self, event):
"""
Pollers have slightly higher priority than the default handler
from Manager to ensure that they are invoked before the
Expand Down
2 changes: 1 addition & 1 deletion circuits/core/timers.py
Expand Up @@ -47,7 +47,7 @@ def __init__(self, interval, event, *channels, **kwargs):
self.reset(interval)

@handler("generate_events")
def _on_generate_events(self, event):
async def _on_generate_events(self, event):
if self.expiry is None:
return

Expand Down
12 changes: 6 additions & 6 deletions circuits/core/workers.py
Expand Up @@ -72,16 +72,16 @@ def init(self, process=False, workers=None, channel=channel):
self.pool = Pool(self.workers)

@handler("stopped", "unregistered", channel="*")
def _on_stopped(self, event, *args):
async def _on_stopped(self, event, *args):
if event.name == "unregistered" and args[0] is not self:
return

self.pool.close()
self.pool.join()

@handler("task")
def _on_task(self, f, *args, **kwargs):
result = self.pool.apply_async(f, args, kwargs)
while not result.ready():
yield
yield result.get()
async def _on_task(self, f, *args, **kwargs):
await self.pool.apply_async(f, args, kwargs)
#while not result.ready():
# await
return result.get()

0 comments on commit 36b02d7

Please sign in to comment.