Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 213 additions & 0 deletions examples/oauth_token_complete_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
#!/usr/bin/env python3
"""
Complete OAuth Token Testing Suite

This file contains individual tests for all 4 OAuth token functions implemented in src/tfe/resources/oauth_token.py:

FUNCTIONS AVAILABLE FOR TESTING:
1. list() - List OAuth tokens for an organization
2. read() - Read an OAuth token by its ID
3. update() - Update an existing OAuth token
4. delete() - Delete an OAuth token by its ID

USAGE:
- Uncomment specific test sections to test individual functions
- Modify test data (token IDs, SSH keys, etc.) as needed for your environment
- Ensure you have proper TFE credentials and organization access
- Note: OAuth tokens are automatically created when OAuth clients are created

PREREQUISITES:
- You need existing OAuth clients/tokens to test with
- Set TFE_TOKEN and TFE_ADDRESS environment variables
- Organization 'aayush-test' should exist with OAuth clients
"""

import os
import sys

# Add the src directory to the path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src"))

from tfe import TFEClient, TFEConfig
from tfe.errors import NotFound
from tfe.models.oauth_token import OAuthTokenListOptions, OAuthTokenUpdateOptions


def main():
"""Test all OAuth token functions individually."""

print("=" * 80)
print("OAUTH TOKEN COMPLETE TESTING SUITE")
print("=" * 80)
print("Testing ALL 4 functions in src/tfe/resources/oauth_token.py")
print("Comprehensive test coverage for all OAuth token operations")
print("=" * 80)

# Initialize the TFE client
client = TFEClient(TFEConfig.from_env())
organization_name = "aayush-test" # Using specified organization

# Variables to store found resources for dependent tests
test_token_id = None

# =====================================================
# TEST 1: LIST OAUTH TOKENS
# =====================================================
print("\n1. Testing list() function:")
try:
# Test basic list without options
token_list = client.oauth_tokens.list(organization_name)
print(f" ✓ Found {len(token_list.items)} OAuth tokens")

# Show token details
for i, token in enumerate(token_list.items[:3], 1): # Show first 3
print(f" {i}. Token ID: {token.id}")
print(f" UID: {token.uid}")
print(f" Service Provider User: {token.service_provider_user}")
print(f" Has SSH Key: {token.has_ssh_key}")
print(f" Created: {token.created_at}")
if token.oauth_client:
print(f" OAuth Client: {token.oauth_client.id}")

# Store first token for subsequent tests
if token_list.items:
test_token_id = token_list.items[0].id
print(f"\n Using token {test_token_id} for subsequent tests")

# Test list with options
print("\n Testing list() with pagination options:")
options = OAuthTokenListOptions(page_size=10, page_number=1)
token_list_with_options = client.oauth_tokens.list(organization_name, options)
print(f" ✓ Found {len(token_list_with_options.items)} tokens with options")
if token_list_with_options.current_page:
print(f" Current page: {token_list_with_options.current_page}")
if token_list_with_options.total_count:
print(f" Total count: {token_list_with_options.total_count}")

except NotFound:
print(
" ✓ No OAuth tokens found (organization may not exist or no tokens available)"
)
except Exception as e:
print(f" ✗ Error: {e}")

# =====================================================
# TEST 2: READ OAUTH TOKEN
# =====================================================
if test_token_id:
print("\n2. Testing read() function:")
try:
token = client.oauth_tokens.read(test_token_id)
print(f" ✓ Read OAuth token: {token.id}")
print(f" UID: {token.uid}")
print(f" Service Provider User: {token.service_provider_user}")
print(f" Has SSH Key: {token.has_ssh_key}")
print(f" Created: {token.created_at}")
if token.oauth_client:
print(f" OAuth Client: {token.oauth_client.id}")

except Exception as e:
print(f" ✗ Error: {e}")
else:
print("\n2. Testing read() function:")
print(" ⚠ Skipped - No OAuth token available to read")

# =====================================================
# TEST 3: UPDATE OAUTH TOKEN
# =====================================================
if test_token_id:
print("\n3. Testing update() function:")
try:
# Test updating with SSH key
print(" Testing update with SSH key...")
ssh_key = """-----BEGIN RSA PRIVATE KEY-----
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kindly remove the hardcoded ssh key in the file and pass it as env variable.

-----END RSA PRIVATE KEY-----"""

options = OAuthTokenUpdateOptions(private_ssh_key=ssh_key)
updated_token = client.oauth_tokens.update(test_token_id, options)
print(f" ✓ Updated OAuth token: {updated_token.id}")
print(f" Has SSH Key after update: {updated_token.has_ssh_key}")

# Test updating without SSH key (no changes)
print("\n Testing update without changes...")
options_empty = OAuthTokenUpdateOptions()
updated_token_2 = client.oauth_tokens.update(test_token_id, options_empty)
print(f" ✓ Updated OAuth token (no changes): {updated_token_2.id}")

except Exception as e:
print(f" ✗ Error: {e}")
print(
" Note: This may fail if the SSH key format is invalid or constraints apply"
)
else:
print("\n3. Testing update() function:")
print(" ⚠ Skipped - No OAuth token available to update")

# =====================================================
# TEST 4: DELETE OAUTH TOKEN
# =====================================================
print("\n4. Testing delete() function:")

# Using specific OAuth token ID for deletion
delete_token_id = "ot-WQf5ARHA1Qxzo9d4"

try:
print(f" Attempting to delete OAuth token: {delete_token_id}")
client.oauth_tokens.delete(delete_token_id)
print(f" ✓ Successfully deleted OAuth token: {delete_token_id}")

# Verify deletion by trying to read the token
try:
client.oauth_tokens.read(delete_token_id)
print(" ✗ Token still exists after deletion!")
except NotFound:
print(" ✓ Confirmed token was deleted - no longer accessible")
except Exception as e:
print(f" ? Verification failed: {e}")

except Exception as e:
print(f" ✗ Error deleting token: {e}")

# Uncomment the following section ONLY if you have a disposable OAuth token
# WARNING: This will permanently delete the OAuth token!
"""
if test_token_id:
try:
print(f" Attempting to delete OAuth token: {test_token_id}")
client.oauth_tokens.delete(test_token_id)
print(f" ✓ Successfully deleted OAuth token: {test_token_id}")

# Verify deletion by trying to read the token
try:
client.oauth_tokens.read(test_token_id)
print(f" ✗ Token still exists after deletion!")
except NotFound:
print(f" ✓ Confirmed token was deleted - no longer accessible")
except Exception as e:
print(f" ? Verification failed: {e}")

except Exception as e:
print(f" ✗ Error deleting token: {e}")
else:
print(" ⚠ Skipped - No OAuth token available to delete")
"""

# =====================================================
# SUMMARY
# =====================================================
print("\n" + "=" * 80)
print("OAUTH TOKEN TESTING COMPLETE")
print("=" * 80)
print("Functions tested:")
print("✓ 1. list() - List OAuth tokens for organization")
print("✓ 2. read() - Read OAuth token by ID")
print("✓ 3. update() - Update existing OAuth token")
print("✓ 4. delete() - Delete OAuth token (testing with ot-WQf5ARHA1Qxzo9d4)")
print("")
print("All OAuth token functions have been tested!")
print("Check the output above for any errors or warnings.")
print("=" * 80)


if __name__ == "__main__":
main()
2 changes: 2 additions & 0 deletions src/tfe/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from .resources.apply import Applies
from .resources.configuration_version import ConfigurationVersions
from .resources.oauth_client import OAuthClients
from .resources.oauth_token import OAuthTokens
from .resources.organizations import Organizations
from .resources.plan import Plans
from .resources.projects import Projects
Expand Down Expand Up @@ -43,6 +44,7 @@ def __init__(self, config: TFEConfig | None = None):
ca_bundle=cfg.ca_bundle,
)
self.oauth_clients = OAuthClients(self._transport)
self.oauth_tokens = OAuthTokens(self._transport)
# Agent resources
self.agent_pools = AgentPools(self._transport)
self.agents = Agents(self._transport)
Expand Down
3 changes: 3 additions & 0 deletions src/tfe/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ class ErrStateVersionUploadNotSupported(TFEError): ...
ERR_REQUIRED_PROJECT = "projects are required"
ERR_PROJECT_MIN_LIMIT = "must specify at least one project"

# OAuth Token Error Constants
ERR_INVALID_OAUTH_TOKEN_ID = "invalid OAuth token ID"

# SSH Key Error Constants
ERR_INVALID_SSH_KEY_ID = "invalid SSH key ID"

Expand Down
13 changes: 13 additions & 0 deletions src/tfe/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@
ServiceProviderType,
)

# Re-export all OAuth token types
from .oauth_token import (
OAuthToken,
OAuthTokenList,
OAuthTokenListOptions,
OAuthTokenUpdateOptions,
)

# Re-export all query run types
from .query_run import (
QueryRun,
Expand Down Expand Up @@ -133,6 +141,11 @@
"OAuthClientRemoveProjectsOptions",
"OAuthClientUpdateOptions",
"ServiceProviderType",
# OAuth token types
"OAuthToken",
"OAuthTokenList",
"OAuthTokenListOptions",
"OAuthTokenUpdateOptions",
# SSH key types
"SSHKey",
"SSHKeyCreateOptions",
Expand Down
69 changes: 69 additions & 0 deletions src/tfe/models/oauth_token.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from __future__ import annotations

from datetime import datetime
from typing import TYPE_CHECKING

from pydantic import BaseModel, ConfigDict, Field

if TYPE_CHECKING:
from .oauth_client import OAuthClient


class OAuthToken(BaseModel):
"""OAuth token represents a VCS configuration including the associated OAuth token."""

model_config = ConfigDict(extra="forbid")

id: str = Field(..., description="OAuth token ID")
uid: str = Field(..., description="OAuth token UID")
created_at: datetime = Field(..., description="Creation timestamp")
has_ssh_key: bool = Field(..., description="Whether the token has an SSH key")
service_provider_user: str = Field(..., description="Service provider user")

# Relationships
oauth_client: OAuthClient | None = Field(
None, description="The associated OAuth client"
)


class OAuthTokenList(BaseModel):
"""List of OAuth tokens with pagination information."""

model_config = ConfigDict(extra="forbid")

items: list[OAuthToken] = Field(default_factory=list, description="OAuth tokens")
current_page: int | None = Field(None, description="Current page number")
prev_page: int | None = Field(None, description="Previous page number")
next_page: int | None = Field(None, description="Next page number")
total_pages: int | None = Field(None, description="Total number of pages")
total_count: int | None = Field(None, description="Total count of items")


class OAuthTokenListOptions(BaseModel):
"""Options for listing OAuth tokens."""

model_config = ConfigDict(extra="forbid")

page_number: int | None = Field(None, description="Page number")
page_size: int | None = Field(None, description="Page size")


class OAuthTokenUpdateOptions(BaseModel):
"""Options for updating an OAuth token."""

model_config = ConfigDict(extra="forbid")

private_ssh_key: str | None = Field(
None, description="A private SSH key to be used for git clone operations"
)


# Rebuild models to resolve forward references
try:
from .oauth_client import OAuthClient # noqa: F401

OAuthToken.model_rebuild()
OAuthTokenList.model_rebuild()
except ImportError:
# If OAuthClient is not available, create a dummy class
pass
Loading
Loading