/
qwq.py
292 lines (260 loc) · 11.5 KB
/
qwq.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
import re
from nonebot import on_regex
from nonebot.adapters import Event
from nonebot.adapters.onebot.v11 import Message
from nonebot.params import EventMessage
from src.libraries.maimaidx_music import *
from src.libraries.qwq_function import *
#根据游戏ID获取人数,可以前往网站的“机厅”中查看具体的游戏ID
getGP=on_regex(r"(吾悦麦麦)$")
@getGP.handle()
async def _(event: Event, message: Message = EventMessage()):
try:
payload = {"gameId":1003}
r= await getGamePlayer(payload)
if r['code']==-1:
await getGP.send("没人上报,似乎没有人!")
elif r['code']==0:
if r['message']=="waiting":
await getGP.send("当前人数正在变动,请稍后查询!")
else:
print("你的gameId不正确。")
else:
await getGP.send("当前人数:"+str(r['data']['player'])+"人\n上报时间:"+str(r['data']['time']).replace("T"," "))
except Exception as e:
print(e)
await getGP.send("指令实现过程出错,请联系管理员。")
incGPS=on_regex(r"^(吾悦麦麦加)([0-9][0-9]?)")
@incGPS.handle()
async def _(event: Event, message: Message = EventMessage()):
try:
inf =re.match("^(吾悦麦麦加)([0-9][0-9]?)", str(message)).groups()
player = int(inf[1])
if player <0:
await incGPS.send("人数小于了0人,有鬼!")
return
payload = {"gameId":1003,"player":player}
r= await incGamePlayers(payload)
if r['code']==0:
if r['message']=="waiting":
await incGPS.send("当前人数正在变动,请稍后再发出指令!")
elif r['message']=="size":
await incGPS.send("人数不太对吧!")
else:
print("你的gameId不正确。")
elif r['code']==1:
await incGPS.send("成功更改,当前人数:"+str(r['data']['player'])+"人")
except Exception as e:
print(e)
await incGPS.send("指令实现过程出错,请联系管理员。")
decGPS=on_regex(r"^(吾悦麦麦减)([0-9][0-9]?)")
@decGPS.handle()
async def _(event: Event, message: Message = EventMessage()):
try:
inf =re.match("^(吾悦麦麦减)([0-9][0-9]?)", str(message)).groups()
player = int(inf[1])
if player <0:
await decGPS.send("人数小于了0人,有鬼!")
return
payload = {"gameId":1003,"player":player}
r= await decGamePlayers(payload)
if r['code']==0:
if r['message']=="waiting":
await decGPS.send("当前人数正在变动,请稍后再发出指令!")
elif r['message']=="size":
await decGPS.send("人数不太对吧!")
else:
print("你的gameId不正确。")
elif r['code']==1:
await decGPS.send("成功更改,当前人数:"+str(r['data']['player'])+"人")
except Exception as e:
print(e)
await decGPS.send("指令实现过程出错,请联系管理员。")
incGP=on_regex(r"(吾悦麦麦\+\+)$")
@incGP.handle()
async def _(event: Event, message: Message = EventMessage()):
try:
payload = {"gameId":1003}
r= await incGamePlayer(payload)
if r['code']==0:
if r['message']=="waiting":
await incGP.send("当前人数正在变动,请稍后再发出指令!")
elif r['message']=="size":
await incGP.send("人数超过了20人,真的有这么多吗!")
else:
print("你的gameId不正确。")
else:
await incGP.send("成功增加,当前人数:"+str(r['data']['player'])+"人")
except Exception as e:
print(e)
await incGP.send("指令实现过程出错,请联系管理员。")
decGP=on_regex(r"(吾悦麦麦\-\-)$")
@decGP.handle()
async def _(event: Event, message: Message = EventMessage()):
try:
payload = {"gameId":1003}
r= await decGamePlayer(payload)
if r['code']==0:
if r['message']=="waiting":
await decGP.send("当前人数正在变动,请稍后再发出指令!")
elif r['message']=="size":
await decGP.send("当前没有人游玩,不能再减了!")
else:
print("你的gameId不正确。")
elif r['code']==1:
await decGP.send("成功增加,当前人数:"+str(r['data']['player'])+"人")
except Exception as e:
print(e)
await decGP.send("指令实现过程出错,请联系管理员。")
setGP=on_regex(r"^(吾悦麦麦)([0-9][0-9]?)(人)")
@setGP.handle()
async def _(event: Event, message: Message = EventMessage()):
try:
inf =re.match("^(吾悦麦麦)([0-9][0-9]?)(人)", str(message)).groups()
player = int(inf[1])
if player >20:
await setGP.send("人数超过了20人,真的有这么多吗!")
return
elif player <0:
await setGP.send("人数小于了0人,有鬼!")
return
payload = {"gameId":1003,"player":player}
r= await setGamePlayer(payload)
if r['code']==0:
if r['message']=="waiting":
await setGP.send("当前人数正在变动,请稍后再发出指令!")
elif r['message']=="size":
await setGP.send("人数不太对吧!")
else:
print("你的gameId不正确。")
elif r['code']==1:
await setGP.send("成功更改,当前人数:"+str(r['data']['player'])+"人")
except Exception as e:
print(e)
await setGP.send("指令实现过程出错,请联系管理员。")
qwqQQ = on_regex(r"qwq验证.*")
@qwqQQ.handle()
async def _(event: Event, message: Message = EventMessage()):
try:
payload = {"username": re.match("(qwq验证)(.*)", str(message)).groups()[1], 'qq': event.get_user_id()}
response = await verifyQQ(payload)
await qwqQQ.send(response['message'])
except Exception as e:
print(e)
await qwqQQ.send("指令实现过程出错,请联系管理员。")
signByQQ = on_regex(r"qwq签到.*")
@signByQQ.handle()
async def _(event: Event, message: Message = EventMessage()):
try:
payload = {"username": re.match("(qwq签到)(.*)", str(message)).groups()[1], 'qq': event.get_user_id()}
response = await signByQQ(payload)
await signByQQ.send(response['message'])
except Exception as e:
print(e)
await signByQQ.send("指令实现过程出错,请联系管理员。")
getSongIDName = on_regex(r"是什么歌$")
@getSongIDName.handle()
async def _(event: Event, message: Message = EventMessage()):
try:
aliasName = re.match("(.*)(是什么歌)$", str(message)).groups()[0]
payload = {"text": aliasName}
response = await getSongByAlias(payload)
songList =response['data']
listLen = len(songList)
if listLen == 0:
await getSongIDName.send("没有歌曲拥有此别称!")
return
if response['code']==1:
if listLen == 1:
# 调用自己的函数,生成并输出歌曲图片
songId = str(songList[0]['songId'])
songPhoto = "这是一张ID为" + songId + "叫做" + total_list.by_id(songId).title + "的歌曲的图片!"
#
await getSongIDName.send(songPhoto)
else:
myStr = "以下是拥有\"" + aliasName + "\"别称的歌曲:\n"
for i in songList:
myStr += "ID:" + str(i['songId']) + ",歌曲名:《" + total_list.by_id(str(i['songId'])).title + "》\n"
await getSongIDName.send(myStr)
elif response['code']==2:
# 模糊搜索到了结果
myStr = f"没有直接找到拥有\"{aliasName}\"别称的歌曲,以下是别称中含有\"" + aliasName + "\"文字的歌曲:\n"
myAlias = dict()
myID = dict()
for i in songList:
if myAlias.get(str(i['songId'])) is None:
myAlias[str(i['songId'])] = f"《{str(i['text'])}》"
else:
myAlias[str(i['songId'])] += f",《{str(i['text'])}》"
for i in songList:
if myID.get(str(i['songId'])) is None:
myStr += "ID:" + str(i['songId']) + "\n歌曲名:《" + total_list.by_id(str(i['songId'])).title \
+ f"》\n别称:{myAlias[str(i['songId'])]}\n\n"
myID[str(i['songId'])]=True
await getSongIDName.send(myStr)
else:
await getSongIDName.send("指令实现过程出错,请联系管理员。")
except Exception as e:
print(f"异常{e}")
await getSongIDName.send("指令实现过程出错,请联系管理员。")
addAlias = on_regex(r"添加别称 [0-9]* .*")
@addAlias.handle()
async def _(event: Event, message: Message = EventMessage()):
try:
inf = re.match("(添加别称) ([0-9]*) (.*)$", str(message)).groups()
songID = inf[1]
alias = inf[2]
if len(songID) > 5:
# 不存在这样的歌曲ID,直接忽略请求
return
if len(alias) < 2:
await addAlias.send("别称长度需要大于等于2。")
return
payload = {"songId": songID, "text": alias}
r = await addSongAlias(payload)
if r['code'] == 1:
await addAlias.send("添加成功。")
else:
if r['message'] == "added":
await addAlias.send("添加失败。\n原因:该歌曲已经拥有此别称。")
else:
# 可以自己多自定义几段话
await addAlias.send("添加失败。")
except Exception as e:
print(e)
await addAlias.send("指令实现过程出错,请联系管理员。")
getSongAlias = on_regex(r"有什么别称$")
@getSongAlias.handle()
async def _(event: Event, message: Message = EventMessage()):
try:
songId = re.match("(.*)(有什么别称$)$", str(message)).groups()[0]
payload = {"songId": songId}
r:list = await getSongAliases(payload)
if len(r)==0:
await getSongAlias.send("它好像没有别称,输入:\n添加别称 "+songId+" 哈哈哈哈哈\n来为它增加一个别称吧!")
else:
myStr = "《" + total_list.by_id(songId).title + "》拥有以下别称:\n"
for index, s in enumerate(r):
# (该歌曲的别称下标).别称( (别称ID) )
myStr += str(index + 1) + "." + s['text'] + "(" + str(s['id']) + ")" + "\n"
await getSongAlias.send(myStr)
except Exception as e:
print(e)
await getSongAlias.send("指令实现过程出错,请联系管理员。")
#接受别称ID
deleteSongAlias = on_regex("删除别称 [0-9]*")
@deleteSongAlias.handle()
async def _(event: Event, message: Message = EventMessage()):
#建议建一个能够删除别称的QQ表
qq = event.get_user_id()
try:
aliasId = re.match("(删除别称) ([0-9]*)$", str(message)).groups()[1]
payload={'id':aliasId}
state = await deleteAliasByID(payload)
if state:
await deleteSongAlias.send("删除成功。")
else:
await deleteSongAlias.send("删除失败。")
except Exception as e:
print(e)
await deleteSongAlias.send("指令实现过程出错,请联系管理员。")