Skip to content

Commit

Permalink
lib/storage: pre-create timeseries before indexDB rotation
Browse files Browse the repository at this point in the history
during an hour before indexDB rotation start creating records at the next indexDB
it must improve performance during switch for the next indexDB and remove ingestion issues.
Since there is no need for creation new index records for timeseries already ingested into current indexDB
#4563
  • Loading branch information
f41gh7 committed Jul 18, 2023
1 parent e2367b6 commit d72bbd3
Show file tree
Hide file tree
Showing 5 changed files with 382 additions and 231 deletions.
3 changes: 3 additions & 0 deletions app/vmstorage/main.go
Expand Up @@ -662,6 +662,9 @@ func registerStorageMetrics(strg *storage.Storage) {
metrics.NewGauge(`vm_timeseries_repopulated_total`, func() float64 {
return float64(m().TimeseriesRepopulated)
})
metrics.NewGauge(`vm_timeseries_precreated_total`, func() float64 {
return float64(m().TimeSeriesPreCreated)
})
metrics.NewGauge(`vm_new_timeseries_created_total`, func() float64 {
return float64(m().NewTimeseriesCreated)
})
Expand Down
47 changes: 38 additions & 9 deletions lib/storage/index_db.go
Expand Up @@ -96,15 +96,38 @@ type indexDB struct {
// and is used for syncing items from different indexDBs
generation uint64

// The unix timestamp in seconds for the indexDB rotation.
rotationTimestamp uint64
// The unix timestamp in seconds for the next indexDB rotation.
nextRotationTimestamp uint64

name string
tb *mergeset.Table

extDB *indexDB
extDBLock sync.Mutex

// dateMetricIDCache is (Date, MetricID) cache.
dateMetricIDCache *dateMetricIDCache

// Fast cache for MetricID values occurred during the current hour.
currHourMetricIDs atomic.Pointer[hourMetricIDs]

// Fast cache for MetricID values occurred during the previous hour.
prevHourMetricIDs atomic.Pointer[hourMetricIDs]

// Fast cache for pre-populating per-day inverted index for the next day.
// This is needed in order to remove CPU usage spikes at 00:00 UTC
// due to creation of per-day inverted index for active time series.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/430 for details.
nextDayMetricIDs atomic.Pointer[byDateMetricIDEntry]

// Pending MetricID values to be added to currHourMetricIDs.
pendingHourEntriesLock sync.Mutex
pendingHourEntries *uint64set.Set

// Pending MetricIDs to be added to nextDayMetricIDs.
pendingNextDayMetricIDsLock sync.Mutex
pendingNextDayMetricIDs *uint64set.Set

// Cache for fast TagFilters -> MetricIDs lookup.
tagFiltersToMetricIDsCache *workingsetcache.Cache

Expand Down Expand Up @@ -137,9 +160,9 @@ func getTagFiltersCacheSize() int {
// The last segment of the path should contain unique hex value which
// will be then used as indexDB.generation
//
// The rotationTimestamp must be set to the current unix timestamp when mustOpenIndexDB
// The nextRetentionTimestamp must be set to the current unix timestamp when mustOpenIndexDB
// is called when creating new indexdb during indexdb rotation.
func mustOpenIndexDB(path string, s *Storage, rotationTimestamp uint64, isReadOnly *uint32) *indexDB {
func mustOpenIndexDB(path string, s *Storage, isReadOnly *uint32) *indexDB {
if s == nil {
logger.Panicf("BUG: Storage must be nin-nil")
}
Expand All @@ -157,16 +180,22 @@ func mustOpenIndexDB(path string, s *Storage, rotationTimestamp uint64, isReadOn
tagFiltersCacheSize := getTagFiltersCacheSize()

db := &indexDB{
refCount: 1,
generation: gen,
rotationTimestamp: rotationTimestamp,
tb: tb,
name: name,
refCount: 1,
generation: gen,
tb: tb,
name: name,

tagFiltersToMetricIDsCache: workingsetcache.New(tagFiltersCacheSize),
s: s,
loopsPerDateTagFilterCache: workingsetcache.New(mem / 128),
dateMetricIDCache: newDateMetricIDCache(),
pendingHourEntries: &uint64set.Set{},
pendingNextDayMetricIDs: &uint64set.Set{},
}
db.currHourMetricIDs.Store(&hourMetricIDs{})
db.prevHourMetricIDs.Store(&hourMetricIDs{})
db.nextDayMetricIDs.Store(&byDateMetricIDEntry{})

return db
}

Expand Down
14 changes: 8 additions & 6 deletions lib/storage/index_db_test.go
Expand Up @@ -495,7 +495,7 @@ func TestIndexDBOpenClose(t *testing.T) {
tableName := nextIndexDBTableName()
for i := 0; i < 5; i++ {
var isReadOnly uint32
db := mustOpenIndexDB(tableName, &s, 0, &isReadOnly)
db := mustOpenIndexDB(tableName, &s, &isReadOnly)
db.MustClose()
}
if err := os.RemoveAll(tableName); err != nil {
Expand Down Expand Up @@ -1962,12 +1962,14 @@ func newTestStorage() *Storage {
s := &Storage{
cachePath: "test-storage-cache",

metricIDCache: workingsetcache.New(1234),
metricNameCache: workingsetcache.New(1234),
tsidCache: workingsetcache.New(1234),
dateMetricIDCache: newDateMetricIDCache(),
retentionMsecs: maxRetentionMsecs,
metricIDCache: workingsetcache.New(1234),
metricNameCache: workingsetcache.New(1234),
tsidCache: workingsetcache.New(1234),
retentionMsecs: maxRetentionMsecs,
}
var idb indexDB
idb.dateMetricIDCache = newDateMetricIDCache()
s.idbCurr.Store(&idb)
s.setDeletedMetricIDs(&uint64set.Set{})
return s
}
Expand Down

0 comments on commit d72bbd3

Please sign in to comment.