/
sanic.py
184 lines (147 loc) · 6.38 KB
/
sanic.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
import asyncio
import json
import uuid
from typing import Tuple, Any, Dict, Union, Optional, cast
from sanic import Blueprint, Sanic, request, response
from sanic_cors import CORS
from mypy_extensions import TypedDict
from websockets import WebSocketCommonProtocol
from idom.core.dispatcher import (
SingleStateDispatcher,
SharedStateDispatcher,
SendCoroutine,
RecvCoroutine,
)
from idom.core.layout import LayoutEvent
from idom.client.manage import find_path
from .base import AbstractRenderServer
class Config(TypedDict, total=False):
    """Configuration options for the Sanic render servers in this module.

    Declared with ``total=False``, so every key is optional as far as type
    checkers are concerned.
    """

    # Falsy disables CORS. ``True`` applies ``sanic_cors.CORS`` with no extra
    # arguments; a dict is forwarded to ``CORS`` as keyword arguments.
    cors: Union[bool, Dict[str, Any]]
    # Passed as ``url_prefix`` to the blueprint that holds the server's routes.
    url_prefix: Optional[str]
    # When true, a ``/client/<path:path>`` route is registered on the blueprint.
    server_static_files: bool
    # When true, a ``/`` route redirecting to the client's ``index.html`` is
    # registered on the blueprint.
    redirect_root_to_index: bool
class SanicRenderServer(AbstractRenderServer[Sanic, Config]):
    """Base ``sanic`` extension."""

    def stop(self) -> None:
        """Stop the Sanic application held in ``self.application``."""
        self.application.stop()

    def _init_config(self) -> Config:
        """Return the default configuration.

        CORS is disabled, no URL prefix is set, and both static file serving
        and the root-to-index redirect are enabled.
        """
        return Config(
            cors=False,
            url_prefix=None,
            server_static_files=True,
            redirect_root_to_index=True,
        )

    def _update_config(self, old: Config, new: Config) -> Config:
        """Merge ``new`` into ``old`` in place and return ``old``."""
        old.update(new)
        return old

    def _default_application(self, config: Config) -> Sanic:
        """Create a fresh ``Sanic`` application (``config`` is not consulted)."""
        return Sanic()

    def _setup_application(self, app: Sanic, config: Config) -> None:
        """Apply CORS (if configured) and register this server's blueprint.

        Parameters:
            app: The Sanic application to configure.
            config: The configuration whose ``cors`` and ``url_prefix`` keys
                are read here; the remaining keys are consumed by
                :meth:`_setup_blueprint_routes`.
        """
        cors_config = config["cors"]
        if cors_config:
            # ``cors=True`` applies CORS without extra arguments, while a dict
            # is forwarded as keyword arguments.
            cors_params = cors_config if isinstance(cors_config, dict) else {}
            CORS(app, **cors_params)

        # The blueprint name embeds ``id(self)`` so that each server instance
        # registers its routes under its own name.
        bp = Blueprint(f"idom_dispatcher_{id(self)}", url_prefix=config["url_prefix"])
        self._setup_blueprint_routes(bp, config)
        app.blueprint(bp)

    def _setup_blueprint_routes(self, blueprint: Blueprint, config: Config) -> None:
        """Add routes to the application blueprint

        Always registers the ``/stream`` websocket route. Depending on
        ``config`` it also registers ``/client/<path:path>`` (static client
        files) and ``/`` (redirect to the client's ``index.html``).

        Parameters:
            blueprint: The blueprint that receives the routes.
            config: Reads ``server_static_files``, ``redirect_root_to_index``
                and ``url_prefix``.
        """

        @blueprint.websocket("/stream")  # type: ignore
        async def model_stream(
            request: request.Request, socket: WebSocketCommonProtocol
        ) -> None:
            async def sock_send(value: Any) -> None:
                # Outgoing values are serialized as JSON text frames.
                await socket.send(json.dumps(value))

            async def sock_recv() -> LayoutEvent:
                # Incoming messages are JSON objects carrying the event under
                # ``body.event`` with ``target`` and ``data`` entries.
                message = json.loads(await socket.recv())
                event = message["body"]["event"]
                return LayoutEvent(event["target"], event["data"])

            # Query-string arguments are handed to the dispatcher as parameters.
            param_dict = {k: request.args.get(k) for k in request.args}
            await self._run_dispatcher(sock_send, sock_recv, param_dict)

        def handler_name(function: Any) -> str:
            # Name used with ``url_for`` for handlers registered on this blueprint.
            return f"{blueprint.name}.{function.__name__}"

        # Resolved once at setup time; stays ``None`` when this blueprint does
        # not register the static file route itself.
        index_handler: Optional[str] = None

        if config["server_static_files"]:

            @blueprint.route("/client/<path:path>")  # type: ignore
            async def client_files(
                request: request.Request, path: str
            ) -> response.HTTPResponse:
                # Only these file types are streamed; anything else, or a
                # path ``find_path`` cannot resolve, yields a 404.
                file_extensions = [".html", ".js", ".json"]
                abs_path = find_path(path)
                return (
                    (await response.file_stream(str(abs_path)))
                    if abs_path is not None and abs_path.suffix in file_extensions
                    else response.text(f"Could not find: {path!r}", status=404)
                )

            index_handler = handler_name(client_files)

        if config["redirect_root_to_index"]:
            # ``client_files`` only exists when static files are served, so the
            # redirect must not close over it directly: doing so raised a
            # ``NameError`` on every request to ``/`` when
            # ``server_static_files`` was disabled. In that case fall back to
            # the literal path the static route would have had.
            fallback_index = (
                f"{(config['url_prefix'] or '').rstrip('/')}/client/index.html"
            )

            @blueprint.route("/")  # type: ignore
            def redirect_to_index(request: request.Request) -> response.HTTPResponse:
                if index_handler is None:
                    return response.redirect(fallback_index)
                return response.redirect(
                    request.app.url_for(index_handler, path="index.html")
                )

    def _run_application(
        self, app: Sanic, config: Config, args: Tuple[Any, ...], kwargs: Dict[str, Any]
    ) -> None:
        """Run ``app``, blocking until it stops.

        When the server is not daemonized this simply calls ``app.run``.
        Otherwise the server is created on the current event loop and the loop
        is driven manually, with the server's start/stop hooks invoked by hand.

        Parameters:
            app: The Sanic application to run.
            config: Unused here.
            args: Positional arguments forwarded to ``app.run`` or
                ``app.create_server``.
            kwargs: Keyword arguments forwarded likewise.
        """
        if not self._daemonized:
            app.run(*args, **kwargs)
        else:
            # copied from:
            # https://github.com/huge-success/sanic/blob/master/examples/run_async_advanced.py
            serv_coro = app.create_server(*args, **kwargs, return_asyncio_server=True)
            loop = asyncio.get_event_loop()
            serv_task = asyncio.ensure_future(serv_coro, loop=loop)
            server = loop.run_until_complete(serv_task)
            server.after_start()
            try:
                loop.run_forever()
            except KeyboardInterrupt:
                loop.stop()
            finally:
                server.before_stop()

                # Wait for server to close
                close_task = server.close()
                loop.run_until_complete(close_task)

                # Complete all tasks on the loop
                for connection in server.connections:
                    connection.close_if_idle()
                server.after_stop()
class PerClientStateServer(SanicRenderServer):
    """Each client view will have its own state."""

    _dispatcher_type = SingleStateDispatcher

    async def _run_dispatcher(
        self,
        send: SendCoroutine,
        recv: RecvCoroutine,
        parameters: Dict[str, Any],
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        """Serve one client connection with a dispatcher of its own.

        A new dispatcher is made from ``parameters`` and ``loop`` for every
        call, entered as an async context manager, and run with the given
        ``send``/``recv`` coroutines until it finishes.
        """
        dispatcher_context = self._make_dispatcher(parameters, loop)
        async with dispatcher_context as client_dispatcher:
            await client_dispatcher.run(send, recv, None)
class SharedClientStateServer(SanicRenderServer):
    """All connected client views will have shared state."""

    _dispatcher_type = SharedStateDispatcher
    _dispatcher: SharedStateDispatcher

    def _setup_application(self, app: Sanic, config: Config) -> None:
        """Hook the shared dispatcher into the server lifecycle, then set up ``app``.

        The dispatcher is activated before the server starts and deactivated
        before it stops; the rest of the setup is delegated to the parent class.
        """
        lifecycle_hooks = (
            ("before_server_start", self._activate_dispatcher),
            ("before_server_stop", self._deactivate_dispatcher),
        )
        for event_name, hook in lifecycle_hooks:
            app.listener(event_name)(hook)
        super()._setup_application(app, config)

    async def _activate_dispatcher(
        self, app: Sanic, loop: asyncio.AbstractEventLoop
    ) -> None:
        """Create the single shared dispatcher (without parameters) and enter it."""
        shared = cast(SharedStateDispatcher, self._make_dispatcher({}, loop))
        self._dispatcher = shared
        await shared.__aenter__()

    async def _deactivate_dispatcher(
        self, app: Sanic, loop: asyncio.AbstractEventLoop
    ) -> None:  # pragma: no cover
        """Exit the shared dispatcher's context, reporting no exception."""
        # this doesn't seem to get triggered during testing for some reason
        no_exception = (None, None, None)
        await self._dispatcher.__aexit__(*no_exception)

    async def _run_dispatcher(
        self,
        send: SendCoroutine,
        recv: RecvCoroutine,
        parameters: Dict[str, Any],
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        """Attach one client connection to the shared dispatcher.

        Raises:
            ValueError: If ``parameters`` is non-empty, since per-client view
                parameters are not supported when state is shared.
        """
        if parameters:
            raise ValueError(
                f"SharedClientState server does not support per-client view parameters {parameters}"
            )
        # A fresh random hex string is passed for every connection
        # (presumably to tell connections apart - confirm against the
        # dispatcher's ``run`` signature).
        unique_id = uuid.uuid4().hex
        await self._dispatcher.run(send, recv, unique_id, join=True)