Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions pkg/querier/batch/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ type mergeIterator struct {
batches batchStream

// Buffers to merge in.
batchesBuf batchStream
batchesBuf batchStream
nextBatchBuf [1]promchunk.Batch

currErr error
}
Expand All @@ -32,7 +33,7 @@ func newMergeIterator(cs []chunk.Chunk) *mergeIterator {
its: its,
h: make(iteratorHeap, 0, len(its)),
batches: make(batchStream, 0, len(its)*2*promchunk.BatchSize),
batchesBuf: make(batchStream, 0, len(its)*2*promchunk.BatchSize),
batchesBuf: make(batchStream, len(its)*2*promchunk.BatchSize),
}

for _, iter := range c.its {
Expand Down Expand Up @@ -109,12 +110,9 @@ func (c *mergeIterator) nextBatchEndTime() int64 {
func (c *mergeIterator) buildNextBatch(size int) bool {
// All we need to do is get enough batches that our first batch's last entry
// is before all iterators next entry.
var nextBatchArr [1]promchunk.Batch
nextBatch := nextBatchArr[:]

for len(c.h) > 0 && (len(c.batches) == 0 || c.nextBatchEndTime() >= c.h[0].AtTime()) {
nextBatch[0] = c.h[0].Batch()
c.batchesBuf = mergeStreams(c.batches, nextBatch, c.batchesBuf, size)
c.nextBatchBuf[0] = c.h[0].Batch()
c.batchesBuf = mergeStreams(c.batches, c.nextBatchBuf[:], c.batchesBuf, size)
copy(c.batches[:len(c.batchesBuf)], c.batchesBuf)
c.batches = c.batches[:len(c.batchesBuf)]

Expand Down
111 changes: 55 additions & 56 deletions pkg/querier/batch/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,7 @@ func (bs batchStream) print() {
fmt.Println("]")
}

// append, reset, hasNext, next, atTime etc are all inlined in go1.11.
// append isn't a pointer receiver as that was causing bs to escape to the heap.
func (bs batchStream) append(t int64, v float64, size int) batchStream {
l := len(bs)
if l == 0 || bs[l-1].Index == size {
bs = append(bs, promchunk.Batch{})
l++
}
b := &bs[l-1]
b.Timestamps[b.Index] = t
b.Values[b.Index] = v
b.Index++
b.Length++
return bs
}
// reset, hasNext, next, atTime etc are all inlined in go1.11.

func (bs *batchStream) reset() {
for i := range *bs {
Expand Down Expand Up @@ -61,61 +47,74 @@ func (bs *batchStream) at() (int64, float64) {
return b.Timestamps[b.Index], b.Values[b.Index]
}

// mergeBatches assumes the contents of batches are overlapping and unstorted.
// Merge them together into a sorted, non-overlapping stream in result.
// Caller must guarantee result is big enough. Return value will always be a
// slice pointing to the same underly array as result, allowing mergeBatches
// to call itself recursively.
func mergeBatches(batches batchStream, result batchStream, size int) batchStream {
switch len(batches) {
case 0:
return nil
case 1:
copy(result[:1], batches)
return result[:1]
case 2:
return mergeStreams(batches[0:1], batches[1:2], result, size)
default:
n := len(batches) / 2
left := mergeBatches(batches[n:], result[n:], size)
right := mergeBatches(batches[:n], result[:n], size)

batches = mergeStreams(left, right, batches, size)
result = result[:len(batches)]
copy(result, batches)

return result[:len(batches)]
func mergeStreams(left, right batchStream, result batchStream, size int) batchStream {
// Reset the Index and Length of existing batches.
for i := range result {
result[i].Index = 0
result[i].Length = 0
}
resultLen := 1 // Number of batches in the final result.
b := &result[0]

// This function adds a new batch to the result
// if the current batch being appended is full.
checkForFullBatch := func() {
if b.Index == size {
// The batch reached it intended size.
// Add another batch the the result
// and use it for further appending.

// The Index is the place at which new sample
// has to be appended, hence it tells the length.
b.Length = b.Index
resultLen++
if resultLen > len(result) {
// It is possible that result can grow longer
// then the one provided.
result = append(result, promchunk.Batch{})
}
b = &result[resultLen-1]
}
}
}

func mergeStreams(left, right batchStream, result batchStream, size int) batchStream {
result.reset()
result = result[:0]
for left.hasNext() && right.hasNext() {
checkForFullBatch()
t1, t2 := left.atTime(), right.atTime()
if t1 < t2 {
t, v := left.at()
result = result.append(t, v, size)
b.Timestamps[b.Index], b.Values[b.Index] = left.at()
left.next()
} else if t1 > t2 {
t, v := right.at()
result = result.append(t, v, size)
b.Timestamps[b.Index], b.Values[b.Index] = right.at()
right.next()
} else {
t, v := left.at()
result = result.append(t, v, size)
b.Timestamps[b.Index], b.Values[b.Index] = left.at()
left.next()
right.next()
}
b.Index++
}
for ; left.hasNext(); left.next() {
t, v := left.at()
result = result.append(t, v, size)
}
for ; right.hasNext(); right.next() {
t, v := right.at()
result = result.append(t, v, size)

// This function adds all the samples from the provided
// batchStream into the result in the same order.
addToResult := func(bs batchStream) {
for ; bs.hasNext(); bs.next() {
checkForFullBatch()
b.Timestamps[b.Index], b.Values[b.Index] = bs.at()
b.Index++
b.Length++
}
}

addToResult(left)
addToResult(right)

// The Index is the place at which new sample
// has to be appended, hence it tells the length.
b.Length = b.Index

// The provided 'result' slice might be bigger
// than the actual result, hence return the subslice.
result = result[:resultLen]
result.reset()
return result
}
Expand Down
31 changes: 15 additions & 16 deletions pkg/querier/batch/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,42 +10,41 @@ import (

func TestStream(t *testing.T) {
for i, tc := range []struct {
input []promchunk.Batch
output batchStream
input1, input2 []promchunk.Batch
output batchStream
}{
{
input: []promchunk.Batch{mkBatch(0)},
input1: []promchunk.Batch{mkBatch(0)},
output: []promchunk.Batch{mkBatch(0)},
},

{
input: []promchunk.Batch{mkBatch(0), mkBatch(0)},
input1: []promchunk.Batch{mkBatch(0)},
input2: []promchunk.Batch{mkBatch(0)},
output: []promchunk.Batch{mkBatch(0)},
},

{
input: []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize)},
input1: []promchunk.Batch{mkBatch(0)},
input2: []promchunk.Batch{mkBatch(promchunk.BatchSize)},
output: []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize)},
},

{
input: []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize), mkBatch(0), mkBatch(promchunk.BatchSize)},
output: []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize)},
},

{
input: []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize / 2), mkBatch(promchunk.BatchSize)},
output: []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize)},
input1: []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize)},
input2: []promchunk.Batch{mkBatch(promchunk.BatchSize / 2), mkBatch(2 * promchunk.BatchSize)},
output: []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize), mkBatch(2 * promchunk.BatchSize)},
},

{
input: []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize / 2), mkBatch(promchunk.BatchSize), mkBatch(3 * promchunk.BatchSize / 2), mkBatch(2 * promchunk.BatchSize)},
output: []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize), mkBatch(2 * promchunk.BatchSize)},
input1: []promchunk.Batch{mkBatch(promchunk.BatchSize / 2), mkBatch(3 * promchunk.BatchSize / 2), mkBatch(5 * promchunk.BatchSize / 2)},
input2: []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize), mkBatch(3 * promchunk.BatchSize)},
output: []promchunk.Batch{mkBatch(0), mkBatch(promchunk.BatchSize), mkBatch(2 * promchunk.BatchSize), mkBatch(3 * promchunk.BatchSize)},
},
} {
t.Run(strconv.Itoa(i), func(t *testing.T) {
result := make(batchStream, len(tc.input))
result = mergeBatches(tc.input, result, promchunk.BatchSize)
result := make(batchStream, len(tc.input1)+len(tc.input2))
result = mergeStreams(tc.input1, tc.input2, result, promchunk.BatchSize)
require.Equal(t, batchStream(tc.output), result)
})
}
Expand Down