Skip to content

Commit

Permalink
perf(adapter, event): 使用链队列代替列表作为事件队列,优化 get() 方法的性能
Browse files Browse the repository at this point in the history
fix(adapter): 修复 get() 方法超时后,适配器接收到下一个事件不会被传播处理的 bug
  • Loading branch information
st1020 committed Aug 12, 2021
1 parent 35534ec commit fecc7b1
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 11 deletions.
89 changes: 79 additions & 10 deletions alicebot/adapter/__init__.py
Expand Up @@ -8,7 +8,7 @@
import asyncio
from functools import wraps
from abc import ABC, abstractmethod
from typing import Awaitable, List, Callable, TypeVar, Union, Optional, TYPE_CHECKING
from typing import Awaitable, Callable, TypeVar, Union, Optional, TYPE_CHECKING

from alicebot.log import logger
from alicebot.exception import AdapterTimeout
Expand All @@ -20,6 +20,77 @@
T_Adapter = TypeVar('T_Adapter', bound='AbstractAdapter')


class EventQueue:
"""
事件队列。
使用限定长度的链队列。
"""
def __init__(self, max_len=None):
self._head: ['T_Event'] = None
self._rear: ['T_Event'] = None
self._len: int = 0

self.max_len = max_len

def __len__(self):
return self._len

def push(self, event: 'T_Event') -> None:
if self._head is None:
self._head = event
self._rear = event
self._len = 1
return
self._rear.next_event = event
self._rear = event
self._len += 1
while self._len > self.max_len:
self.pop()

def pop(self):
if self._head is None:
raise ValueError('EventQueue is empty.')
temp = self._head
self._head = self._head.next_event
temp.next_event = None
if self._head is None:
self._rear = None
self._len -= 1
return temp

def top(self) -> ['T_Event']:
return self._head

def append(self, obj: 'T_Event'):
self.push(obj)

def clear(self):
self._head = None
self._rear = None
self._len = None

def index(self, obj: 'T_Event', start=None, end=None):
for index, item in enumerate(self):
if item == obj and (start is None or start <= index) and (end is None or index <= end):
return index
raise ValueError(f'{obj} is not in EventQueue')

def __contains__(self, item):
try:
self.index(item)
except ValueError:
return False
else:
return True

def __iter__(self):
point = self._head
while point.next_event is not None:
yield point
point = point.next_event
yield point


class AbstractAdapter(ABC):
"""
协议适配器基类。
Expand All @@ -32,7 +103,7 @@ class AbstractAdapter(ABC):
"""
当前的机器人对象。
"""
event_queue: List['T_Event'] = []
event_queue: EventQueue
"""
事件队列,用于 ``get()`` 方法。
"""
Expand All @@ -46,6 +117,7 @@ class AbstractAdapter(ABC):
"""

def __init__(self, bot: 'Bot'):
self.event_queue = EventQueue(max_len=self.max_event_queue_len)
self.bot: 'Bot' = bot

async def safe_run(self):
Expand Down Expand Up @@ -85,9 +157,7 @@ def handle_event(self, event: 'T_Event'):
:param event: 待处理的事件。
"""
logger.info(f'Adapter {self.__class__.__name__} received: {event!r}')
self.event_queue.append(event)
if len(self.event_queue) > self.max_event_queue_len:
self.event_queue.pop(0)
self.event_queue.push(event)
if self.wait_for_get:
self.wait_for_get = False
else:
Expand Down Expand Up @@ -132,12 +202,10 @@ def while_condition():

try_times = 0
start_time = time.time()
event = current_event
while while_condition():
try:
index = self.event_queue.index(current_event)
except ValueError:
raise ValueError('can not find current_event')
for event in self.event_queue[index + 1:]:
while event.next_event is not None:
event = event.next_event
if not event.handled:
if await func(event):
event.handled = True
Expand All @@ -149,5 +217,6 @@ def while_condition():
await asyncio.sleep(0)
try_times += 1

self.wait_for_get = False
if not self.bot.should_exit:
raise AdapterTimeout
7 changes: 6 additions & 1 deletion alicebot/event/__init__.py
Expand Up @@ -29,7 +29,12 @@ class Event(ABC, BaseModel):
handled: bool = False
"""
表示事件是否被处理过了,用于适配器处理。
注意:请勿手动更改此属性的值。
警告:请勿手动更改此属性的值。
"""
next_event: Any = None
"""
下一个事件,用于适配器处理。
警告:请勿手动更改此属性的值。
"""

def __str__(self) -> str:
Expand Down

0 comments on commit fecc7b1

Please sign in to comment.