From bbfe6d7514bebfd05efe4b40fe519bf026c0358e Mon Sep 17 00:00:00 2001 From: Chris Busillo Date: Sun, 3 May 2026 00:16:29 -0400 Subject: [PATCH] Type product context cutover store boundary --- control_plane/product_context_cutover.py | 270 ++++++++++++++++------- tests/test_product_context_cutover.py | 214 +++++++++++++++++- 2 files changed, 390 insertions(+), 94 deletions(-) diff --git a/control_plane/product_context_cutover.py b/control_plane/product_context_cutover.py index a015e841..f2dfa111 100644 --- a/control_plane/product_context_cutover.py +++ b/control_plane/product_context_cutover.py @@ -2,12 +2,15 @@ import hashlib import uuid -from typing import Literal +from typing import Literal, Protocol from pydantic import BaseModel, ConfigDict, model_validator from control_plane.contracts.dokploy_target_id_record import DokployTargetIdRecord +from control_plane.contracts.dokploy_target_record import DokployTargetRecord +from control_plane.contracts.environment_inventory import EnvironmentInventory from control_plane.contracts.product_profile_record import LaunchplaneProductProfileRecord +from control_plane.contracts.release_tuple_record import ReleaseTupleRecord from control_plane.contracts.runtime_environment_record import ( RuntimeEnvironmentDeleteEvent, RuntimeEnvironmentRecord, @@ -18,12 +21,100 @@ SecretRecord, SecretVersion, ) -from control_plane.storage.postgres import PostgresRecordStore from control_plane.workflows.ship import utc_now_timestamp ContextCutoverMode = Literal["dry-run", "apply"] LegacyContextCleanupMode = Literal["dry-run", "apply"] +CurrentAuthorityDeleteStatus = Literal["deleted", "missing", "changed"] +RuntimeEnvironmentDeleteStatus = Literal["deleted", "missing", "changed"] + + +class ProductContextCutoverReadStore(Protocol): + def read_product_profile_record(self, product: str) -> LaunchplaneProductProfileRecord: ... + + def list_product_profile_records( + self, *, driver_id: str = "" + ) -> tuple[LaunchplaneProductProfileRecord, ...]: ... + + def list_runtime_environment_records( + self, *, context_name: str = "", instance_name: str = "" + ) -> tuple[RuntimeEnvironmentRecord, ...]: ... + + def list_dokploy_target_records(self) -> tuple[DokployTargetRecord, ...]: ... + + def list_dokploy_target_id_records(self) -> tuple[DokployTargetIdRecord, ...]: ... + + def list_secret_records( + self, + *, + integration: str = "", + context_name: str = "", + instance_name: str = "", + limit: int | None = None, + ) -> tuple[SecretRecord, ...]: ... + + def find_secret_record( + self, + *, + scope: str, + integration: str, + name: str, + context: str = "", + instance: str = "", + ) -> SecretRecord | None: ... + + def list_secret_bindings( + self, + *, + integration: str = "", + context_name: str = "", + instance_name: str = "", + limit: int | None = None, + ) -> tuple[SecretBinding, ...]: ... + + def list_environment_inventory(self) -> tuple[EnvironmentInventory, ...]: ... + + def list_release_tuple_records(self) -> tuple[ReleaseTupleRecord, ...]: ... + + +class ProductContextCutoverStore(ProductContextCutoverReadStore, Protocol): + def write_product_profile_record(self, record: LaunchplaneProductProfileRecord) -> None: ... + + def write_runtime_environment_record(self, record: RuntimeEnvironmentRecord) -> None: ... + + def delete_runtime_environment_record_with_event( + self, + *, + event: RuntimeEnvironmentDeleteEvent, + expected_record: RuntimeEnvironmentRecord, + ) -> RuntimeEnvironmentDeleteStatus: ... + + def write_dokploy_target_record(self, record: DokployTargetRecord) -> None: ... + + def delete_dokploy_target_record( + self, *, expected_record: DokployTargetRecord + ) -> CurrentAuthorityDeleteStatus: ... + + def write_dokploy_target_id_record(self, record: DokployTargetIdRecord) -> None: ... + + def delete_dokploy_target_id_record( + self, *, expected_record: DokployTargetIdRecord + ) -> CurrentAuthorityDeleteStatus: ... + + def read_secret_version(self, version_id: str) -> SecretVersion: ... + + def write_secret_version(self, version: SecretVersion) -> None: ... + + def write_secret_record(self, record: SecretRecord) -> None: ... + + def write_secret_binding(self, binding: SecretBinding) -> None: ... + + def write_secret_audit_event(self, event: SecretAuditEvent) -> None: ... + + def write_environment_inventory(self, record: EnvironmentInventory) -> None: ... + + def write_release_tuple_record(self, record: ReleaseTupleRecord) -> None: ... class ProductContextCutoverRequest(BaseModel): @@ -168,7 +259,7 @@ def _secret_cleanup_event_id(secret_id: str) -> str: def _record_target_context_exists( *, - record_store: PostgresRecordStore, + record_store: ProductContextCutoverReadStore, source_context: str, target_context: str, product: str, @@ -194,7 +285,7 @@ def _record_target_context_exists( def _target_runtime_route_exists( *, - record_store: PostgresRecordStore, + record_store: ProductContextCutoverReadStore, source_record: RuntimeEnvironmentRecord, target_context: str, ) -> bool: @@ -206,7 +297,7 @@ def _target_runtime_route_exists( def _target_secret_route_exists( *, - record_store: PostgresRecordStore, + record_store: ProductContextCutoverReadStore, source_record: SecretRecord, target_context: str, ) -> bool: @@ -233,7 +324,7 @@ def _target_instance_exists( def _source_secret_bindings( - *, record_store: PostgresRecordStore, record: SecretRecord + *, record_store: ProductContextCutoverReadStore, record: SecretRecord ) -> tuple[SecretBinding, ...]: return tuple( binding @@ -296,7 +387,7 @@ def _profile_semantic_payload(profile: LaunchplaneProductProfileRecord) -> dict[ def plan_product_context_cutover( *, - record_store: PostgresRecordStore, + record_store: ProductContextCutoverReadStore, request: ProductContextCutoverRequest, ) -> dict[str, object]: profile = record_store.read_product_profile_record(request.product) @@ -309,7 +400,7 @@ def plan_product_context_cutover( context_name=request.target_context ) } - runtime_records = [ + runtime_records: list[dict[str, object]] = [ { "scope": record.scope, "instance": record.instance, @@ -332,7 +423,7 @@ def plan_product_context_cutover( for record in record_store.list_dokploy_target_records() if record.context == request.target_context } - dokploy_targets = [ + dokploy_targets: list[dict[str, object]] = [ { "instance": record.instance, "target_type": record.target_type, @@ -355,7 +446,7 @@ def plan_product_context_cutover( for record in record_store.list_dokploy_target_id_records() if record.context == request.target_context } - dokploy_target_ids = [ + dokploy_target_ids: list[dict[str, object]] = [ { "instance": record.instance, "target_id": record.target_id, @@ -408,7 +499,7 @@ def plan_product_context_cutover( for record in record_store.list_environment_inventory() if record.context == request.target_context } - inventory_records = [ + inventory_records: list[dict[str, object]] = [ { "instance": record.instance, "artifact_id": str(getattr(record.artifact_identity, "artifact_id", "") or ""), @@ -429,7 +520,7 @@ def plan_product_context_cutover( for record in record_store.list_release_tuple_records() if record.context == request.target_context } - release_tuples = [ + release_tuples: list[dict[str, object]] = [ { "channel": record.channel, "artifact_id": record.artifact_id, @@ -448,7 +539,7 @@ def plan_product_context_cutover( source_label=request.source_label, ) profile_changed = _profile_semantic_payload(profile) != _profile_semantic_payload(next_profile) - groups = { + groups: dict[str, list[dict[str, object]]] = { "runtime_environment_records": runtime_records, "managed_secret_records": managed_secrets, "dokploy_targets": dokploy_targets, @@ -482,7 +573,7 @@ def plan_product_context_cutover( def apply_product_context_cutover( *, - record_store: PostgresRecordStore, + record_store: ProductContextCutoverStore, request: ProductContextCutoverRequest, ) -> dict[str, object]: plan = plan_product_context_cutover(record_store=record_store, request=request) @@ -490,18 +581,18 @@ def apply_product_context_cutover( return plan now = utc_now_timestamp() - for record in record_store.list_runtime_environment_records( + for runtime_record in record_store.list_runtime_environment_records( context_name=request.source_context ): exists = any( - target.scope == record.scope and target.instance == record.instance + target.scope == runtime_record.scope and target.instance == runtime_record.instance for target in record_store.list_runtime_environment_records( context_name=request.target_context ) ) if not exists: record_store.write_runtime_environment_record( - record.model_copy( + runtime_record.model_copy( update={ "context": request.target_context, "updated_at": now, @@ -510,18 +601,19 @@ def apply_product_context_cutover( ) ) - for record in tuple( + for dokploy_target_record in tuple( item for item in record_store.list_dokploy_target_records() if item.context == request.source_context ): exists = any( - target.context == request.target_context and target.instance == record.instance + target.context == request.target_context + and target.instance == dokploy_target_record.instance for target in record_store.list_dokploy_target_records() ) if not exists: record_store.write_dokploy_target_record( - record.model_copy( + dokploy_target_record.model_copy( update={ "context": request.target_context, "updated_at": now, @@ -530,41 +622,45 @@ def apply_product_context_cutover( ) ) - for record in tuple( + for target_id_record in tuple( item for item in record_store.list_dokploy_target_id_records() if item.context == request.source_context ): exists = any( - target.context == request.target_context and target.instance == record.instance + target.context == request.target_context + and target.instance == target_id_record.instance for target in record_store.list_dokploy_target_id_records() ) if not exists: record_store.write_dokploy_target_id_record( DokployTargetIdRecord( context=request.target_context, - instance=record.instance, - target_id=record.target_id, + instance=target_id_record.instance, + target_id=target_id_record.target_id, updated_at=now, source_label=request.source_label, ) ) - for record in record_store.list_secret_records( + for secret_record in record_store.list_secret_records( context_name=request.source_context, limit=None, ): target = record_store.find_secret_record( - scope=record.scope, - integration=record.integration, - name=record.name, + scope=secret_record.scope, + integration=secret_record.integration, + name=secret_record.name, context=request.target_context, - instance=record.instance, + instance=secret_record.instance, ) if target is not None: continue - target_secret_id = _target_secret_id(record, target_context=request.target_context) - source_version = record_store.read_secret_version(record.current_version_id) + target_secret_id = _target_secret_id( + secret_record, + target_context=request.target_context, + ) + source_version = record_store.read_secret_version(secret_record.current_version_id) target_version_id = _target_secret_version_id( secret_id=target_secret_id, source_version_id=source_version.version_id, @@ -583,30 +679,30 @@ def apply_product_context_cutover( record_store.write_secret_record( SecretRecord( secret_id=target_secret_id, - scope=record.scope, - integration=record.integration, - name=record.name, + scope=secret_record.scope, + integration=secret_record.integration, + name=secret_record.name, context=request.target_context, - instance=record.instance, - description=record.description, - policy=record.policy, - status=record.status, + instance=secret_record.instance, + description=secret_record.description, + policy=secret_record.policy, + status=secret_record.status, current_version_id=target_version_id, created_at=now, updated_at=now, - last_validated_at=record.last_validated_at, + last_validated_at=secret_record.last_validated_at, updated_by=request.source_label, ) ) for binding in tuple( item for item in record_store.list_secret_bindings( - integration=record.integration, + integration=secret_record.integration, context_name=request.source_context, - instance_name=record.instance, + instance_name=secret_record.instance, limit=None, ) - if item.secret_id == record.secret_id + if item.secret_id == secret_record.secret_id ): record_store.write_secret_binding( SecretBinding( @@ -629,7 +725,7 @@ def apply_product_context_cutover( SecretAuditEvent( event_id=_target_secret_event_id( secret_id=target_secret_id, - source_secret_id=record.secret_id, + source_secret_id=secret_record.secret_id, ), secret_id=target_secret_id, event_type="imported", @@ -638,42 +734,46 @@ def apply_product_context_cutover( detail="Launchplane imported managed secret during product context cutover.", metadata={ "source": request.source_label, - "source_secret_id": record.secret_id, + "source_secret_id": secret_record.secret_id, "source_context": request.source_context, "target_context": request.target_context, }, ) ) - for record in tuple( + for inventory_record in tuple( item for item in record_store.list_environment_inventory() if item.context == request.source_context ): exists = any( - target.context == request.target_context and target.instance == record.instance + target.context == request.target_context + and target.instance == inventory_record.instance for target in record_store.list_environment_inventory() ) if not exists: record_store.write_environment_inventory( - record.model_copy(update={"context": request.target_context, "updated_at": now}) + inventory_record.model_copy( + update={"context": request.target_context, "updated_at": now} + ) ) - for record in tuple( + for release_tuple_record in tuple( item for item in record_store.list_release_tuple_records() if item.context == request.source_context ): exists = any( - target.context == request.target_context and target.channel == record.channel + target.context == request.target_context + and target.channel == release_tuple_record.channel for target in record_store.list_release_tuple_records() ) if not exists: record_store.write_release_tuple_record( - record.model_copy( + release_tuple_record.model_copy( update={ "context": request.target_context, - "tuple_id": f"{request.target_context}-{record.channel}-{record.artifact_id}", + "tuple_id": f"{request.target_context}-{release_tuple_record.channel}-{release_tuple_record.artifact_id}", } ) ) @@ -694,7 +794,7 @@ def apply_product_context_cutover( def plan_legacy_context_cleanup( *, - record_store: PostgresRecordStore, + record_store: ProductContextCutoverReadStore, request: LegacyContextCleanupRequest, ) -> dict[str, object]: _record_target_context_exists( @@ -705,46 +805,46 @@ def plan_legacy_context_cleanup( ) runtime_records: list[dict[str, object]] = [] - for record in record_store.list_runtime_environment_records( + for runtime_record in record_store.list_runtime_environment_records( context_name=request.source_context ): target_exists = _target_runtime_route_exists( record_store=record_store, - source_record=record, + source_record=runtime_record, target_context=request.target_context, ) runtime_records.append( { - "scope": record.scope, - "instance": record.instance, - "env_keys": sorted(record.env.keys()), - "env_value_count": len(record.env), + "scope": runtime_record.scope, + "instance": runtime_record.instance, + "env_keys": sorted(runtime_record.env.keys()), + "env_value_count": len(runtime_record.env), "action": "deleted" if target_exists else "blocked_missing_target", } ) secret_records: list[dict[str, object]] = [] - for record in record_store.list_secret_records( + for secret_record in record_store.list_secret_records( context_name=request.source_context, limit=None, ): - bindings = _source_secret_bindings(record_store=record_store, record=record) + bindings = _source_secret_bindings(record_store=record_store, record=secret_record) target_exists = _target_secret_route_exists( record_store=record_store, - source_record=record, + source_record=secret_record, target_context=request.target_context, ) action = "blocked_missing_target" if target_exists: - action = "skipped" if record.status == "disabled" else "disabled" + action = "skipped" if secret_record.status == "disabled" else "disabled" secret_records.append( { - "secret_id": record.secret_id, - "scope": record.scope, - "integration": record.integration, - "name": record.name, - "instance": record.instance, - "status": record.status, + "secret_id": secret_record.secret_id, + "scope": secret_record.scope, + "integration": secret_record.integration, + "name": secret_record.name, + "instance": secret_record.instance, + "status": secret_record.status, "binding_keys": sorted(binding.binding_key for binding in bindings), "binding_statuses": {binding.binding_key: binding.status for binding in bindings}, "action": action, @@ -752,7 +852,7 @@ def plan_legacy_context_cleanup( ) target_records = record_store.list_dokploy_target_records() - dokploy_targets = [ + dokploy_targets: list[dict[str, object]] = [ { "instance": record.instance, "target_type": record.target_type, @@ -773,7 +873,7 @@ def plan_legacy_context_cleanup( ] target_id_records = record_store.list_dokploy_target_id_records() - dokploy_target_ids = [ + dokploy_target_ids: list[dict[str, object]] = [ { "instance": record.instance, "target_id": record.target_id, @@ -799,7 +899,7 @@ def plan_legacy_context_cleanup( for record in record_store.list_release_tuple_records() if record.context == request.source_context ) - groups = { + groups: dict[str, list[dict[str, object]]] = { "runtime_environment_records": runtime_records, "managed_secret_records": secret_records, "dokploy_targets": dokploy_targets, @@ -834,7 +934,7 @@ def plan_legacy_context_cleanup( def apply_legacy_context_cleanup( *, - record_store: PostgresRecordStore, + record_store: ProductContextCutoverStore, request: LegacyContextCleanupRequest, ) -> dict[str, object]: plan = plan_legacy_context_cleanup(record_store=record_store, request=request) @@ -845,28 +945,28 @@ def apply_legacy_context_cleanup( now = utc_now_timestamp() actor = request.actor or request.source_label - for record in record_store.list_runtime_environment_records( + for runtime_record in record_store.list_runtime_environment_records( context_name=request.source_context ): status = record_store.delete_runtime_environment_record_with_event( event=_runtime_delete_event( - record=record, + record=runtime_record, actor=actor, source_label=request.source_label, now=now, ), - expected_record=record, + expected_record=runtime_record, ) if status == "changed": raise ValueError("Runtime environment record changed during cleanup.") - for record in record_store.list_secret_records( + for secret_record in record_store.list_secret_records( context_name=request.source_context, limit=None, ): - if record.status != "disabled": + if secret_record.status != "disabled": record_store.write_secret_record( - record.model_copy( + secret_record.model_copy( update={ "status": "disabled", "updated_at": now, @@ -876,8 +976,8 @@ def apply_legacy_context_cleanup( ) record_store.write_secret_audit_event( SecretAuditEvent( - event_id=_secret_cleanup_event_id(record.secret_id), - secret_id=record.secret_id, + event_id=_secret_cleanup_event_id(secret_record.secret_id), + secret_id=secret_record.secret_id, event_type="disabled", recorded_at=now, actor=actor, @@ -889,27 +989,27 @@ def apply_legacy_context_cleanup( }, ) ) - for binding in _source_secret_bindings(record_store=record_store, record=record): + for binding in _source_secret_bindings(record_store=record_store, record=secret_record): if binding.status != "disabled": record_store.write_secret_binding( binding.model_copy(update={"status": "disabled", "updated_at": now}) ) - for record in tuple( + for target_id_record in tuple( item for item in record_store.list_dokploy_target_id_records() if item.context == request.source_context ): - status = record_store.delete_dokploy_target_id_record(expected_record=record) + status = record_store.delete_dokploy_target_id_record(expected_record=target_id_record) if status == "changed": raise ValueError("Dokploy target ID record changed during cleanup.") - for record in tuple( + for dokploy_target_record in tuple( item for item in record_store.list_dokploy_target_records() if item.context == request.source_context ): - status = record_store.delete_dokploy_target_record(expected_record=record) + status = record_store.delete_dokploy_target_record(expected_record=dokploy_target_record) if status == "changed": raise ValueError("Dokploy target record changed during cleanup.") diff --git a/tests/test_product_context_cutover.py b/tests/test_product_context_cutover.py index 4c64e89f..122ac0b5 100644 --- a/tests/test_product_context_cutover.py +++ b/tests/test_product_context_cutover.py @@ -1,9 +1,11 @@ import unittest from pathlib import Path from tempfile import TemporaryDirectory +from typing import cast from control_plane.contracts.dokploy_target_id_record import DokployTargetIdRecord from control_plane.contracts.dokploy_target_record import DokployTargetRecord +from control_plane.contracts.environment_inventory import EnvironmentInventory from control_plane.contracts.product_profile_record import ( LaunchplaneProductProfileRecord, ProductImageProfile, @@ -11,6 +13,7 @@ ProductPreviewProfile, ) from control_plane.contracts.runtime_environment_record import RuntimeEnvironmentRecord +from control_plane.contracts.release_tuple_record import ReleaseTupleRecord from control_plane.contracts.secret_record import SecretBinding, SecretRecord, SecretVersion from control_plane.product_context_cutover import ( LegacyContextCleanupBoundaryError, @@ -18,6 +21,7 @@ ProductContextCutoverRequest, apply_legacy_context_cleanup, apply_product_context_cutover, + plan_product_context_cutover, ) from control_plane.storage.postgres import PostgresRecordStore @@ -26,6 +30,14 @@ def _sqlite_database_url(database_path: Path) -> str: return f"sqlite+pysqlite:///{database_path}" +def _payload_section(payload: dict[str, object], key: str) -> dict[str, object]: + return cast("dict[str, object]", payload[key]) + + +def _payload_counts(payload: dict[str, object]) -> dict[str, dict[str, int]]: + return cast("dict[str, dict[str, int]]", payload["counts"]) + + def _seed_syo_source_records(store: PostgresRecordStore) -> None: store.write_product_profile_record( LaunchplaneProductProfileRecord( @@ -135,7 +147,189 @@ def _seed_syo_source_records(store: PostgresRecordStore) -> None: ) +class _FakeProductContextCutoverStore: + def __init__(self) -> None: + self.profile = LaunchplaneProductProfileRecord( + product="sellyouroutboard", + display_name="SellYourOutboard.com", + repository="cbusillo/sellyouroutboard", + driver_id="generic-web", + image=ProductImageProfile(repository="ghcr.io/cbusillo/sellyouroutboard"), + runtime_port=3000, + health_path="/api/health", + lanes=( + ProductLaneProfile( + instance="testing", + context="sellyouroutboard-testing", + base_url="https://syo-testing.example", + health_url="https://syo-testing.example/api/health", + ), + ProductLaneProfile( + instance="prod", + context="sellyouroutboard-testing", + base_url="https://www.sellyouroutboard.com", + health_url="https://www.sellyouroutboard.com/api/health", + ), + ), + preview=ProductPreviewProfile( + enabled=True, + context="sellyouroutboard-testing", + slug_template="pr-{number}", + ), + updated_at="2026-05-01T04:29:07Z", + source="fake-store", + ) + self.runtime_records = ( + RuntimeEnvironmentRecord( + scope="instance", + context="sellyouroutboard-testing", + instance="prod", + env={"CONTACT_EMAIL_MODE": "log"}, + updated_at="2026-05-01T19:15:31Z", + source_label="fake-store", + ), + ) + self.secret_records = ( + SecretRecord( + secret_id="secret-runtime-environment-smtp-password-sellyouroutboard-testing-prod", + scope="context_instance", + integration="runtime_environment", + name="SMTP_PASSWORD", + context="sellyouroutboard-testing", + instance="prod", + current_version_id="secret-version-source", + created_at="2026-05-01T04:00:00Z", + updated_at="2026-05-01T04:00:00Z", + ), + ) + self.secret_bindings = ( + SecretBinding( + binding_id="binding-source", + secret_id="secret-runtime-environment-smtp-password-sellyouroutboard-testing-prod", + integration="runtime_environment", + binding_key="SMTP_PASSWORD", + context="sellyouroutboard-testing", + instance="prod", + created_at="2026-05-01T04:00:00Z", + updated_at="2026-05-01T04:00:00Z", + ), + ) + + def read_product_profile_record(self, product: str) -> LaunchplaneProductProfileRecord: + if product != self.profile.product: + raise FileNotFoundError(product) + return self.profile + + def write_product_profile_record(self, record: LaunchplaneProductProfileRecord) -> None: + self.profile = record + + def list_product_profile_records( + self, *, driver_id: str = "" + ) -> tuple[LaunchplaneProductProfileRecord, ...]: + if driver_id and driver_id != self.profile.driver_id: + return () + return (self.profile,) + + def list_runtime_environment_records( + self, *, context_name: str = "", instance_name: str = "" + ) -> tuple[RuntimeEnvironmentRecord, ...]: + return tuple( + record + for record in self.runtime_records + if (not context_name or record.context == context_name) + and (not instance_name or record.instance == instance_name) + ) + + def list_dokploy_target_records(self) -> tuple[DokployTargetRecord, ...]: + return () + + def list_dokploy_target_id_records(self) -> tuple[DokployTargetIdRecord, ...]: + return () + + def list_secret_records( + self, + *, + integration: str = "", + context_name: str = "", + instance_name: str = "", + limit: int | None = None, + ) -> tuple[SecretRecord, ...]: + records = tuple( + record + for record in self.secret_records + if (not integration or record.integration == integration) + and (not context_name or record.context == context_name) + and (not instance_name or record.instance == instance_name) + ) + return records[:limit] if limit is not None else records + + def find_secret_record( + self, + *, + scope: str, + integration: str, + name: str, + context: str = "", + instance: str = "", + ) -> SecretRecord | None: + for record in self.secret_records: + if ( + record.scope == scope + and record.integration == integration + and record.name == name + and record.context == context + and record.instance == instance + ): + return record + return None + + def list_secret_bindings( + self, + *, + integration: str = "", + context_name: str = "", + instance_name: str = "", + limit: int | None = None, + ) -> tuple[SecretBinding, ...]: + bindings = tuple( + binding + for binding in self.secret_bindings + if (not integration or binding.integration == integration) + and (not context_name or binding.context == context_name) + and (not instance_name or binding.instance == instance_name) + ) + return bindings[:limit] if limit is not None else bindings + + def list_environment_inventory(self) -> tuple[EnvironmentInventory, ...]: + return () + + def list_release_tuple_records(self) -> tuple[ReleaseTupleRecord, ...]: + return () + + class ProductContextCutoverTests(unittest.TestCase): + def test_dry_run_uses_structural_store_boundary(self) -> None: + payload = plan_product_context_cutover( + record_store=_FakeProductContextCutoverStore(), + request=ProductContextCutoverRequest( + product="sellyouroutboard", + source_context="sellyouroutboard-testing", + target_context="sellyouroutboard", + display_name="SellYourOutboard", + ), + ) + + self.assertEqual(payload["mode"], "dry-run") + profile_payload = _payload_section(payload, "profile") + counts = _payload_counts(payload) + self.assertEqual(profile_payload["display_name"], "SellYourOutboard") + self.assertEqual(profile_payload["preview_context"], "sellyouroutboard") + self.assertEqual( + counts["runtime_environment_records"], + {"created": 1, "skipped": 0}, + ) + self.assertEqual(counts["managed_secret_records"], {"created": 1, "skipped": 0}) + def test_dry_run_reports_redacted_plan_without_writing_target_records(self) -> None: with TemporaryDirectory() as temporary_directory_name: store = PostgresRecordStore( @@ -161,13 +355,15 @@ def test_dry_run_reports_redacted_plan_without_writing_target_records(self) -> N store.close() self.assertEqual(payload["mode"], "dry-run") - self.assertEqual(payload["profile"]["display_name"], "SellYourOutboard") - self.assertEqual(payload["profile"]["preview_context"], "sellyouroutboard") + profile_payload = _payload_section(payload, "profile") + counts = _payload_counts(payload) + self.assertEqual(profile_payload["display_name"], "SellYourOutboard") + self.assertEqual(profile_payload["preview_context"], "sellyouroutboard") self.assertEqual( - payload["counts"]["runtime_environment_records"], + counts["runtime_environment_records"], {"created": 2, "skipped": 0}, ) - self.assertEqual(payload["counts"]["managed_secret_records"], {"created": 1, "skipped": 0}) + self.assertEqual(counts["managed_secret_records"], {"created": 1, "skipped": 0}) self.assertNotIn("encrypted-value", str(payload)) self.assertEqual(target_runtime_records, ()) @@ -245,14 +441,14 @@ def test_apply_copies_current_authority_records_and_updates_profile(self) -> Non self.assertEqual(version.ciphertext, "encrypted-value") self.assertEqual([binding.binding_key for binding in bindings], ["SMTP_PASSWORD"]) self.assertEqual( - repeated_payload["counts"]["runtime_environment_records"], + _payload_counts(repeated_payload)["runtime_environment_records"], {"created": 0, "skipped": 2}, ) self.assertEqual( - repeated_payload["counts"]["managed_secret_records"], + _payload_counts(repeated_payload)["managed_secret_records"], {"created": 0, "skipped": 1}, ) - self.assertEqual(repeated_payload["profile"]["action"], "unchanged") + self.assertEqual(_payload_section(repeated_payload, "profile")["action"], "unchanged") def test_apply_preserves_distinct_preview_context_in_history(self) -> None: with TemporaryDirectory() as temporary_directory_name: @@ -380,8 +576,8 @@ def test_legacy_cleanup_deletes_lookup_records_and_disables_source_secrets(self) self.assertEqual(dry_run["mode"], "dry-run") self.assertFalse(dry_run["blocked"]) - self.assertEqual(dry_run["counts"]["runtime_environment_records"], {"deleted": 2}) - self.assertEqual(dry_run["counts"]["managed_secret_records"], {"disabled": 1}) + self.assertEqual(_payload_counts(dry_run)["runtime_environment_records"], {"deleted": 2}) + self.assertEqual(_payload_counts(dry_run)["managed_secret_records"], {"disabled": 1}) self.assertEqual(payload["mode"], "apply") self.assertTrue(payload["applied"]) self.assertEqual(source_runtime_records, ())