Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: POC - Try Pluggy for HMSL modularity #812

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Empty file.
128 changes: 128 additions & 0 deletions ggshield/cmd/hmsl/hashicorp_plugin/hashicorp_impl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import os
from typing import Iterator, Tuple, Optional

import click
import hmsl_check

from ggshield.core.errors import UnexpectedError
from ggshield.verticals.hmsl.collection import SecretWithKey, collect_list
from ggshield.verticals.hmsl.secret_manager.hashicorp_vault.api_client import VaultAPIClient
from ggshield.verticals.hmsl.secret_manager.hashicorp_vault.cli import get_vault_cli_token
from ggshield.verticals.hmsl.secret_manager.hashicorp_vault.exceptions import VaultCliTokenFetchingError, \
VaultForbiddenItemError


@hmsl_check.hookimpl
def cmd_options():
return [
click.option(
"--use-cli-token",
"use_cli_token",
is_flag=True,
show_default=True,
default=False,
help="Instead of getting the token from the environment variable, "
"get it from the CLI tool.",
),
click.option(
"--url",
"url",
required=False,
type=str,
help="The URL of the secret manager server.",
),
click.option(
"--recursive",
"-r",
"recursive",
is_flag=True,
show_default=True,
default=False,
help="If the secret manager path is a directory and not a file, explore recursively.",
),
click.argument(
"vault_path",
type=str,
),
]

def _get_vault_token(self, use_cli_token):
if use_cli_token:
try:
return get_vault_cli_token()
except VaultCliTokenFetchingError as exc:
raise click.UsageError(
f"could not get the token from Vault CLI: {str(exc)}"
) from exc

env_token = os.getenv("VAULT_TOKEN")
if env_token is None:
raise click.UsageError(
"you need to specify the Vault token to use, either through the VAULT_TOKEN"
" environment variable or by using --use-cli-token to use the token "
"from the Vault CLI."
)

return env_token

def _split_vault_mount_and_path(self, initial_path: str) -> Tuple[str, str]:
"""
From a given initial path like secret/my_app/prod/env, split it in two:
first the mount name, then the path.
"""
split_path = initial_path.lstrip("/").split("/")
return (split_path[0], "/".join(split_path[1:]).strip("/"))


@hmsl_check.hookimpl
def collect_secrets(
ctx: click.Context,
path: str,
args: str,
use_cli_token: bool,
url: Optional[str],
recursive: bool,
vault_path: str, **_
) -> Iterator[SecretWithKey]:
"""
Check secrets of an Hashicorp Vault instance.
Only compatible with the kv secret engines (v1 or v2) for now.

Will use the VAULT_URL environment variable to get the Vault instance URL or
the --url option if no environment variable is set.

Will use the VAULT_TOKEN environment variable to authenticate, except
if the --use-cli-token option is set.
"""

# Get the Vault URL
if url is None:
url = os.getenv("VAULT_URL")
if url is None:
raise Exception(
"you need to specify the URL of your Vault, "
"either through --url or in the VAULT_URL environment variable"
)
vault_token = _get_vault_token(use_cli_token)
vault_client = VaultAPIClient(url, vault_token)

# Get mount object and check it exists and we have access to it
mount_name, secret_path = _split_vault_mount_and_path(vault_path)
all_kv_mounts = list(vault_client.get_kv_mounts())
mount = next(
(item for item in all_kv_mounts if item.name == mount_name), None
)
if mount is None:
raise UnexpectedError(
f"mount {mount_name} not found. Make sure it exists and "
"that your token has access to it."
)
try:
result = vault_client.get_secrets(mount, secret_path, recursive)
# catch this error here if the path provided is a file that we don't have access to
except VaultForbiddenItemError:
raise UnexpectedError(
f"access to the given path '{vault_path}' was forbidden. "
"Are you sure the permissions of the token are correct?"
)
return collect_list(result.secrets)
3 changes: 3 additions & 0 deletions ggshield/cmd/hmsl/hmsl_check/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import pluggy

hookimpl = pluggy.HookimplMarker("hmsl_check")
23 changes: 23 additions & 0 deletions ggshield/cmd/hmsl/hmsl_check/hookspecs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import pluggy

from typing import Iterator

from ggshield.verticals.hmsl.collection import SecretWithKey

hookspec = pluggy.HookspecMarker("hmsl_check")


@hookspec
def cmd_options():
"""
Specifies the available command options via click
"""


@hookspec
def collect_secrets(**_) -> Iterator[SecretWithKey]:
"""
Collects the secrets in a given scope
"""


52 changes: 52 additions & 0 deletions ggshield/cmd/hmsl/hmsl_check/host.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import itertools

import hookspecs
import lib

import pluggy

from ggshield.cmd.hmsl.hmsl_utils import check_secrets
from ggshield.core.text_utils import display_info
from ggshield.verticals.hmsl.collection import prepare


def main(**kwargs):
pm = get_plugin_manager()
plugin = HmslPlugin(pm.hook)
collected_secrets = plugin.collect_secrets(**kwargs)
# full_hashes is True because we need the hashes to decrypt the secrets.
# They will correctly be truncated by our client later.
prepared_secrets = prepare(collected_secrets, naming_strategy, full_hashes=True)
display_info(f"Collected {len(prepared_secrets.payload)} secrets.")
check_secrets(
ctx=ctx,
prepared_secrets=prepared_secrets,
json_output=True,
full_hashes=True,
)
return 0


def get_plugin_manager():
pm = pluggy.PluginManager("hmsl_check")
pm.add_hookspecs(hookspecs)
pm.load_setuptools_entrypoints("hmsl_check")
pm.register(lib)
return pm


class HmslPlugin:
def __init__(self, hook):
self.hook = hook

@classmethod
def cmd_options(cls, self):
results = self.hook.cmd_options()
return list(itertools.chain(*results))

def collect_secrets(self, **_):
results = self.hook.collect_secrets(**_)
return list(itertools.chain(*results))

if __name__ == "__main__":
main()
38 changes: 38 additions & 0 deletions ggshield/cmd/hmsl/hmsl_check/lib.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import json
from subprocess import run
from typing import Iterator

import click
import hmsl_check

from ggshield.cmd.hmsl.hmsl_common_options import (
input_arg,
)
from ggshield.verticals.hmsl.collection import (
SecretWithKey,
)


@hmsl_check.hookimpl
def cmd_options():
return [
input_arg,
click.option(
"--args",
"args",
default="",
required=False,
type=str,
help="Command options.",
)
]


@hmsl_check.hookimpl
def collect_secrets(ctx: click.Context, path: str, args: str, **_) -> Iterator[SecretWithKey]:
args = [arg for arg in args.split(' ') if arg]
res = run([path, *args], capture_output=True)
if res.returncode != 0:
raise Exception(res.stderr.decode('utf-8'))
for secret in json.loads(res.stdout.decode('utf-8')):
yield SecretWithKey(key=secret.get("key"), value=secret["value"])
9 changes: 7 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
from setuptools import find_packages
from setuptools import setup


setup()
setup(
name="hmsl_check",
install_requires="pluggy>=0.3,<1.0",
entry_points={"console_scripts": ["eggsample=eggsample.host:main"]},
packages=find_packages(),
)