Skip to content

Commit

Permalink
feat(iot): add Action to put record to Kinesis Data stream (#18321)
Browse files Browse the repository at this point in the history
Fixes #17703

----

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
  • Loading branch information
yamatatsu committed Jan 19, 2022
1 parent 6937564 commit 1480213
Show file tree
Hide file tree
Showing 7 changed files with 347 additions and 0 deletions.
21 changes: 21 additions & 0 deletions packages/@aws-cdk/aws-iot-actions/README.md
Expand Up @@ -26,6 +26,7 @@ Currently supported are:
- Put logs to CloudWatch Logs
- Capture CloudWatch metrics
- Change state for a CloudWatch alarm
- Put records to Kinesis Data stream
- Put records to Kinesis Data Firehose stream
- Send messages to SQS queues

Expand Down Expand Up @@ -172,6 +173,26 @@ const topicRule = new iot.TopicRule(this, 'TopicRule', {
});
```

## Put records to Kinesis Data stream

The code snippet below creates an AWS IoT Rule that put records to Kinesis Data
stream when it is triggered.

```ts
import * as kinesis from '@aws-cdk/aws-kinesis';

const stream = new kinesis.Stream(this, 'MyStream');

const topicRule = new iot.TopicRule(this, 'TopicRule', {
sql: iot.IotSql.fromStringAsVer20160323("SELECT * FROM 'device/+/data'"),
actions: [
new actions.KinesisPutRecordAction(stream, {
partitionKey: '${newuuid()}',
}),
],
});
```

## Put records to Kinesis Data Firehose stream

The code snippet below creates an AWS IoT Rule that put records to Put records
Expand Down
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-iot-actions/lib/index.ts
Expand Up @@ -3,6 +3,7 @@ export * from './cloudwatch-put-metric-action';
export * from './cloudwatch-set-alarm-state-action';
export * from './common-action-props';
export * from './firehose-put-record-action';
export * from './kinesis-put-record-action';
export * from './lambda-function-action';
export * from './s3-put-object-action';
export * from './sqs-queue-action';
58 changes: 58 additions & 0 deletions packages/@aws-cdk/aws-iot-actions/lib/kinesis-put-record-action.ts
@@ -0,0 +1,58 @@
import * as iam from '@aws-cdk/aws-iam';
import * as iot from '@aws-cdk/aws-iot';
import * as kinesis from '@aws-cdk/aws-kinesis';
import { CommonActionProps } from './common-action-props';
import { singletonActionRole } from './private/role';

/**
* Configuration properties of an action for the Kinesis Data stream.
*/
export interface KinesisPutRecordActionProps extends CommonActionProps {
/**
* The partition key used to determine to which shard the data is written.
* The partition key is usually composed of an expression (for example, ${topic()} or ${timestamp()}).
*
* @see https://docs.aws.amazon.com/iot/latest/developerguide/iot-substitution-templates.html
*
* You can use the expression '${newuuid()}' if your payload does not have a high cardinarity property.
* If you use empty string, this action use no partition key and all records will put same one shard.
*
* @see https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#API_PutRecord_RequestParameters
*/
readonly partitionKey: string;
}

/**
* The action to put the record from an MQTT message to the Kinesis Data stream.
*/
export class KinesisPutRecordAction implements iot.IAction {
private readonly partitionKey?: string;
private readonly role?: iam.IRole;

/**
* @param stream The Kinesis Data stream to which to put records.
* @param props Optional properties to not use default
*/
constructor(private readonly stream: kinesis.IStream, props: KinesisPutRecordActionProps) {
this.partitionKey = props.partitionKey;
this.role = props.role;
}

bind(rule: iot.ITopicRule): iot.ActionConfig {
const role = this.role ?? singletonActionRole(rule);
role.addToPrincipalPolicy(new iam.PolicyStatement({
actions: ['kinesis:PutRecord'],
resources: [this.stream.streamArn],
}));

return {
configuration: {
kinesis: {
streamName: this.stream.streamName,
partitionKey: this.partitionKey || undefined,
roleArn: role.roleArn,
},
},
};
}
}
2 changes: 2 additions & 0 deletions packages/@aws-cdk/aws-iot-actions/package.json
Expand Up @@ -90,6 +90,7 @@
"@aws-cdk/aws-cloudwatch": "0.0.0",
"@aws-cdk/aws-iam": "0.0.0",
"@aws-cdk/aws-iot": "0.0.0",
"@aws-cdk/aws-kinesis": "0.0.0",
"@aws-cdk/aws-kinesisfirehose": "0.0.0",
"@aws-cdk/aws-lambda": "0.0.0",
"@aws-cdk/aws-logs": "0.0.0",
Expand All @@ -104,6 +105,7 @@
"@aws-cdk/aws-cloudwatch": "0.0.0",
"@aws-cdk/aws-iam": "0.0.0",
"@aws-cdk/aws-iot": "0.0.0",
"@aws-cdk/aws-kinesis": "0.0.0",
"@aws-cdk/aws-kinesisfirehose": "0.0.0",
"@aws-cdk/aws-lambda": "0.0.0",
"@aws-cdk/aws-logs": "0.0.0",
Expand Down
@@ -0,0 +1,116 @@
{
"Resources": {
"TopicRule40A4EA44": {
"Type": "AWS::IoT::TopicRule",
"Properties": {
"TopicRulePayload": {
"Actions": [
{
"Kinesis": {
"PartitionKey": "${timestamp()}",
"RoleArn": {
"Fn::GetAtt": [
"TopicRuleTopicRuleActionRole246C4F77",
"Arn"
]
},
"StreamName": {
"Ref": "MyStream5C050E93"
}
}
}
],
"AwsIotSqlVersion": "2016-03-23",
"Sql": "SELECT * FROM 'device/+/data'"
}
}
},
"TopicRuleTopicRuleActionRole246C4F77": {
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "iot.amazonaws.com"
}
}
],
"Version": "2012-10-17"
}
}
},
"TopicRuleTopicRuleActionRoleDefaultPolicy99ADD687": {
"Type": "AWS::IAM::Policy",
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": "kinesis:PutRecord",
"Effect": "Allow",
"Resource": {
"Fn::GetAtt": [
"MyStream5C050E93",
"Arn"
]
}
}
],
"Version": "2012-10-17"
},
"PolicyName": "TopicRuleTopicRuleActionRoleDefaultPolicy99ADD687",
"Roles": [
{
"Ref": "TopicRuleTopicRuleActionRole246C4F77"
}
]
}
},
"MyStream5C050E93": {
"Type": "AWS::Kinesis::Stream",
"Properties": {
"RetentionPeriodHours": 24,
"ShardCount": 3,
"StreamEncryption": {
"Fn::If": [
"AwsCdkKinesisEncryptedStreamsUnsupportedRegions",
{
"Ref": "AWS::NoValue"
},
{
"EncryptionType": "KMS",
"KeyId": "alias/aws/kinesis"
}
]
},
"StreamModeDetails": {
"StreamMode": "PROVISIONED"
}
}
}
},
"Conditions": {
"AwsCdkKinesisEncryptedStreamsUnsupportedRegions": {
"Fn::Or": [
{
"Fn::Equals": [
{
"Ref": "AWS::Region"
},
"cn-north-1"
]
},
{
"Fn::Equals": [
{
"Ref": "AWS::Region"
},
"cn-northwest-1"
]
}
]
}
}
}
@@ -0,0 +1,27 @@
import * as iot from '@aws-cdk/aws-iot';
import * as kinesis from '@aws-cdk/aws-kinesis';
import * as cdk from '@aws-cdk/core';
import * as actions from '../../lib';

class TestStack extends cdk.Stack {
constructor(scope: cdk.App, id: string, props?: cdk.StackProps) {
super(scope, id, props);

const topicRule = new iot.TopicRule(this, 'TopicRule', {
sql: iot.IotSql.fromStringAsVer20160323(
"SELECT * FROM 'device/+/data'",
),
});

const stream = new kinesis.Stream(this, 'MyStream', {
shardCount: 3,
});
topicRule.addAction(new actions.KinesisPutRecordAction(stream, {
partitionKey: '${timestamp()}',
}));
}
}

const app = new cdk.App();
new TestStack(app, 'test-kinesis-stream-action-stack');
app.synth();
@@ -0,0 +1,122 @@
import { Template, Match } from '@aws-cdk/assertions';
import * as iam from '@aws-cdk/aws-iam';
import * as iot from '@aws-cdk/aws-iot';
import * as kinesis from '@aws-cdk/aws-kinesis';
import * as cdk from '@aws-cdk/core';
import * as actions from '../../lib';

test('Default kinesis stream action', () => {
// GIVEN
const stack = new cdk.Stack();
const topicRule = new iot.TopicRule(stack, 'MyTopicRule', {
sql: iot.IotSql.fromStringAsVer20160323("SELECT topic(2) as device_id FROM 'device/+/data'"),
});
const stream = kinesis.Stream.fromStreamArn(stack, 'MyStream', 'arn:aws:kinesis:xx-west-1:111122223333:stream/my-stream');

// WHEN
topicRule.addAction(new actions.KinesisPutRecordAction(stream, {
partitionKey: '${newuuid()}',
}));

// THEN
Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', {
TopicRulePayload: {
Actions: [
{
Kinesis: {
StreamName: 'my-stream',
PartitionKey: '${newuuid()}',
RoleArn: {
'Fn::GetAtt': ['MyTopicRuleTopicRuleActionRoleCE2D05DA', 'Arn'],
},
},
},
],
},
});

Template.fromStack(stack).hasResourceProperties('AWS::IAM::Role', {
AssumeRolePolicyDocument: {
Statement: [
{
Action: 'sts:AssumeRole',
Effect: 'Allow',
Principal: {
Service: 'iot.amazonaws.com',
},
},
],
Version: '2012-10-17',
},
});

Template.fromStack(stack).hasResourceProperties('AWS::IAM::Policy', {
PolicyDocument: {
Statement: [
{
Action: 'kinesis:PutRecord',
Effect: 'Allow',
Resource: 'arn:aws:kinesis:xx-west-1:111122223333:stream/my-stream',
},
],
Version: '2012-10-17',
},
PolicyName: 'MyTopicRuleTopicRuleActionRoleDefaultPolicy54A701F7',
Roles: [
{ Ref: 'MyTopicRuleTopicRuleActionRoleCE2D05DA' },
],
});
});

test('passes undefined to partitionKey if empty string is given', () => {
// GIVEN
const stack = new cdk.Stack();
const topicRule = new iot.TopicRule(stack, 'MyTopicRule', {
sql: iot.IotSql.fromStringAsVer20160323("SELECT topic(2) as device_id FROM 'device/+/data'"),
});
const stream = kinesis.Stream.fromStreamArn(stack, 'MyStream', 'arn:aws:kinesis:xx-west-1:111122223333:stream/my-stream');

// WHEN
topicRule.addAction(new actions.KinesisPutRecordAction(stream, {
partitionKey: '',
}));

// THEN
Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', {
TopicRulePayload: {
Actions: [
Match.objectLike({ Kinesis: { PartitionKey: Match.absent() } }),
],
},
});
});

test('can set role', () => {
// GIVEN
const stack = new cdk.Stack();
const topicRule = new iot.TopicRule(stack, 'MyTopicRule', {
sql: iot.IotSql.fromStringAsVer20160323("SELECT topic(2) as device_id FROM 'device/+/data'"),
});
const stream = kinesis.Stream.fromStreamArn(stack, 'MyStream', 'arn:aws:kinesis:xx-west-1:111122223333:stream/my-stream');
const role = iam.Role.fromRoleArn(stack, 'MyRole', 'arn:aws:iam::123456789012:role/ForTest');

// WHEN
topicRule.addAction(new actions.KinesisPutRecordAction(stream, {
partitionKey: '${newuuid()}',
role,
}));

// THEN
Template.fromStack(stack).hasResourceProperties('AWS::IoT::TopicRule', {
TopicRulePayload: {
Actions: [
Match.objectLike({ Kinesis: { RoleArn: 'arn:aws:iam::123456789012:role/ForTest' } }),
],
},
});

Template.fromStack(stack).hasResourceProperties('AWS::IAM::Policy', {
PolicyName: 'MyRolePolicy64AB00A5',
Roles: ['ForTest'],
});
});

0 comments on commit 1480213

Please sign in to comment.