From 86e54e2d7a762a097bf02125242b96559f180474 Mon Sep 17 00:00:00 2001 From: Fae Date: Mon, 6 May 2024 19:52:12 -0700 Subject: [PATCH] Add commands for config and permission editing and reloading. (#2397) * Add commands for config and permission editing and reloading. Refactors how options are registered, and adds a new dependency configupdater. * Update example_options.ini * Some fixes for setperms --- config/example_options.ini | 41 +- musicbot/bot.py | 688 +++++++++++++++-- musicbot/config.py | 1448 ++++++++++++++++++++++++++++++------ musicbot/constants.py | 7 + musicbot/constructs.py | 89 ++- musicbot/permissions.py | 621 +++++++++++++--- musicbot/utils.py | 40 +- requirements.txt | 1 + 8 files changed, 2444 insertions(+), 491 deletions(-) diff --git a/config/example_options.ini b/config/example_options.ini index 767d8ce19..99fc63e79 100644 --- a/config/example_options.ini +++ b/config/example_options.ini @@ -102,6 +102,13 @@ UseAutoPlaylist = yes # songs will be played in a sequential order instead. AutoPlaylistRandom = yes +# Enable automatic skip of auto-playlist songs when a user plays a new song. +# This only applies to the current playing song if it was added by the auto-playlist. +AutoPlaylistAutoSkip = yes + +# Remove songs from the auto-playlist if they are found in the song blocklist. +AutoPlaylistRemoveBlocked = yes + # Pause the music when nobody is in a voice channel, until someone joins again. AutoPause = yes @@ -190,8 +197,9 @@ LeaveInactiveVC = no # Default value is 300 seconds. LeaveInactiveVCTimeOut = 300 -# Sets if if the bot should leave immediately once all songs have finished playing. -LeaveAfterSong = no +# If enabled, MusicBot will leave the channel immediately when the song queue is empty. +# Default is set to: no +LeaveAfterQueueEmpty = no # Set a period of seconds that a player can be paused or not playing before it will disconnect. # This setting is independent of LeaveAfterQueueEmpty. @@ -203,14 +211,39 @@ LeavePlayerInactiveFor = 0 # When enabled the bot will automatically rotate between user requested songs. RoundRobinQueue = no -# This setting allows you to quickly enable or disable use of User Blocklist. +# Enable the user block list feature, without emptying the block list. +# Default is set to: yes EnableUserBlocklist = yes # This setting allows you to quickly enable or disable the use of Song Blocklist. -EnableSongBlockList = no +# Default is set to: no +EnableSongBlocklist = no + +# Allow MusicBot to use system ping command to detect network outage and availability. +# This is useful if you keep the bot joined to a channel or playing music 24/7. +# MusicBot must be restarted to enable network testing. +# By default this is disabled. +EnableNetworkChecker = no [Files] +# Configure automatic log file rotation at restart, and limit the number of files kept. +# When disabled, only one log is kept and its contents are replaced each run. +# Default is 0, or disabled. Maximum allowed number is 100." +LogsMaxKept = 0 + +# Configure the log file date format used when LogsMaxKept is enabled. +# If left blank, a warning is logged and the default will be used instead. +# Learn more about time format codes from the tables and data here: +# https://docs.python.org/3/library/datetime.html#strftime-strptime-behavior +# Default value is: ".ended-%Y-%j-%H%m%S" +LogsDateFormat = .ended-%Y-%j-%H%m%S + +# An optional file path to an auto playlist text file. +# Each line of the file will be treated similarly to using the play command. +# Default value is: config/autoplaylist.txt +AutoPlaylistFile = + # Path to your i18n file. Do not set this if you do not know what it does. i18nFile = diff --git a/musicbot/bot.py b/musicbot/bot.py index 775c320d0..a5529b013 100644 --- a/musicbot/bot.py +++ b/musicbot/bot.py @@ -28,6 +28,11 @@ from .aliases import Aliases, AliasesDefault from .config import Config, ConfigDefaults from .constants import ( + DEFAULT_DATA_NAME_CUR_SONG, + DEFAULT_DATA_NAME_QUEUE, + DEFAULT_DATA_NAME_SERVERS, + DEFAULT_OWNER_GROUP_NAME, + DEFAULT_PERMS_GROUP_NAME, DEFAULT_PING_SLEEP, DEFAULT_PING_TARGET, DEFAULT_PING_TIMEOUT, @@ -55,6 +60,7 @@ dev_only, format_size_from_bytes, format_song_duration, + format_time_to_seconds, instance_diff, is_empty_voice_channel, load_file, @@ -117,10 +123,14 @@ def __init__( load_opus_lib() if config_file is None: - config_file = ConfigDefaults.options_file + self._config_file = ConfigDefaults.options_file + else: + self._config_file = config_file if perms_file is None: - perms_file = PermissionsDefaults.perms_file + self._perms_file = PermissionsDefaults.perms_file + else: + self._perms_file = perms_file if aliases_file is None: aliases_file = AliasesDefault.aliases_file @@ -138,9 +148,11 @@ def __init__( self.players: Dict[int, MusicPlayer] = {} self.autojoinable_channels: Set[VoiceableChannel] = set() - self.config = Config(config_file) + self.config = Config(self._config_file) - self.permissions = Permissions(perms_file, grant_all=[self.config.owner_id]) + self.permissions = Permissions(self._perms_file) + # Set the owner ID in case it wasn't auto... + self.permissions.set_owner_id(self.config.owner_id) self.str = Json(self.config.i18n_file) if self.config.usealias: @@ -249,6 +261,10 @@ async def _test_network(self) -> None: A self looping method that tests network connectivity. This will call to the systems ping command and use its return status. """ + if not self.config.enable_network_checker: + log.debug("Network ping test is disabled via config.") + return + if self.logout_called: log.noise("Network ping test is closing down.") # type: ignore[attr-defined] return @@ -265,10 +281,14 @@ async def _test_network(self) -> None: ping_target = DEFAULT_PING_TARGET # Make a ping call based on OS. - ping_path = shutil.which("ping") - if not ping_path: - log.warning("Could not locate path to `ping` system executable.") - ping_path = "ping" + if not hasattr(self, "_mb_ping_exe_path"): + ping_path = shutil.which("ping") + if not ping_path: + log.warning("Could not locate `ping` executable in your environment.") + ping_path = "ping" + setattr(self, "_mb_ping_exe_path", ping_path) + else: + ping_path = getattr(self, "_mb_ping_exe_path", "ping") ping_cmd: List[str] = [] if os.name == "nt": @@ -288,9 +308,25 @@ async def _test_network(self) -> None: stderr=asyncio.subprocess.DEVNULL, ) ping_status = await p.wait() + except FileNotFoundError: + log.error( + "MusicBot could not locate a `ping` command path. Early network outage detection will not function." + "\nMusicBot tried the following command: %s", + " ".join(ping_cmd), + ) + return + except PermissionError: + log.error( + "MusicBot was not allowed to execute the `ping` command. Early network outage detection will not function." + "\nMusicBot tried the following command: %s", + " ".join(ping_cmd), + ) + return except OSError: log.error( - "Your environment may not allow the `ping` system command. Early network outage detection will not function.", + "Your environment may not allow the `ping` system command. Early network outage detection will not function." + "\nMusicBot tried the following command: %s", + " ".join(ping_cmd), exc_info=self.config.debug_mode, ) return @@ -389,8 +425,6 @@ async def _auto_join_channels( # Check guilds for a resumable channel, conditionally override with owner summon. resuming = False for guild in self.guilds: - # TODO: test that this, guild_unavailable hasn't fired in testing yet - # but that don't mean this wont break due to fragile API out-of-order packets... if guild.unavailable: log.warning( "Guild not available, cannot join: %s/%s", guild.id, guild.name @@ -1375,11 +1409,11 @@ async def on_player_entry_added( # if playing auto-playlist track and a user queues a track, # if we're configured to do so, auto skip the auto playlist track. if ( - player.current_entry + self.config.auto_playlist_autoskip + and player.current_entry and player.current_entry.from_auto_playlist and playlist.peek() == entry and not entry.from_auto_playlist - # TODO: and self.config.autoplaylist_autoskip ): log.debug("Automatically skipping auto-playlist entry for queued entry.") player.skip() @@ -1505,9 +1539,7 @@ async def update_now_playing_status(self, set_offline: bool = False) -> None: await self.change_presence(status=status, activity=activity) self.last_status = activity - async def serialize_queue( - self, guild: discord.Guild, *, path: Optional[str] = None - ) -> None: + async def serialize_queue(self, guild: discord.Guild) -> None: """ Serialize the current queue for a server's player to json. """ @@ -1518,8 +1550,7 @@ async def serialize_queue( if not player: return - if path is None: - path = f"data/{guild.id}/queue.json" + path = self.config.data_path.joinpath(str(guild.id), DEFAULT_DATA_NAME_QUEUE) async with self.aiolocks["queue_serialization" + ":" + str(guild.id)]: log.debug("Serializing queue for %s", guild.id) @@ -1532,8 +1563,6 @@ async def deserialize_queue( guild: discord.Guild, voice_client: discord.VoiceClient, playlist: Optional[Playlist] = None, - *, - directory: Optional[str] = None, ) -> Optional[MusicPlayer]: """ Deserialize a saved queue for a server into a MusicPlayer. If no queue is saved, returns None. @@ -1544,23 +1573,20 @@ async def deserialize_queue( if playlist is None: playlist = Playlist(self) - if directory is None: - directory = f"data/{guild.id}/queue.json" + path = self.config.data_path.joinpath(str(guild.id), DEFAULT_DATA_NAME_QUEUE) async with self.aiolocks["queue_serialization:" + str(guild.id)]: - if not os.path.isfile(directory): + if not path.is_file(): return None log.debug("Deserializing queue for %s", guild.id) - with open(directory, "r", encoding="utf8") as f: + with open(path, "r", encoding="utf8") as f: data = f.read() return MusicPlayer.from_json(data, self, voice_client, playlist) - async def write_current_song( - self, guild: discord.Guild, entry: EntryTypes, *, path: Optional[str] = None - ) -> None: + async def write_current_song(self, guild: discord.Guild, entry: EntryTypes) -> None: """ Writes the current song to file """ @@ -1568,8 +1594,7 @@ async def write_current_song( if not player: return - if path is None: - path = f"data/{guild.id}/current.txt" + path = self.config.data_path.joinpath(str(guild.id), DEFAULT_DATA_NAME_CUR_SONG) async with self.aiolocks["current_song:" + str(guild.id)]: log.debug("Writing current song for %s", guild.id) @@ -2068,11 +2093,6 @@ async def _on_ready_once(self) -> None: print(flush=True) - # TODO: if on-demand loading is not good enough, we can load guild specifics here. - # if self.config.enable_options_per_guild: - # for s in self.guilds: - # await self._load_guild_options(s) - # validate bound channels and log them. if self.config.bound_channels: # Get bound channels by ID, and validate that we can use them. @@ -2172,17 +2192,30 @@ async def _on_ready_once(self) -> None: if self.config.show_config_at_start: self._on_ready_log_configs() - # we do this after the config list because it's a lot easier to notice here - if self.config.missing_keys: + # we do this after the config stuff because it's a lot easier to notice here + if self.config.register.ini_missing_options: + missing_list = "\n".join( + str(o) for o in self.config.register.ini_missing_options + ) conf_warn = exceptions.HelpfulError( preface="Detected missing config options!", - issue="Your options.ini file is missing some options. Defaults will be used for this session.\n" - f"Here is a list of options we think are missing:\n {', '.join(self.config.missing_keys)}", + issue=( + "Your config file is missing some options. Defaults will be used for this session.\n" + f"Here is a list of options we think are missing:\n{missing_list}" + ), solution="Check the example_options.ini file for newly added options and copy them to your config.", + footnote="You can also use the `config` command to set the missing options.", ) log.warning(str(conf_warn)[1:]) - # self.loop.create_task(self._on_ready_call_later()) + # Pre-load guild specific data / options. + # TODO: probably change this later for better UI/UX. + if self.config.enable_options_per_guild: + for guild in self.guilds: + # Triggers on-demand task to load data from disk. + self.server_data[guild.id].is_ready() + # context switch to give scheduled task an execution window. + await asyncio.sleep(0) async def _on_ready_always(self) -> None: """ @@ -2225,9 +2258,10 @@ async def _on_ready_ensure_env(self) -> None: """ log.debug("Ensuring data folders exist") for guild in self.guilds: - pathlib.Path(f"data/{guild.id}/").mkdir(exist_ok=True) + self.config.data_path.joinpath(str(guild.id)).mkdir(exist_ok=True) - with open("data/server_names.txt", "w", encoding="utf8") as f: + names_path = self.config.data_path.joinpath(DEFAULT_DATA_NAME_SERVERS) + with open(names_path, "w", encoding="utf8") as f: for guild in sorted(self.guilds, key=lambda s: int(s.id)): f.write(f"{guild.id}: {guild.name}\n") @@ -2724,7 +2758,7 @@ async def cmd_blockuser( async def cmd_blocksong( self, - _player: MusicPlayer, + _player: Optional[MusicPlayer], option: str, leftover_args: List[str], song_subject: str = "", @@ -2773,6 +2807,16 @@ async def cmd_blocksong( f"Subject `{song_subject}` is already in the song block list.\n{status_msg}" ) + # remove song from auto-playlist if it is blocked + if ( + self.config.auto_playlist_remove_on_block + and _player + and _player.current_entry + and song_subject == _player.current_entry.url + and _player.current_entry.from_auto_playlist + ): + await self.remove_url_from_autoplaylist(song_subject) + async with self.aiolocks["song_blocklist"]: self.config.song_blocklist.append_items([song_subject]) @@ -2783,17 +2827,13 @@ async def cmd_blocksong( delete_after=10, ) + # handle "remove" and "-" if not self.config.song_blocklist.is_blocked(song_subject): raise exceptions.CommandError( "The subject is not in the song block list and cannot be removed.", expire_in=10, ) - # TODO: add self.config.autoplaylist_remove_on_block - # if self.config.autoplaylist_remove_on_block - # and song_subject is current_entry.url - # and current_entry.from_auto_playlist - # await self.remove_url_from_autoplaylist(song_subject) async with self.aiolocks["song_blocklist"]: self.config.song_blocklist.remove_items([song_subject]) @@ -4678,7 +4718,7 @@ async def cmd_skip( if permission_force_skip and (force_skip or self.config.legacy_skip): if ( not permission_force_skip - and not permissions.skiplooped + and not permissions.skip_looped and player.repeatsong ): raise exceptions.PermissionsError( @@ -4740,7 +4780,7 @@ async def cmd_skip( ) if skips_remaining <= 0: - if not permissions.skiplooped and player.repeatsong: + if not permissions.skip_looped and player.repeatsong: raise exceptions.PermissionsError( self.str.get( "cmd-skip-vote-noperms-looped-song", @@ -4773,7 +4813,7 @@ async def cmd_skip( ) # TODO: When a song gets skipped, delete the old x needed to skip messages - if not permissions.skiplooped and player.repeatsong: + if not permissions.skip_looped and player.repeatsong: raise exceptions.PermissionsError( self.str.get( "cmd-skip-vote-noperms-looped-song", @@ -4874,6 +4914,249 @@ async def cmd_volume( expire_in=20, ) + @owner_only + async def cmd_config( + self, + user_mentions: List[discord.Member], + channel_mentions: List[discord.abc.GuildChannel], + option: str, + leftover_args: List[str], + ) -> CommandResponse: + """ + Usage: + {command_prefix}config missing + Shows help text about any missing config options. + + {command_prefix}config diff + Lists the names of options which have been changed since loading config file. + + {command_prefix}config list + List the available config options and their sections. + + {command_prefix}config reload + Reload the options.ini file from disk. + + {command_prefix}config help [Section] [Option] + Shows help text for a specific option. + + {command_prefix}config show [Section] [Option] + Display the current value of the option. + + {command_prefix}config save [Section] [Option] + Saves the current current value to the options file. + + {command_prefix}config set [Section] [Option] [value] + Validates the option and sets the config for the session, but not to file. + + This command allows management of MusicBot config options file. + """ + if user_mentions and channel_mentions: + raise exceptions.CommandError( + "Config cannot use channel and user mentions at the same time.", + expire_in=30, + ) + + option = option.lower() + valid_options = [ + "missing", + "diff", + "list", + "save", + "help", + "show", + "set", + "reload", + ] + if option not in valid_options: + raise exceptions.CommandError( + f"Invalid option for command: `{option}`", + expire_in=30, + ) + + # Show missing options with help text. + if option == "missing": + missing = "" + for opt in self.config.register.ini_missing_options: + missing += ( + f"**Missing Option:** `{opt}`\n" + "```" + f"{opt.comment}\n" + f"Default is set to: {opt.default}" + "```\n" + ) + if not missing: + missing = "*All config options are present and accounted for!*" + + return Response( + missing, + delete_after=60, + ) + + # Show options names that have changed since loading. + if option == "diff": + changed = "" + for opt in self.config.register.get_updated_options(): + changed += f"`{str(opt)}`\n" + + if not changed: + changed = "No config options appear to be changed." + else: + changed = f"**Changed Options:**\n{changed}" + + return Response( + changed, + delete_after=60, + ) + + # List all available options. + if option == "list": + non_edit_opts = "" + editable_opts = "" + for opt in self.config.register.option_list: + if opt.editable: + editable_opts += f"`{opt}`\n" + else: + non_edit_opts += f"`{opt}`\n" + + opt_list = ( + f"## Available Options:\n" + f"**Editable Options:**\n{editable_opts}\n" + f"**Manual Edit Only:**\n{non_edit_opts}" + ) + return Response( + opt_list, + delete_after=60, + ) + + # Try to reload options.ini file from disk. + if option == "reload": + try: + new_conf = Config(self._config_file) + await new_conf.async_validate(self) + + self.config = new_conf + + return Response( + "Config options reloaded from file successfully!", + delete_after=30, + ) + except Exception as e: + raise exceptions.CommandError( + f"Unable to reload Config due to the following errror:\n{str(e)}", + expire_in=30, + ) from e + + # sub commands beyond here need 2 leftover_args + if option in ["help", "show", "save"]: + if len(leftover_args) < 2: + raise exceptions.CommandError( + "You must provide a section name and option name for this command.", + expire_in=30, + ) + + # Get the command args from leftovers and check them. + section_arg = leftover_args.pop(0) + option_arg = leftover_args.pop(0) + if user_mentions: + leftover_args += [str(m.id) for m in user_mentions] + if channel_mentions: + leftover_args += [str(ch.id) for ch in channel_mentions] + value_arg = " ".join(leftover_args) + p_opt = self.config.register.get_config_option(section_arg, option_arg) + + if section_arg not in self.config.register.sections: + sects = ", ".join(self.config.register.sections) + raise exceptions.CommandError( + f"The section `{section_arg}` is not available.\n" + f"The available sections are: {sects}", + expire_in=30, + ) + + if p_opt is None: + option_arg = f"[{section_arg}] > {option_arg}" + raise exceptions.CommandError( + f"The option `{option_arg}` is not available.", + expire_in=30, + ) + opt = p_opt + + # Display some commentary about the option and its default. + if option == "help": + default = "\nThis option can only be set by editing the config file." + if opt.editable: + default = f"\nBy default this option is set to: {opt.default}" + return Response( + f"**Option:** `{opt}`\n{opt.comment}{default}", + delete_after=60, + ) + + # Save the current config value to the INI file. + if option == "save": + if not opt.editable: + raise exceptions.CommandError( + f"Option `{opt}` is not editable. Cannot save to disk.", + expire_in=30, + ) + + async with self.aiolocks["config_edit"]: + saved = self.config.save_option(opt) + + if not saved: + raise exceptions.CommandError( + f"Failed to save the option: `{opt}`", + expire_in=30, + ) + return Response( + f"Successfully saved the option: `{opt}`", + delete_after=30, + ) + + # Display the current config and INI file values. + if option == "show": + if not opt.editable: + raise exceptions.CommandError( + f"Option `{opt}` is not editable, value cannot be displayed.", + expire_in=30, + ) + # TODO: perhaps make use of currently unused display value for empty configs. + cur_val, ini_val, _disp_val = self.config.register.get_values(opt) + return Response( + f"**Option:** `{opt}`\n" + f"Current Value: `{cur_val}`\n" + f"INI File Value: `{ini_val}`", + delete_after=30, + ) + + # update a config variable, but don't save it. + if option == "set": + if not opt.editable: + raise exceptions.CommandError( + f"Option `{opt}` is not editable. Cannot update setting.", + expire_in=30, + ) + + if not value_arg: + raise exceptions.CommandError( + "You must provide a section, option, and value for this sub command.", + expire_in=30, + ) + + log.debug("Doing set with on %s == %s", opt, value_arg) + async with self.aiolocks["config_update"]: + updated = self.config.update_option(opt, value_arg) + if not updated: + raise exceptions.CommandError( + f"Option `{opt}` was not updated!", + expire_in=30, + ) + return Response( + f"Option `{opt}` was updated for this session.\n" + f"To save the change use `config save {opt.section} {opt.option}`", + delete_after=30, + ) + + return None + @owner_only async def cmd_option(self, option: str, value: str) -> CommandResponse: """ @@ -5489,24 +5772,280 @@ async def cmd_perms( permissions = self.permissions.for_user(user) if user == author: - lines = [f"Command permissions in {guild.name}\n", "```", "```"] + perms = ( + f"Your command permissions in {guild.name} are:\n" + f"```{permissions.format(for_user=True)}```" + ) else: - lines = [ - f"Command permissions for {user.name} in {guild.name}\n", - "```", - "```", - ] - - for perm in permissions.__dict__: - # TODO: double check this still works as desired. - if perm in ["user_list"] or permissions.__dict__[perm] == set(): - continue - perm_data = permissions.__dict__[perm] - lines.insert(len(lines) - 1, f"{perm}: {perm_data}") + perms = ( + f"The command permissions for {user.name} in {guild.name} are:\n" + f"```{permissions.format()}```" + ) - await self.safe_send_message(author, "\n".join(lines), fallback_channel=channel) + await self.safe_send_message(author, perms, fallback_channel=channel) return Response("\N{OPEN MAILBOX WITH RAISED FLAG}", delete_after=20) + @owner_only + async def cmd_setperms( + self, + user_mentions: List[discord.Member], + leftover_args: List[str], + option: str = "list", + ) -> CommandResponse: + """ + Usage: + {command_prefix}setperms list + show loaded groups and list permission options. + + {command_prefix}setperms reload + reloads permissions from the permissions.ini file. + + {command_prefix}setperms add [GroupName] + add new group with defaults. + + {command_prefix}setperms remove [GroupName] + remove existing group. + + {command_prefix}setperms help [PermName] + show help text for the permission option. + + {command_prefix}setperms show [GroupName] [PermName] + show permission value for given group and permission. + + {command_prefix}setperms save [GroupName] + save permissions group to file. + + {command_prefix}setperms set [GroupName] [PermName] [Value] + set permission value for the group. + """ + if user_mentions: + raise exceptions.CommandError( + "Permissions cannot use channel and user mentions at the same time.", + expire_in=30, + ) + + option = option.lower() + valid_options = [ + "list", + "add", + "remove", + "save", + "help", + "show", + "set", + "reload", + ] + if option not in valid_options: + raise exceptions.CommandError( + f"Invalid option for command: `{option}`", + expire_in=30, + ) + + # Reload the permissions file from disk. + if option == "reload": + try: + new_permissions = Permissions(self._perms_file) + # Set the owner ID in case it wasn't auto... + new_permissions.set_owner_id(self.config.owner_id) + await new_permissions.async_validate(self) + + self.permissions = new_permissions + + return Response( + "Permissions reloaded from file successfully!", + delete_after=30, + ) + except Exception as e: + raise exceptions.CommandError( + f"Unable to reload Permissions due to the following errror:\n{str(e)}", + expire_in=30, + ) from e + + # List permission groups and available permission options. + if option == "list": + gl = [] + for section in self.permissions.register.sections: + gl.append(f"`{section}`\n") + + editable_opts = "" + for opt in self.permissions.register.option_list: + if opt.section != DEFAULT_PERMS_GROUP_NAME: + continue + + # if opt.editable: + editable_opts += f"`{opt.option}`\n" + + groups = "".join(gl) + opt_list = ( + f"## Available Groups:\n{groups}\n" + f"## Available Options:\n" + f"{editable_opts}\n" + ) + return Response( + opt_list, + delete_after=60, + ) + + # sub commands beyond here need 2 leftover_args + if option in ["help", "show", "save", "add", "remove"]: + if len(leftover_args) < 1: + raise exceptions.CommandError( + "You must provide a group or option name for this command.", + expire_in=30, + ) + if option == "set" and len(leftover_args) < 3: + raise exceptions.CommandError( + "You must provide a group, option, and value to set for this command.", + expire_in=30, + ) + + # Get the command args from leftovers and check them. + group_arg = "" + option_arg = "" + if option == "help": + group_arg = DEFAULT_PERMS_GROUP_NAME + option_arg = leftover_args.pop(0) + else: + group_arg = leftover_args.pop(0) + if option in ["set", "show"]: + if not leftover_args: + raise exceptions.CommandError( + f"The {option} sub-command requires a group and permission name.", + expire_in=30, + ) + option_arg = leftover_args.pop(0) + + if user_mentions: + leftover_args += [str(m.id) for m in user_mentions] + value_arg = " ".join(leftover_args) + + if group_arg not in self.permissions.register.sections and option != "add": + sects = ", ".join(self.permissions.register.sections) + raise exceptions.CommandError( + f"The group `{group_arg}` is not available.\n" + f"The available groups are: {sects}", + expire_in=30, + ) + + # Make sure the option is set if the sub-command needs it. + if option in ["help", "set", "show"]: + p_opt = self.permissions.register.get_config_option(group_arg, option_arg) + if p_opt is None: + option_arg = f"[{group_arg}] > {option_arg}" + raise exceptions.CommandError( + f"The permission `{option_arg}` is not available.", + expire_in=30, + ) + opt = p_opt + + # Display some commentary about the option and its default. + if option == "help": + default = ( + "\nThis permission can only be set by editing the permissions file." + ) + # TODO: perhaps use empty display values here. + if opt.editable: + dval = self.permissions.register.to_ini(opt, use_default=True) + default = f"\nBy default this permission is set to: `{dval}`" + return Response( + f"**Permission:** `{opt.option}`\n{opt.comment}{default}", + delete_after=60, + ) + + if option == "add": + if group_arg in self.permissions.register.sections: + raise exceptions.CommandError( + f"Cannot add group `{group_arg}` it already exists.", + expire_in=30, + ) + async with self.aiolocks["permission_edit"]: + self.permissions.add_group(group_arg) + + return Response( + f"Successfully added new group: `{group_arg}`\n" + f"You can now customizse the permissions with: `setperms set {group_arg}`\n" + f"Make sure to save the new group with: `setperms save {group_arg}`", + delete_after=30, + ) + + if option == "remove": + if group_arg in [DEFAULT_OWNER_GROUP_NAME, DEFAULT_PERMS_GROUP_NAME]: + raise exceptions.CommandError( + "Cannot remove built-in group.", expire_in=30 + ) + + async with self.aiolocks["permission_edit"]: + self.permissions.remove_group(group_arg) + + return Response( + f"Successfully removed group: `{group_arg}`" + f"Make sure to save this change with: `setperms save {group_arg}`", + delete_after=30, + ) + + # Save the current config value to the INI file. + if option == "save": + if group_arg == DEFAULT_OWNER_GROUP_NAME: + raise exceptions.CommandError( + "The owner group is not editable.", + expire_in=30, + ) + + async with self.aiolocks["permission_edit"]: + saved = self.permissions.save_group(group_arg) + + if not saved: + raise exceptions.CommandError( + f"Failed to save the group: `{group_arg}`", + expire_in=30, + ) + return Response( + f"Successfully saved the group: `{group_arg}`", + delete_after=30, + ) + + # Display the current permissions group and INI file values. + if option == "show": + cur_val, ini_val, empty_display_val = self.permissions.register.get_values( + opt + ) + return Response( + f"**Permission:** `{opt}`\n" + f"Current Value: `{cur_val}` {empty_display_val}\n" + f"INI File Value: `{ini_val}`", + delete_after=30, + ) + + # update a permission, but don't save it. + if option == "set": + if group_arg == DEFAULT_OWNER_GROUP_NAME: + raise exceptions.CommandError( + "The owner group is not editable.", + expire_in=30, + ) + + if not value_arg: + raise exceptions.CommandError( + "You must provide a section, option, and value for this sub command.", + expire_in=30, + ) + + log.debug("Doing set with on %s == %s", opt, value_arg) + async with self.aiolocks["permission_update"]: + updated = self.permissions.update_option(opt, value_arg) + if not updated: + raise exceptions.CommandError( + f"Permission `{opt}` was not updated!", + expire_in=30, + ) + return Response( + f"Permission `{opt}` was updated for this session.\n" + f"To save the change use `setperms save {opt.section} {opt.option}`", + delete_after=30, + ) + + return None + @owner_only async def cmd_setname(self, leftover_args: List[str], name: str) -> CommandResponse: """ @@ -6309,24 +6848,9 @@ async def on_message(self, message: discord.Message) -> None: handler_kwargs[key] = arg_value params.pop(key) + # Test non-owners for command permissions. if message.author.id != self.config.owner_id: - if ( - user_permissions.command_whitelist - and command not in user_permissions.command_whitelist - ): - raise exceptions.PermissionsError( - f"This command is not enabled for your group ({user_permissions.name}).", - expire_in=20, - ) - - if ( - user_permissions.command_blacklist - and command in user_permissions.command_blacklist - ): - raise exceptions.PermissionsError( - f"This command is disabled for your group ({user_permissions.name}).", - expire_in=20, - ) + user_permissions.can_use_command(command) # Invalid usage, return docstring if params: @@ -6706,7 +7230,7 @@ async def on_guild_join(self, guild: discord.Guild) -> None: ) log.debug("Creating data folder for guild %s", guild.id) - pathlib.Path(f"data/{guild.id}/").mkdir(exist_ok=True) + self.config.data_path.joinpath(str(guild.id)).mkdir(exist_ok=True) async def on_guild_remove(self, guild: discord.Guild) -> None: """ diff --git a/musicbot/config.py b/musicbot/config.py index 8c9b234d0..8e863de71 100644 --- a/musicbot/config.py +++ b/musicbot/config.py @@ -1,31 +1,63 @@ import configparser +import datetime import logging import os import pathlib import shutil import sys -from typing import TYPE_CHECKING, Any, Iterable, List, Optional, Set, Tuple, Union +from typing import ( + TYPE_CHECKING, + Dict, + Iterable, + List, + Optional, + Set, + Tuple, + Union, + overload, +) + +import configupdater from .constants import ( BUNDLED_AUTOPLAYLIST_FILE, DEFAULT_AUDIO_CACHE_PATH, DEFAULT_AUTOPLAYLIST_FILE, + DEFAULT_DATA_NAME_SERVERS, + DEFAULT_DATA_PATH, DEFAULT_FOOTER_TEXT, DEFAULT_I18N_FILE, DEFAULT_LOG_LEVEL, + DEFAULT_LOGS_KEPT, + DEFAULT_LOGS_ROTATE_FORMAT, DEFAULT_OPTIONS_FILE, DEFAULT_SONG_BLOCKLIST_FILE, DEFAULT_USER_BLOCKLIST_FILE, DEPRECATED_USER_BLACKLIST, EXAMPLE_OPTIONS_FILE, + MAXIMUM_LOGS_LIMIT, ) from .exceptions import HelpfulError -from .utils import format_size_to_bytes, format_time_to_seconds, set_logging_level +from .utils import ( + format_size_from_bytes, + format_size_to_bytes, + format_time_to_seconds, + set_logging_level, + set_logging_max_kept_logs, + set_logging_rotate_date_format, +) if TYPE_CHECKING: import discord from .bot import MusicBot + from .permissions import Permissions + +# Type for ConfigParser.get(... vars) argument +ConfVars = Optional[Dict[str, str]] +# Types considered valid for config options. +DebugLevel = Tuple[str, int] +RegTypes = Union[str, int, bool, float, Set[int], Set[str], DebugLevel, pathlib.Path] log = logging.getLogger(__name__) @@ -47,6 +79,13 @@ def create_file_ifnoexist( class Config: + """ + This object is responsible for loading and validating config, using default + values where needed. It provides interfaces to read and set the state of + config values, and finally a method to update the config file with values + from this instance of config. + """ + def __init__(self, config_file: pathlib.Path) -> None: """ Handles locating, initializing, loading, and validating config data. @@ -61,291 +100,617 @@ def __init__(self, config_file: pathlib.Path) -> None: self.config_file = config_file self.find_config() + # TODO: maybe some mechanism to handle config renames? config = ExtendedConfigParser() config.read(config_file, encoding="utf-8") - - confsections = { - "Credentials", - "Permissions", - "Chat", - "MusicBot", - "Files", - }.difference(config.sections()) - if confsections: - sections_str = ", ".join([f"[{s}]" for s in confsections]) - raise HelpfulError( - "One or more required config sections are missing.", - "Fix your config. Each [Section] should be on its own line with " - f"nothing else on it. The following sections are missing: {sections_str}", - preface="An error has occured parsing the config:\n", - ) + self.register = ConfigOptionRegistry(self, config) self._confpreface = "An error has occured reading the config:\n" self._confpreface2 = "An error has occured validating the config:\n" - self._login_token: str = config.get( - "Credentials", "Token", fallback=ConfigDefaults.token + # DebugLevel is important for feedback, so we load it first. + self._debug_level: DebugLevel = self.register.init_option( + section="MusicBot", + option="DebugLevel", + dest="_debug_level", + default=ConfigDefaults._debug_level(), + getter="getdebuglevel", + comment=( + "Set the log verbosity of MusicBot. Normally this should be set to INFO.\n" + "It can be set to one of the following:\n" + " CRITICAL, ERROR, WARNING, INFO, DEBUG, VOICEDEBUG, FFMPEG, NOISY, or EVERYTHING" + ), + editable=False, ) + self.debug_level_str: str = self._debug_level[0] + self.debug_level: int = self._debug_level[1] + self.debug_mode: bool = self.debug_level <= logging.DEBUG + set_logging_level(self.debug_level) + # This gets filled in later while checking for token in the environment vars. self.auth: Tuple[str] = ("",) + self._login_token: str = self.register.init_option( + section="Credentials", + option="Token", + dest="_login_token", + getter="get", + default=ConfigDefaults.token, + comment="Discord bot authentication token for your Bot. Visit Discord Developer Portal to create a bot App and generate your Token. Never publish your bot token!", + editable=False, + ) - self.spotify_clientid = config.get( - "Credentials", "Spotify_ClientID", fallback=ConfigDefaults.spotify_clientid + self.spotify_clientid = self.register.init_option( + section="Credentials", + option="Spotify_ClientID", + dest="spotify_clientid", + default=ConfigDefaults.spotify_clientid, + comment="Provide an optional Spotify Client ID to enable MusicBot to interact with Spotify API.", + editable=False, ) - self.spotify_clientsecret = config.get( - "Credentials", - "Spotify_ClientSecret", - fallback=ConfigDefaults.spotify_clientsecret, + self.spotify_clientsecret = self.register.init_option( + section="Credentials", + option="Spotify_ClientSecret", + dest="spotify_clientsecret", + default=ConfigDefaults.spotify_clientsecret, + comment="Provide an optional Spotify Client Secret to enable MusicBot to interact with Spotify API.", + editable=False, ) - self.owner_id: int = config.getownerid( - "Permissions", "OwnerID", fallback=ConfigDefaults.owner_id + self.owner_id: int = self.register.init_option( + section="Permissions", + option="OwnerID", + dest="owner_id", + default=ConfigDefaults.owner_id, + comment="Provide a Discord User ID number or the word 'auto' to set the owner of this bot.", + getter="getownerid", + editable=False, ) - self.dev_ids: Set[int] = config.getidset( - "Permissions", "DevIDs", fallback=ConfigDefaults.dev_ids + self.dev_ids: Set[int] = self.register.init_option( + section="Permissions", + option="DevIDs", + dest="dev_ids", + default=ConfigDefaults.dev_ids, + comment=( + "A list of Discord User ID numbers who can remotely execute code using MusicBot dev-only commands. " + "Warning, you should only set this if you plan to do development of MusicBot!" + ), + getter="getidset", + editable=False, ) - self.bot_exception_ids = config.getidset( - "Permissions", "BotExceptionIDs", fallback=ConfigDefaults.bot_exception_ids + + self.bot_exception_ids: Set[int] = self.register.init_option( + section="Permissions", + option="BotExceptionIDs", + dest="bot_exception_ids", + getter="getidset", + default=ConfigDefaults.bot_exception_ids, + comment="Discord Member IDs for other bots that MusicBot should not ignore. All bots are ignored by default.", ) - self.command_prefix = config.get( - "Chat", "CommandPrefix", fallback=ConfigDefaults.command_prefix + self.command_prefix: str = self.register.init_option( + section="Chat", + option="CommandPrefix", + dest="command_prefix", + default=ConfigDefaults.command_prefix, + comment="Command prefix is how all MusicBot commands must be started", ) - self.bound_channels = config.getidset( - "Chat", "BindToChannels", fallback=ConfigDefaults.bound_channels + self.bound_channels: Set[int] = self.register.init_option( + section="Chat", + option="BindToChannels", + dest="bound_channels", + default=ConfigDefaults.bound_channels, + getter="getidset", + comment=( + "ID numbers for text channels that MusicBot should exclusively use for commands." + " All channels are used if this is not set." + ), ) - self.unbound_servers = config.getboolean( - "Chat", "AllowUnboundServers", fallback=ConfigDefaults.unbound_servers + self.unbound_servers: bool = self.register.init_option( + section="Chat", + option="AllowUnboundServers", + dest="unbound_servers", + default=ConfigDefaults.unbound_servers, + getter="getboolean", + comment="Allow MusicBot to respond in all text channels of a server, when no channels are set in BindToChannels option.", ) - self.autojoin_channels = config.getidset( - "Chat", "AutojoinChannels", fallback=ConfigDefaults.autojoin_channels + self.autojoin_channels: Set[int] = self.register.init_option( + section="Chat", + option="AutojoinChannels", + dest="autojoin_channels", + default=ConfigDefaults.autojoin_channels, + getter="getidset", + comment="A list of Voice Channel IDs that MusicBot should automatically join on start up.", ) - self.dm_nowplaying = config.getboolean( - "Chat", "DMNowPlaying", fallback=ConfigDefaults.dm_nowplaying + self.dm_nowplaying: bool = self.register.init_option( + section="Chat", + option="DMNowPlaying", + dest="dm_nowplaying", + default=ConfigDefaults.dm_nowplaying, + getter="getboolean", + comment="MusicBot will try to send Now Playing notices directly to the member who requested the song instead of posting in server channel.", ) - self.no_nowplaying_auto = config.getboolean( - "Chat", - "DisableNowPlayingAutomatic", - fallback=ConfigDefaults.no_nowplaying_auto, + self.no_nowplaying_auto: bool = self.register.init_option( + section="Chat", + option="DisableNowPlayingAutomatic", + dest="no_nowplaying_auto", + default=ConfigDefaults.no_nowplaying_auto, + getter="getboolean", + comment="Disable now playing messages for songs played via auto playlist.", ) - self.nowplaying_channels = config.getidset( - "Chat", "NowPlayingChannels", fallback=ConfigDefaults.nowplaying_channels + self.nowplaying_channels: Set[int] = self.register.init_option( + section="Chat", + option="NowPlayingChannels", + dest="nowplaying_channels", + default=ConfigDefaults.nowplaying_channels, + getter="getidset", + comment="Forces MusicBot to use a specific channel to send now playing messages. One text channel ID per server.", ) - self.delete_nowplaying = config.getboolean( - "Chat", "DeleteNowPlaying", fallback=ConfigDefaults.delete_nowplaying + self.delete_nowplaying: bool = self.register.init_option( + section="Chat", + option="DeleteNowPlaying", + dest="delete_nowplaying", + default=ConfigDefaults.delete_nowplaying, + getter="getboolean", + comment="MusicBot will automatically delete Now Playing messages.", ) - self.default_volume = config.getfloat( - "MusicBot", "DefaultVolume", fallback=ConfigDefaults.default_volume + self.default_volume: float = self.register.init_option( + section="MusicBot", + option="DefaultVolume", + dest="default_volume", + default=ConfigDefaults.default_volume, + getter="getpercent", + comment=( + "Sets the default volume level MusicBot will play songs at. " + "Must be a value from 0 to 1 inclusive." + ), + ) + self.skips_required: int = self.register.init_option( + section="MusicBot", + option="SkipsRequired", + dest="skips_required", + default=ConfigDefaults.skips_required, + getter="getint", + comment=( + "Number of members required to skip a song. " + "Acts as a minimum when SkipRatio would require more votes." + ), + ) + self.skip_ratio_required: float = self.register.init_option( + section="MusicBot", + option="SkipRatio", + dest="skip_ratio_required", + default=ConfigDefaults.skip_ratio_required, + getter="getpercent", + comment="This percent of listeners must vote for skip. If SkipsRequired is lower it will be used instead.", ) - self.skips_required = config.getint( - "MusicBot", "SkipsRequired", fallback=ConfigDefaults.skips_required + self.save_videos: bool = self.register.init_option( + section="MusicBot", + option="SaveVideos", + dest="save_videos", + default=ConfigDefaults.save_videos, + getter="getboolean", + comment="Allow MusicBot to keep downloaded media, or delete it right away.", ) - self.skip_ratio_required = config.getfloat( - "MusicBot", "SkipRatio", fallback=ConfigDefaults.skip_ratio_required + self.storage_limit_bytes: int = self.register.init_option( + section="MusicBot", + option="StorageLimitBytes", + dest="storage_limit_bytes", + default=ConfigDefaults.storage_limit_bytes, + getter="getdatasize", + comment="If SaveVideos is enabled, set a limit on how much storage space should be used.", ) - self.save_videos = config.getboolean( - "MusicBot", "SaveVideos", fallback=ConfigDefaults.save_videos + self.storage_limit_days: int = self.register.init_option( + section="MusicBot", + option="StorageLimitDays", + dest="storage_limit_days", + default=ConfigDefaults.storage_limit_days, + getter="getint", + comment="If SaveVideos is enabled, set a limit on how long files should be kept.", ) - self.storage_limit_bytes = config.getdatasize( - "MusicBot", "StorageLimitBytes", fallback=ConfigDefaults.storage_limit_bytes + self.storage_retain_autoplay: bool = self.register.init_option( + section="MusicBot", + option="StorageRetainAutoPlay", + dest="storage_retain_autoplay", + default=ConfigDefaults.storage_retain_autoplay, + getter="getboolean", + comment="If SaveVideos is enabled, never purge auto playlist songs from the cache.", ) - self.storage_limit_days = config.getint( - "MusicBot", "StorageLimitDays", fallback=ConfigDefaults.storage_limit_days + self.now_playing_mentions: bool = self.register.init_option( + section="MusicBot", + option="NowPlayingMentions", + dest="now_playing_mentions", + default=ConfigDefaults.now_playing_mentions, + getter="getboolean", + comment="Mention the user who added the song when it is played.", ) - self.storage_retain_autoplay = config.getboolean( - "MusicBot", - "StorageRetainAutoPlay", - fallback=ConfigDefaults.storage_retain_autoplay, + self.auto_summon: bool = self.register.init_option( + section="MusicBot", + option="AutoSummon", + dest="auto_summon", + default=ConfigDefaults.auto_summon, + getter="getboolean", + comment="Automatically join the owner if they are in an accessible voice channel when bot starts.", ) - self.now_playing_mentions = config.getboolean( - "MusicBot", - "NowPlayingMentions", - fallback=ConfigDefaults.now_playing_mentions, + self.auto_playlist: bool = self.register.init_option( + section="MusicBot", + option="UseAutoPlaylist", + dest="auto_playlist", + default=ConfigDefaults.auto_playlist, + getter="getboolean", + comment="Enable MusicBot to automatically play music from the autoplaylist.txt", ) - self.auto_summon = config.getboolean( - "MusicBot", "AutoSummon", fallback=ConfigDefaults.auto_summon + self.auto_playlist_random: bool = self.register.init_option( + section="MusicBot", + option="AutoPlaylistRandom", + dest="auto_playlist_random", + default=ConfigDefaults.auto_playlist_random, + getter="getboolean", + comment="Shuffles the autoplaylist tracks before playing them.", ) - self.auto_playlist = config.getboolean( - "MusicBot", "UseAutoPlaylist", fallback=ConfigDefaults.auto_playlist + self.auto_playlist_autoskip: bool = self.register.init_option( + section="MusicBot", + option="AutoPlaylistAutoSkip", + dest="auto_playlist_autoskip", + default=ConfigDefaults.auto_playlist_autoskip, + getter="getboolean", + comment=( + "Enable automatic skip of auto-playlist songs when a user plays a new song.\n" + "This only applies to the current playing song if it was added by the auto-playlist." + ), ) - self.auto_playlist_random = config.getboolean( - "MusicBot", - "AutoPlaylistRandom", - fallback=ConfigDefaults.auto_playlist_random, + # TODO: this option needs more implementation to ensure blocked tracks are removed. + self.auto_playlist_remove_on_block: bool = self.register.init_option( + section="MusicBot", + option="AutoPlaylistRemoveBlocked", + dest="auto_playlist_remove_on_block", + default=ConfigDefaults.auto_playlist_remove_on_block, + getter="getboolean", + comment="Remove songs from the auto-playlist if they are found in the song blocklist.", ) - self.auto_pause = config.getboolean( - "MusicBot", "AutoPause", fallback=ConfigDefaults.auto_pause + self.auto_pause: bool = self.register.init_option( + section="MusicBot", + option="AutoPause", + dest="auto_pause", + default=ConfigDefaults.auto_pause, + getter="getboolean", + comment="MusicBot will automatically pause playback when no users are listening.", ) - self.delete_messages = config.getboolean( - "MusicBot", "DeleteMessages", fallback=ConfigDefaults.delete_messages + self.delete_messages: bool = self.register.init_option( + section="MusicBot", + option="DeleteMessages", + dest="delete_messages", + default=ConfigDefaults.delete_messages, + getter="getboolean", + comment="Allow MusicBot to automatically delete messages it sends, after a short delay.", ) - self.delete_invoking = config.getboolean( - "MusicBot", "DeleteInvoking", fallback=ConfigDefaults.delete_invoking + self.delete_invoking: bool = self.register.init_option( + section="MusicBot", + option="DeleteInvoking", + dest="delete_invoking", + default=ConfigDefaults.delete_invoking, + getter="getboolean", + comment="Auto delete valid commands after a short delay.", ) - self.persistent_queue = config.getboolean( - "MusicBot", "PersistentQueue", fallback=ConfigDefaults.persistent_queue + self.persistent_queue: bool = self.register.init_option( + section="MusicBot", + option="PersistentQueue", + dest="persistent_queue", + default=ConfigDefaults.persistent_queue, + getter="getboolean", + comment="Allow MusicBot to save the song queue, so they will survive restarts.", ) - self.status_message = config.get( - "MusicBot", "StatusMessage", fallback=ConfigDefaults.status_message + self.status_message: str = self.register.init_option( + section="MusicBot", + option="StatusMessage", + dest="status_message", + default=ConfigDefaults.status_message, + comment="Set a custom status text instead of showing dynamic info about what is playing in bot's activity status.", ) - self.write_current_song = config.getboolean( - "MusicBot", "WriteCurrentSong", fallback=ConfigDefaults.write_current_song + self.write_current_song: bool = self.register.init_option( + section="MusicBot", + option="WriteCurrentSong", + dest="write_current_song", + default=ConfigDefaults.write_current_song, + getter="getboolean", + comment="If enabled, MusicBot will save the track title to: data/{server_ID}/current.txt", ) - self.allow_author_skip = config.getboolean( - "MusicBot", "AllowAuthorSkip", fallback=ConfigDefaults.allow_author_skip + self.allow_author_skip: bool = self.register.init_option( + section="MusicBot", + option="AllowAuthorSkip", + dest="allow_author_skip", + default=ConfigDefaults.allow_author_skip, + getter="getboolean", + comment="Allow the member who requested the song to skip it, bypassing votes.", ) - self.use_experimental_equalization = config.getboolean( - "MusicBot", - "UseExperimentalEqualization", - fallback=ConfigDefaults.use_experimental_equalization, + self.use_experimental_equalization: bool = self.register.init_option( + section="MusicBot", + option="UseExperimentalEqualization", + dest="use_experimental_equalization", + default=ConfigDefaults.use_experimental_equalization, + getter="getboolean", + comment="Tries to use ffmpeg to get volume normalizing options for use in playback.", ) - self.embeds = config.getboolean( - "MusicBot", "UseEmbeds", fallback=ConfigDefaults.embeds + self.embeds: bool = self.register.init_option( + section="MusicBot", + option="UseEmbeds", + dest="embeds", + default=ConfigDefaults.embeds, + getter="getboolean", + comment="Allow MusicBot to format it's messages as embeds.", ) - self.queue_length = config.getint( - "MusicBot", "QueueLength", fallback=ConfigDefaults.queue_length + self.queue_length: int = self.register.init_option( + section="MusicBot", + option="QueueLength", + dest="queue_length", + default=ConfigDefaults.queue_length, + getter="getint", + comment="The number of entries to show per-page when using q command to list the queue.", ) - self.remove_ap = config.getboolean( - "MusicBot", "RemoveFromAPOnError", fallback=ConfigDefaults.remove_ap + self.remove_ap: bool = self.register.init_option( + section="MusicBot", + option="RemoveFromAPOnError", + dest="remove_ap", + default=ConfigDefaults.remove_ap, + getter="getboolean", + comment="Enable MusicBot to automatically remove unplayable entries from tha auto playlist.", ) - self.show_config_at_start = config.getboolean( - "MusicBot", - "ShowConfigOnLaunch", - fallback=ConfigDefaults.show_config_at_start, + self.show_config_at_start: bool = self.register.init_option( + section="MusicBot", + option="ShowConfigOnLaunch", + dest="show_config_at_start", + default=ConfigDefaults.show_config_at_start, + getter="getboolean", + comment="Display MusicBot config settings in the logs at startup.", ) - self.legacy_skip = config.getboolean( - "MusicBot", "LegacySkip", fallback=ConfigDefaults.legacy_skip + self.legacy_skip: bool = self.register.init_option( + section="MusicBot", + option="LegacySkip", + dest="legacy_skip", + default=ConfigDefaults.legacy_skip, + getter="getboolean", + comment="Enable users with the InstaSkip permission to bypass skip voting and force skips.", ) - self.leavenonowners = config.getboolean( - "MusicBot", - "LeaveServersWithoutOwner", - fallback=ConfigDefaults.leavenonowners, + self.leavenonowners: bool = self.register.init_option( + section="MusicBot", + option="LeaveServersWithoutOwner", + dest="leavenonowners", + default=ConfigDefaults.leavenonowners, + getter="getboolean", + comment="If enabled, MusicBot will leave servers if the owner is not in their member list.", ) - self.usealias = config.getboolean( - "MusicBot", "UseAlias", fallback=ConfigDefaults.usealias + self.usealias: bool = self.register.init_option( + section="MusicBot", + option="UseAlias", + dest="usealias", + default=ConfigDefaults.usealias, + getter="getboolean", + comment="If enabled, MusicBot will allow commands to have multiple names using data in: config/aliases.json", ) - self.footer_text = config.get( - "MusicBot", "CustomEmbedFooter", fallback=ConfigDefaults.footer_text + self.footer_text: str = self.register.init_option( + section="MusicBot", + option="CustomEmbedFooter", + dest="footer_text", + default=ConfigDefaults.footer_text, + comment="Replace MusicBot name/version in embed footer with custom text. Only applied when UseEmbeds is enabled and it is not blank.", ) - self.self_deafen = config.getboolean( - "MusicBot", "SelfDeafen", fallback=ConfigDefaults.self_deafen + self.self_deafen: bool = self.register.init_option( + section="MusicBot", + option="SelfDeafen", + dest="self_deafen", + default=ConfigDefaults.self_deafen, + getter="getboolean", + comment="MusicBot will automatically deafen itself when entering a voice channel.", ) - self.leave_inactive_channel = config.getboolean( - "MusicBot", - "LeaveInactiveVC", - fallback=ConfigDefaults.leave_inactive_channel, + self.leave_inactive_channel: bool = self.register.init_option( + section="MusicBot", + option="LeaveInactiveVC", + dest="leave_inactive_channel", + default=ConfigDefaults.leave_inactive_channel, + getter="getboolean", + comment="If enabled, MusicBot will leave a voice channel when no users are listening, after waiting for a period set in LeaveInactiveVCTimeOut.", ) - self.leave_inactive_channel_timeout = config.getduration( - "MusicBot", - "LeaveInactiveVCTimeOut", - fallback=ConfigDefaults.leave_inactive_channel_timeout, + self.leave_inactive_channel_timeout: float = self.register.init_option( + section="MusicBot", + option="LeaveInactiveVCTimeOut", + dest="leave_inactive_channel_timeout", + default=ConfigDefaults.leave_inactive_channel_timeout, + getter="getduration", + comment=( + "Set a period of time to wait before leaving an inactive voice channel. " + "You can set this to a number of seconds or phrase like: 4 hours" + ), ) - self.leave_after_queue_empty = config.getboolean( - "MusicBot", - "LeaveAfterSong", - fallback=ConfigDefaults.leave_after_queue_empty, + self.leave_after_queue_empty: bool = self.register.init_option( + section="MusicBot", + option="LeaveAfterQueueEmpty", + dest="leave_after_queue_empty", + default=ConfigDefaults.leave_after_queue_empty, + getter="getboolean", + comment="If enabled, MusicBot will leave the channel immediately when the song queue is empty.", ) - self.leave_player_inactive_for = config.getduration( - "MusicBot", - "LeavePlayerInactiveFor", - fallback=ConfigDefaults.leave_player_inactive_for, + self.leave_player_inactive_for: float = self.register.init_option( + section="MusicBot", + option="LeavePlayerInactiveFor", + dest="leave_player_inactive_for", + default=ConfigDefaults.leave_player_inactive_for, + getter="getduration", + comment="MusicBot will wait for this period of time before leaving voice channel when player is not playing or is paused. Set to 0 to disable.", ) - self.searchlist = config.getboolean( - "MusicBot", "SearchList", fallback=ConfigDefaults.searchlist + self.searchlist: bool = self.register.init_option( + section="MusicBot", + option="SearchList", + dest="searchlist", + default=ConfigDefaults.searchlist, + getter="getboolean", + comment="If enabled, users must indicate search result choices by sending a message instead of using reactions.", ) - self.defaultsearchresults = config.getint( - "MusicBot", - "DefaultSearchResults", - fallback=ConfigDefaults.defaultsearchresults, + self.defaultsearchresults: int = self.register.init_option( + section="MusicBot", + option="DefaultSearchResults", + dest="defaultsearchresults", + default=ConfigDefaults.defaultsearchresults, + getter="getint", + comment="Sets the default number of search results to fetch when using search command without a specific number.", ) - self.enable_options_per_guild = config.getboolean( - "MusicBot", - "EnablePrefixPerGuild", - fallback=ConfigDefaults.enable_options_per_guild, + self.enable_options_per_guild: bool = self.register.init_option( + section="MusicBot", + option="EnablePrefixPerGuild", + dest="enable_options_per_guild", + default=ConfigDefaults.enable_options_per_guild, + getter="getboolean", + comment="Allow MusicBot to save a per-server command prefix, and enables setprefix command.", ) - self.round_robin_queue = config.getboolean( - "MusicBot", - "RoundRobinQueue", - fallback=ConfigDefaults.defaultround_robin_queue, + self.round_robin_queue: bool = self.register.init_option( + section="MusicBot", + option="RoundRobinQueue", + dest="round_robin_queue", + default=ConfigDefaults.defaultround_robin_queue, + getter="getboolean", + comment="If enabled and multiple members are adding songs, MusicBot will organize playback for one song per member.", ) - dbg_str, dbg_int = config.getdebuglevel( - "MusicBot", "DebugLevel", fallback=ConfigDefaults.debug_level_str + self.enable_network_checker: bool = self.register.init_option( + section="MusicBot", + option="EnableNetworkChecker", + dest="enable_network_checker", + default=ConfigDefaults.enable_network_checker, + getter="getboolean", + comment=( + "Allow MusicBot to use system ping command to detect network outage and availability.\n" + "This is useful if you keep the bot joined to a channel or playing music 24/7.\n" + "MusicBot must be restarted to enable network testing.\n" + "By default this is disabled." + ), ) - self.debug_level_str: str = dbg_str - self.debug_level: int = dbg_int - self.debug_mode: bool = self.debug_level <= logging.DEBUG - set_logging_level(self.debug_level) - self.user_blocklist_enabled = config.getboolean( - "MusicBot", - "EnableUserBlocklist", - fallback=ConfigDefaults.user_blocklist_enabled, + self.user_blocklist_enabled: bool = self.register.init_option( + section="MusicBot", + option="EnableUserBlocklist", + dest="user_blocklist_enabled", + default=ConfigDefaults.user_blocklist_enabled, + getter="getboolean", + comment="Enable the user block list feature, without emptying the block list.", ) - self.user_blocklist_file = config.getpathlike( - "Files", "UserBlocklistFile", fallback=ConfigDefaults.user_blocklist_file + self.user_blocklist_file: pathlib.Path = self.register.init_option( + section="Files", + option="UserBlocklistFile", + dest="user_blocklist_file", + default=ConfigDefaults.user_blocklist_file, + getter="getpathlike", + comment="An optional file path to a text file listing Discord User IDs, one per line.", ) self.user_blocklist: "UserBlocklist" = UserBlocklist(self.user_blocklist_file) - self.song_blocklist_enabled = config.getboolean( - "MusicBot", - "EnableSongBlocklist", - fallback=ConfigDefaults.song_blocklist_enabled, + self.song_blocklist_enabled: bool = self.register.init_option( + section="MusicBot", + option="EnableSongBlocklist", + dest="song_blocklist_enabled", + default=ConfigDefaults.song_blocklist_enabled, + getter="getboolean", + comment="Enable the song block list feature, without emptying the block list.", ) - self.song_blocklist_file = config.getpathlike( - "Files", "SongBlocklistFile", fallback=ConfigDefaults.song_blocklist_file + self.song_blocklist_file: pathlib.Path = self.register.init_option( + section="Files", + option="SongBlocklistFile", + dest="song_blocklist_file", + default=ConfigDefaults.song_blocklist_file, + getter="getpathlike", + comment=( + "An optional file path to a text file that lists URLs, words, or phrases one per line.\n" + "Any song title or URL that contains any line in the list will be blocked." + ), ) self.song_blocklist: "SongBlocklist" = SongBlocklist(self.song_blocklist_file) - self.auto_playlist_file = config.getpathlike( - "Files", "AutoPlaylistFile", fallback=ConfigDefaults.auto_playlist_file + self.auto_playlist_file: pathlib.Path = self.register.init_option( + section="Files", + option="AutoPlaylistFile", + dest="auto_playlist_file", + default=ConfigDefaults.auto_playlist_file, + getter="getpathlike", + comment=( + "An optional file path to an auto playlist text file.\n" + "Each line of the file will be treated similarly to using the play command." + ), ) - self.i18n_file = config.getpathlike( - "Files", "i18nFile", fallback=ConfigDefaults.i18n_file + self.i18n_file: pathlib.Path = self.register.init_option( + section="Files", + option="i18nFile", + dest="i18n_file", + default=ConfigDefaults.i18n_file, + getter="getpathlike", + comment=( + "An optional file path to an i18n language file.\n" + "This option may be removed or replaced in the future!" + # TODO: i18n stuff when I get around to gettext. + ), ) - self.audio_cache_path = config.getpathlike( - "Files", "AudioCachePath", fallback=ConfigDefaults.audio_cache_path + self.audio_cache_path: pathlib.Path = self.register.init_option( + section="Files", + option="AudioCachePath", + dest="audio_cache_path", + default=ConfigDefaults.audio_cache_path, + getter="getpathlike", + comment="An optional directory path where MusicBot will store long and short-term cache for playback.", ) - # This value gets set dynamically, based on success with API authentication. - self.spotify_enabled = False + self.logs_max_kept: int = self.register.init_option( + section="Files", + option="LogsMaxKept", + dest="logs_max_kept", + default=ConfigDefaults.logs_max_kept, + getter="getint", + comment=( + "Configure automatic log file rotation at restart, and limit the number of files kept.\n" + "When disabled, only one log is kept and its contents are replaced each run.\n" + f"Default is 0, or disabled. Maximum allowed number is {MAXIMUM_LOGS_LIMIT}." + ), + ) - self.run_checks() + self.logs_date_format: str = self.register.init_option( + section="Files", + option="LogsDateFormat", + dest="logs_date_format", + default=ConfigDefaults.logs_date_format, + comment=( + "Configure the log file date format used when LogsMaxKept is enabled.\n" + "If left blank, a warning is logged and the default will be used instead.\n" + "Learn more about time format codes from the tables and data here:\n" + " https://docs.python.org/3/library/datetime.html#strftime-strptime-behavior\n" + f"Default value is: {DEFAULT_LOGS_ROTATE_FORMAT}" + ), + ) - self.missing_keys: Set[str] = set() - self.check_changes(config) + # Convert all path constants into config as pathlib.Path objects. + self.data_path = pathlib.Path(DEFAULT_DATA_PATH).resolve() + self.server_names_path = self.data_path.joinpath(DEFAULT_DATA_NAME_SERVERS) - self.setup_autoplaylist() + # Validate the config settings match destination values. + self.register.validate_register_destinations() - def check_changes(self, conf: "ExtendedConfigParser") -> None: - """ - Load the example options file and use it to detect missing config. - The results are stored in self.missing_keys as a set difference. + # Make the registry check for missing data in the INI file. + self.register.update_missing_config() - Note that keys from all sections are stored in one list, which is - then reduced to a set. If sections contain overlapping key names, - this logic will not detect a key missing from one section that was - present in another. + if self.register.ini_missing_sections: + sections_str = ", ".join( + [f"[{s}]" for s in self.register.ini_missing_sections] + ) + raise HelpfulError( + "One or more required config sections are missing.", + "Fix your config. Each [Section] should be on its own line with " + f"nothing else on it. The following sections are missing: {sections_str}", + preface="An error has occured parsing the config:\n", + ) - :param: conf: the currently loaded config file parser object. - """ - exfile = pathlib.Path(EXAMPLE_OPTIONS_FILE) - if exfile.is_file(): - usr_keys = conf.fetch_all_keys() - exconf = ExtendedConfigParser() - if not exconf.read(exfile, encoding="utf-8"): - log.error( - "Cannot detect changes in config, example options file is missing." - ) - return - ex_keys = exconf.fetch_all_keys() - if set(usr_keys) != set(ex_keys): - self.missing_keys = set(ex_keys) - set( - usr_keys - ) # to raise this as an issue in bot.py later + # This value gets set dynamically, based on success with API authentication. + self.spotify_enabled = False + + self.run_checks() + + self.setup_autoplaylist() def run_checks(self) -> None: """ @@ -354,6 +719,21 @@ def run_checks(self) -> None: :raises: musicbot.exceptions.HelpfulError if some validation failed that the user needs to correct. """ + if self.logs_max_kept > MAXIMUM_LOGS_LIMIT: + log.warning( + "Cannot store more than %s log files. Option LogsMaxKept will be limited instead.", + MAXIMUM_LOGS_LIMIT, + ) + self.logs_max_kept = MAXIMUM_LOGS_LIMIT + set_logging_max_kept_logs(self.logs_max_kept) + + if not self.logs_date_format and self.logs_max_kept > 0: + log.warning( + "Config option LogsDateFormat is empty and this will break log file rotation. Using default instead." + ) + self.logs_date_format = DEFAULT_LOGS_ROTATE_FORMAT + set_logging_rotate_date_format(self.logs_date_format) + if self.i18n_file != ConfigDefaults.i18n_file and not os.path.isfile( self.i18n_file ): @@ -437,9 +817,6 @@ def run_checks(self) -> None: if not self.footer_text: self.footer_text = ConfigDefaults.footer_text - # TODO: Add save function for future editing of options with commands - # Maybe add warnings about fields missing from the config file - async def async_validate(self, bot: "MusicBot") -> None: """ Validation logic for bot settings that depends on data from async services. @@ -548,7 +925,7 @@ def find_config(self) -> None: "Please configure settings in '%s' and re-run the bot.", DEFAULT_OPTIONS_FILE, ) - sys.exit(1) + raise RuntimeError("MusicBot cannot proceed with this config.") except ValueError as e: # Config id value was changed but its not valid raise HelpfulError( @@ -585,10 +962,84 @@ def setup_autoplaylist(self) -> None: self.auto_playlist_removed_file = ap_removed_file self.auto_playlist_cachemap_file = ap_cachemap_file + def update_option(self, option: "ConfigOption", value: str) -> bool: + """ + Uses option data to parse the given value and update its associated config. + No data is saved to file however. + """ + tmp_parser = ExtendedConfigParser() + tmp_parser.read_dict({option.section: {option.option: value}}) + + try: + get = getattr(tmp_parser, option.getter, None) + if not get: + log.critical("Dev Bug! Config option has getter that is not available.") + return False + new_conf_val = get(option.section, option.option, fallback=option.default) + if not isinstance(new_conf_val, type(option.default)): + log.error( + "Dev Bug! Config option has invalid type, getter and default must be the same type." + ) + return False + setattr(self, option.dest, new_conf_val) + return True + except (HelpfulError, ValueError, TypeError): + return False + + def save_option(self, option: "ConfigOption") -> bool: + """ + Converts the current Config value into an INI file value as needed. + Note: ConfigParser must not use multi-line values. This will break them. + Should multiline values be needed, maybe use ConfigUpdater package instead. + """ + try: + cu = configupdater.ConfigUpdater() + cu.optionxform = str # type: ignore + cu.read(self.config_file, encoding="utf8") + + if option.section in list(cu.keys()): + if option.option not in list(cu[option.section].keys()): + log.debug("Option was missing previously.") + cu[option.section][option.option] = self.register.to_ini(option) + c_bits = option.comment.split("\n") + adder = cu[option.section][option.option].add_before + adder.space() + if len(c_bits) > 1: + for line in c_bits: + adder.comment(line) + else: + adder.comment(option.comment) + cu[option.section][option.option].add_after.space() + else: + cu[option.section][option.option] = self.register.to_ini(option) + else: + # TODO: Maybe we should make a method that handles first-time setup + # or otherwise some form of auto-update thing for config? + log.error( + "Config section not in parsed config! Missing: %s", option.section + ) + return False + cu.update_file() + log.info( + "Saved config option: %s = %s", + option, + cu[option.section][option.option].value, + ) + return True + except ( + OSError, + AttributeError, + configparser.DuplicateSectionError, + configparser.ParsingError, + ): + log.exception("Failed to save config: %s", option) + return False + class ConfigDefaults: """ This class contains default values used mainly as config fallback values. + None type is not allowed as a default value. """ owner_id: int = 0 @@ -619,18 +1070,12 @@ class ConfigDefaults: auto_summon: bool = True auto_playlist: bool = True auto_playlist_random: bool = True + auto_playlist_autoskip: bool = False + auto_playlist_remove_on_block: bool = False auto_pause: bool = True delete_messages: bool = True delete_invoking: bool = False persistent_queue: bool = True - - debug_level: int = getattr(logging, DEFAULT_LOG_LEVEL, logging.INFO) - debug_level_str: str = ( - DEFAULT_LOG_LEVEL - if logging.getLevelName(debug_level) == DEFAULT_LOG_LEVEL - else "INFO" - ) - status_message: str = "" write_current_song: bool = False allow_author_skip: bool = True @@ -652,6 +1097,7 @@ class ConfigDefaults: enable_options_per_guild: bool = False footer_text: str = DEFAULT_FOOTER_TEXT defaultround_robin_queue: bool = False + enable_network_checker: bool = False song_blocklist: Set[str] = set() user_blocklist: Set[int] = set() @@ -659,6 +1105,9 @@ class ConfigDefaults: # default true here since the file being populated was previously how it was enabled. user_blocklist_enabled: bool = True + logs_max_kept: int = DEFAULT_LOGS_KEPT + logs_date_format: str = DEFAULT_LOGS_ROTATE_FORMAT + # Create path objects from the constants. options_file: pathlib.Path = pathlib.Path(DEFAULT_OPTIONS_FILE) user_blocklist_file: pathlib.Path = pathlib.Path(DEFAULT_USER_BLOCKLIST_FILE) @@ -667,6 +1116,441 @@ class ConfigDefaults: i18n_file: pathlib.Path = pathlib.Path(DEFAULT_I18N_FILE) audio_cache_path: pathlib.Path = pathlib.Path(DEFAULT_AUDIO_CACHE_PATH).absolute() + @staticmethod + def _debug_level() -> Tuple[str, int]: + """default values for debug log level configs""" + debug_level: int = getattr(logging, DEFAULT_LOG_LEVEL, logging.INFO) + debug_level_str: str = ( + DEFAULT_LOG_LEVEL + if logging.getLevelName(debug_level) == DEFAULT_LOG_LEVEL + else logging.getLevelName(debug_level) + ) + return (debug_level_str, debug_level) + + +class ConfigOption: + """Basic data model for individual registered options.""" + + def __init__( + self, + section: str, + option: str, + dest: str, + default: RegTypes, + comment: str, + getter: str = "get", + editable: bool = True, + invisible: bool = False, + empty_display_val: str = "", + ) -> None: + """ + Defines a configuration option in MusicBot and attributes used to + identify the option both at runtime and in the INI file. + + :param: section: The section this option belongs to, case sensitive. + :param: option: The name of this option, case sensitive. + :param: dest: The name of a Config attribute the value of this option will be stored in. + :param: getter: The name of a callable in ConfigParser used to get this option value. + :param: default: The default value for this option if it is missing or invalid. + :param: comment: A comment or help text to show for this option. + :param: editable: If this option can be changed via commands. + :param: invisible: (Permissions only) hide from display when formatted for per-user display. + :param: empty_display_val Value shown when the parsed value is empty or None. + """ + self.section = section + self.option = option + self.dest = dest + self.getter = getter + self.default = default + self.comment = comment + self.editable = editable + self.invisible = invisible + self.empty_display_val = empty_display_val + + def __str__(self) -> str: + return f"[{self.section}] > {self.option}" + + +class ConfigOptionRegistry: + """ + Management system for registering config options which provides methods to + query the state of configurations or translate them. + """ + + def __init__( + self, config: Union[Config, "Permissions"], parser: "ExtendedConfigParser" + ) -> None: + """ + Manage a configuration registry that associates config options to their + parent section, a runtime name, validation for values, and commentary + or other help text about the option. + """ + self._config = config + self._parser = parser + + # registered options. + self._option_list: List[ConfigOption] = [] + + # registered sections. + self._sections: Set[str] = set() + self._options: Set[str] = set() + self._distinct_options: Set[str] = set() + + # set up missing config data. + self.ini_missing_options: Set[ConfigOption] = set() + self.ini_missing_sections: Set[str] = set() + + @property + def sections(self) -> Set[str]: + """Available section names.""" + return self._sections + + @property + def option_keys(self) -> Set[str]: + """Available options with section names.""" + return self._options + + @property + def option_list(self) -> List[ConfigOption]: + """Non-settable option list.""" + return self._option_list + + def update_missing_config(self) -> None: + """ + Checks over the ini file for options missing from the file. + It only considers registered options, rather than looking at examples file. + As such it should be run after all options are registered. + """ + # load the unique sections and options from the parser. + p_section_set = set() + p_key_set = set() + parser_sections = dict(self._parser.items()) + for section in parser_sections: + p_section_set.add(section) + opts = set(parser_sections[section].keys()) + for opt in opts: + p_key_set.add(f"[{section}] > {opt}") + + # update the missing sections registry. + self.ini_missing_sections = self._sections - p_section_set + + # populate the missing options registry. + for option in self._option_list: + if str(option) not in p_key_set: + self.ini_missing_options.add(option) + + def get_updated_options(self) -> List[ConfigOption]: + """ + Get ConfigOptions that have been updated at runtime. + """ + changed = [] + for option in self._option_list: + if not hasattr(self._config, option.dest): + raise AttributeError( + f"Dev Bug! Attribute `Config.{option.dest}` does not exist." + ) + + if not hasattr(self._parser, option.getter): + raise AttributeError( + f"Dev Bug! Method `*ConfigParser.{option.getter}` does not exist." + ) + + p_getter = getattr(self._parser, option.getter) + config_value = getattr(self._config, option.dest) + parser_value = p_getter( + option.section, option.option, fallback=option.default + ) + + # We only care about changed options that are editable. + if config_value != parser_value and option.editable: + changed.append(option) + return changed + + def get_config_option(self, section: str, option: str) -> Optional[ConfigOption]: + """ + Gets the config option if it exists, or returns None + """ + for opt in self._option_list: + if opt.section == section and opt.option == option: + return opt + return None + + def get_values(self, opt: ConfigOption) -> Tuple[RegTypes, str, str]: + """ + Get the values in Config and *ConfigParser for this config option. + Returned tuple contains parsed value, ini-string, and a display string + for the parsed config value if applicable. + Display string may be empty if not used. + """ + if not opt.editable: + return ("", "", "") + + if not hasattr(self._config, opt.dest): + raise AttributeError( + f"Dev Bug! Attribute `Config.{opt.dest}` does not exist." + ) + + if not hasattr(self._parser, opt.getter): + raise AttributeError( + f"Dev Bug! Method `*ConfigParser.{opt.getter}` does not exist." + ) + + p_getter = getattr(self._parser, opt.getter) + config_value = getattr(self._config, opt.dest) + parser_value = p_getter(opt.section, opt.option, fallback=opt.default) + + display_config_value = "" + if not display_config_value and opt.empty_display_val: + display_config_value = opt.empty_display_val + + return (config_value, parser_value, display_config_value) + + def validate_register_destinations(self) -> None: + """Check all configured options for matching destination definitions.""" + errors = [] + for opt in self._option_list: + if not hasattr(self._config, opt.dest): + errors.append( + f"Config Option `{opt}` has an missing destination named: {opt.dest}" + ) + if errors: + msg = "Dev Bug! Some options failed config validation.\n" + msg += "\n".join(errors) + raise RuntimeError(msg) + + @overload + def init_option( + self, + section: str, + option: str, + dest: str, + default: str, + comment: str, + getter: str = "get", + editable: bool = True, + invisible: bool = False, + empty_display_val: str = "", + ) -> str: + pass + + @overload + def init_option( + self, + section: str, + option: str, + dest: str, + default: bool, + comment: str, + getter: str = "getboolean", + editable: bool = True, + invisible: bool = False, + empty_display_val: str = "", + ) -> bool: + pass + + @overload + def init_option( + self, + section: str, + option: str, + dest: str, + default: int, + comment: str, + getter: str = "getint", + editable: bool = True, + invisible: bool = False, + empty_display_val: str = "", + ) -> int: + pass + + @overload + def init_option( + self, + section: str, + option: str, + dest: str, + default: float, + comment: str, + getter: str = "getfloat", + editable: bool = True, + invisible: bool = False, + empty_display_val: str = "", + ) -> float: + pass + + @overload + def init_option( + self, + section: str, + option: str, + dest: str, + default: Set[int], + comment: str, + getter: str = "getidset", + editable: bool = True, + invisible: bool = False, + empty_display_val: str = "", + ) -> Set[int]: + pass + + @overload + def init_option( + self, + section: str, + option: str, + dest: str, + default: Set[str], + comment: str, + getter: str = "getstrset", + editable: bool = True, + invisible: bool = False, + empty_display_val: str = "", + ) -> Set[str]: + pass + + @overload + def init_option( + self, + section: str, + option: str, + dest: str, + default: DebugLevel, + comment: str, + getter: str = "getdebuglevel", + editable: bool = True, + invisible: bool = False, + empty_display_val: str = "", + ) -> DebugLevel: + pass + + @overload + def init_option( + self, + section: str, + option: str, + dest: str, + default: pathlib.Path, + comment: str, + getter: str = "getpathlike", + editable: bool = True, + invisible: bool = False, + empty_display_val: str = "", + ) -> pathlib.Path: + pass + + def init_option( + self, + section: str, + option: str, + dest: str, + default: RegTypes, + comment: str, + getter: str = "get", + editable: bool = True, + invisible: bool = False, + empty_display_val: str = "", + ) -> RegTypes: + """ + Register an option while getting its configuration value at the same time. + + :param: section: The section this option belongs to, case sensitive. + :param: option: The name of this option, case sensitive. + :param: dest: The name of a Config attribute the value of this option will be stored in. + :param: getter: The name of a callable in ConfigParser used to get this option value. + :param: default: The default value for this option if it is missing or invalid. + :param: comment: A comment or help text to show for this option. + :param: editable: If this option can be changed via commands. + """ + # Check that the getter function exists and is callable. + if not hasattr(self._parser, getter): + raise ValueError( + f"Dev Bug! There is no *ConfigParser function by the name of: {getter}" + ) + if not callable(getattr(self._parser, getter)): + raise TypeError( + f"Dev Bug! The *ConfigParser.{getter} attribute is not a callable function." + ) + + # add the option to the registry. + config_opt = ConfigOption( + section=section, + option=option, + dest=dest, + default=default, + getter=getter, + comment=comment, + editable=editable, + invisible=invisible, + empty_display_val=empty_display_val, + ) + self._option_list.append(config_opt) + self._sections.add(section) + self._options.add(str(config_opt)) + self._distinct_options.add(option) + + # get the current config value. + getfunc = getattr(self._parser, getter) + opt: RegTypes = getfunc(section, option, fallback=default) + + # sanity check that default actually matches the type from getter. + if not isinstance(opt, type(default)): + raise TypeError( + "Dev Bug! Are you using the wrong getter for this option?\n" + f"[{section}] > {option} has type: {type(default)} but got type: {type(opt)}" + ) + return opt + + def to_ini(self, option: ConfigOption, use_default: bool = False) -> str: + """ + Convert the parsed config value into an INI value. + This method does not perform validation, simply converts the value. + + :param: use_default: return the default value instead of current config. + """ + if use_default: + conf_value = option.default + else: + if not hasattr(self._config, option.dest): + raise AttributeError( + f"Dev Bug! Attribute `Config.{option.dest}` does not exist." + ) + + conf_value = getattr(self._config, option.dest) + return self._value_to_ini(conf_value, option.getter) + + def _value_to_ini(self, conf_value: RegTypes, getter: str) -> str: + """Converts a value to an ini string.""" + if getter == "get": + return str(conf_value) + + if getter == "getint": + return str(conf_value) + + if getter == "getfloat": + return f"{conf_value:.3f}" + + if getter == "getboolean": + return "yes" if conf_value else "no" + + if getter in ["getstrset", "getidset"] and isinstance(conf_value, set): + return ", ".join(str(x) for x in conf_value) + + if getter == "getdatasize" and isinstance(conf_value, int): + return format_size_from_bytes(conf_value) + + if getter == "getduration" and isinstance(conf_value, (int, float)): + td = datetime.timedelta(seconds=round(conf_value)) + return str(td) + + if getter == "getpathlike": + return str(conf_value) + + # NOTE: Added for completeness but unused as debug_level is not editable. + if getter == "getdebuglevel" and isinstance(conf_value, int): + return str(logging.getLevelName(conf_value)) + + return str(conf_value) + class ExtendedConfigParser(configparser.ConfigParser): """ @@ -676,7 +1560,10 @@ class ExtendedConfigParser(configparser.ConfigParser): """ def __init__(self) -> None: - super().__init__(interpolation=None) + # If empty_lines_in_values is ever true, config editing needs refactor. + # Probably should use ConfigUpdater package instead. + super().__init__(interpolation=None, empty_lines_in_values=False) + self.error_preface = "Error loading config value:" def optionxform(self, optionstr: str) -> str: """ @@ -702,11 +1589,11 @@ def getownerid( section: str, key: str, fallback: int = 0, - raw: bool = False, # pylint: disable=unused-argument - vars: Any = None, # pylint: disable=unused-argument,redefined-builtin + raw: bool = False, + vars: ConfVars = None, # pylint: disable=redefined-builtin ) -> int: """get the owner ID or 0 for auto""" - val = self.get(section, key, fallback="").strip() + val = self.get(section, key, fallback="", raw=raw, vars=vars).strip() if not val: return fallback if val.lower() == "auto": @@ -716,9 +1603,9 @@ def getownerid( return int(val) except ValueError as e: raise HelpfulError( - f"OwnerID is not valid. Your setting: {val}", - "Set OwnerID to a numerical ID or set it to 'auto' to have the bot find it.", - preface="Error while loading config:\n", + f"The owner ID in [{section}] > {key} is not valid. Your setting: {val}", + f"Set {key} to a numerical ID or set it to 'auto' to have the bot find it for you.", + preface=self.error_preface, ) from e def getpathlike( @@ -726,28 +1613,40 @@ def getpathlike( section: str, key: str, fallback: pathlib.Path, - raw: bool = False, # pylint: disable=unused-argument - vars: Any = None, # pylint: disable=unused-argument,redefined-builtin + raw: bool = False, + vars: ConfVars = None, # pylint: disable=redefined-builtin ) -> pathlib.Path: """ get a config value and parse it as a Path object. the `fallback` argument is required. """ - val = self.get(section, key, fallback="").strip() - if not val: + val = self.get(section, key, fallback="", raw=raw, vars=vars).strip() + if not val and fallback: return fallback - return pathlib.Path(val) + if not val and not fallback: + raise ValueError( + f"The option [{section}] > {key} does not have a valid fallback value. This is a bug!" + ) + + try: + return pathlib.Path(val).resolve(strict=False) + except RuntimeError as e: + raise HelpfulError( + preface=self.error_preface, + issue=f"The config option [{section}] > {key} is not a valid file system location.", + solution="Check the path setting and make sure it doesn't loop back on itself.", + ) from e def getidset( self, section: str, key: str, fallback: Optional[Set[int]] = None, - raw: bool = False, # pylint: disable=unused-argument - vars: Any = None, # pylint: disable=unused-argument,redefined-builtin + raw: bool = False, + vars: ConfVars = None, # pylint: disable=redefined-builtin ) -> Set[int]: """get a config value and parse it as a set of ID values.""" - val = self.get(section, key, fallback="").strip() + val = self.get(section, key, fallback="", raw=raw, vars=vars).strip() if not val and fallback: return set(fallback) @@ -756,9 +1655,9 @@ def getidset( return set(int(i) for i in str_ids) except ValueError as e: raise HelpfulError( - f"One of the IDs in your config `{key}` is invalid.", + f"One of the IDs in option [{section}] > {key} is invalid.", "Ensure all IDs are numerical, and separated only by spaces or commas.", - preface="Error while loading config:\n", + preface=self.error_preface, ) from e def getdebuglevel( @@ -766,11 +1665,11 @@ def getdebuglevel( section: str, key: str, fallback: str = "", - raw: bool = False, # pylint: disable=unused-argument - vars: Any = None, # pylint: disable=unused-argument,redefined-builtin - ) -> Tuple[str, int]: + raw: bool = False, + vars: ConfVars = None, # pylint: disable=redefined-builtin + ) -> DebugLevel: """get a config value an parse it as a logger level.""" - val = self.get(section, key, fallback="").strip().upper() + val = self.get(section, key, fallback="", raw=raw, vars=vars).strip().upper() if not val and fallback: val = fallback.upper() @@ -794,48 +1693,103 @@ def getdatasize( section: str, key: str, fallback: int = 0, - raw: bool = False, # pylint: disable=unused-argument - vars: Any = None, # pylint: disable=unused-argument,redefined-builtin + raw: bool = False, + vars: ConfVars = None, # pylint: disable=redefined-builtin ) -> int: """get a config value and parse it as a human readable data size""" - val = self.get(section, key, fallback="").strip() + val = self.get(section, key, fallback="", raw=raw, vars=vars).strip() if not val and fallback: return fallback try: return format_size_to_bytes(val) except ValueError: log.warning( - "Config '%s' has invalid config value '%s' using default instead.", + "Option [%s] > %s has invalid config value '%s' using default instead.", + section, key, val, ) return fallback + def getpercent( + self, + section: str, + key: str, + fallback: float = 0.0, + raw: bool = False, + vars: ConfVars = None, # pylint: disable=redefined-builtin + ) -> float: + """ + Get a config value and parse it as a percentage. + Always returns a positive value between 0 and 1 inclusive. + """ + if fallback: + fallback = max(0.0, min(abs(fallback), 1.0)) + + val = self.get(section, key, fallback="", raw=raw, vars=vars).strip() + if not val and fallback: + return fallback + + v = 0.0 + # account for literal percentage character: % + if val.startswith("%") or val.endswith("%"): + try: + ival = val.replace("%", "").strip() + v = abs(int(ival)) / 100 + except (ValueError, TypeError): + if fallback: + return fallback + raise + + # account for explicit float and implied percentage. + else: + try: + v = abs(float(val)) + # if greater than 1, assume implied percentage. + if v > 1: + v = v / 100 + except (ValueError, TypeError): + if fallback: + return fallback + raise + + if v > 1: + log.warning( + "Option [%s] > %s has a value greater than 100 %% (%s) and will be set to %s instead.", + section, + key, + val, + fallback if fallback else 1, + ) + v = fallback if fallback else 1 + + return v + def getduration( self, section: str, key: str, fallback: Union[int, float] = 0, - raw: bool = False, # pylint: disable=unused-argument, - vars: Any = None, # pylint: disable=unused-argument,redefined-builtin + raw: bool = False, + vars: ConfVars = None, # pylint: disable=redefined-builtin ) -> float: """get a config value parsed as a time duration.""" - val = self.get(section, key, fallback="").strip() + val = self.get(section, key, fallback="", raw=raw, vars=vars).strip() if not val and fallback: return float(fallback) seconds = format_time_to_seconds(val) return float(seconds) - def getstrset( # pylint: disable=dangerous-default-value + def getstrset( self, section: str, key: str, - fallback: Set[str] = set(), - raw: bool = False, # pylint: disable=unused-argument - vars: Any = None, # pylint: disable=unused-argument,redefined-builtin + fallback: Set[str], + raw: bool = False, + vars: ConfVars = None, # pylint: disable=redefined-builtin ) -> Set[str]: """get a config value parsed as a set of string values.""" - val = self.get(section, key, fallback="").strip() + val = self.get(section, key, fallback="", raw=raw, vars=vars).strip() if not val and fallback: return set(fallback) return set(x for x in val.replace(",", " ").split()) diff --git a/musicbot/constants.py b/musicbot/constants.py index 9f039f059..7ea9f37b0 100644 --- a/musicbot/constants.py +++ b/musicbot/constants.py @@ -15,6 +15,8 @@ DEFAULT_FOOTER_TEXT: str = f"Just-Some-Bots/MusicBot ({VERSION})" DEFAULT_BOT_NAME: str = "MusicBot" DEFAULT_BOT_ICON: str = "https://i.imgur.com/gFHBoZA.png" +DEFAULT_OWNER_GROUP_NAME: str = "Owner (auto)" +DEFAULT_PERMS_GROUP_NAME: str = "Default" # File path constants @@ -28,6 +30,11 @@ DEFAULT_AUTOPLAYLIST_FILE: str = "config/autoplaylist.txt" BUNDLED_AUTOPLAYLIST_FILE: str = "config/_autoplaylist.txt" DEFAULT_AUDIO_CACHE_PATH: str = "audio_cache" +DEFAULT_DATA_PATH: str = "data" +DEFAULT_DATA_NAME_SERVERS: str = "server_names.txt" +DEFAULT_DATA_NAME_QUEUE: str = "queue.json" +DEFAULT_DATA_NAME_CUR_SONG: str = "current.txt" +DEFAULT_DATA_NAME_OPTIONS: str = "options.json" EXAMPLE_OPTIONS_FILE: str = "config/example_options.ini" EXAMPLE_PERMS_FILE: str = "config/example_permissions.ini" diff --git a/musicbot/constructs.py b/musicbot/constructs.py index e1307ec27..7f66d6106 100644 --- a/musicbot/constructs.py +++ b/musicbot/constructs.py @@ -2,7 +2,6 @@ import inspect import json import logging -import pathlib import pydoc from collections import defaultdict from typing import ( @@ -20,12 +19,20 @@ import discord +from .constants import ( + DEFAULT_BOT_ICON, + DEFAULT_BOT_NAME, + DEFAULT_DATA_NAME_OPTIONS, + DEFAULT_FOOTER_TEXT, +) from .json import Json from .utils import _get_variable log = logging.getLogger(__name__) if TYPE_CHECKING: + from discord.types.embed import EmbedType + from .bot import MusicBot from .config import Config @@ -76,6 +83,7 @@ def __init__(self, bot: "MusicBot") -> None: self._prefix_history: Set[str] = set() self._events: DefaultDict[str, GuildAsyncEvent] = defaultdict(GuildAsyncEvent) self._file_lock: asyncio.Lock = asyncio.Lock() + self._loading_lock: asyncio.Lock = asyncio.Lock() self._is_file_loaded: bool = False # Members below are available for public use. @@ -87,6 +95,10 @@ def __init__(self, bot: "MusicBot") -> None: if bot.loop: bot.loop.create_task(self.load_guild_options_file()) + def is_ready(self) -> bool: + """A status indicator for fully loaded server data.""" + return self._is_file_loaded and self._guild_id != 0 + def _lookup_guild_id(self) -> int: """ Looks up guild.id used to create this instance of GuildSpecificData @@ -148,44 +160,51 @@ async def load_guild_options_file(self) -> None: server-specific options intended to persist through shutdowns. This method only supports per-server command prefix currently. """ - if self._guild_id == 0: - self._guild_id = self._lookup_guild_id() + if self._loading_lock.locked(): + return + + async with self._loading_lock: if self._guild_id == 0: - log.error( - "Cannot load data for guild with ID 0. This is likely a bug in the code!" - ) + self._guild_id = self._lookup_guild_id() + if self._guild_id == 0: + log.error( + "Cannot load data for guild with ID 0. This is likely a bug in the code!" + ) + return + + opt_file = self._bot_config.data_path.joinpath( + str(self._guild_id), DEFAULT_DATA_NAME_OPTIONS + ) + if not opt_file.is_file(): + log.debug("No file for guild %s/%s", self._guild_id, self._guild_name) + self._is_file_loaded = True return - opt_file = pathlib.Path(f"data/{self._guild_id}/options.json") - if not opt_file.is_file(): - log.debug("No file for guild %s/%s", self._guild_id, self._guild_name) - return - - async with self._file_lock: - try: - log.debug( - "Loading guild data for guild with ID: %s/%s", + async with self._file_lock: + try: + log.debug( + "Loading guild data for guild with ID: %s/%s", + self._guild_id, + self._guild_name, + ) + options = Json(opt_file) + self._is_file_loaded = True + except OSError: + log.exception( + "An OS error prevented reading guild data file: %s", + opt_file, + ) + return + + guild_prefix = options.get("command_prefix", None) + if guild_prefix: + self._command_prefix = guild_prefix + log.info( + "Guild %s/%s has custom command prefix: %s", self._guild_id, self._guild_name, + self._command_prefix, ) - options = Json(opt_file) - self._is_file_loaded = True - except OSError: - log.exception( - "An OS error prevented reading guild data file: %s", - opt_file, - ) - return - - guild_prefix = options.get("command_prefix", None) - if guild_prefix: - self._command_prefix = guild_prefix - log.info( - "Guild %s/%s has custom command prefix: %s", - self._guild_id, - self._guild_name, - self._command_prefix, - ) async def save_guild_options_file(self) -> None: """ @@ -198,7 +217,9 @@ async def save_guild_options_file(self) -> None: ) return - opt_file = pathlib.Path(f"data/{self._guild_id}/options.json") + opt_file = self._bot_config.data_path.joinpath( + str(self._guild_id), DEFAULT_DATA_NAME_OPTIONS + ) # Prepare a dictionary to store our options. opt_dict = {"command_prefix": self._command_prefix} diff --git a/musicbot/permissions.py b/musicbot/permissions.py index 0cf675b81..b3e0620f9 100644 --- a/musicbot/permissions.py +++ b/musicbot/permissions.py @@ -2,18 +2,26 @@ import logging import pathlib import shutil -from typing import TYPE_CHECKING, Any, Dict, List, Set, Union +from typing import TYPE_CHECKING, Dict, Set, Tuple, Type, Union +import configupdater import discord -from .config import ExtendedConfigParser -from .constants import DEFAULT_PERMS_FILE, EXAMPLE_PERMS_FILE +from .config import ConfigOption, ConfigOptionRegistry, ExtendedConfigParser, RegTypes +from .constants import ( + DEFAULT_OWNER_GROUP_NAME, + DEFAULT_PERMS_FILE, + DEFAULT_PERMS_GROUP_NAME, + EXAMPLE_PERMS_FILE, +) +from .exceptions import HelpfulError, PermissionsError if TYPE_CHECKING: from .bot import MusicBot log = logging.getLogger(__name__) + # Permissive class define the permissive value of each permissions @@ -23,32 +31,29 @@ class PermissionsDefaults: Most values restrict access by default. """ - perms_file: pathlib.Path = pathlib.Path(DEFAULT_PERMS_FILE) - example_perms_file: pathlib.Path = pathlib.Path(EXAMPLE_PERMS_FILE) + command_whitelist: Set[str] = set() + command_blacklist: Set[str] = set() + ignore_non_voice: Set[str] = set() + grant_to_roles: Set[int] = set() + user_list: Set[int] = set() - CommandWhiteList: Set[str] = set() - CommandBlackList: Set[str] = set() - IgnoreNonVoice: Set[str] = set() - GrantToRoles: Set[int] = set() - UserList: Set[int] = set() + max_songs: int = 8 + max_song_length: int = 210 + max_playlist_length: int = 0 + max_search_items: int = 10 - MaxSongs: int = 8 - MaxSongLength: int = 210 - MaxPlaylistLength: int = 0 - MaxSearchItems: int = 10 + allow_playlists: bool = True + insta_skip: bool = False + skip_looped: bool = False + remove: bool = False + skip_when_absent: bool = True + bypass_karaoke_mode: bool = False - AllowPlaylists: bool = True - InstaSkip: bool = False - SkipLooped: bool = False - Remove: bool = False - SkipWhenAbsent: bool = True - BypassKaraokeMode: bool = False - - SummonNoVoice: bool = False + summon_no_voice: bool = False # allow at least the extractors that the bot normally needs. # an empty set here allows all. - Extractors: Set[str] = { + extractors: Set[str] = { "generic", "youtube", "youtube:tab", @@ -57,33 +62,43 @@ class PermissionsDefaults: "spotify:musicbot", } + # These defaults are not used per-group but rather for permissions system itself. + perms_file: pathlib.Path = pathlib.Path(DEFAULT_PERMS_FILE) + example_perms_file: pathlib.Path = pathlib.Path(EXAMPLE_PERMS_FILE) + + +class PermissiveDefaults(PermissionsDefaults): + """ + The maxiumum allowed version of defaults. + Most values grant access or remove limits by default. + """ -class Permissive: - CommandWhiteList: Set[str] = set() - CommandBlackList: Set[str] = set() - IgnoreNonVoice: Set[str] = set() - GrantToRoles: Set[int] = set() - UserList: Set[int] = set() + command_whitelist: Set[str] = set() + command_blacklist: Set[str] = set() + ignore_non_voice: Set[str] = set() + grant_to_roles: Set[int] = set() + user_list: Set[int] = set() - MaxSongs: int = 0 - MaxSongLength: int = 0 - MaxPlaylistLength: int = 0 - MaxSearchItems: int = 10 + max_songs: int = 0 + max_song_length: int = 0 + max_playlist_length: int = 0 + max_search_items: int = 10 - AllowPlaylists: bool = True - InstaSkip: bool = True - SkipLooped: bool = True - Remove: bool = True - SkipWhenAbsent: bool = False - BypassKaraokeMode: bool = True + allow_playlists: bool = True + insta_skip: bool = True + skip_looped: bool = True + remove: bool = True + skip_when_absent: bool = False + bypass_karaoke_mode: bool = True - SummonNoVoice: bool = True + summon_no_voice: bool = True - Extractors: Set[str] = set() + # an empty set here allows all. + extractors: Set[str] = set() class Permissions: - def __init__(self, perms_file: pathlib.Path, grant_all: List[int]) -> None: + def __init__(self, perms_file: pathlib.Path) -> None: """ Handles locating, initializing defaults, loading, and validating permissions config from the given `perms_file` path. @@ -92,6 +107,8 @@ def __init__(self, perms_file: pathlib.Path, grant_all: List[int]) -> None: """ self.perms_file = perms_file self.config = ExtendedConfigParser() + self.register = PermissionOptionRegistry(self, self.config) + self.groups: Dict[str, "PermissionGroup"] = {} if not self.config.read(self.perms_file, encoding="utf-8"): example_file = PermissionsDefaults.example_perms_file @@ -112,47 +129,58 @@ def __init__(self, perms_file: pathlib.Path, grant_all: List[int]) -> None: f"Unable to copy {example_file} to {self.perms_file}: {str(e)}" ) from e - self.default_group = PermissionGroup("Default", self.config["Default"]) - self.groups = set() - for section in self.config.sections(): - if section != "Owner (auto)": - self.groups.add(PermissionGroup(section, self.config[section])) - - if self.config.has_section("Owner (auto)"): - owner_group = PermissionGroup( - "Owner (auto)", self.config["Owner (auto)"], fallback=Permissive + if section == DEFAULT_OWNER_GROUP_NAME: + self.groups[section] = self._generate_permissive_group(section) + else: + self.groups[section] = self._generate_default_group(section) + + # in case the permissions don't have a default group, make one. + if not self.config.has_section(DEFAULT_PERMS_GROUP_NAME): + self.groups[DEFAULT_PERMS_GROUP_NAME] = self._generate_default_group( + DEFAULT_PERMS_GROUP_NAME ) - else: - log.info( - "[Owner (auto)] section not found, falling back to permissive default" - ) - # Create a fake section to fallback onto the default permissive values to grant to the owner - owner_group = PermissionGroup( - "Owner (auto)", - configparser.SectionProxy(self.config, "Owner (auto)"), - fallback=Permissive, + # in case the permissions don't have an owner group, create a virtual one. + if not self.config.has_section(DEFAULT_OWNER_GROUP_NAME): + self.groups[DEFAULT_OWNER_GROUP_NAME] = self._generate_permissive_group( + DEFAULT_OWNER_GROUP_NAME ) - if hasattr(grant_all, "__iter__"): - owner_group.user_list = set(grant_all) + self.register.validate_register_destinations() + + def _generate_default_group(self, name: str) -> "PermissionGroup": + """Generate a group with `name` using PermissionDefaults.""" + return PermissionGroup(name, self, PermissionsDefaults) + + def _generate_permissive_group(self, name: str) -> "PermissionGroup": + """Generate a group with `name` using PermissiveDefaults. Typically owner group.""" + return PermissionGroup(name, self, PermissiveDefaults) + + def set_owner_id(self, owner_id: int) -> None: + """Sets the given id as the owner ID in the owner permission group.""" + if owner_id == 0: + log.debug("OwnerID is set auto, will set correctly later.") + self.groups[DEFAULT_OWNER_GROUP_NAME].user_list = set([owner_id]) + + @property + def owner_group(self) -> "PermissionGroup": + """Always returns the owner group""" + return self.groups[DEFAULT_OWNER_GROUP_NAME] - self.groups.add(owner_group) + @property + def default_group(self) -> "PermissionGroup": + """Always returns the default group""" + return self.groups[DEFAULT_PERMS_GROUP_NAME] async def async_validate(self, bot: "MusicBot") -> None: """ Handle validation of permissions data that depends on async services. """ log.debug("Validating permissions...") - - og = discord.utils.get(self.groups, name="Owner (auto)") - if not og: - raise RuntimeError("Owner permissions group is missing!") - - if 0 in og.user_list: - log.debug("Fixing automatic owner group") - og.user_list = {bot.config.owner_id} + if 0 in self.owner_group.user_list: + log.debug("Setting auto OwnerID for owner permissions group.") + self.owner_group.user_list = {bot.config.owner_id} def save(self) -> None: """ @@ -168,38 +196,122 @@ def for_user(self, user: Union[discord.Member, discord.User]) -> "PermissionGrou :param user: A discord User or Member object """ - for group in self.groups: + # Search for the first group a member ID shows up in. + for group in self.groups.values(): if user.id in group.user_list: return group - # The only way I could search for roles is if I add a `server=None` param and pass that too + # In case this is not a Member and has no roles, use default. if isinstance(user, discord.User): return self.default_group - # We loop again so that we don't return a role based group before we find an assigned one - for group in self.groups: + # Search groups again and associate the member by role IDs. + for group in self.groups.values(): for role in user.roles: if role.id in group.granted_to_roles: return group + # Or just assign default role. return self.default_group - def create_group(self, name: str, **kwargs: Dict[str, Any]) -> None: + def add_group(self, name: str) -> None: + """ + Creates a permission group, but does nothing to the parser. + """ + self.groups[name] = self._generate_default_group(name) + + def remove_group(self, name: str) -> None: + """Removes a permission group but does nothing to the parser.""" + del self.groups[name] + self.register.unregister_group(name) + + def save_group(self, group: str) -> bool: + """ + Converts the current Permission Group value into an INI file value as needed. + Note: ConfigParser must not use multi-line values. This will break them. + Should multiline values be needed, maybe use ConfigUpdater package instead. + """ + try: + cu = configupdater.ConfigUpdater() + cu.optionxform = str # type: ignore + cu.read(self.perms_file, encoding="utf8") + + opts = self.register.get_option_dict(group) + # update/delete existing + if group in set(cu.keys()): + # update + if group in self.groups: + log.debug("Updating group in permssions file: %s", group) + for option in set(cu[group].keys()): + cu[group][option].value = self.register.to_ini(opts[option]) + + # delete + else: + log.debug("Deleting group from permissions file: %s", group) + cu.remove_section(group) + + # add new + elif group in self.groups: + log.debug("Adding new group to permissions file: %s", group) + options = "" + for _, opt in opts.items(): + c_bits = opt.comment.split("\n") + if len(c_bits) > 1: + comments = "".join([f"# {x}\n" for x in c_bits]) + else: + comments = f"# {opt.comment.strip()}\n" + ini_val = self.register.to_ini(opt) + options += f"{comments}{opt.option} = {ini_val}\n\n" + new_section = configupdater.ConfigUpdater() + new_section.optionxform = str # type: ignore + new_section.read_string(f"[{group}]\n{options}\n") + cu.add_section(new_section[group].detach()) + + log.debug("Saving permissions file now.") + cu.update_file() + return True + + # except configparser.MissingSectionHeaderError: + except configparser.ParsingError: + log.exception("ConfigUpdater could not parse the permissions file!") + except configparser.DuplicateSectionError: + log.exception("You have a duplicate section, fix your Permissions file!") + except OSError: + log.exception("Failed to save permissions group: %s", group) + + return False + + def update_option(self, option: ConfigOption, value: str) -> bool: """ - Currently unused, intended to create a permission group that could - then be saved back to the permissions config file. + Uses option data to parse the given value and update its associated permission. + No data is saved to file however. """ - # TODO: Test this. and implement the rest of permissions editing... - self.config.read_dict({name: kwargs}) - self.groups.add(PermissionGroup(name, self.config[name])) + tparser = ExtendedConfigParser() + tparser.read_dict({option.section: {option.option: value}}) + + try: + get = getattr(tparser, option.getter, None) + if not get: + log.critical("Dev Bug! Permission has getter that is not available.") + return False + new_conf_val = get(option.section, option.option, fallback=option.default) + if not isinstance(new_conf_val, type(option.default)): + log.error( + "Dev Bug! Permission has invalid type, getter and default must be the same type." + ) + return False + setattr(self.groups[option.section], option.dest, new_conf_val) + return True + except (HelpfulError, ValueError, TypeError): + return False class PermissionGroup: def __init__( self, name: str, - section_data: configparser.SectionProxy, - fallback: Any = PermissionsDefaults, + manager: Permissions, + defaults: Type[PermissionsDefaults], ) -> None: """ Create a PermissionGroup object from a ConfigParser section. @@ -208,56 +320,172 @@ def __init__( :param: section_data: a config SectionProxy that describes a group :param: fallback: Typically a PermissionsDefaults class """ + self._mgr = manager + self.name = name - self.command_whitelist = section_data.getstrset( - "CommandWhiteList", fallback=fallback.CommandWhiteList + self.command_whitelist = self._mgr.register.init_option( + section=name, + option="CommandWhiteList", + dest="command_whitelist", + getter="getstrset", + default=defaults.command_whitelist, + comment="List of command names allowed for use, separated by spaces. Overrides CommandBlackList is set.", + empty_display_val="(All allowed)", ) - self.command_blacklist = section_data.getstrset( - "CommandBlackList", fallback=fallback.CommandBlackList + self.command_blacklist = self._mgr.register.init_option( + section=name, + option="CommandBlackList", + dest="command_blacklist", + default=defaults.command_blacklist, + getter="getstrset", + comment="List of command names denied from use, separated by spaces. Will not work if CommandWhiteList is set!", + empty_display_val="(None denied)", ) - self.ignore_non_voice = section_data.getstrset( - "IgnoreNonVoice", fallback=fallback.IgnoreNonVoice + self.ignore_non_voice = self._mgr.register.init_option( + section=name, + option="IgnoreNonVoice", + dest="ignore_non_voice", + getter="getstrset", + default=defaults.ignore_non_voice, + comment=( + "List of command names that can only be used while in the same voice channel as MusicBot.\n" + "Some commands will always require the user to be in voice, regardless of this list.\n" + "Command names should be separated by spaces." + ), + empty_display_val="(No commands listed)", ) - self.granted_to_roles = section_data.getidset( - "GrantToRoles", fallback=fallback.GrantToRoles + self.granted_to_roles = self._mgr.register.init_option( + section=name, + option="GrantToRoles", + dest="granted_to_roles", + getter="getidset", + default=defaults.grant_to_roles, + comment="List of Discord server role IDs that are granted this permission group. This option is ignored if UserList is set.", + invisible=True, ) - self.user_list = section_data.getidset("UserList", fallback=fallback.UserList) - - self.max_songs = section_data.getint("MaxSongs", fallback=fallback.MaxSongs) - self.max_song_length = section_data.getint( - "MaxSongLength", fallback=fallback.MaxSongLength + self.user_list = self._mgr.register.init_option( + section=name, + option="UserList", + dest="user_list", + getter="getidset", + default=defaults.user_list, + comment="List of Discord member IDs that are granted permissions in this group. This option overrides GrantToRoles.", + invisible=True, ) - self.max_playlist_length = section_data.getint( - "MaxPlaylistLength", fallback=fallback.MaxPlaylistLength + self.max_songs = self._mgr.register.init_option( + section=name, + option="MaxSongs", + dest="max_songs", + getter="getint", + default=defaults.max_songs, + comment="Maximum number of songs a user is allowed to queue. A value of 0 means unlimited.", + empty_display_val="(Unlimited)", ) - self.max_search_items = section_data.getint( - "MaxSearchItems", fallback=fallback.MaxSearchItems + self.max_song_length = self._mgr.register.init_option( + section=name, + option="MaxSongLength", + dest="max_song_length", + getter="getint", + default=defaults.max_song_length, + comment=( + "Maximum length of a song in seconds. A value of 0 means unlimited.\n" + "This permission may not be enforced if song duration is not available." + ), + empty_display_val="(Unlimited)", ) - - self.allow_playlists = section_data.getboolean( - "AllowPlaylists", fallback=fallback.AllowPlaylists + self.max_playlist_length = self._mgr.register.init_option( + section=name, + option="MaxPlaylistLength", + dest="max_playlist_length", + getter="getint", + default=defaults.max_playlist_length, + comment="Maximum number of songs a playlist is allowed to have to be queued. A value of 0 means unlimited.", + empty_display_val="(Unlimited)", ) - self.instaskip = section_data.getboolean( - "InstaSkip", fallback=fallback.InstaSkip + self.max_search_items = self._mgr.register.init_option( + section=name, + option="MaxSearchItems", + dest="max_search_items", + getter="getint", + default=defaults.max_search_items, + comment="The maximum number of items that can be returned in a search.", ) - self.skiplooped = section_data.getboolean( - "SkipLooped", fallback=fallback.SkipLooped + self.allow_playlists = self._mgr.register.init_option( + section=name, + option="AllowPlaylists", + dest="allow_playlists", + getter="getboolean", + default=defaults.allow_playlists, + comment="Allow users to queue playlists, or multiple songs at once.", ) - self.remove = section_data.getboolean("Remove", fallback=fallback.Remove) - self.skip_when_absent = section_data.getboolean( - "SkipWhenAbsent", fallback=fallback.SkipWhenAbsent + self.instaskip = self._mgr.register.init_option( + section=name, + option="InstaSkip", + dest="instaskip", + getter="getboolean", + default=defaults.insta_skip, + comment="Allow users to skip without voting, if LegacySkip config option is enabled.", ) - self.bypass_karaoke_mode = section_data.getboolean( - "BypassKaraokeMode", fallback=fallback.BypassKaraokeMode + self.skip_looped = self._mgr.register.init_option( + section=name, + option="SkipLooped", + dest="skip_looped", + getter="getboolean", + default=defaults.skip_looped, + comment="Allows the user to skip a looped song.", ) - - self.summonplay = section_data.getboolean( - "SummonNoVoice", fallback=fallback.SummonNoVoice + self.remove = self._mgr.register.init_option( + section=name, + option="Remove", + dest="remove", + getter="getboolean", + default=defaults.remove, + comment=( + "Allows the user to remove any song from the queue.\n" + "Does not remove or skip currently playing songs." + ), ) - - self.extractors = section_data.getstrset( - "Extractors", fallback=fallback.Extractors + self.skip_when_absent = self._mgr.register.init_option( + section=name, + option="SkipWhenAbsent", + dest="skip_when_absent", + getter="getboolean", + default=defaults.skip_when_absent, + comment="Skip songs added by users who are not in voice when their song is played.", + ) + self.bypass_karaoke_mode = self._mgr.register.init_option( + section=name, + option="BypassKaraokeMode", + dest="bypass_karaoke_mode", + getter="getboolean", + default=defaults.bypass_karaoke_mode, + comment="Allows the user to add songs to the queue when Karaoke Mode is enabled.", + ) + self.summonplay = self._mgr.register.init_option( + section=name, + option="SummonNoVoice", + dest="summonplay", + getter="getboolean", + default=defaults.summon_no_voice, + comment=( + "Auto summon to user voice channel when using play commands, if bot isn't in voice already.\n" + "The summon command must still be allowed for this group!" + ), + ) + self.extractors = self._mgr.register.init_option( + section=name, + option="Extractors", + dest="extractors", + getter="getstrset", + default=defaults.extractors, + comment=( + "List of yt_dlp extractor keys, separated by spaces, that are allowed to be used.\n" + "Services supported by yt_dlp shown here: https://github.com/yt-dlp/yt-dlp/blob/master/supportedsites.md \n" + "MusicBot also provides one custom service `spotify:musicbot` to enable or disable spotify API extraction.\n" + "NOTICE: MusicBot might not support all services available to yt_dlp!\n" + ), + empty_display_val="(All allowed)", ) self.validate() @@ -277,8 +505,165 @@ def remove_user(self, uid: int) -> None: if uid in self.user_list: self.user_list.remove(uid) + def can_use_command(self, command: str) -> None: + """ + Test if command is enabled in this permission group. + + :raises: PermissionsError if command is denied from use. + """ + if self.command_whitelist and command not in self.command_whitelist: + raise PermissionsError( + f"This command is not enabled for your group ({self.name}).", + expire_in=20, + ) + + if self.command_blacklist and command in self.command_blacklist: + raise PermissionsError( + f"This command is disabled for your group ({self.name}).", + expire_in=20, + ) + + def format(self, for_user: bool = False) -> str: + """ + Format the current group values into INI-like text. + + :param: for_user: Present values for display, instead of literal values. + """ + perms = f"Permission group name: {self.name}\n" + for opt in self._mgr.register.option_list: + if opt.section != self.name: + continue + if opt.invisible and for_user: + continue + val = self._mgr.register.to_ini(opt) + if not val and opt.empty_display_val: + val = opt.empty_display_val + perms += f"{opt.option} = {val}\n" + return perms + def __repr__(self) -> str: return f"" def __str__(self) -> str: return f"" + + +class PermissionOptionRegistry(ConfigOptionRegistry): + def __init__(self, config: Permissions, parser: ExtendedConfigParser) -> None: + super().__init__(config, parser) + + def validate_register_destinations(self) -> None: + """Check all configured options for matching destination definitions.""" + if not isinstance(self._config, Permissions): + raise RuntimeError( + "Dev Bug! Somehow this is Config when it should be Permissions." + ) + + errors = [] + for opt in self._option_list: + if not hasattr(self._config.groups[opt.section], opt.dest): + errors.append( + f"Permission `{opt}` has an missing destination named: {opt.dest}" + ) + if errors: + msg = "Dev Bug! Some permissions failed validation.\n" + msg += "\n".join(errors) + raise RuntimeError(msg) + + @property + def distinct_options(self) -> Set[str]: + """Unique Permission names for Permission groups.""" + return self._distinct_options + + def get_option_dict(self, group: str) -> Dict[str, ConfigOption]: + """Get only ConfigOptions for the group, in a dict by option name.""" + return {opt.option: opt for opt in self.option_list if opt.section == group} + + def unregister_group(self, group: str) -> None: + """Removes all registered options for group.""" + new_opts = [] + for opt in self.option_list: + if opt.section == group: + continue + new_opts.append(opt) + self._option_list = new_opts + + def get_values(self, opt: ConfigOption) -> Tuple[RegTypes, str, str]: + """ + Get the values in PermissionGroup and *ConfigParser for this option. + Returned tuple contains parsed value, ini-string, and a display string + for the parsed config value if applicable. + Display string may be empty if not used. + """ + if not isinstance(self._config, Permissions): + raise RuntimeError( + "Dev Bug! Somehow this is Config when it should be Permissions." + ) + + if not opt.editable: + return ("", "", "") + + if opt.section not in self._config.groups: + raise ValueError( + f"Dev Bug! PermissionGroup named `{opt.section}` does not exist." + ) + + if not hasattr(self._config.groups[opt.section], opt.dest): + raise AttributeError( + f"Dev Bug! Attribute `PermissionGroup.{opt.dest}` does not exist." + ) + + if not hasattr(self._parser, opt.getter): + raise AttributeError( + f"Dev Bug! Method `*ConfigParser.{opt.getter}` does not exist." + ) + + parser_get = getattr(self._parser, opt.getter) + config_value = getattr(self._config.groups[opt.section], opt.dest) + parser_value = parser_get(opt.section, opt.option, fallback=opt.default) + + display_config_value = "" + if not config_value and opt.empty_display_val: + display_config_value = opt.empty_display_val + + return (config_value, parser_value, display_config_value) + + def get_parser_value(self, opt: ConfigOption) -> RegTypes: + """returns the parser's parsed value for the given option.""" + getter = getattr(self._parser, opt.getter, None) + if getter is None: + raise AttributeError( + f"Dev Bug! Attribute *ConfigParser.{opt.getter} does not exist." + ) + + val: RegTypes = getter(opt.section, opt.option, fallback=opt.default) + if not isinstance(val, type(opt.default)): + raise TypeError("Dev Bug! Type from parser does not match default type.") + return val + + def to_ini(self, option: ConfigOption, use_default: bool = False) -> str: + """ + Convert the parsed permission value into an INI value. + This method does not perform validation, simply converts the value. + + :param: use_default: return the default value instead of current config. + """ + if not isinstance(self._config, Permissions): + raise RuntimeError( + "Dev Bug! Registry does not have Permissions config object." + ) + + if use_default: + conf_value = option.default + else: + if option.section not in self._config.groups: + raise ValueError(f"No PermissionGroup by the name `{option.section}`") + + group = self._config.groups[option.section] + if not hasattr(group, option.dest): + raise AttributeError( + f"Dev Bug! Attribute `PermissionGroup.{option.dest}` does not exist." + ) + + conf_value = getattr(group, option.dest) + return self._value_to_ini(conf_value, option.getter) diff --git a/musicbot/utils.py b/musicbot/utils.py index 3a0b4a859..8051f5f5f 100644 --- a/musicbot/utils.py +++ b/musicbot/utils.py @@ -247,13 +247,11 @@ def set_logging_level(level: int, override: bool = False) -> None: dlogger.setLevel(level) -# TODO: perhaps add config file option for max logs kept. def set_logging_max_kept_logs(number: int) -> None: """Inform the logger how many logs it should keep.""" setattr(logging, "mb_max_logs_kept", number) -# TODO: perhaps add a config file option for date format. def set_logging_rotate_date_format(sftime: str) -> None: """Inform the logger how it should format rotated file date strings.""" setattr(logging, "mb_rot_date_fmt", sftime) @@ -663,10 +661,13 @@ def format_song_duration(seconds: Union[int, float, datetime.timedelta]) -> str: # Simply remove any microseconds from the delta. time_delta = str(seconds).split(".", maxsplit=1)[0] + t_hours = seconds.seconds / 3600 - # Check the hours portion for empty 0 and remove it. - duration_array = time_delta.split(":") - return time_delta if int(duration_array[0]) > 0 else ":".join(duration_array[1:]) + # if hours is 0 remove it. + if seconds.days == 0 and t_hours < 1: + duration_array = time_delta.split(":") + return ":".join(duration_array[1:]) + return time_delta def format_size_from_bytes(size_bytes: int) -> str: @@ -759,6 +760,33 @@ def format_time_to_seconds(time_str: Union[str, int]) -> int: if isinstance(time_str, int): return time_str + # support HH:MM:SS notations like those from timedelta.__str__ + hms_total = 0 + if ":" in time_str: + parts = time_str.split() + for part in parts: + bits = part.split(":") + part_sec = 0 + try: + # format is MM:SS + if len(bits) == 2: + m = int(bits[0]) + s = int(bits[1]) + part_sec += (m * 60) + s + # format is HH:MM:SS + elif len(bits) == 3: + h = int(bits[0] or 0) + m = int(bits[1]) + s = int(bits[2] or 0) + part_sec += (h * 3600) + (m * 60) + s + # format is not supported. + else: + continue + hms_total += part_sec + time_str = time_str.replace(part, "") + except (ValueError, TypeError): + continue + # TODO: find a good way to make this i18n friendly. time_lex = re.compile(r"(\d*\.?\d+)\s*(y|d|h|m|s)?", re.I) unit_seconds = { @@ -768,7 +796,7 @@ def format_time_to_seconds(time_str: Union[str, int]) -> int: "m": 60, "s": 1, } - total_sec = 0 + total_sec = hms_total for value, unit in time_lex.findall(time_str): if not unit: unit = "s" diff --git a/requirements.txt b/requirements.txt index 7f50f54c3..e1c11500d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,3 +5,4 @@ yt-dlp colorlog cffi --only-binary all; sys_platform == 'win32' certifi +configupdater