-
Notifications
You must be signed in to change notification settings - Fork 482
/
config_queue.go
70 lines (57 loc) · 1.99 KB
/
config_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package otelcol
import (
"fmt"
"github.com/grafana/agent/pkg/river"
otelexporterhelper "go.opentelemetry.io/collector/exporter/exporterhelper"
)
// QueueArguments holds shared settings for components which can queue
// requests.
//
// Every attribute is optional. UnmarshalRiver assigns DefaultQueueArguments to
// the struct before decoding, so attributes the decoder does not set keep
// their default values.
type QueueArguments struct {
// Enabled is passed through to the upstream QueueSettings.Enabled by Convert.
// When it is false, Validate skips all other checks.
Enabled bool `river:"enabled,attr,optional"`
// NumConsumers is passed through to the upstream QueueSettings.NumConsumers
// by Convert. Presumably the number of workers reading from the queue;
// confirm against the upstream exporterhelper documentation.
NumConsumers int `river:"num_consumers,attr,optional"`
// QueueSize is passed through to the upstream QueueSettings.QueueSize by
// Convert. Validate requires it to be greater than zero when Enabled is true.
QueueSize int `river:"queue_size,attr,optional"`
// TODO(rfratto): queues can send to persistent storage through an extension.
}
// Compile-time assertion that *QueueArguments implements river.Unmarshaler.
var _ river.Unmarshaler = (*QueueArguments)(nil)
// DefaultQueueArguments holds default settings for QueueArguments.
//
// UnmarshalRiver assigns this value to its receiver before decoding.
var DefaultQueueArguments = QueueArguments{
// The queue is enabled by default.
Enabled: true,
NumConsumers: 10,
// Copied from [upstream]:
//
// 5000 queue elements at 100 requests/sec gives about 50 seconds of survival
// of destination outage. This is a pretty decent value for production. Users
// should calculate this from the perspective of how many seconds to buffer
// in case of a backend outage and multiply that by the number of requests
// per second.
//
// [upstream]: https://github.com/open-telemetry/opentelemetry-collector/blob/ff73e49f74d8fd8c57a849aa3ff23ae1940cc16a/exporter/exporterhelper/queued_retry.go#L62-L65
QueueSize: 5000,
}
// UnmarshalRiver implements river.Unmarshaler.
//
// The receiver is first overwritten with DefaultQueueArguments, then handed to
// the decode callback f. The error returned by f is returned unchanged.
func (args *QueueArguments) UnmarshalRiver(f func(interface{}) error) error {
	// Start from the defaults; f then overwrites whatever it decodes.
	*args = DefaultQueueArguments

	// plain shares QueueArguments' fields but, being a newly defined type,
	// carries none of its methods. Presumably this keeps the decoder from
	// calling UnmarshalRiver again on the value it is given.
	type plain QueueArguments
	target := (*plain)(args)

	return f(target)
}
// Convert converts args into the upstream type.
//
// A nil receiver yields a nil result. Otherwise a newly allocated
// QueueSettings is returned with Enabled, NumConsumers and QueueSize copied
// from args; all other upstream fields keep their zero value.
func (args *QueueArguments) Convert() *otelexporterhelper.QueueSettings {
	if args == nil {
		return nil
	}

	var settings otelexporterhelper.QueueSettings
	settings.Enabled = args.Enabled
	settings.NumConsumers = args.NumConsumers
	settings.QueueSize = args.QueueSize
	return &settings
}
// Validate returns an error if args is invalid.
//
// A nil receiver or a disabled queue is always considered valid; the remaining
// settings are not inspected in those cases. When the queue is enabled, both
// queue_size and num_consumers must be greater than zero.
func (args *QueueArguments) Validate() error {
	if args == nil || !args.Enabled {
		return nil
	}

	if args.QueueSize <= 0 {
		return fmt.Errorf("queue_size must be greater than zero")
	}

	// An enabled queue with no consumers has nothing to take requests off it,
	// so reject that configuration up front instead of silently accepting it.
	if args.NumConsumers <= 0 {
		return fmt.Errorf("num_consumers must be greater than zero")
	}

	return nil
}