diff --git a/Lib/common.py b/Lib/common.py index a67cbca..215f0a9 100644 --- a/Lib/common.py +++ b/Lib/common.py @@ -6,7 +6,6 @@ import sys import threading import time -import traceback import uuid from collections import OrderedDict from io import BytesIO @@ -222,14 +221,32 @@ def save_exc_dump(description: str = None, path: str = None): description: 保存的dump描述,为空则默认 path: 保存的路径,为空则自动根据错误生成 """ + # 扫描是否存在非当前日期且为归档的exc_dump + exc_dump_files = [ + file for file in os.listdir(DUMPS_PATH) if file.startswith("coredumpy_") and file.endswith(".dump") + ] + + today_date = time.strftime("%Y%m%d") + date_flags = [] + + for file in exc_dump_files: + file_date = file.split("coredumpy_", 1)[1].split("_", 1)[0][:len("YYYYMMDD")] + if file_date != today_date: + os.makedirs(os.path.join(DUMPS_PATH, f"coredumpy_archive_{file_date}"), exist_ok=True) + os.rename(os.path.join(DUMPS_PATH, file), os.path.join(DUMPS_PATH, f"coredumpy_archive_{file_date}", file)) + if file_date not in date_flags: + logger.info(f"已自动归档 {file_date} 的异常堆栈到 coredumpy_archive_{file_date}") + date_flags.append(file_date) + + # 保存dump文件 try: import coredumpy except ImportError: logger.warning("coredumpy未安装,无法保存异常堆栈") - return + return None try: - exc_type, exc_value, exc_traceback = sys.exc_info() + _, _, exc_traceback = sys.exc_info() if not exc_traceback: raise Exception("No traceback found") @@ -245,12 +262,12 @@ def save_exc_dump(description: str = None, path: str = None): if i > 0: path_ = os.path.join(DUMPS_PATH, f"coredumpy_" - f"{time.strftime('%Y%m%d%H%M%S')}_" + f"{time.strftime('%Y%m%d%-H%M%S')}_" f"{frame.f_code.co_name}_{i}.dump") else: path_ = os.path.join(DUMPS_PATH, f"coredumpy_" - f"{time.strftime('%Y%m%d%H%M%S')}_" + f"{time.strftime('%Y%m%d-%H%M%S')}_" f"{frame.f_code.co_name}.dump") if not os.path.exists(path_): break @@ -270,8 +287,7 @@ def save_exc_dump(description: str = None, path: str = None): coredumpy.dump(**kwargs) except Exception as e: - logger.error(f"保存异常堆栈时发生错误: {repr(e)}\n" - f"{traceback.format_exc()}") + logger.error(f"保存异常堆栈时发生错误: {repr(e)}", exc_info=True) return None return kwargs["path"] diff --git a/Lib/core/EventManager.py b/Lib/core/EventManager.py index 871bf77..ab3a85e 100644 --- a/Lib/core/EventManager.py +++ b/Lib/core/EventManager.py @@ -1,16 +1,15 @@ """ 事件管理器,用于管理事件与事件监听器 """ -import traceback -from typing import Any, TypeVar - +import inspect from collections.abc import Callable from dataclasses import dataclass, field +from typing import Any, TypeVar from Lib.core.ThreadPool import async_task from Lib.core import ConfigManager from Lib.utils import Logger -import inspect +from Lib.common import save_exc_dump logger = Logger.get_logger() @@ -26,6 +25,7 @@ class Hook(_Event): """ 钩子事件,用于在事件处理过程中跳过某些监听器 """ + def __init__(self, event, listener): self.event = event self.listener = listener @@ -36,17 +36,21 @@ def call(self): """ if self.__class__ in event_listeners: for listener in sorted(event_listeners[self.__class__], key=lambda i: i.priority, reverse=True): - if not ConfigManager.GlobalConfig().debug.enable: - try: - res = listener.func(self, **listener.kwargs) - except Exception as e: - logger.error(f"Error occurred in listener: {repr(e)}\n{traceback.format_exc()}") - continue - else: + try: res = listener.func(self, **listener.kwargs) + except Exception as e: + if ConfigManager.GlobalConfig().debug.save_dump: + dump_path = save_exc_dump(f"监听器中发生错误") + else: + dump_path = None + logger.error(f"监听器中发生错误: {repr(e)}" + f"{f"\n已保存异常到 {dump_path}" if dump_path else ""}", + exc_info=True) + continue if res is True: return True return False + return None T = TypeVar('T', bound='_Event') @@ -64,7 +68,7 @@ class EventListener: def __post_init__(self): # 确保监听器函数至少有一个参数 - assert len(inspect.signature(self.func).parameters) >= 1, "The listener takes at least 1 parameter" + assert len(inspect.signature(self.func).parameters) >= 1, "监听器至少接受 1 个参数" # 定义监听器的类型和存储 @@ -81,7 +85,8 @@ def event_listener(event_class: type[T], priority: int = 0, **kwargs): priority: 优先级,默认为0 **kwargs: 附加参数 """ - assert issubclass(event_class, _Event), "Event class must be a subclass of Event" + if not issubclass(event_class, _Event): + raise TypeError("event_class 类必须是 _Event 的子类") def wrapper(func: Callable[[T, ...], Any]): # 注册事件监听器 @@ -92,6 +97,40 @@ def wrapper(func: Callable[[T, ...], Any]): return wrapper +def unregister_listener(event_class: type[T], func: Callable[[T, ...], Any]): + """ + 用于取消注册监听器 + 注意,会删除所有与给定函数匹配的监听器。 + + Args: + event_class: 事件类型 + func: 监听器函数 + """ + if not issubclass(event_class, _Event): + raise TypeError("event_class 类必须是 _Event 的子类") + + listeners_list = event_listeners.get(event_class) + + if not listeners_list: + raise ValueError(f"事件类型 {event_class.__name__} 没有已注册的监听器。") + + # 查找所有与给定函数匹配的监听器对象 + listeners_to_remove = [listener for listener in listeners_list if listener.func == func] + + if not listeners_to_remove: + # 如果没有找到匹配的函数 + raise ValueError(f"未找到函数 {func.__name__} 对应的监听器,无法为事件 {event_class.__name__} 注销。") + + # 移除所有找到的监听器 + removed_count = 0 + for listener_obj in listeners_to_remove: + listeners_list.remove(listener_obj) + removed_count += 1 + + if not listeners_list: + del event_listeners[event_class] + + class Event(_Event): """ 基事件类,所有自定义事件均继承自此类,继承自此类以创建自定义事件 @@ -108,16 +147,19 @@ def call(self): res_list = [] for listener in sorted(event_listeners[self.__class__], key=lambda i: i.priority, reverse=True): if self._call_hook(listener): - logger.debug(f"Skipped listener: {listener.func.__name__}") + logger.debug(f"由 Hook 跳过监听器: {listener.func.__name__}") continue - if not ConfigManager.GlobalConfig().debug.enable: - try: - res = listener.func(self, **listener.kwargs) - except Exception as e: - logger.error(f"Error occurred in listener: {repr(e)}\n{traceback.format_exc()}") - continue - else: + try: res = listener.func(self, **listener.kwargs) + except Exception as e: + if ConfigManager.GlobalConfig().debug.save_dump: + dump_path = save_exc_dump(f"监听器中发生错误") + else: + dump_path = None + logger.error(f"监听器中发生错误: {repr(e)}" + f"{f"\n已保存异常到 {dump_path}" if dump_path else ""}", + exc_info=True) + continue res_list.append(res) @async_task diff --git a/Lib/core/OnebotAPI.py b/Lib/core/OnebotAPI.py index e6a6e7c..0c19549 100644 --- a/Lib/core/OnebotAPI.py +++ b/Lib/core/OnebotAPI.py @@ -128,9 +128,9 @@ def get(self, node, data: dict = None, original: bool = None): else: dump_path = None logger.error( - f"调用 API: {node} data: {data} 异常: {repr(e)}\n" - f"{traceback.format_exc()}" - f"{f"\n已保存异常到 {dump_path}" if dump_path else ""}" + f"调用 API: {node} data: {data} 异常: {repr(e)}" + f"{f"\n已保存异常到 {dump_path}" if dump_path else ""}", + exc_info=True ) raise e diff --git a/Lib/core/PluginManager.py b/Lib/core/PluginManager.py index f4c9d12..6e55dac 100644 --- a/Lib/core/PluginManager.py +++ b/Lib/core/PluginManager.py @@ -6,7 +6,6 @@ import importlib import inspect import sys -import traceback from Lib.common import save_exc_dump from Lib.constants import * @@ -61,8 +60,7 @@ def load_plugin(plugin): logger.debug(f"尝试加载: {import_path}") module = importlib.import_module(import_path) except ImportError as e: - logger.error(f"加载 {import_path} 失败: {repr(e)}\n" - f"{traceback.format_exc()}") + logger.error(f"加载 {import_path} 失败: {repr(e)}", exc_info=True) raise plugin_info = None @@ -133,9 +131,9 @@ def load_plugins(): dump_path = save_exc_dump(f"尝试加载插件 {full_path} 时失败") else: dump_path = None - logger.error(f"尝试加载插件 {full_path} 时失败! 原因:{repr(e)}\n" - f"{"".join(traceback.format_exc())}" - f"{f"\n已保存异常到 {dump_path}" if dump_path else ""}") + logger.error(f"尝试加载插件 {full_path} 时失败! 原因:{repr(e)}" + f"{f"\n已保存异常到 {dump_path}" if dump_path else ""}", + exc_info=True) continue logger.debug(f"插件 {name}({full_path}) 加载成功!") @@ -188,9 +186,9 @@ def requirement_plugin(plugin_name: str): dump_path = save_exc_dump(f"尝试加载被依赖的插件 {plugin_name} 时失败!") else: dump_path = None - logger.error(f"尝试加载被依赖的插件 {plugin_name} 时失败! 原因:{repr(e)}\n" - f"{"".join(traceback.format_exc())}" - f"{f"\n已保存异常到 {dump_path}" if dump_path else ""}") + logger.error(f"尝试加载被依赖的插件 {plugin_name} 时失败! 原因:{repr(e)}" + f"{f"\n已保存异常到 {dump_path}" if dump_path else ""}", + exc_info=True) raise e logger.debug(f"由于插件依赖,插件 {plugin_name} 加载成功!") else: diff --git a/Lib/core/ThreadPool.py b/Lib/core/ThreadPool.py index 949a2b9..cdbd0f6 100644 --- a/Lib/core/ThreadPool.py +++ b/Lib/core/ThreadPool.py @@ -3,16 +3,13 @@ Created by BigCookie233 """ -import sys -import traceback +import atexit from concurrent.futures import ThreadPoolExecutor +from Lib.common import save_exc_dump from Lib.core import ConfigManager from Lib.core.ConfigManager import GlobalConfig from Lib.utils.Logger import get_logger -from Lib.common import save_exc_dump - -import atexit thread_pool = None logger = get_logger() @@ -50,10 +47,11 @@ def _wrapper(func, *args, **kwargs): dump_path = None # 打印到日志中 logger.error( - f"Error in async task({func.__module__}.{func.__name__}): {repr(e)}\n" - f"{"".join(traceback.format_exc())}" - f"{f"\n已保存异常到 {dump_path}" if dump_path else ""}" + f"Error in async task({func.__module__}.{func.__name__}): {repr(e)}" + f"{f"\n已保存异常到 {dump_path}" if dump_path else ""}", + exc_info=True ) + return None def async_task(func): diff --git a/Lib/utils/Actions.py b/Lib/utils/Actions.py index 47cba1a..98407fd 100644 --- a/Lib/utils/Actions.py +++ b/Lib/utils/Actions.py @@ -2,8 +2,6 @@ 操作 """ -import traceback - from Lib.common import save_exc_dump from Lib.core import OnebotAPI, ThreadPool, ConfigManager from Lib.utils import QQRichText, Logger, QQDataCacher @@ -110,8 +108,8 @@ def set_callback(self, callback: Callable[[Result], ...]): else: dump_path = None logger.warning(f"执行回调函数异常: {repr(e)}\n" - f"{traceback.format_exc()}" - f"{f"\n已保存异常到 {dump_path}" if dump_path else ""}") + f"{f"\n已保存异常到 {dump_path}" if dump_path else ""}", + exc_info=True) return self def call(self): @@ -133,9 +131,9 @@ def call(self): dump_path = save_exc_dump(f"调用日志记录函数异常") else: dump_path = None - logger.warning(f"调用日志记录函数异常: {repr(e)}\n" - f"{traceback.format_exc()}" - f"{f"\n已保存异常到 {dump_path}" if dump_path else ""}") + logger.warning(f"调用日志记录函数异常: {repr(e)}" + f"{f"\n已保存异常到 {dump_path}" if dump_path else ""}", + exc_info=True) if self.callback is not None: try: self.callback(self._result) @@ -144,9 +142,9 @@ def call(self): dump_path = save_exc_dump(f"执行回调函数异常") else: dump_path = None - logger.warning(f"回调函数异常: {repr(e)}\n" - f"{traceback.format_exc()}" - f"{f"\n已保存异常到 {dump_path}" if dump_path else ""}") + logger.warning(f"回调函数异常: {repr(e)}" + f"{f"\n已保存异常到 {dump_path}" if dump_path else ""}", + exc_info=True) return self def logger(self, *args, **kwargs): diff --git a/Lib/utils/EventClassifier.py b/Lib/utils/EventClassifier.py index ebda116..4d846bb 100644 --- a/Lib/utils/EventClassifier.py +++ b/Lib/utils/EventClassifier.py @@ -166,6 +166,7 @@ def logger(self): f"{self.message.render()}" f"({self.message_id})" ) + return None elif self.sub_type == "group": logger.info( @@ -182,6 +183,7 @@ def logger(self): f"{self.message.render()}" f"({self.message_id})" ) + return None elif self.sub_type == "other": logger.info( @@ -195,6 +197,7 @@ def logger(self): f"{self.message.render()}" f"({self.message_id})" ) + return None else: return super().logger() @@ -227,6 +230,7 @@ def logger(self): f"{self.message.render(group_id=self.group_id)}" f"({self.message_id})" ) + return None elif self.sub_type == "anonymous": anonymous_data = self.get('anonymous', {}) @@ -242,6 +246,7 @@ def logger(self): f"{self.message.render(group_id=self.group_id)}" f"({self.message_id})" ) + return None elif self.sub_type == "notice": logger.info( @@ -252,6 +257,7 @@ def logger(self): f"{self.message.render(group_id=self.group_id)}" f"({self.message_id})" ) + return None else: return super().logger() diff --git a/Lib/utils/EventHandlers.py b/Lib/utils/EventHandlers.py index 5960851..55a4507 100644 --- a/Lib/utils/EventHandlers.py +++ b/Lib/utils/EventHandlers.py @@ -2,15 +2,13 @@ 事件处理器 """ import copy -import traceback +import inspect +from typing import Literal, Callable, Any, Type from Lib.common import save_exc_dump from Lib.core import EventManager, ConfigManager, PluginManager from Lib.utils import EventClassifier, Logger, QQRichText, StateManager -import inspect -from typing import Literal, Callable, Any, Type - logger = Logger.get_logger() @@ -29,6 +27,16 @@ def match(self, event_data: EventClassifier.Event): """ pass + def __and__(self, other: "Rule"): + if not isinstance(other, Rule): + raise TypeError("other must be a Rule") + return AllRule(self, other) + + def __or__(self, other: "Rule"): + if not isinstance(other, Rule): + raise TypeError("other must be a Rule") + return AnyRule(self, other) + class AnyRule(Rule): """ @@ -89,14 +97,15 @@ def match(self, event_data: EventClassifier.Event): return self.value not in event_data.get(self.key) case "func": return self.func(event_data.get(self.key), self.value) + return None except Exception as e: if ConfigManager.GlobalConfig().debug.save_dump: dump_path = save_exc_dump(f"执行匹配事件器时出错 {event_data}") else: dump_path = None - logger.error(f"执行匹配事件器时出错 {event_data}: {repr(e)}\n" - f"{traceback.format_exc()}" - f"{f"已保存异常到 {dump_path}" if dump_path else ""}") + logger.error(f"执行匹配事件器时出错 {event_data}: {repr(e)}" + f"{f"\n已保存异常到 {dump_path}" if dump_path else ""}", + exc_info=True) return False @@ -122,9 +131,9 @@ def match(self, event_data: EventClassifier.Event): else: dump_path = None - logger.error(f"执行匹配事件器时出错 {event_data}: {repr(e)}\n" - f"{traceback.format_exc()}" - f"{f"已保存异常到 {dump_path}" if dump_path else ""}" + logger.error(f"执行匹配事件器时出错 {event_data}: {repr(e)}" + f"{f"\n已保存异常到 {dump_path}" if dump_path else ""}", + exc_info=True ) return False @@ -197,7 +206,7 @@ def match(self, event_data: EventClassifier.MessageEvent): if ( len(segments) > 0 and isinstance(segments[0], QQRichText.At) and - int(segments[0].data.get("qq")) == event_data.self_id + str(segments[0].data.get("qq")) == str(event_data.self_id) ): segments = segments[1:] is_at = True @@ -274,11 +283,12 @@ def _to_me(event_data: EventClassifier.MessageEvent): if not isinstance(event_data, EventClassifier.MessageEvent): logger.warning(f"event {event_data} is not a MessageEvent, cannot match to_me") return False - if isinstance(event_data, EventClassifier.PrivateMessageEvent): + if event_data.message_type == "private": return True - if isinstance(event_data, EventClassifier.GroupMessageEvent): + if event_data.message_type == "group": for rich in event_data.message.rich_array: - if isinstance(rich, QQRichText.At) and int(rich.data.get("qq")) == event_data.self_id: + if (isinstance(rich, QQRichText.At) and str(rich.data.get("qq")) == + str(ConfigManager.GlobalConfig().account.user_id)): return True return False @@ -332,11 +342,13 @@ def match(self, event_data: EventClassifier.Event, plugin_data: dict): for name, param in sig.parameters.items(): if name == "state": - if (isinstance(event_data, EventClassifier.MessageEvent) or - isinstance(event_data, EventClassifier.PrivateMessageEvent)): - state_id = f"u{event_data.user_id}" - elif isinstance(event_data, EventClassifier.GroupMessageEvent): - state_id = f"g{event_data.group_id}_u{event_data.user_id}" + if isinstance(event_data, EventClassifier.MessageEvent): + if event_data.message_type == "private": + state_id = f"u{event_data.user_id}" + elif event_data.message_type == "group": + state_id = f"g{event_data["group_id"]}_u{event_data.user_id}" + else: + raise TypeError("event_data.message_type must be private or group") else: raise TypeError("event_data must be a MessageEvent") handler_kwargs[name] = StateManager.get_state(state_id, plugin_data) @@ -364,9 +376,9 @@ def match(self, event_data: EventClassifier.Event, plugin_data: dict): else: dump_path = None logger.error( - f"执行匹配事件或执行处理器时出错 {event_data}: {repr(e)}\n" - f"{traceback.format_exc()}" - f"{f"\n已保存异常到 {dump_path}" if dump_path else ""}" + f"执行匹配事件或执行处理器时出错 {event_data}: {repr(e)}" + f"{f"\n已保存异常到 {dump_path}" if dump_path else ""}", + exc_info=True ) diff --git a/Lib/utils/Logger.py b/Lib/utils/Logger.py index c158fc1..2e48e79 100644 --- a/Lib/utils/Logger.py +++ b/Lib/utils/Logger.py @@ -1,7 +1,7 @@ """ 日志记录器 """ - +import inspect import logging import logging.handlers as handlers import sys @@ -10,7 +10,8 @@ import coloredlogs -logger: logging.Logger = None +logger_instance: logging.Logger = None # 重命名全局变量以区分 +FRAMEWORK_LOGGER_NAME = "murainbot" def init(logs_path: str = LOGS_PATH, logger_level: int = logging.INFO): @@ -22,10 +23,10 @@ def init(logs_path: str = LOGS_PATH, logger_level: int = logging.INFO): Returns: None """ - global logger + global logger_instance - if logger is not None: - return logger + if logger_instance is not None: + return logger_instance # 日志颜色 log_colors = { "DEBUG": "white", @@ -40,14 +41,14 @@ def init(logs_path: str = LOGS_PATH, logger_level: int = logging.INFO): "levelname": {"color": "white"} } # 日志格式 - fmt = "[%(asctime)s] [%(filename)s] [%(levelname)s]: %(message)s" + fmt = "[%(asctime)s] [%(name)s] [%(levelname)s]: %(message)s" # 设置日志 coloredlogs.install(isatty=True, stream=sys.stdout, field_styles=log_field_styles, fmt=fmt, colors=log_colors) # 设置文件日志 - logger = logging.getLogger() + logger_instance = logging.getLogger() - logger.setLevel(logger_level) + logger_instance.setLevel(logger_level) coloredlogs.set_level(logger_level) log_name = "latest.log" @@ -73,8 +74,8 @@ def namer(filename): file_handler.namer = namer file_handler.suffix = "%Y-%m-%d.log" file_handler.setFormatter(logging.Formatter(fmt)) - logger.addHandler(file_handler) - return logger + logger_instance.addHandler(file_handler) + return logger_instance def set_logger_level(level: int): @@ -85,17 +86,43 @@ def set_logger_level(level: int): Returns: None """ - global logger - logger.setLevel(level) + global logger_instance + logger_instance.setLevel(level) coloredlogs.set_level(level) -def get_logger(): +def get_logger(name: str | None = None): """ 获取日志记录器 Returns: Logger """ - if not logger: + + if name is None: + try: + frame = inspect.currentframe().f_back + # 从栈帧的全局变量中获取 __name__ + module_name = frame.f_globals.get('__name__') + + if module_name and isinstance(module_name, str): + if module_name == "__main__": + logger_name = FRAMEWORK_LOGGER_NAME + elif module_name.startswith("Lib"): + logger_name = FRAMEWORK_LOGGER_NAME + module_name[3:] + elif module_name.startswith("plugins"): + logger_name = FRAMEWORK_LOGGER_NAME + "." + module_name + else: + logger_name = module_name + else: + logger_name = FRAMEWORK_LOGGER_NAME + except Exception: + logger_name = FRAMEWORK_LOGGER_NAME + elif isinstance(name, str): + logger_name = f"{FRAMEWORK_LOGGER_NAME}.{name}" + else: + logger_name = FRAMEWORK_LOGGER_NAME + + if not logger_instance: init() - return logger + + return logging.getLogger(logger_name) diff --git a/Lib/utils/QQRichText.py b/Lib/utils/QQRichText.py index 4c18810..48cfc76 100644 --- a/Lib/utils/QQRichText.py +++ b/Lib/utils/QQRichText.py @@ -1,14 +1,14 @@ """ QQ富文本 """ +from __future__ import annotations + import inspect import json -import traceback -from typing import Any from urllib.parse import urlparse -from Lib.constants import * from Lib.common import save_exc_dump +from Lib.constants import * from Lib.core import ConfigManager from Lib.utils import QQDataCacher, Logger @@ -49,7 +49,7 @@ def cq_encode(text, in_cq: bool = False) -> str: replace("]", "]") -def cq_2_array(cq: str) -> list[dict[str, dict[str, Any]]]: +def cq_2_array(cq: str) -> list[dict[str, dict[str, str]]]: """ 将CQCode格式的字符串转换为消息段数组。 @@ -57,72 +57,154 @@ def cq_2_array(cq: str) -> list[dict[str, dict[str, Any]]]: cq (str): CQCode字符串。 Returns: - list[dict[str, dict[str, Any]]]: 解析后的消息段数组。 + list[dict[str, dict[str, str]]]: 解析后的消息段数组。 + + Raises: + TypeError: 如果输入类型不是字符串。 + ValueError: 如果解析过程中遇到格式错误,包含错误位置信息。 """ if not isinstance(cq, str): raise TypeError("cq_2_array: 输入类型错误") - cq_array = [] # 存储解析后的消息段 + cq_array = [] now_state = 0 # 当前解析状态 - # 0: 不在CQ码内 - # 1: 在CQ码的类型部分 - # 2: 在CQ码的参数键部分 - # 3: 在CQ码的参数值部分 - - segment_data = {"text": ""} # 用于存储当前解析的消息段 - now_key = "" # 当前解析的参数键 - now_segment_type = "" # 当前解析的CQ码类型 + # 0: 不在CQ码内 (初始/普通文本) + # 1: 在CQ码内,正在解析类型 (包括验证 [CQ: 前缀) + # 2: 在CQ码内,正在解析参数键 (key) + # 3: 在CQ码内,正在解析参数值 (value) + + segment_data = {"text": ""} # 存储当前普通文本段 + current_cq_data = {} # 存储当前 CQ 码的 data 部分 + now_key = "" + now_value = "" # 使用 now_value 暂存值,避免直接操作 current_cq_data[now_key] + now_segment_type = "" # 存储当前 CQ 码的完整类型部分 (包括 CQ:) 或处理后的类型 + cq_start_pos = -1 # 记录当前 CQ 码 '[' 的位置 + + for i, c in enumerate(cq): + error_context = f"在字符 {i} ('{c}') 附近" + cq_error_context = f"在起始于字符 {cq_start_pos} 的 CQ 码中,{error_context}" - for c in cq: if now_state == 0: # 解析普通文本 - if c == "[": # 进入CQ码解析 - now_state = 1 - if len(segment_data["text"]): # 先存储之前的普通文本 + if cq_start_pos == -1: # 文本块开始 + cq_start_pos = i + if c == "[": + # 遇到可能的 CQ 码开头,先保存之前的文本 + if len(segment_data["text"]): cq_array.append({"type": "text", "data": {"text": cq_decode(segment_data["text"])}}) - segment_data = {} # 重置segment_data用于存储新的CQ码 + segment_data = {"text": ""} # 重置文本段 + + # 记录起始位置,进入状态 1 + cq_start_pos = i + now_state = 1 + # 重置当前 CQ 码的临时变量 + now_segment_type = "" # 开始累积类型部分 + current_cq_data = {} + now_key = "" + now_value = "" + elif c == "]": + raise ValueError(f"cq_2_array: {error_context}: 文本块中包含非法字符: ']'") else: segment_data["text"] += c # 继续拼接普通文本 - elif now_state == 1: # 解析CQ码类型 + elif now_state == 1: # 解析类型 (包含 [CQ: 前缀) if c == ",": # 类型解析结束,进入参数键解析 - now_state = 2 - now_segment_type = now_segment_type[3:] # 移除CQ:前缀 + if not now_segment_type.startswith("CQ:"): + raise ValueError(f"cq_2_array: {cq_error_context}: 期望 'CQ:' 前缀,但得到 '{now_segment_type}'") + + actual_type = now_segment_type[3:] + if not actual_type: + raise ValueError(f"cq_2_array: {cq_error_context}: CQ 码类型不能为空") + + now_segment_type = actual_type # 保存处理后的类型名 + now_state = 2 # 进入参数键解析状态 + now_key = "" # 准备解析第一个键 + elif c == "]": # 类型解析结束,无参数 CQ 码结束 + if not now_segment_type.startswith("CQ:"): + # 如果不是 CQ: 开头,根据严格程度,可以报错或当作普通文本处理 + # 这里我们严格处理,既然进入了状态1,就必须是 CQ: 开头 + raise ValueError(f"cq_2_array: {cq_error_context}: 期望 'CQ:' 前缀,但得到 '{now_segment_type}'") + + actual_type = now_segment_type[3:] + if not actual_type: + raise ValueError(f"cq_2_array: {cq_error_context}: CQ 码类型不能为空") + + # 存入无参数的 CQ 码段 + cq_array.append({"type": actual_type, "data": {}}) # data 为空 + now_state = 0 # 回到初始状态 + cq_start_pos = -1 # 重置 + elif c == '[': # 类型名中不应包含未转义的 '[' + raise ValueError(f"cq_2_array: {cq_error_context}: CQ 码类型 '{now_segment_type}' 中包含非法字符 '['") else: - now_segment_type += c # 继续拼接类型字符串 + # 继续拼接类型部分 (此时包含 CQ:) + now_segment_type += c - elif now_state == 2: # 解析参数键 + elif now_state == 2: # 解析参数键 (key) if c == "=": # 键名解析结束,进入值解析 - now_state = 3 - now_key = cq_decode(now_key, in_cq=True) # 解码键名 - if now_key not in segment_data: - segment_data[now_key] = "" # 初始化键值 - else: - raise ValueError("cq_2_array: CQ码键名称重复") + if not now_key: + raise ValueError(f"cq_2_array: {cq_error_context}: CQ 码参数键不能为空") + + # 检查键名重复 (键名通常不解码,或按需解码) + # decoded_key = cq_decode(now_key, in_cq=True) # 如果键名需要解码 + decoded_key = now_key # 假设键名不解码 + if decoded_key in current_cq_data: + raise ValueError(f"cq_2_array: {cq_error_context}: CQ 码参数键 '{decoded_key}' 重复") + + now_key = decoded_key # 保存解码后(或原始)的键名 + now_state = 3 # 进入参数值解析状态 + now_value = "" # 准备解析值 + elif c == "," or c == "]": # 在键名后遇到逗号或方括号是错误的 + raise ValueError(f"cq_2_array: {cq_error_context}: 在参数键 '{now_key}' 后期望 '=',但遇到 '{c}'") + elif c == '[': # 键名中不应包含未转义的 '[' (根据规范,& 和 , 也应转义,但这里简化检查) + raise ValueError(f"cq_2_array: {cq_error_context}: CQ 码参数键 '{now_key}' 中包含非法字符 '['") else: now_key += c # 继续拼接键名 - elif now_state == 3: # 解析参数值 - if c == "]": # CQ码结束 - now_state = 0 - segment_data[now_key] = cq_decode(segment_data[now_key], in_cq=True) # 解码值 - cq_array.append({"type": now_segment_type, "data": segment_data}) # 存入解析结果 - segment_data = {"text": ""} # 重置segment_data - now_segment_type = "" # 清空类型 - now_key = "" # 清空键名 - elif c == ",": # 进入下一个参数键解析 - segment_data[now_key] = cq_decode(segment_data[now_key], in_cq=True) # 解码值 - now_state = 2 - now_key = "" # 清空键名,准备解析下一个键 + elif now_state == 3: # 解析参数值 (value) + if c == ",": # 当前值结束,进入下一个键解析 + # 解码当前值并存入 + current_cq_data[now_key] = cq_decode(now_value, in_cq=True) + now_state = 2 # 回到解析键的状态 + now_key = "" # 重置键,准备解析下一个 + # now_value 不需要在这里重置,进入状态 2 后,遇到 = 进入状态 3 时会重置 + elif c == "]": # 当前值结束,整个 CQ 码结束 + # 解码当前值并存入 + current_cq_data[now_key] = cq_decode(now_value, in_cq=True) + # 存入带参数的 CQ 码段 + cq_array.append({"type": now_segment_type, "data": current_cq_data}) + now_state = 0 # 回到初始状态 + cq_start_pos = -1 # 重置 + elif c == '[': # 值中不应出现未转义的 '[' + raise ValueError(f"cq_2_array: {cq_error_context}: CQ 码参数值 '{now_value}' 中包含非法字符 '['") else: - segment_data[now_key] += c # 继续拼接参数值 + now_value += c # 继续拼接值 (转义由 cq_decode 处理) + + # --- 循环结束后检查 --- + final_error_context = f"在字符串末尾" + if now_state != 0: + if cq_start_pos != -1: + # 根据当前状态给出更具体的错误提示 + if now_state == 1: + error_detail = f"类型部分 '{now_segment_type}' 未完成" + elif now_state == 2: + error_detail = f"参数键 '{now_key}' 未完成或缺少 '='" + elif now_state == 3: + error_detail = f"参数值 '{now_value}' 未结束" + else: # 理论上不会有其他状态 + error_detail = f"解析停留在未知状态 {now_state}" + raise ValueError( + f"cq_2_array: {final_error_context},起始于字符 {cq_start_pos} 的 CQ 码未正确结束 ({error_detail})") + else: + # 如果 cq_start_pos 是 -1 但状态不是 0,说明逻辑可能出错了 + raise ValueError(f"cq_2_array: {final_error_context},解析器状态异常 ({now_state}) 但未记录 CQ 码起始位置") - if "text" in segment_data and len(segment_data["text"]): # 处理末尾可能存在的文本内容 + # 处理末尾可能剩余的普通文本 + if len(segment_data["text"]): cq_array.append({"type": "text", "data": {"text": cq_decode(segment_data["text"])}}) return cq_array -def array_2_cq(cq_array: list | dict) -> str: +def array_2_cq(cq_array: list[dict[str, dict[str, str]]] | dict[str, dict[str, str]]) -> str: """ array消息段转CQCode Args: @@ -140,17 +222,47 @@ def array_2_cq(cq_array: list | dict) -> str: # 将json形式的富文本转换为CQ码 text = "" for segment in cq_array: - # 纯文本 - if segment.get("type") == "text": - text += cq_encode(segment.get("data").get("text")) + segment_type = segment.get("type") + if not isinstance(segment_type, str): + # 或者根据需求跳过这个 segment + raise ValueError(f"array_2_cq: 消息段缺少有效的 'type': {segment}") + + # 文本 + if segment_type == "text": + data = segment.get("data") + if not isinstance(data, dict): + raise ValueError(f"array_2_cq: 'text' 类型的消息段缺少有效的 'data' 字典: {segment}") + text_content = data.get("text") + if not isinstance(text_content, str): + raise ValueError(f"array_2_cq: 'text' 类型的消息段 'data' 字典缺少有效的 'text' 字符串: {segment}") + text += cq_encode(text_content) # CQ码 else: - if segment.get("data"): # 特判 - text += f"[CQ:{segment.get('type')}," + ",".join( - [cq_encode(x, in_cq=True) + "=" + cq_encode(segment.get("data")[x], in_cq=True) - for x in segment.get("data").keys()]) + "]" - else: - text += f"[CQ:{segment.get('type')}]" + cq_type_str = f"[CQ:{segment_type}" + data = segment.get("data") + if isinstance(data, dict) and data: # data 存在且是包含内容的字典 + params = [] + for key, value in data.items(): + if not isinstance(key, str): + raise ValueError( + f"array_2_cq: '{segment_type}' 类型的消息段 'data' 字典的键 '{key}' 不是字符串") + if value is None: + continue + if isinstance(value, bool): + value = "1" if value else "0" + if not isinstance(value, str): + try: + value = str(value) + except Exception as e: + raise ValueError(f"array_2_cq: '{segment_type}' 类型的消息段 " + f"'data' 字典的键 '{key}' 的值 '{value}' 无法被转换: {repr(e)}") + params.append(f"{cq_encode(key, in_cq=True)}={cq_encode(value, in_cq=True)}") + if params: + text += cq_type_str + "," + ",".join(params) + "]" + else: # 如果 data 非空但过滤后 params 为空(例如 data 里全是 None 值) + text += cq_type_str + "]" + else: # data 不存在、为 None 或为空字典 {} + text += cq_type_str + "]" return text @@ -206,26 +318,24 @@ class Segment(metaclass=SegmentMeta): """ segment_type = None - def __init__(self, cq): + def __init__(self, cq: str | dict[str, dict[str, str]] | "Segment"): self.cq = cq if isinstance(cq, str): - self.array = cq_2_array(cq)[0] - self.type, self.data = list(self.array.values()) + self.array = cq_2_array(cq) + if len(self.array) != 1: + raise ValueError("cq_2_array: 输入 CQ 码格式错误") + self.array = self.array[0] elif isinstance(cq, dict): self.array = cq - self.cq = array_2_cq(self.array) - self.type, self.data = list(self.array.values()) else: for segment in segments: if isinstance(cq, segment): self.array = cq.array - self.cq = str(self.cq) - # print(self.array.values(), list(self.array.values())) - self.type, self.data = list(self.array.values()) break else: - # print(cq, str(cq), type(cq)) raise TypeError("Segment: 输入类型错误") + self.type = self.array["type"] + self.data = self.array.get("data", {}) def __str__(self): return self.__repr__() @@ -341,25 +451,25 @@ class Face(Segment): """ segment_type = "face" - def __init__(self, face_id): + def __init__(self, id_): """ Args: - face_id: 表情id + id_: 表情id """ - self.face_id = face_id - super().__init__({"type": "face", "data": {"id": str(face_id)}}) + self.id_ = id_ + super().__init__({"type": "face", "data": {"id": str(id_)}}) - def set_id(self, face_id): + def set_id(self, id_): """ 设置表情id Args: - face_id: 表情id + id_: 表情id """ - self.face_id = face_id - self.array["data"]["id"] = str(face_id) + self.id_ = id_ + self.array["data"]["id"] = str(id_) def render(self, group_id: int | None = None): - return "[表情: %s]" % self.face_id + return f"[表情: {self.id_}]" class At(Segment): @@ -488,14 +598,14 @@ class Rps(Segment): segment_type = "rps" def __init__(self): - super().__init__({"type": "rps"}) + super().__init__({"type": "rps", "data": {}}) class Dice(Segment): segment_type = "dice" def __init__(self): - super().__init__({"type": "dice"}) + super().__init__({"type": "dice", "data": {}}) class Shake(Segment): @@ -506,7 +616,7 @@ class Shake(Segment): segment_type = "shake" def __init__(self): - super().__init__({"type": "shake"}) + super().__init__({"type": "shake", "data": {}}) class Poke(Segment): @@ -515,33 +625,33 @@ class Poke(Segment): """ segment_type = "poke" - def __init__(self, type_, poke_id): + def __init__(self, type_, id_): """ Args: type_: 见https://github.com/botuniverse/onebot-11/blob/master/message/segment.md#%E6%88%B3%E4%B8%80%E6%88%B3 - poke_id: 同上 + id_: 同上 """ self.type = type_ - self.poke_id = poke_id - super().__init__({"type": "poke", "data": {"type": str(self.type)}, "id": str(self.poke_id)}) + self.id_ = id_ + super().__init__({"type": "poke", "data": {"type": str(self.type)}, "id": str(self.id_)}) - def set_type(self, qq_type): + def set_type(self, type_): """ 设置戳一戳类型 Args: - qq_type: qq类型 + type_: 见https://github.com/botuniverse/onebot-11/blob/master/message/segment.md#%E6%88%B3%E4%B8%80%E6%88%B3 """ - self.type = qq_type - self.array["data"]["type"] = str(qq_type) + self.type = type_ + self.array["data"]["type"] = str(type_) - def set_id(self, poke_id): + def set_id(self, id_): """ 设置戳一戳id Args: - poke_id: 戳一戳id + id_: 戳一戳id """ - self.poke_id = poke_id - self.array["data"]["id"] = str(poke_id) + self.id_ = id_ + self.array["data"]["id"] = str(id_) def render(self, group_id: int | None = None): return f"[戳一戳: {self.type}]" @@ -736,6 +846,7 @@ class Node(Segment): """ 合并转发消息节点 接收时,此消息段不会直接出现在消息事件的 message 中,需通过 get_forward_msg API 获取。 + 这是最阴间的消息段之一,tm的Onebot协议,各种转换的细节根本就没定义清楚,感觉CQ码的支持就像后加的,而且纯纯草台班子 """ segment_type = "node" @@ -790,6 +901,14 @@ def render(self, group_id: int | None = None): else: return f"[合并转发节点: {self.message_id}]" + def __repr__(self): + """ + 去tm的CQ码 + Raises: + NotImplementedError: 暂不支持此方法 + """ + raise NotImplementedError("不支持将Node转成CQ码") + class Music(Segment): """ @@ -906,25 +1025,25 @@ class Reply(Segment): """ segment_type = "reply" - def __init__(self, message_id): + def __init__(self, id_): """ Args: - message_id: 回复消息 ID + id_: 回复消息 ID """ - self.message_id = message_id - super().__init__({"type": "reply", "data": {"id": str(self.message_id)}}) + self.id_ = id_ + super().__init__({"type": "reply", "data": {"id": str(self.id_)}}) - def set_message_id(self, message_id): + def set_id(self, id_): """ 设置消息 ID Args: - message_id: 消息 ID + id_: 消息 ID """ - self.message_id = message_id - self.array["data"]["id"] = str(self.message_id) + self.id_ = id_ + self.array["data"]["id"] = str(self.id_) def render(self, group_id: int | None = None): - return f"[回复: {self.message_id}]" + return f"[回复: {self.id_}]" class Forward(Segment): @@ -933,25 +1052,25 @@ class Forward(Segment): """ segment_type = "forward" - def __init__(self, forward_id): + def __init__(self, id_): """ Args: - forward_id: 合并转发消息 ID + id_: 合并转发消息 ID """ - self.forward_id = forward_id - super().__init__({"type": "forward", "data": {"id": str(self.forward_id)}}) + self.id_ = id_ + super().__init__({"type": "forward", "data": {"id": str(self.id_)}}) - def set_forward_id(self, forward_id): + def set_id(self, id_): """ - 设置合并转发消息 ID + 设置合并转发 ID Args: - forward_id: 合并转发消息 ID + id_: 合并转发消息 ID """ - self.forward_id = forward_id - self.array["data"]["id"] = str(self.forward_id) + self.id_ = id_ + self.array["data"]["id"] = str(self.id_) def render(self, group_id: int | None = None): - return f"[合并转发: {self.forward_id}]" + return f"[合并转发: {self.id_}]" class XML(Segment): @@ -1011,7 +1130,11 @@ class QQRichText: QQ富文本 """ - def __init__(self, *rich: str | dict | list | tuple | Segment): + def __init__( + self, + *rich: dict[str, dict[str, str]] | str | Segment | "QQRichText" | list[ + dict[str, dict[str, str]] | str | Segment | "QQRichText"] + ): """ Args: *rich: 富文本内容,可为 str、dict、list、tuple、Segment、QQRichText @@ -1019,47 +1142,30 @@ def __init__(self, *rich: str | dict | list | tuple | Segment): # 特判 self.rich_array: list[Segment] = [] - if len(rich) == 1: + + if len(rich) == 1 and isinstance(rich[0], (list, tuple)): rich = rich[0] # 识别输入的是CQCode or json形式的富文本 # 如果输入的是CQCode,则转换为json形式的富文本 # 处理CQCode - if isinstance(rich, str): - rich_string = rich - rich = cq_2_array(rich_string) - - elif isinstance(rich, dict): - rich = [rich] - elif isinstance(rich, (list, tuple)): - array = [] - for item in rich: - if isinstance(item, dict): - array.append(item) - elif isinstance(item, str): - array += cq_2_array(item) - else: - for segment in segments: - if isinstance(item, segment): - array.append(item.array) - break - else: - if isinstance(rich, QQRichText): - array += rich.rich_array - else: - raise TypeError("QQRichText: 输入类型错误") - rich = array - else: - for segment in segments: - if isinstance(rich, segment): - rich = [rich.array] - break + array = [] + for item in rich: + if isinstance(item, dict): + array.append(item) + elif isinstance(item, str): + array += cq_2_array(item) + elif isinstance(item, QQRichText): + array += item.rich_array else: - if isinstance(rich, QQRichText): - rich = rich.rich_array + for segment in segments: + if isinstance(item, segment): + array.append(item.array) + break else: raise TypeError("QQRichText: 输入类型错误") + rich = array # 将rich转换为的Segment for i in range(len(rich)): @@ -1071,15 +1177,7 @@ def __init__(self, *rich: str | dict | list | tuple | Segment): if param in rich[i]["data"]: kwargs[param] = rich[i]["data"][param] else: - if rich[i]["type"] == "reply" and param == "message_id": - kwargs[param] = rich[i]["data"].get("id") - elif rich[i]["type"] == "face" and param == "face_id": - kwargs[param] = rich[i]["data"].get("id") - elif rich[i]["type"] == "forward" and param == "forward_id": - kwargs[param] = rich[i]["data"].get("id") - elif rich[i]["type"] == "poke" and param == "poke_id": - kwargs[param] = rich[i]["data"].get("id") - elif param == "id_": + if param == "id_": kwargs[param] = rich[i]["data"].get("id") elif param == "type_": kwargs[param] = rich[i]["data"].get("type") @@ -1097,9 +1195,9 @@ def __init__(self, *rich: str | dict | list | tuple | Segment): dump_path = save_exc_dump(f"转换{rich[i]}时失败") else: dump_path = None - Logger.get_logger().warning(f"转换{rich[i]}时失败,报错信息: {repr(e)}\n" - f"{traceback.format_exc()}" - f"{f"\n已保存异常到 {dump_path}" if dump_path else ""}") + Logger.get_logger().warning(f"转换{rich[i]}时失败,报错信息: {repr(e)}" + f"{f"\n已保存异常到 {dump_path}" if dump_path else ""}", + exc_info=True) rich[i] = Segment(rich[i]) else: rich[i] = Segment(rich[i]) @@ -1118,7 +1216,7 @@ def render(self, group_id: int | None = None): return text def __str__(self): - self.rich_string = array_2_cq(self.rich_array) + self.rich_string = array_2_cq(self.get_array()) return self.rich_string def __repr__(self): @@ -1145,9 +1243,9 @@ def __contains__(self, other): except (TypeError, AttributeError): return False - def get_array(self): + def get_array(self) -> list[dict[str, dict[str, str]]]: """ - 获取rich_array(非抽象类,可用于API调用等) + 获取rich_array的纯数组形式(用于序列化) Returns: rich_array """ @@ -1186,7 +1284,9 @@ def add(self, *segments): print(rich) print(rich.render()) - print(QQRichText(At(114514))) + print(QQRichText(rich)) + + print(QQRichText(At(114514), At(1919810), "114514", Reply(133).array)) print(Segment(At(1919810))) print(QQRichText([{"type": "text", "data": {"text": "1919810"}}])) print(QQRichText().add(At(114514)).add(Text("我吃柠檬")) + QQRichText(At(1919810)).rich_array) @@ -1194,3 +1294,37 @@ def add(self, *segments): rich = QQRichText(rich_array) print(rich) print(rich.get_array()) + + print("--- 正确示例 ---") + print(cq_2_array("你好[CQ:face,id=123]世界[CQ:image,file=abc.jpg,url=http://a.com/b?c=1&d=2]")) + print(cq_2_array("[CQ:shake]")) + print(cq_2_array("只有文本")) + print(cq_2_array("[CQ:at,qq=123][CQ:at,qq=456]")) + + print("\n--- 错误示例 ---") + # 触发不同类型的 ValueError + error_inputs = [ + "文本[CQ:face,id=123", # 未闭合 (类型 3 结束) + "文本[CQ:face,id]", # 缺少= + "文本[CQ:,id=123]", # 类型为空 + "文本[NotCQ:face,id=123]", # 非 CQ: 开头 + "文本[:face,id=123]", # 非 CQ: 开头 (更具体) + "文本[CQ:face,id=123,id=456]", # 重复键 + "文本[CQ:face,,id=123]", # 多余逗号 (会导致空键名错误) + "文本[CQ:fa[ce,id=123]", # 类型中非法字符 '[' + "文本[CQ:face,ke[y=value]", # 键中非法字符 '[' + "文本[CQ:face,key=val]ue]", # 文本中非法字符 ']' + "[", # 未闭合 (类型 1 结束) + "[CQ", # 未闭合 (类型 1 结束) + "[CQ:", # 未闭合 (类型 1 结束) + "[CQ:type,", # 未闭合 (类型 2 结束) + "[CQ:type,key", # 未闭合 (类型 2 结束) + "[CQ:type,key=", # 未闭合 (类型 3 结束) + "[CQ:type,key=value" # 未闭合 (类型 2 结束) + ] + for err_cq in error_inputs: + try: + print(f"\nTesting: {err_cq}") + cq_2_array(err_cq) + except ValueError as e: + print(f"捕获到错误: {e}") diff --git a/README.md b/README.md index 8f234ff..3e39452 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@
-![MuRainBot2](https://socialify.git.ci/MuRainBot/MuRainBot2/image?custom_description=%E5%9F%BA%E4%BA%8Epython%E9%80%82%E9%85%8Donebot11%E5%8D%8F%E8%AE%AE%E7%9A%84%E8%BD%BB%E9%87%8F%E7%BA%A7QQBot%E6%A1%86%E6%9E%B6&description=1&forks=1&issues=1&logo=https%3A%2F%2Fgithub.com%2FMuRainBot%2FMuRainBot2Doc%2Fblob%2Fmaster%2Fdocs%2Fpublic%2Ficon.png%3Fraw%3Dtrue&name=1&pattern=Overlapping+Hexagons&pulls=1&stargazers=1&theme=Auto) +![MuRainBot2](https://socialify.git.ci/MuRainBot/MuRainBot2/image?custom_description=%E5%9F%BA%E4%BA%8Epython%E9%80%82%E9%85%8Donebot11%E5%8D%8F%E8%AE%AE%E7%9A%84%E8%BD%BB%E9%87%8F%E7%BA%A7OnebotBot%E6%A1%86%E6%9E%B6&description=1&forks=1&issues=1&logo=https%3A%2F%2Fgithub.com%2FMuRainBot%2FMuRainBot2Doc%2Fblob%2Fmaster%2Fdocs%2Fpublic%2Ficon.png%3Fraw%3Dtrue&name=1&pattern=Overlapping+Hexagons&pulls=1&stargazers=1&theme=Auto) GitHub license @@ -18,7 +18,7 @@ ## 🤔 概述 -MuRainBot2 (MRB2) 是一个基于 Python、适配 OneBot v11 协议的**轻量级** QQ 机器人框架。 +MuRainBot2 (MRB2) 是一个基于 Python、适配 OneBot v11 协议的轻量级开发框架。 它专注于提供稳定高效的核心事件处理与 API 调用能力,所有具体功能(如关键词回复、群管理等)均通过插件实现,赋予开发者高度的灵活性。 @@ -37,7 +37,7 @@ MuRainBot2 (MRB2) 是一个基于 Python、适配 OneBot v11 协议的**轻量 ## 🚨 重要提醒:关于重构与兼容性 > [!CAUTION] -> **请注意:** 本项目在 2024 年底至 2025 年初进行了一次 **彻底的框架重构**(主要涉及 `dev` 分支并在 2025年1月29日 合并至 `master`)。 +> **请注意:** 本项目在 2024 年底至 2025 年初进行了一次 **彻底的重构**(主要涉及 `dev` 分支并在 2025年1月29日 合并至 `master`)。 > > **当前的 MRB2 版本与重构前的旧版本插件完全不兼容。** 如果您是旧版本用户或拥有旧插件,请参考 **[最新文档](https://mrb2.xiaosu.icu)** 进行适配迁移。 @@ -45,16 +45,16 @@ MuRainBot2 (MRB2) 是一个基于 Python、适配 OneBot v11 协议的**轻量 * **MRB2:** MuRainBot2 的缩写。 * **OneBot v11 协议:** 一个广泛应用于即时通讯软件中的聊天机器人的应用层协议标准,本项目基于此标准开发。详情请见 [OneBot v11](https://11.onebot.dev/)。 -* **框架 (SDK):** MRB2 作为一个 OneBot SDK(或称开发框架),负责处理与 OneBot 实现端的通信、事件分发、API 调用封装等底层工作,以及提供插件系统,让开发者可以专注于插件功能的实现。更多通用术语可参考 [OneBot v12 术语表](https://12.onebot.dev/glossary/) (v11 与 v12 大体相通)。 +* **框架:** MRB2 作为一个 OneBot 开发框架,负责处理与 OneBot 实现端的通信、事件分发、API 调用封装等底层工作,以及提供插件系统,让开发者可以专注于插件功能的实现。更多通用术语可参考 [OneBot v12 术语表](https://12.onebot.dev/glossary/) (v11 与 v12 大体相通)。 * **插件:** MRB2 的所有功能都由插件提供。插件通常是放置在 `plugins` 目录下的 Python 文件或包含 `__init__.py` 的 Python 包。 -~~*什么?你问我为什么要叫MRB2,因为这个框架最初是给我的一个叫做沐雨的qqbot写的,然后之前还有[一个写的很垃圾](https://github.com/xiaosuyyds/PyQQbot)的版本,所以就叫做MRB2*~~ +~~*什么?你问我为什么要叫MRB2,因为这个框架最初是给我的一个叫做沐雨的bot写的,然后之前还有[一个写的很垃圾](https://github.com/xiaosuyyds/PyQQbot)的版本,所以就叫做MRB2*~~ ## 🐛 问题反馈 如果使用时遇到问题,请按以下步骤操作: -1. 将框架版本更新到 [`dev`](https://github.com/MuRainBot/MuRainBot2/tree/dev) 分支 +1. 将框架版本更新到 [`dev`](https://github.com/MuRainBot/MuRainBot2/tree/dev) 分支(可选,但推荐) 2. 将 `config.yml` 中的 `debug.enable` 设置为 `true`。 3. 复现您遇到的 Bug。 4. **检查 Onebot 实现端的日志**,确认问题是否源于实现端本身。如果是,请向您使用的实现端反馈。 @@ -62,7 +62,9 @@ MuRainBot2 (MRB2) 是一个基于 Python、适配 OneBot v11 协议的**轻量 * 请准备**完整**的 MRB2 日志文件 (`logs` 目录下)。您可以自行遮挡日志中的 QQ 号、群号等敏感信息。 * 提供清晰的错误描述、复现步骤。 * 如果开启了 `save_dump` 且生成了 dump 文件,可以一并提供。(不强制,但是推荐提供,不过需要注意可以检查一下是否包含apikey等敏感信息) - * 将以上信息提交到项目的 [**Issues**](https://github.com/MuRainBot/MuRainBot2/issues/new/choose) 页面。 + * 将当前使用的MRB2版本、日志、错误描述、复现步骤,以及dump文件(可选),提交到项目的 [**Issues**](https://github.com/MuRainBot/MuRainBot2/issues/new/choose) 页面。 + +如果不遵守以上要求,您的问题可能会被关闭或无视。 ## 📁 目录结构 @@ -113,7 +115,7 @@ MuRainBot2 (MRB2) 是一个基于 Python、适配 OneBot v11 协议的**轻量 ## ❤️ 鸣谢 ❤️ -**贡献指南:** 我们欢迎各种形式的贡献!请将您的 Pull Request 提交到 `dev` 分支。我们会定期将 `dev` 分支的稳定更新合并到 `master` 分支。 +**贡献指南:** 我们欢迎各种形式的贡献!包括 Issues 和 Pull Request,您可以向我们反馈 bug 提供建议,也可以通过 PR 直接帮我们编写代码来实现功能或者修复bug。请将您的 Pull Request 提交到 `dev` 分支。我们会定期将 `dev` 分支的稳定更新合并到 `master` 分支。 **感谢所有为 MRB2 付出努力的贡献者!** diff --git a/main.py b/main.py index 9aacb60..e59be83 100644 --- a/main.py +++ b/main.py @@ -72,9 +72,9 @@ def print_loading(wait_str): banner_start_color = (14, 190, 255) banner_end_color = (255, 66, 179) banner = BANNER.split("\n") + color_banner = "" # 输出banner for i in range(len(banner)): - color_banner = "" for j in range(len(banner[i])): color_banner += color_text( banner[i][j], @@ -84,13 +84,13 @@ def print_loading(wait_str): ((j / (len(banner[i]) - 1) + i / (len(banner) - 1)) / 2) ) ) - print(color_banner, flush=True) - time.sleep(0.05) + color_banner += "\n" + + print(color_banner.strip()) # 输出项目链接 for c in color_text(BANNER_LINK, get_gradient(banner_start_color, banner_end_color, 0.5)): - print(c, end="", flush=True) - time.sleep(0.02) + print(c, end="") wait_str = color_text("正在加载 Lib, 首次启动可能需要几秒钟,请稍等...", banner_start_color) print("\n" + wait_str, end="") diff --git a/plugins/Helper.py b/plugins/Helper.py index db4e8e0..39306f6 100644 --- a/plugins/Helper.py +++ b/plugins/Helper.py @@ -45,23 +45,27 @@ def get_help_text(): rule = EventHandlers.CommandRule("help", aliases={"帮助"}) -matcher = EventHandlers.on_event(EventClassifier.GroupMessageEvent, priority=0, rules=[rule]) +matcher = EventHandlers.on_event(EventClassifier.MessageEvent, priority=0, rules=[rule]) @matcher.register_handler() -def on_help(event_data): +def on_help(event_data: EventClassifier.MessageEvent): """ 帮助命令处理 """ - if event_data.message == "help": + cmd = str(event_data.message).strip().split(" ", 1) + if len(cmd) == 1: Actions.SendMsg( message=QQRichText.QQRichText( QQRichText.Reply(event_data["message_id"]), - get_help_text() - ), group_id=event_data["group_id"] + QQRichText.Text(get_help_text()) + ), + **{"group_id": event_data["group_id"]} + if event_data["message_type"] == "group" else + {"user_id": event_data["user_id"]} ).call() else: - plugin_name = str(event_data.message).split(" ", 1)[1].lower() + plugin_name = cmd[1].lower() for plugin in PluginManager.plugins: try: plugin_info = plugin["info"] @@ -71,8 +75,11 @@ def on_help(event_data): Actions.SendMsg( message=QQRichText.QQRichText( QQRichText.Reply(event_data["message_id"]), - plugin_info.HELP_MSG + "\n----------\n发送/help以获取全部的插件帮助信息" - ), group_id=event_data["group_id"] + QQRichText.Text(plugin_info.HELP_MSG + "\n----------\n发送/help以获取全部的插件帮助信息") + ), + **{"group_id": event_data["group_id"]} + if event_data["message_type"] == "group" else + {"user_id": event_data["user_id"]} ).call() return except Exception as e: @@ -82,6 +89,9 @@ def on_help(event_data): Actions.SendMsg( message=QQRichText.QQRichText( QQRichText.Reply(event_data["message_id"]), - "没有找到此插件,请检查是否有拼写错误" - ), group_id=event_data["group_id"] + QQRichText.Text("没有找到此插件,请检查是否有拼写错误") + ), + **{"group_id": event_data["group_id"]} + if event_data["message_type"] == "group" else + {"user_id": event_data["user_id"]} ).call()