diff --git a/pkg/querier/batch/merge.go b/pkg/querier/batch/merge.go
index e2fe4167575..1def7f9935e 100644
--- a/pkg/querier/batch/merge.go
+++ b/pkg/querier/batch/merge.go
@@ -16,7 +16,8 @@ type mergeIterator struct {
 	batches batchStream
 
 	// Buffers to merge in.
-	batchesBuf batchStream
+	batchesBuf   batchStream
+	nextBatchBuf [1]promchunk.Batch
 
 	currErr error
 }
@@ -32,7 +33,7 @@ func newMergeIterator(cs []chunk.Chunk) *mergeIterator {
 		its:        its,
 		h:          make(iteratorHeap, 0, len(its)),
 		batches:    make(batchStream, 0, len(its)*2*promchunk.BatchSize),
-		batchesBuf: make(batchStream, 0, len(its)*2*promchunk.BatchSize),
+		batchesBuf: make(batchStream, len(its)*2*promchunk.BatchSize),
 	}
 
 	for _, iter := range c.its {
@@ -109,12 +110,9 @@ func (c *mergeIterator) nextBatchEndTime() int64 {
 func (c *mergeIterator) buildNextBatch(size int) bool {
 	// All we need to do is get enough batches that our first batch's last entry
 	// is before all iterators next entry.
-	var nextBatchArr [1]promchunk.Batch
-	nextBatch := nextBatchArr[:]
-
 	for len(c.h) > 0 && (len(c.batches) == 0 || c.nextBatchEndTime() >= c.h[0].AtTime()) {
-		nextBatch[0] = c.h[0].Batch()
-		c.batchesBuf = mergeStreams(c.batches, nextBatch, c.batchesBuf, size)
+		c.nextBatchBuf[0] = c.h[0].Batch()
+		c.batchesBuf = mergeStreams(c.batches, c.nextBatchBuf[:], c.batchesBuf, size)
 		copy(c.batches[:len(c.batchesBuf)], c.batchesBuf)
 		c.batches = c.batches[:len(c.batchesBuf)]
 
diff --git a/pkg/querier/batch/stream.go b/pkg/querier/batch/stream.go
index 3a1946aefb5..0cd371b4cb4 100644
--- a/pkg/querier/batch/stream.go
+++ b/pkg/querier/batch/stream.go
@@ -19,21 +19,7 @@ func (bs batchStream) print() {
 	fmt.Println("]")
 }
 
-// append, reset, hasNext, next, atTime etc are all inlined in go1.11.
-// append isn't a pointer receiver as that was causing bs to escape to the heap.
-func (bs batchStream) append(t int64, v float64, size int) batchStream {
-	l := len(bs)
-	if l == 0 || bs[l-1].Index == size {
-		bs = append(bs, promchunk.Batch{})
-		l++
-	}
-	b := &bs[l-1]
-	b.Timestamps[b.Index] = t
-	b.Values[b.Index] = v
-	b.Index++
-	b.Length++
-	return bs
-}
+// reset, hasNext, next, atTime etc are all inlined in go1.11.
 
 func (bs *batchStream) reset() {
 	for i := range *bs {
@@ -61,61 +47,74 @@ func (bs *batchStream) at() (int64, float64) {
 	return b.Timestamps[b.Index], b.Values[b.Index]
 }
 
-// mergeBatches assumes the contents of batches are overlapping and unstorted.
-// Merge them together into a sorted, non-overlapping stream in result.
-// Caller must guarantee result is big enough. Return value will always be a
-// slice pointing to the same underly array as result, allowing mergeBatches
-// to call itself recursively.
-func mergeBatches(batches batchStream, result batchStream, size int) batchStream {
-	switch len(batches) {
-	case 0:
-		return nil
-	case 1:
-		copy(result[:1], batches)
-		return result[:1]
-	case 2:
-		return mergeStreams(batches[0:1], batches[1:2], result, size)
-	default:
-		n := len(batches) / 2
-		left := mergeBatches(batches[n:], result[n:], size)
-		right := mergeBatches(batches[:n], result[:n], size)
-
-		batches = mergeStreams(left, right, batches, size)
-		result = result[:len(batches)]
-		copy(result, batches)
-
-		return result[:len(batches)]
+func mergeStreams(left, right batchStream, result batchStream, size int) batchStream {
+	// Reset the Index and Length of existing batches.
+	for i := range result {
+		result[i].Index = 0
+		result[i].Length = 0
+	}
+	resultLen := 1 // Number of batches in the final result.
+	b := &result[0]
+
+	// This function adds a new batch to the result
+	// if the current batch being appended to is full.
+	checkForFullBatch := func() {
+		if b.Index == size {
+			// The batch reached its intended size.
+			// Add another batch to the result
+			// and use it for further appending.
+
+			// The Index is the place at which the next sample
+			// has to be appended, hence it tells the length.
+			b.Length = b.Index
+			resultLen++
+			if resultLen > len(result) {
+				// It is possible that the result grows longer
+				// than the slice provided.
+				result = append(result, promchunk.Batch{})
+			}
+			b = &result[resultLen-1]
+		}
 	}
-}
 
-func mergeStreams(left, right batchStream, result batchStream, size int) batchStream {
-	result.reset()
-	result = result[:0]
 	for left.hasNext() && right.hasNext() {
+		checkForFullBatch()
 		t1, t2 := left.atTime(), right.atTime()
 		if t1 < t2 {
-			t, v := left.at()
-			result = result.append(t, v, size)
+			b.Timestamps[b.Index], b.Values[b.Index] = left.at()
 			left.next()
 		} else if t1 > t2 {
-			t, v := right.at()
-			result = result.append(t, v, size)
+			b.Timestamps[b.Index], b.Values[b.Index] = right.at()
 			right.next()
 		} else {
-			t, v := left.at()
-			result = result.append(t, v, size)
+			b.Timestamps[b.Index], b.Values[b.Index] = left.at()
 			left.next()
 			right.next()
 		}
+		b.Index++
 	}
-	for ; left.hasNext(); left.next() {
-		t, v := left.at()
-		result = result.append(t, v, size)
-	}
-	for ; right.hasNext(); right.next() {
-		t, v := right.at()
-		result = result.append(t, v, size)
+
+	// This function adds all the samples from the provided
+	// batchStream into the result in the same order.
+	addToResult := func(bs batchStream) {
+		for ; bs.hasNext(); bs.next() {
+			checkForFullBatch()
+			b.Timestamps[b.Index], b.Values[b.Index] = bs.at()
+			b.Index++
+			b.Length++
+		}
 	}
+
+	addToResult(left)
+	addToResult(right)
+
+	// The Index is the place at which the next sample
+	// has to be appended, hence it tells the length.
+	b.Length = b.Index
+
+	// The provided 'result' slice might be bigger
+	// than the actual result, hence return the subslice.
+	result = result[:resultLen]
 	result.reset()
 	return result
 }
diff --git a/pkg/querier/batch/stream_test.go b/pkg/querier/batch/stream_test.go
index ce379e2e160..d890927331b 100644
--- a/pkg/querier/batch/stream_test.go
+++ b/pkg/querier/batch/stream_test.go
@@ -10,42 +10,41 @@ import (
 
 func TestStream(t *testing.T) {
 	for i, tc := range []struct {
-		input  []promchunk.Batch
-		output batchStream
+		input1, input2 []promchunk.Batch
+		output         batchStream
 	}{
 		{
-			input:  []promchunk.Batch{mkBatch(0)},
+			input1: []promchunk.Batch{mkBatch(0)},
 			output: []promchunk.Batch{mkBatch(0)},
 		},
 
 		{
-			input:  []promchunk.Batch{mkBatch(0), mkBatch(0)},
+			input1: []promchunk.Batch{mkBatch(0)},
+			input2: []promchunk.Batch{mkBatch(0)},
 			output: []promchunk.Batch{mkBatch(0)},
 		},
 
 		{
-			input:  []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize)},
+			input1: []promchunk.Batch{mkBatch(0)},
+			input2: []promchunk.Batch{mkBatch(promchunk.BatchSize)},
 			output: []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize)},
 		},
 
 		{
-			input:  []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize), mkBatch(0), mkBatch(promchunk.BatchSize)},
-			output: []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize)},
-		},
-
-		{
-			input:  []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize / 2), mkBatch(promchunk.BatchSize)},
-			output: []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize)},
+			input1: []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize)},
+			input2: []promchunk.Batch{mkBatch(promchunk.BatchSize / 2), mkBatch(2 * promchunk.BatchSize)},
+			output: []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize), mkBatch(2 * promchunk.BatchSize)},
 		},
 
 		{
-			input:  []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize / 2), mkBatch(promchunk.BatchSize), mkBatch(3 * promchunk.BatchSize / 2), mkBatch(2 * promchunk.BatchSize)},
-			output: []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize), mkBatch(2 * promchunk.BatchSize)},
+			input1: []promchunk.Batch{mkBatch(promchunk.BatchSize / 2), mkBatch(3 * promchunk.BatchSize / 2), mkBatch(5 * promchunk.BatchSize / 2)},
+			input2: []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize), mkBatch(3 * promchunk.BatchSize)},
+			output: []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize), mkBatch(2 * promchunk.BatchSize), mkBatch(3 * promchunk.BatchSize)},
 		},
 	} {
 		t.Run(strconv.Itoa(i), func(t *testing.T) {
-			result := make(batchStream, len(tc.input))
-			result = mergeBatches(tc.input, result, promchunk.BatchSize)
+			result := make(batchStream, len(tc.input1)+len(tc.input2))
+			result = mergeStreams(tc.input1, tc.input2, result, promchunk.BatchSize)
 			require.Equal(t, batchStream(tc.output), result)
 		})
 	}
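For readers skimming the patch, the standalone sketch below restates the core pattern of the new mergeStreams: merge two sorted streams into a caller-preallocated slice of fixed-size batches, start a fresh batch only when the current one fills, and grow the slice only if the preallocation turns out too small. The batch type, the batchSize of 4, and the helper names are simplified stand-ins invented for illustration; they are not the package's real promchunk types.

package main

import "fmt"

const batchSize = 4 // illustrative only; the real promchunk.BatchSize differs

// batch is a simplified stand-in for promchunk.Batch.
type batch struct {
	timestamps [batchSize]int64
	values     [batchSize]float64
	length     int
}

// mergeSorted merges two sorted timestamp streams into the caller-provided
// result slice, mirroring the write-in-place style of the new mergeStreams.
func mergeSorted(left, right []int64, result []batch) []batch {
	if len(result) == 0 {
		result = append(result, batch{})
	}
	resultLen := 1
	b := &result[0]

	// Move to a fresh batch once the current one is full,
	// growing result only when the preallocation was too small.
	checkForFullBatch := func() {
		if b.length == batchSize {
			resultLen++
			if resultLen > len(result) {
				result = append(result, batch{})
			}
			b = &result[resultLen-1]
		}
	}

	appendSample := func(t int64) {
		checkForFullBatch()
		b.timestamps[b.length] = t
		b.values[b.length] = float64(t) // placeholder value
		b.length++
	}

	i, j := 0, 0
	for i < len(left) && j < len(right) {
		switch {
		case left[i] < right[j]:
			appendSample(left[i])
			i++
		case left[i] > right[j]:
			appendSample(right[j])
			j++
		default: // equal timestamps are emitted once, as in the patch
			appendSample(left[i])
			i++
			j++
		}
	}
	for ; i < len(left); i++ {
		appendSample(left[i])
	}
	for ; j < len(right); j++ {
		appendSample(right[j])
	}

	// Trim to the batches actually used, like result[:resultLen] above.
	return result[:resultLen]
}

func main() {
	// Preallocate for the expected size, as newMergeIterator does for batchesBuf.
	buf := make([]batch, 2)
	merged := mergeSorted([]int64{0, 2, 4, 6}, []int64{1, 3, 5, 7}, buf)
	for _, b := range merged {
		fmt.Println(b.timestamps[:b.length]) // [0 1 2 3] then [4 5 6 7]
	}
}

As in the patch, equal timestamps from the two inputs are collapsed into a single sample, which is why the merged result can hold fewer samples than the two inputs combined.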