Skip to content

Commit f4e2350

Browse files
committed
fix: 修改了 dashboard 的获取数据方式
1 parent 7a01b9b commit f4e2350

File tree

4 files changed

+168
-52
lines changed

4 files changed

+168
-52
lines changed

assets/js/index.js

Lines changed: 71 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -17,38 +17,71 @@ class Utils {
1717
}
1818
}
1919
}
20-
return uuid.join('');
20+
return uuid.join('').toLocaleLowerCase();
2121
}
2222
}
2323

2424
class Socket {
2525
constructor(baseurl) {
26-
this._baseurl = baseurl;
2726
this._ws = null;
2827
this._wsReconnectTask = null;
28+
this._clientId = Utils.uuid();
29+
this._baseurl = baseurl;
30+
this._url = this._baseurl + "?id=" + this._clientId
31+
this._keepaliveTask = null;
32+
this._echoCallbacks = {}
2933
this._wsConnect()
30-
window.addEventListener("beforeinput", () => {
34+
window.addEventListener("beforeunload", () => {
35+
this.send("disconnect")
3136
this._ws?.close()
3237
})
38+
setInterval(() => this._keepalive(), 5000);
39+
this._keepalive();
3340
}
34-
async _sendByFetch(event, data) {
35-
var resp = await fetch({
36-
"url": this._baseurl,
37-
"body": JSON.stringify(
38-
{
39-
"event": event,
40-
"data": data,
41-
"echo_id": Utils.uuid()
42-
}
43-
)
41+
async _keepalive() {
42+
console.log(await this.send("keepalive", {
43+
"timestamp": new Date()
44+
}))
45+
}
46+
async _sendByXHR(event, data) {
47+
var xhr = new XMLHttpRequest();
48+
var echo_id = Utils.uuid();
49+
xhr.addEventListener("readystatechange", (event) => {
50+
if (event.target.readyState == XMLHttpRequest.DONE) {
51+
this._dispatchData(JSON.parse(xhr.response));
52+
}
4453
})
45-
return await resp.json()
54+
xhr.open("POST", this._url);
55+
return this._setEchoCallback(echo_id, () => {
56+
xhr.send(JSON.stringify({
57+
"event": event,
58+
"data": data,
59+
echo_id
60+
}));
61+
setTimeout(() => {
62+
xhr.abort();
63+
}, 10000)
64+
});
65+
}
66+
_dispatchData(responses) {
67+
responses.forEach(response => {
68+
const { echo_id, event, data } = response;
69+
console.log(echo_id, event)
70+
if (echo_id == null) { // global dispatch event
71+
72+
} else if (echo_id in this._echoCallbacks) {
73+
var { resolve, reject, timer } = this._echoCallbacks[echo_id];
74+
delete this._echoCallbacks[echo_id];
75+
clearTimeout(timer);
76+
resolve(data);
77+
}
78+
});
4679
}
4780
_wsConnect() {
4881
clearTimeout(this._wsReconnectTask)
4982
this._ws?.close()
5083
this._ws = new WebSocket(
51-
this._baseurl
84+
this._url
5285
)
5386
this._ws.addEventListener("close", () => {
5487
console.warn("The websocket has disconnected. After 5s to reconnect.")
@@ -58,29 +91,36 @@ class Socket {
5891
})
5992
this._ws.addEventListener("message", (event) => {
6093
var raw_data = JSON.parse(event.data);
94+
this._dispatchData(raw_data)
6195
})
6296
}
63-
async _sendByWs(event, data) {
64-
if (this._ws?.state != WebSocket.OPEN) return;
97+
_sendByWs(event, data) {
98+
if (this._ws?.readyState != WebSocket.OPEN) return;
6599
var echo_id = Utils.uuid();
66-
this._ws.send({
67-
"event": event,
68-
"data": data,
69-
echo_id
70-
})
71-
var promise = new Promise((resolve, reject) => {
72-
100+
return this._setEchoCallback(echo_id, () => {
101+
this._ws.send(JSON.stringify(
102+
{
103+
"event": event,
104+
"data": data,
105+
echo_id
106+
}
107+
))
73108
});
74-
return promise;
109+
}
110+
_setEchoCallback(id, executor) {
111+
return new Promise((resolve, reject) => {
112+
this._echoCallbacks[id] = { resolve, reject, timer: setTimeout(() => {
113+
reject("Timeout Error.")
114+
}, 10000)};
115+
executor();
116+
})
75117
}
76118
async send(event, data) {
77-
if (this._ws != null && this._ws.state == WebSocket.OPEN) {
78-
return this._sendByWs(event, data)
79-
} else {
80-
return this._sendByFetch(event, data)
81-
}
119+
var handler = this._ws?.readyState == WebSocket.OPEN ? this._sendByWs : this._sendByXHR
120+
return handler.bind(this)(event, data);
82121
}
83122

84123
}
85124

86-
const $socket = new Socket(window.location.origin + "/api")
125+
const $socket = new Socket(window.location.origin + "/api");
126+
$socket.send("echo", "hello world")

core/cluster.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -735,12 +735,14 @@ async def callback(data: tuple[Any, Any]):
735735
await self.sio.emit(
736736
event, data, callback=callback
737737
)
738+
timeout_task = None
738739
if timeout is not None:
739-
scheduler.run_later(lambda: fut.set_exception(asyncio.TimeoutError), timeout)
740+
timeout_task = scheduler.run_later(lambda: not fut.done() and fut.set_exception(asyncio.TimeoutError), timeout)
740741
try:
741742
await fut
742743
except:
743744
raise
745+
scheduler.cancel(timeout_task)
744746
return fut.result()
745747

746748
@dataclass
@@ -785,7 +787,7 @@ def __init__(self, hash: str, size: int, mtime: float, data: bytes) -> None:
785787

786788
ROOT = Path(__file__).parent.parent
787789

788-
API_VERSION = "1.11.0"
790+
API_VERSION = "1.12.0"
789791
USER_AGENT = f"openbmclapi/{API_VERSION} python-openbmclapi/3.0"
790792
CHECK_FILE_CONTENT = "Python OpenBMCLAPI"
791793
CHECK_FILE_MD5 = hashlib.md5(CHECK_FILE_CONTENT.encode("utf-8")).hexdigest()

core/dashboard.py

Lines changed: 92 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@
33
import json
44
import os
55
import socket
6+
from typing import Any, Optional
67

78
import aiohttp
89
import psutil
910

10-
from core import config
11+
from core import cache, config
1112

1213
from . import utils
1314

@@ -76,7 +77,54 @@ def get_json(self):
7677
"qps": item.value.qps
7778
}} for item in self._data
7879
]
80+
81+
class ClientRoom:
82+
def __init__(self) -> None:
83+
self.clients = cache.MemoryStorage()
84+
self.__default_data = {
85+
"lastKeepalive": 0,
86+
"messages": []
87+
}
88+
89+
@property
90+
def _default_data(self):
91+
return self.__default_data.copy()
7992

93+
def join_ws(self, id: str):
94+
self.clients.set(id, self._default_data)
95+
def join_http(self, id: str):
96+
self.clients.set(id, self._default_data, 600)
97+
98+
def keepalive(self, id: str):
99+
client = self.clients.get(id, self._default_data)
100+
client["lastKeepalive"] = utils.get_runtime()
101+
self.clients.set(id, client)
102+
103+
104+
def exit(self, id: str):
105+
self.clients.delete(f"ws-{id}")
106+
self.clients.delete(f"http-{id}")
107+
108+
def send(self, event: str, data: Any, id: Optional[str] = None):
109+
ids = [_id for _id in self.clients.get_keys() if id is None or (id is not None and _id == id)]
110+
for id in ids:
111+
client = self.clients.get(id, self._default_data)
112+
client["messages"].append({
113+
"event": event,
114+
"data": data
115+
})
116+
self.clients.set(id, client)
117+
118+
def get_messages(self, id: str):
119+
client = self.clients.get(id, self._default_data)
120+
data: list[Any] = client["messages"].copy()
121+
client["messages"].clear()
122+
self.clients.set(id, client)
123+
return data
124+
125+
126+
127+
room = ClientRoom()
80128
counter = Counter()
81129
process = psutil.Process(os.getpid())
82130

@@ -110,27 +158,52 @@ async def _(request: web.Request):
110158
@route.get("/api")
111159
async def _(request: web.Request):
112160
if request.headers.get("Connection", "").lower() == "upgrade" and request.headers.get("Upgrade", "").lower() == "websocket":
113-
ws = web.WebSocketResponse()
114-
ws.can_prepare(request)
115-
await ws.prepare(request)
116-
await ws.send_json({
117-
"event": "echo",
118-
"data": "hello world",
119-
"echo_id": None
120-
})
121-
while not ws.closed:
122-
try:
123-
raw_data = await ws.receive()
161+
try:
162+
ws = web.WebSocketResponse()
163+
ws.can_prepare(request)
164+
await ws.prepare(request)
165+
id: str = request.query.get("id") # type: ignore
166+
room.join_ws(id)
167+
while not ws.closed:
124168
try:
125-
print(raw_data)
169+
raw_data = json.loads((await ws.receive()).data)
170+
messages = room.get_messages(id)
171+
messages.append({
172+
"echo_id": raw_data.get("echo_id"),
173+
"event": raw_data["event"],
174+
"data": raw_data
175+
})
176+
if raw_data["event"] == "keepalive":
177+
room.keepalive(id)
178+
await ws.send_json(messages)
126179
except:
127-
...
128-
except:
129-
break
130-
return ws
131-
else:
132-
return web.json_response({
180+
break
181+
room.exit(id)
182+
return ws
183+
except:
184+
...
185+
return web.HTTPNotFound()
186+
187+
@route.post("/api")
188+
async def _(request: web.Request):
189+
try:
190+
id: str = request.query.get("id") # type: ignore
191+
room.join_http(id)
192+
body = await request.read()
193+
raw_data = json.loads(body)
194+
messages = room.get_messages(id)
195+
messages.append({
196+
"event": raw_data["event"],
197+
"echo_id": raw_data.get("echo_id"),
198+
"data": raw_data
133199
})
200+
if raw_data["event"] == "disconnect":
201+
room.exit(id)
202+
if raw_data["event"] == "keepalive":
203+
room.keepalive(id)
204+
return web.json_response(messages)
205+
except:
206+
return web.HTTPInternalServerError()
134207

135208

136209
route.static("/assets", "./assets")

core/storages/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,7 @@ def empty():
338338
r["name"]
339339
)
340340
files[f"{root_id:02x}"].append(file)
341+
await asyncio.sleep(0)
341342
update_tqdm()
342343

343344
return files

0 commit comments

Comments
 (0)