diff --git a/cforge/commands/resources/prompts.py b/cforge/commands/resources/prompts.py index 3bb0276..2fd6e42 100644 --- a/cforge/commands/resources/prompts.py +++ b/cforge/commands/resources/prompts.py @@ -50,8 +50,7 @@ def prompts_list( print_table( prompts, "Prompts", - ["id", "name", "description", "arguments", "isActive"], - {"isActive": "enabled"}, + ["id", "name", "description", "arguments", "enabled"], ) else: console.print("[yellow]No prompts found[/yellow]") diff --git a/cforge/commands/settings/export.py b/cforge/commands/settings/export.py index 4e7c4dc..6db85e1 100644 --- a/cforge/commands/settings/export.py +++ b/cforge/commands/settings/export.py @@ -17,7 +17,7 @@ import typer # First-Party -from cforge.common import get_console, get_settings, make_authenticated_request +from cforge.common import get_base_url, get_console, make_authenticated_request def export( @@ -33,7 +33,7 @@ def export( console = get_console() try: - console.print(f"[cyan]Exporting configuration from gateway at http://{get_settings().host}:{get_settings().port}[/cyan]") + console.print(f"[cyan]Exporting configuration from gateway at {get_base_url()}[/cyan]") # Build API parameters params: Dict[str, Any] = {} diff --git a/cforge/commands/settings/login.py b/cforge/commands/settings/login.py index c16e4e6..a107d3d 100644 --- a/cforge/commands/settings/login.py +++ b/cforge/commands/settings/login.py @@ -12,7 +12,7 @@ import typer # First-Party -from cforge.common import get_console, get_settings, get_token_file, save_token +from cforge.common import get_base_url, get_console, get_token_file, save_token def login( @@ -28,7 +28,7 @@ def login( try: # Make login request - gateway_url = f"http://{get_settings().host}:{get_settings().port}" + gateway_url = get_base_url() full_url = f"{gateway_url}/auth/login" response = requests.post(full_url, json={"email": email, "password": password}) diff --git a/cforge/commands/settings/profiles.py b/cforge/commands/settings/profiles.py new file mode 100644 index 0000000..91fd0af --- /dev/null +++ b/cforge/commands/settings/profiles.py @@ -0,0 +1,248 @@ +# -*- coding: utf-8 -*- +"""Location: ./cforge/commands/profiles.py +Copyright 2025 +SPDX-License-Identifier: Apache-2.0 +Authors: Gabe Goodhart + +CLI commands for profile management +""" + +# Standard +from datetime import datetime +from pathlib import Path +from typing import Optional +import json +import secrets +import string + +# Third-Party +import typer + +# First-Party +from cforge.common import get_console, print_table, print_json, prompt_for_schema +from cforge.config import get_settings +from cforge.profile_utils import ( + AuthProfile, + ProfileStore, + get_all_profiles, + get_profile, + get_active_profile, + set_active_profile, + load_profile_store, + save_profile_store, +) + + +def profiles_list() -> None: + """List all available profiles. + + Displays all profiles configured in the Desktop app, showing their name, + email, API URL, and active status. + """ + console = get_console() + + try: + profiles = get_all_profiles() + + # Prepare data for table + profile_data = [] + for profile in profiles: + profile_data.append( + { + "id": profile.id, + "name": profile.name, + "email": profile.email, + "api_url": profile.api_url, + "active": "✓" if profile.is_active else "", + "environment": profile.metadata.environment if profile.metadata else "", + } + ) + + print_table( + profile_data, + "Available Profiles", + ["id", "name", "email", "api_url", "environment", "active"], + ) + + # Show which profile is currently active + active = get_active_profile() + console.print(f"\n[green]Currently using profile:[/green] [cyan]{active.name}[/cyan] ({active.email})") + console.print(f"[dim]Connected to: {active.api_url}[/dim]") + + except Exception as e: + console.print(f"[red]Error listing profiles: {str(e)}[/red]") + raise typer.Exit(1) + + +def profiles_get( + profile_id: Optional[str] = typer.Argument( + None, + help="Profile ID to retrieve. If not provided, shows the active profile.", + ), + json_output: bool = typer.Option( + False, + "--json", + help="Output in JSON format", + ), +) -> None: + """Get details of a specific profile or the active profile. + + If no profile ID is provided, displays information about the currently + active profile. + """ + console = get_console() + + try: + if profile_id: + profile = get_profile(profile_id) + if not profile: + console.print(f"[red]Profile not found: {profile_id}[/red]") + raise typer.Exit(1) + else: + profile = get_active_profile() + + if json_output: + # Output as JSON + print_json(profile.model_dump(by_alias=True), title="Profile Details") + else: + # Pretty print profile details + console.print(f"\n[bold cyan]Profile: {profile.name}[/bold cyan]") + console.print(f"[dim]ID:[/dim] {profile.id}") + console.print(f"[dim]Email:[/dim] {profile.email}") + console.print(f"[dim]API URL:[/dim] {profile.api_url}") + console.print(f"[dim]Active:[/dim] {'[green]Yes[/green]' if profile.is_active else '[yellow]No[/yellow]'}") + console.print(f"[dim]Created:[/dim] {profile.created_at}") + if profile.last_used: + console.print(f"[dim]Last Used:[/dim] {profile.last_used}") + + if profile.metadata: + console.print("\n[bold]Metadata:[/bold]") + if profile.metadata.description: + console.print(f" [dim]Description:[/dim] {profile.metadata.description}") + if profile.metadata.environment: + console.print(f" [dim]Environment:[/dim] {profile.metadata.environment}") + if profile.metadata.icon: + console.print(f" [dim]Icon:[/dim] {profile.metadata.icon}") + + except Exception as e: + console.print(f"[red]Error retrieving profile: {str(e)}[/red]") + raise typer.Exit(1) + + +def profiles_switch( + profile_id: str = typer.Argument( + ..., + help="Profile ID to switch to. Use 'cforge profiles list' to see available profiles.", + ), +) -> None: + """Switch to a different profile. + + Sets the specified profile as the active profile. All subsequent CLI + commands will use this profile's API URL for connections. + + Note: This only changes which profile the CLI uses. To fully authenticate + and manage profiles, use the Context Forge Desktop app. + """ + console = get_console() + + try: + # Check if profile exists + profile = get_profile(profile_id) + if not profile: + console.print(f"[red]Profile not found: {profile_id}[/red]") + console.print("[dim]Use 'cforge profiles list' to see available profiles.[/dim]") + raise typer.Exit(1) + + # Switch to the profile + success = set_active_profile(profile_id) + if not success: + console.print(f"[red]Failed to switch to profile: {profile_id}[/red]") + raise typer.Exit(1) + + console.print(f"[green]✓ Switched to profile:[/green] [cyan]{profile.name}[/cyan]") + console.print(f"[dim]Email:[/dim] {profile.email}") + console.print(f"[dim]API URL:[/dim] {profile.api_url}") + + # Clear the settings cache so the new profile takes effect + get_settings.cache_clear() + + console.print("\n[yellow]Note:[/yellow] Profile switched successfully. " "The CLI will now connect to the selected profile's API URL.") + + except Exception as e: + console.print(f"[red]Error switching profile: {str(e)}[/red]") + raise typer.Exit(1) + + +def profiles_create( + data_file: Optional[Path] = typer.Argument(None, help="JSON file containing prompt data (interactive mode if not provided)"), +) -> None: + """Create a new profile interactively. + + Walks the user through creating a new profile by prompting for all required + fields. The new profile will be created in an inactive state. After creation, + you will be asked if you want to enable the new profile. + """ + console = get_console() + + try: + console.print("\n[bold cyan]Create New Profile[/bold cyan]") + console.print("[dim]You will be prompted for profile information.[/dim]\n") + + # Generate a 16-character random ID (matching desktop app format) + alphabet = string.ascii_letters + string.digits + profile_id = "".join(secrets.choice(alphabet) for _ in range(16)) + created_at = datetime.now() + + # Pre-fill fields that should not be prompted + prefilled = { + "id": profile_id, + "is_active": False, + "created_at": created_at, + "last_used": None, + } + + if data_file: + if not data_file.exists(): + console.print(f"[red]File not found: {data_file}[/red]") + raise typer.Exit(1) + profile_data = json.loads(data_file.read_text()) + profile_data.update(prefilled) + else: + profile_data = prompt_for_schema(AuthProfile, prefilled=prefilled) + + # Create the AuthProfile instance + new_profile = AuthProfile.model_validate(profile_data) + + # Load or create the profile store + store = load_profile_store() + if not store: + store = ProfileStore(profiles={}, active_profile_id=None) + + # Add the new profile to the store + store.profiles[new_profile.id] = new_profile + + # Save the profile store + save_profile_store(store) + + console.print("\n[green]✓ Profile created successfully![/green]") + console.print(f"[dim]Profile ID:[/dim] {new_profile.id}") + console.print(f"[dim]Name:[/dim] {new_profile.name}") + console.print(f"[dim]Email:[/dim] {new_profile.email}") + console.print(f"[dim]API URL:[/dim] {new_profile.api_url}") + + # Ask if the user wants to enable the new profile + console.print("\n[yellow]Enable this profile now?[/yellow]", end=" ") + if typer.confirm("", default=False): + success = set_active_profile(new_profile.id) + if success: + console.print(f"[green]✓ Profile enabled:[/green] [cyan]{new_profile.name}[/cyan]") + # Clear the settings cache so the new profile takes effect + get_settings.cache_clear() + else: + console.print(f"[red]Failed to enable profile: {new_profile.id}[/red]") + else: + console.print("[dim]Profile created but not enabled. Use 'cforge profiles switch' to enable it later.[/dim]") + + except Exception as e: + console.print(f"[red]Error creating profile: {str(e)}[/red]") + raise typer.Exit(1) diff --git a/cforge/commands/settings/whoami.py b/cforge/commands/settings/whoami.py index ba78986..94702d4 100644 --- a/cforge/commands/settings/whoami.py +++ b/cforge/commands/settings/whoami.py @@ -9,18 +9,33 @@ # First-Party from cforge.common import get_console, get_settings, get_token_file, load_token +from cforge.profile_utils import get_active_profile def whoami() -> None: """Show current authentication status and token source. - Displays where the authentication token is coming from (if any). + Displays where the authentication token is coming from (if any) and + information about the active profile if one is set. """ + console = get_console() settings = get_settings() env_token = settings.mcpgateway_bearer_token stored_token = load_token() + active_profile = get_active_profile() + + # Display active profile information + console.print("[bold cyan]Active Profile:[/bold cyan]") + console.print(f" [cyan]Name:[/cyan] {active_profile.name}") + console.print(f" [cyan]ID:[/cyan] {active_profile.id}") + console.print(f" [cyan]Email:[/cyan] {active_profile.email}") + console.print(f" [cyan]API URL:[/cyan] {active_profile.api_url}") + if active_profile.metadata and active_profile.metadata.environment: + console.print(f" [cyan]Environment:[/cyan] {active_profile.metadata.environment}") + console.print() + # Display authentication status if env_token: console.print("[green]✓ Authenticated via MCPGATEWAY_BEARER_TOKEN environment variable[/green]") console.print(f"[cyan]Token:[/cyan] {env_token[:10]}...") diff --git a/cforge/common.py b/cforge/common.py index 4476a59..30882c0 100644 --- a/cforge/common.py +++ b/cforge/common.py @@ -26,7 +26,10 @@ import typer # First-Party +from cforge.profile_utils import DEFAULT_PROFILE_ID from cforge.config import get_settings +from cforge.credential_store import load_profile_credentials +from cforge.profile_utils import get_active_profile # ------------------------------------------------------------------------------ # Singletons @@ -96,14 +99,29 @@ def handle_exception(exception: Exception) -> None: # ------------------------------------------------------------------------------ +def get_base_url() -> str: + """Get the full base URL for the current profile's server + + TODO: This will need to support https in the future! + + Returns: + The string URL base + """ + return get_active_profile().api_url + + def get_token_file() -> Path: """Get the path to the token file in contextforge_home. + Uses the active profile if available, otherwise returns the default token file. + For the virtual default profile, uses the unsuffixed token file. + Returns: - Path to the token file + Path to the token file (profile-specific or default) """ - token_file = get_settings().contextforge_home / "token" - return token_file + profile = get_active_profile() + suffix = "" if profile.id == DEFAULT_PROFILE_ID else f".{profile.id}" + return get_settings().contextforge_home / f"token{suffix}" def save_token(token: str) -> None: @@ -131,13 +149,52 @@ def load_token() -> Optional[str]: return None +def attempt_auto_login() -> Optional[str]: + """Attempt to automatically login using stored credentials. + + This function tries to login using credentials stored by the desktop app + in the encrypted credential store. If successful, it saves the token + and returns it. + + Returns: + Authentication token if auto-login succeeds, None otherwise + """ + # Try to load credentials from the encrypted store + profile = get_active_profile() + credentials = load_profile_credentials(profile.id) + if not credentials or not credentials.get("email") or not credentials.get("password"): + return None + + # Attempt login + try: + gateway_url = get_base_url() + response = requests.post( + f"{gateway_url}/auth/email/login", + json={"email": credentials["email"], "password": credentials["password"]}, + headers={"Content-Type": "application/json"}, + ) + + if response.status_code == 200: + data = response.json() + token = data.get("access_token") + if token: + # Save the token for future use + save_token(token) + return token + except Exception: + # Silently fail - auto-login is best-effort + pass + + return None + + def get_auth_token() -> Optional[str]: """Get authentication token from multiple sources in priority order. Priority: 1. MCPGATEWAY_BEARER_TOKEN environment variable 2. Stored token in contextforge_home/token file - 3. Basic auth from settings + 3. Auto-login using stored credentials (if available) Returns: Authentication token string or None if not configured @@ -152,6 +209,11 @@ def get_auth_token() -> Optional[str]: if token: return token + # Try auto-login with stored credentials + token = attempt_auto_login() + if token: + return token + return None @@ -190,7 +252,7 @@ def make_authenticated_request( else: headers["Authorization"] = f"Bearer {token}" - gateway_url = f"http://{get_settings().host}:{get_settings().port}" + gateway_url = get_base_url() full_url = f"{gateway_url}{url}" try: diff --git a/cforge/credential_store.py b/cforge/credential_store.py new file mode 100644 index 0000000..eca88f6 --- /dev/null +++ b/cforge/credential_store.py @@ -0,0 +1,158 @@ +# -*- coding: utf-8 -*- +"""Location: ./cforge/credential_store.py +Copyright 2025 +SPDX-License-Identifier: Apache-2.0 +Authors: Gabe Goodhart + +Credential storage utilities compatible with desktop app's electron-store encryption. +""" + +# Standard +import json +from pathlib import Path +from typing import Optional + +# Third-Party +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC + +# First-Party +from cforge.config import get_settings + + +def get_credential_store_path() -> Path: + """Get the path to the encrypted credential store file. + + Returns: + Path to the credential store JSON file + """ + return get_settings().contextforge_home / "context-forge-credentials.json" + + +def get_encryption_key_path() -> Path: + """Get the path to the encryption key file. + + Returns: + Path to the encryption key JSON file + """ + return get_settings().contextforge_home / "context-forge-keys.json" + + +def load_encryption_key() -> Optional[str]: + """Load the encryption key from the key store. + + Returns: + Encryption key string if found, None otherwise + """ + key_path = get_encryption_key_path() + if not key_path.exists(): + return None + + try: + with open(key_path, "r", encoding="utf-8") as f: + data = json.load(f) + return data.get("encryptionKey") + except (json.JSONDecodeError, OSError) as e: + print(f"Warning: Failed to load encryption key: {e}") + return None + + +def decrypt_credential_data(encrypted_data: bytes, encryption_key: str) -> Optional[str]: + """Decrypt credential data using the same format as electron-store/conf. + + The format is: [16 bytes IV][':'][encrypted data] + Uses AES-256-CBC with PBKDF2 key derivation (10,000 iterations, SHA-512) + + Args: + encrypted_data: The encrypted data bytes + encryption_key: The encryption key string + + Returns: + Decrypted string if successful, None otherwise + """ + try: + # Extract IV from first 16 bytes + if len(encrypted_data) < 17: + return None + + iv = encrypted_data[:16] + + # Verify separator (should be ':' at byte 16) + if encrypted_data[16:17] != b":": + # Try legacy format without separator + encrypted_payload = encrypted_data[16:] + else: + # Skip IV and ':' separator (17 bytes total) + encrypted_payload = encrypted_data[17:] + + # Derive key using PBKDF2 with same parameters as conf package + kdf = PBKDF2HMAC( + algorithm=hashes.SHA512(), + length=32, # 32 bytes = 256 bits for AES-256 + salt=iv, # IV is used as salt + iterations=10_000, + backend=default_backend(), + ) + key = kdf.derive(encryption_key.encode("utf-8")) + + # Decrypt using AES-256-CBC + cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend()) + decryptor = cipher.decryptor() + decrypted_data = decryptor.update(encrypted_payload) + decryptor.finalize() + + # Remove PKCS7 padding + padding_length = decrypted_data[-1] + decrypted_data = decrypted_data[:-padding_length] + + return decrypted_data.decode("utf-8") + except Exception as e: + print(f"Warning: Failed to decrypt credential data: {e}") + return None + + +def load_profile_credentials(profile_id: str) -> Optional[dict]: + """Load credentials for a specific profile from the encrypted store. + + Args: + profile_id: The profile ID to load credentials for + + Returns: + Dictionary with 'email' and 'password' if found, None otherwise + """ + # Load encryption key + encryption_key = load_encryption_key() + if not encryption_key: + return None + + # Load encrypted credential store + cred_path = get_credential_store_path() + if not cred_path.exists(): + return None + + try: + # Read the encrypted file as bytes + with open(cred_path, "rb") as f: + encrypted_data = f.read() + + # Decrypt the data + decrypted_json = decrypt_credential_data(encrypted_data, encryption_key) + if not decrypted_json: + return None + + # Parse JSON + credentials = json.loads(decrypted_json) + + # Get credentials for the specific profile + profile_creds = credentials.get(profile_id) + if not profile_creds: + return None + + return { + "email": profile_creds.get("email"), + "password": profile_creds.get("password"), + } + except (json.JSONDecodeError, OSError) as e: + print(f"Warning: Failed to load profile credentials: {e}") + return None diff --git a/cforge/main.py b/cforge/main.py index 794fbd1..b2fc7ce 100644 --- a/cforge/main.py +++ b/cforge/main.py @@ -32,6 +32,7 @@ from cforge.common import get_app from cforge.commands.deploy.deploy import deploy from cforge.commands.server.serve import serve +from cforge.commands.settings import profiles from cforge.commands.settings.login import login from cforge.commands.settings.logout import logout from cforge.commands.settings.whoami import whoami @@ -112,12 +113,27 @@ app.command(rich_help_panel="Settings")(login) app.command(rich_help_panel="Settings")(logout) app.command(rich_help_panel="Settings")(whoami) -app.command(rich_help_panel="Settings")(export) app.command(name="import", rich_help_panel="Settings")(import_cmd) +app.command(rich_help_panel="Settings")(export) app.command(rich_help_panel="Settings")(config_schema) app.command(rich_help_panel="Settings")(support_bundle) app.command(rich_help_panel="Settings")(version) +# --------------------------------------------------------------------------- +# Profiles command group +# --------------------------------------------------------------------------- + +profiles_app = typer.Typer( + name="profiles", + help="Manage user profiles for connecting to different Context Forge instances", + rich_markup_mode="rich", +) +app.add_typer(profiles_app, name="profiles", rich_help_panel="Settings") +profiles_app.command("list")(profiles.profiles_list) +profiles_app.command("get")(profiles.profiles_get) +profiles_app.command("switch")(profiles.profiles_switch) +profiles_app.command("create")(profiles.profiles_create) + # --------------------------------------------------------------------------- # Deploy command (hidden stub for future use) # --------------------------------------------------------------------------- diff --git a/cforge/profile_utils.py b/cforge/profile_utils.py new file mode 100644 index 0000000..208507a --- /dev/null +++ b/cforge/profile_utils.py @@ -0,0 +1,286 @@ +# -*- coding: utf-8 -*- +"""Location: ./cforge/profile_utils.py +Copyright 2025 +SPDX-License-Identifier: Apache-2.0 +Authors: Gabe Goodhart + +Profile management utilities for Context Forge CLI. +Reads profile data from the Desktop app's electron-store files. +""" + +# Standard +from datetime import datetime +from pathlib import Path +from typing import Dict, List, Optional +import json + +# Third-Party +from pydantic import BaseModel, Field, ValidationInfo, field_validator + +# Local +from cforge.config import get_settings + +# Virtual default profile ID for local development +DEFAULT_PROFILE_ID = "__default__" + + +class ProfileMetadata(BaseModel): + """Metadata for a profile.""" + + description: Optional[str] = None + environment: Optional[str] = None # 'production', 'staging', 'development', 'local' + color: Optional[str] = None + icon: Optional[str] = None + is_internal: Optional[bool] = Field(None, alias="isInternal") + + class Config: + """Pydantic model config""" + + # Map naming conventions + populate_by_name = True + + +class AuthProfile(BaseModel): + """Authentication profile matching the Desktop app schema.""" + + id: str + name: str + email: str + api_url: str = Field(alias="apiUrl") + is_active: bool = Field(alias="isActive") + created_at: datetime = Field(alias="createdAt") + last_used: Optional[datetime] = Field(None, alias="lastUsed") + metadata: Optional[ProfileMetadata] = None + + class Config: + """Pydantic model config""" + + # Map naming conventions + populate_by_name = True + + +class ProfileStore(BaseModel): + """Profile store structure matching the Desktop app schema.""" + + profiles: Dict[str, AuthProfile] = {} + active_profile_id: Optional[str] = Field(None, alias="activeProfileId") + + class Config: + """Pydantic model config""" + + # Map naming conventions + populate_by_name = True + + @field_validator("profiles") + def validate_profiles(cls, profiles: Dict[str, AuthProfile]) -> Dict[str, AuthProfile]: + """Validate that IDs match between keys and profile objects and only one + profile is active + """ + if any(key != val.id for key, val in profiles.items()): + raise ValueError(f"key/id mismatch: {profiles}") + if len([p.id for p in profiles.values() if p.is_active]) > 1: + raise ValueError(f"Found multiple active profiles: {[profiles]}") + return profiles + + @field_validator("active_profile_id") + def validate_active_profile_id(cls, active_profile_id: Optional[str], info: ValidationInfo) -> Optional[str]: + """Validate that the given active_profile_id corresponds to the given + profiles + """ + if active_profile_id is None: + return active_profile_id + + if not (profiles := info.data.get("profiles")): + raise ValueError(f"Cannot set active_profile_id={active_profile_id} without providing profiles") + if not (active_profile := profiles.get(active_profile_id)): + raise ValueError(f"active_profile_id={active_profile_id} not present in profiles={profiles}") + if not active_profile.is_active: + raise ValueError(f"active_profile_id={active_profile_id} is not marked as active in profiles={profiles}") + + return active_profile_id + + +def get_default_api_url() -> str: + """Get the default API URL if not set via a profile + + Returns: + URL based on configured settings + """ + return f"http://{get_settings().host}:{get_settings().port}" + + +def get_profile_store_path() -> Path: + """Get the path to the profile store file. + + Returns: + Path to the profile store JSON file + """ + return get_settings().contextforge_home / "context-forge-profiles.json" + + +def load_profile_store() -> Optional[ProfileStore]: + """Load the profile store from disk. + + Returns: + ProfileStore if found and valid, None otherwise + """ + if (store_path := get_profile_store_path()) and store_path.exists(): + try: + with open(store_path, "r", encoding="utf-8") as f: + data = json.load(f) + return ProfileStore.model_validate(data) + except (json.JSONDecodeError, ValueError) as e: + print(f"Warning: Failed to load profile store: {e}") + + +def save_profile_store(store: ProfileStore) -> None: + """Save the profile store to disk. + + Args: + store: ProfileStore to save + """ + store_path = get_profile_store_path() + store_path.parent.mkdir(exist_ok=True) + + with open(store_path, "w", encoding="utf-8") as f: + # Convert to dict with original field names (camelCase) + data = store.model_dump(by_alias=True) + json.dump(data, f, indent=2, default=str) + + +def get_all_profiles() -> List[AuthProfile]: + """Get all profiles, including the virtual default profile. + + Returns: + List of all profiles, including virtual default if no Desktop default exists + """ + profiles = [] + if store := load_profile_store(): + profiles = list(store.profiles.values()) + + # Check if Desktop app has created a default profile + expected_default_url = get_default_api_url() + has_desktop_default = any(p.api_url == expected_default_url and p.metadata and p.metadata.is_internal for p in profiles) + + # Only include virtual default if Desktop hasn't created one + if not has_desktop_default: + default_profile = AuthProfile( + id=DEFAULT_PROFILE_ID, + name="Local Default", + email="admin@localhost", + api_url=expected_default_url, + is_active=not bool(store and store.active_profile_id), + created_at=datetime.now(), + metadata=ProfileMetadata( + description="Default local development profile", + environment="local", + ), + ) + profiles.append(default_profile) + + return profiles + + +def get_profile(profile_id: str) -> Optional[AuthProfile]: + """Get a specific profile by ID, including the virtual default profile. + + Args: + profile_id: Profile ID to retrieve + + Returns: + AuthProfile if found, None otherwise + """ + # Check for virtual default profile + if profile_id == DEFAULT_PROFILE_ID: + store = load_profile_store() + return AuthProfile( + id=DEFAULT_PROFILE_ID, + name="Local Default", + email="admin@localhost", + api_url=get_default_api_url(), + is_active=not bool(store and store.active_profile_id), + created_at=datetime.now(), + metadata=ProfileMetadata( + description="Default local development profile", + environment="local", + ), + ) + + if store := load_profile_store(): + return store.profiles.get(profile_id) + + +def get_active_profile() -> AuthProfile: + """Get the currently active profile, including the virtual default profile. + + Returns: + AuthProfile if an active profile is set, or default (virtual or desktop) + """ + if (store := load_profile_store()) and store.active_profile_id: + profile = store.profiles.get(store.active_profile_id) + # This should be unreachable due to validation on loading + assert profile, "BAD STATE: Profile store active profile id not found in profiles" + return profile + + # Check if Desktop app has created a default profile + expected_default_url = get_default_api_url() + + if store: + for profile in store.profiles.values(): + if profile.api_url == expected_default_url and profile.metadata and profile.metadata.is_internal: + # If Desktop default exists, use that + return profile + + # Return virtual default profile if no Desktop default exists + return AuthProfile( + id=DEFAULT_PROFILE_ID, + name="Local Default", + email="admin@localhost", + api_url=expected_default_url, + is_active=True, + created_at=datetime.now(), + metadata=ProfileMetadata( + description="Default local development profile", + environment="local", + ), + ) + + +def set_active_profile(profile_id: str) -> bool: + """Set the active profile, including support for the virtual default profile. + + Args: + profile_id: Profile ID to set as active + + Returns: + True if successful, False if profile not found + """ + # Handle virtual default profile + if profile_id == DEFAULT_PROFILE_ID: + if store := load_profile_store(): + # Deactivate all profiles to switch to default + for pid in store.profiles: + store.profiles[pid].is_active = False + store.active_profile_id = None + save_profile_store(store) + # Always return True for default profile (it always exists) + return True + + store = load_profile_store() + if not store: + return False + + if profile_id not in store.profiles: + return False + + # Update all profiles to inactive + for pid in store.profiles: + store.profiles[pid].is_active = False + + # Set the selected profile as active + store.profiles[profile_id].is_active = True + store.profiles[profile_id].last_used = datetime.now() + store.active_profile_id = profile_id + + save_profile_store(store) + return True diff --git a/pyproject.toml b/pyproject.toml index 6f1f9d8..8546f28 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -49,6 +49,7 @@ dependencies = [ "rich>=13.9.4", "typer>=0.20.0", "mcp-contextforge-gateway>=0.9.0", + "cryptography>=44.0.0", ] # ---------------------------------------------------------------- diff --git a/tests/commands/settings/test_export.py b/tests/commands/settings/test_export.py index dbb7d75..9429f1c 100644 --- a/tests/commands/settings/test_export.py +++ b/tests/commands/settings/test_export.py @@ -24,7 +24,7 @@ class TestExportCommand: """Tests for export command.""" - def test_export_with_custom_output(self, mock_settings, mock_console) -> None: + def test_export_with_custom_output(self, mock_base_url, mock_console) -> None: """Test export with custom output file.""" mock_export_data = { "version": "1.0", @@ -36,7 +36,7 @@ def test_export_with_custom_output(self, mock_settings, mock_console) -> None: output_file = Path(temp_dir) / "export.json" with patch("cforge.commands.settings.export.get_console", return_value=mock_console): - with patch("cforge.commands.settings.export.get_settings", return_value=mock_settings): + with patch("cforge.commands.settings.export.get_base_url", return_value=mock_base_url): with patch("cforge.commands.settings.export.make_authenticated_request", return_value=mock_export_data): export(output=output_file, types=None, exclude_types=None, tags=None, include_inactive=False, no_dependencies=False, verbose=False) @@ -46,7 +46,7 @@ def test_export_with_custom_output(self, mock_settings, mock_console) -> None: assert data["version"] == "1.0" assert data["metadata"]["entity_counts"]["tools"] == 5 - def test_export_with_default_filename(self, mock_settings, mock_console) -> None: + def test_export_with_default_filename(self, mock_base_url, mock_console) -> None: """Test export with auto-generated filename.""" mock_export_data = {"metadata": {"entity_counts": {}}} import os @@ -56,7 +56,7 @@ def test_export_with_default_filename(self, mock_settings, mock_console) -> None try: os.chdir(temp_dir) with patch("cforge.commands.settings.export.get_console", return_value=mock_console): - with patch("cforge.commands.settings.export.get_settings", return_value=mock_settings): + with patch("cforge.commands.settings.export.get_base_url", return_value=mock_base_url): with patch("cforge.commands.settings.export.make_authenticated_request", return_value=mock_export_data): export(output=None, types=None, exclude_types=None, tags=None, include_inactive=False, no_dependencies=False, verbose=False) @@ -66,7 +66,7 @@ def test_export_with_default_filename(self, mock_settings, mock_console) -> None finally: os.chdir(orig_dir) - def test_export_with_filters(self, mock_settings, mock_console) -> None: + def test_export_with_filters(self, mock_base_url, mock_console) -> None: """Test export with filter parameters.""" mock_export_data = {"metadata": {"entity_counts": {}}} @@ -74,7 +74,7 @@ def test_export_with_filters(self, mock_settings, mock_console) -> None: output_file = Path(temp_dir) / "export.json" with patch("cforge.commands.settings.export.get_console", return_value=mock_console): - with patch("cforge.commands.settings.export.get_settings", return_value=mock_settings): + with patch("cforge.commands.settings.export.get_base_url", return_value=mock_base_url): with patch("cforge.commands.settings.export.make_authenticated_request", return_value=mock_export_data) as mock_request: export( output=output_file, @@ -95,7 +95,7 @@ def test_export_with_filters(self, mock_settings, mock_console) -> None: assert params["include_inactive"] == "true" assert params["include_dependencies"] == "false" - def test_export_verbose_mode(self, mock_settings, mock_console) -> None: + def test_export_verbose_mode(self, mock_base_url, mock_console) -> None: """Test export with verbose output.""" mock_export_data = { "version": "1.0", @@ -109,7 +109,7 @@ def test_export_verbose_mode(self, mock_settings, mock_console) -> None: output_file = Path(temp_dir) / "export.json" with patch("cforge.commands.settings.export.get_console", return_value=mock_console): - with patch("cforge.commands.settings.export.get_settings", return_value=mock_settings): + with patch("cforge.commands.settings.export.get_base_url", return_value=mock_base_url): with patch("cforge.commands.settings.export.make_authenticated_request", return_value=mock_export_data): export(output=output_file, types=None, exclude_types=None, tags=None, include_inactive=False, no_dependencies=False, verbose=True) @@ -117,7 +117,7 @@ def test_export_verbose_mode(self, mock_settings, mock_console) -> None: assert any("Export details" in str(call) for call in mock_console.print.call_args_list) assert any("Version:" in str(call) for call in mock_console.print.call_args_list) - def test_export_with_entity_counts(self, mock_settings, mock_console) -> None: + def test_export_with_entity_counts(self, mock_base_url, mock_console) -> None: """Test export with non-zero entity counts.""" mock_export_data = {"metadata": {"entity_counts": {"tools": 5, "prompts": 3, "servers": 0}}} @@ -125,7 +125,7 @@ def test_export_with_entity_counts(self, mock_settings, mock_console) -> None: output_file = Path(temp_dir) / "export.json" with patch("cforge.commands.settings.export.get_console", return_value=mock_console): - with patch("cforge.commands.settings.export.get_settings", return_value=mock_settings): + with patch("cforge.commands.settings.export.get_base_url", return_value=mock_base_url): with patch("cforge.commands.settings.export.make_authenticated_request", return_value=mock_export_data): export(output=output_file, types=None, exclude_types=None, tags=None, include_inactive=False, no_dependencies=False, verbose=False) @@ -135,10 +135,10 @@ def test_export_with_entity_counts(self, mock_settings, mock_console) -> None: assert "prompts: 3" in output_str # servers: 0 should not be printed (covered by if count > 0) - def test_export_error_handling(self, mock_settings, mock_console) -> None: + def test_export_error_handling(self, mock_base_url, mock_console) -> None: """Test export error handling.""" with patch("cforge.commands.settings.export.get_console", return_value=mock_console): - with patch("cforge.commands.settings.export.get_settings", return_value=mock_settings): + with patch("cforge.commands.settings.export.get_base_url", return_value=mock_base_url): with patch("cforge.commands.settings.export.make_authenticated_request", side_effect=Exception("Export failed")): with pytest.raises(typer.Exit) as exc_info: export(output=None, types=None, exclude_types=None, tags=None, include_inactive=False, no_dependencies=False, verbose=False) diff --git a/tests/commands/settings/test_login.py b/tests/commands/settings/test_login.py index a85e80e..aac6966 100644 --- a/tests/commands/settings/test_login.py +++ b/tests/commands/settings/test_login.py @@ -25,7 +25,7 @@ class TestLoginCommand: """Tests for login command.""" - def test_login_success_with_save(self, mock_settings, mock_console) -> None: + def test_login_success_with_save(self, mock_base_url, mock_console) -> None: """Test successful login with token save.""" mock_response = Mock() mock_response.status_code = 200 @@ -35,7 +35,7 @@ def test_login_success_with_save(self, mock_settings, mock_console) -> None: token_file = Path(temp_dir) / "token" with patch("cforge.commands.settings.login.get_console", return_value=mock_console): - with patch("cforge.commands.settings.login.get_settings", return_value=mock_settings): + with patch("cforge.commands.settings.login.get_base_url", return_value=mock_base_url): with patch("cforge.commands.settings.login.requests.post", return_value=mock_response): with patch("cforge.commands.settings.login.save_token") as mock_save: with patch("cforge.commands.settings.login.get_token_file", return_value=token_file): @@ -48,14 +48,14 @@ def test_login_success_with_save(self, mock_settings, mock_console) -> None: assert any("Login successful" in str(call) for call in mock_console.print.call_args_list) assert any("Token saved" in str(call) for call in mock_console.print.call_args_list) - def test_login_success_without_save(self, mock_settings, mock_console) -> None: + def test_login_success_without_save(self, mock_base_url, mock_console) -> None: """Test successful login without saving token.""" mock_response = Mock() mock_response.status_code = 200 mock_response.json.return_value = {"access_token": "test_token_123"} with patch("cforge.commands.settings.login.get_console", return_value=mock_console): - with patch("cforge.commands.settings.login.get_settings", return_value=mock_settings): + with patch("cforge.commands.settings.login.get_base_url", return_value=mock_base_url): with patch("cforge.commands.settings.login.requests.post", return_value=mock_response): with patch("cforge.commands.settings.login.save_token") as mock_save: login(email="test@example.com", password="password", save=False) @@ -66,14 +66,14 @@ def test_login_success_without_save(self, mock_settings, mock_console) -> None: # Verify console shows export instruction assert any("MCPGATEWAY_BEARER_TOKEN" in str(call) for call in mock_console.print.call_args_list) - def test_login_http_error(self, mock_settings, mock_console) -> None: + def test_login_http_error(self, mock_base_url, mock_console) -> None: """Test login with HTTP error response.""" mock_response = Mock() mock_response.status_code = 401 mock_response.text = "Invalid credentials" with patch("cforge.commands.settings.login.get_console", return_value=mock_console): - with patch("cforge.commands.settings.login.get_settings", return_value=mock_settings): + with patch("cforge.commands.settings.login.get_base_url", return_value=mock_base_url): with patch("cforge.commands.settings.login.requests.post", return_value=mock_response): with pytest.raises(typer.Exit) as exc_info: login(email="test@example.com", password="wrong", save=True) @@ -84,14 +84,14 @@ def test_login_http_error(self, mock_settings, mock_console) -> None: # Verify error message assert any("Login failed" in str(call) for call in mock_console.print.call_args_list) - def test_login_no_token_in_response(self, mock_settings, mock_console) -> None: + def test_login_no_token_in_response(self, mock_base_url, mock_console) -> None: """Test login when server doesn't return a token.""" mock_response = Mock() mock_response.status_code = 200 mock_response.json.return_value = {} # No access_token with patch("cforge.commands.settings.login.get_console", return_value=mock_console): - with patch("cforge.commands.settings.login.get_settings", return_value=mock_settings): + with patch("cforge.commands.settings.login.get_base_url", return_value=mock_base_url): with patch("cforge.commands.settings.login.requests.post", return_value=mock_response): with pytest.raises(typer.Exit) as exc_info: login(email="test@example.com", password="password", save=True) @@ -102,10 +102,10 @@ def test_login_no_token_in_response(self, mock_settings, mock_console) -> None: # Verify error message assert any("No token received" in str(call) for call in mock_console.print.call_args_list) - def test_login_connection_error(self, mock_settings, mock_console) -> None: + def test_login_connection_error(self, mock_base_url, mock_console) -> None: """Test login with connection error.""" with patch("cforge.commands.settings.login.get_console", return_value=mock_console): - with patch("cforge.commands.settings.login.get_settings", return_value=mock_settings): + with patch("cforge.commands.settings.login.get_base_url", return_value=mock_base_url): with patch("cforge.commands.settings.login.requests.post", side_effect=requests.ConnectionError("Connection refused")): with pytest.raises(typer.Exit) as exc_info: login(email="test@example.com", password="password", save=True) @@ -136,3 +136,109 @@ def test_login_lifecycle(self, mock_console, mock_client, mock_settings) -> None save=True, ) make_authenticated_request("GET", "/tools") + + +class TestLoginWithProfiles: + """Tests for login command with profile support.""" + + def test_login_saves_to_profile_specific_token_file(self, mock_base_url, mock_console, mock_settings) -> None: + """Test that login saves token to profile-specific file when profile is active.""" + from cforge.profile_utils import AuthProfile, ProfileStore, save_profile_store + from datetime import datetime + + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = {"access_token": "profile_token_123"} + + # Create and save an active profile + profile_id = "test-profile-login" + profile = AuthProfile( + id=profile_id, + name="Test Profile", + email="test@example.com", + apiUrl="https://api.example.com", + isActive=True, + createdAt=datetime.now(), + ) + store = ProfileStore( + profiles={profile_id: profile}, + activeProfileId=profile_id, + ) + save_profile_store(store) + + token_file = mock_settings.contextforge_home / f"token.{profile_id}" + + with patch("cforge.commands.settings.login.get_console", return_value=mock_console): + with patch("cforge.commands.settings.login.get_base_url", return_value=mock_base_url): + with patch("cforge.commands.settings.login.requests.post", return_value=mock_response): + login(email="test@example.com", password="password", save=True) + + # Verify token was saved to profile-specific file + assert token_file.exists() + assert token_file.read_text() == "profile_token_123" + + def test_login_with_multiple_profiles(self, mock_base_url, mock_console, mock_settings) -> None: + """Test that different profiles can have different tokens.""" + from cforge.profile_utils import AuthProfile, ProfileStore, save_profile_store + from datetime import datetime + + profile_id1 = "profile-1" + profile_id2 = "profile-2" + + token_file1 = mock_settings.contextforge_home / f"token.{profile_id1}" + token_file2 = mock_settings.contextforge_home / f"token.{profile_id2}" + + # Login to profile 1 + mock_response1 = Mock() + mock_response1.status_code = 200 + mock_response1.json.return_value = {"access_token": "token_profile_1"} + + profile1 = AuthProfile( + id=profile_id1, + name="Profile 1", + email="user1@example.com", + apiUrl="https://api1.example.com", + isActive=True, + createdAt=datetime.now(), + ) + store1 = ProfileStore( + profiles={profile_id1: profile1}, + activeProfileId=profile_id1, + ) + save_profile_store(store1) + + with patch("cforge.commands.settings.login.get_console", return_value=mock_console): + with patch("cforge.commands.settings.login.get_base_url", return_value=mock_base_url): + with patch("cforge.commands.settings.login.requests.post", return_value=mock_response1): + login(email="user1@example.com", password="password1", save=True) + + # Login to profile 2 + mock_response2 = Mock() + mock_response2.status_code = 200 + mock_response2.json.return_value = {"access_token": "token_profile_2"} + + profile2 = AuthProfile( + id=profile_id2, + name="Profile 2", + email="user2@example.com", + apiUrl="https://api2.example.com", + isActive=True, + createdAt=datetime.now(), + ) + store2 = ProfileStore( + profiles={profile_id2: profile2}, + activeProfileId=profile_id2, + ) + save_profile_store(store2) + + with patch("cforge.commands.settings.login.get_console", return_value=mock_console): + with patch("cforge.commands.settings.login.get_base_url", return_value=mock_base_url): + with patch("cforge.commands.settings.login.requests.post", return_value=mock_response2): + login(email="user2@example.com", password="password2", save=True) + + # Verify both tokens exist and are different + assert token_file1.exists() + assert token_file2.exists() + assert token_file1.read_text() == "token_profile_1" + assert token_file2.read_text() == "token_profile_2" + assert token_file1.read_text() != token_file2.read_text() diff --git a/tests/commands/settings/test_logout.py b/tests/commands/settings/test_logout.py index 678e6f7..1a11b2a 100644 --- a/tests/commands/settings/test_logout.py +++ b/tests/commands/settings/test_logout.py @@ -85,3 +85,81 @@ def test_logout_lifecycle(self, mock_console, mock_client, mock_settings) -> Non # Try again (should fail) with pytest.raises(AuthenticationError): make_authenticated_request("GET", "/tools") + + +class TestLogoutWithProfiles: + """Tests for logout command with profile support.""" + + def test_logout_removes_profile_specific_token(self, mock_console, mock_settings) -> None: + """Test that logout removes profile-specific token file.""" + from cforge.profile_utils import AuthProfile, ProfileStore, save_profile_store + from datetime import datetime + + # Create and save an active profile + profile_id = "test-profile-logout" + profile = AuthProfile( + id=profile_id, + name="Test Profile", + email="test@example.com", + apiUrl="https://api.example.com", + isActive=True, + createdAt=datetime.now(), + ) + store = ProfileStore( + profiles={profile_id: profile}, + activeProfileId=profile_id, + ) + save_profile_store(store) + + token_file = mock_settings.contextforge_home / f"token.{profile_id}" + token_file.write_text("profile_token") + + with patch("cforge.commands.settings.logout.get_console", return_value=mock_console): + logout() + + # Token file should be deleted + assert not token_file.exists() + + # Verify console output mentions the profile-specific file + assert mock_console.print.call_count == 2 + first_call = mock_console.print.call_args_list[0][0][0] + assert "Token removed" in first_call + assert profile_id in str(token_file) + + def test_logout_only_removes_active_profile_token(self, mock_console, mock_settings) -> None: + """Test that logout only removes the active profile's token, not others.""" + from cforge.profile_utils import AuthProfile, ProfileStore, save_profile_store + from datetime import datetime + + profile_id1 = "profile-1" + profile_id2 = "profile-2" + + # Create profile 2 as active + profile2 = AuthProfile( + id=profile_id2, + name="Profile 2", + email="user2@example.com", + apiUrl="https://api2.example.com", + isActive=True, + createdAt=datetime.now(), + ) + store = ProfileStore( + profiles={profile_id2: profile2}, + activeProfileId=profile_id2, + ) + save_profile_store(store) + + token_file1 = mock_settings.contextforge_home / f"token.{profile_id1}" + token_file2 = mock_settings.contextforge_home / f"token.{profile_id2}" + + # Create both token files + token_file1.write_text("token_profile_1") + token_file2.write_text("token_profile_2") + + with patch("cforge.commands.settings.logout.get_console", return_value=mock_console): + logout() + + # Only profile 2's token should be deleted (the active one) + assert token_file1.exists() # Profile 1's token still exists + assert not token_file2.exists() # Profile 2's token was deleted + assert token_file1.read_text() == "token_profile_1" diff --git a/tests/commands/settings/test_profiles.py b/tests/commands/settings/test_profiles.py new file mode 100644 index 0000000..9b5e5cc --- /dev/null +++ b/tests/commands/settings/test_profiles.py @@ -0,0 +1,698 @@ +# -*- coding: utf-8 -*- +"""Location: ./tests/commands/settings/test_profiles.py +Copyright 2025 +SPDX-License-Identifier: Apache-2.0 +Authors: Gabe Goodhart + +Tests for profile management CLI commands. +""" + +# Standard +from datetime import datetime +from pathlib import Path +from unittest.mock import Mock, patch +import json +import tempfile + +# Third-Party +import pytest +import typer + +# First-Party +from cforge.commands.settings.profiles import ( + profiles_create, + profiles_get, + profiles_list, + profiles_switch, +) +from cforge.profile_utils import ( + AuthProfile, + ProfileMetadata, + ProfileStore, + save_profile_store, + load_profile_store, + DEFAULT_PROFILE_ID, +) + + +class TestProfilesList: + """Tests for profiles list command.""" + + def test_profiles_list_success(self, mock_console, mock_settings) -> None: + """Test listing profiles successfully.""" + # Create test profiles + profile1 = AuthProfile( + id="profile-1", + name="Production", + email="user@prod.com", + apiUrl="https://api.prod.com", + isActive=True, + createdAt=datetime.now(), + metadata=ProfileMetadata(environment="production"), + ) + profile2 = AuthProfile( + id="profile-2", + name="Staging", + email="user@staging.com", + apiUrl="https://api.staging.com", + isActive=False, + createdAt=datetime.now(), + metadata=ProfileMetadata(environment="staging"), + ) + + store = ProfileStore( + profiles={"profile-1": profile1, "profile-2": profile2}, + activeProfileId="profile-1", + ) + save_profile_store(store) + + with patch("cforge.commands.settings.profiles.get_console", return_value=mock_console): + with patch("cforge.commands.settings.profiles.print_table") as mock_table: + profiles_list() + + # Verify table was printed + mock_table.assert_called_once() + call_args = mock_table.call_args + profile_data = call_args[0][0] + + # Verify profile data (should include 2 profiles + virtual default) + assert len(profile_data) == 3 + assert any(p["id"] == "profile-1" for p in profile_data) + assert any(p["id"] == "profile-2" for p in profile_data) + assert any(p["id"] == DEFAULT_PROFILE_ID for p in profile_data) + assert any(p["active"] == "✓" for p in profile_data) + + # Verify active profile message + assert any("Currently using profile" in str(call) for call in mock_console.print.call_args_list) + + def test_profiles_list_empty(self, mock_console, mock_settings) -> None: + """Test listing when no profiles exist (should show virtual default).""" + with patch("cforge.commands.settings.profiles.get_console", return_value=mock_console): + with patch("cforge.commands.settings.profiles.print_table") as mock_table: + profiles_list() + + # Verify table was printed with virtual default + mock_table.assert_called_once() + call_args = mock_table.call_args + profile_data = call_args[0][0] + + # Should have exactly 1 profile (the virtual default) + assert len(profile_data) == 1 + assert profile_data[0]["id"] == DEFAULT_PROFILE_ID + assert profile_data[0]["name"] == "Local Default" + + def test_profiles_list_with_active_profile(self, mock_console, mock_settings) -> None: + """Test listing profiles when there is an active profile.""" + from datetime import datetime + + # Create test profiles with one active + profile1 = AuthProfile( + id="profile-1", + name="Active Profile", + email="active@example.com", + apiUrl="https://api.active.com", + isActive=True, + createdAt=datetime.now(), + ) + profile2 = AuthProfile( + id="profile-2", + name="Inactive Profile", + email="inactive@example.com", + apiUrl="https://api.inactive.com", + isActive=False, + createdAt=datetime.now(), + ) + + store = ProfileStore( + profiles={"profile-1": profile1, "profile-2": profile2}, + activeProfileId="profile-1", + ) + save_profile_store(store) + + with patch("cforge.commands.settings.profiles.get_console", return_value=mock_console): + with patch("cforge.commands.settings.profiles.print_table"): + profiles_list() + + # Verify active profile message is shown + print_calls = [str(call) for call in mock_console.print.call_args_list] + assert any("Currently using profile" in call for call in print_calls) + assert any("Active Profile" in call for call in print_calls) + + def test_profiles_list_without_active_profile(self, mock_console, mock_settings) -> None: + """Test listing profiles when there is not an active profile.""" + from datetime import datetime + + # Create test profiles with one active + profile1 = AuthProfile( + id="profile-1", + name="Active Profile", + email="active@example.com", + apiUrl="https://api.active.com", + isActive=False, + createdAt=datetime.now(), + ) + profile2 = AuthProfile( + id="profile-2", + name="Inactive Profile", + email="inactive@example.com", + apiUrl="https://api.inactive.com", + isActive=False, + createdAt=datetime.now(), + ) + + store = ProfileStore( + profiles={"profile-1": profile1, "profile-2": profile2}, + ) + save_profile_store(store) + + with patch("cforge.commands.settings.profiles.get_console", return_value=mock_console): + with patch("cforge.commands.settings.profiles.print_table"): + profiles_list() + + # Verify active profile message shows default + print_calls = [str(call) for call in mock_console.print.call_args_list] + assert any("Currently using profile" in call for call in print_calls) + assert any("Local Default" in call for call in print_calls) + + def test_profiles_list_error(self, mock_console, mock_settings) -> None: + """Test listing profiles with an error.""" + with patch("cforge.commands.settings.profiles.get_console", return_value=mock_console): + with patch("cforge.commands.settings.profiles.get_all_profiles", side_effect=Exception("Test error")): + with pytest.raises(typer.Exit) as exc_info: + profiles_list() + + assert exc_info.value.exit_code == 1 + assert any("Error listing profiles" in str(call) for call in mock_console.print.call_args_list) + + +class TestProfilesGet: + """Tests for profiles get command.""" + + def test_profiles_get_by_id(self, mock_console, mock_settings) -> None: + """Test getting a specific profile by ID.""" + profile = AuthProfile( + id="profile-1", + name="Test Profile", + email="test@example.com", + apiUrl="https://api.example.com", + isActive=True, + createdAt=datetime.now(), + lastUsed=datetime.now(), + metadata=ProfileMetadata( + description="Test description", + environment="production", + icon="🚀", + ), + ) + + store = ProfileStore( + profiles={"profile-1": profile}, + activeProfileId="profile-1", + ) + save_profile_store(store) + + with patch("cforge.commands.settings.profiles.get_console", return_value=mock_console): + profiles_get(profile_id="profile-1", json_output=False) + + # Verify profile details were printed + print_calls = [str(call) for call in mock_console.print.call_args_list] + assert any("Test Profile" in call for call in print_calls) + assert any("test@example.com" in call for call in print_calls) + assert any("https://api.example.com" in call for call in print_calls) + + def test_profiles_get_active(self, mock_console, mock_settings) -> None: + """Test getting the active profile when no ID provided.""" + profile = AuthProfile( + id="profile-1", + name="Active Profile", + email="active@example.com", + apiUrl="https://api.example.com", + isActive=True, + createdAt=datetime.now(), + ) + + store = ProfileStore( + profiles={"profile-1": profile}, + activeProfileId="profile-1", + ) + save_profile_store(store) + + with patch("cforge.commands.settings.profiles.get_console", return_value=mock_console): + profiles_get(profile_id=None, json_output=False) + + # Verify active profile was shown + print_calls = [str(call) for call in mock_console.print.call_args_list] + assert any("Active Profile" in call for call in print_calls) + + def test_profiles_get_default(self, mock_console, mock_settings) -> None: + """Test getting the virtual default profile.""" + with patch("cforge.commands.settings.profiles.get_console", return_value=mock_console): + profiles_get(profile_id=DEFAULT_PROFILE_ID, json_output=False) + + # Verify default profile was shown + print_calls = [str(call) for call in mock_console.print.call_args_list] + assert any("Local Default" in call for call in print_calls) + assert any("admin@localhost" in call for call in print_calls) + + def test_profiles_get_not_found(self, mock_console, mock_settings) -> None: + """Test getting a profile that doesn't exist.""" + with patch("cforge.commands.settings.profiles.get_console", return_value=mock_console): + with pytest.raises(typer.Exit): + profiles_get(profile_id="nonexistent", json_output=False) + + def test_profiles_get_active_when_none_exists(self, mock_console, mock_settings) -> None: + """Test getting active profile when profile_id is None and no profile is active (edge case).""" + # Create a profile store with a profile but no active profile + profile = AuthProfile( + id="profile-1", + name="Inactive Profile", + email="inactive@example.com", + apiUrl="https://api.example.com", + isActive=False, + createdAt=datetime.now(), + ) + + store = ProfileStore( + profiles={"profile-1": profile}, + activeProfileId=None, + ) + save_profile_store(store) + + with patch("cforge.commands.settings.profiles.get_console", return_value=mock_console): + # Should not raise an error, should return virtual default + profiles_get(profile_id=None, json_output=False) + + # Should show the virtual default profile + print_calls = [str(call) for call in mock_console.print.call_args_list] + assert any("Local Default" in call for call in print_calls) + + def test_profiles_get_no_active(self, mock_console, mock_settings) -> None: + """Test getting active profile when none is set (should return default).""" + with patch("cforge.commands.settings.profiles.get_console", return_value=mock_console): + profiles_get(profile_id=None, json_output=False) + + # Should show the virtual default profile + print_calls = [str(call) for call in mock_console.print.call_args_list] + assert any("Default local development profile" in call for call in print_calls) + + def test_profiles_get_no_active_no_profiles(self, mock_console, mock_settings) -> None: + """Test getting active profile when no profiles exist and none active (returns default).""" + # Don't create any profile store - should return virtual default + with patch("cforge.commands.settings.profiles.get_console", return_value=mock_console): + profiles_get(profile_id=None, json_output=False) + + # Should show the virtual default profile + print_calls = [str(call) for call in mock_console.print.call_args_list] + assert any("Local Default" in call for call in print_calls) + + assert any("Local Default" in call for call in print_calls) + + def test_profiles_get_json_output(self, mock_console, mock_settings) -> None: + """Test getting profile with JSON output.""" + profile = AuthProfile( + id="profile-1", + name="Test Profile", + email="test@example.com", + apiUrl="https://api.example.com", + isActive=True, + createdAt=datetime.now(), + ) + + store = ProfileStore( + profiles={"profile-1": profile}, + activeProfileId="profile-1", + ) + save_profile_store(store) + + with patch("cforge.commands.settings.profiles.get_console", return_value=mock_console): + with patch("cforge.commands.settings.profiles.print_json") as mock_json: + profiles_get(profile_id="profile-1", json_output=True) + + # Verify JSON output was called + mock_json.assert_called_once() + + def test_profiles_get_with_metadata_fields(self, mock_console, mock_settings) -> None: + """Test getting profile with all metadata fields displayed.""" + profile = AuthProfile( + id="profile-1", + name="Test Profile", + email="test@example.com", + apiUrl="https://api.example.com", + isActive=True, + createdAt=datetime.now(), + lastUsed=datetime.now(), + metadata=ProfileMetadata( + description="Test description", + environment="production", + icon="🚀", + ), + ) + + store = ProfileStore( + profiles={"profile-1": profile}, + activeProfileId="profile-1", + ) + save_profile_store(store) + + with patch("cforge.commands.settings.profiles.get_console", return_value=mock_console): + profiles_get(profile_id="profile-1", json_output=False) + + # Verify all metadata fields are printed + print_calls = [str(call) for call in mock_console.print.call_args_list] + assert any("Description" in call and "Test description" in call for call in print_calls) + assert any("Environment" in call and "production" in call for call in print_calls) + assert any("Icon" in call and "🚀" in call for call in print_calls) + assert any("Last Used" in call for call in print_calls) + + def test_profiles_get_error(self, mock_console, mock_settings) -> None: + """Test getting profile with an error.""" + with patch("cforge.commands.settings.profiles.get_console", return_value=mock_console): + with patch("cforge.commands.settings.profiles.get_profile", side_effect=Exception("Test error")): + with pytest.raises(typer.Exit) as exc_info: + profiles_get(profile_id="profile-1", json_output=False) + + assert exc_info.value.exit_code == 1 + assert any("Error retrieving profile" in str(call) for call in mock_console.print.call_args_list) + + def test_profiles_get_partial_metadata(self, mock_console, mock_settings) -> None: + """Test getting a profile with partial metadata still works.""" + profile = AuthProfile( + id="profile-1", + name="Active Profile", + email="active@example.com", + apiUrl="https://api.example.com", + isActive=True, + createdAt=datetime.now(), + metadata=ProfileMetadata(isInternal=True), + ) + + store = ProfileStore( + profiles={"profile-1": profile}, + activeProfileId="profile-1", + ) + save_profile_store(store) + + with patch("cforge.commands.settings.profiles.get_console", return_value=mock_console): + profiles_get(profile_id=None, json_output=False) + + # Verify active profile was shown + print_calls = [str(call) for call in mock_console.print.call_args_list] + assert any("Active Profile" in call for call in print_calls) + assert not any("Description:" in call for call in mock_console.print.call_args_list) + assert not any("Environment:" in call for call in mock_console.print.call_args_list) + assert not any("Icon:" in call for call in mock_console.print.call_args_list) + + +class TestProfilesSwitch: + """Tests for profiles switch command.""" + + def test_profiles_switch_success(self, mock_console, mock_settings) -> None: + """Test successfully switching profiles.""" + profile1 = AuthProfile( + id="profile-1", + name="Profile 1", + email="user1@example.com", + apiUrl="https://api1.example.com", + isActive=True, + createdAt=datetime.now(), + ) + profile2 = AuthProfile( + id="profile-2", + name="Profile 2", + email="user2@example.com", + apiUrl="https://api2.example.com", + isActive=False, + createdAt=datetime.now(), + ) + + store = ProfileStore( + profiles={"profile-1": profile1, "profile-2": profile2}, + activeProfileId="profile-1", + ) + save_profile_store(store) + + with patch("cforge.commands.settings.profiles.get_console", return_value=mock_console): + with patch("cforge.commands.settings.profiles.get_settings") as mock_get_settings: + mock_get_settings.cache_clear = Mock() + profiles_switch(profile_id="profile-2") + + # Verify success message + print_calls = [str(call) for call in mock_console.print.call_args_list] + assert any("Switched to profile" in call for call in print_calls) + assert any("Profile 2" in call for call in print_calls) + + # Verify cache was cleared + mock_get_settings.cache_clear.assert_called_once() + + def test_profiles_switch_to_default(self, mock_console, mock_settings) -> None: + """Test switching to the virtual default profile.""" + profile1 = AuthProfile( + id="profile-1", + name="Profile 1", + email="user1@example.com", + apiUrl="https://api1.example.com", + isActive=True, + createdAt=datetime.now(), + ) + + store = ProfileStore( + profiles={"profile-1": profile1}, + activeProfileId="profile-1", + ) + save_profile_store(store) + + with patch("cforge.commands.settings.profiles.get_console", return_value=mock_console): + with patch("cforge.commands.settings.profiles.get_settings") as mock_get_settings: + mock_get_settings.cache_clear = Mock() + profiles_switch(profile_id=DEFAULT_PROFILE_ID) + + # Verify success message + print_calls = [str(call) for call in mock_console.print.call_args_list] + assert any("Switched to profile" in call for call in print_calls) + assert any("Local Default" in call for call in print_calls) + + # Verify all profiles are now inactive + updated_store = load_profile_store() + assert updated_store is not None + assert updated_store.active_profile_id is None + assert all(not p.is_active for p in updated_store.profiles.values()) + + def test_profiles_switch_not_found(self, mock_console, mock_settings) -> None: + """Test switching to a profile that doesn't exist.""" + with patch("cforge.commands.settings.profiles.get_console", return_value=mock_console): + with pytest.raises(typer.Exit) as exc_info: + profiles_switch(profile_id="nonexistent") + + assert exc_info.value.exit_code == 1 + assert any("Profile not found" in str(call) for call in mock_console.print.call_args_list) + + def test_profiles_switch_error(self, mock_console, mock_settings) -> None: + """Test switching profiles with an error.""" + profile = AuthProfile( + id="profile-1", + name="Profile 1", + email="user1@example.com", + apiUrl="https://api1.example.com", + isActive=True, + createdAt=datetime.now(), + ) + + store = ProfileStore( + profiles={"profile-1": profile}, + activeProfileId="profile-1", + ) + save_profile_store(store) + + with patch("cforge.commands.settings.profiles.get_console", return_value=mock_console): + with patch("cforge.commands.settings.profiles.set_active_profile", side_effect=Exception("Test error")): + with pytest.raises(typer.Exit) as exc_info: + profiles_switch(profile_id="profile-1") + + assert exc_info.value.exit_code == 1 + assert any("Error switching profile" in str(call) for call in mock_console.print.call_args_list) + + def test_profiles_switch_failed_to_switch(self, mock_console, mock_settings) -> None: + """Test switching profiles when set_active_profile returns False.""" + profile = AuthProfile( + id="profile-1", + name="Profile 1", + email="user1@example.com", + apiUrl="https://api1.example.com", + isActive=True, + createdAt=datetime.now(), + ) + + store = ProfileStore( + profiles={"profile-1": profile}, + activeProfileId="profile-1", + ) + save_profile_store(store) + + with patch("cforge.commands.settings.profiles.get_console", return_value=mock_console): + with patch("cforge.commands.settings.profiles.set_active_profile", return_value=False): + with pytest.raises(typer.Exit) as exc_info: + profiles_switch(profile_id="profile-1") + + assert exc_info.value.exit_code == 1 + assert any("Failed to switch to profile" in str(call) for call in mock_console.print.call_args_list) + + +class TestProfilesCreate: + """Tests for profiles create command.""" + + def test_profiles_create_success(self, mock_console, mock_settings) -> None: + """Test successfully creating a new profile.""" + with patch("cforge.commands.settings.profiles.get_console", return_value=mock_console): + with patch("cforge.commands.settings.profiles.prompt_for_schema") as mock_prompt: + with patch("cforge.commands.settings.profiles.typer.confirm", return_value=False): + # Mock the prompt to return profile data + mock_prompt.return_value = { + "id": "test-profile-id", + "name": "Test Profile", + "email": "test@example.com", + "api_url": "https://api.test.com", + "is_active": False, + "created_at": datetime.now(), + } + + profiles_create(None) + + # Verify success message + print_calls = [str(call) for call in mock_console.print.call_args_list] + assert any("Profile created successfully" in call for call in print_calls) + assert any("Test Profile" in call for call in print_calls) + + def test_profiles_create_and_enable(self, mock_console, mock_settings) -> None: + """Test creating a profile and enabling it.""" + with patch("cforge.commands.settings.profiles.get_console", return_value=mock_console): + with patch("cforge.commands.settings.profiles.prompt_for_schema") as mock_prompt: + with patch("cforge.commands.settings.profiles.typer.confirm", return_value=True): + with patch("cforge.commands.settings.profiles.set_active_profile", return_value=True) as set_active_profile_mock: + with patch("cforge.commands.settings.profiles.get_settings") as mock_get_settings: + mock_get_settings.cache_clear = Mock() + + # Mock the prompt to return profile data + mock_prompt.return_value = { + "id": "test-profile-id", + "name": "Test Profile", + "email": "test@example.com", + "api_url": "https://api.test.com", + "is_active": False, + "created_at": datetime.now(), + } + + profiles_create(None) + + # Verify success and enable messages + print_calls = [str(call) for call in mock_console.print.call_args_list] + assert any("Profile created successfully" in call for call in print_calls) + assert any("Profile enabled" in call for call in print_calls) + set_active_profile_mock.assert_called_with("test-profile-id") + + def test_profiles_create_error(self, mock_console, mock_settings) -> None: + """Test creating profile with an error.""" + with patch("cforge.commands.settings.profiles.get_console", return_value=mock_console): + with patch("cforge.commands.settings.profiles.prompt_for_schema", side_effect=Exception("Test error")): + with pytest.raises(typer.Exit) as exc_info: + profiles_create(None) + + assert exc_info.value.exit_code == 1 + assert any("Error creating profile" in str(call) for call in mock_console.print.call_args_list) + + def test_profiles_create_enable_fails(self, mock_console, mock_settings) -> None: + """Test creating profile but enabling fails.""" + with patch("cforge.commands.settings.profiles.get_console", return_value=mock_console): + with patch("cforge.commands.settings.profiles.prompt_for_schema") as mock_prompt: + with patch("cforge.commands.settings.profiles.typer.confirm", return_value=True): + with patch("cforge.commands.settings.profiles.set_active_profile", return_value=False): + # Mock the prompt to return profile data + mock_prompt.return_value = { + "id": "test-profile-id", + "name": "Test Profile", + "email": "test@example.com", + "api_url": "https://api.test.com", + "is_active": False, + "created_at": datetime.now(), + } + + profiles_create(None) + + # Verify failure message + print_calls = [str(call) for call in mock_console.print.call_args_list] + assert any("Failed to enable profile" in call for call in print_calls) + + def test_profiles_create_with_existing_store(self, mock_console, mock_settings) -> None: + """Test creating a profile when a profile store already exists.""" + from cforge.profile_utils import load_profile_store + + # Create an existing profile store + existing_profile = AuthProfile( + id="existing-profile", + name="Existing Profile", + email="existing@example.com", + apiUrl="https://api.existing.com", + isActive=True, + createdAt=datetime.now(), + ) + store = ProfileStore( + profiles={"existing-profile": existing_profile}, + activeProfileId="existing-profile", + ) + save_profile_store(store) + + with patch("cforge.commands.settings.profiles.get_console", return_value=mock_console): + with patch("cforge.commands.settings.profiles.prompt_for_schema") as mock_prompt: + with patch("cforge.commands.settings.profiles.typer.confirm", return_value=False): + # Mock the prompt to return profile data + mock_prompt.return_value = { + "id": "new-profile-id", + "name": "New Profile", + "email": "new@example.com", + "api_url": "https://api.new.com", + "is_active": False, + "created_at": datetime.now(), + } + + profiles_create(None) + + # Verify both profiles exist in the store + updated_store = load_profile_store() + assert updated_store is not None + assert len(updated_store.profiles) == 2 + assert "existing-profile" in updated_store.profiles + assert "new-profile-id" in updated_store.profiles + + def test_profiles_create_data_file(self, mock_console, mock_settings) -> None: + """Test successfully creating a new profile using a data file.""" + with patch("cforge.commands.settings.profiles.get_console", return_value=mock_console): + with patch("cforge.commands.settings.profiles.typer.confirm", return_value=False): + data_file_content = { + "name": "Test Profile", + "email": "test@example.com", + "api_url": "https://api.test.com", + "is_active": False, + } + with tempfile.NamedTemporaryFile("w") as data_file: + data_file.write(json.dumps(data_file_content)) + data_file.flush() + profiles_create(Path(data_file.name)) + + # Verify both profiles exist in the store + updated_store = load_profile_store() + assert updated_store is not None + assert len(updated_store.profiles) == 1 + profile_id = list(updated_store.profiles.keys())[0] + profile = list(updated_store.profiles.values())[0] + assert profile.id == profile_id + assert profile.name == data_file_content["name"] + assert profile.email == data_file_content["email"] + assert profile.api_url == data_file_content["api_url"] + assert profile.is_active == data_file_content["is_active"] + + def test_profiles_create_bad_data_file(self, mock_console, mock_settings) -> None: + """Test error handling when data file is not found""" + with patch("cforge.commands.settings.profiles.get_console", return_value=mock_console): + with patch("cforge.commands.settings.profiles.typer.confirm", return_value=False): + with pytest.raises(typer.Exit) as exc_info: + profiles_create(mock_settings.contextforge_home / "does" / "not" / "exist") + assert exc_info.value.exit_code == 1 diff --git a/tests/commands/settings/test_whoami.py b/tests/commands/settings/test_whoami.py index fff6ab3..b8cc20f 100644 --- a/tests/commands/settings/test_whoami.py +++ b/tests/commands/settings/test_whoami.py @@ -27,19 +27,9 @@ def test_whoami_with_env_token(self, mock_settings, mock_console) -> None: with patch("cforge.commands.settings.whoami.load_token", return_value=None): whoami() - # Verify console output - assert mock_console.print.call_count == 2 - - # Check first call - authentication status - first_call = mock_console.print.call_args_list[0][0][0] - assert "Authenticated via MCPGATEWAY_BEARER_TOKEN" in first_call - assert "[green]" in first_call - - # Check second call - token preview - second_call = mock_console.print.call_args_list[1][0][0] - assert "Token:" in second_call - assert "env_token_" in second_call - assert "..." in second_call + call_messages = [call[0][0] for call in mock_console.print.call_args_list if call and call[0]] + assert any("Authenticated via MCPGATEWAY_BEARER_TOKEN" in call for call in call_messages) + assert any("env_token_" in call for call in call_messages) def test_whoami_with_stored_token(self, mock_settings, mock_console) -> None: """Test whoami when authenticated via stored token file.""" @@ -51,19 +41,9 @@ def test_whoami_with_stored_token(self, mock_settings, mock_console) -> None: with patch("cforge.commands.settings.whoami.load_token", return_value=stored_token): whoami() - # Verify console output - assert mock_console.print.call_count == 2 - - # Check first call - authentication status with file path - first_call = mock_console.print.call_args_list[0][0][0] - assert "Authenticated via stored token" in first_call - assert "[green]" in first_call - - # Check second call - token preview - second_call = mock_console.print.call_args_list[1][0][0] - assert "Token:" in second_call - assert "stored_tok" in second_call - assert "..." in second_call + call_messages = [call[0][0] for call in mock_console.print.call_args_list if call and call[0]] + assert any("Authenticated via stored token" in call for call in call_messages) + assert any("stored_tok" in call for call in call_messages) def test_whoami_not_authenticated(self, mock_settings, mock_console) -> None: """Test whoami when not authenticated.""" @@ -75,14 +55,9 @@ def test_whoami_not_authenticated(self, mock_settings, mock_console) -> None: with patch("cforge.commands.settings.whoami.load_token", return_value=None): whoami() - # Verify console output - mock_console.print.assert_called_once() - - # Check output message - call_args = mock_console.print.call_args[0][0] - assert "Not authenticated" in call_args - assert "cforge login" in call_args - assert "[yellow]" in call_args + call_messages = [call[0][0] for call in mock_console.print.call_args_list if call and call[0]] + assert any("Not authenticated" in call for call in call_messages) + assert any("cforge login" in call for call in call_messages) def test_whoami_env_token_takes_precedence(self, mock_settings, mock_console) -> None: """Test that env token takes precedence over stored token.""" @@ -95,7 +70,131 @@ def test_whoami_env_token_takes_precedence(self, mock_settings, mock_console) -> with patch("cforge.commands.settings.whoami.load_token", return_value=stored_token): whoami() - # Should show env token, not stored token - first_call = mock_console.print.call_args_list[0][0][0] - assert "MCPGATEWAY_BEARER_TOKEN" in first_call - assert "stored token" not in first_call + call_messages = [call[0][0] for call in mock_console.print.call_args_list if call and call[0]] + assert any("MCPGATEWAY_BEARER_TOKEN" in call for call in call_messages) + assert not any("stored token" in call for call in call_messages) + + +class TestWhoamiWithProfiles: + """Tests for whoami command with profile support.""" + + def test_whoami_with_active_profile_and_token(self, mock_settings, mock_console) -> None: + """Test whoami displays active profile information along with auth status.""" + from cforge.profile_utils import AuthProfile, ProfileStore, save_profile_store + from datetime import datetime + + # Create and save an active profile + profile_id = "test-profile-whoami" + profile = AuthProfile( + id=profile_id, + name="Test Profile", + email="test@example.com", + apiUrl="https://api.example.com", + isActive=True, + createdAt=datetime.now(), + ) + store = ProfileStore( + profiles={profile_id: profile}, + activeProfileId=profile_id, + ) + save_profile_store(store) + + # Set up authentication + mock_settings.mcpgateway_bearer_token = None + stored_token = "stored_token_1234567890abcdef" + + with patch("cforge.commands.settings.whoami.get_console", return_value=mock_console): + with patch("cforge.commands.settings.whoami.get_settings", return_value=mock_settings): + with patch("cforge.commands.settings.whoami.load_token", return_value=stored_token): + whoami() + + # Should display profile info (5 lines) + blank line + auth status (2 lines) = 8 calls + assert mock_console.print.call_count == 8 + + # Check profile information is displayed + calls_text = " ".join([str(call[0][0]) if call[0] else "" for call in mock_console.print.call_args_list]) + assert "Active Profile:" in calls_text + assert "Test Profile" in calls_text + assert profile_id in calls_text + assert "test@example.com" in calls_text + assert "https://api.example.com" in calls_text + + # Check authentication status is still displayed + assert "Authenticated via stored token" in calls_text + assert "stored_tok" in calls_text + + def test_whoami_with_active_profile_with_metadata(self, mock_settings, mock_console) -> None: + """Test whoami displays profile metadata when available.""" + from cforge.profile_utils import AuthProfile, ProfileMetadata, ProfileStore, save_profile_store + from datetime import datetime + + # Create profile with metadata + profile_id = "test-profile-metadata" + metadata = ProfileMetadata( + description="Test environment", + environment="staging", + color="#FF5733", + icon="🚀", + ) + profile = AuthProfile( + id=profile_id, + name="Staging Profile", + email="staging@example.com", + apiUrl="https://staging-api.example.com", + isActive=True, + createdAt=datetime.now(), + metadata=metadata, + ) + store = ProfileStore( + profiles={profile_id: profile}, + activeProfileId=profile_id, + ) + save_profile_store(store) + + mock_settings.mcpgateway_bearer_token = None + + with patch("cforge.commands.settings.whoami.get_console", return_value=mock_console): + with patch("cforge.commands.settings.whoami.get_settings", return_value=mock_settings): + with patch("cforge.commands.settings.whoami.load_token", return_value=None): + whoami() + + # Check that environment is displayed + calls_text = " ".join([str(call[0][0]) if call[0] else "" for call in mock_console.print.call_args_list]) + assert "Environment:" in calls_text + assert "staging" in calls_text + + def test_whoami_with_active_profile_no_auth(self, mock_settings, mock_console) -> None: + """Test whoami with active profile but no authentication.""" + from cforge.profile_utils import AuthProfile, ProfileStore, save_profile_store + from datetime import datetime + + # Create and save an active profile + profile_id = "test-profile-noauth" + profile = AuthProfile( + id=profile_id, + name="No Auth Profile", + email="noauth@example.com", + apiUrl="https://noauth-api.example.com", + isActive=True, + createdAt=datetime.now(), + ) + store = ProfileStore( + profiles={profile_id: profile}, + activeProfileId=profile_id, + ) + save_profile_store(store) + + # No authentication + mock_settings.mcpgateway_bearer_token = None + + with patch("cforge.commands.settings.whoami.get_console", return_value=mock_console): + with patch("cforge.commands.settings.whoami.get_settings", return_value=mock_settings): + with patch("cforge.commands.settings.whoami.load_token", return_value=None): + whoami() + + # Should display profile info + not authenticated message + calls_text = " ".join([str(call[0][0]) if call[0] else "" for call in mock_console.print.call_args_list]) + assert "Active Profile:" in calls_text + assert "No Auth Profile" in calls_text + assert "Not authenticated" in calls_text + assert "cforge login" in calls_text diff --git a/tests/conftest.py b/tests/conftest.py index a0700cb..f81280d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -53,7 +53,7 @@ def patch_everywhere(name: str, **kwargs) -> Generator[List[Any], None, None]: """Patch a function in every place it is imported.""" # Find all modules that have the function - mod_names = [m for m, mod in sys.modules.items() if m.startswith("cforge") and hasattr(mod, name)] + mod_names = [m for m, mod in sys.modules.items() if ((m.startswith("cforge") or (m.startswith("test") and "conftest" in m)) and hasattr(mod, name))] patches = [patch(f"{m}.{name}", **kwargs) for m in mod_names] yields = [p.__enter__() for p in patches] try: @@ -311,3 +311,8 @@ def registered_mcp_server(mock_mcp_server, authorized_mock_client) -> Generator[ """Test-level fixture to register the mock server and unregister at the end""" with register_mcp_server(mock_mcp_server, authorized_mock_client) as mcp_server: yield mcp_server + + +@pytest.fixture +def mock_base_url(mock_settings): + yield f"http://{mock_settings.host}:{mock_settings.port}" diff --git a/tests/test_common.py b/tests/test_common.py index c4a28a6..3a9f34e 100644 --- a/tests/test_common.py +++ b/tests/test_common.py @@ -70,6 +70,31 @@ def test_get_token_file(self, mock_settings) -> None: assert str(token_file).endswith("token") assert token_file.parent == mock_settings.contextforge_home + def test_get_token_file_with_active_profile(self, mock_settings) -> None: + """Test getting the token file path uses active profile when available.""" + from cforge.profile_utils import AuthProfile, ProfileStore, save_profile_store + from datetime import datetime + + # Create and save an active profile + profile_id = "active-profile-456" + profile = AuthProfile( + id=profile_id, + name="Active Profile", + email="active@example.com", + apiUrl="https://api.example.com", + isActive=True, + createdAt=datetime.now(), + ) + store = ProfileStore( + profiles={profile_id: profile}, + activeProfileId=profile_id, + ) + save_profile_store(store) + + # get_token_file should use the active profile + token_file = get_token_file() + assert str(token_file).endswith(f"token.{profile_id}") + def test_save_and_load_token(self) -> None: """Test saving and loading a token.""" test_token = "test_token_123" @@ -81,6 +106,91 @@ def test_save_and_load_token(self) -> None: assert loaded_token == test_token + def test_save_and_load_token_with_active_profile(self, mock_settings) -> None: + """Test saving and loading a token with an active profile.""" + from cforge.profile_utils import AuthProfile, ProfileStore, save_profile_store + from datetime import datetime + + test_token = "profile_token_456" + profile_id = "test-profile-789" + + # Create and save an active profile + profile = AuthProfile( + id=profile_id, + name="Test Profile", + email="test@example.com", + apiUrl="https://api.example.com", + isActive=True, + createdAt=datetime.now(), + ) + store = ProfileStore( + profiles={profile_id: profile}, + activeProfileId=profile_id, + ) + save_profile_store(store) + + # Save and load token - should use profile-specific file + save_token(test_token) + loaded_token = load_token() + + assert loaded_token == test_token + + # Verify it was saved to profile-specific file + token_file = mock_settings.contextforge_home / f"token.{profile_id}" + assert token_file.exists() + + def test_save_token_different_profiles(self, mock_settings) -> None: + """Test that different profiles have separate token files.""" + from cforge.profile_utils import AuthProfile, ProfileStore, save_profile_store + from datetime import datetime + + token1 = "token_for_profile_1" + token2 = "token_for_profile_2" + profile_id1 = "profile-1" + profile_id2 = "profile-2" + + # Save token for profile 1 + profile1 = AuthProfile( + id=profile_id1, + name="Profile 1", + email="user1@example.com", + apiUrl="https://api1.example.com", + isActive=True, + createdAt=datetime.now(), + ) + store1 = ProfileStore( + profiles={profile_id1: profile1}, + activeProfileId=profile_id1, + ) + save_profile_store(store1) + save_token(token1) + + # Save token for profile 2 + profile2 = AuthProfile( + id=profile_id2, + name="Profile 2", + email="user2@example.com", + apiUrl="https://api2.example.com", + isActive=True, + createdAt=datetime.now(), + ) + store2 = ProfileStore( + profiles={profile_id2: profile2}, + activeProfileId=profile_id2, + ) + save_profile_store(store2) + save_token(token2) + + # Verify both tokens exist in separate files + token_file1 = mock_settings.contextforge_home / f"token.{profile_id1}" + token_file2 = mock_settings.contextforge_home / f"token.{profile_id2}" + + assert token_file1.exists() + assert token_file2.exists() + assert token_file1.read_text() == token1 + assert token_file2.read_text() == token2 + assert token1 != token2 + def test_load_token_nonexistent(self, tmp_path: Path) -> None: """Test loading a token when file doesn't exist.""" nonexistent_file = tmp_path / "nonexistent" / "token" @@ -90,6 +200,70 @@ def test_load_token_nonexistent(self, tmp_path: Path) -> None: assert token is None + def test_load_token_nonexistent_profile(self, mock_settings) -> None: + """Test loading a token for a profile that doesn't have a token file.""" + from cforge.profile_utils import AuthProfile, ProfileStore, save_profile_store + from datetime import datetime + + profile_id = "nonexistent-profile" + + # Create an active profile but don't create a token file + profile = AuthProfile( + id=profile_id, + name="Test Profile", + email="test@example.com", + apiUrl="https://api.example.com", + isActive=True, + createdAt=datetime.now(), + ) + store = ProfileStore( + profiles={profile_id: profile}, + activeProfileId=profile_id, + ) + save_profile_store(store) + + # Try to load token - should return None since file doesn't exist + token = load_token() + + assert token is None + + +class TestBaseUrl: + """Tests for get_base_url function.""" + + def test_get_base_url_with_active_profile(self, mock_settings) -> None: + """Test get_base_url returns profile's API URL when active profile exists.""" + from cforge.common import get_base_url + from cforge.profile_utils import AuthProfile, ProfileStore, save_profile_store + from datetime import datetime + + # Create and save a profile + profile = AuthProfile( + id="profile-1", + name="Test Profile", + email="test@example.com", + apiUrl="https://custom-api.example.com", + isActive=True, + createdAt=datetime.now(), + ) + store = ProfileStore( + profiles={"profile-1": profile}, + activeProfileId="profile-1", + ) + save_profile_store(store) + + # Get base URL should return the profile's API URL + base_url = get_base_url() + assert base_url == "https://custom-api.example.com" + + def test_get_base_url_without_active_profile(self, mock_settings) -> None: + """Test get_base_url returns default URL when no active profile.""" + from cforge.common import get_base_url + + # No profile saved, should use settings + base_url = get_base_url() + assert base_url == f"http://{mock_settings.host}:{mock_settings.port}" + class TestAuthentication: """Tests for authentication functions.""" @@ -120,6 +294,195 @@ def test_get_auth_token_none(self, mock_settings) -> None: assert token is None +class TestAutoLogin: + """Tests for automatic login functionality.""" + + def test_attempt_auto_login_no_profile(self, mock_settings): + """Test auto-login when no profile is active.""" + from cforge.common import attempt_auto_login + + token = attempt_auto_login() + assert token is None + + def test_attempt_auto_login_no_credentials(self, mock_settings): + """Test auto-login when credentials are not available.""" + from cforge.common import attempt_auto_login + from cforge.profile_utils import AuthProfile + from datetime import datetime + + mock_profile = AuthProfile( + id="test-profile", + name="Test", + email="test@example.com", + apiUrl="http://localhost:4444", + isActive=True, + createdAt=datetime.now(), + ) + + with patch("cforge.common.get_active_profile", return_value=mock_profile): + with patch("cforge.common.load_profile_credentials", return_value=None): + token = attempt_auto_login() + assert token is None + + def test_attempt_auto_login_missing_email(self, mock_settings): + """Test auto-login when email is missing from credentials.""" + from cforge.common import attempt_auto_login + from cforge.profile_utils import AuthProfile + from datetime import datetime + + mock_profile = AuthProfile( + id="test-profile", + name="Test", + email="test@example.com", + apiUrl="http://localhost:4444", + isActive=True, + createdAt=datetime.now(), + ) + + with patch("cforge.common.get_active_profile", return_value=mock_profile): + with patch("cforge.common.load_profile_credentials", return_value={"password": "test"}): + token = attempt_auto_login() + assert token is None + + def test_attempt_auto_login_missing_password(self, mock_settings): + """Test auto-login when password is missing from credentials.""" + from cforge.common import attempt_auto_login + from cforge.profile_utils import AuthProfile + from datetime import datetime + + mock_profile = AuthProfile( + id="test-profile", + name="Test", + email="test@example.com", + apiUrl="http://localhost:4444", + isActive=True, + createdAt=datetime.now(), + ) + + with patch("cforge.common.get_active_profile", return_value=mock_profile): + with patch("cforge.common.load_profile_credentials", return_value={"email": "test@example.com"}): + token = attempt_auto_login() + assert token is None + + @patch("cforge.common.requests.post") + def test_attempt_auto_login_success(self, mock_post, mock_settings): + """Test successful auto-login.""" + from cforge.common import attempt_auto_login, load_token + from cforge.profile_utils import AuthProfile + from datetime import datetime + + mock_profile = AuthProfile( + id="test-profile", + name="Test", + email="test@example.com", + apiUrl="http://localhost:4444", + isActive=True, + createdAt=datetime.now(), + ) + + # Mock successful login response + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = {"access_token": "auto-login-token"} + mock_post.return_value = mock_response + + with patch("cforge.common.get_active_profile", return_value=mock_profile): + with patch("cforge.common.load_profile_credentials", return_value={"email": "test@example.com", "password": "test-pass"}): + token = attempt_auto_login() + assert token == "auto-login-token" + + # Verify token was saved + saved_token = load_token() + assert saved_token == "auto-login-token" + + @patch("cforge.common.requests.post") + def test_attempt_auto_login_failed_login(self, mock_post, mock_settings): + """Test auto-login when login fails.""" + from cforge.common import attempt_auto_login + from cforge.profile_utils import AuthProfile + from datetime import datetime + + mock_profile = AuthProfile( + id="test-profile", + name="Test", + email="test@example.com", + apiUrl="http://localhost:4444", + isActive=True, + createdAt=datetime.now(), + ) + + # Mock failed login response + mock_response = Mock() + mock_response.status_code = 401 + mock_post.return_value = mock_response + + with patch("cforge.common.get_active_profile", return_value=mock_profile): + with patch("cforge.common.load_profile_credentials", return_value={"email": "test@example.com", "password": "wrong-pass"}): + token = attempt_auto_login() + assert token is None + + @patch("cforge.common.requests.post") + def test_attempt_auto_login_no_token_in_response(self, mock_post, mock_settings): + """Test auto-login when response doesn't contain token.""" + from cforge.common import attempt_auto_login + from cforge.profile_utils import AuthProfile + from datetime import datetime + + mock_profile = AuthProfile( + id="test-profile", + name="Test", + email="test@example.com", + apiUrl="http://localhost:4444", + isActive=True, + createdAt=datetime.now(), + ) + + # Mock response without token + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = {} + mock_post.return_value = mock_response + + with patch("cforge.common.get_active_profile", return_value=mock_profile): + with patch("cforge.common.load_profile_credentials", return_value={"email": "test@example.com", "password": "test-pass"}): + token = attempt_auto_login() + assert token is None + + @patch("cforge.common.requests.post") + def test_attempt_auto_login_request_exception(self, mock_post, mock_settings): + """Test auto-login when request raises exception.""" + from cforge.common import attempt_auto_login + from cforge.profile_utils import AuthProfile + from datetime import datetime + + mock_profile = AuthProfile( + id="test-profile", + name="Test", + email="test@example.com", + apiUrl="http://localhost:4444", + isActive=True, + createdAt=datetime.now(), + ) + + # Mock request exception + mock_post.side_effect = Exception("Connection error") + + with patch("cforge.common.get_active_profile", return_value=mock_profile): + with patch("cforge.common.load_profile_credentials", return_value={"email": "test@example.com", "password": "test-pass"}): + token = attempt_auto_login() + assert token is None + + def test_get_auth_token_with_auto_login(self, mock_settings): + """Test that get_auth_token attempts auto-login when no token is available.""" + from cforge.common import get_auth_token + + # Mock no env token and no file token, but successful auto-login + with patch("cforge.common.load_token", return_value=None): + with patch("cforge.common.attempt_auto_login", return_value="auto-token"): + token = get_auth_token() + assert token == "auto-token" + + class TestErrors: """Tests for custom error classes.""" @@ -359,7 +722,7 @@ def test_request_without_auth_succeeds_on_unauthenticated_server(self, mock_sett assert "Authorization" not in call_args[1]["headers"] assert result == {"result": "success"} - def test_request_with_bearer_token(self, mock_client) -> None: + def test_request_with_bearer_token(self, mock_client, mock_settings) -> None: """Test successful request with Bearer token.""" mock_client.reset_mock() with mock_client_login(mock_client): diff --git a/tests/test_credential_store.py b/tests/test_credential_store.py new file mode 100644 index 0000000..a44998d --- /dev/null +++ b/tests/test_credential_store.py @@ -0,0 +1,256 @@ +# -*- coding: utf-8 -*- +"""Location: ./tests/test_credential_store.py +Copyright 2025 +SPDX-License-Identifier: Apache-2.0 +Authors: Gabe Goodhart + +Tests for credential store functionality. +""" + +# Standard +import json + +# Third-Party +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC + +# First-Party +from cforge.credential_store import ( + decrypt_credential_data, + get_credential_store_path, + get_encryption_key_path, + load_encryption_key, + load_profile_credentials, +) + + +class TestCredentialStorePaths: + """Tests for credential store path functions.""" + + def test_get_credential_store_path(self, mock_settings): + """Test getting the credential store path.""" + path = get_credential_store_path() + assert path == mock_settings.contextforge_home / "context-forge-credentials.json" + + def test_get_encryption_key_path(self, mock_settings): + """Test getting the encryption key path.""" + path = get_encryption_key_path() + assert path == mock_settings.contextforge_home / "context-forge-keys.json" + + +class TestEncryptionKeyLoading: + """Tests for encryption key loading.""" + + def test_load_encryption_key_success(self, mock_settings): + """Test loading encryption key successfully.""" + key_path = get_encryption_key_path() + key_path.parent.mkdir(parents=True, exist_ok=True) + key_data = {"encryptionKey": "test-encryption-key-12345"} # gitleaks:allow + key_path.write_text(json.dumps(key_data), encoding="utf-8") + + key = load_encryption_key() + assert key == "test-encryption-key-12345" # gitleaks:allow + + def test_load_encryption_key_missing_file(self, mock_settings): + """Test loading encryption key when file doesn't exist.""" + key = load_encryption_key() + assert key is None + + def test_load_encryption_key_invalid_json(self, mock_settings): + """Test loading encryption key with invalid JSON.""" + key_path = get_encryption_key_path() + key_path.parent.mkdir(parents=True, exist_ok=True) + key_path.write_text("invalid json", encoding="utf-8") + + key = load_encryption_key() + assert key is None + + +class TestCredentialDecryption: + """Tests for credential decryption.""" + + def _encrypt_data(self, data: str, encryption_key: str) -> bytes: + """Helper to encrypt data using the same format as electron-store/conf.""" + import os + + # Generate random IV + iv = os.urandom(16) + + # Derive key using PBKDF2 + kdf = PBKDF2HMAC( + algorithm=hashes.SHA512(), + length=32, + salt=iv, + iterations=10_000, + backend=default_backend(), + ) + key = kdf.derive(encryption_key.encode("utf-8")) + + # Encrypt using AES-256-CBC + cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend()) + encryptor = cipher.encryptor() + + # Add PKCS7 padding + data_bytes = data.encode("utf-8") + padding_length = 16 - (len(data_bytes) % 16) + padded_data = data_bytes + bytes([padding_length] * padding_length) + + encrypted = encryptor.update(padded_data) + encryptor.finalize() + + # Format: [IV][':'][encrypted data] + return iv + b":" + encrypted + + def test_decrypt_credential_data_success(self): + """Test successful credential decryption.""" + test_data = '{"test": "data"}' + encryption_key = "test-key-12345" + + encrypted = self._encrypt_data(test_data, encryption_key) + decrypted = decrypt_credential_data(encrypted, encryption_key) + + assert decrypted == test_data + + def test_decrypt_credential_data_wrong_key(self): + """Test decryption with wrong key fails gracefully.""" + test_data = '{"test": "data"}' + encryption_key = "test-key-12345" + wrong_key = "wrong-key-67890" # gitleaks:allow + + encrypted = self._encrypt_data(test_data, encryption_key) + decrypted = decrypt_credential_data(encrypted, wrong_key) + + # Decryption should fail and return None or not match original + assert decrypted is None or decrypted != test_data + + def test_decrypt_credential_data_invalid_format(self): + """Test decryption with invalid data format.""" + decrypted = decrypt_credential_data(b"invalid data", "test-key") + assert decrypted is None + + +class TestLoadProfileCredentials: + """Tests for loading profile credentials.""" + + def test_load_profile_credentials_success(self, mock_settings): + """Test loading profile credentials successfully.""" + profile_id = "test-profile-123" + encryption_key = "test-encryption-key" + credentials_data = {profile_id: {"email": "test@example.com", "password": "test-password"}} + + # Setup encryption key + key_path = get_encryption_key_path() + key_path.parent.mkdir(parents=True, exist_ok=True) + key_path.write_text(json.dumps({"encryptionKey": encryption_key}), encoding="utf-8") + + # Setup encrypted credentials + cred_path = get_credential_store_path() + encrypted_data = self._encrypt_data(json.dumps(credentials_data), encryption_key) + cred_path.write_bytes(encrypted_data) + + # Load credentials + creds = load_profile_credentials(profile_id) + assert creds is not None + assert creds["email"] == "test@example.com" + assert creds["password"] == "test-password" + + def test_load_profile_credentials_no_key(self, mock_settings): + """Test loading credentials when encryption key is missing.""" + creds = load_profile_credentials("test-profile") + assert creds is None + + def test_load_profile_credentials_no_store(self, mock_settings): + """Test loading credentials when credential store is missing.""" + # Setup encryption key only + key_path = get_encryption_key_path() + key_path.parent.mkdir(parents=True, exist_ok=True) + key_path.write_text(json.dumps({"encryptionKey": "test-key"}), encoding="utf-8") + + creds = load_profile_credentials("test-profile") + assert creds is None + + def test_load_profile_credentials_profile_not_found(self, mock_settings): + """Test loading credentials for non-existent profile.""" + encryption_key = "test-encryption-key" + credentials_data = {"other-profile": {"email": "other@example.com", "password": "other-password"}} + + # Setup encryption key + key_path = get_encryption_key_path() + key_path.parent.mkdir(parents=True, exist_ok=True) + key_path.write_text(json.dumps({"encryptionKey": encryption_key}), encoding="utf-8") + + # Setup encrypted credentials + cred_path = get_credential_store_path() + encrypted_data = self._encrypt_data(json.dumps(credentials_data), encryption_key) + cred_path.write_bytes(encrypted_data) + + # Try to load non-existent profile + creds = load_profile_credentials("test-profile") + assert creds is None + + def test_load_profile_credentials_invalid_json(self, mock_settings): + """Test loading credentials when decrypted data is invalid JSON.""" + encryption_key = "test-encryption-key" + + # Setup encryption key + key_path = get_encryption_key_path() + key_path.parent.mkdir(parents=True, exist_ok=True) + key_path.write_text(json.dumps({"encryptionKey": encryption_key}), encoding="utf-8") + + # Setup encrypted credentials with invalid JSON + cred_path = get_credential_store_path() + encrypted_data = self._encrypt_data("invalid json {", encryption_key) + cred_path.write_bytes(encrypted_data) + + # Try to load - should handle JSON error gracefully + creds = load_profile_credentials("test-profile") + assert creds is None + + def test_load_profile_credentials_decryption_fails(self, mock_settings): + """Test loading credentials when decryption fails.""" + encryption_key = "test-encryption-key" + + # Setup encryption key + key_path = get_encryption_key_path() + key_path.parent.mkdir(parents=True, exist_ok=True) + key_path.write_text(json.dumps({"encryptionKey": encryption_key}), encoding="utf-8") + + # Setup credential store with corrupted data + cred_path = get_credential_store_path() + cred_path.write_bytes(b"corrupted data that will fail decryption") + + # Try to load - should handle decryption failure gracefully + creds = load_profile_credentials("test-profile") + assert creds is None + + def _encrypt_data(self, data: str, encryption_key: str) -> bytes: + """Helper to encrypt data using the same format as electron-store/conf.""" + import os + + # Generate random IV + iv = os.urandom(16) + + # Derive key using PBKDF2 + kdf = PBKDF2HMAC( + algorithm=hashes.SHA512(), + length=32, + salt=iv, + iterations=10_000, + backend=default_backend(), + ) + key = kdf.derive(encryption_key.encode("utf-8")) + + # Encrypt using AES-256-CBC + cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend()) + encryptor = cipher.encryptor() + + # Add PKCS7 padding + data_bytes = data.encode("utf-8") + padding_length = 16 - (len(data_bytes) % 16) + padded_data = data_bytes + bytes([padding_length] * padding_length) + + encrypted = encryptor.update(padded_data) + encryptor.finalize() + + # Format: [IV][':'][encrypted data] + return iv + b":" + encrypted diff --git a/tests/test_profile_utils.py b/tests/test_profile_utils.py new file mode 100644 index 0000000..95e5d63 --- /dev/null +++ b/tests/test_profile_utils.py @@ -0,0 +1,758 @@ +# -*- coding: utf-8 -*- +"""Location: ./tests/test_profile_utils.py +Copyright 2025 +SPDX-License-Identifier: Apache-2.0 +Authors: Gabe Goodhart + +Tests for profile management utilities. +""" + +# Standard +import json +from datetime import datetime +from pathlib import Path + +# First-Party +from cforge.profile_utils import ( + AuthProfile, + DEFAULT_PROFILE_ID, + ProfileMetadata, + ProfileStore, + get_all_profiles, + get_active_profile, + get_profile, + get_profile_store_path, + load_profile_store, + save_profile_store, + set_active_profile, +) + + +class TestProfileModels: + """Tests for Pydantic profile models.""" + + def test_profile_metadata_creation(self) -> None: + """Test creating ProfileMetadata with various fields.""" + metadata = ProfileMetadata( + description="Test profile", + environment="production", + color="#FF0000", + icon="🚀", + isInternal=True, + ) + + assert metadata.description == "Test profile" + assert metadata.environment == "production" + assert metadata.color == "#FF0000" + assert metadata.icon == "🚀" + assert metadata.is_internal is True + + def test_profile_metadata_optional_fields(self) -> None: + """Test ProfileMetadata with optional fields omitted.""" + metadata = ProfileMetadata() + + assert metadata.description is None + assert metadata.environment is None + assert metadata.color is None + assert metadata.icon is None + assert metadata.is_internal is None + + def test_auth_profile_creation(self) -> None: + """Test creating AuthProfile with required fields.""" + now = datetime.now() + profile = AuthProfile( + id="profile-123", + name="Test Profile", + email="test@example.com", + apiUrl="https://api.example.com", + isActive=True, + createdAt=now, + lastUsed=now, + ) + + assert profile.id == "profile-123" + assert profile.name == "Test Profile" + assert profile.email == "test@example.com" + assert profile.api_url == "https://api.example.com" + assert profile.is_active is True + assert profile.created_at == now + assert profile.last_used == now + + def test_auth_profile_with_metadata(self) -> None: + """Test AuthProfile with metadata.""" + metadata = ProfileMetadata(environment="staging") + profile = AuthProfile( + id="profile-123", + name="Test Profile", + email="test@example.com", + apiUrl="https://api.example.com", + isActive=False, + createdAt=datetime.now(), + metadata=metadata, + ) + + assert profile.metadata is not None + assert profile.metadata.environment == "staging" + + def test_profile_store_creation(self) -> None: + """Test creating ProfileStore.""" + profile1 = AuthProfile( + id="profile-1", + name="Profile 1", + email="user1@example.com", + apiUrl="https://api1.example.com", + isActive=True, + createdAt=datetime.now(), + ) + profile2 = AuthProfile( + id="profile-2", + name="Profile 2", + email="user2@example.com", + apiUrl="https://api2.example.com", + isActive=False, + createdAt=datetime.now(), + ) + + ProfileStore( + profiles={"profile-1": profile1, "profile-2": profile2}, + activeProfileId="profile-1", + ) + + def test_profile_store_key_id_mismatch(self) -> None: + """Test ProfileStore validation fails when key doesn't match profile ID.""" + profile = AuthProfile( + id="profile-1", + name="Profile 1", + email="user1@example.com", + apiUrl="https://api1.example.com", + isActive=True, + createdAt=datetime.now(), + ) + + try: + ProfileStore( + profiles={"wrong-key": profile}, # Key doesn't match profile.id + activeProfileId="profile-1", + ) + assert False, "Expected ValueError for key/id mismatch" + except ValueError as e: + assert "key/id mismatch" in str(e) + + def test_profile_store_multiple_active_profiles(self) -> None: + """Test ProfileStore validation fails when multiple profiles are active.""" + profile1 = AuthProfile( + id="profile-1", + name="Profile 1", + email="user1@example.com", + apiUrl="https://api1.example.com", + isActive=True, + createdAt=datetime.now(), + ) + profile2 = AuthProfile( + id="profile-2", + name="Profile 2", + email="user2@example.com", + apiUrl="https://api2.example.com", + isActive=True, # Both profiles are active + createdAt=datetime.now(), + ) + + try: + ProfileStore( + profiles={"profile-1": profile1, "profile-2": profile2}, + activeProfileId="profile-1", + ) + assert False, "Expected ValueError for multiple active profiles" + except ValueError as e: + assert "Found multiple active profiles" in str(e) + + def test_profile_store_active_id_without_profiles(self) -> None: + """Test ProfileStore validation fails when active_profile_id is set without profiles.""" + try: + ProfileStore( + profiles={}, + activeProfileId="profile-1", + ) + assert False, "Expected ValueError for active_profile_id without profiles" + except ValueError as e: + assert "Cannot set active_profile_id" in str(e) + assert "without providing profiles" in str(e) + + def test_profile_store_active_id_not_in_profiles(self) -> None: + """Test ProfileStore validation fails when active_profile_id is not in profiles.""" + profile = AuthProfile( + id="profile-1", + name="Profile 1", + email="user1@example.com", + apiUrl="https://api1.example.com", + isActive=True, + createdAt=datetime.now(), + ) + + try: + ProfileStore( + profiles={"profile-1": profile}, + activeProfileId="profile-2", # ID not in profiles + ) + assert False, "Expected ValueError for active_profile_id not in profiles" + except ValueError as e: + assert "not present in profiles" in str(e) + + def test_profile_store_active_id_not_marked_active(self) -> None: + """Test ProfileStore validation fails when active_profile_id profile is not marked active.""" + profile = AuthProfile( + id="profile-1", + name="Profile 1", + email="user1@example.com", + apiUrl="https://api1.example.com", + isActive=False, # Not marked as active + createdAt=datetime.now(), + ) + + try: + ProfileStore( + profiles={"profile-1": profile}, + activeProfileId="profile-1", + ) + assert False, "Expected ValueError for active_profile_id not marked as active" + except ValueError as e: + assert "is not marked as active" in str(e) + + def test_profile_store_active_id_mismatch(self) -> None: + """Test ProfileStore validation fails when active_profile_id doesn't match the active profile.""" + profile1 = AuthProfile( + id="profile-1", + name="Profile 1", + email="user1@example.com", + apiUrl="https://api1.example.com", + isActive=False, + createdAt=datetime.now(), + ) + profile2 = AuthProfile( + id="profile-2", + name="Profile 2", + email="user2@example.com", + apiUrl="https://api2.example.com", + isActive=True, # profile-2 is active + createdAt=datetime.now(), + ) + + try: + ProfileStore( + profiles={"profile-1": profile1, "profile-2": profile2}, + activeProfileId="profile-1", # But active_profile_id points to profile-1 + ) + assert False, "Expected ValueError for active profile ID mismatch" + except ValueError as e: + assert "is not marked as active" in str(e) + + def test_profile_store_empty(self) -> None: + """Test creating empty ProfileStore.""" + store = ProfileStore() + + assert len(store.profiles) == 0 + assert store.active_profile_id is None + + +class TestProfileStorePath: + """Tests for profile store path functions.""" + + def test_get_profile_store_path(self, mock_settings) -> None: + """Test getting the profile store path.""" + path = get_profile_store_path() + + assert isinstance(path, Path) + assert path.name == "context-forge-profiles.json" + assert mock_settings.contextforge_home == path.parent + + +class TestLoadProfileStore: + """Tests for loading profile store.""" + + def test_load_profile_store_success(self, mock_settings) -> None: + """Test successfully loading a profile store.""" + # Create a test profile store file + store_path = get_profile_store_path() + store_path.parent.mkdir(exist_ok=True) + + test_data = { + "profiles": { + "profile-1": { + "id": "profile-1", + "name": "Test Profile", + "email": "test@example.com", + "apiUrl": "https://api.example.com", + "isActive": True, + "createdAt": "2025-01-01T00:00:00", + } + }, + "activeProfileId": "profile-1", + } + + with open(store_path, "w", encoding="utf-8") as f: + json.dump(test_data, f) + + store = load_profile_store() + + assert store is not None + assert len(store.profiles) == 1 + assert store.active_profile_id == "profile-1" + assert "profile-1" in store.profiles + + def test_load_profile_store_not_found(self, mock_settings) -> None: + """Test loading when profile store doesn't exist.""" + store = load_profile_store() + + assert store is None + + def test_load_profile_store_invalid_json(self, mock_settings) -> None: + """Test loading with invalid JSON.""" + store_path = get_profile_store_path() + store_path.parent.mkdir(exist_ok=True) + + with open(store_path, "w", encoding="utf-8") as f: + f.write("invalid json {") + + store = load_profile_store() + + assert store is None + + def test_load_profile_store_invalid_schema(self, mock_settings) -> None: + """Test loading with invalid schema.""" + store_path = get_profile_store_path() + store_path.parent.mkdir(exist_ok=True) + + # Missing required fields + test_data = {"profiles": {}} + + with open(store_path, "w", encoding="utf-8") as f: + json.dump(test_data, f) + + store = load_profile_store() + + # Should still load with defaults + assert store is not None + assert len(store.profiles) == 0 + + +class TestSaveProfileStore: + """Tests for saving profile store.""" + + def test_save_profile_store_success(self, mock_settings) -> None: + """Test successfully saving a profile store.""" + profile = AuthProfile( + id="profile-1", + name="Test Profile", + email="test@example.com", + apiUrl="https://api.example.com", + isActive=True, + createdAt=datetime.now(), + ) + + store = ProfileStore( + profiles={"profile-1": profile}, + activeProfileId="profile-1", + ) + + save_profile_store(store) + + # Verify file was created + store_path = get_profile_store_path() + assert store_path.exists() + + # Verify content + with open(store_path, "r", encoding="utf-8") as f: + data = json.load(f) + + assert "profiles" in data + assert "profile-1" in data["profiles"] + assert data["activeProfileId"] == "profile-1" + assert data["profiles"]["profile-1"]["apiUrl"] == "https://api.example.com" + + def test_save_profile_store_creates_directory(self, mock_settings) -> None: + """Test that save creates parent directory if needed.""" + store = ProfileStore() + store_path = get_profile_store_path() + + # Ensure directory doesn't exist + if store_path.parent.exists(): + import shutil + + shutil.rmtree(store_path.parent) + + save_profile_store(store) + + assert store_path.parent.exists() + assert store_path.exists() + + +class TestGetAllProfiles: + """Tests for getting all profiles.""" + + def test_get_all_profiles_success(self, mock_settings) -> None: + """Test getting all profiles (includes virtual default).""" + # Create test profiles + profile1 = AuthProfile( + id="profile-1", + name="Profile 1", + email="user1@example.com", + apiUrl="https://api1.example.com", + isActive=True, + createdAt=datetime.now(), + ) + profile2 = AuthProfile( + id="profile-2", + name="Profile 2", + email="user2@example.com", + apiUrl="https://api2.example.com", + isActive=False, + createdAt=datetime.now(), + ) + + store = ProfileStore( + profiles={"profile-1": profile1, "profile-2": profile2}, + activeProfileId="profile-1", + ) + save_profile_store(store) + + profiles = get_all_profiles() + + # Should include 2 profiles + virtual default + assert len(profiles) == 3 + assert any(p.id == "profile-1" for p in profiles) + assert any(p.id == "profile-2" for p in profiles) + assert any(p.id == DEFAULT_PROFILE_ID for p in profiles) + + def test_get_all_profiles_empty(self, mock_settings) -> None: + """Test getting profiles when none exist (returns virtual default).""" + profiles = get_all_profiles() + + # Should return only the virtual default profile + assert len(profiles) == 1 + assert profiles[0].id == DEFAULT_PROFILE_ID + assert profiles[0].name == "Local Default" + + +class TestGetProfile: + """Tests for getting a specific profile.""" + + def test_get_profile_success(self, mock_settings) -> None: + """Test getting a specific profile by ID.""" + profile = AuthProfile( + id="profile-1", + name="Test Profile", + email="test@example.com", + apiUrl="https://api.example.com", + isActive=True, + createdAt=datetime.now(), + ) + + store = ProfileStore( + profiles={"profile-1": profile}, + activeProfileId="profile-1", + ) + save_profile_store(store) + + result = get_profile("profile-1") + + assert result is not None + assert result.id == "profile-1" + assert result.name == "Test Profile" + + def test_get_profile_not_found(self, mock_settings) -> None: + """Test getting a profile that doesn't exist.""" + result = get_profile("nonexistent") + + assert result is None + + def test_get_profile_no_store(self, mock_settings) -> None: + """Test getting a profile when store doesn't exist.""" + result = get_profile("profile-1") + + assert result is None + + +class TestGetActiveProfile: + """Tests for getting the active profile.""" + + def test_get_active_profile_success(self, mock_settings) -> None: + """Test getting the active profile.""" + profile1 = AuthProfile( + id="profile-1", + name="Profile 1", + email="user1@example.com", + apiUrl="https://api1.example.com", + isActive=True, + createdAt=datetime.now(), + ) + profile2 = AuthProfile( + id="profile-2", + name="Profile 2", + email="user2@example.com", + apiUrl="https://api2.example.com", + isActive=False, + createdAt=datetime.now(), + ) + + store = ProfileStore( + profiles={"profile-1": profile1, "profile-2": profile2}, + activeProfileId="profile-1", + ) + save_profile_store(store) + + result = get_active_profile() + + assert result is not None + assert result.id == "profile-1" + assert result.is_active is True + + def test_get_active_profile_none_set(self, mock_settings) -> None: + """Test getting active profile when none is set (returns virtual default).""" + profile = AuthProfile( + id="profile-1", + name="Profile 1", + email="user1@example.com", + apiUrl="https://api1.example.com", + isActive=False, + createdAt=datetime.now(), + ) + + store = ProfileStore( + profiles={"profile-1": profile}, + activeProfileId=None, + ) + save_profile_store(store) + + result = get_active_profile() + + # Should return virtual default profile + assert result is not None + assert result.id == DEFAULT_PROFILE_ID + assert result.name == "Local Default" + assert result.is_active is True + + def test_get_active_profile_no_store(self, mock_settings) -> None: + """Test getting active profile when store doesn't exist (returns virtual default).""" + result = get_active_profile() + + # Should return virtual default profile + assert result is not None + assert result.id == DEFAULT_PROFILE_ID + assert result.name == "Local Default" + assert result.is_active is True + + +class TestSetActiveProfile: + """Tests for setting the active profile.""" + + def test_set_active_profile_success(self, mock_settings) -> None: + """Test successfully setting the active profile.""" + profile1 = AuthProfile( + id="profile-1", + name="Profile 1", + email="user1@example.com", + apiUrl="https://api1.example.com", + isActive=True, + createdAt=datetime.now(), + ) + profile2 = AuthProfile( + id="profile-2", + name="Profile 2", + email="user2@example.com", + apiUrl="https://api2.example.com", + isActive=False, + createdAt=datetime.now(), + ) + + store = ProfileStore( + profiles={"profile-1": profile1, "profile-2": profile2}, + activeProfileId="profile-1", + ) + save_profile_store(store) + + # Switch to profile-2 + result = set_active_profile("profile-2") + + assert result is True + + # Verify the change + updated_store = load_profile_store() + assert updated_store is not None + assert updated_store.active_profile_id == "profile-2" + assert updated_store.profiles["profile-2"].is_active is True + assert updated_store.profiles["profile-1"].is_active is False + assert updated_store.profiles["profile-2"].last_used is not None + + def test_set_active_profile_not_found(self, mock_settings) -> None: + """Test setting active profile when profile doesn't exist.""" + profile = AuthProfile( + id="profile-1", + name="Profile 1", + email="user1@example.com", + apiUrl="https://api1.example.com", + isActive=True, + createdAt=datetime.now(), + ) + + store = ProfileStore( + profiles={"profile-1": profile}, + activeProfileId="profile-1", + ) + save_profile_store(store) + + result = set_active_profile("nonexistent") + + assert result is False + + def test_set_active_profile_no_store(self, mock_settings) -> None: + """Test setting active profile when store doesn't exist.""" + result = set_active_profile("profile-1") + + assert result is False + + def test_set_active_profile_updates_last_used(self, mock_settings) -> None: + """Test that setting active profile updates last_used timestamp.""" + profile = AuthProfile( + id="profile-1", + name="Profile 1", + email="user1@example.com", + apiUrl="https://api1.example.com", + isActive=False, + createdAt=datetime.now(), + lastUsed=None, + ) + + store = ProfileStore( + profiles={"profile-1": profile}, + activeProfileId=None, + ) + save_profile_store(store) + + # Set as active + result = set_active_profile("profile-1") + + assert result is True + + # Verify last_used was updated + updated_store = load_profile_store() + assert updated_store is not None + assert updated_store.profiles["profile-1"].last_used is not None + + +class TestDesktopDefaultProfile: + """Tests for Desktop app default profile detection.""" + + def test_get_all_profiles_with_desktop_default(self, mock_settings) -> None: + """Test that virtual default is not included when Desktop default exists.""" + # Create a Desktop-created default profile + desktop_default = AuthProfile( + id="random-desktop-id", + name="Desktop Default", + email="admin@localhost", + apiUrl=f"http://{mock_settings.host}:{mock_settings.port}", + isActive=True, + createdAt=datetime.now(), + metadata=ProfileMetadata( + description="Desktop default profile", + environment="local", + is_internal=True, + ), + ) + + store = ProfileStore( + profiles={"random-desktop-id": desktop_default}, + activeProfileId="random-desktop-id", + ) + save_profile_store(store) + + profiles = get_all_profiles() + + # Should only have the Desktop default, not the virtual default + assert len(profiles) == 1 + assert profiles[0].id == "random-desktop-id" + assert profiles[0].metadata.is_internal is True + assert not any(p.id == DEFAULT_PROFILE_ID for p in profiles) + + def test_get_all_profiles_without_desktop_default(self, mock_settings) -> None: + """Test that virtual default is included when no Desktop default exists.""" + # Create a non-default profile + profile = AuthProfile( + id="profile-1", + name="Custom Profile", + email="user@example.com", + apiUrl="https://api.example.com", + isActive=True, + createdAt=datetime.now(), + ) + + store = ProfileStore( + profiles={"profile-1": profile}, + activeProfileId="profile-1", + ) + save_profile_store(store) + + profiles = get_all_profiles() + + # Should have both the custom profile and virtual default + assert len(profiles) == 2 + assert any(p.id == "profile-1" for p in profiles) + assert any(p.id == DEFAULT_PROFILE_ID for p in profiles) + + def test_get_active_profile_with_desktop_default_inactive(self, mock_settings) -> None: + """Test that the desktop default is returned when Desktop default exists but is not active.""" + # Create a Desktop-created default profile that is NOT active + desktop_default = AuthProfile( + id="random-desktop-id", + name="Desktop Default", + email="admin@localhost", + apiUrl=f"http://{mock_settings.host}:{mock_settings.port}", + isActive=False, + createdAt=datetime.now(), + metadata=ProfileMetadata( + description="Desktop default profile", + environment="local", + is_internal=True, + ), + ) + + store = ProfileStore( + profiles={"random-desktop-id": desktop_default}, + activeProfileId=None, + ) + save_profile_store(store) + + result = get_active_profile() + + # Should return None because Desktop default exists (even if not active) + assert result == desktop_default + + def test_get_active_profile_without_desktop_default(self, mock_settings) -> None: + """Test that virtual default is returned when no Desktop default exists.""" + # Create a non-default profile that is NOT active + profile = AuthProfile( + id="profile-1", + name="Custom Profile", + email="user@example.com", + apiUrl="https://api.example.com", + isActive=False, + createdAt=datetime.now(), + ) + + store = ProfileStore( + profiles={"profile-1": profile}, + activeProfileId=None, + ) + save_profile_store(store) + + result = get_active_profile() + + # Should return virtual default + assert result is not None + assert result.id == DEFAULT_PROFILE_ID + assert result.name == "Local Default" + + def test_set_active_profile_default_no_store(self, mock_settings) -> None: + """Test setting active profile to the default with no store passes""" + result = set_active_profile(DEFAULT_PROFILE_ID) + + assert result is True