1+ import abc
12import asyncio
23from collections import defaultdict , deque
34from dataclasses import asdict , dataclass
@@ -32,7 +33,7 @@ class File:
3233
3334 def __hash__ (self ) -> int :
3435 return hash ((self .hash , self .size , self .mtime ))
35-
36+
3637class StorageManager :
3738 def __init__ (self , clusters : 'ClusterManager' ):
3839 self .clusters = clusters
@@ -111,6 +112,51 @@ async def _check_size(self, file: File, storage: storages.iStorage):
111112 async def _check_hash (self , file : File , storage : storages .iStorage ):
112113 return await self ._check_exists (file , storage ) and utils .equals_hash (file .hash , await storage .read_file (file .hash ))
113114
115+ async def get_file (self , hash : str ):
116+ file = None
117+ if self .available ():
118+ storage = self .get_width_storage ()
119+ if isinstance (storage , storages .LocalStorage ) and await storage .exists (hash ):
120+ return LocalStorageFile (
121+ hash ,
122+ await storage .get_size (hash ),
123+ await storage .get_mtime (hash ),
124+ Path (storage .get_path (hash ))
125+ )
126+ elif isinstance (storage , storages .AlistStorage ):
127+ ...
128+ if file is None :
129+ async with aiohttp .ClientSession (
130+ config .const .base_url ,
131+ headers = {
132+ "User-Agent" : USER_AGENT ,
133+ "Authorization" : await self .clusters .clusters [0 ].get_token ()
134+ }
135+ ) as session :
136+ async with session .get (
137+ f"/openbmclapi/download/{ hash } "
138+ ) as resp :
139+ file = MemoryStorageFile (
140+ hash ,
141+ resp .content_length or - 1 ,
142+ time .time (),
143+ await resp .content .read ()
144+ )
145+ return file
146+
147+ def get_width_storage (self , c_storage : Optional [storages .iStorage ] = None ) -> storages .iStorage :
148+ current_storage = self .available_storages [0 ]
149+ if current_storage .current_width > current_storage .width :
150+ current_storage .current_width += 1
151+ return current_storage
152+ else :
153+ self .available_storages .remove (current_storage )
154+ self .available_storages .append (current_storage )
155+ if c_storage is not None :
156+ return c_storage
157+ return self .get_width_storage (c_storage = c_storage or current_storage )
158+
159+
114160@dataclass
115161class OpenBMCLAPIConfiguration :
116162 source : str
@@ -360,14 +406,21 @@ async def start(self):
360406 # check files
361407 await self .file_manager .sync ()
362408
363- # start job
364-
365- # remote certificate
366409 # start web ssl
367410 cert = await self .get_certificate ()
368411 await web .start_ssl_server (
369412 cert .cert , cert .key
370413 )
414+ public_port = config .const .public_port
415+ public_host = cert .host
416+
417+ logger .tdebug ("cluster.debug.public_host" , host = public_host , port = public_port )
418+
419+ # start job
420+
421+ for cluster in self .clusters :
422+ await cluster .enable ()
423+
371424
372425 async def get_certificate (self ):
373426 await asyncio .gather (* [cluster .request_cert () for cluster in self .clusters ])
@@ -396,6 +449,8 @@ def __init__(self, id: str, secret: str, port: int, public_port: int = -1, host:
396449 self .token_ttl : float = 0
397450 self .token_scheduler : Optional [int ] = None
398451 self .socket_io = ClusterSocketIO (self )
452+ self .want_enable : bool = False
453+ self .enabled = True
399454
400455 def __repr__ (self ):
401456 return f"Cluster(id={ self .id } , host={ self .host } , port={ self .port } , public_port={ self .public_port } , byoc={ self .byoc } , cert={ self .cert } , key={ self .key } )"
@@ -461,6 +516,29 @@ async def request_cert(self):
461516 cert .write (result .ack ["cert" ])
462517 key .write (result .ack ["key" ])
463518
519+ async def enable (self ):
520+ cert = await clusters .get_certificate ()
521+ if self .want_enable :
522+ return
523+ self .want_enable = True
524+ result = await self .socket_io .emit (
525+ "enable" , {
526+ "host" : cert .host ,
527+ "port" : config .const .public_port ,
528+ "byoc" : True ,
529+ "version" : API_VERSION ,
530+ "noFastEnable" : False ,
531+ "flavor" : {
532+ "storage" : "local" ,
533+ "runtime" : "python"
534+ }
535+ }
536+ )
537+ self .want_enable = False
538+ if result .err :
539+ self .socketio_error ("enable" , result )
540+ return
541+ self .enabled = True
464542 @property
465543 def certificate (self ):
466544 return ClusterCertificate (
@@ -469,6 +547,13 @@ def certificate(self):
469547 Path (config .const .ssl_dir ) / f"{ self .id } _key.pem"
470548 )
471549
550+ def socketio_error (self , type : str , result : 'SocketIOEmitResult' ):
551+ err = result .err
552+ if "message" in err :
553+ logger .terror ("cluster.error.socketio" , type = type , id = self .id , err = err ["message" ])
554+ else :
555+ logger .terror ("cluster.error.socketio" , type = type , id = self .id , err = err )
556+
472557class ClusterSocketIO :
473558 def __init__ (self , cluster : Cluster ) -> None :
474559 self .cluster = cluster
@@ -507,8 +592,8 @@ async def emit(self, event: str, data: Any = {}, timeout: Optional[float] = None
507592 fut = asyncio .get_event_loop ().create_future ()
508593
509594 async def callback (data : tuple [Any , Any ]):
510- fut .set_result (SocketIOEmitResult (data [0 ], data [1 ]))
511-
595+ fut .set_result (SocketIOEmitResult (data [0 ], data [1 ] if len ( data ) > 1 else None ))
596+ print ( data )
512597 await self .sio .emit (
513598 event , data , callback = callback
514599 )
@@ -535,6 +620,31 @@ class SocketIOEmitResult:
535620 err : Any
536621 ack : Any
537622
623+ class StorageFile (metaclass = abc .ABCMeta ):
624+ type : str = "abstract"
625+ def __init__ (self , hash : str , size : int , mtime : float ) -> None :
626+ self .hash = hash
627+ self .size = size
628+ self .mtime = int (mtime * 1000 )
629+
630+ class LocalStorageFile (StorageFile ):
631+ type : str = "local"
632+ def __init__ (self , hash : str , size : int , mtime : float , path : Path ) -> None :
633+ super ().__init__ (hash = hash , size = size , mtime = mtime )
634+ self .path = path
635+
636+ class URLStorageFile (StorageFile ):
637+ type : str = "url"
638+ def __init__ (self , hash : str , size : int , mtime : float , url : str ) -> None :
639+ super ().__init__ (hash = hash , size = size , mtime = mtime )
640+ self .url = url
641+
642+ class MemoryStorageFile (StorageFile ):
643+ type : str = "memory"
644+ def __init__ (self , hash : str , size : int , mtime : float , data : bytes ) -> None :
645+ super ().__init__ (hash = hash , size = size , mtime = mtime )
646+ self .data = data
647+
538648ROOT = Path (__file__ ).parent .parent
539649
540650API_VERSION = "1.11.0"
@@ -584,14 +694,73 @@ async def init():
584694 for cstorage in config_storages :
585695 type = cstorage ['type' ]
586696 if type == "local" :
587- storage = storages .LocalStorage (cstorage ['path' ])
697+ storage = storages .LocalStorage (cstorage ['path' ], cstorage [ 'width' ] )
588698 elif type == "alist" :
589- storage = storages .AlistStorage (cstorage ['path' ], cstorage ['url' ], cstorage ['username' ], cstorage ['password' ])
699+ storage = storages .AlistStorage (cstorage ['path' ], cstorage ['width' ], cstorage [ ' url' ], cstorage ['username' ], cstorage ['password' ])
590700 else :
591701 logger .terror ("cluster.error.unspported_storage" , type = type , path = cstorage ['path' ])
592702 continue
593703 clusters .storage_manager .add_storage (storage )
594704
595705 scheduler .run_later (
596706 clusters .start , 0
597- )
707+ )
708+
709+ def check_sign (hash : str , s : str , e : str ):
710+ if not config .const .check_sign :
711+ return True
712+ return any (
713+ utils .check_sign (hash , cluster .secret , s , e ) for cluster in clusters .clusters
714+ )
715+
716+
717+ routes = web .routes
718+ aweb = web .web
719+
720+ @routes .get ("/measure/{size}" )
721+ async def _ (request : aweb .Request ):
722+ try :
723+ size = int (request .match_info ["size" ])
724+ query = request .query
725+ s = query .get ("s" , "" )
726+ e = query .get ("e" , "" )
727+ if not check_sign (request .match_info ["size" ], s , e ):
728+ return aweb .Response (status = 403 )
729+ response = aweb .StreamResponse (
730+ status = 200 ,
731+ reason = "OK" ,
732+ headers = {
733+ "Content-Length" : str (size * 1024 * 1024 ),
734+ "Content-Type" : "application/octet-stream" ,
735+ },
736+ )
737+ await response .prepare (request )
738+ for _ in range (size ):
739+ await response .write (b'\x00 ' * 1024 * 1024 )
740+ await response .write_eof ()
741+ return response
742+ except :
743+ return aweb .Response (status = 400 )
744+
745+ @routes .get ("/download/{hash}" )
746+ async def _ (request : aweb .Request ):
747+ try :
748+ hash = request .match_info ["hash" ]
749+ query = request .query
750+ s = query .get ("s" , "" )
751+ e = query .get ("e" , "" )
752+ if not check_sign (request .match_info ["hash" ], s , e ):
753+ return aweb .Response (status = 403 )
754+ file = await clusters .storage_manager .get_file (hash )
755+ if isinstance (file , LocalStorageFile ):
756+ return aweb .FileResponse (
757+ file .path
758+ )
759+ elif isinstance (file , MemoryStorageFile ):
760+ return aweb .Response (
761+ body = file .data
762+ )
763+ else :
764+ aweb .Response (status = 403 )
765+ except :
766+ return aweb .Response (status = 400 )
0 commit comments