Skip to content

Commit

Permalink
Added several new async helpers and dropped the @asynchronous decorator
Browse files Browse the repository at this point in the history
  • Loading branch information
agronholm committed Apr 7, 2016
1 parent 3c9c590 commit 19b06de
Show file tree
Hide file tree
Showing 29 changed files with 811 additions and 329 deletions.
2 changes: 1 addition & 1 deletion asphalt/core/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ async def start(ctx: Context):
'{package}'
],
install_requires=[
'asphalt >= {current_version}, < {next_major_version}'
'asphalt'
]
)
""".format(package=package, project_name=project_name, current_version=current_version,
Expand Down
2 changes: 0 additions & 2 deletions asphalt/core/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

from typeguard import check_argument_types

from asphalt.core.concurrency import asynchronous
from asphalt.core.context import Context
from asphalt.core.util import PluginContainer, merge_config

Expand Down Expand Up @@ -70,7 +69,6 @@ def add_component(self, alias: str, type: Union[str, type]=None, **kwargs):
component = component_types.create_object(type or alias, **kwargs)
self.child_components[alias] = component

@asynchronous
async def start(self, ctx: Context):
"""
Create child components that have been configured but not yet created and then calls their
Expand Down
4 changes: 3 additions & 1 deletion asphalt/core/concurrency/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
from .eventloop import *
from .async import *
from .threads import *
from .eventloop import *
from .file import *
152 changes: 152 additions & 0 deletions asphalt/core/concurrency/async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
from collections import Coroutine, AsyncIterator
from functools import wraps
from inspect import iscoroutinefunction
from typing import Callable, Optional

__all__ = ('yield_async', 'async_generator', 'async_contextmanager')


async def _work_coroutine(
coro: Coroutine, exception: BaseException = None) -> Optional['_AsyncYieldValue']:
"""
Run the coroutine until it does ``await yield_async(...)``.
:return: the value contained by :class:`_AsyncYieldValue`
"""
value = None
while True:
try:
if exception is not None:
value = coro.throw(type(exception), exception, exception.__traceback__)
else:
value = coro.send(value)
except StopIteration:
return None

if isinstance(value, _AsyncYieldValue):
return value
else:
try:
value = await value
except Exception as e:
exception = e
else:
exception = None


class _AsyncGeneratorWrapper:
__slots__ = 'coroutine'

def __init__(self, coroutine: Coroutine):
self.coroutine = coroutine

async def __aiter__(self):
return self

async def __anext__(self):
value = await _work_coroutine(self.coroutine)
if value is not None:
return value.value
else:
raise StopAsyncIteration


class _AsyncContextManager:
__slots__ = 'coroutine'

def __init__(self, coroutine: Coroutine):
self.coroutine = coroutine

async def __aenter__(self):
retval = await _work_coroutine(self.coroutine)
if retval is not None:
return retval.value
else:
raise RuntimeError('coroutine finished without yielding a value')

async def __aexit__(self, exc_type, exc_val, exc_tb):
retval = await _work_coroutine(self.coroutine, exc_val)
if retval is not None:
raise RuntimeError('coroutine yielded a value in the exit phase: {!r}'.
format(retval.value))


class _AsyncYieldValue:
__slots__ = 'value'

def __init__(self, value):
self.value = value

def __await__(self):
yield self


def yield_async(value):
"""The equivalent of ``yield`` in an asynchronous context manager or asynchronous generator."""
return _AsyncYieldValue(value)


def async_generator(func: Callable[..., Coroutine]) -> Callable[..., AsyncIterator]:
"""
Transform a generator function into something that works with the ``async for``.
Any awaitable yielded by the given generator function will be awaited on and the result passed
back to the generator. Any other yielded values will be yielded to the actual consumer of the
asynchronous iterator.
For example:
>>> @async_generator
>>> async def mygenerator(websites):
>>> for website in websites:
>>> page = await http_fetch(website)
>>> await yield_async(page)
>>>
>>> async def fetch_pages():
>>> websites = ('http://foo.bar', 'http://example.org')
>>> async for sanitized_page in mygenerator(websites):
>>> print(sanitized_page)
:param func: a generator function
:return: a callable that can be used with ``async for``
"""
@wraps(func)
def wrapper(*args, **kwargs):
return _AsyncGeneratorWrapper(func(*args, **kwargs))

assert iscoroutinefunction(func), '"func" must be a coroutine function'
return wrapper


def async_contextmanager(func: Callable[..., Coroutine]) -> Callable:
"""
Transform a generator function into something that works with ``async with``.
The generator may yield any number of awaitables which are resolved and sent back to the
generator. To indicate that the setup phase is complete, the generator must yield one
non-awaitable value. The rest of the generator will then be processed after the context block
has been executed. If the context was exited with an exception, this exception will be raised
in the generator.
For example:
>>> @async_contextmanager
>>> async def mycontextmanager(arg):
>>> context = await setup_remote_context(arg)
>>> await yield_async(context)
>>> await context.teardown()
>>>
>>> async def frobnicate(arg):
>>> async with mycontextmanager(arg) as context:
>>> do_something_with(context)
:param func: a generator function
:return: a callable that can be used with ``async with``
"""
@wraps(func)
def wrapper(*args, **kwargs):
return _AsyncContextManager(func(*args, **kwargs))

assert iscoroutinefunction(func), '"func" must be a coroutine function'
return wrapper
27 changes: 12 additions & 15 deletions asphalt/core/concurrency/eventloop.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@

from typeguard import check_argument_types

__all__ = ('set_event_loop', 'get_event_loop', 'is_event_loop_thread', 'stop_event_loop')
__all__ = ('set_event_loop', 'get_event_loop', 'is_event_loop_thread')

_event_loop = None
_event_loop_thread_id = main_thread().ident


def set_event_loop(loop: AbstractEventLoop, thread: Thread=None) -> None:
def set_event_loop(loop: AbstractEventLoop, thread: Thread = None) -> None:
"""
Set the event loop to be used by Asphalt applications.
Mark the current event loop instance and thread to be used by Asphalt applications.
This is necessary in order for :func:`blocking` and :func:`asynchronous` to work.
This is necessary in order for :func:`~asphalt.core.concurrency.async.call_async` and
:func:`~is_event_loop_thread` to work.
:param loop: the event loop that will run the Asphalt application
:param thread: thread the event loop runs in (omit to use the current thread)
Expand All @@ -28,7 +29,13 @@ def set_event_loop(loop: AbstractEventLoop, thread: Thread=None) -> None:


def get_event_loop() -> Optional[AbstractEventLoop]:
"""Return the current event loop, as set by :func:`~set_event_loop`."""
"""
Return the current event loop, as set by :func:`~set_event_loop`.
Users are discouraged from using this function, and should prefer
:func:`asyncio.get_event_loop` instead.
"""
return _event_loop


Expand All @@ -40,13 +47,3 @@ def is_event_loop_thread() -> bool:
"""
return get_ident() == _event_loop_thread_id


def stop_event_loop() -> None:
"""
Schedule the current event loop to stop on the next iteration.
This function is the only supported way to stop the event loop from a non-eventloop thread.
"""
_event_loop.call_soon_threadsafe(_event_loop.stop)
97 changes: 97 additions & 0 deletions asphalt/core/concurrency/file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
from concurrent.futures import Executor
from pathlib import Path
from typing import Union, Optional

from typeguard import check_argument_types

from asphalt.core.concurrency import threadpool, async_generator, yield_async, call_in_thread

__all__ = ('AsyncFileWrapper', 'open_async')


class AsyncFileWrapper:
"""
Wraps certain file I/O operations so they're guaranteed to run in a thread pool.
The wrapped methods work like coroutines when called in the event loop thread, but when called
in any other thread, they work just like the methods of the ``file`` type.
This class supports use as an asynchronous context manager.
The wrapped methods are:
* ``flush()``
* ``read()``
* ``readline()``
* ``readlines()``
* ``seek()``
* ``truncate()``
* ``write()``
* ``writelines()``
"""

__slots__ = ('_open_args', '_open_kwargs', '_executor', '_raw_file', 'flush', 'read',
'readline', 'readlines', 'seek', 'truncate', 'write', 'writelines')

def __init__(self, path: str, args: tuple, kwargs: dict, executor: Optional[Executor]):
self._open_args = (path,) + args
self._open_kwargs = kwargs
self._executor = executor
self._raw_file = None

def __getattr__(self, name):
return getattr(self._raw_file, name)

def __await__(self):
if self._raw_file is None:
self._raw_file = yield from call_in_thread(
open, *self._open_args, executor=self._executor, **self._open_kwargs)
self.flush = threadpool(self._executor)(self._raw_file.flush)
self.read = threadpool(self._executor)(self._raw_file.read)
self.readline = threadpool(self._executor)(self._raw_file.readline)
self.readlines = threadpool(self._executor)(self._raw_file.readlines)
self.seek = threadpool(self._executor)(self._raw_file.seek)
self.truncate = threadpool(self._executor)(self._raw_file.truncate)
self.write = threadpool(self._executor)(self._raw_file.write)
self.writelines = threadpool(self._executor)(self._raw_file.writelines)

return self

def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
self._raw_file.close()

@async_generator
async def async_readchunks(self, size: int):
"""
Read data from the file in chunks.
:param size: the maximum number of bytes or characters to read at once
:return: an asynchronous iterator yielding bytes or strings
"""
assert check_argument_types()
while True:
data = await self.read(size)
if data:
await yield_async(data)
else:
return


def open_async(file: Union[str, Path], *args, executor: Executor = None,
**kwargs) -> AsyncFileWrapper:
"""
Open a file and wrap it in an :class:`~AsyncFileWrapper`.
:param file: the file path to open
:param args: positional arguments to :func:`open`
:param executor: the ``executor`` argument to :class:`~AsyncFileWrapper`
:param kwargs: keyword arguments to :func:`open`
:return: the wrapped file object
"""
assert check_argument_types()
return AsyncFileWrapper(str(file), args, kwargs, executor)

0 comments on commit 19b06de

Please sign in to comment.