Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions examples/examples-catalog.json
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,17 @@
},
"path": "./src/map/map_operations.py"
},
{
"name": "Map Large Scale",
"description": "Processing collections using map-like durable operations at large scale",
"handler": "map_with_large_scale.handler",
"integration": true,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"path": "./src/map/map_with_large_scale.py"
},
{
"name": "Block Example",
"description": "Nested child contexts demonstrating block operations",
Expand Down
64 changes: 64 additions & 0 deletions examples/src/map/map_with_large_scale.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""Test map with 50 iterations, each returning 100KB data."""

from typing import Any

from aws_durable_execution_sdk_python.config import MapConfig
from aws_durable_execution_sdk_python.context import DurableContext
from aws_durable_execution_sdk_python.execution import durable_execution
from aws_durable_execution_sdk_python.config import Duration


def generate_large_string(size_in_kb: int) -> str:
    """Generate a string of approximately the specified size in KB."""
    # One ASCII char is one byte, so a 1024-char block is exactly 1 KiB.
    one_kilobyte = "A" * 1024
    return one_kilobyte * size_in_kb


@durable_execution
def handler(_event: Any, context: DurableContext) -> dict[str, Any]:
    """Handler demonstrating large scale map with substantial data."""
    # 50 inputs keeps the example tractable while still moving several MB
    # of aggregate payload through the map operation.
    item_ids = list(range(1, 51))  # 1 to 50

    payload = generate_large_string(100)  # ~100KB of filler shared by every item
    map_config = MapConfig(max_concurrency=10)  # at most 10 items in flight

    def process_item(ctx, item, index, _):
        # Each item becomes one durable step whose result embeds the payload.
        return ctx.step(
            lambda _: {
                "itemId": item,
                "index": index,
                "dataSize": len(payload),
                "data": payload,
                "processed": True,
            }
        )

    results = context.map(
        inputs=item_ids,
        func=process_item,
        name="large-scale-map",
        config=map_config,
    )

    context.wait(Duration.from_seconds(1), name="wait1")

    # Process results immediately after map operation
    # Note: After wait operations, the BatchResult may be summarized
    final_results = results.get_results()
    total_data_size = 0
    all_items_processed = True
    for entry in final_results:
        total_data_size += entry["dataSize"]
        all_items_processed = all_items_processed and entry["processed"]

    summary = {
        "itemsProcessed": results.success_count,
        "totalDataSizeMB": round(total_data_size / (1024 * 1024)),
        "totalDataSizeBytes": total_data_size,
        "maxConcurrency": 10,
        "averageItemSize": round(total_data_size / results.success_count),
        "allItemsProcessed": all_items_processed,
    }

    context.wait(Duration.from_seconds(1), name="wait2")

    return {
        "success": True,
        "message": "Successfully processed 50 items with substantial data using map",
        "summary": summary,
    }
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,7 @@

def generate_large_string(size_in_kb: int) -> str:
    """Generate a string of approximately the specified size in KB."""
    # One ASCII char is one byte, so 1024 * size_in_kb "A"s is exactly
    # size_in_kb KiB — no repetition/remainder bookkeeping needed.
    return "A" * 1024 * size_in_kb


@durable_with_child_context
Expand Down
14 changes: 14 additions & 0 deletions examples/template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,20 @@ Resources:
DurableConfig:
RetentionPeriodInDays: 7
ExecutionTimeout: 300
MapWithLargeScale:
Type: AWS::Serverless::Function
Properties:
CodeUri: build/
Handler: map_with_large_scale.handler
Description: Processing collections using map-like durable operations in large
scale
Role:
Fn::GetAtt:
- DurableFunctionRole
- Arn
DurableConfig:
RetentionPeriodInDays: 7
ExecutionTimeout: 300
BlockExample:
Type: AWS::Serverless::Function
Properties:
Expand Down
36 changes: 36 additions & 0 deletions examples/test/map/test_map_with_large_scale.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""Tests for map_with_large_scale."""

import pytest
from aws_durable_execution_sdk_python.execution import InvocationStatus

from src.map import map_with_large_scale
from test.conftest import deserialize_operation_payload


@pytest.mark.example
@pytest.mark.durable_execution(
    handler=map_with_large_scale.handler,
    lambda_function_name="map large scale",
)
def test_handle_50_items_with_100kb_each_using_map(durable_runner):
    """Test handling 50 items with 100KB each using map."""
    # NOTE: removed a stray `pass` statement that preceded the test body —
    # it was dead code left over from stubbing out the function.
    with durable_runner:
        result = durable_runner.run(input=None, timeout=60)

        result_data = deserialize_operation_payload(result.result)

        # Verify the execution succeeded
        assert result.status is InvocationStatus.SUCCEEDED
        assert result_data["success"] is True

        # Verify the expected number of items were processed (50 items)
        assert result_data["summary"]["itemsProcessed"] == 50
        assert result_data["summary"]["allItemsProcessed"] is True

        # Verify data size expectations (~5MB total from 50 items × 100KB each)
        assert result_data["summary"]["totalDataSizeMB"] > 4  # Should be ~5MB
        assert result_data["summary"]["totalDataSizeMB"] < 6
        assert result_data["summary"]["totalDataSizeBytes"] > 5000000  # ~5MB
        assert result_data["summary"]["averageItemSize"] > 100000  # ~100KB per item
        assert result_data["summary"]["maxConcurrency"] == 10
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,13 @@ def _create_execution_details(
def _create_context_details(self, update: OperationUpdate) -> ContextDetails | None:
    """Create ContextDetails from OperationUpdate.

    Only CONTEXT operations carry context details; any other operation
    type maps to None.
    """
    return (
        ContextDetails(
            result=update.payload,
            error=update.error,
            # Default to False when no context options were supplied.
            replay_children=update.context_options.replay_children
            if update.context_options
            else False,
        )
        if update.operation_type == OperationType.CONTEXT
        else None
    )
Expand Down
18 changes: 18 additions & 0 deletions tests/checkpoint/processors/base_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
StepDetails,
WaitDetails,
WaitOptions,
ContextOptions,
)

from aws_durable_execution_sdk_python_testing.checkpoint.processors.base import (
Expand Down Expand Up @@ -198,6 +199,23 @@ def test_create_step_details():
assert result.error == error


def test_create_context_details_with_replay_children():
    """create_context_details must propagate replay_children from ContextOptions."""
    processor = MockProcessor()
    update = OperationUpdate(
        operation_id="test-id",
        operation_type=OperationType.CONTEXT,
        action=OperationAction.SUCCEED,
        payload="test-payload",
        context_options=ContextOptions(replay_children=True),
    )

    result = processor.create_context_details(update)

    assert isinstance(result, ContextDetails)
    assert result.result == "test-payload"
    # `is True` instead of `== True`: identity comparison is the idiomatic
    # (and flake8 E712-clean) way to assert a literal boolean.
    assert result.replay_children is True


def test_create_step_details_non_step_type():
processor = MockProcessor()
update = OperationUpdate(
Expand Down
Loading