Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 11 additions & 62 deletions docs/core/map.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@

**Concurrency control** - Limiting how many items process simultaneously using `max_concurrency` in `MapConfig`.

**Item batching** - Grouping multiple items together for processing as a single unit to optimize efficiency.

**Completion criteria** - Rules that determine when a map operation succeeds or fails based on individual item results.

[↑ Back to top](#table-of-contents)
Expand All @@ -42,7 +40,7 @@ Use map operations to:
- Transform collections with automatic checkpointing
- Process lists of items in parallel
- Handle large datasets with resilience
- Control concurrency and batching behavior
- Control concurrency behavior
- Define custom success/failure criteria

Map operations use `context.map()` to process collections efficiently. Each item becomes an independent operation that executes in parallel with other items.
Expand All @@ -55,7 +53,6 @@ Map operations use `context.map()` to process collections efficiently. Each item
- **Independent checkpointing** - Each item's result is saved separately
- **Partial completion** - Completed items don't reprocess on replay
- **Concurrency control** - Limit simultaneous processing with `max_concurrency`
- **Batching support** - Group items for efficient processing
- **Flexible completion** - Define custom success/failure criteria
- **Result ordering** - Results maintain the same order as inputs

Expand Down Expand Up @@ -102,7 +99,7 @@ If the function is interrupted after processing items 0-2, it resumes at item 3
```python
def map(
inputs: Sequence[U],
func: Callable[[DurableContext, U | BatchedInput[Any, U], int, Sequence[U]], T],
func: Callable[[DurableContext, U, int, Sequence[U]], T],
name: str | None = None,
config: MapConfig | None = None,
) -> BatchResult[T]
Expand All @@ -113,7 +110,7 @@ def map(
- `inputs` - A sequence of items to process (list, tuple, or any sequence type).
- `func` - A callable that processes each item. See [Map function signature](#map-function-signature) for details.
- `name` (optional) - A name for the map operation, useful for debugging and testing.
- `config` (optional) - A `MapConfig` object to configure concurrency, batching, and completion criteria.
- `config` (optional) - A `MapConfig` object to configure concurrency and completion criteria.

**Returns:** A `BatchResult[T]` containing the results from processing all items.

Expand All @@ -128,7 +125,7 @@ The map function receives four parameters:
```python
def process_item(
context: DurableContext,
item: U | BatchedInput[Any, U],
item: U,
index: int,
items: Sequence[U]
) -> T:
Expand All @@ -140,7 +137,7 @@ def process_item(
**Parameters:**

- `context` - A `DurableContext` for the item's processing. Use this to call steps, waits, or other operations.
- `item` - The current item being processed. Can be a `BatchedInput` if batching is configured.
- `item` - The current item being processed.
- `index` - The zero-based index of the item in the original collection.
- `items` - The full collection of items being processed.

Expand Down Expand Up @@ -186,7 +183,6 @@ from aws_durable_execution_sdk_python import (
from aws_durable_execution_sdk_python.config import (
MapConfig,
CompletionConfig,
ItemBatcher,
)

def process_item(context: DurableContext, item: int, index: int, items: list[int]) -> dict:
Expand All @@ -196,14 +192,13 @@ def process_item(context: DurableContext, item: int, index: int, items: list[int
@durable_execution
def handler(event: dict, context: DurableContext) -> BatchResult[dict]:
items = list(range(100))

# Configure map operation
config = MapConfig(
max_concurrency=10, # Process 10 items at a time
item_batcher=ItemBatcher(max_items_per_batch=5), # Batch 5 items together
completion_config=CompletionConfig.all_successful(), # Require all to succeed
)

result = context.map(items, process_item, name="process_numbers", config=config)
return result
```
Expand All @@ -212,8 +207,6 @@ def handler(event: dict, context: DurableContext) -> BatchResult[dict]:

**max_concurrency** - Maximum number of items to process concurrently. If `None`, all items process in parallel. Use this to control resource usage.

**item_batcher** - Configuration for batching items together. Use `ItemBatcher(max_items_per_batch=N)` to group items.

**completion_config** - Defines when the map operation succeeds or fails:
- `CompletionConfig()` - Default, allows any number of failures
- `CompletionConfig.all_successful()` - Requires all items to succeed
Expand Down Expand Up @@ -254,36 +247,6 @@ def handler(event: dict, context: DurableContext) -> BatchResult[dict]:
return result
```

### Batching items

Group multiple items for efficient processing:

```python
from aws_durable_execution_sdk_python.config import MapConfig, ItemBatcher, BatchedInput

def process_batch(
context: DurableContext,
batch: BatchedInput[None, int],
index: int,
items: list[int]
) -> list[dict]:
"""Process a batch of items together."""
# Process all items in the batch together
return [{"item": item, "squared": item * item} for item in batch.items]

@durable_execution
def handler(event: dict, context: DurableContext) -> BatchResult[list[dict]]:
items = list(range(100))

# Process items in batches of 10
config = MapConfig(
item_batcher=ItemBatcher(max_items_per_batch=10)
)

result = context.map(items, process_batch, config=config)
return result
```

### Custom completion criteria

Define when the map operation should succeed or fail:
Expand Down Expand Up @@ -391,8 +354,6 @@ def handler(event: dict, context: DurableContext) -> list[str]:

**Control concurrency for external calls** - When calling external APIs, use `max_concurrency` to avoid rate limits.

**Batch for efficiency** - For small, fast operations, use `item_batcher` to reduce overhead.

**Define completion criteria** - Use `CompletionConfig` to specify when the operation should succeed or fail.

**Keep map functions focused** - Each map function should process one item. Don't mix collection iteration with item processing.
Expand All @@ -401,7 +362,7 @@ def handler(event: dict, context: DurableContext) -> list[str]:

**Handle errors gracefully** - Wrap error-prone code in try-except blocks or use completion criteria to tolerate failures.

**Consider collection size** - For very large collections (10,000+ items), consider batching or processing in chunks.
**Consider collection size** - For very large collections (10,000+ items), consider processing in chunks.

**Monitor memory usage** - Large collections create many checkpoints. Monitor Lambda memory usage.

Expand All @@ -415,23 +376,15 @@ def handler(event: dict, context: DurableContext) -> list[str]:

**Use max_concurrency wisely** - Too much concurrency can overwhelm external services or exhaust Lambda resources. Start conservative and increase as needed.

**Batch small operations** - If each item processes quickly (< 100ms), batching reduces overhead:

```python
config = MapConfig(
item_batcher=ItemBatcher(max_items_per_batch=10)
)
```

**Optimize map functions** - Keep map functions lightweight. Move heavy computation into steps within the map function.

**Use appropriate completion criteria** - Fail fast with `tolerated_failure_count` to avoid processing remaining items when many fail.

**Monitor checkpoint size** - Large result objects increase checkpoint size and Lambda memory usage. Return only necessary data.

**Consider memory limits** - Processing thousands of items creates many checkpoints. Monitor Lambda memory and adjust batch size or concurrency.
**Consider memory limits** - Processing thousands of items creates many checkpoints. Monitor Lambda memory and adjust concurrency.

**Profile your workload** - Test with representative data to find optimal concurrency and batch settings.
**Profile your workload** - Test with representative data to find optimal concurrency settings.

[↑ Back to top](#table-of-contents)

Expand All @@ -443,7 +396,7 @@ A: Map operations process a collection of similar items using the same function.

**Q: How many items can I process?**

A: There's no hard limit, but consider Lambda's memory and timeout constraints. For very large collections (10,000+ items), use batching or process in chunks.
A: There's no hard limit, but consider Lambda's memory and timeout constraints. For very large collections (10,000+ items), consider processing in chunks.

**Q: Do items process in order?**

Expand Down Expand Up @@ -471,10 +424,6 @@ for item_result in batch_result.results:

A: Yes, you can call `context.map()` inside a map function to process nested collections.

**Q: How does batching work?**

A: When you configure `item_batcher`, multiple items are grouped together and passed as a `BatchedInput` to your map function. Process all items in `batch.items`.

**Q: What's the difference between serdes and item_serdes?**

A: `item_serdes` serializes individual item results as they complete. `serdes` serializes the entire `BatchResult` at the end. Use both for custom serialization at different levels.
Expand Down