Add ephemeral storage to ingester #3922
Conversation
Force-pushed from 9b2dfff to e36736e.
Doc bit looks fine.
Force-pushed from 2c4584e to 587aaeb.
```go
level.Debug(spanlog).Log("event", "got appender for ephemeral series", "ephemeralSeries", len(req.EphemeralTimeseries))

err = i.pushSamplesToAppender(userID, req.EphemeralTimeseries, ephemeralApp, startAppend, &ephemeralStats, updateFirstPartial, nil, 0, minAppendTimeAvailable, minAppendTime, false)
```
In the future we might want to consider accepting out-of-order samples here as well, but let's ignore that for now.
From a quick look, it seems that we would need to enable the WBL for that, but I may be wrong. I plan to explore that option later, after we have the basics down.
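For context, a rough sketch of what that might involve, assuming the ephemeral Head keeps being created via tsdb.NewHead as in this PR (oooWindow and wbl here are hypothetical, not part of this change):

```go
// Hypothetical sketch only: out-of-order ingestion would roughly need a
// non-zero OOO time window on the Head options, plus a WBL (out-of-order
// write log) handed to tsdb.NewHead instead of nil.
headOptions.OutOfOrderTimeWindow.Store(oooWindow.Milliseconds()) // oooWindow: hypothetical config value
h, err := tsdb.NewHead(reg, logger, nil /* no WAL */, wbl /* assumed required for OOO */, headOptions, nil)
```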
```go
		cortex_discarded_samples_total{group="",reason="sample-out-of-order",user="test"} 1
	`,
},
"request with mix of ephemeral and persistent series, with some good and some bad samples plus some metadata": {
```
With this helper func, which I added in this PR, you could generate such mixed persistent/ephemeral requests, but I'm not sure whether it's worth copying over. It can be used like this:
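The helper itself isn't reproduced here; as a purely hypothetical illustration (makeSeries and the sample values are invented names; mimirpb.WriteRequest and its EphemeralTimeseries field appear in the diff above), a mixed request would look roughly like:

```go
// Hypothetical usage sketch; makeSeries is an invented test helper that
// builds timeseries with the given samples.
req := &mimirpb.WriteRequest{
	Timeseries:          makeSeries("persistent_metric", goodSample, outOfOrderSample),
	EphemeralTimeseries: makeSeries("ephemeral_metric", goodSample, tooOldSample),
}
```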
Nice, when it's in, I will start using it.
Thanks, looks great
I would add a CHANGELOG entry and mention metrics too. Edit: I didn't read the next line, "Changelog will be added later after the feature is finished." LGTM.
Solid work! I left a few comments and questions. Thanks!
```yaml
# CLI flag: -ingester.ephemeral-series-retention-period
[ephemeral_series_retention_period: <duration> | default = 10m]
```
I would have expected to see this close to -blocks-storage.tsdb.retention-period. Have you considered having something like -blocks-storage.ephemeral-tsdb.retention-period?
Funny that you ask. I originally started with a similar flag, but then found the metadata retention flag and copied that instead. I don't have a strong opinion about naming; I will switch to -blocks-storage.ephemeral-tsdb.retention-period.
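After that rename, the documented option would presumably read like this (a sketch mirroring the config-reference snippet above, assuming the default stays at 10m):

```yaml
# CLI flag: -blocks-storage.ephemeral-tsdb.retention-period
[retention_period: <duration> | default = 10m]
```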
pkg/ingester/ingester.go (outdated)

```diff
@@ -306,6 +312,11 @@ func New(cfg Config, limits *validation.Overrides, activeGroupsCleanupService *u
 		Help: "The current number of series in memory.",
 	}, i.getMemorySeriesMetric)

+	promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{
+		Name: "cortex_ingester_ephemeral_series",
```
[question] Have you considered adding a memory_ prefix to clarify they're in-memory? Similar to cortex_ingester_memory_series.
Suggested change:

```diff
-Name: "cortex_ingester_ephemeral_series",
+Name: "cortex_ingester_memory_ephemeral_series",
```
pkg/ingester/ingester.go (outdated)

```diff
@@ -701,7 +712,7 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, pushReq *push.Request) (

 	// Keep track of some stats which are tracked only if the samples will be
 	// successfully committed
-	stats pushStats
+	stats, ephemeralStats pushStats
```
[nit] What if we rename stats to persistentStats? That's how we're going to refer to the standard TSDB storage, right?
Sounds good. Yes, we use "persistent storage" to refer to original TSDB-based storage.
pkg/ingester/ingester.go (outdated)

```go
rollback := func() {
	if persistentApp != nil {
		if err := persistentApp.Rollback(); err != nil {
			level.Warn(i.logger).Log("msg", "failed to rollback on error", "user", userID, "err", err)
```
[nit] I would clarify it.
Suggested change:

```diff
-level.Warn(i.logger).Log("msg", "failed to rollback on error", "user", userID, "err", err)
+level.Warn(i.logger).Log("msg", "failed to rollback persistent appender on error", "user", userID, "err", err)
```
pkg/ingester/ingester.go (outdated)

```go
if len(req.Timeseries) > 0 {
	persistentApp = db.Appender(ctx).(extendedAppender)

	level.Debug(spanlog).Log("event", "got appender", "series", len(req.Timeseries))
```
Suggested change:

```diff
-level.Debug(spanlog).Log("event", "got appender", "series", len(req.Timeseries))
+level.Debug(spanlog).Log("event", "got persistent appender", "series", len(req.Timeseries))
```
```go
	return nil, wrapWithUser(err, userID)

if persistentApp != nil {
	if err := persistentApp.Commit(); err != nil {
		rollback()
```
We'll try to rollback persistentApp too (after Commit() was called and failed). Is it safe?
Documentation for Commit() says:

> If Commit returns a non-nil error, it also rolls back all modifications made in the appender so far, as Rollback would do. In any case, an Appender must not be used anymore after Commit has been called.

Which tells me that 1) rollback is automatic in case of errors, and 2) we should not use the appender afterwards. Let's clear it here. Same for ephemeralApp below.
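A minimal sketch of that "clear it" idea (the surrounding error handling is taken from the diff above; the exact code in the PR may differ):

```go
// Commit auto-rolls-back on error and the appender must not be reused
// afterwards, so take the reference and nil it out before committing.
if persistentApp != nil {
	app := persistentApp
	persistentApp = nil // ensures the shared rollback() skips a finished appender
	if err := app.Commit(); err != nil {
		return nil, wrapWithUser(err, userID)
	}
}
```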
pkg/ingester/ingester.go (outdated)

```go
db.updatedRatesFromStats(stats.succeededSamplesCount, req.Source)
db.updatedRatesFromStats(ephemeralStats.succeededSamplesCount, req.Source)
```
Why do we need this new function? Can't we keep the metrics update in updateMetricsFromPushStats(), passing req.Source there?
We also need to pass db there. We can do that too.
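A sketch of that direction (the parameter list is an assumption, not the PR's final signature; updateMetricsFromPushStats and updatedRatesFromStats are the functions discussed above):

```go
// Hedged sketch: fold the rate update into the existing stats helper by
// passing db and the request source along. Exact signature is assumed.
func (i *Ingester) updateMetricsFromPushStats(userID, group string, stats *pushStats, source mimirpb.WriteRequest_SourceEnum, db *userTSDB) {
	// ... existing per-request metric updates ...
	db.updatedRatesFromStats(stats.succeededSamplesCount, source)
}
```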
pkg/ingester/ingester.go (outdated)

```go
	IsolationDisabled: true,
}

// We need to set this, despite OOO time window being 0.
```
Can you explain why we need to set it to i.cfg.BlocksStorageConfig.TSDB.OutOfOrderCapacityMax?
We need some value, because Head checks that it's set, but it won't be used, since we don't enable OOO. I used i.cfg.BlocksStorageConfig.TSDB.OutOfOrderCapacityMax to get some valid value. I will switch it to tsdb.DefaultOutOfOrderCapMax.
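The agreed change would then presumably look like this (sketch; tsdb.DefaultOutOfOrderCapMax is the upstream Prometheus TSDB default):

```go
// Head validates OutOfOrderCapMax even when the OOO time window is 0, so
// store the upstream default as a valid placeholder; it is never used
// while out-of-order ingestion stays disabled.
headOptions.OutOfOrderCapMax.Store(int64(tsdb.DefaultOutOfOrderCapMax))
```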
pkg/ingester/ingester.go (outdated)

```go
// We need to set this, despite OOO time window being 0.
headOptions.OutOfOrderCapMax.Store(int64(i.cfg.BlocksStorageConfig.TSDB.OutOfOrderCapacityMax))

h, err := tsdb.NewHead(prometheus.WrapRegistererWithPrefix(ephemeralPrometheusMetricsPrefix, tsdbPromReg), log.With(userLogger, "ephemeral", "true"), nil, nil, headOptions, nil)
```
[question] Have you considered "storage=ephemeral" instead? And then wrap the logger passed to tsdb.Open() with storage=persistent?
Suggested change:

```diff
-h, err := tsdb.NewHead(prometheus.WrapRegistererWithPrefix(ephemeralPrometheusMetricsPrefix, tsdbPromReg), log.With(userLogger, "ephemeral", "true"), nil, nil, headOptions, nil)
+h, err := tsdb.NewHead(prometheus.WrapRegistererWithPrefix(ephemeralPrometheusMetricsPrefix, tsdbPromReg), log.With(userLogger, "storage", "ephemeral"), nil, nil, headOptions, nil)
```
Good idea.
pkg/ingester/ingester.go (outdated)

```diff
@@ -2011,7 +2126,7 @@ func (i *Ingester) compactBlocks(ctx context.Context, force bool, allowed *util.

 default:
 	reason = "regular"
-	err = userDB.Compact()
+	err = userDB.Compact(time.Now().Add(-i.cfg.EphemeralSeriesRetentionPeriod))
```
Have you considered storing EphemeralSeriesRetentionPeriod in userTSDB instead? It's a bit weird reading the function call here: it makes you think it just compacts the ephemeral TSDB, while it compacts both.
Done.
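A sketch of what that change presumably looks like (the field name and helper are hypothetical, not the PR's exact code):

```go
// Hedged sketch: the retention period lives on the per-tenant TSDB wrapper,
// so compactBlocks can call Compact() without an argument again.
type userTSDB struct {
	// ... existing fields ...
	ephemeralSeriesRetentionPeriod time.Duration // hypothetical field name
}

func (u *userTSDB) Compact() error {
	// Derive the ephemeral truncation deadline internally instead of
	// taking it as a parameter.
	ephemeralMinT := time.Now().Add(-u.ephemeralSeriesRetentionPeriod)
	return u.compactAndTruncateEphemeral(ephemeralMinT) // hypothetical helper
}
```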
From the PR description:

I think you meant cortex_ingester_ephemeral_series, right?

This is not new, but I can't find the ephemeral counterpart. Where is it defined?

Yes, that is already on the list.

There is no ephemeral counterpart for the number of users, but I think it's a good idea to add it. Let me do that in this PR. (I mistakenly included it in the list of new metrics.)
Thank you both for your reviews! I believe I've addressed all your feedback now.
I forgot to add this part. Will do that tomorrow.
I've added this.
I have re-read it, thanks for addressing all the feedback.
Thanks Mauro, I've addressed your latest feedback. I've also added a new test.
Excellent job! LGTM 👏
```go
i.metrics.memEphemeralUsers.Inc()

// Don't allow ingestion of old samples into ephemeral storage.
h.SetMinValidTime(time.Now().Add(-i.cfg.BlocksStorageConfig.EphemeralTSDB.Retention).UnixMilli())
```
[non blocking, but I would like to talk more about this] Is this actually correct? What if a customer sends samples with an older timestamp (e.g. while catching up after an outage)? Also we should be consistent with any check done in Push(): we allow any timestamp there (up to TSDB out-of-bounds limits).
> What if a customer sends samples with an older timestamp (e.g. while catching up after an outage)?

Yes, that's unfortunately how "ephemeral" storage will work.

> Also we should be consistent with any check done in Push(): we allow any timestamp there (up to TSDB out-of-bounds limits).

We are consistent, because we're updating minValidTime of ephemeral storage periodically via *userDB.Compact, where we call Truncate on ephemeral storage.
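A sketch of that mechanism (ephemeralStorage() is a hypothetical accessor; Head.Truncate advancing minValidTime is upstream Prometheus TSDB behavior):

```go
// Hedged sketch of the periodic consistency mechanism: truncating the
// ephemeral Head advances its minValidTime, so the out-of-bounds check
// applied during Push() follows the retention window over time.
if eph := db.ephemeralStorage(); eph != nil { // hypothetical accessor
	minT := time.Now().Add(-retention).UnixMilli()
	if err := eph.Truncate(minT); err != nil {
		return err
	}
}
```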
👍
What this PR does

This PR adds ephemeral storage to the ingester. This storage is first created when a write request contains ephemeral series. Ephemeral series from the write request are then appended to the ephemeral storage.

This PR does include new metrics, but also reuses some existing ones: specifically, the "reasons" in cortex_discarded_samples_total do not distinguish between samples for ephemeral series and "persistent" series, and metrics related to adding samples to an appender were reused (so append and commit times include both persistent and ephemeral appenders). The list of new metrics is:
- cortex_ingester_ingested_ephemeral_samples_total
- cortex_ingester_ingested_ephemeral_samples_failures_total
- cortex_ingester_ephemeral_series
- cortex_ingester_memory_series (existed before)
- cortex_ingester_memory_users (existed before)
- cortex_ingester_ephemeral_series_created_total
- cortex_ingester_ephemeral_series_removed_total
- cortex_ingester_memory_ephemeral_users
This PR does not address limits or querying of ephemeral storage. That will be the focus of subsequent PRs.

Changelog will be added later, after the feature is finished.
Which issue(s) this PR fixes or relates to
Part of #3884
Checklist

- CHANGELOG.md updated - the order of entries should be [CHANGE], [FEATURE], [ENHANCEMENT], [BUGFIX]