44import datetime
55import hmac
66from pathlib import Path
7+ import sys
78import tempfile
89import time
910from typing import Any , Optional
2021from .logger import logger
2122from .config import API_VERSION , ROOT_PATH , cfg , USER_AGENT
2223from .storage import CheckStorage , StorageManager
24+ from .database import get_db
2325
2426class TokenManager :
2527 def __init__ (
@@ -132,12 +134,13 @@ class Cluster:
132134 def __init__ (
133135 self ,
134136 id : str ,
135- secret : str
137+ secret : str ,
138+ manager : 'ClusterManager'
136139 ):
137140 self ._token = TokenManager (id , secret )
138141 self ._last_modified = 0
139142 self .sio = socketio .AsyncClient (
140- handle_sigint = False
143+ handle_sigint = False ,
141144 )
142145 self ._keepalive_lock = utils .CustomLock (locked = True )
143146 self ._storage_wait = utils .CustomLock (locked = True )
@@ -149,6 +152,7 @@ def __init__(
149152 self ._failed_keepalive = 0
150153 self ._retry_times = 0
151154 self ._task_group = None
155+ self ._manager = manager
152156
153157 @property
154158 def id (self ):
@@ -167,14 +171,17 @@ async def setup(
167171 @self .sio .on ("warden-error" ) # type: ignore
168172 async def _ (message : Any ):
169173 logger .twarning ("cluster.warden" , id = self .id , msg = message )
174+ await get_db ().insert_cluster_info (self .id , "server-push" , "warden-error" , message )
170175
171176 @self .sio .on ("exception" ) # type: ignore
172177 async def _ (message : Any ):
173178 logger .terror ("cluster.exception" , id = self .id , msg = message )
179+ await get_db ().insert_cluster_info (self .id , "server-push" , "exception" , message )
174180
175181 @self .sio .on ("message" ) # type: ignore
176182 async def _ (message : Any ):
177183 logger .tinfo ("cluster.message" , id = self .id , msg = message )
184+ await get_db ().insert_cluster_info (self .id , "server-push" , "message" , message )
178185
179186 @self .sio .on ("connect" ) # type: ignore
180187 async def _ ():
@@ -183,6 +190,7 @@ async def _():
183190 return
184191 self ._enabled = False
185192 logger .tinfo ("cluster.reconnect" , id = self .id )
193+ await get_db ().insert_cluster_info (self .id , "socketio" , "reconnect" )
186194 await self .enable ()
187195
188196
@@ -225,6 +233,10 @@ async def keepalive(self):
225233
226234 self .counter .hits -= current_counter .hits
227235 self .counter .bytes -= current_counter .bytes
236+
237+ # insert into db
238+ await get_db ().upsert_cluster_counter (self .id , current_counter .hits , current_counter .bytes )
239+
228240 resp_time = time .time () - datetime .datetime .fromisoformat (res .ack ).timestamp ()
229241 logger .tsuccess ("cluster.keepalive" , id = self .id , hits = units .format_number (current_counter .hits ), bytes = units .format_number (current_counter .bytes ), delay = F"{ resp_time * 1000 :.4f} " )
230242 except :
@@ -252,14 +264,19 @@ async def request_cert(self):
252264 return Certificate (CertificateType .CLUSTER , str (cert ), str (key ))
253265
254266 async def connect (self ):
255- await self .sio .connect (
256- cfg .base_url ,
257- transports = ['websocket' ],
258- headers = {
259- "User-Agent" : USER_AGENT ,
260- },
261- auth = self ._token .get_socketio_token
262- )
267+ try :
268+ await self .sio .connect (
269+ cfg .base_url ,
270+ transports = ['websocket' ],
271+ headers = {
272+ "User-Agent" : USER_AGENT ,
273+ },
274+ auth = self ._token .get_socketio_token
275+ )
276+ except :
277+ logger .debug_traceback ()
278+ await anyio .sleep (5 )
279+ await self .connect ()
263280
264281 async def emit (self , event : str , data : Any = None , timeout : Optional [int ] = None ) -> SocketEmitResult :
265282 err , ack = None , None
@@ -269,8 +286,13 @@ async def callback(data: Any):
269286 err , ack = data
270287 else :
271288 err , ack = data , None
289+ if event not in ("keep-alive" , ):
290+ await get_db ().insert_cluster_info (self .id , "server" , event , err )
272291 fut .set ()
273292
293+ if event not in ("keep-alive" , ):
294+ await get_db ().insert_cluster_info (self .id , "client" , event )
295+
274296 await self .sio .emit (event , data , callback = callback )
275297
276298 fut = anyio .Event ()
@@ -352,8 +374,8 @@ async def enable(self):
352374 "noFastEnable" : False ,
353375 "byoc" : utils .get_certificate_type () != CertificateType .CLUSTER ,
354376 "flavor" : {
355- "runtime" : "python" ,
356- "storage" : "local"
377+ "runtime" : f "python/ { sys . version_info . major } . { sys . version_info . minor } . { sys . version_info . micro } " ,
378+ "storage" : self . _manager . storages . get_storage_type . type
357379 }
358380 }, 300 )
359381 if res .err is not None :
@@ -546,7 +568,7 @@ def add_cluster(
546568 id : str ,
547569 secret : str
548570 ):
549- self ._clusters [id ] = Cluster (id , secret )
571+ self ._clusters [id ] = Cluster (id , secret , self )
550572
551573 def get_cluster (
552574 self ,
0 commit comments