Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion examples/examples-catalog.json
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,21 @@
"ExecutionTimeout": 300
},
"path": "./src/comprehensive_operations/comprehensive_operations.py"
}
},
{
"name": "Create Callback Concurrency",
"description": "Demonstrates multiple concurrent createCallback operations using context.parallel",
"handler": "callback_concurrency.handler",
"integration": true,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"path": "./src/callback/callback_concurrency.py",
"loggingConfig": {
"ApplicationLogLevel": "DEBUG",
"LogFormat": "JSON"
}
}
]
}
51 changes: 51 additions & 0 deletions examples/src/callback/callback_concurrency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""Demonstrates multiple concurrent createCallback operations using context.parallel."""

from typing import Any

from aws_durable_execution_sdk_python.config import CallbackConfig, Duration
from aws_durable_execution_sdk_python.context import DurableContext
from aws_durable_execution_sdk_python.execution import durable_execution


@durable_execution
def handler(_event: Any, context: DurableContext) -> dict[str, Any]:
"""Handler demonstrating multiple concurrent callback operations."""

callback_config = CallbackConfig(timeout=Duration.from_seconds(30))

def callback_branch_1(ctx: DurableContext) -> str:
"""First callback branch."""
callback = ctx.create_callback(
name="api-call-1",
config=callback_config,
)
return callback.result()

def callback_branch_2(ctx: DurableContext) -> str:
"""Second callback branch."""
callback = ctx.create_callback(
name="api-call-2",
config=callback_config,
)
return callback.result()

def callback_branch_3(ctx: DurableContext) -> str:
"""Third callback branch."""
callback = ctx.create_callback(
name="api-call-3",
config=callback_config,
)
return callback.result()

parallel_results = context.parallel(
functions=[callback_branch_1, callback_branch_2, callback_branch_3],
name="parallel_callbacks",
)

# Extract results from parallel execution
results = parallel_results.get_results()

return {
"results": results,
"allCompleted": True,
}
14 changes: 14 additions & 0 deletions examples/template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -665,3 +665,17 @@ Resources:
DurableConfig:
RetentionPeriodInDays: 7
ExecutionTimeout: 300
CallbackConcurrency:
Type: AWS::Serverless::Function
Properties:
CodeUri: build/
Handler: callback_concurrency.handler
Description: Demonstrates multiple concurrent createCallback operations using
context.parallel
Role:
Fn::GetAtt:
- DurableFunctionRole
- Arn
DurableConfig:
RetentionPeriodInDays: 7
ExecutionTimeout: 300
83 changes: 83 additions & 0 deletions examples/test/callback/test_callback_concurrency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
"""Tests for create_callback_concurrent."""

import json

import pytest
from aws_durable_execution_sdk_python.execution import InvocationStatus

from src.callback import callback_concurrency
from test.conftest import deserialize_operation_payload


@pytest.mark.example
@pytest.mark.durable_execution(
handler=callback_concurrency.handler,
lambda_function_name="Create Callback Concurrency",
)
def test_handle_multiple_concurrent_callback_operations(durable_runner):
"""Test handling multiple concurrent callback operations."""
with durable_runner:
# Start the execution (this will pause at the callbacks)
execution_arn = durable_runner.run_async(input=None, timeout=60)

callback_id_1 = durable_runner.wait_for_callback(
execution_arn=execution_arn, name="api-call-1"
)
callback_id_2 = durable_runner.wait_for_callback(
execution_arn=execution_arn, name="api-call-2"
)
callback_id_3 = durable_runner.wait_for_callback(
execution_arn=execution_arn, name="api-call-3"
)

callback_result_2 = json.dumps(
{
"id": 2,
"data": "second",
}
)
durable_runner.send_callback_success(
callback_id=callback_id_2, result=callback_result_2.encode()
)

callback_result_1 = json.dumps(
{
"id": 1,
"data": "first",
}
)
durable_runner.send_callback_success(
callback_id=callback_id_1, result=callback_result_1.encode()
)

callback_result_3 = json.dumps(
{
"id": 3,
"data": "third",
}
)
durable_runner.send_callback_success(
callback_id=callback_id_3, result=callback_result_3.encode()
)

result = durable_runner.wait_for_result(execution_arn=execution_arn)

assert result.status is InvocationStatus.SUCCEEDED

result_data = deserialize_operation_payload(result.result)

assert result_data == {
"results": [callback_result_1, callback_result_2, callback_result_3],
"allCompleted": True,
}

# Verify all callback operations were tracked
operations = result.get_context("parallel_callbacks")

assert len(operations.child_operations) == 3

# Verify all operations are CALLBACK type
for op in operations.child_operations:
assert op.operation_type.value == "CONTEXT"
assert len(op.child_operations) == 1
assert op.child_operations[0].operation_type.value == "CALLBACK"
Loading