@@ -89,9 +89,15 @@ async def _check_available(self):
8989 logger .debug (f"Available storages: { len (self .available_storages )} " )
9090 if len (self .available_storages ) > 0 :
9191 self .check_available .release ()
92+ await clusters .enable ()
9293 else :
9394 self .check_available .acquire ()
94- logger .twarning ("storage.warning.unavailable" , storages = len (self .storages ))
95+ logger .twarning ("storage.warning.storage.unavailable" , storages = len (self .storages ))
96+ if any ((
97+ cluster .enabled for cluster in clusters .clusters
98+ )):
99+ logger .twarning ("storage.warning.storage.unavailable.for.clusters" )
100+ await clusters .disable ()
95101
96102 async def __check_available (self , storage : storages .iStorage ):
97103 file = MeasureFile (
@@ -183,14 +189,14 @@ async def _check_exists(self, file: File, storage: storages.iStorage):
183189 if storage in self .cache_filelist :
184190 file_hash = file .hash
185191 return file_hash in self .cache_filelist [storage ]
186- return await storage .exists (convert_file_to_storage_file (file ))
192+ return False # await storage.exists(convert_file_to_storage_file(file))
187193 async def _check_size (self , file : File , storage : storages .iStorage ):
188194 if storage in self .cache_filelist :
189195 file_hash = file .hash
190196 return file_hash in self .cache_filelist [storage ] and self .cache_filelist [storage ][file_hash ].size == file .size
191- return await self ._check_exists (file , storage ) and await storage .get_size (convert_file_to_storage_file (file )) == file .size
197+ return False # await self._check_exists(file, storage) and await storage.get_size(convert_file_to_storage_file(file)) == file.size
192198 async def _check_hash (self , file : File , storage : storages .iStorage ):
193- return await self ._check_exists (file , storage ) and utils .equals_hash (file .hash , (await storage .read_file (convert_file_to_storage_file (file ))).getvalue ())
199+ return False # await self._check_exists(file, storage) and utils.equals_hash(file.hash, (await storage.read_file(convert_file_to_storage_file(file))).getvalue())
194200
195201 async def get_file (self , hash : str , use_master : bool = False ):
196202 file = None
@@ -268,6 +274,15 @@ async def get_file(self, hash: str, use_master: bool = False):
268274 if got_hash == hash :
269275 break
270276 logger .terror ("cluster.error.download_hash" , got_hash = got_hash , hash = hash , content = body .decode ("utf-8" , "ignore" )[:64 ])
277+ if got_hash == hash :
278+ scheduler .run_later (
279+ self .write_file ,
280+ 1 ,
281+ (
282+ file ,
283+ body
284+ )
285+ )
271286 return file
272287
273288 def get_width_storage (self , c_storage : Optional [storages .iStorage ] = None ) -> storages .iStorage :
@@ -618,6 +633,7 @@ def __init__(self):
618633 self .file_manager = FileListManager (self )
619634 self .storage_manager = StorageManager (self )
620635 self .clusters_certificate : dict [str , ClusterCertificate ] = {}
636+ self .initialized = False
621637
622638 def add_cluster (self , cluster : 'Cluster' ):
623639 self .cluster_id_tables [cluster .id ] = cluster
@@ -644,11 +660,27 @@ async def start(self):
644660 # check files
645661 await self .file_manager .sync ()
646662
663+ # init measure
664+ await init_measure ()
665+
647666 # start job
648667
668+ self .initialized = True
669+ await self .enable ()
670+
671+ async def enable (self ):
672+ if not self .initialized :
673+ return
649674 for cluster in self .clusters :
650675 await cluster .enable ()
651676
677+ async def disable (self ):
678+ await asyncio .gather (* (
679+ cluster .disable ()
680+ for cluster in self .clusters
681+ ))
682+
683+
652684 def get_cluster_by_id (self , id : Optional [str ] = None ) -> Optional ['Cluster' ]:
653685 return self .cluster_id_tables .get (id or "" , None )
654686
@@ -798,7 +830,8 @@ async def request_cert(self):
798830 key .write (result .ack ["key" ])
799831
800832 async def enable (self ):
801- cert = await clusters .get_certificates (self .id )
833+ if self .enabled :
834+ return
802835 scheduler .cancel (self .delay_enable_task )
803836 if self .want_enable :
804837 return
@@ -1065,6 +1098,9 @@ def __init__(self, hash: str, size: int, mtime: float, data: bytes, storage: Opt
10651098MEASURES_HASH : dict [int , str ] = {
10661099}
10671100MEASURE_BUFFER : bytes = b'\x00 '
1101+ DEFAULT_MEASURES = [
1102+ 10
1103+ ]
10681104
10691105routes = web .routes
10701106aweb = web .web
@@ -1081,9 +1117,10 @@ def convert_file_to_storage_file(file: File) -> SFile:
10811117def init_measure_block (size : int ):
10821118 MEASURES_HASH [size ] = hashlib .md5 (MEASURE_BUFFER * 1024 * 1024 * size ).hexdigest ()
10831119
1084- async def init_measure (maxsize : int = 50 ):
1085- for i in range (1 , maxsize , 10 ):
1086- init_measure_block (i )
1120+ async def init_measure ():
1121+ for size in DEFAULT_MEASURES :
1122+ init_measure_block (size )
1123+ await init_measure_files ()
10871124
10881125async def init_measure_file (
10891126 storage : storages .iStorage ,
@@ -1107,7 +1144,8 @@ async def init_measure_file(
11071144 logger .ttraceback ("cluster.error.init_measure_file" , path = storage .path , type = storage .type , size = units .format_bytes (size * 1024 * 1024 ), hash = hash )
11081145 return False
11091146async def init_measure_files ():
1110- results = await asyncio .gather (* [init_measure_file (storage , size , MEASURES_HASH [size ]) for storage in clusters .storage_manager .storages for size in MEASURES_HASH ])
1147+ await clusters .storage_manager .available ()
1148+ results = await asyncio .gather (* [asyncio .create_task (init_measure_file (storage , size , MEASURES_HASH [size ])) for storage in clusters .storage_manager .available_storages for size in MEASURES_HASH ])
11111149 logger .debug (results )
11121150
11131151async def init ():
@@ -1226,7 +1264,11 @@ async def _(request: aweb.Request):
12261264 except :
12271265 logger .traceback ()
12281266 return aweb .Response (status = 400 )
1229-
1267+
1268+ @utils .retry (3 , 1 )
1269+ async def get_file (hash : str ):
1270+ return await asyncio .wait_for (asyncio .create_task (clusters .storage_manager .get_file (hash )), 5 )
1271+
12301272@routes .get ("/download/{hash}" )
12311273async def _ (request : aweb .Request ):
12321274 try :
@@ -1255,7 +1297,7 @@ async def _(request: aweb.Request):
12551297 )
12561298 return aweb .Response (status = 403 )
12571299 try :
1258- file = await asyncio .wait_for ( asyncio . create_task (clusters . storage_manager . get_file (hash )), 5 )
1300+ file = await asyncio .create_task (get_file (hash ))
12591301 except :
12601302 logger .ttraceback ("cluster.error.get_file" , hash = hash )
12611303 file = await asyncio .create_task (clusters .storage_manager .get_file (hash , True ))
0 commit comments