-
Notifications
You must be signed in to change notification settings - Fork 25
/
receiver.go
173 lines (149 loc) · 4.85 KB
/
receiver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
// Package leaderonlyreceiver wraps any metrics receiver and starts it only when the agent is a leader.
package leaderonlyreceiver
import (
"context"
"errors"
"fmt"
"time"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
"go.uber.org/fx"
etcd "github.com/fluxninja/aperture/v2/pkg/etcd/client"
otelconsts "github.com/fluxninja/aperture/v2/pkg/otelcollector/consts"
)
const (
// type_ is the component type under which this receiver factory is registered.
type_ component.Type = "aperture_leader_only"
// stability is the stability level reported for the metrics receiver.
stability = component.StabilityLevelDevelopment
// lateStartTimeout bounds the context used to start the inner receiver
// from OnLeaderStart.
lateStartTimeout = 15 * time.Second
)
// Module returns an fx option that provides the leader-only receiver factory,
// tagged with the receiver-factories result tag.
func Module() fx.Option {
	factoryProvider := fx.Annotate(
		NewFactory,
		fx.ResultTags(otelconsts.ReceiverFactoriesFxTag),
	)
	return fx.Provide(factoryProvider)
}
// NewFactory builds the aperture_leader_only receiver factory. Every default
// config produced by the factory carries the given etcd client.
func NewFactory(etcdClient *etcd.Client) receiver.Factory {
	defaultConfig := func() component.Config {
		return &Config{etcdClient: etcdClient}
	}
	metricsOpt := receiver.WithMetrics(createMetricsReceiver, stability)
	return receiver.NewFactory(type_, defaultConfig, metricsOpt)
}
// Config is a config for leader-only-receiver.
type Config struct {
// etcdClient is set by NewFactory when the default config is created; it is
// unexported and carries no mapstructure tag. The receiver uses it to check
// leadership and to add/remove itself as an election watcher.
etcdClient *etcd.Client
// Inner is the raw config for the wrapped receiver.
Inner map[string]any `mapstructure:"config"`
// InnerType is the type of the wrapped receiver.
InnerType component.Type `mapstructure:"type"`
}
// Validate implements component.ConfigValidator. The only requirement checked
// here is that the wrapped receiver's type is set.
func (c *Config) Validate() error {
	if c.InnerType != "" {
		return nil
	}
	return errors.New("type is required")
}
// createMetricsReceiver builds the wrapping receiver. It only records the
// config, consumer and create settings; the inner receiver is created later,
// because the receiver factories are not accessible at this point.
func createMetricsReceiver(
	_ context.Context,
	createSettings receiver.CreateSettings,
	rConf component.Config,
	consumer consumer.Metrics,
) (receiver.Metrics, error) {
	cfg := rConf.(*Config)
	wrapper := &leaderOnlyReceiver{
		origCreateSettings: createSettings,
		consumer:           consumer,
		config:             *cfg,
	}
	return wrapper, nil
}
// leaderOnlyReceiver wraps a metrics receiver of type config.InnerType and
// creates/starts it only once the agent is known to be the leader.
type leaderOnlyReceiver struct {
// consumer is handed to the inner receiver when it is created.
consumer consumer.Metrics
// factory is the inner receiver's factory, resolved from the host in Start.
factory receiver.Factory
// host is stored in Start; it is used to start the inner receiver and to
// report fatal errors from the election callbacks.
host component.Host
inner receiver.Metrics // nil if inner receiver not started
// origCreateSettings are the settings this wrapper was created with; a copy
// with a modified ID is used for the inner receiver.
origCreateSettings receiver.CreateSettings
// config is a copy of the wrapper's own Config.
config Config
}
// leaderOnlyReceiver implements the etcd.ElectionWatcher interface.
var _ etcd.ElectionWatcher = (*leaderOnlyReceiver)(nil)
// Start implements component.Component.
//
// It resolves the factory of the wrapped receiver from the host and then
// either starts the inner receiver right away (when this agent is already the
// leader) or registers the receiver as an election watcher so that it is
// started later from OnLeaderStart.
//
// An error is returned when the factory is missing, when the host returns a
// factory that is not a receiver.Factory, or when the immediate start of the
// inner receiver fails.
func (r *leaderOnlyReceiver) Start(startCtx context.Context, host component.Host) error {
	factory := host.GetFactory(component.KindReceiver, r.config.InnerType)
	if factory == nil {
		return fmt.Errorf("factory for %s receiver not found", r.config.InnerType)
	}
	// Use a checked assertion so that an unexpected factory implementation
	// surfaces as a startup error rather than a panic.
	rcvFactory, ok := factory.(receiver.Factory)
	if !ok {
		return fmt.Errorf(
			"factory for %s receiver has unexpected type %T",
			r.config.InnerType, factory,
		)
	}
	r.factory = rcvFactory
	r.host = host

	if r.config.etcdClient.IsLeader() {
		// If we already know we're the leader, we can skip creating background
		// goroutine and start inner receiver immediately.
		// Note that on this path the receiver is not registered as an election
		// watcher.
		if err := r.startInnerReceiver(startCtx); err != nil {
			return fmt.Errorf("failed to start %s receiver: %w", r.config.InnerType, err)
		}
		return nil
	}

	r.config.etcdClient.AddElectionWatcher(r)
	return nil
}
// Shutdown implements component.Component. It removes this receiver from the
// etcd client's election watchers and, if an inner receiver is currently set,
// shuts it down and returns its result.
func (r *leaderOnlyReceiver) Shutdown(ctx context.Context) error {
	r.config.etcdClient.RemoveElectionWatcher(r)

	if r.inner == nil {
		return nil
	}
	return r.inner.Shutdown(ctx)
}
// OnLeaderStart creates and starts the inner receiver, giving the start at most
// lateStartTimeout. A failure is reported to the host as a fatal error.
func (r *leaderOnlyReceiver) OnLeaderStart() {
	ctx, cancel := context.WithTimeout(context.Background(), lateStartTimeout)
	defer cancel()

	err := r.startInnerReceiver(ctx)
	if err == nil {
		return
	}
	r.host.ReportFatalError(fmt.Errorf(
		"failed to start %s receiver after becoming a leader: %w",
		r.config.InnerType, err,
	))
}
// OnLeaderStop stops the inner receiver, if one is running.
//
// The reference to the inner receiver is cleared before it is shut down, so
// that r.inner keeps meaning "nil if inner receiver not started" and a later
// Shutdown does not shut the same, already stopped receiver down a second
// time. A shutdown failure is reported to the host as a fatal error.
func (r *leaderOnlyReceiver) OnLeaderStop() {
	inner := r.inner
	if inner == nil {
		return
	}
	// Clear the field regardless of the shutdown result: the receiver must not
	// be reused or shut down again either way.
	r.inner = nil

	if err := inner.Shutdown(context.Background()); err != nil {
		r.host.ReportFatalError(fmt.Errorf(
			"failed to stop %s receiver after becoming a follower: %w",
			r.config.InnerType, err,
		))
	}
}
// startInnerReceiver builds the wrapped receiver from the stored raw config,
// starts it, and records it in r.inner on success.
//
// The raw config is unmarshalled onto the factory's default config and
// validated before the receiver is created. Errors from the config steps and
// from creation are wrapped; an error from the inner Start is returned as is.
func (r *leaderOnlyReceiver) startInnerReceiver(ctx context.Context) error {
	innerCfg := r.factory.CreateDefaultConfig()
	rawConf := confmap.NewFromStringMap(r.config.Inner)

	err := component.UnmarshalConfig(rawConf, innerCfg)
	if err != nil {
		return fmt.Errorf("error reading configuration: %w", err)
	}
	if err = component.ValidateConfig(innerCfg); err != nil {
		return fmt.Errorf("invalid configuration: %w", err)
	}

	// The inner receiver gets its own ID of the form
	// "<innerType>/aperture_leader_only/<name>"; all other create settings are
	// copied from the wrapper.
	innerSettings := r.origCreateSettings
	innerSettings.ID = component.NewIDWithName(
		r.config.InnerType,
		r.origCreateSettings.ID.String(),
	)

	started, err := r.factory.CreateMetricsReceiver(ctx, innerSettings, innerCfg, r.consumer)
	if err != nil {
		return fmt.Errorf("error creating receiver: %w", err)
	}

	if err = started.Start(ctx, r.host); err != nil {
		return err
	}

	// Only a successfully started receiver is remembered.
	r.inner = started
	return nil
}