diff --git a/README.md b/README.md index 571dc5db..4b85905d 100644 --- a/README.md +++ b/README.md @@ -94,7 +94,9 @@ The CLI currently supports the following commands (and sub-commands): - `remove`|`rm`: Remove tags from a package in a repository. - `replace`: Replace all existing (non-immutable) tags on a package in a repository. - `whoami`: Retrieve your current authentication status. - +- `tokens`: Manage API tokens. + - `list`|`ls`: List API tokens. + - `refresh`: Refresh an API token. ## Installation diff --git a/cloudsmith_cli/cli/commands/__init__.py b/cloudsmith_cli/cli/commands/__init__.py index 88ce9cc8..02afc6b6 100644 --- a/cloudsmith_cli/cli/commands/__init__.py +++ b/cloudsmith_cli/cli/commands/__init__.py @@ -21,6 +21,7 @@ resync, status, tags, + tokens, upstream, whoami, ) diff --git a/cloudsmith_cli/cli/commands/auth.py b/cloudsmith_cli/cli/commands/auth.py index 46a7d8a4..276aefca 100644 --- a/cloudsmith_cli/cli/commands/auth.py +++ b/cloudsmith_cli/cli/commands/auth.py @@ -4,9 +4,13 @@ import click +from ...core.api import exceptions, user +from ...core.api.init import initialise_api +from ...core.config import create_config_files, new_config_messaging from .. import decorators, validators from ..exceptions import handle_api_exceptions from ..saml import create_configured_session, get_idp_url +from ..utils import maybe_spinner from ..webserver import AuthenticationWebRequestHandler, AuthenticationWebServer from .main import main @@ -21,11 +25,18 @@ prompt=True, help="The name of the Cloudsmith organization to authenticate with.", ) +@click.option( + "-t", + "--token", + default=False, + is_flag=True, + help="Retrieve a user API token after successful authentication.", +) @decorators.common_cli_config_options @decorators.common_cli_output_options @decorators.initialise_api @click.pass_context -def authenticate(ctx, opts, owner): +def authenticate(ctx, opts, owner, token): """Authenticate to Cloudsmith using the org's SAML setup.""" owner = owner[0].strip("'[]'") api_host = opts.api_config.host @@ -58,3 +69,46 @@ def authenticate(ctx, opts, owner): debug=opts.debug, ) auth_server.handle_request() + + if not token: + return + + initialise_api() + try: + api_token = user.create_user_token_saml() + click.echo(f"New token value: {click.style(api_token.key, fg='magenta')}") + create, has_errors = create_config_files(ctx, opts, api_key=api_token.key) + new_config_messaging(has_errors, opts, create, api_key=api_token.key) + return + except exceptions.ApiException as exc: + if exc.status == 400: + if "User has already created an API key" in exc.detail: + click.confirm( + "User already has a token. Would you like to recreate it?", + abort=True, + ) + else: + raise + + context_msg = "Failed to refresh the token!" + with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): + api_tokens = user.list_user_tokens() + for t in api_tokens: + click.echo("Current tokens:") + click.echo( + f"Token: {click.style(t.key, fg='magenta')}, " + f"Created: {click.style(t.created, fg='green')}, " + f"slug_perm: {click.style(t.slug_perm, fg='cyan')}" + ) + token_slug = click.prompt( + "Please enter the slug_perm of the token you would like to refresh" + ) + + click.echo(f"Refreshing token {token_slug}... ", nl=False) + with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): + with maybe_spinner(opts): + new_token = user.refresh_user_token(token_slug) + click.secho("OK", fg="green") + click.echo(f"New token value: {click.style(new_token.key, fg='magenta')}") + create, has_errors = create_config_files(ctx, opts, api_key=new_token.key) + new_config_messaging(has_errors, opts, create, api_key=new_token.key) diff --git a/cloudsmith_cli/cli/commands/login.py b/cloudsmith_cli/cli/commands/login.py index a3dbecb2..da885ac1 100644 --- a/cloudsmith_cli/cli/commands/login.py +++ b/cloudsmith_cli/cli/commands/login.py @@ -1,23 +1,16 @@ """CLI/Commands - Get an API token.""" -import collections -import stat - import click import cloudsmith_api from ...core.api.exceptions import TwoFactorRequiredException from ...core.api.user import get_user_token -from ...core.utils import get_help_website +from ...core.config import create_config_files, new_config_messaging from .. import decorators from ..exceptions import handle_api_exceptions from ..utils import maybe_spinner from .main import main -ConfigValues = collections.namedtuple( - "ConfigValues", ["reader", "present", "mode", "data"] -) - def validate_login(ctx, param, value): """Ensure that login is not blank.""" @@ -28,81 +21,6 @@ def validate_login(ctx, param, value): return value -def create_config_files(ctx, opts, api_key): - """Create default config files.""" - # pylint: disable=unused-argument - config_reader = opts.get_config_reader() - creds_reader = opts.get_creds_reader() - has_config = config_reader.has_default_file() - has_creds = creds_reader.has_default_file() - - if has_config and has_creds: - create = False - else: - click.echo() - create = click.confirm( - "No default config file(s) found, do you want to create them?" - ) - - click.echo() - if not create: - click.secho( - "For reference here are your default config file locations:", fg="yellow" - ) - else: - click.secho( - "Great! Let me just create your default configs for you now ...", fg="green" - ) - - configs = ( - ConfigValues(reader=config_reader, present=has_config, mode=None, data={}), - ConfigValues( - reader=creds_reader, - present=has_creds, - mode=stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IWGRP, - data={"api_key": api_key}, - ), - ) - - has_errors = False - for config in configs: - click.echo( - "%(name)s config file: %(filepath)s ... " - % { - "name": click.style(config.reader.config_name.capitalize(), bold=True), - "filepath": click.style( - config.reader.get_default_filepath(), fg="magenta" - ), - }, - nl=False, - ) - - if not config.present and create: - try: - ok = config.reader.create_default_file( - data=config.data, mode=config.mode - ) - except OSError as exc: - ok = False - error_message = exc.strerror - has_errors = True - - if ok: - click.secho("CREATED", fg="green") - else: - click.secho("ERROR", fg="red") - click.secho( - "The following error occurred while trying to " - "create the file: %(message)s" - % {"message": click.style(error_message, fg="red")} - ) - continue - - click.secho("EXISTS" if config.present else "NOT CREATED", fg="yellow") - - return create, has_errors - - @main.command(aliases=["token"]) @click.option( "-l", @@ -170,30 +88,4 @@ def login(ctx, opts, login, password): # pylint: disable=redefined-outer-name ) create, has_errors = create_config_files(ctx, opts, api_key=api_key) - - if has_errors: - click.echo() - click.secho("Oops, please fix the errors and try again!", fg="red") - return - - if opts.api_key != api_key: - click.echo() - if opts.api_key: - click.secho( - "Note: The above API key doesn't match what you have in " - "your default credentials config file.", - fg="yellow", - ) - elif not create: - click.secho( - "Note: Don't forget to put your API key in a config file, " - "export it on the environment, or set it via -k.", - fg="yellow", - ) - click.secho( - "If you need more help please see the documentation: " - "%(website)s" % {"website": click.style(get_help_website(), bold=True)} - ) - click.echo() - - click.secho("You're ready to rock, let's start automating!", fg="green") + new_config_messaging(has_errors, opts, create, api_key=api_key) diff --git a/cloudsmith_cli/cli/commands/tokens.py b/cloudsmith_cli/cli/commands/tokens.py new file mode 100644 index 00000000..480a1f06 --- /dev/null +++ b/cloudsmith_cli/cli/commands/tokens.py @@ -0,0 +1,84 @@ +import click + +from ...core.api import user as api +from .. import command, decorators +from ..exceptions import handle_api_exceptions +from ..utils import maybe_print_as_json, maybe_spinner +from .main import main + + +@main.group(cls=command.AliasGroup, name="tokens") +@decorators.common_cli_config_options +@decorators.common_cli_output_options +@click.pass_context +def tokens(ctx, opts): + """Manage your user API tokens.""" + + +@tokens.command(name="list", aliases=["ls"]) +@decorators.common_cli_config_options +@decorators.common_cli_output_options +@decorators.common_api_auth_options +@decorators.initialise_api +@click.pass_context +def list_tokens(ctx, opts): + """List all user API tokens.""" + use_stderr = opts.output in ("json", "pretty_json") + + click.echo("Retrieving API tokens... ", nl=False, err=use_stderr) + + context_msg = "Failed to retrieve API tokens!" + with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): + with maybe_spinner(opts): + tokens = api.list_user_tokens() + click.secho("OK", fg="green", err=use_stderr) + + if maybe_print_as_json(opts, tokens): + return + + print_tokens(tokens) + + +@tokens.command() +@click.argument( + "token_slug", + required=False, +) +@decorators.common_cli_config_options +@decorators.common_cli_output_options +@decorators.common_api_auth_options +@decorators.initialise_api +@click.pass_context +def refresh(ctx, opts, token_slug): + """Refresh a specific API token by its slug.""" + context_msg = "Failed to refresh the token!" + + if not token_slug: + with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): + with maybe_spinner(opts): + api_tokens = api.list_user_tokens() + click.echo("Current tokens:") + print_tokens(api_tokens) + token_slug = click.prompt( + "Please enter the slug_perm of the token you would like to refresh" + ) + + click.echo(f"Refreshing token {token_slug}... ", nl=False) + with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): + with maybe_spinner(opts): + new_token = api.refresh_user_token(token_slug) + click.secho("OK", fg="green") + + if maybe_print_as_json(opts, new_token): + return + + click.echo(f"New token value: {click.style(new_token.key, fg='magenta')}") + + +def print_tokens(tokens): + for token in tokens: + click.echo( + f"Token: {click.style(token.key, fg='magenta')}, " + f"Created: {click.style(token.created, fg='green')}, " + f"slug_perm: {click.style(token.slug_perm, fg='cyan')}" + ) diff --git a/cloudsmith_cli/cli/tests/commands/test_tokens.py b/cloudsmith_cli/cli/tests/commands/test_tokens.py new file mode 100644 index 00000000..0954407f --- /dev/null +++ b/cloudsmith_cli/cli/tests/commands/test_tokens.py @@ -0,0 +1,123 @@ +from unittest.mock import patch + +import pytest + +from cloudsmith_cli.cli.commands.tokens import list_tokens, refresh +from cloudsmith_cli.core.api.exceptions import ApiException + + +class MockToken: + """Mock Token object with the properties needed for testing.""" + + def __init__(self, key, created, slug_perm): + self.key = key + self.created = created + self.slug_perm = slug_perm + + +@pytest.mark.usefixtures("set_api_host_env_var") +class TestListTokensCommand: + + def test_list_tokens_success(self, runner): + """Test successful listing of tokens.""" + mock_tokens = [ + MockToken( + key="abc123", created="2025-01-01T00:00:00Z", slug_perm="token-1" + ), + MockToken( + key="def456", created="2025-01-02T00:00:00Z", slug_perm="token-2" + ), + ] + + with patch("cloudsmith_cli.core.api.user.list_user_tokens") as mock_list_tokens: + mock_list_tokens.return_value = mock_tokens + result = runner.invoke(list_tokens, [], catch_exceptions=False) + + assert result.exit_code == 0 + assert "Retrieving API tokens... OK" in result.output + assert "Token: abc123" in result.output + assert "Created: 2025-01-01T00:00:00Z" in result.output + assert "slug_perm: token-1" in result.output + assert "Token: def456" in result.output + assert "Created: 2025-01-02T00:00:00Z" in result.output + assert "slug_perm: token-2" in result.output + + def test_list_tokens_error(self, runner): + """Test error handling when listing tokens fails.""" + with patch("cloudsmith_cli.core.api.user.list_user_tokens") as mock_list_tokens: + # Use ApiException for proper error handling + mock_list_tokens.side_effect = ApiException("API error") + result = runner.invoke(list_tokens, [], catch_exceptions=True) + + assert result.exit_code != 0 + + # The error message might be in different places depending on how the exception is raised + error_content = ( + str(getattr(result, "exception", "")) + result.output + result.stderr + ) + assert ( + "API error" in error_content + or "Failed to retrieve API tokens" in error_content + ) + + +@pytest.mark.usefixtures("set_api_host_env_var") +class TestRefreshTokenCommand: + """Test suite for the 'tokens refresh' command.""" + + def test_refresh_token_with_slug(self, runner): + """Test successful refreshing of a token with a provided slug.""" + mock_new_token = MockToken( + key="new_token_123", + created="2025-01-03T00:00:00Z", + slug_perm="token-refresh", + ) + + with patch( + "cloudsmith_cli.core.api.user.refresh_user_token" + ) as mock_refresh_token: + mock_refresh_token.return_value = mock_new_token + result = runner.invoke(refresh, ["token-refresh"], catch_exceptions=False) + + assert result.exit_code == 0 + assert "Refreshing token token-refresh... OK" in result.output + assert "New token value: new_token_123" in result.output + mock_refresh_token.assert_called_once_with("token-refresh") + + def test_refresh_token_error(self, runner): + """Test error handling when refreshing a token fails.""" + with patch( + "cloudsmith_cli.core.api.user.refresh_user_token" + ) as mock_refresh_token: + # Use ApiException for proper error handling + mock_refresh_token.side_effect = ApiException("API error") + result = runner.invoke(refresh, ["token-error"], catch_exceptions=True) + + assert result.exit_code != 0 + + # The error message might be in different places depending on how the exception is raised + error_content = ( + str(getattr(result, "exception", "")) + result.output + result.stderr + ) + assert ( + "API error" in error_content + or "Failed to refresh the token" in error_content + ) + + def test_refresh_token_list_error(self, runner): + """Test error handling when listing tokens fails during refresh.""" + with patch("cloudsmith_cli.core.api.user.list_user_tokens") as mock_list_tokens: + # Use ApiException for proper error handling + mock_list_tokens.side_effect = ApiException("API error") + result = runner.invoke(refresh, [], catch_exceptions=True) + + assert result.exit_code != 0 + + # The error message might be in different places depending on how the exception is raised + error_content = ( + str(getattr(result, "exception", "")) + result.output + result.stderr + ) + assert ( + "API error" in error_content + or "Failed to refresh the token" in error_content + ) diff --git a/cloudsmith_cli/core/api/user.py b/cloudsmith_cli/core/api/user.py index 55a247ce..b42ae401 100644 --- a/cloudsmith_cli/core/api/user.py +++ b/cloudsmith_cli/core/api/user.py @@ -55,6 +55,17 @@ def get_user_token(login, password, totp_token=None, two_factor_token=None): raise +def create_user_token_saml() -> dict: + """Create a new user API token using SAML.""" + client = get_user_api() + + with catch_raise_api_exception(): + data, _, headers = client.user_tokens_create_with_http_info() + + ratelimits.maybe_rate_limit(client, headers) + return data + + def get_user_brief(): """Retrieve brief for current user (if any).""" client = get_user_api() @@ -64,3 +75,25 @@ def get_user_brief(): ratelimits.maybe_rate_limit(client, headers) return data.authenticated, data.slug, data.email, data.name + + +def list_user_tokens() -> list[dict]: + """List all user API tokens.""" + client = get_user_api() + + with catch_raise_api_exception(): + data, _, headers = client.user_tokens_list_with_http_info() + + ratelimits.maybe_rate_limit(client, headers) + return data.results + + +def refresh_user_token(token_slug: str) -> dict: + """Refresh user API token.""" + client = get_user_api() + + with catch_raise_api_exception(): + data, _, headers = client.user_tokens_refresh_with_http_info(token_slug) + + ratelimits.maybe_rate_limit(client, headers) + return data diff --git a/cloudsmith_cli/core/config.py b/cloudsmith_cli/core/config.py new file mode 100644 index 00000000..16044fb6 --- /dev/null +++ b/cloudsmith_cli/core/config.py @@ -0,0 +1,115 @@ +import collections +import stat + +import click + +from .utils import get_help_website + +ConfigValues = collections.namedtuple( + "ConfigValues", ["reader", "present", "mode", "data"] +) + + +def create_config_files(ctx, opts, api_key): + """Create default config files.""" + # pylint: disable=unused-argument + config_reader = opts.get_config_reader() + creds_reader = opts.get_creds_reader() + has_config = config_reader.has_default_file() + has_creds = creds_reader.has_default_file() + + if has_config and has_creds: + create = False + else: + click.echo() + create = click.confirm( + "No default config file(s) found, do you want to create them?" + ) + + click.echo() + if not create: + click.secho( + "For reference here are your default config file locations:", fg="yellow" + ) + else: + click.secho( + "Great! Let me just create your default configs for you now ...", fg="green" + ) + + configs = ( + ConfigValues(reader=config_reader, present=has_config, mode=None, data={}), + ConfigValues( + reader=creds_reader, + present=has_creds, + mode=stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IWGRP, + data={"api_key": api_key}, + ), + ) + + has_errors = False + for config in configs: + click.echo( + "%(name)s config file: %(filepath)s ... " + % { + "name": click.style(config.reader.config_name.capitalize(), bold=True), + "filepath": click.style( + config.reader.get_default_filepath(), fg="magenta" + ), + }, + nl=False, + ) + + if not config.present and create: + try: + ok = config.reader.create_default_file( + data=config.data, mode=config.mode + ) + except OSError as exc: + ok = False + error_message = exc.strerror + has_errors = True + + if ok: + click.secho("CREATED", fg="green") + else: + click.secho("ERROR", fg="red") + click.secho( + "The following error occurred while trying to " + "create the file: %(message)s" + % {"message": click.style(error_message, fg="red")} + ) + continue + + click.secho("EXISTS" if config.present else "NOT CREATED", fg="yellow") + + return create, has_errors + + +def new_config_messaging(has_errors, opts, create, api_key): + """Provide messaging to user after generating new configs""" + if has_errors: + click.echo() + click.secho("Oops, please fix the errors and try again!", fg="red") + return + + if opts.api_key != api_key: + click.echo() + if opts.api_key: + click.secho( + "Note: The above API key doesn't match what you have in " + "your default credentials config file.", + fg="yellow", + ) + elif not create: + click.secho( + "Note: Don't forget to put your API key in a config file, " + "export it on the environment, or set it via -k.", + fg="yellow", + ) + click.secho( + "If you need more help please see the documentation: " + "%(website)s" % {"website": click.style(get_help_website(), bold=True)} + ) + click.echo() + + click.secho("You're ready to rock, let's start automating!", fg="green") diff --git a/requirements.txt b/requirements.txt index 9c9d8409..e82dc971 100644 --- a/requirements.txt +++ b/requirements.txt @@ -34,19 +34,17 @@ click-didyoumean==0.3.1 # via cloudsmith-cli (setup.py) click-spinner==0.1.10 # via cloudsmith-cli (setup.py) -cloudsmith-api==2.0.16 +cloudsmith-api==2.0.18 # via cloudsmith-cli (setup.py) configparser==7.1.0 # via click-configfile coverage[toml]==7.2.7 - # via - # coverage - # pytest-cov + # via pytest-cov dill==0.3.7 # via pylint distlib==0.3.7 # via virtualenv -exceptiongroup==1.1.2 +exceptiongroup==1.2.2 # via pytest filelock==3.12.2 # via virtualenv @@ -58,7 +56,7 @@ identify==2.5.26 # via pre-commit idna==3.8 # via requests -importlib-metadata==8.5.0 +importlib-metadata==8.7.0 # via keyring iniconfig==2.0.0 # via pytest @@ -123,7 +121,7 @@ six==1.16.0 # click-configfile # cloudsmith-api # python-dateutil -tomli==2.0.1 +tomli==2.2.1 # via # build # coverage @@ -133,7 +131,7 @@ tomli==2.0.1 # pytest tomlkit==0.12.1 # via pylint -typing-extensions==4.7.1 +typing-extensions==4.13.2 # via # astroid # pylint @@ -148,7 +146,7 @@ wheel==0.41.1 # via pip-tools wrapt==1.15.0 # via astroid -zipp==3.20.2 +zipp==3.21.0 # via importlib-metadata # The following packages are considered to be unsafe in a requirements file: diff --git a/setup.py b/setup.py index 07a7481b..4bba7796 100644 --- a/setup.py +++ b/setup.py @@ -53,7 +53,7 @@ def get_long_description(): "click-configfile>=0.2.3", "click-didyoumean>=0.0.3", "click-spinner>=0.1.7", - "cloudsmith-api>=2.0.17,<3.0", # Compatible upto (but excluding) 3.0+ + "cloudsmith-api>=2.0.18,<3.0", # Compatible upto (but excluding) 3.0+ "keyring>=25.4.1", "requests>=2.18.4", "requests_toolbelt>=0.8.0",