Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions examples/policy_evaluation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
from __future__ import annotations

import argparse
import os

from pytfe import TFEClient, TFEConfig
from pytfe.models import PolicyEvaluationListOptions


def _print_header(title: str):
    """Print *title* framed above and below by an 80-character rule."""
    bar = "=" * 80
    print(f"\n{bar}\n{title}\n{bar}")


def main() -> None:
    """Demo: list the policy evaluations attached to one task stage.

    Connection settings come from CLI flags with TFE_ADDRESS / TFE_TOKEN
    environment-variable fallbacks. Fetches a single page of policy
    evaluations for the required --task-stage-id and prints each one.
    """
    parser = argparse.ArgumentParser(
        description="Policy Evaluations demo for python-tfe SDK"
    )
    parser.add_argument(
        "--address", default=os.getenv("TFE_ADDRESS", "https://app.terraform.io")
    )
    parser.add_argument("--token", default=os.getenv("TFE_TOKEN", ""))
    parser.add_argument(
        "--task-stage-id",
        required=True,
        help="Task stage ID to list policy evaluations for",
    )
    parser.add_argument("--page", type=int, default=1)
    parser.add_argument("--page-size", type=int, default=20)
    args = parser.parse_args()

    # Fail fast with a clear message instead of a later API auth error.
    if not args.token:
        print("Error: TFE_TOKEN environment variable or --token argument is required")
        return

    cfg = TFEConfig(address=args.address, token=args.token)
    client = TFEClient(cfg)

    # List all policy evaluations for the given task stage
    _print_header(f"Listing policy evaluations for task stage: {args.task_stage_id}")

    # Pagination options forwarded to the list endpoint.
    options = PolicyEvaluationListOptions(
        page_number=args.page,
        page_size=args.page_size,
    )

    try:
        pe_list = client.policy_evaluations.list(args.task_stage_id, options)

        print(f"Total policy evaluations: {pe_list.total_count}")
        print(f"Page {pe_list.current_page} of {pe_list.total_pages}")
        print()

        if not pe_list.items:
            print("No policy evaluations found for this task stage.")
        else:
            for pe in pe_list.items:
                print(f"- ID: {pe.id}")
                print(f" Status: {pe.status}")
                print(f" Policy Kind: {pe.policy_kind}")

                # Each attribute below is optional; print it only when present.
                if pe.result_count:
                    print(" Result Count:")
                    # Counts may legitimately be 0, so compare against None
                    # rather than relying on truthiness.
                    if pe.result_count.passed is not None:
                        print(f" - Passed: {pe.result_count.passed}")
                    if pe.result_count.advisory_failed is not None:
                        print(
                            f" - Advisory Failed: {pe.result_count.advisory_failed}"
                        )
                    if pe.result_count.mandatory_failed is not None:
                        print(
                            f" - Mandatory Failed: {pe.result_count.mandatory_failed}"
                        )
                    if pe.result_count.errored is not None:
                        print(f" - Errored: {pe.result_count.errored}")

                if pe.status_timestamp:
                    print(" Status Timestamps:")
                    if pe.status_timestamp.passed_at:
                        print(f" - Passed At: {pe.status_timestamp.passed_at}")
                    if pe.status_timestamp.failed_at:
                        print(f" - Failed At: {pe.status_timestamp.failed_at}")
                    if pe.status_timestamp.running_at:
                        print(f" - Running At: {pe.status_timestamp.running_at}")
                    if pe.status_timestamp.canceled_at:
                        print(f" - Canceled At: {pe.status_timestamp.canceled_at}")
                    if pe.status_timestamp.errored_at:
                        print(f" - Errored At: {pe.status_timestamp.errored_at}")

                if pe.task_stage:
                    print(f" Task Stage: {pe.task_stage.id} ({pe.task_stage.type})")

                if pe.created_at:
                    print(f" Created At: {pe.created_at}")
                if pe.updated_at:
                    print(f" Updated At: {pe.updated_at}")

                print()

    except Exception as e:
        # Broad catch is deliberate in an example script: report and exit.
        print(f"Error listing policy evaluations: {e}")
        return


# Run the demo only when executed directly, not when imported.
if __name__ == "__main__":
    main()
6 changes: 6 additions & 0 deletions src/pytfe/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
from .resources.plan import Plans
from .resources.policy import Policies
from .resources.policy_check import PolicyChecks
from .resources.policy_evaluation import PolicyEvaluations
from .resources.policy_set import PolicySets
from .resources.policy_set_outcome import PolicySets as PolicySetOutcomes
from .resources.policy_set_version import PolicySetVersions
from .resources.projects import Projects
from .resources.query_run import QueryRuns
from .resources.registry_module import RegistryModules
Expand Down Expand Up @@ -76,8 +79,11 @@ def __init__(self, config: TFEConfig | None = None):
self.query_runs = QueryRuns(self._transport)
self.run_events = RunEvents(self._transport)
self.policies = Policies(self._transport)
self.policy_evaluations = PolicyEvaluations(self._transport)
self.policy_checks = PolicyChecks(self._transport)
self.policy_sets = PolicySets(self._transport)
self.policy_set_outcomes = PolicySetOutcomes(self._transport)
self.policy_set_versions = PolicySetVersions(self._transport)

# SSH Keys
self.ssh_keys = SSHKeys(self._transport)
Expand Down
15 changes: 15 additions & 0 deletions src/pytfe/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,3 +445,18 @@ class InvalidPoliciesError(InvalidValues):

def __init__(self, message: str = "must provide at least one policy"):
super().__init__(message)


# Policy Evaluation errors
class InvalidTaskStageIDError(InvalidValues):
    """Raised when an invalid task stage ID is provided.

    Args:
        message: Human-readable description; a sensible default is supplied.
    """

    def __init__(self, message: str = "invalid value for task stage ID"):
        super().__init__(message)


class InvalidPolicyEvaluationIDError(InvalidValues):
    """Raised when an invalid policy evaluation ID is provided.

    Args:
        message: Human-readable description; a sensible default is supplied.
    """

    def __init__(self, message: str = "invalid value for policy evaluation ID"):
        super().__init__(message)
17 changes: 17 additions & 0 deletions src/pytfe/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,15 @@
PolicyStatus,
PolicyStatusTimestamps,
)
from .policy_evaluation import (
PolicyAttachable,
PolicyEvaluation,
PolicyEvaluationList,
PolicyEvaluationListOptions,
PolicyEvaluationStatus,
PolicyEvaluationStatusTimestamps,
PolicyResultCount,
)
from .policy_set import (
PolicySet,
PolicySetAddPoliciesOptions,
Expand Down Expand Up @@ -526,6 +535,14 @@
"PolicyStatusTimestamps",
"PolicyCheckListOptions",
"PolicyCheckList",
# Policy Evaluation
"PolicyAttachable",
"PolicyEvaluation",
"PolicyEvaluationList",
"PolicyEvaluationListOptions",
"PolicyEvaluationStatus",
"PolicyEvaluationStatusTimestamps",
"PolicyResultCount",
# Policy
"Policy",
"PolicyCreateOptions",
Expand Down
94 changes: 94 additions & 0 deletions src/pytfe/models/policy_evaluation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
from __future__ import annotations

from datetime import datetime
from enum import Enum

from pydantic import BaseModel, ConfigDict, Field

from .policy_types import PolicyKind


class PolicyEvaluationStatus(str, Enum):
    """Enumeration of every status a policy evaluation can report.

    Member values are the raw status strings; subclassing ``str`` lets
    members compare equal to those plain strings.
    """

    POLICYEVALUATIONPASSED = "passed"
    POLICYEVALUATIONFAILED = "failed"
    POLICYEVALUATIONPENDING = "pending"
    POLICYEVALUATIONRUNNING = "running"
    POLICYEVALUATIONCANCELED = "canceled"
    POLICYEVALUATIONERRORED = "errored"
    POLICYEVALUATIONUNREACHABLE = "unreachable"
    POLICYEVALUATIONOVERRIDDEN = "overridden"


class PolicyEvaluation(BaseModel):
    """A policy evaluation belonging to a task stage.

    Field aliases are the kebab-case attribute names used on the wire.
    """

    model_config = ConfigDict(populate_by_name=True, validate_by_name=True)

    # External ID of the policy evaluation.
    id: str
    # Current lifecycle status of the evaluation.
    status: PolicyEvaluationStatus | None = Field(default=None, alias="status")
    # Kind of policy engine; values defined by PolicyKind.
    policy_kind: PolicyKind | None = Field(default=None, alias="policy-kind")
    # Timestamps recorded as the evaluation changed status.
    status_timestamp: PolicyEvaluationStatusTimestamps | None = Field(
        default=None, alias="status-timestamp"
    )
    # Aggregate pass/fail/error counts for the evaluated policies.
    result_count: PolicyResultCount | None = Field(default=None, alias="result-count")
    created_at: datetime | None = Field(default=None, alias="created-at")
    updated_at: datetime | None = Field(default=None, alias="updated-at")

    # The task stage the evaluation belongs to; serialized under the
    # "policy-attachable" key.
    task_stage: PolicyAttachable | None = Field(default=None, alias="policy-attachable")


class PolicyEvaluationStatusTimestamps(BaseModel):
    """Set of timestamps recorded for a policy evaluation.

    Presumably each field is populated once the evaluation reaches the
    matching status — confirm against the API responses.
    """

    model_config = ConfigDict(populate_by_name=True, validate_by_name=True)

    passed_at: datetime | None = Field(default=None, alias="passed-at")
    failed_at: datetime | None = Field(default=None, alias="failed-at")
    running_at: datetime | None = Field(default=None, alias="running-at")
    canceled_at: datetime | None = Field(default=None, alias="canceled-at")
    errored_at: datetime | None = Field(default=None, alias="errored-at")


class PolicyAttachable(BaseModel):
    """Reference to the task stage a policy evaluation belongs to."""

    model_config = ConfigDict(populate_by_name=True, validate_by_name=True)

    # ID of the related record.
    id: str
    # Type name of the related record.
    type: str | None = Field(default=None, alias="type")


class PolicyResultCount(BaseModel):
    """Aggregated counts of policy results, broken down by outcome."""

    model_config = ConfigDict(populate_by_name=True, validate_by_name=True)

    advisory_failed: int | None = Field(default=None, alias="advisory-failed")
    mandatory_failed: int | None = Field(default=None, alias="mandatory-failed")
    passed: int | None = Field(default=None, alias="passed")
    errored: int | None = Field(default=None, alias="errored")


class PolicyEvaluationList(BaseModel):
    """One page of policy evaluations plus pagination metadata."""

    model_config = ConfigDict(populate_by_name=True, validate_by_name=True)

    # Evaluations on the current page; defaults to an empty list.
    items: list[PolicyEvaluation] | None = Field(default_factory=list)
    # Pagination metadata for the listing.
    current_page: int | None = None
    next_page: str | None = None
    prev_page: str | None = None
    total_count: int | None = None
    total_pages: int | None = None


class PolicyEvaluationListOptions(BaseModel):
    """Query options accepted when listing policy evaluations."""

    model_config = ConfigDict(populate_by_name=True, validate_by_name=True)

    # Pagination query parameters, aliased to page[number] / page[size].
    page_number: int | None = Field(default=None, alias="page[number]")
    page_size: int | None = Field(default=None, alias="page[size]")
66 changes: 66 additions & 0 deletions src/pytfe/models/policy_set_outcome.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from __future__ import annotations

from pydantic import BaseModel, ConfigDict, Field

from .policy_evaluation import PolicyEvaluation, PolicyResultCount


class PolicySetOutcome(BaseModel):
    """Outcome of one policy set evaluated as part of a policy evaluation."""

    model_config = ConfigDict(populate_by_name=True, validate_by_name=True)

    # External ID of the policy set outcome.
    id: str
    # Per-policy outcomes contained in this policy set.
    outcomes: list[Outcome] = Field(default_factory=list, alias="outcomes")
    # Error message for the set, if any.
    error: str | None = Field(default=None, alias="error")
    # Presumably whether failed results may be overridden — confirm against API.
    overridable: bool | None = Field(default=None, alias="overridable")
    policy_set_name: str | None = Field(default=None, alias="policy-set-name")
    policy_set_description: str | None = Field(default=None, alias="policy-set-description")
    # Aggregate result counts for the set.
    result_count: PolicyResultCount | None = Field(default=None, alias="result-count")

    # The policy evaluation that this outcome belongs to.
    policy_evaluation: PolicyEvaluation | None = Field(default=None, alias="policy-evaluation")


class Outcome(BaseModel):
    """Outcome of a single policy within a policy set."""

    model_config = ConfigDict(populate_by_name=True, validate_by_name=True)

    enforcement_level: str | None = Field(default=None, alias="enforcement-level")
    query: str | None = Field(default=None, alias="query")
    status: str | None = Field(default=None, alias="status")
    policy_name: str | None = Field(default=None, alias="policy-name")
    description: str | None = Field(default=None, alias="description")


class PolicySetOutcomeList(BaseModel):
    """One page of policy set outcomes plus pagination metadata."""

    model_config = ConfigDict(populate_by_name=True, validate_by_name=True)

    # Outcomes on the current page; defaults to an empty list.
    items: list[PolicySetOutcome] | None = Field(default_factory=list)
    # Pagination metadata for the listing.
    current_page: int | None = None
    next_page: str | None = None
    prev_page: str | None = None
    total_count: int | None = None
    total_pages: int | None = None


class PolicySetOutcomeListFilter(BaseModel):
    """Filter values supported when listing policy set outcomes."""

    model_config = ConfigDict(populate_by_name=True, validate_by_name=True)

    status: str | None = Field(default=None, alias="status")
    enforcement_level: str | None = Field(default=None, alias="enforcement-level")


class PolicySetOutcomeListOptions(BaseModel):
    """Options for listing policy set outcomes."""

    model_config = ConfigDict(populate_by_name=True, validate_by_name=True)

    # Mapping of filter key to filter definition; shape mirrors
    # PolicySetOutcomeListFilter — confirm expected keys against caller.
    filter: dict[str, PolicySetOutcomeListFilter] | None = None
    # Pagination query parameters, aliased to page[number] / page[size].
    page_number: int | None = Field(default=None, alias="page[number]")
    page_size: int | None = Field(default=None, alias="page[size]")
Loading
Loading