Re-use memory for chunks in ingester QueryStream #3177

Merged
merged 5 commits on Sep 15, 2020
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -52,6 +52,7 @@
* [ENHANCEMENT] When a tenant accesses the Alertmanager UI or its API, if we have valid `-alertmanager.configs.fallback` we'll use that to start the manager and avoid failing the request. #3073
* [ENHANCEMENT] Add `DELETE api/v1/rules/{namespace}` to the Ruler. It allows all the rule groups of a namespace to be deleted. #3120
* [ENHANCEMENT] Experimental Delete Series: Retry processing of Delete requests during failures. #2926
* [ENHANCEMENT] Improve performance of QueryStream() in ingesters. #3177
* [ENHANCEMENT] Modules included in "All" target are now visible in output of `-modules` CLI flag. #3155
* [BUGFIX] Ruler: when loading rules from "local" storage, check for directory after resolving symlink. #3137
* [BUGFIX] Query-frontend: Fixed rounding for incoming query timestamps, to be 100% Prometheus compatible. #2990
5 changes: 4 additions & 1 deletion pkg/ingester/ingester.go
@@ -723,6 +723,7 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
}

numSeries, numChunks := 0, 0
reuseWireChunks := [queryStreamBatchSize][]client.Chunk{}
batch := make([]client.TimeSeriesChunk, 0, queryStreamBatchSize)
// We'd really like to have series in label order, not FP order, so we
// can iteratively merge them with entries coming from the chunk store. But
@@ -741,10 +742,12 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
}

numSeries++
wireChunks, err := toWireChunks(chunks, nil)
reusePos := len(batch)
wireChunks, err := toWireChunks(chunks, reuseWireChunks[reusePos])
if err != nil {
return err
}
reuseWireChunks[reusePos] = wireChunks

numChunks += len(wireChunks)
batch = append(batch, client.TimeSeriesChunk{
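The change above keeps one reusable []client.Chunk buffer per slot of the outgoing batch (reuseWireChunks[reusePos]), passes it to toWireChunks as the destination, and stores the returned slice back, so after the first batch the conversion appends into memory that is already allocated rather than allocating a fresh slice for every series. The following is a minimal, self-contained sketch of that pattern; the Chunk type, the toWireChunksSketch helper, and the batch size are illustrative stand-ins, not the Cortex definitions.

package main

import "fmt"

// Chunk stands in for client.Chunk; the real type carries an encoded chunk payload.
type Chunk struct {
	Data []byte
}

// queryStreamBatchSize mirrors the constant used to size both the batch and the
// re-use array in the diff; the real value lives in pkg/ingester.
const queryStreamBatchSize = 4

// toWireChunksSketch mimics the contract the diff relies on: it truncates the
// destination slice to length zero (keeping its capacity), appends into it,
// and returns the possibly-grown slice for the caller to keep.
func toWireChunksSketch(n int, buf []Chunk) []Chunk {
	out := buf[:0]
	for i := 0; i < n; i++ {
		out = append(out, Chunk{Data: []byte{byte(i)}})
	}
	return out
}

func main() {
	// One reusable buffer per position in the outgoing batch, as in QueryStream.
	var reuseWireChunks [queryStreamBatchSize][]Chunk

	for batch := 0; batch < 3; batch++ {
		for pos := 0; pos < queryStreamBatchSize; pos++ {
			wire := toWireChunksSketch(2, reuseWireChunks[pos])
			// Remember the returned slice so the next batch re-uses its capacity
			// instead of allocating a fresh slice per series.
			reuseWireChunks[pos] = wire
		}
	}
	fmt.Println("capacity retained across batches:", cap(reuseWireChunks[0]))
}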
75 changes: 62 additions & 13 deletions pkg/ingester/ingester_test.go
@@ -5,6 +5,7 @@ import (
"fmt"
"io/ioutil"
"math"
"math/rand"
"net/http"
"os"
"path/filepath"
@@ -843,19 +844,9 @@ func BenchmarkIngesterPushErrors(b *testing.B) {
benchmarkIngesterPush(b, limits, true)
}

func benchmarkIngesterPush(b *testing.B, limits validation.Limits, errorsExpected bool) {
cfg := defaultIngesterTestConfig()
clientCfg := defaultClientTestConfig()

const (
series = 100
samples = 100
)

// Construct a set of realistic-looking samples, all with slightly different label sets
var allLabels []labels.Labels
var allSamples []client.Sample
for j := 0; j < series; j++ {
// Construct a set of realistic-looking samples, all with slightly different label sets
func benchmarkData(nSeries int) (allLabels []labels.Labels, allSamples []client.Sample) {
for j := 0; j < nSeries; j++ {
labels := chunk.BenchmarkLabels.Copy()
for i := range labels {
if labels[i].Name == "cpu" {
@@ -865,6 +856,19 @@ func benchmarkIngesterPush(b *testing.B, limits validation.Limits, errorsExpecte
allLabels = append(allLabels, labels)
allSamples = append(allSamples, client.Sample{TimestampMs: 0, Value: float64(j)})
}
return
}

func benchmarkIngesterPush(b *testing.B, limits validation.Limits, errorsExpected bool) {
cfg := defaultIngesterTestConfig()
clientCfg := defaultClientTestConfig()

const (
series = 100
samples = 100
)

allLabels, allSamples := benchmarkData(series)
ctx := user.InjectOrgID(context.Background(), "1")

encodings := []struct {
@@ -897,3 +901,48 @@ func benchmarkIngesterPush(b *testing.B, limits validation.Limits, errorsExpecte
}

}

func BenchmarkIngester_QueryStream(b *testing.B) {
cfg := defaultIngesterTestConfig()
clientCfg := defaultClientTestConfig()
limits := defaultLimitsTestConfig()
_, ing := newTestStore(b, cfg, clientCfg, limits, nil)
ctx := user.InjectOrgID(context.Background(), "1")

const (
series = 2000
samples = 1000
)

allLabels, allSamples := benchmarkData(series)

// Bump the timestamp and set a random value on each of our test samples each time round the loop
for j := 0; j < samples; j++ {
for i := range allSamples {
allSamples[i].TimestampMs = int64(j + 1)
allSamples[i].Value = rand.Float64()
}
_, err := ing.Push(ctx, client.ToWriteRequest(allLabels, allSamples, nil, client.API))
require.NoError(b, err)
}

req := &client.QueryRequest{
StartTimestampMs: 0,
EndTimestampMs: samples + 1,

Matchers: []*client.LabelMatcher{{
Type: client.EQUAL,
Name: model.MetricNameLabel,
Value: "container_cpu_usage_seconds_total",
}},
}

mockStream := &mockQueryStreamServer{ctx: ctx}

b.ResetTimer()

for ix := 0; ix < b.N; ix++ {
err := ing.QueryStream(req, mockStream)
require.NoError(b, err)
}
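
The new benchmark pushes 2000 series with 1000 samples each and then streams them back repeatedly through QueryStream, so allocation savings from the buffer re-use show up directly in its results. Assuming a standard checkout of the repository, it can be run with Go's ordinary benchmark tooling; the -benchmem flag reports the per-iteration allocations this PR is meant to reduce (the command below is an illustrative example, not part of the diff):

go test -run=NONE -bench=BenchmarkIngester_QueryStream -benchmem ./pkg/ingester/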
}