-
Notifications
You must be signed in to change notification settings - Fork 0
/
txasgi.py
77 lines (55 loc) · 1.99 KB
/
txasgi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import asyncio # isort:skip
import sys # isort:skip
from twisted.internet import asyncioreactor # isort:skip
# The asyncio-backed reactor is installed here, ahead of the
# ``from twisted.internet import ... reactor`` import further down, and only
# when no reactor module has been loaded yet.  A dedicated event loop is
# created, made the current asyncio loop, and handed to the reactor.
if "twisted.internet.reactor" not in sys.modules:
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    asyncioreactor.install(loop)
import importlib
from zope.interface import implementer
from twisted.application.service import IServiceMaker, MultiService, Service
from twisted.internet import defer, endpoints, reactor, threads
from twisted.plugin import IPlugin
from twisted.python import usage
from twisted.web import server
from txasgiresource import ASGIResource
class Options(usage.Options):
    """Command-line parameters understood by the ``txasgi`` plugin."""

    # Every row reads: [long name, short name, default value, help text].
    optParameters = [
        # Consumed by ``ServiceMaker.makeService`` as "module:attribute".
        ["application", "a", None, "Application"],
        # Endpoint description string handed to ``serverFromString``.
        ["description", "d", "tcp:8000:interface=127.0.0.1", "Twisted server description"],
        # Forwarded to ``ASGIResource`` as ``use_proxy_headers``.
        ["proxy_headers", "p", False, "Parse proxy header and use them to replace client ip"],
    ]
class ASGIService(Service):
    """Service that serves a web resource on a Twisted server endpoint.

    Attributes are plain instance fields:
      * ``resource``    - the resource wrapped in a ``server.Site``; it must
                          offer a ``stop()`` method, called on shutdown.
      * ``description`` - endpoint description string for ``serverFromString``.
      * ``endpoint``    - the parsed server endpoint (``None`` until started).
      * ``port``        - the listening port returned by ``listen`` (``None``
                          while not listening).
    """

    def __init__(self, resource, description):
        """Remember the resource to serve and where to listen.

        Args:
            resource: Resource object to publish; also stopped on shutdown.
            description: Endpoint description, e.g. ``"tcp:8000"``.
        """
        self.resource = resource
        self.description = description
        self.endpoint = None
        self.port = None

    @defer.inlineCallbacks
    def startService(self):
        """Start listening on the configured endpoint.

        Returns:
            A Deferred that fires once the endpoint is listening, or fails
            with the error reported by ``listen`` (for example when the
            address is already in use).
        """
        # Let the base class flip ``running`` so the service state is accurate.
        Service.startService(self)
        # ``serverFromString`` hands back the endpoint directly; only
        # ``listen`` is asynchronous.
        self.endpoint = endpoints.serverFromString(reactor, self.description)
        # Wait for the listening port and keep it so stopService can close it.
        self.port = yield self.endpoint.listen(server.Site(self.resource))

    def stopService(self):
        """Stop accepting connections and stop the resource.

        Returns:
            A Deferred that fires when the listening port (if any) has been
            closed.
        """
        Service.stopService(self)
        # Detach the port first so a repeated stop does not close it twice.
        port, self.port = self.port, None
        if port is None:
            stopped = defer.succeed(None)
        else:
            stopped = defer.maybeDeferred(port.stopListening)
        # Same call the service has always made on shutdown; its return value
        # is intentionally not awaited, matching the previous behaviour.
        self.resource.stop()
        return stopped
@implementer(IServiceMaker, IPlugin)
class ServiceMaker(object):
    """Service maker that exposes the ASGI server under the ``txasgi`` tap name."""

    tapname = "txasgi"
    description = "ASGI Server"
    options = Options

    def makeService(self, options):
        """Build the service tree serving the configured ASGI application.

        Args:
            options: Mapping of parsed options.  Reads ``application``
                (a ``"module:attribute"`` string), ``description`` (endpoint
                description) and ``proxy_headers``.

        Returns:
            A ``MultiService`` containing a single ``ASGIService``.

        Raises:
            ValueError: If ``application`` is missing or is not of the form
                ``module:attribute``.
            ImportError: If the module part cannot be imported.
            AttributeError: If the module has no such attribute.
        """
        target = options["application"]
        if not target:
            raise ValueError(
                "No application given; expected --application=module:attribute"
            )
        pieces = target.split(":")
        if len(pieces) != 2 or not all(pieces):
            raise ValueError(
                "Invalid application %r; expected module:attribute" % (target,)
            )
        module, function = pieces

        # Make the event loop held by the reactor the current asyncio loop
        # before the application module is imported.
        # NOTE(review): ``_asyncioEventloop`` is a private reactor attribute;
        # this presumes the asyncio reactor is the one installed - confirm.
        asyncio.set_event_loop(reactor._asyncioEventloop)

        application = getattr(importlib.import_module(module), function)

        ms = MultiService()
        # NOTE(review): ``proxy_headers`` is declared as a value-taking
        # parameter, so any non-empty value given on the command line is
        # passed through as a (truthy) string - confirm this is intended.
        resource = ASGIResource(application, use_proxy_headers=options["proxy_headers"])
        ms.addService(ASGIService(resource, options["description"]))
        return ms
# Module-level ServiceMaker instance; presumably this is the object Twisted's
# plugin discovery picks up for the "txasgi" tap name - confirm against the
# plugin package layout.
txasgi = ServiceMaker()