-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
57a8f67
commit c8c76cb
Showing
18 changed files
with
613 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
name = "hydroroll" | ||
|
||
from hydroroll.bot import Bot | ||
from hydroroll.config import GlobalConfig |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
from iamai import Bot as _Bot | ||
from typing import Optional, Dict, List, Type, Any, Union | ||
from pathlib import Path | ||
import os | ||
# 获取当前目录路径 | ||
current_dir = Path.cwd() | ||
|
||
# 获取当前脚本文件路径 | ||
script_file = current_dir.resolve() / __file__ | ||
|
||
# 获取当前脚本文件所在目录路径 | ||
script_dir = script_file.parent | ||
|
||
__all__ = ["Bot"] | ||
|
||
class Bot: | ||
def __init__( | ||
self, | ||
*, | ||
config_file: Optional[str] = "config.toml", | ||
config_dict: Optional[Dict] = None, | ||
hot_reload: bool = False, | ||
) -> None: | ||
self.bot = _Bot(hot_reload=hot_reload, | ||
config_file=config_file, | ||
config_dict=config_dict | ||
) | ||
self.bot.load_plugins_from_dirs(Path(f"{script_dir}/plugins")) | ||
self.create_folders() | ||
self.init_data() | ||
self.init_conf() | ||
self.init_webui() | ||
|
||
def run(self) -> None: | ||
self.bot.run() | ||
|
||
def restart(self) -> None: | ||
self.bot.restart() | ||
|
||
def create_folders(self): | ||
folder_path = os.path.dirname(os.path.abspath('__file__')) # 获取main.py所在文件夹路径 | ||
if not os.path.isdir(os.path.join(folder_path, 'user')): | ||
os.mkdir(os.path.join(folder_path, 'user')) | ||
if not os.path.isdir(os.path.join(folder_path, 'data')): | ||
os.mkdir(os.path.join(folder_path, 'data')) | ||
if not os.path.isdir(os.path.join(folder_path, 'models')): | ||
os.mkdir(os.path.join(folder_path, 'models')) | ||
if not os.path.isdir(os.path.join(folder_path, 'web')): | ||
os.mkdir(os.path.join(folder_path, 'web')) | ||
if not os.path.isdir(os.path.join(folder_path, 'config')): | ||
os.mkdir(os.path.join(folder_path, 'config')) | ||
if not os.path.isdir(os.path.join(folder_path, 'logs')): | ||
os.mkdir(os.path.join(folder_path, 'logs')) | ||
if not os.path.isdir(os.path.join(folder_path, 'rules')): | ||
os.mkdir(os.path.join(folder_path, 'rules')) | ||
|
||
def init_data(self): | ||
... | ||
|
||
def init_webui(self): | ||
... | ||
|
||
def init_conf(self): | ||
... |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,124 @@ | ||
import argparse | ||
import sys | ||
import platform | ||
from importlib.metadata import version | ||
from iamai import Plugin | ||
import os | ||
import pickle | ||
import threading | ||
|
||
|
||
# 创建全局 ArgumentParser 对象 | ||
global_parser = argparse.ArgumentParser(description='hydroroll[水系] 全局命令参数') | ||
|
||
# 定义全局配置类 | ||
class GlobalConfig: | ||
_name = "hydroroll" | ||
_version = "0.1.0" | ||
_svn = "2" | ||
_author = "简律纯" | ||
_iamai_version = version('iamai') | ||
_python_ver = sys.version | ||
_python_ver_raw= '.'.join(map(str, platform.python_version_tuple()[:3])) | ||
current_path = os.path.dirname(os.path.abspath('__file__')) | ||
|
||
# 定义系统组件 | ||
class HydroSystem: | ||
def __init__(self): | ||
self.parser = argparse.ArgumentParser(description='hydroroll[水系].system 系统命令参数') | ||
self.subparsers = self.parser.add_subparsers() | ||
self.status_parser = self.subparsers.add_parser('status', aliases=['s'], help='系统状态') | ||
self.reload_parser = self.subparsers.add_parser('reload', aliases=['rld'], help='重新加载系统') | ||
self.restart_parser = self.subparsers.add_parser('restart', aliases=['rst'], help='重启系统') | ||
self.help = '\n'.join(self.parser.format_help().replace('\r\n', '\n').replace('\r', '').split('\n')[2:-3]) | ||
class HydroBot: | ||
def __init__(self) -> None: | ||
self.parser = argparse.ArgumentParser(description="Bot命令") | ||
|
||
|
||
class ConfigManager: | ||
def __init__(self, directory): | ||
self.directory = directory | ||
self.configs = {} | ||
self.locks = {} | ||
|
||
def get(self, filename, section, key, default=None): | ||
if not self._check_file_exists(filename): | ||
return default | ||
config = self._get_config(filename) | ||
if section not in config: | ||
return default | ||
return config[section].get(key, default) | ||
|
||
def set(self, filename, section, key, value): | ||
config = self._get_config(filename) | ||
if section not in config: | ||
config[section] = {} | ||
config[section][key] = value | ||
self._save_config(filename, config) | ||
|
||
def delete(self, filename, section, key): | ||
config = self._get_config(filename) | ||
if section not in config: | ||
return | ||
if key not in config[section]: | ||
return | ||
del config[section][key] | ||
self._save_config(filename, config) | ||
|
||
def sections(self, filename): | ||
if not self._check_file_exists(filename): | ||
return [] | ||
config = self._get_config(filename) | ||
return list(config.keys()) | ||
|
||
def section_items(self, filename, section): | ||
if not self._check_file_exists(filename): | ||
return [] | ||
config = self._get_config(filename) | ||
if section not in config: | ||
return [] | ||
return list(config[section].items()) | ||
|
||
def files(self): | ||
return [filename for filename in os.listdir(self.directory) if filename.endswith('.dat')] | ||
|
||
def _get_lock(self, filename): | ||
if filename not in self.locks: | ||
self.locks[filename] = threading.Lock() | ||
return self.locks[filename] | ||
|
||
def _get_config(self, filename): | ||
with self._get_lock(filename): | ||
if filename not in self.configs: | ||
filepath = os.path.join(self.directory, filename) | ||
if os.path.exists(filepath): | ||
try: | ||
with open(filepath, 'rb') as f: | ||
self.configs[filename] = pickle.load(f) | ||
except: | ||
pass | ||
if filename not in self.configs: | ||
self.configs[filename] = {} | ||
return self.configs[filename] | ||
|
||
def _save_config(self, filename, config): | ||
with self._get_lock(filename): | ||
try: | ||
filepath = os.path.join(self.directory, filename) | ||
backuppath = os.path.join(self.directory, filename + '.bak') | ||
lockpath = os.path.join(self.directory, filename + '.lock') | ||
with open(lockpath, 'wb') as f: | ||
pass | ||
if os.path.exists(filepath): | ||
os.replace(filepath, backuppath) | ||
with open(filepath, 'wb') as f: | ||
pickle.dump(config, f) | ||
os.remove(backuppath) | ||
os.remove(lockpath) | ||
except: | ||
pass | ||
|
||
def _check_file_exists(self, filename): | ||
filepath = os.path.join(self.directory, filename) | ||
return os.path.exists(filepath) and filename.endswith('.dat') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
import re | ||
from abc import ABC, abstractmethod | ||
from typing import Type, Union, Generic, TypeVar | ||
|
||
from iamai import Plugin | ||
from iamai.typing import T_State | ||
from iamai.adapter.cqhttp.event import GroupMessageEvent, PrivateMessageEvent | ||
|
||
from .config import BasePluginConfig, RegexPluginConfig, CommandPluginConfig | ||
|
||
T_Config = TypeVar("T_Config", bound=BasePluginConfig) | ||
T_RegexPluginConfig = TypeVar("T_RegexPluginConfig", bound=RegexPluginConfig) | ||
T_CommandPluginConfig = TypeVar("T_CommandPluginConfig", bound=CommandPluginConfig) | ||
|
||
|
||
class BasePlugin( | ||
Plugin[Union[PrivateMessageEvent, GroupMessageEvent], T_State, T_Config], | ||
ABC, | ||
Generic[T_State, T_Config], | ||
): | ||
Config: Type[T_Config] = BasePluginConfig | ||
|
||
def format_str(self, format_str: str, message_str: str = "") -> str: | ||
return format_str.format( | ||
message=message_str, | ||
user_name=self.event.sender.nickname, | ||
user_id=self.event.sender.user_id, | ||
) | ||
|
||
async def rule(self) -> bool: | ||
is_bot_off = True | ||
|
||
if self.event.adapter.name != "cqhttp": | ||
return False | ||
if self.event.type != "message": | ||
return False | ||
match_str = self.event.message.get_plain_text() | ||
if is_bot_off: | ||
if self.event.message.startswith(f'[CQ:at,qq={self.event.self_id}]'): | ||
match_str = re.sub(fr'^\[CQ:at,qq={self.event.self_id}\]', '', match_str) | ||
else: | ||
return False | ||
if self.config.handle_all_message: | ||
return self.str_match(match_str) | ||
elif self.config.handle_friend_message: | ||
if self.event.message_type == "private": | ||
return self.str_match(match_str) | ||
elif self.config.handle_group_message: | ||
if self.event.message_type == "group": | ||
if ( | ||
self.config.accept_group is None | ||
or self.event.group_id in self.config.accept_group | ||
): | ||
return self.str_match(match_str) | ||
return False | ||
|
||
@abstractmethod | ||
def str_match(self, msg_str: str) -> bool: | ||
raise NotImplemented | ||
|
||
|
||
class RegexPluginBase(BasePlugin[T_State, T_RegexPluginConfig], ABC): | ||
msg_match: re.Match | ||
re_pattern: re.Pattern | ||
Config: Type[T_RegexPluginConfig] = RegexPluginConfig | ||
|
||
def str_match(self, msg_str: str) -> bool: | ||
msg_str = msg_str.strip() | ||
self.msg_match = self.re_pattern.fullmatch(msg_str) | ||
return bool(self.msg_match) | ||
|
||
|
||
class CommandPluginBase(RegexPluginBase[T_State, T_CommandPluginConfig], ABC): | ||
command_match: re.Match | ||
command_re_pattern: re.Pattern | ||
Config: Type[T_CommandPluginConfig] = CommandPluginConfig | ||
|
||
def str_match(self, msg_str: str) -> bool: | ||
if not hasattr(self, "command_re_pattern"): | ||
self.command_re_pattern = re.compile( | ||
f'[{"".join(self.config.command_prefix)}]' | ||
f'({"|".join(self.config.command)})' | ||
r"\s*(?P<command_args>.*)", | ||
flags=re.I if self.config.ignore_case else 0, | ||
) | ||
msg_str = msg_str.strip() | ||
self.command_match = self.command_re_pattern.fullmatch(msg_str) | ||
if not self.command_match: | ||
return False | ||
self.msg_match = self.re_pattern.fullmatch( | ||
self.command_match.group("command_args") | ||
) | ||
return bool(self.msg_match) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
from typing import Set, Optional | ||
|
||
from iamai import ConfigModel | ||
|
||
|
||
class BasePluginConfig(ConfigModel): | ||
__config_name__ = "" | ||
handle_all_message: bool = True | ||
"""是否处理所有类型的消息,此配置为 True 时会覆盖 handle_friend_message 和 handle_group_message。""" | ||
handle_friend_message: bool = True | ||
"""是否处理好友消息。""" | ||
handle_group_message: bool = True | ||
"""是否处理群消息。""" | ||
accept_group: Optional[Set[int]] = None | ||
"""处理消息的群号,仅当 handle_group_message 为 True 时生效,留空表示处理所有群。""" | ||
message_str: str = "*{user_name} {message}" | ||
"""最终发送消息的格式。""" | ||
|
||
|
||
class RegexPluginConfig(BasePluginConfig): | ||
pass | ||
|
||
|
||
class CommandPluginConfig(RegexPluginConfig): | ||
command_prefix: Set[str] = {".", "。","!",":"} | ||
"""命令前缀。""" | ||
command: Set[str] = {} | ||
"""命令文本。""" | ||
ignore_case: bool = True | ||
"""忽略大小写。""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
import re | ||
from importlib.metadata import version | ||
from plugins.hydroroll_plugin_base import CommandPluginBase | ||
from hydroroll.config import GlobalConfig | ||
|
||
from .config import Config | ||
|
||
|
||
class HydroBot(CommandPluginBase[None, Config]): | ||
Config = Config | ||
CurrentConfig = GlobalConfig | ||
priority = 0 | ||
|
||
def __post_init__(self): | ||
self.re_pattern = re.compile(r"(?P<bot_info_str>.*)", flags=re.I) | ||
|
||
def bot_info(self): | ||
info_str = f'{self.CurrentConfig._name} '\ | ||
f'{self.CurrentConfig._version}({self.CurrentConfig._svn}) '\ | ||
f'by {self.CurrentConfig._author} '\ | ||
f'on Python {self.CurrentConfig._python_ver_raw} '\ | ||
f'with {" & ".join([adapter + "("+version("iamai-adapter-"+adapter) +")" for adapter in dict(self.bot.config.adapter)])} '\ | ||
f'for iamai({self.CurrentConfig._iamai_version})' | ||
|
||
return info_str | ||
|
||
async def handle(self) -> None: | ||
await self.event.reply( | ||
self.format_str(self.config.message_str, self.bot_info()) | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
from typing import Set | ||
|
||
from plugins.hydroroll_plugin_base import CommandPluginConfig | ||
|
||
|
||
class Config(CommandPluginConfig): | ||
__config_name__ = "plugin_bot_info" | ||
command: Set[str] = {"bot"} | ||
"""命令文本。""" | ||
message_str: str = "{message}" | ||
"""最终发送消息的格式。""" |
Oops, something went wrong.
c8c76cb
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Successfully deployed to the following URLs:
hydro-roll – ./
hydro-roll-docs.vercel.app
hydro-roll-retrofor.vercel.app
hydro-roll-git-main-retrofor.vercel.app
hydroroll.retrofor.space