Navigation Menu

Skip to content

Commit

Permalink
feat(kinesisfirehose): add support for BufferingHints (#15557)
Browse files Browse the repository at this point in the history
Closes #15554

----

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
  • Loading branch information
madeline-k committed Jul 29, 2021
1 parent 8f97d97 commit 099b584
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 2 deletions.
Expand Up @@ -64,3 +64,22 @@ export function createLoggingOptions(scope: Construct, props: DestinationLogging
}
return undefined;
}

export function createBufferingHints(
interval?: cdk.Duration,
size?: cdk.Size,
): firehose.CfnDeliveryStream.BufferingHintsProperty | undefined {
if (!interval && !size) {
return undefined;
}

const intervalInSeconds = interval?.toSeconds() ?? 300;
if (intervalInSeconds < 60 || intervalInSeconds > 900) {
throw new Error(`Buffering interval must be between 60 and 900 seconds. Buffering interval provided was ${intervalInSeconds} seconds.`);
}
const sizeInMBs = size?.toMebibytes() ?? 5;
if (sizeInMBs < 1 || sizeInMBs > 128) {
throw new Error(`Buffering size must be between 1 and 128 MiBs. Buffering size provided was ${sizeInMBs} MiBs.`);
}
return { intervalInSeconds, sizeInMBs };
}
@@ -1,14 +1,37 @@
import * as iam from '@aws-cdk/aws-iam';
import * as firehose from '@aws-cdk/aws-kinesisfirehose';
import * as s3 from '@aws-cdk/aws-s3';
import { Duration, Size } from '@aws-cdk/core';
import { Construct } from 'constructs';
import { CommonDestinationProps, Compression } from './common';
import { createLoggingOptions } from './private/helpers';
import { createBufferingHints, createLoggingOptions } from './private/helpers';

/**
* Props for defining an S3 destination of a Kinesis Data Firehose delivery stream.
*/
export interface S3BucketProps extends CommonDestinationProps {
/**
* The length of time that Firehose buffers incoming data before delivering
* it to the S3 bucket.
*
* Minimum: Duration.seconds(60)
* Maximum: Duration.seconds(900)
*
* @default Duration.seconds(300)
*/
readonly bufferingInterval?: Duration;

/**
* The size of the buffer that Kinesis Data Firehose uses for incoming data before
* delivering it to the S3 bucket.
*
* Minimum: Size.mebibytes(1)
* Maximum: Size.mebibytes(128)
*
* @default Size.mebibytes(5)
*/
readonly bufferingSize?: Size;

/**
* The type of compression that Kinesis Data Firehose uses to compress the data
* that it delivers to the Amazon S3 bucket.
Expand Down Expand Up @@ -66,6 +89,7 @@ export class S3Bucket implements firehose.IDestination {
extendedS3DestinationConfiguration: {
cloudWatchLoggingOptions: loggingOptions,
roleArn: role.roleArn,
bufferingHints: createBufferingHints(this.props.bufferingInterval, this.props.bufferingSize),
bucketArn: this.bucket.bucketArn,
compressionFormat: this.props.compression?.value,
errorOutputPrefix: this.props.errorOutputPrefix,
Expand Down
Expand Up @@ -290,6 +290,10 @@
"Arn"
]
},
"BufferingHints": {
"IntervalInSeconds": 60,
"SizeInMBs": 1
},
"CloudWatchLoggingOptions": {
"Enabled": true,
"LogGroupName": {
Expand Down
Expand Up @@ -25,6 +25,8 @@ new firehose.DeliveryStream(stack, 'Delivery Stream', {
compression: destinations.Compression.GZIP,
dataOutputPrefix: 'regularPrefix',
errorOutputPrefix: 'errorPrefix',
bufferingInterval: cdk.Duration.seconds(60),
bufferingSize: cdk.Size.mebibytes(1),
})],
});

Expand Down
Expand Up @@ -252,4 +252,57 @@ describe('S3 destination', () => {
});
});
});

describe('buffering', () => {
it('creates configuration when interval and size provided', () => {
new firehose.DeliveryStream(stack, 'DeliveryStream', {
destinations: [new firehosedestinations.S3Bucket(bucket, {
bufferingInterval: cdk.Duration.minutes(1),
bufferingSize: cdk.Size.mebibytes(1),
})],
});

expect(stack).toHaveResourceLike('AWS::KinesisFirehose::DeliveryStream', {
ExtendedS3DestinationConfiguration: {
BufferingHints: {
IntervalInSeconds: 60,
SizeInMBs: 1,
},
},
});
});

it('validates bufferingInterval', () => {
expect(() => new firehose.DeliveryStream(stack, 'DeliveryStream', {
destinations: [new firehosedestinations.S3Bucket(bucket, {
bufferingInterval: cdk.Duration.seconds(30),
bufferingSize: cdk.Size.mebibytes(1),
})],
})).toThrowError('Buffering interval must be between 60 and 900 seconds. Buffering interval provided was 30 seconds.');

expect(() => new firehose.DeliveryStream(stack, 'DeliveryStream2', {
destinations: [new firehosedestinations.S3Bucket(bucket, {
bufferingInterval: cdk.Duration.minutes(16),
bufferingSize: cdk.Size.mebibytes(1),
})],
})).toThrowError('Buffering interval must be between 60 and 900 seconds. Buffering interval provided was 960 seconds.');
});

it('validates bufferingSize', () => {
expect(() => new firehose.DeliveryStream(stack, 'DeliveryStream', {
destinations: [new firehosedestinations.S3Bucket(bucket, {
bufferingInterval: cdk.Duration.minutes(1),
bufferingSize: cdk.Size.mebibytes(0),

})],
})).toThrowError('Buffering size must be between 1 and 128 MiBs. Buffering size provided was 0 MiBs');

expect(() => new firehose.DeliveryStream(stack, 'DeliveryStream2', {
destinations: [new firehosedestinations.S3Bucket(bucket, {
bufferingInterval: cdk.Duration.minutes(1),
bufferingSize: cdk.Size.mebibytes(256),
})],
})).toThrowError('Buffering size must be between 1 and 128 MiBs. Buffering size provided was 256 MiBs');
});
});
});
27 changes: 26 additions & 1 deletion packages/@aws-cdk/aws-kinesisfirehose/README.md
Expand Up @@ -157,7 +157,7 @@ new DeliveryStream(this, 'Delivery Stream Implicit Customer Managed', {
});
// SSE with an customer-managed CMK that is explicitly specified
const key = new kms.Key(this, 'Key');
new DeliveryStream(this, 'Delivery Stream Explicit Customer Managed'', {
new DeliveryStream(this, 'Delivery Stream Explicit Customer Managed', {
encryptionKey: key,
destinations: [destination],
});
Expand Down Expand Up @@ -265,6 +265,31 @@ new DeliveryStream(this, 'Delivery Stream', {
});
```

## Buffering

Incoming data is buffered before it is delivered to the specified destination. The
delivery stream will wait until the amount of incoming data has exceeded some threshold
(the "buffer size") or until the time since the last data delivery occurred exceeds some
threshold (the "buffer interval"), whichever happens first. You can configure these
thresholds based on the capabilities of the destination and your use-case. By default, the
buffer size is 5 MiB and the buffer interval is 5 minutes.

```ts fixture=with-bucket
import * as cdk from '@aws-cdk/core';

// Increase the buffer interval and size to 10 minutes and 8 MiB, respectively
const destination = new destinations.S3Bucket(bucket, {
bufferingInterval: cdk.Duration.minutes(10),
bufferingSize: cdk.Size.mebibytes(8),
});
new DeliveryStream(this, 'Delivery Stream', {
destinations: [destination],
});
```

See: [Data Delivery Frequency](https://docs.aws.amazon.com/firehose/latest/dev/basic-deliver.html#frequency)
in the *Kinesis Data Firehose Developer Guide*.

## Specifying an IAM role

The DeliveryStream class automatically creates IAM service roles with all the minimum
Expand Down

0 comments on commit 099b584

Please sign in to comment.