Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 170 additions & 0 deletions src/backend/src/routes/workflows_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,176 @@ async def get_step_types(
return manager.get_step_type_schemas()


# ---------------------------------------------------------------------------
# Trigger-type catalog
# ---------------------------------------------------------------------------
#
# Returns a UI-friendly catalog of every TriggerType enum member so the
# workflow-authoring form can:
# 1. Render human-readable labels (no more raw enum values in the picker).
# 2. Group triggers (lifecycle / request_flow / validation_gates /
# system_scheduled).
# 3. Show approval (for_*) vs process (on_* / before_* / scheduled / manual)
# based on the workflow being authored.
# 4. Pre-populate the entity_types multiselect with the entity types each
# trigger is wired for in the backend.
#
# Wire-format contract: every TriggerType member appears exactly once.
# `value` is the raw enum string (used as the FK in stored trigger configs).
# Labels here are the canonical source of truth; the FE getTriggerLabel()
# helper mirrors them so existing labels keep working if the endpoint is
# unavailable.
#
# entity_types mapping mirrors SUPPORTED_TRIGGER_ENTITY_MAP in the FE
# (src/frontend/src/lib/workflow-labels.ts) — keep them in sync. An empty
# list means "any entity type" (manual, scheduled, on_first_access).

_TRIGGER_LABELS: Dict[str, str] = {
"for_subscribe": "When a user subscribes",
"on_subscribe": "After a subscription is created",
"for_request_access": "When a user requests access",
"on_request_access": "After an access request is submitted",
"for_request_review": "When a user requests review",
"on_request_review": "After a review request is submitted",
"for_request_publish": "When a user requests publish",
"on_request_publish": "After a publish request is submitted",
"for_request_certify": "When a user requests certification",
"on_request_certify": "After a certification request is submitted",
"for_request_status_change": "When a user requests status change",
"on_request_status_change": "After a status change request is submitted",
"for_approval_response": "Approval response dialog",
"before_create": "Before entity is created (validation)",
"before_update": "Before entity is updated (validation)",
"before_status_change": "Before status changes (validation)",
"on_create": "After entity is created",
"on_update": "After entity is updated",
"on_delete": "After entity is deleted",
"on_status_change": "After status changes",
"on_publish": "After entity is published",
"on_unpublish": "After entity is unpublished",
"on_revoke": "After access is revoked",
"on_expiring": "When access is about to expire",
"on_first_access": "First time a user accesses (consent)",
"on_unsubscribe": "After a user unsubscribes",
"on_job_success": "After a background job succeeds",
"on_job_failure": "After a background job fails",
"scheduled": "On a schedule (cron)",
# Fallbacks for enum members that are not part of the user-approved table.
# These keep the catalog 1:1 with the enum without expanding the table.
"manual": "Manually triggered",
"on_certify": "After entity is certified",
"on_decertify": "After entity is decertified",
}

# Group assignment per the design brief. Anything not listed falls back to
# "lifecycle" for process workflows.
_TRIGGER_GROUPS_REQUEST_FLOW = {
"for_subscribe", "for_request_access", "for_request_review",
"for_request_publish", "for_request_certify",
"for_request_status_change", "for_approval_response",
"on_subscribe", "on_unsubscribe",
"on_request_access", "on_request_review", "on_request_publish",
"on_request_certify", "on_request_status_change",
"on_revoke", "on_expiring", "on_first_access",
}
_TRIGGER_GROUPS_LIFECYCLE = {
"on_create", "on_update", "on_delete", "on_status_change",
"on_publish", "on_unpublish",
# on_certify / on_decertify are lifecycle in spirit — they describe an
# entity transitioning state, not a request flow.
"on_certify", "on_decertify",
}
_TRIGGER_GROUPS_VALIDATION = {
"before_create", "before_update", "before_status_change",
}
_TRIGGER_GROUPS_SYSTEM_SCHEDULED = {
"on_job_success", "on_job_failure", "scheduled", "manual",
}

# Entity types each trigger CAN fire for, derived from the dispatch sites in
# src/backend/src/common/workflow_triggers.py and the existing FE map
# (SUPPORTED_TRIGGER_ENTITY_MAP). Empty list = "any entity" (no constraint).
_TRIGGER_ENTITY_TYPES: Dict[str, List[str]] = {
# CRUD / lifecycle
"on_create": ["catalog", "schema", "table", "data_contract", "data_product", "domain"],
"on_update": ["data_contract", "data_product", "domain"],
"on_delete": ["data_contract", "data_product", "domain"],
"on_status_change": ["data_contract", "data_product", "data_asset_review"],
"on_publish": ["data_contract", "data_product"],
"on_unpublish": ["data_contract", "data_product"],
"on_certify": ["data_contract", "data_product"],
"on_decertify": ["data_contract", "data_product"],
# Validation gates
"before_create": ["catalog", "schema", "table"],
"before_update": ["data_contract"],
"before_status_change": ["data_contract", "data_product"],
# Request flow — process side
"on_request_review": ["data_contract", "data_product", "data_asset_review"],
"on_request_access": ["access_grant", "project", "role"],
"on_request_publish": ["data_contract", "data_product"],
"on_request_certify": ["data_contract", "data_product"],
"on_request_status_change": ["data_product"],
"on_subscribe": ["subscription", "data_product", "data_contract"],
"on_unsubscribe": ["subscription", "data_product", "data_contract"],
"on_revoke": ["access_grant"],
"on_expiring": ["access_grant"],
"on_first_access": ["user"],
# Request flow — approval (for_*) side. These mirror the matching on_*
# trigger's entity scope so the wizard targets the same kinds of objects.
"for_subscribe": ["data_product", "data_contract"],
"for_request_access": ["access_grant", "project", "role"],
"for_request_review": ["data_contract", "data_product", "data_asset_review"],
"for_request_publish": ["data_contract", "data_product"],
"for_request_certify": ["data_contract", "data_product"],
"for_request_status_change": ["data_product"],
"for_approval_response": [], # system trigger — any entity
# Background jobs
"on_job_success": ["job"],
"on_job_failure": ["job"],
# Scheduled / manual — no entity binding
"scheduled": [],
"manual": [],
}


def _trigger_group(value: str) -> str:
if value in _TRIGGER_GROUPS_REQUEST_FLOW:
return "request_flow"
if value in _TRIGGER_GROUPS_VALIDATION:
return "validation_gates"
if value in _TRIGGER_GROUPS_SYSTEM_SCHEDULED:
return "system_scheduled"
if value in _TRIGGER_GROUPS_LIFECYCLE:
return "lifecycle"
# Safe fallback — should never hit if mappings stay in sync with the enum.
return "lifecycle"


@router.get("/trigger-types", response_model=List[Dict[str, Any]])
async def get_trigger_types(
request: Request,
_: bool = Depends(PermissionChecker('settings', FeatureAccessLevel.READ_ONLY)),
) -> List[Dict[str, Any]]:
"""Get the UI catalog of all trigger types.

Returns one entry per TriggerType enum member with the metadata the
workflow-authoring picker needs (label, group, workflow_type,
entity_types). See module-level comments for the contract.
"""
out: List[Dict[str, Any]] = []
for tt in TriggerType:
value = tt.value
workflow_type = "approval" if value.startswith("for_") else "process"
out.append({
"value": value,
"label": _TRIGGER_LABELS.get(value, value.replace("_", " ").title()),
"workflow_type": workflow_type,
"entity_types": list(_TRIGGER_ENTITY_TYPES.get(value, [])),
"group": _trigger_group(value),
})
return out


@router.get("/executions", response_model=WorkflowExecutionListResponse)
async def list_executions(
request: Request,
Expand Down
84 changes: 84 additions & 0 deletions src/backend/src/tests/unit/test_trigger_enum_pin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
"""Enum-pin test for TriggerType.

The wire format of `trigger.type` in stored workflow definitions is the raw
string value of each TriggerType enum member. Renaming any of these values is
a breaking change for every existing workflow row in customer databases — so
this test pins the exact strings.

If you genuinely need to add a new trigger, append it here. If you think you
need to rename an existing one, write a migration instead.
"""

import pytest

from src.models.process_workflows import TriggerType


# (Enum-member-name, expected wire value) — one tuple per TriggerType member.
# Keep this list in sync with the enum; the test below also asserts no enum
# member is missing from the list.
_EXPECTED_TRIGGER_VALUES = [
("ON_CREATE", "on_create"),
("ON_UPDATE", "on_update"),
("ON_DELETE", "on_delete"),
("ON_STATUS_CHANGE", "on_status_change"),
("SCHEDULED", "scheduled"),
("MANUAL", "manual"),
("BEFORE_CREATE", "before_create"),
("BEFORE_UPDATE", "before_update"),
("BEFORE_STATUS_CHANGE", "before_status_change"),
("ON_REQUEST_REVIEW", "on_request_review"),
("ON_REQUEST_ACCESS", "on_request_access"),
("ON_REQUEST_PUBLISH", "on_request_publish"),
("ON_REQUEST_STATUS_CHANGE", "on_request_status_change"),
("ON_JOB_SUCCESS", "on_job_success"),
("ON_JOB_FAILURE", "on_job_failure"),
("ON_SUBSCRIBE", "on_subscribe"),
("ON_UNSUBSCRIBE", "on_unsubscribe"),
("ON_REQUEST_CERTIFY", "on_request_certify"),
("ON_CERTIFY", "on_certify"),
("ON_DECERTIFY", "on_decertify"),
("ON_PUBLISH", "on_publish"),
("ON_UNPUBLISH", "on_unpublish"),
("ON_EXPIRING", "on_expiring"),
("ON_REVOKE", "on_revoke"),
("FOR_APPROVAL_RESPONSE", "for_approval_response"),
("FOR_SUBSCRIBE", "for_subscribe"),
("FOR_REQUEST_REVIEW", "for_request_review"),
("FOR_REQUEST_ACCESS", "for_request_access"),
("FOR_REQUEST_PUBLISH", "for_request_publish"),
("FOR_REQUEST_CERTIFY", "for_request_certify"),
("FOR_REQUEST_STATUS_CHANGE", "for_request_status_change"),
("ON_FIRST_ACCESS", "on_first_access"),
]


@pytest.mark.parametrize(("member_name", "wire_value"), _EXPECTED_TRIGGER_VALUES)
def test_trigger_value_is_pinned(member_name: str, wire_value: str) -> None:
"""Each TriggerType member must keep its exact wire-format string."""
member = getattr(TriggerType, member_name)
assert member.value == wire_value, (
f"TriggerType.{member_name}.value changed from '{wire_value}' to "
f"'{member.value}' — this is a breaking change for stored workflow "
f"trigger configs. Add a migration instead of renaming the enum value."
)


def test_all_enum_members_are_pinned() -> None:
"""Every TriggerType member must appear in the pin table.

Catches the case where a new trigger is added to the enum but its wire
value is not vetted here.
"""
enum_names = {m.name for m in TriggerType}
pinned_names = {name for name, _ in _EXPECTED_TRIGGER_VALUES}
missing = enum_names - pinned_names
extra = pinned_names - enum_names
assert not missing, (
f"New TriggerType members missing from the pin table: {sorted(missing)}. "
f"Add them to _EXPECTED_TRIGGER_VALUES in this file."
)
assert not extra, (
f"Pin table references TriggerType members that no longer exist: "
f"{sorted(extra)}. Did you delete the enum value?"
)
91 changes: 91 additions & 0 deletions src/backend/src/tests/unit/test_trigger_types_endpoint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
"""Contract test for GET /api/workflows/trigger-types.

Asserts the response shape and that every TriggerType enum member is
represented exactly once. The endpoint is the canonical catalog consumed by
the frontend workflow-authoring picker — drift here breaks the UX without
breaking type checks.
"""

import asyncio
from typing import Any, Dict, List
from unittest.mock import MagicMock

import pytest

from src.models.process_workflows import TriggerType
from src.routes.workflows_routes import get_trigger_types


def _call_endpoint() -> List[Dict[str, Any]]:
"""Invoke the async route handler directly, bypassing FastAPI auth."""
# PermissionChecker is a Depends() — bypassed by calling the underlying
# coroutine directly with positional args. request is unused by the
# handler body, so a MagicMock is sufficient.
return asyncio.get_event_loop().run_until_complete(
get_trigger_types(request=MagicMock(), _=True)
)


def test_every_trigger_type_represented_exactly_once() -> None:
payload = _call_endpoint()
values = [entry["value"] for entry in payload]
expected = {tt.value for tt in TriggerType}
assert set(values) == expected, (
f"Trigger-types catalog drift: missing={expected - set(values)}, "
f"extra={set(values) - expected}"
)
# No duplicates
assert len(values) == len(set(values)), (
f"Duplicate entries in trigger-types catalog: {values}"
)
assert len(values) == len(expected)


def test_response_shape() -> None:
payload = _call_endpoint()
required_keys = {"value", "label", "workflow_type", "entity_types", "group"}
for entry in payload:
assert required_keys.issubset(entry.keys()), (
f"Trigger-types entry missing keys: {required_keys - entry.keys()} "
f"(entry={entry})"
)
assert isinstance(entry["value"], str)
assert isinstance(entry["label"], str) and entry["label"]
assert entry["workflow_type"] in {"process", "approval"}
assert isinstance(entry["entity_types"], list)
for et in entry["entity_types"]:
assert isinstance(et, str)
assert entry["group"] in {
"lifecycle", "request_flow", "validation_gates", "system_scheduled",
}


def test_is_advanced_not_in_response() -> None:
"""The is_advanced field was removed when the "Show advanced triggers"
toggle was dropped from the picker — for_approval_response is now shown
inline alongside the other approval triggers. Guard against it sneaking
back in.
"""
payload = _call_endpoint()
for entry in payload:
assert "is_advanced" not in entry, (
f"is_advanced should not be returned anymore (entry={entry})"
)


def test_for_triggers_are_approval_workflow_type() -> None:
"""Every for_* trigger must be classified as approval, everything else as process."""
payload = _call_endpoint()
for entry in payload:
if entry["value"].startswith("for_"):
assert entry["workflow_type"] == "approval", entry
else:
assert entry["workflow_type"] == "process", entry


def test_approval_triggers_are_in_request_flow_group() -> None:
"""All for_* approval triggers should live under request_flow in the UI."""
payload = _call_endpoint()
for entry in payload:
if entry["workflow_type"] == "approval":
assert entry["group"] == "request_flow", entry
Loading
Loading