Skip to content

Commit 8704c14

Browse files
committed
feat: 支持 AList
1 parent 962ceb5 commit 8704c14

File tree

7 files changed

+243
-45
lines changed

7 files changed

+243
-45
lines changed

core/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ async def syncFiles():
3535
Config.get("advanced.retry"),
3636
Config.get("advanced.delay"),
3737
)
38-
asyncio.create_task(cluster.recycleFiles())
38+
# asyncio.create_task(cluster.recycleFiles())
3939
if not cluster.enabled and cluster.socket:
4040
await cluster.enable()
4141
if cluster.scheduler:

core/cluster.py

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from core.logger import logger
99
from core.scheduler import *
1010
from core.exceptions import ClusterIdNotSetError, ClusterSecretNotSetError
11-
from core.storages import getStorages, LocalStorage
11+
from core.storages import getStorages, LocalStorage, AListStorage
1212
from core.classes import FileInfo, FileList, AgentConfiguration
1313
from core.router import Router
1414
from core.orm import writeHits
@@ -69,7 +69,7 @@ async def fetchToken(self):
6969
).hexdigest()
7070
response = await session.post(
7171
"/openbmclapi-agent/token",
72-
data={
72+
json={
7373
"clusterId": self.id,
7474
"challenge": challenge,
7575
"signature": signature,
@@ -226,6 +226,7 @@ async def recycleFiles(self) -> None:
226226
async def downloadFile(
227227
self, file: FileInfo, session: aiohttp.ClientSession, pbar: tqdm
228228
) -> None:
229+
# logger.debug(file)
229230
async with self.semaphore:
230231
delay, retry = Config.get("advanced.delay"), Config.get("advanced.retry")
231232

@@ -271,7 +272,7 @@ async def downloadFile(
271272
async def report(self, error: ClientResponseError, session: aiohttp.ClientSession) -> None:
272273
history_urls = [urljoin(self.base_url), *error.history]
273274
try:
274-
async with session.post("/openbmclapi/report", data={"url": history_urls, "error": error.message}) as response:
275+
async with session.post("/openbmclapi/report", json={"url": history_urls, "error": error.message}) as response:
275276
response.raise_for_status()
276277
logger.tdebug("cluster.debug.report", url=history_urls)
277278
except Exception:
@@ -338,12 +339,12 @@ async def callback(data: List[Any]):
338339
"flavor": {
339340
"runtime": f"python/{sys.version.split()[0]} python-openbmclapi/{VERSION}",
340341
"storage": "+".join(
341-
[
342-
"file"
343-
for storage in self.storages
344-
if isinstance(storage, LocalStorage)
345-
]
346-
),
342+
[
343+
"file" if isinstance(storage, LocalStorage) else
344+
"webdav" if isinstance(storage, AListStorage) else
345+
""
346+
for storage in self.storages
347+
])
347348
},
348349
},
349350
callback=callback,

core/router.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from core.config import Config
33
from core.utils import checkSign
44
from core.api import getStatus
5+
from core.storages import AListStorage
56
from typing import Union
67
from aiohttp import web
78
import aiohttp
@@ -49,6 +50,12 @@ async def _(request: web.Request) -> web.StreamResponse:
4950
):
5051
return web.Response(status=403 if size > 200 else 400)
5152

53+
response = None
54+
for storage in self.storages:
55+
if isinstance(storage, AListStorage):
56+
await storage.measure(size, request, response)
57+
return response
58+
5259
buffer = b"\x00\x66\xcc\xff" * 256 * 1024
5360
response = web.StreamResponse(
5461
status=200,

core/storages/__init__.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,12 @@ def getStorages() -> List[Storage]:
1212
if storage["type"] == "local":
1313
storages.append(LocalStorage(path=storage["path"]))
1414
if storage["type"] == "alist":
15-
storages.append(AListStorage())
15+
storages.append(
16+
AListStorage(
17+
username=storage["username"],
18+
password=storage["password"],
19+
url=storage["url"],
20+
path=storage["path"],
21+
)
22+
)
1623
return storages

core/storages/alist.py

Lines changed: 172 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,175 @@
1-
from dataclasses import dataclass
1+
from core.classes import Storage, FileInfo, FileList
2+
from core.scheduler import scheduler, IntervalTrigger
3+
from core.logger import logger
4+
from core.i18n import locale
5+
from typing import List, Set, Tuple, Dict, Any
6+
from tqdm import tqdm
7+
from aiohttp import web
8+
import aiohttp
9+
import secrets
10+
import io
11+
import asyncio
12+
import humanize
213

314

4-
class AListStorage():
5-
def __init__(self) -> None:
15+
class AListStorage(Storage):
16+
def __init__(self, username: str, password: str, url: str, path: str) -> None:
17+
self.username = username
18+
self.password = password
19+
self.url = url
20+
self.path = path
21+
self.token = ""
22+
self.scheduler = None
23+
self.headers = {}
24+
25+
async def init(self) -> None:
26+
async def fetchToken() -> None:
27+
logger.tinfo("storage.info.alist.fetch_token")
28+
async with aiohttp.ClientSession(self.url) as session:
29+
try:
30+
async with session.post("/api/auth/login", json={"username": self.username, "password": self.password}) as response:
31+
response.raise_for_status()
32+
data = await response.json()
33+
if data["code"] != 200:
34+
raise aiohttp.ClientResponseError(status=data["code"], request_info=response.request_info, history=response.history)
35+
self.token = data["data"]["token"]
36+
self.headers = {"Authorization": self.token}
37+
logger.tsuccess("storage.success.alist.fetch_token")
38+
except Exception as e:
39+
logger.terror("storage.error.alist.fetch_token", e=e)
40+
if not self.scheduler:
41+
self.scheduler = scheduler.add_job(fetchToken, IntervalTrigger(days=2))
42+
43+
await fetchToken()
44+
45+
async def check(self) -> None:
46+
file_name = secrets.token_hex(8)
47+
file_path = self.path + file_name
48+
try:
49+
async with aiohttp.ClientSession(self.url, headers={**self.headers, "File-Path": file_path, "Content-Type": "application/octet-stream"}) as session:
50+
response = await session.put("/api/fs/put", data=b"")
51+
response.raise_for_status()
52+
data = await response.json()
53+
if data["code"] != 200:
54+
raise aiohttp.ClientResponseError(status=data["code"], request_info=response.request_info, history=response.history)
55+
async with aiohttp.ClientSession(self.url, headers=self.headers) as session:
56+
response = await session.post("/api/fs/remove", json={"names": [file_name], "dir": self.path})
57+
response.raise_for_status()
58+
data = await response.json()
59+
if data["code"] != 200:
60+
raise aiohttp.ClientResponseError(status=data["code"], request_info=response.request_info, history=response.history)
61+
logger.tsuccess("storage.success.alist.check")
62+
except Exception as e:
63+
logger.terror("storage.error.alist.check", e=e)
64+
65+
async def getMissingFiles(self, files: FileList, pbar: tqdm) -> FileList:
66+
existing_files: List[FileInfo] = []
67+
async with aiohttp.ClientSession(self.url, headers=self.headers) as session:
68+
async def getFileList(dir: str, pbar: tqdm) -> List[FileInfo]:
69+
file_path = self.path + dir
70+
response = await session.post("/api/fs/list", headers=self.headers, json={"path": file_path})
71+
response.raise_for_status()
72+
data = await response.json()
73+
pbar.update(1)
74+
if data["code"] != 200:
75+
return []
76+
return [FileInfo(size=content["size"], hash=content["name"], path="", mtime=-1) for content in data["data"]["content"] if not content["is_dir"]]
77+
78+
with tqdm(desc=locale.t("storage.tqdm.alist.get_filelist"), total=256) as pbar:
79+
for i in range(256):
80+
dir = f"/{i:02x}"
81+
existing_files += await getFileList(dir, pbar)
82+
83+
existing_info: Set[Tuple[str, int]] = {(file.hash, file.size) for file in existing_files}
84+
missing_files = [file for file in files.files if (file.hash, file.size) not in existing_info]
85+
86+
return FileList(files=missing_files)
87+
88+
async def measure(self, size: int, request: web.Request, response):
89+
file_path = f"{self.path}/.{size}"
90+
try:
91+
async with aiohttp.ClientSession(self.url, headers=self.headers) as session:
92+
response = await session.post("/api/fs/get", json={"path": file_path, "password": self.password})
93+
response.raise_for_status()
94+
data = await response.json()
95+
96+
if data["code"] == 200:
97+
response = web.HTTPFound(data["raw_url"])
98+
response.prepare(request)
99+
return
100+
101+
if data["code"] != 200:
102+
try:
103+
buffer = b"\x00\x66\xcc\xff" * 256 * 1024
104+
response = await session.put("/api/fs/put", data=buffer, headers={**self.headers, "File-Path": file_path, "Content-Type": "application/octet-stream"})
105+
response.raise_for_status()
106+
data = await response.json()
107+
if data["code"] != 200:
108+
raise aiohttp.ClientResponseError(status=500, request_info=response.request_info, history=response.history)
109+
except Exception as e:
110+
logger.terror("storage.error.alist.upload", e=e)
111+
raise
112+
113+
response = await session.post("/api/fs/get", json={"path": file_path, "password": self.password})
114+
response.raise_for_status()
115+
data = await response.json()
116+
117+
response = web.HTTPFound(data["raw_url"])
118+
response.prepare(request)
119+
return
120+
except Exception as e:
121+
logger.terror("storage.error.alist.measure", e=e)
122+
return
123+
124+
async def express(self, hash: str, request: web.Request, response) -> Dict[str, Any]:
125+
path = f"{self.path}/{hash[:2]}/{hash}"
126+
async with aiohttp.ClientSession(self.url, headers=self.headers) as session:
127+
res = await session.post("/api/fs/get", json={"path": path, "password": self.password})
128+
data = await res.json()
129+
if data["code"] != 200:
130+
response = web.HTTPNotFound()
131+
await response.prepare(request)
132+
return {"bytes": 0, "hits": 0}
133+
try:
134+
response = web.HTTPFound(data["raw_url"])
135+
response.headers["x-bmclapi-hash"] = hash
136+
await response.prepare(request)
137+
return {"bytes": data["size"], "hits": 1}
138+
except Exception as e:
139+
response = web.HTTPInternalServerError(reason=e)
140+
await response.prepare(request)
141+
logger.debug(e)
142+
return {"bytes": 0, "hits": 0}
143+
144+
async def writeFile(self, file: FileInfo, content: io.BytesIO, delay: int, retry: int) -> bool:
145+
file_path = f"{self.path}/{file.hash[:2]}/{file.hash}"
146+
async def getSize() -> int:
147+
async with aiohttp.ClientSession(self.url, headers=self.headers) as session:
148+
response = await session.post("/api/fs/get", json={"path": file_path, "password": self.password})
149+
response.raise_for_status()
150+
data = await response.json()
151+
if data["code"] != 200:
152+
raise aiohttp.ClientResponseError(status=data["code"], request_info=response.request_info, history=response.history)
153+
return data["data"]["size"]
154+
155+
for _ in range(retry):
156+
try:
157+
async with aiohttp.ClientSession(self.url, headers={**self.headers, "File-Path": file_path, "Content-Type": "application/octet-stream"}) as session:
158+
response = await session.put("/api/fs/put", data=content.getvalue())
159+
response.raise_for_status()
160+
data = await response.json()
161+
if data["code"] != 200:
162+
raise aiohttp.ClientResponseError(status=data["code"], request_info=response.request_info, history=response.history)
163+
size = await getSize()
164+
if size == file.size:
165+
return True
166+
else:
167+
logger.terror("storage.error.alist.write_file.size_mismatch", file=file.hash, file_size=humanize.naturalsize(file.size, binary=True), actual_file_size=humanize.naturalsize(size, binary=True))
168+
return False
169+
except Exception as e:
170+
logger.terror("storage.error.alist.write_file.retry", file=file.hash, e=e, retry=delay)
171+
await asyncio.sleep(delay)
172+
return False
173+
174+
async def recycleFiles(files):
6175
pass

core/storages/local.py

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import humanize
1414

1515

16-
1716
class LocalStorage(Storage):
1817
def __init__(self, path: str) -> None:
1918
self.path = path
@@ -22,20 +21,23 @@ async def init(self) -> None:
2221
os.makedirs(self.path, exist_ok=True)
2322

2423
async def check(self) -> None:
25-
logger.tinfo("storage.info.check")
24+
logger.tinfo("storage.info.local.check")
2625
try:
2726
with tempfile.NamedTemporaryFile(dir=self.path, delete=True) as temp_file:
2827
temp_file.write(b"")
29-
logger.tsuccess("storage.success.check")
28+
logger.tsuccess("storage.success.local.check")
3029
except Exception as e:
31-
raise Exception(locale.t("storage.error.check", e=e))
30+
raise Exception(locale.t("storage.error.local.check", e=e))
3231

3332
async def writeFile(
3433
self, file: FileInfo, content: io.BytesIO, delay: int, retry: int
3534
) -> bool:
3635
file_path = os.path.join(self.path, file.hash[:2], file.hash)
3736
os.makedirs(os.path.dirname(file_path), exist_ok=True)
38-
37+
if Path(file_path).exists and Path(file_path).stat().st_size == len(
38+
content.getvalue()
39+
):
40+
return True
3941
for _ in range(retry):
4042
try:
4143
async with aiofiles.open(file_path, "wb") as f:
@@ -46,23 +48,27 @@ async def writeFile(
4648
return True
4749
else:
4850
logger.terror(
49-
"storage.error.write_file.size_mismatch", file=file.hash,
51+
"storage.error.local.write_file.size_mismatch",
52+
file=file.hash,
5053
file_size=humanize.naturalsize(file.size, binary=True),
51-
actual_file_size=humanize.naturalsize(size, binary=True)
54+
actual_file_size=humanize.naturalsize(size, binary=True),
5255
)
5356
return False
5457
except Exception as e:
5558
logger.terror(
56-
"storage.error.write_file.retry", file=file.hash, e=e, retry=delay
59+
"storage.error.local.write_file.retry",
60+
file=file.hash,
61+
e=e,
62+
retry=delay,
5763
)
5864

5965
await asyncio.sleep(delay)
6066

61-
logger.terror("storage.error.write_file.failed", file=file.hash)
67+
logger.terror("storage.error.local.write_file.failed", file=file.hash)
6268
return False
6369

6470
async def getMissingFiles(self, files: FileList, pbar: tqdm) -> FileList:
65-
async def check_file(file: FileInfo, pbar: tqdm) -> bool:
71+
async def checkFile(file: FileInfo, pbar: tqdm) -> bool:
6672
pbar.update(1)
6773
file_path = os.path.join(self.path, file.hash[:2], file.hash)
6874
try:
@@ -71,9 +77,7 @@ async def check_file(file: FileInfo, pbar: tqdm) -> bool:
7177
except FileNotFoundError:
7278
return True
7379

74-
results = await asyncio.gather(
75-
*[check_file(file, pbar) for file in files.files]
76-
)
80+
results = await asyncio.gather(*[checkFile(file, pbar) for file in files.files])
7781
missing_files = [
7882
file for file, is_missing in zip(files.files, results) if is_missing
7983
]
@@ -84,8 +88,8 @@ async def express(
8488
) -> Dict[str, Any]:
8589
path = os.path.join(self.path, hash[:2], hash)
8690
if not os.path.exists(path):
87-
response = web.Response()
88-
response.set_status(404, "File not found")
91+
response = web.HTTPNotFound()
92+
await response.prepare(request)
8993
return {"bytes": 0, "hits": 0}
9094
try:
9195
file_size = os.path.getsize(path)
@@ -117,9 +121,9 @@ async def recycleFiles(self, files: FileList) -> None:
117121
delete_files.append(file)
118122

119123
if len(delete_files) == 0:
120-
logger.tinfo("storage.success.no_need_to_recycle")
124+
logger.tinfo("storage.success.local.no_need_to_recycle")
121125
return
122-
126+
123127
with tqdm(
124128
desc=locale.t("storage.tqdm.desc.recycling"),
125129
total=len(delete_files),
@@ -133,8 +137,9 @@ async def recycleFiles(self, files: FileList) -> None:
133137
try:
134138
file.unlink()
135139
except Exception as e:
136-
logger.terror("storage.error.recycle", e=e)
137-
138-
logger.tsuccess("storage.success.recycled", size=humanize.naturalsize(size, binary=True))
140+
logger.terror("storage.error.local.recycle", e=e)
139141

140-
142+
logger.tsuccess(
143+
"storage.success.local.recycled",
144+
size=humanize.naturalsize(size, binary=True),
145+
)

0 commit comments

Comments
 (0)