forked from redpanda-data/connect
-
Notifications
You must be signed in to change notification settings - Fork 0
/
s3.go
132 lines (111 loc) · 6.03 KB
/
s3.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package input
import (
"github.com/dafanshu/benthos/v3/internal/docs"
"github.com/dafanshu/benthos/v3/lib/input/reader"
"github.com/dafanshu/benthos/v3/lib/log"
"github.com/dafanshu/benthos/v3/lib/metrics"
"github.com/dafanshu/benthos/v3/lib/types"
"github.com/dafanshu/benthos/v3/lib/util/aws/session"
)
//------------------------------------------------------------------------------
// init registers the deprecated S3 input under TypeS3 in Constructors,
// together with its summary, markdown description, field specs and categories.
// The constructor is NewAmazonS3, adapted via fromSimpleConstructor.
func init() {
	Constructors[TypeS3] = TypeSpec{
		constructor: fromSimpleConstructor(NewAmazonS3),
		Status:      docs.StatusDeprecated,
		Summary: `
Downloads objects within an Amazon S3 bucket, optionally filtered by a prefix.
If an SQS queue has been configured then only object keys read from the queue
will be downloaded.`,
		// The description is rendered as markdown, so paragraphs must be
		// separated by blank lines or they are merged into one paragraph.
		Description: `
## Alternatives

This input is being replaced with the shiny new ` + "[`aws_s3` input](/docs/components/inputs/aws_s3)" + `, which has improved features, consider trying it out instead.

If an SQS queue is not specified the entire list of objects found when this
input starts will be consumed. Note that the prefix configuration is only used
when downloading objects without SQS configured.

If your bucket is configured to send events directly to an SQS queue then you
need to set the ` + "`sqs_body_path`" + ` field to a
[dot path](/docs/configuration/field_paths) where the object key is found in the payload.
However, it is also common practice to send bucket events to an SNS topic which
sends enveloped events to SQS, in which case you must also set the
` + "`sqs_envelope_path`" + ` field to where the payload can be found.

When using SQS events it's also possible to extract target bucket names from the
events by specifying a path in the field ` + "`sqs_bucket_path`" + `. For each
SQS event, if that path exists and contains a string it will be used as the bucket
of the download instead of the ` + "`bucket`" + ` field.

Here is a guide for setting up an SQS queue that receives events for new S3
bucket objects:

https://docs.aws.amazon.com/AmazonS3/latest/dev/ways-to-add-notification-config-to-bucket.html

WARNING: When using SQS please make sure you have sensible values for
` + "`sqs_max_messages`" + ` and also the visibility timeout of the queue
itself.

When Benthos consumes an S3 item as a result of receiving an SQS message the
message is not deleted until the S3 item has been sent onwards. This ensures
at-least-once crash resiliency, but also means that if the S3 item takes longer
to process than the visibility timeout of your queue then the same items might
be processed multiple times.

### Credentials

By default Benthos will use a shared credentials file when connecting to AWS
services. It's also possible to set them explicitly at the component level,
allowing you to transfer data across accounts. You can find out more
[in this document](/docs/guides/cloud/aws).

### Metadata

This input adds the following metadata fields to each message:

` + "```" + `
- s3_key
- s3_bucket
- s3_last_modified_unix*
- s3_last_modified (RFC3339)*
- s3_content_type*
- s3_content_encoding*
- All user defined metadata*

* Only added when NOT using download manager
` + "```" + `

You can access these metadata fields using
[function interpolation](/docs/configuration/interpolation#metadata).`,
		// Field order: bucket/SQS fields, then the shared AWS session fields
		// from session.FieldSpecs(), then the advanced download options.
		FieldSpecs: append(
			append(docs.FieldSpecs{
				docs.FieldCommon("bucket", "The bucket to consume from. If `sqs_bucket_path` is set this field is still required as a fallback."),
				docs.FieldCommon("prefix", "An optional path prefix, if set only objects with the prefix are consumed. This field is ignored when SQS is used."),
				docs.FieldCommon("sqs_url", "An optional SQS URL to connect to. When specified this queue will control which objects are downloaded from the target bucket."),
				docs.FieldCommon("sqs_body_path", "A [dot path](/docs/configuration/field_paths) whereby object keys are found in SQS messages, this field is only required when an `sqs_url` is specified."),
				docs.FieldCommon("sqs_bucket_path", "An optional [dot path](/docs/configuration/field_paths) whereby the bucket of an object can be found in consumed SQS messages."),
				docs.FieldCommon("sqs_envelope_path", "An optional [dot path](/docs/configuration/field_paths) of enveloped payloads to extract from SQS messages. This is required when pushing events from S3 to SNS to SQS."),
				docs.FieldAdvanced("sqs_max_messages", "The maximum number of SQS messages to consume from each request."),
				docs.FieldAdvanced("sqs_endpoint", "A custom endpoint to use when connecting to SQS."),
			}, session.FieldSpecs()...),
			docs.FieldAdvanced("retries", "The maximum number of times to attempt an object download."),
			docs.FieldAdvanced("force_path_style_urls", "Forces the client API to use path style URLs, which helps when connecting to custom endpoints."),
			docs.FieldAdvanced("delete_objects", "Whether to delete downloaded objects from the bucket."),
			docs.FieldAdvanced("download_manager", "Controls if and how to use the download manager API. This can help speed up file downloads, but results in file metadata not being copied.").WithChildren(
				docs.FieldCommon("enabled", "Whether to use the download manager API."),
			),
			docs.FieldAdvanced("timeout", "The period of time to wait before abandoning a request and trying again."),
			// Kept only so old configs still parse; NewAmazonS3 warns when it is set above 1.
			docs.FieldDeprecated("max_batch_count"),
		),
		Categories: []Category{
			CategoryServices,
			CategoryAWS,
		},
	}
}
//------------------------------------------------------------------------------
// NewAmazonS3 creates a new AWS S3 input type.
//
// It builds a reader from conf.S3 via reader.NewAmazonS3 and then wraps it,
// first with reader.NewAsyncPreserver and then with reader.NewAsyncBundleUnacks.
// The result is handed to NewAsyncReader under TypeS3. The behaviour of those
// wrappers, and of the boolean passed to NewAsyncReader, is defined by those
// constructors. Check them there rather than inferring it from their names.
// The mgr argument is accepted for constructor-signature compatibility but is
// not used here.
//
// A warning is logged when the deprecated max_batch_count field is above 1.
// Any error from reader.NewAmazonS3 is returned unchanged.
func NewAmazonS3(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error) {
	// TODO: V4 Remove this.
	if conf.S3.MaxBatchCount > 1 {
		log.Warnf("Field '%v.max_batch_count' is deprecated, use the batching methods outlined in https://benthos.dev/docs/configuration/batching instead.\n", conf.Type)
	}

	s3Reader, err := reader.NewAmazonS3(conf.S3, log, stats)
	if err != nil {
		return nil, err
	}

	preserved := reader.NewAsyncPreserver(s3Reader)
	bundled := reader.NewAsyncBundleUnacks(preserved)
	return NewAsyncReader(TypeS3, true, bundled, log, stats)
}
//------------------------------------------------------------------------------