-
Notifications
You must be signed in to change notification settings - Fork 1
/
__init__.py
270 lines (236 loc) · 9.82 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
from nonebot.plugin.on import on_command
from nonebot.matcher import Matcher
from nonebot.adapters.onebot.v11 import (
GROUP_ADMIN,
GROUP_OWNER,
Bot,
GroupMessageEvent,
Message,
)
from nonebot.params import CommandArg
from nonebot.permission import SUPERUSER
from nonebot.log import logger
from pathlib import Path
import os
import re
try:
import ujson as json
except ModuleNotFoundError:
import json
# 加载阻断配置
path = Path() / "data" / "block"
if not path.exists():
os.makedirs(path)
def load(file:Path) -> dict:
if file.exists():
with open(file, "r", encoding="utf8") as f:
return json.load(f)
return {}
# 屏蔽
group_config_file = path / "group_config.json"
group_config = load(group_config_file)
global_group_config = group_config.get("global",[])
# 冷却
time_config_file = path / "time_config.json"
time_config = load(time_config_file)
global_time_config = time_config.get("global",{})
# 共享
share_config_file = path / "share_config.json"
share_config = load(share_config_file)
global_share_config = share_config.get("global",{})
def is_block_group(event:GroupMessageEvent) -> bool:
"""
规则:屏蔽
"""
msg = event.message.extract_plain_text().strip()
commands = group_config.setdefault(str(event.group_id),global_group_config.copy())
for command in commands:
if command.startswith("^") and re.match(re.compile(command),msg):
return True
elif msg.startswith(command):
return True
else:
return False
cache = {}
def is_block_time(event:GroupMessageEvent) -> bool:
"""
规则:冷却
"""
msg = event.message.extract_plain_text().strip()
group_id = str(event.group_id)
commands = time_config.setdefault(group_id,global_time_config.copy())
for command in commands:
if command.startswith("^") and re.match(re.compile(command),msg):
break
elif msg.startswith(command):
break
else:
return False
user_id = event.user_id
usercache = cache.setdefault(group_id, {}).setdefault(user_id, {})
cd = event.time - usercache.get(command, 0)
if cd > commands[command]:
usercache[command] = event.time
return False
else:
return f"你的【{msg}】冷却还有{int(commands[command] - cd) + 1}秒"
def is_block_share(event:GroupMessageEvent) -> bool:
"""
规则:共享冷却
"""
msg = event.message.extract_plain_text().strip()
group_id = str(event.group_id)
commands = share_config.setdefault(group_id,global_share_config.copy())
for command in commands:
if command.startswith("^") and re.match(re.compile(command),msg):
break
elif msg.startswith(command):
break
else:
return False
sharecache = cache.setdefault(group_id, {}).setdefault("share", {})
cd = event.time - sharecache.get(command, 0)
if cd > commands[command]:
sharecache[command] = event.time
return False
else:
return f"本群【{msg}】冷却还有{int(commands[command] - cd) + 1}秒"
from nonebot.message import event_preprocessor
from nonebot.exception import IgnoredException
@event_preprocessor
async def do_something(bot:Bot, event:GroupMessageEvent , permission = SUPERUSER | GROUP_ADMIN | GROUP_OWNER):
if not await permission(bot,event):
if is_block_group(event):
raise IgnoredException("本群已屏蔽此指令")
elif echo := is_block_share(event):
await bot.send(event = event, message = echo)
raise IgnoredException("本群指令正在冷却")
elif echo := is_block_time(event):
await bot.send(event = event, message = echo)
raise IgnoredException("用户指令正在冷却")
add_block = on_command("添加阻断",aliases = {"设置阻断"}, permission = SUPERUSER | GROUP_ADMIN | GROUP_OWNER, priority = 5, block = True)
@add_block.handle()
async def _(bot:Bot,event:GroupMessageEvent, arg: Message = CommandArg()):
msg = arg.extract_plain_text().strip().split()
if not msg:
return
command = msg[0]
if command in {"添加阻断","设置阻断","解除阻断","删除阻断"}:
await add_block.finish("不可以这么做")
args = set(msg[1:])
if await SUPERUSER(bot,event) and {"全局"} & args:
group_id = "global"
else:
group_id = str(event.group_id)
args.discard("全局")
echo = "添加阻断指令格式错误。"
if {"群","屏蔽"} & args:
global group_config,group_config_file
commands = group_config.setdefault(group_id,global_group_config.copy())
if command not in commands:
commands.append(command)
if group_id == "global":
group_config = {k:list(set(v.append(command))) for k,v in group_config.items()}
with open(group_config_file, "w", encoding="utf8") as f:
json.dump(group_config, f, ensure_ascii=False, indent=4)
echo = f"指令【{command}】已在{'全局' if group_id == 'global' else f'群{group_id}'}屏蔽。"
logger.info(echo)
else:
def set_config(data,file):
commands = data.setdefault(group_id,data.get("global",{}).copy())
commands[command] = cd
if group_id == "global":
for v in data.values():
v[command] = cd
with open(file, "w", encoding="utf8") as f:
json.dump(data, f, ensure_ascii = False, indent = 4)
if block_type := {"时间","冷却"} & args:
args = args - block_type
cd = args.pop()
try:
cd = int(cd)
except:
echo = f"添加阻断接受了错误的时间参数:{cd}"
logger.warning(echo)
global time_config,time_config_file
set_config(time_config,time_config_file)
echo = f"指令【{command}】已在{'全局' if group_id == 'global' else group_id}设置冷却:{cd}s"
logger.info(echo)
elif block_type := {"共享","共享冷却"} & args:
args = args - block_type
cd = args.pop()
try:
cd = int(cd)
except:
echo = f"添加阻断接受了错误的时间参数:{cd}"
logger.warning(echo)
global share_config,share_config_file
set_config(share_config,share_config_file)
echo = f"指令【{command}】已在{'全局' if group_id == 'global' else group_id}设置共享冷却:{cd}s"
logger.info(echo)
await add_block.finish(echo)
del_block = on_command("解除阻断",aliases={"删除阻断"}, permission = SUPERUSER | GROUP_ADMIN | GROUP_OWNER, priority = 5, block = True)
@del_block.handle()
async def _(bot:Bot,event:GroupMessageEvent, arg: Message = CommandArg()):
msg = arg.extract_plain_text().strip().split()
if not msg:
return
command = msg[0]
args = set(msg[1:])
if await SUPERUSER(bot,event) and {"全局"} & args:
group_id = "global"
else:
group_id = str(event.group_id)
echo = []
global group_config,group_config_file,share_config,share_config_file,time_config,time_config_file
if group_id in group_config and command in group_config[group_id]:
group_config[group_id].remove(command)
if group_id == "global":
group_config = {k:[x for x in v if x != command] for k,v in group_config.items()}
with open(group_config_file, "w", encoding="utf8") as f:
json.dump(group_config, f, ensure_ascii=False, indent=4)
echo.append(f"指令【{command}】已解除屏蔽")
if group_id in share_config and command in share_config[group_id]:
del share_config[group_id][command]
if group_id == "global":
share_config = {k:{vk:vv for vk,vv in v.items() if vk != command} for k,v in share_config.items()}
with open(share_config_file, "w", encoding="utf8") as f:
json.dump(share_config, f, ensure_ascii=False, indent=4)
echo.append(f"指令【{command}】已解除共享冷却限制")
if group_id in time_config and command in time_config[group_id]:
del time_config[group_id][command]
if group_id == "global":
time_config = {k:{vk:vv for vk,vv in v.items() if vk != command} for k,v in time_config.items()}
with open(time_config_file, "w", encoding="utf8") as f:
json.dump(time_config, f, ensure_ascii=False, indent=4)
echo.append(f"指令【{command}】已解除冷却限制")
await del_block.finish("\n".join(echo) if echo else f"指令【{command}】未被阻断")
show_block = on_command("查看阻断", permission = SUPERUSER | GROUP_ADMIN | GROUP_OWNER, priority = 5, block = True)
@show_block.handle()
async def _(matcher: Matcher, event:GroupMessageEvent):
group_id = str(event.group_id)
msg = ""
if group_id in group_config:
msg += "".join(f"【{command}】:屏蔽\n" for command in group_config[group_id])
if group_id in time_config:
msg += "".join(f"【{command}】:{cd}s\n" for command,cd in time_config[group_id].items())
if group_id in share_config:
msg += "".join(f"【{command}】:共享{cd}s\n" for command,cd in share_config[group_id].items())
msg = msg[:-1] if msg else "本群没有阻断。"
await show_block.finish(msg)
from nonebot import get_driver
driver = get_driver()
bots = driver.bots
history = []
@event_preprocessor
async def do_something(event: GroupMessageEvent):
if len(bots) > 1:
if event.user_id in bots:
raise IgnoredException("event来自其他bot")
global history
history = history[:20]
message_id = event.message_id
if message_id in history:
raise IgnoredException("event已被其他bot处理")
else:
history.insert(0, message_id)