Skip to content

Commit

Permalink
Close iterator properly and check nil before releasing buffers. (#1480)
Browse files Browse the repository at this point in the history
Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
  • Loading branch information
cyriltovena committed Jan 8, 2020
1 parent 8de2bc6 commit be010f0
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 7 deletions.
15 changes: 10 additions & 5 deletions pkg/chunkenc/memchunk.go
Expand Up @@ -695,16 +695,21 @@ func (si *bufferedIterator) close() {
current.BytesDecompressed += si.bytesDecompressed
current.BytesCompressed += int64(len(si.origBytes))
})
si.pool.PutReader(si.reader)
BufReaderPool.Put(si.bufReader)
if si.reader != nil {
si.pool.PutReader(si.reader)
si.reader = nil
}
if si.bufReader != nil {
BufReaderPool.Put(si.bufReader)
si.bufReader = nil
}

if si.buf != nil {
BytesBufferPool.Put(si.buf)
si.buf = nil
}
si.origBytes = nil
si.bufReader = nil
si.buf = nil
si.decBuf = nil
si.reader = nil
}

func (si *bufferedIterator) Labels() string { return "" }
42 changes: 42 additions & 0 deletions pkg/chunkenc/memchunk_test.go
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/dustin/go-humanize"
"github.com/grafana/loki/pkg/chunkenc/testdata"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -394,6 +395,47 @@ func TestChunkSize(t *testing.T) {
}
}

func TestIteratorClose(t *testing.T) {
for _, enc := range testEncoding {
t.Run(enc.String(), func(t *testing.T) {
for _, test := range []func(iter iter.EntryIterator, t *testing.T){
func(iter iter.EntryIterator, t *testing.T) {
// close without iterating
if err := iter.Close(); err != nil {
t.Fatal(err)
}
},
func(iter iter.EntryIterator, t *testing.T) {
// close after iterating
for iter.Next() {
_ = iter.Entry()
}
if err := iter.Close(); err != nil {
t.Fatal(err)
}
},
func(iter iter.EntryIterator, t *testing.T) {
// close after a single iteration
iter.Next()
_ = iter.Entry()
if err := iter.Close(); err != nil {
t.Fatal(err)
}
},
} {
c := NewMemChunk(enc)
inserted := fillChunk(c)
iter, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, inserted), logproto.BACKWARD, nil)
if err != nil {
t.Fatal(err)
}
test(iter, t)
}

})
}
}

var result []Chunk

func BenchmarkWrite(b *testing.B) {
Expand Down
14 changes: 12 additions & 2 deletions pkg/iter/iterator.go
Expand Up @@ -522,7 +522,12 @@ func (i *entryIteratorBackward) Entry() logproto.Entry {
return i.cur
}

func (i *entryIteratorBackward) Close() error { return nil }
func (i *entryIteratorBackward) Close() error {
if !i.loaded {
return i.forwardIter.Close()
}
return nil
}

func (i *entryIteratorBackward) Error() error { return nil }

Expand Down Expand Up @@ -608,7 +613,12 @@ func (i *entryIteratorForward) Entry() logproto.Entry {
return i.cur.entry
}

func (i *entryIteratorForward) Close() error { return nil }
func (i *entryIteratorForward) Close() error {
if !i.loaded {
return i.backwardIter.Close()
}
return nil
}

func (i *entryIteratorForward) Error() error { return nil }

Expand Down

0 comments on commit be010f0

Please sign in to comment.