@@ -481,8 +481,10 @@ def __init__(self, clusterID: str, clusterSecert: str) -> None:
481481 self .byoc = BYOC
482482 self .cert_expires : float = 0
483483 self .retry_timer = None
484+ self ._retry : int = 0
484485 self .keepalive_timer = None
485486 self .enabled = False
487+ self .trusted = True
486488
487489 async def __call__ (self ) -> Any :
488490 try :
@@ -538,11 +540,21 @@ async def cluster_keepalive(self):
538540 async def cluster_retry (self ):
539541 if self .enabled :
540542 await self .cluster_disable ()
543+ if RECONNECT_RETRY != - 1 and self ._retry >= RECONNECT_RETRY :
544+ logger .terror (
545+ "cluster.error.cluster.reached_maximum_retry_count" ,
546+ count = RECONNECT_RETRY ,
547+ )
548+ return
549+ self ._retry += 1
550+ logger .tinfo ("cluster.info.cluster.retry" , t = RECONNECT_DELAY )
541551 scheduler .cancel (self .retry_timer )
542- self .retry_timer = scheduler .delay (self .cluster_enable , delay = 60 )
552+ self .retry_timer = scheduler .delay (self .cluster_enable , delay = RECONNECT_DELAY )
553+
543554 async def cluster_cert (self , ):
544555 if self .byoc :
545556 return
557+ await dashboard .set_status ("cluster.got.cert" )
546558 err , ack = await self .socketio_emit_with_ack (
547559 "request-cert"
548560 )
@@ -551,28 +563,43 @@ async def cluster_cert(self, ):
551563 return
552564 certificate .load_text (ack ["cert" ], ack ["key" ])
553565 self .cert_expires = utils .parse_iso_time (ack ["expires" ])
566+ await dashboard .set_status ("cluster.get.cert" )
567+
554568 async def cluster_enable (self ):
569+ if not ENABLE :
570+ return
555571 try :
556- err , ack = await self .socketio_emit_with_ack ("enable" , {
557- "host" : PUBLIC_HOST ,
558- "port" : PUBLIC_PORT or PORT ,
559- "byoc" : self .byoc ,
560- "version" : API_VERSION ,
561- "noFastEnable" : True ,
562- "flavor" : {
563- "runtime" : f"python/{ sys .version_info .major } .{ sys .version_info .minor } .{ sys .version_info .micro } { VERSION } " ,
564- "storage" : "+" .join ((key for key , value in asdict (self .storages .get_storage_info (available = True )).items () if key != "total" and value != 0 )),
565- ** asdict (self .storages .get_storage_info (available = True ))
566- }
567- }, timeout = 600 )
572+ await dashboard .set_status ("cluster.want_enable" )
573+ if not self .storages .available_storages :
574+ err , ack = ({"message" : "当前无可用的存储" }, None )
575+ logger .twarn ("cluster.warn.cluster.no_storage" )
576+ else :
577+ err , ack = await self .socketio_emit_with_ack ("enable" , {
578+ "host" : PUBLIC_HOST ,
579+ "port" : PUBLIC_PORT or PORT ,
580+ "byoc" : self .byoc ,
581+ "version" : API_VERSION ,
582+ "noFastEnable" : True ,
583+ "flavor" : {
584+ "runtime" : f"python/{ sys .version_info .major } .{ sys .version_info .minor } .{ sys .version_info .micro } { VERSION } " ,
585+ "storage" : "+" .join ((key for key , value in asdict (self .storages .get_storage_info (available = True )).items () if key != "total" and value != 0 )),
586+ ** asdict (self .storages .get_storage_info (available = True ))
587+ }
588+ }, timeout = 600 )
568589 if err :
569- logger .error (self .cluster_error (err ))
590+ logger .terror (
591+ "cluster.error.cluster.failed_to_start_service" ,
592+ e = err ["message" ],
593+ )
570594 await self .cluster_retry ()
571595 return
572596 if ack :
573597 self .enabled = True
574598 logger .tinfo ("cluster.info.cluster.hosting" , id = self .clusterID , port = PUBLIC_PORT or PORT )
575599 self .keepalive_timer = scheduler .repeat (self .cluster_keepalive , interval = 60 )
600+ await dashboard .set_status (
601+ "cluster.enabled" + (".trusted" if self .trusted else "" )
602+ )
576603 except asyncio .CancelledError :
577604 return asyncio .CancelledError
578605 except asyncio .TimeoutError :
@@ -586,6 +613,7 @@ async def cluster_disable(self, wait: bool = False):
586613 await self .socketio_emit_with_ack ("disable" )
587614 else :
588615 await self .socketio_emit ("disable" )
616+ self .enabled = False
589617 return True
590618 except :
591619 self .enabled = False
@@ -767,26 +795,37 @@ async def _(request: web.Request, name: str):
767795 def _ ():
768796 return "User-agent: * Disallow: /"
769797
770-
771798 def cluster_error (self , err ):
772799 if isinstance (err , dict ) and 'message' in err :
773800 return str (err ['message' ])
774801 return str (err )
775-
776802
777803 async def socketio_disconnect (self ):
778804 if not self .sio .connected :
779805 return
780806 await self .cluster_disable (True )
781807 await self .sio .disconnect ()
808+
782809 async def socketio_init (self ):
783810 if self .sio is not None :
784811 return
785812 self .sio = socketio .AsyncClient (
786813 logger = socketio_logger ,
787814 engineio_logger = socketio_logger
788815 )
816+ self .sio .on ("message" , self ._message )
817+ self .sio .on ("exception" , self ._exception )
789818 await self .socketio_connect ()
819+
820+ def _message (self , message ):
821+ logger .tinfo ("cluster.info.cluster.remote_message" , message = message )
822+ if "信任度过低" in message :
823+ self .trusted = False
824+
825+ def _exception (self , message ):
826+ logger .terror ("cluster.error.cluster.remote_message" , message = message )
827+ scheduler .delay (self .cluster_retry )
828+
790829 async def socketio_connect (self ):
791830 if self .sio .connected :
792831 return
@@ -797,6 +836,7 @@ async def socketio_connect(self):
797836 transports = ["websocket" ],
798837 )
799838 await self .cluster_cert ()
839+
800840 async def socketio_message (self , channel , data : list [Any ], callback = None ):
801841 if len (data ) == 1 :
802842 data .append (None )
@@ -809,6 +849,7 @@ async def socketio_message(self, channel, data: list[Any], callback=None):
809849 await callback (err , ack )
810850 except :
811851 logger .error (traceback .format_exc ())
852+
812853 async def socketio_emit (self , channel , data = None , callback = None ):
813854 async def _ ():
814855 try :
@@ -870,6 +911,7 @@ def __init__(self, cluster: Cluster) -> None:
870911 self .errors : defaultdict [BMCLAPIFile , int ] = defaultdict (int )
871912 self .sem : asyncio .Semaphore = None
872913 self .check_timer = None
914+ self .checked = False
873915 self .storage_cur = 0
874916 if FILECHECK == "hash" :
875917 self .check_type_handler = self ._hash
@@ -916,6 +958,7 @@ async def fetch(self):
916958 "Authorization" : f"Bearer { self .cluster .token } "
917959 }
918960 ) as session :
961+ await dashboard .set_status ("files.fetching" )
919962 async with session .get (
920963 "/openbmclapi/files" , params = {
921964 "lastModified" : self .lastModified * 1000.0
@@ -1011,13 +1054,16 @@ async def start_check(self):
10111054
10121055 async def check (self , files : set [BMCLAPIFile ]) -> dict [Storage , set [BMCLAPIFile ]]:
10131056 miss_storages : dict [Storage , set [BMCLAPIFile ]] = {}
1057+ if not self .checked :
1058+ await dashboard .set_status ("files.checking" )
10141059 storages = self .available_storages
10151060 with tqdm (
10161061 desc = locale .t ("cluster.tqdm.desc.check_files" ),
10171062 total = len (files ) * len (storages ),
10181063 unit = locale .t ("cluster.tqdm.unit.file" ),
10191064 unit_scale = True ,
10201065 ) as pbar :
1066+ await dashboard .set_status_by_tqdm ("files.checking" , pbar )
10211067 try :
10221068 for storage , storage_result in zip (storages , await asyncio .gather (* [asyncio .create_task (self ._check_by_storage (files , storage , pbar )) for storage in storages ])):
10231069 miss_storages [storage ] = storage_result
@@ -1044,6 +1090,7 @@ async def check(self, files: set[BMCLAPIFile]) -> dict[Storage, set[BMCLAPIFile]
10441090 unit_divisor = 1024 ,
10451091 unit_scale = True ,
10461092 ) as pbar :
1093+ await dashboard .set_status_by_tqdm ("files.copying" , pbar )
10471094 for storage , failed in zip (copy_storage .keys (), await asyncio .gather (* [asyncio .create_task (self .copy_storage (origin , files_storage , pbar )) for origin , files_storage in copy_storage .items ()])):
10481095 miss_storages [storage ] = failed
10491096 return miss_storages
@@ -1154,6 +1201,7 @@ async def download(self, storages: dict[Storage, set[BMCLAPIFile]], configuratio
11541201 total = sum ((file .size for file in files )),
11551202 unit_scale = True ,
11561203 ) as pbar :
1204+ await dashboard .set_status_by_tqdm ("files.downloading" , pbar )
11571205 self .update_tqdm (pbar )
11581206 for _ in range (0 , max (1 , MAX_DOWNLOAD ), step ):
11591207 session = await self .get_session ()
0 commit comments