forked from benthosdev/benthos
/
gcp_pubsub.go
75 lines (63 loc) · 2.97 KB
/
gcp_pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package output
import (
"github.com/dafanshu/benthos/v3/internal/docs"
"github.com/dafanshu/benthos/v3/internal/metadata"
"github.com/dafanshu/benthos/v3/lib/log"
"github.com/dafanshu/benthos/v3/lib/metrics"
"github.com/dafanshu/benthos/v3/lib/output/writer"
"github.com/dafanshu/benthos/v3/lib/types"
)
//------------------------------------------------------------------------------
func init() {
Constructors[TypeGCPPubSub] = TypeSpec{
constructor: fromSimpleConstructor(NewGCPPubSub),
Summary: `
Sends messages to a GCP Cloud Pub/Sub topic. [Metadata](/docs/configuration/metadata) from messages are sent as attributes.`,
Description: `
For information on how to set up credentials check out [this guide](https://cloud.google.com/docs/authentication/production).
### Troubleshooting
If you're consistently seeing ` + "`Failed to send message to gcp_pubsub: context deadline exceeded`" + ` error logs without any further information it is possible that you are encountering https://github.com/Jeffail/benthos/issues/1042, which occurs when metadata values contain characters that are not valid utf-8. This can frequently occur when consuming from Kafka as the key metadata field may be populated with an arbitrary binary value, but this issue is not exclusive to Kafka.
If you are blocked by this issue then a work around is to delete either the specific problematic keys:
` + "```yaml" + `
pipeline:
processors:
- bloblang: |
meta kafka_key = deleted()
` + "```" + `
Or delete all keys with:
` + "```yaml" + `
pipeline:
processors:
- bloblang: meta = deleted()
` + "```" + ``,
Async: true,
FieldSpecs: docs.FieldSpecs{
docs.FieldCommon("project", "The project ID of the topic to publish to."),
docs.FieldCommon("topic", "The topic to publish to.").IsInterpolated(),
docs.FieldCommon("max_in_flight", "The maximum number of messages to have in flight at a given time. Increase this to improve throughput."),
docs.FieldAdvanced("publish_timeout", "The maximum length of time to wait before abandoning a publish attempt for a message.", "10s", "5m", "60m"),
docs.FieldAdvanced("ordering_key", "The ordering key to use for publishing messages.").IsInterpolated(),
docs.FieldCommon("metadata", "Specify criteria for which metadata values are sent as attributes.").WithChildren(metadata.ExcludeFilterFields()...),
},
Categories: []Category{
CategoryServices,
CategoryGCP,
},
}
}
//------------------------------------------------------------------------------
// NewGCPPubSub creates a new GCPPubSub output type.
func NewGCPPubSub(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error) {
a, err := writer.NewGCPPubSubV2(conf.GCPPubSub, mgr, log, stats)
if err != nil {
return nil, err
}
w, err := NewAsyncWriter(
TypeGCPPubSub, conf.GCPPubSub.MaxInFlight, a, log, stats,
)
if err != nil {
return nil, err
}
return OnlySinglePayloads(w), nil
}
//------------------------------------------------------------------------------