-
Notifications
You must be signed in to change notification settings - Fork 486
/
walstats.go
242 lines (200 loc) · 6.25 KB
/
walstats.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
package agentctl
import (
"math"
"time"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/wal"
)
// WALStats stores statistics on the whole WAL.
//
// It is the result type of CalculateStats. Per-target figures live in
// Targets; the Series and Samples methods sum them across all targets.
type WALStats struct {
// From holds the first timestamp for the oldest sample found within the WAL.
From time.Time
// To holds the last timestamp for the newest sample found within the WAL.
To time.Time
// CheckpointNumber is the segment number of the most recently created
// checkpoint.
CheckpointNumber int
// FirstSegment is the segment number of the first (oldest) non-checkpoint
// segment file found within the WAL folder.
FirstSegment int
// LastSegment is the segment number of the last (newest) non-checkpoint
// segment file found within the WAL folder.
LastSegment int
// InvalidRefs is the number of samples with a ref ID to which there is no
// series defined.
InvalidRefs int
// HashCollisions is the total number of times there has been a hash
// collision. A hash collision is any instance in which a hash of labels
// is defined by two ref IDs.
//
// For the Grafana Agent, a hash collision has no negative side effects
// on data sent to the remote_write endpoint but may have a noticeable impact
// on memory while the collision exists.
HashCollisions int
// Targets holds stats on specific scrape targets.
Targets []WALTargetStats
}
// Series reports the total series count, obtained by adding up the Series
// field of every entry in s.Targets. It returns 0 when there are no targets.
func (s WALStats) Series() int {
	total := 0
	for i := range s.Targets {
		total += s.Targets[i].Series
	}
	return total
}
// Samples reports the total sample count, obtained by adding up the Samples
// field of every entry in s.Targets. It returns 0 when there are no targets.
func (s WALStats) Samples() int {
	total := 0
	for i := range s.Targets {
		total += s.Targets[i].Samples
	}
	return total
}
// WALTargetStats aggregates statistics on scrape targets across the entirety
// of the WAL and its checkpoints. A target is identified by its Job and
// Instance pair.
type WALTargetStats struct {
// Job corresponds to the "job" label on the scraped target.
Job string
// Instance corresponds to the "instance" label on the scraped target.
Instance string
// Series is the total number of series for the scraped target. It is
// equivalent to the total cardinality.
Series int
// Samples is the total number of samples for the scraped target. Samples
// whose ref ID matches no known series are not attributed to any target;
// they are tallied in WALStats.InvalidRefs instead.
Samples int
}
// CalculateStats opens the WAL located in walDir, computes its statistics and
// closes the WAL again before returning.
//
// walDir must be a folder containing segment files and checkpoint directories.
// An error from opening the WAL or from the calculation itself is returned
// as-is; when opening fails the returned WALStats is the zero value.
func CalculateStats(walDir string) (WALStats, error) {
	opened, openErr := wal.Open(nil, walDir)
	if openErr != nil {
		return WALStats{}, openErr
	}
	defer opened.Close()

	calc := newWALStatsCalculator(opened)
	return calc.Calculate()
}
// walStatsCalculator accumulates state while the WAL is read and turns that
// state into a WALStats. Create one with newWALStatsCalculator so that the
// maps and fromTime are initialized.
type walStatsCalculator struct {
// w is the WAL being analyzed.
w *wal.WAL
// fromTime and toTime track the smallest and largest sample timestamps seen
// so far.
fromTime int64
toTime int64
// invalidRefs counts samples whose ref ID has no entry in statsLookup.
invalidRefs int
// stats holds one entry per distinct job/instance pair, in the order in
// which the pairs were first encountered.
stats []*WALTargetStats
// statsLookup maps a series ref ID to the stats entry of the target that the
// series belongs to.
statsLookup map[chunks.HeadSeriesRef]*WALTargetStats
// hash -> # ref IDs with that hash
hashInstances map[uint64]int
}
// newWALStatsCalculator returns a calculator bound to w with empty lookup
// maps. fromTime starts at math.MaxInt64 so that the first sample timestamp
// compared against it becomes the new minimum.
func newWALStatsCalculator(w *wal.WAL) *walStatsCalculator {
	calc := new(walStatsCalculator)
	calc.w = w
	calc.fromTime = math.MaxInt64
	calc.statsLookup = map[chunks.HeadSeriesRef]*WALTargetStats{}
	calc.hashInstances = map[uint64]int{}
	return calc
}
// Calculate gathers the statistics for the calculator's WAL.
//
// It looks up the last checkpoint and the segment range of the WAL directory,
// then hands readWAL to walIterate to collect per-target counts. A missing
// checkpoint (record.ErrNotFound) is not treated as an error. On any other
// failure the error is returned together with a zero-valued WALStats, so
// callers never see a half-filled result.
//
// When no sample was read from the WAL, From and To are left as the zero
// time.Time instead of being derived from the calculator's initial sentinel
// timestamps.
func (c *walStatsCalculator) Calculate() (WALStats, error) {
	_, checkpointIdx, err := wal.LastCheckpoint(c.w.Dir())
	if err != nil && err != record.ErrNotFound {
		return WALStats{}, err
	}

	firstSegment, lastSegment, err := wal.Segments(c.w.Dir())
	if err != nil {
		return WALStats{}, err
	}

	// Iterate over the WAL and collect stats. This must be done before the
	// result is assembled, as readWAL populates the internal state that the
	// remaining fields are computed from.
	if err := walIterate(c.w, c.readWAL); err != nil {
		return WALStats{}, err
	}

	stats := WALStats{
		CheckpointNumber: checkpointIdx,
		FirstSegment:     firstSegment,
		LastSegment:      lastSegment,
		InvalidRefs:      c.invalidRefs,
	}

	// fromTime starts out greater than toTime and readWAL only ever lowers
	// fromTime / raises toTime to a sample's timestamp, so fromTime > toTime
	// can only hold when no sample was seen. In that case there is no
	// meaningful time range to report.
	if c.fromTime <= c.toTime {
		stats.From = timestamp.Time(c.fromTime)
		stats.To = timestamp.Time(c.toTime)
	}

	// A hash that was registered more than once counts as one collision,
	// regardless of how many times it was seen.
	for _, hashCount := range c.hashInstances {
		if hashCount > 1 {
			stats.HashCollisions++
		}
	}

	// Copy the per-target stats by value so the result does not alias the
	// calculator's internal state. Targets stays nil when there are none.
	if len(c.stats) > 0 {
		stats.Targets = make([]WALTargetStats, 0, len(c.stats))
		for _, tgt := range c.stats {
			stats.Targets = append(stats.Targets, *tgt)
		}
	}

	return stats, nil
}
// readWAL consumes every record exposed by r and folds it into the
// calculator's running state.
//
// Series records register each series' ref ID against the stats entry for its
// job/instance pair, creating that entry on first sight, and bump both the
// target's series count and the occurrence count of the label set's hash.
// Sample records widen the observed fromTime/toTime window and add to the
// owning target's sample count; a sample whose ref ID is unknown is counted
// in invalidRefs instead. Records of any other type are skipped.
//
// The first decode error aborts the read and is returned; otherwise the
// reader's own error, if any, is returned.
func (c *walStatsCalculator) readWAL(r *wal.Reader) error {
	var decoder record.Decoder

	for r.Next() {
		raw := r.Record()

		// Unknown record types are deliberately skipped rather than rejected:
		// we only write series and samples ourselves, but failing on anything
		// else would prevent users from getting stats on a traditional
		// Prometheus WAL, which would be nice to support.
		switch decoder.Type(raw) {
		case record.Samples:
			decoded, err := decoder.Samples(raw, nil)
			if err != nil {
				return err
			}

			for _, sample := range decoded {
				// The time window is updated even for samples that turn out
				// to reference an unknown series.
				if sample.T < c.fromTime {
					c.fromTime = sample.T
				}
				if sample.T > c.toTime {
					c.toTime = sample.T
				}

				if target := c.statsLookup[sample.Ref]; target != nil {
					target.Samples++
				} else {
					c.invalidRefs++
				}
			}

		case record.Series:
			decoded, err := decoder.Series(raw, nil)
			if err != nil {
				return err
			}

			for _, entry := range decoded {
				job := entry.Labels.Get("job")
				instance := entry.Labels.Get("instance")

				// Locate the stats entry for this job/instance pair, appending
				// a fresh one when the pair has not been seen before.
				pos := -1
				for i, known := range c.stats {
					if known.Job == job && known.Instance == instance {
						pos = i
						break
					}
				}
				if pos < 0 {
					c.stats = append(c.stats, &WALTargetStats{Job: job, Instance: instance})
					pos = len(c.stats) - 1
				}
				target := c.stats[pos]

				// Each series entry bumps the target's series count, records
				// its ref ID so that later samples can modify the same stats,
				// and registers the hash of its labels so collisions (or
				// flapping series) can be detected afterwards.
				target.Series++
				c.statsLookup[entry.Ref] = target
				c.hashInstances[entry.Labels.Hash()]++
			}
		}
	}

	return r.Err()
}
// BySeriesCount orders a slice of target stats by series count, largest
// first. It provides the Len, Less and Swap methods that make up the method
// set of sort.Interface.
type BySeriesCount []WALTargetStats

// Len reports how many targets the slice holds.
func (s BySeriesCount) Len() int {
	return len(s)
}

// Less reports whether the target at index i has strictly more series than
// the one at index j, which yields a descending order when sorting.
func (s BySeriesCount) Less(i, j int) bool {
	return s[j].Series < s[i].Series
}

// Swap exchanges the targets at indexes i and j.
func (s BySeriesCount) Swap(i, j int) {
	held := s[i]
	s[i] = s[j]
	s[j] = held
}