Allow force-merges to run in parallel on a node #69416
Pinging @elastic/es-distributed (Team:Distributed)
Looks good, I left a few small comments.
ActionListener<TransportBroadcastByNodeAction.EmptyResult> listener) {
threadPool.executor(ThreadPool.Names.FORCE_MERGE).execute(ActionRunnable.run(listener,
() -> {
if (task instanceof CancellableTask && ((CancellableTask)task).isCancelled()) {
`task` is never a `CancellableTask` here, is it?
Not right now. The base class already has general cancellation support, and my change here could become a pitfall for cancelling force-merges if we ever enable cancellation at the level of `BroadcastRequest` (which is my plan), so I decided to keep the check here.
I think I'd prefer `assert (task instanceof CancellableTask) == false` for now; otherwise we'll forget to remove this `if` when a force-merge shard task becomes cancellable, and then we'll wonder why it's here some years later.
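To make the trade-off in this thread concrete, here is a minimal sketch of the two variants side by side. `Task` and `CancellableTask` below are simplified stand-ins written for this illustration, not the real Elasticsearch classes, and `IllegalStateException` stands in for whatever exception the real code would throw on cancellation.

```java
public class CancellationAssertSketch {

    /** Minimal stand-ins for illustration only; not the real Elasticsearch task classes. */
    static class Task {}

    static class CancellableTask extends Task {
        private volatile boolean cancelled;

        void cancel() {
            cancelled = true;
        }

        boolean isCancelled() {
            return cancelled;
        }
    }

    /** Variant from the diff: a runtime guard that only matters once the task is cancellable. */
    static String shardOperationWithGuard(Task task) {
        if (task instanceof CancellableTask && ((CancellableTask) task).isCancelled()) {
            // Stand-in exception type; the real code would use its own cancellation exception.
            throw new IllegalStateException("task cancelled");
        }
        return "merged";
    }

    /** Suggested variant: state today's invariant so tests fail loudly once it stops holding. */
    static String shardOperationWithAssert(Task task) {
        // Only evaluated when the JVM runs with assertions enabled (-ea).
        assert (task instanceof CancellableTask) == false : "force-merge shard task became cancellable";
        return "merged";
    }

    public static void main(String[] args) {
        System.out.println(shardOperationWithGuard(new Task()));
        System.out.println(shardOperationWithAssert(new Task()));
    }
}
```

The guard silently does nothing until the task type changes, whereas the assertion turns that future change into a visible test failure, which is the point of the suggestion.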
...ugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportForgetFollowerAction.java
List<BroadcastShardOperationFailedException> accumulatedExceptions = new ArrayList<>();
List<ShardOperationResult> results = new ArrayList<>();
for (int i = 0; i < totalShards; i++) {
if (shardResultOrExceptions[i] instanceof BroadcastShardOperationFailedException) {
accumulatedExceptions.add((BroadcastShardOperationFailedException) shardResultOrExceptions[i]);
if (shardResultOrExceptions.get(i) instanceof TaskCancelledException) {
Why not just check the task for cancellation?
++, changed in b0cc8f3
logger.trace("[{}] executing operation for shard [{}]", actionName, shardRouting.shortSummary());
}
final Consumer<Exception> failureHandler = e -> {
if (e instanceof TaskCancelledException) {
If we just checked the task for cancellation in `finishHim`, I think we wouldn't need this special-case handling: we could just treat it like any other shard-level exception.
++, fixed in b0cc8f3
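The approach agreed on in this thread could look roughly like the sketch below: cancellation is checked once when the node-level results are assembled, and every per-shard exception is otherwise handled uniformly. The class, the `Outcome` type, and the `finish` method are hypothetical names for this illustration; this is not the code from b0cc8f3.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.BooleanSupplier;

public class FinishSketch {

    /** Hypothetical container for the node-level outcome; not an Elasticsearch type. */
    static class Outcome {
        final boolean cancelled;
        final List<Object> results;
        final List<Exception> failures;

        Outcome(boolean cancelled, List<Object> results, List<Exception> failures) {
            this.cancelled = cancelled;
            this.results = results;
            this.failures = failures;
        }
    }

    /**
     * Assembles the node-level response. Cancellation is decided by asking the task once,
     * so no exception type needs special treatment while collecting shard outcomes.
     */
    static Outcome finish(List<Object> shardResultOrExceptions, BooleanSupplier taskIsCancelled) {
        if (taskIsCancelled.getAsBoolean()) {
            return new Outcome(true, Collections.emptyList(), Collections.emptyList());
        }
        List<Object> results = new ArrayList<>();
        List<Exception> failures = new ArrayList<>();
        for (Object resultOrException : shardResultOrExceptions) {
            if (resultOrException instanceof Exception) {
                // Any shard-level exception, whatever its type, is just a shard failure.
                failures.add((Exception) resultOrException);
            } else {
                results.add(resultOrException);
            }
        }
        return new Outcome(false, results, failures);
    }

    public static void main(String[] args) {
        List<Object> outcomes = new ArrayList<>();
        outcomes.add("shard-0 ok");
        outcomes.add(new RuntimeException("shard-1 failed"));
        Outcome outcome = finish(outcomes, () -> false);
        System.out.println(outcome.results.size() + " result(s), " + outcome.failures.size() + " failure(s)");
    }
}
```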
...ain/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java
LGTM, I left one optional request
Increasing the number of threads used for force-merging does not automatically give you any parallelism, even if you have many shards per node. Force-merge requests are split into node-level subrequests (see TransportBroadcastByNodeAction, superclass of TransportForceMergeAction), one for each node, and each subrequest is then executed sequentially for all the shards on that node.
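As a rough illustration of the behaviour described above, the sketch below contrasts a node-level loop that runs shard operations one after another with one that submits each shard operation to a thread pool. It uses only `java.util.concurrent`; the class and method names are made up for this example, the fixed-size pool merely stands in for the force-merge thread pool, and none of this is the actual Elasticsearch implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class NodeLevelForceMergeSketch {

    /** Before: a single thread walks through all shards of the node-level subrequest. */
    static void runSequentially(List<Runnable> shardOperations) {
        for (Runnable shardOperation : shardOperations) {
            shardOperation.run();
        }
    }

    /** After: every shard operation is handed to the pool as its own task. */
    static void runInParallel(List<Runnable> shardOperations, ExecutorService pool) {
        CountDownLatch done = new CountDownLatch(shardOperations.size());
        for (Runnable shardOperation : shardOperations) {
            pool.execute(() -> {
                try {
                    shardOperation.run();
                } finally {
                    done.countDown();
                }
            });
        }
        awaitQuietly(done, 10, TimeUnit.SECONDS);
    }

    /** Runs fake shard operations and reports how many were in flight at the same time. */
    static int peakConcurrency(int shards, boolean parallel) {
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch allStarted = new CountDownLatch(shards);
        List<Runnable> shardOperations = new ArrayList<>();
        for (int i = 0; i < shards; i++) {
            shardOperations.add(() -> {
                peak.accumulateAndGet(running.incrementAndGet(), Math::max);
                allStarted.countDown();
                // Hold the fake "merge" open briefly so overlapping operations can be observed.
                awaitQuietly(allStarted, 200, TimeUnit.MILLISECONDS);
                running.decrementAndGet();
            });
        }
        if (parallel) {
            // Stand-in for a force-merge pool sized to more than one thread.
            ExecutorService pool = Executors.newFixedThreadPool(shards);
            try {
                runInParallel(shardOperations, pool);
            } finally {
                pool.shutdown();
            }
        } else {
            runSequentially(shardOperations);
        }
        return peak.get();
    }

    private static void awaitQuietly(CountDownLatch latch, long timeout, TimeUnit unit) {
        try {
            latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("sequential peak: " + peakConcurrency(3, false));
        System.out.println("parallel peak:   " + peakConcurrency(3, true));
    }
}
```

In the sequential variant the peak can never exceed one, no matter how many threads the pool has, which is the limitation this change addresses.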
Closes #69327