reporter.go

package usagestats

import (
	"bytes"
	"context"
	"errors"
	"flag"
	"io"
	"math"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/google/uuid"
	"github.com/grafana/dskit/backoff"
	"github.com/grafana/dskit/kv"
	"github.com/grafana/dskit/multierror"
	"github.com/grafana/dskit/services"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/thanos-io/objstore"

	"github.com/grafana/phlare/pkg/util/build"
)
const (
	// ClusterSeedFileName is the file name for the cluster seed file.
	ClusterSeedFileName = "pyroscope_cluster_seed.json"
	// attemptNumber is how many times we will try to read a corrupted cluster seed before deleting it.
	attemptNumber = 4
	// seedKey is the key for the cluster seed to use with the kv store.
	seedKey = "usagestats_token"
)
var (
	reportCheckInterval      = time.Minute
	reportInterval           = 4 * time.Hour
	stabilityCheckInterval   = 5 * time.Second
	stabilityMinimumRequired = 6
)
type Config struct {
	Enabled bool `yaml:"reporting_enabled"`
	Leader  bool `yaml:"-"`
}
// RegisterFlags adds the flags required to configure this to the given FlagSet.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
	f.BoolVar(&cfg.Enabled, "usage-stats.enabled", true, "Enable anonymous usage reporting.")
}
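// A minimal sketch of how the flag might be registered from a caller (the
// surrounding main package and flag set are assumptions, not part of this file):
//
//	var cfg usagestats.Config
//	cfg.RegisterFlags(flag.CommandLine)
//	flag.Parse()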
type Reporter struct {
	logger log.Logger
	bucket objstore.Bucket
	reg    prometheus.Registerer

	services.Service

	conf     Config
	kvConfig kv.Config

	cluster    ClusterSeed
	lastReport time.Time
}
func NewReporter(config Config, kvConfig kv.Config, objectClient objstore.Bucket, logger log.Logger, reg prometheus.Registerer) (*Reporter, error) {
	if !config.Enabled {
		return nil, nil
	}
	r := &Reporter{
		logger:   logger,
		bucket:   objectClient,
		conf:     config,
		kvConfig: kvConfig,
		reg:      reg,
	}
	r.Service = services.NewBasicService(nil, r.running, nil)
	return r, nil
}
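// A minimal usage sketch, not part of this file: the Reporter embeds a dskit
// service, so a caller could construct it and drive it via the services package
// (ctx, kvCfg, bucket, logger and reg are assumed to exist in the caller):
//
//	reporter, err := NewReporter(Config{Enabled: true, Leader: true}, kvCfg, bucket, logger, reg)
//	if err != nil {
//		return err
//	}
//	if reporter != nil { // NewReporter returns nil when reporting is disabled.
//		if err := services.StartAndAwaitRunning(ctx, reporter); err != nil {
//			return err
//		}
//	}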
func (rep *Reporter) initLeader(ctx context.Context) ClusterSeed {
	kvClient, err := kv.NewClient(rep.kvConfig, JSONCodec, nil, rep.logger)
	if err != nil {
		level.Info(rep.logger).Log("msg", "failed to create kv client", "err", err)
		return ClusterSeed{}
	}
	// Try to become leader via the kv client.
	backoff := backoff.New(ctx, backoff.Config{
		MinBackoff: time.Second,
		MaxBackoff: time.Minute,
		MaxRetries: 0,
	})
	for backoff.Ongoing() {
		// Create a new cluster seed.
		seed := ClusterSeed{
			UID:               uuid.NewString(),
			PrometheusVersion: build.GetVersion(),
			CreatedAt:         time.Now(),
		}
		if err := kvClient.CAS(ctx, seedKey, func(in interface{}) (out interface{}, retry bool, err error) {
			// The key is already set, so we don't need to do anything.
			if in != nil {
				if kvSeed, ok := in.(*ClusterSeed); ok && kvSeed != nil && kvSeed.UID != seed.UID {
					seed = *kvSeed
					return nil, false, nil
				}
			}
			return seed.Clone(), true, nil
		}); err != nil {
			level.Info(rep.logger).Log("msg", "failed to CAS cluster seed key", "err", err)
			continue
		}
		// Ensure stability of the cluster seed.
		stableSeed := ensureStableKey(ctx, kvClient, rep.logger)
		seed = *stableSeed
		// Fetch the remote cluster seed.
		remoteSeed, err := rep.fetchSeed(ctx,
			func(err error) bool {
				// We only want to retry if the error is not an object not found error.
				return !rep.bucket.IsObjNotFoundErr(err)
			})
		if err != nil {
			if rep.bucket.IsObjNotFoundErr(err) {
				// We are the leader and we need to save the file.
				if err := rep.writeSeedFile(ctx, seed); err != nil {
					level.Info(rep.logger).Log("msg", "failed to write cluster seed file", "err", err)
					backoff.Wait()
					continue
				}
				return seed
			}
			backoff.Wait()
			continue
		}
		return remoteSeed
	}
	return ClusterSeed{}
}
// ensureStableKey ensures that the cluster seed is stable for at least 30 seconds.
// This is required when using a gossiping kv client such as memberlist, which may
// initially return different seeds on different members but will converge eventually.
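// With the defaults above (stabilityCheckInterval = 5s, stabilityMinimumRequired = 6),
// the same seed UID must keep coming back on consecutive checks for roughly 30 seconds
// before it is returned as stable (an illustrative reading of the constants, not a guarantee).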
func ensureStableKey(ctx context.Context, kvClient kv.Client, logger log.Logger) *ClusterSeed {
	var (
		previous    *ClusterSeed
		stableCount int
	)
	for {
		time.Sleep(stabilityCheckInterval)
		value, err := kvClient.Get(ctx, seedKey)
		if err != nil {
			level.Debug(logger).Log("msg", "failed to get cluster seed key for stability check", "err", err)
			continue
		}
		if seed, ok := value.(*ClusterSeed); ok && seed != nil {
			if previous == nil {
				previous = seed
				continue
			}
			if previous.UID != seed.UID {
				previous = seed
				stableCount = 0
				continue
			}
			stableCount++
			if stableCount > stabilityMinimumRequired {
				return seed
			}
		}
	}
}
func (rep *Reporter) init(ctx context.Context) {
	if rep.conf.Leader {
		rep.cluster = rep.initLeader(ctx)
		return
	}
	// Followers only wait for the cluster seed to be set;
	// they will try forever to fetch the cluster seed.
	seed, _ := rep.fetchSeed(ctx, nil)
	rep.cluster = seed
}
// fetchSeed fetches the cluster seed from the object store, retrying until it succeeds.
// continueFn lets the caller decide whether to keep retrying after an error; nil means always retry.
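// For example, initLeader above passes a continueFn that returns
// !rep.bucket.IsObjNotFoundErr(err), so a missing seed file stops the retries
// and lets the leader write the file itself.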
func (rep *Reporter) fetchSeed(ctx context.Context, continueFn func(err error) bool) (ClusterSeed, error) {
	var (
		backoff = backoff.New(ctx, backoff.Config{
			MinBackoff: time.Second,
			MaxBackoff: time.Minute,
			MaxRetries: 0,
		})
		readingErr = 0
	)
	for backoff.Ongoing() {
		seed, err := rep.readSeedFile(ctx)
		if err != nil {
			if !rep.bucket.IsObjNotFoundErr(err) {
				readingErr++
			}
			level.Debug(rep.logger).Log("msg", "failed to read cluster seed file", "err", err)
			if readingErr > attemptNumber {
				if err := rep.bucket.Delete(ctx, ClusterSeedFileName); err != nil {
					level.Error(rep.logger).Log("msg", "failed to delete corrupted cluster seed file", "err", err)
				}
				readingErr = 0
			}
			if continueFn == nil || continueFn(err) {
				backoff.Wait()
				continue
			}
			return ClusterSeed{}, err
		}
		return seed, nil
	}
	return ClusterSeed{}, backoff.Err()
}
// readSeedFile reads the cluster seed file from the object store.
func (rep *Reporter) readSeedFile(ctx context.Context) (ClusterSeed, error) {
	reader, err := rep.bucket.Get(ctx, ClusterSeedFileName)
	if err != nil {
		return ClusterSeed{}, err
	}
	defer func() {
		if err := reader.Close(); err != nil {
			level.Error(rep.logger).Log("msg", "failed to close reader", "err", err)
		}
	}()
	data, err := io.ReadAll(reader)
	if err != nil {
		return ClusterSeed{}, err
	}
	seed, err := JSONCodec.Decode(data)
	if err != nil {
		return ClusterSeed{}, err
	}
	return *(seed.(*ClusterSeed)), nil
}
// writeSeedFile writes the cluster seed to the object store.
func (rep *Reporter) writeSeedFile(ctx context.Context, seed ClusterSeed) error {
	data, err := JSONCodec.Encode(seed)
	if err != nil {
		return err
	}
	return rep.bucket.Upload(ctx, ClusterSeedFileName, bytes.NewReader(data))
}
// running initializes the reporter seed and then sends a report on every interval.
func (rep *Reporter) running(ctx context.Context) error {
	rep.init(ctx)

	if rep.cluster.UID == "" {
		<-ctx.Done()
		if err := ctx.Err(); !errors.Is(err, context.Canceled) {
			return err
		}
		return nil
	}

	// Check every minute if we should report.
	ticker := time.NewTicker(reportCheckInterval)
	defer ticker.Stop()

	// Find when to send the next report.
	next := nextReport(reportInterval, rep.cluster.CreatedAt, time.Now())
	if rep.lastReport.IsZero() {
		// If we never reported, assume the last report happened one interval ago.
		rep.lastReport = next.Add(-reportInterval)
	}
	for {
		select {
		case <-ticker.C:
			now := time.Now()
			if !next.Equal(now) && now.Sub(rep.lastReport) < reportInterval {
				continue
			}
			level.Debug(rep.logger).Log("msg", "reporting cluster stats", "date", time.Now())
			if err := rep.reportUsage(ctx, next); err != nil {
				level.Info(rep.logger).Log("msg", "failed to report usage", "err", err)
				continue
			}
			rep.lastReport = next
			next = next.Add(reportInterval)
		case <-ctx.Done():
			if err := ctx.Err(); !errors.Is(err, context.Canceled) {
				return err
			}
			return nil
		}
	}
}
// reportUsage reports the usage to grafana.com.
func (rep *Reporter) reportUsage(ctx context.Context, interval time.Time) error {
	backoff := backoff.New(ctx, backoff.Config{
		MinBackoff: time.Second,
		MaxBackoff: 30 * time.Second,
		MaxRetries: 5,
	})
	var errs multierror.MultiError
	for backoff.Ongoing() {
		if err := sendReport(ctx, rep.cluster, interval); err != nil {
			level.Info(rep.logger).Log("msg", "failed to send usage report", "retries", backoff.NumRetries(), "err", err)
			errs.Add(err)
			backoff.Wait()
			continue
		}
		level.Debug(rep.logger).Log("msg", "usage report sent with success")
		return nil
	}
	return errs.Err()
}
// nextReport computes the next report time based on the interval.
// The interval is anchored to the creation time of the cluster seed to avoid all clusters reporting at the same time.
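// For example (illustrative values, not from this file): with a 4h interval,
// createdAt = 10:00 and now = 21:00, the elapsed time is 11h and ceil(11h/4h) = 3,
// so the next report is scheduled at createdAt + 3*4h = 22:00.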
func nextReport(interval time.Duration, createdAt, now time.Time) time.Time {
	// Smallest x such that createdAt + (x * interval) >= now.
	return createdAt.Add(time.Duration(math.Ceil(float64(now.Sub(createdAt))/float64(interval))) * interval)
}