@@ -183,6 +183,36 @@ def __init__(
183183 300
184184 )
185185 self ._loop = loop
186+
187+ def start_keepalive (self ):
188+ return
189+ scheduler .run_repeat_later (
190+ self .keepalive ,
191+ 2 ,
192+ 30
193+ )
194+
195+ async def send_keepalive (
196+ self ,
197+ client : SSEClient
198+ ):
199+ current = str (utils .ObjectId ())
200+ await self .send_message_client (
201+ client ,
202+ current ,
203+ "keepalive" ,
204+ current
205+ )
206+
207+
208+ async def keepalive (
209+ self
210+ ):
211+ for client in self ._connections .copy ():
212+ try :
213+ await self .send_keepalive (client )
214+ except :
215+ await self .close_client (client )
186216
187217 async def send (
188218 self ,
@@ -195,6 +225,17 @@ async def send(
195225 )
196226
197227 async def raw_send_client (
228+ self ,
229+ client : SSEClient ,
230+ data : str
231+ ):
232+ if len (data ) == 0 :
233+ return
234+ for _ in range (2 - data [- 2 :].count ("\n " )):
235+ data += "\n "
236+ await client .resp .write (data .encode ("utf-8" ))
237+
238+ async def send_message_client (
198239 self ,
199240 client : SSEClient ,
200241 id : Optional [str ] = None ,
@@ -209,8 +250,10 @@ async def raw_send_client(
209250 }.items ():
210251 if item [1 ] is not None :
211252 buffer .write (f"{ item [0 ]} : { item [1 ]} \n " )
212- buffer .write ("\n " )
213- await client .resp .write (buffer .getvalue ().encode ("utf-8" ))
253+ await self .raw_send_client (
254+ client ,
255+ buffer .getvalue ()
256+ )
214257
215258 async def send_client (
216259 self ,
@@ -224,7 +267,7 @@ async def send_client(
224267 await self .close_client (client )
225268 return
226269 try :
227- await self .raw_send_client (
270+ await self .send_message_client (
228271 client ,
229272 id = str (message .id ),
230273 event = "message" ,
@@ -236,9 +279,9 @@ async def send_client(
236279 )
237280 )
238281 except :
282+ logger .traceback ()
239283 await self .close_client (client )
240284
241-
242285 async def request (
243286 self ,
244287 request : web .Request ,
@@ -260,13 +303,22 @@ async def request(
260303 await resp .prepare (request )
261304 self ._connections .append (client )
262305 last_event_id : str = request .headers .get ("Last-Event-ID" , "0" * 24 )
263- last_id = utils .ObjectId (last_event_id )
306+ try :
307+ last_id = utils .ObjectId (last_event_id )
308+ except :
309+ last_id = utils .ObjectId ("0" * 24 )
310+
311+ #await self.send_keepalive(client)
312+
264313 for message in self ._cache .keys ():
265314 if message .generation_time <= last_id .generation_time :
266315 break
316+ msg = self ._cache .get (message )
317+ if msg is cache .EMPTY :
318+ continue
267319 await self .send_client (
268320 client ,
269- self . _cache . get ( message )
321+ msg
270322 )
271323
272324 try :
@@ -964,26 +1016,46 @@ def query_geo(
9641016 cn : bool = False
9651017):
9661018 resp_data : defaultdict [str , int ] = defaultdict (int )
1019+ since_hour = get_query_hour_tohour (day * 24 )
1020+ with db .SESSION as session :
1021+ q = session .query (db .ResponseTable ).filter (
1022+ db .ResponseTable .hour >= since_hour
1023+ ).order_by (db .ResponseTable .hour )
1024+ for item in q .all ():
1025+ wait_threads (
1026+ thread_decompress_data_and_query ,
1027+ [
1028+ (item .ip_tables , cn , resp_data )
1029+ ]
1030+ )
1031+
1032+ return resp_data
1033+
1034+
1035+ def wait_threads (
1036+ target ,
1037+ args : list [tuple [Any , ...]] = []
1038+ ):
9671039 threads : list [threading .Thread ] = []
968- for hour , data in query_addresses ( day * 24 ). items () :
1040+ for arg in args :
9691041 threads .append (threading .Thread (
970- target = thread_query_geo ,
971- args = ( data , resp_data , cn )
1042+ target = target ,
1043+ args = arg
9721044 ))
9731045 for thread in threads :
9741046 thread .start ()
9751047 for thread in threads :
9761048 thread .join ()
9771049 threads .clear ()
978- return resp_data
9791050
980- def thread_query_geo (
981- data : dict [ str , int ] ,
982- resp_data : defaultdict [ str , int ] ,
983- cn : bool
1051+ def thread_decompress_data_and_query (
1052+ origin : bytes ,
1053+ cn ,
1054+ resp_data : defaultdict [ str , int ]
9841055):
985- for ip , count in data .items ():
986- address = query_ip (ip )
1056+ data = db .decompress (origin )
1057+ for k , count in data .items ():
1058+ address = query_ip (k )
9871059 if cn and address .country == "CN" :
9881060 resp_data [address .province ] += count
9891061 else :
@@ -1068,6 +1140,7 @@ def record():
10681140
10691141async def init ():
10701142 global task , status_task
1143+ SSEEMIT .start_keepalive ()
10711144 task = threading .Thread (target = record )
10721145 task .start ()
10731146 status_task = asyncio .create_task (push_status ())
0 commit comments