diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index a559278314..4b9086ef9b 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -5990,6 +5990,104 @@ ], "fieldValue": null, "fieldDefaultValue": null + }, + { + "kind": "block", + "name": "ephemeral_tsdb", + "required": false, + "desc": "", + "blockEntries": [ + { + "kind": "field", + "name": "retention_period", + "required": false, + "desc": "Retention of ephemeral series.", + "fieldValue": null, + "fieldDefaultValue": 600000000000, + "fieldFlag": "blocks-storage.ephemeral-tsdb.retention-period", + "fieldType": "duration", + "fieldCategory": "experimental" + }, + { + "kind": "field", + "name": "head_chunks_write_buffer_size_bytes", + "required": false, + "desc": "The write buffer size used by the head chunks mapper. Lower values reduce memory utilisation on clusters with a large number of tenants at the cost of increased disk I/O operations.", + "fieldValue": null, + "fieldDefaultValue": 4194304, + "fieldFlag": "blocks-storage.ephemeral-tsdb.head-chunks-write-buffer-size-bytes", + "fieldType": "int", + "fieldCategory": "experimental" + }, + { + "kind": "field", + "name": "head_chunks_end_time_variance", + "required": false, + "desc": "How much variance (as percentage between 0 and 1) should be applied to the chunk end time, to spread chunks writing across time. Doesn't apply to the last chunk of the chunk range. 0 means no variance.", + "fieldValue": null, + "fieldDefaultValue": 0, + "fieldFlag": "blocks-storage.ephemeral-tsdb.head-chunks-end-time-variance", + "fieldType": "float", + "fieldCategory": "experimental" + }, + { + "kind": "field", + "name": "stripe_size", + "required": false, + "desc": "The number of shards of series to use in TSDB (must be a power of 2). Reducing this will decrease memory footprint, but can negatively impact performance.", + "fieldValue": null, + "fieldDefaultValue": 16384, + "fieldFlag": "blocks-storage.ephemeral-tsdb.stripe-size", + "fieldType": "int", + "fieldCategory": "experimental" + }, + { + "kind": "field", + "name": "head_chunks_write_queue_size", + "required": false, + "desc": "The size of the write queue used by the head chunks mapper. Lower values reduce memory utilisation at the cost of potentially higher ingest latency. Value of 0 switches chunks mapper to implementation without a queue.", + "fieldValue": null, + "fieldDefaultValue": 1000000, + "fieldFlag": "blocks-storage.ephemeral-tsdb.head-chunks-write-queue-size", + "fieldType": "int", + "fieldCategory": "experimental" + }, + { + "kind": "field", + "name": "head_postings_for_matchers_cache_ttl", + "required": false, + "desc": "How long to cache postings for matchers in the Head and OOOHead. 
0 disables the cache and just deduplicates the in-flight calls.", + "fieldValue": null, + "fieldDefaultValue": 10000000000, + "fieldFlag": "blocks-storage.ephemeral-tsdb.head-postings-for-matchers-cache-ttl", + "fieldType": "duration", + "fieldCategory": "experimental" + }, + { + "kind": "field", + "name": "head_postings_for_matchers_cache_size", + "required": false, + "desc": "Maximum number of entries in the cache for postings for matchers in the Head and OOOHead when ttl \u003e 0.", + "fieldValue": null, + "fieldDefaultValue": 100, + "fieldFlag": "blocks-storage.ephemeral-tsdb.head-postings-for-matchers-cache-size", + "fieldType": "int", + "fieldCategory": "experimental" + }, + { + "kind": "field", + "name": "head_postings_for_matchers_cache_force", + "required": false, + "desc": "Force the cache to be used for postings for matchers in the Head and OOOHead, even if it's not a concurrent (query-sharding) call.", + "fieldValue": null, + "fieldDefaultValue": false, + "fieldFlag": "blocks-storage.ephemeral-tsdb.head-postings-for-matchers-cache-force", + "fieldType": "boolean", + "fieldCategory": "experimental" + } + ], + "fieldValue": null, + "fieldDefaultValue": null } ], "fieldValue": null, diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 8374084138..d37e254f54 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -403,6 +403,22 @@ Usage of ./cmd/mimir/mimir: How frequently to scan the bucket, or to refresh the bucket index (if enabled), in order to look for changes (new blocks shipped by ingesters and blocks deleted by retention or compaction). (default 15m0s) -blocks-storage.bucket-store.tenant-sync-concurrency int Maximum number of concurrent tenants synching blocks. (default 10) + -blocks-storage.ephemeral-tsdb.head-chunks-end-time-variance float + [experimental] How much variance (as percentage between 0 and 1) should be applied to the chunk end time, to spread chunks writing across time. Doesn't apply to the last chunk of the chunk range. 0 means no variance. + -blocks-storage.ephemeral-tsdb.head-chunks-write-buffer-size-bytes int + [experimental] The write buffer size used by the head chunks mapper. Lower values reduce memory utilisation on clusters with a large number of tenants at the cost of increased disk I/O operations. (default 4194304) + -blocks-storage.ephemeral-tsdb.head-chunks-write-queue-size int + [experimental] The size of the write queue used by the head chunks mapper. Lower values reduce memory utilisation at the cost of potentially higher ingest latency. Value of 0 switches chunks mapper to implementation without a queue. (default 1000000) + -blocks-storage.ephemeral-tsdb.head-postings-for-matchers-cache-force + [experimental] Force the cache to be used for postings for matchers in the Head and OOOHead, even if it's not a concurrent (query-sharding) call. + -blocks-storage.ephemeral-tsdb.head-postings-for-matchers-cache-size int + [experimental] Maximum number of entries in the cache for postings for matchers in the Head and OOOHead when ttl > 0. (default 100) + -blocks-storage.ephemeral-tsdb.head-postings-for-matchers-cache-ttl duration + [experimental] How long to cache postings for matchers in the Head and OOOHead. 0 disables the cache and just deduplicates the in-flight calls. (default 10s) + -blocks-storage.ephemeral-tsdb.retention-period duration + [experimental] Retention of ephemeral series. 
(default 10m0s) + -blocks-storage.ephemeral-tsdb.stripe-size int + [experimental] The number of shards of series to use in TSDB (must be a power of 2). Reducing this will decrease memory footprint, but can negatively impact performance. (default 16384) -blocks-storage.filesystem.dir string Local filesystem storage directory. (default "blocks") -blocks-storage.gcs.bucket-name string diff --git a/docs/sources/mimir/reference-configuration-parameters/index.md b/docs/sources/mimir/reference-configuration-parameters/index.md index 8fd028ea32..6d38719377 100644 --- a/docs/sources/mimir/reference-configuration-parameters/index.md +++ b/docs/sources/mimir/reference-configuration-parameters/index.md @@ -3177,6 +3177,51 @@ tsdb: # Head and OOOHead, even if it's not a concurrent (query-sharding) call. # CLI flag: -blocks-storage.tsdb.head-postings-for-matchers-cache-force [head_postings_for_matchers_cache_force: | default = false] + +ephemeral_tsdb: + # (experimental) Retention of ephemeral series. + # CLI flag: -blocks-storage.ephemeral-tsdb.retention-period + [retention_period: | default = 10m] + + # (experimental) The write buffer size used by the head chunks mapper. Lower + # values reduce memory utilisation on clusters with a large number of tenants + # at the cost of increased disk I/O operations. + # CLI flag: -blocks-storage.ephemeral-tsdb.head-chunks-write-buffer-size-bytes + [head_chunks_write_buffer_size_bytes: | default = 4194304] + + # (experimental) How much variance (as percentage between 0 and 1) should be + # applied to the chunk end time, to spread chunks writing across time. Doesn't + # apply to the last chunk of the chunk range. 0 means no variance. + # CLI flag: -blocks-storage.ephemeral-tsdb.head-chunks-end-time-variance + [head_chunks_end_time_variance: | default = 0] + + # (experimental) The number of shards of series to use in TSDB (must be a + # power of 2). Reducing this will decrease memory footprint, but can + # negatively impact performance. + # CLI flag: -blocks-storage.ephemeral-tsdb.stripe-size + [stripe_size: | default = 16384] + + # (experimental) The size of the write queue used by the head chunks mapper. + # Lower values reduce memory utilisation at the cost of potentially higher + # ingest latency. Value of 0 switches chunks mapper to implementation without + # a queue. + # CLI flag: -blocks-storage.ephemeral-tsdb.head-chunks-write-queue-size + [head_chunks_write_queue_size: | default = 1000000] + + # (experimental) How long to cache postings for matchers in the Head and + # OOOHead. 0 disables the cache and just deduplicates the in-flight calls. + # CLI flag: -blocks-storage.ephemeral-tsdb.head-postings-for-matchers-cache-ttl + [head_postings_for_matchers_cache_ttl: | default = 10s] + + # (experimental) Maximum number of entries in the cache for postings for + # matchers in the Head and OOOHead when ttl > 0. + # CLI flag: -blocks-storage.ephemeral-tsdb.head-postings-for-matchers-cache-size + [head_postings_for_matchers_cache_size: | default = 100] + + # (experimental) Force the cache to be used for postings for matchers in the + # Head and OOOHead, even if it's not a concurrent (query-sharding) call. 
+ # CLI flag: -blocks-storage.ephemeral-tsdb.head-postings-for-matchers-cache-force + [head_postings_for_matchers_cache_force: | default = false] ``` ### compactor diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index c63084ac2e..3aba7caed8 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -103,6 +103,9 @@ const ( tenantsWithOutOfOrderEnabledStatName = "ingester_ooo_enabled_tenants" minOutOfOrderTimeWindowSecondsStatName = "ingester_ooo_min_window" maxOutOfOrderTimeWindowSecondsStatName = "ingester_ooo_max_window" + + // Prefix used in Prometheus registry for ephemeral storage. + ephemeralPrometheusMetricsPrefix = "ephemeral_" ) // BlocksUploader interface is used to have an easy way to mock it in tests. @@ -306,6 +309,11 @@ func New(cfg Config, limits *validation.Overrides, activeGroupsCleanupService *u Help: "The current number of series in memory.", }, i.getMemorySeriesMetric) + promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{ + Name: "cortex_ingester_memory_ephemeral_series", + Help: "The current number of ephemeral series in memory.", + }, i.getEphemeralSeriesMetric) + promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{ Name: "cortex_ingester_oldest_unshipped_block_timestamp_seconds", Help: "Unix timestamp of the oldest TSDB block not shipped to the storage yet. 0 if ingester has no blocks or all blocks have been shipped.", @@ -678,7 +686,7 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, pushReq *push.Request) ( } // Early exit if no timeseries in request - don't create a TSDB or an appender. - if len(req.Timeseries) == 0 { + if len(req.Timeseries) == 0 && len(req.EphemeralTimeseries) == 0 { return &mimirpb.WriteResponse{}, nil } @@ -701,9 +709,7 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, pushReq *push.Request) ( // Keep track of some stats which are tracked only if the samples will be // successfully committed - stats pushStats - - minAppendTime, minAppendTimeAvailable = db.Head().AppendableMinValidTime() + persistentStats, ephemeralStats pushStats firstPartialErr error updateFirstPartial = func(errFn func() error) { @@ -714,16 +720,59 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, pushReq *push.Request) ( ) // Walk the samples, appending them to the users database - app := db.Appender(ctx).(extendedAppender) - level.Debug(spanlog).Log("event", "got appender", "numSeries", len(req.Timeseries)) + var persistentApp, ephemeralApp extendedAppender - var activeSeries *activeseries.ActiveSeries - if i.cfg.ActiveSeriesMetricsEnabled { - activeSeries = db.activeSeries + rollback := func() { + if persistentApp != nil { + if err := persistentApp.Rollback(); err != nil { + level.Warn(i.logger).Log("msg", "failed to rollback persistent appender on error", "user", userID, "err", err) + } + } + if ephemeralApp != nil { + if err := ephemeralApp.Rollback(); err != nil { + level.Warn(i.logger).Log("msg", "failed to rollback ephemeral appender on error", "user", userID, "err", err) + } + } } - err = i.pushSamplesToAppender(userID, req.Timeseries, app, startAppend, &stats, updateFirstPartial, activeSeries, i.limits.OutOfOrderTimeWindow(userID), minAppendTimeAvailable, minAppendTime) - if err != nil { - return nil, err + + if len(req.Timeseries) > 0 { + persistentApp = db.Appender(ctx).(extendedAppender) + + level.Debug(spanlog).Log("event", "got appender for persistent series", "series", len(req.Timeseries)) + + var activeSeries *activeseries.ActiveSeries + if i.cfg.ActiveSeriesMetricsEnabled { + 
activeSeries = db.activeSeries + } + + minAppendTime, minAppendTimeAvailable := db.Head().AppendableMinValidTime() + + err = i.pushSamplesToAppender(userID, req.Timeseries, persistentApp, startAppend, &persistentStats, updateFirstPartial, activeSeries, i.limits.OutOfOrderTimeWindow(userID), minAppendTimeAvailable, minAppendTime, true) + if err != nil { + rollback() + return nil, err + } + } + + if len(req.EphemeralTimeseries) > 0 { + a, err := db.EphemeralAppender(ctx) + if err != nil { + // TODO: handle error caused by limit (ephemeral storage disabled), and report it via firstPartialErr instead. + rollback() + return nil, err + } + + ephemeralApp = a.(extendedAppender) + + level.Debug(spanlog).Log("event", "got appender for ephemeral series", "ephemeralSeries", len(req.EphemeralTimeseries)) + + minAppendTime, minAppendTimeAvailable := db.getEphemeralStorage().AppendableMinValidTime() + + err = i.pushSamplesToAppender(userID, req.EphemeralTimeseries, ephemeralApp, startAppend, &ephemeralStats, updateFirstPartial, nil, 0, minAppendTimeAvailable, minAppendTime, false) + if err != nil { + rollback() + return nil, err + } } // At this point all samples have been added to the appender, so we can track the time it took. @@ -731,15 +780,35 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, pushReq *push.Request) ( level.Debug(spanlog).Log( "event", "start commit", - "succeededSamplesCount", stats.succeededSamplesCount, - "failedSamplesCount", stats.failedSamplesCount, - "succeededExemplarsCount", stats.succeededExemplarsCount, - "failedExemplarsCount", stats.failedExemplarsCount, + "succeededSamplesCount", persistentStats.succeededSamplesCount, + "failedSamplesCount", persistentStats.failedSamplesCount, + "succeededExemplarsCount", persistentStats.succeededExemplarsCount, + "failedExemplarsCount", persistentStats.failedExemplarsCount, + + "ephemeralSucceededSamplesCount", ephemeralStats.succeededSamplesCount, + "ephemeralFailedSamplesCount", ephemeralStats.failedSamplesCount, + "ephemeralSucceededExemplarsCount", ephemeralStats.succeededExemplarsCount, + "ephemeralFailedExemplarsCount", ephemeralStats.failedExemplarsCount, ) startCommit := time.Now() - if err := app.Commit(); err != nil { - return nil, wrapWithUser(err, userID) + if persistentApp != nil { + app := persistentApp + persistentApp = nil // Disable rollback for appender. If Commit fails, it auto-rollbacks. + + if err := app.Commit(); err != nil { + rollback() + return nil, wrapWithUser(err, userID) + } + } + if ephemeralApp != nil { + app := ephemeralApp + ephemeralApp = nil // Disable rollback for appender. If Commit fails, it auto-rollbacks. + + if err := app.Commit(); err != nil { + rollback() + return nil, wrapWithUser(err, userID) + } } commitDuration := time.Since(startCommit) @@ -747,22 +816,45 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, pushReq *push.Request) ( level.Debug(spanlog).Log("event", "complete commit", "commitDuration", commitDuration.String()) // If only invalid samples are pushed, don't change "last update", as TSDB was not modified. - if stats.succeededSamplesCount > 0 { + if persistentStats.succeededSamplesCount > 0 || ephemeralStats.succeededSamplesCount > 0 { db.setLastUpdate(time.Now()) } // Increment metrics only if the samples have been successfully committed. // If the code didn't reach this point, it means that we returned an error // which will be converted into an HTTP 5xx and the client should/will retry. 
- i.metrics.ingestedSamples.WithLabelValues(userID).Add(float64(stats.succeededSamplesCount)) - i.metrics.ingestedSamplesFail.WithLabelValues(userID).Add(float64(stats.failedSamplesCount)) - i.metrics.ingestedExemplars.Add(float64(stats.succeededExemplarsCount)) - i.metrics.ingestedExemplarsFail.Add(float64(stats.failedExemplarsCount)) - i.appendedSamplesStats.Inc(int64(stats.succeededSamplesCount)) - i.appendedExemplarsStats.Inc(int64(stats.succeededExemplarsCount)) + i.metrics.ingestedSamples.WithLabelValues(userID).Add(float64(persistentStats.succeededSamplesCount)) + i.metrics.ingestedSamplesFail.WithLabelValues(userID).Add(float64(persistentStats.failedSamplesCount)) + i.metrics.ingestedExemplars.Add(float64(persistentStats.succeededExemplarsCount)) + i.metrics.ingestedExemplarsFail.Add(float64(persistentStats.failedExemplarsCount)) + i.appendedSamplesStats.Inc(int64(persistentStats.succeededSamplesCount)) + i.appendedExemplarsStats.Inc(int64(persistentStats.succeededExemplarsCount)) + + if ephemeralStats.succeededSamplesCount > 0 || ephemeralStats.failedSamplesCount > 0 { + i.metrics.ephemeralIngestedSamples.WithLabelValues(userID).Add(float64(ephemeralStats.succeededSamplesCount)) + i.metrics.ephemeralIngestedSamplesFail.WithLabelValues(userID).Add(float64(ephemeralStats.failedSamplesCount)) + + i.appendedSamplesStats.Inc(int64(ephemeralStats.succeededSamplesCount)) + } group := i.activeGroups.UpdateActiveGroupTimestamp(userID, validation.GroupLabel(i.limits, userID, req.Timeseries), startAppend) + i.updateMetricsFromPushStats(userID, group, &persistentStats, req.Source, db) + i.updateMetricsFromPushStats(userID, group, &ephemeralStats, req.Source, db) + + if firstPartialErr != nil { + code := http.StatusBadRequest + var ve *validationError + if errors.As(firstPartialErr, &ve) { + code = ve.code + } + return &mimirpb.WriteResponse{}, httpgrpc.Errorf(code, wrapWithUser(firstPartialErr, userID).Error()) + } + + return &mimirpb.WriteResponse{}, nil +} + +func (i *Ingester) updateMetricsFromPushStats(userID string, group string, stats *pushStats, samplesSource mimirpb.WriteRequest_SourceEnum, db *userTSDB) { if stats.sampleOutOfBoundsCount > 0 { i.metrics.discardedSamplesSampleOutOfBounds.WithLabelValues(userID, group).Add(float64(stats.sampleOutOfBoundsCount)) } @@ -784,33 +876,19 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, pushReq *push.Request) ( if stats.succeededSamplesCount > 0 { i.ingestionRate.Add(int64(stats.succeededSamplesCount)) - switch req.Source { - case mimirpb.RULE: + if samplesSource == mimirpb.RULE { db.ingestedRuleSamples.Add(int64(stats.succeededSamplesCount)) - case mimirpb.API: - fallthrough - default: + } else { db.ingestedAPISamples.Add(int64(stats.succeededSamplesCount)) } } - - if firstPartialErr != nil { - code := http.StatusBadRequest - var ve *validationError - if errors.As(firstPartialErr, &ve) { - code = ve.code - } - return &mimirpb.WriteResponse{}, httpgrpc.Errorf(code, wrapWithUser(firstPartialErr, userID).Error()) - } - - return &mimirpb.WriteResponse{}, nil } // pushSamplesToAppender appends samples and exemplars to the appender. Most errors are handled via updateFirstPartial function, // but in case of unhandled errors, appender is rolled back and such error is returned. 
func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.PreallocTimeseries, app extendedAppender, startAppend time.Time, stats *pushStats, updateFirstPartial func(errFn func() error), activeSeries *activeseries.ActiveSeries, - outOfOrderWindow model.Duration, minAppendTimeAvailable bool, minAppendTime int64) error { + outOfOrderWindow model.Duration, minAppendTimeAvailable bool, minAppendTime int64, appendExemplars bool) error { for _, ts := range timeseries { // The labels must be sorted (in our case, it's guaranteed a write request // has sorted labels once hit the ingester). @@ -901,11 +979,6 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre continue } - // The error looks an issue on our side, so we should rollback - if rollbackErr := app.Rollback(); rollbackErr != nil { - level.Warn(i.logger).Log("msg", "failed to rollback on error", "user", userID, "err", rollbackErr) - } - return wrapWithUser(err, userID) } @@ -916,7 +989,7 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre }) } - if len(ts.Exemplars) > 0 && i.limits.MaxGlobalExemplarsPerUser(userID) > 0 { + if appendExemplars && len(ts.Exemplars) > 0 && i.limits.MaxGlobalExemplarsPerUser(userID) > 0 { // app.AppendExemplar currently doesn't create the series, it must // already exist. If it does not then drop. if ref == 0 { @@ -1603,7 +1676,8 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { maxExemplars := i.limiter.convertGlobalToLocalLimit(userID, i.limits.MaxGlobalExemplarsPerUser(userID)) oooTW := time.Duration(i.limits.OutOfOrderTimeWindow(userID)) // Create a new user database - db, err := tsdb.Open(udir, userLogger, tsdbPromReg, &tsdb.Options{ + const storageKey = "storage" + db, err := tsdb.Open(udir, log.With(userLogger, storageKey, "persistent"), tsdbPromReg, &tsdb.Options{ RetentionDuration: i.cfg.BlocksStorageConfig.TSDB.Retention.Milliseconds(), MinBlockDuration: blockRanges[0], MaxBlockDuration: blockRanges[len(blockRanges)-1], @@ -1673,6 +1747,45 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { } i.tsdbMetrics.setRegistryForUser(userID, tsdbPromReg) + + userDB.ephemeralSeriesRetentionPeriod = i.cfg.BlocksStorageConfig.EphemeralTSDB.Retention + userDB.ephemeralFactory = func() (*tsdb.Head, error) { + // TODO: check user limit for ephemeral series. If it's 0, don't create head and return error. + + headOptions := &tsdb.HeadOptions{ + ChunkRange: i.cfg.BlocksStorageConfig.EphemeralTSDB.Retention.Milliseconds(), + ChunkDirRoot: filepath.Join(udir, "ephemeral_chunks"), + ChunkPool: nil, + ChunkWriteBufferSize: i.cfg.BlocksStorageConfig.EphemeralTSDB.HeadChunksWriteBufferSize, + ChunkEndTimeVariance: i.cfg.BlocksStorageConfig.EphemeralTSDB.HeadChunksEndTimeVariance, + ChunkWriteQueueSize: i.cfg.BlocksStorageConfig.EphemeralTSDB.HeadChunksWriteQueueSize, + StripeSize: i.cfg.BlocksStorageConfig.EphemeralTSDB.StripeSize, + SeriesCallback: nil, // TODO: handle limits. 
+ EnableExemplarStorage: false, + EnableMemorySnapshotOnShutdown: false, + IsolationDisabled: true, + PostingsForMatchersCacheTTL: i.cfg.BlocksStorageConfig.EphemeralTSDB.HeadPostingsForMatchersCacheTTL, + PostingsForMatchersCacheSize: i.cfg.BlocksStorageConfig.EphemeralTSDB.HeadPostingsForMatchersCacheSize, + PostingsForMatchersCacheForce: i.cfg.BlocksStorageConfig.EphemeralTSDB.HeadPostingsForMatchersCacheForce, + } + + headOptions.MaxExemplars.Store(0) + headOptions.OutOfOrderTimeWindow.Store(0) + headOptions.OutOfOrderCapMax.Store(int64(tsdb.DefaultOutOfOrderCapMax)) // We need to set this, despite OOO time window being 0. + headOptions.EnableNativeHistograms.Store(false) + + h, err := tsdb.NewHead(prometheus.WrapRegistererWithPrefix(ephemeralPrometheusMetricsPrefix, tsdbPromReg), log.With(userLogger, storageKey, "ephemeral"), nil, nil, headOptions, nil) + if err != nil { + return nil, err + } + + i.metrics.memEphemeralUsers.Inc() + + // Don't allow ingestion of old samples into ephemeral storage. + h.SetMinValidTime(time.Now().Add(-i.cfg.BlocksStorageConfig.EphemeralTSDB.Retention).UnixMilli()) + return h, nil + } + return userDB, nil } @@ -1689,6 +1802,8 @@ func (i *Ingester) closeAllTSDB() { go func(db *userTSDB) { defer wg.Done() + ephemeral := db.hasEphemeralStorage() + if err := db.Close(); err != nil { level.Warn(i.logger).Log("msg", "unable to close TSDB", "err", err, "user", userID) return @@ -1704,6 +1819,9 @@ func (i *Ingester) closeAllTSDB() { i.metrics.memUsers.Dec() i.metrics.deletePerUserCustomTrackerMetrics(userID, db.activeSeries.CurrentMatcherNames()) + if ephemeral { + i.metrics.memEphemeralUsers.Dec() + } }(userDB) } @@ -1832,6 +1950,26 @@ func (i *Ingester) getMemorySeriesMetric() float64 { return float64(count) } +// getEphemeralSeriesMetric returns the total number of in-memory series in ephemeral storage across all tenants. +func (i *Ingester) getEphemeralSeriesMetric() float64 { + if err := i.checkRunning(); err != nil { + return 0 + } + + i.tsdbsMtx.RLock() + defer i.tsdbsMtx.RUnlock() + + count := uint64(0) + for _, db := range i.tsdbs { + eph := db.getEphemeralStorage() + if eph != nil { + count += eph.NumSeries() + } + } + + return float64(count) +} + // getOldestUnshippedBlockMetric returns the unix timestamp of the oldest unshipped block or // 0 if all blocks have been shipped. func (i *Ingester) getOldestUnshippedBlockMetric() float64 { @@ -2011,7 +2149,7 @@ func (i *Ingester) compactBlocks(ctx context.Context, force bool, allowed *util. 
default: reason = "regular" - err = userDB.Compact() + err = userDB.Compact(time.Now()) } if err != nil { @@ -2077,6 +2215,8 @@ func (i *Ingester) closeAndDeleteUserTSDBIfIdle(userID string) tsdbCloseCheckRes dir := userDB.db.Dir() + ephemeral := userDB.hasEphemeralStorage() + if err := userDB.Close(); err != nil { level.Error(i.logger).Log("msg", "failed to close idle TSDB", "user", userID, "err", err) return tsdbCloseFailed @@ -2099,6 +2239,9 @@ func (i *Ingester) closeAndDeleteUserTSDBIfIdle(userID string) tsdbCloseCheckRes }() i.metrics.memUsers.Dec() + if ephemeral { + i.metrics.memEphemeralUsers.Dec() + } i.tsdbMetrics.removeRegistryForUser(userID) i.deleteUserMetadata(userID) diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index b11e82a81d..35667fe511 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -4170,7 +4170,7 @@ func TestIngesterNotDeleteUnshippedBlocks(t *testing.T) { db := i.getTSDB(userID) require.NotNil(t, db) - require.Nil(t, db.Compact()) + require.Nil(t, db.Compact(time.Now())) oldBlocks := db.Blocks() require.Equal(t, 3, len(oldBlocks)) @@ -4194,7 +4194,7 @@ func TestIngesterNotDeleteUnshippedBlocks(t *testing.T) { _, err := i.Push(ctx, req) require.NoError(t, err) } - require.Nil(t, db.Compact()) + require.Nil(t, db.Compact(time.Now())) // Only the second block should be gone along with a new block. newBlocks := db.Blocks() @@ -4222,7 +4222,7 @@ func TestIngesterNotDeleteUnshippedBlocks(t *testing.T) { _, err := i.Push(ctx, req) require.NoError(t, err) } - require.Nil(t, db.Compact()) + require.Nil(t, db.Compact(time.Now())) // All blocks from the old blocks should be gone now. newBlocks2 := db.Blocks() @@ -5920,3 +5920,436 @@ func TestNewIngestErrMsgs(t *testing.T) { }) } } + +func TestIngester_PushEphemeral(t *testing.T) { + metricLabelAdapters := []mimirpb.LabelAdapter{{Name: labels.MetricName, Value: "test"}} + metricLabels := mimirpb.FromLabelAdaptersToLabels(metricLabelAdapters) + + metricNames := []string{ + "cortex_ingester_ingested_ephemeral_samples_total", + "cortex_ingester_ingested_ephemeral_samples_failures_total", + "cortex_ingester_memory_ephemeral_series", + "cortex_ingester_memory_series", + "cortex_ingester_memory_users", + "cortex_ingester_ephemeral_series_created_total", + "cortex_ingester_ephemeral_series_removed_total", + "cortex_discarded_samples_total", + "cortex_ingester_memory_ephemeral_users", + } + userID := "test" + + now := time.Now() + + tests := map[string]struct { + reqs []*mimirpb.WriteRequest + additionalMetrics []string + expectedErr error + expectedMetrics string + }{ + "should succeed on pushing valid series to ephemeral storage": { + reqs: []*mimirpb.WriteRequest{ + ToWriteRequestEphemeral( + []labels.Labels{metricLabels}, + []mimirpb.Sample{{Value: 1, TimestampMs: now.UnixMilli() - 10}}, + nil, + nil, + mimirpb.API), + + ToWriteRequestEphemeral( + []labels.Labels{metricLabels}, + []mimirpb.Sample{{Value: 2, TimestampMs: now.UnixMilli()}}, + nil, + nil, + mimirpb.API), + }, + expectedErr: nil, + expectedMetrics: ` + # HELP cortex_ingester_ingested_ephemeral_samples_total The total number of samples ingested per user for ephemeral series. + # TYPE cortex_ingester_ingested_ephemeral_samples_total counter + cortex_ingester_ingested_ephemeral_samples_total{user="test"} 2 + + # HELP cortex_ingester_ingested_ephemeral_samples_failures_total The total number of samples that errored on ingestion per user for ephemeral series. 
+ # TYPE cortex_ingester_ingested_ephemeral_samples_failures_total counter + cortex_ingester_ingested_ephemeral_samples_failures_total{user="test"} 0 + + # HELP cortex_ingester_ephemeral_series_created_total The total number of series in ephemeral storage that were created per user. + # TYPE cortex_ingester_ephemeral_series_created_total counter + cortex_ingester_ephemeral_series_created_total{user="test"} 1 + + # HELP cortex_ingester_ephemeral_series_removed_total The total number of series in ephemeral storage that were removed per user. + # TYPE cortex_ingester_ephemeral_series_removed_total counter + cortex_ingester_ephemeral_series_removed_total{user="test"} 0 + + # HELP cortex_ingester_memory_ephemeral_series The current number of ephemeral series in memory. + # TYPE cortex_ingester_memory_ephemeral_series gauge + cortex_ingester_memory_ephemeral_series 1 + + # HELP cortex_ingester_memory_series The current number of series in memory. + # TYPE cortex_ingester_memory_series gauge + cortex_ingester_memory_series 0 + + # HELP cortex_ingester_memory_users The current number of users in memory. + # TYPE cortex_ingester_memory_users gauge + cortex_ingester_memory_users 1 + + # HELP cortex_ingester_memory_ephemeral_users The current number of users with ephemeral storage in memory. + # TYPE cortex_ingester_memory_ephemeral_users gauge + cortex_ingester_memory_ephemeral_users 1 + `, + }, + + "old ephemeral samples are discarded": { + reqs: []*mimirpb.WriteRequest{ + ToWriteRequestEphemeral( + []labels.Labels{metricLabels}, + []mimirpb.Sample{{Value: 1, TimestampMs: 100}}, + nil, + nil, + mimirpb.API), + }, + expectedErr: httpgrpc.Errorf(http.StatusBadRequest, wrapWithUser(newIngestErrSampleTimestampTooOld(model.Time(100), mimirpb.FromLabelsToLabelAdapters(metricLabels)), userID).Error()), + expectedMetrics: ` + # HELP cortex_ingester_ingested_ephemeral_samples_total The total number of samples ingested per user for ephemeral series. + # TYPE cortex_ingester_ingested_ephemeral_samples_total counter + cortex_ingester_ingested_ephemeral_samples_total{user="test"} 0 + + # HELP cortex_ingester_ingested_ephemeral_samples_failures_total The total number of samples that errored on ingestion per user for ephemeral series. + # TYPE cortex_ingester_ingested_ephemeral_samples_failures_total counter + cortex_ingester_ingested_ephemeral_samples_failures_total{user="test"} 1 + + # HELP cortex_ingester_ephemeral_series_created_total The total number of series in ephemeral storage that were created per user. + # TYPE cortex_ingester_ephemeral_series_created_total counter + cortex_ingester_ephemeral_series_created_total{user="test"} 0 + + # HELP cortex_ingester_ephemeral_series_removed_total The total number of series in ephemeral storage that were removed per user. + # TYPE cortex_ingester_ephemeral_series_removed_total counter + cortex_ingester_ephemeral_series_removed_total{user="test"} 0 + + # HELP cortex_ingester_memory_ephemeral_series The current number of ephemeral series in memory. + # TYPE cortex_ingester_memory_ephemeral_series gauge + cortex_ingester_memory_ephemeral_series 0 + + # HELP cortex_ingester_memory_series The current number of series in memory. + # TYPE cortex_ingester_memory_series gauge + cortex_ingester_memory_series 0 + + # HELP cortex_ingester_memory_users The current number of users in memory. + # TYPE cortex_ingester_memory_users gauge + cortex_ingester_memory_users 1 + + # HELP cortex_discarded_samples_total The total number of samples that were discarded. 
+ # TYPE cortex_discarded_samples_total counter + cortex_discarded_samples_total{group="",reason="sample-out-of-bounds",user="test"} 1 + + # HELP cortex_ingester_memory_ephemeral_users The current number of users with ephemeral storage in memory. + # TYPE cortex_ingester_memory_ephemeral_users gauge + cortex_ingester_memory_ephemeral_users 1 + `, + }, + "should fail on out-of-order samples": { + reqs: []*mimirpb.WriteRequest{ + ToWriteRequestEphemeral( + []labels.Labels{metricLabels}, + []mimirpb.Sample{{Value: 2, TimestampMs: now.UnixMilli()}}, + nil, + nil, + mimirpb.API, + ), + + ToWriteRequestEphemeral( + []labels.Labels{metricLabels}, + []mimirpb.Sample{{Value: 1, TimestampMs: now.UnixMilli() - 10}}, + nil, + nil, + mimirpb.API, + ), + }, + expectedErr: httpgrpc.Errorf(http.StatusBadRequest, wrapWithUser(newIngestErrSampleOutOfOrder(model.Time(now.UnixMilli()-10), mimirpb.FromLabelsToLabelAdapters(metricLabels)), userID).Error()), + expectedMetrics: ` + # HELP cortex_ingester_ingested_ephemeral_samples_total The total number of samples ingested per user for ephemeral series. + # TYPE cortex_ingester_ingested_ephemeral_samples_total counter + cortex_ingester_ingested_ephemeral_samples_total{user="test"} 1 + + # HELP cortex_ingester_ingested_ephemeral_samples_failures_total The total number of samples that errored on ingestion per user for ephemeral series. + # TYPE cortex_ingester_ingested_ephemeral_samples_failures_total counter + cortex_ingester_ingested_ephemeral_samples_failures_total{user="test"} 1 + + # HELP cortex_ingester_ephemeral_series_created_total The total number of series in ephemeral storage that were created per user. + # TYPE cortex_ingester_ephemeral_series_created_total counter + cortex_ingester_ephemeral_series_created_total{user="test"} 1 + + # HELP cortex_ingester_ephemeral_series_removed_total The total number of series in ephemeral storage that were removed per user. + # TYPE cortex_ingester_ephemeral_series_removed_total counter + cortex_ingester_ephemeral_series_removed_total{user="test"} 0 + + # HELP cortex_ingester_memory_ephemeral_series The current number of ephemeral series in memory. + # TYPE cortex_ingester_memory_ephemeral_series gauge + cortex_ingester_memory_ephemeral_series 1 + + # HELP cortex_ingester_memory_series The current number of series in memory. + # TYPE cortex_ingester_memory_series gauge + cortex_ingester_memory_series 0 + + # HELP cortex_ingester_memory_users The current number of users in memory. + # TYPE cortex_ingester_memory_users gauge + cortex_ingester_memory_users 1 + + # HELP cortex_discarded_samples_total The total number of samples that were discarded. + # TYPE cortex_discarded_samples_total counter + cortex_discarded_samples_total{group="",reason="sample-out-of-order",user="test"} 1 + + # HELP cortex_ingester_memory_ephemeral_users The current number of users with ephemeral storage in memory. + # TYPE cortex_ingester_memory_ephemeral_users gauge + cortex_ingester_memory_ephemeral_users 1 + `, + }, + "request with mix of ephemeral and persistent series, with some good and some bad samples plus some metadata": { + reqs: []*mimirpb.WriteRequest{ + { + Source: mimirpb.API, + EphemeralTimeseries: []mimirpb.PreallocTimeseries{{ + TimeSeries: &mimirpb.TimeSeries{ + Labels: []mimirpb.LabelAdapter{{Name: labels.MetricName, Value: "eph_metric2"}}, + Samples: []mimirpb.Sample{ + {TimestampMs: now.UnixMilli(), Value: 100}, // Good sample. Next request will contain sample with lower TS. 
+ }, + }, + }}, + }, + + { + Source: mimirpb.API, + Metadata: []*mimirpb.MetricMetadata{ + { + Type: mimirpb.COUNTER, + MetricFamilyName: "per_metric", + Help: "Some help goes here...", + Unit: "light years", + }, + }, + + Timeseries: []mimirpb.PreallocTimeseries{{ + TimeSeries: &mimirpb.TimeSeries{ + Labels: []mimirpb.LabelAdapter{ + {Name: labels.MetricName, Value: "per_metric"}, + }, + Exemplars: []mimirpb.Exemplar{{ + Labels: []mimirpb.LabelAdapter{{Name: "traceID", Value: "123"}}, + TimestampMs: 1000, + Value: 1000, + }}, + Samples: []mimirpb.Sample{ + {TimestampMs: now.UnixMilli(), Value: 100}, + {TimestampMs: now.UnixMilli() + 1000, Value: 200}, + }, + }, + }}, + + EphemeralTimeseries: []mimirpb.PreallocTimeseries{{ + TimeSeries: &mimirpb.TimeSeries{ + Labels: []mimirpb.LabelAdapter{{Name: labels.MetricName, Value: "eph_metric1"}}, + Exemplars: []mimirpb.Exemplar{{ + Labels: []mimirpb.LabelAdapter{{Name: "traceID", Value: "123"}}, + TimestampMs: 1000, + Value: 1000, + }}, + Samples: []mimirpb.Sample{ + {TimestampMs: 100, Value: 100}, // out of bounds, this will be reported as first error + {TimestampMs: now.UnixMilli(), Value: 200}, + {TimestampMs: now.UnixMilli() + 1000, Value: 200}, + {TimestampMs: now.UnixMilli() + 2000, Value: 300}, + }, + }, + }, { + TimeSeries: &mimirpb.TimeSeries{ + Labels: []mimirpb.LabelAdapter{{Name: labels.MetricName, Value: "eph_metric2"}}, + Samples: []mimirpb.Sample{ + {TimestampMs: now.UnixMilli() - 1000, Value: 100}, // out of order (compared to previous request) + {TimestampMs: now.UnixMilli(), Value: 500}, // This sample was sent in previous request, with different value. + {TimestampMs: now.UnixMilli() + 1000, Value: 1000}, + }, + }, + }}, + }}, + expectedErr: httpgrpc.Errorf(http.StatusBadRequest, wrapWithUser(newIngestErrSampleTimestampTooOld(model.Time(100), []mimirpb.LabelAdapter{{Name: labels.MetricName, Value: "eph_metric1"}}), userID).Error()), + additionalMetrics: []string{"cortex_ingester_ingested_samples_total", "cortex_ingester_ingested_exemplars_total", "cortex_ingester_ingested_metadata_total"}, + expectedMetrics: ` + # HELP cortex_ingester_ingested_exemplars_total The total number of exemplars ingested. + # TYPE cortex_ingester_ingested_exemplars_total counter + cortex_ingester_ingested_exemplars_total 1 + + # HELP cortex_ingester_ingested_samples_total The total number of samples ingested per user. + # TYPE cortex_ingester_ingested_samples_total counter + cortex_ingester_ingested_samples_total{user="test"} 2 + + # HELP cortex_ingester_memory_series The current number of series in memory. + # TYPE cortex_ingester_memory_series gauge + cortex_ingester_memory_series 1 + + # HELP cortex_ingester_memory_users The current number of users in memory. + # TYPE cortex_ingester_memory_users gauge + cortex_ingester_memory_users 1 + + # HELP cortex_ingester_ingested_metadata_total The total number of metadata ingested. + # TYPE cortex_ingester_ingested_metadata_total counter + cortex_ingester_ingested_metadata_total 1 + + # HELP cortex_ingester_ingested_ephemeral_samples_total The total number of samples ingested per user for ephemeral series. + # TYPE cortex_ingester_ingested_ephemeral_samples_total counter + cortex_ingester_ingested_ephemeral_samples_total{user="test"} 5 + + # HELP cortex_ingester_ingested_ephemeral_samples_failures_total The total number of samples that errored on ingestion per user for ephemeral series. 
+ # TYPE cortex_ingester_ingested_ephemeral_samples_failures_total counter + cortex_ingester_ingested_ephemeral_samples_failures_total{user="test"} 3 + + # HELP cortex_ingester_ephemeral_series_created_total The total number of series in ephemeral storage that were created per user. + # TYPE cortex_ingester_ephemeral_series_created_total counter + cortex_ingester_ephemeral_series_created_total{user="test"} 2 + + # HELP cortex_ingester_ephemeral_series_removed_total The total number of series in ephemeral storage that were removed per user. + # TYPE cortex_ingester_ephemeral_series_removed_total counter + cortex_ingester_ephemeral_series_removed_total{user="test"} 0 + + # HELP cortex_ingester_memory_ephemeral_series The current number of ephemeral series in memory. + # TYPE cortex_ingester_memory_ephemeral_series gauge + cortex_ingester_memory_ephemeral_series 2 + + # HELP cortex_discarded_samples_total The total number of samples that were discarded. + # TYPE cortex_discarded_samples_total counter + cortex_discarded_samples_total{group="",reason="sample-out-of-bounds",user="test"} 1 + cortex_discarded_samples_total{group="",reason="sample-out-of-order",user="test"} 1 + cortex_discarded_samples_total{group="",reason="new-value-for-timestamp",user="test"} 1 + + # HELP cortex_ingester_memory_ephemeral_users The current number of users with ephemeral storage in memory. + # TYPE cortex_ingester_memory_ephemeral_users gauge + cortex_ingester_memory_ephemeral_users 1 + `, + }, + + "only persistent series -- does not initialize ephemeral storage": { + reqs: []*mimirpb.WriteRequest{ + mimirpb.ToWriteRequest( + []labels.Labels{metricLabels}, + []mimirpb.Sample{{Value: 1, TimestampMs: now.UnixMilli() - 10}}, + nil, + nil, + mimirpb.API), + }, + expectedErr: nil, + expectedMetrics: ` + # HELP cortex_ingester_memory_ephemeral_series The current number of ephemeral series in memory. + # TYPE cortex_ingester_memory_ephemeral_series gauge + cortex_ingester_memory_ephemeral_series 0 + + # HELP cortex_ingester_memory_series The current number of series in memory. + # TYPE cortex_ingester_memory_series gauge + cortex_ingester_memory_series 1 + + # HELP cortex_ingester_memory_users The current number of users in memory. + # TYPE cortex_ingester_memory_users gauge + cortex_ingester_memory_users 1 + + # HELP cortex_ingester_memory_ephemeral_users The current number of users with ephemeral storage in memory. + # TYPE cortex_ingester_memory_ephemeral_users gauge + cortex_ingester_memory_ephemeral_users 0 + `, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + registry := prometheus.NewRegistry() + + // Create a mocked ingester + cfg := defaultIngesterTestConfig(t) + cfg.IngesterRing.ReplicationFactor = 1 + cfg.ActiveSeriesMetricsEnabled = false + limits := defaultLimitsTestConfig() + limits.MaxGlobalExemplarsPerUser = 100 + limits.OutOfOrderTimeWindow = model.Duration(time.Minute * 10) + + i, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, limits, "", registry) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + + ctx := user.InjectOrgID(context.Background(), userID) + + // Wait until the ingester is healthy + test.Poll(t, 100*time.Millisecond, 1, func() interface{} { + return i.lifecycler.HealthyInstancesCount() + }) + + // Push timeseries + for idx, req := range testData.reqs { + // Push metrics to the ingester. 
Override the default cleanup method of mimirpb.ReuseSlice with a no-op one. + _, err := i.PushWithCleanup(ctx, push.NewParsedRequest(req)) + + // We expect no error on any request except the last one + // which may error (and in that case we assert on it) + if idx < len(testData.reqs)-1 { + assert.NoError(t, err) + } else { + assert.Equal(t, testData.expectedErr, err) + } + } + + // Check tracked Prometheus metrics + err = testutil.GatherAndCompare(registry, strings.NewReader(testData.expectedMetrics), append(metricNames, testData.additionalMetrics...)...) + assert.NoError(t, err) + }) + } +} + +func ToWriteRequestEphemeral(lbls []labels.Labels, samples []mimirpb.Sample, exemplars []*mimirpb.Exemplar, metadata []*mimirpb.MetricMetadata, source mimirpb.WriteRequest_SourceEnum) *mimirpb.WriteRequest { + req := mimirpb.ToWriteRequest(lbls, samples, exemplars, metadata, source) + req.EphemeralTimeseries = req.Timeseries + req.Timeseries = nil + return req +} + +func TestIngesterTruncationOfEphemeralSeries(t *testing.T) { + cfg := defaultIngesterTestConfig(t) + cfg.BlocksStorageConfig.EphemeralTSDB.Retention = 10 * time.Minute + + // Create ingester + reg := prometheus.NewPedanticRegistry() + i, err := prepareIngesterWithBlocksStorage(t, cfg, reg) + require.NoError(t, err) + + require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) + t.Cleanup(func() { + _ = services.StopAndAwaitTerminated(context.Background(), i) + }) + + // Wait until it's healthy + test.Poll(t, 1*time.Second, 1, func() interface{} { + return i.lifecycler.HealthyInstancesCount() + }) + + metricLabels := []mimirpb.LabelAdapter{{Name: labels.MetricName, Value: "test"}} + + now := time.Now() + + // Push ephemeral series with good timestamps (in last 10 minutes) + ctx := user.InjectOrgID(context.Background(), userID) + req := ToWriteRequestEphemeral( + []labels.Labels{mimirpb.FromLabelAdaptersToLabels(metricLabels)}, + []mimirpb.Sample{{Value: float64(100), TimestampMs: now.Add(-9 * time.Minute).UnixMilli()}}, + nil, + nil, + mimirpb.API, + ) + _, err = i.Push(ctx, req) + require.NoError(t, err) + + db := i.getTSDB(userID) + require.NotNil(t, db) + + // Advance time for ephemeral storage + require.Nil(t, db.Compact(now.Add(5*time.Minute))) + + // Pushing the same request should now fail, because min valid time for ephemeral storage has moved on to (now + 5 minutes - ephemeral series retention = now - 5 minutes) + _, err = i.Push(ctx, req) + require.Equal(t, err, httpgrpc.Errorf(http.StatusBadRequest, wrapWithUser(newIngestErrSampleTimestampTooOld(model.Time(req.EphemeralTimeseries[0].Samples[0].TimestampMs), metricLabels), userID).Error())) +} diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index 63c7596250..12c085c097 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -16,18 +16,23 @@ import ( ) type ingesterMetrics struct { - ingestedSamples *prometheus.CounterVec - ingestedExemplars prometheus.Counter - ingestedMetadata prometheus.Counter - ingestedSamplesFail *prometheus.CounterVec - ingestedExemplarsFail prometheus.Counter - ingestedMetadataFail prometheus.Counter + ingestedSamples *prometheus.CounterVec + ingestedExemplars prometheus.Counter + ingestedMetadata prometheus.Counter + ingestedSamplesFail *prometheus.CounterVec + ingestedExemplarsFail prometheus.Counter + ingestedMetadataFail prometheus.Counter + + ephemeralIngestedSamples *prometheus.CounterVec + ephemeralIngestedSamplesFail *prometheus.CounterVec + queries prometheus.Counter queriedSamples 
prometheus.Histogram queriedExemplars prometheus.Histogram queriedSeries prometheus.Histogram memMetadata prometheus.Gauge memUsers prometheus.Gauge + memEphemeralUsers prometheus.Gauge memMetadataCreatedTotal *prometheus.CounterVec memMetadataRemovedTotal *prometheus.CounterVec @@ -124,6 +129,14 @@ func newIngesterMetrics( Name: "cortex_ingester_ingested_metadata_failures_total", Help: "The total number of metadata that errored on ingestion.", }), + ephemeralIngestedSamples: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_ingester_ingested_ephemeral_samples_total", + Help: "The total number of samples ingested per user for ephemeral series.", + }, []string{"user"}), + ephemeralIngestedSamplesFail: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_ingester_ingested_ephemeral_samples_failures_total", + Help: "The total number of samples that errored on ingestion per user for ephemeral series.", + }, []string{"user"}), queries: promauto.With(r).NewCounter(prometheus.CounterOpts{ Name: "cortex_ingester_queries_total", Help: "The total number of queries the ingester has handled.", @@ -154,6 +167,10 @@ func newIngesterMetrics( Name: "cortex_ingester_memory_users", Help: "The current number of users in memory.", }), + memEphemeralUsers: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + Name: "cortex_ingester_memory_ephemeral_users", + Help: "The current number of users with ephemeral storage in memory.", + }), memMetadataCreatedTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ Name: "cortex_ingester_memory_metadata_created_total", Help: "The total number of metadata that were created per user", @@ -266,7 +283,7 @@ func newIngesterMetrics( }), appenderCommitDuration: promauto.With(r).NewHistogram(prometheus.HistogramOpts{ Name: "cortex_ingester_tsdb_appender_commit_duration_seconds", - Help: "The total time it takes for a push request to commit samples appended to TSDB.", + Help: "The total time it takes for a push request to commit samples appended to TSDB (both persistent and ephemeral).", Buckets: []float64{.001, .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10}, }), @@ -292,6 +309,9 @@ func (m *ingesterMetrics) deletePerUserMetrics(userID string) { m.memMetadataCreatedTotal.DeleteLabelValues(userID) m.memMetadataRemovedTotal.DeleteLabelValues(userID) + m.ephemeralIngestedSamples.DeleteLabelValues(userID) + m.ephemeralIngestedSamplesFail.DeleteLabelValues(userID) + filter := prometheus.Labels{"user": userID} m.discardedSamplesSampleOutOfBounds.DeletePartialMatch(filter) m.discardedSamplesSampleOutOfOrder.DeletePartialMatch(filter) @@ -376,6 +396,13 @@ type tsdbMetrics struct { memSeriesCreatedTotal *prometheus.Desc memSeriesRemovedTotal *prometheus.Desc + ephemeralHeadTruncateFail *prometheus.Desc + ephemeralHeadTruncateTotal *prometheus.Desc + ephemeralHeadGcDuration *prometheus.Desc + + ephemeralSeriesCreatedTotal *prometheus.Desc + ephemeralSeriesRemovedTotal *prometheus.Desc + regs *util.UserRegistries } @@ -563,6 +590,28 @@ func newTSDBMetrics(r prometheus.Registerer) *tsdbMetrics { "cortex_ingester_memory_series_removed_total", "The total number of series that were removed per user.", []string{"user"}, nil), + + ephemeralHeadTruncateFail: prometheus.NewDesc( + "cortex_ingester_ephemeral_head_truncations_failed_total", + "Total number of TSDB head truncations that failed for ephemeral storage.", + nil, nil), + ephemeralHeadTruncateTotal: prometheus.NewDesc( + "cortex_ingester_ephemeral_head_truncations_total", + "Total number of TSDB 
head truncations attempted for ephemeral storage.", + nil, nil), + ephemeralHeadGcDuration: prometheus.NewDesc( + "cortex_ingester_ephemeral_head_gc_duration_seconds", + "Runtime of garbage collection in the TSDB head for ephemeral storage.", + nil, nil), + + ephemeralSeriesCreatedTotal: prometheus.NewDesc( + "cortex_ingester_ephemeral_series_created_total", + "The total number of series in ephemeral storage that were created per user.", + []string{"user"}, nil), + ephemeralSeriesRemovedTotal: prometheus.NewDesc( + "cortex_ingester_ephemeral_series_removed_total", + "The total number of series in ephemeral storage that were removed per user.", + []string{"user"}, nil), } if r != nil { @@ -619,6 +668,12 @@ func (sm *tsdbMetrics) Describe(out chan<- *prometheus.Desc) { out <- sm.memSeriesCreatedTotal out <- sm.memSeriesRemovedTotal + + out <- sm.ephemeralHeadTruncateFail + out <- sm.ephemeralHeadTruncateTotal + out <- sm.ephemeralHeadGcDuration + out <- sm.ephemeralSeriesCreatedTotal + out <- sm.ephemeralSeriesRemovedTotal } func (sm *tsdbMetrics) Collect(out chan<- prometheus.Metric) { @@ -671,6 +726,12 @@ func (sm *tsdbMetrics) Collect(out chan<- prometheus.Metric) { data.SendSumOfCountersPerUser(out, sm.memSeriesCreatedTotal, "prometheus_tsdb_head_series_created_total") data.SendSumOfCountersPerUser(out, sm.memSeriesRemovedTotal, "prometheus_tsdb_head_series_removed_total") + + data.SendSumOfCounters(out, sm.ephemeralHeadTruncateFail, ephemeralPrometheusMetricsPrefix+"prometheus_tsdb_head_truncations_failed_total") + data.SendSumOfCounters(out, sm.ephemeralHeadTruncateTotal, ephemeralPrometheusMetricsPrefix+"prometheus_tsdb_head_truncations_total") + data.SendSumOfSummaries(out, sm.ephemeralHeadGcDuration, ephemeralPrometheusMetricsPrefix+"prometheus_tsdb_head_gc_duration_seconds") + data.SendSumOfCountersPerUser(out, sm.ephemeralSeriesCreatedTotal, ephemeralPrometheusMetricsPrefix+"prometheus_tsdb_head_series_created_total") + data.SendSumOfCountersPerUser(out, sm.ephemeralSeriesRemovedTotal, ephemeralPrometheusMetricsPrefix+"prometheus_tsdb_head_series_removed_total") } func (sm *tsdbMetrics) setRegistryForUser(userID string, registry *prometheus.Registry) { diff --git a/pkg/ingester/metrics_test.go b/pkg/ingester/metrics_test.go index 232106e8f1..1964fbf860 100644 --- a/pkg/ingester/metrics_test.go +++ b/pkg/ingester/metrics_test.go @@ -126,6 +126,31 @@ func TestTSDBMetrics(t *testing.T) { # TYPE cortex_ingester_tsdb_checkpoint_creations_total counter cortex_ingester_tsdb_checkpoint_creations_total 1883489 + # HELP cortex_ingester_ephemeral_head_gc_duration_seconds Runtime of garbage collection in the TSDB head for ephemeral storage. + # TYPE cortex_ingester_ephemeral_head_gc_duration_seconds summary + cortex_ingester_ephemeral_head_gc_duration_seconds_sum 5.154812e+06 + cortex_ingester_ephemeral_head_gc_duration_seconds_count 3 + + # HELP cortex_ingester_ephemeral_head_truncations_failed_total Total number of TSDB head truncations that failed for ephemeral storage. + # TYPE cortex_ingester_ephemeral_head_truncations_failed_total counter + cortex_ingester_ephemeral_head_truncations_failed_total 4.95655e+06 + + # HELP cortex_ingester_ephemeral_head_truncations_total Total number of TSDB head truncations attempted for ephemeral storage. 
+ # TYPE cortex_ingester_ephemeral_head_truncations_total counter + cortex_ingester_ephemeral_head_truncations_total 5.055681e+06 + + # HELP cortex_ingester_ephemeral_series_created_total The total number of series in ephemeral storage that were created per user. + # TYPE cortex_ingester_ephemeral_series_created_total counter + cortex_ingester_ephemeral_series_created_total{user="user1"} 654285 + cortex_ingester_ephemeral_series_created_total{user="user2"} 4.546711e+06 + cortex_ingester_ephemeral_series_created_total{user="user3"} 52947 + + # HELP cortex_ingester_ephemeral_series_removed_total The total number of series in ephemeral storage that were removed per user. + # TYPE cortex_ingester_ephemeral_series_removed_total counter + cortex_ingester_ephemeral_series_removed_total{user="user1"} 666630 + cortex_ingester_ephemeral_series_removed_total{user="user2"} 4.632498e+06 + cortex_ingester_ephemeral_series_removed_total{user="user3"} 53946 + # HELP cortex_ingester_memory_series_created_total The total number of series that were created per user. # TYPE cortex_ingester_memory_series_created_total counter # 5 * (12345, 85787 and 999 respectively) @@ -480,6 +505,29 @@ func TestTSDBMetricsWithRemoval(t *testing.T) { # TYPE cortex_ingester_tsdb_out_of_order_samples_appended_total counter cortex_ingester_tsdb_out_of_order_samples_appended_total{user="user1"} 3 cortex_ingester_tsdb_out_of_order_samples_appended_total{user="user2"} 3 + + # HELP cortex_ingester_ephemeral_head_gc_duration_seconds Runtime of garbage collection in the TSDB head for ephemeral storage. + # TYPE cortex_ingester_ephemeral_head_gc_duration_seconds summary + cortex_ingester_ephemeral_head_gc_duration_seconds_sum 5.154812e+06 + cortex_ingester_ephemeral_head_gc_duration_seconds_count 3 + + # HELP cortex_ingester_ephemeral_head_truncations_failed_total Total number of TSDB head truncations that failed for ephemeral storage. + # TYPE cortex_ingester_ephemeral_head_truncations_failed_total counter + cortex_ingester_ephemeral_head_truncations_failed_total 4.95655e+06 + + # HELP cortex_ingester_ephemeral_head_truncations_total Total number of TSDB head truncations attempted for ephemeral storage. + # TYPE cortex_ingester_ephemeral_head_truncations_total counter + cortex_ingester_ephemeral_head_truncations_total 5.055681e+06 + + # HELP cortex_ingester_ephemeral_series_created_total The total number of series in ephemeral storage that were created per user. + # TYPE cortex_ingester_ephemeral_series_created_total counter + cortex_ingester_ephemeral_series_created_total{user="user1"} 654285 + cortex_ingester_ephemeral_series_created_total{user="user2"} 4.546711e+06 + + # HELP cortex_ingester_ephemeral_series_removed_total The total number of series in ephemeral storage that were removed per user. 
+ # TYPE cortex_ingester_ephemeral_series_removed_total counter + cortex_ingester_ephemeral_series_removed_total{user="user1"} 666630 + cortex_ingester_ephemeral_series_removed_total{user="user2"} 4.632498e+06 `)) require.NoError(t, err) } @@ -752,5 +800,33 @@ func populateTSDBMetrics(base float64) *prometheus.Registry { }) outOfOrderSamplesAppendedTotal.Add(3) + ephHeadTruncateFail := promauto.With(r).NewCounter(prometheus.CounterOpts{ + Name: ephemeralPrometheusMetricsPrefix + "prometheus_tsdb_head_truncations_failed_total", + Help: "Total number of head truncations that failed.", + }) + ephHeadTruncateFail.Add(50 * base) + + ephHeadTruncateTotal := promauto.With(r).NewCounter(prometheus.CounterOpts{ + Name: ephemeralPrometheusMetricsPrefix + "prometheus_tsdb_head_truncations_total", + Help: "Total number of head truncations attempted.", + }) + ephHeadTruncateTotal.Add(51 * base) + + ephGcDuration := promauto.With(r).NewSummary(prometheus.SummaryOpts{ + Name: ephemeralPrometheusMetricsPrefix + "prometheus_tsdb_head_gc_duration_seconds", + Help: "Runtime of garbage collection in the head block.", + }) + ephGcDuration.Observe(52 * base) + + ephSeriesCreated := promauto.With(r).NewCounter(prometheus.CounterOpts{ + Name: ephemeralPrometheusMetricsPrefix + "prometheus_tsdb_head_series_created_total", + }) + ephSeriesCreated.Add(53 * base) + + ephSeriesRemoved := promauto.With(r).NewCounter(prometheus.CounterOpts{ + Name: ephemeralPrometheusMetricsPrefix + "prometheus_tsdb_head_series_removed_total", + }) + ephSeriesRemoved.Add(54 * base) + return r } diff --git a/pkg/ingester/user_tsdb.go b/pkg/ingester/user_tsdb.go index df5c8d15aa..20fed8ef86 100644 --- a/pkg/ingester/user_tsdb.go +++ b/pkg/ingester/user_tsdb.go @@ -10,6 +10,7 @@ import ( "sync" "time" + "github.com/grafana/dskit/multierror" "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" @@ -60,6 +61,13 @@ type userTSDB struct { seriesInMetric *metricCounter limiter *Limiter + // Function that creates ephemeral storage (*tsdb.Head) for the user. + ephemeralFactory func() (*tsdb.Head, error) + ephemeralSeriesRetentionPeriod time.Duration + + ephemeralMtx sync.RWMutex + ephemeralStorage *tsdb.Head + instanceSeriesCount *atomic.Int64 // Shared across all userTSDB instances created by ingester. instanceLimitsFn func() *InstanceLimits @@ -95,6 +103,50 @@ func (u *userTSDB) Appender(ctx context.Context) storage.Appender { return u.db.Appender(ctx) } +func (u *userTSDB) EphemeralAppender(ctx context.Context) (storage.Appender, error) { + es := u.getEphemeralStorage() + if es != nil { + return es.Appender(ctx), nil + } + + es, err := u.createEphemeralStorage() + if err != nil { + return nil, err + } + + return es.Appender(ctx), nil +} + +func (u *userTSDB) createEphemeralStorage() (*tsdb.Head, error) { + u.ephemeralMtx.Lock() + defer u.ephemeralMtx.Unlock() + + if u.ephemeralStorage != nil { + return u.ephemeralStorage, nil + } + + es, err := u.ephemeralFactory() + if err == nil { + u.ephemeralStorage = es + } + return u.ephemeralStorage, err +} + +// getEphemeralStorage returns ephemeral storage, if it exists, or nil otherwise. 
+func (u *userTSDB) getEphemeralStorage() *tsdb.Head { + u.ephemeralMtx.RLock() + defer u.ephemeralMtx.RUnlock() + + return u.ephemeralStorage +} + +func (u *userTSDB) hasEphemeralStorage() bool { + u.ephemeralMtx.RLock() + defer u.ephemeralMtx.RUnlock() + + return u.ephemeralStorage != nil +} + // Querier returns a new querier over the data partition for the given time range. func (u *userTSDB) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { return u.db.Querier(ctx, mint, maxt) @@ -121,11 +173,26 @@ func (u *userTSDB) Blocks() []*tsdb.Block { } func (u *userTSDB) Close() error { - return u.db.Close() + var merr multierror.MultiError + + eph := u.getEphemeralStorage() + if eph != nil { + merr.Add(errors.Wrap(eph.Close(), "ephemeral storage")) + } + + merr.Add(errors.Wrap(u.db.Close(), "persistent storage")) + return merr.Err() } -func (u *userTSDB) Compact() error { - return u.db.Compact() +func (u *userTSDB) Compact(now time.Time) error { + var merr multierror.MultiError + eph := u.getEphemeralStorage() + if eph != nil { + merr.Add(errors.Wrap(eph.Truncate(now.Add(-u.ephemeralSeriesRetentionPeriod).UnixMilli()), "ephemeral storage")) + } + + merr.Add(errors.Wrap(u.db.Compact(), "persistent storage")) + return merr.Err() } func (u *userTSDB) StartTime() (int64, error) { diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index c2452784d4..7d42f8b9a7 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -65,6 +65,14 @@ const ( // DefaultPartitionerMaxGapSize is the default max size - in bytes - of a gap for which the store-gateway // partitioner aggregates together two bucket GET object requests. DefaultPartitionerMaxGapSize = uint64(512 * 1024) + + headChunkWriterBufferSizeHelp = "The write buffer size used by the head chunks mapper. Lower values reduce memory utilisation on clusters with a large number of tenants at the cost of increased disk I/O operations." + headChunksEndTimeVarianceHelp = "How much variance (as percentage between 0 and 1) should be applied to the chunk end time, to spread chunks writing across time. Doesn't apply to the last chunk of the chunk range. 0 means no variance." + headStripeSizeHelp = "The number of shards of series to use in TSDB (must be a power of 2). Reducing this will decrease memory footprint, but can negatively impact performance." + headChunksWriteQueueSizeHelp = "The size of the write queue used by the head chunks mapper. Lower values reduce memory utilisation at the cost of potentially higher ingest latency. Value of 0 switches chunks mapper to implementation without a queue." + headPostingsForMatchersCacheTTLHelp = "How long to cache postings for matchers in the Head and OOOHead. 0 disables the cache and just deduplicates the in-flight calls." + headPostingsForMatchersCacheSizeHelp = "Maximum number of entries in the cache for postings for matchers in the Head and OOOHead when ttl > 0." + headPostingsForMatchersCacheForce = "Force the cache to be used for postings for matchers in the Head and OOOHead, even if it's not a concurrent (query-sharding) call." ) // Validation errors @@ -80,9 +88,10 @@ var ( // BlocksStorageConfig holds the config information for the blocks storage. 
type BlocksStorageConfig struct { - Bucket bucket.Config `yaml:",inline"` - BucketStore BucketStoreConfig `yaml:"bucket_store" doc:"description=This configures how the querier and store-gateway discover and synchronize blocks stored in the bucket."` - TSDB TSDBConfig `yaml:"tsdb"` + Bucket bucket.Config `yaml:",inline"` + BucketStore BucketStoreConfig `yaml:"bucket_store" doc:"description=This configures how the querier and store-gateway discover and synchronize blocks stored in the bucket."` + TSDB TSDBConfig `yaml:"tsdb"` + EphemeralTSDB EphemeralTSDBConfig `yaml:"ephemeral_tsdb"` } // DurationList is the block ranges for a tsdb @@ -127,6 +136,7 @@ func (cfg *BlocksStorageConfig) RegisterFlags(f *flag.FlagSet, logger log.Logger cfg.Bucket.RegisterFlagsWithPrefixAndDefaultDirectory("blocks-storage.", "blocks", f, logger) cfg.BucketStore.RegisterFlags(f) cfg.TSDB.RegisterFlags(f) + cfg.EphemeralTSDB.RegisterFlags(f) } // Validate the config. @@ -206,19 +216,19 @@ func (cfg *TSDBConfig) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.HeadCompactionInterval, "blocks-storage.tsdb.head-compaction-interval", 1*time.Minute, "How frequently ingesters try to compact TSDB head. Block is only created if data covers smallest block range. Must be greater than 0 and max 5 minutes.") f.IntVar(&cfg.HeadCompactionConcurrency, "blocks-storage.tsdb.head-compaction-concurrency", 1, "Maximum number of tenants concurrently compacting TSDB head into a new block") f.DurationVar(&cfg.HeadCompactionIdleTimeout, "blocks-storage.tsdb.head-compaction-idle-timeout", 1*time.Hour, "If TSDB head is idle for this duration, it is compacted. Note that up to 25% jitter is added to the value to avoid ingesters compacting concurrently. 0 means disabled.") - f.IntVar(&cfg.HeadChunksWriteBufferSize, "blocks-storage.tsdb.head-chunks-write-buffer-size-bytes", chunks.DefaultWriteBufferSize, "The write buffer size used by the head chunks mapper. Lower values reduce memory utilisation on clusters with a large number of tenants at the cost of increased disk I/O operations.") - f.Float64Var(&cfg.HeadChunksEndTimeVariance, "blocks-storage.tsdb.head-chunks-end-time-variance", 0, "How much variance (as percentage between 0 and 1) should be applied to the chunk end time, to spread chunks writing across time. Doesn't apply to the last chunk of the chunk range. 0 means no variance.") - f.IntVar(&cfg.StripeSize, "blocks-storage.tsdb.stripe-size", 16384, "The number of shards of series to use in TSDB (must be a power of 2). Reducing this will decrease memory footprint, but can negatively impact performance.") + f.IntVar(&cfg.HeadChunksWriteBufferSize, "blocks-storage.tsdb.head-chunks-write-buffer-size-bytes", chunks.DefaultWriteBufferSize, headChunkWriterBufferSizeHelp) + f.Float64Var(&cfg.HeadChunksEndTimeVariance, "blocks-storage.tsdb.head-chunks-end-time-variance", 0, headChunksEndTimeVarianceHelp) + f.IntVar(&cfg.StripeSize, "blocks-storage.tsdb.stripe-size", 16384, headStripeSizeHelp) f.BoolVar(&cfg.WALCompressionEnabled, "blocks-storage.tsdb.wal-compression-enabled", false, "True to enable TSDB WAL compression.") f.IntVar(&cfg.WALSegmentSizeBytes, "blocks-storage.tsdb.wal-segment-size-bytes", wlog.DefaultSegmentSize, "TSDB WAL segments files max size (bytes).") f.BoolVar(&cfg.FlushBlocksOnShutdown, "blocks-storage.tsdb.flush-blocks-on-shutdown", false, "True to flush blocks to storage on shutdown. 
If false, incomplete blocks will be reused after restart.") f.DurationVar(&cfg.CloseIdleTSDBTimeout, "blocks-storage.tsdb.close-idle-tsdb-timeout", 13*time.Hour, "If TSDB has not received any data for this duration, and all blocks from TSDB have been shipped, TSDB is closed and deleted from local disk. If set to positive value, this value should be equal or higher than -querier.query-ingesters-within flag to make sure that TSDB is not closed prematurely, which could cause partial query results. 0 or negative value disables closing of idle TSDB.") f.BoolVar(&cfg.MemorySnapshotOnShutdown, "blocks-storage.tsdb.memory-snapshot-on-shutdown", false, "True to enable snapshotting of in-memory TSDB data on disk when shutting down.") - f.IntVar(&cfg.HeadChunksWriteQueueSize, "blocks-storage.tsdb.head-chunks-write-queue-size", 1000000, "The size of the write queue used by the head chunks mapper. Lower values reduce memory utilisation at the cost of potentially higher ingest latency. Value of 0 switches chunks mapper to implementation without a queue.") + f.IntVar(&cfg.HeadChunksWriteQueueSize, "blocks-storage.tsdb.head-chunks-write-queue-size", 1000000, headChunksWriteQueueSizeHelp) f.IntVar(&cfg.OutOfOrderCapacityMax, "blocks-storage.tsdb.out-of-order-capacity-max", 32, "Maximum capacity for out of order chunks, in samples between 1 and 255.") - f.DurationVar(&cfg.HeadPostingsForMatchersCacheTTL, "blocks-storage.tsdb.head-postings-for-matchers-cache-ttl", 10*time.Second, "How long to cache postings for matchers in the Head and OOOHead. 0 disables the cache and just deduplicates the in-flight calls.") - f.IntVar(&cfg.HeadPostingsForMatchersCacheSize, "blocks-storage.tsdb.head-postings-for-matchers-cache-size", 100, "Maximum number of entries in the cache for postings for matchers in the Head and OOOHead when ttl > 0.") - f.BoolVar(&cfg.HeadPostingsForMatchersCacheForce, "blocks-storage.tsdb.head-postings-for-matchers-cache-force", false, "Force the cache to be used for postings for matchers in the Head and OOOHead, even if it's not a concurrent (query-sharding) call.") + f.DurationVar(&cfg.HeadPostingsForMatchersCacheTTL, "blocks-storage.tsdb.head-postings-for-matchers-cache-ttl", 10*time.Second, headPostingsForMatchersCacheTTLHelp) + f.IntVar(&cfg.HeadPostingsForMatchersCacheSize, "blocks-storage.tsdb.head-postings-for-matchers-cache-size", 100, headPostingsForMatchersCacheSizeHelp) + f.BoolVar(&cfg.HeadPostingsForMatchersCacheForce, "blocks-storage.tsdb.head-postings-for-matchers-cache-force", false, headPostingsForMatchersCacheForce) } // Validate the config. @@ -372,3 +382,40 @@ func (cfg *BucketIndexConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix st f.DurationVar(&cfg.IdleTimeout, prefix+"idle-timeout", time.Hour, "How long a unused bucket index should be cached. Once this timeout expires, the unused bucket index is removed from the in-memory cache. This option is used only by querier.") f.DurationVar(&cfg.MaxStalePeriod, prefix+"max-stale-period", time.Hour, "The maximum allowed age of a bucket index (last updated) before queries start failing because the bucket index is too old. The bucket index is periodically updated by the compactor, and this check is enforced in the querier (at query time).") } + +// EphemeralTSDBConfig holds the config for Ephemeral Storage opened in the ingesters. 
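+// It mirrors the head-related options of TSDBConfig (same help texts and defaults), but applies to the per-tenant ephemeral Head, which is truncated to the retention period instead of being compacted into blocks.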
+type EphemeralTSDBConfig struct {
+ Retention time.Duration `yaml:"retention_period" category:"experimental"`
+ HeadChunksWriteBufferSize int `yaml:"head_chunks_write_buffer_size_bytes" category:"experimental"`
+ HeadChunksEndTimeVariance float64 `yaml:"head_chunks_end_time_variance" category:"experimental"`
+ StripeSize int `yaml:"stripe_size" category:"experimental"`
+ HeadChunksWriteQueueSize int `yaml:"head_chunks_write_queue_size" category:"experimental"`
+ HeadPostingsForMatchersCacheTTL time.Duration `yaml:"head_postings_for_matchers_cache_ttl" category:"experimental"`
+ HeadPostingsForMatchersCacheSize int `yaml:"head_postings_for_matchers_cache_size" category:"experimental"`
+ HeadPostingsForMatchersCacheForce bool `yaml:"head_postings_for_matchers_cache_force" category:"experimental"`
+}
+
+// RegisterFlags registers the EphemeralTSDBConfig flags.
+func (cfg *EphemeralTSDBConfig) RegisterFlags(f *flag.FlagSet) {
+ f.DurationVar(&cfg.Retention, "blocks-storage.ephemeral-tsdb.retention-period", 10*time.Minute, "Retention of ephemeral series.")
+ f.IntVar(&cfg.HeadChunksWriteBufferSize, "blocks-storage.ephemeral-tsdb.head-chunks-write-buffer-size-bytes", chunks.DefaultWriteBufferSize, headChunkWriterBufferSizeHelp)
+ f.Float64Var(&cfg.HeadChunksEndTimeVariance, "blocks-storage.ephemeral-tsdb.head-chunks-end-time-variance", 0, headChunksEndTimeVarianceHelp)
+ f.IntVar(&cfg.StripeSize, "blocks-storage.ephemeral-tsdb.stripe-size", 16384, headStripeSizeHelp)
+ f.IntVar(&cfg.HeadChunksWriteQueueSize, "blocks-storage.ephemeral-tsdb.head-chunks-write-queue-size", 1000000, headChunksWriteQueueSizeHelp)
+ f.DurationVar(&cfg.HeadPostingsForMatchersCacheTTL, "blocks-storage.ephemeral-tsdb.head-postings-for-matchers-cache-ttl", 10*time.Second, headPostingsForMatchersCacheTTLHelp)
+ f.IntVar(&cfg.HeadPostingsForMatchersCacheSize, "blocks-storage.ephemeral-tsdb.head-postings-for-matchers-cache-size", 100, headPostingsForMatchersCacheSizeHelp)
+ f.BoolVar(&cfg.HeadPostingsForMatchersCacheForce, "blocks-storage.ephemeral-tsdb.head-postings-for-matchers-cache-force", false, headPostingsForMatchersCacheForce)
+}
+
+// Validate the config.
+func (cfg *EphemeralTSDBConfig) Validate() error {
+ if cfg.HeadChunksWriteBufferSize < chunks.MinWriteBufferSize || cfg.HeadChunksWriteBufferSize > chunks.MaxWriteBufferSize || cfg.HeadChunksWriteBufferSize%1024 != 0 {
+ return errors.Errorf("head chunks write buffer size must be a multiple of 1024 between %d and %d", chunks.MinWriteBufferSize, chunks.MaxWriteBufferSize)
+ }
+
+ if cfg.StripeSize <= 1 || (cfg.StripeSize&(cfg.StripeSize-1)) != 0 { // ensure stripe size is a positive power of 2
+ return errInvalidStripeSize
+ }
+
+ return nil
+}
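For reference, a minimal sketch of how the new block could be set in YAML, assuming the ephemeral_tsdb block is nested under the usual blocks_storage section (the values shown are simply the defaults registered above, not a recommendation):

    blocks_storage:
      ephemeral_tsdb:
        retention_period: 10m
        head_chunks_write_queue_size: 1000000
        head_postings_for_matchers_cache_ttl: 10s

The equivalent command-line form would use the blocks-storage.ephemeral-tsdb.* flags registered in EphemeralTSDBConfig.RegisterFlags above.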