Skip to content

Commit

Permalink
Implement Appender.AppendIdentifyingLabels
Browse files Browse the repository at this point in the history
Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
  • Loading branch information
aknuds1 committed Feb 8, 2024
1 parent a5028ca commit 0b0249c
Show file tree
Hide file tree
Showing 9 changed files with 95 additions and 9 deletions.
4 changes: 4 additions & 0 deletions cmd/prometheus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -1534,6 +1534,10 @@ func (n notReadyAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels
return 0, tsdb.ErrNotReady
}

func (n notReadyAppender) AppendIdentifyingLabels(storage.SeriesRef, []string, int64) error {
return tsdb.ErrNotReady
}

func (n notReadyAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
return 0, tsdb.ErrNotReady
}
Expand Down
15 changes: 15 additions & 0 deletions scrape/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ func (a nopAppender) AppendHistogram(storage.SeriesRef, labels.Labels, int64, *h
return 0, nil
}

func (a nopAppender) AppendIdentifyingLabels(storage.SeriesRef, []string, int64) error {
return nil
}

func (a nopAppender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.Metadata) (storage.SeriesRef, error) {
return 0, nil
}
Expand Down Expand Up @@ -148,6 +152,17 @@ func (a *collectResultAppender) AppendHistogram(ref storage.SeriesRef, l labels.
return a.next.AppendHistogram(ref, l, t, h, fh)
}

func (a *collectResultAppender) AppendIdentifyingLabels(ref storage.SeriesRef, identifyingLabels []string, t int64) error {
a.mtx.Lock()
defer a.mtx.Unlock()
// TODO: Add to pendingDataLabels
if a.next == nil {
return nil
}

return a.next.AppendIdentifyingLabels(ref, identifyingLabels, t)
}

func (a *collectResultAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
a.mtx.Lock()
defer a.mtx.Unlock()
Expand Down
14 changes: 14 additions & 0 deletions storage/fanout.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,20 @@ func (f *fanoutAppender) AppendHistogram(ref SeriesRef, l labels.Labels, t int64
return ref, nil
}

func (f *fanoutAppender) AppendIdentifyingLabels(ref SeriesRef, identifyingLabels []string, t int64) error {
if err := f.primary.AppendIdentifyingLabels(ref, identifyingLabels, t); err != nil {
return err
}

for _, appender := range f.secondaries {
if err := appender.AppendIdentifyingLabels(ref, identifyingLabels, t); err != nil {
return err
}
}

return nil
}

func (f *fanoutAppender) UpdateMetadata(ref SeriesRef, l labels.Labels, m metadata.Metadata) (SeriesRef, error) {
ref, err := f.primary.UpdateMetadata(ref, l, m)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions storage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,8 @@ type Appender interface {
// If the reference is 0 it must not be used for caching.
Append(ref SeriesRef, l labels.Labels, t int64, v float64) (SeriesRef, error)

AppendIdentifyingLabels(ref SeriesRef, identifyingLabels []string, t int64) error

// Commit submits the collected samples and purges the batch. If Commit
// returns a non-nil error, it also rolls back all modifications made in
// the appender so far, as Rollback would do. In any case, an Appender
Expand Down
8 changes: 8 additions & 0 deletions storage/remote/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,14 @@ func (t *timestampTracker) AppendHistogram(_ storage.SeriesRef, _ labels.Labels,
return 0, nil
}

// AppendIdentifyingLabels implements storage.Appender.
func (t *timestampTracker) AppendIdentifyingLabels(_ storage.SeriesRef, _ []string, ts int64) error {
if ts > t.highestTimestamp {
t.highestTimestamp = ts
}
return nil
}

func (t *timestampTracker) UpdateMetadata(_ storage.SeriesRef, _ labels.Labels, _ metadata.Metadata) (storage.SeriesRef, error) {
// TODO: Add and increment a `metadata` field when we get around to wiring metadata in remote_write.
// UpadteMetadata is no-op for remote write (where timestampTracker is being used) for now.
Expand Down
11 changes: 9 additions & 2 deletions storage/remote/write_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"errors"
"fmt"
"net/http"
"slices"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand Down Expand Up @@ -169,8 +170,14 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
}

if len(ts.DataLabels) > 0 {
// Info type metric with metadata represented as data labels
if err := app.AppendDataLabels(ts.DataLabels, ts.Samples[0].Timestamp); err != nil {
// Info type metric with metadata represented as data labels.
identifyingLabels := make([]string, 0, len(ts.Labels)-len(ts.DataLabels))
for _, l := range ts.Labels {
if !slices.Contains(ts.DataLabels, l.Name) {
identifyingLabels = append(identifyingLabels, l.Name)
}
}
if err := app.AppendIdentifyingLabels(ref, identifyingLabels, ts.Samples[0].Timestamp); err != nil {
return err
}
}
Expand Down
25 changes: 18 additions & 7 deletions storage/remote/write_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,13 +264,14 @@ func genSeriesWithSample(numSeries int, ts int64) []prompb.TimeSeries {
}

type mockAppendable struct {
latestSample int64
samples []mockSample
latestExemplar int64
exemplars []mockExemplar
latestHistogram int64
histograms []mockHistogram
commitErr error
latestSample int64
samples []mockSample
latestExemplar int64
exemplars []mockExemplar
latestHistogram int64
latestDataLabels int64
histograms []mockHistogram
commitErr error
}

type mockSample struct {
Expand Down Expand Up @@ -335,6 +336,16 @@ func (m *mockAppendable) AppendHistogram(_ storage.SeriesRef, l labels.Labels, t
return 0, nil
}

func (m *mockAppendable) AppendIdentifyingLabels(_ storage.SeriesRef, identifyingLabels []string, t int64) error {
if t < m.latestDataLabels {
return storage.ErrOutOfOrderSample
}

m.latestDataLabels = t
// TODO: Record data labels sample
return nil
}

func (m *mockAppendable) UpdateMetadata(_ storage.SeriesRef, _ labels.Labels, _ metadata.Metadata) (storage.SeriesRef, error) {
// TODO: Wire metadata in a mockAppendable field when we get around to handling metadata in remote_write.
// UpdateMetadata is no-op for remote write (where mockAppendable is being used to test) for now.
Expand Down
5 changes: 5 additions & 0 deletions tsdb/agent/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -957,6 +957,11 @@ func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int
return storage.SeriesRef(series.ref), nil
}

func (a *appender) AppendIdentifyingLabels(storage.SeriesRef, []string, int64) error {
// TODO: Implement
return nil
}

func (a *appender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.Metadata) (storage.SeriesRef, error) {
// TODO: Wire metadata in the Agent's appender.
return 0, nil
Expand Down
20 changes: 20 additions & 0 deletions tsdb/head_append.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,16 @@ func (a *initAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t
return a.app.AppendHistogram(ref, l, t, h, fh)
}

func (a *initAppender) AppendIdentifyingLabels(ref storage.SeriesRef, identifyingLabels []string, t int64) error {
if a.app != nil {
return a.app.AppendIdentifyingLabels(ref, identifyingLabels, t)
}

a.head.initTime(t)
a.app = a.head.appender()
return a.app.AppendIdentifyingLabels(ref, identifyingLabels, t)
}

func (a *initAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
if a.app != nil {
return a.app.UpdateMetadata(ref, l, m)
Expand Down Expand Up @@ -677,6 +687,16 @@ func (a *headAppender) AppendHistogram(ref storage.SeriesRef, lset labels.Labels
return storage.SeriesRef(s.ref), nil
}

func (a *headAppender) AppendIdentifyingLabels(ref storage.SeriesRef, identifyingLabels []string, t int64) error {
s := a.head.series.getByID(chunks.HeadSeriesRef(ref))
if s == nil {
return fmt.Errorf("series not found: %d", ref)
}

// TODO: Implement persisting of identifying labels index
return nil
}

// UpdateMetadata for headAppender assumes the series ref already exists, and so it doesn't
// use getOrCreate or make any of the lset sanity checks that Append does.
func (a *headAppender) UpdateMetadata(ref storage.SeriesRef, lset labels.Labels, meta metadata.Metadata) (storage.SeriesRef, error) {
Expand Down

0 comments on commit 0b0249c

Please sign in to comment.