Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lib/storage: adds nextIndexDB to mitigate performance degradation on #4626

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions app/vmstorage/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,9 @@ func registerStorageMetrics(strg *storage.Storage) {
metrics.NewGauge(`vm_timeseries_repopulated_total`, func() float64 {
return float64(idbm().TimeseriesRepopulated)
})
metrics.NewGauge(`vm_index_records_repopulated_total`, func() float64 {
return float64(idbm().DailyIndexRecordsRepopulated)
})
metrics.NewGauge(`vm_missing_tsids_for_metric_id_total`, func() float64 {
return float64(idbm().MissingTSIDsForMetricID)
})
Expand Down
49 changes: 44 additions & 5 deletions lib/storage/index_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ type indexDB struct {
// The counter for time series which were re-populated from previous indexDB after the rotation.
timeseriesRepopulated uint64

// The counter for daily index records which were re-populated from current indexDB to the new indexDB before the rotation.
dailyIndexRecordsRepopulated uint64
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is unclear when this counter increases:

  1. When current indexDB is used to re-populate entires for new indexDB object
  2. Or when current indexDB gets new entries from previous IndexDB object.


// The number of missing MetricID -> TSID entries.
// High rate for this value means corrupted indexDB.
missingTSIDsForMetricID uint64
Expand Down Expand Up @@ -92,6 +95,9 @@ type indexDB struct {
// The unix timestamp in seconds for the indexDB rotation.
rotationTimestamp uint64

// The unix timestamp in seconds for the next indexDB rotation.
nextRotationTimestamp uint64

name string
tb *mergeset.Table

Expand All @@ -109,6 +115,29 @@ type indexDB struct {
loopsPerDateTagFilterCache *workingsetcache.Cache

indexSearchPool sync.Pool

// dateMetricIDCache is (Date, MetricID) cache.
dateMetricIDCache *dateMetricIDCache

// Fast cache for MetricID values occurred during the current hour.
currHourMetricIDs atomic.Pointer[hourMetricIDs]

// Fast cache for MetricID values occurred during the previous hour.
prevHourMetricIDs atomic.Pointer[hourMetricIDs]

// Fast cache for pre-populating per-day inverted index for the next day.
// This is needed in order to remove CPU usage spikes at 00:00 UTC
// due to creation of per-day inverted index for active time series.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/430 for details.
nextDayMetricIDs atomic.Pointer[byDateMetricIDEntry]

// Pending MetricID values to be added to currHourMetricIDs.
pendingHourEntriesLock sync.Mutex
pendingHourEntries *uint64set.Set

// Pending MetricIDs to be added to nextDayMetricIDs.
pendingNextDayMetricIDsLock sync.Mutex
pendingNextDayMetricIDs *uint64set.Set
}

var maxTagFiltersCacheSize int
Expand Down Expand Up @@ -159,7 +188,15 @@ func mustOpenIndexDB(path string, s *Storage, rotationTimestamp uint64, isReadOn
tagFiltersToMetricIDsCache: workingsetcache.New(tagFiltersCacheSize),
s: s,
loopsPerDateTagFilterCache: workingsetcache.New(mem / 128),

dateMetricIDCache: newDateMetricIDCache(),
pendingHourEntries: &uint64set.Set{},
pendingNextDayMetricIDs: &uint64set.Set{},
}
db.currHourMetricIDs.Store(&hourMetricIDs{})
db.prevHourMetricIDs.Store(&hourMetricIDs{})
db.nextDayMetricIDs.Store(&byDateMetricIDEntry{})

return db
}

Expand All @@ -177,9 +214,10 @@ type IndexDBMetrics struct {

IndexDBRefCount uint64

NewTimeseriesCreated uint64
TimeseriesRepopulated uint64
MissingTSIDsForMetricID uint64
NewTimeseriesCreated uint64
TimeseriesRepopulated uint64
DailyIndexRecordsRepopulated uint64
MissingTSIDsForMetricID uint64

RecentHourMetricIDsSearchCalls uint64
RecentHourMetricIDsSearchHits uint64
Expand Down Expand Up @@ -221,6 +259,7 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
m.IndexDBRefCount += atomic.LoadUint64(&db.refCount)
m.NewTimeseriesCreated += atomic.LoadUint64(&db.newTimeseriesCreated)
m.TimeseriesRepopulated += atomic.LoadUint64(&db.timeseriesRepopulated)
m.DailyIndexRecordsRepopulated += atomic.LoadUint64(&db.dailyIndexRecordsRepopulated)
m.MissingTSIDsForMetricID += atomic.LoadUint64(&db.missingTSIDsForMetricID)

m.DateRangeSearchCalls += atomic.LoadUint64(&db.dateRangeSearchCalls)
Expand Down Expand Up @@ -398,7 +437,7 @@ func (is *indexSearch) maybeCreateIndexes(tsid *TSID, metricNameRaw []byte, date
}
mn.sortTags()
is.createGlobalIndexes(tsid, mn)
is.createPerDayIndexes(date, tsid.MetricID, mn)
// do not create per day indexes, it must be created by updatePerDateData function
PutMetricName(mn)
atomic.AddUint64(&is.db.timeseriesRepopulated, 1)
return true, nil
Expand Down Expand Up @@ -2837,7 +2876,7 @@ func (is *indexSearch) createPerDayIndexes(date, metricID uint64, mn *MetricName
kb.B = encoding.MarshalUint64(kb.B, date)
ii.registerTagIndexes(kb.B, mn, metricID)
is.db.tb.AddItems(ii.Items)
is.db.s.dateMetricIDCache.Set(date, metricID)
is.db.dateMetricIDCache.Set(date, metricID)
}

func (ii *indexItems) registerTagIndexes(prefix []byte, mn *MetricName, metricID uint64) {
Expand Down
14 changes: 8 additions & 6 deletions lib/storage/index_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2005,14 +2005,16 @@ func newTestStorage() *Storage {
s := &Storage{
cachePath: "test-storage-cache",

metricIDCache: workingsetcache.New(1234),
metricNameCache: workingsetcache.New(1234),
tsidCache: workingsetcache.New(1234),
dateMetricIDCache: newDateMetricIDCache(),
retentionMsecs: maxRetentionMsecs,
metricIDCache: workingsetcache.New(1234),
metricNameCache: workingsetcache.New(1234),
tsidCache: workingsetcache.New(1234),

retentionMsecs: maxRetentionMsecs,
}
s.setDeletedMetricIDs(&uint64set.Set{})
var idb *indexDB
idb := &indexDB{
dateMetricIDCache: newDateMetricIDCache(),
}
s.idbCurr.Store(idb)
return s
}
Expand Down