diff --git a/.github/workflows/deploy-examples.yml b/.github/workflows/deploy-examples.yml index 080a05a..b7624bb 100644 --- a/.github/workflows/deploy-examples.yml +++ b/.github/workflows/deploy-examples.yml @@ -71,6 +71,7 @@ jobs: run: hatch run examples:build - name: Deploy Lambda function - ${{ matrix.example.name }} + id: deploy env: AWS_ACCOUNT_ID: ${{ secrets.AWS_ACCOUNT_ID }} LAMBDA_ENDPOINT: ${{ secrets.LAMBDA_ENDPOINT_BETA }} @@ -88,13 +89,35 @@ jobs: echo "Deploying ${{ matrix.example.name }} as $FUNCTION_NAME" hatch run examples:deploy "${{ matrix.example.name }}" --function-name "$FUNCTION_NAME" - # $LATEST is also a qualified version - QUALIFIED_FUNCTION_NAME="$FUNCTION_NAME:\$LATEST" + # $LATEST is also a qualified version + QUALIFIED_FUNCTION_NAME="${FUNCTION_NAME}:\$LATEST" # Store both names for later steps echo "FUNCTION_NAME=$FUNCTION_NAME" >> $GITHUB_ENV echo "QUALIFIED_FUNCTION_NAME=$QUALIFIED_FUNCTION_NAME" >> $GITHUB_ENV echo "VERSION=$VERSION" >> $GITHUB_ENV + echo "DEPLOYED_FUNCTION_NAME=$FUNCTION_NAME" >> $GITHUB_OUTPUT + echo "QUALIFIED_FUNCTION_NAME=$QUALIFIED_FUNCTION_NAME" >> $GITHUB_OUTPUT + + - name: Run Integration Tests - ${{ matrix.example.name }} + env: + AWS_REGION: ${{ env.AWS_REGION }} + LAMBDA_ENDPOINT: ${{ secrets.LAMBDA_ENDPOINT_BETA }} + QUALIFIED_FUNCTION_NAME: ${{ env.QUALIFIED_FUNCTION_NAME }} + LAMBDA_FUNCTION_TEST_NAME: ${{ matrix.example.name }} + run: | + echo "Running integration tests for ${{ matrix.example.name }}" + echo "Function name: ${{ steps.deploy.outputs.DEPLOYED_FUNCTION_NAME }}" + echo "Qualified function name: ${QUALIFIED_FUNCTION_NAME}" + echo "AWS Region: ${AWS_REGION}" + echo "Lambda Endpoint: ${LAMBDA_ENDPOINT}" + + # Convert example name to test name: "Hello World" -> "test_hello_world" + TEST_NAME="test_$(echo "${{ matrix.example.name }}" | tr '[:upper:]' '[:lower:]' | tr ' ' '_')" + echo "Test name: ${TEST_NAME}" + + # Run integration tests + hatch run test:examples-integration - name: Invoke Lambda function - ${{ matrix.example.name }} env: diff --git a/examples/examples-catalog.json b/examples/examples-catalog.json index 5e18db9..13d7608 100644 --- a/examples/examples-catalog.json +++ b/examples/examples-catalog.json @@ -110,6 +110,50 @@ "ExecutionTimeout": 300 }, "path": "./src/map_operations.py" + }, + { + "name": "Block Example", + "description": "Nested child contexts demonstrating block operations", + "handler": "block_example.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/block_example.py" + }, + { + "name": "Logger Example", + "description": "Demonstrating logger usage and enrichment in DurableContext", + "handler": "logger_example.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/logger_example.py" + }, + { + "name": "Steps with Retry", + "description": "Multiple steps with retry logic in a polling pattern", + "handler": "steps_with_retry.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/steps_with_retry.py" + }, + { + "name": "Wait for Condition", + "description": "Polling pattern that waits for a condition to be met", + "handler": "wait_for_condition.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/wait_for_condition.py" } ] } diff --git a/examples/src/block_example.py b/examples/src/block_example.py new file mode 100644 index 0000000..cb3dc72 --- /dev/null +++ b/examples/src/block_example.py @@ -0,0 +1,46 @@ +"""Example demonstrating nested child contexts (blocks).""" + +from typing import Any + +from aws_durable_execution_sdk_python.context import ( + DurableContext, + durable_with_child_context, +) +from aws_durable_execution_sdk_python.execution import durable_execution + + +@durable_with_child_context +def nested_block(ctx: DurableContext) -> str: + """Nested block with its own child context.""" + # Wait in the nested block + ctx.wait(seconds=1) + return "nested block result" + + +@durable_with_child_context +def parent_block(ctx: DurableContext) -> dict[str, str]: + """Parent block with nested operations.""" + # Nested step + nested_result: str = ctx.step( + lambda _: "nested step result", + name="nested_step", + ) + + # Nested block with its own child context + nested_block_result: str = ctx.run_in_child_context(nested_block()) + + return { + "nestedStep": nested_result, + "nestedBlock": nested_block_result, + } + + +@durable_execution +def handler(_event: Any, context: DurableContext) -> dict[str, str]: + """Handler demonstrating nested child contexts.""" + # Run parent block which contains nested operations + result: dict[str, str] = context.run_in_child_context( + parent_block(), name="parent_block" + ) + + return result diff --git a/examples/src/logger_example.py b/examples/src/logger_example.py new file mode 100644 index 0000000..16937ef --- /dev/null +++ b/examples/src/logger_example.py @@ -0,0 +1,50 @@ +"""Example demonstrating logger usage in DurableContext.""" + +from typing import Any + +from aws_durable_execution_sdk_python.context import ( + DurableContext, + durable_with_child_context, +) +from aws_durable_execution_sdk_python.execution import durable_execution + + +@durable_with_child_context +def child_workflow(ctx: DurableContext) -> str: + """Child workflow with its own logging context.""" + # Child context logger has step_id populated with child context ID + ctx.logger.info("Running in child context") + + # Step in child context has nested step ID + child_result: str = ctx.step( + lambda _: "child-processed", + name="child_step", + ) + + ctx.logger.info("Child workflow completed", extra={"result": child_result}) + + return child_result + + +@durable_execution +def handler(event: Any, context: DurableContext) -> str: + """Handler demonstrating logger usage.""" + # Top-level context logger: no step_id field + context.logger.info("Starting workflow", extra={"eventId": event.get("id")}) + + # Logger in steps - gets enriched with step ID and attempt number + result1: str = context.step( + lambda _: "processed", + name="process_data", + ) + + context.logger.info("Step 1 completed", extra={"result": result1}) + + # Child contexts inherit the parent's logger and have their own step ID + result2: str = context.run_in_child_context(child_workflow(), name="child_workflow") + + context.logger.info( + "Workflow completed", extra={"result1": result1, "result2": result2} + ) + + return f"{result1}-{result2}" diff --git a/examples/src/map_operations.py b/examples/src/map_operations.py index 05c230f..7d4563b 100644 --- a/examples/src/map_operations.py +++ b/examples/src/map_operations.py @@ -1,3 +1,5 @@ +"""Example demonstrating map-like operations for processing collections durably.""" + from typing import Any from aws_durable_execution_sdk_python.context import DurableContext @@ -9,8 +11,8 @@ def square(x: int) -> int: @durable_execution -def handler(_event: Any, context: DurableContext) -> str: - # Process a list of items using map-like operations +def handler(_event: Any, context: DurableContext) -> list[int]: + """Process a list of items using map-like operations.""" items = [1, 2, 3, 4, 5] # Process each item as a separate durable step @@ -19,4 +21,4 @@ def handler(_event: Any, context: DurableContext) -> str: result = context.step(lambda _, x=item: square(x), name=f"square_{i}") results.append(result) - return f"Squared results: {results}" + return results diff --git a/examples/src/parallel.py b/examples/src/parallel.py index 80b2e7b..58015d8 100644 --- a/examples/src/parallel.py +++ b/examples/src/parallel.py @@ -1,3 +1,5 @@ +"""Example demonstrating parallel-like operations for concurrent execution.""" + from typing import Any from aws_durable_execution_sdk_python.context import DurableContext @@ -5,11 +7,11 @@ @durable_execution -def handler(_event: Any, context: DurableContext) -> str: - # Execute multiple operations in parallel +def handler(_event: Any, context: DurableContext) -> list[str]: + # Execute multiple operations task1 = context.step(lambda _: "Task 1 complete", name="task1") task2 = context.step(lambda _: "Task 2 complete", name="task2") task3 = context.step(lambda _: "Task 3 complete", name="task3") # All tasks execute concurrently and results are collected - return f"Results: {task1}, {task2}, {task3}" + return [task1, task2, task3] diff --git a/examples/src/step_with_retry.py b/examples/src/step_with_retry.py index 43dda77..1f70385 100644 --- a/examples/src/step_with_retry.py +++ b/examples/src/step_with_retry.py @@ -15,7 +15,9 @@ @durable_step -def unreliable_operation(_step_context: StepContext) -> str: +def unreliable_operation( + _step_context: StepContext, +) -> str: failure_threshold = 0.5 if random() > failure_threshold: # noqa: S311 msg = "Random error occurred" diff --git a/examples/src/steps_with_retry.py b/examples/src/steps_with_retry.py new file mode 100644 index 0000000..6d16e49 --- /dev/null +++ b/examples/src/steps_with_retry.py @@ -0,0 +1,73 @@ +"""Example demonstrating multiple steps with retry logic.""" + +from random import random +from typing import Any + +from aws_durable_execution_sdk_python.config import StepConfig +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution +from aws_durable_execution_sdk_python.retries import ( + RetryStrategyConfig, + create_retry_strategy, +) + + +def simulated_get_item(name: str) -> dict[str, Any] | None: + """Simulate getting an item that may fail randomly.""" + # Fail 50% of the time + if random() < 0.5: # noqa: S311 + msg = "Random failure" + raise RuntimeError(msg) + + # Simulate finding item after some attempts + if random() > 0.3: # noqa: S311 + return {"id": name, "data": "item data"} + + return None + + +@durable_execution +def handler(event: Any, context: DurableContext) -> dict[str, Any]: + """Handler demonstrating polling with retry logic.""" + name = event.get("name", "test-item") + + # Retry configuration for steps + retry_config = RetryStrategyConfig( + max_attempts=5, + retryable_error_types=[RuntimeError], + ) + + step_config = StepConfig(create_retry_strategy(retry_config)) + + item = None + poll_count = 0 + max_polls = 5 + + try: + while poll_count < max_polls: + poll_count += 1 + + # Try to get the item with retry + get_response = context.step( + lambda _, n=name: simulated_get_item(n), + name=f"get_item_poll_{poll_count}", + config=step_config, + ) + + # Did we find the item? + if get_response: + item = get_response + break + + # Wait 1 second until next poll + context.wait(seconds=1) + + except RuntimeError as e: + # Retries exhausted + return {"error": "DDB Retries Exhausted", "message": str(e)} + + if not item: + return {"error": "Item Not Found"} + + # We found the item! + return {"success": True, "item": item, "pollsRequired": poll_count} diff --git a/examples/src/wait_for_condition.py b/examples/src/wait_for_condition.py new file mode 100644 index 0000000..ab9d434 --- /dev/null +++ b/examples/src/wait_for_condition.py @@ -0,0 +1,33 @@ +"""Example demonstrating wait-for-condition pattern.""" + +from typing import Any + +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution + + +@durable_execution +def handler(_event: Any, context: DurableContext) -> int: + """Handler demonstrating wait-for-condition pattern.""" + state = 0 + attempt = 0 + max_attempts = 5 + + while attempt < max_attempts: + attempt += 1 + + # Execute step to update state + state = context.step( + lambda _, s=state: s + 1, + name=f"increment_state_{attempt}", + ) + + # Check condition + if state >= 3: + # Condition met, stop + break + + # Wait before next attempt + context.wait(seconds=1) + + return state diff --git a/examples/template.yaml b/examples/template.yaml index 564d1f1..82d87b6 100644 --- a/examples/template.yaml +++ b/examples/template.yaml @@ -1,4 +1,4 @@ -AWSTemplateFormatVersion: '2010-09-09' +AWSTemplateFormatVersion: "2010-09-09" Transform: AWS::Serverless-2016-10-31 Globals: Function: @@ -64,7 +64,8 @@ Resources: Properties: CodeUri: build/ Handler: callback.handler - Description: Basic usage of context.create_callback() to create a callback for + Description: + Basic usage of context.create_callback() to create a callback for external systems DurableConfig: RetentionPeriodInDays: 7 @@ -74,7 +75,8 @@ Resources: Properties: CodeUri: build/ Handler: wait_for_callback.handler - Description: Usage of context.wait_for_callback() to wait for external system + Description: + Usage of context.wait_for_callback() to wait for external system responses DurableConfig: RetentionPeriodInDays: 7 @@ -84,7 +86,8 @@ Resources: Properties: CodeUri: build/ Handler: run_in_child_context.handler - Description: Usage of context.run_in_child_context() to execute operations in + Description: + Usage of context.run_in_child_context() to execute operations in isolated contexts DurableConfig: RetentionPeriodInDays: 7 @@ -107,3 +110,39 @@ Resources: DurableConfig: RetentionPeriodInDays: 7 ExecutionTimeout: 300 + BlockExample: + Type: AWS::Serverless::Function + Properties: + CodeUri: build/ + Handler: block_example.handler + Description: Nested child contexts demonstrating block operations + DurableConfig: + RetentionPeriodInDays: 7 + ExecutionTimeout: 300 + LoggerExample: + Type: AWS::Serverless::Function + Properties: + CodeUri: build/ + Handler: logger_example.handler + Description: Demonstrating logger usage and enrichment in DurableContext + DurableConfig: + RetentionPeriodInDays: 7 + ExecutionTimeout: 300 + StepsWithRetry: + Type: AWS::Serverless::Function + Properties: + CodeUri: build/ + Handler: steps_with_retry.handler + Description: Multiple steps with retry logic in a polling pattern + DurableConfig: + RetentionPeriodInDays: 7 + ExecutionTimeout: 300 + WaitForCondition: + Type: AWS::Serverless::Function + Properties: + CodeUri: build/ + Handler: wait_for_condition.handler + Description: Polling pattern that waits for a condition to be met + DurableConfig: + RetentionPeriodInDays: 7 + ExecutionTimeout: 300 diff --git a/examples/test/README.md b/examples/test/README.md new file mode 100644 index 0000000..61996b4 --- /dev/null +++ b/examples/test/README.md @@ -0,0 +1,119 @@ +# Integration Tests for Python Durable Execution SDK + +This directory contains integration tests for the Python Durable Execution SDK examples. Tests can run in two modes using pytest fixtures. + +## Test Modes + +### Local Mode (Default) +Tests run against the in-memory `DurableFunctionTestRunner`: +- ✅ Fast execution (seconds) +- ✅ No AWS credentials needed +- ✅ Perfect for development +- ✅ Validates local runner behavior + +```bash +# Run all example tests locally (default) +hatch run test:examples + +# Run with explicit mode flag +pytest --runner-mode=local -m example examples/test/ + +# Run specific test +pytest --runner-mode=local -k test_hello_world examples/test/ +``` + +### Cloud Mode (Integration) +Tests run against actual AWS Lambda functions using `DurableFunctionCloudTestRunner`: +- ✅ Validates cloud deployment +- ✅ Tests real Lambda execution +- ✅ Verifies end-to-end behavior +- ⚠️ Requires deployed functions + +```bash +# Deploy function first +hatch run examples:deploy "hello world" --function-name HelloWorld-Test + +# Set environment variables for cloud testing +export AWS_REGION=us-west-2 +export LAMBDA_ENDPOINT=https://lambda.us-west-2.amazonaws.com +export QUALIFIED_FUNCTION_NAME="HelloWorld-Test:\$LATEST" +export LAMBDA_FUNCTION_TEST_NAME="hello world" + +# Run tests +pytest --runner-mode=cloud -k test_hello_world examples/test/ + +# Or using hatch +hatch run test:examples-integration -k test_hello_world +``` + +## Writing Tests + +Use the `durable_runner` pytest fixture with the `@pytest.mark.durable_execution` marker: + +```python +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus +from examples.src import my_example + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=my_example.handler, + lambda_function_name="my example", +) +def test_my_example(durable_runner): + """Test my example in both local and cloud modes.""" + with durable_runner: + result = durable_runner.run(input={"test": "data"}, timeout=10) + + # Assertions work in both modes + assert result.status == InvocationStatus.SUCCEEDED + assert result.result == "expected output" + + # Optional mode-specific validations + if durable_runner.mode == "cloud": + # Cloud-specific assertions + pass +``` + +## Configuration + +### Environment Variables (Cloud Mode) +- `AWS_REGION` - AWS region for Lambda invocation (default: us-west-2) +- `LAMBDA_ENDPOINT` - Optional Lambda endpoint URL for testing +- `QUALIFIED_FUNCTION_NAME` - Deployed Lambda function ARN or qualified name (required for cloud mode) +- `LAMBDA_FUNCTION_TEST_NAME` - Lambda function name to match with test's `lambda_function_name` marker (required for cloud mode) + +### CLI Options +- `--runner-mode` - Test mode: `local` (default) or `cloud` + +### Pytest Markers +- `-m example` - Run only example tests +- `-k test_name` - Run tests matching pattern + +## CI/CD Integration + +Tests automatically run in CI/CD after deployment: + +1. `deploy-examples.yml` deploys functions +2. Integration tests run against deployed functions +3. Results reported in GitHub Actions + +See `.github/workflows/deploy-examples.yml` for details. + +## Troubleshooting + +### Timeout errors +**Problem**: `TimeoutError: Execution did not complete within 60s` + +**Solution**: Increase timeout in test: +```python +result = runner.run(input="test", timeout=120) # Increase to 120s +``` + +### Import errors +**Problem**: `ModuleNotFoundError: No module named 'aws_durable_execution_sdk_python_testing'` + +**Solution**: Install dependencies: +```bash +hatch run test:examples # Installs dependencies automatically diff --git a/examples/test/conftest.py b/examples/test/conftest.py new file mode 100644 index 0000000..1f329d4 --- /dev/null +++ b/examples/test/conftest.py @@ -0,0 +1,231 @@ +"""Pytest configuration and fixtures for durable execution tests.""" + +import contextlib +import json +import logging +import os +import sys +from enum import StrEnum +from pathlib import Path +from typing import Any + +import pytest +from aws_durable_execution_sdk_python.lambda_service import OperationPayload +from aws_durable_execution_sdk_python.serdes import ExtendedTypeSerDes + +from aws_durable_execution_sdk_python_testing.runner import ( + DurableFunctionCloudTestRunner, + DurableFunctionTestResult, + DurableFunctionTestRunner, +) + + +# Add examples/src to Python path for imports +examples_src = Path(__file__).parent.parent / "src" +if str(examples_src) not in sys.path: + sys.path.insert(0, str(examples_src)) + + +logger = logging.getLogger(__name__) + + +def deserialize_operation_payload( + payload: OperationPayload | None, serdes: ExtendedTypeSerDes | None = None +) -> Any: + """Deserialize an operation payload using the provided or default serializer. + + This utility function helps test code deserialize operation results that are + returned as raw strings. It supports both the default ExtendedTypeSerDes and + custom serializers. + + Args: + payload: The operation payload string to deserialize, or None. + serdes: Optional custom serializer. If None, uses ExtendedTypeSerDes. + + Returns: + Deserialized result object, or None if payload is None. + """ + if not payload: + return None + + if serdes is None: + serdes = ExtendedTypeSerDes() + + try: + return serdes.deserialize(payload) + except Exception: + # Fallback to plain JSON for backwards compatibility + return json.loads(payload) + + +class RunnerMode(StrEnum): + """Runner mode for local or cloud execution.""" + + LOCAL = "local" + CLOUD = "cloud" + + +def pytest_addoption(parser): + """Add custom command line options for test execution.""" + parser.addoption( + "--runner-mode", + action="store", + default=RunnerMode.LOCAL, + choices=[RunnerMode.LOCAL, RunnerMode.CLOUD], + help="Test runner mode: local (in-memory) or cloud (deployed Lambda)", + ) + + +class TestRunnerAdapter: + """Adapter that provides consistent interface for both local and cloud runners. + + This adapter encapsulates the differences between local and cloud test runners: + - Local runner: Requires context manager for resource cleanup (scheduler thread) + - Cloud runner: No resource cleanup needed (stateless boto3 client) + + The adapter ensures proper resource management while providing a unified interface. + """ + + def __init__( + self, + runner: DurableFunctionTestRunner | DurableFunctionCloudTestRunner, + mode: str, + ): + """Initialize the adapter.""" + self._runner: DurableFunctionTestRunner | DurableFunctionCloudTestRunner = ( + runner + ) + self._mode: str = mode + + def run( + self, + input: str | None = None, # noqa: A002 + timeout: int = 60, + ) -> DurableFunctionTestResult: + """Execute the durable function and return results.""" + return self._runner.run(input=input, timeout=timeout) + + @property + def mode(self) -> str: + """Get the runner mode (local or cloud).""" + return self._mode + + def __enter__(self): + """Context manager entry - only calls runner's __enter__ if it's a context manager.""" + if isinstance(self._runner, contextlib.AbstractContextManager): + self._runner.__enter__() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Context manager exit - only calls runner's __exit__ if it's a context manager.""" + if isinstance(self._runner, contextlib.AbstractContextManager): + return self._runner.__exit__(exc_type, exc_val, exc_tb) + return None + + +@pytest.fixture +def durable_runner(request): + """Pytest fixture that provides a test runner based on configuration. + + Configuration for cloud mode: + Environment variables (required): + AWS_REGION: AWS region for Lambda invocation (default: us-west-2) + LAMBDA_ENDPOINT: Optional Lambda endpoint URL + PYTEST_FUNCTION_NAME_MAP: JSON mapping of example names to deployed function names + + CLI option: + --runner-mode=cloud (or local, default: local) + + Example: + AWS_REGION=us-west-2 \ + LAMBDA_ENDPOINT=https://lambda.us-west-2.amazonaws.com \ + PYTEST_FUNCTION_NAME_MAP='{"hello world":"HelloWorld:$LATEST"}' \ + pytest --runner-mode=cloud -k test_hello_world + + Usage in tests: + @pytest.mark.durable_execution( + handler=hello_world.handler, + lambda_function_name="hello world" + ) + def test_hello_world(durable_runner): + with durable_runner: + result = durable_runner.run(input="test", timeout=10) + assert result.status == InvocationStatus.SUCCEEDED + """ + # Get marker with test configuration + marker = request.node.get_closest_marker("durable_execution") + if not marker: + pytest.fail("Test must be marked with @pytest.mark.durable_execution") + + handler: Any = marker.kwargs.get("handler") + lambda_function_name: str | None = marker.kwargs.get("lambda_function_name") + + # Get runner mode from CLI option + runner_mode: str = request.config.getoption("--runner-mode") + + logger.info("Running test in %s mode", runner_mode.upper()) + + # Create appropriate runner + if runner_mode == RunnerMode.CLOUD: + # Get deployed function name and AWS config from environment + deployed_name = _get_deployed_function_name(request, lambda_function_name) + region = os.environ.get("AWS_REGION", "us-west-2") + lambda_endpoint = os.environ.get("LAMBDA_ENDPOINT") + + logger.info("Using AWS region: %s", region) + + # Create cloud runner (no cleanup needed) + runner = DurableFunctionCloudTestRunner( + function_name=deployed_name, + region=region, + lambda_endpoint=lambda_endpoint, + ) + else: + if not handler: + pytest.fail("handler is required for local mode tests") + # Create local runner (needs cleanup via context manager) + runner = DurableFunctionTestRunner(handler=handler) + + # Wrap in adapter and use context manager for proper cleanup + with TestRunnerAdapter(runner, runner_mode) as adapter: + yield adapter + + +def _get_deployed_function_name( + request: pytest.FixtureRequest, + lambda_function_name: str | None, +) -> str: + """Get the deployed function name from environment variables. + + Required environment variables: + - QUALIFIED_FUNCTION_NAME: The qualified function ARN (e.g., "MyFunction:$LATEST") + - LAMBDA_FUNCTION_TEST_NAME: The lambda function name to match against test markers + + Tests are skipped if the test's lambda_function_name doesn't match LAMBDA_FUNCTION_TEST_NAME. + """ + if not lambda_function_name: + pytest.fail("lambda_function_name is required for cloud mode tests") + + # Get from environment variables + function_arn = os.environ.get("QUALIFIED_FUNCTION_NAME") + env_function_name = os.environ.get("LAMBDA_FUNCTION_TEST_NAME") + + if not function_arn or not env_function_name: + pytest.fail( + "Cloud mode requires both QUALIFIED_FUNCTION_NAME and LAMBDA_FUNCTION_TEST_NAME environment variables\n" + 'Example: QUALIFIED_FUNCTION_NAME="MyFunction:$LATEST" LAMBDA_FUNCTION_TEST_NAME="hello world" pytest --runner-mode=cloud' + ) + + # Check if this test matches the function name (case-insensitive) + if lambda_function_name.lower() == env_function_name.lower(): + logger.info( + "Using function ARN: %s for lambda function: %s", + function_arn, + env_function_name, + ) + return function_arn + + # This test doesn't match the function name, skip it + pytest.skip( + f"Test '{lambda_function_name}' doesn't match LAMBDA_FUNCTION_TEST_NAME '{env_function_name}'" + ) diff --git a/examples/test/test_block_example.py b/examples/test/test_block_example.py new file mode 100644 index 0000000..3d220a6 --- /dev/null +++ b/examples/test/test_block_example.py @@ -0,0 +1,104 @@ +"""Tests for block_example.""" + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus + +from src import block_example +from test.conftest import deserialize_operation_payload + + +def _get_all_operations(operations): + """Recursively get all operations including nested ones.""" + all_ops = [] + for op in operations: + all_ops.append(op) + if hasattr(op, "child_operations") and op.child_operations: + all_ops.extend(_get_all_operations(op.child_operations)) + return all_ops + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=block_example.handler, + lambda_function_name="block example", +) +def test_block_example(durable_runner): + """Test block example with nested child contexts.""" + with durable_runner: + result = durable_runner.run(input="test", timeout=10) + + assert result.status is InvocationStatus.SUCCEEDED + + # Verify the final result structure + assert deserialize_operation_payload(result.result) == { + "nestedStep": "nested step result", + "nestedBlock": "nested block result", + } + + # Check for the parent block operation + parent_block_ops = [ + op + for op in result.operations + if op.operation_type.value == "CONTEXT" and op.name == "parent_block" + ] + assert len(parent_block_ops) == 1 + parent_block_op = parent_block_ops[0] + + # Verify parent block result + assert deserialize_operation_payload(parent_block_op.result) == { + "nestedStep": "nested step result", + "nestedBlock": "nested block result", + } + + # Verify parent block has 2 child operations + child_operations = parent_block_op.child_operations + assert len(child_operations) == 2 + + # First child should be a STEP with result "nested step result" + assert child_operations[0].operation_type.value == "STEP" + assert ( + deserialize_operation_payload(child_operations[0].result) + == "nested step result" + ) + + # Second child should be a CONTEXT with result "nested block result" + assert child_operations[1].operation_type.value == "CONTEXT" + assert ( + deserialize_operation_payload(child_operations[1].result) + == "nested block result" + ) + + # Check for nested step operation by name + nested_step_ops = [ + op + for op in result.operations + if op.operation_type.value == "STEP" and op.name == "nested_step" + ] + # Note: nested_step is inside parent_block, so it won't be at top level + # We need to search in child operations + all_ops = _get_all_operations(result.operations) + nested_step_ops = [ + op + for op in all_ops + if op.operation_type.value == "STEP" and op.name == "nested_step" + ] + assert len(nested_step_ops) == 1 + assert ( + deserialize_operation_payload(nested_step_ops[0].result) == "nested step result" + ) + + # Check for nested block operation by name + nested_block_ops = [ + op + for op in all_ops + if op.operation_type.value == "CONTEXT" and op.name == "nested_block" + ] + assert len(nested_block_ops) == 1 + assert ( + deserialize_operation_payload(nested_block_ops[0].result) + == "nested block result" + ) + + # Verify wait operation exists within nested context + wait_ops = [op for op in all_ops if op.operation_type.value == "WAIT"] + assert len(wait_ops) >= 1 diff --git a/examples/test/test_callback.py b/examples/test/test_callback.py index a3712c9..ae46a95 100644 --- a/examples/test/test_callback.py +++ b/examples/test/test_callback.py @@ -1,21 +1,26 @@ """Tests for callback example.""" +import pytest from aws_durable_execution_sdk_python.execution import InvocationStatus -from aws_durable_execution_sdk_python_testing.runner import ( - DurableFunctionTestResult, - DurableFunctionTestRunner, -) from src import callback +from test.conftest import deserialize_operation_payload -def test_callback(): +@pytest.mark.example +@pytest.mark.durable_execution( + handler=callback.handler, + lambda_function_name="callback", +) +def test_callback(durable_runner): """Test callback example.""" - with DurableFunctionTestRunner(handler=callback.handler) as runner: - result: DurableFunctionTestResult = runner.run(input="test", timeout=10) + with durable_runner: + result = durable_runner.run(input="test", timeout=10) assert result.status is InvocationStatus.SUCCEEDED - assert result.result.startswith("Callback created with ID:") + assert deserialize_operation_payload(result.result).startswith( + "Callback created with ID:" + ) # Find the callback operation callback_ops = [ diff --git a/examples/test/test_callback_permutations.py b/examples/test/test_callback_permutations.py index 3e5e0b8..9c1e661 100644 --- a/examples/test/test_callback_permutations.py +++ b/examples/test/test_callback_permutations.py @@ -1,21 +1,26 @@ """Tests for callback operation permutations.""" +import pytest from aws_durable_execution_sdk_python.execution import InvocationStatus -from aws_durable_execution_sdk_python_testing.runner import ( - DurableFunctionTestResult, - DurableFunctionTestRunner, -) from src import callback_with_timeout +from test.conftest import deserialize_operation_payload -def test_callback_with_timeout(): +@pytest.mark.example +@pytest.mark.durable_execution( + handler=callback_with_timeout.handler, + lambda_function_name="callback with timeout", +) +def test_callback_with_timeout(durable_runner): """Test callback with custom timeout configuration.""" - with DurableFunctionTestRunner(handler=callback_with_timeout.handler) as runner: - result: DurableFunctionTestResult = runner.run(input="test", timeout=10) + with durable_runner: + result = durable_runner.run(input="test", timeout=10) assert result.status is InvocationStatus.SUCCEEDED - assert result.result.startswith("Callback created with 60s timeout:") + assert deserialize_operation_payload(result.result).startswith( + "Callback created with 60s timeout:" + ) callback_ops = [ op for op in result.operations if op.operation_type.value == "CALLBACK" diff --git a/examples/test/test_hello_world.py b/examples/test/test_hello_world.py index a4447b7..c87b8cc 100644 --- a/examples/test/test_hello_world.py +++ b/examples/test/test_hello_world.py @@ -1,18 +1,21 @@ -"""Integration tests for example durable functions.""" +"""Integration tests for hello world example.""" +import pytest from aws_durable_execution_sdk_python.execution import InvocationStatus -from aws_durable_execution_sdk_python_testing.runner import ( - DurableFunctionTestResult, - DurableFunctionTestRunner, -) from src import hello_world +from test.conftest import deserialize_operation_payload -def test_hello_world(): +@pytest.mark.example +@pytest.mark.durable_execution( + handler=hello_world.handler, + lambda_function_name="hello world", +) +def test_hello_world(durable_runner): """Test hello world example.""" - with DurableFunctionTestRunner(handler=hello_world.handler) as runner: - result: DurableFunctionTestResult = runner.run(input="test", timeout=10) + with durable_runner: + result = durable_runner.run(input="test", timeout=10) assert result.status is InvocationStatus.SUCCEEDED - assert result.result == "Hello World!" + assert deserialize_operation_payload(result.result) == "Hello World!" diff --git a/examples/test/test_logger_example.py b/examples/test/test_logger_example.py new file mode 100644 index 0000000..30290b5 --- /dev/null +++ b/examples/test/test_logger_example.py @@ -0,0 +1,35 @@ +"""Tests for logger_example.""" + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus +from aws_durable_execution_sdk_python.lambda_service import OperationType + +from src import logger_example +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=logger_example.handler, + lambda_function_name="logger example", +) +def test_logger_example(durable_runner): + """Test logger example.""" + with durable_runner: + result = durable_runner.run(input={"id": "test-123"}, timeout=10) + + assert result.status is InvocationStatus.SUCCEEDED + assert deserialize_operation_payload(result.result) == "processed-child-processed" + + # Verify step operations exist (process_data at top level) + # Note: child_step is nested inside the CONTEXT operation, not at top level + step_ops = [ + op for op in result.operations if op.operation_type == OperationType.STEP + ] + assert len(step_ops) >= 1 + + # Verify context operation exists (child_workflow) + context_ops = [ + op for op in result.operations if op.operation_type.value == "CONTEXT" + ] + assert len(context_ops) >= 1 diff --git a/examples/test/test_map_operations.py b/examples/test/test_map_operations.py index 1106c66..c784942 100644 --- a/examples/test/test_map_operations.py +++ b/examples/test/test_map_operations.py @@ -1,24 +1,30 @@ """Tests for map_operations example.""" +import pytest from aws_durable_execution_sdk_python.execution import InvocationStatus +from aws_durable_execution_sdk_python.lambda_service import OperationType -from aws_durable_execution_sdk_python_testing.runner import ( - DurableFunctionTestResult, - DurableFunctionTestRunner, -) from src import map_operations +from test.conftest import deserialize_operation_payload -def test_map_operations(): +@pytest.mark.example +@pytest.mark.durable_execution( + handler=map_operations.handler, + lambda_function_name="map operations", +) +def test_map_operations(durable_runner): """Test map_operations example.""" - with DurableFunctionTestRunner(handler=map_operations.handler) as runner: - result: DurableFunctionTestResult = runner.run(input="test", timeout=10) + with durable_runner: + result = durable_runner.run(input="test", timeout=10) assert result.status is InvocationStatus.SUCCEEDED - assert result.result == "Squared results: [1, 4, 9, 16, 25]" + assert deserialize_operation_payload(result.result) == [1, 4, 9, 16, 25] # Verify all five step operations exist - step_ops = [op for op in result.operations if op.operation_type.value == "STEP"] + step_ops = [ + op for op in result.operations if op.operation_type == OperationType.STEP + ] assert len(step_ops) == 5 step_names = {op.name for op in step_ops} diff --git a/examples/test/test_parallel.py b/examples/test/test_parallel.py index b192f7c..5878648 100644 --- a/examples/test/test_parallel.py +++ b/examples/test/test_parallel.py @@ -1,24 +1,34 @@ """Tests for parallel example.""" +import pytest from aws_durable_execution_sdk_python.execution import InvocationStatus +from aws_durable_execution_sdk_python.lambda_service import OperationType -from aws_durable_execution_sdk_python_testing.runner import ( - DurableFunctionTestResult, - DurableFunctionTestRunner, -) from src import parallel +from test.conftest import deserialize_operation_payload -def test_parallel(): +@pytest.mark.example +@pytest.mark.durable_execution( + handler=parallel.handler, + lambda_function_name="Parallel Operations", +) +def test_parallel(durable_runner): """Test parallel example.""" - with DurableFunctionTestRunner(handler=parallel.handler) as runner: - result: DurableFunctionTestResult = runner.run(input="test", timeout=10) + with durable_runner: + result = durable_runner.run(input="test", timeout=10) assert result.status is InvocationStatus.SUCCEEDED - assert result.result == "Results: Task 1 complete, Task 2 complete, Task 3 complete" + assert deserialize_operation_payload(result.result) == [ + "Task 1 complete", + "Task 2 complete", + "Task 3 complete", + ] # Verify all three step operations exist - step_ops = [op for op in result.operations if op.operation_type.value == "STEP"] + step_ops = [ + op for op in result.operations if op.operation_type == OperationType.STEP + ] assert len(step_ops) == 3 step_names = {op.name for op in step_ops} diff --git a/examples/test/test_run_in_child_context.py b/examples/test/test_run_in_child_context.py index 9795cbd..1bc5b26 100644 --- a/examples/test/test_run_in_child_context.py +++ b/examples/test/test_run_in_child_context.py @@ -1,21 +1,24 @@ """Tests for run_in_child_context example.""" +import pytest from aws_durable_execution_sdk_python.execution import InvocationStatus -from aws_durable_execution_sdk_python_testing.runner import ( - DurableFunctionTestResult, - DurableFunctionTestRunner, -) from src import run_in_child_context +from test.conftest import deserialize_operation_payload -def test_run_in_child_context(): +@pytest.mark.example +@pytest.mark.durable_execution( + handler=run_in_child_context.handler, + lambda_function_name="run in child context", +) +def test_run_in_child_context(durable_runner): """Test run_in_child_context example.""" - with DurableFunctionTestRunner(handler=run_in_child_context.handler) as runner: - result: DurableFunctionTestResult = runner.run(input="test", timeout=10) + with durable_runner: + result = durable_runner.run(input="test", timeout=10) assert result.status is InvocationStatus.SUCCEEDED - assert result.result == "Child context result: 10" + assert deserialize_operation_payload(result.result) == "Child context result: 10" # Verify child context operation exists context_ops = [ diff --git a/examples/test/test_step.py b/examples/test/test_step.py index dda0693..3fde032 100644 --- a/examples/test/test_step.py +++ b/examples/test/test_step.py @@ -1,22 +1,24 @@ """Tests for step example.""" +import pytest from aws_durable_execution_sdk_python.execution import InvocationStatus -from aws_durable_execution_sdk_python_testing.runner import ( - DurableFunctionTestResult, - DurableFunctionTestRunner, - StepOperation, -) from src import step +from test.conftest import deserialize_operation_payload -def test_step(): +@pytest.mark.example +@pytest.mark.durable_execution( + handler=step.handler, + lambda_function_name="Basic Step", +) +def test_step(durable_runner): """Test basic step example.""" - with DurableFunctionTestRunner(handler=step.handler) as runner: - result: DurableFunctionTestResult = runner.run(input="test", timeout=10) + with durable_runner: + result = durable_runner.run(input="test", timeout=10) assert result.status is InvocationStatus.SUCCEEDED - assert result.result == 8 + assert deserialize_operation_payload(result.result) == 8 - step_result: StepOperation = result.get_step("add_numbers") - assert step_result.result == 8 + step_result = result.get_step("add_numbers") + assert deserialize_operation_payload(step_result.result) == 8 diff --git a/examples/test/test_step_permutations.py b/examples/test/test_step_permutations.py index 60c0ecd..d46b733 100644 --- a/examples/test/test_step_permutations.py +++ b/examples/test/test_step_permutations.py @@ -1,51 +1,75 @@ """Tests for step operation permutations.""" +import pytest from aws_durable_execution_sdk_python.execution import InvocationStatus +from aws_durable_execution_sdk_python.lambda_service import OperationType -from aws_durable_execution_sdk_python_testing.runner import ( - DurableFunctionTestResult, - DurableFunctionTestRunner, -) from src import step_no_name, step_with_exponential_backoff, step_with_name +from test.conftest import deserialize_operation_payload -def test_step_no_name(): +@pytest.mark.example +@pytest.mark.durable_execution( + handler=step_no_name.handler, + lambda_function_name="step no name", +) +def test_step_no_name(durable_runner): """Test step without explicit name.""" - with DurableFunctionTestRunner(handler=step_no_name.handler) as runner: - result: DurableFunctionTestResult = runner.run(input="test", timeout=10) + with durable_runner: + result = durable_runner.run(input="test", timeout=10) assert result.status is InvocationStatus.SUCCEEDED - assert result.result == "Result: Step without name" + assert deserialize_operation_payload(result.result) == "Result: Step without name" - step_ops = [op for op in result.operations if op.operation_type.value == "STEP"] + step_ops = [ + op for op in result.operations if op.operation_type == OperationType.STEP + ] assert len(step_ops) == 1 # Should use function name when no name provided assert step_ops[0].name is None or step_ops[0].name == "" -def test_step_with_name(): +@pytest.mark.example +@pytest.mark.durable_execution( + handler=step_with_name.handler, + lambda_function_name="step with name", +) +def test_step_with_name(durable_runner): """Test step with explicit name.""" - with DurableFunctionTestRunner(handler=step_with_name.handler) as runner: - result: DurableFunctionTestResult = runner.run(input="test", timeout=10) + with durable_runner: + result = durable_runner.run(input="test", timeout=10) assert result.status is InvocationStatus.SUCCEEDED - assert result.result == "Result: Step with explicit name" + assert ( + deserialize_operation_payload(result.result) + == "Result: Step with explicit name" + ) - step_ops = [op for op in result.operations if op.operation_type.value == "STEP"] + step_ops = [ + op for op in result.operations if op.operation_type == OperationType.STEP + ] assert len(step_ops) == 1 assert step_ops[0].name == "custom_step" -def test_step_with_exponential_backoff(): +@pytest.mark.example +@pytest.mark.durable_execution( + handler=step_with_exponential_backoff.handler, + lambda_function_name="step with exponential backoff", +) +def test_step_with_exponential_backoff(durable_runner): """Test step with exponential backoff retry strategy.""" - with DurableFunctionTestRunner( - handler=step_with_exponential_backoff.handler - ) as runner: - result: DurableFunctionTestResult = runner.run(input="test", timeout=10) + with durable_runner: + result = durable_runner.run(input="test", timeout=10) assert result.status is InvocationStatus.SUCCEEDED - assert result.result == "Result: Step with exponential backoff" + assert ( + deserialize_operation_payload(result.result) + == "Result: Step with exponential backoff" + ) - step_ops = [op for op in result.operations if op.operation_type.value == "STEP"] + step_ops = [ + op for op in result.operations if op.operation_type == OperationType.STEP + ] assert len(step_ops) == 1 assert step_ops[0].name == "retry_step" diff --git a/examples/test/test_step_semantics_at_most_once.py b/examples/test/test_step_semantics_at_most_once.py new file mode 100644 index 0000000..fd8908f --- /dev/null +++ b/examples/test/test_step_semantics_at_most_once.py @@ -0,0 +1,32 @@ +"""Tests for step_semantics_at_most_once example.""" + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus +from aws_durable_execution_sdk_python.lambda_service import OperationType + +from src import step_semantics_at_most_once +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=step_semantics_at_most_once.handler, + lambda_function_name="step semantics at most once", +) +def test_step_semantics_at_most_once(durable_runner): + """Test step with at-most-once semantics.""" + with durable_runner: + result = durable_runner.run(input="test", timeout=10) + + assert result.status is InvocationStatus.SUCCEEDED + assert ( + deserialize_operation_payload(result.result) + == "Result: AT_MOST_ONCE_PER_RETRY semantics" + ) + + # Verify step operation exists with correct name + step_ops = [ + op for op in result.operations if op.operation_type == OperationType.STEP + ] + assert len(step_ops) == 1 + assert step_ops[0].name == "at_most_once_step" diff --git a/examples/test/test_step_with_retry.py b/examples/test/test_step_with_retry.py new file mode 100644 index 0000000..9f4f884 --- /dev/null +++ b/examples/test/test_step_with_retry.py @@ -0,0 +1,33 @@ +"""Tests for step_with_retry example.""" + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus +from aws_durable_execution_sdk_python.lambda_service import OperationType + +from src import step_with_retry +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=step_with_retry.handler, + lambda_function_name="step with retry", +) +def test_step_with_retry(durable_runner): + """Test step with retry configuration.""" + with durable_runner: + result = durable_runner.run(input="test", timeout=30) + + # The function uses random() so it may succeed or fail + # We just verify it completes and has retry configuration + assert result.status in [InvocationStatus.SUCCEEDED, InvocationStatus.FAILED] + + # Verify step operation exists + step_ops = [ + op for op in result.operations if op.operation_type == OperationType.STEP + ] + assert len(step_ops) >= 1 + + # If it succeeded, verify the result + if result.status is InvocationStatus.SUCCEEDED: + assert deserialize_operation_payload(result.result) == "Operation succeeded" diff --git a/examples/test/test_steps_with_retry.py b/examples/test/test_steps_with_retry.py new file mode 100644 index 0000000..452ed5f --- /dev/null +++ b/examples/test/test_steps_with_retry.py @@ -0,0 +1,33 @@ +"""Tests for steps_with_retry.""" + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus +from aws_durable_execution_sdk_python.lambda_service import OperationType + +from src import steps_with_retry +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=steps_with_retry.handler, + lambda_function_name="steps with retry", +) +def test_steps_with_retry(durable_runner): + """Test steps_with_retry pattern.""" + with durable_runner: + result = durable_runner.run(input={"name": "test-item"}, timeout=30) + + assert result.status is InvocationStatus.SUCCEEDED + + # Result should be either success with item or error + assert isinstance(deserialize_operation_payload(result.result), dict) + assert "success" in deserialize_operation_payload( + result.result + ) or "error" in deserialize_operation_payload(result.result) + + # Verify step operations exist (polling steps) + step_ops = [ + op for op in result.operations if op.operation_type == OperationType.STEP + ] + assert len(step_ops) >= 1 diff --git a/examples/test/test_wait.py b/examples/test/test_wait.py index e9331b2..b8a98c7 100644 --- a/examples/test/test_wait.py +++ b/examples/test/test_wait.py @@ -1,21 +1,24 @@ """Tests for wait example.""" +import pytest from aws_durable_execution_sdk_python.execution import InvocationStatus -from aws_durable_execution_sdk_python_testing.runner import ( - DurableFunctionTestResult, - DurableFunctionTestRunner, -) from src import wait +from test.conftest import deserialize_operation_payload -def test_wait(): +@pytest.mark.example +@pytest.mark.durable_execution( + handler=wait.handler, + lambda_function_name="Wait State", +) +def test_wait(durable_runner): """Test wait example.""" - with DurableFunctionTestRunner(handler=wait.handler) as runner: - result: DurableFunctionTestResult = runner.run(input="test", timeout=10) + with durable_runner: + result = durable_runner.run(input="test", timeout=10) assert result.status is InvocationStatus.SUCCEEDED - assert result.result == "Wait completed" + assert deserialize_operation_payload(result.result) == "Wait completed" # Find the wait operation (it should be the only non-execution operation) wait_ops = [op for op in result.operations if op.operation_type.value == "WAIT"] diff --git a/examples/test/test_wait_for_condition.py b/examples/test/test_wait_for_condition.py new file mode 100644 index 0000000..51187a1 --- /dev/null +++ b/examples/test/test_wait_for_condition.py @@ -0,0 +1,33 @@ +"""Tests for wait_for_condition.""" + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus +from aws_durable_execution_sdk_python.lambda_service import OperationType + +from src import wait_for_condition +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=wait_for_condition.handler, + lambda_function_name="wait for condition", +) +def test_wait_for_condition(durable_runner): + """Test wait_for_condition pattern.""" + with durable_runner: + result = durable_runner.run(input="test", timeout=15) + + assert result.status is InvocationStatus.SUCCEEDED + # Should reach state 3 after 3 increments + assert deserialize_operation_payload(result.result) == 3 + + # Verify step operations exist (should have 3 increment steps) + step_ops = [ + op for op in result.operations if op.operation_type == OperationType.STEP + ] + assert len(step_ops) == 3 + + # Verify wait operations exist (should have 2 waits before final state) + wait_ops = [op for op in result.operations if op.operation_type.value == "WAIT"] + assert len(wait_ops) == 2 diff --git a/examples/test/test_wait_permutations.py b/examples/test/test_wait_permutations.py index e2d1965..6f33787 100644 --- a/examples/test/test_wait_permutations.py +++ b/examples/test/test_wait_permutations.py @@ -1,21 +1,24 @@ """Tests for wait operation permutations.""" +import pytest from aws_durable_execution_sdk_python.execution import InvocationStatus -from aws_durable_execution_sdk_python_testing.runner import ( - DurableFunctionTestResult, - DurableFunctionTestRunner, -) from src import wait_with_name +from test.conftest import deserialize_operation_payload -def test_wait_with_name(): +@pytest.mark.example +@pytest.mark.durable_execution( + handler=wait_with_name.handler, + lambda_function_name="wait with name", +) +def test_wait_with_name(durable_runner): """Test wait with explicit name.""" - with DurableFunctionTestRunner(handler=wait_with_name.handler) as runner: - result: DurableFunctionTestResult = runner.run(input="test", timeout=10) + with durable_runner: + result = durable_runner.run(input="test", timeout=10) assert result.status is InvocationStatus.SUCCEEDED - assert result.result == "Wait with name completed" + assert deserialize_operation_payload(result.result) == "Wait with name completed" wait_ops = [op for op in result.operations if op.operation_type.value == "WAIT"] assert len(wait_ops) == 1 diff --git a/pyproject.toml b/pyproject.toml index ca865b5..a0fd191 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -61,7 +61,8 @@ dependencies = [ [tool.hatch.envs.test.scripts] test = "pytest tests/ -v" -examples = "pytest examples/test/ -v" +examples = "pytest --runner-mode=local -m example examples/test/ -v" +examples-integration = "pytest --runner-mode=cloud -m example examples/test/ -v {args}" cov = "pytest --cov-report=term-missing --cov-config=pyproject.toml --cov=src/aws_durable_execution_sdk_python_testing --cov-fail-under=96" [tool.hatch.envs.examples] @@ -115,6 +116,7 @@ target-version = "py313" [tool.ruff.lint] preview = false +select = ["TID252"] # Enforce absolute imports (ban relative imports) [tool.ruff.lint.isort] known-first-party = ["aws_durable_execution_sdk_python_testing"] @@ -143,3 +145,16 @@ lines-after-imports = 2 "src/aws_durable_execution_sdk_python_testing/invoker.py" = [ "A002", # Argument `input` is shadowing a Python builtin ] + +[tool.pytest.ini_options] +# Declare custom markers to avoid warnings with --strict-markers +markers = [ + # Used for test selection with -m example + "example: marks tests as example tests (deselect with '-m \"not example\"')", + # Used for configuration - passes handler and lambda_function_name to durable_runner fixture + "durable_execution: marks tests that use the durable_runner fixture (not used for test selection)", +] +# Default test discovery paths +testpaths = ["tests", "examples/test"] +# Default options for all test runs +addopts = "-v --strict-markers" diff --git a/src/aws_durable_execution_sdk_python_testing/__init__.py b/src/aws_durable_execution_sdk_python_testing/__init__.py index 694927c..88b125f 100644 --- a/src/aws_durable_execution_sdk_python_testing/__init__.py +++ b/src/aws_durable_execution_sdk_python_testing/__init__.py @@ -1,3 +1,20 @@ """DurableExecutionsPythonTestingLibrary module.""" -# Implement your code here. +from aws_durable_execution_sdk_python_testing.runner import ( + DurableChildContextTestRunner, + DurableFunctionCloudTestRunner, + DurableFunctionTestResult, + DurableFunctionTestRunner, + WebRunner, + WebRunnerConfig, +) + + +__all__ = [ + "DurableChildContextTestRunner", + "DurableFunctionCloudTestRunner", + "DurableFunctionTestResult", + "DurableFunctionTestRunner", + "WebRunner", + "WebRunnerConfig", +] diff --git a/src/aws_durable_execution_sdk_python_testing/executor.py b/src/aws_durable_execution_sdk_python_testing/executor.py index 1025ab0..1d1d4d6 100644 --- a/src/aws_durable_execution_sdk_python_testing/executor.py +++ b/src/aws_durable_execution_sdk_python_testing/executor.py @@ -142,15 +142,15 @@ def get_execution_details(self, execution_arn: str) -> GetDurableExecutionRespon durable_execution_name=execution.start_input.execution_name, function_arn=f"arn:aws:lambda:us-east-1:123456789012:function:{execution.start_input.function_name}", status=status, - start_timestamp=execution_op.start_timestamp.timestamp() + start_timestamp=execution_op.start_timestamp if execution_op.start_timestamp - else datetime.now(UTC).timestamp(), + else datetime.now(UTC), input_payload=execution_op.execution_details.input_payload if execution_op.execution_details else None, result=result, error=error, - end_timestamp=execution_op.end_timestamp.timestamp() + end_timestamp=execution_op.end_timestamp if execution_op.end_timestamp else None, version="1.0", @@ -223,10 +223,10 @@ def list_executions( durable_execution_name=execution.start_input.execution_name, function_arn=f"arn:aws:lambda:us-east-1:123456789012:function:{execution.start_input.function_name}", status=execution_status, - start_timestamp=execution_op.start_timestamp.timestamp() + start_timestamp=execution_op.start_timestamp if execution_op.start_timestamp - else datetime.now(UTC).timestamp(), - end_timestamp=execution_op.end_timestamp.timestamp() + else datetime.now(UTC), + end_timestamp=execution_op.end_timestamp if execution_op.end_timestamp else None, ) @@ -333,7 +333,7 @@ def stop_execution( # Stop the execution self.fail_execution(execution_arn, stop_error) - return StopDurableExecutionResponse(end_timestamp=datetime.now(UTC).timestamp()) + return StopDurableExecutionResponse(stop_timestamp=datetime.now(UTC)) def get_execution_state( self, diff --git a/src/aws_durable_execution_sdk_python_testing/model.py b/src/aws_durable_execution_sdk_python_testing/model.py index b13dea4..8c526dc 100644 --- a/src/aws_durable_execution_sdk_python_testing/model.py +++ b/src/aws_durable_execution_sdk_python_testing/model.py @@ -2,21 +2,29 @@ from __future__ import annotations -from dataclasses import dataclass +import datetime +from dataclasses import dataclass, replace from typing import Any # Import existing types from the main SDK - REUSE EVERYTHING POSSIBLE from aws_durable_execution_sdk_python.lambda_service import ( + CallbackDetails, CallbackOptions, + ChainedInvokeDetails, ChainedInvokeOptions, + ContextDetails, ContextOptions, ErrorObject, + ExecutionDetails, Operation, OperationAction, + OperationStatus, OperationSubType, OperationType, OperationUpdate, + StepDetails, StepOptions, + WaitDetails, WaitOptions, ) from aws_durable_execution_sdk_python.types import ( @@ -156,11 +164,11 @@ class GetDurableExecutionResponse: durable_execution_name: str function_arn: str status: str - start_timestamp: float + start_timestamp: datetime.datetime input_payload: str | None = None result: str | None = None error: ErrorObject | None = None - end_timestamp: float | None = None + end_timestamp: datetime.datetime | None = None version: str | None = None @classmethod @@ -213,8 +221,8 @@ class Execution: durable_execution_name: str function_arn: str status: str - start_timestamp: float - end_timestamp: float | None = None + start_timestamp: datetime.datetime + end_timestamp: datetime.datetime | None = None @classmethod def from_dict(cls, data: dict) -> Execution: @@ -350,14 +358,14 @@ def to_dict(self) -> dict[str, Any]: class StopDurableExecutionResponse: """Response from stopping a durable execution.""" - end_timestamp: float + stop_timestamp: datetime.datetime @classmethod def from_dict(cls, data: dict) -> StopDurableExecutionResponse: - return cls(end_timestamp=data["EndTimestamp"]) + return cls(stop_timestamp=data["StopTimestamp"]) def to_dict(self) -> dict[str, Any]: - return {"EndTimestamp": self.end_timestamp} + return {"StopTimestamp": self.stop_timestamp} @dataclass(frozen=True) @@ -676,7 +684,7 @@ class WaitStartedDetails: """Wait started event details.""" duration: int | None = None - scheduled_end_timestamp: str | None = None + scheduled_end_timestamp: datetime.datetime | None = None @classmethod def from_dict(cls, data: dict) -> WaitStartedDetails: @@ -1010,7 +1018,7 @@ class Event: """Event structure from Smithy model.""" event_type: str - event_timestamp: str + event_timestamp: datetime.datetime sub_type: str | None = None event_id: int = 1 operation_id: str | None = None @@ -1265,6 +1273,453 @@ def to_dict(self) -> dict[str, Any]: return result +@dataclass(frozen=True) +class HistoryEventTypeConfig: + """Configuration for how to process a specific event type.""" + + operation_type: OperationType | None + operation_status: OperationStatus | None + is_start_event: bool + is_end_event: bool + has_result: bool # Whether this event type contains result/error data + + +# Mapping of event types to their processing configuration +# This matches the TypeScript historyEventTypes constant +HISTORY_EVENT_TYPES: dict[str, HistoryEventTypeConfig] = { + "ExecutionStarted": HistoryEventTypeConfig( + operation_type=OperationType.EXECUTION, + operation_status=OperationStatus.STARTED, + is_start_event=True, + is_end_event=False, + has_result=False, + ), + "ExecutionFailed": HistoryEventTypeConfig( + operation_type=OperationType.EXECUTION, + operation_status=OperationStatus.FAILED, + is_start_event=False, + is_end_event=True, + has_result=False, + ), + "ExecutionStopped": HistoryEventTypeConfig( + operation_type=OperationType.EXECUTION, + operation_status=OperationStatus.STOPPED, + is_start_event=False, + is_end_event=True, + has_result=False, + ), + "ExecutionSucceeded": HistoryEventTypeConfig( + operation_type=OperationType.EXECUTION, + operation_status=OperationStatus.SUCCEEDED, + is_start_event=False, + is_end_event=True, + has_result=False, + ), + "ExecutionTimedOut": HistoryEventTypeConfig( + operation_type=OperationType.EXECUTION, + operation_status=OperationStatus.TIMED_OUT, + is_start_event=False, + is_end_event=True, + has_result=False, + ), + "CallbackStarted": HistoryEventTypeConfig( + operation_type=OperationType.CALLBACK, + operation_status=OperationStatus.STARTED, + is_start_event=True, + is_end_event=False, + has_result=False, + ), + "CallbackFailed": HistoryEventTypeConfig( + operation_type=OperationType.CALLBACK, + operation_status=OperationStatus.FAILED, + is_start_event=False, + is_end_event=True, + has_result=True, + ), + "CallbackSucceeded": HistoryEventTypeConfig( + operation_type=OperationType.CALLBACK, + operation_status=OperationStatus.SUCCEEDED, + is_start_event=False, + is_end_event=True, + has_result=True, + ), + "CallbackTimedOut": HistoryEventTypeConfig( + operation_type=OperationType.CALLBACK, + operation_status=OperationStatus.TIMED_OUT, + is_start_event=False, + is_end_event=True, + has_result=True, + ), + "ContextStarted": HistoryEventTypeConfig( + operation_type=OperationType.CONTEXT, + operation_status=OperationStatus.STARTED, + is_start_event=True, + is_end_event=False, + has_result=False, + ), + "ContextFailed": HistoryEventTypeConfig( + operation_type=OperationType.CONTEXT, + operation_status=OperationStatus.FAILED, + is_start_event=False, + is_end_event=True, + has_result=True, + ), + "ContextSucceeded": HistoryEventTypeConfig( + operation_type=OperationType.CONTEXT, + operation_status=OperationStatus.SUCCEEDED, + is_start_event=False, + is_end_event=True, + has_result=True, + ), + "ChainedInvokeStarted": HistoryEventTypeConfig( + operation_type=OperationType.CHAINED_INVOKE, + operation_status=OperationStatus.STARTED, + is_start_event=True, + is_end_event=False, + has_result=False, + ), + "ChainedInvokeFailed": HistoryEventTypeConfig( + operation_type=OperationType.CHAINED_INVOKE, + operation_status=OperationStatus.FAILED, + is_start_event=False, + is_end_event=True, + has_result=True, + ), + "ChainedInvokeSucceeded": HistoryEventTypeConfig( + operation_type=OperationType.CHAINED_INVOKE, + operation_status=OperationStatus.SUCCEEDED, + is_start_event=False, + is_end_event=True, + has_result=True, + ), + "ChainedInvokeTimedOut": HistoryEventTypeConfig( + operation_type=OperationType.CHAINED_INVOKE, + operation_status=OperationStatus.TIMED_OUT, + is_start_event=False, + is_end_event=True, + has_result=True, + ), + "ChainedInvokeCancelled": HistoryEventTypeConfig( + operation_type=OperationType.CHAINED_INVOKE, + operation_status=OperationStatus.CANCELLED, + is_start_event=False, + is_end_event=True, + has_result=True, + ), + "StepStarted": HistoryEventTypeConfig( + operation_type=OperationType.STEP, + operation_status=OperationStatus.STARTED, + is_start_event=True, + is_end_event=False, + has_result=False, + ), + "StepFailed": HistoryEventTypeConfig( + operation_type=OperationType.STEP, + operation_status=OperationStatus.FAILED, + is_start_event=False, + is_end_event=True, + has_result=True, + ), + "StepSucceeded": HistoryEventTypeConfig( + operation_type=OperationType.STEP, + operation_status=OperationStatus.SUCCEEDED, + is_start_event=False, + is_end_event=True, + has_result=True, + ), + "WaitStarted": HistoryEventTypeConfig( + operation_type=OperationType.WAIT, + operation_status=OperationStatus.STARTED, + is_start_event=True, + is_end_event=False, + has_result=True, + ), + "WaitSucceeded": HistoryEventTypeConfig( + operation_type=OperationType.WAIT, + operation_status=OperationStatus.SUCCEEDED, + is_start_event=False, + is_end_event=True, + has_result=True, + ), + "WaitCancelled": HistoryEventTypeConfig( + operation_type=OperationType.WAIT, + operation_status=OperationStatus.CANCELLED, + is_start_event=False, + is_end_event=True, + has_result=True, + ), + # TODO: add support for populating invocation information from InvocationCompleted event + "InvocationCompleted": HistoryEventTypeConfig( + operation_type=None, + operation_status=None, + is_start_event=False, + is_end_event=False, + has_result=True, + ), +} + + +def events_to_operations(events: list[Event]) -> list[Operation]: + """Convert a list of history events into operations. + + This function processes raw history events and groups them by operation ID, + creating comprehensive operation objects following the TypeScript pattern from + aws-durable-execution-sdk-js-testing. + + Multiple events for the same operation_id are merged together, with each event + contributing its specific fields (e.g., CallbackStarted provides callback_id, + CallbackSucceeded provides result). + + Args: + events: List of history events to process + + Returns: + List of operations, one per unique operation ID + + Raises: + ValueError: When required fields are missing from an event + + Note: + InvocationCompleted events are currently skipped as they don't represent + operations. Future enhancement: populate invocation information from these + events (TODO). + """ + operations_map: dict[str, Operation] = {} + + for event in events: + if not event.event_type: + msg = "Missing required 'event_type' field in event" + raise ValueError(msg) + + # Get event type configuration + event_config: HistoryEventTypeConfig | None = HISTORY_EVENT_TYPES.get( + event.event_type + ) + if not event_config: + msg = f"Unknown event type: {event.event_type}" + raise ValueError(msg) + + # TODO: add support for populating invocation information from InvocationCompleted event + if event.event_type == "InvocationCompleted": + continue + + if not event.operation_id: + msg = f"Missing required 'operation_id' field in event {event.event_id}" + raise ValueError(msg) + + # Get previous operation if it exists + previous_operation: Operation | None = operations_map.get(event.operation_id) + + # Get operation type and status from configuration + operation_type: OperationType = ( + event_config.operation_type or OperationType.EXECUTION + ) + status: OperationStatus = ( + event_config.operation_status or OperationStatus.PENDING + ) + + # Parse sub_type + sub_type: OperationSubType | None = None + if event.sub_type: + try: + sub_type = OperationSubType(event.sub_type) + except ValueError: + pass + + # Create base operation + operation = Operation( + operation_id=event.operation_id, + operation_type=operation_type, + status=status, + name=event.name, + parent_id=event.parent_id, + sub_type=sub_type, + start_timestamp=datetime.datetime.now(tz=datetime.timezone.utc), + ) + + # Merge with previous operation if it exists + # Most fields are immutable, so they get preserved from previous events + if previous_operation: + operation = replace( + operation, + name=operation.name or previous_operation.name, + parent_id=operation.parent_id or previous_operation.parent_id, + sub_type=operation.sub_type or previous_operation.sub_type, + start_timestamp=previous_operation.start_timestamp, + end_timestamp=previous_operation.end_timestamp, + execution_details=previous_operation.execution_details, + context_details=previous_operation.context_details, + step_details=previous_operation.step_details, + wait_details=previous_operation.wait_details, + callback_details=previous_operation.callback_details, + chained_invoke_details=previous_operation.chained_invoke_details, + ) + + # Set timestamps based on event configuration + if event_config.is_start_event: + operation = replace(operation, start_timestamp=event.event_timestamp) + if event_config.is_end_event: + operation = replace(operation, end_timestamp=event.event_timestamp) + + # Add operation-specific details incrementally + # Each event type contributes only the fields it has + + # EXECUTION details + if ( + operation_type == OperationType.EXECUTION + and event.execution_started_details + and event.execution_started_details.input + ): + operation = replace( + operation, + execution_details=ExecutionDetails( + input_payload=event.execution_started_details.input.payload + ), + ) + + # CALLBACK details - merge callback_id, result, and error from different events + if operation_type == OperationType.CALLBACK: + existing_cb: CallbackDetails | None = operation.callback_details + callback_id: str = existing_cb.callback_id if existing_cb else "" + result: str | None = existing_cb.result if existing_cb else None + error: ErrorObject | None = existing_cb.error if existing_cb else None + + # CallbackStarted provides callback_id + if event.callback_started_details: + callback_id = event.callback_started_details.callback_id or callback_id + + # CallbackSucceeded provides result + if ( + event.callback_succeeded_details + and event.callback_succeeded_details.result + ): + result = event.callback_succeeded_details.result.payload + + # CallbackFailed provides error + if event.callback_failed_details and event.callback_failed_details.error: + error = event.callback_failed_details.error.payload + + # CallbackTimedOut provides error + if ( + event.callback_timed_out_details + and event.callback_timed_out_details.error + ): + error = event.callback_timed_out_details.error.payload + + operation = replace( + operation, + callback_details=CallbackDetails( + callback_id=callback_id, + result=result, + error=error, + ), + ) + + # STEP details - only update if this event type has result data + if operation_type == OperationType.STEP and event_config.has_result: + existing_step: StepDetails | None = operation.step_details + result_val: str | None = existing_step.result if existing_step else None + error_val: ErrorObject | None = ( + existing_step.error if existing_step else None + ) + attempt: int = existing_step.attempt if existing_step else 0 + next_attempt_ts: datetime.datetime | None = ( + existing_step.next_attempt_timestamp if existing_step else None + ) + + # StepSucceeded provides result + if event.step_succeeded_details: + if event.step_succeeded_details.result: + result_val = event.step_succeeded_details.result.payload + if event.step_succeeded_details.retry_details: + attempt = event.step_succeeded_details.retry_details.current_attempt + + # StepFailed provides error and retry details + if event.step_failed_details: + if event.step_failed_details.error: + error_val = event.step_failed_details.error.payload + if event.step_failed_details.retry_details: + attempt = event.step_failed_details.retry_details.current_attempt + if ( + event.step_failed_details.retry_details.next_attempt_delay_seconds + is not None + ): + next_attempt_ts = event.event_timestamp + datetime.timedelta( + seconds=event.step_failed_details.retry_details.next_attempt_delay_seconds + ) + + operation = replace( + operation, + step_details=StepDetails( + result=result_val, + error=error_val, + attempt=attempt, + next_attempt_timestamp=next_attempt_ts, + ), + ) + + # WAIT details + if operation_type == OperationType.WAIT and event.wait_started_details: + operation = replace( + operation, + wait_details=WaitDetails( + scheduled_timestamp=event.wait_started_details.scheduled_end_timestamp + ), + ) + + # CONTEXT details - only update if this event type has result data (matching TypeScript hasResult) + if operation_type == OperationType.CONTEXT and event_config.has_result: + if ( + event.context_succeeded_details + and event.context_succeeded_details.result + ): + operation = replace( + operation, + context_details=ContextDetails( + result=event.context_succeeded_details.result.payload, + error=None, + ), + ) + elif event.context_failed_details and event.context_failed_details.error: + operation = replace( + operation, + context_details=ContextDetails( + result=None, + error=event.context_failed_details.error.payload, + ), + ) + + # CHAINED_INVOKE details - only update if this event type has result data (matching TypeScript hasResult) + if operation_type == OperationType.CHAINED_INVOKE and event_config.has_result: + if ( + event.chained_invoke_succeeded_details + and event.chained_invoke_succeeded_details.result + ): + operation = replace( + operation, + chained_invoke_details=ChainedInvokeDetails( + result=event.chained_invoke_succeeded_details.result.payload, + error=None, + ), + ) + elif ( + event.chained_invoke_failed_details + and event.chained_invoke_failed_details.error + ): + operation = replace( + operation, + chained_invoke_details=ChainedInvokeDetails( + result=None, + error=event.chained_invoke_failed_details.error.payload, + ), + ) + + # Store in map + operations_map[event.operation_id] = operation + + return list(operations_map.values()) + + @dataclass(frozen=True) class GetDurableExecutionHistoryRequest: """Request to get durable execution history.""" diff --git a/src/aws_durable_execution_sdk_python_testing/runner.py b/src/aws_durable_execution_sdk_python_testing/runner.py index b29d76f..4480675 100644 --- a/src/aws_durable_execution_sdk_python_testing/runner.py +++ b/src/aws_durable_execution_sdk_python_testing/runner.py @@ -3,6 +3,7 @@ import json import logging import os +import time from dataclasses import dataclass, field from typing import ( TYPE_CHECKING, @@ -23,6 +24,7 @@ ) from aws_durable_execution_sdk_python.lambda_service import ( ErrorObject, + OperationPayload, OperationStatus, OperationSubType, OperationType, @@ -44,8 +46,11 @@ LambdaInvoker, ) from aws_durable_execution_sdk_python_testing.model import ( + GetDurableExecutionHistoryResponse, + GetDurableExecutionResponse, StartDurableExecutionInput, StartDurableExecutionOutput, + events_to_operations, ) from aws_durable_execution_sdk_python_testing.scheduler import Scheduler from aws_durable_execution_sdk_python_testing.stores.base import ( @@ -152,7 +157,7 @@ def from_svc_operation( @dataclass(frozen=True) class ContextOperation(Operation): child_operations: list[Operation] - result: Any = None + result: OperationPayload | None = None error: ErrorObject | None = None @staticmethod @@ -181,11 +186,9 @@ def from_svc_operation( start_timestamp=operation.start_timestamp, end_timestamp=operation.end_timestamp, child_operations=child_operations, - result=( - json.loads(operation.context_details.result) - if operation.context_details and operation.context_details.result - else None - ), + result=operation.context_details.result + if operation.context_details + else None, error=operation.context_details.error if operation.context_details else None, @@ -221,7 +224,7 @@ def get_execution(self, name: str) -> ExecutionOperation: class StepOperation(ContextOperation): attempt: int = 0 next_attempt_timestamp: datetime.datetime | None = None - result: Any = None + result: OperationPayload | None = None error: ErrorObject | None = None @staticmethod @@ -256,11 +259,7 @@ def from_svc_operation( if operation.step_details else None ), - result=( - json.loads(operation.step_details.result) - if operation.step_details and operation.step_details.result - else None - ), + result=operation.step_details.result if operation.step_details else None, error=operation.step_details.error if operation.step_details else None, ) @@ -297,7 +296,7 @@ def from_svc_operation( @dataclass(frozen=True) class CallbackOperation(ContextOperation): callback_id: str | None = None - result: Any = None + result: OperationPayload | None = None error: ErrorObject | None = None @staticmethod @@ -331,11 +330,9 @@ def from_svc_operation( if operation.callback_details else None ), - result=( - json.loads(operation.callback_details.result) - if operation.callback_details and operation.callback_details.result - else None - ), + result=operation.callback_details.result + if operation.callback_details + else None, error=operation.callback_details.error if operation.callback_details else None, @@ -344,7 +341,7 @@ def from_svc_operation( @dataclass(frozen=True) class InvokeOperation(Operation): - result: Any = None + result: OperationPayload | None = None error: ErrorObject | None = None @staticmethod @@ -364,12 +361,9 @@ def from_svc_operation( sub_type=operation.sub_type, start_timestamp=operation.start_timestamp, end_timestamp=operation.end_timestamp, - result=( - json.loads(operation.chained_invoke_details.result) - if operation.chained_invoke_details - and operation.chained_invoke_details.result - else None - ), + result=operation.chained_invoke_details.result + if operation.chained_invoke_details + else None, error=operation.chained_invoke_details.error if operation.chained_invoke_details else None, @@ -402,7 +396,7 @@ def create_operation( class DurableFunctionTestResult: status: InvocationStatus operations: list[Operation] - result: Any = None + result: OperationPayload | None = None error: ErrorObject | None = None @classmethod @@ -420,17 +414,55 @@ def create(cls, execution: Execution) -> DurableFunctionTestResult: msg: str = "Execution result must exist to create test result." raise DurableFunctionsTestError(msg) - deserialized_result = ( - json.loads(execution.result.result) if execution.result.result else None - ) - return cls( status=execution.result.status, operations=operations, - result=deserialized_result, + result=execution.result.result, error=execution.result.error, ) + @classmethod + def from_execution_history( + cls, + execution_response: GetDurableExecutionResponse, + history_response: GetDurableExecutionHistoryResponse, + ) -> DurableFunctionTestResult: + """Create test result from execution history responses. + + Factory method for cloud runner that builds DurableFunctionTestResult + from GetDurableExecution and GetDurableExecutionHistory API responses. + """ + # Map status string to InvocationStatus enum + try: + status = InvocationStatus[execution_response.status] + except KeyError: + logger.warning( + "Unknown status: %s, defaulting to FAILED", execution_response.status + ) + status = InvocationStatus.FAILED + + # Convert Events to Operations - group by operation_id and merge + try: + svc_operations = events_to_operations(history_response.events) + except Exception as e: + logger.warning("Failed to convert events to operations: %s", e) + svc_operations = [] + + # Build operation tree (exclude EXECUTION type from top level) + operations = [] + for svc_op in svc_operations: + if svc_op.operation_type == OperationType.EXECUTION: + continue + if svc_op.parent_id is None: + operations.append(create_operation(svc_op, svc_operations)) + + return cls( + status=status, + operations=operations, + result=execution_response.result, + error=execution_response.error, + ) + def get_operation_by_name(self, name: str) -> Operation: for operation in self.operations: if operation.name == name: @@ -680,3 +712,195 @@ def _create_boto3_client(self) -> Any: endpoint_url=self._config.lambda_endpoint, region_name=self._config.local_runner_region, ) + + +class DurableFunctionCloudTestRunner: + """Test runner that executes durable functions against actual AWS Lambda backend. + + This runner invokes deployed Lambda functions and polls for execution completion, + providing the same interface as DurableFunctionTestRunner for seamless test + compatibility between local and cloud modes. + + Example: + >>> runner = DurableFunctionCloudTestRunner( + ... function_name="HelloWorld-Python-PR-123", region="us-west-2" + ... ) + >>> with runner: + ... result = runner.run(input={"name": "World"}, timeout=60) + >>> assert result.status == InvocationStatus.SUCCEEDED + """ + + def __init__( + self, + function_name: str, + region: str = "us-west-2", + lambda_endpoint: str | None = None, + poll_interval: float = 1.0, + ): + """Initialize cloud test runner.""" + self.function_name = function_name + self.region = region + self.lambda_endpoint = lambda_endpoint + self.poll_interval = poll_interval + + # Set up AWS data path for custom boto models (durable execution fields) + package_path = os.path.dirname(aws_durable_execution_sdk_python.__file__) + data_path = f"{package_path}/botocore/data" + os.environ["AWS_DATA_PATH"] = data_path + + client_config = boto3.session.Config(parameter_validation=False) + self.lambda_client = boto3.client( + "lambdainternal", + endpoint_url=lambda_endpoint, + region_name=region, + config=client_config, + ) + + def run( + self, + input: str | None = None, # noqa: A002 + timeout: int = 60, + ) -> DurableFunctionTestResult: + """Execute function on AWS Lambda and wait for completion.""" + logger.info( + "Invoking Lambda function: %s (timeout: %ds)", self.function_name, timeout + ) + + # JSON encode input + payload = json.dumps(input) + + # Invoke Lambda function + try: + response = self.lambda_client.invoke( + FunctionName=self.function_name, + InvocationType="RequestResponse", + Payload=payload, + ) + except Exception as e: + msg = f"Failed to invoke Lambda function {self.function_name}: {e}" + raise DurableFunctionsTestError(msg) from e + + # Check HTTP status code (200 for RequestResponse, 202 for Event, 204 for DryRun) + status_code = response.get("StatusCode") + if status_code not in (200, 202, 204): + error_payload = response["Payload"].read().decode("utf-8") + msg = f"Lambda invocation failed with status {status_code}: {error_payload}" + raise DurableFunctionsTestError(msg) + + # Check for function errors + if "FunctionError" in response: + error_payload = response["Payload"].read().decode("utf-8") + msg = f"Lambda function failed: {error_payload}" + raise DurableFunctionsTestError(msg) + + result_payload = response["Payload"].read().decode("utf-8") + logger.info( + "Lambda invocation completed, response: %s", + result_payload, + ) + + # Extract durable execution ARN from response headers + # The InvocationResponse includes X-Amz-Durable-Execution-Arn header + execution_arn = response.get("DurableExecutionArn") + if not execution_arn: + msg = ( + f"No DurableExecutionArn in response for function {self.function_name}" + ) + raise DurableFunctionsTestError(msg) + + # Poll for completion + execution_response = self._wait_for_completion(execution_arn, timeout) + + # Get execution history + history_response = self._get_execution_history(execution_arn) + + # Build test result from execution history + return DurableFunctionTestResult.from_execution_history( + execution_response, history_response + ) + + def _wait_for_completion( + self, execution_arn: str, timeout: int + ) -> GetDurableExecutionResponse: + """Poll execution status until completion or timeout. + + Args: + execution_arn: ARN of the durable execution + timeout: Maximum seconds to wait + + Returns: + GetDurableExecutionResponse with typed execution details + + Raises: + TimeoutError: If execution doesn't complete within timeout + DurableFunctionsTestError: If status check fails + """ + start_time = time.time() + last_status = None + + while time.time() - start_time < timeout: + try: + execution_dict = self.lambda_client.get_durable_execution( + DurableExecutionArn=execution_arn + ) + execution = GetDurableExecutionResponse.from_dict(execution_dict) + except Exception as e: + msg = f"Failed to get execution status: {e}" + raise DurableFunctionsTestError(msg) from e + + # Log status changes + if execution.status != last_status: + logger.info("Execution status: %s", execution.status) + last_status = execution.status + + # Check if execution completed + if execution.status == "SUCCEEDED": + logger.info("Execution succeeded") + return execution + if execution.status == "FAILED": + logger.warning("Execution failed") + return execution + if execution.status in ["TIMED_OUT", "ABORTED"]: + logger.warning("Execution terminated: %s", execution.status) + return execution + + # Wait before next poll + time.sleep(self.poll_interval) + + # Timeout reached + elapsed = time.time() - start_time + msg = ( + f"Execution did not complete within {timeout}s " + f"(elapsed: {elapsed:.1f}s, last status: {last_status})" + ) + raise TimeoutError(msg) + + def _get_execution_history( + self, execution_arn: str + ) -> GetDurableExecutionHistoryResponse: + """Retrieve execution history from Lambda service. + + Args: + execution_arn: ARN of the durable execution + + Returns: + GetDurableExecutionHistoryResponse with typed Event objects + + Raises: + DurableFunctionsTestError: If history retrieval fails + """ + try: + history_dict = self.lambda_client.get_durable_execution_history( + DurableExecutionArn=execution_arn, + IncludeExecutionData=True, + ) + history_response = GetDurableExecutionHistoryResponse.from_dict( + history_dict + ) + except Exception as e: + msg = f"Failed to get execution history: {e}" + raise DurableFunctionsTestError(msg) from e + + logger.info("Retrieved %d events from history", len(history_response.events)) + + return history_response diff --git a/tests/e2e/basic_success_path_test.py b/tests/e2e/basic_success_path_test.py index 7b26f61..a24d516 100644 --- a/tests/e2e/basic_success_path_test.py +++ b/tests/e2e/basic_success_path_test.py @@ -1,5 +1,6 @@ """Functional tests, covering end-to-end DurableTestRunner.""" +import json from typing import Any from aws_durable_execution_sdk_python.context import ( @@ -71,16 +72,16 @@ def function_under_test(event: Any, context: DurableContext) -> list[str]: result: DurableFunctionTestResult = runner.run(input="input str", timeout=10) assert result.status is InvocationStatus.SUCCEEDED - assert result.result == ["1 2", "3 4 4 3", "5 6"] + assert result.result == json.dumps(["1 2", "3 4 4 3", "5 6"]) one_result: StepOperation = result.get_step("one") - assert one_result.result == "1 2" + assert one_result.result == json.dumps("1 2") two_result: ContextOperation = result.get_context("two") - assert two_result.result == "3 4 4 3" + assert two_result.result == json.dumps("3 4 4 3") three_result: StepOperation = result.get_step("three") - assert three_result.result == "5 6" + assert three_result.result == json.dumps("5 6") # currently has the optimization where it's not saving child checkpoints after parent done # prob should unpick that for test diff --git a/tests/executor_test.py b/tests/executor_test.py index 7babe70..78a067e 100644 --- a/tests/executor_test.py +++ b/tests/executor_test.py @@ -1860,7 +1860,7 @@ def test_stop_execution(executor, mock_store): mock_store.load.assert_called_once_with("test-arn") mock_fail.assert_called_once() - assert result.end_timestamp is not None + assert result.stop_timestamp is not None def test_stop_execution_already_complete(executor, mock_store): diff --git a/tests/model_test.py b/tests/model_test.py index 98afc62..d7b2c55 100644 --- a/tests/model_test.py +++ b/tests/model_test.py @@ -421,10 +421,10 @@ def test_stop_durable_execution_request_minimal(): def test_stop_durable_execution_response_serialization(): """Test StopDurableExecutionResponse from_dict/to_dict round-trip.""" - data = {"EndTimestamp": "2023-01-01T00:01:00Z"} + data = {"StopTimestamp": "2023-01-01T00:01:00Z"} response_obj = StopDurableExecutionResponse.from_dict(data) - assert response_obj.end_timestamp == "2023-01-01T00:01:00Z" + assert response_obj.stop_timestamp == "2023-01-01T00:01:00Z" result_data = response_obj.to_dict() assert result_data == data @@ -2931,3 +2931,678 @@ def test_checkpoint_updated_execution_state_with_next_marker(): "NextMarker": "next-marker-123", } assert result_data == expected_data + + +# Tests for events_to_operations function + + +def test_events_to_operations_empty_list(): + """Test events_to_operations with empty event list.""" + from aws_durable_execution_sdk_python_testing.model import events_to_operations + + operations = events_to_operations([]) + assert operations == [] + + +def test_events_to_operations_execution_started(): + """Test events_to_operations with ExecutionStarted event.""" + import datetime + + from aws_durable_execution_sdk_python.lambda_service import ( + OperationStatus, + OperationType, + ) + + from aws_durable_execution_sdk_python_testing.model import ( + Event, + EventInput, + ExecutionStartedDetails, + events_to_operations, + ) + + event = Event( + event_type="ExecutionStarted", + event_timestamp=datetime.datetime(2023, 1, 1, 0, 0, 0, tzinfo=datetime.UTC), + operation_id="exec-1", + execution_started_details=ExecutionStartedDetails( + input=EventInput(payload="test-input", truncated=False), + execution_timeout=300, + ), + ) + + operations = events_to_operations([event]) + + assert len(operations) == 1 + assert operations[0].operation_id == "exec-1" + assert operations[0].operation_type == OperationType.EXECUTION + assert operations[0].status == OperationStatus.STARTED + assert operations[0].execution_details.input_payload == "test-input" + + +def test_events_to_operations_callback_lifecycle(): + """Test events_to_operations with complete callback lifecycle.""" + import datetime + + from aws_durable_execution_sdk_python.lambda_service import ( + OperationStatus, + OperationType, + ) + + from aws_durable_execution_sdk_python_testing.model import ( + CallbackStartedDetails, + CallbackSucceededDetails, + Event, + EventResult, + events_to_operations, + ) + + started_event = Event( + event_type="CallbackStarted", + event_timestamp=datetime.datetime(2023, 1, 1, 0, 0, 0, tzinfo=datetime.UTC), + operation_id="cb-1", + name="test-callback", + callback_started_details=CallbackStartedDetails(callback_id="callback-123"), + ) + + succeeded_event = Event( + event_type="CallbackSucceeded", + event_timestamp=datetime.datetime(2023, 1, 1, 0, 1, 0, tzinfo=datetime.UTC), + operation_id="cb-1", + callback_succeeded_details=CallbackSucceededDetails( + result=EventResult(payload="callback-result", truncated=False) + ), + ) + + operations = events_to_operations([started_event, succeeded_event]) + + assert len(operations) == 1 + assert operations[0].operation_id == "cb-1" + assert operations[0].operation_type == OperationType.CALLBACK + assert operations[0].status == OperationStatus.SUCCEEDED + assert operations[0].name == "test-callback" + assert operations[0].callback_details.callback_id == "callback-123" + assert operations[0].callback_details.result == "callback-result" + assert operations[0].callback_details.error is None + + +def test_events_to_operations_missing_event_type(): + """Test events_to_operations raises error for missing event_type.""" + import datetime + + from aws_durable_execution_sdk_python_testing.model import ( + Event, + events_to_operations, + ) + + event = Event( + event_type=None, + event_timestamp=datetime.datetime(2023, 1, 1, 0, 0, 0, tzinfo=datetime.UTC), + ) + + with pytest.raises(ValueError, match="Missing required 'event_type' field"): + events_to_operations([event]) + + +def test_events_to_operations_unknown_event_type(): + """Test events_to_operations raises error for unknown event type.""" + import datetime + + from aws_durable_execution_sdk_python_testing.model import ( + Event, + events_to_operations, + ) + + event = Event( + event_type="UnknownEventType", + event_timestamp=datetime.datetime(2023, 1, 1, 0, 0, 0, tzinfo=datetime.UTC), + operation_id="op-1", + ) + + with pytest.raises(ValueError, match="Unknown event type: UnknownEventType"): + events_to_operations([event]) + + +def test_events_to_operations_missing_operation_id(): + """Test events_to_operations raises error for missing operation_id.""" + import datetime + + from aws_durable_execution_sdk_python_testing.model import ( + Event, + events_to_operations, + ) + + event = Event( + event_type="StepStarted", + event_timestamp=datetime.datetime(2023, 1, 1, 0, 0, 0, tzinfo=datetime.UTC), + operation_id=None, + ) + + with pytest.raises(ValueError, match="Missing required 'operation_id' field"): + events_to_operations([event]) + + +def test_events_to_operations_step_with_retry(): + """Test events_to_operations with step retry details.""" + import datetime + + from aws_durable_execution_sdk_python.lambda_service import ( + OperationStatus, + OperationType, + ) + + from aws_durable_execution_sdk_python_testing.model import ( + Event, + EventResult, + RetryDetails, + StepSucceededDetails, + events_to_operations, + ) + + succeeded_event = Event( + event_type="StepSucceeded", + event_timestamp=datetime.datetime(2023, 1, 1, 0, 1, 0, tzinfo=datetime.UTC), + operation_id="step-1", + name="test-step", + step_succeeded_details=StepSucceededDetails( + result=EventResult(payload="step-result", truncated=False), + retry_details=RetryDetails(current_attempt=2), + ), + ) + + operations = events_to_operations([succeeded_event]) + + assert len(operations) == 1 + assert operations[0].operation_type == OperationType.STEP + assert operations[0].status == OperationStatus.SUCCEEDED + assert operations[0].step_details.result == "step-result" + assert operations[0].step_details.attempt == 2 + + +def test_events_to_operations_step_failed_with_next_attempt(): + """Test events_to_operations with failed step and next attempt timestamp.""" + import datetime + + from aws_durable_execution_sdk_python.lambda_service import ( + ErrorObject, + OperationStatus, + ) + + from aws_durable_execution_sdk_python_testing.model import ( + Event, + EventError, + RetryDetails, + StepFailedDetails, + events_to_operations, + ) + + event_time = datetime.datetime(2023, 1, 1, 0, 0, 0, tzinfo=datetime.UTC) + failed_event = Event( + event_type="StepFailed", + event_timestamp=event_time, + operation_id="step-1", + step_failed_details=StepFailedDetails( + error=EventError( + payload=ErrorObject( + message="step failed", type=None, data=None, stack_trace=None + ) + ), + retry_details=RetryDetails( + current_attempt=1, next_attempt_delay_seconds=10 + ), + ), + ) + + operations = events_to_operations([failed_event]) + + assert len(operations) == 1 + assert operations[0].status == OperationStatus.FAILED + assert operations[0].step_details.error.message == "step failed" + assert operations[0].step_details.attempt == 1 + expected_next_attempt = event_time + datetime.timedelta(seconds=10) + assert operations[0].step_details.next_attempt_timestamp == expected_next_attempt + + +def test_events_to_operations_context_succeeded(): + """Test events_to_operations with successful context.""" + import datetime + + from aws_durable_execution_sdk_python.lambda_service import ( + OperationStatus, + OperationType, + ) + + from aws_durable_execution_sdk_python_testing.model import ( + ContextSucceededDetails, + Event, + EventResult, + events_to_operations, + ) + + succeeded_event = Event( + event_type="ContextSucceeded", + event_timestamp=datetime.datetime(2023, 1, 1, 0, 1, 0, tzinfo=datetime.UTC), + operation_id="ctx-1", + name="test-context", + context_succeeded_details=ContextSucceededDetails( + result=EventResult(payload="context-result", truncated=False) + ), + ) + + operations = events_to_operations([succeeded_event]) + + assert len(operations) == 1 + assert operations[0].operation_type == OperationType.CONTEXT + assert operations[0].status == OperationStatus.SUCCEEDED + assert operations[0].context_details.result == "context-result" + assert operations[0].context_details.error is None + + +def test_events_to_operations_chained_invoke_succeeded(): + """Test events_to_operations with successful chained invoke.""" + import datetime + + from aws_durable_execution_sdk_python.lambda_service import ( + OperationStatus, + OperationType, + ) + + from aws_durable_execution_sdk_python_testing.model import ( + ChainedInvokeSucceededDetails, + Event, + EventResult, + events_to_operations, + ) + + succeeded_event = Event( + event_type="ChainedInvokeSucceeded", + event_timestamp=datetime.datetime(2023, 1, 1, 0, 1, 0, tzinfo=datetime.UTC), + operation_id="invoke-1", + name="test-invoke", + chained_invoke_succeeded_details=ChainedInvokeSucceededDetails( + result=EventResult(payload="invoke-result", truncated=False) + ), + ) + + operations = events_to_operations([succeeded_event]) + + assert len(operations) == 1 + assert operations[0].operation_type == OperationType.CHAINED_INVOKE + assert operations[0].status == OperationStatus.SUCCEEDED + assert operations[0].chained_invoke_details.result == "invoke-result" + assert operations[0].chained_invoke_details.error is None + + +def test_events_to_operations_skips_invocation_completed(): + """Test events_to_operations skips InvocationCompleted events.""" + import datetime + + from aws_durable_execution_sdk_python_testing.model import ( + Event, + events_to_operations, + ) + + invocation_event = Event( + event_type="InvocationCompleted", + event_timestamp=datetime.datetime(2023, 1, 1, 0, 0, 0, tzinfo=datetime.UTC), + operation_id="invocation-1", + ) + + operations = events_to_operations([invocation_event]) + assert len(operations) == 0 + + +def test_events_to_operations_callback_failed(): + """Test events_to_operations with failed callback.""" + import datetime + + from aws_durable_execution_sdk_python.lambda_service import ( + ErrorObject, + OperationStatus, + ) + + from aws_durable_execution_sdk_python_testing.model import ( + CallbackFailedDetails, + CallbackStartedDetails, + Event, + EventError, + events_to_operations, + ) + + started_event = Event( + event_type="CallbackStarted", + event_timestamp=datetime.datetime(2023, 1, 1, 0, 0, 0, tzinfo=datetime.UTC), + operation_id="cb-1", + callback_started_details=CallbackStartedDetails(callback_id="callback-123"), + ) + + failed_event = Event( + event_type="CallbackFailed", + event_timestamp=datetime.datetime(2023, 1, 1, 0, 1, 0, tzinfo=datetime.UTC), + operation_id="cb-1", + callback_failed_details=CallbackFailedDetails( + error=EventError( + payload=ErrorObject( + message="callback failed", type=None, data=None, stack_trace=None + ) + ) + ), + ) + + operations = events_to_operations([started_event, failed_event]) + + assert len(operations) == 1 + assert operations[0].status == OperationStatus.FAILED + assert operations[0].callback_details.error.message == "callback failed" + assert operations[0].callback_details.result is None + + +def test_events_to_operations_callback_timed_out(): + """Test events_to_operations with timed out callback.""" + import datetime + + from aws_durable_execution_sdk_python.lambda_service import ( + ErrorObject, + OperationStatus, + ) + + from aws_durable_execution_sdk_python_testing.model import ( + CallbackStartedDetails, + CallbackTimedOutDetails, + Event, + EventError, + events_to_operations, + ) + + started_event = Event( + event_type="CallbackStarted", + event_timestamp=datetime.datetime(2023, 1, 1, 0, 0, 0, tzinfo=datetime.UTC), + operation_id="cb-1", + callback_started_details=CallbackStartedDetails(callback_id="callback-123"), + ) + + timed_out_event = Event( + event_type="CallbackTimedOut", + event_timestamp=datetime.datetime(2023, 1, 1, 0, 1, 0, tzinfo=datetime.UTC), + operation_id="cb-1", + callback_timed_out_details=CallbackTimedOutDetails( + error=EventError( + payload=ErrorObject( + message="callback timed out", type=None, data=None, stack_trace=None + ) + ) + ), + ) + + operations = events_to_operations([started_event, timed_out_event]) + + assert len(operations) == 1 + assert operations[0].status == OperationStatus.TIMED_OUT + assert operations[0].callback_details.error.message == "callback timed out" + + +def test_events_to_operations_wait_started(): + """Test events_to_operations with wait operation.""" + import datetime + + from aws_durable_execution_sdk_python.lambda_service import ( + OperationStatus, + OperationType, + ) + + from aws_durable_execution_sdk_python_testing.model import ( + Event, + WaitStartedDetails, + events_to_operations, + ) + + scheduled_time = datetime.datetime(2023, 1, 1, 1, 0, 0, tzinfo=datetime.UTC) + wait_event = Event( + event_type="WaitStarted", + event_timestamp=datetime.datetime(2023, 1, 1, 0, 0, 0, tzinfo=datetime.UTC), + operation_id="wait-1", + name="test-wait", + wait_started_details=WaitStartedDetails( + duration=3600, scheduled_end_timestamp=scheduled_time + ), + ) + + operations = events_to_operations([wait_event]) + + assert len(operations) == 1 + assert operations[0].operation_type == OperationType.WAIT + assert operations[0].status == OperationStatus.STARTED + assert operations[0].wait_details.scheduled_timestamp == scheduled_time + + +def test_events_to_operations_context_failed(): + """Test events_to_operations with failed context.""" + import datetime + + from aws_durable_execution_sdk_python.lambda_service import ( + ErrorObject, + OperationStatus, + ) + + from aws_durable_execution_sdk_python_testing.model import ( + ContextFailedDetails, + Event, + EventError, + events_to_operations, + ) + + failed_event = Event( + event_type="ContextFailed", + event_timestamp=datetime.datetime(2023, 1, 1, 0, 1, 0, tzinfo=datetime.UTC), + operation_id="ctx-1", + context_failed_details=ContextFailedDetails( + error=EventError( + payload=ErrorObject( + message="context failed", type=None, data=None, stack_trace=None + ) + ) + ), + ) + + operations = events_to_operations([failed_event]) + + assert len(operations) == 1 + assert operations[0].status == OperationStatus.FAILED + assert operations[0].context_details.error.message == "context failed" + assert operations[0].context_details.result is None + + +def test_events_to_operations_chained_invoke_failed(): + """Test events_to_operations with failed chained invoke.""" + import datetime + + from aws_durable_execution_sdk_python.lambda_service import ( + ErrorObject, + OperationStatus, + ) + + from aws_durable_execution_sdk_python_testing.model import ( + ChainedInvokeFailedDetails, + Event, + EventError, + events_to_operations, + ) + + failed_event = Event( + event_type="ChainedInvokeFailed", + event_timestamp=datetime.datetime(2023, 1, 1, 0, 1, 0, tzinfo=datetime.UTC), + operation_id="invoke-1", + chained_invoke_failed_details=ChainedInvokeFailedDetails( + error=EventError( + payload=ErrorObject( + message="invoke failed", type=None, data=None, stack_trace=None + ) + ) + ), + ) + + operations = events_to_operations([failed_event]) + + assert len(operations) == 1 + assert operations[0].status == OperationStatus.FAILED + assert operations[0].chained_invoke_details.error.message == "invoke failed" + assert operations[0].chained_invoke_details.result is None + + +def test_events_to_operations_multiple_operations(): + """Test events_to_operations with multiple different operations.""" + import datetime + + from aws_durable_execution_sdk_python.lambda_service import ( + OperationStatus, + OperationType, + ) + + from aws_durable_execution_sdk_python_testing.model import ( + Event, + EventResult, + StepSucceededDetails, + events_to_operations, + ) + + events = [ + Event( + event_type="StepStarted", + event_timestamp=datetime.datetime(2023, 1, 1, 0, 0, 0, tzinfo=datetime.UTC), + operation_id="step-1", + name="step-one", + ), + Event( + event_type="StepSucceeded", + event_timestamp=datetime.datetime(2023, 1, 1, 0, 1, 0, tzinfo=datetime.UTC), + operation_id="step-1", + step_succeeded_details=StepSucceededDetails( + result=EventResult(payload="result-1", truncated=False) + ), + ), + Event( + event_type="WaitStarted", + event_timestamp=datetime.datetime(2023, 1, 1, 0, 2, 0, tzinfo=datetime.UTC), + operation_id="wait-1", + name="wait-one", + ), + ] + + operations = events_to_operations(events) + + assert len(operations) == 2 + step_op = next(op for op in operations if op.operation_id == "step-1") + wait_op = next(op for op in operations if op.operation_id == "wait-1") + + assert step_op.operation_type == OperationType.STEP + assert step_op.status == OperationStatus.SUCCEEDED + assert step_op.name == "step-one" + + assert wait_op.operation_type == OperationType.WAIT + assert wait_op.status == OperationStatus.STARTED + assert wait_op.name == "wait-one" + + +def test_events_to_operations_merges_timestamps(): + """Test events_to_operations correctly merges start and end timestamps.""" + import datetime + + from aws_durable_execution_sdk_python_testing.model import ( + Event, + EventResult, + StepSucceededDetails, + events_to_operations, + ) + + start_time = datetime.datetime(2023, 1, 1, 0, 0, 0, tzinfo=datetime.UTC) + end_time = datetime.datetime(2023, 1, 1, 0, 1, 0, tzinfo=datetime.UTC) + + events = [ + Event( + event_type="StepStarted", + event_timestamp=start_time, + operation_id="step-1", + ), + Event( + event_type="StepSucceeded", + event_timestamp=end_time, + operation_id="step-1", + step_succeeded_details=StepSucceededDetails( + result=EventResult(payload="result", truncated=False) + ), + ), + ] + + operations = events_to_operations(events) + + assert len(operations) == 1 + assert operations[0].start_timestamp == start_time + assert operations[0].end_timestamp == end_time + + +def test_events_to_operations_preserves_parent_id(): + """Test events_to_operations preserves parent_id from events.""" + import datetime + + from aws_durable_execution_sdk_python_testing.model import ( + Event, + events_to_operations, + ) + + event = Event( + event_type="StepStarted", + event_timestamp=datetime.datetime(2023, 1, 1, 0, 0, 0, tzinfo=datetime.UTC), + operation_id="step-1", + parent_id="parent-ctx", + name="child-step", + ) + + operations = events_to_operations([event]) + + assert len(operations) == 1 + assert operations[0].parent_id == "parent-ctx" + + +def test_events_to_operations_preserves_sub_type(): + """Test events_to_operations preserves sub_type from events.""" + import datetime + + from aws_durable_execution_sdk_python_testing.model import ( + Event, + events_to_operations, + ) + + event = Event( + event_type="StepStarted", + event_timestamp=datetime.datetime(2023, 1, 1, 0, 0, 0, tzinfo=datetime.UTC), + operation_id="step-1", + sub_type="Step", + ) + + operations = events_to_operations([event]) + + assert len(operations) == 1 + assert operations[0].sub_type is not None + assert operations[0].sub_type.value == "Step" + + +def test_events_to_operations_invalid_sub_type(): + """Test events_to_operations handles invalid sub_type gracefully.""" + import datetime + + from aws_durable_execution_sdk_python_testing.model import ( + Event, + events_to_operations, + ) + + event = Event( + event_type="StepStarted", + event_timestamp=datetime.datetime(2023, 1, 1, 0, 0, 0, tzinfo=datetime.UTC), + operation_id="step-1", + sub_type="INVALID_SUB_TYPE", + ) + + operations = events_to_operations([event]) + + assert len(operations) == 1 + # Invalid sub_type should be ignored (set to None) + assert operations[0].sub_type is None diff --git a/tests/runner_test.py b/tests/runner_test.py index 552fa1e..76d38f6 100644 --- a/tests/runner_test.py +++ b/tests/runner_test.py @@ -110,7 +110,7 @@ def test_context_operation_from_svc_operation(): assert ctx_op.operation_id == "ctx-id" assert ctx_op.operation_type is OperationType.CONTEXT - assert ctx_op.result == "test-result" + assert ctx_op.result == json.dumps("test-result") assert ctx_op.child_operations == [] @@ -318,7 +318,7 @@ def test_step_operation_from_svc_operation(): assert step_op.operation_id == "step-id" assert step_op.operation_type is OperationType.STEP assert step_op.attempt == 2 - assert step_op.result == "step-result" + assert step_op.result == json.dumps("step-result") def test_step_operation_wrong_type(): @@ -386,7 +386,7 @@ def test_callback_operation_from_svc_operation(): assert callback_op.operation_id == "callback-id" assert callback_op.operation_type is OperationType.CALLBACK assert callback_op.callback_id == "cb-123" - assert callback_op.result == "callback-result" + assert callback_op.result == json.dumps("callback-result") def test_callback_operation_wrong_type(): @@ -420,7 +420,7 @@ def test_invoke_operation_from_svc_operation(): assert invoke_op.operation_id == "invoke-id" assert invoke_op.operation_type is OperationType.CHAINED_INVOKE - assert invoke_op.result == "invoke-result" + assert invoke_op.result == json.dumps("invoke-result") def test_invoke_operation_wrong_type(): @@ -508,7 +508,7 @@ def test_durable_function_test_result_create(): result = DurableFunctionTestResult.create(execution) assert result.status is InvocationStatus.SUCCEEDED - assert result.result == "test-result" + assert result.result == json.dumps("test-result") assert result.error is None assert len(result.operations) == 1 # EXECUTION operation filtered out @@ -1024,3 +1024,577 @@ def test_durable_child_context_test_runner_init_with_args( # verify that handler is called with expected args when durable function is invoked durable_execution_func(Mock(), Mock()) handler.assert_called_once_with(str_input, num=num_input) + + +# Tests for DurableFunctionCloudTestRunner and from_execution_history + + +def test_durable_function_test_result_from_execution_history(): + """Test DurableFunctionTestResult.from_execution_history factory method.""" + import datetime + + from aws_durable_execution_sdk_python.execution import InvocationStatus + + from aws_durable_execution_sdk_python_testing.model import ( + Event, + EventResult, + GetDurableExecutionHistoryResponse, + GetDurableExecutionResponse, + StepSucceededDetails, + ) + from aws_durable_execution_sdk_python_testing.runner import ( + DurableFunctionTestResult, + ) + + execution_response = GetDurableExecutionResponse( + durable_execution_arn="arn:aws:lambda:us-east-1:123456789012:function:test:execution:exec-1", + durable_execution_name="test-execution", + function_arn="arn:aws:lambda:us-east-1:123456789012:function:test", + status="SUCCEEDED", + start_timestamp=datetime.datetime(2023, 1, 1, 0, 0, 0, tzinfo=datetime.UTC), + end_timestamp=datetime.datetime(2023, 1, 1, 0, 1, 0, tzinfo=datetime.UTC), + result="test-result", + error=None, + ) + + history_response = GetDurableExecutionHistoryResponse( + events=[ + Event( + event_type="ExecutionStarted", + event_timestamp=datetime.datetime( + 2023, 1, 1, 0, 0, 0, tzinfo=datetime.UTC + ), + operation_id="exec-1", + ), + Event( + event_type="StepStarted", + event_timestamp=datetime.datetime( + 2023, 1, 1, 0, 0, 10, tzinfo=datetime.UTC + ), + operation_id="step-1", + name="test-step", + ), + Event( + event_type="StepSucceeded", + event_timestamp=datetime.datetime( + 2023, 1, 1, 0, 0, 20, tzinfo=datetime.UTC + ), + operation_id="step-1", + step_succeeded_details=StepSucceededDetails( + result=EventResult(payload="step-result", truncated=False) + ), + ), + ] + ) + + result = DurableFunctionTestResult.from_execution_history( + execution_response, history_response + ) + + assert result.status == InvocationStatus.SUCCEEDED + assert result.result == "test-result" + assert result.error is None + assert len(result.operations) == 1 + assert result.operations[0].name == "test-step" + + +@patch("aws_durable_execution_sdk_python_testing.runner.boto3") +def test_cloud_runner_init(mock_boto3): + """Test DurableFunctionCloudTestRunner initialization.""" + from aws_durable_execution_sdk_python_testing.runner import ( + DurableFunctionCloudTestRunner, + ) + + mock_client = Mock() + mock_boto3.client.return_value = mock_client + + runner = DurableFunctionCloudTestRunner( + function_name="test-function", + region="us-west-2", + poll_interval=0.5, + ) + + assert runner.function_name == "test-function" + assert runner.region == "us-west-2" + assert runner.poll_interval == 0.5 + mock_boto3.client.assert_called_once() + + +@patch("aws_durable_execution_sdk_python_testing.runner.boto3") +def test_cloud_runner_run_success(mock_boto3): + """Test DurableFunctionCloudTestRunner.run with successful execution.""" + from aws_durable_execution_sdk_python.execution import InvocationStatus + + from aws_durable_execution_sdk_python_testing.runner import ( + DurableFunctionCloudTestRunner, + ) + + mock_client = Mock() + mock_boto3.client.return_value = mock_client + + mock_client.invoke.return_value = { + "StatusCode": 200, + "Payload": Mock(read=lambda: b'{"result": "success"}'), + "DurableExecutionArn": "arn:aws:lambda:us-east-1:123456789012:function:test:execution:exec-1", + } + + mock_client.get_durable_execution.return_value = { + "DurableExecutionArn": "arn:aws:lambda:us-east-1:123456789012:function:test:execution:exec-1", + "DurableExecutionName": "test-execution", + "FunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:test", + "Status": "SUCCEEDED", + "StartTimestamp": "2023-01-01T00:00:00Z", + "EndTimestamp": "2023-01-01T00:01:00Z", + "Result": "test-result", + } + + mock_client.get_durable_execution_history.return_value = { + "Events": [ + { + "EventType": "ExecutionStarted", + "EventTimestamp": "2023-01-01T00:00:00Z", + "Id": "exec-1", + } + ] + } + + runner = DurableFunctionCloudTestRunner( + function_name="test-function", poll_interval=0.01 + ) + + result = runner.run(input="test-input", timeout=10) + + assert result.status == InvocationStatus.SUCCEEDED + assert result.result == "test-result" + mock_client.invoke.assert_called_once() + + +@patch("aws_durable_execution_sdk_python_testing.runner.boto3") +def test_cloud_runner_run_invoke_failure(mock_boto3): + """Test DurableFunctionCloudTestRunner.run with invoke failure.""" + from aws_durable_execution_sdk_python_testing.exceptions import ( + DurableFunctionsTestError, + ) + from aws_durable_execution_sdk_python_testing.runner import ( + DurableFunctionCloudTestRunner, + ) + + mock_client = Mock() + mock_boto3.client.return_value = mock_client + mock_client.invoke.side_effect = Exception("Invoke failed") + + runner = DurableFunctionCloudTestRunner(function_name="test-function") + + with pytest.raises( + DurableFunctionsTestError, match="Failed to invoke Lambda function" + ): + runner.run(input="test-input") + + +@patch("aws_durable_execution_sdk_python_testing.runner.boto3") +@patch("aws_durable_execution_sdk_python_testing.runner.time") +def test_cloud_runner_wait_for_completion_timeout(mock_time, mock_boto3): + """Test DurableFunctionCloudTestRunner._wait_for_completion with timeout.""" + from aws_durable_execution_sdk_python_testing.runner import ( + DurableFunctionCloudTestRunner, + ) + + mock_client = Mock() + mock_boto3.client.return_value = mock_client + mock_time.time.side_effect = [0, 0.5, 1.0, 1.5, 2.0, 2.5, 3.0] + + mock_client.get_durable_execution.return_value = { + "DurableExecutionArn": "arn:aws:lambda:us-east-1:123456789012:function:test:execution:exec-1", + "DurableExecutionName": "test-execution", + "FunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:test", + "Status": "RUNNING", + "StartTimestamp": "2023-01-01T00:00:00Z", + } + + runner = DurableFunctionCloudTestRunner( + function_name="test-function", poll_interval=0.01 + ) + + with pytest.raises(TimeoutError, match="Execution did not complete within"): + runner._wait_for_completion("test-arn", timeout=2) + + +def test_durable_function_test_result_from_execution_history_with_exception(): + """Test from_execution_history handles events_to_operations exception.""" + import datetime + + from aws_durable_execution_sdk_python.execution import InvocationStatus + + from aws_durable_execution_sdk_python_testing.model import ( + Event, + GetDurableExecutionHistoryResponse, + GetDurableExecutionResponse, + ) + from aws_durable_execution_sdk_python_testing.runner import ( + DurableFunctionTestResult, + ) + + execution_response = GetDurableExecutionResponse( + durable_execution_arn="arn:aws:lambda:us-east-1:123456789012:function:test:execution:exec-1", + durable_execution_name="test-execution", + function_arn="arn:aws:lambda:us-east-1:123456789012:function:test", + status="SUCCEEDED", + start_timestamp=datetime.datetime(2023, 1, 1, 0, 0, 0, tzinfo=datetime.UTC), + ) + + history_response = GetDurableExecutionHistoryResponse( + events=[ + Event( + event_type="StepStarted", + event_timestamp=datetime.datetime( + 2023, 1, 1, 0, 0, 0, tzinfo=datetime.UTC + ), + operation_id=None, + ) + ] + ) + + result = DurableFunctionTestResult.from_execution_history( + execution_response, history_response + ) + + assert result.status == InvocationStatus.SUCCEEDED + assert len(result.operations) == 0 + + +@patch("aws_durable_execution_sdk_python_testing.runner.boto3") +def test_cloud_runner_wait_for_completion_failed_status(mock_boto3): + """Test DurableFunctionCloudTestRunner._wait_for_completion with FAILED status.""" + from aws_durable_execution_sdk_python_testing.runner import ( + DurableFunctionCloudTestRunner, + ) + + mock_client = Mock() + mock_boto3.client.return_value = mock_client + + mock_client.get_durable_execution.return_value = { + "DurableExecutionArn": "arn:aws:lambda:us-east-1:123456789012:function:test:execution:exec-1", + "DurableExecutionName": "test-execution", + "FunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:test", + "Status": "FAILED", + "StartTimestamp": "2023-01-01T00:00:00Z", + "EndTimestamp": "2023-01-01T00:01:00Z", + "Error": {"ErrorMessage": "execution failed"}, + } + + runner = DurableFunctionCloudTestRunner(function_name="test-function") + result = runner._wait_for_completion("test-arn", timeout=10) + + assert result.status == "FAILED" + + +@patch("aws_durable_execution_sdk_python_testing.runner.boto3") +def test_cloud_runner_run_bad_status_code(mock_boto3): + """Test DurableFunctionCloudTestRunner.run with bad HTTP status code.""" + from aws_durable_execution_sdk_python_testing.exceptions import ( + DurableFunctionsTestError, + ) + from aws_durable_execution_sdk_python_testing.runner import ( + DurableFunctionCloudTestRunner, + ) + + mock_client = Mock() + mock_boto3.client.return_value = mock_client + + mock_client.invoke.return_value = { + "StatusCode": 500, + "Payload": Mock(read=lambda: b"Internal Server Error"), + } + + runner = DurableFunctionCloudTestRunner(function_name="test-function") + + with pytest.raises( + DurableFunctionsTestError, match="Lambda invocation failed with status 500" + ): + runner.run(input="test-input") + + +@patch("aws_durable_execution_sdk_python_testing.runner.boto3") +def test_cloud_runner_run_function_error(mock_boto3): + """Test DurableFunctionCloudTestRunner.run with function error.""" + from aws_durable_execution_sdk_python_testing.exceptions import ( + DurableFunctionsTestError, + ) + from aws_durable_execution_sdk_python_testing.runner import ( + DurableFunctionCloudTestRunner, + ) + + mock_client = Mock() + mock_boto3.client.return_value = mock_client + + mock_client.invoke.return_value = { + "StatusCode": 200, + "FunctionError": "Unhandled", + "Payload": Mock(read=lambda: b'{"errorMessage": "Function failed"}'), + } + + runner = DurableFunctionCloudTestRunner(function_name="test-function") + + with pytest.raises(DurableFunctionsTestError, match="Lambda function failed"): + runner.run(input="test-input") + + +@patch("aws_durable_execution_sdk_python_testing.runner.boto3") +def test_cloud_runner_run_missing_execution_arn(mock_boto3): + """Test DurableFunctionCloudTestRunner.run with missing execution ARN.""" + from aws_durable_execution_sdk_python_testing.exceptions import ( + DurableFunctionsTestError, + ) + from aws_durable_execution_sdk_python_testing.runner import ( + DurableFunctionCloudTestRunner, + ) + + mock_client = Mock() + mock_boto3.client.return_value = mock_client + + mock_client.invoke.return_value = { + "StatusCode": 200, + "Payload": Mock(read=lambda: b'{"result": "success"}'), + } + + runner = DurableFunctionCloudTestRunner(function_name="test-function") + + with pytest.raises( + DurableFunctionsTestError, match="No DurableExecutionArn in response" + ): + runner.run(input="test-input") + + +@patch("aws_durable_execution_sdk_python_testing.runner.boto3") +def test_cloud_runner_wait_for_completion_get_execution_failure(mock_boto3): + """Test DurableFunctionCloudTestRunner._wait_for_completion with API failure.""" + from aws_durable_execution_sdk_python_testing.exceptions import ( + DurableFunctionsTestError, + ) + from aws_durable_execution_sdk_python_testing.runner import ( + DurableFunctionCloudTestRunner, + ) + + mock_client = Mock() + mock_boto3.client.return_value = mock_client + mock_client.get_durable_execution.side_effect = Exception("API error") + + runner = DurableFunctionCloudTestRunner(function_name="test-function") + + with pytest.raises( + DurableFunctionsTestError, match="Failed to get execution status" + ): + runner._wait_for_completion("test-arn", timeout=10) + + +def test_durable_function_test_result_from_execution_history_filters_execution_type(): + """Test from_execution_history filters out EXECUTION type operations.""" + import datetime + + from aws_durable_execution_sdk_python.execution import InvocationStatus + + from aws_durable_execution_sdk_python_testing.model import ( + Event, + GetDurableExecutionHistoryResponse, + GetDurableExecutionResponse, + ) + from aws_durable_execution_sdk_python_testing.runner import ( + DurableFunctionTestResult, + ) + + execution_response = GetDurableExecutionResponse( + durable_execution_arn="arn:aws:lambda:us-east-1:123456789012:function:test:execution:exec-1", + durable_execution_name="test-execution", + function_arn="arn:aws:lambda:us-east-1:123456789012:function:test", + status="SUCCEEDED", + start_timestamp=datetime.datetime(2023, 1, 1, 0, 0, 0, tzinfo=datetime.UTC), + ) + + history_response = GetDurableExecutionHistoryResponse( + events=[ + Event( + event_type="ExecutionStarted", + event_timestamp=datetime.datetime( + 2023, 1, 1, 0, 0, 0, tzinfo=datetime.UTC + ), + operation_id="exec-1", + ), + ] + ) + + result = DurableFunctionTestResult.from_execution_history( + execution_response, history_response + ) + + assert len(result.operations) == 0 + + +def test_durable_function_test_result_from_execution_history_unknown_status(): + """Test from_execution_history with unknown status defaults to FAILED.""" + import datetime + + from aws_durable_execution_sdk_python.execution import InvocationStatus + + from aws_durable_execution_sdk_python_testing.model import ( + GetDurableExecutionHistoryResponse, + GetDurableExecutionResponse, + ) + from aws_durable_execution_sdk_python_testing.runner import ( + DurableFunctionTestResult, + ) + + execution_response = GetDurableExecutionResponse( + durable_execution_arn="arn:aws:lambda:us-east-1:123456789012:function:test:execution:exec-1", + durable_execution_name="test-execution", + function_arn="arn:aws:lambda:us-east-1:123456789012:function:test", + status="UNKNOWN_STATUS", + start_timestamp=datetime.datetime(2023, 1, 1, 0, 0, 0, tzinfo=datetime.UTC), + ) + + history_response = GetDurableExecutionHistoryResponse(events=[]) + + result = DurableFunctionTestResult.from_execution_history( + execution_response, history_response + ) + + assert result.status == InvocationStatus.FAILED + + +def test_durable_function_test_result_from_execution_history_with_parent_operations(): + """Test from_execution_history filters operations with parent_id.""" + import datetime + + from aws_durable_execution_sdk_python.execution import InvocationStatus + + from aws_durable_execution_sdk_python_testing.model import ( + Event, + GetDurableExecutionHistoryResponse, + GetDurableExecutionResponse, + ) + from aws_durable_execution_sdk_python_testing.runner import ( + DurableFunctionTestResult, + ) + + execution_response = GetDurableExecutionResponse( + durable_execution_arn="arn:aws:lambda:us-east-1:123456789012:function:test:execution:exec-1", + durable_execution_name="test-execution", + function_arn="arn:aws:lambda:us-east-1:123456789012:function:test", + status="SUCCEEDED", + start_timestamp=datetime.datetime(2023, 1, 1, 0, 0, 0, tzinfo=datetime.UTC), + ) + + history_response = GetDurableExecutionHistoryResponse( + events=[ + Event( + event_type="StepStarted", + event_timestamp=datetime.datetime( + 2023, 1, 1, 0, 0, 0, tzinfo=datetime.UTC + ), + operation_id="step-1", + name="parent-step", + ), + Event( + event_type="StepStarted", + event_timestamp=datetime.datetime( + 2023, 1, 1, 0, 0, 10, tzinfo=datetime.UTC + ), + operation_id="step-2", + name="child-step", + parent_id="step-1", + ), + ] + ) + + result = DurableFunctionTestResult.from_execution_history( + execution_response, history_response + ) + + assert len(result.operations) == 1 + assert result.operations[0].name == "parent-step" + + +def test_durable_function_test_result_from_execution_history_failed(): + """Test from_execution_history with failed execution.""" + import datetime + + from aws_durable_execution_sdk_python.execution import InvocationStatus + from aws_durable_execution_sdk_python.lambda_service import ErrorObject + + from aws_durable_execution_sdk_python_testing.model import ( + GetDurableExecutionHistoryResponse, + GetDurableExecutionResponse, + ) + from aws_durable_execution_sdk_python_testing.runner import ( + DurableFunctionTestResult, + ) + + execution_response = GetDurableExecutionResponse( + durable_execution_arn="arn:aws:lambda:us-east-1:123456789012:function:test:execution:exec-1", + durable_execution_name="test-execution", + function_arn="arn:aws:lambda:us-east-1:123456789012:function:test", + status="FAILED", + start_timestamp=datetime.datetime(2023, 1, 1, 0, 0, 0, tzinfo=datetime.UTC), + end_timestamp=datetime.datetime(2023, 1, 1, 0, 1, 0, tzinfo=datetime.UTC), + error=ErrorObject( + message="execution failed", type=None, data=None, stack_trace=None + ), + ) + + history_response = GetDurableExecutionHistoryResponse(events=[]) + + result = DurableFunctionTestResult.from_execution_history( + execution_response, history_response + ) + + assert result.status == InvocationStatus.FAILED + assert result.error.message == "execution failed" + + +@patch("aws_durable_execution_sdk_python_testing.runner.boto3") +def test_cloud_runner_wait_for_completion_timed_out_status(mock_boto3): + """Test DurableFunctionCloudTestRunner._wait_for_completion with TIMED_OUT status.""" + from aws_durable_execution_sdk_python_testing.runner import ( + DurableFunctionCloudTestRunner, + ) + + mock_client = Mock() + mock_boto3.client.return_value = mock_client + + mock_client.get_durable_execution.return_value = { + "DurableExecutionArn": "arn:aws:lambda:us-east-1:123456789012:function:test:execution:exec-1", + "DurableExecutionName": "test-execution", + "FunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:test", + "Status": "TIMED_OUT", + "StartTimestamp": "2023-01-01T00:00:00Z", + "EndTimestamp": "2023-01-01T00:01:00Z", + } + + runner = DurableFunctionCloudTestRunner(function_name="test-function") + result = runner._wait_for_completion("test-arn", timeout=10) + + assert result.status == "TIMED_OUT" + + +@patch("aws_durable_execution_sdk_python_testing.runner.boto3") +def test_cloud_runner_wait_for_completion_aborted_status(mock_boto3): + """Test DurableFunctionCloudTestRunner._wait_for_completion with ABORTED status.""" + from aws_durable_execution_sdk_python_testing.runner import ( + DurableFunctionCloudTestRunner, + ) + + mock_client = Mock() + mock_boto3.client.return_value = mock_client + + mock_client.get_durable_execution.return_value = { + "DurableExecutionArn": "arn:aws:lambda:us-east-1:123456789012:function:test:execution:exec-1", + "DurableExecutionName": "test-execution", + "FunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:test", + "Status": "ABORTED", + "StartTimestamp": "2023-01-01T00:00:00Z", + "EndTimestamp": "2023-01-01T00:01:00Z", + } + + runner = DurableFunctionCloudTestRunner(function_name="test-function") + result = runner._wait_for_completion("test-arn", timeout=10) + + assert result.status == "ABORTED" diff --git a/tests/web/handlers_test.py b/tests/web/handlers_test.py index 228a4b0..d3217e5 100644 --- a/tests/web/handlers_test.py +++ b/tests/web/handlers_test.py @@ -870,7 +870,7 @@ def test_stop_durable_execution_handler_success(): handler = StopDurableExecutionHandler(executor) # Mock the executor response - mock_response = StopDurableExecutionResponse(end_timestamp="2023-01-01T00:01:00Z") + mock_response = StopDurableExecutionResponse(stop_timestamp="2023-01-01T00:01:00Z") executor.stop_execution.return_value = mock_response # Create request with proper stop data @@ -900,7 +900,7 @@ def test_stop_durable_execution_handler_success(): # Verify response assert response.status_code == 200 - assert response.body == {"EndTimestamp": "2023-01-01T00:01:00Z"} + assert response.body == {"StopTimestamp": "2023-01-01T00:01:00Z"} # Verify executor was called with correct parameters executor.stop_execution.assert_called_once()