-
Notifications
You must be signed in to change notification settings - Fork 455
/
active_series.go
161 lines (135 loc) · 4.8 KB
/
active_series.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
// SPDX-License-Identifier: AGPL-3.0-only
package ingester
import (
"context"
"fmt"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/tenant"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/grafana/mimir/pkg/ingester/activeseries"
"github.com/grafana/mimir/pkg/ingester/client"
"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/storage/sharding"
"github.com/grafana/mimir/pkg/util/spanlogger"
)
const activeSeriesMaxSizeBytes = 1 * 1024 * 1024
// ActiveSeries implements the ActiveSeries RPC. It returns a stream of active
// series that match the given matchers.
func (i *Ingester) ActiveSeries(request *client.ActiveSeriesRequest, stream client.Ingester_ActiveSeriesServer) (err error) {
defer func() { err = i.mapReadErrorToErrorWithStatus(err) }()
if err := i.checkAvailableForRead(); err != nil {
return err
}
if err := i.checkReadOverloaded(); err != nil {
return err
}
spanlog, ctx := spanlogger.NewWithLogger(stream.Context(), i.logger, "Ingester.ActiveSeries")
defer spanlog.Finish()
userID, err := tenant.TenantID(ctx)
if err != nil {
return err
}
matchers, err := client.FromLabelMatchers(request.GetMatchers())
if err != nil {
return fmt.Errorf("error parsing label matchers: %w", err)
}
// Enforce read consistency before getting TSDB (covers the case the tenant's data has not been ingested
// in this ingester yet, but there's some to ingest in the backlog).
if err := i.enforceReadConsistency(ctx, userID); err != nil {
return err
}
db := i.getTSDB(userID)
if db == nil {
level.Debug(i.logger).Log("msg", "no TSDB for user", "userID", userID)
return nil
}
idx, err := db.Head().Index()
if err != nil {
return fmt.Errorf("error getting index: %w", err)
}
isNativeHistogram := request.GetType() == client.NATIVE_HISTOGRAM_SERIES
postings, err := getPostings(ctx, db, idx, matchers, isNativeHistogram)
if err != nil {
return fmt.Errorf("error listing active series: %w", err)
}
buf := labels.NewScratchBuilder(10)
resp := &client.ActiveSeriesResponse{}
currentSize := 0
for postings.Next() {
seriesRef, count := postings.AtBucketCount()
err = idx.Series(seriesRef, &buf, nil)
if err != nil {
return fmt.Errorf("error getting series: %w", err)
}
m := &mimirpb.Metric{Labels: mimirpb.FromLabelsToLabelAdapters(buf.Labels())}
mSize := m.Size()
if isNativeHistogram {
mSize += 8 // 8 bytes for the bucket count.
}
if currentSize+mSize > activeSeriesMaxSizeBytes {
if err := client.SendActiveSeriesResponse(stream, resp); err != nil {
return fmt.Errorf("error sending response: %w", err)
}
resp = &client.ActiveSeriesResponse{}
currentSize = 0
}
resp.Metric = append(resp.Metric, m)
if isNativeHistogram {
resp.BucketCount = append(resp.BucketCount, uint64(count))
}
currentSize += mSize
}
if err := postings.Err(); err != nil {
return fmt.Errorf("error iterating over series: %w", err)
}
if len(resp.Metric) > 0 {
if err := client.SendActiveSeriesResponse(stream, resp); err != nil {
return fmt.Errorf("error sending response: %w", err)
}
}
return nil
}
func getPostings(ctx context.Context, db *userTSDB, idx tsdb.IndexReader, matchers []*labels.Matcher, isNativeHistogram bool) (activeseries.BucketCountPostings, error) {
if db.activeSeries == nil {
return nil, fmt.Errorf("active series tracker is not initialized")
}
shard, matchers, err := sharding.RemoveShardFromMatchers(matchers)
if err != nil {
return nil, fmt.Errorf("error removing shard matcher: %w", err)
}
postings, err := tsdb.PostingsForMatchers(ctx, idx, matchers...)
if err != nil {
return nil, fmt.Errorf("error getting postings: %w", err)
}
if shard != nil {
postings = idx.ShardedPostings(postings, shard.ShardIndex, shard.ShardCount)
}
if isNativeHistogram {
return activeseries.NewNativeHistogramPostings(db.activeSeries, postings), nil
}
return &ZeroBucketCountPostings{*activeseries.NewPostings(db.activeSeries, postings)}, nil
}
// listActiveSeries is used for testing purposes, builds the whole array of active series in memory.
func listActiveSeries(ctx context.Context, db *userTSDB, matchers []*labels.Matcher) (series *Series, err error) {
idx, err := db.Head().Index()
if err != nil {
return nil, fmt.Errorf("error getting index: %w", err)
}
postings, err := getPostings(ctx, db, idx, matchers, false)
if err != nil {
return nil, err
}
return NewSeries(postings, idx), nil
}
type ZeroBucketCountPostings struct {
activeseries.Postings
}
func (z *ZeroBucketCountPostings) AtBucketCount() (storage.SeriesRef, int) {
return z.At(), 0
}
// Type check.
var _ index.Postings = &ZeroBucketCountPostings{}
var _ activeseries.BucketCountPostings = &ZeroBucketCountPostings{}