
Add ephemeral storage to ingester #3922

Merged · 18 commits · Jan 16, 2023
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -2794,6 +2794,17 @@
"fieldFlag": "ingester.ignore-series-limit-for-metric-names",
"fieldType": "string",
"fieldCategory": "advanced"
},
{
"kind": "field",
"name": "ephemeral_series_retention_period",
"required": false,
"desc": "Retention of ephemeral series.",
"fieldValue": null,
"fieldDefaultValue": 600000000000,
"fieldFlag": "ingester.ephemeral-series-retention-period",
"fieldType": "duration",
"fieldCategory": "advanced"
}
],
"fieldValue": null,
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -1003,6 +1003,8 @@ Usage of ./cmd/mimir/mimir:
Override the default minimum TLS version. Allowed values: VersionTLS10, VersionTLS11, VersionTLS12, VersionTLS13
-ingester.client.tls-server-name string
Override the expected name on the server certificate.
-ingester.ephemeral-series-retention-period duration
Retention of ephemeral series. (default 10m0s)
-ingester.ignore-series-limit-for-metric-names string
Comma-separated list of metric names, for which the -ingester.max-global-series-per-metric limit will be ignored. Does not affect the -ingester.max-global-series-per-user limit.
-ingester.instance-limits.max-inflight-push-requests int
@@ -856,6 +856,10 @@ instance_limits:
# the -ingester.max-global-series-per-user limit.
# CLI flag: -ingester.ignore-series-limit-for-metric-names
[ignore_series_limit_for_metric_names: <string> | default = ""]

# (advanced) Retention of ephemeral series.
# CLI flag: -ingester.ephemeral-series-retention-period
[ephemeral_series_retention_period: <duration> | default = 10m]
Collaborator:
I would have expected to see this close to -blocks-storage.tsdb.retention-period. Have you considered having something like -blocks-storage.ephemeral-tsdb.retention-period?

Member (author):
Funny that you ask. I originally started with a similar flag, but then I found the metadata retention flag and copied that instead. I don't have a strong opinion about naming; I will switch to -blocks-storage.ephemeral-tsdb.retention-period.
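
For illustration, here is a minimal sketch of how the renamed flag could be registered, assuming a hypothetical EphemeralTSDBConfig grouping under the blocks-storage config; only the flag name comes from the discussion above, the struct and field names are made up.

```go
package main

import (
	"flag"
	"fmt"
	"time"
)

// EphemeralTSDBConfig is a hypothetical grouping for ephemeral-storage options,
// mirroring how the existing blocks-storage TSDB options are grouped.
type EphemeralTSDBConfig struct {
	RetentionPeriod time.Duration
}

// RegisterFlags registers the retention flag under the name suggested above.
func (c *EphemeralTSDBConfig) RegisterFlags(f *flag.FlagSet) {
	f.DurationVar(&c.RetentionPeriod, "blocks-storage.ephemeral-tsdb.retention-period", 10*time.Minute,
		"Retention of ephemeral series.")
}

func main() {
	var cfg EphemeralTSDBConfig
	fs := flag.NewFlagSet("mimir", flag.ContinueOnError)
	cfg.RegisterFlags(fs)
	_ = fs.Parse([]string{"-blocks-storage.ephemeral-tsdb.retention-period=5m"})
	fmt.Println(cfg.RetentionPeriod) // 5m0s
}
```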


### querier
197 changes: 156 additions & 41 deletions pkg/ingester/ingester.go
@@ -103,6 +103,9 @@ const (
tenantsWithOutOfOrderEnabledStatName = "ingester_ooo_enabled_tenants"
minOutOfOrderTimeWindowSecondsStatName = "ingester_ooo_min_window"
maxOutOfOrderTimeWindowSecondsStatName = "ingester_ooo_max_window"

// Prefix used in Prometheus registry for ephemeral storage.
ephemeralPrometheusMetricsPrefix = "ephemeral_"
)

// BlocksUploader interface is used to have an easy way to mock it in tests.
@@ -148,6 +151,8 @@ type Config struct {
InstanceLimitsFn func() *InstanceLimits `yaml:"-"`

IgnoreSeriesLimitForMetricNames string `yaml:"ignore_series_limit_for_metric_names" category:"advanced"`

EphemeralSeriesRetentionPeriod time.Duration `yaml:"ephemeral_series_retention_period" category:"advanced"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
@@ -167,6 +172,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
cfg.DefaultLimits.RegisterFlags(f)

f.StringVar(&cfg.IgnoreSeriesLimitForMetricNames, "ingester.ignore-series-limit-for-metric-names", "", "Comma-separated list of metric names, for which the -ingester.max-global-series-per-metric limit will be ignored. Does not affect the -ingester.max-global-series-per-user limit.")
f.DurationVar(&cfg.EphemeralSeriesRetentionPeriod, "ingester.ephemeral-series-retention-period", 10*time.Minute, "Retention of ephemeral series.")
}

func (cfg *Config) getIgnoreSeriesLimitForMetricNamesMap() map[string]struct{} {
@@ -306,6 +312,11 @@ func New(cfg Config, limits *validation.Overrides, activeGroupsCleanupService *u
Help: "The current number of series in memory.",
}, i.getMemorySeriesMetric)

promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{
Name: "cortex_ingester_ephemeral_series",
Collaborator:
[question] Have you considered adding a memory_ prefix to clarify they're in-memory, similar to cortex_ingester_memory_series?

Suggested change:
- Name: "cortex_ingester_ephemeral_series",
+ Name: "cortex_ingester_memory_ephemeral_series",

Help: "The current number of ephemeral series in memory.",
}, i.getEphemeralSeriesMetric)

promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{
Name: "cortex_ingester_oldest_unshipped_block_timestamp_seconds",
Help: "Unix timestamp of the oldest TSDB block not shipped to the storage yet. 0 if ingester has no blocks or all blocks have been shipped.",
@@ -678,7 +689,7 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, pushReq *push.Request) (
}

// Early exit if no timeseries in request - don't create a TSDB or an appender.
if len(req.Timeseries) == 0 {
if len(req.Timeseries) == 0 && len(req.EphemeralTimeseries) == 0 {
return &mimirpb.WriteResponse{}, nil
}

@@ -701,7 +712,7 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, pushReq *push.Request) (

// Keep track of some stats which are tracked only if the samples will be
// successfully committed
stats pushStats
stats, ephemeralStats pushStats
Collaborator:
[nit] What if we rename stats to persistentStats? That's what we're going to call the standard TSDB storage, right?

Member (author):
Sounds good. Yes, we use "persistent storage" to refer to the original TSDB-based storage.


minAppendTime, minAppendTimeAvailable = db.Head().AppendableMinValidTime()

Expand All @@ -714,16 +725,54 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, pushReq *push.Request) (
)

// Walk the samples, appending them to the users database
app := db.Appender(ctx).(extendedAppender)
level.Debug(spanlog).Log("event", "got appender", "numSeries", len(req.Timeseries))
var persistentApp, ephemeralApp extendedAppender

var activeSeries *activeseries.ActiveSeries
if i.cfg.ActiveSeriesMetricsEnabled {
activeSeries = db.activeSeries
rollback := func() {
if persistentApp != nil {
if err := persistentApp.Rollback(); err != nil {
level.Warn(i.logger).Log("msg", "failed to rollback on error", "user", userID, "err", err)
Collaborator:
[nit] I would clarify it.

Suggested change:
- level.Warn(i.logger).Log("msg", "failed to rollback on error", "user", userID, "err", err)
+ level.Warn(i.logger).Log("msg", "failed to rollback persistent appender on error", "user", userID, "err", err)

}
}
if ephemeralApp != nil {
if err := ephemeralApp.Rollback(); err != nil {
level.Warn(i.logger).Log("msg", "failed to rollback ephemeral appender on error", "user", userID, "err", err)
}
}
}
err = i.pushSamplesToAppender(userID, req.Timeseries, app, startAppend, &stats, updateFirstPartial, activeSeries, i.limits.OutOfOrderTimeWindow(userID), minAppendTimeAvailable, minAppendTime)
if err != nil {
return nil, err

if len(req.Timeseries) > 0 {
persistentApp = db.Appender(ctx).(extendedAppender)

level.Debug(spanlog).Log("event", "got appender", "series", len(req.Timeseries))
Collaborator:
Suggested change:
- level.Debug(spanlog).Log("event", "got appender", "series", len(req.Timeseries))
+ level.Debug(spanlog).Log("event", "got persistent appender", "series", len(req.Timeseries))


var activeSeries *activeseries.ActiveSeries
if i.cfg.ActiveSeriesMetricsEnabled {
activeSeries = db.activeSeries
}
err = i.pushSamplesToAppender(userID, req.Timeseries, persistentApp, startAppend, &stats, updateFirstPartial, activeSeries, i.limits.OutOfOrderTimeWindow(userID), minAppendTimeAvailable, minAppendTime, true)
if err != nil {
rollback()
return nil, err
}
}

if len(req.EphemeralTimeseries) > 0 {
a, err := db.EphemeralAppender(ctx)
if err != nil {
// TODO: handle error caused by limit (ephemeral storage disabled), and report it via firstPartialErr instead.
rollback()
return nil, err
}

ephemeralApp = a.(extendedAppender)

level.Debug(spanlog).Log("event", "got appender for ephemeral series", "ephemeralSeries", len(req.EphemeralTimeseries))

err = i.pushSamplesToAppender(userID, req.EphemeralTimeseries, ephemeralApp, startAppend, &ephemeralStats, updateFirstPartial, nil, 0, minAppendTimeAvailable, minAppendTime, false)
Contributor (@replay, Jan 11, 2023):
In the future we might want to consider accepting out-of-order samples here as well, but let's ignore that for now.

Member (author):
From a quick look it seems that we would need to enable the WBL for that, but I may be wrong. I plan to explore that option later, after we have the basics down.

Collaborator:
minAppendTimeAvailable and minAppendTime are based on the persistent TSDB. Is it correct to use them here?

Member (author):
Good catch! I misread the logic in pushSamplesToAppender and thought they wouldn't be used if the OOO time window is 0, but it's exactly the opposite. Fixing to use minAppendTime from the ephemeral storage instead.
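
A small sketch of the fix being discussed, using a stand-in interface for (*tsdb.Head).AppendableMinValidTime(): each batch is validated against the bound of the head it is actually appended to, rather than always using the persistent head's bound. The types below are illustrative, not the PR's actual code.

```go
package main

import "fmt"

// appendableHead is a stand-in for the one method the push path needs from a
// TSDB head; the real code would call (*tsdb.Head).AppendableMinValidTime().
type appendableHead interface {
	AppendableMinValidTime() (int64, bool)
}

// minValidTimeFor returns the lower timestamp bound to enforce for a batch of
// samples, taken from the head the batch will be appended to.
func minValidTimeFor(h appendableHead) (minAppendTime int64, available bool) {
	if h == nil {
		return 0, false
	}
	return h.AppendableMinValidTime()
}

type fakeHead struct{ min int64 }

func (f fakeHead) AppendableMinValidTime() (int64, bool) { return f.min, true }

func main() {
	persistent := fakeHead{min: 1_000} // bound derived from the persistent head
	ephemeral := fakeHead{min: 5_000}  // bound derived from the ephemeral head

	fmt.Println(minValidTimeFor(persistent)) // used for req.Timeseries
	fmt.Println(minValidTimeFor(ephemeral))  // used for req.EphemeralTimeseries
}
```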

if err != nil {
rollback()
return nil, err
}
}

// At this point all samples have been added to the appender, so we can track the time it took.
@@ -735,19 +784,37 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, pushReq *push.Request) (
"failedSamplesCount", stats.failedSamplesCount,
"succeededExemplarsCount", stats.succeededExemplarsCount,
"failedExemplarsCount", stats.failedExemplarsCount,

"ephemeralSucceededSamplesCount", ephemeralStats.succeededSamplesCount,
"ephemeralFailedSamplesCount", ephemeralStats.failedSamplesCount,
"ephemeralSucceededExemplarsCount", ephemeralStats.succeededExemplarsCount,
"ephemeralFailedExemplarsCount", ephemeralStats.failedExemplarsCount,
)

startCommit := time.Now()
if err := app.Commit(); err != nil {
return nil, wrapWithUser(err, userID)
if persistentApp != nil {
if err := persistentApp.Commit(); err != nil {
rollback()
Collaborator:
We'll try to roll back persistentApp too (after Commit() was called and failed). Is that safe?

Member (author):
Documentation for Commit() says:

If Commit returns a non-nil error, it also rolls back all modifications made in the appender so far, as Rollback would do.
In any case, an Appender must not be used anymore after Commit has been called.

Which tells me that 1) rollback is automatic in case of errors, and 2) we should not use the appender anymore. Let's clear it here. Same for ephemeralApp below.

return nil, wrapWithUser(err, userID)
}

persistentApp = nil // Disable rollback for this appender.
}
if ephemeralApp != nil {
if err := ephemeralApp.Commit(); err != nil {
rollback()
return nil, wrapWithUser(err, userID)
}

ephemeralApp = nil // Disable rollback for this appender.
}

commitDuration := time.Since(startCommit)
i.metrics.appenderCommitDuration.Observe(commitDuration.Seconds())
level.Debug(spanlog).Log("event", "complete commit", "commitDuration", commitDuration.String())

// If only invalid samples are pushed, don't change "last update", as TSDB was not modified.
if stats.succeededSamplesCount > 0 {
if stats.succeededSamplesCount > 0 || ephemeralStats.succeededSamplesCount > 0 {
db.setLastUpdate(time.Now())
}

@@ -761,8 +828,31 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, pushReq *push.Request) (
i.appendedSamplesStats.Inc(int64(stats.succeededSamplesCount))
i.appendedExemplarsStats.Inc(int64(stats.succeededExemplarsCount))

i.metrics.ephemeralIngestedSamples.WithLabelValues(userID).Add(float64(ephemeralStats.succeededSamplesCount))
i.metrics.ephemeralIngestedSamplesFail.WithLabelValues(userID).Add(float64(ephemeralStats.failedSamplesCount))
i.appendedSamplesStats.Inc(int64(ephemeralStats.succeededSamplesCount))

group := i.activeGroups.UpdateActiveGroupTimestamp(userID, validation.GroupLabel(i.limits, userID, req.Timeseries), startAppend)

i.updateMetricsFromPushStats(userID, group, &stats)
i.updateMetricsFromPushStats(userID, group, &ephemeralStats)

db.updatedRatesFromStats(stats.succeededSamplesCount, req.Source)
db.updatedRatesFromStats(ephemeralStats.succeededSamplesCount, req.Source)
Collaborator:
Why do we need this new function? Can't we keep the metrics update in updateMetricsFromPushStats(), passing req.Source there?

Member (author):
We also need to pass db there. We can do that too.
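
A self-contained sketch of that refactor, with the rate counters updated inside updateMetricsFromPushStats once the request source and the tenant's TSDB are passed in; the simplified types and field names below are assumptions, not the PR's final code.

```go
package main

import "fmt"

// Simplified stand-ins for the types involved; names are assumptions based on
// the diff above.
type pushStats struct{ succeededSamplesCount int }

type userTSDB struct{ ingestedAPISamples, ingestedRuleSamples int64 }

type sampleSource int

const (
	sourceAPI sampleSource = iota
	sourceRule
)

type ingester struct{ ingestionRate int64 }

// updateMetricsFromPushStats also updates the per-tenant rate counters, so the
// caller passes the request source and the tenant's TSDB instead of calling a
// separate updatedRatesFromStats helper.
func (i *ingester) updateMetricsFromPushStats(stats *pushStats, src sampleSource, db *userTSDB) {
	if stats.succeededSamplesCount == 0 {
		return
	}
	i.ingestionRate += int64(stats.succeededSamplesCount)
	switch src {
	case sourceRule:
		db.ingestedRuleSamples += int64(stats.succeededSamplesCount)
	default:
		db.ingestedAPISamples += int64(stats.succeededSamplesCount)
	}
}

func main() {
	i := &ingester{}
	db := &userTSDB{}
	i.updateMetricsFromPushStats(&pushStats{succeededSamplesCount: 10}, sourceAPI, db)
	i.updateMetricsFromPushStats(&pushStats{succeededSamplesCount: 3}, sourceRule, db)
	fmt.Println(i.ingestionRate, db.ingestedAPISamples, db.ingestedRuleSamples) // 13 10 3
}
```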


if firstPartialErr != nil {
code := http.StatusBadRequest
var ve *validationError
if errors.As(firstPartialErr, &ve) {
code = ve.code
}
return &mimirpb.WriteResponse{}, httpgrpc.Errorf(code, wrapWithUser(firstPartialErr, userID).Error())
}

return &mimirpb.WriteResponse{}, nil
}

func (i *Ingester) updateMetricsFromPushStats(userID string, group string, stats *pushStats) {
if stats.sampleOutOfBoundsCount > 0 {
i.metrics.discardedSamplesSampleOutOfBounds.WithLabelValues(userID, group).Add(float64(stats.sampleOutOfBoundsCount))
}
@@ -783,34 +873,14 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, pushReq *push.Request) (
}
if stats.succeededSamplesCount > 0 {
i.ingestionRate.Add(int64(stats.succeededSamplesCount))

switch req.Source {
case mimirpb.RULE:
db.ingestedRuleSamples.Add(int64(stats.succeededSamplesCount))
case mimirpb.API:
fallthrough
default:
db.ingestedAPISamples.Add(int64(stats.succeededSamplesCount))
}
}

if firstPartialErr != nil {
code := http.StatusBadRequest
var ve *validationError
if errors.As(firstPartialErr, &ve) {
code = ve.code
}
return &mimirpb.WriteResponse{}, httpgrpc.Errorf(code, wrapWithUser(firstPartialErr, userID).Error())
}

return &mimirpb.WriteResponse{}, nil
}

// pushSamplesToAppender appends samples and exemplars to the appender. Most errors are handled via updateFirstPartial function,
// but in case of unhandled errors, appender is rolled back and such error is returned.
func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.PreallocTimeseries, app extendedAppender, startAppend time.Time,
stats *pushStats, updateFirstPartial func(errFn func() error), activeSeries *activeseries.ActiveSeries,
outOfOrderWindow model.Duration, minAppendTimeAvailable bool, minAppendTime int64) error {
outOfOrderWindow model.Duration, minAppendTimeAvailable bool, minAppendTime int64, appendExemplars bool) error {
for _, ts := range timeseries {
// The labels must be sorted (in our case, it's guaranteed a write request
// has sorted labels once hit the ingester).
@@ -901,11 +971,6 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre
continue
}

// The error looks an issue on our side, so we should rollback
if rollbackErr := app.Rollback(); rollbackErr != nil {
level.Warn(i.logger).Log("msg", "failed to rollback on error", "user", userID, "err", rollbackErr)
}

return wrapWithUser(err, userID)
}

@@ -916,7 +981,7 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre
})
}

if len(ts.Exemplars) > 0 && i.limits.MaxGlobalExemplarsPerUser(userID) > 0 {
if appendExemplars && len(ts.Exemplars) > 0 && i.limits.MaxGlobalExemplarsPerUser(userID) > 0 {
// app.AppendExemplar currently doesn't create the series, it must
// already exist. If it does not then drop.
if ref == 0 {
@@ -1673,6 +1738,36 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
}

i.tsdbMetrics.setRegistryForUser(userID, tsdbPromReg)

userDB.ephemeralFactory = func() (*tsdb.Head, error) {
// TODO: check user limit for ephemeral series. If it's 0, don't create head and return error.

headOptions := &tsdb.HeadOptions{
ChunkRange: i.cfg.EphemeralSeriesRetentionPeriod.Milliseconds(),
ChunkDirRoot: filepath.Join(udir, "ephemeral_chunks"),
ChunkWriteBufferSize: i.cfg.BlocksStorageConfig.TSDB.HeadChunksWriteBufferSize,
ChunkEndTimeVariance: 0,
ChunkWriteQueueSize: i.cfg.BlocksStorageConfig.TSDB.HeadChunksWriteQueueSize,
StripeSize: i.cfg.BlocksStorageConfig.TSDB.StripeSize,
SeriesCallback: nil, // TODO: handle limits.
EnableExemplarStorage: false,
EnableMemorySnapshotOnShutdown: false,
IsolationDisabled: true,
}

// We need to set this, despite OOO time window being 0.
Collaborator:
Can you explain why we need to set it to i.cfg.BlocksStorageConfig.TSDB.OutOfOrderCapacityMax?

Member (author):
We need some value, because Head checks that it's set, but it won't be used as we don't enable OOO. I used i.cfg.BlocksStorageConfig.TSDB.OutOfOrderCapacityMax to get some valid value. I will switch it to tsdb.DefaultOutOfOrderCapMax.
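
A small sketch of that follow-up, assuming the Prometheus tsdb package API: the ephemeral head keeps OOO ingestion disabled but still gets a valid capacity from the library default rather than from the ingester's OOO config.

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/tsdb"
)

func main() {
	// OOO ingestion is not enabled for the ephemeral head, but HeadOptions
	// still requires a valid capacity, so use the library default instead of
	// the ingester's -blocks-storage.tsdb.out-of-order-capacity-max setting.
	headOptions := tsdb.DefaultHeadOptions()
	headOptions.OutOfOrderCapMax.Store(int64(tsdb.DefaultOutOfOrderCapMax))
	fmt.Println(headOptions.OutOfOrderCapMax.Load())
}
```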

headOptions.OutOfOrderCapMax.Store(int64(i.cfg.BlocksStorageConfig.TSDB.OutOfOrderCapacityMax))

h, err := tsdb.NewHead(prometheus.WrapRegistererWithPrefix(ephemeralPrometheusMetricsPrefix, tsdbPromReg), log.With(userLogger, "ephemeral", "true"), nil, nil, headOptions, nil)
Collaborator:
[question] Have you considered "storage=ephemeral" instead? And then wrap the logger passed to tsdb.Open() with storage=persistent?

Suggested change:
- h, err := tsdb.NewHead(prometheus.WrapRegistererWithPrefix(ephemeralPrometheusMetricsPrefix, tsdbPromReg), log.With(userLogger, "ephemeral", "true"), nil, nil, headOptions, nil)
+ h, err := tsdb.NewHead(prometheus.WrapRegistererWithPrefix(ephemeralPrometheusMetricsPrefix, tsdbPromReg), log.With(userLogger, "storage", "ephemeral"), nil, nil, headOptions, nil)

Member (author):
Good idea.

if err != nil {
return nil, err
}

// Don't allow ingestion of old samples into ephemeral storage.
h.SetMinValidTime(time.Now().Add(-i.cfg.EphemeralSeriesRetentionPeriod).UnixMilli())
return h, nil
}

return userDB, nil
}

Expand Down Expand Up @@ -1832,6 +1927,26 @@ func (i *Ingester) getMemorySeriesMetric() float64 {
return float64(count)
}

// getEphemeralSeriesMetric returns the total number of in-memory series in ephemeral storage across all tenants.
func (i *Ingester) getEphemeralSeriesMetric() float64 {
if err := i.checkRunning(); err != nil {
return 0
}

i.tsdbsMtx.RLock()
defer i.tsdbsMtx.RUnlock()

count := uint64(0)
for _, db := range i.tsdbs {
eph := db.getEphemeralStorage()
if eph != nil {
count += eph.NumSeries()
}
}

return float64(count)
}

// getOldestUnshippedBlockMetric returns the unix timestamp of the oldest unshipped block or
// 0 if all blocks have been shipped.
func (i *Ingester) getOldestUnshippedBlockMetric() float64 {
@@ -2011,7 +2126,7 @@ func (i *Ingester) compactBlocks(ctx context.Context, force bool, allowed *util.

default:
reason = "regular"
err = userDB.Compact()
err = userDB.Compact(time.Now().Add(-i.cfg.EphemeralSeriesRetentionPeriod))
Collaborator:
Have you considered storing EphemeralSeriesRetentionPeriod in userTSDB instead? It's a bit weird reading the function call here. It makes you think it just compacts the ephemeral TSDB, while it compacts both.

Member (author):
Done.
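
A minimal sketch of that change, assuming userTSDB stores the retention period itself so compactBlocks can keep calling a plain Compact(); the field and method shapes below are simplified assumptions, not the PR's final code.

```go
package main

import (
	"fmt"
	"time"
)

// userTSDB here is a simplified stand-in: it owns the ephemeral retention
// period, so callers don't need to compute the truncation boundary themselves.
type userTSDB struct {
	ephemeralSeriesRetentionPeriod time.Duration
}

// Compact truncates the ephemeral head up to the retention boundary and then
// compacts the persistent head (elided in this sketch).
func (u *userTSDB) Compact() error {
	ephemeralCutoff := time.Now().Add(-u.ephemeralSeriesRetentionPeriod)
	fmt.Println("truncating ephemeral series before", ephemeralCutoff.UnixMilli())
	// ... truncate ephemeral head, then run the regular head compaction ...
	return nil
}

func main() {
	db := &userTSDB{ephemeralSeriesRetentionPeriod: 10 * time.Minute}
	_ = db.Compact()
}
```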

}

if err != nil {