-
Notifications
You must be signed in to change notification settings - Fork 5
/
server.py
268 lines (216 loc) · 9.9 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
from __future__ import annotations
import asyncio
import logging
import pathlib
import shutil
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from json.decoder import JSONDecodeError
from typing import Optional
from uuid import uuid4
from weakref import WeakSet
from aiohttp import WSCloseCode, WSMsgType, web
from janus import Queue
from .custom_types import DownloadResult, UpdateMessage, UpdateStatusCode
from .downloader import AlreadyDownloaded, download
logger = logging.getLogger(__name__)
async def update_publisher(app: web.Application) -> None:
    """
    Background task that listens for downloader updates and publishes them to all connected websockets.

    This function also mangles the contained path names to avoid information leakage. An update that cannot be
    mangled is logged and dropped instead of terminating the task, so one bad message cannot silence every later
    update. Sends are best effort: each one runs as its own task, and a failed send is only logged at debug level.

    :param app: Reference to the overall application.
    """
    # The event loop only keeps weak references to tasks, so hold strong ones until each send finishes.
    pending_sends: set[asyncio.Task] = set()

    def on_send_done(task: asyncio.Task) -> None:
        pending_sends.discard(task)
        # Retrieve the exception (e.g. writing to a socket that has since closed) so asyncio does not report
        # "Task exception was never retrieved"; the websocket updates are best effort, not required.
        if not task.cancelled() and task.exception() is not None:
            logger.debug("Failed to send update to a websocket: %r", task.exception())

    def to_public(path: pathlib.Path) -> str:
        # Hide the full directory and instead substitute user-accessible path. If download_dir is
        # /var/www/html/downloads, result is in /var/www/html/downloads/Awesome, and download_prefix is
        # "/downloads", return /downloads/Awesome
        return (app["download_prefix"] / path.relative_to(app["download_dir"])).as_posix()

    def mangle(update: UpdateMessage) -> None:
        # Rewrites the update in place so that it is JSON serializable and leaks no server-side paths.
        status = update["status"]
        if status in (UpdateStatusCode.DOWNLOADING, UpdateStatusCode.DOWNLOADED):
            # Squash the directory where the file is being downloaded to
            update["filename"] = update["filename"].name
        elif status == UpdateStatusCode.COMPLETED:
            if update["video_file"]:
                update["video_file"] = to_public(update["video_file"])
            if update["audio_file"]:
                update["audio_file"] = to_public(update["audio_file"])
            update["info_file"] = to_public(update["info_file"])
            update["path"] = to_public(update["path"])
        update["status"] = status.name

    try:
        while True:
            update = await app["updates_queue"].async_q.get()
            try:
                mangle(update)
            except Exception:
                logger.exception("Dropping update that could not be prepared for publishing: %r", update)
                continue
            # Don't wait for the sends to finish; a slow client must not hold up the others.
            for ws in app["websockets"]:
                task = asyncio.create_task(ws.send_json(update))
                pending_sends.add(task)
                task.add_done_callback(on_send_done)
    except asyncio.CancelledError:
        # Cancellation is the normal shutdown path; swallow it so the awaiting cleanup hook returns cleanly.
        pass
async def websocket_handler(request: web.Request) -> web.WebSocketResponse:
    """
    Handler for "status" websockets. Sends a one-time list of available downloads, :func:`update_publisher` does rest.

    The socket is registered for broadcasts before the initial listing is built, so updates published meanwhile
    are not missed. It is always unregistered when the handler exits, including when building or sending the
    initial listing fails.

    :param request: The incoming (empty) request
    :return: :mod:`aiohttp` mandated response type
    """
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    websockets = request.app["websockets"]
    websockets.add(ws)
    try:
        download_dir = request.app["download_dir"]
        download_prefix = request.app["download_prefix"]
        # Every sub-directory of the download directory is one finished download.
        available_downloads = [
            {
                "path": (download_prefix / child.relative_to(download_dir)).as_posix(),
                "key": child.name,
                "pretty_name": child.name,
            }
            for child in download_dir.iterdir()
            if child.is_dir()
        ]
        # Sort the downloads in alphabetical order
        available_downloads.sort(key=lambda x: x["pretty_name"])
        await ws.send_json({"downloads": available_downloads})

        async for msg in ws:
            if msg.type == WSMsgType.TEXT:
                if msg.data == "close":
                    await ws.close()
                    break
            elif msg.type == WSMsgType.PING:
                await ws.pong()
            elif msg.type in [WSMsgType.CLOSED, WSMsgType.ERROR]:
                break
    finally:
        websockets.discard(ws)
    return ws
def download_future_handler(
    updates_queue: Queue[UpdateMessage], req_id: str, future: asyncio.Future[DownloadResult]
) -> None:
    """
    Done-callback for executor calls to `download`: reports the outcome on the updates queue.

    A successful download is published as a ``COMPLETED`` update carrying the result's paths. An `AlreadyDownloaded`
    failure is turned into an ``ERROR`` update and logged. Any other exception is re-raised by ``future.result()``.
    Because this runs as a done-callback, it is the event loop's exception handler that receives it.

    :param updates_queue: A queue to put real-time updates into
    :param req_id: The request ID attached to this request
    :param future: The future itself
    """
    try:
        outcome = future.result()
    except AlreadyDownloaded as exc:
        updates_queue.sync_q.put_nowait(
            {"status": UpdateStatusCode.ERROR, "msg": f'"{exc.key}" already downloaded', "req_id": req_id}
        )
        logger.info('Request %s for "%s" was already downloaded', req_id, exc.key)
        return

    # Paths are still server-side here; update_publisher rewrites them before they reach clients.
    completed_message = {
        "req_id": req_id,
        "status": UpdateStatusCode.COMPLETED,
        "pretty_name": outcome.pretty_name,
        "key": outcome.key,
        "path": outcome.info_file.parent,
        "info_file": outcome.info_file,
        "video_file": outcome.video_file,
        "audio_file": outcome.audio_file,
    }
    updates_queue.sync_q.put_nowait(completed_message)
async def download_handler(request: web.Request) -> web.Response:
    """
    Handles a request to download a video/audio file. Does type checking, issues the rest, and immediately returns 202.

    The download itself runs on the application's executor. Progress and the final result are reported through the
    updates queue and keyed by the returned ``req_id``.

    :param request: JSON object request with keys: url, download_video, extract_audio, and optionally audio_quality
                    (an integer 1-5, default 3)
    :return: :mod:`aiohttp` mandated response type: 202 on success or 400 on bad request parameters
    """
    loop = asyncio.get_running_loop()
    try:
        req_params = await request.json()
    except (JSONDecodeError, UnicodeDecodeError):
        # UnicodeDecodeError: the body is not valid text in its declared charset, so it cannot be JSON either.
        raise web.HTTPBadRequest(text="Request body must be JSON") from None
    # Valid JSON may still be a list, string or number; only an object carries the expected keys.
    if not isinstance(req_params, dict):
        raise web.HTTPBadRequest(text="Request body must be a JSON object")
    if not isinstance(req_params.get("url"), str):
        raise web.HTTPBadRequest(text='"url" must be specified and be a string')
    if not isinstance(req_params.get("download_video"), bool):
        raise web.HTTPBadRequest(text='"download_video" must be specified and be a boolean')
    if not isinstance(req_params.get("extract_audio"), bool):
        raise web.HTTPBadRequest(text='"extract_audio" must be specified and be a boolean')
    audio_quality = req_params.get("audio_quality", 3)
    # bool is a subclass of int, so reject it explicitly before range-checking the value actually supplied.
    if isinstance(audio_quality, bool) or not isinstance(audio_quality, int) or not 1 <= audio_quality <= 5:
        raise web.HTTPBadRequest(text='"audio_quality" must be between 1-5')

    req_id = str(uuid4())
    future = loop.run_in_executor(
        request.app["executor"],
        download,
        pathlib.Path(request.app["download_dir"]),
        True,  # NOTE(review): positional flag whose meaning is defined by downloader.download — confirm
        req_params["url"],
        req_params["download_video"],
        req_params["extract_audio"],
        audio_quality,
        request.app["updates_queue"],
        req_id,
        request.app["ffmpeg_dir"],
    )
    # typeshed has a bug, see https://github.com/python/typeshed/pull/3935
    future.add_done_callback(partial(download_future_handler, request.app["updates_queue"], req_id))  # type: ignore
    return web.json_response({"req_id": req_id}, status=202)
async def delete_handler(request: web.Request) -> web.Response:
    """
    Handles requests to delete a particular result. The input directory is checked for path traversal attacks.

    Only a directory strictly inside the download directory can be removed. A key that resolves to the download
    directory itself (e.g. ``""``, ``"."`` or ``"x/.."``) or to anything outside it is rejected. On success a
    ``DELETED`` update carrying the key is queued for connected clients.

    :param request: JSON object request with keys: key
    :return: :mod:`aiohttp` mandated response type: 200 on success or 400 on bad request parameters
    """
    try:
        req_params = await request.json()
    except (JSONDecodeError, UnicodeDecodeError):
        raise web.HTTPBadRequest(text="Request body must be JSON") from None
    if not isinstance(req_params, dict):
        raise web.HTTPBadRequest(text="Request body must be a JSON object")
    key = req_params.get("key")
    if not isinstance(key, str):
        raise web.HTTPBadRequest(text='"key" must be specified and be a string')

    # Resolve both sides so ".." components and symlinks are compared on equal footing; comparing a resolved
    # target against an unresolved (relative or symlinked) download_dir would reject every key.
    download_dir = pathlib.Path(request.app["download_dir"]).resolve()
    try:
        resolved_dir = (download_dir / key).resolve()
        resolved_dir.relative_to(download_dir)
    except ValueError:
        # Raised when the target lies outside download_dir, or when key is not a valid path (e.g. a NUL byte).
        raise web.HTTPBadRequest(text="key specified is forbidden") from None
    # relative_to() also succeeds for download_dir itself; never wipe the whole download directory.
    if resolved_dir == download_dir:
        raise web.HTTPBadRequest(text="key specified is forbidden")
    if not resolved_dir.is_dir():
        raise web.HTTPBadRequest(text="key does not exist")

    shutil.rmtree(resolved_dir)
    request.app["updates_queue"].sync_q.put_nowait({"status": UpdateStatusCode.DELETED, "key": key})
    return web.Response(status=200)
async def start_background_tasks(app: web.Application) -> None:
    """
    aiohttp startup hook: launch :func:`update_publisher` as a background task.

    The task handle is kept under ``app["update_publisher"]`` so it can be cancelled during cleanup.

    :param app: Reference to the overall application.
    """
    publisher = update_publisher(app)
    app["update_publisher"] = asyncio.create_task(publisher)
async def cleanup_background_tasks(app: web.Application) -> None:
    """
    Cleanup function for aiohttp. Shuts down remaining tasks and disconnects open websockets.

    The steps run in order:

    1. Wait for the download executor to finish its work.
    2. Cancel and await the update publisher.
    3. Close every still-registered websocket with ``GOING_AWAY``.

    Websockets are closed even if the publisher had already died with an error; that error is logged instead of
    aborting the cleanup.

    :param app: Reference to the overall application.
    """
    loop = asyncio.get_running_loop()
    # ThreadPoolExecutor.shutdown() blocks until running downloads complete. Run it on a worker thread so the
    # event loop is not frozen meanwhile and can still process the downloads' completion callbacks.
    await loop.run_in_executor(None, app["executor"].shutdown)

    publisher = app["update_publisher"]
    publisher.cancel()
    try:
        await publisher
    except asyncio.CancelledError:
        pass
    except Exception:
        logger.exception("Update publisher terminated with an error")

    # Iterate over a snapshot: closing sockets lets their handlers unregister them from the WeakSet.
    for ws in set(app["websockets"]):
        await ws.close(code=WSCloseCode.GOING_AWAY, message="Server shutdown")
def server(
    download_dir: pathlib.Path, download_prefix: str, port: int, ffmpeg_dir: Optional[pathlib.Path] = None
) -> None:
    """
    Build the aiohttp application and serve it; blocks until the server stops.

    :param download_dir: Local directory to store downloaded files in.
    :param download_prefix: Prefix for returned file paths, ultimately used to create download links.
    :param port: TCP port to bind on.
    :param ffmpeg_dir: Directory containing the FFmpeg binaries.
    """
    app = web.Application()

    # Shared state read by the handlers and background tasks.
    app["download_dir"] = download_dir
    app["download_prefix"] = pathlib.Path(download_prefix)
    app["ffmpeg_dir"] = ffmpeg_dir
    # Weak references: a websocket object that is garbage collected drops out of the broadcast set on its own.
    app["websockets"] = WeakSet()
    # Bridges the download workers (which receive it as an argument to `download`) and the async publisher.
    app["updates_queue"] = Queue()

    routes = [
        web.post("/download", download_handler),
        web.get("/status", websocket_handler),
        web.delete("/remove", delete_handler),
    ]
    app.add_routes(routes)

    app.on_startup.append(start_background_tasks)
    app.on_cleanup.append(cleanup_background_tasks)

    # The worker pool lives for exactly as long as run_app blocks.
    with ThreadPoolExecutor() as pool:
        app["executor"] = pool
        web.run_app(app, port=port)