From 0a5b9dec421740b858ff96fdd5a02ae756ba0f4b Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Mon, 1 Jul 2019 18:21:38 +0530 Subject: [PATCH 1/7] Optimise mergeStream Signed-off-by: Ganesh Vernekar --- pkg/querier/batch/merge.go | 2 +- pkg/querier/batch/stream.go | 83 ++++++++++++++++++++++++------------- 2 files changed, 55 insertions(+), 30 deletions(-) diff --git a/pkg/querier/batch/merge.go b/pkg/querier/batch/merge.go index e2fe4167575..0982a7b9ed3 100644 --- a/pkg/querier/batch/merge.go +++ b/pkg/querier/batch/merge.go @@ -32,7 +32,7 @@ func newMergeIterator(cs []chunk.Chunk) *mergeIterator { its: its, h: make(iteratorHeap, 0, len(its)), batches: make(batchStream, 0, len(its)*2*promchunk.BatchSize), - batchesBuf: make(batchStream, 0, len(its)*2*promchunk.BatchSize), + batchesBuf: make(batchStream, len(its)*2*promchunk.BatchSize), } for _, iter := range c.its { diff --git a/pkg/querier/batch/stream.go b/pkg/querier/batch/stream.go index 3a1946aefb5..dc933526a8f 100644 --- a/pkg/querier/batch/stream.go +++ b/pkg/querier/batch/stream.go @@ -19,22 +19,6 @@ func (bs batchStream) print() { fmt.Println("]") } -// append, reset, hasNext, next, atTime etc are all inlined in go1.11. -// append isn't a pointer receiver as that was causing bs to escape to the heap. -func (bs batchStream) append(t int64, v float64, size int) batchStream { - l := len(bs) - if l == 0 || bs[l-1].Index == size { - bs = append(bs, promchunk.Batch{}) - l++ - } - b := &bs[l-1] - b.Timestamps[b.Index] = t - b.Values[b.Index] = v - b.Index++ - b.Length++ - return bs -} - func (bs *batchStream) reset() { for i := range *bs { (*bs)[i].Index = 0 @@ -61,7 +45,7 @@ func (bs *batchStream) at() (int64, float64) { return b.Timestamps[b.Index], b.Values[b.Index] } -// mergeBatches assumes the contents of batches are overlapping and unstorted. +// mergeBatches assumes the contents of batches are overlapping and unsorted. // Merge them together into a sorted, non-overlapping stream in result. 
// Caller must guarantee result is big enough. Return value will always be a // slice pointing to the same underly array as result, allowing mergeBatches @@ -89,33 +73,74 @@ func mergeBatches(batches batchStream, result batchStream, size int) batchStream } func mergeStreams(left, right batchStream, result batchStream, size int) batchStream { - result.reset() - result = result[:0] + if cap(result) >= len(left)+len(right) { + for i := range result { + result[i].Index = 0 + result[i].Length = 0 + } + for len(result) < len(left)+len(right) { + result = append(result, promchunk.Batch{}) + } + } else { + result = make([]promchunk.Batch, len(left)+len(right)) + } + l := 1 + b := &result[0] + for left.hasNext() && right.hasNext() { + if b.Index == size { + b.Length = b.Index + l++ + if l > len(result) { + result = append(result, promchunk.Batch{}) + } + b = &result[l-1] + } t1, t2 := left.atTime(), right.atTime() if t1 < t2 { - t, v := left.at() - result = result.append(t, v, size) + b.Timestamps[b.Index], b.Values[b.Index] = left.at() left.next() } else if t1 > t2 { - t, v := right.at() - result = result.append(t, v, size) + b.Timestamps[b.Index], b.Values[b.Index] = right.at() right.next() } else { - t, v := left.at() - result = result.append(t, v, size) + b.Timestamps[b.Index], b.Values[b.Index] = left.at() left.next() right.next() } + b.Index++ } + for ; left.hasNext(); left.next() { - t, v := left.at() - result = result.append(t, v, size) + if b.Index == size { + b.Length = b.Index + l++ + if l > len(result) { + result = append(result, promchunk.Batch{}) + } + b = &result[l-1] + } + b.Timestamps[b.Index], b.Values[b.Index] = left.at() + b.Index++ + b.Length++ } + for ; right.hasNext(); right.next() { - t, v := right.at() - result = result.append(t, v, size) + if b.Index == size { + b.Length = b.Index + l++ + if l > len(result) { + result = append(result, promchunk.Batch{}) + } + b = &result[l-1] + } + b.Timestamps[b.Index], b.Values[b.Index] = right.at() + b.Index++ + 
b.Length++ } + b.Length = b.Index + + result = result[:l] result.reset() return result } From c6eee9b0fc9abcd4188cf061f2aacc6edc737dee Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Mon, 1 Jul 2019 20:15:44 +0530 Subject: [PATCH 2/7] Fix review comments Signed-off-by: Ganesh Vernekar --- pkg/querier/batch/stream.go | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/pkg/querier/batch/stream.go b/pkg/querier/batch/stream.go index dc933526a8f..19b27ef84ca 100644 --- a/pkg/querier/batch/stream.go +++ b/pkg/querier/batch/stream.go @@ -19,6 +19,8 @@ func (bs batchStream) print() { fmt.Println("]") } +// reset, hasNext, next, atTime etc are all inlined in go1.11. + func (bs *batchStream) reset() { for i := range *bs { (*bs)[i].Index = 0 @@ -73,6 +75,7 @@ func mergeBatches(batches batchStream, result batchStream, size int) batchStream } func mergeStreams(left, right batchStream, result batchStream, size int) batchStream { + // Ensure that 'result' has enough capacity of left and right added together. 
if cap(result) >= len(left)+len(right) { for i := range result { result[i].Index = 0 @@ -84,17 +87,17 @@ func mergeStreams(left, right batchStream, result batchStream, size int) batchSt } else { result = make([]promchunk.Batch, len(left)+len(right)) } - l := 1 + resultLen := 1 b := &result[0] for left.hasNext() && right.hasNext() { if b.Index == size { b.Length = b.Index - l++ - if l > len(result) { + resultLen++ + if resultLen > len(result) { result = append(result, promchunk.Batch{}) } - b = &result[l-1] + b = &result[resultLen-1] } t1, t2 := left.atTime(), right.atTime() if t1 < t2 { @@ -114,11 +117,11 @@ func mergeStreams(left, right batchStream, result batchStream, size int) batchSt for ; left.hasNext(); left.next() { if b.Index == size { b.Length = b.Index - l++ - if l > len(result) { + resultLen++ + if resultLen > len(result) { result = append(result, promchunk.Batch{}) } - b = &result[l-1] + b = &result[resultLen-1] } b.Timestamps[b.Index], b.Values[b.Index] = left.at() b.Index++ @@ -128,11 +131,11 @@ func mergeStreams(left, right batchStream, result batchStream, size int) batchSt for ; right.hasNext(); right.next() { if b.Index == size { b.Length = b.Index - l++ - if l > len(result) { + resultLen++ + if resultLen > len(result) { result = append(result, promchunk.Batch{}) } - b = &result[l-1] + b = &result[resultLen-1] } b.Timestamps[b.Index], b.Values[b.Index] = right.at() b.Index++ @@ -140,7 +143,7 @@ func mergeStreams(left, right batchStream, result batchStream, size int) batchSt } b.Length = b.Index - result = result[:l] + result = result[:resultLen] result.reset() return result } From 151148fe160ef2786e85adcebb334fb77b9b0d4c Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Wed, 3 Jul 2019 16:41:51 +0530 Subject: [PATCH 3/7] Refactor code Signed-off-by: Ganesh Vernekar --- pkg/querier/batch/stream.go | 42 +++++++++++++++---------------------- 1 file changed, 17 insertions(+), 25 deletions(-) diff --git a/pkg/querier/batch/stream.go 
b/pkg/querier/batch/stream.go index 19b27ef84ca..b90e4f92a74 100644 --- a/pkg/querier/batch/stream.go +++ b/pkg/querier/batch/stream.go @@ -90,15 +90,22 @@ func mergeStreams(left, right batchStream, result batchStream, size int) batchSt resultLen := 1 b := &result[0] - for left.hasNext() && right.hasNext() { + checkForFullBatch := func() { if b.Index == size { b.Length = b.Index resultLen++ if resultLen > len(result) { + // It is possible that result can grow longer + // than left and right combined based on the + // 'size' variable. result = append(result, promchunk.Batch{}) } b = &result[resultLen-1] } + } + + for left.hasNext() && right.hasNext() { + checkForFullBatch() t1, t2 := left.atTime(), right.atTime() if t1 < t2 { b.Timestamps[b.Index], b.Values[b.Index] = left.at() @@ -114,33 +121,18 @@ func mergeStreams(left, right batchStream, result batchStream, size int) batchSt b.Index++ } - for ; left.hasNext(); left.next() { - if b.Index == size { - b.Length = b.Index - resultLen++ - if resultLen > len(result) { - result = append(result, promchunk.Batch{}) - } - b = &result[resultLen-1] + addToResult := func(bs batchStream) { + for ; bs.hasNext(); bs.next() { + checkForFullBatch() + b.Timestamps[b.Index], b.Values[b.Index] = bs.at() + b.Index++ + b.Length++ } - b.Timestamps[b.Index], b.Values[b.Index] = left.at() - b.Index++ - b.Length++ } - for ; right.hasNext(); right.next() { - if b.Index == size { - b.Length = b.Index - resultLen++ - if resultLen > len(result) { - result = append(result, promchunk.Batch{}) - } - b = &result[resultLen-1] - } - b.Timestamps[b.Index], b.Values[b.Index] = right.at() - b.Index++ - b.Length++ - } + addToResult(left) + addToResult(right) + b.Length = b.Index result = result[:resultLen] From 6484ce93e6724962e12f7d9c340aa7a5b30531e6 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Thu, 4 Jul 2019 18:13:05 +0530 Subject: [PATCH 4/7] Reuse nextBatchArr as nextBatchBuf After some changes at some other places, this code suddenly 
started taking a lot of allocs (up to 7x) and B/op (and also time) for query involving a lot of chunks. Reusing the buffer solves it. Signed-off-by: Ganesh Vernekar --- pkg/querier/batch/merge.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/pkg/querier/batch/merge.go b/pkg/querier/batch/merge.go index 0982a7b9ed3..1def7f9935e 100644 --- a/pkg/querier/batch/merge.go +++ b/pkg/querier/batch/merge.go @@ -16,7 +16,8 @@ type mergeIterator struct { batches batchStream // Buffers to merge in. - batchesBuf batchStream + batchesBuf batchStream + nextBatchBuf [1]promchunk.Batch currErr error } @@ -109,12 +110,9 @@ func (c *mergeIterator) nextBatchEndTime() int64 { func (c *mergeIterator) buildNextBatch(size int) bool { // All we need to do is get enough batches that our first batch's last entry // is before all iterators next entry. - var nextBatchArr [1]promchunk.Batch - nextBatch := nextBatchArr[:] - for len(c.h) > 0 && (len(c.batches) == 0 || c.nextBatchEndTime() >= c.h[0].AtTime()) { - nextBatch[0] = c.h[0].Batch() - c.batchesBuf = mergeStreams(c.batches, nextBatch, c.batchesBuf, size) + c.nextBatchBuf[0] = c.h[0].Batch() + c.batchesBuf = mergeStreams(c.batches, c.nextBatchBuf[:], c.batchesBuf, size) copy(c.batches[:len(c.batchesBuf)], c.batchesBuf) c.batches = c.batches[:len(c.batchesBuf)] From 46d7e236962f06a0eb1c1a8fdf2b8d3df63f35f7 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Thu, 4 Jul 2019 22:23:57 +0530 Subject: [PATCH 5/7] Add comments to mergeStreams Signed-off-by: Ganesh Vernekar --- pkg/querier/batch/stream.go | 34 ++++++++++++++++++++-------------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/pkg/querier/batch/stream.go b/pkg/querier/batch/stream.go index b90e4f92a74..9ee76f6b424 100644 --- a/pkg/querier/batch/stream.go +++ b/pkg/querier/batch/stream.go @@ -75,29 +75,29 @@ func mergeStreams(left, right batchStream, result
batchStream, size int) batchStream { - // Ensure that 'result' has enough capacity of left and right added together. - if cap(result) >= len(left)+len(right) { - for i := range result { - result[i].Index = 0 - result[i].Length = 0 - } - for len(result) < len(left)+len(right) { - result = append(result, promchunk.Batch{}) - } - } else { - result = make([]promchunk.Batch, len(left)+len(right)) + // Reset the Index and Length of existing batches. + for i := range result { + result[i].Index = 0 + result[i].Length = 0 } - resultLen := 1 + resultLen := 1 // Number of batches in the final result. b := &result[0] + // This function adds a new batch to the result + // if the current batch being appended is full. checkForFullBatch := func() { if b.Index == size { + // The batch reached its intended size. + // Add another batch to the result + // and use it for further appending. + + // The Index is the place at which new sample + // has to be appended, hence it tells the length. b.Length = b.Index resultLen++ if resultLen > len(result) { // It is possible that result can grow longer - // than left and right combined based on the - // 'size' variable. + // than the one provided. result = append(result, promchunk.Batch{}) } b = &result[resultLen-1] @@ -121,6 +121,8 @@ func mergeStreams(left, right batchStream, result batchStream, size int) batchSt b.Index++ } + // This function adds all the samples from the provided + // batchStream into the result in the same order. addToResult := func(bs batchStream) { for ; bs.hasNext(); bs.next() { checkForFullBatch() @@ -133,8 +135,12 @@ func mergeStreams(left, right batchStream, result batchStream, size int) batchSt addToResult(left) addToResult(right) + // The Index is the place at which new sample + // has to be appended, hence it tells the length. b.Length = b.Index + // The provided 'result' slice might be bigger + // than the actual result, hence return the subslice.
result = result[:resultLen] result.reset() return result From 879d92bcaf2d3a05808da14c8ee2ce7efee1a079 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Fri, 5 Jul 2019 16:34:17 +0530 Subject: [PATCH 6/7] Remove mergeBatch Signed-off-by: Ganesh Vernekar --- pkg/querier/batch/stream.go | 27 ------------------------- pkg/querier/batch/stream_test.go | 34 +++++++++++++------------------- 2 files changed, 14 insertions(+), 47 deletions(-) diff --git a/pkg/querier/batch/stream.go b/pkg/querier/batch/stream.go index 9ee76f6b424..0cd371b4cb4 100644 --- a/pkg/querier/batch/stream.go +++ b/pkg/querier/batch/stream.go @@ -47,33 +47,6 @@ func (bs *batchStream) at() (int64, float64) { return b.Timestamps[b.Index], b.Values[b.Index] } -// mergeBatches assumes the contents of batches are overlapping and unsorted. -// Merge them together into a sorted, non-overlapping stream in result. -// Caller must guarantee result is big enough. Return value will always be a -// slice pointing to the same underly array as result, allowing mergeBatches -// to call itself recursively. -func mergeBatches(batches batchStream, result batchStream, size int) batchStream { - switch len(batches) { - case 0: - return nil - case 1: - copy(result[:1], batches) - return result[:1] - case 2: - return mergeStreams(batches[0:1], batches[1:2], result, size) - default: - n := len(batches) / 2 - left := mergeBatches(batches[n:], result[n:], size) - right := mergeBatches(batches[:n], result[:n], size) - - batches = mergeStreams(left, right, batches, size) - result = result[:len(batches)] - copy(result, batches) - - return result[:len(batches)] - } -} - func mergeStreams(left, right batchStream, result batchStream, size int) batchStream { // Reset the Index and Length of existing batches. 
for i := range result { diff --git a/pkg/querier/batch/stream_test.go b/pkg/querier/batch/stream_test.go index ce379e2e160..00941c0e941 100644 --- a/pkg/querier/batch/stream_test.go +++ b/pkg/querier/batch/stream_test.go @@ -10,42 +10,36 @@ import ( func TestStream(t *testing.T) { for i, tc := range []struct { - input []promchunk.Batch - output batchStream + input1, input2 []promchunk.Batch + output batchStream }{ { - input: []promchunk.Batch{mkBatch(0)}, + input1: []promchunk.Batch{mkBatch(0)}, + input2: []promchunk.Batch{mkBatch(0)}, output: []promchunk.Batch{mkBatch(0)}, }, { - input: []promchunk.Batch{mkBatch(0), mkBatch(0)}, - output: []promchunk.Batch{mkBatch(0)}, - }, - - { - input: []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize)}, + input1: []promchunk.Batch{mkBatch(0)}, + input2: []promchunk.Batch{mkBatch(promchunk.BatchSize)}, output: []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize)}, }, { - input: []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize), mkBatch(0), mkBatch(promchunk.BatchSize)}, - output: []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize)}, - }, - - { - input: []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize / 2), mkBatch(promchunk.BatchSize)}, - output: []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize)}, + input1: []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize)}, + input2: []promchunk.Batch{mkBatch(promchunk.BatchSize / 2), mkBatch(2 * promchunk.BatchSize)}, + output: []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize), mkBatch(2 * promchunk.BatchSize)}, }, { - input: []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize / 2), mkBatch(promchunk.BatchSize), mkBatch(3 * promchunk.BatchSize / 2), mkBatch(2 * promchunk.BatchSize)}, - output: []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize), mkBatch(2 * promchunk.BatchSize)}, + input1: []promchunk.Batch{mkBatch(promchunk.BatchSize / 2), mkBatch(3 * promchunk.BatchSize / 2), mkBatch(5 * promchunk.BatchSize / 
2)}, + input2: []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize), mkBatch(3 * promchunk.BatchSize)}, + output: []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize), mkBatch(2 * promchunk.BatchSize), mkBatch(3 * promchunk.BatchSize)}, }, } { t.Run(strconv.Itoa(i), func(t *testing.T) { - result := make(batchStream, len(tc.input)) - result = mergeBatches(tc.input, result, promchunk.BatchSize) + result := make(batchStream, len(tc.input1)+len(tc.input2)) + result = mergeStreams(tc.input1, tc.input2, result, promchunk.BatchSize) require.Equal(t, batchStream(tc.output), result) }) } From b0711b0afa3e0fe0bc23b4bcdfe03dfb09da352c Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Wed, 7 Aug 2019 12:02:57 +0100 Subject: [PATCH 7/7] Check merge against empty batch stream still works. Signed-off-by: Tom Wilkie --- pkg/querier/batch/stream_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/querier/batch/stream_test.go b/pkg/querier/batch/stream_test.go index 00941c0e941..d890927331b 100644 --- a/pkg/querier/batch/stream_test.go +++ b/pkg/querier/batch/stream_test.go @@ -13,6 +13,11 @@ func TestStream(t *testing.T) { input1, input2 []promchunk.Batch output batchStream }{ + { + input1: []promchunk.Batch{mkBatch(0)}, + output: []promchunk.Batch{mkBatch(0)}, + }, + { input1: []promchunk.Batch{mkBatch(0)}, input2: []promchunk.Batch{mkBatch(0)},