Wait for owned series recomputation before lowering local series limit #7411
Conversation
Force-pushed from e9e1b92 to 4edbf9b
Documentation changes look good to me.
Good job! The logic changes make sense to me, but I haven't reviewed the tests yet. Also, @pstibrany is more familiar with all such work, so it would be great if you could review it too. Tomorrow I will review the tests.
pkg/ingester/limiter.go
Outdated
@@ -135,7 +137,7 @@ func (l *Limiter) convertGlobalToLocalLimit(userShardSize int, globalLimit int)
 	// Global limit is equally distributed among all the active zones.
 	// The portion of global limit related to each zone is then equally distributed
 	// among all the ingesters belonging to that zone.
-	return int((float64(globalLimit*l.replicationFactor) / float64(zonesCount)) / float64(ingestersInZoneCount))
+	return max(int((float64(globalLimit*l.replicationFactor)/float64(zonesCount))/float64(ingestersInZoneCount)), minLocalLimit)
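As a quick worked example of what this conversion computes (numbers are illustrative, not taken from the PR): with a global limit of 60,000 series, replication factor 3, 3 zones and 10 ingesters per zone, each ingester gets int(60,000 × 3 / 3 / 10) = 6,000 series, which the new code additionally clamps from below with `minLocalLimit`.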
[nit] May be slightly easier to read:
Suggested change:
-	return max(int((float64(globalLimit*l.replicationFactor)/float64(zonesCount))/float64(ingestersInZoneCount)), minLocalLimit)
+	desiredLocalLimit := int((float64(globalLimit*l.replicationFactor)/float64(zonesCount))/float64(ingestersInZoneCount))
+	return max(desiredLocalLimit, minLocalLimit)
I would actually suggest moving this into `convertGlobalToLocalLimitOrUnlimited`. This is not part of "conversion", and in my PR #7424 I'm introducing a different conversion when the partition-ring is used.
Moved the `max` up a level in 8ce33a75fdf7a0e5b0801e6be16fa6e392defdb5
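For readers following along, a rough sketch of what "moving the max up a level" could look like (the signature and the unlimited sentinel below are assumptions, not the PR's actual code):

```go
// Sketch only: convertGlobalToLocalLimit stays a pure conversion, and the cached
// minimum is applied in the wrapper that also handles the "limit disabled" case.
// math.MaxInt32 stands in for "unlimited" here (requires importing "math").
func (l *Limiter) convertGlobalToLocalLimitOrUnlimited(minLocalLimit, userShardSize, globalLimit int) int {
	localLimit := l.convertGlobalToLocalLimit(userShardSize, globalLimit)
	if localLimit == 0 {
		// A computed limit of 0 means the limit is disabled.
		return math.MaxInt32
	}
	// Never go below the limit cached at the last owned-series recomputation.
	return max(localLimit, minLocalLimit)
}
```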
pkg/ingester/user_tsdb.go
Outdated
@@ -289,8 +294,8 @@ func (u *userTSDB) PreCreation(metric labels.Labels) error {
 	}
 
 	// Total series limit.
-	series, shards := u.getSeriesAndShardsForSeriesLimit()
-	if !u.limiter.IsWithinMaxSeriesPerUser(u.userID, series, shards) {
+	series, minLimit := u.getSeriesAndMinForSeriesLimit()
[nit] I suggest being consistent with naming. It's called `minLocalLimit` in other places and I suggest calling it `minLocalLimit` here as well (it's clearer and keeps consistency).
changed in 82e5e8229ddce202770927c7eda43a06a502ac63
pkg/ingester/user_tsdb.go
Outdated
 	// series limit.
-func (u *userTSDB) getSeriesAndShardsForSeriesLimit() (int, int) {
+func (u *userTSDB) getSeriesAndMinForSeriesLimit() (int, int) {
[nit] WDYT?
Suggested change:
-func (u *userTSDB) getSeriesAndMinForSeriesLimit() (int, int) {
+func (u *userTSDB) getSeriesCountAndMinLocalLimit() (int, int) {
renamed in 5e79268c2ca3ab4227e0ff563420f507342c5dd0
I also reviewed the tests. Nice job on the unit tests! I'm not super excited about all the sleeps in the integration tests, and I'm wondering if these new tests are really necessary (it looks like your unit tests are relatively high level anyway) or whether we can improve the integration tests to remove some of the sleeps.
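One way to reduce the sleeps, sketched under the assumption that the test's ingester service exposes the metric-polling helpers used elsewhere in Mimir's integration tests (the metric name and `expectedOwnedSeries` below are placeholders):

```go
// Poll the ingester's metrics endpoint until the expected value appears,
// instead of sleeping for a fixed interval and hoping the update has happened.
require.NoError(t, ingester.WaitSumMetricsWithOptions(
	e2e.Equals(float64(expectedOwnedSeries)),
	[]string{"cortex_ingester_owned_series"}, // placeholder metric name
	e2e.WaitMissingMetrics,
))
```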
@@ -419,6 +419,7 @@ func TestLimiter_AssertMaxSeriesPerUser(t *testing.T) {
 	ringReplicationFactor int
[nit] Can you rename the test function to `TestLimiter_IsWithinMaxSeriesPerUser()`, given that's what we test here?
done in cbd21dcd289e38b358f65326a809a4b1ef992849. I also renamed the other tests that were similarly misnamed.
pkg/ingester/owned_series_test.go
Outdated
	// initial limit
	c.checkCalculatedLocalLimit(t, ownedServiceTestUserSeriesLimit, "")

	// we'd normally scale up all zones at once, but doing it one by one lets us
"we'd normally scale up all zones at once" - scaling is never atomic. Ingesters are added to the ring at different times. So even if we increase the replica count of different zones at the same time, the new ingesters will register to the ring at slightly different times. That's just to say that your test LGTM, but I would clarify this comment to match what happens in reality.
Same comment applies below.
These tests were deleted
pkg/ingester/owned_series_test.go
Outdated
	for name, tc := range testCases {
		t.Run(name, func(t *testing.T) {
These tests are a bit slow. WDYT about parallelising them?
Suggested change:
-	for name, tc := range testCases {
-		t.Run(name, func(t *testing.T) {
+	for name, tc := range testCases {
+		tc := tc
+		t.Run(name, func(t *testing.T) {
+			t.Parallel()
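Worth noting: the `tc := tc` copy is only needed on Go versions before 1.22, where the loop variable was shared across iterations; from Go 1.22 onwards each iteration gets its own `tc`, so the parallel subtests capture the right value without the copy.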
done in 53836caef913ecb8305c79dd22d18c675a2ce8d6
integration/ingester_limits_test.go
Outdated
	gen.Wait()

	// Wait for owned series service to run and metrics to get updated
	// Metrics are updated only every 15 seconds :/
Where is the 15s hardcoded?
Is that referring to the 15s default value in `-ingester.owned-series-update-interval`?
We hard-code it here:
mimir/pkg/ingester/ingester.go, line 667 (at 16877ec):
	limitMetricsUpdateTicker := time.NewTicker(time.Second * 15)
Great work, I've left some minor comments and suggestions.
pkg/ingester/owned_series_test.go
Outdated
	// wait for the ingester to see the updated ring
	test.Poll(t, 2*c.ringHeartbeatPeriod, 2, func() interface{} {
		return c.ing.lifecycler.InstancesCount()
	})
Can we add these waits into the `register` methods? A simple idea is to check the instances count before and after registration, and verify that it's before+1. It would simplify the tests a bit.
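A rough sketch of that idea, reusing dskit's `test.Poll` as the tests already do (the `registerAndWait` wrapper, the `register` helper, and the context type name below are hypothetical):

```go
// Sketch: make registration block until the lifecycler's view of the ring
// includes the newly registered instance.
func (c *ownedSeriesTestContext) registerAndWait(t *testing.T, desc ring.InstanceDesc) {
	before := c.ing.lifecycler.InstancesCount()
	c.register(t, desc) // hypothetical existing registration helper

	test.Poll(t, 2*c.ringHeartbeatPeriod, before+1, func() interface{} {
		return c.ing.lifecycler.InstancesCount()
	})
}
```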
Removed these checks entirely, since the limiter no longer uses the lifecycler
	c.updateOwnedSeriesAndCheckResult(t, false, 1, recomputeOwnedSeriesReasonShardSizeChanged)
	c.checkUpdateReasonForUser(t, "")
	c.checkUserSeriesOwnedAndShardsByTestedIngester(t, ownedServiceSeriesCount/2, 2)
"shard size = 0, scale ingesters up and down": {
or shard size = 100
Should I run a separate test with shard size >> number of ingesters?
Maybe just for completeness, to make sure it doesn't break in the future.
pkg/ingester/owned_series_test.go
Outdated
@@ -374,6 +756,529 @@ func TestOwnedSeriesRingChanged(t *testing.T) {
 	})
 }
 
+func TestOwnedSeriesLimiting(t *testing.T) {
Is this test useful? Limiter already has its own tests, and we also check the computed min-local limit in the `TestOwnedSeriesService` test.
Agreed -- deleted the test
Force-pushed from cbd21dc to 8a02111
Force-pushed from 47144ff to 1a953b5
Signed-off-by: Peter Štibraný <pstibrany@gmail.com>
…iesPerUser_WithPartitionsRing
Force-pushed from d3d975d to acc9668
Dismissing my own review because of recent changes that I still have to review
Great work, thank you!
	require.NoError(t, err)

-	strategy := newPartitionRingLimiterStrategy(&partitionRingHolder{pr: pr}, limits.IngestionTenantShardSize)
+	strategy := newPartitionRingLimiterStrategy(&partitionRingHolder{pr: pr}, limits.IngestionPartitionsTenantShardSize)
Thanks for catching this bug.
What this PR does
The existing owned series implementation prevents discards when the local series limit is lowered due to a shard size increase. It does this by caching the current shard size until after the owned series are recomputed, only using the new (higher) shard size once the computation successfully completes.
However, it does not prevent discards when the lowered local limit is due to an increase in ingester count (e.g. when shuffle sharding is disabled); the lower local limit takes effect immediately, and samples are discarded until the next owned series computation (at most `ingester.owned-series-update-interval` seconds later). This is due to the `limiter` fetching the ingester count from the ring lifecycler directly, meaning it picks up changes before the owned series service has a chance to run and update the owned series count.

To address this, this PR caches the calculated local series limit on each tenant's `userDB` whenever owned series are recomputed, and uses this value as a minimum when calculating local limits during push requests. Because the cached value functions as a minimum, local limits are able to increase instantaneously, but can't be lowered until after the next owned series computation.
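A minimal sketch of this mechanism (the type, field, and method names below are illustrative assumptions, not the PR's exact identifiers):

```go
package ingester

import "sync"

// The local limit computed at owned-series recomputation time is cached on the
// per-tenant TSDB and later used as a floor during push-time limit checks.
type userTSDB struct {
	limitsMtx     sync.RWMutex
	minLocalLimit int // refreshed on every owned-series recomputation
}

// Called from the owned-series recomputation path.
func (u *userTSDB) updateMinLocalLimit(computedLimit int) {
	u.limitsMtx.Lock()
	defer u.limitsMtx.Unlock()
	u.minLocalLimit = computedLimit
}

// Called when checking the series limit on push: a freshly computed limit can
// raise the effective limit immediately, but it cannot drop below the cached
// value until the next owned-series recomputation lowers the floor.
func (u *userTSDB) effectiveLocalSeriesLimit(computedLimit int) int {
	u.limitsMtx.RLock()
	defer u.limitsMtx.RUnlock()
	return max(computedLimit, u.minLocalLimit)
}
```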
Summary of changes:
- `pkg/ingester/limiter.go`: `limiter` takes `minLocalLimit` as a parameter instead of `userShardSize` when calculating local limits. This is `0` for every limit except for the user series limit.
- `pkg/ingester/user_tsdb.go`: when we recompute the owned series count, we get the local series limit from the limiter and cache this value on the `userDB`. This value is passed as a minimum to the limiter when checking the series limit in `PreCreation`.
- `pkg/ingester/owned_series.go`: added a new owned series recompute reason, `recomputeOwnedSeriesReasonLocalLimitChanged`, which is used when the user's calculated local series limit changes without any change to ingester or shard counts. This means that we'll recompute the owned series whenever ONLY the global series limit for a user changes, but this is necessary to correctly enforce lowered series limits (without this it wouldn't be possible to have a lowered series limit take effect until the user's actual subring changed).

The bulk of the changes, by line count, are to tests:
- `ingesterRing` passed to the ingester creation helpers in `ingester_test.go`
Which issue(s) this PR fixes or relates to
Fixes #5578
Checklist
- `CHANGELOG.md` updated - the order of entries should be `[CHANGE]`, `[FEATURE]`, `[ENHANCEMENT]`, `[BUGFIX]`.
- `about-versioning.md` updated with experimental features.