diff --git a/cloudsmith_cli/cli/commands/__init__.py b/cloudsmith_cli/cli/commands/__init__.py index 8f629729..3b69afeb 100644 --- a/cloudsmith_cli/cli/commands/__init__.py +++ b/cloudsmith_cli/cli/commands/__init__.py @@ -4,6 +4,7 @@ auth, check, copy, + credential_helper, delete, dependencies, docs, diff --git a/cloudsmith_cli/cli/commands/credential_helper/__init__.py b/cloudsmith_cli/cli/commands/credential_helper/__init__.py new file mode 100644 index 00000000..c6d8ef07 --- /dev/null +++ b/cloudsmith_cli/cli/commands/credential_helper/__init__.py @@ -0,0 +1,46 @@ +""" +Credential helper commands for Cloudsmith. + +This module provides credential helper commands for various package managers +(Docker, pip, npm, etc.) that follow their respective credential helper protocols. +""" + +import click + +from ..main import main +from .cargo import cargo as cargo_cmd +from .docker import docker as docker_cmd +from .npm import npm as npm_cmd +from .nuget import nuget as nuget_cmd +from .terraform import terraform as terraform_cmd + + +@click.group() +def credential_helper(): + """ + Credential helpers for package managers. + + These commands provide credentials for package managers like Docker, pip, + npm, Terraform, Cargo, and NuGet. They are typically called by + wrapper binaries (e.g., docker-credential-cloudsmith) or used directly + for debugging. + + Examples: + # Test Docker credential helper + $ echo "docker.cloudsmith.io" | cloudsmith credential-helper docker + + # Test Terraform credential helper + $ cloudsmith credential-helper terraform terraform.cloudsmith.io + + # Test npm/pnpm token helper + $ cloudsmith credential-helper npm + """ + + +credential_helper.add_command(cargo_cmd, name="cargo") +credential_helper.add_command(docker_cmd, name="docker") +credential_helper.add_command(npm_cmd, name="npm") +credential_helper.add_command(nuget_cmd, name="nuget") +credential_helper.add_command(terraform_cmd, name="terraform") + +main.add_command(credential_helper, name="credential-helper") diff --git a/cloudsmith_cli/cli/commands/credential_helper/cargo.py b/cloudsmith_cli/cli/commands/credential_helper/cargo.py new file mode 100644 index 00000000..f94c48c4 --- /dev/null +++ b/cloudsmith_cli/cli/commands/credential_helper/cargo.py @@ -0,0 +1,146 @@ +""" +Cargo credential helper command. + +Implements credential retrieval for Cargo registries hosted on Cloudsmith. + +See: https://doc.rust-lang.org/cargo/reference/credential-provider-protocol.html +""" + +import json +import sys + +import click + +from ....credential_helpers.cargo import get_credentials + + +@click.command() +@click.option( + "--cargo-plugin", + is_flag=True, + default=False, + help="Run in Cargo credential provider plugin mode (JSON-line protocol).", +) +@click.argument("index_url", required=False, default=None) +def cargo(cargo_plugin, index_url): + """ + Cargo credential helper for Cloudsmith registries. + + Returns credentials as a Bearer token for Cargo sparse registries. + + If --cargo-plugin is passed, runs the full Cargo credential provider + JSON-line protocol (hello handshake, request/response). This is used + by the cargo-credential-cloudsmith wrapper binary. + + If INDEX_URL is provided as an argument, uses it directly. + Otherwise reads from stdin. + + Examples: + # Direct usage + $ cloudsmith credential-helper cargo sparse+https://cargo.cloudsmith.io/org/repo/ + Bearer eyJ0eXAiOiJKV1Qi... + + # Via wrapper (called by Cargo) + $ cargo-credential-cloudsmith --cargo-plugin + + Environment variables: + CLOUDSMITH_API_KEY: API key for authentication (optional) + CLOUDSMITH_ORG: Organization slug (required for OIDC) + CLOUDSMITH_SERVICE_SLUG: Service account slug (required for OIDC) + """ + if cargo_plugin: + _run_cargo_plugin() + return + + try: + if not index_url: + index_url = sys.stdin.read().strip() + + if not index_url: + click.echo("Error: No index URL provided", err=True) + sys.exit(1) + + token = get_credentials(index_url, debug=False) + + if not token: + click.echo( + "Error: Unable to retrieve credentials. " + "Set CLOUDSMITH_API_KEY or configure OIDC.", + err=True, + ) + sys.exit(1) + + click.echo(json.dumps({"token": f"Bearer {token}"})) + + except Exception as e: # pylint: disable=broad-exception-caught + click.echo(f"Error: {e}", err=True) + sys.exit(1) + + +def _run_cargo_plugin(): + """ + Run the full Cargo credential provider JSON-line protocol. + + See: https://doc.rust-lang.org/cargo/reference/credential-provider-protocol.html + """ + hello = {"v": [1]} + sys.stdout.write(json.dumps(hello) + "\n") + sys.stdout.flush() + + try: + line = sys.stdin.readline() + if not line: + sys.exit(0) + + request = json.loads(line) + except (json.JSONDecodeError, EOFError): + sys.exit(1) + + kind = request.get("kind", "") + registry = request.get("registry", {}) + index_url = registry.get("index-url", "") + + if kind == "get": + _handle_get(index_url) + elif kind in ("login", "logout"): + _handle_unsupported(kind) + else: + _write_error("operation-not-supported", f"Unknown operation: {kind}") + + +def _handle_get(index_url): + """Handle a 'get' request from Cargo.""" + token = get_credentials(index_url) + if not token: + _write_error( + "not-found", + "No credentials available. Set CLOUDSMITH_API_KEY or configure OIDC.", + ) + return + + response = { + "Ok": { + "kind": "get", + "token": f"Bearer {token}", + "cache": "session", + "operation_independent": True, + } + } + sys.stdout.write(json.dumps(response) + "\n") + sys.stdout.flush() + + +def _handle_unsupported(kind): + """Handle unsupported operations (login/logout).""" + _write_error( + "operation-not-supported", + f"Operation '{kind}' is not supported. " + "Credentials are managed by the Cloudsmith credential chain.", + ) + + +def _write_error(kind, message): + """Write an error response in Cargo JSON-line format.""" + response = {"Err": {"kind": kind, "message": message}} + sys.stdout.write(json.dumps(response) + "\n") + sys.stdout.flush() diff --git a/cloudsmith_cli/cli/commands/credential_helper/docker.py b/cloudsmith_cli/cli/commands/credential_helper/docker.py new file mode 100644 index 00000000..1e1c33da --- /dev/null +++ b/cloudsmith_cli/cli/commands/credential_helper/docker.py @@ -0,0 +1,73 @@ +""" +Docker credential helper command. + +Implements the Docker credential helper protocol for Cloudsmith registries. + +See: https://github.com/docker/docker-credential-helpers +""" + +import json +import sys + +import click + +from ....credential_helpers.docker import get_credentials + + +@click.command() +def docker(): + """ + Docker credential helper for Cloudsmith registries. + + Reads a Docker registry server URL from stdin and returns credentials in JSON format. + This command implements the 'get' operation of the Docker credential helper protocol. + + Only provides credentials for Cloudsmith Docker registries (docker.cloudsmith.io). + + Input (stdin): + Server URL as plain text (e.g., "docker.cloudsmith.io") + + Output (stdout): + JSON: {"Username": "token", "Secret": ""} + + Exit codes: + 0: Success + 1: Error (no credentials available, not a Cloudsmith registry, etc.) + + Examples: + # Manual testing + $ echo "docker.cloudsmith.io" | cloudsmith credential-helper docker + {"Username":"token","Secret":"eyJ0eXAiOiJKV1Qi..."} + + # Called by Docker via wrapper + $ echo "docker.cloudsmith.io" | docker-credential-cloudsmith get + {"Username":"token","Secret":"eyJ0eXAiOiJKV1Qi..."} + + Environment variables: + CLOUDSMITH_API_KEY: API key for authentication (optional) + CLOUDSMITH_ORG: Organization slug (required for OIDC) + CLOUDSMITH_SERVICE_SLUG: Service account slug (required for OIDC) + """ + try: + server_url = sys.stdin.read().strip() + + if not server_url: + click.echo("Error: No server URL provided on stdin", err=True) + sys.exit(1) + + credentials = get_credentials(server_url, debug=False) + + if not credentials: + click.echo( + "Error: Unable to retrieve credentials. " + "Make sure you have either CLOUDSMITH_API_KEY set, " + "or CLOUDSMITH_ORG + CLOUDSMITH_SERVICE_SLUG for OIDC authentication.", + err=True, + ) + sys.exit(1) + + click.echo(json.dumps(credentials)) + + except Exception as e: # pylint: disable=broad-exception-caught + click.echo(f"Error: {e}", err=True) + sys.exit(1) diff --git a/cloudsmith_cli/cli/commands/credential_helper/npm.py b/cloudsmith_cli/cli/commands/credential_helper/npm.py new file mode 100644 index 00000000..a37010ef --- /dev/null +++ b/cloudsmith_cli/cli/commands/credential_helper/npm.py @@ -0,0 +1,58 @@ +""" +npm/pnpm token helper command. + +Prints a raw Cloudsmith API token for use with pnpm's tokenHelper. + +See: https://pnpm.io/npmrc#url-tokenhelper +""" + +import sys + +import click + +from ....credential_helpers.npm import get_token + + +@click.command() +def npm(): + """ + npm/pnpm token helper for Cloudsmith registries. + + Prints a raw API token to stdout for use with pnpm's tokenHelper configuration. + + Examples: + # Direct usage + $ cloudsmith credential-helper npm + eyJ0eXAiOiJKV1Qi... + + # Via wrapper (called by pnpm) + $ npm-credentials-cloudsmith + eyJ0eXAiOiJKV1Qi... + + Configuration in ~/.npmrc: + //npm.cloudsmith.io/:tokenHelper=/absolute/path/to/npm-credentials-cloudsmith + + Find the path with: which npm-credentials-cloudsmith + + Environment variables: + CLOUDSMITH_API_KEY: API key for authentication (optional) + CLOUDSMITH_ORG: Organization slug (required for OIDC) + CLOUDSMITH_SERVICE_SLUG: Service account slug (required for OIDC) + """ + try: + token = get_token(debug=False) + + if not token: + click.echo( + "Error: Unable to retrieve credentials. " + "Set CLOUDSMITH_API_KEY or configure OIDC.", + err=True, + ) + sys.exit(1) + + sys.stdout.write(token) + sys.stdout.flush() + + except Exception as e: # pylint: disable=broad-exception-caught + click.echo(f"Error: {e}", err=True) + sys.exit(1) diff --git a/cloudsmith_cli/cli/commands/credential_helper/nuget.py b/cloudsmith_cli/cli/commands/credential_helper/nuget.py new file mode 100644 index 00000000..8fcb0eca --- /dev/null +++ b/cloudsmith_cli/cli/commands/credential_helper/nuget.py @@ -0,0 +1,64 @@ +""" +NuGet credential helper command. + +Implements credential retrieval for NuGet feeds hosted on Cloudsmith. + +See: https://learn.microsoft.com/en-us/nuget/reference/extensibility/nuget-exe-credential-providers +""" + +import json +import sys + +import click + +from ....credential_helpers.nuget import get_credentials + + +@click.command() +@click.argument("uri", required=False, default=None) +def nuget(uri): + """ + NuGet credential helper for Cloudsmith feeds. + + Returns credentials in NuGet's expected JSON format: + {"Username": "token", "Password": "...", "Message": ""} + + If URI is provided as an argument, uses it directly. + Otherwise reads from stdin. + + Examples: + # Direct usage + $ cloudsmith credential-helper nuget https://nuget.cloudsmith.io/org/repo/v3/index.json + {"Username":"token","Password":"eyJ0eXAiOiJKV1Qi...","Message":""} + + # Via wrapper (called by NuGet) + $ CredentialProvider.Cloudsmith -uri https://nuget.cloudsmith.io/org/repo/v3/index.json + + Environment variables: + CLOUDSMITH_API_KEY: API key for authentication (optional) + CLOUDSMITH_ORG: Organization slug (required for OIDC) + CLOUDSMITH_SERVICE_SLUG: Service account slug (required for OIDC) + """ + try: + if not uri: + uri = sys.stdin.read().strip() + + if not uri: + click.echo("Error: No URI provided", err=True) + sys.exit(1) + + credentials = get_credentials(uri, debug=False) + + if not credentials: + click.echo( + "Error: Unable to retrieve credentials. " + "Set CLOUDSMITH_API_KEY or configure OIDC.", + err=True, + ) + sys.exit(1) + + click.echo(json.dumps({**credentials, "Message": ""})) + + except Exception as e: # pylint: disable=broad-exception-caught + click.echo(f"Error: {e}", err=True) + sys.exit(1) diff --git a/cloudsmith_cli/cli/commands/credential_helper/terraform.py b/cloudsmith_cli/cli/commands/credential_helper/terraform.py new file mode 100644 index 00000000..fa9781dc --- /dev/null +++ b/cloudsmith_cli/cli/commands/credential_helper/terraform.py @@ -0,0 +1,61 @@ +""" +Terraform credential helper command. + +Implements the Terraform credential helper protocol for Cloudsmith registries. + +See: https://developer.hashicorp.com/terraform/internals/credentials-helpers +""" + +import json +import sys + +import click + +from ....credential_helpers.terraform import get_credentials + + +@click.command() +@click.argument("hostname", required=False, default=None) +def terraform(hostname): + """ + Terraform credential helper for Cloudsmith registries. + + Returns credentials in Terraform's expected JSON format: {"token": "..."} + + Can be used directly or via the terraform-credentials-cloudsmith wrapper. + + If HOSTNAME is provided as an argument, uses it directly. + Otherwise reads from stdin (for use with the wrapper binary). + + Examples: + # Direct usage + $ cloudsmith credential-helper terraform terraform.cloudsmith.io + {"token":"eyJ0eXAiOiJKV1Qi..."} + + # Via wrapper + $ terraform-credentials-cloudsmith get terraform.cloudsmith.io + + Environment variables: + CLOUDSMITH_API_KEY: API key for authentication (optional) + CLOUDSMITH_ORG: Organization slug (required for OIDC) + CLOUDSMITH_SERVICE_SLUG: Service account slug (required for OIDC) + """ + try: + if not hostname: + hostname = sys.stdin.read().strip() + + if not hostname: + click.echo("Error: No hostname provided", err=True) + sys.exit(1) + + token = get_credentials(hostname, debug=False) + + if not token: + click.echo("{}") + sys.exit(0) + + click.echo(json.dumps({"token": token})) + + except Exception as e: # pylint: disable=broad-exception-caught + click.echo(f"Error: {e}", err=True) + sys.exit(1) diff --git a/cloudsmith_cli/core/credentials/__init__.py b/cloudsmith_cli/core/credentials/__init__.py index 9e3c7831..e143e7f1 100644 --- a/cloudsmith_cli/core/credentials/__init__.py +++ b/cloudsmith_cli/core/credentials/__init__.py @@ -51,10 +51,29 @@ def resolve(self, context: CredentialContext) -> CredentialResult | None: class CredentialProviderChain: - """Evaluates credential providers in order, returning the first valid result.""" - - def __init__(self, providers: list[CredentialProvider]): - self.providers = providers + """Evaluates credential providers in order, returning the first valid result. + + If no providers are given, uses the default chain: + EnvironmentVariable → ConfigFile → Keyring → OIDC. + """ + + def __init__(self, providers: list[CredentialProvider] | None = None): + if providers is not None: + self.providers = providers + else: + from .providers import ( + ConfigFileProvider, + EnvironmentVariableProvider, + KeyringProvider, + OidcProvider, + ) + + self.providers = [ + EnvironmentVariableProvider(), + ConfigFileProvider(), + KeyringProvider(), + OidcProvider(), + ] def resolve(self, context: CredentialContext) -> CredentialResult | None: """Evaluate each provider in order. Return the first successful result.""" diff --git a/cloudsmith_cli/core/credentials/providers.py b/cloudsmith_cli/core/credentials/providers.py index 6d9f907a..ccb6fb95 100644 --- a/cloudsmith_cli/core/credentials/providers.py +++ b/cloudsmith_cli/core/credentials/providers.py @@ -175,6 +175,18 @@ def resolve( # pylint: disable=too-many-return-statements logger.debug("OidcProvider: No CI/CD environment detected, skipping") return None + # Check cache before retrieving vendor token (avoids expensive STS calls) + from .oidc.cache import get_cached_token, store_cached_token + + cached = get_cached_token(context.api_host, org, service_slug) + if cached: + logger.debug("OidcProvider: Using cached OIDC token") + return CredentialResult( + api_key=cached, + source_name="oidc", + source_detail=f"OIDC via {detector.name} [cached] (org: {org}, service: {service_slug})", + ) + try: vendor_token = detector.get_token() except Exception: # pylint: disable=broad-exception-caught @@ -192,18 +204,6 @@ def resolve( # pylint: disable=too-many-return-statements ) return None - # Check cache before doing a full exchange - from .oidc.cache import get_cached_token, store_cached_token - - cached = get_cached_token(context.api_host, org, service_slug) - if cached: - logger.debug("OidcProvider: Using cached OIDC token") - return CredentialResult( - api_key=cached, - source_name="oidc", - source_detail=f"OIDC via {detector.name} [cached] (org: {org}, service: {service_slug})", - ) - # Exchange vendor JWT for Cloudsmith token try: cloudsmith_token = exchange_oidc_token( diff --git a/cloudsmith_cli/core/tests/test_credentials.py b/cloudsmith_cli/core/tests/test_credentials.py index 3834ff8b..b0ffbd02 100644 --- a/cloudsmith_cli/core/tests/test_credentials.py +++ b/cloudsmith_cli/core/tests/test_credentials.py @@ -846,6 +846,7 @@ def test_oidc_provider_uses_cache(self, mock_detect, mock_exchange, tmp_path): env = { "CLOUDSMITH_ORG": "myorg", "CLOUDSMITH_SERVICE_SLUG": "mysvc", + "CLOUDSMITH_NO_KEYRING": "1", } with patch( @@ -864,7 +865,8 @@ def test_oidc_provider_uses_cache(self, mock_detect, mock_exchange, tmp_path): assert result is not None assert result.api_key == cached_token assert "[cached]" in result.source_detail - # exchange should NOT have been called + # Neither get_token nor exchange should have been called + mock_detector.get_token.assert_not_called() mock_exchange.assert_not_called() @patch("cloudsmith_cli.core.credentials.oidc.exchange.exchange_oidc_token") @@ -887,6 +889,7 @@ def test_oidc_provider_exchanges_when_cache_expired( env = { "CLOUDSMITH_ORG": "myorg", "CLOUDSMITH_SERVICE_SLUG": "mysvc", + "CLOUDSMITH_NO_KEYRING": "1", } with patch( diff --git a/cloudsmith_cli/credential_helpers/__init__.py b/cloudsmith_cli/credential_helpers/__init__.py new file mode 100644 index 00000000..6e6715ce --- /dev/null +++ b/cloudsmith_cli/credential_helpers/__init__.py @@ -0,0 +1,6 @@ +""" +Credential helpers for various package managers. + +This package provides credential helper implementations for Docker, pip, npm, etc. +Each helper follows its respective package manager's credential helper protocol. +""" diff --git a/cloudsmith_cli/credential_helpers/cargo/__init__.py b/cloudsmith_cli/credential_helpers/cargo/__init__.py new file mode 100644 index 00000000..ccdec3dc --- /dev/null +++ b/cloudsmith_cli/credential_helpers/cargo/__init__.py @@ -0,0 +1,32 @@ +""" +Cargo credential provider logic for Cloudsmith. + +This module provides functions for retrieving credentials for Cargo registries +using the existing Cloudsmith credential provider chain (OIDC, API keys, config, keyring). +""" + +from ..common import is_cloudsmith_domain, resolve_credentials + + +def get_credentials(index_url, debug=False): + """ + Get a token for a Cloudsmith Cargo registry. + + Resolves credentials first, then verifies the URL is a Cloudsmith registry + (including custom domains, authenticated via the resolved token). + + Args: + index_url: The registry index URL (e.g., "sparse+https://cargo.cloudsmith.io/org/repo/") + debug: Enable debug logging + + Returns: + str: API token or None if not available + """ + result = resolve_credentials(debug) + if not result: + return None + + if not is_cloudsmith_domain(index_url, _credential_result=result): + return None + + return result.api_key diff --git a/cloudsmith_cli/credential_helpers/cargo/wrapper.py b/cloudsmith_cli/credential_helpers/cargo/wrapper.py new file mode 100644 index 00000000..c0d499f8 --- /dev/null +++ b/cloudsmith_cli/credential_helpers/cargo/wrapper.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python +""" +Wrapper for cargo-credential-cloudsmith. + +This is the entry point binary that Cargo calls to get registry credentials. +It delegates to: cloudsmith credential-helper cargo --cargo-plugin + +See: https://doc.rust-lang.org/cargo/reference/credential-provider-protocol.html + +Configure in ~/.cargo/config.toml: + [registry] + credential-provider = ["cargo-credential-cloudsmith"] + + # Or for a specific registry: + [registries.cloudsmith] + index = "sparse+https://cargo.cloudsmith.io/org/repo/" + credential-provider = ["cargo-credential-cloudsmith"] +""" +import subprocess +import sys + + +def main(): + """Cargo credential provider entry point. Delegates to cloudsmith credential-helper cargo.""" + try: + result = subprocess.run( + ["cloudsmith", "credential-helper", "cargo", "--cargo-plugin"], + stdin=sys.stdin, + capture_output=False, + check=False, + ) + sys.exit(result.returncode) + except FileNotFoundError: + print( + "Error: 'cloudsmith' command not found. " + "Make sure cloudsmith-cli is installed.", + file=sys.stderr, + ) + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/cloudsmith_cli/credential_helpers/common.py b/cloudsmith_cli/credential_helpers/common.py new file mode 100644 index 00000000..f5bf639f --- /dev/null +++ b/cloudsmith_cli/credential_helpers/common.py @@ -0,0 +1,195 @@ +""" +Shared utilities for credential helpers. + +Provides credential resolution and domain checking used by all helpers. +Networking configuration (proxy, TLS, headers) is read from the same +environment variables and config files used by the main CLI so that +credential helpers work correctly behind proxies and with custom certs. +""" + +import logging +import os + +from ..core.credentials import CredentialContext, CredentialProviderChain + +logger = logging.getLogger(__name__) + + +def _get_networking_config(): + """Read networking configuration from env vars and the CLI config file. + + Sources (in priority order): + 1. Environment variables: CLOUDSMITH_API_PROXY, + CLOUDSMITH_WITHOUT_API_SSL_VERIFY, CLOUDSMITH_API_USER_AGENT, + CLOUDSMITH_API_HEADERS + 2. CLI config file (config.ini): api_proxy, api_ssl_verify + + Returns: + dict with keys: proxy, ssl_verify, user_agent, headers + """ + proxy = os.environ.get("CLOUDSMITH_API_PROXY", "").strip() or None + user_agent = os.environ.get("CLOUDSMITH_API_USER_AGENT", "").strip() or None + + # SSL verify: env var is an opt-out flag (presence means disable) + ssl_verify_env = os.environ.get("CLOUDSMITH_WITHOUT_API_SSL_VERIFY", "").strip() + ssl_verify = ( + not ssl_verify_env.lower() in ("1", "true", "yes") if ssl_verify_env else True + ) + + # Parse extra headers from CSV "key=value,key2=value2" + headers = None + headers_env = os.environ.get("CLOUDSMITH_API_HEADERS", "").strip() + if headers_env: + headers = {} + for pair in headers_env.split(","): + if "=" in pair: + k, v = pair.split("=", 1) + headers[k.strip()] = v.strip() + + # Fall back to CLI config file for proxy and ssl_verify + if not proxy or ssl_verify is True: + try: + from ..cli.config import ConfigReader + + raw_config = ConfigReader.read_config() + defaults = raw_config.get("default", {}) + + if not proxy: + cfg_proxy = defaults.get("api_proxy", "").strip() + if cfg_proxy: + proxy = cfg_proxy + + # Only override ssl_verify if the env var wasn't explicitly set + if not ssl_verify_env: + cfg_ssl = defaults.get("api_ssl_verify", "true").strip().lower() + if cfg_ssl in ("0", "false", "no"): + ssl_verify = False + except Exception: # pylint: disable=broad-exception-caught + logger.debug("Failed to read CLI config for networking", exc_info=True) + + return { + "proxy": proxy, + "ssl_verify": ssl_verify, + "user_agent": user_agent, + "headers": headers, + } + + +def resolve_credentials(debug=False): + """ + Resolve Cloudsmith credentials using the provider chain. + + Tries providers in order: environment variables, config file, keyring, OIDC. + Networking configuration is read from env vars and the CLI config file so + that OIDC token exchange works behind proxies. + + Args: + debug: Enable debug logging + + Returns: + CredentialResult with .api_key, or None if no credentials available + """ + api_host = os.environ.get("CLOUDSMITH_API_HOST", "https://api.cloudsmith.io") + net = _get_networking_config() + + context = CredentialContext( + api_host=api_host, + debug=debug, + proxy=net["proxy"], + ssl_verify=net["ssl_verify"], + user_agent=net["user_agent"], + headers=net["headers"], + ) + + chain = CredentialProviderChain() + result = chain.resolve(context) + + if not result or not result.api_key: + return None + + return result + + +def extract_hostname(url): + """ + Extract bare hostname from any URL format. + + Handles protocols, sparse+ prefix, ports, paths, and trailing slashes. + + Args: + url: URL in any format (e.g., "sparse+https://cargo.cloudsmith.io/org/repo/") + + Returns: + str: Lowercase hostname (e.g., "cargo.cloudsmith.io") + """ + if not url: + return "" + + normalized = url.lower().strip() + + # Remove sparse+ prefix (Cargo) + if normalized.startswith("sparse+"): + normalized = normalized[7:] + + # Remove protocol + if "://" in normalized: + normalized = normalized.split("://", 1)[1] + + # Remove userinfo (user@host) + if "@" in normalized.split("/")[0]: + normalized = normalized.split("@", 1)[1] + + # Extract hostname (before first / or :) + hostname = normalized.split("/")[0].split(":")[0] + + return hostname + + +def is_cloudsmith_domain(url, _credential_result=None): + """ + Check if a URL points to a Cloudsmith service. + + Checks standard *.cloudsmith.io domains first (no auth needed). + If not a standard domain, uses the provided credential result (or + resolves credentials) and queries the Cloudsmith API for custom domains. + + Args: + url: URL or hostname to check + _credential_result: Pre-resolved CredentialResult to avoid duplicate + credential resolution. If None, resolves credentials internally. + + Returns: + bool: True if this is a Cloudsmith domain + """ + hostname = extract_hostname(url) + if not hostname: + return False + + # Standard Cloudsmith domains — no auth needed + if hostname.endswith("cloudsmith.io") or hostname == "cloudsmith.io": + return True + + # Custom domains require org + auth + org = os.environ.get("CLOUDSMITH_ORG", "").strip() + if not org: + return False + + result = _credential_result or resolve_credentials() + if not result: + return False + + from .custom_domains import get_custom_domains_for_org + + api_host = os.environ.get("CLOUDSMITH_API_HOST", "https://api.cloudsmith.io") + net = _get_networking_config() + custom_domains = get_custom_domains_for_org( + org, + api_host, + result.api_key, + proxy=net["proxy"], + ssl_verify=net["ssl_verify"], + user_agent=net["user_agent"], + headers=net["headers"], + ) + + return hostname in [d.lower() for d in custom_domains] diff --git a/cloudsmith_cli/credential_helpers/conda/__init__.py b/cloudsmith_cli/credential_helpers/conda/__init__.py new file mode 100644 index 00000000..4ed50088 --- /dev/null +++ b/cloudsmith_cli/credential_helpers/conda/__init__.py @@ -0,0 +1,32 @@ +""" +Conda auth handler logic for Cloudsmith. + +This module provides functions for retrieving credentials for Conda channels +using the existing Cloudsmith credential provider chain (OIDC, API keys, config, keyring). +""" + +from ..common import is_cloudsmith_domain, resolve_credentials + + +def get_credentials(url, debug=False): + """ + Get credentials for a Cloudsmith Conda channel. + + Resolves credentials first, then verifies the URL is a Cloudsmith channel + (including custom domains, authenticated via the resolved token). + + Args: + url: The channel URL (e.g., "https://conda.cloudsmith.io/org/repo/") + debug: Enable debug logging + + Returns: + tuple: (username, token) or None if not available + """ + result = resolve_credentials(debug) + if not result: + return None + + if not is_cloudsmith_domain(url, _credential_result=result): + return None + + return ("token", result.api_key) diff --git a/cloudsmith_cli/credential_helpers/conda/plugin.py b/cloudsmith_cli/credential_helpers/conda/plugin.py new file mode 100644 index 00000000..b8de76fa --- /dev/null +++ b/cloudsmith_cli/credential_helpers/conda/plugin.py @@ -0,0 +1,56 @@ +""" +Conda plugin for Cloudsmith authentication. + +This module implements the conda auth handler plugin interface, +providing automatic authentication for Cloudsmith Conda channels. + +See: https://docs.conda.io/projects/conda/en/stable/dev-guide/plugins/auth_handlers.html + +Install and configure in .condarc: + channel_settings: + - channel: https://conda.cloudsmith.io/org/repo/ + auth: cloudsmith +""" + +import logging + +from . import get_credentials + +logger = logging.getLogger(__name__) + +try: + from conda.plugins import hookimpl + from conda.plugins.types import CondaAuthHandler + from requests.auth import HTTPBasicAuth + + class CloudsmithCondaAuth(HTTPBasicAuth): + """Requests auth handler that injects Cloudsmith credentials via HTTP Basic Auth.""" + + def __init__(self, channel_name=None): + """Initialize with placeholder credentials (resolved per-request).""" + super().__init__("token", "") + self.channel_name = channel_name + + def __call__(self, request): + """Add authentication to the request.""" + creds = get_credentials(request.url) + if creds: + username, token = creds + self.username = username + self.password = token + logger.debug("Injected Cloudsmith basic auth for %s", request.url) + return super().__call__(request) + return request + + @hookimpl + def conda_auth_handlers(): + """Register the Cloudsmith auth handler with conda.""" + yield CondaAuthHandler( + name="cloudsmith", + handler=CloudsmithCondaAuth, + ) + +except ImportError: + # conda is not installed - plugin hooks won't be available + # This is fine when running outside of conda environments + logger.debug("conda not available, skipping plugin registration") diff --git a/cloudsmith_cli/credential_helpers/custom_domains.py b/cloudsmith_cli/credential_helpers/custom_domains.py new file mode 100644 index 00000000..df257f4a --- /dev/null +++ b/cloudsmith_cli/credential_helpers/custom_domains.py @@ -0,0 +1,257 @@ +""" +Helper for discovering Cloudsmith custom domains. + +This module provides functions to fetch custom domains from the Cloudsmith API +for use in credential helpers. Results are cached on the filesystem. +""" + +import json +import logging +import time +from pathlib import Path +from typing import List, Optional + +logger = logging.getLogger(__name__) + +# Cache custom domains for 1 hour +CACHE_TTL_SECONDS = 3600 + + +def get_cache_dir() -> Path: + """ + Get the cache directory for custom domains. + + Returns: + Path to cache directory (e.g., ~/.cloudsmith/cache/custom_domains/) + """ + home = Path.home() + cache_dir = home / ".cloudsmith" / "cache" / "custom_domains" + cache_dir.mkdir(parents=True, exist_ok=True) + return cache_dir + + +def get_cache_path(org: str) -> Path: + """ + Get the cache file path for an organization's custom domains. + + Args: + org: Organization slug + + Returns: + Path to cache file + """ + cache_dir = get_cache_dir() + safe_org = "".join(c if c.isalnum() or c in "-_" else "_" for c in org) + return cache_dir / f"{safe_org}.json" + + +def is_cache_valid(cache_path: Path) -> bool: + """ + Check if a cache file exists and is still valid. + + Args: + cache_path: Path to cache file + + Returns: + bool: True if cache exists and hasn't expired + """ + if not cache_path.exists(): + return False + + try: + mtime = cache_path.stat().st_mtime + age = time.time() - mtime + return age < CACHE_TTL_SECONDS + except OSError: + return False + + +def read_cache(cache_path: Path) -> Optional[List[str]]: + """ + Read custom domains from cache file. + + Args: + cache_path: Path to cache file + + Returns: + List of domain strings or None if cache invalid/missing + """ + if not is_cache_valid(cache_path): + return None + + try: + with open(cache_path, encoding="utf-8") as f: + data = json.load(f) + if isinstance(data, dict) and "domains" in data: + domains = data["domains"] + if isinstance(domains, list): + logger.debug( + "Read %d domains from cache: %s", len(domains), cache_path + ) + return domains + except (OSError, json.JSONDecodeError) as exc: + logger.debug("Failed to read cache %s: %s", cache_path, exc) + + return None + + +def write_cache(cache_path: Path, domains: List[str]) -> None: + """ + Write custom domains to cache file. + + Args: + cache_path: Path to cache file + domains: List of domain strings to cache + """ + try: + data = { + "domains": domains, + "cached_at": time.time(), + } + with open(cache_path, "w", encoding="utf-8") as f: + json.dump(data, f) + logger.debug("Wrote %d domains to cache: %s", len(domains), cache_path) + except OSError as exc: + logger.debug("Failed to write cache %s: %s", cache_path, exc) + + +def _create_session( + api_key: str = None, + proxy: str = None, + ssl_verify: bool = True, + user_agent: str = None, + headers: dict = None, +): + """Create a requests session with networking and auth configuration. + + Mirrors the pattern used by EnvironmentDetector._create_session() and + exchange.create_exchange_session() so that custom-domain API calls + respect the same proxy / TLS / header settings as the rest of the CLI. + + Args: + api_key: Optional API key for Bearer auth. + proxy: HTTP/HTTPS proxy URL. + ssl_verify: Whether to verify SSL certificates. + user_agent: Custom User-Agent string. + headers: Additional headers to include. + + Returns: + Configured requests.Session instance. + """ + import requests + + session = requests.Session() + + if proxy: + session.proxies = {"http": proxy, "https": proxy} + + session.verify = ssl_verify + + if user_agent: + session.headers["User-Agent"] = user_agent + + if headers: + session.headers.update(headers) + + if api_key: + session.headers["Authorization"] = f"Bearer {api_key}" + + return session + + +def get_custom_domains_for_org( # pylint: disable=too-many-return-statements + org: str, + api_host: str = "https://api.cloudsmith.io", + api_key: str = None, + proxy: str = None, + ssl_verify: bool = True, + user_agent: str = None, + headers: dict = None, +) -> List[str]: + """ + Fetch custom domains for a Cloudsmith organization. + + Results are cached on the filesystem for 1 hour to avoid excessive API calls. + Networking settings (proxy, TLS, headers) are forwarded to the HTTP session + so custom-domain lookups work behind proxies and with custom certificates. + + Args: + org: Organization slug + api_host: Cloudsmith API host + api_key: Optional API key for authentication + proxy: HTTP/HTTPS proxy URL + ssl_verify: Whether to verify SSL certificates (default: True) + user_agent: Custom User-Agent string + headers: Additional headers to include + + Returns: + List of custom domain strings (e.g., ['docker.customer.com', 'dl.customer.com']) + Empty list if API call fails or org has no custom domains + + Example: + >>> domains = get_custom_domains_for_org('my-org', api_key='token') + >>> print(domains) + ['docker.acme.com', 'dl.acme.com'] + """ + cache_path = get_cache_path(org) + cached_domains = read_cache(cache_path) + if cached_domains is not None: + logger.debug("Using cached custom domains for %s", org) + return cached_domains + + logger.debug("Fetching custom domains from API for %s", org) + + try: + session = _create_session( + api_key=api_key, + proxy=proxy, + ssl_verify=ssl_verify, + user_agent=user_agent, + headers=headers, + ) + + url = f"{api_host}/orgs/{org}/custom-domains/" + + response = session.get(url, timeout=10) + + if response.status_code in (401, 403): + logger.debug( + "Custom domains API requires auth - assuming no custom domains for %s", + org, + ) + return [] # Don't cache 401/403 - might work later with auth + + if response.status_code == 404: + logger.debug("Organization %s not found or has no custom domains", org) + write_cache(cache_path, []) # Cache empty result to avoid repeated 404s + return [] + + if response.status_code != 200: + logger.debug( + "Failed to fetch custom domains for %s: HTTP %d", + org, + response.status_code, + ) + return [] + + data = response.json() + + # Expected format: [{"host": "docker.customer.com", ...}, ...] + domains = [] + if isinstance(data, list): + for item in data: + if isinstance(item, dict) and "host" in item: + domains.append(item["host"]) + + logger.debug("Fetched %d custom domains for %s", len(domains), org) + + write_cache(cache_path, domains) + + return domains + + except ImportError: + logger.debug("requests library not available, cannot fetch custom domains") + return [] + except Exception as exc: # pylint: disable=broad-exception-caught + logger.debug("Error fetching custom domains: %s", exc) + return [] diff --git a/cloudsmith_cli/credential_helpers/docker/__init__.py b/cloudsmith_cli/credential_helpers/docker/__init__.py new file mode 100644 index 00000000..28f5625a --- /dev/null +++ b/cloudsmith_cli/credential_helpers/docker/__init__.py @@ -0,0 +1,32 @@ +""" +Docker credential helper logic for Cloudsmith. + +This module provides functions for retrieving credentials for Docker registries +using the existing Cloudsmith credential provider chain (OIDC, API keys, config, keyring). +""" + +from ..common import is_cloudsmith_domain, resolve_credentials + + +def get_credentials(server_url, debug=False): + """ + Get credentials for a Cloudsmith Docker registry. + + Resolves credentials first, then verifies the URL is a Cloudsmith registry + (including custom domains, authenticated via the resolved token). + + Args: + server_url: The Docker registry server URL + debug: Enable debug logging + + Returns: + dict: Credentials with 'Username' and 'Secret' keys, or None + """ + result = resolve_credentials(debug) + if not result: + return None + + if not is_cloudsmith_domain(server_url, _credential_result=result): + return None + + return {"Username": "token", "Secret": result.api_key} diff --git a/cloudsmith_cli/credential_helpers/docker/wrapper.py b/cloudsmith_cli/credential_helpers/docker/wrapper.py new file mode 100644 index 00000000..85f050ee --- /dev/null +++ b/cloudsmith_cli/credential_helpers/docker/wrapper.py @@ -0,0 +1,76 @@ +#!/usr/bin/env python +""" +Wrapper for docker-credential-cloudsmith. + +This is the entry point binary that Docker calls. It delegates to the main +cloudsmith credential-helper docker command. + +See: https://github.com/docker/docker-credential-helpers + +Configure in ~/.docker/config.json: + { + "credHelpers": { + "docker.cloudsmith.io": "cloudsmith" + } + } +""" +import subprocess +import sys + + +def main(): + """ + Docker credential helper wrapper. + + Docker calls this with the operation as argv[1]: + - get: Retrieve credentials + - store: Store credentials (not supported) + - erase: Erase credentials (not supported) + - list: List credentials (not supported) + + We only support 'get' and delegate to: cloudsmith credential-helper docker + """ + if len(sys.argv) < 2: + print( + "Error: Missing operation argument. " + "Usage: docker-credential-cloudsmith ", + file=sys.stderr, + ) + sys.exit(1) + + operation = sys.argv[1] + + if operation == "get": + try: + result = subprocess.run( + ["cloudsmith", "credential-helper", "docker"], + stdin=sys.stdin, + capture_output=False, + check=False, + ) + sys.exit(result.returncode) + except FileNotFoundError: + print( + "Error: 'cloudsmith' command not found. " + "Make sure cloudsmith-cli is installed.", + file=sys.stderr, + ) + sys.exit(1) + elif operation in ("store", "erase", "list"): + print( + f"Error: Operation '{operation}' is not supported. " + "Only 'get' is available for Cloudsmith credential helper.", + file=sys.stderr, + ) + sys.exit(1) + else: + print( + f"Error: Unknown operation '{operation}'. " + "Valid operations: get, store, erase, list", + file=sys.stderr, + ) + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/cloudsmith_cli/credential_helpers/npm/__init__.py b/cloudsmith_cli/credential_helpers/npm/__init__.py new file mode 100644 index 00000000..228c89c3 --- /dev/null +++ b/cloudsmith_cli/credential_helpers/npm/__init__.py @@ -0,0 +1,27 @@ +""" +npm/pnpm token helper logic for Cloudsmith. + +This module provides functions for retrieving tokens for npm/pnpm registries +using the existing Cloudsmith credential provider chain (OIDC, API keys, config, keyring). +""" + +from ..common import resolve_credentials + + +def get_token(debug=False): + """ + Get a Cloudsmith API token for npm/pnpm authentication. + + Returns the token string prefixed with "Bearer" for use with pnpm's tokenHelper. + + Args: + debug: Enable debug logging + + Returns: + str: "Bearer " or None if not available + """ + result = resolve_credentials(debug) + if not result: + return None + + return f"Bearer {result.api_key}" diff --git a/cloudsmith_cli/credential_helpers/npm/wrapper.py b/cloudsmith_cli/credential_helpers/npm/wrapper.py new file mode 100644 index 00000000..0abe8a23 --- /dev/null +++ b/cloudsmith_cli/credential_helpers/npm/wrapper.py @@ -0,0 +1,39 @@ +#!/usr/bin/env python +""" +Wrapper for npm-credentials-cloudsmith (pnpm tokenHelper). + +This is the entry point binary that pnpm calls to get an auth token. +It delegates to: cloudsmith credential-helper npm + +See: https://pnpm.io/npmrc#url-tokenhelper + +Configure in user ~/.npmrc: + //npm.cloudsmith.io/:tokenHelper=/absolute/path/to/npm-credentials-cloudsmith + +Find the path with: which npm-credentials-cloudsmith +""" +import subprocess +import sys + + +def main(): + """pnpm tokenHelper entry point. Delegates to cloudsmith credential-helper npm.""" + try: + result = subprocess.run( + ["cloudsmith", "credential-helper", "npm"], + stdin=sys.stdin, + capture_output=False, + check=False, + ) + sys.exit(result.returncode) + except FileNotFoundError: + print( + "Error: 'cloudsmith' command not found. " + "Make sure cloudsmith-cli is installed.", + file=sys.stderr, + ) + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/cloudsmith_cli/credential_helpers/nuget/__init__.py b/cloudsmith_cli/credential_helpers/nuget/__init__.py new file mode 100644 index 00000000..25a8395b --- /dev/null +++ b/cloudsmith_cli/credential_helpers/nuget/__init__.py @@ -0,0 +1,32 @@ +""" +NuGet credential provider logic for Cloudsmith. + +This module provides functions for retrieving credentials for NuGet feeds +using the existing Cloudsmith credential provider chain (OIDC, API keys, config, keyring). +""" + +from ..common import is_cloudsmith_domain, resolve_credentials + + +def get_credentials(uri, debug=False): + """ + Get credentials for a Cloudsmith NuGet feed. + + Resolves credentials first, then verifies the URI is a Cloudsmith feed + (including custom domains, authenticated via the resolved token). + + Args: + uri: The NuGet package source URI + debug: Enable debug logging + + Returns: + dict: Credentials with 'Username' and 'Password' keys, or None + """ + result = resolve_credentials(debug) + if not result: + return None + + if not is_cloudsmith_domain(uri, _credential_result=result): + return None + + return {"Username": "token", "Password": result.api_key} diff --git a/cloudsmith_cli/credential_helpers/nuget/wrapper.py b/cloudsmith_cli/credential_helpers/nuget/wrapper.py new file mode 100644 index 00000000..5ed85298 --- /dev/null +++ b/cloudsmith_cli/credential_helpers/nuget/wrapper.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python +""" +Wrapper for CredentialProvider.Cloudsmith (NuGet credential provider). + +This is the entry point binary that NuGet calls to get feed credentials. +It delegates to: cloudsmith credential-helper nuget + +See: https://learn.microsoft.com/en-us/nuget/reference/extensibility/nuget-exe-credential-providers + +Configure by setting NUGET_CREDENTIALPROVIDERS_PATH to include the directory +containing this binary, or place it alongside nuget.exe. + + export NUGET_CREDENTIALPROVIDERS_PATH=/path/to/directory/ + +For dotnet CLI, place in one of the plugin search directories: + - ~/.nuget/plugins/netcore/CredentialProvider.Cloudsmith/ (Linux/macOS) + - %USERPROFILE%\\.nuget\\plugins\\netcore\\CredentialProvider.Cloudsmith\\ (Windows) + +Usage by NuGet: + CredentialProvider.Cloudsmith -uri [-isRetry] [-nonInteractive] + +Exit codes: + 0: Success (credentials returned as JSON on stdout) + 1: Provider not applicable for this URI + 2: Failure +""" +import argparse +import subprocess +import sys + + +def main(): + """NuGet credential provider entry point. Delegates to cloudsmith credential-helper nuget.""" + parser = argparse.ArgumentParser(description="Cloudsmith NuGet Credential Provider") + parser.add_argument("-uri", required=True, help="Package source URI") + parser.add_argument("-isRetry", action="store_true", help="Whether this is a retry") + parser.add_argument( + "-nonInteractive", action="store_true", help="Non-interactive mode" + ) + + args = parser.parse_args() + + try: + result = subprocess.run( + ["cloudsmith", "credential-helper", "nuget", args.uri], + stdin=sys.stdin, + capture_output=False, + check=False, + ) + sys.exit(result.returncode) + except FileNotFoundError: + print( + "Error: 'cloudsmith' command not found. " + "Make sure cloudsmith-cli is installed.", + file=sys.stderr, + ) + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/cloudsmith_cli/credential_helpers/pip/__init__.py b/cloudsmith_cli/credential_helpers/pip/__init__.py new file mode 100644 index 00000000..d76f2362 --- /dev/null +++ b/cloudsmith_cli/credential_helpers/pip/__init__.py @@ -0,0 +1,134 @@ +""" +Pip keyring backend for Cloudsmith. + +This module provides a keyring backend that pip can use to automatically +authenticate with Cloudsmith Python repositories. + +The backend is auto-discovered by the keyring library and used by pip when +installing packages from Cloudsmith feeds. No configuration is needed beyond +installing cloudsmith-cli: + + pip install mypackage --index-url https://python.cloudsmith.io/org/repo/simple/ + +See: https://pip.pypa.io/en/stable/topics/authentication/#keyring-support +""" + +import logging + +import keyring.backend +import keyring.credentials + +from ..common import is_cloudsmith_domain, resolve_credentials + +logger = logging.getLogger(__name__) + + +class CloudsmithKeyringBackend(keyring.backend.KeyringBackend): + """ + Keyring backend for Cloudsmith Python repositories. + + This backend integrates with pip and twine to provide automatic authentication + for Cloudsmith Python feeds using the Cloudsmith credential provider chain. + + Supported URLs: + - Standard domains: *.cloudsmith.io + - Custom domains: Fetched from API + + Priority: + 9.9 (high priority - runs before system keychains) + + Usage: + Once installed, pip automatically uses this backend: + + $ pip install mypackage --index-url https://python.cloudsmith.io/myorg/myrepo/simple/ + + Requirements: + - Either CLOUDSMITH_API_KEY environment variable + - Or CLOUDSMITH_ORG + CLOUDSMITH_SERVICE_SLUG for OIDC authentication + - Or credentials in ~/.cloudsmith/config.ini + """ + + priority = 9.9 + _resolving = False + + def __init__(self): + """Initialize the backend with an in-memory cache.""" + super().__init__() + self._cache = {} + + def get_credential(self, service, username): + """ + Get credentials for a Cloudsmith service URL. + + Args: + service: URL of the Python repository + username: Username (ignored - we always use 'token') + + Returns: + keyring.credentials.SimpleCredential or None + """ + try: + # Reentrancy guard: the KeyringProvider in the credential chain + # calls keyring.get_password() which invokes this backend. Without + # this guard that would loop forever. + if CloudsmithKeyringBackend._resolving: + return None + CloudsmithKeyringBackend._resolving = True + + try: + result = resolve_credentials() + if not result: + logger.debug("No credentials resolved from provider chain") + return None + + if not is_cloudsmith_domain(service, _credential_result=result): + logger.debug("Not a Cloudsmith domain: %s", service) + return None + + logger.debug( + "Credentials resolved for %s via %s", service, result.source_name + ) + username = "token" + self._cache[(service, username)] = result.api_key + return keyring.credentials.SimpleCredential(username, result.api_key) + finally: + CloudsmithKeyringBackend._resolving = False + + except Exception as exc: # pylint: disable=broad-exception-caught + logger.debug("Error getting credentials: %s", exc, exc_info=True) + return None + + def get_password(self, service, username): + """ + Get password for a service and username. + + Args: + service: URL of the Python repository + username: Username to get password for + + Returns: + str: Password/API key or None + """ + password = self._cache.pop((service, username), None) + if password is not None: + return password + + creds = self.get_credential(service, None) + if creds and username == creds.username: + return creds.password + + return None + + def set_password(self, service, username, password): + """Not supported - credentials are dynamic.""" + raise NotImplementedError( + "CloudsmithKeyringBackend does not support storing passwords. " + "Use CLOUDSMITH_API_KEY environment variable or config file instead." + ) + + def delete_password(self, service, username): + """Not supported - credentials are dynamic.""" + raise NotImplementedError( + "CloudsmithKeyringBackend does not support deleting passwords. " + "Credentials are managed via environment variables or config files." + ) diff --git a/cloudsmith_cli/credential_helpers/terraform/__init__.py b/cloudsmith_cli/credential_helpers/terraform/__init__.py new file mode 100644 index 00000000..500656bb --- /dev/null +++ b/cloudsmith_cli/credential_helpers/terraform/__init__.py @@ -0,0 +1,74 @@ +""" +Terraform credential helper logic for Cloudsmith. + +This module provides functions for retrieving credentials for Terraform registries +using the existing Cloudsmith credential provider chain (OIDC, API keys, config, keyring). +""" + +import os + +from ..common import extract_hostname, is_cloudsmith_domain, resolve_credentials + + +def get_credentials(hostname, debug=False): + """ + Get a token for a Cloudsmith Terraform registry. + + Checks whether the hostname is a Cloudsmith domain before attempting + credential resolution, so non-Cloudsmith registries (e.g. + registry.terraform.io) are skipped silently. + + Terraform expects tokens in the format: + - Standard domains: org/repo/token + - Custom domains: repo/token (org is implicit in the domain) + + Args: + hostname: The registry hostname + debug: Enable debug logging + + Returns: + str: Token in format "org/repo/token" or "repo/token", or None + """ + bare_hostname = extract_hostname(hostname) + is_standard = bare_hostname and ( + bare_hostname.endswith("cloudsmith.io") or bare_hostname == "cloudsmith.io" + ) + + if not is_standard: + return _get_custom_domain_credentials(hostname, debug) + + return _get_standard_domain_credentials(debug) + + +def _get_custom_domain_credentials(hostname, debug): + """Get credentials for a custom (non-cloudsmith.io) domain.""" + org = os.environ.get("CLOUDSMITH_ORG", "").strip() + if not org: + return None + + result = resolve_credentials(debug) + if not result: + return None + + if not is_cloudsmith_domain(hostname, _credential_result=result): + return None + + repo = os.environ.get("CLOUDSMITH_REPO", "").strip() + if not repo: + return None + return f"{repo}/{result.api_key}" + + +def _get_standard_domain_credentials(debug): + """Get credentials for a standard cloudsmith.io domain.""" + result = resolve_credentials(debug) + if not result: + return None + + org = os.environ.get("CLOUDSMITH_ORG", "").strip() + repo = os.environ.get("CLOUDSMITH_REPO", "").strip() + + if not org or not repo: + return None + + return f"{org}/{repo}/{result.api_key}" diff --git a/cloudsmith_cli/credential_helpers/terraform/wrapper.py b/cloudsmith_cli/credential_helpers/terraform/wrapper.py new file mode 100644 index 00000000..548c0755 --- /dev/null +++ b/cloudsmith_cli/credential_helpers/terraform/wrapper.py @@ -0,0 +1,89 @@ +#!/usr/bin/env python +""" +Wrapper for terraform-credentials-cloudsmith. + +This is the entry point binary that Terraform calls to get registry credentials. +It delegates to: cloudsmith credential-helper terraform + +See: https://developer.hashicorp.com/terraform/internals/credentials-helpers + +Configure in ~/.terraformrc: + credentials_helper "cloudsmith" { + args = [] + } + +Note: The binary must be discoverable by Terraform. Either symlink it into +a directory on your PATH as 'terraform-credentials-cloudsmith', or place it +in one of Terraform's default plugin search locations: + - ~/.terraform.d/plugins/ (Linux/macOS) + - %APPDATA%\\terraform.d\\plugins\\ (Windows) +""" +import subprocess +import sys + + +def main(): + """ + Terraform credential helper entry point. + + Terraform calls this with: + - get : Delegate to cloudsmith credential-helper terraform + - store : Not supported + - forget : Not supported + """ + if len(sys.argv) < 3: + print( + "Usage: terraform-credentials-cloudsmith ", + file=sys.stderr, + ) + sys.exit(1) + + operation = sys.argv[1] + hostname = sys.argv[2] + + if operation == "get": + try: + result = subprocess.run( + ["cloudsmith", "credential-helper", "terraform", hostname], + stdin=sys.stdin, + capture_output=False, + check=False, + ) + sys.exit(result.returncode) + except FileNotFoundError: + print( + "Error: 'cloudsmith' command not found. " + "Make sure cloudsmith-cli is installed.", + file=sys.stderr, + ) + sys.exit(1) + + elif operation == "store": + # Read and discard stdin (Terraform sends JSON) + sys.stdin.read() + print( + "Error: Storing credentials is not supported. " + "Credentials are managed by the Cloudsmith credential chain.", + file=sys.stderr, + ) + sys.exit(1) + + elif operation == "forget": + print( + "Error: Forgetting credentials is not supported. " + "Credentials are managed by the Cloudsmith credential chain.", + file=sys.stderr, + ) + sys.exit(1) + + else: + print( + f"Error: Unknown operation '{operation}'. " + "Valid operations: get, store, forget", + file=sys.stderr, + ) + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/setup.py b/setup.py index 37101eb5..342b9c30 100644 --- a/setup.py +++ b/setup.py @@ -72,7 +72,20 @@ def get_long_description(): ], }, entry_points={ - "console_scripts": ["cloudsmith=cloudsmith_cli.cli.commands.main:main"] + "console_scripts": [ + "cloudsmith=cloudsmith_cli.cli.commands.main:main", + "docker-credential-cloudsmith=cloudsmith_cli.credential_helpers.docker.wrapper:main", + "terraform-credentials-cloudsmith=cloudsmith_cli.credential_helpers.terraform.wrapper:main", + "cargo-credential-cloudsmith=cloudsmith_cli.credential_helpers.cargo.wrapper:main", + "npm-credentials-cloudsmith=cloudsmith_cli.credential_helpers.npm.wrapper:main", + "CredentialProvider.Cloudsmith=cloudsmith_cli.credential_helpers.nuget.wrapper:main", + ], + "keyring.backends": [ + "cloudsmith=cloudsmith_cli.credential_helpers.pip:CloudsmithKeyringBackend", + ], + "conda": [ + "cloudsmith-auth=cloudsmith_cli.credential_helpers.conda.plugin", + ], }, keywords=["cloudsmith", "cli", "devops"], classifiers=[