Skip to content

Commit

Permalink
Merge branch 'release/2.2.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
Anders Jensen committed Feb 18, 2020
2 parents 8f44f83 + fc4ea71 commit 160654c
Show file tree
Hide file tree
Showing 13 changed files with 422 additions and 228 deletions.
6 changes: 6 additions & 0 deletions .isort.cfg
@@ -0,0 +1,6 @@
[settings]
multi_line_output=3
include_trailing_comma=True
force_grid_wrap=0
use_parentheses=True
line_length=88
5 changes: 5 additions & 0 deletions CHANGELOG.rst
@@ -1,6 +1,11 @@
Changelog
=========

Version 2.2.0 (17-02-2020)
-----------------------------------------------------------

* Added support for asgi3

Version 2.1.7 (21-12-2018)
-----------------------------------------------------------

Expand Down
13 changes: 13 additions & 0 deletions README.rst
Expand Up @@ -40,6 +40,19 @@ As ASGI Protocol server on a different port and ip

twistd -n txasgi -a yourdjangoproject.asgi:channel_layer -d tcp:5566:interface=0.0.0.0

Supported specifications
------------------------

.. csv-table::
:header: "Specification", "Supported"

"asgi2", "Yes, through compatibility handler"
"asgi3", "Yes"
"HTTP", "Yes, v2.0"
"Websocket", "Yes, v2.0"
"Lifespan", "No"


Status
------

Expand Down
2 changes: 2 additions & 0 deletions setup.py
Expand Up @@ -16,11 +16,13 @@
author_email='johndoee@tidalstream.org',
description='ASGI implemented as a Twisted resource',
long_description=long_description,
long_description_content_type="text/x-rst",
license='MIT',
packages=['txasgiresource', 'txasgiresource.tests', 'twisted.plugins'],
install_requires=[
'twisted>=16.0',
'autobahn>=0.12',
'asgiref>=2.3.2'
],
classifiers=[
'Development Status :: 3 - Alpha',
Expand Down
36 changes: 23 additions & 13 deletions twisted/plugins/txasgi.py
@@ -1,5 +1,5 @@
import asyncio
import sys
import asyncio # isort:skip
import sys # isort:skip

from twisted.internet import asyncioreactor # isort:skip

Expand All @@ -8,26 +8,35 @@
asyncio.set_event_loop(loop)
asyncioreactor.install(loop)

import asyncio

import importlib


from zope.interface import implementer

from twisted.application.service import IServiceMaker, Service, MultiService
from twisted.internet import endpoints, reactor, defer, threads
from twisted.application.service import IServiceMaker, MultiService, Service
from twisted.internet import defer, endpoints, reactor, threads
from twisted.plugin import IPlugin
from twisted.python import usage
from twisted.web import server

from txasgiresource import ASGIResource


class Options(usage.Options):

optParameters = [
["application", "a", None, "Application"],
["description", "d", "tcp:8000:interface=127.0.0.1", "Twisted server description"],
["proxy_headers", "p", False, "Parse proxy header and use them to replace client ip"],
[
"description",
"d",
"tcp:8000:interface=127.0.0.1",
"Twisted server description",
],
[
"proxy_headers",
"p",
False,
"Parse proxy header and use them to replace client ip",
],
]


Expand All @@ -54,14 +63,15 @@ class ServiceMaker(object):
def makeService(self, options):
asyncio.set_event_loop(reactor._asyncioEventloop)

module, function = options['application'].split(':')
module, function = options["application"].split(":")
application = getattr(importlib.import_module(module), function)

ms = MultiService()

resource = ASGIResource(application)
ms.addService(ASGIService(resource, options['description']))
resource = ASGIResource(application, use_proxy_headers=options["proxy_headers"])
ms.addService(ASGIService(resource, options["description"]))

return ms

txasgi = ServiceMaker()

txasgi = ServiceMaker()
9 changes: 4 additions & 5 deletions txasgiresource/__init__.py
@@ -1,12 +1,11 @@
import sys

from .asgiresource import ASGIResource # NOQA

from twisted.internet import asyncioreactor # isort:skip

if "twisted.internet.reactor" not in sys.modules:
asyncioreactor.install()

try:
from .asgiresource import ASGIResource # NOQA
except ImportError:
pass

__version__ = '2.1.7'
__version__ = "2.2.0"
20 changes: 13 additions & 7 deletions txasgiresource/application.py
@@ -1,7 +1,7 @@
import asyncio
from concurrent.futures import CancelledError

from twisted.internet import defer, threads
from twisted.internet import defer


class ApplicationManager:
Expand All @@ -26,14 +26,10 @@ def create_application_instance(self, protocol, scope):
async def handle_reply(msg):
protocol.handle_reply(msg)

application_instance = self.application(scope)
queue = asyncio.Queue()

self.application_instances[protocol] = asyncio.ensure_future(
application_instance(
receive=queue.get,
send=handle_reply
)
self.application(scope=scope, receive=queue.get, send=handle_reply)
)

return queue
Expand All @@ -42,7 +38,17 @@ def finish_protocol(self, protocol):
wait_for = False
if protocol in self.application_instances:
if not self.application_instances[protocol].done():
self.application_instances[protocol].cancel()
if self.application_instances[protocol].cancel():

def handle_cancel_exception(f):
try:
f.exception()
except CancelledError:
pass

self.application_instances[protocol].add_done_callback(
handle_cancel_exception
)
wait_for = True
del self.application_instances[protocol]
return wait_for
106 changes: 64 additions & 42 deletions txasgiresource/asgiresource.py
@@ -1,7 +1,9 @@
import ipaddress
import logging

from asgiref.compatibility import guarantee_single_callable
from autobahn.twisted.resource import WebSocketResource

from twisted.web import resource, server

from .application import ApplicationManager
Expand All @@ -14,19 +16,22 @@
class ASGIResource(resource.Resource):
isLeaf = True

def __init__(self,
application,
root_path='',
http_timeout=120,
websocket_timeout=86400,
ping_interval=20,
ping_timeout=30,
ws_protocols=None,
use_proxy_headers=False,
use_proxy_proto_header=False,
automatic_proxy_header_handling=False, # ignores use_proxy_headers and use_proxy_proto_header
use_x_sendfile=False):
self.application = ApplicationManager(application)
def __init__(
self,
application,
root_path="",
http_timeout=120,
websocket_timeout=86400,
ping_interval=20,
ping_timeout=30,
ws_protocols=None,
use_proxy_headers=False,
use_proxy_proto_header=False,
automatic_proxy_header_handling=False, # ignores use_proxy_headers and use_proxy_proto_header
use_x_sendfile=False,
):

self.application = ApplicationManager(guarantee_single_callable(application))
self.root_path = root_path

self.http_timeout = http_timeout
Expand All @@ -45,30 +50,35 @@ def stop(self):
return self.application.stop()

def dispatch_websocket(self, request, base_scope):
wsfactory = ASGIWebSocketServerFactory(application=self.application,
base_scope=base_scope,
idle_timeout=self.websocket_timeout,
protocols=self.ws_protocols)

wsfactory.setProtocolOptions(autoPingInterval=self.ping_interval,
autoPingTimeout=self.ping_timeout)
wsfactory = ASGIWebSocketServerFactory(
application=self.application,
base_scope=base_scope,
idle_timeout=self.websocket_timeout,
protocols=self.ws_protocols,
)

wsfactory.setProtocolOptions(
autoPingInterval=self.ping_interval, autoPingTimeout=self.ping_timeout
)
wsfactory.startFactory()
return WebSocketResource(wsfactory).render(request)

def dispatch_http(self, request, base_scope):
return ASGIHTTPResource(application=self.application,
base_scope=base_scope,
timeout=self.http_timeout,
use_x_sendfile=self.use_x_sendfile).render(request)
return ASGIHTTPResource(
application=self.application,
base_scope=base_scope,
timeout=self.http_timeout,
use_x_sendfile=self.use_x_sendfile,
).render(request)

def render(self, request):
path = [b''] + request.postpath
path = '/'.join(p.decode('utf-8') for p in path)
path = [b""] + request.postpath
path = "/".join(p.decode("utf-8") for p in path)

if b'?' in request.uri:
query_string = request.uri.split(b'?', 1)[1]
if b"?" in request.uri:
query_string = request.uri.split(b"?", 1)[1]
else:
query_string = ''
query_string = ""

is_websocket = False
headers = []
Expand All @@ -80,10 +90,10 @@ def render(self, request):
name = name.lower()
for value in values:
headers.append([name, value])
if name == b'upgrade' and value.lower() == b'websocket':
if name == b"upgrade" and value.lower() == b"websocket":
is_websocket = True

if hasattr(request.client, 'host') and hasattr(request.client, 'port'):
if hasattr(request.client, "host") and hasattr(request.client, "port"):
client_info = [request.client.host, request.client.port]
server_info = [request.host.host, request.host.port]
else:
Expand All @@ -103,8 +113,16 @@ def render(self, request):
use_proxy_proto_header = True

if use_proxy_headers:
proxy_forwarded_host = request.requestHeaders.getRawHeaders(b"x-forwarded-for", [b""])[0].split(b",")[0].strip()
proxy_forwarded_port = request.requestHeaders.getRawHeaders(b"x-forwarded-port", [b""])[0].split(b",")[0].strip()
proxy_forwarded_host = (
request.requestHeaders.getRawHeaders(b"x-forwarded-for", [b""])[0]
.split(b",")[0]
.strip()
)
proxy_forwarded_port = (
request.requestHeaders.getRawHeaders(b"x-forwarded-port", [b""])[0]
.split(b",")[0]
.strip()
)

if proxy_forwarded_host:
port = 0
Expand All @@ -114,20 +132,24 @@ def render(self, request):
except ValueError:
pass

client_info = [proxy_forwarded_host.decode('utf-8'), port]
client_info = [proxy_forwarded_host.decode("utf-8"), port]

if use_proxy_proto_header:
headers.append([b'x-forwarded-proto', b'http%s' % (request.isSecure() and b's' or b'')])
headers.append(
[b"x-forwarded-proto", b"http%s" % (request.isSecure() and b"s" or b"")]
)

# build base payload used by both websocket and normal as handshake
base_scope = {
'path': path,
'query_string': query_string,
'root_path': self.root_path,
'headers': headers,
'client': client_info,
'server': server_info,
'_ssl': request.isSecure() and 's' or '',
"asgi": {"version": "3.0", "spec_version": "2.0"},
"path": path,
"raw_path": request.uri,
"query_string": query_string,
"root_path": self.root_path,
"headers": headers,
"client": client_info,
"server": server_info,
"_ssl": request.isSecure() and "s" or "",
}

if is_websocket:
Expand Down

0 comments on commit 160654c

Please sign in to comment.