From c8ba1d648f0a7fb6a04859ffe8b8e3394c95465a Mon Sep 17 00:00:00 2001 From: Sivaselvan32 Date: Fri, 19 Sep 2025 13:48:01 +0530 Subject: [PATCH] Run Tasks and Run Triggers API Specs --- examples/run_task.py | 231 ++++++++++++++ examples/run_trigger.py | 237 +++++++++++++++ src/tfe/client.py | 4 + src/tfe/errors.py | 112 +++++-- src/tfe/models/agentpool.py | 7 + src/tfe/models/organization.py | 215 +++++++++++++ src/tfe/models/run_task.py | 93 ++++++ src/tfe/models/run_trigger.py | 52 ++++ src/tfe/models/workspace.py | 14 + src/tfe/models/workspace_run_task.py | 7 + src/tfe/resources/_base.py | 2 +- src/tfe/resources/run_task.py | 290 ++++++++++++++++++ src/tfe/resources/run_trigger.py | 167 ++++++++++ src/tfe/utils.py | 2 +- tests/units/test_run_task.py | 436 +++++++++++++++++++++++++++ tests/units/test_run_trigger.py | 365 ++++++++++++++++++++++ 16 files changed, 2211 insertions(+), 23 deletions(-) create mode 100644 examples/run_task.py create mode 100644 examples/run_trigger.py create mode 100644 src/tfe/models/agentpool.py create mode 100644 src/tfe/models/organization.py create mode 100644 src/tfe/models/run_task.py create mode 100644 src/tfe/models/run_trigger.py create mode 100644 src/tfe/models/workspace.py create mode 100644 src/tfe/models/workspace_run_task.py create mode 100644 src/tfe/resources/run_task.py create mode 100644 src/tfe/resources/run_trigger.py create mode 100644 tests/units/test_run_task.py create mode 100644 tests/units/test_run_trigger.py diff --git a/examples/run_task.py b/examples/run_task.py new file mode 100644 index 0000000..0418564 --- /dev/null +++ b/examples/run_task.py @@ -0,0 +1,231 @@ +import time +import traceback + +from tfe import TFEClient, TFEConfig +from tfe.models.run_task import ( + RunTaskCreateOptions, + RunTaskIncludeOptions, + RunTaskListOptions, + RunTaskReadOptions, + RunTaskUpdateOptions, +) + + +def run_task_list(client, org_name): + """Test run task list with all options combined.""" + print(f"=== Testing Run 
Task List Comprehensive Options for '{org_name}' ===") + + # List run tasks with all options + print("\n1. Listing Run Tasks with All Options Combined:") + try: + options = RunTaskListOptions( + page_number=1, + page_size=10, + include=[ + RunTaskIncludeOptions.RUN_TASK_WORKSPACE_TASKS, + RunTaskIncludeOptions.RUN_TASK_WORKSPACE, + ], + ) + + run_task_list = client.run_tasks.list(org_name, options) + run_tasks = list(run_task_list) + print(f" ✓ Found {len(run_tasks)} run tasks with comprehensive options") + + for i, task in enumerate(run_tasks, 1): + print(f" {i:2d}. {task.name}") + print(f" URL: {task.url}") + print(f" Category: {task.category}") + print(f" Enabled: {task.enabled}") + + # Show description if available + if task.description: + print(f" Description: {task.description}") + + # Show global configuration details + if task.global_configuration: + gc = task.global_configuration + print(" Global Config:") + print(f" - Enabled: {gc.enabled}") + print(f" - Enforcement: {gc.enforcement_level.value}") + if gc.stages: + stages = [stage.value for stage in gc.stages] + print(f" - Stages: {', '.join(stages)}") + + # Show relationships + if task.organization: + print(f" Organization: {task.organization.id}") + + if task.workspace_run_tasks: + print( + f" Workspace Run Tasks: {len(task.workspace_run_tasks)} items" + ) + + if task.agent_pool: + print(f" Agent Pool: {task.agent_pool.id}") + + except Exception as e: + print(f" Error listing run tasks comprehensively: {e}") + traceback.print_exc() + + +def run_task_create(client, org_name): + """Create a comprehensive run task that demonstrates all available features.""" + print("\n=== Creating Comprehensive Demonstration Run Task ===") + + try: + timestamp = int(time.time()) + + # Create the most comprehensive example possible + options = RunTaskCreateOptions( + name=f"comprehensive-demo-{timestamp}", + url="https://httpbin.org/post", + category="task", + description="A comprehensive demonstration task showcasing all 
available features and configurations", + enabled=True, + hmac_key=f"demo-secret-key-{timestamp}", + ) + + print("\n2. Creating task with the following configuration:") + created_task = client.run_tasks.create(org_name, options) + + print("\n ✓ Successfully created comprehensive run task!") + print(f" Task Name: {created_task.name}") + print(f" Task ID: {created_task.id}") + print(f" URL: {created_task.url}") + print(f" Enabled: {created_task.enabled}") + print(f" Description: {created_task.description}") + + # Display additional details + if created_task.organization: + print(f" Organization: {created_task.organization.id}") + + if created_task.hmac_key: + print(" HMAC Key: ***configured***") + + return created_task.id, created_task.name + + except Exception as e: + print(f" ✗ Error creating comprehensive run task: {e}") + return None, None + + +def run_task_read(client, task_id, task_name): + """Read and display details of a specific run task.""" + try: + print(f"\n4. Reading Run Task '{task_name}' (ID: {task_id})") + read_task = client.run_tasks.read(task_id) + + print("\n ✓ Successfully read run task:") + print(f" Task Name: {read_task.name}") + print(f" Task ID: {read_task.id}") + print(f" URL: {read_task.url}") + print(f" Category: {read_task.category}") + print(f" Enabled: {read_task.enabled}") + print(f" Description: {read_task.description or 'None'}") + print(f" HMAC Key: {'[SET]' if read_task.hmac_key else 'None'}") + + if read_task.organization: + print(f" Organization: {read_task.organization.id}") + + except Exception as e: + print(f" ✗ Error reading run task '{task_name}': {e}") + traceback.print_exc() + + +def run_task_read_with_options(client, task_id, task_name): + """Read a specific run task with include options.""" + try: + options = RunTaskReadOptions( + include=[RunTaskIncludeOptions.RUN_TASK_WORKSPACE_TASKS] + ) + print( + f"\n5. 
Reading Run Task '{task_name}' (ID: {task_id}) with includes: {options}" + ) + read_task_with_option = client.run_tasks.read_with_options(task_id, options) + + print("\n ✓ Successfully read run task with includes:") + print(f" Task Name: {read_task_with_option.name}") + print(f" Task ID: {read_task_with_option.id}") + print(f" URL: {read_task_with_option.url}") + print(f" Category: {read_task_with_option.category}") + + if RunTaskIncludeOptions.RUN_TASK_WORKSPACE_TASKS in options.include: + print( + " (Workspace tasks relationship data would be included in API response)" + ) + + if RunTaskIncludeOptions.RUN_TASK_WORKSPACE in options.include: + print(" (Workspace data would be included in API response)") + + except Exception as e: + print(f" ✗ Error reading run task '{task_name}' with includes: {e}") + traceback.print_exc() + + +def run_task_update(client, task_id): + """Update various fields of a specific run task.""" + print(f"\n=== Updating Run Task (ID: {task_id}) with Various Configurations ===") + + try: + # Update basic fields + print("\n3. Updating basic fields (name, description, url)...") + update_options = RunTaskUpdateOptions( + name=f"updated-name-{int(time.time())}", + description="Updated description for the run task", + url="https://httpbin.org/anything", + ) + updated_task = client.run_tasks.update(task_id, update_options) + + print(" Successfully updated basic fields:") + print(f" Name: {updated_task.name}") + print(f" Description: {updated_task.description}") + print(f" URL: {updated_task.url}") + + except Exception as e: + print(f" Error updating basic fields: {e}") + + +def run_task_delete(client, task_id, task_name): + """Delete a specific run task.""" + try: + print(f"\n6. 
Deleting Run Task '{task_name}' (ID: {task_id})") + client.run_tasks.delete(task_id) + print(f"\n ✓ Successfully deleted run task: {task_name} (ID: {task_id})") + return True + + except Exception as e: + print(f" ✗ Error deleting run task '{task_name}': {e}") + return False + + +def main(): + """Main function to demonstrate comprehensive run task list operations.""" + print("Run Task List - Comprehensive Example") + print("=" * 50) + + # Initialize client + config = TFEConfig() + client = TFEClient(config) + + # Replace 'your-org-name' with an actual organization name + org_name = "your-org-name" + + print(f"Using organization: {org_name}") + + try: + # Test comprehensive list operations + run_task_list(client, org_name) + task_id, task_name = run_task_create(client, org_name) + if task_id: + run_task_update(client, task_id) + if task_id and task_name: + run_task_read(client, task_id, task_name) + run_task_read_with_options(client, task_id, task_name) + run_task_delete(client, task_id, task_name) + + except Exception as e: + print(f"\n Example failed: {e}") + + +if __name__ == "__main__": + main() diff --git a/examples/run_trigger.py b/examples/run_trigger.py new file mode 100644 index 0000000..e0eec19 --- /dev/null +++ b/examples/run_trigger.py @@ -0,0 +1,237 @@ +import time +import traceback + +from tfe import TFEClient, TFEConfig +from tfe.models.run_trigger import ( + RunTriggerCreateOptions, + RunTriggerFilterOp, + RunTriggerIncludeOp, + RunTriggerListOptions, +) +from tfe.models.workspace import Workspace + + +def run_trigger_list(client, workspace_id): + """Test run trigger list with all options combined.""" + print( + f"=== Testing Run Trigger List Comprehensive Options for workspace '{workspace_id}' ===" + ) + + print("\n1. 
Listing Run Triggers with options:") + try: + options = RunTriggerListOptions( + page_number=1, + page_size=10, + run_trigger_type=RunTriggerFilterOp.RUN_TRIGGER_INBOUND, + include=[ + RunTriggerIncludeOp.RUN_TRIGGER_WORKSPACE, + RunTriggerIncludeOp.RUN_TRIGGER_SOURCEABLE, + ], + ) + + run_trigger_list = client.run_triggers.list(workspace_id, options) + run_triggers = list(run_trigger_list) + print( + f" ✓ Found {len(run_triggers)} inbound run triggers with comprehensive options" + ) + + for i, trigger in enumerate(run_triggers, 1): + print( + f" {i:2d}. Source: {trigger.sourceable_name} → Target: {trigger.workspace_name}" + ) + print(f" Trigger ID: {trigger.id}") + print(f" Created: {trigger.created_at}") + + # Show sourceable workspace details if available + if trigger.sourceable: + print( + f" Source Workspace: {trigger.sourceable.name} (ID: {trigger.sourceable.id})" + ) + if trigger.sourceable.organization: + print( + f" Source Organization: {trigger.sourceable.organization}" + ) + + # Show target workspace details if available + if trigger.workspace: + print( + f" Target Workspace: {trigger.workspace.name} (ID: {trigger.workspace.id})" + ) + if trigger.workspace.organization: + print( + f" Target Organization: {trigger.workspace.organization}" + ) + + # Also try listing outbound triggers (without include params - not supported) + print("\n Listing Outbound Run Triggers:") + outbound_options = RunTriggerListOptions( + page_number=1, + page_size=5, + run_trigger_type=RunTriggerFilterOp.RUN_TRIGGER_OUTBOUND, + ) + + outbound_triggers = list( + client.run_triggers.list(workspace_id, outbound_options) + ) + print(f" ✓ Found {len(outbound_triggers)} outbound run triggers") + + for i, trigger in enumerate(outbound_triggers, 1): + print( + f" {i:2d}. 
Source: {trigger.sourceable_name} → Target: {trigger.workspace_name}" + ) + + except Exception as e: + print(f" Error listing run triggers comprehensively: {e}") + traceback.print_exc() + + +def run_trigger_create(client, workspace_id, source_workspace_id): + """Create a comprehensive run trigger that demonstrates all available features.""" + print( + f"\n=== Creating Run Trigger from workspace '{source_workspace_id}' to '{workspace_id}' ===" + ) + + try: + source_workspace = Workspace( + id=source_workspace_id, + name=f"source-workspace-{int(time.time())}", + organization="prab-sandbox01", # This would typically be the actual org name + ) + + options = RunTriggerCreateOptions(sourceable=source_workspace) + + print("\n2. Creating run trigger with the following configuration:") + + created_trigger = client.run_triggers.create(workspace_id, options) + + print("\n ✓ Successfully created run trigger!") + print(f" Trigger ID: {created_trigger.id}") + print(f" Source: {created_trigger.sourceable_name}") + print(f" Target: {created_trigger.workspace_name}") + print(f" Created At: {created_trigger.created_at}") + + # Display additional details + if created_trigger.sourceable: + print( + f" Source Workspace: {created_trigger.sourceable.name} (ID: {created_trigger.sourceable.id})" + ) + + if created_trigger.workspace: + print( + f" Target Workspace: {created_trigger.workspace.name} (ID: {created_trigger.workspace.id})" + ) + + return ( + created_trigger.id, + created_trigger.sourceable_name, + created_trigger.workspace_name, + ) + + except Exception as e: + print(f" Error creating run trigger: {e}") + traceback.print_exc() + return None, None, None + + +def run_trigger_read(client, trigger_id, source_name, target_name): + """Read and display details of a specific run trigger.""" + try: + print( + f"\n3. 
Reading Run Trigger '{source_name} → {target_name}' (ID: {trigger_id})" + ) + read_trigger = client.run_triggers.read(trigger_id) + + print("\n ✓ Successfully read run trigger:") + print(f" Trigger ID: {read_trigger.id}") + print(f" Type: {read_trigger.type}") + print(f" Source: {read_trigger.sourceable_name}") + print(f" Target: {read_trigger.workspace_name}") + print(f" Created At: {read_trigger.created_at}") + + # Show detailed workspace information + if read_trigger.sourceable: + print(" Source Workspace Details:") + print(f" - Name: {read_trigger.sourceable.name}") + print(f" - ID: {read_trigger.sourceable.id}") + if read_trigger.sourceable.organization: + print(f" - Organization: {read_trigger.sourceable.organization}") + + if read_trigger.workspace: + print(" Target Workspace Details:") + print(f" - Name: {read_trigger.workspace.name}") + print(f" - ID: {read_trigger.workspace.id}") + if read_trigger.workspace.organization: + print(f" - Organization: {read_trigger.workspace.organization}") + + # Show sourceable choice if available + if read_trigger.sourceable_choice and read_trigger.sourceable_choice.workspace: + choice_ws = read_trigger.sourceable_choice.workspace + print(" Sourceable Choice Workspace:") + print(f" - Name: {choice_ws.name}") + print(f" - ID: {choice_ws.id}") + + except Exception as e: + print(f" Error reading run trigger '{source_name} → {target_name}': {e}") + traceback.print_exc() + + +def run_trigger_delete(client, trigger_id, source_name, target_name): + """Delete a specific run trigger.""" + try: + print( + f"\n4. 
Deleting Run Trigger '{source_name} → {target_name}' (ID: {trigger_id})" + ) + client.run_triggers.delete(trigger_id) + print( + f"\n ✓ Successfully deleted run trigger: {source_name} → {target_name} (ID: {trigger_id})" + ) + return True + + except Exception as e: + print(f" Error deleting run trigger '{source_name} → {target_name}': {e}") + traceback.print_exc() + return False + + +def main(): + """Main function to demonstrate comprehensive run trigger operations.""" + print("Run Trigger - Comprehensive Example") + print("=" * 50) + + # Initialize client + config = TFEConfig() + client = TFEClient(config) + + # Replace these with actual workspace IDs from your organization + target_workspace_id = "target_workspace_id" # Workspace that will receive triggers + source_workspace_id = "source_workspace_id" # Workspace that will trigger runs + + print(f"Using target workspace: {target_workspace_id}") + print(f"Using source workspace: {source_workspace_id}") + print( + "\nNOTE: Please replace these with actual workspace IDs from your organization" + ) + + try: + # Test comprehensive list operations + run_trigger_list(client, target_workspace_id) + + # Create a new run trigger + trigger_id, source_name, target_name = run_trigger_create( + client, target_workspace_id, source_workspace_id + ) + + # Read the created trigger + if trigger_id: + run_trigger_read(client, trigger_id, source_name, target_name) + + # Clean up - delete the created trigger + run_trigger_delete(client, trigger_id, source_name, target_name) + + except Exception as e: + print(f"\nExample failed: {e}") + traceback.print_exc() + + +if __name__ == "__main__": + main() diff --git a/src/tfe/client.py b/src/tfe/client.py index e7a2f92..1bea139 100644 --- a/src/tfe/client.py +++ b/src/tfe/client.py @@ -5,6 +5,8 @@ from .resources.organizations import Organizations from .resources.projects import Projects from .resources.registry_module import RegistryModules +from .resources.run_task import RunTasks +from 
.resources.run_trigger import RunTriggers from .resources.state_version_outputs import StateVersionOutputs from .resources.state_versions import StateVersions from .resources.variable import Variables @@ -36,6 +38,8 @@ def __init__(self, config: TFEConfig | None = None): self.state_versions = StateVersions(self._transport) self.state_version_outputs = StateVersionOutputs(self._transport) + self.run_tasks = RunTasks(self._transport) + self.run_triggers = RunTriggers(self._transport) def close(self) -> None: pass diff --git a/src/tfe/errors.py b/src/tfe/errors.py index be17b8a..8b6d82c 100644 --- a/src/tfe/errors.py +++ b/src/tfe/errors.py @@ -106,13 +106,6 @@ class WorkspaceValidationError(ValidationError): pass -class RequiredNameError(WorkspaceValidationError): - """Raised when workspace name is required but not provided.""" - - def __init__(self) -> None: - super().__init__("name is required") - - class InvalidNameError(WorkspaceValidationError): """Raised when workspace name is invalid.""" @@ -182,20 +175,6 @@ def __init__(self) -> None: # Parameter validation errors -class InvalidOrgError(WorkspaceValidationError): - """Raised when organization parameter is invalid.""" - - def __init__(self) -> None: - super().__init__("invalid value for organization") - - -class InvalidWorkspaceIDError(WorkspaceValidationError): - """Raised when workspace ID parameter is invalid.""" - - def __init__(self) -> None: - super().__init__("invalid value for workspace ID") - - class InvalidWorkspaceValueError(WorkspaceValidationError): """Raised when workspace name parameter is invalid.""" @@ -243,3 +222,94 @@ class MissingTagBindingIdentifierError(WorkspaceValidationError): def __init__(self) -> None: super().__init__("TagBindings are required") + + +class InvalidOrgError(InvalidValues): + """Raised when an invalid run task ID is provided.""" + + def __init__(self, message: str = "invalid value for organization"): + super().__init__(message) + + +class 
InvalidWorkspaceIDError(InvalidValues): + """Raised when an invalid run workspace ID is provided.""" + + def __init__(self, message: str = "invalid value for workspace ID"): + super().__init__(message) + + +class RequiredNameError(RequiredFieldMissing): + """Raised when a required name field is missing.""" + + def __init__(self, message: str = "name is required"): + super().__init__(message) + + +# Run Task errors +class InvalidRunTaskIDError(InvalidValues): + """Raised when an invalid run task ID is provided.""" + + def __init__(self, message: str = "invalid value for run task ID"): + super().__init__(message) + + +class InvalidRunTaskNameError(InvalidValues): + """Raised when an invalid run task name is provided.""" + + pass + + +class InvalidRunTaskURLError(InvalidValues): + """Raised when an invalid run task URL is provided.""" + + def __init__(self, message: str = "invalid url for run task URL"): + super().__init__(message) + + +class InvalidRunTaskCategoryError(InvalidValues): + """Raised when an invalid run task category is provided.""" + + def __init__(self, message: str = 'category must be "task"'): + super().__init__(message) + + +# Run Trigger errors +class RequiredRunTriggerListOpsError(RequiredFieldMissing): + """Raised when required run trigger list options are missing.""" + + def __init__(self, message: str = "required run trigger list options"): + super().__init__(message) + + +class RequiredSourceableError(RequiredFieldMissing): + """Raised when a required sourceable field is missing.""" + + def __init__(self, message: str = "sourceable is required"): + super().__init__(message) + + +class InvalidRunTriggerTypeError(InvalidValues): + """Raised when an invalid run trigger type is provided.""" + + def __init__( + self, + message: str = 'invalid value or no value for RunTriggerType. 
It must be either "inbound" or "outbound"', + ): + super().__init__(message) + + +class UnsupportedRunTriggerTypeError(InvalidValues): + """Raised when an unsupported run trigger type is provided with include params.""" + + def __init__( + self, + message: str = 'unsupported RunTriggerType. It must be "inbound" when requesting "include" query params', + ): + super().__init__(message) + + +class InvalidRunTriggerIDError(InvalidValues): + """Raised when an invalid run trigger ID is provided.""" + + def __init__(self, message: str = "invalid value for run trigger ID"): + super().__init__(message) diff --git a/src/tfe/models/agentpool.py b/src/tfe/models/agentpool.py new file mode 100644 index 0000000..2c1780c --- /dev/null +++ b/src/tfe/models/agentpool.py @@ -0,0 +1,7 @@ +from __future__ import annotations + +from pydantic import BaseModel + + +class AgentPool(BaseModel): + id: str diff --git a/src/tfe/models/organization.py b/src/tfe/models/organization.py new file mode 100644 index 0000000..8450ddf --- /dev/null +++ b/src/tfe/models/organization.py @@ -0,0 +1,215 @@ +from __future__ import annotations + +from datetime import datetime +from enum import Enum + +from pydantic import BaseModel, Field + + +class OrganizationUpdateOptions(BaseModel): + name: str | None = None + email: str | None = None + assessments_enforced: bool | None = None + collaborator_auth_policy: str | None = None + cost_estimation_enabled: bool | None = None + default_execution_mode: str | None = None + external_id: str | None = None + is_unified: bool | None = None + owners_team_saml_role_id: str | None = None + permissions: dict | None = None + saml_enabled: bool | None = None + session_remember: int | None = None + session_timeout: int | None = None + two_factor_conformant: bool | None = None + send_passing_statuses_for_untriggered_speculative_plans: bool | None = None + remaining_testable_count: int | None = None + speculative_plan_management_enabled: bool | None = None + 
aggregated_commit_status_enabled: bool | None = None + allow_force_delete_workspaces: bool | None = None + default_project: dict | None = None + default_agent_pool: dict | None = None + data_retention_policy: dict | None = None + data_retention_policy_choice: dict | None = None + + +class OrganizationCreateOptions(BaseModel): + name: str | None = None + email: str | None = None + assessments_enforced: bool | None = None + collaborator_auth_policy: str | None = None + cost_estimation_enabled: bool | None = None + default_execution_mode: str | None = None + external_id: str | None = None + is_unified: bool | None = None + owners_team_saml_role_id: str | None = None + permissions: dict | None = None + saml_enabled: bool | None = None + session_remember: int | None = None + session_timeout: int | None = None + two_factor_conformant: bool | None = None + send_passing_statuses_for_untriggered_speculative_plans: bool | None = None + remaining_testable_count: int | None = None + speculative_plan_management_enabled: bool | None = None + aggregated_commit_status_enabled: bool | None = None + allow_force_delete_workspaces: bool | None = None + default_project: dict | None = None + default_agent_pool: dict | None = None + data_retention_policy: dict | None = None + data_retention_policy_choice: dict | None = None + + +class ExecutionMode(str, Enum): + REMOTE = "remote" + AGENT = "agent" + LOCAL = "local" + + +class RunStatus(str, Enum): + PLANNING = "planning" + PLANNED = "planned" + APPLIED = "applied" + CANCELED = "canceled" + ERRORED = "errored" + + +class Organization(BaseModel): + name: str | None = None + assessments_enforced: bool | None = None + collaborator_auth_policy: str | None = None + cost_estimation_enabled: bool | None = None + created_at: datetime | None = None + default_execution_mode: str | None = None + email: str | None = None + external_id: str | None = None + id: str | None = None + is_unified: bool | None = None + owners_team_saml_role_id: str | None = 
None + permissions: dict | None = None + saml_enabled: bool | None = None + session_remember: int | None = None + session_timeout: int | None = None + trial_expires_at: datetime | None = None + two_factor_conformant: bool | None = None + send_passing_statuses_for_untriggered_speculative_plans: bool | None = None + remaining_testable_count: int | None = None + speculative_plan_management_enabled: bool | None = None + aggregated_commit_status_enabled: bool | None = None + allow_force_delete_workspaces: bool | None = None + default_project: dict | None = None + default_agent_pool: dict | None = None + data_retention_policy: dict | None = None + data_retention_policy_choice: dict | None = None + + +class Project(BaseModel): + id: str + name: str + organization: str + + +class Capacity(BaseModel): + organization: str + pending: int + running: int + + +class Entitlements(BaseModel): + id: str + agents: bool | None = None + audit_logging: bool | None = None + cost_estimation: bool | None = None + global_run_tasks: bool | None = None + operations: bool | None = None + private_module_registry: bool | None = None + private_run_tasks: bool | None = None + run_tasks: bool | None = None + sso: bool | None = None + sentinel: bool | None = None + state_storage: bool | None = None + teams: bool | None = None + vcs_integrations: bool | None = None + waypoint_actions: bool | None = None + waypoint_templates_and_addons: bool | None = None + + +class Run(BaseModel): + id: str + status: RunStatus + # Add other Run fields as needed + + +class Pagination(BaseModel): + current_page: int + total_count: int + # Add other pagination fields as needed + + +class RunQueue(BaseModel): + pagination: Pagination | None = None + items: list[Run] = Field(default_factory=list) + + +class ReadRunQueueOptions(BaseModel): + # List options for pagination + page_number: int | None = None + page_size: int | None = None + + +class DataRetentionPolicy(BaseModel): + """Deprecated: Use 
DataRetentionPolicyDeleteOlder instead.""" + + id: str + delete_older_than_n_days: int + + +class DataRetentionPolicyDeleteOlder(BaseModel): + id: str + delete_older_than_n_days: int + + +class DataRetentionPolicyDontDelete(BaseModel): + id: str + + +class DataRetentionPolicyChoice(BaseModel): + """Polymorphic data retention policy choice.""" + + data_retention_policy: DataRetentionPolicy | None = None + data_retention_policy_delete_older: DataRetentionPolicyDeleteOlder | None = None + data_retention_policy_dont_delete: DataRetentionPolicyDontDelete | None = None + + def is_populated(self) -> bool: + """Returns whether one of the choices is populated.""" + return ( + self.data_retention_policy is not None + or self.data_retention_policy_delete_older is not None + or self.data_retention_policy_dont_delete is not None + ) + + def convert_to_legacy_struct(self) -> DataRetentionPolicy | None: + """Convert the DataRetentionPolicyChoice to the legacy DataRetentionPolicy struct.""" + if not self.is_populated(): + return None + + if self.data_retention_policy is not None: + return self.data_retention_policy + elif self.data_retention_policy_delete_older is not None: + return DataRetentionPolicy( + id=self.data_retention_policy_delete_older.id, + delete_older_than_n_days=self.data_retention_policy_delete_older.delete_older_than_n_days, + ) + return None + + +class DataRetentionPolicySetOptions(BaseModel): + """Deprecated: Use DataRetentionPolicyDeleteOlderSetOptions instead.""" + + delete_older_than_n_days: int + + +class DataRetentionPolicyDeleteOlderSetOptions(BaseModel): + delete_older_than_n_days: int + + +class DataRetentionPolicyDontDeleteSetOptions(BaseModel): + pass # No additional fields needed diff --git a/src/tfe/models/run_task.py b/src/tfe/models/run_task.py new file mode 100644 index 0000000..afb83b3 --- /dev/null +++ b/src/tfe/models/run_task.py @@ -0,0 +1,93 @@ +from __future__ import annotations + +from enum import Enum + +from pydantic import BaseModel, 
Field + +from ..types import Pagination +from .agentpool import AgentPool +from .organization import Organization +from .workspace_run_task import WorkspaceRunTask + + +class RunTask(BaseModel): + id: str + name: str + description: str | None = None + url: str + category: str + hmac_key: str | None = None + enabled: bool + global_configuration: GlobalRunTask | None = None + + agent_pool: AgentPool | None = None + organization: Organization | None = None + workspace_run_tasks: list[WorkspaceRunTask] = Field(default_factory=list) + + +class GlobalRunTask(BaseModel): + enabled: bool + stages: list[Stage] = Field(default_factory=list) + enforcement_level: TaskEnforcementLevel + + +class GlobalRunTaskOptions(BaseModel): + enabled: bool | None = None + stages: list[Stage] | None = Field(default_factory=list) + enforcement_level: TaskEnforcementLevel | None = None + + +class Stage(str, Enum): + PRE_PLAN = "pre-plan" + POST_PLAN = "post-plan" + PRE_APPLY = "pre-apply" + POST_APPLY = "post-apply" + + +class TaskEnforcementLevel(str, Enum): + ADVISORY = "advisory" + MANDATORY = "mandatory" + + +class RunTaskIncludeOptions(str, Enum): + RUN_TASK_WORKSPACE_TASKS = "workspace_tasks" + RUN_TASK_WORKSPACE = "workspace_tasks.workspace" + + +class RunTaskList(BaseModel): + items: list[RunTask] = Field(default_factory=list) + pagination: Pagination | None = None + + +class RunTaskListOptions(BaseModel): + page_number: int | None = None + page_size: int | None = None + include: list[RunTaskIncludeOptions] | None = Field(default_factory=list) + + +class RunTaskReadOptions(BaseModel): + include: list[RunTaskIncludeOptions] | None = Field(default_factory=list) + + +class RunTaskCreateOptions(BaseModel): + type: str = Field(default="tasks") + name: str + description: str | None = None + url: str + category: str + hmac_key: str | None = None + enabled: bool = True + global_configuration: GlobalRunTaskOptions | None = None + agent_pool: AgentPool | None = None + + +class 
RunTaskUpdateOptions(BaseModel): + type: str = Field(default="tasks") + name: str | None = None + description: str | None = None + url: str | None = None + category: str | None = None + hmac_key: str | None = None + enabled: bool | None = None + global_configuration: GlobalRunTaskOptions | None = None + agent_pool: AgentPool | None = None diff --git a/src/tfe/models/run_trigger.py b/src/tfe/models/run_trigger.py new file mode 100644 index 0000000..bc3d55a --- /dev/null +++ b/src/tfe/models/run_trigger.py @@ -0,0 +1,52 @@ +from __future__ import annotations + +from datetime import datetime +from enum import Enum + +from pydantic import BaseModel, Field + +from ..types import Pagination +from .workspace import Workspace + + +class RunTrigger(BaseModel): + id: str + type: str = Field(default="run-triggers") + created_at: datetime + sourceable_name: str + workspace_name: str + + sourceable: Workspace + sourceable_choice: SourceableChoice + workspace: Workspace + + +class RunTriggerListOptions(BaseModel): + page_number: int | None = Field(default=1) + page_size: int | None = Field(default=20) + run_trigger_type: RunTriggerFilterOp + include: list[RunTriggerIncludeOp] | None = Field(default_factory=list) + + +class RunTriggerCreateOptions(BaseModel): + type: str = Field(default="run-triggers") + sourceable: Workspace + + +class RunTriggerList(BaseModel): + items: list[RunTrigger] + pagination: Pagination + + +class SourceableChoice(BaseModel): + workspace: Workspace + + +class RunTriggerFilterOp(str, Enum): + RUN_TRIGGER_OUTBOUND = "outbound" + RUN_TRIGGER_INBOUND = "inbound" + + +class RunTriggerIncludeOp(str, Enum): + RUN_TRIGGER_WORKSPACE = "workspace" + RUN_TRIGGER_SOURCEABLE = "sourceable" diff --git a/src/tfe/models/workspace.py b/src/tfe/models/workspace.py new file mode 100644 index 0000000..98b5ae7 --- /dev/null +++ b/src/tfe/models/workspace.py @@ -0,0 +1,14 @@ +from __future__ import annotations + +from pydantic import BaseModel, Field + +from .organization 
import ExecutionMode + + +class Workspace(BaseModel): + id: str + name: str + organization: str + execution_mode: ExecutionMode | None = None + project_id: str | None = None + tags: list[str] = Field(default_factory=list) diff --git a/src/tfe/models/workspace_run_task.py b/src/tfe/models/workspace_run_task.py new file mode 100644 index 0000000..b5072a6 --- /dev/null +++ b/src/tfe/models/workspace_run_task.py @@ -0,0 +1,7 @@ +from __future__ import annotations + +from pydantic import BaseModel + + +class WorkspaceRunTask(BaseModel): + id: str diff --git a/src/tfe/resources/_base.py b/src/tfe/resources/_base.py index fdcb99c..c14ebba 100644 --- a/src/tfe/resources/_base.py +++ b/src/tfe/resources/_base.py @@ -27,7 +27,7 @@ def _list( data = json_response.get("data", []) yield from data - if len(data) < p["page[size]"]: + if len(data) < int(p["page[size]"]): break page += 1 diff --git a/src/tfe/resources/run_task.py b/src/tfe/resources/run_task.py new file mode 100644 index 0000000..3fd2937 --- /dev/null +++ b/src/tfe/resources/run_task.py @@ -0,0 +1,290 @@ +from __future__ import annotations + +from collections.abc import Iterator +from typing import Any + +from ..errors import ( + InvalidOrgError, + InvalidRunTaskCategoryError, + InvalidRunTaskIDError, + InvalidRunTaskURLError, + RequiredNameError, +) +from ..models.agentpool import AgentPool +from ..models.organization import Organization +from ..models.run_task import ( + GlobalRunTask, + RunTask, + RunTaskCreateOptions, + RunTaskListOptions, + RunTaskReadOptions, + RunTaskUpdateOptions, + Stage, + TaskEnforcementLevel, +) +from ..models.workspace_run_task import WorkspaceRunTask +from ..utils import valid_string, valid_string_id +from ._base import _Service + + +def _safe_str(v: Any, default: str = "") -> str: + return v if isinstance(v, str) else (str(v) if v is not None else default) + + +def _run_task_from(d: dict[str, Any], org: str | None = None) -> RunTask: + """ + Convert JSON API response data to RunTask 
object. + + Maps the JSON API format to Python model fields, handling: + - Basic attributes (id, name, url, etc.) + - Optional fields (description, hmac_key) + - Global configuration object + - Relationships (agent_pool, organization, workspace_run_tasks) + """ + attr: dict[str, Any] = d.get("attributes", {}) or {} + relationships: dict[str, Any] = d.get("relationships", {}) or {} + + id_str: str = _safe_str(d.get("id")) + name_str: str = _safe_str(attr.get("name")) + + # Handle global configuration if present + global_config = None + raw_global = attr.get("global-configuration") + if raw_global and isinstance(raw_global, dict): + # Check if enabled exists - if not, no global config + if "enabled" in raw_global and isinstance(raw_global["enabled"], bool): + stages = [] + if "stages" in raw_global and isinstance(raw_global["stages"], list): + stages = [ + Stage(stage) + for stage in raw_global["stages"] + if isinstance(stage, str) + ] + + enforcement_level = TaskEnforcementLevel.ADVISORY # Default value + if "enforcement-level" in raw_global and isinstance( + raw_global["enforcement-level"], str + ): + try: + enforcement_level = TaskEnforcementLevel( + raw_global["enforcement-level"] + ) + except ValueError: + # If invalid enforcement level, use default + enforcement_level = TaskEnforcementLevel.ADVISORY + + global_config = GlobalRunTask( + enabled=raw_global["enabled"], + stages=stages, + enforcement_level=enforcement_level, + ) + + # Handle agent pool relationship + agent_pool = None + agent_pool_data = relationships.get("agent-pool", {}).get("data") + if agent_pool_data and isinstance(agent_pool_data, dict): + # Create minimal AgentPool object from relationship data + agent_pool = AgentPool(id=_safe_str(agent_pool_data.get("id"))) + + # Handle organization relationship + organization = None + org_data = relationships.get("organization", {}).get("data") + if org_data and isinstance(org_data, dict): + # Create minimal Organization object from relationship data + 
organization = Organization( + id=_safe_str(org_data.get("id")), + name=org or None, # Use org parameter or None + email=None, # Not available in relationship data + ) + + # Handle workspace run tasks relationship + workspace_run_tasks = [] + wrt_data = relationships.get("workspace-tasks", {}).get("data", []) + if isinstance(wrt_data, list): + # Note: Full WorkspaceRunTask objects would need to be fetched separately + # Here we just create minimal objects with IDs + for item in wrt_data: + if isinstance(item, dict) and "id" in item: + workspace_run_tasks.append( + WorkspaceRunTask(id=_safe_str(item.get("id"))) + ) + + return RunTask( + id=id_str, + name=name_str, + description=_safe_str(attr.get("description")) or None, + url=_safe_str(attr.get("url")), + category=_safe_str(attr.get("category")), + hmac_key=attr.get("hmac-key"), # Can be None + enabled=bool(attr.get("enabled")), + global_configuration=global_config, + agent_pool=agent_pool, + organization=organization, + workspace_run_tasks=workspace_run_tasks, + ) + + +class RunTasks(_Service): + def list( + self, organization_id: str, options: RunTaskListOptions | None = None + ) -> Iterator[RunTask]: + if not valid_string_id(organization_id): + raise InvalidOrgError() + + if options is None: + options = RunTaskListOptions() + + params: dict[str, str] = {} + if options.page_size is not None: + params["page[size]"] = str(options.page_size) + if options.page_number is not None: + params["page[number]"] = str(options.page_number) + if options.include: + params["include"] = ",".join(options.include) + + path = f"/api/v2/organizations/{organization_id}/tasks" + for item in self._list(path, params=params): + yield _run_task_from(item, organization_id) + + def create(self, organization_id: str, options: RunTaskCreateOptions) -> RunTask: + if not valid_string_id(organization_id): + raise InvalidOrgError() + if not valid_string(options.name): + raise RequiredNameError() + if not valid_string(options.url): + raise 
InvalidRunTaskURLError()
+        if options.category != "task":
+            raise InvalidRunTaskCategoryError("Invalid run task category; must be task")
+        body: dict[str, Any] = {
+            "data": {
+                "type": "tasks",
+                "attributes": {
+                    "name": options.name,
+                    "url": options.url,
+                    "category": options.category,
+                    "description": options.description or "",
+                },
+            },
+        }
+        if options.hmac_key is not None:
+            body["data"]["attributes"]["hmac-key"] = options.hmac_key
+        if options.enabled is not None:
+            body["data"]["attributes"]["enabled"] = options.enabled
+        if options.global_configuration is not None:
+            gc = options.global_configuration
+            gc_dict: dict[str, Any] = {}
+            if gc.enabled is not None:
+                gc_dict["enabled"] = gc.enabled
+            if gc.stages is not None:
+                gc_dict["stages"] = [stage.value for stage in gc.stages]
+            if gc.enforcement_level is not None:
+                gc_dict["enforcement-level"] = gc.enforcement_level.value
+            body["data"]["attributes"]["global-configuration"] = gc_dict
+        if options.agent_pool is not None and options.agent_pool.id:
+            body["data"]["relationships"] = {
+                "agent-pool": {
+                    "data": {"type": "agent-pools", "id": options.agent_pool.id}
+                }
+            }
+
+        r = self.t.request(
+            "POST",
+            f"/api/v2/organizations/{organization_id}/tasks",
+            json_body=body,
+        )
+        return _run_task_from(r.json()["data"], organization_id)
+
+    def read(self, run_task_id: str) -> RunTask:
+        return self.read_with_options(run_task_id, RunTaskReadOptions())
+
+    def read_with_options(
+        self, run_task_id: str, options: RunTaskReadOptions
+    ) -> RunTask:
+        if not valid_string_id(run_task_id):
+            raise InvalidRunTaskIDError()
+        params: dict[str, str] = {}
+        if options.include:
+            params["include"] = ",".join(options.include)
+
+        path = f"/api/v2/tasks/{run_task_id}"
+        r = self.t.request("GET", path, params=params)
+        return _run_task_from(r.json()["data"])
+
+    def update(self, run_task_id: str, options: RunTaskUpdateOptions) -> RunTask:
+        if not valid_string_id(run_task_id):
+            raise InvalidRunTaskIDError("Invalid run 
task ID")
+        if options.name is not None and not valid_string(options.name):
+            raise RequiredNameError()
+        if options.url is not None and not valid_string(options.url):
+            raise InvalidRunTaskURLError()
+        if options.category is not None and options.category != "task":
+            raise InvalidRunTaskCategoryError("Invalid run task category; must be task")
+        body: dict[str, Any] = {
+            "data": {"type": "tasks", "id": run_task_id, "attributes": {}}
+        }
+        if options.name is not None:
+            body["data"]["attributes"]["name"] = options.name
+        if options.url is not None:
+            body["data"]["attributes"]["url"] = options.url
+        if options.category is not None:
+            body["data"]["attributes"]["category"] = options.category
+        if options.description is not None:
+            body["data"]["attributes"]["description"] = options.description
+        if options.hmac_key is not None:
+            body["data"]["attributes"]["hmac-key"] = options.hmac_key
+        if options.enabled is not None:
+            body["data"]["attributes"]["enabled"] = options.enabled
+        if options.global_configuration is not None:
+            gc = options.global_configuration
+            gc_dict: dict[str, Any] = {}
+            if gc.enabled is not None:
+                gc_dict["enabled"] = gc.enabled
+            if gc.stages is not None:
+                gc_dict["stages"] = [stage.value for stage in gc.stages]
+            if gc.enforcement_level is not None:
+                gc_dict["enforcement-level"] = gc.enforcement_level.value
+            body["data"]["attributes"]["global-configuration"] = gc_dict
+        if options.agent_pool is not None:
+            body["data"].setdefault("relationships", {})
+            if options.agent_pool.id:
+                body["data"]["relationships"]["agent-pool"] = {
+                    "data": {"type": "agent-pools", "id": options.agent_pool.id}
+                }
+            else:
+                body["data"]["relationships"]["agent-pool"] = {"data": None}
+        r = self.t.request(
+            "PATCH",
+            f"/api/v2/tasks/{run_task_id}",
+            json_body=body,
+        )
+        return _run_task_from(r.json()["data"])
+
+    def delete(self, run_task_id: str) -> None:
+        if not valid_string_id(run_task_id):
+            raise InvalidRunTaskIDError()
+        self.t.request("DELETE", 
f"/api/v2/tasks/{run_task_id}") + + def attach_to_workspace( + self, + workspace_id: str, + run_task_id: str, + enforcement_level: TaskEnforcementLevel, + ) -> WorkspaceRunTask: + """ + Attach a run task to a workspace. + + This is a convenience method that creates a workspace run task relationship. + """ + # This would typically delegate to workspace_run_tasks.create() + # For now, we'll create a placeholder implementation + # In a real implementation, this would call: + """ + create_options = WorkspaceRunTaskCreateOptions( + enforcement_level=enforcement_level, + run_task=RunTask(id=run_task_id, name="", url="", category="task", enabled=True) + ) + return workspace_run_tasks.create(workspace_id, create_options) + """ + + # TODO: Implement actual workspace run task creation + raise NotImplementedError("attach_to_workspace method needs to be implemented") diff --git a/src/tfe/resources/run_trigger.py b/src/tfe/resources/run_trigger.py new file mode 100644 index 0000000..44ab8cd --- /dev/null +++ b/src/tfe/resources/run_trigger.py @@ -0,0 +1,167 @@ +from __future__ import annotations + +import builtins +from collections.abc import Iterator +from datetime import datetime +from typing import Any + +from ..errors import ( + InvalidRunTriggerIDError, + InvalidRunTriggerTypeError, + InvalidWorkspaceIDError, + RequiredRunTriggerListOpsError, + RequiredSourceableError, + UnsupportedRunTriggerTypeError, +) +from ..models.run_trigger import ( + RunTrigger, + RunTriggerCreateOptions, + RunTriggerFilterOp, + RunTriggerIncludeOp, + RunTriggerListOptions, + SourceableChoice, +) +from ..models.workspace import Workspace +from ..utils import valid_string_id +from ._base import _Service + + +def _safe_str(v: Any, default: str = "") -> str: + return v if isinstance(v, str) else (str(v) if v is not None else default) + + +def _run_trigger_from(d: dict[str, Any], org: str | None = None) -> RunTrigger: + attr: dict[str, Any] = d.get("attributes", {}) or {} + relationships: dict[str, 
Any] = d.get("relationships", {}) or {} + + id_str: str = d.get("id", "") + created_at_str: str = _safe_str(attr.get("created-at")) + sourceable_name_str: str = _safe_str(attr.get("sourceable-name")) + workspace_name_str: str = _safe_str(attr.get("workspace-name")) + + # Extract workspace ID from relationships + workspace_id = "" + workspace_rel = relationships.get("workspace", {}) + if workspace_rel and "data" in workspace_rel: + workspace_id = workspace_rel["data"].get("id", "") + + # Extract sourceable ID from relationships + sourceable_id = "" + sourceable_rel = relationships.get("sourceable", {}) + if sourceable_rel and "data" in sourceable_rel: + sourceable_id = sourceable_rel["data"].get("id", "") + + # Create workspace objects with proper IDs + workspace = Workspace( + id=workspace_id, name=workspace_name_str, organization=org or "" + ) + sourceable = Workspace( + id=sourceable_id, name=sourceable_name_str, organization=org or "" + ) + sourceable_choice = SourceableChoice( + workspace=sourceable + ) # Should reference sourceable, not workspace + + # Parse created_at as datetime + created_at = ( + datetime.fromisoformat(created_at_str.replace("Z", "+00:00")) + if created_at_str + else datetime.now() + ) + + return RunTrigger( + id=id_str, + created_at=created_at, + sourceable_name=sourceable_name_str, + workspace_name=workspace_name_str, + sourceable=sourceable, + sourceable_choice=sourceable_choice, + workspace=workspace, + ) + + +class RunTriggers(_Service): + def list( + self, workspace_id: str, options: RunTriggerListOptions + ) -> Iterator[RunTrigger]: + if not valid_string_id(workspace_id): + raise InvalidWorkspaceIDError() + if not options: + raise RequiredRunTriggerListOpsError() + self.validate_run_trigger_filter_param( + options.run_trigger_type, options.include or [] + ) + params: dict[str, str] = {} + if options.page_size is not None: + params["page[size]"] = str(options.page_size) + if options.page_number is not None: + params["page[number]"] = 
str(options.page_number) + if options.run_trigger_type: + params["filter[run-trigger][type]"] = options.run_trigger_type.value + if options.include: + params["include"] = ",".join(options.include) + + path = f"/api/v2/workspaces/{workspace_id}/run-triggers" + for item in self._list(path, params=params): + rt = _run_trigger_from(item) + self.backfill_deprecated_sourceable(rt) + yield rt + + def create(self, workspace_id: str, options: RunTriggerCreateOptions) -> RunTrigger: + if not valid_string_id(workspace_id): + raise InvalidWorkspaceIDError() + if options.sourceable is None: + raise RequiredSourceableError() + body: dict[str, Any] = { + "data": { + "relationships": { + "sourceable": { + "data": {"type": "workspaces", "id": options.sourceable.id} + } + } + } + } + + r = self.t.request( + "POST", + f"/api/v2/workspaces/{workspace_id}/run-triggers", + json_body=body, + ) + rt = _run_trigger_from(r.json()["data"]) + self.backfill_deprecated_sourceable(rt) + return rt + + def read(self, run_trigger_id: str) -> RunTrigger: + if not valid_string_id(run_trigger_id): + raise InvalidRunTriggerIDError() + path = f"/api/v2/run-triggers/{run_trigger_id}" + r = self.t.request("GET", path) + rt = _run_trigger_from(r.json()["data"]) + self.backfill_deprecated_sourceable(rt) + return rt + + def delete(self, run_trigger_id: str) -> None: + if not valid_string_id(run_trigger_id): + raise InvalidRunTriggerIDError() + path = f"/api/v2/run-triggers/{run_trigger_id}" + self.t.request("DELETE", path) + return None + + def validate_run_trigger_filter_param( + self, + filter_param: RunTriggerFilterOp, + include_param: builtins.list[RunTriggerIncludeOp], + ) -> None: + if filter_param not in RunTriggerFilterOp: + raise InvalidRunTriggerTypeError() + if len(include_param) > 0: + if filter_param != RunTriggerFilterOp.RUN_TRIGGER_INBOUND: + raise UnsupportedRunTriggerTypeError() + return None + + def backfill_deprecated_sourceable(self, rt: RunTrigger) -> None: + if rt.sourceable or not 
rt.sourceable_choice: + return + + rt.sourceable = rt.sourceable_choice.workspace + return None diff --git a/src/tfe/utils.py b/src/tfe/utils.py index e99eea5..d8be76d 100644 --- a/src/tfe/utils.py +++ b/src/tfe/utils.py @@ -73,7 +73,7 @@ def encode_query(params: Mapping[str, Any] | None) -> str: for k, v in params.items(): if v is None: continue - if isinstance(v, (list, tuple)): + if isinstance(v, (list | tuple)): sv = ",".join(str(x) for x in v) else: sv = str(v) diff --git a/tests/units/test_run_task.py b/tests/units/test_run_task.py new file mode 100644 index 0000000..9497e36 --- /dev/null +++ b/tests/units/test_run_task.py @@ -0,0 +1,436 @@ +"""Unit tests for the run task module.""" + +from unittest.mock import Mock, patch + +import pytest + +from tfe._http import HTTPTransport +from tfe.errors import ( + InvalidOrgError, + InvalidRunTaskCategoryError, + InvalidRunTaskIDError, + InvalidRunTaskURLError, + RequiredNameError, +) +from tfe.models.agentpool import AgentPool +from tfe.models.run_task import ( + GlobalRunTaskOptions, + RunTaskCreateOptions, + RunTaskIncludeOptions, + RunTaskListOptions, + RunTaskReadOptions, + RunTaskUpdateOptions, + Stage, + TaskEnforcementLevel, +) +from tfe.resources.run_task import RunTasks, _run_task_from + + +class TestRunTaskFrom: + """Test the _run_task_from function.""" + + def test_run_task_from_comprehensive(self): + """Test _run_task_from with various data scenarios.""" + + # Testdata with all fields populated + data = { + "id": "task-123", + "attributes": { + "name": "Test Task", + "url": "https://example.com/webhook", + "category": "task", + "enabled": True, + "global-configuration": { + "enabled": True, + "stages": ["pre-plan", "post-apply"], + "enforcement-level": "mandatory", + }, + }, + "relationships": { + "agent-pool": {"data": {"id": "apool-123", "type": "agent-pools"}}, + "organization": {"data": {"id": "org-123", "type": "organizations"}}, + "workspace-tasks": { + "data": [ + {"id": "wstask-1", "type": 
"workspace-tasks"}, + {"id": "wstask-2", "type": "workspace-tasks"}, + ] + }, + }, + } + + result = _run_task_from(data, org="org-123") + + assert result.id == "task-123" + assert result.name == "Test Task" + assert result.url == "https://example.com/webhook" + assert result.category == "task" + assert result.enabled is True + assert result.description is None + assert result.hmac_key is None + assert result.global_configuration is not None + assert result.global_configuration.enabled is True + assert result.global_configuration.stages == [Stage.PRE_PLAN, Stage.POST_APPLY] + assert ( + result.global_configuration.enforcement_level + == TaskEnforcementLevel.MANDATORY + ) + assert result.agent_pool is not None + assert result.agent_pool.id == "apool-123" + assert result.organization is not None + assert result.organization.id == "org-123" + assert result.organization.name == "org-123" + assert isinstance(result.workspace_run_tasks, list) + assert len(result.workspace_run_tasks) == 2 + assert result.workspace_run_tasks[0].id == "wstask-1" + assert result.workspace_run_tasks[1].id == "wstask-2" + + +class TestRunTasks: + """Test the RunTasks.list method.""" + + @pytest.fixture + def mock_transport(self): + """Create a mock HTTPTransport.""" + return Mock(spec=HTTPTransport) + + @pytest.fixture + def run_tasks_service(self, mock_transport): + """Create a RunTasks service with mocked transport.""" + return RunTasks(mock_transport) + + def test_list_run_task(self, run_tasks_service): + """Test cases for list method with various scenarios.""" + + # Test 1: Invalid organization ID should raise error + with pytest.raises(InvalidOrgError): + list(run_tasks_service.list("", None)) + + with pytest.raises(InvalidOrgError): + list(run_tasks_service.list(None, None)) + + # Test 2: Default options (no options provided) + mock_data = [ + { + "id": "task-1", + "attributes": { + "name": "Task 1", + "url": "https://example.com/webhook1", + "category": "task", + "enabled": True, + }, + 
}, + { + "id": "task-2", + "attributes": { + "name": "Task 2", + "url": "https://example.com/webhook2", + "category": "task", + "enabled": False, + }, + }, + ] + + with patch.object(run_tasks_service, "_list") as mock_list: + mock_list.return_value = iter(mock_data) + + result = list(run_tasks_service.list("org-123")) + + # Verify _list was called with correct parameters + mock_list.assert_called_once_with( + "/api/v2/organizations/org-123/tasks", params={} + ) + + # Verify results + assert len(result) == 2 + assert result[0].id == "task-1" + assert result[0].name == "Task 1" + assert result[1].id == "task-2" + assert result[1].name == "Task 2" + + # Test 3: All options combined (includes pagination and multiple include options) + options = RunTaskListOptions( + page_number=3, + page_size=25, + include=[ + RunTaskIncludeOptions.RUN_TASK_WORKSPACE_TASKS, + RunTaskIncludeOptions.RUN_TASK_WORKSPACE, + ], + ) + + with patch.object(run_tasks_service, "_list") as mock_list: + mock_list.return_value = iter([]) + + list(run_tasks_service.list("org-complete", options)) + + mock_list.assert_called_once_with( + "/api/v2/organizations/org-complete/tasks", + params={ + "page[number]": "3", + "page[size]": "25", + "include": "workspace_tasks,workspace_tasks.workspace", + }, + ) + + # Test 4: Method returns iterator + with patch.object(run_tasks_service, "_list") as mock_list: + mock_list.return_value = iter([]) + + result = run_tasks_service.list("org-iterator") + + # Verify it's an iterator + assert hasattr(result, "__iter__") + assert hasattr(result, "__next__") + + def test_create_run_task(self, run_tasks_service): + """Test cases for create method with various scenarios.""" + + # Test 1: Missing name should raise error + with pytest.raises(RequiredNameError): + options = RunTaskCreateOptions( + name="", # Empty string should trigger our validation + url="https://example.com/webhook", + category="task", + ) + run_tasks_service.create("org-123", options) + + # Test 2: Missing 
URL should raise error + with pytest.raises(InvalidRunTaskURLError): + options = RunTaskCreateOptions( + name="Test Task", + url="", # Empty string should trigger our validation + category="task", + ) + run_tasks_service.create("org-123", options) + + # Test 3: Invalid category should raise error + with pytest.raises(InvalidRunTaskCategoryError): + options = RunTaskCreateOptions( + name="Test Task", url="https://example.com/webhook", category="invalid" + ) + run_tasks_service.create("org-123", options) + + # Test 4: Create with all optional fields + mock_response_data_full = { + "id": "task-456", + "attributes": { + "name": "Advanced Task", + "url": "https://example.com/advanced-webhook", + "category": "task", + "enabled": False, + "description": "Advanced task description", + "hmac_key": "secret-key-123", + "global-configuration": { + "enabled": True, + "stages": ["pre-plan", "post-plan"], + "enforcement-level": "mandatory", + }, + }, + "relationships": { + "agent-pool": {"data": {"type": "agent_pools", "id": "apool-123"}} + }, + } + + mock_response_full = Mock() + mock_response_full.json.return_value = {"data": mock_response_data_full} + + with patch.object(run_tasks_service, "t") as mock_transport: + mock_transport.request.return_value = mock_response_full + + options = RunTaskCreateOptions( + name="Advanced Task", + url="https://example.com/advanced-webhook", + category="task", + description="Advanced task description", + hmac_key="secret-key-123", + enabled=False, + global_configuration=GlobalRunTaskOptions( + enabled=True, + stages=[Stage.PRE_PLAN, Stage.POST_PLAN], + enforcement_level=TaskEnforcementLevel.MANDATORY, + ), + agent_pool=AgentPool(id="apool-123"), + ) + + result = run_tasks_service.create("org-456", options) + + # Verify request was made correctly + mock_transport.request.assert_called_once() + call_args = mock_transport.request.call_args + + assert call_args[0][0] == "POST" # HTTP method + assert call_args[0][1] == 
"/api/v2/organizations/org-456/tasks" # URL + + # Verify response + assert result.id == "task-456" + assert result.name == "Advanced Task" + assert result.description == "Advanced task description" + assert result.enabled is False + + def test_delete_run_task(self, run_tasks_service): + """Test case for run task delete operations.""" + + with patch.object(run_tasks_service, "t") as mock_transport: + mock_transport.request.return_value = None # DELETE returns no content + + run_tasks_service.delete("task-123") + + # Verify request was made correctly + mock_transport.request.assert_called_once_with( + "DELETE", "/api/v2/tasks/task-123" + ) + + def test_read_run_task(self, run_tasks_service): + """Test cases for RunTask read operations.""" + + # Mock response for read request with included relationships + mock_response = Mock() + mock_response.json.return_value = { + "data": { + "id": "task-123", + "type": "tasks", + "attributes": { + "name": "test-task", + "url": "https://example.com/task", + "description": "Test task description", + "category": "task", + "enabled": True, + "hmac-key": "secret-key", + }, + "relationships": { + "organization": {"data": {"id": "org-123", "type": "organizations"}} + }, + "links": {"self": "/api/v2/tasks/task-123"}, + }, + "included": [ + { + "id": "org-123", + "type": "organizations", + "attributes": {"name": "test-org"}, + } + ], + } + + with patch.object(run_tasks_service, "t") as mock_transport: + mock_transport.request.return_value = mock_response + + result = run_tasks_service.read("task-123") + + # Verify request was made correctly + mock_transport.request.assert_called_once_with( + "GET", "/api/v2/tasks/task-123", params={} + ) + + # Verify returned data + assert result.id == "task-123" + assert result.name == "test-task" + assert result.url == "https://example.com/task" + assert result.description == "Test task description" + assert result.category == "task" + assert result.enabled is True + assert result.hmac_key == 
"secret-key" + + options = RunTaskReadOptions( + include=[RunTaskIncludeOptions.RUN_TASK_WORKSPACE_TASKS] + ) + + with patch.object(run_tasks_service, "t") as mock_transport: + mock_transport.request.return_value = mock_response + + result = run_tasks_service.read_with_options("task-123", options) + + # Verify request was made with include parameter + mock_transport.request.assert_called_once_with( + "GET", "/api/v2/tasks/task-123", params={"include": "workspace_tasks"} + ) + + """Test read method with multiple include options.""" + + options = RunTaskReadOptions( + include=[ + RunTaskIncludeOptions.RUN_TASK_WORKSPACE_TASKS, + RunTaskIncludeOptions.RUN_TASK_WORKSPACE, + ] + ) + + with patch.object(run_tasks_service, "t") as mock_transport: + mock_transport.request.return_value = mock_response + + result = run_tasks_service.read_with_options("task-123", options) + + # Verify request was made with multiple includes + mock_transport.request.assert_called_once_with( + "GET", + "/api/v2/tasks/task-123", + params={"include": "workspace_tasks,workspace_tasks.workspace"}, + ) + + def test_update_task_all_fields(self, run_tasks_service): + """Test cases for RunTask update operations.""" + + mock_response = Mock() + mock_response.json.return_value = { + "data": { + "id": "task-123", + "type": "tasks", + "attributes": { + "name": "comprehensive-update", + "url": "https://updated-example.com/webhook", + "description": "Comprehensive update test", + "category": "task", + "enabled": False, + "hmac-key": "new-secret-key", + }, + } + } + + options = RunTaskUpdateOptions( + name="comprehensive-update", + description="Comprehensive update test", + url="https://updated-example.com/webhook", + category="task", + hmac_key="new-secret-key", + enabled=False, + ) + + with patch.object(run_tasks_service, "t") as mock_transport: + mock_transport.request.return_value = mock_response + + result = run_tasks_service.update("task-123", options) + + # Verify comprehensive request body + call_args 
= mock_transport.request.call_args + assert call_args[0] == ("PATCH", "/api/v2/tasks/task-123") + + assert result.id == "task-123" + assert result.name == "comprehensive-update" + assert result.url == "https://updated-example.com/webhook" + assert result.description == "Comprehensive update test" + assert result.category == "task" + assert result.enabled is False + assert result.hmac_key == "new-secret-key" + assert result.organization is None + assert result.workspace_run_tasks == [] + + def test_update_task_validation_errors(self, run_tasks_service): + """Test update method validation errors.""" + + # Test invalid task ID + options = RunTaskUpdateOptions(name="test-update") + + with pytest.raises(InvalidRunTaskIDError): + run_tasks_service.update("", options) + + # Test invalid name + options = RunTaskUpdateOptions(name="") + with pytest.raises(RequiredNameError): + run_tasks_service.update("task-123", options) + + # Test invalid URL + options = RunTaskUpdateOptions(url="") + with pytest.raises(InvalidRunTaskURLError): + run_tasks_service.update("task-123", options) + + # Test invalid category + options = RunTaskUpdateOptions(category="invalid-category") + with pytest.raises(InvalidRunTaskCategoryError): + run_tasks_service.update("task-123", options) diff --git a/tests/units/test_run_trigger.py b/tests/units/test_run_trigger.py new file mode 100644 index 0000000..a0cbfda --- /dev/null +++ b/tests/units/test_run_trigger.py @@ -0,0 +1,365 @@ +"""Unit tests for the run trigger module.""" + +from datetime import datetime +from unittest.mock import Mock, patch + +import pytest + +from tfe._http import HTTPTransport +from tfe.errors import ( + InvalidRunTriggerIDError, + InvalidRunTriggerTypeError, + InvalidWorkspaceIDError, + RequiredRunTriggerListOpsError, + RequiredSourceableError, + UnsupportedRunTriggerTypeError, +) +from tfe.models.run_trigger import ( + RunTrigger, + RunTriggerCreateOptions, + RunTriggerFilterOp, + RunTriggerIncludeOp, + RunTriggerListOptions, 
+ SourceableChoice, +) +from tfe.models.workspace import Workspace +from tfe.resources.run_trigger import RunTriggers, _run_trigger_from + + +class TestRunTriggerFrom: + """Test the _run_trigger_from function.""" + + def test_run_trigger_from_comprehensive(self): + """Test _run_trigger_from with various data scenarios.""" + + # Test data with all fields populated + data = { + "id": "rt-123", + "attributes": { + "created-at": "2023-01-01T12:00:00Z", + "sourceable-name": "source-workspace", + "workspace-name": "target-workspace", + }, + "relationships": { + "sourceable": {"data": {"id": "ws-source-123", "type": "workspaces"}}, + "workspace": {"data": {"id": "ws-target-456", "type": "workspaces"}}, + }, + } + + result = _run_trigger_from(data) + + assert result.id == "rt-123" + assert result.sourceable_name == "source-workspace" + assert result.workspace_name == "target-workspace" + assert isinstance(result.created_at, datetime) + assert result.sourceable is not None + assert result.sourceable.name == "source-workspace" + assert result.sourceable_choice is not None + assert result.sourceable_choice.workspace.name == "source-workspace" + assert result.workspace is not None + assert result.workspace.name == "target-workspace" + + +class TestRunTriggers: + """Test the RunTriggers service class.""" + + @pytest.fixture + def mock_transport(self): + """Create a mock HTTPTransport.""" + return Mock(spec=HTTPTransport) + + @pytest.fixture + def run_triggers_service(self, mock_transport): + """Create a RunTriggers service with mocked transport.""" + return RunTriggers(mock_transport) + + def test_list_run_triggers_validations(self, run_triggers_service): + """Test list method with invalid workspace ID.""" + + # Test empty workspace ID + with pytest.raises(InvalidWorkspaceIDError): + list( + run_triggers_service.list( + "", + RunTriggerListOptions( + run_trigger_type=RunTriggerFilterOp.RUN_TRIGGER_INBOUND + ), + ) + ) + + """Test list method with missing options.""" + with 
pytest.raises(RequiredRunTriggerListOpsError): + list(run_triggers_service.list("ws-123", None)) + + """Test list method with invalid filter type.""" + # Mock an invalid filter type by monkey-patching the validation + with patch.object( + run_triggers_service, "validate_run_trigger_filter_param" + ) as mock_validate: + mock_validate.side_effect = InvalidRunTriggerTypeError() + + with pytest.raises(InvalidRunTriggerTypeError): + list( + run_triggers_service.list( + "ws-123", + RunTriggerListOptions( + run_trigger_type=RunTriggerFilterOp.RUN_TRIGGER_INBOUND + ), + ) + ) + + """Test list method with include options on outbound trigger type.""" + options = RunTriggerListOptions( + run_trigger_type=RunTriggerFilterOp.RUN_TRIGGER_OUTBOUND, + include=[RunTriggerIncludeOp.RUN_TRIGGER_WORKSPACE], + ) + + with pytest.raises(UnsupportedRunTriggerTypeError): + list(run_triggers_service.list("ws-123", options)) + + def test_list_run_triggers_success(self, run_triggers_service): + """Test successful list operation.""" + + # Mock data structure that _list returns - each item should have the full structure + # since the list method calls _run_trigger_from(item["attributes"]) + mock_data = [ + { + "id": "rt-1", + "attributes": { + "created-at": "2023-01-01T10:00:00Z", + "sourceable-name": "source-ws-1", + "workspace-name": "target-ws-1", + }, + }, + { + "id": "rt-2", + "attributes": { + "created-at": "2023-01-02T11:00:00Z", + "sourceable-name": "source-ws-2", + "workspace-name": "target-ws-2", + }, + }, + ] + + with patch.object(run_triggers_service, "_list") as mock_list: + mock_list.return_value = iter(mock_data) + + options = RunTriggerListOptions( + run_trigger_type=RunTriggerFilterOp.RUN_TRIGGER_INBOUND, + page_number=2, + page_size=10, + include=[RunTriggerIncludeOp.RUN_TRIGGER_WORKSPACE], + ) + + result = list(run_triggers_service.list("ws-123", options)) + + # Verify _list was called with correct parameters + mock_list.assert_called_once_with( + 
"/api/v2/workspaces/ws-123/run-triggers", + params={ + "page[size]": "10", + "page[number]": "2", + "filter[run-trigger][type]": "inbound", + "include": "workspace", + }, + ) + + # Verify results - test the actual _run_trigger_from behavior + assert len(result) == 2 + assert result[0].sourceable_name == "source-ws-1" + assert result[0].workspace_name == "target-ws-1" + assert result[1].sourceable_name == "source-ws-2" + assert result[1].workspace_name == "target-ws-2" + + def test_list_run_triggers_returns_iterator(self, run_triggers_service): + """Test that list method returns an iterator.""" + + with patch.object(run_triggers_service, "_list") as mock_list: + mock_list.return_value = iter([]) + + options = RunTriggerListOptions( + run_trigger_type=RunTriggerFilterOp.RUN_TRIGGER_INBOUND + ) + result = run_triggers_service.list("ws-123", options) + + # Verify it's an iterator + assert hasattr(result, "__iter__") + assert hasattr(result, "__next__") + + def test_create_run_trigger_validations(self, run_triggers_service): + """Test create method with invalid workspace ID.""" + + options = RunTriggerCreateOptions( + sourceable=Workspace(id="ws-source", name="source", organization="org") + ) + + with pytest.raises(InvalidWorkspaceIDError): + run_triggers_service.create("", options) + + """Test create method with missing sourceable.""" + # Since the model requires sourceable, we can just validate that RequiredSourceableError + # is raised when the service method checks for None sourceable + # Create valid options but then manually set sourceable to None to bypass model validation + options = RunTriggerCreateOptions( + sourceable=Workspace(id="ws-source", name="source", organization="org") + ) + options.sourceable = None + + with pytest.raises(RequiredSourceableError): + run_triggers_service.create("ws-123", options) + + def test_create_run_trigger_success(self, run_triggers_service): + """Test successful create operation.""" + + mock_response_data = { + "id": "rt-new", 
+ "attributes": { + "created-at": "2023-01-01T12:00:00Z", + "sourceable-name": "source-workspace", + "workspace-name": "target-workspace", + }, + } + + mock_response = Mock() + mock_response.json.return_value = {"data": mock_response_data} + + with patch.object(run_triggers_service, "t") as mock_transport: + mock_transport.request.return_value = mock_response + + options = RunTriggerCreateOptions( + sourceable=Workspace(id="ws-source", name="source", organization="org") + ) + + result = run_triggers_service.create("ws-123", options) + + # Verify request was made correctly + mock_transport.request.assert_called_once() + call_args = mock_transport.request.call_args + + assert call_args[0][0] == "POST" # HTTP method + assert call_args[0][1] == "/api/v2/workspaces/ws-123/run-triggers" # URL + + # Verify request body structure + json_body = call_args[1]["json_body"] + assert "data" in json_body + assert "relationships" in json_body["data"] + assert "sourceable" in json_body["data"]["relationships"] + assert ( + json_body["data"]["relationships"]["sourceable"]["data"]["id"] + == "ws-source" + ) + + # Verify response + assert result.id == "rt-new" + assert result.sourceable_name == "source-workspace" + assert result.workspace_name == "target-workspace" + + def test_read_run_trigger_invalid_id(self, run_triggers_service): + """Test read method with invalid run trigger ID.""" + + with pytest.raises(InvalidRunTriggerIDError): + run_triggers_service.read("") + + def test_read_run_trigger_success(self, run_triggers_service): + """Test successful read operation.""" + + mock_response_data = { + "id": "rt-read", + "attributes": { + "created-at": "2023-01-01T12:00:00Z", + "sourceable-name": "source-workspace", + "workspace-name": "target-workspace", + }, + } + + mock_response = Mock() + mock_response.json.return_value = {"data": mock_response_data} + + with patch.object(run_triggers_service, "t") as mock_transport: + mock_transport.request.return_value = mock_response + + result = 
run_triggers_service.read("rt-123") + + # Verify request was made correctly + mock_transport.request.assert_called_once_with( + "GET", "/api/v2/run-triggers/rt-123" + ) + + # Verify response + assert result.id == "rt-read" + assert result.sourceable_name == "source-workspace" + assert result.workspace_name == "target-workspace" + + def test_delete_run_trigger_success(self, run_triggers_service): + """Test successful delete operation.""" + + with patch.object(run_triggers_service, "t") as mock_transport: + mock_transport.request.return_value = None # DELETE returns no content + + result = run_triggers_service.delete("rt-123") + + # Verify request was made correctly + mock_transport.request.assert_called_once_with( + "DELETE", "/api/v2/run-triggers/rt-123" + ) + + # Verify return value + assert result is None + + def test_validate_run_trigger_filter_param_validations(self, run_triggers_service): + """Test validation with invalid filter parameter.""" + + # This should be tested by mocking the enum validation + with patch("tfe.resources.run_trigger.RunTriggerFilterOp") as mock_enum: + mock_enum.__contains__ = Mock(return_value=False) + + with pytest.raises(InvalidRunTriggerTypeError): + run_triggers_service.validate_run_trigger_filter_param("invalid", []) + + """Test validation with unsupported include options.""" + with pytest.raises(UnsupportedRunTriggerTypeError): + run_triggers_service.validate_run_trigger_filter_param( + RunTriggerFilterOp.RUN_TRIGGER_OUTBOUND, + [RunTriggerIncludeOp.RUN_TRIGGER_WORKSPACE], + ) + + def test_validate_run_trigger_filter_param_success(self, run_triggers_service): + """Test successful validation.""" + + # Should not raise any exception + run_triggers_service.validate_run_trigger_filter_param( + RunTriggerFilterOp.RUN_TRIGGER_INBOUND, + [RunTriggerIncludeOp.RUN_TRIGGER_WORKSPACE], + ) + + # Should not raise any exception for outbound with no includes + run_triggers_service.validate_run_trigger_filter_param( + 
RunTriggerFilterOp.RUN_TRIGGER_OUTBOUND, [] + ) + + def test_backfill_deprecated_sourceable_already_exists(self, run_triggers_service): + """Test backfill when sourceable already exists.""" + + workspace = Workspace(id="ws-1", name="workspace", organization="org") + rt = RunTrigger( + id="rt-1", + created_at=datetime.now(), + sourceable_name="source", + workspace_name="target", + sourceable=workspace, # Already exists + sourceable_choice=SourceableChoice(workspace=workspace), + workspace=workspace, + ) + + run_triggers_service.backfill_deprecated_sourceable(rt) + + # Should not change existing sourceable + assert rt.sourceable == workspace + + # Manually set to None to test the backfill logic + rt.sourceable = None + + run_triggers_service.backfill_deprecated_sourceable(rt) + + # Should backfill from sourceable_choice + assert rt.sourceable == workspace