Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Labels computation LogQLv2 #2875

Merged
merged 6 commits into from
Nov 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions pkg/chunkenc/dumb_chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@ import (
"sort"
"time"

"github.com/prometheus/prometheus/pkg/labels"

"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/log"
)

const (
Expand Down Expand Up @@ -72,7 +70,7 @@ func (c *dumbChunk) Encoding() Encoding { return EncNone }

// Returns an iterator that goes from _most_ recent to _least_ recent (ie,
// backwards).
func (c *dumbChunk) Iterator(_ context.Context, from, through time.Time, direction logproto.Direction, _ labels.Labels, _ logql.Pipeline) (iter.EntryIterator, error) {
func (c *dumbChunk) Iterator(_ context.Context, from, through time.Time, direction logproto.Direction, _ log.StreamPipeline) (iter.EntryIterator, error) {
i := sort.Search(len(c.entries), func(i int) bool {
return !from.After(c.entries[i].Timestamp)
})
Expand All @@ -97,7 +95,7 @@ func (c *dumbChunk) Iterator(_ context.Context, from, through time.Time, directi
}, nil
}

func (c *dumbChunk) SampleIterator(_ context.Context, from, through time.Time, _ labels.Labels, _ logql.SampleExtractor) iter.SampleIterator {
func (c *dumbChunk) SampleIterator(_ context.Context, from, through time.Time, _ log.StreamSampleExtractor) iter.SampleIterator {
return nil
}

Expand Down
12 changes: 5 additions & 7 deletions pkg/chunkenc/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@ import (
"strings"
"time"

"github.com/prometheus/prometheus/pkg/labels"

"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/log"
)

// Errors returned by the chunk interface.
Expand Down Expand Up @@ -100,8 +98,8 @@ type Chunk interface {
Bounds() (time.Time, time.Time)
SpaceFor(*logproto.Entry) bool
Append(*logproto.Entry) error
Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, lbs labels.Labels, pipeline logql.Pipeline) (iter.EntryIterator, error)
SampleIterator(ctx context.Context, from, through time.Time, lbs labels.Labels, extractor logql.SampleExtractor) iter.SampleIterator
Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, pipeline log.StreamPipeline) (iter.EntryIterator, error)
SampleIterator(ctx context.Context, from, through time.Time, extractor log.StreamSampleExtractor) iter.SampleIterator
// Returns the list of blocks in the chunks.
Blocks(mintT, maxtT time.Time) []Block
Size() int
Expand All @@ -126,7 +124,7 @@ type Block interface {
// Entries is the amount of entries in the block.
Entries() int
// Iterator returns an entry iterator for the block.
Iterator(ctx context.Context, lbs labels.Labels, pipeline logql.Pipeline) iter.EntryIterator
Iterator(ctx context.Context, pipeline log.StreamPipeline) iter.EntryIterator
// SampleIterator returns a sample iterator for the block.
SampleIterator(ctx context.Context, lbs labels.Labels, extractor logql.SampleExtractor) iter.SampleIterator
SampleIterator(ctx context.Context, extractor log.StreamSampleExtractor) iter.SampleIterator
}
56 changes: 26 additions & 30 deletions pkg/chunkenc/memchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@ import (
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"

"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/logql/stats"
)

Expand Down Expand Up @@ -471,19 +470,19 @@ func (c *MemChunk) Bounds() (fromT, toT time.Time) {
}

// Iterator implements Chunk.
func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, lbs labels.Labels, pipeline logql.Pipeline) (iter.EntryIterator, error) {
func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, pipeline log.StreamPipeline) (iter.EntryIterator, error) {
mint, maxt := mintT.UnixNano(), maxtT.UnixNano()
its := make([]iter.EntryIterator, 0, len(c.blocks)+1)

for _, b := range c.blocks {
if maxt < b.mint || b.maxt < mint {
continue
}
its = append(its, encBlock{c.encoding, b}.Iterator(ctx, lbs, pipeline))
its = append(its, encBlock{c.encoding, b}.Iterator(ctx, pipeline))
}

if !c.head.isEmpty() {
its = append(its, c.head.iterator(ctx, direction, mint, maxt, lbs, pipeline))
its = append(its, c.head.iterator(ctx, direction, mint, maxt, pipeline))
}

if direction == logproto.FORWARD {
Expand Down Expand Up @@ -513,19 +512,19 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi
}

// Iterator implements Chunk.
func (c *MemChunk) SampleIterator(ctx context.Context, from, through time.Time, lbs labels.Labels, extractor logql.SampleExtractor) iter.SampleIterator {
func (c *MemChunk) SampleIterator(ctx context.Context, from, through time.Time, extractor log.StreamSampleExtractor) iter.SampleIterator {
mint, maxt := from.UnixNano(), through.UnixNano()
its := make([]iter.SampleIterator, 0, len(c.blocks)+1)

for _, b := range c.blocks {
if maxt < b.mint || b.maxt < mint {
continue
}
its = append(its, encBlock{c.encoding, b}.SampleIterator(ctx, lbs, extractor))
its = append(its, encBlock{c.encoding, b}.SampleIterator(ctx, extractor))
}

if !c.head.isEmpty() {
its = append(its, c.head.sampleIterator(ctx, mint, maxt, lbs, extractor))
its = append(its, c.head.sampleIterator(ctx, mint, maxt, extractor))
}

return iter.NewTimeRangedSampleIterator(
Expand Down Expand Up @@ -557,18 +556,18 @@ type encBlock struct {
block
}

func (b encBlock) Iterator(ctx context.Context, lbs labels.Labels, pipeline logql.Pipeline) iter.EntryIterator {
func (b encBlock) Iterator(ctx context.Context, pipeline log.StreamPipeline) iter.EntryIterator {
if len(b.b) == 0 {
return iter.NoopIterator
}
return newEntryIterator(ctx, getReaderPool(b.enc), b.b, lbs, pipeline)
return newEntryIterator(ctx, getReaderPool(b.enc), b.b, pipeline)
}

func (b encBlock) SampleIterator(ctx context.Context, lbs labels.Labels, extractor logql.SampleExtractor) iter.SampleIterator {
func (b encBlock) SampleIterator(ctx context.Context, extractor log.StreamSampleExtractor) iter.SampleIterator {
if len(b.b) == 0 {
return iter.NoopIterator
}
return newSampleIterator(ctx, getReaderPool(b.enc), b.b, lbs, extractor)
return newSampleIterator(ctx, getReaderPool(b.enc), b.b, extractor)
}

func (b block) Offset() int {
Expand All @@ -585,7 +584,7 @@ func (b block) MaxTime() int64 {
return b.maxt
}

func (hb *headBlock) iterator(ctx context.Context, direction logproto.Direction, mint, maxt int64, lbs labels.Labels, pipeline logql.Pipeline) iter.EntryIterator {
func (hb *headBlock) iterator(ctx context.Context, direction logproto.Direction, mint, maxt int64, pipeline log.StreamPipeline) iter.EntryIterator {
if hb.isEmpty() || (maxt < hb.mint || hb.maxt < mint) {
return iter.NoopIterator
}
Expand All @@ -601,7 +600,7 @@ func (hb *headBlock) iterator(ctx context.Context, direction logproto.Direction,
for _, e := range hb.entries {
chunkStats.HeadChunkBytes += int64(len(e.s))
line := []byte(e.s)
newLine, parsedLbs, ok := pipeline.Process(line, lbs)
newLine, parsedLbs, ok := pipeline.Process(line)
if !ok {
continue
}
Expand Down Expand Up @@ -630,7 +629,7 @@ func (hb *headBlock) iterator(ctx context.Context, direction logproto.Direction,
return iter.NewStreamsIterator(ctx, streamsResult, direction)
}

func (hb *headBlock) sampleIterator(ctx context.Context, mint, maxt int64, lbs labels.Labels, extractor logql.SampleExtractor) iter.SampleIterator {
func (hb *headBlock) sampleIterator(ctx context.Context, mint, maxt int64, extractor log.StreamSampleExtractor) iter.SampleIterator {
if hb.isEmpty() || (maxt < hb.mint || hb.maxt < mint) {
return iter.NoopIterator
}
Expand All @@ -640,7 +639,7 @@ func (hb *headBlock) sampleIterator(ctx context.Context, mint, maxt int64, lbs l
for _, e := range hb.entries {
chunkStats.HeadChunkBytes += int64(len(e.s))
line := []byte(e.s)
value, parsedLabels, ok := extractor.Process(line, lbs)
value, parsedLabels, ok := extractor.Process(line)
if !ok {
continue
}
Expand Down Expand Up @@ -687,11 +686,9 @@ type bufferedIterator struct {
currTs int64

closed bool

baseLbs labels.Labels
}

func newBufferedIterator(ctx context.Context, pool ReaderPool, b []byte, lbs labels.Labels) *bufferedIterator {
func newBufferedIterator(ctx context.Context, pool ReaderPool, b []byte) *bufferedIterator {
chunkStats := stats.GetChunkData(ctx)
chunkStats.CompressedBytes += int64(len(b))
return &bufferedIterator{
Expand All @@ -701,7 +698,6 @@ func newBufferedIterator(ctx context.Context, pool ReaderPool, b []byte, lbs lab
bufReader: nil, // will be initialized later
pool: pool,
decBuf: make([]byte, binary.MaxVarintLen64),
baseLbs: lbs,
}
}

Expand Down Expand Up @@ -806,19 +802,19 @@ func (si *bufferedIterator) close() {
si.decBuf = nil
}

func newEntryIterator(ctx context.Context, pool ReaderPool, b []byte, lbs labels.Labels, pipeline logql.Pipeline) iter.EntryIterator {
func newEntryIterator(ctx context.Context, pool ReaderPool, b []byte, pipeline log.StreamPipeline) iter.EntryIterator {
return &entryBufferedIterator{
bufferedIterator: newBufferedIterator(ctx, pool, b, lbs),
bufferedIterator: newBufferedIterator(ctx, pool, b),
pipeline: pipeline,
}
}

type entryBufferedIterator struct {
*bufferedIterator
pipeline logql.Pipeline
pipeline log.StreamPipeline

cur logproto.Entry
currLabels labels.Labels
currLabels log.LabelsResult
}

func (e *entryBufferedIterator) Entry() logproto.Entry {
Expand All @@ -829,7 +825,7 @@ func (e *entryBufferedIterator) Labels() string { return e.currLabels.String() }

func (e *entryBufferedIterator) Next() bool {
for e.bufferedIterator.Next() {
newLine, lbs, ok := e.pipeline.Process(e.currLine, e.baseLbs)
newLine, lbs, ok := e.pipeline.Process(e.currLine)
if !ok {
continue
}
Expand All @@ -841,9 +837,9 @@ func (e *entryBufferedIterator) Next() bool {
return false
}

func newSampleIterator(ctx context.Context, pool ReaderPool, b []byte, lbs labels.Labels, extractor logql.SampleExtractor) iter.SampleIterator {
func newSampleIterator(ctx context.Context, pool ReaderPool, b []byte, extractor log.StreamSampleExtractor) iter.SampleIterator {
it := &sampleBufferedIterator{
bufferedIterator: newBufferedIterator(ctx, pool, b, lbs),
bufferedIterator: newBufferedIterator(ctx, pool, b),
extractor: extractor,
}
return it
Expand All @@ -852,15 +848,15 @@ func newSampleIterator(ctx context.Context, pool ReaderPool, b []byte, lbs label
type sampleBufferedIterator struct {
*bufferedIterator

extractor logql.SampleExtractor
extractor log.StreamSampleExtractor

cur logproto.Sample
currLabels labels.Labels
currLabels log.LabelsResult
}

func (e *sampleBufferedIterator) Next() bool {
for e.bufferedIterator.Next() {
val, labels, ok := e.extractor.Process(e.currLine, e.baseLbs)
val, labels, ok := e.extractor.Process(e.currLine)
if !ok {
continue
}
Expand Down
Loading