Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions examples/examples-catalog.json
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,50 @@
"ExecutionTimeout": 300
},
"path": "./src/none_results/none_results.py"
},
{
"name": "Create Callback Failures Uncaught",
"description": "Demonstrates callback failure scenarios where the error propagates and is handled by framework",
"handler": "callback_failure.handler",
"integration": true,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"path": "./src/callback/callback_failure.py"
},
{
"name": "Create Callback Failures Caught Error",
"description": "Demonstrates callback failure scenarios where the error is caught in the code",
"handler": "callback_failure.handler",
"integration": true,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"path": "./src/callback/callback_failure.py"
},
{
"name": "Callback With Heartbeat Timeout",
"description": "Demonstrates callback timeout scenarios (heartbeat timeout and general timeout)",
"handler": "callback_with_timeout.handler",
"integration": true,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"path": "./src/callback/callback_with_timeout.py"
},
{
"name": "Callback With General Timeout",
"description": "Demonstrates callback timeout scenarios (heartbeat timeout and general timeout)",
"handler": "callback_with_timeout.handler",
"integration": true,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"path": "./src/callback/callback_with_timeout.py"
}
]
}
35 changes: 35 additions & 0 deletions examples/src/callback/callback_failure.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""Demonstrates callback failure scenarios where the error propagates and is handled by framework."""

from typing import Any

from aws_durable_execution_sdk_python.config import CallbackConfig, Duration
from aws_durable_execution_sdk_python.context import DurableContext
from aws_durable_execution_sdk_python.execution import durable_execution


@durable_execution
def handler(event: dict[str, Any], context: DurableContext) -> dict[str, Any]:
"""Handler demonstrating callback failure scenarios."""
should_catch_error = event.get("shouldCatchError", False)
callback_config = CallbackConfig(timeout=Duration.from_seconds(60))

if should_catch_error:
# Pattern where error is caught and returned in result
try:
callback = context.create_callback(
name="failing-operation",
config=callback_config,
)
return callback.result()
except Exception as error:
return {
"success": False,
"error": str(error),
}
else:
# Pattern where error propagates to framework (for basic failure case)
callback = context.create_callback(
name="failing-operation",
config=callback_config,
)
return callback.result()
16 changes: 10 additions & 6 deletions examples/src/callback/callback_with_timeout.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,18 @@


@durable_execution
def handler(_event: Any, context: DurableContext) -> str:
# Callback with custom timeout configuration
config = CallbackConfig(
timeout=Duration.from_seconds(60), heartbeat_timeout=Duration.from_seconds(30)
)
def handler(event: Any, context: DurableContext) -> str:
timeout_type = event.get("timeoutType", "general")
if timeout_type == "heartbeat":
config = CallbackConfig(
timeout=Duration.from_seconds(10),
heartbeat_timeout=Duration.from_seconds(1),
)
else:
config = CallbackConfig(timeout=Duration.from_seconds(1))

callback: Callback[str] = context.create_callback(
name="timeout_callback", config=config
)

return f"Callback created with 60s timeout: {callback.callback_id}"
return callback.result()
28 changes: 28 additions & 0 deletions examples/template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -557,3 +557,31 @@ Resources:
DurableConfig:
RetentionPeriodInDays: 7
ExecutionTimeout: 300
CallbackFailure:
Type: AWS::Serverless::Function
Properties:
CodeUri: build/
Handler: callback_failure.handler
Description: Demonstrates callback failure scenarios where the error is caught
in the code
Role:
Fn::GetAtt:
- DurableFunctionRole
- Arn
DurableConfig:
RetentionPeriodInDays: 7
ExecutionTimeout: 300
CallbackWithTimeout:
Type: AWS::Serverless::Function
Properties:
CodeUri: build/
Handler: callback_with_timeout.handler
Description: Demonstrates callback timeout scenarios (heartbeat timeout and
general timeout)
Role:
Fn::GetAtt:
- DurableFunctionRole
- Arn
DurableConfig:
RetentionPeriodInDays: 7
ExecutionTimeout: 300
64 changes: 64 additions & 0 deletions examples/test/callback/test_callback_failure.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""Tests for create_callback_failures."""

import pytest
from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python.lambda_service import ErrorObject

from src.callback import callback_failure


@pytest.mark.example
@pytest.mark.durable_execution(
handler=callback_failure.handler,
lambda_function_name="Create Callback Failures Uncaught",
)
def test_handle_callback_operations_with_failure_uncaught(durable_runner):
"""Test handling callback operations with failure."""
test_payload = {"shouldCatchError": False}

with durable_runner:
execution_arn = durable_runner.run_async(input=test_payload, timeout=30)

callback_id = durable_runner.wait_for_callback(execution_arn=execution_arn)

durable_runner.send_callback_failure(
callback_id=callback_id,
error=ErrorObject.from_message("External API failure"),
)

result = durable_runner.wait_for_result(execution_arn=execution_arn)

assert result.status is InvocationStatus.FAILED

error = result.error
assert error is not None
assert "External API failure" in error.message
assert error.type == "CallbackError"
assert error.stack_trace is None


@pytest.mark.example
@pytest.mark.durable_execution(
handler=callback_failure.handler,
lambda_function_name="Create Callback Failures Caught Error",
)
def test_handle_callback_operations_with_caught_error(durable_runner):
"""Test handling callback operations with caught error."""
test_payload = {"shouldCatchError": True}

with durable_runner:
execution_arn = durable_runner.run_async(input=test_payload, timeout=30)
callback_id = durable_runner.wait_for_callback(execution_arn=execution_arn)
durable_runner.send_callback_failure(
callback_id=callback_id,
error=ErrorObject.from_message("External API failure"),
)
result = durable_runner.wait_for_result(execution_arn=execution_arn)

assert result.status is InvocationStatus.SUCCEEDED

from test.conftest import deserialize_operation_payload

result_data = deserialize_operation_payload(result.result)
assert result_data["success"] is False
assert "External API failure" in result_data["error"]
46 changes: 46 additions & 0 deletions examples/test/callback/test_callback_with_timeout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
"""Tests for callback operation permutations."""

import pytest
from aws_durable_execution_sdk_python.execution import InvocationStatus

from src.callback import callback_with_timeout


@pytest.mark.example
@pytest.mark.durable_execution(
handler=callback_with_timeout.handler,
lambda_function_name="Callback With Heartbeat Timeout",
)
def test_callback_with_heartbeat_timeout(durable_runner):
"""Test callback with custom timeout configuration."""
test_payload = {"timeoutType": "heartbeat"}
with durable_runner:
result = durable_runner.run(input=test_payload, timeout=20)

assert result.status is InvocationStatus.FAILED
error = result.error
assert error is not None
assert error.message == "Callback timed out on heartbeat"
assert error.type == "CallbackError"
assert error.data is None
assert error.stack_trace is None


@pytest.mark.example
@pytest.mark.durable_execution(
handler=callback_with_timeout.handler,
lambda_function_name="Callback With General Timeout",
)
def test_callback_with_general_timeout(durable_runner):
"""Test callback with custom timeout configuration."""
test_payload = {"timeoutType": "general"}
with durable_runner:
result = durable_runner.run(input=test_payload, timeout=20)

assert result.status is InvocationStatus.FAILED
error = result.error
assert error is not None
assert error.message == "Callback timed out"
assert error.type == "CallbackError"
assert error.data is None
assert error.stack_trace is None
Loading