From 463a700af3e71f49b4407b1a3edf5685ea3a2604 Mon Sep 17 00:00:00 2001 From: Sivaselvan32 Date: Mon, 6 Oct 2025 17:55:35 +0530 Subject: [PATCH 1/2] Provide features for Policy API Specs --- examples/policy.py | 256 +++++++++++++++++++++++++++++ src/tfe/_http.py | 4 + src/tfe/client.py | 2 + src/tfe/errors.py | 22 +++ src/tfe/models/policy.py | 68 ++++++++ src/tfe/models/policy_set.py | 8 + src/tfe/resources/policy.py | 163 +++++++++++++++++++ tests/units/test_policy.py | 307 +++++++++++++++++++++++++++++++++++ 8 files changed, 830 insertions(+) create mode 100644 examples/policy.py create mode 100644 src/tfe/models/policy.py create mode 100644 src/tfe/models/policy_set.py create mode 100644 src/tfe/resources/policy.py create mode 100644 tests/units/test_policy.py diff --git a/examples/policy.py b/examples/policy.py new file mode 100644 index 0000000..59197d7 --- /dev/null +++ b/examples/policy.py @@ -0,0 +1,256 @@ +#!/usr/bin/env python3 +"""Policy management example for python-tfe SDK. + +This example demonstrates how to use the Policy API to: +1. List policies in an organization +2. Create a new policy (Sentinel or OPA) +3. Upload policy content +4. Read policy details +5. Update policy settings +6. Download policy content +7. Delete a policy + +Usage: + python examples/policy.py --org myorg --policy-name my-policy + python examples/policy.py --org myorg --policy-name my-policy --upload sentinel_policy.sentinel + python examples/policy.py --org myorg --policy-name my-policy --download downloaded_policy.sentinel +""" + +from __future__ import annotations + +import argparse +import os +from pathlib import Path + +from tfe import TFEClient, TFEConfig +from tfe.models.policy import ( + EnforcementLevel, + PolicyCreateOptions, + PolicyListOptions, + PolicyUpdateOptions, +) +from tfe.models.policy_set import PolicyKind + + +def _print_header(title: str): + print("\n" + "=" * 80) + print(title) + print("=" * 80) + + +def main(): + parser = argparse.ArgumentParser( + description="Policy management demo for python-tfe SDK" + ) + parser.add_argument( + "--address", default=os.getenv("TFE_ADDRESS", "https://app.terraform.io") + ) + parser.add_argument("--token", default=os.getenv("TFE_TOKEN", "")) + parser.add_argument("--org", required=True, help="Organization name") + parser.add_argument("--policy-name", required=True, help="Policy name to work with") + parser.add_argument( + "--kind", + choices=["sentinel", "opa"], + default="sentinel", + help="Policy kind (sentinel or opa)", + ) + parser.add_argument( + "--enforcement-level", + choices=["advisory", "soft-mandatory", "hard-mandatory", "mandatory"], + default="advisory", + help="Policy enforcement level", + ) + parser.add_argument("--upload", help="Path to policy file to upload") + parser.add_argument("--download", help="Path to save downloaded policy content") + parser.add_argument("--description", help="Policy description") + parser.add_argument("--query", help="OPA query (required for OPA policies)") + parser.add_argument("--page", type=int, default=1) + parser.add_argument("--page-size", type=int, default=20) + parser.add_argument("--search", help="Search policies by name") + parser.add_argument("--delete", action="store_true", help="Delete the policy") + + args = parser.parse_args() + + if not args.token: + print("Error: TFE_TOKEN environment variable or --token argument required") + return 1 + + cfg = TFEConfig(address=args.address, token=args.token) + client = TFEClient(cfg) + + # 1) List all policies in the organization + _print_header(f"Listing policies in organization: {args.org}") + + list_options = PolicyListOptions( + page_number=args.page, + page_size=args.page_size, + ) + + if args.search: + list_options.search = args.search + if args.kind: + list_options.kind = PolicyKind.SENTINEL if args.kind == "sentinel" else PolicyKind.OPA + + policy_list = client.policies.list(args.org, list_options) + + print(f"Total policies: {policy_list.total_count}") + print(f"Page {policy_list.current_page} of {policy_list.total_pages}") + print() + + existing_policy = None + for policy in policy_list.items: + print(f"- {policy.id} | {policy.name} | kind={policy.kind} | enforcement={policy.enforcement_level}") + if policy.name == args.policy_name: + existing_policy = policy + + # 2) Create a new policy if it doesn't exist + if not existing_policy: + _print_header(f"Creating new policy: {args.policy_name}") + + # Map string enforcement level to enum + enforcement_map = { + "advisory": EnforcementLevel.ENFORCEMENT_ADVISORY, + "soft-mandatory": EnforcementLevel.ENFORCEMENT_SOFT, + "hard-mandatory": EnforcementLevel.ENFORCEMENT_HARD, + "mandatory": EnforcementLevel.ENFORCEMENT_MANDATORY, + } + + create_options = PolicyCreateOptions( + name=args.policy_name, + kind=PolicyKind.SENTINEL if args.kind == "sentinel" else PolicyKind.OPA, + enforcement_level=enforcement_map[args.enforcement_level], + description=args.description or f"Example {args.kind} policy created via python-tfe SDK", + ) + + # OPA policies require a query + if args.kind == "opa": + if not args.query: + create_options.query = "terraform.main" # Default OPA query + else: + create_options.query = args.query + + try: + policy = client.policies.create(args.org, create_options) + print(f"Created policy: {policy.id}") + print(f" Name: {policy.name}") + print(f" Kind: {policy.kind}") + print(f" Enforcement: {policy.enforcement_level}") + if policy.query: + print(f" Query: {policy.query}") + existing_policy = policy + except Exception as e: + print(f"Error creating policy: {e}") + return 1 + + # 3) Read the policy details + _print_header(f"Reading policy details: {existing_policy.id}") + policy_details = client.policies.read(existing_policy.id) + print(f"Policy ID: {policy_details.id}") + print(f"Name: {policy_details.name}") + print(f"Kind: {policy_details.kind}") + print(f"Description: {policy_details.description}") + print(f"Enforcement Level: {policy_details.enforcement_level}") + print(f"Policy Set Count: {policy_details.policy_set_count}") + print(f"Updated At: {policy_details.updated_at}") + if policy_details.query: + print(f"Query: {policy_details.query}") + + # 4) Upload policy content if provided + if args.upload: + _print_header(f"Uploading policy content from: {args.upload}") + try: + policy_content = Path(args.upload).read_bytes() + client.policies.upload(existing_policy.id, policy_content) + print(f"Successfully uploaded {len(policy_content)} bytes to policy {existing_policy.id}") + except Exception as e: + print(f"Error uploading policy content: {e}") + return 1 + elif not args.upload and not existing_policy: + # Upload default content for demonstration + _print_header("Uploading default policy content") + if args.kind == "sentinel": + default_content = """# Example Sentinel policy +main = rule { + true +} +""" + else: # OPA + default_content = """# Example OPA policy +package terraform + +default main = true + +main { + input.resource_changes +} +""" + try: + client.policies.upload(existing_policy.id, default_content.encode('utf-8')) + print(f"Uploaded default {args.kind} policy content") + except Exception as e: + print(f"Error uploading default content: {e}") + + # 5) Download policy content if requested + if args.download: + _print_header(f"Downloading policy content to: {args.download}") + try: + policy_content = client.policies.download(existing_policy.id) + Path(args.download).write_bytes(policy_content) + print(f"Downloaded {len(policy_content)} bytes to {args.download}") + + # Also print the content to console + print("\nPolicy content preview:") + print("-" * 40) + content_str = policy_content.decode('utf-8') + lines = content_str.split('\n') + for i, line in enumerate(lines[:10], 1): # Show first 10 lines + print(f"{i:2d}: {line}") + if len(lines) > 10: + print(f"... ({len(lines) - 10} more lines)") + print("-" * 40) + + except Exception as e: + print(f"Error downloading policy content: {e}") + + # 6) Update policy if description provided + if args.description and existing_policy: + _print_header("Updating policy description") + try: + enforcement_map = { + "advisory": EnforcementLevel.ENFORCEMENT_ADVISORY, + "soft-mandatory": EnforcementLevel.ENFORCEMENT_SOFT, + "hard-mandatory": EnforcementLevel.ENFORCEMENT_HARD, + "mandatory": EnforcementLevel.ENFORCEMENT_MANDATORY, + } + + update_options = PolicyUpdateOptions( + description=args.description, + enforcement_level=enforcement_map[args.enforcement_level], + ) + + if args.kind == "opa" and args.query: + update_options.query = args.query + + updated_policy = client.policies.update(existing_policy.id, update_options) + print(f"Updated policy: {updated_policy.id}") + print(f" New description: {updated_policy.description}") + print(f" Enforcement level: {updated_policy.enforcement_level}") + except Exception as e: + print(f"Error updating policy: {e}") + + # 7) Delete policy if requested + if args.delete and existing_policy: + _print_header(f"Deleting policy: {existing_policy.id}") + try: + client.policies.delete(existing_policy.id) + print(f"Successfully deleted policy: {existing_policy.id}") + except Exception as e: + print(f"Error deleting policy: {e}") + return 1 + + print("\nPolicy operations completed successfully!") + return 0 + + +if __name__ == "__main__": + exit(main()) \ No newline at end of file diff --git a/src/tfe/_http.py b/src/tfe/_http.py index e858f1b..9c51efd 100644 --- a/src/tfe/_http.py +++ b/src/tfe/_http.py @@ -73,6 +73,7 @@ def request( *, params: Mapping[str, Any] | None = None, json_body: Mapping[str, Any] | None = None, + data: bytes | None = None, headers: dict[str, str] | None = None, allow_redirects: bool = True, ) -> httpx.Response: @@ -89,6 +90,7 @@ def request( url, params=params, json=json_body, + content=data, headers=hdrs, follow_redirects=allow_redirects, ) @@ -114,6 +116,7 @@ async def arequest( *, params: Mapping[str, Any] | None = None, json_body: Mapping[str, Any] | None = None, + data: bytes | None = None, headers: dict[str, str] | None = None, allow_redirects: bool = True, ) -> httpx.Response: @@ -128,6 +131,7 @@ async def arequest( url, params=params, json=json_body, + content=data, headers=hdrs, follow_redirects=allow_redirects, ) diff --git a/src/tfe/client.py b/src/tfe/client.py index 793daef..90e0f80 100644 --- a/src/tfe/client.py +++ b/src/tfe/client.py @@ -10,6 +10,7 @@ from .resources.oauth_token import OAuthTokens from .resources.organizations import Organizations from .resources.plan import Plans +from .resources.policy import Policies from .resources.projects import Projects from .resources.query_run import QueryRuns from .resources.registry_module import RegistryModules @@ -72,6 +73,7 @@ def __init__(self, config: TFEConfig | None = None): self.runs = Runs(self._transport) self.query_runs = QueryRuns(self._transport) self.run_events = RunEvents(self._transport) + self.policies = Policies(self._transport) # SSH Keys self.ssh_keys = SSHKeys(self._transport) diff --git a/src/tfe/errors.py b/src/tfe/errors.py index 73951c8..6069a7e 100644 --- a/src/tfe/errors.py +++ b/src/tfe/errors.py @@ -393,3 +393,25 @@ class InvalidRunEventIDError(InvalidValues): def __init__(self, message: str = "invalid value for run event ID"): super().__init__(message) + + +# Policy errors +class InvalidPolicyIDError(InvalidValues): + """Raised when an invalid policy ID is provided.""" + + def __init__(self, message: str = "invalid value for policy ID"): + super().__init__(message) + + +class RequiredQueryError(RequiredFieldMissing): + """Raised when a required query field is missing.""" + + def __init__(self, message: str = "query is required"): + super().__init__(message) + + +class RequiredEnforceError(RequiredFieldMissing): + """Raised when a required enforce field is missing.""" + + def __init__(self, message: str = "enforce or enforcement-level is required"): + super().__init__(message) diff --git a/src/tfe/models/policy.py b/src/tfe/models/policy.py new file mode 100644 index 0000000..fc40834 --- /dev/null +++ b/src/tfe/models/policy.py @@ -0,0 +1,68 @@ +from __future__ import annotations + +from datetime import datetime +from enum import Enum + +from pydantic import BaseModel, ConfigDict, Field + +from .organization import Organization +from .policy_set import PolicyKind + + +class EnforcementLevel(str, Enum): + ENFORCEMENT_ADVISORY = "advisory" + ENFORCEMENT_MANDATORY = "mandatory" + ENFORCEMENT_HARD = "hard-mandatory" + ENFORCEMENT_SOFT = "soft-mandatory" + + +class Policy(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + id: str + name: str | None = Field(None, alias="name") + kind: PolicyKind | None = Field(None, alias="kind") + query: str | None = Field(None, alias="query") + description: str | None = Field(None, alias="description") + enforcement_level: EnforcementLevel | None = Field(None, alias="enforcement-level") + policy_set_count: int | None = Field(None, alias="policy-set-count") + updated_at: datetime | None = Field(None, alias="updated-at") + organization: Organization | None = Field(None, alias="organization") + + +class PolicyList(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + items: list[Policy] = Field(default_factory=list) + current_page: int | None = None + total_pages: int | None = None + prev_page: int | None = None + next_page: int | None = None + total_count: int | None = None + + +class PolicyListOptions(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + search: str | None = Field(None, alias="search[name]") + kind: PolicyKind | None = Field(None, alias="filter[kind]") + page_number: int | None = Field(None, alias="page[number]") + page_size: int | None = Field(None, alias="page[size]") + + +class PolicyCreateOptions(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + name: str = Field(..., alias="name") + kind: PolicyKind | None = Field(None, alias="kind") + query: str | None = Field(None, alias="query") + description: str | None = Field(None, alias="description") + enforcement_level: EnforcementLevel | None = Field(None, alias="enforcement-level") + + +class PolicyUpdateOptions(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + query: str | None = Field(None, alias="query") + description: str | None = Field(None, alias="description") + enforcement_level: EnforcementLevel | None = Field(None, alias="enforcement-level") diff --git a/src/tfe/models/policy_set.py b/src/tfe/models/policy_set.py new file mode 100644 index 0000000..e9ee8e6 --- /dev/null +++ b/src/tfe/models/policy_set.py @@ -0,0 +1,8 @@ +from __future__ import annotations + +from enum import Enum + + +class PolicyKind(str, Enum): + OPA = "opa" + SENTINEL = "sentinel" diff --git a/src/tfe/resources/policy.py b/src/tfe/resources/policy.py new file mode 100644 index 0000000..fb30ca0 --- /dev/null +++ b/src/tfe/resources/policy.py @@ -0,0 +1,163 @@ +from __future__ import annotations + +from ..errors import ( + InvalidNameError, + InvalidOrgError, + InvalidPolicyIDError, + RequiredEnforceError, + RequiredNameError, + RequiredQueryError, +) +from ..models.policy import ( + Policy, + PolicyCreateOptions, + PolicyList, + PolicyListOptions, + PolicyUpdateOptions, +) +from ..utils import valid_string, valid_string_id +from ._base import _Service + + +class Policies(_Service): + def list( + self, organization: str, options: PolicyListOptions | None = None + ) -> PolicyList: + """List all the policies of the given organization.""" + if not valid_string_id(organization): + raise InvalidOrgError() + params = ( + options.model_dump(by_alias=True, exclude_none=True) if options else None + ) + r = self.t.request( + "GET", + f"/api/v2/organizations/{organization}/policies", + params=params, + ) + jd = r.json() + items = [] + meta = jd.get("meta", {}) + pagination = meta.get("pagination", {}) + for d in jd.get("data", []): + attrs = d.get("attributes", {}) + attrs["id"] = d.get("id") + attrs["organization"] = d.get("relationships", {}).get("organization", {}) + items.append(Policy.model_validate(attrs)) + return PolicyList( + items=items, + current_page=pagination.get("current-page"), + total_pages=pagination.get("total-pages"), + prev_page=pagination.get("prev-page"), + next_page=pagination.get("next-page"), + total_count=pagination.get("total-count"), + ) + + def create(self, organization: str, options: PolicyCreateOptions) -> Policy: + """Create a new policy in the given organization.""" + if not valid_string_id(organization): + raise InvalidOrgError() + valid = self._valid_create_options(options) + if valid is not None: + raise valid + payload = { + "data": { + "attributes": options.model_dump(by_alias=True, exclude_none=True), + "type": "policies", + } + } + r = self.t.request( + "POST", + f"/api/v2/organizations/{organization}/policies", + json_body=payload, + ) + jd = r.json() + d = jd.get("data", {}) + attrs = d.get("attributes", {}) + attrs["id"] = d.get("id") + return Policy.model_validate(attrs) + + def read(self, policy_id: str) -> Policy: + """Read a specific policy by its ID.""" + if not valid_string_id(policy_id): + raise InvalidPolicyIDError + r = self.t.request( + "GET", + f"/api/v2/policies/{policy_id}", + ) + jd = r.json() + d = jd.get("data", {}) + attrs = d.get("attributes", {}) + attrs["id"] = d.get("id") + attrs["organization"] = d.get("relationships", {}).get("organization", {}) + return Policy.model_validate(attrs) + + def update(self, policy_id: str, options: PolicyUpdateOptions) -> Policy: + """Update an existing policy by its ID.""" + if not valid_string_id(policy_id): + raise InvalidPolicyIDError + payload = { + "data": { + "type": "policies", + "attributes": options.model_dump(by_alias=True, exclude_none=True), + } + } + r = self.t.request( + "PATCH", + f"/api/v2/policies/{policy_id}", + json_body=payload, + ) + jd = r.json() + d = jd.get("data", {}) + attrs = d.get("attributes", {}) + attrs["id"] = d.get("id") + attrs["organization"] = d.get("relationships", {}).get("organization", {}) + return Policy.model_validate(attrs) + + def delete(self, policy_id: str) -> None: + """Delete a specific policy by its ID.""" + if not valid_string_id(policy_id): + raise InvalidPolicyIDError + self.t.request( + "DELETE", + f"/api/v2/policies/{policy_id}", + ) + return None + + def upload(self, policy_id: str, content: bytes) -> None: + """Upload the policy content of the policy.""" + if not valid_string_id(policy_id): + raise InvalidPolicyIDError + + # Send binary content directly (not as JSON) + self.t.request( + "PUT", + f"/api/v2/policies/{policy_id}/upload", + data=content, + headers={"Content-Type": "application/octet-stream"}, + ) + return None + + def download(self, policy_id: str) -> bytes: + """Download the policy content of the policy.""" + if not valid_string_id(policy_id): + raise InvalidPolicyIDError + r = self.t.request( + "GET", + f"/api/v2/policies/{policy_id}/download", + ) + return r.content + + def _valid_create_options(self, options: PolicyCreateOptions) -> None | Exception: + """Validate the given PolicyCreateOptions.""" + if not valid_string(options.name): + return RequiredNameError() + if not valid_string_id(options.name): + return InvalidNameError() + + if options.kind == "opa" and not valid_string(options.query): + return RequiredQueryError() + + if not options.enforcement_level: + return RequiredEnforceError() + + return None diff --git a/tests/units/test_policy.py b/tests/units/test_policy.py new file mode 100644 index 0000000..258cc43 --- /dev/null +++ b/tests/units/test_policy.py @@ -0,0 +1,307 @@ +"""Unit tests for the policy module.""" + +from unittest.mock import Mock, patch + +import pytest + +from tfe._http import HTTPTransport +from tfe.errors import ( + InvalidOrgError, + InvalidPolicyIDError, + RequiredNameError, +) +from tfe.models.policy import ( + EnforcementLevel, + Policy, + PolicyCreateOptions, + PolicyList, + PolicyUpdateOptions, +) +from tfe.models.policy_set import PolicyKind +from tfe.resources.policy import Policies + + +class TestPolicies: + """Test the Policies service class.""" + + @pytest.fixture + def mock_transport(self): + """Create a mock HTTPTransport.""" + return Mock(spec=HTTPTransport) + + @pytest.fixture + def policies_service(self, mock_transport): + """Create a Policies service with mocked transport.""" + return Policies(mock_transport) + + def test_list_policies_validations(self, policies_service): + """Test list method with invalid organization.""" + + # Test empty organization + with pytest.raises(InvalidOrgError): + policies_service.list("") + + # Test None organization + with pytest.raises(InvalidOrgError): + policies_service.list(None) + + def test_list_policies_success_without_options( + self, policies_service, mock_transport + ): + """Test successful list operation without options.""" + + mock_response_data = { + "data": [ + { + "id": "pol-123", + "attributes": { + "name": "test-policy", + "kind": "sentinel", + "description": "Test policy description", + "enforcement-level": "advisory", + "policy-set-count": 1, + "updated-at": "2023-01-01T12:00:00Z", + }, + "relationships": { + "organization": { + "data": {"id": "org-123", "type": "organizations"} + } + }, + } + ], + "meta": { + "pagination": { + "current-page": 1, + "total-pages": 1, + "prev-page": None, + "next-page": None, + "total-count": 1, + } + }, + } + + mock_response = Mock() + mock_response.json.return_value = mock_response_data + mock_transport.request.return_value = mock_response + + result = policies_service.list("org-123") + + mock_transport.request.assert_called_once_with( + "GET", "/api/v2/organizations/org-123/policies", params=None + ) + + assert isinstance(result, PolicyList) + assert len(result.items) == 1 + assert result.items[0].id == "pol-123" + assert result.items[0].name == "test-policy" + assert result.items[0].kind == PolicyKind.SENTINEL + assert ( + result.items[0].enforcement_level == EnforcementLevel.ENFORCEMENT_ADVISORY + ) + assert result.current_page == 1 + assert result.total_count == 1 + + def test_create_policy_validations(self, policies_service): + """Test create method validations.""" + + # Test invalid organization + options = PolicyCreateOptions( + name="test-policy", enforcement_level=EnforcementLevel.ENFORCEMENT_ADVISORY + ) + + # Test validation method is called + with patch.object(policies_service, "_valid_create_options") as mock_validate: + mock_validate.return_value = RequiredNameError() + + with pytest.raises(RequiredNameError): + policies_service.create("org-123", options) + + def test_create_policy_success(self, policies_service, mock_transport): + """Test successful create operation.""" + + mock_response_data = { + "data": { + "id": "pol-456", + "attributes": { + "name": "new-policy", + "kind": "sentinel", + "enforcement-level": "hard-mandatory", + }, + } + } + + mock_response = Mock() + mock_response.json.return_value = mock_response_data + mock_transport.request.return_value = mock_response + + options = PolicyCreateOptions( + name="new-policy", + kind=PolicyKind.SENTINEL, + enforcement_level=EnforcementLevel.ENFORCEMENT_HARD, + ) + + result = policies_service.create("org-123", options) + + mock_transport.request.assert_called_once_with( + "POST", + "/api/v2/organizations/org-123/policies", + json_body={ + "data": { + "attributes": { + "name": "new-policy", + "kind": "sentinel", + "enforcement-level": "hard-mandatory", + }, + "type": "policies", + } + }, + ) + + assert isinstance(result, Policy) + assert result.id == "pol-456" + assert result.name == "new-policy" + + def test_read_policy_success(self, policies_service, mock_transport): + """Test successful read operation.""" + + mock_response_data = { + "data": { + "id": "pol-789", + "attributes": { + "name": "existing-policy", + "kind": "opa", + "query": "terraform.main", + "enforcement-level": "advisory", + }, + "relationships": { + "organization": {"data": {"id": "org-123", "type": "organizations"}} + }, + } + } + + mock_response = Mock() + mock_response.json.return_value = mock_response_data + mock_transport.request.return_value = mock_response + + result = policies_service.read("pol-789") + + mock_transport.request.assert_called_once_with( + "GET", "/api/v2/policies/pol-789" + ) + + assert isinstance(result, Policy) + assert result.id == "pol-789" + assert result.name == "existing-policy" + assert result.kind == PolicyKind.OPA + assert result.query == "terraform.main" + + def test_update_policy_success(self, policies_service, mock_transport): + """Test successful update operation.""" + + mock_response_data = { + "data": { + "id": "pol-789", + "attributes": { + "name": "updated-policy", + "enforcement-level": "soft-mandatory", + }, + "relationships": { + "organization": {"data": {"id": "org-123", "type": "organizations"}} + }, + } + } + + mock_response = Mock() + mock_response.json.return_value = mock_response_data + mock_transport.request.return_value = mock_response + + options = PolicyUpdateOptions( + description="Updated description", + enforcement_level=EnforcementLevel.ENFORCEMENT_SOFT, + ) + + result = policies_service.update("pol-789", options) + + mock_transport.request.assert_called_once_with( + "PATCH", + "/api/v2/policies/pol-789", + json_body={ + "data": { + "type": "policies", + "attributes": { + "description": "Updated description", + "enforcement-level": "soft-mandatory", + }, + } + }, + ) + + assert isinstance(result, Policy) + assert result.id == "pol-789" + + def test_delete_policy_validations(self, policies_service): + """Test delete method with invalid policy ID.""" + + with pytest.raises(InvalidPolicyIDError): + policies_service.delete("") + + def test_delete_policy_success(self, policies_service, mock_transport): + """Test successful delete operation.""" + + policies_service.delete("pol-789") + + mock_transport.request.assert_called_once_with( + "DELETE", "/api/v2/policies/pol-789" + ) + + def test_upload_policy_success_with_bytes(self, policies_service, mock_transport): + """Test successful upload operation with bytes content.""" + + policy_content = b"main = rule { true }" + + policies_service.upload("pol-789", policy_content) + + mock_transport.request.assert_called_once_with( + "PUT", + "/api/v2/policies/pol-789/upload", + data=policy_content, + headers={"Content-Type": "application/octet-stream"}, + ) + + def test_download_policy_success(self, policies_service, mock_transport): + """Test successful download operation.""" + + policy_content = b"main = rule { true }" + mock_response = Mock() + mock_response.content = policy_content + mock_transport.request.return_value = mock_response + + result = policies_service.download("pol-789") + + mock_transport.request.assert_called_once_with( + "GET", "/api/v2/policies/pol-789/download" + ) + + assert result == policy_content + + def test_valid_create_options_success(self, policies_service): + """Test _valid_create_options with valid options.""" + + # Test valid Sentinel policy + options = PolicyCreateOptions( + name="test-policy", + kind=PolicyKind.SENTINEL, + enforcement_level=EnforcementLevel.ENFORCEMENT_HARD, + ) + result = policies_service._valid_create_options(options) + assert result is None + + # Test valid OPA policy + options = PolicyCreateOptions( + name="test-opa-policy", + kind=PolicyKind.OPA, + query="terraform.main", + enforcement_level=EnforcementLevel.ENFORCEMENT_MANDATORY, + ) + result = policies_service._valid_create_options(options) + assert result is None From b966b463a417dd04c04c1b69f30e9c99f1f2aab4 Mon Sep 17 00:00:00 2001 From: Sivaselvan32 Date: Tue, 7 Oct 2025 14:46:18 +0530 Subject: [PATCH 2/2] Lint issue fix --- examples/policy.py | 45 ++++++++++++++++++++++++++------------------- 1 file changed, 26 insertions(+), 19 deletions(-) diff --git a/examples/policy.py b/examples/policy.py index 59197d7..55cfcf7 100644 --- a/examples/policy.py +++ b/examples/policy.py @@ -80,33 +80,37 @@ def main(): # 1) List all policies in the organization _print_header(f"Listing policies in organization: {args.org}") - + list_options = PolicyListOptions( page_number=args.page, page_size=args.page_size, ) - + if args.search: list_options.search = args.search if args.kind: - list_options.kind = PolicyKind.SENTINEL if args.kind == "sentinel" else PolicyKind.OPA + list_options.kind = ( + PolicyKind.SENTINEL if args.kind == "sentinel" else PolicyKind.OPA + ) policy_list = client.policies.list(args.org, list_options) - + print(f"Total policies: {policy_list.total_count}") print(f"Page {policy_list.current_page} of {policy_list.total_pages}") print() existing_policy = None for policy in policy_list.items: - print(f"- {policy.id} | {policy.name} | kind={policy.kind} | enforcement={policy.enforcement_level}") + print( + f"- {policy.id} | {policy.name} | kind={policy.kind} | enforcement={policy.enforcement_level}" + ) if policy.name == args.policy_name: existing_policy = policy # 2) Create a new policy if it doesn't exist if not existing_policy: _print_header(f"Creating new policy: {args.policy_name}") - + # Map string enforcement level to enum enforcement_map = { "advisory": EnforcementLevel.ENFORCEMENT_ADVISORY, @@ -114,14 +118,15 @@ def main(): "hard-mandatory": EnforcementLevel.ENFORCEMENT_HARD, "mandatory": EnforcementLevel.ENFORCEMENT_MANDATORY, } - + create_options = PolicyCreateOptions( name=args.policy_name, kind=PolicyKind.SENTINEL if args.kind == "sentinel" else PolicyKind.OPA, enforcement_level=enforcement_map[args.enforcement_level], - description=args.description or f"Example {args.kind} policy created via python-tfe SDK", + description=args.description + or f"Example {args.kind} policy created via python-tfe SDK", ) - + # OPA policies require a query if args.kind == "opa": if not args.query: @@ -161,7 +166,9 @@ def main(): try: policy_content = Path(args.upload).read_bytes() client.policies.upload(existing_policy.id, policy_content) - print(f"Successfully uploaded {len(policy_content)} bytes to policy {existing_policy.id}") + print( + f"Successfully uploaded {len(policy_content)} bytes to policy {existing_policy.id}" + ) except Exception as e: print(f"Error uploading policy content: {e}") return 1 @@ -185,7 +192,7 @@ def main(): } """ try: - client.policies.upload(existing_policy.id, default_content.encode('utf-8')) + client.policies.upload(existing_policy.id, default_content.encode("utf-8")) print(f"Uploaded default {args.kind} policy content") except Exception as e: print(f"Error uploading default content: {e}") @@ -197,18 +204,18 @@ def main(): policy_content = client.policies.download(existing_policy.id) Path(args.download).write_bytes(policy_content) print(f"Downloaded {len(policy_content)} bytes to {args.download}") - + # Also print the content to console print("\nPolicy content preview:") print("-" * 40) - content_str = policy_content.decode('utf-8') - lines = content_str.split('\n') + content_str = policy_content.decode("utf-8") + lines = content_str.split("\n") for i, line in enumerate(lines[:10], 1): # Show first 10 lines print(f"{i:2d}: {line}") if len(lines) > 10: print(f"... ({len(lines) - 10} more lines)") print("-" * 40) - + except Exception as e: print(f"Error downloading policy content: {e}") @@ -222,15 +229,15 @@ def main(): "hard-mandatory": EnforcementLevel.ENFORCEMENT_HARD, "mandatory": EnforcementLevel.ENFORCEMENT_MANDATORY, } - + update_options = PolicyUpdateOptions( description=args.description, enforcement_level=enforcement_map[args.enforcement_level], ) - + if args.kind == "opa" and args.query: update_options.query = args.query - + updated_policy = client.policies.update(existing_policy.id, update_options) print(f"Updated policy: {updated_policy.id}") print(f" New description: {updated_policy.description}") @@ -253,4 +260,4 @@ def main(): if __name__ == "__main__": - exit(main()) \ No newline at end of file + exit(main())