Skip to content

Commit

Permalink
Merge 30bf81a into a407f87
Browse files Browse the repository at this point in the history
  • Loading branch information
joamag committed Nov 12, 2019
2 parents a407f87 + 30bf81a commit 9a5ceba
Show file tree
Hide file tree
Showing 6 changed files with 250 additions and 25 deletions.
12 changes: 12 additions & 0 deletions examples/async/async_neo.py
Expand Up @@ -60,6 +60,18 @@ async def hello(self):
await handler()
yield "after\n"

@appier.route("/async/sender", "GET")
async def sender(self):
import asyncio
sleep = self.field("sleep", 1.0, cast = float)
self.request_ctx.set_content_type("text/plain")
await asyncio.sleep(sleep)
await self.request_ctx.send(b"Sender (1)\n")
await asyncio.sleep(sleep)
await self.request_ctx.send(b"Sender (2)\n")
await asyncio.sleep(sleep)
await self.request_ctx.send(b"Sender (3)\n")

@appier.route("/async/callable", "GET")
async def callable(self):
sleep = self.field("sleep", 3.0, cast = float)
Expand Down
2 changes: 1 addition & 1 deletion src/appier/__init__.py
Expand Up @@ -86,7 +86,7 @@
from .base import APP, LEVEL, NAME, VERSION, PLATFORM, IDENTIFIER_SHORT, IDENTIFIER_LONG, IDENTIFIER,\
API_VERSION, BUFFER_SIZE, MAX_LOG_SIZE, MAX_LOG_COUNT, App, APIApp, WebApp, Template, get_app, get_name,\
get_base_path, get_cache, get_preferences, get_bus, get_request, get_session, get_model, get_controller, get_part,\
get_adapter, get_manager, get_logger, get_level, is_loaded, is_devel, is_safe, to_locale, on_exit, build_asgi
get_adapter, get_manager, get_logger, get_level, is_loaded, is_devel, is_safe, to_locale, on_exit, build_asgi, build_asgi_i
from .bus import Bus, MemoryBus, RedisBus
from .cache import Cache, MemoryCache, FileCache, RedisCache, SerializedCache
from .component import Component
Expand Down
180 changes: 169 additions & 11 deletions src/appier/asgi.py
Expand Up @@ -37,6 +37,13 @@
__license__ = "Apache License, Version 2.0"
""" The license for the module """

import io
import asyncio
import inspect
import tempfile

from . import util
from . import legacy
from . import exceptions

class ASGIApp(object):
Expand All @@ -48,6 +55,20 @@ async def asgi_entry(cls, scope, receive, send):
cls._asgi = cls()
return await cls._asgi.app_asgi(scope, receive, send)

def serve_uvicorn(self, host, port, **kwargs):
import uvicorn
reload = kwargs.get("reload", False)
app_asgi = build_asgi_i(self)
uvicorn.run(app_asgi, host = host, port = port, reload=reload)

def serve_hypercorn(self, host, port, **kwargs):
import hypercorn.config
import hypercorn.asyncio
config = hypercorn.config.Config()
config.bind = ["%s:%d" % (host, port)]
app_asgi = build_asgi_i(self)
asyncio.run(hypercorn.asyncio.serve(app_asgi, config))

async def app_asgi(self, *args, **kwargs):
return await self.application_asgi(*args, **kwargs)

Expand Down Expand Up @@ -94,23 +115,160 @@ async def asgi_lifespan(self, scope, receive, send):
running = False

async def asgi_http(self, scope, receive, send):
self.prepare()
try:
await send({
"type": "http.response.start",
"status" : 200,
"headers" : [
[b"content-type", b"text/plain"],
]
# creates the context dictionary so that this new "pseudo" request
# can have its own context for futures placement
ctx = dict(start_task = None)

# runs the asynchronous building of the intermediate structures
# to get to the final WSGI compliant environment dictionary
start_response = await self._build_start_response(ctx, send)
sender = await self._build_sender(ctx, send, start_response)
body = await self._build_body(receive)
environ = await self._build_environ(scope, body, sender)

self.prepare()
try:
result = self.application_l(environ, start_response, ensure_gen = False)
self.set_request_ctx()
finally:
self.restore()

# verifies if the resulting value is an awaitable and if
# that's the case waits for it's "real" result value (async)
if inspect.isawaitable(result): result = await result

# waits for the start (code and headers) send operation to be
# completed (async) so that we can proceed with body sending
await ctx["start_task"]

# iterates over the complete set of chunks in the response
# iterator to send each of them to the client side
for chunk in (result if result else [b""]):
if asyncio.iscoroutine(chunk):
await chunk
elif asyncio.isfuture(chunk):
await chunk
elif isinstance(chunk, int):
continue
else:
if legacy.is_string(chunk):
chunk = chunk.encode("utf-8")
await send({
"type" : "http.response.body",
"body" : chunk
})
finally:
self.unset_request_ctx()

async def _build_start_response(self, ctx, send):
def start_response(status, headers):
if ctx["start_task"]: return
code = status.split(" ", 1)[0]
code = int(code)
headers = [
(name.lower().encode("ascii"), value.encode("ascii"))
for name, value in headers
]
send_coro = send({
"type" : "http.response.start",
"status" : code,
"headers" : headers
})
await send({
ctx["start_task"] = asyncio.create_task(send_coro)
return start_response

async def _build_sender(self, ctx, send, start_response):
async def sender(data):
if not ctx["start_task"]:
self.request_ctx.set_headers_b()
code_s = self.request_ctx.get_code_s()
headers = self.request_ctx.get_headers() or []
if self.sort_headers: headers.sort()
start_response(code_s, headers)
await ctx["start_task"]
return await send({
"type" : "http.response.body",
"body" : b"Hello, world!",
"body" : data,
"more_body" : True
})
finally:
self.restore()
return sender

async def _build_body(self, receive):
with tempfile.SpooledTemporaryFile(max_size = 65536) as body:
while True:
message = await receive()
util.verify(message["type"] == "http.request")
body.write(message.get("body", b""))
if not message.get("more_body"): break
body.seek(0)
return body

async def _build_environ(self, scope, body, sender):
"""
Builds a scope and request body into a WSGI environ object.
:type scope: Dictionary
:param scope: The scope dictionary from ASGI.
:type: body: File
:param body: The body callable to be used for the reading
of the input.
:type sender: Function
:param sender: The sender function responsible for the sending
of data to the client side (reponse).
:rtype: Dictionary
:return: The WSGI compatible environ dictionary converted
from ASGI and ready to be used by WSGI apps.
"""

environ = {
"REQUEST_METHOD": scope["method"],
"SCRIPT_NAME": scope.get("root_path", ""),
"PATH_INFO": scope["path"],
"QUERY_STRING": scope["query_string"].decode("ascii"),
"SERVER_PROTOCOL": "HTTP/%s" % scope["http_version"],
"wsgi.version": (1, 0),
"wsgi.url_scheme": scope.get("scheme", "http"),
"wsgi.input": body,
"wsgi.output": sender,
"wsgi.errors": io.BytesIO(),
"wsgi.multithread": True,
"wsgi.multiprocess": True,
"wsgi.run_once": False,
}

if "server" in scope:
environ["SERVER_NAME"] = scope["server"][0]
environ["SERVER_PORT"] = str(scope["server"][1])
else:
environ["SERVER_NAME"] = "localhost"
environ["SERVER_PORT"] = "80"

if "client" in scope:
environ["REMOTE_ADDR"] = scope["client"][0]

for name, value in scope.get("headers", []):
name = name.decode("latin1")
if name == "content-length":
corrected_name = "CONTENT_LENGTH"
elif name == "content-type":
corrected_name = "CONTENT_TYPE"
else:
corrected_name = "HTTP_%s" % name.upper().replace("-", "_")

value = value.decode("latin1")
if corrected_name in environ:
value = environ[corrected_name] + "," + value
environ[corrected_name] = value

return environ

def build_asgi(app_cls):
async def app_asgi(scope, receive, send):
return await app_cls.asgi_entry(scope, receive, send)
return app_asgi

def build_asgi_i(app):
async def app_asgi(scope, receive, send):
return await app.app_asgi(scope, receive, send)
return app_asgi
6 changes: 4 additions & 2 deletions src/appier/async_neo.py
Expand Up @@ -108,8 +108,10 @@ def __iter__(self):
def __next__(self):
if self._buffer: return self._buffer.pop(0)
try:
if self.current == None: self.current = self.async_iter.asend(None)
try: return next(self.current)
if self.current == None:
self.current = self.async_iter.asend(None)
try:
return next(self.current)
except StopIteration as exception:
self.current = None
if not exception.args: return None
Expand Down

0 comments on commit 9a5ceba

Please sign in to comment.