diff --git a/examples/state_versions.py b/examples/state_versions.py new file mode 100644 index 0000000..581b0e8 --- /dev/null +++ b/examples/state_versions.py @@ -0,0 +1,117 @@ +from __future__ import annotations + +import argparse +import os +from pathlib import Path + +from tfe import TFEClient, TFEConfig +from tfe.errors import ErrStateVersionUploadNotSupported +from tfe.models.state_version import ( + StateVersionCreateOptions, + StateVersionCurrentOptions, + StateVersionListOptions, +) +from tfe.models.state_version_output import StateVersionOutputsListOptions + + +def _print_header(title: str): + print("\n" + "=" * 80) + print(title) + print("=" * 80) + + +def main(): + parser = argparse.ArgumentParser( + description="State Versions demo for python-tfe SDK" + ) + parser.add_argument( + "--address", default=os.getenv("TFE_ADDRESS", "https://app.terraform.io") + ) + parser.add_argument("--token", default=os.getenv("TFE_TOKEN", "")) + parser.add_argument("--org", required=True, help="Organization name") + parser.add_argument("--workspace", required=True, help="Workspace name") + parser.add_argument("--workspace-id", required=True, help="Workspace ID") + parser.add_argument("--download", help="Path to save downloaded current state") + parser.add_argument("--upload", help="Path to a .tfstate (or JSON state) to upload") + parser.add_argument("--page", type=int, default=1) + parser.add_argument("--page-size", type=int, default=10) + args = parser.parse_args() + + cfg = TFEConfig(address=args.address, token=args.token) + client = TFEClient(cfg) + + options = StateVersionListOptions( + page_number=args.page, + page_size=args.page_size, + organization=args.org, + workspace=args.workspace, + ) + + sv_list = client.state_versions.list(options) + + print(f"Total state versions: {sv_list.total_count}") + print(f"Page {sv_list.current_page} of {sv_list.total_pages}") + print() + + for sv in sv_list.items: + print(f"- {sv.id} | status={sv.status} | created_at={sv.created_at}") + + # 1) List all state versions across org and workspace filters + _print_header("Org-scoped listing via /api/v2/state-versions (first page)") + all_sv = client.state_versions.list( + StateVersionListOptions( + organization=args.org, workspace=args.workspace, page_size=args.page_size + ) + ) + for sv in all_sv.items: + print(f"- {sv.id} | status={sv.status} | created_at={sv.created_at}") + + # 2) Read the current state version (with outputs included if you want) + _print_header("Reading current state version") + current = client.state_versions.read_current_with_options( + args.workspace_id, StateVersionCurrentOptions(include=["outputs"]) + ) + print( + f"Current SV: {current.id} status={current.status} durl={current.hosted_state_download_url}" + ) + + # 3) (Optional) Download the current state (optional) + if args.download: + _print_header(f"Downloading current state to: {args.download}") + raw = client.state_versions.download(current.id) + Path(args.download).write_bytes(raw) + print(f"Wrote {len(raw)} bytes to {args.download}") + + # 4) List outputs for the current state version (paged) + _print_header("Listing outputs (current state version)") + outs = client.state_versions.list_outputs( + current.id, options=StateVersionOutputsListOptions(page_size=50) + ) + if not outs.items: + print("No outputs found.") + for o in outs.items: + # Sensitive outputs will have value = None + print(f"- {o.name}: sensitive={o.sensitive} type={o.type} value={o.value}") + + # 5) (Optional) Upload a new state file + if args.upload: + _print_header(f"Uploading new state from: {args.upload}") + payload = Path(args.upload).read_bytes() + try: + # If your server supports signed uploads, this will: + # a) create SV (to get upload URL) + # b) PUT bytes to the signed URL + # c) read back the SV to return a hydrated object + new_sv = client.state_versions.upload( + args.workspace_id, + raw_state=payload, + options=StateVersionCreateOptions(), + ) + print(f"Uploaded new SV: {new_sv.id} status={new_sv.status}") + except ErrStateVersionUploadNotSupported as e: + # Some older/self-hosted versions don’t support direct upload + print(f"Upload not supported on this server: {e}") + + +if __name__ == "__main__": + main() diff --git a/pyproject.toml b/pyproject.toml index 76afdbc..1b11eeb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,7 +9,7 @@ description = "Official Python SDK for HashiCorp Terraform Cloud / Terraform Ent readme = "README.md" license = { text = "MPL-2.0" } authors = [{ name = "HashiCorp" }] -requires-python = ">=3.9" +requires-python = ">=3.10" dependencies = [ "httpx>=0.27.0,<0.29.0", "pydantic>=2.6,<3", diff --git a/src/tfe/_http.py b/src/tfe/_http.py index b8a351f..e5b8d8b 100644 --- a/src/tfe/_http.py +++ b/src/tfe/_http.py @@ -1,8 +1,10 @@ from __future__ import annotations +import re import time from collections.abc import Mapping from typing import Any +from urllib.parse import urljoin import anyio import httpx @@ -18,6 +20,8 @@ _RETRY_STATUSES = {429, 502, 503, 504} +ABSOLUTE_URL_RE = re.compile(r"^https?://", re.I) + class HTTPTransport: def __init__( @@ -56,6 +60,12 @@ def __init__( http2=http2, timeout=timeout, verify=ca_bundle or verify_tls ) # proxies=proxies + def _build_url(self, path: str) -> str: + # IMPORTANT: don't prefix absolute URLs (hosted_state, signed blobs, etc.) + if ABSOLUTE_URL_RE.match(path): + return path + return urljoin(self.base, path.lstrip("/")) + def request( self, method: str, @@ -66,11 +76,12 @@ def request( headers: dict[str, str] | None = None, allow_redirects: bool = True, ) -> httpx.Response: - url = f"{self.base}{path}" + url = self._build_url(path) hdrs = dict(self.headers) if headers: hdrs.update(headers) attempt = 0 + # print(method, url, params, json_body, hdrs) while True: try: resp = self._sync.request( @@ -92,6 +103,7 @@ def request( self._sleep(attempt, retry_after) attempt += 1 continue + # print(resp) self._raise_if_error(resp) return resp diff --git a/src/tfe/client.py b/src/tfe/client.py index 1e35c7c..e7a2f92 100644 --- a/src/tfe/client.py +++ b/src/tfe/client.py @@ -5,6 +5,8 @@ from .resources.organizations import Organizations from .resources.projects import Projects from .resources.registry_module import RegistryModules +from .resources.state_version_outputs import StateVersionOutputs +from .resources.state_versions import StateVersions from .resources.variable import Variables from .resources.workspaces import Workspaces @@ -32,5 +34,8 @@ def __init__(self, config: TFEConfig | None = None): self.workspaces = Workspaces(self._transport) self.registry_modules = RegistryModules(self._transport) + self.state_versions = StateVersions(self._transport) + self.state_version_outputs = StateVersionOutputs(self._transport) + def close(self) -> None: pass diff --git a/src/tfe/errors.py b/src/tfe/errors.py index ed0b897..be17b8a 100644 --- a/src/tfe/errors.py +++ b/src/tfe/errors.py @@ -52,6 +52,9 @@ class InvalidValues(TFEError): ... class RequiredFieldMissing(TFEError): ... +class ErrStateVersionUploadNotSupported(TFEError): ... + + # Error message constants ERR_INVALID_NAME = "invalid value for name" ERR_REQUIRED_NAME = "name is required" diff --git a/src/tfe/models/state_version.py b/src/tfe/models/state_version.py new file mode 100644 index 0000000..018620c --- /dev/null +++ b/src/tfe/models/state_version.py @@ -0,0 +1,93 @@ +from __future__ import annotations + +from datetime import datetime +from enum import Enum + +from pydantic import BaseModel, ConfigDict, Field + +# ---- Enums ---- + + +class StateVersionStatus(str, Enum): + PENDING = "pending" + FINALIZED = "finalized" + DISCARDED = "discarded" + + +class StateVersionIncludeOpt(str, Enum): + CREATED_BY = "created_by" + RUN = "run" + RUN_CREATED_BY = "run.created_by" + RUN_CONFIGURATION_VERSION = "run.configuration_version" + OUTPUTS = "outputs" + + +# ---- DTOs ---- + + +class StateVersion(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + id: str = Field(..., alias="id") + created_at: datetime = Field(..., alias="created-at") + hosted_state_download_url: str | None = Field( + None, alias="hosted-state-download-url" + ) + hosted_state_upload_url: str | None = Field(None, alias="hosted-state-upload-url") + status: StateVersionStatus | None = Field(None, alias="status") + + # Optional/advanced fields (present on newer servers; keep loose) + resources_processed: bool | None = Field(None, alias="resources-processed") + modules: dict | None = None + providers: dict | None = None + resources: list[dict] | None = None + + +class StateVersionCreateOptions(BaseModel): + """ + Options for POST /workspaces/:id/state-versions. + If you omit inline content (recommended), you must include serial & md5. + """ + + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + serial: int + md5: str + lineage: str | None = None + terraform_version: str | None = Field(None, alias="terraform-version") + + # Optional one-shot create path (if you don't use signed upload) + state: str | None = None # base64-encoded tfstate + json_state: str | None = Field(None, alias="json-state") + json_state_outputs: str | None = Field(None, alias="json-state-outputs") + + +class StateVersionCurrentOptions(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + include: list[StateVersionIncludeOpt] | None = None + + +class StateVersionReadOptions(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + include: list[StateVersionIncludeOpt] | None = None + + +class StateVersionListOptions(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + # Standard pagination + filters + page_number: int | None = Field(None, alias="page[number]") + page_size: int | None = Field(None, alias="page[size]") + organization: str | None = Field(None, alias="filter[organization][name]") + workspace: str | None = Field(None, alias="filter[workspace][name]") + + +class StateVersionList(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + items: list[StateVersion] = Field(default_factory=list) + current_page: int | None = None + total_pages: int | None = None + total_count: int | None = None diff --git a/src/tfe/models/state_version_output.py b/src/tfe/models/state_version_output.py new file mode 100644 index 0000000..1d69b27 --- /dev/null +++ b/src/tfe/models/state_version_output.py @@ -0,0 +1,32 @@ +from __future__ import annotations + +from typing import Any + +from pydantic import BaseModel, ConfigDict, Field + + +class StateVersionOutput(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + id: str + name: str + sensitive: bool + type: str + value: Any + detailed_type: Any | None = Field(None, alias="detailed-type") + + +class StateVersionOutputsListOptions(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + page_number: int | None = Field(None, alias="page[number]") + page_size: int | None = Field(None, alias="page[size]") + + +class StateVersionOutputsList(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + items: list[StateVersionOutput] = Field(default_factory=list) + current_page: int | None = None + total_pages: int | None = None + total_count: int | None = None diff --git a/src/tfe/resources/state_version_outputs.py b/src/tfe/resources/state_version_outputs.py new file mode 100644 index 0000000..205073c --- /dev/null +++ b/src/tfe/resources/state_version_outputs.py @@ -0,0 +1,83 @@ +from __future__ import annotations + +from typing import Any + +from ..models.state_version_output import ( + StateVersionOutput, + StateVersionOutputsList, + StateVersionOutputsListOptions, +) +from ..utils import valid_string_id +from ._base import _Service + + +def _safe_str(v: Any, default: str = "") -> str: + return v if isinstance(v, str) else (str(v) if v is not None else default) + + +class StateVersionOutputs(_Service): + """ + HCPTF and TFE State Version Outputs service. + + Endpoints: + - GET /api/v2/state-version-outputs/:id + - GET /api/v2/workspaces/:workspace_id/current-state-version-outputs + """ + + def read(self, output_id: str) -> StateVersionOutput: + """Read a specific state version output by ID.""" + if not valid_string_id(output_id): + raise ValueError("invalid output id") + + r = self.t.request("GET", f"/api/v2/state-version-outputs/{output_id}") + d = r.json()["data"] + attr = d.get("attributes", {}) or {} + + return StateVersionOutput( + id=_safe_str(d.get("id")), + **{k.replace("-", "_"): v for k, v in attr.items()}, + ) + + def read_current( + self, + workspace_id: str, + options: StateVersionOutputsListOptions | None = None, + ) -> StateVersionOutputsList: + """ + Read outputs for the workspace's current state version. + Note: sensitive outputs are returned with null values by the API. + """ + if not valid_string_id(workspace_id): + raise ValueError("invalid workspace id") + + params: dict[str, Any] = {} + if options: + if options.page_number is not None: + params["page[number]"] = options.page_number + if options.page_size is not None: + params["page[size]"] = options.page_size + + r = self.t.request( + "GET", + f"/api/v2/workspaces/{workspace_id}/current-state-version-outputs", + params=params, + ) + data = r.json() + + items: list[StateVersionOutput] = [] + for item in data.get("data", []): + attr = item.get("attributes", {}) or {} + items.append( + StateVersionOutput( + id=_safe_str(item.get("id")), + **{k.replace("-", "_"): v for k, v in attr.items()}, + ) + ) + + meta = data.get("meta", {}).get("pagination", {}) or {} + return StateVersionOutputsList( + items=items, + current_page=meta.get("current-page"), + total_pages=meta.get("total-pages"), + total_count=meta.get("total-count"), + ) diff --git a/src/tfe/resources/state_versions.py b/src/tfe/resources/state_versions.py new file mode 100644 index 0000000..29e1f5c --- /dev/null +++ b/src/tfe/resources/state_versions.py @@ -0,0 +1,330 @@ +from __future__ import annotations + +from typing import Any +from urllib.parse import urlencode + +from ..errors import NotFound + +# Pydantic models for this feature +from ..models.state_version import ( + StateVersion, + StateVersionCreateOptions, + StateVersionCurrentOptions, + StateVersionList, + StateVersionListOptions, + StateVersionReadOptions, +) +from ..models.state_version_output import ( + StateVersionOutput, + StateVersionOutputsList, + StateVersionOutputsListOptions, +) +from ..utils import looks_like_workspace_id, valid_string_id +from ._base import _Service + + +def _safe_str(v: Any, default: str = "") -> str: + return v if isinstance(v, str) else (str(v) if v is not None else default) + + +class StateVersions(_Service): + """ + TFE/TFC State Versions service. + + Endpoints covered (JSON:API): + - GET /api/v2/workspaces/:workspace_id/state-versions + - GET /api/v2/workspaces/:workspace_id/current-state-version + - GET /api/v2/state-versions/:id + - GET /api/v2/state-versions/:id/download + - GET /api/v2/state-versions/:id/outputs + - POST /api/v2/workspaces/:workspace_id/state-versions + - POST /api/v2/state-versions/:id/actions/soft_delete_backing_data (TFE only) + - POST /api/v2/state-versions/:id/actions/restore_backing_data (TFE only) + - POST /api/v2/state-versions/:id/actions/permanently_delete_backing_data (TFE only) + """ + + def _resolve_workspace_id(self, workspace: str, organization: str | None) -> str: + """Accept a workspace ID (ws-xxxxxx) or resolve by name with organization.""" + if looks_like_workspace_id(workspace): + return workspace + if not organization: + raise ValueError("organization is required when workspace is a name") + r = self.t.request( + "GET", f"/api/v2/organizations/{organization}/workspaces/{workspace}" + ) + data = r.json().get("data") or {} + ws_id = _safe_str(data.get("id")) + if not ws_id: + raise NotFound(f"workspace '{workspace}' not found in org '{organization}'") + return ws_id + + # ---------------------------- + # Listing & reading + # ---------------------------- + + @staticmethod + def _encode_query(params: dict[str, Any]) -> str: + clean = {k: v for k, v in params.items() if v is not None} + if not clean: + return "" + return "?" + urlencode(clean, doseq=True) + + def list(self, options: StateVersionListOptions | None = None) -> StateVersionList: + """ + GET /state-versions + Accepts filters for organization and workspace and standard pagination. + """ + params = options.model_dump(by_alias=True, exclude_none=True) if options else {} + path = f"/api/v2/state-versions{self._encode_query(params)}" + r = self.t.request("GET", path) + jd = r.json() + # Expecting JSON:API list. Normalize to models. + items = [] + meta = jd.get("meta", {}) + for d in jd.get("data", []): + attrs = d.get("attributes", {}) + attrs["id"] = d.get("id") + items.append(StateVersion.model_validate(attrs)) + return StateVersionList( + items=items, + current_page=meta.get("pagination", {}).get("current-page"), + total_pages=meta.get("pagination", {}).get("total-pages"), + total_count=meta.get("pagination", {}).get("total-count"), + ) + + def read(self, state_version_id: str) -> StateVersion: + """Read a state version by ID.""" + if not valid_string_id(state_version_id): + raise ValueError("invalid state version id") + + r = self.t.request("GET", f"/api/v2/state-versions/{state_version_id}") + d = r.json()["data"] + attr = d.get("attributes", {}) or {} + + return StateVersion( + id=_safe_str(d.get("id")), + **{k.replace("-", "_"): v for k, v in attr.items()}, + ) + + def read_with_options( + self, state_version_id: str, options: StateVersionReadOptions + ) -> StateVersion: + """Read a state version with include options (?include=outputs,run,created_by,...).""" + if not valid_string_id(state_version_id): + raise ValueError("invalid state version id") + + params: dict[str, Any] = {} + if options and options.include: + params["include"] = ",".join(options.include) + + r = self.t.request( + "GET", f"/api/v2/state-versions/{state_version_id}", params=params + ) + d = r.json()["data"] + attr = d.get("attributes", {}) or {} + + return StateVersion( + id=_safe_str(d.get("id")), + **{k.replace("-", "_"): v for k, v in attr.items()}, + ) + + def read_current(self, workspace_id: str) -> StateVersion: + """Read the current state version for a workspace.""" + if not valid_string_id(workspace_id): + raise ValueError("invalid workspace id") + + r = self.t.request( + "GET", f"/api/v2/workspaces/{workspace_id}/current-state-version" + ) + d = r.json()["data"] + attr = d.get("attributes", {}) or {} + + return StateVersion( + id=_safe_str(d.get("id")), + **{k.replace("-", "_"): v for k, v in attr.items()}, + ) + + def read_current_with_options( + self, workspace_id: str, options: StateVersionCurrentOptions + ) -> StateVersion: + """Read the current state version with include options.""" + if not valid_string_id(workspace_id): + raise ValueError("invalid workspace id") + + params: dict[str, Any] = {} + if options and options.include: + params["include"] = ",".join(options.include) + + r = self.t.request( + "GET", + f"/api/v2/workspaces/{workspace_id}/current-state-version", + params=params, + ) + d = r.json()["data"] + attr = d.get("attributes", {}) or {} + + return StateVersion( + id=_safe_str(d.get("id")), + **{k.replace("-", "_"): v for k, v in attr.items()}, + ) + + # ---------------------------- + # Create / upload (signed URL) + # ---------------------------- + + def create( + self, + workspace: str, + options: StateVersionCreateOptions, + *, + organization: str | None = None, + ) -> StateVersion: + """Create a state-version record (returns hosted upload URLs if content omitted).""" + ws_id = self._resolve_workspace_id(workspace, organization) + + attrs = options.model_dump(by_alias=True, exclude_none=True) + if not attrs: + # API requires attributes; at minimum serial & md5 + raise ValueError( + "state-version create requires attributes (at least serial & md5)" + ) + + body = {"data": {"type": "state-versions", "attributes": attrs}} + r = self.t.request( + "POST", f"/api/v2/workspaces/{ws_id}/state-versions", json_body=body + ) + d = r.json()["data"] + attr = d.get("attributes", {}) or {} + return StateVersion( + id=_safe_str(d.get("id")), + **{k.replace("-", "_"): v for k, v in attr.items()}, + ) + + """ + def upload( + self, + workspace: str, + *, + raw_state: bytes | None = None, + raw_json_state: bytes | None = None, + options: Optional[StateVersionCreateOptions] = None, + organization: Optional[str] = None, + ) -> StateVersion: + # TBD: Implements Upload State Functionality + """ + + def download(self, state_version_id: str) -> bytes: + """ + Download the raw state file bytes for a specific state version. + + HCP Terraform returns a signed blob URL in the state-version attributes + called 'hosted-state-download-url'. We must fetch that URL directly. + """ + if not valid_string_id(state_version_id): + raise ValueError("invalid state version id") + + sv = self.read(state_version_id) + url = sv.hosted_state_download_url + if not url: + # Can happen if SV is missing, not finalized yet, or you lack permissions. + # Also happens on some older/self-hosted versions if backing data was GC’d. + from ..errors import NotFound + + raise NotFound("download url not available for this state version") + + # Download the bytes from the signed Archivist URL (follow redirects). + # Avoid JSON:API headers here; Accept */* is fine. + resp = self.t.request( + "GET", url, allow_redirects=True, headers={"Accept": "application/json"} + ) + return resp.content + + def download_current(self, workspace_id: str) -> bytes: + """Download the current state for a workspace.""" + if not valid_string_id(workspace_id): + raise ValueError("invalid workspace id") + + sv = self.read_current(workspace_id) + url = sv.hosted_state_download_url + if not url: + from ..errors import NotFound + + raise NotFound("download url not available for current state") + resp = self.t.request( + "GET", url, allow_redirects=True, headers={"Accept": "*/*"} + ) + return resp.content + + # ---------------------------- + # Outputs (via state version) + # ---------------------------- + + def list_outputs( + self, + state_version_id: str, + options: StateVersionOutputsListOptions | None = None, + ) -> StateVersionOutputsList: + """List outputs for a given state version (paged).""" + if not valid_string_id(state_version_id): + raise ValueError("invalid state version id") + + params: dict[str, Any] = {} + if options: + if options.page_number is not None: + params["page[number]"] = options.page_number + if options.page_size is not None: + params["page[size]"] = options.page_size + + r = self.t.request( + "GET", f"/api/v2/state-versions/{state_version_id}/outputs", params=params + ) + data = r.json() + + items: list[StateVersionOutput] = [] + for item in data.get("data", []): + attr = item.get("attributes", {}) or {} + items.append( + StateVersionOutput( + id=_safe_str(item.get("id")), + **{k.replace("-", "_"): v for k, v in attr.items()}, + ) + ) + + meta = data.get("meta", {}).get("pagination", {}) or {} + return StateVersionOutputsList( + items=items, + current_page=meta.get("current-page"), + total_pages=meta.get("total-pages"), + total_count=meta.get("total-count"), + ) + + # ---------------------------- + # TFE-only backing data actions + # ---------------------------- + + def soft_delete_backing_data(self, state_version_id: str) -> None: + if not valid_string_id(state_version_id): + raise ValueError("invalid state version id") + self.t.request( + "POST", + f"/api/v2/state-versions/{state_version_id}/actions/soft_delete_backing_data", + ) + return None + + def restore_backing_data(self, state_version_id: str) -> None: + if not valid_string_id(state_version_id): + raise ValueError("invalid state version id") + self.t.request( + "POST", + f"/api/v2/state-versions/{state_version_id}/actions/restore_backing_data", + ) + return None + + def permanently_delete_backing_data(self, state_version_id: str) -> None: + if not valid_string_id(state_version_id): + raise ValueError("invalid state version id") + self.t.request( + "POST", + f"/api/v2/state-versions/{state_version_id}/actions/permanently_delete_backing_data", + ) + return None diff --git a/src/tfe/utils.py b/src/tfe/utils.py index 7c3a0bf..e99eea5 100644 --- a/src/tfe/utils.py +++ b/src/tfe/utils.py @@ -2,7 +2,8 @@ import re import time -from collections.abc import Callable +from collections.abc import Callable, Mapping +from typing import Any from .errors import ( InvalidNameError, @@ -22,6 +23,7 @@ ) _STRING_ID_PATTERN = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9_-]{2,}$") +_WS_ID_RE = re.compile(r"^ws-[A-Za-z0-9]+$") _VERSION_PATTERN = re.compile( r"^\d+\.\d+\.\d+(?:-[a-zA-Z0-9]+(?:\.[a-zA-Z0-9]+)*)?(?:\+[a-zA-Z0-9]+(?:\.[a-zA-Z0-9]+)*)?$" ) @@ -51,6 +53,34 @@ def valid_string_id(v: str | None) -> bool: return v is not None and _STRING_ID_PATTERN.match(str(v)) is not None +def _safe_str(v: Any, default: str = "") -> str: + return v if isinstance(v, str) else (str(v) if v is not None else default) + + +def looks_like_workspace_id(value: Any) -> bool: + """True if value matches "ws-" pattern.""" + return isinstance(value, str) and bool(_WS_ID_RE.match(value)) + + +def encode_query(params: Mapping[str, Any] | None) -> str: + """ + Best-effort encoder for JSON:API-style query dicts into a query string. + Keeps keys like "page[number]" intact. Values that are lists/tuples are joined with commas. + """ + if not params: + return "" + parts: list[str] = [] + for k, v in params.items(): + if v is None: + continue + if isinstance(v, (list, tuple)): + sv = ",".join(str(x) for x in v) + else: + sv = str(v) + parts.append(f"{k}={sv}") + return ("?" + "&".join(parts)) if parts else "" + + def valid_version(v: str | None) -> bool: """Validate semantic version string.""" return v is not None and _VERSION_PATTERN.match(str(v)) is not None