Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(iot): add Action to put record to Kinesis Data stream #18321

Merged
merged 14 commits into from
Jan 19, 2022
Merged
23 changes: 23 additions & 0 deletions packages/@aws-cdk/aws-iot-actions/README.md
Expand Up @@ -26,6 +26,7 @@ Currently supported are:
- Put logs to CloudWatch Logs
- Capture CloudWatch metrics
- Change state for a CloudWatch alarm
- Put records to Kinesis Data stream
- Put records to Kinesis Data Firehose stream
- Send messages to SQS queues

Expand Down Expand Up @@ -183,6 +184,28 @@ const topicRule = new iot.TopicRule(this, 'TopicRule', {
});
```

## Put records to Kinesis Data stream

The code snippet below creates an AWS IoT Rule that put records to Kinesis Data
stream when it is triggered.

```ts
import * as iot from '@aws-cdk/aws-iot';
import * as actions from '@aws-cdk/aws-iot-actions';
import * as kinesis from '@aws-cdk/aws-kinesis';

const stream = new kinesis.Stream(this, 'MyStream');

const topicRule = new iot.TopicRule(this, 'TopicRule', {
sql: iot.IotSql.fromStringAsVer20160323("SELECT * FROM 'device/+/data'"),
actions: [
new actions.KinesisPutRecordAction(stream, {
partitionKey: '${timestamp()}', // optional property
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's mention what's the default.

}),
],
});
```

## Put records to Kinesis Data Firehose stream

The code snippet below creates an AWS IoT Rule that put records to Put records
Expand Down
2 changes: 1 addition & 1 deletion packages/@aws-cdk/aws-iot-actions/lib/index.ts
Expand Up @@ -3,7 +3,7 @@ export * from './cloudwatch-put-metric-action';
export * from './cloudwatch-set-alarm-state-action';
export * from './common-action-props';
export * from './firehose-stream-action';
export * from './kinesis-put-record-action';
export * from './lambda-function-action';
export * from './s3-put-object-action';
export * from './sqs-queue-action';

54 changes: 54 additions & 0 deletions packages/@aws-cdk/aws-iot-actions/lib/kinesis-put-record-action.ts
@@ -0,0 +1,54 @@
import * as iam from '@aws-cdk/aws-iam';
import * as iot from '@aws-cdk/aws-iot';
import * as kinesis from '@aws-cdk/aws-kinesis';
import { CommonActionProps } from './common-action-props';
import { singletonActionRole } from './private/role';

/**
* Configuration properties of an action for the Kinesis Data stream.
*/
export interface KinesisPutRecordActionProps extends CommonActionProps {
/**
* The partition key used to determine to which shard the data is written.
* The partition key is usually composed of an expression (for example, ${topic()} or ${timestamp()}).
* For more information @see https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#API_PutRecord_RequestParameters
yamatatsu marked this conversation as resolved.
Show resolved Hide resolved
*
* @default - None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate what does "None" mean here?

Copy link
Contributor Author

@yamatatsu yamatatsu Jan 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I tried to elaborate, but I could not😞.

Proberbly, IoT Core rule is fill default value (e.g. MQTT payload JSON string or hash of it) because PartitionKey is required in this API Reference.
But there is no description about it in IoT Core rule documentation and CloudFormation Documentation

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you try deploying an IoT app without this property filled?

Maybe this is simply a mistake in the CloudFormation resource, and this should actually be required?

Copy link
Contributor Author

@yamatatsu yamatatsu Jan 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you try deploying an IoT app without this property filled?

Yes I tried and the deploying secceeded.

Again, I deployed, and I tested following for confirmation to be spreaded records to shards:

  • Prepare
    • Deploy Kinesis Stream with 10 shards
  • Test1
    • Prepare: Deploy IoT Core Kinesis Rule action with ${timestamp()} partitionKey
    • Send 10 same payloads
  • Test2
    • Prepare: Deploy IoT Core Kinesis Rule action with no partitionKey
    • Send 10 same payloads
  • Test3
    • Prepare: Deploy IoT Core Kinesis Rule action with no partitionKey
    • Send 10 payloads different each time

Results:

  • Test1: Records was spreaded to each shards
  • Test2: All records was put in same record
  • Test3: All records was put in same record

The IoT Core may be filling in a static partitionKey if it is not specified.
But this is just a guess...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. I think that's a good guess. Let's put that in the docs - it's certainly better than just "None", which gives you no clue what that actually means 🙂.

Thanks for the detailed testing!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you!

OK. If we beleave this guess, I think it is better that fill default value. Because in Kinesis Stream, it is pretty not helpful to not spread records by shards.

I found that ${newuuid} is suggested in Web Console.
image

This is the documentation of ${newuuid}.

I try to add default value ${newuuid}. But if you have any opinions, please feel free to tell me😉 .

Copy link
Contributor

@skinny85 skinny85 Jan 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't hate it, but I'm a little concerned that the L2 won't have all of the capabilities of the underlying service (in this case, leaving the partition key as empty).

Does the Console allow you to leave it as empty, or do you have to provide a value?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the L2 won't have all of the capabilities of the underlying service

It is so reasonable I think! So I will never mind to revert my commit that add default value.

Does the Console allow you to leave it as empty, or do you have to provide a value?

The Console requre PartitionKey, but CloudFormation does not. For keeping compatibility for CloudFormation, let's not add a default value, instead add a detailed description in JSDoc and Readme?

Copy link
Contributor

@skinny85 skinny85 Jan 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's do this.

Make this field required, like it is in the console. But, allow specifying an empty string there (''). If an empty string is passed, then we will actually pass undefined to the underlying CloudFormation resource for that field. Of course, we will need to cover that in our documentation.

This way, we will be close to the Console experience, while covering the entire service capabilities.

What do you think about this @yamatatsu?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's good I think! 👍🏻
OK, I will do it!

*/
readonly partitionKey?: string;
}

/**
* The action to put the record from an MQTT message to the Kinesis Data stream.
*/
export class KinesisPutRecordAction implements iot.IAction {
yamatatsu marked this conversation as resolved.
Show resolved Hide resolved
private readonly partitionKey?: string;
private readonly role?: iam.IRole;

/**
* @param stream The Kinesis Data stream to which to put records.
* @param props Optional properties to not use default
*/
constructor(private readonly stream: kinesis.IStream, props: KinesisPutRecordActionProps = {}) {
this.partitionKey = props.partitionKey;
this.role = props.role;
}

bind(rule: iot.ITopicRule): iot.ActionConfig {
const role = this.role ?? singletonActionRole(rule);
role.addToPrincipalPolicy(new iam.PolicyStatement({
actions: ['kinesis:PutRecord'],
skinny85 marked this conversation as resolved.
Show resolved Hide resolved
resources: [this.stream.streamArn],
}));

return {
configuration: {
kinesis: {
streamName: this.stream.streamName,
partitionKey: this.partitionKey,
roleArn: role.roleArn,
},
},
};
}
}
4 changes: 4 additions & 0 deletions packages/@aws-cdk/aws-iot-actions/package.json
Expand Up @@ -83,8 +83,10 @@
"@aws-cdk/aws-cloudwatch": "0.0.0",
"@aws-cdk/aws-iam": "0.0.0",
"@aws-cdk/aws-iot": "0.0.0",
"@aws-cdk/aws-kinesis": "0.0.0",
"@aws-cdk/aws-kinesisfirehose": "0.0.0",
"@aws-cdk/aws-lambda": "0.0.0",
"@aws-cdk/aws-lambda-event-sources": "0.0.0",
"@aws-cdk/aws-logs": "0.0.0",
"@aws-cdk/aws-s3": "0.0.0",
"@aws-cdk/aws-sqs": "0.0.0",
Expand All @@ -97,8 +99,10 @@
"@aws-cdk/aws-cloudwatch": "0.0.0",
"@aws-cdk/aws-iam": "0.0.0",
"@aws-cdk/aws-iot": "0.0.0",
"@aws-cdk/aws-kinesis": "0.0.0",
"@aws-cdk/aws-kinesisfirehose": "0.0.0",
"@aws-cdk/aws-lambda": "0.0.0",
"@aws-cdk/aws-lambda-event-sources": "0.0.0",
"@aws-cdk/aws-logs": "0.0.0",
"@aws-cdk/aws-s3": "0.0.0",
"@aws-cdk/aws-sqs": "0.0.0",
Expand Down
@@ -0,0 +1,268 @@
{
"Resources": {
"Logs6819BB44": {
"Type": "AWS::Logs::LogGroup",
"Properties": {
"RetentionInDays": 731
},
"UpdateReplacePolicy": "Delete",
"DeletionPolicy": "Delete"
},
"TopicRuleTopicRuleActionRole246C4F77": {
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "iot.amazonaws.com"
}
}
],
"Version": "2012-10-17"
}
}
},
"TopicRuleTopicRuleActionRoleDefaultPolicy99ADD687": {
"Type": "AWS::IAM::Policy",
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": [
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Effect": "Allow",
"Resource": {
"Fn::GetAtt": [
"Logs6819BB44",
"Arn"
]
}
},
{
"Action": "logs:DescribeLogStreams",
"Effect": "Allow",
"Resource": {
"Fn::GetAtt": [
"Logs6819BB44",
"Arn"
]
}
},
{
"Action": "kinesis:PutRecord",
"Effect": "Allow",
"Resource": {
"Fn::GetAtt": [
"MyStream5C050E93",
"Arn"
]
}
}
],
"Version": "2012-10-17"
},
"PolicyName": "TopicRuleTopicRuleActionRoleDefaultPolicy99ADD687",
"Roles": [
{
"Ref": "TopicRuleTopicRuleActionRole246C4F77"
}
]
}
},
"TopicRule40A4EA44": {
"Type": "AWS::IoT::TopicRule",
"Properties": {
"TopicRulePayload": {
"Actions": [
{
"Kinesis": {
"PartitionKey": "${timestamp()}",
"RoleArn": {
"Fn::GetAtt": [
"TopicRuleTopicRuleActionRole246C4F77",
"Arn"
]
},
"StreamName": {
"Ref": "MyStream5C050E93"
}
}
}
],
"AwsIotSqlVersion": "2016-03-23",
"ErrorAction": {
"CloudwatchLogs": {
"LogGroupName": {
"Ref": "Logs6819BB44"
},
"RoleArn": {
"Fn::GetAtt": [
"TopicRuleTopicRuleActionRole246C4F77",
"Arn"
]
}
}
},
"Sql": "SELECT * FROM 'device/+/data'"
}
}
},
"MyStream5C050E93": {
"Type": "AWS::Kinesis::Stream",
"Properties": {
"ShardCount": 3,
"RetentionPeriodHours": 24,
"StreamEncryption": {
"Fn::If": [
"AwsCdkKinesisEncryptedStreamsUnsupportedRegions",
{
"Ref": "AWS::NoValue"
},
{
"EncryptionType": "KMS",
"KeyId": "alias/aws/kinesis"
}
]
}
}
},
"MyLambdaServiceRole4539ECB6": {
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
}
}
],
"Version": "2012-10-17"
},
"ManagedPolicyArns": [
{
"Fn::Join": [
"",
[
"arn:",
{
"Ref": "AWS::Partition"
},
":iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
]
]
}
]
}
},
"MyLambdaServiceRoleDefaultPolicy5BBC6F68": {
"Type": "AWS::IAM::Policy",
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": [
"kinesis:DescribeStreamSummary",
"kinesis:GetRecords",
"kinesis:GetShardIterator",
"kinesis:ListShards",
"kinesis:SubscribeToShard",
"kinesis:DescribeStream",
"kinesis:ListStreams"
],
"Effect": "Allow",
"Resource": {
"Fn::GetAtt": [
"MyStream5C050E93",
"Arn"
]
}
},
{
"Action": "kinesis:DescribeStream",
"Effect": "Allow",
"Resource": {
"Fn::GetAtt": [
"MyStream5C050E93",
"Arn"
]
}
}
],
"Version": "2012-10-17"
},
"PolicyName": "MyLambdaServiceRoleDefaultPolicy5BBC6F68",
"Roles": [
{
"Ref": "MyLambdaServiceRole4539ECB6"
}
]
}
},
"MyLambdaCCE802FB": {
"Type": "AWS::Lambda::Function",
"Properties": {
"Code": {
"ZipFile": "\n exports.handler = (event) => {\n event.Records.forEach(rec => {\n console.log('eventID:', rec.eventID)\n })\n }\n "
},
"Role": {
"Fn::GetAtt": [
"MyLambdaServiceRole4539ECB6",
"Arn"
]
},
"Handler": "index.handler",
"Runtime": "nodejs14.x"
},
"DependsOn": [
"MyLambdaServiceRoleDefaultPolicy5BBC6F68",
"MyLambdaServiceRole4539ECB6"
]
},
"MyLambdaKinesisEventSourceteststackMyStreamED37A4A443E4E7EF": {
"Type": "AWS::Lambda::EventSourceMapping",
"Properties": {
"FunctionName": {
"Ref": "MyLambdaCCE802FB"
},
"BatchSize": 100,
"EventSourceArn": {
"Fn::GetAtt": [
"MyStream5C050E93",
"Arn"
]
},
"StartingPosition": "TRIM_HORIZON"
}
}
},
"Conditions": {
"AwsCdkKinesisEncryptedStreamsUnsupportedRegions": {
"Fn::Or": [
{
"Fn::Equals": [
{
"Ref": "AWS::Region"
},
"cn-north-1"
]
},
{
"Fn::Equals": [
{
"Ref": "AWS::Region"
},
"cn-northwest-1"
]
}
]
}
}
}