From 071bc9b30f20fc74ca5c7bb2a421dae5325ef786 Mon Sep 17 00:00:00 2001 From: Alex Wang Date: Tue, 25 Nov 2025 16:30:49 -0800 Subject: [PATCH] examples: add concurrency callback example --- examples/examples-catalog.json | 17 +++- examples/src/callback/callback_concurrency.py | 51 ++++++++++++ examples/template.yaml | 14 ++++ .../callback/test_callback_concurrency.py | 83 +++++++++++++++++++ 4 files changed, 164 insertions(+), 1 deletion(-) create mode 100644 examples/src/callback/callback_concurrency.py create mode 100644 examples/test/callback/test_callback_concurrency.py diff --git a/examples/examples-catalog.json b/examples/examples-catalog.json index ae9a8de..cf3285d 100644 --- a/examples/examples-catalog.json +++ b/examples/examples-catalog.json @@ -444,6 +444,21 @@ "ExecutionTimeout": 300 }, "path": "./src/none_results/none_results.py" - } + }, + { + "name": "Create Callback Concurrency", + "description": "Demonstrates multiple concurrent createCallback operations using context.parallel", + "handler": "callback_concurrency.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/callback/callback_concurrency.py", + "loggingConfig": { + "ApplicationLogLevel": "DEBUG", + "LogFormat": "JSON" + } + } ] } diff --git a/examples/src/callback/callback_concurrency.py b/examples/src/callback/callback_concurrency.py new file mode 100644 index 0000000..173e808 --- /dev/null +++ b/examples/src/callback/callback_concurrency.py @@ -0,0 +1,51 @@ +"""Demonstrates multiple concurrent createCallback operations using context.parallel.""" + +from typing import Any + +from aws_durable_execution_sdk_python.config import CallbackConfig, Duration +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution + + +@durable_execution +def handler(_event: Any, context: DurableContext) -> dict[str, Any]: + """Handler demonstrating multiple concurrent callback operations.""" + + callback_config = CallbackConfig(timeout=Duration.from_seconds(30)) + + def callback_branch_1(ctx: DurableContext) -> str: + """First callback branch.""" + callback = ctx.create_callback( + name="api-call-1", + config=callback_config, + ) + return callback.result() + + def callback_branch_2(ctx: DurableContext) -> str: + """Second callback branch.""" + callback = ctx.create_callback( + name="api-call-2", + config=callback_config, + ) + return callback.result() + + def callback_branch_3(ctx: DurableContext) -> str: + """Third callback branch.""" + callback = ctx.create_callback( + name="api-call-3", + config=callback_config, + ) + return callback.result() + + parallel_results = context.parallel( + functions=[callback_branch_1, callback_branch_2, callback_branch_3], + name="parallel_callbacks", + ) + + # Extract results from parallel execution + results = parallel_results.get_results() + + return { + "results": results, + "allCompleted": True, + } diff --git a/examples/template.yaml b/examples/template.yaml index 67d4c29..67fd5d8 100644 --- a/examples/template.yaml +++ b/examples/template.yaml @@ -557,3 +557,17 @@ Resources: DurableConfig: RetentionPeriodInDays: 7 ExecutionTimeout: 300 + CallbackConcurrency: + Type: AWS::Serverless::Function + Properties: + CodeUri: build/ + Handler: callback_concurrency.handler + Description: Demonstrates multiple concurrent createCallback operations using + context.parallel + Role: + Fn::GetAtt: + - DurableFunctionRole + - Arn + DurableConfig: + RetentionPeriodInDays: 7 + ExecutionTimeout: 300 diff --git a/examples/test/callback/test_callback_concurrency.py b/examples/test/callback/test_callback_concurrency.py new file mode 100644 index 0000000..e78b0f5 --- /dev/null +++ b/examples/test/callback/test_callback_concurrency.py @@ -0,0 +1,83 @@ +"""Tests for create_callback_concurrent.""" + +import json + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus + +from src.callback import callback_concurrency +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=callback_concurrency.handler, + lambda_function_name="Create Callback Concurrency", +) +def test_handle_multiple_concurrent_callback_operations(durable_runner): + """Test handling multiple concurrent callback operations.""" + with durable_runner: + # Start the execution (this will pause at the callbacks) + execution_arn = durable_runner.run_async(input=None, timeout=60) + + callback_id_1 = durable_runner.wait_for_callback( + execution_arn=execution_arn, name="api-call-1" + ) + callback_id_2 = durable_runner.wait_for_callback( + execution_arn=execution_arn, name="api-call-2" + ) + callback_id_3 = durable_runner.wait_for_callback( + execution_arn=execution_arn, name="api-call-3" + ) + + callback_result_2 = json.dumps( + { + "id": 2, + "data": "second", + } + ) + durable_runner.send_callback_success( + callback_id=callback_id_2, result=callback_result_2.encode() + ) + + callback_result_1 = json.dumps( + { + "id": 1, + "data": "first", + } + ) + durable_runner.send_callback_success( + callback_id=callback_id_1, result=callback_result_1.encode() + ) + + callback_result_3 = json.dumps( + { + "id": 3, + "data": "third", + } + ) + durable_runner.send_callback_success( + callback_id=callback_id_3, result=callback_result_3.encode() + ) + + result = durable_runner.wait_for_result(execution_arn=execution_arn) + + assert result.status is InvocationStatus.SUCCEEDED + + result_data = deserialize_operation_payload(result.result) + + assert result_data == { + "results": [callback_result_1, callback_result_2, callback_result_3], + "allCompleted": True, + } + + # Verify all callback operations were tracked + operations = result.get_context("parallel_callbacks") + + assert len(operations.child_operations) == 3 + + # Verify all operations are CALLBACK type + for op in operations.child_operations: + assert op.operation_type.value == "CONTEXT" + assert len(op.child_operations) == 1 + assert op.child_operations[0].operation_type.value == "CALLBACK"