22import asyncio
33from collections import defaultdict , deque
44from dataclasses import asdict , dataclass
5+ import datetime
56import hashlib
67import hmac
78import io
@@ -36,6 +37,17 @@ class File:
3637 def __hash__ (self ) -> int :
3738 return hash ((self .hash , self .size , self .mtime ))
3839
40+ @dataclass
41+ class FailedFile :
42+ file : File
43+ failed_times : int
44+ first_time : datetime .datetime
45+ last_failed_time : float
46+
47+ def __hash__ (self ) -> int :
48+ return hash (self .file )
49+
50+
3951class StorageManager :
4052 def __init__ (self , clusters : 'ClusterManager' ):
4153 self .clusters = clusters
@@ -278,7 +290,7 @@ def __init__(self, clusters: 'ClusterManager'):
278290 self .cluster_last_modified : defaultdict ['Cluster' , int ] = defaultdict (lambda : 0 )
279291 self .sync_sem : utils .SemaphoreLock = utils .SemaphoreLock (256 )
280292 self .download_statistics = DownloadStatistics ()
281- self .failed_hashs : asyncio .Queue [File ] = asyncio .Queue ()
293+ self .failed_hashs : asyncio .Queue [FailedFile ] = asyncio .Queue ()
282294 self .failed_hash_urls : defaultdict [str , FileDownloadInfo ] = defaultdict (lambda : FileDownloadInfo (set ()))
283295 self .task = None
284296
@@ -383,10 +395,18 @@ async def download(self, filelist: set[File]):
383395 await session .close ()
384396
385397 async def _download (self , session : aiohttp .ClientSession , file_queues : asyncio .Queue [File ], pbar : DownloadStatistics ):
386- while not file_queues .empty ():
398+ while not file_queues .empty () or not self . failed_hashs . empty () :
387399 recved = 0
388400 try :
389- file = await file_queues .get ()
401+ failed_file = None
402+ if file_queues .empty ():
403+ failed_file = await self .failed_hashs .get ()
404+ file = failed_file .file
405+ t = max (0 , min (failed_file .failed_times * 30 , 600 ) * 1e9 - (time .monotonic_ns () - failed_file .last_failed_time ))
406+ logger .tdebug ("cluster.debug.retry_download" , start_date = failed_file .first_time , file_path = file .path , file_hash = file .hash , file_size = units .format_bytes (file .size ), time = units .format_count_time (t ), count = failed_file .failed_times )
407+ await asyncio .sleep (t )
408+ else :
409+ file = await file_queues .get ()
390410 content = io .BytesIO ()
391411 async with self .sync_sem :
392412 async with session .get (
@@ -409,7 +429,10 @@ async def _download(self, session: aiohttp.ClientSession, file_queues: asyncio.Q
409429 break
410430 except Exception as e :
411431 pbar .update (- recved )
412- await file_queues .put (file )
432+ if failed_file is None :
433+ failed_file = FailedFile (file , 0 , datetime .datetime .now (), time .monotonic_ns ())
434+ failed_file .failed_times += 1
435+ await self .failed_hashs .put (failed_file )
413436 pbar .update_failed ()
414437 r = None
415438 if "resp" in locals ():
@@ -933,7 +956,7 @@ def __init__(self, hash: str, size: int, mtime: float, data: bytes) -> None:
933956ROOT = Path (__file__ ).parent .parent
934957
935958API_VERSION = "1.12.1"
936- USER_AGENT = f"openbmclapi/{ API_VERSION } python-openbmclapi/3.0 "
959+ USER_AGENT = f"openbmclapi/{ API_VERSION } python-openbmclapi/{ config . VERSION } "
937960CHECK_FILE_CONTENT = "Python OpenBMCLAPI"
938961CHECK_FILE_MD5 = hashlib .md5 (CHECK_FILE_CONTENT .encode ("utf-8" )).hexdigest ()
939962CHECK_FILE = File (
@@ -957,7 +980,7 @@ def convert_file_to_storage_file(file: File) -> SFile:
957980 )
958981
959982async def init ():
960- logger .tinfo ("cluster.info.init" , openbmclapi_version = API_VERSION , version = core .VERSION )
983+ logger .tinfo ("cluster.info.init" , openbmclapi_version = API_VERSION , version = config .VERSION )
961984 # read clusters from config
962985 config_clusters = config .Config .get ("clusters" )
963986 for ccluster in config_clusters :
0 commit comments