Skip to content

Commit

Permalink
Add support for ARNs in kinesis components
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffail committed Feb 16, 2024
1 parent d80074c commit bade847
Show file tree
Hide file tree
Showing 7 changed files with 241 additions and 103 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ All notable changes to this project will be documented in this file.
- A debug endpoint `/debug/pprof/allocs` has been added for profiling allocations.
- New `cockroachdb_changefeed` input.
- The `open_telemetry_collector` tracer now supports sampling.
- The `aws_kinesis` input and output now support specifying ARNs as the stream target.

### Fixed

Expand Down
216 changes: 133 additions & 83 deletions internal/impl/aws/input_kinesis.go

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions internal/impl/aws/input_kinesis_record_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ type awsKinesisRecordBatcher struct {
ackedWG sync.WaitGroup
}

func (k *kinesisReader) newAWSKinesisRecordBatcher(streamID, shardID, sequence string) (*awsKinesisRecordBatcher, error) {
func (k *kinesisReader) newAWSKinesisRecordBatcher(info streamInfo, shardID, sequence string) (*awsKinesisRecordBatcher, error) {
batchPolicy, err := k.batcher.NewBatcher(k.mgr)
if err != nil {
return nil, fmt.Errorf("failed to initialize batch policy for shard consumer: %w", err)
}

return &awsKinesisRecordBatcher{
streamID: streamID,
streamID: info.id,
shardID: shardID,
batchPolicy: batchPolicy,
checkpointer: checkpoint.NewCapped[string](int64(k.conf.CheckpointLimit)),
Expand Down
67 changes: 67 additions & 0 deletions internal/impl/aws/input_kinesis_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package aws

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestStreamIDParser verifies that parseStreamID splits a stream identifier
// (either a plain stream name or a full ARN) into the stream ID and an
// optional explicit shard, and rejects identifiers with more than one shard.
func TestStreamIDParser(t *testing.T) {
	type testCase struct {
		name        string
		input       string
		wantStream  string
		wantShard   string
		errContains string
	}

	cases := []testCase{
		{
			name:       "no shards stream name",
			input:      "foo-bar",
			wantStream: "foo-bar",
		},
		{
			name:       "no shards stream arn",
			input:      "arn:aws:kinesis:region:account-id:stream/stream-name",
			wantStream: "arn:aws:kinesis:region:account-id:stream/stream-name",
		},
		{
			name:       "sharded stream name",
			input:      "foo-bar:baz",
			wantStream: "foo-bar",
			wantShard:  "baz",
		},
		{
			name:       "sharded stream arn",
			input:      "arn:aws:kinesis:region:account-id:stream/stream-name:baz",
			wantStream: "arn:aws:kinesis:region:account-id:stream/stream-name",
			wantShard:  "baz",
		},
		{
			name:        "multiple shards stream name",
			input:       "foo-bar:baz:buz",
			errContains: "only one shard should be specified",
		},
		{
			name:        "multiple shards stream arn",
			input:       "arn:aws:kinesis:region:account-id:stream/stream-name:baz:buz",
			errContains: "only one shard should be specified",
		},
	}

	for _, tc := range cases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			stream, shard, err := parseStreamID(tc.input)

			// Error cases: only assert on the error message.
			if tc.errContains != "" {
				require.Error(t, err)
				assert.Contains(t, err.Error(), tc.errContains)
				return
			}

			// Success cases: stream and shard must match exactly.
			require.NoError(t, err)
			assert.Equal(t, tc.wantStream, stream)
			assert.Equal(t, tc.wantShard, shard)
		})
	}
}
32 changes: 18 additions & 14 deletions internal/impl/aws/output_kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package aws
import (
"context"
"fmt"
"strings"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
Expand Down Expand Up @@ -69,7 +70,8 @@ Both the `+"`partition_key`"+`(required) and `+"`hash_key`"+` (optional) fields
By default Benthos will use a shared credentials file when connecting to AWS services. It's also possible to set them explicitly at the component level, allowing you to transfer data across accounts. You can find out more [in this document](/docs/guides/cloud/aws).`)).
Fields(
service.NewStringField(koFieldStream).
Description("The stream to publish messages to."),
Description("The stream to publish messages to. Streams can either be specified by their name or full ARN.").
Examples("foo", "arn:aws:kinesis:*:111122223333:stream/my-stream"),
service.NewInterpolatedStringField(koFieldPartitionKey).
Description("A required key for partitioning messages."),
service.NewInterpolatedStringField(koFieldHashKey).
Expand Down Expand Up @@ -115,9 +117,10 @@ type kinesisAPI interface {
}

type kinesisWriter struct {
conf koConfig
kinesis kinesisAPI
log *service.Logger
conf koConfig
streamARN string
kinesis kinesisAPI
log *service.Logger
}

func newKinesisWriter(conf koConfig, mgr *service.Resources) (*kinesisWriter, error) {
Expand Down Expand Up @@ -178,20 +181,21 @@ func (a *kinesisWriter) Connect(ctx context.Context) error {
}

k := kinesis.NewFromConfig(a.conf.aconf)
if _, err := k.DescribeStream(ctx, &kinesis.DescribeStreamInput{
StreamName: &a.conf.Stream,
}); err != nil {
return err
}

waiter := kinesis.NewStreamExistsWaiter(k)

if err := waiter.Wait(ctx, &kinesis.DescribeStreamInput{
StreamName: &a.conf.Stream,
}, time.Minute); err != nil {
in := &kinesis.DescribeStreamInput{}
if strings.HasPrefix(a.conf.Stream, "arn:") {
in.StreamARN = &a.conf.Stream
} else {
in.StreamName = &a.conf.Stream
}
out, err := waiter.WaitForOutput(ctx, in, time.Minute)
if err != nil {
return err
}

a.streamARN = *out.StreamDescription.StreamARN
a.kinesis = k
a.log.Infof("Sending messages to Kinesis stream: %v\n", a.conf.Stream)

Expand All @@ -211,8 +215,8 @@ func (a *kinesisWriter) WriteBatch(ctx context.Context, batch service.MessageBat
}

input := &kinesis.PutRecordsInput{
Records: records,
StreamName: &a.conf.Stream,
Records: records,
StreamARN: &a.streamARN,
}

// trim input record length to max kinesis batch size
Expand Down
10 changes: 9 additions & 1 deletion website/docs/components/inputs/aws_kinesis.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,19 @@ Use the `batching` fields to configure an optional [batching policy](/docs/confi

### `streams`

One or more Kinesis data streams to consume from. Shards of a stream are automatically balanced across consumers by coordinating through the provided DynamoDB table. Multiple comma separated streams can be listed in a single element. Shards are automatically distributed across consumers of a stream by coordinating through the provided DynamoDB table. Alternatively, it's possible to specify an explicit shard to consume from with a colon after the stream name, e.g. `foo:0` would consume the shard `0` of the stream `foo`.
One or more Kinesis data streams to consume from. Streams can either be specified by their name or full ARN. Multiple comma separated streams can be listed in a single element. Shards are automatically distributed across consumers of a stream by coordinating through the provided DynamoDB table. Alternatively, it's possible to specify an explicit shard to consume from with a colon after the stream name, e.g. `foo:0` would consume the shard `0` of the stream `foo`.


Type: `array`

```yml
# Examples
streams:
- foo
- arn:aws:kinesis:*:111122223333:stream/my-stream
```

### `dynamodb`

Determines the table used for storing and accessing the latest consumed sequence for shards, and for coordinating balanced consumers of streams.
Expand Down
14 changes: 11 additions & 3 deletions website/docs/components/outputs/aws_kinesis.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ Introduced in version 3.36.0.
output:
label: ""
aws_kinesis:
stream: "" # No default (required)
stream: foo # No default (required)
partition_key: "" # No default (required)
max_in_flight: 64
batching:
Expand All @@ -49,7 +49,7 @@ output:
output:
label: ""
aws_kinesis:
stream: "" # No default (required)
stream: foo # No default (required)
partition_key: "" # No default (required)
hash_key: "" # No default (optional)
max_in_flight: 64
Expand Down Expand Up @@ -99,11 +99,19 @@ Batches can be formed at both the input and output level. You can find out more

### `stream`

The stream to publish messages to.
The stream to publish messages to. Streams can either be specified by their name or full ARN.


Type: `string`

```yml
# Examples
stream: foo
stream: arn:aws:kinesis:*:111122223333:stream/my-stream
```

### `partition_key`

A required key for partitioning messages.
Expand Down

0 comments on commit bade847

Please sign in to comment.