/
bot.py
1259 lines (1045 loc) · 43.8 KB
/
bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import asyncio
import inspect
import logging
import os
import platform
import shutil
import sys
from collections import namedtuple
from datetime import datetime
from enum import IntEnum
from importlib.machinery import ModuleSpec
from pathlib import Path
from typing import (
Optional,
Union,
List,
Dict,
NoReturn,
Set,
Coroutine,
TypeVar,
Callable,
Awaitable,
Any,
)
from types import MappingProxyType
import discord
from discord.ext import commands as dpy_commands
from discord.ext.commands import when_mentioned_or
from discord.ext.commands.bot import BotBase
from . import Config, i18n, commands, errors, drivers, modlog, bank
from .cog_manager import CogManager, CogManagerUI
from .core_commands import license_info_command, Core
from .data_manager import cog_data_path
from .dev_commands import Dev
from .events import init_events
from .global_checks import init_global_checks
from .settings_caches import PrefixManager, IgnoreManager, WhitelistBlacklistManager
from .rpc import RPCMixin
from .utils import common_filters
# Identifiers of the custom Config groups that ``RedBase.__init__`` initialises and registers.
CUSTOM_GROUPS = "CUSTOM_GROUPS"
SHARED_API_TOKENS = "SHARED_API_TOKENS"
# Module logger (named "redbot").
log = logging.getLogger("redbot")
__all__ = ["RedBase", "Red", "ExitCodes"]
# Minimal stand-in exposing only ``guild``; handed to ``get_prefix`` in place of a real message.
NotMessage = namedtuple("NotMessage", "guild")
# Signature of a pre-invoke hook: receives the command context and returns an awaitable.
PreInvokeCoroutine = Callable[[commands.Context], Awaitable[Any]]
# Lets ``before_invoke`` be typed as returning exactly the hook type it was given.
T_BIC = TypeVar("T_BIC", bound=PreInvokeCoroutine)
def _is_submodule(parent, child):
    """Return ``True`` if ``child`` is ``parent`` itself or a dotted submodule of it."""
    if parent == child:
        return True
    dotted_prefix = parent + "."
    return child.startswith(dotted_prefix)
# barely spurious warning caused by our intentional shadowing
class RedBase(
commands.GroupMixin, dpy_commands.bot.BotBase, RPCMixin
): # pylint: disable=no-member
"""Mixin for the main bot class.
This exists because `Red` inherits from `discord.AutoShardedClient`, which
is something other bot classes may not want to have as a parent class.
"""
def __init__(self, *args, cli_flags=None, bot_dir: Path = Path.cwd(), **kwargs):
    """Set up core config defaults, settings caches and bot kwargs, then init the base bot.

    Parameters
    ----------
    cli_flags
        Parsed command line flags. Although the default is ``None``, attributes
        such as ``co_owner``, ``rpc`` and ``rpc_port`` are read unconditionally,
        so a real flags object has to be supplied.
    bot_dir : Path
        Stored as ``self._main_dir``. The default is evaluated once, when the
        function is defined, not on every call.
    *args, **kwargs
        Forwarded to the parent initialiser after ``command_prefix``,
        ``owner_id``, ``command_not_found`` and ``max_messages`` were filled in.
    """
    self._shutdown_mode = ExitCodes.CRITICAL
    self._cli_flags = cli_flags
    self._config = Config.get_core_conf(force_registration=False)
    self._co_owners = cli_flags.co_owner
    self.rpc_enabled = cli_flags.rpc
    self.rpc_port = cli_flags.rpc_port
    self._last_exception = None
    # Defaults for the global (bot-wide) config scope.
    self._config.register_global(
        token=None,
        prefix=[],
        packages=[],
        owner=None,
        whitelist=[],
        blacklist=[],
        locale="en-US",
        embeds=True,
        color=15158332,
        fuzzy=False,
        custom_info=None,
        help__page_char_limit=1000,
        help__max_pages_in_guild=2,
        help__delete_delay=0,
        help__use_menus=False,
        help__show_hidden=False,
        help__verify_checks=True,
        help__verify_exists=False,
        help__tagline="",
        description="Red V3",
        invite_public=False,
        invite_perm=0,
        disabled_commands=[],
        disabled_command_msg="That command is disabled.",
        extra_owner_destinations=[],
        owner_opt_out_list=[],
        last_system_info__python_version=[3, 7],
        last_system_info__machine=None,
        last_system_info__system=None,
        schema_version=0,
    )

    # Defaults for the per-guild config scope.
    self._config.register_guild(
        prefix=[],
        whitelist=[],
        blacklist=[],
        admin_role=[],
        mod_role=[],
        embeds=None,
        ignored=False,
        use_bot_color=False,
        fuzzy=False,
        disabled_commands=[],
        autoimmune_ids=[],
    )

    self._config.register_channel(embeds=None, ignored=False)
    self._config.register_user(embeds=None)

    # Custom groups, each initialised with 2 as the second argument to ``init_custom``.
    self._config.init_custom(CUSTOM_GROUPS, 2)
    self._config.register_custom(CUSTOM_GROUPS)

    self._config.init_custom(SHARED_API_TOKENS, 2)
    self._config.register_custom(SHARED_API_TOKENS)
    # Settings caches built on top of the core config.
    self._prefix_cache = PrefixManager(self._config, cli_flags)
    self._ignored_cache = IgnoreManager(self._config)
    self._whiteblacklist_cache = WhitelistBlacklistManager(self._config)

    async def prefix_manager(bot, message) -> List[str]:
        # Default ``command_prefix`` callable: cached prefixes for the message's guild,
        # optionally extended with the bot mention when the ``mentionable`` flag is set.
        prefixes = await self._prefix_cache.get_prefixes(message.guild)
        if cli_flags.mentionable:
            return when_mentioned_or(*prefixes)(bot, message)
        return prefixes

    # Only fill in kwargs the caller did not provide explicitly.
    if "command_prefix" not in kwargs:
        kwargs["command_prefix"] = prefix_manager

    if cli_flags.owner and "owner_id" not in kwargs:
        kwargs["owner_id"] = cli_flags.owner

    if "command_not_found" not in kwargs:
        kwargs["command_not_found"] = "Command {} not found.\n{}"

    # ``no_message_cache`` overrides any configured cache size with ``None``.
    message_cache_size = cli_flags.message_cache_size
    if cli_flags.no_message_cache:
        message_cache_size = None
    kwargs["max_messages"] = message_cache_size
    self._max_messages = message_cache_size

    self._uptime = None
    self._checked_time_accuracy = None
    self._color = discord.Embed.Empty  # This is needed or color ends up 0x000000

    self._main_dir = bot_dir
    self._cog_mgr = CogManager()
    self._use_team_features = cli_flags.use_team_features
    # The built-in help command is disabled; Red's own help command is added below.
    super().__init__(*args, help_command=None, **kwargs)
    # Do not manually use the help formatter attribute here, see `send_help_for`,
    # for a documented API. The internals of this object are still subject to change.
    self._help_formatter = commands.help.RedHelpFormatter()
    self.add_command(commands.help.red_help)

    self._permissions_hooks: List[commands.CheckPredicate] = []
    # Event awaited through ``wait_until_red_ready`` before pre-invoke hooks run.
    self._red_ready = asyncio.Event()
    # Hooks registered via ``before_invoke`` / removed via ``remove_before_invoke_hook``.
    self._red_before_invoke_objs: Set[PreInvokeCoroutine] = set()
def get_command(self, name: str) -> Optional[commands.Command]:
    """Look up a command by name via the parent class, narrowing the type to Red's ``Command``."""
    found = super().get_command(name)
    if found is not None:
        assert isinstance(found, commands.Command)
    return found
def get_cog(self, name: str) -> Optional[commands.Cog]:
    """Look up a cog by name via the parent class, narrowing the type to Red's ``Cog``."""
    found = super().get_cog(name)
    if found is not None:
        assert isinstance(found, commands.Cog)
    return found
@property
def _before_invoke(self):  # DEP-WARN
    """Always resolve to Red's own pre-invoke dispatcher method."""
    dispatcher = self._red_before_invoke_method
    return dispatcher


@_before_invoke.setter
def _before_invoke(self, val):  # DEP-WARN
    """Prevent this from being overwritten in super().__init__"""
    # Assignments are deliberately discarded.
    return None
async def _red_before_invoke_method(self, ctx):
    """Wait until Red is ready, then run every registered pre-invoke hook concurrently.

    For ``_AlwaysAvailableCommand`` commands, exceptions raised by hooks are
    collected by ``asyncio.gather`` instead of being propagated.
    """
    await self.wait_until_red_ready()
    tolerate_errors = isinstance(ctx.command, commands.commands._AlwaysAvailableCommand)
    hooks = self._red_before_invoke_objs
    if not hooks:
        return
    pending = (hook(ctx) for hook in hooks)
    await asyncio.gather(*pending, return_exceptions=tolerate_errors)
def remove_before_invoke_hook(self, coro: PreInvokeCoroutine) -> None:
    """
    Unregister a hook previously added with `before_invoke`.

    A hook that is not currently registered is ignored.
    """
    registered_hooks = self._red_before_invoke_objs
    registered_hooks.discard(coro)
def before_invoke(self, coro: T_BIC) -> T_BIC:
    """
    Register a pre-invoke hook; Red's replacement for the ``before_invoke`` decorator.

    Works both as a decorator and as a plain function call.

    3rd party cogs should remove any hooks which they register at unload
    using `remove_before_invoke_hook`

    Below behavior shared with discord.py:

    .. note::
        The ``before_invoke`` hooks are
        only called if all checks and argument parsing procedures pass
        without error. If any check or argument parsing procedures fail
        then the hooks are not called.

    Parameters
    ----------
    coro: Callable[[commands.Context], Awaitable[Any]]
        The coroutine to register as the pre-invoke hook.

    Returns
    -------
    The same object that was passed in, so decorated functions stay usable.

    Raises
    ------
    TypeError
        The coroutine passed is not actually a coroutine.
    """
    if asyncio.iscoroutinefunction(coro):
        self._red_before_invoke_objs.add(coro)
        return coro
    raise TypeError("The pre-invoke hook must be a coroutine.")
@property
def cog_mgr(self) -> NoReturn:
    """Guard attribute: any access raises so the cog manager internals stay private."""
    refusal = "Please don't mess with the cog manager internals."
    raise AttributeError(refusal)
@property
def uptime(self) -> datetime:
    """Read-only access to the stored uptime value; assignment is rejected by the setter."""
    recorded = self._uptime
    return recorded


@uptime.setter
def uptime(self, value) -> NoReturn:
    """Refuse every attempt to assign to ``uptime``."""
    refusal = (
        "Hey, we're cool with sharing info about the uptime, but don't try and assign to it please."
    )
    raise RuntimeError(refusal)
@property
def db(self) -> NoReturn:
    """Guard attribute: any access raises to keep callers away from the raw bot config."""
    refusal = (
        "We really don't want you touching the bot config directly. "
        "If you need something in here, take a look at the exposed methods "
        "and use the one which corresponds to your needs or "
        "open an issue if you need an additional method for your use case."
    )
    raise AttributeError(refusal)
@property
def counter(self) -> NoReturn:
    """Guard attribute: any access raises and points callers at ``collections.Counter``."""
    refusal = (
        "Please make your own counter object by importing ``Counter`` from ``collections``."
    )
    raise AttributeError(refusal)
@property
def color(self) -> NoReturn:
    """Guard attribute: any access raises and points callers at `get_embed_color`."""
    refusal = "Please fetch the embed color with `get_embed_color`"
    raise AttributeError(refusal)
@property
def colour(self) -> NoReturn:
    """Guard attribute: any access raises and points callers at `get_embed_colour`."""
    refusal = "Please fetch the embed colour with `get_embed_colour`"
    raise AttributeError(refusal)
@property
def max_messages(self) -> Optional[int]:
    """Message cache size chosen at construction time, or ``None`` when caching was disabled."""
    cache_size = self._max_messages
    return cache_size
async def allowed_by_whitelist_blacklist(
    self,
    who: Optional[Union[discord.Member, discord.User]] = None,
    *,
    who_id: Optional[int] = None,
    guild_id: Optional[int] = None,
    role_ids: Optional[List[int]] = None,
) -> bool:
    """
    This checks if a user or member is allowed to run things,
    as considered by Red's whitelist and blacklist.

    If given a user object, this function will check the global lists

    If given a member, this will additionally check guild lists

    If omitting a user or member, you must provide a value for ``who_id``

    You may also provide a value for ``guild_id`` in this case

    If providing a member by guild and member ids,
    you should supply ``role_ids`` as well

    Parameters
    ----------
    who : Optional[Union[discord.Member, discord.User]]
        The user or member object to check

    Other Parameters
    ----------------
    who_id : Optional[int]
        The id of the user or member to check
        If not providing a value for ``who``, this is a required parameter.
    guild_id : Optional[int]
        When used in conjunction with a provided value for ``who_id``, checks
        the lists for the corresponding guild as well.
    role_ids : Optional[List[int]]
        When used with both ``who_id`` and ``guild_id``, checks the role ids provided.
        This is required for accurate checking of members in a guild if providing ids.

    Raises
    ------
    TypeError
        Did not provide ``who`` or ``who_id``

    Returns
    -------
    bool
        `True` if user is allowed to run things, `False` otherwise
    """
    # Contributor Note:
    # All config calls are delayed until needed in this section
    # All changes should be made keeping in mind that this is also used as a global check

    guild = None
    mocked = False  # used for an accurate delayed role id expansion later.
    if not who:
        if not who_id:
            raise TypeError("Must provide a value for either `who` or `who_id`")
        mocked = True
        who = discord.Object(id=who_id)
        if guild_id:
            guild = discord.Object(id=guild_id)
    else:
        guild = getattr(who, "guild", None)

    if await self.is_owner(who):
        return True

    global_whitelist = await self._whiteblacklist_cache.get_whitelist()
    if global_whitelist:
        if who.id not in global_whitelist:
            return False
    else:
        # blacklist is only used when whitelist doesn't exist.
        global_blacklist = await self._whiteblacklist_cache.get_blacklist()
        if who.id in global_blacklist:
            return False

    if guild:
        # In the id-only ("mocked") case ``guild`` is a bare ``discord.Object``
        # which carries no ``owner_id``; a plain attribute access would raise
        # AttributeError, so the guild-owner shortcut is simply skipped then.
        if getattr(guild, "owner_id", None) == who.id:
            return True

        # The delayed expansion of ids to check saves time in the DM case.
        # Converting to a set reduces the total lookup time in section
        if mocked:
            ids = {i for i in (who.id, *(role_ids or [])) if i != guild.id}
        else:
            # DEP-WARN
            # This uses member._roles (getattr is for the user case)
            # If this is removed upstream (undocumented)
            # there is a silent failure potential, and role blacklist/whitelists will break.
            ids = {i for i in (who.id, *(getattr(who, "_roles", []))) if i != guild.id}

        guild_whitelist = await self._whiteblacklist_cache.get_whitelist(guild)
        if guild_whitelist:
            if ids.isdisjoint(guild_whitelist):
                return False
        else:
            guild_blacklist = await self._whiteblacklist_cache.get_blacklist(guild)
            if not ids.isdisjoint(guild_blacklist):
                return False

    return True
async def ignored_channel_or_guild(self, ctx: commands.Context) -> bool:
    """
    Decide whether commands may run in the context's channel and guild,
    based on Red's ignored-guild and ignored-channel settings.

    Private channels, members with the manage guild permission, bot owners
    and guild admins are never ignored.

    Parameters
    ----------
    ctx : Context of where the command is being run.

    Returns
    -------
    bool
        `True` if commands are allowed in the channel, `False` otherwise
    """
    author_perms = ctx.channel.permissions_for(ctx.author)

    # Bypass conditions, evaluated in order and stopping at the first hit.
    if isinstance(ctx.channel, discord.abc.PrivateChannel):
        return True
    if author_perms.manage_guild:
        return True
    if await ctx.bot.is_owner(ctx.author):
        return True
    if await ctx.bot.is_admin(ctx.author):
        return True

    guild_is_ignored = await self._ignored_cache.get_ignored_guild(ctx.guild)
    channel_is_ignored = await self._ignored_cache.get_ignored_channel(ctx.channel)
    if guild_is_ignored:
        return False
    # An ignored channel still accepts commands from members who can manage channels.
    if channel_is_ignored and not author_perms.manage_channels:
        return False
    return True
async def get_valid_prefixes(self, guild: Optional[discord.Guild] = None) -> List[str]:
    """
    Return the prefixes that are valid in a guild, or in DMs when no guild is given.

    This is a thin wrapper around ``get_prefix`` that passes a stand-in
    object carrying only the guild instead of a real message.

    Parameters
    ----------
    guild : Optional[discord.Guild]
        The guild you want prefixes for. Omit (or pass None) for the DM prefixes

    Returns
    -------
    List[str]
        If a guild was specified, the valid prefixes in that guild.
        If a guild was not specified, the valid prefixes for DMs
    """
    stand_in_message = NotMessage(guild)
    return await self.get_prefix(stand_in_message)
async def get_embed_color(self, location: discord.abc.Messageable) -> discord.Color:
    """
    Resolve the embed color to use at a location, honouring all related settings.

    Parameters
    ----------
    location : `discord.abc.Messageable`
        Location to check embed color for.

    Returns
    -------
    discord.Color
        The bot member's color in the guild when the guild has ``use_bot_color``
        enabled and ``location`` is not a member; the bot's stored color otherwise.
    """
    guild = getattr(location, "guild", None)
    if not guild:
        return self._color
    if not await self._config.guild(guild).use_bot_color():
        return self._color
    if isinstance(location, discord.Member):
        return self._color
    return guild.me.color


get_embed_colour = get_embed_color
# start config migrations
async def _maybe_update_config(self):
    """
    Apply pending config schema migrations, one version step at a time.

    This should be run prior to loading cogs or connecting to discord.
    """
    # (version the migration starts from, migration coroutine), in upgrade order.
    migrations = (
        (0, self._schema_0_to_1),
        (1, self._schema_1_to_2),
    )
    current_version = await self._config.schema_version()
    for from_version, migrate in migrations:
        if current_version != from_version:
            continue
        await migrate()
        current_version += 1
        # Persist after every step so a later failure does not redo finished work.
        await self._config.schema_version.set(current_version)
async def _schema_1_to_2(self):
    """
    Migrate shared API tokens from the raw ``api_tokens`` key to a custom config scope.
    """
    log.info("Moving shared API tokens to a custom group")
    legacy_tokens = await self._config.get_raw("api_tokens", default={})
    for service, tokens in legacy_tokens.items():
        # Merge each service's tokens into its own entry of the custom group.
        async with self._config.custom(SHARED_API_TOKENS, service).all() as stored:
            stored.update(tokens)

    await self._config.clear_raw("api_tokens")
async def _schema_0_to_1(self):
    """
    Migrate the single mod/admin role settings of every guild to lists of roles.
    """
    log.info("Begin updating guild configs to support multiple mod/admin roles")
    per_guild_settings = await self._config.all_guilds()
    for guild_id, settings in per_guild_settings.items():
        guild_ref = discord.Object(id=guild_id)
        old_mod_role = settings["mod_role"]
        old_admin_role = settings["admin_role"]

        # Only guilds that actually had a role configured are rewritten.
        if old_mod_role:
            await self._config.guild(guild_ref).mod_role.set([old_mod_role])
        if old_admin_role:
            await self._config.guild(guild_ref).admin_role.set([old_admin_role])
    log.info("Done updating guild configs to support multiple mod/admin roles")
# end Config migrations
async def pre_flight(self, cli_flags):
    """
    Prepare the bot before it connects: migrate config, register core cogs,
    detect Python/system changes and load the stored packages.

    This should only be run once, prior to connecting to discord.

    Parameters
    ----------
    cli_flags
        Parsed command line flags; ``dev``, ``no_cogs`` and ``load_cogs`` are read here
        and the object is passed on to ``init_events``.
    """
    await self._maybe_update_config()
    self.description = await self._config.description()

    init_global_checks(self)
    init_events(self, cli_flags)

    # Fall back to the owner stored in config when none was supplied at construction.
    if self.owner_id is None:
        self.owner_id = await self._config.owner()

    i18n_locale = await self._config.locale()
    i18n.set_locale(i18n_locale)

    self.add_cog(Core(self))
    self.add_cog(CogManagerUI())
    self.add_command(license_info_command)
    if cli_flags.dev:
        self.add_cog(Dev())

    await modlog._init(self)
    bank._init()

    packages = []

    last_system_info = await self._config.last_system_info()

    async def notify_owners(content: str) -> None:
        # Send ``content`` to every owner notification destination, substituting
        # ``{prefix}`` with the first valid prefix for that destination.
        destinations = await self.get_owner_notification_destinations()
        for destination in destinations:
            prefixes = await self.get_valid_prefixes(getattr(destination, "guild", None))
            prefix = prefixes[0]
            try:
                await destination.send(content.format(prefix=prefix))
            except Exception as _exc:
                # Best effort: a failed notification is logged and the loop continues.
                log.exception(
                    f"I could not send an owner notification to ({destination.id}){destination}"
                )

    # Compare the running (major, minor) Python version with the one recorded last time.
    ver_info = list(sys.version_info[:2])
    python_version_changed = False
    LIB_PATH = cog_data_path(raw_name="Downloader") / "lib"
    if ver_info != last_system_info["python_version"]:
        await self._config.last_system_info.python_version.set(ver_info)
        # NOTE(review): ``iterdir()`` raises if LIB_PATH does not exist — confirm the
        # lib folder is always created before this point.
        if any(LIB_PATH.iterdir()):
            # Wipe the installed libraries and recreate an empty lib folder.
            shutil.rmtree(str(LIB_PATH))
            LIB_PATH.mkdir()
            self.loop.create_task(
                notify_owners(
                    "We detected a change in minor Python version"
                    " and cleared packages in lib folder.\n"
                    "The instance was started with no cogs, please load Downloader"
                    " and use `{prefix}cog reinstallreqs` to regenerate lib folder."
                    " After that, restart the bot to get"
                    " all of your previously loaded cogs loaded again."
                )
            )
            python_version_changed = True
    else:
        # Packages are only queued for loading when the Python version is unchanged.
        if cli_flags.no_cogs is False:
            packages.extend(await self._config.packages())

        if cli_flags.load_cogs:
            packages.extend(cli_flags.load_cogs)

    # Track machine/OS changes; a first run (stored value ``None``) only records the value.
    system_changed = False
    machine = platform.machine()
    system = platform.system()
    if last_system_info["machine"] is None:
        await self._config.last_system_info.machine.set(machine)
    elif last_system_info["machine"] != machine:
        await self._config.last_system_info.machine.set(machine)
        system_changed = True

    if last_system_info["system"] is None:
        await self._config.last_system_info.system.set(system)
    elif last_system_info["system"] != system:
        await self._config.last_system_info.system.set(system)
        system_changed = True

    # Skip this notice when the Python-version notice was already scheduled.
    if system_changed and not python_version_changed:
        self.loop.create_task(
            notify_owners(
                "We detected a possible change in machine's operating system"
                " or architecture. You might need to regenerate your lib folder"
                " if 3rd-party cogs stop working properly.\n"
                "To regenerate lib folder, load Downloader and use `{prefix}cog reinstallreqs`."
            )
        )

    if packages:
        # Load permissions first, for security reasons
        try:
            packages.remove("permissions")
        except ValueError:
            pass
        else:
            packages.insert(0, "permissions")

        to_remove = []
        print("Loading packages...")
        for package in packages:
            try:
                spec = await self._cog_mgr.find_cog(package)
                # Each package gets at most 30 seconds to load.
                await asyncio.wait_for(self.load_extension(spec), 30)
            except asyncio.TimeoutError:
                # Timed-out packages stay in the stored package list.
                log.exception("Failed to load package %s (timeout)", package)
                to_remove.append(package)
            except Exception as e:
                # Any other failure also drops the package from the stored list.
                log.exception("Failed to load package {}".format(package), exc_info=e)
                await self.remove_loaded_package(package)
                to_remove.append(package)
        # Removal is deferred so ``packages`` is not mutated while being iterated.
        for package in to_remove:
            packages.remove(package)
        if packages:
            print("Loaded packages: " + ", ".join(packages))

    if self.rpc_enabled:
        await self.rpc.initialize(self.rpc_port)
async def start(self, *args, **kwargs):
    """Run `pre_flight` with the required ``cli_flags`` keyword, then start the parent bot.

    ``cli_flags`` is removed from ``kwargs`` before the remaining arguments
    are forwarded to the parent ``start``.
    """
    flags = kwargs.pop("cli_flags")
    await self.pre_flight(cli_flags=flags)
    outcome = await super().start(*args, **kwargs)
    return outcome
async def send_help_for(
    self, ctx: commands.Context, help_for: Union[commands.Command, commands.GroupMixin, str]
):
    """
    Send help for ``help_for`` in the given context using Red's help formatter.
    """
    formatter = self._help_formatter
    return await formatter.send_help(ctx, help_for)
async def embed_requested(self, channel, user, command=None) -> bool:
    """
    Determine if an embed is requested for a response.

    In private channels the user's setting is consulted; elsewhere the channel
    setting and then the guild setting. The first setting that is not ``None``
    wins, otherwise the global setting is used.

    Parameters
    ----------
    channel : `discord.abc.GuildChannel` or `discord.abc.PrivateChannel`
        The channel to check embed settings for.
    user : `discord.abc.User`
        The user to check embed settings for.
    command
        (Optional) the command ran.

    Returns
    -------
    bool
        :code:`True` if an embed is requested
    """
    if isinstance(channel, discord.abc.PrivateChannel):
        override = await self._config.user(user).embeds()
    else:
        override = await self._config.channel(channel).embeds()
        if override is None:
            override = await self._config.guild(channel.guild).embeds()

    if override is not None:
        return override
    return await self._config.embeds()
async def is_owner(self, user: Union[discord.User, discord.Member]) -> bool:
    """
    Determines if the user should be considered a bot owner.

    This takes into account CLI flags and application ownership.

    By default,
    application team members are not considered owners,
    while individual application owners are.

    Parameters
    ----------
    user: Union[discord.User, discord.Member]

    Returns
    -------
    bool
    """
    user_id = user.id
    if user_id in self._co_owners:
        return True

    if self.owner_id:
        return self.owner_id == user_id
    if self.owner_ids:
        return user_id in self.owner_ids

    # Nothing known yet: ask Discord once and remember the answer.
    app = await self.application_info()
    if not app.team:
        owner_id = app.owner.id
        self.owner_id = owner_id
        return user_id == owner_id

    if self._use_team_features:
        team_ids = {member.id for member in app.team.members}
        self.owner_ids = team_ids
        return user_id in team_ids

    # Team-owned application without team features enabled: nobody qualifies here.
    return False
async def is_admin(self, member: discord.Member) -> bool:
    """Checks if a member holds one of the admin roles configured for their guild."""
    try:
        held_roles = member._roles  # DEP-WARN
        admin_ids = await self._config.guild(member.guild).admin_role()
        if any(held_roles.has(role_id) for role_id in admin_ids):  # DEP-WARN
            return True
    except AttributeError:  # someone passed a webhook to this
        pass
    return False
async def is_mod(self, member: discord.Member) -> bool:
    """Checks if a member holds one of the admin or mod roles configured for their guild."""
    try:
        held_roles = member._roles  # DEP-WARN
        admin_ids = await self._config.guild(member.guild).admin_role()
        if any(held_roles.has(role_id) for role_id in admin_ids):  # DEP-WARN
            return True
        # Mod roles are only fetched when no admin role matched.
        mod_ids = await self._config.guild(member.guild).mod_role()
        if any(held_roles.has(role_id) for role_id in mod_ids):  # DEP-WARN
            return True
    except AttributeError:  # someone passed a webhook to this
        pass
    return False
async def get_admin_roles(self, guild: discord.Guild) -> List[discord.Role]:
    """
    Gets the admin roles for a guild, skipping configured ids the guild cannot resolve.
    """
    configured_ids = await self._config.guild(guild).admin_role()
    resolved = (guild.get_role(role_id) for role_id in configured_ids)
    return [role for role in resolved if role]
async def get_mod_roles(self, guild: discord.Guild) -> List[discord.Role]:
    """
    Gets the mod roles for a guild, skipping configured ids the guild cannot resolve.
    """
    configured_ids = await self._config.guild(guild).mod_role()
    resolved = (guild.get_role(role_id) for role_id in configured_ids)
    return [role for role in resolved if role]
async def get_admin_role_ids(self, guild_id: int) -> List[int]:
    """
    Gets the configured admin role ids for a guild id.
    """
    guild_ref = discord.Object(id=guild_id)
    return await self._config.guild(guild_ref).admin_role()
async def get_mod_role_ids(self, guild_id: int) -> List[int]:
    """
    Gets the configured mod role ids for a guild id.
    """
    guild_ref = discord.Object(id=guild_id)
    return await self._config.guild(guild_ref).mod_role()
async def get_shared_api_tokens(self, service_name: str) -> Dict[str, str]:
    """
    Fetch the shared API tokens stored for a service.

    Parameters
    ----------
    service_name: str
        The service to get tokens for.

    Returns
    -------
    Dict[str, str]
        A Mapping of token names to tokens.
        This mapping exists because some services have multiple tokens.
    """
    service_group = self._config.custom(SHARED_API_TOKENS, service_name)
    return await service_group.all()
async def set_shared_api_tokens(self, service_name: str, **tokens: str):
    """
    Store shared API tokens for a service and announce the update.

    In most cases, this should not be used. Users should instead be using the
    ``set api`` command

    This will not clear existing values not specified.

    After saving, a ``red_api_tokens_update`` event is dispatched with the
    service name and a read-only view of the service's tokens.

    Parameters
    ----------
    service_name: str
        The service to set tokens for
    **tokens
        token_name -> token

    Examples
    --------
    Setting the api_key for youtube from a value in a variable ``my_key``

    >>> await ctx.bot.set_shared_api_tokens("youtube", api_key=my_key)
    """
    service_group = self._config.custom(SHARED_API_TOKENS, service_name)
    async with service_group.all() as stored_tokens:
        stored_tokens.update(tokens)
    read_only_view = MappingProxyType(stored_tokens)
    self.dispatch("red_api_tokens_update", service_name, read_only_view)
async def remove_shared_api_tokens(self, service_name: str, *token_names: str):
    """
    Removes shared API tokens

    Token names which are not stored for the service are ignored.

    After saving, a ``red_api_tokens_update`` event is dispatched with the
    service name and a read-only view of the remaining tokens, matching what
    `set_shared_api_tokens` does, so listeners also learn about removals.

    Parameters
    ----------
    service_name: str
        The service to remove tokens for
    *token_names: str
        The name of each token to be removed

    Examples
    --------
    Removing the api_key for youtube

    >>> await ctx.bot.remove_shared_api_tokens("youtube", "api_key")
    """
    async with self._config.custom(SHARED_API_TOKENS, service_name).all() as group:
        for name in token_names:
            group.pop(name, None)
    # Previously removals were silent while additions were announced.
    self.dispatch("red_api_tokens_update", service_name, MappingProxyType(group))
async def get_context(self, message, *, cls=commands.Context):
    """Build the invocation context via the parent class, defaulting to Red's ``Context``."""
    context = await super().get_context(message, cls=cls)
    return context
async def process_commands(self, message: discord.Message):
    """
    Process a message as a potential command, like the base method does,
    and additionally dispatch ``message_without_command`` for messages
    that did not resolve to a valid command (including all bot-authored
    messages). This lets cogs handle normal messages differently to
    command messages without extra ``get_context`` calls per cog.
    """
    ctx = None
    if not message.author.bot:
        ctx = await self.get_context(message)
        await self.invoke(ctx)

    if ctx is None or ctx.valid is False:
        self.dispatch("message_without_command", message)
@staticmethod
def list_packages():
    """Lists the entries of the ``cogs`` folder, relative to the current working directory."""
    cogs_dir = "cogs"
    return os.listdir(cogs_dir)
async def save_packages_status(self, packages):
    """Overwrite the stored list of loaded packages with ``packages``."""
    stored_packages = self._config.packages
    await stored_packages.set(packages)
async def add_loaded_package(self, pkg_name: str):
    """Append ``pkg_name`` to the stored list of loaded packages unless already present."""
    async with self._config.packages() as loaded:
        if pkg_name in loaded:
            return
        loaded.append(pkg_name)
async def remove_loaded_package(self, pkg_name: str):
    """Remove every occurrence of ``pkg_name`` from the stored list of loaded packages."""
    async with self._config.packages() as loaded:
        for _ in range(loaded.count(pkg_name)):
            loaded.remove(pkg_name)
async def load_extension(self, spec: ModuleSpec):
    """Load the module described by ``spec`` and run its ``setup`` function.

    The extension is registered under the last component of ``spec.name``.
    ``setup`` may be a plain function or a coroutine function. If it raises,
    the module's references and finalizers are cleaned up and the error is
    re-raised.

    Raises
    ------
    errors.PackageAlreadyLoaded
        An extension with the same short name is already loaded.
    discord.ClientException
        The loaded module has no ``setup`` attribute.
    """
    # NB: this completely bypasses `discord.ext.commands.Bot._load_from_module_spec`
    name = spec.name.rpartition(".")[2]
    if name in self.extensions:
        raise errors.PackageAlreadyLoaded(spec)

    module = spec.loader.load_module()
    if not hasattr(module, "setup"):
        del module
        raise discord.ClientException(f"extension {name} does not have a setup function")

    try:
        setup = module.setup
        if asyncio.iscoroutinefunction(setup):
            await setup(self)
        else:
            setup(self)
    except Exception:
        self._remove_module_references(module.__name__)
        self._call_module_finalizers(module, name)
        raise

    # Only reached when setup finished without raising.
    self._BotBase__extensions[name] = module
def remove_cog(self, cogname: str):
    """Remove a cog together with its permissions hooks and RPC handlers.

    Does nothing when no cog with that name is loaded.
    """
    cog = self.get_cog(cogname)
    if cog is None:
        return

    # Each class in the MRO may define its own name-mangled ``__permissions_hook``.
    not_defined = object()
    for klass in inspect.getmro(cog.__class__):
        hook = getattr(cog, f"_{klass.__name__}__permissions_hook", not_defined)
        if hook is not not_defined:
            self.remove_permissions_hook(hook)

    super().remove_cog(cogname)

    cog.requires.reset()

    # RPC handlers are stored under the upper-cased cog name.
    for handler in self.rpc_handlers.pop(cogname.upper(), ()):
        self.unregister_rpc_handler(handler)
async def is_automod_immune(
    self, to_check: Union[discord.Message, commands.Context, discord.abc.User, discord.Role]
) -> bool:
    """
    Checks if the user, message, context, or role should be considered immune from automated
    moderation actions.

    This will return ``False`` in direct messages.

    Parameters
    ----------
    to_check : `discord.Message` or `commands.Context` or `discord.abc.User` or `discord.Role`
        Something to check if it would be immune

    Returns
    -------
    bool
        ``True`` if immune
    """
    guild = getattr(to_check, "guild", None)
    if not guild:
        return False

    if isinstance(to_check, discord.Role):
        ids_to_check = [to_check.id]
    else:
        # Messages and contexts carry the person in ``author``; users are checked directly.
        author = getattr(to_check, "author", to_check)
        try:
            ids_to_check = [r.id for r in author.roles]
        except AttributeError:
            # webhook messages are a user not member,
            # cheaper than isinstance
            if author.bot and author.discriminator == "0000":
                return True  # webhooks require significant permissions to enable.
            # A non-webhook author without roles (a plain user rather than a member)
            # previously left ``ids_to_check`` unbound and crashed below;
            # such an author can still be listed as immune by id.
            ids_to_check = [author.id]
        else:
            ids_to_check.append(author.id)

    immune_ids = await self._config.guild(guild).autoimmune_ids()

    return any(i in immune_ids for i in ids_to_check)
@staticmethod
async def send_filtered(
destination: discord.abc.Messageable,
filter_mass_mentions=True,
filter_roles=True,
filter_invite_links=True,
filter_all_links=False,
**kwargs,
):
"""
This is a convienience wrapper around
discord.abc.Messageable.send
It takes the destination you'd like to send to, which filters to apply
(defaults on mass mentions, and invite links) and any other parameters
normally accepted by destination.send
This should realistically only be used for responding using user provided
input. (unfortunately, including usernames)
Manually crafted messages which dont take any user input have no need of this
Returns
-------
discord.Message
The message that was sent.
"""
content = kwargs.pop("content", None)