docs(batch): add example on how to integrate with sentry.io (#308)
heitorlessa committed Mar 4, 2021
1 parent 271f560 commit 153567e
Showing 1 changed file with 104 additions and 52 deletions.
156 changes: 104 additions & 52 deletions docs/utilities/batch.md

The SQS batch processing utility provides a way to handle partial failures when processing batches of messages from SQS.

## Key Features

* Prevent successfully processed messages being returned to SQS
* Simple interface for individually processing messages from a batch
* Build your own batch processor using the base classes

## Background

When using SQS as a Lambda event source mapping, Lambda functions are triggered with a batch of messages from SQS.

If your function fails to process any message from the batch, the entire batch returns to your SQS queue, and your Lambda function is triggered with the same batch again.

With this utility, batch records are processed individually - only messages that failed to be processed are returned to the queue.

More details on how Lambda works with SQS can be found in the [AWS documentation](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html).

## Getting started

### IAM Permissions

Before you use this utility, your AWS Lambda function must have the `sqs:DeleteMessageBatch` permission to delete successful messages directly from the queue.

> Example using AWS Serverless Application Model (SAM)

=== "template.yml"

    ```yaml hl_lines="2-3 12-15"
    Resources:
      MyQueue:
        Type: AWS::SQS::Queue

      HelloWorldFunction:
        Type: AWS::Serverless::Function
        Properties:
          Runtime: python3.8
          Environment:
            Variables:
              POWERTOOLS_SERVICE_NAME: example
          Policies:
            - SQSPollerPolicy:
                QueueName:
                  !GetAtt MyQueue.QueueName
    ```

### Processing messages from SQS

You can use either the **`sqs_batch_processor`** decorator, or **`PartialSQSProcessor`** as a context manager if you'd like access to the processed results.

You need to create a function to handle each record from the batch - We call it `record_handler` from here on. It should raise an exception if it is unable to process the record.
=== "Decorator"

This function is responsible for processing each individual message from the batch, and to raise an exception if unable to process any of the messages sent.
```python hl_lines="3 6"
from aws_lambda_powertools.utilities.batch import sqs_batch_processor

**Any non-exception/successful return from your record handler function** will instruct both decorator and context manager to queue up each individual message for deletion.
def record_handler(record):
return do_something_with(record["body"])

### sqs_batch_processor decorator
@sqs_batch_processor(record_handler=record_handler)
def lambda_handler(event, context):
return {"statusCode": 200}
```
=== "Context manager"

When using this decorator, you need provide a function via `record_handler` param that will process individual messages from the batch - It should raise an exception if it is unable to process the record.
```python hl_lines="3 9 11-12"
from aws_lambda_powertools.utilities.batch import PartialSQSProcessor

def record_handler(record):
return_value = do_something_with(record["body"])
return return_value

def lambda_handler(event, context):
records = event["Records"]
processor = PartialSQSProcessor()

with processor(records, record_handler) as proc:
result = proc.process() # Returns a list of all results from record_handler

return result
```

!!! tip
**Any non-exception/successful return from your record handler function** will instruct both decorator and context manager to queue up each individual message for deletion.

If the entire batch succeeds, we let Lambda to proceed in deleting the records from the queue for cost reasons.

### Partial failure mechanics

All records in the batch will be passed to this handler for processing, even if exceptions are thrown - Here's the behaviour after completing the batch:

* **Any successfully processed messages**, we will delete them from the queue via `sqs:DeleteMessageBatch`
* **Any unprocessed messages detected**, we will raise `SQSBatchProcessingError` to ensure failed messages return to your SQS queue
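
For illustration, here's a minimal sketch of how this surfaces to your Lambda handler. It assumes `SQSBatchProcessingError` is importable from `aws_lambda_powertools.utilities.batch.exceptions` and uses a `do_something_with` helper as in the earlier examples:

=== "partial_failure_sketch.py"

    ```python
    from aws_lambda_powertools.utilities.batch import PartialSQSProcessor
    from aws_lambda_powertools.utilities.batch.exceptions import SQSBatchProcessingError

    def record_handler(record):
        if "poison" in record["body"]:
            raise ValueError("unable to process this record")
        return do_something_with(record["body"])

    def lambda_handler(event, context):
        processor = PartialSQSProcessor()

        try:
            with processor(event["Records"], record_handler) as proc:
                proc.process()
        except SQSBatchProcessingError:
            # Successful messages were already deleted via sqs:DeleteMessageBatch;
            # re-raising returns only the failed messages to the queue
            raise

        return {"statusCode": 200}
    ```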

!!! warning
    You will not have access to the **processed messages** within the Lambda Handler.

    All processing logic will and should be performed by the `record_handler` function.

## Advanced

### Choosing between decorator and context manager

They have nearly the same behaviour when it comes to processing messages from the batch:

* **Entire batch has been successfully processed**, where your Lambda handler returned successfully, we will let SQS delete the batch to optimize your cost
* **Entire batch has been partially processed successfully**, where exceptions were raised within your `record_handler`, we will:
    - **1)** Delete successfully processed messages from the queue by directly calling `sqs:DeleteMessageBatch`
    - **2)** Raise `SQSBatchProcessingError` to ensure failed messages return to your SQS queue

The only difference is that **PartialSQSProcessor** will give you access to processed messages, should you need them.

### Accessing processed messages

Use the `PartialSQSProcessor` context manager to access a list of all return values from your `record_handler` function.

=== "app.py"

```python
from aws_lambda_powertools.utilities.batch import PartialSQSProcessor

def record_handler(record):
# This will be called for each individual message from a batch
# It should raise an exception if the message was not processed successfully
return_value = do_something_with(record["body"])
return return_value

return do_something_with(record["body"])

def lambda_handler(event, context):
records = event["Records"]
Expand All @@ -108,7 +142,7 @@ The result from calling `process()` on the context manager will be a list of all
return result
```

### Passing custom boto3 config

If you need to pass custom configuration such as region to the SDK, you can pass your own [botocore config object](https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html) to
the `sqs_batch_processor` decorator:
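
Below is a minimal sketch, assuming you only want to override the region (any other botocore `Config` option works the same way) and a `do_something_with` helper as in the earlier examples; `PartialSQSProcessor` accepts the same `config` argument:

=== "Decorator"

    ```python hl_lines="1 5 10"
    from botocore.config import Config

    from aws_lambda_powertools.utilities.batch import sqs_batch_processor

    config = Config(region_name="us-east-1")

    def record_handler(record):
        return do_something_with(record["body"])

    @sqs_batch_processor(record_handler=record_handler, config=config)
    def lambda_handler(event, context):
        return {"statusCode": 200}
    ```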


### Suppressing exceptions

If you want to disable the default behavior where `SQSBatchProcessingError` is raised if there are any errors, you can pass the `suppress_exception` boolean argument.

=== "Decorator"

```python hl_lines="2"
...
```python hl_lines="3"
from aws_lambda_powertools.utilities.batch import sqs_batch_processor

@sqs_batch_processor(record_handler=record_handler, config=config, suppress_exception=True)
def lambda_handler(event, context):
return {"statusCode": 200}
```

=== "Context manager"

```python hl_lines="2"
...
```python hl_lines="3"
from aws_lambda_powertools.utilities.batch import PartialSQSProcessor

processor = PartialSQSProcessor(config=config, suppress_exception=True)

with processor(records, record_handler):
result = processor.process()
```

### Create your own partial processor

You can create your own partial batch processor by inheriting the `BasePartialProcessor` class, and implementing `_prepare()`, `_clean()` and `_process_record()`.


You can then use this class as a context manager, or pass it to `batch_processor` to use as a decorator on your Lambda handler function.

=== "custom_processor.py"

    ```python hl_lines="3 9 24 30 37 57"
    from random import randint

    from aws_lambda_powertools.utilities.batch import BasePartialProcessor, batch_processor
    import boto3
    import os

    table_name = os.getenv("TABLE_NAME", "table_not_found")

    class MyPartialProcessor(BasePartialProcessor):
        """
        Processes a record and stores successful results in an Amazon DynamoDB Table

        Parameters
        ----------
        table_name: str
            DynamoDB table name to write results to
        """

        def __init__(self, table_name: str):
            self.table_name = table_name

            super().__init__()

        def _prepare(self):
            # It's called once, *before* processing
            # Creates table resource and clean previous results
            self.ddb_table = boto3.resource("dynamodb").Table(self.table_name)
            self.success_messages.clear()

        def _clean(self):
            # It's called once, *after* closing processing all records (closing the context manager)
            # Here we're sending, at once, all successful messages to a ddb table
            with self.ddb_table.batch_writer() as batch:
                for result in self.success_messages:
                    batch.put_item(Item=result)

        def _process_record(self, record):
            # It handles how your record is processed
            # Here we're keeping the status of each run
            # where self.handler is the record_handler function passed as an argument
            try:
                result = self.handler(record)  # record_handler passed to decorator/context manager
                return self.success_handler(record, result)
            except Exception as exc:
                return self.failure_handler(record, exc)

        def success_handler(self, record, result):
            entry = ("success", result, record)
            message = {"age": randint(0, 100), "name": "success-{}".format(randint(0, 10))}
            self.success_messages.append(message)
            return entry


    def record_handler(record):
        return randint(0, 100)

    @batch_processor(record_handler=record_handler, processor=MyPartialProcessor(table_name))
    def lambda_handler(event, context):
        return {"statusCode": 200}
    ```
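
You can also use the same processor as a context manager instead of the decorator. A minimal sketch, reusing the `MyPartialProcessor` class and `record_handler` function from the example above:

=== "custom_processor_context_manager.py"

    ```python
    def lambda_handler(event, context):
        records = event["Records"]
        processor = MyPartialProcessor(table_name)

        with processor(records, record_handler) as proc:
            result = proc.process()  # status entries collected by your custom processor

        return {"statusCode": 200}
    ```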

### Integrating exception handling with Sentry.io

When using Sentry.io for error monitoring, you can override `failure_handler` to capture each processing exception:

> Credits to [Charles-Axel Dein](https://github.com/awslabs/aws-lambda-powertools-python/issues/293#issuecomment-781961732)

=== "sentry_integration.py"

```python hl_lines="4 7-8"
from typing import Tuple

from aws_lambda_powertools.utilities.batch import PartialSQSProcessor
from sentry_sdk import capture_exception

class SQSProcessor(PartialSQSProcessor):
def failure_handler(self, record: Event, exception: Tuple) -> Tuple: # type: ignore
capture_exception() # send exception to Sentry
logger.exception("got exception while processing SQS message")
return super().failure_handler(record, exception) # type: ignore
```
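
You can then use the custom processor in place of `PartialSQSProcessor`. A minimal sketch, reusing the `SQSProcessor` class above and assuming a `do_something_with` helper as in the earlier examples:

=== "sentry_usage.py"

    ```python
    from aws_lambda_powertools.utilities.batch import batch_processor

    def record_handler(record):
        return do_something_with(record["body"])

    @batch_processor(record_handler=record_handler, processor=SQSProcessor())
    def lambda_handler(event, context):
        return {"statusCode": 200}
    ```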
