feat(batch): new BatchProcessor for SQS, DynamoDB, Kinesis #886

Merged


heitorlessa
Contributor

@heitorlessa heitorlessa commented Dec 10, 2021

Issue #, if available: #858

Description of changes:

Implements new Batch Processor following the new built-in partial response handling for Lambda SQS, DynamoDB Stream, and Kinesis Data Streams.

Checklist

  • Meet tenets criteria
  • Update tests
  • Update docs
  • PR title follows conventional commit semantics
  • Add DynamoDB Stream support
  • Add Kinesis Data Stream support
  • Rename .report() to .response() to match the Java counterpart
  • Support event source data classes by default
  • Mypy support
  • Review with @cakepietoast whether we still need suppress_exception behaviour
  • Bring-your-own-model Pydantic Parser as an option
  • Test for incorrect data/payload that won't be parsed (either Model or Data Class)
  • Raise BatchProcessingError when all batch records fail to ensure failure arises
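
The last checklist item can be illustrated with a small sketch. It is an assumption about the intent, not the code merged in this PR: `BatchProcessingError` below is a locally defined stand-in for the library exception, and `ensure_not_all_failed` is a hypothetical helper. The idea is that when every record in the batch fails, raising makes the whole invocation fail visibly instead of returning a response that lists every item.

```python
from typing import List


class BatchProcessingError(Exception):
    """Local stand-in for the library exception of the same name."""


def ensure_not_all_failed(total_records: int, failures: List[str]) -> None:
    """Raise when every record in a non-empty batch failed (hypothetical helper)."""
    if total_records > 0 and len(failures) == total_records:
        raise BatchProcessingError(f"All {total_records} records failed processing")


# Two records received, both failed: the helper raises.
failed_ids = ["1", "2"]
try:
    ensure_not_all_failed(total_records=2, failures=failed_ids)
    message = ""
except BatchProcessingError as exc:
    message = str(exc)

# Only one of two failed: no exception, a partial response can be returned.
ensure_not_all_failed(total_records=2, failures=["1"])
```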

Tasks to be created in separate PR to ease reviewing

  • Support for handling permanent exceptions (might be for the release after next)

UX

As a decorator

from aws_lambda_powertools.utilities.batch import batch_processor, BatchProcessor, EventType
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord

processor = BatchProcessor(event_type=EventType.SQS)  # or EventType.KinesisDataStreams, EventType.DynamoDBStreams

def record_handler(record: SQSRecord): # or DynamoDBRecord, KinesisStreamRecord
    return "success"

@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context):
    return processor.response() # this is necessary to ensure Lambda returns the correct response

As a context manager, in case you need full access to the processed batch or want to handle exceptions yourself

from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord

processor = BatchProcessor(event_type=EventType.SQS)


def record_handler(record: SQSRecord):
    return "success"


def lambda_handler(event, context):
    records = event["Records"]
    with processor(records, record_handler) as batch:
        processed_messages = batch.process()  # per-record results

    return batch.response()  # this is necessary to ensure Lambda returns the correct response
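
For readers wondering how `with processor(records, record_handler) as batch:` can work, here is a minimal standard-library sketch of an object that is both callable and a context manager. `TinyProcessor` is a hypothetical stand-in written for illustration; it is not the library's `BatchProcessor`, it assumes SQS-shaped records with a `messageId` key, and it builds the `batchItemFailures` response shape that Lambda expects for partial batch responses.

```python
class TinyProcessor:
    """Hypothetical stand-in showing the call + context manager pattern."""

    def __init__(self):
        self.records = []
        self.handler = None
        self.failures = []

    def __call__(self, records, handler):
        # Store the inputs and return self so the object can be used in `with`.
        self.records, self.handler = records, handler
        return self

    def __enter__(self):
        self.failures = []  # reset state for each batch
        return self

    def __exit__(self, exc_type, exc, tb):
        return False  # never swallow exceptions raised inside the block

    def process(self):
        results = []
        for record in self.records:
            try:
                results.append(("success", self.handler(record), record))
            except Exception as exc:
                self.failures.append(record["messageId"])
                results.append(("fail", str(exc), record))
        return results

    def response(self):
        return {"batchItemFailures": [{"itemIdentifier": i} for i in self.failures]}


def upper_body(record):
    if not record["body"]:
        raise ValueError("empty body")
    return record["body"].upper()


tiny = TinyProcessor()
sample_records = [{"messageId": "a", "body": "hi"}, {"messageId": "b", "body": ""}]
with tiny(sample_records, upper_body) as batch:
    processed = batch.process()

partial_response = batch.response()
```

Because `__call__` returns the same object, `batch` and `tiny` refer to one instance, which is why calling `response()` after the block still sees the collected failures.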

Breaking change checklist

RFC issue #:

  • Migration process documented
  • Implement warnings (if it can live side by side)

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

@pull-request-size pull-request-size bot added the size/L Denotes a PR that changes 100-499 lines, ignoring generated files. label Dec 10, 2021
@heitorlessa heitorlessa marked this pull request as draft December 10, 2021 17:39
@codecov-commenter

codecov-commenter commented Dec 12, 2021

Codecov Report

Merging #886 (95715d0) into develop (99227ce) will decrease coverage by 0.09%.
The diff coverage is 97.24%.


@@             Coverage Diff             @@
##           develop     #886      +/-   ##
===========================================
- Coverage    99.88%   99.79%   -0.10%     
===========================================
  Files          118      118              
  Lines         5161     5262     +101     
  Branches       578      596      +18     
===========================================
+ Hits          5155     5251      +96     
- Misses           2        6       +4     
- Partials         4        5       +1     
Impacted Files                                          Coverage Δ
aws_lambda_powertools/utilities/batch/base.py           97.67% <96.70%> (-2.33%) ⬇️
aws_lambda_powertools/utilities/batch/__init__.py       100.00% <100.00%> (ø)
...ws_lambda_powertools/utilities/batch/exceptions.py   100.00% <100.00%> (ø)
aws_lambda_powertools/utilities/parser/parser.py        93.93% <0.00%> (-6.07%) ⬇️

Δ = absolute <relative> (impact), ø = not affected, ? = missing data

@heitorlessa heitorlessa marked this pull request as ready for review December 12, 2021 13:55
@heitorlessa
Contributor Author

@cakepietoast ready to review implementation before it gets bigger with mypy and doc stuff.

@ran-isenberg
Contributor

ran-isenberg commented Dec 13, 2021

@heitorlessa as discussed, here's the sqs example:

@validator('body', pre=True)
def transform_body_to_dict(cls, value):
    return json.loads(value)

@boring-cyborg boring-cyborg bot added the internal Maintenance changes label Dec 13, 2021
@heitorlessa heitorlessa changed the title feat(batch): new BatchProcessor for SQS, DynamoDB, Kinesis [WIP] feat(batch): new BatchProcessor for SQS, DynamoDB, Kinesis Dec 13, 2021
@heitorlessa heitorlessa changed the title [WIP] feat(batch): new BatchProcessor for SQS, DynamoDB, Kinesis feat(batch): new BatchProcessor for SQS, DynamoDB, Kinesis Dec 13, 2021
@heitorlessa
Contributor Author

heitorlessa commented Dec 15, 2021 via email

Co-authored-by: Guilherme Martins Crocetti <gmcrocetti@gmail.com>
…tools-python into feat/batch-new-processor

* 'develop' of https://github.com/awslabs/aws-lambda-powertools-python:
  fix(parser): kinesis sequence number is str, not int (aws-powertools#907)
  feat(apigateway): add exception_handler support (aws-powertools#898)
  fix(event-sources): Pass authorizer data to APIGatewayEventAuthorizer (aws-powertools#897)
  chore(deps): bump fastjsonschema from 2.15.1 to 2.15.2 (aws-powertools#891)
@pull-request-size pull-request-size bot added size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. and removed size/L Denotes a PR that changes 100-499 lines, ignoring generated files. labels Dec 17, 2021
@@ -146,3 +185,120 @@ def batch_processor(
        processor.process()

    return handler(event, context)


class BatchProcessor(BasePartialProcessor):

Would it make sense to have separate processors for each event type (SQS, DynamoDB, or Kinesis) instead of growing the complexity of this class? Then you could encapsulate the failure collection in the specific processor.

Contributor Author


That was the initial version we wanted to implement - a KinesisDataStreamProcessor, DynamoDB... then @cakepietoast argued that this was going to confuse customers given the other available processors (Sqs, PartialProcessor, BaseProcessor), as we can only deprecate them in v2.

I'm 50/50 here if I'm honest. I prefer a separate one, but I can also see customers being easily confused about which one to pick, despite the docs changes I'm going to make.

Implementation-wise, this will remain stable. The only two changes I can anticipate are 1/ supporting the new Permanent Exception parameter, and 2/ raising a descriptive exception in case we hit an AttributeError when collecting the message ID/sequence number from a malformed event/model.


What if we changed the nomenclature to “producer” and “consumer” (these processors would be consumers)? I had that other idea earlier to make it easier to use the SQS and DynamoDB batch write methods, taking into account their batch sizes; those could be “producers” 🤷‍♂️

Contributor Author


Where would the producer sit and what would be its responsibilities?

For that suggestion on partitioning, we should add it to the Event Source Data Class as it's a no-brainer.

I think the word Consumer wouldn't be explicit enough about the capabilities Batch provides - maybe something else then?
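
The partitioning idea raised in this thread could look roughly like the sketch below. It is illustrative only and not part of this PR: `chunked` is a hypothetical helper, and the limits are the documented maximums for SQS `SendMessageBatch` (10 entries) and DynamoDB `BatchWriteItem` (25 items).

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")

SQS_MAX_BATCH = 10       # SendMessageBatch accepts at most 10 entries per call
DYNAMODB_MAX_BATCH = 25  # BatchWriteItem accepts at most 25 items per call


def chunked(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive lists of at most `size` items (hypothetical helper)."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


# 23 messages split for SQS: two full batches and one partial batch.
sqs_batch_sizes = [len(chunk) for chunk in chunked(range(23), SQS_MAX_BATCH)]
# 60 items split for DynamoDB.
ddb_batch_sizes = [len(chunk) for chunk in chunked(range(60), DYNAMODB_MAX_BATCH)]
```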

Features

  • Transform incoming records into Event Source Data Class or Pydantic Models
  • Call a user-defined function for each record in the batch
  • Keep track of exceptions raised for any record
  • For partial successes, extract message IDs (SQS) or sequence numbers (DynamoDB & Kinesis) and build a response to the new BatchItemIdentifiers contract
  • Future: If a raised exception matches the permanent_exceptions set, send these records to the configured DLQ in batches of 10
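
The flow in the feature list above can be sketched in plain Python without the library. This is a minimal sketch, not the implementation merged here: `process_batch` and `ID_GETTERS` are hypothetical names, and the record shapes assume the standard Lambda event payloads (`messageId` for SQS, `kinesis.sequenceNumber` for Kinesis, `dynamodb.SequenceNumber` for DynamoDB Streams). The returned dictionary follows Lambda's partial batch response format, `batchItemFailures` with one `itemIdentifier` per failed record.

```python
from typing import Any, Callable, Dict, List

Record = Dict[str, Any]

# Hypothetical lookup: where each event source keeps the identifier Lambda expects.
ID_GETTERS: Dict[str, Callable[[Record], str]] = {
    "SQS": lambda r: r["messageId"],
    "KinesisDataStreams": lambda r: r["kinesis"]["sequenceNumber"],
    "DynamoDBStreams": lambda r: r["dynamodb"]["SequenceNumber"],
}


def process_batch(
    records: List[Record],
    record_handler: Callable[[Record], Any],
    event_type: str,
) -> Dict[str, List[Dict[str, str]]]:
    """Call the handler per record and report only the records that raised."""
    get_id = ID_GETTERS[event_type]
    failures = []
    for record in records:
        try:
            record_handler(record)
        except Exception:
            # Track the failure; successful records are simply left out.
            failures.append({"itemIdentifier": get_id(record)})
    return {"batchItemFailures": failures}


def sample_handler(record: Record) -> str:
    if record["body"] == "bad":
        raise ValueError("cannot process record")
    return "success"


sample_records = [
    {"messageId": "1", "body": "ok"},
    {"messageId": "2", "body": "bad"},
]
response = process_batch(sample_records, sample_handler, "SQS")
```

An empty `batchItemFailures` list tells Lambda the whole batch succeeded, so only the identifiers of failed records need to be collected.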

@pull-request-size pull-request-size bot removed the size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. label Dec 19, 2021
@boring-cyborg boring-cyborg bot added the documentation Improvements or additions to documentation label Dec 19, 2021
@pull-request-size pull-request-size bot added the size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. label Dec 19, 2021
@heitorlessa
Contributor Author

Docs finally done - @michaelbrewer @ran-isenberg could you have a quick pass before we release it tomorrow? All I need to do now is merge and start writing the release notes.

Here's what I've changed:

  • Updated the docs to reflect the new BatchProcessor in favour of sqs_batch_processor and PartialSQSProcessor
  • Added a Migration guide for those using sqs_batch_processor and PartialSQSProcessor
  • Added Caveats section in case people use Tracer response auto-capture feature with this utility in large batch sizes
  • Added sample event and sample response for each
  • Added SAM infrastructure for SQS, Kinesis and DynamoDB along with DLQ and IAM Permissions
  • Added an Extensibility section
  • Added Pydantic section
  • Updated Create your own processor
  • Updated Key features
  • Updated Intro
  • Recreated Background
  • Repurposed a few previous sections like Sentry integration under Legacy

@michaelbrewer
Contributor

@heitorlessa super cool. i will try to play around with it once merged

@ran-isenberg
Contributor

@heitorlessa looks great, thank you very much for putting in the effort to support both parser and data classes.

@heitorlessa heitorlessa added the feature New feature or functionality label Dec 19, 2021
@heitorlessa heitorlessa merged commit c74811b into aws-powertools:develop Dec 19, 2021
@heitorlessa heitorlessa deleted the feat/batch-new-processor branch December 19, 2021 18:52
heitorlessa added a commit to ran-isenberg/aws-lambda-powertools-python that referenced this pull request Dec 31, 2021
…tools-python into complex

* 'develop' of https://github.com/awslabs/aws-lambda-powertools-python: (24 commits)
  docs: consistency around admonitions and snippets (aws-powertools#919)
  chore(deps-dev): bump mypy from 0.920 to 0.930 (aws-powertools#925)
  fix(event-sources): handle dynamodb null type as none, not bool (aws-powertools#929)
  fix(apigateway): support @app.not_found() syntax & housekeeping (aws-powertools#926)
  docs: Added GraphQL Sample API to Examples section of README.md (aws-powertools#930)
  feat(idempotency): support dataclasses & pydantic models payloads (aws-powertools#908)
  feat(tracer): ignore tracing for certain hostname(s) or url(s) (aws-powertools#910)
  feat(event-sources): cache parsed json in data class (aws-powertools#909)
  fix(warning): future distutils deprecation (aws-powertools#921)
  docs(batch): remove leftover from legacy
  docs(layer): bump Lambda Layer to version 6
  chore: bump to 1.23.0
  docs(apigateway): add new not_found feature (aws-powertools#915)
  docs: external reference to cloudformation custom resource helper (aws-powertools#914)
  feat(logger): allow handler with custom kwargs signature (aws-powertools#913)
  chore: minor housekeeping before release (aws-powertools#912)
  chore(deps-dev): bump mypy from 0.910 to 0.920 (aws-powertools#903)
  feat(batch): new BatchProcessor for SQS, DynamoDB, Kinesis (aws-powertools#886)
  fix(parser): overload parse when using envelope (aws-powertools#885)
  fix(parser): kinesis sequence number is str, not int (aws-powertools#907)
  ...
Labels
documentation Improvements or additions to documentation feature New feature or functionality internal Maintenance changes size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. tests

7 participants