diff --git a/src/uipath/_cli/_auth/_client_credentials.py b/src/uipath/_cli/_auth/_client_credentials.py new file mode 100644 index 000000000..bd542b0a7 --- /dev/null +++ b/src/uipath/_cli/_auth/_client_credentials.py @@ -0,0 +1,147 @@ +from typing import Optional +from urllib.parse import urlparse + +import httpx + +from .._utils._console import ConsoleLogger +from ._models import TokenData +from ._utils import parse_access_token, update_env_file + +console = ConsoleLogger() + + +class ClientCredentialsService: + """Service for client credentials authentication flow.""" + + def __init__(self, domain: str): + self.domain = domain + + def get_token_url(self) -> str: + """Get the token URL for the specified domain.""" + match self.domain: + case "alpha": + return "https://alpha.uipath.com/identity_/connect/token" + case "staging": + return "https://staging.uipath.com/identity_/connect/token" + case _: # cloud (default) + return "https://cloud.uipath.com/identity_/connect/token" + + def _is_valid_domain_or_subdomain(self, hostname: str, domain: str) -> bool: + """Check if hostname is either an exact match or a valid subdomain of the domain. + + Args: + hostname: The hostname to check + domain: The domain to validate against + + Returns: + True if hostname is valid domain or subdomain, False otherwise + """ + return hostname == domain or hostname.endswith(f".{domain}") + + def extract_domain_from_base_url(self, base_url: str) -> str: + """Extract domain from base URL. + + Args: + base_url: The base URL to extract domain from + + Returns: + The domain (alpha, staging, or cloud) + """ + try: + parsed = urlparse(base_url) + hostname = parsed.hostname + + if hostname: + match hostname: + case h if self._is_valid_domain_or_subdomain(h, "alpha.uipath.com"): + return "alpha" + case h if self._is_valid_domain_or_subdomain( + h, "staging.uipath.com" + ): + return "staging" + case h if self._is_valid_domain_or_subdomain(h, "cloud.uipath.com"): + return "cloud" + + # Default to cloud if we can't determine + return "cloud" + except Exception: + # Default to cloud if parsing fails + return "cloud" + + def authenticate( + self, client_id: str, client_secret: str, scope: str = "OR.Execution" + ) -> Optional[TokenData]: + """Authenticate using client credentials flow. + + Args: + client_id: The client ID for authentication + client_secret: The client secret for authentication + scope: The scope for the token (default: OR.Execution) + + Returns: + Token data if successful, None otherwise + """ + token_url = self.get_token_url() + + data = { + "grant_type": "client_credentials", + "client_id": client_id, + "client_secret": client_secret, + "scope": scope, + } + + try: + with httpx.Client(timeout=30.0) as client: + response = client.post(token_url, data=data) + + match response.status_code: + case 200: + token_data = response.json() + # Convert to our TokenData format + return { + "access_token": token_data["access_token"], + "token_type": token_data.get("token_type", "Bearer"), + "expires_in": token_data.get("expires_in", 3600), + "scope": token_data.get("scope", scope), + # Client credentials flow doesn't provide these, but we need them for compatibility + "refresh_token": "", + "id_token": "", + } + case 400: + console.error( + "Invalid client credentials or request parameters." + ) + return None + case 401: + console.error("Unauthorized: Invalid client credentials.") + return None + case _: + console.error( + f"Authentication failed: {response.status_code} - {response.text}" + ) + return None + + except httpx.RequestError as e: + console.error(f"Network error during authentication: {e}") + return None + except Exception as e: + console.error(f"Unexpected error during authentication: {e}") + return None + + def setup_environment(self, token_data: TokenData, base_url: str): + """Setup environment variables for client credentials authentication. + + Args: + token_data: The token data from authentication + base_url: The base URL for the UiPath instance + """ + parsed_access_token = parse_access_token(token_data["access_token"]) + + env_vars = { + "UIPATH_ACCESS_TOKEN": token_data["access_token"], + "UIPATH_URL": base_url, + "UIPATH_ORGANIZATION_ID": parsed_access_token.get("prt_id", ""), + "UIPATH_TENANT_ID": "", + } + + update_env_file(env_vars) diff --git a/src/uipath/_cli/cli_auth.py b/src/uipath/_cli/cli_auth.py index 5693e6bba..87850487f 100644 --- a/src/uipath/_cli/cli_auth.py +++ b/src/uipath/_cli/cli_auth.py @@ -9,6 +9,7 @@ from ..telemetry import track from ._auth._auth_server import HTTPServer +from ._auth._client_credentials import ClientCredentialsService from ._auth._oidc_utils import get_auth_config, get_auth_url from ._auth._portal_service import PortalService, select_tenant from ._auth._utils import update_auth_file, update_env_file @@ -64,9 +65,67 @@ def set_port(): required=False, help="Force new token", ) +@click.option( + "--client-id", + required=False, + help="Client ID for client credentials authentication (unattended mode)", +) +@click.option( + "--client-secret", + required=False, + help="Client secret for client credentials authentication (unattended mode)", +) +@click.option( + "--base-url", + required=False, + help="Base URL for the UiPath tenant instance (required for client credentials)", +) @track -def auth(domain, force: None | bool = False): - """Authenticate with UiPath Cloud Platform.""" +def auth( + domain, + force: None | bool = False, + client_id: str = None, + client_secret: str = None, + base_url: str = None, +): + """Authenticate with UiPath Cloud Platform. + + Interactive mode (default): Opens browser for OAuth authentication. + Unattended mode: Use --client-id, --client-secret and --base-url for client credentials flow. + """ + # Check if client credentials are provided for unattended authentication + if client_id and client_secret: + if not base_url: + console.error( + "--base-url is required when using client credentials authentication." + ) + return + + with console.spinner("Authenticating with client credentials ..."): + # Create service instance + credentials_service = ClientCredentialsService(domain) + + # If base_url is provided, extract domain from it to override the CLI domain parameter + if base_url: + extracted_domain = credentials_service.extract_domain_from_base_url( + base_url + ) + credentials_service.domain = extracted_domain + + token_data = credentials_service.authenticate(client_id, client_secret) + + if token_data: + credentials_service.setup_environment(token_data, base_url) + console.success( + "Client credentials authentication successful.", + ) + else: + console.error( + "Client credentials authentication failed. Please check your credentials.", + ) + return + + # Interactive authentication flow (existing logic) with console.spinner("Authenticating with UiPath ..."): portal_service = PortalService(domain)