Skip to content

Commit 0700466

Browse files
committed
feat: 增加 dashboard SSE(EventSource) 连接
1 parent 4858c68 commit 0700466

File tree

7 files changed

+439
-66
lines changed

7 files changed

+439
-66
lines changed

assets/js/index.js

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import {
44
ElementManager,
55
Style,
66
I18NManager,
7-
Router,
87
createElement,
98
SVGContainers,
109
calcElementHeight,
@@ -211,6 +210,12 @@ class Channel {
211210
// event source
212211
_event_source_init() {
213212
this._event_source = new EventSource(this.url + "_event");
213+
this._event_source.onopen = (event) => {
214+
console.log(event)
215+
}
216+
this._event_source.onmessage = (event) => {
217+
console.log(event)
218+
}
214219

215220
}
216221
// websocket

core/cache.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ def __init__(self, default_timeout: Optional[float] = None):
2828
self.background = BackgroundScheduler()
2929
self.default_timeout = default_timeout
3030
self.background.start()
31+
32+
def keys(self):
33+
return self.cache.keys()
3134

3235
def set(self, key: K, value: T, timeout: Optional[float] = None) -> None:
3336
self._delete_job(key)

core/cluster.py

Lines changed: 126 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from tqdm import tqdm
1818

1919
from . import web
20-
from . import utils, logger, config, scheduler, units, storages, i18n
20+
from . import utils, logger, config, scheduler, units, storages, i18n, dashboard
2121
from .storages import File as SFile, MeasureFile
2222
import socketio
2323
import urllib.parse as urlparse
@@ -55,13 +55,13 @@ class FailedFile:
5555
def __hash__(self) -> int:
5656
return hash(self.file)
5757

58-
5958
class StorageManager:
6059
def __init__(self, clusters: 'ClusterManager'):
6160
self.clusters = clusters
6261
self.storages: deque[storages.iStorage] = deque()
6362
self.available_storages: deque[storages.iStorage] = deque()
6463
self.check_available: utils.CountLock = utils.CountLock()
64+
self.logger = clusters.event_logger
6565

6666
self.check_available.acquire()
6767

@@ -75,18 +75,38 @@ def init(self):
7575

7676
async def _check_available(self):
7777
for storage in self.storages:
78+
err = None
7879
res = False
7980
try:
8081
res = await asyncio.wait_for(self.__check_available(storage), 10)
81-
except:
82+
except Exception as e:
8283
logger.ttraceback("storage.error.check_available", type=storage.type, path=storage.path, url=getattr(storage, "url", None))
83-
if storage in self.available_storages:
84-
self.available_storages.remove(storage)
84+
res = False
85+
err = e
8586
if res:
8687
if storage not in self.available_storages:
8788
self.available_storages.append(storage)
89+
await self.logger.logger(
90+
EventLoggerType.storage,
91+
"up",
92+
{
93+
"id": storage.unique_id,
94+
"type": storage.type,
95+
"name": storage.name
96+
}
97+
)
8898
elif storage in self.available_storages:
8999
self.available_storages.remove(storage)
100+
await self.logger.logger(
101+
EventLoggerType.storage,
102+
"down",
103+
{
104+
"id": storage.unique_id,
105+
"type": storage.type,
106+
"name": storage.name,
107+
"error": str(err)
108+
}
109+
)
90110
logger.debug(f"Available storages: {len(self.available_storages)}")
91111
if len(self.available_storages) > 0:
92112
self.check_available.release()
@@ -191,11 +211,13 @@ async def _check_exists(self, file: File, storage: storages.iStorage):
191211
file_hash = file.hash
192212
return file_hash in self.cache_filelist[storage]
193213
return False #await storage.exists(convert_file_to_storage_file(file))
214+
194215
async def _check_size(self, file: File, storage: storages.iStorage):
195216
if storage in self.cache_filelist:
196217
file_hash = file.hash
197218
return file_hash in self.cache_filelist[storage] and self.cache_filelist[storage][file_hash].size == file.size
198219
return False #await self._check_exists(file, storage) and await storage.get_size(convert_file_to_storage_file(file)) == file.size
220+
199221
async def _check_hash(self, file: File, storage: storages.iStorage):
200222
return False #await self._check_exists(file, storage) and utils.equals_hash(file.hash, (await storage.read_file(convert_file_to_storage_file(file))).getvalue())
201223

@@ -311,6 +333,17 @@ async def _write_file(self, file: File, content: io.BytesIO, storage: storages.i
311333
retries += 1
312334
return False
313335

336+
@property
337+
def flavor_storage(self):
338+
res = []
339+
for storage in self.storages:
340+
if storage.type in res:
341+
continue
342+
res.append(storage.type)
343+
return "+".join(res)
344+
345+
346+
314347
@dataclass
315348
class OpenBMCLAPIConfiguration:
316349
source: str
@@ -428,7 +461,6 @@ async def _get_filelist(self, cluster: 'Cluster'):
428461
logger.ttraceback("cluster.error.fetch_filelist", cluster=cluster.id)
429462
return []
430463

431-
432464
async def fetch_filelist(self) -> set[File]:
433465
result_filelist = set().union(*await asyncio.gather(*(asyncio.create_task(self._get_filelist(cluster)) for cluster in self.clusters.clusters)))
434466
logger.tsuccess("cluster.success.fetch_filelist", total=units.format_number(len(result_filelist)), size=units.format_bytes(sum(f.size for f in result_filelist)))
@@ -586,6 +618,7 @@ async def report(self, file: File, error: Exception, resp: Optional[aiohttp.Clie
586618
await self._report(self.failed_hash_urls[file.hash].urls)
587619
self.failed_hash_urls[file.hash].failed = 0
588620
self.failed_hash_urls[file.hash].urls = set()
621+
589622
async def _report(self, urls: set[tuple[URLResponse, ...]]):
590623
async def _r(urls: tuple[URLResponse, ...]):
591624
async with session.post(
@@ -604,7 +637,6 @@ async def _r(urls: tuple[URLResponse, ...]):
604637
) as session:
605638
await asyncio.gather(*[_r(urls) for urls in urls])
606639

607-
608640
async def _get_configuration(self, cluster: 'Cluster'):
609641
try:
610642
async with aiohttp.ClientSession(
@@ -629,29 +661,27 @@ async def _get_configuration(self, cluster: 'Cluster'):
629661
return {}
630662

631663
@dataclass
632-
class ClusterLoggerSchema:
633-
type: "ClusterLoggerType"
664+
class EventLoggerSchema:
665+
type: "EventLoggerType"
634666
cluster_id: str
635667
event: str
636668
data: Any
637669
time: datetime.datetime
638670

639-
class ClusterLoggerType(enum.Enum):
640-
emit = "emit"
641-
response = "response"
671+
class EventLoggerType(enum.Enum):
672+
cluster = "cluster"
673+
storage = "storage"
642674

643-
class ClusterLogger:
675+
class EventLogger:
644676
def __init__(
645677
self,
646678
dir: str = logger.dir,
647-
prefix: str = "cluster",
648-
emit_filter: Callable[[str], bool] = lambda x: False,
649-
callback_filter: Callable[[str], bool] = lambda x: x in ("request-cert", ),
679+
prefix: str = "event",
680+
sse_emiter: dashboard.SSEEmiter = dashboard.SSEEMIT
650681
):
651682
self._dir = dir
652683
self._prefix = prefix
653-
self._emit_filter = emit_filter
654-
self._callback_filter = callback_filter
684+
self._sse_emiter = sse_emiter
655685

656686
@property
657687
def file_path(self):
@@ -663,58 +693,87 @@ def file_path(self):
663693
)
664694
return self._file_path
665695

666-
def emit(self, cluster_id: str, event: str, data: Any):
667-
if self._emit_filter(event):
668-
data = "(This data is filtered)"
669-
self._write(ClusterLoggerType.emit, cluster_id, event, data)
696+
async def emit_cluster(
697+
self,
698+
cluster_id: str,
699+
event: str,
700+
data: Any
701+
):
702+
await self.logger(
703+
EventLoggerType.cluster,
704+
event,
705+
{
706+
"cluster_id": cluster_id,
707+
"type": "emit",
708+
"data": data
709+
}
710+
)
670711

671-
def callback(self, cluster_id: str, event: str, data: Any):
672-
if self._callback_filter(event):
673-
data = "(This data is filtered)"
674-
self._write(ClusterLoggerType.response, cluster_id, event, data)
712+
async def callback_cluster(
713+
self,
714+
cluster_id: str,
715+
event: str,
716+
data: Any
717+
):
718+
await self.logger(
719+
EventLoggerType.cluster,
720+
event,
721+
{
722+
"cluster_id": cluster_id,
723+
"type": "callback",
724+
"data": data
725+
}
726+
)
675727

676-
def _write(self, type: ClusterLoggerType, cluster_id: str, event: str, data: Any):
728+
async def logger(
729+
self,
730+
type: EventLoggerType,
731+
event: str,
732+
data: Any
733+
):
734+
buffer = {
735+
"type": type.value,
736+
"event": event,
737+
"data": data
738+
}
677739
with open(self.file_path, "a") as f:
678740
f.write(json.dumps(
679-
{
680-
"type": type.value,
681-
"cluster_id": cluster_id,
682-
"event": event,
683-
"data": data,
684-
"time": datetime.datetime.now().isoformat()
685-
}
741+
buffer
686742
) + "\n")
743+
await self._sse_emiter.push_message(
744+
f"{type.value}_{event}",
745+
data
746+
)
687747

688748
def _list_files(self):
689749
return list(Path(self._dir).glob(f"{self._prefix}_*.log"))
690750

691-
def _read_file(self, file: Path) -> list[ClusterLoggerSchema]:
751+
def _read_file(self, file: Path) -> list[EventLoggerSchema]:
692752
res = []
693753
with open(file, "r") as f:
694754
for line in f:
695755
if not line:
696756
continue
697757
data = json.loads(line)
698-
obj = ClusterLoggerSchema(**data)
699-
obj.type = ClusterLoggerType(obj.type)
758+
obj = EventLoggerSchema(**data)
759+
obj.type = EventLoggerType(obj.type)
700760
res.append(obj)
701761
return res
702762

703-
def read(self) -> list[ClusterLoggerSchema]:
704-
res: list[ClusterLoggerSchema] = []
763+
def read(self) -> list[EventLoggerSchema]:
764+
res: list[EventLoggerSchema] = []
705765
for file in self._list_files():
706766
res.extend(self._read_file(file))
707767
res.sort(key=lambda x: x.time)
708768
return res
709769

710-
711770
class ClusterManager:
712771
def __init__(self):
713772
self.cluster_id_tables: dict[str, 'Cluster'] = {}
714773
self.file_manager = FileListManager(self)
715-
self.storage_manager = StorageManager(self)
716774
self.clusters_certificate: dict[str, ClusterCertificate] = {}
717-
self.event_logger = ClusterLogger()
775+
self.event_logger = EventLogger()
776+
self.storage_manager = StorageManager(self)
718777
self.initialized = False
719778

720779
def add_cluster(self, cluster: 'Cluster'):
@@ -914,6 +973,7 @@ async def enable(self):
914973
if self.enabled:
915974
return
916975
scheduler.cancel(self.delay_enable_task)
976+
self.delay_enable_task = None
917977
if not await clusters.storage_manager.available():
918978
return
919979
if self.want_enable:
@@ -930,8 +990,8 @@ async def enable(self):
930990
"version": API_VERSION,
931991
"noFastEnable": True,
932992
"flavor": {
933-
"storage": "local",
934-
"runtime": f"python/{config.PYTHON_VERSION}"
993+
"storage": clusters.storage_manager.flavor_storage,
994+
"runtime": f"python/{config.PYTHON_VERSION} python-openbmclapi/{config.VERSION}"
935995
}
936996
}
937997
, 120)
@@ -1049,31 +1109,31 @@ async def connect(self):
10491109
def setup_handlers(self):
10501110
@self.sio.on("connect") # type: ignore
10511111
async def _() -> None:
1052-
clusters.event_logger.callback(self.cluster.id, "connect", None)
1112+
await clusters.event_logger.callback_cluster(self.cluster.id, "connect", None)
10531113
logger.tdebug("cluster.debug.socketio.connected", cluster=self.cluster.id)
10541114

10551115
@self.sio.on("disconnect") # type: ignore
10561116
async def _() -> None:
1057-
clusters.event_logger.callback(self.cluster.id, "disconnect", None)
1117+
await clusters.event_logger.callback_cluster(self.cluster.id, "disconnect", None)
10581118
logger.tdebug("cluster.debug.socketio.disconnected", cluster=self.cluster.id)
10591119

10601120
@self.sio.on("message") # type: ignore
10611121
async def _(message: Any):
1062-
clusters.event_logger.callback(self.cluster.id, "message", message)
1122+
await clusters.event_logger.callback_cluster(self.cluster.id, "message", message)
10631123
if isinstance(message, dict) and "message" in message:
10641124
message = message["message"]
10651125
logger.tinfo("cluster.info.socketio.message", cluster=self.cluster.id, message=message)
10661126

10671127
@self.sio.on("exception") # type: ignore
10681128
async def _(message: Any):
1069-
clusters.event_logger.callback(self.cluster.id, "exception", message)
1129+
await clusters.event_logger.callback_cluster(self.cluster.id, "exception", message)
10701130
if isinstance(message, dict) and "message" in message:
10711131
message = message["message"]
10721132
logger.terror("cluster.error.socketio.message", cluster=self.cluster.id, message=message)
10731133

10741134
@self.sio.on("warden-error") # type: ignore
10751135
async def _(message: Any):
1076-
clusters.event_logger.callback(self.cluster.id, "warden-error", message)
1136+
await clusters.event_logger.callback_cluster(self.cluster.id, "warden-error", message)
10771137
if isinstance(message, dict) and "message" in message:
10781138
message = message["message"]
10791139
logger.terror("cluster.error.socketio.warden", cluster=self.cluster.id, message=message)
@@ -1090,15 +1150,27 @@ async def emit(self, event: str, data: Any = None, timeout: Optional[float] = No
10901150
fut = asyncio.get_event_loop().create_future()
10911151

10921152
async def callback(data: tuple[Any, Any]):
1093-
clusters.event_logger.callback(self.cluster.id, event, {
1094-
"err": data[0],
1095-
"ack": data[1] if len(data) > 1 else None
1153+
if event in (
1154+
"request-cert",
1155+
):
1156+
callback_data = (
1157+
"None",
1158+
"(The data is filtered.)"
1159+
)
1160+
else:
1161+
callback_data = (
1162+
data[0],
1163+
data[1] if len(data) > 1 else None
1164+
)
1165+
await clusters.event_logger.callback_cluster(self.cluster.id, event, {
1166+
"err": callback_data[0],
1167+
"ack": callback_data[1]
10961168
})
10971169
fut.set_result(SocketIOEmitResult(data[0], data[1] if len(data) > 1 else None))
10981170
await self.sio.emit(
10991171
event, data, callback=callback
11001172
)
1101-
clusters.event_logger.emit(self.cluster.id, event, data)
1173+
await clusters.event_logger.emit_cluster(self.cluster.id, event, data)
11021174
timeout_task = None
11031175
if timeout is not None:
11041176
timeout_task = scheduler.run_later(lambda: not fut.done() and fut.set_exception(asyncio.TimeoutError), timeout)
@@ -1229,6 +1301,7 @@ async def init_measure_file(
12291301
except:
12301302
logger.ttraceback("cluster.error.init_measure_file", path=storage.path, type=storage.type, size=units.format_bytes(size * 1024 * 1024), hash=hash)
12311303
return False
1304+
12321305
async def init_measure_files():
12331306
await clusters.storage_manager.available()
12341307
results = await asyncio.gather(*[asyncio.create_task(init_measure_file(storage, size, MEASURES_HASH[size])) for storage in clusters.storage_manager.available_storages for size in MEASURES_HASH])

0 commit comments

Comments
 (0)