Skip to content

Commit

Permalink
Improve entry deduplication. (#2302)
Browse files Browse the repository at this point in the history
* Improve entry deduplication.

This PR removes mostcommon and sort insert function in the heap iterator. I discovered while working on #2293 that those are actually not helping since we're deduping those lines anyways. There were no tests checking if deduping was correctly working  so I did added those.

Bonus point this means deduping will run faster and the code is less complex. The only side effect is that the order of entries that are at the same timestamp, before the most common entry would appear first, now we keep the same order as we stored them, which I think is better.

I also change the label ordering, I think whether we are forward or backward we should keep the same aphabetical labels ordering not sure why direction was altering this before.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Improve heap iterator backward test.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
  • Loading branch information
cyriltovena authored Jul 8, 2020
1 parent c149099 commit cd74043
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 94 deletions.
50 changes: 3 additions & 47 deletions pkg/iter/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (h iteratorMaxHeap) Less(i, j int) bool {
case un1 > un2:
return true
default: // un1 == un2
return h.iteratorHeap[i].Labels() > h.iteratorHeap[j].Labels()
return h.iteratorHeap[i].Labels() < h.iteratorHeap[j].Labels()
}
}

Expand Down Expand Up @@ -232,8 +232,7 @@ func (i *heapIterator) Next() bool {
}

heap.Pop(i.heap)
// insert keeps i.tuples sorted
i.tuples = insert(i.tuples, tuple{
i.tuples = append(i.tuples, tuple{
Entry: entry,
EntryIterator: next,
})
Expand All @@ -250,7 +249,7 @@ func (i *heapIterator) Next() bool {

// Find in tuples which entry occurs most often which, due to quorum based
// replication, is guaranteed to be the correct next entry.
t := mostCommon(i.tuples)
t := i.tuples[0]
i.currEntry = t.Entry
i.currLabels = t.Labels()

Expand All @@ -270,49 +269,6 @@ func (i *heapIterator) Next() bool {
return true
}

// Insert new tuple to correct position into ordered set of tuples.
// Insert sort is fast for small number of elements, and here we only expect max [number of replicas] elements.
func insert(ts []tuple, n tuple) []tuple {
ix := 0
for ix < len(ts) && ts[ix].Line <= n.Line {
ix++
}
if ix < len(ts) {
ts = append(ts, tuple{}) // zero element
copy(ts[ix+1:], ts[ix:])
ts[ix] = n
} else {
ts = append(ts, n)
}
return ts
}

// Expects that tuples are sorted already. We achieve that by using insert.
func mostCommon(tuples []tuple) tuple {
// trivial case, no need to do extra work.
if len(tuples) == 1 {
return tuples[0]
}

result := tuples[0]
count, max := 0, 0
for i := 0; i < len(tuples)-1; i++ {
if tuples[i].Line == tuples[i+1].Line {
count++
continue
}
if count > max {
result = tuples[i]
max = count
}
count = 0
}
if count > max {
result = tuples[len(tuples)-1]
}
return result
}

func (i *heapIterator) Entry() logproto.Entry {
return i.currEntry
}
Expand Down
111 changes: 67 additions & 44 deletions pkg/iter/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package iter
import (
"context"
"fmt"
"sort"
"testing"
"time"

Expand Down Expand Up @@ -231,50 +230,74 @@ func inverse(g generator) generator {
}
}

func TestMostCommon(t *testing.T) {
// First is most common.
tuples := []tuple{
{Entry: logproto.Entry{Line: "a"}},
{Entry: logproto.Entry{Line: "b"}},
{Entry: logproto.Entry{Line: "c"}},
{Entry: logproto.Entry{Line: "a"}},
{Entry: logproto.Entry{Line: "b"}},
{Entry: logproto.Entry{Line: "c"}},
{Entry: logproto.Entry{Line: "a"}},
}
require.Equal(t, "a", mostCommon(tuples).Entry.Line)

tuples = []tuple{
{Entry: logproto.Entry{Line: "a"}},
{Entry: logproto.Entry{Line: "b"}},
{Entry: logproto.Entry{Line: "b"}},
{Entry: logproto.Entry{Line: "c"}},
{Entry: logproto.Entry{Line: "c"}},
{Entry: logproto.Entry{Line: "c"}},
{Entry: logproto.Entry{Line: "d"}},
}
require.Equal(t, "c", mostCommon(tuples).Entry.Line)
}

func TestInsert(t *testing.T) {
toInsert := []tuple{
{Entry: logproto.Entry{Line: "a"}},
{Entry: logproto.Entry{Line: "e"}},
{Entry: logproto.Entry{Line: "c"}},
{Entry: logproto.Entry{Line: "b"}},
{Entry: logproto.Entry{Line: "d"}},
{Entry: logproto.Entry{Line: "a"}},
{Entry: logproto.Entry{Line: "c"}},
func TestHeapIteratorDeduplication(t *testing.T) {
foo := logproto.Stream{
Labels: `{app="foo"}`,
Entries: []logproto.Entry{
{Timestamp: time.Unix(0, 1), Line: "1"},
{Timestamp: time.Unix(0, 2), Line: "2"},
{Timestamp: time.Unix(0, 3), Line: "3"},
},
}

var ts []tuple
for _, e := range toInsert {
ts = insert(ts, e)
bar := logproto.Stream{
Labels: `{app="bar"}`,
Entries: []logproto.Entry{
{Timestamp: time.Unix(0, 1), Line: "1"},
{Timestamp: time.Unix(0, 2), Line: "2"},
{Timestamp: time.Unix(0, 3), Line: "3"},
},
}
assertIt := func(it EntryIterator, reversed bool, length int) {
for i := 0; i < length; i++ {
j := i
if reversed {
j = length - 1 - i
}
require.True(t, it.Next())
require.NoError(t, it.Error())
require.Equal(t, bar.Labels, it.Labels())
require.Equal(t, bar.Entries[j], it.Entry())

require.True(t, sort.SliceIsSorted(ts, func(i, j int) bool {
return ts[i].Line < ts[j].Line
}))
require.True(t, it.Next())
require.NoError(t, it.Error())
require.Equal(t, foo.Labels, it.Labels())
require.Equal(t, foo.Entries[j], it.Entry())

}
require.False(t, it.Next())
require.NoError(t, it.Error())
}
// forward iteration
it := NewHeapIterator(context.Background(), []EntryIterator{
NewStreamIterator(foo),
NewStreamIterator(bar),
NewStreamIterator(foo),
NewStreamIterator(bar),
NewStreamIterator(foo),
NewStreamIterator(bar),
NewStreamIterator(foo),
}, logproto.FORWARD)
assertIt(it, false, len(foo.Entries))

// backward iteration
it = NewHeapIterator(context.Background(), []EntryIterator{
mustReverseStreamIterator(NewStreamIterator(foo)),
mustReverseStreamIterator(NewStreamIterator(bar)),
mustReverseStreamIterator(NewStreamIterator(foo)),
mustReverseStreamIterator(NewStreamIterator(bar)),
mustReverseStreamIterator(NewStreamIterator(foo)),
mustReverseStreamIterator(NewStreamIterator(bar)),
mustReverseStreamIterator(NewStreamIterator(foo)),
}, logproto.BACKWARD)
assertIt(it, true, len(foo.Entries))
}

func mustReverseStreamIterator(it EntryIterator) EntryIterator {
reversed, err := NewReversedIter(it, 0, true)
if err != nil {
panic(err)
}
return reversed
}

func TestReverseIterator(t *testing.T) {
Expand All @@ -288,10 +311,10 @@ func TestReverseIterator(t *testing.T) {
for i := int64((testSize / 2) + 1); i <= testSize; i++ {
assert.Equal(t, true, reversedIter.Next())
assert.Equal(t, identity(i), reversedIter.Entry(), fmt.Sprintln("iteration", i))
assert.Equal(t, reversedIter.Labels(), itr1.Labels())
assert.Equal(t, reversedIter.Labels(), itr2.Labels())
assert.Equal(t, true, reversedIter.Next())
assert.Equal(t, identity(i), reversedIter.Entry(), fmt.Sprintln("iteration", i))
assert.Equal(t, reversedIter.Labels(), itr2.Labels())
assert.Equal(t, reversedIter.Labels(), itr1.Labels())
}

assert.Equal(t, false, reversedIter.Next())
Expand Down
6 changes: 3 additions & 3 deletions pkg/logql/series_extractor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,12 @@ func Test_seriesIterator_Peek(t *testing.T) {
extractBytes,
),
[]expectation{
{true, Sample{Labels: `{app="foo"}`, TimestampNano: 0, Value: 3}},
{true, Sample{Labels: `{app="foo"}`, TimestampNano: time.Unix(1, 0).UnixNano(), Value: 3}},
{true, Sample{Labels: `{app="foo"}`, TimestampNano: time.Unix(2, 0).UnixNano(), Value: 3}},
{true, Sample{Labels: `{app="barr"}`, TimestampNano: 0, Value: 4}},
{true, Sample{Labels: `{app="barr"}`, TimestampNano: time.Unix(1, 0).UnixNano(), Value: 4}},
{true, Sample{Labels: `{app="barr"}`, TimestampNano: time.Unix(2, 0).UnixNano(), Value: 4}},
{true, Sample{Labels: `{app="foo"}`, TimestampNano: 0, Value: 3}},
{true, Sample{Labels: `{app="foo"}`, TimestampNano: time.Unix(1, 0).UnixNano(), Value: 3}},
{true, Sample{Labels: `{app="foo"}`, TimestampNano: time.Unix(2, 0).UnixNano(), Value: 3}},
{false, Sample{}},
},
},
Expand Down

0 comments on commit cd74043

Please sign in to comment.