forked from redpanda-data/connect
-
Notifications
You must be signed in to change notification settings - Fork 0
/
kinesis.go
75 lines (66 loc) · 2.89 KB
/
kinesis.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package input
import (
"github.com/dafanshu/benthos/v3/internal/docs"
"github.com/dafanshu/benthos/v3/lib/input/reader"
"github.com/dafanshu/benthos/v3/lib/log"
"github.com/dafanshu/benthos/v3/lib/message/batch"
"github.com/dafanshu/benthos/v3/lib/metrics"
"github.com/dafanshu/benthos/v3/lib/types"
"github.com/dafanshu/benthos/v3/lib/util/aws/session"
)
//------------------------------------------------------------------------------
// init registers the deprecated `kinesis` input type in the package-level
// Constructors map under TypeKinesis, wiring its constructor together with
// the documentation and config field specs surfaced to users.
func init() {
Constructors[TypeKinesis] = TypeSpec{
constructor: fromSimpleConstructor(NewKinesis),
// Deprecated in favour of the aws_kinesis input (see Description below).
Status: docs.StatusDeprecated,
Summary: `
Receive messages from a Kinesis stream.`,
Description: `
## Alternatives
This input is being replaced with the shiny new ` + "[`aws_kinesis` input](/docs/components/inputs/aws_kinesis)" + `, which has improved features, consider trying it out instead.
It's possible to use DynamoDB for persisting shard iterators by setting the
table name. Offsets will then be tracked per ` + "`client_id`" + ` per
` + "`shard_id`" + `. When using this mode you should create a table with
` + "`namespace`" + ` as the primary key and ` + "`shard_id`" + ` as a sort key.
Use the ` + "`batching`" + ` fields to configure an optional
[batching policy](/docs/configuration/batching#batch-policy). Any other batching
mechanism will stall with this input due its sequential transaction model.`,
// Field layout: common Kinesis fields, then the shared AWS session fields,
// followed by advanced tuning knobs and the standard batching policy spec.
FieldSpecs: append(
append(docs.FieldSpecs{
docs.FieldCommon("stream", "The Kinesis stream to consume from."),
docs.FieldCommon("shard", "The shard to consume from."),
docs.FieldCommon("client_id", "The client identifier to assume."),
docs.FieldCommon("commit_period", "The rate at which offset commits should be sent."),
docs.FieldCommon("dynamodb_table", "A DynamoDB table to use for offset storage."),
docs.FieldCommon("start_from_oldest", "Whether to consume from the oldest message when an offset does not yet exist for the stream."),
}, session.FieldSpecs()...),
docs.FieldAdvanced("timeout", "The period of time to wait before abandoning a request and trying again."),
docs.FieldAdvanced("limit", "The maximum number of messages to consume from each request."),
batch.FieldSpec(),
),
Categories: []Category{
CategoryServices,
CategoryAWS,
},
}
}
//------------------------------------------------------------------------------
// NewKinesis creates a new AWS Kinesis input type.
//
// The base Kinesis reader is optionally wrapped in a synchronous batcher when
// a batching policy is configured, and the result is always wrapped in a
// preserver before being handed to the generic reader input.
func NewKinesis(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error) {
	base, err := reader.NewKinesis(conf.Kinesis, log, stats)
	if err != nil {
		return nil, err
	}

	var r reader.Type = base
	if !conf.Kinesis.Batching.IsNoop() {
		batched, berr := reader.NewSyncBatcher(conf.Kinesis.Batching, base, mgr, log, stats)
		if berr != nil {
			return nil, berr
		}
		r = batched
	}

	return NewReader(TypeKinesis, reader.NewPreserver(r), log, stats)
}
//------------------------------------------------------------------------------