diff --git a/CHANGELOG.md b/CHANGELOG.md index 6742df88fff2..ecf3f2270349 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,8 @@ - [ENHANCEMENT] integrations-next: Add `extra_labels` to add a custom set of labels to integration targets. (@rfratto) +- [ENHANCEMENT] The agent no longer appends duplicate exemplars. (@tpaschalis) + - [BUGFIX] Fixed issue where Grafana Agent may panic if there is a very large WAL loading while old WALs are being deleted or the `/agent/api/v1/targets` endpoint is called. (@tpaschalis) diff --git a/pkg/metrics/wal/series.go b/pkg/metrics/wal/series.go index fd1de2effd9b..35bda48e2974 100644 --- a/pkg/metrics/wal/series.go +++ b/pkg/metrics/wal/series.go @@ -3,6 +3,7 @@ package wal import ( "sync" + "github.com/prometheus/prometheus/pkg/exemplar" "github.com/prometheus/prometheus/pkg/intern" "github.com/prometheus/prometheus/pkg/labels" ) @@ -97,10 +98,11 @@ const ( // // This code is copied from the Prometheus TSDB. type stripeSeries struct { - size int - series []map[uint64]*memSeries - hashes []seriesHashmap - locks []stripeLock + size int + series []map[uint64]*memSeries + hashes []seriesHashmap + exemplars []map[uint64]*exemplar.Exemplar + locks []stripeLock } type stripeLock struct { @@ -112,10 +114,11 @@ type stripeLock struct { func newStripeSeries() *stripeSeries { stripeSize := defaultStripeSize s := &stripeSeries{ - size: stripeSize, - series: make([]map[uint64]*memSeries, stripeSize), - hashes: make([]seriesHashmap, stripeSize), - locks: make([]stripeLock, stripeSize), + size: stripeSize, + series: make([]map[uint64]*memSeries, stripeSize), + hashes: make([]seriesHashmap, stripeSize), + exemplars: make([]map[uint64]*exemplar.Exemplar, stripeSize), + locks: make([]stripeLock, stripeSize), } for i := range s.series { @@ -124,6 +127,9 @@ func newStripeSeries() *stripeSeries { for i := range s.hashes { s.hashes[i] = seriesHashmap{} } + for i := range s.exemplars { + s.exemplars[i] = map[uint64]*exemplar.Exemplar{} + } return 
s } @@ -171,6 +177,10 @@ func (s *stripeSeries) gc(mint int64) map[uint64]struct{} { delete(s.series[i], series.ref) s.hashes[j].del(seriesHash, series.ref) + // Since the series is gone, we'll also delete + // the latest stored exemplar. + delete(s.exemplars[i], series.ref) + if i != j { s.locks[j].Unlock() } @@ -216,6 +226,27 @@ func (s *stripeSeries) set(hash uint64, series *memSeries) { s.locks[i].Unlock() } +func (s *stripeSeries) getLatestExemplar(id uint64) *exemplar.Exemplar { + i := id & uint64(s.size-1) + + s.locks[i].RLock() + exemplar := s.exemplars[i][id] + s.locks[i].RUnlock() + + return exemplar +} + +func (s *stripeSeries) setLatestExemplar(id uint64, exemplar *exemplar.Exemplar) { + i := id & uint64(s.size-1) + + // Make sure that's a valid series id and record its latest exemplar + s.locks[i].Lock() + if s.series[i][id] != nil { + s.exemplars[i][id] = exemplar + } + s.locks[i].Unlock() +} + func (s *stripeSeries) iterator() *stripeSeriesIterator { return &stripeSeriesIterator{s} } diff --git a/pkg/metrics/wal/wal.go b/pkg/metrics/wal/wal.go index b5e8c3b7372a..15814d070739 100644 --- a/pkg/metrics/wal/wal.go +++ b/pkg/metrics/wal/wal.go @@ -282,6 +282,8 @@ func (w *Storage) loadWAL(r *wal.Reader) (err error) { decoded <- samples case record.Tombstones, record.Exemplars: // We don't care about decoding tombstones or exemplars + // TODO: If we decide to decode exemplars, we should make sure to prepopulate + // stripeSeries.exemplars in the next block by using setLatestExemplar. continue default: errCh <- &wal.CorruptionErr{ @@ -646,6 +648,16 @@ func (a *appender) AppendExemplar(ref uint64, _ labels.Labels, e exemplar.Exempl } } + // Check for duplicate vs last stored exemplar for this series, and discard those. + // Otherwise, record the current exemplar as the latest. + // Prometheus returns 0 when encountering duplicates, so we do the same here. 
+ prevExemplar := a.w.series.getLatestExemplar(ref) + if prevExemplar != nil && prevExemplar.Equals(e) { + // Duplicate, don't return an error but don't accept the exemplar. + return 0, nil + } + a.w.series.setLatestExemplar(ref, &e) + a.exemplars = append(a.exemplars, record.RefExemplar{ Ref: ref, T: e.Ts, @@ -653,6 +665,7 @@ func (a *appender) AppendExemplar(ref uint64, _ labels.Labels, e exemplar.Exempl Labels: e.Labels, }) + a.w.metrics.totalAppendedExemplars.Inc() return s.ref, nil } diff --git a/pkg/metrics/wal/wal_test.go b/pkg/metrics/wal/wal_test.go index fccc0a023aa5..a1cad5c4f86d 100644 --- a/pkg/metrics/wal/wal_test.go +++ b/pkg/metrics/wal/wal_test.go @@ -104,6 +104,47 @@ func TestStorage(t *testing.T) { require.Equal(t, expectedExemplars, actualExemplars) } +func TestStorage_DuplicateExemplarsIgnored(t *testing.T) { + walDir, err := ioutil.TempDir(os.TempDir(), "wal") + require.NoError(t, err) + defer os.RemoveAll(walDir) + + s, err := NewStorage(log.NewNopLogger(), nil, walDir) + require.NoError(t, err) + + app := s.Appender(context.Background()) + + sRef, err := app.Append(0, labels.Labels{{Name: "a", Value: "1"}}, 0, 0) + require.NoError(t, err, "should not reject valid series") + + // If the Labels, Value or Timestamp differ from the last exemplar, + // then a new one should be appended; otherwise, it should be skipped. 
+ e := exemplar.Exemplar{Labels: labels.Labels{{Name: "a", Value: "1"}}, Value: 20, Ts: 10, HasTs: true} + _, _ = app.AppendExemplar(sRef, nil, e) + _, _ = app.AppendExemplar(sRef, nil, e) + + e.Labels = labels.Labels{{Name: "b", Value: "2"}} + _, _ = app.AppendExemplar(sRef, nil, e) + _, _ = app.AppendExemplar(sRef, nil, e) + _, _ = app.AppendExemplar(sRef, nil, e) + + e.Value = 42 + _, _ = app.AppendExemplar(sRef, nil, e) + _, _ = app.AppendExemplar(sRef, nil, e) + + e.Ts = 25 + _, _ = app.AppendExemplar(sRef, nil, e) + _, _ = app.AppendExemplar(sRef, nil, e) + + require.NoError(t, app.Commit()) + collector := walDataCollector{} + replayer := walReplayer{w: &collector} + require.NoError(t, replayer.Replay(s.wal.Dir())) + + // We had 9 calls to AppendExemplar but only 4 of those should have gotten through + require.Equal(t, 4, len(collector.exemplars)) +} + func TestStorage_ExistingWAL(t *testing.T) { walDir, err := ioutil.TempDir(os.TempDir(), "wal") require.NoError(t, err) @@ -331,6 +372,27 @@ func TestStorage_TruncateAfterClose(t *testing.T) { require.Error(t, ErrWALClosed, s.Truncate(0)) } +func BenchmarkAppendExemplar(b *testing.B) { + walDir, _ := ioutil.TempDir(os.TempDir(), "wal") + defer os.RemoveAll(walDir) + + s, _ := NewStorage(log.NewNopLogger(), nil, walDir) + defer s.Close() + app := s.Appender(context.Background()) + sRef, _ := app.Append(0, labels.Labels{{Name: "a", Value: "1"}}, 0, 0) + e := exemplar.Exemplar{Labels: labels.Labels{{Name: "a", Value: "1"}}, Value: 20, Ts: 10, HasTs: true} + + b.ResetTimer() // The timer is already running here; reset it so the setup above is not measured. + for i := 0; i < b.N; i++ { + e.Ts = int64(i) + _, _ = app.AppendExemplar(sRef, nil, e) + } + b.StopTimer() + + // Actually use appended exemplars in case they get eliminated + _ = app.Commit() +} + type sample struct { ts int64 val float64