diff --git a/ctfcli/__main__.py b/ctfcli/__main__.py index 0a9a72b..7557ed8 100644 --- a/ctfcli/__main__.py +++ b/ctfcli/__main__.py @@ -133,7 +133,7 @@ def main(): # if the command returns an int, then we serialize it as none to prevent fire from printing it # (this does not change the actual return value, so it's still good to use as an exit code) # everything else is returned as is, so fire can print help messages - ret = fire.Fire(CTFCLI, serialize=lambda r: None if isinstance(r, int) else r) + ret = fire.Fire(COMMANDS["cli"], serialize=lambda r: None if isinstance(r, int) else r) if isinstance(ret, int): sys.exit(ret) diff --git a/ctfcli/cli/challenges.py b/ctfcli/cli/challenges.py index a97229c..c84bc0b 100644 --- a/ctfcli/cli/challenges.py +++ b/ctfcli/cli/challenges.py @@ -2,7 +2,7 @@ import os import subprocess from pathlib import Path -from typing import Tuple, Union +from typing import List, Optional, Tuple, Union from urllib.parse import urlparse import click @@ -23,30 +23,31 @@ class ChallengeCommand: def new(self, type: str = "blank") -> int: log.debug(f"new: (type={type})") - config = Config() + # If the type is blank, use the built-in default template if type == "blank": - template_path = config.base_path / "templates" / type / "default" + template_path = Config.get_base_path() / "templates" / type / "default" log.debug(f"template_path: {template_path}") cookiecutter(str(template_path)) return 0 - # Check if we're referencing an installed template - template_path = config.templates_path / type + # If the type is not the default 'blank' - check if it's installed + template_path = Config.get_templates_path() / type if template_path.is_dir(): # If we found a template directory, use it log.debug(f"template_path: {template_path}") cookiecutter(str(template_path)) return 0 - # If we didn't, use a built-in template + # If it's not installed, check if it's built-in + # Without a specified variant if os.sep not in type: - # if variant wasn't specified use the default - template_path = config.base_path / "templates" / type / "default" + template_path = Config.get_base_path() / "templates" / type / "default" log.debug(f"template_path: {template_path}") cookiecutter(str(template_path)) return 0 - template_path = config.base_path / "templates" / type + # With a specified variant + template_path = Config.get_base_path() / "templates" / type if template_path.is_dir(): log.debug(f"template_path: {template_path}") cookiecutter(str(template_path)) @@ -60,30 +61,15 @@ def new(self, type: str = "blank") -> int: def edit(self, challenge: str, dockerfile: bool = False) -> int: log.debug(f"edit: {challenge} (dockerfile={dockerfile})") - config = Config() - requested_challenge = config["challenges"].get(challenge, None) - if not requested_challenge: - click.secho( - f"Could not find added challenge '{challenge}' " - "Please check that the challenge is added to .ctf/config and that your path matches", - fg="red", - ) + challenge_instance = self.resolve_single_challenge(challenge) + if not challenge_instance: return 1 - challenge_path = config.project_path / challenge - if not challenge.endswith(".yml"): - challenge_path = challenge_path / "challenge.yml" - - try: - challenge = Challenge(challenge_path) - except ChallengeException as e: - click.secho(str(e), fg="red") - return 1 - - edited_file_path = challenge_path + edited_file_path = challenge_instance.challenge_file_path if dockerfile: - dockerfile_path = config.project_path / challenge_path.parent / challenge.get("image", ".") + dockerfile_path = challenge_instance.challenge_directory / challenge_instance.get("image", ".") + if not str(dockerfile_path).endswith("Dockerfile"): dockerfile_path = dockerfile_path / "Dockerfile" @@ -107,22 +93,12 @@ def show(self, challenge: str, color=True) -> int: def view(self, challenge: str, color=True) -> int: log.debug(f"view: {challenge} (color={color})") - config = Config() - requested_challenge = config["challenges"].get(challenge, None) - if not requested_challenge: - click.secho( - f"Could not find added challenge '{challenge}' " - "Please check that the challenge is added to .ctf/config and that your path matches", - fg="red", - ) + challenge_instance = self.resolve_single_challenge(challenge) + if not challenge_instance: return 1 - challenge_path = config.project_path / challenge - if not challenge.endswith(".yml"): - challenge_path = challenge_path / "challenge.yml" - - with open(challenge_path, "r") as challenge_yml_file: + with open(challenge_instance.challenge_file_path, "r") as challenge_yml_file: challenge_yml = challenge_yml_file.read() if color: @@ -144,14 +120,14 @@ def add(self, repo: str, directory: str = None, yaml_path: str = None) -> int: # check if we're working with a remote challenge which has to be pulled first if repo.endswith(".git"): - # Get relative path from project root to current directory + # Get a relative path from project root to current directory project_path = config.project_path project_relative_cwd = Path.cwd().relative_to(project_path) - # Get new directory that will add the git subtree + # Get a new directory that will add the git subtree repository_basename = Path(repo).stem - # Use the custom subdirectory for the challenge, if one was provided + # Use the custom subdirectory for the challenge if one was provided repository_path = repository_basename if directory: custom_directory_path = Path(directory) @@ -160,12 +136,12 @@ def add(self, repo: str, directory: str = None, yaml_path: str = None) -> int: # Join targets challenge_path = project_relative_cwd / repository_path - # If a custom yaml_path is specified we add it to our challenge_key + # If a custom yaml_path is specified, we add it to our challenge_key challenge_key = challenge_path if yaml_path: challenge_key = challenge_key / yaml_path - # Add new challenge to the config + # Add a new challenge to the config config["challenges"][str(challenge_key)] = repo head_branch = get_git_repo_head_branch(repo) @@ -237,7 +213,7 @@ def push(self, challenge: str = None) -> int: challenge_repo = config.challenges.get(str(challenge_path), None) # if we don't find the challenge by the directory, - # check if it's saved with direct path to challenge.yml + # check if it's saved with a direct path to challenge.yml if not challenge_repo: challenge_repo = config.challenges.get(str(challenge_path / "challenge.yml"), None) @@ -317,7 +293,7 @@ def pull(self, challenge: str = None) -> int: challenge_repo = config.challenges.get(str(challenge_path), None) # if we don't find the challenge by the directory, - # check if it's saved with direct path to challenge.yml + # check if it's saved with a direct path to challenge.yml if not challenge_repo: challenge_repo = config.challenges.get(str(challenge_path / "challenge.yml"), None) @@ -395,7 +371,6 @@ def restore(self, challenge: str = None) -> int: failed_restores = [] for challenge_key, challenge_source in config.challenges.items(): - # if challenge is specified, loop through challenges to find it if challenge is not None and challenge_key != challenge: continue @@ -406,7 +381,7 @@ def restore(self, challenge: str = None) -> int: ) continue - # Check if we have a target directory, or the challenge is saved as a reference to challenge.yml + # Check if we have a target directory, or the challenge is saved as a reference to challenge.yml. # We cannot restore this, as we don't know the root of the challenge to pull the subtree if challenge_key.endswith(".yml"): click.secho( @@ -473,140 +448,126 @@ def install( self, challenge: str = None, force: bool = False, hidden: bool = False, ignore: Union[str, Tuple[str]] = () ) -> int: log.debug(f"install: (challenge={challenge}, force={force}, hidden={hidden}, ignore={ignore})") - config = Config() - challenge_keys = [challenge] + if challenge: + challenge_instance = self.resolve_single_challenge(challenge) + if not challenge_instance: + return 1 - # Get all challenges if not specifying a challenge - if challenge is None: - challenge_keys = config.challenges.keys() + local_challenges = [challenge_instance] + else: + local_challenges = self.resolve_all_challenges() - # Check if there are attributes to be ignored, and if there's only one cast it to a tuple if isinstance(ignore, str): ignore = (ignore,) - failed_installs = [] - with click.progressbar(challenge_keys, label="Installing challenges") as challenges: - for challenge_key in challenges: - click.echo() # echo a new line as a separator - challenge_path = config.project_path / Path(challenge_key) - - # if the challenge key does not end with .yml - then assume the default challenge.yml location - # otherwise - treat it as a full path - if not challenge_path.name.endswith(".yml"): - challenge_path = challenge_path / "challenge.yml" + config = Config() + remote_challenges = Challenge.load_installed_challenges() - try: - challenge = Challenge(challenge_path) - if hidden: - challenge["state"] = "hidden" + failed_installs = [] + with click.progressbar(local_challenges, label="Installing challenges") as challenges: + for challenge_instance in challenges: + click.echo() - except ChallengeException as e: - click.secho(str(e), fg="red") - failed_installs.append(challenge_key) - continue + if hidden: + challenge_instance["state"] = "hidden" click.secho( - f"Installing '{challenge['name']}' ({challenge_path.relative_to(config.project_path)}) ...", + f"Installing '{challenge_instance}' (" + f"{challenge_instance.challenge_file_path.relative_to(config.project_path)}" + f") ...", fg="blue", ) - installed_challenges = challenge.load_installed_challenges() found_duplicate = False - for c in installed_challenges: - if c["name"] == challenge["name"]: + for remote_challenge in remote_challenges: + if remote_challenge["name"] == challenge_instance["name"]: click.secho( - f"Found already existing challenge with the same name ({challenge['name']}). " + f"Found already existing challenge with the same name ({remote_challenge['name']}). " "Perhaps you meant sync instead of install?", fg="red", ) found_duplicate = True + break if found_duplicate: if not force: - failed_installs.append(challenge_key) + failed_installs.append(challenge_instance) continue click.secho("Syncing existing challenge instead (because of --force)", fg="yellow") try: - challenge.sync(ignore=ignore) + challenge_instance.sync(ignore=ignore) except ChallengeException as e: click.secho("Failed to sync challenge", fg="red") click.secho(str(e), fg="red") - failed_installs.append(challenge_key) + failed_installs.append(challenge_instance) continue # If we don't break because of duplicated challenge names - continue the installation try: - challenge.create(ignore=ignore) + challenge_instance.create(ignore=ignore) except ChallengeException as e: click.secho("Failed to install challenge", fg="red") click.secho(str(e), fg="red") - failed_installs.append(challenge_key) + failed_installs.append(challenge_instance) if len(failed_installs) == 0: click.secho("Success! All challenges installed!", fg="green") return 0 click.secho("Install failed for:", fg="red") - for challenge in failed_installs: - click.echo(f" - {challenge}") + for challenge_instance in failed_installs: + click.echo(f" - {challenge_instance}") return 1 def sync(self, challenge: str = None, ignore: Union[str, Tuple[str]] = ()) -> int: log.debug(f"sync: (challenge={challenge}, ignore={ignore})") - config = Config() - challenge_keys = [challenge] - # Get all challenges if not specifying a challenge - if challenge is None: - challenge_keys = config.challenges.keys() + if challenge: + challenge_instance = self.resolve_single_challenge(challenge) + if not challenge_instance: + return 1 + + local_challenges = [challenge_instance] + else: + local_challenges = self.resolve_all_challenges() - # Check if there are attributes to be ignored, and if there's only one cast it to a tuple if isinstance(ignore, str): ignore = (ignore,) - failed_syncs = [] - with click.progressbar(challenge_keys, label="Syncing challenges") as challenges: - for challenge_key in challenges: - click.echo() # echo a new line as a separator - challenge_path = config.project_path / Path(challenge_key) - - # if the challenge key does not end with .yml - then assume the default challenge.yml location - # otherwise - treat it as a full path - if not challenge_path.name.endswith(".yml"): - challenge_path = challenge_path / "challenge.yml" - - try: - challenge = Challenge(challenge_path) - except ChallengeException as e: - click.secho(str(e), fg="red") - failed_syncs.append(challenge_key) - continue + config = Config() + remote_challenges = Challenge.load_installed_challenges() - installed_challenges = challenge.load_installed_challenges() + failed_syncs = [] + with click.progressbar(local_challenges, label="Syncing challenges") as challenges: + for challenge_instance in challenges: + click.echo() - if not any(c["name"] == challenge["name"] for c in installed_challenges): + challenge_name = challenge_instance["name"] + if not any(c["name"] == challenge_name for c in remote_challenges): click.secho( - f"Could not find existing challenge {challenge['name']}. " + f"Could not find existing challenge {challenge_name}. " f"Perhaps you meant install instead of sync?", fg="red", ) - failed_syncs.append(challenge_key) + failed_syncs.append(challenge_instance) continue click.secho( - f"Syncing '{challenge['name']}' ({challenge_path.relative_to(config.project_path)}) ...", + f"Syncing '{challenge_name}' (" + f"{challenge_instance.challenge_file_path.relative_to(config.project_path)}" + f") ...", fg="blue", ) try: - challenge.sync(ignore=ignore) + challenge_instance.sync(ignore=ignore) except ChallengeException as e: click.secho("Failed to sync challenge", fg="red") click.secho(str(e), fg="red") - failed_syncs.append(challenge_key) + failed_syncs.append(challenge_instance) if len(failed_syncs) == 0: click.secho("Success! All challenges synced!", fg="green") @@ -626,39 +587,31 @@ def deploy( ) -> int: log.debug(f"deploy: (challenge={challenge}, host={host}, skip_login={skip_login})") - config = Config() - challenge_keys = [challenge] + if challenge: + challenge_instance = self.resolve_single_challenge(challenge) + if not challenge_instance: + return 1 - if challenge is None: - challenge_keys = config.challenges.keys() + challenges = [challenge_instance] + else: + challenges = self.resolve_all_challenges() - failed_deployments, failed_syncs = [], [] + deployable_challenges, failed_deployments, failed_syncs = [], [], [] # get challenges which can be deployed (have an image) - deployable_challenges = [] - for challenge_key in challenge_keys: - challenge_path = config.project_path / Path(challenge_key) - - if not challenge_path.name.endswith(".yml"): - challenge_path = challenge_path / "challenge.yml" - - try: - challenge = Challenge(challenge_path) - if challenge.get("image"): - deployable_challenges.append(challenge) - - except ChallengeException as e: - click.secho(str(e), fg="red") - failed_deployments.append(challenge_key) - continue + for challenge_instance in challenges: + if challenge_instance.get("image"): + deployable_challenges.append(challenge_instance) + else: + failed_deployments.append(challenge_instance) + config = Config() with click.progressbar(deployable_challenges, label="Deploying challenges") as challenges: - for challenge in challenges: - click.echo() # echo a new line as a separator + for challenge_instance in challenges: + click.echo() - challenge_name = challenge.get("name") - challenge_key = challenge.challenge_file_path.parent - target_host = host or challenge.get("host") + challenge_name = challenge_instance.get("name") + target_host = host or challenge_instance.get("host") # Default to cloud deployment if host is not specified scheme = "cloud" @@ -675,61 +628,66 @@ def deploy( scheme = url.scheme deployment_handler = get_deployment_handler(scheme)( - challenge, host=host, protocol=challenge.get("protocol") + challenge_instance, host=host, protocol=challenge_instance.get("protocol") ) + click.secho( f"Deploying challenge service '{challenge_name}' " - f"({challenge.challenge_file_path.relative_to(config.project_path)}) " + f"({challenge_instance.challenge_file_path.relative_to(config.project_path)}) " f"with {deployment_handler.__class__.__name__} ...", fg="blue", ) + deployment_result = deployment_handler.deploy(skip_login=skip_login) - # Use hardcoded connection_info if specified - if challenge.get("connection_info"): - click.secho("Using connection_info hardcoded in challenge.yml", fg="yellow") + # Don't modify the connection_info if it exists already + if challenge_instance.get("connection_info"): + click.secho("Using connection_info from challenge.yml", fg="yellow") # Otherwise, use connection_info from the deployment result if provided elif deployment_result.connection_info: - challenge["connection_info"] = deployment_result.connection_info + challenge_instance["connection_info"] = deployment_result.connection_info # Finally, if no connection_info was provided in the challenge and the # deployment didn't result in one either, just ensure it's not present else: - challenge["connection_info"] = None + challenge_instance["connection_info"] = None if not deployment_result.success: click.secho("An error occurred during service deployment!", fg="red") - failed_deployments.append(challenge_key) + failed_deployments.append(challenge_instance) continue - installed_challenges = challenge.load_installed_challenges() + installed_challenges = Challenge.load_installed_challenges() existing_challenge = next( - (c for c in installed_challenges if c["name"] == challenge["name"]), + (c for c in installed_challenges if c["name"] == challenge_instance["name"]), None, ) - if challenge["connection_info"]: + if challenge_instance.get("connection_info"): click.secho( - f"Challenge service deployed at: {challenge['connection_info']}", + f"Challenge service deployed at: {challenge_instance['connection_info']}", fg="green", ) - challenge.save() # Save the challenge with the new connection_info + challenge_instance.save() # Save the challenge with the new connection_info else: click.secho( - "Could not resolve a connection_info for the deployed service.\nIf your DeploymentHandler " - "does not return a connection_info, make sure to provide one in the challenge.yml file.", + "Could not resolve a connection_info for the deployed service.\n" + "If your DeploymentHandler does not return a connection_info, " + "make sure to provide one in the challenge.yml file.", fg="yellow", ) try: if existing_challenge: click.secho(f"Updating challenge '{challenge_name}'", fg="blue") - challenge.sync(ignore=["flags", "topics", "tags", "files", "hints", "requirements", "state"]) + challenge_instance.sync( + ignore=["flags", "topics", "tags", "files", "hints", "requirements", "state"] + ) else: click.secho(f"Creating challenge '{challenge_name}'", fg="blue") - challenge.create() + challenge_instance.create() except ChallengeException as e: click.secho( @@ -738,7 +696,7 @@ def deploy( fg="red", ) click.secho(str(e), fg="red") - failed_syncs.append(challenge_key) + failed_syncs.append(challenge_instance) click.secho("Success!\n", fg="green") @@ -751,13 +709,13 @@ def deploy( if len(failed_deployments) > 0: click.secho("Deployment failed for:", fg="red") - for challenge in failed_deployments: - click.echo(f" - {challenge}") + for challenge_instance in failed_deployments: + click.echo(f" - {challenge_instance}") if len(failed_syncs) > 0: click.secho("Install / Sync failed for:", fg="red") - for challenge in failed_deployments: - click.echo(f" - {challenge}") + for challenge_instance in failed_deployments: + click.echo(f" - {challenge_instance}") return 1 @@ -768,25 +726,14 @@ def lint( flag_format: str = "flag{", ) -> int: log.debug(f"lint: (challenge={challenge}, skip_hadolint={skip_hadolint}, flag_format='{flag_format}')") - config = Config() - challenge_path = Path.cwd() - - if challenge: - challenge_path = config.project_path / Path(challenge) - - if not challenge_path.name.endswith(".yml"): - challenge_path = challenge_path / "challenge.yml" - try: - challenge = Challenge(challenge_path) - except ChallengeException as e: - click.secho(str(e), fg="red") + challenge_instance = self.resolve_single_challenge(challenge) + if not challenge_instance: return 1 - click.secho(f"Loaded {challenge['name']}", fg="blue") - + click.secho(f"Loaded {challenge_instance}", fg="blue") try: - challenge.lint(skip_hadolint=skip_hadolint, flag_format=flag_format) + challenge_instance.lint(skip_hadolint=skip_hadolint, flag_format=flag_format) except LintException as e: click.secho("Linting found issues!\n", fg="yellow") e.print_summary() @@ -795,43 +742,33 @@ def lint( click.secho("Success! Lint didn't find any issues!", fg="green") return 0 - def healthcheck(self, challenge: str = None) -> int: + def healthcheck(self, challenge: Optional[str] = None) -> int: log.debug(f"healthcheck: (challenge={challenge})") - config = Config() - challenge_path = Path.cwd() - - if challenge: - challenge_path = config.project_path / Path(challenge) - if not challenge_path.name.endswith(".yml"): - challenge_path = challenge_path / "challenge.yml" - - try: - challenge = Challenge(challenge_path) - except ChallengeException as e: - click.secho(str(e), fg="red") + challenge_instance = self.resolve_single_challenge(challenge) + if not challenge_instance: return 1 - click.secho(f"Loaded {challenge['name']}", fg="blue") - healthcheck = challenge.get("healthcheck", None) + click.secho(f"Loaded {challenge_instance}", fg="blue") + healthcheck = challenge_instance.get("healthcheck", None) if not healthcheck: click.secho( - f"Challenge '{challenge['name']}' does not define a healthcheck.", + f"Challenge '{challenge_instance}' does not define a healthcheck.", fg="red", ) return 1 # Get challenges installed from CTFd and try to find our challenge - installed_challenges = Challenge.load_installed_challenges() + remote_challenges = Challenge.load_installed_challenges() challenge_id = None - for c in installed_challenges: - if challenge["name"] == c["name"]: - challenge_id = c["id"] + for remote_challenge in remote_challenges: + if challenge_instance["name"] == remote_challenge["name"]: + challenge_id = remote_challenge["id"] if challenge_id is None: click.secho( - f"Could not find existing challenge '{challenge['name']}'. " + f"Could not find existing challenge '{challenge_instance}'. " f"Challenge needs to be installed and deployed to run a healthcheck.", fg="red", ) @@ -839,22 +776,25 @@ def healthcheck(self, challenge: str = None) -> int: challenge_data = Challenge.load_installed_challenge(challenge_id) if not challenge_data: - click.secho(f"Could not load data for challenge '{challenge['name']}'.", fg="red") + click.secho(f"Could not load data for challenge '{challenge_instance}'.", fg="red") return 1 connection_info = challenge_data.get("connection_info") if not connection_info: click.secho( - f"Challenge '{challenge['name']}' does not provide connection info. " + f"Challenge '{challenge_instance}' does not provide connection info. " "Perhaps it needs to be deployed first?", fg="red", ) return 1 - log.debug(f"call(['{healthcheck}', '--connection-info', '{connection_info}'], cwd='{challenge_path.parent}')") + log.debug( + f"call(['{healthcheck}', '--connection-info', '{connection_info}'], " + f"cwd='{challenge_instance.challenge_directory}')" + ) healthcheck_status = subprocess.call( [healthcheck, "--connection-info", connection_info], - cwd=challenge_path.parent, + cwd=challenge_instance.challenge_directory, ) if healthcheck_status != 0: @@ -871,38 +811,26 @@ def mirror( skip_verify: bool = False, ignore: Union[str, Tuple[str]] = (), ) -> int: - config = Config() - challenge_keys = [challenge] + log.debug( + f"mirror: (challenge={challenge}, files_directory={files_directory}, " + f"skip_verify={skip_verify}, ignore={ignore})" + ) - # Get all local challenges if not specifying a challenge - if challenge is None: - challenge_keys = config.challenges.keys() + if challenge: + challenge_instance = self.resolve_single_challenge(challenge) + if not challenge_instance: + return 1 + + local_challenges = [challenge_instance] + else: + local_challenges = self.resolve_all_challenges() - # Check if there are attributes to be ignored, and if there's only one cast it to a tuple if isinstance(ignore, str): ignore = (ignore,) - # Load local challenges - local_challenges, failed_mirrors = [], [] - for challenge_key in challenge_keys: - challenge_path = config.project_path / Path(challenge_key) - - if not challenge_path.name.endswith(".yml"): - challenge_path = challenge_path / "challenge.yml" - - try: - local_challenges.append(Challenge(challenge_path)) - - except ChallengeException as e: - click.secho(str(e), fg="red") - failed_mirrors.append(challenge_key) - continue - remote_challenges = Challenge.load_installed_challenges() - - if len(challenge_keys) > 1: - # When mirroring all challenges - issue a warning if there are extra challenges on the remote - # that do not have a local version + if len(local_challenges) > 1: + # Issue a warning if there are extra challenges on the remote that do not have a local version local_challenge_names = [c["name"] for c in local_challenges] for remote_challenge in remote_challenges: @@ -914,65 +842,51 @@ def mirror( fg="yellow", ) + failed_mirrors = [] with click.progressbar(local_challenges, label="Mirroring challenges") as challenges: - for challenge in challenges: + for challenge_instance in challenges: try: - if not skip_verify and challenge.verify(ignore=ignore): + if not skip_verify and challenge_instance.verify(ignore=ignore): click.secho( - f"Challenge '{challenge['name']}' is already in sync. Skipping mirroring.", + f"Challenge '{challenge_instance}' is already in sync. Skipping mirroring.", fg="blue", ) else: # if skip_verify is True or challenge.verify(ignore=ignore) is False - challenge.mirror(files_directory_name=files_directory, ignore=ignore) + challenge_instance.mirror(files_directory_name=files_directory, ignore=ignore) except ChallengeException as e: click.secho(str(e), fg="red") - failed_mirrors.append(challenge["name"]) + failed_mirrors.append(challenge_instance) if len(failed_mirrors) == 0: click.secho("Success! All challenges mirrored!", fg="green") return 0 click.secho("Mirror failed for:", fg="red") - for challenge in failed_mirrors: - click.echo(f" - {challenge}") + for challenge_instance in failed_mirrors: + click.echo(f" - {challenge_instance}") return 1 def verify(self, challenge: str = None, ignore: Tuple[str] = ()) -> int: - config = Config() - challenge_keys = [challenge] + log.debug(f"verify: (challenge={challenge}, ignore={ignore})") + + if challenge: + challenge_instance = self.resolve_single_challenge(challenge) + if not challenge_instance: + return 1 - # Get all local challenges if not specifying a challenge - if challenge is None: - challenge_keys = config.challenges.keys() + local_challenges = [challenge_instance] + else: + local_challenges = self.resolve_all_challenges() - # Check if there are attributes to be ignored, and if there's only one cast it to a tuple if isinstance(ignore, str): ignore = (ignore,) - # Load local challenges - local_challenges, failed_verifications = [], [] - for challenge_key in challenge_keys: - challenge_path = config.project_path / Path(challenge_key) - - if not challenge_path.name.endswith(".yml"): - challenge_path = challenge_path / "challenge.yml" - - try: - local_challenges.append(Challenge(challenge_path)) - - except ChallengeException as e: - click.secho(str(e), fg="red") - failed_verifications.append(challenge_key) - continue - remote_challenges = Challenge.load_installed_challenges() - - if len(challenge_keys) > 1: - # When verifying all challenges - issue a warning if there are extra challenges on the remote - # that do not have a local version + if len(local_challenges) > 1: + # Issue a warning if there are extra challenges on the remote that do not have a local version local_challenge_names = [c["name"] for c in local_challenges] for remote_challenge in remote_challenges: @@ -983,31 +897,31 @@ def verify(self, challenge: str = None, ignore: Tuple[str] = ()) -> int: fg="yellow", ) - challenges_in_sync, challenges_out_of_sync = [], [] + failed_verifications, challenges_in_sync, challenges_out_of_sync = [], [], [] with click.progressbar(local_challenges, label="Verifying challenges") as challenges: - for challenge in challenges: + for challenge_instance in challenges: try: - if not challenge.verify(ignore=ignore): - challenges_out_of_sync.append(challenge["name"]) + if not challenge_instance.verify(ignore=ignore): + challenges_out_of_sync.append(challenge_instance) else: - challenges_in_sync.append(challenge["name"]) + challenges_in_sync.append(challenge_instance) except ChallengeException as e: click.secho(str(e), fg="red") - failed_verifications.append(challenge["name"]) + failed_verifications.append(challenge_instance) if len(failed_verifications) == 0: click.secho("Success! All challenges verified!", fg="green") if len(challenges_in_sync) > 0: click.secho("Challenges in sync:", fg="green") - for challenge in challenges_in_sync: - click.echo(f" - {challenge}") + for challenge_instance in challenges_in_sync: + click.echo(f" - {challenge_instance}") if len(challenges_out_of_sync) > 0: click.secho("Challenges out of sync:", fg="yellow") - for challenge in challenges_out_of_sync: - click.echo(f" - {challenge}") + for challenge_instance in challenges_out_of_sync: + click.echo(f" - {challenge_instance}") if len(challenges_out_of_sync) > 1: return 2 @@ -1015,33 +929,32 @@ def verify(self, challenge: str = None, ignore: Tuple[str] = ()) -> int: return 1 click.secho("Verification failed for:", fg="red") - for challenge in failed_verifications: - click.echo(f" - {challenge}") + for challenge_instance in failed_verifications: + click.echo(f" - {challenge_instance}") return 1 - def format(self, challenge: str = None) -> int: - config = Config() - challenge_keys = [challenge] - - # Get all local challenges if not specifying a challenge - if challenge is None: - challenge_keys = config.challenges.keys() + def format(self, challenge: Optional[str] = None) -> int: + log.debug(f"format: (challenge={challenge})") - failed_formats = [] - for challenge_key in challenge_keys: - challenge_path = config.project_path / Path(challenge_key) + if challenge: + challenge_instance = self.resolve_single_challenge(challenge) + if not challenge_instance: + return 1 - if not challenge_path.name.endswith(".yml"): - challenge_path = challenge_path / "challenge.yml" + challenges = [challenge_instance] + else: + challenges = self.resolve_all_challenges() + failed_formats = [] + for challenge_instance in challenges: try: - # load the challenge and save it without changes - Challenge(challenge_path).save() + # save the challenge without changes to trigger the format + challenge_instance.save() except ChallengeException as e: click.secho(str(e), fg="red") - failed_formats.append(challenge_key) + failed_formats.append(challenge_instance) continue if len(failed_formats) == 0: @@ -1049,7 +962,53 @@ def format(self, challenge: str = None) -> int: return 0 click.secho("Format failed for:", fg="red") - for challenge in failed_formats: - click.echo(f" - {challenge}") + for challenge_instance in failed_formats: + click.echo(f" - {challenge_instance}") return 1 + + @staticmethod + def resolve_single_challenge(challenge: Optional[str] = None) -> Optional[Challenge]: + # if a challenge is specified + if challenge: + # check if it's a path to challenge.yml, or the current directory + if challenge.endswith(".yml") or challenge.endswith(".yaml") or challenge == ".": + challenge_path = Path(challenge) + + # otherwise it's a name to be resolved from the config + else: + config = Config() + challenge_path = config.project_path / Path(challenge) + + # otherwise, assume it's in the current directory + else: + challenge_path = Path.cwd() + + if not challenge_path.name.endswith(".yml") and not challenge_path.name.endswith(".yaml"): + challenge_path = challenge_path / "challenge.yml" + + try: + return Challenge(challenge_path) + except ChallengeException as e: + click.secho(str(e), fg="red") + return + + @staticmethod + def resolve_all_challenges() -> List[Challenge]: + config = Config() + challenge_keys = config.challenges.keys() + + challenges = [] + for challenge_key in challenge_keys: + challenge_path = config.project_path / Path(challenge_key) + + if not challenge_path.name.endswith(".yml"): + challenge_path = challenge_path / "challenge.yml" + + try: + challenges.append(Challenge(challenge_path)) + except ChallengeException as e: + click.secho(str(e), fg="red") + continue + + return challenges diff --git a/ctfcli/core/challenge.py b/ctfcli/core/challenge.py index ef5a0f0..046add6 100644 --- a/ctfcli/core/challenge.py +++ b/ctfcli/core/challenge.py @@ -146,6 +146,9 @@ def __init__(self, challenge_yml: Union[str, PathLike], overrides=None): if self.get("image"): self.image = Image(slugify(self["name"]), self.challenge_directory / self["image"]) + def __str__(self): + return self["name"] + def _load_challenge_id(self): remote_challenges = self.load_installed_challenges() if not remote_challenges: diff --git a/ctfcli/core/plugins.py b/ctfcli/core/plugins.py index 4829493..7711245 100644 --- a/ctfcli/core/plugins.py +++ b/ctfcli/core/plugins.py @@ -13,6 +13,9 @@ def load_plugins(commands: Dict): sys.path.insert(0, str(plugins_path.absolute())) for plugin in sorted(plugins_path.iterdir()): + if plugin.name.startswith("_"): + continue + plugin_path = plugins_path / plugin / "__init__.py" log.debug(f"Loading plugin '{plugin}' from '{plugin_path}'")