11import inspect
2+ from io import BytesIO
23import tempfile
34import time
45from typing import Any
78import anyio .to_thread
89import aioboto3
910import urllib .parse as urlparse
11+
12+ from core import logger
1013from . import abc
1114import cachetools
1215
@@ -60,9 +63,8 @@ def __init__(
6063 self .custom_s3_host = kwargs .get ("custom_s3_host" , "" )
6164 self .public_endpoint = kwargs .get ("public_endpoint" , "" )
6265 self .session = aioboto3 .Session ()
63- self .list_lock = anyio .Lock ()
64- self .cache_list_bucket : dict [str , abc .FileInfo ] = {}
65- self .last_cache : float = 0
66+ self ._cache : cachetools .TTLCache [str , abc .ResponseFile ] = cachetools .TTLCache (maxsize = 10000 , ttl = 60 )
67+ self ._cache_files : cachetools .TTLCache [str , abc .FileInfo ] = cachetools .TTLCache (maxsize = 10000 , ttl = 60 )
6668 self ._config = {
6769 "endpoint_url" : self .endpoint ,
6870 "aws_access_key_id" : self .access_key ,
@@ -80,11 +82,6 @@ async def setup(
8082
8183 self .task_group .start_soon (self ._check )
8284
83- async def list_bucket (
84- self ,
85- ):
86- ...
87-
8885 async def list_files (
8986 self ,
9087 path : str
@@ -148,31 +145,61 @@ async def upload(
148145 obj = await bucket .Object (str (self .path / path ))
149146 await obj .upload_fileobj (tmp_file )
150147 return True
151-
148+
152149
153150 async def _check (
154151 self ,
155152 ):
156153 while 1 :
157154 try :
158- await self .list_bucket ()
155+ async with self .session .resource (
156+ "s3" ,
157+ endpoint_url = self .endpoint ,
158+ aws_access_key_id = self .access_key ,
159+ aws_secret_access_key = self .secret_key ,
160+ region_name = self .region
161+ ) as resource :
162+ bucket = await resource .Bucket (self .bucket )
163+ obj = await bucket .Object (str (self .path / ".py_check" ))
164+ await obj .upload_fileobj (BytesIO (str (time .perf_counter_ns ()).encode ()))
165+ await obj .delete ()
159166 self .online = True
160167 except :
161168 self .online = False
169+ logger .traceback ()
162170 finally :
163171 self .emit_status ()
164- await anyio .sleep (300 )
172+ await anyio .sleep (60 )
165173
166174 async def get_response_file (self , hash : str ) -> abc .ResponseFile :
167175 cpath = str (self .path / "download" / hash [:2 ] / hash )
168- if cpath not in self .cache_list_bucket :
169- return abc .ResponseFile (
170- 0
171- )
176+ fileinfo = self ._cache_files .get (hash )
177+
178+ if fileinfo is None :
179+ async with self .session .resource (
180+ "s3" ,
181+ endpoint_url = self .endpoint ,
182+ aws_access_key_id = self .access_key ,
183+ aws_secret_access_key = self .secret_key ,
184+ region_name = self .region
185+ ) as resource :
186+ bucket = await resource .Bucket (self .bucket )
187+ obj = await bucket .Object (cpath )
188+ info = await obj .get ()
189+ fileinfo = abc .FileInfo (
190+ name = hash ,
191+ size = info ["ContentLength" ],
192+ path = cpath
193+ )
194+ self ._cache_files [hash ] = fileinfo
195+
196+ if fileinfo is None :
197+ return abc .ResponseFileNotFound ()
198+
172199 if self .custom_s3_host :
173200 return abc .ResponseFileRemote (
174201 f"{ self .custom_s3_host } { cpath } " ,
175- self . cache_list_bucket [ cpath ] .size
202+ fileinfo .size
176203 )
177204 if self .public_endpoint :
178205 async with self .session .client (
@@ -204,7 +231,7 @@ async def get_response_file(self, hash: str) -> abc.ResponseFile:
204231 )
205232 return abc .ResponseFileRemote (
206233 url ,
207- self . cache_list_bucket [ cpath ] .size
234+ fileinfo .size
208235 )
209236 async with self .session .resource (
210237 "s3" ,
0 commit comments