Skip to content

Commit 8e18fcb

Browse files
committed
feat: 支持 download 路由并适配一系列动作
- 支持 WebSocket 连接 - 支持证书获取 - 使用 aiohttp 来服务文件
1 parent 407fa87 commit 8e18fcb

File tree

8 files changed

+178
-49
lines changed

8 files changed

+178
-49
lines changed

.gitignore

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
.vscode/
2-
config/config.properties
32
**/__pycache__
43
logs
54
config/config.yml
6-
cache
5+
cache
6+
cert

core/__init__.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from core.cluster import Cluster
22
from core.config import Config
3+
import asyncio
34

45

56
async def init():
@@ -13,3 +14,13 @@ async def init():
1314
delay = Config.get("advanced.delay")
1415
retry = Config.get("advanced.retry")
1516
await cluster.syncFiles(missing_filelist, retry, delay)
17+
await cluster.connect()
18+
protocol = "http" if Config.get("cluster.byoc") else "https"
19+
if protocol == "https":
20+
await cluster.socket.requestCertificate()
21+
await cluster.setupExpress(protocol == "https", port=Config.get('cluster.port'))
22+
try:
23+
while True:
24+
await asyncio.sleep(1000)
25+
except KeyboardInterrupt:
26+
raise KeyboardInterrupt

core/client.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
from core.logger import logger
2+
from core.config import Config
3+
from typing import List, Any
4+
import socketio
5+
import asyncio
6+
import aiofiles
7+
import os
8+
9+
10+
class WebSocketClient:
11+
def __init__(self, token: str) -> None:
12+
self.socket = None
13+
self.connected = None
14+
self.base_url = Config.get("cluster.base_url")
15+
self.cert_path = Config.get("advanced.paths.cert")
16+
self.key_path = Config.get("advanced.paths.key")
17+
self.token = token
18+
os.makedirs(os.path.dirname(self.cert_path), exist_ok=True)
19+
os.makedirs(os.path.dirname(self.key_path), exist_ok=True)
20+
21+
async def connect(self) -> None:
22+
if self.socket and self.connected:
23+
return
24+
25+
self.socket = socketio.AsyncClient()
26+
27+
@self.socket.on("connect")
28+
async def _() -> None:
29+
logger.tsuccess("client.success.connected")
30+
31+
@self.socket.on("message")
32+
async def _(message: str) -> None:
33+
logger.tinfo("client.info.message", message=message)
34+
35+
@self.socket.on("exception")
36+
async def _(error: str) -> None:
37+
logger.tinfo("client.error.exception", error=error)
38+
39+
@self.socket.on("reconnect")
40+
async def _() -> None:
41+
pass
42+
43+
@self.socket.on("reconnect_error")
44+
async def _(error: str) -> None:
45+
logger.terror("client.error.reconnect", error=error)
46+
47+
@self.socket.on("reconnect_failed")
48+
async def _() -> None:
49+
pass
50+
51+
await self.socket.connect(
52+
self.base_url, transports=["websocket"], auth={"token": str(self.token)}
53+
)
54+
55+
async def requestCertificate(self) -> None:
56+
future = asyncio.Future()
57+
58+
async def callback(data: List[Any]):
59+
error, cert = data
60+
future.set_result((error, cert))
61+
62+
try:
63+
await self.socket.emit("request-cert", callback=callback)
64+
error, cert = await future
65+
if error:
66+
raise Exception(error)
67+
async with aiofiles.open(self.cert_path, "w") as f:
68+
await f.write(cert["cert"])
69+
async with aiofiles.open(self.key_path, "w") as f:
70+
await f.write(cert["key"])
71+
logger.tsuccess("client.success.request_certificate")
72+
except Exception as e:
73+
logger.terror("client.error.request_certificate", e=e)

core/cluster.py

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from core.storages import getStorages
66
from core.classes import FileInfo, FileList, AgentConfiguration
77
from core.router import Router
8+
from core.client import WebSocketClient
89
from core.i18n import locale
910
from aiohttp import web
1011
from tqdm import tqdm
@@ -14,7 +15,7 @@
1415
import asyncio
1516
import hmac
1617
import hashlib
17-
import socketio
18+
import ssl
1819
import humanize
1920
import io
2021

@@ -82,6 +83,9 @@ def __init__(self) -> None:
8283
self.storages = getStorages()
8384
self.configuration = None
8485
self.semaphore = None
86+
self.socket = None
87+
self.router = None
88+
self.server = None
8589
self.failed_filelist = FileList(files=[])
8690

8791
async def fetchFileList(self) -> None:
@@ -214,7 +218,9 @@ async def downloadFile(
214218
content = await response.read()
215219
results = await asyncio.gather(
216220
*(
217-
storage.writeFile(file, io.BytesIO(content), delay, retry)
221+
storage.writeFile(
222+
file, io.BytesIO(content), delay, retry
223+
)
218224
for storage in self.storages
219225
)
220226
)
@@ -226,18 +232,37 @@ async def downloadFile(
226232
"cluster.error.download_file.retry",
227233
file=file.hash,
228234
e=e,
229-
retry=delay
235+
retry=delay,
230236
)
231237
await asyncio.sleep(delay)
232238
logger.terror("cluster.error.download_file.failed", file=file.hash)
233239
self.failed_filelist.files.append(file)
234240

235-
async def setupExpress(self, https: bool) -> None:
241+
async def setupExpress(self, https: bool, port: int) -> None:
236242
logger.tinfo("cluster.info.router.creating")
237-
app = web.Application
238-
router = Router(https, app)
239-
router.init()
243+
app = web.Application()
244+
try:
245+
self.router = Router(app, self.storages)
246+
self.router.init()
247+
ssl_context = None
248+
# if https:
249+
# ssl_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
250+
# ssl_context.load_cert_chain(
251+
# certfile=Config.get("advanced.paths.cert"),
252+
# keyfile=Config.get("advanced.paths.key")
253+
# )
254+
self.server = web.AppRunner(app)
255+
logger.tsuccess("cluster.success.router.created")
256+
await self.server.setup()
257+
site = web.TCPSite(self.server, "0.0.0.0", port, ssl_context=ssl_context)
258+
await site.start()
259+
logger.tsuccess("cluster.success.site.start")
260+
except Exception as e:
261+
logger.terror("cluster.error.router.exception", e=e)
240262

263+
async def connect(self) -> None:
264+
self.socket = WebSocketClient(self.token.token)
265+
await self.socket.connect()
241266

242267
async def init(self) -> None:
243268
await asyncio.gather(*(storage.init() for storage in self.storages))

core/config.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,12 @@
1212
"cluster.base_url": "https://openbmclapi.bangbang93.com",
1313
"cluster.id": "",
1414
"cluster.secret": "",
15+
"cluster.byoc": False,
16+
"cluster.public_port": 8080,
17+
"cluster.port": 8800,
1518
"storages": [{"type": "local", "path": "./cache"}],
19+
"advanced.paths.cert": "./cert/cert.pem",
20+
"advanced.paths.key": "./cert/key.pem",
1621
}
1722

1823

core/router.py

Lines changed: 30 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,59 @@
11
from functools import wraps
22
from core.config import Config
33
from core.classes import Storage
4-
from typing import Any, List
4+
from core.logger import logger
5+
from typing import List
56
from aiohttp import web
67
import random
7-
import aiohttp
8-
import asyncio
98
import base64
109
import hashlib
1110
import time
1211

12+
1313
class Router:
14-
def __init__(self, https: bool, app: web.Application) -> None:
15-
self.https = https
14+
def __init__(self, app: web.Application, storages: List[Storage]) -> None:
1615
self.app = app
17-
self.secret = Config.get('cluster.secret')
16+
self.secret = Config.get("cluster.secret")
17+
self.storages = storages
1818

1919
def route(self, path, method="GET"):
2020
def decorator(func):
2121
@wraps(func)
2222
async def wrapper(request):
2323
return await func(request)
24+
2425
self.app.router.add_route(method, path, wrapper)
2526
return wrapper
27+
2628
return decorator
27-
29+
2830
def init(self) -> None:
29-
@self.route("/auth")
30-
async def _():
31-
pass
31+
# @self.route("/auth")
32+
# async def _():
33+
# pass
3234

3335
@self.route("/download/{hash}")
34-
async def _(self, request: web.Request, storages: List[Storage]) -> web.Response | web.StreamResponse:
36+
async def _(
37+
request: web.Request,
38+
) -> web.Response | web.FileResponse | web.StreamResponse:
3539
def check_sign(hash: str, secret: str, query: dict) -> bool:
36-
if not (s := query.get('s')) or not (e := query.get('e')): return False
37-
sign = base64.urlsafe_b64encode(hashlib.sha1(f"{secret}{hash}{e}".encode('utf-8')).digest()).decode('utf-8').rstrip('=')
40+
# return True
41+
if not (s := query.get("s")) or not (e := query.get("e")):
42+
return False
43+
sign = (
44+
base64.urlsafe_b64encode(
45+
hashlib.sha1(f"{secret}{hash}{e}".encode("utf-8")).digest()
46+
)
47+
.decode("utf-8")
48+
.rstrip("=")
49+
)
3850
return sign == s and time.time() < int(e, 36)
39-
40-
hash = request.match_info.get('hash').lower()
51+
52+
hash = request.match_info.get("hash").lower()
4153
valid = check_sign(hash, self.secret, request.query)
4254
if not valid:
4355
return web.Response(text="invalid sign", status=403)
44-
response = web.StreamResponse(status=200)
45-
response.headers['x-bmclapi-hash'] = hash
46-
data = await random.choice(storages).express(hash, request, response)
56+
response = None
57+
data = await random.choice(self.storages).express(hash, request, response)
58+
logger.debug(data)
4759
return response
48-
49-

core/storages/local.py

Lines changed: 16 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@ def __init__(self, path: str) -> None:
1717
self.checked = False
1818

1919
async def init(self) -> None:
20-
if not os.path.exists(self.path):
21-
os.makedirs(self.path)
20+
os.makedirs(self.path, exist_ok=True)
2221

2322
async def check(self) -> None:
2423
logger.tinfo("storage.info.check")
@@ -77,24 +76,21 @@ async def check_file(file: FileInfo, pbar: tqdm) -> bool:
7776
file for file, is_missing in zip(files.files, results) if is_missing
7877
]
7978
return FileList(files=missing_files)
80-
81-
async def express(self, hash: str, request: web.Request, response: web.StreamResponse) -> Dict[str, Any]:
79+
80+
async def express(
81+
self, hash: str, request: web.Request, response
82+
) -> Dict[str, Any]:
8283
path = os.path.join(self.path, hash[:2], hash)
8384
if not os.path.exists(path):
84-
response.set_status(404, 'File not found')
85-
return {'bytes': 0, 'hits': 0}
86-
file_size = os.path.getsize(path)
87-
response.content_length = file_size
88-
response.content_type = 'application/octet-stream'
89-
response.headers['Cache-Control'] = 'max-age=2592000'
90-
response.prepare(request)
91-
85+
response = web.Response()
86+
response.set_status(404, "File not found")
87+
return {"bytes": 0, "hits": 0}
9288
try:
93-
transport = request.transport
94-
socket = transport.get_extra_info('socket')
95-
with open(path, 'rb') as f:
96-
await asyncio.get_event_loop().sendfile(socket, f, 0, file_size)
97-
return {'bytes': file_size, 'hits': 1}
98-
99-
except Exception:
100-
return {'bytes': 0, 'hits': 0}
89+
file_size = (os.path.getsize(path),)
90+
response = web.FileResponse(path, status=200)
91+
response.headers["x-bmclapi-hash"] = hash
92+
await response.prepare(request)
93+
return {"bytes": file_size, "hits": 1}
94+
except Exception as e:
95+
logger.debug(e)
96+
return {"bytes": 0, "hits": 0}

i18n/zh_cn.json

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,17 @@
2121
"cluster.error.sync_files.retry": "无法下载所有文件,将在 ${retry}s 后重试。",
2222
"cluster.error.sync_files.failed": "无法下载所有文件,已达到最高重试次数。",
2323
"cluster.info.router.creating": "正在创建路由……",
24+
"cluster.success.router.created": "成功创建路由!",
25+
"cluster.error.router.exception": "在尝试创建路由时发生错误:${e}。",
2426
"cluster.tqdm.desc.get_missing": "获取缺失文件中",
2527
"cluster.tqdm.desc.sync_files": "同步文件中",
2628
"cluster.tqdm.unit.files": " 个文件",
29+
"client.success.connected": "成功与主控建立连接!",
30+
"client.info.message": "[主控] ${message}",
31+
"client.error.exception": "[主控] ${error}",
32+
"client.error.reconnect": "在尝试重连至主控时发生错误:${error}。",
33+
"client.error.request_certificate": "在尝试获取证书时发生错误:${e}。",
34+
"client.success.request_certificate": "成功获取证书!",
35+
"cluster.success.site.start": "成功启动服务端!",
2736
"configuration.debug.get": "同步策略:${sync}。"
2837
}

0 commit comments

Comments
 (0)