Skip to content

Commit

Permalink
better neo support for async
Browse files Browse the repository at this point in the history
  • Loading branch information
joamag committed Mar 23, 2017
1 parent 84c042d commit 204cc6c
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 13 deletions.
12 changes: 4 additions & 8 deletions examples/async/async_neo.py
Expand Up @@ -56,18 +56,16 @@ def __init__(self, *args, **kwargs):
async def hello(self):
partial = self.field("partial", True, cast = bool)
handler = self.handler_partial if partial else self.handler
await appier.header_a()
await appier.await_yield("before\n")
yield "before\n"
await handler()
await appier.await_yield("after\n")
yield "after\n"

@appier.route("/async/callable", "GET")
async def callable(self):
sleep = self.field("sleep", 3.0, cast = float)
await appier.header_a()
await appier.await_yield("before\n")
yield "before\n"
await appier.ensure_a(lambda: time.sleep(sleep))
await appier.await_yield("after\n")
yield "after\n"

@appier.route("/async/file", "GET")
async def file(self):
Expand All @@ -77,7 +75,6 @@ async def file(self):
type, _encoding = mimetypes.guess_type(file_path, strict = True)
type = type or "application/octet-stream"
self.request.content_type = type
await appier.header_a()
await appier.ensure_a(
self.read_file,
args = [file_path],
Expand All @@ -90,7 +87,6 @@ async def http(self):
url = self.field("url", "https://www.flickr.com/")
delay = self.field("delay", 0.0, cast = float)
self.request.content_type = "text/html"
await appier.header_a()
await appier.sleep(delay)
yield await appier.get_w(url)

Expand Down
12 changes: 12 additions & 0 deletions src/appier/async_neo.py
Expand Up @@ -79,26 +79,34 @@ class CoroutineWrapper(object):

def __init__(self, coroutine):
self.coroutine = coroutine
self._buffer = None

def __iter__(self):
return self

def __next__(self):
if self._buffer: self._buffer.pop(0)
return self.coroutine.send(None)

def next(self):
return self.__next__()

def restore(self, value):
if self._buffer == None: self._buffer = []
self._buffer.append(value)

class AyncgenWrapper(object):

def __init__(self, async_iter):
self.async_iter = async_iter
self.current = None
self._buffer = None

def __iter__(self):
return self

def __next__(self):
if self._buffer: return self._buffer.pop(0)
try:
if self.current == None: self.current = self.async_iter.asend(None)
try: return next(self.current)
Expand All @@ -112,6 +120,10 @@ def __next__(self):
def next(self):
return self.__next__()

def restore(self, value):
if self._buffer == None: self._buffer = []
self._buffer.append(value)

def await_wrap(generator):
return AwaitWrapper(generator)

Expand Down
15 changes: 10 additions & 5 deletions src/appier/base.py
Expand Up @@ -1109,13 +1109,18 @@ def application_l(self, environ, start_response):
# it so that it may be used for length evaluation (protocol definition)
# at this stage it's possible to have an exception raised for a non
# existent file or any other pre validation based problem
is_async_generator = legacy.is_async_generator(result)
is_coroutine = asynchronous.is_coroutine_native(result)
is_generator, result = asynchronous.ensure_generator(result)
if is_async_generator: first = -1
elif is_coroutine: first = -1
elif is_generator: first = next(result)
if is_generator: first = next(result)
else: first = None

# tries to determine if the first element of the generator (if existent)
# is valid and if that's not the case tries to find a fallback
is_valid = first == None or type(first) in legacy.INTEGERS
if not is_valid:
if hasattr(result, "restore"): result.restore(first); first = -1
else: raise exceptions.OperationalError(
message = "No message size defined for generator"
)
except BaseException as exception:
# resets the values associated with the generator based strategy so
# that the error/exception is handled in the proper (non generator)
Expand Down

0 comments on commit 204cc6c

Please sign in to comment.