Skip to content

Commit 7966d1e

Browse files
committed
fix: 修复 alist 检查文件时候的速度问题
1 parent 03e1a24 commit 7966d1e

File tree

3 files changed

+188
-33
lines changed

3 files changed

+188
-33
lines changed

assets/index.html

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>Loading... | Python OpenBMCLAPI Dashboard</title>
    <!-- Full-screen preloader: three concentric rings spinning at different speeds. -->
    <style>
        * {
            box-sizing: border-box;
        }
        html, body {
            height: 100%;
            width: 100%;
            margin: 0;
            padding: 0;
        }
        /* Black overlay that covers the whole viewport. */
        .preloader {
            background-color: black;
            height: 100%;
            width: 100%;
            position: fixed;
            top: 0;
            left: 0;
            z-index: 9999;
            margin: auto;
            overflow: hidden;
        }
        /* Outer ring; the negative margin centres the 150px box on the 50%/50% anchor. */
        .preloader .loader {
            display: block;
            position: relative;
            left: 50%;
            top: 50%;
            width: 150px;
            height: 150px;
            margin: -75px 0 0 -75px;
            border-radius: 50%;
            box-shadow: 0 3px 3px 0 rgba(255, 56, 106, 1);
            transform: translate3d(0, 0, 0);
            animation: spin 2s linear infinite;
        }
        .preloader .loader:before, .preloader .loader:after {
            content: '';
            position: absolute;
            border-radius: 50%;
        }
        /* Middle ring. */
        .preloader .loader:before {
            top: 5px;
            left: 5px;
            right: 5px;
            bottom: 5px;
            box-shadow: 0 3px 3px 0 rgb(255, 228, 32);
            -webkit-animation: spin 3s linear infinite;
            animation: spin 3s linear infinite;
        }
        /* Inner ring. */
        .preloader .loader:after {
            top: 15px;
            left: 15px;
            right: 15px;
            bottom: 15px;
            box-shadow: 0 3px 3px 0 rgba(61, 175, 255, 1);
            animation: spin 1.5s linear infinite;
        }
        @-webkit-keyframes spin{0%{transform:rotate(0)}100%{transform:rotate(360deg)}}@-moz-keyframes spin{0%{-moz-transform:rotate(0)}100%{-moz-transform:rotate(360deg)}}@keyframes spin{0%{transform:rotate(0)}100%{transform:rotate(360deg)}}
    </style>
</head>
<body>
    <div class="preloader">
        <div class="loader"></div>
    </div>
</body>
</html>

core/cluster.py

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ def __init__(self, clusters: 'ClusterManager'):
4444
self.check_available.acquire()
4545

4646
self.check_type_file = "exists+size"
47+
self.cache_filelist: defaultdict[storages.iStorage, defaultdict[str, storages.File]] = defaultdict(defaultdict)
4748

4849
def init(self):
4950
scheduler.run_repeat(self._check_available, 120)
@@ -74,12 +75,18 @@ async def write_file(self, file: File, content: bytes):
7475
return all(await asyncio.gather(*(asyncio.create_task(storage.write_file(convert_file_to_storage_file(file), content, file.mtime)) for storage in self.available_storages)))
7576

7677
async def get_missing_files(self, files: set[File]) -> set[File | Any]:
77-
async def _(file: File, storage: storages.iStorage):
78-
return True
79-
function = _
78+
function = None
8079
if function is None:
8180
logger.twarning("cluster.warning.no_check_function")
8281
function = self._check_exists
82+
83+
with tqdm(
84+
total=len(self.available_storages) * 256,
85+
desc="List Files",
86+
unit="dir",
87+
unit_scale=True
88+
) as pbar:
89+
await asyncio.gather(*(self.get_storage_filelist(storage, pbar) for storage in self.available_storages))
8390
with tqdm(
8491
total=len(files) * len(self.available_storages),
8592
desc="Checking files",
@@ -95,6 +102,13 @@ async def _(file: File, storage: storages.iStorage):
95102
await asyncio.gather(*(self._get_missing_file_storage(function, missing_files, waiting_files, storage, pbar) for storage in self.available_storages))
96103
return missing_files or set()
97104

105+
async def get_storage_filelist(self, storage: storages.iStorage, pbar: tqdm):
    """List every file in ``storage`` and refresh its entry in ``self.cache_filelist``.

    Args:
        storage: Storage backend whose ``list_files`` coroutine is awaited.
        pbar: Progress bar handed through to ``storage.list_files``.

    Returns:
        The mapping returned by ``storage.list_files`` unchanged.
    """
    result = await storage.list_files(pbar)
    # Index the listing by file hash; a later duplicate hash overwrites an earlier one.
    snapshot = {
        file.hash: file
        for files in result.values()
        for file in files
    }
    if not snapshot:
        # Nothing listed: leave the storage uncached so the _check_* helpers
        # fall back to asking the storage directly, and drop any old snapshot.
        self.cache_filelist.pop(storage, None)
        return result
    # Replace the previous snapshot instead of merging into it, so hashes of
    # files that disappeared from the storage are not reported as present.
    bucket = self.cache_filelist[storage]
    bucket.clear()
    bucket.update(snapshot)
    return result
111+
98112
async def _get_missing_file_storage(self, function: Callable[..., Coroutine[Any, Any, bool]], missing_files: set[File], files: asyncio.Queue[File], storage: storages.iStorage, pbar: tqdm):
99113
while not files.empty():
100114
file = await files.get()
@@ -106,8 +120,14 @@ async def _get_missing_file_storage(self, function: Callable[..., Coroutine[Any,
106120
await asyncio.sleep(0)
107121

108122
async def _check_exists(self, file: File, storage: storages.iStorage):
    """Return whether ``file`` is present on ``storage``.

    Answers from ``self.cache_filelist`` when the storage has an entry there;
    otherwise awaits ``storage.exists`` with the file's hash.
    """
    if storage not in self.cache_filelist:
        # No cached listing for this storage: ask the backend.
        return await storage.exists(file.hash)
    return file.hash in self.cache_filelist[storage]
110127
async def _check_size(self, file: File, storage: storages.iStorage):
    """Return whether ``file`` is present on ``storage`` with a matching size.

    Uses the cached listing in ``self.cache_filelist`` when the storage has an
    entry there; otherwise combines ``_check_exists`` with ``storage.get_size``.
    """
    if storage not in self.cache_filelist:
        # No cached listing for this storage: query the backend.
        return await self._check_exists(file, storage) and await storage.get_size(file.hash) == file.size
    listed = self.cache_filelist[storage]
    wanted = file.hash
    if wanted not in listed:
        return False
    return listed[wanted].size == file.size
112132
async def _check_hash(self, file: File, storage: storages.iStorage):
113133
return await self._check_exists(file, storage) and utils.equals_hash(file.hash, await storage.read_file(file.hash))
@@ -428,8 +448,8 @@ async def start(self):
428448

429449
# start job
430450

431-
for cluster in self.clusters:
432-
await cluster.enable()
451+
#for cluster in self.clusters:
452+
# await cluster.enable()
433453

434454
def get_cluster_by_id(self, id: Optional[str] = None) -> Optional['Cluster']:
435455
return self.cluster_id_tables.get(id or "", None)
@@ -533,9 +553,6 @@ async def _fetch_token(self):
533553
scheduler.cancel(self.token_scheduler)
534554
self.token_scheduler = scheduler.run_later(self.get_token, delay=self.token.ttl - 300)
535555

536-
async def start_job(self):
537-
await self.socket_io.connect()
538-
539556
async def request_cert(self):
540557
ssl_dir = Path(config.const.ssl_dir)
541558
if not ssl_dir.exists():
@@ -592,7 +609,7 @@ async def keepalive(self):
592609
**asdict(commit_counter)
593610
}
594611
)
595-
if result.err:
612+
if result.err or not result.ack:
596613
logger.twarning("cluster.warning.kicked_by_remote")
597614
await self.disable()
598615
return

core/storages/__init__.py

Lines changed: 91 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
import abc
2-
from collections import deque
2+
import asyncio
3+
from collections import defaultdict, deque
34
from dataclasses import dataclass
45
import hashlib
56
import os
67
from pathlib import Path
78
import time
89
from typing import Any, Optional
10+
11+
from tqdm import tqdm
912
from .. import scheduler, utils, cache
1013
from ..logger import logger
1114
import urllib.parse as urlparse
@@ -32,6 +35,7 @@ def __init__(self, path: str, width: int) -> None:
3235
self.width = width
3336
self.unique_id = hashlib.md5(f"{self.type},{self.path}".encode("utf-8")).hexdigest()
3437
self.current_width = 0
38+
self.cache_files = cache.MemoryStorage()
3539

3640
def __repr__(self) -> str:
3741
return f"{self.type}({self.path})"
@@ -46,7 +50,7 @@ async def read_file(self, file_hash: str) -> bytes:
4650
async def delete_file(self, file_hash: str) -> bool:
4751
raise NotImplementedError("Not implemented")
4852
@abc.abstractmethod
49-
async def list_files(self) -> list:
53+
async def list_files(self, pbar: Optional[tqdm] = None) -> defaultdict[str, deque[File]]:
5054
raise NotImplementedError("Not implemented")
5155
@abc.abstractmethod
5256
async def exists(self, file_hash: str) -> bool:
@@ -95,17 +99,33 @@ async def delete_file(self, file_hash: str) -> bool:
9599
os.remove(f"{self.path}/{file_hash[:2]}/{file_hash}")
96100
return True
97101

98-
async def list_files(self) -> list:
99-
files = []
100-
for root, dirs, filenames in os.walk(self.path):
101-
for filename in filenames:
102-
file = File(
103-
name=filename,
104-
size=os.path.getsize(os.path.join(root, filename)),
105-
mtime=os.path.getmtime(os.path.join(root, filename)),
106-
hash=filename
107-
)
108-
files.append(file)
102+
async def list_files(self, pbar: Optional[tqdm] = None) -> defaultdict[str, deque[File]]:
    """List the regular files under the 256 two-hex-digit prefix directories.

    Args:
        pbar: Optional progress bar; advanced by one per prefix directory,
            whether or not that directory exists.

    Returns:
        Mapping from prefix ("00".."ff") to the files found in that directory.
        Prefixes with no files have no key. Each ``File`` uses the file name
        as both its name and its hash.
    """
    files: defaultdict[str, deque[File]] = defaultdict(deque)
    for root_id in range(256):
        prefix = f"{root_id:02x}"
        root = f"{self.path}/{prefix}"
        # isdir rather than exists: a stray regular file named like a prefix
        # would otherwise make the directory scan raise NotADirectoryError.
        if os.path.isdir(root):
            # scandir yields the file type and stat data per entry, avoiding
            # the separate isfile/getsize/getmtime lookups for each file.
            with os.scandir(root) as entries:
                for entry in entries:
                    if not entry.is_file():
                        continue
                    info = entry.stat()
                    files[prefix].append(File(
                        entry.name,
                        info.st_size,
                        info.st_mtime,
                        entry.name
                    ))
            # Yield to the event loop between directories.
            await asyncio.sleep(0)
        if pbar is not None:
            pbar.update(1)
    return files
110130

111131
async def get_size(self, file_hash: str) -> int:
@@ -197,7 +217,7 @@ def __str__(self) -> str:
197217
def __repr__(self) -> str:
198218
return f"AlistStorage({self.path})"
199219

200-
async def _action_data(self, action: str, url: str, data: Any, headers: dict[str, str] = {}) -> AlistResult:
220+
async def _action_data(self, action: str, url: str, data: Any, headers: dict[str, str] = {}, session: Optional[aiohttp.ClientSession] = None) -> AlistResult:
201221
hash = hashlib.sha256(f"{action},{url},{data},{headers}".encode()).hexdigest()
202222
if hash in self.cache:
203223
return self.cache.get(hash)
@@ -214,14 +234,18 @@ async def _action_data(self, action: str, url: str, data: Any, headers: dict[str
214234
action, url,
215235
data=data,
216236
) as resp:
217-
result = AlistResult(
218-
**await resp.json()
219-
)
220-
if result.code != 200:
221-
logger.terror("storage.error.alist", status=result.code, message=result.message)
222-
else:
223-
self.cache.set(hash, result, 30)
224-
return result
237+
try:
238+
result = AlistResult(
239+
**await resp.json()
240+
)
241+
if result.code != 200:
242+
logger.terror("storage.error.alist", status=result.code, message=result.message)
243+
else:
244+
self.cache.set(hash, result, 30)
245+
return result
246+
except:
247+
logger.terror("storage.error.alist", status=resp.status, message=await resp.text())
248+
raise
225249

226250
async def __info_file(self, file_hash: str) -> AlistFileInfo:
227251
r = await self._action_data(
@@ -272,8 +296,51 @@ async def get_mtime(self, file_hash: str) -> float:
272296
async def get_size(self, file_hash: str) -> int:
273297
info = await self.__info_file(file_hash)
274298
return max(0, info.size)
275-
async def list_files(self) -> list:
276-
return []
299+
async def list_files(self, pbar: Optional[tqdm] = None) -> defaultdict[str, deque[File]]:
    """List the files under the 256 two-hex-digit prefix directories via ``/api/fs/list``.

    Successful responses are stored in ``self.cache`` under ``listfile_<path>``
    with the value 30 passed to ``cache.set``, and reused while present.

    Args:
        pbar: Optional progress bar; advanced by one per prefix directory.

    Returns:
        Mapping from prefix ("00".."ff") to the files listed for it. A prefix
        whose request did not return code 200 is logged and contributes nothing.
    """
    files: defaultdict[str, deque[File]] = defaultdict(deque)
    async with aiohttp.ClientSession(
        self.url,
        headers={
            "Authorization": await self._get_token()
        }
    ) as session:
        for root_id in range(256):
            prefix = f"{root_id:02x}"
            root = f"{self.path}/{prefix}"
            cache_key = f"listfile_{root}"
            if cache_key in self.cache:
                result = self.cache.get(cache_key)
            else:
                async with session.post(
                    "/api/fs/list",
                    data={
                        "path": root
                    },
                ) as resp:
                    result = AlistResult(
                        **await resp.json()
                    )
                if result.code != 200:
                    logger.terror("storage.error.alist", status=result.code, message=result.message)
                else:
                    self.cache.set(cache_key, result, 30)
            if result.code == 200:
                # A successful listing may still carry no payload or a null
                # "content"; treat both as an empty directory.
                # NOTE(review): assumes result.data is a dict when present.
                entries = (result.data or {}).get("content") or ()
                for r in entries:
                    # NOTE(review): assumes directory entries carry a truthy
                    # "is_dir"; entries without the key are treated as files.
                    if r.get("is_dir"):
                        continue
                    files[prefix].append(File(
                        r["name"],
                        r["size"],
                        utils.parse_isotime_to_timestamp(r["modified"]),
                        r["name"]
                    ))
            if pbar is not None:
                pbar.update(1)

    return files
277344
async def read_file(self, file_hash: str) -> bytes:
278345
info = await self.__info_file(file_hash)
279346
if info.size == -1:

0 commit comments

Comments
 (0)