-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
329 lines (264 loc) · 11.4 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
try:
import secret_values
except ImportError:
pass
import os
import discord
import re
from detect_hate import call_gpt
from discord.ext import commands
from discord.ext.commands import Context
from discord import Message
from utils import *
from discord import Guild
from detect_misinfo import if_misinfo
from detect_hate import filter_levels
import harmful_content
from audio_filter import *
from datetime import datetime
from video_filter import *
# Check if bot key is in environment variables (heroku) or in secret_values.py (local dev), get from correct location
bot_token = os.environ.get("BOT_TOKEN", default=None)
if not bot_token:
bot_token = secret_values.BOT_TOKEN
gpt_key = os.environ.get("GPT_KEY", default=None)
if not gpt_key:
gpt_key = secret_values.GPT_KEY
vt_key = os.environ.get("VT_KEY", default=None)
if not vt_key:
vt_key = secret_values.VT_KEY
intents = discord.Intents.default()
intents.message_content = True
pic_ext = (".png", ".jpg", ".jpeg") # image ext
url_pattern = re.compile(r'https?://\S+')
bot = commands.Bot(command_prefix="$", intents=intents)
@bot.event
async def on_ready():
print(f"We have logged in as {bot.user}")
@bot.event
async def on_message(message: Message):
if message.author == bot.user: # ignore the bot responses
return
await bot.process_commands(message)
ctx = await bot.get_context(message)
# create a mod channel called 'flag_count' if it doesn't exist already
# keeps track of flagged responses
existing_channel = discord.utils.get(ctx.guild.channels, name="flag-count")
existing_channel2 = discord.utils.get(ctx.guild.channels, name="filter-log")
if not existing_channel:
flag_count_channel = await ctx.guild.create_text_channel("flag-count")
print("flag-count channel created")
if not existing_channel2:
filter_log_channel = await ctx.guild.create_text_channel("filter-log")
print("filter-log channel created")
# image detection - harmful content
if message.attachments:
for attachment in message.attachments:
print(attachment.content_type)
if attachment.content_type.startswith("image/"):
# await message.channel.send('Image attachment detected')
print("Image:", attachment)
result = harmful_content.image_processing(attachment.url, gpt_key)
elif attachment.content_type.startswith("audio/"):
# attachment is audio
print("Attachment:", attachment)
result = get_text(attachment.url)
result = process_info(result, get_bot_role(ctx))
elif attachment.content_type.startswith("video/"):
print("Attachment:", attachment)
result = get_answer(attachment.url)
else: # file ext
# print("File:", await attachment.read())
result = harmful_content.file_processing(await attachment.read(), vt_key)
elif message.content.endswith(pic_ext):
# await message.channel.send('Image detected')
print("URL:", message.content)
result = harmful_content.image_processing(message.content, gpt_key)
elif url_pattern.search(message.content):
result = harmful_content.url_processing(message.content, vt_key)
else:
# set filter level
#ctx = await bot.get_context(message)
# roles = ctx.guild.me.roles
# role_names = [role.name for role in roles]
# if "Total_Filter" in role_names:
# role = "Total_Filter"
# elif "Harmful_Filter" in role_names:
# role = "Harmful_Filter"
# else:
# role = None
role = get_bot_role(ctx)
result = call_gpt(message, gpt_key, role)
if result == False:
# not hate speech
pass
else:
copy = message.content
username_of_message_sent = message.author.name
await message.delete()
# send the copy into the log channel
filter_channel = discord.utils.get(
ctx.guild.text_channels, name="filter-log"
)
await filter_channel.send(f"{username_of_message_sent} : '{copy}' | @ {datetime.now()} ")
# respond to the user with a correct response prompt (i.e, this is bad)
await message.channel.send(
f"{message.author.mention} your prior message/image has been removed for hate speech, harmful content, or vulgarity"
)
print(f"username: {username_of_message_sent}")
# get all messages in flag-counts
channel = discord.utils.get(
ctx.guild.text_channels, name="flag-count"
)
if channel:
# split message by delimeter
async for message in channel.history():
index_of_delimiter = message.content.find(":")
username = message.content[0:index_of_delimiter]
count = message.content[index_of_delimiter + 1]
if username == username_of_message_sent:
print("username found in flag-counts")
count = int(count)
count += 1
if count >= 5:
# send warning DM
message_string = f"You have triggered the filter {count} times. Your actions are being monitored by the administrators."
#await message.author.send(message_string)
count = str(count)
# rewrite current count
updated_string = f"{username}:{count}"
await message.edit(content=updated_string)
return
# person not in channel, write into channel (below!!!)
await channel.send(f"{username_of_message_sent}:1")
else:
print("channel does not exist")
async def set_filter_role(target_filter_level: str, guild: Guild) -> (bool, str):
filter_names = [filter_level["name"] for filter_level in filter_levels]
if target_filter_level not in filter_names:
return False, "Invalid filter level"
roles = guild.me.roles
role_names = [role.name for role in roles]
if target_filter_level in role_names:
return True, "The bot is already set to this filter level"
else:
role = discord.utils.get(guild.roles, name=target_filter_level)
if role is None:
role = await guild.create_role(name=target_filter_level)
names_to_remove = [
filter_name
for filter_name in filter_names
if filter_name != target_filter_level and filter_name in role_names
]
for name in names_to_remove:
try:
role_to_remove = discord.utils.get(guild.roles, name=name)
await guild.me.remove_roles(role_to_remove)
except Exception as e:
pass
await guild.me.add_roles(role)
target_filter_description = None
for filter_level in filter_levels:
if filter_level["name"] == target_filter_level:
target_filter_description = filter_level["description"]
break
return True, f"The bot has been set to {target_filter_description}"
@bot.event
async def on_guild_join(guild):
# create roles for all filter levels
guild_roles = [role["name"] for role in guild.roles]
filters_to_create = [
filter_level["name"]
for filter_level in filter_levels
if filter_level["name"] not in guild_roles
]
for filter_level in filters_to_create:
await guild.create_role(name=filter_level)
# set filter to default level
default_filter_level = filter_levels[1]
await set_filter_role(default_filter_level["name"], guild)
@bot.command()
async def test(ctx):
await ctx.send("Hello world!")
@bot.command()
async def faq(ctx):
await ctx.send("to set the level of moderation: $strictnessX (x=1-4)")
await ctx.send("to fact check text: reply to a piece of text with $factcheck")
@bot.command()
async def strictness(ctx: Context, level: int):
user = ctx.author
if not user.guild_permissions.administrator:
await ctx.send("You do not have permission to use this command")
return
if level < 1 or level > len(filter_levels):
await ctx.send("Invalid level")
return
filter_level = filter_levels[level - 1]
success, message = await set_filter_role(filter_level["name"], ctx.guild)
await ctx.send(message)
@bot.command()
async def strictness1(ctx: Context):
user = ctx.author
if not user.guild_permissions.administrator:
await ctx.send("You do not have permission to use this command")
return
filter_level = filter_levels[0]
success, message = await set_filter_role(filter_level["name"], ctx.guild)
await ctx.send(message)
@bot.command()
async def strictness2(ctx: Context):
user = ctx.author
if not user.guild_permissions.administrator:
await ctx.send("You do not have permission to use this command")
return
filter_level = filter_levels[1]
success, message = await set_filter_role(filter_level["name"], ctx.guild)
await ctx.send(message)
@bot.command()
async def strictness3(ctx: Context):
user = ctx.author
if not user.guild_permissions.administrator:
await ctx.send("You do not have permission to use this command")
return
filter_level = filter_levels[2]
success, message = await set_filter_role(filter_level["name"], ctx.guild)
await ctx.send(message)
@bot.command()
async def strictness4(ctx: Context):
user = ctx.author
if not user.guild_permissions.administrator:
await ctx.send("You do not have permission to use this command")
return
filter_level = filter_levels[3]
success, message = await set_filter_role(filter_level["name"], ctx.guild)
await ctx.send(message)
@bot.command()
async def factcheck(ctx: Context):
# check if the message is a response
if not ctx.message.reference:
await ctx.send("Sorry, This command only works as a response to a message.")
return
if not (
(ctx.message.reference.resolved and ctx.message.reference.resolved.content)
or ctx.message.reference.cached_message
):
await ctx.send("Sorry, I couldn't find the message you were replying to.")
return
if ctx.message.reference.resolved.author == bot.user:
await ctx.send("Sorry, I can't factcheck myself.")
return
# get the message that was replied to
if (
ctx.message.reference.cached_message.attachments
): # there is attatchment in message
for attachment in ctx.message.reference.cached_message.attachments:
if attachment.content_type.startswith("audio/"):
# attachment is audio
print("Attachment:", attachment)
replied_message = get_text(attachment.url)
else: # no attachment in message
replied_message = ctx.message.reference.resolved.content
# replied_message = ctx.message.reference.resolved.content
misinfo_res = if_misinfo(replied_message)
await ctx.send(misinfo_res)
bot.run(bot_token)