Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ The CLI currently supports the following commands (and sub-commands):
- `remove`|`rm`: Remove tags from a package in a repository.
- `replace`: Replace all existing (non-immutable) tags on a package in a repository.
- `whoami`: Retrieve your current authentication status.

- `tokens`: Manage API tokens.
- `list`|`ls`: List API tokens.
- `refresh`: Refresh an API token.

## Installation

Expand Down
1 change: 1 addition & 0 deletions cloudsmith_cli/cli/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
resync,
status,
tags,
tokens,
upstream,
whoami,
)
56 changes: 55 additions & 1 deletion cloudsmith_cli/cli/commands/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@

import click

from ...core.api import exceptions, user
from ...core.api.init import initialise_api
from ...core.config import create_config_files, new_config_messaging
from .. import decorators, validators
from ..exceptions import handle_api_exceptions
from ..saml import create_configured_session, get_idp_url
from ..utils import maybe_spinner
from ..webserver import AuthenticationWebRequestHandler, AuthenticationWebServer
from .main import main

Expand All @@ -21,11 +25,18 @@
prompt=True,
help="The name of the Cloudsmith organization to authenticate with.",
)
@click.option(
"-t",
"--token",
default=False,
is_flag=True,
help="Retrieve a user API token after successful authentication.",
)
@decorators.common_cli_config_options
@decorators.common_cli_output_options
@decorators.initialise_api
@click.pass_context
def authenticate(ctx, opts, owner):
def authenticate(ctx, opts, owner, token):
"""Authenticate to Cloudsmith using the org's SAML setup."""
owner = owner[0].strip("'[]'")
api_host = opts.api_config.host
Expand Down Expand Up @@ -58,3 +69,46 @@ def authenticate(ctx, opts, owner):
debug=opts.debug,
)
auth_server.handle_request()

if not token:
return

initialise_api()
try:
api_token = user.create_user_token_saml()
click.echo(f"New token value: {click.style(api_token.key, fg='magenta')}")
create, has_errors = create_config_files(ctx, opts, api_key=api_token.key)
new_config_messaging(has_errors, opts, create, api_key=api_token.key)
return
except exceptions.ApiException as exc:
if exc.status == 400:
if "User has already created an API key" in exc.detail:
click.confirm(
"User already has a token. Would you like to recreate it?",
abort=True,
)
else:
raise

context_msg = "Failed to refresh the token!"
with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg):
api_tokens = user.list_user_tokens()
for t in api_tokens:
click.echo("Current tokens:")
click.echo(
f"Token: {click.style(t.key, fg='magenta')}, "
f"Created: {click.style(t.created, fg='green')}, "
f"slug_perm: {click.style(t.slug_perm, fg='cyan')}"
)
token_slug = click.prompt(
"Please enter the slug_perm of the token you would like to refresh"
)

click.echo(f"Refreshing token {token_slug}... ", nl=False)
with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg):
with maybe_spinner(opts):
new_token = user.refresh_user_token(token_slug)
click.secho("OK", fg="green")
click.echo(f"New token value: {click.style(new_token.key, fg='magenta')}")
create, has_errors = create_config_files(ctx, opts, api_key=new_token.key)
new_config_messaging(has_errors, opts, create, api_key=new_token.key)
112 changes: 2 additions & 110 deletions cloudsmith_cli/cli/commands/login.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,16 @@
"""CLI/Commands - Get an API token."""

import collections
import stat

import click
import cloudsmith_api

from ...core.api.exceptions import TwoFactorRequiredException
from ...core.api.user import get_user_token
from ...core.utils import get_help_website
from ...core.config import create_config_files, new_config_messaging
from .. import decorators
from ..exceptions import handle_api_exceptions
from ..utils import maybe_spinner
from .main import main

ConfigValues = collections.namedtuple(
"ConfigValues", ["reader", "present", "mode", "data"]
)


def validate_login(ctx, param, value):
"""Ensure that login is not blank."""
Expand All @@ -28,81 +21,6 @@ def validate_login(ctx, param, value):
return value


def create_config_files(ctx, opts, api_key):
"""Create default config files."""
# pylint: disable=unused-argument
config_reader = opts.get_config_reader()
creds_reader = opts.get_creds_reader()
has_config = config_reader.has_default_file()
has_creds = creds_reader.has_default_file()

if has_config and has_creds:
create = False
else:
click.echo()
create = click.confirm(
"No default config file(s) found, do you want to create them?"
)

click.echo()
if not create:
click.secho(
"For reference here are your default config file locations:", fg="yellow"
)
else:
click.secho(
"Great! Let me just create your default configs for you now ...", fg="green"
)

configs = (
ConfigValues(reader=config_reader, present=has_config, mode=None, data={}),
ConfigValues(
reader=creds_reader,
present=has_creds,
mode=stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IWGRP,
data={"api_key": api_key},
),
)

has_errors = False
for config in configs:
click.echo(
"%(name)s config file: %(filepath)s ... "
% {
"name": click.style(config.reader.config_name.capitalize(), bold=True),
"filepath": click.style(
config.reader.get_default_filepath(), fg="magenta"
),
},
nl=False,
)

if not config.present and create:
try:
ok = config.reader.create_default_file(
data=config.data, mode=config.mode
)
except OSError as exc:
ok = False
error_message = exc.strerror
has_errors = True

if ok:
click.secho("CREATED", fg="green")
else:
click.secho("ERROR", fg="red")
click.secho(
"The following error occurred while trying to "
"create the file: %(message)s"
% {"message": click.style(error_message, fg="red")}
)
continue

click.secho("EXISTS" if config.present else "NOT CREATED", fg="yellow")

return create, has_errors


@main.command(aliases=["token"])
@click.option(
"-l",
Expand Down Expand Up @@ -170,30 +88,4 @@ def login(ctx, opts, login, password): # pylint: disable=redefined-outer-name
)

create, has_errors = create_config_files(ctx, opts, api_key=api_key)

if has_errors:
click.echo()
click.secho("Oops, please fix the errors and try again!", fg="red")
return

if opts.api_key != api_key:
click.echo()
if opts.api_key:
click.secho(
"Note: The above API key doesn't match what you have in "
"your default credentials config file.",
fg="yellow",
)
elif not create:
click.secho(
"Note: Don't forget to put your API key in a config file, "
"export it on the environment, or set it via -k.",
fg="yellow",
)
click.secho(
"If you need more help please see the documentation: "
"%(website)s" % {"website": click.style(get_help_website(), bold=True)}
)
click.echo()

click.secho("You're ready to rock, let's start automating!", fg="green")
new_config_messaging(has_errors, opts, create, api_key=api_key)
84 changes: 84 additions & 0 deletions cloudsmith_cli/cli/commands/tokens.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import click

from ...core.api import user as api
from .. import command, decorators
from ..exceptions import handle_api_exceptions
from ..utils import maybe_print_as_json, maybe_spinner
from .main import main


@main.group(cls=command.AliasGroup, name="tokens")
@decorators.common_cli_config_options
@decorators.common_cli_output_options
@click.pass_context
def tokens(ctx, opts):
"""Manage your user API tokens."""


@tokens.command(name="list", aliases=["ls"])
@decorators.common_cli_config_options
@decorators.common_cli_output_options
@decorators.common_api_auth_options
@decorators.initialise_api
@click.pass_context
def list_tokens(ctx, opts):
"""List all user API tokens."""
use_stderr = opts.output in ("json", "pretty_json")

click.echo("Retrieving API tokens... ", nl=False, err=use_stderr)

context_msg = "Failed to retrieve API tokens!"
with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg):
with maybe_spinner(opts):
tokens = api.list_user_tokens()
click.secho("OK", fg="green", err=use_stderr)

if maybe_print_as_json(opts, tokens):
return

print_tokens(tokens)


@tokens.command()
@click.argument(
"token_slug",
required=False,
)
@decorators.common_cli_config_options
@decorators.common_cli_output_options
@decorators.common_api_auth_options
@decorators.initialise_api
@click.pass_context
def refresh(ctx, opts, token_slug):
"""Refresh a specific API token by its slug."""
context_msg = "Failed to refresh the token!"

if not token_slug:
with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg):
with maybe_spinner(opts):
api_tokens = api.list_user_tokens()
click.echo("Current tokens:")
print_tokens(api_tokens)
token_slug = click.prompt(
"Please enter the slug_perm of the token you would like to refresh"
)

click.echo(f"Refreshing token {token_slug}... ", nl=False)
with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg):
with maybe_spinner(opts):
new_token = api.refresh_user_token(token_slug)
click.secho("OK", fg="green")

if maybe_print_as_json(opts, new_token):
return

click.echo(f"New token value: {click.style(new_token.key, fg='magenta')}")


def print_tokens(tokens):
for token in tokens:
click.echo(
f"Token: {click.style(token.key, fg='magenta')}, "
f"Created: {click.style(token.created, fg='green')}, "
f"slug_perm: {click.style(token.slug_perm, fg='cyan')}"
)
Loading