Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop appending duplicate exemplars #1316

Merged
merged 13 commits into from
Jan 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
- [ENHANCEMENT] integrations-next: Add `extra_labels` to add a custom set of
labels to integration targets. (@rfratto)

- [ENHANCEMENT] The agent no longer appends duplicate exemplars. (@tpaschalis)

- [BUGFIX] Fixed issue where Grafana Agent may panic if there is a very large
WAL loading while old WALs are being deleted or the `/agent/api/v1/targets`
endpoint is called. (@tpaschalis)
Expand Down
47 changes: 39 additions & 8 deletions pkg/metrics/wal/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package wal
import (
"sync"

"github.com/prometheus/prometheus/pkg/exemplar"
"github.com/prometheus/prometheus/pkg/intern"
"github.com/prometheus/prometheus/pkg/labels"
)
Expand Down Expand Up @@ -97,10 +98,11 @@ const (
//
// This code is copied from the Prometheus TSDB.
type stripeSeries struct {
size int
series []map[uint64]*memSeries
hashes []seriesHashmap
locks []stripeLock
size int
series []map[uint64]*memSeries
hashes []seriesHashmap
exemplars []map[uint64]*exemplar.Exemplar
locks []stripeLock
}

type stripeLock struct {
Expand All @@ -112,10 +114,11 @@ type stripeLock struct {
func newStripeSeries() *stripeSeries {
stripeSize := defaultStripeSize
s := &stripeSeries{
size: stripeSize,
series: make([]map[uint64]*memSeries, stripeSize),
hashes: make([]seriesHashmap, stripeSize),
locks: make([]stripeLock, stripeSize),
size: stripeSize,
series: make([]map[uint64]*memSeries, stripeSize),
hashes: make([]seriesHashmap, stripeSize),
exemplars: make([]map[uint64]*exemplar.Exemplar, stripeSize),
locks: make([]stripeLock, stripeSize),
}

for i := range s.series {
Expand All @@ -124,6 +127,9 @@ func newStripeSeries() *stripeSeries {
for i := range s.hashes {
s.hashes[i] = seriesHashmap{}
}
for i := range s.exemplars {
s.exemplars[i] = map[uint64]*exemplar.Exemplar{}
}
return s
}

Expand Down Expand Up @@ -171,6 +177,10 @@ func (s *stripeSeries) gc(mint int64) map[uint64]struct{} {
delete(s.series[i], series.ref)
s.hashes[j].del(seriesHash, series.ref)

// Since the series is gone, we'll also delete
// the latest stored exemplar.
delete(s.exemplars[i], series.ref)

if i != j {
s.locks[j].Unlock()
}
Expand Down Expand Up @@ -216,6 +226,27 @@ func (s *stripeSeries) set(hash uint64, series *memSeries) {
s.locks[i].Unlock()
}

// getLatestExemplar returns the exemplar currently stored under id, or nil
// when none is stored. The stripe is chosen as id & (size-1), and that
// stripe's read lock is held for the duration of the map lookup.
func (s *stripeSeries) getLatestExemplar(id uint64) *exemplar.Exemplar {
	stripe := id & uint64(s.size-1)

	s.locks[stripe].RLock()
	defer s.locks[stripe].RUnlock()

	return s.exemplars[stripe][id]
}

// setLatestExemplar stores exemplar as the latest exemplar for the series id.
// The write happens under the write lock of the stripe chosen as
// id & (size-1), and is skipped when that stripe's series map holds no
// non-nil entry for id.
func (s *stripeSeries) setLatestExemplar(id uint64, exemplar *exemplar.Exemplar) {
	stripe := id & uint64(s.size-1)

	s.locks[stripe].Lock()
	defer s.locks[stripe].Unlock()

	// Only track exemplars for ids that refer to a known series.
	if s.series[stripe][id] == nil {
		return
	}
	s.exemplars[stripe][id] = exemplar
}

// iterator returns a new stripeSeriesIterator constructed from s.
func (s *stripeSeries) iterator() *stripeSeriesIterator {
	it := stripeSeriesIterator{s}
	return &it
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/metrics/wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,8 @@ func (w *Storage) loadWAL(r *wal.Reader) (err error) {
decoded <- samples
case record.Tombstones, record.Exemplars:
// We don't care about decoding tombstones or exemplars
// TODO: If we decide to decode exemplars, we should make sure to prepopulate
// stripeSeries.exemplars in the next block by using setLatestExemplar.
continue
default:
errCh <- &wal.CorruptionErr{
Expand Down Expand Up @@ -646,13 +648,24 @@ func (a *appender) AppendExemplar(ref uint64, _ labels.Labels, e exemplar.Exempl
}
}

// Check for duplicate vs last stored exemplar for this series, and discard those.
// Otherwise, record the current exemplar as the latest.
// Prometheus returns 0 when encountering duplicates, so we do the same here.
prevExemplar := a.w.series.getLatestExemplar(ref)
if prevExemplar != nil && prevExemplar.Equals(e) {
// Duplicate, don't return an error but don't accept the exemplar.
return 0, nil
tpaschalis marked this conversation as resolved.
Show resolved Hide resolved
}
a.w.series.setLatestExemplar(ref, &e)

a.exemplars = append(a.exemplars, record.RefExemplar{
Ref: ref,
T: e.Ts,
V: e.Value,
Labels: e.Labels,
})

a.w.metrics.totalAppendedExemplars.Inc()
return s.ref, nil
}

Expand Down
62 changes: 62 additions & 0 deletions pkg/metrics/wal/wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,47 @@ func TestStorage(t *testing.T) {
require.Equal(t, expectedExemplars, actualExemplars)
}

// TestStorage_DuplicateExemplarsIgnored verifies that offering the same
// exemplar to AppendExemplar several times in a row for one series results in
// a single exemplar in the WAL, and that changing the labels, value or
// timestamp makes the exemplar count as new again.
func TestStorage_DuplicateExemplarsIgnored(t *testing.T) {
	walDir, err := ioutil.TempDir(os.TempDir(), "wal")
	require.NoError(t, err)
	defer os.RemoveAll(walDir)

	s, err := NewStorage(log.NewNopLogger(), nil, walDir)
	require.NoError(t, err)
	// Deferred calls run last-in-first-out, so the storage is closed before
	// its directory is removed.
	defer s.Close()

	app := s.Appender(context.Background())

	sRef, err := app.Append(0, labels.Labels{{Name: "a", Value: "1"}}, 0, 0)
	require.NoError(t, err, "should not reject valid series")

	// appendTimes offers e to the appender n times. None of the calls may
	// fail: a duplicate is expected to be dropped silently, not rejected.
	appendTimes := func(e exemplar.Exemplar, n int) {
		for i := 0; i < n; i++ {
			_, err := app.AppendExemplar(sRef, nil, e)
			require.NoError(t, err, "should not reject valid or duplicate exemplar")
		}
	}

	// If the Labels, Value or Timestamp are different than the last exemplar,
	// then a new one should be appended; otherwise, it should be skipped.
	e := exemplar.Exemplar{Labels: labels.Labels{{Name: "a", Value: "1"}}, Value: 20, Ts: 10, HasTs: true}
	appendTimes(e, 2)

	e.Labels = labels.Labels{{Name: "b", Value: "2"}}
	appendTimes(e, 3)

	e.Value = 42
	appendTimes(e, 2)

	e.Ts = 25
	appendTimes(e, 2)

	require.NoError(t, app.Commit())
	collector := walDataCollector{}
	replayer := walReplayer{w: &collector}
	require.NoError(t, replayer.Replay(s.wal.Dir()))

	// We had 9 calls to AppendExemplar but only 4 of those should have gotten through.
	require.Equal(t, 4, len(collector.exemplars))
}

func TestStorage_ExistingWAL(t *testing.T) {
walDir, err := ioutil.TempDir(os.TempDir(), "wal")
require.NoError(t, err)
Expand Down Expand Up @@ -331,6 +372,27 @@ func TestStorage_TruncateAfterClose(t *testing.T) {
require.Error(t, ErrWALClosed, s.Truncate(0))
}

func BenchmarkAppendExemplar(b *testing.B) {
walDir, _ := ioutil.TempDir(os.TempDir(), "wal")
defer os.RemoveAll(walDir)

s, _ := NewStorage(log.NewNopLogger(), nil, walDir)
defer s.Close()
app := s.Appender(context.Background())
sRef, _ := app.Append(0, labels.Labels{{Name: "a", Value: "1"}}, 0, 0)
e := exemplar.Exemplar{Labels: labels.Labels{{Name: "a", Value: "1"}}, Value: 20, Ts: 10, HasTs: true}
tpaschalis marked this conversation as resolved.
Show resolved Hide resolved

b.StartTimer()
for i := 0; i < b.N; i++ {
e.Ts = int64(i)
_, _ = app.AppendExemplar(sRef, nil, e)
}
tpaschalis marked this conversation as resolved.
Show resolved Hide resolved
b.StopTimer()

// Actually use appended exemplars in case they get eliminated
_ = app.Commit()
}

type sample struct {
ts int64
val float64
Expand Down