@@ -71,7 +71,9 @@ async def write_file(self, file: File, content: bytes):
7171 return all (await asyncio .gather (* (asyncio .create_task (storage .write_file (convert_file_to_storage_file (file ), content , file .mtime )) for storage in self .available_storages )))
7272
7373 async def get_missing_files (self , files : set [File ]) -> set [File | Any ]:
74- function = None
74+ async def _ (file : File , storage : storages .iStorage ):
75+ return True
76+ function = _
7577 if function is None :
7678 logger .twarning ("cluster.warning.no_check_function" )
7779 function = self ._check_exists
@@ -81,21 +83,24 @@ async def get_missing_files(self, files: set[File]) -> set[File | Any]:
8183 unit = "file" ,
8284 unit_scale = True ,
8385 ) as pbar :
84- missing_files = set ().union (await asyncio .gather (* (self ._get_missing_file (function , file , pbar ) for file in files )))
85- if None in missing_files :
86- missing_files .remove (None )
86+ missing_files = set ()
87+ waiting_files : asyncio .Queue [File ] = asyncio .Queue ()
88+
89+ for file in files :
90+ waiting_files .put_nowait (file )
91+
92+ await asyncio .gather (* (self ._get_missing_file_storage (function , missing_files , waiting_files , storage , pbar ) for storage in self .available_storages ))
8793 return missing_files or set ()
88-
89-
90- async def _get_missing_file (self , function : Callable [..., Coroutine [Any , Any , bool ]], file : File , pbar : tqdm ):
91- if all (await asyncio .gather (* (self ._get_missing_file_storage (function , file , storage , pbar ) for storage in self .available_storages ))):
92- return None
93- return file
9494
95- async def _get_missing_file_storage (self , function : Callable [..., Coroutine [Any , Any , bool ]], file : File , storage : storages .iStorage , pbar : tqdm ):
96- result = await function (file , storage )
97- pbar .update (1 )
98- return result
95+ async def _get_missing_file_storage (self , function : Callable [..., Coroutine [Any , Any , bool ]], missing_files : set [File ], files : asyncio .Queue [File ], storage : storages .iStorage , pbar : tqdm ):
96+ while not files .empty ():
97+ file = await files .get ()
98+ if await function (file , storage ):
99+ pbar .update (1 )
100+ else :
101+ missing_files .add (file )
102+ files .task_done ()
103+ await asyncio .sleep (0 )
99104
100105 async def _check_exists (self , file : File , storage : storages .iStorage ):
101106 return await storage .exists (file .hash )
@@ -161,7 +166,7 @@ class FileListManager:
161166 def __init__ (self , clusters : 'ClusterManager' ):
162167 self .clusters = clusters
163168 self .cluster_last_modified : defaultdict ['Cluster' , int ] = defaultdict (lambda : 0 )
164- self .sync_sem : utils .SemaphoreLock = utils .SemaphoreLock (10 )
169+ self .sync_sem : utils .SemaphoreLock = utils .SemaphoreLock (256 )
165170 self .download_statistics = DownloadStatistics ()
166171
167172 async def _get_filelist (self , cluster : 'Cluster' ):
@@ -205,6 +210,10 @@ async def sync(self):
205210
206211 missing = await self .clusters .storage_manager .get_missing_files (result )
207212
213+ if not missing :
214+ logger .tsuccess ("cluster.success.no_missing_files" )
215+ return
216+
208217 await self .clusters .storage_manager .available ()
209218 configurations : defaultdict [str , deque [OpenBMCLAPIConfiguration ]] = defaultdict (deque )
210219 for configuration in await asyncio .gather (* (asyncio .create_task (self ._get_configuration (cluster )) for cluster in self .clusters .clusters )):
@@ -213,7 +222,7 @@ async def sync(self):
213222 # get better configuration
214223 configuration = max (configurations .items (), key = lambda x : x [1 ][0 ].concurrency )[1 ][0 ]
215224 logger .tinfo ("cluster.info.sync_configuration" , source = configuration .source , concurrency = configuration .concurrency )
216- self .sync_sem .set_value (configuration .concurrency )
225+ # self.sync_sem.set_value(configuration.concurrency)
217226
218227 await self .download (missing )
219228
@@ -315,7 +324,7 @@ def report(self, file: File, error: Exception, resp: Optional[aiohttp.ClientResp
315324 responses .append (f"{ r .status } | { r .url } " )
316325 responses .append (f"{ resp .status } | { resp .url } " )
317326 host = resp .host
318- hash = msg [0 ] if len (msg ) > 0 else None
327+ hash = msg [0 ] if len (msg ) > 0 and type == "file" else None
319328 logger .debug (error )
320329 logger .terror (f"clusters.error.downloading" , type = type , file_hash = file .hash , file_size = units .format_bytes (file .size ), host = host , file_path = file .path , hash = hash , responses = "\n " .join (("" , * responses )))
321330
@@ -351,6 +360,8 @@ async def start(self):
351360 # check files
352361 await self .file_manager .sync ()
353362
363+ # start job
364+
354365
355366
356367class Cluster :
@@ -455,7 +466,10 @@ async def init():
455466 continue
456467 logger .tsuccess ("cluster.success.load_cluster" , id = cluster .id , host = cluster .host , port = cluster .port )
457468 clusters .add_cluster (cluster )
458-
469+ if len (clusters .clusters ) == 0 :
470+ logger .terror ("cluster.error.no_cluster" )
471+ utils .pause ()
472+ return
459473 config_storages = config_clusters = config .Config .get ("storages" )
460474 for cstorage in config_storages :
461475 type = cstorage ['type' ]
0 commit comments