feat(lambda-event-sources): msk and self-managed kafka event sources (#12507)

Fixes #12099 
----

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
bracki committed Mar 16, 2021
1 parent 5dbb0ba commit 73209e1
Showing 12 changed files with 835 additions and 3 deletions.
61 changes: 61 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/README.md
@@ -207,6 +207,67 @@ myFunction.addEventSource(new KinesisEventSource(stream, {
}));
```

## Kafka

You can write Lambda functions to process data from either an [Amazon MSK](https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html) cluster or a [self-managed Kafka](https://docs.aws.amazon.com/lambda/latest/dg/kafka-smaa.html) cluster.

The following code sets up Amazon MSK as an event source for a Lambda function. Credentials must be configured for access to the
MSK cluster, as described in [Username/Password authentication](https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html).

```ts
import * as lambda from '@aws-cdk/aws-lambda';
import * as msk from '@aws-cdk/aws-msk';
import { Secret } from '@aws-cdk/aws-secretsmanager';
import { ManagedKafkaEventSource } from '@aws-cdk/aws-lambda-event-sources';

// Your MSK cluster
const cluster = msk.Cluster.fromClusterArn(this, 'Cluster',
'arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4');

// The Kafka topic you want to subscribe to
const topic = 'some-cool-topic';

// The secret that allows access to your MSK cluster
// You must still associate the secret with your cluster, as described in the documentation
const secret = new Secret(this, 'Secret', { secretName: 'AmazonMSK_KafkaSecret' });

myFunction.addEventSource(new ManagedKafkaEventSource({
cluster: cluster,
topic: topic,
secret: secret,
batchSize: 100, // default
startingPosition: lambda.StartingPosition.TRIM_HORIZON
}));
```

The following code sets up a self-managed Kafka cluster as an event source. Username/password authentication
must be set up as described in [Managing access and permissions](https://docs.aws.amazon.com/lambda/latest/dg/smaa-permissions.html#smaa-permissions-add-secret).

```ts
import * as lambda from '@aws-cdk/aws-lambda';
import { Secret } from '@aws-cdk/aws-secretmanager';
import { SelfManagedKafkaEventSource } from '@aws-cdk/aws-lambda-event-sources';

// The list of Kafka brokers
const bootstrapServers = ['kafka-broker:9092'];

// The Kafka topic you want to subscribe to
const topic = 'some-cool-topic';

// The secret that allows access to your self-managed Kafka cluster
const secret = new Secret(this, 'Secret', { ... });

myFunction.addEventSource(new SelfManagedKafkaEventSource({
bootstrapServers: bootstrapServers,
topic: topic,
secret: secret,
batchSize: 100, // default
startingPosition: lambda.StartingPosition.TRIM_HORIZON
}));
```

If your self-managed Kafka cluster is only reachable from within a VPC, also configure `vpc`, `vpcSubnets`, and `securityGroup`.
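As a minimal sketch of the VPC-related configuration (assuming `myVpc` and `mySecurityGroup` are constructs defined elsewhere in your stack, and with an illustrative secret name), the three properties are passed alongside the other options:

```ts
import * as ec2 from '@aws-cdk/aws-ec2';
import * as lambda from '@aws-cdk/aws-lambda';
import { Secret } from '@aws-cdk/aws-secretsmanager';
import { SelfManagedKafkaEventSource } from '@aws-cdk/aws-lambda-event-sources';

// `myVpc` and `mySecurityGroup` are placeholders for existing constructs;
// the secret name below is only illustrative
myFunction.addEventSource(new SelfManagedKafkaEventSource({
  bootstrapServers: ['kafka-broker:9092'],
  topic: 'some-cool-topic',
  secret: new Secret(this, 'KafkaSecret', { secretName: 'KafkaUserSecret' }),
  startingPosition: lambda.StartingPosition.TRIM_HORIZON,
  vpc: myVpc,
  vpcSubnets: { subnetType: ec2.SubnetType.PRIVATE },
  securityGroup: mySecurityGroup,
}));
```

All three properties must be set together; providing `vpc` without `vpcSubnets` or `securityGroup` throws at synthesis time.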

## Roadmap

Eventually, this module will support all the event sources described under
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/lib/index.ts
@@ -1,5 +1,6 @@
export * from './api';
export * from './dynamodb';
export * from './kafka';
export * from './kinesis';
export * from './s3';
export * from './sns';
194 changes: 194 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/lib/kafka.ts
@@ -0,0 +1,194 @@
import * as crypto from 'crypto';
import { ISecurityGroup, IVpc, SubnetSelection } from '@aws-cdk/aws-ec2';
import * as iam from '@aws-cdk/aws-iam';
import * as lambda from '@aws-cdk/aws-lambda';
import * as msk from '@aws-cdk/aws-msk';
import * as secretsmanager from '@aws-cdk/aws-secretsmanager';
import { Stack } from '@aws-cdk/core';
import { StreamEventSource, StreamEventSourceProps } from './stream';

// keep this import separate from other imports to reduce chance for merge conflicts with v2-main
// eslint-disable-next-line no-duplicate-imports, import/order
import { Construct } from '@aws-cdk/core';

/**
* Properties for a Kafka event source
*/
export interface KafkaEventSourceProps extends StreamEventSourceProps {
/**
* the Kafka topic to subscribe to
*/
readonly topic: string,
/**
* the secret with the Kafka credentials, see https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html for details
*/
readonly secret: secretsmanager.ISecret
}

/**
* Properties for an MSK event source
*/
export interface ManagedKafkaEventSourceProps extends KafkaEventSourceProps {
/**
* an MSK cluster construct
*/
readonly cluster: msk.ICluster
}

/**
* The authentication method to use with SelfManagedKafkaEventSource
*/
export enum AuthenticationMethod {
/**
* SASL_SCRAM_512_AUTH authentication method for your Kafka cluster
*/
SASL_SCRAM_512_AUTH = 'SASL_SCRAM_512_AUTH',
/**
* SASL_SCRAM_256_AUTH authentication method for your Kafka cluster
*/
SASL_SCRAM_256_AUTH = 'SASL_SCRAM_256_AUTH',
}

/**
* Properties for a self-managed Kafka cluster event source.
* If your Kafka cluster is only reachable from within a VPC, make sure to configure `vpc`, `vpcSubnets` and `securityGroup`.
*/
export interface SelfManagedKafkaEventSourceProps extends KafkaEventSourceProps {
/**
* The list of host and port pairs that are the addresses of the Kafka brokers in a "bootstrap" Kafka cluster that
* a Kafka client connects to initially to bootstrap itself. They are in the format `abc.xyz.com:xxxx`.
*/
readonly bootstrapServers: string[]

/**
* If your Kafka brokers are only reachable via VPC provide the VPC here
*
* @default none
*/
readonly vpc?: IVpc;

/**
* If your Kafka brokers are only reachable via VPC, provide the subnets selection here
*
* @default - none, required if setting vpc
*/
readonly vpcSubnets?: SubnetSelection,

/**
* If your Kafka brokers are only reachable via VPC, provide the security group here
*
* @default - none, required if setting vpc
*/
readonly securityGroup?: ISecurityGroup

/**
* The authentication method for your Kafka cluster
*
* @default AuthenticationMethod.SASL_SCRAM_512_AUTH
*/
readonly authenticationMethod?: AuthenticationMethod
}

/**
* Use an MSK cluster as a streaming source for AWS Lambda
*/
export class ManagedKafkaEventSource extends StreamEventSource {
// This is to work around JSII inheritance problems
private innerProps: ManagedKafkaEventSourceProps;

constructor(props: ManagedKafkaEventSourceProps) {
super(props);
this.innerProps = props;
}

public bind(target: lambda.IFunction) {
target.addEventSourceMapping(
`KafkaEventSource:${this.innerProps.cluster.clusterArn}${this.innerProps.topic}`,
this.enrichMappingOptions({
eventSourceArn: this.innerProps.cluster.clusterArn,
startingPosition: this.innerProps.startingPosition,
// From https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html#msk-password-limitations, "Amazon MSK only supports SCRAM-SHA-512 authentication."
sourceAccessConfigurations: [{ type: lambda.SourceAccessConfigurationType.SASL_SCRAM_512_AUTH, uri: this.innerProps.secret.secretArn }],
kafkaTopic: this.innerProps.topic,
}),
);

this.innerProps.secret.grantRead(target);

target.addToRolePolicy(new iam.PolicyStatement(
{
actions: ['kafka:DescribeCluster', 'kafka:GetBootstrapBrokers', 'kafka:ListScramSecrets'],
resources: [this.innerProps.cluster.clusterArn],
},
));

target.role?.addManagedPolicy(iam.ManagedPolicy.fromAwsManagedPolicyName('service-role/AWSLambdaMSKExecutionRole'));
}
}

/**
* Use a self-hosted Kafka installation as a streaming source for AWS Lambda.
*/
export class SelfManagedKafkaEventSource extends StreamEventSource {
// This is to work around JSII inheritance problems
private innerProps: SelfManagedKafkaEventSourceProps;

constructor(props: SelfManagedKafkaEventSourceProps) {
super(props);
if (props.vpc) {
if (!props.securityGroup) {
throw new Error('securityGroup must be set when providing vpc');
}
if (!props.vpcSubnets) {
throw new Error('vpcSubnets must be set when providing vpc');
}
}
this.innerProps = props;
}

public bind(target: lambda.IFunction) {
if (!Construct.isConstruct(target)) { throw new Error('Function is not a construct. Unexpected error.'); }
target.addEventSourceMapping(
this.mappingId(target),
this.enrichMappingOptions({
kafkaBootstrapServers: this.innerProps.bootstrapServers,
kafkaTopic: this.innerProps.topic,
startingPosition: this.innerProps.startingPosition,
sourceAccessConfigurations: this.sourceAccessConfigurations(),
}),
);
this.innerProps.secret.grantRead(target);
}

private mappingId(target: lambda.IFunction) {
let hash = crypto.createHash('md5');
hash.update(JSON.stringify(Stack.of(target).resolve(this.innerProps.bootstrapServers)));
const idHash = hash.digest('hex');
return `KafkaEventSource:${idHash}:${this.innerProps.topic}`;
}

private sourceAccessConfigurations() {
let authType;
switch (this.innerProps.authenticationMethod) {
case AuthenticationMethod.SASL_SCRAM_256_AUTH:
authType = lambda.SourceAccessConfigurationType.SASL_SCRAM_256_AUTH;
break;
case AuthenticationMethod.SASL_SCRAM_512_AUTH:
default:
authType = lambda.SourceAccessConfigurationType.SASL_SCRAM_512_AUTH;
break;
}
let sourceAccessConfigurations = [{ type: authType, uri: this.innerProps.secret.secretArn }];
if (this.innerProps.vpcSubnets !== undefined && this.innerProps.securityGroup !== undefined) {
sourceAccessConfigurations.push({
type: lambda.SourceAccessConfigurationType.VPC_SECURITY_GROUP,
uri: this.innerProps.securityGroup.securityGroupId,
},
);
this.innerProps.vpc?.selectSubnets(this.innerProps.vpcSubnets).subnetIds.forEach((id) => {
sourceAccessConfigurations.push({ type: lambda.SourceAccessConfigurationType.VPC_SUBNET, uri: id });
});
}
return sourceAccessConfigurations;
}
}
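The `mappingId` helper above derives a stable construct ID by MD5-hashing the resolved broker list, so the same brokers and topic always yield the same mapping ID. A standalone sketch of that scheme (plain Node.js `crypto`, outside of CDK, with an illustrative broker list):

```ts
import * as crypto from 'crypto';

// Derive a deterministic event-source-mapping ID from the broker list and
// topic, mirroring the hashing in SelfManagedKafkaEventSource.mappingId
function mappingId(bootstrapServers: string[], topic: string): string {
  const hash = crypto.createHash('md5');
  hash.update(JSON.stringify(bootstrapServers));
  return `KafkaEventSource:${hash.digest('hex')}:${topic}`;
}

const id = mappingId(['kafka-broker:9092'], 'some-cool-topic');
console.log(id); // KafkaEventSource:<32-char hex>:some-cool-topic
```

Hashing the servers (rather than concatenating them) keeps the construct ID short and valid regardless of how many brokers are listed or what characters their hostnames contain.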
2 changes: 1 addition & 1 deletion packages/@aws-cdk/aws-lambda-event-sources/lib/stream.ts
@@ -3,7 +3,7 @@ import { Duration } from '@aws-cdk/core';

/**
* The set of properties for event sources that follow the streaming model,
* such as, Dynamo and Kinesis.
* such as, Dynamo, Kinesis and Kafka.
*/
export interface StreamEventSourceProps {
/**
Expand Down
6 changes: 6 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/package.json
@@ -72,12 +72,15 @@
"dependencies": {
"@aws-cdk/aws-apigateway": "0.0.0",
"@aws-cdk/aws-dynamodb": "0.0.0",
"@aws-cdk/aws-ec2": "0.0.0",
"@aws-cdk/aws-events": "0.0.0",
"@aws-cdk/aws-iam": "0.0.0",
"@aws-cdk/aws-kinesis": "0.0.0",
"@aws-cdk/aws-lambda": "0.0.0",
"@aws-cdk/aws-msk": "0.0.0",
"@aws-cdk/aws-s3": "0.0.0",
"@aws-cdk/aws-s3-notifications": "0.0.0",
"@aws-cdk/aws-secretsmanager": "0.0.0",
"@aws-cdk/aws-sns": "0.0.0",
"@aws-cdk/aws-sns-subscriptions": "0.0.0",
"@aws-cdk/aws-sqs": "0.0.0",
@@ -88,12 +91,15 @@
"peerDependencies": {
"@aws-cdk/aws-apigateway": "0.0.0",
"@aws-cdk/aws-dynamodb": "0.0.0",
"@aws-cdk/aws-ec2": "0.0.0",
"@aws-cdk/aws-events": "0.0.0",
"@aws-cdk/aws-iam": "0.0.0",
"@aws-cdk/aws-kinesis": "0.0.0",
"@aws-cdk/aws-lambda": "0.0.0",
"@aws-cdk/aws-msk": "0.0.0",
"@aws-cdk/aws-s3": "0.0.0",
"@aws-cdk/aws-s3-notifications": "0.0.0",
"@aws-cdk/aws-secretsmanager": "0.0.0",
"@aws-cdk/aws-sns": "0.0.0",
"@aws-cdk/aws-sns-subscriptions": "0.0.0",
"@aws-cdk/aws-sqs": "0.0.0",
