diff --git a/tests/data/workflows/unit_conditional_adder_tree_continues.yml b/tests/data/workflows/unit_conditional_adder_tree_continues.yml new file mode 100644 index 00000000..7c081403 --- /dev/null +++ b/tests/data/workflows/unit_conditional_adder_tree_continues.yml @@ -0,0 +1,45 @@ +title: Adder tree continues +# A +# /\ +# B C <- C should run +description: Test that the condition returns true and C runs +config: + scheduler: dynamic +entrypoint: a +inputs: + another_url: http://api:8000 + value: 1 + +triggers: + - type: webhook + ref: my_webhook + id: wh-XXXXXX + entrypoint: a # This can be any + args: + url: http://api:8000/test/items/1 + method: GET + +actions: + - ref: a + action: example.passthrough + args: + value: "1" + + - ref: b + action: example.add + args: + # Demonstrate casting + lhs: ${{ ACTIONS.a.result -> int }} + rhs: 1 + depends_on: + - a + + - ref: c + action: example.add + args: + lhs: 3 + rhs: ${{ ACTIONS.a.result -> int }} + depends_on: + - a + # This task should run, as a.result is "1" + run_if: ${{ FNS.is_equal(ACTIONS.a.result, str(1)) }} diff --git a/tests/data/workflows/unit_conditional_adder_tree_continues_expected.yaml b/tests/data/workflows/unit_conditional_adder_tree_continues_expected.yaml new file mode 100644 index 00000000..1f73238b --- /dev/null +++ b/tests/data/workflows/unit_conditional_adder_tree_continues_expected.yaml @@ -0,0 +1,14 @@ +ACTIONS: + a: + result: "1" + result_typename: "str" + b: + result: 2 + result_typename: "int" + c: + result: 4 + result_typename: "int" + +INPUTS: + another_url: http://api:8000 + value: 1 diff --git a/tests/data/workflows/unit_conditional_adder_tree_halt.yml b/tests/data/workflows/unit_conditional_adder_tree_halt.yml new file mode 100644 index 00000000..dce583b9 --- /dev/null +++ b/tests/data/workflows/unit_conditional_adder_tree_halt.yml @@ -0,0 +1,45 @@ +title: Adder tree 1 Workflow +# A +# /\ +# B [C] <- C should not run +description: Test that the condition returns false and C does not run +config: + scheduler: dynamic +entrypoint: a +inputs: + another_url: http://api:8000 + value: 1 + +triggers: + - type: webhook + ref: my_webhook + id: wh-XXXXXX + entrypoint: a # This can be any + args: + url: http://api:8000/test/items/1 + method: GET + +actions: + - ref: a + action: example.passthrough + args: + value: "1" + + - ref: b + action: example.add + args: + # Demonstrate casting + lhs: ${{ ACTIONS.a.result -> int }} + rhs: 1 + depends_on: + - a + + - ref: c + action: example.add + args: + lhs: 3 + rhs: ${{ ACTIONS.a.result -> int }} + depends_on: + - a + # This task should not run, as a.result is "1" + run_if: ${{ FNS.is_equal(ACTIONS.a.result, str(2)) }} diff --git a/tests/data/workflows/unit_conditional_adder_tree_halt_expected.yaml b/tests/data/workflows/unit_conditional_adder_tree_halt_expected.yaml new file mode 100644 index 00000000..d43eb41b --- /dev/null +++ b/tests/data/workflows/unit_conditional_adder_tree_halt_expected.yaml @@ -0,0 +1,10 @@ +ACTIONS: + a: + result: "1" + result_typename: "str" + b: + result: 2 + result_typename: "int" +INPUTS: + another_url: "http://api:8000" + value: 1 diff --git a/tests/unit/test_workflows.py b/tests/unit/test_workflows.py index 3a45e2b3..30f8375e 100644 --- a/tests/unit/test_workflows.py +++ b/tests/unit/test_workflows.py @@ -12,8 +12,10 @@ import os import uuid from pathlib import Path +from typing import Any import pytest +import yaml from loguru import logger from temporalio.common import RetryPolicy from temporalio.worker import Worker @@ -95,6 +97,15 @@ def dsl(request: pytest.FixtureRequest) -> DSLInput: return dsl +# Fixture to load yaml files from name +@pytest.fixture +def expected(request: pytest.FixtureRequest) -> dict[str, Any]: + path: Path = request.param + with path.open() as f: + yaml_data = f.read() + return yaml.safe_load(yaml_data) + + @pytest.mark.parametrize("dsl", SHARED_TEST_DEFNS, indirect=True) @pytest.mark.asyncio async def test_workflow_can_run_from_yaml( @@ -184,3 +195,48 @@ async def test_workflow_ordering_is_correct( # Check that the execution order respects the graph edges assert_respectful_exec_order(dsl, result) + + +@pytest.mark.parametrize( + "dsl,expected", + [ + ( + DATA_PATH / "unit_conditional_adder_tree_halt.yml", + DATA_PATH / "unit_conditional_adder_tree_halt_expected.yaml", + ), + ( + DATA_PATH / "unit_conditional_adder_tree_continues.yml", + DATA_PATH / "unit_conditional_adder_tree_continues_expected.yaml", + ), + # ( + # DATA_PATH / "unit_conditional_adder_tree_halt_with_propagation.yml", + # DATA_PATH / "unit_conditional_adder_tree_halt_with_propagation.yaml", + # ), + ], + indirect=True, +) +@pytest.mark.asyncio +async def test_conditional_execution( + dsl, expected, temporal_cluster, mock_registry, auth_sandbox +): + """We need to test that the ordering of the workflow tasks is correct.""" + + # Connect client + + client = await get_temporal_client() + # Run a worker for the activities and workflow + async with Worker( + client, + task_queue=os.environ["TEMPORAL__CLUSTER_QUEUE"], + activities=dsl_activities, + workflows=[DSLWorkflow], + workflow_runner=new_sandbox_runner(), + ): + result = await client.execute_workflow( + DSLWorkflow.run, + DSLRunArgs(dsl=dsl, role=ctx_role.get()), + id=gen_id(f"test_conditional_execution-{dsl.title}"), + task_queue=os.environ["TEMPORAL__CLUSTER_QUEUE"], + retry_policy=RetryPolicy(maximum_attempts=1), + ) + assert result == expected