11import abc
22import asyncio
33from collections import defaultdict , deque
4- from dataclasses import asdict , dataclass
4+ from dataclasses import asdict , dataclass , field
55import datetime
66import enum
77import hashlib
@@ -943,6 +943,7 @@ def __init__(self, id: str, secret: str):
943943 self .want_enable : bool = False
944944 self .enabled = False
945945 self .enable_count : int = 0
946+ self .keepalive_error : int = 0
946947 self .keepalive_task : Optional [int ] = None
947948 self .delay_enable_task : Optional [int ] = None
948949 self .counter : defaultdict [storages .iStorage , ClusterCounter ] = defaultdict (ClusterCounter )
@@ -1052,6 +1053,7 @@ async def enable(self):
10521053 self .enabled = True
10531054 clusters .status .enter ()
10541055 self .enable_count = 0
1056+ self .keepalive_error = 0
10551057 scheduler .cancel (self .keepalive_task )
10561058 self .keepalive_task = scheduler .run_repeat_later (self .keepalive , 1 , interval = 60 )
10571059 logger .tsuccess ("cluster.success.enabled" , cluster = self .id )
@@ -1083,9 +1085,14 @@ async def keepalive(self):
10831085 }
10841086 )
10851087 if result .err or not result .ack :
1086- logger .twarning ("cluster.warning.kicked_by_remote" )
1087- await self .disable ()
1088+ if self .keepalive_error >= 3 :
1089+ logger .twarning ("cluster.warning.kicked_by_remote" , cluster = self .id )
1090+ await self .disable ()
1091+ else :
1092+ self .keepalive_error += 1
1093+ logger .twarning ("cluster.warning.keepalive" , cluster = self .id , count = self .keepalive_error )
10881094 return
1095+ self .keepalive_error = 0
10891096 self .no_storage_counter -= commit_no_storage_counter
10901097 for storage , counter in commit_counter .items ():
10911098 self .counter [storage ] -= counter
@@ -1290,6 +1297,53 @@ def __init__(self, hash: str, size: int, mtime: float, data: bytes, storage: Opt
12901297 super ().__init__ (hash = hash , size = size , mtime = mtime , storage = storage )
12911298 self .data = data
12921299
1300+ @dataclass
1301+ class Bandwidth :
1302+ cluster_id : str
1303+ bytes : int
1304+
1305+ @dataclass
1306+ class BandwidthList :
1307+ cluster_id : str
1308+ bytes : deque [int ] = field (default_factory = deque )
1309+
1310+ class BandwidthCounter :
1311+ def __init__ (
1312+ self
1313+ ):
1314+ self .bandwidths : defaultdict [str , defaultdict [int , int ]] = defaultdict (lambda : defaultdict (int ))
1315+ scheduler .run_repeat_later (self .gc , 600 , 5 )
1316+
1317+ def hit (self , cluster_id : str , bytes : int ):
1318+ current = int (time .monotonic ())
1319+ self .bandwidths [cluster_id ][current ] += bytes
1320+
1321+
1322+ def gc (self ):
1323+ current = int (time .monotonic ())
1324+ for cluster_id in self .bandwidths :
1325+ for timestamp in list (self .bandwidths [cluster_id ]):
1326+ if current - timestamp > 600 :
1327+ del self .bandwidths [cluster_id ][timestamp ]
1328+
1329+ def total (self , interval : int = 1 ) -> list [Bandwidth ]:
1330+ res : list [Bandwidth ] = []
1331+ for b in self .get (interval ):
1332+ res .append (Bandwidth (b .cluster_id , sum (b .bytes )))
1333+ return res
1334+
1335+ def get (self , interval : int = 1 ) -> list [BandwidthList ]:
1336+ current = int (time .monotonic ()) - 1
1337+ bandwidths : list [BandwidthList ] = []
1338+ for cluster_id , b in self .bandwidths .items ():
1339+ obj = BandwidthList (cluster_id )
1340+ for timestamp in range (current - interval , current + 1 ):
1341+ if current - timestamp > interval :
1342+ continue
1343+ obj .bytes .append (b [timestamp ])
1344+ bandwidths .append (obj )
1345+ return bandwidths
1346+
12931347ROOT = Path (__file__ ).parent .parent
12941348
12951349CHECK_FILE_CONTENT = "Python OpenBMCLAPI"
@@ -1307,7 +1361,7 @@ def __init__(self, hash: str, size: int, mtime: float, data: bytes, storage: Opt
13071361DEFAULT_MEASURES = [
13081362 10
13091363]
1310-
1364+ BANDWIDTH_COUNTER = BandwidthCounter ()
13111365routes = web .routes
13121366aweb = web .web
13131367clusters = ClusterManager ()
@@ -1525,6 +1579,7 @@ async def _(request: aweb.Request):
15251579 size = end - start + 1
15261580
15271581 cluster .hit (file .storage , size )
1582+ BANDWIDTH_COUNTER .hit (cluster .id , size )
15281583 # add database
15291584 # stats
15301585 name = query .get ("name" )
0 commit comments