forked from redpanda-data/connect
-
Notifications
You must be signed in to change notification settings - Fork 0
/
sqs.go
71 lines (58 loc) · 2.45 KB
/
sqs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package input
import (
"github.com/dafanshu/benthos/v3/internal/docs"
"github.com/dafanshu/benthos/v3/lib/input/reader"
"github.com/dafanshu/benthos/v3/lib/log"
"github.com/dafanshu/benthos/v3/lib/metrics"
"github.com/dafanshu/benthos/v3/lib/types"
"github.com/dafanshu/benthos/v3/lib/util/aws/session"
)
//------------------------------------------------------------------------------
func init() {
Constructors[TypeSQS] = TypeSpec{
constructor: fromSimpleConstructor(NewAmazonSQS),
Status: docs.StatusDeprecated,
Summary: `
Receive messages from an Amazon SQS URL.`,
Description: `
## Alternatives
This input is being replaced with the shiny new ` + "[`aws_sqs` input](/docs/components/inputs/aws_sqs)" + `, which has improved features, consider trying it out instead.
### Credentials
By default Benthos will use a shared credentials file when connecting to AWS
services. It's also possible to set them explicitly at the component level,
allowing you to transfer data across accounts. You can find out more
[in this document](/docs/guides/cloud/aws).
### Metadata
This input adds the following metadata fields to each message:
` + "```text" + `
- sqs_message_id
- sqs_receipt_handle
- sqs_approximate_receive_count
- All message attributes
` + "```" + `
You can access these metadata fields using
[function interpolation](/docs/configuration/interpolation#metadata).`,
FieldSpecs: append(
append(docs.FieldSpecs{
docs.FieldCommon("url", "The SQS URL to consume from."),
docs.FieldAdvanced("delete_message", "Whether to delete the consumed message once it is acked. Disabling allows you to handle the deletion using a different mechanism."),
}, session.FieldSpecs()...),
docs.FieldAdvanced("timeout", "The period of time to wait before abandoning a request and trying again."),
docs.FieldAdvanced("max_number_of_messages", "The maximum number of messages to consume from each request."),
),
Categories: []Category{
CategoryServices,
CategoryAWS,
},
}
}
//------------------------------------------------------------------------------
// NewAmazonSQS creates a new AWS SQS input type.
//
// It builds the underlying reader from conf.SQS, wraps that reader with
// reader.NewAsyncBundleUnacks and hands the result to NewAsyncReader under the
// TypeSQS name. An error from constructing the reader is returned unchanged.
// The mgr argument is accepted but not used by this constructor.
func NewAmazonSQS(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error) {
	sqsReader, err := reader.NewAmazonSQS(conf.SQS, log, stats)
	if err != nil {
		return nil, err
	}

	bundled := reader.NewAsyncBundleUnacks(sqsReader)
	return NewAsyncReader(TypeSQS, true, bundled, log, stats)
}
//------------------------------------------------------------------------------