-
Notifications
You must be signed in to change notification settings - Fork 1
/
app.py
87 lines (65 loc) · 2.56 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
"""One solution: Use a decorator to poll for the disconnect"""
import asyncio
from functools import wraps
from typing import Any, Awaitable, Callable
from fastapi import FastAPI, Query, Request, HTTPException
app = FastAPI(title="Disconnect example")
async def disconnect_poller(request: Request, result: Any):
"""
Poll for a disconnect.
If the request disconnects, stop polling and return.
"""
try:
while not await request.is_disconnected():
await asyncio.sleep(0.01)
print("Request disconnected")
return result
except asyncio.CancelledError:
print("Stopping polling loop")
def cancel_on_disconnect(handler: Callable[[Request], Awaitable[Any]]):
"""
Decorator that will check if the client disconnects,
and cancel the task if required.
"""
@wraps(handler)
async def cancel_on_disconnect_decorator(request: Request, *args, **kwargs):
sentinel = object()
# Create two tasks, one to poll the request and check if the
# client disconnected, and another which is the request handler
poller_task = asyncio.ensure_future(disconnect_poller(request, sentinel))
handler_task = asyncio.ensure_future(handler(request, *args, **kwargs))
done, pending = await asyncio.wait(
[poller_task, handler_task], return_when=asyncio.FIRST_COMPLETED
)
# Cancel any outstanding tasks
for t in pending:
t.cancel()
try:
await t
except asyncio.CancelledError:
print(f"{t} was cancelled")
except Exception as exc:
print(f"{t} raised {exc} when being cancelled")
# Return the result if the handler finished first
if handler_task in done:
return await handler_task
# Otherwise, raise an exception
# This is not exactly needed, but it will prevent
# validation errors if your request handler is supposed
# to return something.
print("Raising an HTTP error because I was disconnected!!")
raise HTTPException(503)
return cancel_on_disconnect_decorator
@app.get("/example")
@cancel_on_disconnect
async def example(
request: Request,
wait: float = Query(..., description="Time to wait, in seconds"),
):
try:
print(f"Sleeping for {wait:.2f}")
await asyncio.sleep(wait)
print("Sleep not cancelled")
return f"I waited for {wait:.2f}s and now this is the result"
except asyncio.CancelledError:
print("Exiting on cancellation")