Skip to content

Commit

Permalink
lib/storage: do not create tsid if metric contains stale marker(#5069)
Browse files Browse the repository at this point in the history
  • Loading branch information
ilyatrefilov committed Oct 13, 2023
1 parent 98a5007 commit 70a638a
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 0 deletions.
9 changes: 9 additions & 0 deletions lib/storage/storage.go
Expand Up @@ -1716,6 +1716,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci

// Return only the first error, since it has no sense in returning all errors.
var firstWarn error
var isStaleNan bool
j := 0
for i := range mrs {
mr := &mrs[i]
Expand All @@ -1725,6 +1726,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
// doesn't know how to work with them.
continue
}
isStaleNan = true
}
if mr.Timestamp < minTimestamp {
// Skip rows with too small timestamps outside the retention.
Expand Down Expand Up @@ -1838,6 +1840,13 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
continue
}

// If TSID was not found in cache and in indexdb, it's deleted. If metric contains stale
// marker, do not create a new TSID.
if isStaleNan {
j--
continue
}

// Slowest path - the TSID for the given mr.MetricNameRaw isn't found in indexdb. Create it.
generateTSID(&genTSID.TSID, mn)

Expand Down
61 changes: 61 additions & 0 deletions lib/storage/storage_test.go
Expand Up @@ -12,6 +12,7 @@ import (
"testing/quick"
"time"

"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
)
Expand Down Expand Up @@ -1213,3 +1214,63 @@ func TestStorageDeleteStaleSnapshots(t *testing.T) {
t.Fatalf("cannot remove %q: %s", path, err)
}
}

func TestStorageSeriesAreNotCreatedOnStaleMarkers(t *testing.T) {
rng := rand.New(rand.NewSource(1))
path := "TestStorageSeriesAreNotCreatedOnStaleMarkers"
s := MustOpenStorage(path, -1, 1e5, 1e6)

// Verify no label names exist
lns, err := s.SearchLabelNamesWithFiltersOnTimeRange(nil, nil, TimeRange{}, 1e5, 1e9, noDeadline)
if err != nil {
t.Fatalf("error in SearchLabelNamesWithFiltersOnTimeRange() at the start: %s", err)
}
if len(lns) != 0 {
t.Fatalf("found non-empty tag keys at the start: %q", lns)
}

// Add rows with stale markers to the storage
mrs := testGenerateStaleMetricRows(rng, 100, 0, 2e10)
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
t.Fatal("error when adding mrs: %w", err)
}

// Verify that tag keys were not created
lns, err = s.SearchLabelNamesWithFiltersOnTimeRange(nil, nil, TimeRange{}, 1e5, 1e9, noDeadline)
if err != nil {
t.Fatalf("error in SearchLabelNamesWithFiltersOnTimeRange() at the start: %s", err)
}
if len(lns) != 0 {
t.Fatalf("found non-empty tag keys at the start: %q", lns)
}

s.MustClose()
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
}
}

func testGenerateStaleMetricRows(rng *rand.Rand, rows uint64, timestampMin, timestampMax int64) []MetricRow {
var mrs []MetricRow
var mn MetricName
mn.Tags = []Tag{
{[]byte("job"), []byte("test_service")},
{[]byte("instance"), []byte("1.2.3.4")},
}

for i := 0; i < int(rows); i++ {
mn.MetricGroup = []byte(fmt.Sprintf("metric_%d", i))
metricNameRaw := mn.marshalRaw(nil)
timestamp := rng.Int63n(timestampMax - timestampMin)
value := decimal.StaleNaN

mr := MetricRow{
MetricNameRaw: metricNameRaw,
Timestamp: timestamp,
Value: value,
}
mrs = append(mrs, mr)
}

return mrs
}

0 comments on commit 70a638a

Please sign in to comment.