diff --git a/bmclapi_dashboard/static/js/index.min.js b/bmclapi_dashboard/static/js/index.min.js index 80d347a..4b12b73 100644 --- a/bmclapi_dashboard/static/js/index.min.js +++ b/bmclapi_dashboard/static/js/index.min.js @@ -534,7 +534,36 @@ class Application { border-radius: 4px; background-color: var(--border-background); padding-left: 24px; - padding-bottom: 16px;` + padding-bottom: 16px;`, + '.tqdm': [ + 'display: flex;', + 'margin-top: 16px', + ], + ".tqdm-outline": [ + "display: block" + ], + '.tqdm .tqdm-progressbar': [ + 'margin-left: 8px', + 'margin-right: 8px', + 'display: flex', + 'flex-grow: 1;', + 'align-items: center' + ], + ".tqdm.tqdm-outline .tqdm-progressbar": [ + "margin: 0" + ], + '.tqdm .tqdm-backgroundbar': [ + 'padding: 2px', + 'margin-left: 8px', + 'height: 4px', + 'background: var(--background)', + 'width: 100%' + ], + '.tqdm .tqdm-bar': [ + 'width: 50%', + 'height: 4px', + 'background: var(--main-color)', + ] } this.$side = this.createElement("aside").class("side") this.$container = this.createElement("div").class("main").append( @@ -1990,7 +2019,7 @@ $I18N.addLangs("zh_cn", { "menu.master.rank": "排行榜", "menu.config": "配置", "menu.config.storage": "存储设置", - "tqdm": "%value%/%total%, %item%/s", + "tqdm": "%value%/%total% [%start% < %end%, %item%/s]", "storage.webdav": "正在获取WebDav文件中", "cluster.want_enable": "正在启用", "cluster.enabled.trusted": "正常工作", @@ -2704,6 +2733,16 @@ app.$Menu.add("dashboard", new class { hits: app.createEcharts().style("min-height: 162px;"), bytes: app.createEcharts().style("min-height: 162px;").setFormatter((n) => this._format_bytes(n)) } + this.pbar = app.createElement("div").class("tqdm").append( + app.createElement("p"), + app.createElement("p").class("tqdm-progressbar").append( + app.createElement("span").setText("100%"), + app.createElement("div").class("tqdm-backgroundbar").append( + app.createElement("div").class("tqdm-bar") + ), + ), + app.createElement("p") + ) this.page = [ app.createElement("div").class("panel").append( app.createFlex().append( @@ -2713,17 +2752,18 @@ app.$Menu.add("dashboard", new class { ), app.createElement("div").append( app.createElement("p").class("title").setI18N("dashboard.status"), - app.createElement("p").append( - app.createElement("span").class("value").setText("-"), - app.createElement("span").append( - app.createElement("span").class("value").setText(" | "), - app.createElement("span").class("value").setText(""), - app.createElement("span").class("value").setText(" "), - app.createElement("span").setText("") - ), - ) + app.createElement("p").class("value") ) - ).minWidth(896).child(2) + ).minWidth(896).child(2).addResize(() => { + this.pbar.removeClass("tqdm-outline") + var width = this.pbar.getChildren()[1].valueOf().offsetWidth + if (width >= 84) { + this.pbar.removeClass("tqdm-outline") + } else { + this.pbar.class("tqdm-outline") + } + }), + this.pbar ), app.createElement("div").class("panel nopadding").style("margin-bottom: 0").append( app.createFlex(true).class("flex-space-between").child(2).minWidth(512).append( @@ -3696,15 +3736,21 @@ app.$Menu.add("dashboard", new class { } } setStatus() { - this.page[0].getChildren()[0].getChildren()[1].getChildren()[1].getChildren()[0].setI18N(this.status.key) - this.page[0].getChildren()[0].getChildren()[1].getChildren()[1].getChildren()[1].style(`display: ${this.status.progress ? 'inline' : 'none'}`) + this.page[0].getChildren()[0].getChildren()[1].getChildren()[1].setI18N(this.status.key) + this.pbar.valueOf().style.display = `${this.status.progress ? '' : 'none'}` if (this.status.progress) { var value_formatter = this.status.progress.desc == "files.downloading" ? (n) => this._format_bytes(n) : (n) => n - this.page[0].getChildren()[0].getChildren()[1].getChildren()[1].getChildren()[1].getChildren()[1].setI18N(this.status.progress.desc) - this.page[0].getChildren()[0].getChildren()[1].getChildren()[1].getChildren()[1].getChildren()[3].setI18N("tqdm", { + var percent = ((this.status.progress.value / this.status.progress.total) * 100) + percent = isNaN(percent) ? 0 : percent + this.pbar.getChildren()[0].setI18N(this.status.progress.desc) + this.pbar.getChildren()[1].getChildren()[0].setText(`${percent.toFixed(0)}%`) + this.pbar.getChildren()[1].getChildren()[1].getChildren()[0].style(`width: ${percent}%`) + this.pbar.getChildren()[2].setI18N("tqdm", { value: value_formatter(this.status.progress.value), total: value_formatter(this.status.progress.total), item: value_formatter(this.status.progress.speed), + start: this.status.progress.start, + end: this.status.progress.end }) } } diff --git a/core/api.py b/core/api.py index 084ad24..5f69792 100644 --- a/core/api.py +++ b/core/api.py @@ -5,13 +5,14 @@ import hashlib import io from pathlib import Path +import time from typing import Optional import pyzstd as zstd import aiofiles -from core import web +from core import logger, scheduler, unit, web from core.config import Config -from core.const import CACHE_BUFFER_COMPRESSION_MIN_LENGTH +from core.const import CACHE_BUFFER_COMPRESSION_MIN_LENGTH, CACHE_TIME, CHECK_CACHE, CACHE_BUFFER class FileCheckType(Enum): @@ -27,6 +28,7 @@ class FileContentType(Enum): DATA = "data" URL = "url" PATH = "path" + EMPTY = "empty" @dataclass class BMCLAPIFile: @@ -47,56 +49,55 @@ def __eq__(self, other): ) return False - @dataclass class File: - path: Path | str hash: str size: int - last_hit: float = 0 - last_access: float = 0 - expiry: Optional[float] = None - data: Optional[io.BytesIO] = None + type: FileContentType + data: io.BytesIO | str | Path = None + expiry: float = 0 + compressed: bool = False + data_length: int = 0 cache: bool = False headers: Optional["web.Header"] = None - compressed: bool = False - - def is_url(self): - if not isinstance(self.path, str): - return False - return self.path.startswith("http://") or self.path.startswith("https://") - - def is_path(self): - return isinstance(self.path, Path) - def get_path(self) -> str | Path: - return self.path + def set_data(self, data: io.BytesIO | str | Path): + if isinstance(data, io.BytesIO): + length = len(data.getbuffer()) + if CACHE_BUFFER_COMPRESSION_MIN_LENGTH <= length: + self.data = io.BytesIO(zstd.compress(data.getbuffer())) + self.data_length = len(self.data.getbuffer()) + self.compressed = True + else: + self.data = data + self.data_length = len(data.getbuffer()) + self.compressed = False + self.type = FileContentType.DATA + elif isinstance(data, str): + self.data_length = len(data) + self.data = data + self.type = FileContentType.URL + elif isinstance(data, Path): + self.data_length = len(str(data)) + self.data = data + self.type = FileContentType.PATH def get_data(self): - if not self.data: - return io.BytesIO() - if not self.compressed: + if self.compressed: + return io.BytesIO(zstd.decompress(self.data.getbuffer())) + else: return self.data - return io.BytesIO(zstd.decompress(self.data.getbuffer())) - - def set_data(self, data: io.BytesIO | memoryview | bytes): - if not isinstance(data, io.BytesIO): - data = io.BytesIO(data) - data_length = len(data.getbuffer()) - if data_length >= CACHE_BUFFER_COMPRESSION_MIN_LENGTH: - compressed_data = zstd.compress(data.getbuffer()) - if data_length > len(compressed_data): - self.compressed = True - self.data = io.BytesIO(compressed_data) - return - self.compressed = False - self.data = data - - + def is_url(self): + return self.type == FileContentType.URL + def is_path(self): + return self.type == FileContentType.PATH + def get_path(self) -> Path: + return self.data @dataclass class StatsCache: total: int = 0 bytes: int = 0 + data_bytes: int = 0 class Storage(metaclass=abc.ABCMeta): @@ -104,9 +105,58 @@ def __init__(self, name, width: int) -> None: self.name = name self.disabled = False self.width = width - + self.cache: dict[str, File] = {} + self.cache_timer = scheduler.repeat( + self.clear_cache, delay=CHECK_CACHE, interval=CHECK_CACHE + ) def get_name(self): return self.name + + def get_cache(self, hash: str) -> Optional[File]: + file = self.cache.get(hash, None) + if file is not None: + file.cache = True + if not file.is_url(): + file.expiry = time.time() + CACHE_TIME + return file + + def is_cache(self, hash: str) -> Optional[File]: + return hash in self.cache + + def set_cache(self, hash: str, file: File): + self.cache[hash] = file + + def clear_cache(self): + hashs = set() + data = sorted( + self.cache.copy().items(), + key=lambda x: x[1].expiry, reverse=True) + size = 0 + old_size = 0 + for hash, file in data: + if file.type == FileContentType.EMPTY: + continue + size += file.data_length + if (size <= CACHE_BUFFER and file.expiry >= time.time()): + continue + hashs.add(hash) + old_size += file.data_length + for hash in hashs: + self.cache.pop(hash) + logger.tinfo( + "cluster.info.clear_cache.count", + name=self.name, + count=unit.format_number(len(hashs)), + size=unit.format_bytes(old_size), + ) + + def get_cache_stats(self) -> StatsCache: + stat = StatsCache() + for file in self.cache.values(): + stat.total += 1 + stat.bytes += file.size + stat.data_bytes += file.data_length + return stat @abc.abstractmethod async def get(self, file: str, offset: int = 0) -> File: @@ -175,15 +225,15 @@ async def removes(self, hashs: list[str]) -> int: """ raise NotImplementedError - @abc.abstractmethod - async def get_cache_stats(self) -> StatsCache: - """ - dir: path - Getting cache files - return StatsCache - """ - raise NotImplementedError +@dataclass +class OpenbmclapiAgentConfiguration: + source: str + concurrency: int +@dataclass +class ResponseRedirects: + status: int + url: str def get_hash(org): if len(org) == 32: diff --git a/core/cluster.py b/core/cluster.py index f369d5f..7daf741 100644 --- a/core/cluster.py +++ b/core/cluster.py @@ -16,9 +16,9 @@ import aiofiles import aiohttp from typing import Any, Optional, Type +import aiohttp.client_exceptions import socketio from tqdm import tqdm -import core from core import system from core.config import Config from core import certificate, dashboard, unit @@ -37,9 +37,12 @@ from core.const import * from core.api import ( - File, BMCLAPIFile, + File, FileCheckType, + FileContentType, + OpenbmclapiAgentConfiguration, + ResponseRedirects, StatsCache, Storage, get_hash, @@ -146,12 +149,9 @@ async def get_files(self) -> list[BMCLAPIFile]: ) as session: logger.tdebug("cluster.debug.get_files.created_session") async with session.get( - "/openbmclapi/files", - data={ - "responseType": "buffer", - "cache": "", - "lastModified": self.last_modified, - }, + "/openbmclapi/files", params={ + "lastModified": self.last_modified + } ) as req: logger.tdebug( "cluster.debug.get_files.response_status", status=req.status @@ -189,7 +189,6 @@ async def _download_temporarily_file(self, hash: str): content: io.BytesIO = io.BytesIO() async with session.get( f"/openbmclapi/download/{hash}", - data={"responseType": "buffer", "searchParams": {"noopen": 1}}, ) as resp: while data := await resp.content.read(IO_BUFFER): if not data: @@ -214,14 +213,34 @@ async def _download_temporarily_file(self, hash: str): ) return content - async def _download(self, pbar: tqdm, session: aiohttp.ClientSession): + async def _download(self, pbar: tqdm, lock: asyncio.Semaphore): + async def put(size, file: BMCLAPIFile): + await self.queues.put(file) + pbar.update(-size) + async def error(*responses: aiohttp.ClientResponse): + msg = [] + history = list((ResponseRedirects(resp.status, str(resp.real_url)) for resp in responses)) + source = "主控" if len(history) == 1 else "节点" + for history in history: + msg.append(f" > {history.status} | {history.url}") + history = '\n'.join(msg) + logger.error(f"下载错误 {file.hash}({unit.format_bytes(file.size)}) 来自{source}地址 [{responses[-1].host}] 响应 [{responses[-1].status}] 历史地址:\n{history}") while not self.queues.empty() and storages.available: - file = await self.queues.get() - hash = get_hash(file.hash) - size = 0 - content: io.BytesIO = io.BytesIO() - try: - async with session.get(file.path) as resp: + async with aiohttp.ClientSession( + BASE_URL, + headers={ + "User-Agent": USER_AGENT, + "Authorization": f"Bearer {await token.getToken()}", + }, + ) as session: + file = await self.queues.get() + hash = get_hash(file.hash) + size = 0 + content: io.BytesIO = io.BytesIO() + resp = None + try: + #async with lock: + resp = await session.get(file.path) while data := await resp.content.read(IO_BUFFER): if not data: break @@ -230,18 +249,30 @@ async def _download(self, pbar: tqdm, session: aiohttp.ClientSession): pbar.update(byte) content.write(data) hash.update(data) + resp.close() + except asyncio.CancelledError: + return + except aiohttp.client_exceptions.ClientConnectionError: + if resp is not None: + await error(*resp.history, resp) + await put(size, file) + continue + except: + logger.error(traceback.format_exc()) + if resp is not None: + await error(*resp.history, resp) + await put(size, file) + continue if file.hash != hash.hexdigest(): - raise EOFError + await error(*resp.history, resp) + await put(size, file) + await asyncio.sleep(5) + continue r = await self._mount_file(file, content) if r[0] == -1: - raise EOFError - except asyncio.CancelledError: - break - except: - pbar.update(-size) - await self.queues.put(file) - del content - await session.close() + logger.error("放入存储时候错误") + await put(size, file) + continue async def _mount_file( self, file: BMCLAPIFile, buf: io.BytesIO @@ -251,6 +282,8 @@ async def _mount_file( r = -1 try: r = await storage.write(file.hash, buf) + except asyncio.CancelledError: + return buf, -1 except: logger.error(traceback.format_exc()) if r != file.size: @@ -267,7 +300,7 @@ async def _mount_file( result = result or r return buf, result or -1 - async def download(self, miss: list[BMCLAPIFile]): + async def download(self, miss: list[BMCLAPIFile], configuration: OpenbmclapiAgentConfiguration): if not storages.available: logger.terror("cluster.erorr.cluster.storage.available") return @@ -281,23 +314,9 @@ async def download(self, miss: list[BMCLAPIFile]): await dashboard.set_status_by_tqdm("files.downloading", pbar) for file in miss: await self.queues.put(file) - timers = [] - for _ in range(0, MAX_DOWNLOAD, max(MAX_DOWNLOAD, 32)): - for __ in range(max(32, MAX_DOWNLOAD)): - timers.append( - self._download( - pbar, - aiohttp.ClientSession( - BASE_URL, - headers={ - "User-Agent": USER_AGENT, - "Authorization": f"Bearer {await token.getToken()}", - }, - ), - ), - ) + lock = asyncio.Semaphore(configuration.concurrency) try: - await asyncio.gather(*timers) + await asyncio.gather(*[self._download(pbar, lock) for _ in range(MAX_DOWNLOAD)]) except asyncio.CancelledError: raise asyncio.CancelledError logger.tsuccess("cluster.info.download.finished") @@ -316,12 +335,26 @@ def __init__(self, downloader: FileDownloader) -> None: self.pbar: Optional[tqdm] = None self.check_files_timer: Optional[int] = None logger.tinfo("cluster.info.check_files.check_type", type=self.check_type.name) + self.configurations: dict[str, OpenbmclapiAgentConfiguration] = {} def start_task(self): if self.check_files_timer: scheduler.cancel(self.check_files_timer) self.check_files_timer = scheduler.delay(self.__call__, delay=1800) + async def get_configuration(self) -> tuple[str, OpenbmclapiAgentConfiguration]: + async with aiohttp.ClientSession( + base_url=BASE_URL, + headers={ + "User-Agent": USER_AGENT, + "Authorization": f"Bearer {await token.getToken()}" + } + ) as session: + async with session.get("/openbmclapi/configuration") as resp: + self.configuration = { + key: OpenbmclapiAgentConfiguration(**value) for key, value in (await resp.json()).items() + } + return sorted(self.configuration.items(), key=lambda x: x[1].concurrency)[0] async def __call__( self, ) -> Any: @@ -473,7 +506,9 @@ async def __call__( "cluster.info.check_files.missing", count=unit.format_number(len(miss_all_files)), ) - await self.downloader.download(list(miss_all_files)) + configuration = await self.get_configuration() + logger.tinfo("cluster.info.download.configuration", type=configuration[0], source=configuration[1].source, concurrency=configuration[1].concurrency) + await self.downloader.download(list(miss_all_files), configuration[1]) self.start_task() async def _exists(self, file: BMCLAPIFile, storage: Storage): @@ -519,28 +554,24 @@ def __init__(self, name: str, dir: Path, width: int) -> None: if self.dir.is_file(): raise FileExistsError(f"Cannot copy file: '{self.dir}': Is a file.") self.dir.mkdir(exist_ok=True, parents=True) - self.cache: dict[str, File] = {} - self.timer = scheduler.repeat( - self.clear_cache, delay=CHECK_CACHE, interval=CHECK_CACHE - ) async def get(self, hash: str, offset: int = 0) -> File: - if hash in self.cache: - file = self.cache[hash] - file.last_access = time.time() - file.cache = True + if self.is_cache(hash): + file = self.get_cache(hash) return file path = Path(str(self.dir) + f"/{hash[:2]}/{hash}") - file = File(path, hash, path.stat().st_size) + file = File(hash, path.stat().st_size, FileContentType.EMPTY) if CACHE_ENABLE: buf = io.BytesIO() async with aiofiles.open(path, "rb") as r: while data := await r.read(IO_BUFFER): buf.write(data) - file = File(path, hash, buf.tell(), time.time(), time.time()) - file.set_data(buf.getbuffer()) - self.cache[hash] = file - file.cache = False + file = File(hash, buf.tell(), time.time(), time.time()) + file.set_data(buf) + else: + file.set_data(path) + if CACHE_ENABLE: + self.set_cache(hash, file) return file async def exists(self, hash: str) -> bool: @@ -567,29 +598,6 @@ async def get_hash(self, hash: str) -> str: await asyncio.sleep(0.001) return h.hexdigest() - async def clear_cache(self): - size: int = 0 - old_keys: list[str] = [] - old_size: int = 0 - file: File - key: str - for key, file in sorted( - self.cache.items(), key=lambda x: x[1].last_access, reverse=True - ): - if size <= CACHE_BUFFER and file.last_access + CACHE_TIME >= time.time(): - continue - old_keys.append(key) - old_size += file.size - if not old_keys: - return - for key in old_keys: - self.cache.pop(key) - logger.tinfo( - "cluster.info.clear_cache.count", - count=unit.format_number(len(old_keys)), - size=unit.format_bytes(old_size), - ) - async def get_files(self, dir: str) -> list[str]: files = [] if os.path.exists(str(self.dir) + f"/{dir}"): @@ -615,13 +623,6 @@ async def get_files_size(self, dir: str) -> int: size += file.stat().st_size return size - async def get_cache_stats(self) -> StatsCache: - stat = StatsCache() - for file in self.cache.values(): - stat.total += 1 - stat.bytes += file.size - return stat - class WebDav(Storage): def __init__( @@ -637,14 +638,10 @@ def __init__( self.username = username self.password = password self.hostname = hostname - self.endpoint = endpoint.replace("\\", "/").replace("//", "/").removesuffix("/") + self.endpoint = "/" + endpoint.replace("\\", "/").replace("//", "/").removesuffix("/").removeprefix('/') self.files: dict[str, File] = {} self.dirs: list[str] = [] self.fetch: bool = False - self.cache: dict[str, File] = {} - self.timer = scheduler.repeat( - self.clear_cache, delay=CHECK_CACHE, interval=CHECK_CACHE - ) self.empty = File("", "", 0) self.lock = utils.WaitLock() self.session = webdav3_client.Client( @@ -656,9 +653,8 @@ def __init__( } ) self.session_lock = asyncio.Lock() - self.session_tcp_connector = aiohttp.TCPConnector(limit = LIMIT_SESSION_WEBDAV) + self.get_session_lock = asyncio.Semaphore(LIMIT_SESSION_WEBDAV) self.get_session = aiohttp.ClientSession( - connector=self.session_tcp_connector, auth=aiohttp.BasicAuth( self.username, self.password @@ -667,29 +663,6 @@ def __init__( scheduler.delay(self._list_all) scheduler.repeat(self._keepalive, interval=60) - async def clear_cache(self): - size: int = 0 - old_keys: list[str] = [] - old_size: int = 0 - file: File - key: str - for key, file in sorted( - self.cache.items(), key=lambda x: x[1].expiry, reverse=True - ): - if size <= CACHE_BUFFER and file.expiry < time.time() - 600: - continue - old_keys.append(key) - old_size += file.size - if not old_keys: - return - for key in old_keys: - self.cache.pop(key) - logger.tinfo( - "cluster.info.clear_cache.count", - count=unit.format_number(len(old_keys)), - size=unit.format_bytes(old_size), - ) - async def _keepalive(self): try: hostname = self.hostname @@ -808,9 +781,9 @@ def stop(tqdm: Optional[tqdm] = None): if file["isdir"]: continue files[file["name"]] = File( - file["path"].removeprefix(f"/dav/{self.endpoint}/"), file["name"], - int(file["size"]) + int(file["size"]), + FileContentType.EMPTY ) try: await asyncio.sleep(0) @@ -837,10 +810,9 @@ async def _wait_lock(self): await self.lock.wait() async def get(self, hash: str, offset: int = 0) -> File: - if hash in self.cache and self.cache[hash].expiry is not None and self.cache[hash].expiry - 10 > time.time(): - self.cache[hash].cache = True - self.cache[hash].last_hit = time.time() - return self.cache[hash] + if self.is_cache(hash): + file = self.get_cache(hash) + return file try: f = File( hash, @@ -848,34 +820,35 @@ async def get(self, hash: str, offset: int = 0) -> File: 0, ) session = self.get_session - async with session.get( - self.hostname + self._file_endpoint(hash[:2] + "/" + hash), - allow_redirects=False, - ) as resp: - if resp.status == 200: - f.headers = {} - for field in ( - "ETag", - "Last-Modified", - "Content-Length", - ): - if field not in resp.headers: - continue - f.headers[field] = resp.headers.get(field) - f.size = int(resp.headers.get("Content-Length", 0)) - f.set_data(await resp.read()) - if CACHE_ENABLE: - f.expiry = time.time() + CACHE_TIME - self.cache[hash] = f - elif resp.status // 100 == 3: - f.size = await self.get_size(hash) - f.path = resp.headers.get("Location") - expiry = re.search(r"max-age=(\d+)", resp.headers.get("Cache-Control", "")) or 0 - if max(expiry, CACHE_TIME) == 0: - return f - f.expiry = time.time() + expiry - if CACHE_ENABLE or f.expiry != 0: - self.cache[hash] = f + async with self.get_session_lock: + async with session.get( + self.hostname + self._file_endpoint(hash[:2] + "/" + hash), + allow_redirects=False, + ) as resp: + if resp.status == 200: + f.headers = {} + for field in ( + "ETag", + "Last-Modified", + "Content-Length", + ): + if field not in resp.headers: + continue + f.headers[field] = resp.headers.get(field) + f.size = int(resp.headers.get("Content-Length", 0)) + f.set_data(io.BytesIO(await resp.read())) + if CACHE_ENABLE: + f.expiry = time.time() + CACHE_TIME + self.set_cache(hash, f) + elif resp.status // 100 == 3: + f.size = await self.get_size(hash) + f.set_data(resp.headers.get("Location")) + expiry = re.search(r"max-age=(\d+)", resp.headers.get("Cache-Control", "")) or 0 + if max(expiry, CACHE_TIME) == 0: + return f + f.expiry = time.time() + expiry + if CACHE_ENABLE or f.expiry != 0: + self.set_cache(hash, f) return f except Exception: storages.disable(self) @@ -896,7 +869,7 @@ async def write(self, hash: str, io: io.BytesIO) -> int: path = self._file_endpoint(f"{hash[:2]}/{hash}") await self._mkdir(self._file_endpoint(f"{hash[:2]}")) await self._execute(self.session.upload_to(io.getbuffer(), path)) - self.files[hash] = File(path, hash, len(io.getbuffer())) + self.files[hash] = File(hash, len(io.getbuffer()), FileContentType.EMPTY) return self.files[hash].size async def get_files(self, dir: str) -> list[str]: @@ -923,19 +896,12 @@ async def get_files_size(self, dir: str) -> int: async def removes(self, hashs: list[str]) -> int: success = 0 for hash in hashs: - await self._execute( - self.session.clean(self._file_endpoint(f"{hash[:2]}/{hash}")) - ) + #await self._execute( + # self.session.clean(self._file_endpoint(f"{hash[:2]}/{hash}")) + #) success += 1 return success - async def get_cache_stats(self) -> StatsCache: - stat = StatsCache() - for file in self.cache.values(): - stat.total += 1 - stat.bytes += file.size - return stat - class TypeStorage(Enum): FILE = "file" @@ -1399,8 +1365,8 @@ async def _(request: web.Request, size: int, config: web.ResponseConfiguration): yield b"\x00" * 1024 * 1024 return - @app.get("/download/{hash}", access_logs=False) - async def _(request: web.Request, hash: str): + @app.get("/download/{hash}") + async def _(request: web.Request, hash: str, config: web.ResponseConfiguration): if ( not SIGN_SKIP and not utils.check_sign( @@ -1431,10 +1397,11 @@ async def _(request: web.Request, hash: str): ) if not data: return web.Response(status_code=404) + config.access_log = DOWNLOAD_ACCESS_LOG if data.is_url() and isinstance(data.get_path(), str): - return web.RedirectResponse(str(data.get_path())).set_headers(name) + return web.RedirectResponse(str(data.get_path()), response_configuration=config).set_headers(name) return web.Response( - data.get_data().getbuffer() if not data.is_path() else data.get_path(), headers=data.headers or {} + data.get_data().getbuffer() if not data.is_path() else data.get_path(), headers=data.headers or {}, response_configuration=config ).set_headers(name) dir = Path("./bmclapi_dashboard/") diff --git a/core/config.py b/core/config.py index 944a38a..8a59763 100644 --- a/core/config.py +++ b/core/config.py @@ -38,7 +38,7 @@ "advanced.debug": False, "advanced.language": "zh_cn", "advanced.auto_update": False, - "advanced.copy_from_other_storage": True, + "advanced.copy_from_another_storage": True, "dashboard.username": "admin", "dashboard.websocket": True, "dashboard.password": "".join( diff --git a/core/const.py b/core/const.py index e57f149..1059544 100644 --- a/core/const.py +++ b/core/const.py @@ -2,6 +2,7 @@ import gzip import os from pathlib import Path +import re from typing import Any import zlib @@ -22,6 +23,7 @@ API_VERSION = "1.10.4" USER_AGENT = f"openbmclapi-cluster/{API_VERSION} python-openbmclapi/{VERSION}" BASE_URL = Config.get("advanced.url", "https://openbmclapi.bangbang93.com/") +BD_URL = BASE_URL.replace("openbmclapi", "bd") CLUSTER_ID: str = Config.get("cluster.id") CLUSTER_SECERT: str = Config.get("cluster.secret") IO_BUFFER: int = Config.get("advanced.io_buffer") @@ -51,6 +53,8 @@ RESPONSE_HEADERS = { "Server": Config.get("web.server_name"), } +CLUSTER_PATTERN = re.compile(r'https?://([a-fA-F0-9]*)\.openbmclapi\.933\.moe(:\d+)/') +DOWNLOAD_ACCESS_LOG: bool = True RESPONSE_DATE = "%a, %d %b %Y %H:%M:%S GMT" RESPONSE_COMPRESSION_IGNORE_SIZE_THRESHOLD: int = 16777216 SKIP_FILE_CHECK: bool = False diff --git a/core/dashboard.py b/core/dashboard.py index 80c15aa..00c7837 100644 --- a/core/dashboard.py +++ b/core/dashboard.py @@ -143,6 +143,8 @@ async def process(type: str, data: Any): "speed": cur_tqdm.speed, "unit": cur_tqdm.object.unit, "desc": cur_tqdm.desc, + "start": utils.format_stime(time.time() - cur_tqdm.object.start_t), + "end": utils.format_stime(((cur_tqdm.object.total - cur_tqdm.object.n) / cur_tqdm.speed)) if cur_tqdm.speed != 0 else utils.format_stime(None) } } ) @@ -157,7 +159,7 @@ async def process(type: str, data: Any): "connections": system.get_connections(), "cpu": system.get_cpus(), "cache": ( - asdict(await get_cache_stats()) if cluster.cluster else StatsCache() + asdict(get_cache_stats()) if cluster.cluster else StatsCache() ), } if type == "version": @@ -182,12 +184,13 @@ async def process(type: str, data: Any): return system.get_loads_detail() -async def get_cache_stats() -> StatsCache: +def get_cache_stats() -> StatsCache: stat = StatsCache() for storage in cluster.storages.get_storages(): - t = await storage.get_cache_stats() + t = storage.get_cache_stats() stat.total += t.total stat.bytes += t.bytes + stat.data_bytes += t.data_bytes return stat diff --git a/core/utils.py b/core/utils.py index 73008b0..baea4f9 100644 --- a/core/utils.py +++ b/core/utils.py @@ -454,12 +454,12 @@ def updateDict(dict: dict, new: dict): def format_stime(n): if not n: - return "--:--:--" + return "--:--" n = int(n) hour = int(n / 60 / 60) minutes = int(n / 60 % 60) second = int(n % 60) - return f"{hour:02d}:{minutes:02d}:{second:02d}" + return f"{minutes:02d}:{second:02d}" if hour == 0 else f"{hour:02d}:{minutes:02d}:{second:02d}" def format_time(k: float): diff --git a/core/web.py b/core/web.py index 9c52fd4..07b9509 100644 --- a/core/web.py +++ b/core/web.py @@ -100,7 +100,7 @@ def __init__( path: str, method: Optional[str], handler: Callable[..., Coroutine], - access_logs: bool = True, + response_configuration: Optional["ResponseConfiguration"] = None, **kwargs, ) -> None: self._path = path if path.startswith("/") else "/" + path @@ -112,7 +112,7 @@ def __init__( self.regexp: re.Pattern = re.compile( rf"^{path.replace('{', '(?P<').replace('}', r'>[^/]*)')}$" ) - self.access_logs = access_logs + self._response = response_configuration self.kwargs = kwargs def is_params(self): @@ -512,7 +512,6 @@ def redirect(self, path: str, redirect_to: str): return self._routes[0].redierct(path, redirect_to) async def handle(self, request: "Request"): - access_logs = True url = request.get_url() redirect = None for route in self._routes: @@ -524,11 +523,12 @@ async def handle(self, request: "Request"): return result = None cur_route = None + response_configuration = None method = await request.get_method() for route in self._routes: cur_route = route.get_route(method, url) if cur_route: - access_logs = cur_route.access_logs + response_configuration = cur_route._response prefix = route.get_prefix() break if cur_route is not None: @@ -608,7 +608,7 @@ async def handle(self, request: "Request"): } ), status_code=101, - access_logs=access_logs, + response_configuration=response_configuration ) ws.start() if request.get_url() not in self._ws: @@ -639,7 +639,7 @@ async def handle(self, request: "Request"): yield Response( content=result or "", headers=Header({"Server": Config.get("web.server_name")}), - access_logs=access_logs, + response_configuration=response_configuration, ) def mount(self, router: Router): @@ -714,6 +714,7 @@ def __len__(self) -> int: @dataclass class ResponseConfiguration: length: Optional[int] = None + access_log: bool = True class Response: @@ -725,6 +726,7 @@ def __init__( content_type: Optional[str] = None, compress=None, status_code: int = 200, + response_configuration: Optional[ResponseConfiguration] = None, **kwargs, ) -> None: self.status_code = status_code @@ -733,6 +735,7 @@ def __init__( self._cookies = cookies self.content_type = content_type self._compress = compress + self._response_configuration = response_configuration self._kwargs = kwargs def set_headers(self, header: Header | dict[str, Any]): @@ -788,6 +791,7 @@ async def raw(self): self._headers = self.content._headers self._cookies = self.content._cookies self._kwargs = self.content._kwargs + self._response_configuration = self.content._response_configuration self.content = self.content.content if isinstance(self.content, (Coroutine, Response)): await self.raw() @@ -927,12 +931,12 @@ async def __call__( class RedirectResponse(Response): def __init__( - self, location: str, headers: Header | dict[str, Any] | None = None + self, location: str, headers: Header | dict[str, Any] | None = None, response_configuration: Optional[ResponseConfiguration] = None ) -> None: header = Header() header.update(headers) header.update({"Location": location}) - super().__init__(headers=header, status_code=307) + super().__init__(headers=header, status_code=307, response_configuration=response_configuration) class Request: @@ -1310,7 +1314,7 @@ async def handle(data, client: Client): await resp(request, client) if not log: log = True - if resp._kwargs.get("access_logs", True): + if resp._response_configuration is None or resp._response_configuration.access_log: logger.info( request.get_request_time(), "|", diff --git a/i18n/zh_cn.json b/i18n/zh_cn.json index 5ca2ac7..23e4c2d 100644 --- a/i18n/zh_cn.json +++ b/i18n/zh_cn.json @@ -34,7 +34,7 @@ "cluster.error.check_files.failed_to_copy": "无法复制缺失文件:$hash ($file) => $hash ($target)。", "cluster.success.check_files.finished": "文件已全部检查成功!文件数量:$count ($size)。", "cluster.info.check_files.missing": "文件缺失:$count。", - "cluster.info.clear_cache.count": "过期缓存:$count ($size)。", + "cluster.info.clear_cache.count": "$name 过期缓存:$count ($size)。", "cluster.tqdm.desc.cleaning_cache": "清除缓存中", "cluster.tqdm.desc.parsing_file_list": "解析文件列表中", "cluster.tqdm.desc.fetching_webdav_files": "获取 WebDAV 文件列表中", @@ -79,5 +79,6 @@ "core.info.shutting_down_web_service": "正在关闭 Web 服务……", "stats.error.broken_stats": "统计信息已损坏……", "system.info.loading": "正在初始化系统信息统计……", - "timings.info.finished": "任务已完成,耗时:$time。" + "timings.info.finished": "任务已完成,耗时:$time。", + "cluster.info.download.configuration": "下载配置策略:$type 下载源:$source 下载并发:$concurrency" }