diff --git a/README.md b/README.md index a3e08d3..2eb2305 100644 --- a/README.md +++ b/README.md @@ -82,6 +82,30 @@ Syncing buffer_overflow Success! ``` +## 5. Verify & pull challenges + +If, at any point, changes may have been made to a challenge through the CTFd UI by an admin. To verify that your challenge.yml file matches remote contents, you can use: + +``` +❯ ctf challenge verify [challenge.yml | DIRECTORY] [--verify-files] [--verify-defaults] +``` + +If the `--verify-files` flag is set, challenge files will be downloaded and the binary files will be compared. + +If the `--verify-defaults` flag is set, challenge files will be verified to make sure they include default optional keys present on CTFd. +If you want to pull down the latest version of the challenge, and its challenge files, you can use: + +``` +❯ ctf challenge verify [challenge.yml | DIRECTORY] [--update_files] [--create-files] [--create_defaults] +``` + +If the `--update_files` flag is set, the latest version of every file will be redownloaded from CTFd. + +If the `--create-files` flag is set, any new files added to through the CTFd UI will be downloaded to the same directory as the `challenge.yml` file. + +If the `--create-defaults` flag is set, any optional default values will be added to the `challenge.yml`. + +**This is a destructive action! It will overwrite the local version of `challenge.yml` with the version on CTFd!** # Challenge Templates `ctfcli` contains pre-made challenge templates to make it faster to create CTF challenges with safe defaults. diff --git a/ctfcli/cli/challenges.py b/ctfcli/cli/challenges.py index fd960cb..39cedf5 100644 --- a/ctfcli/cli/challenges.py +++ b/ctfcli/cli/challenges.py @@ -15,6 +15,8 @@ load_installed_challenge, load_installed_challenges, sync_challenge, + verify_challenge, + pull_challenge ) from ctfcli.utils.config import ( get_base_path, @@ -414,3 +416,53 @@ def healthcheck(self, challenge): f"Success", fg="green", ) sys.exit(0) + + def verify(self, challenge=None, ignore=(), verify_files=False, verify_defaults=False): + if isinstance(ignore, str): + ignore = (ignore,) + + if challenge is None: + # Get all challenges if not specifying a challenge + config = load_config() + challenges = dict(config["challenges"]).keys() + else: + challenges = [challenge] + + for challenge in challenges: + path = Path(challenge) + + if path.name.endswith(".yml") is False: + path = path / "challenge.yml" + + click.secho(f"Found {path}") + challenge = load_challenge(path) + click.secho(f'Loaded {challenge["name"]}', fg="yellow") + + click.secho(f'Verifying {challenge["name"]}', fg="yellow") + verify_challenge(challenge=challenge, ignore=ignore, verify_files=verify_files, verify_defaults=verify_defaults) + click.secho("Success!", fg="green") + + def pull(self, challenge=None, ignore=(), update_files=False, create_files=False, create_defaults=False): + if isinstance(ignore, str): + ignore = (ignore,) + + if challenge is None: + # Get all challenges if not specifying a challenge + config = load_config() + challenges = dict(config["challenges"]).keys() + else: + challenges = [challenge] + + for challenge in challenges: + path = Path(challenge) + + if path.name.endswith(".yml") is False: + path = path / "challenge.yml" + + click.secho(f"Found {path}") + challenge = load_challenge(path) + click.secho(f'Loaded {challenge["name"]}', fg="yellow") + + click.secho(f'Verifying {challenge["name"]}', fg="yellow") + pull_challenge(challenge=challenge, ignore=ignore, update_files=update_files, create_files=create_files, create_defaults=create_defaults) + click.secho("Success!", fg="green") \ No newline at end of file diff --git a/ctfcli/utils/challenge.py b/ctfcli/utils/challenge.py index 8595e66..fbaa6b0 100644 --- a/ctfcli/utils/challenge.py +++ b/ctfcli/utils/challenge.py @@ -1,3 +1,4 @@ +from collections import OrderedDict from pathlib import Path import subprocess @@ -404,3 +405,228 @@ def lint_challenge(path): exit(1) exit(0) + +def get_challenge_details(challenge_id): + s = generate_session() + r = s.get(f"/api/v1/challenges/{challenge_id}", json=True) + r.raise_for_status() + challenge = r.json()["data"] + # Remove non-yaml fields + + challenge.pop("id") + challenge.pop("type_data") + challenge.pop("view") + challenge.pop("solves") + challenge.pop("solved_by_me") + + # Normalize fields to ctfcli format + challenge['attempts'] = challenge['max_attempts'] + challenge.pop("max_attempts") + challenge['description'] = challenge['description'].replace('\r\n', '\n') + + for key in ['initial', 'decay', 'minimum']: + if key in challenge: + challenge['extra'][key] = challenge[key] + challenge.pop(key) + + # Add flags + r = s.get(f"/api/v1/challenges/{challenge_id}/flags", json=True) + r.raise_for_status() + flags = r.json()["data"] + challenge["flags"] = [f["content"] if f["type"] == "static" and (f["data"] == None or f["data"] == "") else { "content": f["content"], "type": f["type"], "data": f["data"] } for f in flags] + + # Add tags + r = s.get(f"/api/v1/challenges/{challenge_id}/tags", json=True) + r.raise_for_status() + tags = r.json()["data"] + challenge["tags"] = [t["value"] for t in tags] + + # Add hints + r = s.get(f"/api/v1/challenges/{challenge_id}/hints", json=True) + r.raise_for_status() + hints = r.json()["data"] + challenge["hints"] = [{ "content": h["content"], "cost": h["cost"] } if h["cost"] > 0 else h["content"] for h in hints] + + # Add topics + r = s.get(f"/api/v1/challenges/{challenge_id}/topics", json=True) + r.raise_for_status() + topics = r.json()["data"] + challenge["topics"] = [t["value"] for t in topics] + + # Add requirements + r = s.get(f"/api/v1/challenges/{challenge_id}/requirements", json=True) + r.raise_for_status() + requirements = (r.json().get("data") or {}).get("prerequisites", []) + if len(requirements) > 0: + # Prefer challenge names over IDs + r = s.get("/api/v1/challenges", json=True) + r.raise_for_status() + challenges = r.json()["data"] + challenge["requirements"] = [c["name"] for c in challenges if c["id"] in requirements] + + return challenge + +def is_default(key, value): + if key == "connection_info" and value == None: + return True + if key == "attempts" and value == 0: + return True + if key == "state" and value == "visible": + return True + if key == "type" and value == "standard": + return True + if key in ["tags", "hints", "topics", "requirements"] and value == []: + return True + return False + +def verify_challenge(challenge, ignore=(), verify_files=False, verify_defaults=False, _verify_new_files=True): + """ + Verify that the challenge.yml matches the remote challenge if one exists with the same name + """ + s = generate_session() + + installed_challenges = load_installed_challenges() + for c in installed_challenges: + if c["name"] == challenge["name"]: + challenge_id = c["id"] + break + else: + return + + remote_challenge = get_challenge_details(challenge_id) + for key in remote_challenge: + if key in ignore: + continue + # Special validation needed for files + if key == "files": + # Get base file name of challenge files + local_files = {Path(f).name : f for f in challenge[key]} + + for f in remote_challenge["files"]: + # Get base file name + f_base = f.split("/")[-1].split('?token=')[0] + if f_base not in local_files and _verify_new_files: + raise Exception(f"Remote challenge has file {f_base} that is not present locally") + else: + if verify_files: + # Download remote file and compare contents + req = s.get(f) + req.raise_for_status() + remote_file = req.content + local_file = Path(challenge.directory, local_files[f_base]).read_bytes() + if remote_file != local_file: + raise Exception(f"Remote challenge file {f_base} does not match local file") + + elif key not in challenge: + # Ignore optional keys with default values + if is_default(key, remote_challenge[key]) and not verify_defaults: + continue + + raise Exception(f"Missing field {key} in challenge.yml") + + elif challenge[key] != remote_challenge[key]: + raise Exception(f"Field {key} in challenge.yml does not match remote challenge") + +def pull_challenge(challenge, ignore=(), update_files=False, create_files=False, create_defaults=False): + """ + Rewrite challenge.yml and local files to match the remote challenge + """ + # Prefer multi-line YAML formatting (https://stackoverflow.com/q/8640959/15261182) + def str_presenter(dumper, data): + if len(data.splitlines()) > 1 or '\n' in data: + text_list = [line.rstrip() for line in data.splitlines()] + fixed_data = "\n".join(text_list) + return dumper.represent_scalar('tag:yaml.org,2002:str', fixed_data, style='|') + elif len(data) > 80: + return dumper.represent_scalar('tag:yaml.org,2002:str', data.rstrip(), style='>') + return dumper.represent_scalar('tag:yaml.org,2002:str', data) + + yaml.add_representer(str, str_presenter) + yaml.representer.SafeRepresenter.add_representer(str, str_presenter) + + + + + # Update challenge.yml if verification shows that something changed + try: + verify_challenge(challenge, ignore=ignore, verify_files=update_files, verify_defaults=create_defaults, _verify_new_files=create_files) + click.secho(f"Challenge {challenge['name']} is already up to date", fg="green") + return + except Exception: + pass + + s = generate_session() + installed_challenges = load_installed_challenges() + for c in installed_challenges: + if c["name"] == challenge["name"]: + challenge_id = c["id"] + break + else: + return + + remote_details = get_challenge_details(challenge_id) + + local_files = {Path(f).name : f for f in challenge.get("files",[])} + # Update files + for f in remote_details["files"]: + # Get base file name + f_base = f.split("/")[-1].split('?token=')[0] + if f_base not in local_files and create_files: + print(f"Creating file {f_base} for challenge {challenge['name']}") + # Download remote file and save locally + req = s.get(f) + req.raise_for_status() + Path(challenge.directory, f_base).write_bytes(req.content) + if "files" not in challenge: + challenge["files"] = [] + challenge["files"].append(f_base) + + elif f_base in local_files and update_files: + # Download remote file and replace local file + print(f"Updating file {f_base} for challenge {challenge['name']}") + req = s.get(f) + req.raise_for_status() + remote_file = req.content + local_file = Path(challenge.directory, local_files[f_base]) + local_file.write_bytes(remote_file) + + # Remove files that are no longer present on the remote challenge + remote_cleaned_files = [f.split("/")[-1].split('?token=')[0] for f in remote_details["files"]] + challenge["files"] = [f for f in challenge.get("files",[]) if Path(f).name in remote_cleaned_files] + del remote_details["files"] + + print(f"Updating challenge.yml for {challenge['name']}") + + # Prefer ordering remote details as described by spec + preferred_order = ["name", "category", "description", \ + "value", "type", "connection_info", "attempts", \ + "flags", "topics", "tags", "files", "hints", "requirements", "state"] + + # Merge local and remote challenge.yml & Preserve local keys + order + updated_challenge = dict(challenge) + + # Ignore optional fields with default values + remote_details_updates = {} + for k, v in remote_details.items(): + # If the key value changed, we want to update it + if k in challenge and challenge[k] != v: + remote_details_updates[k] = v + elif not is_default(k, v) or create_defaults: + remote_details_updates[k] = v + + # Add all preferred keys + for key in preferred_order: + if key in remote_details_updates and key not in ignore: + updated_challenge[key] = remote_details_updates[key] + + # Add remaining keys + for key in remote_details_updates: + if key not in preferred_order and key not in ignore: + updated_challenge[key] = remote_details_updates[key] + + # Hack: remove tabs in multiline strings + updated_challenge['description'] = updated_challenge['description'].replace('\t', '') + + with open(challenge.file_path, "w") as f: + yaml.dump(updated_challenge, f, allow_unicode=True, sort_keys=False) + \ No newline at end of file