diff --git a/VERSION b/VERSION index 4f1ec5f..d862caa 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -v1.10.1-708e4ab +v1.10.1-708e4ab \ No newline at end of file diff --git a/bmclapi_dashboard/static/js/index.min.js b/bmclapi_dashboard/static/js/index.min.js index 21accd9..107afe3 100644 --- a/bmclapi_dashboard/static/js/index.min.js +++ b/bmclapi_dashboard/static/js/index.min.js @@ -3,6 +3,7 @@ const langs = { "zh_cn": { "dashboard": "数据统计", + "version": "版本信息", "measure": "宽带测试", "authentication.fetching": "获取账户信息中", "authentication.login": "登陆", @@ -238,6 +239,21 @@ ".root .left .copyright": [ "position: fixed;", "bottom: 2px", + "width: 184px", + "box-sizing: border-sizing", + "display: flex", + "justify-content: center", + "flex-direction: column", + "align-items: center" + ], + ".root .left .box": [ + "box-sizing: border-sizing", + "border-radius: 8px", + "padding-left: 8px", + "padding-right: 8px", + "background: white", + "margin-bottom: 4px", + "margin-top: 4px" ], ".qps .icon": [ "width: 14px", @@ -471,14 +487,15 @@ display_left() update_left() } - header.append(ttb.createElement("h3").append(ttb.createElement("a").setText(document.title).setAttribute("href", (() => { + const github = (() => { for (child of document.head.children) { if (child.getAttribute("github")) return "//github.com/" + child.getAttribute("github") } return "" - })()))) + })(); + header.append(ttb.createElement("h3").append(ttb.createElement("a").setText(document.title).setAttribute("href", github))) left_copyright.append( - ttb.createElement("p").append(ttb.createElement("a").setAttribute("href", "//github.com/TTB-Network").setText("TTB Network"), " - ", ttb.VERSION) + ttb.createElement("a").class("box").setAttribute("href", "//github.com/TTB-Network").setText("TTB Network") ) left_arrow.event("click", () => { left.toggle("hide") @@ -841,7 +858,7 @@ onopen() { if (this._isws) ttb.createNotication("info", "", ttb.createElement("h4").setText("实时隧道已开启")) this.send("uptime") - this.send("storage") + this.send("version") this.send("status") this._timer_qps?.block(); this._timer?.block() @@ -1099,6 +1116,13 @@ }) } + if (type == "version") { + if (data.cur != data.latest) { + ttb.createNotication("info", "", ttb.createElement("h4").setText("有新的版本更新"), ttb.createElement("p").setText(data.latest)) + } + version.version = data + version._update() + } root_handler("_ws_message", type, data) } _deserializeData(input) { @@ -1362,11 +1386,40 @@ page.push(...this._page) } } + class Version { + constructor() { + this.version = { + "cur": "Unknown", + "latest": "Unknown" + } + this._page = [ + ttb.createElement("div").class("panel").append( + ttb.createElement("p").class("title").setText("当前节点版本"), + ttb.createElement("p").class("value").setText("-"), + ttb.createElement("p").class("title").setText("最新版本"), + ttb.createElement("p").class("value").setText("-"), + ttb.createElement("button").class("button").setText("查看详情").setAttribute("href").event('click', () => { + if (this.version.cur == this.version.latest) return + window.location.href = github + "/releases/" + this.version.latest + }) + ) + ] + this._update() + } + _update() { + this._page[0].getChildrens()[1].setText(this.version.cur) + this._page[0].getChildrens()[3].setText(this.version.latest) + this._page[0].getChildrens()[4].valueOf().style.display = (this.version.cur != this.version.latest ? "" : "none") + } + connect(page) { + page.push(...this._page) + } + } class Storage { constructor() { this._storages = [] } - connect() { + connect(page) { //ws.send("storage") } } @@ -1374,7 +1427,9 @@ const dashboard = new Dashboard() const measure = new Measure() const storage = new Storage() + const version = new Version() menu("dashboard", "", dashboard) + menu("version", "", version) ws.menu("measure", "", measure) ws.setAuthInfo() })(); diff --git a/core/api.py b/core/api.py index 9ec3bf6..4d3b677 100644 --- a/core/api.py +++ b/core/api.py @@ -9,7 +9,6 @@ import zlib import aiofiles -from tqdm import tqdm from core.config import Config @@ -28,7 +27,7 @@ class BMCLAPIFile: mtime: int = 0 def __hash__(self): - return int.from_bytes(bytes.fromhex(self.hash)) + return int.from_bytes(bytes.fromhex(self.hash), byteorder="big") def __eq__(self, other): if isinstance(other, BMCLAPIFile): @@ -47,6 +46,7 @@ class File: size: int last_hit: float = 0 last_access: float = 0 + expiry: Optional[float] = None data: Optional[io.BytesIO] = None cache: bool = False @@ -99,15 +99,6 @@ async def get_size(self, hash: str) -> int: """ raise NotImplementedError - @abc.abstractmethod - async def copy(self, origin: Path, hash: str) -> int: - """ - origin: src path - hash: desc path (new path) - return File size - """ - raise NotImplementedError - @abc.abstractmethod async def write(self, hash: str, io: io.BytesIO) -> int: """ diff --git a/core/cluster.py b/core/cluster.py index e5a16d0..16ec598 100644 --- a/core/cluster.py +++ b/core/cluster.py @@ -1,6 +1,5 @@ import asyncio import base64 -from dataclasses import dataclass from enum import Enum import hashlib import hmac @@ -26,6 +25,8 @@ import core.web as web from core.logger import logger import plugins +import aiowebdav.client as webdav3_client + from core.api import ( File, @@ -43,7 +44,7 @@ VERSION = f.read().split("\n")[0] f.close() else: - VERSION = "" + VERSION = "Unknown" API_VERSION = "1.10.1" USER_AGENT = f"openbmclapi-cluster/{API_VERSION} python-openbmclapi/{VERSION}" BASE_URL = "https://openbmclapi.bangbang93.com/" @@ -58,7 +59,7 @@ CACHE_BUFFER: int = 1024 * 1024 * 512 CACHE_TIME: int = 1800 CHECK_CACHE: int = 60 -SIGN_SKIP: bool = False +SIGN_SKIP: bool = True DASHBOARD_USERNAME: str = Config.get("dashboard.username") DASHBOARD_PASSWORD: str = Config.get("dashboard.password") @@ -101,7 +102,11 @@ async def fetchToken(self): logger.info("Fetched token.") except aiohttp.ClientError as e: - logger.error(f"An error occured whilst fetching token: {e}.") + logger.error( + f"An error occured whilst fetching token, retrying in 5s: {e}." + ) + await asyncio.sleep(5) + return self.fetchToken() async def getToken(self) -> str: if not self.token: @@ -167,7 +172,12 @@ async def get_files(self) -> list[BMCLAPIFile]: logger.debug(f"Filelist response status: {req.status}") if req.status == 204: return [] - req.raise_for_status() + if req.status != 200: + try: + req.raise_for_status() + except: + logger.error(traceback.format_exc()) + return [] logger.info("Requested filelist.") files = await ParseFileList()(zstd.decompress(await req.read())) self.last_modified = max( @@ -280,6 +290,12 @@ async def __call__( sorted(files, key=lambda x: x.hash) if not self.checked: await dashboard.set_status("正在检查缺失文件") + if not files: + logger.warn("File check skipped as there are currently no files available.") + if self.check_files_timer: + self.check_files_timer.block() + self.check_files_timer = Timer.repeat(self, (), 1800, 1800) + return with tqdm( total=len(files) * len(storages.get_storages()), unit=" file(s)", @@ -311,11 +327,11 @@ async def __call__( more_files = {storage: [] for storage in storages.get_storages()} prefix_files = { prefix: [] - for prefix in (prefix.to_bytes().hex() for prefix in range(256)) + for prefix in (prefix.to_bytes(1, "big").hex() for prefix in range(256)) } prefix_hash = { prefix: [] - for prefix in (prefix.to_bytes().hex() for prefix in range(256)) + for prefix in (prefix.to_bytes(1, "big").hex() for prefix in range(256)) } for file in files: @@ -462,6 +478,7 @@ async def get(self, hash: str) -> File: if hash in self.cache: file = self.cache[hash] file.last_access = time.time() + file.expiry = file.last_access + CACHE_TIME file.cache = True return file path = Path(str(self.dir) + f"/{hash[:2]}/{hash}") @@ -473,6 +490,7 @@ async def get(self, hash: str) -> File: file.set_data(buf.getbuffer()) self.cache[hash] = file file.cache = False + file.expiry = file.last_access + CACHE_TIME return file async def exists(self, hash: str) -> bool: @@ -514,13 +532,15 @@ async def clear_cache(self): size: int = 0 old_keys: list[str] = [] old_size: int = 0 - for file in sorted( - self.cache.items(), key=lambda x: x[1].last_access, reverse=True + file: File + key: str + for key, file in sorted( + self.cache.items(), key=lambda x: x[1].expiry, reverse=True ): - if size <= CACHE_BUFFER and time.time() - file[1].last_access <= CACHE_TIME: + if size <= CACHE_BUFFER and time.time() >= file.expiry: continue - old_keys.append(file[0]) - old_size += file[1].size + old_keys.append(key) + old_size += file.size if not old_keys: return for key in old_keys: @@ -563,10 +583,133 @@ async def get_cache_stats(self) -> StatsCache: class WebDav(Storage): - def __init__(self, username: str, password: str, endpoint: str) -> None: + def __init__( + self, + username: str, + password: str, + hostname: str, + endpoint: str, + token: Optional[str] = None, + dav: str = "/dav", + ) -> None: self.username = username self.password = password + self.hostname = hostname self.endpoint = endpoint + self.dav = dav + self.files: dict[str, File] = {} + self.token = token + self.fetch = False + self.cache: dict[str, File] = {} + self.empty = File("", "", 0) + Timer.delay(self._list_all) + + def _endpoint(self, file: str): + return f"{self.endpoint}/{file.removeprefix('/')}" + + def _client(self): + return webdav3_client.Client( + { + "webdav_username": self.username, + "webdav_password": self.password, + "webdav_hostname": self.hostname + self.dav, + "webdav_token": self.token, + } + ) + + async def get(self, file: str) -> File: + if file in self.cache and self.cache[file].expiry - 10 > time.time(): + self.cache[file].cache = True + self.cache[file].last_hit = time.time() + return self.cache[file] + async with aiohttp.ClientSession( + self.hostname, auth=aiohttp.BasicAuth(self.username, self.password) + ) as session: + async with session.get( + self.dav + self._endpoint(file[:2] + "/" + file), allow_redirects=False + ) as resp: + f = File( + self.dav + self._endpoint(file[:2] + "/" + file), + file, + size=int(resp.headers.get("Content-Length", 0)), + ) + if resp.status == 200: + f.set_data(await resp.read()) + elif resp.status // 100 == 3: + f.path = resp.headers.get("Location") + self.cache[file] = f + return self.cache[file] + + async def _list_all(self, force=False): + if self.fetch and not force: + return + self.fetch = True + self.files = {} + async with self._client() as client: + dirs = (await client.list(self.endpoint))[1:] + with tqdm(total=len(dirs), desc="[WebDav List Files]") as pbar: + await dashboard.set_status_by_tqdm("正在获取 WebDav 文件列表中", pbar) + for dir in (await client.list(self.endpoint))[1:]: + pbar.update(1) + for file in ( + await client.list( + self._endpoint( + dir, + ), + get_info=True, + ) + )[1:]: + self.files[file["name"]] = File( + file["path"].removeprefix(f"/dav/{self.endpoint}/"), + file["name"], + int(file["size"]), + ) + await asyncio.sleep(0) + return self.files + + async def exists(self, hash: str) -> bool: + if not self.fetch: + self.fetch = True + await self._list_all() + return hash in self.files + + async def get_size(self, hash: str) -> int: + return self.files.get(hash, self.empty).size + + async def write(self, hash: str, io: io.BytesIO) -> int: + async with self._client() as client: + path = self._endpoint(f"{hash[:2]}/{hash}") + await client.upload_to(io, path) + self.files[hash] = File(path, hash, len(io.getbuffer())) + return self.files[hash].size + + async def get_files(self, dir: str) -> list[str]: + return list((hash for hash in self.files.keys() if hash.startswith(dir))) + + async def get_hash(self, hash: str) -> str: + async with self._client() as session: + h = get_hash(hash) + async for data in await session.download_iter( + self._endpoint(f"{hash[:2]}/{hash}") + ): + h.update(data) + return h.hexdigest() + + async def get_files_size(self, dir: str) -> int: + return sum( + (file.size for hash, file in self.files.items() if hash.startswith(dir)) + ) + + async def removes(self, hashs: list[str]) -> int: + success = 0 + async with self._client() as client: + for hash in hashs: + await client.clean(self._endpoint(f"{hash[:2]}/{hash}")) + success += 1 + return success + + async def get_cache_stats(self) -> StatsCache: + return StatsCache() class TypeStorage(Enum): @@ -637,54 +780,77 @@ def is_enable(value): class Cluster: def __init__(self) -> None: + self.connected = False self.sio = socketio.AsyncClient() self.sio.on("message", self._message) self.stats_storage: Optional[stats.SyncStorage] = None - self.keepaliveTimer: Optional[Task] = None - self.keepaliveTimeoutTimer: Optional[Task] = None - self.keepalive_lock = asyncio.Lock() - self.connected = False - self.check_files_timer: Optional[Task] = None - self.trusted = True - self.state = ClusterState.INIT self.downloader = FileDownloader() self.file_check = FileCheck(self.downloader) self._enable_timer: Optional[Task] = None + self.keepaliveTimer: Optional[Task] = None + self.keepaliveTimeoutTimer: Optional[Task] = None + self.keepalive_lock = asyncio.Lock() def _message(self, message): logger.info(f"[Remote] {message}") if "信任度过低" in message: self.trusted = False + async def emit(self, channel, data=None): + await self.sio.emit( + channel, data, callback=lambda x: Timer.delay(self.message, (channel, x)) + ) + async def init(self): - await dashboard.set_status("初始化节点中") - await self.file_check() - await dashboard.set_status("初始化节点成功") + if not self.sio.connected: + try: + await self.sio.connect( + BASE_URL, + auth={"token": await token.getToken()}, + transports=["websocket"], + ) + except: + logger.warn("Failed to connect to the main server, retrying after 5s.") + Timer.delay(self.init, (), 5) + return await self.start() async def start(self): - self.state = ClusterState.START - logger.info(f"Starting cluster version {API_VERSION}.") - await dashboard.set_status("启动节点中") - try: - await self.sio.connect( - BASE_URL, - auth={"token": await token.getToken()}, - transports=["websocket"], + await self.cert() + await self.file_check() + await self.enable() + + async def cert(self): + await self.emit("request-cert") + + async def enable(self): + if self.connected: + logger.debug( + "Still trying to enable cluster? You has been blocked. (\nFrom bangbang93:\n 谁他妈\n 一秒钟发了好几百个enable请求\n ban了解一下等我回杭州再看\n ban了先\n\n > Timestamp at 2024/3/30 14:07 GMT+8\n)" ) - except: - logger.warn("Failed to connect to the main server. Retrying after 5s.") - Timer.delay(self.start, (), 5) return - await self.cert() - await self.reconnect() + self.connected = True + if self._enable_timer != None: + self._enable_timer.block() + self._enable_timer = Timer.delay(self.reconnect, (), 30) + await self._enable() - async def enable(self) -> None: + async def reconnect(self): + if self.connected: + await self.disable() + self.connected = False + logger.info("Retrying after 5s.") + await asyncio.sleep(5) + await self.enable() + + async def _enable(self): storage_str = {"file": 0, "webdav": 0} self.trusted = True for storage in storages.get_storages(): if isinstance(storage, FileStorage): storage_str["file"] += 1 + elif isinstance(storage, WebDav): + storage_str["webdav"] += 1 await self.emit( "enable", { @@ -701,16 +867,8 @@ async def enable(self) -> None: }, }, ) - self._enable_timer = Timer.delay( - self._enable_task, - (), - 30, - ) await dashboard.set_status("巡检中") - async def _enable_task(self): - await self.reconnect() - async def message(self, type, data: list[Any]): if len(data) == 1: data.append(None) @@ -724,6 +882,9 @@ async def message(self, type, data: list[Any]): certificate.load_text(ack["cert"], ack["key"]) elif type == "enable": err, ack = data + if self._enable_timer != None: + self._enable_timer.block() + self._enable_timer = None if err: logger.error( f"Unable to start service: {err['message']} Retry in 5s to reconnect." @@ -736,9 +897,6 @@ async def message(self, type, data: list[Any]): logger.info( f"Hosting on {CLUSTER_ID}.openbmclapi.933.moe:{PUBLIC_PORT or PORT}." ) - if self._enable_timer != None: - self._enable_timer.block() - self._enable_timer = None await self.start_keepalive() await dashboard.set_status( "正常工作" + ("" if self.trusted else "(节点信任度过低)") @@ -793,39 +951,15 @@ async def _keepalive(self): ) await self.start_keepalive(60) - async def reconnect(self): - try: - await self.disable() - except: - ... - await self.cert() - await self.enable() - async def _keepalive_timeout(self): logger.warn("Failed to keepalive! Reconnecting the main server...") await self.reconnect() - async def cert(self): - if Path("./.ssl/cert").exists() == Path("./.ssl/key").exists() == True: - return - await self.emit("request-cert") - - async def emit(self, channel, data=None): - await self.sio.emit( - channel, data, callback=lambda x: Timer.delay(self.message, (channel, x)) - ) - async def disable(self): - self.connected = False - if self.keepalive_lock and self.keepalive_lock.locked(): - self.keepalive_lock.release() - if self.keepaliveTimer: - self.keepaliveTimer.block() - if self.keepaliveTimeoutTimer: - self.keepaliveTimeoutTimer.block() if self.sio.connected: await self.emit("disable") logger.info("Disconnected from the main server...") + await dashboard.set_status("已下线") async def get_cache_stats(self) -> StatsCache: stat = StatsCache() @@ -841,9 +975,12 @@ async def get_cache_stats(self) -> StatsCache: last_status: str = "-" storages = StorageManager() github_api = "https://api.github.com" +download_url = "" async def check_update(): + global fetched_version + fetched_version = "Unknown" async with aiohttp.ClientSession(base_url=github_api) as session: logger.info("Checking update...") try: @@ -851,13 +988,16 @@ async def check_update(): "/repos/TTB-Network/python-openbmclapi/releases/latest" ) as req: req.raise_for_status() - fetched_version: str = (await req.json())["tag_name"] + data = await req.json() + fetched_version = data["tag_name"] if fetched_version != VERSION: logger.success(f"New version found: {fetched_version}!") + await dashboard.trigger("version") else: logger.info(f"Already up to date.") except aiohttp.ClientError as e: logger.error(f"An error occured whilst checking update: {e}.") + Timer.delay(check_update, (), 3600) async def init(): diff --git a/core/config.py b/core/config.py index af1adee..807ad96 100644 --- a/core/config.py +++ b/core/config.py @@ -33,6 +33,12 @@ def __init__(self, path: str) -> None: self.cfg = {} if self.file.exists(): self.load() + else: + logger.warn( + f'''The config file "{self.file.absolute()}" doesn't exist, creating a default config file...''' + ) + for key, value in defaults.items(): + self.set(key, value) def load(self): with open(self.file, "r", encoding="utf-8") as f: diff --git a/core/dashboard.py b/core/dashboard.py index 76e04d8..b739f23 100644 --- a/core/dashboard.py +++ b/core/dashboard.py @@ -129,8 +129,8 @@ async def process(type: str, data: Any): else StatsCache() ), } - if type == "storage": - return stats.get_storage_stats() + if type == "version": + return {"cur": cluster.VERSION, "latest": cluster.fetched_version} async def set_status_by_tqdm(text: str, pbar: tqdm, format=unit.format_numbers): @@ -182,6 +182,13 @@ async def set_status(text): last_text = text +async def trigger(type: str, data: Any = None): + app = web.app + output = to_bytes(type, await process(type, data)) + for ws in app.get_websockets("/bmcl/"): + await ws.send(output.io.getvalue()) + + def to_bytes(type: str, data: Any): output = utils.DataOutputStream() output.writeString(type) diff --git a/core/utils.py b/core/utils.py index 1099e4b..63e8c61 100644 --- a/core/utils.py +++ b/core/utils.py @@ -421,20 +421,20 @@ def write(self, value: bytes | int): if isinstance(value, bytes): self.io.write(value) else: - self.io.write((value + 256 if value < 0 else value).to_bytes()) # type: ignore + self.io.write((value + 256 if value < 0 else value).to_bytes(1, "big")) # type: ignore def writeBoolean(self, value: bool): - self.write(value.to_bytes()) + self.write(value.to_bytes(1, "big")) def writeShort(self, data: int): - self.write(((data >> 8) & 0xFF).to_bytes()) - self.write(((data >> 0) & 0xFF).to_bytes()) + self.write(((data >> 8) & 0xFF).to_bytes(1, "big")) + self.write(((data >> 0) & 0xFF).to_bytes(1, "big")) def writeInteger(self, data: int): - self.write(((data >> 24) & 0xFF).to_bytes()) - self.write(((data >> 16) & 0xFF).to_bytes()) - self.write(((data >> 8) & 0xFF).to_bytes()) - self.write((data & 0xFF).to_bytes()) + self.write(((data >> 24) & 0xFF).to_bytes(1, "big")) + self.write(((data >> 16) & 0xFF).to_bytes(1, "big")) + self.write(((data >> 8) & 0xFF).to_bytes(1, "big")) + self.write((data & 0xFF).to_bytes(1, "big")) def writeVarInt(self, value: int): self.write(MinecraftUtils.getVarInt(value)) @@ -474,7 +474,7 @@ def readIntegetr(self): return (value[0] << 24) + (value[1] << 16) + (value[2] << 8) + (value[3] << 0) def readBoolean(self): - return bool(int.from_bytes(self.read(1))) + return bool(int.from_bytes(self.read(1), byteorder="big")) def readShort(self): value = self.read(2) @@ -501,12 +501,12 @@ def readVarInt(self) -> int: j: int = 0 k: int while 1: - k = int.from_bytes(self.read(1)) + k = int.from_bytes(self.read(1), byteorder="big") i |= (k & 0x7F) << j * 7 j += 1 if (k & 0x80) != 128: break - return i - 2**31 * 2 if i >= 2**31 - 1 else i + return i def readString( self, maximun: Optional[int] = None, encoding: Optional[str] = None diff --git a/core/web.py b/core/web.py index 23966ab..a5e2d4f 100644 --- a/core/web.py +++ b/core/web.py @@ -319,7 +319,7 @@ def __init__(self, opcode: int, data: io.BytesIO) -> None: self.data = data self.close = self.opcode == WebSocketOpcode.CLOSE.value if self.close: - self.status = int.from_bytes(self.data.getbuffer()[:2]) + self.status = int.from_bytes(self.data.getbuffer()[:2], byteorder="big") self.reason = self.data.getbuffer()[:2] self.content = data if self.opcode == WebSocketOpcode.TEXT.value: @@ -947,7 +947,7 @@ async def __call__( class RedirectResponse(Response): def __init__(self, location: str) -> None: - super().__init__(headers=Header({"Location": location}), status_code=301) + super().__init__(headers=Header({"Location": location}), status_code=307) class Request: