Storage memory improvement #713

Merged · 25 commits · Jul 16, 2019
5 changes: 4 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default.

4 changes: 4 additions & 0 deletions Makefile
@@ -312,3 +312,7 @@ push-plugin: build-plugin
 
 enable-plugin:
 	docker plugin enable grafana/loki-docker-driver:$(PLUGIN_TAG)
+
+benchmark-store:
+	go run ./pkg/storage/hack/main.go
+	go test ./pkg/storage/ -bench=. -benchmem -memprofile memprofile.out -cpuprofile cpuprofile.out
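The new benchmark-store target relies on Go's standard benchmark harness: -bench=. runs every Benchmark* function under ./pkg/storage/, -benchmem adds per-operation allocation stats, and the -memprofile/-cpuprofile flags write profiles that can be opened with go tool pprof. A minimal sketch of the kind of benchmark this target would pick up (the function name and body are hypothetical, not code from pkg/storage):

```go
package storage

import "testing"

// BenchmarkStoreQuery is a hypothetical benchmark of the shape that
// `go test -bench=. -benchmem` would discover and run.
func BenchmarkStoreQuery(b *testing.B) {
	b.ReportAllocs() // include allocations per op in the output
	for i := 0; i < b.N; i++ {
		// exercise the storage query path under test here
	}
}
```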
3 changes: 2 additions & 1 deletion cmd/loki/loki-local-config.yaml
@@ -10,7 +10,8 @@ ingester:
       kvstore:
         store: inmemory
       replication_factor: 1
-  chunk_idle_period: 15m
+  chunk_idle_period: 5m
+  chunk_retain_period: 30s
 
 schema_config:
   configs:
3 changes: 2 additions & 1 deletion pkg/chunkenc/dumb_chunk.go
@@ -6,6 +6,7 @@ import (
 
 	"github.com/grafana/loki/pkg/iter"
 	"github.com/grafana/loki/pkg/logproto"
+	"github.com/grafana/loki/pkg/logql"
 )
 
 const (
@@ -51,7 +52,7 @@ func (c *dumbChunk) Size() int {
 
 // Returns an iterator that goes from _most_ recent to _least_ recent (ie,
 // backwards).
-func (c *dumbChunk) Iterator(from, through time.Time, direction logproto.Direction) (iter.EntryIterator, error) {
+func (c *dumbChunk) Iterator(from, through time.Time, direction logproto.Direction, _ logql.Filter) (iter.EntryIterator, error) {
 	i := sort.Search(len(c.entries), func(i int) bool {
 		return !from.After(c.entries[i].Timestamp)
 	})
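The new trailing argument is a logql.Filter, which dumbChunk deliberately ignores (hence the underscore). From its call sites later in this PR — filter([]byte(e.s)) in the head block and si.filter(line) in the buffered iterator — it is a predicate over raw line bytes. A self-contained sketch of that shape (the Filter type here mirrors the assumed logql.Filter definition; it is not copied from the PR):

```go
package main

import (
	"bytes"
	"fmt"
)

// Filter mirrors the assumed shape of logql.Filter: it reports whether a
// raw log line should be kept by the iterator.
type Filter func(line []byte) bool

func main() {
	var containsError Filter = func(line []byte) bool {
		return bytes.Contains(line, []byte("error"))
	}
	fmt.Println(containsError([]byte("level=error msg=boom"))) // true
	fmt.Println(containsError([]byte("level=info msg=ok")))    // false
}
```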
2 changes: 1 addition & 1 deletion pkg/chunkenc/facade.go
@@ -17,7 +17,7 @@ func init() {
 	})
 }
 
-// Facade for compatibility with cortex chunk type, so we can use it's chunk store.
+// Facade for compatibility with cortex chunk type, so we can use its chunk store.
 type Facade struct {
 	c Chunk
 	encoding.Chunk
165 changes: 95 additions & 70 deletions pkg/chunkenc/gzip.go
@@ -3,18 +3,15 @@ package chunkenc
 import (
 	"bufio"
 	"bytes"
-	"compress/gzip"
 	"encoding/binary"
 	"hash"
 	"hash/crc32"
 	"io"
-	"math"
 	"time"
 
-	"github.com/grafana/loki/pkg/logproto"
-
 	"github.com/grafana/loki/pkg/iter"
-
+	"github.com/grafana/loki/pkg/logproto"
+	"github.com/grafana/loki/pkg/logql"
 	"github.com/pkg/errors"
 )
 
@@ -53,8 +50,7 @@ type MemChunk struct {
 	head *headBlock
 
 	encoding Encoding
-	cw       func(w io.Writer) CompressionWriter
-	cr       func(r io.Reader) (CompressionReader, error)
+	cPool    CompressionPool
 }
 
 type block struct {
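The two per-chunk compression closures (cw/cr) are replaced by a single shared CompressionPool, whose definition lives in another file of this PR and is not shown in this diff. Judging only from how it is used here — GetWriter/PutWriter around serialisation, GetReader/PutReader around iteration, and a package-level Gzip value taken by address — a sync.Pool-backed sketch could look like the following; the exact interface and fields are assumptions, not the PR's actual code:

```go
package chunkenc

import (
	"compress/gzip"
	"io"
	"sync"
)

// Redeclared here so the sketch is self-contained; the real definitions
// live elsewhere in this package.
type CompressionWriter interface {
	io.WriteCloser
	Reset(w io.Writer)
}

type CompressionReader interface {
	io.Reader
	Reset(r io.Reader) error
}

// CompressionPool hands out reusable (de)compressors so that serialising
// and iterating chunks doesn't allocate fresh gzip state on every call.
type CompressionPool interface {
	GetWriter(w io.Writer) CompressionWriter
	PutWriter(CompressionWriter)
	GetReader(r io.Reader) CompressionReader
	PutReader(CompressionReader)
}

// GzipPool is one plausible implementation; the PR's package-level Gzip
// value (used as &Gzip below) would be of some such type.
type GzipPool struct {
	readers sync.Pool
	writers sync.Pool
}

func (p *GzipPool) GetReader(src io.Reader) CompressionReader {
	if r := p.readers.Get(); r != nil {
		reader := r.(*gzip.Reader)
		if err := reader.Reset(src); err == nil {
			return reader
		}
	}
	reader, err := gzip.NewReader(src)
	if err != nil {
		panic(err) // sketch only; real code would surface this error
	}
	return reader
}

func (p *GzipPool) PutReader(r CompressionReader) { p.readers.Put(r) }

func (p *GzipPool) GetWriter(dst io.Writer) CompressionWriter {
	if w := p.writers.Get(); w != nil {
		writer := w.(*gzip.Writer)
		writer.Reset(dst)
		return writer
	}
	return gzip.NewWriter(dst)
}

func (p *GzipPool) PutWriter(w CompressionWriter) { p.writers.Put(w) }
```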
@@ -96,10 +92,10 @@ func (hb *headBlock) append(ts int64, line string) error {
 	return nil
 }
 
-func (hb *headBlock) serialise(cw func(w io.Writer) CompressionWriter) ([]byte, error) {
+func (hb *headBlock) serialise(pool CompressionPool) ([]byte, error) {
 	buf := &bytes.Buffer{}
 	encBuf := make([]byte, binary.MaxVarintLen64)
-	compressedWriter := cw(buf)
+	compressedWriter := pool.GetWriter(buf)
 	for _, logEntry := range hb.entries {
 		n := binary.PutVarint(encBuf, logEntry.t)
 		_, err := compressedWriter.Write(encBuf[:n])
@@ -120,7 +116,7 @@ func (hb *headBlock) serialise(cw func(w io.Writer) CompressionWriter) ([]byte,
 	if err := compressedWriter.Close(); err != nil {
 		return nil, errors.Wrap(err, "flushing pending compress buffer")
 	}
-
+	pool.PutWriter(compressedWriter)
 	return buf.Bytes(), nil
 }
 
@@ -136,18 +132,14 @@ func NewMemChunkSize(enc Encoding, blockSize int) *MemChunk {
 		blockSize: blockSize, // The blockSize in bytes.
 		blocks:    []block{},
 
-		head: &headBlock{
-			mint: math.MaxInt64,
-			maxt: math.MinInt64,
-		},
+		head: &headBlock{},
 
 		encoding: enc,
 	}
 
 	switch enc {
 	case EncGZIP:
-		c.cw = func(w io.Writer) CompressionWriter { return gzip.NewWriter(w) }
-		c.cr = func(r io.Reader) (CompressionReader, error) { return gzip.NewReader(r) }
+		c.cPool = &Gzip
 	default:
 		panic("unknown encoding")
 	}
@@ -163,8 +155,8 @@ func NewMemChunk(enc Encoding) *MemChunk {
 
 // NewByteChunk returns a MemChunk on the passed bytes.
 func NewByteChunk(b []byte) (*MemChunk, error) {
 	bc := &MemChunk{
-		cr:   func(r io.Reader) (CompressionReader, error) { return gzip.NewReader(r) },
-		head: &headBlock{}, // Dummy, empty headblock.
+		cPool: &Gzip,
+		head:  &headBlock{}, // Dummy, empty headblock.
 	}
 
 	db := decbuf{b: b}
@@ -192,6 +184,7 @@
 
 	// Read the number of blocks.
 	num := db.uvarint()
+	bc.blocks = make([]block, 0, num)
 
 	for i := 0; i < num; i++ {
 		blk := block{}
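bc.blocks = make([]block, 0, num) sizes the slice to its known final capacity up front, so the append in the loop below never has to grow and copy the backing array. The pattern in isolation (a standalone illustration, not PR code):

```go
package main

import "fmt"

func main() {
	const num = 1000

	// Without preallocation, append grows the backing array in steps,
	// reallocating and copying several times along the way.
	var a []int
	for i := 0; i < num; i++ {
		a = append(a, i)
	}

	// With the capacity known in advance there is a single allocation
	// and each append just fills the next slot.
	b := make([]int, 0, num)
	for i := 0; i < num; i++ {
		b = append(b, i)
	}

	fmt.Println(len(a), cap(a)) // cap grew in steps to some value >= 1000
	fmt.Println(len(b), cap(b)) // 1000 1000
}
```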
@@ -343,7 +336,7 @@ func (c *MemChunk) cut() error {
 		return nil
 	}
 
-	b, err := c.head.serialise(c.cw)
+	b, err := c.head.serialise(c.cPool)
 	if err != nil {
 		return err
 	}
@@ -384,22 +377,19 @@ func (c *MemChunk) Bounds() (fromT, toT time.Time) {
 }
 
 // Iterator implements Chunk.
-func (c *MemChunk) Iterator(mintT, maxtT time.Time, direction logproto.Direction) (iter.EntryIterator, error) {
+func (c *MemChunk) Iterator(mintT, maxtT time.Time, direction logproto.Direction, filter logql.Filter) (iter.EntryIterator, error) {
 	mint, maxt := mintT.UnixNano(), maxtT.UnixNano()
-	its := make([]iter.EntryIterator, 0, len(c.blocks))
+	its := make([]iter.EntryIterator, 0, len(c.blocks)+1)
 
 	for _, b := range c.blocks {
 		if maxt > b.mint && b.maxt > mint {
-			it, err := b.iterator(c.cr)
-			if err != nil {
-				return nil, err
-			}
-
-			its = append(its, it)
+			its = append(its, b.iterator(c.cPool, filter))
 		}
 	}
 
-	its = append(its, c.head.iterator(mint, maxt))
+	if !c.head.isEmpty() {
+		its = append(its, c.head.iterator(mint, maxt, filter))
+	}
 
 	iterForward := iter.NewTimeRangedIterator(
 		iter.NewNonOverlappingIterator(its, ""),
@@ -414,21 +404,14 @@ func (c *MemChunk) Iterator(mintT, maxtT time.Time, direction logproto.Direction
 	return iter.NewEntryIteratorBackward(iterForward)
 }
 
-func (b block) iterator(cr func(io.Reader) (CompressionReader, error)) (iter.EntryIterator, error) {
+func (b block) iterator(pool CompressionPool, filter logql.Filter) iter.EntryIterator {
 	if len(b.b) == 0 {
-		return emptyIterator, nil
-	}
-
-	r, err := cr(bytes.NewBuffer(b.b))
-	if err != nil {
-		return nil, err
+		return emptyIterator
 	}
 
-	s := bufio.NewReader(r)
-	return newBufferedIterator(s), nil
+	return newBufferedIterator(pool, b.b, filter)
 }
 
-func (hb *headBlock) iterator(mint, maxt int64) iter.EntryIterator {
+func (hb *headBlock) iterator(mint, maxt int64, filter logql.Filter) iter.EntryIterator {
 	if hb.isEmpty() || (maxt < hb.mint || hb.maxt < mint) {
 		return emptyIterator
 	}
@@ -438,8 +421,16 @@ func (hb *headBlock) iterator(mint, maxt int64, filter logql.Filter) iter.EntryIterator {
 	// but the tradeoff is that queries to near-realtime data would be much lower than
 	// cutting of blocks.
 
-	entries := make([]entry, len(hb.entries))
-	copy(entries, hb.entries)
+	entries := make([]entry, 0, len(hb.entries))
+	for _, e := range hb.entries {
+		if filter == nil || filter([]byte(e.s)) {
+			entries = append(entries, e)
+		}
+	}
+
+	if len(entries) == 0 {
+		return emptyIterator
+	}
 
 	return &listIterator{
 		entries: entries,
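Together these changes push the filter into both the compressed blocks and the in-memory head block, so non-matching lines are dropped before any conversion to string or logproto.Entry. A sketch of how a caller might drive the new signature — the Append call and the filter literal are illustrative, based only on the signatures visible in this diff:

```go
package chunkenc

import (
	"bytes"
	"fmt"
	"time"

	"github.com/grafana/loki/pkg/logproto"
)

// exampleFilteredIteration appends a line to a MemChunk and reads back
// only the entries whose raw bytes contain "error".
func exampleFilteredIteration() error {
	c := NewMemChunk(EncGZIP)
	if err := c.Append(&logproto.Entry{Timestamp: time.Now(), Line: "level=error msg=boom"}); err != nil {
		return err
	}

	filter := func(line []byte) bool { return bytes.Contains(line, []byte("error")) }

	it, err := c.Iterator(time.Unix(0, 0), time.Now().Add(time.Minute), logproto.FORWARD, filter)
	if err != nil {
		return err
	}
	defer it.Close()

	for it.Next() {
		fmt.Println(it.Entry().Line) // only matching entries reach here
	}
	return it.Error()
}
```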
@@ -477,73 +468,107 @@ func (li *listIterator) Close() error { return nil }
 func (li *listIterator) Labels() string { return "" }
 
 type bufferedIterator struct {
-	s *bufio.Reader
+	s      *bufio.Reader
+	reader CompressionReader
+	pool   CompressionPool
 
-	curT   int64
-	curLog string
+	cur logproto.Entry
 
 	err error
 
-	buf    []byte // The buffer a single entry.
-	decBuf []byte // The buffer for decoding the lengths.
+	buf    *bytes.Buffer // The buffer for a single entry.
+	decBuf []byte        // The buffer for decoding the lengths.
+
+	closed bool
+
+	filter logql.Filter
 }
 
-func newBufferedIterator(s *bufio.Reader) *bufferedIterator {
+func newBufferedIterator(pool CompressionPool, b []byte, filter logql.Filter) *bufferedIterator {
+	r := pool.GetReader(bytes.NewBuffer(b))
 	return &bufferedIterator{
-		s:      s,
-		buf:    make([]byte, 1024),
+		s:      BufReaderPool.Get(r),
+		reader: r,
+		pool:   pool,
+		filter: filter,
+		buf:    BytesBufferPool.Get(),
 		decBuf: make([]byte, binary.MaxVarintLen64),
 	}
 }
 
 func (si *bufferedIterator) Next() bool {
+	for {
+		ts, line, ok := si.moveNext()
+		if !ok {
+			si.Close()
+			return false
+		}
+		if si.filter != nil && !si.filter(line) {
+			continue
+		}
+		si.cur.Line = string(line)
+		si.cur.Timestamp = time.Unix(0, ts)
+		return true
+	}
+}
+
+// moveNext moves the buffer to the next entry
+func (si *bufferedIterator) moveNext() (int64, []byte, bool) {
 	ts, err := binary.ReadVarint(si.s)
 	if err != nil {
 		if err != io.EOF {
 			si.err = err
 		}
-		return false
+		return 0, nil, false
 	}
 
 	l, err := binary.ReadUvarint(si.s)
 	if err != nil {
 		if err != io.EOF {
 			si.err = err
-
-			return false
+			return 0, nil, false
 		}
 	}
 
-	for len(si.buf) < int(l) {
-		si.buf = append(si.buf, make([]byte, 1024)...)
+	if si.buf.Cap() < int(l) {
+		si.buf.Grow(int(l) - si.buf.Cap())
 	}
 
-	n, err := si.s.Read(si.buf[:l])
+	n, err := si.s.Read(si.buf.Bytes()[:l])
 	if err != nil && err != io.EOF {
 		si.err = err
-		return false
+		return 0, nil, false
 	}
-	if n < int(l) {
-		_, err = si.s.Read(si.buf[n:l])
+	for n < int(l) {
+		r, err := si.s.Read(si.buf.Bytes()[n:l])
 		if err != nil {
 			si.err = err
-			return false
+			return 0, nil, false
 		}
+		n += r
 	}
 
-	si.curT = ts
-	si.curLog = string(si.buf[:l])
-
-	return true
+	return ts, si.buf.Bytes()[:l], true
 }
 
 func (si *bufferedIterator) Entry() logproto.Entry {
-	return logproto.Entry{
-		Timestamp: time.Unix(0, si.curT),
-		Line:      si.curLog,
-	}
+	return si.cur
 }
 
+func (si *bufferedIterator) Error() error { return si.err }
+
+func (si *bufferedIterator) Close() error {
+	if !si.closed {
+		si.closed = true
+		si.pool.PutReader(si.reader)
+		BufReaderPool.Put(si.s)
+		BytesBufferPool.Put(si.buf)
+		si.s = nil
+		si.buf = nil
+		si.decBuf = nil
+		si.reader = nil
+		return si.err
+	}
+	return si.err
+}
+
-func (si *bufferedIterator) Error() error { return si.err }
-
-func (si *bufferedIterator) Close() error { return si.err }
-
 func (si *bufferedIterator) Labels() string { return "" }
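BufReaderPool and BytesBufferPool are likewise introduced elsewhere in this PR; this file only calls Get/Put on them. Given those call shapes — Get(r) returning a *bufio.Reader reset onto r, and Get() returning a ready *bytes.Buffer — minimal sync.Pool-backed versions could look like this (a sketch under those assumptions, not the PR's definitions):

```go
package chunkenc

import (
	"bufio"
	"bytes"
	"io"
	"sync"
)

// BufReaderPool reuses bufio.Readers across bufferedIterators; Get resets
// the pooled reader onto the new underlying compression reader.
var BufReaderPool = &bufioReaderPool{
	pool: sync.Pool{New: func() interface{} { return bufio.NewReader(nil) }},
}

type bufioReaderPool struct {
	pool sync.Pool
}

func (p *bufioReaderPool) Get(r io.Reader) *bufio.Reader {
	buf := p.pool.Get().(*bufio.Reader)
	buf.Reset(r)
	return buf
}

func (p *bufioReaderPool) Put(b *bufio.Reader) { p.pool.Put(b) }

// BytesBufferPool reuses the per-entry scratch buffers; Get returns a
// buffer that has been reset but keeps its grown capacity.
var BytesBufferPool = &bytesBufferPool{
	pool: sync.Pool{New: func() interface{} { return bytes.NewBuffer(nil) }},
}

type bytesBufferPool struct {
	pool sync.Pool
}

func (p *bytesBufferPool) Get() *bytes.Buffer {
	b := p.pool.Get().(*bytes.Buffer)
	b.Reset()
	return b
}

func (p *bytesBufferPool) Put(b *bytes.Buffer) { p.pool.Put(b) }
```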