Skip to content

Commit 9331fef

Browse files
committed
refactor: 重构部分代码,退出操作仍需处理
1 parent f13df1b commit 9331fef

File tree

8 files changed

+130
-101
lines changed

8 files changed

+130
-101
lines changed

core/__init__.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
from core.cluster import Cluster
22
from core.config import Config
33
import asyncio
4+
import signal
45

6+
cluster = Cluster()
57

6-
async def init():
7-
cluster = Cluster()
8+
async def main():
89
await cluster.token.fetchToken()
910
await cluster.getConfiguration()
1011
await cluster.fetchFileList()
@@ -18,10 +19,11 @@ async def init():
1819
protocol = "http" if Config.get("cluster.byoc") else "https"
1920
if protocol == "https":
2021
await cluster.socket.requestCertificate()
21-
await cluster.setupRouter(protocol == "https", port=Config.get("cluster.port"))
22+
await cluster.setupRouter()
23+
await cluster.listen(protocol == "https", Config.get("cluster.port"))
2224
await cluster.enable()
23-
try:
24-
while True:
25-
await asyncio.sleep(1000)
26-
except KeyboardInterrupt:
27-
raise KeyboardInterrupt
25+
while True:
26+
await asyncio.sleep(3600)
27+
28+
def init():
29+
asyncio.run(main())

core/client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ async def _() -> None:
4545

4646
@self.socket.on("reconnect_error")
4747
async def _(error: str) -> None:
48-
logger.terror("client.error.reconnect", error=error)
48+
logger.terror("client.error.reconnect", e=error)
4949

5050
@self.socket.on("reconnect_failed")
5151
async def _() -> None:

core/cluster.py

Lines changed: 82 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from core.logger import logger
33
from core.scheduler import *
44
from core.exceptions import ClusterIdNotSetError, ClusterSecretNotSetError
5-
from core.storages import getStorages
5+
from core.storages import getStorages, LocalStorage
66
from core.classes import FileInfo, FileList, AgentConfiguration
77
from core.router import Router
88
from core.client import WebSocketClient
@@ -50,8 +50,9 @@ async def fetchToken(self):
5050
) as response:
5151
response.raise_for_status()
5252
challenge = (await response.json())["challenge"]
53+
5354
signature = hmac.new(
54-
self.secret.encode(), challenge.encode(), digestmod=hashlib.sha256
55+
self.secret.encode(), challenge.encode(), hashlib.sha256
5556
)
5657
async with session.post(
5758
"/openbmclapi-agent/token",
@@ -66,7 +67,7 @@ async def fetchToken(self):
6667
self.token = res["token"]
6768
self.ttl = res["ttl"] / 3600000
6869
logger.tsuccess("token.success.fetched", ttl=int(self.ttl))
69-
if self.scheduler == None:
70+
if not self.scheduler:
7071
self.scheduler = Scheduler.add_job(
7172
self.fetchToken, IntervalTrigger(hours=self.ttl)
7273
)
@@ -91,6 +92,7 @@ def __init__(self) -> None:
9192
self.server = None
9293
self.failed_filelist = FileList(files=[])
9394
self.enabled = False
95+
self.site = None
9496

9597
async def fetchFileList(self) -> None:
9698
logger.tinfo("cluster.info.filelist.fetching")
@@ -110,15 +112,15 @@ async def fetchFileList(self) -> None:
110112
io.BytesIO(await response.read())
111113
)
112114
decompressed_data = io.BytesIO(decompressor.read())
113-
for _ in range(self.readLong(decompressed_data)):
114-
self.filelist.files.append(
115-
FileInfo(
116-
self.readString(decompressed_data),
117-
self.readString(decompressed_data),
118-
self.readLong(decompressed_data),
119-
self.readLong(decompressed_data),
120-
)
115+
self.filelist.files = [
116+
FileInfo(
117+
self.readString(decompressed_data),
118+
self.readString(decompressed_data),
119+
self.readLong(decompressed_data),
120+
self.readLong(decompressed_data),
121121
)
122+
for _ in range(self.readLong(decompressed_data))
123+
]
122124
size = sum(file.size for file in self.filelist.files)
123125
logger.tsuccess(
124126
"cluster.success.filelist.parsed",
@@ -135,9 +137,8 @@ async def getConfiguration(self) -> None:
135137
},
136138
) as session:
137139
async with session.get("/openbmclapi/configuration") as response:
138-
self.configuration = AgentConfiguration(
139-
**(await response.json())["sync"]
140-
)
140+
config_data = (await response.json())["sync"]
141+
self.configuration = AgentConfiguration(**config_data)
141142
self.semaphore = asyncio.Semaphore(self.configuration.concurrency)
142143
logger.tdebug("configuration.debug.get", sync=self.configuration)
143144

@@ -148,28 +149,22 @@ async def getMissingFiles(self) -> FileList:
148149
unit=locale.t("cluster.tqdm.unit.files"),
149150
unit_scale=True,
150151
) as pbar:
151-
try:
152-
files = set()
153-
missing_files = [
154-
file
155-
for storage in self.storages
156-
for file in (
157-
await storage.getMissingFiles(self.filelist, pbar)
158-
).files
159-
if file.hash not in files and not files.add(file.hash)
160-
]
161-
missing_filelist = FileList(files=missing_files)
162-
missing_files_count = len(missing_filelist.files)
163-
missing_files_size = sum(file.size for file in missing_filelist.files)
164-
logger.tsuccess(
165-
"storage.success.get_missing",
166-
count=humanize.intcomma(missing_files_count),
167-
size=humanize.naturalsize(missing_files_size, binary=True),
168-
)
169-
return missing_filelist
170-
except Exception as e:
171-
logger.terror("storage.error.get_missing", e=e)
172-
return None
152+
files = set()
153+
missing_files = [
154+
file
155+
for storage in self.storages
156+
for file in (await storage.getMissingFiles(self.filelist, pbar)).files
157+
if file.hash not in files and not files.add(file.hash)
158+
]
159+
missing_filelist = FileList(files=missing_files)
160+
missing_files_count = len(missing_filelist.files)
161+
missing_files_size = sum(file.size for file in missing_filelist.files)
162+
logger.tsuccess(
163+
"storage.success.get_missing",
164+
count=humanize.intcomma(missing_files_count),
165+
size=humanize.naturalsize(missing_files_size, binary=True),
166+
)
167+
return missing_filelist
173168

174169
async def syncFiles(
175170
self, missing_filelist: FileList, retry: int, delay: int
@@ -189,9 +184,7 @@ async def syncFiles(
189184
) as pbar:
190185
async with aiohttp.ClientSession(
191186
self.base_url,
192-
headers={
193-
"User-Agent": self.user_agent,
194-
},
187+
headers={"User-Agent": self.user_agent},
195188
) as session:
196189
self.failed_filelist = FileList(files=[])
197190
tasks = [
@@ -244,12 +237,11 @@ async def downloadFile(
244237

245238
async def setupRouter(self) -> None:
246239
logger.tinfo("cluster.info.router.creating")
247-
self.application = web.Application()
248240
try:
241+
self.application = web.Application()
249242
self.router = Router(self.application, self.storages)
250243
self.router.init()
251244
logger.tsuccess("cluster.success.router.created")
252-
253245
except Exception as e:
254246
logger.terror("cluster.error.router.exception", e=e)
255247

@@ -264,12 +256,11 @@ async def listen(self, https: bool, port: int) -> None:
264256
)
265257
self.server = web.AppRunner(self.application)
266258
await self.server.setup()
267-
site = web.TCPSite(self.server, "0.0.0.0", port)
268-
await site.start()
259+
self.site = web.TCPSite(self.server, "0.0.0.0", port, ssl_context=ssl_context)
260+
await self.site.start()
269261
logger.tsuccess("cluster.success.listen", port=port)
270-
271262
except Exception as e:
272-
logger.terror('cluster.error.listen')
263+
logger.terror("cluster.error.listen", e=e)
273264

274265
async def enable(self) -> None:
275266
if self.enabled:
@@ -278,8 +269,7 @@ async def enable(self) -> None:
278269
future = asyncio.Future()
279270

280271
async def callback(data: List[Any]):
281-
error, ack = data
282-
future.set_result((error, ack))
272+
future.set_result(data)
283273

284274
if not self.socket:
285275
logger.terror("cluster.error.disconnected")
@@ -295,24 +285,54 @@ async def callback(data: List[Any]):
295285
"byoc": Config.get("cluster.byoc"),
296286
"noFastEnable": True,
297287
"flavor": {
298-
"runtime": f"python/{sys.version.split()[0]} python-openbmclapi/{VERSION}"
288+
"runtime": f"python/{sys.version.split()[0]} python-openbmclapi/{VERSION}",
289+
"storage": '+'.join(['file' for storage in self.storages if isinstance(storage, LocalStorage)])
299290
},
300291
},
301-
callback=callback,
292+
callback=callback
302293
)
303294

304-
error, ack = await future
295+
response = await future
296+
error, ack = (response + [None, None])[:2] if isinstance(response, list) else (None, None)
305297

306-
if error:
307-
if isinstance(error, dict) and 'message' in error:
308-
logger.terror('cluster.error.enable.error', e=error['message'])
298+
if error and isinstance(error, dict) and 'message' in error:
299+
logger.terror("cluster.error.enable.error", e=error['message'])
309300

310301
if ack is not True:
311-
logger.terror('cluster.error.enable.failed')
302+
logger.terror("cluster.error.enable.failed")
303+
return
304+
305+
self.enabled = True
312306

313307
except Exception as e:
314-
logger.terror('cluster.error.enable.exception', e=e)
308+
logger.terror("cluster.error.enable.exception", e=e)
309+
310+
async def disable(self) -> None:
311+
if not self.socket:
312+
return
313+
logger.tinfo("cluster.info.disabling")
314+
future = asyncio.Future()
315+
316+
async def callback(data: List[Any]):
317+
future.set_result(data)
315318

319+
try:
320+
await self.socket.socket.emit(
321+
"disable",
322+
callback=callback
323+
)
324+
325+
response = await future
326+
error, ack = (response + [None, None])[:2] if isinstance(response, list) else (None, None)
327+
328+
if error and isinstance(error, dict) and 'message' in error:
329+
logger.terror("cluster.error.disable.error", e=error['message'])
330+
331+
if ack is not True:
332+
logger.terror("cluster.error.disable.failed")
333+
334+
except Exception as e:
335+
logger.terror("cluster.error.enable.exception", e=e)
316336
async def connect(self) -> None:
317337
self.socket = WebSocketClient(self.token.token)
318338
await self.socket.connect()
@@ -325,14 +345,14 @@ async def checkStorages(self) -> bool:
325345
return all(results)
326346

327347
def readLong(self, stream: io.BytesIO):
328-
b = ord(stream.read(1))
329-
n = b & 0x7F
330-
shift = 7
331-
while (b & 0x80) != 0:
332-
b = ord(stream.read(1))
333-
n |= (b & 0x7F) << shift
348+
result, shift = 0, 0
349+
while True:
350+
byte = ord(stream.read(1))
351+
result |= (byte & 0x7F) << shift
352+
if not (byte & 0x80):
353+
break
334354
shift += 7
335-
return (n >> 1) ^ -(n & 1)
355+
return (result >> 1) ^ -(result & 1)
336356

337357
def readString(self, stream: io.BytesIO):
338358
return stream.read(self.readLong(stream)).decode()

core/config.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
"cluster.base_url": "https://openbmclapi.bangbang93.com",
1313
"cluster.id": "",
1414
"cluster.secret": "",
15-
"cluster.host": None,
15+
"cluster.host": "",
1616
"cluster.byoc": False,
1717
"cluster.public_port": 8080,
1818
"cluster.port": 8800,
@@ -70,4 +70,5 @@ def _set_value(self, dict_obj, keys, value):
7070
dict_obj = dict_obj[key]
7171
dict_obj[keys[-1]] = value
7272

73+
7374
Config: CFG = CFG("./config/config.yml")

core/router.py

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from core.classes import Storage
44
from core.logger import logger
55
from core.utils import checkSign
6-
from typing import List
6+
from typing import List, Union
77
from aiohttp import web
88
import random
99

@@ -17,7 +17,7 @@ def __init__(self, app: web.Application, storages: List[Storage]) -> None:
1717
def route(self, path, method="GET"):
1818
def decorator(func):
1919
@wraps(func)
20-
async def wrapper(request):
20+
async def wrapper(request: web.Request):
2121
return await func(request)
2222

2323
self.app.router.add_route(method, path, wrapper)
@@ -29,33 +29,37 @@ def init(self) -> None:
2929
@self.route("/download/{hash}")
3030
async def _(
3131
request: web.Request,
32-
) -> web.Response | web.FileResponse | web.StreamResponse:
33-
hash = request.match_info.get("hash").lower()
34-
valid = checkSign(hash, self.secret, request.query)
35-
if not valid:
32+
) -> Union[web.Response, web.FileResponse, web.StreamResponse]:
33+
file_hash = request.match_info.get("hash", "").lower()
34+
if not checkSign(file_hash, self.secret, request.query):
3635
return web.Response(text="Invalid signature.", status=403)
36+
3737
response = None
38-
data = await random.choice(self.storages).express(hash, request, response)
38+
data = await random.choice(self.storages).express(
39+
file_hash, request, response
40+
)
3941
logger.debug(data)
4042
return response
4143

4244
@self.route("/measure/{size}")
43-
async def _(request: web.Request):
45+
async def _(request: web.Request) -> web.StreamResponse:
4446
try:
45-
size = int(request.match_info['size'])
46-
if not checkSign(f"/measure/{size}", self.secret, request.query):
47-
return web.Response(status=403)
48-
if size > 200:
49-
return web.Response(status=400)
47+
size = int(request.match_info.get("size", "0"))
48+
49+
if (
50+
not checkSign(f"/measure/{size}", self.secret, request.query)
51+
or size > 200
52+
):
53+
return web.Response(status=403 if size > 200 else 400)
5054

51-
buffer = b'\x00\x66\xcc\xff' * 256 * 1024
55+
buffer = b"\x00\x66\xcc\xff" * 256 * 1024
5256
response = web.StreamResponse(
5357
status=200,
54-
reason='OK',
58+
reason="OK",
5559
headers={
56-
'Content-Length': str(size * 1024 * 1024),
57-
'Content-Type': 'application/octet-stream'
58-
}
60+
"Content-Length": str(size * 1024 * 1024),
61+
"Content-Type": "application/octet-stream",
62+
},
5963
)
6064

6165
await response.prepare(request)

core/utils.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import time
44

55
def checkSign(hash: str, secret: str, query: dict) -> bool:
6-
return True
76
if not (s := query.get("s")) or not (e := query.get("e")):
87
return False
98
sign = (

0 commit comments

Comments
 (0)