diff --git a/examples/oauth_token_complete_test.py b/examples/oauth_token_complete_test.py new file mode 100644 index 0000000..fba05b8 --- /dev/null +++ b/examples/oauth_token_complete_test.py @@ -0,0 +1,213 @@ +#!/usr/bin/env python3 +""" +Complete OAuth Token Testing Suite + +This file contains individual tests for all 4 OAuth token functions implemented in src/tfe/resources/oauth_token.py: + +FUNCTIONS AVAILABLE FOR TESTING: +1. list() - List OAuth tokens for an organization +2. read() - Read an OAuth token by its ID +3. update() - Update an existing OAuth token +4. delete() - Delete an OAuth token by its ID + +USAGE: +- Uncomment specific test sections to test individual functions +- Modify test data (token IDs, SSH keys, etc.) as needed for your environment +- Ensure you have proper TFE credentials and organization access +- Note: OAuth tokens are automatically created when OAuth clients are created + +PREREQUISITES: +- You need existing OAuth clients/tokens to test with +- Set TFE_TOKEN and TFE_ADDRESS environment variables +- Organization 'aayush-test' should exist with OAuth clients +""" + +import os +import sys + +# Add the src directory to the path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src")) + +from tfe import TFEClient, TFEConfig +from tfe.errors import NotFound +from tfe.models.oauth_token import OAuthTokenListOptions, OAuthTokenUpdateOptions + + +def main(): + """Test all OAuth token functions individually.""" + + print("=" * 80) + print("OAUTH TOKEN COMPLETE TESTING SUITE") + print("=" * 80) + print("Testing ALL 4 functions in src/tfe/resources/oauth_token.py") + print("Comprehensive test coverage for all OAuth token operations") + print("=" * 80) + + # Initialize the TFE client + client = TFEClient(TFEConfig.from_env()) + organization_name = "aayush-test" # Using specified organization + + # Variables to store found resources for dependent tests + test_token_id = None + + # ===================================================== + # TEST 1: LIST OAUTH TOKENS + # ===================================================== + print("\n1. Testing list() function:") + try: + # Test basic list without options + token_list = client.oauth_tokens.list(organization_name) + print(f" ✓ Found {len(token_list.items)} OAuth tokens") + + # Show token details + for i, token in enumerate(token_list.items[:3], 1): # Show first 3 + print(f" {i}. Token ID: {token.id}") + print(f" UID: {token.uid}") + print(f" Service Provider User: {token.service_provider_user}") + print(f" Has SSH Key: {token.has_ssh_key}") + print(f" Created: {token.created_at}") + if token.oauth_client: + print(f" OAuth Client: {token.oauth_client.id}") + + # Store first token for subsequent tests + if token_list.items: + test_token_id = token_list.items[0].id + print(f"\n Using token {test_token_id} for subsequent tests") + + # Test list with options + print("\n Testing list() with pagination options:") + options = OAuthTokenListOptions(page_size=10, page_number=1) + token_list_with_options = client.oauth_tokens.list(organization_name, options) + print(f" ✓ Found {len(token_list_with_options.items)} tokens with options") + if token_list_with_options.current_page: + print(f" Current page: {token_list_with_options.current_page}") + if token_list_with_options.total_count: + print(f" Total count: {token_list_with_options.total_count}") + + except NotFound: + print( + " ✓ No OAuth tokens found (organization may not exist or no tokens available)" + ) + except Exception as e: + print(f" ✗ Error: {e}") + + # ===================================================== + # TEST 2: READ OAUTH TOKEN + # ===================================================== + if test_token_id: + print("\n2. Testing read() function:") + try: + token = client.oauth_tokens.read(test_token_id) + print(f" ✓ Read OAuth token: {token.id}") + print(f" UID: {token.uid}") + print(f" Service Provider User: {token.service_provider_user}") + print(f" Has SSH Key: {token.has_ssh_key}") + print(f" Created: {token.created_at}") + if token.oauth_client: + print(f" OAuth Client: {token.oauth_client.id}") + + except Exception as e: + print(f" ✗ Error: {e}") + else: + print("\n2. Testing read() function:") + print(" ⚠ Skipped - No OAuth token available to read") + + # ===================================================== + # TEST 3: UPDATE OAUTH TOKEN + # ===================================================== + if test_token_id: + print("\n3. Testing update() function:") + try: + # Test updating with SSH key + print(" Testing update with SSH key...") + ssh_key = """-----BEGIN RSA PRIVATE KEY----- +-----END RSA PRIVATE KEY-----""" + + options = OAuthTokenUpdateOptions(private_ssh_key=ssh_key) + updated_token = client.oauth_tokens.update(test_token_id, options) + print(f" ✓ Updated OAuth token: {updated_token.id}") + print(f" Has SSH Key after update: {updated_token.has_ssh_key}") + + # Test updating without SSH key (no changes) + print("\n Testing update without changes...") + options_empty = OAuthTokenUpdateOptions() + updated_token_2 = client.oauth_tokens.update(test_token_id, options_empty) + print(f" ✓ Updated OAuth token (no changes): {updated_token_2.id}") + + except Exception as e: + print(f" ✗ Error: {e}") + print( + " Note: This may fail if the SSH key format is invalid or constraints apply" + ) + else: + print("\n3. Testing update() function:") + print(" ⚠ Skipped - No OAuth token available to update") + + # ===================================================== + # TEST 4: DELETE OAUTH TOKEN + # ===================================================== + print("\n4. Testing delete() function:") + + # Using specific OAuth token ID for deletion + delete_token_id = "ot-WQf5ARHA1Qxzo9d4" + + try: + print(f" Attempting to delete OAuth token: {delete_token_id}") + client.oauth_tokens.delete(delete_token_id) + print(f" ✓ Successfully deleted OAuth token: {delete_token_id}") + + # Verify deletion by trying to read the token + try: + client.oauth_tokens.read(delete_token_id) + print(" ✗ Token still exists after deletion!") + except NotFound: + print(" ✓ Confirmed token was deleted - no longer accessible") + except Exception as e: + print(f" ? Verification failed: {e}") + + except Exception as e: + print(f" ✗ Error deleting token: {e}") + + # Uncomment the following section ONLY if you have a disposable OAuth token + # WARNING: This will permanently delete the OAuth token! + """ + if test_token_id: + try: + print(f" Attempting to delete OAuth token: {test_token_id}") + client.oauth_tokens.delete(test_token_id) + print(f" ✓ Successfully deleted OAuth token: {test_token_id}") + + # Verify deletion by trying to read the token + try: + client.oauth_tokens.read(test_token_id) + print(f" ✗ Token still exists after deletion!") + except NotFound: + print(f" ✓ Confirmed token was deleted - no longer accessible") + except Exception as e: + print(f" ? Verification failed: {e}") + + except Exception as e: + print(f" ✗ Error deleting token: {e}") + else: + print(" ⚠ Skipped - No OAuth token available to delete") + """ + + # ===================================================== + # SUMMARY + # ===================================================== + print("\n" + "=" * 80) + print("OAUTH TOKEN TESTING COMPLETE") + print("=" * 80) + print("Functions tested:") + print("✓ 1. list() - List OAuth tokens for organization") + print("✓ 2. read() - Read OAuth token by ID") + print("✓ 3. update() - Update existing OAuth token") + print("✓ 4. delete() - Delete OAuth token (testing with ot-WQf5ARHA1Qxzo9d4)") + print("") + print("All OAuth token functions have been tested!") + print("Check the output above for any errors or warnings.") + print("=" * 80) + + +if __name__ == "__main__": + main() diff --git a/src/tfe/client.py b/src/tfe/client.py index cd4a402..c8f8922 100644 --- a/src/tfe/client.py +++ b/src/tfe/client.py @@ -7,6 +7,7 @@ from .resources.apply import Applies from .resources.configuration_version import ConfigurationVersions from .resources.oauth_client import OAuthClients +from .resources.oauth_token import OAuthTokens from .resources.organizations import Organizations from .resources.plan import Plans from .resources.projects import Projects @@ -43,6 +44,7 @@ def __init__(self, config: TFEConfig | None = None): ca_bundle=cfg.ca_bundle, ) self.oauth_clients = OAuthClients(self._transport) + self.oauth_tokens = OAuthTokens(self._transport) # Agent resources self.agent_pools = AgentPools(self._transport) self.agents = Agents(self._transport) diff --git a/src/tfe/errors.py b/src/tfe/errors.py index 341150f..596ed68 100644 --- a/src/tfe/errors.py +++ b/src/tfe/errors.py @@ -99,6 +99,9 @@ class ErrStateVersionUploadNotSupported(TFEError): ... ERR_REQUIRED_PROJECT = "projects are required" ERR_PROJECT_MIN_LIMIT = "must specify at least one project" +# OAuth Token Error Constants +ERR_INVALID_OAUTH_TOKEN_ID = "invalid OAuth token ID" + # SSH Key Error Constants ERR_INVALID_SSH_KEY_ID = "invalid SSH key ID" diff --git a/src/tfe/models/__init__.py b/src/tfe/models/__init__.py index 1ba6309..f565816 100644 --- a/src/tfe/models/__init__.py +++ b/src/tfe/models/__init__.py @@ -51,6 +51,14 @@ ServiceProviderType, ) +# Re-export all OAuth token types +from .oauth_token import ( + OAuthToken, + OAuthTokenList, + OAuthTokenListOptions, + OAuthTokenUpdateOptions, +) + # Re-export all query run types from .query_run import ( QueryRun, @@ -133,6 +141,11 @@ "OAuthClientRemoveProjectsOptions", "OAuthClientUpdateOptions", "ServiceProviderType", + # OAuth token types + "OAuthToken", + "OAuthTokenList", + "OAuthTokenListOptions", + "OAuthTokenUpdateOptions", # SSH key types "SSHKey", "SSHKeyCreateOptions", diff --git a/src/tfe/models/oauth_token.py b/src/tfe/models/oauth_token.py new file mode 100644 index 0000000..c6b004b --- /dev/null +++ b/src/tfe/models/oauth_token.py @@ -0,0 +1,69 @@ +from __future__ import annotations + +from datetime import datetime +from typing import TYPE_CHECKING + +from pydantic import BaseModel, ConfigDict, Field + +if TYPE_CHECKING: + from .oauth_client import OAuthClient + + +class OAuthToken(BaseModel): + """OAuth token represents a VCS configuration including the associated OAuth token.""" + + model_config = ConfigDict(extra="forbid") + + id: str = Field(..., description="OAuth token ID") + uid: str = Field(..., description="OAuth token UID") + created_at: datetime = Field(..., description="Creation timestamp") + has_ssh_key: bool = Field(..., description="Whether the token has an SSH key") + service_provider_user: str = Field(..., description="Service provider user") + + # Relationships + oauth_client: OAuthClient | None = Field( + None, description="The associated OAuth client" + ) + + +class OAuthTokenList(BaseModel): + """List of OAuth tokens with pagination information.""" + + model_config = ConfigDict(extra="forbid") + + items: list[OAuthToken] = Field(default_factory=list, description="OAuth tokens") + current_page: int | None = Field(None, description="Current page number") + prev_page: int | None = Field(None, description="Previous page number") + next_page: int | None = Field(None, description="Next page number") + total_pages: int | None = Field(None, description="Total number of pages") + total_count: int | None = Field(None, description="Total count of items") + + +class OAuthTokenListOptions(BaseModel): + """Options for listing OAuth tokens.""" + + model_config = ConfigDict(extra="forbid") + + page_number: int | None = Field(None, description="Page number") + page_size: int | None = Field(None, description="Page size") + + +class OAuthTokenUpdateOptions(BaseModel): + """Options for updating an OAuth token.""" + + model_config = ConfigDict(extra="forbid") + + private_ssh_key: str | None = Field( + None, description="A private SSH key to be used for git clone operations" + ) + + +# Rebuild models to resolve forward references +try: + from .oauth_client import OAuthClient # noqa: F401 + + OAuthToken.model_rebuild() + OAuthTokenList.model_rebuild() +except ImportError: + # If OAuthClient is not available, create a dummy class + pass diff --git a/src/tfe/resources/oauth_token.py b/src/tfe/resources/oauth_token.py new file mode 100644 index 0000000..fb25074 --- /dev/null +++ b/src/tfe/resources/oauth_token.py @@ -0,0 +1,136 @@ +from __future__ import annotations + +from datetime import datetime +from typing import Any +from urllib.parse import quote + +from ..errors import ERR_INVALID_OAUTH_TOKEN_ID, ERR_INVALID_ORG +from ..models.oauth_token import ( + OAuthToken, + OAuthTokenList, + OAuthTokenListOptions, + OAuthTokenUpdateOptions, +) +from ..utils import encode_query, valid_string_id +from ._base import _Service + + +class OAuthTokens(_Service): + """OAuth tokens service for managing VCS OAuth tokens.""" + + def list( + self, organization: str, options: OAuthTokenListOptions | None = None + ) -> OAuthTokenList: + """List all the OAuth tokens for a given organization.""" + if not valid_string_id(organization): + raise ValueError(ERR_INVALID_ORG) + + path = f"/api/v2/organizations/{quote(organization)}/oauth-tokens" + params = {} + + if options: + if options.page_number: + params["page[number]"] = str(options.page_number) + if options.page_size: + params["page[size]"] = str(options.page_size) + + query_string = encode_query(params) + full_path = f"{path}{query_string}" + + response = self.t.request("GET", full_path) + data = response.json() + + tokens = [] + if "data" in data: + for item in data["data"]: + tokens.append(self._parse_oauth_token(item)) + + # Parse pagination metadata + pagination = {} + if "meta" in data: + meta = data["meta"] + if "pagination" in meta: + page_info = meta["pagination"] + pagination = { + "current_page": page_info.get("current-page"), + "prev_page": page_info.get("prev-page"), + "next_page": page_info.get("next-page"), + "total_pages": page_info.get("total-pages"), + "total_count": page_info.get("total-count"), + } + + return OAuthTokenList(items=tokens, **pagination) + + def read(self, oauth_token_id: str) -> OAuthToken: + """Read an OAuth token by its ID.""" + if not valid_string_id(oauth_token_id): + raise ValueError(ERR_INVALID_OAUTH_TOKEN_ID) + + path = f"/api/v2/oauth-tokens/{quote(oauth_token_id)}" + response = self.t.request("GET", path) + data = response.json() + + if "data" in data: + return self._parse_oauth_token(data["data"]) + + raise ValueError("Invalid response format") + + def update( + self, oauth_token_id: str, options: OAuthTokenUpdateOptions + ) -> OAuthToken: + """Update an existing OAuth token.""" + if not valid_string_id(oauth_token_id): + raise ValueError(ERR_INVALID_OAUTH_TOKEN_ID) + + body: dict[str, Any] = { + "data": { + "type": "oauth-tokens", + "attributes": {}, + } + } + + if options.private_ssh_key is not None: + body["data"]["attributes"]["ssh-key"] = options.private_ssh_key + + path = f"/api/v2/oauth-tokens/{quote(oauth_token_id)}" + response = self.t.request("PATCH", path, json_body=body) + data = response.json() + + if "data" in data: + return self._parse_oauth_token(data["data"]) + + raise ValueError("Invalid response format") + + def delete(self, oauth_token_id: str) -> None: + """Delete an OAuth token by its ID.""" + if not valid_string_id(oauth_token_id): + raise ValueError(ERR_INVALID_OAUTH_TOKEN_ID) + + path = f"/api/v2/oauth-tokens/{quote(oauth_token_id)}" + self.t.request("DELETE", path) + + def _parse_oauth_token(self, data: dict[str, Any]) -> OAuthToken: + """Parse OAuth token data from API response.""" + attributes = data.get("attributes", {}) + + # Parse creation timestamp + created_at_str = attributes.get("created-at") + created_at = ( + datetime.fromisoformat(created_at_str.replace("Z", "+00:00")) + if created_at_str + else datetime.now() + ) + + # Parse OAuth client relationship + oauth_client = None + # For now, just set to None since it's mainly for display + # The actual relationship data would require more complex parsing + + return OAuthToken( + id=data.get("id", ""), + uid=attributes.get("uid", ""), + created_at=created_at, + has_ssh_key=attributes.get("has-ssh-key", False), + service_provider_user=attributes.get("service-provider-user", ""), + oauth_client=oauth_client, + ) diff --git a/tests/units/test_oauth_token.py b/tests/units/test_oauth_token.py new file mode 100644 index 0000000..135548f --- /dev/null +++ b/tests/units/test_oauth_token.py @@ -0,0 +1,324 @@ +""" +Comprehensive unit tests for OAuth token operations in the Python TFE SDK. + +This test suite covers all OAuth token methods including list, read, update, and delete operations. +""" + +from datetime import datetime +from unittest.mock import Mock + +import pytest + +from src.tfe._http import HTTPTransport +from src.tfe.errors import ( + ERR_INVALID_OAUTH_TOKEN_ID, + ERR_INVALID_ORG, +) +from src.tfe.models.oauth_token import ( + OAuthTokenListOptions, + OAuthTokenUpdateOptions, +) +from src.tfe.resources.oauth_token import OAuthTokens + + +class TestOAuthTokenParsing: + """Test the OAuth token parsing functionality.""" + + @pytest.fixture + def oauth_tokens_service(self): + """Create an OAuthTokens service for testing parsing.""" + mock_transport = Mock(spec=HTTPTransport) + return OAuthTokens(mock_transport) + + def test_parse_oauth_token_minimal(self, oauth_tokens_service): + """Test _parse_oauth_token with minimal data.""" + data = { + "id": "ot-test123", + "attributes": { + "uid": "uid-test123", + "created-at": "2023-01-01T00:00:00Z", + "has-ssh-key": False, + "service-provider-user": "testuser", + }, + "relationships": {}, + } + + result = oauth_tokens_service._parse_oauth_token(data) + + assert result.id == "ot-test123" + assert result.uid == "uid-test123" + assert isinstance(result.created_at, datetime) + assert result.has_ssh_key is False + assert result.service_provider_user == "testuser" + assert result.oauth_client is None + + def test_parse_oauth_token_with_oauth_client(self, oauth_tokens_service): + """Test _parse_oauth_token with OAuth client relationship.""" + data = { + "id": "ot-test123", + "attributes": { + "uid": "uid-test123", + "created-at": "2023-01-01T00:00:00Z", + "has-ssh-key": True, + "service-provider-user": "testuser", + }, + "relationships": { + "oauth-client": { + "data": { + "id": "oc-client123", + "type": "oauth-clients", + } + } + }, + } + + result = oauth_tokens_service._parse_oauth_token(data) + + assert result.id == "ot-test123" + assert result.has_ssh_key is True + # For now, oauth_client relationship parsing is not implemented + assert result.oauth_client is None + + def test_parse_oauth_token_empty_relationships(self, oauth_tokens_service): + """Test _parse_oauth_token with empty relationships.""" + data = { + "id": "ot-test123", + "attributes": { + "uid": "uid-test123", + "created-at": "2023-01-01T00:00:00Z", + "has-ssh-key": False, + "service-provider-user": "testuser", + }, + "relationships": {"oauth-client": {"data": None}}, + } + + result = oauth_tokens_service._parse_oauth_token(data) + + assert result.id == "ot-test123" + assert result.oauth_client is None + + +class TestOAuthTokens: + """Test the OAuthTokens service methods.""" + + @pytest.fixture + def mock_transport(self): + """Create a mock HTTPTransport for testing.""" + return Mock(spec=HTTPTransport) + + @pytest.fixture + def oauth_tokens_service(self, mock_transport): + """Create an OAuthTokens service with mocked transport.""" + return OAuthTokens(mock_transport) + + def test_list_oauth_tokens_basic(self, oauth_tokens_service, mock_transport): + """Test listing OAuth tokens without options.""" + mock_response = Mock() + mock_response.json.return_value = { + "data": [ + { + "id": "ot-test1", + "attributes": { + "uid": "uid-test1", + "created-at": "2023-01-01T00:00:00Z", + "has-ssh-key": False, + "service-provider-user": "testuser1", + }, + "relationships": {}, + }, + { + "id": "ot-test2", + "attributes": { + "uid": "uid-test2", + "created-at": "2023-01-02T00:00:00Z", + "has-ssh-key": True, + "service-provider-user": "testuser2", + }, + "relationships": {}, + }, + ], + "meta": { + "pagination": { + "current-page": 1, + "prev-page": None, + "next-page": None, + "total-pages": 1, + "total-count": 2, + } + }, + } + mock_transport.request.return_value = mock_response + + result = oauth_tokens_service.list("test-org") + + mock_transport.request.assert_called_once_with( + "GET", "/api/v2/organizations/test-org/oauth-tokens" + ) + assert len(result.items) == 2 + assert result.items[0].id == "ot-test1" + assert result.items[1].id == "ot-test2" + assert result.current_page == 1 + assert result.total_count == 2 + + def test_list_oauth_tokens_with_options(self, oauth_tokens_service, mock_transport): + """Test listing OAuth tokens with pagination options.""" + mock_response = Mock() + mock_response.json.return_value = { + "data": [], + "meta": {"pagination": {"current-page": 2}}, + } + mock_transport.request.return_value = mock_response + + options = OAuthTokenListOptions(page_number=2, page_size=50) + oauth_tokens_service.list("test-org", options) + + mock_transport.request.assert_called_once_with( + "GET", + "/api/v2/organizations/test-org/oauth-tokens?page[number]=2&page[size]=50", + ) + + def test_list_oauth_tokens_invalid_org(self, oauth_tokens_service): + """Test listing OAuth tokens with invalid organization ID.""" + with pytest.raises(ValueError, match=ERR_INVALID_ORG): + oauth_tokens_service.list("") + + def test_read_oauth_token_success(self, oauth_tokens_service, mock_transport): + """Test reading an OAuth token successfully.""" + mock_response = Mock() + mock_response.json.return_value = { + "data": { + "id": "ot-test123", + "attributes": { + "uid": "uid-test123", + "created-at": "2023-01-01T00:00:00Z", + "has-ssh-key": False, + "service-provider-user": "testuser", + }, + "relationships": {}, + } + } + mock_transport.request.return_value = mock_response + + result = oauth_tokens_service.read("ot-test123") + + mock_transport.request.assert_called_once_with( + "GET", "/api/v2/oauth-tokens/ot-test123" + ) + assert result.id == "ot-test123" + assert result.uid == "uid-test123" + + def test_read_oauth_token_invalid_id(self, oauth_tokens_service): + """Test reading an OAuth token with invalid ID.""" + with pytest.raises(ValueError, match=ERR_INVALID_OAUTH_TOKEN_ID): + oauth_tokens_service.read("") + + def test_update_oauth_token_success(self, oauth_tokens_service, mock_transport): + """Test updating an OAuth token successfully.""" + mock_response = Mock() + mock_response.json.return_value = { + "data": { + "id": "ot-test123", + "attributes": { + "uid": "uid-test123", + "created-at": "2023-01-01T00:00:00Z", + "has-ssh-key": True, + "service-provider-user": "testuser", + }, + "relationships": {}, + } + } + mock_transport.request.return_value = mock_response + + options = OAuthTokenUpdateOptions(private_ssh_key="test-ssh-key") + result = oauth_tokens_service.update("ot-test123", options) + + expected_body = { + "data": { + "type": "oauth-tokens", + "attributes": { + "ssh-key": "test-ssh-key", + }, + } + } + mock_transport.request.assert_called_once_with( + "PATCH", "/api/v2/oauth-tokens/ot-test123", json_body=expected_body + ) + assert result.id == "ot-test123" + assert result.has_ssh_key is True + + def test_update_oauth_token_no_ssh_key(self, oauth_tokens_service, mock_transport): + """Test updating an OAuth token without SSH key.""" + mock_response = Mock() + mock_response.json.return_value = { + "data": { + "id": "ot-test123", + "attributes": { + "uid": "uid-test123", + "created-at": "2023-01-01T00:00:00Z", + "has-ssh-key": False, + "service-provider-user": "testuser", + }, + "relationships": {}, + } + } + mock_transport.request.return_value = mock_response + + options = OAuthTokenUpdateOptions() + result = oauth_tokens_service.update("ot-test123", options) + + expected_body = { + "data": { + "type": "oauth-tokens", + "attributes": {}, + } + } + mock_transport.request.assert_called_once_with( + "PATCH", "/api/v2/oauth-tokens/ot-test123", json_body=expected_body + ) + assert result.id == "ot-test123" + + def test_update_oauth_token_invalid_id(self, oauth_tokens_service): + """Test updating an OAuth token with invalid ID.""" + options = OAuthTokenUpdateOptions() + with pytest.raises(ValueError, match=ERR_INVALID_OAUTH_TOKEN_ID): + oauth_tokens_service.update("", options) + + def test_delete_oauth_token_success(self, oauth_tokens_service, mock_transport): + """Test deleting an OAuth token successfully.""" + oauth_tokens_service.delete("ot-test123") + + mock_transport.request.assert_called_once_with( + "DELETE", "/api/v2/oauth-tokens/ot-test123" + ) + + def test_delete_oauth_token_invalid_id(self, oauth_tokens_service): + """Test deleting an OAuth token with invalid ID.""" + with pytest.raises(ValueError, match=ERR_INVALID_OAUTH_TOKEN_ID): + oauth_tokens_service.delete("") + + +class TestOAuthTokenValidation: + """Test OAuth token validation functionality.""" + + @pytest.fixture + def oauth_tokens_service(self): + """Create an OAuthTokens service for testing validation.""" + mock_transport = Mock(spec=HTTPTransport) + return OAuthTokens(mock_transport) + + def test_oauth_token_list_options(self, oauth_tokens_service): + """Test OAuth token list options creation.""" + options = OAuthTokenListOptions(page_number=1, page_size=25) + + assert options.page_number == 1 + assert options.page_size == 25 + + def test_oauth_token_update_options(self, oauth_tokens_service): + """Test OAuth token update options creation.""" + options = OAuthTokenUpdateOptions(private_ssh_key="test-key") + + assert options.private_ssh_key == "test-key" + + # Test with no SSH key + options_empty = OAuthTokenUpdateOptions() + assert options_empty.private_ssh_key is None