-
Notifications
You must be signed in to change notification settings - Fork 13
/
_manager.py
103 lines (87 loc) · 3.81 KB
/
_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import typing
from types import TracebackType
from ._compat import AsyncExitStack
from ._concurrency import detect_concurrency_backend
from ._exceptions import LifespanNotSupported
from ._types import ASGIApp, Message, Scope
class LifespanManager:
    """Drive an ASGI application's lifespan protocol as an async context manager.

    Entering the context starts the application in the background with a
    ``{"type": "lifespan"}`` scope, sends it ``lifespan.startup`` and waits
    for ``lifespan.startup.complete``. Leaving the context without an error
    sends ``lifespan.shutdown`` and waits for ``lifespan.shutdown.complete``.
    """

    def __init__(
        self,
        app: ASGIApp,
        startup_timeout: typing.Optional[float] = 5,
        shutdown_timeout: typing.Optional[float] = 5,
    ) -> None:
        """Prepare the events, queue and exit stack used to talk to ``app``.

        Args:
            app: The ASGI application to run with a lifespan scope.
            startup_timeout: Value handed to the backend's
                ``run_and_fail_after`` while waiting for startup to complete.
            shutdown_timeout: Value handed to the backend's
                ``run_and_fail_after`` while waiting for shutdown to complete.

        NOTE(review): ``None`` presumably disables the timeout; that is
        decided by the concurrency backend, not by this class -- confirm.
        """
        self.app = app
        self.startup_timeout = startup_timeout
        self.shutdown_timeout = shutdown_timeout
        self._concurrency_backend = detect_concurrency_backend()
        self._startup_complete = self._concurrency_backend.create_event()
        self._shutdown_complete = self._concurrency_backend.create_event()
        # This class only ever enqueues two messages: one from startup() and
        # one from shutdown().
        self._receive_queue = self._concurrency_backend.create_queue(capacity=2)
        self._receive_called = False
        self._app_exception: typing.Optional[BaseException] = None
        self._exit_stack = AsyncExitStack()

    async def startup(self) -> None:
        """Send ``lifespan.startup`` and wait until the app reports completion.

        Raises:
            BaseException: The exception recorded by ``run_app`` if the
                application crashed while this method was waiting.
        """
        await self._receive_queue.put({"type": "lifespan.startup"})
        await self._concurrency_backend.run_and_fail_after(
            self.startup_timeout, self._startup_complete.wait
        )
        if self._app_exception is not None:
            # Let the caller deal with the exception.
            raise self._app_exception

    async def shutdown(self) -> None:
        """Send ``lifespan.shutdown`` and wait until the app reports completion."""
        await self._receive_queue.put({"type": "lifespan.shutdown"})
        await self._concurrency_backend.run_and_fail_after(
            self.shutdown_timeout, self._shutdown_complete.wait
        )

    async def receive(self) -> Message:
        """ASGI ``receive`` callable: return the next queued lifespan message."""
        # Remember that the app got this far; send() and run_app() use this
        # flag to tell "lifespan not supported" apart from other failures.
        self._receive_called = True
        return await self._receive_queue.get()

    async def send(self, message: Message) -> None:
        """ASGI ``send`` callable: record startup/shutdown completion.

        Raises:
            LifespanNotSupported: If the application sends a message before
                it has ever called ``receive()``.
        """
        if not self._receive_called:
            raise LifespanNotSupported(
                "Application called send() before receive(). "
                "Is it missing `assert scope['type'] == 'http'` or similar?"
            )

        # NOTE(review): any other message type -- including the ASGI
        # 'lifespan.startup.failed' / 'lifespan.shutdown.failed' messages --
        # is silently ignored here.
        if message["type"] == "lifespan.startup.complete":
            self._startup_complete.set()
        elif message["type"] == "lifespan.shutdown.complete":
            self._shutdown_complete.set()

    async def run_app(self) -> None:
        """Call the application with a lifespan scope and track crashes.

        Raises:
            LifespanNotSupported: If the application raised before its first
                call to ``receive()``; the original error is the ``__cause__``.
            BaseException: Any other exception raised by the application,
                re-raised unchanged after being stored on the manager.
        """
        scope: Scope = {"type": "lifespan"}

        try:
            await self.app(scope, self.receive, self.send)
        except BaseException as exc:
            self._app_exception = exc

            # We crashed, so don't make '.startup()' and '.shutdown()'
            # wait unnecessarily (or they'll time out).
            self._startup_complete.set()
            self._shutdown_complete.set()

            if not self._receive_called:
                raise LifespanNotSupported(
                    "Application failed before making its first call to 'receive()'. "
                    "We expect this to originate from a statement similar to "
                    "`assert scope['type'] == 'http'`. "
                    "If that is not the case, then this crash is unexpected and "
                    "there is probably more debug output in the cause traceback."
                ) from exc

            raise

    async def __aenter__(self) -> "LifespanManager":
        """Start the application in the background and run its startup.

        Returns:
            This manager, so ``async with LifespanManager(app) as manager``
            binds a usable object.
        """
        await self._exit_stack.__aenter__()
        await self._exit_stack.enter_async_context(
            self._concurrency_backend.run_in_background(self.run_app)
        )
        try:
            await self.startup()
        except BaseException:
            # Startup failed: unwind whatever was entered so far, then let
            # the original error propagate.
            await self._exit_stack.aclose()
            raise
        return self

    async def __aexit__(
        self,
        exc_type: typing.Optional[typing.Type[BaseException]] = None,
        exc_value: typing.Optional[BaseException] = None,
        traceback: typing.Optional[TracebackType] = None,
    ) -> typing.Optional[bool]:
        """Shut the application down (on clean exit) and unwind the exit stack.

        Returns:
            Whatever the underlying exit stack's ``__aexit__`` returns.
        """
        if exc_type is None:
            # Only ask the app to shut down gracefully when the body did not
            # raise. Registered last, so -- assuming the stack unwinds in
            # LIFO order like contextlib's -- it runs before the background
            # task's context is exited.
            self._exit_stack.push_async_callback(self.shutdown)
        return await self._exit_stack.__aexit__(exc_type, exc_value, traceback)