package client

import (
	"crypto/sha256"
	"fmt"
	"strings"
	"sync"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/alloy/internal/alloy/logging/level"
	"github.com/grafana/alloy/internal/component/common/loki"
	"github.com/grafana/alloy/internal/component/common/loki/client/internal"
	"github.com/grafana/alloy/internal/component/common/loki/limit"
	"github.com/grafana/alloy/internal/component/common/loki/wal"
)

// WriterEventsNotifier is the notifier handed to the Manager, to which each
// wal.Watcher can subscribe for writer events.
type WriterEventsNotifier interface {
	SubscribeCleanup(subscriber wal.CleanupEventSubscriber)
	SubscribeWrite(subscriber wal.WriteEventSubscriber)
}
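
// In practice, the WAL writer is the expected implementer of this interface,
// though anything that can fan out cleanup and write events to its
// subscribers will do; NilNotifier below is the no-op fallback.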

var (
	// NilNotifier is a no-op WriterEventsNotifier.
	NilNotifier = nilNotifier{}
)

// nilNotifier implements WriterEventsNotifier with no-op callbacks.
type nilNotifier struct{}

func (n nilNotifier) SubscribeCleanup(_ wal.CleanupEventSubscriber) {}

func (n nilNotifier) SubscribeWrite(_ wal.WriteEventSubscriber) {}
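
// A Manager built with the WAL disabled never subscribes anything to the
// notifier (watchers only exist when the WAL is enabled), so NilNotifier is a
// natural argument in that case.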

// StoppableWatcher is the subset of the WAL watcher API the Manager needs to
// stop it, optionally draining the WAL first.
type StoppableWatcher interface {
	Stop()
	Drain()
}

// StoppableClient is the subset of the client API the Manager needs to shut a
// client down, either gracefully (Stop) or immediately (StopNow).
type StoppableClient interface {
	Stop()
	StopNow()
}

// watcherClientPair holds a client and, when the WAL is enabled, the watcher
// coupled to it.
type watcherClientPair struct {
	watcher StoppableWatcher
	client  StoppableClient
}

// Stop stops, in order, the possibly-nil watcher and then the client.
func (p watcherClientPair) Stop(drain bool) {
	// If the config has the WAL disabled, there is no watcher for this client.
	if p.watcher != nil {
		// If drain is enabled, drain the WAL before stopping.
		if drain {
			p.watcher.Drain()
		}
		p.watcher.Stop()
	}

	// Subsequently, stop the client.
	p.client.Stop()
}

// Manager manages remote write client instantiation, and connects the related
// components to orchestrate the flow of loki.Entry from the scrape targets to
// the remote write clients themselves.
//
// Right now it supports instantiating only the WAL writer side of the
// yet-to-come WAL-enabled client. In follow-up work, tracked in
// https://github.com/grafana/loki/issues/8197, this Manager will be
// responsible for instantiating all client types: Logger, Multi and WAL.
type Manager struct {
	name    string
	clients []Client
	pairs   []watcherClientPair
	entries chan loki.Entry
	once    sync.Once
	wg      sync.WaitGroup
}

// NewManager creates a new Manager, wiring up one client per provided config
// and, when the WAL is enabled, a WAL watcher coupled to each client.
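//
// A minimal usage sketch (metrics, logger, limits, WAL config and client
// configs are assumed to be constructed elsewhere):
//
//	m, err := NewManager(metrics, logger, limitsCfg, prometheus.DefaultRegisterer,
//		walCfg, NilNotifier, clientCfgs...)
//	if err != nil {
//		return err
//	}
//	defer m.Stop()
//	m.Chan() <- loki.Entry{/* labels and log line */}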
func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg prometheus.Registerer, walCfg wal.Config, notifier WriterEventsNotifier, clientCfgs ...Config) (*Manager, error) {
	// fake is a zero-size value used to treat clientsCheck as a set.
	var fake struct{}
	walWatcherMetrics := wal.NewWatcherMetrics(reg)
	walMarkerMetrics := internal.NewMarkerMetrics(reg)
	queueClientMetrics := NewQueueClientMetrics(reg)

	if len(clientCfgs) == 0 {
		return nil, fmt.Errorf("at least one client config must be provided")
	}

	clientsCheck := make(map[string]struct{})
	clients := make([]Client, 0, len(clientCfgs))
	pairs := make([]watcherClientPair, 0, len(clientCfgs))
	for _, cfg := range clientCfgs {
		// Don't allow duplicate clients; we have client-specific metrics that
		// need at least one unique label value (the name).
		clientName := GetClientName(cfg)
		if _, ok := clientsCheck[clientName]; ok {
			return nil, fmt.Errorf("duplicate client configs are not allowed, found duplicate for name: %s", clientName)
		}
		clientsCheck[clientName] = fake

		if walCfg.Enabled {
			// Add some context information for the logger the watcher uses.
			wlog := log.With(logger, "client", clientName)

			markerFileHandler, err := internal.NewMarkerFileHandler(logger, walCfg.Dir)
			if err != nil {
				return nil, err
			}
			markerHandler := internal.NewMarkerHandler(markerFileHandler, walCfg.MaxSegmentAge, logger, walMarkerMetrics.WithCurriedId(clientName))

			queue, err := NewQueue(metrics, queueClientMetrics.CurryWithId(clientName), cfg, limits.MaxStreams, limits.MaxLineSize.Val(), limits.MaxLineSizeTruncate, logger, markerHandler)
			if err != nil {
				return nil, fmt.Errorf("error starting queue client: %w", err)
			}

			// Subscribe the watcher's wal.WriteTo to writer events. This makes the
			// writer trigger the cleanup of the wal.WriteTo series cache whenever a
			// segment is deleted.
			notifier.SubscribeCleanup(queue)

			watcher := wal.NewWatcher(walCfg.Dir, clientName, walWatcherMetrics, queue, wlog, walCfg.WatchConfig, markerHandler)
			// Subscribe the watcher to WAL write events.
			notifier.SubscribeWrite(watcher)

			level.Debug(logger).Log("msg", "starting WAL watcher for client", "client", clientName)
			watcher.Start()

			pairs = append(pairs, watcherClientPair{
				watcher: watcher,
				client:  queue,
			})
		} else {
			client, err := New(metrics, cfg, limits.MaxStreams, limits.MaxLineSize.Val(), limits.MaxLineSizeTruncate, logger)
			if err != nil {
				return nil, fmt.Errorf("error starting client: %w", err)
			}

			clients = append(clients, client)
			pairs = append(pairs, watcherClientPair{
				client: client,
			})
		}
	}

	manager := &Manager{
		clients: clients,
		pairs:   pairs,
		entries: make(chan loki.Entry),
	}
	if walCfg.Enabled {
		manager.name = buildManagerName("wal", clientCfgs...)
		manager.startWithConsume()
	} else {
		manager.name = buildManagerName("multi", clientCfgs...)
		manager.startWithForward()
	}
	return manager, nil
}

// startWithConsume starts the main manager routine, which reads and discards
// entries from the exposed channel. This is necessary because, to treat the
// WAL-enabled manager the same way as the WAL-disabled one, the processing
// pipeline sends entries both to the WAL writer and to the channel exposed by
// the manager. When the WAL is enabled these entries are not used, since they
// are read from the WAL instead, so a routine has to keep reading from the
// channel and discarding entries to avoid blocking the sending side.
func (m *Manager) startWithConsume() {
	m.wg.Add(1)
	go func() {
		defer m.wg.Done()
		// Discard read entries.
		//nolint:revive
		for range m.entries {
		}
	}()
}

// startWithForward starts the main manager routine, which reads entries from
// the exposed channel and fans them out across all inner clients.
func (m *Manager) startWithForward() {
	m.wg.Add(1)
	go func() {
		defer m.wg.Done()
		for e := range m.entries {
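			// Note that the fan-out below is sequential: a client whose channel is
			// not being drained stalls delivery to the remaining clients.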
			for _, c := range m.clients {
				c.Chan() <- e
			}
		}
	}()
}

// StopNow stops all clients immediately, without waiting for in-flight data
// to be flushed.
func (m *Manager) StopNow() {
	for _, pair := range m.pairs {
		pair.client.StopNow()
	}
}

// Name returns the name assembled for this Manager when it was built.
func (m *Manager) Name() string {
	return m.name
}

// Chan returns the channel on which the Manager receives entries. Stop closes
// it, so callers must not send after stopping the Manager.
func (m *Manager) Chan() chan<- loki.Entry {
	return m.entries
}

// Stop stops the manager without draining the Write-Ahead Log, even if that
// mode is enabled.
func (m *Manager) Stop() {
	m.StopWithDrain(false)
}

// StopWithDrain will stop the manager, its Write-Ahead Log watchers, and clients accordingly. If drain is enabled,
// the Watchers will attempt to drain the WAL completely.
// The shutdown procedure first stops the Watchers, allowing them to flush as much data into the clients as possible.
// Then the clients are shut down accordingly.
func (m *Manager) StopWithDrain(drain bool) {
	// First, stop the receiving channel.
	m.once.Do(func() { close(m.entries) })
	m.wg.Wait()

	var stopWG sync.WaitGroup

	// Depending on whether drain is enabled, the maximum time stopping a watcher and its client can take is
	// the drain time of the watcher plus the drain time of the client. To minimize this, and since we keep a
	// separate WAL for each client config, each (watcher, client) pair is stopped concurrently.
	for _, pair := range m.pairs {
		stopWG.Add(1)
		go func(pair watcherClientPair) {
			defer stopWG.Done()
			pair.Stop(drain)
		}(pair)
	}

	// Wait for all pairs to be stopped.
	stopWG.Wait()
}

// GetClientName computes the specific name for each client config. The name is either the configured Name setting
// in Config, or a hash of the config as a whole; the latter allows us to detect repeated configs.
func GetClientName(cfg Config) string {
	if cfg.Name != "" {
		return cfg.Name
	}
	return asSha256(cfg)
}
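
// asSha256 hashes the value's default fmt "%v" representation with SHA-256
// and keeps the first 6 hex characters: short enough for a label value, and
// in practice unique enough to tell client configs apart.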
func asSha256(o interface{}) string {
	h := sha256.New()
	h.Write([]byte(fmt.Sprintf("%v", o)))
	temp := fmt.Sprintf("%x", h.Sum(nil))
	return temp[:6]
}

// buildManagerName assembles the Manager's name from a given prefix and the
// names of all client configs.
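// For example, with two configs named "a" and "b" and the prefix "wal", the
// resulting name is "wal:a,b".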
func buildManagerName(prefix string, cfgs ...Config) string {
	var sb strings.Builder
	sb.WriteString(prefix)
	sb.WriteString(":")
	for i, c := range cfgs {
		sb.WriteString(GetClientName(c))
		if i != len(cfgs)-1 {
			sb.WriteString(",")
		}
	}
	return sb.String()
}