From 41e57a8e31856653ee2dcbb12ab2dcf758448126 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 20 Nov 2020 23:12:16 +0100 Subject: [PATCH 1/2] Avoid parsing labels when tailer is sending from a stream. This also includes some code cleanup. Signed-off-by: Cyril Tovena --- pkg/ingester/instance_test.go | 12 +++++----- pkg/ingester/stream.go | 4 +--- pkg/ingester/tailer.go | 42 +++++++++++++++-------------------- pkg/ingester/tailer_test.go | 10 +++++---- 4 files changed, 31 insertions(+), 37 deletions(-) diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index f11e688ff6b1..ee57361fad2b 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -69,10 +69,10 @@ func TestConcurrentPushes(t *testing.T) { wg := sync.WaitGroup{} for i := 0; i < concurrent; i++ { l := makeRandomLabels() - for uniqueLabels[l] { + for uniqueLabels[l.String()] { l = makeRandomLabels() } - uniqueLabels[l] = true + uniqueLabels[l.String()] = true wg.Add(1) go func(labels string) { @@ -91,7 +91,7 @@ func TestConcurrentPushes(t *testing.T) { tt = tt.Add(entriesPerIteration * time.Nanosecond) } - }(l) + }(l.String()) } time.Sleep(100 * time.Millisecond) // ready @@ -123,7 +123,7 @@ func TestSyncPeriod(t *testing.T) { result = append(result, logproto.Entry{Timestamp: tt, Line: fmt.Sprintf("hello %d", i)}) tt = tt.Add(time.Duration(1 + rand.Int63n(randomStep.Nanoseconds()))) } - pr := &logproto.PushRequest{Streams: []logproto.Stream{{Labels: lbls, Entries: result}}} + pr := &logproto.PushRequest{Streams: []logproto.Stream{{Labels: lbls.String(), Entries: result}}} err = inst.Push(context.Background(), pr) require.NoError(t, err) @@ -250,12 +250,12 @@ func entries(n int, t time.Time) []logproto.Entry { var labelNames = []string{"app", "instance", "namespace", "user", "cluster"} -func makeRandomLabels() string { +func makeRandomLabels() labels.Labels { ls := labels.NewBuilder(nil) for _, ln := range labelNames { ls.Set(ln, fmt.Sprintf("%d", 
rand.Int31())) } - return ls.Labels().String() + return ls.Labels() } func Benchmark_PushInstance(b *testing.B) { diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index 801331c42376..e3d256732c8f 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -195,9 +195,7 @@ func (s *stream) Push(ctx context.Context, entries []logproto.Entry, synchronize closedTailers = append(closedTailers, tailer.getID()) continue } - if err := tailer.send(stream); err != nil { - level.Error(util.WithContext(ctx, util.Logger)).Log("msg", "failed to send stream to tailer", "err", err) - } + tailer.send(stream, s.labels) } s.tailerMtx.RUnlock() diff --git a/pkg/ingester/tailer.go b/pkg/ingester/tailer.go index 091eb9e691ef..67d36f6a50e2 100644 --- a/pkg/ingester/tailer.go +++ b/pkg/ingester/tailer.go @@ -110,52 +110,46 @@ func (t *tailer) loop() { } } -func (t *tailer) send(stream logproto.Stream) error { +func (t *tailer) send(stream logproto.Stream, lbs labels.Labels) { if t.isClosed() { - return nil + return } // if we are already dropping streams due to blocked connection, drop new streams directly to save some effort if blockedSince := t.blockedSince(); blockedSince != nil { if blockedSince.Before(time.Now().Add(-time.Second * 15)) { t.close() - return nil + return } t.dropStream(stream) - return nil + return } - streams, err := t.processStream(stream) - if err != nil { - return err - } + streams := t.processStream(stream, lbs) if len(streams) == 0 { - return nil + return } for _, s := range streams { select { - case t.sendChan <- &logproto.Stream{Labels: s.Labels, Entries: s.Entries}: + case t.sendChan <- s: default: - t.dropStream(s) + t.dropStream(*s) } } - return nil + return } -func (t *tailer) processStream(stream logproto.Stream) ([]logproto.Stream, error) { +func (t *tailer) processStream(stream logproto.Stream, lbs labels.Labels) []*logproto.Stream { // Optimization: skip filtering entirely, if no filter is set if log.IsNoopPipeline(t.pipeline) { - 
return []logproto.Stream{stream}, nil + return []*logproto.Stream{&stream} } // pipeline are not thread safe and tailer can process multiple stream at once. t.pipelineMtx.Lock() defer t.pipelineMtx.Unlock() streams := map[uint64]*logproto.Stream{} - lbs, err := logql.ParseLabels(stream.Labels) - if err != nil { - return nil, err - } + sp := t.pipeline.ForStream(lbs) for _, e := range stream.Entries { newLine, parsedLbs, ok := sp.Process([]byte(e.Line)) @@ -174,11 +168,11 @@ func (t *tailer) processStream(stream logproto.Stream) ([]logproto.Stream, error Line: string(newLine), }) } - streamsResult := make([]logproto.Stream, 0, len(streams)) + streamsResult := make([]*logproto.Stream, 0, len(streams)) for _, stream := range streams { - streamsResult = append(streamsResult, *stream) + streamsResult = append(streamsResult, stream) } - return streamsResult, nil + return streamsResult } // Returns true if tailer is interested in the passed labelset @@ -232,12 +226,12 @@ func (t *tailer) dropStream(stream logproto.Stream) { blockedAt := time.Now() t.blockedAt = &blockedAt } - droppedStream := logproto.DroppedStream{ + + t.droppedStreams = append(t.droppedStreams, &logproto.DroppedStream{ From: stream.Entries[0].Timestamp, To: stream.Entries[len(stream.Entries)-1].Timestamp, Labels: stream.Labels, - } - t.droppedStreams = append(t.droppedStreams, &droppedStream) + }) } func (t *tailer) popDroppedStreams() []*logproto.DroppedStream { diff --git a/pkg/ingester/tailer_test.go b/pkg/ingester/tailer_test.go index 466f1948f515..84167da19113 100644 --- a/pkg/ingester/tailer_test.go +++ b/pkg/ingester/tailer_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/prometheus/prometheus/pkg/labels" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -35,7 +36,7 @@ func TestTailer_sendRaceConditionOnSendWhileClosing(t *testing.T) { go assert.NotPanics(t, func() { defer routines.Done() time.Sleep(time.Duration(rand.Intn(1000)) * time.Microsecond) - _ 
= tailer.send(stream) + tailer.send(stream, labels.Labels{{Name: "type", Value: "test"}}) }) go assert.NotPanics(t, func() { @@ -61,14 +62,15 @@ func Test_TailerSendRace(t *testing.T) { for i := 1; i <= 20; i++ { wg.Add(1) go func() { - _ = tail.send(logproto.Stream{ - Labels: makeRandomLabels(), + lbs := makeRandomLabels() + tail.send(logproto.Stream{ + Labels: lbs.String(), Entries: []logproto.Entry{ {Timestamp: time.Unix(0, 1), Line: "1"}, {Timestamp: time.Unix(0, 2), Line: "2"}, {Timestamp: time.Unix(0, 3), Line: "3"}, }, - }) + }, lbs) wg.Done() }() } From 84fa4983020e9be4bf74a300132594919664efbc Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Sat, 21 Nov 2020 12:03:01 +0100 Subject: [PATCH 2/2] I got linted. Signed-off-by: Cyril Tovena --- pkg/ingester/tailer.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/ingester/tailer.go b/pkg/ingester/tailer.go index 67d36f6a50e2..35c2cb579d60 100644 --- a/pkg/ingester/tailer.go +++ b/pkg/ingester/tailer.go @@ -136,7 +136,6 @@ func (t *tailer) send(stream logproto.Stream, lbs labels.Labels) { t.dropStream(*s) } } - return } func (t *tailer) processStream(stream logproto.Stream, lbs labels.Labels) []*logproto.Stream {