diff --git a/examples/ssh_keys.py b/examples/ssh_keys.py new file mode 100644 index 0000000..0dc6693 --- /dev/null +++ b/examples/ssh_keys.py @@ -0,0 +1,174 @@ +#!/usr/bin/env python3 +"""SSH Keys Example Script. + +This script demonstrates how to use the SSH Keys API to: +1. List all SSH keys for an organization +2. Create a new SSH key +3. Read a specific SSH key +4. Update an SSH key +5. Delete an SSH key + +IMPORTANT: SSH Keys API has special authentication requirements: +- āŒ CANNOT use Organization Tokens (AT-*) +- āœ… MUST use User Tokens or Team Tokens +- āœ… MUST have 'manage VCS settings' permission + +Before running this script: +1. Create a User Token in Terraform Cloud: + - Go to User Settings → Tokens (not Organization Settings) + - Create new token with VCS management permissions +2. Set TFE_TOKEN environment variable with your User token (not Organization token!) +3. Set TFE_ORG environment variable with your organization name +4. Set SSH_PRIVATE_KEY environment variable with your SSH private key +""" + +import os +import sys + +# Add the source directory to the path for direct execution +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src")) + +from tfe import TFEClient, TFEConfig +from tfe.errors import NotFound, TFEError +from tfe.models import SSHKeyCreateOptions, SSHKeyListOptions, SSHKeyUpdateOptions + +# Configuration +TFE_TOKEN = os.getenv("TFE_TOKEN") +TFE_ORG = os.getenv("TFE_ORG") + +# SSH private key from environment variable (API expects private key, not public) +SSH_KEY_VALUE = os.getenv("SSH_PRIVATE_KEY") + + +def check_token_type(token): + """Check and validate token type for SSH Keys API.""" + print("šŸ” Token Analysis:") + if token.startswith("AT-"): + print(" Token Type: Organization Token (AT-*)") + print(" āŒ SSH Keys API does NOT support Organization Tokens") + print(" šŸ’” Please create a User Token instead") + print("") + print("šŸ”§ To create a User Token:") + print(" 1. Go to Terraform Cloud → User Settings → Tokens") + print(" 2. Create new token with VCS management permissions") + print(" 3. Replace TFE_TOKEN environment variable") + return False + elif token.startswith("TF-"): + print(" Token Type: User Token (TF-*)") + print(" āœ… SSH Keys API supports User Tokens") + return True + elif ".atlasv1." in token: + print(" Token Type: User/Team Token (.atlasv1. format)") + print(" āœ… SSH Keys API supports User/Team Tokens") + return True + else: + print(f" Token Type: Unknown format ({token[:10]}...)") + print(" šŸ’” Expected User Token (TF-*) or Team Token") + return True # Allow unknown formats to try + + +def main(): + """Main function demonstrating SSH Keys API usage.""" + + # Validate environment variables + if not TFE_TOKEN: + print("āŒ Error: TFE_TOKEN environment variable is required") + print("šŸ’” Create a User Token (not Organization Token) in Terraform Cloud") + sys.exit(1) + + if not TFE_ORG: + print("āŒ Error: TFE_ORG environment variable is required") + sys.exit(1) + + if not SSH_KEY_VALUE: + print("āŒ Error: SSH_PRIVATE_KEY environment variable is required") + print("šŸ’” Provide a valid SSH private key for testing") + sys.exit(1) + + # Check token type first + if not check_token_type(TFE_TOKEN): + sys.exit(1) + + # Initialize the TFE client + config = TFEConfig(token=TFE_TOKEN) + client = TFEClient(config) + + print(f"\nSSH Keys API Example for organization: {TFE_ORG}") + print("=" * 50) + + try: + # 1. List existing SSH keys + print("\n1. Listing SSH keys...") + ssh_keys = client.ssh_keys.list(TFE_ORG) + print(f"āœ… Found {len(ssh_keys.items)} SSH keys:") + for key in ssh_keys.items: + print(f" - ID: {key.id}, Name: {key.name}") + + # 2. Create a new SSH key + print("\n2. Creating a new SSH key...") + create_options = SSHKeyCreateOptions( + name="Python TFE Example SSH Key", value=SSH_KEY_VALUE + ) + + new_key = client.ssh_keys.create(TFE_ORG, create_options) + print(f"āœ… Created SSH key: {new_key.id} - {new_key.name}") + + # 3. Read the SSH key we just created + print("\n3. Reading the SSH key...") + read_key = client.ssh_keys.read(new_key.id) + print(f"āœ… Read SSH key: {read_key.id} - {read_key.name}") + + # 4. Update the SSH key + print("\n4. Updating the SSH key...") + update_options = SSHKeyUpdateOptions(name="Updated Python TFE Example SSH Key") + + updated_key = client.ssh_keys.update(new_key.id, update_options) + print(f"āœ… Updated SSH key: {updated_key.id} - {updated_key.name}") + + # 5. Delete the SSH key + print("\n5. Deleting the SSH key...") + client.ssh_keys.delete(new_key.id) + print(f"āœ… Deleted SSH key: {new_key.id}") + + # 6. Verify deletion by listing again + print("\n6. Verifying deletion...") + ssh_keys_after = client.ssh_keys.list(TFE_ORG) + print(f"āœ… SSH keys after deletion: {len(ssh_keys_after.items)}") + + # 7. Demonstrate pagination with options + print("\n7. Demonstrating pagination options...") + list_options = SSHKeyListOptions(page_size=5, page_number=1) + paginated_keys = client.ssh_keys.list(TFE_ORG, list_options) + print(f"āœ… Page 1 with page size 5: {len(paginated_keys.items)} keys") + print(f" Total pages: {paginated_keys.total_pages}") + print(f" Total count: {paginated_keys.total_count}") + + print("\nšŸŽ‰ SSH Keys API example completed successfully!") + + except NotFound as e: + print(f"\nāŒ SSH Keys API Error: {e}") + print("\nšŸ’” This error usually means:") + print(" - Using Organization Token (not allowed)") + print(" - SSH Keys feature not available") + print(" - Insufficient permissions") + print("\nšŸ”§ Try using a User Token instead of Organization Token") + sys.exit(1) + + except TFEError as e: + print(f"\nāŒ TFE API Error: {e}") + if hasattr(e, "status"): + if e.status == 403: + print("šŸ’” Permission denied - check token type and permissions") + elif e.status == 401: + print("šŸ’” Authentication failed - check token validity") + elif e.status == 422: + print("šŸ’” Validation error - check SSH key format") + sys.exit(1) + + except Exception as e: + print(f"\nāŒ Unexpected error: {e}") + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/src/tfe/client.py b/src/tfe/client.py index 7122448..cd4a402 100644 --- a/src/tfe/client.py +++ b/src/tfe/client.py @@ -17,6 +17,7 @@ from .resources.run_event import RunEvents from .resources.run_task import RunTasks from .resources.run_trigger import RunTriggers +from .resources.ssh_keys import SSHKeys from .resources.state_version_outputs import StateVersionOutputs from .resources.state_versions import StateVersions from .resources.variable import Variables @@ -69,5 +70,8 @@ def __init__(self, config: TFEConfig | None = None): self.query_runs = QueryRuns(self._transport) self.run_events = RunEvents(self._transport) + # SSH Keys + self.ssh_keys = SSHKeys(self._transport) + def close(self) -> None: pass diff --git a/src/tfe/errors.py b/src/tfe/errors.py index c7d81c4..341150f 100644 --- a/src/tfe/errors.py +++ b/src/tfe/errors.py @@ -99,6 +99,9 @@ class ErrStateVersionUploadNotSupported(TFEError): ... ERR_REQUIRED_PROJECT = "projects are required" ERR_PROJECT_MIN_LIMIT = "must specify at least one project" +# SSH Key Error Constants +ERR_INVALID_SSH_KEY_ID = "invalid SSH key ID" + class WorkspaceNotFound(NotFound): ... diff --git a/src/tfe/models/__init__.py b/src/tfe/models/__init__.py index 3d25df1..1ba6309 100644 --- a/src/tfe/models/__init__.py +++ b/src/tfe/models/__init__.py @@ -111,6 +111,15 @@ RegistryProviderReadOptions, ) +# Re-export all SSH key types +from .ssh_key import ( + SSHKey, + SSHKeyCreateOptions, + SSHKeyList, + SSHKeyListOptions, + SSHKeyUpdateOptions, +) + # Define what should be available when importing with * __all__ = [ # OAuth client types @@ -124,6 +133,12 @@ "OAuthClientRemoveProjectsOptions", "OAuthClientUpdateOptions", "ServiceProviderType", + # SSH key types + "SSHKey", + "SSHKeyCreateOptions", + "SSHKeyList", + "SSHKeyListOptions", + "SSHKeyUpdateOptions", # Agent and agent pool types "Agent", "AgentPool", diff --git a/src/tfe/models/ssh_key.py b/src/tfe/models/ssh_key.py new file mode 100644 index 0000000..cc853e4 --- /dev/null +++ b/src/tfe/models/ssh_key.py @@ -0,0 +1,56 @@ +from __future__ import annotations + +from pydantic import BaseModel, ConfigDict, Field + + +class SSHKey(BaseModel): + """Represents an SSH key in Terraform Enterprise.""" + + model_config = ConfigDict(populate_by_name=True) + + id: str = Field(..., description="The unique identifier for this SSH key") + type: str = Field(default="ssh-keys", description="The type of this resource") + name: str = Field(..., description="A name to identify the SSH key") + + +class SSHKeyCreateOptions(BaseModel): + """Options for creating a new SSH key.""" + + model_config = ConfigDict(populate_by_name=True) + + name: str = Field(..., description="A name to identify the SSH key") + value: str = Field(..., description="The text of the SSH private key") + + +class SSHKeyUpdateOptions(BaseModel): + """Options for updating an SSH key.""" + + model_config = ConfigDict(populate_by_name=True) + + name: str | None = Field(None, description="A name to identify the SSH key") + + +class SSHKeyListOptions(BaseModel): + """Options for listing SSH keys.""" + + model_config = ConfigDict(populate_by_name=True) + + page_number: int | None = Field( + None, alias="page[number]", description="Page number to retrieve", ge=1 + ) + page_size: int | None = Field( + None, alias="page[size]", description="Number of items per page", ge=1, le=100 + ) + + +class SSHKeyList(BaseModel): + """Represents a paginated list of SSH keys.""" + + model_config = ConfigDict(populate_by_name=True) + + items: list[SSHKey] = Field(default_factory=list, description="List of SSH keys") + current_page: int | None = Field(None, description="Current page number") + total_pages: int | None = Field(None, description="Total number of pages") + prev_page: str | None = Field(None, description="URL of the previous page") + next_page: str | None = Field(None, description="URL of the next page") + total_count: int | None = Field(None, description="Total number of items") diff --git a/src/tfe/resources/ssh_keys.py b/src/tfe/resources/ssh_keys.py new file mode 100644 index 0000000..429d5fe --- /dev/null +++ b/src/tfe/resources/ssh_keys.py @@ -0,0 +1,129 @@ +from __future__ import annotations + +from typing import Any + +from ..errors import ( + InvalidOrgError, + InvalidSSHKeyIDError, +) +from ..models.ssh_key import ( + SSHKey, + SSHKeyCreateOptions, + SSHKeyList, + SSHKeyListOptions, + SSHKeyUpdateOptions, +) +from ..utils import valid_string_id +from ._base import _Service + + +class SSHKeys(_Service): + """SSH Keys API for Terraform Enterprise.""" + + def list( + self, organization: str, options: SSHKeyListOptions | None = None + ) -> SSHKeyList: + """List SSH keys for the given organization.""" + if not valid_string_id(organization): + raise InvalidOrgError() + + params = ( + options.model_dump(by_alias=True, exclude_none=True) if options else None + ) + + r = self.t.request( + "GET", + f"/api/v2/organizations/{organization}/ssh-keys", + params=params, + ) + + jd = r.json() + items = [] + meta = jd.get("meta", {}) + pagination = meta.get("pagination", {}) + + for d in jd.get("data", []): + items.append(self._parse_ssh_key(d)) + + return SSHKeyList( + items=items, + current_page=pagination.get("current-page"), + total_pages=pagination.get("total-pages"), + prev_page=pagination.get("prev-page"), + next_page=pagination.get("next-page"), + total_count=pagination.get("total-count"), + ) + + def create(self, organization: str, options: SSHKeyCreateOptions) -> SSHKey: + """Create a new SSH key for the given organization.""" + if not valid_string_id(organization): + raise InvalidOrgError() + + attrs = options.model_dump(by_alias=True, exclude_none=True) + body: dict[str, Any] = { + "data": { + "attributes": attrs, + "type": "ssh-keys", + } + } + + r = self.t.request( + "POST", + f"/api/v2/organizations/{organization}/ssh-keys", + json_body=body, + ) + + jd = r.json() + data = jd.get("data", {}) + + return self._parse_ssh_key(data) + + def read(self, ssh_key_id: str) -> SSHKey: + """Read an SSH key by its ID.""" + if not valid_string_id(ssh_key_id): + raise InvalidSSHKeyIDError() + + r = self.t.request("GET", f"/api/v2/ssh-keys/{ssh_key_id}") + + jd = r.json() + data = jd.get("data", {}) + + return self._parse_ssh_key(data) + + def update(self, ssh_key_id: str, options: SSHKeyUpdateOptions) -> SSHKey: + """Update an SSH key.""" + if not valid_string_id(ssh_key_id): + raise InvalidSSHKeyIDError() + + attrs = options.model_dump(by_alias=True, exclude_none=True) + body: dict[str, Any] = { + "data": { + "attributes": attrs, + "type": "ssh-keys", + } + } + + r = self.t.request( + "PATCH", + f"/api/v2/ssh-keys/{ssh_key_id}", + json_body=body, + ) + + jd = r.json() + data = jd.get("data", {}) + + return self._parse_ssh_key(data) + + def delete(self, ssh_key_id: str) -> None: + """Delete an SSH key.""" + if not valid_string_id(ssh_key_id): + raise InvalidSSHKeyIDError() + + self.t.request("DELETE", f"/api/v2/ssh-keys/{ssh_key_id}") + # DELETE returns 204 No Content on success + + def _parse_ssh_key(self, data: dict[str, Any]) -> SSHKey: + """Parse SSH key data from API response.""" + attrs = data.get("attributes", {}) + attrs["id"] = data.get("id") + return SSHKey.model_validate(attrs) diff --git a/tests/units/test_ssh_keys.py b/tests/units/test_ssh_keys.py new file mode 100644 index 0000000..dde405c --- /dev/null +++ b/tests/units/test_ssh_keys.py @@ -0,0 +1,76 @@ +"""Test the SSH Keys functionality.""" + +from unittest.mock import Mock + +import pytest + +from src.tfe._http import HTTPTransport +from src.tfe.errors import ( + InvalidOrgError, + InvalidSSHKeyIDError, +) +from src.tfe.models.ssh_key import ( + SSHKeyCreateOptions, + SSHKeyUpdateOptions, +) +from src.tfe.resources.ssh_keys import SSHKeys + + +class TestSSHKeyParsing: + """Test the SSH key parsing functionality.""" + + @pytest.fixture + def ssh_keys_service(self): + """Create an SSHKeys service for testing parsing.""" + mock_transport = Mock(spec=HTTPTransport) + return SSHKeys(mock_transport) + + def test_parse_ssh_key_minimal(self, ssh_keys_service): + """Test _parse_ssh_key with minimal data.""" + data = { + "id": "sshkey-123", + "type": "ssh-keys", + "attributes": { + "name": "My SSH Key", + }, + } + ssh_key = ssh_keys_service._parse_ssh_key(data) + assert ssh_key.id == "sshkey-123" + assert ssh_key.name == "My SSH Key" + + +class TestSSHKeys: + """Test the SSH Keys service.""" + + @pytest.fixture + def ssh_keys_service(self): + """Create an SSHKeys service for testing.""" + mock_transport = Mock(spec=HTTPTransport) + return SSHKeys(mock_transport) + + def test_list_ssh_keys_invalid_org(self, ssh_keys_service): + """Test listing SSH keys with invalid organization.""" + with pytest.raises(InvalidOrgError): + ssh_keys_service.list("") + + def test_create_ssh_key_invalid_org(self, ssh_keys_service): + """Test creating SSH key with invalid organization.""" + options = SSHKeyCreateOptions(name="Test", value="ssh-rsa AAAAB3...") + with pytest.raises(InvalidOrgError): + ssh_keys_service.create("", options) + + def test_read_ssh_key_invalid_id(self, ssh_keys_service): + """Test reading SSH key with invalid ID.""" + with pytest.raises(InvalidSSHKeyIDError): + ssh_keys_service.read("") + + def test_update_ssh_key_invalid_id(self, ssh_keys_service): + """Test updating SSH key with invalid ID.""" + options = SSHKeyUpdateOptions(name="Updated") + with pytest.raises(InvalidSSHKeyIDError): + ssh_keys_service.update("", options) + + def test_delete_ssh_key_invalid_id(self, ssh_keys_service): + """Test deleting SSH key with invalid ID.""" + with pytest.raises(InvalidSSHKeyIDError): + ssh_keys_service.delete("")