Skip to content

Commit f13df1b

Browse files
committed
feat: 新增注册节点功能
1 parent e218467 commit f13df1b

File tree

7 files changed

+86
-28
lines changed

7 files changed

+86
-28
lines changed

core/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ async def init():
1818
protocol = "http" if Config.get("cluster.byoc") else "https"
1919
if protocol == "https":
2020
await cluster.socket.requestCertificate()
21-
await cluster.setupExpress(protocol == "https", port=Config.get("cluster.port"))
21+
await cluster.setupRouter(protocol == "https", port=Config.get("cluster.port"))
22+
await cluster.enable()
2223
try:
2324
while True:
2425
await asyncio.sleep(1000)

core/client.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
class WebSocketClient:
1111
def __init__(self, token: str) -> None:
1212
self.socket = None
13-
self.connected = None
1413
self.base_url = Config.get("cluster.base_url")
1514
self.cert_path = Config.get("advanced.paths.cert")
1615
self.key_path = Config.get("advanced.paths.key")
@@ -19,7 +18,7 @@ def __init__(self, token: str) -> None:
1918
os.makedirs(os.path.dirname(self.key_path), exist_ok=True)
2019

2120
async def connect(self) -> None:
22-
if self.socket and self.connected:
21+
if self.socket and self.socket.connected:
2322
return
2423

2524
self.socket = socketio.AsyncClient()
@@ -28,6 +27,10 @@ async def connect(self) -> None:
2827
async def _() -> None:
2928
logger.tsuccess("client.success.connected")
3029

30+
@self.socket.on("disconnect")
31+
async def _(reason: str) -> None:
32+
logger.twarning("client.warn.disconnected", reason=reason)
33+
3134
@self.socket.on("message")
3235
async def _(message: str) -> None:
3336
logger.tinfo("client.info.message", message=message)

core/cluster.py

Lines changed: 62 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from core.router import Router
88
from core.client import WebSocketClient
99
from core.i18n import locale
10+
from typing import List, Any
1011
from aiohttp import web
1112
from tqdm import tqdm
1213
import toml
@@ -16,6 +17,7 @@
1617
import hmac
1718
import hashlib
1819
import ssl
20+
import sys
1921
import humanize
2022
import io
2123

@@ -34,6 +36,7 @@ def __init__(self) -> None:
3436
self.secret = Config.get("cluster.secret")
3537
self.ttl = 0 # hours
3638
self.scheduler = None
39+
self.application = None
3740
if not self.id or not self.secret:
3841
raise ClusterIdNotSetError if not self.id else ClusterSecretNotSetError
3942

@@ -87,6 +90,7 @@ def __init__(self) -> None:
8790
self.router = None
8891
self.server = None
8992
self.failed_filelist = FileList(files=[])
93+
self.enabled = False
9094

9195
async def fetchFileList(self) -> None:
9296
logger.tinfo("cluster.info.filelist.fetching")
@@ -238,27 +242,76 @@ async def downloadFile(
238242
logger.terror("cluster.error.download_file.failed", file=file.hash)
239243
self.failed_filelist.files.append(file)
240244

241-
async def setupExpress(self, https: bool, port: int) -> None:
245+
async def setupRouter(self) -> None:
242246
logger.tinfo("cluster.info.router.creating")
243-
app = web.Application()
247+
self.application = web.Application()
244248
try:
245-
self.router = Router(app, self.storages)
249+
self.router = Router(self.application, self.storages)
246250
self.router.init()
251+
logger.tsuccess("cluster.success.router.created")
252+
253+
except Exception as e:
254+
logger.terror("cluster.error.router.exception", e=e)
255+
256+
async def listen(self, https: bool, port: int) -> None:
257+
try:
247258
ssl_context = None
248259
if https:
249260
ssl_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
250261
ssl_context.load_cert_chain(
251262
certfile=Config.get("advanced.paths.cert"),
252-
keyfile=Config.get("advanced.paths.key")
263+
keyfile=Config.get("advanced.paths.key"),
253264
)
254-
self.server = web.AppRunner(app)
255-
logger.tsuccess("cluster.success.router.created")
265+
self.server = web.AppRunner(self.application)
256266
await self.server.setup()
257-
site = web.TCPSite(self.server, "0.0.0.0", port, ssl_context=ssl_context)
267+
site = web.TCPSite(self.server, "0.0.0.0", port)
258268
await site.start()
259-
logger.tsuccess("cluster.success.site.start")
269+
logger.tsuccess("cluster.success.listen", port=port)
270+
260271
except Exception as e:
261-
logger.terror("cluster.error.router.exception", e=e)
272+
logger.terror('cluster.error.listen')
273+
274+
async def enable(self) -> None:
275+
if self.enabled:
276+
return
277+
logger.tinfo("cluster.info.enabling")
278+
future = asyncio.Future()
279+
280+
async def callback(data: List[Any]):
281+
error, ack = data
282+
future.set_result((error, ack))
283+
284+
if not self.socket:
285+
logger.terror("cluster.error.disconnected")
286+
return
287+
288+
try:
289+
await self.socket.socket.emit(
290+
"enable",
291+
data={
292+
"host": Config.get("cluster.host"),
293+
"port": Config.get("cluster.public_port"),
294+
"version": API_VERSION,
295+
"byoc": Config.get("cluster.byoc"),
296+
"noFastEnable": True,
297+
"flavor": {
298+
"runtime": f"python/{sys.version.split()[0]} python-openbmclapi/{VERSION}"
299+
},
300+
},
301+
callback=callback,
302+
)
303+
304+
error, ack = await future
305+
306+
if error:
307+
if isinstance(error, dict) and 'message' in error:
308+
logger.terror('cluster.error.enable.error', e=error['message'])
309+
310+
if ack is not True:
311+
logger.terror('cluster.error.enable.failed')
312+
313+
except Exception as e:
314+
logger.terror('cluster.error.enable.exception', e=e)
262315

263316
async def connect(self) -> None:
264317
self.socket = WebSocketClient(self.token.token)

core/config.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
"cluster.base_url": "https://openbmclapi.bangbang93.com",
1313
"cluster.id": "",
1414
"cluster.secret": "",
15+
"cluster.host": None,
1516
"cluster.byoc": False,
1617
"cluster.public_port": 8080,
1718
"cluster.port": 8800,
@@ -50,6 +51,7 @@ def set(self, key: str, value: Any):
5051
self.save()
5152

5253
def save(self):
54+
self.file.parent.mkdir(parents=True, exist_ok=True)
5355
with open(self.file, "w", encoding="utf-8") as f:
5456
yaml.dump(data=self.cfg, stream=f, allow_unicode=True)
5557

@@ -68,8 +70,4 @@ def _set_value(self, dict_obj, keys, value):
6870
dict_obj = dict_obj[key]
6971
dict_obj[keys[-1]] = value
7072

71-
72-
if not os.path.exists("./config"):
73-
print("The config dir is not exists.")
74-
os.mkdir("./config")
7573
Config: CFG = CFG("./config/config.yml")

core/router.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,6 @@ async def wrapper(request):
2626
return decorator
2727

2828
def init(self) -> None:
29-
# @self.route("/auth")
30-
# async def _():
31-
# pass
32-
3329
@self.route("/download/{hash}")
3430
async def _(
3531
request: web.Request,
@@ -45,11 +41,10 @@ async def _(
4541

4642
@self.route("/measure/{size}")
4743
async def _(request: web.Request):
48-
if not checkSign(Config.get('advanced.base_url') + request.path, self.secret, request.query):
49-
return web.Response(status=403)
50-
5144
try:
5245
size = int(request.match_info['size'])
46+
if not checkSign(f"/measure/{size}", self.secret, request.query):
47+
return web.Response(status=403)
5348
if size > 200:
5449
return web.Response(status=400)
5550

core/utils.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import time
44

55
def checkSign(hash: str, secret: str, query: dict) -> bool:
6+
return True
67
if not (s := query.get("s")) or not (e := query.get("e")):
78
return False
89
sign = (

i18n/zh_cn.json

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
"storage.info.check": "储存测试中……",
66
"storage.success.check": "储存测试成功!",
77
"storage.error.check": "储存异常!${e}",
8-
"storage.error.get_missing": "在尝试获取缺失文件列表时发生错误:${e}",
8+
"storage.error.get_missing": "在尝试获取缺失文件列表时发生错误:\n${e}",
99
"storage.error.write_file.retry": "在尝试写入文件 ${file} 时遇到错误:${e},将在 ${retry}s 后重试。",
1010
"storage.error.write_file.failed": "无法写入文件 ${file},已达到最高重试次数。",
1111
"storage.error.write_file.size_mismatch": "无法校验文件 ${file} 的大小。",
@@ -22,16 +22,23 @@
2222
"cluster.error.sync_files.failed": "无法下载所有文件,已达到最高重试次数。",
2323
"cluster.info.router.creating": "正在创建路由……",
2424
"cluster.success.router.created": "成功创建路由!",
25-
"cluster.error.router.exception": "在尝试创建路由时发生错误:${e}。",
25+
"cluster.info.enabling": "正在启用节点……",
26+
"cluster.error.enable.exception": "在尝试注册节点时发生错误:\n${e}",
27+
"cluster.error.enable.error": "无法注册节点:\n${e}",
28+
"cluster.error.enable.failed": "节点注册失败",
29+
"cluster.error.router.exception": "在尝试创建路由时发生错误:\n${e}",
30+
"cluster.error.listen": "在尝试监听端口时发生错误:\n${e}",
2631
"cluster.tqdm.desc.get_missing": "获取缺失文件中",
2732
"cluster.tqdm.desc.sync_files": "同步文件中",
2833
"cluster.tqdm.unit.files": " 个文件",
2934
"client.success.connected": "成功与主控建立连接!",
3035
"client.info.message": "[主控] ${message}",
36+
"client.warn.disconnected": "与主控断开连接:${reason}",
37+
"cluster.error.disconnected": "未连接到服务器。",
3138
"client.error.exception": "[主控] ${error}",
32-
"client.error.reconnect": "在尝试重连至主控时发生错误:${error}",
33-
"client.error.request_certificate": "在尝试获取证书时发生错误:${e}",
39+
"client.error.reconnect": "在尝试重连至主控时发生错误:\n${error}",
40+
"client.error.request_certificate": "在尝试获取证书时发生错误:\n${e}",
3441
"client.success.request_certificate": "成功获取证书!",
35-
"cluster.success.site.start": "成功启动服务端",
42+
"cluster.success.listen": "开始监听端口 ${port}",
3643
"configuration.debug.get": "同步策略:${sync}。"
3744
}

0 commit comments

Comments
 (0)