From e3b1032703f30b1adabeaf0fccc7f74847947597 Mon Sep 17 00:00:00 2001 From: Alex Wang Date: Fri, 14 Nov 2025 12:06:25 -0800 Subject: [PATCH 1/2] fix: add replay_children in checkpoint processor - Add replay_children in checkpoint processor - Add large scale map test case --- examples/examples-catalog.json | 11 ++++ examples/src/map/map_with_large_scale.py | 64 +++++++++++++++++++ .../run_in_child_context_large_data.py | 7 +- .../test/map/test_map_with_large_scale.py | 36 +++++++++++ .../checkpoint/processors/base.py | 8 ++- tests/checkpoint/processors/base_test.py | 18 ++++++ 6 files changed, 137 insertions(+), 7 deletions(-) create mode 100644 examples/src/map/map_with_large_scale.py create mode 100644 examples/test/map/test_map_with_large_scale.py diff --git a/examples/examples-catalog.json b/examples/examples-catalog.json index a8ad760..3129707 100644 --- a/examples/examples-catalog.json +++ b/examples/examples-catalog.json @@ -133,6 +133,17 @@ }, "path": "./src/map/map_operations.py" }, + { + "name": "Map Large Scale", + "description": "Processing collections using map-like durable operations in large scale", + "handler": "map_with_large_scale.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/map/map_with_large_scale.py" + }, { "name": "Block Example", "description": "Nested child contexts demonstrating block operations", diff --git a/examples/src/map/map_with_large_scale.py b/examples/src/map/map_with_large_scale.py new file mode 100644 index 0000000..9168516 --- /dev/null +++ b/examples/src/map/map_with_large_scale.py @@ -0,0 +1,64 @@ +"""Test map with 50 iterations, each returning 100KB data.""" + +from typing import Any + +from aws_durable_execution_sdk_python.config import MapConfig +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution +from aws_durable_execution_sdk_python.config import 
Duration + + +def generate_large_string(size_in_kb: int) -> str: + """Generate a string of approximately the specified size in KB.""" + return "A" * 1024 * size_in_kb + + +@durable_execution +def handler(_event: Any, context: DurableContext) -> dict[str, Any]: + """Handler demonstrating large scale map with substantial data.""" + # Create array of 50 items (more manageable for testing) + items = list(range(1, 51)) # 1 to 50 + + config = MapConfig(max_concurrency=10) # Process 10 items concurrently + data = generate_large_string(100) + results = context.map( + inputs=items, + func=lambda ctx, item, index, _: ctx.step( + lambda _: { + "itemId": item, + "index": index, + "dataSize": len(data), + "data": data, + "processed": True, + } + ), + name="large-scale-map", + config=config, + ) + + context.wait(Duration.from_seconds(1), name="wait1") + + # Process results immediately after map operation + # Note: After wait operations, the BatchResult may be summarized + final_results = results.get_results() + total_data_size = sum(result["dataSize"] for result in final_results) + all_items_processed = all(result["processed"] for result in final_results) + + total_size_in_mb = round(total_data_size / (1024 * 1024)) + + summary = { + "itemsProcessed": results.success_count, + "totalDataSizeMB": total_size_in_mb, + "totalDataSizeBytes": total_data_size, + "maxConcurrency": 10, + "averageItemSize": round(total_data_size / results.success_count), + "allItemsProcessed": all_items_processed, + } + + context.wait(Duration.from_seconds(1), name="wait2") + + return { + "success": True, + "message": "Successfully processed 50 items with substantial data using map", + "summary": summary, + } diff --git a/examples/src/run_in_child_context/run_in_child_context_large_data.py b/examples/src/run_in_child_context/run_in_child_context_large_data.py index 5597667..f8b8133 100644 --- a/examples/src/run_in_child_context/run_in_child_context_large_data.py +++ 
b/examples/src/run_in_child_context/run_in_child_context_large_data.py @@ -12,12 +12,7 @@ def generate_large_string(size_in_kb: int) -> str: """Generate a string of approximately the specified size in KB.""" - target_size = size_in_kb * 1024 # Convert KB to bytes - base_string = "A" * 1000 # 1KB string - repetitions = target_size // 1000 - remainder = target_size % 1000 - - return base_string * repetitions + "A" * remainder + return "A" * 1024 * size_in_kb @durable_with_child_context diff --git a/examples/test/map/test_map_with_large_scale.py b/examples/test/map/test_map_with_large_scale.py new file mode 100644 index 0000000..be3fb7b --- /dev/null +++ b/examples/test/map/test_map_with_large_scale.py @@ -0,0 +1,35 @@ +"""Tests for map_large_scale.""" + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus + +from src.map import map_with_large_scale +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=map_with_large_scale.handler, + lambda_function_name="map large scale", +) +def test_handle_50_items_with_100kb_each_using_map(durable_runner): + """Test handling 50 items with 100KB each using map.""" + with durable_runner: + result = durable_runner.run(input=None, timeout=60) + + result_data = deserialize_operation_payload(result.result) + + # Verify the execution succeeded + assert result.status is InvocationStatus.SUCCEEDED + assert result_data["success"] is True + + # Verify the expected number of items were processed (50 items) + assert result_data["summary"]["itemsProcessed"] == 50 + assert result_data["summary"]["allItemsProcessed"] is True + + # Verify data size expectations (~5MB total from 50 items × 100KB each) + assert result_data["summary"]["totalDataSizeMB"] > 4 # Should be ~5MB + assert result_data["summary"]["totalDataSizeMB"] < 6 + assert result_data["summary"]["totalDataSizeBytes"] > 5000000 # ~5MB + assert 
result_data["summary"]["averageItemSize"] > 100000 # ~100KB per item + assert result_data["summary"]["maxConcurrency"] == 10 diff --git a/src/aws_durable_execution_sdk_python_testing/checkpoint/processors/base.py b/src/aws_durable_execution_sdk_python_testing/checkpoint/processors/base.py index 130a096..56933d5 100644 --- a/src/aws_durable_execution_sdk_python_testing/checkpoint/processors/base.py +++ b/src/aws_durable_execution_sdk_python_testing/checkpoint/processors/base.py @@ -77,7 +77,13 @@ def _create_execution_details( def _create_context_details(self, update: OperationUpdate) -> ContextDetails | None: """Create ContextDetails from OperationUpdate.""" return ( - ContextDetails(result=update.payload, error=update.error) + ContextDetails( + result=update.payload, + error=update.error, + replay_children=update.context_options.replay_children + if update.context_options + else False, + ) if update.operation_type == OperationType.CONTEXT else None ) diff --git a/tests/checkpoint/processors/base_test.py b/tests/checkpoint/processors/base_test.py index ee588c4..700fd96 100644 --- a/tests/checkpoint/processors/base_test.py +++ b/tests/checkpoint/processors/base_test.py @@ -20,6 +20,7 @@ StepDetails, WaitDetails, WaitOptions, + ContextOptions, ) from aws_durable_execution_sdk_python_testing.checkpoint.processors.base import ( @@ -198,6 +199,23 @@ def test_create_step_details(): assert result.error == error +def test_create_context_details_with_replay_children(): + processor = MockProcessor() + update = OperationUpdate( + operation_id="test-id", + operation_type=OperationType.CONTEXT, + action=OperationAction.SUCCEED, + payload="test-payload", + context_options=ContextOptions(replay_children=True), + ) + + result = processor.create_context_details(update) + + assert isinstance(result, ContextDetails) + assert result.result == "test-payload" + assert result.replay_children is True + + def test_create_step_details_non_step_type(): processor = MockProcessor() update = 
OperationUpdate( From 107206af418b57b10cd36c5441133fe6eefeca57 Mon Sep 17 00:00:00 2001 From: GitHub Action Date: Mon, 17 Nov 2025 21:40:38 +0000 Subject: [PATCH 2/2] chore: update SAM template --- examples/template.yaml | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/examples/template.yaml b/examples/template.yaml index 368d31e..4e1a7ed 100644 --- a/examples/template.yaml +++ b/examples/template.yaml @@ -182,6 +182,20 @@ Resources: DurableConfig: RetentionPeriodInDays: 7 ExecutionTimeout: 300 + MapWithLargeScale: + Type: AWS::Serverless::Function + Properties: + CodeUri: build/ + Handler: map_with_large_scale.handler + Description: Processing collections using map-like durable operations in large + scale + Role: + Fn::GetAtt: + - DurableFunctionRole + - Arn + DurableConfig: + RetentionPeriodInDays: 7 + ExecutionTimeout: 300 BlockExample: Type: AWS::Serverless::Function Properties: