Skip to content

Commit

Permalink
Better asyncio support
Browse files Browse the repository at this point in the history
  • Loading branch information
joamag committed Nov 12, 2019
1 parent edad505 commit 828c27a
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 29 deletions.
28 changes: 13 additions & 15 deletions examples/async/async_neo.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,6 @@ def __init__(self, *args, **kwargs):
*args, **kwargs
)

@appier.route("/async/tobias", "GET")
async def tobias(self):
import asyncio
await asyncio.sleep(2)
await self.request_ctx.send(b"hello tobias1\n")
await self.request_ctx.send(b"hello tobias2\n")
await self.request_ctx.send(b"hello tobias3\n")

@appier.route("/async", "GET")
@appier.route("/async/hello", "GET")
async def hello(self):
Expand All @@ -68,6 +60,17 @@ async def hello(self):
await handler()
yield "after\n"

@appier.route("/async/sender", "GET")
async def sender(self):
import asyncio
sleep = self.field("sleep", 1.0, cast = float)
await asyncio.sleep(sleep)
await self.request_ctx.send(b"Sender (1)\n")
await asyncio.sleep(sleep)
await self.request_ctx.send(b"Sender (2)\n")
await asyncio.sleep(sleep)
await self.request_ctx.send(b"Sender (3)\n")

@appier.route("/async/callable", "GET")
async def callable(self):
sleep = self.field("sleep", 3.0, cast = float)
Expand Down Expand Up @@ -133,10 +136,5 @@ async def read_file(self, file_path, chunk = 65536, delay = 0.0):
file.close()
return count

import uvicorn
app_asgi = appier.build_asgi(AsyncNeoApp)
uvicorn.run(app_asgi)


#app = AsyncNeoApp()
#app.serve()
app = AsyncNeoApp()
app.serve()
2 changes: 1 addition & 1 deletion src/appier/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@
from .base import APP, LEVEL, NAME, VERSION, PLATFORM, IDENTIFIER_SHORT, IDENTIFIER_LONG, IDENTIFIER,\
API_VERSION, BUFFER_SIZE, MAX_LOG_SIZE, MAX_LOG_COUNT, App, APIApp, WebApp, Template, get_app, get_name,\
get_base_path, get_cache, get_preferences, get_bus, get_request, get_session, get_model, get_controller, get_part,\
get_adapter, get_manager, get_logger, get_level, is_loaded, is_devel, is_safe, to_locale, on_exit, build_asgi
get_adapter, get_manager, get_logger, get_level, is_loaded, is_devel, is_safe, to_locale, on_exit, build_asgi, build_asgi_i
from .bus import Bus, MemoryBus, RedisBus
from .cache import Cache, MemoryCache, FileCache, RedisCache, SerializedCache
from .component import Component
Expand Down
37 changes: 28 additions & 9 deletions src/appier/asgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ async def asgi_entry(cls, scope, receive, send):
cls._asgi = cls()
return await cls._asgi.app_asgi(scope, receive, send)

def serve_uvicorn(self, host, port, **kwargs):
import uvicorn
reload = kwargs.get("reload", False)
app_asgi = build_asgi_i(self)
uvicorn.run(app_asgi, host = host, port = port, reload=reload)

async def app_asgi(self, *args, **kwargs):
return await self.application_asgi(*args, **kwargs)

Expand Down Expand Up @@ -102,21 +108,29 @@ async def asgi_lifespan(self, scope, receive, send):

async def asgi_http(self, scope, receive, send):
try:
ctx = dict(send_task = None)
body = await self._build_body( receive)
ctx = dict(start_task = None)
body = await self._build_body(receive)
environ = await self._build_environ(scope, body)
start_response = await self._build_start_response(ctx, send)
sender = await self._build_sender(ctx, send, start_response)

self.prepare()
try:
result = self.application_l(environ, start_response, sender = sender)
self._request_ctx.set(self.request) #@todo this is a great trick
self.set_request_ctx()
finally:
self.restore()

# verifies if the resulting value is an awaitable and if
# that's the case waits for it's "real" result value (async)
if inspect.isawaitable(result): result = await result
await ctx["send_task"]

# waits for the start (code and headers) send operation to be
# completed (async) so that we can proceed with body sending
await ctx["start_task"]

# iterates over the complete set of chunks in the response
# iterator to send each of them to the client side
for chunk in (result if result else [b""]):
if asyncio.iscoroutine(chunk):
await chunk
Expand All @@ -132,11 +146,11 @@ async def asgi_http(self, scope, receive, send):
"body" : chunk
})
finally:
self._request_ctx.set(None)
self.unset_request_ctx()

async def _build_start_response(self, ctx, send):
def start_response(status, headers):
if ctx["send_task"]: return
if ctx["start_task"]: return
code = status.split(" ", 1)[0]
code = int(code)
headers = [
Expand All @@ -148,19 +162,19 @@ def start_response(status, headers):
"status" : code,
"headers" : headers
})
ctx["send_task"] = asyncio.create_task(send_coro)
ctx["start_task"] = asyncio.create_task(send_coro)
return start_response

async def _build_sender(self, ctx, send, start_response):
async def sender(data):
if not ctx["send_task"]:
if not ctx["start_task"]:
start_response(
"200 OK",
[
("Content-Type", "text/plain")
]
)
await ctx["send_task"]
await ctx["start_task"]
return await send({
"type" : "http.response.body",
"body" : data,
Expand Down Expand Up @@ -237,3 +251,8 @@ def build_asgi(app_cls):
async def app_asgi(scope, receive, send):
return await app_cls.asgi_entry(scope, receive, send)
return app_asgi

def build_asgi_i(app):
async def app_asgi(scope, receive, send):
return await app.app_asgi(scope, receive, send)
return app_asgi
25 changes: 21 additions & 4 deletions src/appier/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@
from . import preferences
from . import asynchronous

try: import contextvars
except ImportError: contextvars = None

APP = None
""" The global reference to the application object this
should be a singleton object and so no multiple instances
Expand Down Expand Up @@ -315,8 +318,10 @@
from . import asgi
EXTRA_CLS.append(asgi.ASGIApp)
build_asgi = asgi.build_asgi
build_asgi_i = asgi.build_asgi_i
else:
build_asgi = None
build_asgi_i = None

class App(
legacy.with_meta(
Expand Down Expand Up @@ -441,8 +446,7 @@ def __init__(
self.lib_loaders = {}
self.parts_l = []
self.parts_m = {}
import contextvars #@todo this is very usefull but tricky
self._request_ctx = contextvars.ContextVar("request")
self._request_ctx = contextvars.ContextVar("request") if contextvars else None
self._loaded = False
self._resolved = False
self._locale_d = locales[0]
Expand Down Expand Up @@ -474,7 +478,10 @@ def request(self):

@property
def request_ctx(self):
return self._request_ctx.get()
if not self._request_ctx: return self.request
request = self._request_ctx.get(None)
if request == None: return self.request
return request

@property
def locale(self):
Expand Down Expand Up @@ -1414,7 +1421,10 @@ def application_l(self, environ, start_response, sender = None):
params = util.decode_params(params)
self.request.set_params(params)

self.request.send = sender #@todo make this a little bit better
# sets the send operation that allows an async sending of
# data to the client side (important for asyncio)
self.request.send = sender
self.request.write = sender #@todo review this

# reads the data from the input stream file and then tries
# to load the data appropriately handling all normal cases
Expand Down Expand Up @@ -3165,6 +3175,13 @@ def get_field(
if cast and not value in (None, ""): value = cast(value)
return value

def set_request_ctx(self, request = None):
request = request or self.request
self._request_ctx.set(request)

def unset_request_ctx(self):
self._request_ctx.set(None)

def set_field(self, name, value, request = None):
request = request or self.request
request.args[name] = [value]
Expand Down

0 comments on commit 828c27a

Please sign in to comment.