Skip to content

Commit

Permalink
feat(lambda): Support S3 as onFailure destinations on MSK and SelfMan…
Browse files Browse the repository at this point in the history
…agedKafka events (#28010)

## Summary
This PR will include following features
- add destination(currently sns/sqs) for Kafka event sources (SelfManagedKafkaEventSource andManagedKafkaEventSource) 
- add a dedicated S3 destination to the kafka event sources (non-kafka event source doesn't support S3 as destination)

## Backgrounds
Lambda Event Source Mapping (ESM) processes events from event sources in a sequence manner. However, a potential issue with this approach is that if a record is deemed a “poison pill”, it will be retried indefinitely until it is processed successfully or until the record expires. This can cause delays in processing others records in the queue. Additionally, for events that exceed the Lambda payload limit of 6 MB, they might be dropped as they cannot be processed.  

Today, Lambda supports configuring an OnSuccess destination and OnFailure destination for asynchronous invocations. For stream-based event sources, such as Kinesis, and DynamoDB streams, Lambda supports configuring an OnFailure destination. Regarding SQS, SQS poller doesn’t support OnFailure destination, but SQS support Dead Letter Queue (DLQ) natively.

For CDK, Some event source mappings (events) can have onFailure destination through a DestinationConfig. Right now that supports only DynamoDB and Kinesis event sources and only SQS and SNS destinations.


## Solution
- Add a new `s3onFailureDestination` destination in `s3-ofd.ts` file
- Add a new field `onFailure` to `KafkaEventSourceProps` that customer can use `s3onFailureDestination` to pass in.
- Add a check in `enrichMappingOptions` for every event type against `EventSourceMappingOptions.supportS3OFD` to check if they support S3 as onFailure.

## User Experience

```diff
import { Secret } from 'aws-cdk-lib/aws-secretsmanager';
+ import { ManagedKafkaEventSource, S3OnFailureDestination } from 'aws-cdk-lib/aws-lambda-event-sources';
import * as lambda from 'aws-cdk-lib/aws-lambda'
import { App, StackProps, Stack }  from 'aws-cdk-lib';
+ import { Bucket } from 'aws-cdk-lib/aws-s3';

export class CdkTestStack extends Stack {
  constructor(scope: App, id: string, props?: StackProps) {
    super(scope, id, props);

    const myFunction = new lambda.Function(this, 'myFunction', {
      runtime: lambda.Runtime.NODEJS_16_X,
      handler: 'index.handler',
      code: lambda.Code.fromInline('//handler_code_here'),
    });
    // Your MSK cluster arn
    const clusterArn = 'arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4';

    // The Kafka topic you want to subscribe to
    const topic = 'some-cool-topic';

    // The secret that allows access to your MSK cluster
    // You still have to make sure that it is associated with your cluster as described in the documentation
    const secret = new Secret(this, 'Secret', { secretName: 'AmazonMSK_KafkaSecret' });
    // Your bucket for Kafka's onFailure Destination
+    const bucket = Bucket.fromBucketName(this, 'BucketByName', 'my-bucket');
+    const s3ofd = new S3OnFailureDestination(bucket);
    myFunction.addEventSource(new ManagedKafkaEventSource({
      clusterArn,
      topic: topic,
      secret: secret,
      batchSize: 100, // default
      startingPosition: lambda.StartingPosition.TRIM_HORIZON,
+      onFailure: s3ofd,
    }));
  }
}
```

## sample synth output
```
  myFunctionServiceRoleDefaultPolicyECBA61F7:
    Type: AWS::IAM::Policy
    Properties:
      PolicyDocument:
        Statement:
          - Action:
              - s3:Abort*
              - s3:DeleteObject*
              - s3:GetBucket*
              - s3:GetObject*
              - s3:List*
              - s3:PutObject
              - s3:PutObjectLegalHold
              - s3:PutObjectRetention
              - s3:PutObjectTagging
              - s3:PutObjectVersionTagging
            Effect: Allow
            Resource:
              - Fn::Join:
                  - ""
                  - - "arn:"
                    - Ref: AWS::Partition
                    - :s3:::my-bucket
              - Fn::Join:
                  - ""
                  - - "arn:"
                    - Ref: AWS::Partition
                    - :s3:::my-bucket/*
  myFunctionKafkaEventSourceCdkTestStackmyFunction09D44E80somecooltopic1B4580FF:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 100
      DestinationConfig:
        OnFailure:
          Destination:
            Fn::Join:
              - ""
              - - "arn:"
                - Ref: AWS::Partition
                - :s3:::my-bucket
```

## To discuss
### Change parent class name
The parent class of `s3ofd` is called `IEventSourceDlq`. `DLQ` stands for Dead Letter Queue, Which is very specific for SQS, we might want to work out a new naming for S3 

- Option 1:
Rename the `IEventSourceDlq`  to `IEventSourceOfd` (Ofd: OnFailureDestinaion)And create `IEventSourceDlq` again to extends from it for keeping backwards compatibility. 
For S3, Create a new class `IEventSourceS3ofd`. also extend from the parent class and will be implemented by our new S3 Destination class.

- Option 2:
Don’t create new parent classes, the new S3 Destination class just extend the original `IEventSourceDlq` Class. Like what is shown in the current code.

### Where to add checking for s3 on failure desintation support
Currently in the commit, I was checking for s3ofd support in `enrichMappingOptions` Which will be called by all event source classes. Does this design make sense, or should we create a new dedicated function for chcecking?

## Questions:
- Check out the current UX


Closes #<issue number here>.

----

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
  • Loading branch information
otaviomacedo committed Nov 15, 2023
1 parent ef4dee6 commit e789adc
Show file tree
Hide file tree
Showing 19 changed files with 1,438 additions and 2 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit e789adc

Please sign in to comment.