Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow RerouteService to reroute at lower priority #44338

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
// assign it again, even if that means putting it back on the node on which it previously failed:
final String reason = String.format(Locale.ROOT, "[%d] unassigned shards after failing shards", numberOfUnassignedShards);
logger.trace("{}, scheduling a reroute", reason);
rerouteService.reroute(reason, ActionListener.wrap(
rerouteService.reroute(reason, Priority.HIGH, ActionListener.wrap(
r -> logger.trace("{}, reroute completed", reason),
e -> logger.debug(new ParameterizedMessage("{}, reroute failed", reason), e)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;

import java.util.ArrayList;
Expand Down Expand Up @@ -150,7 +151,7 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
results.success(joinTask);
}
if (nodesChanged) {
rerouteService.reroute("post-join reroute", ActionListener.wrap(
rerouteService.reroute("post-join reroute", Priority.HIGH, ActionListener.wrap(
r -> logger.trace("post-join reroute completed"),
e -> logger.debug("post-join reroute failed", e)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainListenableActionFuture;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;

import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

/**
Expand All @@ -49,7 +50,8 @@ public class BatchedRerouteService implements RerouteService {

private final Object mutex = new Object();
@Nullable // null if no reroute is currently pending
private PlainListenableActionFuture<Void> pendingRerouteListeners;
private List<ActionListener<Void>> pendingRerouteListeners;
private Priority pendingTaskPriority = Priority.LANGUID;

/**
* @param reroute Function that computes the updated cluster state after it has been rerouted.
Expand All @@ -63,29 +65,55 @@ public BatchedRerouteService(ClusterService clusterService, BiFunction<ClusterSt
* Initiates a reroute.
*/
@Override
public final void reroute(String reason, ActionListener<Void> listener) {
final PlainListenableActionFuture<Void> currentListeners;
public final void reroute(String reason, Priority priority, ActionListener<Void> listener) {
final List<ActionListener<Void>> currentListeners;
synchronized (mutex) {
if (pendingRerouteListeners != null) {
logger.trace("already has pending reroute, adding [{}] to batch", reason);
pendingRerouteListeners.addListener(listener);
return;
if (priority.sameOrAfter(pendingTaskPriority)) {
logger.trace("already has pending reroute at priority [{}], adding [{}] with priority [{}] to batch",
pendingTaskPriority, reason, priority);
pendingRerouteListeners.add(listener);
return;
} else {
logger.trace("already has pending reroute at priority [{}], promoting batch to [{}] and adding [{}]",
pendingTaskPriority, priority, reason);
currentListeners = new ArrayList<>(1 + pendingRerouteListeners.size());
currentListeners.add(listener);
currentListeners.addAll(pendingRerouteListeners);
pendingRerouteListeners.clear();
pendingRerouteListeners = currentListeners;
pendingTaskPriority = priority;
}
} else {
logger.trace("no pending reroute, scheduling reroute [{}] at priority [{}]", reason, priority);
currentListeners = new ArrayList<>(1);
currentListeners.add(listener);
pendingRerouteListeners = currentListeners;
pendingTaskPriority = priority;
}
currentListeners = PlainListenableActionFuture.newListenableFuture();
currentListeners.addListener(listener);
pendingRerouteListeners = currentListeners;
}
logger.trace("rerouting [{}]", reason);
try {
clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE + "(" + reason + ")",
new ClusterStateUpdateTask(Priority.HIGH) {
new ClusterStateUpdateTask(priority) {

@Override
public ClusterState execute(ClusterState currentState) {
final boolean currentListenersArePending;
synchronized (mutex) {
assert pendingRerouteListeners == currentListeners;
pendingRerouteListeners = null;
assert currentListeners.isEmpty() == (pendingRerouteListeners != currentListeners)
: "currentListeners=" + currentListeners + ", pendingRerouteListeners=" + pendingRerouteListeners;
currentListenersArePending = pendingRerouteListeners == currentListeners;
if (currentListenersArePending) {
pendingRerouteListeners = null;
}
}
if (currentListenersArePending) {
logger.trace("performing batched reroute [{}]", reason);
return reroute.apply(currentState, reason);
} else {
logger.trace("batched reroute [{}] was promoted", reason);
return currentState;
}
return reroute.apply(currentState, reason);
}

@Override
Expand All @@ -95,7 +123,7 @@ public void onNoLongerMaster(String source) {
pendingRerouteListeners = null;
}
}
currentListeners.onFailure(new NotMasterException("delayed reroute [" + reason + "] cancelled"));
ActionListener.onFailure(currentListeners, new NotMasterException("delayed reroute [" + reason + "] cancelled"));
// no big deal, the new master will reroute again
}

Expand All @@ -114,22 +142,26 @@ public void onFailure(String source, Exception e) {
logger.error(() -> new ParameterizedMessage("unexpected failure during [{}], current state version [{}]",
source, state.version()), e);
}
currentListeners.onFailure(new ElasticsearchException("delayed reroute [" + reason + "] failed", e));
ActionListener.onFailure(currentListeners,
new ElasticsearchException("delayed reroute [" + reason + "] failed", e));
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
currentListeners.onResponse(null);
ActionListener.onResponse(currentListeners, null);
}
});
} catch (Exception e) {
synchronized (mutex) {
assert pendingRerouteListeners == currentListeners;
pendingRerouteListeners = null;
assert currentListeners.isEmpty() == (pendingRerouteListeners != currentListeners);
if (pendingRerouteListeners == currentListeners) {
pendingRerouteListeners = null;
}
}
ClusterState state = clusterService.state();
logger.warn(() -> new ParameterizedMessage("failed to reroute routing table, current state:\n{}", state), e);
currentListeners.onFailure(new ElasticsearchException("delayed reroute [" + reason + "] could not be submitted", e));
ActionListener.onFailure(currentListeners,
new ElasticsearchException("delayed reroute [" + reason + "] could not be submitted", e));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,19 @@
package org.elasticsearch.cluster.routing;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.Priority;

/**
* Asynchronously performs a cluster reroute, updating any shard states and rebalancing the cluster if appropriate.
*/
@FunctionalInterface
public interface RerouteService {
void reroute(String reason, ActionListener<Void> listener);

/**
* Schedule a cluster reroute.
* @param priority the (minimum) priority at which to run this reroute. If there is already a pending reroute at a higher priority then
* this reroute is batched with the pending one; if there is already a pending reroute at a lower priority then
* the priority of the pending batch is raised to the given priority.
*/
void reroute(String reason, Priority priority, ActionListener<Void> listener);
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.ClusterSettings;
Expand Down Expand Up @@ -185,7 +186,7 @@ public void onNewInfo(ClusterInfo info) {

if (reroute) {
logger.info("rerouting shards: [{}]", explanation);
rerouteService.reroute("disk threshold monitor", ActionListener.wrap(r -> {
rerouteService.reroute("disk threshold monitor", Priority.HIGH, ActionListener.wrap(r -> {
setLastRunTimeMillis();
listener.onResponse(r);
}, e -> {
Expand Down
8 changes: 8 additions & 0 deletions server/src/main/java/org/elasticsearch/common/Priority.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,18 @@ public static Priority fromByte(byte b) {
this.value = value;
}

/**
* @return whether tasks of {@code this} priority will run after those of priority {@code p}.
* For instance, {@code Priority.URGENT.after(Priority.IMMEDIATE)} returns {@code true}.
*/
public boolean after(Priority p) {
return this.compareTo(p) > 0;
}

/**
* @return whether tasks of {@code this} priority will run no earlier than those of priority {@code p}.
* For instance, {@code Priority.URGENT.sameOrAfter(Priority.IMMEDIATE)} returns {@code true}.
*/
public boolean sameOrAfter(Priority p) {
return this.compareTo(p) >= 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.elasticsearch.cluster.routing.allocation.FailedShard;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
Expand Down Expand Up @@ -106,6 +107,8 @@ public void applyFailedShards(final RoutingAllocation allocation, final List<Fai
}

public void allocateUnassigned(final RoutingAllocation allocation) {
assert primaryShardAllocator != null;
assert replicaShardAllocator != null;
innerAllocatedUnassigned(allocation, primaryShardAllocator, replicaShardAllocator);
}

Expand All @@ -127,8 +130,10 @@ protected static void innerAllocatedUnassigned(RoutingAllocation allocation,
*/
public AllocateUnassignedDecision decideUnassignedShardAllocation(ShardRouting unassignedShard, RoutingAllocation routingAllocation) {
if (unassignedShard.primary()) {
assert primaryShardAllocator != null;
return primaryShardAllocator.makeAllocationDecision(unassignedShard, routingAllocation, logger);
} else {
assert replicaShardAllocator != null;
return replicaShardAllocator.makeAllocationDecision(unassignedShard, routingAllocation, logger);
}
}
Expand All @@ -142,7 +147,8 @@ class InternalAsyncFetch<T extends BaseNodeResponse> extends AsyncShardFetch<T>
@Override
protected void reroute(ShardId shardId, String reason) {
logger.trace("{} scheduling reroute for {}", shardId, reason);
rerouteService.reroute("async_shard_fetch", ActionListener.wrap(
assert rerouteService != null;
rerouteService.reroute("async_shard_fetch", Priority.HIGH, ActionListener.wrap(
r -> logger.trace("{} scheduled reroute completed for {}", shardId, reason),
e -> logger.debug(new ParameterizedMessage("{} scheduled reroute failed for {}", shardId, reason), e)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void testJoinDeduplication() {
x -> localNode, null, Collections.emptySet());
JoinHelper joinHelper = new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L, () -> null,
(joinRequest, joinCallback) -> { throw new AssertionError(); }, startJoinRequest -> { throw new AssertionError(); },
Collections.emptyList(), (s, r) -> {});
Collections.emptyList(), (s, p, r) -> {});
transportService.start();

DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT);
Expand Down Expand Up @@ -153,7 +153,7 @@ public void testJoinValidationRejectsMismatchedClusterUUID() {
x -> localNode, null, Collections.emptySet());
new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L, () -> localClusterState,
(joinRequest, joinCallback) -> { throw new AssertionError(); }, startJoinRequest -> { throw new AssertionError(); },
Collections.emptyList(), (s, r) -> {}); // registers request handler
Collections.emptyList(), (s, p, r) -> {}); // registers request handler
transportService.start();
transportService.acceptIncomingRequests();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ transportService, writableRegistry(),
() -> new InMemoryPersistedState(term, initialState), r -> emptyList(),
new NoOpClusterApplier(),
Collections.emptyList(),
random, (s, r) -> {}, ElectionStrategy.DEFAULT_INSTANCE);
random, (s, p, r) -> {}, ElectionStrategy.DEFAULT_INSTANCE);
transportService.start();
transportService.acceptIncomingRequests();
transport = capturingTransport;
Expand Down
Loading