Skip to content

Commit

Permalink
live-tailing: preload all the historic entries before query context i…
Browse files Browse the repository at this point in the history
…s cancelled (#862)

* preload all the historic entries before query context is cancelled

* fix lint error
  • Loading branch information
sandeepsukhani authored and cyriltovena committed Aug 7, 2019
1 parent d21c60b commit 2b8a3d9
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 4 deletions.
14 changes: 12 additions & 2 deletions pkg/iter/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,8 +529,18 @@ type entryIteratorForward struct {

// NewEntryIteratorBackward returns an iterator which loads all or upton N entries
// of an existing iterator, and then iterates over them backward.
func NewEntryIteratorForward(it EntryIterator, limit uint32) (EntryIterator, error) {
return &entryIteratorForward{entriesWithLabels: make([]entryWithLabels, 0, 1024), backwardIter: it, limit: limit}, it.Error()
// preload entries when they are being queried with a timeout
func NewEntryIteratorForward(it EntryIterator, limit uint32, preload bool) (EntryIterator, error) {
itr, err := &entryIteratorForward{entriesWithLabels: make([]entryWithLabels, 0, 1024), backwardIter: it, limit: limit}, it.Error()
if err != nil {
return nil, err
}

if preload {
itr.load()
}

return itr, nil
}

func (i *entryIteratorForward) load() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/iter/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func TestEntryIteratorForward(t *testing.T) {
itr2 := mkStreamIterator(inverse(offset(testSize, identity)), "{foobar: \"bazbar\"}")

heapIterator := NewHeapIterator([]EntryIterator{itr1, itr2}, logproto.BACKWARD)
forwardIterator, err := NewEntryIteratorForward(heapIterator, testSize)
forwardIterator, err := NewEntryIteratorForward(heapIterator, testSize, false)
require.NoError(t, err)

for i := int64((testSize / 2) + 1); i <= testSize; i++ {
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ func (q *Querier) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer,
return nil, err
}

reversedIterator, err := iter.NewEntryIteratorForward(iter.NewHeapIterator(histIterators, logproto.BACKWARD), req.Limit)
reversedIterator, err := iter.NewEntryIteratorForward(iter.NewHeapIterator(histIterators, logproto.BACKWARD), req.Limit, true)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 2b8a3d9

Please sign in to comment.