-
Notifications
You must be signed in to change notification settings - Fork 24.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Batch rollover cluster state updates. #79945
Conversation
c8f326a
to
bff9018
Compare
In cases where many indices are managed by ILM, it is likely that rollovers for different indices or data streams happen concurrently. This change allows the cluster state updates that these rollovers generate to be batch. This change also changes the rollover service to not do a reroute and instead perform a single reroute for multiple batched rollovers. Relates to elastic#77466 Closes to elastic#79782
bc35a15
to
41ba6e1
Compare
Pinging @elastic/es-data-management (Team:Data Management) |
Just a quick update here from our offline discussion on it earlier: This PR seems to work correctly. The remaining problem with it is that running a large number of index creates in a loop even without the reroute in the mix can take a very long time (as in many minutes) due to the mapping validation etc. via the temporary index services that we do. We are looking into a fix for this that avoids validating the same mapping over and over. => No need to review this yet. UPDATE: after further discussion and thought we decide the above is an acceptable trade-off in the short term. This change prevents a massive queue of rollovers breaking things like snapshotting for an extended period of time and saves a potentially significant amount of (master-)work in the cluster overall. The cases where this would run up a multi-minute task on the master are terminally broken without this change so it's an exclusive move in the right direction we believe and this is good for review now. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @martijnvg this looks really good. I have a few small question, one thing that needs fixing and one suggestion :) But this looks pretty close.
} | ||
} | ||
String reason = "bulk rollover [" | ||
+ tasks.stream().map(t -> t.sourceIndex.get() + "->" + t.rolloverIndex.get()).collect(Collectors.joining()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We gotta limit the output length here. See #79443 which you can probably reuse here easily.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed: cc0aeb9
String reason = "bulk rollover [" | ||
+ tasks.stream().map(t -> t.sourceIndex.get() + "->" + t.rolloverIndex.get()).collect(Collectors.joining()) | ||
+ "]"; | ||
state = allocationService.reroute(state, reason); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we have a situation where the rollovers were noops? Maybe check for an unchanged state before triggering reroute?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed: 7fadeb8
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { | ||
// Now assuming we have a new state and the name of the rolled over index, we need to wait for the | ||
// configured number of active shards, as well as return the names of the indices that were rolled/created | ||
if (conditionsMet.get()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we have a noop task here by any chance and have both states equal, do we have to account for that somehow? It seems no, but just double checking :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this be may be true if something changed between the trail rollover and actual rollover, that makes a condition go from true to false. The if statement that I will add for the previous comment, will catch this, if it happens.
ClusterState state = currentState; | ||
for (RolloverTask task : tasks) { | ||
try { | ||
state = task.performRollover(state); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do wonder, aren't we just using the Metadata
here both as input and as we do the reroute later. Maybe we can get away with just loop over building Metadata
which would be slightly cheaper? :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Eventually down in MetadataCreateIndexService#clusterStateCreateIndex(...)
we do update the routing table (adding unassigned shards iirc). This gets invoked from the MetadataRolloverService
. So I don't think this is possible?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea this would require a little extra effort to do :) When I last profiled this it wasn't a big deal though, we can look into that kind of optimization later I supoose.
@@ -478,7 +478,14 @@ private ClusterState applyCreateIndexWithTemporaryService( | |||
); | |||
|
|||
indexService.getIndexEventListener().beforeIndexAddedToCluster(indexMetadata.getIndex(), indexMetadata.getSettings()); | |||
return clusterStateCreateIndex(currentState, request.blocks(), indexMetadata, allocationService::reroute, metadataTransformer); | |||
BiFunction<ClusterState, String, ClusterState> rerouteFunction = (current, reason) -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just realized this: we never mutate the request later. So we could just do:
rerouteFunction = request.performReroute() ? allocationService::reroute : (cs, reason) -> cs;
right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done: 51077a9
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, one cleanup ask that'd be nice to resolve only :)
Also, this shouldn't go into 7.16 right? The state of settings+mappings dedup. in those versions doesn't really make it worth it at the scales that'll work with 7.16 and it's not a 100% risk-free change here obviously :)
In cases where many indices are managed by ILM, it is likely that rollovers for different indices or data streams happen concurrently. This change allows the cluster state updates that these rollovers generate to be batch. This change also changes the rollover service to not do a reroute and instead perform a single reroute for multiple batched rollovers. Relates to elastic#77466 Closes to elastic#79782
💚 Backport successful
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I were reviewing this as it got merged but thought I would drop my drive by comments anway. Can we add more testing here, to check that the batched case works (perhaps just an integ test doing X rollovers concurrently)?
@@ -80,6 +90,33 @@ public TransportRolloverAction( | |||
this.rolloverService = rolloverService; | |||
this.client = client; | |||
this.activeShardsObserver = new ActiveShardsObserver(clusterService, threadPool); | |||
this.rolloverTaskExecutor = (currentState, tasks) -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we turn this lambda into an explicit class? It makes it easier to reason about the state that it uses (should not depend on local state).
final List<Condition<?>> trialMetConditions = rolloverRequest.getConditions() | ||
.values() | ||
.stream() | ||
.filter(condition -> trialConditionResults.get(condition.toString())) | ||
.collect(Collectors.toList()); | ||
|
||
final RolloverResponse trailRolloverResponse = new RolloverResponse( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should be trial
, not trail
?
private final RolloverResponse trialRolloverResponse; | ||
private final ActionListener<RolloverResponse> listener; | ||
|
||
private final AtomicBoolean conditionsMet = new AtomicBoolean(false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could this not be a plain boolean?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, I wonder if changed
or performedRollover
were better names - since this is used to figure out if anything changed when the clusterStateProcessed
method is called?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this can be a boolean, but it should be volatile? I'm not 100% sure whether performRollover() method is executed by the same thread as clusterStateProcessed() method.
I like the name clusterStateProcessed
and will change it to that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not 100% sure whether performRollover() method is executed by the same thread as clusterStateProcessed() method.
I think it is executed by the same thread. I will change it to be an ordinary boolean.
In cases where many indices are managed by ILM, it is likely that rollovers for different indices or data streams happen concurrently. This change allows the cluster state updates that these rollovers generate to be batch. This change also changes the rollover service to not do a reroute and instead perform a single reroute for multiple batched rollovers. Relates to #77466 Closes to #79782
… test. (elastic#80397) Code cleanups around rollover executor in TransportRolloverAction and added an integration test that tests rollover concurrently. Relates to elastic#79945
In cases where many indices are managed by ILM,
it is likely that rollovers for different indices or data streams
happen concurrently. This change allows the cluster state updates
that these rollovers generate to be batch.
This change also changes the rollover service to not do a reroute and
instead perform a single reroute for multiple batched rollovers.
Relates to #77466
Closes to #79782