Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions examples/src/wait_for_callback/wait_for_callback_timeout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""Demonstrates waitForCallback timeout scenarios."""

from typing import Any

from aws_durable_execution_sdk_python.context import DurableContext
from aws_durable_execution_sdk_python.execution import durable_execution
from aws_durable_execution_sdk_python.config import Duration
from aws_durable_execution_sdk_python.config import WaitForCallbackConfig


@durable_execution
def handler(_event: Any, context: DurableContext) -> dict[str, Any]:
"""Handler demonstrating waitForCallback timeout."""

config = WaitForCallbackConfig(
timeout=Duration.from_seconds(1), heartbeat_timeout=Duration.from_seconds(2)
)

def submitter(_) -> None:
"""Submitter succeeds but callback never completes."""
return None

try:
result: str = context.wait_for_callback(
submitter,
config=config,
)
return {
"callbackResult": result,
"success": True,
}
except Exception as error:
return {
"success": False,
"error": str(error),
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""Tests for wait_for_callback_timeout."""

import pytest
from aws_durable_execution_sdk_python.execution import InvocationStatus

from src.wait_for_callback import wait_for_callback_timeout
from test.conftest import deserialize_operation_payload


@pytest.mark.example
@pytest.mark.durable_execution(
handler=wait_for_callback_timeout.handler,
lambda_function_name="Wait For Callback Timeout",
)
def test_handle_wait_for_callback_timeout_scenarios(durable_runner):
"""Test waitForCallback timeout scenarios."""
test_payload = {"test": "timeout-scenario"}

with durable_runner:
execution_arn = durable_runner.run_async(input=test_payload, timeout=2)
# Don't send callback - let it timeout
result = durable_runner.wait_for_result(execution_arn=execution_arn)

# Handler catches the timeout error, so execution succeeds with error in result
assert result.status is InvocationStatus.SUCCEEDED

result_data = deserialize_operation_payload(result.result)

assert result_data["success"] is False
assert isinstance(result_data["error"], str)
assert len(result_data["error"]) > 0
assert "Callback timed out: Callback.Timeout" == result_data["error"]
4 changes: 2 additions & 2 deletions src/aws_durable_execution_sdk_python_testing/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1146,9 +1146,9 @@ def _on_callback_timeout(self, execution_arn: str, callback_id: str) -> None:
f"Callback timed out: {CallbackTimeoutType.TIMEOUT.value}"
)
execution.complete_callback_timeout(callback_id, timeout_error)
execution.complete_fail(timeout_error)
self._store.update(execution)
logger.warning("[%s] Callback %s timed out", execution_arn, callback_id)
self._invoke_execution(callback_token.execution_arn)
except Exception:
logger.exception(
"[%s] Error processing callback timeout for %s",
Expand All @@ -1173,11 +1173,11 @@ def _on_callback_heartbeat_timeout(
f"Callback heartbeat timed out: {CallbackTimeoutType.HEARTBEAT.value}"
)
execution.complete_callback_timeout(callback_id, heartbeat_error)
execution.complete_fail(heartbeat_error)
self._store.update(execution)
logger.warning(
"[%s] Callback %s heartbeat timed out", execution_arn, callback_id
)
self._invoke_execution(callback_token.execution_arn)
except Exception:
logger.exception(
"[%s] Error processing callback heartbeat timeout for %s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,10 @@ def from_bytes(
else:
# Use standard JSON deserialization
try:
body_dict = json.loads(body_bytes.decode("utf-8"))
if body_bytes == b"":
body_dict = {}
else:
body_dict = json.loads(body_bytes.decode("utf-8"))
logger.debug("Successfully deserialized request using standard JSON")
except (json.JSONDecodeError, UnicodeDecodeError) as e:
msg = f"JSON deserialization failed: {e}"
Expand Down
2 changes: 0 additions & 2 deletions tests/executor_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2578,7 +2578,6 @@ def test_callback_timeout_handlers(executor, mock_store):
mock_execution.complete_callback_timeout.assert_called()
timeout_error = mock_execution.complete_callback_timeout.call_args[0][1]
assert "Callback timed out" in str(timeout_error.message)
mock_execution.complete_fail.assert_called()

# Reset mocks for heartbeat test
mock_execution.reset_mock()
Expand All @@ -2590,7 +2589,6 @@ def test_callback_timeout_handlers(executor, mock_store):
mock_execution.complete_callback_timeout.assert_called()
heartbeat_error = mock_execution.complete_callback_timeout.call_args[0][1]
assert "Callback heartbeat timed out" in str(heartbeat_error.message)
mock_execution.complete_fail.assert_called()


def test_callback_timeout_completed_execution(executor, mock_store):
Expand Down
Loading