From 110d996fe45f0c916143cffbbf692d57b1c7f935 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> Date: Fri, 24 Jun 2022 17:19:57 +0530 Subject: [PATCH] Add out-of-order sample support (#2187) * Add out-of-order sample support Signed-off-by: Ganesh Vernekar Co-authored-by: Jesus Vazquez * Fix review comments Signed-off-by: Ganesh Vernekar * Fix tests Signed-off-by: Ganesh Vernekar * Update test to check runtime change of OutOfOrderTimeWindow Signed-off-by: Ganesh Vernekar * Fix race in the test Signed-off-by: Ganesh Vernekar * Fix Peter's comments Signed-off-by: Ganesh Vernekar * Fix CI Signed-off-by: Ganesh Vernekar * Fix review comments Signed-off-by: Ganesh Vernekar Co-authored-by: Jesus Vazquez --- CHANGELOG.md | 14 +- cmd/mimir/config-descriptor.json | 50 +++++-- cmd/mimir/help-all.txt.tmpl | 12 +- .../index.md | 33 ++++- .../operators-guide/mimir-runbooks/_index.md | 5 + pkg/ingester/ingester.go | 71 ++++++--- pkg/ingester/ingester_test.go | 138 +++++++++++++++++- pkg/ingester/metrics.go | 15 +- pkg/ingester/metrics_test.go | 32 +++- pkg/storage/tsdb/config.go | 8 +- pkg/util/globalerror/errors.go | 1 + pkg/util/validation/limits.go | 8 + 12 files changed, 325 insertions(+), 62 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9539902202..23539e5dc6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,15 @@ * [CHANGE] Config flag category overrides can be set dynamically at runtime. #1934 * [CHANGE] Ingester: deprecated `-ingester.ring.join-after`. Mimir now behaves as this setting is always set to 0s. This configuration option will be removed in Mimir 2.4.0. #1965 * [CHANGE] Blocks uploaded by ingester no longer contain `__org_id__` label. Compactor now ignores this label and will compact blocks with and without this label together. `mimirconvert` tool will remove the label from blocks as "unknown" label. #1972 +* [CHANGE] Querier: deprecated `-querier.shuffle-sharding-ingesters-lookback-period`, instead adding `-querier.shuffle-sharding-ingesters-enabled` to enable or disable shuffle sharding on the read path. The value of `-querier.query-ingesters-within` is now used internally for shuffle sharding lookback. #2110 +* [CHANGE] Memberlist: `-memberlist.abort-if-join-fails` now defaults to false. Previously it defaulted to true. #2168 +* [CHANGE] Ruler: `/api/v1/rules*` and `/prometheus/rules*` configuration endpoints are removed. Use `/prometheus/config/v1/rules*`. #2182 +* [CHANGE] Ingester: `-ingester.exemplars-update-period` has been renamed to `-ingester.tsdb-config-update-period`. You can use it to update multiple, per-tenant TSDB configurations. #2187 +* [FEATURE] Ingester: (Experimental) Add the ability to ingest out-of-order samples up to an allowed limit. If you enable this feature, it requires additional memory and disk space. This feature also enables a write-behind log, which might lead to longer ingester-start replays. When this feature is disabled, there is no overhead on memory, disk space, or startup times. #2187 + * `-ingester.out-of-order-time-window`, as duration string, allows you to set how far back in time a sample can be. The default is `0s`, where `s` is seconds. + * `cortex_ingester_tsdb_out_of_order_samples_appended_total` metric tracks the total number of out-of-order samples ingested by the ingester. + * `cortex_discarded_samples_total` has a new label `reason="sample-too-old"` when the `-ingester.out-of-order-time-window` flag is greater than zero. 
The label tracks the number of samples that were discarded for being too old; they were out of order, but beyond the time window allowed. * [ENHANCEMENT] Distributor: Added limit to prevent tenants from sending excessive number of requests: #1843 * The following CLI flags (and their respective YAML config options) have been added: * `-distributor.request-rate-limit` @@ -28,9 +37,12 @@ * [ENHANCEMENT] Upgrade Docker base images to `alpine:3.16.0`. #2028 * [ENHANCEMENT] Store-gateway: Add experimental configuration option for the store-gateway to attempt to pre-populate the file system cache when memory-mapping index-header files. Enabled with `-blocks-storage.bucket-store.index-header.map-populate-enabled=true`. Note this flag only has an effect when running on Linux. #2019 #2054 * [ENHANCEMENT] Chunk Mapper: reduce memory usage of async chunk mapper. #2043 -* [ENHANCEMENT] Ingesters: Added new configuration option that makes it possible for mimir ingesters to perform queries on overlapping blocks in the filesystem. Enabled with `-blocks-storage.tsdb.allow-overlapping-queries`. #2091 * [ENHANCEMENT] Ingester: reduce sleep time when reading WAL. #2098 * [ENHANCEMENT] Compactor: Add HTTP API for uploading TSDB blocks. #1694 +* [ENHANCEMENT] Compactor: Run sanity check on blocks storage configuration at startup. #2143 +* [ENHANCEMENT] Compactor: Add HTTP API for uploading TSDB blocks. Enabled with `-compactor.block-upload-enabled`. #1694 #2126 +* [ENHANCEMENT] Ingester: Enable querying overlapping blocks by default. #2187 * [BUGFIX] Fix regexp parsing panic for regexp label matchers with start/end quantifiers. #1883 * [BUGFIX] Ingester: fixed deceiving error log "failed to update cached shipped blocks after shipper initialisation", occurring for each new tenant in the ingester. #1893 * [BUGFIX] Ring: fix bug where instances may appear unhealthy in the hash ring web UI even though they are not. #1933 diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index 93b450125f..f20f1e6bc6 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -2306,12 +2306,12 @@ }, { "kind": "field", - "name": "exemplars_update_period", + "name": "tsdb_config_update_period", "required": false, - "desc": "Period with which to update per-tenant max exemplar limit.", + "desc": "Period with which to update the per-tenant TSDB configuration.", "fieldValue": null, "fieldDefaultValue": 15000000000, - "fieldFlag": "ingester.exemplars-update-period", + "fieldFlag": "ingester.tsdb-config-update-period", "fieldType": "duration", "fieldCategory": "experimental" }, @@ -2648,6 +2648,17 @@ "fieldType": "map of tracker name (string) to matcher (string)", "fieldCategory": "advanced" }, + { + "kind": "field", + "name": "out_of_order_time_window", + "required": false, + "desc": "Non-zero value enables out-of-order support for most recent samples that are within the time window in relation to the following two conditions: (1) The newest sample for that time series, if it exists. For example, within [series.maxTime-timeWindow, series.maxTime]. (2) The TSDB's maximum time, if the series does not exist. For example, within [db.maxTime-timeWindow, db.maxTime]. The ingester will need more memory as a factor of _rate of out-of-order samples being ingested_ and _the number of series that are getting out-of-order samples_. 
You can configure it per tenant.", + "fieldValue": null, + "fieldDefaultValue": 0, + "fieldFlag": "ingester.out-of-order-time-window", + "fieldType": "duration", + "fieldCategory": "experimental" + }, { "kind": "field", "name": "max_fetched_chunks_per_query", @@ -5370,17 +5381,6 @@ "fieldType": "boolean", "fieldCategory": "advanced" }, - { - "kind": "field", - "name": "allow_overlapping_queries", - "required": false, - "desc": "Enable querying overlapping blocks. If there are going to be overlapping blocks in the ingesters this should be enabled.", - "fieldValue": null, - "fieldDefaultValue": false, - "fieldFlag": "blocks-storage.tsdb.allow-overlapping-queries", - "fieldType": "boolean", - "fieldCategory": "experimental" - }, { "kind": "field", "name": "series_hash_cache_max_size_bytes", @@ -5402,6 +5402,28 @@ "fieldFlag": "blocks-storage.tsdb.max-tsdb-opening-concurrency-on-startup", "fieldType": "int", "fieldCategory": "advanced" + }, + { + "kind": "field", + "name": "out_of_order_cap_min", + "required": false, + "desc": "Minimum capacity for out-of-order chunks, in samples between 0 and 255.", + "fieldValue": null, + "fieldDefaultValue": 4, + "fieldFlag": "blocks-storage.tsdb.out-of-order-cap-min", + "fieldType": "int", + "fieldCategory": "experimental" + }, + { + "kind": "field", + "name": "out_of_order_cap_max", + "required": false, + "desc": "Maximum capacity for out of order chunks, in samples between 1 and 255.", + "fieldValue": null, + "fieldDefaultValue": 32, + "fieldFlag": "blocks-storage.tsdb.out-of-order-cap-max", + "fieldType": "int", + "fieldCategory": "experimental" } ], "fieldValue": null, diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 363edb9380..ea44e8644a 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -477,8 +477,6 @@ Usage of ./cmd/mimir/mimir: OpenStack Swift user ID. -blocks-storage.swift.username string OpenStack Swift username. - -blocks-storage.tsdb.allow-overlapping-queries - [experimental] Enable querying overlapping blocks. If there are going to be overlapping blocks in the ingesters this should be enabled. -blocks-storage.tsdb.block-ranges-period value TSDB blocks range period. (default 2h0m0s) -blocks-storage.tsdb.close-idle-tsdb-timeout duration @@ -507,6 +505,10 @@ Usage of ./cmd/mimir/mimir: [experimental] True to enable snapshotting of in-memory TSDB data on disk when shutting down. -blocks-storage.tsdb.new-chunk-disk-mapper [experimental] Temporary flag to select whether to use the new (used in upstream Prometheus) or the old (legacy) chunk disk mapper. + -blocks-storage.tsdb.out-of-order-cap-max int + [experimental] Maximum capacity for out of order chunks, in samples between 1 and 255. (default 32) + -blocks-storage.tsdb.out-of-order-cap-min int + [experimental] Minimum capacity for out-of-order chunks, in samples between 0 and 255. (default 4) -blocks-storage.tsdb.retention-period duration TSDB blocks retention in the ingester before a block is removed, relative to the newest block written for the tenant. This should be larger than the -blocks-storage.tsdb.block-ranges-period, -querier.query-store-after and large enough to give store-gateways and queriers enough time to discover newly uploaded blocks. (default 24h0m0s) -blocks-storage.tsdb.series-hash-cache-max-size-bytes uint @@ -839,8 +841,6 @@ Usage of ./cmd/mimir/mimir: Path to the key file for the client certificate. Also requires the client certificate to be configured. 
-ingester.client.tls-server-name string Override the expected name on the server certificate. - -ingester.exemplars-update-period duration - [experimental] Period with which to update per-tenant max exemplar limit. (default 15s) -ingester.ignore-series-limit-for-metric-names string Comma-separated list of metric names, for which the -ingester.max-global-series-per-metric limit will be ignored. Does not affect the -ingester.max-global-series-per-user limit. -ingester.instance-limits.max-inflight-push-requests int @@ -863,6 +863,8 @@ Usage of ./cmd/mimir/mimir: The maximum number of active series per tenant, across the cluster before replication. 0 to disable. (default 150000) -ingester.metadata-retain-period duration Period at which metadata we have not seen will remain in memory before being deleted. (default 10m0s) + -ingester.out-of-order-time-window value + [experimental] Non-zero value enables out-of-order support for most recent samples that are within the time window in relation to the following two conditions: (1) The newest sample for that time series, if it exists. For example, within [series.maxTime-timeWindow, series.maxTime]. (2) The TSDB's maximum time, if the series does not exist. For example, within [db.maxTime-timeWindow, db.maxTime]. The ingester will need more memory as a factor of _rate of out-of-order samples being ingested_ and _the number of series that are getting out-of-order samples_. You can configure it per tenant. -ingester.rate-update-period duration Period with which to update the per-tenant ingestion rates. (default 15s) -ingester.ring.consul.acl-token string @@ -947,6 +949,8 @@ Usage of ./cmd/mimir/mimir: True to enable the zone-awareness and replicate ingested samples across different availability zones. This option needs be set on ingesters, distributors, queriers and rulers when running in microservices mode. -ingester.stream-chunks-when-using-blocks Stream chunks from ingesters to queriers. (default true) + -ingester.tsdb-config-update-period duration + [experimental] Period with which to update the per-tenant TSDB configuration. (default 15s) -log.format value Output log messages in the given format. Valid formats: [logfmt, json] (default logfmt) -log.level value diff --git a/docs/sources/operators-guide/configuring/reference-configuration-parameters/index.md b/docs/sources/operators-guide/configuring/reference-configuration-parameters/index.md index 5ed653b5f2..bfb687850c 100644 --- a/docs/sources/operators-guide/configuring/reference-configuration-parameters/index.md +++ b/docs/sources/operators-guide/configuring/reference-configuration-parameters/index.md @@ -741,9 +741,9 @@ ring: # prod: '{namespace=~"prod-.*"}' [active_series_custom_trackers: <map of tracker name (string) to matcher (string)> | default = ] -# (experimental) Period with which to update per-tenant max exemplar limit. -# CLI flag: -ingester.exemplars-update-period -[exemplars_update_period: <duration> | default = 15s] +# (experimental) Period with which to update the per-tenant TSDB configuration. +# CLI flag: -ingester.tsdb-config-update-period +[tsdb_config_update_period: <duration> | default = 15s] instance_limits: # (advanced) Max ingestion rate (samples/sec) that ingester will accept. 
This @@ -2721,6 +2721,18 @@ The `limits` block configures default and per-tenant limits imposed by component # CLI flag: -ingester.active-series-custom-trackers [active_series_custom_trackers: <map of tracker name (string) to matcher (string)> | default = ] +# (experimental) Non-zero value enables out-of-order support for most recent +# samples that are within the time window in relation to the following two +# conditions: (1) The newest sample for that time series, if it exists. For +# example, within [series.maxTime-timeWindow, series.maxTime]. (2) The TSDB's +# maximum time, if the series does not exist. For example, within +# [db.maxTime-timeWindow, db.maxTime]. The ingester will need more memory as a +# factor of _rate of out-of-order samples being ingested_ and _the number of +# series that are getting out-of-order samples_. You can configure it per +# tenant. +# CLI flag: -ingester.out-of-order-time-window +[out_of_order_time_window: <duration> | default = 0s] + # Maximum number of chunks that can be fetched in a single query from ingesters # and long-term storage. This limit is enforced in the querier, ruler and # store-gateway. 0 to disable. @@ -3515,11 +3527,6 @@ tsdb: # CLI flag: -blocks-storage.tsdb.isolation-enabled [isolation_enabled: <boolean> | default = false] - # (experimental) Enable querying overlapping blocks. If there are going to be - # overlapping blocks in the ingesters this should be enabled. - # CLI flag: -blocks-storage.tsdb.allow-overlapping-queries - [allow_overlapping_queries: <boolean> | default = false] - # (advanced) Max size - in bytes - of the in-memory series hash cache. The # cache is shared across all tenants and it's used only when query sharding is # enabled. @@ -3529,6 +3536,16 @@ # (advanced) limit the number of concurrently opening TSDB's on startup # CLI flag: -blocks-storage.tsdb.max-tsdb-opening-concurrency-on-startup [max_tsdb_opening_concurrency_on_startup: <int> | default = 10] + + # (experimental) Minimum capacity for out-of-order chunks, in samples between + # 0 and 255. + # CLI flag: -blocks-storage.tsdb.out-of-order-cap-min + [out_of_order_cap_min: <int> | default = 4] + + # (experimental) Maximum capacity for out of order chunks, in samples between + # 1 and 255. + # CLI flag: -blocks-storage.tsdb.out-of-order-cap-max + [out_of_order_cap_max: <int> | default = 32] ``` ### compactor diff --git a/docs/sources/operators-guide/mimir-runbooks/_index.md b/docs/sources/operators-guide/mimir-runbooks/_index.md index 8adc187d79..654caa66e7 100644 --- a/docs/sources/operators-guide/mimir-runbooks/_index.md +++ b/docs/sources/operators-guide/mimir-runbooks/_index.md @@ -1412,6 +1412,11 @@ Common **causes**: > **Note**: You can learn more about out of order samples in Prometheus, in the blog post [Debugging out of order samples](https://www.robustperception.io/debugging-out-of-order-samples/). +### err-mimir-sample-too-old + +This error is similar to `err-mimir-sample-out-of-order`. The main difference is that out-of-order support is enabled, but the sample is +older than the out-of-order time window, relative to the latest sample for that particular time series (or to the TSDB's maximum time if the series does not exist yet). + ### err-mimir-sample-duplicate-timestamp This error occurs when the ingester rejects a sample because it is a duplicate of a previously received sample with the same timestamp but different value in the same time series. 
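To make the acceptance rule described by `-ingester.out-of-order-time-window` and the `err-mimir-sample-too-old` runbook entry above concrete, here is a minimal, self-contained Go sketch of the decision. This is not Mimir's actual code path; `acceptDecision` and its arguments are illustrative names, and duplicate-timestamp handling is ignored.

```go
package main

import (
	"fmt"
	"time"
)

// acceptDecision sketches the rule described above: an out-of-order sample is
// accepted only while it still falls within [maxTime-window, maxTime], where
// maxTime is the newest timestamp of the series (or the TSDB's maximum time
// when the series does not exist yet).
func acceptDecision(sampleTs, maxTime time.Time, window time.Duration) string {
	switch {
	case sampleTs.After(maxTime):
		return "accepted: in-order sample"
	case window <= 0:
		return "rejected: sample-out-of-order (out-of-order ingestion disabled)"
	case !sampleTs.Before(maxTime.Add(-window)):
		return "accepted: out-of-order sample within the time window"
	default:
		return "rejected: sample-too-old (older than the out-of-order time window)"
	}
}

func main() {
	maxTime := time.Now()      // newest sample already ingested for the series
	window := 30 * time.Minute // e.g. -ingester.out-of-order-time-window=30m

	fmt.Println(acceptDecision(maxTime.Add(time.Minute), maxTime, window))     // in order
	fmt.Println(acceptDecision(maxTime.Add(-10*time.Minute), maxTime, window)) // within window
	fmt.Println(acceptDecision(maxTime.Add(-10*time.Minute), maxTime, 0))      // window disabled
	fmt.Println(acceptDecision(maxTime.Add(-2*time.Hour), maxTime, window))    // too old
}
```

The third and fourth calls show why the same sample can move between `sample-out-of-order`, accepted, and `sample-too-old` as the window is raised or lowered at runtime, which is the behaviour exercised by `Test_Ingester_OutOfOrder` further down in this patch.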
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index e600aa4bd1..703ba06fc4 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -85,6 +85,7 @@ const ( instanceIngestionRateTickInterval = time.Second sampleOutOfOrder = "sample-out-of-order" + sampleTooOld = "sample-too-old" newValueForTimestamp = "new-value-for-timestamp" sampleOutOfBounds = "sample-out-of-bounds" ) @@ -122,7 +123,7 @@ type Config struct { ActiveSeriesMetricsIdleTimeout time.Duration `yaml:"active_series_metrics_idle_timeout" category:"advanced"` ActiveSeriesCustomTrackers activeseries.CustomTrackersConfig `yaml:"active_series_custom_trackers" doc:"description=[Deprecated] This config has been moved to the limits config, please set it there. Additional custom trackers for active metrics. If there are active series matching a provided matcher (map value), the count will be exposed in the custom trackers metric labeled using the tracker name (map key). Zero valued counts are not exposed (and removed when they go back to zero)." category:"advanced"` - ExemplarsUpdatePeriod time.Duration `yaml:"exemplars_update_period" category:"experimental"` + TSDBConfigUpdatePeriod time.Duration `yaml:"tsdb_config_update_period" category:"experimental"` BlocksStorageConfig mimir_tsdb.BlocksStorageConfig `yaml:"-"` StreamChunksWhenUsingBlocks bool `yaml:"-" category:"advanced"` @@ -150,7 +151,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) { f.DurationVar(&cfg.ActiveSeriesMetricsIdleTimeout, "ingester.active-series-metrics-idle-timeout", 10*time.Minute, "After what time a series is considered to be inactive.") f.BoolVar(&cfg.StreamChunksWhenUsingBlocks, "ingester.stream-chunks-when-using-blocks", true, "Stream chunks from ingesters to queriers.") - f.DurationVar(&cfg.ExemplarsUpdatePeriod, "ingester.exemplars-update-period", 15*time.Second, "Period with which to update per-tenant max exemplar limit.") + f.DurationVar(&cfg.TSDBConfigUpdatePeriod, "ingester.tsdb-config-update-period", 15*time.Second, "Period with which to update the per-tenant TSDB configuration.") cfg.DefaultLimits.RegisterFlags(f) @@ -413,8 +414,8 @@ func (i *Ingester) updateLoop(ctx context.Context) error { ingestionRateTicker := time.NewTicker(instanceIngestionRateTickInterval) defer ingestionRateTicker.Stop() - exemplarUpdateTicker := time.NewTicker(i.cfg.ExemplarsUpdatePeriod) - defer exemplarUpdateTicker.Stop() + tsdbUpdateTicker := time.NewTicker(i.cfg.TSDBConfigUpdatePeriod) + defer tsdbUpdateTicker.Stop() var activeSeriesTickerChan <-chan time.Time if i.cfg.ActiveSeriesMetricsEnabled { @@ -441,8 +442,8 @@ func (i *Ingester) updateLoop(ctx context.Context) error { } i.tsdbsMtx.RUnlock() - case <-exemplarUpdateTicker.C: - i.applyExemplarsSettings() + case <-tsdbUpdateTicker.C: + i.applyTSDBSettings() case <-activeSeriesTickerChan: i.updateActiveSeries(time.Now()) @@ -495,14 +496,21 @@ func (i *Ingester) updateActiveSeries(now time.Time) { } } -// Go through all tenants and apply the current max-exemplars setting. -// If it changed, tsdb will resize the buffer; if it didn't change tsdb will return quickly. -func (i *Ingester) applyExemplarsSettings() { +// applyTSDBSettings goes through all tenants and applies +// * The current max-exemplars setting. If it changed, tsdb will resize the buffer; if it didn't change tsdb will return quickly. +// * The current out-of-order time window. If it changes from 0 to >0, then a new Write-Behind-Log gets created for that tenant. 
+func (i *Ingester) applyTSDBSettings() { for _, userID := range i.getTSDBUsers() { globalValue := i.limits.MaxGlobalExemplarsPerUser(userID) localValue := i.limiter.convertGlobalToLocalLimit(userID, globalValue) - // We populate a Config struct with just one value, which is OK - // because Head.ApplyConfig only looks at one value. + + oooTW := i.limits.OutOfOrderTimeWindow(userID) + if oooTW < 0 { + oooTW = 0 + } + + // We populate a Config struct with just TSDB related config, which is OK + // because DB.ApplyConfig only looks at the specified config. // The other fields in Config are things like Rules, Scrape // settings, which don't apply to Head. cfg := promcfg.Config{ @@ -510,13 +518,16 @@ func (i *Ingester) applyExemplarsSettings() { ExemplarsConfig: &promcfg.ExemplarsConfig{ MaxExemplars: int64(localValue), }, + TSDBConfig: &promcfg.TSDBConfig{ + OutOfOrderAllowance: time.Duration(oooTW).Milliseconds(), + }, }, } - tsdb := i.getTSDB(userID) - if tsdb == nil { + db := i.getTSDB(userID) + if db == nil { continue } - if err := tsdb.Head().ApplyConfig(&cfg); err != nil { + if err := db.db.ApplyConfig(&cfg); err != nil { level.Error(i.logger).Log("msg", "failed to apply config to TSDB", "user", userID, "err", err) } } @@ -599,6 +610,7 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReques startAppend = time.Now() sampleOutOfBoundsCount = 0 sampleOutOfOrderCount = 0 + sampleTooOldCount = 0 newValueForTimestampCount = 0 perUserSeriesLimitCount = 0 perMetricSeriesLimitCount = 0 @@ -620,12 +632,17 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReques otlog.Int("numseries", len(req.Timeseries))) } + oooTW := i.limits.OutOfOrderTimeWindow(userID) for _, ts := range req.Timeseries { // The labels must be sorted (in our case, it's guaranteed a write request // has sorted labels once hit the ingester). - // Fast path in case we only have samples and they are all out of bounds. - if minAppendTimeAvailable && len(ts.Samples) > 0 && len(ts.Exemplars) == 0 && allOutOfBounds(ts.Samples, minAppendTime) { + // Fast path in case we only have samples and they are all out of bound + // and out-of-order support is not enabled. + // TODO(jesus.vazquez) If we had too many old samples we might want to + // extend the fast path to fail early. 
+ if oooTW <= 0 && minAppendTimeAvailable && + len(ts.Samples) > 0 && len(ts.Exemplars) == 0 && allOutOfBounds(ts.Samples, minAppendTime) { failedSamplesCount += len(ts.Samples) sampleOutOfBoundsCount += len(ts.Samples) @@ -678,6 +695,11 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReques updateFirstPartial(func() error { return newIngestErrSampleOutOfOrder(model.Time(s.TimestampMs), ts.Labels) }) continue + case storage.ErrTooOldSample: + sampleTooOldCount++ + updateFirstPartial(func() error { return newIngestErrSampleTooOld(model.Time(s.TimestampMs), ts.Labels) }) + continue + case storage.ErrDuplicateSampleForTimestamp: newValueForTimestampCount++ updateFirstPartial(func() error { @@ -781,6 +803,9 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReques if sampleOutOfOrderCount > 0 { validation.DiscardedSamples.WithLabelValues(sampleOutOfOrder, userID).Add(float64(sampleOutOfOrderCount)) } + if sampleTooOldCount > 0 { + validation.DiscardedSamples.WithLabelValues(sampleTooOld, userID).Add(float64(sampleTooOldCount)) + } if newValueForTimestampCount > 0 { validation.DiscardedSamples.WithLabelValues(newValueForTimestamp, userID).Add(float64(newValueForTimestampCount)) } @@ -1453,6 +1478,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { } maxExemplars := i.limiter.convertGlobalToLocalLimit(userID, i.limits.MaxGlobalExemplarsPerUser(userID)) + oooTW := time.Duration(i.limits.OutOfOrderTimeWindow(userID)) // Create a new user database db, err := tsdb.Open(udir, userLogger, tsdbPromReg, &tsdb.Options{ RetentionDuration: i.cfg.BlocksStorageConfig.TSDB.Retention.Milliseconds(), @@ -1473,8 +1499,11 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { IsolationDisabled: !i.cfg.BlocksStorageConfig.TSDB.IsolationEnabled, HeadChunksWriteQueueSize: i.cfg.BlocksStorageConfig.TSDB.HeadChunksWriteQueueSize, NewChunkDiskMapper: i.cfg.BlocksStorageConfig.TSDB.NewChunkDiskMapper, - AllowOverlappingQueries: i.cfg.BlocksStorageConfig.TSDB.AllowOverlappingQueries, - AllowOverlappingCompaction: false, // always false since Mimir only uploads lvl 1 compacted blocks + AllowOverlappingQueries: true, // We can have overlapping blocks from past or out-of-order enabled during runtime. + AllowOverlappingCompaction: false, // always false since Mimir only uploads lvl 1 compacted blocks + OutOfOrderAllowance: oooTW.Milliseconds(), // The unit must be same as our timestamps. 
+ OutOfOrderCapMin: int64(i.cfg.BlocksStorageConfig.TSDB.OutOfOrderCapMin), + OutOfOrderCapMax: int64(i.cfg.BlocksStorageConfig.TSDB.OutOfOrderCapMax), }, nil) if err != nil { return nil, errors.Wrapf(err, "failed to open TSDB: %s", udir) @@ -2081,7 +2110,11 @@ func newIngestErrSampleTimestampTooOld(timestamp model.Time, labels []mimirpb.La } func newIngestErrSampleOutOfOrder(timestamp model.Time, labels []mimirpb.LabelAdapter) error { - return newIngestErr(globalerror.SampleOutOfOrder, "the sample has been rejected because another sample with a more recent timestamp has already been ingested and out of order samples are not allowed", timestamp, labels) + return newIngestErr(globalerror.SampleOutOfOrder, "the sample has been rejected because another sample with a more recent timestamp has already been ingested and out-of-order samples are not allowed", timestamp, labels) +} + +func newIngestErrSampleTooOld(timestamp model.Time, labels []mimirpb.LabelAdapter) error { + return newIngestErr(globalerror.SampleTooOld, "the sample has been rejected because another sample with a more recent timestamp has already been ingested and this sample is beyond the out-of-order time window", timestamp, labels) } func newIngestErrSampleDuplicateTimestamp(timestamp model.Time, labels []mimirpb.LabelAdapter) error { diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index eb4d655e12..2984289847 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -257,7 +257,7 @@ func TestIngester_Push(t *testing.T) { # TYPE cortex_ingester_tsdb_exemplar_last_exemplars_timestamp_seconds gauge cortex_ingester_tsdb_exemplar_last_exemplars_timestamp_seconds{user="test"} 1 - # HELP cortex_ingester_tsdb_exemplar_out_of_order_exemplars_total Total number of out of order exemplar ingestion failed attempts. + # HELP cortex_ingester_tsdb_exemplar_out_of_order_exemplars_total Total number of out-of-order exemplar ingestion failed attempts. # TYPE cortex_ingester_tsdb_exemplar_out_of_order_exemplars_total counter cortex_ingester_tsdb_exemplar_out_of_order_exemplars_total 0 `, @@ -303,7 +303,7 @@ func TestIngester_Push(t *testing.T) { cortex_ingester_memory_series_removed_total{user="test"} 0 `, }, - "should soft fail on sample out of order": { + "should soft fail on sample out-of-order": { reqs: []*mimirpb.WriteRequest{ mimirpb.ToWriteRequest( []labels.Labels{metricLabels}, @@ -574,7 +574,7 @@ func TestIngester_Push(t *testing.T) { # TYPE cortex_ingester_tsdb_exemplar_last_exemplars_timestamp_seconds gauge cortex_ingester_tsdb_exemplar_last_exemplars_timestamp_seconds{user="test"} 0 - # HELP cortex_ingester_tsdb_exemplar_out_of_order_exemplars_total Total number of out of order exemplar ingestion failed attempts. + # HELP cortex_ingester_tsdb_exemplar_out_of_order_exemplars_total Total number of out-of-order exemplar ingestion failed attempts. # TYPE cortex_ingester_tsdb_exemplar_out_of_order_exemplars_total counter cortex_ingester_tsdb_exemplar_out_of_order_exemplars_total 0 `, @@ -1098,7 +1098,7 @@ func Benchmark_Ingester_PushOnError(b *testing.B) { } }, }, - "out of order samples": { + "out-of-order samples": { prepareConfig: func(limits *validation.Limits, instanceLimits *InstanceLimits) bool { return true }, beforeBenchmark: func(b *testing.B, ingester *Ingester, numSeriesPerRequest int) { // For each series, push a single sample with a timestamp greater than next pushes. 
@@ -1117,7 +1117,7 @@ func Benchmark_Ingester_PushOnError(b *testing.B) { runBenchmark: func(b *testing.B, ingester *Ingester, metrics []labels.Labels, samples []mimirpb.Sample) { expectedErr := storage.ErrOutOfOrderSample.Error() - // Push out of order samples. + // Push out-of-order samples. for n := 0; n < b.N; n++ { _, err := ingester.Push(ctx, mimirpb.ToWriteRequest(metrics, samples, nil, nil, mimirpb.API)) // nolint:errcheck @@ -5757,6 +5757,132 @@ func TestGetIgnoreSeriesLimitForMetricNamesMap(t *testing.T) { require.Equal(t, map[string]struct{}{"foo": {}, "bar": {}}, cfg.getIgnoreSeriesLimitForMetricNamesMap()) } +// Test_Ingester_OutOfOrder tests basic ingestion and query of out-of-order samples. +// It also tests if the OutOfOrderTimeWindow gets changed during runtime. +// The correctness of changed runtime is already tested in Prometheus, so we only check if the +// change is being applied here. +func Test_Ingester_OutOfOrder(t *testing.T) { + cfg := defaultIngesterTestConfig(t) + cfg.TSDBConfigUpdatePeriod = 1 * time.Second + + l := defaultLimitsTestConfig() + tenantOverride := new(TenantLimitsMock) + tenantOverride.On("ByUserID", "test").Return(nil) + override, err := validation.NewOverrides(l, tenantOverride) + require.NoError(t, err) + + setOOOTimeWindow := func(oooTW model.Duration) { + tenantOverride.ExpectedCalls = nil + tenantOverride.On("ByUserID", "test").Return(&validation.Limits{ + OutOfOrderTimeWindow: oooTW, + }) + // TSDB config is updated every second. + <-time.After(1500 * time.Millisecond) + } + + i, err := prepareIngesterWithBlockStorageAndOverrides(t, cfg, override, "", nil) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + + // Wait until it's healthy + test.Poll(t, 1*time.Second, 1, func() interface{} { + return i.lifecycler.HealthyInstancesCount() + }) + + ctx := user.InjectOrgID(context.Background(), "test") + + pushSamples := func(start, end int64, expErr bool) { + start = start * time.Minute.Milliseconds() + end = end * time.Minute.Milliseconds() + + s := labels.FromStrings(labels.MetricName, "test_1", "status", "200") + var samples []mimirpb.Sample + var lbls []labels.Labels + for ts := start; ts <= end; ts += time.Minute.Milliseconds() { + samples = append(samples, mimirpb.Sample{ + TimestampMs: ts, + Value: float64(ts), + }) + lbls = append(lbls, s) + } + + wReq := mimirpb.ToWriteRequest(lbls, samples, nil, nil, mimirpb.API) + _, err = i.Push(ctx, wReq) + if expErr { + require.Error(t, err, "should have failed on push") + require.ErrorAs(t, err, &storage.ErrTooOldSample) + } else { + require.NoError(t, err) + } + } + + verifySamples := func(start, end int64) { + start = start * time.Minute.Milliseconds() + end = end * time.Minute.Milliseconds() + + var expSamples []model.SamplePair + for ts := start; ts <= end; ts += time.Minute.Milliseconds() { + expSamples = append(expSamples, model.SamplePair{ + Timestamp: model.Time(ts), + Value: model.SampleValue(ts), + }) + } + expMatrix := model.Matrix{{ + Metric: model.Metric{"__name__": "test_1", "status": "200"}, + Values: expSamples, + }} + + req := &client.QueryRequest{ + StartTimestampMs: math.MinInt64, + EndTimestampMs: math.MaxInt64, + Matchers: []*client.LabelMatcher{ + {Type: client.EQUAL, Name: model.MetricNameLabel, Value: "test_1"}, + }, + } + + s := stream{ctx: ctx} + err = i.QueryStream(req, &s) + require.NoError(t, err) + + res, err := 
chunkcompat.StreamsToMatrix(model.Earliest, model.Latest, s.responses) + require.NoError(t, err) + assert.ElementsMatch(t, expMatrix, res) + } + + // Push first in-order sample at minute 100. + pushSamples(100, 100, false) + verifySamples(100, 100) + + // OOO is not enabled. So it errors out. No sample ingested. + pushSamples(90, 99, true) + verifySamples(100, 100) + + // Increasing the OOO time window. + setOOOTimeWindow(model.Duration(30 * time.Minute)) + // Now it works. + pushSamples(90, 99, false) + verifySamples(90, 100) + + // Gives an error for sample 69 since it's outside time window, but rest is ingested. + pushSamples(69, 99, true) + verifySamples(70, 100) + + // All beyond the ooo time window. None ingested. + pushSamples(50, 69, true) + verifySamples(70, 100) + + // Increase the time window again. It works. + setOOOTimeWindow(model.Duration(60 * time.Minute)) + pushSamples(50, 69, false) + verifySamples(50, 100) + + // Decrease the time window again. Same push should fail. + setOOOTimeWindow(model.Duration(30 * time.Minute)) + pushSamples(50, 69, true) + verifySamples(50, 100) +} + func TestNewIngestErrMsgs(t *testing.T) { timestamp := model.Time(1575043969) metricLabelAdapters := []mimirpb.LabelAdapter{{Name: labels.MetricName, Value: "test"}} @@ -5771,7 +5897,7 @@ func TestNewIngestErrMsgs(t *testing.T) { }, "newIngestErrSampleOutOfOrder": { err: newIngestErrSampleOutOfOrder(timestamp, metricLabelAdapters), - msg: `the sample has been rejected because another sample with a more recent timestamp has already been ingested and out of order samples are not allowed (err-mimir-sample-out-of-order). The affected sample has timestamp 1970-01-19T05:30:43.969Z and is from series {__name__="test"}`, + msg: `the sample has been rejected because another sample with a more recent timestamp has already been ingested and out-of-order samples are not allowed (err-mimir-sample-out-of-order). 
The affected sample has timestamp 1970-01-19T05:30:43.969Z and is from series {__name__="test"}`, }, "newIngestErrSampleDuplicateTimestamp": { err: newIngestErrSampleDuplicateTimestamp(timestamp, metricLabelAdapters), diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index 361130d7ec..a153610bdf 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -323,6 +323,8 @@ type tsdbMetrics struct { tsdbTimeRetentionCount *prometheus.Desc tsdbBlocksBytes *prometheus.Desc + tsdbOOOAppendedSamples *prometheus.Desc + checkpointDeleteFail *prometheus.Desc checkpointDeleteTotal *prometheus.Desc checkpointCreationFail *prometheus.Desc @@ -436,7 +438,7 @@ func newTSDBMetrics(r prometheus.Registerer) *tsdbMetrics { []string{"operation"}, nil), tsdbOOOHistogram: prometheus.NewDesc( "cortex_ingester_tsdb_sample_out_of_order_delta_seconds", - "Delta in seconds by which a sample is considered out of order.", + "Delta in seconds by which a sample is considered out-of-order.", nil, nil), tsdbLoadedBlocks: prometheus.NewDesc( "cortex_ingester_tsdb_blocks_loaded", @@ -502,7 +504,12 @@ func newTSDBMetrics(r prometheus.Registerer) *tsdbMetrics { []string{"user"}, nil), tsdbExemplarsOutOfOrder: prometheus.NewDesc( "cortex_ingester_tsdb_exemplar_out_of_order_exemplars_total", - "Total number of out of order exemplar ingestion failed attempts.", + "Total number of out-of-order exemplar ingestion failed attempts.", + nil, nil), + + tsdbOOOAppendedSamples: prometheus.NewDesc( + "cortex_ingester_tsdb_out_of_order_samples_appended_total", + "Total number of out-of-order samples appended.", nil, nil), memSeriesCreatedTotal: prometheus.NewDesc( @@ -565,6 +572,8 @@ func (sm *tsdbMetrics) Describe(out chan<- *prometheus.Desc) { out <- sm.tsdbExemplarLastTs out <- sm.tsdbExemplarsOutOfOrder + out <- sm.tsdbOOOAppendedSamples + out <- sm.memSeriesCreatedTotal out <- sm.memSeriesRemovedTotal } @@ -615,6 +624,8 @@ func (sm *tsdbMetrics) Collect(out chan<- prometheus.Metric) { data.SendSumOfGaugesPerUser(out, sm.tsdbExemplarLastTs, "prometheus_tsdb_exemplar_last_exemplars_timestamp_seconds") data.SendSumOfCounters(out, sm.tsdbExemplarsOutOfOrder, "prometheus_tsdb_exemplar_out_of_order_exemplars_total") + data.SendSumOfCounters(out, sm.tsdbOOOAppendedSamples, "prometheus_tsdb_head_out_of_order_samples_appended_total") + data.SendSumOfCountersPerUser(out, sm.memSeriesCreatedTotal, "prometheus_tsdb_head_series_created_total") data.SendSumOfCountersPerUser(out, sm.memSeriesRemovedTotal, "prometheus_tsdb_head_series_removed_total") } diff --git a/pkg/ingester/metrics_test.go b/pkg/ingester/metrics_test.go index 2d9dddafda..0942dafb9d 100644 --- a/pkg/ingester/metrics_test.go +++ b/pkg/ingester/metrics_test.go @@ -187,7 +187,7 @@ func TestTSDBMetrics(t *testing.T) { # TYPE cortex_ingester_tsdb_reloads_total counter cortex_ingester_tsdb_reloads_total 30 - # HELP cortex_ingester_tsdb_sample_out_of_order_delta_seconds Delta in seconds by which a sample is considered out of order. + # HELP cortex_ingester_tsdb_sample_out_of_order_delta_seconds Delta in seconds by which a sample is considered out-of-order. 
# TYPE cortex_ingester_tsdb_sample_out_of_order_delta_seconds histogram # observations buckets # 600 @@ -225,7 +225,7 @@ func TestTSDBMetrics(t *testing.T) { cortex_ingester_tsdb_exemplar_last_exemplars_timestamp_seconds{user="user2"} 1234 cortex_ingester_tsdb_exemplar_last_exemplars_timestamp_seconds{user="user3"} 1234 - # HELP cortex_ingester_tsdb_exemplar_out_of_order_exemplars_total Total number of out of order exemplar ingestion failed attempts. + # HELP cortex_ingester_tsdb_exemplar_out_of_order_exemplars_total Total number of out-of-order exemplar ingestion failed attempts. # TYPE cortex_ingester_tsdb_exemplar_out_of_order_exemplars_total counter cortex_ingester_tsdb_exemplar_out_of_order_exemplars_total 9 @@ -241,6 +241,10 @@ func TestTSDBMetrics(t *testing.T) { cortex_ingester_tsdb_exemplar_exemplars_appended_total{user="user2"} 100 cortex_ingester_tsdb_exemplar_exemplars_appended_total{user="user3"} 100 + # HELP cortex_ingester_tsdb_out_of_order_samples_appended_total Total number of out-of-order samples appended. + # TYPE cortex_ingester_tsdb_out_of_order_samples_appended_total counter + cortex_ingester_tsdb_out_of_order_samples_appended_total 9 + # HELP cortex_ingester_tsdb_exemplar_exemplars_in_storage Number of TSDB exemplars currently in storage. # TYPE cortex_ingester_tsdb_exemplar_exemplars_in_storage gauge cortex_ingester_tsdb_exemplar_exemplars_in_storage 30 @@ -417,7 +421,7 @@ func TestTSDBMetricsWithRemoval(t *testing.T) { # TYPE cortex_ingester_tsdb_reloads_total counter cortex_ingester_tsdb_reloads_total 30 - # HELP cortex_ingester_tsdb_sample_out_of_order_delta_seconds Delta in seconds by which a sample is considered out of order. + # HELP cortex_ingester_tsdb_sample_out_of_order_delta_seconds Delta in seconds by which a sample is considered out-of-order. # TYPE cortex_ingester_tsdb_sample_out_of_order_delta_seconds histogram # observations buckets # 600 @@ -452,7 +456,7 @@ func TestTSDBMetricsWithRemoval(t *testing.T) { cortex_ingester_tsdb_exemplar_last_exemplars_timestamp_seconds{user="user1"} 1234 cortex_ingester_tsdb_exemplar_last_exemplars_timestamp_seconds{user="user2"} 1234 - # HELP cortex_ingester_tsdb_exemplar_out_of_order_exemplars_total Total number of out of order exemplar ingestion failed attempts. + # HELP cortex_ingester_tsdb_exemplar_out_of_order_exemplars_total Total number of out-of-order exemplar ingestion failed attempts. # TYPE cortex_ingester_tsdb_exemplar_out_of_order_exemplars_total counter cortex_ingester_tsdb_exemplar_out_of_order_exemplars_total 9 @@ -469,6 +473,10 @@ func TestTSDBMetricsWithRemoval(t *testing.T) { # HELP cortex_ingester_tsdb_exemplar_exemplars_in_storage Number of TSDB exemplars currently in storage. # TYPE cortex_ingester_tsdb_exemplar_exemplars_in_storage gauge cortex_ingester_tsdb_exemplar_exemplars_in_storage 20 + + # HELP cortex_ingester_tsdb_out_of_order_samples_appended_total Total number of out-of-order samples appended. 
+ # TYPE cortex_ingester_tsdb_out_of_order_samples_appended_total counter + cortex_ingester_tsdb_out_of_order_samples_appended_total 9 `)) require.NoError(t, err) } @@ -638,7 +646,7 @@ func populateTSDBMetrics(base float64) *prometheus.Registry { tsdbOOOHistogram := promauto.With(r).NewHistogram(prometheus.HistogramOpts{ Name: "prometheus_tsdb_sample_ooo_delta", - Help: "Delta in seconds by which a sample is considered out of order.", + Help: "Delta in seconds by which a sample is considered out-of-order.", Buckets: []float64{60 * 10, 60 * 60 * 24}, // for testing: 3 buckets: 10 min, 24 hour, and inf }) tsdbOOOHistogram.Observe(7 * base) @@ -731,9 +739,21 @@ func populateTSDBMetrics(base float64) *prometheus.Registry { exemplarsOutOfOrderTotal := promauto.With(r).NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_exemplar_out_of_order_exemplars_total", - Help: "Total number of out of order exemplar ingestion failed attempts.", + Help: "Total number of out-of-order exemplar ingestion failed attempts.", }) exemplarsOutOfOrderTotal.Add(3) + outOfOrderSamplesAppendedTotal := promauto.With(r).NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_head_out_of_order_samples_appended_total", + Help: "Total number of appended out-of-order samples.", + }) + outOfOrderSamplesAppendedTotal.Add(3) + + tooOldSamplesTotal := promauto.With(r).NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_too_old_samples_total", + Help: "Total number of out-of-order samples ingestion failed attempts.", + }) + tooOldSamplesTotal.Add(3) + return r } diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index b25a6f7045..bdecd8840d 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -164,7 +164,6 @@ type TSDBConfig struct { HeadChunksWriteQueueSize int `yaml:"head_chunks_write_queue_size" category:"experimental"` NewChunkDiskMapper bool `yaml:"new_chunk_disk_mapper" category:"experimental"` IsolationEnabled bool `yaml:"isolation_enabled" category:"advanced"` // TODO Remove in Mimir 2.3.0 - AllowOverlappingQueries bool `yaml:"allow_overlapping_queries" category:"experimental"` // Series hash cache. SeriesHashCacheMaxBytes uint64 `yaml:"series_hash_cache_max_size_bytes" category:"advanced"` @@ -178,6 +177,10 @@ type TSDBConfig struct { // How often to check for idle TSDBs for closing. DefaultCloseIdleTSDBInterval is not suitable for testing, so tests can override. CloseIdleTSDBInterval time.Duration `yaml:"-"` + + // For experimental out of order metrics support. + OutOfOrderCapMin int `yaml:"out_of_order_cap_min" category:"experimental"` + OutOfOrderCapMax int `yaml:"out_of_order_cap_max" category:"experimental"` } // RegisterFlags registers the TSDBConfig flags. @@ -207,7 +210,8 @@ func (cfg *TSDBConfig) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.HeadChunksWriteQueueSize, "blocks-storage.tsdb.head-chunks-write-queue-size", 0, "The size of the write queue used by the head chunks mapper. Lower values reduce memory utilisation at the cost of potentially higher ingest latency. Value of 0 switches chunks mapper to implementation without a queue. 
This flag is only used if the new chunk disk mapper is enabled with -blocks-storage.tsdb.new-chunk-disk-mapper.") f.BoolVar(&cfg.NewChunkDiskMapper, "blocks-storage.tsdb.new-chunk-disk-mapper", false, "Temporary flag to select whether to use the new (used in upstream Prometheus) or the old (legacy) chunk disk mapper.") f.BoolVar(&cfg.IsolationEnabled, "blocks-storage.tsdb.isolation-enabled", false, "[Deprecated] Enables TSDB isolation feature. Disabling may improve performance.") - f.BoolVar(&cfg.AllowOverlappingQueries, "blocks-storage.tsdb.allow-overlapping-queries", false, "Enable querying overlapping blocks. If there are going to be overlapping blocks in the ingesters this should be enabled.") + f.IntVar(&cfg.OutOfOrderCapMin, "blocks-storage.tsdb.out-of-order-cap-min", 4, "Minimum capacity for out-of-order chunks, in samples between 0 and 255.") + f.IntVar(&cfg.OutOfOrderCapMax, "blocks-storage.tsdb.out-of-order-cap-max", 32, "Maximum capacity for out of order chunks, in samples between 1 and 255.") } // Validate the config. diff --git a/pkg/util/globalerror/errors.go b/pkg/util/globalerror/errors.go index 1fa5bb1aa9..9d14daf898 100644 --- a/pkg/util/globalerror/errors.go +++ b/pkg/util/globalerror/errors.go @@ -55,6 +55,7 @@ const ( SampleTimestampTooOld ID = "sample-timestamp-too-old" SampleOutOfOrder ID = "sample-out-of-order" + SampleTooOld ID = "sample-too-old" SampleDuplicateTimestamp ID = "sample-duplicate-timestamp" ExemplarSeriesMissing ID = "exemplar-series-missing" ) diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index c2c6f69db0..9485f8a531 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -95,6 +95,8 @@ type Limits struct { // TODO remove this with Mimir version 2.4 ActiveSeriesCustomTrackersConfigOld activeseries.CustomTrackersConfig `yaml:"active_series_custom_trackers_config" json:"active_series_custom_trackers_config" doc:"hidden"` ActiveSeriesCustomTrackersConfig activeseries.CustomTrackersConfig `yaml:"active_series_custom_trackers" json:"active_series_custom_trackers" doc:"description=Additional custom trackers for active metrics. If there are active series matching a provided matcher (map value), the count will be exposed in the custom trackers metric labeled using the tracker name (map key). Zero valued counts are not exposed (and removed when they go back to zero)." category:"advanced"` + // Max allowed time window for out-of-order samples. + OutOfOrderTimeWindow model.Duration `yaml:"out_of_order_time_window" json:"out_of_order_time_window" category:"experimental"` // Querier enforced limits. MaxChunksPerQuery int `yaml:"max_fetched_chunks_per_query" json:"max_fetched_chunks_per_query"` @@ -178,6 +180,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.MaxGlobalMetadataPerMetric, MaxMetadataPerMetricFlag, 0, "The maximum number of metadata per metric, across the cluster. 0 to disable.") f.IntVar(&l.MaxGlobalExemplarsPerUser, "ingester.max-global-exemplars-per-user", 0, "The maximum number of exemplars in memory, across the cluster. 0 to disable exemplars ingestion.") f.Var(&l.ActiveSeriesCustomTrackersConfig, "ingester.active-series-custom-trackers", "Additional active series metrics, matching the provided matchers. Matchers should be in form <name>:<matcher>, like 'foobar:{foo=\"bar\"}'. 
Multiple matchers can be provided either providing the flag multiple times or providing multiple semicolon-separated values to a single flag.") + f.Var(&l.OutOfOrderTimeWindow, "ingester.out-of-order-time-window", "Non-zero value enables out-of-order support for most recent samples that are within the time window in relation to the following two conditions: (1) The newest sample for that time series, if it exists. For example, within [series.maxTime-timeWindow, series.maxTime]. (2) The TSDB's maximum time, if the series does not exist. For example, within [db.maxTime-timeWindow, db.maxTime]. The ingester will need more memory as a factor of _rate of out-of-order samples being ingested_ and _the number of series that are getting out-of-order samples_. You can configure it per tenant.") f.IntVar(&l.MaxChunksPerQuery, MaxChunksPerQueryFlag, 2e6, "Maximum number of chunks that can be fetched in a single query from ingesters and long-term storage. This limit is enforced in the querier, ruler and store-gateway. 0 to disable.") f.IntVar(&l.MaxFetchedSeriesPerQuery, MaxSeriesPerQueryFlag, 0, "The maximum number of unique series for which a query can fetch samples from each ingesters and storage. This limit is enforced in the querier and ruler. 0 to disable") @@ -499,6 +502,11 @@ func (o *Overrides) ActiveSeriesCustomTrackersConfig(userID string) activeseries return o.getOverridesForUser(userID).ActiveSeriesCustomTrackersConfig } +// OutOfOrderTimeWindow returns the out-of-order time window for the user. +func (o *Overrides) OutOfOrderTimeWindow(userID string) model.Duration { + return o.getOverridesForUser(userID).OutOfOrderTimeWindow +} + // IngestionTenantShardSize returns the ingesters shard size for a given user. func (o *Overrides) IngestionTenantShardSize(userID string) int { return o.getOverridesForUser(userID).IngestionTenantShardSize
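The new limit is resolved per tenant through `Overrides.OutOfOrderTimeWindow` (added above), so it can be raised or lowered at runtime and is picked up by `applyTSDBSettings` on the next `-ingester.tsdb-config-update-period` tick. Below is a rough, hedged sketch of resolving the limit against cluster-wide defaults; it assumes it is built inside the Mimir module, and the tenant ID and window value are made up. Per-tenant values would normally come from the runtime overrides mechanism rather than being set in code.

```go
package main

import (
	"fmt"
	"time"

	"github.com/grafana/mimir/pkg/util/validation"
	"github.com/prometheus/common/model"
)

func main() {
	// Cluster-wide defaults; out_of_order_time_window defaults to 0s (disabled).
	defaults := validation.Limits{}
	defaults.OutOfOrderTimeWindow = model.Duration(30 * time.Minute)

	// nil TenantLimits means no per-tenant runtime overrides are supplied here.
	overrides, err := validation.NewOverrides(defaults, nil)
	if err != nil {
		panic(err)
	}

	// The ingester reads this per tenant in applyTSDBSettings and passes it to
	// the TSDB as OutOfOrderAllowance (in milliseconds).
	window := time.Duration(overrides.OutOfOrderTimeWindow("tenant-1"))
	fmt.Println("out-of-order time window for tenant-1:", window)
}
```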