1515import humanize
1616import io
1717
18- API_VERSION = Config .get ('advanced.api_version' )
19- VERSION = toml .loads (open ('pyproject.toml' , 'r' ).read ())['tool' ]['poetry' ]['version' ]
18+ API_VERSION = Config .get ("advanced.api_version" )
19+ VERSION = toml .loads (open ("pyproject.toml" , "r" ).read ())["tool" ]["poetry" ]["version" ]
20+
2021
2122class Token :
2223 def __init__ (self ) -> None :
23- self .user_agent = f'openbmclapi-cluster/{ API_VERSION } python-openbmclapi/{ VERSION } '
24- self .base_url = Config .get ('cluster.base_url' )
24+ self .user_agent = (
25+ f"openbmclapi-cluster/{ API_VERSION } python-openbmclapi/{ VERSION } "
26+ )
27+ self .base_url = Config .get ("cluster.base_url" )
2528 self .token = None
26- self .id = Config .get (' cluster.id' )
27- self .secret = Config .get (' cluster.secret' )
28- self .ttl = 0 # hours
29+ self .id = Config .get (" cluster.id" )
30+ self .secret = Config .get (" cluster.secret" )
31+ self .ttl = 0 # hours
2932 self .scheduler = None
3033 if not self .id or not self .secret :
3134 raise ClusterIdNotSetError if not self .id else ClusterSecretNotSetError
32-
35+
3336 async def fetchToken (self ):
34- logger .tinfo ('token.info.fetching' )
35- async with aiohttp .ClientSession (self .base_url , headers = {"User-Agent" : self .user_agent }) as session :
36- async with session .get ('/openbmclapi-agent/challenge' , params = {"clusterId" : self .id }) as response :
37+ logger .tinfo ("token.info.fetching" )
38+ async with aiohttp .ClientSession (
39+ self .base_url , headers = {"User-Agent" : self .user_agent }
40+ ) as session :
41+ async with session .get (
42+ "/openbmclapi-agent/challenge" , params = {"clusterId" : self .id }
43+ ) as response :
3744 response .raise_for_status ()
38- challenge = (await response .json ())['challenge' ]
39- signature = hmac .new (self .secret .encode (), challenge .encode (), digestmod = hashlib .sha256 )
40- async with session .post ('/openbmclapi-agent/token' , data = {"clusterId" : self .id , "challenge" : challenge , "signature" : signature .hexdigest ()}) as response :
45+ challenge = (await response .json ())["challenge" ]
46+ signature = hmac .new (
47+ self .secret .encode (), challenge .encode (), digestmod = hashlib .sha256
48+ )
49+ async with session .post (
50+ "/openbmclapi-agent/token" ,
51+ data = {
52+ "clusterId" : self .id ,
53+ "challenge" : challenge ,
54+ "signature" : signature .hexdigest (),
55+ },
56+ ) as response :
4157 response .raise_for_status ()
4258 res = await response .json ()
43- self .token = res [' token' ]
44- self .ttl = res [' ttl' ] / 3600000
45- logger .tsuccess (' token.success.fetched' , ttl = int (self .ttl ))
59+ self .token = res [" token" ]
60+ self .ttl = res [" ttl" ] / 3600000
61+ logger .tsuccess (" token.success.fetched" , ttl = int (self .ttl ))
4662 if self .scheduler == None :
47- self .scheduler = Scheduler .add_job (self .fetchToken ,IntervalTrigger (hours = self .ttl ))
63+ self .scheduler = Scheduler .add_job (
64+ self .fetchToken , IntervalTrigger (hours = self .ttl )
65+ )
66+
4867
4968class Cluster :
5069 def __init__ (self ) -> None :
51- self .user_agent = f'openbmclapi-cluster/{ API_VERSION } python-openbmclapi/{ VERSION } '
52- self .base_url = Config .get ('cluster.base_url' )
70+ self .user_agent = (
71+ f"openbmclapi-cluster/{ API_VERSION } python-openbmclapi/{ VERSION } "
72+ )
73+ self .base_url = Config .get ("cluster.base_url" )
5374 self .last_modified = 1000
54- self .id = Config .get (' cluster.id' )
55- self .secret = Config .get (' cluster.secret' )
75+ self .id = Config .get (" cluster.id" )
76+ self .secret = Config .get (" cluster.secret" )
5677 self .token = Token ()
5778 self .filelist = FileList (files = [])
5879 self .storages = getStorages ()
@@ -61,118 +82,157 @@ def __init__(self) -> None:
6182 self .failed_filelist = FileList (files = [])
6283
6384 async def fetchFileList (self ) -> None :
64- logger .tinfo ('cluster.info.filelist.fetching' )
65- async with aiohttp .ClientSession (self .base_url , headers = {
66- 'User-Agent' : self .user_agent ,
67- 'Authorization' : f'Bearer { self .token .token } ' ,
68- }) as session :
69- async with session .get ('/openbmclapi/files' , params = {"lastModified" : self .last_modified }) as response :
85+ logger .tinfo ("cluster.info.filelist.fetching" )
86+ async with aiohttp .ClientSession (
87+ self .base_url ,
88+ headers = {
89+ "User-Agent" : self .user_agent ,
90+ "Authorization" : f"Bearer { self .token .token } " ,
91+ },
92+ ) as session :
93+ async with session .get (
94+ "/openbmclapi/files" , params = {"lastModified" : self .last_modified }
95+ ) as response :
7096 response .raise_for_status ()
71- logger .tsuccess ('cluster.success.filelist.fetched' )
72- decompressor = zstd .ZstdDecompressor ().stream_reader (io .BytesIO (await response .read ()))
97+ logger .tsuccess ("cluster.success.filelist.fetched" )
98+ decompressor = zstd .ZstdDecompressor ().stream_reader (
99+ io .BytesIO (await response .read ())
100+ )
73101 decompressed_data = io .BytesIO (decompressor .read ())
74102 for _ in range (self .read_long (decompressed_data )):
75- self .filelist .files .append (FileInfo (
76- self .read_string (decompressed_data ),
77- self .read_string (decompressed_data ),
78- self .read_long (decompressed_data ),
79- self .read_long (decompressed_data )
80- ))
103+ self .filelist .files .append (
104+ FileInfo (
105+ self .read_string (decompressed_data ),
106+ self .read_string (decompressed_data ),
107+ self .read_long (decompressed_data ),
108+ self .read_long (decompressed_data ),
109+ )
110+ )
81111 size = sum (file .size for file in self .filelist .files )
82- logger .tsuccess ('cluster.success.filelist.parsed' , count = humanize .intcomma (len (self .filelist .files )), size = humanize .naturalsize (size , binary = True ))
112+ logger .tsuccess (
113+ "cluster.success.filelist.parsed" ,
114+ count = humanize .intcomma (len (self .filelist .files )),
115+ size = humanize .naturalsize (size , binary = True ),
116+ )
83117
84118 async def getConfiguration (self ) -> None :
85- async with aiohttp .ClientSession (self .base_url , headers = {
86- 'User-Agent' : self .user_agent ,
87- 'Authorization' : f'Bearer { self .token .token } ' ,
88- }) as session :
89- async with session .get ('/openbmclapi/configuration' ) as response :
90- self .configuration = AgentConfiguration (** (await response .json ())['sync' ])
119+ async with aiohttp .ClientSession (
120+ self .base_url ,
121+ headers = {
122+ "User-Agent" : self .user_agent ,
123+ "Authorization" : f"Bearer { self .token .token } " ,
124+ },
125+ ) as session :
126+ async with session .get ("/openbmclapi/configuration" ) as response :
127+ self .configuration = AgentConfiguration (
128+ ** (await response .json ())["sync" ]
129+ )
91130 self .semaphore = asyncio .Semaphore (self .configuration .concurrency )
92- logger .tdebug (' configuration.debug.get' , sync = self .configuration )
131+ logger .tdebug (" configuration.debug.get" , sync = self .configuration )
93132
94133 async def getMissingFiles (self ) -> FileList :
95134 with tqdm (
96- desc = locale .t (' cluster.tqdm.desc.get_missing' ),
135+ desc = locale .t (" cluster.tqdm.desc.get_missing" ),
97136 total = len (self .filelist .files ) * len (self .storages ),
98- unit = locale .t (' cluster.tqdm.unit.files' ),
99- unit_scale = True
137+ unit = locale .t (" cluster.tqdm.unit.files" ),
138+ unit_scale = True ,
100139 ) as pbar :
101140 try :
102141 files = set ()
103142 missing_files = [
104143 file
105144 for storage in self .storages
106- for file in (await storage .getMissingFiles (self .filelist , pbar )).files
145+ for file in (
146+ await storage .getMissingFiles (self .filelist , pbar )
147+ ).files
107148 if file .hash not in files and not files .add (file .hash )
108149 ]
109150 missing_filelist = FileList (files = missing_files )
110151 missing_files_count = len (missing_filelist .files )
111152 missing_files_size = sum (file .size for file in missing_filelist .files )
112- logger .tsuccess ('storage.success.get_missing' , count = humanize .intcomma (missing_files_count ), size = humanize .naturalsize (missing_files_size , binary = True ))
153+ logger .tsuccess (
154+ "storage.success.get_missing" ,
155+ count = humanize .intcomma (missing_files_count ),
156+ size = humanize .naturalsize (missing_files_size , binary = True ),
157+ )
113158 return missing_filelist
114159 except Exception as e :
115- logger .terror (' storage.error.get_missing' , e = e )
160+ logger .terror (" storage.error.get_missing" , e = e )
116161 return None
117162
118- async def syncFiles (self , missing_filelist : FileList , retry : int , delay : int ) -> None :
119- if missing_filelist .files == []:
120- logger .tinfo ('cluster.info.sync_files.skipped' )
163+ async def syncFiles (
164+ self , missing_filelist : FileList , retry : int , delay : int
165+ ) -> None :
166+ if not missing_filelist .files :
167+ logger .tinfo ("cluster.info.sync_files.skipped" )
121168 return
169+
170+ total_size = sum (file .size for file in missing_filelist .files )
171+
122172 with tqdm (
123- desc = locale .t (' cluster.tqdm.desc.sync_files' ),
124- total = sum ( file . size for file in missing_filelist . files ) ,
125- unit = 'iB' ,
173+ desc = locale .t (" cluster.tqdm.desc.sync_files" ),
174+ total = total_size ,
175+ unit = "iB" ,
126176 unit_scale = True ,
127- unit_divisor = 1024
177+ unit_divisor = 1024 ,
128178 ) as pbar :
129- delay = Config .get ('advanced.delay' )
130- retry = Config .get ('advanced.retry' )
131- async with aiohttp .ClientSession (self .base_url , headers = {
132- 'User-Agent' : self .user_agent ,
133- 'Authorization' : f'Bearer { self .token .token } ' ,
134- }) as session :
179+ async with aiohttp .ClientSession (
180+ self .base_url ,
181+ headers = {
182+ "User-Agent" : self .user_agent ,
183+ "Authorization" : f"Bearer { self .token .token } " ,
184+ },
185+ ) as session :
135186 self .failed_filelist = FileList (files = [])
136- tasks = []
137- for file in missing_filelist .files :
138- tasks .append (asyncio .create_task (self .downloadFile (file , session , pbar )))
187+ tasks = [
188+ asyncio .create_task (self .downloadFile (file , session , pbar ))
189+ for file in missing_filelist .files
190+ ]
139191 await asyncio .gather (* tasks )
140- if self .failed_filelist .files == None :
141- logger .tsuccess ('cluster.success.sync_files.downloaded' )
142- return
143- else :
144- if retry == 1 :
145- logger .terror ('cluster.error.sync_files.failed' )
146- return
147- else :
148- logger .terror ('cluster.error.sync_files.retry' , retry = delay )
149- await asyncio .sleep (delay )
150- await self .syncFiles (self .failed_filelist , retry - 1 , delay )
151192
193+ if not self .failed_filelist .files :
194+ logger .tsuccess ("cluster.success.sync_files.downloaded" )
195+ elif retry > 1 :
196+ logger .terror ("cluster.error.sync_files.retry" , retry = delay )
197+ await asyncio .sleep (delay )
198+ await self .syncFiles (self .failed_filelist , retry - 1 , delay )
199+ else :
200+ logger .terror ("cluster.error.sync_files.failed" )
152201
153- async def downloadFile (self , file : FileInfo , session : aiohttp .ClientSession , pbar : tqdm ) -> None :
202+ async def downloadFile (
203+ self , file : FileInfo , session : aiohttp .ClientSession , pbar : tqdm
204+ ) -> None :
154205 async with self .semaphore :
155- delay = Config .get ('advanced.delay' )
156- retry = Config .get ('advanced.retry' )
206+ delay = Config .get ("advanced.delay" )
207+ retry = Config .get ("advanced.retry" )
208+
157209 for _ in range (retry ):
158210 try :
159211 response = await session .get (file .path )
160212 content = await response .read ()
161- results = [await storage .writeFile (file , io .BytesIO (content ), delay , retry ) for storage in self .storages ]
213+ results = await asyncio .gather (
214+ * (
215+ storage .writeFile (file , io .BytesIO (content ), delay , retry )
216+ for storage in self .storages
217+ )
218+ )
162219 if all (results ):
163220 pbar .update (len (content ))
164221 return
165222 except Exception as e :
166- if _ == retry - 1 :
167- logger .terror ('cluster.error.download_file.failed' )
168- else :
169- logger .terror ('cluster.error.download_file.retry' , file = file .hash , e = e , retry = delay )
223+ logger .terror (
224+ "cluster.error.download_file.retry" ,
225+ file = file .hash ,
226+ e = e ,
227+ retry = delay ,
228+ )
170229 await asyncio .sleep (delay )
171- self .failed_filelist .files .append (file )
172-
230+ logger .terror ("cluster.error.download_file.failed" )
231+ self .failed_filelist .files .append (file )
232+
173233 async def init (self ) -> None :
174234 await asyncio .gather (* (storage .init () for storage in self .storages ))
175-
235+
176236 async def checkStorages (self ) -> bool :
177237 results = await asyncio .gather (* (storage .check () for storage in self .storages ))
178238 return all (results )
0 commit comments