Skip to content

Commit

Permalink
Reverse entry iterator pool (#2064)
Browse files Browse the repository at this point in the history
* wi

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Add tests.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
  • Loading branch information
cyriltovena authored May 11, 2020
1 parent 9988ce4 commit a8b94ae
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 2 deletions.
2 changes: 1 addition & 1 deletion pkg/chunkenc/memchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi
return iterForward, nil
}

return iter.NewReversedIter(iterForward, 0, false)
return iter.NewEntryReversedIter(iterForward)
}

func (b block) iterator(ctx context.Context, pool ReaderPool, filter logql.LineFilter) iter.EntryIterator {
Expand Down
19 changes: 19 additions & 0 deletions pkg/chunkenc/memchunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,25 @@ func BenchmarkRead(b *testing.B) {
}
}

func BenchmarkBackwardIterator(b *testing.B) {
b.ReportAllocs()
c := NewMemChunk(EncSnappy, testBlockSize, testTargetSize)
_ = fillChunk(c)
b.ResetTimer()
for n := 0; n < b.N; n++ {
iterator, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Now(), logproto.BACKWARD, nil)
if err != nil {
panic(err)
}
for iterator.Next() {
_ = iterator.Entry()
}
if err := iterator.Close(); err != nil {
b.Fatal(err)
}
}
}

func TestGenerateDataSize(t *testing.T) {
for _, enc := range testEncoding {
t.Run(enc.String(), func(t *testing.T) {
Expand Down
79 changes: 79 additions & 0 deletions pkg/iter/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"fmt"
"io"
"sync"
"time"

"github.com/grafana/loki/pkg/helpers"
Expand Down Expand Up @@ -586,6 +587,84 @@ func (i *reverseIterator) Close() error {
return nil
}

var entryBufferPool = sync.Pool{
New: func() interface{} {
return &entryBuffer{
entries: make([]logproto.Entry, 0, 1024),
}
},
}

type entryBuffer struct {
entries []logproto.Entry
}

type reverseEntryIterator struct {
iter EntryIterator
cur logproto.Entry
buf *entryBuffer

loaded bool
}

// NewEntryReversedIter returns an iterator which loads all entries and iterates backward.
// The labels of entries is always empty.
func NewEntryReversedIter(it EntryIterator) (EntryIterator, error) {
iter, err := &reverseEntryIterator{
iter: it,
buf: entryBufferPool.Get().(*entryBuffer),
}, it.Error()

if err != nil {
return nil, err
}

return iter, nil
}

func (i *reverseEntryIterator) load() {
if !i.loaded {
i.loaded = true
for i.iter.Next() {
i.buf.entries = append(i.buf.entries, i.iter.Entry())
}
i.iter.Close()
}
}

func (i *reverseEntryIterator) Next() bool {
i.load()
if len(i.buf.entries) == 0 {
entryBufferPool.Put(i.buf)
i.buf.entries = nil
return false
}
i.cur, i.buf.entries = i.buf.entries[len(i.buf.entries)-1], i.buf.entries[:len(i.buf.entries)-1]
return true
}

func (i *reverseEntryIterator) Entry() logproto.Entry {
return i.cur
}

func (i *reverseEntryIterator) Labels() string {
return ""
}

func (i *reverseEntryIterator) Error() error { return nil }

func (i *reverseEntryIterator) Close() error {
if i.buf.entries != nil {
i.buf.entries = i.buf.entries[:0]
entryBufferPool.Put(i.buf)
i.buf.entries = nil
}
if !i.loaded {
return i.iter.Close()
}
return nil
}

// ReadBatch reads a set of entries off an iterator.
func ReadBatch(i EntryIterator, size uint32) (*logproto.QueryResponse, uint32, error) {
streams := map[string]*logproto.Stream{}
Expand Down
19 changes: 18 additions & 1 deletion pkg/iter/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func TestInsert(t *testing.T) {
}))
}

func TestReverseEntryIterator(t *testing.T) {
func TestReverseIterator(t *testing.T) {
itr1 := mkStreamIterator(inverse(offset(testSize, identity)), defaultLabels)
itr2 := mkStreamIterator(inverse(offset(testSize, identity)), "{foobar: \"bazbar\"}")

Expand All @@ -300,6 +300,23 @@ func TestReverseEntryIterator(t *testing.T) {
assert.NoError(t, reversedIter.Close())
}

func TestReverseEntryIterator(t *testing.T) {
itr1 := mkStreamIterator(identity, defaultLabels)

reversedIter, err := NewEntryReversedIter(itr1)
require.NoError(t, err)

for i := int64(testSize - 1); i >= 0; i-- {
assert.Equal(t, true, reversedIter.Next())
assert.Equal(t, identity(i), reversedIter.Entry(), fmt.Sprintln("iteration", i))
assert.Equal(t, reversedIter.Labels(), "")
}

assert.Equal(t, false, reversedIter.Next())
assert.Equal(t, nil, reversedIter.Error())
assert.NoError(t, reversedIter.Close())
}

func TestReverseEntryIteratorUnlimited(t *testing.T) {
itr1 := mkStreamIterator(offset(testSize, identity), defaultLabels)
itr2 := mkStreamIterator(offset(testSize, identity), "{foobar: \"bazbar\"}")
Expand Down

0 comments on commit a8b94ae

Please sign in to comment.