diff --git a/docs/utilities/batch-processing.md b/docs/utilities/batch-processing.md
index fde2df6f0..9b464723f 100644
--- a/docs/utilities/batch-processing.md
+++ b/docs/utilities/batch-processing.md
@@ -1,1544 +1,1544 @@
----
-title: Batch Processing
-description: Utility
----
-
-The batch processing utility handles partial failures when processing batches from Amazon SQS, Amazon Kinesis Data Streams, and Amazon DynamoDB Streams.
-
-```mermaid
-stateDiagram-v2
- direction LR
- BatchSource: Amazon SQS <br/><br/> Amazon Kinesis Data Streams <br/><br/> Amazon DynamoDB Streams
- LambdaInit: Lambda invocation
- BatchProcessor: Batch Processor
- RecordHandler: Record Handler function
- YourLogic: Your logic to process each batch item
- LambdaResponse: Lambda response
-
- BatchSource --> LambdaInit
-
- LambdaInit --> BatchProcessor
- BatchProcessor --> RecordHandler
-
- state BatchProcessor {
- [*] --> RecordHandler: Your function
- RecordHandler --> YourLogic
- }
-
- RecordHandler --> BatchProcessor: Collect results
- BatchProcessor --> LambdaResponse: Report items that failed processing
-```
-
-## Key features
-
-- Reports batch item failures to reduce number of retries for a record upon errors
-- Simple interface to process each batch record
-- Typed batch processing with automatic deserialization
-- Lambda context injection for typed handlers
-- AOT (Ahead-of-Time) compilation support
-- Bring your own batch processor
-- Parallel processing
-
-## Background
-
-When using SQS, Kinesis Data Streams, or DynamoDB Streams as a Lambda event source, your Lambda functions are triggered with a batch of messages.
-
-If your function fails to process any message from the batch, the entire batch returns to your queue or stream. This same batch is then retried until one of the following conditions is met first: **a)** your Lambda function returns a successful response, **b)** the record reaches its maximum retry attempts, or **c)** the records expire.
-
-```mermaid
-journey
- section Conditions
- Successful response: 5: Success
- Maximum retries: 3: Failure
- Records expired: 1: Failure
-```
-
-This behavior changes when you enable the Report Batch Item Failures feature in your Lambda function's event source configuration:
-
-- [**SQS queues**](#sqs-standard). Only messages reported as failure will return to the queue for a retry, while successful ones will be deleted.
-- [**Kinesis data streams**](#kinesis-and-dynamodb-streams) and [**DynamoDB streams**](#kinesis-and-dynamodb-streams). A single reported failure will use its sequence number as the stream checkpoint. Multiple reported failures will use the lowest sequence number as the checkpoint.
-
-
-
-???+ warning "Warning: This utility lowers the chance of processing records more than once; it does not guarantee it"
- We recommend implementing processing logic in an [idempotent manner](idempotency.md){target="_blank"} wherever possible.
-
- You can find more details on how Lambda works with either [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html){target="_blank"}, [Kinesis](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html){target="_blank"}, or [DynamoDB](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html){target="_blank"} in the AWS Documentation.
-
-!!! warning "Migrating to v3"
-
- If you're upgrading to v3, please review the [Migration Guide v3](../migration-guide-v3.md) for important breaking changes including .NET 8 requirement and AWS SDK v4 migration.
-
-## Installation
-
-Install the library with the NuGet Package Manager console:
-
-```powershell
-Install-Package AWS.Lambda.Powertools.BatchProcessing
-```
-
-Or via the .NET Core command line interface:
-
-```bash
-dotnet add package AWS.Lambda.Powertools.BatchProcessing
-```
-
-## Getting started
-
-For this feature to work, you need to **(1)** configure your Lambda function event source to use `ReportBatchItemFailures`, and **(2)** return [a specific response](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting){target="_blank" rel="nofollow"} to report which records failed to be processed.
-
-Use your preferred deployment framework to set the event source configuration, while this utility takes care of returning the correct response.
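-
-As a sketch, with AWS SAM the event source configuration looks like this (resource and handler names are illustrative):
-
-```yaml
-MyFunction:
-  Type: AWS::Serverless::Function
-  Properties:
-    Handler: MyApp::MyApp.Function::Handler
-    Events:
-      MySqsEvent:
-        Type: SQS
-        Properties:
-          Queue: !GetAtt MyQueue.Arn
-          # Enables partial batch responses for this event source
-          FunctionResponseTypes:
-            - ReportBatchItemFailures
-```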
-
-Batch processing can be configured with the settings below:
-
-| Setting | Description | Environment variable | Default |
-| ------------------------------- | ----------------------------------------------------------------------------- | ---------------------------------------------- | ----------------- |
-| **Error Handling Policy** | The error handling policy to apply during batch processing. | `POWERTOOLS_BATCH_ERROR_HANDLING_POLICY` | `DeriveFromEvent` |
-| **Parallel Enabled** | Controls if parallel processing of batch items is enabled. | `POWERTOOLS_BATCH_PARALLEL_ENABLED` | `false` |
-| **Max Degree of Parallelism** | The maximum degree of parallelism to apply if parallel processing is enabled. | `POWERTOOLS_BATCH_MAX_DEGREE_OF_PARALLELISM` | `1` |
-| **Throw on Full Batch Failure** | Controls if a `BatchProcessingException` is thrown on full batch failure. | `POWERTOOLS_BATCH_THROW_ON_FULL_BATCH_FAILURE` | `true` |
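-
-As an example, the environment variables above can be set on the function in a SAM template (the values shown are illustrative, not the defaults):
-
-```yaml
-MyFunction:
-  Type: AWS::Serverless::Function
-  Properties:
-    Environment:
-      Variables:
-        # Continue processing remaining items when one fails
-        POWERTOOLS_BATCH_ERROR_HANDLING_POLICY: ContinueOnBatchItemFailure
-        # Opt in to parallel processing with up to 4 concurrent items
-        POWERTOOLS_BATCH_PARALLEL_ENABLED: "true"
-        POWERTOOLS_BATCH_MAX_DEGREE_OF_PARALLELISM: "4"
-        # Do not throw a BatchProcessingException when every item fails
-        POWERTOOLS_BATCH_THROW_ON_FULL_BATCH_FAILURE: "false"
-```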
-
-### Required resources
-
-The remaining sections of the documentation rely on these samples. For completeness, they demonstrate the required IAM permissions and a dead-letter queue (DLQ) where batch records are sent after two failed retries.
-
-!!! note "You do not need any additional IAM permissions to use this utility, except for what each event source requires."
-
-=== "SQS"
-
- ```yaml title="template.yaml" hl_lines="93-94"
- --8<-- "docs/snippets/batch/templates/sqs.yaml"
- ```
-
-=== "Kinesis Data Streams"
-
- ```yaml title="template.yaml" hl_lines="109-110"
- --8<-- "docs/snippets/batch/templates/kinesis.yaml"
- ```
-
-=== "DynamoDB Streams"
-
- ```yaml title="template.yaml" hl_lines="102-103"
- --8<-- "docs/snippets/batch/templates/dynamodb.yaml"
- ```
-
-### Processing messages from SQS
-
-#### Using Typed Handler decorator (Recommended)
-
-Processing batches from SQS using typed Lambda handler decorator with automatic deserialization works in four stages:
-
-1. Define your data model class
-2. Create a class that implements the **`ITypedRecordHandler<T>`** interface and the HandleAsync method
-3. Decorate your handler with **`BatchProcessor`** attribute using **`TypedRecordHandler`** property
-4. Return **`BatchItemFailuresResponse`** from Lambda handler using **`TypedSqsBatchProcessor.Result.BatchItemFailuresResponse`**
-
-
-=== "Function.cs"
-
- ```csharp hl_lines="1 8 19 29 32"
- public class Product
- {
- public int Id { get; set; }
- public string? Name { get; set; }
- public decimal Price { get; set; }
- }
-
-    public class TypedSqsRecordHandler : ITypedRecordHandler<Product> // (1)!
-    {
-       public async Task<RecordHandlerResult> HandleAsync(Product product, CancellationToken cancellationToken)
- {
- /*
- * Your business logic with automatic deserialization.
- * If an exception is thrown, the item will be marked as a partial batch item failure.
- */
-
- Logger.LogInformation($"Processing product {product.Id} - {product.Name} (${product.Price})");
-
- if (product.Id == 4) // (2)!
- {
- throw new ArgumentException("Error on id 4");
- }
-
- return await Task.FromResult(RecordHandlerResult.None); // (3)!
- }
-
- }
-
- [BatchProcessor(TypedRecordHandler = typeof(TypedSqsRecordHandler))]
- public BatchItemFailuresResponse HandlerUsingTypedAttribute(SQSEvent _)
- {
- return TypedSqsBatchProcessor.Result.BatchItemFailuresResponse; // (4)!
- }
-
- ```
-
-    1. **Step 1**. Create a class that implements the `ITypedRecordHandler<Product>` interface - Product is automatically deserialized from the SQS message body.
-    2. **Step 2**. You can have custom logic inside the record handler and throw exceptions that will cause this message to fail.
-    3. **Step 3**. RecordHandlerResult can return empty (None) or some data.
-    4. **Step 4**. The Lambda function returns the partial batch response using TypedSqsBatchProcessor.
-
-=== "Sample event"
-
- ```json
- {
- "Records": [
- {
- "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
- "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
- "body": "{\"id\": 1, \"name\": \"Laptop Computer\", \"price\": 999.99}",
- "attributes": {
- "ApproximateReceiveCount": "1",
- "SentTimestamp": "1545082649183",
- "SenderId": "SENDER_ID",
- "ApproximateFirstReceiveTimestamp": "1545082649185"
- },
- "messageAttributes": {},
- "md5OfBody": "7b270e59b47ff90a553787216d55d91d",
- "eventSource": "aws:sqs",
-       "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
- "awsRegion": "us-east-1"
- },
- {
- "messageId": "244fc6b4-87a3-44ab-83d2-361172410c3a",
- "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
- "body": "{\"id\": 4, \"name\": \"Invalid Product\", \"price\": -10.00}",
- "attributes": {
- "ApproximateReceiveCount": "1",
- "SentTimestamp": "1545082649183",
- "SenderId": "SENDER_ID",
- "ApproximateFirstReceiveTimestamp": "1545082649185"
- },
- "messageAttributes": {},
- "md5OfBody": "7b270e59b47ff90a553787216d55d92e",
- "eventSource": "aws:sqs",
-       "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
- "awsRegion": "us-east-1"
- }
- ]
- }
-
- ```
-
-=== "Sample response"
-
- The second record failed to be processed, therefore the processor added its message ID in the response.
-
- ```json
- {
- "batchItemFailures": [
- {
- "itemIdentifier": "244fc6b4-87a3-44ab-83d2-361172410c3a"
- }
- ]
- }
- ```
-
-#### Using Handler decorator (Traditional)
-
-Processing batches from SQS using Lambda handler decorator works in four stages:
-
-1. Decorate your handler with **`BatchProcessor`** attribute
-2. Create a class that implements **`ISqsRecordHandler`** interface and the HandleAsync method.
-3. Pass the type of that class to **`RecordHandler`** property of the **`BatchProcessor`** attribute
-4. Return **`BatchItemFailuresResponse`** from Lambda handler using **`SqsBatchProcessor.Result.BatchItemFailuresResponse`**
-
-=== "Function.cs"
-
- ```csharp hl_lines="1 12 22 17 25"
- public class CustomSqsRecordHandler : ISqsRecordHandler // (1)!
- {
-        public async Task<RecordHandlerResult> HandleAsync(SQSEvent.SQSMessage record, CancellationToken cancellationToken)
- {
- /*
- * Your business logic.
- * If an exception is thrown, the item will be marked as a partial batch item failure.
- */
-
-            var product = JsonSerializer.Deserialize<Product>(record.Body);
-
- if (product.Id == 4) // (2)!
- {
- throw new ArgumentException("Error on id 4");
- }
-
- return await Task.FromResult(RecordHandlerResult.None); // (3)!
- }
-
- }
-
- [BatchProcessor(RecordHandler = typeof(CustomSqsRecordHandler))]
- public BatchItemFailuresResponse HandlerUsingAttribute(SQSEvent _)
- {
- return SqsBatchProcessor.Result.BatchItemFailuresResponse; // (4)!
- }
-
- ```
-
-    1. **Step 1**. Create a class that implements the ISqsRecordHandler interface and the HandleAsync method.
-    2. **Step 2**. You can have custom logic inside the record handler and throw exceptions that will cause this message to fail.
-    3. **Step 3**. RecordHandlerResult can return empty (None) or some data.
-    4. **Step 4**. The Lambda function returns the partial batch response.
-
-=== "Sample event"
-
- ```json
- {
- "Records": [
- {
- "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
- "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
- "body": "{\"Id\":1,\"Name\":\"product-4\",\"Price\":14}",
- "attributes": {
- "ApproximateReceiveCount": "1",
- "SentTimestamp": "1545082649183",
- "SenderId": "SENDER_ID",
- "ApproximateFirstReceiveTimestamp": "1545082649185"
- },
- "messageAttributes": {},
- "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
- "eventSource": "aws:sqs",
-       "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
- "awsRegion": "us-east-1"
- },
- {
- "messageId": "244fc6b4-87a3-44ab-83d2-361172410c3a",
- "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
- "body": "fail",
- "attributes": {
- "ApproximateReceiveCount": "1",
- "SentTimestamp": "1545082649183",
- "SenderId": "SENDER_ID",
- "ApproximateFirstReceiveTimestamp": "1545082649185"
- },
- "messageAttributes": {},
- "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
- "eventSource": "aws:sqs",
-       "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
- "awsRegion": "us-east-1"
- },
- {
- "messageId": "213f4fd3-84a4-4667-a1b9-c277964197d9",
- "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
- "body": "{\"Id\":4,\"Name\":\"product-4\",\"Price\":14}",
- "attributes": {
- "ApproximateReceiveCount": "1",
- "SentTimestamp": "1545082649183",
- "SenderId": "SENDER_ID",
- "ApproximateFirstReceiveTimestamp": "1545082649185"
- },
- "messageAttributes": {},
- "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
- "eventSource": "aws:sqs",
-       "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
- "awsRegion": "us-east-1"
-      }
- ]
- }
-
- ```
-
-=== "Sample response"
-
-    The second and third records failed to be processed, therefore the processor added their message IDs to the response.
-
- ```json
- {
- "batchItemFailures": [
- {
- "itemIdentifier": "244fc6b4-87a3-44ab-83d2-361172410c3a"
- },
- {
- "itemIdentifier": "213f4fd3-84a4-4667-a1b9-c277964197d9"
- }
- ]
- }
- ```
-
-#### FIFO queues
-
-When using [SQS FIFO queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html){target="_blank"}, we will stop processing messages after the first failure, and return all failed and unprocessed messages in `batchItemFailures`.
-This helps preserve the ordering of messages in your queue. Powertools automatically detects a FIFO queue.
-
-
-
-
-
-### Processing messages from Kinesis
-
-#### Using Typed Handler decorator (Recommended)
-
-Processing batches from Kinesis using typed Lambda handler decorator with automatic deserialization works in four stages:
-
-1. Define your data model class
-2. Create a class that implements the **`ITypedRecordHandler<T>`** interface and the HandleAsync method
-3. Decorate your handler with **`BatchProcessor`** attribute using **`TypedRecordHandler`** property
-4. Return **`BatchItemFailuresResponse`** from Lambda handler using **`TypedKinesisEventBatchProcessor.Result.BatchItemFailuresResponse`**
-
-=== "Function.cs"
-
- ```csharp hl_lines="1 9 15 20 24 27"
- public class Order
- {
- public string? OrderId { get; set; }
- public DateTime OrderDate { get; set; }
-        public List<string> Items { get; set; } = new();
- public decimal TotalAmount { get; set; }
- }
-
-    internal class TypedKinesisRecordHandler : ITypedRecordHandler<Order> // (1)!
-    {
-        public async Task<RecordHandlerResult> HandleAsync(Order order, CancellationToken cancellationToken)
- {
- Logger.LogInformation($"Processing order {order.OrderId} with {order.Items.Count} items");
-
- if (order.TotalAmount <= 0) // (2)!
- {
- throw new ArgumentException("Invalid order total");
- }
-
- return await Task.FromResult(RecordHandlerResult.None); // (3)!
- }
- }
-
- [BatchProcessor(TypedRecordHandler = typeof(TypedKinesisRecordHandler))]
- public BatchItemFailuresResponse HandlerUsingTypedAttribute(KinesisEvent _)
- {
- return TypedKinesisEventBatchProcessor.Result.BatchItemFailuresResponse; // (4)!
- }
-
- ```
-
-    1. **Step 1**. Create a class that implements the `ITypedRecordHandler<Order>` interface - Order is automatically deserialized from the Kinesis record data.
-    2. **Step 2**. You can have custom logic inside the record handler and throw exceptions that will cause this message to fail.
-    3. **Step 3**. RecordHandlerResult can return empty (None) or some data.
-    4. **Step 4**. The Lambda function returns the partial batch response using TypedKinesisEventBatchProcessor.
-
-#### Using Handler decorator (Traditional)
-
-Processing batches from Kinesis using Lambda handler decorator works in four stages:
-
-1. Decorate your handler with **`BatchProcessor`** attribute
-2. Create a class that implements **`IKinesisEventRecordHandler`** interface and the HandleAsync method.
-3. Pass the type of that class to **`RecordHandler`** property of the **`BatchProcessor`** attribute
-4. Return **`BatchItemFailuresResponse`** from Lambda handler using **`KinesisEventBatchProcessor.Result.BatchItemFailuresResponse`**
-
-=== "Function.cs"
-
- ```csharp hl_lines="1 7 12 17 20"
- internal class CustomKinesisEventRecordHandler : IKinesisEventRecordHandler // (1)!
- {
-        public async Task<RecordHandlerResult> HandleAsync(KinesisEvent.KinesisEventRecord record, CancellationToken cancellationToken)
-        {
-            var product = JsonSerializer.Deserialize<Product>(record.Kinesis.Data);
-
- if (product.Id == 4) // (2)!
- {
- throw new ArgumentException("Error on id 4");
- }
-
- return await Task.FromResult(RecordHandlerResult.None); // (3)!
- }
- }
-
-
- [BatchProcessor(RecordHandler = typeof(CustomKinesisEventRecordHandler))]
- public BatchItemFailuresResponse HandlerUsingAttribute(KinesisEvent _)
- {
- return KinesisEventBatchProcessor.Result.BatchItemFailuresResponse; // (4)!
- }
-
- ```
-
-    1. **Step 1**. Create a class that implements the IKinesisEventRecordHandler interface and the HandleAsync method.
-    2. **Step 2**. You can have custom logic inside the record handler and throw exceptions that will cause this message to fail.
-    3. **Step 3**. RecordHandlerResult can return empty (None) or some data.
-    4. **Step 4**. The Lambda function returns the partial batch response.
-
-=== "Sample event"
-
- ```json
- {
- "Records": [
- {
- "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
- "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
- "body": "{\"Id\":1,\"Name\":\"product-4\",\"Price\":14}",
- "attributes": {
- "ApproximateReceiveCount": "1",
- "SentTimestamp": "1545082649183",
- "SenderId": "SENDER_ID",
- "ApproximateFirstReceiveTimestamp": "1545082649185"
- },
- "messageAttributes": {},
- "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
- "eventSource": "aws:sqs",
- "eventSourceARN": "arn:aws:sqs:us-east-2: 123456789012:my-queue",
- "awsRegion": "us-east-1"
- },
- {
- "messageId": "244fc6b4-87a3-44ab-83d2-361172410c3a",
- "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
- "body": "fail",
- "attributes": {
- "ApproximateReceiveCount": "1",
- "SentTimestamp": "1545082649183",
- "SenderId": "SENDER_ID",
- "ApproximateFirstReceiveTimestamp": "1545082649185"
- },
- "messageAttributes": {},
- "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
- "eventSource": "aws:sqs",
- "eventSourceARN": "arn:aws:sqs:us-east-2: 123456789012:my-queue",
- "awsRegion": "us-east-1"
- },
- {
- "messageId": "213f4fd3-84a4-4667-a1b9-c277964197d9",
- "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
- "body": "{\"Id\":4,\"Name\":\"product-4\",\"Price\":14}",
- "attributes": {
- "ApproximateReceiveCount": "1",
- "SentTimestamp": "1545082649183",
- "SenderId": "SENDER_ID",
- "ApproximateFirstReceiveTimestamp": "1545082649185"
- },
- "messageAttributes": {},
- "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
- "eventSource": "aws:sqs",
- "eventSourceARN": "arn:aws:sqs:us-east-2: 123456789012:my-queue",
- "awsRegion": "us-east-1"
- },
- ]
- }
-
- ```
-
-=== "Sample response"
-
- The second record failed to be processed, therefore the processor added its message ID in the response.
-
- ```json
- {
- "batchItemFailures": [
- {
- "itemIdentifier": "244fc6b4-87a3-44ab-83d2-361172410c3a"
- },
- {
- "itemIdentifier": "213f4fd3-84a4-4667-a1b9-c277964197d9"
- }
- ]
- }
- ```
-
-### Processing messages from DynamoDB
-
-#### Using Typed Handler decorator (Recommended)
-
-Processing batches from DynamoDB Streams using typed Lambda handler decorator with automatic deserialization works in four stages:
-
-1. Define your data model class
-2. Create a class that implements the **`ITypedRecordHandler<T>`** interface and the HandleAsync method
-3. Decorate your handler with **`BatchProcessor`** attribute using **`TypedRecordHandler`** property
-4. Return **`BatchItemFailuresResponse`** from Lambda handler using **`TypedDynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse`**
-
-=== "Function.cs"
-
- ```csharp hl_lines="1 9 15 20 24 27"
- public class Customer
- {
- public string? CustomerId { get; set; }
- public string? Name { get; set; }
- public string? Email { get; set; }
- public DateTime CreatedAt { get; set; }
- }
-
-    internal class TypedDynamoDbRecordHandler : ITypedRecordHandler<Customer> // (1)!
-    {
-        public async Task<RecordHandlerResult> HandleAsync(Customer customer, CancellationToken cancellationToken)
- {
- Logger.LogInformation($"Processing customer {customer.CustomerId} - {customer.Name}");
-
- if (string.IsNullOrEmpty(customer.Email)) // (2)!
- {
- throw new ArgumentException("Customer email is required");
- }
-
- return await Task.FromResult(RecordHandlerResult.None); // (3)!
- }
- }
-
- [BatchProcessor(TypedRecordHandler = typeof(TypedDynamoDbRecordHandler))]
- public BatchItemFailuresResponse HandlerUsingTypedAttribute(DynamoDBEvent _)
- {
- return TypedDynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse; // (4)!
- }
-
- ```
-
-    1. **Step 1**. Create a class that implements the `ITypedRecordHandler<Customer>` interface - Customer is automatically deserialized from the DynamoDB stream record.
-    2. **Step 2**. You can have custom logic inside the record handler and throw exceptions that will cause this message to fail.
-    3. **Step 3**. RecordHandlerResult can return empty (None) or some data.
-    4. **Step 4**. The Lambda function returns the partial batch response using TypedDynamoDbStreamBatchProcessor.
-
-#### Using Handler decorator (Traditional)
-
-Processing batches from DynamoDB Streams using Lambda handler decorator works in four stages:
-
-1. Decorate your handler with **`BatchProcessor`** attribute
-2. Create a class that implements **`IDynamoDbStreamRecordHandler`** and the HandleAsync method.
-3. Pass the type of that class to **`RecordHandler`** property of the **`BatchProcessor`** attribute
-4. Return **`BatchItemFailuresResponse`** from Lambda handler using **`DynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse`**
-
-=== "Function.cs"
-
- ```csharp hl_lines="1 7 12 17 20"
- internal class CustomDynamoDbStreamRecordHandler : IDynamoDbStreamRecordHandler // (1)!
- {
-        public async Task<RecordHandlerResult> HandleAsync(DynamoDBEvent.DynamodbStreamRecord record, CancellationToken cancellationToken)
-        {
-            var product = JsonSerializer.Deserialize<Product>(record.Dynamodb.NewImage["Product"].S);
-
- if (product.Id == 4) // (2)!
- {
- throw new ArgumentException("Error on id 4");
- }
-
- return await Task.FromResult(RecordHandlerResult.None); // (3)!
- }
- }
-
-
- [BatchProcessor(RecordHandler = typeof(CustomDynamoDbStreamRecordHandler))]
- public BatchItemFailuresResponse HandlerUsingAttribute(DynamoDBEvent _)
- {
- return DynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse; // (4)!
- }
-
- ```
-
-    1. **Step 1**. Create a class that implements the IDynamoDbStreamRecordHandler interface and the HandleAsync method.
-    2. **Step 2**. You can have custom logic inside the record handler and throw exceptions that will cause this message to fail.
-    3. **Step 3**. RecordHandlerResult can return empty (None) or some data.
-    4. **Step 4**. The Lambda function returns the partial batch response.
-
-=== "Sample event"
-
- ```json
- {
- "Records": [
- {
- "eventID": "1",
- "eventVersion": "1.0",
- "dynamodb": {
- "Keys": {
- "Id": {
- "N": "101"
- }
- },
- "NewImage": {
- "Product": {
- "S": "{\"Id\":1,\"Name\":\"product-name\",\"Price\":14}"
- }
- },
- "StreamViewType": "NEW_AND_OLD_IMAGES",
- "SequenceNumber": "3275880929",
- "SizeBytes": 26
- },
- "awsRegion": "us-west-2",
- "eventName": "INSERT",
- "eventSourceARN": "eventsource_arn",
- "eventSource": "aws:dynamodb"
- },
- {
- "eventID": "1",
- "eventVersion": "1.0",
- "dynamodb": {
- "Keys": {
- "Id": {
- "N": "101"
- }
- },
- "NewImage": {
- "Product": {
- "S": "fail"
- }
- },
- "StreamViewType": "NEW_AND_OLD_IMAGES",
- "SequenceNumber": "8640712661",
- "SizeBytes": 26
- },
- "awsRegion": "us-west-2",
- "eventName": "INSERT",
- "eventSourceARN": "eventsource_arn",
- "eventSource": "aws:dynamodb"
- }
- ]
- }
-
- ```
-
-=== "Sample response"
-
-    The second record failed to be processed, therefore the processor added its sequence number to the response.
-
- ```json
- {
- "batchItemFailures": [
- {
- "itemIdentifier": "8640712661"
- }
- ]
- }
- ```
-
-### Error handling
-
-By default, we catch any exception raised by your custom record handler's HandleAsync method (ISqsRecordHandler, IKinesisEventRecordHandler, or IDynamoDbStreamRecordHandler).
-This allows us to **(1)** continue processing the batch, **(2)** collect each batch item that failed processing, and **(3)** return the appropriate response correctly without failing your Lambda function execution.
-
-=== "Function.cs"
-
- ```csharp hl_lines="14"
- public class CustomSqsRecordHandler : ISqsRecordHandler // (1)!
- {
-        public async Task<RecordHandlerResult> HandleAsync(SQSEvent.SQSMessage record, CancellationToken cancellationToken)
- {
- /*
- * Your business logic.
- * If an exception is thrown, the item will be marked as a partial batch item failure.
- */
-
-            var product = JsonSerializer.Deserialize<Product>(record.Body);
-
- if (product.Id == 4) // (2)!
- {
- throw new ArgumentException("Error on id 4");
- }
-
- return await Task.FromResult(RecordHandlerResult.None); // (3)!
- }
-
- }
-
- ```
-
-=== "Sample event"
-
- ```json
- {
- "Records": [
- {
- "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
- "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
- "body": "{\"Id\":1,\"Name\":\"product-4\",\"Price\":14}",
- "attributes": {
- "ApproximateReceiveCount": "1",
- "SentTimestamp": "1545082649183",
- "SenderId": "SENDER_ID",
- "ApproximateFirstReceiveTimestamp": "1545082649185"
- },
- "messageAttributes": {},
- "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
- "eventSource": "aws:sqs",
-       "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
- "awsRegion": "us-east-1"
- },
- {
- "messageId": "244fc6b4-87a3-44ab-83d2-361172410c3a",
- "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
- "body": "fail",
- "attributes": {
- "ApproximateReceiveCount": "1",
- "SentTimestamp": "1545082649183",
- "SenderId": "SENDER_ID",
- "ApproximateFirstReceiveTimestamp": "1545082649185"
- },
- "messageAttributes": {},
- "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
- "eventSource": "aws:sqs",
-       "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
- "awsRegion": "us-east-1"
- },
- {
- "messageId": "213f4fd3-84a4-4667-a1b9-c277964197d9",
- "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
- "body": "{\"Id\":4,\"Name\":\"product-4\",\"Price\":14}",
- "attributes": {
- "ApproximateReceiveCount": "1",
- "SentTimestamp": "1545082649183",
- "SenderId": "SENDER_ID",
- "ApproximateFirstReceiveTimestamp": "1545082649185"
- },
- "messageAttributes": {},
- "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
- "eventSource": "aws:sqs",
-       "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
- "awsRegion": "us-east-1"
-      }
- ]
- }
-
- ```
-
-=== "Sample response"
-
-    The second and third records failed to be processed, therefore the processor added their message IDs to the response.
-
- ```json
- {
- "batchItemFailures": [
- {
- "itemIdentifier": "244fc6b4-87a3-44ab-83d2-361172410c3a"
- },
- {
- "itemIdentifier": "213f4fd3-84a4-4667-a1b9-c277964197d9"
- }
- ]
- }
- ```
-
-#### Error Handling Policy
-
-You can specify the error handling policy applied during batch processing.
-
-`ErrorHandlingPolicy` is used to control the error handling policy of the batch item processing.
-With a value of `DeriveFromEvent` (default), the specific batch processor determines the policy based on the incoming event.
-
-For example, the `SqsBatchProcessor` looks at the EventSourceArn to determine if the ErrorHandlingPolicy should be `StopOnFirstBatchItemFailure` (for FIFO queues) or `ContinueOnBatchItemFailure` (for standard queues).
-For `StopOnFirstBatchItemFailure` the batch processor stops processing and marks any remaining records as batch item failures.
-For `ContinueOnBatchItemFailure` the batch processor continues processing batch items regardless of item failures.
-
-| Policy | Description |
-| ------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------- |
-| **DeriveFromEvent** | Auto-derive the policy based on the event. |
-| **ContinueOnBatchItemFailure**  | Continue processing regardless of whether other batch items fail during processing.                                                          |
-| **StopOnFirstBatchItemFailure** | Stop processing other batch items after the first batch item has failed processing. This is useful to preserve ordered processing of events. |
-
-!!! note
-
-    When using **StopOnFirstBatchItemFailure** with parallel processing enabled, all batch items already scheduled to be processed will be allowed to complete before batch processing stops.
-
-    Therefore, if order is important, it is recommended to use sequential (non-parallel) processing together with this policy.
-
-To change the default error handling policy, you can set the **`POWERTOOLS_BATCH_ERROR_HANDLING_POLICY`** Environment Variable.
-
-Another approach is to decorate the handler and set one of the policies through the **`ErrorHandlingPolicy`** property of the **`BatchProcessor`** attribute:
-
-=== "Function.cs"
-
- ```csharp hl_lines="2"
- [BatchProcessor(RecordHandler = typeof(CustomDynamoDbStreamRecordHandler),
- ErrorHandlingPolicy = BatchProcessorErrorHandlingPolicy.StopOnFirstBatchItemFailure)]
- public BatchItemFailuresResponse HandlerUsingAttribute(DynamoDBEvent _)
- {
- return DynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse;
- }
-
- ```
-
-### Partial failure mechanics
-
-All records in the batch will be passed to the record handler for processing, even if exceptions are thrown. Here's the behaviour after completing the batch:
-
-- **All records successfully processed**. We will return an empty list of item failures `{"batchItemFailures": []}`.
-- **Partial success with some exceptions**. We will return a list of all item IDs/sequence numbers that failed processing.
-- **All records failed to be processed**. By default, we will throw a `BatchProcessingException` with a list of all exceptions raised during processing to reflect the failure in your operational metrics. However, in some scenarios, this might not be desired. See [Working with full batch failures](#working-with-full-batch-failures) for more information.
-
-The following sequence diagrams explain how each Batch processor behaves under different scenarios.
-
-#### SQS Standard
-
-> Read more about [Batch Failure Reporting feature in AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting){target="_blank"}.
-
-Sequence diagram to explain how [`BatchProcessor` works](#processing-messages-from-sqs) with SQS Standard queues.
-
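-The flow for standard queues can be sketched as follows: processing continues past individual failures, successful messages are deleted, and only the reported failures return to the queue.
-
-
-```mermaid
-sequenceDiagram
-    autonumber
-    participant SQS queue
-    participant Lambda service
-    participant Lambda function
-    Lambda service->>SQS queue: Poll
-    Lambda service->>Lambda function: Invoke (batch event)
-    activate Lambda function
-    Lambda function-->Lambda function: Process 2 out of 10 batch items
-    Lambda function--xLambda function: Fail on 3rd batch item
-    Lambda function-->Lambda function: Continue processing batch items (4-10)
-    Lambda function->>Lambda service: Report 3rd batch item as failure
-    deactivate Lambda function
-    activate SQS queue
-    Lambda service->>SQS queue: Delete successful messages (1-2, 4-10)
-    SQS queue-->>SQS queue: Failed message returns (3)
-    deactivate SQS queue
-```
-SQS Standard mechanism with Batch Item Failures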
-
-
-#### SQS FIFO
-
-> Read more about [Batch Failure Reporting feature in AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting){target="_blank"}.
-
-Sequence diagram to explain how [`SqsFifoPartialProcessor` works](#fifo-queues) with SQS FIFO queues.
-
-
-```mermaid
-sequenceDiagram
- autonumber
- participant SQS queue
- participant Lambda service
- participant Lambda function
- Lambda service->>SQS queue: Poll
- Lambda service->>Lambda function: Invoke (batch event)
- activate Lambda function
- Lambda function-->Lambda function: Process 2 out of 10 batch items
- Lambda function--xLambda function: Fail on 3rd batch item
- Lambda function->>Lambda service: Report 3rd batch item and unprocessed messages as failure
- deactivate Lambda function
- activate SQS queue
- Lambda service->>SQS queue: Delete successful messages (1-2)
- SQS queue-->>SQS queue: Failed messages return (3-10)
- deactivate SQS queue
-```
-SQS FIFO mechanism with Batch Item Failures
-
-
-#### Kinesis and DynamoDB Streams
-
-> Read more about [Batch Failure Reporting feature](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-batchfailurereporting){target="_blank"}.
-
-Sequence diagram to explain how `BatchProcessor` works with both [Kinesis Data Streams](#processing-messages-from-kinesis) and [DynamoDB Streams](#processing-messages-from-dynamodb).
-
-For brevity, we will use `Streams` to refer to either service. For theory on stream checkpoints, see this [blog post](https://aws.amazon.com/blogs/compute/optimizing-batch-processing-with-custom-checkpoints-in-aws-lambda/){target="_blank"}.
-
-
-```mermaid
-sequenceDiagram
- autonumber
- participant Streams
- participant Lambda service
- participant Lambda function
- Lambda service->>Streams: Poll latest records
- Lambda service->>Lambda function: Invoke (batch event)
- activate Lambda function
- Lambda function-->Lambda function: Process 2 out of 10 batch items
- Lambda function--xLambda function: Fail on 3rd batch item
- Lambda function-->Lambda function: Continue processing batch items (4-10)
- Lambda function->>Lambda service: Report batch item as failure (3)
- deactivate Lambda function
- activate Streams
- Lambda service->>Streams: Checkpoints to sequence number from 3rd batch item
- Lambda service->>Streams: Poll records starting from updated checkpoint
- deactivate Streams
-```
-Kinesis and DynamoDB streams mechanism with single batch item failure
-
-
-The behavior changes slightly when there are multiple item failures. Stream checkpoint is updated to the lowest sequence number reported.
-
-!!! important "Note that the batch item sequence number could be different from batch item number in the illustration."
-
-
-```mermaid
-sequenceDiagram
- autonumber
- participant Streams
- participant Lambda service
- participant Lambda function
- Lambda service->>Streams: Poll latest records
- Lambda service->>Lambda function: Invoke (batch event)
- activate Lambda function
- Lambda function-->Lambda function: Process 2 out of 10 batch items
- Lambda function--xLambda function: Fail on 3-5 batch items
- Lambda function-->Lambda function: Continue processing batch items (6-10)
- Lambda function->>Lambda service: Report batch items as failure (3-5)
- deactivate Lambda function
- activate Streams
- Lambda service->>Streams: Checkpoints to lowest sequence number
- Lambda service->>Streams: Poll records starting from updated checkpoint
- deactivate Streams
-```
-Kinesis and DynamoDB streams mechanism with multiple batch item failures
-
-
-## Typed Batch Processing Advanced Features
-
-### AOT (Ahead-of-Time) Compilation Support
-
-For Native AOT scenarios, you can configure a `JsonSerializerContext`:
-
-=== "JsonSerializerContext Configuration"
-
- ```csharp
- [JsonSerializable(typeof(Product))]
- [JsonSerializable(typeof(Order))]
- [JsonSerializable(typeof(Customer))]
-    [JsonSerializable(typeof(List<Product>))]
- [JsonSourceGenerationOptions(
- PropertyNamingPolicy = JsonKnownNamingPolicy.CamelCase,
- WriteIndented = false,
- DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull)]
- public partial class MyJsonSerializerContext : JsonSerializerContext
- {
- }
- ```
-
-=== "Using with Attribute"
-
- ```csharp hl_lines="2 3"
- [BatchProcessor(
- TypedRecordHandler = typeof(TypedSqsRecordHandler),
- JsonSerializerContext = typeof(MyJsonSerializerContext))]
- public BatchItemFailuresResponse ProcessWithAot(SQSEvent sqsEvent)
- {
- return TypedSqsBatchProcessor.Result.BatchItemFailuresResponse;
- }
- ```
-
-
-
-### Lambda Context Injection
-
-For typed handlers that need access to Lambda context, use `ITypedRecordHandlerWithContext`:
-
-=== "Handler with Context"
-
- ```csharp hl_lines="1 3"
-    public class ProductHandlerWithContext : ITypedRecordHandlerWithContext<Product>
-    {
-        public async Task<RecordHandlerResult> HandleAsync(Product product, ILambdaContext context, CancellationToken cancellationToken)
- {
- Logger.LogInformation($"Processing product {product.Id} in request {context.AwsRequestId}");
- Logger.LogInformation($"Remaining time: {context.RemainingTime.TotalSeconds}s");
-
- // Use context for timeout handling
- if (context.RemainingTime.TotalSeconds < 5)
- {
- Logger.LogWarning("Low remaining time, processing quickly");
- }
-
- return RecordHandlerResult.None;
- }
- }
- ```
-
-=== "Function Usage"
-
- ```csharp hl_lines="1 2"
- [BatchProcessor(TypedRecordHandler = typeof(ProductHandlerWithContext))]
- public BatchItemFailuresResponse ProcessWithContext(SQSEvent sqsEvent, ILambdaContext context)
- {
- return TypedSqsBatchProcessor.Result.BatchItemFailuresResponse;
- }
- ```
-
-### Migration from Traditional to Typed Handlers
-
-You can gradually migrate from traditional to typed handlers:
-
-=== "Before (Traditional)"
-
- ```csharp hl_lines="1 6"
- public class TraditionalSqsHandler : ISqsRecordHandler
- {
-        public async Task<RecordHandlerResult> HandleAsync(SQSEvent.SQSMessage record, CancellationToken cancellationToken)
-        {
-            // Manual deserialization
-            var product = JsonSerializer.Deserialize<Product>(record.Body);
-
- Logger.LogInformation($"Processing product {product.Id}");
-
- if (product.Price < 0)
- throw new ArgumentException("Invalid price");
-
- return RecordHandlerResult.None;
- }
- }
-
- [BatchProcessor(RecordHandler = typeof(TraditionalSqsHandler))]
- public BatchItemFailuresResponse ProcessSqs(SQSEvent sqsEvent)
- {
- return SqsBatchProcessor.Result.BatchItemFailuresResponse;
- }
- ```
-
-=== "After (Typed)"
-
- ```csharp hl_lines="1 5"
-    public class TypedSqsHandler : ITypedRecordHandler<Product>
-    {
-        public async Task<RecordHandlerResult> HandleAsync(Product product, CancellationToken cancellationToken)
- {
- // Automatic deserialization - product is already deserialized!
- Logger.LogInformation($"Processing product {product.Id}");
-
- // Same business logic
- if (product.Price < 0)
- throw new ArgumentException("Invalid price");
-
- return RecordHandlerResult.None;
- }
- }
-
- [BatchProcessor(TypedRecordHandler = typeof(TypedSqsHandler))]
- public BatchItemFailuresResponse ProcessSqs(SQSEvent sqsEvent)
- {
- return TypedSqsBatchProcessor.Result.BatchItemFailuresResponse;
- }
- ```
-
-### Error Handling with Typed Processors
-
-Typed processors support the same error handling policies as traditional processors:
-
-=== "Custom Error Handling"
-
- ```csharp hl_lines="2"
- [BatchProcessor(
- TypedRecordHandler = typeof(TypedSqsHandler),
- ErrorHandlingPolicy = BatchProcessorErrorHandlingPolicy.StopOnFirstBatchItemFailure)]
- public BatchItemFailuresResponse ProcessWithErrorPolicy(SQSEvent sqsEvent)
- {
- return TypedSqsBatchProcessor.Result.BatchItemFailuresResponse;
- }
- ```
-
-### Advanced
-
-#### Using utility outside handler and IoC
-
-You can use Batch processing without using the decorator.
-
-Call the **`ProcessAsync`** method on the `Instance` property of the static batch processor (`SqsBatchProcessor`, `DynamoDbStreamBatchProcessor`, `KinesisEventBatchProcessor`):
-
-=== "Function.cs"
-
- ```csharp hl_lines="3"
-    public async Task<BatchItemFailuresResponse> HandlerUsingUtility(DynamoDBEvent dynamoDbEvent)
-    {
-        var result = await DynamoDbStreamBatchProcessor.Instance.ProcessAsync(dynamoDbEvent, RecordHandler<DynamoDBEvent.DynamodbStreamRecord>.From(record =>
-        {
-            var product = JsonSerializer.Deserialize<JsonElement>(record.Dynamodb.NewImage["Product"].S);
-
- if (product.GetProperty("Id").GetInt16() == 4)
- {
- throw new ArgumentException("Error on 4");
- }
- }));
- return result.BatchItemFailuresResponse;
- }
-
- ```
-
-To make the handler testable you can use Dependency Injection to resolve the BatchProcessor (`SqsBatchProcessor`, `DynamoDbStreamBatchProcessor`, `KinesisEventBatchProcessor`) instance and then call the **`ProcessAsync`** method.
-
-=== "GetRequiredService inside the method"
-
- ```csharp hl_lines="3 4 5"
-    public async Task<BatchItemFailuresResponse> HandlerUsingUtilityFromIoc(DynamoDBEvent dynamoDbEvent)
-    {
-        var batchProcessor = Services.Provider.GetRequiredService<IDynamoDbStreamBatchProcessor>();
-        var recordHandler = Services.Provider.GetRequiredService<IDynamoDbStreamRecordHandler>();
- var result = await batchProcessor.ProcessAsync(dynamoDbEvent, recordHandler);
- return result.BatchItemFailuresResponse;
- }
-
- ```
-
-=== "Injecting method parameters"
-
- ```csharp hl_lines="2 4"
-    public async Task<BatchItemFailuresResponse> HandlerUsingUtilityFromIoc(DynamoDBEvent dynamoDbEvent,
- IDynamoDbStreamBatchProcessor batchProcessor, IDynamoDbStreamRecordHandler recordHandler)
- {
- var result = await batchProcessor.ProcessAsync(dynamoDbEvent, recordHandler);
- return result.BatchItemFailuresResponse;
- }
-
- ```
-
-=== "Example implementation of IServiceProvider"
-
- ```csharp hl_lines="16 17"
- internal class Services
- {
-        private static readonly Lazy<IServiceProvider> LazyInstance = new(Build);
-
- private static ServiceCollection _services;
- public static IServiceProvider Provider => LazyInstance.Value;
-
- public static IServiceProvider Init()
- {
- return LazyInstance.Value;
- }
-
- private static IServiceProvider Build()
- {
- _services = new ServiceCollection();
-            _services.AddScoped<IDynamoDbStreamBatchProcessor, DynamoDbStreamBatchProcessor>();
-            _services.AddScoped<IDynamoDbStreamRecordHandler, CustomDynamoDbStreamRecordHandler>();
- return _services.BuildServiceProvider();
- }
- }
-
- ```
-
-#### Processing messages in parallel
-
-You can set the `POWERTOOLS_BATCH_PARALLEL_ENABLED` environment variable to `true`, or set the `BatchParallelProcessingEnabled` property on the Lambda decorator, to process messages concurrently.
-
-You can also set the `POWERTOOLS_BATCH_MAX_DEGREE_OF_PARALLELISM` environment variable to the degree of parallelism you wish to use.
-
-!!! note
-
- MaxDegreeOfParallelism is used to control the parallelism of the batch item processing.
-
-    With a value of 1, the processing is done sequentially (default). Sequential processing is recommended when preserving order is important - i.e. with SQS FIFO queues.
-
- With a value > 1, the processing is done in parallel. Doing parallel processing can enable processing to complete faster, i.e., when processing does downstream service calls.
-
- With a value of -1, the parallelism is automatically configured to be the vCPU count of the Lambda function. Internally, the Batch Processing Utility utilizes Parallel.ForEachAsync Method and the ParallelOptions.MaxDegreeOfParallelism Property to enable this functionality.
-
-???+ question "When is this useful?"
- Your use case might be able to process multiple records at the same time without conflicting with one another.
-
- For example, imagine you need to process multiple loyalty points and incrementally save in a database. While you await the database to confirm your records are saved, you could start processing another request concurrently.
-
- The reason this is not the default behaviour is that not all use cases can handle concurrency safely (e.g., loyalty points must be updated in order).
-
-=== "Function.cs"
-
- ```csharp hl_lines="1"
- [BatchProcessor(RecordHandler = typeof(CustomDynamoDbStreamRecordHandler), BatchParallelProcessingEnabled = true )]
- public BatchItemFailuresResponse HandlerUsingAttribute(DynamoDBEvent _)
- {
- return DynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse;
- }
- ```
-
-#### Working with full batch failures
-
-By default, the `BatchProcessor` will throw a `BatchProcessingException` if all records in the batch fail to process. We do this to reflect the failure in your operational metrics.
-
-When working with functions that handle batches with a small number of records, or when you use errors as a flow control mechanism, this behavior might not be desirable as your function might generate an unnaturally high number of errors. When this happens, the [Lambda service will scale down the concurrency of your function](https://docs.aws.amazon.com/lambda/latest/dg/services-sqs-errorhandling.html#services-sqs-backoff-strategy){target="_blank"}, potentially impacting performance.
-
-For these scenarios, you can set `POWERTOOLS_BATCH_THROW_ON_FULL_BATCH_FAILURE = false`, or the equivalent on either the `BatchProcessor` decorator or on the `ProcessingOptions` object. See examples below.
-
-=== "Setting ThrowOnFullBatchFailure on Decorator"
-
- ```csharp hl_lines="3"
- [BatchProcessor(
- RecordHandler = typeof(CustomSqsRecordHandler),
- ThrowOnFullBatchFailure = false)]
- public BatchItemFailuresResponse HandlerUsingAttribute(SQSEvent _)
- {
- return SqsBatchProcessor.Result.BatchItemFailuresResponse;
- }
-
- ```
-
-=== "Setting ThrowOnFullBatchFailure outside Decorator"
-
- ```csharp hl_lines="8"
-    public async Task<BatchItemFailuresResponse> HandlerUsingUtility(SQSEvent sqsEvent)
-    {
-        var result = await SqsBatchProcessor.Instance.ProcessAsync(sqsEvent, RecordHandler<SQSEvent.SQSMessage>.From(x =>
- {
- // Inline handling of SQS message...
- }), new ProcessingOptions
- {
- ThrowOnFullBatchFailure = false
- });
- return result.BatchItemFailuresResponse;
- }
-
- ```
-
-#### Extending BatchProcessor
-
-You might want to bring custom logic to the existing `BatchProcessor` to slightly override how we handle successes and failures.
-
-For these scenarios, you can create a class that inherits from `BatchProcessor` (`SqsBatchProcessor`, `DynamoDbStreamBatchProcessor`, `KinesisEventBatchProcessor`) and quickly override `ProcessAsync` and `HandleRecordFailureAsync` methods:
-
-- **`ProcessAsync()`** – Keeps track of successful batch records
-- **`HandleRecordFailureAsync()`** – Keeps track of failed batch records
-
-???+ example
-    Let's suppose you'd like to add a metric named `BatchRecordFailures` for each batch record that failed processing, and also override the default error handling policy to stop on the first item failure.
-
-=== "Function.cs"
-
- ```csharp hl_lines="1 21 54 97"
-
- public class CustomDynamoDbStreamBatchProcessor : DynamoDbStreamBatchProcessor
- {
-        public override async Task<ProcessingResult<DynamoDBEvent.DynamodbStreamRecord>> ProcessAsync(DynamoDBEvent @event,
-            IRecordHandler<DynamoDBEvent.DynamodbStreamRecord> recordHandler, ProcessingOptions processingOptions)
-        {
-            ProcessingResult = new ProcessingResult<DynamoDBEvent.DynamodbStreamRecord>();
-
-            // Prepare batch records (order is preserved)
-            var batchRecords = GetRecordsFromEvent(@event).Select(x => new KeyValuePair<string, DynamoDBEvent.DynamodbStreamRecord>(GetRecordId(x), x))
-                .ToArray();
-
-            // We assume all records fail by default to avoid loss of data
-            var failureBatchRecords = batchRecords.Select(x => new KeyValuePair<string, RecordFailure<DynamoDBEvent.DynamodbStreamRecord>>(x.Key,
-                new RecordFailure<DynamoDBEvent.DynamodbStreamRecord>
-                {
-                    Exception = new UnprocessedRecordException($"Record: '{x.Key}' has not been processed."),
-                    Record = x.Value
-                }));
-
-            // Override to fail on first failure
-            var errorHandlingPolicy = BatchProcessorErrorHandlingPolicy.StopOnFirstBatchItemFailure;
-
-            var successRecords = new Dictionary<string, RecordSuccess<DynamoDBEvent.DynamodbStreamRecord>>();
-            var failureRecords = new Dictionary<string, RecordFailure<DynamoDBEvent.DynamodbStreamRecord>>(failureBatchRecords);
-
- try
- {
- foreach (var pair in batchRecords)
- {
- var (recordId, record) = pair;
-
- try
- {
- var result = await HandleRecordAsync(record, recordHandler, CancellationToken.None);
- failureRecords.Remove(recordId, out _);
-                        successRecords.TryAdd(recordId, new RecordSuccess<DynamoDBEvent.DynamodbStreamRecord>
- {
- Record = record,
- RecordId = recordId,
- HandlerResult = result
- });
- }
- catch (Exception ex)
- {
- // Capture exception
-                        failureRecords[recordId] = new RecordFailure<DynamoDBEvent.DynamodbStreamRecord>
- {
- Exception = new RecordProcessingException(
- $"Failed processing record: '{recordId}'. See inner exception for details.", ex),
- Record = record,
- RecordId = recordId
- };
-
- Metrics.AddMetric("BatchRecordFailures", 1, MetricUnit.Count);
-
- try
- {
- // Invoke hook
- await HandleRecordFailureAsync(record, ex);
- }
- catch
- {
- // NOOP
- }
-
- // Check if we should stop record processing on first error
- // ReSharper disable once ConditionIsAlwaysTrueOrFalse
- if (errorHandlingPolicy == BatchProcessorErrorHandlingPolicy.StopOnFirstBatchItemFailure)
- {
- // This causes the loop's (inner) cancellation token to be cancelled for all operations already scheduled internally
- throw new CircuitBreakerException(
- "Error handling policy is configured to stop processing on first batch item failure. See inner exception for details.",
- ex);
- }
- }
- }
- }
- catch (Exception ex) when (ex is CircuitBreakerException or OperationCanceledException)
- {
- // NOOP
- }
-
- ProcessingResult.BatchRecords.AddRange(batchRecords.Select(x => x.Value));
- ProcessingResult.BatchItemFailuresResponse.BatchItemFailures.AddRange(failureRecords.Select(x =>
- new BatchItemFailuresResponse.BatchItemFailure
- {
- ItemIdentifier = x.Key
- }));
- ProcessingResult.FailureRecords.AddRange(failureRecords.Values);
-
- ProcessingResult.SuccessRecords.AddRange(successRecords.Values);
-
- return ProcessingResult;
- }
-
- // ReSharper disable once RedundantOverriddenMember
- protected override async Task HandleRecordFailureAsync(DynamoDBEvent.DynamodbStreamRecord record, Exception exception)
- {
- await base.HandleRecordFailureAsync(record, exception);
- }
- }
- ```
-
-## Testing your code
-
-### Testing Typed Handlers
-
-Testing typed batch processors is straightforward since you work directly with your data models:
-
-=== "Typed Handler Test"
-
- ```csharp
- [Fact]
- public async Task TypedHandler_ValidProduct_ProcessesSuccessfully()
- {
- // Arrange
- var product = new Product { Id = 1, Name = "Test Product", Price = 10.99m };
- var handler = new TypedSqsRecordHandler();
- var cancellationToken = CancellationToken.None;
-
- // Act
- var result = await handler.HandleAsync(product, cancellationToken);
-
- // Assert
- Assert.Equal(RecordHandlerResult.None, result);
- }
-
- [Fact]
- public async Task TypedHandler_InvalidProduct_ThrowsException()
- {
- // Arrange
- var product = new Product { Id = 4, Name = "Invalid", Price = -10 };
- var handler = new TypedSqsRecordHandler();
-
- // Act & Assert
-        await Assert.ThrowsAsync<ArgumentException>(() =>
- handler.HandleAsync(product, CancellationToken.None));
- }
- ```
-
-=== "Integration Test"
-
- ```csharp
- [Fact]
- public async Task ProcessSqsEvent_WithTypedHandler_ProcessesAllRecords()
- {
- // Arrange
- var sqsEvent = new SQSEvent
- {
-            Records = new List<SQSEvent.SQSMessage>
- {
- new() {
- MessageId = "1",
- Body = JsonSerializer.Serialize(new Product { Id = 1, Name = "Product 1", Price = 10 }),
- EventSourceArn = "arn:aws:sqs:us-east-1:123456789012:my-queue"
- },
- new() {
- MessageId = "2",
- Body = JsonSerializer.Serialize(new Product { Id = 2, Name = "Product 2", Price = 20 }),
- EventSourceArn = "arn:aws:sqs:us-east-1:123456789012:my-queue"
- }
- }
- };
-
- var function = new TypedFunction();
-
- // Act
- var result = function.HandlerUsingTypedAttribute(sqsEvent);
-
- // Assert
- Assert.Empty(result.BatchItemFailures);
- }
- ```
-
-### Testing Traditional Handlers
-
-As there are no external calls, you can unit test your code with `BatchProcessor` quite easily.
-
-=== "Test.cs"
-
- ```csharp
- [Fact]
- public Task Sqs_Handler_Using_Attribute()
- {
- var request = new SQSEvent
- {
- Records = TestHelper.SqsMessages
- };
-
- var function = new HandlerFunction();
-
- var response = function.HandlerUsingAttribute(request);
-
- Assert.Equal(2, response.BatchItemFailures.Count);
- Assert.Equal("2", response.BatchItemFailures[0].ItemIdentifier);
- Assert.Equal("4", response.BatchItemFailures[1].ItemIdentifier);
-
- return Task.CompletedTask;
- }
- ```
-
-=== "Function.cs"
-
- ```csharp
- [BatchProcessor(RecordHandler = typeof(CustomSqsRecordHandler))]
- public BatchItemFailuresResponse HandlerUsingAttribute(SQSEvent _)
- {
- return SqsBatchProcessor.Result.BatchItemFailuresResponse;
- }
- ```
-
-=== "CustomSqsRecordHandler.cs"
-
- ```csharp
- public class CustomSqsRecordHandler : ISqsRecordHandler
- {
-        public async Task<RecordHandlerResult> HandleAsync(SQSEvent.SQSMessage record, CancellationToken cancellationToken)
-        {
-            var product = JsonSerializer.Deserialize<JsonElement>(record.Body);
-
- if (product.GetProperty("Id").GetInt16() == 4)
- {
- throw new ArgumentException("Error on 4");
- }
-
- return await Task.FromResult(RecordHandlerResult.None);
- }
- }
- ```
-
-=== "SQS Event.cs"
-
- ```csharp
-    internal static List<SQSEvent.SQSMessage> SqsMessages => new()
- {
- new SQSEvent.SQSMessage
- {
- MessageId = "1",
- Body = "{\"Id\":1,\"Name\":\"product-4\",\"Price\":14}",
- EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue"
- },
- new SQSEvent.SQSMessage
- {
- MessageId = "2",
- Body = "fail",
- EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue"
- },
- new SQSEvent.SQSMessage
- {
- MessageId = "3",
- Body = "{\"Id\":3,\"Name\":\"product-4\",\"Price\":14}",
- EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue"
- },
- new SQSEvent.SQSMessage
- {
- MessageId = "4",
- Body = "{\"Id\":4,\"Name\":\"product-4\",\"Price\":14}",
- EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue"
- },
- new SQSEvent.SQSMessage
- {
- MessageId = "5",
- Body = "{\"Id\":5,\"Name\":\"product-4\",\"Price\":14}",
- EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue"
- },
- };
- ```
-
-## Complete Examples and Documentation
-
+The [BatchProcessing example](https://github.com/aws-powertools/powertools-lambda-dotnet/tree/develop/examples/BatchProcessing){target="_blank"} contains complete working examples:
-
-- **TypedFunction.cs** - Complete examples using all typed batch processing patterns
-- **TypedHandlers/** - Example implementations for SQS, Kinesis, and DynamoDB
+---
+title: Batch Processing
+description: Utility
+---
+
+The batch processing utility handles partial failures when processing batches from Amazon SQS, Amazon Kinesis Data Streams, and Amazon DynamoDB Streams.
+
+```mermaid
+stateDiagram-v2
+ direction LR
+ BatchSource: Amazon SQS <br/> Amazon Kinesis Data Streams <br/> Amazon DynamoDB Streams
+ LambdaInit: Lambda invocation
+ BatchProcessor: Batch Processor
+ RecordHandler: Record Handler function
+ YourLogic: Your logic to process each batch item
+ LambdaResponse: Lambda response
+
+ BatchSource --> LambdaInit
+
+ LambdaInit --> BatchProcessor
+ BatchProcessor --> RecordHandler
+
+ state BatchProcessor {
+ [*] --> RecordHandler: Your function
+ RecordHandler --> YourLogic
+ }
+
+ RecordHandler --> BatchProcessor: Collect results
+ BatchProcessor --> LambdaResponse: Report items that failed processing
+```
+
+## Key features
+
+- Reports batch item failures to reduce number of retries for a record upon errors
+- Simple interface to process each batch record
+- Typed batch processing with automatic deserialization
+- Lambda context injection for typed handlers
+- AOT (Ahead-of-Time) compilation support
+
+- Bring your own batch processor
+- Parallel processing
+
+## Background
+
+When using SQS, Kinesis Data Streams, or DynamoDB Streams as a Lambda event source, your Lambda functions are triggered with a batch of messages.
+
+If your function fails to process any message from the batch, the entire batch returns to your queue or stream. This same batch is then retried until either condition happens first: **a)** your Lambda function returns a successful response, **b)** record reaches maximum retry attempts, or **c)** when records expire.
+
+```mermaid
+journey
+ section Conditions
+ Successful response: 5: Success
+ Maximum retries: 3: Failure
+ Records expired: 1: Failure
+```
+
+This behavior changes when you enable Report Batch Item Failures feature in your Lambda function event source configuration:
+
+- [**SQS queues**](#sqs-standard). Only messages reported as failure will return to the queue for a retry, while successful ones will be deleted.
+- [**Kinesis data streams**](#kinesis-and-dynamodb-streams) and [**DynamoDB streams**](#kinesis-and-dynamodb-streams). Single reported failure will use its sequence number as the stream checkpoint. Multiple reported failures will use the lowest sequence number as checkpoint.
+
+
+
+???+ warning "Warning: This utility lowers the chance of processing records more than once; it does not guarantee it"
+ We recommend implementing processing logic in an [idempotent manner](idempotency.md){target="_blank"} wherever possible.
+
+ You can find more details on how Lambda works with either [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html){target="_blank"}, [Kinesis](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html){target="_blank"}, or [DynamoDB](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html){target="_blank"} in the AWS Documentation.
+
+!!! warning "Migrating to v3"
+
+ If you're upgrading to v3, please review the [Migration Guide v3](../migration-guide-v3.md) for important breaking changes including .NET 8 requirement and AWS SDK v4 migration.
+
+## Installation
+
+You should install with NuGet:
+
+```powershell
+Install-Package AWS.Lambda.Powertools.BatchProcessing
+```
+
+Or via the .NET Core command line interface:
+
+```bash
+dotnet add package AWS.Lambda.Powertools.BatchProcessing
+```
+
+## Getting started
+
+For this feature to work, you need to **(1)** configure your Lambda function event source to use `ReportBatchItemFailures`, and **(2)** return [a specific response](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting){target="_blank" rel="nofollow"} to report which records failed to be processed.
+
+You use your preferred deployment framework to set the correct configuration while this utility handles the correct response to be returned.
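+
+For reference, this is roughly what the event source mapping flag looks like in a SAM template; the resource and event names below are illustrative, and the complete templates are listed under Required resources:
+
+```yaml
+# Illustrative SQS event source for a SAM function - queue and event
+# names are hypothetical; FunctionResponseTypes is the significant part.
+Events:
+  MySqsEvent:
+    Type: SQS
+    Properties:
+      Queue: !GetAtt MySqsQueue.Arn
+      BatchSize: 10
+      FunctionResponseTypes:
+        - ReportBatchItemFailures
+```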
+
+Batch processing can be configured with the settings below:
+
+| Setting | Description | Environment variable | Default |
+| ------------------------------- | ----------------------------------------------------------------------------- | ---------------------------------------------- | ----------------- |
+| **Error Handling Policy** | The error handling policy to apply during batch processing. | `POWERTOOLS_BATCH_ERROR_HANDLING_POLICY` | `DeriveFromEvent` |
+| **Parallel Enabled** | Controls if parallel processing of batch items is enabled. | `POWERTOOLS_BATCH_PARALLEL_ENABLED` | `false` |
+| **Max Degree of Parallelism** | The maximum degree of parallelism to apply if parallel processing is enabled. | `POWERTOOLS_BATCH_MAX_DEGREE_OF_PARALLELISM` | `1` |
+| **Throw on Full Batch Failure** | Controls if a `BatchProcessingException` is thrown on full batch failure. | `POWERTOOLS_BATCH_THROW_ON_FULL_BATCH_FAILURE` | `true` |
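+
+As an illustration, these settings could also be supplied as Lambda environment variables in your deployment template; the values below are hypothetical:
+
+```yaml
+# Illustrative environment variables for a SAM function definition
+Environment:
+  Variables:
+    POWERTOOLS_BATCH_ERROR_HANDLING_POLICY: StopOnFirstBatchItemFailure
+    POWERTOOLS_BATCH_PARALLEL_ENABLED: "true"
+    POWERTOOLS_BATCH_MAX_DEGREE_OF_PARALLELISM: "-1"
+    POWERTOOLS_BATCH_THROW_ON_FULL_BATCH_FAILURE: "false"
+```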
+
+### Required resources
+
+The remaining sections of the documentation rely on these samples. For completeness, they demonstrate IAM permissions and a Dead Letter Queue where batch records are sent after 2 retries have been attempted.
+
+!!! note "You do not need any additional IAM permissions to use this utility, except for what each event source requires."
+
+=== "SQS"
+
+ ```yaml title="template.yaml" hl_lines="93-94"
+ --8<-- "docs/snippets/batch/templates/sqs.yaml"
+ ```
+
+=== "Kinesis Data Streams"
+
+ ```yaml title="template.yaml" hl_lines="109-110"
+ --8<-- "docs/snippets/batch/templates/kinesis.yaml"
+ ```
+
+=== "DynamoDB Streams"
+
+ ```yaml title="template.yaml" hl_lines="102-103"
+ --8<-- "docs/snippets/batch/templates/dynamodb.yaml"
+ ```
+
+### Processing messages from SQS
+
+#### Using Typed Handler decorator (Recommended)
+
+Processing batches from SQS using typed Lambda handler decorator with automatic deserialization works in four stages:
+
+1. Define your data model class
+2. Create a class that implements the **`ITypedRecordHandler<T>`** interface and its `HandleAsync` method
+3. Decorate your handler with **`BatchProcessor`** attribute using **`TypedRecordHandler`** property
+4. Return **`BatchItemFailuresResponse`** from Lambda handler using **`TypedSqsBatchProcessor.Result.BatchItemFailuresResponse`**
+
+
+=== "Function.cs"
+
+ ```csharp hl_lines="1 8 19 29 32"
+ public class Product
+ {
+ public int Id { get; set; }
+ public string? Name { get; set; }
+ public decimal Price { get; set; }
+ }
+
+    public class TypedSqsRecordHandler : ITypedRecordHandler<Product> // (1)!
+    {
+        public async Task<RecordHandlerResult> HandleAsync(Product product, CancellationToken cancellationToken)
+ {
+ /*
+ * Your business logic with automatic deserialization.
+ * If an exception is thrown, the item will be marked as a partial batch item failure.
+ */
+
+ Logger.LogInformation($"Processing product {product.Id} - {product.Name} (${product.Price})");
+
+ if (product.Id == 4) // (2)!
+ {
+ throw new ArgumentException("Error on id 4");
+ }
+
+ return await Task.FromResult(RecordHandlerResult.None); // (3)!
+ }
+
+ }
+
+ [BatchProcessor(TypedRecordHandler = typeof(TypedSqsRecordHandler))]
+ public BatchItemFailuresResponse HandlerUsingTypedAttribute(SQSEvent _)
+ {
+ return TypedSqsBatchProcessor.Result.BatchItemFailuresResponse; // (4)!
+ }
+
+ ```
+
+ 1. **Step 1**. Creates a class that implements ITypedRecordHandler interface - Product is automatically deserialized from SQS message body.
+ 2. **Step 2**. You can have custom logic inside the record handler and throw exceptions that will cause this message to fail
+ 3. **Step 3**. RecordHandlerResult can return empty (None) or some data.
+ 4. **Step 4**. Lambda function returns the Partial batch response using TypedSqsBatchProcessor
+
+=== "Sample event"
+
+ ```json
+ {
+ "Records": [
+ {
+ "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
+ "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
+ "body": "{\"id\": 1, \"name\": \"Laptop Computer\", \"price\": 999.99}",
+ "attributes": {
+ "ApproximateReceiveCount": "1",
+ "SentTimestamp": "1545082649183",
+ "SenderId": "SENDER_ID",
+ "ApproximateFirstReceiveTimestamp": "1545082649185"
+ },
+ "messageAttributes": {},
+ "md5OfBody": "7b270e59b47ff90a553787216d55d91d",
+ "eventSource": "aws:sqs",
+          "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
+ "awsRegion": "us-east-1"
+ },
+ {
+ "messageId": "244fc6b4-87a3-44ab-83d2-361172410c3a",
+ "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
+ "body": "{\"id\": 4, \"name\": \"Invalid Product\", \"price\": -10.00}",
+ "attributes": {
+ "ApproximateReceiveCount": "1",
+ "SentTimestamp": "1545082649183",
+ "SenderId": "SENDER_ID",
+ "ApproximateFirstReceiveTimestamp": "1545082649185"
+ },
+ "messageAttributes": {},
+ "md5OfBody": "7b270e59b47ff90a553787216d55d92e",
+ "eventSource": "aws:sqs",
+          "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
+ "awsRegion": "us-east-1"
+ }
+ ]
+ }
+
+ ```
+
+=== "Sample response"
+
+ The second record failed to be processed, therefore the processor added its message ID in the response.
+
+ ```json
+ {
+ "batchItemFailures": [
+ {
+ "itemIdentifier": "244fc6b4-87a3-44ab-83d2-361172410c3a"
+ }
+ ]
+ }
+ ```
+
+#### Using Handler decorator (Traditional)
+
+Processing batches from SQS using Lambda handler decorator works in four stages:
+
+1. Decorate your handler with **`BatchProcessor`** attribute
+2. Create a class that implements **`ISqsRecordHandler`** interface and the HandleAsync method.
+3. Pass the type of that class to **`RecordHandler`** property of the **`BatchProcessor`** attribute
+4. Return **`BatchItemFailuresResponse`** from Lambda handler using **`SqsBatchProcessor.Result.BatchItemFailuresResponse`**
+
+=== "Function.cs"
+
+ ```csharp hl_lines="1 12 22 17 25"
+ public class CustomSqsRecordHandler : ISqsRecordHandler // (1)!
+ {
+        public async Task<RecordHandlerResult> HandleAsync(SQSEvent.SQSMessage record, CancellationToken cancellationToken)
+ {
+ /*
+ * Your business logic.
+ * If an exception is thrown, the item will be marked as a partial batch item failure.
+ */
+
+            var product = JsonSerializer.Deserialize<Product>(record.Body);
+
+ if (product.Id == 4) // (2)!
+ {
+ throw new ArgumentException("Error on id 4");
+ }
+
+ return await Task.FromResult(RecordHandlerResult.None); // (3)!
+ }
+
+ }
+
+ [BatchProcessor(RecordHandler = typeof(CustomSqsRecordHandler))]
+ public BatchItemFailuresResponse HandlerUsingAttribute(SQSEvent _)
+ {
+ return SqsBatchProcessor.Result.BatchItemFailuresResponse; // (4)!
+ }
+
+ ```
+
+ 1. **Step 1**. Creates a class that implements ISqsRecordHandler interface and the HandleAsync method.
+ 2. **Step 2**. You can have custom logic inside the record handler and throw exceptions that will cause this message to fail
+ 3. **Step 3**. RecordHandlerResult can return empty (None) or some data.
+    4. **Step 4**. Lambda function returns the Partial batch response
+
+=== "Sample event"
+
+ ```json
+ {
+ "Records": [
+ {
+ "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
+ "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
+ "body": "{\"Id\":1,\"Name\":\"product-4\",\"Price\":14}",
+ "attributes": {
+ "ApproximateReceiveCount": "1",
+ "SentTimestamp": "1545082649183",
+ "SenderId": "SENDER_ID",
+ "ApproximateFirstReceiveTimestamp": "1545082649185"
+ },
+ "messageAttributes": {},
+ "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
+ "eventSource": "aws:sqs",
+ "eventSourceARN": "arn:aws:sqs:us-east-2: 123456789012:my-queue",
+ "awsRegion": "us-east-1"
+ },
+ {
+ "messageId": "244fc6b4-87a3-44ab-83d2-361172410c3a",
+ "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
+ "body": "fail",
+ "attributes": {
+ "ApproximateReceiveCount": "1",
+ "SentTimestamp": "1545082649183",
+ "SenderId": "SENDER_ID",
+ "ApproximateFirstReceiveTimestamp": "1545082649185"
+ },
+ "messageAttributes": {},
+ "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
+ "eventSource": "aws:sqs",
+          "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
+ "awsRegion": "us-east-1"
+ },
+ {
+ "messageId": "213f4fd3-84a4-4667-a1b9-c277964197d9",
+ "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
+ "body": "{\"Id\":4,\"Name\":\"product-4\",\"Price\":14}",
+ "attributes": {
+ "ApproximateReceiveCount": "1",
+ "SentTimestamp": "1545082649183",
+ "SenderId": "SENDER_ID",
+ "ApproximateFirstReceiveTimestamp": "1545082649185"
+ },
+ "messageAttributes": {},
+ "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
+ "eventSource": "aws:sqs",
+          "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
+ "awsRegion": "us-east-1"
+        }
+ ]
+ }
+
+ ```
+
+=== "Sample response"
+
+    The second and third records failed to be processed, therefore the processor added their message IDs in the response.
+
+ ```json
+ {
+ "batchItemFailures": [
+ {
+ "itemIdentifier": "244fc6b4-87a3-44ab-83d2-361172410c3a"
+ },
+ {
+ "itemIdentifier": "213f4fd3-84a4-4667-a1b9-c277964197d9"
+ }
+ ]
+ }
+ ```
+
+#### FIFO queues
+
+When using [SQS FIFO queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html){target="_blank"}, we will stop processing messages after the first failure, and return all failed and unprocessed messages in `batchItemFailures`.
+This helps preserve the ordering of messages in your queue. Powertools automatically detects a FIFO queue.
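+
+As a minimal sketch of what this looks like in practice (reusing the `CustomSqsRecordHandler` from the SQS example above; the handler name below is illustrative), no code changes are needed for FIFO queues, although you can also make the stop-on-first-failure behaviour explicit via the `ErrorHandlingPolicy` property described later in this page:
+
+```csharp
+// Powertools inspects the EventSourceArn (a ".fifo" suffix) and stops on the
+// first failure automatically; setting the policy here just makes it explicit.
+[BatchProcessor(RecordHandler = typeof(CustomSqsRecordHandler),
+    ErrorHandlingPolicy = BatchProcessorErrorHandlingPolicy.StopOnFirstBatchItemFailure)]
+public BatchItemFailuresResponse FifoHandlerUsingAttribute(SQSEvent _)
+{
+    return SqsBatchProcessor.Result.BatchItemFailuresResponse;
+}
+```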
+
+
+
+
+
+### Processing messages from Kinesis
+
+#### Using Typed Handler decorator (Recommended)
+
+Processing batches from Kinesis using the typed Lambda handler decorator with automatic deserialization works in four stages:
+
+1. Define your data model class
+2. Create a class that implements **`ITypedRecordHandler`** interface and the HandleAsync method
+3. Decorate your handler with **`BatchProcessor`** attribute using **`TypedRecordHandler`** property
+4. Return **`BatchItemFailuresResponse`** from Lambda handler using **`TypedKinesisEventBatchProcessor.Result.BatchItemFailuresResponse`**
+
+=== "Function.cs"
+
+ ```csharp hl_lines="1 9 15 20 24 27"
+ public class Order
+ {
+ public string? OrderId { get; set; }
+ public DateTime OrderDate { get; set; }
+        public List<string> Items { get; set; } = new();
+ public decimal TotalAmount { get; set; }
+ }
+
+    internal class TypedKinesisRecordHandler : ITypedRecordHandler<Order> // (1)!
+ {
+        public async Task<RecordHandlerResult> HandleAsync(Order order, CancellationToken cancellationToken)
+ {
+ Logger.LogInformation($"Processing order {order.OrderId} with {order.Items.Count} items");
+
+ if (order.TotalAmount <= 0) // (2)!
+ {
+ throw new ArgumentException("Invalid order total");
+ }
+
+ return await Task.FromResult(RecordHandlerResult.None); // (3)!
+ }
+ }
+
+ [BatchProcessor(TypedRecordHandler = typeof(TypedKinesisRecordHandler))]
+ public BatchItemFailuresResponse HandlerUsingTypedAttribute(KinesisEvent _)
+ {
+ return TypedKinesisEventBatchProcessor.Result.BatchItemFailuresResponse; // (4)!
+ }
+
+ ```
+
+ 1. **Step 1**. Creates a class that implements ITypedRecordHandler interface - Order is automatically deserialized from Kinesis record data.
+ 2. **Step 2**. You can have custom logic inside the record handler and throw exceptions that will cause this message to fail
+ 3. **Step 3**. RecordHandlerResult can return empty (None) or some data.
+ 4. **Step 4**. Lambda function returns the Partial batch response using TypedKinesisEventBatchProcessor
+
+#### Using Handler decorator (Traditional)
+
+Processing batches from Kinesis using the Lambda handler decorator works in four stages:
+
+1. Decorate your handler with **`BatchProcessor`** attribute
+2. Create a class that implements **`IKinesisEventRecordHandler`** interface and the HandleAsync method.
+3. Pass the type of that class to **`RecordHandler`** property of the **`BatchProcessor`** attribute
+4. Return **`BatchItemFailuresResponse`** from Lambda handler using **`KinesisEventBatchProcessor.Result.BatchItemFailuresResponse`**
+
+=== "Function.cs"
+
+ ```csharp hl_lines="1 7 12 17 20"
+ internal class CustomKinesisEventRecordHandler : IKinesisEventRecordHandler // (1)!
+ {
+        public async Task<RecordHandlerResult> HandleAsync(KinesisEvent.KinesisEventRecord record, CancellationToken cancellationToken)
+ {
+            var product = JsonSerializer.Deserialize<Product>(record.Kinesis.Data);
+
+ if (product.Id == 4) // (2)!
+ {
+ throw new ArgumentException("Error on id 4");
+ }
+
+ return await Task.FromResult(RecordHandlerResult.None); // (3)!
+ }
+ }
+
+
+ [BatchProcessor(RecordHandler = typeof(CustomKinesisEventRecordHandler))]
+ public BatchItemFailuresResponse HandlerUsingAttribute(KinesisEvent _)
+ {
+ return KinesisEventBatchProcessor.Result.BatchItemFailuresResponse; // (4)!
+ }
+
+ ```
+
+ 1. **Step 1**. Creates a class that implements the IKinesisEventRecordHandler interface and the HandleAsync method.
+ 2. **Step 2**. You can have custom logic inside the record handler and throw exceptions that will cause this message to fail
+ 3. **Step 3**. RecordHandlerResult can return empty (None) or some data.
+    4. **Step 4**. Lambda function returns the Partial batch response
+
+=== "Sample event"
+
+ ```json
+    {
+      "Records": [
+        {
+          "kinesis": {
+            "kinesisSchemaVersion": "1.0",
+            "partitionKey": "1",
+            "sequenceNumber": "4107859083838847772757075016638967700161652046362460162",
+            "data": "eyJJZCI6MSwiTmFtZSI6InByb2R1Y3QtMSIsIlByaWNlIjoxNH0=",
+            "approximateArrivalTimestamp": 1545084650.987
+          },
+          "eventSource": "aws:kinesis",
+          "eventVersion": "1.0",
+          "eventID": "shardId-000000000006:4107859083838847772757075016638967700161652046362460162",
+          "eventName": "aws:kinesis:record",
+          "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
+          "awsRegion": "us-east-2",
+          "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
+        },
+        {
+          "kinesis": {
+            "kinesisSchemaVersion": "1.0",
+            "partitionKey": "1",
+            "sequenceNumber": "4107859083838847772757075016638967700161652046362460163",
+            "data": "ZmFpbA==",
+            "approximateArrivalTimestamp": 1545084650.987
+          },
+          "eventSource": "aws:kinesis",
+          "eventVersion": "1.0",
+          "eventID": "shardId-000000000006:4107859083838847772757075016638967700161652046362460163",
+          "eventName": "aws:kinesis:record",
+          "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
+          "awsRegion": "us-east-2",
+          "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
+        },
+        {
+          "kinesis": {
+            "kinesisSchemaVersion": "1.0",
+            "partitionKey": "1",
+            "sequenceNumber": "4107859083838847772757075016638967700161652046362460164",
+            "data": "eyJJZCI6NCwiTmFtZSI6InByb2R1Y3QtNCIsIlByaWNlIjoxNH0=",
+            "approximateArrivalTimestamp": 1545084650.987
+          },
+          "eventSource": "aws:kinesis",
+          "eventVersion": "1.0",
+          "eventID": "shardId-000000000006:4107859083838847772757075016638967700161652046362460164",
+          "eventName": "aws:kinesis:record",
+          "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
+          "awsRegion": "us-east-2",
+          "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
+        }
+      ]
+    }
+
+ ```
+
+=== "Sample response"
+
+    The second and third records failed to be processed, therefore the processor added their sequence numbers in the response.
+
+ ```json
+ {
+ "batchItemFailures": [
+ {
+          "itemIdentifier": "4107859083838847772757075016638967700161652046362460163"
+ },
+ {
+          "itemIdentifier": "4107859083838847772757075016638967700161652046362460164"
+ }
+ ]
+ }
+ ```
+
+### Processing messages from DynamoDB
+
+#### Using Typed Handler decorator (Recommended)
+
+Processing batches from DynamoDB Streams using the typed Lambda handler decorator with automatic deserialization works in four stages:
+
+1. Define your data model class
+2. Create a class that implements **`ITypedRecordHandler`** interface and the HandleAsync method
+3. Decorate your handler with **`BatchProcessor`** attribute using **`TypedRecordHandler`** property
+4. Return **`BatchItemFailuresResponse`** from Lambda handler using **`TypedDynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse`**
+
+=== "Function.cs"
+
+ ```csharp hl_lines="1 9 15 20 24 27"
+ public class Customer
+ {
+ public string? CustomerId { get; set; }
+ public string? Name { get; set; }
+ public string? Email { get; set; }
+ public DateTime CreatedAt { get; set; }
+ }
+
+    internal class TypedDynamoDbRecordHandler : ITypedRecordHandler<Customer> // (1)!
+ {
+        public async Task<RecordHandlerResult> HandleAsync(Customer customer, CancellationToken cancellationToken)
+ {
+ Logger.LogInformation($"Processing customer {customer.CustomerId} - {customer.Name}");
+
+ if (string.IsNullOrEmpty(customer.Email)) // (2)!
+ {
+ throw new ArgumentException("Customer email is required");
+ }
+
+ return await Task.FromResult(RecordHandlerResult.None); // (3)!
+ }
+ }
+
+ [BatchProcessor(TypedRecordHandler = typeof(TypedDynamoDbRecordHandler))]
+ public BatchItemFailuresResponse HandlerUsingTypedAttribute(DynamoDBEvent _)
+ {
+ return TypedDynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse; // (4)!
+ }
+
+ ```
+
+ 1. **Step 1**. Creates a class that implements ITypedRecordHandler interface - Customer is automatically deserialized from DynamoDB stream record.
+ 2. **Step 2**. You can have custom logic inside the record handler and throw exceptions that will cause this message to fail
+ 3. **Step 3**. RecordHandlerResult can return empty (None) or some data.
+ 4. **Step 4**. Lambda function returns the Partial batch response using TypedDynamoDbStreamBatchProcessor
+
+#### Using Handler decorator (Traditional)
+
+Processing batches from DynamoDB Streams using the Lambda handler decorator works in four stages:
+
+1. Decorate your handler with **`BatchProcessor`** attribute
+2. Create a class that implements **`IDynamoDbStreamRecordHandler`** and the HandleAsync method.
+3. Pass the type of that class to **`RecordHandler`** property of the **`BatchProcessor`** attribute
+4. Return **`BatchItemFailuresResponse`** from Lambda handler using **`DynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse`**
+
+=== "Function.cs"
+
+ ```csharp hl_lines="1 7 12 17 20"
+ internal class CustomDynamoDbStreamRecordHandler : IDynamoDbStreamRecordHandler // (1)!
+ {
+        public async Task<RecordHandlerResult> HandleAsync(DynamoDBEvent.DynamodbStreamRecord record, CancellationToken cancellationToken)
+ {
+            var product = JsonSerializer.Deserialize<Product>(record.Dynamodb.NewImage["Product"].S);
+
+ if (product.Id == 4) // (2)!
+ {
+ throw new ArgumentException("Error on id 4");
+ }
+
+ return await Task.FromResult(RecordHandlerResult.None); // (3)!
+ }
+ }
+
+
+ [BatchProcessor(RecordHandler = typeof(CustomDynamoDbStreamRecordHandler))]
+ public BatchItemFailuresResponse HandlerUsingAttribute(DynamoDBEvent _)
+ {
+ return DynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse; // (4)!
+ }
+
+ ```
+
+ 1. **Step 1**. Creates a class that implements the IDynamoDbStreamRecordHandler and the HandleAsync method.
+ 2. **Step 2**. You can have custom logic inside the record handler and throw exceptions that will cause this message to fail
+ 3. **Step 3**. RecordHandlerResult can return empty (None) or some data.
+    4. **Step 4**. Lambda function returns the Partial batch response
+
+=== "Sample event"
+
+ ```json
+ {
+ "Records": [
+ {
+ "eventID": "1",
+ "eventVersion": "1.0",
+ "dynamodb": {
+ "Keys": {
+ "Id": {
+ "N": "101"
+ }
+ },
+ "NewImage": {
+ "Product": {
+ "S": "{\"Id\":1,\"Name\":\"product-name\",\"Price\":14}"
+ }
+ },
+ "StreamViewType": "NEW_AND_OLD_IMAGES",
+ "SequenceNumber": "3275880929",
+ "SizeBytes": 26
+ },
+ "awsRegion": "us-west-2",
+ "eventName": "INSERT",
+ "eventSourceARN": "eventsource_arn",
+ "eventSource": "aws:dynamodb"
+ },
+ {
+ "eventID": "1",
+ "eventVersion": "1.0",
+ "dynamodb": {
+ "Keys": {
+ "Id": {
+ "N": "101"
+ }
+ },
+ "NewImage": {
+ "Product": {
+ "S": "fail"
+ }
+ },
+ "StreamViewType": "NEW_AND_OLD_IMAGES",
+ "SequenceNumber": "8640712661",
+ "SizeBytes": 26
+ },
+ "awsRegion": "us-west-2",
+ "eventName": "INSERT",
+ "eventSourceARN": "eventsource_arn",
+ "eventSource": "aws:dynamodb"
+ }
+ ]
+ }
+
+ ```
+
+=== "Sample response"
+
+    The second record failed to be processed, therefore the processor added its sequence number in the response.
+
+ ```json
+ {
+ "batchItemFailures": [
+ {
+ "itemIdentifier": "8640712661"
+ }
+ ]
+ }
+ ```
+
+### Error handling
+
+By default, we catch any exception raised by your custom record handler HandleAsync method (ISqsRecordHandler, IKinesisEventRecordHandler, IDynamoDbStreamRecordHandler).
+This allows us to **(1)** continue processing the batch, **(2)** collect each batch item that failed processing, and **(3)** return the appropriate response correctly without failing your Lambda function execution.
+
+=== "Function.cs"
+
+ ```csharp hl_lines="14"
+ public class CustomSqsRecordHandler : ISqsRecordHandler // (1)!
+ {
+        public async Task<RecordHandlerResult> HandleAsync(SQSEvent.SQSMessage record, CancellationToken cancellationToken)
+ {
+ /*
+ * Your business logic.
+ * If an exception is thrown, the item will be marked as a partial batch item failure.
+ */
+
+            var product = JsonSerializer.Deserialize<Product>(record.Body);
+
+ if (product.Id == 4) // (2)!
+ {
+ throw new ArgumentException("Error on id 4");
+ }
+
+ return await Task.FromResult(RecordHandlerResult.None); // (3)!
+ }
+
+ }
+
+ ```
+
+=== "Sample event"
+
+ ```json
+ {
+ "Records": [
+ {
+ "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
+ "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
+          "body": "{\"Id\":1,\"Name\":\"product-1\",\"Price\":14}",
+ "attributes": {
+ "ApproximateReceiveCount": "1",
+ "SentTimestamp": "1545082649183",
+ "SenderId": "SENDER_ID",
+ "ApproximateFirstReceiveTimestamp": "1545082649185"
+ },
+ "messageAttributes": {},
+ "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
+ "eventSource": "aws:sqs",
+          "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
+ "awsRegion": "us-east-1"
+ },
+ {
+ "messageId": "244fc6b4-87a3-44ab-83d2-361172410c3a",
+ "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
+ "body": "fail",
+ "attributes": {
+ "ApproximateReceiveCount": "1",
+ "SentTimestamp": "1545082649183",
+ "SenderId": "SENDER_ID",
+ "ApproximateFirstReceiveTimestamp": "1545082649185"
+ },
+ "messageAttributes": {},
+ "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
+ "eventSource": "aws:sqs",
+          "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
+ "awsRegion": "us-east-1"
+ },
+ {
+ "messageId": "213f4fd3-84a4-4667-a1b9-c277964197d9",
+ "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
+ "body": "{\"Id\":4,\"Name\":\"product-4\",\"Price\":14}",
+ "attributes": {
+ "ApproximateReceiveCount": "1",
+ "SentTimestamp": "1545082649183",
+ "SenderId": "SENDER_ID",
+ "ApproximateFirstReceiveTimestamp": "1545082649185"
+ },
+ "messageAttributes": {},
+ "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
+ "eventSource": "aws:sqs",
+          "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
+ "awsRegion": "us-east-1"
+        }
+ ]
+ }
+
+ ```
+
+=== "Sample response"
+
+    The second and third records failed to be processed, therefore the processor added their message IDs in the response.
+
+ ```json
+ {
+ "batchItemFailures": [
+ {
+ "itemIdentifier": "244fc6b4-87a3-44ab-83d2-361172410c3a"
+ },
+ {
+ "itemIdentifier": "213f4fd3-84a4-4667-a1b9-c277964197d9"
+ }
+ ]
+ }
+ ```
+
+#### Error Handling Policy
+
+You can specify the error handling policy applied during batch processing.
+
+`ErrorHandlingPolicy` controls how failures during batch item processing are handled.
+With the default value of `DeriveFromEvent`, the specific BatchProcessor determines the policy based on the incoming event.
+
+For example, the `SqsBatchProcessor` looks at the EventSourceArn to determine if the ErrorHandlingPolicy should be `StopOnFirstBatchItemFailure` (for FIFO queues) or `ContinueOnBatchItemFailure` (for standard queues).
+With `StopOnFirstBatchItemFailure`, the batch processor stops processing and marks any remaining records as batch item failures.
+With `ContinueOnBatchItemFailure`, the batch processor continues processing batch items regardless of item failures.
+
+| Policy | Description |
+| ------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------- |
+| **DeriveFromEvent** | Auto-derive the policy based on the event. |
+| **ContinueOnBatchItemFailure**  | Continue processing regardless of whether other batch items fail during processing.                                                          |
+| **StopOnFirstBatchItemFailure** | Stop processing other batch items after the first batch item has failed processing. This is useful to preserve ordered processing of events. |
+
+!!! note
+
+    When using **StopOnFirstBatchItemFailure** with parallel processing enabled, all batch items already scheduled for processing will be allowed to complete before batch processing stops.
+
+    Therefore, if order is important, it is recommended to use sequential (non-parallel) processing together with this policy.
+
+To change the default error handling policy, you can set the **`POWERTOOLS_BATCH_ERROR_HANDLING_POLICY`** Environment Variable.
+
+Another approach is to decorate the handler and set one of the policies via the **`ErrorHandlingPolicy`** property of the **`BatchProcessor`** attribute.
+
+=== "Function.cs"
+
+ ```csharp hl_lines="2"
+ [BatchProcessor(RecordHandler = typeof(CustomDynamoDbStreamRecordHandler),
+ ErrorHandlingPolicy = BatchProcessorErrorHandlingPolicy.StopOnFirstBatchItemFailure)]
+ public BatchItemFailuresResponse HandlerUsingAttribute(DynamoDBEvent _)
+ {
+ return DynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse;
+ }
+
+ ```
+
+### Partial failure mechanics
+
+All records in the batch will be passed to this handler for processing, even if exceptions are thrown. Here's the behaviour after completing the batch:
+
+- **All records successfully processed**. We will return an empty list of item failures `{'batchItemFailures': []}`.
+- **Partial success with some exceptions**. We will return a list of all item IDs/sequence numbers that failed processing.
+- **All records failed to be processed**. By default, we will throw a `BatchProcessingException` with a list of all exceptions raised during processing to reflect the failure in your operational metrics. However, in some scenarios, this might not be desired. See [Working with full batch failures](#working-with-full-batch-failures) for more information.
+
+The following sequence diagrams explain how each Batch processor behaves under different scenarios.
+
+#### SQS Standard
+
+> Read more about [Batch Failure Reporting feature in AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting){target="_blank"}.
+
+Sequence diagram to explain how [`BatchProcessor` works](#processing-messages-from-sqs) with SQS Standard queues.
+
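+
+A minimal sketch of the flow, analogous to the FIFO diagram in the next section:
+
+```mermaid
+sequenceDiagram
+    autonumber
+    participant SQS queue
+    participant Lambda service
+    participant Lambda function
+    Lambda service->>SQS queue: Poll
+    Lambda service->>Lambda function: Invoke (batch event)
+    activate Lambda function
+    Lambda function-->Lambda function: Process each batch item
+    Lambda function--xLambda function: Fail on some batch items
+    Lambda function->>Lambda service: Report failed batch items
+    deactivate Lambda function
+    activate SQS queue
+    Lambda service->>SQS queue: Delete successful messages
+    SQS queue-->>SQS queue: Failed messages return to the queue
+    deactivate SQS queue
+```
+SQS Standard mechanism with Batch Item Failures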
+
+
+#### SQS FIFO
+
+> Read more about [Batch Failure Reporting feature in AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting){target="_blank"}.
+
+Sequence diagram to explain how [`SqsFifoPartialProcessor` works](#fifo-queues) with SQS FIFO queues.
+
+
+```mermaid
+sequenceDiagram
+ autonumber
+ participant SQS queue
+ participant Lambda service
+ participant Lambda function
+ Lambda service->>SQS queue: Poll
+ Lambda service->>Lambda function: Invoke (batch event)
+ activate Lambda function
+ Lambda function-->Lambda function: Process 2 out of 10 batch items
+ Lambda function--xLambda function: Fail on 3rd batch item
+ Lambda function->>Lambda service: Report 3rd batch item and unprocessed messages as failure
+ deactivate Lambda function
+ activate SQS queue
+ Lambda service->>SQS queue: Delete successful messages (1-2)
+ SQS queue-->>SQS queue: Failed messages return (3-10)
+ deactivate SQS queue
+```
+SQS FIFO mechanism with Batch Item Failures
+
+
+#### Kinesis and DynamoDB Streams
+
+> Read more about [Batch Failure Reporting feature](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-batchfailurereporting){target="_blank"}.
+
+Sequence diagram to explain how `BatchProcessor` works with both [Kinesis Data Streams](#processing-messages-from-kinesis) and [DynamoDB Streams](#processing-messages-from-dynamodb).
+
+For brevity, we will use `Streams` to refer to either services. For theory on stream checkpoints, see this [blog post](https://aws.amazon.com/blogs/compute/optimizing-batch-processing-with-custom-checkpoints-in-aws-lambda/){target="_blank"}
+
+
+```mermaid
+sequenceDiagram
+ autonumber
+ participant Streams
+ participant Lambda service
+ participant Lambda function
+ Lambda service->>Streams: Poll latest records
+ Lambda service->>Lambda function: Invoke (batch event)
+ activate Lambda function
+ Lambda function-->Lambda function: Process 2 out of 10 batch items
+ Lambda function--xLambda function: Fail on 3rd batch item
+ Lambda function-->Lambda function: Continue processing batch items (4-10)
+ Lambda function->>Lambda service: Report batch item as failure (3)
+ deactivate Lambda function
+ activate Streams
+ Lambda service->>Streams: Checkpoints to sequence number from 3rd batch item
+ Lambda service->>Streams: Poll records starting from updated checkpoint
+ deactivate Streams
+```
+Kinesis and DynamoDB streams mechanism with single batch item failure
+
+
+The behavior changes slightly when there are multiple item failures. Stream checkpoint is updated to the lowest sequence number reported.
+
+!!! important "Note that the batch item sequence number could be different from batch item number in the illustration."
+
+
+```mermaid
+sequenceDiagram
+ autonumber
+ participant Streams
+ participant Lambda service
+ participant Lambda function
+ Lambda service->>Streams: Poll latest records
+ Lambda service->>Lambda function: Invoke (batch event)
+ activate Lambda function
+ Lambda function-->Lambda function: Process 2 out of 10 batch items
+ Lambda function--xLambda function: Fail on 3-5 batch items
+ Lambda function-->Lambda function: Continue processing batch items (6-10)
+ Lambda function->>Lambda service: Report batch items as failure (3-5)
+ deactivate Lambda function
+ activate Streams
+ Lambda service->>Streams: Checkpoints to lowest sequence number
+ Lambda service->>Streams: Poll records starting from updated checkpoint
+ deactivate Streams
+```
+Kinesis and DynamoDB streams mechanism with multiple batch item failures
+
+
+## Typed Batch Processing Advanced Features
+
+### AOT (Ahead-of-Time) Compilation Support
+
+For Native AOT scenarios, you can configure JsonSerializerContext:
+
+=== "JsonSerializerContext Configuration"
+
+ ```csharp
+ [JsonSerializable(typeof(Product))]
+ [JsonSerializable(typeof(Order))]
+ [JsonSerializable(typeof(Customer))]
+    [JsonSerializable(typeof(List<Product>))]
+ [JsonSourceGenerationOptions(
+ PropertyNamingPolicy = JsonKnownNamingPolicy.CamelCase,
+ WriteIndented = false,
+ DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull)]
+ public partial class MyJsonSerializerContext : JsonSerializerContext
+ {
+ }
+ ```
+
+=== "Using with Attribute"
+
+ ```csharp hl_lines="2 3"
+ [BatchProcessor(
+ TypedRecordHandler = typeof(TypedSqsRecordHandler),
+ JsonSerializerContext = typeof(MyJsonSerializerContext))]
+ public BatchItemFailuresResponse ProcessWithAot(SQSEvent sqsEvent)
+ {
+ return TypedSqsBatchProcessor.Result.BatchItemFailuresResponse;
+ }
+ ```
+
+
+
+### Lambda Context Injection
+
+For typed handlers that need access to Lambda context, use `ITypedRecordHandlerWithContext`:
+
+=== "Handler with Context"
+
+ ```csharp hl_lines="1 3"
+    public class ProductHandlerWithContext : ITypedRecordHandlerWithContext<Product>
+ {
+        public async Task<RecordHandlerResult> HandleAsync(Product product, ILambdaContext context, CancellationToken cancellationToken)
+ {
+ Logger.LogInformation($"Processing product {product.Id} in request {context.AwsRequestId}");
+ Logger.LogInformation($"Remaining time: {context.RemainingTime.TotalSeconds}s");
+
+ // Use context for timeout handling
+ if (context.RemainingTime.TotalSeconds < 5)
+ {
+ Logger.LogWarning("Low remaining time, processing quickly");
+ }
+
+ return RecordHandlerResult.None;
+ }
+ }
+ ```
+
+=== "Function Usage"
+
+ ```csharp hl_lines="1 2"
+ [BatchProcessor(TypedRecordHandlerWithContext = typeof(ProductHandlerWithContext))]
+ public BatchItemFailuresResponse ProcessWithContext(SQSEvent sqsEvent, ILambdaContext context)
+ {
+ return TypedSqsBatchProcessor.Result.BatchItemFailuresResponse;
+ }
+ ```
+
+### Migration from Traditional to Typed Handlers
+
+You can gradually migrate from traditional to typed handlers:
+
+=== "Before (Traditional)"
+
+ ```csharp hl_lines="1 6"
+ public class TraditionalSqsHandler : ISqsRecordHandler
+ {
+        public async Task<RecordHandlerResult> HandleAsync(SQSEvent.SQSMessage record, CancellationToken cancellationToken)
+ {
+ // Manual deserialization
+            var product = JsonSerializer.Deserialize<Product>(record.Body);
+
+ Logger.LogInformation($"Processing product {product.Id}");
+
+ if (product.Price < 0)
+ throw new ArgumentException("Invalid price");
+
+ return RecordHandlerResult.None;
+ }
+ }
+
+ [BatchProcessor(RecordHandler = typeof(TraditionalSqsHandler))]
+ public BatchItemFailuresResponse ProcessSqs(SQSEvent sqsEvent)
+ {
+ return SqsBatchProcessor.Result.BatchItemFailuresResponse;
+ }
+ ```
+
+=== "After (Typed)"
+
+ ```csharp hl_lines="1 5"
+    public class TypedSqsHandler : ITypedRecordHandler<Product>
+ {
+        public async Task<RecordHandlerResult> HandleAsync(Product product, CancellationToken cancellationToken)
+ {
+ // Automatic deserialization - product is already deserialized!
+ Logger.LogInformation($"Processing product {product.Id}");
+
+ // Same business logic
+ if (product.Price < 0)
+ throw new ArgumentException("Invalid price");
+
+ return RecordHandlerResult.None;
+ }
+ }
+
+ [BatchProcessor(TypedRecordHandler = typeof(TypedSqsHandler))]
+ public BatchItemFailuresResponse ProcessSqs(SQSEvent sqsEvent)
+ {
+ return TypedSqsBatchProcessor.Result.BatchItemFailuresResponse;
+ }
+ ```
+
+### Error Handling with Typed Processors
+
+Typed processors support the same error handling policies as traditional processors:
+
+=== "Custom Error Handling"
+
+ ```csharp hl_lines="2"
+ [BatchProcessor(
+ TypedRecordHandler = typeof(TypedSqsHandler),
+ ErrorHandlingPolicy = BatchProcessorErrorHandlingPolicy.StopOnFirstBatchItemFailure)]
+ public BatchItemFailuresResponse ProcessWithErrorPolicy(SQSEvent sqsEvent)
+ {
+ return TypedSqsBatchProcessor.Result.BatchItemFailuresResponse;
+ }
+ ```
+
+### Advanced
+
+#### Using utility outside handler and IoC
+
+You can use Batch processing without using the decorator.
+
+Call the **`ProcessAsync`** method on the `Instance` of the corresponding BatchProcessor (`SqsBatchProcessor`, `DynamoDbStreamBatchProcessor`, `KinesisEventBatchProcessor`):
+
+=== "Function.cs"
+
+ ```csharp hl_lines="3"
+    public async Task<BatchItemFailuresResponse> HandlerUsingUtility(DynamoDBEvent dynamoDbEvent)
+ {
+        var result = await DynamoDbStreamBatchProcessor.Instance.ProcessAsync(dynamoDbEvent, RecordHandler<DynamoDBEvent.DynamodbStreamRecord>.From(record =>
+ {
+            var product = JsonSerializer.Deserialize<JsonElement>(record.Dynamodb.NewImage["Product"].S);
+
+ if (product.GetProperty("Id").GetInt16() == 4)
+ {
+ throw new ArgumentException("Error on 4");
+ }
+ }));
+ return result.BatchItemFailuresResponse;
+ }
+
+ ```
+
+To make the handler testable you can use Dependency Injection to resolve the BatchProcessor (`SqsBatchProcessor`, `DynamoDbStreamBatchProcessor`, `KinesisEventBatchProcessor`) instance and then call the **`ProcessAsync`** method.
+
+=== "GetRequiredService inside the method"
+
+ ```csharp hl_lines="3 4 5"
+    public async Task<BatchItemFailuresResponse> HandlerUsingUtilityFromIoc(DynamoDBEvent dynamoDbEvent)
+ {
+        var batchProcessor = Services.Provider.GetRequiredService<IDynamoDbStreamBatchProcessor>();
+        var recordHandler = Services.Provider.GetRequiredService<IDynamoDbStreamRecordHandler>();
+ var result = await batchProcessor.ProcessAsync(dynamoDbEvent, recordHandler);
+ return result.BatchItemFailuresResponse;
+ }
+
+ ```
+
+=== "Injecting method parameters"
+
+ ```csharp hl_lines="2 4"
+    public async Task<BatchItemFailuresResponse> HandlerUsingUtilityFromIoc(DynamoDBEvent dynamoDbEvent,
+ IDynamoDbStreamBatchProcessor batchProcessor, IDynamoDbStreamRecordHandler recordHandler)
+ {
+ var result = await batchProcessor.ProcessAsync(dynamoDbEvent, recordHandler);
+ return result.BatchItemFailuresResponse;
+ }
+
+ ```
+
+=== "Example implementation of IServiceProvider"
+
+ ```csharp hl_lines="16 17"
+ internal class Services
+ {
+        private static readonly Lazy<IServiceProvider> LazyInstance = new(Build);
+
+ private static ServiceCollection _services;
+ public static IServiceProvider Provider => LazyInstance.Value;
+
+ public static IServiceProvider Init()
+ {
+ return LazyInstance.Value;
+ }
+
+ private static IServiceProvider Build()
+ {
+ _services = new ServiceCollection();
+            _services.AddScoped<IDynamoDbStreamBatchProcessor, DynamoDbStreamBatchProcessor>();
+            _services.AddScoped<IDynamoDbStreamRecordHandler, CustomDynamoDbStreamRecordHandler>();
+ return _services.BuildServiceProvider();
+ }
+ }
+
+ ```
+
+#### Processing messages in parallel
+
+You can set the `POWERTOOLS_BATCH_PARALLEL_ENABLED` Environment Variable to `true` or set the property `BatchParallelProcessingEnabled` on the Lambda decorator to process messages concurrently.
+
+You can also set the `POWERTOOLS_BATCH_MAX_DEGREE_OF_PARALLELISM` Environment Variable to the desired degree of parallelism.
+
+!!! note
+
+ MaxDegreeOfParallelism is used to control the parallelism of the batch item processing.
+
+    With a value of 1, the processing is done sequentially (default). Sequential processing is recommended when preserving order is important - i.e. with SQS FIFO queues.
+
+    With a value > 1, the processing is done in parallel. Parallel processing can enable the batch to complete faster, i.e., when processing involves downstream service calls.
+
+    With a value of -1, the parallelism is automatically configured to be the vCPU count of the Lambda function. Internally, the Batch Processing utility uses the `Parallel.ForEachAsync` method and the `ParallelOptions.MaxDegreeOfParallelism` property to enable this functionality.
+
+???+ question "When is this useful?"
+ Your use case might be able to process multiple records at the same time without conflicting with one another.
+
+ For example, imagine you need to process multiple loyalty points and incrementally save in a database. While you await the database to confirm your records are saved, you could start processing another request concurrently.
+
+ The reason this is not the default behaviour is that not all use cases can handle concurrency safely (e.g., loyalty points must be updated in order).
+
+=== "Function.cs"
+
+ ```csharp hl_lines="1"
+ [BatchProcessor(RecordHandler = typeof(CustomDynamoDbStreamRecordHandler), BatchParallelProcessingEnabled = true )]
+ public BatchItemFailuresResponse HandlerUsingAttribute(DynamoDBEvent _)
+ {
+ return DynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse;
+ }
+ ```
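+
+As a sketch, the decorator also exposes a `MaxDegreeOfParallelism` property mirroring the environment variable above (verify the property name against your Powertools version; the handler name below is illustrative):
+
+```csharp
+// Hypothetical tuning example: process up to 4 records concurrently.
+// A value of -1 would use the vCPU count of the function instead.
+[BatchProcessor(RecordHandler = typeof(CustomDynamoDbStreamRecordHandler),
+    BatchParallelProcessingEnabled = true,
+    MaxDegreeOfParallelism = 4)]
+public BatchItemFailuresResponse HandlerUsingParallelism(DynamoDBEvent _)
+{
+    return DynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse;
+}
+```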
+
+#### Working with full batch failures
+
+By default, the `BatchProcessor` will throw a `BatchProcessingException` if all records in the batch fail to process. We do this to reflect the failure in your operational metrics.
+
+When working with functions that handle batches with a small number of records, or when you use errors as a flow control mechanism, this behavior might not be desirable as your function might generate an unnaturally high number of errors. When this happens, the [Lambda service will scale down the concurrency of your function](https://docs.aws.amazon.com/lambda/latest/dg/services-sqs-errorhandling.html#services-sqs-backoff-strategy){target="_blank"}, potentially impacting performance.
+
+For these scenarios, you can set `POWERTOOLS_BATCH_THROW_ON_FULL_BATCH_FAILURE = false`, or the equivalent on either the `BatchProcessor` decorator or on the `ProcessingOptions` object. See examples below.
+
+=== "Setting ThrowOnFullBatchFailure on Decorator"
+
+ ```csharp hl_lines="3"
+ [BatchProcessor(
+ RecordHandler = typeof(CustomSqsRecordHandler),
+ ThrowOnFullBatchFailure = false)]
+ public BatchItemFailuresResponse HandlerUsingAttribute(SQSEvent _)
+ {
+ return SqsBatchProcessor.Result.BatchItemFailuresResponse;
+ }
+
+ ```
+
+=== "Setting ThrowOnFullBatchFailure outside Decorator"
+
+ ```csharp hl_lines="8"
+    public async Task<BatchItemFailuresResponse> HandlerUsingUtility(SQSEvent sqsEvent)
+    {
+        var result = await SqsBatchProcessor.Instance.ProcessAsync(sqsEvent, RecordHandler<SQSEvent.SQSMessage>.From(x =>
+ {
+ // Inline handling of SQS message...
+ }), new ProcessingOptions
+ {
+ ThrowOnFullBatchFailure = false
+ });
+ return result.BatchItemFailuresResponse;
+ }
+
+ ```
+
+#### Extending BatchProcessor
+
+You might want to bring custom logic to the existing `BatchProcessor` to customize how successes and failures are handled.
+
+For these scenarios, you can create a class that inherits from one of the built-in processors (`SqsBatchProcessor`, `DynamoDbStreamBatchProcessor`, `KinesisEventBatchProcessor`) and override the `ProcessAsync` and `HandleRecordFailureAsync` methods:
+
+- **`ProcessAsync()`** – Keeps track of successful batch records
+- **`HandleRecordFailureAsync()`** – Keeps track of failed batch records
+
+???+ example
+    Let's suppose you'd like to add a metric named `BatchRecordFailures` for each batch record that fails processing, and also override the default error handling policy to stop on the first item failure.
+
+=== "Function.cs"
+
+ ```csharp hl_lines="1 21 54 97"
+
+ public class CustomDynamoDbStreamBatchProcessor : DynamoDbStreamBatchProcessor
+ {
+        public override async Task<ProcessingResult<DynamoDBEvent.DynamodbStreamRecord>> ProcessAsync(DynamoDBEvent @event,
+            IRecordHandler<DynamoDBEvent.DynamodbStreamRecord> recordHandler, ProcessingOptions processingOptions)
+        {
+            ProcessingResult = new ProcessingResult<DynamoDBEvent.DynamodbStreamRecord>();
+
+ // Prepare batch records (order is preserved)
+            var batchRecords = GetRecordsFromEvent(@event).Select(x => new KeyValuePair<string, DynamoDBEvent.DynamodbStreamRecord>(GetRecordId(x), x))
+ .ToArray();
+
+ // We assume all records fail by default to avoid loss of data
+            var failureBatchRecords = batchRecords.Select(x => new KeyValuePair<string, RecordFailure<DynamoDBEvent.DynamodbStreamRecord>>(x.Key,
+                new RecordFailure<DynamoDBEvent.DynamodbStreamRecord>
+ {
+ Exception = new UnprocessedRecordException($"Record: '{x.Key}' has not been processed."),
+ Record = x.Value
+ }));
+
+ // Override to fail on first failure
+ var errorHandlingPolicy = BatchProcessorErrorHandlingPolicy.StopOnFirstBatchItemFailure;
+
+            var successRecords = new Dictionary<string, RecordSuccess<DynamoDBEvent.DynamodbStreamRecord>>();
+            var failureRecords = new Dictionary<string, RecordFailure<DynamoDBEvent.DynamodbStreamRecord>>(failureBatchRecords);
+
+ try
+ {
+ foreach (var pair in batchRecords)
+ {
+ var (recordId, record) = pair;
+
+ try
+ {
+ var result = await HandleRecordAsync(record, recordHandler, CancellationToken.None);
+ failureRecords.Remove(recordId, out _);
+                        successRecords.TryAdd(recordId, new RecordSuccess<DynamoDBEvent.DynamodbStreamRecord>
+ {
+ Record = record,
+ RecordId = recordId,
+ HandlerResult = result
+ });
+ }
+ catch (Exception ex)
+ {
+ // Capture exception
+                        failureRecords[recordId] = new RecordFailure<DynamoDBEvent.DynamodbStreamRecord>
+ {
+ Exception = new RecordProcessingException(
+ $"Failed processing record: '{recordId}'. See inner exception for details.", ex),
+ Record = record,
+ RecordId = recordId
+ };
+
+ Metrics.AddMetric("BatchRecordFailures", 1, MetricUnit.Count);
+
+ try
+ {
+ // Invoke hook
+ await HandleRecordFailureAsync(record, ex);
+ }
+ catch
+ {
+ // NOOP
+ }
+
+ // Check if we should stop record processing on first error
+ // ReSharper disable once ConditionIsAlwaysTrueOrFalse
+ if (errorHandlingPolicy == BatchProcessorErrorHandlingPolicy.StopOnFirstBatchItemFailure)
+ {
+ // This causes the loop's (inner) cancellation token to be cancelled for all operations already scheduled internally
+ throw new CircuitBreakerException(
+ "Error handling policy is configured to stop processing on first batch item failure. See inner exception for details.",
+ ex);
+ }
+ }
+ }
+ }
+ catch (Exception ex) when (ex is CircuitBreakerException or OperationCanceledException)
+ {
+ // NOOP
+ }
+
+ ProcessingResult.BatchRecords.AddRange(batchRecords.Select(x => x.Value));
+ ProcessingResult.BatchItemFailuresResponse.BatchItemFailures.AddRange(failureRecords.Select(x =>
+ new BatchItemFailuresResponse.BatchItemFailure
+ {
+ ItemIdentifier = x.Key
+ }));
+ ProcessingResult.FailureRecords.AddRange(failureRecords.Values);
+
+ ProcessingResult.SuccessRecords.AddRange(successRecords.Values);
+
+ return ProcessingResult;
+ }
+
+ // ReSharper disable once RedundantOverriddenMember
+ protected override async Task HandleRecordFailureAsync(DynamoDBEvent.DynamodbStreamRecord record, Exception exception)
+ {
+ await base.HandleRecordFailureAsync(record, exception);
+ }
+ }
+ ```
+
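+To use the custom processor, you can reference it from your handler. The `BatchProcessor` attribute property below is an assumption based on the utility's bring-your-own-processor support; verify the exact property name against the API reference.
+
+=== "Handler.cs"
+
+    ```csharp hl_lines="1 2"
+    [BatchProcessor(RecordHandler = typeof(CustomDynamoDbStreamRecordHandler),
+        BatchProcessor = typeof(CustomDynamoDbStreamBatchProcessor))] // assumed property name
+    public BatchItemFailuresResponse HandlerUsingAttribute(DynamoDBEvent _)
+    {
+        return DynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse;
+    }
+    ```
+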
+## Testing your code
+
+### Testing Typed Handlers
+
+Testing typed batch processors is straightforward since you work directly with your data models:
+
+=== "Typed Handler Test"
+
+ ```csharp
+ [Fact]
+ public async Task TypedHandler_ValidProduct_ProcessesSuccessfully()
+ {
+ // Arrange
+ var product = new Product { Id = 1, Name = "Test Product", Price = 10.99m };
+ var handler = new TypedSqsRecordHandler();
+ var cancellationToken = CancellationToken.None;
+
+ // Act
+ var result = await handler.HandleAsync(product, cancellationToken);
+
+ // Assert
+ Assert.Equal(RecordHandlerResult.None, result);
+ }
+
+ [Fact]
+ public async Task TypedHandler_InvalidProduct_ThrowsException()
+ {
+ // Arrange
+ var product = new Product { Id = 4, Name = "Invalid", Price = -10 };
+ var handler = new TypedSqsRecordHandler();
+
+ // Act & Assert
+        await Assert.ThrowsAnyAsync<Exception>(() =>
+ handler.HandleAsync(product, CancellationToken.None));
+ }
+ ```
+
+=== "Integration Test"
+
+ ```csharp
+ [Fact]
+ public async Task ProcessSqsEvent_WithTypedHandler_ProcessesAllRecords()
+ {
+ // Arrange
+ var sqsEvent = new SQSEvent
+ {
+            Records = new List<SQSEvent.SQSMessage>
+ {
+ new() {
+ MessageId = "1",
+ Body = JsonSerializer.Serialize(new Product { Id = 1, Name = "Product 1", Price = 10 }),
+ EventSourceArn = "arn:aws:sqs:us-east-1:123456789012:my-queue"
+ },
+ new() {
+ MessageId = "2",
+ Body = JsonSerializer.Serialize(new Product { Id = 2, Name = "Product 2", Price = 20 }),
+ EventSourceArn = "arn:aws:sqs:us-east-1:123456789012:my-queue"
+ }
+ }
+ };
+
+ var function = new TypedFunction();
+
+ // Act
+ var result = function.HandlerUsingTypedAttribute(sqsEvent);
+
+ // Assert
+ Assert.Empty(result.BatchItemFailures);
+ }
+ ```
+
+### Testing Traditional Handlers
+
+As there are no external calls, you can easily unit test your code with `BatchProcessor`.
+
+=== "Test.cs"
+
+ ```csharp
+ [Fact]
+ public Task Sqs_Handler_Using_Attribute()
+ {
+ var request = new SQSEvent
+ {
+ Records = TestHelper.SqsMessages
+ };
+
+ var function = new HandlerFunction();
+
+ var response = function.HandlerUsingAttribute(request);
+
+ Assert.Equal(2, response.BatchItemFailures.Count);
+ Assert.Equal("2", response.BatchItemFailures[0].ItemIdentifier);
+ Assert.Equal("4", response.BatchItemFailures[1].ItemIdentifier);
+
+ return Task.CompletedTask;
+ }
+ ```
+
+=== "Function.cs"
+
+ ```csharp
+ [BatchProcessor(RecordHandler = typeof(CustomSqsRecordHandler))]
+ public BatchItemFailuresResponse HandlerUsingAttribute(SQSEvent _)
+ {
+ return SqsBatchProcessor.Result.BatchItemFailuresResponse;
+ }
+ ```
+
+=== "CustomSqsRecordHandler.cs"
+
+ ```csharp
+ public class CustomSqsRecordHandler : ISqsRecordHandler
+ {
+        public async Task<RecordHandlerResult> HandleAsync(SQSEvent.SQSMessage record, CancellationToken cancellationToken)
+        {
+            var product = JsonSerializer.Deserialize<JsonElement>(record.Body);
+
+            if (product.GetProperty("Id").GetInt16() == 4)
+ {
+ throw new ArgumentException("Error on 4");
+ }
+
+ return await Task.FromResult(RecordHandlerResult.None);
+ }
+ }
+ ```
+
+=== "SQS Event.cs"
+
+ ```csharp
+    internal static List<SQSEvent.SQSMessage> SqsMessages => new()
+ {
+ new SQSEvent.SQSMessage
+ {
+ MessageId = "1",
+ Body = "{\"Id\":1,\"Name\":\"product-4\",\"Price\":14}",
+ EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue"
+ },
+ new SQSEvent.SQSMessage
+ {
+ MessageId = "2",
+ Body = "fail",
+ EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue"
+ },
+ new SQSEvent.SQSMessage
+ {
+ MessageId = "3",
+ Body = "{\"Id\":3,\"Name\":\"product-4\",\"Price\":14}",
+ EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue"
+ },
+ new SQSEvent.SQSMessage
+ {
+ MessageId = "4",
+ Body = "{\"Id\":4,\"Name\":\"product-4\",\"Price\":14}",
+ EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue"
+ },
+ new SQSEvent.SQSMessage
+ {
+ MessageId = "5",
+ Body = "{\"Id\":5,\"Name\":\"product-4\",\"Price\":14}",
+ EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue"
+ },
+ };
+ ```
+
+## Complete Examples and Documentation
+
+The [BatchProcessing example](https://github.com/aws-powertools/powertools-lambda-dotnet/tree/develop/examples/BatchProcessing){target="_blank"} contains complete working examples:
+
+- **TypedFunction.cs** - Complete examples using all typed batch processing patterns
+- **TypedHandlers/** - Example implementations for SQS, Kinesis, and DynamoDB
+- **Sample Events** - Test events for all event types with typed data
\ No newline at end of file
diff --git a/libraries/src/AWS.Lambda.Powertools.BatchProcessing/BatchProcessorAttribute.cs b/libraries/src/AWS.Lambda.Powertools.BatchProcessing/BatchProcessorAttribute.cs
index f28f7f3be..5ff851411 100644
--- a/libraries/src/AWS.Lambda.Powertools.BatchProcessing/BatchProcessorAttribute.cs
+++ b/libraries/src/AWS.Lambda.Powertools.BatchProcessing/BatchProcessorAttribute.cs
@@ -299,19 +299,26 @@ internal IBatchProcessingAspectHandler CreateAspectHandler(IReadOnlyList