Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid parsing labels when tailer is sending from a stream. #2973

Merged
merged 2 commits into from
Nov 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions pkg/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ func TestConcurrentPushes(t *testing.T) {
wg := sync.WaitGroup{}
for i := 0; i < concurrent; i++ {
l := makeRandomLabels()
for uniqueLabels[l] {
for uniqueLabels[l.String()] {
l = makeRandomLabels()
}
uniqueLabels[l] = true
uniqueLabels[l.String()] = true

wg.Add(1)
go func(labels string) {
Expand All @@ -91,7 +91,7 @@ func TestConcurrentPushes(t *testing.T) {

tt = tt.Add(entriesPerIteration * time.Nanosecond)
}
}(l)
}(l.String())
}

time.Sleep(100 * time.Millisecond) // ready
Expand Down Expand Up @@ -123,7 +123,7 @@ func TestSyncPeriod(t *testing.T) {
result = append(result, logproto.Entry{Timestamp: tt, Line: fmt.Sprintf("hello %d", i)})
tt = tt.Add(time.Duration(1 + rand.Int63n(randomStep.Nanoseconds())))
}
pr := &logproto.PushRequest{Streams: []logproto.Stream{{Labels: lbls, Entries: result}}}
pr := &logproto.PushRequest{Streams: []logproto.Stream{{Labels: lbls.String(), Entries: result}}}
err = inst.Push(context.Background(), pr)
require.NoError(t, err)

Expand Down Expand Up @@ -250,12 +250,12 @@ func entries(n int, t time.Time) []logproto.Entry {

var labelNames = []string{"app", "instance", "namespace", "user", "cluster"}

func makeRandomLabels() string {
func makeRandomLabels() labels.Labels {
ls := labels.NewBuilder(nil)
for _, ln := range labelNames {
ls.Set(ln, fmt.Sprintf("%d", rand.Int31()))
}
return ls.Labels().String()
return ls.Labels()
}

func Benchmark_PushInstance(b *testing.B) {
Expand Down
4 changes: 1 addition & 3 deletions pkg/ingester/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,7 @@ func (s *stream) Push(ctx context.Context, entries []logproto.Entry, synchronize
closedTailers = append(closedTailers, tailer.getID())
continue
}
if err := tailer.send(stream); err != nil {
level.Error(util.WithContext(ctx, util.Logger)).Log("msg", "failed to send stream to tailer", "err", err)
}
tailer.send(stream, s.labels)
}
s.tailerMtx.RUnlock()

Expand Down
41 changes: 17 additions & 24 deletions pkg/ingester/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,52 +110,45 @@ func (t *tailer) loop() {
}
}

func (t *tailer) send(stream logproto.Stream) error {
func (t *tailer) send(stream logproto.Stream, lbs labels.Labels) {
if t.isClosed() {
return nil
return
}

// if we are already dropping streams due to blocked connection, drop new streams directly to save some effort
if blockedSince := t.blockedSince(); blockedSince != nil {
if blockedSince.Before(time.Now().Add(-time.Second * 15)) {
t.close()
return nil
return
}
t.dropStream(stream)
return nil
return
}

streams, err := t.processStream(stream)
if err != nil {
return err
}
streams := t.processStream(stream, lbs)
if len(streams) == 0 {
return nil
return
}
for _, s := range streams {
select {
case t.sendChan <- &logproto.Stream{Labels: s.Labels, Entries: s.Entries}:
case t.sendChan <- s:
default:
t.dropStream(s)
t.dropStream(*s)
}
}
return nil
}

func (t *tailer) processStream(stream logproto.Stream) ([]logproto.Stream, error) {
func (t *tailer) processStream(stream logproto.Stream, lbs labels.Labels) []*logproto.Stream {
// Optimization: skip filtering entirely, if no filter is set
if log.IsNoopPipeline(t.pipeline) {
return []logproto.Stream{stream}, nil
return []*logproto.Stream{&stream}
}
// pipeline are not thread safe and tailer can process multiple stream at once.
t.pipelineMtx.Lock()
defer t.pipelineMtx.Unlock()

streams := map[uint64]*logproto.Stream{}
lbs, err := logql.ParseLabels(stream.Labels)
if err != nil {
return nil, err
}

sp := t.pipeline.ForStream(lbs)
for _, e := range stream.Entries {
newLine, parsedLbs, ok := sp.Process([]byte(e.Line))
Expand All @@ -174,11 +167,11 @@ func (t *tailer) processStream(stream logproto.Stream) ([]logproto.Stream, error
Line: string(newLine),
})
}
streamsResult := make([]logproto.Stream, 0, len(streams))
streamsResult := make([]*logproto.Stream, 0, len(streams))
for _, stream := range streams {
streamsResult = append(streamsResult, *stream)
streamsResult = append(streamsResult, stream)
}
return streamsResult, nil
return streamsResult
}

// Returns true if tailer is interested in the passed labelset
Expand Down Expand Up @@ -232,12 +225,12 @@ func (t *tailer) dropStream(stream logproto.Stream) {
blockedAt := time.Now()
t.blockedAt = &blockedAt
}
droppedStream := logproto.DroppedStream{

t.droppedStreams = append(t.droppedStreams, &logproto.DroppedStream{
From: stream.Entries[0].Timestamp,
To: stream.Entries[len(stream.Entries)-1].Timestamp,
Labels: stream.Labels,
}
t.droppedStreams = append(t.droppedStreams, &droppedStream)
})
}

func (t *tailer) popDroppedStreams() []*logproto.DroppedStream {
Expand Down
10 changes: 6 additions & 4 deletions pkg/ingester/tailer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/prometheus/prometheus/pkg/labels"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -35,7 +36,7 @@ func TestTailer_sendRaceConditionOnSendWhileClosing(t *testing.T) {
go assert.NotPanics(t, func() {
defer routines.Done()
time.Sleep(time.Duration(rand.Intn(1000)) * time.Microsecond)
_ = tailer.send(stream)
tailer.send(stream, labels.Labels{{Name: "type", Value: "test"}})
})

go assert.NotPanics(t, func() {
Expand All @@ -61,14 +62,15 @@ func Test_TailerSendRace(t *testing.T) {
for i := 1; i <= 20; i++ {
wg.Add(1)
go func() {
_ = tail.send(logproto.Stream{
Labels: makeRandomLabels(),
lbs := makeRandomLabels()
tail.send(logproto.Stream{
Labels: lbs.String(),
Entries: []logproto.Entry{
{Timestamp: time.Unix(0, 1), Line: "1"},
{Timestamp: time.Unix(0, 2), Line: "2"},
{Timestamp: time.Unix(0, 3), Line: "3"},
},
})
}, lbs)
wg.Done()
}()
}
Expand Down