Skip to content

Commit 49fd95f

Browse files
committed
fix: 修复缓存导致内存暴增
1 parent abba2c7 commit 49fd95f

File tree

8 files changed

+111
-110
lines changed

8 files changed

+111
-110
lines changed

assets/js/index.js

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -177,10 +177,12 @@ class Channel {
177177
total: 0,
178178
current: 0,
179179
};
180+
this.timeout = 10000
180181
if (!this.support_websocket) return;
181182
this._ws_init();
182183
this._ws_initizalized = false;
183184
this._ws_callbacks = {};
185+
this._ws_timeouts = {}
184186
}
185187
// websocket
186188
_ws_init() {
@@ -191,21 +193,23 @@ class Channel {
191193
this._ws.onmessage = (event) => {
192194
var data = JSON.parse(event.data);
193195
if (data.echo_id) {
196+
clearTimeout(this._ws_timeouts[data.echo_id])
197+
delete this._ws_timeouts[data.echo_id]
194198
this._ws_callbacks[data.echo_id].resolve(data.data);
195199
delete this._ws_callbacks[data.echo_id];
196200
return;
197201
}
198-
console.log(data)
202+
window.dispatchEvent(new CustomEvent(`channel_${data.event}`, { detail: data.data }))
199203
}
200204
this._ws.onclose = () => {
201205
this._ws_initizalized = false;
202-
this._Ws_reconnect();
206+
this._ws_reconnect();
203207
}
204208
this._ws.onerror = (event) => {
205209
console.log("websocket error", event)
206210
}
207211
}
208-
_Ws_reconnect() {
212+
_ws_reconnect() {
209213
if (this._ws_reconnect_task) return;
210214
this._ws_reconnect_task = setTimeout(() => {
211215
this._ws_init();
@@ -223,6 +227,10 @@ class Channel {
223227
data,
224228
echo_id
225229
}))
230+
this._ws_timeouts[echo_id] = setTimeout(() => {
231+
delete this._ws_callbacks[echo_id];
232+
reject("timeout")
233+
}, this.timeout)
226234
})
227235
}
228236

@@ -250,6 +258,7 @@ class Channel {
250258
})
251259
xhr.addEventListener("readystatechange", (event) => {
252260
if (xhr.readyState == 4) {
261+
clearTimeout(timer)
253262
if (xhr.statusText == "OK") {
254263
resolve(JSON.parse(xhr.responseText)[0].data)
255264
} else {
@@ -261,6 +270,10 @@ class Channel {
261270
event,
262271
data: data
263272
}))
273+
var timer = setTimeout(() => {
274+
xhr.abort();
275+
reject("timeout")
276+
}, this.timeout)
264277
})
265278
}
266279

core/cache.py

Lines changed: 45 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -1,95 +1,63 @@
11
import abc
2+
import collections
23
from dataclasses import dataclass
34
import inspect
45
import time
56
from typing import Any, Optional, TypeVar
67

7-
from . import scheduler
8+
from apscheduler.schedulers.background import BackgroundScheduler
9+
from apscheduler.job import Job
810

911

1012
T = TypeVar("T")
1113
EMPTY = inspect._empty
1214

13-
@dataclass
14-
class StorageValue:
15-
value: Any
16-
expires: Optional[float]
17-
timestamp: float
18-
19-
20-
class Storage(metaclass=abc.ABCMeta):
21-
def __init__(self) -> None:
22-
self.cache: dict[str, StorageValue] = {}
23-
self._clean_task: int = -1
24-
@abc.abstractmethod
25-
def set(self, key: str, value: object, expires: Optional[float] = None) -> None:
26-
raise NotImplementedError
27-
@abc.abstractmethod
28-
def get(self, key: str, _def: Any = EMPTY) -> Any:
29-
raise NotImplementedError
30-
31-
@abc.abstractmethod
32-
def delete(self, key: str) -> None:
33-
raise NotImplementedError
34-
35-
def __setitem__(self, key: str, value: object) -> Any:
36-
return self.set(key, value)
37-
38-
def __getitem__(self, key: str) -> Any:
39-
value = self.get(key)
40-
if value == EMPTY:
41-
raise IndexError
42-
return value
43-
def exists(self, key: str) -> bool:
44-
obj = self.cache.get(key, None)
45-
return obj is not None and (obj.expires is not None and obj.expires + obj.timestamp >= time.time() or not obj.expires)
46-
def __contains__(self, key: str) -> bool:
47-
return self.exists(key)
48-
49-
def clean(self):
50-
for k, v in filter(lambda x: (x[1].expires is not None and x[1].expires + x[1].timestamp >= time.time()), list(self.cache.items())):
51-
self.cache.pop(k)
52-
print(k)
53-
self._start_clean()
54-
15+
class CacheValue[T]:
16+
def __init__(self, value: T, expires: Optional[float] = None) -> None:
17+
self.value: T = value
18+
self.expires: Optional[float] = expires
19+
self.timestamp: float = time.monotonic_ns()
20+
self.job: Optional[Job] = None
21+
22+
23+
# Time out cache
24+
class TimeoutCache[T]:
25+
def __init__(self):
26+
self.cache: collections.OrderedDict[str, CacheValue] = collections.OrderedDict()
27+
self.background = BackgroundScheduler()
28+
29+
def set(self, key: str, value: T, timeout: Optional[float] = None) -> None:
30+
self._delete_job(key)
31+
self.cache[key] = CacheValue(
32+
value,
33+
timeout
34+
)
35+
if timeout is not None:
36+
self.cache[key].job = self.background.add_job(self.delete, 'interval', seconds=timeout, args=[key])
5537

56-
def _start_clean(self):
57-
scheduler.cancel(self._clean_task)
58-
ready = filter(lambda x: (x.expires is not None and x.expires + x.timestamp >= time.time()), list(self.cache.values()))
59-
if not ready:
38+
def _delete_job(self, key: str):
39+
current = self.cache.get(key, None)
40+
if current is None or current.job is None:
6041
return
61-
next_time = max(ready, key=lambda x: (x.expires + x.timestamp) - time.time()) # type: ignore
62-
self._clean_task = scheduler.run_later(self.clean, delay=(next_time.expires + next_time.timestamp) - time.time()) # type: ignore
42+
current.job.remove()
43+
current.job = None
6344

64-
def get_keys(self) -> list[str]:
65-
return list(self.cache.keys())
66-
67-
def get_startswith_all(self, key: str) -> dict[str, Any]:
68-
return {k: v for k, v in self.cache.items() if k.startswith(key)}
69-
70-
def get_endswith_all(self, key: str) -> dict[str, Any]:
71-
return {k: v for k, v in self.cache.items() if k.endswith(key)}
72-
73-
def get_contains_all(self, key: str) -> dict[str, Any]:
74-
return {k: v for k, v in self.cache.items() if key in k}
75-
76-
class MemoryStorage(Storage):
77-
def __init__(self) -> None:
78-
super().__init__()
79-
80-
def set(self, key: str, value: object, expires: float | None = None) -> None:
81-
data = value
82-
obj = StorageValue(
83-
data,
84-
expires,
85-
time.time()
86-
)
87-
self.cache[key] = obj
88-
8945
def get(self, key: str, _def: Any = EMPTY) -> Any:
90-
if not self.exists(key):
46+
current = self.cache.get(key, None)
47+
if current is None:
48+
return _def
49+
if current.expires is not None and current.expires + current.timestamp < time.monotonic_ns():
50+
self.delete(key)
9151
return _def
92-
return self.cache[key].value or _def
52+
return current.value
9353

9454
def delete(self, key: str) -> None:
95-
self.cache.pop(key, None)
55+
self._delete_job(key)
56+
self.cache.pop(key, None)
57+
58+
def __contains__(self, key: str) -> bool:
59+
if key in self.cache:
60+
current = self.cache[key]
61+
if current.expires is not None and current.expires + current.timestamp < time.monotonic_ns():
62+
self.delete(key)
63+
return key in self.cache

core/config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ def rank_clusters_url(self):
170170

171171
const = Const()
172172

173-
VERSION = "3.3.2"
173+
VERSION = "3.3.3"
174174
API_VERSION = "1.13.1"
175175
USER_AGENT = f"openbmclapi/{API_VERSION} python-openbmclapi/{VERSION}"
176176
PYTHON_VERSION = ".".join(map(str, (sys.version_info.major, sys.version_info.minor, sys.version_info.micro)))

core/dashboard.py

Lines changed: 40 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -566,9 +566,9 @@ async def handle_api(
566566
for item in resp:
567567
clusters[item["_id"]] = item["name"]
568568

569-
return [
570-
clusters.get(c.id, c.id) for c in cluster.clusters.clusters
571-
]
569+
return {
570+
c.id: clusters.get(c.id, c.id) for c in cluster.clusters.clusters
571+
}
572572

573573
if event == "rank":
574574
async with aiohttp.ClientSession() as session:
@@ -644,18 +644,17 @@ async def init():
644644
global task
645645
task = threading.Thread(target=record)
646646
task.start()
647-
if config.const.auto_sync_assets:
648-
scheduler.run_repeat_later(
649-
sync_assets,
650-
5,
651-
interval=3600
652-
)
647+
scheduler.run_repeat_later(
648+
fetch_github_repo,
649+
5,
650+
interval=3600
651+
)
653652

654653
async def unload():
655654
global running
656655
running = 0
657656

658-
async def sync_assets():
657+
async def fetch_github_repo():
659658
async def get_dir_list(path: str = "/"):
660659
result = []
661660
if not path.startswith("/"):
@@ -699,15 +698,8 @@ async def get_file(file: GithubPath):
699698
return data
700699
return io.BytesIO()
701700

702-
headers = {
703-
"User-Agent": config.USER_AGENT
704-
}
705-
if config.const.github_token:
706-
headers["Authorization"] = f"Bearer {config.const.github_token}"
707-
async with aiohttp.ClientSession(
708-
GITHUB_BASEURL,
709-
headers=headers
710-
) as session:
701+
702+
async def sync_assets():
711703
res: list[GithubPath] = await get_dir_list("/assets")
712704
if not res:
713705
return
@@ -738,4 +730,32 @@ async def get_file(file: GithubPath):
738730
path.parent.mkdir(exist_ok=True, parents=True)
739731
with open(path, "wb") as f:
740732
f.write(files[file].getvalue())
741-
logger.info(f"Synced {len(files)} files")
733+
logger.info(f"Synced {len(files)} files")
734+
735+
async def check_update():
736+
async with session.get(
737+
f"/repos/{GITHUB_REPO}/releases/latest"
738+
) as resp:
739+
if resp.status == 200:
740+
json_data = await resp.json()
741+
tag_name = json_data["tag_name"][1:]
742+
if tag_name != config.VERSION:
743+
logger.tinfo("dashboard.info.new_version", current=config.VERSION, latest=tag_name)
744+
return
745+
746+
747+
headers = {
748+
"User-Agent": config.USER_AGENT
749+
}
750+
if config.const.github_token:
751+
headers["Authorization"] = f"Bearer {config.const.github_token}"
752+
async with aiohttp.ClientSession(
753+
GITHUB_BASEURL,
754+
headers=headers
755+
) as session:
756+
tasks = [
757+
check_update()
758+
]
759+
if config.const.auto_sync_assets:
760+
tasks.append(sync_assets())
761+
await asyncio.gather(*tasks)

core/scheduler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
AsyncIOScheduler as AsyncBackground
99
)
1010
from apscheduler.job import Job
11-
from . import logger
11+
from .logger import logger
1212
from weakref import WeakValueDictionary
1313

1414
from . import units

core/storages/__init__.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ def __init__(self, path: str, weight: int) -> None:
3939
self.path = path
4040
self.weight = weight
4141
self.current_weight = 0
42-
self.cache_files = cache.MemoryStorage()
4342

4443
@property
4544
def unique_id(self) -> str:
@@ -221,7 +220,7 @@ def __init__(self, path: str, weight: int, url: str, username: Optional[str], pa
221220
self.wait_token.acquire()
222221
self.tcp_connector = aiohttp.TCPConnector(limit=256)
223222
self.download_connector = aiohttp.TCPConnector(limit=256)
224-
self.cache = cache.MemoryStorage()
223+
self.cache = cache.TimeoutCache()
225224
self.link_cache_timeout = utils.parse_time(link_cache_expires).to_seconds if link_cache_expires is not None else None
226225
self.link_cache: defaultdict[str, AlistLink] = defaultdict(lambda: AlistLink())
227226
scheduler.run_repeat_later(self._check_token, 0, 3600)
@@ -312,7 +311,7 @@ async def _action_data(self, action: str, url: str, data: Any, headers: dict[str
312311
logger.debug(data)
313312
logger.debug(result)
314313
else:
315-
self.cache.set(hash, result, 30)
314+
self.cache.set(hash, result, 600)
316315
return result
317316
except:
318317
logger.terror("storage.error.alist", status=resp.status, message=await resp.text())
@@ -399,7 +398,7 @@ async def get_files(root_id: int):
399398
if result.code != 200:
400399
logger.tdebug("storage.debug.error_alist", status=result.code, message=result.message)
401400
else:
402-
self.cache.set(f"listfile_{root}", result, 60)
401+
self.cache.set(f"listfile_{root}", result, 600)
403402
except:
404403
logger.traceback()
405404
return []
@@ -506,7 +505,7 @@ def __init__(self, path: str, weight: int, url: str, username: str, password: st
506505
"User-Agent": config.USER_AGENT
507506
})
508507
self.client_lock = asyncio.Lock()
509-
self.cache = cache.MemoryStorage()
508+
self.cache = cache.TimeoutCache()
510509
self.session = aiohttp.ClientSession(
511510
auth=aiohttp.BasicAuth(
512511
self.username,
@@ -549,7 +548,7 @@ async def get_files(root_id: int):
549548
name=r["name"],
550549
size=int(r["size"])
551550
) for r in result if not r["isdir"]]
552-
self.cache.set(f"listfile_{root}", res, 60)
551+
self.cache.set(f"listfile_{root}", res, 600)
553552
except webdav3_exceptions.RemoteResourceNotFound:
554553
return []
555554
except:

core/web.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,7 @@ async def check_server():
374374
for server in privates.values():
375375
servers.append(CheckServer(server.server.sockets[0].getsockname()[1], start_private_server, server.key))
376376

377-
logger.tdebug("web.debug.check_server", servers=len(servers))
377+
#logger.tdebug("web.debug.check_server", servers=len(servers))
378378
results = await asyncio.gather(*[asyncio.create_task(_check_server(server)) for server in servers])
379379
for server, result in zip(servers, results):
380380
if result:

i18n/zh_cn.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,5 +55,6 @@
5555
"cluster.error.configuration": "无法获取 [${id}] 下载配置,原因:",
5656
"cluster.warning.measure_storage": "尝试重定向至存储测速失败,已自动切换至本地测速",
5757
"database.error.unable.to.decompress": "无法解压数据库,数据:%{data}",
58-
"storage.error.check_available": "存储 [${type}] [${path}] [${url}] 检查可用性出错,原因:"
58+
"storage.error.check_available": "存储 [${type}] [${path}] [${url}] 检查可用性出错,原因:",
59+
"dashboard.info.new_version": "当前版本 [${current}] 发现新版本 [${latest}]"
5960
}

0 commit comments

Comments
 (0)